public final class SourceBuilder<S>
extends Object

Type Parameters:
S - type of the source state object
Modifier and Type | Class and Description |
---|---|
class | SourceBuilder.Batch<T>: A builder of a batch stream source. |
static interface | SourceBuilder.SourceBuffer<T>: The buffer object that the fillBufferFn gets on each call. |
class | SourceBuilder.Stream<T>: A builder of an unbounded stream source. |
static interface | SourceBuilder.TimestampedSourceBuffer<T>: The buffer object that the fillBufferFn gets on each call. |
class | SourceBuilder.TimestampedStream<T>: A builder of an unbounded stream source with timestamps. |
Modifier and Type | Method and Description |
---|---|
static <S> SourceBuilder.Batch<Void> | batch(String name, FunctionEx<? super Processor.Context,? extends S> createFn): Returns a fluent-API builder with which you can create a batch source for a Jet pipeline. |
static <S> SourceBuilder.Stream<Void> | stream(String name, FunctionEx<? super Processor.Context,? extends S> createFn): Returns a fluent-API builder with which you can create an unbounded stream source for a Jet pipeline. |
static <S> SourceBuilder.TimestampedStream<Void> | timestampedStream(String name, FunctionEx<? super Processor.Context,? extends S> createFn): Returns a fluent-API builder with which you can create an unbounded stream source with timestamps for a Jet pipeline. |
@Nonnull public static <S> SourceBuilder.Batch<Void> batch(@Nonnull String name, @Nonnull FunctionEx<? super Processor.Context,? extends S> createFn)
Each parallel processor that drives your source has its own private instance of a state object it gets from your createFn. To get the data items to emit to the pipeline, the processor repeatedly calls your fillBufferFn with the state object and a buffer object.
Your function should add some items to the buffer, ideally those it has ready without having to block. About a hundred items per call is enough to amortize Jet's per-call overhead. If no items are ready, the function may return without adding anything. In any case, it should not take more than about a second to complete; otherwise it may interfere with Jet's coordination mechanisms and degrade performance.
Once it has emitted all the data, fillBufferFn must call buffer.close(). This signals Jet not to call fillBufferFn again; at some later point Jet calls the destroyFn on the state object.
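The batching and termination contract above can be modeled in plain Java, without Jet. In this sketch the nested ModelBuffer class is a hypothetical stand-in for SourceBuilder.SourceBuffer (it is not Jet's API), the state object is assumed to be an Iterator, and the drain() loop is only a simplified assumption about how a processor keeps calling fillBufferFn until the buffer is closed.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class BatchFillSketch {
    // Hypothetical stand-in for Jet's SourceBuffer; NOT the Jet API
    static class ModelBuffer<T> {
        final List<T> items = new ArrayList<>();
        boolean closed;

        void add(T item) { items.add(item); }
        void close() { closed = true; }
    }

    // Batch size following the "about a hundred items per call" guidance
    static final int MAX_ITEMS_PER_CALL = 100;

    // The fill function: adds up to MAX_ITEMS_PER_CALL items, then closes
    // the buffer once the state object (an Iterator) is exhausted
    static <T> void fillBuffer(Iterator<T> state, ModelBuffer<T> buf) {
        for (int i = 0; i < MAX_ITEMS_PER_CALL && state.hasNext(); i++) {
            buf.add(state.next());
        }
        if (!state.hasNext()) {
            buf.close();
        }
    }

    // Simplified model of the caller: keep calling fillBuffer until the buffer
    // is closed; returns the number of calls made
    static <T> int drain(Iterator<T> state, ModelBuffer<T> buf) {
        int calls = 0;
        while (!buf.closed) {
            fillBuffer(state, buf);
            calls++;
        }
        return calls;
    }

    public static void main(String[] args) {
        List<Integer> data = new ArrayList<>();
        for (int i = 0; i < 250; i++) {
            data.add(i);
        }
        ModelBuffer<Integer> buf = new ModelBuffer<>();
        int calls = drain(data.iterator(), buf);
        System.out.println(calls + " calls, " + buf.items.size() + " items"); // 3 calls, 250 items
    }
}
```

With 250 input items the fill function runs three times (100, 100 and 50 items) and, in this model, closes the buffer in the same call that adds the last items, so no extra empty call is needed.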
Unless you call builder.distributed(), Jet creates just a single processor, which should emit all the data. If you do call it, make sure your distributed source takes care of splitting the data between processors. Your createFn should consult procContext.totalParallelism() (see ProcessorMetaSupplier.Context.totalParallelism()) and procContext.globalProcessorIndex() (see Processor.Context.globalProcessorIndex()). Jet calls createFn exactly once for each globalProcessorIndex from 0 to totalParallelism - 1, and each of the resulting state objects must emit its unique slice of the total source data.
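One common way to give each state object a unique slice is to split an index range (for example, record offsets or line numbers) into contiguous chunks. The plain-Java sketch below assumes such a range exists; the class and method names are hypothetical. Inside a real createFn you would pass procContext.globalProcessorIndex() and procContext.totalParallelism() as the last two arguments.

```java
class RangeSliceSketch {
    // Returns {start, endExclusive} of the slice of [0, totalItems) owned by one processor.
    // The first (totalItems % totalParallelism) processors get one extra item.
    static long[] sliceFor(long totalItems, int globalProcessorIndex, int totalParallelism) {
        long base = totalItems / totalParallelism;
        long remainder = totalItems % totalParallelism;
        long start = globalProcessorIndex * base + Math.min(globalProcessorIndex, remainder);
        long size = base + (globalProcessorIndex < remainder ? 1 : 0);
        return new long[] {start, start + size};
    }

    public static void main(String[] args) {
        int totalParallelism = 3;
        for (int i = 0; i < totalParallelism; i++) {
            long[] slice = sliceFor(10, i, totalParallelism);
            // Prints [0, 4), [4, 7) and [7, 10)
            System.out.println("processor " + i + ": [" + slice[0] + ", " + slice[1] + ")");
        }
    }
}
```

Because only the first totalItems % totalParallelism processors take one extra item, the slices differ in size by at most one and together cover the range exactly once.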
Here's an example that builds a simple, non-distributed source that reads the lines from a single text file. Since you can't control on which member of the Jet cluster the source's processor will run, the file should be available on all members. The source emits one line per fillBufferFn call.
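```java
import com.hazelcast.jet.pipeline.BatchSource;
import com.hazelcast.jet.pipeline.BatchStage;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.SourceBuilder;
import java.io.BufferedReader;
import java.io.FileReader;

BatchSource<String> fileSource = SourceBuilder
        .batch("file-source", ctx ->
                new BufferedReader(new FileReader("input.txt")))
        .<String>fillBufferFn((in, buf) -> {
            // Emit one line per call; close the buffer at end of file
            String line = in.readLine();
            if (line != null) {
                buf.add(line);
            } else {
                buf.close();
            }
        })
        .destroyFn(BufferedReader::close)
        .build();
Pipeline p = Pipeline.create();
BatchStage<String> srcStage = p.drawFrom(fileSource);
```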
Type Parameters:
S - type of the state object
Parameters:
name - a descriptive name for the source (for diagnostic purposes)
createFn - a function that creates the state object

@Nonnull public static <S> SourceBuilder.Stream<Void> stream(@Nonnull String name, @Nonnull FunctionEx<? super Processor.Context,? extends S> createFn)
Each parallel processor that drives your source has its own private instance of a state object it gets from your createFn. To get the data items to emit to the pipeline, the processor repeatedly calls your fillBufferFn with the state object and a buffer object.
Your function should add some items to the buffer, ideally those it has ready without having to block. About a hundred items per call is enough to amortize Jet's per-call overhead. If no items are ready, the function may return without adding anything. In any case, it should not take more than about a second to complete; otherwise it may interfere with Jet's coordination mechanisms and degrade performance.
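One way to honor the "add what's ready, don't block" rule in a stream source is to let a background thread put incoming events on a queue held by the state object, and have fillBufferFn drain that queue. This plain-Java sketch assumes that design (Jet does not provide the queue; the class name is hypothetical) and uses BlockingQueue.drainTo(Collection, int), which moves at most the given number of already-available elements and returns immediately. A List stands in for the Jet buffer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class StreamFillSketch {
    static final int MAX_ITEMS_PER_CALL = 100;

    // Moves whatever is already queued (up to the limit) into the buffer without blocking.
    // Returns the number of items added; 0 is a valid result when nothing is ready.
    static int fillBuffer(BlockingQueue<String> state, List<String> buffer) {
        return state.drainTo(buffer, MAX_ITEMS_PER_CALL);
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1_000);
        for (int i = 0; i < 150; i++) {
            queue.add("event-" + i);
        }
        List<String> buffer = new ArrayList<>();
        System.out.println(fillBuffer(queue, buffer)); // 100
        System.out.println(fillBuffer(queue, buffer)); // 50
        System.out.println(fillBuffer(queue, buffer)); // 0, returns immediately
    }
}
```

The producer thread can block or retry on its own schedule, while fillBufferFn always returns quickly regardless of how much data has arrived.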
Unless you call builder.distributed(), Jet creates just a single processor, which should emit all the data. If you do call it, make sure your distributed source takes care of splitting the data between processors. Your createFn should consult procContext.totalParallelism() and procContext.globalProcessorIndex(). Jet calls createFn exactly once for each globalProcessorIndex from 0 to totalParallelism - 1, and each of the resulting state objects must emit its unique slice of the total source data.
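When the external system is itself partitioned, for example a log with numbered partitions, a simple way to split the work is round-robin assignment: the processor with globalProcessorIndex i reads every partition p for which p % totalParallelism == i. This plain-Java sketch is illustrative only; the partition count and the names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class PartitionAssignmentSketch {
    // Partitions owned by one processor: every p where p % totalParallelism == globalProcessorIndex.
    // Each partition goes to exactly one processor, so no data is read twice.
    static List<Integer> partitionsFor(int partitionCount, int globalProcessorIndex, int totalParallelism) {
        List<Integer> owned = new ArrayList<>();
        for (int p = globalProcessorIndex; p < partitionCount; p += totalParallelism) {
            owned.add(p);
        }
        return owned;
    }

    public static void main(String[] args) {
        // 7 partitions over 3 processors: [0, 3, 6], [1, 4], [2, 5]
        for (int i = 0; i < 3; i++) {
            System.out.println("processor " + i + " -> " + partitionsFor(7, i, 3));
        }
    }
}
```

If totalParallelism exceeds the number of partitions, some processors own no partitions; their fillBufferFn can simply return without adding anything.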
Here's an example that builds a simple, non-distributed source that polls a URL and emits all the lines it gets in the response:
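```java
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.SourceBuilder;
import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.jet.pipeline.StreamStage;
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.InputStreamReader;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.HttpClients;

StreamSource<String> httpSource = SourceBuilder
        .stream("http-source", ctx -> HttpClients.createDefault())
        .<String>fillBufferFn((httpc, buf) -> {
            // Poll once per call; try-with-resources closes the response and reader
            try (CloseableHttpResponse response = httpc.execute(new HttpGet("http://localhost:8008"));
                 BufferedReader reader = new BufferedReader(new InputStreamReader(
                         response.getEntity().getContent()))) {
                reader.lines().forEach(buf::add);
            }
        })
        .destroyFn(Closeable::close)
        .build();
Pipeline p = Pipeline.create();
// drawFrom() returns a StreamSourceStage; this source attaches no timestamps
StreamStage<String> srcStage = p.drawFrom(httpSource).withoutTimestamps();
```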
NOTE: the source you build with this builder is not fault-tolerant. You shouldn't use it in jobs that require a processing guarantee.
Type Parameters:
S - type of the state object
Parameters:
name - a descriptive name for the source (for diagnostic purposes)
createFn - a function that creates the state object

@Nonnull public static <S> SourceBuilder.TimestampedStream<Void> timestampedStream(@Nonnull String name, @Nonnull FunctionEx<? super Processor.Context,? extends S> createFn)
Each parallel processor that drives your source has its own private instance of a state object it gets from the given createFn. To get the data items to emit to the pipeline, the processor repeatedly calls your fillBufferFn with the state object and a buffer object. The buffer's add() method takes two arguments: the item and the timestamp in milliseconds.
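If the events of your source carry time in some other form, convert it to epoch milliseconds before calling add(). The plain-Java sketch below assumes two hypothetical input formats, an ISO-8601 instant string and epoch seconds, and converts them with java.time.Instant and java.util.concurrent.TimeUnit; the class and method names are illustrative only.

```java
import java.time.Instant;
import java.util.concurrent.TimeUnit;

class TimestampSketch {
    // ISO-8601 instant such as "2019-06-01T12:00:00Z" to epoch milliseconds
    static long fromIsoInstant(String isoInstant) {
        return Instant.parse(isoInstant).toEpochMilli();
    }

    // Epoch seconds to epoch milliseconds
    static long fromEpochSeconds(long epochSeconds) {
        return TimeUnit.SECONDS.toMillis(epochSeconds);
    }

    public static void main(String[] args) {
        System.out.println(fromIsoInstant("1970-01-01T00:00:01.500Z")); // 1500
        System.out.println(fromEpochSeconds(60));                       // 60000
    }
}
```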
Your function should add some items to the buffer, ideally those it has ready without having to block. About a hundred items per call is enough to amortize Jet's per-call overhead. If no items are ready, the function may return without adding anything. In any case, it should not take more than about a second to complete; otherwise it may interfere with Jet's coordination mechanisms and degrade performance.
Unless you call builder.distributed(), Jet creates just a single processor, which should emit all the data. If you do call it, make sure your distributed source takes care of splitting the data between processors. Your createFn should consult procContext.totalParallelism() and procContext.globalProcessorIndex(). Jet calls createFn exactly once for each globalProcessorIndex from 0 to totalParallelism - 1, and each of the resulting state objects must emit its unique slice of the total source data.
Here's an example that builds a simple, non-distributed source that polls a URL and emits all the lines it gets in the response, interpreting the first 9 characters as the timestamp.
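```java
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.SourceBuilder;
import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.jet.pipeline.StreamStage;
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.InputStreamReader;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.HttpClients;
import static java.util.concurrent.TimeUnit.SECONDS;

StreamSource<String> httpSource = SourceBuilder
        .timestampedStream("http-source", ctx -> HttpClients.createDefault())
        .<String>fillBufferFn((httpc, buf) -> {
            try (CloseableHttpResponse response = httpc.execute(new HttpGet("http://localhost:8008"));
                 BufferedReader reader = new BufferedReader(new InputStreamReader(
                         response.getEntity().getContent()))) {
                reader.lines().forEach(line -> {
                    // First 9 characters: timestamp in milliseconds; the rest: the item
                    long timestamp = Long.parseLong(line.substring(0, 9));
                    buf.add(line.substring(9), timestamp);
                });
            }
        })
        .destroyFn(Closeable::close)
        .build();
Pipeline p = Pipeline.create();
StreamStage<String> srcStage = p.drawFrom(httpSource)
        .withNativeTimestamps(SECONDS.toMillis(5));
```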
NOTE 1: the source you build with this builder is not fault-tolerant. You shouldn't use it in jobs that require a processing guarantee. Use a custom processor if you need fault tolerance.
NOTE 2: if the data source you're adapting to Jet is partitioned, you may run into issues with event skew between partitions assigned to a single parallel processor. The timestamp you get from one partition may be significantly behind the timestamp you already got from another partition. If the skew exceeds the allowed lag you configured, events from the lagging partition may be dropped as late. Use a custom processor if you need to coalesce watermarks from multiple partitions.
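The effect of skew can be illustrated with a simplified model, which is an assumption and not Jet's actual watermark implementation: an event is late when its timestamp is below the highest timestamp seen so far minus the allowed lag. The plain-Java sketch below replays the same interleaved events from a leading and a lagging partition twice, once with a single shared maximum and once with per-partition maximums coalesced by taking their minimum, the usual way to combine watermarks from several inputs. All names are hypothetical.

```java
import java.util.Arrays;

class SkewSketch {
    // Simplified lateness rule (assumption): an event is late if its timestamp is
    // below (highest timestamp seen so far - allowedLag).

    // A single maximum shared by all partitions that one processor reads
    static int lateWithSharedWatermark(long[][] events, long allowedLag) {
        long maxSeen = Long.MIN_VALUE;
        int late = 0;
        for (long[] e : events) { // e = {partition, timestamp}
            long ts = e[1];
            if (maxSeen != Long.MIN_VALUE && ts < maxSeen - allowedLag) {
                late++;
            }
            maxSeen = Math.max(maxSeen, ts);
        }
        return late;
    }

    // One maximum per partition, coalesced by taking the minimum across partitions;
    // no event is late until every partition has produced at least one event
    static int lateWithCoalescedWatermark(long[][] events, long allowedLag, int partitionCount) {
        long[] maxPerPartition = new long[partitionCount];
        Arrays.fill(maxPerPartition, Long.MIN_VALUE);
        int late = 0;
        for (long[] e : events) {
            int partition = (int) e[0];
            long ts = e[1];
            long coalesced = Arrays.stream(maxPerPartition).min().getAsLong();
            if (coalesced != Long.MIN_VALUE && ts < coalesced - allowedLag) {
                late++;
            }
            maxPerPartition[partition] = Math.max(maxPerPartition[partition], ts);
        }
        return late;
    }

    public static void main(String[] args) {
        // Partition 0 runs about 9 seconds ahead of partition 1
        long[][] events = {
            {0, 10_000}, {1, 1_000}, {0, 11_000}, {1, 2_000}, {0, 12_000}, {1, 3_000}
        };
        System.out.println(lateWithSharedWatermark(events, 5_000));       // 3
        System.out.println(lateWithCoalescedWatermark(events, 5_000, 2)); // 0
    }
}
```

With an allowed lag of 5,000 ms, the shared maximum marks all three events from the lagging partition as late, while the coalesced version marks none, because its watermark is held back by the slower partition.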
Type Parameters:
S - type of the state object
Parameters:
name - a descriptive name for the source (for diagnostic purposes)
createFn - a function that creates the state object

Copyright © 2019 Hazelcast, Inc. All rights reserved.