Determine Optimal Turn Detection Settings from Historical Audio Analysis

This guide shows how to analyze utterance gaps across multiple pre-recorded audio files to automatically determine optimal turn detection settings for real-time streaming transcription. The script processes an entire folder, aggregates gap statistics across all recordings, and configures the WebSocket connection with parameters tailored to your conversation patterns.

Quickstart

import requests
import time
import json
import pyaudio
import websocket
import threading
from urllib.parse import urlencode
from datetime import datetime
import os
from pathlib import Path


YOUR_API_KEY = "<YOUR_API_KEY>"  # Replace with your API key
AUDIO_FOLDER_PATH = "<YOUR_AUDIO_FILE_FOLDER>"  # Folder containing audio files

# Audio Configuration
SAMPLE_RATE = 16000
CHANNELS = 1
FORMAT = pyaudio.paInt16
FRAMES_PER_BUFFER = 800  # 50ms of audio (0.05s * 16000Hz)

# Global variables for audio stream and websocket
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()
recorded_frames = []
recording_lock = threading.Lock()

# Store the optimized configuration
OPTIMIZED_CONFIG = {}


def get_audio_files(folder_path):
    """
    Gets all audio files from the specified folder.
    Supports all formats accepted by AssemblyAI's API.
    """
    # Extensions are compared against f.suffix.lower(), so they must be lowercase
    audio_extensions = {'.aac', '.ac3', '.aif', '.aiff', '.alac', '.amr', '.ape',
                        '.au', '.dss', '.flac', '.m4a', '.m4b', '.m4p', '.mp3',
                        '.mpga', '.ogg', '.oga', '.mogg', '.opus', '.qcp', '.tta',
                        '.voc', '.wav', '.wv', '.webm', '.mts', '.m2ts', '.ts',
                        '.mov', '.mp4', '.m4v'}
    folder = Path(folder_path)

    if not folder.exists():
        raise FileNotFoundError(f"Folder not found: {folder_path}")

    audio_files = [
        str(f) for f in folder.iterdir()
        if f.is_file() and f.suffix.lower() in audio_extensions
    ]

    if not audio_files:
        raise ValueError(f"No audio files found in {folder_path}")

    return sorted(audio_files)


def analyze_single_file(audio_file, api_key, file_index, total_files):
    """
    Analyzes a single audio file and returns gap statistics.
    """
    print("\n" + "=" * 70)
    print(f"ANALYZING FILE {file_index}/{total_files}: {Path(audio_file).name}")
    print("=" * 70)

    base_url = "https://api.assemblyai.com"
    headers = {"authorization": api_key}

    # Upload audio file
    print("\nUploading audio file...")

    if audio_file.startswith("http"):
        upload_url = audio_file
        print("Using provided URL")
    else:
        with open(audio_file, "rb") as f:
            response = requests.post(
                base_url + "/v2/upload",
                headers=headers,
                data=f
            )
        upload_url = response.json()["upload_url"]
        print("Upload complete")

    # Enable Speaker Labels
    data = {
        "audio_url": upload_url,
        "speaker_labels": True,
        # "language_detection": True  # Enable automatic language detection if your files are in different languages
    }

    response = requests.post(
        base_url + "/v2/transcript",
        json=data,
        headers=headers
    )
    transcript_id = response.json()['id']
    print(f"Transcript ID: {transcript_id}")

    # Poll for completion
    print("\nWaiting for transcription to complete...")
    polling_endpoint = base_url + "/v2/transcript/" + transcript_id

    while True:
        transcription_result = requests.get(polling_endpoint, headers=headers).json()

        if transcription_result['status'] == 'completed':
            print("Transcription completed!")
            break
        elif transcription_result['status'] == 'error':
            print(f"Transcription failed: {transcription_result['error']}")
            return None
        else:
            time.sleep(3)

    # Calculate gaps
    utterances = transcription_result['utterances']

    if len(utterances) < 2:
        print("⚠ Not enough utterances to analyze gaps (need at least 2)")
        return None

    gaps = []
    for i in range(len(utterances) - 1):
        current_end = utterances[i]['end']
        next_start = utterances[i + 1]['start']
        gap = next_start - current_end

        if gap > 0:
            gaps.append(gap)

    if not gaps:
        print("⚠ No gaps found between utterances (all speech overlaps)")
        return None

    # Calculate statistics
    stats = {
        'filename': Path(audio_file).name,
        'average_gap_ms': sum(gaps) / len(gaps),
        'min_gap_ms': min(gaps),
        'max_gap_ms': max(gaps),
        'median_gap_ms': sorted(gaps)[len(gaps) // 2],
        'total_utterances': len(utterances),
        'total_gaps': len(gaps),
        'all_gaps': gaps
    }

    print(f"\nResults for {stats['filename']}:")
    print(f"  Total utterances: {stats['total_utterances']}")
    print(f"  Total gaps: {stats['total_gaps']}")
    print(f"  Average gap: {stats['average_gap_ms']:.0f} ms")
    print(f"  Median gap: {stats['median_gap_ms']:.0f} ms")

    # Save transcript JSON to file
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    safe_filename = Path(audio_file).stem.replace(' ', '_')
    json_filename = f"transcript_{safe_filename}_{timestamp}.json"

    try:
        with open(json_filename, 'w', encoding='utf-8') as f:
            json.dump(transcription_result, f, indent=2, ensure_ascii=False)
        print(f"  Transcript saved: {json_filename}")
    except Exception as e:
        print(f"  Error saving transcript: {e}")

    return stats


def analyze_multiple_files(folder_path, api_key):
    """
    Analyzes all audio files in a folder and returns aggregated statistics.
    """
    print("=" * 70)
    print("MULTI-FILE UTTERANCE GAP ANALYSIS")
    print("=" * 70)

    audio_files = get_audio_files(folder_path)
    total_files = len(audio_files)

    print(f"\nFound {total_files} audio file(s) in: {folder_path}")
    for i, file in enumerate(audio_files, 1):
        print(f"  {i}. {Path(file).name}")

    # Analyze each file
    all_file_stats = []
    all_gaps = []

    for i, audio_file in enumerate(audio_files, 1):
        try:
            stats = analyze_single_file(audio_file, api_key, i, total_files)
            if stats:
                all_file_stats.append(stats)
                all_gaps.extend(stats['all_gaps'])
        except Exception as e:
            print(f"\n✗ Error analyzing {Path(audio_file).name}: {str(e)}")
            continue

    if not all_file_stats:
        print("\n✗ No files were successfully analyzed")
        return None

    # Calculate aggregated statistics
    print("\n" + "=" * 70)
    print("AGGREGATED GAP ANALYSIS RESULTS")
    print("=" * 70)

    aggregated_stats = {
        'total_files_analyzed': len(all_file_stats),
        'total_utterances': sum(s['total_utterances'] for s in all_file_stats),
        'total_gaps': sum(s['total_gaps'] for s in all_file_stats),
        'overall_average_gap_ms': sum(all_gaps) / len(all_gaps),
        'overall_median_gap_ms': sorted(all_gaps)[len(all_gaps) // 2],
        'overall_min_gap_ms': min(all_gaps),
        'overall_max_gap_ms': max(all_gaps),
        'file_averages': [s['average_gap_ms'] for s in all_file_stats],
        'file_stats': all_file_stats
    }

    print(f"\nFiles successfully analyzed: {aggregated_stats['total_files_analyzed']}/{total_files}")
    print(f"Total utterances (all files): {aggregated_stats['total_utterances']}")
    print(f"Total gaps analyzed: {aggregated_stats['total_gaps']}")
    print(f"\nOverall average gap: {aggregated_stats['overall_average_gap_ms']:.0f} ms ({aggregated_stats['overall_average_gap_ms']/1000:.2f} seconds)")
    print(f"Overall median gap: {aggregated_stats['overall_median_gap_ms']:.0f} ms")
    print(f"Overall minimum gap: {aggregated_stats['overall_min_gap_ms']:.0f} ms")
    print(f"Overall maximum gap: {aggregated_stats['overall_max_gap_ms']:.0f} ms")

    # Show per-file breakdown
    print(f"\nPer-file average gaps:")
    for stat in all_file_stats:
        print(f"  • {stat['filename']:<40} {stat['average_gap_ms']:>6.0f} ms")

    # Calculate variability
    avg_of_file_averages = sum(aggregated_stats['file_averages']) / len(aggregated_stats['file_averages'])
    variability_ratio = aggregated_stats['overall_max_gap_ms'] / aggregated_stats['overall_average_gap_ms']

    print(f"\nAverage of file averages: {avg_of_file_averages:.0f} ms")
    print(f"Variability ratio: {variability_ratio:.2f}x")

    if variability_ratio > 3:
        print("└─> HIGH variability - mixed conversation patterns across files")
    elif variability_ratio > 2:
        print("└─> MODERATE variability - some pattern variation")
    else:
        print("└─> LOW variability - consistent conversation rhythm")

    # Save aggregated results
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    summary_filename = f"aggregated_analysis_{timestamp}.json"

    try:
        summary_data = {
            'analysis_date': datetime.now().isoformat(),
            'folder_path': folder_path,
            'aggregated_statistics': {
                'total_files_analyzed': aggregated_stats['total_files_analyzed'],
                'total_utterances': aggregated_stats['total_utterances'],
                'total_gaps': aggregated_stats['total_gaps'],
                'overall_average_gap_ms': aggregated_stats['overall_average_gap_ms'],
                'overall_median_gap_ms': aggregated_stats['overall_median_gap_ms'],
                'overall_min_gap_ms': aggregated_stats['overall_min_gap_ms'],
                'overall_max_gap_ms': aggregated_stats['overall_max_gap_ms'],
                'variability_ratio': variability_ratio
            },
            'per_file_results': [
                {
                    'filename': s['filename'],
                    'average_gap_ms': s['average_gap_ms'],
                    'median_gap_ms': s['median_gap_ms'],
                    'total_utterances': s['total_utterances'],
                    'total_gaps': s['total_gaps']
                }
                for s in all_file_stats
            ]
        }

        with open(summary_filename, 'w', encoding='utf-8') as f:
            json.dump(summary_data, f, indent=2, ensure_ascii=False)
        print(f"\nAggregated analysis saved to: {summary_filename}")
    except Exception as e:
        print(f"\nError saving aggregated analysis: {e}")

    return aggregated_stats


def determine_streaming_config(aggregated_stats):
    """
    Determines optimal Universal-Streaming configuration based on aggregated gap analysis.
    Returns WebSocket connection parameters.
    """
    if aggregated_stats is None:
        print("\nUsing default balanced configuration (no gap data available)")
        return {
            'name': 'Balanced (Default)',
            'end_of_turn_confidence_threshold': 0.4,
            'min_end_of_turn_silence_when_confident': 400,
            'max_turn_silence': 1280,
            'description': 'Standard configuration for general use'
        }

    print("\n" + "=" * 70)
    print("DETERMINING OPTIMAL STREAMING CONFIGURATION")
    print("=" * 70)

    avg_gap = aggregated_stats['overall_average_gap_ms']
    num_files = aggregated_stats['total_files_analyzed']

    print(f"\nBased on analysis of {num_files} file(s)")
    print(f"Overall average gap: {avg_gap:.0f} ms")

    # Determine configuration based on average gap
    if avg_gap < 500:
        config = {
            'name': 'Aggressive',
            'end_of_turn_confidence_threshold': 0.4,
            'min_end_of_turn_silence_when_confident': 160,
            'max_turn_silence': 400,
            'description': 'Fast-paced conversation with quick turn-taking'
        }
        use_cases = "IVR systems, order confirmations, yes/no queries, retail support"
    elif avg_gap < 1000:
        config = {
            'name': 'Balanced',
            'end_of_turn_confidence_threshold': 0.4,
            'min_end_of_turn_silence_when_confident': 400,
            'max_turn_silence': 1280,
            'description': 'Natural conversation pacing'
        }
        use_cases = "General customer support, consultations, standard voice agents"
    else:
        config = {
            'name': 'Conservative',
            'end_of_turn_confidence_threshold': 0.7,
            'min_end_of_turn_silence_when_confident': 800,
            'max_turn_silence': 3600,
            'description': 'Thoughtful, complex speech with longer pauses'
        }
        use_cases = "Technical support, healthcare, legal consultations, troubleshooting"

    print(f"\nSelected Configuration: {config['name']}")
    print(f"  Reasoning: Average gap of {avg_gap:.0f}ms indicates {config['description']}")
    print(f"\nConfiguration Parameters:")
    print(f"  • end_of_turn_confidence_threshold: {config['end_of_turn_confidence_threshold']}")
    print(f"  • min_end_of_turn_silence_when_confident: {config['min_end_of_turn_silence_when_confident']} ms")
    print(f"  • max_turn_silence: {config['max_turn_silence']} ms")
    print(f"\nRecommended use cases: {use_cases}")

    return config


# WEBSOCKET HANDLERS WITH OPTIMIZED SETTINGS

def on_open(ws):
    """Called when the WebSocket connection is established."""
    print("WebSocket connection opened.")
    print(f"Using optimized {OPTIMIZED_CONFIG['name']} configuration")

    def stream_audio():
        global stream
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)

                with recording_lock:
                    recorded_frames.append(audio_data)

                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                print(f"Error streaming audio: {e}")
                break
        print("Audio streaming stopped.")

    global audio_thread
    audio_thread = threading.Thread(target=stream_audio)
    audio_thread.daemon = True
    audio_thread.start()


def on_message(ws, message):
    try:
        data = json.loads(message)
        msg_type = data.get('type')

        if msg_type == "Begin":
            session_id = data.get('id')
            expires_at = data.get('expires_at')
            print(f"\nSession began: ID={session_id}")
            print(f"  Expires at: {datetime.fromtimestamp(expires_at)}")
            print(f"  Configuration: {OPTIMIZED_CONFIG['name']}")
            print("\nSpeak now... (Press Ctrl+C to stop)\n")

        elif msg_type == "Turn":
            transcript = data.get('transcript', '')
            formatted = data.get('turn_is_formatted', False)

            if formatted:
                print('\r' + ' ' * 80 + '\r', end='')
                print(f"FINAL: {transcript}")
            else:
                print(f"\r partial: {transcript}", end='')

        elif msg_type == "Termination":
            audio_duration = data.get('audio_duration_seconds', 0)
            session_duration = data.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio={audio_duration}s, Session={session_duration}s")

    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")


def on_error(ws, error):
    """Called when a WebSocket error occurs."""
    print(f"\nWebSocket Error: {error}")
    stop_event.set()


def on_close(ws, close_status_code, close_msg):
    """Called when the WebSocket connection is closed."""
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    global stream, audio
    stop_event.set()

    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)


# RUN STREAMING WITH OPTIMIZED CONFIGURATION

def run_streaming(config):
    """
    Runs the streaming transcription with optimized turn detection settings.
    """
    global audio, stream, ws_app, OPTIMIZED_CONFIG

    OPTIMIZED_CONFIG = config

    print("\n" + "=" * 70)
    print("STARTING REAL-TIME STREAMING")
    print("=" * 70)

    # Build connection parameters with optimized settings
    CONNECTION_PARAMS = {
        "sample_rate": SAMPLE_RATE,
        "format_turns": True,
        "end_of_turn_confidence_threshold": config['end_of_turn_confidence_threshold'],
        "min_end_of_turn_silence_when_confident": str(config['min_end_of_turn_silence_when_confident']),
        "max_turn_silence": str(config['max_turn_silence'])
    }

    API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
    API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

    print(f"\nWebSocket Endpoint: {API_ENDPOINT_BASE_URL}")
    print(f"\nApplied Configuration:")
    for key, value in CONNECTION_PARAMS.items():
        print(f"  • {key}: {value}")

    # Initialize PyAudio
    audio = pyaudio.PyAudio()

    # Open microphone stream
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("\nMicrophone stream opened successfully.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        if audio:
            audio.terminate()
        return

    # Create WebSocketApp
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run WebSocketApp in a separate thread
    ws_thread = threading.Thread(target=ws_app.run_forever)
    ws_thread.daemon = True
    ws_thread.start()

    try:
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()

        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                print(f"Sending termination message...")
                ws_app.send(json.dumps(terminate_message))
                time.sleep(1)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        if ws_app:
            ws_app.close()

        ws_thread.join(timeout=2.0)

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    finally:
        if stream and stream.is_active():
            stream.stop_stream()
        if stream:
            stream.close()
        if audio:
            audio.terminate()
        print("Cleanup complete. Exiting.")


# MAIN WORKFLOW

def main():
    """
    Main workflow: Analyze multiple files -> Configure -> Run Streaming
    """
    try:
        # Step 1: Analyze all audio files in folder
        aggregated_stats = analyze_multiple_files(AUDIO_FOLDER_PATH, YOUR_API_KEY)

        # Step 2: Determine optimal configuration based on aggregated data
        streaming_config = determine_streaming_config(aggregated_stats)

        # Step 3: Run streaming with optimized settings
        run_streaming(streaming_config)

    except Exception as e:
        print(f"\nError in workflow: {str(e)}")
        raise


# EXECUTION

if __name__ == "__main__":
    main()
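Before running the quickstart, replace the two placeholders at the top of the script. If you prefer to keep credentials out of source control, here is a minimal sketch that reads the key from an environment variable instead (the ASSEMBLYAI_API_KEY name is our own convention, not something the script or API requires):

import os

# Falls back to the placeholder so the script still fails loudly if the
# environment variable is unset (variable name is our own convention)
YOUR_API_KEY = os.environ.get("ASSEMBLYAI_API_KEY", "<YOUR_API_KEY>")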

Step-By-Step Guide

Before we begin, make sure you have an AssemblyAI account and an API key. You can sign up and get your API key from your dashboard.

1. Install All Required Packages
$ pip install requests pyaudio websocket-client
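Note that pyaudio wraps the PortAudio library, which may need to be installed separately before the pip install succeeds (for example, brew install portaudio on macOS or apt install portaudio19-dev on Debian/Ubuntu).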
2. Configuration and Global Variables

Set up API credentials, file paths, and audio parameters (16kHz sample rate, mono channel), and initialize the global variables that manage the WebSocket connection and audio streaming threads.

import requests
import time
import json
import pyaudio
import websocket
import threading
from urllib.parse import urlencode
from datetime import datetime
import os
from pathlib import Path


YOUR_API_KEY = "<YOUR_API_KEY>"  # Replace with your API key
AUDIO_FOLDER_PATH = "<YOUR_AUDIO_FILE_FOLDER>"  # Folder containing audio files

# Audio Configuration
SAMPLE_RATE = 16000
CHANNELS = 1
FORMAT = pyaudio.paInt16
FRAMES_PER_BUFFER = 800  # 50ms of audio (0.05s * 16000Hz)

# Global variables for audio stream and websocket
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()
recorded_frames = []
recording_lock = threading.Lock()

# Store the optimized configuration
OPTIMIZED_CONFIG = {}
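The 800-frame buffer is simply 50 ms of audio at 16 kHz (0.05 × 16000 = 800). If you experiment with a different sample rate, a small sketch like this keeps the two values in sync (the CHUNK_DURATION_MS helper is our own, not required by the API):

# Derive the buffer size from the sample rate instead of hardcoding 800
CHUNK_DURATION_MS = 50  # hypothetical helper constant
FRAMES_PER_BUFFER = int(SAMPLE_RATE * CHUNK_DURATION_MS / 1000)  # 16000 * 50 / 1000 = 800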
3. Define get_audio_files() Function

This function scans a specified folder for audio/video files with supported extensions and returns a sorted list of file paths for batch processing.

def get_audio_files(folder_path):
    # Extensions are compared against f.suffix.lower(), so they must be lowercase
    audio_extensions = {'.aac', '.ac3', '.aif', '.aiff', '.alac', '.amr', '.ape',
                        '.au', '.dss', '.flac', '.m4a', '.m4b', '.m4p', '.mp3',
                        '.mpga', '.ogg', '.oga', '.mogg', '.opus', '.qcp', '.tta',
                        '.voc', '.wav', '.wv', '.webm', '.mts', '.m2ts', '.ts',
                        '.mov', '.mp4', '.m4v'}
    folder = Path(folder_path)

    if not folder.exists():
        raise FileNotFoundError(f"Folder not found: {folder_path}")

    audio_files = [
        str(f) for f in folder.iterdir()
        if f.is_file() and f.suffix.lower() in audio_extensions
    ]

    if not audio_files:
        raise ValueError(f"No audio files found in {folder_path}")

    return sorted(audio_files)
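You can sanity-check the folder scan on its own before spending any transcription time; the folder path below is illustrative:

# Quick dry run of the folder scan (path is hypothetical)
files = get_audio_files("./recordings")
print(f"Will analyze {len(files)} file(s)")
for path in files:
    print(f"  {Path(path).name}")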
4. Define analyze_single_file() Function

This function uploads an audio file to AssemblyAI, requests transcription with speaker labels enabled, polls until completion, then calculates gap statistics between utterances (average, median, min, max) and saves the transcript JSON.

def analyze_single_file(audio_file, api_key, file_index, total_files):
    print("\n" + "=" * 70)
    print(f"ANALYZING FILE {file_index}/{total_files}: {Path(audio_file).name}")
    print("=" * 70)

    base_url = "https://api.assemblyai.com"
    headers = {"authorization": api_key}

    # Upload audio file
    print("\nUploading audio file...")

    if audio_file.startswith("http"):
        upload_url = audio_file
        print("Using provided URL")
    else:
        with open(audio_file, "rb") as f:
            response = requests.post(
                base_url + "/v2/upload",
                headers=headers,
                data=f
            )
        upload_url = response.json()["upload_url"]
        print("Upload complete")

    # Enable Speaker Labels
    data = {
        "audio_url": upload_url,
        "speaker_labels": True,
        # "language_detection": True  # Enable automatic language detection if your files are in different languages
    }

    response = requests.post(
        base_url + "/v2/transcript",
        json=data,
        headers=headers
    )
    transcript_id = response.json()['id']
    print(f"Transcript ID: {transcript_id}")

    # Poll for completion
    print("\nWaiting for transcription to complete...")
    polling_endpoint = base_url + "/v2/transcript/" + transcript_id

    while True:
        transcription_result = requests.get(polling_endpoint, headers=headers).json()

        if transcription_result['status'] == 'completed':
            print("Transcription completed!")
            break
        elif transcription_result['status'] == 'error':
            print(f"Transcription failed: {transcription_result['error']}")
            return None
        else:
            time.sleep(3)

    # Calculate gaps
    utterances = transcription_result['utterances']

    if len(utterances) < 2:
        print("⚠ Not enough utterances to analyze gaps (need at least 2)")
        return None

    gaps = []
    for i in range(len(utterances) - 1):
        current_end = utterances[i]['end']
        next_start = utterances[i + 1]['start']
        gap = next_start - current_end

        if gap > 0:
            gaps.append(gap)

    if not gaps:
        print("⚠ No gaps found between utterances (all speech overlaps)")
        return None

    # Calculate statistics
    stats = {
        'filename': Path(audio_file).name,
        'average_gap_ms': sum(gaps) / len(gaps),
        'min_gap_ms': min(gaps),
        'max_gap_ms': max(gaps),
        'median_gap_ms': sorted(gaps)[len(gaps) // 2],
        'total_utterances': len(utterances),
        'total_gaps': len(gaps),
        'all_gaps': gaps
    }

    print(f"\nResults for {stats['filename']}:")
    print(f"  Total utterances: {stats['total_utterances']}")
    print(f"  Total gaps: {stats['total_gaps']}")
    print(f"  Average gap: {stats['average_gap_ms']:.0f} ms")
    print(f"  Median gap: {stats['median_gap_ms']:.0f} ms")

    # Save transcript JSON to file
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    safe_filename = Path(audio_file).stem.replace(' ', '_')
    json_filename = f"transcript_{safe_filename}_{timestamp}.json"

    try:
        with open(json_filename, 'w', encoding='utf-8') as f:
            json.dump(transcription_result, f, indent=2, ensure_ascii=False)
        print(f"  Transcript saved: {json_filename}")
    except Exception as e:
        print(f"  Error saving transcript: {e}")

    return stats
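For reference, each entry in the transcript's utterances array carries millisecond timestamps, which is what the gap calculation above relies on. A trimmed, invented example of the shape (real responses include additional fields such as confidence and words):

utterances = [
    {"speaker": "A", "start": 1200, "end": 4730, "text": "How can I help you today?"},
    {"speaker": "B", "start": 5210, "end": 8990, "text": "I'd like to check on my order."},
]
# Gap between these two turns: 5210 - 4730 = 480 ms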
5. Define analyze_multiple_files() Function

This function orchestrates the analysis of all files in a folder by calling analyze_single_file() for each, aggregates all gap data across files, calculates overall statistics, displays per-file breakdowns, and saves a comprehensive summary JSON.

def analyze_multiple_files(folder_path, api_key):
    print("=" * 70)
    print("MULTI-FILE UTTERANCE GAP ANALYSIS")
    print("=" * 70)

    audio_files = get_audio_files(folder_path)
    total_files = len(audio_files)

    print(f"\nFound {total_files} audio file(s) in: {folder_path}")
    for i, file in enumerate(audio_files, 1):
        print(f"  {i}. {Path(file).name}")

    # Analyze each file
    all_file_stats = []
    all_gaps = []

    for i, audio_file in enumerate(audio_files, 1):
        try:
            stats = analyze_single_file(audio_file, api_key, i, total_files)
            if stats:
                all_file_stats.append(stats)
                all_gaps.extend(stats['all_gaps'])
        except Exception as e:
            print(f"\n✗ Error analyzing {Path(audio_file).name}: {str(e)}")
            continue

    if not all_file_stats:
        print("\n✗ No files were successfully analyzed")
        return None

    # Calculate aggregated statistics
    print("\n" + "=" * 70)
    print("AGGREGATED GAP ANALYSIS RESULTS")
    print("=" * 70)

    aggregated_stats = {
        'total_files_analyzed': len(all_file_stats),
        'total_utterances': sum(s['total_utterances'] for s in all_file_stats),
        'total_gaps': sum(s['total_gaps'] for s in all_file_stats),
        'overall_average_gap_ms': sum(all_gaps) / len(all_gaps),
        'overall_median_gap_ms': sorted(all_gaps)[len(all_gaps) // 2],
        'overall_min_gap_ms': min(all_gaps),
        'overall_max_gap_ms': max(all_gaps),
        'file_averages': [s['average_gap_ms'] for s in all_file_stats],
        'file_stats': all_file_stats
    }

    print(f"\nFiles successfully analyzed: {aggregated_stats['total_files_analyzed']}/{total_files}")
    print(f"Total utterances (all files): {aggregated_stats['total_utterances']}")
    print(f"Total gaps analyzed: {aggregated_stats['total_gaps']}")
    print(f"\nOverall average gap: {aggregated_stats['overall_average_gap_ms']:.0f} ms ({aggregated_stats['overall_average_gap_ms']/1000:.2f} seconds)")
    print(f"Overall median gap: {aggregated_stats['overall_median_gap_ms']:.0f} ms")
    print(f"Overall minimum gap: {aggregated_stats['overall_min_gap_ms']:.0f} ms")
    print(f"Overall maximum gap: {aggregated_stats['overall_max_gap_ms']:.0f} ms")

    # Show per-file breakdown
    print(f"\nPer-file average gaps:")
    for stat in all_file_stats:
        print(f"  • {stat['filename']:<40} {stat['average_gap_ms']:>6.0f} ms")

    # Calculate variability
    avg_of_file_averages = sum(aggregated_stats['file_averages']) / len(aggregated_stats['file_averages'])
    variability_ratio = aggregated_stats['overall_max_gap_ms'] / aggregated_stats['overall_average_gap_ms']

    print(f"\nAverage of file averages: {avg_of_file_averages:.0f} ms")
    print(f"Variability ratio: {variability_ratio:.2f}x")

    if variability_ratio > 3:
        print("└─> HIGH variability - mixed conversation patterns across files")
    elif variability_ratio > 2:
        print("└─> MODERATE variability - some pattern variation")
    else:
        print("└─> LOW variability - consistent conversation rhythm")

    # Save aggregated results
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    summary_filename = f"aggregated_analysis_{timestamp}.json"

    try:
        summary_data = {
            'analysis_date': datetime.now().isoformat(),
            'folder_path': folder_path,
            'aggregated_statistics': {
                'total_files_analyzed': aggregated_stats['total_files_analyzed'],
                'total_utterances': aggregated_stats['total_utterances'],
                'total_gaps': aggregated_stats['total_gaps'],
                'overall_average_gap_ms': aggregated_stats['overall_average_gap_ms'],
                'overall_median_gap_ms': aggregated_stats['overall_median_gap_ms'],
                'overall_min_gap_ms': aggregated_stats['overall_min_gap_ms'],
                'overall_max_gap_ms': aggregated_stats['overall_max_gap_ms'],
                'variability_ratio': variability_ratio
            },
            'per_file_results': [
                {
                    'filename': s['filename'],
                    'average_gap_ms': s['average_gap_ms'],
                    'median_gap_ms': s['median_gap_ms'],
                    'total_utterances': s['total_utterances'],
                    'total_gaps': s['total_gaps']
                }
                for s in all_file_stats
            ]
        }

        with open(summary_filename, 'w', encoding='utf-8') as f:
            json.dump(summary_data, f, indent=2, ensure_ascii=False)
        print(f"\nAggregated analysis saved to: {summary_filename}")
    except Exception as e:
        print(f"\nError saving aggregated analysis: {e}")

    return aggregated_stats
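As a quick worked example of the variability check: if the longest gap across all recordings is 3,000 ms and the overall average is 800 ms, the ratio is 3000 / 800 = 3.75x, which falls in the HIGH bucket and suggests your files mix fast and slow conversation styles.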
6. Define determine_streaming_config() Function

This function takes aggregated gap statistics and selects one of three preset configurations with optimized turn detection parameters for different conversation styles.

def determine_streaming_config(aggregated_stats):
    if aggregated_stats is None:
        print("\nUsing default balanced configuration (no gap data available)")
        return {
            'name': 'Balanced (Default)',
            'end_of_turn_confidence_threshold': 0.4,
            'min_end_of_turn_silence_when_confident': 400,
            'max_turn_silence': 1280,
            'description': 'Standard configuration for general use'
        }

    print("\n" + "=" * 70)
    print("DETERMINING OPTIMAL STREAMING CONFIGURATION")
    print("=" * 70)

    avg_gap = aggregated_stats['overall_average_gap_ms']
    num_files = aggregated_stats['total_files_analyzed']

    print(f"\nBased on analysis of {num_files} file(s)")
    print(f"Overall average gap: {avg_gap:.0f} ms")

    # Determine configuration based on average gap
    if avg_gap < 500:
        config = {
            'name': 'Aggressive',
            'end_of_turn_confidence_threshold': 0.4,
            'min_end_of_turn_silence_when_confident': 160,
            'max_turn_silence': 400,
            'description': 'Fast-paced conversation with quick turn-taking'
        }
        use_cases = "IVR systems, order confirmations, yes/no queries, retail support"
    elif avg_gap < 1000:
        config = {
            'name': 'Balanced',
            'end_of_turn_confidence_threshold': 0.4,
            'min_end_of_turn_silence_when_confident': 400,
            'max_turn_silence': 1280,
            'description': 'Natural conversation pacing'
        }
        use_cases = "General customer support, consultations, standard voice agents"
    else:
        config = {
            'name': 'Conservative',
            'end_of_turn_confidence_threshold': 0.7,
            'min_end_of_turn_silence_when_confident': 800,
            'max_turn_silence': 3600,
            'description': 'Thoughtful, complex speech with longer pauses'
        }
        use_cases = "Technical support, healthcare, legal consultations, troubleshooting"

    print(f"\nSelected Configuration: {config['name']}")
    print(f"  Reasoning: Average gap of {avg_gap:.0f}ms indicates {config['description']}")
    print(f"\nConfiguration Parameters:")
    print(f"  • end_of_turn_confidence_threshold: {config['end_of_turn_confidence_threshold']}")
    print(f"  • min_end_of_turn_silence_when_confident: {config['min_end_of_turn_silence_when_confident']} ms")
    print(f"  • max_turn_silence: {config['max_turn_silence']} ms")
    print(f"\nRecommended use cases: {use_cases}")

    return config
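Since the selector only reads overall_average_gap_ms and total_files_analyzed, you can dry-run it with synthetic numbers to see which preset a given average gap maps to, without transcribing anything (the values below are invented):

# Dry run with fabricated stats - 320 ms average falls below the 500 ms cutoff
fake_stats = {'overall_average_gap_ms': 320, 'total_files_analyzed': 1}
config = determine_streaming_config(fake_stats)
print(config['name'])  # Aggressive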
7. Create WebSocket Event Handlers (on_open, on_message, on_error, on_close)

These functions manage the real-time streaming connection lifecycle: on_open starts the audio streaming thread, on_message processes transcription results (partial and final turns), and the close/error handlers clean up resources.

def on_open(ws):
    """Called when the WebSocket connection is established."""
    print("WebSocket connection opened.")
    print(f"Using optimized {OPTIMIZED_CONFIG['name']} configuration")

    def stream_audio():
        global stream
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)

                with recording_lock:
                    recorded_frames.append(audio_data)

                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                print(f"Error streaming audio: {e}")
                break
        print("Audio streaming stopped.")

    global audio_thread
    audio_thread = threading.Thread(target=stream_audio)
    audio_thread.daemon = True
    audio_thread.start()


def on_message(ws, message):
    try:
        data = json.loads(message)
        msg_type = data.get('type')

        if msg_type == "Begin":
            session_id = data.get('id')
            expires_at = data.get('expires_at')
            print(f"\nSession began: ID={session_id}")
            print(f"  Expires at: {datetime.fromtimestamp(expires_at)}")
            print(f"  Configuration: {OPTIMIZED_CONFIG['name']}")
            print("\nSpeak now... (Press Ctrl+C to stop)\n")

        elif msg_type == "Turn":
            transcript = data.get('transcript', '')
            formatted = data.get('turn_is_formatted', False)

            if formatted:
                print('\r' + ' ' * 80 + '\r', end='')
                print(f"FINAL: {transcript}")
            else:
                print(f"\r partial: {transcript}", end='')

        elif msg_type == "Termination":
            audio_duration = data.get('audio_duration_seconds', 0)
            session_duration = data.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio={audio_duration}s, Session={session_duration}s")

    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")


def on_error(ws, error):
    """Called when a WebSocket error occurs."""
    print(f"\nWebSocket Error: {error}")
    stop_event.set()


def on_close(ws, close_status_code, close_msg):
    """Called when the WebSocket connection is closed."""
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    global stream, audio
    stop_event.set()

    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)
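Because the handlers are plain functions, the display logic in on_message can be exercised with a hand-built payload before going live (the message below is invented; the ws argument is unused on this code path):

# Simulate a formatted turn message without opening a connection
sample = json.dumps({"type": "Turn", "transcript": "Hello world.", "turn_is_formatted": True})
on_message(None, sample)  # prints: FINAL: Hello world.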
8. Define run_streaming() Function

This function initializes PyAudio to capture microphone input, establishes a WebSocket connection with the optimized configuration parameters, and streams audio in real-time while displaying transcription results until the user stops with Ctrl+C.

def run_streaming(config):
    global audio, stream, ws_app, OPTIMIZED_CONFIG

    OPTIMIZED_CONFIG = config

    print("\n" + "=" * 70)
    print("STARTING REAL-TIME STREAMING")
    print("=" * 70)

    # Build connection parameters with optimized settings
    CONNECTION_PARAMS = {
        "sample_rate": SAMPLE_RATE,
        "format_turns": True,
        "end_of_turn_confidence_threshold": config['end_of_turn_confidence_threshold'],
        "min_end_of_turn_silence_when_confident": str(config['min_end_of_turn_silence_when_confident']),
        "max_turn_silence": str(config['max_turn_silence'])
    }

    API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
    API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

    print(f"\nWebSocket Endpoint: {API_ENDPOINT_BASE_URL}")
    print(f"\nApplied Configuration:")
    for key, value in CONNECTION_PARAMS.items():
        print(f"  • {key}: {value}")

    # Initialize PyAudio
    audio = pyaudio.PyAudio()

    # Open microphone stream
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("\nMicrophone stream opened successfully.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        if audio:
            audio.terminate()
        return

    # Create WebSocketApp
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run WebSocketApp in a separate thread
    ws_thread = threading.Thread(target=ws_app.run_forever)
    ws_thread.daemon = True
    ws_thread.start()

    try:
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()

        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                print(f"Sending termination message...")
                ws_app.send(json.dumps(terminate_message))
                time.sleep(1)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        if ws_app:
            ws_app.close()

        ws_thread.join(timeout=2.0)

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    finally:
        if stream and stream.is_active():
            stream.stop_stream()
        if stream:
            stream.close()
        if audio:
            audio.terminate()
        print("Cleanup complete. Exiting.")
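If you want to verify exactly what is sent to the endpoint, you can print the encoded querystring for a preset; this sketch reuses the same urlencode call with the Balanced values:

from urllib.parse import urlencode

params = {
    "sample_rate": 16000,
    "format_turns": True,
    "end_of_turn_confidence_threshold": 0.4,
    "min_end_of_turn_silence_when_confident": "400",
    "max_turn_silence": "1280",
}
print(urlencode(params))
# sample_rate=16000&format_turns=True&end_of_turn_confidence_threshold=0.4&min_end_of_turn_silence_when_confident=400&max_turn_silence=1280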
9. Define main() Workflow

Execute the three-step process: analyze all audio files in the folder, determine the best streaming configuration based on aggregated utterance gaps, then launch real-time streaming with the optimized settings.

def main():
    try:
        # Step 1: Analyze all audio files in folder
        aggregated_stats = analyze_multiple_files(AUDIO_FOLDER_PATH, YOUR_API_KEY)

        # Step 2: Determine optimal configuration based on aggregated data
        streaming_config = determine_streaming_config(aggregated_stats)

        # Step 3: Run streaming with optimized settings
        run_streaming(streaming_config)

    except Exception as e:
        print(f"\nError in workflow: {str(e)}")
        raise


if __name__ == "__main__":
    main()
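Save the full script (the quickstart at the top of this guide) to a file and run it; the filename is arbitrary:

$ python optimize_turn_detection.py

The script will upload and analyze every file in AUDIO_FOLDER_PATH, print the aggregated gap statistics, and then open the microphone stream with the selected turn detection configuration.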