Class Sinks
public final class Sinks extends Object
A pipeline stage that ends in a sink has the type SinkStage and accepts no downstream stages.
The default local parallelism for the sinks in this class is typically 1; check the documentation of the individual methods.
- Since:
- 3.0
-
Method Summary
static <T extends Map.Entry> Sink<T> cache(String cacheName)
    Returns a sink that puts the Map.Entrys it receives into a Hazelcast ICache with the specified name.

static <T> Sink<T> files(String directoryName)
    Convenience for filesBuilder(java.lang.String) with the UTF-8 charset and with overwriting of existing files.

static <T> FileSinkBuilder<T> filesBuilder(String directoryName)
    Returns a builder object that offers a step-by-step fluent API to build a custom file sink for the Pipeline API.

static <T> Sink<T> fromProcessor(String sinkName, ProcessorMetaSupplier metaSupplier)
    Returns a sink constructed directly from the given Core API processor meta-supplier.

static <T> Sink<T> jdbc(String updateQuery, SupplierEx<Connection> newConnectionFn, BiConsumerEx<PreparedStatement,T> bindFn)
    Returns a sink that connects to the specified database using the given newConnectionFn, prepares a statement using the given updateQuery, and inserts/updates the items.

static <T> Sink<T> jdbc(String updateQuery, String connectionUrl, BiConsumerEx<PreparedStatement,T> bindFn)
    Convenience for jdbc(String, SupplierEx, BiConsumerEx).

static <T> Sink<T> jmsQueue(SupplierEx<javax.jms.ConnectionFactory> factorySupplier, String name)
    Convenience for jmsQueueBuilder(SupplierEx).

static <T> JmsSinkBuilder<T> jmsQueueBuilder(SupplierEx<javax.jms.ConnectionFactory> factorySupplier)
    Returns a builder object that offers a step-by-step fluent API to build a custom JMS queue sink for the Pipeline API.

static <T> Sink<T> jmsTopic(SupplierEx<javax.jms.ConnectionFactory> factorySupplier, String name)
    Convenience for jmsTopicBuilder(SupplierEx).

static <T> JmsSinkBuilder<T> jmsTopicBuilder(SupplierEx<javax.jms.ConnectionFactory> factorySupplier)
    Returns a builder object that offers a step-by-step fluent API to build a custom JMS topic sink for the Pipeline API.

static <T> Sink<T> list(IList<? super T> list)
    Returns a sink that adds the items it receives to the specified Hazelcast IList.

static <T> Sink<T> list(String listName)
    Returns a sink that adds the items it receives to a Hazelcast IList with the specified name.

static <T> Sink<T> logger()

static <T> Sink<T> logger(FunctionEx<? super T,String> toStringFn)
    Returns a sink that logs all the data items it receives, at the INFO level to the log category WriteLoggerP.

static <K, V> Sink<Map.Entry<K,V>> map(IMap<? super K,? super V> map)
    Returns a sink that puts the Map.Entrys it receives into the given Hazelcast IMap.

static <K, V> Sink<Map.Entry<K,V>> map(String mapName)
    Returns a sink that puts the Map.Entrys it receives into a Hazelcast IMap with the specified name.

static <T, K, V, R> Sink<T> mapWithEntryProcessor(IMap<? super K,? super V> map, FunctionEx<? super T,? extends K> toKeyFn, FunctionEx<? super T,? extends EntryProcessor<K,V,R>> toEntryProcessorFn)
    Returns a sink that uses the items it receives to create EntryProcessors it submits to a Hazelcast IMap with the specified name.

static <E, K, V, R> Sink<E> mapWithEntryProcessor(String mapName, FunctionEx<? super E,? extends K> toKeyFn, FunctionEx<? super E,? extends EntryProcessor<K,V,R>> toEntryProcessorFn)
    Returns a sink that uses the items it receives to create EntryProcessors it submits to a Hazelcast IMap with the specified name.

static <T, K, V> Sink<T> mapWithMerging(IMap<? super K,? super V> map, FunctionEx<? super T,? extends K> toKeyFn, FunctionEx<? super T,? extends V> toValueFn, BinaryOperatorEx<V> mergeFn)
    Returns a sink that uses the supplied functions to extract the key and value with which to update a Hazelcast IMap.

static <K, V> Sink<Map.Entry<K,V>> mapWithMerging(IMap<? super K,V> map, BinaryOperatorEx<V> mergeFn)
    Convenience for mapWithMerging(IMap, FunctionEx, FunctionEx, BinaryOperatorEx) with Map.Entry as the input item.

static <K, V> Sink<Map.Entry<K,V>> mapWithMerging(String mapName, BinaryOperatorEx<V> mergeFn)
    Convenience for mapWithMerging(String, FunctionEx, FunctionEx, BinaryOperatorEx) with Map.Entry as the input item.

static <T, K, V> Sink<T> mapWithMerging(String mapName, FunctionEx<? super T,? extends K> toKeyFn, FunctionEx<? super T,? extends V> toValueFn, BinaryOperatorEx<V> mergeFn)
    Returns a sink that uses the supplied functions to extract the key and value with which to update a Hazelcast IMap.

static <K, V, E extends Map.Entry<K, V>> Sink<E> mapWithUpdating(IMap<? super K,? super V> map, BiFunctionEx<? super V,? super E,? extends V> updateFn)
    Convenience for mapWithUpdating(IMap, FunctionEx, BiFunctionEx) with Map.Entry as the input item.

static <T, K, V> Sink<T> mapWithUpdating(IMap<? super K,? super V> map, FunctionEx<? super T,? extends K> toKeyFn, BiFunctionEx<? super V,? super T,? extends V> updateFn)
    Returns a sink that uses the supplied key-extracting and value-updating functions to update a Hazelcast IMap.

static <K, V, E extends Map.Entry<K, V>> Sink<E> mapWithUpdating(String mapName, BiFunctionEx<? super V,? super E,? extends V> updateFn)
    Convenience for mapWithUpdating(String, FunctionEx, BiFunctionEx) with Map.Entry as the input item.

static <T, K, V> Sink<T> mapWithUpdating(String mapName, FunctionEx<? super T,? extends K> toKeyFn, BiFunctionEx<? super V,? super T,? extends V> updateFn)
    Returns a sink that uses the supplied key-extracting and value-updating functions to update a Hazelcast IMap.

static <T> Sink<T> noop()
    Returns a sink which discards all received items.

static <T> Sink<T> observable(Observable<? super T> observable)
    Returns a sink that publishes to the provided Observable.

static <T> Sink<T> observable(String name)
    Returns a sink that publishes to the Observable with the provided name.

static <T> Sink<T> reliableTopic(ITopic<Object> reliableTopic)
    Returns a sink which publishes the items it receives to the provided distributed reliable topic.

static <T> Sink<T> reliableTopic(String reliableTopicName)
    Returns a sink which publishes the items it receives to a distributed reliable topic with the specified name.

static <T extends Map.Entry> Sink<T> remoteCache(String cacheName, ClientConfig clientConfig)
    Returns a sink that puts the Map.Entrys it receives into a Hazelcast ICache with the specified name in a remote cluster identified by the supplied ClientConfig.

static <T> Sink<T> remoteList(String listName, ClientConfig clientConfig)
    Returns a sink that adds the items it receives to a Hazelcast IList with the specified name in a remote cluster identified by the supplied ClientConfig.

static <K, V> Sink<Map.Entry<K,V>> remoteMap(String mapName, ClientConfig clientConfig)
    Returns a sink that puts the Map.Entrys it receives into a Hazelcast IMap with the specified name in a remote cluster identified by the supplied ClientConfig.

static <E, K, V, R> Sink<E> remoteMapWithEntryProcessor(String mapName, ClientConfig clientConfig, FunctionEx<? super E,? extends K> toKeyFn, FunctionEx<? super E,? extends EntryProcessor<K,V,R>> toEntryProcessorFn)
    Returns a sink equivalent to mapWithEntryProcessor(String, FunctionEx, FunctionEx), but for a map in a remote Hazelcast cluster identified by the supplied ClientConfig.

static <K, V> Sink<Map.Entry<K,V>> remoteMapWithMerging(String mapName, ClientConfig clientConfig, BinaryOperatorEx<V> mergeFn)

static <T, K, V> Sink<T> remoteMapWithMerging(String mapName, ClientConfig clientConfig, FunctionEx<? super T,? extends K> toKeyFn, FunctionEx<? super T,? extends V> toValueFn, BinaryOperatorEx<V> mergeFn)
    Returns a sink equivalent to mapWithMerging(String, BinaryOperatorEx), but for a map in a remote Hazelcast cluster identified by the supplied ClientConfig.

static <K, V, E extends Map.Entry<K, V>> Sink<E> remoteMapWithUpdating(String mapName, ClientConfig clientConfig, BiFunctionEx<? super V,? super E,? extends V> updateFn)

static <T, K, V> Sink<T> remoteMapWithUpdating(String mapName, ClientConfig clientConfig, FunctionEx<? super T,? extends K> toKeyFn, BiFunctionEx<? super V,? super T,? extends V> updateFn)
    Returns a sink equivalent to mapWithUpdating(String, FunctionEx, BiFunctionEx), but for a map in a remote Hazelcast cluster identified by the supplied ClientConfig.

static <T> Sink<T> remoteReliableTopic(String reliableTopicName, ClientConfig clientConfig)
    Returns a sink which publishes the items it receives to a distributed reliable topic with the provided name in a remote cluster identified by the supplied ClientConfig.

static <T> Sink<T> socket(String host, int port)
    Convenience for socket(String, int, FunctionEx, Charset) with Object.toString as the conversion function and UTF-8 as the charset.

static <T> Sink<T> socket(String host, int port, FunctionEx<? super T,? extends String> toStringFn)
    Convenience for socket(String, int, FunctionEx, Charset) with UTF-8 as the charset.

static <T> Sink<T> socket(String host, int port, FunctionEx<? super T,? extends String> toStringFn, Charset charset)
    Returns a sink that connects to the specified TCP socket and writes to it a string representation of the items it receives.
-
Method Details
-
fromProcessor
@Nonnull public static <T> Sink<T> fromProcessor(@Nonnull String sinkName, @Nonnull ProcessorMetaSupplier metaSupplier)

Returns a sink constructed directly from the given Core API processor meta-supplier. The default local parallelism for this sink is specified inside the metaSupplier.
- Parameters:
  sinkName - user-friendly sink name
  metaSupplier - the processor meta-supplier
-
map
Returns a sink that puts the Map.Entrys it receives into a Hazelcast IMap with the specified name.

This sink provides the exactly-once guarantee thanks to idempotent updates: the value for a given key is not appended to but overwritten, so after the job restarts from a snapshot, duplicate items do not change the state of the target map.
The default local parallelism for this sink is 1.
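The idempotent-update guarantee can be illustrated with a plain java.util.Map: replaying the same batch of entries a second time, as happens after a restart from snapshot, leaves the map unchanged. This is a standalone sketch of the semantics, not Jet code:

```java
import java.util.HashMap;
import java.util.Map;

public class IdempotentPut {
    // Applies a batch of entries with plain put() semantics, as the map sink does.
    public static <K, V> void apply(Map<K, V> target, Map<K, V> batch) {
        for (Map.Entry<K, V> e : batch.entrySet()) {
            target.put(e.getKey(), e.getValue()); // overwrite, never append
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> target = new HashMap<>();
        Map<String, Integer> batch = Map.of("a", 1, "b", 2);
        apply(target, batch);
        apply(target, batch); // replay after restart: no state change
        System.out.println(target.size()); // prints 2
    }
}
```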
-
map
Returns a sink that puts the Map.Entrys it receives into the given Hazelcast IMap.

NOTE: Jet only remembers the name of the map you supply and acquires a map with that name on the local cluster. If you supply a map instance from another cluster, no error will be thrown to indicate this.
This sink provides the exactly-once guarantee thanks to idempotent updates: the value for a given key is not appended to but overwritten, so after the job restarts from a snapshot, duplicate items do not change the state of the target map.
The default local parallelism for this sink is 1.
-
remoteMap
@Nonnull public static <K, V> Sink<Map.Entry<K,V>> remoteMap(@Nonnull String mapName, @Nonnull ClientConfig clientConfig)

Returns a sink that puts the Map.Entrys it receives into a Hazelcast IMap with the specified name in a remote cluster identified by the supplied ClientConfig.
This sink provides the exactly-once guarantee thanks to idempotent updates: the value for a given key is not appended to but overwritten, so after the job restarts from a snapshot, duplicate items do not change the state of the target map.
The default local parallelism for this sink is 1.
-
mapWithMerging
@Nonnull public static <T, K, V> Sink<T> mapWithMerging(@Nonnull String mapName, @Nonnull FunctionEx<? super T,? extends K> toKeyFn, @Nonnull FunctionEx<? super T,? extends V> toValueFn, @Nonnull BinaryOperatorEx<V> mergeFn)

Returns a sink that uses the supplied functions to extract the key and value with which to update a Hazelcast IMap. If the map already contains the key, it applies the given mergeFn to resolve the existing and the proposed value into the value to use. If the value comes out as null, it removes the key from the map. Expressed as code, the sink performs the equivalent of the following for each item:

K key = toKeyFn.apply(item);
V oldValue = map.get(key);
V newValue = toValueFn.apply(item);
V resolved = (oldValue == null) ? newValue : mergeFn.apply(oldValue, newValue);
if (resolved == null)
    map.remove(key);
else
    map.put(key, resolved);

This sink supports exactly-once processing only if the supplied merge function performs idempotent updates, i.e., it satisfies the rule mergeFn.apply(oldValue, toValueFn.apply(e)).equals(oldValue) for any e that was already observed.
Note: This operation is NOT lock-aware; it will process the entries whether or not they are locked. Use mapWithEntryProcessor(String, FunctionEx, FunctionEx) if you need locking.
The default local parallelism for this sink is 1.
- Type Parameters:
  T - input item type
  K - key type
  V - value type
- Parameters:
  mapName - name of the map
  toKeyFn - function that extracts the key from the input item
  toValueFn - function that extracts the value from the input item
  mergeFn - function that merges the existing value with the value acquired from the received item
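The merge-put logic above can be exercised against a plain java.util.Map. The sketch below uses java.util.function.BinaryOperator in place of Jet's BinaryOperatorEx; it illustrates the documented semantics and is not the sink's actual implementation:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

public class MergeSinkSemantics {
    // Equivalent of one sink update: merge the proposed value into the map under key.
    public static <K, V> void mergePut(Map<K, V> map, K key, V newValue,
                                       BinaryOperator<V> mergeFn) {
        V oldValue = map.get(key);
        V resolved = (oldValue == null) ? newValue : mergeFn.apply(oldValue, newValue);
        if (resolved == null) {
            map.remove(key);      // null resolution removes the entry
        } else {
            map.put(key, resolved);
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new HashMap<>();
        mergePut(counts, "word", 1, Integer::sum);
        mergePut(counts, "word", 2, Integer::sum);
        System.out.println(counts.get("word")); // prints 3
    }
}
```

A counting merge like Integer::sum is a typical use, but note it is not idempotent, so it would not preserve the exactly-once guarantee described above.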
-
mapWithMerging
@Nonnull public static <T, K, V> Sink<T> mapWithMerging(@Nonnull IMap<? super K,? super V> map, @Nonnull FunctionEx<? super T,? extends K> toKeyFn, @Nonnull FunctionEx<? super T,? extends V> toValueFn, @Nonnull BinaryOperatorEx<V> mergeFn)

Returns a sink that uses the supplied functions to extract the key and value with which to update a Hazelcast IMap. If the map already contains the key, it applies the given mergeFn to resolve the existing and the proposed value into the value to use. If the value comes out as null, it removes the key from the map. Expressed as code, the sink performs the equivalent of the following for each item:

K key = toKeyFn.apply(item);
V oldValue = map.get(key);
V newValue = toValueFn.apply(item);
V resolved = (oldValue == null) ? newValue : mergeFn.apply(oldValue, newValue);
if (resolved == null)
    map.remove(key);
else
    map.put(key, resolved);

NOTE: Jet only remembers the name of the map you supply and acquires a map with that name on the local cluster. If you supply a map instance from another cluster, no error will be thrown to indicate this.
This sink supports exactly-once processing only if the supplied merge function performs idempotent updates, i.e., it satisfies the rule mergeFn.apply(oldValue, toValueFn.apply(e)).equals(oldValue) for any e that was already observed.
Note: This operation is NOT lock-aware; it will process the entries whether or not they are locked. Use mapWithEntryProcessor(String, FunctionEx, FunctionEx) if you need locking.
The default local parallelism for this sink is 1.
- Type Parameters:
  T - input item type
  K - key type
  V - value type
- Parameters:
  map - the map to drain to
  toKeyFn - function that extracts the key from the input item
  toValueFn - function that extracts the value from the input item
  mergeFn - function that merges the existing value with the value acquired from the received item
-
remoteMapWithMerging
@Nonnull public static <T, K, V> Sink<T> remoteMapWithMerging(@Nonnull String mapName, @Nonnull ClientConfig clientConfig, @Nonnull FunctionEx<? super T,? extends K> toKeyFn, @Nonnull FunctionEx<? super T,? extends V> toValueFn, @Nonnull BinaryOperatorEx<V> mergeFn)

Returns a sink equivalent to mapWithMerging(String, BinaryOperatorEx), but for a map in a remote Hazelcast cluster identified by the supplied ClientConfig.
Due to the API used, the remote cluster must be at version 3.11 or later.
-
mapWithMerging
@Nonnull public static <K, V> Sink<Map.Entry<K,V>> mapWithMerging(@Nonnull String mapName, @Nonnull BinaryOperatorEx<V> mergeFn)

Convenience for mapWithMerging(String, FunctionEx, FunctionEx, BinaryOperatorEx) with Map.Entry as the input item.
-
mapWithMerging
@Nonnull public static <K, V> Sink<Map.Entry<K,V>> mapWithMerging(@Nonnull IMap<? super K,V> map, @Nonnull BinaryOperatorEx<V> mergeFn)

Convenience for mapWithMerging(IMap, FunctionEx, FunctionEx, BinaryOperatorEx) with Map.Entry as the input item.
-
remoteMapWithMerging
@Nonnull public static <K, V> Sink<Map.Entry<K,V>> remoteMapWithMerging(@Nonnull String mapName, @Nonnull ClientConfig clientConfig, @Nonnull BinaryOperatorEx<V> mergeFn) -
mapWithUpdating
@Nonnull public static <T, K, V> Sink<T> mapWithUpdating(@Nonnull String mapName, @Nonnull FunctionEx<? super T,? extends K> toKeyFn, @Nonnull BiFunctionEx<? super V,? super T,? extends V> updateFn)

Returns a sink that uses the supplied key-extracting and value-updating functions to update a Hazelcast IMap. For each item it receives, it applies toKeyFn to get the key and then applies updateFn to the existing value in the map and the received item to acquire the new value to associate with the key. If the new value is null, it removes the key from the map. Expressed as code, the sink performs the equivalent of the following for each item:

K key = toKeyFn.apply(item);
V oldValue = map.get(key);
V newValue = updateFn.apply(oldValue, item);
if (newValue == null)
    map.remove(key);
else
    map.put(key, newValue);

This sink supports exactly-once processing only if the supplied update function performs idempotent updates, i.e., it satisfies the rule updateFn.apply(v, e).equals(v) for any e that was already observed.
Note: This operation is NOT lock-aware; it will process the entries whether or not they are locked. Use mapWithEntryProcessor(String, FunctionEx, FunctionEx) if you need locking.
The default local parallelism for this sink is 1.
- Type Parameters:
  T - input item type
  K - key type
  V - value type
- Parameters:
  mapName - name of the map
  toKeyFn - function that extracts the key from the input item
  updateFn - function that receives the existing map value and the item and returns the new map value
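The update-put logic above can likewise be sketched against a plain java.util.Map, using java.util.function.BiFunction in place of Jet's BiFunctionEx. This is an illustration of the documented semantics, not the sink's implementation:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

public class UpdateSinkSemantics {
    // Equivalent of one sink update: derive the new value from the old value and the item.
    public static <K, V, T> void updatePut(Map<K, V> map, K key, T item,
                                           BiFunction<V, T, V> updateFn) {
        V oldValue = map.get(key);
        V newValue = updateFn.apply(oldValue, item);
        if (newValue == null) {
            map.remove(key);      // null result removes the entry
        } else {
            map.put(key, newValue);
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> totals = new HashMap<>();
        updatePut(totals, "k", 4, (old, it) -> old == null ? it : old + it);
        updatePut(totals, "k", 6, (old, it) -> old == null ? it : old + it);
        System.out.println(totals.get("k")); // prints 10
    }
}
```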
-
mapWithUpdating
@Nonnull public static <T, K, V> Sink<T> mapWithUpdating(@Nonnull IMap<? super K,? super V> map, @Nonnull FunctionEx<? super T,? extends K> toKeyFn, @Nonnull BiFunctionEx<? super V,? super T,? extends V> updateFn)

Returns a sink that uses the supplied key-extracting and value-updating functions to update a Hazelcast IMap. For each item it receives, it applies toKeyFn to get the key and then applies updateFn to the existing value in the map and the received item to acquire the new value to associate with the key. If the new value is null, it removes the key from the map. Expressed as code, the sink performs the equivalent of the following for each item:

K key = toKeyFn.apply(item);
V oldValue = map.get(key);
V newValue = updateFn.apply(oldValue, item);
if (newValue == null)
    map.remove(key);
else
    map.put(key, newValue);

NOTE: Jet only remembers the name of the map you supply and acquires a map with that name on the local cluster. If you supply a map instance from another cluster, no error will be thrown to indicate this.
This sink supports exactly-once processing only if the supplied update function performs idempotent updates, i.e., it satisfies the rule updateFn.apply(v, e).equals(v) for any e that was already observed.
Note: This operation is not lock-aware; it will process the entries even if they are locked. Use mapWithEntryProcessor(String, FunctionEx, FunctionEx) if you need locking.
The default local parallelism for this sink is 1.
- Type Parameters:
  T - input item type
  K - key type
  V - value type
- Parameters:
  map - map to drain to
  toKeyFn - function that extracts the key from the input item
  updateFn - function that receives the existing map value and the item and returns the new map value
-
remoteMapWithUpdating
@Nonnull public static <T, K, V> Sink<T> remoteMapWithUpdating(@Nonnull String mapName, @Nonnull ClientConfig clientConfig, @Nonnull FunctionEx<? super T,? extends K> toKeyFn, @Nonnull BiFunctionEx<? super V,? super T,? extends V> updateFn)

Returns a sink equivalent to mapWithUpdating(String, FunctionEx, BiFunctionEx), but for a map in a remote Hazelcast cluster identified by the supplied ClientConfig.
Due to the API used, the remote cluster must be at version 3.11 or later.
-
mapWithUpdating
@Nonnull public static <K, V, E extends Map.Entry<K, V>> Sink<E> mapWithUpdating(@Nonnull String mapName, @Nonnull BiFunctionEx<? super V,? super E,? extends V> updateFn)

Convenience for mapWithUpdating(String, FunctionEx, BiFunctionEx) with Map.Entry as the input item.
-
mapWithUpdating
@Nonnull public static <K, V, E extends Map.Entry<K, V>> Sink<E> mapWithUpdating(@Nonnull IMap<? super K,? super V> map, @Nonnull BiFunctionEx<? super V,? super E,? extends V> updateFn)

Convenience for mapWithUpdating(IMap, FunctionEx, BiFunctionEx) with Map.Entry as the input item.
-
remoteMapWithUpdating
@Nonnull public static <K, V, E extends Map.Entry<K, V>> Sink<E> remoteMapWithUpdating(@Nonnull String mapName, @Nonnull ClientConfig clientConfig, @Nonnull BiFunctionEx<? super V,? super E,? extends V> updateFn) -
mapWithEntryProcessor
@Nonnull public static <E, K, V, R> Sink<E> mapWithEntryProcessor(@Nonnull String mapName, @Nonnull FunctionEx<? super E,? extends K> toKeyFn, @Nonnull FunctionEx<? super E,? extends EntryProcessor<K,V,R>> toEntryProcessorFn)

Returns a sink that uses the items it receives to create EntryProcessors it submits to a Hazelcast IMap with the specified name. For each received item it applies toKeyFn to get the key and toEntryProcessorFn to get the entry processor, and then submits the key and the entry processor to the Hazelcast cluster, which will internally apply the entry processor to the key.
As opposed to mapWithUpdating(String, FunctionEx, BiFunctionEx) and mapWithMerging(String, FunctionEx, FunctionEx, BinaryOperatorEx), this sink does not use batching and submits a separate entry processor for each received item. For use cases that are efficiently solvable using those sinks, this one will perform worse, so use it only when they are not applicable.
If your entry processors take a long time to update a value, consider using entry processors that implement Offloadable. This will avoid blocking the Hazelcast partition thread during large update operations.
This sink supports exactly-once processing only if the supplied entry processor performs idempotent updates, i.e., the resulting value would be the same if the entry processor were run on the same entry more than once.
Note: Unlike mapWithUpdating and mapWithMerging, this operation is lock-aware. If the key is locked, the EntryProcessor will wait until it acquires the lock.
The default local parallelism for this sink is 1.
- Type Parameters:
  E - input item type
  K - key type
  V - value type
- Parameters:
  mapName - name of the map
  toKeyFn - function that extracts the key from the input item
  toEntryProcessorFn - function that returns the EntryProcessor to apply to the key
-
mapWithEntryProcessor
@Nonnull public static <T, K, V, R> Sink<T> mapWithEntryProcessor(@Nonnull IMap<? super K,? super V> map, @Nonnull FunctionEx<? super T,? extends K> toKeyFn, @Nonnull FunctionEx<? super T,? extends EntryProcessor<K,V,R>> toEntryProcessorFn)

Returns a sink that uses the items it receives to create EntryProcessors it submits to a Hazelcast IMap with the specified name. For each received item it applies toKeyFn to get the key and toEntryProcessorFn to get the entry processor, and then submits the key and the entry processor to the Hazelcast cluster, which will internally apply the entry processor to the key.
NOTE: Jet only remembers the name of the map you supply and acquires a map with that name on the local cluster. If you supply a map instance from another cluster, no error will be thrown to indicate this.
As opposed to mapWithUpdating(String, FunctionEx, BiFunctionEx) and mapWithMerging(String, FunctionEx, FunctionEx, BinaryOperatorEx), this sink does not use batching and submits a separate entry processor for each received item. For use cases that are efficiently solvable using those sinks, this one will perform worse, so use it only when they are not applicable.
If your entry processors take a long time to update a value, consider using entry processors that implement Offloadable. This will avoid blocking the Hazelcast partition thread during large update operations.
This sink supports exactly-once processing only if the supplied entry processor performs idempotent updates, i.e., the resulting value would be the same if the entry processor were run on the same entry more than once.
Note: Unlike mapWithUpdating and mapWithMerging, this operation is lock-aware. If the key is locked, the EntryProcessor will wait until it acquires the lock.
The default local parallelism for this sink is 1.
- Type Parameters:
  T - input item type
  K - key type
  V - value type
- Parameters:
  map - map to drain to
  toKeyFn - function that extracts the key from the input item
  toEntryProcessorFn - function that returns the EntryProcessor to apply to the key
-
remoteMapWithEntryProcessor
@Nonnull public static <E, K, V, R> Sink<E> remoteMapWithEntryProcessor(@Nonnull String mapName, @Nonnull ClientConfig clientConfig, @Nonnull FunctionEx<? super E,? extends K> toKeyFn, @Nonnull FunctionEx<? super E,? extends EntryProcessor<K,V,R>> toEntryProcessorFn)

Returns a sink equivalent to mapWithEntryProcessor(String, FunctionEx, FunctionEx), but for a map in a remote Hazelcast cluster identified by the supplied ClientConfig.
-
cache
Returns a sink that puts the Map.Entrys it receives into a Hazelcast ICache with the specified name.

This sink provides the exactly-once guarantee thanks to idempotent updates: the value for a given key is not appended to but overwritten, so after the job restarts from a snapshot, duplicate items do not change the state of the target cache.
The default local parallelism for this sink is 2.
-
remoteCache
@Nonnull public static <T extends Map.Entry> Sink<T> remoteCache(@Nonnull String cacheName, @Nonnull ClientConfig clientConfig)

Returns a sink that puts the Map.Entrys it receives into a Hazelcast ICache with the specified name in a remote cluster identified by the supplied ClientConfig.
This sink provides the exactly-once guarantee thanks to idempotent updates: the value for a given key is not appended to but overwritten, so after the job restarts from a snapshot, duplicate items do not change the state of the target cache.
The default local parallelism for this sink is 2.
-
list
Returns a sink that adds the items it receives to a Hazelcast IList with the specified name.

No state is saved to snapshot for this sink. After the job is restarted, the items will likely be duplicated, providing an at-least-once guarantee.
The default local parallelism for this sink is 1.
-
list
Returns a sink that adds the items it receives to the specified Hazelcast IList.

NOTE: Jet only remembers the name of the list you supply and acquires a list with that name on the local cluster. If you supply a list instance from another cluster, no error will be thrown to indicate this.
No state is saved to snapshot for this sink. After the job is restarted, the items will likely be duplicated, providing an at-least-once guarantee.
The default local parallelism for this sink is 1.
-
remoteList
@Nonnull public static <T> Sink<T> remoteList(@Nonnull String listName, @Nonnull ClientConfig clientConfig)

Returns a sink that adds the items it receives to a Hazelcast IList with the specified name in a remote cluster identified by the supplied ClientConfig.
No state is saved to snapshot for this sink. After the job is restarted, the items will likely be duplicated, providing an at-least-once guarantee.
The default local parallelism for this sink is 1.
-
reliableTopic
Returns a sink which publishes the items it receives to a distributed reliable topic with the specified name.

No state is saved to snapshot for this sink. After the job is restarted, the items will likely be duplicated, providing an at-least-once guarantee.
Local parallelism for this sink is 1.
- Since:
- 4.0
-
reliableTopic
Returns a sink which publishes the items it receives to the provided distributed reliable topic. More precisely, it takes the name of the given ITopic and then independently retrieves the ITopic with the same name from the cluster where the job is running. To prevent surprising behavior, make sure you have obtained the ITopic from the same cluster to which you will submit the pipeline.
No state is saved to snapshot for this sink. After the job is restarted, the items will likely be duplicated, providing an at-least-once guarantee.
Local parallelism for this sink is 1.
- Since:
- 4.0
-
remoteReliableTopic
@Nonnull public static <T> Sink<T> remoteReliableTopic(@Nonnull String reliableTopicName, @Nonnull ClientConfig clientConfig)

Returns a sink which publishes the items it receives to a distributed reliable topic with the provided name in a remote cluster identified by the supplied ClientConfig.
No state is saved to snapshot for this sink. After the job is restarted, the items will likely be duplicated, providing an at-least-once guarantee.
Local parallelism for this sink is 1.
- Since:
- 4.0
-
socket
@Nonnull public static <T> Sink<T> socket(@Nonnull String host, int port, @Nonnull FunctionEx<? super T,? extends String> toStringFn, @Nonnull Charset charset)

Returns a sink that connects to the specified TCP socket and writes to it a string representation of the items it receives. It converts an item to its string representation using the supplied toStringFn function and encodes the string using the supplied Charset. It follows each item with a newline character.
No state is saved to snapshot for this sink. After the job is restarted, the items will likely be duplicated, providing an at-least-once guarantee.
The default local parallelism for this sink is 1.
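The wire format described above (toStringFn, then charset encoding, then a trailing newline) can be sketched against any OutputStream. This standalone example illustrates the documented format and is not the sink's actual implementation:

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.function.Function;

public class SocketSinkFormat {
    // Encodes each item as toStringFn(item) + '\n' in the given charset,
    // mirroring what the socket sink sends over the TCP connection.
    public static <T> byte[] encodeItems(List<T> items,
                                         Function<T, String> toStringFn,
                                         Charset charset) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        for (T item : items) {
            byte[] bytes = (toStringFn.apply(item) + "\n").getBytes(charset);
            buf.write(bytes, 0, bytes.length);
        }
        return buf.toByteArray();
    }

    public static void main(String[] args) {
        byte[] out = encodeItems(List.of(1, 2, 3), Object::toString, StandardCharsets.UTF_8);
        System.out.print(new String(out, StandardCharsets.UTF_8)); // prints "1", "2", "3" on separate lines
    }
}
```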
-
socket
@Nonnull public static <T> Sink<T> socket(@Nonnull String host, int port, @Nonnull FunctionEx<? super T,? extends String> toStringFn)

Convenience for socket(String, int, FunctionEx, Charset) with UTF-8 as the charset.
-
socket
Convenience for socket(String, int, FunctionEx, Charset) with Object.toString as the conversion function and UTF-8 as the charset.
-
filesBuilder
Returns a builder object that offers a step-by-step fluent API to build a custom file sink for the Pipeline API. See the javadoc of the methods in FileSinkBuilder for more details.

The sink writes the items it receives to files. Each processor writes to its own files, whose names contain the processor's global index (an integer unique to each processor of the vertex), but a single directory is used for all files on all cluster members. That directory can be shared over the network: the global processor index is different on each member, so the members won't overwrite each other's files.
Fault tolerance
If the job is running in exactly-once mode, Jet writes the items to temporary files (ending with a ".tmp" suffix). When Jet commits a snapshot, it atomically renames the file to remove this suffix. Thanks to the two-phase commit of the snapshot, the sink provides an exactly-once guarantee. Because Jet starts a new file each time it snapshots the state, the sink will likely produce many small files, depending on the snapshot interval.
If you want to avoid the temporary files or the high number of files but need exactly-once for other processors in the job, call exactlyOnce(false) on the returned builder. This gives you an at-least-once guarantee for the sink and leaves the guarantee for the other processors unchanged.
File name structure
[<date>-]<global processor index>[-<sequence>][".tmp"]
Description (parts in [] are optional):
- <date>: the current date and time, see FileSinkBuilder.rollByDate(String). Not present if rolling by date is not used.
- <global processor index>: a processor index ensuring that each parallel processor writes to its own file.
- <sequence>: a sequence number starting from 0. Used if any of the following applies:
- running in exactly-once mode
- maximum file size is set
- <date> changes
- ".tmp": the FileSinkBuilder.TEMP_FILE_SUFFIX, used if the file is not yet committed.
Notes
The target directory is not deleted before the job starts. If file names clash, the files are appended to; this is needed to ensure at-least-once behavior. In exactly-once mode the file names never clash thanks to the sequence number in the file name: a number higher than the highest sequence number found in the directory is always chosen.
For performance, the processor doesn't delete old files from the directory. If you have frequent snapshots, you should delete the old files from time to time to avoid having a huge number of files in the directory. Jet lists the files in the directory after a restart to find out the sequence number to use.
The default local parallelism for this sink is 1.
- Type Parameters:
T
- type of the items the sink accepts
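To make the naming scheme described above concrete, here is a small stdlib-only sketch. The helper is hypothetical (not part of FileSinkBuilder); it only assembles the documented pattern [<date>-]<global processor index>[-<sequence>][".tmp"] from its optional parts.

```java
import java.util.StringJoiner;

// Hypothetical helper (not Jet API) that assembles a sink file name from the
// documented parts: optional date prefix, mandatory global processor index,
// optional sequence number, optional ".tmp" suffix for uncommitted files.
class FileNameDemo {
    static String fileName(String dateOrNull, int globalIndex, Integer sequenceOrNull, boolean temp) {
        StringJoiner name = new StringJoiner("-");
        if (dateOrNull != null) {
            name.add(dateOrNull);                       // present only when rolling by date
        }
        name.add(Integer.toString(globalIndex));        // unique per parallel processor
        if (sequenceOrNull != null) {
            name.add(Integer.toString(sequenceOrNull)); // exactly-once mode, max size, or date roll
        }
        return name + (temp ? ".tmp" : "");             // ".tmp" until the snapshot commits
    }

    public static void main(String[] args) {
        System.out.println(fileName("2024-01-01", 3, 7, true)); // 2024-01-01-3-7.tmp
        System.out.println(fileName(null, 0, null, false));     // 0
    }
}
```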
-
files
Convenience for filesBuilder(java.lang.String) with the UTF-8 charset and with overwriting of existing files.
-
logger
Returns a sink that logs all the data items it receives at the INFO level, to the log category WriteLoggerP. It also logs watermark items, but at the FINE level.
The sink logs each item on whichever cluster member receives it. Its primary purpose is development, when running Jet on a local machine.
The default local parallelism for this sink is 1.
- Type Parameters:
T
- stream item type- Parameters:
toStringFn
- a function that returns a string representation of a stream item
-
logger
-
noop
Returns a sink which discards all received items. -
jmsQueue
@Nonnull public static <T> Sink<T> jmsQueue(@Nonnull SupplierEx<javax.jms.ConnectionFactory> factorySupplier, @Nonnull String name)
Convenience for jmsQueueBuilder(SupplierEx). Creates a connection without any authentication parameters. If a received item is not an instance of javax.jms.Message, the sink wraps item.toString() into a TextMessage.
- Parameters:
factorySupplier
- supplier to obtain the JMS connection factory
name
- the name of the queue
-
jmsQueueBuilder
@Nonnull public static <T> JmsSinkBuilder<T> jmsQueueBuilder(@Nonnull SupplierEx<javax.jms.ConnectionFactory> factorySupplier)
Returns a builder object that offers a step-by-step fluent API to build a custom JMS queue sink for the Pipeline API. See the javadoc of JmsSinkBuilder methods for more details.
Behavior on job restart: the processor is stateless. Items are written in auto-acknowledge mode. If the job is restarted, duplicate events can occur, giving you an at-least-once guarantee. If you need exactly-once behavior, you must ensure idempotence on the consumer end.
IO failures should be handled by the JMS provider. If any JMS operation throws an exception, the job will fail. Most providers offer a configuration parameter to enable auto-reconnection; refer to the provider documentation for details.
The default local parallelism for this processor is 4 (or fewer if fewer CPUs are available).
- Type Parameters:
T
- type of the items the sink accepts
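Since the sink gives an at-least-once guarantee, a consumer that needs exactly-once semantics has to deduplicate redelivered messages itself, typically by a unique message key. The stdlib-only sketch below illustrates the idea; the String IDs are a stand-in for whatever unique key your messages carry, and nothing here is provided by the sink.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch of an idempotent consumer: it remembers the IDs it has already
// processed and ignores duplicates redelivered after a job restart.
class IdempotentConsumerDemo {
    final Set<String> seenIds = new HashSet<>();
    final List<String> processed = new ArrayList<>();

    void onMessage(String id, String payload) {
        if (seenIds.add(id)) {      // add() returns false for an already-seen ID
            processed.add(payload); // the real work happens only once per ID
        }
    }

    public static void main(String[] args) {
        IdempotentConsumerDemo consumer = new IdempotentConsumerDemo();
        consumer.onMessage("m1", "hello");
        consumer.onMessage("m1", "hello"); // redelivered duplicate: ignored
        consumer.onMessage("m2", "world");
        System.out.println(consumer.processed); // [hello, world]
    }
}
```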
-
jmsTopic
@Nonnull public static <T> Sink<T> jmsTopic(@Nonnull SupplierEx<javax.jms.ConnectionFactory> factorySupplier, @Nonnull String name)
Convenience for jmsTopicBuilder(SupplierEx). Creates a connection without any authentication parameters and uses non-transacted sessions with Session.AUTO_ACKNOWLEDGE mode. If a received item is not an instance of javax.jms.Message, the sink wraps item.toString() into a TextMessage.
- Parameters:
factorySupplier
- supplier to obtain the JMS connection factory
name
- the name of the topic
-
jmsTopicBuilder
@Nonnull public static <T> JmsSinkBuilder<T> jmsTopicBuilder(@Nonnull SupplierEx<javax.jms.ConnectionFactory> factorySupplier)
Returns a builder object that offers a step-by-step fluent API to build a custom JMS topic sink for the Pipeline API. See the javadoc of JmsSinkBuilder methods for more details.
Behavior on job restart: the processor is stateless. Items are written in auto-acknowledge mode. If the job is restarted, duplicate events can occur, giving you an at-least-once guarantee. If you need exactly-once behavior, you must ensure idempotence on the consumer end.
IO failures should be handled by the JMS provider. If any JMS operation throws an exception, the job will fail. Most providers offer a configuration parameter to enable auto-reconnection; refer to the provider documentation for details.
The default local parallelism for this processor is 4 (or fewer if fewer CPUs are available).
- Type Parameters:
T
- type of the items the sink accepts
-
jdbc
@Nonnull public static <T> Sink<T> jdbc(@Nonnull String updateQuery, @Nonnull SupplierEx<Connection> newConnectionFn, @Nonnull BiConsumerEx<PreparedStatement,T> bindFn)
Returns a sink that connects to the specified database using the given newConnectionFn, prepares a statement using the given updateQuery, and inserts/updates the items.
The updateQuery should contain a parametrized query. The bindFn will receive a PreparedStatement created for this query and should bind parameters to it. It should not execute the query, call commit, or call any other method.
The records will be committed after each batch of records, and batch mode will be used (if the driver supports it). Auto-commit will be disabled on the connection.
Example:
p.writeTo(Sinks.jdbc(
        "REPLACE INTO table (id, name) VALUES (?, ?)",
        () -> DriverManager.getConnection("jdbc:..."),
        (stmt, item) -> {
            stmt.setInt(1, item.id);
            stmt.setString(2, item.name);
        }
));
In case of an SQLException the processor will automatically try to reconnect and the job won't fail, except for the SQLNonTransientException subclass. The default local parallelism for this sink is 1.
No state is saved to snapshot for this sink. After the job is restarted, the items will likely be duplicated, providing an at-least-once guarantee. For this reason you should not use an INSERT statement, which can fail on a duplicate primary key. Rather, use an insert-or-update statement that can tolerate duplicate writes.
- Type Parameters:
T
- type of the items the sink accepts
- Parameters:
updateQuery
- the SQL query which will do the insert/update
newConnectionFn
- the supplier of the database connection
bindFn
- the function to set the parameters of the statement for each item received
-
jdbc
@Nonnull public static <T> Sink<T> jdbc(@Nonnull String updateQuery, @Nonnull String connectionUrl, @Nonnull BiConsumerEx<PreparedStatement,T> bindFn)
Convenience for jdbc(String, SupplierEx, BiConsumerEx). The connection will be created from connectionUrl.
-
observable
Returns a sink that publishes to the Observable with the provided name. The records sent to the observable can be read by first getting a handle to it through JetInstance.getObservable(String) and then subscribing to the events using the methods on Observable.
The Observable should be destroyed after using it. For the full description see the javadoc for Observable.
Example:
Observable<Integer> observable = jet.newObservable();
CompletableFuture<List<Integer>> list = observable.toFuture(o -> o.collect(toList()));
pipeline.readFrom(TestSources.items(1, 2, 3, 4))
        .writeTo(Sinks.observable(observable));
Job job = jet.newJob(pipeline);
System.out.println(list.get());
observable.destroy();
This sink is cooperative and uses a local parallelism of 1.
- Since:
- 4.0
-
observable
Returns a sink that publishes to the provided Observable. More precisely, it takes the name of the given Observable and then independently retrieves an Observable with the same name from the cluster where the job is running. To prevent surprising behavior, make sure you have obtained the Observable from the same cluster to which you will submit the pipeline.
For more details refer to observable(name).
- Since:
- 4.0
-