Class GrpcServices
public final class GrpcServices extends Object
ServiceFactory implementations for calling gRPC endpoints. The created ServiceFactory instances are designed to be used with the mapUsingServiceAsync transform.
Currently two types of gRPC services are supported:
- unary (see unaryService(SupplierEx, FunctionEx))
- bidirectional streaming (see bidirectionalStreamingService(SupplierEx, FunctionEx))
- Since:
- 4.1
-
Method Summary
Modifier and Type: static <T, R> ServiceFactory<?,? extends GrpcService<T,R>>
Method: bidirectionalStreamingService(SupplierEx<? extends io.grpc.ManagedChannelBuilder<?>> channelFn, FunctionEx<? super io.grpc.ManagedChannel,? extends FunctionEx<io.grpc.stub.StreamObserver<R>,io.grpc.stub.StreamObserver<T>>> callStubFn)
Description: Creates a ServiceFactory that calls out to a bidirectional streaming gRPC service.

Modifier and Type: static <T, R> ServiceFactory<?,? extends GrpcService<T,R>>
Method: unaryService(SupplierEx<? extends io.grpc.ManagedChannelBuilder<?>> channelFn, FunctionEx<? super io.grpc.ManagedChannel,? extends BiConsumerEx<T,io.grpc.stub.StreamObserver<R>>> callStubFn)
Description: Creates a ServiceFactory that calls out to a unary gRPC service.
-
Method Details
-
unaryService
@Nonnull public static <T, R> ServiceFactory<?,? extends GrpcService<T,R>> unaryService(@Nonnull SupplierEx<? extends io.grpc.ManagedChannelBuilder<?>> channelFn, @Nonnull FunctionEx<? super io.grpc.ManagedChannel,? extends BiConsumerEx<T,io.grpc.stub.StreamObserver<R>>> callStubFn)

Creates a ServiceFactory that calls out to a unary gRPC service.

For example, given the protobuf definition below:

    service Greeter {
      // Sends a greeting
      rpc SayHello (HelloRequest) returns (HelloReply) {}
    }

We can create the following service factory:

    ServiceFactory<?, ? extends GrpcService<HelloRequest, HelloReply>> greeterService = unaryService(
        () -> ManagedChannelBuilder.forAddress("localhost", 5000).usePlaintext(),
        channel -> GreeterGrpc.newStub(channel)::sayHello
    );

where GreeterGrpc is the class auto-generated by the protobuf compiler.

The created ServiceFactory should be used with the mapUsingServiceAsync transform as follows:

    Pipeline p = Pipeline.create();
    p.readFrom(TestSources.items("one", "two", "three", "four"))
     .mapUsingServiceAsync(greeterService, (service, input) -> {
         HelloRequest request = HelloRequest.newBuilder().setName(input).build();
         return service.call(request);
     })
     .writeTo(Sinks.logger());
The remote end can signal an error for a given input item. In that case the CompletableFuture returned from service.call(request) will be completed with that exception. To catch and handle it, use the CompletableFuture API.

- Type Parameters:
- T - type of the request object
- R - type of the response object
- Parameters:
- channelFn - creates the channel builder. A single channel is created per processor instance.
- callStubFn - a function which, given a channel, creates the stub and returns a function that calls the stub given the input item and the observer. It will be called once per input item.
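
The relationship between the callback-style stub call and the CompletableFuture handed back to the pipeline can be illustrated with plain JDK types. The following is a minimal sketch, not the library's implementation: ObserverSketch is a hypothetical local stand-in that mirrors the three callbacks of io.grpc.stub.StreamObserver, fakeSayHello is a hypothetical fake remote end, and UnaryCallSketch.call is an assumed adapter written only for this illustration. It shows how an error signalled through onError can surface as an exceptionally completed future, and how exceptionally can turn it into a fallback value.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

// Hypothetical stand-in mirroring the callbacks of io.grpc.stub.StreamObserver,
// declared locally so the sketch compiles without gRPC on the classpath.
interface ObserverSketch<V> {
    void onNext(V value);
    void onError(Throwable t);
    void onCompleted();
}

final class UnaryCallSketch {

    // Assumed adapter: turns one callback-style unary call into a CompletableFuture.
    static <T, R> CompletableFuture<R> call(BiConsumer<T, ObserverSketch<R>> stubCall, T request) {
        CompletableFuture<R> future = new CompletableFuture<>();
        stubCall.accept(request, new ObserverSketch<R>() {
            @Override public void onNext(R value) { future.complete(value); }
            @Override public void onError(Throwable t) { future.completeExceptionally(t); }
            @Override public void onCompleted() { /* the single reply already arrived via onNext */ }
        });
        return future;
    }

    // Hypothetical fake remote end: rejects empty names, greets everything else.
    static void fakeSayHello(String name, ObserverSketch<String> observer) {
        if (name.isEmpty()) {
            observer.onError(new IllegalArgumentException("name must not be empty"));
        } else {
            observer.onNext("Hello " + name);
            observer.onCompleted();
        }
    }

    // Handles a failed call by mapping the exception to a fallback value.
    static String greetOrFallback(String name) {
        return call(UnaryCallSketch::fakeSayHello, name)
                .exceptionally(t -> "failed: " + t.getMessage())
                .join();
    }

    public static void main(String[] args) {
        System.out.println(greetOrFallback("one"));
        System.out.println(greetOrFallback(""));
    }
}
```

Inside a mapUsingServiceAsync lambda the same idea would apply to the future returned by service.call(request), for example by chaining exceptionally or handle before returning it. Blocking with join() is used here only to keep the sketch short.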
-
bidirectionalStreamingService
@Nonnull public static <T, R> ServiceFactory<?,? extends GrpcService<T,R>> bidirectionalStreamingService(@Nonnull SupplierEx<? extends io.grpc.ManagedChannelBuilder<?>> channelFn, @Nonnull FunctionEx<? super io.grpc.ManagedChannel,? extends FunctionEx<io.grpc.stub.StreamObserver<R>,io.grpc.stub.StreamObserver<T>>> callStubFn)

Creates a ServiceFactory that calls out to a bidirectional streaming gRPC service. This may provide better throughput compared to the unary service (see unaryService(SupplierEx, FunctionEx)) because all communication happens within a single gRPC call, eliminating some overheads.

For example, given the protobuf definition below:

    service Greeter {
      // Sends a greeting
      rpc SayHello (stream HelloRequest) returns (stream HelloReply) {}
    }

We can create the following service factory:

    ServiceFactory<?, ? extends GrpcService<HelloRequest, HelloReply>> greeterService = bidirectionalStreamingService(
        () -> ManagedChannelBuilder.forAddress("localhost", 5000).usePlaintext(),
        channel -> GreeterGrpc.newStub(channel)::sayHello
    );

where GreeterGrpc is the class auto-generated by the protobuf compiler.

The created ServiceFactory should be used with the mapUsingServiceAsync transform as follows:

    Pipeline p = Pipeline.create();
    p.readFrom(TestSources.items("one", "two", "three", "four"))
     .mapUsingServiceAsync(greeterService, (service, input) -> {
         HelloRequest request = HelloRequest.newBuilder().setName(input).build();
         return service.call(request);
     })
     .writeTo(Sinks.logger());
The remote end can signal an error for a given input item. In that case the CompletableFuture returned from service.call(request) will be completed with that exception. To catch and handle it, use the CompletableFuture API.

- Type Parameters:
- T - type of the request object
- R - type of the response object
- Parameters:
- channelFn - creates the channel builder. A single channel is created per processor instance.
- callStubFn - a function which, given a channel, creates the stub and returns a function that calls the stub given the input item and the observer. It will be called once per input item.
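
To make the "single gRPC call" idea concrete, the sketch below shows one way a long-lived request/response stream could be exposed as one CompletableFuture per input item. It is an illustration under stated assumptions, not the library's implementation: StreamSketch is a hypothetical local stand-in mirroring the callbacks of io.grpc.stub.StreamObserver, BidiCallSketch and BidiDemo.fakeSayHello are hypothetical names, the call is assumed to be opened once and reused, and the remote end is assumed to reply to requests in the order they were sent.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Function;

// Hypothetical stand-in mirroring the callbacks of io.grpc.stub.StreamObserver.
interface StreamSketch<V> {
    void onNext(V value);
    void onError(Throwable t);
    void onCompleted();
}

final class BidiCallSketch<T, R> {
    // Futures of requests that were sent but not yet answered, oldest first.
    private final Queue<CompletableFuture<R>> pending = new ConcurrentLinkedQueue<>();
    private final StreamSketch<T> requests;

    // Opens the call once: passes in the response observer, keeps the request observer.
    BidiCallSketch(Function<StreamSketch<R>, StreamSketch<T>> openCall) {
        this.requests = openCall.apply(new StreamSketch<R>() {
            @Override public void onNext(R value) {
                // Assumption: replies arrive in request order, so the oldest future is the match.
                CompletableFuture<R> f = pending.poll();
                if (f != null) {
                    f.complete(value);
                }
            }
            @Override public void onError(Throwable t) {
                // The stream failed: complete every waiting future with the error.
                CompletableFuture<R> f;
                while ((f = pending.poll()) != null) {
                    f.completeExceptionally(t);
                }
            }
            @Override public void onCompleted() { }
        });
    }

    // Sends one request on the shared stream and returns a future for its reply.
    CompletableFuture<R> call(T request) {
        CompletableFuture<R> f = new CompletableFuture<>();
        pending.add(f);
        requests.onNext(request);
        return f;
    }
}

final class BidiDemo {
    // Hypothetical fake remote end: greets each name, signals an error for an empty one.
    static StreamSketch<String> fakeSayHello(StreamSketch<String> responses) {
        return new StreamSketch<String>() {
            @Override public void onNext(String name) {
                if (name.isEmpty()) {
                    responses.onError(new IllegalArgumentException("name must not be empty"));
                } else {
                    responses.onNext("Hello " + name);
                }
            }
            @Override public void onError(Throwable t) { }
            @Override public void onCompleted() { responses.onCompleted(); }
        };
    }

    public static void main(String[] args) {
        BidiCallSketch<String, String> service =
                new BidiCallSketch<String, String>(BidiDemo::fakeSayHello);
        System.out.println(service.call("one").join());
        // handle() sees either the reply or the error, never both.
        System.out.println(service.call("")
                .handle((reply, error) -> error == null ? reply : "failed: " + error.getMessage())
                .join());
    }
}
```

The queue is what ties each reply back to its request in this sketch; a service that can reorder or drop replies would need an explicit correlation id instead.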
-