Migrating from Streaming v2 to Streaming v3

This cookbook guides you through migrating from AssemblyAI’s legacy Streaming STT model (v2) to our latest Universal Streaming STT model (v3), which provides ultra-low latency for faster transcription, intelligent endpointing for more natural speech detection, and improved accuracy across various audio conditions.

Check out this blog post to learn more about this new model!

Overview of changes

The migration combines changes required by the v3 API with several improvements to the client code:

  • API Version: Upgrade from v2 (/v2/realtime/ws) to v3 (/v3/ws)
  • Enhanced Error Handling: Robust cleanup and resource management
  • Improved Threading: Better control over audio streaming threads
  • Modern Message Format: Updated message types and structure
  • Configuration Options: More flexible connection parameters
  • Graceful Shutdown: Proper termination handling

You can follow the step-by-step guide below to update your existing code. Here is what your code should look like in the end:

import pyaudio
import websocket
import json
import threading
import time
from urllib.parse import urlencode
from datetime import datetime

# --- Configuration ---
YOUR_API_KEY = "YOUR-API-KEY"  # Replace with your actual API key

CONNECTION_PARAMS = {
    "sample_rate": 16000,
    "format_turns": True,  # Request formatted final transcripts
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

# Audio Configuration
FRAMES_PER_BUFFER = 800  # 50ms of audio (0.05s * 16000Hz)
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHANNELS = 1
FORMAT = pyaudio.paInt16

# Global variables for audio stream and websocket
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()  # To signal the audio thread to stop

# --- WebSocket Event Handlers ---


def on_open(ws):
    """Called when the WebSocket connection is established."""
    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    # Start sending audio data in a separate thread
    def stream_audio():
        global stream
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                # Send audio data as binary message
                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                print(f"Error streaming audio: {e}")
                # A failed read most likely means the stream was closed, so stop the loop
                break
        print("Audio streaming stopped.")

    global audio_thread
    audio_thread = threading.Thread(target=stream_audio)
    audio_thread.daemon = (
        True  # Allow main thread to exit even if this thread is running
    )
    audio_thread.start()

def on_message(ws, message):
    try:
        data = json.loads(message)
        msg_type = data.get('type')

        if msg_type == "Begin":
            session_id = data.get('id')
            expires_at = data.get('expires_at')
            print(f"\nSession began: ID={session_id}, ExpiresAt={datetime.fromtimestamp(expires_at)}")
        elif msg_type == "Turn":
            transcript = data.get('transcript', '')
            formatted = data.get('turn_is_formatted', False)

            # Clear previous line for formatted messages
            if formatted:
                print('\r' + ' ' * 80 + '\r', end='')
                print(transcript)
            else:
                print(f"\r{transcript}", end='')
        elif msg_type == "Termination":
            audio_duration = data.get('audio_duration_seconds', 0)
            session_duration = data.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio Duration={audio_duration}s, Session Duration={session_duration}s")
    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")

def on_error(ws, error):
    """Called when a WebSocket error occurs."""
    print(f"\nWebSocket Error: {error}")
    # Attempt to signal stop on error
    stop_event.set()


def on_close(ws, close_status_code, close_msg):
    """Called when the WebSocket connection is closed."""
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")
    # Ensure audio resources are released
    global stream, audio
    stop_event.set()  # Signal audio thread just in case it's still running

    # Let the audio thread finish its current read before closing the stream it reads from
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)

    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None


# --- Main Execution ---
def run():
    global audio, stream, ws_app

    # Initialize PyAudio
    audio = pyaudio.PyAudio()

    # Open microphone stream
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("Microphone stream opened successfully.")
        print("Speak into your microphone. Press Ctrl+C to stop.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        if audio:
            audio.terminate()
        return  # Exit if microphone cannot be opened

    # Create WebSocketApp
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run WebSocketApp in a separate thread to allow main thread to catch KeyboardInterrupt
    ws_thread = threading.Thread(target=ws_app.run_forever)
    ws_thread.daemon = True
    ws_thread.start()

    try:
        # Keep main thread alive until interrupted
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()  # Signal audio thread to stop

        # Send termination message to the server
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                print(f"Sending termination message: {json.dumps(terminate_message)}")
                ws_app.send(json.dumps(terminate_message))
                # Give a moment for messages to process before forceful close
                time.sleep(5)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        # Close the WebSocket connection (will trigger on_close)
        if ws_app:
            ws_app.close()

        # Wait for WebSocket thread to finish
        ws_thread.join(timeout=2.0)

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    finally:
        # Final cleanup (already handled in on_close, but good as a fallback)
        if stream and stream.is_active():
            stream.stop_stream()
        if stream:
            stream.close()
        if audio:
            audio.terminate()
        print("Cleanup complete. Exiting.")


if __name__ == "__main__":
    run()

For more information on our Universal Streaming feature, see this section of our official documentation.

Step-by-step migration guide

1. Update API endpoint and configuration

Before (v2):

ws = websocket.WebSocketApp(
    f'wss://api.assemblyai.com/v2/realtime/ws?sample_rate={SAMPLE_RATE}',
    header={'Authorization': YOUR_API_KEY},
    on_message=on_message,
    on_open=on_open,
    on_error=on_error,
    on_close=on_close
)

After (v3):

import websocket
from urllib.parse import urlencode

CONNECTION_PARAMS = {
    "sample_rate": 16000,
    "format_turns": True,  # Request formatted final transcripts
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

ws_app = websocket.WebSocketApp(
    API_ENDPOINT,
    header={"Authorization": YOUR_API_KEY},
    # ...
)

Key Changes:

  • New base URL: streaming.assemblyai.com instead of api.assemblyai.com
  • Version upgrade: /v3/ws instead of /v2/realtime/ws
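  • Connection parameters collected in a CONNECTION_PARAMS dictionary and encoded into the URL with urlencode()
  • New format_turns parameter, which requests formatted final transcripts (Turn messages with turn_is_formatted set to true)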
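If you have several v2 URLs to migrate, a small helper can rebuild them for v3. The sketch below uses a hypothetical helper, v2_to_v3_url, which is not part of any AssemblyAI SDK. It carries over only sample_rate, because we are not assuming that any other v2 query parameter has a v3 equivalent, and it then adds whatever extra v3 parameters you pass in.

```python
from typing import Optional
from urllib.parse import parse_qsl, urlencode, urlsplit

# v3 base URL, taken from the finished code above.
V3_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"


def v2_to_v3_url(v2_url: str, extra_params: Optional[dict] = None) -> str:
    """Hypothetical helper: build a v3 endpoint URL from an existing v2 URL.

    Only sample_rate is carried over. Other v2 query parameters are dropped
    because we do not assume they exist (or behave the same way) in v3.
    """
    v2_query = dict(parse_qsl(urlsplit(v2_url).query))
    params = {}
    if "sample_rate" in v2_query:
        params["sample_rate"] = int(v2_query["sample_rate"])
    params.update(extra_params or {})
    return f"{V3_BASE_URL}?{urlencode(params)}"


old_url = "wss://api.assemblyai.com/v2/realtime/ws?sample_rate=16000"
print(v2_to_v3_url(old_url, {"format_turns": True}))
# wss://streaming.assemblyai.com/v3/ws?sample_rate=16000&format_turns=True
```

urlencode() renders the Python boolean True as the string True. The finished code above produces the same URL.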

2. Improve audio configuration

Before (v2):

FRAMES_PER_BUFFER = 3200  # 200ms of audio
SAMPLE_RATE = 16000
CHANNELS = 1
FORMAT = pyaudio.paInt16

After (v3):

FRAMES_PER_BUFFER = 800  # 50ms of audio (0.05s * 16000Hz)
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHANNELS = 1
FORMAT = pyaudio.paInt16

Key Changes:

  • Reduced buffer size from 200ms to 50ms for lower latency
  • Sample rate now references the configuration parameter
  • Added detailed comments explaining the calculations
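The buffer sizes follow from simple arithmetic. FRAMES_PER_BUFFER counts frames (one sample per channel), so a chunk lasting d milliseconds at sample rate r holds r * d / 1000 frames. With 16-bit mono audio (pyaudio.paInt16, 2 bytes per sample), each binary message therefore carries twice that many bytes. The helper names below are illustrative only.

```python
BYTES_PER_SAMPLE = 2  # pyaudio.paInt16 is 16-bit, i.e. 2 bytes per sample


def frames_for_duration(duration_ms: int, sample_rate: int) -> int:
    """Number of frames (samples per channel) in a chunk of the given length."""
    return sample_rate * duration_ms // 1000


def bytes_per_chunk(frames: int, channels: int = 1) -> int:
    """Size in bytes of one binary WebSocket message of 16-bit PCM audio."""
    return frames * channels * BYTES_PER_SAMPLE


for label, ms in (("v2", 200), ("v3", 50)):
    frames = frames_for_duration(ms, 16000)
    print(f"{label}: {ms} ms -> {frames} frames -> {bytes_per_chunk(frames)} bytes")
# v2: 200 ms -> 3200 frames -> 6400 bytes
# v3: 50 ms -> 800 frames -> 1600 bytes
```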

3. Enhance thread management

Before (v2):

from threading import Thread

def on_open(ws):
    def stream_audio():
        while True:
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                print(f'\nError streaming audio: {e}')
                break

    audio_thread = Thread(target=stream_audio, daemon=True)
    audio_thread.start()

After (v3):

import threading
import websocket

# Global variables for better resource management
stop_event = threading.Event()
audio_thread = None

def on_open(ws):
    def stream_audio():
        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                print(f"Error streaming audio: {e}")
                break

    global audio_thread
    audio_thread = threading.Thread(target=stream_audio)
    audio_thread.daemon = True
    audio_thread.start()

Key Changes:

  • Added threading.Event() so other code (on_close, on_error, the Ctrl+C handler) can tell the streaming loop to stop
  • Module-level audio_thread reference, so on_close can join the thread during cleanup
  • Condition-based loop (while not stop_event.is_set()) instead of an infinite loop
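To see why the Event matters, the sketch below runs the v3 loop against hypothetical stand-ins for stream.read() and ws.send(), so it needs neither a microphone nor a WebSocket connection. After stop_event.set() is called, the loop exits once the read in progress returns (about one 50 ms chunk), and join() succeeds well within its timeout. The Event cannot interrupt a read that is already blocked, because the loop checks it only between chunks.

```python
import threading
import time

stop_event = threading.Event()
sent_chunks = []


def fake_read(frames):
    """Hypothetical stand-in for stream.read(): returns silence after ~50 ms."""
    time.sleep(0.05)
    return b"\x00\x00" * frames  # 2 bytes per 16-bit sample


def fake_send(data):
    """Hypothetical stand-in for ws.send(data, websocket.ABNF.OPCODE_BINARY)."""
    sent_chunks.append(len(data))


def stream_audio():
    # Same loop shape as the v3 stream_audio(): check the event before each read.
    while not stop_event.is_set():
        fake_send(fake_read(800))


audio_thread = threading.Thread(target=stream_audio, daemon=True)
audio_thread.start()
time.sleep(0.3)                # let a few chunks go out
stop_event.set()               # ask the loop to stop after its current chunk
audio_thread.join(timeout=1.0)
print("thread alive:", audio_thread.is_alive(), "chunks sent:", len(sent_chunks))
```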

4. Update message handling

Before (v2):

def on_message(ws, message):
    try:
        msg = json.loads(message)
        msg_type = msg.get('message_type')

        if msg_type == 'SessionBegins':
            session_id = msg.get('session_id')
            print("Session ID:", session_id)
            return

        text = msg.get('text', '')
        if not text:
            return

        if msg_type == 'PartialTranscript':
            print(text, end='\r')
        elif msg_type == 'FinalTranscript':
            print(text, end='\r\n')
        elif msg_type == 'error':
            print(f'\nError: {msg.get("error", "Unknown error")}')
    except Exception as e:
        print(f'\nError handling message: {e}')

After (v3):

import json
from datetime import datetime

def on_message(ws, message):
    try:
        data = json.loads(message)
        msg_type = data.get('type')

        if msg_type == "Begin":
            session_id = data.get('id')
            expires_at = data.get('expires_at')
            print(f"\nSession began: ID={session_id}, ExpiresAt={datetime.fromtimestamp(expires_at)}")
        elif msg_type == "Turn":
            transcript = data.get('transcript', '')
            formatted = data.get('turn_is_formatted', False)

            if formatted:
                print('\r' + ' ' * 80 + '\r', end='')
                print(transcript)
            else:
                print(f"\r{transcript}", end='')
        elif msg_type == "Termination":
            audio_duration = data.get('audio_duration_seconds', 0)
            session_duration = data.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio Duration={audio_duration}s, Session Duration={session_duration}s")
    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")

Key Changes:

  • Message types renamed: SessionBegins → Begin, PartialTranscript/FinalTranscript → Turn
  • Field names updated: message_type → type, session_id → id, text → transcript
  • Added session expiration timestamp handling
  • Improved transcript formatting with turn_is_formatted flag
  • Added Termination message handling with session statistics
  • Enhanced error handling with specific JSONDecodeError catch
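v3 sends a new Turn message each time the transcript of the current turn grows, and the handler above treats only formatted turns as final. If you set format_turns to False, turn_is_formatted should never become true, so finished turns get overwritten instead of printed on their own line. The sketch below shows one way to handle both settings.

It rests on two assumptions that you should check against the current v3 message reference:

  • Turn messages also carry an end_of_turn boolean.
  • When formatting is enabled, the formatted copy of a turn is the one to print.

render_turn is a hypothetical name.

```python
import json
from typing import Tuple


def render_turn(data: dict, format_turns: bool) -> Tuple[str, bool]:
    """Return (transcript, is_final) for a v3 "Turn" message.

    Assumption: Turn messages carry an "end_of_turn" boolean. With format_turns
    enabled, only the formatted copy counts as final, so a turn is not printed
    twice. With it disabled, we fall back to end_of_turn.
    """
    transcript = data.get("transcript", "")
    if format_turns:
        is_final = bool(data.get("turn_is_formatted", False))
    else:
        is_final = bool(data.get("end_of_turn", False))
    return transcript, is_final


# Made-up messages for illustration, with formatting disabled.
messages = [
    '{"type": "Turn", "transcript": "hello", "end_of_turn": false, "turn_is_formatted": false}',
    '{"type": "Turn", "transcript": "hello world", "end_of_turn": true, "turn_is_formatted": false}',
]
for raw in messages:
    text, final = render_turn(json.loads(raw), format_turns=False)
    print(("FINAL: " if final else "partial: ") + text)
# partial: hello
# FINAL: hello world
```

In on_message, you would pass CONNECTION_PARAMS["format_turns"] as the second argument, print final turns with a newline, and print partial ones with end=''.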

5. Implement robust resource management

Before (v2):

def on_close(ws, status, msg):
    stream.stop_stream()
    stream.close()
    audio.terminate()
    print('\nDisconnected')

# Global audio resources (potential for memory leaks)
audio = pyaudio.PyAudio()
stream = audio.open(...)

After (v3):

def on_close(ws, close_status_code, close_msg):
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")
    global stream, audio
    stop_event.set()  # Signal audio thread to stop

    # Ensure audio thread cleanup: let it finish its current read
    # before closing the stream it reads from
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)

    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None

def on_error(ws, error):
    print(f"\nWebSocket Error: {error}")
    stop_event.set()  # Signal threads to stop on error

Key Changes:

  • Thread stop signaling via stop_event.set()
  • Conditional cleanup: each resource is released only if it is not None
  • Audio thread joined with a timeout
  • Resources set to None after release, so later cleanup code skips them
  • on_error now calls stop_event.set(), so the audio thread also stops on errors
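In the finished code, on_close (on the WebSocket thread) and the finally block in run() (on the main thread) both release the same PyAudio objects. The None checks alone do not stop those two threads from racing.

The sketch below is one way to fix this, assuming you are willing to wrap the objects in a small class. A lock makes release() safe to call from either place, any number of times. AudioResources and the fake classes are hypothetical. PyAudio's real stream and PyAudio objects provide the is_active(), stop_stream(), close(), and terminate() methods that release() calls.

```python
import threading


class AudioResources:
    """Hypothetical wrapper that makes PyAudio cleanup safe to call twice.

    The lock serializes callers; the None checks make repeat calls no-ops,
    so each object is stopped/closed/terminated exactly once.
    """

    def __init__(self, audio, stream):
        self.audio = audio
        self.stream = stream
        self._lock = threading.Lock()

    def release(self):
        with self._lock:
            if self.stream is not None:
                if self.stream.is_active():
                    self.stream.stop_stream()
                self.stream.close()
                self.stream = None
            if self.audio is not None:
                self.audio.terminate()
                self.audio = None


# Minimal fakes so the sketch runs without a microphone.
class FakeStream:
    def __init__(self):
        self.closed = 0

    def is_active(self):
        return self.closed == 0

    def stop_stream(self):
        pass

    def close(self):
        self.closed += 1


class FakeAudio:
    def __init__(self):
        self.terminated = 0

    def terminate(self):
        self.terminated += 1


stream, audio = FakeStream(), FakeAudio()
resources = AudioResources(audio, stream)
resources.release()  # e.g. called from on_close
resources.release()  # e.g. called from the finally block; does nothing now
print(stream.closed, audio.terminated)  # 1 1
```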

6. Add graceful shutdown handling

Before (v2):

try:
    ws.run_forever()
except Exception as e:
    print(f'\nError: {e}')

After (v3):

def run():
    # ... initialization code ...

    ws_thread = threading.Thread(target=ws_app.run_forever)
    ws_thread.daemon = True
    ws_thread.start()

    try:
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()

        # Send termination message
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                ws_app.send(json.dumps(terminate_message))
                time.sleep(5)  # Allow message processing
            except Exception as e:
                print(f"Error sending termination message: {e}")

        ws_app.close()
        ws_thread.join(timeout=2.0)
    finally:
        # Final cleanup
        # ... resource cleanup code ...
        pass  # placeholder so the snippet stays valid Python

Key Changes:

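  • WebSocket runs in a separate thread, so the main thread stays free to catch KeyboardInterrupt
  • KeyboardInterrupt (Ctrl+C) is caught explicitly and starts the shutdown sequence
  • A {"type": "Terminate"} message is sent so the server can end the session and reply with a Termination message
  • Thread joining with timeouts
  • Comprehensive cleanup in the finally block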
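The fixed time.sleep(5) always waits five seconds, even when the server answers immediately. Because on_message already receives the Termination message, a threading.Event can end the wait as soon as that message arrives, with 5 seconds kept as an upper bound.

The sketch below simulates the server with threading.Timer; termination_received and shutdown are hypothetical names. To apply this to the finished code, call termination_received.set() in the Termination branch of on_message, then replace time.sleep(5) with termination_received.wait(timeout=5).

```python
import json
import threading
import time

termination_received = threading.Event()


def on_message(ws, message):
    """Trimmed-down handler: only records that the Termination message arrived."""
    data = json.loads(message)
    if data.get("type") == "Termination":
        termination_received.set()


def shutdown(send, timeout=5.0):
    """Send Terminate, then wait at most `timeout` seconds for Termination.

    `send` is whatever sends a text frame (ws_app.send in the real code).
    Returns True if the server confirmed termination before the timeout.
    """
    send(json.dumps({"type": "Terminate"}))
    return termination_received.wait(timeout)


def fake_send(payload):
    """Simulated server: replies with Termination 100 ms after Terminate."""
    assert json.loads(payload) == {"type": "Terminate"}
    reply = json.dumps({"type": "Termination",
                        "audio_duration_seconds": 3,
                        "session_duration_seconds": 4})
    threading.Timer(0.1, on_message, args=(None, reply)).start()


start = time.monotonic()
confirmed = shutdown(fake_send)
print(f"confirmed={confirmed} after {time.monotonic() - start:.1f}s")
```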

7. Improve error handling and logging

Before (v2):

  • Basic error printing
  • Limited context in error messages
  • No resource cleanup on errors

After (v3):

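  • Error messages that name the stage that failed (opening the microphone, streaming audio, decoding a message, sending the termination message)
  • Specific exception types caught where it matters (json.JSONDecodeError in on_message, KeyboardInterrupt in run())
  • Resource cleanup on all error paths
  • Connection state checked (ws_app.sock.connected) before the termination message is sent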
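The finished code reports errors with print(), so its messages carry no timestamps. If you want timestamped, per-thread error logs, Python's standard logging module can replace those print() calls. Inside an except block, log.exception() also records the traceback. The minimal sketch below uses an arbitrary logger name, and the StringIO buffer exists only so the output can be inspected.

```python
import io
import logging

# Hypothetical logger name; any name works.
log = logging.getLogger("streaming_v3")
log.setLevel(logging.INFO)

buffer = io.StringIO()  # use logging.StreamHandler() with no argument to log to stderr
handler = logging.StreamHandler(buffer)
handler.setFormatter(
    logging.Formatter("%(asctime)s %(levelname)s [%(threadName)s] %(message)s")
)
log.addHandler(handler)


def on_error(ws, error):
    # Same information as the print() version, plus timestamp, level and thread name.
    log.error("WebSocket error: %s", error)


on_error(None, ConnectionResetError("connection reset by peer"))
print(buffer.getvalue().strip())
```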

Migration checklist

  • Update API endpoint from v2 to v3
  • Change base URL to streaming.assemblyai.com
  • Update message type handling (Begin, Turn, Termination)
  • Implement threading.Event() for thread control
  • Add proper resource cleanup in all code paths
  • Update field names in message parsing
  • Add graceful shutdown with termination messages
  • Implement timeout-based thread joining
  • Add detailed error logging with context
  • Test KeyboardInterrupt handling
  • Verify audio resource cleanup
  • Test connection failure scenarios

Testing your migration

  1. Basic Functionality: Verify transcription works with simple speech
  2. Error Handling: Test with invalid API keys or network issues
  3. Graceful Shutdown: Test Ctrl+C interruption
  4. Resource Cleanup: Monitor for memory leaks during extended use
  5. Thread Management: Verify proper thread termination
  6. Message Formatting: Test with format_turns enabled/disabled
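You can run the message-formatting checks without a microphone or an API key by replaying recorded messages through on_message and capturing what it prints. The sketch below uses a shortened copy of the step 4 handler so that it runs on its own. The recorded messages are made up for illustration and contain only the fields the handler reads.

```python
import contextlib
import io
import json


def on_message(ws, message):
    """Shortened copy of the v3 handler from step 4 (Begin branch omitted)."""
    data = json.loads(message)
    if data.get("type") == "Turn":
        transcript = data.get("transcript", "")
        if data.get("turn_is_formatted", False):
            print("\r" + " " * 80 + "\r", end="")
            print(transcript)
        else:
            print(f"\r{transcript}", end="")
    elif data.get("type") == "Termination":
        print(f"\nSession Terminated: Audio Duration={data.get('audio_duration_seconds', 0)}s, "
              f"Session Duration={data.get('session_duration_seconds', 0)}s")


# Hypothetical recorded session; real messages contain more fields.
RECORDED = [
    {"type": "Turn", "transcript": "hello", "turn_is_formatted": False},
    {"type": "Turn", "transcript": "hello world", "turn_is_formatted": False},
    {"type": "Turn", "transcript": "Hello world.", "turn_is_formatted": True},
    {"type": "Termination", "audio_duration_seconds": 2, "session_duration_seconds": 3},
]

captured = io.StringIO()
with contextlib.redirect_stdout(captured):
    for msg in RECORDED:
        on_message(None, json.dumps(msg))

output = captured.getvalue()
print(repr(output))
```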

Common migration issues

Issue: “WebSocket connection failed”

Solution: Verify that you’re using the v3 endpoint URL (wss://streaming.assemblyai.com/v3/ws) and passing your API key in the Authorization header.

Issue: “Message type not recognized”

Solution: Read the message type from the type field instead of message_type, and update the old names (SessionBegins, PartialTranscript, FinalTranscript) to the new ones (Begin, Turn).
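To make this failure visible instead of silent, you can inspect each parsed message before dispatching on its v3 type. check_message_type below is a hypothetical helper built from the renames listed in step 4. It flags v2-style messages and any v3 type your handler does not cover.

```python
# v2 message_type values and their v3 "type" counterparts (renames from step 4).
LEGACY_TO_V3 = {
    "SessionBegins": "Begin",
    "PartialTranscript": "Turn",
    "FinalTranscript": "Turn",
}

# The v3 message types handled in step 4.
KNOWN_V3_TYPES = {"Begin", "Turn", "Termination"}


def check_message_type(data: dict) -> str:
    """Return a short diagnostic for a parsed server message (hypothetical helper)."""
    if "message_type" in data:
        old = data["message_type"]
        new = LEGACY_TO_V3.get(old, "unknown")
        return (f"v2-style message ({old}); v3 equivalent: {new}. "
                "Check the endpoint URL and read 'type', not 'message_type'.")
    msg_type = data.get("type")
    if msg_type in KNOWN_V3_TYPES:
        return f"ok: {msg_type}"
    return f"unhandled v3 message type: {msg_type!r}"


print(check_message_type({"message_type": "PartialTranscript"}))
print(check_message_type({"type": "Turn"}))
```

Logging this diagnostic in a final else branch of on_message makes it obvious which message shape is reaching your handler.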

Issue: “Audio thread won’t stop”

Solution: Ensure that the streaming loop checks a threading.Event() and that stop_event.set() is called in on_error, in on_close, and in the KeyboardInterrupt handler.

Issue: “Resource leak warnings”

Solution: Verify all audio resources are properly cleaned up in on_close and finally blocks.

Benefits of migration

  • Improved Reliability: Better error handling and recovery
  • Lower Latency: Reduced buffer sizes for faster response
  • Enhanced Features: Formatted transcripts and session statistics
  • Better Resource Management: Proper cleanup prevents memory leaks
  • Graceful Shutdown: Clean termination with proper cleanup
  • Modern Architecture: Improved threading and event handling

Conclusion

This migration provides a more robust, maintainable, and feature-rich streaming transcription implementation. The enhanced error handling, resource management, and modern API features make it suitable for production use cases where reliability and performance are critical.