LatteStream®

Server SDK - Node.js

Send real-time messages from your Node.js backend to any connected client

The LatteStream Server SDK lets you trigger events, manage channels, and authenticate users from your Node.js backend applications. It is built for high-throughput messaging and runs on both the Node.js and Deno runtimes.

Quick Start

Installation

npm install @lattestream/server
# or
yarn add @lattestream/server
# or
pnpm add @lattestream/server

Basic Usage

import LatteStreamServer from '@lattestream/server';

// Initialize server client
const lattestream = new LatteStreamServer('your-app-key', 'your-master-key', {
  cluster: 'us-east-1',
});

// Trigger an event
await lattestream.trigger('my-channel', 'my-event', {
  message: 'Hello World!',
  timestamp: new Date().toISOString(),
});

Features

  • TypeScript Support - Full TypeScript definitions included
  • High Performance - Optimized for high-throughput messaging
  • Batch Operations - Send multiple events efficiently
  • Secure Authentication - Generate auth signatures for private channels
  • Channel Management - Get channel info and user presence
  • Multi-Runtime - Works with Node.js and Deno
  • Framework Agnostic - Integrate with Express, Fastify, NestJS, etc.

Configuration

Constructor Options

interface LatteStreamServerOptions {
  wsEndpoint?: string; // Custom WebSocket endpoint
  httpEndpoint?: string; // Custom HTTP API endpoint
  cluster?: string; // Geographic cluster
  useTLS?: boolean; // Use secure connections (default: true)
  enableLogging?: boolean; // Enable debug logging
  timeout?: number; // Request timeout in ms (default: 5000)
  keepAlive?: boolean; // HTTP keep-alive (default: true)
  agent?: Agent; // Custom HTTP agent
}

Example Configuration

const lattestream = new LatteStreamServer(
  process.env.LATTESTREAM_APP_KEY,
  process.env.LATTESTREAM_MASTER_KEY,
  {
    cluster: 'eu-west-1',
    enableLogging: process.env.NODE_ENV === 'development',
    timeout: 10000,
    keepAlive: true,
  }
);
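When several of these options come from the environment, it can help to parse them in one place. The sketch below is one possible approach. The `optionsFromEnv` helper and the `LATTESTREAM_CLUSTER` and `LATTESTREAM_TIMEOUT_MS` variable names are assumptions made for illustration and are not part of the SDK; only option names from the interface above are used.

```typescript
// Subset of the documented constructor options used by this sketch.
interface EnvDerivedOptions {
  cluster?: string;
  enableLogging: boolean;
  timeout: number;
}

// Hypothetical helper: builds constructor options from an env-like object.
function optionsFromEnv(
  env: Record<string, string | undefined>
): EnvDerivedOptions {
  const parsedTimeout = Number(env.LATTESTREAM_TIMEOUT_MS);
  const options: EnvDerivedOptions = {
    enableLogging: env.NODE_ENV === 'development',
    // Fall back to the documented default of 5000 ms when unset or invalid.
    timeout:
      Number.isFinite(parsedTimeout) && parsedTimeout > 0 ? parsedTimeout : 5000,
  };
  // Only set the cluster when the (assumed) variable is present.
  if (env.LATTESTREAM_CLUSTER) {
    options.cluster = env.LATTESTREAM_CLUSTER;
  }
  return options;
}
```

The result could then be passed as the third constructor argument, for example `optionsFromEnv(process.env)`.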

Triggering Events

Single Event

// Basic event
await lattestream.trigger('news-updates', 'breaking-news', {
  headline: 'Major Development in Tech Industry',
  summary: 'A groundbreaking announcement was made today...',
  category: 'technology',
});

// Event with socket exclusion (don't send to the triggering client)
await lattestream.trigger(
  'chat-room-1',
  'new-message',
  {
    user: 'john_doe',
    message: 'Hello everyone!',
    timestamp: Date.now(),
  },
  {
    socket_id: 'socket_123', // Exclude this socket from receiving the event
  }
);

Multiple Channels

// Send to multiple channels at once
await lattestream.trigger(['room-1', 'room-2', 'room-3'], 'announcement', {
  message: 'Server maintenance in 10 minutes',
  type: 'warning',
});

Batch Events

// Send multiple events efficiently
await lattestream.triggerBatch([
  {
    channel: 'user-123',
    name: 'notification',
    data: { type: 'friend_request', from: 'user-456' },
  },
  {
    channel: 'user-123',
    name: 'badge_update',
    data: { badges: ['early_adopter', 'contributor'] },
  },
  {
    channel: 'activity-feed',
    name: 'user_action',
    data: { user: 'user-123', action: 'joined', timestamp: Date.now() },
  },
]);

Authentication

Channel Authorization Middleware

Create middleware to authorize private and presence channel subscriptions:

import { createChannelAuthMiddleware } from '@lattestream/server';

// Express.js example
app.post(
  '/lattestream/auth',
  createChannelAuthMiddleware(
    process.env.LATTESTREAM_APP_KEY,
    process.env.LATTESTREAM_MASTER_KEY,
    (req) => {
      // Return user data for the authenticated user
      return {
        user_id: req.user.id.toString(),
        user_info: {
          name: req.user.name,
          email: req.user.email,
          avatar: req.user.avatar,
        },
      };
    }
  )
);

Custom Authorization

// Manual authorization for more control
app.post('/lattestream/auth', async (req, res) => {
  const { socket_id, channel_name } = req.body;

  // Verify user has access to this channel
  const hasAccess = await checkChannelAccess(req.user.id, channel_name);
  if (!hasAccess) {
    return res.status(403).json({ error: 'Access denied' });
  }

  // Generate authorization signature
  const auth = lattestream.authenticate(socket_id, channel_name, {
    user_id: req.user.id.toString(),
    user_info: {
      name: req.user.name,
      role: req.user.role,
    },
  });

  res.json(auth);
});
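The Custom Authorization example calls `checkChannelAccess`, which your application has to supply. Below is a minimal sketch of what such a check might look like. It assumes per-user channels are named `private-user-<id>` and that presence rooms are looked up in an in-memory `allowedRooms` table; both the naming convention and the table are hypothetical, and a real application would consult its own data store.

```typescript
// Hypothetical in-memory membership table; a real application would
// query its own database instead.
const allowedRooms: Record<string, string[]> = {
  'presence-chat-room': ['123', '456'],
};

// Hypothetical access check matching the call in the example above.
async function checkChannelAccess(
  userId: string | number,
  channelName: string
): Promise<boolean> {
  const id = String(userId);

  // Assumed convention: "private-user-<id>" belongs to exactly one user.
  const perUserPrefix = 'private-user-';
  if (channelName.startsWith(perUserPrefix)) {
    return channelName.slice(perUserPrefix.length) === id;
  }

  // Presence rooms: allow only users listed in the membership table.
  if (channelName.startsWith('presence-')) {
    return (allowedRooms[channelName] ?? []).includes(id);
  }

  // Deny anything this sketch does not recognize.
  return false;
}
```

Denying by default keeps an unrecognized channel name from being authorized by accident.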

Webhook Validation

import { validateWebhook } from '@lattestream/server';

app.post('/webhooks/lattestream', (req, res) => {
  const signature = req.headers['x-lattestream-signature'];
  const body = req.body;

  // Validate webhook authenticity
  if (
    !validateWebhook(body, signature, process.env.LATTESTREAM_WEBHOOK_SECRET)
  ) {
    return res.status(401).json({ error: 'Invalid signature' });
  }

  // Process webhook events
  switch (body.event) {
    case 'channel_occupied':
      console.log(`Channel ${body.channel} now has users`);
      break;
    case 'channel_vacated':
      console.log(`Channel ${body.channel} is now empty`);
      break;
    case 'member_added':
      console.log(`User ${body.user_id} joined ${body.channel}`);
      break;
    case 'member_removed':
      console.log(`User ${body.user_id} left ${body.channel}`);
      break;
  }

  res.status(200).send('OK');
});
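This page does not describe the signing scheme behind `validateWebhook`. For orientation only, the sketch below shows how HMAC-based webhook checks are commonly written with Node's built-in `crypto` module. It assumes a hex-encoded HMAC-SHA256 digest computed over the raw request body; that scheme and the `verifyHmacSignature` name are assumptions, not LatteStream's documented behavior. In application code, prefer the SDK's `validateWebhook`.

```typescript
import { createHmac, timingSafeEqual } from 'node:crypto';

// Hypothetical helper: checks a hex-encoded HMAC-SHA256 signature.
// The algorithm and encoding are assumptions for illustration.
function verifyHmacSignature(
  rawBody: string,
  signature: string,
  secret: string
): boolean {
  const expected = createHmac('sha256', secret).update(rawBody).digest();
  const received = Buffer.from(signature, 'hex');

  // timingSafeEqual throws on length mismatch, so check lengths first.
  if (received.length !== expected.length) {
    return false;
  }
  // Constant-time comparison avoids leaking how many bytes matched.
  return timingSafeEqual(received, expected);
}
```

Schemes of this kind sign the exact bytes that were sent, so the raw body usually has to be captured before JSON parsing. Whether that applies to `validateWebhook` depends on the SDK.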

Channel Management

Get Channel Information

// Get basic channel info
const channelInfo = await lattestream.getChannelInfo('presence-chat-room');
console.log('Channel info:', channelInfo);
// { occupied: true, user_count: 15, subscription_count: 15 }

// Get list of channels
const channels = await lattestream.getChannels({
  filter_by_prefix: 'presence-',
  info: 'user_count',
});
console.log('Presence channels:', channels);

Presence Channel Users

// Get users in a presence channel
const users = await lattestream.getUsers('presence-chat-room');
console.log('Online users:', users);
// {
//   users: [
//     { id: "123", info: { name: "John", avatar: "..." } },
//     { id: "456", info: { name: "Jane", avatar: "..." } }
//   ]
// }

Terminate User Connections

// Disconnect all connections for a specific user
await lattestream.terminateUserConnections('user-123');
console.log('User disconnected from all channels');

Framework Integrations

Express.js

import express from 'express';
import LatteStreamServer from '@lattestream/server';

const app = express();
const lattestream = new LatteStreamServer(
  process.env.LATTESTREAM_APP_KEY,
  process.env.LATTESTREAM_MASTER_KEY
);

// Parse JSON bodies for every route; both endpoints below read req.body
app.use(express.json());

// Auth endpoint
app.post('/lattestream/auth', (req, res) => {
  // ... authorization logic
});

// API endpoint that triggers events
// (req.user is assumed to be set by your own authentication middleware)
app.post('/api/messages', async (req, res) => {
  const { roomId, message } = req.body;

  try {
    await lattestream.trigger(`chat-${roomId}`, 'new-message', {
      user: req.user.name,
      message: message,
      timestamp: Date.now(),
    });
    res.json({ success: true });
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

Fastify

import Fastify from 'fastify';
import LatteStreamServer from '@lattestream/server';

const fastify = Fastify({ logger: true });
const lattestream = new LatteStreamServer(
  process.env.LATTESTREAM_APP_KEY,
  process.env.LATTESTREAM_MASTER_KEY
);

// Register authentication route
fastify.post('/lattestream/auth', async (request, reply) => {
  // ... authorization logic
});

// API route with real-time updates
fastify.post('/api/notifications', async (request, reply) => {
  const { userId, notification } = request.body;

  await lattestream.trigger(`user-${userId}`, 'notification', notification);

  return { success: true };
});

NestJS

import { Injectable } from '@nestjs/common';
import LatteStreamServer from '@lattestream/server';

@Injectable()
export class RealtimeService {
  private lattestream: LatteStreamServer;

  constructor() {
    this.lattestream = new LatteStreamServer(
      process.env.LATTESTREAM_APP_KEY,
      process.env.LATTESTREAM_MASTER_KEY
    );
  }

  async notifyUser(userId: string, data: any) {
    return this.lattestream.trigger(`user-${userId}`, 'notification', data);
  }

  async broadcastToRoom(roomId: string, event: string, data: any) {
    return this.lattestream.trigger(`room-${roomId}`, event, data);
  }

  async authenticateChannel(
    socketId: string,
    channelName: string,
    userData: any
  ) {
    return this.lattestream.authenticate(socketId, channelName, userData);
  }
}

Performance Optimization

Connection Pooling

// Reuse HTTP connections
import https from 'https';

const agent = new https.Agent({
  keepAlive: true,
  maxSockets: 50,
  maxFreeSockets: 10,
  timeout: 60000,
});

const lattestream = new LatteStreamServer(
  process.env.LATTESTREAM_APP_KEY,
  process.env.LATTESTREAM_MASTER_KEY,
  { agent }
);

Batch Operations

// Instead of multiple individual triggers
// ❌ Inefficient
for (const userId of userIds) {
  await lattestream.trigger(`user-${userId}`, 'update', data);
}

// ✅ Efficient batch operation
const events = userIds.map((userId) => ({
  channel: `user-${userId}`,
  name: 'update',
  data: data,
}));
await lattestream.triggerBatch(events);
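For very large recipient lists, a single batch may be too big to send in one request. This page does not state a batch size limit, so the sketch below assumes a hypothetical limit of 10 events per call. The `chunk` and `triggerInChunks` helpers and the `MAX_BATCH_SIZE` constant are illustrative names, not SDK exports.

```typescript
// Local copy of the BatchEvent shape from the Type Definitions section.
interface BatchEvent {
  channel: string;
  name: string;
  data: unknown;
  socket_id?: string;
}

// Assumed limit for illustration only; check the actual limit for your account.
const MAX_BATCH_SIZE = 10;

// Splits an array into consecutive slices of at most `size` items.
function chunk<T>(items: T[], size: number): T[][] {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError('size must be a positive integer');
  }
  const chunks: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}

// Sends events in sequential batches through any client exposing triggerBatch.
async function triggerInChunks(
  client: { triggerBatch(events: BatchEvent[]): Promise<unknown> },
  events: BatchEvent[]
): Promise<void> {
  for (const batch of chunk(events, MAX_BATCH_SIZE)) {
    await client.triggerBatch(batch);
  }
}
```

With the `events` array from the example above, this would be called as `await triggerInChunks(lattestream, events)`.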

Error Handling with Retries

async function triggerWithRetry(channel, event, data, maxRetries = 3) {
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      return await lattestream.trigger(channel, event, data);
    } catch (error) {
      if (attempt === maxRetries) throw error;

      // Exponential backoff
      const delay = Math.pow(2, attempt) * 1000;
      await new Promise((resolve) => setTimeout(resolve, delay));
    }
  }
}
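With the default `maxRetries` of 3, the helper above waits 2 seconds after the first failure and 4 seconds after the second. When many servers retry at the same moment, a capped delay with random jitter helps spread the load. The sketch below shows one common variant, often called full jitter; the `backoffDelay` name, the 30-second cap, and the injectable `random` parameter are choices made for this example.

```typescript
// Returns a delay in ms for the given 1-based attempt number:
// a random value in [0, min(capMs, baseMs * 2^attempt)).
function backoffDelay(
  attempt: number,
  baseMs = 1000,
  capMs = 30000,
  random: () => number = Math.random
): number {
  const ceiling = Math.min(capMs, baseMs * 2 ** attempt);
  return Math.floor(random() * ceiling);
}
```

In `triggerWithRetry`, `backoffDelay(attempt)` could stand in for the `Math.pow(2, attempt) * 1000` expression. Passing a fixed `random` function makes the delay deterministic in tests.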

Testing

Unit Testing

import LatteStreamServer from '@lattestream/server';
// NotificationService is your own class under test; import it from your
// application code. It is expected to create a LatteStreamServer internally.

// Mock the LatteStream server for testing
jest.mock('@lattestream/server');

describe('NotificationService', () => {
  let mockLatteStream;
  let notificationService;

  beforeEach(() => {
    mockLatteStream = {
      trigger: jest.fn().mockResolvedValue({}),
      authenticate: jest.fn().mockReturnValue({ auth: 'mock-signature' }),
    };

    LatteStreamServer.mockImplementation(() => mockLatteStream);
    notificationService = new NotificationService();
  });

  test('should send notification to user', async () => {
    await notificationService.notifyUser('user-123', { message: 'Hello' });

    expect(mockLatteStream.trigger).toHaveBeenCalledWith(
      'user-123',
      'notification',
      { message: 'Hello' }
    );
  });
});

Integration Testing

import LatteStreamServer from '@lattestream/server';

describe('LatteStream Integration', () => {
  let lattestream;

  beforeAll(() => {
    lattestream = new LatteStreamServer(
      process.env.TEST_APP_KEY,
      process.env.TEST_MASTER_KEY
    );
  });

  test('should trigger events successfully', async () => {
    const result = await lattestream.trigger('test-channel', 'test-event', {
      message: 'Integration test',
    });

    expect(result).toBeDefined();
  });

  test('should get channel information', async () => {
    const channelInfo = await lattestream.getChannelInfo('test-channel');

    expect(channelInfo).toHaveProperty('occupied');
    expect(channelInfo).toHaveProperty('subscription_count');
  });
});

API Reference

LatteStreamServer Class

Constructor

new LatteStreamServer(
  appKey: string,
  masterKey: string,
  options?: LatteStreamServerOptions
)

Methods

Event Triggering

  • trigger(channel: string | string[], event: string, data: any, params?: TriggerParams): Promise<TriggerResult>
  • triggerBatch(events: BatchEvent[]): Promise<BatchResult>

Authentication

  • authenticate(socketId: string, channel: string, presenceData?: PresenceData): AuthResult
  • validateWebhook(body: any, signature: string, secret: string): boolean

Channel Management

  • getChannelInfo(channel: string): Promise<ChannelInfo>
  • getChannels(params?: GetChannelsParams): Promise<ChannelsResult>
  • getUsers(channel: string): Promise<UsersResult>
  • terminateUserConnections(userId: string): Promise<void>

Type Definitions

interface TriggerParams {
  socket_id?: string; // Exclude this socket from receiving the event
}

interface BatchEvent {
  channel: string;
  name: string;
  data: any;
  socket_id?: string;
}

interface PresenceData {
  user_id: string;
  user_info?: Record<string, any>;
}

interface ChannelInfo {
  occupied: boolean;
  user_count?: number;
  subscription_count?: number;
}

Next Steps: Learn how to integrate with React or check out complete examples.