← Back to Articles

Building Real-Time Distributed Systems: RabbitMQ, WebSockets, and Redis

Master the art of building scalable, real-time distributed systems using RabbitMQ for messaging, WebSockets for live updates, and Redis for caching and pub/sub.

By Urban M.•
Distributed SystemsRabbitMQWebSocketsRedisReal-Time
Building Real-Time Distributed Systems: RabbitMQ, WebSockets, and Redis

Building Real-Time Distributed Systems

Modern applications demand real-time communication, scalability, and reliability. This comprehensive guide explores how to build distributed systems using three powerful technologies: RabbitMQ, WebSockets, and Redis.

Distributed Architecture


Architecture Overview

System Components

Rendering diagram...

Key Technologies:

  • 🐰 RabbitMQ: Message broker for async communication
  • šŸ”Œ WebSockets: Real-time bidirectional communication
  • šŸ”“ Redis: In-memory cache and pub/sub
  • šŸ—„ļø PostgreSQL: Persistent data storage
  • āš™ļø Node.js: Application runtime

RabbitMQ: Reliable Message Queue

Why RabbitMQ?

FeatureBenefit
Message PersistenceGuaranteed delivery
Routing FlexibilityComplex message patterns
Dead Letter ExchangeError handling
Multiple QueuesWork distribution
AcknowledgmentsAt-least-once delivery

Setting Up RabbitMQ

# docker-compose.yml
version: '3.8'

services:
  rabbitmq:
    image: rabbitmq:3-management-alpine
    container_name: rabbitmq
    ports:
      - "5672:5672"      # AMQP protocol
      - "15672:15672"    # Management UI
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: secure_password
      RABBITMQ_DEFAULT_VHOST: /production
    volumes:
      - rabbitmq-data:/var/lib/rabbitmq
    networks:
      - app-network

volumes:
  rabbitmq-data:

networks:
  app-network:
    driver: bridge

Producer: Publishing Messages

import amqp, { Connection, Channel } from 'amqplib';

class RabbitMQProducer {
  private connection: Connection | null = null;
  private channel: Channel | null = null;

  async connect(url: string): Promise<void> {
    try {
      this.connection = await amqp.connect(url);
      this.channel = await this.connection.createChannel();

      // Create exchange for routing
      await this.channel.assertExchange('events', 'topic', {
        durable: true,
      });

      console.log('āœ… RabbitMQ Producer connected');
    } catch (error) {
      console.error('āŒ RabbitMQ connection failed:', error);
      throw error;
    }
  }

  async publishEvent(
    routingKey: string,
    data: object,
    options?: { priority?: number; expiration?: string }
  ): Promise<boolean> {
    if (!this.channel) {
      throw new Error('Channel not initialized');
    }

    const message = JSON.stringify(data);
    const messageOptions = {
      persistent: true,
      timestamp: Date.now(),
      contentType: 'application/json',
      ...options,
    };

    try {
      const published = this.channel.publish(
        'events',
        routingKey,
        Buffer.from(message),
        messageOptions
      );

      if (published) {
        console.log(`šŸ“¤ Published to ${routingKey}:`, data);
        return true;
      } else {
        console.warn('āš ļø Message not published - buffer full');
        return false;
      }
    } catch (error) {
      console.error('āŒ Publish error:', error);
      throw error;
    }
  }

  async close(): Promise<void> {
    await this.channel?.close();
    await this.connection?.close();
  }
}

// Usage
const producer = new RabbitMQProducer();
await producer.connect('amqp://admin:secure_password@localhost:5672');

// Publish user registration event
await producer.publishEvent('user.registered', {
  userId: '12345',
  email: 'user@example.com',
  timestamp: new Date().toISOString(),
});

// Publish order created event
await producer.publishEvent('order.created', {
  orderId: 'ORD-001',
  amount: 99.99,
  items: 3,
}, { priority: 5 });

Consumer: Processing Messages

class RabbitMQConsumer {
  private connection: Connection | null = null;
  private channel: Channel | null = null;

  async connect(url: string): Promise<void> {
    this.connection = await amqp.connect(url);
    this.channel = await this.connection.createChannel();

    // Set prefetch to control concurrent processing
    await this.channel.prefetch(10);

    console.log('āœ… RabbitMQ Consumer connected');
  }

  async consumeQueue(
    queueName: string,
    bindingKeys: string[],
    handler: (data: any) => Promise<void>
  ): Promise<void> {
    if (!this.channel) {
      throw new Error('Channel not initialized');
    }

    // Assert queue with options
    await this.channel.assertQueue(queueName, {
      durable: true,
      deadLetterExchange: 'dlx',
      deadLetterRoutingKey: 'failed',
      messageTtl: 86400000, // 24 hours
    });

    // Bind queue to routing keys
    for (const key of bindingKeys) {
      await this.channel.bindQueue(queueName, 'events', key);
    }

    // Consume messages
    await this.channel.consume(
      queueName,
      async (msg) => {
        if (!msg) return;

        try {
          const data = JSON.parse(msg.content.toString());
          console.log(`šŸ“„ Received message:`, data);

          // Process the message
          await handler(data);

          // Acknowledge successful processing
          this.channel!.ack(msg);
          console.log('āœ… Message processed successfully');
        } catch (error) {
          console.error('āŒ Message processing failed:', error);

          // Retry logic
          const retryCount = (msg.properties.headers?.['x-retry-count'] || 0) + 1;
          
          if (retryCount < 3) {
            // Requeue with retry count
            this.channel!.nack(msg, false, false);
            await this.publishWithRetry(msg, retryCount);
          } else {
            // Max retries reached - send to dead letter
            this.channel!.nack(msg, false, false);
            console.error('šŸ’€ Message sent to dead letter queue');
          }
        }
      },
      { noAck: false }
    );

    console.log(`šŸ‘‚ Listening on queue: ${queueName}`);
  }

  private async publishWithRetry(msg: amqp.Message, retryCount: number) {
    const delay = Math.min(1000 * Math.pow(2, retryCount), 30000);
    
    setTimeout(() => {
      this.channel!.publish(
        'events',
        msg.fields.routingKey,
        msg.content,
        {
          ...msg.properties,
          headers: {
            ...msg.properties.headers,
            'x-retry-count': retryCount,
          },
        }
      );
    }, delay);
  }
}

// Usage
const consumer = new RabbitMQConsumer();
await consumer.connect('amqp://admin:secure_password@localhost:5672');

// Process user events
await consumer.consumeQueue(
  'user-events-queue',
  ['user.registered', 'user.updated', 'user.deleted'],
  async (data) => {
    // Handle user events
    if (data.userId) {
      await sendWelcomeEmail(data.email);
      await updateUserCache(data.userId);
    }
  }
);

WebSockets: Real-Time Communication

WebSocket Server with Socket.io

import express from 'express';
import http from 'http';
import { Server as SocketIOServer } from 'socket.io';
import Redis from 'ioredis';

interface ClientData {
  userId: string;
  rooms: Set<string>;
}

class WebSocketServer {
  private app: express.Application;
  private server: http.Server;
  private io: SocketIOServer;
  private redis: Redis;
  private redisSub: Redis;
  private clients: Map<string, ClientData> = new Map();

  constructor(port: number) {
    this.app = express();
    this.server = http.createServer(this.app);
    
    // Initialize Socket.io with Redis adapter
    this.io = new SocketIOServer(this.server, {
      cors: {
        origin: process.env.CLIENT_URL || 'http://localhost:3000',
        credentials: true,
      },
      transports: ['websocket', 'polling'],
    });

    // Redis for pub/sub
    this.redis = new Redis({
      host: 'localhost',
      port: 6379,
    });
    
    this.redisSub = new Redis({
      host: 'localhost',
      port: 6379,
    });

    this.setupSocketHandlers();
    this.setupRedisPubSub();
    this.server.listen(port, () => {
      console.log(`šŸš€ WebSocket server running on port ${port}`);
    });
  }

  private setupSocketHandlers(): void {
    // Authentication middleware
    this.io.use(async (socket, next) => {
      const token = socket.handshake.auth.token;
      
      try {
        const user = await this.verifyToken(token);
        socket.data.userId = user.id;
        socket.data.username = user.username;
        next();
      } catch (error) {
        next(new Error('Authentication failed'));
      }
    });

    this.io.on('connection', (socket) => {
      const userId = socket.data.userId;
      console.log(`āœ… User ${userId} connected`);

      // Store client data
      this.clients.set(socket.id, {
        userId,
        rooms: new Set(),
      });

      // Join user's personal room
      socket.join(`user:${userId}`);

      // Handle room joining
      socket.on('join:room', async (roomId: string) => {
        await this.joinRoom(socket, roomId);
      });

      // Handle room leaving
      socket.on('leave:room', (roomId: string) => {
        this.leaveRoom(socket, roomId);
      });

      // Handle messages
      socket.on('message:send', async (data) => {
        await this.handleMessage(socket, data);
      });

      // Handle typing indicator
      socket.on('typing:start', (roomId: string) => {
        socket.to(roomId).emit('typing:user', {
          userId,
          username: socket.data.username,
        });
      });

      socket.on('typing:stop', (roomId: string) => {
        socket.to(roomId).emit('typing:stop', { userId });
      });

      // Handle presence
      socket.on('presence:update', async (status: string) => {
        await this.updatePresence(userId, status);
        socket.broadcast.emit('presence:changed', { userId, status });
      });

      // Handle disconnection
      socket.on('disconnect', async () => {
        console.log(`āŒ User ${userId} disconnected`);
        await this.handleDisconnect(socket);
        this.clients.delete(socket.id);
      });
    });
  }

  private async joinRoom(socket: any, roomId: string): Promise<void> {
    const clientData = this.clients.get(socket.id);
    if (!clientData) return;

    // Check permissions
    const hasAccess = await this.checkRoomAccess(clientData.userId, roomId);
    if (!hasAccess) {
      socket.emit('error', { message: 'Access denied to room' });
      return;
    }

    // Join the room
    socket.join(roomId);
    clientData.rooms.add(roomId);

    // Notify others
    socket.to(roomId).emit('user:joined', {
      userId: clientData.userId,
      username: socket.data.username,
    });

    // Send room history
    const messages = await this.getRoomHistory(roomId);
    socket.emit('room:history', { roomId, messages });

    console.log(`šŸ‘„ User ${clientData.userId} joined room ${roomId}`);
  }

  private leaveRoom(socket: any, roomId: string): void {
    const clientData = this.clients.get(socket.id);
    if (!clientData) return;

    socket.leave(roomId);
    clientData.rooms.delete(roomId);

    socket.to(roomId).emit('user:left', {
      userId: clientData.userId,
      username: socket.data.username,
    });
  }

  private async handleMessage(socket: any, data: any): Promise<void> {
    const { roomId, content, type = 'text' } = data;
    const userId = socket.data.userId;

    // Validate message
    if (!content || content.trim().length === 0) {
      socket.emit('error', { message: 'Empty message' });
      return;
    }

    // Create message object
    const message = {
      id: this.generateMessageId(),
      roomId,
      userId,
      username: socket.data.username,
      content,
      type,
      timestamp: new Date().toISOString(),
    };

    // Save to database
    await this.saveMessage(message);

    // Broadcast to room
    this.io.to(roomId).emit('message:new', message);

    // Publish to Redis for other server instances
    await this.redis.publish('messages', JSON.stringify(message));

    console.log(`šŸ’¬ Message sent in room ${roomId}`);
  }

  private setupRedisPubSub(): void {
    // Subscribe to channels
    this.redisSub.subscribe('messages', 'notifications', 'presence');

    this.redisSub.on('message', (channel, message) => {
      const data = JSON.parse(message);

      switch (channel) {
        case 'messages':
          // Forward to specific room
          this.io.to(data.roomId).emit('message:new', data);
          break;

        case 'notifications':
          // Send to specific user
          this.io.to(`user:${data.userId}`).emit('notification', data);
          break;

        case 'presence':
          // Broadcast presence update
          this.io.emit('presence:changed', data);
          break;
      }
    });
  }

  private async verifyToken(token: string): Promise<any> {
    // Implement JWT verification
    // Return user object or throw error
    return { id: '12345', username: 'testuser' };
  }

  private async checkRoomAccess(userId: string, roomId: string): Promise<boolean> {
    // Check if user has permission to access room
    return true;
  }

  private async getRoomHistory(roomId: string): Promise<any[]> {
    // Fetch last N messages from Redis or database
    const messages = await this.redis.lrange(`room:${roomId}:messages`, 0, 49);
    return messages.map(msg => JSON.parse(msg));
  }

  private async saveMessage(message: any): Promise<void> {
    // Save to Redis (cache)
    await this.redis.lpush(
      `room:${message.roomId}:messages`,
      JSON.stringify(message)
    );
    await this.redis.ltrim(`room:${message.roomId}:messages`, 0, 99);

    // Save to database (persistent)
    // await db.messages.create(message);
  }

  private async updatePresence(userId: string, status: string): Promise<void> {
    await this.redis.setex(`presence:${userId}`, 300, status);
    await this.redis.publish('presence', JSON.stringify({ userId, status }));
  }

  private async handleDisconnect(socket: any): Promise<void> {
    const userId = socket.data.userId;
    await this.updatePresence(userId, 'offline');
  }

  private generateMessageId(): string {
    return `msg_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
  }

  // Broadcast to all connected clients
  public broadcast(event: string, data: any): void {
    this.io.emit(event, data);
  }

  // Send to specific user
  public sendToUser(userId: string, event: string, data: any): void {
    this.io.to(`user:${userId}`).emit(event, data);
  }

  // Send to specific room
  public sendToRoom(roomId: string, event: string, data: any): void {
    this.io.to(roomId).emit(event, data);
  }
}

// Initialize server
const wsServer = new WebSocketServer(3001);

Client-Side WebSocket Implementation

import io, { Socket } from 'socket.io-client';

class WebSocketClient {
  private socket: Socket | null = null;
  private reconnectAttempts = 0;
  private maxReconnectAttempts = 5;

  connect(url: string, token: string): void {
    this.socket = io(url, {
      auth: { token },
      transports: ['websocket', 'polling'],
      reconnection: true,
      reconnectionDelay: 1000,
      reconnectionDelayMax: 5000,
    });

    this.socket.on('connect', () => {
      console.log('āœ… Connected to WebSocket server');
      this.reconnectAttempts = 0;
    });

    this.socket.on('disconnect', (reason) => {
      console.log('āŒ Disconnected:', reason);
      
      if (reason === 'io server disconnect') {
        // Server initiated disconnect - reconnect manually
        this.socket?.connect();
      }
    });

    this.socket.on('connect_error', (error) => {
      console.error('Connection error:', error);
      this.reconnectAttempts++;

      if (this.reconnectAttempts >= this.maxReconnectAttempts) {
        console.error('Max reconnection attempts reached');
        this.socket?.disconnect();
      }
    });

    // Handle incoming messages
    this.socket.on('message:new', (message) => {
      this.handleNewMessage(message);
    });

    this.socket.on('notification', (notification) => {
      this.showNotification(notification);
    });

    this.socket.on('presence:changed', (data) => {
      this.updateUserPresence(data.userId, data.status);
    });

    this.socket.on('typing:user', (data) => {
      this.showTypingIndicator(data.userId, data.username);
    });

    this.socket.on('error', (error) => {
      console.error('Server error:', error.message);
    });
  }

  joinRoom(roomId: string): void {
    this.socket?.emit('join:room', roomId);
  }

  leaveRoom(roomId: string): void {
    this.socket?.emit('leave:room', roomId);
  }

  sendMessage(roomId: string, content: string, type: string = 'text'): void {
    this.socket?.emit('message:send', { roomId, content, type });
  }

  startTyping(roomId: string): void {
    this.socket?.emit('typing:start', roomId);
  }

  stopTyping(roomId: string): void {
    this.socket?.emit('typing:stop', roomId);
  }

  updateStatus(status: 'online' | 'away' | 'busy'): void {
    this.socket?.emit('presence:update', status);
  }

  disconnect(): void {
    this.socket?.disconnect();
  }

  private handleNewMessage(message: any): void {
    // Update UI with new message
    console.log('New message:', message);
  }

  private showNotification(notification: any): void {
    // Display notification to user
    console.log('Notification:', notification);
  }

  private updateUserPresence(userId: string, status: string): void {
    // Update user status in UI
    console.log(`User ${userId} is now ${status}`);
  }

  private showTypingIndicator(userId: string, username: string): void {
    // Show typing indicator in UI
    console.log(`${username} is typing...`);
  }
}

// Usage
const client = new WebSocketClient();
client.connect('http://localhost:3001', 'user-auth-token');

// Join a chat room
client.joinRoom('room-123');

// Send a message
client.sendMessage('room-123', 'Hello, everyone!');

// Show typing indicator
client.startTyping('room-123');
setTimeout(() => client.stopTyping('room-123'), 2000);

Redis: Caching and Pub/Sub

Redis Setup and Configuration

import Redis, { Redis as RedisClient } from 'ioredis';
import { promisify } from 'util';

class RedisService {
  private client: RedisClient;
  private subscriber: RedisClient;
  private publisher: RedisClient;

  constructor(config?: { host: string; port: number; password?: string }) {
    const defaultConfig = {
      host: 'localhost',
      port: 6379,
      retryStrategy: (times: number) => {
        const delay = Math.min(times * 50, 2000);
        return delay;
      },
      maxRetriesPerRequest: 3,
    };

    this.client = new Redis({ ...defaultConfig, ...config });
    this.subscriber = new Redis({ ...defaultConfig, ...config });
    this.publisher = new Redis({ ...defaultConfig, ...config });

    this.client.on('connect', () => console.log('āœ… Redis connected'));
    this.client.on('error', (err) => console.error('āŒ Redis error:', err));
  }

  // ========== Caching Methods ==========

  async get<T>(key: string): Promise<T | null> {
    const value = await this.client.get(key);
    return value ? JSON.parse(value) : null;
  }

  async set(
    key: string,
    value: any,
    ttl?: number
  ): Promise<void> {
    const serialized = JSON.stringify(value);
    
    if (ttl) {
      await this.client.setex(key, ttl, serialized);
    } else {
      await this.client.set(key, serialized);
    }
  }

  async delete(key: string): Promise<number> {
    return await this.client.del(key);
  }

  async exists(key: string): Promise<boolean> {
    return (await this.client.exists(key)) === 1;
  }

  async increment(key: string, by: number = 1): Promise<number> {
    return await this.client.incrby(key, by);
  }

  async expire(key: string, seconds: number): Promise<boolean> {
    return (await this.client.expire(key, seconds)) === 1;
  }

  // ========== Hash Operations ==========

  async hset(key: string, field: string, value: any): Promise<void> {
    await this.client.hset(key, field, JSON.stringify(value));
  }

  async hget<T>(key: string, field: string): Promise<T | null> {
    const value = await this.client.hget(key, field);
    return value ? JSON.parse(value) : null;
  }

  async hgetall<T>(key: string): Promise<Record<string, T>> {
    const data = await this.client.hgetall(key);
    const result: Record<string, T> = {};
    
    for (const [field, value] of Object.entries(data)) {
      result[field] = JSON.parse(value);
    }
    
    return result;
  }

  async hdel(key: string, ...fields: string[]): Promise<number> {
    return await this.client.hdel(key, ...fields);
  }

  // ========== List Operations ==========

  async lpush(key: string, ...values: any[]): Promise<number> {
    const serialized = values.map(v => JSON.stringify(v));
    return await this.client.lpush(key, ...serialized);
  }

  async rpush(key: string, ...values: any[]): Promise<number> {
    const serialized = values.map(v => JSON.stringify(v));
    return await this.client.rpush(key, ...serialized);
  }

  async lrange<T>(key: string, start: number, stop: number): Promise<T[]> {
    const values = await this.client.lrange(key, start, stop);
    return values.map(v => JSON.parse(v));
  }

  async ltrim(key: string, start: number, stop: number): Promise<void> {
    await this.client.ltrim(key, start, stop);
  }

  // ========== Set Operations ==========

  async sadd(key: string, ...members: string[]): Promise<number> {
    return await this.client.sadd(key, ...members);
  }

  async smembers(key: string): Promise<string[]> {
    return await this.client.smembers(key);
  }

  async sismember(key: string, member: string): Promise<boolean> {
    return (await this.client.sismember(key, member)) === 1;
  }

  async srem(key: string, ...members: string[]): Promise<number> {
    return await this.client.srem(key, ...members);
  }

  // ========== Sorted Set Operations ==========

  async zadd(
    key: string,
    score: number,
    member: string
  ): Promise<number> {
    return await this.client.zadd(key, score, member);
  }

  async zrange(key: string, start: number, stop: number): Promise<string[]> {
    return await this.client.zrange(key, start, stop);
  }

  async zrevrange(
    key: string,
    start: number,
    stop: number
  ): Promise<string[]> {
    return await this.client.zrevrange(key, start, stop);
  }

  async zscore(key: string, member: string): Promise<number | null> {
    const score = await this.client.zscore(key, member);
    return score !== null ? parseFloat(score) : null;
  }

  // ========== Pub/Sub Methods ==========

  async publish(channel: string, message: any): Promise<number> {
    const serialized = JSON.stringify(message);
    return await this.publisher.publish(channel, serialized);
  }

  subscribe(channel: string, handler: (message: any) => void): void {
    this.subscriber.subscribe(channel);
    
    this.subscriber.on('message', (ch, msg) => {
      if (ch === channel) {
        try {
          const parsed = JSON.parse(msg);
          handler(parsed);
        } catch (error) {
          console.error('Failed to parse message:', error);
        }
      }
    });
  }

  unsubscribe(channel: string): void {
    this.subscriber.unsubscribe(channel);
  }

  // ========== Advanced Patterns ==========

  async getOrSet<T>(
    key: string,
    fetcher: () => Promise<T>,
    ttl: number = 3600
  ): Promise<T> {
    // Try to get from cache
    const cached = await this.get<T>(key);
    if (cached !== null) {
      return cached;
    }

    // Cache miss - fetch data
    const data = await fetcher();
    await this.set(key, data, ttl);
    return data;
  }

  async invalidatePattern(pattern: string): Promise<number> {
    const keys = await this.client.keys(pattern);
    if (keys.length === 0) return 0;
    return await this.client.del(...keys);
  }

  async rateLimit(
    key: string,
    limit: number,
    window: number
  ): Promise<{ allowed: boolean; remaining: number }> {
    const current = await this.increment(key);
    
    if (current === 1) {
      await this.expire(key, window);
    }

    const remaining = Math.max(0, limit - current);
    const allowed = current <= limit;

    return { allowed, remaining };
  }

  // ========== Distributed Lock ==========

  async acquireLock(
    lockKey: string,
    ttl: number = 10
  ): Promise<string | null> {
    const lockValue = `lock_${Date.now()}_${Math.random()}`;
    const result = await this.client.set(
      lockKey,
      lockValue,
      'EX',
      ttl,
      'NX'
    );
    
    return result === 'OK' ? lockValue : null;
  }

  async releaseLock(lockKey: string, lockValue: string): Promise<boolean> {
    const script = `
      if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
      else
        return 0
      end
    `;
    
    const result = await this.client.eval(script, 1, lockKey, lockValue);
    return result === 1;
  }

  async disconnect(): Promise<void> {
    await this.client.quit();
    await this.subscriber.quit();
    await this.publisher.quit();
  }
}

// Usage Examples
const redis = new RedisService();

// Caching
await redis.set('user:123', { name: 'John', email: 'john@example.com' }, 3600);
const user = await redis.get('user:123');

// Rate limiting
const { allowed, remaining } = await redis.rateLimit('api:user:123', 100, 60);
if (!allowed) {
  throw new Error('Rate limit exceeded');
}

// Distributed lock
const lockValue = await redis.acquireLock('process:order:456', 30);
if (lockValue) {
  try {
    // Critical section
    await processOrder('456');
  } finally {
    await redis.releaseLock('process:order:456', lockValue);
  }
}

// Pub/Sub
redis.subscribe('notifications', (message) => {
  console.log('Notification received:', message);
});

await redis.publish('notifications', {
  type: 'order_completed',
  orderId: '456',
  userId: '123',
});

Complete System Integration

API Server with All Components

import express from 'express';
import { RabbitMQProducer } from './rabbitmq';
import { RedisService } from './redis';

const app = express();
app.use(express.json());

const rabbitmq = new RabbitMQProducer();
const redis = new RedisService();

// Initialize connections
await rabbitmq.connect('amqp://admin:password@localhost:5672');

// API Endpoint: Create Order
app.post('/api/orders', async (req, res) => {
  const { userId, items, total } = req.body;

  // Check rate limit
  const { allowed } = await redis.rateLimit(`orders:${userId}`, 10, 60);
  if (!allowed) {
    return res.status(429).json({ error: 'Too many requests' });
  }

  try {
    // Generate order ID
    const orderId = `ORD-${Date.now()}`;

    // Cache order data
    await redis.set(`order:${orderId}`, {
      userId,
      items,
      total,
      status: 'pending',
      createdAt: new Date().toISOString(),
    }, 3600);

    // Publish to RabbitMQ for async processing
    await rabbitmq.publishEvent('order.created', {
      orderId,
      userId,
      items,
      total,
    });

    // Publish real-time update via Redis
    await redis.publish('order-updates', {
      type: 'order_created',
      orderId,
      userId,
    });

    res.json({
      success: true,
      orderId,
      status: 'processing',
    });
  } catch (error) {
    console.error('Order creation failed:', error);
    res.status(500).json({ error: 'Internal server error' });
  }
});

// API Endpoint: Get Order Status
app.get('/api/orders/:orderId', async (req, res) => {
  const { orderId } = req.params;

  try {
    // Try cache first
    const order = await redis.get(`order:${orderId}`);
    
    if (order) {
      res.json(order);
    } else {
      // Fetch from database if not in cache
      // const order = await db.orders.findById(orderId);
      res.status(404).json({ error: 'Order not found' });
    }
  } catch (error) {
    console.error('Error fetching order:', error);
    res.status(500).json({ error: 'Internal server error' });
  }
});

app.listen(3000, () => {
  console.log('šŸš€ API server running on port 3000');
});

Performance Optimization

Best Practices

ComponentOptimizationImpact
RabbitMQMessage batching50% better throughput
WebSocketsBinary protocol30% less bandwidth
RedisPipeline commands5x faster operations
CachingCache warming90% cache hit rate
Connection PoolingReuse connections40% less overhead

Monitoring and Metrics

// Prometheus metrics
import { Counter, Histogram, Gauge } from 'prom-client';

const messageCounter = new Counter({
  name: 'rabbitmq_messages_total',
  help: 'Total messages processed',
  labelNames: ['queue', 'status'],
});

const wsConnections = new Gauge({
  name: 'websocket_connections',
  help: 'Active WebSocket connections',
});

const cacheHitRate = new Counter({
  name: 'redis_cache_hits_total',
  help: 'Cache hit/miss counter',
  labelNames: ['status'],
});

// Track metrics
messageCounter.inc({ queue: 'orders', status: 'success' });
wsConnections.inc();
cacheHitRate.inc({ status: 'hit' });

Production Deployment

High Availability Setup

# kubernetes-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: websocket-server
spec:
  replicas: 3
  selector:
    matchLabels:
      app: websocket
  template:
    metadata:
      labels:
        app: websocket
    spec:
      containers:
      - name: websocket
        image: myapp/websocket:latest
        ports:
        - containerPort: 3001
        env:
        - name: REDIS_URL
          value: "redis://redis:6379"
        - name: RABBITMQ_URL
          valueFrom:
            secretKeyRef:
              name: rabbitmq-credentials
              key: url
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"
        livenessProbe:
          httpGet:
            path: /health
            port: 3001
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 3001
          initialDelaySeconds: 5
          periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: websocket-service
spec:
  type: LoadBalancer
  selector:
    app: websocket
  ports:
  - port: 80
    targetPort: 3001

Conclusion

Building distributed systems with RabbitMQ, WebSockets, and Redis enables:

āœ… Scalability - Handle millions of concurrent users
āœ… Real-time Communication - Instant updates and notifications
āœ… Reliability - Message persistence and guaranteed delivery
āœ… Performance - Sub-millisecond cache access
āœ… Flexibility - Microservices-ready architecture

Key Takeaways

  1. RabbitMQ for reliable async messaging between services
  2. WebSockets for bidirectional real-time communication
  3. Redis for caching, pub/sub, and distributed coordination
  4. Proper monitoring for production reliability
  5. High availability through clustering and replication

Need help building a distributed system? Our team specializes in scalable architectures. Contact us for consulting!

Official Resources

Further Resources: