Documentation
¶
Index ¶
- func Collect[T any](in Stream[T]) []T
- func CollectWithContext[T any](ctx context.Context, in Stream[T]) []T
- func Segregate[T any](ctx context.Context, in Stream[T], predicate func(T) bool) (Generator[T], Generator[T])
- func SegregateStream[T any](ctx context.Context, in Stream[T], predicate func(T) bool) (Stream[T], Stream[T])
- func Tee[T any](ctx context.Context, in Stream[T]) (Generator[T], Generator[T])
- func TeeStream[T any](ctx context.Context, in Stream[T]) (Stream[T], Stream[T])
- type BatchOption
- type DoOption
- type FilterMapOption
- type FilterOption
- type ForEachOutputOption
- type FromFuncOption
- type FromSeq2Value
- type Generator
- func FromFunc[T any](f func(context.Context) (T, bool), options ...FromFuncOption) Generator[T]
- func FromSeq[T any](seq iter.Seq[T]) Generator[T]
- func FromSeq2[T, U any](seq iter.Seq2[T, U]) Generator[FromSeq2Value[T, U]]
- func Of[T any](items ...T) Generator[T]
- func TeeN[T any](ctx context.Context, in Stream[T], n int) []Generator[T]
- type Item
- type MapOption
- type None
- type Pipeline
- func Batch[T any](n int, opt ...BatchOption) Pipeline[T, []T]
- func Filter[T any](f func(context.Context, T) bool, opt ...FilterOption) Pipeline[T, T]
- func FilterMap[T, U any](f func(context.Context, T) (bool, U), opt ...FilterMapOption) Pipeline[T, U]
- func FilterMapErrors[T any]() Pipeline[Item[T], error]
- func FilterMapValues[T any]() Pipeline[Item[T], T]
- func Flatten[T any]() Pipeline[[]T, T]
- func ForEachOutput[T, U any](f func(ctx context.Context, val T, out chan<- U), opt ...ForEachOutputOption) Pipeline[T, U]
- func Map[T, U any](f func(context.Context, T) U, opt ...MapOption) Pipeline[T, U]
- func Pipe[A, B, C any](a Pipeline[A, B], b Pipeline[B, C]) Pipeline[A, C]
- func Pipe2[A, B, C any](a Pipeline[A, B], b Pipeline[B, C]) Pipeline[A, C]
- func Pipe3[A, B, C, D any](a Pipeline[A, B], b Pipeline[B, C], c Pipeline[C, D]) Pipeline[A, D]
- func Pipe4[A, B, C, D, E any](a Pipeline[A, B], b Pipeline[B, C], c Pipeline[C, D], d Pipeline[D, E]) Pipeline[A, E]
- func Pipe5[A, B, C, D, E, F any](a Pipeline[A, B], b Pipeline[B, C], c Pipeline[C, D], d Pipeline[D, E], ...) Pipeline[A, F]
- type Stream
- type Sync
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CollectWithContext ¶
CollectWithContext collects all items from the stream and returns them as a slice. If the context is cancelled, it stops collecting items.
func SegregateStream ¶ added in v0.5.0
func SegregateStream[T any](ctx context.Context, in Stream[T], predicate func(T) bool) (Stream[T], Stream[T])
SegregateStream takes an input stream and a predicate function, and returns two streams: one containing items that satisfy the predicate and another containing items that do not.
Types ¶
type BatchOption ¶ added in v0.4.0
type BatchOption func(*batchOptions) error
func BatchBufferSize ¶ added in v0.4.0
func BatchBufferSize(n int) BatchOption
func BatchMaxWait ¶ added in v0.4.0
func BatchMaxWait(d time.Duration) BatchOption
type DoOption ¶ added in v0.4.0
type DoOption func(*doOptions) error
func DoOnBeforeClose ¶ added in v0.5.0
func DoPoolSize ¶ added in v0.4.0
type FilterMapOption ¶ added in v0.5.0
type FilterMapOption func(*filterMapOptions) error
func FilterMapBufferSize ¶ added in v0.5.0
func FilterMapBufferSize(n int) FilterMapOption
func FilterMapPoolSize ¶ added in v0.5.0
func FilterMapPoolSize(n int) FilterMapOption
type FilterOption ¶ added in v0.4.0
type FilterOption func(*filterOptions) error
func FilterBufferSize ¶ added in v0.4.0
func FilterBufferSize(n int) FilterOption
func FilterPoolSize ¶ added in v0.4.0
func FilterPoolSize(n int) FilterOption
type ForEachOutputOption ¶ added in v0.5.0
type ForEachOutputOption func(*forEachOutputOptions) error
func ForEachOutputBufferSize ¶ added in v0.5.0
func ForEachOutputBufferSize(bufferSize int) ForEachOutputOption
func ForEachOutputOnBeforeClose ¶ added in v0.5.0
func ForEachOutputOnBeforeClose(f func(context.Context)) ForEachOutputOption
func ForEachOutputPoolSize ¶ added in v0.5.0
func ForEachOutputPoolSize(poolSize int) ForEachOutputOption
type FromFuncOption ¶ added in v0.4.0
type FromFuncOption func(*fromFuncOptions) error
func FromFuncBufferSize ¶ added in v0.4.0
func FromFuncBufferSize(bufferSize int) FromFuncOption
func FromFuncOnBeforeClose ¶ added in v0.4.0
func FromFuncOnBeforeClose(f func(context.Context)) FromFuncOption
func FromFuncPoolSize ¶ added in v0.4.0
func FromFuncPoolSize(poolSize int) FromFuncOption
type FromSeq2Value ¶
type FromSeq2Value[T, U any] struct { Val1 T Val2 U }
type Generator ¶
Generator is a pipeline that generates items of type T without any input.
func FromFunc ¶
FromFunc returns a Generator that emits items generated by the given function. The returned stream will emit items until the function returns false in the second return value.
Example ¶
ctx := context.Background()
count := atomic.Int32{}
genFn := func(ctx context.Context) (int32, bool) {
value := count.Add(1)
if value > 5 {
return 0, false
}
return value, true
}
in := FromFunc(genFn)
s := in(ctx, nil)
for item := range s {
fmt.Println(item)
}
Output: 1 2 3 4 5
func FromSeq ¶
Example ¶
ctx := context.Background()
seq := slices.Values([]int{1, 2, 3, 4, 5})
in := FromSeq(seq)
s := in(ctx, nil)
for item := range s {
fmt.Println(item)
}
Output: 1 2 3 4 5
func FromSeq2 ¶
func FromSeq2[T, U any](seq iter.Seq2[T, U]) Generator[FromSeq2Value[T, U]]
Example ¶
ctx := context.Background()
seq := slices.All([]string{"a", "b", "c", "d", "e"})
in := FromSeq2(seq)
s := in(ctx, nil)
for item := range s {
fmt.Printf("%d, %s\n", item.Val1, item.Val2)
}
Output: 0, a 1, b 2, c 3, d 4, e
type Item ¶
type Item[T any] struct { // Val is the value of the item when there is no error. Val T // Err is the optional error of the item. Err error }
Item represents a single item in a data stream. It contains a value of type T and an optional error.
type MapOption ¶ added in v0.4.0
type MapOption func(*mapOptions) error
func MapBufferSize ¶ added in v0.4.0
func MapPoolSize ¶ added in v0.4.0
type None ¶
type None struct{}
None is a type that represents no value. It is typically used as the input type of generator pipeline that does not depend on any input stream or for a sync pipeline that does not emit any items.
type Pipeline ¶ added in v0.3.0
Pipeline is a function that takes a context and a stream and returns a stream of the same type or a different type.
func Batch ¶ added in v0.1.0
func Batch[T any](n int, opt ...BatchOption) Pipeline[T, []T]
Batch returns a Pipeline that batches items from the input Stream into slices of n items. If the batch is not full after maxWait, it will be sent anyway. Any error in the input Stream will be propagated to the output Stream immediately.
Example ¶
ctx := context.Background()
in := Of(1, 2, 3, 4, 5)
b := Batch[int](2)
p := Pipe(in, b)
for item := range p(ctx, nil) {
fmt.Printf("%v\n", item)
}
Output: [1 2] [3 4] [5]
func Filter ¶
Filter returns a pipeline that filters the input stream using the given function.
Example ¶
ctx := context.Background()
in := Of(1, 2, 3, 4, 5)
even := Filter(func(ctx context.Context, n int) bool {
return n%2 == 0
})
p := Pipe(in, even)
s := p(ctx, nil)
for item := range s {
fmt.Println(item)
}
Output: 2 4
func FilterMap ¶ added in v0.5.0
func FilterMap[T, U any](f func(context.Context, T) (bool, U), opt ...FilterMapOption) Pipeline[T, U]
FilterMap returns a pipeline that filters and maps items from the input stream.
Example ¶
ctx := context.Background()
in := Of(1, 2, 3, 4, 5)
// Filter even numbers and multiply by 10
filterMapEvenAndMultiply := FilterMap(func(ctx context.Context, n int) (bool, int) {
if n%2 == 0 {
return true, n * 10
}
return false, 0
})
p := Pipe(in, filterMapEvenAndMultiply)
s := p(ctx, nil)
for item := range s {
fmt.Println(item)
}
Output: 20 40
func FilterMapValues ¶ added in v0.5.0
func Flatten ¶ added in v0.2.0
Flatten returns a Pipeline that flattens a Stream of slices into a Stream of individual items.
Example ¶
ctx := context.Background()
in := Of([]int{1, 2}, []int{3, 4}, []int{5})
f := Flatten[int]()
p := Pipe(in, f)
for item := range p(ctx, nil) {
fmt.Printf("%v\n", item)
}
Output: 1 2 3 4 5
func ForEachOutput ¶ added in v0.5.0
func ForEachOutput[T, U any](f func(ctx context.Context, val T, out chan<- U), opt ...ForEachOutputOption) Pipeline[T, U]
ForEachOutput returns a pipeline that applies a function to each item from the input stream. The function can write directly to the output channel. The output channel should not be closed by the function, since the output stream will be closed when the input stream is closed or the context is done. ForEachOutput panics if invalid options are provided.
Example ¶
ctx := context.Background()
in := Of(1, 2, 3, 4, 5)
f := func(ctx context.Context, n int, out chan<- int) {
out <- n * 2
}
p := Pipe(in, ForEachOutput(f))
s := p(ctx, nil)
for n := range s {
fmt.Println(n)
}
Output: 2 4 6 8 10
func Map ¶
Map returns a pipeline that applies a function to each item from the input stream.
Example ¶
ctx := context.Background()
in := Of(1, 2, 3, 4, 5)
double := Map(func(ctx context.Context, n int) int {
return n * 2
})
p := Pipe(in, double)
s := p(ctx, nil)
for n := range s {
fmt.Println(n)
}
Output: 2 4 6 8 10
func Pipe ¶
Pipe pipes two pipelines together. It is a convenience function that calls Pipe2.
Example ¶
ctx := context.Background()
a := Of(1, 2, 3, 4, 5)
p := Pipe(a, Map(func(ctx context.Context, n int) int {
return n + 1
}))
s := p(ctx, nil)
for item := range s {
fmt.Println(item)
}
Output: 2 3 4 5 6
type Stream ¶
type Stream[T any] <-chan T
Stream represents a data stream of items. It is a read only channel of type T.
type Sync ¶
Sync is a pipeline that processes items of type T and does not emit any items.
func Connect ¶
Example ¶
ctx := context.Background()
g := Of("Hello", "Hello", "Hello")
capitalize := Map(func(ctx context.Context, i string) string {
return strings.ToUpper(i)
})
lowercase := Map(func(ctx context.Context, i string) string {
return strings.ToLower(i)
})
resA := make([]string, 0)
a := Do(func(ctx context.Context, i string) {
resA = append(resA, i)
})
resB := make([]string, 0)
b := Do(func(ctx context.Context, i string) {
resB = append(resB, i)
})
p1 := Pipe(capitalize, a)
p2 := Pipe(lowercase, b)
<-Pipe(g, Connect(p1, p2))(ctx, nil)
for _, s := range resA {
fmt.Println(s)
}
for _, s := range resB {
fmt.Println(s)
}
Output: HELLO HELLO HELLO hello hello hello
func Do ¶
Do returns a sync pipeline that applies the given function to each item in the stream. The output stream will not emit any items, and it will be closed when the input stream is closed or the context is done.
Example ¶
ctx := context.Background()
g := Of(1, 2, 3, 4, 5)
d := Do(func(ctx context.Context, i int) {
fmt.Println(i)
})
<-Pipe(g, d)(ctx, nil)
Output: 1 2 3 4 5
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
aws
|
|
|
examples
|
|
|
basic
command
|
|
|
errorHandling/basic
command
|
|
|
errorHandling/dedicatedPipeline
command
|
|
|
errorHandling/multiplePipeline
command
|
|
|
options
command
|
|
|
readCSV
command
|
|