The Apache Kafka connector supports reading and writing to Apache Kafka topics.
Apache Kafka is schema-less, however SQL assumes a schema. We assume all messages in a topic are of the same type (with some exceptions). Kafka also supports several serialization options, see below.
To work with Kafka, you must specify the
options. Currently, even if you create a table mapping explicitly, we
can't resolve these. These are the supported values for
- any of the supported SQL data
The key and value format can be different. Any options not recognized by Jet are passed directly to the Kafka producer or consumer. See the examples for individual serialization options below.
If the format is one of the primitive type names, then the Java class
representing that type will be stored in the IMap. By primitive type
we mean any supported SQL data
For example, if the topic key is an
Integer and the topic message is a
CREATE MAPPING my_topic TYPE Kafka OPTIONS ( 'keyFormat'='int', 'valueFormat'='varchar', 'bootstrap.servers' = '127.0.0.1:9092' )
Note that you don't have to provide
options, which are otherwise required - in the above case the following
will be automatically added:
'key.serializer' = 'org.apache.kafka.common.serialization.IntegerSerializer', 'key.deserializer' = 'org.apache.kafka.common.serialization.IntegerDeserializer', 'value.serializer' = 'org.apache.kafka.common.serialization.StringSerializer', 'value.deserializer' = 'org.apache.kafka.common.serialization.StringDeserializer'
When using Avro, Jet reads the fields from the
KafkaAvroDeserializer. When inserting to a topic, we create an
ad-hoc Avro schema named
jet.sql from the mapping columns. Because of
this, the column list is required. Jet currently can't use your custom
Avro schema to create objects, but it can use it to read objects written
through our ad-hoc schema, as long as the field names and types match.
Mapping Between SQL and Avro Types
|SQL Type||Avro Type|
and all other types
All Avro types are a union of the
NULL type and the actual type.
CREATE MAPPING my_topic ( __key VARCHAR, ticker VARCHAR, amount BIGINT, price DECIMAL ) TYPE Kafka OPTIONS ( 'keyFormat' = 'varchar', 'valueFormat' = 'avro', 'bootstrap.servers' = '127.0.0.1:9092', 'schema.registry.url' = 'http://127.0.0.1:8081/' /* more Kafka options ... */ )
In this example, the key is a plain
Long number, the value is
valueFormat options are handled by
Jet, the rest is passed directly to Kafka producer or consumer.
We omitted the kafka serialization options, in this case
io.confluent.kafka.serializers.KafkaAvroDeserializer are automatically
The value will be stored as a JSON object. Jet can't automatically determine the column list for this format, you must explicitly specify it:
CREATE MAPPING my_topic( __key BIGINT, ticker VARCHAR, amount INT) TYPE Kafka OPTIONS ( 'keyFormat' = 'bigint', 'valueFormat' = 'json', 'bootstrap.servers' = '127.0.0.1:9092')
There are no additional options for this format.
JSON's type system doesn't match SQL's exactly. For example, JSON
numbers have unlimited precision, but such numbers are typically not
portable. We convert SQL integer and floating-point types into JSON
numbers. We convert the
DECIMAL type, as well as all temporal types,
to JSON strings.
We don't support the
JSON type from the SQL standard yet. That means
you can't use functions like
JSON_QUERY. If your JSON
documents don't all have the same fields or if they contain nested
objects, the usability is limited.
Java serialization is the last-resort serialization option. It uses the
Java objects exactly as
KafkaConsumer.poll() returns them. You can use
it for objects serialized using the Java serialization or any other
For this format you must specify the class name using
valueJavaClass options, for example:
CREATE MAPPING my_topic TYPE Kafka OPTIONS ( 'keyFormat' = 'java', 'keyJavaClass' = 'java.lang.Long', 'valueFormat' = 'java', 'valueJavaClass' = 'com.example.Person', 'value.serializer' = 'com.example.serialization.PersonSerializer', 'value.deserializer' = 'com.example.serialization.PersonDeserializer', 'bootstrap.servers' = '127.0.0.1:9092')
If the Java class corresponds to one of the basic data types (numbers,
dates, strings), that type will directly be used for the key or value
and mapped as a column named
__key for keys and
this for values. In
the example above, the key will be mapped with the
BIGINT type. In
fact, the above
keyJavaClass duo is equivalent to
If the Java class is not one of the basic types, Hazelcast will analyze
the class using reflection and use its properties as column names. It
recognizes public fields and JavaBean-style getters. If some property
has a non-primitive type, it will be mapped under the
External Column Name
You rarely need to specify the columns in DDL. If you do, you might need to specify the external name for the column.
The entries in a map naturally have key and value elements. Because
of this, the format of the external name must be either
for a field in the key or
this.<name> for a field in the value.
The external name defaults to
this.<columnName>, so normally you only
need to specify it for key fields. There are also columns that represent
the entire key and value objects, called
For example, let's say you have these messages in your topic:
If you map the column
petName, it will have the value
null for the
key=1. This scenario is supported. Similar behavior works
with Avro format.
You need the
hazelcast-jet-kafka module on your classpath. For
Gradle or Maven, make sure to add the dependency:
If you're using the distribution package make sure use the fat one,
hazelcast-jet-kafka-4.5.jar you need is not
contained in the slim one.