Building Real-Time Distributed Systems: RabbitMQ, WebSockets, and Redis
Master the art of building scalable, real-time distributed systems using RabbitMQ for messaging, WebSockets for live updates, and Redis for caching and pub/sub.
Building Real-Time Distributed Systems
Modern applications demand real-time communication, scalability, and reliability. This comprehensive guide explores how to build distributed systems using three powerful technologies: RabbitMQ, WebSockets, and Redis.
Architecture Overview
System Components
Key Technologies:
- š° RabbitMQ: Message broker for async communication
- š WebSockets: Real-time bidirectional communication
- š“ Redis: In-memory cache and pub/sub
- šļø PostgreSQL: Persistent data storage
- āļø Node.js: Application runtime
RabbitMQ: Reliable Message Queue
Why RabbitMQ?
| Feature | Benefit |
|---|---|
| Message Persistence | Guaranteed delivery |
| Routing Flexibility | Complex message patterns |
| Dead Letter Exchange | Error handling |
| Multiple Queues | Work distribution |
| Acknowledgments | At-least-once delivery |
Setting Up RabbitMQ
# docker-compose.yml
version: '3.8'
services:
rabbitmq:
image: rabbitmq:3-management-alpine
container_name: rabbitmq
ports:
- "5672:5672" # AMQP protocol
- "15672:15672" # Management UI
environment:
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: secure_password
RABBITMQ_DEFAULT_VHOST: /production
volumes:
- rabbitmq-data:/var/lib/rabbitmq
networks:
- app-network
volumes:
rabbitmq-data:
networks:
app-network:
driver: bridge
Producer: Publishing Messages
import amqp, { Connection, Channel } from 'amqplib';
class RabbitMQProducer {
private connection: Connection | null = null;
private channel: Channel | null = null;
async connect(url: string): Promise<void> {
try {
this.connection = await amqp.connect(url);
this.channel = await this.connection.createChannel();
// Create exchange for routing
await this.channel.assertExchange('events', 'topic', {
durable: true,
});
console.log('ā
RabbitMQ Producer connected');
} catch (error) {
console.error('ā RabbitMQ connection failed:', error);
throw error;
}
}
async publishEvent(
routingKey: string,
data: object,
options?: { priority?: number; expiration?: string }
): Promise<boolean> {
if (!this.channel) {
throw new Error('Channel not initialized');
}
const message = JSON.stringify(data);
const messageOptions = {
persistent: true,
timestamp: Date.now(),
contentType: 'application/json',
...options,
};
try {
const published = this.channel.publish(
'events',
routingKey,
Buffer.from(message),
messageOptions
);
if (published) {
console.log(`š¤ Published to ${routingKey}:`, data);
return true;
} else {
console.warn('ā ļø Message not published - buffer full');
return false;
}
} catch (error) {
console.error('ā Publish error:', error);
throw error;
}
}
async close(): Promise<void> {
await this.channel?.close();
await this.connection?.close();
}
}
// Usage
const producer = new RabbitMQProducer();
await producer.connect('amqp://admin:secure_password@localhost:5672');
// Publish user registration event
await producer.publishEvent('user.registered', {
userId: '12345',
email: 'user@example.com',
timestamp: new Date().toISOString(),
});
// Publish order created event
await producer.publishEvent('order.created', {
orderId: 'ORD-001',
amount: 99.99,
items: 3,
}, { priority: 5 });
Consumer: Processing Messages
class RabbitMQConsumer {
private connection: Connection | null = null;
private channel: Channel | null = null;
async connect(url: string): Promise<void> {
this.connection = await amqp.connect(url);
this.channel = await this.connection.createChannel();
// Set prefetch to control concurrent processing
await this.channel.prefetch(10);
console.log('ā
RabbitMQ Consumer connected');
}
async consumeQueue(
queueName: string,
bindingKeys: string[],
handler: (data: any) => Promise<void>
): Promise<void> {
if (!this.channel) {
throw new Error('Channel not initialized');
}
// Assert queue with options
await this.channel.assertQueue(queueName, {
durable: true,
deadLetterExchange: 'dlx',
deadLetterRoutingKey: 'failed',
messageTtl: 86400000, // 24 hours
});
// Bind queue to routing keys
for (const key of bindingKeys) {
await this.channel.bindQueue(queueName, 'events', key);
}
// Consume messages
await this.channel.consume(
queueName,
async (msg) => {
if (!msg) return;
try {
const data = JSON.parse(msg.content.toString());
console.log(`š„ Received message:`, data);
// Process the message
await handler(data);
// Acknowledge successful processing
this.channel!.ack(msg);
console.log('ā
Message processed successfully');
} catch (error) {
console.error('ā Message processing failed:', error);
// Retry logic
const retryCount = (msg.properties.headers?.['x-retry-count'] || 0) + 1;
if (retryCount < 3) {
// Requeue with retry count
this.channel!.nack(msg, false, false);
await this.publishWithRetry(msg, retryCount);
} else {
// Max retries reached - send to dead letter
this.channel!.nack(msg, false, false);
console.error('š Message sent to dead letter queue');
}
}
},
{ noAck: false }
);
console.log(`š Listening on queue: ${queueName}`);
}
private async publishWithRetry(msg: amqp.Message, retryCount: number) {
const delay = Math.min(1000 * Math.pow(2, retryCount), 30000);
setTimeout(() => {
this.channel!.publish(
'events',
msg.fields.routingKey,
msg.content,
{
...msg.properties,
headers: {
...msg.properties.headers,
'x-retry-count': retryCount,
},
}
);
}, delay);
}
}
// Usage
const consumer = new RabbitMQConsumer();
await consumer.connect('amqp://admin:secure_password@localhost:5672');
// Process user events
await consumer.consumeQueue(
'user-events-queue',
['user.registered', 'user.updated', 'user.deleted'],
async (data) => {
// Handle user events
if (data.userId) {
await sendWelcomeEmail(data.email);
await updateUserCache(data.userId);
}
}
);
WebSockets: Real-Time Communication
WebSocket Server with Socket.io
import express from 'express';
import http from 'http';
import { Server as SocketIOServer } from 'socket.io';
import Redis from 'ioredis';
interface ClientData {
userId: string;
rooms: Set<string>;
}
class WebSocketServer {
private app: express.Application;
private server: http.Server;
private io: SocketIOServer;
private redis: Redis;
private redisSub: Redis;
private clients: Map<string, ClientData> = new Map();
constructor(port: number) {
this.app = express();
this.server = http.createServer(this.app);
// Initialize Socket.io with Redis adapter
this.io = new SocketIOServer(this.server, {
cors: {
origin: process.env.CLIENT_URL || 'http://localhost:3000',
credentials: true,
},
transports: ['websocket', 'polling'],
});
// Redis for pub/sub
this.redis = new Redis({
host: 'localhost',
port: 6379,
});
this.redisSub = new Redis({
host: 'localhost',
port: 6379,
});
this.setupSocketHandlers();
this.setupRedisPubSub();
this.server.listen(port, () => {
console.log(`š WebSocket server running on port ${port}`);
});
}
private setupSocketHandlers(): void {
// Authentication middleware
this.io.use(async (socket, next) => {
const token = socket.handshake.auth.token;
try {
const user = await this.verifyToken(token);
socket.data.userId = user.id;
socket.data.username = user.username;
next();
} catch (error) {
next(new Error('Authentication failed'));
}
});
this.io.on('connection', (socket) => {
const userId = socket.data.userId;
console.log(`ā
User ${userId} connected`);
// Store client data
this.clients.set(socket.id, {
userId,
rooms: new Set(),
});
// Join user's personal room
socket.join(`user:${userId}`);
// Handle room joining
socket.on('join:room', async (roomId: string) => {
await this.joinRoom(socket, roomId);
});
// Handle room leaving
socket.on('leave:room', (roomId: string) => {
this.leaveRoom(socket, roomId);
});
// Handle messages
socket.on('message:send', async (data) => {
await this.handleMessage(socket, data);
});
// Handle typing indicator
socket.on('typing:start', (roomId: string) => {
socket.to(roomId).emit('typing:user', {
userId,
username: socket.data.username,
});
});
socket.on('typing:stop', (roomId: string) => {
socket.to(roomId).emit('typing:stop', { userId });
});
// Handle presence
socket.on('presence:update', async (status: string) => {
await this.updatePresence(userId, status);
socket.broadcast.emit('presence:changed', { userId, status });
});
// Handle disconnection
socket.on('disconnect', async () => {
console.log(`ā User ${userId} disconnected`);
await this.handleDisconnect(socket);
this.clients.delete(socket.id);
});
});
}
private async joinRoom(socket: any, roomId: string): Promise<void> {
const clientData = this.clients.get(socket.id);
if (!clientData) return;
// Check permissions
const hasAccess = await this.checkRoomAccess(clientData.userId, roomId);
if (!hasAccess) {
socket.emit('error', { message: 'Access denied to room' });
return;
}
// Join the room
socket.join(roomId);
clientData.rooms.add(roomId);
// Notify others
socket.to(roomId).emit('user:joined', {
userId: clientData.userId,
username: socket.data.username,
});
// Send room history
const messages = await this.getRoomHistory(roomId);
socket.emit('room:history', { roomId, messages });
console.log(`š„ User ${clientData.userId} joined room ${roomId}`);
}
private leaveRoom(socket: any, roomId: string): void {
const clientData = this.clients.get(socket.id);
if (!clientData) return;
socket.leave(roomId);
clientData.rooms.delete(roomId);
socket.to(roomId).emit('user:left', {
userId: clientData.userId,
username: socket.data.username,
});
}
private async handleMessage(socket: any, data: any): Promise<void> {
const { roomId, content, type = 'text' } = data;
const userId = socket.data.userId;
// Validate message
if (!content || content.trim().length === 0) {
socket.emit('error', { message: 'Empty message' });
return;
}
// Create message object
const message = {
id: this.generateMessageId(),
roomId,
userId,
username: socket.data.username,
content,
type,
timestamp: new Date().toISOString(),
};
// Save to database
await this.saveMessage(message);
// Broadcast to room
this.io.to(roomId).emit('message:new', message);
// Publish to Redis for other server instances
await this.redis.publish('messages', JSON.stringify(message));
console.log(`š¬ Message sent in room ${roomId}`);
}
private setupRedisPubSub(): void {
// Subscribe to channels
this.redisSub.subscribe('messages', 'notifications', 'presence');
this.redisSub.on('message', (channel, message) => {
const data = JSON.parse(message);
switch (channel) {
case 'messages':
// Forward to specific room
this.io.to(data.roomId).emit('message:new', data);
break;
case 'notifications':
// Send to specific user
this.io.to(`user:${data.userId}`).emit('notification', data);
break;
case 'presence':
// Broadcast presence update
this.io.emit('presence:changed', data);
break;
}
});
}
private async verifyToken(token: string): Promise<any> {
// Implement JWT verification
// Return user object or throw error
return { id: '12345', username: 'testuser' };
}
private async checkRoomAccess(userId: string, roomId: string): Promise<boolean> {
// Check if user has permission to access room
return true;
}
private async getRoomHistory(roomId: string): Promise<any[]> {
// Fetch last N messages from Redis or database
const messages = await this.redis.lrange(`room:${roomId}:messages`, 0, 49);
return messages.map(msg => JSON.parse(msg));
}
private async saveMessage(message: any): Promise<void> {
// Save to Redis (cache)
await this.redis.lpush(
`room:${message.roomId}:messages`,
JSON.stringify(message)
);
await this.redis.ltrim(`room:${message.roomId}:messages`, 0, 99);
// Save to database (persistent)
// await db.messages.create(message);
}
private async updatePresence(userId: string, status: string): Promise<void> {
await this.redis.setex(`presence:${userId}`, 300, status);
await this.redis.publish('presence', JSON.stringify({ userId, status }));
}
private async handleDisconnect(socket: any): Promise<void> {
const userId = socket.data.userId;
await this.updatePresence(userId, 'offline');
}
private generateMessageId(): string {
return `msg_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}
// Broadcast to all connected clients
public broadcast(event: string, data: any): void {
this.io.emit(event, data);
}
// Send to specific user
public sendToUser(userId: string, event: string, data: any): void {
this.io.to(`user:${userId}`).emit(event, data);
}
// Send to specific room
public sendToRoom(roomId: string, event: string, data: any): void {
this.io.to(roomId).emit(event, data);
}
}
// Initialize server
const wsServer = new WebSocketServer(3001);
Client-Side WebSocket Implementation
import io, { Socket } from 'socket.io-client';
class WebSocketClient {
private socket: Socket | null = null;
private reconnectAttempts = 0;
private maxReconnectAttempts = 5;
connect(url: string, token: string): void {
this.socket = io(url, {
auth: { token },
transports: ['websocket', 'polling'],
reconnection: true,
reconnectionDelay: 1000,
reconnectionDelayMax: 5000,
});
this.socket.on('connect', () => {
console.log('ā
Connected to WebSocket server');
this.reconnectAttempts = 0;
});
this.socket.on('disconnect', (reason) => {
console.log('ā Disconnected:', reason);
if (reason === 'io server disconnect') {
// Server initiated disconnect - reconnect manually
this.socket?.connect();
}
});
this.socket.on('connect_error', (error) => {
console.error('Connection error:', error);
this.reconnectAttempts++;
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
console.error('Max reconnection attempts reached');
this.socket?.disconnect();
}
});
// Handle incoming messages
this.socket.on('message:new', (message) => {
this.handleNewMessage(message);
});
this.socket.on('notification', (notification) => {
this.showNotification(notification);
});
this.socket.on('presence:changed', (data) => {
this.updateUserPresence(data.userId, data.status);
});
this.socket.on('typing:user', (data) => {
this.showTypingIndicator(data.userId, data.username);
});
this.socket.on('error', (error) => {
console.error('Server error:', error.message);
});
}
joinRoom(roomId: string): void {
this.socket?.emit('join:room', roomId);
}
leaveRoom(roomId: string): void {
this.socket?.emit('leave:room', roomId);
}
sendMessage(roomId: string, content: string, type: string = 'text'): void {
this.socket?.emit('message:send', { roomId, content, type });
}
startTyping(roomId: string): void {
this.socket?.emit('typing:start', roomId);
}
stopTyping(roomId: string): void {
this.socket?.emit('typing:stop', roomId);
}
updateStatus(status: 'online' | 'away' | 'busy'): void {
this.socket?.emit('presence:update', status);
}
disconnect(): void {
this.socket?.disconnect();
}
private handleNewMessage(message: any): void {
// Update UI with new message
console.log('New message:', message);
}
private showNotification(notification: any): void {
// Display notification to user
console.log('Notification:', notification);
}
private updateUserPresence(userId: string, status: string): void {
// Update user status in UI
console.log(`User ${userId} is now ${status}`);
}
private showTypingIndicator(userId: string, username: string): void {
// Show typing indicator in UI
console.log(`${username} is typing...`);
}
}
// Usage
const client = new WebSocketClient();
client.connect('http://localhost:3001', 'user-auth-token');
// Join a chat room
client.joinRoom('room-123');
// Send a message
client.sendMessage('room-123', 'Hello, everyone!');
// Show typing indicator
client.startTyping('room-123');
setTimeout(() => client.stopTyping('room-123'), 2000);
Redis: Caching and Pub/Sub
Redis Setup and Configuration
import Redis, { Redis as RedisClient } from 'ioredis';
import { promisify } from 'util';
class RedisService {
private client: RedisClient;
private subscriber: RedisClient;
private publisher: RedisClient;
constructor(config?: { host: string; port: number; password?: string }) {
const defaultConfig = {
host: 'localhost',
port: 6379,
retryStrategy: (times: number) => {
const delay = Math.min(times * 50, 2000);
return delay;
},
maxRetriesPerRequest: 3,
};
this.client = new Redis({ ...defaultConfig, ...config });
this.subscriber = new Redis({ ...defaultConfig, ...config });
this.publisher = new Redis({ ...defaultConfig, ...config });
this.client.on('connect', () => console.log('ā
Redis connected'));
this.client.on('error', (err) => console.error('ā Redis error:', err));
}
// ========== Caching Methods ==========
async get<T>(key: string): Promise<T | null> {
const value = await this.client.get(key);
return value ? JSON.parse(value) : null;
}
async set(
key: string,
value: any,
ttl?: number
): Promise<void> {
const serialized = JSON.stringify(value);
if (ttl) {
await this.client.setex(key, ttl, serialized);
} else {
await this.client.set(key, serialized);
}
}
async delete(key: string): Promise<number> {
return await this.client.del(key);
}
async exists(key: string): Promise<boolean> {
return (await this.client.exists(key)) === 1;
}
async increment(key: string, by: number = 1): Promise<number> {
return await this.client.incrby(key, by);
}
async expire(key: string, seconds: number): Promise<boolean> {
return (await this.client.expire(key, seconds)) === 1;
}
// ========== Hash Operations ==========
async hset(key: string, field: string, value: any): Promise<void> {
await this.client.hset(key, field, JSON.stringify(value));
}
async hget<T>(key: string, field: string): Promise<T | null> {
const value = await this.client.hget(key, field);
return value ? JSON.parse(value) : null;
}
async hgetall<T>(key: string): Promise<Record<string, T>> {
const data = await this.client.hgetall(key);
const result: Record<string, T> = {};
for (const [field, value] of Object.entries(data)) {
result[field] = JSON.parse(value);
}
return result;
}
async hdel(key: string, ...fields: string[]): Promise<number> {
return await this.client.hdel(key, ...fields);
}
// ========== List Operations ==========
async lpush(key: string, ...values: any[]): Promise<number> {
const serialized = values.map(v => JSON.stringify(v));
return await this.client.lpush(key, ...serialized);
}
async rpush(key: string, ...values: any[]): Promise<number> {
const serialized = values.map(v => JSON.stringify(v));
return await this.client.rpush(key, ...serialized);
}
async lrange<T>(key: string, start: number, stop: number): Promise<T[]> {
const values = await this.client.lrange(key, start, stop);
return values.map(v => JSON.parse(v));
}
async ltrim(key: string, start: number, stop: number): Promise<void> {
await this.client.ltrim(key, start, stop);
}
// ========== Set Operations ==========
async sadd(key: string, ...members: string[]): Promise<number> {
return await this.client.sadd(key, ...members);
}
async smembers(key: string): Promise<string[]> {
return await this.client.smembers(key);
}
async sismember(key: string, member: string): Promise<boolean> {
return (await this.client.sismember(key, member)) === 1;
}
async srem(key: string, ...members: string[]): Promise<number> {
return await this.client.srem(key, ...members);
}
// ========== Sorted Set Operations ==========
async zadd(
key: string,
score: number,
member: string
): Promise<number> {
return await this.client.zadd(key, score, member);
}
async zrange(key: string, start: number, stop: number): Promise<string[]> {
return await this.client.zrange(key, start, stop);
}
async zrevrange(
key: string,
start: number,
stop: number
): Promise<string[]> {
return await this.client.zrevrange(key, start, stop);
}
async zscore(key: string, member: string): Promise<number | null> {
const score = await this.client.zscore(key, member);
return score !== null ? parseFloat(score) : null;
}
// ========== Pub/Sub Methods ==========
async publish(channel: string, message: any): Promise<number> {
const serialized = JSON.stringify(message);
return await this.publisher.publish(channel, serialized);
}
subscribe(channel: string, handler: (message: any) => void): void {
this.subscriber.subscribe(channel);
this.subscriber.on('message', (ch, msg) => {
if (ch === channel) {
try {
const parsed = JSON.parse(msg);
handler(parsed);
} catch (error) {
console.error('Failed to parse message:', error);
}
}
});
}
unsubscribe(channel: string): void {
this.subscriber.unsubscribe(channel);
}
// ========== Advanced Patterns ==========
async getOrSet<T>(
key: string,
fetcher: () => Promise<T>,
ttl: number = 3600
): Promise<T> {
// Try to get from cache
const cached = await this.get<T>(key);
if (cached !== null) {
return cached;
}
// Cache miss - fetch data
const data = await fetcher();
await this.set(key, data, ttl);
return data;
}
async invalidatePattern(pattern: string): Promise<number> {
const keys = await this.client.keys(pattern);
if (keys.length === 0) return 0;
return await this.client.del(...keys);
}
async rateLimit(
key: string,
limit: number,
window: number
): Promise<{ allowed: boolean; remaining: number }> {
const current = await this.increment(key);
if (current === 1) {
await this.expire(key, window);
}
const remaining = Math.max(0, limit - current);
const allowed = current <= limit;
return { allowed, remaining };
}
// ========== Distributed Lock ==========
async acquireLock(
lockKey: string,
ttl: number = 10
): Promise<string | null> {
const lockValue = `lock_${Date.now()}_${Math.random()}`;
const result = await this.client.set(
lockKey,
lockValue,
'EX',
ttl,
'NX'
);
return result === 'OK' ? lockValue : null;
}
async releaseLock(lockKey: string, lockValue: string): Promise<boolean> {
const script = `
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
`;
const result = await this.client.eval(script, 1, lockKey, lockValue);
return result === 1;
}
async disconnect(): Promise<void> {
await this.client.quit();
await this.subscriber.quit();
await this.publisher.quit();
}
}
// Usage Examples
const redis = new RedisService();
// Caching
await redis.set('user:123', { name: 'John', email: 'john@example.com' }, 3600);
const user = await redis.get('user:123');
// Rate limiting
const { allowed, remaining } = await redis.rateLimit('api:user:123', 100, 60);
if (!allowed) {
throw new Error('Rate limit exceeded');
}
// Distributed lock
const lockValue = await redis.acquireLock('process:order:456', 30);
if (lockValue) {
try {
// Critical section
await processOrder('456');
} finally {
await redis.releaseLock('process:order:456', lockValue);
}
}
// Pub/Sub
redis.subscribe('notifications', (message) => {
console.log('Notification received:', message);
});
await redis.publish('notifications', {
type: 'order_completed',
orderId: '456',
userId: '123',
});
Complete System Integration
API Server with All Components
import express from 'express';
import { RabbitMQProducer } from './rabbitmq';
import { RedisService } from './redis';
const app = express();
app.use(express.json());
const rabbitmq = new RabbitMQProducer();
const redis = new RedisService();
// Initialize connections
await rabbitmq.connect('amqp://admin:password@localhost:5672');
// API Endpoint: Create Order
app.post('/api/orders', async (req, res) => {
const { userId, items, total } = req.body;
// Check rate limit
const { allowed } = await redis.rateLimit(`orders:${userId}`, 10, 60);
if (!allowed) {
return res.status(429).json({ error: 'Too many requests' });
}
try {
// Generate order ID
const orderId = `ORD-${Date.now()}`;
// Cache order data
await redis.set(`order:${orderId}`, {
userId,
items,
total,
status: 'pending',
createdAt: new Date().toISOString(),
}, 3600);
// Publish to RabbitMQ for async processing
await rabbitmq.publishEvent('order.created', {
orderId,
userId,
items,
total,
});
// Publish real-time update via Redis
await redis.publish('order-updates', {
type: 'order_created',
orderId,
userId,
});
res.json({
success: true,
orderId,
status: 'processing',
});
} catch (error) {
console.error('Order creation failed:', error);
res.status(500).json({ error: 'Internal server error' });
}
});
// API Endpoint: Get Order Status
app.get('/api/orders/:orderId', async (req, res) => {
const { orderId } = req.params;
try {
// Try cache first
const order = await redis.get(`order:${orderId}`);
if (order) {
res.json(order);
} else {
// Fetch from database if not in cache
// const order = await db.orders.findById(orderId);
res.status(404).json({ error: 'Order not found' });
}
} catch (error) {
console.error('Error fetching order:', error);
res.status(500).json({ error: 'Internal server error' });
}
});
app.listen(3000, () => {
console.log('š API server running on port 3000');
});
Performance Optimization
Best Practices
| Component | Optimization | Impact |
|---|---|---|
| RabbitMQ | Message batching | 50% better throughput |
| WebSockets | Binary protocol | 30% less bandwidth |
| Redis | Pipeline commands | 5x faster operations |
| Caching | Cache warming | 90% cache hit rate |
| Connection Pooling | Reuse connections | 40% less overhead |
Monitoring and Metrics
// Prometheus metrics
import { Counter, Histogram, Gauge } from 'prom-client';
const messageCounter = new Counter({
name: 'rabbitmq_messages_total',
help: 'Total messages processed',
labelNames: ['queue', 'status'],
});
const wsConnections = new Gauge({
name: 'websocket_connections',
help: 'Active WebSocket connections',
});
const cacheHitRate = new Counter({
name: 'redis_cache_hits_total',
help: 'Cache hit/miss counter',
labelNames: ['status'],
});
// Track metrics
messageCounter.inc({ queue: 'orders', status: 'success' });
wsConnections.inc();
cacheHitRate.inc({ status: 'hit' });
Production Deployment
High Availability Setup
# kubernetes-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: websocket-server
spec:
replicas: 3
selector:
matchLabels:
app: websocket
template:
metadata:
labels:
app: websocket
spec:
containers:
- name: websocket
image: myapp/websocket:latest
ports:
- containerPort: 3001
env:
- name: REDIS_URL
value: "redis://redis:6379"
- name: RABBITMQ_URL
valueFrom:
secretKeyRef:
name: rabbitmq-credentials
key: url
resources:
requests:
memory: "256Mi"
cpu: "250m"
limits:
memory: "512Mi"
cpu: "500m"
livenessProbe:
httpGet:
path: /health
port: 3001
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /ready
port: 3001
initialDelaySeconds: 5
periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
name: websocket-service
spec:
type: LoadBalancer
selector:
app: websocket
ports:
- port: 80
targetPort: 3001
Conclusion
Building distributed systems with RabbitMQ, WebSockets, and Redis enables:
ā
Scalability - Handle millions of concurrent users
ā
Real-time Communication - Instant updates and notifications
ā
Reliability - Message persistence and guaranteed delivery
ā
Performance - Sub-millisecond cache access
ā
Flexibility - Microservices-ready architecture
Key Takeaways
- RabbitMQ for reliable async messaging between services
- WebSockets for bidirectional real-time communication
- Redis for caching, pub/sub, and distributed coordination
- Proper monitoring for production reliability
- High availability through clustering and replication
Need help building a distributed system? Our team specializes in scalable architectures. Contact us for consulting!
Official Resources
Further Resources: