public interface GeneralStageWithKey<T,K>

Type parameters:
T - type of the stream item
K - type of the grouping key
Modifier and Type | Method and Description |
---|---|
<R> GeneralStage<R> | customTransform(String stageName, ProcessorMetaSupplier procSupplier): Attaches a stage with a custom transform based on the provided supplier of Core API Processors. |
<R> GeneralStage<R> | customTransform(String stageName, ProcessorSupplier procSupplier): Attaches a stage with a custom transform based on the provided supplier of Core API Processors. |
<R> GeneralStage<R> | customTransform(String stageName, SupplierEx<Processor> procSupplier): Attaches a stage with a custom transform based on the provided supplier of Core API Processors. |
<C> GeneralStage<T> | filterUsingContext(ContextFactory<C> contextFactory, TriPredicate<? super C,? super K,? super T> filterFn): Attaches a filtering stage which applies the provided predicate function to each input item to decide whether to pass the item to the output or to discard it. |
<C> GeneralStage<T> | filterUsingContextAsync(ContextFactory<C> contextFactory, TriFunction<? super C,? super K,? super T,CompletableFuture<Boolean>> filterAsyncFn): Asynchronous version of filterUsingContext(ContextFactory, TriPredicate): the filterAsyncFn returns a CompletableFuture<Boolean> instead of just a boolean. |
<C,R> GeneralStage<R> | flatMapUsingContext(ContextFactory<C> contextFactory, TriFunction<? super C,? super K,? super T,? extends Traverser<? extends R>> flatMapFn): Attaches a flat-mapping stage which applies the supplied function to each input item independently and emits all the items from the Traverser it returns as the output items. |
<C,R> GeneralStage<R> | flatMapUsingContextAsync(ContextFactory<C> contextFactory, TriFunction<? super C,? super K,? super T,CompletableFuture<Traverser<R>>> flatMapAsyncFn): Asynchronous version of flatMapUsingContext(ContextFactory, TriFunction): the flatMapAsyncFn returns a CompletableFuture<Traverser<R>> instead of just Traverser<R>. |
FunctionEx<? super T,? extends K> | keyFn(): Returns the function that extracts the key from stream items. |
<C,R> GeneralStage<R> | mapUsingContext(ContextFactory<C> contextFactory, TriFunction<? super C,? super K,? super T,? extends R> mapFn): Attaches a mapping stage which applies the given function to each input item independently and emits the function's result as the output item. |
<C,R> GeneralStage<R> | mapUsingContextAsync(ContextFactory<C> contextFactory, TriFunction<? super C,? super K,? super T,CompletableFuture<R>> mapAsyncFn): Asynchronous version of mapUsingContext(ContextFactory, TriFunction): the mapAsyncFn returns a CompletableFuture<R> instead of just R. |
default <V,R> GeneralStage<R> | mapUsingIMap(IMap<K,V> iMap, BiFunctionEx<? super T,? super V,? extends R> mapFn): Attaches a mapping stage where, for each item, a lookup in the supplied IMap using the grouping key is performed and the result of the lookup is merged with the item and emitted. |
default <V,R> GeneralStage<R> | mapUsingIMap(String mapName, BiFunctionEx<? super T,? super V,? extends R> mapFn): Attaches a mapping stage where, for each item, a lookup in the IMap with the supplied name using the grouping key is performed and the result of the lookup is merged with the item and emitted. |
<R> GeneralStage<Map.Entry<K,R>> | rollingAggregate(AggregateOperation1<? super T,?,? extends R> aggrOp): Attaches a rolling aggregation stage. |

@Nonnull FunctionEx<? super T,? extends K> keyFn()
Returns the function that extracts the key from stream items.

@Nonnull <C,R> GeneralStage<R> mapUsingContext(@Nonnull ContextFactory<C> contextFactory, @Nonnull TriFunction<? super C,? super K,? super T,? extends R> mapFn)
Attaches a mapping stage which applies the given function to each input item independently and emits the function's result as the output item. The mapping function receives another parameter, the context object, which Jet will create using the supplied contextFactory. If the mapping result is null, it emits nothing. Therefore this stage can be used to implement filtering semantics as well.
Jet uses the key-extracting function specified on this stage for partitioning: all the items with the same key will see the same context instance (but note that the same instance serves many keys). One case where this is useful is fetching data from an external system, because you can use a near-cache without duplicating the cached data.
Sample usage:
    items.groupingKey(Item::getDetailId)
         .mapUsingContext(
             ContextFactory.withCreateFn(jet -> new ItemDetailRegistry()),
             (reg, key, item) -> item.setDetail(reg.fetchDetail(key))
         );
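To make the partitioning rule concrete, here is a plain-Java model of it; it is a sketch, not Jet code. It assumes that items are routed to a fixed number of partitions by hashCode() modulo the parallelism (Jet's actual partitioning function differs), that each partition owns exactly one context object, and that TriFn is a hypothetical stand-in for Jet's TriFunction. Items with the same key always reach the same context instance, and a null mapping result emits nothing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

/** Plain-Java model of keyed mapUsingContext semantics; not the Jet API. */
final class KeyedContextModel {

    /** Hypothetical stand-in for Jet's TriFunction<C, K, T, R>. */
    interface TriFn<C, K, T, R> {
        R apply(C context, K key, T item);
    }

    /**
     * Routes each item to a partition chosen from its key (assumption: hashCode modulo
     * parallelism), hands it that partition's context, and drops null results.
     */
    static <T, K, C, R> List<R> mapUsingContext(
            List<T> items, Function<T, K> keyFn, int parallelism,
            Supplier<C> createContext, TriFn<C, K, T, R> mapFn) {
        List<C> contexts = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            contexts.add(createContext.get()); // one context instance per partition
        }
        List<R> out = new ArrayList<>();
        for (T item : items) {
            K key = keyFn.apply(item);
            int partition = Math.floorMod(key.hashCode(), parallelism); // same key, same partition
            R result = mapFn.apply(contexts.get(partition), key, item);
            if (result != null) { // a null mapping result emits nothing
                out.add(result);
            }
        }
        return out;
    }
}
```

In this model several keys can land on the same partition, so one context instance serves many keys, as the text above notes.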
Type parameters:
C - type of context object
R - the result type of the mapping function
Parameters:
contextFactory - the context factory
mapFn - a stateless mapping function

@Nonnull <C,R> GeneralStage<R> mapUsingContextAsync(@Nonnull ContextFactory<C> contextFactory, @Nonnull TriFunction<? super C,? super K,? super T,CompletableFuture<R>> mapAsyncFn)
Asynchronous version of mapUsingContext(ContextFactory, TriFunction): the mapAsyncFn returns a CompletableFuture<R> instead of just R.
The function can return a null future, or the future can complete with a null result; in both cases the stage acts as a filter and emits nothing for that item.
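The null handling described above can be modeled in plain Java. This sketch assumes a simplified, hypothetical mapAsync helper that drops the context and key parameters; it is not the Jet API. It starts every call before waiting, skips a null future as well as a future that completes with null, and keeps the input order purely for simplicity (it makes no claim about the order in which Jet emits results).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Plain-Java model of the null handling in mapUsingContextAsync; not the Jet API. */
final class AsyncMapModel {

    /**
     * Starts asyncFn for every item, then collects the results in input order.
     * A null future, or a future that completes with null, contributes no output.
     */
    static <T, R> List<R> mapAsync(List<T> items, Function<T, CompletableFuture<R>> asyncFn) {
        List<CompletableFuture<R>> futures = new ArrayList<>();
        for (T item : items) {
            futures.add(asyncFn.apply(item)); // may add null
        }
        List<R> out = new ArrayList<>();
        for (CompletableFuture<R> future : futures) {
            if (future == null) {
                continue; // null future: the item is filtered out
            }
            R result = future.join(); // wait for the async call to finish
            if (result != null) {
                out.add(result); // a null result is filtered out as well
            }
        }
        return out;
    }
}
```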
Sample usage:
    items.groupingKey(Item::getDetailId)
         .mapUsingContextAsync(
             ContextFactory.withCreateFn(jet -> new ItemDetailRegistry()),
             (reg, key, item) -> reg.fetchDetailAsync(key)
                                    .thenApply(detail -> item.setDetail(detail))
         );
The latency of the async call adds to the total latency of the output.
Type parameters:
C - type of context object
R - the future's result type of the mapping function
Parameters:
contextFactory - the context factory
mapAsyncFn - a stateless mapping function. Can map to null (return a null future)

@Nonnull <C> GeneralStage<T> filterUsingContext(@Nonnull ContextFactory<C> contextFactory, @Nonnull TriPredicate<? super C,? super K,? super T> filterFn)
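Attaches a filtering stage which applies the provided predicate function to each input item to decide whether to pass the item to the output or to discard it. The predicate function receives another parameter, the context object, which Jet will create using the supplied contextFactory.
Jet uses the key-extracting function specified on this stage for partitioning: all the items with the same key will see the same context instance (but note that the same instance serves many keys). One case where this is useful is fetching data from an external system, because you can use a near-cache without duplicating the cached data.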
Sample usage:
    items.groupingKey(Item::getDetailId)
         .filterUsingContext(
             ContextFactory.withCreateFn(jet -> new ItemDetailRegistry()),
             (reg, key, item) -> reg.fetchDetail(key).contains("blade")
         );
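The near-cache remark can be illustrated with a plain-Java model in which each context instance owns a cache. This is a sketch under assumptions, not Jet code: partitions are chosen by hashCode() modulo the parallelism, and filterWithNearCache, remoteFetch and keep are hypothetical names. Because every key is always routed to the same cache, each key is fetched only once (as long as lookups return non-null values), and no two caches hold a copy of the same entry.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

/** Plain-Java model of a keyed filter whose context is a per-partition near-cache; not the Jet API. */
final class NearCacheFilterModel {

    static <T, K, D> List<T> filterWithNearCache(
            List<T> items, Function<T, K> keyFn, int parallelism,
            Function<K, D> remoteFetch, Predicate<D> keep) {
        List<Map<K, D>> caches = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            caches.add(new HashMap<>()); // one near-cache per context instance
        }
        List<T> out = new ArrayList<>();
        for (T item : items) {
            K key = keyFn.apply(item);
            // Assumed partitioning: a given key always maps to the same cache.
            Map<K, D> cache = caches.get(Math.floorMod(key.hashCode(), parallelism));
            D detail = cache.computeIfAbsent(key, remoteFetch); // remote call only on a miss
            if (keep.test(detail)) {
                out.add(item); // the filter emits the original item unchanged
            }
        }
        return out;
    }
}
```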
Type parameters:
C - type of context object
Parameters:
contextFactory - the context factory
filterFn - a stateless filter predicate function

@Nonnull <C> GeneralStage<T> filterUsingContextAsync(@Nonnull ContextFactory<C> contextFactory, @Nonnull TriFunction<? super C,? super K,? super T,CompletableFuture<Boolean>> filterAsyncFn)
Asynchronous version of filterUsingContext(ContextFactory, TriPredicate): the filterAsyncFn returns a CompletableFuture<Boolean> instead of just a boolean.
The function must not return a null future.
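The contrast with the mapping variant, where a null future means "filter out", can be shown with a small plain-Java model. It is a sketch, not the Jet API: filterAsync is a hypothetical helper without context and key parameters, it rejects a null future with a NullPointerException, and, as an assumption of this model only, it treats a future that completes with null as false.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Plain-Java model of the filterUsingContextAsync contract; not the Jet API. */
final class AsyncFilterModel {

    /** Keeps the items whose future completes with TRUE; a null future violates the contract. */
    static <T> List<T> filterAsync(List<T> items, Function<T, CompletableFuture<Boolean>> asyncPredicate) {
        List<CompletableFuture<Boolean>> futures = new ArrayList<>();
        for (T item : items) {
            // Unlike the async map variant, a null future is rejected here.
            futures.add(Objects.requireNonNull(asyncPredicate.apply(item),
                    "filterAsyncFn returned a null future"));
        }
        List<T> out = new ArrayList<>();
        for (int i = 0; i < items.size(); i++) {
            // Model assumption: a null Boolean result is treated like false.
            if (Boolean.TRUE.equals(futures.get(i).join())) {
                out.add(items.get(i));
            }
        }
        return out;
    }
}
```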
Sample usage:
    items.groupingKey(Item::getDetailId)
         .filterUsingContextAsync(
             ContextFactory.withCreateFn(jet -> new ItemDetailRegistry()),
             (reg, key, item) -> reg.fetchDetailAsync(key)
                                    .thenApply(detail -> detail.contains("blade"))
         );
The latency of the async call adds to the total latency of the output.
Type parameters:
C - type of context object
Parameters:
contextFactory - the context factory
filterAsyncFn - a stateless filtering function

@Nonnull <C,R> GeneralStage<R> flatMapUsingContext(@Nonnull ContextFactory<C> contextFactory, @Nonnull TriFunction<? super C,? super K,? super T,? extends Traverser<? extends R>> flatMapFn)
Attaches a flat-mapping stage which applies the supplied function to each input item independently and emits all the items from the Traverser it returns as the output items. The traverser must be null-terminated, that is, its next() method returns null once it has no more items. The mapping function receives another parameter, the context object, which Jet will create using the supplied contextFactory.
Jet uses the key-extracting function specified on this stage for partitioning: all the items with the same key will see the same context instance (but note that the same instance serves many keys). One case where this is useful is fetching data from an external system, because you can use a near-cache without duplicating the cached data.
Sample usage:
    StreamStage<Part> parts = products
        .groupingKey(Product::getId)
        .flatMapUsingContext(
            ContextFactory.withCreateFn(jet -> new PartRegistry()),
            (registry, productId, product) -> Traversers.traverseIterable(
                registry.fetchParts(productId))
        );
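The null-terminated traverser contract can be sketched in plain Java. SimpleTraverser below is a hypothetical one-method interface, not Jet's Traverser, and traverseIterable only mimics the idea of Traversers.traverseIterable. The flatMap helper drains each returned traverser until next() returns null. A consequence of the contract is that a traverser cannot emit null as an item, since null always means "no more items".

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

/** Plain-Java model of a null-terminated traverser; not Jet's Traverser type. */
final class TraverserModel {

    /** Hypothetical minimal traverser: next() returns the next item, or null once exhausted. */
    interface SimpleTraverser<T> {
        T next();
    }

    /** Wraps an Iterable; its items must be non-null, because null marks the end. */
    static <T> SimpleTraverser<T> traverseIterable(Iterable<? extends T> iterable) {
        Iterator<? extends T> it = iterable.iterator();
        return () -> it.hasNext() ? it.next() : null;
    }

    /** Emits everything each returned traverser yields before its terminating null. */
    static <T, R> List<R> flatMap(List<T> items, Function<T, SimpleTraverser<? extends R>> flatMapFn) {
        List<R> out = new ArrayList<>();
        for (T item : items) {
            SimpleTraverser<? extends R> traverser = flatMapFn.apply(item);
            R next;
            while ((next = traverser.next()) != null) { // null terminates this traverser
                out.add(next);
            }
        }
        return out;
    }
}
```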
Type parameters:
C - type of context object
R - type of the output items
Parameters:
contextFactory - the context factory
flatMapFn - a stateless flatmapping function

@Nonnull <C,R> GeneralStage<R> flatMapUsingContextAsync(@Nonnull ContextFactory<C> contextFactory, @Nonnull TriFunction<? super C,? super K,? super T,CompletableFuture<Traverser<R>>> flatMapAsyncFn)
Asynchronous version of flatMapUsingContext(ContextFactory, TriFunction): the flatMapAsyncFn returns a CompletableFuture<Traverser<R>> instead of just Traverser<R>.
The function can return a null future, or the future can complete with a null traverser; in both cases the stage acts as a filter and emits nothing for that item.
Sample usage:
    StreamStage<Part> productParts = products
        .groupingKey(Product::getId)
        .flatMapUsingContextAsync(
            ContextFactory.withCreateFn(jet -> new PartRegistry()),
            (registry, productId, product) -> registry
                .fetchPartsAsync(productId)
                .thenApply(parts -> Traversers.traverseIterable(parts))
        );
The latency of the async call adds to the latency of the output items.
Type parameters:
C - type of context object
R - the item type of the returned stage
Parameters:
contextFactory - the context factory
flatMapAsyncFn - a stateless flatmapping function. Can map to null (return a null future)

@Nonnull default <V,R> GeneralStage<R> mapUsingIMap(@Nonnull String mapName, @Nonnull BiFunctionEx<? super T,? super V,? extends R> mapFn)
Attaches a mapping stage where, for each item, a lookup in the IMap with the supplied name using the grouping key is performed and the result of the lookup is merged with the item and emitted.
If the result of the mapping is null, it emits nothing. Therefore this stage can be used to implement filtering semantics as well.
The mapping logic is equivalent to:
    V value = map.get(groupingKey);
    return mapFn.apply(item, value);
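The equivalence above can be exercised with a plain-Java model in which a java.util.Map stands in for the IMap. This is a sketch, not Hazelcast code, and mapUsingMap is a hypothetical name. It looks up the value by grouping key, passes null through to mapFn when the key is absent, and drops null mapping results, which gives the filtering behavior described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

/** Plain-Java model of the mapUsingIMap lookup logic; a Map stands in for the IMap. */
final class MapLookupModel {

    static <T, K, V, R> List<R> mapUsingMap(
            List<T> items, Function<T, K> keyFn, Map<K, V> map, BiFunction<T, V, R> mapFn) {
        List<R> out = new ArrayList<>();
        for (T item : items) {
            V value = map.get(keyFn.apply(item)); // lookup by grouping key; null if absent
            R result = mapFn.apply(item, value);  // merge the item with the looked-up value
            if (result != null) {                 // a null mapping result emits nothing
                out.add(result);
            }
        }
        return out;
    }
}
```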
Sample usage:
    items.groupingKey(Item::getDetailId)
         .mapUsingIMap(
             "enriching-map",
             (Item item, ItemDetail detail) -> item.setDetail(detail)
         );
This stage is similar to stageWithoutKey.mapUsingIMap(), but here Jet knows the key and uses it to partition and distribute the input in order to achieve data locality: the value it fetches from the IMap is stored on the cluster member where the processing takes place. However, if the map doesn't use the default partitioning strategy, data locality will be broken.
Type parameters:
V - type of the value in the IMap
R - type of the output item
Parameters:
mapName - name of the IMap
mapFn - the mapping function

@Nonnull default <V,R> GeneralStage<R> mapUsingIMap(@Nonnull IMap<K,V> iMap, @Nonnull BiFunctionEx<? super T,? super V,? extends R> mapFn)
Attaches a mapping stage where, for each item, a lookup in the supplied IMap using the grouping key is performed and the result of the lookup is merged with the item and emitted.
If the result of the mapping is null, it emits nothing. Therefore this stage can be used to implement filtering semantics as well.
The mapping logic is equivalent to:
    V value = map.get(groupingKey);
    return mapFn.apply(item, value);
Sample usage:
    items.groupingKey(Item::getDetailId)
         .mapUsingIMap(enrichingMap, (item, detail) -> item.setDetail(detail));
This stage is similar to stageWithoutKey.mapUsingIMap(), but here Jet knows the key and uses it to partition and distribute the input in order to achieve data locality: the value it fetches from the IMap is stored on the cluster member where the processing takes place. However, if the map doesn't use the default partitioning strategy, data locality will be broken.
Type parameters:
V - type of the value in the IMap
R - type of the output item
Parameters:
iMap - the IMap to use as the context
mapFn - the mapping function

@Nonnull <R> GeneralStage<Map.Entry<K,R>> rollingAggregate(@Nonnull AggregateOperation1<? super T,?,? extends R> aggrOp)
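Attaches a rolling aggregation stage. For each input item, it emits the current aggregation result for that item's key as a Map.Entry<K,R>, rather than waiting for the end of the input. For example, if the aggregate operation sums the items and the input for a key is {2, 7, 8, -5}, the output will be {2, 9, 17, 12}.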
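For a summing aggregation, the rolling behavior amounts to a per-key running total. The plain-Java sketch below models that; it is not Jet's rollingAggregate, and rollingSum is a hypothetical helper that keeps its per-key state in a HashMap. It emits one (key, current total) entry per input item.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Plain-Java model of a keyed rolling sum; not Jet's rollingAggregate. */
final class RollingSumModel {

    /** For each input item, adds it to its key's running total and emits (key, total). */
    static <T, K> List<Map.Entry<K, Long>> rollingSum(
            List<T> items, Function<T, K> keyFn, Function<T, Long> valueFn) {
        Map<K, Long> totals = new HashMap<>(); // per-key accumulator state
        List<Map.Entry<K, Long>> out = new ArrayList<>();
        for (T item : items) {
            K key = keyFn.apply(item);
            long total = totals.merge(key, valueFn.apply(item), Long::sum); // update, then read
            out.add(new SimpleImmutableEntry<>(key, total)); // one output per input item
        }
        return out;
    }
}
```

For a single key and the input 2, 7, 8, -5, this model produces the totals 2, 9, 17, 12, matching the example above.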
Sample usage:
    StreamStage<Entry<Color, Long>> aggregated = items
        .groupingKey(Item::getColor)
        .rollingAggregate(AggregateOperations.counting());
This stage is fault-tolerant and saves its state to the snapshot.
Type parameters:
R - type of the aggregate operation result
Parameters:
aggrOp - the aggregate operation to perform

@Nonnull <R> GeneralStage<R> customTransform(@Nonnull String stageName, @Nonnull SupplierEx<Processor> procSupplier)
Attaches a stage with a custom transform based on the provided supplier of Core API Processors. The inbound edge will be distributed and partitioned using the key function assigned to this stage.
Note that the type parameter of the returned stage is inferred from the call site and not propagated from the processor that will produce the result, so it provides no actual type safety.
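The lack of type safety is the same effect a plain Java generic method shows when its type parameter appears only in the return type. In the analogy below, which is not Jet code, the hypothetical customTransform helper returns whatever list it receives, typed as whatever R the caller asks for; the compiler accepts any R, and a mismatch surfaces only when an item is read, as a ClassCastException.

```java
import java.util.List;

/** Plain-Java analogy for a stage type inferred from the call site; not Jet code. */
final class UncheckedStageModel {

    /** Like customTransform, R is chosen by the caller; nothing checks it against the items. */
    @SuppressWarnings("unchecked")
    static <R> List<R> customTransform(List<?> producedItems) {
        return (List<R>) producedItems; // unchecked cast: always succeeds at runtime
    }

    /** The caller picks R = Integer; reading an item is where a mismatch finally fails. */
    static int firstAsInt(List<?> producedItems) {
        List<Integer> stage = customTransform(producedItems); // compiles whatever the list holds
        return stage.get(0); // implicit cast to Integer, then unboxing; fails for non-Integers
    }
}
```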
Type parameters:
R - the type of the output items
Parameters:
stageName - a human-readable name for the custom stage
procSupplier - the supplier of processors

@Nonnull <R> GeneralStage<R> customTransform(@Nonnull String stageName, @Nonnull ProcessorSupplier procSupplier)
Attaches a stage with a custom transform based on the provided supplier of Core API Processors. The inbound edge will be distributed and partitioned using the key function assigned to this stage.
Note that the type parameter of the returned stage is inferred from the call site and not propagated from the processor that will produce the result, so it provides no actual type safety.
Type parameters:
R - the type of the output items
Parameters:
stageName - a human-readable name for the custom stage
procSupplier - the supplier of processors

@Nonnull <R> GeneralStage<R> customTransform(@Nonnull String stageName, @Nonnull ProcessorMetaSupplier procSupplier)
Attaches a stage with a custom transform based on the provided supplier of Core API Processors. The inbound edge will be distributed and partitioned using the key function assigned to this stage.
Note that the type parameter of the returned stage is inferred from the call site and not propagated from the processor that will produce the result, so it provides no actual type safety.
Type parameters:
R - the type of the output items
Parameters:
stageName - a human-readable name for the custom stage
procSupplier - the supplier of processors

Copyright © 2019 Hazelcast, Inc. All rights reserved.