Documentation
¶
Overview ¶
Package conduit provides utilities for building composable, channel-based data streams and pipelines in Go.
Conduit offers a set of generic combinators and helpers for constructing, transforming, and consuming streams of data. The API is inspired by patterns from "Concurrency in Go", by Katherine Cox-Buday, and uses Go generics and context-based cancellation throughout.
Features include:
- Creating streams from values, sequences, or generators (From, FromSeq, FromSeq2, Repeat)
- Transforming and filtering streams (Map, Skip, SkipN, Take, First)
- Combining and splitting streams (FanIn, FanOut, Tee, Bridge, ChanChan)
- Safe consumption (OrDone)
All functions are context-aware and designed to prevent goroutine leaks.
Example (Pipeline) ¶
// Bound the whole pipeline to a two-second lifetime.
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

// Predicate for Skip: values it reports true for (the even ones) are dropped.
isEven := func(_ context.Context, n int) bool {
	return n%2 == 0
}
// Formatter for Map: renders each remaining number as a labelled string.
label := func(_ context.Context, n int) string {
	return fmt.Sprintf("odd: %d", n)
}

// Source: a stream produced by calling genInt repeatedly until ctx is done.
numbers := conduit.Repeat(ctx, genInt)
// Drop the even numbers, then format what is left.
labelled := conduit.Map(ctx, conduit.Skip(ctx, numbers, isEven), label)
// Split the stream in two and merge the copies back together.
left, right := conduit.Tee(ctx, labelled)
merged := conduit.FanIn(ctx, left, right)
// Print only the first 10 values of the merged stream.
for line := range conduit.Take(ctx, merged, 10) {
	fmt.Println(line)
}
Output: odd: 5 odd: 13 odd: 5 odd: 13 odd: 49 odd: 49 odd: 65 odd: 65 odd: 45 odd: 45
Index ¶
- func Bridge[T any](ctx context.Context, chStream <-chan <-chan T) <-chan T
- func ChanChan[T any](ctx context.Context, n uint, fn func(ctx context.Context, index uint) T) <-chan <-chan T
- func FanIn[T any](ctx context.Context, streams ...<-chan T) <-chan T
- func FanOut[T any](ctx context.Context, n uint, fn func(c context.Context, index uint) <-chan T) []<-chan T
- func First[T any](ctx context.Context, stream <-chan T) <-chan T
- func From[T any](ctx context.Context, values ...T) <-chan T
- func FromSeq[V any](ctx context.Context, seq iter.Seq[V]) <-chan V
- func FromSeq2[K, V any](ctx context.Context, seq iter.Seq2[K, V]) <-chan V
- func Map[T, U any](ctx context.Context, stream <-chan T, fn func(context.Context, T) U) <-chan U
- func OrDone[T any](ctx context.Context, stream <-chan T) <-chan T
- func Repeat[T any](ctx context.Context, fn func(context.Context) T) <-chan T
- func Skip[T any](ctx context.Context, stream <-chan T, eq func(context.Context, T) bool) <-chan T
- func SkipN[T any](ctx context.Context, stream <-chan T, n uint) <-chan T
- func Take[T any](ctx context.Context, stream <-chan T, n uint) <-chan T
- func Tee[T any](ctx context.Context, stream <-chan T) (_, _ <-chan T)
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Bridge ¶
func Bridge[T any](ctx context.Context, chStream <-chan <-chan T) <-chan T
Bridge returns a channel that emits values from a stream of inner channels in order. See ChanChan for creating a channel of channels.
Example ¶
package main
import (
"context"
"fmt"
"github.com/bartventer/conduit"
)
// main builds a channel of three inner channels with ChanChan and prints the
// values that Bridge emits from them.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// The inner channel at index idx carries idx+1.
	oneBased := func(_ context.Context, idx uint) int {
		return int(idx + 1)
	}

	for n := range conduit.Bridge(ctx, conduit.ChanChan(ctx, 3, oneBased)) {
		fmt.Println(n)
	}
}
Output: 1 2 3
func ChanChan ¶
func ChanChan[T any](ctx context.Context, n uint, fn func(ctx context.Context, index uint) T) <-chan <-chan T
ChanChan returns a channel that emits n inner channels, each receiving a value from fn(ctx, index) where index ranges from 0 to n-1. See Bridge for consuming a channel of channels.
Example ¶
package main
import (
"context"
"fmt"
"github.com/bartventer/conduit"
)
// main creates two inner channels with ChanChan and prints every value
// received from each of them, one inner channel at a time.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// The inner channel at index idx carries idx*100.
	timesHundred := func(_ context.Context, idx uint) int {
		return int(idx * 100)
	}

	for inner := range conduit.ChanChan(ctx, 2, timesHundred) {
		for n := range inner {
			fmt.Println(n)
		}
	}
}
Output: 0 100
func FanIn ¶
func FanIn[T any](ctx context.Context, streams ...<-chan T) <-chan T
FanIn returns a channel that emits values from multiple input channels. The order of values is not preserved. See FanOut for creating multiple input channels. To preserve order, use Bridge with ChanChan.
Example ¶
package main
import (
"context"
"fmt"
"github.com/bartventer/conduit"
)
// main fans out to three streams, each limited to three copies of ten times
// its index, merges them with FanIn, and prints up to nine merged values.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// newStream builds the stream for one fan-out index using the context
	// supplied to the callback.
	newStream := func(c context.Context, index uint) <-chan int {
		tenfold := func(context.Context) int {
			return int(index * 10)
		}
		return conduit.Take(c, conduit.Repeat(c, tenfold), 3)
	}

	merged := conduit.FanIn(ctx, conduit.FanOut(ctx, 3, newStream)...)
	for n := range conduit.Take(ctx, merged, 9) {
		fmt.Println(n)
	}
}
Output (unordered, because FanIn does not preserve the order of values): 0 0 0 10 10 10 20 20 20
func FanOut ¶
func FanOut[T any](ctx context.Context, n uint, fn func(c context.Context, index uint) <-chan T) []<-chan T
FanOut returns a slice of channels where each channel is produced by calling fn(ctx, index) for each index from 0 to n-1. See FanIn for consuming a slice of channels.
Example ¶
package main
import (
"context"
"fmt"
"github.com/bartventer/conduit"
)
// main creates two streams with FanOut and prints the contents of each one
// in turn, preceded by a "stream N:" header line.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Build each stream from the context FanOut passes to the callback rather
	// than capturing the outer ctx, so the stream follows whatever context
	// FanOut supplies.
	streams := conduit.FanOut(ctx, 2, func(c context.Context, i uint) <-chan int {
		return conduit.From(c, int(i), int(i+10))
	})
	for i, ch := range streams {
		fmt.Printf("stream %d:\n", i)
		for v := range ch {
			fmt.Println(v)
		}
	}
}
Output: stream 0: 0 10 stream 1: 1 11
func First ¶
func First[T any](ctx context.Context, stream <-chan T) <-chan T
First returns a channel that emits the first value from the input stream. If the stream is empty or the context is canceled, the channel will be closed without emitting any values.
Example ¶
package main
import (
"context"
"fmt"
"github.com/bartventer/conduit"
)
// main prints what First emits for a stream of three values.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	source := conduit.From(ctx, 10, 20, 30)
	for head := range conduit.First(ctx, source) {
		fmt.Println(head)
	}
}
Output: 10
func From ¶
func From[T any](ctx context.Context, values ...T) <-chan T
From returns a channel that emits the provided values.
Example ¶
package main
import (
"context"
"fmt"
"github.com/bartventer/conduit"
)
// main prints each value emitted by a stream created with From.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	for n := range conduit.From(ctx, 7, 8, 9) {
		fmt.Println(n)
	}
}
Output: 7 8 9
func FromSeq ¶
func FromSeq[V any](ctx context.Context, seq iter.Seq[V]) <-chan V
FromSeq returns a channel that emits values from the provided sequence.
Example ¶
package main
import (
"context"
"fmt"
"slices"
"github.com/bartventer/conduit"
)
// main turns the values of a slice into a stream with FromSeq and prints them.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	items := []int{4, 5, 6}
	for n := range conduit.FromSeq(ctx, slices.Values(items)) {
		fmt.Println(n)
	}
}
Output: 4 5 6
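func FromSeq2 ¶
func FromSeq2[K, V any](ctx context.Context, seq iter.Seq2[K, V]) <-chan V
FromSeq2 returns a channel that emits the value of each key-value pair in the provided sequence; the keys are discarded.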
Example ¶
package main
import (
"context"
"fmt"
"slices"
"github.com/bartventer/conduit"
)
// main feeds the index-value pairs of a slice to FromSeq2 and prints what
// the resulting stream emits.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	items := []int{1, 2, 3}
	for n := range conduit.FromSeq2(ctx, slices.All(items)) {
		fmt.Println(n)
	}
}
Output: 1 2 3
func Map ¶
func Map[T, U any](ctx context.Context, stream <-chan T, fn func(context.Context, T) U) <-chan U
Map returns a channel that emits the results of applying fn to each value from the input stream.
Example ¶
package main
import (
"context"
"fmt"
"github.com/bartventer/conduit"
)
// main maps a stream of integers to formatted strings and prints them.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// label renders one number in the "num:N" form.
	label := func(_ context.Context, n int) string {
		return fmt.Sprintf("num:%d", n)
	}

	numbers := conduit.From(ctx, 1, 2, 3)
	for s := range conduit.Map(ctx, numbers, label) {
		fmt.Println(s)
	}
}
Output: num:1 num:2 num:3
func OrDone ¶
func OrDone[T any](ctx context.Context, stream <-chan T) <-chan T
OrDone returns a channel that emits values from the input stream until it is closed or the context is canceled. Useful for preventing goroutine leaks when consuming from possibly-blocking or shared channels.
Example ¶
package main
import (
"context"
"fmt"
"github.com/bartventer/conduit"
)
// main wraps a three-value stream with OrDone and prints everything the
// wrapped stream emits.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	source := conduit.From(ctx, 1, 2, 3)
	for n := range conduit.OrDone(ctx, source) {
		fmt.Println(n)
	}
}
Output: 1 2 3
func Repeat ¶
func Repeat[T any](ctx context.Context, fn func(context.Context) T) <-chan T
Repeat returns a channel that emits values by repeatedly calling fn(ctx) until the context is canceled. Useful for generating infinite streams.
Example ¶
package main
import (
"context"
"fmt"
"github.com/bartventer/conduit"
)
// main prints the first three values of a stream that Repeat produces by
// calling a function that always returns 42.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	answer := func(context.Context) int { return 42 }
	for n := range conduit.Take(ctx, conduit.Repeat(ctx, answer), 3) {
		fmt.Println(n)
	}
}
Output: 42 42 42
func Skip ¶
func Skip[T any](ctx context.Context, stream <-chan T, eq func(context.Context, T) bool) <-chan T
Skip returns a channel that emits values from the input stream for which eq(ctx, v) returns false. If eq is nil, the input stream is returned unchanged.
Example ¶
package main
import (
"context"
"fmt"
"github.com/bartventer/conduit"
)
// main removes the even numbers from a stream of 1..5 with Skip and prints
// the values that remain.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// isEven is the Skip predicate; values it reports true for are dropped.
	isEven := func(_ context.Context, n int) bool {
		return n%2 == 0
	}

	numbers := conduit.From(ctx, 1, 2, 3, 4, 5)
	for n := range conduit.Skip(ctx, numbers, isEven) {
		fmt.Println(n)
	}
}
Output: 1 3 5
func SkipN ¶
func SkipN[T any](ctx context.Context, stream <-chan T, n uint) <-chan T
SkipN returns a channel that skips the first n values from the input stream, then emits the rest. If n is 0, the input stream is returned unchanged.
Example ¶
package main
import (
"context"
"fmt"
"github.com/bartventer/conduit"
)
// main prints what is left of a five-value stream after SkipN drops its
// first two values.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	numbers := conduit.From(ctx, 1, 2, 3, 4, 5)
	for n := range conduit.SkipN(ctx, numbers, 2) {
		fmt.Println(n)
	}
}
Output: 3 4 5
func Take ¶
func Take[T any](ctx context.Context, stream <-chan T, n uint) <-chan T
Take returns a channel that emits the first n values from the input stream, or fewer if the stream closes or the context is canceled.
Example ¶
package main
import (
"context"
"fmt"
"github.com/bartventer/conduit"
)
// main prints the first three values of a five-value stream using Take.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	numbers := conduit.From(ctx, 1, 2, 3, 4, 5)
	for n := range conduit.Take(ctx, numbers, 3) {
		fmt.Println(n)
	}
}
Output: 1 2 3
func Tee ¶
func Tee[T any](ctx context.Context, stream <-chan T) (_, _ <-chan T)
Tee returns two channels that each emit the same values as the input stream.
Example ¶
package main
import (
"context"
"fmt"
"sync"
"github.com/bartventer/conduit"
)
// main duplicates a three-value stream with Tee and drains both copies
// concurrently, printing each value with a prefix naming its output.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	left, right := conduit.Tee(ctx, conduit.From(ctx, 1, 2, 3))

	var wg sync.WaitGroup
	// drain prints every value received from ch with the given format and
	// marks one unit of work done on wg once ch is closed.
	drain := func(format string, ch <-chan int) {
		defer wg.Done()
		for n := range ch {
			fmt.Printf(format, n)
		}
	}

	// Both outputs are consumed at the same time, each in its own goroutine.
	wg.Add(2)
	go drain("out1: %d\n", left)
	go drain("out2: %d\n", right)
	wg.Wait()
}
Output (unordered, because the two consumer goroutines run concurrently): out1: 1 out1: 2 out1: 3 out2: 1 out2: 2 out2: 3
Types ¶
This section is empty.