Use LeMUR with Streaming Speech-to-Text (STT)
This script is modified to contain a global variable conversation_data
that accumulates the transcribed text in the on_message
function. Once the transcription session is closed, the conversation_data
is sent to LeMUR for analysis using LeMUR’s input_text
parameter.
Quickstart
"""Use LeMUR with Streaming Speech-to-Text: stream microphone audio to
AssemblyAI, accumulate formatted final transcripts in ``conversation_data``,
and analyze the full conversation with LeMUR once the session ends."""

import pyaudio
import websocket
import json
import threading
import time
import requests
from urllib.parse import urlencode
from datetime import datetime

# --- Configuration ---
YOUR_API_KEY = "YOUR_API_KEY"  # Replace with your actual API key

CONNECTION_PARAMS = {
    "sample_rate": 16000,
    "format_turns": True,  # Request formatted final transcripts
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

# Audio Configuration
FRAMES_PER_BUFFER = 800  # 50ms of audio (0.05s * 16000Hz)
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHANNELS = 1
FORMAT = pyaudio.paInt16

# Global variables for audio stream and websocket
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()  # To signal the audio thread to stop
conversation_data = ""  # Accumulates formatted final transcripts

# WAV recording variables
recorded_frames = []  # Store audio frames for WAV file
recording_lock = threading.Lock()  # Thread-safe access to recorded_frames

# --- Function to Analyze Text with LeMUR ---

def analyze_with_lemur(text):
    """Send the accumulated transcript to LeMUR and return its analysis.

    Called when the WebSocket connection is closing.

    Raises:
        requests.HTTPError: if the LeMUR request fails.
    """
    headers = {
        "authorization": YOUR_API_KEY
    }

    prompt = "You are a helpful coach. Provide an analysis of the transcript and offer areas to improve with exact quotes. Include no preamble. Start with an overall summary then get into the examples with feedback."

    lemur_data = {
        "prompt": prompt,
        "input_text": text,
        "final_model": "anthropic/claude-3-7-sonnet-20250219",
    }
    # A timeout prevents hanging forever, and raise_for_status() surfaces
    # API errors instead of a confusing KeyError on a missing "response" key.
    result = requests.post(
        "https://api.assemblyai.com/lemur/v3/generate/task",
        headers=headers,
        json=lemur_data,
        timeout=60,
    )
    result.raise_for_status()
    return result.json()["response"]

# --- Function to Save Recorded Audio ---

def save_wav_file():
    """Write the frames captured in ``recorded_frames`` to a timestamped WAV file.

    Fix: the original script collected frames and promised a WAV file at
    startup but never actually wrote one.
    """
    import wave  # Local import so the snippet stands alone

    with recording_lock:
        frames = list(recorded_frames)
    if not frames:
        print("No audio recorded; skipping WAV file.")
        return
    filename = f"recorded_audio_{datetime.now().strftime('%Y%m%d_%H%M%S')}.wav"
    with wave.open(filename, "wb") as wf:
        wf.setnchannels(CHANNELS)
        wf.setsampwidth(pyaudio.get_sample_size(FORMAT))
        wf.setframerate(SAMPLE_RATE)
        wf.writeframes(b"".join(frames))
    print(f"Audio saved to: {filename}")

# --- WebSocket Event Handlers ---

def on_open(ws):
    """Called when the WebSocket connection is established."""
    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    # Start sending audio data in a separate thread
    def stream_audio():
        global stream
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)

                # Store audio data for WAV recording
                with recording_lock:
                    recorded_frames.append(audio_data)

                # Send audio data as binary message
                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                print(f"Error streaming audio: {e}")
                # If stream read fails, likely means it's closed, stop the loop
                break
        print("Audio streaming stopped.")

    global audio_thread
    audio_thread = threading.Thread(target=stream_audio)
    audio_thread.daemon = True  # Allow main thread to exit even if this thread is running
    audio_thread.start()

def on_message(ws, message):
    """Handle a server message; accumulate formatted final transcripts."""
    try:
        data = json.loads(message)
        msg_type = data.get('type')

        if msg_type == "Begin":
            session_id = data.get('id')
            expires_at = data.get('expires_at')
            print(f"\nSession began: ID={session_id}, ExpiresAt={datetime.fromtimestamp(expires_at)}")
        elif msg_type == "Turn":
            transcript = data.get('transcript', '')
            formatted = data.get('turn_is_formatted', False)

            if formatted:
                global conversation_data

                # Clear any partial line, then print the finalized turn.
                print('\r' + ' ' * 80 + '\r', end='')
                print(transcript)
                conversation_data += f"{transcript}\n"

        elif msg_type == "Termination":
            audio_duration = data.get('audio_duration_seconds', 0)
            session_duration = data.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio Duration={audio_duration}s, Session Duration={session_duration}s")
    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")

def on_error(ws, error):
    """Called when a WebSocket error occurs."""
    print(f"\nWebSocket Error: {error}")
    # Attempt to signal stop on error
    stop_event.set()

def on_close(ws, close_status_code, close_msg):
    """Called when the WebSocket connection is closed."""
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    # Ensure audio resources are released
    global stream, audio
    stop_event.set()  # Signal audio thread just in case it's still running

    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
    # Try to join the audio thread to ensure clean exit
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)

# --- Main Execution ---

def run():
    """Open the microphone, stream to AssemblyAI, then analyze with LeMUR."""
    global audio, stream, ws_app

    # Initialize PyAudio
    audio = pyaudio.PyAudio()

    # Open microphone stream
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("Microphone stream opened successfully.")
        print("Speak into your microphone. Press Ctrl+C to stop.")
        print("Audio will be saved to a WAV file when the session ends.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        if audio:
            audio.terminate()
        return  # Exit if microphone cannot be opened

    # Create WebSocketApp
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run WebSocketApp in a separate thread to allow main thread to catch KeyboardInterrupt
    ws_thread = threading.Thread(target=ws_app.run_forever)
    ws_thread.daemon = True
    ws_thread.start()

    try:
        # Keep main thread alive until interrupted
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()  # Signal audio thread to stop

        # Send termination message to the server
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                print(f"Sending termination message: {json.dumps(terminate_message)}")
                ws_app.send(json.dumps(terminate_message))
                # Give a moment for messages to process before forceful close
                time.sleep(5)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        # Close the WebSocket connection (will trigger on_close)
        if ws_app:
            ws_app.close()

        # Wait for WebSocket thread to finish
        ws_thread.join(timeout=2.0)

        # Analyze transcript with LeMUR
        if conversation_data.strip():
            print("Analyzing conversation with LeMUR...")
            print(analyze_with_lemur(conversation_data))
        else:
            print("No conversation data to analyze.")

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    finally:
        # Final cleanup (already handled in on_close, but good as a fallback)
        if stream and stream.is_active():
            stream.stop_stream()
        if stream:
            stream.close()
        if audio:
            audio.terminate()
        # Persist the captured audio, as promised at startup.
        save_wav_file()
        print("Cleanup complete. Exiting.")


if __name__ == "__main__":
    run()
Step-by-Step Instructions
Before we begin, make sure you have an AssemblyAI account and an API key. You can sign up and get your API key from your dashboard.
Import Packages & Set API Key
1 import pyaudio 2 import websocket 3 import json 4 import threading 5 import time 6 import requests 7 from urllib.parse import urlencode 8 from datetime import datetime 9 10 YOUR_API_KEY = "YOUR_API_KEY" # Replace with your actual API key
Audio Configuration & Global Variables
Set all of your audio configurations and global variables. Make sure that you have the parameter format_turns
set to True
and a global variable conversation_data
set to an empty string ""
.
1 CONNECTION_PARAMS = { 2 "sample_rate": 16000, 3 "format_turns": True, # Request formatted final transcripts 4 } 5 API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws" 6 API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}" 7 8 # Audio Configuration 9 FRAMES_PER_BUFFER = 800 # 50ms of audio (0.05s * 16000Hz) 10 SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"] 11 CHANNELS = 1 12 FORMAT = pyaudio.paInt16 13 14 # Global variables for audio stream and websocket 15 audio = None 16 stream = None 17 ws_app = None 18 audio_thread = None 19 stop_event = threading.Event() # To signal the audio thread to stop 20 conversation_data = "" 21 22 # WAV recording variables 23 recorded_frames = [] # Store audio frames for WAV file 24 recording_lock = threading.Lock() # Thread-safe access to recorded_frames
Define Analyze With LeMUR Function
Define a function called analyze_with_lemur
, which uses LeMUR to analyze the complete final transcript text. The prompt
can be modified to suit your individual requirements.
1 def analyze_with_lemur(text): 2 """Called when the WebSocket connection is closing and the transcript text is sent to LeMUR to be analyzed.""" 3 headers = { 4 "authorization": YOUR_API_KEY 5 } 6 7 prompt = "You are a helpful coach. Provide an analysis of the transcript and offer areas to improve with exact quotes. Include no preamble. Start with an overall summary then get into the examples with feedback." 8 9 lemur_data = { 10 "prompt": prompt, 11 "input_text": text, 12 "final_model": "anthropic/claude-3-7-sonnet-20250219", 13 } 14 result = requests.post("https://api.assemblyai.com/lemur/v3/generate/task", headers=headers, json=lemur_data) 15 return result.json()["response"]
Websocket Event Handlers
Open Websocket
1 def on_open(ws): 2 """Called when the WebSocket connection is established.""" 3 print("WebSocket connection opened.") 4 print(f"Connected to: {API_ENDPOINT}") 5 6 # Start sending audio data in a separate thread 7 def stream_audio(): 8 global stream 9 print("Starting audio streaming...") 10 while not stop_event.is_set(): 11 try: 12 audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False) 13 14 # Store audio data for WAV recording 15 with recording_lock: 16 recorded_frames.append(audio_data) 17 18 # Send audio data as binary message 19 ws.send(audio_data, websocket.ABNF.OPCODE_BINARY) 20 except Exception as e: 21 print(f"Error streaming audio: {e}") 22 # If stream read fails, likely means it's closed, stop the loop 23 break 24 print("Audio streaming stopped.") 25 26 global audio_thread 27 audio_thread = threading.Thread(target=stream_audio) 28 audio_thread.daemon = ( 29 True # Allow main thread to exit even if this thread is running 30 ) 31 audio_thread.start()
Handle Websocket Messages
In this function, use the previously defined conversation_data
to store all final transcripts together for later analysis.
1 def on_message(ws, message): 2 try: 3 data = json.loads(message) 4 msg_type = data.get('type') 5 6 if msg_type == "Begin": 7 session_id = data.get('id') 8 expires_at = data.get('expires_at') 9 print(f"\nSession began: ID={session_id}, ExpiresAt={datetime.fromtimestamp(expires_at)}") 10 elif msg_type == "Turn": 11 transcript = data.get('transcript', '') 12 formatted = data.get('turn_is_formatted', False) 13 14 if formatted: 15 global conversation_data 16 17 print('\r' + ' ' * 80 + '\r', end='') 18 print(transcript) 19 conversation_data += f"{transcript}\n" 20 21 elif msg_type == "Termination": 22 audio_duration = data.get('audio_duration_seconds', 0) 23 session_duration = data.get('session_duration_seconds', 0) 24 print(f"\nSession Terminated: Audio Duration={audio_duration}s, Session Duration={session_duration}s") 25 26 except json.JSONDecodeError as e: 27 print(f"Error decoding message: {e}") 28 except Exception as e: 29 print(f"Error handling message: {e}")
Close Websocket
1 def on_close(ws, close_status_code, close_msg): 2 """Called when the WebSocket connection is closed.""" 3 print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}") 4 5 # Ensure audio resources are released 6 global stream, audio 7 stop_event.set() # Signal audio thread just in case it's still running 8 9 if stream: 10 if stream.is_active(): 11 stream.stop_stream() 12 stream.close() 13 stream = None 14 if audio: 15 audio.terminate() 16 audio = None 17 # Try to join the audio thread to ensure clean exit 18 if audio_thread and audio_thread.is_alive(): 19 audio_thread.join(timeout=1.0)
Websocket Error Handling
1 def on_error(ws, error): 2 """Called when a WebSocket error occurs.""" 3 print(f"\nWebSocket Error: {error}") 4 # Attempt to signal stop on error 5 stop_event.set()
Begin Streaming STT Transcription
After the socket is closed, conversation_data
is sent to the analyze_with_lemur
function and the LeMUR results are printed out.
def save_wav_file():
    """Write the frames captured in ``recorded_frames`` to a timestamped WAV file.

    Fix: the original collected frames under ``recording_lock`` and promised
    a WAV file at startup but never actually wrote one.
    """
    import wave  # Local import so this snippet stands alone

    with recording_lock:
        frames = list(recorded_frames)
    if not frames:
        print("No audio recorded; skipping WAV file.")
        return
    filename = f"recorded_audio_{datetime.now().strftime('%Y%m%d_%H%M%S')}.wav"
    with wave.open(filename, "wb") as wf:
        wf.setnchannels(CHANNELS)
        wf.setsampwidth(pyaudio.get_sample_size(FORMAT))
        wf.setframerate(SAMPLE_RATE)
        wf.writeframes(b"".join(frames))
    print(f"Audio saved to: {filename}")


def run():
    """Open the microphone, stream to AssemblyAI, then analyze with LeMUR."""
    global audio, stream, ws_app

    # Initialize PyAudio
    audio = pyaudio.PyAudio()

    # Open microphone stream
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("Microphone stream opened successfully.")
        print("Speak into your microphone. Press Ctrl+C to stop.")
        print("Audio will be saved to a WAV file when the session ends.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        if audio:
            audio.terminate()
        return  # Exit if microphone cannot be opened

    # Create WebSocketApp
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run WebSocketApp in a separate thread to allow main thread to catch KeyboardInterrupt
    ws_thread = threading.Thread(target=ws_app.run_forever)
    ws_thread.daemon = True
    ws_thread.start()

    try:
        # Keep main thread alive until interrupted
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()  # Signal audio thread to stop

        # Send termination message to the server
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                print(f"Sending termination message: {json.dumps(terminate_message)}")
                ws_app.send(json.dumps(terminate_message))
                # Give a moment for messages to process before forceful close
                time.sleep(5)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        # Close the WebSocket connection (will trigger on_close)
        if ws_app:
            ws_app.close()

        # Wait for WebSocket thread to finish
        ws_thread.join(timeout=2.0)

        # Analyze transcript with LeMUR
        if conversation_data.strip():
            print("Analyzing conversation with LeMUR...")
            print(analyze_with_lemur(conversation_data))
        else:
            print("No conversation data to analyze.")

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    finally:
        # Final cleanup (already handled in on_close, but good as a fallback)
        if stream and stream.is_active():
            stream.stop_stream()
        if stream:
            stream.close()
        if audio:
            audio.terminate()
        # Persist the captured audio, as promised at startup.
        save_wav_file()
        print("Cleanup complete. Exiting.")


if __name__ == "__main__":
    run()