Class ServiceFactories

java.lang.Object
com.hazelcast.jet.pipeline.ServiceFactories

public final class ServiceFactories
extends Object
Utility class with methods that create several useful service factories.
Since:
3.0
  • Method Details

    • replicatedMapService

      @Nonnull public static <K, V> ServiceFactory<?, ReplicatedMap<K, V>> replicatedMapService(@Nonnull String mapName)
      Returns a factory that provides a ReplicatedMap as the service object. A replicated map is a particularly good choice if you are enriching an event stream with the data stored in the Hazelcast Jet cluster. Unlike in a hashJoin transformation, the data in the map can change while the job is running, so you can keep the enriching dataset up to date. Unlike with an IMap, the data you access is local, so lookups do not involve blocking calls, which is important for performance.

      If you want to destroy the map after the job finishes, call factory.destroyFn(ReplicatedMap::destroy) on the object you get from this method.

      Example usage (without destroyFn):

       p.readFrom( /* a batch or streaming source */ )
        .mapUsingService(replicatedMapService("fooMapName"),
            (map, item) -> tuple2(item, map.get(item.getKey())));
       
      Type Parameters:
      K - type of the map key
      V - type of the map value
      Parameters:
      mapName - name of the ReplicatedMap to use as the service
      Since:
      3.0
    • iMapService

      @Nonnull public static <K, V> ServiceFactory<?, IMap<K, V>> iMapService(@Nonnull String mapName)
      Returns a factory that provides an IMap as the service. This is useful if you are enriching an event stream with the data stored in the Hazelcast Jet cluster. Unlike in a hashJoin transformation, the data in the map can change while the job is running, so you can keep the enriching dataset up to date.

      Instead of using this factory, you can call GeneralStage.mapUsingIMap(IMap, FunctionEx, BiFunctionEx) or GeneralStageWithKey.mapUsingIMap(IMap, BiFunctionEx).

      If you plan to use a synchronous (blocking) method on the map, call ServiceFactory.toNonCooperative() on the returned factory.

      Type Parameters:
      K - key type
      V - value type
      Parameters:
      mapName - name of the map used as service
      Returns:
      the service factory
      Since:
      3.0
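
      The point about the enriching dataset changing while the job runs can be illustrated with a minimal plain-Java sketch. It does not use any Hazelcast API: a ConcurrentHashMap stands in for the IMap, and the class name EnrichmentSketch, the enrich helper, and the keys and values are all hypothetical, chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Plain-Java model of stream enrichment against a mutable lookup map.
// ConcurrentHashMap is only a stand-in for the IMap; no Hazelcast API is used.
public class EnrichmentSketch {

    // Pairs an event key with whatever the lookup map holds for it right now.
    static String enrich(Map<String, String> lookup, String eventKey) {
        return eventKey + "=" + lookup.get(eventKey);
    }

    static List<String> run() {
        Map<String, String> lookup = new ConcurrentHashMap<>();
        lookup.put("user-1", "bronze");

        List<String> out = new ArrayList<>();
        out.add(enrich(lookup, "user-1"));   // sees the initial value

        // The enriching dataset is updated while the "job" is still running.
        lookup.put("user-1", "gold");
        out.add(enrich(lookup, "user-1"));   // sees the updated value
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

      Each lookup reads the map at the moment the event is processed, so later events pick up the updated value. A hash join, by contrast, works against a dataset that is fixed for the duration of the job.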
    • sharedService

      @Nonnull public static <S> ServiceFactory<?, S> sharedService(@Nonnull FunctionEx<? super ProcessorSupplier.Context, S> createServiceFn)
      A variant of sharedService(createFn, destroyFn) with a no-op destroyFn.

      Note: if your service has a blocking API (e.g., it performs synchronous IO or acquires locks), you must call ServiceFactory.toNonCooperative() as a hint to the Jet execution engine to start a dedicated thread for those calls. Failing to do this can cause severe performance problems. You should also carefully consider how much local parallelism you need for this step, since each parallel tasklet needs its own thread. Call stage.setLocalParallelism() to set an explicit level; otherwise it will depend on the number of cores on the Jet machine, which is not a meaningful basis for blocking code.

      Since:
      4.0
    • sharedService

      public static <S> ServiceFactory<?, S> sharedService(@Nonnull FunctionEx<? super ProcessorSupplier.Context, S> createServiceFn, @Nonnull ConsumerEx<S> destroyServiceFn)
      Returns a ServiceFactory which will provide a single shared service object per cluster member. All parallel processors serving the associated pipeline stage will use the same object. Since the service object will be accessed from many parallel threads, it must be thread-safe.

      Note: if your service has a blocking API (e.g., it performs synchronous IO or acquires locks), you must call ServiceFactory.toNonCooperative() as a hint to the Jet execution engine to start a dedicated thread for those calls. Failing to do this can cause severe performance problems. You should also carefully consider how much local parallelism you need for this step, since each parallel tasklet needs its own thread. Call stage.setLocalParallelism() to set an explicit level; otherwise it will depend on the number of cores on the Jet machine, which is not a meaningful basis for blocking code.

      Type Parameters:
      S - type of the service object
      Parameters:
      createServiceFn - the function that creates the service. It will be called once on each Jet member. It must be stateless.
      destroyServiceFn - the function that destroys the service. It will be called once on each Jet member. It can be used to tear down any resources acquired by the service. It must be stateless.
      See Also:
      nonSharedService(FunctionEx, ConsumerEx)
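
      The thread-safety requirement for a shared service can be illustrated with a minimal plain-Java sketch. It is a model of the sharing pattern only, not Jet code: a thread pool stands in for the parallel processors on one member, and the names SharedServiceSketch and CallCounter are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Plain-Java model: one service object, created once, used by many threads.
public class SharedServiceSketch {

    // Hypothetical service. It is thread-safe because its only state is an AtomicLong.
    static final class CallCounter {
        private final AtomicLong calls = new AtomicLong();
        void record() { calls.incrementAndGet(); }
        long total() { return calls.get(); }
    }

    // Returns the number of calls the single shared instance has recorded.
    static long run(int workers, int callsPerWorker) {
        Supplier<CallCounter> createServiceFn = CallCounter::new;

        // Created once for the whole "member", then handed to every worker.
        CallCounter shared = createServiceFn.get();

        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int w = 0; w < workers; w++) {
            pool.execute(() -> {
                for (int i = 0; i < callsPerWorker; i++) {
                    shared.record();
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return shared.total();
    }

    public static void main(String[] args) {
        System.out.println(run(4, 10_000)); // prints 40000
    }
}
```

      Because every worker updates the same object, the total is correct only if the service guards its state. Replacing the AtomicLong with a plain long field would allow lost updates under contention.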
    • nonSharedService

      @Nonnull public static <S> ServiceFactory<?, S> nonSharedService(@Nonnull FunctionEx<? super Processor.Context, ? extends S> createServiceFn)
      A variant of nonSharedService(createFn, destroyFn) with a no-op destroyFn.

      Note: if your service has a blocking API (e.g., it performs synchronous IO or acquires locks), you must call ServiceFactory.toNonCooperative() as a hint to the Jet execution engine to start a dedicated thread for those calls. Failing to do this can cause severe performance problems. You should also carefully consider how much local parallelism you need for this step, since each parallel tasklet needs its own thread. Call stage.setLocalParallelism() to set an explicit level; otherwise it will depend on the number of cores on the Jet machine, which is not a meaningful basis for blocking code.

      Since:
      4.0
    • nonSharedService

      public static <S> ServiceFactory<?, S> nonSharedService(@Nonnull FunctionEx<? super Processor.Context, ? extends S> createServiceFn, @Nonnull ConsumerEx<? super S> destroyServiceFn)
      Returns a ServiceFactory which creates a separate service instance for each parallel Jet processor. The number of processors on each cluster member is dictated by stage.localParallelism. Use this when the service instance should not be shared across multiple threads.

      Note: if your service has a blocking API (e.g., it performs synchronous IO or acquires locks), you must call ServiceFactory.toNonCooperative() as a hint to the Jet execution engine to start a dedicated thread for those calls. Failing to do this can cause severe performance problems. You should also carefully consider how much local parallelism you need for this step, since each parallel tasklet needs its own thread. Call stage.setLocalParallelism() to set an explicit level; otherwise it will depend on the number of cores on the Jet machine, which is not a meaningful basis for blocking code.

      Type Parameters:
      S - type of the service object
      Parameters:
      createServiceFn - the function that creates the service. It will be called once per processor instance. It must be stateless.
      destroyServiceFn - the function that destroys the service. It will be called once per processor instance. It can be used to tear down any resources acquired by the service. It must be stateless.
      See Also:
      sharedService(FunctionEx, ConsumerEx)
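
      The per-processor lifecycle of a non-shared service can be illustrated with a minimal plain-Java sketch. It is a model of the pattern only, not Jet code: plain threads stand in for the parallel processors, and the names NonSharedServiceSketch and Buffer are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Plain-Java model: every worker gets its own service instance,
// and the destroy function runs once per instance.
public class NonSharedServiceSketch {

    // Hypothetical service that is NOT thread-safe: plain fields, no locking.
    static final class Buffer {
        final List<String> items = new ArrayList<>();
        boolean closed;
        void add(String s) { items.add(s); }
        void close() { closed = true; }
    }

    // Returns {instances created, items collected in total, instances destroyed}.
    static int[] run(int processors, int itemsPerProcessor) {
        Supplier<Buffer> createServiceFn = Buffer::new;
        Consumer<Buffer> destroyServiceFn = Buffer::close;

        List<Buffer> services = new ArrayList<>();
        List<Thread> threads = new ArrayList<>();
        for (int p = 0; p < processors; p++) {
            Buffer own = createServiceFn.get();   // one instance per worker
            services.add(own);
            int id = p;
            Thread t = new Thread(() -> {
                try {
                    for (int i = 0; i < itemsPerProcessor; i++) {
                        own.add("p" + id + "-" + i);   // touched by this thread only
                    }
                } finally {
                    destroyServiceFn.accept(own);      // once per instance
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }

        int items = 0;
        int destroyed = 0;
        for (Buffer b : services) {
            items += b.items.size();
            if (b.closed) {
                destroyed++;
            }
        }
        return new int[] {services.size(), items, destroyed};
    }

    public static void main(String[] args) {
        int[] r = run(3, 5);
        System.out.println(r[0] + " instances, " + r[1] + " items, " + r[2] + " destroyed");
    }
}
```

      Since no instance is ever touched by more than one worker thread, the service needs no synchronization. The cost is one instance, and one create and destroy call, per worker.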