Interface GeneralStage<T>

Type Parameters:
T - the type of items coming out of this stage
All Superinterfaces:
Stage
All Known Subinterfaces:
BatchStage<T>, StreamStage<T>

public interface GeneralStage<T>
extends Stage
The common aspect of batch and stream pipeline stages, defining those operations that apply to both.

Unless specified otherwise, all functions passed to methods of this interface must be stateless.

Since:
3.0
  • Method Details

    • map

      @Nonnull <R> GeneralStage<R> map(@Nonnull FunctionEx<? super T, ? extends R> mapFn)
      Attaches a mapping stage which applies the given function to each input item independently and emits the function's result as the output item. If the result is null, it emits nothing. Therefore this stage can be used to implement filtering semantics as well.

      This sample takes a stream of names and outputs the names in lowercase:

      
       stage.map(name -> name.toLowerCase())
       
      Type Parameters:
      R - the result type of the mapping function
      Parameters:
      mapFn - a stateless mapping function
      Returns:
      the newly attached stage
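      The null-means-nothing rule can be modelled in plain Java. The sketch below is a hypothetical illustration only: the class MapStageModel is not part of Jet, and a java.util.List stands in for the items flowing through the stage. It shows how a null result from mapFn turns map into a filter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical plain-Java model of GeneralStage.map semantics; not Jet code.
// A List stands in for the stream of items flowing through the stage.
final class MapStageModel {
    private MapStageModel() {}

    // Applies mapFn to each item independently; a null result emits nothing.
    static <T, R> List<R> map(List<T> input, Function<? super T, ? extends R> mapFn) {
        List<R> output = new ArrayList<>();
        for (T item : input) {
            R result = mapFn.apply(item);
            if (result != null) {
                output.add(result);
            }
        }
        return output;
    }
}
```

      With this model, mapping "Alice", "" and "BOB" through name -> name.isEmpty() ? null : name.toLowerCase() yields alice and bob; the empty string produces no output item.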
    • filter

      @Nonnull GeneralStage<T> filter(@Nonnull PredicateEx<T> filterFn)
      Attaches a filtering stage which applies the provided predicate function to each input item to decide whether to pass the item to the output or to discard it.

      This sample removes empty strings from the stream:

      
       stage.filter(name -> !name.isEmpty())
       
      Parameters:
      filterFn - a stateless filter predicate function
      Returns:
      the newly attached stage
    • flatMap

      @Nonnull <R> GeneralStage<R> flatMap(@Nonnull FunctionEx<? super T, ? extends Traverser<R>> flatMapFn)
      Attaches a flat-mapping stage which applies the supplied function to each input item independently and emits all the items from the Traverser it returns. The traverser must be null-terminated: its next() method signals exhaustion by returning null, so the traverser cannot contain null items.

      This sample takes a stream of sentences and outputs a stream of individual words in them:

      
       stage.flatMap(sentence -> traverseArray(sentence.split("\\W+")))
       
      Type Parameters:
      R - the type of items in the result's traversers
      Parameters:
      flatMapFn - a stateless flat-mapping function whose result type is Jet's Traverser. It must not return a null traverser, but it can return an empty one.
      Returns:
      the newly attached stage
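      The null-terminated contract can be sketched in plain Java. In this hypothetical model, SimpleTraverser stands in for Jet's Traverser and FlatMapStageModel.traverseArray mimics Traversers.traverseArray; neither is Jet code. The flat-map loop drains each returned traverser until next() yields null.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for Jet's Traverser (not Jet code):
// next() returns the next item, or null once the traverser is exhausted.
@FunctionalInterface
interface SimpleTraverser<T> {
    T next();
}

// Hypothetical plain-Java model of GeneralStage.flatMap semantics.
final class FlatMapStageModel {
    private FlatMapStageModel() {}

    // Mimics Traversers.traverseArray: yields the elements in order, then null.
    // The array must not contain nulls, because a null would end the traversal early.
    static <T> SimpleTraverser<T> traverseArray(T[] array) {
        int[] index = {0};
        return () -> index[0] < array.length ? array[index[0]++] : null;
    }

    // Emits every item of each returned traverser, draining it until next() yields null.
    static <T, R> List<R> flatMap(List<T> input,
                                  Function<? super T, ? extends SimpleTraverser<R>> flatMapFn) {
        List<R> output = new ArrayList<>();
        for (T item : input) {
            SimpleTraverser<R> traverser = flatMapFn.apply(item);
            R next;
            while ((next = traverser.next()) != null) {
                output.add(next);
            }
        }
        return output;
    }
}
```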
    • mapStateful

      @Nonnull <S, R> GeneralStage<R> mapStateful(@Nonnull SupplierEx<? extends S> createFn, @Nonnull BiFunctionEx<? super S, ? super T, ? extends R> mapFn)
      Attaches a stage that performs a stateful mapping operation. createFn returns the object that holds the state. Jet passes this object along with each input item to mapFn, which can update the object's state. The state object will be included in the state snapshot, so it survives job restarts. For this reason it must be serializable.

      This sample takes a stream of long numbers representing request latencies, computes the cumulative latency of all requests so far, and starts emitting alarm messages when the cumulative latency crosses a "bad behavior" threshold:

      
       StreamStage<Long> latencyAlarms = latencies.mapStateful(
               LongAccumulator::new,
               (sum, latency) -> {
                   sum.add(latency);
                   long cumulativeLatency = sum.get();
                   return (cumulativeLatency <= LATENCY_THRESHOLD)
                           ? null
                           : cumulativeLatency;
               }
       );
       
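      Up to the threshold check, this code computes the same running sum as latencies.rollingAggregate(summing()); the difference is that it emits nothing while the cumulative latency stays at or below LATENCY_THRESHOLD.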
      Type Parameters:
      S - type of the state object
      R - type of the result
      Parameters:
      createFn - function that returns the state object
      mapFn - function that receives the state object and the input item and outputs the result item. It may modify the state object.
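      The following plain-Java sketch models the latency-alarm sample. It is an illustration under stated assumptions, not Jet's implementation: the hypothetical Sum holder replaces Jet's LongAccumulator, the threshold is a parameter instead of the LATENCY_THRESHOLD constant, and nothing is snapshotted or serialized.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Supplier;

// Hypothetical plain-Java model of GeneralStage.mapStateful; not Jet code.
// Unlike Jet, nothing here is snapshotted, so the state is neither serialized nor fault-tolerant.
final class MapStatefulModel {
    private MapStatefulModel() {}

    // Creates one state object up front and passes it to mapFn with every item.
    // As with map(), a null result emits nothing.
    static <S, T, R> List<R> mapStateful(List<T> input, Supplier<? extends S> createFn,
                                         BiFunction<? super S, ? super T, ? extends R> mapFn) {
        S state = createFn.get();
        List<R> output = new ArrayList<>();
        for (T item : input) {
            R result = mapFn.apply(state, item);
            if (result != null) {
                output.add(result);
            }
        }
        return output;
    }

    // Hypothetical mutable holder standing in for Jet's LongAccumulator.
    static final class Sum {
        long value;
    }

    // The latency-alarm sample; threshold plays the role of LATENCY_THRESHOLD.
    static List<Long> latencyAlarms(List<Long> latencies, long threshold) {
        return mapStateful(latencies, Sum::new, (sum, latency) -> {
            sum.value += latency;
            return sum.value <= threshold ? null : Long.valueOf(sum.value);
        });
    }
}
```

      For latencies 10, 20, 30 and 50 with a threshold of 50, the running sums are 10, 30, 60 and 110, so only 60 and 110 are emitted.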
    • filterStateful

      @Nonnull <S> GeneralStage<T> filterStateful(@Nonnull SupplierEx<? extends S> createFn, @Nonnull BiPredicateEx<? super S, ? super T> filterFn)
      Attaches a stage that performs a stateful filtering operation. createFn returns the object that holds the state. Jet passes this object along with each input item to filterFn, which can update the object's state. The state object will be included in the state snapshot, so it survives job restarts. For this reason it must be serializable.

      This sample decimates the input (throws out every 10th item):

      
       GeneralStage<String> decimated = input.filterStateful(
               LongAccumulator::new,
               (counter, item) -> {
                   counter.add(1);
                   return counter.get() % 10 != 0;
               }
       );
       
      Type Parameters:
      S - type of the state object
      Parameters:
      createFn - function that returns the state object
      filterFn - function that receives the state object and the input item and produces the boolean result. It may modify the state object.
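      A plain-Java model of the decimation sample, under similar assumptions: the hypothetical Counter class replaces LongAccumulator and a List stands in for the stage. One state object is created up front and consulted for every item; this is a sketch, not Jet code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;
import java.util.function.Supplier;

// Hypothetical plain-Java model of GeneralStage.filterStateful; not Jet code.
final class FilterStatefulModel {
    private FilterStatefulModel() {}

    // One state object is created up front and passed to filterFn with every item.
    static <S, T> List<T> filterStateful(List<T> input, Supplier<? extends S> createFn,
                                         BiPredicate<? super S, ? super T> filterFn) {
        S state = createFn.get();
        List<T> output = new ArrayList<>();
        for (T item : input) {
            if (filterFn.test(state, item)) {
                output.add(item);
            }
        }
        return output;
    }

    // Hypothetical counter standing in for Jet's LongAccumulator.
    static final class Counter {
        long count;
    }

    // Decimation: drops the 10th, 20th, 30th, ... item and keeps the rest.
    static <T> List<T> decimate(List<T> input) {
        return filterStateful(input, Counter::new, (counter, item) -> ++counter.count % 10 != 0);
    }
}
```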
    • flatMapStateful

      @Nonnull <S, R> GeneralStage<R> flatMapStateful(@Nonnull SupplierEx<? extends S> createFn, @Nonnull BiFunctionEx<? super S, ? super T, ? extends Traverser<R>> flatMapFn)
      Attaches a stage that performs a stateful flat-mapping operation. createFn returns the object that holds the state. Jet passes this object along with each input item to flatMapFn, which can update the object's state. The state object will be included in the state snapshot, so it survives job restarts. For this reason it must be serializable.

      This sample inserts a punctuation mark (a special string) after every 10th input string:

      
       GeneralStage<String> punctuated = input.flatMapStateful(
               LongAccumulator::new,
               (counter, item) -> {
                   counter.add(1);
                   return counter.get() % 10 == 0
                           ? Traversers.traverseItems(item, "punctuation")
                           : Traversers.singleton(item);
               }
       );
       
      Type Parameters:
      S - type of the state object
      R - type of the result
      Parameters:
      createFn - function that returns the state object
      flatMapFn - function that receives the state object and the input item and outputs the result items. It may modify the state object. It must not return a null traverser, but it can return an empty one.
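      A plain-Java model of the punctuation sample, with a List standing in for the Traverser that flatMapFn returns and a hypothetical Counter in place of LongAccumulator. It is a sketch, not Jet code; emitting the item before the marker is what places the marker after every 10th item.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Supplier;

// Hypothetical plain-Java model of GeneralStage.flatMapStateful; not Jet code.
// A List stands in for the Traverser that the real flatMapFn returns.
final class FlatMapStatefulModel {
    private FlatMapStatefulModel() {}

    // One state object is created up front; every item of each returned list is emitted.
    static <S, T, R> List<R> flatMapStateful(List<T> input, Supplier<? extends S> createFn,
                                             BiFunction<? super S, ? super T, ? extends List<R>> flatMapFn) {
        S state = createFn.get();
        List<R> output = new ArrayList<>();
        for (T item : input) {
            output.addAll(flatMapFn.apply(state, item));
        }
        return output;
    }

    // Hypothetical counter standing in for Jet's LongAccumulator.
    static final class Counter {
        long count;
    }

    // The item comes first and the marker second, so the marker follows every 10th item.
    static List<String> punctuate(List<String> input) {
        return flatMapStateful(input, Counter::new, (counter, item) ->
                ++counter.count % 10 == 0 ? List.of(item, "punctuation") : List.of(item));
    }
}
```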
    • rollingAggregate

      @Nonnull default <A, R> GeneralStage<R> rollingAggregate(@Nonnull AggregateOperation1<? super T, A, ? extends R> aggrOp)
      Attaches a rolling aggregation stage. This is a special case of stateful mapping that uses an AggregateOperation. It passes each input item to the accumulator and outputs the current result of aggregation (as returned by the export primitive).

      Sample usage:

      
       stage.rollingAggregate(AggregateOperations.summing())
       
      For example, if your input is {2, 7, 8, -5}, the output will be {2, 9, 17, 12}.

      This stage is fault-tolerant and saves its state to the snapshot.

      NOTE: since the output for each item depends on all the previous items, this operation cannot be parallelized. Jet will perform it on a single member, single-threaded. Jet also supports keyed rolling aggregation (on a stage obtained from groupingKey), which it can parallelize by partitioning.

      Type Parameters:
      A - the type of the accumulator
      R - result type of the aggregate operation
      Parameters:
      aggrOp - the aggregate operation to do the aggregation
      Returns:
      the newly attached stage
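      The rolling semantics can be modelled with the create, accumulate and export primitives passed as plain functions. This is a hypothetical sketch, not Jet's AggregateOperation API; with a running sum it reproduces the {2, 7, 8, -5} to {2, 9, 17, 12} example above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical plain-Java model of rollingAggregate; not Jet's AggregateOperation API.
// The create, accumulate and export primitives are passed as plain functions.
final class RollingAggregateModel {
    private RollingAggregateModel() {}

    // A single accumulator sees every item; the exported result is emitted after each one.
    static <T, A, R> List<R> rollingAggregate(List<T> input,
                                              Supplier<A> createFn,
                                              BiConsumer<? super A, ? super T> accumulateFn,
                                              Function<? super A, ? extends R> exportFn) {
        A accumulator = createFn.get();
        List<R> output = new ArrayList<>();
        for (T item : input) {
            accumulateFn.accept(accumulator, item);
            output.add(exportFn.apply(accumulator));
        }
        return output;
    }

    // Hypothetical mutable accumulator for a running sum.
    static final class LongSum {
        long value;
    }

    static List<Long> runningSum(List<Long> input) {
        return rollingAggregate(input, LongSum::new, (acc, x) -> acc.value += x, acc -> acc.value);
    }
}
```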
    • mapUsingService

      @Nonnull <S, R> GeneralStage<R> mapUsingService(@Nonnull ServiceFactory<?, S> serviceFactory, @Nonnull BiFunctionEx<? super S, ? super T, ? extends R> mapFn)
      Attaches a mapping stage which applies the supplied function to each input item independently and emits the function's result as the output item. The mapping function receives another parameter, the service object, which Jet will create using the supplied serviceFactory.

      If the mapping result is null, it emits nothing. Therefore this stage can be used to implement filtering semantics as well.

      This sample takes a stream of stock items and sets the detail field on them by looking up from a registry:

      
       stage.mapUsingService(
           ServiceFactories.sharedService(ctx -> new ItemDetailRegistry(ctx.jetInstance())),
           (reg, item) -> item.setDetail(reg.fetchDetail(item))
       )
       

      Interaction with fault-tolerant unbounded jobs

      If you use this stage in a fault-tolerant unbounded job, keep in mind that any state the service object maintains doesn't participate in Jet's fault tolerance protocol. If the state is local, it will be lost after a job restart; if it is saved to some durable storage, the state of that storage won't be rewound to the last checkpoint, so you'll perform duplicate updates.
      Type Parameters:
      S - type of service object
      R - the result type of the mapping function
      Parameters:
      serviceFactory - the service factory
      mapFn - a stateless mapping function
      Returns:
      the newly attached stage
    • mapUsingServiceAsync

      @Nonnull default <S, R> GeneralStage<R> mapUsingServiceAsync(@Nonnull ServiceFactory<?, S> serviceFactory, @Nonnull BiFunctionEx<? super S, ? super T, ? extends CompletableFuture<R>> mapAsyncFn)
      Asynchronous version of mapUsingService(ServiceFactory, BiFunctionEx): the mapAsyncFn returns a CompletableFuture<R> instead of just R.

      This overload uses default values for the extra parameters: at most 4 concurrent async operations per processor, and the order of input items is preserved.

      The function can return a null future, or the future can complete with a null result; in both cases the stage emits nothing for that item, just like a filter.

      The latency of the async call will add to the total latency of the output.

      This sample takes a stream of stock items and sets the detail field on them by looking up from a registry:

      
       stage.mapUsingServiceAsync(
           ServiceFactories.sharedService(ctx -> new ItemDetailRegistry(ctx.jetInstance())),
           (reg, item) -> reg.fetchDetailAsync(item)
                             .thenApply(detail -> item.setDetail(detail))
       )
       

      Interaction with fault-tolerant unbounded jobs

      If you use this stage in a fault-tolerant unbounded job, keep in mind that any state the service object maintains doesn't participate in Jet's fault tolerance protocol. If the state is local, it will be lost after a job restart; if it is saved to some durable storage, the state of that storage won't be rewound to the last checkpoint, so you'll perform duplicate updates.
      Type Parameters:
      S - type of service object
      R - the future result type of the mapping function
      Parameters:
      serviceFactory - the service factory
      mapAsyncFn - a stateless mapping function. Can map to null (return a null future)
      Returns:
      the newly attached stage
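      A plain-Java sketch of the default behavior: at most 4 calls in flight, order preserved, and a null future or null result filtered out. It is a simplified model under stated assumptions, not how a Jet processor schedules work: the hypothetical AsyncOrderedModel omits the service object and, once the limit is reached, blocks on the oldest pending future before starting a new call, which keeps the output in input order.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical, simplified model of mapUsingServiceAsync with preserveOrder = true; not Jet code.
// The service object is omitted: mapAsyncFn takes only the item.
final class AsyncOrderedModel {
    private AsyncOrderedModel() {}

    // Keeps at most maxConcurrentOps futures in flight and emits results in input order.
    // A null future, or a future that completes with null, emits nothing.
    static <T, R> List<R> mapAsyncOrdered(List<T> input, int maxConcurrentOps,
                                          Function<? super T, ? extends CompletableFuture<R>> mapAsyncFn) {
        Deque<CompletableFuture<R>> inFlight = new ArrayDeque<>();
        List<R> output = new ArrayList<>();
        for (T item : input) {
            if (inFlight.size() == maxConcurrentOps) {
                emitOldest(inFlight, output); // block until the oldest call completes
            }
            CompletableFuture<R> future = mapAsyncFn.apply(item);
            // ArrayDeque rejects nulls, so a null future is queued as an already-completed null result.
            inFlight.addLast(future != null ? future : CompletableFuture.completedFuture(null));
        }
        while (!inFlight.isEmpty()) {
            emitOldest(inFlight, output);
        }
        return output;
    }

    private static <R> void emitOldest(Deque<CompletableFuture<R>> inFlight, List<R> output) {
        R result = inFlight.removeFirst().join();
        if (result != null) {
            output.add(result);
        }
    }
}
```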
    • mapUsingServiceAsync

      @Nonnull <S, R> GeneralStage<R> mapUsingServiceAsync(@Nonnull ServiceFactory<?, S> serviceFactory, int maxConcurrentOps, boolean preserveOrder, @Nonnull BiFunctionEx<? super S, ? super T, ? extends CompletableFuture<R>> mapAsyncFn)
      Asynchronous version of mapUsingService(ServiceFactory, BiFunctionEx): the mapAsyncFn returns a CompletableFuture<R> instead of just R.

      The function can return a null future, or the future can complete with a null result; in both cases the stage emits nothing for that item, just like a filter.

      The latency of the async call will add to the total latency of the output.

      This sample takes a stream of stock items and sets the detail field on them by looking up from a registry:

      
       stage.mapUsingServiceAsync(
           ServiceFactories.sharedService(ctx -> new ItemDetailRegistry(ctx.jetInstance())),
           (reg, item) -> reg.fetchDetailAsync(item)
                             .thenApply(detail -> item.setDetail(detail))
       )
       

      Interaction with fault-tolerant unbounded jobs

      If you use this stage in a fault-tolerant unbounded job, keep in mind that any state the service object maintains doesn't participate in Jet's fault tolerance protocol. If the state is local, it will be lost after a job restart; if it is saved to some durable storage, the state of that storage won't be rewound to the last checkpoint, so you'll perform duplicate updates.
      Type Parameters:
      S - type of service object
      R - the future result type of the mapping function
      Parameters:
      serviceFactory - the service factory
      maxConcurrentOps - maximum number of concurrent async operations per processor
      preserveOrder - whether the ordering of the input items should be preserved
      mapAsyncFn - a stateless mapping function. Can map to null (return a null future)
      Returns:
      the newly attached stage
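      The effect of preserveOrder = false can be modelled by collecting each result as soon as its future completes. In this hypothetical sketch (not Jet code, and without the service object), a Semaphore caps the calls in flight at maxConcurrentOps, and the output order depends on completion timing rather than on input order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

// Hypothetical model of mapUsingServiceAsync with preserveOrder = false; not Jet code.
final class AsyncUnorderedModel {
    private AsyncUnorderedModel() {}

    // Each result is collected as soon as its future completes,
    // and a semaphore caps the number of calls in flight at maxConcurrentOps.
    static <T, R> List<R> mapAsyncUnordered(List<T> input, int maxConcurrentOps,
            Function<? super T, ? extends CompletableFuture<R>> mapAsyncFn) {
        Semaphore permits = new Semaphore(maxConcurrentOps);
        List<R> output = Collections.synchronizedList(new ArrayList<>());
        List<CompletableFuture<R>> pending = new ArrayList<>();
        for (T item : input) {
            permits.acquireUninterruptibly(); // wait while maxConcurrentOps calls are in flight
            CompletableFuture<R> future = mapAsyncFn.apply(item);
            if (future == null) {             // a null future emits nothing
                permits.release();
                continue;
            }
            pending.add(future.whenComplete((result, error) -> {
                if (error == null && result != null) {
                    output.add(result);       // completion order, not input order
                }
                permits.release();
            }));
        }
        // Wait for every callback; a failed call makes join() throw.
        CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0])).join();
        return new ArrayList<>(output);
    }
}
```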
    • mapUsingServiceAsyncBatched

      @Nonnull <S, R> GeneralStage<R> mapUsingServiceAsyncBatched(@Nonnull ServiceFactory<?, S> serviceFactory, int maxBatchSize, @Nonnull BiFunctionEx<? super S, ? super List<T>, ? extends CompletableFuture<List<R>>> mapAsyncFn)
      Batched version of mapUsingServiceAsync(ServiceFactory, BiFunctionEx): mapAsyncFn takes a list of input items and returns a CompletableFuture<List<R>>. The size of the input list is limited by the given maxBatchSize.

      At most 2 batches can be in flight (being completed asynchronously) at any time, and this mapping operation always preserves the order of input items.

      This transform can perform filtering by putting null elements into the output list.

      The latency of the async call will add to the total latency of the output.

      This sample takes a stream of stock items and sets the detail field on them by performing batched lookups from a registry, with at most 100 items per batch:

      
       stage.mapUsingServiceAsyncBatched(
           ServiceFactories.sharedService(ctx -> new ItemDetailRegistry(ctx.jetInstance())),
           100,
           (reg, itemList) -> reg
                   .fetchDetailsAsync(itemList)
                   .thenApply(detailList -> {
                       for (int i = 0; i < itemList.size(); i++) {
                           itemList.get(i).setDetail(detailList.get(i));
                       }
                       return itemList; // the enriched items form the output list
                   })
       )
       

      Interaction with fault-tolerant unbounded jobs

      If you use this stage in a fault-tolerant unbounded job, keep in mind that any state the service object maintains doesn't participate in Jet's fault tolerance protocol. If the state is local, it will be lost after a job restart; if it is saved to some durable storage, the state of that storage won't be rewound to the last checkpoint, so you'll perform duplicate updates.
      Type Parameters:
      S - type of service object
      R - the type of the items in the result list
      Parameters:
      serviceFactory - the service factory
      maxBatchSize - max size of the input list
      mapAsyncFn - a stateless mapping function
      Returns:
      the newly attached stage
      Since:
      4.0
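      A hypothetical plain-Java model of the batched variant: the input is cut into consecutive batches of at most maxBatchSize items, at most 2 batches are in flight, results are emitted in input order, and null elements of a result list are dropped. The fixed-size chunking and the omitted service object are simplifications made for this sketch; it is not Jet code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical model of mapUsingServiceAsyncBatched; not Jet code.
final class AsyncBatchedModel {
    private AsyncBatchedModel() {}

    static final int MAX_IN_FLIGHT_BATCHES = 2;

    // Splits the input into consecutive batches of at most maxBatchSize items, keeps at most
    // two batches in flight, and emits results in input order, skipping null elements.
    static <T, R> List<R> mapAsyncBatched(List<T> input, int maxBatchSize,
            Function<? super List<T>, ? extends CompletableFuture<List<R>>> mapAsyncFn) {
        Deque<CompletableFuture<List<R>>> inFlight = new ArrayDeque<>();
        List<R> output = new ArrayList<>();
        for (int start = 0; start < input.size(); start += maxBatchSize) {
            List<T> batch = input.subList(start, Math.min(start + maxBatchSize, input.size()));
            if (inFlight.size() == MAX_IN_FLIGHT_BATCHES) {
                drainOldest(inFlight, output); // wait for the oldest batch
            }
            inFlight.addLast(mapAsyncFn.apply(batch));
        }
        while (!inFlight.isEmpty()) {
            drainOldest(inFlight, output);
        }
        return output;
    }

    private static <R> void drainOldest(Deque<CompletableFuture<List<R>>> inFlight, List<R> output) {
        for (R result : inFlight.removeFirst().join()) {
            if (result != null) {
                output.add(result);
            }
        }
    }
}
```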
    • filterUsingService

      @Nonnull <S> GeneralStage<T> filterUsingService(@Nonnull ServiceFactory<?, S> serviceFactory, @Nonnull BiPredicateEx<? super S, ? super T> filterFn)
      Attaches a filtering stage which applies the provided predicate function to each input item to decide whether to pass the item to the output or to discard it. The predicate function receives another parameter, the service object, which Jet will create using the supplied serviceFactory.

      This sample takes a stream of photos, uses an image classifier to reason about their contents, and keeps only photos of cats:

      
       photos.filterUsingService(
           ServiceFactories.sharedService(ctx -> new ImageClassifier(ctx.jetInstance())),
           (classifier, photo) -> classifier.classify(photo).equals("cat")
       )
       

      Interaction with fault-tolerant unbounded jobs

      If you use this stage in a fault-tolerant unbounded job, keep in mind that any state the service object maintains doesn't participate in Jet's fault tolerance protocol. If the state is local, it will be lost after a job restart; if it is saved to some durable storage, the state of that storage won't be rewound to the last checkpoint, so you'll perform duplicate updates.
      Type Parameters:
      S - type of service object
      Parameters:
      serviceFactory - the service factory
      filterFn - a stateless filter predicate function
      Returns:
      the newly attached stage
    • flatMapUsingService

      @Nonnull <S, R> GeneralStage<R> flatMapUsingService(@Nonnull ServiceFactory<?, S> serviceFactory, @Nonnull BiFunctionEx<? super S, ? super T, ? extends Traverser<R>> flatMapFn)
      Attaches a flat-mapping stage which applies the supplied function to each input item independently and emits all items from the Traverser it returns as the output items. The traverser must be null-terminated. The mapping function receives another parameter, the service object, which Jet will create using the supplied serviceFactory.

      This sample takes a stream of products and outputs an "exploded" stream of all the parts that go into making them:

      
       StreamStage<Part> parts = products.flatMapUsingService(
           ServiceFactories.sharedService(ctx -> new PartRegistryCtx()),
           (registry, product) -> Traversers.traverseIterable(
                                      registry.fetchParts(product))
       );
       

      Interaction with fault-tolerant unbounded jobs

      If you use this stage in a fault-tolerant unbounded job, keep in mind that any state the service object maintains doesn't participate in Jet's fault tolerance protocol. If the state is local, it will be lost after a job restart; if it is saved to some durable storage, the state of that storage won't be rewound to the last checkpoint, so you'll perform duplicate updates.
      Type Parameters:
      S - type of service object
      R - the type of items in the result's traversers
      Parameters:
      serviceFactory - the service factory
      flatMapFn - a stateless flat-mapping function whose result type is Jet's Traverser. It must not return a null traverser, but it can return an empty one.
      Returns:
      the newly attached stage
    • mapUsingReplicatedMap

      @Nonnull default <K, V, R> GeneralStage<R> mapUsingReplicatedMap(@Nonnull String mapName, @Nonnull FunctionEx<? super T, ? extends K> lookupKeyFn, @Nonnull BiFunctionEx<? super T, ? super V, ? extends R> mapFn)
      Attaches a mapping stage that, for each item, looks up a value in the ReplicatedMap with the supplied name, merges the lookup result with the item and emits the merged result.

      If the result of the mapping is null, it emits nothing. Therefore this stage can be used to implement filtering semantics as well.

      The mapping logic is equivalent to:

      
       K key = lookupKeyFn.apply(item);
       V value = replicatedMap.get(key);
       return mapFn.apply(item, value);
       
      This sample takes a stream of stock items and sets the detail field on them by looking up from a registry:
      
       items.mapUsingReplicatedMap(
           "enriching-map",
           item -> item.getDetailId(),
           (Item item, ItemDetail detail) -> item.setDetail(detail)
       )
       
      Type Parameters:
      K - type of the key in the ReplicatedMap
      V - type of the value in the ReplicatedMap
      R - type of the output item
      Parameters:
      mapName - name of the ReplicatedMap
      lookupKeyFn - a function which returns the key to look up in the map. Must not return null
      mapFn - the mapping function
      Returns:
      the newly attached stage
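      The lookup logic above can be run against an ordinary java.util.Map standing in for the ReplicatedMap. The class MapLookupModel is hypothetical and not Jet code; it only models the key extraction, the lookup, the merge, and the filtering of null results.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical model of the map-lookup stages; a plain java.util.Map stands in for the cluster map.
final class MapLookupModel {
    private MapLookupModel() {}

    // For each item: compute the key, look it up, merge the (possibly null) value with the item.
    // A null merge result emits nothing.
    static <T, K, V, R> List<R> mapUsingMap(List<T> input, Map<K, V> map,
                                           Function<? super T, ? extends K> lookupKeyFn,
                                           BiFunction<? super T, ? super V, ? extends R> mapFn) {
        List<R> output = new ArrayList<>();
        for (T item : input) {
            K key = Objects.requireNonNull(lookupKeyFn.apply(item), "lookupKeyFn must not return null");
            V value = map.get(key);
            R result = mapFn.apply(item, value);
            if (result != null) {
                output.add(result);
            }
        }
        return output;
    }
}
```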
    • mapUsingReplicatedMap

      @Nonnull default <K, V, R> GeneralStage<R> mapUsingReplicatedMap(@Nonnull ReplicatedMap<K, V> replicatedMap, @Nonnull FunctionEx<? super T, ? extends K> lookupKeyFn, @Nonnull BiFunctionEx<? super T, ? super V, ? extends R> mapFn)
      Attaches a mapping stage that, for each item, looks up a value in the supplied ReplicatedMap, merges the lookup result with the item and emits the merged result.

      If the result of the mapping is null, it emits nothing. Therefore this stage can be used to implement filtering semantics as well.

      The mapping logic is equivalent to:

      
       K key = lookupKeyFn.apply(item);
       V value = replicatedMap.get(key);
       return mapFn.apply(item, value);
       
      This sample takes a stream of stock items and sets the detail field on them by looking up from a registry:
      
       items.mapUsingReplicatedMap(
           enrichingMap,
           item -> item.getDetailId(),
           (item, detail) -> item.setDetail(detail)
       )
       
      Type Parameters:
      K - type of the key in the ReplicatedMap
      V - type of the value in the ReplicatedMap
      R - type of the output item
      Parameters:
      replicatedMap - the ReplicatedMap to look up from
      lookupKeyFn - a function which returns the key to look up in the map. Must not return null
      mapFn - the mapping function
      Returns:
      the newly attached stage
    • mapUsingIMap

      @Nonnull default <K, V, R> GeneralStage<R> mapUsingIMap(@Nonnull String mapName, @Nonnull FunctionEx<? super T, ? extends K> lookupKeyFn, @Nonnull BiFunctionEx<? super T, ? super V, ? extends R> mapFn)
      Attaches a mapping stage that, for each item, looks up a value in the IMap with the supplied name, merges the lookup result with the item and emits the merged result.

      If the result of the mapping is null, it emits nothing. Therefore this stage can be used to implement filtering semantics as well.

      The mapping logic is equivalent to:

      
       K key = lookupKeyFn.apply(item);
       V value = map.get(key);
       return mapFn.apply(item, value);
       
      This sample takes a stream of stock items and sets the detail field on them by looking up from a registry:
      
       items.mapUsingIMap(
           "enriching-map",
           item -> item.getDetailId(),
           (Item item, ItemDetail detail) -> item.setDetail(detail)
       )
       
      See also GeneralStageWithKey.mapUsingIMap(String, BiFunctionEx) for a partitioned version of this operation.
      Type Parameters:
      K - type of the key in the IMap
      V - type of the value in the IMap
      R - type of the output item
      Parameters:
      mapName - name of the IMap
      lookupKeyFn - a function which returns the key to look up in the map. Must not return null
      mapFn - the mapping function
      Returns:
      the newly attached stage
    • mapUsingIMap

      @Nonnull default <K, V, R> GeneralStage<R> mapUsingIMap(@Nonnull IMap<K, V> iMap, @Nonnull FunctionEx<? super T, ? extends K> lookupKeyFn, @Nonnull BiFunctionEx<? super T, ? super V, ? extends R> mapFn)
      Attaches a mapping stage that, for each item, looks up a value in the supplied IMap, merges the lookup result with the item and emits the merged result.

      If the result of the mapping is null, it emits nothing. Therefore this stage can be used to implement filtering semantics as well.

      The mapping logic is equivalent to:

      
       K key = lookupKeyFn.apply(item);
       V value = map.get(key);
       return mapFn.apply(item, value);
       
      This sample takes a stream of stock items and sets the detail field on them by looking up from a registry:
      
       items.mapUsingIMap(
           enrichingMap,
           item -> item.getDetailId(),
           (item, detail) -> item.setDetail(detail)
       )
       
      See also GeneralStageWithKey.mapUsingIMap(String, BiFunctionEx) for a partitioned version of this operation.
      Type Parameters:
      K - type of the key in the IMap
      V - type of the value in the IMap
      R - type of the output item
      Parameters:
      iMap - the IMap to look up from
      lookupKeyFn - a function which returns the key to look up in the map. Must not return null
      mapFn - the mapping function
      Returns:
      the newly attached stage
    • hashJoin

      @Nonnull <K, T1_IN, T1, R> GeneralStage<R> hashJoin(@Nonnull BatchStage<T1_IN> stage1, @Nonnull JoinClause<K, ? super T, ? super T1_IN, ? extends T1> joinClause1, @Nonnull BiFunctionEx<T, T1, R> mapToOutputFn)
      Attaches to both this and the supplied stage a hash-joining stage and returns it. This stage plays the role of the primary stage in the hash-join. Please refer to the package javadoc for a detailed description of the hash-join transform.

      This sample joins a stream of users to a stream of countries and outputs a stream of users with the country field set:

      
       // Types of the input stages:
       BatchStage<User> users;
       BatchStage<Map.Entry<Long, Country>> idAndCountry;
      
       users.hashJoin(
           idAndCountry,
           JoinClause.joinMapEntries(User::getCountryId),
           (user, country) -> user.setCountry(country)
       )
       
      Type Parameters:
      K - the type of the join key
      T1_IN - the type of stage1 items
      T1 - the result type of projection on stage1 items
      R - the resulting output type
      Parameters:
      stage1 - the stage to hash-join with this one
      joinClause1 - specifies how to join the two streams
      mapToOutputFn - function to map the joined items to the output value
      Returns:
      the newly attached stage
    • innerHashJoin

      @Nonnull <K, T1_IN, T1, R> GeneralStage<R> innerHashJoin(@Nonnull BatchStage<T1_IN> stage1, @Nonnull JoinClause<K, ? super T, ? super T1_IN, ? extends T1> joinClause1, @Nonnull BiFunctionEx<T, T1, R> mapToOutputFn)
      Attaches to both this and the supplied stage an inner hash-joining stage and returns it. This stage plays the role of the primary stage in the hash-join. Please refer to the package javadoc for a detailed description of the hash-join transform.

      This sample joins a stream of users to a stream of countries and outputs a stream of users with the country field set:

      
       // Types of the input stages:
       BatchStage<User> users;
       BatchStage<Map.Entry<Long, Country>> idAndCountry;
      
       users.innerHashJoin(
           idAndCountry,
           JoinClause.joinMapEntries(User::getCountryId),
           (user, country) -> user.setCountry(country)
       )
       

      This method is similar to hashJoin(BatchStage, JoinClause, BiFunctionEx), but it guarantees that both items passed to mapToOutputFn are non-null: a primary item without a match in stage1 is filtered out before it reaches mapToOutputFn.

      Type Parameters:
      K - the type of the join key
      T1_IN - the type of stage1 items
      T1 - the result type of projection on stage1 items
      R - the resulting output type
      Parameters:
      stage1 - the stage to hash-join with this one
      joinClause1 - specifies how to join the two streams
      mapToOutputFn - function to map the joined items to the output value
      Returns:
      the newly attached stage
      Since:
      4.1
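      The difference between hashJoin and innerHashJoin can be modelled in plain Java. In this hypothetical sketch (not Jet's implementation), the enriching entries are loaded into a HashMap and each primary item is looked up by its key; without a match, the outer variant passes null to mapToOutputFn while the inner variant drops the item. Duplicate keys are not modelled: the last entry for a key wins.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical plain-Java model of hashJoin / innerHashJoin; not Jet's implementation.
// Duplicate keys in the enriching entries are not modelled: the last entry for a key wins.
final class HashJoinModel {
    private HashJoinModel() {}

    static <T, K, T1, R> List<R> hashJoin(List<T> primary,
                                          List<Map.Entry<K, T1>> enriching,
                                          Function<? super T, ? extends K> primaryKeyFn,
                                          BiFunction<? super T, ? super T1, ? extends R> mapToOutputFn,
                                          boolean inner) {
        // Build phase: load the enriching side into a hash table.
        Map<K, T1> table = new HashMap<>();
        for (Map.Entry<K, T1> entry : enriching) {
            table.put(entry.getKey(), entry.getValue());
        }
        // Probe phase: look up each primary item by its join key.
        List<R> output = new ArrayList<>();
        for (T item : primary) {
            T1 match = table.get(primaryKeyFn.apply(item));
            if (match == null && inner) {
                continue; // inner join: an item without a match is dropped
            }
            output.add(mapToOutputFn.apply(item, match)); // outer join: match may be null
        }
        return output;
    }
}
```

      For example, joining the country IDs 1, 3 and 2 against entries for 1 and 2 yields three outputs in the outer variant (with null for ID 3) and two in the inner variant.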
    • hashJoin2

      @Nonnull <K1, K2, T1_IN, T2_IN, T1, T2, R> GeneralStage<R> hashJoin2(@Nonnull BatchStage<T1_IN> stage1, @Nonnull JoinClause<K1, ? super T, ? super T1_IN, ? extends T1> joinClause1, @Nonnull BatchStage<T2_IN> stage2, @Nonnull JoinClause<K2, ? super T, ? super T2_IN, ? extends T2> joinClause2, @Nonnull TriFunction<T, T1, T2, R> mapToOutputFn)
      Attaches to this and the two supplied stages a hash-joining stage and returns it. This stage plays the role of the primary stage in the hash-join. Please refer to the package javadoc for a detailed description of the hash-join transform.

      This sample joins a stream of users to streams of countries and companies, and outputs a stream of users with the country and company fields set:

      
       // Types of the input stages:
       BatchStage<User> users;
       BatchStage<Map.Entry<Long, Country>> idAndCountry;
       BatchStage<Map.Entry<Long, Company>> idAndCompany;
      
       users.hashJoin2(
           idAndCountry, JoinClause.joinMapEntries(User::getCountryId),
           idAndCompany, JoinClause.joinMapEntries(User::getCompanyId),
           (user, country, company) -> user.setCountry(country).setCompany(company)
       )
       
      Type Parameters:
      K1 - the type of key for stage1
      T1_IN - the type of stage1 items
      T1 - the result type of projection of stage1 items
      K2 - the type of key for stage2
      T2_IN - the type of stage2 items
      T2 - the result type of projection of stage2 items
      R - the resulting output type
      Parameters:
      stage1 - the first stage to join
      joinClause1 - specifies how to join with stage1
      stage2 - the second stage to join
      joinClause2 - specifies how to join with stage2
      mapToOutputFn - function to map the joined items to the output value
      Returns:
      the newly attached stage
    • innerHashJoin2

      @Nonnull <K1, K2, T1_IN, T2_IN, T1, T2, R> GeneralStage<R> innerHashJoin2(@Nonnull BatchStage<T1_IN> stage1, @Nonnull JoinClause<K1, ? super T, ? super T1_IN, ? extends T1> joinClause1, @Nonnull BatchStage<T2_IN> stage2, @Nonnull JoinClause<K2, ? super T, ? super T2_IN, ? extends T2> joinClause2, @Nonnull TriFunction<T, T1, T2, R> mapToOutputFn)
      Attaches to this and the two supplied stages an inner hash-joining stage and returns it. This stage plays the role of the primary stage in the hash-join. Please refer to the package javadoc for a detailed description of the hash-join transform.

      This sample joins a stream of users to streams of countries and companies, and outputs a stream of users with the country and company fields set:

      
       // Types of the input stages:
       BatchStage<User> users;
       BatchStage<Map.Entry<Long, Country>> idAndCountry;
       BatchStage<Map.Entry<Long, Company>> idAndCompany;
      
       users.innerHashJoin2(
           idAndCountry, JoinClause.joinMapEntries(User::getCountryId),
           idAndCompany, JoinClause.joinMapEntries(User::getCompanyId),
           (user, country, company) -> user.setCountry(country).setCompany(company)
       )
       

      This method is similar to hashJoin2(BatchStage, JoinClause, BatchStage, JoinClause, TriFunction), but it guarantees that none of the items passed to mapToOutputFn is null: a primary item without a match in stage1 or in stage2 is filtered out before it reaches mapToOutputFn.

      Type Parameters:
      K1 - the type of key for stage1
      T1_IN - the type of stage1 items
      T1 - the result type of projection of stage1 items
      K2 - the type of key for stage2
      T2_IN - the type of stage2 items
      T2 - the result type of projection of stage2 items
      R - the resulting output type
      Parameters:
      stage1 - the first stage to join
      joinClause1 - specifies how to join with stage1
      stage2 - the second stage to join
      joinClause2 - specifies how to join with stage2
      mapToOutputFn - function to map the joined items to the output value
      Returns:
      the newly attached stage
      Since:
      4.1
    • hashJoinBuilder

      @Nonnull GeneralHashJoinBuilder<T> hashJoinBuilder()
      Returns a fluent API builder object to construct a hash join operation with any number of contributing stages. It is mainly intended for hash-joins with three or more enriching stages. For one or two stages prefer the direct stage.hashJoinN(...) calls because they offer more static type safety.

      This sample joins a stream of users to streams of countries and companies, and outputs a stream of users with the country and company fields set:

      
       // Types of the input stages:
       StreamStage<User> users;
       BatchStage<Map.Entry<Long, Country>> idAndCountry;
       BatchStage<Map.Entry<Long, Company>> idAndCompany;
      
       StreamHashJoinBuilder<User> builder = users.hashJoinBuilder();
       Tag<Country> tCountry = builder.add(idAndCountry,
               JoinClause.joinMapEntries(User::getCountryId));
       Tag<Company> tCompany = builder.add(idAndCompany,
               JoinClause.joinMapEntries(User::getCompanyId));
       StreamStage<User> joined = builder.build((user, itemsByTag) ->
               user.setCountry(itemsByTag.get(tCountry)).setCompany(itemsByTag.get(tCompany)));
       
      Returns:
      the newly attached stage
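      The itemsByTag.get(tCountry) calls in the sample above work because each tag acts as a typed key into the joined items. The sketch below uses hypothetical MiniTag and MiniItemsByTag classes, not Jet's Tag and ItemsByTag, to show the general typed-key idea. The tag's type parameter lets get() return the matching type without a cast at the call site.

```java
import java.util.HashMap;
import java.util.Map;

class TaggedLookupSketch {

    // Hypothetical tag: an identity-based key whose type parameter records the item type.
    static final class MiniTag<E> {
        private final String name;

        MiniTag(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return name;
        }
    }

    // Hypothetical container mapping each tag to the item joined under it.
    static final class MiniItemsByTag {
        private final Map<MiniTag<?>, Object> items = new HashMap<>();

        <E> void put(MiniTag<E> tag, E item) {
            items.put(tag, item);
        }

        @SuppressWarnings("unchecked")
        <E> E get(MiniTag<E> tag) {
            // The cast is safe because put() only stores items of the tag's type.
            return (E) items.get(tag);
        }
    }

    static String demo() {
        MiniTag<String> tCountry = new MiniTag<>("country");
        MiniTag<Integer> tHeadcount = new MiniTag<>("headcount");
        MiniItemsByTag itemsByTag = new MiniItemsByTag();
        itemsByTag.put(tCountry, "NL");
        itemsByTag.put(tHeadcount, 42);
        String country = itemsByTag.get(tCountry);  // typed as String, no cast
        int headcount = itemsByTag.get(tHeadcount); // typed as Integer, auto-unboxed
        return country + ":" + headcount;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // NL:42
    }
}
```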
    • groupingKey

      @Nonnull <K> GeneralStageWithKey<T, K> groupingKey(@Nonnull FunctionEx<? super T, ? extends K> keyFn)
      Specifies the function that will extract a key from the items in the associated pipeline stage. This enables the operations that need the key, such as grouped aggregation.

      Sample usage:

      
       users.groupingKey(User::getId)
       

      Note: make sure the extracted key is not null, because a null key fails the job. Also make sure the key type implements equals() and hashCode().
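      To see why equals() and hashCode() matter, the plain-Java sketch below groups keys in a HashMap. Like hash-based grouping in general, a HashMap relies on both methods to decide which items share a key. The key classes are hypothetical. On Java 16 or newer, a record generates both methods automatically.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

class GroupingKeySketch {

    // Hypothetical key WITHOUT equals()/hashCode(): instances are compared by identity.
    static final class IdentityKey {
        final String country;
        final int year;

        IdentityKey(String country, int year) {
            this.country = country;
            this.year = year;
        }
    }

    // Hypothetical key WITH value-based equals()/hashCode().
    static final class ValueKey {
        final String country;
        final int year;

        ValueKey(String country, int year) {
            this.country = country;
            this.year = year;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof ValueKey)) return false;
            ValueKey other = (ValueKey) o;
            return year == other.year && country.equals(other.country);
        }

        @Override
        public int hashCode() {
            return Objects.hash(country, year);
        }
    }

    // Counts items per key in a HashMap and returns the number of distinct groups.
    static int groupCount(List<?> keys) {
        Map<Object, Integer> counts = new HashMap<>();
        for (Object key : keys) {
            counts.merge(key, 1, Integer::sum);
        }
        return counts.size();
    }

    public static void main(String[] args) {
        // Two logically equal identity-based keys end up in separate groups...
        System.out.println(groupCount(List.of(new IdentityKey("NL", 2020), new IdentityKey("NL", 2020)))); // 2
        // ...while value-based keys fall into the same group.
        System.out.println(groupCount(List.of(new ValueKey("NL", 2020), new ValueKey("NL", 2020)))); // 1
    }
}
```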

      Type Parameters:
      K - type of the key
      Parameters:
      keyFn - function that extracts the grouping key
      Returns:
      the newly attached stage
    • addTimestamps

      @Nonnull StreamStage<T> addTimestamps(@Nonnull ToLongFunctionEx<? super T> timestampFn, long allowedLag)
      Adds a timestamp to each item in the stream using the supplied function and specifies the allowed amount of disorder between them. As the stream moves on, the timestamps must increase, but you can tell Jet to accept some items that "come in late", i.e., have a lower timestamp than the items before them. The allowedLag parameter controls by how much the timestamp can be lower than the highest one observed so far. If it is even lower, Jet will drop the item as being "too late".

      For example, if the sequence of the timestamps is [1,4,3,2] and you configured the allowed lag as 1, Jet will let through the event with timestamp 3, but it will drop the last one (timestamp 2).
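      The drop rule in this example can be modelled in a few lines of plain Java. This is a simplified sketch of the rule as described here, not Jet's watermark implementation. The admit helper is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class AllowedLagSketch {

    // Keeps each timestamp unless it is more than allowedLag below
    // the highest timestamp admitted so far.
    static List<Long> admit(long[] timestamps, long allowedLag) {
        List<Long> admitted = new ArrayList<>();
        long top = Long.MIN_VALUE; // no timestamp seen yet
        for (long ts : timestamps) {
            if (top != Long.MIN_VALUE && ts < top - allowedLag) {
                continue; // "too late": dropped
            }
            admitted.add(ts);
            top = Math.max(top, ts);
        }
        return admitted;
    }

    public static void main(String[] args) {
        System.out.println(admit(new long[] {1, 4, 3, 2}, 1)); // [1, 4, 3]
    }
}
```

After 4 arrives, the lowest acceptable timestamp is 4 - 1 = 3. Timestamp 3 therefore passes, and timestamp 2 is dropped.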

      The amount of lag you configure strongly influences the latency of Jet's output. Jet cannot finalize the window until it knows it has observed all the events belonging to it, and the more lag it must tolerate, the longer it will have to wait for possible latecomers. On the other hand, if you don't allow enough lag, you risk failing to account for data that arrives after the results have already been emitted.

      Sample usage:

      
       events.addTimestamps(Event::getTimestamp, 1000)
       

      Note: This method adds the timestamps after the source has emitted the items. When timestamps are added at this point, source partitions won't be coalesced properly and will be treated as a single stream. The allowed lag must then also cover the additional disorder introduced by merging the partitions. The partitions are merged in an unpredictable order. For example, after the job was suspended for a long time, there can be a very recent event in partition1 and a very old event in partition2. If partition1 happens to be merged first, the recent event could render the old one late if the allowed lag is not large enough.
      To add timestamps in the source, use withTimestamps().
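      The effect of merge order can be seen with a simplified plain-Java model of the drop rule described for allowedLag. The partitions, timestamps and lag are made-up values. Real merging interleaves items rather than draining one partition first, so the sketch only shows that the outcome depends on the order.

```java
import java.util.ArrayList;
import java.util.List;

class MergeOrderSketch {

    // Simplified drop rule: an item is late if it is more than allowedLag
    // below the highest timestamp admitted so far.
    static List<Long> admit(List<Long> timestamps, long allowedLag) {
        List<Long> admitted = new ArrayList<>();
        long top = Long.MIN_VALUE;
        for (long ts : timestamps) {
            if (top != Long.MIN_VALUE && ts < top - allowedLag) {
                continue; // dropped as too late
            }
            admitted.add(ts);
            top = Math.max(top, ts);
        }
        return admitted;
    }

    // One possible merge order: drain `first` completely, then `second`.
    static List<Long> merge(List<Long> first, List<Long> second) {
        List<Long> merged = new ArrayList<>(first);
        merged.addAll(second);
        return merged;
    }

    public static void main(String[] args) {
        List<Long> partition1 = List.of(1_000L); // very recent event
        List<Long> partition2 = List.of(10L);    // very old event
        long allowedLag = 100;
        System.out.println(admit(merge(partition1, partition2), allowedLag)); // [1000]
        System.out.println(admit(merge(partition2, partition1), allowedLag)); // [10, 1000]
    }
}
```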

      Warning: make sure the property you access in timestampFn isn't null, because a null value fails the job. Also make sure there are no nonsensical values such as -1, MIN_VALUE or 2100-01-01. Jet treats them as real timestamps, and they can cause unspecified behaviour.

      Parameters:
      timestampFn - a function that returns the timestamp for each item, typically in milliseconds
      allowedLag - the allowed lag behind the top observed timestamp. Time unit is the same as the unit used by timestampFn
      Returns:
      the newly attached stage
      Throws:
      IllegalArgumentException - if this stage already has timestamps
    • writeTo

      @Nonnull SinkStage writeTo(@Nonnull Sink<? super T> sink)
      Attaches a sink stage, one that accepts data but doesn't emit any. The supplied argument specifies what to do with the received data (typically push it to some outside resource).

      You cannot reuse the sink in other writeTo calls. If you want to write multiple stages to the same sink, use Pipeline.writeTo(Sink, GeneralStage, GeneralStage, GeneralStage...). This is more efficient than creating a new sink each time.

      Returns:
      the newly attached sink stage
    • peek

      @Nonnull GeneralStage<T> peek(@Nonnull PredicateEx<? super T> shouldLogFn, @Nonnull FunctionEx<? super T, ? extends CharSequence> toStringFn)
      Attaches a peeking stage which logs this stage's output and passes it through without transformation. For each item the stage emits, it:
      1. uses the shouldLogFn predicate to see whether to log the item
      2. if yes, uses toStringFn to get the item's string representation
      3. logs the string at the INFO level to the log category com.hazelcast.jet.impl.processor.PeekWrappedP.<vertexName>#<processorIndex>
      The stage logs each item on whichever cluster member it happens to receive it. Its primary purpose is for development use, when running Jet on a local machine.
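      The three steps above amount to a pass-through loop. The plain-Java sketch below mimics them on an in-memory list. It uses java.util.logging and a hypothetical peek helper in place of Jet's processor and log category. It also records the logged lines so they can be inspected.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.logging.Logger;

class PeekSketch {

    private static final Logger LOGGER = Logger.getLogger(PeekSketch.class.getName());

    // Emits every item unchanged and logs (and records) only the items shouldLogFn accepts.
    static <T> List<T> peek(List<T> items,
                            Predicate<? super T> shouldLogFn,
                            Function<? super T, ? extends CharSequence> toStringFn,
                            List<String> logged) {
        List<T> out = new ArrayList<>(items.size());
        for (T item : items) {
            if (shouldLogFn.test(item)) {                         // step 1
                String line = toStringFn.apply(item).toString(); // step 2
                LOGGER.info(line);                                // step 3, at INFO level
                logged.add(line);
            }
            out.add(item); // passed through whether or not it was logged
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> logged = new ArrayList<>();
        List<String> out = peek(List.of("ann", "bartholomew"),
                name -> name.length() > 5, name -> name, logged);
        System.out.println(out);    // [ann, bartholomew]
        System.out.println(logged); // [bartholomew]
    }
}
```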

      Sample usage:

      
       users.peek(
           user -> user.getName().length() > 100,
           User::getName
       )
       
      Parameters:
      shouldLogFn - a function to filter the logged items. You can use alwaysTrue() as a pass-through filter when you don't need any filtering.
      toStringFn - a function that returns a string representation of the item
      Returns:
      the newly attached stage
      See Also:
      peek(FunctionEx), peek()
    • peek

      @Nonnull default GeneralStage<T> peek(@Nonnull FunctionEx<? super T, ? extends CharSequence> toStringFn)
      Adds a peeking layer to this compute stage which logs its output. For each item the stage emits, it:
      1. uses toStringFn to get a string representation of the item
      2. logs the string at the INFO level to the log category com.hazelcast.jet.impl.processor.PeekWrappedP.<vertexName>#<processorIndex>
      The stage logs each item on whichever cluster member it happens to receive it. Its primary purpose is for development use, when running Jet on a local machine.

      Sample usage:

      
       users.peek(User::getName)
       
      Parameters:
      toStringFn - a function that returns a string representation of the item
      Returns:
      the newly attached stage
      See Also:
      peek(PredicateEx, FunctionEx), peek()
    • peek

      @Nonnull default GeneralStage<T> peek()
      Adds a peeking layer to this compute stage which logs its output. For each item the stage emits, it logs the result of its toString() method at the INFO level to the log category com.hazelcast.jet.impl.processor.PeekWrappedP.<vertexName>#<processorIndex>. The stage logs each item on whichever cluster member it happens to receive it. Its primary purpose is for development use, when running Jet on a local machine.
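      Both default overloads read like shorthands for the two-argument form. peek(toStringFn) logs every item, and peek() logs each item's toString(). The sketch below shows that delegation pattern on a hypothetical MiniStage interface. It assumes, rather than quotes, how GeneralStage implements its defaults.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

class PeekOverloadSketch {

    // Hypothetical interface mirroring the three peek overloads.
    interface MiniStage<T> {
        MiniStage<T> peek(Predicate<? super T> shouldLogFn,
                          Function<? super T, ? extends CharSequence> toStringFn);

        // Log every item: delegate with an always-true predicate.
        default MiniStage<T> peek(Function<? super T, ? extends CharSequence> toStringFn) {
            return peek(item -> true, toStringFn);
        }

        // Log every item's toString(): delegate with Object::toString.
        default MiniStage<T> peek() {
            return peek(Object::toString);
        }
    }

    // Hypothetical implementation that records what would be logged.
    static final class RecordingStage<T> implements MiniStage<T> {
        final List<T> items;
        final List<String> log = new ArrayList<>();

        RecordingStage(List<T> items) {
            this.items = items;
        }

        @Override
        public MiniStage<T> peek(Predicate<? super T> shouldLogFn,
                                 Function<? super T, ? extends CharSequence> toStringFn) {
            for (T item : items) {
                if (shouldLogFn.test(item)) {
                    log.add(toStringFn.apply(item).toString());
                }
            }
            return this;
        }
    }

    public static void main(String[] args) {
        RecordingStage<Integer> stage = new RecordingStage<>(List.of(7, 42));
        stage.peek();
        System.out.println(stage.log); // [7, 42]
    }
}
```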
      Returns:
      the newly attached stage
      See Also:
      peek(PredicateEx, FunctionEx), peek(FunctionEx)
    • customTransform

      @Nonnull <R> GeneralStage<R> customTransform(@Nonnull String stageName, @Nonnull SupplierEx<Processor> procSupplier)
      Attaches a stage with a custom transform based on the provided supplier of Core API Processors.

      Note that the type parameter of the returned stage is inferred from the call site and not propagated from the processor that will produce the result, so there is no actual type safety provided.
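      The lack of type safety is the usual consequence of a generic return type that is inferred only from the call site. The plain-Java analogue below is hypothetical, with a List playing the role of the stage. The compiler accepts whatever R the caller asks for, and a mismatch surfaces only as a ClassCastException when an item is used.

```java
import java.util.ArrayList;
import java.util.List;

class CallSiteInferenceSketch {

    // Hypothetical analogue of customTransform: nothing ties R to what was produced,
    // so R is whatever the call site expects.
    @SuppressWarnings("unchecked")
    static <R> List<R> customTransform(List<?> producedItems) {
        return (List<R>) producedItems;
    }

    // Returns true if the type mismatch is detected, which happens only at run time.
    static boolean mismatchDetectedAtRunTime() {
        List<Object> produced = new ArrayList<>();
        produced.add(42); // the "processor" actually emits an Integer
        List<String> declared = customTransform(produced); // compiles: R inferred as String
        try {
            String first = declared.get(0); // compiler-inserted cast to String fails here
            return first == null;           // not reached for this input
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(mismatchDetectedAtRunTime()); // true
    }
}
```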

      Type Parameters:
      R - the type of the output items
      Parameters:
      stageName - a human-readable name for the custom stage
      procSupplier - the supplier of processors
      Returns:
      the newly attached stage
    • customTransform

      @Nonnull <R> GeneralStage<R> customTransform(@Nonnull String stageName, @Nonnull ProcessorSupplier procSupplier)
      Attaches a stage with a custom transform based on the provided supplier of Core API Processors.

      Note that the type parameter of the returned stage is inferred from the call site and not propagated from the processor that will produce the result, so there is no actual type safety provided.

      Type Parameters:
      R - the type of the output items
      Parameters:
      stageName - a human-readable name for the custom stage
      procSupplier - the supplier of processors
      Returns:
      the newly attached stage
    • customTransform

      @Nonnull <R> GeneralStage<R> customTransform(@Nonnull String stageName, @Nonnull ProcessorMetaSupplier procSupplier)
      Attaches a stage with a custom transform based on the provided supplier of Core API Processors.

      Note that the type parameter of the returned stage is inferred from the call site and not propagated from the processor that will produce the result, so there is no actual type safety provided.

      Type Parameters:
      R - the type of the output items
      Parameters:
      stageName - a human-readable name for the custom stage
      procSupplier - the supplier of processors
      Returns:
      the newly attached stage
    • setLocalParallelism

      @Nonnull GeneralStage<T> setLocalParallelism(int localParallelism)
      Description copied from interface: Stage
      Sets the preferred local parallelism (number of processors per Jet cluster member) this stage will configure its DAG vertices with. Jet always uses the same number of processors on each member, so the total parallelism automatically increases if another member joins the cluster.

      While most stages are backed by 1 vertex, there are exceptions. If a stage uses two vertices, each of them will have the given local parallelism, so in total there will be twice as many processors per member.
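      The resulting processor count is a simple product of members, vertices and local parallelism. The sketch below works it out for hypothetical cluster sizes. It assumes an explicitly set local parallelism rather than the default of -1.

```java
class ParallelismSketch {

    // Total processors for one stage = members x vertices per stage x local parallelism.
    static int totalProcessors(int members, int verticesPerStage, int localParallelism) {
        return members * verticesPerStage * localParallelism;
    }

    public static void main(String[] args) {
        System.out.println(totalProcessors(3, 1, 4)); // 12
        System.out.println(totalProcessors(4, 1, 4)); // 16: a fourth member joined
        System.out.println(totalProcessors(3, 2, 4)); // 24: a two-vertex stage doubles it
    }
}
```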

      The default value is -1 and it signals to Jet to figure out a default value. Jet will determine the vertex's local parallelism during job initialization from the global default and the processor meta-supplier's preferred value.

      Specified by:
      setLocalParallelism in interface Stage
      Returns:
      this stage
    • setName

      @Nonnull GeneralStage<T> setName(@Nonnull String name)
      Description copied from interface: Stage
      Overrides the default name of the stage with the name you choose and returns the stage. This can be useful for debugging purposes, to better distinguish pipeline stages in the diagnostic output.
      Specified by:
      setName in interface Stage
      Parameters:
      name - the stage name
      Returns:
      this stage