Documentation ¶
Overview ¶
Package async is a library for asynchronous programming.
Since Go has already done a great job of bringing green/virtual threads to life, this library implements only a single-threaded Executor type, which some refer to as an async runtime. One can create as many executors as needed.
While Go excels at forking, async, on the other hand, excels at joining.
Use Case #1: Fan-in Executing Code From Various Goroutines ¶
Want to execute pieces of code from various goroutines in a single-threaded way?
An Executor is designed to be able to run tasks spawned in various goroutines sequentially. This comes in handy when one wants to do a series of operations on a single thread, for example, to read or update states that are not safe for concurrent access, to write data to the console, to update one's user interfaces, etc.
Warning: there is no backpressure. Task spawning is designed not to block. If spawning outruns execution, an executor can consume a lot of memory over time. To mitigate this, one could introduce a semaphore per hot spot.
Use Case #2: Event-driven Reactiveness ¶
A Task can be reactive.
A task is spawned with a Coroutine to take care of it. In this user-provided function, one can return a specific Result to tell the coroutine to watch and await some events (e.g. Signal, State and Memo), and the coroutine then re-runs the task whenever any of these events notifies.
This is useful when one wants to do something repeatedly. It works like a loop. To exit this loop, just return a Result that ends the coroutine from within the task function. Simple.
Use Case #3: Easy State Machines ¶
A Coroutine can also transit from one Task to another, just like a state machine can transit from one state to another. This is done by returning another specific Result from within a task function. A coroutine can transit from one task to another until a task ends it.
With the ability to transit, async is able to provide more advanced control structures, like Block, Loop and Func, to ease the process of writing async code. The experience now feels similar to that of writing sync code.
Spawning Async Tasks vs. Passing Data over Go Channels ¶
It's not recommended to perform channel operations in an async Task, since they tend to block. For an Executor, if one coroutine blocks, no other coroutines can run. So instead of passing data around, one would just handle data in place.
One of the advantages of passing data over channels is the ability to reduce allocation. Unfortunately, async tasks always escape to the heap. Any variable they capture also escapes to the heap. One should stay alert and take measures in hot spots, such as reusing the same task repeatedly.
Example ¶
This example demonstrates how to spawn tasks with different paths. The lower the path, the higher the priority. This example creates a task with path "aa" for additional computations and another task with path "zz" for printing results. The former runs before the latter because "aa" < "zz".
package main
import (
"fmt"
"github.com/b97tsk/async"
)
func main() {
// Create an executor.
var myExecutor async.Executor
// Set up an autorun function to run an executor automatically whenever a coroutine is spawned or resumed.
// The best practice is to pass a function that does not block. See Example (NonBlocking).
myExecutor.Autorun(myExecutor.Run)
// Create two states.
s1, s2 := async.NewState(1), async.NewState(2)
// Although states can be created without the help of executors,
// they might only be safe for use by one and only one executor because of data races.
// Without proper synchronization, it's better only to spawn coroutines to read or update states.
var sum, product async.State[int]
myExecutor.Spawn("aa", func(co *async.Coroutine) async.Result { // The path of co is "aa".
co.Watch(s1, s2) // Let co depend on s1 and s2, so co can re-run whenever s1 or s2 changes.
sum.Set(s1.Get() + s2.Get())
product.Set(s1.Get() * s2.Get())
return co.Await() // Awaits signals or state changes.
})
// The above task re-runs whenever s1 or s2 changes. As an example, this is fine.
// In practice, one should probably use memos to avoid unnecessary recomputations. See Example (Memo).
op := async.NewState('+')
myExecutor.Spawn("zz", func(co *async.Coroutine) async.Result { // The path of co is "zz".
co.Watch(op)
fmt.Println("op =", "'"+string(op.Get())+"'")
switch op.Get() {
case '+':
// The path of an inner coroutine is relative to its outer one.
co.Spawn("sum", func(co *async.Coroutine) async.Result { // The path of inner co is "zz/sum".
fmt.Println("s1 + s2 =", sum.Get())
return co.Await(&sum)
})
case '*':
co.Spawn("product", func(co *async.Coroutine) async.Result { // The path of inner co is "zz/product".
fmt.Println("s1 * s2 =", product.Get())
return co.Await(&product)
})
}
return co.Await()
})
fmt.Println("--- SEPARATOR ---")
// The following creates several tasks to mutate states.
// They share the same path, "/", which is lower than "aa" and "zz".
// Remember that the lower the path, the higher the priority.
// Updating states should have higher priority, so that when there are multiple update tasks,
// they can run together before any read task.
// This reduces the number of reads that have to react on update.
myExecutor.Spawn("/", async.Do(func() {
s1.Set(3)
s2.Set(4)
}))
fmt.Println("--- SEPARATOR ---")
myExecutor.Spawn("/", async.Do(func() {
op.Set('*')
}))
fmt.Println("--- SEPARATOR ---")
myExecutor.Spawn("/", async.Do(func() {
s1.Set(5)
s2.Set(6)
}))
fmt.Println("--- SEPARATOR ---")
myExecutor.Spawn("/", async.Do(func() {
s1.Set(7)
s2.Set(8)
op.Set('+')
}))
}
Output:
op = '+'
s1 + s2 = 3
--- SEPARATOR ---
s1 + s2 = 7
--- SEPARATOR ---
op = '*'
s1 * s2 = 12
--- SEPARATOR ---
s1 * s2 = 30
--- SEPARATOR ---
op = '+'
s1 + s2 = 15
Example (Cleanup) ¶
This example demonstrates how to add a function call before a task re-runs, or after a task ends.
package main
import (
"fmt"
"github.com/b97tsk/async"
)
func main() {
var myExecutor async.Executor
myExecutor.Autorun(myExecutor.Run)
var myState async.State[int]
myExecutor.Spawn("zz", func(co *async.Coroutine) async.Result {
co.Watch(&myState)
v := myState.Get()
co.Cleanup(func() { fmt.Println(v, myState.Get()) })
if v < 3 {
return co.Await()
}
return co.End()
})
for i := 1; i <= 5; i++ {
myExecutor.Spawn("/", async.Do(func() { myState.Set(i) }))
}
fmt.Println(myState.Get()) // Prints 5.
}
Output:
0 1
1 2
2 3
3 3
5
Example (Conditional) ¶
This example demonstrates how a task can conditionally depend on a state.
package main
import (
"fmt"
"github.com/b97tsk/async"
)
func main() {
var myExecutor async.Executor
myExecutor.Autorun(myExecutor.Run)
s1, s2, s3 := async.NewState(1), async.NewState(2), async.NewState(7)
myExecutor.Spawn("aa", func(co *async.Coroutine) async.Result {
co.Watch(s1, s2) // Always depends on s1 and s2.
v := s1.Get() + s2.Get()
if v%2 == 0 {
co.Watch(s3) // Conditionally depends on s3.
v *= s3.Get()
}
fmt.Println(v)
return co.Await()
})
inc := func(i int) int { return i + 1 }
myExecutor.Spawn("/", async.Do(func() { s3.Notify() })) // Nothing happens.
myExecutor.Spawn("/", async.Do(func() { s1.Update(inc) }))
myExecutor.Spawn("/", async.Do(func() { s3.Notify() }))
myExecutor.Spawn("/", async.Do(func() { s2.Update(inc) }))
myExecutor.Spawn("/", async.Do(func() { s3.Notify() })) // Nothing happens.
}
Output:
3
28
28
5
Example (ConditionalMemo) ¶
This example demonstrates how a memo can conditionally depend on a state.
package main
import (
"fmt"
"github.com/b97tsk/async"
)
func main() {
var myExecutor async.Executor
myExecutor.Autorun(myExecutor.Run)
s1, s2, s3 := async.NewState(1), async.NewState(2), async.NewState(7)
m := async.NewMemo(&myExecutor, "aa", func(co *async.Coroutine, s *async.State[int]) {
co.Watch(s1, s2) // Always depends on s1 and s2.
v := s1.Get() + s2.Get()
if v%2 == 0 {
co.Watch(s3) // Conditionally depends on s3.
v *= s3.Get()
}
s.Set(v)
})
myExecutor.Spawn("zz", func(co *async.Coroutine) async.Result {
co.Watch(m)
fmt.Println(m.Get())
return co.Await()
})
inc := func(i int) int { return i + 1 }
myExecutor.Spawn("/", async.Do(func() { s3.Notify() })) // Nothing happens.
myExecutor.Spawn("/", async.Do(func() { s1.Update(inc) }))
myExecutor.Spawn("/", async.Do(func() { s3.Notify() }))
myExecutor.Spawn("/", async.Do(func() { s2.Update(inc) }))
myExecutor.Spawn("/", async.Do(func() { s3.Notify() })) // Nothing happens.
}
Output:
3
28
28
5
Example (End) ¶
This example demonstrates how to end a task. It creates a task that prints the value of a state whenever it changes. The task only prints 0, 1, 2 and 3 because it is ended after 3.
package main
import (
"fmt"
"github.com/b97tsk/async"
)
func main() {
var myExecutor async.Executor
myExecutor.Autorun(myExecutor.Run)
var myState async.State[int]
myExecutor.Spawn("zz", func(co *async.Coroutine) async.Result {
co.Watch(&myState)
v := myState.Get()
fmt.Println(v)
if v < 3 {
return co.Await()
}
return co.End()
})
for i := 1; i <= 5; i++ {
myExecutor.Spawn("/", async.Do(func() { myState.Set(i) }))
}
fmt.Println(myState.Get()) // Prints 5.
}
Output:
0
1
2
3
5
Example (Memo) ¶
This example demonstrates how to use memos to memoize cheap computations. Memos are evaluated lazily. They take effect only when they are acquired.
package main
import (
"fmt"
"github.com/b97tsk/async"
)
func main() {
var myExecutor async.Executor
myExecutor.Autorun(myExecutor.Run)
s1, s2 := async.NewState(1), async.NewState(2)
sum := async.NewMemo(&myExecutor, "aa", func(co *async.Coroutine, s *async.State[int]) {
co.Watch(s1, s2)
if v := s1.Get() + s2.Get(); v != s.Get() {
s.Set(v) // Update s only when its value changes to stop unnecessary propagation.
}
})
product := async.NewMemo(&myExecutor, "aa", func(co *async.Coroutine, s *async.State[int]) {
co.Watch(s1, s2)
if v := s1.Get() * s2.Get(); v != s.Get() {
s.Set(v)
}
})
op := async.NewState('+')
myExecutor.Spawn("zz", func(co *async.Coroutine) async.Result {
co.Watch(op)
fmt.Println("op =", "'"+string(op.Get())+"'")
switch op.Get() {
case '+':
co.Spawn("sum", func(co *async.Coroutine) async.Result {
fmt.Println("s1 + s2 =", sum.Get())
return co.Await(sum)
})
case '*':
co.Spawn("product", func(co *async.Coroutine) async.Result {
fmt.Println("s1 * s2 =", product.Get())
return co.Await(product)
})
}
return co.Await()
})
fmt.Println("--- SEPARATOR ---")
myExecutor.Spawn("/", async.Do(func() {
s1.Set(3)
s2.Set(4)
}))
fmt.Println("--- SEPARATOR ---")
myExecutor.Spawn("/", async.Do(func() {
op.Set('*')
}))
fmt.Println("--- SEPARATOR ---")
myExecutor.Spawn("/", async.Do(func() {
s1.Set(5)
s2.Set(6)
}))
fmt.Println("--- SEPARATOR ---")
myExecutor.Spawn("/", async.Do(func() {
s1.Set(7)
s2.Set(8)
op.Set('+')
}))
}
Output:
op = '+'
s1 + s2 = 3
--- SEPARATOR ---
s1 + s2 = 7
--- SEPARATOR ---
op = '*'
s1 * s2 = 12
--- SEPARATOR ---
s1 * s2 = 30
--- SEPARATOR ---
op = '+'
s1 + s2 = 15
Example (NonBlocking) ¶
This example demonstrates how to set up an autorun function to run an executor in a goroutine automatically whenever a coroutine is spawned or resumed.
package main
import (
"fmt"
"sync"
"github.com/b97tsk/async"
)
func main() {
var wg sync.WaitGroup // For keeping track of goroutines.
var myExecutor async.Executor
myExecutor.Autorun(func() {
wg.Add(1)
go func() {
defer wg.Done()
myExecutor.Run()
}()
})
s1, s2 := async.NewState(1), async.NewState(2)
sum := async.NewMemo(&myExecutor, "aa", func(co *async.Coroutine, s *async.State[int]) {
co.Watch(s1, s2)
if v := s1.Get() + s2.Get(); v != s.Get() {
s.Set(v)
}
})
product := async.NewMemo(&myExecutor, "aa", func(co *async.Coroutine, s *async.State[int]) {
co.Watch(s1, s2)
if v := s1.Get() * s2.Get(); v != s.Get() {
s.Set(v)
}
})
op := async.NewState('+')
myExecutor.Spawn("zz", func(co *async.Coroutine) async.Result {
co.Watch(op)
fmt.Println("op =", "'"+string(op.Get())+"'")
switch op.Get() {
case '+':
co.Spawn("sum", func(co *async.Coroutine) async.Result {
fmt.Println("s1 + s2 =", sum.Get())
return co.Await(sum)
})
case '*':
co.Spawn("product", func(co *async.Coroutine) async.Result {
fmt.Println("s1 * s2 =", product.Get())
return co.Await(product)
})
}
return co.Await()
})
wg.Wait() // Wait for autorun to complete.
fmt.Println("--- SEPARATOR ---")
myExecutor.Spawn("/", async.Do(func() {
s1.Set(3)
s2.Set(4)
}))
wg.Wait()
fmt.Println("--- SEPARATOR ---")
myExecutor.Spawn("/", async.Do(func() {
op.Set('*')
}))
wg.Wait()
fmt.Println("--- SEPARATOR ---")
myExecutor.Spawn("/", async.Do(func() {
s1.Set(5)
s2.Set(6)
}))
wg.Wait()
fmt.Println("--- SEPARATOR ---")
myExecutor.Spawn("/", async.Do(func() {
s1.Set(7)
s2.Set(8)
op.Set('+')
}))
wg.Wait()
}
Output:
op = '+'
s1 + s2 = 3
--- SEPARATOR ---
s1 + s2 = 7
--- SEPARATOR ---
op = '*'
s1 * s2 = 12
--- SEPARATOR ---
s1 * s2 = 30
--- SEPARATOR ---
op = '+'
s1 + s2 = 15
Example (Switch) ¶
This example demonstrates how a coroutine can transit from one task to another.
package main
import (
"fmt"
"github.com/b97tsk/async"
)
func main() {
var myExecutor async.Executor
myExecutor.Autorun(myExecutor.Run)
var myState async.State[int]
myExecutor.Spawn("zz", func(co *async.Coroutine) async.Result {
co.Watch(&myState)
v := myState.Get()
fmt.Println(v)
if v < 3 {
return co.Await()
}
return co.Transit(func(co *async.Coroutine) async.Result {
co.Watch(&myState)
v := myState.Get()
fmt.Println(v, "(transited)")
if v < 5 {
return co.Await()
}
return co.End()
})
})
for i := 1; i <= 7; i++ {
myExecutor.Spawn("/", async.Do(func() { myState.Set(i) }))
}
fmt.Println(myState.Get()) // Prints 7.
}
Output:
0
1
2
3
3 (transited)
4 (transited)
5 (transited)
7
Index ¶
- type Coroutine
- func (co *Coroutine) Await(ev ...Event) Result
- func (co *Coroutine) Break() Result
- func (co *Coroutine) BreakLabel(l Label) Result
- func (co *Coroutine) Cleanup(f func())
- func (co *Coroutine) Continue() Result
- func (co *Coroutine) ContinueLabel(l Label) Result
- func (co *Coroutine) Defer(t Task)
- func (co *Coroutine) End() Result
- func (co *Coroutine) Executor() *Executor
- func (co *Coroutine) Exit() Result
- func (co *Coroutine) Path() string
- func (co *Coroutine) Return() Result
- func (co *Coroutine) Spawn(p string, t Task)
- func (co *Coroutine) Transit(t Task) Result
- func (co *Coroutine) Watch(ev ...Event)
- func (co *Coroutine) Yield(t Task) Result
- type Event
- type Executor
- type Label
- type Memo
- type Result
- type Semaphore
- type Signal
- type State
- type Task
- func Await(ev ...Event) Task
- func Block(s ...Task) Task
- func Break() Task
- func BreakLabel(l Label) Task
- func Continue() Task
- func ContinueLabel(l Label) Task
- func Defer(t Task) Task
- func Do(f func()) Task
- func End() Task
- func Exit() Task
- func Func(t Task) Task
- func Loop(t Task) Task
- func LoopLabel(l Label, t Task) Task
- func LoopLabelN(l Label, n int, t Task) Task
- func LoopN(n int, t Task) Task
- func Return() Task
- type WaitGroup
Types ¶
type Coroutine ¶ added in v0.2.0
type Coroutine struct {
// contains filtered or unexported fields
}
A Coroutine is an execution of code, similar to a goroutine but cooperative and stackless.
A coroutine is created with a function called Task. A coroutine's job is to end the task. When an Executor spawns a coroutine with a task, it runs the coroutine by calling the task function with the coroutine as the argument. The return value determines whether to end the coroutine or to yield it so that it could resume later.
In order for a coroutine to resume, the coroutine must watch at least one Event (e.g. Signal, State and Memo) when calling the task function. A notification of such an event resumes the coroutine. When a coroutine is resumed, the executor runs the coroutine again.
A coroutine can also make a transit to work on another task according to the return value of the task function. A coroutine can transit from one task to another until a task ends it.
func (*Coroutine) Await ¶ added in v0.2.0
Await returns a Result that will cause co to yield. Await also accepts additional events to watch.
func (*Coroutine) Break ¶ added in v0.3.0
Break returns a Result that will cause co to break a loop.
func (*Coroutine) BreakLabel ¶ added in v0.3.0
BreakLabel returns a Result that will cause co to break a loop with label l.
func (*Coroutine) Cleanup ¶ added in v0.3.0
func (co *Coroutine) Cleanup(f func())
Cleanup registers a function to be called when co resumes or ends, or when co is making a transit to work on another Task.
func (*Coroutine) Continue ¶ added in v0.3.0
Continue returns a Result that will cause co to continue a loop.
func (*Coroutine) ContinueLabel ¶ added in v0.3.0
ContinueLabel returns a Result that will cause co to continue a loop with label l.
func (*Coroutine) Defer ¶ added in v0.2.0
Defer adds a Task for execution when returning from a Func. Deferred tasks are executed in last-in-first-out (LIFO) order.
func (*Coroutine) End ¶ added in v0.2.0
End returns a Result that will cause co to end or make a transit to work on another Task.
func (*Coroutine) Executor ¶ added in v0.2.0
Executor returns the executor that spawned co.
Since co can be recycled by an executor, it is recommended to save the return value in a variable first.
func (*Coroutine) Exit ¶ added in v0.3.0
Exit returns a Result that will cause co to exit. All deferred tasks will be run before co exits.
func (*Coroutine) Path ¶ added in v0.2.0
Path returns the path of co.
Since co can be recycled by an executor, it is recommended to save the return value in a variable first.
func (*Coroutine) Return ¶ added in v0.3.0
Return returns a Result that will cause co to return from a Func.
func (*Coroutine) Spawn ¶ added in v0.2.0
Spawn creates an inner coroutine to work on t, using the result of path.Join(co.Path(), p) as its path.
Inner coroutines are ended automatically when the outer one resumes or ends, or when the outer one is making a transit to work on another task.
func (*Coroutine) Transit ¶ added in v0.3.0
Transit returns a Result that will cause co to make a transit to work on t.
type Event ¶
type Event interface {
// contains filtered or unexported methods
}
Event is the interface of any type that can be watched by a coroutine.
The following types implement Event: Signal, State and Memo. Any type that embeds Signal also implements Event, e.g. State.
type Executor ¶
type Executor struct {
// contains filtered or unexported fields
}
An Executor is a coroutine spawner, and a coroutine runner.
When a coroutine is spawned or resumed, it is added to an internal queue. The Run method then pops and runs each of them from the queue until the queue is emptied. It is done in a single-threaded manner. If one coroutine blocks, no other coroutines can run. The best practice is not to block.
The internal queue is a priority queue. Coroutines added to the queue are sorted by their paths. Coroutines with the same path are sorted by their arrival order (FIFO). Popping the queue removes the first coroutine with the lowest path.
Manually calling the Run method is usually not desired. One would instead use the Autorun method to set up an autorun function that calls the Run method automatically whenever a coroutine is spawned or resumed. An executor never calls the autorun function twice at the same time.
func (*Executor) Autorun ¶
func (e *Executor) Autorun(f func())
Autorun sets up an autorun function that calls the Run method automatically whenever a coroutine is spawned or resumed.
One must pass a function that calls the Run method.
If f blocks, the Spawn method may block too. The best practice is not to block.
func (*Executor) Run ¶
func (e *Executor) Run()
Run pops and runs every coroutine in the queue until the queue is emptied.
Run must not be called twice at the same time.
type Memo ¶
type Memo[T any] struct {
// contains filtered or unexported fields
}
A Memo is a State-like structure that carries a value that can only be set in a Task-like function.
A memo is designed to have a value that is computed from other states. What makes a memo useful is that:
- A memo can prevent unnecessary computations when it isn't used;
- A memo can prevent unnecessary propagations when its value isn't changed.
To create a memo, use NewMemo or NewStrictMemo.
A Memo must not be shared by more than one Executor.
func NewMemo ¶
NewMemo returns a new non-strict Memo. The arguments are used to initialize an internal coroutine.
One must pass a function that watches some states, computes a value from these states, and then updates the given state if the value differs.
Like any Event, a memo can be watched by multiple coroutines. The watch list increases and decreases over time. For a non-strict memo, when the last coroutine in the list unwatches it, it does not immediately end its internal coroutine. Ending the internal coroutine would only put the memo into a stale state because the memo no longer detects dependency changes. By not immediately ending the internal coroutine, a non-strict memo prevents an extra computation when a new coroutine watches it, provided that there are no dependency changes.
On the other hand, a strict memo immediately ends its internal coroutine whenever the last coroutine in the watch list unwatches it. The memo becomes stale. The next time a new coroutine watches it, it has to make a fresh computation.
type Result ¶
type Result struct {
// contains filtered or unexported fields
}
Result is the type of the return value of a Task function. A Result determines what a coroutine does next after running a task.
A Result can be created by calling one of the following methods:
- Coroutine.End: for ending a coroutine;
- Coroutine.Await: for yielding a coroutine with additional events to watch;
- Coroutine.Yield: for yielding a coroutine with another task, to which the coroutine transits when it resumes;
- Coroutine.Transit: for transiting to another task.
type Semaphore ¶ added in v0.2.0
type Semaphore struct {
// contains filtered or unexported fields
}
Semaphore provides a way to bound asynchronous access to a resource. The callers can request access with a given weight.
Note that this Semaphore type does not provide backpressure for spawning a lot of tasks. One should instead look for a sync implementation.
A Semaphore must not be shared by more than one Executor.
Example ¶
package main
import (
"fmt"
"sync"
"time"
"github.com/b97tsk/async"
)
func main() {
var wg sync.WaitGroup // For keeping track of goroutines.
var myExecutor async.Executor
myExecutor.Autorun(func() {
wg.Add(1)
go func() {
defer wg.Done()
myExecutor.Run()
}()
})
mySemaphore := async.NewSemaphore(12)
for n := int64(1); n <= 8; n++ {
myExecutor.Spawn("/", mySemaphore.Acquire(n).Then(async.Do(func() {
fmt.Println(n)
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(100 * time.Millisecond)
myExecutor.Spawn("/", async.Do(func() { mySemaphore.Release(n) }))
}()
})))
}
wg.Wait()
}
Output:
1
2
3
4
5
6
7
8
func NewSemaphore ¶ added in v0.2.0
NewSemaphore creates a new weighted semaphore with the given maximum combined weight.
type Signal ¶
type Signal struct {
// contains filtered or unexported fields
}
Signal is a type that implements Event.
Calling the Notify method of a signal, in a Task function, resumes any coroutine that is watching the signal.
A Signal must not be shared by more than one Executor.
type State ¶
A State is a Signal that carries a value. To retrieve the value, call the Get method.
Calling the Set method of a state, in a Task function, updates the value and resumes any coroutine that is watching the state.
A State must not be shared by more than one Executor.
func (*State[T]) Get ¶
func (s *State[T]) Get() T
Get retrieves the value of s.
Without proper synchronization, one should only call this method in a Task function.
type Task ¶
A Task is a piece of work that a coroutine is given to do when it is spawned. The return value of a task, a Result, determines what the coroutine does next.
The argument co must not escape, because co can be recycled by an Executor when co ends.
func Await ¶ added in v0.3.0
Await returns a Task that awaits some events until any of them notifies, and then ends. If ev is empty, Await returns a Task that never ends.
Example ¶
This example computes two values in separate goroutines sequentially, then prints their sum.
package main
import (
"fmt"
"sync"
"time"
"github.com/b97tsk/async"
)
func main() {
var wg sync.WaitGroup // For keeping track of goroutines.
var myExecutor async.Executor
myExecutor.Autorun(func() {
wg.Add(1)
go func() {
defer wg.Done()
myExecutor.Run()
}()
})
var myState struct {
async.Signal
v1, v2 int
}
myExecutor.Spawn("/", func(co *async.Coroutine) async.Result {
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(500 * time.Millisecond) // Heavy work #1 here.
ans := 15
myExecutor.Spawn("/", async.Do(func() {
myState.v1 = ans
myState.Notify()
}))
}()
return co.Transit(async.Await(&myState).Then(
func(co *async.Coroutine) async.Result {
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(500 * time.Millisecond) // Heavy work #2 here.
ans := 27
myExecutor.Spawn("/", async.Do(func() {
myState.v2 = ans
myState.Notify()
}))
}()
return co.Transit(async.Await(&myState).Then(
async.Do(func() {
fmt.Println("v1 + v2 =", myState.v1+myState.v2)
}),
))
},
))
})
wg.Wait()
}
Output: v1 + v2 = 42
func Block ¶ added in v0.3.0
Block returns a Task that runs each of the given tasks in sequence. When one task ends, Block runs another.
Example ¶
This example demonstrates how to run a block of tasks. A block can have zero or more tasks. A block runs tasks in sequence.
package main
import (
"fmt"
"github.com/b97tsk/async"
)
func main() {
var myExecutor async.Executor
myExecutor.Autorun(myExecutor.Run)
var myState async.State[int]
myExecutor.Spawn("zz", func(co *async.Coroutine) async.Result {
var t async.Task
t = async.Block(
async.Await(&myState),
async.Do(func() {
if v := myState.Get(); v%2 != 0 {
fmt.Println(v)
}
}),
func(co *async.Coroutine) async.Result {
if v := myState.Get(); v >= 7 {
return co.End()
}
return co.Transit(t) // Transit to t again to form a loop.
},
)
return co.Transit(t)
})
for i := 1; i <= 9; i++ {
myExecutor.Spawn("/", async.Do(func() { myState.Set(i) }))
}
fmt.Println(myState.Get()) // Prints 9.
}
Output:
1
3
5
7
9
func BreakLabel ¶ added in v0.3.0
BreakLabel returns a Task that breaks a loop with label l.
func ContinueLabel ¶ added in v0.3.0
ContinueLabel returns a Task that continues a loop with label l.
func Defer ¶ added in v0.3.0
Defer returns a Task that adds t for execution when returning from a Func. Deferred tasks are executed in last-in-first-out (LIFO) order.
func Exit ¶ added in v0.3.0
func Exit() Task
Exit returns a Task that causes the coroutine that runs it to exit. All deferred tasks are run before the coroutine exits.
func Func ¶ added in v0.3.0
Func returns a Task that runs t in a function scope. Spawned tasks are considered surrounded by an invisible Func.
Example ¶
package main
import (
"fmt"
"github.com/b97tsk/async"
)
func main() {
var myExecutor async.Executor
myExecutor.Autorun(myExecutor.Run)
var myState async.State[int]
myExecutor.Spawn("zz", async.Block(
async.Defer( // Note that spawned tasks are considered surrounded by an invisible [Func].
async.Do(func() { fmt.Println("defer 1") }),
),
async.Func(async.Block( // A block in a function scope.
async.Defer(
async.Do(func() { fmt.Println("defer 2") }),
),
async.Loop(async.Block(
async.Await(&myState),
func(co *async.Coroutine) async.Result {
if v := myState.Get(); v%2 == 0 {
return co.Continue()
}
return co.End()
},
async.Do(func() {
fmt.Println(myState.Get())
}),
func(co *async.Coroutine) async.Result {
if v := myState.Get(); v >= 7 {
return co.Return() // Return here.
}
return co.End()
},
)),
async.Do(func() { fmt.Println("after Loop") }), // Didn't run due to early return.
)),
async.Do(func() { fmt.Println("after Func") }),
))
for i := 1; i <= 9; i++ {
myExecutor.Spawn("/", async.Do(func() { myState.Set(i) }))
}
fmt.Println(myState.Get()) // Prints 9.
}
Output:
1
3
5
7
defer 2
after Func
defer 1
9
Example (Exit) ¶
package main
import (
"fmt"
"github.com/b97tsk/async"
)
func main() {
var myExecutor async.Executor
myExecutor.Autorun(myExecutor.Run)
var myState async.State[int]
myExecutor.Spawn("zz", async.Block(
async.Defer( // Note that spawned tasks are considered surrounded by an invisible [Func].
async.Do(func() { fmt.Println("defer 1") }),
),
async.Func(async.Block( // A block in a function scope.
async.Defer(
async.Do(func() { fmt.Println("defer 2") }),
),
async.Loop(async.Block(
async.Await(&myState),
func(co *async.Coroutine) async.Result {
if v := myState.Get(); v%2 == 0 {
return co.Continue()
}
return co.End()
},
async.Do(func() {
fmt.Println(myState.Get())
}),
func(co *async.Coroutine) async.Result {
if v := myState.Get(); v >= 7 {
return co.Exit() // Exit here.
}
return co.End()
},
)),
async.Do(func() { fmt.Println("after Loop") }), // Didn't run due to early exit.
)),
async.Do(func() { fmt.Println("after Func") }), // Didn't run due to early exit.
))
for i := 1; i <= 9; i++ {
myExecutor.Spawn("/", async.Do(func() { myState.Set(i) }))
}
fmt.Println(myState.Get()) // Prints 9.
}
Output:
1
3
5
7
defer 2
defer 1
9
Example (Panic) ¶
This example demonstrates how async handles panics.
package main
import (
"errors"
"fmt"
"strings"
"github.com/b97tsk/async"
)
func main() {
dummyError := errors.New("dummy")
var myExecutor async.Executor
myExecutor.Autorun(func() {
defer func() {
v := recover()
if v == nil {
return
}
err, ok := v.(error)
if ok && errors.Is(err, dummyError) && strings.Contains(err.Error(), "dummy") {
// Note that use of strings.Contains(...) here is for coverage.
fmt.Println("recovered dummy error")
return
}
panic(v) // Repanic unexpected recovered value.
}()
myExecutor.Run()
})
var myState async.State[int]
myExecutor.Spawn("zz", async.Block(
async.Defer( // Note that spawned tasks are considered surrounded by an invisible [Func].
async.Do(func() { fmt.Println("defer 1") }),
),
async.Func(async.Block( // A block in a function scope.
async.Defer(
async.Do(func() { fmt.Println("defer 2") }),
),
async.Loop(async.Block(
async.Await(&myState),
func(co *async.Coroutine) async.Result {
if v := myState.Get(); v%2 == 0 {
return co.Continue()
}
return co.End()
},
async.Do(func() {
fmt.Println(myState.Get())
}),
func(co *async.Coroutine) async.Result {
if v := myState.Get(); v >= 7 {
panic(dummyError) // Panic here.
}
return co.End()
},
)),
async.Do(func() { fmt.Println("after Loop") }), // Didn't run because of the earlier panic.
)),
async.Do(func() { fmt.Println("after Func") }), // Didn't run because of the earlier panic.
))
for i := 1; i <= 9; i++ {
myExecutor.Spawn("/", async.Do(func() { myState.Set(i) }))
}
fmt.Println(myState.Get()) // Prints 9.
}
Output:
1
3
5
7
defer 2
defer 1
recovered dummy error
9
Example (Tailcall) ¶
This example demonstrates how to make tail-calls in an async.Func. Tail-calls are not recommended and should be avoided when possible. Without tail-call optimization, this example panics. To run this example, add an output comment at the end.
package main

import (
	"github.com/b97tsk/async"
)

func main() {
	var myExecutor async.Executor

	myExecutor.Autorun(myExecutor.Run)

	myExecutor.Spawn("#1", func(co *async.Coroutine) async.Result {
		var n int
		var t async.Task
		t = async.Func(async.Block(
			async.End(),
			async.End(),
			async.End(),
			func(co *async.Coroutine) async.Result { // Last task in the block.
				if n < 5000000 {
					n++
					return co.Transit(t) // Tail-call here.
				}
				return co.End()
			},
		))
		return co.Transit(t)
	})

	myExecutor.Spawn("#2", func(co *async.Coroutine) async.Result {
		var n int
		var t async.Task
		t = async.Func(async.Block(
			func(co *async.Coroutine) async.Result {
				if n < 5000000 {
					n++
					co.Defer(t)        // Tail-call here (workaround).
					return co.Return() // Early return.
				}
				return co.End()
			},
			async.End(),
			async.End(),
			async.End(),
		))
		return co.Transit(t)
	})

	// To run this example, add an output comment here.
}
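The idea behind tail-call optimization can be illustrated outside the library. The following is a minimal sketch in plain Go, not the library's implementation: `thunk`, `trampoline` and `countTo` are hypothetical names introduced here. It shows one common way to keep the call stack flat across millions of tail-calls: each step hands the next step back to a loop instead of calling it directly.

```go
package main

import "fmt"

// thunk is a hypothetical "next step" value: calling it does some work and
// returns the step to run next, or nil when nothing is left to do.
type thunk func() thunk

// trampoline runs steps in a flat loop, so the Go call stack does not grow
// with the number of transitions. It returns how many steps were executed.
func trampoline(t thunk) (steps int) {
	for t != nil {
		t = t()
		steps++
	}
	return steps
}

// countTo builds a step that "tail-calls" itself n times, plus a pointer to
// the counter it increments along the way.
func countTo(n int) (thunk, *int) {
	count := new(int)
	var step thunk
	step = func() thunk {
		if *count < n {
			*count++
			return step // Tail-call: hand the next step back to the loop.
		}
		return nil // Done.
	}
	return step, count
}

func main() {
	step, count := countTo(5000000)
	steps := trampoline(step)
	fmt.Println(*count, steps) // Prints "5000000 5000001".
}
```

The extra step in the second number is the final call that returns nil. Had `step` called itself directly instead of returning itself, each of the five million transitions would have added a stack frame.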
func Loop ¶ added in v0.3.0
Loop returns a Task that forms a loop, which runs t repeatedly. Both Coroutine.Break and Break can break this loop early. Both Coroutine.Continue and Continue can continue this loop early.
Example ¶
package main

import (
	"fmt"

	"github.com/b97tsk/async"
)

func main() {
	var myExecutor async.Executor

	myExecutor.Autorun(myExecutor.Run)

	var myState async.State[int]

	myExecutor.Spawn("zz", async.Loop(async.Block(
		async.Await(&myState),
		func(co *async.Coroutine) async.Result {
			if v := myState.Get(); v%2 == 0 {
				return co.Continue()
			}
			return co.End()
		},
		async.Do(func() {
			fmt.Println(myState.Get())
		}),
		func(co *async.Coroutine) async.Result {
			if v := myState.Get(); v >= 7 {
				return co.Break()
			}
			return co.End()
		},
	)))

	for i := 1; i <= 9; i++ {
		myExecutor.Spawn("/", async.Do(func() { myState.Set(i) }))
	}

	fmt.Println(myState.Get()) // Prints 9.
}
Output:
1
3
5
7
9
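For readers more used to synchronous code, the control flow of this example can be sketched with an ordinary for statement. This is only an analogy under stated assumptions: the hypothetical function `oddsUntil` ranges over a slice where the async version awaits state updates, and it collects values instead of printing them.

```go
package main

import "fmt"

// oddsUntil mirrors the control flow of the Loop example: even values are
// skipped with continue, odd values are collected, and the loop stops with
// break once a value reaches limit.
func oddsUntil(values []int, limit int) []int {
	var out []int
	for _, v := range values { // Stands in for awaiting each state update.
		if v%2 == 0 {
			continue // Like co.Continue(): start the next iteration.
		}
		out = append(out, v) // Like the async.Do step that prints the value.
		if v >= limit {
			break // Like co.Break(): leave the loop.
		}
	}
	return out
}

func main() {
	fmt.Println(oddsUntil([]int{1, 2, 3, 4, 5, 6, 7, 8, 9}, 7)) // Prints [1 3 5 7].
}
```

The trailing 9 in the example's output comes from the final `fmt.Println(myState.Get())` in main, not from the loop.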
func LoopLabel ¶ added in v0.3.0
LoopLabel returns a Task that forms a loop with label l, which runs t repeatedly. Both Coroutine.Break and Break can break this loop early. Both Coroutine.Continue and Continue can continue this loop early. Both Coroutine.BreakLabel and BreakLabel, with label l, can break this loop early. Both Coroutine.ContinueLabel and ContinueLabel, with label l, can continue this loop early.
func LoopLabelN ¶ added in v0.3.0
LoopLabelN returns a Task that forms a loop with label l, which runs t n times. Both Coroutine.Break and Break can break this loop early. Both Coroutine.Continue and Continue can continue this loop early. Both Coroutine.BreakLabel and BreakLabel, with label l, can break this loop early. Both Coroutine.ContinueLabel and ContinueLabel, with label l, can continue this loop early.
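Labels presumably matter most once loops are nested, where an unlabeled break or continue would only reach the innermost loop. Go's own labeled statements behave in a comparable way, so the sketch below uses plain Go rather than the library: `search` and its inputs are hypothetical, and nothing here calls LoopLabel or LoopLabelN.

```go
package main

import "fmt"

// search scans rows for want. A negative value abandons the current row
// (continue outer); a match stops both loops at once (break outer).
// It reports the position of the match, or -1, -1, and the cells visited.
func search(rows [][]int, want int) (row, col, visited int) {
	row, col = -1, -1
outer:
	for i, r := range rows {
		for j, v := range r {
			visited++
			if v < 0 {
				continue outer // Skip the rest of this row.
			}
			if v == want {
				row, col = i, j
				break outer // Leave the inner and the outer loop.
			}
		}
	}
	return row, col, visited
}

func main() {
	rows := [][]int{
		{1, -1, 5},
		{2, 5, 9},
		{5, 5, 5},
	}
	fmt.Println(search(rows, 5)) // Prints "1 1 4".
}
```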
func LoopN ¶ added in v0.3.0
LoopN returns a Task that forms a loop, which runs t n times. Both Coroutine.Break and Break can break this loop early. Both Coroutine.Continue and Continue can continue this loop early.
Example ¶
package main

import (
	"fmt"

	"github.com/b97tsk/async"
)

func main() {
	var myExecutor async.Executor

	myExecutor.Autorun(myExecutor.Run)

	var myState async.State[int]

	myExecutor.Spawn("zz", async.LoopN(7, async.Block(
		async.Await(&myState),
		func(co *async.Coroutine) async.Result {
			if v := myState.Get(); v%2 == 0 {
				return co.Continue()
			}
			return co.End()
		},
		async.Do(func() {
			fmt.Println(myState.Get())
		}),
	)))

	for i := 1; i <= 9; i++ {
		myExecutor.Spawn("/", async.Do(func() { myState.Set(i) }))
	}

	fmt.Println(myState.Get()) // Prints 9.
}
Output:
1
3
5
7
9
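Judging by the output above, an iteration cut short by Continue still appears to count toward n: the loop consumes the values 1 through 7 and prints only the odd ones, and the trailing 9 comes from main. A counted for statement in plain Go behaves the same way. The sketch below is an analogy only, and `firstN` is a hypothetical name.

```go
package main

import "fmt"

// firstN looks at no more than n values and keeps the odd ones. A skipped
// (even) value still uses up one of the n iterations.
func firstN(values []int, n int) []int {
	var out []int
	for i := 0; i < n && i < len(values); i++ {
		v := values[i]
		if v%2 == 0 {
			continue // Counts as an iteration, like co.Continue() seems to.
		}
		out = append(out, v)
	}
	return out
}

func main() {
	fmt.Println(firstN([]int{1, 2, 3, 4, 5, 6, 7, 8, 9}, 7)) // Prints [1 3 5 7].
}
```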
func (Task) Then ¶ added in v0.2.0
Then returns a Task that first works on t and then, after t ends, works on next.
To chain multiple tasks, use the Block function.
Example ¶
This example demonstrates how to run a task after another. To run multiple tasks in sequence, use async.Block instead.
package main

import (
	"fmt"

	"github.com/b97tsk/async"
)

func main() {
	var myExecutor async.Executor

	myExecutor.Autorun(myExecutor.Run)

	var myState async.State[int]

	a := func(co *async.Coroutine) async.Result {
		co.Watch(&myState)

		v := myState.Get()
		fmt.Println(v, "(a)")

		if v < 3 {
			return co.Await()
		}

		return co.Transit(func(co *async.Coroutine) async.Result {
			co.Watch(&myState)

			v := myState.Get()
			fmt.Println(v, "(transited)")

			if v < 5 {
				return co.Await()
			}

			return co.End()
		})
	}

	b := func(co *async.Coroutine) async.Result {
		co.Watch(&myState)

		v := myState.Get()
		fmt.Println(v, "(b)")

		if v < 7 {
			return co.Await()
		}

		return co.End()
	}

	myExecutor.Spawn("zz", async.Task(a).Then(b))

	for i := 1; i <= 9; i++ {
		myExecutor.Spawn("/", async.Do(func() { myState.Set(i) }))
	}

	fmt.Println(myState.Get()) // Prints 9.
}
Output:
0 (a)
1 (a)
2 (a)
3 (a)
3 (transited)
4 (transited)
5 (transited)
5 (b)
6 (b)
7 (b)
9
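The advice to prefer Block for longer chains can be seen in a simplified, synchronous model. The sketch below does not use the library: `task`, `say`, `then`, `block` and `run` are hypothetical stand-ins, and the tasks here run to completion immediately rather than awaiting anything. It only shows that nesting pairwise chaining yields the same order as one flat sequence, which is easier to read.

```go
package main

import (
	"fmt"
	"strings"
)

// task is a hypothetical, synchronous stand-in for an async.Task: running
// it appends to a log.
type task func(log *[]string)

// say returns a task that records s.
func say(s string) task {
	return func(log *[]string) { *log = append(*log, s) }
}

// then models pairwise chaining: run t, then next.
func then(t, next task) task {
	return func(log *[]string) {
		t(log)
		next(log)
	}
}

// block models a flat sequence of any length.
func block(ts ...task) task {
	return func(log *[]string) {
		for _, t := range ts {
			t(log)
		}
	}
}

// run executes t and returns what it logged, joined by spaces.
func run(t task) string {
	var log []string
	t(&log)
	return strings.Join(log, " ")
}

func main() {
	a, b, c := say("a"), say("b"), say("c")
	fmt.Println(run(then(then(a, b), c))) // Prints "a b c".
	fmt.Println(run(block(a, b, c)))      // Prints "a b c".
}
```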
type WaitGroup ¶ added in v0.2.0
type WaitGroup struct {
Signal
// contains filtered or unexported fields
}
A WaitGroup is a Signal with a counter.
Calling the Add or Done method of a WaitGroup, in a Task function, updates the counter and, when the counter becomes zero, resumes any coroutine that is watching the WaitGroup.
A WaitGroup must not be shared by more than one Executor.
Example ¶
package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/b97tsk/async"
)

func main() {
	var wg sync.WaitGroup // For keeping track of goroutines.

	var myExecutor async.Executor

	myExecutor.Autorun(func() {
		wg.Add(1)
		go func() {
			defer wg.Done()
			myExecutor.Run()
		}()
	})

	var myState struct {
		wg     async.WaitGroup
		v1, v2 int
	}

	myState.wg.Add(2) // Note that async.WaitGroup is not safe for concurrent use.

	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(500 * time.Millisecond) // Heavy work #1 here.
		ans := 15
		myExecutor.Spawn("/", async.Do(func() {
			myState.v1 = ans
			myState.wg.Done()
		}))
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(500 * time.Millisecond) // Heavy work #2 here.
		ans := 27
		myExecutor.Spawn("/", async.Do(func() {
			myState.v2 = ans
			myState.wg.Done()
		}))
	}()

	myExecutor.Spawn("/", myState.wg.Await().Then(async.Do(func() {
		fmt.Println("v1 + v2 =", myState.v1+myState.v2)
	})))

	wg.Wait()
}
Output: v1 + v2 = 42
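The pattern in this example, heavy work in separate goroutines with results applied on a single thread, can be sketched with the standard library alone. The following is an illustration under stated assumptions, not how Executor is implemented: `fanIn` is a hypothetical function, and its unbuffered channel makes producers block until the consumer is ready, whereas the overview describes Spawn as non-blocking.

```go
package main

import (
	"fmt"
	"sync"
)

// fanIn computes each answer in its own goroutine but applies every result
// on one consumer goroutine, so sum needs no mutex.
func fanIn(answers ...int) int {
	tasks := make(chan func())
	done := make(chan struct{})

	go func() { // The single consumer, standing in for an executor.
		defer close(done)
		for task := range tasks {
			task()
		}
	}()

	sum := 0 // Only touched by the consumer goroutine until done is closed.

	var producers sync.WaitGroup
	for _, ans := range answers {
		producers.Add(1)
		go func(ans int) {
			defer producers.Done()
			// Heavy work would happen here, off the consumer goroutine.
			tasks <- func() { sum += ans }
		}(ans)
	}

	producers.Wait() // Every task has been handed over.
	close(tasks)
	<-done // The consumer has finished running them.
	return sum
}

func main() {
	fmt.Println("v1 + v2 =", fanIn(15, 27)) // Prints "v1 + v2 = 42".
}
```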
func (*WaitGroup) Add ¶ added in v0.2.0
Add adds delta, which may be negative, to the WaitGroup counter. If the WaitGroup counter becomes zero, Add resumes any coroutine that is watching wg. If the WaitGroup counter becomes negative, Add panics.
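The counter rules described for Add can be modeled in a few lines of plain Go. This is a single-threaded sketch, not the library's code: the `counter` type and its `Watch` method are hypothetical, callbacks stand in for watching coroutines, and clearing the watcher list after notifying is a choice made for this sketch.

```go
package main

import "fmt"

// counter is a hypothetical, single-threaded model of a signal with a
// counter. It is not safe for concurrent use.
type counter struct {
	n        int
	watchers []func()
}

// Watch registers f to be called the next time the counter reaches zero.
func (c *counter) Watch(f func()) { c.watchers = append(c.watchers, f) }

// Add adjusts the counter by delta. It panics if the counter becomes
// negative and notifies watchers if the counter becomes zero.
func (c *counter) Add(delta int) {
	c.n += delta
	if c.n < 0 {
		panic("counter: negative counter")
	}
	if c.n == 0 {
		ws := c.watchers
		c.watchers = nil
		for _, f := range ws {
			f()
		}
	}
}

// Done decrements the counter by one.
func (c *counter) Done() { c.Add(-1) }

func main() {
	var c counter
	var v1, v2 int

	c.Add(2)
	c.Watch(func() { fmt.Println("v1 + v2 =", v1+v2) })

	v1 = 15
	c.Done() // Counter is 1: nothing happens yet.
	v2 = 27
	c.Done() // Counter is 0: prints "v1 + v2 = 42".
}
```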