Queue
The queue package provides a job queue system for offloading work to background workers. It supports enqueue/dequeue operations, configurable retry policies, dead letter queues, and concurrent worker pools.
Import
import "github.com/gofastadev/gofasta/pkg/queue"Key Types
Queue
type Queue interface {
Enqueue(ctx context.Context, job Job) error
Dequeue(ctx context.Context) (*Job, error)
Ack(ctx context.Context, jobID string) error
Nack(ctx context.Context, jobID string) error
Start(ctx context.Context) error
Stop(ctx context.Context) error
}Job
type Job struct {
ID string `json:"id"`
Type string `json:"type"`
Payload map[string]interface{} `json:"payload"`
Priority int `json:"priority"`
MaxRetries int `json:"max_retries"`
RetryCount int `json:"retry_count"`
ScheduledAt *time.Time `json:"scheduled_at,omitempty"`
CreatedAt time.Time `json:"created_at"`
}QueueConfig
type QueueConfig struct {
Driver string `yaml:"driver" env:"QUEUE_DRIVER"`
Workers int `yaml:"workers" env:"QUEUE_WORKERS"`
MaxRetries int `yaml:"max_retries" env:"QUEUE_MAX_RETRIES"`
RetryDelay time.Duration `yaml:"retry_delay" env:"QUEUE_RETRY_DELAY"`
Concurrency int `yaml:"concurrency" env:"QUEUE_CONCURRENCY"`
}Handler
type Handler func(ctx context.Context, job Job) errorKey Functions
| Function | Signature | Description |
|---|---|---|
NewQueue | func NewQueue(cfg QueueConfig) (Queue, error) | Creates a new queue instance based on the driver |
RegisterHandler | func (q *Queue) RegisterHandler(jobType string, handler Handler) | Registers a handler for a specific job type |
Enqueue | func (q *Queue) Enqueue(ctx context.Context, job Job) error | Adds a job to the queue |
EnqueueAt | func (q *Queue) EnqueueAt(ctx context.Context, job Job, at time.Time) error | Schedules a job for future execution |
Usage
Setting Up the Queue
q, err := queue.NewQueue(queue.QueueConfig{
Driver: "memory", // "memory" or "redis"
Workers: 4,
MaxRetries: 3,
RetryDelay: 5 * time.Second,
Concurrency: 10,
})
if err != nil {
log.Fatalf("failed to create queue: %v", err)
}Registering Handlers
q.RegisterHandler("send_email", func(ctx context.Context, job queue.Job) error {
to := job.Payload["to"].(string)
subject := job.Payload["subject"].(string)
body := job.Payload["body"].(string)
return mailClient.Send(ctx, mailer.Message{
To: []mailer.Address{{Email: to}},
Subject: subject,
Body: body,
})
})
q.RegisterHandler("process_image", func(ctx context.Context, job queue.Job) error {
imageURL := job.Payload["url"].(string)
// process the image...
return nil
})Enqueuing Jobs
// Immediate execution
err := q.Enqueue(ctx, queue.Job{
Type: "send_email",
Payload: map[string]interface{}{
"to": "user@example.com",
"subject": "Welcome!",
"body": "Thanks for signing up.",
},
MaxRetries: 3,
})
// Delayed execution
err := q.EnqueueAt(ctx, queue.Job{
Type: "send_reminder",
Payload: map[string]interface{}{
"user_id": "user-123",
},
}, time.Now().Add(24*time.Hour))Starting and Stopping Workers
// Start processing jobs (blocks until stopped)
go func() {
if err := q.Start(ctx); err != nil {
log.Fatalf("queue error: %v", err)
}
}()
// Graceful shutdown
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
q.Stop(ctx)Configuration via config.yaml
queue:
driver: redis
workers: 4
max_retries: 3
retry_delay: 5s
concurrency: 10Related Pages
- Scheduler — Cron-based scheduling pairs well with queued jobs
- Mailer — Offload email sending to the queue
- Resilience — Retry and circuit breaker for job handlers
Last updated on