# pip install pyaudio websocket-client
import json
import os
import threading
import time
from datetime import datetime
from urllib.parse import urlencode

import pyaudio
import websocket
# --- Configuration ---
# API key: read from the ASSEMBLYAI_API_KEY environment variable when it is set,
# so the key does not have to be committed to source. Otherwise the placeholder
# below is used and must be replaced by hand.
YOUR_API_KEY = os.environ.get("ASSEMBLYAI_API_KEY", "your_api_key")

# Contact center keyterms (sent to the API via the "keyterms_prompt" parameter)
KEYTERMS = [
    # Company and product terms
    "Acme Corp",
    "Premium Support Plan",
    "Enterprise License",
    # Compliance phrases
    "recorded line",
    "calls are monitored",
    # Common contact center vocabulary
    "account number",
    "case number",
    "escalation",
    "supervisor",
]

# CONTACT CENTER CONFIGURATION
# These are sent as query-string parameters on the WebSocket URL (API_ENDPOINT).
CONNECTION_PARAMS = {
    "sample_rate": 8000,  # Telephony standard (8kHz)
    "speech_model": "u3-rt-pro",  # Universal-3 Pro Streaming for highest accuracy
    "format_turns": True,
    # Contact center turn detection
    # u3-rt-pro defaults: min_turn_silence=100ms, max_turn_silence=1000ms
    "min_turn_silence": 400,  # Longer than default for natural call pauses
    "max_turn_silence": 1500,  # Longer for customers explaining issues
    # Keyterms for accuracy
    "keyterms_prompt": KEYTERMS,
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
# doseq=True makes urlencode emit one key=value pair per element of a list value,
# so each keyterm becomes its own "keyterms_prompt" query parameter.
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS, doseq=True)}"

# Audio Configuration
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
AUDIO_CHUNK_MS = 50  # Duration of audio read and sent per WebSocket message
# Derived from the sample rate so the chunk stays AUDIO_CHUNK_MS long even if
# "sample_rate" above is changed (400 frames at 8 kHz).
FRAMES_PER_BUFFER = SAMPLE_RATE * AUDIO_CHUNK_MS // 1000
CHANNELS = 1
FORMAT = pyaudio.paInt16  # 16-bit signed integer samples

# Global state shared between run() and the WebSocket callbacks
audio = None  # pyaudio.PyAudio instance, created in run()
stream = None  # Input stream opened in run(), read by the capture thread
ws_app = None  # websocket.WebSocketApp created in run()
audio_thread = None  # Capture thread started in on_open()
stop_event = threading.Event()  # Set to tell the capture thread to stop
transcript_buffer = []  # Finalized turns appended by on_message()
def on_open(ws):
    """Announce the new session and start forwarding microphone audio.

    Prints a banner with the start time, endpoint and the first few keyterms.
    It then launches a daemon thread that repeatedly reads FRAMES_PER_BUFFER
    frames from the global ``stream`` and sends each chunk to ``ws`` as a
    binary frame. The thread stops when ``stop_event`` is set or when a read
    or send fails.

    Args:
        ws: The connected WebSocketApp passed in by the open callback.
    """
    global audio_thread

    divider = "=" * 80
    keyterm_preview = ', '.join(KEYTERMS[:5])

    print(divider)
    print(f"[{datetime.now().strftime('%H:%M:%S')}] Agent assist transcription started")
    print(f"Connected to: {API_ENDPOINT_BASE_URL}")
    print(f"Keyterms configured: {keyterm_preview}...")
    print(divider)

    def _forward_audio():
        # Pump audio chunks into the socket until asked to stop.
        while not stop_event.is_set():
            try:
                chunk = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                ws.send(chunk, websocket.ABNF.OPCODE_BINARY)
            except Exception as exc:
                # Failures after shutdown was requested are expected; stay quiet then.
                if not stop_event.is_set():
                    print(f"Error streaming audio: {exc}")
                break

    audio_thread = threading.Thread(target=_forward_audio, daemon=True)
    audio_thread.start()
def on_message(ws, message):
    """Handle one JSON message received from the streaming API.

    Recognised ``type`` values:

    * ``Begin`` -- print the session id.
    * ``Turn`` -- while the turn is still open, redraw a ``[LIVE]`` line with
      the partial transcript. Once ``end_of_turn`` is true, print the final
      text with a timestamp, flag escalation keywords, and append a record to
      the global ``transcript_buffer``.
    * ``Termination`` -- print the reported audio duration.
    * ``Error`` -- print the error text.

    Any other type is ignored. Malformed JSON and handler errors are printed
    rather than raised to the caller.

    Args:
        ws: The WebSocketApp (unused).
        message: Raw text of the received message.
    """
    try:
        payload = json.loads(message)
        kind = payload.get("type")

        if kind == "Begin":
            print(f"[SESSION] Started - ID: {payload.get('id', 'N/A')}\n")

        elif kind == "Turn":
            finished = payload.get("end_of_turn", False)
            text = payload.get("transcript", "")
            order = payload.get("turn_order", 0)

            if text and not finished:
                # Partial result: "\r" returns to the start of the line so the
                # next partial is drawn over this one.
                print(f"\r[LIVE] {text}", end="", flush=True)

            if text and finished:
                stamp = datetime.now().strftime('%H:%M:%S')
                print(f"\n[{stamp}] {text}")

                # Flag escalation keywords (case-insensitive substring match).
                lowered = text.lower()
                escalation_terms = ("cancel", "refund", "complaint", "supervisor")
                if any(term in lowered for term in escalation_terms):
                    print(" ** ESCALATION KEYWORD DETECTED **")

                transcript_buffer.append(
                    {
                        "timestamp": stamp,
                        "text": text,
                        "turn_order": order,
                        "type": "final",
                    }
                )
                print()

        elif kind == "Termination":
            duration = payload.get("audio_duration_seconds", 0)
            print(f"\n[SESSION] Terminated - Duration: {duration}s")

        elif kind == "Error":
            print(f"\n[ERROR] {payload.get('error', 'Unknown error')}")

    except json.JSONDecodeError as exc:
        print(f"Error decoding message: {exc}")
    except Exception as exc:
        print(f"Error handling message: {exc}")
def on_error(ws, error):
    """Report a WebSocket error and tell the capture thread to stop.

    Args:
        ws: The WebSocketApp (unused).
        error: The error object supplied by the callback; it is only
            formatted for display.
    """
    banner = f"\n[WEBSOCKET ERROR] {error}"
    print(banner)
    stop_event.set()
def on_close(ws, close_status_code, close_msg):
    """Tear down audio capture when the WebSocket connection closes.

    Prints the close status code and message, then signals the capture thread
    via ``stop_event``. The thread is joined *before* the stream is closed, so
    it is not inside ``stream.read`` while the stream is being closed. Finally
    the PyAudio stream and instance are released and both globals are reset
    to None.

    Args:
        ws: The WebSocketApp (unused).
        close_status_code: Close status code reported by the connection.
        close_msg: Close reason reported by the connection.
    """
    global stream, audio
    print(f"\n[WEBSOCKET] Disconnected - Status: {close_status_code}, Message: {close_msg}")
    stop_event.set()

    # Join first: the capture loop re-checks stop_event after every read, so it
    # exits shortly after the event is set.
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)

    # Swap each global to None before releasing it, so that run()'s cleanup
    # (which checks these globals) finds nothing left to close.
    current_stream, stream = stream, None
    if current_stream is not None:
        if current_stream.is_active():
            current_stream.stop_stream()
        current_stream.close()

    current_audio, audio = audio, None
    if current_audio is not None:
        current_audio.terminate()
def _release_audio():
    """Stop the capture thread, then close the shared PyAudio stream and instance.

    Sets ``stop_event`` and joins ``audio_thread`` before closing anything.
    The join is skipped if the thread does not exist, is not alive, or is the
    calling thread. This keeps the thread's ``stream.read`` from overlapping
    ``stream.close``. Each global is swapped to None before it is released,
    so a later ``on_close`` callback does not close it a second time.
    """
    global audio, stream
    stop_event.set()

    worker = audio_thread
    if worker is not None and worker.is_alive() and worker is not threading.current_thread():
        worker.join(timeout=1.0)

    current_stream, stream = stream, None
    if current_stream is not None:
        if current_stream.is_active():
            current_stream.stop_stream()
        current_stream.close()

    current_audio, audio = audio, None
    if current_audio is not None:
        current_audio.terminate()


def run():
    """Capture microphone audio and stream it to the transcription endpoint.

    Opens the input stream described by the audio configuration constants and
    runs ``ws_app.run_forever`` on a daemon thread. ``on_open`` starts the
    audio capture thread. This function blocks until the WebSocket thread
    exits.

    On Ctrl+C it sends ``{"type": "Terminate"}``, waits briefly for the
    session to end, and closes the socket. Audio resources are always
    released before returning. If the audio stream cannot be opened, the
    error is printed and the function returns without connecting.
    """
    global audio, stream, ws_app
    audio = pyaudio.PyAudio()
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
    except Exception as e:
        print(f"Error opening audio stream: {e}")
        audio.terminate()
        audio = None
        return

    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )
    ws_thread = threading.Thread(target=ws_app.run_forever)
    ws_thread.daemon = True
    ws_thread.start()

    try:
        # Poll with short sleeps instead of a single blocking join() so the
        # wait stays responsive to Ctrl+C.
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\n\nCtrl+C received. Stopping transcription...")
        stop_event.set()
        if ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                ws_app.send(json.dumps(terminate_message))
                # Wait up to 1 s for the WebSocket thread to finish on its own,
                # returning early if the connection closes sooner. Presumably
                # the server closes the session after Terminate -- confirm
                # against the API docs.
                ws_thread.join(timeout=1.0)
            except Exception as e:
                print(f"Error sending termination message: {e}")
        ws_app.close()
        ws_thread.join(timeout=2.0)
    finally:
        _release_audio()
        print("Cleanup complete. Exiting.")
if __name__ == "__main__":
    # Start transcription only when executed as a script, not when imported.
    run()