Keyterms prompting for Universal-Streaming

The keyterms prompting feature helps improve recognition accuracy for specific words and phrases that are important to your use case.

Keyterms prompting costs an additional $0.04/hour.

Quickstart

First, install the required dependencies:

$ pip install websocket-client pyaudio

Then run the following script, which streams microphone audio to the endpoint with your keyterms applied and prints transcripts as they arrive:
import pyaudio
import websocket
import json
import threading
import time
import wave
from urllib.parse import urlencode
from datetime import datetime

# --- Configuration ---
YOUR_API_KEY = "YOUR-API-KEY"  # Replace with your actual API key

CONNECTION_PARAMS = {
    "sample_rate": 16000,
    "format_turns": True,  # Request formatted final transcripts
    "keyterms_prompt": json.dumps(["Keanu Reeves", "AssemblyAI", "Universal-2"]),
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

# Audio Configuration
FRAMES_PER_BUFFER = 800  # 50ms of audio (0.05s * 16000Hz)
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHANNELS = 1
FORMAT = pyaudio.paInt16

# Global variables for audio stream and websocket
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()  # To signal the audio thread to stop

# WAV recording variables
recorded_frames = []  # Store audio frames for WAV file
recording_lock = threading.Lock()  # Thread-safe access to recorded_frames

# --- WebSocket Event Handlers ---


def on_open(ws):
    """Called when the WebSocket connection is established."""
    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    # Start sending audio data in a separate thread
    def stream_audio():
        global stream
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)

                # Store audio data for WAV recording
                with recording_lock:
                    recorded_frames.append(audio_data)

                # Send audio data as binary message
                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                print(f"Error streaming audio: {e}")
                # If stream read fails, likely means it's closed, stop the loop
                break
        print("Audio streaming stopped.")

    global audio_thread
    audio_thread = threading.Thread(target=stream_audio)
    audio_thread.daemon = True  # Allow main thread to exit even if this thread is running
    audio_thread.start()


def on_message(ws, message):
    try:
        data = json.loads(message)
        msg_type = data.get("type")

        if msg_type == "Begin":
            session_id = data.get("id")
            expires_at = data.get("expires_at")
            print(f"\nSession began: ID={session_id}, ExpiresAt={datetime.fromtimestamp(expires_at)}")
        elif msg_type == "Turn":
            transcript = data.get("transcript", "")
            formatted = data.get("turn_is_formatted", False)

            # Clear previous line for formatted messages
            if formatted:
                print("\r" + " " * 80 + "\r", end="")
                print(transcript)
            else:
                print(f"\r{transcript}", end="")
        elif msg_type == "Termination":
            audio_duration = data.get("audio_duration_seconds", 0)
            session_duration = data.get("session_duration_seconds", 0)
            print(f"\nSession Terminated: Audio Duration={audio_duration}s, Session Duration={session_duration}s")
    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")


def on_error(ws, error):
    """Called when a WebSocket error occurs."""
    print(f"\nWebSocket Error: {error}")
    # Attempt to signal stop on error
    stop_event.set()


def on_close(ws, close_status_code, close_msg):
    """Called when the WebSocket connection is closed."""
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    # Save recorded audio to WAV file
    save_wav_file()

    # Ensure audio resources are released
    global stream, audio
    stop_event.set()  # Signal audio thread just in case it's still running

    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
    # Try to join the audio thread to ensure clean exit
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)


def save_wav_file():
    """Save recorded audio frames to a WAV file."""
    if not recorded_frames:
        print("No audio data recorded.")
        return

    # Generate filename with timestamp
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"recorded_audio_{timestamp}.wav"

    try:
        with wave.open(filename, "wb") as wf:
            wf.setnchannels(CHANNELS)
            wf.setsampwidth(2)  # 16-bit = 2 bytes
            wf.setframerate(SAMPLE_RATE)

            # Write all recorded frames
            with recording_lock:
                wf.writeframes(b"".join(recorded_frames))

        print(f"Audio saved to: {filename}")
        print(f"Duration: {len(recorded_frames) * FRAMES_PER_BUFFER / SAMPLE_RATE:.2f} seconds")

    except Exception as e:
        print(f"Error saving WAV file: {e}")


# --- Main Execution ---
def run():
    global audio, stream, ws_app

    # Initialize PyAudio
    audio = pyaudio.PyAudio()

    # Open microphone stream
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("Microphone stream opened successfully.")
        print("Speak into your microphone. Press Ctrl+C to stop.")
        print("Audio will be saved to a WAV file when the session ends.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        if audio:
            audio.terminate()
        return  # Exit if microphone cannot be opened

    # Create WebSocketApp
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run WebSocketApp in a separate thread to allow main thread to catch KeyboardInterrupt
    ws_thread = threading.Thread(target=ws_app.run_forever)
    ws_thread.daemon = True
    ws_thread.start()

    try:
        # Keep main thread alive until interrupted
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()  # Signal audio thread to stop

        # Send termination message to the server
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                print(f"Sending termination message: {json.dumps(terminate_message)}")
                ws_app.send(json.dumps(terminate_message))
                # Give a moment for messages to process before forceful close
                time.sleep(5)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        # Close the WebSocket connection (will trigger on_close)
        if ws_app:
            ws_app.close()

        # Wait for WebSocket thread to finish
        ws_thread.join(timeout=2.0)

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    finally:
        # Final cleanup (already handled in on_close, but good as a fallback)
        if stream and stream.is_active():
            stream.stop_stream()
        if stream:
            stream.close()
        if audio:
            audio.terminate()
        print("Cleanup complete. Exiting.")


if __name__ == "__main__":
    run()

Configuration

To use keyterms prompting, pass your keyterms as a JSON-encoded list in the keyterms_prompt query parameter of the WebSocket URL (a minimal sketch follows the limits below).

  • You can include up to 100 keyterms per session.
  • Each keyterm must be 50 characters or fewer.
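
For example, here is a minimal sketch of building the connection URL, using the same endpoint and parameter names as the quickstart above:

import json
from urllib.parse import urlencode

# Keyterms travel as a JSON-encoded list inside the keyterms_prompt query parameter
keyterms = ["Keanu Reeves", "AssemblyAI", "Universal-2"]

params = {
    "sample_rate": 16000,
    "format_turns": True,  # Also enables turn-level boosting (see "How it works")
    "keyterms_prompt": json.dumps(keyterms),
}
url = f"wss://streaming.assemblyai.com/v3/ws?{urlencode(params)}"
print(url)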

How it works

Streaming keyterms prompting has two components that work together to improve accuracy for your terms.

Word-level boosting

The streaming model itself is biased during inference to be more accurate at identifying words from your keyterms list. This happens in real-time as words are emitted during the streaming process, providing immediate improvements to recognition accuracy. This component is enabled by default.

Turn-level boosting

After each turn is completed, an additional boosting pass analyzes the full transcript using your keyterms list. This post-processing step, similar to formatting, provides a second layer of accuracy improvement by examining the complete context of the turn. To enable this component, set format_turns to True.
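
In practice, you can observe the two stages in the Turn messages the quickstart already handles: interim transcripts arrive with turn_is_formatted set to False (word-level boosting only), and, with format_turns enabled, a final message arrives with turn_is_formatted set to True. A minimal handler sketch:

import json

def on_message(ws, message):
    data = json.loads(message)
    if data.get("type") == "Turn":
        if data.get("turn_is_formatted", False):
            # Final transcript for the turn: turn-level boosting and
            # formatting have been applied (requires format_turns=True)
            print("Final:", data["transcript"])
        else:
            # Interim transcript: word-level boosting happens in real time
            print("Partial:", data["transcript"])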

Both stages work together to maximize recognition accuracy for your keyterms throughout the streaming process.

Important notes

  • Keyterms longer than 50 characters are ignored.
  • Requests containing more than 100 keyterms result in an error (a client-side check is sketched below).
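
Because oversized keyterms are ignored rather than rejected, a pre-flight check can surface both limits before you connect. A sketch, where the validate_keyterms helper is illustrative and not part of the API:

def validate_keyterms(keyterms):
    """Illustrative client-side check mirroring the documented limits."""
    if len(keyterms) > 100:
        # More than 100 keyterms causes the request to error
        raise ValueError(f"Too many keyterms: {len(keyterms)} (maximum is 100)")
    for term in keyterms:
        if len(term) > 50:
            # Keyterms over 50 characters are ignored by the service
            print(f"Warning: keyterm exceeds 50 characters and will be ignored: {term!r}")
    return keyterms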

Best practices

To maximize the effectiveness of keyterms prompting, apply the guidelines below; an illustrative keyterms list follows them.

  • Specify Unique Terminology: Include proper names, company names, technical terms, or vocabulary specific to your domain that might not be commonly recognized.
  • Use Exact Spelling and Capitalization: Provide keyterms with the precise spelling and capitalization you expect to see in the output transcript. This helps the system accurately identify the terms.
  • Avoid Common Words: Do not include single, common English words (e.g., “information”) as keyterms. The system is generally proficient with such words, and adding them as keyterms can be redundant.
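
Taken together, a keyterms list following these guidelines might look like the sketch below (the terms themselves are illustrative):

# Good: unique proper nouns and domain-specific terms, with exact casing
keyterms = [
    "AssemblyAI",
    "Universal-2",
    "Keanu Reeves",
]

# Avoid: single common English words the model already recognizes well,
# e.g. ["information", "meeting", "report"]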