T - event type

public class EventTimeMapper<T> extends Object

A utility that helps a source emit events according to a given EventTimePolicy. Generally this class should be used if a source needs to emit watermarks. The mapper deals with the following concerns:
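- Wrapping of events: events may need to be wrapped with the extracted timestamp if EventTimePolicy.wrapFn() is set.
- Throttling of watermarks: emitting watermarks more frequently than the given EventTimePolicy.watermarkThrottlingFrameSize() is wasteful since they are broadcast to all processors. The mapper ensures that watermarks are emitted as seldom as possible.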
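
To make the throttling concern concrete, here is a minimal plain-Java sketch. It is a simplified model and not Jet's actual implementation: the class name `WatermarkThrottleSketch`, its `offer` method and the rounding rule (round each candidate watermark down to a multiple of the frame size, emit only when that value grows) are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of watermark throttling; an illustration, not Jet code. */
final class WatermarkThrottleSketch {
    // Plays the role of EventTimePolicy.watermarkThrottlingFrameSize() (assumed semantics).
    private final long frameSize;
    private long lastEmitted = Long.MIN_VALUE;

    WatermarkThrottleSketch(long frameSize) {
        if (frameSize <= 0) {
            throw new IllegalArgumentException("frameSize must be positive");
        }
        this.frameSize = frameSize;
    }

    /** Returns the watermark to emit, or null when nothing new should be emitted. */
    Long offer(long candidateWatermark) {
        // Round down to the start of the frame the candidate falls into.
        long floored = Math.floorDiv(candidateWatermark, frameSize) * frameSize;
        if (floored > lastEmitted) {
            lastEmitted = floored;
            return floored;
        }
        return null;
    }

    /** Feeds all candidates through a fresh throttle and collects what gets emitted. */
    static List<Long> run(long frameSize, long... candidates) {
        WatermarkThrottleSketch throttle = new WatermarkThrottleSketch(frameSize);
        List<Long> emitted = new ArrayList<>();
        for (long candidate : candidates) {
            Long wm = throttle.offer(candidate);
            if (wm != null) {
                emitted.add(wm);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Seven candidate watermarks collapse into four emitted ones.
        System.out.println(run(10, 3, 7, 12, 15, 19, 20, 31)); // [0, 10, 20, 30]
    }
}
```

In this model a larger frame size means fewer watermarks reach downstream processors, at the cost of coarser watermark granularity.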
The API is designed to be used as a flat-mapping step in the Traverser that holds the output data. Your source can follow this pattern:
    public boolean complete() {
        if (traverser == null) {
            List<Record> records = poll();
            if (records.isEmpty()) {
                // nothing was read: let the mapper decide whether a watermark is due
                traverser = eventTimeMapper.flatMapIdle();
            } else {
                // this sketch supplies no native timestamp, hence NO_NATIVE_TIME
                traverser = traverserIterable(records)
                    .flatMap(event -> eventTimeMapper.flatMapEvent(
                        event, event.getPartition(), EventTimeMapper.NO_NATIVE_TIME));
            }
            // clear the field once the traverser is exhausted so the next call polls again
            traverser = traverser.onFirstNull(() -> traverser = null);
        }
        emitFromTraverser(traverser, event -> {
            if (!(event instanceof Watermark)) {
                // store your offset after event was emitted
                offsetsMap.put(event.getPartition(), event.getOffset());
            }
        });
        return false;
    }
Other methods:

- Call increasePartitionCount(int) to set your partition count initially or whenever the count increases.
- If you support state snapshots, save the value returned by getWatermark(int) for all partitions to the snapshot. When restoring the state, call restoreWatermark(int, long).
  Save each value under a key that identifies the external partition, and wrap that key using broadcastKey(), because the external partitions don't match Hazelcast partitions. This way, all processor instances will see all keys, and each can restore the partitions it handles and ignore the others.
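
The save-and-restore flow described above can be modelled in plain Java. This is only a sketch under stated assumptions: `ProcessorModel`, `saveTo` and `restoreFrom` are hypothetical names, the shared `Map` stands in for a snapshot whose keys are broadcast to every processor instance, and no Jet classes are used.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Plain-Java model of snapshotting per-partition watermarks; not Jet code. */
final class WatermarkSnapshotSketch {

    /** Hypothetical stand-in for one processor instance of a source. */
    static final class ProcessorModel {
        final Set<Integer> ownedPartitions = new HashSet<>();
        // external partition index -> last watermark for that partition
        final Map<Integer, Long> watermarks = new TreeMap<>();

        ProcessorModel(int... owned) {
            for (int partition : owned) {
                ownedPartitions.add(partition);
            }
        }

        /** Save: one entry per tracked partition, keyed by the external partition index. */
        void saveTo(Map<Integer, Long> snapshot) {
            snapshot.putAll(watermarks);
        }

        /** Restore: every instance sees every entry and keeps only its own partitions. */
        void restoreFrom(Map<Integer, Long> broadcastSnapshot) {
            for (Map.Entry<Integer, Long> entry : broadcastSnapshot.entrySet()) {
                if (ownedPartitions.contains(entry.getKey())) {
                    watermarks.put(entry.getKey(), entry.getValue());
                }
            }
        }
    }

    public static void main(String[] args) {
        // Before the restart: p0 owns partitions 0 and 1, p1 owns partition 2.
        ProcessorModel p0 = new ProcessorModel(0, 1);
        p0.watermarks.put(0, 100L);
        p0.watermarks.put(1, 200L);
        ProcessorModel p1 = new ProcessorModel(2);
        p1.watermarks.put(2, 300L);

        Map<Integer, Long> snapshot = new HashMap<>();
        p0.saveTo(snapshot);
        p1.saveTo(snapshot);

        // After the restart the assignment differs: q0 owns 0, q1 owns 1 and 2.
        ProcessorModel q0 = new ProcessorModel(0);
        ProcessorModel q1 = new ProcessorModel(1, 2);
        q0.restoreFrom(snapshot);
        q1.restoreFrom(snapshot);

        System.out.println(q0.watermarks); // {0=100}
        System.out.println(q1.watermarks); // {1=200, 2=300}
    }
}
```

Because every instance sees the whole snapshot, the model keeps working when partitions are assigned to different instances after the restart.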

Modifier and Type | Field and Description |
---|---|
static long | NO_NATIVE_TIME: Value to use as the nativeEventTime argument when calling flatMapEvent(Object, int, long) when there's no native event time to supply. |

Constructor and Description |
---|
EventTimeMapper(EventTimePolicy<? super T> eventTimePolicy): The partition count is initially set to 0; call increasePartitionCount(int) to set it. |

Modifier and Type | Method and Description |
---|---|
Traverser<Object> | flatMapEvent(T event, int partitionIndex, long nativeEventTime): Flat-maps the given event by (possibly) prepending it with a watermark. |
Traverser<Object> | flatMapIdle(): Call this method when there is no event coming. |
long | getWatermark(int partitionIndex): Watermark value to be saved to state snapshot for the given source partition index. |
void | increasePartitionCount(int newPartitionCount): Changes the partition count. |
void | restoreWatermark(int partitionIndex, long wm): Restore watermark value from state snapshot. |
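
As an illustration of what "(possibly) prepending it with a watermark" can mean for flatMapEvent, the following plain-Java sketch tracks the highest timestamp seen per source partition and emits a watermark ahead of the event whenever the minimum across partitions advances. It is a loose model built on assumptions, not Jet's algorithm: the class `FlatMapEventSketch`, the zero-lag minimum rule and the string representation of watermarks are all hypothetical, and it ignores idle partitions, throttling and wrapFn().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simplified model of per-partition watermark tracking; an illustration, not Jet code. */
final class FlatMapEventSketch {
    // Highest timestamp seen so far for each source partition.
    private long[] partitionTop = new long[0];
    private long lastWatermark = Long.MIN_VALUE;

    /** Grows the partition array; new partitions start with no timestamp seen. */
    void increasePartitionCount(int newPartitionCount) {
        int oldCount = partitionTop.length;
        if (newPartitionCount < oldCount) {
            throw new IllegalArgumentException("partition count cannot decrease");
        }
        partitionTop = Arrays.copyOf(partitionTop, newPartitionCount);
        Arrays.fill(partitionTop, oldCount, newPartitionCount, Long.MIN_VALUE);
    }

    /** Returns the event, preceded by a watermark when the overall minimum advanced. */
    List<String> flatMapEvent(String event, int partitionIndex, long timestamp) {
        partitionTop[partitionIndex] = Math.max(partitionTop[partitionIndex], timestamp);
        long min = Long.MAX_VALUE;
        for (long top : partitionTop) {
            min = Math.min(min, top);
        }
        List<String> out = new ArrayList<>(2);
        if (min > lastWatermark) {
            lastWatermark = min;
            out.add("Watermark(" + min + ")");
        }
        out.add(event);
        return out;
    }

    public static void main(String[] args) {
        FlatMapEventSketch mapper = new FlatMapEventSketch();
        mapper.increasePartitionCount(2);
        System.out.println(mapper.flatMapEvent("a", 0, 10)); // [a]
        System.out.println(mapper.flatMapEvent("b", 1, 5));  // [Watermark(5), b]
        System.out.println(mapper.flatMapEvent("c", 1, 8));  // [Watermark(8), c]
        System.out.println(mapper.flatMapEvent("d", 0, 12)); // [d]
    }
}
```

In this model the first event produces no watermark because partition 1 has not reported any timestamp yet, so a recent event in one partition cannot push the watermark past older events in another.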

public static final long NO_NATIVE_TIME

Value to use as the nativeEventTime argument when calling flatMapEvent(Object, int, long) when there's no native event time to supply.

public EventTimeMapper(EventTimePolicy<? super T> eventTimePolicy)

The partition count is initially set to 0; call increasePartitionCount(int) to set it.

eventTimePolicy - event time policy as passed in Sources.streamFromProcessorWithWatermarks(java.lang.String, java.util.function.Function<com.hazelcast.jet.core.EventTimePolicy<? super T>, com.hazelcast.jet.core.ProcessorMetaSupplier>, boolean)

@Nonnull public Traverser<Object> flatMapEvent(T event, int partitionIndex, long nativeEventTime)

Flat-maps the given event by (possibly) prepending it with a watermark. Designed for use when emitting from a traverser:

    Traverser<Object> t = traverserIterable(...)
        .flatMap(event -> eventTimeMapper.flatMapEvent(
            event, event.getPartition(), nativeEventTime));

event - the event

partitionIndex - the source partition index the event came from

nativeEventTime - native event time, used in case no timestampFn was supplied, or NO_NATIVE_TIME if the event has no native timestamp

@Nonnull public Traverser<Object> flatMapIdle()

Call this method when there is no event coming.

public void increasePartitionCount(int newPartitionCount)

Changes the partition count. You can call this method at any moment. Added partitions will be considered active initially.

newPartitionCount - partition count, must be higher than the current count

public long getWatermark(int partitionIndex)

Watermark value to be saved to state snapshot for the given source partition index. The returned value should be restored to a processor handling the same partition after restart.

Method is meant to be used from Processor.saveToSnapshot().

partitionIndex - 0-based source partition index.

public void restoreWatermark(int partitionIndex, long wm)

Restore watermark value from state snapshot.

Method is meant to be used from Processor.restoreFromSnapshot(Inbox).

See getWatermark(int).

partitionIndex - 0-based source partition index.

wm - watermark value to restore

Copyright © 2019 Hazelcast, Inc. All rights reserved.