- Type Parameters:
T- type of the values in the sequence
- All Superinterfaces:
public interface Observable<T> extends Iterable<T>
Observable is backed by a
Ringbuffer, which, once
created, has a fixed capacity for storing messages. It supports reading
Observer Observers, which will all observe the same
sequence of messages. A new
Observer will start reading
automatically from the oldest sequence available. Once the capacity is
full, the oldest messages will be overwritten as new ones arrive.
In addition to data events, the
Observer can also observe
completion and failure events. Completion means that no further values
will appear in the sequence. Failure means that something went wrong
during the job execution .
When talking about the lifecycle of an
Observable (which is
basically just a client side object and has a lifecycle just like any
other POJO) it's better to actually consider the lifecycle of the
Ringbuffer, since that is the significant
The lifecycle of the
Ringbuffer is decoupled from the lifecycle
of the job. The
Ringbuffer is created either when the user
gets a reference to its equivalent
and registers the first
Observer on it (through
or when the job containing the sink for it starts executing.
Ringbuffer must be explicitly destroyed when it's no longer
in use, or data will be retained in the cluster. This is done via the
Observable.destroy() method. Note: even if the
Observable POJO gets lost and its underlying
is leaked in the cluster, it's still possible to manually destroy
it later by creating another
Observable instance with the same
name and calling
destroy() on that.
Important: The same
not be used again in a new job since this will cause
completion events interleaving and causing data loss or other unexpected
behaviour. Using one observable name in multiple
observable sinks in the same job is
allowed, this will not produce multiple completion or error events (just
an intermingling of the results from the two sinks, but that should be
fine in some use cases).
Modifier and Type Method Description
Observer<T> observer)(Registers an
()Removes all previously registered observers and destroys the backing
()Returns the configured capacity of the underlying
()Returns an iterator over the sequence of events produced by this
()Name of this instance.
UUID registrationId)(Removes a previously added
Observeridentified by its assigned registration ID.
default <R> CompletableFuture<R>
Function<Stream<T>,R> fn)(Allows you to post-process the results of a Jet job on the client side using the standard Java
nameName of this instance.
Observable. It will receive all events currently in the backing
Ringbufferand then continue receiving any future events.
- registration ID associated with the added
Observer, can be used to remove the
removeObserverRemoves a previously added
Observeridentified by its assigned registration ID. A removed
Observerwill not get notified about further events.
configureCapacitySet the capacity of the underlying
Ringbuffer, which defaults to 10000.
This method can be called only before the
Ringbuffergets created. This means before any
Observersare added to the
Observableand before any jobs containing
observable sinks(with the same observable name) are submitted for execution.
Important: only configure capacity once, multiple configuration are currently not supported.
IllegalStateException- if the
Ringbufferhas already been created
getConfiguredCapacityint getConfiguredCapacity()Returns the configured capacity of the underlying
This method only works if the backing
Ringbufferhas already been created. If so, it will be queried for its actual capacity, which can't be changed any longer. (Reminder: the
Ringbuffergets created either when the first
Observeris added or when the job containing the
observable sink(with the same observable name) is submitted for execution.)
IllegalStateException- if the backing
Ringbufferhas not yet been created
iteratorReturns an iterator over the sequence of events produced by this
Observable. If there are currently no events to observe, the iterator's
next()methods will block. A completion event completes the iterator (
hasNext()will return false) and a failure event makes the iterator's methods throw the underlying exception.
If used against an
Observablepopulated from a streaming job, the iterator will complete only in the case of an error or job cancellation.
The iterator is not thread-safe.
The iterator is backed by a blocking concurrent queue which stores all events until consumed.
toFutureAllows you to post-process the results of a Jet job on the client side using the standard Java
Stream API. You provide a function that will receive the job results as a
Stream<T>and return a single result (which can in fact be another
Stream, if so desired).
CompletableFuturethat will become completed once your function has received all the job results through its
Streamand returned the final result.
A trivial example is counting, like this:
observable.toFuture(Stream::count), however the Stream API is quite rich and you can perform arbitrary transformations and aggregations.
This feature is intended to be used only on the results of a batch job. On an unbounded streaming job the stream-collecting operation will never reach the final result.
fn- transform function which takes the stream of observed values and produces an altered value from it, which could also be a stream
destroyvoid destroy()Removes all previously registered observers and destroys the backing
Note: if you call this while a job that publishes to this
Observableis still active, it will silently create a new
Ringbufferand go on publishing to it.