Documentation
¶
Index ¶
- type Closable
- type Collector
- type DatetimeRange
- type DefaultSourceTuple
- func NewDefaultRawTuple(raw []byte, meta map[string]interface{}, ts time.Time) *DefaultSourceTuple
- func NewDefaultSourceTuple(message map[string]interface{}, meta map[string]interface{}) *DefaultSourceTuple
- func NewDefaultSourceTupleWithTime(message map[string]interface{}, meta map[string]interface{}, ...) *DefaultSourceTuple
- type Emitter
- type Function
- type FunctionContext
- type GraphNode
- type Logger
- type LookupSource
- type MessageClient
- type Operator
- type PrintableTopo
- type Qos
- type RawTuple
- type ResendSink
- type RestartStrategy
- type Rewindable
- type Rule
- type RuleGraph
- type RuleOption
- type Sink
- type Source
- type SourceConnector
- type SourceMeta
- type SourceTuple
- type Store
- type StreamContext
- type Subscriber
- type TopNode
- type TopicChannel
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Closable ¶
type Closable interface {
Close(ctx StreamContext) error
}
type DatetimeRange ¶
type DefaultSourceTuple ¶
type DefaultSourceTuple struct {
Mess map[string]interface{} `json:"message"`
M map[string]interface{} `json:"meta"`
Time time.Time `json:"timestamp"`
// contains filtered or unexported fields
}
func NewDefaultRawTuple ¶
func NewDefaultRawTuple(raw []byte, meta map[string]interface{}, ts time.Time) *DefaultSourceTuple
NewDefaultRawTuple creates a new DefaultSourceTuple with raw data. Use this when extend source connector
func NewDefaultSourceTuple ¶
func NewDefaultSourceTuple(message map[string]interface{}, meta map[string]interface{}) *DefaultSourceTuple
NewDefaultSourceTuple creates a new DefaultSourceTuple with message and metadata. Use this when extend all in one source.
func NewDefaultSourceTupleWithTime ¶
func NewDefaultSourceTupleWithTime(message map[string]interface{}, meta map[string]interface{}, timestamp time.Time) *DefaultSourceTuple
func (*DefaultSourceTuple) Message ¶
func (t *DefaultSourceTuple) Message() map[string]interface{}
func (*DefaultSourceTuple) Meta ¶
func (t *DefaultSourceTuple) Meta() map[string]interface{}
func (*DefaultSourceTuple) Raw ¶
func (t *DefaultSourceTuple) Raw() []byte
func (*DefaultSourceTuple) Timestamp ¶
func (t *DefaultSourceTuple) Timestamp() time.Time
type Function ¶
type Function interface {
// Validate The argument is a list of xsql.Expr
Validate(args []interface{}) error
// Exec Execute the function, return the result and if execution is successful.
// If execution fails, return the error and false.
Exec(args []interface{}, ctx FunctionContext) (interface{}, bool)
// IsAggregate If this function is an aggregate function. Each parameter of an aggregate function will be a slice
IsAggregate() bool
}
type FunctionContext ¶
type FunctionContext interface {
StreamContext
GetFuncId() int
}
type Logger ¶
type Logger interface {
Debug(args ...interface{})
Info(args ...interface{})
Warn(args ...interface{})
Error(args ...interface{})
Debugln(args ...interface{})
Infoln(args ...interface{})
Warnln(args ...interface{})
Errorln(args ...interface{})
Debugf(format string, args ...interface{})
Infof(format string, args ...interface{})
Warnf(format string, args ...interface{})
Errorf(format string, args ...interface{})
}
type LookupSource ¶
type LookupSource interface {
// Open creates the connection to the external data source
Open(ctx StreamContext) error
// Configure Called during initialization. Configure the source with the data source(e.g. topic for mqtt) and the properties
// read from the yaml
Configure(datasource string, props map[string]interface{}) error
// Lookup receive lookup values to construct the query and return query results
Lookup(ctx StreamContext, fields []string, keys []string, values []interface{}) ([]SourceTuple, error)
Closable
}
type MessageClient ¶
type MessageClient interface {
Subscribe(c StreamContext, subChan []TopicChannel, messageErrors chan error, params map[string]interface{}) error
Publish(c StreamContext, topic string, message []byte, params map[string]interface{}) error
Ping() error
}
type PrintableTopo ¶
type ResendSink ¶
type ResendSink interface {
Sink
// CollectResend Called when the sink cache resend is triggered
CollectResend(ctx StreamContext, data interface{}) error
}
type RestartStrategy ¶
type Rewindable ¶
type Rule ¶
type Rule struct {
Triggered bool `json:"triggered"`
Id string `json:"id,omitempty"`
Name string `json:"name,omitempty"` // The display name of a rule
Sql string `json:"sql,omitempty"`
Graph *RuleGraph `json:"graph,omitempty"`
Actions []map[string]interface{} `json:"actions,omitempty"`
Options *RuleOption `json:"options,omitempty"`
}
Rule the definition of the business logic Sql and Graph are mutually exclusive, at least one of them should be set
func GetDefaultRule ¶
func (*Rule) IsLongRunningScheduleRule ¶
func (*Rule) IsScheduleRule ¶
type RuleGraph ¶
type RuleGraph struct {
Nodes map[string]*GraphNode `json:"nodes"`
Topo *PrintableTopo `json:"topo"`
}
type RuleOption ¶
type RuleOption struct {
Debug bool `json:"debug" yaml:"debug"`
LogFilename string `json:"logFilename" yaml:"logFilename"`
IsEventTime bool `json:"isEventTime" yaml:"isEventTime"`
LateTol int64 `json:"lateTolerance" yaml:"lateTolerance"`
Concurrency int `json:"concurrency" yaml:"concurrency"`
BufferLength int `json:"bufferLength" yaml:"bufferLength"`
SendMetaToSink bool `json:"sendMetaToSink" yaml:"sendMetaToSink"`
SendError bool `json:"sendError" yaml:"sendError"`
Qos Qos `json:"qos" yaml:"qos"`
CheckpointInterval int `json:"checkpointInterval" yaml:"checkpointInterval"`
RestartStrategy *RestartStrategy `json:"restartStrategy" yaml:"restartStrategy"`
Cron string `json:"cron" yaml:"cron"`
Duration string `json:"duration" yaml:"duration"`
CronDatetimeRange []DatetimeRange `json:"cronDatetimeRange" yaml:"cronDatetimeRange"`
}
type Sink ¶
type Sink interface {
// Open Should be sync function for normal case. The container will run it in go func
Open(ctx StreamContext) error
// Configure Called during initialization. Configure the sink with the properties from rule action definition
Configure(props map[string]interface{}) error
// Collect Called when each row of data has transferred to this sink
Collect(ctx StreamContext, data interface{}) error
Closable
}
type Source ¶
type Source interface {
// Open Should be sync function for normal case. The container will run it in go func
Open(ctx StreamContext, consumer chan<- SourceTuple, errCh chan<- error)
// Configure Called during initialization. Configure the source with the data source(e.g. topic for mqtt) and the properties
// read from the yaml
Configure(datasource string, props map[string]interface{}) error
Closable
}
type SourceConnector ¶
type SourceConnector interface {
Source
Connect(ctx StreamContext) error
Subscriber
}
type SourceMeta ¶
type SourceMeta struct {
SourceName string `json:"sourceName"` // the name of the stream or table
SourceType string `json:"sourceType"` // stream or table
}
SourceMeta is the meta data of a source node. It describes what existed stream/table to refer to. It is part of the Props in the GraphNode and it is optional
type SourceTuple ¶
type StreamContext ¶
type StreamContext interface {
context.Context
GetLogger() Logger
GetRuleId() string
GetOpId() string
GetInstanceId() int
GetRootPath() string
WithMeta(ruleId string, opId string, store Store) StreamContext
WithInstance(instanceId int) StreamContext
WithCancel() (StreamContext, context.CancelFunc)
SetError(e error)
// IncrCounter State handling
IncrCounter(key string, amount int) error
GetCounter(key string) (int, error)
PutState(key string, value interface{}) error
GetState(key string) (interface{}, error)
DeleteState(key string) error
// ParseTemplate parse the template string with the given data
ParseTemplate(template string, data interface{}) (string, error)
// ParseJsonPath parse the jsonPath string with the given data
ParseJsonPath(jsonPath string, data interface{}) (interface{}, error)
// TransformOutput Transform output according to the properties including dataTemplate, sendSingle, fields
// TransformOutput first transform data through the dataTemplate property,and then select data based on the fields property
// It is recommended that you do not configure both the dataTemplate property and the fields property.
// The second parameter is whether the data is transformed or just return as its json format.
TransformOutput(data interface{}) ([]byte, bool, error)
// Decode is set in the source according to the format.
// It decodes byte array into map or map slice.
Decode(data []byte) (map[string]interface{}, error)
DecodeIntoList(data []byte) ([]map[string]interface{}, error)
}
type Subscriber ¶
type Subscriber interface {
Subscribe(ctx StreamContext) error
}
type TopicChannel ¶
type TopicChannel struct {
// Topic for subscriber to filter on if any
Topic string
// Messages is the returned message channel for the subscriber
Messages chan<- interface{}
}
TopicChannel is the data structure for subscriber
Click to show internal directories.
Click to hide internal directories.