Translate Streaming STT Transcripts with LLM Gateway
In this guide, you’ll learn how to implement real-time translation of final transcripts using AssemblyAI’s Streaming model and LLM Gateway.
Quickstart
import pyaudio
import websocket
import json
import threading
import time
import requests
from urllib.parse import urlencode
from datetime import datetime

# --- Configuration ---
YOUR_API_KEY = "YOUR_API_KEY"  # Replace with your actual API key

CONNECTION_PARAMS = {
    "sample_rate": 16000,
    "format_turns": True,  # Request formatted final transcripts
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

# Audio Configuration
FRAMES_PER_BUFFER = 800  # 50ms of audio (0.05s * 16000Hz)
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHANNELS = 1
FORMAT = pyaudio.paInt16

# Global variables for audio stream and websocket
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()  # To signal the audio thread to stop


# --- Function to Translate Text with LLM Gateway ---

def translate_text(text):
    """Translate a final transcript into Spanish using LLM Gateway.

    Args:
        text: The transcript text to translate.

    Returns:
        The translated text from the first choice of the gateway response.

    Raises:
        requests.RequestException: If the request fails, times out, or the
            gateway answers with an HTTP error status.
    """
    headers = {
        "authorization": YOUR_API_KEY
    }

    llm_gateway_data = {
        "model": "gemini-2.5-flash-lite",
        "prompt": f"Translate the following text into Spanish. Do not write a preamble. Just return the translated text.\n\nText: {text}",
        "max_tokens": 1000
    }

    # This runs on the WebSocket receive thread, so bound the wait: without a
    # timeout a stalled request would block all further transcript handling.
    result = requests.post(
        "https://llm-gateway.assemblyai.com/v1/chat/completions",
        headers=headers,
        json=llm_gateway_data,
        timeout=30,
    )
    # Surface HTTP errors (bad key, rate limit, ...) with their status instead
    # of an opaque KeyError on the missing "choices" field.
    result.raise_for_status()
    return result.json()["choices"][0]["message"]["content"]


# --- WebSocket Event Handlers ---

def on_open(ws):
    """Called when the WebSocket connection is established.

    Starts a daemon thread that reads microphone audio and sends it to the
    server until ``stop_event`` is set or reading/sending fails.
    """
    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    # Start sending audio data in a separate thread
    def stream_audio():
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)

                # Send audio data as binary message
                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                print(f"Error streaming audio: {e}")
                # If stream read fails, likely means it's closed, stop the loop
                break
        print("Audio streaming stopped.")

    global audio_thread
    audio_thread = threading.Thread(target=stream_audio)
    audio_thread.daemon = (
        True  # Allow main thread to exit even if this thread is running
    )
    audio_thread.start()


def on_message(ws, message):
    """Called for each WebSocket message; prints translated formatted turns."""
    try:
        data = json.loads(message)
        msg_type = data.get('type')

        if msg_type == "Begin":
            session_id = data.get('id')
            expires_at = data.get('expires_at')
            print(f"\nSession began: ID={session_id}, ExpiresAt={datetime.fromtimestamp(expires_at)}")
        elif msg_type == "Turn":
            transcript = data.get('transcript', '')
            formatted = data.get('turn_is_formatted', False)

            # Only formatted turns are translated and printed.
            if formatted:
                print('\r' + ' ' * 80 + '\r', end='')
                print(translate_text(transcript))

        elif msg_type == "Termination":
            audio_duration = data.get('audio_duration_seconds', 0)
            session_duration = data.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio Duration={audio_duration}s, Session Duration={session_duration}s")

    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")


def on_error(ws, error):
    """Called when a WebSocket error occurs."""
    print(f"\nWebSocket Error: {error}")
    # Attempt to signal stop on error
    stop_event.set()


def on_close(ws, close_status_code, close_msg):
    """Called when the WebSocket connection is closed.

    Stops the audio thread and releases the microphone stream and PyAudio.
    """
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    # Ensure audio resources are released
    global stream, audio
    stop_event.set()  # Signal audio thread just in case it's still running

    # Let the audio thread leave stream.read() before the stream is closed,
    # so the stream is not torn down underneath an in-progress read.
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)

    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None


# --- Main Execution ---

def run():
    """Open the microphone, stream audio to the server, and clean up on exit."""
    global audio, stream, ws_app

    # Initialize PyAudio
    audio = pyaudio.PyAudio()

    # Open microphone stream
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("Microphone stream opened successfully.")
        print("Speak into your microphone. Press Ctrl+C to stop.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        if audio:
            audio.terminate()
        return  # Exit if microphone cannot be opened

    # Create WebSocketApp
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run WebSocketApp in a separate thread to allow main thread to catch KeyboardInterrupt
    ws_thread = threading.Thread(target=ws_app.run_forever)
    ws_thread.daemon = True
    ws_thread.start()

    try:
        # Keep main thread alive until interrupted
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()  # Signal audio thread to stop

        # Send termination message to the server
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                print(f"Sending termination message: {json.dumps(terminate_message)}")
                ws_app.send(json.dumps(terminate_message))
                # Wait up to 5 seconds for remaining messages to be processed,
                # but continue as soon as the WebSocket thread has finished.
                ws_thread.join(timeout=5)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        # Close the WebSocket connection (will trigger on_close)
        if ws_app:
            ws_app.close()

        # Wait for WebSocket thread to finish
        ws_thread.join(timeout=2.0)

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    finally:
        # Final cleanup (already handled in on_close, but good as a fallback)
        if stream and stream.is_active():
            stream.stop_stream()
        if stream:
            stream.close()
        if audio:
            audio.terminate()
        print("Cleanup complete. Exiting.")


if __name__ == "__main__":
    run()
Step-by-Step Instructions
Before we begin, make sure you have an AssemblyAI account and an API key. You can sign up and get your API key from your dashboard.
Import Packages & Set API Key
import pyaudio
import websocket
import json
import threading
import time
import requests
from urllib.parse import urlencode
from datetime import datetime

# Used both as the WebSocket "Authorization" header and as the LLM Gateway
# "authorization" header.
YOUR_API_KEY = "YOUR_API_KEY"  # Replace with your actual API key
Audio Configuration & Global Variables
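Set your audio configuration and global variables. Make sure the format_turns connection parameter is set to True: the message handler defined below only translates turns that arrive with turn_is_formatted set, so without it no translations are printed.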
# Streaming session settings; they are URL-encoded into the endpoint query string.
CONNECTION_PARAMS = {
    "sample_rate": 16000,
    "format_turns": True,  # Request formatted final transcripts
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = API_ENDPOINT_BASE_URL + "?" + urlencode(CONNECTION_PARAMS)

# Microphone capture settings
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
FRAMES_PER_BUFFER = 800  # 50ms of audio (0.05s * 16000Hz)
CHANNELS = 1
FORMAT = pyaudio.paInt16

# Shared handles, populated at runtime by run() and on_open()
audio = stream = ws_app = audio_thread = None

# Set to ask the audio streaming thread to stop
stop_event = threading.Event()

# Buffer for captured audio frames and the lock guarding access to it
recorded_frames = []
recording_lock = threading.Lock()
Define Translate Text Function
Define a function called translate_text, which uses LLM Gateway to translate the English final transcripts into another language. This example is translating the text into Spanish. To set this to a different language, just replace “Spanish” in the prompt with your language of choice.
def translate_text(text):
    """Translate a final transcript into Spanish using LLM Gateway.

    Args:
        text: The transcript text to translate.

    Returns:
        The translated text from the first choice of the gateway response.

    Raises:
        requests.RequestException: If the request fails, times out, or the
            gateway answers with an HTTP error status.
    """
    headers = {
        "authorization": YOUR_API_KEY
    }

    llm_gateway_data = {
        # Same model as the Quickstart script at the top of this guide.
        "model": "gemini-2.5-flash-lite",
        "prompt": f"Translate the following text into Spanish. Do not write a preamble. Just return the translated text.\n\nText: {text}",
        "max_tokens": 1000
    }

    # This runs on the WebSocket receive thread, so bound the wait: without a
    # timeout a stalled request would block all further transcript handling.
    result = requests.post(
        "https://llm-gateway.assemblyai.com/v1/chat/completions",
        headers=headers,
        json=llm_gateway_data,
        timeout=30,
    )
    # Surface HTTP errors (bad key, rate limit, ...) with their status instead
    # of an opaque KeyError on the missing "choices" field.
    result.raise_for_status()
    return result.json()["choices"][0]["message"]["content"]
Websocket Event Handlers
Open Websocket
def on_open(ws):
    """Called when the WebSocket connection is established.

    Starts a daemon thread that reads microphone audio and sends it to the
    server until ``stop_event`` is set or reading/sending fails.

    Args:
        ws: The WebSocket connection the audio chunks are sent on.
    """
    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    # Start sending audio data in a separate thread
    def stream_audio():
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)

                # Chunks are sent straight to the server and not retained:
                # nothing in this guide reads them back, so buffering every
                # chunk would only grow memory for the whole session.
                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                print(f"Error streaming audio: {e}")
                # If stream read fails, likely means it's closed, stop the loop
                break
        print("Audio streaming stopped.")

    global audio_thread
    audio_thread = threading.Thread(target=stream_audio)
    audio_thread.daemon = (
        True  # Allow main thread to exit even if this thread is running
    )
    audio_thread.start()
Handle Websocket Messages
In this function, use the previously defined translate_text function to translate each formatted final transcript, i.e. every Turn message whose turn_is_formatted field is true.
def on_message(ws, message):
    """Handle one message received from the streaming WebSocket.

    "Begin" and "Termination" messages are reported on stdout; a "Turn"
    message is translated with translate_text and printed, but only when
    its turn_is_formatted field is truthy. Errors are printed, not raised.
    """
    try:
        payload = json.loads(message)
        kind = payload.get('type')

        if kind == "Begin":
            began_id = payload.get('id')
            expiry = datetime.fromtimestamp(payload.get('expires_at'))
            print(f"\nSession began: ID={began_id}, ExpiresAt={expiry}")

        elif kind == "Turn":
            if payload.get('turn_is_formatted', False):
                # Blank out the current console line before printing.
                print('\r' + ' ' * 80 + '\r', end='')
                translation = translate_text(payload.get('transcript', ''))
                print(translation)

        elif kind == "Termination":
            audio_secs = payload.get('audio_duration_seconds', 0)
            session_secs = payload.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio Duration={audio_secs}s, Session Duration={session_secs}s")

    except json.JSONDecodeError as err:
        print(f"Error decoding message: {err}")
    except Exception as err:
        print(f"Error handling message: {err}")
Close Websocket
def on_close(ws, close_status_code, close_msg):
    """Called when the WebSocket connection is closed.

    Signals the audio thread to stop, waits briefly for it to finish, then
    releases the microphone stream and the PyAudio instance.

    Args:
        ws: The WebSocket connection that was closed.
        close_status_code: Close status reported for the connection.
        close_msg: Close message reported for the connection.
    """
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    # Ensure audio resources are released
    global stream, audio
    stop_event.set()  # Signal audio thread just in case it's still running

    # Let the audio thread leave stream.read() before the stream is closed,
    # so the stream is not torn down underneath an in-progress read.
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)

    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
Websocket Error Handling
def on_error(ws, error):
    """Report a WebSocket error and ask the audio thread to stop."""
    report = f"\nWebSocket Error: {error}"
    print(report)
    # An errored connection cannot carry audio; signal the streaming loop.
    stop_event.set()
Begin Streaming STT Transcription
def run():
    """Open the microphone, stream audio to the server, and clean up on exit.

    Runs the WebSocket client on a background thread so the main thread can
    catch Ctrl+C. On Ctrl+C it sends a Terminate message, waits for the
    session to wind down, then closes the connection and audio resources.
    Returns early if the microphone cannot be opened.
    """
    global audio, stream, ws_app

    # Initialize PyAudio
    audio = pyaudio.PyAudio()

    # Open microphone stream
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("Microphone stream opened successfully.")
        print("Speak into your microphone. Press Ctrl+C to stop.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        if audio:
            audio.terminate()
        return  # Exit if microphone cannot be opened

    # Create WebSocketApp
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run WebSocketApp in a separate thread to allow main thread to catch KeyboardInterrupt
    ws_thread = threading.Thread(target=ws_app.run_forever)
    ws_thread.daemon = True
    ws_thread.start()

    try:
        # Keep main thread alive until interrupted
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()  # Signal audio thread to stop

        # Send termination message to the server
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                print(f"Sending termination message: {json.dumps(terminate_message)}")
                ws_app.send(json.dumps(terminate_message))
                # Wait up to 5 seconds for remaining messages to be processed,
                # but continue as soon as the WebSocket thread has finished.
                ws_thread.join(timeout=5)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        # Close the WebSocket connection (will trigger on_close)
        if ws_app:
            ws_app.close()

        # Wait for WebSocket thread to finish
        ws_thread.join(timeout=2.0)

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    finally:
        # Final cleanup (already handled in on_close, but good as a fallback)
        if stream and stream.is_active():
            stream.stop_stream()
        if stream:
            stream.close()
        if audio:
            audio.terminate()
        print("Cleanup complete. Exiting.")


if __name__ == "__main__":
    run()