Build a Real-Time Medical Scribe

This example implements a real-time medical scribe using Universal-3 Pro Streaming with LLM Gateway post-processing. It streams audio from a microphone, transcribes with medical terminology boosting, applies LLM-powered clinical editing on each turn, and generates a SOAP note at the end of the session.

For post-visit documentation using pre-recorded audio, see the Post-Visit Medical Scribe guide instead.

1import os
2import json
3import time
4import threading
5from datetime import datetime
6from urllib.parse import urlencode
7
8import pyaudio
9import websocket
10import requests
11from dotenv import load_dotenv
12from simple_term_menu import TerminalMenu
13
# Load environment variables from .env if present
try:
    load_dotenv()
except Exception:
    # Best-effort: a missing or unreadable .env must not abort startup.
    pass

"""
Medical Scribe – Streaming STT + LLM Gateway Enhancement (SOAP-ready)

What this does
--------------
1) Streams mic audio to AssemblyAI Streaming STT (with formatted turns + keyterms)
2) On every utterance or formatted final turn, calls AssemblyAI LLM Gateway to
   apply *medical* edits (terminology, punctuation, proper nouns, etc.)
3) Logs encounter turns and generates a SOAP note at session end via the Gateway

Quick start
-----------
export ASSEMBLYAI_API_KEY=your_key
# Optional: pick a Gateway model (defaults to Claude 3.5 Haiku)
export LLM_GATEWAY_MODEL=claude-3-5-haiku-20241022

python medical_scribe_llm_gateway.py
"""
38
# === Config ===
# API key read from the environment; the placeholder default will be rejected
# by the API — set ASSEMBLYAI_API_KEY before running.
ASSEMBLYAI_API_KEY = os.environ.get("ASSEMBLYAI_API_KEY", "your_api_key_here")

# Medical context and terminology (seed – you can swap at runtime).
# Passed as keyterms_prompt below to bias transcription toward these terms.
MEDICAL_KEYTERMS = [
    "hypertension",
    "diabetes mellitus",
    "coronary artery disease",
    "metformin 1000mg",
    "lisinopril 10mg",
    "atorvastatin 20mg",
    "chief complaint",
    "history of present illness",
    "review of systems",
    "physical examination",
    "assessment and plan",
    "auscultation",
    "palpation",
    "reflexes",
    "range of motion",
]

# WebSocket / STT parameters - CONSERVATIVE SETTINGS FOR MEDICAL
CONNECTION_PARAMS = {
    "sample_rate": 16000,
    "speech_model": "u3-rt-pro",
    "domain": "medical-v1",
    "format_turns": True,  # Always true for readable clinical notes

    # MEDICAL SCRIBE CONFIGURATION - Conservative for clinical accuracy
    # Medical conversations have LONG pauses (provider thinking, examining patient, reviewing charts)
    # u3-rt-pro defaults: min_turn_silence=100ms, max_turn_silence=1000ms
    "min_turn_silence": 800,  # Wait much longer (vs ~100ms for voice agents, 560ms for meetings)
    "max_turn_silence": 2000,  # Longer for clinical thinking pauses

    "keyterms_prompt": MEDICAL_KEYTERMS,
}

API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
# doseq=True expands the keyterms list into repeated query parameters.
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS, doseq=True)}"

# Audio config
FRAMES_PER_BUFFER = 800  # 50ms @ 16kHz
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHANNELS = 1
FORMAT = pyaudio.paInt16

# Globals shared between the WebSocket callbacks and the audio thread.
audio = None  # pyaudio.PyAudio instance (created in run())
stream = None  # open microphone input stream
ws_app = None  # websocket.WebSocketApp instance
audio_thread = None  # background thread pumping mic audio into the socket
stop_event = threading.Event()  # signals the audio thread to stop
encounter_buffer = []  # list of dicts with turn data
last_processed_turn = None  # turn_order of the last LLM-processed final turn
94
# === Model selection ===
# Catalog shown in the interactive menu; "id" is the value sent to the
# Gateway API, "name"/"description" are display-only.
AVAILABLE_MODELS = [
    {"id": "claude-3-haiku-20240307", "name": "Claude 3 Haiku", "description": "Fastest Claude, good for simple tasks"},
    {"id": "claude-3-5-haiku-20241022", "name": "Claude 3.5 Haiku", "description": "Fast with better reasoning"},
    {"id": "claude-sonnet-4-20250514", "name": "Claude Sonnet 4", "description": "Balanced speed & intelligence"},
    {"id": "claude-sonnet-4-5-20250929", "name": "Claude Sonnet 4.5", "description": "Best for coding & agents"},
    {"id": "claude-opus-4-20250514", "name": "Claude Opus 4", "description": "Most powerful, deep reasoning"},
]
103
def select_model():
    """Show an interactive terminal menu and return the chosen model id.

    Exits the process with status 0 if the user cancels the menu.
    """
    labels = [f"{model['name']} - {model['description']}" for model in AVAILABLE_MODELS]
    menu = TerminalMenu(
        labels,
        title="Select a model (Use ↑↓ arrows, Enter to select):",
        menu_cursor="❯ ",
        menu_cursor_style=("fg_cyan", "bold"),
        menu_highlight_style=("bg_cyan", "fg_black"),
        cycle_cursor=True,
        clear_screen=False,
        show_search_hint=True,
    )
    choice = menu.show()
    if choice is None:
        print("Model selection cancelled. Exiting...")
        raise SystemExit(0)
    return AVAILABLE_MODELS[choice]["id"]
121
# Model id chosen at startup; assigned in run() before any Gateway call.
selected_model = None

# === Gateway helpers ===
125
def _gateway_chat(messages, max_tokens=800, temperature=0.2, retries=3, backoff=0.75):
    """Call the AssemblyAI LLM Gateway chat-completions endpoint with retries.

    Args:
        messages: OpenAI-style chat messages (list of role/content dicts).
        max_tokens: Generation cap forwarded to the Gateway.
        temperature: Sampling temperature forwarded to the Gateway.
        retries: Total attempts for transient failures (network, 429/5xx).
        backoff: Base linear backoff in seconds between attempts.

    Returns:
        The decoded JSON response (guaranteed to contain a non-empty
        choices[0].message).

    Raises:
        RuntimeError: on network failure after all retries, on a
            non-retryable HTTP status, on a malformed/empty 200 body, or
            after exhausting retries on transient statuses.
    """
    url = "https://llm-gateway.assemblyai.com/v1/chat/completions"
    headers = {
        "Authorization": ASSEMBLYAI_API_KEY,
        "Content-Type": "application/json",
    }
    payload = {
        "model": selected_model,
        "messages": messages,
        "max_tokens": max_tokens,
        "temperature": temperature,
    }

    last = None
    for attempt in range(retries):
        try:
            print(f"[LLM] POST {url} (model={selected_model}, attempt {attempt+1}/{retries})")
            resp = requests.post(url, headers=headers, json=payload, timeout=60)
            print(f"[LLM] ← status {resp.status_code}, bytes {len(resp.content)}")
            last = resp
        except requests.RequestException as e:
            # Network-level failure: retry with linear backoff; chain the
            # original exception when giving up so the cause is visible.
            if attempt == retries - 1:
                raise RuntimeError(f"Gateway request error: {e}") from e
            time.sleep(backoff * (attempt + 1))
            continue

        if resp.status_code == 200:
            try:
                data = resp.json()
            except ValueError as e:
                # 200 with a non-JSON body: surface a clear error instead of
                # letting the raw decode exception escape.
                raise RuntimeError(f"Gateway OK but invalid JSON: {resp.text[:200]}") from e
            if not data.get("choices") or not data["choices"][0].get("message"):
                raise RuntimeError(f"Gateway OK but empty body: {str(data)[:200]}")
            return data
        if resp.status_code in (429, 500, 502, 503, 504):
            # Transient statuses: back off and retry.
            print(f"[LLM RETRY] {resp.status_code}: {resp.text[:180]}")
            time.sleep(backoff * (attempt + 1))
            continue
        # Non-retryable client/server error.
        raise RuntimeError(f"Gateway error {resp.status_code}: {resp.text[:300]}")

    raise RuntimeError(
        f"Gateway failed after retries. Last={getattr(last,'status_code','n/a')} {getattr(last,'text','')[:180]}"
    )
167
168
def post_process_with_llm(text: str) -> str:
    """Run one transcript snippet through the Gateway for medical editing.

    Returns the LLM-edited text; on any Gateway failure the original text is
    returned unchanged so transcription never stalls.
    """
    prompt = [
        {
            "role": "system",
            "content": (
                "You are a clinical transcription editor. Keep the speaker's words, "
                "fix medical terminology (drug names, dosages, anatomy), proper nouns, "
                "and punctuation for readability. Preserve meaning and avoid inventing "
                "details. Prefer U.S. clinical style. If a medication or condition is "
                "phonetically close, correct to the most likely clinical term."
            ),
        },
        {
            "role": "user",
            "content": (
                "Context keyterms (JSON array):\n" + json.dumps(MEDICAL_KEYTERMS) + "\n\n"
                "Edit this short transcript for medical accuracy and readability.\n\n"
                f"Transcript:\n{text}"
            ),
        },
    ]

    try:
        reply = _gateway_chat(prompt, max_tokens=600)
        return reply["choices"][0]["message"]["content"].strip()
    except Exception as e:
        print(f"[LLM EDIT ERROR] {e}. Falling back to original.")
        return text
197
198
def generate_clinical_note():
    """Summarize the encounter buffer into a SOAP note via the Gateway.

    Writes the note to a timestamped .txt file; does nothing if no encounter
    data was captured.
    """
    if not encounter_buffer:
        print("No encounter data to summarize.")
        return

    print("\n=== GENERATING CLINICAL DOCUMENTATION (SOAP) ===")

    # One timestamped line per buffered entry; other entry types are skipped.
    def _format_entry(entry):
        kind = entry.get("type")
        if kind == "utterance":
            return f"[{entry['timestamp']}] {entry.get('speaker', 'Speaker')}: {entry['text']}"
        if kind == "final":
            return f"[{entry['timestamp']}] FINAL: {entry['text']}"
        return None

    combined = "\n".join(
        line for line in map(_format_entry, encounter_buffer) if line is not None
    )

    messages = [
        {
            "role": "system",
            "content": (
                "You are a clinician generating concise, structured notes. "
                "Produce a SOAP note (Subjective, Objective, Assessment, Plan). "
                "Use bullet points, keep it factual, infer reasonable clinical semantics "
                "from the transcript but do NOT invent data. Include medications with dosage "
                "and frequency if mentioned."
            ),
        },
        {
            "role": "user",
            "content": (
                "Create a SOAP note from this clinical encounter transcript.\n\n"
                f"Transcript:\n{combined}\n\n"
                "Format strictly as:\n"
                "Subjective:\n- ...\n\nObjective:\n- ...\n\nAssessment:\n- ...\n\nPlan:\n- ...\n"
            ),
        },
    ]

    try:
        res = _gateway_chat(messages, max_tokens=1200)
        soap = res["choices"][0]["message"]["content"].strip()
        fname = f"clinical_note_soap_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
        with open(fname, "w", encoding="utf-8") as f:
            f.write(soap)
        print(f"SOAP note saved: {fname}")
    except Exception as e:
        print(f"[SOAP ERROR] {e}")
244
245
246# === WebSocket callbacks ===
247
def on_open(ws):
    """WebSocket 'open' callback: print the session banner and start the
    background thread that pumps raw mic audio into the socket."""
    print("=" * 80)
    print(f"[{datetime.now().strftime('%H:%M:%S')}] Medical transcription started")
    print(f"Connected to: {API_ENDPOINT_BASE_URL}")
    print(f"Gateway model: {selected_model}")
    print("=" * 80)
    print("\nSpeak to begin. Press Ctrl+C to stop.\n")

    def stream_audio():
        # Pump loop: read one FRAMES_PER_BUFFER chunk per iteration and send
        # it as a binary frame until stop_event is set.
        global stream
        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                # Stay quiet during an intentional shutdown; otherwise report.
                if not stop_event.is_set():
                    print(f"Error streaming audio: {e}")
                break

    global audio_thread
    audio_thread = threading.Thread(target=stream_audio, daemon=True)
    audio_thread.start()
270
271
def on_message(ws, message):
    """WebSocket 'message' callback: route AssemblyAI streaming events.

    Handles Begin / Turn / Termination / Error message types. Finalized turns
    are LLM-edited and appended to encounter_buffer; per-utterance chunks are
    logged raw (no LLM call) for the encounter timeline. Termination triggers
    transcript save and SOAP-note generation.
    """
    global last_processed_turn
    try:
        data = json.loads(message)
        msg_type = data.get("type")

        if msg_type == "Begin":
            print(f"[SESSION] Started - ID: {data.get('id','N/A')}\n")

        elif msg_type == "Turn":
            end_of_turn = data.get("end_of_turn", False)
            transcript = data.get("transcript", "")
            utterance = data.get("utterance", "")
            turn_order = data.get("turn_order", 0)
            eot_conf = data.get("end_of_turn_confidence", 0.0)

            # live partials (carriage return keeps them on one console line)
            if not end_of_turn and transcript:
                print(f"\r[PARTIAL] {transcript[:120]}...", end="", flush=True)

            # If AssemblyAI has finalized a turn, LLM-edit the transcript
            if end_of_turn and transcript:
                # Dedupe guard: the same turn_order can arrive more than once
                # (e.g. unformatted then formatted finals).
                if last_processed_turn == turn_order:
                    return  # avoid duplicate processing
                last_processed_turn = turn_order

                ts = datetime.now().strftime('%H:%M:%S')
                print("\n[DEBUG] EOT received (formatted). Calling LLM…")
                edited = post_process_with_llm(transcript)

                changed = "(edited)" if edited.strip() != transcript.strip() else "(no change)"
                print(f"\n[{ts}] [FINAL {changed}]")
                print(f" ├─ Original STT : {transcript}")
                print(f" └─ Edited by LLM: {edited}")
                print(f"Turn: {turn_order} | Confidence: {eot_conf:.2%}")

                encounter_buffer.append({
                    "timestamp": ts,
                    "text": edited,
                    "original_text": transcript,
                    "turn_order": turn_order,
                    "confidence": eot_conf,
                    "type": "final",
                })

        # If we also get per-utterance chunks, just log them raw (no LLM) for timeline
            if utterance:
                ts = datetime.now().strftime('%H:%M:%S')

                # Lightweight keyword tagging for the console timeline only;
                # does not affect what is stored in the buffer.
                low = utterance.lower()
                if any(t in low for t in ["medication", "prescribe", "dosage", "mg", "daily"]):
                    print(" 💊 MEDICATION MENTIONED")
                if any(t in low for t in ["pain", "symptom", "complaint", "problem"]):
                    print(" 🏥 SYMPTOM REPORTED")
                if any(t in low for t in ["diagnose", "assessment", "impression"]):
                    print(" 📋 DIAGNOSIS DISCUSSED")

                encounter_buffer.append({
                    "timestamp": ts,
                    "text": utterance,
                    "original_text": utterance,
                    "turn_order": turn_order,
                    "confidence": eot_conf,
                    "type": "utterance",
                })
                print()

        elif msg_type == "Termination":
            dur = data.get("audio_duration_seconds", 0)
            print(f"\n[SESSION] Terminated – Duration: {dur}s")
            # Persist artifacts at session end: raw transcript, then SOAP note.
            save_encounter_transcript()
            generate_clinical_note()

        elif msg_type == "Error":
            print(f"\n[ERROR] {data.get('error', 'Unknown error')}")

    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        # Last-resort guard: a bad message must not kill the WS read loop.
        print(f"Error handling message: {e}")
352
353
def on_error(ws, error):
    """WebSocket 'error' callback: report the failure and request shutdown."""
    notice = f"\n[WEBSOCKET ERROR] {error}"
    print(notice)
    stop_event.set()
357
358
def on_close(ws, close_status_code, close_msg):
    """WebSocket 'close' callback: signal shutdown and release audio resources.

    Also runs on abnormal disconnects; stop_event is set first so the mic-pump
    thread stops reading before the stream handle is torn down.
    """
    print(f"\n[WEBSOCKET] Disconnected – Status: {close_status_code}")
    global stream, audio
    stop_event.set()

    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
    # Give the audio thread a moment to observe stop_event and exit.
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)
374
375
376# === Persist artifacts ===
377
def save_encounter_transcript():
    """Write every buffered encounter entry to a timestamped text file."""
    if not encounter_buffer:
        print("No encounter data to save.")
        return

    fname = f"encounter_transcript_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
    with open(fname, "w", encoding="utf-8") as f:
        f.writelines([
            "Clinical Encounter Transcript\n",
            f"Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n",
            "=" * 80 + "\n\n",
        ])
        for entry in encounter_buffer:
            speaker = entry.get("speaker")
            if speaker:
                f.write(f"[{entry['timestamp']}] {speaker}: {entry['text']}\n")
            else:
                f.write(f"[{entry['timestamp']}] {entry['text']}\n")
            f.write(f"Confidence: {entry['confidence']:.2%}\n\n")
    print(f"Encounter transcript saved: {fname}")
395
396
397# === Main ===
398
def run():
    """Entry point: select a Gateway model, open the microphone, connect the
    streaming WebSocket, then block until Ctrl+C or the session ends."""
    global audio, stream, ws_app, selected_model

    print("=" * 60)
    print(" 🎙️ Medical Scribe - STT + LLM Gateway")
    print("=" * 60)
    selected_model = select_model()
    print(f"✓ Using model: {selected_model}")

    # Init mic
    audio = pyaudio.PyAudio()
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("Audio stream opened successfully.")
    except Exception as e:
        # No mic available: release PyAudio and bail out cleanly.
        print(f"Error opening audio stream: {e}")
        if audio:
            audio.terminate()
        return

    # Connect WS (auth header carries the raw API key)
    ws_headers = [f"Authorization: {ASSEMBLYAI_API_KEY}"]
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header=ws_headers,
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # run_forever blocks, so run it on a daemon thread and poll from the main
    # thread, which keeps KeyboardInterrupt deliverable here.
    ws_thread = threading.Thread(target=ws_app.run_forever, daemon=True)
    ws_thread.start()

    try:
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\n\nCtrl+C received. Stopping...")
        stop_event.set()
        # best-effort terminate
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                ws_app.send(json.dumps({"type": "Terminate"}))
                # Brief pause so the server can process the Terminate message.
                time.sleep(0.5)
            except Exception as e:
                print(f"Error sending termination: {e}")
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)
    finally:
        # on_close may have already released these handles; guard each one.
        if stream and stream.is_active():
            stream.stop_stream()
        if stream:
            stream.close()
        if audio:
            audio.terminate()
        print("Cleanup complete. Exiting.")
463
464
# Script entry point: interactive model selection, then live transcription.
if __name__ == "__main__":
    run()

Next steps