Building Custom LatteStream SDKs

Best practices and implementation patterns for creating LatteStream SDKs in any language

This guide provides comprehensive patterns and best practices for building custom LatteStream SDKs. It's based on the architecture and implementation of the official TypeScript/JavaScript SDK, which has been battle-tested in production.

Table of Contents

  1. Architecture Overview
  2. Connection Management
  3. Authentication Flow
  4. Reconnection Logic
  5. Channel Management
  6. Event System
  7. Message Handling
  8. Performance Optimization
  9. Error Handling
  10. Testing Strategies

Architecture Overview

Recommended Layer Structure

┌─────────────────────────────────────┐
│         Client API Layer            │  Public interface for developers
├─────────────────────────────────────┤
│       Channel Management            │  Subscribe, unsubscribe, event binding
├─────────────────────────────────────┤
│       Connection Layer              │  WebSocket lifecycle, reconnection
├─────────────────────────────────────┤
│      Discovery Service (optional)   │  Node discovery for public keys
├─────────────────────────────────────┤
│        Event Emitter                │  Event distribution system
├─────────────────────────────────────┤
│      WebSocket Transport            │  Low-level WebSocket communication
└─────────────────────────────────────┘

Core Components

1. Client Class

  • Main entry point for developers
  • Manages global state and configuration
  • Factory for creating channels
  • Global event emitter

2. Connection Class

  • WebSocket lifecycle management
  • Authentication handling
  • Reconnection logic
  • Heartbeat/ping-pong
  • Message routing

3. Channel Classes

  • Base Channel class (abstract)
  • PublicChannel for public channels
  • PrivateChannel for private channels
  • PresenceChannel extends PrivateChannel

4. Authorizer

  • Handles auth endpoint requests
  • Manages channel authorization tokens

5. Discovery Service (if supporting public keys)

  • Node discovery for lspk_* keys
  • Token caching

Connection Management

WebSocket Lifecycle

    import json

    class Connection:
        # create_websocket, emit, resubscribe_all_channels, handle_pong,
        # route_message and attempt_reconnect are provided by other layers.

        def __init__(self, endpoint, api_key):
            self.endpoint = endpoint
            self.api_key = api_key
            self.ws = None
            self.state = 'disconnected'
            self.socket_id = None

        def connect(self):
            """Establish WebSocket connection"""
            self.state = 'connecting'
            self.ws = create_websocket(self.endpoint)
            self.ws.on_open = self.handle_open
            self.ws.on_message = self.handle_message
            self.ws.on_error = self.handle_error
            self.ws.on_close = self.handle_close

        def send(self, payload):
            """Serialize a message and write it to the socket"""
            self.ws.send(json.dumps(payload))

        def handle_open(self):
            """Send authentication immediately"""
            self.send({'api_key': self.api_key})

        def handle_message(self, data):
            """Process incoming messages"""
            message = json.loads(data)

            if message['event'] == 'lattestream:connection_established':
                self.socket_id = message['data']['socket_id']
                self.state = 'connected'
                self.emit('connected')
                self.resubscribe_all_channels()
            elif message['event'] == 'lattestream:pong':
                self.handle_pong()
            else:
                self.route_message(message)

        def handle_error(self, error):
            self.emit('error', error)

        def handle_close(self):
            self.state = 'disconnected'
            self.emit('disconnected')
            self.attempt_reconnect()

Connection States

Implement these states for a clear connection lifecycle:

    type ConnectionState =
      | 'connecting'    // Establishing WebSocket connection
      | 'connected'     // Successfully connected and authenticated
      | 'disconnected'  // Connection lost
      | 'unavailable'   // Temporarily unavailable (retrying)
      | 'failed';       // Permanently failed (max retries exceeded)

State Transitions:

disconnected → connecting → connected
                   ↓
              unavailable ⇄ connecting
                   ↓
               failed (max retries)
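One way to keep these transitions honest is to encode the diagram as a lookup table and reject anything else. The sketch below is an illustration rather than the official SDK's implementation: `ALLOWED_TRANSITIONS` and `ConnectionStateMachine` are hypothetical names, and the `connected → disconnected` edge is an assumption added because a live connection can drop, even though the diagram does not draw it.

```python
ALLOWED_TRANSITIONS = {
    'disconnected': {'connecting'},
    'connecting': {'connected', 'unavailable'},
    'connected': {'disconnected'},          # assumption: not drawn in the diagram
    'unavailable': {'connecting', 'failed'},
    'failed': set(),                        # terminal until the user reconnects manually
}


class ConnectionStateMachine:
    def __init__(self, on_change=None):
        self.state = 'disconnected'
        self.on_change = on_change  # optional callback(previous, current)

    def transition(self, new_state):
        """Move to new_state; return False if already there, raise if illegal."""
        if new_state == self.state:
            return False
        if new_state not in ALLOWED_TRANSITIONS[self.state]:
            raise ValueError(f"Illegal transition: {self.state} -> {new_state}")
        previous, self.state = self.state, new_state
        if self.on_change:
            self.on_change(previous, new_state)
        return True
```

Emitting a single state-change event from `on_change` gives SDK users one place to observe the lifecycle, and an illegal transition surfaces as an exception during development instead of as a silent inconsistency.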

Discovery Service Integration

For public keys (lspk_*), implement node discovery:

    class DiscoveryService {
      async discover(apiKey, cluster) {
        const endpoint = `https://${cluster}.lattestream.com/discover`;

        // fetch() has no `timeout` option; use an abort signal instead
        const response = await fetch(
          `${endpoint}?api_key=${encodeURIComponent(apiKey)}`,
          { method: 'GET', signal: AbortSignal.timeout(10000) }
        );

        if (!response.ok) {
          throw new Error(`Discovery failed: ${response.status}`);
        }

        const data = await response.json();

        return {
          nodeId: data.node_id,
          host: data.host,
          port: data.port,
          discoveryToken: data.discovery_token,
          expiresAt: new Date(data.expires_at)
        };
      }

      getWebSocketUrl(discovery) {
        return `wss://${discovery.host}:${discovery.port}?discovery_token=${discovery.discoveryToken}`;
      }
    }

Usage:

    // 1. Run discovery
    const discovery = await discoveryService.discover(apiKey, cluster);

    // 2. Connect to assigned node
    const wsUrl = discoveryService.getWebSocketUrl(discovery);
    const connection = new Connection(wsUrl, discovery.discoveryToken);
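The Discovery Service component also lists token caching. Because the discovery response carries an expiry time, a cache can reuse a result until shortly before it expires. This is a minimal sketch under stated assumptions: `DiscoveryResult` and `DiscoveryCache` are hypothetical names, the 30-second safety margin is arbitrary, and `fetch` stands in for whatever function performs the real discovery request.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Callable, Dict, Tuple


@dataclass(frozen=True)
class DiscoveryResult:
    node_id: str
    host: str
    port: int
    discovery_token: str
    expires_at: datetime  # must be timezone-aware


class DiscoveryCache:
    """Reuses discovery results until shortly before they expire."""

    def __init__(self,
                 fetch: Callable[[str, str], DiscoveryResult],
                 safety_margin: timedelta = timedelta(seconds=30),
                 now: Callable[[], datetime] = lambda: datetime.now(timezone.utc)):
        self._fetch = fetch
        self._safety_margin = safety_margin
        self._now = now
        self._entries: Dict[Tuple[str, str], DiscoveryResult] = {}

    def get(self, api_key: str, cluster: str) -> DiscoveryResult:
        key = (api_key, cluster)
        cached = self._entries.get(key)
        # Treat the token as expired a little early to avoid racing the server.
        if cached is not None and cached.expires_at - self._safety_margin > self._now():
            return cached
        fresh = self._fetch(api_key, cluster)
        self._entries[key] = fresh
        return fresh
```

During a reconnect, the connection layer would call `cache.get(...)` instead of hitting the discovery endpoint every time, which keeps reconnect storms from also becoming discovery storms.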

Authentication Flow

Public Key Authentication

    import json

    def connect_with_public_key(public_key, cluster):
        # Step 1: Discovery
        discovery_data = discover_node(public_key, cluster)

        # Step 2: Connect to assigned node (host and port both come from discovery)
        ws_url = (
            f"wss://{discovery_data['host']}:{discovery_data['port']}"
            f"?discovery_token={discovery_data['discovery_token']}"
        )
        ws = WebSocket(ws_url)

        # Step 3: Authenticate with discovery token
        ws.send(json.dumps({
            'api_key': discovery_data['discovery_token']
        }))

        return ws

Private Token Authentication

    def connect_with_private_token(private_token, cluster):
        # Direct connection (no discovery needed)
        ws_url = f"wss://{cluster}.lattestream.com"
        ws = WebSocket(ws_url)

        # Authenticate with private token
        ws.send(json.dumps({
            'api_key': private_token
        }))

        return ws

Channel Authorization

For private/presence channels:

    class Authorizer {
      constructor(authEndpoint, authHeaders = {}) {
        this.authEndpoint = authEndpoint;
        this.authHeaders = authHeaders;
      }

      async authorize(socketId, channelName) {
        const response = await fetch(this.authEndpoint, {
          method: 'POST',
          headers: {
            'Content-Type': 'application/json',
            ...this.authHeaders
          },
          body: JSON.stringify({
            socket_id: socketId,
            channel_name: channelName
          })
        });

        if (!response.ok) {
          throw new Error(`Authorization failed: ${response.status}`);
        }

        const data = await response.json();

        if (!data.auth) {
          throw new Error('Invalid auth response: missing auth field');
        }

        return {
          auth: data.auth,
          channelData: data.channel_data // For presence channels
        };
      }
    }

Reconnection Logic

Exponential Backoff with Jitter

Jitter is critical for preventing the thundering herd problem, where many clients that lost their connection at the same moment all retry at the same moment:

    import random
    import time

    class ReconnectionStrategy:
        def __init__(self, base_delay=1000, max_attempts=6,
                     max_gap=30000, backoff_multiplier=2):
            self.base_delay = base_delay    # milliseconds
            self.max_attempts = max_attempts
            self.max_gap = max_gap          # milliseconds
            self.backoff_multiplier = backoff_multiplier
            self.attempt = 0

        def get_next_delay(self):
            """Calculate delay with exponential backoff and jitter"""
            if self.attempt >= self.max_attempts:
                return None  # Give up

            # Exponential backoff: base * (multiplier ^ attempt)
            exponential_delay = self.base_delay * (self.backoff_multiplier ** self.attempt)

            # Cap at max_gap
            delay = min(exponential_delay, self.max_gap)

            # Add jitter (±25%)
            jitter_range = delay * 0.25
            jitter = random.uniform(-jitter_range, jitter_range)
            delay_with_jitter = delay + jitter

            self.attempt += 1

            return delay_with_jitter / 1000  # Convert to seconds

        def reset(self):
            """Reset attempt counter on successful connection"""
            self.attempt = 0

Usage:

    import time

    strategy = ReconnectionStrategy()

    def attempt_reconnect():
        # Loop instead of recursing so repeated failures do not grow the call stack
        while True:
            delay = strategy.get_next_delay()

            if delay is None:
                connection.state = 'failed'
                emit('failed')
                return

            print(f"Reconnecting in {delay:.2f} seconds...")
            time.sleep(delay)

            try:
                connection.connect()
            except Exception as e:
                print(f"Reconnection failed: {e}")
                continue  # Try again with the next delay

            strategy.reset()  # Reset on success
            return
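To see what the default parameters mean in practice, it helps to tabulate the nominal delay and the jitter bounds for each attempt. The helper below is a hypothetical illustration that mirrors the arithmetic of `ReconnectionStrategy` without the randomness; `backoff_schedule` and `jitter_fraction` are names introduced here, not part of any SDK.

```python
def backoff_schedule(base_delay=1000, max_attempts=6, max_gap=30000,
                     backoff_multiplier=2, jitter_fraction=0.25):
    """Return (attempt, nominal_ms, min_ms, max_ms) for every attempt."""
    rows = []
    for attempt in range(max_attempts):
        # Same formula as get_next_delay: exponential growth, capped at max_gap.
        nominal = min(base_delay * backoff_multiplier ** attempt, max_gap)
        spread = nominal * jitter_fraction
        rows.append((attempt, nominal, nominal - spread, nominal + spread))
    return rows


if __name__ == '__main__':
    for attempt, nominal, low, high in backoff_schedule():
        print(f"attempt {attempt}: {nominal} ms (jittered range {low:.0f}-{high:.0f} ms)")
```

With the defaults, the nominal delays are 1000, 2000, 4000, 8000, 16000 and 30000 ms (the sixth would be 32000 ms without the cap). Because jitter is applied after the cap, the last delay can reach 37500 ms; if `max_gap` has to be a hard upper bound, apply the cap after adding jitter instead.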

Auto-Resubscribe Channels

    class Connection {
      constructor() {
        this.subscribedChannels = new Set();
      }

      handleConnectionEstablished() {
        this.state = 'connected';
        this.emit('connected');

        // Auto-resubscribe all channels
        this.resubscribeAllChannels();
      }

      resubscribeAllChannels() {
        for (const channel of this.subscribedChannels) {
          if (!channel.isSubscribed()) {
            channel.subscribe(this.socketId);
          }
        }
      }

      handleDisconnected() {
        this.state = 'disconnected';

        // Mark all channels as unsubscribed
        for (const channel of this.subscribedChannels) {
          channel.subscribed = false;
        }

        this.emit('disconnected');
        this.attemptReconnect();
      }
    }

Channel Management

Channel Class Hierarchy

    from abc import ABC, abstractmethod

    class Channel(ABC):
        def __init__(self, name, connection):
            self.name = name
            self.connection = connection
            self.subscribed = False
            self.listeners = {}

        def bind(self, event, callback):
            """Bind event listener"""
            if event not in self.listeners:
                self.listeners[event] = []
            self.listeners[event].append(callback)

        def unbind(self, event, callback=None):
            """Unbind event listener(s); unknown events or callbacks are ignored"""
            if callback is None:
                # Remove all listeners for event
                self.listeners.pop(event, None)
            elif callback in self.listeners.get(event, []):
                # Remove specific listener
                self.listeners[event].remove(callback)

        def emit(self, event, data):
            """Emit event to listeners"""
            # Iterate over a copy so listeners can unbind themselves safely
            for callback in list(self.listeners.get(event, [])):
                try:
                    callback(data)
                except Exception as e:
                    print(f"Error in event listener: {e}")

        def handle_subscription_succeeded(self, data):
            """Mark the channel as subscribed (called by the message router)"""
            self.subscribed = True
            self.emit('lattestream:subscription_succeeded', data)

        @abstractmethod
        def subscribe(self, socket_id):
            """Subscribe to channel"""
            pass

        def unsubscribe(self):
            """Unsubscribe from channel"""
            self.connection.send({
                'event': 'lattestream:unsubscribe',
                'data': {'channel': self.name}
            })
            self.subscribed = False

    class PublicChannel(Channel):
        def subscribe(self, socket_id):
            self.connection.send({
                'event': 'lattestream:subscribe',
                'data': {'channel': self.name}
            })

    class PrivateChannel(Channel):
        def __init__(self, name, connection, authorizer):
            super().__init__(name, connection)
            self.authorizer = authorizer

        def subscribe(self, socket_id):
            # Get authorization from server
            auth_data = self.authorizer.authorize(socket_id, self.name)

            # Send subscription with auth
            self.connection.send({
                'event': 'lattestream:subscribe',
                'data': {
                    'channel': self.name,
                    'auth': auth_data['auth']
                }
            })

        def trigger(self, event_name, data):
            """Send client event"""
            if not event_name.startswith('client-'):
                raise ValueError("Client events must be prefixed with 'client-'")

            if not self.subscribed:
                return False

            return self.connection.send({
                'event': event_name,
                'channel': self.name,
                'data': data
            })

    class PresenceChannel(PrivateChannel):
        def __init__(self, name, connection, authorizer):
            super().__init__(name, connection, authorizer)
            self.members = {}
            self.my_id = None

        def subscribe(self, socket_id):
            # Get authorization with user data
            auth_data = self.authorizer.authorize(socket_id, self.name)

            # Send subscription with auth and channel_data
            self.connection.send({
                'event': 'lattestream:subscribe',
                'data': {
                    'channel': self.name,
                    'auth': auth_data['auth'],
                    'channel_data': auth_data.get('channel_data')
                }
            })

        def handle_subscription_succeeded(self, data):
            """Initialize member list"""
            if 'presence' in data:
                self.members = {}
                for user_id, user_info in data['presence']['hash'].items():
                    self.members[user_id] = user_info

                if 'me' in data['presence']:
                    self.my_id = data['presence']['me']['id']

            super().handle_subscription_succeeded(data)

        def handle_member_added(self, member):
            self.members[member['user_id']] = member['user_info']
            self.emit('lattestream:member_added', member)

        def handle_member_removed(self, member):
            if member['user_id'] in self.members:
                del self.members[member['user_id']]
            self.emit('lattestream:member_removed', member)
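The client still has to decide which of these classes to instantiate when a developer calls `subscribe(name)`. This guide does not specify how, so the sketch below makes a loud assumption: that channel types are distinguished by `private-` and `presence-` name prefixes, a convention common in similar pub/sub services. Confirm the real rule against the WebSocket API Reference before relying on it. `channel_kind` and `create_channel` are hypothetical helpers, and `classes` is simply a mapping to whatever channel classes the SDK defines.

```python
def channel_kind(name: str) -> str:
    """Classify a channel by name prefix (ASSUMED convention, not confirmed)."""
    if name.startswith('presence-'):
        return 'presence'
    if name.startswith('private-'):
        return 'private'
    return 'public'


def create_channel(name, connection, authorizer, classes):
    """Instantiate the right channel class.

    `classes` maps 'public' / 'private' / 'presence' to constructors:
    public ones take (name, connection); the others also take an authorizer.
    """
    kind = channel_kind(name)
    if kind == 'public':
        return classes['public'](name, connection)
    if authorizer is None:
        # Fail early with a clear message instead of at subscribe time.
        raise ValueError(f"Channel '{name}' requires an auth endpoint")
    return classes[kind](name, connection, authorizer)
```

Putting the decision in one function keeps the Client class a thin factory and gives a single place to change if the naming rule turns out to differ.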

Event System

Fast Event Emitter

    class EventEmitter {
      constructor() {
        this.events = new Map();
        this.onceEvents = new Map();
      }

      on(event, listener) {
        if (!this.events.has(event)) {
          this.events.set(event, new Set());
        }
        this.events.get(event).add(listener);
        return this;
      }

      once(event, listener) {
        if (!this.onceEvents.has(event)) {
          this.onceEvents.set(event, new Set());
        }
        this.onceEvents.get(event).add(listener);
        return this;
      }

      off(event, listener) {
        if (listener) {
          this.events.get(event)?.delete(listener);
          this.onceEvents.get(event)?.delete(listener);
        } else {
          // Remove all listeners for event
          this.events.delete(event);
          this.onceEvents.delete(event);
        }
        return this;
      }

      emit(event, ...args) {
        // Regular listeners
        const listeners = this.events.get(event);
        if (listeners) {
          for (const listener of listeners) {
            try {
              listener(...args);
            } catch (error) {
              console.error('Error in event listener:', error);
            }
          }
        }

        // Once listeners: clear first, so a listener that calls once() again
        // during dispatch is kept for the next emit instead of being dropped
        const onceListeners = this.onceEvents.get(event);
        if (onceListeners) {
          this.onceEvents.delete(event);
          for (const listener of onceListeners) {
            try {
              listener(...args);
            } catch (error) {
              console.error('Error in once listener:', error);
            }
          }
        }

        return this;
      }
    }

Key Features:

  • Use Set for O(1) add/remove
  • Use Map for fast event lookup
  • Error isolation (listener errors don't crash SDK)
  • Support for once listeners
  • Chainable methods

Message Handling

Message Routing

    def route_message(self, message):
        """Route incoming message to appropriate handler"""
        event = message.get('event')
        channel = message.get('channel')
        data = message.get('data')

        # Protocol messages (no channel)
        if event == 'lattestream:connection_established':
            self.handle_connection_established(data)
        elif event == 'lattestream:pong':
            self.handle_pong()
        elif event == 'lattestream:error':
            self.handle_error(data)

        # Channel messages
        elif channel:
            channel_obj = self.channels.get(channel)
            if channel_obj:
                # Internal channel events
                if event == 'lattestream_internal:subscription_succeeded':
                    channel_obj.handle_subscription_succeeded(data)
                elif event == 'lattestream_internal:member_added':
                    channel_obj.handle_member_added(data)
                elif event == 'lattestream_internal:member_removed':
                    channel_obj.handle_member_removed(data)
                # Custom events
                else:
                    channel_obj.emit(event, data)

        # Global events
        else:
            self.emit(event, data)

Binary Protocol Support

If you implement the binary protocol, parse each frame as a 4-byte type, a 4-byte length, and the payload:

    function parseBinaryMessage(buffer) {
      // [4 bytes: type][4 bytes: length][payload]
      const view = new DataView(buffer);
      const type = view.getUint32(0, false); // Big-endian
      const length = view.getUint32(4, false);
      const payload = buffer.slice(8, 8 + length);

      switch (type) {
        case 0x01: { // JSON (UTF-8)
          // Braces scope the `const` to this case
          const json = new TextDecoder().decode(payload);
          return JSON.parse(json);
        }

        case 0x02: // Binary with JSON metadata
          // Custom implementation
          throw new Error('Binary message type 0x02 is not implemented');

        case 0x03: // Compressed JSON
          // Decompress and parse
          throw new Error('Binary message type 0x03 is not implemented');

        default:
          throw new Error(`Unknown binary message type: ${type}`);
      }
    }
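For SDKs in other languages, the same frame layout (4-byte big-endian type, 4-byte big-endian length, payload) can be handled with a fixed-size header struct. The sketch below covers only the JSON type `0x01` described above; the function names are hypothetical, and the truncation checks are an addition that the JavaScript version does not perform.

```python
import json
import struct

HEADER = struct.Struct('>II')  # big-endian: uint32 type, uint32 payload length
TYPE_JSON = 0x01


def encode_json_frame(message: dict) -> bytes:
    """Build a type-0x01 frame: header followed by UTF-8 JSON."""
    payload = json.dumps(message, separators=(',', ':')).encode('utf-8')
    return HEADER.pack(TYPE_JSON, len(payload)) + payload


def decode_frame(frame: bytes):
    """Parse one frame and return the decoded JSON message."""
    if len(frame) < HEADER.size:
        raise ValueError('Frame is shorter than the 8-byte header')
    msg_type, length = HEADER.unpack_from(frame, 0)
    payload = frame[HEADER.size:HEADER.size + length]
    if len(payload) != length:
        raise ValueError('Frame payload is truncated')
    if msg_type == TYPE_JSON:
        return json.loads(payload.decode('utf-8'))
    raise ValueError(f'Unsupported binary message type: {msg_type:#04x}')
```

A round trip such as `decode_frame(encode_json_frame({'event': 'lattestream:pong'}))` returns the original dictionary, which makes a convenient first unit test for a binary transport.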

Performance Optimization

Object Pooling

Reduce garbage collection pressure:

    class ObjectPool {
      constructor(factory, reset, initialSize = 10) {
        this.factory = factory;
        this.reset = reset;
        this.pool = [];

        // Pre-allocate objects
        for (let i = 0; i < initialSize; i++) {
          this.pool.push(factory());
        }
      }

      acquire() {
        return this.pool.length > 0 ? this.pool.pop() : this.factory();
      }

      release(obj) {
        this.reset(obj);
        this.pool.push(obj);
      }
    }

    // Usage: Pool presence member objects
    const memberPool = new ObjectPool(
      () => ({ id: '', info: null }),
      (member) => {
        member.id = '';
        member.info = null;
      }
    );

Message Batching

Batch multiple operations:

    class MessageQueue {
      constructor(sender, batchSize = 10, flushInterval = 16) {
        this.sender = sender;
        this.batchSize = batchSize;
        this.flushInterval = flushInterval; // ~60fps
        this.queue = [];
        this.flushTimer = null;
      }

      enqueue(message) {
        this.queue.push(message);

        if (this.queue.length >= this.batchSize) {
          this.flush(); // Send immediately
        } else if (!this.flushTimer) {
          // Schedule flush
          this.flushTimer = setTimeout(() => this.flush(), this.flushInterval);
        }
      }

      flush() {
        if (this.queue.length === 0) return;

        clearTimeout(this.flushTimer);
        this.flushTimer = null;

        if (this.queue.length === 1) {
          this.sender(this.queue[0]);
        } else {
          this.sender({
            event: 'lattestream:batch',
            data: { messages: this.queue }
          });
        }

        this.queue = [];
      }
    }

Debounce and Throttle

    function debounce(func, wait) {
      let timeout;
      return function(...args) {
        clearTimeout(timeout);
        timeout = setTimeout(() => func.apply(this, args), wait);
      };
    }

    function throttle(func, limit) {
      let inThrottle;
      return function(...args) {
        if (!inThrottle) {
          func.apply(this, args);
          inThrottle = true;
          setTimeout(() => inThrottle = false, limit);
        }
      };
    }

    // Usage
    const debouncedReconnect = debounce(() => connection.connect(), 100);
    const throttledStateChange = throttle((state) => emit('state', state), 100);

Error Handling

Error Types

    class LatteStreamError extends Error {
      constructor(message, code, data) {
        super(message);
        this.name = 'LatteStreamError';
        this.code = code;
        this.data = data;
      }
    }

    class ConnectionError extends LatteStreamError {
      constructor(message, data) {
        super(message, 'CONNECTION_ERROR', data);
        this.name = 'ConnectionError';
      }
    }

    class AuthorizationError extends LatteStreamError {
      constructor(message, data) {
        super(message, 'AUTHORIZATION_ERROR', data);
        this.name = 'AuthorizationError';
      }
    }

    class SubscriptionError extends LatteStreamError {
      constructor(message, code, data) {
        super(message, code, data);
        this.name = 'SubscriptionError';
      }
    }

Graceful Error Handling

    def handle_error_message(self, error_data):
        """Handle error from server"""
        code = error_data.get('code')
        message = error_data.get('message')

        if code == 4009:
            # Authorization error
            error = AuthorizationError(message, error_data)
        elif code == 4004:
            # Too many channels
            error = SubscriptionError(message, 'TOO_MANY_CHANNELS', error_data)
        else:
            error = LatteStreamError(message, code, error_data)

        self.emit('error', error)
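The Python handler above relies on error classes that this guide only defines in JavaScript. A Python equivalent might look like the sketch below. The connection error is named `LatteStreamConnectionError` here, an assumption made to avoid shadowing Python's built-in `ConnectionError`; `error_from_server` is a hypothetical helper that isolates the code-to-class mapping, and only the two codes mentioned above (4009 and 4004) are mapped.

```python
class LatteStreamError(Exception):
    def __init__(self, message, code=None, data=None):
        super().__init__(message)
        self.message = message
        self.code = code
        self.data = data


class LatteStreamConnectionError(LatteStreamError):
    # Renamed so it does not shadow the built-in ConnectionError.
    def __init__(self, message, data=None):
        super().__init__(message, 'CONNECTION_ERROR', data)


class AuthorizationError(LatteStreamError):
    def __init__(self, message, data=None):
        super().__init__(message, 'AUTHORIZATION_ERROR', data)


class SubscriptionError(LatteStreamError):
    pass


def error_from_server(error_data: dict) -> LatteStreamError:
    """Translate a server error payload into a typed exception object."""
    code = error_data.get('code')
    message = error_data.get('message', 'Unknown error')
    if code == 4009:
        return AuthorizationError(message, error_data)
    if code == 4004:
        return SubscriptionError(message, 'TOO_MANY_CHANNELS', error_data)
    return LatteStreamError(message, code, error_data)
```

Because every class derives from `LatteStreamError`, application code can catch the base class for generic handling and the subclasses where it needs to react differently, for example by refreshing credentials on `AuthorizationError`.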

Testing Strategies

Unit Tests

    // Small helper used by the tests below
    const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

    describe('Connection', () => {
      it('should authenticate on connection', async () => {
        const connection = new Connection('ws://localhost:8080', 'test_key');
        const sendSpy = jest.spyOn(connection, 'send');

        await connection.connect();

        expect(sendSpy).toHaveBeenCalledWith({
          api_key: 'test_key'
        });
      });

      it('should reconnect with exponential backoff', async () => {
        const connection = new Connection('ws://localhost:8080', 'test_key');
        const delays = [];

        connection.on('reconnecting', (delay) => delays.push(delay));

        // Simulate 3 failed connections
        for (let i = 0; i < 3; i++) {
          connection.handleClose();
          await sleep(delays[i]);
        }

        // Jitter is ±25%, so assert a range around each nominal delay
        expect(delays[0]).toBeGreaterThanOrEqual(750);   // ~1s
        expect(delays[0]).toBeLessThanOrEqual(1250);
        expect(delays[1]).toBeGreaterThanOrEqual(1500);  // ~2s
        expect(delays[1]).toBeLessThanOrEqual(2500);
        expect(delays[2]).toBeGreaterThanOrEqual(3000);  // ~4s
        expect(delays[2]).toBeLessThanOrEqual(5000);
      });
    });

Integration Tests

    describe('LatteStream Integration', () => {
      let server, client;

      beforeAll(async () => {
        server = await startTestServer();
      });

      afterAll(async () => {
        await server.stop();
      });

      it('should subscribe and receive messages', async () => {
        client = new LatteStream('test_key', {
          wsEndpoint: 'ws://localhost:8080'
        });

        const channel = client.subscribe('test-channel');
        const messages = [];

        channel.bind('test-event', (data) => messages.push(data));

        await waitForSubscription(channel);

        // Trigger from server
        await server.trigger('test-channel', 'test-event', { foo: 'bar' });

        await waitFor(() => messages.length > 0);

        expect(messages[0]).toEqual({ foo: 'bar' });
      });
    });
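Unit tests for the connection layer are easier to write when the WebSocket itself can be replaced by an in-memory fake, so that no server or network is involved. The sketch below is one possible shape for such a fake in Python. `FakeWebSocket` is a hypothetical test helper, and its callback attributes (`on_open`, `on_message`, `on_close`) mirror the ones the `Connection` example assigns, which is an assumption about how your transport is wired.

```python
import json


class FakeWebSocket:
    """In-memory stand-in for a WebSocket; for unit tests only."""

    def __init__(self):
        self.sent = []          # decoded messages the SDK tried to send
        self.closed = False
        self.on_open = None
        self.on_message = None
        self.on_close = None

    def send(self, text: str) -> None:
        if self.closed:
            raise RuntimeError('Socket is closed')
        self.sent.append(json.loads(text))

    # --- helpers that let a test play the role of the server ---

    def open(self) -> None:
        if self.on_open:
            self.on_open()

    def receive(self, event: str, data=None, channel=None) -> None:
        message = {'event': event, 'data': data}
        if channel is not None:
            message['channel'] = channel
        if self.on_message:
            self.on_message(json.dumps(message))

    def close(self) -> None:
        self.closed = True
        if self.on_close:
            self.on_close()
```

A test can hand the fake to the SDK through whatever factory creates sockets, call `open()`, and assert that the first entry in `sent` is the authentication message. It can then call `receive('lattestream:connection_established', {'socket_id': '...'})` to drive the SDK into the connected state deterministically.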

Complete Example: Python SDK

Here's a minimal but complete Python SDK implementation:

    # lattestream.py
    import json
    import threading
    import time
    from typing import Callable, Dict

    import websocket  # from the websocket-client package

    class LatteStream:
        def __init__(self, api_key: str, cluster: str = 'us-east-1'):
            self.api_key = api_key
            self.endpoint = f'wss://{cluster}.lattestream.com'
            self.ws = None
            self.socket_id = None
            self.channels: Dict[str, 'Channel'] = {}
            self.listeners: Dict[str, list] = {}
            self.connected = False

        def connect(self):
            self.ws = websocket.WebSocketApp(
                self.endpoint,
                on_open=self._on_open,
                on_message=self._on_message,
                on_error=self._on_error,
                on_close=self._on_close
            )

            thread = threading.Thread(target=self.ws.run_forever)
            thread.daemon = True
            thread.start()

        def _on_open(self, ws):
            # Authenticate
            ws.send(json.dumps({'api_key': self.api_key}))

        def _on_message(self, ws, message):
            data = json.loads(message)
            event = data.get('event')

            if event == 'lattestream:connection_established':
                self.socket_id = data['data']['socket_id']
                self.connected = True
                self._emit('connected')
                # Subscribe channels that were requested before the
                # connection was ready (also covers reconnects)
                for channel in self.channels.values():
                    channel.subscribe()
            elif data.get('channel'):
                channel = self.channels.get(data['channel'])
                if channel:
                    channel._handle_message(data)
            else:
                self._emit(event, data.get('data'))

        def _on_error(self, ws, error):
            self._emit('error', error)

        def _on_close(self, ws, close_status_code, close_msg):
            self.connected = False
            for channel in self.channels.values():
                channel.subscribed = False
            self._emit('disconnected')

        def subscribe(self, channel_name: str) -> 'Channel':
            if channel_name in self.channels:
                return self.channels[channel_name]

            channel = Channel(channel_name, self)
            self.channels[channel_name] = channel
            # If not connected yet, the subscription is sent once
            # lattestream:connection_established arrives
            if self.connected:
                channel.subscribe()
            return channel

        def bind(self, event: str, callback: Callable):
            if event not in self.listeners:
                self.listeners[event] = []
            self.listeners[event].append(callback)

        def _emit(self, event: str, data=None):
            if event in self.listeners:
                for callback in self.listeners[event]:
                    callback(data)

        def _send(self, data: dict):
            if self.ws and self.connected:
                self.ws.send(json.dumps(data))

    class Channel:
        def __init__(self, name: str, client: LatteStream):
            self.name = name
            self.client = client
            self.listeners: Dict[str, list] = {}
            self.subscribed = False

        def subscribe(self):
            self.client._send({
                'event': 'lattestream:subscribe',
                'data': {'channel': self.name}
            })

        def bind(self, event: str, callback: Callable):
            if event not in self.listeners:
                self.listeners[event] = []
            self.listeners[event].append(callback)

        def _handle_message(self, message: dict):
            event = message.get('event')
            data = message.get('data')

            if event == 'lattestream_internal:subscription_succeeded':
                self.subscribed = True

            if event in self.listeners:
                for callback in self.listeners[event]:
                    callback(data)

    # Usage
    if __name__ == '__main__':
        client = LatteStream('test_key')

        client.bind('connected', lambda _: print('Connected!'))

        channel = client.subscribe('test-channel')
        channel.bind('test-event', lambda data: print(f'Received: {data}'))

        client.connect()

        time.sleep(100)  # Keep alive

Checklist for SDK Developers

Core Features

  • WebSocket connection management
  • Authentication (public keys, private tokens, JWT)
  • Discovery service (for public keys)
  • Channel subscription/unsubscription
  • Event binding and emitting
  • Reconnection with exponential backoff
  • Heartbeat (ping/pong)
  • Connection state tracking
  • Error handling

Channel Support

  • Public channels
  • Private channels with authorization
  • Presence channels with member tracking
  • Client events on private/presence channels

Performance

  • Message batching (optional)
  • Object pooling (optional)
  • Efficient event emitter
  • Minimal memory allocations

Developer Experience

  • TypeScript/type definitions (if applicable)
  • Clear error messages
  • Documentation and examples
  • Logging/debugging support
  • Unit tests
  • Integration tests


Ready to build? Start with the WebSocket API Reference for complete protocol details.