Documentation
¶
Index ¶
- Constants
- Variables
- func BuildContextValue(varMap map[string]any) (*structpb.Value, error)
- func BuildLoopExecutionEdgeMap(edgeMap mflow.EdgesMap, loopNodeID idwrap.IDWrap, loopTargets []idwrap.IDWrap) mflow.EdgesMap
- func BuildPendingMap(predecessors map[idwrap.IDWrap][]idwrap.IDWrap) map[idwrap.IDWrap]uint32
- func CloneIterationLabels(labels []runner.IterationLabel) []runner.IterationLabel
- func ClonePendingMap(src map[idwrap.IDWrap]uint32) map[idwrap.IDWrap]uint32
- func DeepCopyValue(v any) any
- func DeepCopyVarMap(req *FlowNodeRequest) map[string]any
- func FilterLoopEntryNodes(edgeMap mflow.EdgesMap, loopTargets []idwrap.IDWrap) []idwrap.IDWrap
- func ParseResultValue(result *structpb.Value) (map[string]interface{}, error)
- func ReadNodeVar(a *FlowNodeRequest, name, key string) (interface{}, error)
- func ReadNodeVarWithTracking(a *FlowNodeRequest, name, key string, tracker *tracking.VariableTracker) (interface{}, error)
- func ReadVarRaw(a *FlowNodeRequest, key string) (interface{}, error)
- func ReadVarRawWithTracking(a *FlowNodeRequest, key string, tracker *tracking.VariableTracker) (interface{}, error)
- func WriteNodeVar(a *FlowNodeRequest, name string, key string, v interface{}) error
- func WriteNodeVarBulk(a *FlowNodeRequest, name string, v map[string]interface{}) error
- func WriteNodeVarBulkWithTracking(a *FlowNodeRequest, name string, v map[string]interface{}, ...) error
- func WriteNodeVarRaw(a *FlowNodeRequest, name string, v interface{}) error
- func WriteNodeVarRawWithTracking(a *FlowNodeRequest, name string, v interface{}, ...) error
- func WriteNodeVarWithTracking(a *FlowNodeRequest, name string, key string, v interface{}, ...) error
- func WriteVar(a *FlowNodeRequest, key string, v interface{})
- type FlowNode
- type FlowNodeRequest
- type FlowNodeResult
- type LogPushFunc
- type LoopCoordinator
Constants ¶
const NodeVarPrefix = "node"
INFO: this is a workaround for expr lang.
Variables ¶
var ( ErrVarGroupNotFound error = errors.New("group not found") ErrVarNodeNotFound error = errors.New("node not found") ErrVarKeyNotFound error = errors.New("key not found") )
var ErrNodeNotFound = errors.New("node not found")
Functions ¶
func BuildContextValue ¶
BuildContextValue converts a VarMap to a structpb.Value for the JS execution context.
func BuildLoopExecutionEdgeMap ¶
func BuildLoopExecutionEdgeMap(edgeMap mflow.EdgesMap, loopNodeID idwrap.IDWrap, loopTargets []idwrap.IDWrap) mflow.EdgesMap
BuildLoopExecutionEdgeMap returns an edge map suitable for executing loop bodies. It rewrites the loop handle to include only the provided entry targets so duplicate edges to downstream nodes do not participate in scheduling decisions.
When the requested targets already match the existing loop edges, the original map is returned to avoid unnecessary allocations.
func BuildPendingMap ¶
BuildPendingMap constructs a PendingAtmoicMap compatible with the runner by counting predecessors for each node. Only entries with more than one predecessor are retained to match runner expectations.
func CloneIterationLabels ¶
func CloneIterationLabels(labels []runner.IterationLabel) []runner.IterationLabel
CloneIterationLabels returns a defensive copy of iteration labels to avoid slice aliasing.
func ClonePendingMap ¶
ClonePendingMap makes a shallow copy of a PendingAtmoicMap. It returns nil when the source is empty to keep downstream checks simple.
func DeepCopyVarMap ¶
func DeepCopyVarMap(req *FlowNodeRequest) map[string]any
DeepCopyVarMap creates a deep copy of the VarMap to prevent concurrent access issues.
func FilterLoopEntryNodes ¶
FilterLoopEntryNodes removes loop targets that are reachable from other loop targets, ensuring we only return the true entry nodes for a loop body. This prevents downstream nodes from being re-executed when the loop handle fan-out includes both the body head and interior nodes (this can happen after noop pruning).
If filtering removes every target (e.g. due to a cycle), we fall back to the original slice so execution can still proceed.
func ParseResultValue ¶
ParseResultValue converts a structpb.Value result from JS execution to a map.
func ReadNodeVar ¶
func ReadNodeVar(a *FlowNodeRequest, name, key string) (interface{}, error)
func ReadNodeVarWithTracking ¶
func ReadNodeVarWithTracking(a *FlowNodeRequest, name, key string, tracker *tracking.VariableTracker) (interface{}, error)
ReadNodeVarWithTracking reads a node variable with optional tracking.
func ReadVarRaw ¶
func ReadVarRaw(a *FlowNodeRequest, key string) (interface{}, error)
func ReadVarRawWithTracking ¶
func ReadVarRawWithTracking(a *FlowNodeRequest, key string, tracker *tracking.VariableTracker) (interface{}, error)
ReadVarRawWithTracking reads a raw variable with optional tracking.
func WriteNodeVar ¶
func WriteNodeVar(a *FlowNodeRequest, name string, key string, v interface{}) error
func WriteNodeVarBulk ¶
func WriteNodeVarBulk(a *FlowNodeRequest, name string, v map[string]interface{}) error
func WriteNodeVarBulkWithTracking ¶
func WriteNodeVarBulkWithTracking(a *FlowNodeRequest, name string, v map[string]interface{}, tracker *tracking.VariableTracker) error
WriteNodeVarBulkWithTracking writes bulk node variables with optional tracking.
func WriteNodeVarRaw ¶
func WriteNodeVarRaw(a *FlowNodeRequest, name string, v interface{}) error
func WriteNodeVarRawWithTracking ¶
func WriteNodeVarRawWithTracking(a *FlowNodeRequest, name string, v interface{}, tracker *tracking.VariableTracker) error
WriteNodeVarRawWithTracking writes a raw node variable with optional tracking.
func WriteNodeVarWithTracking ¶
func WriteNodeVarWithTracking(a *FlowNodeRequest, name string, key string, v interface{}, tracker *tracking.VariableTracker) error
WriteNodeVarWithTracking writes a node variable with optional tracking.
func WriteVar ¶
func WriteVar(a *FlowNodeRequest, key string, v interface{})
WriteVar writes a top-level variable to the VarMap.
Types ¶
type FlowNode ¶
type FlowNode interface {
GetID() idwrap.IDWrap
GetName() string
// TODO: will implement streaming in the future
RunSync(ctx context.Context, req *FlowNodeRequest) FlowNodeResult
RunAsync(ctx context.Context, req *FlowNodeRequest, resultChan chan FlowNodeResult)
}
type FlowNodeRequest ¶
type FlowNodeRequest struct {
VarMap map[string]any
ReadWriteLock *sync.RWMutex
NodeMap map[idwrap.IDWrap]FlowNode
EdgeSourceMap mflow.EdgesMap
Timeout time.Duration
LogPushFunc LogPushFunc
PendingAtmoicMap map[idwrap.IDWrap]uint32
VariableTracker *tracking.VariableTracker // Optional tracking for input/output data
IterationContext *runner.IterationContext // For hierarchical execution naming in loops
ExecutionID idwrap.IDWrap // Unique ID for this specific execution of the node
Logger *slog.Logger // Optional structured logger for node diagnostics
}
type FlowNodeResult ¶
type LogPushFunc ¶
type LogPushFunc func(status runner.FlowNodeStatus)
type LoopCoordinator ¶
type LoopCoordinator interface {
IsLoopCoordinator() bool
}
LoopCoordinator marks nodes that orchestrate loop execution.