Interface Observable<T>

Type Parameters:
T - type of the values in the sequence
All Superinterfaces:
Iterable<T>

public interface Observable<T>
extends Iterable<T>
Represents a flowing sequence of events produced by observable sinks. To observe the events, call jet.getObservable(name).addObserver(myObserver).

The Observable is backed by a Ringbuffer which, once created, has a fixed capacity for storing messages. It supports reading by multiple Observers, which will all observe the same sequence of messages. A new Observer automatically starts reading from the oldest message still available. Once the Ringbuffer is full, the oldest messages are overwritten as new ones arrive.

The Ringbuffer's capacity defaults to 10000, but can be changed (via the configureCapacity(int) method), as long as the Ringbuffer hasn't been created yet (see the "Lifecycle" section below).
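The overwrite behaviour described above can be illustrated with a toy, single-JVM ring buffer. This is only a sketch under stated assumptions: `RingSketch` and its methods are hypothetical names invented for this example and are not part of the Jet or Hazelcast API, and the real Ringbuffer is a distributed structure.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy, single-JVM model of a fixed-capacity ring buffer; NOT Hazelcast's Ringbuffer. */
final class RingSketch<T> {
    private final Object[] slots;
    private long tail = 0; // sequence number the next added item will get

    RingSketch(int capacity) {
        slots = new Object[capacity];
    }

    /** Stores the item, overwriting the oldest one once the buffer is full. */
    void add(T item) {
        slots[(int) (tail % slots.length)] = item;
        tail++;
    }

    /** Oldest sequence number that has not been overwritten yet. */
    long headSequence() {
        return Math.max(0, tail - slots.length);
    }

    /** What a newly attached reader would see: everything from the oldest retained item on. */
    @SuppressWarnings("unchecked")
    List<T> readFromOldest() {
        List<T> out = new ArrayList<>();
        for (long seq = headSequence(); seq < tail; seq++) {
            out.add((T) slots[(int) (seq % slots.length)]);
        }
        return out;
    }

    public static void main(String[] args) {
        RingSketch<Integer> ring = new RingSketch<>(3);
        for (int i = 1; i <= 5; i++) {
            ring.add(i);
        }
        System.out.println(ring.readFromOldest()); // prints [3, 4, 5]
    }
}
```

With a capacity of 3 and five items added, the first two items are gone by the time a reader attaches, which is why the capacity should be sized for the slowest expected Observer.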

In addition to data events, the Observer can also observe completion and failure events. Completion means that no further values will appear in the sequence. Failure means that something went wrong during the job execution.
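The three kinds of events can be pictured as three callbacks. The sketch below is an illustration only: `SketchObserver`, `RecordingObserver` and `EventKindsDemo` are hypothetical names defined here so that the example is self-contained, and `deliver` merely simulates a source that emits some values followed by one terminal event.

```java
import java.util.ArrayList;
import java.util.List;

final class EventKindsDemo {
    /** Delivers the values, then one terminal event: failure if given, completion otherwise. */
    static void deliver(SketchObserver<String> observer, List<String> values, Throwable failure) {
        values.forEach(observer::onNext);
        if (failure != null) {
            observer.onError(failure);
        } else {
            observer.onComplete();
        }
    }

    public static void main(String[] args) {
        RecordingObserver ok = new RecordingObserver();
        deliver(ok, List.of("a", "b"), null);
        System.out.println(ok.log); // prints [next:a, next:b, complete]

        RecordingObserver failed = new RecordingObserver();
        deliver(failed, List.of("a"), new RuntimeException("boom"));
        System.out.println(failed.log); // prints [next:a, error:boom]
    }
}

/** Hypothetical stand-in for a three-callback observer; names are illustrative. */
interface SketchObserver<T> {
    void onNext(T value);          // a data event
    void onError(Throwable error); // failure: something went wrong during job execution
    void onComplete();             // completion: no further values will follow
}

/** Records what it sees so the ordering of events can be inspected. */
final class RecordingObserver implements SketchObserver<String> {
    final List<String> log = new ArrayList<>();

    @Override public void onNext(String value) { log.add("next:" + value); }
    @Override public void onError(Throwable error) { log.add("error:" + error.getMessage()); }
    @Override public void onComplete() { log.add("complete"); }
}
```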

Lifecycle

The Observable itself is just a client-side object with the same lifecycle as any other POJO. What matters is the lifecycle of the underlying Ringbuffer, since that is the significant distributed entity.

The lifecycle of the Ringbuffer is decoupled from the lifecycle of the job. The Ringbuffer is created either when the user gets a reference to its equivalent Observable (through JetInstance.getObservable()) and registers the first Observer on it (through Observable.addObserver()) or when the job containing the sink for it starts executing.

The Ringbuffer must be explicitly destroyed when it's no longer in use, or data will be retained in the cluster. This is done via the Observable.destroy() method. Note: even if the Observable POJO gets lost and its underlying Ringbuffer is leaked in the cluster, it's still possible to manually destroy it later by creating another Observable instance with the same name and calling destroy() on that.
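The split between a lightweight client-side handle and cluster-side state that outlives it can be modelled as follows. This is a toy model, not the real mechanism: `LifecycleSketch`, `Handle` and the static `CLUSTER` map are hypothetical names, and a plain in-memory map stands in for the cluster.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model: state lives in a shared map keyed by name, handles only carry the name. */
final class LifecycleSketch {
    /** Stands in for cluster-side state (assumption: one buffer per observable name). */
    static final Map<String, List<String>> CLUSTER = new HashMap<>();

    /** Lightweight client-side handle; losing it does not remove the shared state. */
    static final class Handle {
        private final String name;

        Handle(String name) {
            this.name = name;
        }

        /** Creates the buffer on first use, then appends the event. */
        void publish(String event) {
            CLUSTER.computeIfAbsent(name, n -> new ArrayList<>()).add(event);
        }

        /** Removes the shared state for this name, whichever handle created it. */
        void destroy() {
            CLUSTER.remove(name);
        }
    }

    public static void main(String[] args) {
        new Handle("results").publish("a");                 // handle is dropped right away
        System.out.println(CLUSTER.containsKey("results")); // prints true: data is still retained

        new Handle("results").destroy();                    // a fresh handle with the same name
        System.out.println(CLUSTER.containsKey("results")); // prints false
    }
}
```

In this model, publishing again after `destroy()` would recreate the entry, which mirrors why destroying while a producer is still active does not free the storage for good.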

Important: The same Observable must not be reused in a new job, since this causes completion events to interleave and can lead to data loss or other unexpected behaviour. Using one observable name in multiple observable sinks in the same job is allowed and will not produce multiple completion or error events. The results from the sinks will simply be intermingled, which is acceptable in some use cases.

Since:
4.0
  • Method Details

    • name

      Name of this instance.
    • addObserver

      @Nonnull UUID addObserver(@Nonnull Observer<T> observer)
      Registers an Observer to this Observable. It will receive all events currently in the backing Ringbuffer and then continue receiving any future events.
      Returns:
      registration ID associated with the added Observer, can be used to remove the Observer later
    • removeObserver

      void removeObserver(@Nonnull UUID registrationId)
      Removes a previously added Observer identified by its assigned registration ID. A removed Observer will not get notified about further events.
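The registration contract (replay what is stored, then deliver new events until removal) can be sketched with a map keyed by registration ID. The class `RegistrationSketch` is hypothetical, single-threaded, and uses a plain `Consumer` in place of an Observer; it illustrates the semantics described above and is not how the library implements them.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;

/** Toy registry: replay retained events on registration, then fan out new ones. */
final class RegistrationSketch<T> {
    private final List<T> retained = new ArrayList<>(); // stands in for the Ringbuffer
    private final Map<UUID, Consumer<T>> observers = new LinkedHashMap<>();

    /** Registers the observer, replays what is already stored, and returns its ID. */
    UUID addObserver(Consumer<T> observer) {
        UUID id = UUID.randomUUID();
        retained.forEach(observer);
        observers.put(id, observer);
        return id;
    }

    /** Removes the observer; it is not notified about further events. */
    void removeObserver(UUID registrationId) {
        observers.remove(registrationId);
    }

    /** Stores the event and notifies every currently registered observer. */
    void publish(T event) {
        retained.add(event);
        observers.values().forEach(o -> o.accept(event));
    }

    public static void main(String[] args) {
        RegistrationSketch<String> sketch = new RegistrationSketch<>();
        sketch.publish("a");                     // published before anyone listens

        List<String> seen = new ArrayList<>();
        UUID id = sketch.addObserver(seen::add); // replays "a"
        sketch.publish("b");                     // delivered live
        sketch.removeObserver(id);
        sketch.publish("c");                     // not delivered

        System.out.println(seen); // prints [a, b]
    }
}
```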
    • configureCapacity

      Observable<T> configureCapacity(int capacity)
      Sets the capacity of the underlying Ringbuffer, which defaults to 10000.

      This method can be called only before the Ringbuffer gets created. This means before any Observers are added to the Observable and before any jobs containing observable sinks (with the same observable name) are submitted for execution.

      Important: configure the capacity only once. Configuring it multiple times is currently not supported.

      Throws:
      IllegalStateException - if the Ringbuffer has already been created
    • getConfiguredCapacity

      int getConfiguredCapacity()
      Returns the configured capacity of the underlying Ringbuffer.

      This method only works if the backing Ringbuffer has already been created. If so, it will be queried for its actual capacity, which can't be changed any longer. (Reminder: the Ringbuffer gets created either when the first Observer is added or when the job containing the observable sink (with the same observable name) is submitted for execution.)

      Throws:
      IllegalStateException - if the backing Ringbuffer has not yet been created
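The two state checks (capacity can be set only before the Ringbuffer exists, and queried only after) can be summarised in a small state machine. `CapacitySketch` and its `create()` method are hypothetical; `create()` stands in for "first Observer added" or "job with the sink submitted". The sketch does not model the "configure only once" restriction.

```java
/** Toy model of "configure before creation, query after creation"; not the real implementation. */
final class CapacitySketch {
    static final int DEFAULT_CAPACITY = 10_000;

    private int requestedCapacity = DEFAULT_CAPACITY;
    private Object[] buffer; // null until the buffer is created

    /** Allowed only while the buffer does not exist yet. */
    CapacitySketch configureCapacity(int capacity) {
        if (buffer != null) {
            throw new IllegalStateException("buffer already created");
        }
        requestedCapacity = capacity;
        return this;
    }

    /** Hypothetical trigger standing in for the events that create the buffer. */
    void create() {
        if (buffer == null) {
            buffer = new Object[requestedCapacity];
        }
    }

    /** Allowed only once the buffer exists; reports its actual size. */
    int getConfiguredCapacity() {
        if (buffer == null) {
            throw new IllegalStateException("buffer not yet created");
        }
        return buffer.length;
    }

    public static void main(String[] args) {
        CapacitySketch sketch = new CapacitySketch().configureCapacity(500);
        sketch.create();
        System.out.println(sketch.getConfiguredCapacity()); // prints 500
        try {
            sketch.configureCapacity(1_000);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage()); // prints rejected: buffer already created
        }
    }
}
```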
    • iterator

      @Nonnull default Iterator<T> iterator()
      Returns an iterator over the sequence of events produced by this Observable. If there are currently no events to observe, the iterator's hasNext() and next() methods will block. A completion event completes the iterator (hasNext() will return false) and a failure event makes the iterator's methods throw the underlying exception.

      If used against an Observable populated from a streaming job, the iterator will complete only in the case of an error or job cancellation.

      The iterator is not thread-safe.

      The iterator is backed by a blocking concurrent queue which stores all events until consumed.

      Specified by:
      iterator in interface Iterable<T>
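A blocking iterator over a queue, with completion and failure signalled through marker objects, could look roughly like this. It is a sketch of the general technique only: `BlockingIteratorSketch` is a hypothetical class, it assumes non-null values, and unlike the behaviour described above it wraps the failure in a `RuntimeException` instead of rethrowing the original exception.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Toy blocking iterator fed through an unbounded queue; the consumer side is not thread-safe. */
final class BlockingIteratorSketch<T> implements Iterator<T> {
    private static final Object COMPLETE = new Object(); // marker for the completion event

    /** Wrapper marking a failure event. */
    private static final class Failure {
        final Throwable cause;
        Failure(Throwable cause) { this.cause = cause; }
    }

    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    private Object next; // item fetched by hasNext() but not yet handed out

    // Producer side: these would be driven by the observed events.
    void onNext(T value) { queue.add(value); }
    void onComplete() { queue.add(COMPLETE); }
    void onError(Throwable error) { queue.add(new Failure(error)); }

    @Override
    public boolean hasNext() {
        if (next == null) {
            try {
                next = queue.take(); // blocks while there is nothing to observe
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        if (next instanceof Failure) {
            // Simplification: wraps the cause rather than rethrowing it directly.
            throw new RuntimeException(((Failure) next).cause);
        }
        return next != COMPLETE;
    }

    @Override
    @SuppressWarnings("unchecked")
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        T value = (T) next;
        next = null;
        return value;
    }

    public static void main(String[] args) {
        BlockingIteratorSketch<String> it = new BlockingIteratorSketch<>();
        new Thread(() -> {
            it.onNext("a");
            it.onNext("b");
            it.onComplete();
        }).start();
        while (it.hasNext()) {
            System.out.println(it.next()); // prints a, then b
        }
    }
}
```

Because the queue in this sketch is unbounded, a slow consumer lets it grow without limit, which is the practical consequence of storing all events until consumed.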
    • toFuture

      @Nonnull default <R> CompletableFuture<R> toFuture(@Nonnull Function<Stream<T>,R> fn)
      Allows you to post-process the results of a Jet job on the client side using the standard Java Stream API. You provide a function that will receive the job results as a Stream<T> and return a single result (which can in fact be another Stream, if so desired).

      Returns a CompletableFuture that will become completed once your function has received all the job results through its Stream and returned the final result.

      A trivial example is counting: observable.toFuture(Stream::count). The Stream API is quite rich, however, and lets you perform arbitrary transformations and aggregations.

      This feature is intended to be used only on the results of a batch job. On an unbounded streaming job the stream-collecting operation will never reach the final result.

      Parameters:
      fn - transform function which takes the stream of observed values and produces an altered value from it, which could also be a stream
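The idea of collecting results until completion and only then applying a Stream function can be sketched with the standard library alone. `ToFutureSketch` is a hypothetical class whose callbacks are invoked by hand from a single thread; it is meant to show why the future cannot complete on an unbounded stream, not to reproduce the actual default method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Stream;

/** Toy collector: buffers values until completion, then applies the function to a Stream. */
final class ToFutureSketch<T> {
    private final List<T> values = new ArrayList<>();
    private final CompletableFuture<List<T>> done = new CompletableFuture<>();

    // Event callbacks; in this sketch they are called from a single thread.
    void onNext(T value) { values.add(value); }
    void onComplete() { done.complete(values); }
    void onError(Throwable error) { done.completeExceptionally(error); }

    /** The returned future completes only after a completion or failure event. */
    <R> CompletableFuture<R> toFuture(Function<Stream<T>, R> fn) {
        return done.thenApply(list -> fn.apply(list.stream()));
    }

    public static void main(String[] args) {
        ToFutureSketch<String> sketch = new ToFutureSketch<>();
        CompletableFuture<Long> count = sketch.toFuture(Stream::count);

        sketch.onNext("a");
        sketch.onNext("b");
        System.out.println(count.isDone()); // prints false: no completion event yet
        sketch.onComplete();
        System.out.println(count.join());   // prints 2
    }
}
```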
    • destroy

      void destroy()
      Removes all previously registered observers and destroys the backing Ringbuffer.

      Note: if you call this while a job that publishes to this Observable is still active, the job will silently create a new Ringbuffer and go on publishing to it.