LatteStream®
Quick Start
Getting Started
Building Custom SDKs
SDKs & Libraries
JavaScript/TypeScript
Node.js / Bun / Deno
Python
Go
PHP
API Reference
WebSocket API
REST API
Webhooks
Authentication
Best practices and implementation patterns for creating LatteStream SDKs in any language
This guide provides comprehensive patterns and best practices for building custom LatteStream SDKs. It's based on the architecture and implementation of the official TypeScript/JavaScript SDK, which has been battle-tested in production.
┌─────────────────────────────────────┐
│ Client API Layer                    │ Public interface for developers
├─────────────────────────────────────┤
│ Channel Management                  │ Subscribe, unsubscribe, event binding
├─────────────────────────────────────┤
│ Connection Layer                    │ WebSocket lifecycle, reconnection
├─────────────────────────────────────┤
│ Discovery Service (optional)        │ Node discovery for public keys
├─────────────────────────────────────┤
│ Event Emitter                       │ Event distribution system
├─────────────────────────────────────┤
│ WebSocket Transport                 │ Low-level WebSocket communication
└─────────────────────────────────────┘
1. Client Class
2. Connection Class
3. Channel Classes
- Channel class (abstract)
- PublicChannel for public channels
- PrivateChannel for private channels
- PresenceChannel extends PrivateChannel
4. Authorizer
5. Discovery Service (if supporting public keys)
lspk_* keys
class Connection:
    """Owns one WebSocket and tracks its lifecycle state.

    NOTE(review): this sketch relies on names not defined here
    (create_websocket, json, self.send, self.emit, self.handle_pong,
    self.route_message, self.resubscribe_all_channels,
    self.attempt_reconnect) - confirm they are provided by the surrounding
    SDK code.
    """

    def __init__(self, endpoint, api_key):
        self.endpoint = endpoint
        self.api_key = api_key
        self.ws = None  # set by connect()
        self.state = 'disconnected'
        self.socket_id = None  # filled in from the connection_established message

    def connect(self):
        """Mark the connection as 'connecting', create the socket and wire its callbacks."""
        self.state = 'connecting'
        self.ws = create_websocket(self.endpoint)
        self.ws.on_open = self.handle_open
        self.ws.on_message = self.handle_message
        self.ws.on_error = self.handle_error
        self.ws.on_close = self.handle_close

    def handle_open(self):
        """Send the API key as the first frame once the socket opens."""
        self.send({ 'api_key': self.api_key })

    def handle_message(self, data):
        """Decode one JSON frame and dispatch it by its 'event' field."""
        message = json.loads(data)
        if message['event'] == 'lattestream:connection_established':
            # Record the socket id, flip to 'connected', notify listeners,
            # then resubscribe channels.
            self.socket_id = message['data']['socket_id']
            self.state = 'connected'
            self.emit('connected')
            self.resubscribe_all_channels()
        elif message['event'] == 'lattestream:pong':
            self.handle_pong()
        else:
            # Everything else is handed to the message router.
            self.route_message(message)

    def handle_error(self, error):
        """Forward a socket error to 'error' listeners."""
        self.emit('error', error)

    def handle_close(self):
        """Mark the connection 'disconnected', notify listeners, then start reconnecting."""
        self.state = 'disconnected'
        self.emit('disconnected')
        self.attempt_reconnect()
Implement these states for clear connection lifecycle:
// Closed set of lifecycle states a connection can report.
type ConnectionState =
  | 'connecting'   // Establishing WebSocket connection
  | 'connected'    // Successfully connected and authenticated
  | 'disconnected' // Connection lost
  | 'unavailable'  // Temporarily unavailable (retrying)
  | 'failed';      // Permanently failed (max retries exceeded)
State Transitions:
disconnected → connecting → connected
                   ↓
             unavailable → connecting
                   ↓
             failed (max retries)
For public keys (lspk_*), implement node discovery:
/**
 * Resolves a public key (lspk_*) to the WebSocket node assigned to it.
 */
class DiscoveryService {
  /**
   * Ask the cluster's discovery endpoint which node to connect to.
   *
   * @param {string} apiKey  Public API key; URL-encoded before being sent.
   * @param {string} cluster Cluster name used to build the discovery host.
   * @returns {Promise<{nodeId: *, host: string, port: *, discoveryToken: string, expiresAt: Date}>}
   * @throws {Error} When the response status is not OK, or the request is
   *   aborted after 10 seconds.
   */
  async discover(apiKey, cluster) {
    const endpoint = `https://${cluster}.lattestream.com/discover`;

    // fetch() has no `timeout` option (it is silently ignored); an
    // AbortController is the supported way to bound the request time.
    const controller = new AbortController();
    const timer = setTimeout(() => controller.abort(), 10000);

    let data;
    try {
      const response = await fetch(
        `${endpoint}?api_key=${encodeURIComponent(apiKey)}`,
        { method: 'GET', signal: controller.signal }
      );

      if (!response.ok) {
        throw new Error(`Discovery failed: ${response.status}`);
      }

      // Body download is kept inside the timeout window as well.
      data = await response.json();
    } finally {
      clearTimeout(timer);
    }

    return {
      nodeId: data.node_id,
      host: data.host,
      port: data.port,
      discoveryToken: data.discovery_token,
      expiresAt: new Date(data.expires_at)
    };
  }

  /**
   * Build the WebSocket URL for a discovery result.
   *
   * @param {{host: string, port: *, discoveryToken: string}} discovery
   * @returns {string} wss:// URL carrying the URL-encoded discovery token.
   */
  getWebSocketUrl(discovery) {
    const token = encodeURIComponent(discovery.discoveryToken);
    return `wss://${discovery.host}:${discovery.port}?discovery_token=${token}`;
  }
}
Usage:
// 1. Run discovery const discovery = await discoveryService.discover(apiKey, cluster); // 2. Connect to assigned node const wsUrl = discoveryService.getWebSocketUrl(discovery); const connection = new Connection(wsUrl, discovery.discoveryToken);
def connect_with_public_key(public_key, cluster):
    """Connect with a public key via node discovery.

    Args:
        public_key: Public API key handed to ``discover_node``.
        cluster: Cluster name handed to ``discover_node``.

    Returns:
        The WebSocket object, after the authentication frame has been sent.
    """
    from urllib.parse import quote

    # Step 1: Discovery
    discovery_data = discover_node(public_key, cluster)

    # Step 2: Connect to assigned node. Include the port when discovery
    # returns one, so the URL matches the JavaScript getWebSocketUrl().
    host = discovery_data['host']
    port = discovery_data.get('port')
    authority = f"{host}:{port}" if port else host
    token = discovery_data['discovery_token']
    ws_url = f"wss://{authority}?discovery_token={quote(str(token), safe='')}"
    ws = WebSocket(ws_url)

    # Step 3: Authenticate with discovery token
    ws.send(json.dumps({'api_key': token}))
    return ws
def connect_with_private_token(private_token, cluster):
    """Open a direct connection to the cluster and authenticate with a private token.

    No discovery round-trip is made; the cluster host is used as-is.
    Returns the WebSocket object after the auth frame has been sent.
    """
    socket = WebSocket(f"wss://{cluster}.lattestream.com")
    auth_frame = json.dumps({'api_key': private_token})
    socket.send(auth_frame)
    return socket
For private/presence channels:
/**
 * Fetches channel authorization signatures from the application's auth endpoint.
 */
class Authorizer {
  /**
   * @param {string} authEndpoint URL that receives the POST request.
   * @param {Object} [authHeaders] Extra headers merged over the JSON content type.
   */
  constructor(authEndpoint, authHeaders = {}) {
    this.authEndpoint = authEndpoint;
    this.authHeaders = authHeaders;
  }

  /**
   * Request authorization for one socket/channel pair.
   *
   * @param {string} socketId
   * @param {string} channelName
   * @returns {Promise<{auth: string, channelData: *}>}
   * @throws {Error} On a non-OK status or a response without `auth`.
   */
  async authorize(socketId, channelName) {
    const headers = Object.assign(
      { 'Content-Type': 'application/json' },
      this.authHeaders
    );
    const body = JSON.stringify({
      socket_id: socketId,
      channel_name: channelName
    });

    const response = await fetch(this.authEndpoint, { method: 'POST', headers, body });
    if (!response.ok) {
      throw new Error(`Authorization failed: ${response.status}`);
    }

    const payload = await response.json();
    if (!payload.auth) {
      throw new Error('Invalid auth response: missing auth field');
    }

    // channel_data is only present for presence channels
    return { auth: payload.auth, channelData: payload.channel_data };
  }
}
Critical for preventing the thundering herd problem:
import random
import time


class ReconnectionStrategy:
    """Exponential backoff with jitter for reconnection attempts.

    Args:
        base_delay: Delay before the first retry, in milliseconds.
        max_attempts: Number of delays handed out before giving up.
        max_gap: Upper bound for any single delay, in milliseconds.
        backoff_multiplier: Growth factor applied per attempt.
    """

    def __init__(self, base_delay=1000, max_attempts=6, max_gap=30000, backoff_multiplier=2):
        self.base_delay = base_delay  # milliseconds
        self.max_attempts = max_attempts
        self.max_gap = max_gap  # milliseconds
        self.backoff_multiplier = backoff_multiplier
        self.attempt = 0

    def get_next_delay(self):
        """Return the next delay in seconds, or None once attempts are exhausted.

        The delay is ``base_delay * backoff_multiplier ** attempt`` capped at
        ``max_gap``, with up to +/-25% random jitter. The jittered value is
        clamped to ``[0, max_gap]`` so jitter can never exceed the configured
        maximum gap.
        """
        if self.attempt >= self.max_attempts:
            return None  # Give up

        # Exponential backoff: base * (multiplier ^ attempt), capped at max_gap
        exponential_delay = self.base_delay * (self.backoff_multiplier ** self.attempt)
        delay = min(exponential_delay, self.max_gap)

        # Add jitter (±25%) so clients do not retry in lockstep
        jitter_range = delay * 0.25
        delay_with_jitter = delay + random.uniform(-jitter_range, jitter_range)

        # Jitter is applied after the cap above, so clamp once more.
        delay_with_jitter = max(0.0, min(delay_with_jitter, self.max_gap))

        self.attempt += 1
        return delay_with_jitter / 1000  # Convert to seconds

    def reset(self):
        """Reset attempt counter on successful connection."""
        self.attempt = 0
Usage:
strategy = ReconnectionStrategy()


def attempt_reconnect():
    """Retry ``connection.connect()`` until it succeeds or the strategy gives up.

    Sleeps for each delay the strategy hands out. When the strategy returns
    None, the connection is marked 'failed' and a 'failed' event is emitted.
    """
    while True:
        wait_seconds = strategy.get_next_delay()
        if wait_seconds is None:
            connection.state = 'failed'
            emit('failed')
            return

        print(f"Reconnecting in {wait_seconds:.2f} seconds...")
        time.sleep(wait_seconds)

        try:
            connection.connect()
            strategy.reset()  # Reset on success
        except Exception as e:
            print(f"Reconnection failed: {e}")
            continue  # Try again
        return
/**
 * Connection fragment showing automatic resubscription after a reconnect.
 */
class Connection {
  constructor() {
    this.subscribedChannels = new Set();
  }

  /** Mark the connection as connected, notify listeners, then resubscribe. */
  handleConnectionEstablished() {
    this.state = 'connected';
    this.emit('connected');

    // Auto-resubscribe all channels
    this.resubscribeAllChannels();
  }

  /** Subscribe every tracked channel that is not currently subscribed. */
  resubscribeAllChannels() {
    this.subscribedChannels.forEach((channel) => {
      if (channel.isSubscribed()) {
        return;
      }
      channel.subscribe(this.socketId);
    });
  }

  /** Mark the connection and its channels as down, notify, then reconnect. */
  handleDisconnected() {
    this.state = 'disconnected';

    // Mark all channels as unsubscribed
    this.subscribedChannels.forEach((channel) => {
      channel.subscribed = false;
    });

    this.emit('disconnected');
    this.attemptReconnect();
  }
}
from abc import ABC, abstractmethod


class Channel(ABC):
    """Base class for a named channel with per-event listeners.

    Args:
        name: Channel name as sent to the server.
        connection: Object exposing ``send(message_dict)``.
    """

    def __init__(self, name, connection):
        self.name = name
        self.connection = connection
        self.subscribed = False
        self.listeners = {}

    def bind(self, event, callback):
        """Bind event listener"""
        self.listeners.setdefault(event, []).append(callback)

    def unbind(self, event, callback=None):
        """Unbind event listener(s).

        With ``callback=None`` all listeners for ``event`` are removed.
        Unbinding an event or callback that was never bound is a no-op.
        """
        if callback is None:
            # Remove all listeners for event
            self.listeners[event] = []
            return
        try:
            # Remove specific listener
            self.listeners[event].remove(callback)
        except (KeyError, ValueError):
            pass  # nothing bound under that event/callback

    def emit(self, event, data):
        """Emit event to listeners.

        A failing listener is reported and does not stop the others.
        """
        # Iterate over a copy so a listener may bind/unbind during dispatch.
        for callback in list(self.listeners.get(event, ())):
            try:
                callback(data)
            except Exception as e:
                print(f"Error in event listener: {e}")

    def handle_subscription_succeeded(self, data):
        """Mark the channel as subscribed and notify listeners.

        Defined on the base class so every channel type can be marked
        subscribed, not just presence channels.
        """
        self.subscribed = True
        self.emit('lattestream:subscription_succeeded', data)

    @abstractmethod
    def subscribe(self, socket_id):
        """Subscribe to channel"""
        pass

    def unsubscribe(self):
        """Unsubscribe from channel"""
        self.connection.send({
            'event': 'lattestream:unsubscribe',
            'data': {'channel': self.name}
        })
        self.subscribed = False


class PublicChannel(Channel):
    """Channel that needs no authorization."""

    def subscribe(self, socket_id):
        """Send the subscribe frame; ``socket_id`` is unused for public channels."""
        self.connection.send({
            'event': 'lattestream:subscribe',
            'data': {'channel': self.name}
        })


class PrivateChannel(Channel):
    """Channel whose subscription carries an auth signature.

    Args:
        name: Channel name.
        connection: Object exposing ``send(message_dict)``.
        authorizer: Object exposing ``authorize(socket_id, channel_name)``
            that returns a mapping with an ``'auth'`` entry.
    """

    def __init__(self, name, connection, authorizer):
        super().__init__(name, connection)
        self.authorizer = authorizer

    def subscribe(self, socket_id):
        """Authorize the socket for this channel, then send the subscribe frame."""
        # Get authorization from server
        auth_data = self.authorizer.authorize(socket_id, self.name)

        # Send subscription with auth
        self.connection.send({
            'event': 'lattestream:subscribe',
            'data': {
                'channel': self.name,
                'auth': auth_data['auth']
            }
        })

    def trigger(self, event_name, data):
        """Send client event.

        Returns:
            False when the channel is not subscribed, otherwise whatever
            ``connection.send`` returns.

        Raises:
            ValueError: If ``event_name`` does not start with ``'client-'``.
        """
        if not event_name.startswith('client-'):
            raise ValueError("Client events must be prefixed with 'client-'")

        if not self.subscribed:
            return False

        return self.connection.send({
            'event': event_name,
            'channel': self.name,
            'data': data
        })


class PresenceChannel(PrivateChannel):
    """Private channel that also tracks the member list."""

    def __init__(self, name, connection, authorizer):
        super().__init__(name, connection, authorizer)
        self.members = {}
        self.my_id = None

    def subscribe(self, socket_id):
        """Authorize, then subscribe with both the signature and channel_data."""
        # Get authorization with user data
        auth_data = self.authorizer.authorize(socket_id, self.name)

        # Send subscription with auth and channel_data
        self.connection.send({
            'event': 'lattestream:subscribe',
            'data': {
                'channel': self.name,
                'auth': auth_data['auth'],
                'channel_data': auth_data.get('channel_data')
            }
        })

    def handle_subscription_succeeded(self, data):
        """Initialize member list, then mark the channel as subscribed."""
        # Tolerate an empty/None payload.
        if data and 'presence' in data:
            presence = data['presence']
            self.members = dict(presence['hash'])
            if 'me' in presence:
                self.my_id = presence['me']['id']

        super().handle_subscription_succeeded(data)

    def handle_member_added(self, member):
        """Record a joined member and notify listeners."""
        self.members[member['user_id']] = member['user_info']
        self.emit('lattestream:member_added', member)

    def handle_member_removed(self, member):
        """Drop a member if known and notify listeners."""
        self.members.pop(member['user_id'], None)
        self.emit('lattestream:member_removed', member)
/**
 * Minimal event emitter with persistent (`on`) and one-shot (`once`) listeners.
 * All mutating methods return `this` for chaining.
 */
class EventEmitter {
  constructor() {
    this.events = new Map();      // event -> Set of persistent listeners
    this.onceEvents = new Map();  // event -> Set of one-shot listeners
  }

  /** Register a persistent listener for `event`. */
  on(event, listener) {
    if (!this.events.has(event)) {
      this.events.set(event, new Set());
    }
    this.events.get(event).add(listener);
    return this;
  }

  /** Register a listener that fires for the next `emit(event)` only. */
  once(event, listener) {
    if (!this.onceEvents.has(event)) {
      this.onceEvents.set(event, new Set());
    }
    this.onceEvents.get(event).add(listener);
    return this;
  }

  /**
   * Remove `listener` from `event`, or every listener for `event` when
   * `listener` is omitted.
   */
  off(event, listener) {
    if (listener) {
      this.events.get(event)?.delete(listener);
      this.onceEvents.get(event)?.delete(listener);
    } else {
      // Remove all listeners for event
      this.events.delete(event);
      this.onceEvents.delete(event);
    }
    return this;
  }

  /**
   * Invoke every listener registered for `event` with `args`.
   * A throwing listener is logged and does not stop the others.
   * Listeners added while this emit is running are first called on the
   * NEXT emit.
   */
  emit(event, ...args) {
    // Regular listeners: iterate a snapshot, because a Set iterator also
    // visits entries added during iteration.
    const listeners = this.events.get(event);
    if (listeners) {
      for (const listener of [...listeners]) {
        try {
          listener(...args);
        } catch (error) {
          console.error('Error in event listener:', error);
        }
      }
    }

    // Once listeners: detach the set BEFORE invoking, so a re-entrant emit
    // cannot fire them twice and a once() made by a listener is kept.
    const onceListeners = this.onceEvents.get(event);
    if (onceListeners) {
      this.onceEvents.delete(event);
      for (const listener of [...onceListeners]) {
        try {
          listener(...args);
        } catch (error) {
          console.error('Error in once listener:', error);
        }
      }
    }

    return this;
  }
}
Key Features:
Set for O(1) add/removeMap for fast event lookuponce listeners
def route_message(self, message):
    """Route incoming message to appropriate handler.

    Reads the 'event', 'channel' and 'data' entries of ``message`` (missing
    entries read as None) and dispatches in this order: protocol events,
    then messages for a known channel, then everything else as a global
    event on ``self``.
    """
    event = message.get('event')
    channel = message.get('channel')
    data = message.get('data')

    # Protocol messages (no channel)
    if event == 'lattestream:connection_established':
        self.handle_connection_established(data)
    elif event == 'lattestream:pong':
        self.handle_pong()
    elif event == 'lattestream:error':
        self.handle_error(data)

    # Channel messages
    elif channel:
        channel_obj = self.channels.get(channel)
        # Messages for a channel that is not in self.channels are dropped silently.
        if channel_obj:
            # Internal channel events
            # NOTE(review): these handlers are called on whatever object is
            # registered for the channel - confirm every channel type defines
            # them, otherwise this raises AttributeError.
            if event == 'lattestream_internal:subscription_succeeded':
                channel_obj.handle_subscription_succeeded(data)
            elif event == 'lattestream_internal:member_added':
                channel_obj.handle_member_added(data)
            elif event == 'lattestream_internal:member_removed':
                channel_obj.handle_member_removed(data)
            # Custom events
            else:
                channel_obj.emit(event, data)

    # Global events
    else:
        self.emit(event, data)
If implementing binary protocol:
/**
 * Decode one binary frame laid out as
 * [4 bytes: type][4 bytes: length][payload], both integers big-endian.
 *
 * @param {ArrayBuffer} buffer Complete frame.
 * @returns {*} Parsed JSON for type 0x01; `undefined` for types 0x02 and
 *   0x03, which are left for the SDK author to implement.
 * @throws {Error} If the frame is shorter than its header, the payload is
 *   shorter than the declared length, or the type is unknown.
 */
function parseBinaryMessage(buffer) {
  const HEADER_SIZE = 8;

  if (buffer.byteLength < HEADER_SIZE) {
    throw new Error(
      `Truncated binary message: ${buffer.byteLength} bytes, need at least ${HEADER_SIZE}`
    );
  }

  const view = new DataView(buffer);
  const type = view.getUint32(0, false); // Big-endian
  const length = view.getUint32(4, false);

  // slice() silently clamps to the buffer end, so a short frame would be
  // parsed as if complete; check the declared length first.
  if (buffer.byteLength < HEADER_SIZE + length) {
    throw new Error(
      `Truncated binary message: declared ${length} payload bytes, got ${buffer.byteLength - HEADER_SIZE}`
    );
  }

  const payload = buffer.slice(HEADER_SIZE, HEADER_SIZE + length);

  switch (type) {
    case 0x01: { // JSON (UTF-8)
      const json = new TextDecoder().decode(payload);
      return JSON.parse(json);
    }

    case 0x02: // Binary with JSON metadata
      // Custom implementation
      return undefined;

    case 0x03: // Compressed JSON
      // Decompress and parse
      return undefined;

    default:
      throw new Error(`Unknown binary message type: ${type}`);
  }
}
Reduce garbage collection pressure:
/**
 * Reusable-object pool: hands out pre-built objects and takes them back
 * after resetting them.
 */
class ObjectPool {
  /**
   * @param {Function} factory Creates a fresh object.
   * @param {Function} reset Clears an object before it re-enters the pool.
   * @param {number} [initialSize=10] Number of objects created up front.
   */
  constructor(factory, reset, initialSize = 10) {
    this.factory = factory;
    this.reset = reset;
    this.pool = [];

    // Pre-allocate objects
    while (this.pool.length < initialSize) {
      this.pool.push(factory());
    }
  }

  /** Take an object from the pool, or build a new one when the pool is empty. */
  acquire() {
    if (this.pool.length === 0) {
      return this.factory();
    }
    return this.pool.pop();
  }

  /** Reset `obj` and return it to the pool. */
  release(obj) {
    this.reset(obj);
    this.pool.push(obj);
  }
}

// Usage: Pool presence member objects
const createMember = () => ({ id: '', info: null });
const clearMember = (member) => {
  member.id = '';
  member.info = null;
};
const memberPool = new ObjectPool(createMember, clearMember);
Batch multiple operations:
/**
 * Buffers outgoing messages and hands them to `sender` either when
 * `batchSize` is reached or after `flushInterval` milliseconds.
 */
class MessageQueue {
  constructor(sender, batchSize = 10, flushInterval = 16) {
    this.sender = sender;
    this.batchSize = batchSize;
    this.flushInterval = flushInterval; // ~60fps
    this.queue = [];
    this.flushTimer = null;
  }

  /** Add a message; flush at once when the batch is full, else arm the timer. */
  enqueue(message) {
    this.queue.push(message);

    if (this.queue.length >= this.batchSize) {
      this.flush(); // Send immediately
      return;
    }

    if (this.flushTimer) {
      return; // a flush is already scheduled
    }

    // Schedule flush
    this.flushTimer = setTimeout(() => this.flush(), this.flushInterval);
  }

  /**
   * Send everything queued. A single message is sent as-is; several are
   * wrapped in one 'lattestream:batch' envelope.
   */
  flush() {
    const pending = this.queue.length;
    if (pending === 0) return;

    clearTimeout(this.flushTimer);
    this.flushTimer = null;

    const outgoing = pending === 1
      ? this.queue[0]
      : { event: 'lattestream:batch', data: { messages: this.queue } };
    this.sender(outgoing);

    this.queue = [];
  }
}
/**
 * Return a wrapper that runs `func` only after `wait` ms have passed
 * without another call; each call restarts the wait.
 */
function debounce(func, wait) {
  let pending;
  return function debounced(...args) {
    clearTimeout(pending);
    pending = setTimeout(() => {
      func.apply(this, args);
    }, wait);
  };
}

/**
 * Return a wrapper that runs `func` immediately, then ignores calls for
 * the next `limit` ms.
 */
function throttle(func, limit) {
  let blocked = false;
  return function throttled(...args) {
    if (blocked) {
      return;
    }
    func.apply(this, args);
    blocked = true;
    setTimeout(() => {
      blocked = false;
    }, limit);
  };
}

// Usage
const debouncedReconnect = debounce(() => connection.connect(), 100);
const throttledStateChange = throttle((state) => emit('state', state), 100);
/**
 * Base error for the SDK: carries a machine-readable `code` and optional
 * `data` next to the message.
 */
class LatteStreamError extends Error {
  constructor(message, code, data) {
    super(message);
    Object.assign(this, { name: 'LatteStreamError', code, data });
  }
}

/** Connection problem; `code` is always 'CONNECTION_ERROR'. */
class ConnectionError extends LatteStreamError {
  constructor(message, data) {
    const code = 'CONNECTION_ERROR';
    super(message, code, data);
    this.name = 'ConnectionError';
  }
}

/** Channel authorization problem; `code` is always 'AUTHORIZATION_ERROR'. */
class AuthorizationError extends LatteStreamError {
  constructor(message, data) {
    const code = 'AUTHORIZATION_ERROR';
    super(message, code, data);
    this.name = 'AuthorizationError';
  }
}

/** Subscription problem; the caller supplies the specific `code`. */
class SubscriptionError extends LatteStreamError {
  constructor(message, code, data) {
    super(message, code, data);
    this.name = 'SubscriptionError';
  }
}
def handle_error_message(self, error_data):
    """Turn a server error payload into a typed error and emit it as 'error'.

    Code 4009 becomes an AuthorizationError, 4004 a SubscriptionError with
    code 'TOO_MANY_CHANNELS', and anything else a LatteStreamError carrying
    the original code.
    """
    code = error_data.get('code')
    text = error_data.get('message')

    if code == 4009:
        # Authorization error
        failure = AuthorizationError(text, error_data)
    elif code == 4004:
        # Too many channels
        failure = SubscriptionError(text, 'TOO_MANY_CHANNELS', error_data)
    else:
        failure = LatteStreamError(text, code, error_data)

    self.emit('error', failure)
// Unit tests for Connection: authentication on open and backoff timing.
describe('Connection', () => {
  it('should authenticate on connection', async () => {
    const connection = new Connection('ws://localhost:8080', 'test_key');
    const sendSpy = jest.spyOn(connection, 'send');

    await connection.connect();

    expect(sendSpy).toHaveBeenCalledWith({
      api_key: 'test_key'
    });
  });

  it('should reconnect with exponential backoff', async () => {
    const connection = new Connection('ws://localhost:8080', 'test_key');
    const delays = [];
    connection.on('reconnecting', (delay) => delays.push(delay));

    // Simulate 3 failed connections
    for (let i = 0; i < 3; i++) {
      connection.handleClose();
      await sleep(delays[i]);
    }

    // Delays carry ±25% jitter, so assert a range around each nominal value.
    // toBeCloseTo(x, -2) only tolerates ±50 and fails on most jittered values.
    const JITTER = 0.25;
    [1000, 2000, 4000].forEach((nominal, i) => {
      expect(delays[i]).toBeGreaterThanOrEqual(nominal * (1 - JITTER));
      expect(delays[i]).toBeLessThanOrEqual(nominal * (1 + JITTER));
    });
  }, 15000); // real sleeps can total ~8.75s, above Jest's 5s default timeout
});
// End-to-end test: a client subscribes to a channel and receives an event
// triggered from the test server.
describe('LatteStream Integration', () => {
  let server, client;

  // One test server is shared by every test in this suite.
  beforeAll(async () => {
    server = await startTestServer();
  });

  // NOTE(review): only the server is stopped here; the client created in
  // the test is never closed - confirm that is intended.
  afterAll(async () => {
    await server.stop();
  });

  it('should subscribe and receive messages', async () => {
    client = new LatteStream('test_key', {
      wsEndpoint: 'ws://localhost:8080'
    });

    const channel = client.subscribe('test-channel');
    const messages = [];

    // Collect every payload delivered for 'test-event'.
    channel.bind('test-event', (data) => messages.push(data));

    await waitForSubscription(channel);

    // Trigger from server
    await server.trigger('test-channel', 'test-event', { foo: 'bar' });

    // Wait until at least one message has arrived before asserting.
    await waitFor(() => messages.length > 0);
    expect(messages[0]).toEqual({ foo: 'bar' });
  });
});
Here's a minimal but complete Python SDK implementation:
# lattestream.py
import json
import threading
import time
from typing import Dict, Callable, Optional


class LatteStream:
    """Minimal LatteStream client: one WebSocket, public channels, global events.

    Args:
        api_key: Key sent as the first frame after the socket opens.
        cluster: Cluster name used to build the ``wss://`` endpoint.
    """

    def __init__(self, api_key: str, cluster: str = 'us-east-1'):
        self.api_key = api_key
        self.endpoint = f'wss://{cluster}.lattestream.com'
        self.ws = None
        self.socket_id = None
        self.channels: Dict[str, Channel] = {}
        self.listeners: Dict[str, list] = {}
        self.connected = False

    def connect(self):
        """Open the WebSocket and run it on a daemon thread."""
        # Imported here so the module can be loaded (e.g. for unit tests)
        # without the third-party websocket-client package installed.
        import websocket

        self.ws = websocket.WebSocketApp(
            self.endpoint,
            on_open=self._on_open,
            on_message=self._on_message,
            on_error=self._on_error,
            on_close=self._on_close
        )

        thread = threading.Thread(target=self.ws.run_forever)
        thread.daemon = True
        thread.start()

    def _on_open(self, ws):
        # Authenticate
        ws.send(json.dumps({'api_key': self.api_key}))

    def _on_message(self, ws, message):
        data = json.loads(message)
        event = data.get('event')

        if event == 'lattestream:connection_established':
            self.socket_id = data['data']['socket_id']
            self.connected = True
            # Channels requested before the connection was up could not be
            # sent yet; send them now.
            for channel in list(self.channels.values()):
                if not channel.subscribed:
                    channel.subscribe()
            self._emit('connected')
        elif data.get('channel'):
            channel = self.channels.get(data['channel'])
            if channel:
                channel._handle_message(data)
        else:
            self._emit(event, data.get('data'))

    def _on_error(self, ws, error):
        self._emit('error', error)

    def _on_close(self, ws, close_status_code, close_msg):
        self.connected = False
        # The server no longer knows these subscriptions.
        for channel in self.channels.values():
            channel.subscribed = False
        self._emit('disconnected')

    def subscribe(self, channel_name: str) -> 'Channel':
        """Return the channel for ``channel_name``, creating it on first use.

        The subscribe frame is sent immediately when connected; otherwise
        it is sent once the connection is established.
        """
        if channel_name in self.channels:
            return self.channels[channel_name]

        channel = Channel(channel_name, self)
        self.channels[channel_name] = channel
        if self.connected:
            channel.subscribe()
        return channel

    def bind(self, event: str, callback: Callable):
        """Register ``callback`` for a global (non-channel) event."""
        self.listeners.setdefault(event, []).append(callback)

    def _emit(self, event: str, data=None):
        # Copy so a listener may call bind() during dispatch.
        for callback in list(self.listeners.get(event, ())):
            callback(data)

    def _send(self, data: dict):
        if self.ws:
            self.ws.send(json.dumps(data))


class Channel:
    """A named channel with its own event listeners."""

    def __init__(self, name: str, client: LatteStream):
        self.name = name
        self.client = client
        self.listeners: Dict[str, list] = {}
        self.subscribed = False

    def subscribe(self):
        """Send the subscribe frame for this channel through the client."""
        self.client._send({
            'event': 'lattestream:subscribe',
            'data': {'channel': self.name}
        })

    def bind(self, event: str, callback: Callable):
        """Register ``callback`` for ``event`` on this channel."""
        self.listeners.setdefault(event, []).append(callback)

    def _handle_message(self, message: dict):
        event = message.get('event')
        data = message.get('data')

        if event == 'lattestream_internal:subscription_succeeded':
            self.subscribed = True

        # Copy so a listener may call bind() during dispatch.
        for callback in list(self.listeners.get(event, ())):
            callback(data)


# Usage
if __name__ == '__main__':
    client = LatteStream('test_key')

    client.bind('connected', lambda _: print('Connected!'))

    channel = client.subscribe('test-channel')
    channel.bind('test-event', lambda data: print(f'Received: {data}'))

    client.connect()

    time.sleep(100)  # Keep alive
Ready to build? Start with the WebSocket API Reference for complete protocol details.