goroutine_util

package module
v1.0.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 21, 2025 License: MulanPSL-2.0 Imports: 10 Imported by: 3

README

说明

Go 协程工具函数库

codecov

使用

go get gitee.com/ivfzhou/goroutine-util@latest
// NewRunner 提供同时最多运行 max 个协程运行 fn,一旦 fn 发生错误或恐慌便终止运行。
//
// ctx:上下文,终止时,停止所有 fn 运行,并在调用 add 或 wait 时返回 ctx.Err()。
//
// max:表示最多 max 个协程运行 fn。小于等于零表示不限制协程数。
//
// fn:为要运行的函数。入参 T 由 add 函数提供。若 fn 发生错误或恐慌,将终止所有 fn 运行。恐慌将被恢复,并以错误形式返回,在调用 add 或 wait 时返回。
//
// add:为 fn 提供 T,每一个 T 只会被一个 fn 运行一次。block 为真时,当某一时刻运行 fn 数量达到 max 时,阻塞当前协程添加 T。反之,不阻塞当前协程。
//
// wait:阻塞当前协程,等待运行完毕。fastExit 为真时,运行发生错误时,立即返回该错误。反之,表示等待所有 fn 都终止后再返回。
//
// add 函数返回的错误不为空时,与 fn 返回的第一个错误一致,且与 wait 函数返回的错误为同一个。
//
// 注意:若 fn 为空,将触发恐慌。
//
// 注意:在 add 完所有 T 后再调用 wait,否则触发恐慌,返回 ErrCallAddAfterWait。
//
// 注意:调用了 add 后,请务必调用 wait,除非 add 返回了错误。
func NewRunner[T any](ctx context.Context, max int, fn func(context.Context, T) error) (
    add func(t T, block bool) error, wait func(fastExit bool) error)

// NewPipelineRunner 创建流水线工作模型。
//
// ctx:上下文。当上下文终止时,将终止流水线运行,push 将返回假。
//
// steps:任务 T 依次在 steps 中运行。step 返回真表示将 T 传递给下一个 step 处理,否则不传递。
//
// push:向 step 中传递一个 T 处理,返回真表示添加成功。
//
// successCh:成功运行完所有 step 的 T 将从该通道发出。
//
// endPush:表示不再有 T 需要处理,push 将返回假。
func NewPipelineRunner[T any](ctx context.Context, steps ...func(context.Context, T) bool) (
    push func(T) bool, successCh <-chan T, endPush func())

// RunData 将 jobs 传给 fn,并发运行 fn。若发生错误,就立刻终止运行。
//
// ctx:上下文。如果上下文终止了,则终止运行,并返回 ctx.Err()。
//
// fn:要运行处理 jobs 的函数。
//
// fastExit:发生错误就立刻返回,不等待所有协程全部退出。
//
// jobs:要处理的任务。
//
// 注意:fn 为空将触发恐慌。
func RunData[T any](ctx context.Context, fn func(context.Context, T) error, fastExit bool, jobs ...T) error

// RunPipeline 将每个 jobs 依次递给 steps 函数处理。一旦某个 step 发生错误或恐慌,就结束处理,返回错误。
// 一个 job 最多在一个 step 中运行一次,且一个 job 一定是依次序递给 steps,前一个 step 处理完毕才会给下一个 step 处理。
// 每个 step 并发运行 jobs。
//
// ctx:上下文,上下文终止时,将终止所有 steps 运行,并在 errCh 中返回 ctx.Err()。
//
// stopWhenErr:当 step 发生错误时,是否继续处理 job。当为假时,只会停止将 job 往下一个 step 传递,不会终止运行。
//
// steps:处理 jobs 的函数。返回错误意味着终止运行。
//
// successCh:经所有 steps 处理成功 job 将从该通道发出。
//
// errCh:运行中的错误从该通道发出。
//
// 注意:若 steps 中含有空元素将会恐慌。
func RunPipeline[T any](ctx context.Context, jobs []T, stopWhenErr bool, steps ...func(context.Context, T) error) (
    successCh <-chan T, errCh <-chan error)

// RunPeriodically 运行 fn,每个 fn 之间至少间隔 period 时间。前一个 fn 运行完毕到下一个 fn 开始运行之间的间隔时间。
//
// period:每个 fn 运行至少间隔时间。
//
// 注意:若 period 为负数将会触发恐慌。
func RunPeriodically(period time.Duration) (run func(fn func()))

联系作者

电邮:[email protected]
微信:h899123

Documentation

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrCallAddAfterWait = errors.New("cannot call add after wait")

ErrCallAddAfterWait 在 NewRunner 中,如果调用 wait 后,再调用 add 时,将会返回。

View Source
var RunPipelineCacheJobs = 10

Functions

func ListenChan

func ListenChan[T any](chans ...<-chan T) (ch <-chan T)

ListenChan 监听 chans,一旦有一个 chan 激活便立即将 T 发送给 sendCh,并关闭 sendCh。 若所有 chans 都未曾激活(chan 是空和关闭认为未激活),则 sendCh 被关闭。 若同时多个 chan 被激活,则随机将一个激活值发送给 sendCh。

chans:要监听的通道。

sendCh:接收到的 T 从该通道发出。

func NewPipelineRunner

func NewPipelineRunner[T any](ctx context.Context, steps ...func(context.Context, T) bool) (
	push func(T) bool, successCh <-chan T, endPush func())

NewPipelineRunner 创建流水线工作模型。

ctx:上下文。当上下文终止时,将终止流水线运行,push 将返回假。

steps:任务 T 依次在 steps 中运行。step 返回真表示将 T 传递给下一个 step 处理,否则不传递。

push:向 step 中传递一个 T 处理,返回真表示添加成功。

successCh:成功运行完所有 step 的 T 将从该通道发出。

endPush:表示不再有 T 需要处理,push 将返回假。

Example
package main

import (
	"context"

	gu "gitee.com/ivfzhou/goroutine-util"
)

func main() {
	type job struct{}
	ctx := context.Background()

	step1 := func(context.Context, *job) bool { return true }
	step2 := func(context.Context, *job) bool { return true }
	step3 := func(context.Context, *job) bool { return true }
	push, successCh, endPush := gu.NewPipelineRunner(ctx, step1, step2, step3)

	// 将任务数据推送进去处理。
	go func() {
		jobs := make([]*job, 100)
		for i := range jobs {
			push(jobs[i])
		}
		endPush() // 结束推送。
	}()

	// 获取处理成功的任务。
	for v := range successCh {
		_ = v
	}
}

func NewRunner

func NewRunner[T any](ctx context.Context, max int, fn func(context.Context, T) error) (
	add func(t T, block bool) error, wait func(fastExit bool) error)

NewRunner 提供同时最多运行 max 个协程运行 fn,一旦 fn 发生错误或恐慌便终止运行。

ctx:上下文,终止时,停止所有 fn 运行,并在调用 add 或 wait 时返回 ctx.Err()。

max:表示最多 max 个协程运行 fn。小于等于零表示不限制协程数。

fn:为要运行的函数。入参 T 由 add 函数提供。若 fn 发生错误或恐慌,将终止所有 fn 运行。恐慌将被恢复,并以错误形式返回,在调用 add 或 wait 时返回。

add:为 fn 提供 T,每一个 T 只会被一个 fn 运行一次。block 为真时,当某一时刻运行 fn 数量达到 max 时,阻塞当前协程添加 T。反之,不阻塞当前协程。

wait:阻塞当前协程,等待运行完毕。fastExit 为真时,运行发生错误时,立即返回该错误。反之,表示等待所有 fn 都终止后再返回。

add 函数返回的错误不为空时,与 fn 返回的第一个错误一致,且与 wait 函数返回的错误为同一个。

注意:若 fn 为空,将触发恐慌。

注意:在 add 完所有 T 后再调用 wait,否则触发恐慌,返回 ErrCallAddAfterWait。

注意:调用了 add 后,请务必调用 wait,除非 add 返回了错误。

Example
package main

import (
	"context"

	gu "gitee.com/ivfzhou/goroutine-util"
)

func main() {
	type product struct{}
	ctx := context.Background()
	op := func(ctx context.Context, data *product) error {
		// 要处理的任务逻辑。
		return nil
	}

	add, wait := gu.NewRunner[*product](ctx, 12, op)

	// 将任务要处理的数据传递给任务处理逻辑 op。
	var projects []*product
	for _, v := range projects {
		if err := add(v, true); err != nil {
			// 发生错误可能会提前预知。
		}
	}

	// 等待所有数据处理完毕。
	if err := wait(true); err != nil {
		// 处理 op 返回的错误。
	}
}

func RunConcurrently

func RunConcurrently(ctx context.Context, fn ...func(context.Context) error) (wait func(fastExit bool) error)

RunConcurrently 并发运行 fn,一旦有错误发生,终止运行。

ctx:上下文,终止时,将导致 fn 终止运行,并在调用 wait 时返回 ctx.Err()。

fn:要并发运行的函数。触发的恐慌将被恢复,并在调用 wait 时,以错误形式返回。

wait:等待所有 fn 运行完毕。阻塞调用者协程,若 fastExit 为真,则一旦 fn 中发生了错误,立刻返回该错误。

注意:fn 是空将触发恐慌。

Example
package main

import (
	"context"

	gu "gitee.com/ivfzhou/goroutine-util"
)

func main() {
	ctx := context.Background()

	work1 := func(ctx context.Context) error {
		// 编写你的业务逻辑。
		return nil
	}

	work2 := func(ctx context.Context) error {
		// 编写你的业务逻辑。
		return nil
	}

	err := gu.RunConcurrently(ctx, work1, work2)(true)
	if err != nil {
		// 处理发生的错误。
		return
	}
}

func RunData

func RunData[T any](ctx context.Context, fn func(context.Context, T) error, fastExit bool, jobs ...T) error

RunData 将 jobs 传给 fn,并发运行 fn。若发生错误,就立刻终止运行。

ctx:上下文。如果上下文终止了,则终止运行,并返回 ctx.Err()。

fn:要运行处理 jobs 的函数。

fastExit:发生错误就立刻返回,不等待所有协程全部退出。

jobs:要处理的任务。

注意:fn 为空将触发恐慌。

func RunPeriodically

func RunPeriodically(period time.Duration) (run func(fn func()))

RunPeriodically 运行 fn,每个 fn 之间至少间隔 period 时间。前一个 fn 运行完毕到下一个 fn 开始运行之间的间隔时间。

period:每个 fn 运行至少间隔时间。

注意:若 period 为负数将会触发恐慌。

func RunPipeline

func RunPipeline[T any](ctx context.Context, jobs []T, stopWhenErr bool, steps ...func(context.Context, T) error) (
	successCh <-chan T, errCh <-chan error)

RunPipeline 将每个 jobs 依次递给 steps 函数处理。一旦某个 step 发生错误或恐慌,就结束处理,返回错误。 一个 job 最多在一个 step 中运行一次,且一个 job 一定是依次序递给 steps,前一个 step 处理完毕才会给下一个 step 处理。 每个 step 并发运行 jobs。

ctx:上下文,上下文终止时,将终止所有 steps 运行,并在 errCh 中返回 ctx.Err()。

stopWhenErr:当 step 发生错误时,是否继续处理 job。当为假时,只会停止将 job 往下一个 step 传递,不会终止运行。

steps:处理 jobs 的函数。返回错误意味着终止运行。

successCh:经所有 steps 处理成功 job 将从该通道发出。

errCh:运行中的错误从该通道发出。

注意:若 steps 中含有空元素将会恐慌。

Example
package main

import (
	"context"

	gu "gitee.com/ivfzhou/goroutine-util"
)

func main() {
	type data struct{}
	ctx := context.Background()

	jobs := []*data{{}, {}}
	work1 := func(ctx context.Context, d *data) error { return nil }
	work2 := func(ctx context.Context, d *data) error { return nil }

	successCh, errCh := gu.RunPipeline(ctx, jobs, false, work1, work2)
	for {
		if successCh == nil && errCh == nil {
			break
		}
		select {
		case _, ok := <-successCh:
			if !ok {
				successCh = nil
				continue
			}
			// 处理成功的 jobPtr。
		case _, ok := <-errCh:
			if !ok {
				errCh = nil
				continue
			}
			// 处理错误。
		}
	}
}

func RunSequentially

func RunSequentially(ctx context.Context, fn ...func(context.Context) error) error

RunSequentially 依次运行 fn,当有错误发生时停止后续 fn 运行。

ctx:上下文,终止时,将导致后续 fn 不再运行,并返回 ctx.Err()。

fn:要运行的函数,可以安全的添加空 fn。发生恐慌将被恢复,并以错误形式返回。

error:返回的错误与 fn 返回的错误一致。

Example
package main

import (
	"context"

	gu "gitee.com/ivfzhou/goroutine-util"
)

func main() {
	ctx := context.Background()
	first := func(context.Context) error { return nil }
	then := func(context.Context) error { return nil }
	last := func(context.Context) error { return nil }
	err := gu.RunSequentially(ctx, first, then, last)
	if err != nil {
		// 处理错误。
	}
}

func WrapError added in v1.0.3

func WrapError(err error) error

WrapError 包裹错误信息。

Types

type AtomicError added in v1.0.2

type AtomicError struct {
	// contains filtered or unexported fields
}

AtomicError 原子性读取和设置错误信息。

func (*AtomicError) Get added in v1.0.2

func (e *AtomicError) Get() error

Get 获取内部错误信息。

func (*AtomicError) HasSet added in v1.0.3

func (e *AtomicError) HasSet() bool

HasSet 是否设置了错误信息。

func (*AtomicError) Set added in v1.0.2

func (e *AtomicError) Set(err error) bool

Set 设置错误信息,除非 err 是空。返回真表示设置成功。

type Error added in v1.0.3

type Error struct {
	// contains filtered or unexported fields
}

Error 错误对象。

func (*Error) Error added in v1.0.3

func (e *Error) Error() string

Error 接口实现。

func (*Error) String added in v1.0.3

func (e *Error) String() string

String 接口实现。

func (*Error) Unwrap added in v1.0.3

func (e *Error) Unwrap() error

Unwrap 接口实现。

type Queue

type Queue[E any] struct {
	// contains filtered or unexported fields
}

Queue 数据队列。

func (*Queue[E]) Close

func (q *Queue[E]) Close()

Close 从 GetFromChan 中获取的 chan 将被关闭。

func (*Queue[E]) GetFromChan

func (q *Queue[E]) GetFromChan() <-chan E

GetFromChan 获取队列头元素。

func (*Queue[E]) Push

func (q *Queue[E]) Push(elem E) bool

Push 向队列尾部加元素,如果队列关闭则不会加元素。

bool:添加成功返回 true。

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL