Serialization
To be able to send object state over a network or store it in a file one has to first serialize it into raw bytes. Similarly, to be able to fetch an object state over a wire or read it from a persistent storage one has to deserialize it from raw bytes first. As Hazelcast Jet is a distributed system by nature serialization is integral part of it. Understanding, when it is involved, how does it support the pipelines and knowing differences between supported strategies is crucial to efficient usage of Hazelcast Jet.
Serialization of Pipelines
A typical Jet pipeline involves lambda expressions. Since the whole
pipeline definition must be serialized to be sent to the cluster, the
lambda expressions must be serializable as well. The Java standard
provides an essential building block: if the static type of the lambda
is a subtype of Serializable
you will automatically get a lambda
instance that can serialize itself.
None of the functional interfaces in the JDK extend Serializable
so
we had to mirror the entire java.util.function
package in our own
com.hazelcast.function
with all the interfaces subtyped and made
Serializable
. Each subtype has the name of the original with Ex
appended. For example, a FunctionEx
is just like Function
but
implements Serializable
. We use these types everywhere in the
Pipeline API.
As always with this kind of magic, auto-serializability of lambdas has its flipside: it is easy to overlook what’s going on.
If the lambda references a variable in the outer scope, the variable is
captured and must also be serializable. If it references an instance
variable of the enclosing class, it implicitly captures this so the
entire class will be serialized. For example, this will fail because
JetJob1
does not implement Serializable
:
class JetJob1 {
private String instanceVar;
Pipeline buildPipeline() {
Pipeline p = Pipeline.create();
p.readFrom(Sources.list("input"))
// Refers to `instanceVar`, capturing `this`, but `JetJob1` is not
// `Serializable` so this call will fail.
.filter(item -> item.equals(instanceVar));
return p;
}
}
Just implementing Serializable
for JetJob1
would be a viable
workaround here. However, consider something just a bit different:
class JetJob2 implements Serializable {
private String instanceVar;
// A non-serializable field.
private OutputStream fileOut;
Pipeline buildPipeline() {
Pipeline p = Pipeline.create();
p.readFrom(Sources.list("input"))
// Refers to `instanceVar`, capturing `this`. `JetJob2` is declared
// as `Serializable`, but has a non-serializable field and this fails.
.filter(item -> item.equals(instanceVar));
return p;
}
}
Even though we never refer to fileOut
, we are still capturing the
entire JetJob2
instance. We might mark fileOut
as transient, but
the sane approach is to avoid referring to instance variables of the
surrounding class. We can simply achieve this by assigning to a local
variable, then referring to that variable inside the lambda:
class JetJob3 {
private String instanceVar;
Pipeline buildPipeline() {
Pipeline p = Pipeline.create();
// Declare a local variable that loads the value of the instance field.
String findMe = instanceVar;
p.readFrom(Sources.list("input"))
// By referring to the local variable `findMe` we avoid
// capturing `this` and the job runs fine.
.filter(item -> item.equals(findMe));
return p;
}
}
Another common pitfall is capturing an instance of DateTimeFormatter
or a similar non-serializable class:
DateTimeFormatter formatter = DateTimeFormatter
.ofPattern("HH:mm:ss.SSS")
.withZone(ZoneId.systemDefault());
Pipeline p = Pipeline.create();
BatchStage<Long> src = p.readFrom(Sources.list("input"));
// Captures the non-serializable formatter, so this fails.
src.map((Long tstamp) -> formatter.format(Instant.ofEpochMilli(tstamp)));
Sometimes we can get away by using one of the preconfigured formatters available in the JDK:
// Accesses the static final field `ISO_LOCAL_TIME`. Static fields are
// not subject to lambda capture, they are dereferenced when the code
// runs on the target machine.
src.map((Long tstamp) -> DateTimeFormatter.ISO_LOCAL_TIME
.format(Instant.ofEpochMilli(tstamp).atZone(ZoneId.systemDefault())));
This refers to a static final
field in the JDK, so the instance is
available on any JVM. If this is not available, you may create a
static final
field in your own class, but you can also use
mapUsingService()
. In this case you provide a serializable factory
that Jet will ask to create an object on the target member. The object
it returns does not have to be serializable. Here’s an example of that:
Pipeline p = Pipeline.create();
BatchStage<Long> src = p.readFrom(Sources.list("input"));
ServiceFactory<?, DateTimeFormatter> serviceFactory = nonSharedService(
pctx -> DateTimeFormatter.ofPattern("HH:mm:ss.SSS")
.withZone(ZoneId.systemDefault()));
src.mapUsingService(serviceFactory,
(formatter, tstamp) -> formatter.format(Instant.ofEpochMilli(tstamp)));
Serialization of Data Types
Hazelcast Jet closely integrates with Hazelcast IMDG exposing many of
its features to Jet users. In particular, one can use IMDG data
structure as Jet Source
and/or Sink
. Objects retrieved from and
stored in those have to be serializable.
Another case which might require serializable objects is sending computation results between nodes such as in a grouping operation. Hazelcast Jet tries to minimize network traffic as much as possible, nonetheless different parts of a DAG can reside on separate cluster members. To catch serialization issues early on, we recommend using a 2-member local Jet cluster for development and testing.
Currently, Hazelcast Jet supports 4 interfaces to serialize custom types:
- java.io.Serializable
- java.io.Externalizable
- com.hazelcast.nio.serialization.Portable
- com.hazelcast.nio.serialization.StreamSerializer & com.hazelcast.nio.serialization.ByteArraySerializer
The following table provides a comparison between them to help you in deciding which interface to use in your applications.
Serialization interface | Advantages | Drawbacks |
---|---|---|
Serializable | Easy to start with, requires no implementation or registration | CPU intensive and space inefficient |
Externalizable | Faster and more space efficient than Serializable, but no registration required | CPU intensive, space inefficient and requires implementation |
Portable | Faster and more space efficient than java standard interfaces. Supports versioningSupports partial deserialization | Requires implementation and factory registration during cluster setup |
StreamSerializer | The fastest and lightest out of supported interfaces | Requires implementation and registration during cluster setup |
Below you can find rough performance numbers one can expect when employing each of those strategies. A straightforward benchmark which continuously serializes and then deserializes very simple object:
class Person {
private String firstName;
private String lastName;
private int age;
private float height;
}
counting the total throughput, yields following results:
# Processor: Intel(R) Core(TM) i7-4700HQ CPU @ 2.40GHz
# VM version: JDK 13, OpenJDK 64-Bit Server VM, 13+33
Benchmark Mode Cnt Score Error Units
SerializationBenchmark.serializable thrpt 3 0.259 ± 0.087 ops/us
SerializationBenchmark.externalizable thrpt 3 0.846 ± 0.057 ops/us
SerializationBenchmark.portable thrpt 3 1.171 ± 0.539 ops/us
SerializationBenchmark.stream thrpt 3 4.828 ± 1.227 ops/us
The very same object instantiated with sample data will also be encoded with different number of bytes depending on used strategy:
Strategy Number of Bytes Overhead %
java.io.Serializable 162 523
java.io.Externalizable 87 234
com.hazelcast.nio.serialization.Portable 104 300
com.hazelcast.nio.serialization.StreamSerializer 26 0
You can see that using plain Serializable
can easily become a
bottleneck in your application, as even with this simple data type it's
more than an order of magnitude slower than other serialization options,
not to mention very wasteful of memory usage.
Writing Custom Serializers
For best performance and simplest implementation we recommend using com.hazelcast.nio.serialization.StreamSerializer or com.hazelcast.nio.serialization.ByteArraySerializer.
Below you can find a sample implementation of StreamSerializer
for
Person
(mind the type id which should be unique across all serializers):
class PersonSerializer implements StreamSerializer<Person> {
private static final int TYPE_ID = 1;
@Override
public int getTypeId() {
return TYPE_ID;
}
@Override
public void write(ObjectDataOutput out, Person person) throws IOException {
out.writeUTF(person.firstName);
out.writeUTF(person.lastName);
out.writeInt(person.age);
out.writeFloat(person.height);
}
@Override
public Person read(ObjectDataInput in) throws IOException {
return new Person(in.readUTF(), in.readUTF(), in.readInt(), in.readFloat());
}
}
Then the serializer should be registered with Jet up front on cluster
startup. The best way to do is to create a SerializerHook
which can
automatically be registered on startup:
class PersonSerializerHook implements SerializerHook<Value> {
@Override
public Class<Person> getSerializationType() {
return Person.class;
}
@Override
public Serializer createSerializer() {
return new PersonSerializer();
}
@Override
public boolean isOverwritable() {
return true;
}
}
You'll also need to add the file
META-INF/services/com.hazelcast.SerializerHook
with the following
content:
com.hazelcast.jet.examples.PersonSerializerHook
Alternatively, you can add the following configuration to hazelcast.yaml
:
hazelcast:
serialization:
serializers:
serializer:
"type-class": "com.hazelcast.jet.examples.Person"
"class-name": "com.hazelcast.jet.examples.PersonSerializer"
All the classes - data types, serializers & hooks - should be present
on the server classpath, ideally in server's lib
directory packaged as
a jar file.