Use cases & integrations › Use case guides › Build a medical scribe
Build a Real-Time Medical Scribe
This example implements a real-time medical scribe using Universal-3 Pro Streaming with LLM Gateway post-processing. It streams audio from a microphone, transcribes with medical terminology boosting, applies LLM-powered clinical editing on each turn, and generates a SOAP note at the end of the session.
For post-visit documentation using pre-recorded audio, see the Post-Visit Medical Scribe guide instead.
import os
import json
import time
import threading
from datetime import datetime
from urllib.parse import urlencode

import pyaudio
import websocket
import requests
from dotenv import load_dotenv
from simple_term_menu import TerminalMenu

# Load environment variables from .env if present.
# Wrapped in try/except so the script still runs if python-dotenv misbehaves.
try:
    load_dotenv()
except Exception:
    pass

"""
Medical Scribe – Streaming STT + LLM Gateway Enhancement (SOAP-ready)

What this does
--------------
1) Streams mic audio to AssemblyAI Streaming STT (with formatted turns + keyterms)
2) On every utterance or formatted final turn, calls AssemblyAI LLM Gateway to
   apply *medical* edits (terminology, punctuation, proper nouns, etc.)
3) Logs encounter turns and generates a SOAP note at session end via the Gateway

Quick start
-----------
export ASSEMBLYAI_API_KEY=your_key
# Optional: pick a Gateway model (defaults to Claude 3.5 Haiku)
export LLM_GATEWAY_MODEL=claude-3-5-haiku-20241022

python medical_scribe_llm_gateway.py
"""

# === Config ===
# NOTE(review): falls back to a placeholder string rather than failing fast;
# a missing key will surface later as a Gateway/WebSocket auth error.
ASSEMBLYAI_API_KEY = os.environ.get("ASSEMBLYAI_API_KEY", "your_api_key_here")

# Medical context and terminology (seed – you can swap at runtime).
# Sent to STT as keyterms_prompt and to the LLM editor as context.
MEDICAL_KEYTERMS = [
    "hypertension",
    "diabetes mellitus",
    "coronary artery disease",
    "metformin 1000mg",
    "lisinopril 10mg",
    "atorvastatin 20mg",
    "chief complaint",
    "history of present illness",
    "review of systems",
    "physical examination",
    "assessment and plan",
    "auscultation",
    "palpation",
    "reflexes",
    "range of motion",
]

# WebSocket / STT parameters - CONSERVATIVE SETTINGS FOR MEDICAL
CONNECTION_PARAMS = {
    "sample_rate": 16000,
    "speech_model": "u3-rt-pro",
    "domain": "medical-v1",
    "format_turns": True,  # Always true for readable clinical notes

    # MEDICAL SCRIBE CONFIGURATION - Conservative for clinical accuracy
    # Medical conversations have LONG pauses (provider thinking, examining
    # patient, reviewing charts).
    # u3-rt-pro defaults: min_turn_silence=100ms, max_turn_silence=1000ms
    "min_turn_silence": 800,   # Wait much longer (vs ~100ms for voice agents, 560ms for meetings)
    "max_turn_silence": 2000,  # Longer for clinical thinking pauses

    "keyterms_prompt": MEDICAL_KEYTERMS,
}

API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
# doseq=True expands the keyterms list into repeated query parameters.
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS, doseq=True)}"

# Audio config
FRAMES_PER_BUFFER = 800  # 50ms @ 16kHz
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHANNELS = 1
FORMAT = pyaudio.paInt16

# Globals shared between the main thread, the audio-streaming thread,
# and the websocket callback thread.
audio = None            # pyaudio.PyAudio instance
stream = None           # open microphone stream
ws_app = None           # websocket.WebSocketApp instance
audio_thread = None     # thread pumping mic audio into the websocket
stop_event = threading.Event()  # signals the audio thread to stop
encounter_buffer = []   # list of dicts with turn data ("final" and "utterance" entries)
last_processed_turn = None  # turn_order of the last finalized turn (dedup guard)

# === Model selection ===
AVAILABLE_MODELS = [
    {"id": "claude-3-haiku-20240307", "name": "Claude 3 Haiku", "description": "Fastest Claude, good for simple tasks"},
    {"id": "claude-3-5-haiku-20241022", "name": "Claude 3.5 Haiku", "description": "Fast with better reasoning"},
    {"id": "claude-sonnet-4-20250514", "name": "Claude Sonnet 4", "description": "Balanced speed & intelligence"},
    {"id": "claude-sonnet-4-5-20250929", "name": "Claude Sonnet 4.5", "description": "Best for coding & agents"},
    {"id": "claude-opus-4-20250514", "name": "Claude Opus 4", "description": "Most powerful, deep reasoning"},
]

def select_model() -> str:
    """Show an interactive terminal menu and return the chosen Gateway model id.

    Exits the process (SystemExit) if the user cancels the menu.
    """
    menu_entries = [f"{m['name']} - {m['description']}" for m in AVAILABLE_MODELS]
    terminal_menu = TerminalMenu(
        menu_entries,
        title="Select a model (Use ↑↓ arrows, Enter to select):",
        menu_cursor="❯ ",
        menu_cursor_style=("fg_cyan", "bold"),
        menu_highlight_style=("bg_cyan", "fg_black"),
        cycle_cursor=True,
        clear_screen=False,
        show_search_hint=True,
    )
    idx = terminal_menu.show()
    if idx is None:  # user pressed Esc / Ctrl+C in the menu
        print("Model selection cancelled. Exiting...")
        raise SystemExit(0)
    return AVAILABLE_MODELS[idx]["id"]

# Set once in run() before any Gateway call is made.
selected_model = None

# === Gateway helpers ===

def _gateway_chat(messages, max_tokens=800, temperature=0.2, retries=3, backoff=0.75):
    """Call AssemblyAI LLM Gateway with debug logging and retry.

    messages: OpenAI-style chat messages (list of {"role", "content"} dicts).
    Retries on network errors and on 429/5xx responses with linear backoff
    (backoff * attempt_number seconds). Raises RuntimeError on non-retryable
    errors, malformed 200 bodies, or after exhausting all retries.
    """
    url = "https://llm-gateway.assemblyai.com/v1/chat/completions"
    headers = {
        # NOTE(review): raw API key, no "Bearer " prefix — presumably the
        # Gateway's expected scheme; confirm against the Gateway docs.
        "Authorization": ASSEMBLYAI_API_KEY,
        "Content-Type": "application/json",
    }
    payload = {
        "model": selected_model,
        "messages": messages,
        "max_tokens": max_tokens,
        "temperature": temperature,
    }

    last = None  # last response seen, for the final error message
    for attempt in range(retries):
        try:
            print(f"[LLM] POST {url} (model={selected_model}, attempt {attempt+1}/{retries})")
            resp = requests.post(url, headers=headers, json=payload, timeout=60)
            print(f"[LLM] ← status {resp.status_code}, bytes {len(resp.content)}")
            last = resp
        except Exception as e:
            # Network-level failure: retry unless this was the last attempt.
            if attempt == retries - 1:
                raise RuntimeError(f"Gateway request error: {e}")
            time.sleep(backoff * (attempt + 1))
            continue

        if resp.status_code == 200:
            data = resp.json()
            # Guard against a 200 with an empty/odd body.
            if not data.get("choices") or not data["choices"][0].get("message"):
                raise RuntimeError(f"Gateway OK but empty body: {str(data)[:200]}")
            return data
        if resp.status_code in (429, 500, 502, 503, 504):
            # Rate-limited or transient server error: back off and retry.
            print(f"[LLM RETRY] {resp.status_code}: {resp.text[:180]}")
            time.sleep(backoff * (attempt + 1))
            continue
        # Any other status is treated as permanent.
        raise RuntimeError(f"Gateway error {resp.status_code}: {resp.text[:300]}")

    raise RuntimeError(
        f"Gateway failed after retries. Last={getattr(last,'status_code','n/a')} {getattr(last,'text','')[:180]}"
    )


def post_process_with_llm(text: str) -> str:
    """Medical editing & normalization using LLM Gateway.

    Returns the LLM-edited transcript, or the original text unchanged if
    the Gateway call fails (best-effort — transcription must not stall).
    """
    system = {
        "role": "system",
        "content": (
            "You are a clinical transcription editor. Keep the speaker's words, "
            "fix medical terminology (drug names, dosages, anatomy), proper nouns, "
            "and punctuation for readability. Preserve meaning and avoid inventing "
            "details. Prefer U.S. clinical style. If a medication or condition is "
            "phonetically close, correct to the most likely clinical term."
        ),
    }

    user = {
        "role": "user",
        "content": (
            "Context keyterms (JSON array):\n" + json.dumps(MEDICAL_KEYTERMS) + "\n\n"
            "Edit this short transcript for medical accuracy and readability.\n\n"
            f"Transcript:\n{text}"
        ),
    }

    try:
        res = _gateway_chat([system, user], max_tokens=600)
        return res["choices"][0]["message"]["content"].strip()
    except Exception as e:
        print(f"[LLM EDIT ERROR] {e}. Falling back to original.")
        return text


def generate_clinical_note():
    """Create a SOAP note from the encounter buffer via Gateway.

    Writes the note to a timestamped clinical_note_soap_*.txt file.
    No-op if the encounter buffer is empty; prints (does not raise) on
    Gateway failure.
    """
    if not encounter_buffer:
        print("No encounter data to summarize.")
        return

    print("\n=== GENERATING CLINICAL DOCUMENTATION (SOAP) ===")
    # Build a compact transcript string for the LLM
    lines = []
    for e in encounter_buffer:
        if e.get("type") == "utterance":
            lines.append(f"[{e['timestamp']}] {e.get('speaker', 'Speaker')}: {e['text']}")
        elif e.get("type") == "final":
            lines.append(f"[{e['timestamp']}] FINAL: {e['text']}")
    combined = "\n".join(lines)

    system = {
        "role": "system",
        "content": (
            "You are a clinician generating concise, structured notes. "
            "Produce a SOAP note (Subjective, Objective, Assessment, Plan). "
            "Use bullet points, keep it factual, infer reasonable clinical semantics "
            "from the transcript but do NOT invent data. Include medications with dosage "
            "and frequency if mentioned."
        ),
    }
    user = {
        "role": "user",
        "content": (
            "Create a SOAP note from this clinical encounter transcript.\n\n"
            f"Transcript:\n{combined}\n\n"
            "Format strictly as:\n"
            "Subjective:\n- ...\n\nObjective:\n- ...\n\nAssessment:\n- ...\n\nPlan:\n- ...\n"
        ),
    }

    try:
        res = _gateway_chat([system, user], max_tokens=1200)
        soap = res["choices"][0]["message"]["content"].strip()
        fname = f"clinical_note_soap_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
        with open(fname, "w", encoding="utf-8") as f:
            f.write(soap)
        print(f"SOAP note saved: {fname}")
    except Exception as e:
        print(f"[SOAP ERROR] {e}")


# === WebSocket callbacks ===

def on_open(ws):
    """WebSocket opened: print a session banner and start the mic-pump thread."""
    print("=" * 80)
    print(f"[{datetime.now().strftime('%H:%M:%S')}] Medical transcription started")
    print(f"Connected to: {API_ENDPOINT_BASE_URL}")
    print(f"Gateway model: {selected_model}")
    print("=" * 80)
    print("\nSpeak to begin. Press Ctrl+C to stop.\n")

    def stream_audio():
        # Pump raw PCM frames from the mic to the websocket until stop_event
        # is set or the stream errors out.
        global stream
        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                if not stop_event.is_set():
                    print(f"Error streaming audio: {e}")
                break

    global audio_thread
    audio_thread = threading.Thread(target=stream_audio, daemon=True)
    audio_thread.start()


def on_message(ws, message):
    """Handle a server message: partials, finalized turns, termination, errors.

    On a finalized "Turn" message, the transcript is LLM-edited and appended
    to encounter_buffer (type "final"); per-utterance chunks are logged raw
    (type "utterance") with simple keyword tagging. "Termination" triggers
    transcript saving and SOAP-note generation.
    """
    global last_processed_turn
    try:
        data = json.loads(message)
        msg_type = data.get("type")

        if msg_type == "Begin":
            print(f"[SESSION] Started - ID: {data.get('id','N/A')}\n")

        elif msg_type == "Turn":
            end_of_turn = data.get("end_of_turn", False)
            transcript = data.get("transcript", "")
            utterance = data.get("utterance", "")
            turn_order = data.get("turn_order", 0)
            eot_conf = data.get("end_of_turn_confidence", 0.0)

            # live partials (overwrite the same console line)
            if not end_of_turn and transcript:
                print(f"\r[PARTIAL] {transcript[:120]}...", end="", flush=True)

            # If AssemblyAI has finalized a turn, LLM-edit the transcript
            if end_of_turn and transcript:
                if last_processed_turn == turn_order:
                    return  # avoid duplicate processing
                last_processed_turn = turn_order

                ts = datetime.now().strftime('%H:%M:%S')
                print("\n[DEBUG] EOT received (formatted). Calling LLM…")
                # NOTE(review): this Gateway call runs on the websocket
                # callback thread, so message handling blocks until it returns.
                edited = post_process_with_llm(transcript)

                changed = "(edited)" if edited.strip() != transcript.strip() else "(no change)"
                print(f"\n[{ts}] [FINAL {changed}]")
                print(f"  ├─ Original STT : {transcript}")
                print(f"  └─ Edited by LLM: {edited}")
                print(f"Turn: {turn_order} | Confidence: {eot_conf:.2%}")

                encounter_buffer.append({
                    "timestamp": ts,
                    "text": edited,
                    "original_text": transcript,
                    "turn_order": turn_order,
                    "confidence": eot_conf,
                    "type": "final",
                })

            # If we also get per-utterance chunks, just log them raw (no LLM) for timeline
            if utterance:
                ts = datetime.now().strftime('%H:%M:%S')

                # Lightweight keyword tagging for the console timeline.
                low = utterance.lower()
                if any(t in low for t in ["medication", "prescribe", "dosage", "mg", "daily"]):
                    print("  💊 MEDICATION MENTIONED")
                if any(t in low for t in ["pain", "symptom", "complaint", "problem"]):
                    print("  🏥 SYMPTOM REPORTED")
                if any(t in low for t in ["diagnose", "assessment", "impression"]):
                    print("  📋 DIAGNOSIS DISCUSSED")

                encounter_buffer.append({
                    "timestamp": ts,
                    "text": utterance,
                    "original_text": utterance,
                    "turn_order": turn_order,
                    "confidence": eot_conf,
                    "type": "utterance",
                })
                print()

        elif msg_type == "Termination":
            dur = data.get("audio_duration_seconds", 0)
            print(f"\n[SESSION] Terminated – Duration: {dur}s")
            save_encounter_transcript()
            generate_clinical_note()

        elif msg_type == "Error":
            print(f"\n[ERROR] {data.get('error', 'Unknown error')}")

    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")


def on_error(ws, error):
    """WebSocket error: log it and signal the audio thread to stop."""
    print(f"\n[WEBSOCKET ERROR] {error}")
    stop_event.set()


def on_close(ws, close_status_code, close_msg):
    """WebSocket closed: stop the audio thread and release PyAudio resources."""
    print(f"\n[WEBSOCKET] Disconnected – Status: {close_status_code}")
    global stream, audio
    stop_event.set()

    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)


# === Persist artifacts ===

def save_encounter_transcript():
    """Write the raw encounter buffer to a timestamped transcript .txt file.

    No-op if the buffer is empty.
    """
    if not encounter_buffer:
        print("No encounter data to save.")
        return

    fname = f"encounter_transcript_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
    with open(fname, "w", encoding="utf-8") as f:
        f.write("Clinical Encounter Transcript\n")
        f.write(f"Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
        f.write("=" * 80 + "\n\n")
        for e in encounter_buffer:
            # NOTE(review): no code path in this file sets a "speaker" key,
            # so the first branch appears to be dead unless entries are added
            # elsewhere — confirm before relying on speaker labels.
            if e.get("speaker"):
                f.write(f"[{e['timestamp']}] {e['speaker']}: {e['text']}\n")
            else:
                f.write(f"[{e['timestamp']}] {e['text']}\n")
            f.write(f"Confidence: {e['confidence']:.2%}\n\n")
    print(f"Encounter transcript saved: {fname}")


# === Main ===

def run():
    """Entry point: select a model, open the mic, and stream until Ctrl+C.

    Runs the websocket client on a daemon thread and polls it from the main
    thread so KeyboardInterrupt can trigger a graceful Terminate + cleanup.
    """
    global audio, stream, ws_app, selected_model

    print("=" * 60)
    print("  🎙️  Medical Scribe - STT + LLM Gateway")
    print("=" * 60)
    selected_model = select_model()
    print(f"✓ Using model: {selected_model}")

    # Init mic
    audio = pyaudio.PyAudio()
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("Audio stream opened successfully.")
    except Exception as e:
        print(f"Error opening audio stream: {e}")
        if audio:
            audio.terminate()
        return

    # Connect WS (same raw-key Authorization header as the Gateway calls)
    ws_headers = [f"Authorization: {ASSEMBLYAI_API_KEY}"]
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header=ws_headers,
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    ws_thread = threading.Thread(target=ws_app.run_forever, daemon=True)
    ws_thread.start()

    try:
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\n\nCtrl+C received. Stopping...")
        stop_event.set()
        # best-effort terminate: ask the server to finalize the session so
        # the Termination handler can save the transcript and SOAP note
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                ws_app.send(json.dumps({"type": "Terminate"}))
                time.sleep(0.5)  # give the Termination message time to arrive
            except Exception as e:
                print(f"Error sending termination: {e}")
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)
    finally:
        # on_close may have already released these; guards make this idempotent
        if stream and stream.is_active():
            stream.stop_stream()
        if stream:
            stream.close()
        if audio:
            audio.terminate()
        print("Cleanup complete. Exiting.")


if __name__ == "__main__":
    run()
Next steps
- Build a Post-Visit Medical Scribe — Pre-recorded transcription for post-visit documentation
- Medical Mode for Streaming — Improve streaming medical terminology accuracy
- Universal-3 Pro Streaming — Full streaming model documentation