Documentation ¶
Index ¶
- func MigrateUp(pool *pgxpool.Pool) error
- type DB
- type EnqueueHandler
- type EnqueueOption
- type Job
- type JobEnqueueContext
- type JobProcessContext
- type JobQueue
- type JobQueueOption
- func WithBaseRetryDelay(d time.Duration) JobQueueOption
- func WithDLQ(queueName string) JobQueueOption
- func WithFIFO(isFIFO bool) JobQueueOption
- func WithLogger(logger *slog.Logger) JobQueueOption
- func WithMaxRetryDelay(d time.Duration) JobQueueOption
- func WithMiddlewares(mws ...Middleware) JobQueueOption
- func WithPollInterval(interval time.Duration) JobQueueOption
- func WithQueueName(name string) JobQueueOption
- type Middleware
- type ProcessHandler
- type Querier
- type RetryPolicy
- type UnimplementedMiddleware
- type Worker
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type DB ¶
type DB interface {
database.DBTX
// BeginTx creates a new (pgx) transaction
BeginTx(ctx context.Context, options pgx.TxOptions) (pgx.Tx, error)
}
DB abstracts any (pgx) database connection that can also create transactions.
type EnqueueHandler ¶
type EnqueueHandler func(ctx JobEnqueueContext) error
EnqueueHandler defines the function signature for handling job enqueuing.
type EnqueueOption ¶
type EnqueueOption func(*enqueueConfig)
EnqueueOption defines a functional option for configuring job enqueuing.
func WithMaxRetries ¶
func WithMaxRetries(n int32) EnqueueOption
WithMaxRetries sets the maximum number of retries for the enqueued job.
func WithRetryPolicy ¶
func WithRetryPolicy(policy RetryPolicy) EnqueueOption
WithRetryPolicy sets the retry policy for the enqueued job. Possible policies are RetryPolicyConstant, RetryPolicyLinear, and RetryPolicyExponential.
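The functional-option pattern behind EnqueueOption can be sketched as below. This is a minimal illustration and not the package's actual code: the fields of the unexported enqueueConfig, the helper buildConfig, and the default values are all assumptions made for the example.

```go
package main

import "fmt"

// RetryPolicy mirrors the documented enum so the sketch compiles on its own.
type RetryPolicy int

const (
	RetryPolicyConstant RetryPolicy = iota
	RetryPolicyLinear
	RetryPolicyExponential
)

// enqueueConfig is a hypothetical stand-in; the real struct is unexported
// and its fields are not documented.
type enqueueConfig struct {
	maxRetries  int32
	retryPolicy RetryPolicy
}

// EnqueueOption mutates the configuration of a single enqueue call.
type EnqueueOption func(*enqueueConfig)

func WithMaxRetries(n int32) EnqueueOption {
	return func(c *enqueueConfig) { c.maxRetries = n }
}

func WithRetryPolicy(policy RetryPolicy) EnqueueOption {
	return func(c *enqueueConfig) { c.retryPolicy = policy }
}

// buildConfig (hypothetical) starts from assumed defaults and applies the
// options in the order given, so later options win over earlier ones.
func buildConfig(opts ...EnqueueOption) enqueueConfig {
	cfg := enqueueConfig{maxRetries: 3, retryPolicy: RetryPolicyConstant} // assumed defaults
	for _, opt := range opts {
		opt(&cfg)
	}
	return cfg
}

func main() {
	cfg := buildConfig(WithMaxRetries(5), WithRetryPolicy(RetryPolicyExponential))
	fmt.Println(cfg.maxRetries, cfg.retryPolicy)
}
```

Because each option is just a function, callers only name the settings they want to change and new options can be added without breaking existing call sites.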
type Job ¶
type Job[T any] struct {
	// ID is the database identifier for the job.
	ID int32
	// Metadata contains meta information of a job stored as key-value pairs.
	Metadata map[string]any
	// Args contains the job payload of type T.
	Args T
}
Job represents a queued job with its arguments and database ID.
type JobEnqueueContext ¶
JobEnqueueContext provides context for enqueuing a job, including its arguments.
func (JobEnqueueContext) WithContext ¶
func (ectx JobEnqueueContext) WithContext(ctx context.Context) JobEnqueueContext
WithContext returns a copy of the JobEnqueueContext with the provided context.
type JobProcessContext ¶
JobProcessContext provides context for processing a job, including its ID and arguments.
func (JobProcessContext) WithContext ¶
func (pctx JobProcessContext) WithContext(ctx context.Context) JobProcessContext
WithContext returns a copy of the JobProcessContext with the provided context.
type JobQueue ¶
type JobQueue[T any] struct {
	// contains filtered or unexported fields
}
JobQueue manages the lifecycle of jobs in a named queue. It polls the database for scheduled jobs, executes them via a Worker, and handles retries with exponential backoff or other configured policies. Jobs are processed concurrently.
T is the type of the job arguments payload and must be JSON serializable.
JobQueues should be created using the New function. Example:
queue := goqueue.New(db, &MyWorker{},
goqueue.WithQueueName("my-queue"),
goqueue.WithPollInterval(500*time.Millisecond),
)
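The requirement that T be JSON serializable can be checked with a round trip through encoding/json before a type is used as a payload. The sketch below assumes the queue stores arguments as JSON, which is what the requirement suggests but is not confirmed here; ReportArgs and roundTrip are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ReportArgs is a hypothetical job payload with exported, tagged fields.
type ReportArgs struct {
	UserID int    `json:"user_id"`
	Format string `json:"format"`
}

// roundTrip encodes args to JSON and decodes the bytes back into a new T.
// A payload type that fails here could not be stored and restored as JSON.
func roundTrip[T any](args T) (T, error) {
	var out T
	raw, err := json.Marshal(args)
	if err != nil {
		return out, err
	}
	err = json.Unmarshal(raw, &out)
	return out, err
}

func main() {
	got, err := roundTrip(ReportArgs{UserID: 42, Format: "pdf"})
	fmt.Println(got, err)

	// Channels cannot be marshalled, so this payload type is rejected.
	_, err = roundTrip(make(chan int))
	fmt.Println("channel payload rejected:", err != nil)
}
```

Unexported struct fields are silently dropped by encoding/json, so a payload that relies on them would come back incomplete even though no error is returned.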
func New ¶
func New[T any](db DB, worker Worker[T], opts ...JobQueueOption) *JobQueue[T]
New creates a new JobQueue with the given database connection and worker. Configure behavior using functional options like WithPollInterval, WithQueueName, etc.
func (*JobQueue[T]) Enqueue ¶
Enqueue adds a new job with the given arguments to the queue. Optional parameters can be set using EnqueueOption functions like WithMaxRetries and WithRetryPolicy. It returns the created Job or an error. Example:
job, err := queue.Enqueue(ctx, MyJobArgs{...},
jobqueue.WithMaxRetries(5),
jobqueue.WithRetryPolicy(jobqueue.RetryPolicyExponential),
)
func (*JobQueue[T]) Receive ¶
Receive starts polling the queue for scheduled jobs and processes them using the configured Worker. It runs until the provided context is cancelled. Errors during job processing are logged but do not stop the polling loop. Failed jobs are retried or marked as failed after the retry policy has been exhausted.
Example:
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go queue.Receive(ctx)
// Run for 10 minutes
time.Sleep(10 * time.Minute)
cancel()
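The behaviour described for Receive (poll at an interval, keep going after a processing error, stop when the context is cancelled) can be sketched with a ticker and a select. This is an illustration of the general pattern, not the package's implementation; pollLoop and runPolls are hypothetical helpers.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// pollLoop calls poll on every tick until ctx is cancelled.
// A poll error is reported and the loop keeps running.
func pollLoop(ctx context.Context, interval time.Duration, poll func(context.Context) error) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			// select picks at random when both channels are ready,
			// so re-check the context before doing any work.
			if ctx.Err() != nil {
				return ctx.Err()
			}
			if err := poll(ctx); err != nil {
				fmt.Println("poll failed:", err)
			}
		}
	}
}

// runPolls runs the loop, cancels it from inside the n-th poll, and
// returns how many polls actually happened.
func runPolls(n int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	count := 0
	_ = pollLoop(ctx, time.Millisecond, func(context.Context) error {
		count++
		if count == n {
			cancel()
		}
		return errors.New("simulated job failure") // does not stop the loop
	})
	return count
}

func main() {
	fmt.Println("polls:", runPolls(3))
}
```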
type JobQueueOption ¶
type JobQueueOption func(*jobQueueConfig)
JobQueueOption defines a functional option for configuring a JobQueue.
func WithBaseRetryDelay ¶
func WithBaseRetryDelay(d time.Duration) JobQueueOption
WithBaseRetryDelay sets the base delay used for calculating retry backoff.
func WithDLQ ¶
func WithDLQ(queueName string) JobQueueOption
WithDLQ configures a dead-letter queue (DLQ) for handling failed jobs. If queueName is an empty string, no DLQ is used.
func WithFIFO ¶
func WithFIFO(isFIFO bool) JobQueueOption
WithFIFO configures the queue to process jobs in a first-in-first-out manner.
func WithLogger ¶
func WithLogger(logger *slog.Logger) JobQueueOption
WithLogger sets a custom logger for the JobQueue.
func WithMaxRetryDelay ¶
func WithMaxRetryDelay(d time.Duration) JobQueueOption
WithMaxRetryDelay sets the maximum delay allowed between retry attempts.
func WithMiddlewares ¶
func WithMiddlewares(mws ...Middleware) JobQueueOption
WithMiddlewares adds the given middlewares to the JobQueue for both enqueuing and processing jobs. They are applied in the order provided.
func WithPollInterval ¶
func WithPollInterval(interval time.Duration) JobQueueOption
WithPollInterval sets the polling interval for checking the queue for new jobs.
func WithQueueName ¶
func WithQueueName(name string) JobQueueOption
WithQueueName sets the name of the job queue.
type Middleware ¶
type Middleware interface {
// Enqueue is called when a job is being enqueued.
Enqueue(next EnqueueHandler) EnqueueHandler
// Process is called when a job is being processed.
Process(next ProcessHandler) ProcessHandler
}
Middleware allows wrapping EnqueueHandler and ProcessHandler with custom logic.
type ProcessHandler ¶
type ProcessHandler func(ctx JobProcessContext) error
ProcessHandler defines the function signature for handling job processing.
type Querier ¶
Querier combines the generated database queries with the ability to create a new Querier bound to a transaction.
type RetryPolicy ¶
type RetryPolicy int
RetryPolicy defines the strategy for retrying failed jobs.
const (
	// RetryPolicyConstant retries jobs with a constant delay. Formula:
	//
	//	next_delay = base_delay
	//
	RetryPolicyConstant RetryPolicy = iota
	// RetryPolicyLinear retries jobs with a linearly increasing delay. Formula:
	//
	//	next_delay = base_delay * (attempt_number + 1)
	//
	RetryPolicyLinear
	// RetryPolicyExponential retries jobs with an exponentially increasing delay. Formula:
	//
	//	next_delay = base_delay ^ (attempt_number + 1)
	//
	RetryPolicyExponential
)
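The three formulas can be evaluated with a small helper such as the one below. It is a sketch under several assumptions that the documentation does not state: attempt_number is zero-based, the exponent is applied to the base delay measured in seconds, and the result is capped at the value given to WithMaxRetryDelay. The names nextDelay, baseDelay and maxDelay are hypothetical.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

type RetryPolicy int

const (
	RetryPolicyConstant RetryPolicy = iota
	RetryPolicyLinear
	RetryPolicyExponential
)

// Example values only; the package defaults are not documented here.
const (
	baseDelay = 2 * time.Second
	maxDelay  = 30 * time.Second
)

// nextDelay evaluates the documented formula for a zero-based attempt
// number and caps the result at limit (both are assumptions).
func nextDelay(policy RetryPolicy, base, limit time.Duration, attempt int) time.Duration {
	var d time.Duration
	switch policy {
	case RetryPolicyLinear:
		d = base * time.Duration(attempt+1)
	case RetryPolicyExponential:
		// Assumption: the power is taken over the base delay in seconds.
		secs := math.Pow(base.Seconds(), float64(attempt+1))
		if secs >= limit.Seconds() {
			return limit // also avoids overflow for large attempts
		}
		d = time.Duration(secs * float64(time.Second))
	default: // RetryPolicyConstant
		d = base
	}
	if d > limit {
		d = limit
	}
	return d
}

func main() {
	for attempt := 0; attempt < 5; attempt++ {
		fmt.Println(attempt,
			nextDelay(RetryPolicyConstant, baseDelay, maxDelay, attempt),
			nextDelay(RetryPolicyLinear, baseDelay, maxDelay, attempt),
			nextDelay(RetryPolicyExponential, baseDelay, maxDelay, attempt))
	}
}
```

Under the seconds interpretation, a base delay of one second or less never grows with the exponential formula as written, so the unit the package actually uses is worth checking before relying on this policy.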
type UnimplementedMiddleware ¶
type UnimplementedMiddleware struct {
Middleware
}
UnimplementedMiddleware can be embedded to obtain a Middleware implementation whose methods are no-ops, so that only the methods of interest need to be overridden.
func (*UnimplementedMiddleware) Enqueue ¶
func (m *UnimplementedMiddleware) Enqueue(next EnqueueHandler) EnqueueHandler
Enqueue returns the next EnqueueHandler unchanged.
func (*UnimplementedMiddleware) Process ¶
func (m *UnimplementedMiddleware) Process(next ProcessHandler) ProcessHandler
Process returns the next ProcessHandler unchanged.
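How embedding a pass-through middleware and chaining several middlewares might look is sketched below. The types are local stand-ins so the example compiles on its own: the fields of JobProcessContext are not documented, noopMiddleware plays the role of UnimplementedMiddleware, and chainProcess assumes that "applied in the order provided" means the first middleware is the outermost wrapper.

```go
package main

import "fmt"

// Hypothetical stand-ins; the real context types carry undocumented fields.
type JobProcessContext struct{ JobID int32 }
type JobEnqueueContext struct{}

type ProcessHandler func(ctx JobProcessContext) error
type EnqueueHandler func(ctx JobEnqueueContext) error

type Middleware interface {
	Enqueue(next EnqueueHandler) EnqueueHandler
	Process(next ProcessHandler) ProcessHandler
}

// noopMiddleware plays the role of UnimplementedMiddleware: both methods
// return the next handler unchanged.
type noopMiddleware struct{}

func (noopMiddleware) Enqueue(next EnqueueHandler) EnqueueHandler { return next }
func (noopMiddleware) Process(next ProcessHandler) ProcessHandler { return next }

// traceMiddleware overrides only Process; Enqueue is inherited by embedding.
type traceMiddleware struct {
	noopMiddleware
	name string
	log  *[]string
}

func (m traceMiddleware) Process(next ProcessHandler) ProcessHandler {
	return func(ctx JobProcessContext) error {
		*m.log = append(*m.log, m.name+":before")
		err := next(ctx)
		*m.log = append(*m.log, m.name+":after")
		return err
	}
}

// chainProcess wraps final so that mws[0] runs first (assumed ordering).
func chainProcess(final ProcessHandler, mws ...Middleware) ProcessHandler {
	h := final
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i].Process(h)
	}
	return h
}

// runChain processes one job through two middlewares and returns the
// order in which each step ran.
func runChain() []string {
	var log []string
	h := chainProcess(
		func(ctx JobProcessContext) error { log = append(log, "work"); return nil },
		traceMiddleware{name: "a", log: &log},
		traceMiddleware{name: "b", log: &log},
	)
	_ = h(JobProcessContext{JobID: 1})
	return log
}

func main() {
	fmt.Println(runChain())
}
```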
type Worker ¶
type Worker[T any] interface {
	// Work processes a single job. Returning an error triggers retry logic
	// based on the job's retry policy and max retry count. If max retries is
	// exceeded, the job is marked as failed.
	Work(ctx context.Context, job *Job[T]) error
}
Worker processes jobs of type T. Implementations should be idempotent, as jobs may be retried multiple times on failure.
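One way to make a Worker idempotent is to record which job IDs have already produced their side effect and to skip the work on a retry. The sketch below mirrors Job and Worker locally so it compiles on its own; EmailArgs, emailWorker and deliverTwice are hypothetical, and the in-memory map stands in for whatever durable store a real worker would use.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Local mirrors of the documented types.
type Job[T any] struct {
	ID       int32
	Metadata map[string]any
	Args     T
}

type Worker[T any] interface {
	Work(ctx context.Context, job *Job[T]) error
}

// EmailArgs is a hypothetical payload.
type EmailArgs struct{ To string }

// emailWorker remembers handled job IDs so that a retried job does not
// send a second email. The mutex guards against concurrent processing.
type emailWorker struct {
	mu   sync.Mutex
	done map[int32]bool
	sent []string
}

func (w *emailWorker) Work(ctx context.Context, job *Job[EmailArgs]) error {
	if err := ctx.Err(); err != nil {
		return err // returning an error asks the queue to retry later
	}
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.done[job.ID] {
		return nil // already handled: the retry becomes a no-op
	}
	w.sent = append(w.sent, job.Args.To) // stands in for the real side effect
	w.done[job.ID] = true
	return nil
}

// Compile-time check that emailWorker satisfies the interface.
var _ Worker[EmailArgs] = (*emailWorker)(nil)

// deliverTwice hands the same job to the worker twice, as a retry would,
// and returns how many emails were actually sent.
func deliverTwice() int {
	w := &emailWorker{done: map[int32]bool{}}
	job := &Job[EmailArgs]{ID: 7, Args: EmailArgs{To: "a@example.com"}}
	_ = w.Work(context.Background(), job)
	_ = w.Work(context.Background(), job)
	return len(w.sent)
}

func main() {
	fmt.Println("emails sent:", deliverTwice())
}
```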