Documentation ¶
Index ¶
- type Consumer
- type ConsumerOpts
- type DefaultLogger
- type EmptyLogger
- type ErrExceededMaxBackoff
- type ErrFailedToAckJob
- type ErrFailedToKillJob
- type ErrFailedToRetryJob
- type HandlerFunc
- type Job
- type Logger
- type LoudLogger
- type Producer
- func (p *Producer) Perform(job Job) (string, error)
- func (p *Producer) PerformAfter(duration time.Duration, job Job) (string, error)
- func (p *Producer) PerformAfterCtx(ctx context.Context, duration time.Duration, job Job) (string, error)
- func (p *Producer) PerformAt(at time.Time, job Job) (string, error)
- func (p *Producer) PerformAtCtx(ctx context.Context, at time.Time, job Job) (string, error)
- func (p *Producer) PerformCtx(ctx context.Context, job Job) (string, error)
- type ProducerOpts
Types ¶
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
A Consumer executes jobs and manages the state of the queue.
func NewConsumer ¶
func NewConsumer(opts *ConsumerOpts) *Consumer
NewConsumer instantiates a new Consumer.
func (*Consumer) Consume ¶
func (c *Consumer) Consume(handler HandlerFunc, signals ...os.Signal) error
Consume starts the consumer with a default context. The Consumer runs until the process receives one of the specified signals. An error is returned if the Consumer cannot shut down gracefully.
func (*Consumer) ConsumeCtx ¶
func (c *Consumer) ConsumeCtx(ctx context.Context, handler HandlerFunc) (err error)
ConsumeCtx starts the consumer with a user-supplied context. The Consumer runs indefinitely until the provided context is canceled. An error is returned if the Consumer cannot shut down gracefully.
type ConsumerOpts ¶
type ConsumerOpts struct {
// Address specifies the address of the Redis backing your queue.
// CurlyQ will generate a go-redis instance based on this address.
Address string
// Client is a custom go-redis instance used to communicate with Redis.
// If provided, this option overrides the value set in Address.
Client *redis.Client
// Queue specifies the name of the queue that this consumer will consume from.
Queue string
// Logger provides a concrete implementation of the Logger interface.
// If not provided, it will default to using the stdlib's log package.
Logger Logger
// The maximum number of times to retry a job before killing it.
// Default: 20
JobMaxAttempts int
// The maximum delay between retry attempts.
// Default: 1 week
JobMaxBackoff time.Duration
// How long to wait for executors to finish before exiting forcibly.
// A zero value indicates that we should wait indefinitely.
// Default: 0
ShutdownGracePeriod time.Duration
// How long to wait after a missed heartbeat before a consumer is considered dead.
// Default: 1 minute
// Minimum: 5 seconds
CustodianConsumerTimeout time.Duration
// The maximum number of failed attempts before aborting.
// A zero value indicates the custodian should never abort.
// Default: 0
CustodianMaxAttempts int
// The longest amount of time to wait between failed attempts.
// Default: 30 seconds
CustodianMaxBackoff time.Duration
// Max number of jobs to clean up during a single check.
// Default: 50
CustodianMaxJobs int
// How frequently the custodian should clean up jobs.
// Default: 1 minute
CustodianPollInterval time.Duration
// The maximum number of failed attempts before aborting.
// A zero value indicates the heartbeat should never abort.
// Default: 0
HeartbeatMaxAttempts int
// The longest amount of time to wait between failed attempts.
// Default: 30 seconds
HeartbeatMaxBackoff time.Duration
// How frequently we should heartbeat.
// Default: 1 minute
// Minimum: 15 seconds
HeartbeatPollInterval time.Duration
// How many jobs to buffer locally.
// Default: 10
PollerBufferSize int
// The maximum number of failed attempts before aborting.
// A zero value indicates the poller should never abort.
// Default: 0
PollerMaxAttempts int
// The longest amount of time to wait between failed attempts.
// Default: 30 seconds
PollerMaxBackoff time.Duration
// How long we should block on Redis for new jobs on each call.
// Default: 5 seconds
// Minimum: 1 second
PollerPollDuration time.Duration
// How many jobs to process simultaneously.
// Default: 5
ProcessorConcurrency int
// The maximum number of failed attempts before aborting.
// A zero value indicates the scheduler should never abort.
// Default: 0
SchedulerMaxAttempts int
// The longest amount of time to wait between failed attempts.
// Default: 30 seconds
SchedulerMaxBackoff time.Duration
// Max number of jobs to schedule during each check.
// Default: 50
SchedulerMaxJobs int
// How frequently the scheduler should check for scheduled jobs.
// Default: 5 seconds
SchedulerPollInterval time.Duration
}
ConsumerOpts exposes options used when creating a new Consumer.
type DefaultLogger ¶ added in v0.2.0
type DefaultLogger struct{}
DefaultLogger is a Logger that sends all non-debug logs to stdout.
func (*DefaultLogger) Debug ¶ added in v0.2.0
func (l *DefaultLogger) Debug(args ...interface{})
Debug does nothing.
func (*DefaultLogger) Error ¶ added in v0.2.0
func (l *DefaultLogger) Error(args ...interface{})
Error logs error level information to stdout.
func (*DefaultLogger) Info ¶ added in v0.2.0
func (l *DefaultLogger) Info(args ...interface{})
Info logs info level information to stdout.
func (*DefaultLogger) Warn ¶ added in v0.2.0
func (l *DefaultLogger) Warn(args ...interface{})
Warn logs warn level information to stdout.
type EmptyLogger ¶ added in v0.3.0
type EmptyLogger struct{}
EmptyLogger is a Logger that logs nothing.
func (*EmptyLogger) Debug ¶ added in v0.3.0
func (l *EmptyLogger) Debug(args ...interface{})
Debug does nothing.
func (*EmptyLogger) Error ¶ added in v0.3.0
func (l *EmptyLogger) Error(args ...interface{})
Error does nothing.
func (*EmptyLogger) Info ¶ added in v0.3.0
func (l *EmptyLogger) Info(args ...interface{})
Info does nothing.
func (*EmptyLogger) Warn ¶ added in v0.3.0
func (l *EmptyLogger) Warn(args ...interface{})
Warn does nothing.
type ErrExceededMaxBackoff ¶ added in v0.4.0
ErrExceededMaxBackoff indicates a polling loop exceeded a maximum number of backoffs. It is considered a fatal error that should shut down the consumer.
func (ErrExceededMaxBackoff) Error ¶ added in v0.4.0
func (e ErrExceededMaxBackoff) Error() string
type ErrFailedToAckJob ¶ added in v0.2.0
ErrFailedToAckJob indicates an error when acknowledging a completed job. It is considered a fatal error that should shut down the consumer.
func (ErrFailedToAckJob) Error ¶ added in v0.2.0
func (e ErrFailedToAckJob) Error() string
type ErrFailedToKillJob ¶ added in v0.2.0
ErrFailedToKillJob indicates an error when marking a job as dead. It is considered a fatal error that should shut down the consumer.
func (ErrFailedToKillJob) Error ¶ added in v0.2.0
func (e ErrFailedToKillJob) Error() string
type ErrFailedToRetryJob ¶ added in v0.2.0
ErrFailedToRetryJob indicates an error when scheduling a retry. It is considered a fatal error that should shut down the consumer.
func (ErrFailedToRetryJob) Error ¶ added in v0.2.0
func (e ErrFailedToRetryJob) Error() string
type HandlerFunc ¶
HandlerFunc is a convenience alias. It represents a function used to process a job.
type Logger ¶ added in v0.2.0
type Logger interface {
// Debug logs fine-grained information,
// such as when a given process starts and ends.
Debug(...interface{})
// Info logs useful information,
// such as which job is currently being processed.
Info(...interface{})
// Warn logs non-critical errors,
// such as network issues that are treated as transient errors.
Warn(...interface{})
// Error logs critical errors,
// such as Redis issues which might affect the consistency of the queue.
Error(...interface{})
}
Logger exposes an interface for a leveled logger. You can provide a Logger to a Consumer and a Producer to modify CurlyQ's default logging behavior.
type LoudLogger ¶ added in v0.4.0
type LoudLogger struct{}
LoudLogger is a Logger that sends all logs to stdout.
func (*LoudLogger) Debug ¶ added in v0.4.0
func (l *LoudLogger) Debug(args ...interface{})
Debug logs debug level information to stdout.
func (*LoudLogger) Error ¶ added in v0.4.0
func (l *LoudLogger) Error(args ...interface{})
Error logs error level information to stdout.
func (*LoudLogger) Info ¶ added in v0.4.0
func (l *LoudLogger) Info(args ...interface{})
Info logs info level information to stdout.
func (*LoudLogger) Warn ¶ added in v0.4.0
func (l *LoudLogger) Warn(args ...interface{})
Warn logs warn level information to stdout.
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
A Producer pushes jobs onto a queue.
func NewProducer ¶
func NewProducer(opts *ProducerOpts) *Producer
NewProducer instantiates a new Producer.
func (*Producer) Perform ¶
func (p *Producer) Perform(job Job) (string, error)
Perform calls PerformCtx with a default background context. It returns the ID of the enqueued job when successful or an error otherwise.
func (*Producer) PerformAfter ¶
func (p *Producer) PerformAfter(duration time.Duration, job Job) (string, error)
PerformAfter enqueues a job to be performed after a certain amount of time. It calls to Redis using a default background context. It returns the ID of the enqueued job when successful or an error otherwise.
func (*Producer) PerformAfterCtx ¶
func (p *Producer) PerformAfterCtx(ctx context.Context, duration time.Duration, job Job) (string, error)
PerformAfterCtx enqueues a job to be performed after a certain amount of time. It calls to Redis using a user-supplied context. It returns the ID of the enqueued job when successful or an error otherwise.
func (*Producer) PerformAt ¶
func (p *Producer) PerformAt(at time.Time, job Job) (string, error)
PerformAt calls PerformAtCtx with a default background context. It returns the ID of the enqueued job when successful or an error otherwise.
func (*Producer) PerformAtCtx ¶
func (p *Producer) PerformAtCtx(ctx context.Context, at time.Time, job Job) (string, error)
PerformAtCtx schedules a job to be performed at a particular point in time. It calls to Redis using a user-supplied context. It returns the ID of the enqueued job when successful or an error otherwise.
type ProducerOpts ¶
type ProducerOpts struct {
// Address specifies the address of the Redis backing your queue.
// CurlyQ will generate a go-redis instance based on this address.
Address string
// Client is a custom go-redis instance used to communicate with Redis.
// If provided, this option overrides the value set in Address.
Client *redis.Client
// Logger provides a concrete implementation of the Logger interface.
// If not provided, it will default to using the stdlib's log package.
Logger Logger
// Queue specifies the name of the queue that this producer will push to.
Queue string
}
ProducerOpts exposes options used when creating a new Producer.