conduit

v0.1.0

Published: Jun 20, 2025 | License: Apache-2.0 | Imports: 3 | Imported by: 0

README

conduit

conduit provides utilities for building composable, channel-based data streams and pipelines in Go.

Inspired by patterns from Concurrency in Go, by Katherine Cox-Buday, conduit offers generic combinators and helpers for constructing, transforming, and consuming streams of data. All functions are context-aware and designed to prevent goroutine leaks.

Features

  • Create streams: From, FromSeq, FromSeq2, Repeat
  • Transform/filter: Map, Skip, SkipN, Take, First
  • Combine/split: FanIn, FanOut, Tee, Bridge, ChanChan
  • Safe consumption: OrDone

Example

package main

import (
    "context"
    "fmt"
    "math/rand/v2"
    "time"

    "github.com/bartventer/conduit"
)

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()

    // Generate an infinite stream of random numbers
    randStream := conduit.Repeat(ctx, func(context.Context) int {
        return rand.IntN(100)
    })

    // Filter out even numbers
    odds := conduit.Skip(ctx, randStream, func(_ context.Context, v int) bool {
        return v%2 == 0
    })

    // Map to string with formatting
    formatted := conduit.Map(ctx, odds, func(_ context.Context, v int) string {
        return fmt.Sprintf("odd: %d", v)
    })

    // Duplicate the stream
    out1, out2 := conduit.Tee(ctx, formatted)

    // Fan-in the two streams back together
    merged := conduit.FanIn(ctx, out1, out2)

    // Take the first 10 values from the merged stream
    for v := range conduit.Take(ctx, merged, 10) {
        fmt.Println(v)
    }
}

Documentation

See GoDoc for full API documentation and examples.

Installation

go get github.com/bartventer/conduit

License

This project is licensed under the Apache License 2.0. See the LICENSE file for details.


Contributions and feedback welcome!

Documentation

Overview

Package conduit provides utilities for building composable, channel-based data streams and pipelines in Go.

Conduit offers a set of generic combinators and helpers for constructing, transforming, and consuming streams of data. The API is inspired by patterns from "Concurrency in Go", by Katherine Cox-Buday, and uses Go generics and context-based cancellation throughout.

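Features include:

  • Create streams: From, FromSeq, FromSeq2, Repeat
  • Transform/filter: Map, Skip, SkipN, Take, First
  • Combine/split: FanIn, FanOut, Tee, Bridge, ChanChan
  • Safe consumption: OrDone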

All functions are context-aware and designed to prevent goroutine leaks.

Example (Pipeline)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

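// Generate an infinite stream of random numbers.
// genInt is not defined in this snippet; to satisfy Repeat it must be a
// func(context.Context) int.
randStream := conduit.Repeat(ctx, genInt)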

// Filter out even numbers
odds := conduit.Skip(ctx, randStream, func(_ context.Context, v int) bool {
	return v%2 == 0
})

// Map to string with formatting
formatted := conduit.Map(ctx, odds, func(_ context.Context, v int) string {
	return fmt.Sprintf("odd: %d", v)
})

// Duplicate the stream
out1, out2 := conduit.Tee(ctx, formatted)

// Fan-in the two streams back together
merged := conduit.FanIn(ctx, out1, out2)

// Take the first 10 values from the merged stream
for v := range conduit.Take(ctx, merged, 10) {
	fmt.Println(v)
}
Output (one possible run):

odd: 5
odd: 13
odd: 5
odd: 13
odd: 49
odd: 49
odd: 65
odd: 65
odd: 45
odd: 45

Functions

func Bridge

func Bridge[T any](ctx context.Context, chStream <-chan <-chan T) <-chan T

Bridge returns a channel that emits values from a stream of inner channels in order. See ChanChan for creating a channel of channels.

Example
package main

import (
	"context"
	"fmt"

	"github.com/bartventer/conduit"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	chStream := conduit.ChanChan(ctx, 3, func(_ context.Context, i uint) int {
		return int(i + 1)
	})
	out := conduit.Bridge(ctx, chStream)
	for v := range out {
		fmt.Println(v)
	}
}
Output:

1
2
3

func ChanChan

func ChanChan[T any](ctx context.Context, n uint, fn func(ctx context.Context, index uint) T) <-chan <-chan T

ChanChan returns a channel that emits n inner channels. Inner channel i carries the value returned by fn(ctx, i), for i from 0 to n-1. See Bridge for consuming a channel of channels.

Example
package main

import (
	"context"
	"fmt"

	"github.com/bartventer/conduit"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	chStream := conduit.ChanChan(ctx, 2, func(_ context.Context, i uint) int {
		return int(i * 100)
	})
	for ch := range chStream {
		for v := range ch {
			fmt.Println(v)
		}
	}
}
Output:

0
100

func FanIn

func FanIn[T any](ctx context.Context, streams ...<-chan T) <-chan T

FanIn returns a channel that emits values from multiple input channels. The order of values is not preserved. See FanOut for creating multiple input channels. To preserve order, use Bridge with ChanChan.

Example
package main

import (
	"context"
	"fmt"

	"github.com/bartventer/conduit"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	streams := conduit.FanOut(ctx, 3, func(c context.Context, index uint) <-chan int {
		return conduit.Take(c, conduit.Repeat(c, func(ctx context.Context) int {
			return int(index * 10)
		}), 3)
	})
	out := conduit.FanIn(ctx, streams...)
	for v := range conduit.Take(ctx, out, 9) {
		fmt.Println(v)
	}
}
Output (order may vary):

0
0
0
10
10
10
20
20
20

func FanOut

func FanOut[T any](ctx context.Context, n uint, fn func(c context.Context, index uint) <-chan T) []<-chan T

FanOut returns a slice of channels where each channel is produced by calling fn(ctx, index) for each index from 0 to n-1. See FanIn for consuming a slice of channels.

Example
package main

import (
	"context"
	"fmt"

	"github.com/bartventer/conduit"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	streams := conduit.FanOut(ctx, 2, func(_ context.Context, i uint) <-chan int {
		return conduit.From(ctx, int(i), int(i+10))
	})
	for i, ch := range streams {
		fmt.Printf("stream %d:\n", i)
		for v := range ch {
			fmt.Println(v)
		}
	}
}
Output:

stream 0:
0
10
stream 1:
1
11

func First

func First[T any](ctx context.Context, stream <-chan T) <-chan T

First returns a channel that emits the first value from the input stream. If the stream closes before producing a value, or the context is canceled first, the output channel is closed without emitting anything.

Example
package main

import (
	"context"
	"fmt"

	"github.com/bartventer/conduit"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	stream := conduit.From(ctx, 10, 20, 30)
	out := conduit.First(ctx, stream)
	for v := range out {
		fmt.Println(v)
	}
}
Output:

10

func From

func From[T any](ctx context.Context, values ...T) <-chan T

From returns a channel that emits the provided values.

Example
package main

import (
	"context"
	"fmt"

	"github.com/bartventer/conduit"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	out := conduit.From(ctx, 7, 8, 9)
	for v := range out {
		fmt.Println(v)
	}
}
Output:

7
8
9

func FromSeq

func FromSeq[V any](ctx context.Context, seq iter.Seq[V]) <-chan V

FromSeq returns a channel that emits values from the provided sequence.

Example
package main

import (
	"context"
	"fmt"
	"slices"

	"github.com/bartventer/conduit"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	out := conduit.FromSeq(ctx, slices.Values([]int{4, 5, 6}))
	for v := range out {
		fmt.Println(v)
	}
}
Output:

4
5
6

func FromSeq2

func FromSeq2[K, V any](ctx context.Context, seq iter.Seq2[K, V]) <-chan V

FromSeq2 returns a channel that emits values from the key-value pairs in the provided sequence.

Example
package main

import (
	"context"
	"fmt"
	"slices"

	"github.com/bartventer/conduit"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	out := conduit.FromSeq2(ctx, slices.All([]int{1, 2, 3}))
	for v := range out {
		fmt.Println(v)
	}
}
Output:

1
2
3

func Map

func Map[T, U any](ctx context.Context, stream <-chan T, fn func(context.Context, T) U) <-chan U

Map returns a channel that emits the results of applying fn to each value from the input stream.

Example
package main

import (
	"context"
	"fmt"

	"github.com/bartventer/conduit"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	stream := conduit.From(ctx, 1, 2, 3)
	out := conduit.Map(ctx, stream, func(_ context.Context, v int) string {
		return fmt.Sprintf("num:%d", v)
	})
	for v := range out {
		fmt.Println(v)
	}
}
Output:

num:1
num:2
num:3

func OrDone

func OrDone[T any](ctx context.Context, stream <-chan T) <-chan T

OrDone returns a channel that emits values from the input stream until it is closed or the context is canceled. Useful for preventing goroutine leaks when consuming from possibly-blocking or shared channels.

Example
package main

import (
	"context"
	"fmt"

	"github.com/bartventer/conduit"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	stream := conduit.From(ctx, 1, 2, 3)
	out := conduit.OrDone(ctx, stream)
	for v := range out {
		fmt.Println(v)
	}
}
Output:

1
2
3

func Repeat

func Repeat[T any](ctx context.Context, fn func(context.Context) T) <-chan T

Repeat returns a channel that emits values by repeatedly calling fn(ctx) until the context is canceled. Useful for generating infinite streams.

Example
package main

import (
	"context"
	"fmt"

	"github.com/bartventer/conduit"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	out := conduit.Take(ctx, conduit.Repeat(ctx, func(context.Context) int { return 42 }), 3)
	for v := range out {
		fmt.Println(v)
	}
}
Output:

42
42
42

func Skip

func Skip[T any](ctx context.Context, stream <-chan T, eq func(context.Context, T) bool) <-chan T

Skip returns a channel that emits values from the input stream for which eq(ctx, v) returns false. If eq is nil, the input stream is returned unchanged.

Example
package main

import (
	"context"
	"fmt"

	"github.com/bartventer/conduit"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	stream := conduit.From(ctx, 1, 2, 3, 4, 5)
	out := conduit.Skip(ctx, stream, func(_ context.Context, v int) bool {
		return v%2 == 0
	})
	for v := range out {
		fmt.Println(v)
	}
}
Output:

1
3
5

func SkipN

func SkipN[T any](ctx context.Context, stream <-chan T, n uint) <-chan T

SkipN returns a channel that skips the first n values from the input stream, then emits the rest. If n is 0, the input stream is returned unchanged.

Example
package main

import (
	"context"
	"fmt"

	"github.com/bartventer/conduit"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	stream := conduit.From(ctx, 1, 2, 3, 4, 5)
	out := conduit.SkipN(ctx, stream, 2)
	for v := range out {
		fmt.Println(v)
	}
}
Output:

3
4
5

func Take

func Take[T any](ctx context.Context, stream <-chan T, n uint) <-chan T

Take returns a channel that emits the first n values from the input stream, or fewer if the stream closes or the context is canceled.

Example
package main

import (
	"context"
	"fmt"

	"github.com/bartventer/conduit"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	stream := conduit.From(ctx, 1, 2, 3, 4, 5)
	out := conduit.Take(ctx, stream, 3)
	for v := range out {
		fmt.Println(v)
	}
}
Output:

1
2
3

func Tee

func Tee[T any](ctx context.Context, stream <-chan T) (_, _ <-chan T)

Tee returns two channels that each emit the same values as the input stream.

Example
package main

import (
	"context"
	"fmt"
	"sync"

	"github.com/bartventer/conduit"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	stream := conduit.From(ctx, 1, 2, 3)
	out1, out2 := conduit.Tee(ctx, stream)
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		for v1 := range out1 {
			fmt.Printf("out1: %d\n", v1)
		}
	}()

	go func() {
		defer wg.Done()
		for v2 := range out2 {
			fmt.Printf("out2: %d\n", v2)
		}
	}()
	wg.Wait()
}
Output (interleaving of out1 and out2 lines may vary):

out1: 1
out1: 2
out1: 3
out2: 1
out2: 2
out2: 3
