Interface BatchStage<T>
- Type Parameters:
T - the type of items coming out of this stage
- All Superinterfaces:
GeneralStage<T>, Stage
public interface BatchStage<T> extends GeneralStage<T>
A stage in a distributed computation pipeline that will observe a finite amount of data (a batch). It accepts input from its upstream stages (if any) and passes its output to its downstream stages.
- Since:
3.0
-
Method Summary
Modifier and Type
Method
Description

<R> BatchStage<R>
aggregate(AggregateOperation1<? super T,?,? extends R> aggrOp)
Attaches a stage that performs the given aggregate operation over all the items it receives.

default <T1, R0, R1> BatchStage<Tuple2<R0,R1>>
aggregate2(AggregateOperation1<? super T,?,? extends R0> aggrOp0, BatchStage<T1> stage1, AggregateOperation1<? super T1,?,? extends R1> aggrOp1)
Attaches a stage that co-aggregates the data from this and the supplied stage by performing a separate aggregate operation on each and emitting a Tuple2 with their results.

<T1, R> BatchStage<R>
aggregate2(BatchStage<T1> stage1, AggregateOperation2<? super T,? super T1,?,? extends R> aggrOp)
Attaches a stage that performs the given aggregate operation over all the items it receives from both this stage and the stage1 you supply.

default <T1, T2, R0, R1, R2> BatchStage<Tuple3<R0,R1,R2>>
aggregate3(AggregateOperation1<? super T,?,? extends R0> aggrOp0, BatchStage<T1> stage1, AggregateOperation1<? super T1,?,? extends R1> aggrOp1, BatchStage<T2> stage2, AggregateOperation1<? super T2,?,? extends R2> aggrOp2)
Attaches a stage that co-aggregates the data from this and the two supplied stages by performing a separate aggregate operation on each and emitting a Tuple3 with their results.

<T1, T2, R> BatchStage<R>
aggregate3(BatchStage<T1> stage1, BatchStage<T2> stage2, AggregateOperation3<? super T,? super T1,? super T2,?,? extends R> aggrOp)
Attaches a stage that performs the given aggregate operation over all the items it receives from this stage as well as the stage1 and stage2 you supply.

default AggregateBuilder1<T>
aggregateBuilder()
Offers a step-by-step API to build a pipeline stage that co-aggregates the data from several input stages.

default <R0> AggregateBuilder<R0>
aggregateBuilder(AggregateOperation1<? super T,?,? extends R0> aggrOp0)
Offers a step-by-step API to build a pipeline stage that co-aggregates the data from several input stages.

default <R> BatchStage<R>
apply(FunctionEx<? super BatchStage<T>,? extends BatchStage<R>> transformFn)
Transforms this stage using the provided transformFn and returns the transformed stage.

default <R> BatchStage<R>
customTransform(String stageName, SupplierEx<Processor> procSupplier)
Attaches a stage with a custom transform based on the provided supplier of Core API Processors.

<R> BatchStage<R>
customTransform(String stageName, ProcessorMetaSupplier procSupplier)
Attaches a stage with a custom transform based on the provided supplier of Core API Processors.

default <R> BatchStage<R>
customTransform(String stageName, ProcessorSupplier procSupplier)
Attaches a stage with a custom transform based on the provided supplier of Core API Processors.

default BatchStage<T>
distinct()
Attaches a stage that emits just the items that are distinct according to their definition of equality (equals and hashCode).

BatchStage<T>
filter(PredicateEx<T> filterFn)
Attaches a filtering stage which applies the provided predicate function to each input item to decide whether to pass the item to the output or to discard it.

<S> BatchStage<T>
filterStateful(SupplierEx<? extends S> createFn, BiPredicateEx<? super S,? super T> filterFn)
Attaches a stage that performs a stateful filtering operation.

<S> BatchStage<T>
filterUsingService(ServiceFactory<?,S> serviceFactory, BiPredicateEx<? super S,? super T> filterFn)
Attaches a filtering stage which applies the provided predicate function to each input item to decide whether to pass the item to the output or to discard it.

<R> BatchStage<R>
flatMap(FunctionEx<? super T,? extends Traverser<R>> flatMapFn)
Attaches a flat-mapping stage which applies the supplied function to each input item independently and emits all the items from the Traverser it returns.

<S, R> BatchStage<R>
flatMapStateful(SupplierEx<? extends S> createFn, BiFunctionEx<? super S,? super T,? extends Traverser<R>> flatMapFn)
Attaches a stage that performs a stateful flat-mapping operation.

<S, R> BatchStage<R>
flatMapUsingService(ServiceFactory<?,S> serviceFactory, BiFunctionEx<? super S,? super T,? extends Traverser<R>> flatMapFn)
Attaches a flat-mapping stage which applies the supplied function to each input item independently and emits all items from the Traverser it returns as the output items.

<K> BatchStageWithKey<T,K>
groupingKey(FunctionEx<? super T,? extends K> keyFn)
Specifies the function that will extract a key from the items in the associated pipeline stage.

<K, T1_IN, T1, R> BatchStage<R>
hashJoin(BatchStage<T1_IN> stage1, JoinClause<K,? super T,? super T1_IN,? extends T1> joinClause1, BiFunctionEx<T,T1,R> mapToOutputFn)
Attaches to both this and the supplied stage a hash-joining stage and returns it.

<K1, K2, T1_IN, T2_IN, T1, T2, R> BatchStage<R>
hashJoin2(BatchStage<T1_IN> stage1, JoinClause<K1,? super T,? super T1_IN,? extends T1> joinClause1, BatchStage<T2_IN> stage2, JoinClause<K2,? super T,? super T2_IN,? extends T2> joinClause2, TriFunction<T,T1,T2,R> mapToOutputFn)
Attaches to this and the two supplied stages a hash-joining stage and returns it.

default HashJoinBuilder<T>
hashJoinBuilder()
Returns a fluent API builder object to construct a hash join operation with any number of contributing stages.

<K, T1_IN, T1, R> BatchStage<R>
innerHashJoin(BatchStage<T1_IN> stage1, JoinClause<K,? super T,? super T1_IN,? extends T1> joinClause1, BiFunctionEx<T,T1,R> mapToOutputFn)
Attaches to both this and the supplied stage an inner hash-joining stage and returns it.

<K1, K2, T1_IN, T2_IN, T1, T2, R> BatchStage<R>
innerHashJoin2(BatchStage<T1_IN> stage1, JoinClause<K1,? super T,? super T1_IN,? extends T1> joinClause1, BatchStage<T2_IN> stage2, JoinClause<K2,? super T,? super T2_IN,? extends T2> joinClause2, TriFunction<T,T1,T2,R> mapToOutputFn)
Attaches to this and the two supplied stages an inner hash-joining stage and returns it.

<R> BatchStage<R>
map(FunctionEx<? super T,? extends R> mapFn)
Attaches a mapping stage which applies the given function to each input item independently and emits the function's result as the output item.

<S, R> BatchStage<R>
mapStateful(SupplierEx<? extends S> createFn, BiFunctionEx<? super S,? super T,? extends R> mapFn)
Attaches a stage that performs a stateful mapping operation.

default <K, V, R> BatchStage<R>
mapUsingIMap(IMap<K,V> iMap, FunctionEx<? super T,? extends K> lookupKeyFn, BiFunctionEx<? super T,? super V,? extends R> mapFn)
Attaches a mapping stage where for each item a lookup in the supplied IMap is performed and the result of the lookup is merged with the item and emitted.

default <K, V, R> BatchStage<R>
mapUsingIMap(String mapName, FunctionEx<? super T,? extends K> lookupKeyFn, BiFunctionEx<? super T,? super V,? extends R> mapFn)
Attaches a mapping stage where for each item a lookup in the IMap with the supplied name is performed and the result of the lookup is merged with the item and emitted.

default <K, V, R> BatchStage<R>
mapUsingReplicatedMap(ReplicatedMap<K,V> replicatedMap, FunctionEx<? super T,? extends K> lookupKeyFn, BiFunctionEx<? super T,? super V,? extends R> mapFn)
Attaches a mapping stage where for each item a lookup in the supplied ReplicatedMap is performed and the result of the lookup is merged with the item and emitted.

default <K, V, R> BatchStage<R>
mapUsingReplicatedMap(String mapName, FunctionEx<? super T,? extends K> lookupKeyFn, BiFunctionEx<? super T,? super V,? extends R> mapFn)
Attaches a mapping stage where for each item a lookup in the ReplicatedMap with the supplied name is performed and the result of the lookup is merged with the item and emitted.

<S, R> BatchStage<R>
mapUsingService(ServiceFactory<?,S> serviceFactory, BiFunctionEx<? super S,? super T,? extends R> mapFn)
Attaches a mapping stage which applies the supplied function to each input item independently and emits the function's result as the output item.

<S, R> BatchStage<R>
mapUsingServiceAsync(ServiceFactory<?,S> serviceFactory, int maxConcurrentOps, boolean preserveOrder, BiFunctionEx<? super S,? super T,? extends CompletableFuture<R>> mapAsyncFn)
Asynchronous version of GeneralStage.mapUsingService(com.hazelcast.jet.pipeline.ServiceFactory<?, S>, com.hazelcast.function.BiFunctionEx<? super S, ? super T, ? extends R>): the mapAsyncFn returns a CompletableFuture<R> instead of just R.

default <S, R> BatchStage<R>
mapUsingServiceAsync(ServiceFactory<?,S> serviceFactory, BiFunctionEx<? super S,? super T,? extends CompletableFuture<R>> mapAsyncFn)
Asynchronous version of GeneralStage.mapUsingService(com.hazelcast.jet.pipeline.ServiceFactory<?, S>, com.hazelcast.function.BiFunctionEx<? super S, ? super T, ? extends R>): the mapAsyncFn returns a CompletableFuture<R> instead of just R.

<S, R> BatchStage<R>
mapUsingServiceAsyncBatched(ServiceFactory<?,S> serviceFactory, int maxBatchSize, BiFunctionEx<? super S,? super List<T>,? extends CompletableFuture<List<R>>> mapAsyncFn)
Batched version of GeneralStage.mapUsingServiceAsync(com.hazelcast.jet.pipeline.ServiceFactory<?, S>, com.hazelcast.function.BiFunctionEx<? super S, ? super T, ? extends java.util.concurrent.CompletableFuture<R>>): mapAsyncFn takes a list of input items and returns a CompletableFuture<List<R>>.

BatchStage<T>
merge(BatchStage<? extends T> other)
Attaches a stage that emits all the items from this stage as well as all the items from the supplied stage.

default BatchStage<T>
peek()
Adds a peeking layer to this compute stage which logs its output.

default BatchStage<T>
peek(FunctionEx<? super T,? extends CharSequence> toStringFn)
Adds a peeking layer to this compute stage which logs its output.

BatchStage<T>
peek(PredicateEx<? super T> shouldLogFn, FunctionEx<? super T,? extends CharSequence> toStringFn)
Attaches a peeking stage which logs this stage's output and passes it through without transformation.

default <A, R> BatchStage<R>
rollingAggregate(AggregateOperation1<? super T,A,? extends R> aggrOp)
Attaches a rolling aggregation stage.

BatchStage<T>
setLocalParallelism(int localParallelism)
Sets the preferred local parallelism (number of processors per Jet cluster member) this stage will configure its DAG vertices with.

BatchStage<T>
setName(String name)
Overrides the default name of the stage with the name you choose and returns the stage.
-
Method Details
-
groupingKey
@Nonnull <K> BatchStageWithKey<T,K> groupingKey(@Nonnull FunctionEx<? super T,? extends K> keyFn)
Specifies the function that will extract a key from the items in the associated pipeline stage. This enables the operations that need the key, such as grouped aggregation.
Sample usage:
    users.groupingKey(User::getId)
Note: make sure the extracted key is not null, otherwise the job will fail. Also make sure that it implements equals() and hashCode().
- Specified by:
groupingKey in interface GeneralStage<T>
- Type Parameters:
K - type of the key
- Parameters:
keyFn - function that extracts the grouping key
- Returns:
the newly attached stage
-
map
@Nonnull <R> BatchStage<R> map(@Nonnull FunctionEx<? super T,? extends R> mapFn)
Description copied from interface: GeneralStage
Attaches a mapping stage which applies the given function to each input item independently and emits the function's result as the output item. If the result is null, it emits nothing. Therefore this stage can be used to implement filtering semantics as well.
This sample takes a stream of names and outputs the names in lowercase:
    stage.map(name -> name.toLowerCase())
- Specified by:
map in interface GeneralStage<T>
- Type Parameters:
R - the result type of the mapping function
- Parameters:
mapFn - a stateless mapping function
- Returns:
the newly attached stage
-
filter
@Nonnull BatchStage<T> filter(@Nonnull PredicateEx<T> filterFn)
Description copied from interface: GeneralStage
Attaches a filtering stage which applies the provided predicate function to each input item to decide whether to pass the item to the output or to discard it. Returns the newly attached stage.
This sample removes empty strings from the stream:
    stage.filter(name -> !name.isEmpty())
- Specified by:
filter in interface GeneralStage<T>
- Parameters:
filterFn - a stateless filter predicate function
- Returns:
the newly attached stage
-
flatMap
@Nonnull <R> BatchStage<R> flatMap(@Nonnull FunctionEx<? super T,? extends Traverser<R>> flatMapFn)
Description copied from interface: GeneralStage
Attaches a flat-mapping stage which applies the supplied function to each input item independently and emits all the items from the Traverser it returns. The traverser must be null-terminated.
This sample takes a stream of sentences and outputs a stream of individual words in them:
    stage.flatMap(sentence -> traverseArray(sentence.split("\\W+")))
- Specified by:
flatMap in interface GeneralStage<T>
- Type Parameters:
R - the type of items in the result's traversers
- Parameters:
flatMapFn - a stateless flat-mapping function whose result type is Jet's Traverser. It must not return a null traverser, but can return an empty traverser.
- Returns:
the newly attached stage
-
mapStateful
@Nonnull <S, R> BatchStage<R> mapStateful(@Nonnull SupplierEx<? extends S> createFn, @Nonnull BiFunctionEx<? super S,? super T,? extends R> mapFn)
Description copied from interface: GeneralStage
Attaches a stage that performs a stateful mapping operation. createFn returns the object that holds the state. Jet passes this object along with each input item to mapFn, which can update the object's state. The state object will be included in the state snapshot, so it survives job restarts. For this reason it must be serializable.
This sample takes a stream of long numbers representing request latencies, computes the cumulative latency of all requests so far, and starts emitting alarm messages when the cumulative latency crosses a "bad behavior" threshold:
    StreamStage<Long> latencyAlarms = latencies.mapStateful(
        LongAccumulator::new,
        (sum, latency) -> {
            sum.add(latency);
            long cumulativeLatency = sum.get();
            return (cumulativeLatency <= LATENCY_THRESHOLD)
                ? null
                : cumulativeLatency;
        }
    );
Without the threshold check, this code would have the same result as latencies.rollingAggregate(summing()).
- Specified by:
mapStateful in interface GeneralStage<T>
- Type Parameters:
S - type of the state object
R - type of the result
- Parameters:
createFn - function that returns the state object
mapFn - function that receives the state object and the input item and outputs the result item. It may modify the state object.
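The createFn/mapFn contract described above can be pictured with a small plain-Java simulation. This is only a sketch, not Jet code: the class MapStatefulSketch, its helper methods, the use of AtomicLong in place of LongAccumulator, and the threshold value of 100 are all assumptions made for illustration. It does not model snapshots or distribution.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.Supplier;

// Plain-Java simulation of the mapStateful contract; not Jet API code.
final class MapStatefulSketch {

    // Hypothetical threshold, chosen only for this illustration.
    static final long LATENCY_THRESHOLD = 100;

    // Creates the state once, passes it to mapFn with every item,
    // and drops null results (null means "emit nothing").
    static <S, T, R> List<R> mapStateful(Supplier<S> createFn,
                                         BiFunction<S, T, R> mapFn,
                                         List<T> input) {
        S state = createFn.get();
        List<R> output = new ArrayList<>();
        for (T item : input) {
            R result = mapFn.apply(state, item);
            if (result != null) {
                output.add(result);
            }
        }
        return output;
    }

    // Emits the cumulative latency only once it has crossed the threshold.
    static List<Long> latencyAlarms(List<Long> latencies) {
        return MapStatefulSketch.<AtomicLong, Long, Long>mapStateful(
                AtomicLong::new,
                (sum, latency) -> {
                    long cumulative = sum.addAndGet(latency);
                    return cumulative <= LATENCY_THRESHOLD ? null : cumulative;
                },
                latencies);
    }

    public static void main(String[] args) {
        // Running sums are 40, 90, 120, 140; only the last two exceed 100.
        System.out.println(latencyAlarms(List.of(40L, 50L, 30L, 20L))); // prints [120, 140]
    }
}
```

The first two items produce no output because the mapping function returns null for them, which is the filtering behaviour the description mentions.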
-
filterStateful
@Nonnull <S> BatchStage<T> filterStateful(@Nonnull SupplierEx<? extends S> createFn, @Nonnull BiPredicateEx<? super S,? super T> filterFn)
Description copied from interface: GeneralStage
Attaches a stage that performs a stateful filtering operation. createFn returns the object that holds the state. Jet passes this object along with each input item to filterFn, which can update the object's state. The state object will be included in the state snapshot, so it survives job restarts. For this reason it must be serializable.
This sample decimates the input (throws out every 10th item):
    GeneralStage<String> decimated = input.filterStateful(
        LongAccumulator::new,
        (counter, item) -> {
            counter.add(1);
            return counter.get() % 10 != 0;
        }
    );
- Specified by:
filterStateful in interface GeneralStage<T>
- Type Parameters:
S - type of the state object
- Parameters:
createFn - function that returns the state object
filterFn - function that receives the state object and the input item and produces the boolean result. It may modify the state object.
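To make the decimation sample concrete, here is a plain-Java sketch of the same counting logic applied to an in-memory list. It is not Jet code; the class name DecimateSketch and its method are hypothetical, and a local counter stands in for the state object.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation of the decimation filter; not Jet API code.
final class DecimateSketch {

    // Keeps every item except the 10th, 20th, 30th, ... one.
    static <T> List<T> decimate(List<T> input) {
        long counter = 0;                 // plays the role of the state object
        List<T> output = new ArrayList<>();
        for (T item : input) {
            counter++;
            if (counter % 10 != 0) {      // same predicate as in the sample
                output.add(item);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<Integer> input = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12);
        System.out.println(decimate(input)); // prints [1, 2, 3, 4, 5, 6, 7, 8, 9, 11, 12]
    }
}
```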
-
flatMapStateful
@Nonnull <S, R> BatchStage<R> flatMapStateful(@Nonnull SupplierEx<? extends S> createFn, @Nonnull BiFunctionEx<? super S,? super T,? extends Traverser<R>> flatMapFn)
Description copied from interface: GeneralStage
Attaches a stage that performs a stateful flat-mapping operation. createFn returns the object that holds the state. Jet passes this object along with each input item to flatMapFn, which can update the object's state. The state object will be included in the state snapshot, so it survives job restarts. For this reason it must be serializable.
This sample inserts a punctuation mark (a special string) after every 10th input string:
    GeneralStage<String> punctuated = input.flatMapStateful(
        LongAccumulator::new,
        (counter, item) -> {
            counter.add(1);
            // emit the item first so that the punctuation follows it
            return counter.get() % 10 == 0
                ? Traversers.traverseItems(item, "punctuation")
                : Traversers.singleton(item);
        }
    );
- Specified by:
flatMapStateful in interface GeneralStage<T>
- Type Parameters:
S - type of the state object
R - type of the result
- Parameters:
createFn - function that returns the state object
flatMapFn - function that receives the state object and the input item and outputs the result items. It may modify the state object. It must not return a null traverser, but can return an empty traverser.
-
rollingAggregate
@Nonnull default <A, R> BatchStage<R> rollingAggregate(@Nonnull AggregateOperation1<? super T,A,? extends R> aggrOp)
Description copied from interface: GeneralStage
Attaches a rolling aggregation stage. This is a special case of stateful mapping that uses an AggregateOperation. It passes each input item to the accumulator and outputs the current result of aggregation (as returned by the export primitive).
Sample usage:
    stage.rollingAggregate(AggregateOperations.summing())
For example, if your input is {2, 7, 8, -5}, the output will be {2, 9, 17, 12}.
This stage is fault-tolerant and saves its state to the snapshot.
NOTE: since the output for each item depends on all the previous items, this operation cannot be parallelized. Jet will perform it on a single member, single-threaded. Jet also supports keyed rolling aggregation, which it can parallelize by partitioning.
- Specified by:
rollingAggregate in interface GeneralStage<T>
- Type Parameters:
R - result type of the aggregate operation
- Parameters:
aggrOp - the aggregate operation to do the aggregation
- Returns:
the newly attached stage
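The accumulate-then-export cycle described for rolling aggregation can be traced with a plain-Java sketch that reproduces the {2, 7, 8, -5} example. This is a simulation on an in-memory list, not Jet code; the class RollingSumSketch and its method are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation of a rolling sum; not Jet API code.
final class RollingSumSketch {

    // Feeds each item to a running accumulator and records the
    // current result after every item.
    static List<Long> rollingSum(List<Long> input) {
        List<Long> output = new ArrayList<>();
        long sum = 0;                 // the accumulator
        for (long item : input) {
            sum += item;              // accumulate the item
            output.add(sum);          // export the current result
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(rollingSum(List.of(2L, 7L, 8L, -5L))); // prints [2, 9, 17, 12]
    }
}
```

Each output depends on every earlier input, which is why the note above says the unkeyed operation cannot be parallelized.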
-
mapUsingService
@Nonnull <S, R> BatchStage<R> mapUsingService(@Nonnull ServiceFactory<?,S> serviceFactory, @Nonnull BiFunctionEx<? super S,? super T,? extends R> mapFn)
Description copied from interface: GeneralStage
Attaches a mapping stage which applies the supplied function to each input item independently and emits the function's result as the output item. The mapping function receives another parameter, the service object, which Jet will create using the supplied serviceFactory.
If the mapping result is null, it emits nothing. Therefore this stage can be used to implement filtering semantics as well.
This sample takes a stream of stock items and sets the detail field on them by looking up from a registry:
    stage.mapUsingService(
        ServiceFactories.sharedService(ctx -> new ItemDetailRegistry(ctx.jetInstance())),
        (reg, item) -> item.setDetail(reg.fetchDetail(item))
    )
Interaction with fault-tolerant unbounded jobs
If you use this stage in a fault-tolerant unbounded job, keep in mind that any state the service object maintains doesn't participate in Jet's fault tolerance protocol. If the state is local, it will be lost after a job restart; if it is saved to some durable storage, the state of that storage won't be rewound to the last checkpoint, so you'll perform duplicate updates.
- Specified by:
mapUsingService in interface GeneralStage<T>
- Type Parameters:
S - type of service object
R - the result type of the mapping function
- Parameters:
serviceFactory - the service factory
mapFn - a stateless mapping function
- Returns:
the newly attached stage
-
mapUsingServiceAsync
@Nonnull default <S, R> BatchStage<R> mapUsingServiceAsync(@Nonnull ServiceFactory<?,S> serviceFactory, @Nonnull BiFunctionEx<? super S,? super T,? extends CompletableFuture<R>> mapAsyncFn)
Description copied from interface: GeneralStage
Asynchronous version of GeneralStage.mapUsingService(com.hazelcast.jet.pipeline.ServiceFactory<?, S>, com.hazelcast.function.BiFunctionEx<? super S, ? super T, ? extends R>): the mapAsyncFn returns a CompletableFuture<R> instead of just R.
Uses default values for the extra parameters: the maximum number of concurrent async operations per processor is limited to 4, and the order of input items is preserved.
The function can return a null future or the future can return a null result: in both cases it will act just like a filter.
The latency of the async call will add to the total latency of the output.
This sample takes a stream of stock items and sets the detail field on them by looking up from a registry:
    stage.mapUsingServiceAsync(
        ServiceFactories.sharedService(ctx -> new ItemDetailRegistry(ctx.jetInstance())),
        (reg, item) -> reg.fetchDetailAsync(item)
                          .thenApply(detail -> item.setDetail(detail))
    )
Interaction with fault-tolerant unbounded jobs
If you use this stage in a fault-tolerant unbounded job, keep in mind that any state the service object maintains doesn't participate in Jet's fault tolerance protocol. If the state is local, it will be lost after a job restart; if it is saved to some durable storage, the state of that storage won't be rewound to the last checkpoint, so you'll perform duplicate updates.
- Specified by:
mapUsingServiceAsync in interface GeneralStage<T>
- Type Parameters:
S - type of service object
R - the future result type of the mapping function
- Parameters:
serviceFactory - the service factory
mapAsyncFn - a stateless mapping function. Can map to null (return a null future)
- Returns:
the newly attached stage
-
mapUsingServiceAsync
@Nonnull <S, R> BatchStage<R> mapUsingServiceAsync(@Nonnull ServiceFactory<?,S> serviceFactory, int maxConcurrentOps, boolean preserveOrder, @Nonnull BiFunctionEx<? super S,? super T,? extends CompletableFuture<R>> mapAsyncFn)
Description copied from interface: GeneralStage
Asynchronous version of GeneralStage.mapUsingService(com.hazelcast.jet.pipeline.ServiceFactory<?, S>, com.hazelcast.function.BiFunctionEx<? super S, ? super T, ? extends R>): the mapAsyncFn returns a CompletableFuture<R> instead of just R.
The function can return a null future or the future can return a null result: in both cases it will act just like a filter.
The latency of the async call will add to the total latency of the output.
This sample takes a stream of stock items and sets the detail field on them by looking up from a registry:
    stage.mapUsingServiceAsync(
        ServiceFactories.sharedService(ctx -> new ItemDetailRegistry(ctx.jetInstance())),
        (reg, item) -> reg.fetchDetailAsync(item)
                          .thenApply(detail -> item.setDetail(detail))
    )
Interaction with fault-tolerant unbounded jobs
If you use this stage in a fault-tolerant unbounded job, keep in mind that any state the service object maintains doesn't participate in Jet's fault tolerance protocol. If the state is local, it will be lost after a job restart; if it is saved to some durable storage, the state of that storage won't be rewound to the last checkpoint, so you'll perform duplicate updates.
- Specified by:
mapUsingServiceAsync in interface GeneralStage<T>
- Type Parameters:
S - type of service object
R - the future result type of the mapping function
- Parameters:
serviceFactory - the service factory
maxConcurrentOps - maximum number of concurrent async operations per processor
preserveOrder - whether the ordering of the input items should be preserved
mapAsyncFn - a stateless mapping function. Can map to null (return a null future)
- Returns:
the newly attached stage
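The two filtering cases (a null future and a future that completes with null) and order preservation can be illustrated with a plain-Java sketch built on java.util.concurrent.CompletableFuture. This is not Jet code and does not show how Jet implements the stage: AsyncMapSketch and its methods are hypothetical, and the sketch makes no attempt to enforce a maxConcurrentOps limit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Plain-Java simulation of order-preserving async mapping; not Jet API code.
final class AsyncMapSketch {

    // Starts one async call per item, then collects results in input order.
    // A null future or a null result produces no output item.
    static <T, R> List<R> mapAsyncOrdered(List<T> items,
                                          Function<T, CompletableFuture<R>> mapAsyncFn) {
        List<CompletableFuture<R>> futures = new ArrayList<>();
        for (T item : items) {
            futures.add(mapAsyncFn.apply(item));
        }
        List<R> output = new ArrayList<>();
        for (CompletableFuture<R> future : futures) {
            if (future == null) {
                continue;                 // null future: acts like a filter
            }
            R result = future.join();     // wait for this item's result
            if (result != null) {         // null result: acts like a filter
                output.add(result);
            }
        }
        return output;
    }

    // Maps words to their lengths; empty words and words starting with '#'
    // are dropped through the two null paths.
    static List<Integer> lengths(List<String> words) {
        return AsyncMapSketch.<String, Integer>mapAsyncOrdered(words, word -> {
            if (word.isEmpty()) {
                return null;
            }
            if (word.startsWith("#")) {
                return CompletableFuture.<Integer>completedFuture(null);
            }
            return CompletableFuture.supplyAsync(() -> word.length());
        });
    }

    public static void main(String[] args) {
        System.out.println(lengths(List.of("ab", "", "#x", "cde"))); // prints [2, 3]
    }
}
```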
-
filterUsingService
@Nonnull <S> BatchStage<T> filterUsingService(@Nonnull ServiceFactory<?,S> serviceFactory, @Nonnull BiPredicateEx<? super S,? super T> filterFn)
Description copied from interface: GeneralStage
Attaches a filtering stage which applies the provided predicate function to each input item to decide whether to pass the item to the output or to discard it. The predicate function receives another parameter, the service object, which Jet will create using the supplied serviceFactory.
This sample takes a stream of photos, uses an image classifier to reason about their contents, and keeps only photos of cats:
    photos.filterUsingService(
        ServiceFactories.sharedService(ctx -> new ImageClassifier(ctx.jetInstance())),
        (classifier, photo) -> classifier.classify(photo).equals("cat")
    )
Interaction with fault-tolerant unbounded jobs
If you use this stage in a fault-tolerant unbounded job, keep in mind that any state the service object maintains doesn't participate in Jet's fault tolerance protocol. If the state is local, it will be lost after a job restart; if it is saved to some durable storage, the state of that storage won't be rewound to the last checkpoint, so you'll perform duplicate updates.
- Specified by:
filterUsingService in interface GeneralStage<T>
- Type Parameters:
S - type of service object
- Parameters:
serviceFactory - the service factory
filterFn - a stateless filter predicate function
- Returns:
the newly attached stage
-
flatMapUsingService
@Nonnull <S, R> BatchStage<R> flatMapUsingService(@Nonnull ServiceFactory<?,S> serviceFactory, @Nonnull BiFunctionEx<? super S,? super T,? extends Traverser<R>> flatMapFn)
Description copied from interface: GeneralStage
Attaches a flat-mapping stage which applies the supplied function to each input item independently and emits all items from the Traverser it returns as the output items. The traverser must be null-terminated. The mapping function receives another parameter, the service object, which Jet will create using the supplied serviceFactory.
This sample takes a stream of products and outputs an "exploded" stream of all the parts that go into making them:
    StreamStage<Part> parts = products.flatMapUsingService(
        ServiceFactories.sharedService(ctx -> new PartRegistryCtx()),
        (registry, product) -> Traversers.traverseIterable(
            registry.fetchParts(product))
    );
Interaction with fault-tolerant unbounded jobs
If you use this stage in a fault-tolerant unbounded job, keep in mind that any state the service object maintains doesn't participate in Jet's fault tolerance protocol. If the state is local, it will be lost after a job restart; if it is saved to some durable storage, the state of that storage won't be rewound to the last checkpoint, so you'll perform duplicate updates.
- Specified by:
flatMapUsingService in interface GeneralStage<T>
- Type Parameters:
S - type of service object
R - the type of items in the result's traversers
- Parameters:
serviceFactory - the service factory
flatMapFn - a stateless flat-mapping function whose result type is Jet's Traverser. It must not return a null traverser, but can return an empty traverser.
- Returns:
the newly attached stage
-
mapUsingServiceAsyncBatched
@Nonnull <S, R> BatchStage<R> mapUsingServiceAsyncBatched(@Nonnull ServiceFactory<?,S> serviceFactory, int maxBatchSize, @Nonnull BiFunctionEx<? super S,? super List<T>,? extends CompletableFuture<List<R>>> mapAsyncFn)
Description copied from interface: GeneralStage
Batched version of GeneralStage.mapUsingServiceAsync(com.hazelcast.jet.pipeline.ServiceFactory<?, S>, com.hazelcast.function.BiFunctionEx<? super S, ? super T, ? extends java.util.concurrent.CompletableFuture<R>>): mapAsyncFn takes a list of input items and returns a CompletableFuture<List<R>>. The size of the input list is limited by the given maxBatchSize.
The number of in-flight batches being completed asynchronously is limited to 2 and this mapping operation always preserves the order of input elements.
This transform can perform filtering by putting null elements into the output list.
The latency of the async call will add to the total latency of the output.
This sample takes a stream of stock items and sets the detail field on them by performing batched lookups from a registry. The maximum number of items per lookup is set to 100:
    stage.mapUsingServiceAsyncBatched(
        ServiceFactories.sharedService(ctx -> new ItemDetailRegistry(ctx.jetInstance())),
        100,
        (reg, itemList) -> reg
            .fetchDetailsAsync(itemList)
            .thenApply(detailList -> {
                for (int i = 0; i < itemList.size(); i++) {
                    itemList.get(i).setDetail(detailList.get(i));
                }
                // the future must complete with the list of output items
                return itemList;
            })
    )
Interaction with fault-tolerant unbounded jobs
If you use this stage in a fault-tolerant unbounded job, keep in mind that any state the service object maintains doesn't participate in Jet's fault tolerance protocol. If the state is local, it will be lost after a job restart; if it is saved to some durable storage, the state of that storage won't be rewound to the last checkpoint, so you'll perform duplicate updates.
- Specified by:
mapUsingServiceAsyncBatched in interface GeneralStage<T>
- Type Parameters:
S - type of service object
R - the future result type of the mapping function
- Parameters:
serviceFactory - the service factory
maxBatchSize - max size of the input list
mapAsyncFn - a stateless mapping function
- Returns:
the newly attached stage
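How maxBatchSize bounds the input list, and how null elements in the output list act as a filter, can be seen in a plain-Java sketch. This is not Jet code: BatchedMapSketch is a hypothetical class, it greedily fills every batch from an in-memory list, and it waits for one batch at a time rather than keeping two batches in flight.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Plain-Java simulation of batched async mapping; not Jet API code.
final class BatchedMapSketch {

    // Splits the input into batches of at most maxBatchSize items, maps each
    // batch asynchronously, and drops null elements from the returned lists.
    static <T, R> List<R> mapBatched(List<T> items, int maxBatchSize,
                                     Function<List<T>, CompletableFuture<List<R>>> mapAsyncFn) {
        List<R> output = new ArrayList<>();
        for (int from = 0; from < items.size(); from += maxBatchSize) {
            int to = Math.min(from + maxBatchSize, items.size());
            List<T> batch = items.subList(from, to);
            for (R result : mapAsyncFn.apply(batch).join()) {
                if (result != null) {     // null element: acts like a filter
                    output.add(result);
                }
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<Integer> batchSizes = new ArrayList<>();
        List<Integer> output = BatchedMapSketch.<Integer, Integer>mapBatched(
                List.of(1, 2, 3, 4, 5), 2, batch -> {
                    batchSizes.add(batch.size());
                    List<Integer> results = new ArrayList<>();
                    for (Integer i : batch) {
                        results.add(i == 3 ? null : i * 10);  // drop item 3
                    }
                    return CompletableFuture.completedFuture(results);
                });
        System.out.println(output);      // prints [10, 20, 40, 50]
        System.out.println(batchSizes);  // prints [2, 2, 1]
    }
}
```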
-
mapUsingReplicatedMap
@Nonnull default <K, V, R> BatchStage<R> mapUsingReplicatedMap(@Nonnull String mapName, @Nonnull FunctionEx<? super T,? extends K> lookupKeyFn, @Nonnull BiFunctionEx<? super T,? super V,? extends R> mapFn)
Description copied from interface: GeneralStage
Attaches a mapping stage where for each item a lookup in the ReplicatedMap with the supplied name is performed and the result of the lookup is merged with the item and emitted.
If the result of the mapping is null, it emits nothing. Therefore this stage can be used to implement filtering semantics as well.
The mapping logic is equivalent to:
    K key = lookupKeyFn.apply(item);
    V value = replicatedMap.get(key);
    return mapFn.apply(item, value);
This sample takes a stream of stock items and sets the detail field on them by looking up from a registry:
    items.mapUsingReplicatedMap(
        "enriching-map",
        item -> item.getDetailId(),
        (Item item, ItemDetail detail) -> item.setDetail(detail)
    )
- Specified by:
mapUsingReplicatedMap in interface GeneralStage<T>
- Type Parameters:
K - type of the key in the ReplicatedMap
V - type of the value in the ReplicatedMap
R - type of the output item
- Parameters:
mapName - name of the ReplicatedMap
lookupKeyFn - a function which returns the key to look up in the map. Must not return null
mapFn - the mapping function
- Returns:
the newly attached stage
-
mapUsingReplicatedMap
@Nonnull default <K, V, R> BatchStage<R> mapUsingReplicatedMap(@Nonnull ReplicatedMap<K,V> replicatedMap, @Nonnull FunctionEx<? super T,? extends K> lookupKeyFn, @Nonnull BiFunctionEx<? super T,? super V,? extends R> mapFn)
Description copied from interface: GeneralStage
Attaches a mapping stage where for each item a lookup in the supplied ReplicatedMap is performed and the result of the lookup is merged with the item and emitted.
If the result of the mapping is null, it emits nothing. Therefore this stage can be used to implement filtering semantics as well.
The mapping logic is equivalent to:
    K key = lookupKeyFn.apply(item);
    V value = replicatedMap.get(key);
    return mapFn.apply(item, value);
This sample takes a stream of stock items and sets the detail field on them by looking up from a registry:
    items.mapUsingReplicatedMap(
        enrichingMap,
        item -> item.getDetailId(),
        (item, detail) -> item.setDetail(detail)
    )
- Specified by:
mapUsingReplicatedMap in interface GeneralStage<T>
- Type Parameters:
K - type of the key in the ReplicatedMap
V - type of the value in the ReplicatedMap
R - type of the output item
- Parameters:
replicatedMap - the ReplicatedMap to look up from
lookupKeyFn - a function which returns the key to look up in the map. Must not return null
mapFn - the mapping function
- Returns:
the newly attached stage
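The three-line mapping logic quoted above (key lookup, map read, merge) can be exercised with a plain java.util.Map standing in for the ReplicatedMap. This is a sketch under that assumption, not Jet code; MapLookupSketch, its methods, and the sample data are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Plain-Java simulation of lookup-based enrichment; not Jet API code.
final class MapLookupSketch {

    // For each item: extract the key, read the map, merge item and value.
    // A null merge result produces no output item.
    static <T, K, V, R> List<R> mapUsingMap(Map<K, V> map,
                                            Function<T, K> lookupKeyFn,
                                            BiFunction<T, V, R> mapFn,
                                            List<T> items) {
        List<R> output = new ArrayList<>();
        for (T item : items) {
            K key = lookupKeyFn.apply(item);   // must not be null
            V value = map.get(key);            // null when the key is absent
            R result = mapFn.apply(item, value);
            if (result != null) {
                output.add(result);
            }
        }
        return output;
    }

    // Enriches ids with their names and drops ids that have no entry.
    static List<String> enrich(Map<Integer, String> details, List<Integer> ids) {
        return MapLookupSketch.<Integer, Integer, String, String>mapUsingMap(
                details,
                id -> id,
                (id, name) -> name == null ? null : id + "=" + name,
                ids);
    }

    public static void main(String[] args) {
        Map<Integer, String> details = Map.of(1, "bolt", 2, "nut");
        System.out.println(enrich(details, List.of(1, 2, 3))); // prints [1=bolt, 2=nut]
    }
}
```

Note that mapFn still runs when the lookup misses; it receives a null value and decides whether to emit anything.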
-
mapUsingIMap
@Nonnull default <K, V, R> BatchStage<R> mapUsingIMap(@Nonnull String mapName, @Nonnull FunctionEx<? super T,? extends K> lookupKeyFn, @Nonnull BiFunctionEx<? super T,? super V,? extends R> mapFn)
Description copied from interface: GeneralStage
Attaches a mapping stage where for each item a lookup in the IMap with the supplied name is performed and the result of the lookup is merged with the item and emitted.
If the result of the mapping is null, it emits nothing. Therefore this stage can be used to implement filtering semantics as well.
The mapping logic is equivalent to:
    K key = lookupKeyFn.apply(item);
    V value = map.get(key);
    return mapFn.apply(item, value);
This sample takes a stream of stock items and sets the detail field on them by looking up from a registry:
    items.mapUsingIMap(
        "enriching-map",
        item -> item.getDetailId(),
        (Item item, ItemDetail detail) -> item.setDetail(detail)
    )
See also GeneralStageWithKey.mapUsingIMap(java.lang.String, com.hazelcast.function.BiFunctionEx<? super T, ? super V, ? extends R>) for a partitioned version of this operation.
- Specified by:
mapUsingIMap in interface GeneralStage<T>
- Type Parameters:
K - type of the key in the IMap
V - type of the value in the IMap
R - type of the output item
- Parameters:
mapName - name of the IMap
lookupKeyFn - a function which returns the key to look up in the map. Must not return null
mapFn - the mapping function
- Returns:
the newly attached stage
-
mapUsingIMap
@Nonnull default <K, V, R> BatchStage<R> mapUsingIMap(@Nonnull IMap<K,V> iMap, @Nonnull FunctionEx<? super T,? extends K> lookupKeyFn, @Nonnull BiFunctionEx<? super T,? super V,? extends R> mapFn)
Description copied from interface: GeneralStage
Attaches a mapping stage where for each item a lookup in the supplied IMap is performed and the result of the lookup is merged with the item and emitted.
If the result of the mapping is null, it emits nothing. Therefore this stage can be used to implement filtering semantics as well.
The mapping logic is equivalent to:
    K key = lookupKeyFn.apply(item);
    V value = map.get(key);
    return mapFn.apply(item, value);
This sample takes a stream of stock items and sets the detail field on them by looking up from a registry:
    items.mapUsingIMap(
        enrichingMap,
        item -> item.getDetailId(),
        (item, detail) -> item.setDetail(detail)
    )
See also GeneralStageWithKey.mapUsingIMap(java.lang.String, com.hazelcast.function.BiFunctionEx<? super T, ? super V, ? extends R>) for a partitioned version of this operation.
- Specified by:
mapUsingIMap in interface GeneralStage<T>
- Type Parameters:
K
- type of the key in the IMap
V
- type of the value in the IMap
R
- type of the output item
- Parameters:
iMap
- the IMap to look up from
lookupKeyFn
- a function which returns the key to look up in the map. Must not return null
mapFn
- the mapping function
- Returns:
- the newly attached stage
-
distinct
Attaches a stage that emits just the items that are distinct according to their definition of equality (equals and hashCode). There is no guarantee which one of equal items it will emit.
- Returns:
- the newly attached stage
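To make the role of equals and hashCode concrete, here is a small plain-Java sketch. It does not use Jet; DistinctSketch, its User class and the distinct helper are hypothetical names. This sketch happens to keep the first of several equal items, whereas the stage described above makes no such promise.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Plain-Java illustration only; all names here are hypothetical.
final class DistinctSketch {
    // Hypothetical item type: equality is defined by id alone, name is ignored.
    static final class User {
        final long id;
        final String name;
        User(long id, String name) { this.id = id; this.name = name; }
        @Override public boolean equals(Object o) {
            return o instanceof User && ((User) o).id == id;
        }
        @Override public int hashCode() { return Long.hashCode(id); }
    }

    static <T> List<T> distinct(List<T> items) {
        Set<T> seen = new HashSet<>();
        List<T> out = new ArrayList<>();
        for (T item : items) {
            if (seen.add(item)) {   // add returns false when an equal item was already seen
                out.add(item);
            }
        }
        return out;
    }

    static int demo() {
        List<User> users = List.of(
                new User(1, "ann"), new User(1, "ANN"), new User(2, "bob"));
        return distinct(users).size();
    }
}
```

The two users with id 1 are equal under this definition, so only one of them survives even though their names differ.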
-
merge
Attaches a stage that emits all the items from this stage as well as all the items from the supplied stage. The other stage's type parameter must be assignment-compatible with this stage's type parameter.
- Parameters:
other
- the other stage whose data to merge into this one
- Returns:
- the newly attached stage
-
hashJoin
@Nonnull <K, T1_IN, T1, R> BatchStage<R> hashJoin(@Nonnull BatchStage<T1_IN> stage1, @Nonnull JoinClause<K,? super T,? super T1_IN,? extends T1> joinClause1, @Nonnull BiFunctionEx<T,T1,R> mapToOutputFn)
Description copied from interface: GeneralStage
Attaches to both this and the supplied stage a hash-joining stage and returns it. This stage plays the role of the primary stage in the hash-join. Please refer to the package javadoc for a detailed description of the hash-join transform.
This sample joins a stream of users to a stream of countries and outputs a stream of users with the country field set:
    // Types of the input stages:
    BatchStage<User> users;
    BatchStage<Map.Entry<Long, Country>> idAndCountry;
    users.hashJoin(
        idAndCountry,
        JoinClause.joinMapEntries(User::getCountryId),
        (user, country) -> user.setCountry(country)
    )
- Specified by:
hashJoin in interface GeneralStage<T>
- Type Parameters:
K
- the type of the join key
T1_IN
- the type of stage1 items
T1
- the result type of projection on stage1 items
R
- the resulting output type
- Parameters:
stage1
- the stage to hash-join with this one
joinClause1
- specifies how to join the two streams
mapToOutputFn
- function to map the joined items to the output value
- Returns:
- the newly attached stage
-
innerHashJoin
@Nonnull <K, T1_IN, T1, R> BatchStage<R> innerHashJoin(@Nonnull BatchStage<T1_IN> stage1, @Nonnull JoinClause<K,? super T,? super T1_IN,? extends T1> joinClause1, @Nonnull BiFunctionEx<T,T1,R> mapToOutputFn)
Description copied from interface: GeneralStage
Attaches to both this and the supplied stage an inner hash-joining stage and returns it. This stage plays the role of the primary stage in the hash-join. Please refer to the package javadoc for a detailed description of the hash-join transform.
This sample joins a stream of users to a stream of countries and outputs a stream of users with the country field set:
    // Types of the input stages:
    BatchStage<User> users;
    BatchStage<Map.Entry<Long, Country>> idAndCountry;
    users.innerHashJoin(
        idAndCountry,
        JoinClause.joinMapEntries(User::getCountryId),
        (user, country) -> user.setCountry(country)
    )
This method is similar to the GeneralStage.hashJoin(com.hazelcast.jet.pipeline.BatchStage<T1_IN>, com.hazelcast.jet.pipeline.JoinClause<K, ? super T, ? super T1_IN, ? extends T1>, com.hazelcast.function.BiFunctionEx<T, T1, R>) method, but it guarantees that both input items will be non-null. Nulls will be filtered out before reaching mapToOutputFn.
- Specified by:
innerHashJoin in interface GeneralStage<T>
- Type Parameters:
K
- the type of the join key
T1_IN
- the type of stage1 items
T1
- the result type of projection on stage1 items
R
- the resulting output type
- Parameters:
stage1
- the stage to hash-join with this one
joinClause1
- specifies how to join the two streams
mapToOutputFn
- function to map the joined items to the output value
- Returns:
- the newly attached stage
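The difference between hashJoin and innerHashJoin can be sketched in plain Java. This is not Jet code: HashJoinSketch, join and demo are hypothetical names, and the sketch assumes (as the innerHashJoin description implies) that the non-inner variant passes null to mapToOutputFn when the primary item has no match. It also simplifies by keeping a single enriching item per key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Plain-Java illustration only; all names here are hypothetical.
final class HashJoinSketch {
    static <T, K, T1, R> List<R> join(
            List<T> primary,
            List<Map.Entry<K, T1>> enriching,
            Function<? super T, ? extends K> keyFn,
            BiFunction<? super T, ? super T1, ? extends R> mapToOutputFn,
            boolean inner) {
        // Build phase: load the enriching side into a hash table.
        Map<K, T1> table = new HashMap<>();
        for (Map.Entry<K, T1> e : enriching) {
            table.put(e.getKey(), e.getValue());
        }
        // Probe phase: look up each primary item by its join key.
        List<R> out = new ArrayList<>();
        for (T item : primary) {
            T1 match = table.get(keyFn.apply(item));
            if (inner && match == null) {
                continue;   // inner join: unmatched items never reach mapToOutputFn
            }
            out.add(mapToOutputFn.apply(item, match));
        }
        return out;
    }

    static List<String> demo(boolean inner) {
        List<Map.Entry<Long, String>> idAndCountry =
                List.of(Map.entry(1L, "FR"), Map.entry(2L, "JP"));
        List<Long> userCountryIds = List.of(1L, 9L, 2L);
        return join(userCountryIds, idAndCountry, id -> id,
                (id, country) -> id + "=" + country, inner);
    }
}
```

With inner set to false the unmatched id 9 is still emitted, paired with null; with inner set to true it is dropped.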
-
hashJoin2
@Nonnull <K1, K2, T1_IN, T2_IN, T1, T2, R> BatchStage<R> hashJoin2(@Nonnull BatchStage<T1_IN> stage1, @Nonnull JoinClause<K1,? super T,? super T1_IN,? extends T1> joinClause1, @Nonnull BatchStage<T2_IN> stage2, @Nonnull JoinClause<K2,? super T,? super T2_IN,? extends T2> joinClause2, @Nonnull TriFunction<T,T1,T2,R> mapToOutputFn)
Description copied from interface: GeneralStage
Attaches to this and the two supplied stages a hash-joining stage and returns it. This stage plays the role of the primary stage in the hash-join. Please refer to the package javadoc for a detailed description of the hash-join transform.
This sample joins a stream of users to streams of countries and companies, and outputs a stream of users with the country and company fields set:
    // Types of the input stages:
    BatchStage<User> users;
    BatchStage<Map.Entry<Long, Country>> idAndCountry;
    BatchStage<Map.Entry<Long, Company>> idAndCompany;
    users.hashJoin2(
        idAndCountry, JoinClause.joinMapEntries(User::getCountryId),
        idAndCompany, JoinClause.joinMapEntries(User::getCompanyId),
        (user, country, company) -> user.setCountry(country).setCompany(company)
    )
- Specified by:
hashJoin2 in interface GeneralStage<T>
- Type Parameters:
K1
- the type of key for stage1
K2
- the type of key for stage2
T1_IN
- the type of stage1 items
T2_IN
- the type of stage2 items
T1
- the result type of projection of stage1 items
T2
- the result type of projection of stage2 items
R
- the resulting output type
- Parameters:
stage1
- the first stage to join
joinClause1
- specifies how to join with stage1
stage2
- the second stage to join
joinClause2
- specifies how to join with stage2
mapToOutputFn
- function to map the joined items to the output value
- Returns:
- the newly attached stage
-
innerHashJoin2
@Nonnull <K1, K2, T1_IN, T2_IN, T1, T2, R> BatchStage<R> innerHashJoin2(@Nonnull BatchStage<T1_IN> stage1, @Nonnull JoinClause<K1,? super T,? super T1_IN,? extends T1> joinClause1, @Nonnull BatchStage<T2_IN> stage2, @Nonnull JoinClause<K2,? super T,? super T2_IN,? extends T2> joinClause2, @Nonnull TriFunction<T,T1,T2,R> mapToOutputFn)
Description copied from interface: GeneralStage
Attaches to this and the two supplied stages an inner hash-joining stage and returns it. This stage plays the role of the primary stage in the hash-join. Please refer to the package javadoc for a detailed description of the hash-join transform.
This sample joins a stream of users to streams of countries and companies, and outputs a stream of users with the country and company fields set:
    // Types of the input stages:
    BatchStage<User> users;
    BatchStage<Map.Entry<Long, Country>> idAndCountry;
    BatchStage<Map.Entry<Long, Company>> idAndCompany;
    users.innerHashJoin2(
        idAndCountry, JoinClause.joinMapEntries(User::getCountryId),
        idAndCompany, JoinClause.joinMapEntries(User::getCompanyId),
        (user, country, company) -> user.setCountry(country).setCompany(company)
    )
This method is similar to the GeneralStage.hashJoin2(com.hazelcast.jet.pipeline.BatchStage<T1_IN>, com.hazelcast.jet.pipeline.JoinClause<K1, ? super T, ? super T1_IN, ? extends T1>, com.hazelcast.jet.pipeline.BatchStage<T2_IN>, com.hazelcast.jet.pipeline.JoinClause<K2, ? super T, ? super T2_IN, ? extends T2>, com.hazelcast.jet.function.TriFunction<T, T1, T2, R>) method, but it guarantees that all input items will be non-null. Nulls will be filtered out before reaching mapToOutputFn.
- Specified by:
innerHashJoin2 in interface GeneralStage<T>
- Type Parameters:
K1
- the type of key for stage1
K2
- the type of key for stage2
T1_IN
- the type of stage1 items
T2_IN
- the type of stage2 items
T1
- the result type of projection of stage1 items
T2
- the result type of projection of stage2 items
R
- the resulting output type
- Parameters:
stage1
- the first stage to join
joinClause1
- specifies how to join with stage1
stage2
- the second stage to join
joinClause2
- specifies how to join with stage2
mapToOutputFn
- function to map the joined items to the output value
- Returns:
- the newly attached stage
-
hashJoinBuilder
Description copied from interface: GeneralStage
Returns a fluent API builder object to construct a hash join operation with any number of contributing stages. It is mainly intended for hash-joins with three or more enriching stages. For one or two stages prefer the direct stage.hashJoinN(...) calls because they offer more static type safety.
This sample joins a stream of users to streams of countries and companies, and outputs a stream of users with the country and company fields set:
    // Types of the input stages:
    StreamStage<User> users;
    BatchStage<Map.Entry<Long, Country>> idAndCountry;
    BatchStage<Map.Entry<Long, Company>> idAndCompany;
    StreamHashJoinBuilder<User> builder = users.hashJoinBuilder();
    Tag<Country> tCountry = builder.add(idAndCountry, JoinClause.joinMapEntries(User::getCountryId));
    Tag<Company> tCompany = builder.add(idAndCompany, JoinClause.joinMapEntries(User::getCompanyId));
    StreamStage<User> joined = builder.build((user, itemsByTag) ->
        user.setCountry(itemsByTag.get(tCountry)).setCompany(itemsByTag.get(tCompany)));
- Specified by:
hashJoinBuilder in interface GeneralStage<T>
- Returns:
- the newly attached stage
-
aggregate
@Nonnull <R> BatchStage<R> aggregate(@Nonnull AggregateOperation1<? super T,?,? extends R> aggrOp)
Attaches a stage that performs the given aggregate operation over all the items it receives. The aggregating stage emits a single item.
Sample usage:
    stage.aggregate(AggregateOperations.counting())
- Type Parameters:
R
- the type of the result
- Parameters:
aggrOp
- the aggregate operation to perform
- See Also:
AggregateOperations
-
aggregate2
@Nonnull <T1, R> BatchStage<R> aggregate2(@Nonnull BatchStage<T1> stage1, @Nonnull AggregateOperation2<? super T,? super T1,?,? extends R> aggrOp)
Attaches a stage that performs the given aggregate operation over all the items it receives from both this stage and stage1 you supply. This variant requires you to provide a two-input aggregate operation (refer to its Javadoc for a simple example). If you can express your logic in terms of two single-input aggregate operations, one for each input stream, then you should use stage0.aggregate2(aggrOp0, stage1, aggrOp1) because it offers a simpler API and you can use the already defined single-input operations. Use this variant only when you need to implement an aggregate operation that combines the input streams into the same accumulator.
The returned stage emits a single item.
Sample usage:
    BatchStage<Tuple2<Long, Long>> counts = pageVisits.aggregate2(addToCarts,
        AggregateOperations.aggregateOperation2(
            AggregateOperations.counting(),
            AggregateOperations.counting())
    );
- Type Parameters:
T1
- type of items in stage1
R
- type of the result
- Parameters:
aggrOp
- the aggregate operation to perform
- See Also:
AggregateOperations
-
aggregate2
@Nonnull default <T1, R0, R1> BatchStage<Tuple2<R0,R1>> aggregate2(@Nonnull AggregateOperation1<? super T,?,? extends R0> aggrOp0, @Nonnull BatchStage<T1> stage1, @Nonnull AggregateOperation1<? super T1,?,? extends R1> aggrOp1)
Attaches a stage that co-aggregates the data from this and the supplied stage by performing a separate aggregate operation on each and emitting a Tuple2 with their results.
The returned stage emits a single item.
Sample usage:
    BatchStage<Tuple2<Long, Long>> counts = pageVisits.aggregate2(
        AggregateOperations.counting(),
        addToCarts, AggregateOperations.counting()
    );
- Type Parameters:
T1
- type of the items in the other stage
R0
- type of the aggregated result for this stage
R1
- type of the aggregated result for the other stage
- Parameters:
aggrOp0
- aggregate operation to perform on this stage
stage1
- the other stage
aggrOp1
- aggregate operation to perform on the other stage
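The idea of co-aggregation with one independent operation per input can be illustrated with the JDK alone. The sketch below is not Jet code: it stands in java.util.stream.Collector for AggregateOperation1 and Map.Entry for Tuple2, and CoAggregateSketch, aggregate2 and demo are hypothetical names.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collector;
import java.util.stream.Collectors;

// Plain-Java illustration only; Collector stands in for AggregateOperation1.
final class CoAggregateSketch {
    static <T0, T1, R0, R1> Map.Entry<R0, R1> aggregate2(
            List<T0> stage0, Collector<? super T0, ?, R0> aggrOp0,
            List<T1> stage1, Collector<? super T1, ?, R1> aggrOp1) {
        // Each input is aggregated on its own; the two results never interact.
        R0 r0 = stage0.stream().collect(aggrOp0);
        R1 r1 = stage1.stream().collect(aggrOp1);
        // A single pair is produced, mirroring the single emitted item.
        return Map.entry(r0, r1);
    }

    static Map.Entry<Long, Long> demo() {
        List<String> pageVisits = List.of("a", "b", "c");
        List<String> addToCarts = List.of("a");
        return aggregate2(pageVisits, Collectors.counting(),
                addToCarts, Collectors.counting());
    }
}
```

Because the two operations are independent, existing single-input operations can be reused as they are, which is the convenience this variant offers over the two-input form.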
-
aggregate3
@Nonnull <T1, T2, R> BatchStage<R> aggregate3(@Nonnull BatchStage<T1> stage1, @Nonnull BatchStage<T2> stage2, @Nonnull AggregateOperation3<? super T,? super T1,? super T2,?,? extends R> aggrOp)
Attaches a stage that performs the given aggregate operation over all the items it receives from this stage as well as stage1 and stage2 you supply. This variant requires you to provide a three-input aggregate operation (refer to its Javadoc for a simple example). If you can express your logic in terms of three single-input aggregate operations, one for each input stream, then you should use stage0.aggregate3(aggrOp0, stage1, aggrOp1, stage2, aggrOp2) because it offers a simpler API and you can use the already defined single-input operations. Use this variant only when you need to implement an aggregate operation that combines the input streams into the same accumulator.
The returned stage emits a single item.
Sample usage:
    BatchStage<Tuple3<Long, Long, Long>> counts = pageVisits.aggregate3(
        addToCarts,
        payments,
        AggregateOperations.aggregateOperation3(
            AggregateOperations.counting(),
            AggregateOperations.counting(),
            AggregateOperations.counting()));
- Type Parameters:
T1
- type of items in stage1
T2
- type of items in stage2
R
- type of the result
- Parameters:
aggrOp
- the aggregate operation to perform
- See Also:
AggregateOperations
-
aggregate3
@Nonnull default <T1, T2, R0, R1, R2> BatchStage<Tuple3<R0,R1,R2>> aggregate3(@Nonnull AggregateOperation1<? super T,?,? extends R0> aggrOp0, @Nonnull BatchStage<T1> stage1, @Nonnull AggregateOperation1<? super T1,?,? extends R1> aggrOp1, @Nonnull BatchStage<T2> stage2, @Nonnull AggregateOperation1<? super T2,?,? extends R2> aggrOp2)
Attaches a stage that co-aggregates the data from this and the two supplied stages by performing a separate aggregate operation on each and emitting a Tuple3 with their results.
The returned stage emits a single item.
Sample usage:
    BatchStage<Tuple3<Long, Long, Long>> counts = pageVisits.aggregate3(
        AggregateOperations.counting(),
        addToCarts, AggregateOperations.counting(),
        payments, AggregateOperations.counting()
    );
- Type Parameters:
T1
- type of the items in stage1
T2
- type of the items in stage2
R0
- type of the aggregated result for this stage
R1
- type of the aggregated result for stage1
R2
- type of the aggregated result for stage2
- Parameters:
aggrOp0
- aggregate operation to perform on this stage
stage1
- the first additional stage
aggrOp1
- aggregate operation to perform on stage1
stage2
- the second additional stage
aggrOp2
- aggregate operation to perform on stage2
-
aggregateBuilder
@Nonnull default <R0> AggregateBuilder<R0> aggregateBuilder(AggregateOperation1<? super T,?,? extends R0> aggrOp0)
Offers a step-by-step API to build a pipeline stage that co-aggregates the data from several input stages. The current stage will be already registered with the builder you get. You supply an aggregate operation for each input stage and in the output you get the individual aggregation results in a Map.Entry(key, itemsByTag). Use the tag you get from builder.add(stageN, aggrOpN) to retrieve the aggregated result for that stage. Use builder.tag0() as the tag of this stage. You will also be able to supply a function to the builder that immediately transforms the ItemsByTag to the desired output type.
This example counts the items in stage-0, sums those in stage-1 and takes the average of those in stage-2:
    BatchStage<Long> stage0 = p.readFrom(source0);
    BatchStage<Long> stage1 = p.readFrom(source1);
    BatchStage<Long> stage2 = p.readFrom(source2);
    AggregateBuilder<Long> b = stage0.aggregateBuilder(
        AggregateOperations.counting());
    Tag<Long> tag0 = b.tag0();
    Tag<Long> tag1 = b.add(stage1,
        AggregateOperations.summingLong(Number::longValue));
    Tag<Double> tag2 = b.add(stage2,
        AggregateOperations.averagingLong(Number::longValue));
    BatchStage<ItemsByTag> aggregated = b.build();
    aggregated.map(ibt -> String.format(
        "Count of stage0: %d, sum of stage1: %d, average of stage2: %f",
        ibt.get(tag0), ibt.get(tag1), ibt.get(tag2))
    );
-
aggregateBuilder
Offers a step-by-step API to build a pipeline stage that co-aggregates the data from several input stages. The current stage will be already registered with the builder you get.
This builder requires you to provide a multi-input aggregate operation. If you can express your logic in terms of single-input aggregate operations, one for each input stream, then you should use stage0.aggregateBuilder(aggrOp0) because it offers a simpler API. Use this builder only when you have the need to implement an aggregate operation that combines all the input streams into the same accumulator.
This builder is mainly intended to build a co-aggregation of four or more contributing stages. For up to three stages, prefer the direct stage.aggregateN(...) calls because they offer more static type safety.
To add the other stages, call add(stage). Collect all the tags returned from add() and use them when building the aggregate operation. Retrieve the tag of the first stage (from which you obtained this builder) by calling AggregateBuilder1.tag0().
This example takes three streams of strings and counts the distinct strings across all of them:
    Pipeline p = Pipeline.create();
    BatchStage<String> stage0 = p.readFrom(source0);
    BatchStage<String> stage1 = p.readFrom(source1);
    BatchStage<String> stage2 = p.readFrom(source2);
    AggregateBuilder1<String> b = stage0.aggregateBuilder();
    Tag<String> tag0 = b.tag0();
    Tag<String> tag1 = b.add(stage1);
    Tag<String> tag2 = b.add(stage2);
    BatchStage<Integer> aggregated = b.build(AggregateOperation
        .withCreate(HashSet<String>::new)
        .andAccumulate(tag0, (acc, item) -> acc.add(item))
        .andAccumulate(tag1, (acc, item) -> acc.add(item))
        .andAccumulate(tag2, (acc, item) -> acc.add(item))
        .andCombine(HashSet::addAll)
        .andFinish(HashSet::size));
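The create, accumulate, combine and finish steps of such a shared-accumulator operation can be walked through in plain Java. This sketch does not use Jet, and SharedAccumulatorSketch and its methods are hypothetical names. It assumes the combine step exists to merge partial accumulators that were built independently, which is how the sketch uses it.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Plain-Java illustration only; all names here are hypothetical.
final class SharedAccumulatorSketch {
    // Feeds every input stream into one shared accumulator.
    static Set<String> accumulateAll(List<List<String>> inputs) {
        Set<String> acc = new HashSet<>();   // create: one empty accumulator
        for (List<String> input : inputs) {
            for (String item : input) {
                acc.add(item);               // accumulate: same logic for every input
            }
        }
        return acc;
    }

    static int distinctCount(List<List<String>> part1, List<List<String>> part2) {
        Set<String> left = accumulateAll(part1);
        Set<String> right = accumulateAll(part2);
        left.addAll(right);                  // combine: merge partial accumulators
        return left.size();                  // finish: turn the accumulator into the result
    }

    static int demo() {
        // Two partial runs, each seeing a slice of the three input streams.
        List<List<String>> part1 = List.of(List.of("a", "b"), List.of("b"), List.of("c"));
        List<List<String>> part2 = List.of(List.of("a"), List.of("d"), List.of("c"));
        return distinctCount(part1, part2);
    }
}
```

Counting distinct strings across inputs cannot be split into one independent operation per input, because a duplicate may arrive on a different input; that is the case the multi-input form is meant for.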
-
peek
Description copied from interface: GeneralStage
Adds a peeking layer to this compute stage which logs its output. For each item the stage emits, it logs the result of its toString() method at the INFO level to the log category com.hazelcast.jet.impl.processor.PeekWrappedP.<vertexName>#<processorIndex>. The stage logs each item on whichever cluster member it happens to receive it. Its primary purpose is for development use, when running Jet on a local machine.
- Specified by:
peek in interface GeneralStage<T>
- Returns:
- the newly attached stage
- See Also:
GeneralStage.peek(PredicateEx, FunctionEx)
,GeneralStage.peek(FunctionEx)
-
peek
@Nonnull BatchStage<T> peek(@Nonnull PredicateEx<? super T> shouldLogFn, @Nonnull FunctionEx<? super T,? extends CharSequence> toStringFn)
Description copied from interface: GeneralStage
Attaches a peeking stage which logs this stage's output and passes it through without transformation. For each item the stage emits, it:
- uses the shouldLogFn predicate to see whether to log the item
- if yes, uses toStringFn to get the item's string representation
- logs the string at the INFO level to the log category com.hazelcast.jet.impl.processor.PeekWrappedP.<vertexName>#<processorIndex>
Sample usage:
    users.peek(
        user -> user.getName().length() > 100,
        User::getName
    )
- Specified by:
peek in interface GeneralStage<T>
- Parameters:
shouldLogFn
- a function to filter the logged items. You can use alwaysTrue() as a pass-through filter when you don't need any filtering.
toStringFn
- a function that returns a string representation of the item
- Returns:
- the newly attached stage
- See Also:
GeneralStage.peek(FunctionEx)
,GeneralStage.peek()
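The pass-through behaviour of the two-argument peek can be sketched in plain Java. This is not Jet code: PeekSketch and its peek helper are hypothetical names, and a List of strings stands in for the INFO-level log.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Plain-Java illustration only; the log list stands in for the real logger.
final class PeekSketch {
    static <T> List<T> peek(
            List<T> items,
            Predicate<? super T> shouldLogFn,
            Function<? super T, ? extends CharSequence> toStringFn,
            List<String> log) {
        List<T> out = new ArrayList<>();
        for (T item : items) {
            if (shouldLogFn.test(item)) {
                // Only items accepted by the predicate are converted and logged.
                log.add(toStringFn.apply(item).toString());
            }
            out.add(item);   // every item passes through unchanged, logged or not
        }
        return out;
    }
}
```

The predicate only decides what gets logged; it never filters the output, so the returned list always equals the input list.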
-
peek
@Nonnull default BatchStage<T> peek(@Nonnull FunctionEx<? super T,? extends CharSequence> toStringFn)
Description copied from interface: GeneralStage
Adds a peeking layer to this compute stage which logs its output. For each item the stage emits, it:
- uses toStringFn to get a string representation of the item
- logs the string at the INFO level to the log category com.hazelcast.jet.impl.processor.PeekWrappedP.<vertexName>#<processorIndex>
Sample usage:
    users.peek(User::getName)
- Specified by:
peek in interface GeneralStage<T>
- Parameters:
toStringFn
- a function that returns a string representation of the item
- Returns:
- the newly attached stage
- See Also:
GeneralStage.peek(PredicateEx, FunctionEx)
,GeneralStage.peek()
-
customTransform
@Nonnull default <R> BatchStage<R> customTransform(@Nonnull String stageName, @Nonnull SupplierEx<Processor> procSupplier)
Description copied from interface: GeneralStage
Attaches a stage with a custom transform based on the provided supplier of Core API Processors.
Note that the type parameter of the returned stage is inferred from the call site and not propagated from the processor that will produce the result, so there is no actual type safety provided.
- Specified by:
customTransform in interface GeneralStage<T>
- Type Parameters:
R
- the type of the output items
- Parameters:
stageName
- a human-readable name for the custom stage
procSupplier
- the supplier of processors
- Returns:
- the newly attached stage
-
customTransform
@Nonnull default <R> BatchStage<R> customTransform(@Nonnull String stageName, @Nonnull ProcessorSupplier procSupplier)
Description copied from interface: GeneralStage
Attaches a stage with a custom transform based on the provided supplier of Core API Processors.
Note that the type parameter of the returned stage is inferred from the call site and not propagated from the processor that will produce the result, so there is no actual type safety provided.
- Specified by:
customTransform in interface GeneralStage<T>
- Type Parameters:
R
- the type of the output items
- Parameters:
stageName
- a human-readable name for the custom stage
procSupplier
- the supplier of processors
- Returns:
- the newly attached stage
-
customTransform
@Nonnull <R> BatchStage<R> customTransform(@Nonnull String stageName, @Nonnull ProcessorMetaSupplier procSupplier)
Description copied from interface: GeneralStage
Attaches a stage with a custom transform based on the provided supplier of Core API Processors.
Note that the type parameter of the returned stage is inferred from the call site and not propagated from the processor that will produce the result, so there is no actual type safety provided.
- Specified by:
customTransform in interface GeneralStage<T>
- Type Parameters:
R
- the type of the output items
- Parameters:
stageName
- a human-readable name for the custom stage
procSupplier
- the supplier of processors
- Returns:
- the newly attached stage
-
apply
@Nonnull default <R> BatchStage<R> apply(@Nonnull FunctionEx<? super BatchStage<T>,? extends BatchStage<R>> transformFn)
Transforms this stage using the provided transformFn and returns the transformed stage. It allows you to extract common pipeline transformations into a method and then call that method without interrupting the chained pipeline expression.
For example, say you have this code:
    BatchStage<String> input = pipeline.readFrom(textSource);
    BatchStage<String> cleanedUp = input
        .map(String::toLowerCase)
        .filter(s -> s.startsWith("success"));
You can capture the map and filter steps into a common "cleanup" transformation:
    BatchStage<String> cleanUp(BatchStage<String> input) {
        return input.map(String::toLowerCase)
                    .filter(s -> s.startsWith("success"));
    }
Now you can insert this transformation as just another step in your pipeline:
    BatchStage<String> tokens = pipeline
        .readFrom(textSource)
        .apply(this::cleanUp)
        .flatMap(line -> traverseArray(line.split("\\W+")));
- Type Parameters:
R
- type of the returned stage
- Parameters:
transformFn
- function to transform this stage into another stage
- Since:
- 3.1
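The same extract-and-reuse pattern can be tried with java.util.stream.Stream standing in for BatchStage. The sketch below is not Jet code; ApplySketch and its static apply helper are hypothetical, and the helper does nothing more than call the supplied function, which is the behaviour the description above suggests for apply.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Plain-Java illustration only; Stream stands in for BatchStage.
final class ApplySketch {
    // Reusable transformation, analogous to cleanUp in the text above.
    static Stream<String> cleanUp(Stream<String> input) {
        return input.map(String::toLowerCase)
                    .filter(s -> s.startsWith("success"));
    }

    // Hypothetical stand-in for stage.apply(transformFn): it just calls the function.
    static <T, R> Stream<R> apply(
            Stream<T> stage,
            Function<? super Stream<T>, ? extends Stream<R>> transformFn) {
        return transformFn.apply(stage);
    }

    static List<String> demo() {
        Stream<String> lines = Stream.of("SUCCESS: a", "failure: b", "Success: c");
        return apply(lines, ApplySketch::cleanUp).collect(Collectors.toList());
    }
}
```

Because Stream has no such method the helper is static here; on a stage the call is chained, so the reusable step reads like any other step in the pipeline expression.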
-
setLocalParallelism
Description copied from interface: Stage
Sets the preferred local parallelism (number of processors per Jet cluster member) this stage will configure its DAG vertices with. Jet always uses the same number of processors on each member, so the total parallelism automatically increases if another member joins the cluster.
While most stages are backed by 1 vertex, there are exceptions. If a stage uses two vertices, each of them will have the given local parallelism, so in total there will be twice as many processors per member.
The default value is -1 and it signals to Jet to figure out a default value. Jet will determine the vertex's local parallelism during job initialization from the global default and the processor meta-supplier's preferred value.
- Specified by:
setLocalParallelism in interface GeneralStage<T>
- Specified by:
setLocalParallelism in interface Stage
- Returns:
- this stage
-
setName
Description copied from interface: Stage
Overrides the default name of the stage with the name you choose and returns the stage. This can be useful for debugging purposes, to better distinguish pipeline stages in the diagnostic output.
- Specified by:
setName in interface GeneralStage<T>
- Specified by:
setName in interface Stage
- Parameters:
name
- the stage name
- Returns:
- this stage
-