27 March, 2018

Building a Distributed Task Queue in Go


Introduction

As applications scale and architectures become more distributed, the need for reliable background processing becomes increasingly important. Task queues (also known as job queues) allow applications to offload time-consuming or resource-intensive operations from the main request-handling flow, improving responsiveness and scalability.

Over the past three years of working with Go, I've implemented several task processing systems for different use cases. In this article, I'll share the patterns and practices I've found effective for building distributed task queues in Go, covering architecture design, messaging systems integration, worker pool implementation, error handling, and approaches for ensuring reliability in distributed environments.

Why Use a Task Queue?

Before diving into implementation details, let's consider some common scenarios where a task queue is beneficial:

  1. Resource-intensive operations: Tasks that require significant CPU, memory, or I/O resources (e.g., image/video processing, report generation)
  2. Asynchronous workflows: Operations that don't need to complete within the request-response cycle (e.g., sending emails, notifications)
  3. Rate limiting: Controlling access to rate-limited external services
  4. Scheduled tasks: Operations that need to run at specific times
  5. Retries and reliability: Ensuring critical operations eventually complete despite failures

Architecture Overview

A distributed task queue system typically consists of several components:

  1. Task producers: Services that create and enqueue tasks
  2. Message broker: System that stores and distributes tasks (e.g., RabbitMQ, Kafka, Redis)
  3. Worker pool: Collection of workers that process tasks concurrently
  4. Task handlers: Specialized code to process each task type
  5. Monitoring/dashboards: Visibility into queue health and task processing

Let's design a flexible, robust task queue system in Go that addresses these needs.

Defining the Task Structure

First, let's define what a task looks like in our system:

type Task struct {
    ID        string                 `json:"id"`
    Type      string                 `json:"type"`
    Payload   map[string]interface{} `json:"payload"`
    Priority  int                    `json:"priority"`
    Retries   int                    `json:"retries"`
    MaxRetry  int                    `json:"max_retry"`
    CreatedAt time.Time              `json:"created_at"`
}

func NewTask(taskType string, payload map[string]interface{}) *Task {
    return &Task{
        ID:        uuid.New().String(),
        Type:      taskType,
        Payload:   payload,
        Priority:  0, // Default priority
        Retries:   0,
        MaxRetry:  3, // Default to 3 retries
        CreatedAt: time.Now(),
    }
}

Message Broker Integration

The message broker is a critical component of our task queue. Let's implement a broker interface that can be adapted to different backend systems:

type Broker interface {
    Enqueue(task *Task) error
    Dequeue() (*Task, error)
    Complete(taskID string) error
    Fail(taskID string, err error) error
    Close() error
}
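
Before wiring up a real broker, an in-memory implementation backed by a buffered channel is handy for unit tests. This is a minimal sketch of my own (it ignores priorities, and Complete/Fail are no-ops since a received task is already gone from the channel):

type InMemoryBroker struct {
    tasks chan *Task
}

func NewInMemoryBroker(size int) *InMemoryBroker {
    return &InMemoryBroker{tasks: make(chan *Task, size)}
}

func (b *InMemoryBroker) Enqueue(task *Task) error {
    select {
    case b.tasks <- task:
        return nil
    default:
        return errors.New("queue full")
    }
}

// Dequeue blocks until a task is available; fine for tests, though a real
// broker also needs timeouts and redelivery of unacknowledged tasks.
func (b *InMemoryBroker) Dequeue() (*Task, error) {
    return <-b.tasks, nil
}

func (b *InMemoryBroker) Complete(taskID string) error        { return nil }
func (b *InMemoryBroker) Fail(taskID string, err error) error { return nil }
func (b *InMemoryBroker) Close() error                        { return nil }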

Now, let's implement this interface using RabbitMQ, a popular message broker:

type RabbitMQBroker struct {
    conn    *amqp.Connection
    channel *amqp.Channel
    queue   string
}

func NewRabbitMQBroker(url, queueName string) (*RabbitMQBroker, error) {
    conn, err := amqp.Dial(url)
    if err != nil {
        return nil, fmt.Errorf("failed to connect to RabbitMQ: %w", err)
    }

    ch, err := conn.Channel()
    if err != nil {
        conn.Close()
        return nil, fmt.Errorf("failed to open a channel: %w", err)
    }

    q, err := ch.QueueDeclare(
        queueName, // name
        true,      // durable
        false,     // delete when unused
        false,     // exclusive
        false,     // no-wait
        nil,       // arguments
    )
    if err != nil {
        ch.Close()
        conn.Close()
        return nil, fmt.Errorf("failed to declare a queue: %w", err)
    }

    return &RabbitMQBroker{
        conn:    conn,
        channel: ch,
        queue:   q.Name,
    }, nil
}

func (b *RabbitMQBroker) Enqueue(task *Task) error {
    taskBytes, err := json.Marshal(task)
    if err != nil {
        return fmt.Errorf("failed to marshal task: %w", err)
    }

    return b.channel.Publish(
        "",      // exchange
        b.queue, // routing key
        false,   // mandatory
        false,   // immediate
        amqp.Publishing{
            DeliveryMode: amqp.Persistent,
            ContentType:  "application/json",
            Body:         taskBytes,
            MessageId:    task.ID,
            Priority:     uint8(task.Priority),
            Timestamp:    task.CreatedAt,
        },
    )
}

// ErrDequeueTimeout signals that no task was available within the polling window.
var ErrDequeueTimeout = errors.New("dequeue timeout")

func (b *RabbitMQBroker) Dequeue() (*Task, error) {
    // Note: for simplicity this registers a consumer per call; a production
    // implementation would create the consumer channel once and reuse it.
    msgs, err := b.channel.Consume(
        b.queue, // queue
        "",      // consumer
        false,   // auto-ack
        false,   // exclusive
        false,   // no-local
        false,   // no-wait
        nil,     // args
    )
    if err != nil {
        return nil, fmt.Errorf("failed to register a consumer: %w", err)
    }

    select {
    case d := <-msgs:
        var task Task
        if err := json.Unmarshal(d.Body, &task); err != nil {
            d.Nack(false, true) // reject but requeue
            return nil, fmt.Errorf("failed to unmarshal task: %w", err)
        }

        // Store delivery tag for later acknowledgment
        return &task, nil

    case <-time.After(5 * time.Second):
        return nil, ErrDequeueTimeout
    }
}

// Other methods like Complete, Fail, and Close would be implemented similarly
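
Close simply tears down the channel and connection; a minimal sketch of how it might look:

func (b *RabbitMQBroker) Close() error {
    if err := b.channel.Close(); err != nil {
        return err
    }
    return b.conn.Close()
}

Complete and Fail need the delivery tag of the original message; we will come back to them in the section on acknowledgments below.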

Worker Pool Implementation

Now, let's implement a worker pool to process tasks concurrently:

type WorkerPool struct {
    broker      Broker
    handlers    map[string]TaskHandler
    numWorkers  int
    workerQueue chan struct{}
    shutdown    chan struct{}
    wg          sync.WaitGroup
}

type TaskHandler func(task *Task) error

func NewWorkerPool(broker Broker, numWorkers int) *WorkerPool {
    return &WorkerPool{
        broker:      broker,
        handlers:    make(map[string]TaskHandler),
        numWorkers:  numWorkers,
        workerQueue: make(chan struct{}, numWorkers),
        shutdown:    make(chan struct{}),
    }
}

func (wp *WorkerPool) RegisterHandler(taskType string, handler TaskHandler) {
    wp.handlers[taskType] = handler
}

func (wp *WorkerPool) Start() {
    for i := 0; i < wp.numWorkers; i++ {
        wp.wg.Add(1)
        go wp.worker(i)
    }
}

func (wp *WorkerPool) worker(id int) {
    defer wp.wg.Done()

    log.Printf("Worker %d starting", id)

    for {
        select {
        case <-wp.shutdown:
            log.Printf("Worker %d shutting down", id)
            return

        default:
            task, err := wp.broker.Dequeue()
            if err != nil {
                // If the dequeue simply timed out, poll again
                if errors.Is(err, ErrDequeueTimeout) {
                    continue
                }

                log.Printf("Worker %d dequeue error: %v", id, err)
                time.Sleep(1 * time.Second) // Back off on errors
                continue
            }

            handler, ok := wp.handlers[task.Type]
            if !ok {
                log.Printf("No handler for task type: %s", task.Type)
                wp.broker.Fail(task.ID, fmt.Errorf("no handler for task type: %s", task.Type))
                continue
            }

            // Process the task
            log.Printf("Worker %d processing task %s (type: %s)", id, task.ID, task.Type)
            err = handler(task)

            if err != nil {
                log.Printf("Worker %d task %s failed: %v", id, task.ID, err)
                task.Retries++

                if task.Retries <= task.MaxRetry {
                    // Requeue the task for retry, then ack the original
                    // delivery: the retry is published as a new message
                    wp.broker.Enqueue(task)
                    wp.broker.Complete(task.ID)
                } else {
                    // Max retries exceeded
                    wp.broker.Fail(task.ID, err)
                }
            } else {
                log.Printf("Worker %d task %s completed successfully", id, task.ID)
                wp.broker.Complete(task.ID)
            }
        }
    }
}

func (wp *WorkerPool) Shutdown(timeout time.Duration) {
    close(wp.shutdown)

    // Wait for workers to finish with timeout
    c := make(chan struct{})
    go func() {
        wp.wg.Wait()
        close(c)
    }()

    select {
    case <-c:
        log.Println("All workers shut down successfully")
    case <-time.After(timeout):
        log.Println("Shutdown timeout: some workers did not exit in time")
    }
}

Task Producer

The task producer is responsible for creating and enqueuing tasks:

type TaskProducer struct {
    broker Broker
}

func NewTaskProducer(broker Broker) *TaskProducer {
    return &TaskProducer{
        broker: broker,
    }
}

func (p *TaskProducer) EnqueueTask(taskType string, payload map[string]interface{}, options ...func(*Task)) error {
    task := NewTask(taskType, payload)

    // Apply any options
    for _, option := range options {
        option(task)
    }

    return p.broker.Enqueue(task)
}

// Option functions for configuring tasks
func WithPriority(priority int) func(*Task) {
    return func(t *Task) {
        t.Priority = priority
    }
}

func WithMaxRetry(maxRetry int) func(*Task) {
    return func(t *Task) {
        t.MaxRetry = maxRetry
    }
}
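
With the functional options in place, enqueuing a task looks like this (the payload keys are just illustrative):

err := producer.EnqueueTask("email", map[string]interface{}{
    "to":      "user@example.com",
    "subject": "Welcome!",
}, WithPriority(5), WithMaxRetry(5))
if err != nil {
    log.Printf("Failed to enqueue task: %v", err)
}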

Handling Distributed System Challenges

Distributed task queues face several challenges that need to be addressed:

1. Message Acknowledgment and At-Least-Once Delivery

To ensure tasks aren't lost during processing, we use acknowledgments:

func (b *RabbitMQBroker) Dequeue() (*Task, error) {
    // ... existing code ...

    select {
    case d := <-msgs:
        var task Task
        if err := json.Unmarshal(d.Body, &task); err != nil {
            d.Nack(false, true) // reject but requeue
            return nil, fmt.Errorf("failed to unmarshal task: %w", err)
        }

        // Store delivery tag for later acknowledgment
        task.deliveryTag = d.DeliveryTag // We would need to add this field to Task
        return &task, nil

    // ... existing code ...
    }
}

func (b *RabbitMQBroker) Complete(taskID string) error {
    // In a real implementation, we would look up the delivery tag by taskID.
    // For simplicity, assume the task carries a deliveryTag property.
    task, err := b.GetTask(taskID)
    if err != nil {
        return err
    }

    return b.channel.Ack(task.deliveryTag, false)
}
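
One way to make that lookup concrete is for the broker to remember the delivery tag of every in-flight task. A minimal sketch of my own, not part of the implementation above: an in-memory map from task ID to delivery tag, guarded by a mutex and populated in Dequeue.

type RabbitMQBroker struct {
    // ... existing fields ...
    mu       sync.Mutex
    inFlight map[string]uint64 // task ID -> delivery tag (initialize in NewRabbitMQBroker)
}

// In Dequeue, after unmarshalling the task:
//     b.mu.Lock()
//     b.inFlight[task.ID] = d.DeliveryTag
//     b.mu.Unlock()

func (b *RabbitMQBroker) Complete(taskID string) error {
    b.mu.Lock()
    tag, ok := b.inFlight[taskID]
    delete(b.inFlight, taskID)
    b.mu.Unlock()

    if !ok {
        return fmt.Errorf("no in-flight delivery for task %s", taskID)
    }
    return b.channel.Ack(tag, false)
}

Fail can use the same lookup before publishing the task to the dead letter queue.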

2. Idempotent Task Processing

Tasks may be processed multiple times due to failures and retries. Handlers should be idempotent:

func processPaymentHandler(task *Task) error {
    // Extract transaction ID from payload
    txID, ok := task.Payload["transaction_id"].(string)
    if !ok {
        return errors.New("missing transaction_id")
    }

    // Check if payment was already processed
    processed, err := isPaymentProcessed(txID)
    if err != nil {
        return err
    }

    if processed {
        log.Printf("Payment %s already processed, skipping", txID)
        return nil
    }

    // Process the payment
    // ...

    // Mark as processed
    return markPaymentProcessed(txID)
}
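
The isPaymentProcessed and markPaymentProcessed helpers are left abstract above. One possible backing, assuming a PostgreSQL table processed_payments with a unique transaction_id column (the *sql.DB handle is passed explicitly in this sketch):

func isPaymentProcessed(db *sql.DB, txID string) (bool, error) {
    var exists bool
    err := db.QueryRow(
        `SELECT EXISTS (SELECT 1 FROM processed_payments WHERE transaction_id = $1)`,
        txID,
    ).Scan(&exists)
    return exists, err
}

func markPaymentProcessed(db *sql.DB, txID string) error {
    // ON CONFLICT DO NOTHING makes the mark itself safe to repeat
    _, err := db.Exec(
        `INSERT INTO processed_payments (transaction_id) VALUES ($1)
         ON CONFLICT (transaction_id) DO NOTHING`,
        txID,
    )
    return err
}

Note that check-then-mark still leaves a small window in which two workers can process the same payment concurrently; if that matters, a single "insert if absent" that both checks and claims the transaction ID closes the gap.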

3. Dead Letter Queues

Tasks that repeatedly fail should be moved to a dead letter queue for further inspection:

type RabbitMQBroker struct {
    // ... existing fields ...
    deadLetterQueue string
}

func NewRabbitMQBroker(url, queueName string) (*RabbitMQBroker, error) {
    // ... existing connection setup ...

    // Declare dead letter queue
    dlq, err := ch.QueueDeclare(
        queueName+".dlq", // name
        true,             // durable
        false,            // delete when unused
        false,            // exclusive
        false,            // no-wait
        nil,              // arguments
    )
    if err != nil {
        ch.Close()
        conn.Close()
        return nil, fmt.Errorf("failed to declare dead letter queue: %w", err)
    }

    return &RabbitMQBroker{
        // ... existing fields ...
        deadLetterQueue: dlq.Name,
    }, nil
}

func (b *RabbitMQBroker) Fail(taskID string, err error) error {
    task, getErr := b.GetTask(taskID)
    if getErr != nil {
        return getErr
    }

    // Add error information to the task
    // (LastError and FailedAt would be added to the Task struct)
    task.LastError = err.Error()
    task.FailedAt = time.Now()

    // Publish to dead letter queue
    taskBytes, marshalErr := json.Marshal(task)
    if marshalErr != nil {
        return marshalErr
    }

    publishErr := b.channel.Publish(
        "",                // exchange
        b.deadLetterQueue, // routing key
        false,             // mandatory
        false,             // immediate
        amqp.Publishing{
            DeliveryMode: amqp.Persistent,
            ContentType:  "application/json",
            Body:         taskBytes,
            MessageId:    task.ID,
        },
    )
    if publishErr != nil {
        return publishErr
    }

    // Acknowledge the original message
    return b.channel.Ack(task.deliveryTag, false)
}
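
Operationally, you also need a way to drain the dead letter queue once the underlying problem is fixed. A rough sketch of my own, using basic.get rather than a consumer, that moves messages back onto the main queue (in practice you would also reset the task's Retries counter before re-enqueuing):

func (b *RabbitMQBroker) RequeueDeadLetters(max int) (int, error) {
    moved := 0
    for moved < max {
        d, ok, err := b.channel.Get(b.deadLetterQueue, false) // manual ack
        if err != nil {
            return moved, err
        }
        if !ok {
            break // DLQ is empty
        }

        err = b.channel.Publish("", b.queue, false, false, amqp.Publishing{
            DeliveryMode: amqp.Persistent,
            ContentType:  "application/json",
            Body:         d.Body,
            MessageId:    d.MessageId,
        })
        if err != nil {
            d.Nack(false, true) // put it back on the DLQ
            return moved, err
        }

        d.Ack(false)
        moved++
    }
    return moved, nil
}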

4. Task Prioritization

Some tasks may be more important than others and should be processed first:

func (p *TaskProducer) EnqueuePriorityTask(taskType string, payload map[string]interface{}, priority int) error {
    task := NewTask(taskType, payload)
    task.Priority = priority

    return p.broker.Enqueue(task)
}

RabbitMQ supports this directly, provided the queue is declared as a priority queue:

func (b *RabbitMQBroker) Enqueue(task *Task) error {
    // ... existing code ...

    return b.channel.Publish(
        "",      // exchange
        b.queue, // routing key
        false,   // mandatory
        false,   // immediate
        amqp.Publishing{
            // ... existing properties ...
            Priority: uint8(task.Priority), // Priority from 0 to 255
        },
    )
}
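
Note that RabbitMQ only honours the Priority property on queues declared with an x-max-priority argument (priorities above the maximum are capped). The queue declaration in NewRabbitMQBroker would need to pass it, for example:

q, err := ch.QueueDeclare(
    queueName, // name
    true,      // durable
    false,     // delete when unused
    false,     // exclusive
    false,     // no-wait
    amqp.Table{"x-max-priority": int32(10)}, // enable priorities 0-10
)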

5. Delayed/Scheduled Tasks

Sometimes tasks need to be executed at a specific time or after a delay:

func (p *TaskProducer) EnqueueDelayedTask(taskType string, payload map[string]interface{}, delay time.Duration) error {
    task := NewTask(taskType, payload)

    // For RabbitMQ, we could use the delayed-message plugin or a delay exchange.
    // For simplicity, this example stores the execution time in the task
    // (ExecuteAt would be added to the Task struct).
    task.ExecuteAt = time.Now().Add(delay)

    return p.broker.Enqueue(task)
}

In the worker, we'd need to check if it's time to execute:

func (wp *WorkerPool) worker(id int) {
    // ... existing code ...

    for {
        // ... existing code ...

        task, err := wp.broker.Dequeue()
        if err != nil {
            // ... existing error handling ...
        }

        // Check if it's time to execute the task
        if !task.ExecuteAt.IsZero() && time.Now().Before(task.ExecuteAt) {
            // Not yet time to run; put the task back on the queue
            wp.broker.Enqueue(task)
            continue
        }

        // ... existing processing code ...
    }
}
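
Requeueing a not-yet-due task immediately can turn the workers into a busy loop when the queue holds mostly future work. A common RabbitMQ alternative is to publish delayed tasks to a separate "wait" queue with a per-message TTL that dead-letters back into the main queue. The sketch below illustrates that idea (the ".wait" queue name is my own convention), with the caveat that RabbitMQ only expires messages at the head of a queue, so mixed TTLs in one wait queue can hold shorter delays back:

func (b *RabbitMQBroker) EnqueueDelayed(task *Task, delay time.Duration) error {
    // Expired messages are dead-lettered from the wait queue back to the
    // main task queue via the default exchange. Declaring is idempotent.
    _, err := b.channel.QueueDeclare(
        b.queue+".wait", // name
        true,            // durable
        false,           // delete when unused
        false,           // exclusive
        false,           // no-wait
        amqp.Table{
            "x-dead-letter-exchange":    "",      // default exchange
            "x-dead-letter-routing-key": b.queue, // back to the main queue
        },
    )
    if err != nil {
        return err
    }

    taskBytes, err := json.Marshal(task)
    if err != nil {
        return err
    }

    return b.channel.Publish("", b.queue+".wait", false, false, amqp.Publishing{
        DeliveryMode: amqp.Persistent,
        ContentType:  "application/json",
        Body:         taskBytes,
        MessageId:    task.ID,
        Expiration:   strconv.FormatInt(int64(delay/time.Millisecond), 10), // TTL in ms
    })
}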

Monitoring and Observability

A distributed task queue needs proper monitoring to ensure health and performance:

type Metrics struct {
    TasksEnqueued  prometheus.Counter
    TasksCompleted prometheus.Counter
    TasksFailed    prometheus.Counter
    TasksRetried   prometheus.Counter
    ProcessingTime prometheus.Histogram
    QueueDepth     prometheus.Gauge
}

func NewMetrics(registry *prometheus.Registry) *Metrics {
    m := &Metrics{
        TasksEnqueued: prometheus.NewCounter(prometheus.CounterOpts{
            Name: "tasks_enqueued_total",
            Help: "Total number of tasks enqueued",
        }),
        TasksCompleted: prometheus.NewCounter(prometheus.CounterOpts{
            Name: "tasks_completed_total",
            Help: "Total number of tasks completed successfully",
        }),
        TasksFailed: prometheus.NewCounter(prometheus.CounterOpts{
            Name: "tasks_failed_total",
            Help: "Total number of tasks that failed permanently",
        }),
        TasksRetried: prometheus.NewCounter(prometheus.CounterOpts{
            Name: "tasks_retried_total",
            Help: "Total number of task retry attempts",
        }),
        ProcessingTime: prometheus.NewHistogram(prometheus.HistogramOpts{
            Name:    "task_processing_seconds",
            Help:    "Time spent processing tasks",
            Buckets: prometheus.DefBuckets,
        }),
        QueueDepth: prometheus.NewGauge(prometheus.GaugeOpts{
            Name: "queue_depth",
            Help: "Current number of tasks in the queue",
        }),
    }

    registry.MustRegister(
        m.TasksEnqueued,
        m.TasksCompleted,
        m.TasksFailed,
        m.TasksRetried,
        m.ProcessingTime,
        m.QueueDepth,
    )

    return m
}

// Integrating metrics with our broker and worker pool
// (metrics would be added as a field on WorkerPool and passed to NewWorkerPool)
func (wp *WorkerPool) worker(id int) {
    // ... existing code ...

    for {
        // ... existing code ...

        task, err := wp.broker.Dequeue()
        if err != nil {
            // ... existing error handling ...
        }

        // Record metrics
        wp.metrics.QueueDepth.Dec()

        startTime := time.Now()

        // Process the task
        handler, ok := wp.handlers[task.Type]
        if !ok {
            // ... existing error handling ...
        }

        err = handler(task)

        // Record processing time
        wp.metrics.ProcessingTime.Observe(time.Since(startTime).Seconds())

        if err != nil {
            // ... existing error handling ...
            if task.Retries <= task.MaxRetry {
                wp.metrics.TasksRetried.Inc()
                // ... retry logic ...
            } else {
                wp.metrics.TasksFailed.Inc()
                // ... failure logic ...
            }
        } else {
            wp.metrics.TasksCompleted.Inc()
            // ... completion logic ...
        }
    }
}

Putting It All Together

Let's see how all these components work together in a complete example:

func main() {
    // Initialize the broker
    broker, err := NewRabbitMQBroker("amqp://guest:guest@localhost:5672/", "tasks")
    if err != nil {
        log.Fatalf("Failed to create broker: %v", err)
    }
    defer broker.Close()

    // Initialize metrics
    registry := prometheus.NewRegistry()
    metrics := NewMetrics(registry)

    // Create a task producer and enqueue an example task through it
    // (the HTTP handler below enqueues via the broker directly)
    producer := NewTaskProducer(broker)
    if err := producer.EnqueueTask("email", map[string]interface{}{
        "to":      "admin@example.com",
        "subject": "Task queue service started",
    }); err != nil {
        log.Printf("Failed to enqueue startup task: %v", err)
    }

    // Create a worker pool
    pool := NewWorkerPool(broker, 5) // 5 workers

    // Register task handlers
    pool.RegisterHandler("email", func(task *Task) error {
        to, _ := task.Payload["to"].(string)
        subject, _ := task.Payload["subject"].(string)
        body, _ := task.Payload["body"].(string)

        log.Printf("Sending email to %s: %s", to, subject)
        _ = body // the body would be passed to the actual email-sending logic

        return nil
    })

    pool.RegisterHandler("image_resize", func(task *Task) error {
        imageURL, _ := task.Payload["url"].(string)
        width, _ := task.Payload["width"].(float64)
        height, _ := task.Payload["height"].(float64)

        log.Printf("Resizing image %s to %dx%d", imageURL, int(width), int(height))
        // Actual image processing logic would go here

        return nil
    })

    // Start the worker pool
    pool.Start()

    // Start HTTP server for metrics and control
    http.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
    http.HandleFunc("/enqueue", func(w http.ResponseWriter, r *http.Request) {
        if r.Method != http.MethodPost {
            http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
            return
        }

        var request struct {
            Type    string                 `json:"type"`
            Payload map[string]interface{} `json:"payload"`
        }

        if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
            http.Error(w, "Invalid request", http.StatusBadRequest)
            return
        }

        task := NewTask(request.Type, request.Payload)
        if err := broker.Enqueue(task); err != nil {
            http.Error(w, fmt.Sprintf("Failed to enqueue task: %v", err), http.StatusInternalServerError)
            return
        }

        metrics.TasksEnqueued.Inc()
        metrics.QueueDepth.Inc()

        w.WriteHeader(http.StatusCreated)
        json.NewEncoder(w).Encode(map[string]string{"task_id": task.ID})
    })

    // Start the HTTP server
    server := &http.Server{
        Addr: ":8080",
    }

    // Graceful shutdown
    shutdown := make(chan os.Signal, 1)
    signal.Notify(shutdown, syscall.SIGINT, syscall.SIGTERM)

    go func() {
        log.Println("HTTP server listening on :8080")
        if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
            log.Fatalf("HTTP server error: %v", err)
        }
    }()

    <-shutdown
    log.Println("Shutdown signal received")

    // Shut down the HTTP server
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()
    if err := server.Shutdown(ctx); err != nil {
        log.Printf("HTTP server shutdown error: %v", err)
    }

    // Shut down the worker pool
    pool.Shutdown(30 * time.Second)

    log.Println("Service shutdown complete")
}

Best Practices and Lessons Learned

Through building and operating distributed task queues in production, I've learned several important lessons:

  1. Design for Failure: Assume components will fail and design accordingly. Use retries, circuit breakers, and dead letter queues.

  2. Idempotency is Critical: Ensure task handlers are idempotent to safely handle retries without side effects.

  3. Visibility Matters: Comprehensive logging and monitoring are essential for troubleshooting and capacity planning.

  4. Prioritize Tasks: Not all tasks are equal; implement priority queues to ensure critical tasks are processed first.

  5. Balance Throughput and Reliability: Acknowledge messages only after successful processing, but batch acknowledgments for efficiency when possible.

  6. Limit Concurrency: Set appropriate limits on worker concurrency to avoid overwhelming downstream services.

  7. Implement Backpressure Mechanisms: When the system is overwhelmed, slow down task production rather than failing; see the sketch after this list.

  8. Avoid Poison Pills: Tasks that consistently fail can block queues; move them to dead letter queues quickly.
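
As a rough illustration of point 7, a producer-side guard can refuse new work when the queue is too deep, letting callers back off or degrade gracefully. The QueueLength helper below is my own addition (it is not part of the Broker interface defined earlier), backed by RabbitMQ's queue inspection:

var ErrQueueFull = errors.New("task queue is saturated, try again later")

type ThrottledProducer struct {
    producer *TaskProducer
    broker   *RabbitMQBroker
    maxDepth int
}

func (tp *ThrottledProducer) EnqueueTask(taskType string, payload map[string]interface{}) error {
    depth, err := tp.broker.QueueLength()
    if err != nil {
        return err
    }
    if depth >= tp.maxDepth {
        return ErrQueueFull // callers can retry later or shed load
    }
    return tp.producer.EnqueueTask(taskType, payload)
}

// QueueLength reports the number of ready messages in the main queue.
func (b *RabbitMQBroker) QueueLength() (int, error) {
    q, err := b.channel.QueueInspect(b.queue)
    if err != nil {
        return 0, err
    }
    return q.Messages, nil
}

In practice you would cache or sample the depth rather than inspect the queue on every enqueue.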

Conclusion

Building a distributed task queue in Go requires careful consideration of architecture, message broker selection, worker pool design, and error handling strategies. The implementation outlined in this article provides a solid foundation for handling background processing needs in distributed applications.

Go's concurrency primitives, strong typing, and excellent ecosystem make it well-suited for implementing robust task queues. By following the patterns and practices described here, you can build a system that reliably processes tasks at scale while gracefully handling the challenges inherent in distributed systems.

In future articles, I'll dive deeper into advanced topics such as implementing exactly-once delivery semantics, building workflow orchestration on top of task queues, and optimizing for extreme throughput scenarios.


About the author: I'm a software engineer with experience in systems programming and distributed systems. Over the past years, I've been designing and implementing distributed systems in Go, with a focus on reliability and scalability.