Translate Streaming STT Transcripts with LeMUR

In this guide, you’ll learn how to translate final transcripts in real time using AssemblyAI’s Streaming Speech-to-Text model and the LeMUR framework.

Quickstart

import pyaudio
import websocket
import json
import threading
import time
import wave
import requests
from urllib.parse import urlencode
from datetime import datetime

# --- Configuration ---
YOUR_API_KEY = "YOUR_API_KEY"  # Replace with your actual API key

CONNECTION_PARAMS = {
    "sample_rate": 16000,
    "format_turns": True,  # Request formatted final transcripts
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

# Audio Configuration
FRAMES_PER_BUFFER = 800  # 50ms of audio (0.05s * 16000Hz)
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHANNELS = 1
FORMAT = pyaudio.paInt16

# Global variables for audio stream and websocket
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()  # To signal the audio thread to stop

# WAV recording variables
recorded_frames = []  # Store audio frames for WAV file
recording_lock = threading.Lock()  # Thread-safe access to recorded_frames

# --- Function to Translate Text with LeMUR ---

def translate_text(text):
    """Called when translating final transcripts."""
    headers = {
        "authorization": YOUR_API_KEY
    }

    prompt = "Translate the following text into Spanish. Do not write a preamble. Just return the translated text."

    lemur_data = {
        "prompt": prompt,
        "input_text": text,
        "final_model": "anthropic/claude-3-7-sonnet-20250219",
    }
    result = requests.post("https://api.assemblyai.com/lemur/v3/generate/task", headers=headers, json=lemur_data)
    return result.json()["response"]

# --- WebSocket Event Handlers ---

def on_open(ws):
    """Called when the WebSocket connection is established."""
    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    # Start sending audio data in a separate thread
    def stream_audio():
        global stream
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)

                # Store audio data for WAV recording
                with recording_lock:
                    recorded_frames.append(audio_data)

                # Send audio data as binary message
                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                print(f"Error streaming audio: {e}")
                # If stream read fails, likely means it's closed, stop the loop
                break
        print("Audio streaming stopped.")

    global audio_thread
    audio_thread = threading.Thread(target=stream_audio)
    audio_thread.daemon = (
        True  # Allow main thread to exit even if this thread is running
    )
    audio_thread.start()

def on_message(ws, message):
    try:
        data = json.loads(message)
        msg_type = data.get('type')

        if msg_type == "Begin":
            session_id = data.get('id')
            expires_at = data.get('expires_at')
            print(f"\nSession began: ID={session_id}, ExpiresAt={datetime.fromtimestamp(expires_at)}")
        elif msg_type == "Turn":
            transcript = data.get('transcript', '')
            formatted = data.get('turn_is_formatted', False)

            if formatted:
                print('\r' + ' ' * 80 + '\r', end='')
                print(translate_text(transcript))

        elif msg_type == "Termination":
            audio_duration = data.get('audio_duration_seconds', 0)
            session_duration = data.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio Duration={audio_duration}s, Session Duration={session_duration}s")

    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")

def on_error(ws, error):
    """Called when a WebSocket error occurs."""
    print(f"\nWebSocket Error: {error}")
    # Attempt to signal stop on error
    stop_event.set()

def on_close(ws, close_status_code, close_msg):
    """Called when the WebSocket connection is closed."""
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    # Ensure audio resources are released
    global stream, audio
    stop_event.set()  # Signal audio thread just in case it's still running

    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
    # Try to join the audio thread to ensure clean exit
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)

# --- Main Execution ---

def run():
    global audio, stream, ws_app

    # Initialize PyAudio
    audio = pyaudio.PyAudio()

    # Open microphone stream
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("Microphone stream opened successfully.")
        print("Speak into your microphone. Press Ctrl+C to stop.")
        print("Audio will be saved to a WAV file when the session ends.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        if audio:
            audio.terminate()
        return  # Exit if microphone cannot be opened

    # Create WebSocketApp
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run WebSocketApp in a separate thread to allow main thread to catch KeyboardInterrupt
    ws_thread = threading.Thread(target=ws_app.run_forever)
    ws_thread.daemon = True
    ws_thread.start()

    try:
        # Keep main thread alive until interrupted
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()  # Signal audio thread to stop

        # Send termination message to the server
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                print(f"Sending termination message: {json.dumps(terminate_message)}")
                ws_app.send(json.dumps(terminate_message))
                # Give a moment for messages to process before forceful close
                time.sleep(5)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        # Close the WebSocket connection (will trigger on_close)
        if ws_app:
            ws_app.close()

        # Wait for WebSocket thread to finish
        ws_thread.join(timeout=2.0)

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    finally:
        # Final cleanup (already handled in on_close, but good as a fallback)
        if stream and stream.is_active():
            stream.stop_stream()
        if stream:
            stream.close()
        if audio:
            audio.terminate()
        print("Cleanup complete. Exiting.")

if __name__ == "__main__":
    run()
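
Before running the Quickstart, install the third-party dependencies with pip install pyaudio websocket-client requests and replace YOUR_API_KEY with your own key. The websocket import comes from the websocket-client package; the remaining imports are part of the Python standard library.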

Step-by-Step Instructions

Before we begin, make sure you have an AssemblyAI account and an API key. You can sign up and get your API key from your dashboard.

Import Packages & Set API Key

import pyaudio
import websocket
import json
import threading
import time
import requests
from urllib.parse import urlencode
from datetime import datetime

YOUR_API_KEY = "YOUR_API_KEY"  # Replace with your actual API key
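
If you prefer not to hardcode the key, you can read it from an environment variable instead. This is an optional sketch; the ASSEMBLYAI_API_KEY variable name is just a convention used here, not something the API requires.

import os

# Optional: read the key from the environment instead of hardcoding it.
# ASSEMBLYAI_API_KEY is an example variable name, not an API requirement.
YOUR_API_KEY = os.environ.get("ASSEMBLYAI_API_KEY", "YOUR_API_KEY")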

Audio Configuration & Global Variables

Set your audio configuration and global variables. Make sure the format_turns parameter is set to True so that the API returns formatted final transcripts.

CONNECTION_PARAMS = {
    "sample_rate": 16000,
    "format_turns": True,  # Request formatted final transcripts
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

# Audio Configuration
FRAMES_PER_BUFFER = 800  # 50ms of audio (0.05s * 16000Hz)
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHANNELS = 1
FORMAT = pyaudio.paInt16

# Global variables for audio stream and websocket
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()  # To signal the audio thread to stop

# WAV recording variables
recorded_frames = []  # Store audio frames for WAV file
recording_lock = threading.Lock()  # Thread-safe access to recorded_frames
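
PyAudio records from your system’s default input device. If you are unsure which device that is, an optional helper like the sketch below (not part of the guide’s main script) lists every available input device; you can then pass input_device_index to audio.open() in the run() function to pick one explicitly.

def list_input_devices():
    """Print the index and name of every audio input device PyAudio can see."""
    pa = pyaudio.PyAudio()
    for i in range(pa.get_device_count()):
        info = pa.get_device_info_by_index(i)
        if info.get("maxInputChannels", 0) > 0:
            print(f"{i}: {info['name']}")
    pa.terminate()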

Define Translate Text Function

Define a function called translate_text, which uses LeMUR to translate the English final transcripts into another language. This example translates the text into Spanish. To translate into a different language, replace “Spanish” in the prompt with your language of choice.

def translate_text(text):
    """Called when translating final transcripts."""
    headers = {
        "authorization": YOUR_API_KEY
    }

    prompt = "Translate the following text into Spanish. Do not write a preamble. Just return the translated text."

    lemur_data = {
        "prompt": prompt,
        "input_text": text,
        "final_model": "anthropic/claude-3-7-sonnet-20250219",
    }
    result = requests.post("https://api.assemblyai.com/lemur/v3/generate/task", headers=headers, json=lemur_data)
    return result.json()["response"]
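
Note that each formatted turn triggers a synchronous HTTP request to LeMUR, so the translation prints shortly after the turn ends rather than word by word. The function above also assumes the request always succeeds. If you would rather keep the stream running when a translation request fails (for example, a rate limit or an invalid key), a more defensive variant, sketched below and not part of the original guide, falls back to the untranslated text:

def translate_text_safe(text):
    """Like translate_text, but falls back to the original text if LeMUR errors out."""
    headers = {"authorization": YOUR_API_KEY}
    lemur_data = {
        "prompt": "Translate the following text into Spanish. Do not write a preamble. Just return the translated text.",
        "input_text": text,
        "final_model": "anthropic/claude-3-7-sonnet-20250219",
    }
    try:
        result = requests.post(
            "https://api.assemblyai.com/lemur/v3/generate/task",
            headers=headers,
            json=lemur_data,
            timeout=30,
        )
        result.raise_for_status()
        return result.json()["response"]
    except Exception as e:
        print(f"Translation failed, showing original text: {e}")
        return text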

WebSocket Event Handlers

Open WebSocket

def on_open(ws):
    """Called when the WebSocket connection is established."""
    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    # Start sending audio data in a separate thread
    def stream_audio():
        global stream
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)

                # Store audio data for WAV recording
                with recording_lock:
                    recorded_frames.append(audio_data)

                # Send audio data as binary message
                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                print(f"Error streaming audio: {e}")
                # If stream read fails, likely means it's closed, stop the loop
                break
        print("Audio streaming stopped.")

    global audio_thread
    audio_thread = threading.Thread(target=stream_audio)
    audio_thread.daemon = (
        True  # Allow main thread to exit even if this thread is running
    )
    audio_thread.start()

Handle WebSocket Messages

In this function, use the previously defined translate_text function to translate each formatted final transcript.

def on_message(ws, message):
    try:
        data = json.loads(message)
        msg_type = data.get('type')

        if msg_type == "Begin":
            session_id = data.get('id')
            expires_at = data.get('expires_at')
            print(f"\nSession began: ID={session_id}, ExpiresAt={datetime.fromtimestamp(expires_at)}")
        elif msg_type == "Turn":
            transcript = data.get('transcript', '')
            formatted = data.get('turn_is_formatted', False)

            if formatted:
                print('\r' + ' ' * 80 + '\r', end='')
                print(translate_text(transcript))

        elif msg_type == "Termination":
            audio_duration = data.get('audio_duration_seconds', 0)
            session_duration = data.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio Duration={audio_duration}s, Session Duration={session_duration}s")

    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")

Close WebSocket

def on_close(ws, close_status_code, close_msg):
    """Called when the WebSocket connection is closed."""
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    # Ensure audio resources are released
    global stream, audio
    stop_event.set()  # Signal audio thread just in case it's still running

    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
    # Try to join the audio thread to ensure clean exit
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)
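
The handlers buffer microphone audio in recorded_frames (and the Quickstart imports the wave module), but the code above stops short of writing the WAV file that the startup message promises. If you want that recording, a minimal sketch is shown below; it assumes 16-bit PCM as configured by FORMAT, uses a timestamped filename chosen purely as an example, and requires adding import wave to the step-by-step imports. Call it from on_close (or after run() returns).

def save_wav_file():
    """Write the buffered microphone audio to a timestamped WAV file."""
    filename = f"recorded_audio_{datetime.now().strftime('%Y%m%d_%H%M%S')}.wav"
    with recording_lock:
        frames = list(recorded_frames)
    if not frames:
        print("No audio captured, skipping WAV file.")
        return
    with wave.open(filename, "wb") as wf:
        wf.setnchannels(CHANNELS)
        wf.setsampwidth(2)  # paInt16 samples are 2 bytes wide
        wf.setframerate(SAMPLE_RATE)
        wf.writeframes(b"".join(frames))
    print(f"Audio saved to {filename}")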

WebSocket Error Handling

def on_error(ws, error):
    """Called when a WebSocket error occurs."""
    print(f"\nWebSocket Error: {error}")
    # Attempt to signal stop on error
    stop_event.set()

Begin Streaming STT Transcription

def run():
    global audio, stream, ws_app

    # Initialize PyAudio
    audio = pyaudio.PyAudio()

    # Open microphone stream
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("Microphone stream opened successfully.")
        print("Speak into your microphone. Press Ctrl+C to stop.")
        print("Audio will be saved to a WAV file when the session ends.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        if audio:
            audio.terminate()
        return  # Exit if microphone cannot be opened

    # Create WebSocketApp
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run WebSocketApp in a separate thread to allow main thread to catch KeyboardInterrupt
    ws_thread = threading.Thread(target=ws_app.run_forever)
    ws_thread.daemon = True
    ws_thread.start()

    try:
        # Keep main thread alive until interrupted
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()  # Signal audio thread to stop

        # Send termination message to the server
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                print(f"Sending termination message: {json.dumps(terminate_message)}")
                ws_app.send(json.dumps(terminate_message))
                # Give a moment for messages to process before forceful close
                time.sleep(5)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        # Close the WebSocket connection (will trigger on_close)
        if ws_app:
            ws_app.close()

        # Wait for WebSocket thread to finish
        ws_thread.join(timeout=2.0)

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    finally:
        # Final cleanup (already handled in on_close, but good as a fallback)
        if stream and stream.is_active():
            stream.stop_stream()
        if stream:
            stream.close()
        if audio:
            audio.terminate()
        print("Cleanup complete. Exiting.")

if __name__ == "__main__":
    run()