Class EventTimePolicy<T>

java.lang.Object
com.hazelcast.jet.core.EventTimePolicy<T>
Type Parameters:
T - event type
All Implemented Interfaces:
Serializable

public final class EventTimePolicy<T>
extends Object
implements Serializable
A holder of functions and parameters Jet needs to handle event time and the associated watermarks. These are the components:
  • timestampFn: extracts the timestamp from an event in the stream
  • newWmPolicyFn: a factory of watermark policy objects. Refer to the WatermarkPolicy documentation for an explanation.
  • frame size and frame offset for watermark throttling: they allow the processor to filter out redundant watermark items before emitting them. For example, a sliding/tumbling window processor doesn't need to observe more than one watermark item per frame.
  • idleTimeoutMillis: a measure to mitigate the issue with temporary lulls in a distributed event stream. It pertains to each partition of a data source independently. If Jet doesn't receive any events from a given partition for this long, it will mark it as "idle" and let the watermark in downstream vertices advance as if the partition didn't exist.
  • wrapFn: a function that transforms a given event and its timestamp into the item to emit from the processor. For example, the Pipeline API uses this to wrap items into JetEvents. This propagates the event timestamps through the pipeline regardless of the transformations the user applies to the event objects themselves.
This class should be used with EventTimeMapper when implementing a source processor.
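The sketch below shows how timestampFn and wrapFn relate for a single raw event. It is a minimal, JDK-only sketch and does not use Jet's classes. ObjLongFn, TimestampedItem and emit are hypothetical stand-ins: ObjLongFn is loosely modeled on ObjLongBiFunction, and TimestampedItem only mimics the idea behind JetEvent, not its implementation. The order shown (extract the timestamp, then wrap) is an assumption for illustration. A real source processor would delegate this work to EventTimeMapper.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToLongFunction;

class WrapFnSketch {

    // Hypothetical stand-in for Jet's ObjLongBiFunction: (event, timestamp) -> emitted item.
    interface ObjLongFn<T, R> {
        R apply(T event, long timestamp);
    }

    // Hypothetical emitted item that carries the event's timestamp alongside the event,
    // similar in spirit to the way the Pipeline API wraps items into JetEvents.
    static final class TimestampedItem<T> {
        final T payload;
        final long timestamp;

        TimestampedItem(T payload, long timestamp) {
            this.payload = payload;
            this.timestamp = timestamp;
        }
    }

    // For each raw event: extract its timestamp with timestampFn, then let
    // wrapFn turn (event, timestamp) into the item that is actually emitted.
    static <T, R> List<R> emit(List<T> events,
                               ToLongFunction<? super T> timestampFn,
                               ObjLongFn<? super T, ? extends R> wrapFn) {
        List<R> out = new ArrayList<>();
        for (T event : events) {
            long timestamp = timestampFn.applyAsLong(event);
            out.add(wrapFn.apply(event, timestamp));
        }
        return out;
    }
}
```

In this sketch, passing (event, timestamp) -> event as wrapFn emits the raw events unchanged. Passing a wrapping constructor keeps the timestamp attached to each item.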
Since:
3.0
See Also:
Serialized Form
  • Field Details

    • DEFAULT_IDLE_TIMEOUT

      public static final long DEFAULT_IDLE_TIMEOUT
      The default idle timeout in milliseconds.
      See Also:
      Constant Field Values
  • Method Details

    • eventTimePolicy

      public static <T> EventTimePolicy<T> eventTimePolicy(@Nullable ToLongFunctionEx<? super T> timestampFn, @Nonnull ObjLongBiFunction<? super T, ?> wrapFn, @Nonnull SupplierEx<? extends WatermarkPolicy> newWmPolicyFn, long watermarkThrottlingFrameSize, long watermarkThrottlingFrameOffset, long idleTimeoutMillis)
      Creates and returns a new event time policy. To get a policy that results in no timestamping, call noEventTime().
      Parameters:
      timestampFn - function that extracts the timestamp from the event; if null, Jet will use the source's native timestamp
      wrapFn - function that transforms the received item and its timestamp into the emitted item
      newWmPolicyFn - factory of the watermark policy objects
      watermarkThrottlingFrameSize - the frame length to which we throttle watermarks, see watermarkThrottlingFrameSize()
      watermarkThrottlingFrameOffset - the frame offset to which we throttle watermarks, see watermarkThrottlingFrameOffset()
      idleTimeoutMillis - the timeout after which a partition will be marked as idle. Use 0 to disable the feature.
    • eventTimePolicy

      public static <T> EventTimePolicy<T> eventTimePolicy(@Nullable ToLongFunctionEx<? super T> timestampFn, @Nonnull SupplierEx<? extends WatermarkPolicy> newWmPolicyFn, long watermarkThrottlingFrameSize, long watermarkThrottlingFrameOffset, long idleTimeoutMillis)
      Creates and returns a new event time policy. To get a policy that results in no watermarks being emitted, call noEventTime().
      Parameters:
      timestampFn - function that extracts the timestamp from the event; if null, Jet will use the source's native timestamp
      newWmPolicyFn - factory of the watermark policy objects
      watermarkThrottlingFrameSize - the frame length to which we throttle watermarks, see watermarkThrottlingFrameSize()
      watermarkThrottlingFrameOffset - the frame offset to which we throttle watermarks, see watermarkThrottlingFrameOffset()
      idleTimeoutMillis - the timeout after which a partition will be marked as idle. Use 0 to disable the feature.
    • noEventTime

      public static <T> EventTimePolicy<T> noEventTime()
      Returns an event time policy that results in no timestamping. It is only useful in jobs with streaming sources that don't do any aggregation. If the job has an aggregation step and you use this policy, the job will keep accumulating data without producing any output.
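      A toy model helps show why no output appears. The JDK-only sketch below is not Jet's windowing code. TumblingCounterSketch is a hypothetical aggregator that emits a window only after a watermark passes the window's end, as any event-time window must. If no watermarks ever arrive, as with a no-event-time policy, its counts stay buffered indefinitely.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical tumbling-window counter: it buffers a count per window and only
// emits a window once a watermark shows that the window has ended.
class TumblingCounterSketch {

    private final long windowSize;
    private final TreeMap<Long, Long> countsByWindowStart = new TreeMap<>();

    TumblingCounterSketch(long windowSize) {
        this.windowSize = windowSize;
    }

    // Assigns the event to the window [start, start + windowSize) containing it.
    void onEvent(long timestamp) {
        long start = Math.floorDiv(timestamp, windowSize) * windowSize;
        countsByWindowStart.merge(start, 1L, Long::sum);
    }

    // Emits "start:count" for every buffered window whose end is <= the watermark.
    List<String> onWatermark(long watermark) {
        List<String> emitted = new ArrayList<>();
        while (!countsByWindowStart.isEmpty()
                && countsByWindowStart.firstKey() + windowSize <= watermark) {
            Map.Entry<Long, Long> window = countsByWindowStart.pollFirstEntry();
            emitted.add(window.getKey() + ":" + window.getValue());
        }
        return emitted;
    }

    // Number of windows still waiting for a watermark.
    int bufferedWindows() {
        return countsByWindowStart.size();
    }
}
```

      Without a call to onWatermark, bufferedWindows() only grows as new windows appear. This mirrors the "accumulating without output" behavior described above.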
    • timestampFn

      @Nullable public ToLongFunctionEx<? super T> timestampFn()
      Returns the function that extracts the timestamp from the event, or null if Jet uses the source's native timestamp.
    • wrapFn

      @Nonnull public ObjLongBiFunction<? super T, ?> wrapFn()
      Returns the function that transforms the received item and its timestamp into the emitted item.
    • newWmPolicyFn

      @Nonnull public SupplierEx<? extends WatermarkPolicy> newWmPolicyFn()
      Returns the factory of the watermark policy objects.
    • watermarkThrottlingFrameSize

      public long watermarkThrottlingFrameSize()
      This value, together with watermarkThrottlingFrameOffset(), specifies the frame size the watermarks are throttled to. Generally, it should match the window slide step used downstream. If there are multiple sliding windows downstream, use the greatest common divisor of their slide steps.
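      Suppose one downstream window slides by 1,000 ms and another by 1,500 ms. Their greatest common divisor is 500 ms, and every boundary of both windows falls on a 500 ms frame boundary, assuming both use the same frame offset. The JDK-only sketch below computes this value with Euclid's algorithm. commonFrameSize is a hypothetical helper and is not part of Jet's API.

```java
class FrameSizeSketch {

    // Greatest common divisor via Euclid's algorithm; gcd(0, x) == x.
    static long gcd(long a, long b) {
        while (b != 0) {
            long t = a % b;
            a = b;
            b = t;
        }
        return a;
    }

    // Hypothetical helper: a frame size that divides every slide step evenly,
    // so each window boundary also lands on a throttling frame boundary.
    static long commonFrameSize(long... slideSteps) {
        long result = 0;
        for (long step : slideSteps) {
            result = gcd(result, step);
        }
        return result;
    }
}
```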

      If this parameter is equal to 0, all watermarks will be suppressed.

      Technically, a watermark should be emitted after every increase in event time. Because each processor broadcasts its watermarks to all downstream processors, this adds overhead. However, watermarks are only needed for window aggregation, and only when a window should close, that is, at a frame boundary of a sliding window. To reduce the number of watermarks on the stream, you can configure the processor to emit only those watermarks that would trigger the emission of a new window.
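      The sketch below illustrates the throttling idea with the JDK only. Each incoming watermark is rounded down to the start of its frame, taking the offset into account. The rounded value is forwarded only if it is newer than the last one emitted. WatermarkThrottleSketch and offer are hypothetical names, and Jet's internal implementation may differ in detail. The zero-frame-size case follows the rule stated for this parameter.

```java
// Hypothetical model of watermark throttling to frames of the given size and offset.
class WatermarkThrottleSketch {

    private final long frameSize;
    private final long frameOffset;
    private long lastEmitted = Long.MIN_VALUE;

    WatermarkThrottleSketch(long frameSize, long frameOffset) {
        this.frameSize = frameSize;
        this.frameOffset = frameOffset;
    }

    // Rounds a timestamp down to the start of the frame that contains it.
    private long floorToFrame(long ts) {
        return ts - Math.floorMod(ts - frameOffset, frameSize);
    }

    // Returns the watermark to emit, or null if this one is redundant.
    Long offer(long watermark) {
        if (frameSize == 0) {
            return null; // frame size 0: every watermark is suppressed
        }
        long throttled = floorToFrame(watermark);
        if (throttled <= lastEmitted) {
            return null; // same frame as the last emitted watermark: nothing new can close
        }
        lastEmitted = throttled;
        return throttled;
    }
}
```

      Rounding down loses nothing when window boundaries fall on frame boundaries. For example, with 1,000 ms frames, a watermark of 1999 can close only the windows that end at or before 1000, which is exactly what the rounded watermark 1000 closes.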

    • watermarkThrottlingFrameOffset

      public long watermarkThrottlingFrameOffset()
      Returns the frame offset to which watermarks are throttled. See watermarkThrottlingFrameSize().
    • idleTimeoutMillis

      public long idleTimeoutMillis()
      Returns the amount of time allowed to pass without receiving any events from a partition before marking it as "idle". When the partition becomes idle, the processor emits a WatermarkCoalescer.IDLE_MESSAGE to its output edges. This signals to Jet that the watermark can advance as if the partition didn't exist.

      If you supply a zero or negative value, partitions will never be marked as idle.
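      The JDK-only sketch below models the timing rule for a single partition. It is not Jet's EventTimeMapper. IdlePartitionSketch and its methods are hypothetical, and the caller passes the current time explicitly so the behavior is deterministic. checkIdle returns true only at the moment a partition first becomes idle. Any event resets that partition's state, and a zero or negative timeout disables detection entirely.

```java
import java.util.Arrays;

// Hypothetical per-partition idleness tracker; times are in milliseconds.
class IdlePartitionSketch {

    private final long idleTimeoutMillis;
    private final long[] lastEventTime;
    private final boolean[] idle;

    IdlePartitionSketch(int partitionCount, long idleTimeoutMillis, long nowMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
        this.lastEventTime = new long[partitionCount];
        this.idle = new boolean[partitionCount];
        Arrays.fill(lastEventTime, nowMillis); // start counting from creation time
    }

    // Any event makes the partition active again.
    void onEvent(int partition, long nowMillis) {
        lastEventTime[partition] = nowMillis;
        idle[partition] = false;
    }

    // True exactly when the partition transitions to idle at this check.
    boolean checkIdle(int partition, long nowMillis) {
        if (idleTimeoutMillis <= 0 || idle[partition]) {
            return false; // feature disabled, or already marked idle
        }
        if (nowMillis - lastEventTime[partition] >= idleTimeoutMillis) {
            idle[partition] = true;
            return true;
        }
        return false;
    }
}
```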