Translate Streaming STT Transcripts with LLM Gateway

In this guide, you’ll learn how to implement real-time translation of final transcripts using AssemblyAI’s Streaming model and LLM Gateway.

Quickstart

1import pyaudio
2import websocket
3import json
4import threading
5import time
6import requests
7from urllib.parse import urlencode
8from datetime import datetime
9
# --- Configuration ---
# API key sent as the authorization header to both the streaming endpoint
# and LLM Gateway.
YOUR_API_KEY = "YOUR_API_KEY"  # Replace with your actual API key

# Query parameters encoded into the streaming WebSocket URL.
CONNECTION_PARAMS = dict(
    sample_rate=16000,
    format_turns=True,  # Request formatted final transcripts
)
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = API_ENDPOINT_BASE_URL + "?" + urlencode(CONNECTION_PARAMS)

# Audio configuration
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]  # Keep the microphone rate equal to the requested rate
FRAMES_PER_BUFFER = 800  # 50ms of audio (0.05s * 16000Hz)
CHANNELS = 1
FORMAT = pyaudio.paInt16

# Global handles for the audio stream and websocket; assigned at runtime.
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()  # To signal the audio thread to stop

# WAV recording variables
# NOTE(review): no code visible in this file reads recorded_frames or writes
# a WAV file -- confirm whether these are still needed.
recorded_frames = []  # Store audio frames for WAV file
recording_lock = threading.Lock()  # Thread-safe access to recorded_frames
36
37# --- Function to Translate Text with LLM Gateway ---
38
def translate_text(text):
    """Translate a final transcript into Spanish using LLM Gateway.

    Args:
        text: The transcript text to translate.

    Returns:
        The content of the first completion choice, or an empty string
        when ``text`` is empty or contains only whitespace.

    Raises:
        requests.HTTPError: If LLM Gateway answers with an error status.
        requests.Timeout: If no response arrives within the timeout.
    """
    # Nothing to translate: skip the network round trip entirely.
    if not text or not text.strip():
        return ""

    headers = {
        "authorization": YOUR_API_KEY
    }

    llm_gateway_data = {
        "model": "gemini-2.5-flash-lite",
        "prompt": f"Translate the following text into Spanish. Do not write a preamble. Just return the translated text.\n\nText: {text}",
        "max_tokens": 1000
    }

    # Bound the wait: this is called from the WebSocket message handler, so a
    # request that never returns would stall handling of later messages.
    result = requests.post(
        "https://llm-gateway.assemblyai.com/v1/chat/completions",
        headers=headers,
        json=llm_gateway_data,
        timeout=30,
    )
    # Report HTTP failures (bad key, rate limit, ...) as such instead of as a
    # KeyError on the missing "choices" field below.
    result.raise_for_status()
    return result.json()["choices"][0]["message"]["content"]
57
58# --- WebSocket Event Handlers ---
59
def on_open(ws):
    """Called when the WebSocket connection is established.

    Starts a daemon thread that reads microphone audio and sends it to the
    server until ``stop_event`` is set or a read/send error occurs.

    Args:
        ws: The WebSocket connection the audio is sent over.
    """
    global audio_thread

    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    def stream_audio():
        """Forward microphone chunks to the server as binary messages."""
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)

                # Chunks are forwarded and then dropped. They are not
                # accumulated in memory, because nothing in this script
                # consumes a recording and the buffer would grow for the
                # whole session.
                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                print(f"Error streaming audio: {e}")
                # If stream read fails, likely means it's closed, stop the loop
                break
        print("Audio streaming stopped.")

    # Start sending audio data in a separate thread. Daemon: allow the main
    # thread to exit even if this thread is still running.
    audio_thread = threading.Thread(target=stream_audio, daemon=True)
    audio_thread.start()
91
def on_message(ws, message):
    """Handle one JSON message received from the streaming server.

    Dispatches on the message's ``type`` field:

    * ``Begin``: prints the session id and expiry time.
    * ``Turn``: when ``turn_is_formatted`` is truthy, clears the current
      terminal line and prints the translated transcript.
    * ``Termination``: prints the audio and session durations.

    Any other type is ignored. Errors are printed rather than raised.

    Args:
        ws: The WebSocket connection (unused).
        message: The raw JSON text of the message.
    """
    try:
        payload = json.loads(message)
        kind = payload.get('type')

        if kind == "Begin":
            began_id = payload.get('id')
            expiry = payload.get('expires_at')
            print(f"\nSession began: ID={began_id}, ExpiresAt={datetime.fromtimestamp(expiry)}")
        elif kind == "Turn":
            text = payload.get('transcript', '')
            if payload.get('turn_is_formatted', False):
                # Overwrite the current terminal line with 80 spaces and
                # return the cursor to column 0 before printing.
                print("\r" + " " * 80 + "\r", end="")
                print(translate_text(text))
        elif kind == "Termination":
            audio_secs = payload.get('audio_duration_seconds', 0)
            session_secs = payload.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio Duration={audio_secs}s, Session Duration={session_secs}s")

    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")
118
def on_error(ws, error):
    """Report a WebSocket error and ask the audio thread to stop.

    Args:
        ws: The WebSocket connection (unused).
        error: The error reported by the WebSocket client.
    """
    print(f"\nWebSocket Error: {error}")
    # Setting the event makes the audio loop exit at its next check.
    stop_event.set()
124
def on_close(ws, close_status_code, close_msg):
    """Called when the WebSocket connection is closed.

    Stops the audio thread, then releases the microphone stream and the
    PyAudio instance and resets the corresponding globals to ``None``.

    Args:
        ws: The WebSocket connection (unused).
        close_status_code: Status code reported for the close.
        close_msg: Message reported for the close.
    """
    global stream, audio

    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    stop_event.set()  # Signal audio thread just in case it's still running

    # Wait for the audio thread BEFORE tearing the stream down, so it is not
    # still inside stream.read() when the stream is stopped and closed. The
    # timeout keeps shutdown bounded if the thread does not exit.
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)

    # Ensure audio resources are released
    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
144
145# --- Main Execution ---
146
def run():
    """Stream microphone audio to AssemblyAI and print translated transcripts.

    Opens the microphone, connects the WebSocket on a background thread, and
    blocks until that thread exits or Ctrl+C is pressed. On Ctrl+C a
    Terminate message is sent before the connection is closed. Audio
    resources are released before returning.
    """
    global audio, stream, ws_app

    # Initialize PyAudio
    audio = pyaudio.PyAudio()

    # Open microphone stream
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("Microphone stream opened successfully.")
        print("Speak into your microphone. Press Ctrl+C to stop.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        if audio:
            audio.terminate()
        return  # Exit if microphone cannot be opened

    # Create WebSocketApp
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run WebSocketApp in a separate thread to allow main thread to catch KeyboardInterrupt
    ws_thread = threading.Thread(target=ws_app.run_forever, daemon=True)
    ws_thread.start()

    try:
        # Keep main thread alive until interrupted
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()  # Signal audio thread to stop

        # Send termination message to the server
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                print(f"Sending termination message: {json.dumps(terminate_message)}")
                ws_app.send(json.dumps(terminate_message))
                # Give remaining messages up to 5 seconds to be processed
                # before the forceful close. Waiting on the thread (rather
                # than sleeping) returns as soon as the WebSocket thread exits.
                ws_thread.join(timeout=5)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        # Close the WebSocket connection (will trigger on_close)
        if ws_app:
            ws_app.close()

        # Wait for WebSocket thread to finish
        ws_thread.join(timeout=2.0)

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    finally:
        # Final cleanup (already handled in on_close, but good as a fallback)
        if stream and stream.is_active():
            stream.stop_stream()
        if stream:
            stream.close()
        if audio:
            audio.terminate()
        print("Cleanup complete. Exiting.")


if __name__ == "__main__":
    run()

Step-by-Step Instructions

Before we begin, make sure you have an AssemblyAI account and an API key. You can sign up and get your API key from your dashboard.

Import Packages & Set API Key

1import pyaudio
2import websocket
3import json
4import threading
5import time
6import requests
7from urllib.parse import urlencode
8from datetime import datetime
9
# API key sent as the authorization header on every request in this guide.
YOUR_API_KEY = "YOUR_API_KEY"  # Replace with your actual API key

Audio Configuration & Global Variables

Set all of your audio configurations and global variables. Make sure that you have the parameter format_turns set to True.

# Query parameters encoded into the streaming WebSocket URL.
CONNECTION_PARAMS = dict(
    sample_rate=16000,
    format_turns=True,  # Request formatted final transcripts
)
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = API_ENDPOINT_BASE_URL + "?" + urlencode(CONNECTION_PARAMS)

# Audio configuration
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]  # Keep the microphone rate equal to the requested rate
FRAMES_PER_BUFFER = 800  # 50ms of audio (0.05s * 16000Hz)
CHANNELS = 1
FORMAT = pyaudio.paInt16

# Global handles for the audio stream and websocket; assigned at runtime.
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()  # To signal the audio thread to stop

# WAV recording variables
# NOTE(review): no code visible in this file reads recorded_frames or writes
# a WAV file -- confirm whether these are still needed.
recorded_frames = []  # Store audio frames for WAV file
recording_lock = threading.Lock()  # Thread-safe access to recorded_frames

Define Translate Text Function

Define a function called translate_text, which uses LLM Gateway to translate the English final transcripts into another language. This example translates the text into Spanish. To use a different language, replace “Spanish” in the prompt with your language of choice.

def translate_text(text):
    """Translate a final transcript into Spanish using LLM Gateway.

    Args:
        text: The transcript text to translate.

    Returns:
        The content of the first completion choice, or an empty string
        when ``text`` is empty or contains only whitespace.

    Raises:
        requests.HTTPError: If LLM Gateway answers with an error status.
        requests.Timeout: If no response arrives within the timeout.
    """
    # Nothing to translate: skip the network round trip entirely.
    if not text or not text.strip():
        return ""

    headers = {
        "authorization": YOUR_API_KEY
    }

    llm_gateway_data = {
        # Same model as the Quickstart listing, so both copies of this
        # function behave identically.
        "model": "gemini-2.5-flash-lite",
        "prompt": f"Translate the following text into Spanish. Do not write a preamble. Just return the translated text.\n\nText: {text}",
        "max_tokens": 1000
    }

    # Bound the wait: this is called from the WebSocket message handler, so a
    # request that never returns would stall handling of later messages.
    result = requests.post(
        "https://llm-gateway.assemblyai.com/v1/chat/completions",
        headers=headers,
        json=llm_gateway_data,
        timeout=30,
    )
    # Report HTTP failures (bad key, rate limit, ...) as such instead of as a
    # KeyError on the missing "choices" field below.
    result.raise_for_status()
    return result.json()["choices"][0]["message"]["content"]

Websocket Event Handlers

Open Websocket

def on_open(ws):
    """Called when the WebSocket connection is established.

    Starts a daemon thread that reads microphone audio and sends it to the
    server until ``stop_event`` is set or a read/send error occurs.

    Args:
        ws: The WebSocket connection the audio is sent over.
    """
    global audio_thread

    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    def stream_audio():
        """Forward microphone chunks to the server as binary messages."""
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)

                # Chunks are forwarded and then dropped. They are not
                # accumulated in memory, because nothing in this script
                # consumes a recording and the buffer would grow for the
                # whole session.
                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                print(f"Error streaming audio: {e}")
                # If stream read fails, likely means it's closed, stop the loop
                break
        print("Audio streaming stopped.")

    # Start sending audio data in a separate thread. Daemon: allow the main
    # thread to exit even if this thread is still running.
    audio_thread = threading.Thread(target=stream_audio, daemon=True)
    audio_thread.start()

Handle Websocket Messages

In this function, call the previously defined translate_text on each formatted final transcript (a Turn message whose turn_is_formatted field is true) and print the result.

def on_message(ws, message):
    """Handle one JSON message received from the streaming server.

    Dispatches on the message's ``type`` field:

    * ``Begin``: prints the session id and expiry time.
    * ``Turn``: when ``turn_is_formatted`` is truthy, clears the current
      terminal line and prints the translated transcript.
    * ``Termination``: prints the audio and session durations.

    Any other type is ignored. Errors are printed rather than raised.

    Args:
        ws: The WebSocket connection (unused).
        message: The raw JSON text of the message.
    """
    try:
        payload = json.loads(message)
        kind = payload.get('type')

        if kind == "Begin":
            began_id = payload.get('id')
            expiry = payload.get('expires_at')
            print(f"\nSession began: ID={began_id}, ExpiresAt={datetime.fromtimestamp(expiry)}")
        elif kind == "Turn":
            text = payload.get('transcript', '')
            if payload.get('turn_is_formatted', False):
                # Overwrite the current terminal line with 80 spaces and
                # return the cursor to column 0 before printing.
                print("\r" + " " * 80 + "\r", end="")
                print(translate_text(text))
        elif kind == "Termination":
            audio_secs = payload.get('audio_duration_seconds', 0)
            session_secs = payload.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio Duration={audio_secs}s, Session Duration={session_secs}s")

    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")

Close Websocket

def on_close(ws, close_status_code, close_msg):
    """Called when the WebSocket connection is closed.

    Stops the audio thread, then releases the microphone stream and the
    PyAudio instance and resets the corresponding globals to ``None``.

    Args:
        ws: The WebSocket connection (unused).
        close_status_code: Status code reported for the close.
        close_msg: Message reported for the close.
    """
    global stream, audio

    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    stop_event.set()  # Signal audio thread just in case it's still running

    # Wait for the audio thread BEFORE tearing the stream down, so it is not
    # still inside stream.read() when the stream is stopped and closed. The
    # timeout keeps shutdown bounded if the thread does not exit.
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)

    # Ensure audio resources are released
    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None

Websocket Error Handling

def on_error(ws, error):
    """Report a WebSocket error and ask the audio thread to stop.

    Args:
        ws: The WebSocket connection (unused).
        error: The error reported by the WebSocket client.
    """
    print(f"\nWebSocket Error: {error}")
    # Setting the event makes the audio loop exit at its next check.
    stop_event.set()

Begin Streaming STT Transcription

def run():
    """Stream microphone audio to AssemblyAI and print translated transcripts.

    Opens the microphone, connects the WebSocket on a background thread, and
    blocks until that thread exits or Ctrl+C is pressed. On Ctrl+C a
    Terminate message is sent before the connection is closed. Audio
    resources are released before returning.
    """
    global audio, stream, ws_app

    # Initialize PyAudio
    audio = pyaudio.PyAudio()

    # Open microphone stream
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("Microphone stream opened successfully.")
        print("Speak into your microphone. Press Ctrl+C to stop.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        if audio:
            audio.terminate()
        return  # Exit if microphone cannot be opened

    # Create WebSocketApp
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run WebSocketApp in a separate thread to allow main thread to catch KeyboardInterrupt
    ws_thread = threading.Thread(target=ws_app.run_forever, daemon=True)
    ws_thread.start()

    try:
        # Keep main thread alive until interrupted
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()  # Signal audio thread to stop

        # Send termination message to the server
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                print(f"Sending termination message: {json.dumps(terminate_message)}")
                ws_app.send(json.dumps(terminate_message))
                # Give remaining messages up to 5 seconds to be processed
                # before the forceful close. Waiting on the thread (rather
                # than sleeping) returns as soon as the WebSocket thread exits.
                ws_thread.join(timeout=5)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        # Close the WebSocket connection (will trigger on_close)
        if ws_app:
            ws_app.close()

        # Wait for WebSocket thread to finish
        ws_thread.join(timeout=2.0)

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    finally:
        # Final cleanup (already handled in on_close, but good as a fallback)
        if stream and stream.is_active():
            stream.stop_stream()
        if stream:
            stream.close()
        if audio:
            audio.terminate()
        print("Cleanup complete. Exiting.")


if __name__ == "__main__":
    run()