Determine Optimal Turn Detection Settings from Historical Audio Analysis
This guide shows how to analyze utterance gaps from multiple pre-recorded audio files to automatically determine optimal turn detection settings for real-time streaming transcription. It processes an entire folder, aggregates gap statistics across all recordings, and configures the WebSocket with parameters tailored to your specific conversation patterns.
Quickstart
```python
import requests
import time
import json
import pyaudio
import websocket
import threading
from urllib.parse import urlencode
from datetime import datetime
import os
from pathlib import Path


YOUR_API_KEY = "<YOUR_API_KEY>"  # Replace with your API key
AUDIO_FOLDER_PATH = "<YOUR_AUDIO_FILE_FOLDER>"  # Folder containing audio files

# Audio Configuration
SAMPLE_RATE = 16000
CHANNELS = 1
FORMAT = pyaudio.paInt16
FRAMES_PER_BUFFER = 800  # 50ms of audio (0.05s * 16000Hz)

# Global variables for audio stream and websocket
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()
recorded_frames = []
recording_lock = threading.Lock()

# Store the optimized configuration
OPTIMIZED_CONFIG = {}


def get_audio_files(folder_path):
    """
    Gets all audio files from the specified folder.
    Supports all formats accepted by AssemblyAI's API.
    """
    # Extensions are compared against f.suffix.lower(), so they must be lowercase
    audio_extensions = {'.aac', '.ac3', '.aif', '.aiff', '.alac', '.amr', '.ape',
                        '.au', '.dss', '.flac', '.m4a', '.m4b', '.m4p', '.mp3',
                        '.mpga', '.ogg', '.oga', '.mogg', '.opus', '.qcp', '.tta',
                        '.voc', '.wav', '.wv', '.webm', '.mts', '.m2ts', '.ts',
                        '.mov', '.mp4', '.m4v'}
    folder = Path(folder_path)

    if not folder.exists():
        raise FileNotFoundError(f"Folder not found: {folder_path}")

    audio_files = [
        str(f) for f in folder.iterdir()
        if f.is_file() and f.suffix.lower() in audio_extensions
    ]

    if not audio_files:
        raise ValueError(f"No audio files found in {folder_path}")

    return sorted(audio_files)


def analyze_single_file(audio_file, api_key, file_index, total_files):
    """
    Analyzes a single audio file and returns gap statistics.
    """
    print("\n" + "=" * 70)
    print(f"ANALYZING FILE {file_index}/{total_files}: {Path(audio_file).name}")
    print("=" * 70)

    base_url = "https://api.assemblyai.com"
    headers = {"authorization": api_key}

    # Upload audio file
    print("\nUploading audio file...")

    if audio_file.startswith("http"):
        upload_url = audio_file
        print("Using provided URL")
    else:
        with open(audio_file, "rb") as f:
            response = requests.post(
                base_url + "/v2/upload",
                headers=headers,
                data=f
            )
        upload_url = response.json()["upload_url"]
        print("Upload complete")

    # Enable Speaker Labels
    data = {
        "audio_url": upload_url,
        "speaker_labels": True,
        # "language_detection": True  # Enable automatic language detection if your files are in different languages
    }

    response = requests.post(
        base_url + "/v2/transcript",
        json=data,
        headers=headers
    )
    transcript_id = response.json()['id']
    print(f"Transcript ID: {transcript_id}")

    # Poll for completion
    print("\nWaiting for transcription to complete...")
    polling_endpoint = base_url + "/v2/transcript/" + transcript_id

    while True:
        transcription_result = requests.get(polling_endpoint, headers=headers).json()

        if transcription_result['status'] == 'completed':
            print("Transcription completed!")
            break
        elif transcription_result['status'] == 'error':
            print(f"Transcription failed: {transcription_result['error']}")
            return None
        else:
            time.sleep(3)

    # Calculate gaps between consecutive utterances (timestamps are in ms)
    utterances = transcription_result['utterances']

    if len(utterances) < 2:
        print("⚠ Not enough utterances to analyze gaps (need at least 2)")
        return None

    gaps = []
    for i in range(len(utterances) - 1):
        current_end = utterances[i]['end']
        next_start = utterances[i + 1]['start']
        gap = next_start - current_end

        if gap > 0:
            gaps.append(gap)

    if not gaps:
        print("⚠ No gaps found between utterances (all speech overlaps)")
        return None

    # Calculate statistics
    stats = {
        'filename': Path(audio_file).name,
        'average_gap_ms': sum(gaps) / len(gaps),
        'min_gap_ms': min(gaps),
        'max_gap_ms': max(gaps),
        'median_gap_ms': sorted(gaps)[len(gaps) // 2],
        'total_utterances': len(utterances),
        'total_gaps': len(gaps),
        'all_gaps': gaps
    }

    print(f"\nResults for {stats['filename']}:")
    print(f" Total utterances: {stats['total_utterances']}")
    print(f" Total gaps: {stats['total_gaps']}")
    print(f" Average gap: {stats['average_gap_ms']:.0f} ms")
    print(f" Median gap: {stats['median_gap_ms']:.0f} ms")

    # Save transcript JSON to file
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    safe_filename = Path(audio_file).stem.replace(' ', '_')
    json_filename = f"transcript_{safe_filename}_{timestamp}.json"

    try:
        with open(json_filename, 'w', encoding='utf-8') as f:
            json.dump(transcription_result, f, indent=2, ensure_ascii=False)
        print(f" Transcript saved: {json_filename}")
    except Exception as e:
        print(f" Error saving transcript: {e}")

    return stats


def analyze_multiple_files(folder_path, api_key):
    """
    Analyzes all audio files in a folder and returns aggregated statistics.
    """
    print("=" * 70)
    print("MULTI-FILE UTTERANCE GAP ANALYSIS")
    print("=" * 70)

    audio_files = get_audio_files(folder_path)
    total_files = len(audio_files)

    print(f"\nFound {total_files} audio file(s) in: {folder_path}")
    for i, file in enumerate(audio_files, 1):
        print(f" {i}. {Path(file).name}")

    # Analyze each file
    all_file_stats = []
    all_gaps = []

    for i, audio_file in enumerate(audio_files, 1):
        try:
            stats = analyze_single_file(audio_file, api_key, i, total_files)
            if stats:
                all_file_stats.append(stats)
                all_gaps.extend(stats['all_gaps'])
        except Exception as e:
            print(f"\n✗ Error analyzing {Path(audio_file).name}: {str(e)}")
            continue

    if not all_file_stats:
        print("\n✗ No files were successfully analyzed")
        return None

    # Calculate aggregated statistics
    print("\n" + "=" * 70)
    print("AGGREGATED GAP ANALYSIS RESULTS")
    print("=" * 70)

    aggregated_stats = {
        'total_files_analyzed': len(all_file_stats),
        'total_utterances': sum(s['total_utterances'] for s in all_file_stats),
        'total_gaps': sum(s['total_gaps'] for s in all_file_stats),
        'overall_average_gap_ms': sum(all_gaps) / len(all_gaps),
        'overall_median_gap_ms': sorted(all_gaps)[len(all_gaps) // 2],
        'overall_min_gap_ms': min(all_gaps),
        'overall_max_gap_ms': max(all_gaps),
        'file_averages': [s['average_gap_ms'] for s in all_file_stats],
        'file_stats': all_file_stats
    }

    print(f"\nFiles successfully analyzed: {aggregated_stats['total_files_analyzed']}/{total_files}")
    print(f"Total utterances (all files): {aggregated_stats['total_utterances']}")
    print(f"Total gaps analyzed: {aggregated_stats['total_gaps']}")
    print(f"\nOverall average gap: {aggregated_stats['overall_average_gap_ms']:.0f} ms ({aggregated_stats['overall_average_gap_ms']/1000:.2f} seconds)")
    print(f"Overall median gap: {aggregated_stats['overall_median_gap_ms']:.0f} ms")
    print(f"Overall minimum gap: {aggregated_stats['overall_min_gap_ms']:.0f} ms")
    print(f"Overall maximum gap: {aggregated_stats['overall_max_gap_ms']:.0f} ms")

    # Show per-file breakdown
    print("\nPer-file average gaps:")
    for stat in all_file_stats:
        print(f" • {stat['filename']:<40} {stat['average_gap_ms']:>6.0f} ms")

    # Calculate variability
    avg_of_file_averages = sum(aggregated_stats['file_averages']) / len(aggregated_stats['file_averages'])
    variability_ratio = aggregated_stats['overall_max_gap_ms'] / aggregated_stats['overall_average_gap_ms']

    print(f"\nAverage of file averages: {avg_of_file_averages:.0f} ms")
    print(f"Variability ratio: {variability_ratio:.2f}x")

    if variability_ratio > 3:
        print("└─> HIGH variability - mixed conversation patterns across files")
    elif variability_ratio > 2:
        print("└─> MODERATE variability - some pattern variation")
    else:
        print("└─> LOW variability - consistent conversation rhythm")

    # Save aggregated results
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    summary_filename = f"aggregated_analysis_{timestamp}.json"

    try:
        summary_data = {
            'analysis_date': datetime.now().isoformat(),
            'folder_path': folder_path,
            'aggregated_statistics': {
                'total_files_analyzed': aggregated_stats['total_files_analyzed'],
                'total_utterances': aggregated_stats['total_utterances'],
                'total_gaps': aggregated_stats['total_gaps'],
                'overall_average_gap_ms': aggregated_stats['overall_average_gap_ms'],
                'overall_median_gap_ms': aggregated_stats['overall_median_gap_ms'],
                'overall_min_gap_ms': aggregated_stats['overall_min_gap_ms'],
                'overall_max_gap_ms': aggregated_stats['overall_max_gap_ms'],
                'variability_ratio': variability_ratio
            },
            'per_file_results': [
                {
                    'filename': s['filename'],
                    'average_gap_ms': s['average_gap_ms'],
                    'median_gap_ms': s['median_gap_ms'],
                    'total_utterances': s['total_utterances'],
                    'total_gaps': s['total_gaps']
                }
                for s in all_file_stats
            ]
        }

        with open(summary_filename, 'w', encoding='utf-8') as f:
            json.dump(summary_data, f, indent=2, ensure_ascii=False)
        print(f"\nAggregated analysis saved to: {summary_filename}")
    except Exception as e:
        print(f"\nError saving aggregated analysis: {e}")

    return aggregated_stats


def determine_streaming_config(aggregated_stats):
    """
    Determines optimal Universal-Streaming configuration based on aggregated gap analysis.
    Returns WebSocket connection parameters.
    """
    if aggregated_stats is None:
        print("\nUsing default balanced configuration (no gap data available)")
        return {
            'name': 'Balanced (Default)',
            'end_of_turn_confidence_threshold': 0.4,
            'min_end_of_turn_silence_when_confident': 400,
            'max_turn_silence': 1280,
            'description': 'Standard configuration for general use'
        }

    print("\n" + "=" * 70)
    print("DETERMINING OPTIMAL STREAMING CONFIGURATION")
    print("=" * 70)

    avg_gap = aggregated_stats['overall_average_gap_ms']
    num_files = aggregated_stats['total_files_analyzed']

    print(f"\nBased on analysis of {num_files} file(s)")
    print(f"Overall average gap: {avg_gap:.0f} ms")

    # Determine configuration based on average gap
    if avg_gap < 500:
        config = {
            'name': 'Aggressive',
            'end_of_turn_confidence_threshold': 0.4,
            'min_end_of_turn_silence_when_confident': 160,
            'max_turn_silence': 400,
            'description': 'Fast-paced conversation with quick turn-taking'
        }
        use_cases = "IVR systems, order confirmations, yes/no queries, retail support"
    elif avg_gap < 1000:
        config = {
            'name': 'Balanced',
            'end_of_turn_confidence_threshold': 0.4,
            'min_end_of_turn_silence_when_confident': 400,
            'max_turn_silence': 1280,
            'description': 'Natural conversation pacing'
        }
        use_cases = "General customer support, consultations, standard voice agents"
    else:
        config = {
            'name': 'Conservative',
            'end_of_turn_confidence_threshold': 0.7,
            'min_end_of_turn_silence_when_confident': 800,
            'max_turn_silence': 3600,
            'description': 'Thoughtful, complex speech with longer pauses'
        }
        use_cases = "Technical support, healthcare, legal consultations, troubleshooting"

    print(f"\nSelected Configuration: {config['name']}")
    print(f" Reasoning: Average gap of {avg_gap:.0f}ms indicates {config['description']}")
    print("\nConfiguration Parameters:")
    print(f" • end_of_turn_confidence_threshold: {config['end_of_turn_confidence_threshold']}")
    print(f" • min_end_of_turn_silence_when_confident: {config['min_end_of_turn_silence_when_confident']} ms")
    print(f" • max_turn_silence: {config['max_turn_silence']} ms")
    print(f"\nRecommended use cases: {use_cases}")

    return config


# WEBSOCKET HANDLERS WITH OPTIMIZED SETTINGS

def on_open(ws):
    """Called when the WebSocket connection is established."""
    print("WebSocket connection opened.")
    print(f"Using optimized {OPTIMIZED_CONFIG['name']} configuration")

    def stream_audio():
        global stream
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)

                with recording_lock:
                    recorded_frames.append(audio_data)

                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                print(f"Error streaming audio: {e}")
                break
        print("Audio streaming stopped.")

    global audio_thread
    audio_thread = threading.Thread(target=stream_audio)
    audio_thread.daemon = True
    audio_thread.start()

def on_message(ws, message):
    try:
        data = json.loads(message)
        msg_type = data.get('type')

        if msg_type == "Begin":
            session_id = data.get('id')
            expires_at = data.get('expires_at')
            print(f"\nSession began: ID={session_id}")
            print(f" Expires at: {datetime.fromtimestamp(expires_at)}")
            print(f" Configuration: {OPTIMIZED_CONFIG['name']}")
            print("\nSpeak now... (Press Ctrl+C to stop)\n")

        elif msg_type == "Turn":
            transcript = data.get('transcript', '')
            formatted = data.get('turn_is_formatted', False)

            if formatted:
                print('\r' + ' ' * 80 + '\r', end='')
                print(f"FINAL: {transcript}")
            else:
                print(f"\r partial: {transcript}", end='')

        elif msg_type == "Termination":
            audio_duration = data.get('audio_duration_seconds', 0)
            session_duration = data.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio={audio_duration}s, Session={session_duration}s")

    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")

def on_error(ws, error):
    """Called when a WebSocket error occurs."""
    print(f"\nWebSocket Error: {error}")
    stop_event.set()

def on_close(ws, close_status_code, close_msg):
    """Called when the WebSocket connection is closed."""
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    global stream, audio
    stop_event.set()

    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)


# RUN STREAMING WITH OPTIMIZED CONFIGURATION

def run_streaming(config):
    """
    Runs the streaming transcription with optimized turn detection settings.
    """
    global audio, stream, ws_app, OPTIMIZED_CONFIG

    OPTIMIZED_CONFIG = config

    print("\n" + "=" * 70)
    print("STARTING REAL-TIME STREAMING")
    print("=" * 70)

    # Build connection parameters with optimized settings
    CONNECTION_PARAMS = {
        "sample_rate": SAMPLE_RATE,
        "format_turns": True,
        "end_of_turn_confidence_threshold": config['end_of_turn_confidence_threshold'],
        "min_end_of_turn_silence_when_confident": str(config['min_end_of_turn_silence_when_confident']),
        "max_turn_silence": str(config['max_turn_silence'])
    }

    API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
    API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

    print(f"\nWebSocket Endpoint: {API_ENDPOINT_BASE_URL}")
    print("\nApplied Configuration:")
    for key, value in CONNECTION_PARAMS.items():
        print(f" • {key}: {value}")

    # Initialize PyAudio
    audio = pyaudio.PyAudio()

    # Open microphone stream
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("\nMicrophone stream opened successfully.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        if audio:
            audio.terminate()
        return

    # Create WebSocketApp
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run WebSocketApp in a separate thread
    ws_thread = threading.Thread(target=ws_app.run_forever)
    ws_thread.daemon = True
    ws_thread.start()

    try:
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()

        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                print("Sending termination message...")
                ws_app.send(json.dumps(terminate_message))
                time.sleep(1)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        if ws_app:
            ws_app.close()

        ws_thread.join(timeout=2.0)

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    finally:
        if stream and stream.is_active():
            stream.stop_stream()
        if stream:
            stream.close()
        if audio:
            audio.terminate()
        print("Cleanup complete. Exiting.")


# MAIN WORKFLOW

def main():
    """
    Main workflow: Analyze multiple files -> Configure -> Run Streaming
    """
    try:
        # Step 1: Analyze all audio files in folder
        aggregated_stats = analyze_multiple_files(AUDIO_FOLDER_PATH, YOUR_API_KEY)

        # Step 2: Determine optimal configuration based on aggregated data
        streaming_config = determine_streaming_config(aggregated_stats)

        # Step 3: Run streaming with optimized settings
        run_streaming(streaming_config)

    except Exception as e:
        print(f"\nError in workflow: {str(e)}")
        raise


# EXECUTION

if __name__ == "__main__":
    main()
```
Step-By-Step Guide
Before we begin, make sure you have an AssemblyAI account and an API key. You can sign up and get your API key from your dashboard.
- Install All Required Packages
```bash
pip install requests pyaudio websocket-client
```
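Note: `pyaudio` wraps the native PortAudio library, so on some systems the pip install fails until PortAudio is present (for example, `brew install portaudio` on macOS or `sudo apt-get install portaudio19-dev` on Debian/Ubuntu).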
- Configuration and Global Variables
Set up API credentials, file paths, audio parameters (16kHz sample rate, mono channel), and initialize global variables for managing WebSocket connections and audio streaming threads.
```python
import requests
import time
import json
import pyaudio
import websocket
import threading
from urllib.parse import urlencode
from datetime import datetime
import os
from pathlib import Path


YOUR_API_KEY = "<YOUR_API_KEY>"  # Replace with your API key
AUDIO_FOLDER_PATH = "<YOUR_AUDIO_FILE_FOLDER>"  # Folder containing audio files

# Audio Configuration
SAMPLE_RATE = 16000
CHANNELS = 1
FORMAT = pyaudio.paInt16
FRAMES_PER_BUFFER = 800  # 50ms of audio (0.05s * 16000Hz)

# Global variables for audio stream and websocket
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()
recorded_frames = []
recording_lock = threading.Lock()

# Store the optimized configuration
OPTIMIZED_CONFIG = {}
```
- Define `get_audio_files()` Function
This function scans a specified folder for audio/video files with supported extensions and returns a sorted list of file paths for batch processing.
```python
def get_audio_files(folder_path):
    # Extensions are compared against f.suffix.lower(), so they must be lowercase
    audio_extensions = {'.aac', '.ac3', '.aif', '.aiff', '.alac', '.amr', '.ape',
                        '.au', '.dss', '.flac', '.m4a', '.m4b', '.m4p', '.mp3',
                        '.mpga', '.ogg', '.oga', '.mogg', '.opus', '.qcp', '.tta',
                        '.voc', '.wav', '.wv', '.webm', '.mts', '.m2ts', '.ts',
                        '.mov', '.mp4', '.m4v'}
    folder = Path(folder_path)

    if not folder.exists():
        raise FileNotFoundError(f"Folder not found: {folder_path}")

    audio_files = [
        str(f) for f in folder.iterdir()
        if f.is_file() and f.suffix.lower() in audio_extensions
    ]

    if not audio_files:
        raise ValueError(f"No audio files found in {folder_path}")

    return sorted(audio_files)
```
- Define `analyze_single_file()` Function
This function uploads an audio file to AssemblyAI, requests transcription with speaker labels enabled, polls until completion, then calculates gap statistics between utterances (average, median, min, max) and saves the transcript JSON.
```python
def analyze_single_file(audio_file, api_key, file_index, total_files):
    print("\n" + "=" * 70)
    print(f"ANALYZING FILE {file_index}/{total_files}: {Path(audio_file).name}")
    print("=" * 70)

    base_url = "https://api.assemblyai.com"
    headers = {"authorization": api_key}

    # Upload audio file
    print("\nUploading audio file...")

    if audio_file.startswith("http"):
        upload_url = audio_file
        print("Using provided URL")
    else:
        with open(audio_file, "rb") as f:
            response = requests.post(
                base_url + "/v2/upload",
                headers=headers,
                data=f
            )
        upload_url = response.json()["upload_url"]
        print("Upload complete")

    # Enable Speaker Labels
    data = {
        "audio_url": upload_url,
        "speaker_labels": True,
        # "language_detection": True  # Enable automatic language detection if your files are in different languages
    }

    response = requests.post(
        base_url + "/v2/transcript",
        json=data,
        headers=headers
    )
    transcript_id = response.json()['id']
    print(f"Transcript ID: {transcript_id}")

    # Poll for completion
    print("\nWaiting for transcription to complete...")
    polling_endpoint = base_url + "/v2/transcript/" + transcript_id

    while True:
        transcription_result = requests.get(polling_endpoint, headers=headers).json()

        if transcription_result['status'] == 'completed':
            print("Transcription completed!")
            break
        elif transcription_result['status'] == 'error':
            print(f"Transcription failed: {transcription_result['error']}")
            return None
        else:
            time.sleep(3)

    # Calculate gaps between consecutive utterances (timestamps are in ms)
    utterances = transcription_result['utterances']

    if len(utterances) < 2:
        print("⚠ Not enough utterances to analyze gaps (need at least 2)")
        return None

    gaps = []
    for i in range(len(utterances) - 1):
        current_end = utterances[i]['end']
        next_start = utterances[i + 1]['start']
        gap = next_start - current_end

        if gap > 0:
            gaps.append(gap)

    if not gaps:
        print("⚠ No gaps found between utterances (all speech overlaps)")
        return None

    # Calculate statistics
    stats = {
        'filename': Path(audio_file).name,
        'average_gap_ms': sum(gaps) / len(gaps),
        'min_gap_ms': min(gaps),
        'max_gap_ms': max(gaps),
        'median_gap_ms': sorted(gaps)[len(gaps) // 2],
        'total_utterances': len(utterances),
        'total_gaps': len(gaps),
        'all_gaps': gaps
    }

    print(f"\nResults for {stats['filename']}:")
    print(f" Total utterances: {stats['total_utterances']}")
    print(f" Total gaps: {stats['total_gaps']}")
    print(f" Average gap: {stats['average_gap_ms']:.0f} ms")
    print(f" Median gap: {stats['median_gap_ms']:.0f} ms")

    # Save transcript JSON to file
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    safe_filename = Path(audio_file).stem.replace(' ', '_')
    json_filename = f"transcript_{safe_filename}_{timestamp}.json"

    try:
        with open(json_filename, 'w', encoding='utf-8') as f:
            json.dump(transcription_result, f, indent=2, ensure_ascii=False)
        print(f" Transcript saved: {json_filename}")
    except Exception as e:
        print(f" Error saving transcript: {e}")

    return stats
```
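To make the gap math concrete, here is a small standalone sketch with hypothetical utterance timestamps (the `start`/`end` values in the transcript are in milliseconds). Overlapping turns produce a non-positive gap and are excluded, exactly as in the loop above.

```python
# Toy example (hypothetical timestamps, in ms) of the gap computation above.
utterances = [
    {"start": 0,    "end": 2000},   # speaker A
    {"start": 2350, "end": 5100},   # speaker B -> gap of 350 ms
    {"start": 5000, "end": 7200},   # starts before B finishes -> negative gap, skipped
]

gaps = []
for i in range(len(utterances) - 1):
    gap = utterances[i + 1]["start"] - utterances[i]["end"]
    if gap > 0:  # overlapping speech yields gap <= 0 and is excluded
        gaps.append(gap)

print(gaps)  # [350]
```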
- Define `analyze_multiple_files()` Function
This function orchestrates the analysis of all files in a folder by calling analyze_single_file() for each, aggregates all gap data across files, calculates overall statistics, displays per-file breakdowns, and saves a comprehensive summary JSON.
```python
def analyze_multiple_files(folder_path, api_key):
    print("=" * 70)
    print("MULTI-FILE UTTERANCE GAP ANALYSIS")
    print("=" * 70)

    audio_files = get_audio_files(folder_path)
    total_files = len(audio_files)

    print(f"\nFound {total_files} audio file(s) in: {folder_path}")
    for i, file in enumerate(audio_files, 1):
        print(f" {i}. {Path(file).name}")

    # Analyze each file
    all_file_stats = []
    all_gaps = []

    for i, audio_file in enumerate(audio_files, 1):
        try:
            stats = analyze_single_file(audio_file, api_key, i, total_files)
            if stats:
                all_file_stats.append(stats)
                all_gaps.extend(stats['all_gaps'])
        except Exception as e:
            print(f"\n✗ Error analyzing {Path(audio_file).name}: {str(e)}")
            continue

    if not all_file_stats:
        print("\n✗ No files were successfully analyzed")
        return None

    # Calculate aggregated statistics
    print("\n" + "=" * 70)
    print("AGGREGATED GAP ANALYSIS RESULTS")
    print("=" * 70)

    aggregated_stats = {
        'total_files_analyzed': len(all_file_stats),
        'total_utterances': sum(s['total_utterances'] for s in all_file_stats),
        'total_gaps': sum(s['total_gaps'] for s in all_file_stats),
        'overall_average_gap_ms': sum(all_gaps) / len(all_gaps),
        'overall_median_gap_ms': sorted(all_gaps)[len(all_gaps) // 2],
        'overall_min_gap_ms': min(all_gaps),
        'overall_max_gap_ms': max(all_gaps),
        'file_averages': [s['average_gap_ms'] for s in all_file_stats],
        'file_stats': all_file_stats
    }

    print(f"\nFiles successfully analyzed: {aggregated_stats['total_files_analyzed']}/{total_files}")
    print(f"Total utterances (all files): {aggregated_stats['total_utterances']}")
    print(f"Total gaps analyzed: {aggregated_stats['total_gaps']}")
    print(f"\nOverall average gap: {aggregated_stats['overall_average_gap_ms']:.0f} ms ({aggregated_stats['overall_average_gap_ms']/1000:.2f} seconds)")
    print(f"Overall median gap: {aggregated_stats['overall_median_gap_ms']:.0f} ms")
    print(f"Overall minimum gap: {aggregated_stats['overall_min_gap_ms']:.0f} ms")
    print(f"Overall maximum gap: {aggregated_stats['overall_max_gap_ms']:.0f} ms")

    # Show per-file breakdown
    print("\nPer-file average gaps:")
    for stat in all_file_stats:
        print(f" • {stat['filename']:<40} {stat['average_gap_ms']:>6.0f} ms")

    # Calculate variability
    avg_of_file_averages = sum(aggregated_stats['file_averages']) / len(aggregated_stats['file_averages'])
    variability_ratio = aggregated_stats['overall_max_gap_ms'] / aggregated_stats['overall_average_gap_ms']

    print(f"\nAverage of file averages: {avg_of_file_averages:.0f} ms")
    print(f"Variability ratio: {variability_ratio:.2f}x")

    if variability_ratio > 3:
        print("└─> HIGH variability - mixed conversation patterns across files")
    elif variability_ratio > 2:
        print("└─> MODERATE variability - some pattern variation")
    else:
        print("└─> LOW variability - consistent conversation rhythm")

    # Save aggregated results
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    summary_filename = f"aggregated_analysis_{timestamp}.json"

    try:
        summary_data = {
            'analysis_date': datetime.now().isoformat(),
            'folder_path': folder_path,
            'aggregated_statistics': {
                'total_files_analyzed': aggregated_stats['total_files_analyzed'],
                'total_utterances': aggregated_stats['total_utterances'],
                'total_gaps': aggregated_stats['total_gaps'],
                'overall_average_gap_ms': aggregated_stats['overall_average_gap_ms'],
                'overall_median_gap_ms': aggregated_stats['overall_median_gap_ms'],
                'overall_min_gap_ms': aggregated_stats['overall_min_gap_ms'],
                'overall_max_gap_ms': aggregated_stats['overall_max_gap_ms'],
                'variability_ratio': variability_ratio
            },
            'per_file_results': [
                {
                    'filename': s['filename'],
                    'average_gap_ms': s['average_gap_ms'],
                    'median_gap_ms': s['median_gap_ms'],
                    'total_utterances': s['total_utterances'],
                    'total_gaps': s['total_gaps']
                }
                for s in all_file_stats
            ]
        }

        with open(summary_filename, 'w', encoding='utf-8') as f:
            json.dump(summary_data, f, indent=2, ensure_ascii=False)
        print(f"\nAggregated analysis saved to: {summary_filename}")
    except Exception as e:
        print(f"\nError saving aggregated analysis: {e}")

    return aggregated_stats
```
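One design note: `sorted(gaps)[len(gaps) // 2]` is a quick shortcut that returns the upper of the two middle values when the list has an even length. If you prefer the conventional median, the standard library's `statistics.median` is a drop-in alternative, as this small comparison shows:

```python
# The guide's shortcut picks the upper middle element; the conventional median
# averages the two middle values for an even-length list.
import statistics

gaps = [200, 400, 600, 800]
print(sorted(gaps)[len(gaps) // 2])  # 600 (upper middle element)
print(statistics.median(gaps))       # 500.0 (conventional median)
```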
- Define `determine_streaming_config()` Function
This function takes aggregated gap statistics and selects one of three preset configurations with optimized turn detection parameters for different conversation styles.
```python
def determine_streaming_config(aggregated_stats):
    if aggregated_stats is None:
        print("\nUsing default balanced configuration (no gap data available)")
        return {
            'name': 'Balanced (Default)',
            'end_of_turn_confidence_threshold': 0.4,
            'min_end_of_turn_silence_when_confident': 400,
            'max_turn_silence': 1280,
            'description': 'Standard configuration for general use'
        }

    print("\n" + "=" * 70)
    print("DETERMINING OPTIMAL STREAMING CONFIGURATION")
    print("=" * 70)

    avg_gap = aggregated_stats['overall_average_gap_ms']
    num_files = aggregated_stats['total_files_analyzed']

    print(f"\nBased on analysis of {num_files} file(s)")
    print(f"Overall average gap: {avg_gap:.0f} ms")

    # Determine configuration based on average gap
    if avg_gap < 500:
        config = {
            'name': 'Aggressive',
            'end_of_turn_confidence_threshold': 0.4,
            'min_end_of_turn_silence_when_confident': 160,
            'max_turn_silence': 400,
            'description': 'Fast-paced conversation with quick turn-taking'
        }
        use_cases = "IVR systems, order confirmations, yes/no queries, retail support"
    elif avg_gap < 1000:
        config = {
            'name': 'Balanced',
            'end_of_turn_confidence_threshold': 0.4,
            'min_end_of_turn_silence_when_confident': 400,
            'max_turn_silence': 1280,
            'description': 'Natural conversation pacing'
        }
        use_cases = "General customer support, consultations, standard voice agents"
    else:
        config = {
            'name': 'Conservative',
            'end_of_turn_confidence_threshold': 0.7,
            'min_end_of_turn_silence_when_confident': 800,
            'max_turn_silence': 3600,
            'description': 'Thoughtful, complex speech with longer pauses'
        }
        use_cases = "Technical support, healthcare, legal consultations, troubleshooting"

    print(f"\nSelected Configuration: {config['name']}")
    print(f" Reasoning: Average gap of {avg_gap:.0f}ms indicates {config['description']}")
    print("\nConfiguration Parameters:")
    print(f" • end_of_turn_confidence_threshold: {config['end_of_turn_confidence_threshold']}")
    print(f" • min_end_of_turn_silence_when_confident: {config['min_end_of_turn_silence_when_confident']} ms")
    print(f" • max_turn_silence: {config['max_turn_silence']} ms")
    print(f"\nRecommended use cases: {use_cases}")

    return config
```
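As a quick sanity check, you can drive the selector with synthetic aggregate stats. This sketch uses hypothetical values containing only the two keys the function reads, confirming which preset each average gap maps to:

```python
# Hypothetical aggregate stats exercising each branch of the selector above.
for avg in (300, 750, 1500):
    demo_stats = {
        'overall_average_gap_ms': avg,  # only these two keys are read
        'total_files_analyzed': 1,
    }
    cfg = determine_streaming_config(demo_stats)
    print(avg, '->', cfg['name'])
# 300 -> Aggressive, 750 -> Balanced, 1500 -> Conservative
```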
- Create WebSocket Event Handlers (`on_open`, `on_message`, `on_error`, `on_close`)
These functions manage the real-time streaming connection lifecycle: on_open starts the audio streaming thread, on_message processes transcription results (partial and final turns), and the close/error handlers clean up resources.
```python
def on_open(ws):
    """Called when the WebSocket connection is established."""
    print("WebSocket connection opened.")
    print(f"Using optimized {OPTIMIZED_CONFIG['name']} configuration")

    def stream_audio():
        global stream
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)

                with recording_lock:
                    recorded_frames.append(audio_data)

                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                print(f"Error streaming audio: {e}")
                break
        print("Audio streaming stopped.")

    global audio_thread
    audio_thread = threading.Thread(target=stream_audio)
    audio_thread.daemon = True
    audio_thread.start()

def on_message(ws, message):
    try:
        data = json.loads(message)
        msg_type = data.get('type')

        if msg_type == "Begin":
            session_id = data.get('id')
            expires_at = data.get('expires_at')
            print(f"\nSession began: ID={session_id}")
            print(f" Expires at: {datetime.fromtimestamp(expires_at)}")
            print(f" Configuration: {OPTIMIZED_CONFIG['name']}")
            print("\nSpeak now... (Press Ctrl+C to stop)\n")

        elif msg_type == "Turn":
            transcript = data.get('transcript', '')
            formatted = data.get('turn_is_formatted', False)

            if formatted:
                print('\r' + ' ' * 80 + '\r', end='')
                print(f"FINAL: {transcript}")
            else:
                print(f"\r partial: {transcript}", end='')

        elif msg_type == "Termination":
            audio_duration = data.get('audio_duration_seconds', 0)
            session_duration = data.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio={audio_duration}s, Session={session_duration}s")

    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")

def on_error(ws, error):
    """Called when a WebSocket error occurs."""
    print(f"\nWebSocket Error: {error}")
    stop_event.set()

def on_close(ws, close_status_code, close_msg):
    """Called when the WebSocket connection is closed."""
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    global stream, audio
    stop_event.set()

    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)
```
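If you want to see how `on_message` renders output before opening a live connection, you can feed it synthetic messages. The payloads below are hypothetical, but the field names (`type`, `transcript`, `turn_is_formatted`) are exactly the ones the handler reads:

```python
# Standalone sketch: exercise on_message() with hypothetical Turn payloads.
import json

partial = {"type": "Turn", "transcript": "how can i help", "turn_is_formatted": False}
final = {"type": "Turn", "transcript": "How can I help you today?", "turn_is_formatted": True}

on_message(None, json.dumps(partial))  # prints an in-place "partial: ..." line
on_message(None, json.dumps(final))    # clears that line and prints "FINAL: ..."
```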
- Define `run_streaming()` Function
This function initializes PyAudio to capture microphone input, establishes a WebSocket connection with the optimized configuration parameters, and streams audio in real-time while displaying transcription results until the user stops with Ctrl+C.
```python
def run_streaming(config):
    global audio, stream, ws_app, OPTIMIZED_CONFIG

    OPTIMIZED_CONFIG = config

    print("\n" + "=" * 70)
    print("STARTING REAL-TIME STREAMING")
    print("=" * 70)

    # Build connection parameters with optimized settings
    CONNECTION_PARAMS = {
        "sample_rate": SAMPLE_RATE,
        "format_turns": True,
        "end_of_turn_confidence_threshold": config['end_of_turn_confidence_threshold'],
        "min_end_of_turn_silence_when_confident": str(config['min_end_of_turn_silence_when_confident']),
        "max_turn_silence": str(config['max_turn_silence'])
    }

    API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
    API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

    print(f"\nWebSocket Endpoint: {API_ENDPOINT_BASE_URL}")
    print("\nApplied Configuration:")
    for key, value in CONNECTION_PARAMS.items():
        print(f" • {key}: {value}")

    # Initialize PyAudio
    audio = pyaudio.PyAudio()

    # Open microphone stream
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("\nMicrophone stream opened successfully.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        if audio:
            audio.terminate()
        return

    # Create WebSocketApp
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run WebSocketApp in a separate thread
    ws_thread = threading.Thread(target=ws_app.run_forever)
    ws_thread.daemon = True
    ws_thread.start()

    try:
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()

        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                print("Sending termination message...")
                ws_app.send(json.dumps(terminate_message))
                time.sleep(1)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        if ws_app:
            ws_app.close()

        ws_thread.join(timeout=2.0)

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    finally:
        if stream and stream.is_active():
            stream.stop_stream()
        if stream:
            stream.close()
        if audio:
            audio.terminate()
        print("Cleanup complete. Exiting.")
```
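For reference, this standalone sketch (using the Balanced preset's values from earlier) shows the query string that `run_streaming()` appends to the endpoint URL:

```python
# Standalone sketch of the connection URL built above, with Balanced preset values.
from urllib.parse import urlencode

params = {
    "sample_rate": 16000,
    "format_turns": True,
    "end_of_turn_confidence_threshold": 0.4,
    "min_end_of_turn_silence_when_confident": "400",
    "max_turn_silence": "1280",
}
print(f"wss://streaming.assemblyai.com/v3/ws?{urlencode(params)}")
# wss://streaming.assemblyai.com/v3/ws?sample_rate=16000&format_turns=True&end_of_turn_confidence_threshold=0.4&min_end_of_turn_silence_when_confident=400&max_turn_silence=1280
```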
- Define `main()` Workflow
Execute the three-step process: analyze all audio files in the folder, determine the best streaming configuration based on aggregated utterance gaps, then launch real-time streaming with the optimized settings.
```python
def main():
    try:
        # Step 1: Analyze all audio files in folder
        aggregated_stats = analyze_multiple_files(AUDIO_FOLDER_PATH, YOUR_API_KEY)

        # Step 2: Determine optimal configuration based on aggregated data
        streaming_config = determine_streaming_config(aggregated_stats)

        # Step 3: Run streaming with optimized settings
        run_streaming(streaming_config)

    except Exception as e:
        print(f"\nError in workflow: {str(e)}")
        raise


if __name__ == "__main__":
    main()
```