Interface StageWithWindow<T>

Type Parameters:
T - type of the input item

public interface StageWithWindow<T>
Represents an intermediate step in the construction of a pipeline stage that performs a windowed aggregate operation. You can perform a global aggregation or add a grouping key to perform a group-and-aggregate operation.
Since:
3.0
  • Method Details

    • streamStage

      @Nonnull StreamStage<T> streamStage()
      Returns the pipeline stage associated with this object. It is the stage to which you are about to attach an aggregating stage.
    • windowDefinition

      @Nonnull WindowDefinition windowDefinition()
      Returns the definition of the window for the windowed aggregation operation that you are about to construct using this object.
    • groupingKey

      @Nonnull <K> StageWithKeyAndWindow<T, K> groupingKey(@Nonnull FunctionEx<? super T, ? extends K> keyFn)
      Specifies the function that will extract the grouping key from the items in the associated pipeline stage and moves on to the step in which you'll complete the construction of a windowed group-and-aggregate stage.

      Note: make sure the extracted key is not null, otherwise the job will fail. Also make sure that the key type implements equals() and hashCode().

      Type Parameters:
      K - type of the key
      Parameters:
      keyFn - function that extracts the grouping key
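      The note about equals() and hashCode() can be illustrated without Jet. The following is a plain-Java sketch, not Jet code: it groups keys with a HashMap, which depends on the same equals()/hashCode() contract. The names GroupingKeyDemo, GoodKey and BadKey are hypothetical and exist only for this illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class GroupingKeyDemo {
    // Hypothetical key type with value-based equals() and hashCode().
    static final class GoodKey {
        final String userId;
        GoodKey(String userId) { this.userId = userId; }
        @Override public boolean equals(Object o) {
            return o instanceof GoodKey && userId.equals(((GoodKey) o).userId);
        }
        @Override public int hashCode() { return userId.hashCode(); }
    }

    // Hypothetical key type that keeps Object's identity-based equals() and hashCode().
    static final class BadKey {
        final String userId;
        BadKey(String userId) { this.userId = userId; }
    }

    // Counts how many times each key occurs, as a stand-in for group-and-aggregate.
    static <K> Map<K, Long> countByKey(List<K> keys) {
        Map<K, Long> counts = new HashMap<>();
        for (K key : keys) {
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Two equal GoodKey instances fall into one group.
        System.out.println(countByKey(Arrays.asList(new GoodKey("u1"), new GoodKey("u1"))).size());
        // Two BadKey instances with the same userId end up in two separate groups.
        System.out.println(countByKey(Arrays.asList(new BadKey("u1"), new BadKey("u1"))).size());
    }
}
```

      With BadKey, items that logically share a key are never grouped together, so each group would be aggregated on its own.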
    • distinct

      @Nonnull default StreamStage<WindowResult<T>> distinct()
      Attaches a stage that passes through just the items that are distinct within their window (no two items emitted for a window are equal). There is no guarantee which one of several equal items will pass through. The stage emits results in the form of WindowResult(windowEnd, distinctItem).
      Returns:
      the newly attached stage
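      To make the per-window semantics concrete, here is a plain-Java model, not Jet's implementation. It assumes tumbling windows of a fixed size aligned to multiples of that size, with the window end as the exclusive upper bound. WindowedDistinctDemo and Event are hypothetical names.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

class WindowedDistinctDemo {
    // Hypothetical stand-in for a timestamped input item.
    static final class Event {
        final long timestamp;
        final String item;
        Event(long timestamp, String item) {
            this.timestamp = timestamp;
            this.item = item;
        }
    }

    // Returns, keyed by window end, the distinct items that fell into each window.
    static Map<Long, Set<String>> distinctPerWindow(List<Event> events, long windowSize) {
        Map<Long, Set<String>> result = new TreeMap<>();
        for (Event e : events) {
            // Assumed alignment: the window covers [windowEnd - windowSize, windowEnd).
            long windowEnd = Math.floorDiv(e.timestamp, windowSize) * windowSize + windowSize;
            // A set keeps one copy of each group of equal items.
            result.computeIfAbsent(windowEnd, k -> new LinkedHashSet<>()).add(e.item);
        }
        return result;
    }
}
```

      The same item can appear in the output of several windows. Distinctness only holds within a single window.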
    • aggregate

      @Nonnull <R> StreamStage<WindowResult<R>> aggregate(@Nonnull AggregateOperation1<? super T, ?, ? extends R> aggrOp)
      Attaches a stage that performs the given aggregate operation over all the items that belong to a given window. Once the window is complete, it emits a WindowResult with the result of the aggregate operation and the timestamp denoting the window's ending time.

      Sample usage:

      
       StreamStage<WindowResult<Long>> aggregated = pageVisits
           .window(SlidingWindowDefinition.sliding(MINUTES.toMillis(1), SECONDS.toMillis(1)))
           .aggregate(AggregateOperations.counting());
       
      Type Parameters:
      R - the type of the result
      Parameters:
      aggrOp - the aggregate operation to perform
      See Also:
      AggregateOperations
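      As a rough model of what a sliding-window count computes, the plain-Java sketch below assigns each event to every window that contains it. It is not Jet's implementation. It assumes that window ends fall on multiples of the slide step and that a window covers [end - windowSize, end). SlidingCountDemo is a hypothetical name.

```java
import java.util.Map;
import java.util.TreeMap;

class SlidingCountDemo {
    // Counts events per sliding window; the map keys are window end timestamps.
    static Map<Long, Long> countPerWindow(long[] timestamps, long windowSize, long slideBy) {
        Map<Long, Long> counts = new TreeMap<>();
        for (long ts : timestamps) {
            // Smallest window end that is strictly greater than the event timestamp.
            long firstEnd = Math.floorDiv(ts, slideBy) * slideBy + slideBy;
            // The event lies in every window whose start (end - windowSize) is not after it.
            for (long end = firstEnd; end - windowSize <= ts; end += slideBy) {
                counts.merge(end, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Window size 4, slide step 2, events at timestamps 0 and 3.
        System.out.println(countPerWindow(new long[] {0, 3}, 4, 2)); // prints {2=1, 4=2, 6=1}
    }
}
```

      When the window size is a multiple of the slide step, each event contributes to windowSize / slideBy windows. This is why the sample above, with a one-minute window sliding by one second, counts each page visit in 60 window results.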
    • aggregate2

      @Nonnull <T1, R> StreamStage<WindowResult<R>> aggregate2(@Nonnull StreamStage<T1> stage1, @Nonnull AggregateOperation2<? super T, ? super T1, ?, ? extends R> aggrOp)
      Attaches a stage that performs the given aggregate operation over all the items that belong to the same window. It receives the items from both this stage and stage1. Once a given window is complete, it emits a WindowResult with the result of the aggregate operation and the timestamp denoting the window's ending time.

      Sample usage:

      
       StreamStage<WindowResult<Tuple2<Long, Long>>> aggregated = pageVisits
           .window(SlidingWindowDefinition.sliding(MINUTES.toMillis(1), SECONDS.toMillis(1)))
           .aggregate2(
               addToCarts,
               AggregateOperations.aggregateOperation2(
                   AggregateOperations.counting(),
                   AggregateOperations.counting())
           );
       
      This variant requires you to provide a two-input aggregate operation (refer to its Javadoc for a simple example). If you can express your logic in terms of two single-input aggregate operations, one for each input stream, then you should use stage0.aggregate2(aggrOp0, stage1, aggrOp1) because it offers a simpler API and lets you reuse the already defined single-input operations. Use this variant only when you need to implement an aggregate operation that combines the input streams into the same accumulator.

      The aggregating stage emits a single item for each completed window.

      Type Parameters:
      T1 - type of items in stage1
      R - type of the aggregation result
      Parameters:
      stage1 - the other stage
      aggrOp - the aggregate operation to perform
      See Also:
      AggregateOperations
    • aggregate2

      @Nonnull default <T1, R0, R1> StreamStage<WindowResult<Tuple2<R0, R1>>> aggregate2(@Nonnull AggregateOperation1<? super T, ?, ? extends R0> aggrOp0, @Nonnull StreamStage<T1> stage1, @Nonnull AggregateOperation1<? super T1, ?, ? extends R1> aggrOp1)
      Attaches a stage that performs the given co-aggregate operations over the items from this stage and stage1 you supply. It performs the aggregation separately for each input stage: aggrOp0 on this stage and aggrOp1 on stage1. Once it has received all the items belonging to a window, it emits a WindowResult(Tuple2(result0, result1)).

      The aggregating stage emits a single item for each completed window.

      Sample usage:

      
       StreamStage<WindowResult<Tuple2<Long, Long>>> aggregated = pageVisits
           .window(SlidingWindowDefinition.sliding(MINUTES.toMillis(1), SECONDS.toMillis(1)))
           .aggregate2(
               AggregateOperations.counting(),
               addToCarts,
               AggregateOperations.counting()
           );
       
      Type Parameters:
      T1 - type of the items in the other stage
      R0 - type of the aggregated result for this stage
      R1 - type of the aggregated result for the other stage
      Parameters:
      aggrOp0 - aggregate operation to perform on this stage
      stage1 - the other stage
      aggrOp1 - aggregate operation to perform on the other stage
      See Also:
      AggregateOperations
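      The idea of aggregating each input separately and emitting one combined result per window can be modelled in plain Java as follows. This is a sketch, not Jet's implementation. It assumes tumbling windows aligned to multiples of the window size and uses a two-element long[] in place of Tuple2. CoAggregateDemo is a hypothetical name.

```java
import java.util.Map;
import java.util.TreeMap;

class CoAggregateDemo {
    // Per window end, returns {count of stream 0, count of stream 1}.
    static Map<Long, long[]> countBoth(long[] stream0, long[] stream1, long windowSize) {
        Map<Long, long[]> result = new TreeMap<>();
        accumulate(result, stream0, 0, windowSize);
        accumulate(result, stream1, 1, windowSize);
        return result;
    }

    // Adds one stream's timestamps to the slot reserved for that stream.
    private static void accumulate(Map<Long, long[]> result, long[] timestamps,
                                   int slot, long windowSize) {
        for (long ts : timestamps) {
            // Assumed alignment: the window covers [windowEnd - windowSize, windowEnd).
            long windowEnd = Math.floorDiv(ts, windowSize) * windowSize + windowSize;
            result.computeIfAbsent(windowEnd, k -> new long[2])[slot]++;
        }
    }
}
```

      Each stream only ever touches its own slot, which mirrors aggrOp0 and aggrOp1 keeping independent state. A window that received items from only one stream still yields a result in this model, with a zero count for the other stream.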
    • aggregate3

      @Nonnull <T1, T2, R> StreamStage<WindowResult<R>> aggregate3(@Nonnull StreamStage<T1> stage1, @Nonnull StreamStage<T2> stage2, @Nonnull AggregateOperation3<? super T, ? super T1, ? super T2, ?, ? extends R> aggrOp)
      Attaches a stage that performs the given aggregate operation over the items it receives from this stage as well as stage1 and stage2 you supply. Once a given window is complete, it emits a WindowResult with the result of the aggregate operation and the timestamp denoting the window's ending time.

      Sample usage:

      
       StreamStage<WindowResult<Tuple3<Long, Long, Long>>> aggregated = pageVisits
           .window(SlidingWindowDefinition.sliding(MINUTES.toMillis(1), SECONDS.toMillis(1)))
           .aggregate3(
               addToCarts,
               payments,
               AggregateOperations.aggregateOperation3(
                   AggregateOperations.counting(),
                   AggregateOperations.counting(),
                   AggregateOperations.counting())
           );
       
      This variant requires you to provide a three-input aggregate operation (refer to its Javadoc for a simple example). If you can express your logic in terms of three single-input aggregate operations, one for each input stream, then you should use stage0.aggregate3(aggrOp0, stage1, aggrOp1, stage2, aggrOp2) because it offers a simpler API and lets you reuse the already defined single-input operations. Use this variant only when you need to implement an aggregate operation that combines the input streams into the same accumulator.
      Type Parameters:
      T1 - type of items in stage1
      T2 - type of items in stage2
      R - type of the result
      Parameters:
      stage1 - the first additional stage
      stage2 - the second additional stage
      aggrOp - the aggregate operation to perform
      See Also:
      AggregateOperations
    • aggregate3

      @Nonnull default <T1, T2, R0, R1, R2> StreamStage<WindowResult<Tuple3<R0, R1, R2>>> aggregate3(@Nonnull AggregateOperation1<? super T, ?, ? extends R0> aggrOp0, @Nonnull StreamStage<T1> stage1, @Nonnull AggregateOperation1<? super T1, ?, ? extends R1> aggrOp1, @Nonnull StreamStage<T2> stage2, @Nonnull AggregateOperation1<? super T2, ?, ? extends R2> aggrOp2)
      Attaches a stage that performs the given co-aggregate operations over the items from this stage as well as stage1 and stage2 you supply. It performs the aggregation separately for each input stage: aggrOp0 on this stage, aggrOp1 on stage1 and aggrOp2 on stage2. Once it has received all the items belonging to a window, it emits a WindowResult(Tuple3(result0, result1, result2)).

      The aggregating stage emits a single item for each completed window.

      Sample usage:

      
       StreamStage<WindowResult<Tuple3<Long, Long, Long>>> aggregated = pageVisits
           .window(SlidingWindowDefinition.sliding(MINUTES.toMillis(1), SECONDS.toMillis(1)))
           .aggregate3(
               AggregateOperations.counting(),
               addToCarts,
               AggregateOperations.counting(),
               payments,
               AggregateOperations.counting()
           );
       
      Type Parameters:
      T1 - type of items in stage1
      T2 - type of items in stage2
      R0 - type of the result from stream-0
      R1 - type of the result from stream-1
      R2 - type of the result from stream-2
      Parameters:
      aggrOp0 - aggregate operation to perform on this stage
      stage1 - the first additional stage
      aggrOp1 - aggregate operation to perform on stage1
      stage2 - the second additional stage
      aggrOp2 - aggregate operation to perform on stage2
      See Also:
      AggregateOperations
    • aggregateBuilder

      @Nonnull default <R0> WindowAggregateBuilder<R0> aggregateBuilder(AggregateOperation1<? super T, ?, ? extends R0> aggrOp)
      Offers a step-by-step API to build a pipeline stage that co-aggregates the data from several input stages. The current stage will be already registered with the builder you get. You supply an aggregate operation for each input stage and in the output you get the individual aggregation results in a WindowResult(windowEnd, itemsByTag). Use the tag you get from builder.add(stageN, aggrOpN) to retrieve the aggregated result for that stage. Use builder.tag0() as the tag of this stage. You will also be able to supply a function to the builder that immediately transforms the results to the desired output type.

      This builder is mainly intended to build a co-aggregation of four or more contributing stages. For up to three stages, prefer the direct stage.aggregateN(...) calls because they offer more static type safety.

      This example defines a 1-second sliding window and counts the items in stage-0, sums those in stage-1 and takes the average of those in stage-2:

      
       Pipeline p = Pipeline.create();
       StreamStage<Long> stage0 = p.readFrom(source0).withNativeTimestamps(0L);
       StreamStage<Long> stage1 = p.readFrom(source1).withNativeTimestamps(0L);
       StreamStage<Long> stage2 = p.readFrom(source2).withNativeTimestamps(0L);
       WindowAggregateBuilder<Long> b = stage0
               .window(sliding(1000, 10))
               .aggregateBuilder(AggregateOperations.counting());
       Tag<Long> tag0 = b.tag0();
       Tag<Long> tag1 = b.add(stage1,
               AggregateOperations.summingLong(Long::longValue));
       Tag<Double> tag2 = b.add(stage2,
               AggregateOperations.averagingLong(Long::longValue));
       StreamStage<WindowResult<ItemsByTag>> aggregated = b.build();
       aggregated.map(e -> String.format(
               "Timestamp %d, count of stage0: %d, sum of stage1: %d, average of stage2: %f",
               e.end(), e.result().get(tag0), e.result().get(tag1), e.result().get(tag2))
       );
      
    • aggregateBuilder

      @Nonnull default WindowAggregateBuilder1<T> aggregateBuilder()
      Offers a step-by-step API to build a pipeline stage that co-aggregates the data from several input stages. The current stage will be already registered with the builder you get.

      This builder requires you to provide a multi-input aggregate operation. If you can express your logic in terms of single-input aggregate operations, one for each input stream, then you should use stage0.aggregateBuilder(aggrOp0) because it offers a simpler API. Use this builder only when you have the need to implement an aggregate operation that combines all the input streams into the same accumulator.

      This builder is mainly intended to build a co-aggregation of four or more contributing stages. For up to three stages, prefer the direct stage.aggregateN(...) calls because they offer more static type safety.

      To add the other stages, call add(stage). Collect all the tags returned from add() and use them when building the aggregate operation. Retrieve the tag of the first stage (from which you obtained this builder) by calling WindowAggregateBuilder1.tag0().

      This example takes three streams of strings, specifies a 1-second sliding window and counts the distinct strings across all streams:

      
       Pipeline p = Pipeline.create();
       StreamStage<String> stage0 = p.readFrom(source0).withNativeTimestamps(0L);
       StreamStage<String> stage1 = p.readFrom(source1).withNativeTimestamps(0L);
       StreamStage<String> stage2 = p.readFrom(source2).withNativeTimestamps(0L);
       WindowAggregateBuilder1<String> b = stage0
               .window(sliding(1000, 10))
               .aggregateBuilder();
       Tag<String> tag0 = b.tag0();
       Tag<String> tag1 = b.add(stage1);
       Tag<String> tag2 = b.add(stage2);
       StreamStage<WindowResult<Integer>> aggregated = b.build(AggregateOperation
               .withCreate(HashSet<String>::new)
               .andAccumulate(tag0, (acc, item) -> acc.add(item))
               .andAccumulate(tag1, (acc, item) -> acc.add(item))
               .andAccumulate(tag2, (acc, item) -> acc.add(item))
               .andCombine(HashSet::addAll)
               .andExportFinish(HashSet::size));
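
      The aggregate operation in this example is built from four primitives: create, accumulate, combine and finish. The plain-Java sketch below mimics them with ordinary methods to show how one shared accumulator serves all input streams. It is a model under that assumption, not Jet code, and SharedAccumulatorDemo is a hypothetical name.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class SharedAccumulatorDemo {
    // create + accumulate: every input stream adds its items to the same set.
    @SafeVarargs
    static Set<String> accumulate(List<String>... streams) {
        Set<String> acc = new HashSet<>();
        for (List<String> stream : streams) {
            acc.addAll(stream);
        }
        return acc;
    }

    // combine: merges the right partial accumulator into the left one.
    static Set<String> combine(Set<String> left, Set<String> right) {
        left.addAll(right);
        return left;
    }

    // finish: turns the accumulator into the emitted result.
    static int finish(Set<String> acc) {
        return acc.size();
    }
}
```

      Because all streams write into one set, a string that occurs in several streams is counted once. That result cannot be obtained by counting distinct strings per stream and adding the counts, which is the situation this builder variant is meant for.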