007 - Extended gRPC Support
Since: 4.1
Goal
Provide a convenient way to call gRPC services with good performance as part of a pipeline.
Background
A Jet pipeline can contact outside services to process the data. The general facility is called mapUsingService, with variants that do filtering and flat-mapping. With this transform, each processor creates a proxy to a service and then invokes it for each input item. Jet also supports services with async APIs through mapUsingServiceAsync; in that case it can make several concurrent requests to the same service without any blocking.
gRPC is an RPC framework for building network services and handles concerns like transport, authentication and stub generation with support for several different programming languages.
If you have a pipeline where you want to call a gRPC service for each input item, you can use the mapUsingService transform directly, but this will not give you the best performance: it does not make use of asynchronous calls, so each processor has only a single request in flight at a time. It's also possible to use mapUsingServiceAsync in combination with the non-blocking stub, but this requires a lot of boilerplate.
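To illustrate the kind of boilerplate involved: the non-blocking stub reports results through gRPC's callback-based StreamObserver, which has to be adapted to the CompletableFuture that mapUsingServiceAsync expects. The sketch below mirrors the shape of io.grpc.stub.StreamObserver with a local interface so it stays self-contained; the real code would implement the gRPC type directly.

```java
import java.util.concurrent.CompletableFuture;

// Local stand-in with the same shape as io.grpc.stub.StreamObserver,
// so this sketch compiles without the gRPC runtime on the classpath.
interface StreamObserver<V> {
    void onNext(V value);
    void onError(Throwable t);
    void onCompleted();
}

// Adapts the callback-based observer to a CompletableFuture holding
// the single response of a unary call.
final class UnaryFutureObserver<V> implements StreamObserver<V> {
    final CompletableFuture<V> future = new CompletableFuture<>();

    @Override
    public void onNext(V value) {
        future.complete(value);
    }

    @Override
    public void onError(Throwable t) {
        future.completeExceptionally(t);
    }

    @Override
    public void onCompleted() {
        // For a unary call the single onNext already completed the future.
    }
}
```

With the real stub you would pass such an observer to each call and return its future from the mapping function, repeated at every call site.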
Furthermore, our internal tests showed that the most efficient gRPC communication pattern is bidirectional streaming. Using it is even less straightforward, requiring some fairly advanced code in the ServiceFactory.
As a part of the Jet-Python integration, we discovered some best practices for integrating a Jet pipeline with gRPC, and the goal is to apply these in a more generic way.
Design
The implementation includes a new module, hazelcast-jet-grpc, which is added to the extensions folder. The module has one main entry point, GrpcServices, which provides convenience methods for creating a ServiceFactory for the corresponding gRPC service. Two methods are added: one for unary services and the other for bidirectional streaming services.
Unary Service
A unary service is the most basic gRPC service type: each request has a matching response, and the service is called in a request-response fashion.
Given the protobuf definition below for a unary service:
```proto
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}
```
We can create the following service factory:
```java
ServiceFactory<?, ? extends GrpcService<HelloRequest, HelloReply>> greeterService = unaryService(
        () -> ManagedChannelBuilder.forAddress("localhost", 5000).usePlaintext(),
        channel -> GreeterGrpc.newStub(channel)::sayHello
);
```
where GreeterGrpc is the class auto-generated by the protobuf compiler.
GrpcService has a very simple interface which only has a single method:
```java
@FunctionalInterface
public interface GrpcService<T, R> {

    /**
     * Calls the requested service and returns a future which will be
     * completed with the result once a response is received.
     */
    @Nonnull
    CompletableFuture<R> call(@Nonnull T input);
}
```
So for every input item, a future containing the output item is created.
The created ServiceFactory<.., GrpcService> can then be used with the mapUsingServiceAsync transform as follows:
```java
Pipeline p = Pipeline.create();
p.readFrom(TestSources.items("one", "two", "three", "four"))
 .mapUsingServiceAsync(greeterService, (service, input) -> {
     HelloRequest request = HelloRequest.newBuilder().setName(input).build();
     return service.call(request);
 })
 .writeTo(Sinks.logger());
```
In this implementation, Jet will simply send an async request for each item, and when the matching response comes it will be emitted further in the pipeline.
Ordering
When a unary service is used, the order of items and their matching timestamps in a stream are always preserved. The mapUsingServiceAsync transform has several mechanisms for making sure that input and output items are matched up and emitted in the input order.
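One way to picture the order-preserving mechanism (a simplified sketch, not Jet's actual implementation) is a FIFO queue of in-flight futures: results are emitted only from the head of the queue, so a response that arrives early still waits for its predecessors.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Simplified sketch of order-preserving async mapping: futures are
// queued in input order and drained from the head only, so an item
// whose response arrives early still waits for its predecessors.
final class OrderedEmitter<R> {
    private final ArrayDeque<CompletableFuture<R>> inFlight = new ArrayDeque<>();

    // Called once per input item, in input order.
    void submit(CompletableFuture<R> future) {
        inFlight.addLast(future);
    }

    // Emits every completed future at the head of the queue, preserving
    // the input order regardless of response arrival order.
    List<R> drainReady() {
        List<R> out = new ArrayList<>();
        while (!inFlight.isEmpty() && inFlight.peekFirst().isDone()) {
            out.add(inFlight.pollFirst().join());
        }
        return out;
    }
}
```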
Bidirectional Streaming
In bidirectional streaming mode, the requests are sent in a stream and the replies are also sent as a stream. This can provide better throughput because there's less overhead per item and there's also opportunity to send requests in batches.
Given the proto definition below:
```proto
service Greeter {
  // Sends a greeting
  rpc SayHello (stream HelloRequest) returns (stream HelloReply) {}
}
```
We can create the following service factory:
```java
ServiceFactory<?, ? extends GrpcService<HelloRequest, HelloReply>> greeterService = bidirectionalStreamingService(
        () -> ManagedChannelBuilder.forAddress("localhost", 5000).usePlaintext(),
        channel -> GreeterGrpc.newStub(channel)::sayHello
);
```
The way the service is invoked in a mapUsingServiceAsync call is identical:
```java
Pipeline p = Pipeline.create();
p.readFrom(TestSources.items("one", "two", "three", "four"))
 .mapUsingServiceAsync(greeterService, (service, input) -> {
     HelloRequest request = HelloRequest.newBuilder().setName(input).build();
     return service.call(request);
 })
 .writeTo(Sinks.logger());
```
Ordering
In the bidirectional streaming style, gRPC doesn't enforce any correlation between the input and output streams. However, from a higher-level perspective, the service is just mapping the input to the output, one item at a time, and Jet must match them up based on nothing more than their order of appearance. Therefore, for each input item your gRPC service receives, it must emit exactly one output item in exactly the same order. If you want to filter out some items, you must do that by using special "null sentinel" items that you can then filter out in post-processing.
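The sentinel contract can be illustrated without gRPC at all. In this plain-Java sketch the sentinel value (an empty string here, an arbitrary choice) stands in for items the service would like to drop: the service-side mapping emits exactly one output per input, and the post-processing stage filters the sentinel out.

```java
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java illustration of the "null sentinel" contract for a
// bidirectional streaming service: one output per input, same order.
final class SentinelDemo {
    // Arbitrary sentinel; any value that cannot be a real result works.
    static final String SENTINEL = "";

    // Stands in for the service: greets names, but "drops" short ones
    // by emitting the sentinel instead of emitting nothing at all.
    static String greetOrSentinel(String name) {
        return name.length() >= 4 ? "Hello, " + name : SENTINEL;
    }

    // Stands in for the post-processing stage of the pipeline, which
    // filters the sentinel items out again.
    static List<String> postProcess(List<String> inputs) {
        return inputs.stream()
                     .map(SentinelDemo::greetOrSentinel)
                     .filter(s -> !SENTINEL.equals(s))
                     .collect(Collectors.toList());
    }
}
```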
Error Handling
If the gRPC service returns an error response, Jet completes the corresponding future with an exception. For non-fatal errors you should use CompletableFuture.handle() to map the exception to an appropriate item; otherwise the job will fail.
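For example (a self-contained sketch; the simulated failure and the fallback value are arbitrary), the mapping function can use handle() to substitute a fallback item for a failed call instead of letting the exception propagate and fail the job:

```java
import java.util.concurrent.CompletableFuture;

final class ErrorHandlingDemo {
    // Simulates service.call(request) when the gRPC service returns an
    // error: the future is completed exceptionally.
    static CompletableFuture<String> failingCall() {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.completeExceptionally(new RuntimeException("UNAVAILABLE"));
        return f;
    }

    // The pattern suggested above: handle() maps the exception to a
    // fallback item, so the pipeline keeps running.
    static CompletableFuture<String> callWithFallback() {
        return failingCall().handle((response, t) ->
                t != null ? "<error: " + t.getMessage() + ">" : response);
    }
}
```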
Future Work
It may be interesting to use a builder pattern for building the gRPC factory to make the syntax a little simpler. This was investigated but dropped for now due to increased complexity.
It may be possible to support unordered bidirectional streaming by requiring a correlation id field in the requests and then matching them to the responses, but this was also considered out of scope for now.
Currently the deployment and scaling of a gRPC service is left to the user; however, it may be an option in the future to have a "Jet-managed" gRPC bundle which contains the service and can be deployed alongside the Jet nodes.