LatteStream®
Quick Start
Getting Started
Building Custom SDKs
SDKs & Libraries
JavaScript/TypeScript
Node.js / Bun / Deno
Python
Go
PHP
API Reference
WebSocket API
REST API
Webhooks
Authentication
Send real-time messages from your Node.js backend to any connected client
The LatteStream Server SDK enables you to trigger events, manage channels, and authenticate users from your Node.js backend applications. Built for high performance with support for both Node.js and Deno runtimes.
# Install the server SDK with the package manager your project uses (pick one).
npm install @lattestream/server
# or
yarn add @lattestream/server
# or
pnpm add @lattestream/server
import LatteStreamServer from '@lattestream/server';

// Initialize server client
const lattestream = new LatteStreamServer('your-app-key', 'your-master-key', {
  cluster: 'us-east-1',
});

// Trigger an event
// (top-level `await` needs an ES module; otherwise wrap this in an async function)
await lattestream.trigger('my-channel', 'my-event', {
  message: 'Hello World!',
  timestamp: new Date().toISOString(),
});
/** Optional settings accepted by the `LatteStreamServer` constructor. */
interface LatteStreamServerOptions {
  /** Custom WebSocket endpoint. */
  wsEndpoint?: string;
  /** Custom HTTP API endpoint. */
  httpEndpoint?: string;
  /** Geographic cluster. */
  cluster?: string;
  /** Use secure connections (default: true). */
  useTLS?: boolean;
  /** Enable debug logging. */
  enableLogging?: boolean;
  /** Request timeout in ms (default: 5000). */
  timeout?: number;
  /** HTTP keep-alive (default: true). */
  keepAlive?: boolean;
  /** Custom HTTP agent. */
  agent?: Agent;
}
// Configured client: credentials are read from the environment and the
// options object is built up front before being handed to the constructor.
const appKey = process.env.LATTESTREAM_APP_KEY;
const masterKey = process.env.LATTESTREAM_MASTER_KEY;

const serverOptions = {
  cluster: 'eu-west-1',
  // Debug logging is switched on only for development builds
  enableLogging: process.env.NODE_ENV === 'development',
  timeout: 10000,
  keepAlive: true,
};

const lattestream = new LatteStreamServer(appKey, masterKey, serverOptions);
// Basic event
await lattestream.trigger('news-updates', 'breaking-news', {
  headline: 'Major Development in Tech Industry',
  summary: 'A groundbreaking announcement was made today...',
  category: 'technology',
});

// Event with socket exclusion (don't send to the triggering client)
await lattestream.trigger(
  'chat-room-1',
  'new-message',
  {
    user: 'john_doe',
    message: 'Hello everyone!',
    timestamp: Date.now(),
  },
  {
    socket_id: 'socket_123', // Exclude this socket from receiving the event
  }
);
// Send to multiple channels at once: pass an array of channel names
// as the first argument instead of a single string.
await lattestream.trigger(['room-1', 'room-2', 'room-3'], 'announcement', {
  message: 'Server maintenance in 10 minutes',
  type: 'warning',
});
// Send multiple events efficiently: each entry names its own channel,
// event name and payload, and all entries go out in one triggerBatch call.
await lattestream.triggerBatch([
  {
    channel: 'user-123',
    name: 'notification',
    data: { type: 'friend_request', from: 'user-456' },
  },
  {
    channel: 'user-123',
    name: 'badge_update',
    data: { badges: ['early_adopter', 'contributor'] },
  },
  {
    channel: 'activity-feed',
    name: 'user_action',
    data: { user: 'user-123', action: 'joined', timestamp: Date.now() },
  },
]);
Create middleware to authorize private and presence channel subscriptions:
import { createChannelAuthMiddleware } from '@lattestream/server';

// Express.js example
// NOTE: `req.user` is assumed to be set by your own authentication
// middleware mounted before this route.
app.post(
  '/lattestream/auth',
  createChannelAuthMiddleware(
    process.env.LATTESTREAM_APP_KEY,
    process.env.LATTESTREAM_MASTER_KEY,
    (req) => {
      // Return user data for the authenticated user
      return {
        user_id: req.user.id.toString(),
        user_info: {
          name: req.user.name,
          email: req.user.email,
          avatar: req.user.avatar,
        },
      };
    }
  )
);
// Manual authorization for more control
app.post('/lattestream/auth', async (req, res) => {
  try {
    const { socket_id, channel_name } = req.body ?? {};

    // Reject malformed requests before doing any lookups
    if (typeof socket_id !== 'string' || typeof channel_name !== 'string') {
      return res
        .status(400)
        .json({ error: 'socket_id and channel_name are required' });
    }

    // `req.user` is assumed to be set by authentication middleware that runs
    // before this route; without it there is nobody to authorize.
    if (!req.user) {
      return res.status(401).json({ error: 'Not authenticated' });
    }

    // Verify user has access to this channel
    const hasAccess = await checkChannelAccess(req.user.id, channel_name);
    if (!hasAccess) {
      return res.status(403).json({ error: 'Access denied' });
    }

    // Generate authorization signature
    const auth = lattestream.authenticate(socket_id, channel_name, {
      user_id: req.user.id.toString(),
      user_info: {
        name: req.user.name,
        role: req.user.role,
      },
    });

    res.json(auth);
  } catch (error) {
    // An async handler that rejects would otherwise leave the request
    // without a response, so failures are turned into an explicit 500.
    console.error('LatteStream channel authorization failed:', error);
    res.status(500).json({ error: 'Authorization failed' });
  }
});
import { validateWebhook } from '@lattestream/server';

app.post('/webhooks/lattestream', (req, res) => {
  const signature = req.headers['x-lattestream-signature'];
  const body = req.body;

  // Validate webhook authenticity.
  // A missing or repeated header is not a string, so it is rejected up front
  // instead of being handed to validateWebhook.
  // NOTE(review): confirm whether validateWebhook expects the parsed body
  // (as passed here) or the raw request payload.
  if (
    typeof signature !== 'string' ||
    !validateWebhook(body, signature, process.env.LATTESTREAM_WEBHOOK_SECRET)
  ) {
    return res.status(401).json({ error: 'Invalid signature' });
  }

  // Process webhook events
  switch (body.event) {
    case 'channel_occupied':
      console.log(`Channel ${body.channel} now has users`);
      break;
    case 'channel_vacated':
      console.log(`Channel ${body.channel} is now empty`);
      break;
    case 'member_added':
      console.log(`User ${body.user_id} joined ${body.channel}`);
      break;
    case 'member_removed':
      console.log(`User ${body.user_id} left ${body.channel}`);
      break;
    default:
      // Event types not handled here are acknowledged without further action
      break;
  }

  res.status(200).send('OK');
});
// Get basic channel info
const channelInfo = await lattestream.getChannelInfo('presence-chat-room');
console.log('Channel info:', channelInfo);
// { occupied: true, user_count: 15, subscription_count: 15 }

// Get list of channels
const channels = await lattestream.getChannels({
  filter_by_prefix: 'presence-',
  info: 'user_count',
});
console.log('Presence channels:', channels);
// Get users in a presence channel
const users = await lattestream.getUsers('presence-chat-room');
console.log('Online users:', users);
// {
//   users: [
//     { id: "123", info: { name: "John", avatar: "..." } },
//     { id: "456", info: { name: "Jane", avatar: "..." } }
//   ]
// }
// Disconnect all connections for a specific user
await lattestream.terminateUserConnections('user-123');
console.log('User disconnected from all channels');
import express from 'express';
import LatteStreamServer from '@lattestream/server';

const app = express();
const lattestream = new LatteStreamServer(
  process.env.LATTESTREAM_APP_KEY,
  process.env.LATTESTREAM_MASTER_KEY
);

// Middleware for LatteStream authentication
app.use('/lattestream', express.json());

// Auth endpoint
app.post('/lattestream/auth', (req, res) => {
  // ... authorization logic
});

// API endpoint that triggers events.
// express.json() is attached to this route too: the parser above is mounted
// only under '/lattestream', so it never runs for '/api/messages'.
app.post('/api/messages', express.json(), async (req, res) => {
  try {
    const { roomId, message } = req.body ?? {};

    if (roomId == null || message == null) {
      return res.status(400).json({ error: 'roomId and message are required' });
    }

    await lattestream.trigger(`chat-${roomId}`, 'new-message', {
      // `req.user` is assumed to be populated by auth middleware not shown here
      user: req.user.name,
      message: message,
      timestamp: Date.now(),
    });

    res.json({ success: true });
  } catch (error) {
    // Caught values are not guaranteed to be Error instances
    const reason = error instanceof Error ? error.message : String(error);
    res.status(500).json({ error: reason });
  }
});
import Fastify from 'fastify';
import LatteStreamServer from '@lattestream/server';

const fastify = Fastify({ logger: true });
const lattestream = new LatteStreamServer(
  process.env.LATTESTREAM_APP_KEY,
  process.env.LATTESTREAM_MASTER_KEY
);

// Register authentication route
fastify.post('/lattestream/auth', async (request, reply) => {
  // ... authorization logic
});

// API route with real-time updates.
// The Body generic tells TypeScript the shape of request.body so it can be
// destructured; it performs no runtime validation.
fastify.post<{ Body: { userId: string; notification: unknown } }>(
  '/api/notifications',
  async (request, reply) => {
    const { userId, notification } = request.body;

    await lattestream.trigger(`user-${userId}`, 'notification', notification);

    return { success: true };
  }
);
import { Injectable } from '@nestjs/common';
import LatteStreamServer from '@lattestream/server';

/**
 * Injectable wrapper around a single `LatteStreamServer` client that is
 * created from the LATTESTREAM_APP_KEY / LATTESTREAM_MASTER_KEY environment
 * variables when the service is instantiated.
 */
@Injectable()
export class RealtimeService {
  private lattestream: LatteStreamServer = new LatteStreamServer(
    process.env.LATTESTREAM_APP_KEY,
    process.env.LATTESTREAM_MASTER_KEY
  );

  /** Triggers a 'notification' event on the `user-<userId>` channel. */
  async notifyUser(userId: string, data: any) {
    const channel = `user-${userId}`;
    return this.lattestream.trigger(channel, 'notification', data);
  }

  /** Triggers `event` with `data` on the `room-<roomId>` channel. */
  async broadcastToRoom(roomId: string, event: string, data: any) {
    const channel = `room-${roomId}`;
    return this.lattestream.trigger(channel, event, data);
  }

  /** Delegates to the client's `authenticate` with the given arguments. */
  async authenticateChannel(
    socketId: string,
    channelName: string,
    userData: any
  ) {
    return this.lattestream.authenticate(socketId, channelName, userData);
  }
}
// Reuse HTTP connections
import https from 'https';

// Agent tuned for connection reuse; it is handed to the client through the
// `agent` option below.
const agent = new https.Agent({
  keepAlive: true,
  maxSockets: 50,
  maxFreeSockets: 10,
  timeout: 60000,
});

const lattestream = new LatteStreamServer(
  process.env.LATTESTREAM_APP_KEY,
  process.env.LATTESTREAM_MASTER_KEY,
  { agent }
);
// Instead of multiple individual triggers
// ❌ Inefficient: one awaited request per user, executed one after another
for (const userId of userIds) {
  await lattestream.trigger(`user-${userId}`, 'update', data);
}

// ✅ Efficient batch operation: build every event first, then send them in
// a single triggerBatch call
const events = userIds.map((userId) => ({
  channel: `user-${userId}`,
  name: 'update',
  data: data,
}));

await lattestream.triggerBatch(events);
/**
 * Trigger an event through `lattestream.trigger`, retrying failed attempts
 * with exponential backoff.
 *
 * @param channel - Channel name, or list of channel names, to publish to.
 * @param event - Event name.
 * @param data - Event payload.
 * @param maxRetries - Total number of attempts (default 3). Values below 1
 *   (or NaN) are treated as 1 and fractions are rounded down, so the event is
 *   always attempted at least once and the final failure is always surfaced.
 * @param baseDelayMs - Backoff unit in milliseconds (default 1000). The wait
 *   after failed attempt `n` is `2^n * baseDelayMs`.
 * @returns Whatever `lattestream.trigger` resolves with.
 * @throws The error raised by the last failed attempt.
 */
async function triggerWithRetry(
  channel: string | string[],
  event: string,
  data: unknown,
  maxRetries = 3,
  baseDelayMs = 1000
) {
  const attempts = maxRetries >= 1 ? Math.floor(maxRetries) : 1;

  for (let attempt = 1; ; attempt++) {
    try {
      return await lattestream.trigger(channel, event, data);
    } catch (error) {
      // Out of attempts: let the caller see the real failure
      if (attempt >= attempts) throw error;

      // Exponential backoff
      const delay = Math.pow(2, attempt) * baseDelayMs;
      await new Promise<void>((resolve) => setTimeout(resolve, delay));
    }
  }
}
import LatteStreamServer from '@lattestream/server';

// Mock the LatteStream server for testing
jest.mock('@lattestream/server');

// NOTE: `NotificationService` is the class under test and must be imported
// from your own code; it is not defined in this snippet.
describe('NotificationService', () => {
  let mockLatteStream;
  let notificationService;

  beforeEach(() => {
    mockLatteStream = {
      trigger: jest.fn().mockResolvedValue({}),
      authenticate: jest.fn().mockReturnValue({ auth: 'mock-signature' }),
    };

    // jest.mocked() exposes the mock API on the auto-mocked constructor so
    // that every `new LatteStreamServer(...)` yields the stub above.
    jest.mocked(LatteStreamServer).mockImplementation(() => mockLatteStream);

    notificationService = new NotificationService();
  });

  test('should send notification to user', async () => {
    await notificationService.notifyUser('user-123', { message: 'Hello' });

    expect(mockLatteStream.trigger).toHaveBeenCalledWith(
      'user-123',
      'notification',
      { message: 'Hello' }
    );
  });
});
// Integration suite: uses a real client built from the TEST_APP_KEY /
// TEST_MASTER_KEY environment variables, created once for the whole suite.
describe('LatteStream Integration', () => {
  let client;

  beforeAll(() => {
    const appKey = process.env.TEST_APP_KEY;
    const masterKey = process.env.TEST_MASTER_KEY;
    client = new LatteStreamServer(appKey, masterKey);
  });

  test('should trigger events successfully', async () => {
    const payload = { message: 'Integration test' };

    const response = await client.trigger('test-channel', 'test-event', payload);

    expect(response).toBeDefined();
  });

  test('should get channel information', async () => {
    const info = await client.getChannelInfo('test-channel');

    // Both fields must be present on the returned channel info
    for (const field of ['occupied', 'subscription_count']) {
      expect(info).toHaveProperty(field);
    }
  });
});
new LatteStreamServer( appKey: string, masterKey: string, options?: LatteStreamServerOptions )
Event Triggering
trigger(channel: string | string[], event: string, data: any, params?: TriggerParams): Promise<TriggerResult>
triggerBatch(events: BatchEvent[]): Promise<BatchResult>
Authentication
authenticate(socketId: string, channel: string, presenceData?: PresenceData): AuthResult
validateWebhook(body: any, signature: string, secret: string): boolean
Channel Management
getChannelInfo(channel: string): Promise<ChannelInfo>
getChannels(params?: GetChannelsParams): Promise<ChannelsResult>
getUsers(channel: string): Promise<UsersResult>
terminateUserConnections(userId: string): Promise<void>
interface TriggerParams {
  socket_id?: string; // Exclude this socket from receiving the event
}
interface BatchEvent {
  channel: string;
  name: string;
  data: any;
  socket_id?: string;
}
interface PresenceData {
  user_id: string;
  user_info?: Record<string, any>;
}
interface ChannelInfo {
  occupied: boolean;
  user_count?: number;
  subscription_count?: number;
}
Next Steps: Learn how to integrate with React or check out complete examples.