Type Parameters:
T - type of the stream items
K - type of the key

public interface StreamStageWithKey<T,K> extends GeneralStageWithKey<T,K>
Modifier and Type | Method and Description |
---|---|
<R> StreamStage<R> | customTransform(String stageName, ProcessorMetaSupplier procSupplier): Attaches a stage with a custom transform based on the provided supplier of Core API Processors. |
default <R> StreamStage<R> | customTransform(String stageName, ProcessorSupplier procSupplier): Attaches a stage with a custom transform based on the provided supplier of Core API Processors. |
default <R> StreamStage<R> | customTransform(String stageName, SupplierEx<Processor> procSupplier): Attaches a stage with a custom transform based on the provided supplier of Core API Processors. |
<C> StreamStage<T> | filterUsingContext(ContextFactory<C> contextFactory, TriPredicate<? super C,? super K,? super T> filterFn): Attaches a filtering stage which applies the provided predicate function to each input item to decide whether to pass the item to the output or to discard it. |
<C> StreamStage<T> | filterUsingContextAsync(ContextFactory<C> contextFactory, TriFunction<? super C,? super K,? super T,CompletableFuture<Boolean>> filterAsyncFn): Asynchronous version of GeneralStageWithKey.filterUsingContext(ContextFactory, TriPredicate): the filterAsyncFn returns a CompletableFuture<Boolean> instead of just a boolean. |
<C,R> StreamStage<R> | flatMapUsingContext(ContextFactory<C> contextFactory, TriFunction<? super C,? super K,? super T,? extends Traverser<? extends R>> flatMapFn): Attaches a flat-mapping stage which applies the supplied function to each input item independently and emits all the items from the Traverser it returns as the output items. |
<C,R> StreamStage<R> | flatMapUsingContextAsync(ContextFactory<C> contextFactory, TriFunction<? super C,? super K,? super T,CompletableFuture<Traverser<R>>> flatMapAsyncFn): Asynchronous version of GeneralStageWithKey.flatMapUsingContext(ContextFactory, TriFunction): the flatMapAsyncFn returns a CompletableFuture<Traverser<R>> instead of just Traverser<R>. |
<C,R> StreamStage<R> | mapUsingContext(ContextFactory<C> contextFactory, TriFunction<? super C,? super K,? super T,? extends R> mapFn): Attaches a mapping stage which applies the given function to each input item independently and emits the function's result as the output item. |
<C,R> StreamStage<R> | mapUsingContextAsync(ContextFactory<C> contextFactory, TriFunction<? super C,? super K,? super T,CompletableFuture<R>> mapAsyncFn): Asynchronous version of GeneralStageWithKey.mapUsingContext(ContextFactory, TriFunction): the mapAsyncFn returns a CompletableFuture<R> instead of just R. |
default <V,R> StreamStage<R> | mapUsingIMap(IMap<K,V> iMap, BiFunctionEx<? super T,? super V,? extends R> mapFn): Attaches a mapping stage where for each item a lookup in the supplied IMap using the grouping key is performed and the result of the lookup is merged with the item and emitted. |
default <V,R> StreamStage<R> | mapUsingIMap(String mapName, BiFunctionEx<? super T,? super V,? extends R> mapFn): Attaches a mapping stage where for each item a lookup in the IMap with the supplied name using the grouping key is performed and the result of the lookup is merged with the item and emitted. |
<R> StreamStage<Map.Entry<K,R>> | rollingAggregate(AggregateOperation1<? super T,?,? extends R> aggrOp): Attaches a rolling aggregation stage. |
StageWithKeyAndWindow<T,K> | window(WindowDefinition wDef): Adds the definition of the window to use in the group-and-aggregate pipeline stage being constructed. |
Methods inherited from interface GeneralStageWithKey: keyFn
@Nonnull StageWithKeyAndWindow<T,K> window(@Nonnull WindowDefinition wDef)
Adds the definition of the window to use in the group-and-aggregate pipeline stage being constructed.

@Nonnull default <V,R> StreamStage<R> mapUsingIMap(@Nonnull String mapName, @Nonnull BiFunctionEx<? super T,? super V,? extends R> mapFn)
Description copied from interface: GeneralStageWithKey
Attaches a mapping stage where for each item a lookup in the IMap with the supplied name using the grouping key is performed and the result of the lookup is merged with the item and emitted.
If the result of the mapping is null, it emits nothing. Therefore this stage can be used to implement filtering semantics as well.
The mapping logic is equivalent to:
    V value = map.get(groupingKey);
    return mapFn.apply(item, value);
Sample usage:
    items.groupingKey(Item::getDetailId)
         .mapUsingIMap(
             "enriching-map",
             (Item item, ItemDetail detail) -> item.setDetail(detail)
         );
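The lookup-and-merge logic above can be modeled in plain Java without a Jet cluster. The following is only an illustrative sketch: a HashMap stands in for the IMap, and KeyedLookupSketch and mapUsingMap are hypothetical names that are not part of the Jet API. It shows that an item whose mapping result is null produces no output.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Plain-Java model of the keyed lookup. Hypothetical helper, not Jet code.
class KeyedLookupSketch {

    // For each item: extract the key, look up the value, apply mapFn,
    // and emit the result unless it is null.
    static <T, K, V, R> List<R> mapUsingMap(
            List<T> items,
            Function<? super T, ? extends K> keyFn,
            Map<K, V> map,
            BiFunction<? super T, ? super V, ? extends R> mapFn) {
        List<R> out = new ArrayList<>();
        for (T item : items) {
            V value = map.get(keyFn.apply(item)); // null when the key is absent
            R result = mapFn.apply(item, value);
            if (result != null) {                 // a null result emits nothing
                out.add(result);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<Integer, String> details = new HashMap<>();
        details.put(1, "blade");
        details.put(2, "handle");
        List<String> enriched = mapUsingMap(
                List.of(1, 2, 3),
                id -> id,
                details,
                (id, detail) -> detail == null ? null : id + ":" + detail);
        System.out.println(enriched); // [1:blade, 2:handle]
    }
}
```

Item 3 has no entry in the map, so the mapping function returns null and the item is filtered out.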
This stage is similar to stageWithoutKey.mapUsingIMap(), but here Jet knows the key and uses it to partition and distribute the input in order to achieve data locality. The value it fetches from the IMap is stored on the cluster member where the processing takes place. However, if the map doesn't use the default partitioning strategy, data locality will be broken.
Specified by:
mapUsingIMap in interface GeneralStageWithKey<T,K>
Type Parameters:
V - type of the value in the IMap
R - type of the output item
Parameters:
mapName - name of the IMap
mapFn - the mapping function

@Nonnull default <V,R> StreamStage<R> mapUsingIMap(@Nonnull IMap<K,V> iMap, @Nonnull BiFunctionEx<? super T,? super V,? extends R> mapFn)
Description copied from interface: GeneralStageWithKey
Attaches a mapping stage where for each item a lookup in the supplied IMap using the grouping key is performed and the result of the lookup is merged with the item and emitted.
If the result of the mapping is null, it emits nothing. Therefore this stage can be used to implement filtering semantics as well.
The mapping logic is equivalent to:
    V value = map.get(groupingKey);
    return mapFn.apply(item, value);
Sample usage:
    items.groupingKey(Item::getDetailId)
         .mapUsingIMap(enrichingMap, (item, detail) -> item.setDetail(detail));
This stage is similar to stageWithoutKey.mapUsingIMap(), but here Jet knows the key and uses it to partition and distribute the input in order to achieve data locality. The value it fetches from the IMap is stored on the cluster member where the processing takes place. However, if the map doesn't use the default partitioning strategy, data locality will be broken.
Specified by:
mapUsingIMap in interface GeneralStageWithKey<T,K>
Type Parameters:
V - type of the value in the IMap
R - type of the output item
Parameters:
iMap - the IMap to use as the context
mapFn - the mapping function

@Nonnull <C,R> StreamStage<R> mapUsingContext(@Nonnull ContextFactory<C> contextFactory, @Nonnull TriFunction<? super C,? super K,? super T,? extends R> mapFn)
Description copied from interface: GeneralStageWithKey
Attaches a mapping stage which applies the given function to each input item independently and emits the function's result as the output item. The mapping function receives another parameter, the context object, which Jet will create using the supplied contextFactory. If the mapping result is null, it emits nothing. Therefore this stage can be used to implement filtering semantics as well.
Jet uses the key-extracting function specified on this stage for partitioning: all the items with the same key will see the same context instance (but note that the same instance serves many keys). One case where this is useful is fetching data from an external system, because you can use a near-cache without duplicating the cached data.
Sample usage:
    items.groupingKey(Item::getDetailId)
         .mapUsingContext(
             ContextFactory.withCreateFn(jet -> new ItemDetailRegistry()),
             (reg, key, item) -> item.setDetail(reg.fetchDetail(key))
         );
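The partitioning remark above can be illustrated with a plain-Java model. This is a hedged sketch, not Jet's actual implementation: the number of parallel processors, the hash-based routing, and the names ContextPartitionSketch and contextFor are all assumptions made for illustration. It shows that a given key always reaches the same context instance, while one instance serves several keys.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical model of per-processor context objects. Not Jet code.
class ContextPartitionSketch<C> {
    private final List<C> contexts = new ArrayList<>();

    // Create one context per (assumed) parallel processor.
    ContextPartitionSketch(int processorCount, Supplier<C> createFn) {
        for (int i = 0; i < processorCount; i++) {
            contexts.add(createFn.get());
        }
    }

    // Route a key to a processor by its hash, then return that processor's context.
    C contextFor(Object key) {
        int index = Math.floorMod(key.hashCode(), contexts.size());
        return contexts.get(index);
    }

    public static void main(String[] args) {
        ContextPartitionSketch<StringBuilder> sketch =
                new ContextPartitionSketch<>(2, StringBuilder::new);
        // The same key always maps to the same instance.
        System.out.println(sketch.contextFor(7) == sketch.contextFor(7)); // true
        // With 2 processors, keys 1 and 3 share an instance; keys 1 and 2 do not.
        System.out.println(sketch.contextFor(1) == sketch.contextFor(3)); // true
        System.out.println(sketch.contextFor(1) == sketch.contextFor(2)); // false
    }
}
```

Because every item with a given key lands on one instance, a cache kept inside the context holds each key's data in one place only.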
Specified by:
mapUsingContext in interface GeneralStageWithKey<T,K>
Type Parameters:
C - type of context object
R - the result type of the mapping function
Parameters:
contextFactory - the context factory
mapFn - a stateless mapping function

@Nonnull <C,R> StreamStage<R> mapUsingContextAsync(@Nonnull ContextFactory<C> contextFactory, @Nonnull TriFunction<? super C,? super K,? super T,CompletableFuture<R>> mapAsyncFn)
Description copied from interface: GeneralStageWithKey
Asynchronous version of GeneralStageWithKey.mapUsingContext(ContextFactory, TriFunction): the mapAsyncFn returns a CompletableFuture<R> instead of just R.
The function can return a null future, or the future can complete with a null result: in both cases the stage acts just like a filter.
Sample usage:
    items.groupingKey(Item::getDetailId)
         .mapUsingContextAsync(
             ContextFactory.withCreateFn(jet -> new ItemDetailRegistry()),
             (reg, key, item) -> reg.fetchDetailAsync(key)
                                    .thenApply(detail -> item.setDetail(detail))
         );
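How a null future or a null result turns the async mapping into a filter can be modeled with CompletableFuture alone. This is an illustrative sketch, not Jet's implementation: AsyncMapSketch and mapAsync are hypothetical names, and the sketch blocks on each future with join() for simplicity, which a streaming engine would avoid.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical model of async mapping with filter-on-null. Not Jet code.
class AsyncMapSketch {

    static <T, R> List<R> mapAsync(
            List<T> items,
            Function<? super T, CompletableFuture<R>> mapAsyncFn) {
        // Start all async calls first so they can run concurrently.
        List<CompletableFuture<R>> futures = new ArrayList<>();
        for (T item : items) {
            futures.add(mapAsyncFn.apply(item));
        }
        // Collect results in input order, skipping the "filtered" items.
        List<R> out = new ArrayList<>();
        for (CompletableFuture<R> future : futures) {
            if (future == null) {
                continue;               // null future: emit nothing
            }
            R result = future.join();   // blocking wait, for the sketch only
            if (result != null) {       // null result: emit nothing
                out.add(result);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Function<Integer, CompletableFuture<String>> fn = n -> {
            if (n == 1) {
                return null;                                    // null future
            }
            if (n == 2) {
                return CompletableFuture.completedFuture(null); // null result
            }
            return CompletableFuture.supplyAsync(() -> "item-" + n);
        };
        System.out.println(mapAsync(List.of(1, 2, 3), fn)); // [item-3]
    }
}
```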
The latency of the async call will add to the total latency of the output.
Specified by:
mapUsingContextAsync in interface GeneralStageWithKey<T,K>
Type Parameters:
C - type of context object
R - the result type of the future returned by the mapping function
Parameters:
contextFactory - the context factory
mapAsyncFn - a stateless mapping function. Can map to null (return a null future)

@Nonnull <C> StreamStage<T> filterUsingContext(@Nonnull ContextFactory<C> contextFactory, @Nonnull TriPredicate<? super C,? super K,? super T> filterFn)
Description copied from interface: GeneralStageWithKey
Attaches a filtering stage which applies the provided predicate function to each input item to decide whether to pass the item to the output or to discard it. The predicate function receives another parameter, the context object, which Jet will create using the supplied contextFactory.
Jet uses the key-extracting function specified on this stage for partitioning: all the items with the same key will see the same context instance (but note that the same instance serves many keys). One case where this is useful is fetching data from an external system, because you can use a near-cache without duplicating the cached data.
Sample usage:
    items.groupingKey(Item::getDetailId)
         .filterUsingContext(
             ContextFactory.withCreateFn(jet -> new ItemDetailRegistry()),
             (reg, key, item) -> reg.fetchDetail(key).contains("blade")
         );
Specified by:
filterUsingContext in interface GeneralStageWithKey<T,K>
Type Parameters:
C - type of context object
Parameters:
contextFactory - the context factory
filterFn - a stateless filter predicate function

@Nonnull <C> StreamStage<T> filterUsingContextAsync(@Nonnull ContextFactory<C> contextFactory, @Nonnull TriFunction<? super C,? super K,? super T,CompletableFuture<Boolean>> filterAsyncFn)
Description copied from interface: GeneralStageWithKey
Asynchronous version of GeneralStageWithKey.filterUsingContext(ContextFactory, TriPredicate): the filterAsyncFn returns a CompletableFuture<Boolean> instead of just a boolean.
The function must not return a null future.
Sample usage:
    items.groupingKey(Item::getDetailId)
         .filterUsingContextAsync(
             ContextFactory.withCreateFn(jet -> new ItemDetailRegistry()),
             (reg, key, item) -> reg.fetchDetailAsync(key)
                                    .thenApply(detail -> detail.contains("blade"))
         );
The latency of the async call will add to the total latency of the output.
Specified by:
filterUsingContextAsync in interface GeneralStageWithKey<T,K>
Type Parameters:
C - type of context object
Parameters:
contextFactory - the context factory
filterAsyncFn - a stateless filtering function

@Nonnull <C,R> StreamStage<R> flatMapUsingContext(@Nonnull ContextFactory<C> contextFactory, @Nonnull TriFunction<? super C,? super K,? super T,? extends Traverser<? extends R>> flatMapFn)
Description copied from interface: GeneralStageWithKey
Attaches a flat-mapping stage which applies the supplied function to each input item independently and emits all the items from the Traverser it returns as the output items. The traverser must be null-terminated. The mapping function receives another parameter, the context object, which Jet will create using the supplied contextFactory.
Jet uses the key-extracting function specified on this stage for partitioning: all the items with the same key will see the same context instance (but note that the same instance serves many keys). One case where this is useful is fetching data from an external system, because you can use a near-cache without duplicating the cached data.
Sample usage:
    StreamStage<Part> parts = products
        .groupingKey(Product::getId)
        .flatMapUsingContext(
            ContextFactory.withCreateFn(jet -> new PartRegistry()),
            (registry, productId, product) -> Traversers.traverseIterable(
                registry.fetchParts(productId))
        );
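"Null-terminated" means the traverser signals exhaustion by returning null from next(), so it cannot carry null items. The sketch below models this contract in plain Java; SimpleTraverser, traverse, and flatMap are hypothetical stand-ins, not Jet's own Traverser and Traversers types.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Hypothetical model of a null-terminated traverser. Not Jet code.
class TraverserSketch {

    // Stand-in for a traverser: next() returns null once it is exhausted.
    interface SimpleTraverser<T> {
        T next();
    }

    // Wraps an Iterable; the iterable itself must not contain null items.
    static <T> SimpleTraverser<T> traverse(Iterable<T> iterable) {
        Iterator<T> it = iterable.iterator();
        return () -> it.hasNext() ? it.next() : null;
    }

    // Applies flatMapFn to each item and drains the returned traverser.
    static <T, R> List<R> flatMap(
            List<T> items,
            Function<? super T, SimpleTraverser<R>> flatMapFn) {
        List<R> out = new ArrayList<>();
        for (T item : items) {
            SimpleTraverser<R> traverser = flatMapFn.apply(item);
            // Keep pulling items until the terminating null.
            for (R r; (r = traverser.next()) != null; ) {
                out.add(r);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Function<String, SimpleTraverser<String>> parts =
                product -> traverse(List.of(product + "-frame", product + "-wheel"));
        System.out.println(flatMap(List.of("bike", "cart"), parts));
        // [bike-frame, bike-wheel, cart-frame, cart-wheel]
    }
}
```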
Specified by:
flatMapUsingContext in interface GeneralStageWithKey<T,K>
Type Parameters:
C - type of context object
R - type of the output items
Parameters:
contextFactory - the context factory
flatMapFn - a stateless flatmapping function

@Nonnull <C,R> StreamStage<R> flatMapUsingContextAsync(@Nonnull ContextFactory<C> contextFactory, @Nonnull TriFunction<? super C,? super K,? super T,CompletableFuture<Traverser<R>>> flatMapAsyncFn)
Description copied from interface: GeneralStageWithKey
Asynchronous version of GeneralStageWithKey.flatMapUsingContext(ContextFactory, TriFunction): the flatMapAsyncFn returns a CompletableFuture<Traverser<R>> instead of just Traverser<R>.
The function can return a null future, or the future can complete with a null traverser: in both cases the stage acts just like a filter.
Sample usage:
    StreamStage<Part> productParts = products
        .groupingKey(Product::getId)
        .flatMapUsingContextAsync(
            ContextFactory.withCreateFn(jet -> new PartRegistry()),
            (registry, productId, product) -> registry
                .fetchPartsAsync(productId)
                .thenApply(parts -> Traversers.traverseIterable(parts))
        );
The latency of the async call will add to the latency of the items.
Specified by:
flatMapUsingContextAsync in interface GeneralStageWithKey<T,K>
Type Parameters:
C - type of context object
R - the type of the returned stage
Parameters:
contextFactory - the context factory
flatMapAsyncFn - a stateless flatmapping function. Can map to null (return a null future)

@Nonnull <R> StreamStage<Map.Entry<K,R>> rollingAggregate(@Nonnull AggregateOperation1<? super T,?,? extends R> aggrOp)
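Description copied from interface: GeneralStageWithKey
Attaches a rolling aggregation stage. For example, if the aggregate operation is summing and the input under a given key is {2, 7, 8, -5}, the output will be {2, 9, 17, 12}.
Sample usage:
    StreamStage<Entry<Color, Long>> aggregated = items
        .groupingKey(Item::getColor)
        .rollingAggregate(AggregateOperations.counting());
This stage is fault-tolerant and saves its state to the snapshot.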
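The rolling behaviour, where an updated result is emitted for every input item, can be sketched in plain Java using the {2, 7, 8, -5} example above. This is a simplified model with a fixed summing operation rather than a general AggregateOperation1; RollingSumSketch and rollingSum are hypothetical names, not part of the Jet API.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.ToLongFunction;

// Hypothetical model of a keyed rolling sum. Not Jet code.
class RollingSumSketch {

    // Emits one (key, runningSum) entry for every input item.
    static <T, K> List<Map.Entry<K, Long>> rollingSum(
            List<T> items,
            Function<? super T, ? extends K> keyFn,
            ToLongFunction<? super T> valueFn) {
        Map<K, Long> sums = new HashMap<>();   // per-key accumulator state
        List<Map.Entry<K, Long>> out = new ArrayList<>();
        for (T item : items) {
            K key = keyFn.apply(item);
            long updated = sums.merge(key, valueFn.applyAsLong(item), Long::sum);
            out.add(new SimpleEntry<>(key, updated));
        }
        return out;
    }

    public static void main(String[] args) {
        // All four items share the key "k", as in the example above.
        List<Map.Entry<String, Long>> out =
                rollingSum(List.of(2L, 7L, 8L, -5L), n -> "k", n -> n);
        for (Map.Entry<String, Long> e : out) {
            System.out.println(e); // k=2, k=9, k=17, k=12 (one per line)
        }
    }
}
```

The per-key map in this sketch plays the role of the state that the stage saves to the snapshot.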
Specified by:
rollingAggregate in interface GeneralStageWithKey<T,K>
Type Parameters:
R - type of the aggregate operation result
Parameters:
aggrOp - the aggregate operation to perform

@Nonnull default <R> StreamStage<R> customTransform(@Nonnull String stageName, @Nonnull SupplierEx<Processor> procSupplier)
Description copied from interface: GeneralStageWithKey
Attaches a stage with a custom transform based on the provided supplier of Core API Processors. The inbound edge will be distributed and partitioned using the key function assigned to this stage.
Note that the type parameter of the returned stage is inferred from the call site and not propagated from the processor that will produce the result, so there is no actual type safety provided.
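The caveat about call-site inference is the usual consequence of Java's type erasure. The plain-Java sketch below reproduces the same situation with a hypothetical customTransform method (not the Jet one) whose result type R is chosen by the caller: the mismatch compiles and only fails when an element is first used as the wrong type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical illustration of a call-site-inferred result type. Not Jet code.
class UncheckedStageSketch {

    // R is chosen by the caller; nothing ties it to what the producer emits.
    @SuppressWarnings("unchecked")
    static <R> List<R> customTransform(Supplier<Object> producer, int count) {
        List<Object> out = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            out.add(producer.get());
        }
        return (List<R>) (List<?>) out; // unchecked cast, erased at runtime
    }

    public static void main(String[] args) {
        // Compiles even though the producer emits Strings, not Integers.
        List<Integer> numbers = customTransform(() -> "not a number", 1);
        try {
            Integer first = numbers.get(0); // the compiler-inserted cast fails here
            System.out.println(first);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException at the point of use");
        }
    }
}
```

Declaring the result type explicitly at the call site, and keeping it in step with what the processor really emits, is the only safeguard in this situation.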
Specified by:
customTransform in interface GeneralStageWithKey<T,K>
Type Parameters:
R - the type of the output items
Parameters:
stageName - a human-readable name for the custom stage
procSupplier - the supplier of processors

@Nonnull default <R> StreamStage<R> customTransform(@Nonnull String stageName, @Nonnull ProcessorSupplier procSupplier)
Description copied from interface: GeneralStageWithKey
Attaches a stage with a custom transform based on the provided supplier of Core API Processors. The inbound edge will be distributed and partitioned using the key function assigned to this stage.
Note that the type parameter of the returned stage is inferred from the call site and not propagated from the processor that will produce the result, so there is no actual type safety provided.
Specified by:
customTransform in interface GeneralStageWithKey<T,K>
Type Parameters:
R - the type of the output items
Parameters:
stageName - a human-readable name for the custom stage
procSupplier - the supplier of processors

@Nonnull <R> StreamStage<R> customTransform(@Nonnull String stageName, @Nonnull ProcessorMetaSupplier procSupplier)
Description copied from interface: GeneralStageWithKey
Attaches a stage with a custom transform based on the provided supplier of Core API Processors. The inbound edge will be distributed and partitioned using the key function assigned to this stage.
Note that the type parameter of the returned stage is inferred from the call site and not propagated from the processor that will produce the result, so there is no actual type safety provided.
Specified by:
customTransform in interface GeneralStageWithKey<T,K>
Type Parameters:
R - the type of the output items
Parameters:
stageName - a human-readable name for the custom stage
procSupplier - the supplier of processors

Copyright © 2019 Hazelcast, Inc. All rights reserved.