Documentation
¶
Index ¶
- Variables
- func ListenChan[T any](chans ...<-chan T) (ch <-chan T)
- func NewPipelineRunner[T any](ctx context.Context, steps ...func(context.Context, T) bool) (push func(T) bool, successCh <-chan T, endPush func())
- func NewRunner[T any](ctx context.Context, max int, fn func(context.Context, T) error) (add func(t T, block bool) error, wait func(fastExit bool) error)
- func RunConcurrently(ctx context.Context, fn ...func(context.Context) error) (wait func(fastExit bool) error)
- func RunData[T any](ctx context.Context, fn func(context.Context, T) error, fastExit bool, ...) error
- func RunPeriodically(period time.Duration) (run func(fn func()))
- func RunPipeline[T any](ctx context.Context, jobs []T, stopWhenErr bool, ...) (successCh <-chan T, errCh <-chan error)
- func RunSequentially(ctx context.Context, fn ...func(context.Context) error) error
- func WrapError(err error) error
- type AtomicError
- type Error
- type Queue
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ErrCallAddAfterWait = errors.New("cannot call add after wait")
ErrCallAddAfterWait 在 NewRunner 中,如果调用 wait 后,再调用 add 时,将会返回。
var RunPipelineCacheJobs = 10
Functions ¶
func ListenChan ¶
func ListenChan[T any](chans ...<-chan T) (ch <-chan T)
ListenChan 监听 chans,一旦有一个 chan 激活便立即将 T 发送给 sendCh,并关闭 sendCh。 若所有 chans 都未曾激活(chan 是空和关闭认为未激活),则 sendCh 被关闭。 若同时多个 chan 被激活,则随机将一个激活值发送给 sendCh。
chans:要监听的通道。
sendCh:接收到的 T 从该通道发出。
func NewPipelineRunner ¶
func NewPipelineRunner[T any](ctx context.Context, steps ...func(context.Context, T) bool) ( push func(T) bool, successCh <-chan T, endPush func())
NewPipelineRunner 创建流水线工作模型。
ctx:上下文。当上下文终止时,将终止流水线运行,push 将返回假。
steps:任务 T 依次在 steps 中运行。step 返回真表示将 T 传递给下一个 step 处理,否则不传递。
push:向 step 中传递一个 T 处理,返回真表示添加成功。
successCh:成功运行完所有 step 的 T 将从该通道发出。
endPush:表示不再有 T 需要处理,push 将返回假。
Example ¶
package main
import (
"context"
gu "gitee.com/ivfzhou/goroutine-util"
)
func main() {
type job struct{}
ctx := context.Background()
step1 := func(context.Context, *job) bool { return true }
step2 := func(context.Context, *job) bool { return true }
step3 := func(context.Context, *job) bool { return true }
push, successCh, endPush := gu.NewPipelineRunner(ctx, step1, step2, step3)
// 将任务数据推送进去处理。
go func() {
jobs := make([]*job, 100)
for i := range jobs {
push(jobs[i])
}
endPush() // 结束推送。
}()
// 获取处理成功的任务。
for v := range successCh {
_ = v
}
}
func NewRunner ¶
func NewRunner[T any](ctx context.Context, max int, fn func(context.Context, T) error) ( add func(t T, block bool) error, wait func(fastExit bool) error)
NewRunner 提供同时最多运行 max 个协程运行 fn,一旦 fn 发生错误或恐慌便终止运行。
ctx:上下文,终止时,停止所有 fn 运行,并在调用 add 或 wait 时返回 ctx.Err()。
max:表示最多 max 个协程运行 fn。小于等于零表示不限制协程数。
fn:为要运行的函数。入参 T 由 add 函数提供。若 fn 发生错误或恐慌,将终止所有 fn 运行。恐慌将被恢复,并以错误形式返回,在调用 add 或 wait 时返回。
add:为 fn 提供 T,每一个 T 只会被一个 fn 运行一次。block 为真时,当某一时刻运行 fn 数量达到 max 时,阻塞当前协程添加 T。反之,不阻塞当前协程。
wait:阻塞当前协程,等待运行完毕。fastExit 为真时,运行发生错误时,立即返回该错误。反之,表示等待所有 fn 都终止后再返回。
add 函数返回的错误不为空时,与 fn 返回的第一个错误一致,且与 wait 函数返回的错误为同一个。
注意:若 fn 为空,将触发恐慌。
注意:在 add 完所有 T 后再调用 wait,否则触发恐慌,返回 ErrCallAddAfterWait。
注意:调用了 add 后,请务必调用 wait,除非 add 返回了错误。
Example ¶
package main
import (
"context"
gu "gitee.com/ivfzhou/goroutine-util"
)
func main() {
type product struct{}
ctx := context.Background()
op := func(ctx context.Context, data *product) error {
// 要处理的任务逻辑。
return nil
}
add, wait := gu.NewRunner[*product](ctx, 12, op)
// 将任务要处理的数据传递给任务处理逻辑 op。
var projects []*product
for _, v := range projects {
if err := add(v, true); err != nil {
// 发生错误可能会提前预知。
}
}
// 等待所有数据处理完毕。
if err := wait(true); err != nil {
// 处理 op 返回的错误。
}
}
func RunConcurrently ¶
func RunConcurrently(ctx context.Context, fn ...func(context.Context) error) (wait func(fastExit bool) error)
RunConcurrently 并发运行 fn,一旦有错误发生,终止运行。
ctx:上下文,终止时,将导致 fn 终止运行,并在调用 wait 时返回 ctx.Err()。
fn:要并发运行的函数。触发的恐慌将被恢复,并在调用 wait 时,以错误形式返回。
wait:等待所有 fn 运行完毕。阻塞调用者协程,若 fastExit 为真,则一旦 fn 中发生了错误,立刻返回该错误。
注意:fn 是空将触发恐慌。
Example ¶
package main
import (
"context"
gu "gitee.com/ivfzhou/goroutine-util"
)
func main() {
ctx := context.Background()
work1 := func(ctx context.Context) error {
// 编写你的业务逻辑。
return nil
}
work2 := func(ctx context.Context) error {
// 编写你的业务逻辑。
return nil
}
err := gu.RunConcurrently(ctx, work1, work2)(true)
if err != nil {
// 处理发生的错误。
return
}
}
func RunData ¶
func RunData[T any](ctx context.Context, fn func(context.Context, T) error, fastExit bool, jobs ...T) error
RunData 将 jobs 传给 fn,并发运行 fn。若发生错误,就立刻终止运行。
ctx:上下文。如果上下文终止了,则终止运行,并返回 ctx.Err()。
fn:要运行处理 jobs 的函数。
fastExit:发生错误就立刻返回,不等待所有协程全部退出。
jobs:要处理的任务。
注意:fn 为空将触发恐慌。
func RunPeriodically ¶
RunPeriodically 运行 fn,每个 fn 之间至少间隔 period 时间。前一个 fn 运行完毕到下一个 fn 开始运行之间的间隔时间。
period:每个 fn 运行至少间隔时间。
注意:若 period 为负数将会触发恐慌。
func RunPipeline ¶
func RunPipeline[T any](ctx context.Context, jobs []T, stopWhenErr bool, steps ...func(context.Context, T) error) ( successCh <-chan T, errCh <-chan error)
RunPipeline 将每个 jobs 依次递给 steps 函数处理。一旦某个 step 发生错误或恐慌,就结束处理,返回错误。 一个 job 最多在一个 step 中运行一次,且一个 job 一定是依次序递给 steps,前一个 step 处理完毕才会给下一个 step 处理。 每个 step 并发运行 jobs。
ctx:上下文,上下文终止时,将终止所有 steps 运行,并在 errCh 中返回 ctx.Err()。
stopWhenErr:当 step 发生错误时,是否继续处理 job。当为假时,只会停止将 job 往下一个 step 传递,不会终止运行。
steps:处理 jobs 的函数。返回错误意味着终止运行。
successCh:经所有 steps 处理成功 job 将从该通道发出。
errCh:运行中的错误从该通道发出。
注意:若 steps 中含有空元素将会恐慌。
Example ¶
package main
import (
"context"
gu "gitee.com/ivfzhou/goroutine-util"
)
func main() {
type data struct{}
ctx := context.Background()
jobs := []*data{{}, {}}
work1 := func(ctx context.Context, d *data) error { return nil }
work2 := func(ctx context.Context, d *data) error { return nil }
successCh, errCh := gu.RunPipeline(ctx, jobs, false, work1, work2)
for {
if successCh == nil && errCh == nil {
break
}
select {
case _, ok := <-successCh:
if !ok {
successCh = nil
continue
}
// 处理成功的 jobPtr。
case _, ok := <-errCh:
if !ok {
errCh = nil
continue
}
// 处理错误。
}
}
}
func RunSequentially ¶
RunSequentially 依次运行 fn,当有错误发生时停止后续 fn 运行。
ctx:上下文,终止时,将导致后续 fn 不再运行,并返回 ctx.Err()。
fn:要运行的函数,可以安全的添加空 fn。发生恐慌将被恢复,并以错误形式返回。
error:返回的错误与 fn 返回的错误一致。
Example ¶
package main
import (
"context"
gu "gitee.com/ivfzhou/goroutine-util"
)
func main() {
ctx := context.Background()
first := func(context.Context) error { return nil }
then := func(context.Context) error { return nil }
last := func(context.Context) error { return nil }
err := gu.RunSequentially(ctx, first, then, last)
if err != nil {
// 处理错误。
}
}
Types ¶
type AtomicError ¶ added in v1.0.2
type AtomicError struct {
// contains filtered or unexported fields
}
AtomicError 原子性读取和设置错误信息。
func (*AtomicError) Set ¶ added in v1.0.2
func (e *AtomicError) Set(err error) bool
Set 设置错误信息,除非 err 是空。返回真表示设置成功。
type Error ¶ added in v1.0.3
type Error struct {
// contains filtered or unexported fields
}
Error 错误对象。