Class Edge
- All Implemented Interfaces:
DataSerializable
,IdentifiedDataSerializable
public class Edge extends Object implements IdentifiedDataSerializable
vertices
in a DAG
.
Conceptually, data travels over the edge from the source vertex to the
destination vertex. Practically, since the vertex is distributed across
the cluster and across threads in each cluster member, the edge is
implemented by a number of concurrent queues and network sender/receiver
pairs.
It is often desirable to arrange that all items belonging to the same
collation key are received by the same processing unit (instance of
Processor
). This is achieved by configuring an appropriate
Partitioner
on the edge. The partitioner will determine the
partition ID of each item and all items with the same partition ID will
be routed to the same Processor
instance. Depending on the value
of edge's distributed property, the processor will be unique
cluster-wide, or only within each member.
A newly instantiated Edge is non-distributed with a UNICAST
routing policy.
- Since:
- 3.0
-
Nested Class Summary
Nested Classes Modifier and Type Class Description static class
Edge.RoutingPolicy
An edge describes a connection from many upstream processors to many downstream processors. -
Constructor Summary
-
Method Summary
Modifier and Type Method Description Edge
allToOne(Object key)
Activates a special-casedPARTITIONED
routing policy where all items will be routed to the same partition ID, determined from the givenkey
.static Edge
between(Vertex source, Vertex destination)
Returns an edge between two vertices.Edge
broadcast()
Activates theBROADCAST
routing policy.Edge
distributed()
Declares that the edge is distributed.boolean
equals(Object obj)
static Edge
from(Vertex source)
Returns an edge with the given source vertex and no destination vertex.static Edge
from(Vertex source, int ordinal)
Returns an edge with the given source vertex at the given ordinal and no destination vertex.int
getClassId()
Returns type identifier for this class.EdgeConfig
getConfig()
Returns theEdgeConfig
instance associated with this edge.Vertex
getDestination()
Returns this edge's destination vertex.String
getDestName()
Returns the name of the destination vertex.int
getDestOrdinal()
Returns the ordinal of the edge at the destination vertex.int
getFactoryId()
Returns DataSerializableFactory factory ID for this class.Partitioner<?>
getPartitioner()
Returns the instance encapsulating the partitioning strategy in effect on this edge.int
getPriority()
Returns the value of edge's priority, as explained onpriority(int)
.Edge.RoutingPolicy
getRoutingPolicy()
Returns theEdge.RoutingPolicy
in effect on the edge.Vertex
getSource()
Returns this edge's source vertex.String
getSourceName()
Returns the name of the source vertex.int
getSourceOrdinal()
Returns the ordinal of the edge at the source vertex.int
hashCode()
boolean
isDistributed()
Says whether this edge is distributed.Edge
isolated()
Activates theISOLATED
routing policy which establishes isolated paths from upstream to downstream processors.<T> Edge
partitioned(FunctionEx<T,?> extractKeyFn)
Activates thePARTITIONED
routing policy and applies thedefault
Hazelcast partitioning strategy.<T, K> Edge
partitioned(FunctionEx<T,K> extractKeyFn, Partitioner<? super K> partitioner)
Activates thePARTITIONED
routing policy and applies the provided partitioning strategy.Edge
priority(int priority)
Sets the priority of the edge.void
readData(ObjectDataInput in)
Reads fields from the input streamEdge
setConfig(EdgeConfig config)
Assigns anEdgeConfig
to this edge.Edge
to(Vertex destination)
Sets the destination vertex of this edge, with ordinal 0.Edge
to(Vertex destination, int ordinal)
Sets the destination vertex and ordinal of this edge.String
toString()
Edge
unicast()
Chooses theUNICAST
routing policy for this edge.void
writeData(ObjectDataOutput out)
Writes object fields to output stream
-
Constructor Details
-
Method Details
-
between
Returns an edge between two vertices. The ordinal of the edge is 0 at both ends. Equivalent tofrom(source).to(destination)
.- Parameters:
source
- the source vertexdestination
- the destination vertex
-
from
Returns an edge with the given source vertex and no destination vertex. The ordinal of the edge is 0. Typically followed by one of theto()
method calls. -
from
Returns an edge with the given source vertex at the given ordinal and no destination vertex. Typically follewed by a call to one of theto()
methods. -
to
Sets the destination vertex of this edge, with ordinal 0. -
to
Sets the destination vertex and ordinal of this edge. -
getSource
Returns this edge's source vertex. -
getDestination
Returns this edge's destination vertex. -
getSourceName
Returns the name of the source vertex. -
getSourceOrdinal
public int getSourceOrdinal()Returns the ordinal of the edge at the source vertex. -
getDestName
Returns the name of the destination vertex. -
getDestOrdinal
public int getDestOrdinal()Returns the ordinal of the edge at the destination vertex. -
priority
Sets the priority of the edge. A lower number means higher priority and the default is 0.Example: there two incoming edges on a vertex, with priorities 1 and 2. The data from the edge with priority 1 will be processed in full before accepting any data from the edge with priority 2.
Possible deadlock
If you split the output of one source vertex and later join the streams with different priorities, you're very likely to run into a deadlock. Consider this DAG:S --+---- V1 ----+--- J \ / +-- V2 --+
The vertexJ
joins the streams, that were originally split from sourceS
. Let's say the input fromV1
has higher priority than the input fromV2
. In this case, no item fromV2
will be processed byJ
beforeV1
completes, which presupposes thatS
also completes. ButS
cannot complete, because it can't emit all items toV2
becauseV2
is blocked byJ
, which is not processing its items. This is a deadlock.This DAG can work only if
S
emits as few items into both paths as can fit into the queues (see queue size configuration.Note
Having different priority edges will cause postponing of the first snapshot until after upstream vertices of higher priority edges are completed. Reason: after receiving abarrier
we stop processing items on that edge until the barrier is received from all other edges. However, we also don't process lower priority edges until higher priority edges are done, which prevents receiving the barrier on them, which in the end stalls the job indefinitely. Technically this applies only toEXACTLY_ONCE
snapshot mode, but the snapshot is also postponed forAT_LEAST_ONCE
jobs, because the snapshot won't complete until after all higher priority edges are completed and will increase the number of duplicately processed items. -
getPriority
public int getPriority()Returns the value of edge's priority, as explained onpriority(int)
. -
unicast
Chooses theUNICAST
routing policy for this edge. -
partitioned
Activates thePARTITIONED
routing policy and applies thedefault
Hazelcast partitioning strategy. The strategy is applied to the result of theextractKeyFn
function. -
partitioned
@Nonnull public <T, K> Edge partitioned(@Nonnull FunctionEx<T,K> extractKeyFn, @Nonnull Partitioner<? super K> partitioner)Activates thePARTITIONED
routing policy and applies the provided partitioning strategy. The strategy is applied to the result of theextractKeyFn
function. -
allToOne
Activates a special-casedPARTITIONED
routing policy where all items will be routed to the same partition ID, determined from the givenkey
. It means that all items will be directed to the same processor and other processors will be idle.It is equivalent to using
partitioned(t -> key)
, but it a has small optimization that the partition ID is not recalculated for each stream item. -
broadcast
Activates theBROADCAST
routing policy. -
isolated
Activates theISOLATED
routing policy which establishes isolated paths from upstream to downstream processors. Each downstream processor is assigned exactly one upstream processor and each upstream processor is assigned a disjoint subset of downstream processors. This allows the selective application of backpressure to just one source processor that feeds a given downstream processor.These restrictions imply that the downstream's local parallelism cannot be less than upstream's. Since all traffic will be local, this policy is not allowed on a distributed edge.
-
getPartitioner
Returns the instance encapsulating the partitioning strategy in effect on this edge. -
getRoutingPolicy
Returns theEdge.RoutingPolicy
in effect on the edge. -
distributed
Declares that the edge is distributed. A non-distributed edge only transfers data within the same member. If the data source running on local member is distributed (produces only a slice of all the data on any given member), the local processors will not observe all the data. The same holds true when the data originates from an upstream distributed edge.A distributed edge allows all the data to be observed by all the processors (using the
BROADCAST
routing policy) and, more attractively, all the data with a given partition ID to be observed by the same unique processor, regardless of whether it is running on the local or a remote member (using thePARTITIONED
routing policy). -
isDistributed
public boolean isDistributed()Says whether this edge is distributed. The effects of this property are discussed indistributed()
. -
getConfig
Returns theEdgeConfig
instance associated with this edge. Default value isnull
. -
setConfig
Assigns anEdgeConfig
to this edge. Ifnull
is supplied, the edge will useJetConfig.getDefaultEdgeConfig()
. -
toString
-
equals
-
hashCode
public int hashCode() -
writeData
Description copied from interface:DataSerializable
Writes object fields to output stream- Specified by:
writeData
in interfaceDataSerializable
- Parameters:
out
- output- Throws:
IOException
- if an I/O error occurs. In particular, anIOException
may be thrown if the output stream has been closed.
-
readData
Description copied from interface:DataSerializable
Reads fields from the input stream- Specified by:
readData
in interfaceDataSerializable
- Parameters:
in
- input- Throws:
IOException
- if an I/O error occurs. In particular, anIOException
may be thrown if the input stream has been closed.
-
getFactoryId
public int getFactoryId()Description copied from interface:IdentifiedDataSerializable
Returns DataSerializableFactory factory ID for this class.- Specified by:
getFactoryId
in interfaceIdentifiedDataSerializable
- Returns:
- factory ID
-
getClassId
public int getClassId()Description copied from interface:IdentifiedDataSerializable
Returns type identifier for this class. It should be unique per DataSerializableFactory.- Specified by:
getClassId
in interfaceIdentifiedDataSerializable
- Returns:
- type ID
-