rivo

v0.5.0
Published: Jul 12, 2025 License: MIT Imports: 6 Imported by: 0

README

rivo

rivo is a concurrent stream processing library for Go that provides type safety through generics and a composable pipeline architecture.

NOTE: THIS LIBRARY IS STILL IN ACTIVE DEVELOPMENT AND IS NOT YET PRODUCTION READY.

About

rivo has two major inspirations:

  1. The book "Concurrency in Go";
  2. ReactiveX, in particular the Go and JS libraries;

Compared to these sources, rivo aims to provide better type safety (both "Concurrency in Go" and RxGo were written in a pre-generics era and make heavy use of interface{}) and a more intuitive API and developer experience (Rx is very powerful, but can be overwhelming for newcomers).

Getting started

Prerequisites

rivo requires Go 1.23 or later.

Installation

  go get github.com/agiac/rivo

Basic concepts

rivo has several main types, which are the building blocks of the library: Stream, Pipeline, Generator, Sync, and Item.

Stream represents a data stream. It is a read-only channel of type T.

type Stream[T any] <-chan T

Pipeline is a function that takes a context.Context and a Stream of one type and returns a Stream of the same or a different type. Pipelines represent the operations that can be performed on streams, and they can be composed to create more complex operations.

type Pipeline[T, U any] func(ctx context.Context, stream Stream[T]) Stream[U]

For convenience, rivo also provides type aliases for common pipeline patterns:

// Generator is a pipeline that generates items of type T without any input
type Generator[T any] = Pipeline[None, T]

// Sync is a pipeline that processes items of type T and does not emit any items
type Sync[T any] = Pipeline[T, None]
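
The following is a minimal standalone sketch of how these shapes fit together. It does not import rivo: Stream, Pipeline and None are redeclared locally to mirror the definitions above, and fromSlice, double, pipe2 and runDoubling are hypothetical helpers written only for illustration, not the library's implementation.

```go
package main

import (
	"context"
	"fmt"
)

// Local stand-ins mirroring the documented shapes, so the sketch
// compiles without importing rivo.
type Stream[T any] <-chan T

type Pipeline[T, U any] func(ctx context.Context, stream Stream[T]) Stream[U]

type None struct{}

// fromSlice is a hypothetical generator: it ignores its input stream
// and emits the given values.
func fromSlice[T any](items ...T) Pipeline[None, T] {
	return func(ctx context.Context, _ Stream[None]) Stream[T] {
		out := make(chan T)
		go func() {
			defer close(out)
			for _, it := range items {
				select {
				case <-ctx.Done():
					return
				case out <- it:
				}
			}
		}()
		return out
	}
}

// double is a hand-written transformer with the Pipeline[int, int] shape.
func double(ctx context.Context, in Stream[int]) Stream[int] {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			select {
			case <-ctx.Done():
				return
			case out <- n * 2:
			}
		}
	}()
	return out
}

// pipe2 composes two pipelines by feeding the output of a into b.
func pipe2[A, B, C any](a Pipeline[A, B], b Pipeline[B, C]) Pipeline[A, C] {
	return func(ctx context.Context, in Stream[A]) Stream[C] {
		return b(ctx, a(ctx, in))
	}
}

// runDoubling runs the composed pipeline and gathers its output.
func runDoubling() []int {
	p := pipe2(fromSlice(1, 2, 3), Pipeline[int, int](double))
	var got []int
	// The generator ignores its input, so a nil stream is passed.
	for n := range p(context.Background(), nil) {
		got = append(got, n)
	}
	return got
}

func main() {
	fmt.Println(runDoubling()) // [2 4 6]
}
```

Because a pipeline is an ordinary function, composition in this sketch is plain function composition: the output stream of the first stage becomes the input stream of the second.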

Item is a struct that contains a value and an optional error. It's used when you need error handling in your streams:

type Item[T any] struct {
	Val T
	Err error
}

Most basic operations work with plain values, but when you need error handling, you can use Item[T] and the corresponding pipelines that support error propagation.

If a pipeline generates values without depending on an input stream, it is called a generator. If it consumes values without emitting any items, it is called a sink (the Sync type). If it transforms values, it is called a transformer.

Here's a basic example:

package main

import (
	"context"
	"fmt"
	"github.com/agiac/rivo"
)

func main() {
	ctx := context.Background()

	// `Of` returns a generator whose output stream emits the provided values
	in := rivo.Of(1, 2, 3, 4, 5)

	// `Filter` returns a pipeline that filters the input stream using the given function
	onlyEven := rivo.Filter(func(ctx context.Context, n int) bool {
		return n%2 == 0
	})

	// `Do` returns a pipeline that applies the given function to each item in the input stream without emitting any values
	log := rivo.Do(func(ctx context.Context, n int) {
		fmt.Println(n)
	})

	// `Pipe3` composes three pipelines together, returning a new pipeline
	p := rivo.Pipe3(in, onlyEven, log)

	// By passing a context and an input channel to our pipeline, we can get the output stream.
	// Since our first pipeline `in` is a generator and does not depend on an input stream, we can pass a nil channel.
	// Also, since `log` is a sink, we only have to read once from the output channel to know that the pipeline has finished.
	<-p(ctx, nil)

	// Expected output:
	// 2
	// 4
}
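
A single receive is enough to wait for a sink because a receive on a closed channel returns immediately. The sketch below illustrates this with plain channels only; runSink and drain are hypothetical helpers, and the assumption is that a sink closes its output once its input is exhausted, as the description of Do in this documentation states.

```go
package main

import "fmt"

// None mirrors the empty value type used for streams that carry no items.
type None struct{}

// runSink is a hypothetical sink: it consumes every value, emits nothing,
// and closes its output once the input is exhausted.
func runSink(in <-chan int, seen *[]int) <-chan None {
	out := make(chan None)
	go func() {
		defer close(out)
		for n := range in {
			*seen = append(*seen, n)
		}
	}()
	return out
}

// drain feeds values into the sink and waits for it with a single receive.
func drain(values ...int) (seen []int, open bool) {
	in := make(chan int)
	go func() {
		defer close(in)
		for _, v := range values {
			in <- v
		}
	}()
	// The receive unblocks only when the sink closes its output, so all
	// side effects have completed by the time it returns.
	_, open = <-runSink(in, &seen)
	return seen, open
}

func main() {
	seen, open := drain(2, 4)
	fmt.Println(seen, open) // [2 4] false
}
```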

For error handling scenarios, you can use Item[T] as your data type to carry both values and errors:

package main

import (
	"context"
	"fmt"
	"strconv"
	"github.com/agiac/rivo"
)

func main() {
	ctx := context.Background()

	// Create a generator with string values
	g := rivo.Of("1", "2", "invalid", "4", "5")

	// Transform string to Item[int] with error handling
	toInt := rivo.Map(func(ctx context.Context, s string) rivo.Item[int] {
		n, err := strconv.Atoi(s)
		if err != nil {
			return rivo.Item[int]{Err: err} // Return an item with the error
		}
		return rivo.Item[int]{Val: n} // Return an item with the value
	})

	// Process the items, handling both values and errors
	handleResults := rivo.Do(func(ctx context.Context, i rivo.Item[int]) {
		if i.Err != nil {
			fmt.Printf("ERROR: %v\n", i.Err)
		} else {
			fmt.Printf("Value: %d\n", i.Val)
		}
	})

	p := rivo.Pipe3(g, toInt, handleResults)
	<-p(ctx, nil)
	
	// Output:
	// Value: 1
	// Value: 2
	// ERROR: strconv.Atoi: parsing "invalid": invalid syntax
	// Value: 4
	// Value: 5
}

Pipeline factories

rivo comes with a set of built-in pipeline factories.

Generators
  • Of: returns a generator pipeline that emits the provided values;
  • FromFunc: returns a generator pipeline that emits values returned by the provided function until the function returns false;
  • FromSeq and FromSeq2: return generator pipelines that emit the values from the provided iterators;
  • Tee and TeeN: return two and n generator pipelines, respectively, that each receive a copy of each item from the input stream;
  • Segregate: returns two generator pipelines, where the first pipeline emits items that pass the predicate, and the second pipeline emits items that do not pass the predicate;
Sinks
  • Do: returns a sink pipeline that performs a side effect for each item in the input stream;
  • Connect: returns a sink pipeline that applies the given sink pipelines to the input stream concurrently;
Transformers
  • Filter: returns a transformer pipeline that filters the input stream using the given function;
  • Map: returns a transformer pipeline that applies a function to each item from the input stream;
  • FilterMap: returns a transformer pipeline that filters and maps items from the input stream in a single operation;
  • Batch: returns a transformer pipeline that groups the input stream into batches of the provided size;
  • Flatten: returns a transformer pipeline that flattens the input stream of slices;
  • ForEachOutput: returns a transformer pipeline that applies a function to each item, allowing direct output channel access;
  • Pipe, Pipe2, Pipe3, Pipe4, Pipe5: return pipelines that compose the provided pipelines together;

Besides these, the library's subdirectories contain more specialized pipeline factories.

Package rivo/io
  • FromReader: returns a generator pipeline that reads from the provided io.Reader and emits the read bytes;
  • ToWriter: returns a sink pipeline that writes the input stream to the provided io.Writer;
Package rivo/bufio
  • FromScanner: returns a generator pipeline that reads from the provided bufio.Scanner and emits the scanned items;
  • ToWriter: returns a sink pipeline that writes the input stream to the provided bufio.Writer;
Package rivo/csv
  • FromReader: returns a generator pipeline that reads from the provided csv.Reader and emits the read records;
  • ToWriter: returns a sink pipeline that writes the input stream to the provided csv.Writer;
Package rivo/aws/dynamodb
  • Scan: returns a generator pipeline that scans the provided DynamoDB table and emits the scan output responses;
  • ScanItems: returns a generator pipeline that scans the provided DynamoDB table and emits the items from the scan output responses;
  • BatchWrite: returns a transformer pipeline that writes the input stream to the provided DynamoDB table using the BatchWriteItem API;
  • BatchPutItems: returns a transformer pipeline that writes the input stream to the provided DynamoDB table using the BatchWriteItem API, but only for PutItem operations;

Configuration Options

Many pipelines support configuration options to customize their behavior:

  • Pool Size: Control the number of concurrent goroutines (e.g., MapPoolSize, FilterPoolSize, DoPoolSize)
  • Buffer Size: Control the internal channel buffer size (e.g., MapBufferSize, BatchBufferSize)
  • Time-based Options: Control time-based behavior (e.g., BatchMaxWait)
  • Lifecycle Hooks: Add hooks for cleanup or finalization (e.g., FromFuncOnBeforeClose)

Example usage:

// Map with custom pool size and buffer size
mapper := rivo.Map(transformFunc, rivo.MapPoolSize(5), rivo.MapBufferSize(100))

// Batch with time-based batching
// The item type cannot be inferred from the arguments, so it is given explicitly
batcher := rivo.Batch[int](10, rivo.BatchMaxWait(100*time.Millisecond))
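
To make the pool size and buffer size options more concrete, here is a standalone sketch of a worker-pool map stage built on plain channels. mapPool and squares are hypothetical names, and the behaviour shown (several goroutines reading from one input, results possibly arriving out of input order) is an assumption about what such options typically control, not a description of rivo's internals.

```go
package main

import (
	"context"
	"fmt"
	"sort"
	"sync"
)

// mapPool applies f to every item of in using poolSize goroutines and an
// output channel with the given buffer size.
func mapPool[T, U any](ctx context.Context, in <-chan T, f func(T) U, poolSize, bufferSize int) <-chan U {
	out := make(chan U, bufferSize)
	var wg sync.WaitGroup
	wg.Add(poolSize)
	for i := 0; i < poolSize; i++ {
		go func() {
			defer wg.Done()
			for item := range in {
				select {
				case <-ctx.Done():
					return
				case out <- f(item):
				}
			}
		}()
	}
	// Close the output only after every worker has finished.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// squares runs the pool over 1..5 and returns the sorted results.
func squares(poolSize int) []int {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 1; i <= 5; i++ {
			in <- i
		}
	}()
	square := func(n int) int { return n * n }
	var got []int
	for n := range mapPool(context.Background(), in, square, poolSize, 2) {
		got = append(got, n)
	}
	sort.Ints(got) // several workers may emit results out of input order
	return got
}

func main() {
	fmt.Println(squares(3)) // [1 4 9 16 25]
}
```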

Utilities

rivo provides several utility functions to work with streams:

  • Collect: collects all items from a stream into a slice
  • CollectWithContext: like Collect but respects context cancellation
  • OrDone: utility function that propagates context cancellation to streams
  • FilterMapValues: extracts only successful values from Item streams
  • FilterMapErrors: extracts only errors from Item streams

Error handling

When you need error handling in your streams, you can use the Item[T] type to carry both values and errors through your pipelines. This allows you to handle errors at any point in the pipeline without stopping the entire stream.

The library provides several utilities for working with error-carrying streams:

  • FilterMapValues: extracts only successful values from Item streams, filtering out errors
  • FilterMapErrors: extracts only errors from Item streams, filtering out successful values
  • Segregate: splits any stream based on a predicate function

See examples/errorHandling for comprehensive examples of different error handling patterns.
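
As a rough illustration of the idea, the standalone sketch below carries errors through a channel of items and separates them at the end, so that one bad input does not stop the stream. Item is redeclared locally to mirror the type above; parse and partition are hypothetical helpers and are not part of rivo.

```go
package main

import (
	"fmt"
	"strconv"
)

// Item mirrors the documented struct: a value plus an optional error.
type Item[T any] struct {
	Val T
	Err error
}

// parse converts each input to an int and emits the result, or the
// conversion error, as an Item.
func parse(inputs ...string) <-chan Item[int] {
	out := make(chan Item[int])
	go func() {
		defer close(out)
		for _, s := range inputs {
			n, err := strconv.Atoi(s)
			if err != nil {
				out <- Item[int]{Err: err}
				continue // keep going: the error travels with the stream
			}
			out <- Item[int]{Val: n}
		}
	}()
	return out
}

// partition drains the stream and separates successful values from errors.
func partition[T any](in <-chan Item[T]) (vals []T, errs []error) {
	for it := range in {
		if it.Err != nil {
			errs = append(errs, it.Err)
			continue
		}
		vals = append(vals, it.Val)
	}
	return vals, errs
}

func main() {
	vals, errs := partition(parse("1", "2", "invalid", "4"))
	fmt.Println(vals, len(errs)) // [1 2 4] 1
}
```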

Examples

More examples can be found in the examples folder.


Contributing

Contributions are welcome! If you have any ideas, suggestions or bug reports, please open an issue or a pull request.

Roadmap

  • Review docs, in particular where "pipeline" is used instead of "generator", "sink" or "transformer"
  • Add more pipelines, also using the RxJS list of operators as a reference:
    • FilterMap (combines filter and map operations)
    • ForEachOutput (direct output channel access)
    • Tap (side effects without modifying the stream)
    • Time-based operators (throttle, debounce, etc.)
    • SQL-like operators (join, group by, etc.)
    • More AWS integrations
  • Add more utilities:
    • Merge (combine multiple streams)
    • Zip (combine streams element-wise)
    • Take/Skip operators
  • Performance optimizations and benchmarking
  • Add more examples and tutorials

License

rivo is licensed under the MIT license. See the LICENSE file for details.

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func Collect

func Collect[T any](in Stream[T]) []T

Collect collects all items from the stream and returns them as a slice.

func CollectWithContext

func CollectWithContext[T any](ctx context.Context, in Stream[T]) []T

CollectWithContext collects all items from the stream and returns them as a slice. If the context is cancelled, it stops collecting items.
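
A context-aware collect can be sketched with a select over the context and the input channel. This is a standalone illustration on plain channels, with hypothetical names; returning the items gathered before cancellation is an assumption of the sketch.

```go
package main

import (
	"context"
	"fmt"
)

// collectWithContext gathers items until the input is closed or the
// context is done, whichever happens first.
func collectWithContext[T any](ctx context.Context, in <-chan T) []T {
	var res []T
	for {
		select {
		case <-ctx.Done():
			return res
		case v, ok := <-in:
			if !ok {
				return res
			}
			res = append(res, v)
		}
	}
}

// collectClosed collects from a channel that is closed after three items.
func collectClosed() []int {
	in := make(chan int, 3)
	in <- 1
	in <- 2
	in <- 3
	close(in)
	return collectWithContext(context.Background(), in)
}

// collectCancelled collects from a channel that never emits, using an
// already cancelled context, so the call returns instead of blocking.
func collectCancelled() []int {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	in := make(chan int) // open, never written to
	return collectWithContext(ctx, in)
}

func main() {
	fmt.Println(collectClosed())         // [1 2 3]
	fmt.Println(len(collectCancelled())) // 0
}
```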

func Segregate

func Segregate[T any](ctx context.Context, in Stream[T], predicate func(T) bool) (Generator[T], Generator[T])

func SegregateStream added in v0.5.0

func SegregateStream[T any](ctx context.Context, in Stream[T], predicate func(T) bool) (Stream[T], Stream[T])

SegregateStream takes an input stream and a predicate function, and returns two streams: one containing items that satisfy the predicate and another containing items that do not.
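
The splitting behaviour can be sketched on plain channels as follows. segregate and splitEvenOdd are hypothetical names and this is not the library's implementation. In this sketch both outputs are unbuffered, so they must be read concurrently; otherwise the producer blocks on the stream nobody is reading.

```go
package main

import (
	"context"
	"fmt"
)

// segregate routes each item to the first output if pred returns true
// and to the second output otherwise.
func segregate[T any](ctx context.Context, in <-chan T, pred func(T) bool) (<-chan T, <-chan T) {
	pass, fail := make(chan T), make(chan T)
	go func() {
		defer close(pass)
		defer close(fail)
		for item := range in {
			target := fail
			if pred(item) {
				target = pass
			}
			select {
			case <-ctx.Done():
				return
			case target <- item:
			}
		}
	}()
	return pass, fail
}

// splitEvenOdd splits 1..6 and reads both outputs concurrently.
func splitEvenOdd() (even, odd []int) {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 1; i <= 6; i++ {
			in <- i
		}
	}()
	isEven := func(n int) bool { return n%2 == 0 }
	evens, odds := segregate(context.Background(), in, isEven)

	done := make(chan struct{})
	go func() {
		defer close(done)
		for n := range odds {
			odd = append(odd, n)
		}
	}()
	for n := range evens {
		even = append(even, n)
	}
	<-done // wait for the odd reader before returning its slice
	return even, odd
}

func main() {
	even, odd := splitEvenOdd()
	fmt.Println(even, odd) // [2 4 6] [1 3 5]
}
```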

func Tee

func Tee[T any](ctx context.Context, in Stream[T]) (Generator[T], Generator[T])

Tee returns 2 generators that each receive a copy of each item from the input stream.

func TeeStream added in v0.5.0

func TeeStream[T any](ctx context.Context, in Stream[T]) (Stream[T], Stream[T])

TeeStream returns 2 streams that each receive a copy of each item from the input stream.

Types

type BatchOption added in v0.4.0

type BatchOption func(*batchOptions) error

func BatchBufferSize added in v0.4.0

func BatchBufferSize(n int) BatchOption

func BatchMaxWait added in v0.4.0

func BatchMaxWait(d time.Duration) BatchOption

type DoOption added in v0.4.0

type DoOption func(*doOptions) error

func DoOnBeforeClose added in v0.5.0

func DoOnBeforeClose(fn func(context.Context)) DoOption

func DoPoolSize added in v0.4.0

func DoPoolSize(n int) DoOption

type FilterMapOption added in v0.5.0

type FilterMapOption func(*filterMapOptions) error

func FilterMapBufferSize added in v0.5.0

func FilterMapBufferSize(n int) FilterMapOption

func FilterMapPoolSize added in v0.5.0

func FilterMapPoolSize(n int) FilterMapOption

type FilterOption added in v0.4.0

type FilterOption func(*filterOptions) error

func FilterBufferSize added in v0.4.0

func FilterBufferSize(n int) FilterOption

func FilterPoolSize added in v0.4.0

func FilterPoolSize(n int) FilterOption

type ForEachOutputOption added in v0.5.0

type ForEachOutputOption func(*forEachOutputOptions) error

func ForEachOutputBufferSize added in v0.5.0

func ForEachOutputBufferSize(bufferSize int) ForEachOutputOption

func ForEachOutputOnBeforeClose added in v0.5.0

func ForEachOutputOnBeforeClose(f func(context.Context)) ForEachOutputOption

func ForEachOutputPoolSize added in v0.5.0

func ForEachOutputPoolSize(poolSize int) ForEachOutputOption

type FromFuncOption added in v0.4.0

type FromFuncOption func(*fromFuncOptions) error

func FromFuncBufferSize added in v0.4.0

func FromFuncBufferSize(bufferSize int) FromFuncOption

func FromFuncOnBeforeClose added in v0.4.0

func FromFuncOnBeforeClose(f func(context.Context)) FromFuncOption

func FromFuncPoolSize added in v0.4.0

func FromFuncPoolSize(poolSize int) FromFuncOption

type FromSeq2Value

type FromSeq2Value[T, U any] struct {
	Val1 T
	Val2 U
}

type Generator

type Generator[T any] = Pipeline[None, T]

Generator is a pipeline that generates items of type T without any input.

func FromFunc

func FromFunc[T any](f func(context.Context) (T, bool), options ...FromFuncOption) Generator[T]

FromFunc returns a Generator that emits items generated by the given function. The returned stream will emit items until the function returns false in the second return value.

Example
ctx := context.Background()

count := atomic.Int32{}

genFn := func(ctx context.Context) (int32, bool) {
	value := count.Add(1)

	if value > 5 {
		return 0, false
	}

	return value, true
}

in := FromFunc(genFn)

s := in(ctx, nil)

for item := range s {
	fmt.Println(item)
}
Output:

1
2
3
4
5

func FromSeq

func FromSeq[T any](seq iter.Seq[T]) Generator[T]
Example
ctx := context.Background()

seq := slices.Values([]int{1, 2, 3, 4, 5})
in := FromSeq(seq)

s := in(ctx, nil)

for item := range s {
	fmt.Println(item)
}
Output:

1
2
3
4
5

func FromSeq2

func FromSeq2[T, U any](seq iter.Seq2[T, U]) Generator[FromSeq2Value[T, U]]
Example
ctx := context.Background()

seq := slices.All([]string{"a", "b", "c", "d", "e"})

in := FromSeq2(seq)

s := in(ctx, nil)

for item := range s {
	fmt.Printf("%d, %s\n", item.Val1, item.Val2)
}
Output:

0, a
1, b
2, c
3, d
4, e

func Of

func Of[T any](items ...T) Generator[T]

Of returns a Generator that emits the given items.

Example
ctx := context.Background()

in := Of(1, 2, 3, 4, 5)

s := in(ctx, nil)

for item := range s {
	fmt.Println(item)
}
Output:

1
2
3
4
5

func TeeN

func TeeN[T any](ctx context.Context, in Stream[T], n int) []Generator[T]

TeeN returns n generators that each receive a copy of each item from the input stream.

type Item

type Item[T any] struct {
	// Val is the value of the item when there is no error.
	Val T
	// Err is the optional error of the item.
	Err error
}

Item represents a single item in a data stream. It contains a value of type T and an optional error.

type MapOption added in v0.4.0

type MapOption func(*mapOptions) error

func MapBufferSize added in v0.4.0

func MapBufferSize(bufferSize int) MapOption

func MapPoolSize added in v0.4.0

func MapPoolSize(poolSize int) MapOption
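
Option types of the form func(*options) error follow the functional options pattern. The standalone sketch below shows how such options can be applied and validated; the struct fields, the defaults, the validation rules and the names withPoolSize, withBufferSize and newMapOptions are all assumptions made for illustration, since the unexported mapOptions type is not documented here.

```go
package main

import (
	"errors"
	"fmt"
)

// mapOptions is a hypothetical options struct; the real fields are unexported.
type mapOptions struct {
	poolSize   int
	bufferSize int
}

// MapOption mirrors the documented option shape.
type MapOption func(*mapOptions) error

// withPoolSize sets the number of workers and rejects values below 1.
func withPoolSize(n int) MapOption {
	return func(o *mapOptions) error {
		if n < 1 {
			return errors.New("pool size must be at least 1")
		}
		o.poolSize = n
		return nil
	}
}

// withBufferSize sets the output buffer size and rejects negative values.
func withBufferSize(n int) MapOption {
	return func(o *mapOptions) error {
		if n < 0 {
			return errors.New("buffer size must not be negative")
		}
		o.bufferSize = n
		return nil
	}
}

// newMapOptions starts from assumed defaults and applies each option in order.
func newMapOptions(opts ...MapOption) (mapOptions, error) {
	o := mapOptions{poolSize: 1, bufferSize: 0}
	for _, opt := range opts {
		if err := opt(&o); err != nil {
			return mapOptions{}, err
		}
	}
	return o, nil
}

func main() {
	o, err := newMapOptions(withPoolSize(5), withBufferSize(100))
	fmt.Println(o.poolSize, o.bufferSize, err) // 5 100 <nil>

	_, err = newMapOptions(withPoolSize(0))
	fmt.Println(err != nil) // true
}
```

Returning an error from each option lets the factory decide what to do with invalid input; the ForEachOutput documentation, for example, states that it panics if invalid options are provided.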

type None

type None struct{}

None is a type that represents no value. It is typically used as the input type of a generator pipeline, which does not depend on any input stream, or as the output type of a sync pipeline, which does not emit any items.

type Pipeline added in v0.3.0

type Pipeline[T, U any] func(ctx context.Context, stream Stream[T]) Stream[U]

Pipeline is a function that takes a context and a stream and returns a stream of the same type or a different type.

func Batch added in v0.1.0

func Batch[T any](n int, opt ...BatchOption) Pipeline[T, []T]

Batch returns a Pipeline that batches items from the input Stream into slices of n items. If a batch is not full after the maximum wait configured with BatchMaxWait, it is sent anyway. Any error in the input Stream will be propagated to the output Stream immediately.

Example
ctx := context.Background()

in := Of(1, 2, 3, 4, 5)

b := Batch[int](2)

p := Pipe(in, b)

for item := range p(ctx, nil) {
	fmt.Printf("%v\n", item)
}
Output:

[1 2]
[3 4]
[5]
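
Size-based batching with a maximum wait can be sketched with a timer and a select loop. The code below is a standalone illustration on plain channels, not rivo's implementation; batch and batchesOfTwo are hypothetical names, and resetting the timer after every flush is one possible policy among several.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// batch groups items into slices of up to n items. A partial batch is
// flushed when maxWait elapses or when the input is closed.
func batch[T any](ctx context.Context, in <-chan T, n int, maxWait time.Duration) <-chan []T {
	out := make(chan []T)
	go func() {
		defer close(out)
		buf := make([]T, 0, n)
		timer := time.NewTimer(maxWait)
		defer timer.Stop()

		// flush sends the current batch, if any, and reports whether the
		// loop should continue.
		flush := func() bool {
			if len(buf) == 0 {
				return true
			}
			select {
			case <-ctx.Done():
				return false
			case out <- buf:
				buf = make([]T, 0, n)
				return true
			}
		}

		for {
			select {
			case <-ctx.Done():
				return
			case item, ok := <-in:
				if !ok {
					flush() // emit the last partial batch
					return
				}
				buf = append(buf, item)
				if len(buf) == n {
					if !flush() {
						return
					}
					timer.Reset(maxWait)
				}
			case <-timer.C:
				if !flush() {
					return
				}
				timer.Reset(maxWait)
			}
		}
	}()
	return out
}

// batchesOfTwo batches 1..5 with a wait long enough that only the size
// limit and the end of input trigger a flush.
func batchesOfTwo() [][]int {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 1; i <= 5; i++ {
			in <- i
		}
	}()
	var got [][]int
	for b := range batch(context.Background(), in, 2, time.Hour) {
		got = append(got, b)
	}
	return got
}

func main() {
	fmt.Println(batchesOfTwo()) // [[1 2] [3 4] [5]]
}
```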

func Filter

func Filter[T any](f func(context.Context, T) bool, opt ...FilterOption) Pipeline[T, T]

Filter returns a pipeline that filters the input stream using the given function.

Example
ctx := context.Background()

in := Of(1, 2, 3, 4, 5)

even := Filter(func(ctx context.Context, n int) bool {
	return n%2 == 0
})

p := Pipe(in, even)

s := p(ctx, nil)

for item := range s {
	fmt.Println(item)
}
Output:

2
4

func FilterMap added in v0.5.0

func FilterMap[T, U any](f func(context.Context, T) (bool, U), opt ...FilterMapOption) Pipeline[T, U]

FilterMap returns a pipeline that filters and maps items from the input stream.

Example
ctx := context.Background()

in := Of(1, 2, 3, 4, 5)

// Filter even numbers and multiply by 10
filterMapEvenAndMultiply := FilterMap(func(ctx context.Context, n int) (bool, int) {
	if n%2 == 0 {
		return true, n * 10
	}
	return false, 0
})

p := Pipe(in, filterMapEvenAndMultiply)

s := p(ctx, nil)

for item := range s {
	fmt.Println(item)
}
Output:

20
40

func FilterMapErrors added in v0.5.0

func FilterMapErrors[T any]() Pipeline[Item[T], error]

func FilterMapValues added in v0.5.0

func FilterMapValues[T any]() Pipeline[Item[T], T]

func Flatten added in v0.2.0

func Flatten[T any]() Pipeline[[]T, T]

Flatten returns a Pipeline that flattens a Stream of slices into a Stream of individual items.

Example
ctx := context.Background()

in := Of([]int{1, 2}, []int{3, 4}, []int{5})

f := Flatten[int]()

p := Pipe(in, f)

for item := range p(ctx, nil) {
	fmt.Printf("%v\n", item)
}
Output:

1
2
3
4
5

func ForEachOutput added in v0.5.0

func ForEachOutput[T, U any](f func(ctx context.Context, val T, out chan<- U), opt ...ForEachOutputOption) Pipeline[T, U]

ForEachOutput returns a pipeline that applies a function to each item from the input stream. The function can write directly to the output channel. The output channel should not be closed by the function, since the output stream will be closed when the input stream is closed or the context is done. ForEachOutput panics if invalid options are provided.

Example
ctx := context.Background()

in := Of(1, 2, 3, 4, 5)

f := func(ctx context.Context, n int, out chan<- int) {
	out <- n * 2
}

p := Pipe(in, ForEachOutput(f))

s := p(ctx, nil)

for n := range s {
	fmt.Println(n)
}
Output:

2
4
6
8
10

func Map

func Map[T, U any](f func(context.Context, T) U, opt ...MapOption) Pipeline[T, U]

Map returns a pipeline that applies a function to each item from the input stream.

Example
ctx := context.Background()

in := Of(1, 2, 3, 4, 5)

double := Map(func(ctx context.Context, n int) int {
	return n * 2
})

p := Pipe(in, double)

s := p(ctx, nil)

for n := range s {
	fmt.Println(n)
}
Output:

2
4
6
8
10

func Pipe

func Pipe[A, B, C any](a Pipeline[A, B], b Pipeline[B, C]) Pipeline[A, C]

Pipe pipes two pipelines together. It is a convenience function that calls Pipe2.

Example
ctx := context.Background()

a := Of(1, 2, 3, 4, 5)

p := Pipe(a, Map(func(ctx context.Context, n int) int {
	return n + 1
}))

s := p(ctx, nil)

for item := range s {
	fmt.Println(item)
}
Output:

2
3
4
5
6

func Pipe2

func Pipe2[A, B, C any](a Pipeline[A, B], b Pipeline[B, C]) Pipeline[A, C]

Pipe2 pipes two pipelines together.

func Pipe3

func Pipe3[A, B, C, D any](a Pipeline[A, B], b Pipeline[B, C], c Pipeline[C, D]) Pipeline[A, D]

Pipe3 pipes three pipelines together.

func Pipe4

func Pipe4[A, B, C, D, E any](a Pipeline[A, B], b Pipeline[B, C], c Pipeline[C, D], d Pipeline[D, E]) Pipeline[A, E]

Pipe4 pipes four pipelines together.

func Pipe5

func Pipe5[A, B, C, D, E, F any](a Pipeline[A, B], b Pipeline[B, C], c Pipeline[C, D], d Pipeline[D, E], e Pipeline[E, F]) Pipeline[A, F]

Pipe5 pipes five pipelines together.

type Stream

type Stream[T any] <-chan T

Stream represents a data stream of items. It is a read-only channel of type T.

func OrDone

func OrDone[T any](ctx context.Context, in Stream[T]) Stream[T]

OrDone is a utility function that returns a channel that will be closed when the context is done.
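
The or-done pattern described in "Concurrency in Go" can be sketched as below. This is a standalone illustration with hypothetical names; it assumes the returned channel relays items from the input and is closed when either the context is done or the input is closed, which may differ in detail from rivo's OrDone.

```go
package main

import (
	"context"
	"fmt"
)

// orDone relays items from in until the context is done or in is closed.
func orDone[T any](ctx context.Context, in <-chan T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for {
			select {
			case <-ctx.Done():
				return
			case v, ok := <-in:
				if !ok {
					return
				}
				select {
				case out <- v:
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	return out
}

// readTwoThenCancel reads from a producer that never closes its channel
// and cancels the context after two items. The range loop still ends
// because orDone closes its output on cancellation.
func readTwoThenCancel() int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	in := make(chan int) // never closed by the producer
	go func() {
		for i := 0; ; i++ {
			select {
			case in <- i:
			case <-ctx.Done():
				return
			}
		}
	}()

	count := 0
	for range orDone(ctx, in) {
		count++
		if count == 2 {
			cancel()
		}
	}
	// A few more items may slip through between cancel and shutdown,
	// so the result is at least 2 rather than exactly 2.
	return count
}

func main() {
	fmt.Println(readTwoThenCancel() >= 2) // true
}
```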

func TeeStreamN added in v0.5.0

func TeeStreamN[T any](ctx context.Context, in Stream[T], n int) []Stream[T]

TeeStreamN returns n streams that each receive a copy of each item from the input stream.
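
Fanning one stream out into n copies can be sketched as follows. teeN and copies are hypothetical names and the code is a standalone illustration on plain channels. In this sketch each item is sent to every output in turn over unbuffered channels, so all outputs need an active reader or the slowest one holds back the rest.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// teeN returns n channels that each receive every item from in.
func teeN[T any](ctx context.Context, in <-chan T, n int) []<-chan T {
	outs := make([]chan T, n)
	res := make([]<-chan T, n)
	for i := range outs {
		outs[i] = make(chan T)
		res[i] = outs[i]
	}
	go func() {
		defer func() {
			for _, o := range outs {
				close(o)
			}
		}()
		for item := range in {
			for _, o := range outs {
				select {
				case <-ctx.Done():
					return
				case o <- item:
				}
			}
		}
	}()
	return res
}

// copies tees the values 1..3 into n streams and reads them concurrently.
func copies(n int) [][]int {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 1; i <= 3; i++ {
			in <- i
		}
	}()

	outs := teeN(context.Background(), in, n)
	results := make([][]int, n)
	var wg sync.WaitGroup
	for i, out := range outs {
		wg.Add(1)
		go func(i int, out <-chan int) {
			defer wg.Done()
			for v := range out {
				results[i] = append(results[i], v) // each reader owns one slot
			}
		}(i, out)
	}
	wg.Wait()
	return results
}

func main() {
	fmt.Println(copies(2)) // [[1 2 3] [1 2 3]]
}
```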

type Sync

type Sync[T any] = Pipeline[T, None]

Sync is a pipeline that processes items of type T and does not emit any items.

func Connect

func Connect[T any](pp ...Sync[T]) Sync[T]
Example
ctx := context.Background()

g := Of("Hello", "Hello", "Hello")

capitalize := Map(func(ctx context.Context, i string) string {
	return strings.ToUpper(i)
})

lowercase := Map(func(ctx context.Context, i string) string {
	return strings.ToLower(i)
})

resA := make([]string, 0)
a := Do(func(ctx context.Context, i string) {
	resA = append(resA, i)
})

resB := make([]string, 0)
b := Do(func(ctx context.Context, i string) {
	resB = append(resB, i)
})

p1 := Pipe(capitalize, a)
p2 := Pipe(lowercase, b)

<-Pipe(g, Connect(p1, p2))(ctx, nil)

for _, s := range resA {
	fmt.Println(s)
}

for _, s := range resB {
	fmt.Println(s)
}
Output:

HELLO
HELLO
HELLO
hello
hello
hello

func Do

func Do[T any](f func(context.Context, T), opt ...DoOption) Sync[T]

Do returns a sync pipeline that applies the given function to each item in the stream. The output stream will not emit any items, and it will be closed when the input stream is closed or the context is done.

Example
ctx := context.Background()

g := Of(1, 2, 3, 4, 5)

d := Do(func(ctx context.Context, i int) {
	fmt.Println(i)
})

<-Pipe(g, d)(ctx, nil)
Output:

1
2
3
4
5

Directories

Path Synopsis
aws
examples
basic command
options command
readCSV command
