redis

package
v0.0.78 Latest
Warning: This package is not in the latest version of its module.
Published: Nov 30, 2025 License: Apache-2.0 Imports: 6 Imported by: 0

Documentation

Overview

Package redis provides a Redis-backed job queue. It supports distributed enqueue and blocking dequeue, and tracks which jobs are currently being processed.

Types

type Config

type Config struct {
	RedisURL  string // Redis URL (defaults to WHEN_REDIS_URL or redis://localhost:6379/0)
	KeyPrefix string // Key prefix for queue keys (defaults to "queue:")
}

Config configures the Redis queue.

type Job

type Job struct {
	ActionID   string    `json:"actionID"`
	QueueName  string    `json:"queueName"`
	WorkflowID string    `json:"workflowID"` // Workflow ID (determines which queue to use)
	RunID      string    `json:"runID"`
	EnqueuedAt time.Time `json:"enqueuedAt"`
	RetryCount int       `json:"retryCount"`
}

Job represents a job in the execution queue.

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue handles job queue operations using Redis.

func NewQueue

func NewQueue(ctx context.Context, config Config) (*Queue, error)

NewQueue creates a new Redis queue client.

func (*Queue) Close

func (q *Queue) Close() error

Close closes the Redis connection.

func (*Queue) CompleteJob

func (q *Queue) CompleteJob(actionID string) error

CompleteJob removes a job from the processing set.

func (*Queue) Dequeue

func (q *Queue) Dequeue(queueName string, timeout time.Duration) (*Job, error)

Dequeue removes and returns the next job from the named queue, blocking for up to timeout.

func (*Queue) Enqueue

func (q *Queue) Enqueue(job Job) error

Enqueue adds a job to a queue.

func (*Queue) FailJob

func (q *Queue) FailJob(actionID string, requeue bool, queueName string, retryCount int) error

FailJob marks a job as failed and optionally re-enqueues it.
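
FailJob leaves the requeue decision to the caller. A minimal sketch of a bounded-retry policy built on the documented signature follows. The failer interface, the recorder stand-in, the handleFailure helper, and the maxRetries limit are all hypothetical, and passing the incremented count as retryCount is an assumption about how that argument is meant to be used.

```go
package main

import "fmt"

// failer captures only the documented FailJob signature.
type failer interface {
	FailJob(actionID string, requeue bool, queueName string, retryCount int) error
}

// recorder is a stand-in that records calls instead of talking to Redis.
type recorder struct{ calls []string }

func (r *recorder) FailJob(actionID string, requeue bool, queueName string, retryCount int) error {
	r.calls = append(r.calls, fmt.Sprintf("%s requeue=%t queue=%s retry=%d",
		actionID, requeue, queueName, retryCount))
	return nil
}

// handleFailure is a hypothetical policy: requeue until maxRetries attempts
// have been used, then fail the job without requeueing.
func handleFailure(q failer, actionID, queueName string, retryCount, maxRetries int) error {
	next := retryCount + 1
	return q.FailJob(actionID, next <= maxRetries, queueName, next)
}

func main() {
	r := &recorder{}
	// First failure: still within the retry budget, so the job is requeued.
	if err := handleFailure(r, "a1", "default", 0, 2); err != nil {
		panic(err)
	}
	// Third failure: the budget of 2 retries is exhausted, so no requeue.
	if err := handleFailure(r, "a1", "default", 2, 2); err != nil {
		panic(err)
	}
	for _, c := range r.calls {
		fmt.Println(c)
	}
}
```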

func (*Queue) GetQueueDepth

func (q *Queue) GetQueueDepth(queueName string) (int, error)

GetQueueDepth returns the number of jobs in a queue.

func (*Queue) IsProcessing

func (q *Queue) IsProcessing(actionID string) (bool, error)

IsProcessing checks if a job is currently being processed.

func (*Queue) MarkProcessing

func (q *Queue) MarkProcessing(actionID string, deadline time.Time) error

MarkProcessing adds a job to the processing set with a deadline.

func (*Queue) WaitForJobCompletion

func (q *Queue) WaitForJobCompletion(actionID string, timeout time.Duration, checkStatus func(string) (string, error)) error

WaitForJobCompletion waits for a job to complete or for the timeout to elapse.
