T - type of the stream item
K - type of the grouping key

public interface GeneralStageWithKey<T,K>
Modifier and Type | Method and Description |
---|---|
<R> GeneralStage<R> | customTransform(String stageName, ProcessorMetaSupplier procSupplier): Attaches a stage with a custom transform based on the provided supplier of Core API Processors. |
<R> GeneralStage<R> | customTransform(String stageName, ProcessorSupplier procSupplier): Attaches a stage with a custom transform based on the provided supplier of Core API Processors. |
<R> GeneralStage<R> | customTransform(String stageName, SupplierEx<Processor> procSupplier): Attaches a stage with a custom transform based on the provided supplier of Core API Processors. |
<S> GeneralStage<T> | filterStateful(SupplierEx<? extends S> createFn, BiPredicateEx<? super S,? super T> filterFn): Attaches a stage that performs a stateful filtering operation. |
<C> GeneralStage<T> | filterUsingContext(ContextFactory<C> contextFactory, TriPredicate<? super C,? super K,? super T> filterFn): Attaches a filtering stage which applies the provided predicate function to each input item to decide whether to pass the item to the output or to discard it. |
<C> GeneralStage<T> | filterUsingContextAsync(ContextFactory<C> contextFactory, TriFunction<? super C,? super K,? super T,CompletableFuture<Boolean>> filterAsyncFn): Asynchronous version of filterUsingContext(ContextFactory, TriPredicate): the filterAsyncFn returns a CompletableFuture<Boolean> instead of just a boolean. |
<S,R> GeneralStage<R> | flatMapStateful(SupplierEx<? extends S> createFn, TriFunction<? super S,? super K,? super T,? extends Traverser<R>> flatMapFn): Attaches a stage that performs a stateful flat-mapping operation. |
<C,R> GeneralStage<R> | flatMapUsingContext(ContextFactory<C> contextFactory, TriFunction<? super C,? super K,? super T,? extends Traverser<? extends R>> flatMapFn): Attaches a flat-mapping stage which applies the supplied function to each input item independently and emits all the items from the Traverser it returns as the output items. |
<C,R> GeneralStage<R> | flatMapUsingContextAsync(ContextFactory<C> contextFactory, TriFunction<? super C,? super K,? super T,CompletableFuture<Traverser<R>>> flatMapAsyncFn): Asynchronous version of flatMapUsingContext(ContextFactory, TriFunction): the flatMapAsyncFn returns a CompletableFuture<Traverser<R>> instead of just Traverser<R>. |
FunctionEx<? super T,? extends K> | keyFn(): Returns the function that extracts the key from stream items. |
<S,R> GeneralStage<R> | mapStateful(SupplierEx<? extends S> createFn, TriFunction<? super S,? super K,? super T,? extends R> mapFn): Attaches a stage that performs a stateful mapping operation. |
<C,R> GeneralStage<R> | mapUsingContext(ContextFactory<C> contextFactory, TriFunction<? super C,? super K,? super T,? extends R> mapFn): Attaches a mapping stage which applies the given function to each input item independently and emits the function's result as the output item. |
<C,R> GeneralStage<R> | mapUsingContextAsync(ContextFactory<C> contextFactory, TriFunction<? super C,? super K,? super T,CompletableFuture<R>> mapAsyncFn): Asynchronous version of mapUsingContext(ContextFactory, TriFunction): the mapAsyncFn returns a CompletableFuture<R> instead of just R. |
default <V,R> GeneralStage<R> | mapUsingIMap(IMap<K,V> iMap, BiFunctionEx<? super T,? super V,? extends R> mapFn): Attaches a mapping stage where, for each item, a lookup in the supplied IMap is performed using the grouping key, and the result of the lookup is merged with the item and emitted. |
default <V,R> GeneralStage<R> | mapUsingIMap(String mapName, BiFunctionEx<? super T,? super V,? extends R> mapFn): Attaches a mapping stage where, for each item, a lookup in the IMap with the supplied name is performed using the grouping key, and the result of the lookup is merged with the item and emitted. |
default <A,R> GeneralStage<Map.Entry<K,R>> | rollingAggregate(AggregateOperation1<? super T,A,? extends R> aggrOp): Attaches a rolling aggregation stage. |

@Nonnull FunctionEx<? super T,? extends K> keyFn()
Returns the function that extracts the key from stream items.

@Nonnull <S,R> GeneralStage<R> mapStateful(@Nonnull SupplierEx<? extends S> createFn, @Nonnull TriFunction<? super S,? super K,? super T,? extends R> mapFn)
Attaches a stage that performs a stateful mapping operation. createFn returns the object that holds the state. Jet passes this object along with each input item and its key to mapFn, which can update the object's state. For each grouping key there's a separate state object. The state object will be included in the state snapshot, so it survives job restarts. For this reason it must be serializable.
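The per-key state contract described above can be modeled without Jet. The following is a minimal single-threaded sketch in plain Java. It assumes one state object per distinct key, created lazily the first time that key appears. StatefulMapSim, its TriFunction and cumulativeLatencies are hypothetical names, not Jet API. The sketch models neither distribution nor snapshotting.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
import java.util.function.Supplier;

// Single-threaded model of keyed stateful mapping (hypothetical, not Jet code).
class StatefulMapSim {

    @FunctionalInterface
    interface TriFunction<A, B, C, R> {
        R apply(A a, B b, C c);
    }

    static <T, K, S, R> List<R> mapStateful(
            List<T> input,
            Function<? super T, ? extends K> keyFn,
            Supplier<? extends S> createFn,
            TriFunction<? super S, ? super K, ? super T, ? extends R> mapFn) {
        Map<K, S> stateByKey = new HashMap<>();
        List<R> output = new ArrayList<>();
        for (T item : input) {
            K key = keyFn.apply(item);
            // The first item of a key creates that key's own state object
            S state = stateByKey.computeIfAbsent(key, k -> createFn.get());
            output.add(mapFn.apply(state, key, item));
        }
        return output;
    }

    // Mirrors the latency sample: a running latency total per server
    static List<Map.Entry<String, Long>> cumulativeLatencies(List<Map.Entry<String, Long>> latencies) {
        return mapStateful(
                latencies,
                Map.Entry::getKey,
                LongAdder::new,
                (sum, key, entry) -> {
                    sum.add(entry.getValue());
                    return new SimpleImmutableEntry<>(key, sum.sum());
                });
    }
}
```

Feeding it the pairs ("a", 10), ("b", 5), ("a", 7) yields ("a", 10), ("b", 5), ("a", 17). This happens because "a" and "b" accumulate into separate LongAdder instances.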
This sample takes a stream of pairs (serverId, latency)
representing the latencies of serving individual requests and outputs
the cumulative latency of all handled requests so far, for each
server separately:
GeneralStage<Entry<String, Long>> latencies;
GeneralStage<Entry<String, Long>> cumulativeLatencies = latencies
.groupingKey(Entry::getKey)
.mapStateful(
LongAccumulator::new,
(sum, key, entry) -> {
sum.add(entry.getValue());
return entry(key, sum.get());
}
);
This code has the same result as latencies.groupingKey(Entry::getKey).rollingAggregate(summingLong(Entry::getValue)).

S - type of the state object
R - type of the result
createFn - function that returns the state object
mapFn - function that receives the state object, the grouping key and the input item, and outputs the result item. It may modify the state object.

@Nonnull <S> GeneralStage<T> filterStateful(@Nonnull SupplierEx<? extends S> createFn, @Nonnull BiPredicateEx<? super S,? super T> filterFn)
Attaches a stage that performs a stateful filtering operation. createFn returns the object that holds the state. Jet passes this object along with each input item to filterFn, which can update the object's state. For each grouping key there's a separate state object. The state object will be included in the state snapshot, so it survives job restarts. For this reason it must be serializable.
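Here is a plain-Java sketch of the same contract. It assumes a single thread and a HashMap of per-key state. StatefulFilterSim and decimate are hypothetical names, not Jet API. The predicate sees only the state object and the item, which matches the BiPredicateEx signature. A one-element long array stands in for LongAccumulator.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.function.Supplier;

// Single-threaded model of keyed stateful filtering (hypothetical, not Jet code).
class StatefulFilterSim {

    static <T, K, S> List<T> filterStateful(
            List<T> input,
            Function<? super T, ? extends K> keyFn,
            Supplier<? extends S> createFn,
            BiPredicate<? super S, ? super T> filterFn) {
        Map<K, S> stateByKey = new HashMap<>();
        List<T> output = new ArrayList<>();
        for (T item : input) {
            // Each key gets its own state; the predicate itself never sees the key
            S state = stateByKey.computeIfAbsent(keyFn.apply(item), k -> createFn.get());
            if (filterFn.test(state, item)) {
                output.add(item);
            }
        }
        return output;
    }

    // Mirrors the decimation sample: drop every 10th string of each length
    static List<String> decimate(List<String> input) {
        return filterStateful(
                input,
                String::length,
                () -> new long[1],
                (counter, item) -> ++counter[0] % 10 != 0);
    }
}
```

With 20 strings of length 1 interleaved with 20 strings of length 2, each length drops its 10th and 20th string, leaving 36 items.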
This sample groups a stream of strings by length and decimates each group (throws out every 10th string of each length):
GeneralStage<String> decimated = input
.groupingKey(String::length)
.filterStateful(
LongAccumulator::new,
(counter, item) -> {
counter.add(1);
return counter.get() % 10 != 0;
}
);
S - type of the state object
createFn - function that returns the state object
filterFn - predicate that receives the state object and the input item and outputs a boolean value. It may modify the state object.

@Nonnull <S,R> GeneralStage<R> flatMapStateful(@Nonnull SupplierEx<? extends S> createFn, @Nonnull TriFunction<? super S,? super K,? super T,? extends Traverser<R>> flatMapFn)
Attaches a stage that performs a stateful flat-mapping operation. createFn returns the object that holds the state. Jet passes this object along with each input item and its key to flatMapFn, which can update the object's state. For each grouping key there's a separate state object. The state object will be included in the state snapshot, so it survives job restarts. For this reason it must be serializable.
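The sketch below models stateful flat-mapping in plain Java under two assumptions: a java.util.List stands in for Jet's Traverser, and everything runs on one thread. StatefulFlatMapSim and punctuate are hypothetical names. The sketch emits the punctuation after the 10th item of each key, matching the stated intent of the sample.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

// Single-threaded model of keyed stateful flat-mapping (hypothetical, not Jet code).
// A List plays the role of a traverser; an empty list would mean "emit nothing".
class StatefulFlatMapSim {

    @FunctionalInterface
    interface TriFunction<A, B, C, R> {
        R apply(A a, B b, C c);
    }

    static <T, K, S, R> List<R> flatMapStateful(
            List<T> input,
            Function<? super T, ? extends K> keyFn,
            Supplier<? extends S> createFn,
            TriFunction<? super S, ? super K, ? super T, ? extends List<R>> flatMapFn) {
        Map<K, S> stateByKey = new HashMap<>();
        List<R> output = new ArrayList<>();
        for (T item : input) {
            K key = keyFn.apply(item);
            S state = stateByKey.computeIfAbsent(key, k -> createFn.get());
            // Emit every item the function returned, in order
            output.addAll(flatMapFn.apply(state, key, item));
        }
        return output;
    }

    // After every 10th string of a given length, also emit "punctuation<length>"
    static List<String> punctuate(List<String> input) {
        return flatMapStateful(
                input,
                String::length,
                () -> new long[1],
                (counter, key, item) -> ++counter[0] % 10 == 0
                        ? Arrays.asList(item, "punctuation" + key)
                        : Collections.singletonList(item));
    }
}
```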
This sample groups a stream of strings by length and inserts punctuation (a special string) after every 10th string in each group:
GeneralStage<String> punctuated = input
.groupingKey(String::length)
.flatMapStateful(
LongAccumulator::new,
(counter, key, item) -> {
counter.add(1);
return counter.get() % 10 == 0
? Traversers.traverseItems(item, "punctuation" + key)
: Traversers.singleton(item);
}
);
S - type of the state object
R - type of the result
createFn - function that returns the state object
flatMapFn - function that receives the state object, the grouping key and the input item, and outputs the result items. It may modify the state object. It must not return a null traverser, but can return an empty traverser.

@Nonnull default <A,R> GeneralStage<Map.Entry<K,R>> rollingAggregate(@Nonnull AggregateOperation1<? super T,A,? extends R> aggrOp)
Attaches a rolling aggregation stage that uses the provided AggregateOperation. It passes each input item to the accumulator and outputs the current result of aggregation (as returned by the export primitive).
Sample usage:
StreamStage<Entry<Color, Long>> aggregated = items
.groupingKey(Item::getColor)
.rollingAggregate(AggregateOperations.counting());
For example, if the aggregate operation is summing and the input items {2, 7, 8, -5} all have the same key, the output values will be {2, 9, 17, 12}.
This stage is fault-tolerant and saves its state to the snapshot.
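The following is a minimal plain-Java model of a keyed rolling aggregation that makes the {2, 7, 8, -5} example concrete. AggrOp is a hypothetical stand-in that holds only create, accumulate and export functions. It is not Jet's AggregateOperation1, and the model ignores snapshotting.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Single-threaded model of a keyed rolling aggregation (hypothetical, not Jet code).
class RollingAggregateSim {

    // Hypothetical three-primitive aggregate operation
    static final class AggrOp<T, A, R> {
        final Supplier<A> create;
        final BiConsumer<A, T> accumulate;
        final Function<A, R> export;

        AggrOp(Supplier<A> create, BiConsumer<A, T> accumulate, Function<A, R> export) {
            this.create = create;
            this.accumulate = accumulate;
            this.export = export;
        }
    }

    static <T, K, A, R> List<Map.Entry<K, R>> rollingAggregate(
            List<T> input, Function<? super T, ? extends K> keyFn, AggrOp<T, A, R> op) {
        Map<K, A> accByKey = new HashMap<>();
        List<Map.Entry<K, R>> output = new ArrayList<>();
        for (T item : input) {
            K key = keyFn.apply(item);
            A acc = accByKey.computeIfAbsent(key, k -> op.create.get());
            op.accumulate.accept(acc, item);
            // Emit the current (not final) result after every item
            output.add(new SimpleImmutableEntry<>(key, op.export.apply(acc)));
        }
        return output;
    }

    // Summing operation whose accumulator is a one-element long array
    static final AggrOp<Long, long[], Long> SUMMING =
            new AggrOp<>(() -> new long[1], (acc, x) -> acc[0] += x, acc -> acc[0]);
}
```

Calling rollingAggregate(Arrays.asList(2L, 7L, 8L, -5L), x -> "all", SUMMING) produces four entries under the key "all", with values 2, 9, 17 and 12.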
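R - type of the aggregate operation result
aggrOp - the aggregate operation to perform

@Nonnull <C,R> GeneralStage<R> mapUsingContext(@Nonnull ContextFactory<C> contextFactory, @Nonnull TriFunction<? super C,? super K,? super T,? extends R> mapFn)
Attaches a mapping stage which applies the given function to each input item independently and emits the function's result as the output item. The mapping function receives another parameter, the context object, which Jet will create using the supplied contextFactory. If the mapping result is null, it emits nothing. Therefore this stage can be used to implement filtering semantics as well.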
Jet uses the key-extracting function specified on this stage for partitioning: all the items with the same key will see the same context instance (but note that the same instance serves many keys). One case where this is useful is fetching data from an external system because you can use a near-cache without duplicating the cached data.
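The partitioning rule above can be illustrated with a small plain-Java model. The model assumes a fixed number of workers, each owning one context object, and routes each item to a worker by its key's hash code. KeyedContextSim and its parameters are hypothetical, and Jet's actual partitioning function is not modeled. Like this stage, the sketch drops null results.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical model of key-partitioned context use (not Jet code): each of
// `parallelism` workers owns one context object, and items are routed to a
// worker by the hash of their key.
class KeyedContextSim {

    @FunctionalInterface
    interface TriFunction<A, B, C, R> {
        R apply(A a, B b, C c);
    }

    static <T, K, C, R> List<R> mapUsingContext(
            List<T> input,
            Function<? super T, ? extends K> keyFn,
            int parallelism,
            Supplier<? extends C> createContextFn,
            TriFunction<? super C, ? super K, ? super T, ? extends R> mapFn) {
        // One context per worker, not per key: a context serves many keys
        List<C> contexts = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            contexts.add(createContextFn.get());
        }
        List<R> output = new ArrayList<>();
        for (T item : input) {
            K key = keyFn.apply(item);
            // Equal keys have equal hash codes, so they always reach the same context
            C context = contexts.get(Math.floorMod(key.hashCode(), parallelism));
            R result = mapFn.apply(context, key, item);
            if (result != null) { // a null result emits nothing
                output.add(result);
            }
        }
        return output;
    }
}
```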
Sample usage:
items.groupingKey(Item::getDetailId)
.mapUsingContext(
ContextFactory.withCreateFn(jet -> new ItemDetailRegistry()),
(reg, key, item) -> item.setDetail(reg.fetchDetail(key))
);
C - type of context object
R - the result type of the mapping function
contextFactory - the context factory
mapFn - a stateless mapping function

@Nonnull <C,R> GeneralStage<R> mapUsingContextAsync(@Nonnull ContextFactory<C> contextFactory, @Nonnull TriFunction<? super C,? super K,? super T,CompletableFuture<R>> mapAsyncFn)
Asynchronous version of mapUsingContext(ContextFactory, TriFunction): the mapAsyncFn returns a CompletableFuture<R> instead of just R.
The function can return a null future or the future can return a null result: in both cases it will act just like a filter.
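Here is a minimal single-threaded model of this contract built on java.util.concurrent.CompletableFuture. It starts all calls first, then joins the futures in input order. AsyncMapSim is a hypothetical name, and a single shared context replaces the context factory. The sketch makes no claim about how Jet schedules or orders async results.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical single-threaded model of the async mapping contract (not Jet code):
// a null future, or a future that completes with null, drops the item.
class AsyncMapSim {

    @FunctionalInterface
    interface TriFunction<A, B, C, R> {
        R apply(A a, B b, C c);
    }

    static <T, K, C, R> List<R> mapUsingContextAsync(
            List<T> input,
            Function<? super T, ? extends K> keyFn,
            C context,
            TriFunction<? super C, ? super K, ? super T, CompletableFuture<R>> mapAsyncFn) {
        // Start every call before waiting on any, so the calls may overlap
        List<CompletableFuture<R>> futures = new ArrayList<>();
        for (T item : input) {
            futures.add(mapAsyncFn.apply(context, keyFn.apply(item), item));
        }
        List<R> output = new ArrayList<>();
        for (CompletableFuture<R> future : futures) {
            if (future == null) {
                continue;              // null future: emit nothing
            }
            R result = future.join();  // wait for the async result
            if (result != null) {      // null result: emit nothing
                output.add(result);
            }
        }
        return output;
    }
}
```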
Sample usage:
items.groupingKey(Item::getDetailId)
.mapUsingContextAsync(
ContextFactory.withCreateFn(jet -> new ItemDetailRegistry()),
(reg, key, item) -> reg.fetchDetailAsync(key)
.thenApply(detail -> item.setDetail(detail))
);
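The latency of the async call will add to the total latency of the output.

C - type of context object
R - the future's result type of the mapping function
contextFactory - the context factory
mapAsyncFn - a stateless mapping function. Can map to null (return a null future).

@Nonnull <C> GeneralStage<T> filterUsingContext(@Nonnull ContextFactory<C> contextFactory, @Nonnull TriPredicate<? super C,? super K,? super T> filterFn)
Attaches a filtering stage which applies the provided predicate function to each input item to decide whether to pass the item to the output or to discard it. The predicate function receives another parameter, the context object, which Jet will create using the supplied contextFactory.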
Jet uses the key-extracting function specified on this stage for partitioning: all the items with the same key will see the same context instance (but note that the same instance serves many keys). One case where this is useful is fetching data from an external system because you can use a near-cache without duplicating the cached data.
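The near-cache benefit can be sketched in plain Java. If every item with a given key goes to the same context, each context can cache lookups for its own keys, so each key is fetched from the external system only once. CachingRegistry and NearCacheFilterSim are hypothetical names. The hash-based routing is an assumption standing in for Jet's partitioning.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model (not Jet code): key-based routing lets each context cache
// its own keys, so no key is fetched or cached by more than one context.
class NearCacheFilterSim {

    @FunctionalInterface
    interface TriPredicate<A, B, C> {
        boolean test(A a, B b, C c);
    }

    // Hypothetical context object: a local cache in front of a slow lookup
    static final class CachingRegistry {
        private final Map<String, String> cache = new HashMap<>();
        private final Function<String, String> externalLookup;

        CachingRegistry(Function<String, String> externalLookup) {
            this.externalLookup = externalLookup;
        }

        String fetchDetail(String key) {
            // Only a cache miss reaches the external system
            return cache.computeIfAbsent(key, externalLookup);
        }
    }

    static <T> List<T> filterUsingContext(
            List<T> input,
            Function<? super T, String> keyFn,
            List<CachingRegistry> contexts,
            TriPredicate<CachingRegistry, String, T> filterFn) {
        List<T> output = new ArrayList<>();
        for (T item : input) {
            String key = keyFn.apply(item);
            // The same key always selects the same context instance
            CachingRegistry registry = contexts.get(Math.floorMod(key.hashCode(), contexts.size()));
            if (filterFn.test(registry, key, item)) {
                output.add(item);
            }
        }
        return output;
    }
}
```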
Sample usage:
items.groupingKey(Item::getDetailId)
.filterUsingContext(
ContextFactory.withCreateFn(jet -> new ItemDetailRegistry()),
(reg, key, item) -> reg.fetchDetail(key).contains("blade")
);
C - type of context object
contextFactory - the context factory
filterFn - a stateless filter predicate function

@Nonnull <C> GeneralStage<T> filterUsingContextAsync(@Nonnull ContextFactory<C> contextFactory, @Nonnull TriFunction<? super C,? super K,? super T,CompletableFuture<Boolean>> filterAsyncFn)
Asynchronous version of filterUsingContext(ContextFactory, TriPredicate): the filterAsyncFn returns a CompletableFuture<Boolean> instead of just a boolean.
The function must not return a null future.
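Here is a plain-Java sketch of the asynchronous filter contract. It assumes a single shared context and joins the futures in input order. AsyncFilterSim is a hypothetical name. It rejects a null future eagerly with a NullPointerException; that is a choice of this sketch, not documented Jet behavior.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical single-threaded model of the async filter contract (not Jet code).
class AsyncFilterSim {

    @FunctionalInterface
    interface TriFunction<A, B, C, R> {
        R apply(A a, B b, C c);
    }

    static <T, K, C> List<T> filterUsingContextAsync(
            List<T> input,
            Function<? super T, ? extends K> keyFn,
            C context,
            TriFunction<? super C, ? super K, ? super T, CompletableFuture<Boolean>> filterAsyncFn) {
        List<CompletableFuture<Boolean>> decisions = new ArrayList<>();
        for (T item : input) {
            CompletableFuture<Boolean> decision = filterAsyncFn.apply(context, keyFn.apply(item), item);
            // A null future breaks the contract; this sketch fails fast on it
            decisions.add(Objects.requireNonNull(decision, "filterAsyncFn must not return a null future"));
        }
        List<T> output = new ArrayList<>();
        for (int i = 0; i < input.size(); i++) {
            if (decisions.get(i).join()) { // keep the item when the future completes with true
                output.add(input.get(i));
            }
        }
        return output;
    }
}
```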
Sample usage:
items.groupingKey(Item::getDetailId)
.filterUsingContextAsync(
ContextFactory.withCreateFn(jet -> new ItemDetailRegistry()),
(reg, key, item) -> reg.fetchDetailAsync(key)
.thenApply(detail -> detail.contains("blade"))
);
The latency of the async call will add to the total latency of the output.
C - type of context object
contextFactory - the context factory
filterAsyncFn - a stateless filtering function

@Nonnull <C,R> GeneralStage<R> flatMapUsingContext(@Nonnull ContextFactory<C> contextFactory, @Nonnull TriFunction<? super C,? super K,? super T,? extends Traverser<? extends R>> flatMapFn)
Attaches a flat-mapping stage which applies the supplied function to each input item independently and emits all the items from the Traverser it returns as the output items. The traverser must be null-terminated. The mapping function receives another parameter, the context object, which Jet will create using the supplied contextFactory.
Jet uses the key-extracting function specified on this stage for partitioning: all the items with the same key will see the same context instance (but note that the same instance serves many keys). One case where this is useful is fetching data from an external system because you can use a near-cache without duplicating the cached data.
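"Null-terminated" means the traverser returns its items one at a time from next() and signals exhaustion by returning null. For that reason it cannot carry null items. The sketch below models that contract with a hypothetical SimpleTraverser interface (a stand-in, not Jet's Traverser class) and a flatMap loop that drains each returned traverser.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for a null-terminated traverser (not Jet's Traverser class).
class NullTerminatedTraverserSim {

    @FunctionalInterface
    interface SimpleTraverser<T> {
        // Returns the next item, or null once the traverser is exhausted
        T next();
    }

    // Wraps an Iterable; since null marks the end, the iterable must not contain nulls
    static <T> SimpleTraverser<T> traverseIterable(Iterable<? extends T> iterable) {
        Iterator<? extends T> it = iterable.iterator();
        return () -> it.hasNext() ? it.next() : null;
    }

    // An empty traverser returns null right away
    static <T> SimpleTraverser<T> empty() {
        return () -> null;
    }

    // Applies flatMapFn to each item and emits everything its traverser yields
    static <T, R> List<R> flatMap(List<T> input, Function<? super T, SimpleTraverser<? extends R>> flatMapFn) {
        List<R> output = new ArrayList<>();
        for (T item : input) {
            SimpleTraverser<? extends R> traverser = flatMapFn.apply(item);
            R next;
            while ((next = traverser.next()) != null) {
                output.add(next);
            }
        }
        return output;
    }
}
```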
Sample usage:
StreamStage<Part> parts = products
.groupingKey(Product::getId)
.flatMapUsingContext(
ContextFactory.withCreateFn(jet -> new PartRegistry()),
(registry, productId, product) -> Traversers.traverseIterable(
registry.fetchParts(productId))
);
C - type of context object
R - type of the output items
contextFactory - the context factory
flatMapFn - a stateless flat-mapping function. It must not return a null traverser, but can return an empty traverser.

@Nonnull <C,R> GeneralStage<R> flatMapUsingContextAsync(@Nonnull ContextFactory<C> contextFactory, @Nonnull TriFunction<? super C,? super K,? super T,CompletableFuture<Traverser<R>>> flatMapAsyncFn)
Asynchronous version of flatMapUsingContext(ContextFactory, TriFunction): the flatMapAsyncFn returns a CompletableFuture<Traverser<R>> instead of just Traverser<R>.
The function can return a null future or the future can return a null traverser: in both cases it will act just like a filter.
Sample usage:
StreamStage<Part> productParts = products
.groupingKey(Product::getId)
.flatMapUsingContextAsync(
ContextFactory.withCreateFn(jet -> new PartRegistry()),
(registry, productId, product) -> registry
.fetchPartsAsync(productId)
.thenApply(parts -> Traversers.traverseIterable(parts))
);
The latency of the async call will add to the latency of the items.
C - type of context object
R - type of the output items of the returned stage
contextFactory - the context factory
flatMapAsyncFn - a stateless flat-mapping function. Can map to null (return a null future), but the future must not complete with a null traverser; it can complete with an empty traverser.

@Nonnull default <V,R> GeneralStage<R> mapUsingIMap(@Nonnull String mapName, @Nonnull BiFunctionEx<? super T,? super V,? extends R> mapFn)
Attaches a mapping stage where, for each item, a lookup in the IMap with the supplied name is performed using the grouping key, and the result of the lookup is merged with the item and emitted. If the result of the mapping is null, it emits nothing. Therefore this stage can be used to implement filtering semantics as well.
The mapping logic is equivalent to:
V value = map.get(groupingKey);
return mapFn.apply(item, value);
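The equivalence above can be run directly with an ordinary java.util.Map standing in for the IMap. This is a minimal sketch: MapUsingLookupSim is a hypothetical name, and it does not model partitioning or data locality.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Plain-Java rendering of the lookup-and-merge logic (hypothetical, not Jet code).
class MapUsingLookupSim {

    static <T, K, V, R> List<R> mapUsingMap(
            List<T> input,
            Function<? super T, ? extends K> keyFn,
            Map<K, V> map,
            BiFunction<? super T, ? super V, ? extends R> mapFn) {
        List<R> output = new ArrayList<>();
        for (T item : input) {
            V value = map.get(keyFn.apply(item)); // lookup by the grouping key
            R result = mapFn.apply(item, value);  // value is null when the key is absent
            if (result != null) {                 // a null result emits nothing
                output.add(result);
            }
        }
        return output;
    }
}
```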
Sample usage:
items.groupingKey(Item::getDetailId)
.mapUsingIMap(
"enriching-map",
(Item item, ItemDetail detail) -> item.setDetail(detail)
);
This stage is similar to stageWithoutKey.mapUsingIMap(), but here Jet knows the key and uses it to partition and distribute the input in order to achieve data locality. The value it fetches from the IMap is stored on the cluster member where the processing takes place. However, if the map doesn't use the default partitioning strategy, the data locality will be broken.

V - type of the value in the IMap
R - type of the output item
mapName - name of the IMap
mapFn - the mapping function

@Nonnull default <V,R> GeneralStage<R> mapUsingIMap(@Nonnull IMap<K,V> iMap, @Nonnull BiFunctionEx<? super T,? super V,? extends R> mapFn)
Attaches a mapping stage where, for each item, a lookup in the supplied IMap is performed using the grouping key, and the result of the lookup is merged with the item and emitted. If the result of the mapping is null, it emits nothing. Therefore this stage can be used to implement filtering semantics as well.
The mapping logic is equivalent to:
V value = map.get(groupingKey);
return mapFn.apply(item, value);
Sample usage:
items.groupingKey(Item::getDetailId)
.mapUsingIMap(enrichingMap, (item, detail) -> item.setDetail(detail));
This stage is similar to stageWithoutKey.mapUsingIMap(), but here Jet knows the key and uses it to partition and distribute the input in order to achieve data locality. The value it fetches from the IMap is stored on the cluster member where the processing takes place. However, if the map doesn't use the default partitioning strategy, data locality will be broken.

V - type of the value in the IMap
R - type of the output item
iMap - the IMap to use as the context
mapFn - the mapping function

@Nonnull <R> GeneralStage<R> customTransform(@Nonnull String stageName, @Nonnull SupplierEx<Processor> procSupplier)
Attaches a stage with a custom transform based on the provided supplier of Core API Processors. The inbound edge will be distributed and partitioned using the key function assigned to this stage.
Note that the type parameter of the returned stage is inferred from the call site and not propagated from the processor that will produce the result, so there is no actual type safety provided.
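The lack of type safety can be reproduced in plain Java with an unchecked cast. A generic method whose result type is chosen by the caller compiles for any R, and a mismatch surfaces only as a ClassCastException when an element is read. UncheckedResultTypeSim and its customTransform method are hypothetical and do not use Jet's Processor types.

```java
import java.util.List;
import java.util.function.Supplier;

// Hypothetical illustration (not Jet code) of a result type chosen by the caller:
// the producer's real element type is never checked against R.
class UncheckedResultTypeSim {

    @SuppressWarnings("unchecked")
    static <R> List<R> customTransform(String stageName, Supplier<List<Object>> producer) {
        // stageName is unused here; it only mirrors the shape of the documented method
        return (List<R>) (List<?>) producer.get(); // unchecked cast: no runtime check of R
    }
}
```

If the producer actually emits Strings and the caller assigns the result to a List<Integer>, the code compiles and even size() works. The failure appears only at the point where an element is read as an Integer.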
R - the type of the output items
stageName - a human-readable name for the custom stage
procSupplier - the supplier of processors

@Nonnull <R> GeneralStage<R> customTransform(@Nonnull String stageName, @Nonnull ProcessorSupplier procSupplier)
Attaches a stage with a custom transform based on the provided supplier of Core API Processors. The inbound edge will be distributed and partitioned using the key function assigned to this stage.
Note that the type parameter of the returned stage is inferred from the call site and not propagated from the processor that will produce the result, so there is no actual type safety provided.
R - the type of the output items
stageName - a human-readable name for the custom stage
procSupplier - the supplier of processors

@Nonnull <R> GeneralStage<R> customTransform(@Nonnull String stageName, @Nonnull ProcessorMetaSupplier procSupplier)
Attaches a stage with a custom transform based on the provided supplier of Core API Processors. The inbound edge will be distributed and partitioned using the key function assigned to this stage.
Note that the type parameter of the returned stage is inferred from the call site and not propagated from the processor that will produce the result, so there is no actual type safety provided.
R - the type of the output items
stageName - a human-readable name for the custom stage
procSupplier - the supplier of processors

Copyright © 2019 Hazelcast, Inc. All rights reserved.