package node
v0.0.0-...-5777b07

Note: this package is not in the latest version of its module.
Published: Jan 12, 2026 | License: Apache-2.0 | Imports: 11 | Imported by: 0

Documentation

Constants

const NodeVarPrefix = "node"

INFO: NodeVarPrefix is a workaround for the expr language.

Variables

var (
	ErrVarGroupNotFound error = errors.New("group not found")
	ErrVarNodeNotFound  error = errors.New("node not found")
	ErrVarKeyNotFound   error = errors.New("key not found")
)
var ErrNodeNotFound = errors.New("node not found")

Functions

func BuildContextValue

func BuildContextValue(varMap map[string]any) (*structpb.Value, error)

BuildContextValue converts a VarMap to a structpb.Value for the JS execution context.

func BuildLoopExecutionEdgeMap

func BuildLoopExecutionEdgeMap(edgeMap mflow.EdgesMap, loopNodeID idwrap.IDWrap, loopTargets []idwrap.IDWrap) mflow.EdgesMap

BuildLoopExecutionEdgeMap returns an edge map suitable for executing loop bodies. It rewrites the loop handle to include only the provided entry targets so duplicate edges to downstream nodes do not participate in scheduling decisions.

When the requested targets already match the existing loop edges, the original map is returned to avoid unnecessary allocations.
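
The copy-on-write behavior described above can be sketched as follows. This is an illustration only: the layout of mflow.EdgesMap is not shown in this documentation, so edgesMap (source node, then handle name, then targets), the "loop" handle name, string IDs, and the order-sensitive comparison are all assumptions.

```go
package main

import "fmt"

// edgesMap is a hypothetical layout: source node -> handle name -> targets.
type edgesMap map[string]map[string][]string

// loopHandle is an assumed handle name for edges into the loop body.
const loopHandle = "loop"

// sameIDs reports whether two ID slices are equal element by element.
func sameIDs(a, b []string) bool {
	if len(a) != len(b) {
		return false
	}
	for i := range a {
		if a[i] != b[i] {
			return false
		}
	}
	return true
}

// buildLoopEdgeMap rewrites only the loop handle of loopNode.
func buildLoopEdgeMap(edges edgesMap, loopNode string, targets []string) edgesMap {
	if sameIDs(edges[loopNode][loopHandle], targets) {
		return edges // nothing to rewrite, so skip the allocation
	}
	out := make(edgesMap, len(edges))
	for src, handles := range edges {
		out[src] = handles // untouched nodes share their handle maps
	}
	rewritten := make(map[string][]string, len(edges[loopNode])+1)
	for handle, ids := range edges[loopNode] {
		rewritten[handle] = ids
	}
	rewritten[loopHandle] = append([]string(nil), targets...)
	out[loopNode] = rewritten
	return out
}

func main() {
	edges := edgesMap{"for": {loopHandle: {"head", "tail"}, "then": {"done"}}}
	scoped := buildLoopEdgeMap(edges, "for", []string{"head"})
	fmt.Println(scoped["for"][loopHandle], edges["for"][loopHandle]) // [head] [head tail]
}
```

The input map is never mutated in this sketch, so the caller can keep using it for scheduling outside the loop body.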

func BuildPendingMap

func BuildPendingMap(predecessors map[idwrap.IDWrap][]idwrap.IDWrap) map[idwrap.IDWrap]uint32

BuildPendingMap constructs a PendingAtmoicMap compatible with the runner by counting predecessors for each node. Only entries with more than one predecessor are retained to match runner expectations.
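
A minimal sketch of the counting rule described above, using string IDs in place of idwrap.IDWrap. The helper name buildPending is hypothetical, and counting every listed predecessor (including duplicates, if any) is an assumption.

```go
package main

import "fmt"

// buildPending is a hypothetical stand-in for BuildPendingMap.
func buildPending(predecessors map[string][]string) map[string]uint32 {
	pending := make(map[string]uint32)
	for node, preds := range predecessors {
		// Keep only nodes with more than one predecessor; presumably these
		// are the join points the runner has to wait on.
		if len(preds) > 1 {
			pending[node] = uint32(len(preds))
		}
	}
	return pending
}

func main() {
	preds := map[string][]string{
		"start": nil,
		"a":     {"start"},
		"b":     {"start"},
		"join":  {"a", "b"},
	}
	fmt.Println(buildPending(preds)) // map[join:2]
}
```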

func CloneIterationLabels

func CloneIterationLabels(labels []runner.IterationLabel) []runner.IterationLabel

CloneIterationLabels returns a defensive copy of iteration labels to avoid slice aliasing.
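
The aliasing problem a defensive copy avoids can be illustrated as below. The label type is a hypothetical stand-in for runner.IterationLabel, whose fields are not shown here, and returning nil for empty input is an assumption of this sketch.

```go
package main

import "fmt"

// label is a hypothetical stand-in for runner.IterationLabel.
type label struct {
	Name      string
	Iteration int
}

// cloneLabels returns a copy backed by a new array.
func cloneLabels(labels []label) []label {
	if len(labels) == 0 {
		return nil // assumption: empty input yields nil
	}
	out := make([]label, len(labels))
	copy(out, labels)
	return out
}

func main() {
	orig := []label{{Name: "loop", Iteration: 1}}
	alias := orig // shares the backing array with orig
	clone := cloneLabels(orig)

	orig[0].Iteration = 2
	fmt.Println(alias[0].Iteration, clone[0].Iteration) // 2 1
}
```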

func ClonePendingMap

func ClonePendingMap(src map[idwrap.IDWrap]uint32) map[idwrap.IDWrap]uint32

ClonePendingMap makes a shallow copy of a PendingAtmoicMap. It returns nil when the source is empty to keep downstream checks simple.
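
A sketch of the shallow copy with the nil-on-empty rule, using string keys in place of idwrap.IDWrap; clonePending is a hypothetical name. Because the values are plain uint32 counters, a shallow copy is already fully independent of the source.

```go
package main

import "fmt"

// clonePending is a hypothetical stand-in for ClonePendingMap.
func clonePending(src map[string]uint32) map[string]uint32 {
	if len(src) == 0 {
		return nil // callers can test the result against nil or use len
	}
	dst := make(map[string]uint32, len(src))
	for k, v := range src {
		dst[k] = v
	}
	return dst
}

func main() {
	fmt.Println(clonePending(map[string]uint32{}) == nil) // true

	src := map[string]uint32{"join": 2}
	cp := clonePending(src)
	cp["join"]-- // decrementing the copy leaves the source untouched
	fmt.Println(src["join"], cp["join"]) // 2 1
}
```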

func DeepCopyValue

func DeepCopyValue(v any) any

DeepCopyValue creates a deep copy of any value.
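
One common way to deep-copy JSON-like data is to recurse through map[string]any and []any. The sketch below takes that approach; which types the real DeepCopyValue handles is not documented here, so the type coverage is an assumption and deepCopy is a hypothetical name.

```go
package main

import "fmt"

// deepCopy recursively copies maps and slices of the JSON-like shapes
// map[string]any and []any. Anything else is returned unchanged.
func deepCopy(v any) any {
	switch t := v.(type) {
	case map[string]any:
		out := make(map[string]any, len(t))
		for k, val := range t {
			out[k] = deepCopy(val)
		}
		return out
	case []any:
		out := make([]any, len(t))
		for i, val := range t {
			out[i] = deepCopy(val)
		}
		return out
	default:
		// Strings, numbers, bools, and nil are safe to share; other
		// reference types are not copied in this sketch.
		return v
	}
}

func main() {
	orig := map[string]any{
		"headers": map[string]any{"accept": "json"},
		"tags":    []any{"a"},
	}
	cp := deepCopy(orig).(map[string]any)
	cp["headers"].(map[string]any)["accept"] = "xml"
	cp["tags"].([]any)[0] = "b"

	fmt.Println(orig["headers"].(map[string]any)["accept"], orig["tags"].([]any)[0]) // json a
}
```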

func DeepCopyVarMap

func DeepCopyVarMap(req *FlowNodeRequest) map[string]any

DeepCopyVarMap creates a deep copy of the request's VarMap to prevent concurrent access issues.

func FilterLoopEntryNodes

func FilterLoopEntryNodes(edgeMap mflow.EdgesMap, loopTargets []idwrap.IDWrap) []idwrap.IDWrap

FilterLoopEntryNodes removes loop targets that are reachable from other loop targets, ensuring that only the true entry nodes of a loop body are returned. This prevents downstream nodes from being re-executed when the loop handle fan-out includes both the body head and interior nodes (which can happen after no-op pruning).

If filtering removes every target (e.g. due to a cycle), we fall back to the original slice so execution can still proceed.
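
The filtering rule and its fallback can be sketched with a plain adjacency map. This is an illustration under assumptions: string IDs replace idwrap.IDWrap, edges are flattened to source -> targets instead of mflow.EdgesMap, and the helper names are hypothetical.

```go
package main

import "fmt"

// reachable reports whether dst can be reached from src by following
// one or more edges.
func reachable(edges map[string][]string, src, dst string) bool {
	seen := map[string]bool{}
	stack := append([]string(nil), edges[src]...)
	for len(stack) > 0 {
		n := stack[len(stack)-1]
		stack = stack[:len(stack)-1]
		if n == dst {
			return true
		}
		if seen[n] {
			continue
		}
		seen[n] = true
		stack = append(stack, edges[n]...)
	}
	return false
}

// filterEntries keeps only targets that no other target can reach.
func filterEntries(edges map[string][]string, targets []string) []string {
	var entries []string
	for _, t := range targets {
		interior := false
		for _, other := range targets {
			if other != t && reachable(edges, other, t) {
				interior = true
				break
			}
		}
		if !interior {
			entries = append(entries, t)
		}
	}
	if len(entries) == 0 {
		return targets // every target was filtered out, e.g. by a cycle
	}
	return entries
}

func main() {
	edges := map[string][]string{"head": {"mid"}, "mid": {"tail"}}
	fmt.Println(filterEntries(edges, []string{"head", "tail"})) // [head]

	cycle := map[string][]string{"x": {"y"}, "y": {"x"}}
	fmt.Println(filterEntries(cycle, []string{"x", "y"})) // [x y]
}
```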

func ParseResultValue

func ParseResultValue(result *structpb.Value) (map[string]interface{}, error)

ParseResultValue converts a structpb.Value result from JS execution to a map.

func ReadNodeVar

func ReadNodeVar(a *FlowNodeRequest, name, key string) (interface{}, error)

func ReadNodeVarWithTracking

func ReadNodeVarWithTracking(a *FlowNodeRequest, name, key string, tracker *tracking.VariableTracker) (interface{}, error)

ReadNodeVarWithTracking reads a node variable with optional tracking.

func ReadVarRaw

func ReadVarRaw(a *FlowNodeRequest, key string) (interface{}, error)

func ReadVarRawWithTracking

func ReadVarRawWithTracking(a *FlowNodeRequest, key string, tracker *tracking.VariableTracker) (interface{}, error)

ReadVarRawWithTracking reads a raw variable with optional tracking.

func WriteNodeVar

func WriteNodeVar(a *FlowNodeRequest, name string, key string, v interface{}) error

func WriteNodeVarBulk

func WriteNodeVarBulk(a *FlowNodeRequest, name string, v map[string]interface{}) error

func WriteNodeVarBulkWithTracking

func WriteNodeVarBulkWithTracking(a *FlowNodeRequest, name string, v map[string]interface{}, tracker *tracking.VariableTracker) error

WriteNodeVarBulkWithTracking writes bulk node variables with optional tracking.

func WriteNodeVarRaw

func WriteNodeVarRaw(a *FlowNodeRequest, name string, v interface{}) error

func WriteNodeVarRawWithTracking

func WriteNodeVarRawWithTracking(a *FlowNodeRequest, name string, v interface{}, tracker *tracking.VariableTracker) error

WriteNodeVarRawWithTracking writes a raw node variable with optional tracking.

func WriteNodeVarWithTracking

func WriteNodeVarWithTracking(a *FlowNodeRequest, name string, key string, v interface{}, tracker *tracking.VariableTracker) error

WriteNodeVarWithTracking writes a node variable with optional tracking.

func WriteVar

func WriteVar(a *FlowNodeRequest, key string, v interface{})

WriteVar writes a top-level variable to the VarMap.

Types

type FlowNode

type FlowNode interface {
	GetID() idwrap.IDWrap
	GetName() string

	// TODO: will implement streaming in the future
	RunSync(ctx context.Context, req *FlowNodeRequest) FlowNodeResult
	RunAsync(ctx context.Context, req *FlowNodeRequest, resultChan chan FlowNodeResult)
}

type FlowNodeRequest

type FlowNodeRequest struct {
	VarMap           map[string]any
	ReadWriteLock    *sync.RWMutex
	NodeMap          map[idwrap.IDWrap]FlowNode
	EdgeSourceMap    mflow.EdgesMap
	Timeout          time.Duration
	LogPushFunc      LogPushFunc
	PendingAtmoicMap map[idwrap.IDWrap]uint32
	VariableTracker  *tracking.VariableTracker // Optional tracking for input/output data
	IterationContext *runner.IterationContext  // For hierarchical execution naming in loops
	ExecutionID      idwrap.IDWrap             // Unique ID for this specific execution of the node
	Logger           *slog.Logger              // Optional structured logger for node diagnostics
}

type FlowNodeResult

type FlowNodeResult struct {
	NextNodeID []idwrap.IDWrap
	Err        error
	// SkipFinalStatus tells the runner not to create a final execution status.
	// Used by FOR/FOREACH nodes that handle their own iteration status logging.
	SkipFinalStatus bool
	AuxiliaryID     *idwrap.IDWrap
}

type LogPushFunc

type LogPushFunc func(status runner.FlowNodeStatus)

type LoopCoordinator

type LoopCoordinator interface {
	IsLoopCoordinator() bool
}

LoopCoordinator marks nodes that orchestrate loop execution.
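
Marker interfaces like this are typically detected with a type assertion. A minimal sketch follows; loopCoordinator mirrors the interface above, while forNode, requestNode, and coordinatesLoop are hypothetical names used only for illustration.

```go
package main

import "fmt"

// loopCoordinator mirrors the LoopCoordinator interface shown above.
type loopCoordinator interface {
	IsLoopCoordinator() bool
}

// forNode is a hypothetical node type that orchestrates a loop.
type forNode struct{}

func (forNode) IsLoopCoordinator() bool { return true }

// requestNode is a hypothetical node type without the marker method.
type requestNode struct{}

// coordinatesLoop reports whether n opts in to loop coordination.
func coordinatesLoop(n any) bool {
	lc, ok := n.(loopCoordinator)
	return ok && lc.IsLoopCoordinator()
}

func main() {
	fmt.Println(coordinatesLoop(forNode{}), coordinatesLoop(requestNode{})) // true false
}
```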
