The general shape of any data processing pipeline is
readFromSource -> transform -> writeToSink and the natural way to build it is from source
to sink. The
Pipeline API follows this
pattern. For example,
Pipeline p = Pipeline.create(); p.readFrom(TestSources.items("the", "quick", "brown", "fox")) .map(item -> item.toUpperCase()) .writeTo(Sinks.logger());
In each step, such as
writeTo, you create a pipeline
stage. The stage resulting from a
writeTo operation is called a
sink stage and you can't attach more stages to it. All others are
called compute stages and expect you to attach further stages to them.
BatchStage and StreamStage
The API differentiates between batch (bounded) and stream (unbounded)
sources and this is reflected in the naming: there is a
, each offering the operations appropriate to its kind.
Depending on the data source, your pipeline will end up starting with a
batch or streaming stage. A batch source still can be used to simulate
a streaming source using the
addTimestamps method, which will
convert it into a
In this section we'll mostly use batch stages, for simplicity, but the API of operations common to both kinds is identical. Jet internally treats batches as a bounded stream. We'll explain later on how to apply windowing, which is necessary to aggregate over unbounded streams.
Adding Timestamps to a Stream
The Pipeline API guides you to set up the timestamp policy right after
you obtain a source stage.
SourceStreamStage which offers just these methods:
withNativeTimestamps()declares that the stream will use source's native timestamps. This typically refers to the timestamps that the external source system sets on each event.
withTimestamps(timestampFn)provides a function to the source that determines the timestamp of each event.
withoutTimestamps()declares that the source stage has no timestamps. Use this if you don't need them (i.e., your pipeline won't perform windowed aggregation or stateful mapping).
Exceptionally, you may need to call
withoutTimestamps() on the source
stage, then perform some transformations that determine the event
timestamps, and then call
addTimestamps(timestampFn) to instruct Jet
where to find them in the events. Some examples include an enrichment
stage that retrieves the timestamps from a side input or flat-mapping
the stream to unpack a series of events from each original item. If you
do this, however, it will no longer be the source that determines the
Prefer Assigning Timestamps at the Source
In some source implementations, especially partitioned ones like Kafka,
there is a risk of high event time skew
occurring across partitions as the Jet processor pulls the data from
them in batches, in a round-robin fashion. This problem is especially
pronounced when Jet recovers from a failure and restarts your job. In
this case Jet must catch up with all the events that arrived since
taking the last snapshot. It will receive these events at the maximum
system throughput and thus each partition will have a lot of data each
time Jet polls it. This means that the interleaving of data from
different partitions will become much more coarse-grained and there will
be sudden jumps in event time at the points of transition from one
partition to the next. The jumps can easily exceed the configured
allowedLateness and cause Jet to drop whole swaths of events as late.
In order to mitigate this issue, Jet has special logic in its partitioned source implementations that keeps separate track of the timestamps within each partition and knows how to reconcile the temporary differences that occur between them.
This feature works only if you set the timestamping policy in the source
Your pipeline can consist of multiple sources, each starting its own pipeline branch, and you are allowed to mix both kinds of stages in the same pipeline. You can merge the branches with joining transforms such as hash-join, co-group or merge .
As an example, you can merge two stages into one by using the
Pipeline p = Pipeline.create(); BatchSource<String> leftSource = TestSources.items("the", "quick", "brown", "fox"); BatchSource<String> rightSource = TestSources.items("jumps", "over", "the", "lazy", "dog"); BatchStage<String> left = p.readFrom(leftSource); BatchStage<String> right = p.readFrom(rightSource); left.merge(right) .writeTo(Sinks.logger());
Symmetrically, you can fork the output of a stage and send it to more than one destination:
Pipeline p = Pipeline.create(); BatchStage<String> src = p.readFrom(TestSources.items("the", "quick", "brown", "fox")); src.map(String::toUpperCase) .writeTo(Sinks.files("uppercase")); src.map(String::toLowerCase) .writeTo(Sinks.files("lowercase"));
The pipeline itself is a reusable object which can be passed around and submitted several times to the cluster. To execute a job, you need the following steps:
- Create an empty pipeline definition
- Start with sources, add transforms and then finally write to a sink. A pipeline without any sinks is not valid.
- Create or obtain a
JetInstance(using either embedded instance, bootstrapped or a client)
JetInstance.newJob(Pipeline)submit it to the cluster
- Wait for it complete (for batch jobs) using
Job.join()or just let it run on the cluster indefinitely, for streaming jobs.
Types of Transforms
Besides sources and sinks, Jet offers several transforms which can be used to process data. We can divide these into roughly the following categories:
- Stateless transforms: These transforms do not have any notion of state meaning that all items must be processed independently of any previous items. Examples: map, filter, flatMap, hashJoin
- Stateful transforms: These transforms accumulate data and the output depends on previously encountered items. Examples: aggregate, rollingAggregate, distinct, window
This distinction is important because any stateful computation requires the state to be saved for fault-tolerance and this has big implications in terms of operational design.