Stateless Transforms
Stateless transforms are the bread and butter of a data pipeline: they transform the input into the correct shape that is required by further, more complex transforms. The key feature of these transforms is that they do not have side-effects and they treat each item in isolation.
map
Mapping is the simplest kind of stateless transformation. It simply applies a function to the input item, and passes the output to the next stage.
StreamStage<String> names = stage.map(name -> name.toLowerCase());
filter
Similar to map, filter is a stateless operator that applies a predicate to the input to decide whether to pass it to the output.
BatchStage<String> names = stage.filter(name -> !name.isEmpty());
flatMap
flatMap is equivalent to map, with the difference that instead of one output item you can have an arbitrary number of output items per input item. The output type is a Traverser, which is a Jet type similar to an Iterator. For example, the code below splits a sentence into individual words:
StreamStage<String> words = stage.flatMap(
sentence -> Traversers.traverseArray(sentence.split("\\W+"))
);
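As a quick sanity check of the regex used above (independent of Jet), split("\\W+") treats any run of non-word characters, whether whitespace or punctuation, as a single separator:

```java
import java.util.Arrays;

public class SplitCheck {
    public static void main(String[] args) {
        // "\\W+" matches one or more non-word characters, so the comma
        // and the spaces around it collapse into a single split point.
        String[] words = "To be, or not to be".split("\\W+");
        System.out.println(Arrays.toString(words));
        // prints [To, be, or, not, to, be]
    }
}
```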
merge
This transform merges the contents of two streams into one. The item type in the right-hand stage must be the same or a subtype of the one in the left-hand stage. The items from both sides will be interleaved in arbitrary order.
StreamStage<Trade> tradesNewYork = pipeline
.readFrom(KafkaSources.kafka(.., "nyc"))
.withoutTimestamps();
StreamStage<Trade> tradesTokyo = pipeline
.readFrom(KafkaSources.kafka(.., "tokyo"))
.withoutTimestamps();
StreamStage<Trade> tradesNyAndTokyo = tradesNewYork.merge(tradesTokyo);
mapUsingIMap
This transform looks up each incoming item from the corresponding IMap and the result of the lookup is combined with the input item.
StreamStage<Order> orders = pipeline
.readFrom(KafkaSources.kafka(.., "orders"))
.withoutTimestamps();
StreamStage<OrderDetails> details = orders.mapUsingIMap("products",
order -> order.getProductId(),
(order, product) -> new OrderDetails(order, product));
The above code can be thought of as equivalent to the following, where the input is of type Order:
public OrderDetails getOrderDetails(Order order) {
    IMap<String, ProductDetails> map = jet.getMap("products");
    ProductDetails product = map.get(order.getProductId());
    return new OrderDetails(order, product);
}
See Joining Static Data to a Stream for a tutorial using this operator.
mapUsingReplicatedMap
This transform is equivalent to mapUsingIMap, with the only difference that a ReplicatedMap is used instead of an IMap.
StreamStage<Order> orders = pipeline
.readFrom(KafkaSources.kafka(.., "orders"))
.withoutTimestamps();
StreamStage<OrderDetails> details = orders.mapUsingReplicatedMap("products",
order -> order.getProductId(),
(order, product) -> new OrderDetails(order, product));
With a ReplicatedMap, as opposed to a standard IMap, every lookup is local. The downside is that the data is replicated to all the nodes, consuming more memory in the cluster.
mapUsingService
This transform takes an input and performs a mapping using a service object. Examples are an external HTTP-based service or some library which is loaded and initialized during runtime (such as a machine learning model).
The service itself is defined through a ServiceFactory object. The main difference between this operator and a simple map is that the service is initialized once per job. This is what makes it useful for calling out to heavy-weight objects which are expensive to initialize (such as HTTP connections).
Let's imagine an HTTP service which returns details for a product, and that we have wrapped this service in a ProductService class:
interface ProductService {
ProductDetails getDetails(int productId);
}
We can then create a shared service factory as follows:
StreamStage<Order> orders = pipeline
.readFrom(KafkaSources.kafka(.., "orders"))
.withoutTimestamps();
ServiceFactory<?, ProductService> productService = ServiceFactories
.sharedService(ctx -> new ProductService(url))
.toNonCooperative();
"Shared" means that the service is thread-safe and can be called from multiple threads, so Jet will create just one instance on each node and share it among the parallel tasklets.
We also declared the service as "non-cooperative" because it makes blocking HTTP calls. Failing to do this would have severe consequences for the performance of not just your pipeline, but all the jobs running on the Jet cluster.
We can then perform a lookup on this service for each incoming order:
StreamStage<OrderDetails> details = orders.mapUsingService(productService,
(service, order) -> {
ProductDetails details = service.getDetails(order.getProductId());
return new OrderDetails(order, details);
}
);
mapUsingServiceAsync
This transform is identical to mapUsingService with one important distinction: the service in this case supports asynchronous calls, which are compatible with cooperative concurrency and don't need extra threads. It also means that we can have multiple requests in flight at the same time to maximize throughput. Instead of the mapped value, this transform expects the user to supply a CompletableFuture<T> as the return value, which will be completed at some later time.
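The CompletableFuture contract here is plain java.util.concurrent and can be sketched independently of Jet. The detailsAsync method below is a hypothetical stand-in for an async service call:

```java
import java.util.concurrent.CompletableFuture;

public class AsyncSketch {
    // Hypothetical async lookup: the future completes on another thread
    // at some later time, not when this method returns.
    static CompletableFuture<String> detailsAsync(int productId) {
        return CompletableFuture.supplyAsync(() -> "details-" + productId);
    }

    public static void main(String[] args) {
        // thenApply attaches the mapping step without blocking the caller;
        // this non-blocking composition is what keeps the stage cooperative.
        CompletableFuture<String> enriched =
                detailsAsync(42).thenApply(d -> "order+" + d);
        System.out.println(enriched.join()); // join() only for the demo
        // prints order+details-42
    }
}
```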
For example, if we extend the previous ProductService as follows:
interface ProductService {
ProductDetails getDetails(int productId);
CompletableFuture<ProductDetails> getDetailsAsync(int productId);
}
We still create the shared service factory as before:
StreamStage<Order> orders = pipeline
.readFrom(KafkaSources.kafka(.., "orders"))
.withoutTimestamps();
ServiceFactory<?, ProductService> productService = ServiceFactories
.sharedService(ctx -> new ProductService(url));
The lookup now becomes async; note that the transform also expects you to return a CompletableFuture instead of the mapped value:
StreamStage<OrderDetails> details = orders.mapUsingServiceAsync(productService,
(service, order) -> {
CompletableFuture<ProductDetails> f = service.getDetailsAsync(order.getProductId());
return f.thenApply(details -> new OrderDetails(order, details));
}
);
The main advantage of using async communication is that we can have many invocations to the service in flight at the same time, which results in better throughput.
mapUsingServiceAsyncBatched
This variant is very similar to the previous one, but instead of sending one request at a time, we can send requests in so-called "smart batches" (for a more in-depth look at the internals of Jet, see the Execution Engine section). Jet automatically groups items as they arrive and allows you to send requests in batches. This can be very efficient, for example, for a remote service: instead of one round trip per request, you can send requests in groups to maximize throughput. If we extend our ProductService as follows:
interface ProductService {
ProductDetails getDetails(int productId);
CompletableFuture<ProductDetails> getDetailsAsync(int productId);
CompletableFuture<List<ProductDetails>> getAllDetailsAsync(List<Integer> productIds);
}
We can then rewrite the transform as:
StreamStage<OrderDetails> details = orders.mapUsingServiceAsyncBatched(productService,
    1024, // maximum number of items Jet may hand to the service at once
    (service, orderList) -> {
        List<Integer> productIds = orderList
            .stream()
            .map(o -> o.getProductId())
            .collect(Collectors.toList());
        CompletableFuture<List<ProductDetails>> f = service
            .getAllDetailsAsync(productIds);
        return f.thenApply(productDetailsList -> {
            List<OrderDetails> orderDetailsList = new ArrayList<>();
            for (int i = 0; i < orderList.size(); i++) {
                orderDetailsList.add(
                    new OrderDetails(orderList.get(i), productDetailsList.get(i)));
            }
            return orderDetailsList;
        });
    });
As you can see, there is some more code to write to combine the results back, but this should give better throughput, provided the service is able to batch requests efficiently.
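The combine step relies on the service returning results in the same order as the ids it was given. That index-based pairing can be checked in plain Java, here with ints and strings standing in for orders and product details:

```java
import java.util.ArrayList;
import java.util.List;

public class BatchCombine {
    // Pairs each request with its result by position; assumes the service
    // preserves order, i.e. results.get(i) answers requests.get(i).
    static List<String> combine(List<Integer> requests, List<String> results) {
        List<String> combined = new ArrayList<>();
        for (int i = 0; i < requests.size(); i++) {
            combined.add(requests.get(i) + "=" + results.get(i));
        }
        return combined;
    }

    public static void main(String[] args) {
        System.out.println(combine(List.of(1, 2), List.of("a", "b")));
        // prints [1=a, 2=b]
    }
}
```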
hashJoin
hashJoin is a type of join where you have two or more inputs, and all but one of the inputs must be small enough to fit in memory. You can think of it as a primary input accompanied by one or more side inputs which are small enough to fit in memory. The side inputs are joined to the primary input, which can be either a batch or streaming stage. The side inputs must be batch stages.
StreamStage<Order> orders = pipeline
.readFrom(orderSource())
.withoutTimestamps();
BatchStage<ProductDetails> productDetails = pipeline
.readFrom(productDetailsSource());
StreamStage<OrderDetails> joined = orders.hashJoin(productDetails,
onKeys(Order::productId, ProductDetails::productId),
(order, product) -> new OrderDetails(order, product)
);
The last argument to hashJoin is a function that gets the input and the enriching item. Note that by default Jet does an outer join: if the enriching stream lacks a given key, the corresponding function parameter will be null. You can request an inner join as well:
StreamStage<OrderDetails> joined = orders.innerHashJoin(productDetails,
onKeys(Order::productId, ProductDetails::productId),
(order, product) -> new OrderDetails(order, product)
);
In this case the product argument is never null, and if a given key is missing, the input Order item is filtered out.
Jet also supports hash-joining with more streams at once through hashJoin2 and hashJoinBuilder. Refer to their documentation for more details.
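The "hash" in hashJoin refers to the in-memory hash table built from the side input. The essence can be sketched in plain Java without Jet, with ints and strings standing in for the key and enriching types:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HashJoinSketch {
    // Streams the primary keys through a hash table built from the (small)
    // side input. A missing key yields null, matching Jet's default
    // outer-join behaviour.
    static List<String> hashJoin(List<Integer> primaryKeys, Map<Integer, String> side) {
        List<String> joined = new ArrayList<>();
        for (Integer key : primaryKeys) {
            String enriching = side.get(key); // null when the key is absent
            joined.add(key + "->" + enriching);
        }
        return joined;
    }

    public static void main(String[] args) {
        Map<Integer, String> products = new HashMap<>();
        products.put(1, "keyboard");
        products.put(2, "mouse");
        System.out.println(hashJoin(List.of(1, 3), products));
        // prints [1->keyboard, 3->null]
    }
}
```

An inner join, by contrast, would simply skip the item when side.get(key) returns null rather than emitting it.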