T
- the type of items coming out of this stagepublic interface GeneralStage<T> extends Stage
batch
and stream
pipeline stages, defining those operations that apply to both.
Unless specified otherwise, all functions passed to methods of this interface must be stateless.
Modifier and Type | Method and Description |
---|---|
StreamStage<T> |
addTimestamps(ToLongFunctionEx<? super T> timestampFn,
long allowedLag)
Adds a timestamp to each item in the stream using the supplied function
and specifies the allowed amount of disorder between them.
|
<R> GeneralStage<R> |
customTransform(String stageName,
ProcessorMetaSupplier procSupplier)
Attaches a stage with a custom transform based on the provided supplier
of Core API
Processor s. |
<R> GeneralStage<R> |
customTransform(String stageName,
ProcessorSupplier procSupplier)
Attaches a stage with a custom transform based on the provided supplier
of Core API
Processor s. |
<R> GeneralStage<R> |
customTransform(String stageName,
SupplierEx<Processor> procSupplier)
Attaches a stage with a custom transform based on the provided supplier
of Core API
Processor s. |
SinkStage |
drainTo(Sink<? super T> sink)
Attaches a sink stage, one that accepts data but doesn't emit any.
|
GeneralStage<T> |
filter(PredicateEx<T> filterFn)
Attaches a filtering stage which applies the provided predicate function
to each input item to decide whether to pass the item to the output or
to discard it.
|
<C> GeneralStage<T> |
filterUsingContext(ContextFactory<C> contextFactory,
BiPredicateEx<? super C,? super T> filterFn)
Attaches a filtering stage which applies the provided predicate function
to each input item to decide whether to pass the item to the output or
to discard it.
|
<C> GeneralStage<T> |
filterUsingContextAsync(ContextFactory<C> contextFactory,
BiFunctionEx<? super C,? super T,? extends CompletableFuture<Boolean>> filterAsyncFn)
Asynchronous version of
filterUsingContext(com.hazelcast.jet.pipeline.ContextFactory<C>, com.hazelcast.jet.function.BiPredicateEx<? super C, ? super T>) : the filterAsyncFn returns a CompletableFuture<Boolean> instead of
just a boolean . |
<R> GeneralStage<R> |
flatMap(FunctionEx<? super T,? extends Traverser<? extends R>> flatMapFn)
Attaches a flat-mapping stage which applies the supplied function to
each input item independently and emits all the items from the
Traverser it returns. |
<C,R> GeneralStage<R> |
flatMapUsingContext(ContextFactory<C> contextFactory,
BiFunctionEx<? super C,? super T,? extends Traverser<R>> flatMapFn)
Attaches a flat-mapping stage which applies the supplied function to
each input item independently and emits all items from the
Traverser it returns as the output items. |
<C,R> GeneralStage<R> |
flatMapUsingContextAsync(ContextFactory<C> contextFactory,
BiFunctionEx<? super C,? super T,? extends CompletableFuture<Traverser<R>>> flatMapAsyncFn)
Asynchronous version of
flatMapUsingContext(com.hazelcast.jet.pipeline.ContextFactory<C>, com.hazelcast.jet.function.BiFunctionEx<? super C, ? super T, ? extends com.hazelcast.jet.Traverser<R>>) : the flatMapAsyncFn returns a CompletableFuture<Traverser<R>>
instead of just Traverser<R> . |
<K> GeneralStageWithKey<T,K> |
groupingKey(FunctionEx<? super T,? extends K> keyFn)
Specifies the function that will extract a key from the items in the
associated pipeline stage.
|
<K,T1_IN,T1,R> |
hashJoin(BatchStage<T1_IN> stage1,
JoinClause<K,? super T,? super T1_IN,? extends T1> joinClause1,
BiFunctionEx<T,T1,R> mapToOutputFn)
Attaches to both this and the supplied stage a hash-joining stage and
returns it.
|
<K1,K2,T1_IN,T2_IN,T1,T2,R> |
hashJoin2(BatchStage<T1_IN> stage1,
JoinClause<K1,? super T,? super T1_IN,? extends T1> joinClause1,
BatchStage<T2_IN> stage2,
JoinClause<K2,? super T,? super T2_IN,? extends T2> joinClause2,
TriFunction<T,T1,T2,R> mapToOutputFn)
Attaches to this and the two supplied stages a hash-joining stage and
returns it.
|
GeneralHashJoinBuilder<T> |
hashJoinBuilder()
Returns a fluent API builder object to construct a hash join operation
with any number of contributing stages.
|
<R> GeneralStage<R> |
map(FunctionEx<? super T,? extends R> mapFn)
Attaches a mapping stage which applies the given function to each input
item independently and emits the function's result as the output item.
|
<C,R> GeneralStage<R> |
mapUsingContext(ContextFactory<C> contextFactory,
BiFunctionEx<? super C,? super T,? extends R> mapFn)
Attaches a mapping stage which applies the supplied function to each
input item independently and emits the function's result as the output
item.
|
<C,R> GeneralStage<R> |
mapUsingContextAsync(ContextFactory<C> contextFactory,
BiFunctionEx<? super C,? super T,? extends CompletableFuture<R>> mapAsyncFn)
Asynchronous version of
mapUsingContext(com.hazelcast.jet.pipeline.ContextFactory<C>, com.hazelcast.jet.function.BiFunctionEx<? super C, ? super T, ? extends R>) : the mapAsyncFn
returns a CompletableFuture<R> instead of just R . |
default <K,V,R> GeneralStage<R> |
mapUsingIMap(IMap<K,V> iMap,
FunctionEx<? super T,? extends K> lookupKeyFn,
BiFunctionEx<? super T,? super V,? extends R> mapFn)
Attaches a mapping stage where for each item a lookup in the
supplied
IMap is performed and the result of the
lookup is merged with the item and emitted. |
default <K,V,R> GeneralStage<R> |
mapUsingIMap(String mapName,
FunctionEx<? super T,? extends K> lookupKeyFn,
BiFunctionEx<? super T,? super V,? extends R> mapFn)
Attaches a mapping stage where for each item a lookup in the
IMap with the supplied name is performed and the
result of the lookup is merged with the item and emitted. |
default <K,V,R> GeneralStage<R> |
mapUsingReplicatedMap(ReplicatedMap<K,V> replicatedMap,
FunctionEx<? super T,? extends K> lookupKeyFn,
BiFunctionEx<? super T,? super V,? extends R> mapFn)
Attaches a mapping stage where for each item a lookup in the
supplied
ReplicatedMap is performed and the result of the
lookup is merged with the item and emitted. |
default <K,V,R> GeneralStage<R> |
mapUsingReplicatedMap(String mapName,
FunctionEx<? super T,? extends K> lookupKeyFn,
BiFunctionEx<? super T,? super V,? extends R> mapFn)
Attaches a mapping stage where for each item a lookup in the
ReplicatedMap with the supplied name is performed and the
result of the lookup is merged with the item and emitted. |
default GeneralStage<T> |
peek()
Adds a peeking layer to this compute stage which logs its output.
|
default GeneralStage<T> |
peek(FunctionEx<? super T,? extends CharSequence> toStringFn)
Adds a peeking layer to this compute stage which logs its output.
|
GeneralStage<T> |
peek(PredicateEx<? super T> shouldLogFn,
FunctionEx<? super T,? extends CharSequence> toStringFn)
Attaches a peeking stage which logs this stage's output and passes it
through without transformation.
|
<R> GeneralStage<R> |
rollingAggregate(AggregateOperation1<? super T,?,? extends R> aggrOp)
Attaches a rolling aggregation stage.
|
GeneralStage<T> |
setName(String name)
Overrides the default name of the stage with the name you choose and
returns the stage.
|
getPipeline, name, setLocalParallelism
@Nonnull <R> GeneralStage<R> map(@Nonnull FunctionEx<? super T,? extends R> mapFn)
null
, it emits nothing. Therefore this stage
can be used to implement filtering semantics as well.
This sample takes a stream of names and outputs the names in lowercase:
stage.map(name -> name.toLowerCase())
R
- the result type of the mapping functionmapFn
- a stateless mapping function@Nonnull GeneralStage<T> filter(@Nonnull PredicateEx<T> filterFn)
This sample removes empty strings from the stream:
stage.filter(name -> !name.isEmpty())
filterFn
- a stateless filter predicate function@Nonnull <R> GeneralStage<R> flatMap(@Nonnull FunctionEx<? super T,? extends Traverser<? extends R>> flatMapFn)
Traverser
it returns. The traverser must be null-terminated.
This sample takes a stream of sentences and outputs a stream of individual words in them:
stage.map(sentence -> traverseArray(sentence.split("\\W+")))
R
- the type of items in the result's traversersflatMapFn
- a stateless flatmapping function, whose result type is
Jet's Traverser
@Nonnull <C,R> GeneralStage<R> mapUsingContext(@Nonnull ContextFactory<C> contextFactory, @Nonnull BiFunctionEx<? super C,? super T,? extends R> mapFn)
contextFactory
.
If the mapping result is null
, it emits nothing. Therefore this
stage can be used to implement filtering semantics as well.
This sample takes a stream of stock items and sets the detail
field on them by looking up from a registry:
stage.mapUsingContext(
ContextFactory.withCreateFn(jet -> new ItemDetailRegistry(jet)),
(reg, item) -> item.setDetail(reg.fetchDetail(item))
)
C
- type of context objectR
- the result type of the mapping functioncontextFactory
- the context factorymapFn
- a stateless mapping function@Nonnull <C,R> GeneralStage<R> mapUsingContextAsync(@Nonnull ContextFactory<C> contextFactory, @Nonnull BiFunctionEx<? super C,? super T,? extends CompletableFuture<R>> mapAsyncFn)
mapUsingContext(com.hazelcast.jet.pipeline.ContextFactory<C>, com.hazelcast.jet.function.BiFunctionEx<? super C, ? super T, ? extends R>)
: the mapAsyncFn
returns a CompletableFuture<R>
instead of just R
.
The function can return a null future or the future can return a null result: in both cases it will act just like a filter.
The latency of the async call will add to the total latency of the output.
This sample takes a stream of stock items and sets the detail
field on them by looking up from a registry:
stage.mapUsingContextAsync(
ContextFactory.withCreateFn(jet -> new ItemDetailRegistry(jet)),
(reg, item) -> reg.fetchDetailAsync(item)
.thenApply(detail -> item.setDetail(detail)
)
C
- type of context objectR
- the future's result type of the mapping functioncontextFactory
- the context factorymapAsyncFn
- a stateless mapping function. Can map to null (return
a null future)@Nonnull <C> GeneralStage<T> filterUsingContext(@Nonnull ContextFactory<C> contextFactory, @Nonnull BiPredicateEx<? super C,? super T> filterFn)
contextFactory
.
This sample takes a stream of photos, uses an image classifier to reason about their contents, and keeps only photos of cats:
photos.filterUsingContext(
ContextFactory.withCreateFn(jet -> new ImageClassifier(jet)),
(classifier, photo) -> classifier.classify(photo).equals("cat")
)
C
- type of context objectcontextFactory
- the context factoryfilterFn
- a stateless filter predicate function@Nonnull <C> GeneralStage<T> filterUsingContextAsync(@Nonnull ContextFactory<C> contextFactory, @Nonnull BiFunctionEx<? super C,? super T,? extends CompletableFuture<Boolean>> filterAsyncFn)
filterUsingContext(com.hazelcast.jet.pipeline.ContextFactory<C>, com.hazelcast.jet.function.BiPredicateEx<? super C, ? super T>)
: the filterAsyncFn
returns a CompletableFuture<Boolean>
instead of
just a boolean
.
The function must not return a null future.
The latency of the async call will add to the total latency of the output.
This sample takes a stream of photos, uses an image classifier to reason about their contents, and keeps only photos of cats:
photos.filterUsingContextAsync(
ContextFactory.withCreateFn(jet -> new ImageClassifier(jet)),
(classifier, photo) -> reg.classifyAsync(photo)
.thenApply(it -> it.equals("cat"))
)
C
- type of context objectcontextFactory
- the context factoryfilterAsyncFn
- a stateless filtering function@Nonnull <C,R> GeneralStage<R> flatMapUsingContext(@Nonnull ContextFactory<C> contextFactory, @Nonnull BiFunctionEx<? super C,? super T,? extends Traverser<R>> flatMapFn)
Traverser
it returns as the output items. The traverser must be
null-terminated. The mapping function receives another
parameter, the context object, which Jet will create using the supplied
contextFactory
.
This sample takes a stream of products and outputs an "exploded" stream of all the parts that go into making them:
StreamStage<Part> parts = products.flatMapUsingContext(
ContextFactory.withCreateFn(jet -> new PartRegistryCtx()),
(registry, product) -> Traversers.traverseIterable(
registry.fetchParts(product))
);
C
- type of context objectR
- the type of items in the result's traverserscontextFactory
- the context factoryflatMapFn
- a stateless flatmapping function, whose result type is
Jet's Traverser
@Nonnull <C,R> GeneralStage<R> flatMapUsingContextAsync(@Nonnull ContextFactory<C> contextFactory, @Nonnull BiFunctionEx<? super C,? super T,? extends CompletableFuture<Traverser<R>>> flatMapAsyncFn)
flatMapUsingContext(com.hazelcast.jet.pipeline.ContextFactory<C>, com.hazelcast.jet.function.BiFunctionEx<? super C, ? super T, ? extends com.hazelcast.jet.Traverser<R>>)
: the flatMapAsyncFn
returns a CompletableFuture<Traverser<R>>
instead of just Traverser<R>
.
The function can return a null future or the future can return a null traverser: in both cases it will act just like a filter.
The latency of the async call will add to the total latency of the output.
This sample takes a stream of products and outputs an "exploded" stream of all the parts that go into making them:
StreamStage<Part> parts = products.flatMapUsingContextAsync(
ContextFactory.withCreateFn(jet -> new PartRegistryCtx()),
(registry, product) -> registry
.fetchPartsAsync(product)
.thenApply(parts -> Traversers.traverseIterable(parts))
);
C
- type of context objectR
- the type of the returned stagecontextFactory
- the context factoryflatMapAsyncFn
- a stateless flatmapping function. Can map to null
(return a null future)@Nonnull default <K,V,R> GeneralStage<R> mapUsingReplicatedMap(@Nonnull String mapName, @Nonnull FunctionEx<? super T,? extends K> lookupKeyFn, @Nonnull BiFunctionEx<? super T,? super V,? extends R> mapFn)
ReplicatedMap
with the supplied name is performed and the
result of the lookup is merged with the item and emitted.
If the result of the mapping is null
, it emits nothing.
Therefore this stage can be used to implement filtering semantics as
well.
The mapping logic is equivalent to:
K key = lookupKeyFn.apply(item);
V value = replicatedMap.get(key);
return mapFn.apply(item, value);
This sample takes a stream of stock items and sets the detail
field on them by looking up from a registry:
items.mapUsingReplicatedMap(
"enriching-map",
item -> item.getDetailId(),
(Item item, ItemDetail detail) -> item.setDetail(detail)
)
K
- type of the key in the ReplicatedMap
V
- type of the value in the ReplicatedMap
R
- type of the output itemmapName
- name of the ReplicatedMap
lookupKeyFn
- a function which returns the key to look up in the
map. Must not return nullmapFn
- the mapping function@Nonnull default <K,V,R> GeneralStage<R> mapUsingReplicatedMap(@Nonnull ReplicatedMap<K,V> replicatedMap, @Nonnull FunctionEx<? super T,? extends K> lookupKeyFn, @Nonnull BiFunctionEx<? super T,? super V,? extends R> mapFn)
ReplicatedMap
is performed and the result of the
lookup is merged with the item and emitted.
If the result of the mapping is null
, it emits nothing.
Therefore this stage can be used to implement filtering semantics as well.
The mapping logic is equivalent to:
K key = lookupKeyFn.apply(item);
V value = replicatedMap.get(key);
return mapFn.apply(item, value);
This sample takes a stream of stock items and sets the detail
field on them by looking up from a registry:
items.mapUsingReplicatedMap(
enrichingMap,
item -> item.getDetailId(),
(item, detail) -> item.setDetail(detail)
)
K
- type of the key in the ReplicatedMap
V
- type of the value in the ReplicatedMap
R
- type of the output itemreplicatedMap
- the ReplicatedMap
to lookup fromlookupKeyFn
- a function which returns the key to look up in the
map. Must not return nullmapFn
- the mapping function@Nonnull default <K,V,R> GeneralStage<R> mapUsingIMap(@Nonnull String mapName, @Nonnull FunctionEx<? super T,? extends K> lookupKeyFn, @Nonnull BiFunctionEx<? super T,? super V,? extends R> mapFn)
IMap
with the supplied name is performed and the
result of the lookup is merged with the item and emitted.
If the result of the mapping is null
, it emits nothing.
Therefore this stage can be used to implement filtering semantics as well.
The mapping logic is equivalent to:
K key = lookupKeyFn.apply(item);
V value = map.get(key);
return mapFn.apply(item, value);
This sample takes a stream of stock items and sets the detail
field on them by looking up from a registry:
items.mapUsingIMap(
"enriching-map",
item -> item.getDetailId(),
(Item item, ItemDetail detail) -> item.setDetail(detail)
)
See also GeneralStageWithKey.mapUsingIMap(java.lang.String, com.hazelcast.jet.function.BiFunctionEx<? super T, ? super V, ? extends R>)
for a partitioned version of
this operation.K
- type of the key in the IMap
V
- type of the value in the IMap
R
- type of the output itemmapName
- name of the IMap
lookupKeyFn
- a function which returns the key to look up in the
map. Must not return nullmapFn
- the mapping function@Nonnull default <K,V,R> GeneralStage<R> mapUsingIMap(@Nonnull IMap<K,V> iMap, @Nonnull FunctionEx<? super T,? extends K> lookupKeyFn, @Nonnull BiFunctionEx<? super T,? super V,? extends R> mapFn)
IMap
is performed and the result of the
lookup is merged with the item and emitted.
If the result of the mapping is null
, it emits nothing.
Therefore this stage can be used to implement filtering semantics as well.
The mapping logic is equivalent to:
K key = lookupKeyFn.apply(item);
V value = map.get(key);
return mapFn.apply(item, value);
This sample takes a stream of stock items and sets the detail
field on them by looking up from a registry:
items.mapUsingIMap(
enrichingMap,
item -> item.getDetailId(),
(item, detail) -> item.setDetail(detail)
)
See also GeneralStageWithKey.mapUsingIMap(java.lang.String, com.hazelcast.jet.function.BiFunctionEx<? super T, ? super V, ? extends R>)
for a partitioned version of
this operation.K
- type of the key in the IMap
V
- type of the value in the IMap
R
- type of the output itemiMap
- the IMap
to lookup fromlookupKeyFn
- a function which returns the key to look up in the
map. Must not return nullmapFn
- the mapping function@Nonnull <R> GeneralStage<R> rollingAggregate(@Nonnull AggregateOperation1<? super T,?,? extends R> aggrOp)
{2, 7, 8, -5}
, the output will be {2, 9, 17, 12}
(see
the example below). The number of input and output items is equal.
Sample usage:
stage.rollingAggregate(AggregateOperations.summing())
This stage is fault-tolerant and saves its state to the snapshot.
NOTE 1: since the output for each item depends on all
the previous items, this operation cannot be parallelized. Jet will
perform it on a single member, single-threaded. Jet also supports
keyed rolling aggregation
which it can parallelize by partitioning.
R
- result type of the aggregate operationaggrOp
- the aggregate operation to do the aggregation@Nonnull <K,T1_IN,T1,R> GeneralStage<R> hashJoin(@Nonnull BatchStage<T1_IN> stage1, @Nonnull JoinClause<K,? super T,? super T1_IN,? extends T1> joinClause1, @Nonnull BiFunctionEx<T,T1,R> mapToOutputFn)
package javadoc
for a detailed description of the hash-join transform.
This sample joins a stream of users to a stream of countries and outputs
a stream of users with the country
field set:
// Types of the input stages:
BatchStage<User> users;
BatchStage<Map.Entry<Long, Country>> idAndCountry;
users.hashJoin(
idAndCountry,
JoinClause.joinMapEntries(User::getCountryId),
(user, country) -> user.setCountry(country)
)
K
- the type of the join keyT1_IN
- the type of stage1
itemsT1
- the result type of projection on stage1
itemsR
- the resulting output typestage1
- the stage to hash-join with this onejoinClause1
- specifies how to join the two streamsmapToOutputFn
- function to map the joined items to the output value@Nonnull <K1,K2,T1_IN,T2_IN,T1,T2,R> GeneralStage<R> hashJoin2(@Nonnull BatchStage<T1_IN> stage1, @Nonnull JoinClause<K1,? super T,? super T1_IN,? extends T1> joinClause1, @Nonnull BatchStage<T2_IN> stage2, @Nonnull JoinClause<K2,? super T,? super T2_IN,? extends T2> joinClause2, @Nonnull TriFunction<T,T1,T2,R> mapToOutputFn)
package javadoc
for a detailed description of the hash-join transform.
This sample joins a stream of users to streams of countries and
companies, and outputs a stream of users with the country
and
company
fields set:
// Types of the input stages:
BatchStage<User> users;
BatchStage<Map.Entry<Long, Country>> idAndCountry;
BatchStage<Map.Entry<Long, Company>> idAndCompany;
users.hashJoin(
idAndCountry, JoinClause.joinMapEntries(User::getCountryId),
idAndCompany, JoinClause.joinMapEntries(User::getCompanyId),
(user, country, company) -> user.setCountry(country).setCompany(company)
)
K1
- the type of key for stage1
T1_IN
- the type of stage1
itemsT1
- the result type of projection of stage1
itemsK2
- the type of key for stage2
T2_IN
- the type of stage2
itemsT2
- the result type of projection of stage2
itemsR
- the resulting output typestage1
- the first stage to joinjoinClause1
- specifies how to join with stage1
stage2
- the second stage to joinjoinClause2
- specifies how to join with stage2
mapToOutputFn
- function to map the joined items to the output value@Nonnull GeneralHashJoinBuilder<T> hashJoinBuilder()
stage.hashJoinN(...)
calls because they offer
more static type safety.
This sample joins a stream of users to streams of countries and
companies, and outputs a stream of users with the country
and
company
fields set:
// Types of the input stages:
StreamStage<User> users;
BatchStage<Map.Entry<Long, Country>> idAndCountry;
BatchStage<Map.Entry<Long, Company>> idAndCompany;
StreamHashJoinBuilder<User> builder = users.hashJoinBuilder();
Tag<Country> tCountry = builder.add(idAndCountry,
JoinClause.joinMapEntries(User::getCountryId));
Tag<Company> tCompany = builder.add(idAndCompany,
JoinClause.joinMapEntries(User::getCompanyId));
StreamStage<User> joined = builder.build((user, itemsByTag) ->
user.setCountry(itemsByTag.get(tCountry)).setCompany(itemsByTag.get(tCompany)));
@Nonnull <K> GeneralStageWithKey<T,K> groupingKey(@Nonnull FunctionEx<? super T,? extends K> keyFn)
Sample usage:
users.groupingKey(User::getId)
Note: make sure the extracted key is not-null, it would fail the
job otherwise. Also make sure that it implements equals()
and
hashCode()
.
K
- type of the keykeyFn
- function that extracts the grouping key@Nonnull StreamStage<T> addTimestamps(@Nonnull ToLongFunctionEx<? super T> timestampFn, long allowedLag)
allowedLag
parameter controls by how much
the timestamp can be lower than the highest one observed so far. If
it is even lower, Jet will drop the item as being "too late".
For example, if the sequence of the timestamps is [1,4,3,2]
and
you configured the allowed lag as 1
, Jet will let through the
event with timestamp 3
, but it will drop the last one (timestamp
2
).
The amount of lag you configure strongly influences the latency of Jet's output. Jet cannot finalize the window until it knows it has observed all the events belonging to it, and the more lag it must tolerate, the longer will it have to wait for possible latecomers. On the other hand, if you don't allow enough lag, you face the risk of failing to account for the data that came in after the results were already emitted.
Sample usage:
events.addTimestamps(Event::getTimestamp, 1000)
Note: This method adds the timestamps after the source emitted
them. When timestamps are added at this moment, source partitions won't
be coalesced properly and will be treated as a single stream. The
allowed lag will need to cover for the additional disorder introduced by
merging the streams. The streams are merged in an unpredictable order
and it can happen, for example, that after the job was suspended for a
long time, there can be a very recent event in partition1 and a very old
event partition2. If partition1 happens to be merged first, the recent
event could render the old one late, if the allowed lag is not large
enough.
To add timestamps in source, use withTimestamps()
.
Warning: make sure the property you access in timestampFn
isn't null, it would fail the job. Also that there are no nonsensical
values such as -1, MIN_VALUE, 2100-01-01 etc - we'll treat those as real
timestamps and they can cause unspecified behaviour.
timestampFn
- a function that returns the timestamp for each item,
typically in millisecondsallowedLag
- the allowed lag behind the top observed timestamp.
Time unit is the same as the unit used by timestampFn
IllegalArgumentException
- if this stage already has timestamps@Nonnull SinkStage drainTo(@Nonnull Sink<? super T> sink)
You cannot reuse the sink in other drainTo
calls. If you want to
drain multiple stages to the same sink, use Pipeline.drainTo(com.hazelcast.jet.pipeline.Sink<? super T>, com.hazelcast.jet.pipeline.GeneralStage<? extends T>, com.hazelcast.jet.pipeline.GeneralStage<? extends T>, com.hazelcast.jet.pipeline.GeneralStage<? extends T>...)
.
This will be more efficient than creating a new sink each time.
@Nonnull GeneralStage<T> peek(@Nonnull PredicateEx<? super T> shouldLogFn, @Nonnull FunctionEx<? super T,? extends CharSequence> toStringFn)
shouldLogFn
predicate to see whether to log the item
toStringFn
to get the item's string
representation
com.hazelcast.jet.impl.processor.PeekWrappedP.<vertexName>#<processorIndex>
Sample usage:
users.peek(
user -> user.getName().size() > 100,
User::getName
)
shouldLogFn
- a function to filter the logged items. You can use alwaysTrue()
as a pass-through filter when you don't need any
filtering.toStringFn
- a function that returns a string representation of the itempeek(FunctionEx)
,
peek()
@Nonnull default GeneralStage<T> peek(@Nonnull FunctionEx<? super T,? extends CharSequence> toStringFn)
toStringFn
to get a string representation of the item
com.hazelcast.jet.impl.processor.PeekWrappedP.<vertexName>#<processorIndex>
Sample usage:
users.peek(User::getName)
toStringFn
- a function that returns a string representation of the itempeek(PredicateEx, FunctionEx)
,
peek()
@Nonnull default GeneralStage<T> peek()
toString()
method at the INFO level to the log category com.hazelcast.jet.impl.processor.PeekWrappedP.<vertexName>#<processorIndex>
.
The stage logs each item on whichever cluster member it happens to
receive it. Its primary purpose is for development use, when running Jet
on a local machine.peek(PredicateEx, FunctionEx)
,
peek(FunctionEx)
@Nonnull <R> GeneralStage<R> customTransform(@Nonnull String stageName, @Nonnull SupplierEx<Processor> procSupplier)
Processor
s.
Note that the type parameter of the returned stage is inferred from the call site and not propagated from the processor that will produce the result, so there is no actual type safety provided.
R
- the type of the output itemsstageName
- a human-readable name for the custom stageprocSupplier
- the supplier of processors@Nonnull <R> GeneralStage<R> customTransform(@Nonnull String stageName, @Nonnull ProcessorSupplier procSupplier)
Processor
s.
Note that the type parameter of the returned stage is inferred from the call site and not propagated from the processor that will produce the result, so there is no actual type safety provided.
R
- the type of the output itemsstageName
- a human-readable name for the custom stageprocSupplier
- the supplier of processors@Nonnull <R> GeneralStage<R> customTransform(@Nonnull String stageName, @Nonnull ProcessorMetaSupplier procSupplier)
Processor
s.
Note that the type parameter of the returned stage is inferred from the call site and not propagated from the processor that will produce the result, so there is no actual type safety provided.
R
- the type of the output itemsstageName
- a human-readable name for the custom stageprocSupplier
- the supplier of processorsCopyright © 2019 Hazelcast, Inc.. All rights reserved.