Skip to Content

Queue

The queue package provides a job queue system for offloading work to background workers. It supports enqueue/dequeue operations, configurable retry policies, dead letter queues, and concurrent worker pools.

Import

import "github.com/gofastadev/gofasta/pkg/queue"

Key Types

Queue

type Queue interface { Enqueue(ctx context.Context, job Job) error Dequeue(ctx context.Context) (*Job, error) Ack(ctx context.Context, jobID string) error Nack(ctx context.Context, jobID string) error Start(ctx context.Context) error Stop(ctx context.Context) error }

Job

type Job struct { ID string `json:"id"` Type string `json:"type"` Payload map[string]interface{} `json:"payload"` Priority int `json:"priority"` MaxRetries int `json:"max_retries"` RetryCount int `json:"retry_count"` ScheduledAt *time.Time `json:"scheduled_at,omitempty"` CreatedAt time.Time `json:"created_at"` }

QueueConfig

type QueueConfig struct { Driver string `yaml:"driver" env:"QUEUE_DRIVER"` Workers int `yaml:"workers" env:"QUEUE_WORKERS"` MaxRetries int `yaml:"max_retries" env:"QUEUE_MAX_RETRIES"` RetryDelay time.Duration `yaml:"retry_delay" env:"QUEUE_RETRY_DELAY"` Concurrency int `yaml:"concurrency" env:"QUEUE_CONCURRENCY"` }

Handler

type Handler func(ctx context.Context, job Job) error

Key Functions

FunctionSignatureDescription
NewQueuefunc NewQueue(cfg QueueConfig) (Queue, error)Creates a new queue instance based on the driver
RegisterHandlerfunc (q *Queue) RegisterHandler(jobType string, handler Handler)Registers a handler for a specific job type
Enqueuefunc (q *Queue) Enqueue(ctx context.Context, job Job) errorAdds a job to the queue
EnqueueAtfunc (q *Queue) EnqueueAt(ctx context.Context, job Job, at time.Time) errorSchedules a job for future execution

Usage

Setting Up the Queue

q, err := queue.NewQueue(queue.QueueConfig{ Driver: "memory", // "memory" or "redis" Workers: 4, MaxRetries: 3, RetryDelay: 5 * time.Second, Concurrency: 10, }) if err != nil { log.Fatalf("failed to create queue: %v", err) }

Registering Handlers

q.RegisterHandler("send_email", func(ctx context.Context, job queue.Job) error { to := job.Payload["to"].(string) subject := job.Payload["subject"].(string) body := job.Payload["body"].(string) return mailClient.Send(ctx, mailer.Message{ To: []mailer.Address{{Email: to}}, Subject: subject, Body: body, }) }) q.RegisterHandler("process_image", func(ctx context.Context, job queue.Job) error { imageURL := job.Payload["url"].(string) // process the image... return nil })

Enqueuing Jobs

// Immediate execution err := q.Enqueue(ctx, queue.Job{ Type: "send_email", Payload: map[string]interface{}{ "to": "user@example.com", "subject": "Welcome!", "body": "Thanks for signing up.", }, MaxRetries: 3, }) // Delayed execution err := q.EnqueueAt(ctx, queue.Job{ Type: "send_reminder", Payload: map[string]interface{}{ "user_id": "user-123", }, }, time.Now().Add(24*time.Hour))

Starting and Stopping Workers

// Start processing jobs (blocks until stopped) go func() { if err := q.Start(ctx); err != nil { log.Fatalf("queue error: %v", err) } }() // Graceful shutdown quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) <-quit ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() q.Stop(ctx)

Configuration via config.yaml

queue: driver: redis workers: 4 max_retries: 3 retry_delay: 5s concurrency: 10
  • Scheduler — Cron-based scheduling pairs well with queued jobs
  • Mailer — Offload email sending to the queue
  • Resilience — Retry and circuit breaker for job handlers
Last updated on