Class FileSourceBuilder
public final class FileSourceBuilder extends Object
mapOutputFn
- Since: 3.0
Method Summary
Modifier and Type | Method | Description
BatchSource<String> | build() | Convenience for build(BiFunctionEx).
<T> BatchSource<T> | build(BiFunctionEx<String,String,? extends T> mapOutputFn) | Builds a custom file BatchSource with the supplied components and the output function mapOutputFn.
<T> BatchSource<T> | build(FunctionEx<? super Path,? extends Stream<T>> readFileFn) | Builds a custom file BatchSource with the supplied components.
StreamSource<String> | buildWatcher() | Convenience for buildWatcher(BiFunctionEx).
<T> StreamSource<T> | buildWatcher(BiFunctionEx<String,String,? extends T> mapOutputFn) | Builds a source that emits a stream of lines of text coming from files in the watched directory (but not its subdirectories).
FileSourceBuilder | charset(Charset charset) | Sets the character set used to encode the files.
FileSourceBuilder | glob(String glob) | Sets the globbing mask; see getPathMatcher().
FileSourceBuilder | sharedFileSystem(boolean sharedFileSystem) | Sets whether the files are in shared storage visible to all members.
Method Details
glob
Sets the globbing mask; see getPathMatcher(). The default value is "*", which means all files.
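The mask uses the JDK's glob syntax, the one accepted by java.nio.file.FileSystem.getPathMatcher() with the "glob:" prefix. The sketch below is a plain JDK illustration, not Jet code. It assumes that the mask is matched against the bare file name rather than the full path. The helper matchingFileNames is hypothetical.

```java
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class GlobDemo {
    // Hypothetical helper: keeps the names accepted by a glob mask,
    // assuming the mask is applied to the bare file name only.
    static List<String> matchingFileNames(String glob, List<String> fileNames) {
        PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + glob);
        List<String> result = new ArrayList<>();
        for (String name : fileNames) {
            if (matcher.matches(Paths.get(name))) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> files = Arrays.asList("a.log", "b.log", "notes.txt");
        System.out.println(matchingFileNames("*", files));     // the default mask keeps every file
        System.out.println(matchingFileNames("*.log", files)); // keeps only the .log files
    }
}
```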
sharedFileSystem
Sets whether the files are in shared storage visible to all members. The default value is false.
If sharedFileSystem is true, Jet assumes that all members see the same files. The members split the work so that each one reads a part of the files. If sharedFileSystem is false, each member reads all the files in the directory, assuming they are local.
If you start all the members on a single machine (for example, during development), set this property to true. If you have multiple machines with several members each and the directory is not on shared storage, the file reader cannot be configured correctly; in that case, run only one member per machine.
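The following is a hypothetical, single-process model of which files each member would read in the two modes. The hash-based assignment is an assumption made for illustration only. This page does not say how Jet actually divides the files among members.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SharedFsModel {
    // Hypothetical model of the files one member reads; the hash-based split
    // is an illustrative assumption, not Jet's actual assignment strategy.
    static List<String> filesForMember(List<String> files, int memberIndex, int memberCount,
                                       boolean sharedFileSystem) {
        if (!sharedFileSystem) {
            // Not shared: every member treats the files as local and reads them all.
            return new ArrayList<>(files);
        }
        List<String> mine = new ArrayList<>();
        for (String file : files) {
            // Shared: each file is assigned to exactly one member.
            if (Math.floorMod(file.hashCode(), memberCount) == memberIndex) {
                mine.add(file);
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        List<String> files = Arrays.asList("a.txt", "b.txt", "c.txt", "d.txt");
        for (int m = 0; m < 2; m++) {
            System.out.println("member " + m + " shared: " + filesForMember(files, m, 2, true));
            System.out.println("member " + m + " local:  " + filesForMember(files, m, 2, false));
        }
    }
}
```

With sharedFileSystem set to false and two members on the same machine, both members return the full list, so every line would be read twice. This is why the text recommends true for single-machine setups.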
charset
Sets the character set used to encode the files. The default value is StandardCharsets.UTF_8.
Setting this component has no effect if the user provides a custom readFileFn to the build(FunctionEx) method.
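Files must be decoded with the same charset they were written in. The JDK-only sketch below writes "café" in one charset and decodes it in another. It shows only what a mismatch does. This page does not describe how the file source itself handles undecodable bytes. CharsetDemo is a hypothetical name.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class CharsetDemo {
    // Writes text encoded with one charset to a temp file, then decodes
    // the file's bytes with another charset.
    static String roundTrip(String text, Charset writtenWith, Charset readWith) {
        try {
            Path file = Files.createTempFile("charset-demo", ".txt");
            try {
                Files.write(file, text.getBytes(writtenWith));
                // new String(byte[], Charset) substitutes malformed input
                // instead of throwing, so a mismatch shows up as wrong text.
                return new String(Files.readAllBytes(file), readWith);
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String text = "caf\u00e9";
        System.out.println(roundTrip(text, StandardCharsets.ISO_8859_1, StandardCharsets.ISO_8859_1));
        System.out.println(roundTrip(text, StandardCharsets.ISO_8859_1, StandardCharsets.UTF_8));
    }
}
```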
build
Convenience for build(BiFunctionEx). The source emits lines downstream without any transformation.
build
@Nonnull public <T> BatchSource<T> build(@Nonnull BiFunctionEx<String,String,? extends T> mapOutputFn)
Builds a custom file BatchSource with the supplied components and the output function mapOutputFn.
The source does not save any state to the snapshot. If the job is restarted, it will re-emit all entries.
Any IOException will cause the job to fail. The files must not change while being read; if they do, the behavior is unspecified.
The default local parallelism for this processor is 2 (or 1 if only 1 CPU is available).
- Type Parameters:
  T - the type of the items the source emits
- Parameters:
  mapOutputFn - the function that creates the output object from each line. It receives the file name and the line as parameters.
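A rough, single-process model of how the batch source combines its components is shown below. It lists the directory without descending into subdirectories, keeps the files that match the glob, decodes each file with the charset, and passes every line together with its file name to mapOutputFn. The class BatchFileModel and its methods are hypothetical. They use java.util.function.BiFunction instead of Jet's BiFunctionEx, and they ignore distribution, parallelism and snapshotting.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

public class BatchFileModel {
    // Hypothetical model: reads matching files in dir (not subdirectories) and
    // maps each (fileName, line) pair; file iteration order is unspecified.
    static <T> List<T> readAll(Path dir, String glob, Charset charset,
                               BiFunction<String, String, ? extends T> mapOutputFn) {
        List<T> out = new ArrayList<>();
        try (DirectoryStream<Path> files = Files.newDirectoryStream(dir, glob)) {
            for (Path file : files) {
                if (!Files.isRegularFile(file)) {
                    continue; // subdirectories are skipped
                }
                String fileName = file.getFileName().toString();
                for (String line : Files.readAllLines(file, charset)) {
                    out.add(mapOutputFn.apply(fileName, line));
                }
            }
        } catch (IOException e) {
            // Mirrors the documented behavior: an IOException fails the whole read.
            throw new UncheckedIOException(e);
        }
        return out;
    }

    // Creates a throwaway directory with a.txt (2 lines), b.log (1 line) and a subdirectory.
    static Path sampleDir() {
        try {
            Path dir = Files.createTempDirectory("file-source-demo");
            Files.write(dir.resolve("a.txt"), Arrays.asList("x", "y"), StandardCharsets.UTF_8);
            Files.write(dir.resolve("b.log"), Arrays.asList("z"), StandardCharsets.UTF_8);
            Files.createDirectory(dir.resolve("sub"));
            return dir;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path dir = sampleDir();
        // Identity mapping: the bare lines.
        System.out.println(readAll(dir, "*", StandardCharsets.UTF_8, (fileName, line) -> line));
        // Custom mapping that pairs each line with its file name.
        System.out.println(readAll(dir, "*.txt", StandardCharsets.UTF_8,
                (fileName, line) -> fileName + ": " + line));
    }
}
```

Passing (fileName, line) -> line as the mapping reproduces the untransformed lines that the no-argument build() emits.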
build
@Nonnull public <T> BatchSource<T> build(@Nonnull FunctionEx<? super Path,? extends Stream<T>> readFileFn)
Builds a custom file BatchSource with the supplied components. It uses the supplied readFileFn to read the files; the configured charset is ignored.
The source does not save any state to the snapshot. If the job is restarted, it will re-emit all entries.
Any IOException will cause the job to fail. The files must not change while being read; if they do, the behavior is unspecified.
The default local parallelism for this processor is 2 (or 1 if only 1 CPU is available).
- Type Parameters:
  T - the type of items returned from file reading
- Parameters:
  readFileFn - the function to read objects from a file. It receives filePath as a parameter and returns a Stream of items.
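With this overload, the caller handles the whole file-reading step: the function receives the file's Path and returns a Stream of items. The sketch below uses java.util.function.Function instead of Jet's FunctionEx. It reads a hypothetical file that has a header row followed by one number per line. The function picks UTF-8 itself because the configured charset is ignored for this overload. ReadFileFnDemo and its members are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ReadFileFnDemo {
    // A readFileFn-style function: receives the file path and returns a Stream of items.
    // It chooses the charset itself, since the builder's charset is not used here.
    static final Function<Path, Stream<Integer>> READ_NUMBERS = path -> {
        try {
            return Files.lines(path, StandardCharsets.UTF_8)
                    .skip(1)                                 // drop the header row
                    .filter(line -> !line.trim().isEmpty())  // ignore blank lines
                    .map(line -> Integer.valueOf(line.trim()));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    };

    // Applies the function to one file and collects the items, closing the stream.
    static List<Integer> readNumbers(Path file) {
        try (Stream<Integer> items = READ_NUMBERS.apply(file)) {
            return items.collect(Collectors.toList());
        }
    }

    // Writes a throwaway sample file with a header and three values.
    static Path sampleFile() {
        try {
            Path file = Files.createTempFile("numbers", ".csv");
            Files.write(file, "value\n1\n2\n\n3\n".getBytes(StandardCharsets.UTF_8));
            return file;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(readNumbers(sampleFile())); // [1, 2, 3]
    }
}
```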
buildWatcher
Convenience for buildWatcher(BiFunctionEx).
buildWatcher
@Nonnull public <T> StreamSource<T> buildWatcher(@Nonnull BiFunctionEx<String,String,? extends T> mapOutputFn)
Builds a source that emits a stream of lines of text coming from files in the watched directory (but not its subdirectories). It emits only new content added after startup: both new files and new content appended to existing ones.
If, during the scanning phase, the source observes a file that doesn't end with a newline, it assumes that a line is still being written. That line won't appear in its output.
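The partial-line rule can be modeled for a single file, as shown below. This is a simplified, hypothetical model and not the source's implementation. The parameter existing is the content seen during the initial scan, which is never emitted. The parameter appended is whatever is written afterward. The sketch also assumes that an unterminated trailing line is held back until its newline arrives.

```java
import java.util.ArrayList;
import java.util.List;

public class WatcherLineModel {
    // Hypothetical model of the lines a watcher emits for one file.
    // existing: content observed during the initial scan (never emitted).
    // appended: content written after startup.
    static List<String> emittedLines(String existing, String appended) {
        String data = appended;
        // If the scanned content did not end with a newline, a line was still being
        // written; skip the rest of that line instead of emitting a fragment.
        if (!existing.isEmpty() && !existing.endsWith("\n")) {
            int nl = data.indexOf('\n');
            data = nl < 0 ? "" : data.substring(nl + 1);
        }
        List<String> lines = new ArrayList<>();
        int start = 0;
        int nl;
        // Only newline-terminated lines are emitted; a trailing fragment is held back.
        while ((nl = data.indexOf('\n', start)) >= 0) {
            lines.add(data.substring(start, nl));
            start = nl + 1;
        }
        return lines;
    }

    public static void main(String[] args) {
        System.out.println(emittedLines("old line\npartial", " rest\nnew 1\nnew 2\nincompl")); // [new 1, new 2]
    }
}
```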
The source completes when the directory is deleted. However, to delete the directory, all the files in it must be deleted first, and if you delete a file that is currently being read, the job may encounter an IOException. If sharedFileSystem is false, the directory must be deleted on all nodes.
Any IOException will cause the job to fail.
The source does not save any state to the snapshot. If the job is restarted, only lines added after the restart will be emitted, which gives at-most-once behavior.
The default local parallelism for this processor is 2 (or 1 if only 1 CPU is available).
Limitation on Windows
On Windows, the WatchService is not notified of appended lines until the file is closed. If the file-writing process keeps the file open while appending, the processor may fail to observe the changes. It will be notified if any process tries to open that file, for example when the file is viewed in Explorer. This holds for Windows 10 with the NTFS file system and might change in the future. You are advised to do your own testing on your target Windows platform.
Use the latest JRE
The underlying JDK API (WatchService) has a history of unreliability, and this source may experience infinite blocking, missed events or duplicate events as a result. Such problems may be resolved by upgrading the JRE to the latest version.
Appending lines using a text editor
If you're testing this source, you might think of using a text editor to append the lines. However, this might not work as expected. Some editors write to a temp file and then rename it. Others append an extra newline character at the end, which gets overwritten if more text is added in the editor. The best way to append is to use echo text >> yourFile.
- Type Parameters:
  T - the type of the items the source emits
- Parameters:
  mapOutputFn - the function that creates the output object from each line. It receives the file name and the line as parameters.
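The echo text >> yourFile advice has a direct JDK equivalent for tests written in Java. Open the file in append mode, write one newline-terminated line, and close the file immediately. Closing it right away also matters for the Windows limitation described above. AppendLine and its methods are hypothetical helpers, not part of Jet.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;

public class AppendLine {
    // Appends one newline-terminated line, like `echo text >> yourFile`:
    // Files.write opens the file in append mode, writes, and closes it.
    static void appendLine(Path file, String text) {
        try {
            Files.write(file, (text + "\n").getBytes(StandardCharsets.UTF_8),
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Appends two lines to a fresh temp file and returns the file's contents.
    static List<String> demo() {
        try {
            Path file = Files.createTempFile("watched", ".txt");
            appendLine(file, "first line");
            appendLine(file, "second line");
            List<String> lines = Files.readAllLines(file, StandardCharsets.UTF_8);
            Files.delete(file);
            return lines;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```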