Class KinesisSinks

java.lang.Object
com.hazelcast.jet.kinesis.KinesisSinks

public final class KinesisSinks
extends Object
Contains factory methods for creating Amazon Kinesis Data Streams (KDS) sinks.
Since:
4.4
  • Nested Class Summary

    Nested Classes
    Modifier and Type Class Description
    static class  KinesisSinks.Builder<T>
    Fluent builder for constructing the Kinesis sink and setting its configuration parameters.
  • Field Summary

    Fields
    Modifier and Type Field Description
    static String BATCH_SIZE_METRIC
    One of the metrics exposed by the sink used to monitor the current sending batch size.
    static int MAX_RECORD_SIZE
    The length of a record's data blob (byte array length), plus the record's key size (number of Unicode characters in the key), must not be larger than 1 MiB.
    static int MAXIMUM_KEY_LENGTH
    Kinesis partition keys are Unicode strings, with a maximum length limit of 256 characters for each key.
    static String THROTTLING_SLEEP_METRIC
    One of the metrics exposed by the sink used to monitor the current sleep delay between consecutive sends (in milliseconds).
  • Method Summary

    Modifier and Type Method Description
    static KinesisSinks.Builder<Map.Entry<String,​byte[]>> kinesis​(String stream)
    Convenience method for a specific type of sink, one that ingests items of type Map.Entry<String, byte[]> and assumes that the entry's key is the partition key and the entry's value is the record data blob.
    static <T> KinesisSinks.Builder<T> kinesis​(String stream, FunctionEx<T,​String> keyFn, FunctionEx<T,​byte[]> valueFn)
    Initiates the building of a sink that publishes messages into Amazon Kinesis Data Streams (KDS).

    Methods inherited from class java.lang.Object

    clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
  • Field Details

    • MAXIMUM_KEY_LENGTH

      public static final int MAXIMUM_KEY_LENGTH
      Kinesis partition keys are Unicode strings, with a maximum length limit of 256 characters for each key.
      See Also:
      Constant Field Values
    • MAX_RECORD_SIZE

      public static final int MAX_RECORD_SIZE
      The length of a record's data blob (byte array length), plus the record's key size (number of Unicode characters in the key), must not be larger than 1 MiB.
      See Also:
      Constant Field Values
    • BATCH_SIZE_METRIC

      public static final String BATCH_SIZE_METRIC
      One of the metrics exposed by the sink used to monitor the current sending batch size. The batch size is computed based on the number of shards in the stream: the more shards, the bigger the batch size and the more data the stream can ingest. The maximum value is 500, the limit imposed by Kinesis.
      See Also:
      Constant Field Values
    • THROTTLING_SLEEP_METRIC

      public static final String THROTTLING_SLEEP_METRIC
      One of the metrics exposed by the sink used to monitor the current sleep delay between consecutive sends (in milliseconds). When the flow control mechanism is deactivated, the value should always be zero. If flow control kicks in, the value keeps increasing until shard ingestion rates are no longer tripped, then stabilizes, then slowly decreases back to zero (if the data rate was just a spike and is not sustained).
      See Also:
      Constant Field Values
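      A minimal sketch of reading both metrics from a running job, assuming a Job reference to an already submitted pipeline that writes to the Kinesis sink (the name-based lookup via JobMetrics.get is used here for simplicity; metric tags are ignored):

      import java.util.List;

      import com.hazelcast.jet.Job;
      import com.hazelcast.jet.core.metrics.JobMetrics;
      import com.hazelcast.jet.core.metrics.Measurement;
      import com.hazelcast.jet.kinesis.KinesisSinks;

      public class SinkMetricsProbe {

          // `job` is assumed to be an already running job whose pipeline contains the Kinesis sink
          static void printSinkMetrics(Job job) {
              JobMetrics metrics = job.getMetrics();

              // one Measurement per sink processor instance
              List<Measurement> batchSizes = metrics.get(KinesisSinks.BATCH_SIZE_METRIC);
              List<Measurement> sleeps = metrics.get(KinesisSinks.THROTTLING_SLEEP_METRIC);

              batchSizes.forEach(m -> System.out.println("send batch size: " + m.value()));
              sleeps.forEach(m -> System.out.println("throttling sleep (ms): " + m.value()));
          }
      }

      A non-zero and growing throttling sleep value indicates that flow control is actively compensating for tripped shard ingestion rates.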
  • Method Details

    • kinesis

      @Nonnull public static <T> KinesisSinks.Builder<T> kinesis​(@Nonnull String stream, @Nonnull FunctionEx<T,​String> keyFn, @Nonnull FunctionEx<T,​byte[]> valueFn)
      Initiates the building of a sink that publishes messages into Amazon Kinesis Data Streams (KDS).

      The basic unit of data stored in KDS is the record. A record is composed of a sequence number, partition key, and data blob. KDS assigns the sequence numbers on ingestion; the other two have to be specified by the sink.

      The provided key function specifies how to assign partition keys to the items to be published. Partition keys have the role of grouping related records so that Kinesis can handle them accordingly. They are Unicode strings with a maximum length of 256 characters.

      The provided value function specifies how to extract the useful data content from the items to be published. It is essentially the serialization of the messages (Kinesis handles neither serialization nor deserialization internally). The length of the resulting byte array, plus the length of the partition key, must not exceed 1 MiB.

      The sink is distributed. Each instance handles a subset of the possible partition keys. In Jet terms, the sink processors' input edges are distributed and partitioned by the same key function used for computing the partition keys. As a result, all items with the same partition key end up in the same distributed sink instance.

      The sink can be used in both streaming and batching pipelines.

      The only processing guarantee the sink can support is at-least-once. This is caused by the lack of transaction support in Kinesis (data can't be written into it with transactional guarantees) and by the AWS SDK occasionally causing data duplication on its own (see "Producer Retries" in the Kinesis documentation).

      The sink preserves the ordering of items for the same partition key, as long as the stream's ingestion rate is not tripped. Ingestion rates in Kinesis are specified on a per-shard basis; the shard is the base throughput unit. One shard provides a capacity of 1 MiB/sec data input and 2 MiB/sec data output, and supports up to 1,000 record publications per second. The owner of the Kinesis stream specifies the number of shards needed when creating the stream; the count can be changed later without stopping the data flow.

      If the sink instances attempt to write more into a shard than allowed, some records will be rejected. This rejection breaks the ordering because the sinks write data in batches, and the shards don't reject entire batches, only random records from them. What's rejected can be (and is) retried, but the batch's original ordering can't be preserved.

      The sink cannot avoid rejections entirely because multiple instances of it write into the same shard, and coordinating an aggregate rate among them is not currently possible in Jet.

      The sink has a flow-control mechanism which, once ingestion-rate tripping starts happening, tries to minimize it by reducing the send batch size and introducing adaptive sleep delays between consecutive sends. However, the only sure way to avoid the problem is to have enough shards and a good spread of partition keys (partition keys are assigned to shards based on an MD5 hash function; each shard owns a partition of the hash space).

      The sink exposes metrics that can be used to monitor the flow control mechanism. One of them is the send batch size; the other one is the sleep delay between consecutive sends.

      Parameters:
      stream - name of the Kinesis stream being written into
      keyFn - function for computing partition keys
      valueFn - function for computing serialized message data content
      Returns:
      fluent builder that can be used to set properties and also to construct the sink once configuration is done
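      A minimal usage sketch of this builder, assuming a Kinesis stream named "vegetables" already exists and AWS credentials and region are resolvable from the environment (the stream name, the test items, and the key/value functions are placeholders):

      import java.nio.charset.StandardCharsets;

      import com.hazelcast.jet.Jet;
      import com.hazelcast.jet.kinesis.KinesisSinks;
      import com.hazelcast.jet.pipeline.Pipeline;
      import com.hazelcast.jet.pipeline.test.TestSources;

      public class KinesisSinkExample {
          public static void main(String[] args) {
              Pipeline p = Pipeline.create();
              p.readFrom(TestSources.items("tomato", "potato", "carrot"))
               .writeTo(KinesisSinks.<String>kinesis(
                       "vegetables",                                   // hypothetical, pre-existing stream
                       item -> item,                                   // keyFn: the item itself is the partition key
                       item -> item.getBytes(StandardCharsets.UTF_8))  // valueFn: serialize the item to the data blob
                       .build());

              Jet.bootstrappedInstance().newJob(p).join();
          }
      }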
    • kinesis

      @Nonnull public static KinesisSinks.Builder<Map.Entry<String,​byte[]>> kinesis​(@Nonnull String stream)
      Convenience method for a specific type of sink, one that ingests items of type Map.Entry<String, byte[]> and assumes that the entry's key is the partition key and the entry's value is the record data blob. No explicit key or value function needs to be specified.
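      A minimal sketch of this entry-based variant, under the same assumptions as the previous example (the stream name and the items are placeholders); Util.entry is used here only to construct the Map.Entry pairs:

      import java.nio.charset.StandardCharsets;

      import com.hazelcast.jet.Jet;
      import com.hazelcast.jet.Util;
      import com.hazelcast.jet.kinesis.KinesisSinks;
      import com.hazelcast.jet.pipeline.Pipeline;
      import com.hazelcast.jet.pipeline.test.TestSources;

      public class KinesisEntrySinkExample {
          public static void main(String[] args) {
              Pipeline p = Pipeline.create();
              p.readFrom(TestSources.items("tomato", "potato", "carrot"))
               // key = partition key, value = record data blob
               .map(item -> Util.entry(item, item.getBytes(StandardCharsets.UTF_8)))
               .writeTo(KinesisSinks.kinesis("vegetables").build()); // hypothetical, pre-existing stream

              Jet.bootstrappedInstance().newJob(p).join();
          }
      }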