Skip to main content
Stream audio from your microphone with speaker diarization and LLM Gateway to get live transcription and automatic summaries after each speaker turn. Products used: Real-time STT + Universal-3 Pro + LLM Gateway. Model selection: uses u3-rt-pro (Universal-3 Pro Streaming) for the lowest latency (~300 ms) with the highest streaming accuracy.
# pip install pyaudio websocket-client
import pyaudio
import websocket
import json
import threading
import time
from urllib.parse import urlencode

# ── Config ────────────────────────────────────────────────────
YOUR_API_KEY = "YOUR_API_KEY"

# Audio settings. SAMPLE_RATE is the single source of truth: it is used both
# to open the microphone and as the "sample_rate" query parameter, so the two
# cannot drift apart.
SAMPLE_RATE = 16000
FRAMES_PER_BUFFER = 800  # 800 frames at 16 kHz = 50 ms of audio per chunk

# Plain (non-f) string: "{{turn}}" is sent to the service literally.
# NOTE(review): presumably the LLM Gateway substitutes each turn's transcript
# for this placeholder — confirm against the gateway documentation.
PROMPT = (
    "Summarize this speaker turn in one sentence, then list any "
    "action items mentioned.\n\nTranscript: {{turn}}"
)

LLM_GATEWAY_CONFIG = {
    "model": "claude-sonnet-4-5-20250929",
    "messages": [{"role": "user", "content": PROMPT}],
    "max_tokens": 500,
}

# Query parameters for the streaming endpoint. The gateway config is nested
# JSON, so it is serialized to a string before URL-encoding.
CONNECTION_PARAMS = {
    "sample_rate": SAMPLE_RATE,
    "speech_model": "u3-rt-pro",
    "format_turns": True,
    "min_turn_silence": 560,   # Wait longer for natural meeting pauses
    "max_turn_silence": 2000,
    "llm_gateway": json.dumps(LLM_GATEWAY_CONFIG),
}

API_ENDPOINT = f"wss://streaming.assemblyai.com/v3/ws?{urlencode(CONNECTION_PARAMS)}"

# Set to tell the audio-streaming thread to stop.
stop_event = threading.Event()

def on_open(ws):
    """Announce the connection and start streaming microphone audio.

    Starts a daemon thread that reads 16-bit mono PCM chunks from a PyAudio
    input stream and sends each one to ``ws`` as a binary frame. The thread
    runs until ``stop_event`` is set or a read/send fails.

    Args:
        ws: The WebSocket object the audio chunks are sent on.
    """
    print("Connected — speak into your microphone. Press Ctrl+C to stop.\n")

    def stream_audio():
        audio = pyaudio.PyAudio()
        stream = None
        try:
            stream = audio.open(
                input=True, frames_per_buffer=FRAMES_PER_BUFFER,
                channels=1, format=pyaudio.paInt16, rate=SAMPLE_RATE,
            )
            while not stop_event.is_set():
                # Drop overflowed frames rather than raising, so a slow
                # send does not abort the stream.
                data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                ws.send(data, websocket.ABNF.OPCODE_BINARY)
        except Exception as exc:
            # Failures after a requested stop (e.g. sending on a socket that
            # is already closing) are expected; anything else is reported
            # instead of being silently swallowed.
            if not stop_event.is_set():
                print(f"Audio streaming stopped: {exc}")
        finally:
            # Always release the audio device, even if opening the stream
            # or a read/send failed.
            if stream is not None:
                stream.stop_stream()
                stream.close()
            audio.terminate()

    threading.Thread(target=stream_audio, daemon=True).start()

def on_message(ws, message):
    """Handle one JSON message from the streaming session.

    Prints finished turns, a rolling one-line preview of in-progress turns,
    LLM Gateway responses, and the session summary on termination. Other
    message types are ignored.

    Args:
        ws: The WebSocket object (unused).
        message: The raw JSON text of the message.
    """
    data = json.loads(message)
    msg_type = data.get("type")

    if msg_type == "Turn":
        transcript = data.get("transcript", "")
        if data.get("end_of_turn") and transcript:
            print(f"[Turn] {transcript}\n")
        elif transcript:
            # Carriage return + no newline: the preview rewrites one line,
            # showing only the last 80 characters of the partial transcript.
            print(f"\r  ... {transcript[-80:]}", end="", flush=True)

    elif msg_type == "LLMGatewayResponse":
        # Tolerate missing, null, or empty fields: an empty "choices" list
        # would otherwise raise IndexError inside the callback.
        choices = (data.get("data") or {}).get("choices") or [{}]
        first_choice = choices[0] or {}
        content = (first_choice.get("message") or {}).get("content") or ""
        print(f"[Assistant] {content}\n")

    elif msg_type == "Termination":
        print(f"\nSession ended — {data.get('audio_duration_seconds', 0)}s of audio processed.")

def on_error(ws, error):
    """Print the reported error, then set ``stop_event``.

    Args:
        ws: The WebSocket object (unused).
        error: The error to report.
    """
    report = f"Error: {error}"
    print(report)
    stop_event.set()

def on_close(ws, code, msg):
    """Set ``stop_event`` when the WebSocket closes.

    The ``ws``, ``code`` and ``msg`` arguments are accepted but not used.
    """
    stop_event.set()

# ── Run ───────────────────────────────────────────────────────
ws_app = websocket.WebSocketApp(
    API_ENDPOINT,
    header={"Authorization": YOUR_API_KEY},
    on_open=on_open, on_message=on_message,
    on_error=on_error, on_close=on_close,
)

# Run the socket loop on a daemon thread; the main thread only polls it, so
# it stays free to handle Ctrl+C.
ws_thread = threading.Thread(target=ws_app.run_forever, daemon=True)
ws_thread.start()

try:
    while ws_thread.is_alive():
        time.sleep(0.1)
except KeyboardInterrupt:
    print("\nStopping...")
    stop_event.set()
    if ws_app.sock and ws_app.sock.connected:
        try:
            ws_app.send(json.dumps({"type": "Terminate"}))
        except (websocket.WebSocketException, OSError) as exc:
            # The socket can close between the check above and the send.
            print(f"Could not send Terminate: {exc}")
        else:
            # Wait up to 2 s for the socket thread to finish (it may still
            # print the final Termination message), but return as soon as
            # it exits instead of always sleeping the full 2 s.
            ws_thread.join(timeout=2)
    ws_app.close()
Connected — speak into your microphone. Press Ctrl+C to stop.

[Turn] So the main thing we need to decide today is whether we're going
with vendor A or vendor B for the new analytics platform.

[Assistant] The speaker is initiating a decision discussion about choosing
between two analytics platform vendors.
Action items: None yet — decision pending.

[Turn] I think vendor A has better pricing but vendor B has the integrations
we need. Can someone pull the comparison spreadsheet by Friday?

[Assistant] The speaker compared vendor pricing vs. integrations and requested
a comparison document.
Action items:
- Pull the vendor comparison spreadsheet by Friday

See the End-to-end examples overview for all available pipelines.