Unosend V2 Architecture

Production-ready, scalable email infrastructure with regional queues and Docker workers.

Overview

┌─────────────────────────────────────────────────────────────────────────────┐
│                               VERCEL API LAYER                              │
│                                                                             │
│  1. Receive API request                                                     │
│  2. Validate API key via Redis cache (NOT Supabase on every request)        │
│  3. Check if user is PAID                                                   │
│  4. Get user IP → determine region (asia/eu/us)                             │
│  5. Check queue depths across ALL regions                                   │
│  6. Route to best queue (prefer user region, fallback if overloaded)        │
│  7. Send email data to selected regional queue                              │
└─────────────────────────────────────────────────────────────────────────────┘
                                     │
                    ┌────────────────┼────────────────┐
                    ▼                ▼                ▼
              ┌───────────┐   ┌───────────┐   ┌───────────┐
              │ ASIA-REDIS│   │  EU-REDIS │   │  US-REDIS │
              │ (Mumbai)  │   │(Frankfurt)│   │ (Virginia)│
              │           │   │           │   │           │
              │  Contabo  │   │    OVH    │   │  Future   │
              └─────┬─────┘   └─────┬─────┘   └─────┬─────┘
                    │               │               │
                    ▼               ▼               ▼
              ┌───────────┐   ┌───────────┐   ┌───────────┐
              │  WORKER   │   │  WORKER   │   │  WORKER   │
              │ REPLICAS  │   │ REPLICAS  │   │ REPLICAS  │
              │ (Docker)  │   │ (Docker)  │   │ (Docker)  │
              │           │   │           │   │           │
              │ 3 replicas│   │ 3 replicas│   │ 3 replicas│
              └─────┬─────┘   └─────┬─────┘   └─────┬─────┘
                    │               │               │
                    ▼               ▼               ▼
              ┌───────────┐   ┌───────────┐   ┌───────────┐
              │  KumoMTA  │   │  KumoMTA  │   │  KumoMTA  │
              │  (Local)  │   │  (Local)  │   │  (Local)  │
              └───────────┘   └───────────┘   └───────────┘

Part 1: Redis Cluster (3 Regions)

Infrastructure

Region   Server              IP                Redis Port   Purpose
Asia     Contabo (Mumbai)    217.217.250.114   6379         asia-queue
EU       Contabo (Germany)   84.247.139.105    6379         eu-queue
US       TBD                 TBD               6379         us-queue

Redis Configuration

Each Redis instance runs in Docker with:
# docker-compose.redis.yml
version: '3.8'
services:
  redis:
    image: redis:7-alpine
    command: >
      redis-server
      --requirepass ${REDIS_PASSWORD}
      --maxmemory 512mb
      --maxmemory-policy allkeys-lru
      --appendonly yes
      --appendfsync everysec
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    restart: always

volumes:
  redis_data:

Environment Variables (Vercel)

# Regional Redis URLs
REDIS_ASIA_URL=redis://unosend:${REDIS_PASS}@217.217.250.114:6379/0
REDIS_EU_URL=redis://unosend:${REDIS_PASS}@84.247.139.105:6379/0
REDIS_US_URL=redis://unosend:${REDIS_PASS}@us-server-ip:6379/0

# Queue settings
QUEUE_THRESHOLD=500  # Max jobs before overflow to another region

Part 2: API Key Caching in Redis

Why Cache?

  • Supabase query: ~100-200ms
  • Redis cache: ~1-5ms
  • 20-40x faster validation

Cache Structure

Key: apikey:<sha256(apiKey)>
TTL: 300 seconds (5 minutes)
Value: JSON
interface CachedApiKey {
  org_id: string
  api_key_id: string
  tier: 'free' | 'pro' | 'scale' | 'enterprise'
  is_paid: boolean
  daily_limit: number
  monthly_limit: number
  daily_sent: number
  monthly_sent: number
  domains: Array<{
    id: string
    domain: string
    dkim_private_key: string
  }>
}

Implementation

// lib/api-key-cache.ts
import Redis from 'ioredis'
import crypto from 'crypto'

const redis = new Redis(process.env.REDIS_ASIA_URL) // Primary cache
const CACHE_TTL = 300

function hashKey(apiKey: string): string {
  return crypto.createHash('sha256').update(apiKey).digest('hex')
}

export async function validateApiKeyFast(apiKey: string): Promise<CachedApiKey | null> {
  const cacheKey = `apikey:${hashKey(apiKey)}`
  
  // 1. Check Redis cache
  const cached = await redis.get(cacheKey)
  if (cached) {
    return JSON.parse(cached)
  }
  
  // 2. Cache miss - fetch from Supabase
  const result = await fetchFromSupabase(apiKey)
  if (!result) return null
  
  // 3. Cache for future requests
  await redis.setex(cacheKey, CACHE_TTL, JSON.stringify(result))
  
  return result
}

// Invalidate cache when user changes settings.
// Requires writing an apiKeyId -> cacheKey mapping at cache time
// (e.g. redis.setex(`apikey:id:${apiKeyId}`, CACHE_TTL, cacheKey))
// so invalidation is precise instead of scanning apikey:*.
export async function invalidateApiKeyCache(apiKeyId: string) {
  const mappingKey = `apikey:id:${apiKeyId}`
  const cacheKey = await redis.get(mappingKey)
  if (cacheKey) {
    await redis.del(cacheKey, mappingKey)
  }
}

Part 3: Queue Router (Vercel)

Logic

  1. Get user IP → determine region
  2. Check queue depths across all regions
  3. If user’s region queue < threshold → use it
  4. Else → use least loaded queue
// lib/queue-router.ts
import Redis from 'ioredis'

const QUEUE_THRESHOLD = 500

const redisConnections = {
  asia: new Redis(process.env.REDIS_ASIA_URL!),
  eu: new Redis(process.env.REDIS_EU_URL!),
  // us: new Redis(process.env.REDIS_US_URL!),
}

interface QueueInfo {
  region: string
  redis: Redis
  depth: number
}

export async function getQueueDepths(): Promise<QueueInfo[]> {
  const depths = await Promise.all(
    Object.entries(redisConnections).map(async ([region, redis]) => ({
      region,
      redis,
      depth: await redis.llen('email:queue')
    }))
  )
  return depths
}

export async function selectBestQueue(userRegion: string): Promise<QueueInfo> {
  const queues = await getQueueDepths()
  
  // Prefer user's region if not overloaded
  const userQueue = queues.find(q => q.region === userRegion)
  if (userQueue && userQueue.depth < QUEUE_THRESHOLD) {
    return userQueue
  }
  
  // Fallback to least loaded
  queues.sort((a, b) => a.depth - b.depth)
  return queues[0]
}

export function getRegionFromIP(ip: string): string {
  // Use MaxMind GeoIP or similar
  // Simplified version:
  const geoip = require('geoip-lite')
  const geo = geoip.lookup(ip)
  
  if (!geo) return 'asia' // Default
  
  const country = geo.country
  const euCountries = ['DE', 'FR', 'GB', 'IT', 'ES', 'NL', 'PL', 'SE', 'NO', 'FI', 'DK', 'AT', 'CH', 'BE', 'IE', 'PT']
  const usCountries = ['US', 'CA', 'MX']
  
  if (euCountries.includes(country)) return 'eu'
  if (usCountries.includes(country)) return 'us'
  return 'asia'
}

Part 4: Queue Payload Structure

What Vercel Sends to Queue

interface QueuePayload {
  // Identifiers
  id: string           // Email UUID
  org_id: string
  api_key_id: string
  
  // User info (from cache)
  is_paid: boolean
  tier: string
  
  // Domain info for DKIM
  domain_id: string
  from_domain: string
  
  // Email content
  from: string
  to: string | string[]
  subject: string
  html?: string
  text?: string
  reply_to?: string
  headers?: Record<string, string>
  
  // Tracking
  track_opens: boolean
  track_clicks: boolean
  
  // Metadata
  source_ip: string
  source_region: string
  created_at: number
}
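On the worker side, pkg/models/payload.go needs a matching Go struct to unmarshal this JSON. A partial sketch (field set abbreviated, single-recipient case only; the json tag names are assumptions that must match whatever the Vercel side actually serializes):

```go
package main // in the repo: package models (pkg/models/payload.go)

import "encoding/json"

// Payload mirrors the TypeScript QueuePayload (abbreviated).
type Payload struct {
	ID         string `json:"id"`
	OrgID      string `json:"org_id"`
	APIKeyID   string `json:"api_key_id"`
	IsPaid     bool   `json:"is_paid"`
	Tier       string `json:"tier"`
	DomainID   string `json:"domain_id"`
	FromDomain string `json:"from_domain"`
	From       string `json:"from"`
	To         string `json:"to"`
	Subject    string `json:"subject"`
	HTML       string `json:"html,omitempty"`
	Text       string `json:"text,omitempty"`
	CreatedAt  int64  `json:"created_at"`
}

// Decode parses one queue entry popped from Redis.
func Decode(raw []byte) (Payload, error) {
	var p Payload
	err := json.Unmarshal(raw, &p)
	return p, err
}
```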

Part 5: Docker Worker

Design Patterns Used

  1. Worker Pool Pattern - Goroutine pool for concurrent processing
  2. Circuit Breaker - Prevent cascade failures to KumoMTA
  3. Graceful Shutdown - Handle SIGTERM properly
  4. Health Checks - Kubernetes/Docker health endpoints
  5. Structured Logging - JSON logs for observability
  6. Metrics - Prometheus metrics for monitoring

Dockerfile

# docker/worker/Dockerfile
FROM golang:1.22-alpine AS builder

WORKDIR /app

# Dependencies
COPY go.mod go.sum ./
RUN go mod download

# Build
COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -ldflags="-s -w" -o /worker ./cmd/worker

# Runtime
FROM alpine:3.19

RUN apk --no-cache add ca-certificates tzdata

WORKDIR /app
COPY --from=builder /worker .

# Non-root user
RUN adduser -D -u 1000 worker
USER worker

EXPOSE 8080

HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
  CMD wget --no-verbose --tries=1 --spider http://localhost:8080/health || exit 1

ENTRYPOINT ["/app/worker"]

Worker Code Structure

docker/worker/
├── Dockerfile
├── go.mod
├── go.sum
├── cmd/
│   └── worker/
│       └── main.go            # Entry point
├── internal/
│   ├── config/
│   │   └── config.go          # Environment config
│   ├── queue/
│   │   └── consumer.go        # Redis queue consumer
│   ├── processor/
│   │   └── email.go           # Email processing logic
│   ├── validator/
│   │   ├── mx.go              # MX record validation
│   │   └── suppression.go     # Suppression list check
│   ├── dkim/
│   │   └── signer.go          # DKIM signing
│   ├── smtp/
│   │   └── client.go          # KumoMTA SMTP client
│   ├── metrics/
│   │   └── prometheus.go      # Metrics
│   └── health/
│       └── handler.go         # Health check endpoint
└── pkg/
    └── models/
        └── payload.go         # Shared data structures

Main Worker Code

// docker/worker/cmd/worker/main.go
package main

import (
    "context"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"

    "github.com/unosend/worker/internal/config"
    "github.com/unosend/worker/internal/health"
    "github.com/unosend/worker/internal/metrics"
    "github.com/unosend/worker/internal/processor"
    "github.com/unosend/worker/internal/queue"
    "github.com/rs/zerolog"
    "github.com/rs/zerolog/log"
)

func main() {
    // Structured logging
    zerolog.TimeFieldFormat = zerolog.TimeFormatUnix
    log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr})

    // Load config
    cfg := config.Load()
    log.Info().
        Str("region", cfg.Region).
        Int("workers", cfg.WorkerCount).
        Str("redis", cfg.RedisURL).
        Msg("Starting worker")

    // Context for graceful shutdown
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Start health server
    go health.StartServer(cfg.HealthPort)

    // Start metrics server
    go metrics.StartServer(cfg.MetricsPort)

    // Create worker pool
    var wg sync.WaitGroup
    jobs := make(chan queue.Payload, cfg.BufferSize)

    // Start N worker goroutines
    for i := 0; i < cfg.WorkerCount; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            processor.Worker(ctx, id, jobs, cfg)
        }(i)
    }

    // Start queue consumer
    go queue.Consume(ctx, cfg.RedisURL, jobs)

    // Wait for shutdown signal
    quit := make(chan os.Signal, 1)
    signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
    <-quit

    log.Info().Msg("Shutting down gracefully...")
    cancel()

    // Wait for workers with timeout
    done := make(chan struct{})
    go func() {
        wg.Wait()
        close(done)
    }()

    select {
    case <-done:
        log.Info().Msg("All workers finished")
    case <-time.After(30 * time.Second):
        log.Warn().Msg("Shutdown timeout, forcing exit")
    }
}

Email Processor

// docker/worker/internal/processor/email.go
package processor

import (
    "context"
    "fmt"
    "time"

    "github.com/unosend/worker/internal/config"
    "github.com/unosend/worker/internal/dkim"
    "github.com/unosend/worker/internal/metrics"
    "github.com/unosend/worker/internal/queue"
    "github.com/unosend/worker/internal/smtp"
    "github.com/unosend/worker/internal/validator"
    "github.com/rs/zerolog/log"
)

func Worker(ctx context.Context, id int, jobs <-chan queue.Payload, cfg *config.Config) {
    smtpClient := smtp.NewClient(cfg.MTAHost, cfg.MTAPort)
    
    for {
        select {
        case <-ctx.Done():
            return
        case payload := <-jobs:
            processEmail(id, payload, smtpClient, cfg)
        }
    }
}

func processEmail(workerID int, p queue.Payload, client *smtp.Client, cfg *config.Config) {
    start := time.Now()
    logger := log.With().
        Int("worker", workerID).
        Str("email_id", p.ID).
        Str("to", p.To).
        Logger()

    // 1. Validate MX records
    mx, err := validator.CheckMX(extractDomain(p.To))
    if err != nil {
        logger.Warn().Err(err).Msg("MX validation failed")
        markFailed(p.ID, "invalid_mx", err.Error())
        metrics.EmailsFailed.Inc()
        return
    }

    // 2. Check suppression list
    if validator.IsSuppressed(p.OrgID, p.To) {
        logger.Info().Msg("Email suppressed")
        markFailed(p.ID, "suppressed", "recipient in suppression list")
        metrics.EmailsSuppressed.Inc()
        return
    }

    // 3. Get DKIM key and sign
    dkimKey := getDKIMKey(p.DomainID)
    message := buildMessage(p)
    signedMessage, err := dkim.Sign(message, p.FromDomain, dkimKey)
    if err != nil {
        logger.Error().Err(err).Msg("DKIM signing failed")
        signedMessage = message // Continue unsigned rather than failing
    }

    // 4. Build envelope with Return-Path for bounce tracking
    envelope := smtp.Envelope{
        MailFrom: fmt.Sprintf("bounce+%s@send.unosend.co", p.ID),
        RcptTo:   []string{p.To},
    }

    // 5. Send to KumoMTA
    err = client.Send(envelope, signedMessage)
    if err != nil {
        logger.Error().Err(err).Msg("SMTP delivery failed")
        markFailed(p.ID, "delivery_failed", err.Error())
        metrics.EmailsFailed.Inc()
        return
    }

    // 6. Mark as sent
    markSent(p.ID)
    
    duration := time.Since(start)
    metrics.EmailsProcessed.Inc()
    metrics.ProcessingDuration.Observe(duration.Seconds())
    
    logger.Info().
        Dur("duration", duration).
        Str("mx", mx).
        Msg("Email sent successfully")
}
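extractDomain, markFailed, markSent, and getDKIMKey are helpers defined elsewhere in the worker. extractDomain in particular is worth pinning down, since CheckMX depends on it; a sketch (not necessarily the repo's implementation):

```go
package main

import "strings"

// extractDomain returns the part after the last '@' of an address,
// lowercased for DNS lookup, or "" if there is no domain part.
func extractDomain(addr string) string {
	i := strings.LastIndex(addr, "@")
	if i < 0 || i == len(addr)-1 {
		return ""
	}
	return strings.ToLower(addr[i+1:])
}
```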

Part 6: Docker Compose (Per Server)

# docker-compose.yml (deployed on each server)
version: '3.8'

services:
  redis:
    image: redis:7-alpine
    command: >
      redis-server
      --requirepass ${REDIS_PASSWORD}
      --maxmemory 512mb
      --maxmemory-policy allkeys-lru
      --appendonly yes
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    restart: always
    healthcheck:
      test: ["CMD", "redis-cli", "-a", "${REDIS_PASSWORD}", "ping"]
      interval: 10s
      timeout: 5s
      retries: 3

  worker:
    image: ghcr.io/bittucreator/unosend-worker:latest
    deploy:
      replicas: 3
      resources:
        limits:
          cpus: '1'
          memory: 512M
    environment:
      - REDIS_URL=redis://unosend:${REDIS_PASSWORD}@redis:6379/0
      - MTA_HOST=127.0.0.1
      - MTA_PORT=2025
      - WORKER_COUNT=32
      - REGION=${REGION}
      - SUPABASE_URL=${SUPABASE_URL}
      - SUPABASE_SERVICE_KEY=${SUPABASE_SERVICE_KEY}
    depends_on:
      redis:
        condition: service_healthy
    restart: always
    healthcheck:
      test: ["CMD", "wget", "--spider", "-q", "http://localhost:8080/health"]
      interval: 30s
      timeout: 3s
      retries: 3

  kumomta:
    # Existing KumoMTA container or systemd service
    # ...

volumes:
  redis_data:

Part 7: Deployment

GitHub Actions CI/CD

# .github/workflows/worker.yml
name: Build Worker

on:
  push:
    paths:
      - 'docker/worker/**'
    branches: [main]

jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      
      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3
      
      - name: Login to GHCR
        uses: docker/login-action@v3
        with:
          registry: ghcr.io
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}
      
      - name: Build and push
        uses: docker/build-push-action@v5
        with:
          context: ./docker/worker
          push: true
          tags: |
            ghcr.io/bittucreator/unosend-worker:latest
            ghcr.io/bittucreator/unosend-worker:${{ github.sha }}
          cache-from: type=gha
          cache-to: type=gha,mode=max

Deploy to Servers

# SSH to each server and update
ssh root@217.217.250.114 "cd /opt/unosend && docker compose pull && docker compose up -d"
ssh root@84.247.139.105 "cd /opt/unosend && docker compose pull && docker compose up -d"

Part 8: Monitoring

Prometheus Metrics

// docker/worker/internal/metrics/prometheus.go
package metrics

import (
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
)

// promauto registers each metric with the default registry at creation,
// so the /metrics handler picks them up without explicit MustRegister calls.
var (
    EmailsProcessed = promauto.NewCounter(prometheus.CounterOpts{
        Name: "worker_emails_processed_total",
        Help: "Total emails processed",
    })

    EmailsFailed = promauto.NewCounter(prometheus.CounterOpts{
        Name: "worker_emails_failed_total",
        Help: "Total emails failed",
    })

    // Referenced by the processor in Part 5 (metric name assumed)
    EmailsSuppressed = promauto.NewCounter(prometheus.CounterOpts{
        Name: "worker_emails_suppressed_total",
        Help: "Total emails skipped via suppression list",
    })

    ProcessingDuration = promauto.NewHistogram(prometheus.HistogramOpts{
        Name:    "worker_processing_duration_seconds",
        Help:    "Email processing duration",
        Buckets: []float64{0.01, 0.05, 0.1, 0.5, 1, 2, 5},
    })

    QueueDepth = promauto.NewGauge(prometheus.GaugeOpts{
        Name: "worker_queue_depth",
        Help: "Current queue depth",
    })
)

Grafana Dashboard

  • Emails processed/minute per region
  • Queue depth per region
  • Processing latency p50/p95/p99
  • Error rate by type
  • Worker health status
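Two of these panels can be read straight from the Part 8 metrics; illustrative PromQL (queries are sketches, not from the source):

```promql
# p95 processing latency over the last 5 minutes
histogram_quantile(0.95,
  rate(worker_processing_duration_seconds_bucket[5m]))

# error rate: failures as a share of all failed + successfully processed emails
rate(worker_emails_failed_total[5m])
  / (rate(worker_emails_failed_total[5m]) + rate(worker_emails_processed_total[5m]))
```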

Implementation Order

  1. Phase 1: Redis Cluster (Day 1)
    • Set up Redis on both servers
    • Configure authentication
    • Test connectivity from Vercel
  2. Phase 2: API Key Caching (Day 1)
    • Implement Redis cache in Vercel
    • Add cache invalidation
  3. Phase 3: Queue Router (Day 2)
    • Implement region detection
    • Implement queue depth checking
    • Implement smart routing
  4. Phase 4: Docker Worker (Day 2-3)
    • Build Go worker with goroutines
    • Dockerize
    • Test locally
  5. Phase 5: Deployment (Day 3)
    • Push to GHCR
    • Deploy to servers
    • Test end-to-end
  6. Phase 6: Monitoring (Day 4)
    • Add Prometheus metrics
    • Set up Grafana
    • Create alerts

Environment Variables Summary

Vercel

# Redis Cluster
REDIS_ASIA_URL=redis://unosend:xxx@217.217.250.114:6379/0
REDIS_EU_URL=redis://unosend:xxx@84.247.139.105:6379/0
REDIS_US_URL=redis://unosend:xxx@us-server:6379/0

# Queue Config
QUEUE_THRESHOLD=500
API_KEY_CACHE_TTL=300

Docker Worker (per server)

REDIS_URL=redis://unosend:xxx@localhost:6379/0
MTA_HOST=127.0.0.1
MTA_PORT=2025
WORKER_COUNT=32
REGION=asia|eu|us
SUPABASE_URL=https://xxx.supabase.co
SUPABASE_SERVICE_KEY=xxx
METRICS_PORT=9090
HEALTH_PORT=8080

Notes

  • Each server runs: Redis + Worker replicas + KumoMTA
  • Workers connect to local Redis only
  • Vercel routes to correct regional Redis
  • DKIM keys fetched from Supabase (cached in worker)
  • Return-Path rewritten for bounce tracking
  • Graceful shutdown on SIGTERM
  • Health checks for container orchestration
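The rewritten Return-Path (bounce+<email-id>@send.unosend.co, see the envelope in Part 5) is what makes bounce tracking work: the bounce handler can recover the email UUID from the recipient address. A minimal sketch of that parse (function name is illustrative):

```go
package main

import "strings"

// bounceEmailID extracts the email UUID from a VERP-style bounce
// address of the form bounce+<id>@send.unosend.co.
func bounceEmailID(addr string) (string, bool) {
	local, _, found := strings.Cut(addr, "@")
	if !found || !strings.HasPrefix(local, "bounce+") {
		return "", false
	}
	id := strings.TrimPrefix(local, "bounce+")
	return id, id != ""
}
```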