Voice Agent API

AssemblyAI’s Voice Agent API is a native real-time voice conversation endpoint. Unlike a traditional STT → LLM → TTS pipeline, the Voice Agent API handles everything in a single WebSocket connection: it listens to the user, understands what they said, generates a response, and speaks it back — all with low latency.

You need a credit card on file to access the Voice Agent API. Add one in your AssemblyAI dashboard.

Key capabilities:

  • Built-in VAD — server-side voice activity detection with configurable sensitivity
  • Customizable turn detection — tune silence thresholds, barge-in behavior, and interruption sensitivity
  • Native audio output — streams PCM16 audio back directly, no separate TTS step
  • Tool calling — register tools and handle tool.call / tool.result events
  • Barge-in / interruption — users can interrupt the agent mid-response

Quickstart

A complete working example — connects to the Voice Agent API endpoint, streams microphone audio, plays back the agent’s voice, and handles two tools (get_weather and get_time).

1

Get your API key

Grab your API key from your AssemblyAI dashboard.

2

Install dependencies

$pip install websockets sounddevice numpy
3

Run

import asyncio
import base64
import datetime
import json
import random

import sounddevice as sd
import numpy as np
import websockets

API_KEY = "YOUR_API_KEY"
URL = "wss://agents.assemblyai.com/v1/realtime"

# The Voice Agent API speaks PCM16 mono at 24 kHz in both directions.
SAMPLE_RATE = 24000
CHANNELS = 1
DTYPE = "int16"

# Tool schemas are flat: type/name/description/parameters at the top level.
TOOLS = [
    {
        "type": "function",
        "name": "get_weather",
        "description": "Get the current weather for a city.",
        "parameters": {
            "type": "object",
            "properties": {
                "location": {"type": "string", "description": "City name"}
            },
            "required": ["location"]
        }
    },
    {
        "type": "function",
        "name": "get_time",
        "description": "Get the current time and date.",
        "parameters": {"type": "object", "properties": {}}
    },
]


async def execute_tool(event: dict) -> dict:
    """Run one tool.call event locally and return {call_id, result}."""
    name = event.get("name", "")
    args = event.get("args", {})  # args arrives as a plain dict

    if name == "get_weather":
        result = {
            "location": args.get("location", "Unknown"),
            "temp_c": random.randint(5, 28),
            "description": random.choice(["Sunny", "Partly cloudy", "Light rain"]),
        }
    elif name == "get_time":
        now = datetime.datetime.now()
        result = {"time": now.strftime("%I:%M %p"), "date": now.strftime("%A, %B %d")}
    else:
        result = {"error": f"Unknown tool: {name}"}

    return {"call_id": event.get("call_id", ""), "result": result}


async def main():
    headers = {"Authorization": f"Bearer {API_KEY}"}
    async with websockets.connect(URL, additional_headers=headers) as ws:

        # Send session config immediately on connect — before session.ready
        await ws.send(json.dumps({
            "type": "session.update",
            "session": {
                "system_prompt": (
                    "You are a voice assistant. Keep responses to 1-2 short sentences. "
                    "Use your tools for weather and time questions."
                ),
                "voice": "claire",
                "greeting": "Hi! How can I help?",
                "tools": TOOLS,
            }
        }))

        loop = asyncio.get_running_loop()
        mic_queue = asyncio.Queue()
        session_ready = asyncio.Event()
        pending_tools: list[dict] = []

        def mic_callback(indata, *_):
            # Only send audio after session.ready fires. The callback runs on
            # a PortAudio thread, so hop back to the loop thread-safely.
            if session_ready.is_set():
                loop.call_soon_threadsafe(mic_queue.put_nowait, bytes(indata))

        async def send_audio():
            while True:
                chunk = await mic_queue.get()
                await ws.send(json.dumps({
                    "type": "input.audio",
                    "audio": base64.b64encode(chunk).decode()
                }))

        # Keep a reference so the sender task isn't garbage-collected,
        # and cancel it cleanly when the event loop below exits.
        sender = asyncio.create_task(send_audio())

        try:
            with sd.InputStream(samplerate=SAMPLE_RATE, channels=CHANNELS,
                                dtype=DTYPE, callback=mic_callback), \
                 sd.OutputStream(samplerate=SAMPLE_RATE, channels=CHANNELS,
                                 dtype=DTYPE) as speaker:

                async for message in ws:
                    event = json.loads(message)
                    t = event.get("type")

                    if t == "session.ready":
                        print(f"Ready — start speaking (session_id={event.get('session_id', '')})")
                        session_ready.set()

                    elif t == "input.speech.started":
                        print("Listening...")

                    elif t == "input.speech.stopped":
                        print("Processing...")

                    elif t == "transcript.user":
                        print(f"You: {event['text']}")

                    elif t == "transcript.agent":
                        print(f"Agent: {event['text']}")

                    elif t == "tool.call":
                        # Accumulate tool results — send them after reply.done
                        pending_tools.append(await execute_tool(event))

                    elif t == "reply.audio":
                        pcm = np.frombuffer(base64.b64decode(event["data"]), dtype=np.int16)
                        speaker.write(pcm)

                    elif t == "reply.done":
                        if event.get("status") == "interrupted":
                            # Flush audio queued before the barge-in so the
                            # user doesn't hear stale speech, then restart
                            # the stream for the next reply.
                            speaker.abort()
                            speaker.start()
                            pending_tools.clear()
                        elif pending_tools:
                            # Send all accumulated tool results
                            for tool in pending_tools:
                                await ws.send(json.dumps({
                                    "type": "tool.result",
                                    "call_id": tool["call_id"],
                                    "result": json.dumps(tool["result"]),
                                }))
                            pending_tools.clear()

                    elif t in ("error", "session.error"):
                        print(f"Error: {event.get('message')}")
                        if t == "error":
                            break
        finally:
            sender.cancel()

asyncio.run(main())

Audio format

Both input and output audio use the same format:

PropertyValue
EncodingPCM16 (16-bit signed integer, little-endian)
Sample rate24,000 Hz
ChannelsMono
TransportBase64-encoded (not raw binary)

We recommend sending chunks of around 50ms (2,400 bytes at 24kHz). The server buffers and processes continuously, so exact chunk size isn’t critical.

Playing output audio

The server streams reply.audio events containing small PCM16 chunks. Write each chunk directly into an audio output buffer and let the OS drain it at the correct sample rate:

1# ✅ Buffer-based playback
2with sd.OutputStream(samplerate=24000, channels=1, dtype="int16") as speaker:
3 # In your event loop:
4 if event["type"] == "reply.audio":
5 pcm = np.frombuffer(base64.b64decode(event["data"]), dtype=np.int16)
6 speaker.write(pcm)

speaker.write() copies samples into the OS audio buffer and returns immediately. The hardware drains the buffer at exactly 24kHz, producing smooth playback. Network jitter is absorbed by the buffer — even if a WebSocket message arrives late, there’s still audio playing.

Don’t use sleep-based timing to schedule playback. The OS doesn’t guarantee exact sleep durations, so the playback clock drifts from the hardware clock over time, causing pops and gaps in the audio.

1# ❌ Don't do this
2while True:
3 chunk = get_next_chunk()
4 play(chunk)
5 await asyncio.sleep(0.020) # drift accumulates → audio artifacts

Stopping playback on interruption

When the user interrupts the agent, the server stops generating audio and sends reply.done with status: "interrupted". Your output buffer may still have queued audio from before the interruption. Flush it so the user doesn’t hear stale speech:

1if event.get("status") == "interrupted":
2 speaker.abort() # discard buffered audio immediately
3 speaker.start() # restart the stream for the next response

Connection

Endpoint

wss://agents.assemblyai.com/v1/realtime

Authentication

Pass your API key as a Bearer token in the HTTP upgrade request:

Authorization: Bearer YOUR_API_KEY

Browsers cannot set custom headers on WebSocket connections. For browser-based apps, use a server-side proxy. See Browser integration.


Events reference

Client → Server

input.audio

Stream PCM16 audio to the agent.

1{
2 "type": "input.audio",
3 "audio": "<base64-encoded PCM16>"
4}
FieldTypeDescription
audiostringBase64-encoded PCM16 mono 24kHz audio

session.update

Configure the session. Send immediately on WebSocket connect — before session.ready. Can also be sent mid-conversation to update any field.

1{
2 "type": "session.update",
3 "session": {
4 "voice": "claire",
5 "system_prompt": "You are a helpful assistant.",
6 "greeting": "Hi! How can I help you today?",
7 "tools": [],
8 "turn_detection": { ... }
9 }
10}
FieldTypeDescription
session.voicestringThe voice to use for the agent’s speech (see Voices)
session.system_promptstringSets the agent’s personality and context
session.greetingstringSpoken aloud at the start of the conversation
session.toolsarrayTool definitions (see Tool calling)
session.turn_detectionobjectVAD and turn detection configuration (see Turn detection)

session.resume

Reconnect to an existing session using the session_id from a previous session.ready. Preserves conversation context across dropped connections.

1{
2 "type": "session.resume",
3 "session_id": "sess_abc123"
4}

Sessions are preserved for 30 seconds after every disconnection before expiring. If the session has expired, the server returns a session.error with code session_not_found or session_forbidden. Start a fresh connection without session.resume.


tool.result

Send a tool result back to the agent. Send this in the reply.done handler — not immediately in tool.call. See Tool calling.

1{
2 "type": "tool.result",
3 "call_id": "call_abc123",
4 "result": "{\"temp_c\": 22, \"description\": \"Sunny\"}"
5}
FieldTypeDescription
call_idstringThe call_id from the tool.call event
resultstringJSON string containing the tool result

Server → Client

session.ready

Session is established and ready to receive audio. Save session_id for reconnection. Start sending input.audio only after this event.

1{
2 "type": "session.ready",
3 "session_id": "sess_abc123"
4}

session.updated

Sent after session.update is applied successfully.

1{ "type": "session.updated" }

input.speech.started

VAD detected the user has started speaking.

1{ "type": "input.speech.started" }

input.speech.stopped

VAD detected the user has stopped speaking.

1{ "type": "input.speech.stopped" }

transcript.user.delta

Partial transcript of what the user is saying, updating in real-time.

1{
2 "type": "transcript.user.delta",
3 "text": "What's the weather in"
4}

transcript.user

Final transcript of the user’s utterance.

1{
2 "type": "transcript.user",
3 "text": "What's the weather in Tokyo?",
4 "item_id": "item_abc123"
5}

reply.started

Agent has begun generating a response.

1{
2 "type": "reply.started",
3 "reply_id": "reply_abc123"
4}

reply.audio

A chunk of the agent’s spoken response as base64 PCM16. Decode and play immediately.

1{
2 "type": "reply.audio",
3 "data": "<base64-encoded PCM16>"
4}

transcript.agent

Full text of the agent’s response, sent after all audio for the response has been delivered. If the agent was interrupted, interrupted is true and text contains only what was actually spoken before the interruption.

1{
2 "type": "transcript.agent",
3 "text": "It's currently 22°C and sunny in Tokyo.",
4 "reply_id": "reply_abc123",
5 "item_id": "item_abc123",
6 "interrupted": false
7}
FieldTypeDescription
textstringWhat the agent said (trimmed to interruption point if interrupted)
reply_idstringID of the reply
item_idstringConversation item ID
interruptedbooleantrue if the user interrupted mid-response

reply.done

Agent has finished speaking. The optional status field indicates why the reply ended.

1{ "type": "reply.done" }
1{ "type": "reply.done", "status": "interrupted" }
FieldTypeDescription
statusstring"interrupted" if the user barged in, absent for normal completion

tool.call

Agent wants to call a registered tool. args is a dict — ready to use directly.

1{
2 "type": "tool.call",
3 "call_id": "call_abc123",
4 "name": "get_weather",
5 "args": { "location": "Tokyo" }
6}
FieldTypeDescription
call_idstringInclude this in tool.result
namestringTool name to call
argsobjectArguments as a dict — use directly

session.error

Session or protocol error.

1{
2 "type": "session.error",
3 "code": "invalid_format",
4 "message": "Invalid message format"
5}

Also handle "error" (without the session. prefix) for connection-level errors.

Error codes:

CodeDescription
invalid_formatMalformed event (e.g. input.audio sent before session.ready)
session_not_foundThe session_id in session.resume does not exist
session_forbiddenThe session_id belongs to a different API key

Session configuration

System prompt

Set the agent’s personality and behavior. Can be updated mid-session with another session.update.

1{
2 "type": "session.update",
3 "session": {
4 "system_prompt": "You are a friendly support agent. Keep responses under 2 sentences. Never make up information."
5 }
6}

Tips for voice-first prompts:

  • Keep instructions concise — the model reads this, not the user
  • Ban specific phrases: "Never say 'Certainly' or 'Absolutely'"
  • Enforce brevity: "Max 2 sentences per turn"
  • Tell the agent when to use each tool

Greeting

What the agent says at the start of the conversation, spoken aloud. If omitted, the agent waits silently for the user to speak first.

1{
2 "type": "session.update",
3 "session": {
4 "system_prompt": "You are a helpful assistant.",
5 "greeting": "Hi there! How can I help you today?"
6 }
7}

Turn detection

Customize VAD sensitivity, end-of-turn detection, and barge-in behavior. All fields are optional — only include the ones you want to change. Settings can be updated mid-session.

1{
2 "type": "session.update",
3 "session": {
4 "turn_detection": {
5 "speech_detection_threshold": 0.5,
6 "prefix_padding_ms": 300,
7 "min_end_of_turn_silence_ms": 100,
8 "max_turn_silence_ms": 1000,
9 "interrupt_response": true,
10 "min_interrupt_duration_ms": 600,
11 "min_interrupt_words": 0
12 }
13 }
14}
FieldTypeDefaultDescription
speech_detection_thresholdfloat0.5VAD sensitivity (0.0–1.0). Lower = more sensitive to speech.
prefix_padding_msinteger300Audio to include before detected speech starts (ms).
min_end_of_turn_silence_msinteger100Minimum silence to consider a confident end-of-turn (ms).
max_turn_silence_msinteger1000Maximum silence before forcing end-of-turn (ms).
interrupt_responsebooleantrueWhether user speech interrupts the agent. Set false to disable barge-in.
min_interrupt_duration_msinteger600How long the user must speak before triggering an interruption (ms).
min_interrupt_wordsinteger0Minimum words the user must say before interrupting.

Use min_end_of_turn_silence_ms and max_turn_silence_ms together to control responsiveness. A lower min_end_of_turn_silence_ms makes the agent respond faster after the user pauses, while max_turn_silence_ms sets the hard cutoff.

Voices

Set the agent’s voice in session.update. You can change the voice mid-session.

1{
2 "type": "session.update",
3 "session": {
4 "voice": "claire"
5 }
6}

English voices

VoiceDescription
joshConversational, professional, American, male
dylanTheatrical, energetic, chatty, jagged
dawnProfessional, deliberate, smooth
summerEmpathetic, aesthetic, conversational
andySoft, conversational, young
zoeSmooth, conversational, young
alexisHigh-pitched, chatty
michaelDeep, calming, conversational
peteSmooth, direct, clear, fast-paced
brianChatty, nasal, expressive
dianaSoft, older, calming
graceSouthern, older, warm
kaiSlow, calming, ASMR
claireLively, young, conversational
nathanDeep, older
audreyDeeper, older, calming
melissaBritish, clear, smooth, instructive, simple
willNarrative, British, conversational

Multilingual voices

All multilingual voices also speak English and support code-switching between their language(s) and English.

VoiceLanguage(s)Description
gautamHindi/Hinglish, EnglishConversational
lukeMandarin, EnglishConversational, native in both
alexeiRussian, EnglishConversational
maxGerman, EnglishBritish accent, conversational, smooth
annaGerman, EnglishConversational, soft
antoineFrench, EnglishConversational
jennieKorean, English
kenjiJapanese, English
lilyMandarin, English
kevinKorean, English
novaItalian, English
marcoItalian, English
sofiaSpanish, English
yukiJapanese, English
santiagoSpanish, English
leoSpanish (Latin American), EnglishColombian

Tool calling

Register tools to let the agent take real-world actions.

Tool schema

Tools use a flat format — type, name, description, and parameters at the top level:

1{
2 "type": "function",
3 "name": "get_weather",
4 "description": "Get the current weather for a city.",
5 "parameters": {
6 "type": "object",
7 "properties": {
8 "location": {
9 "type": "string",
10 "description": "City name, e.g. London"
11 }
12 },
13 "required": ["location"]
14 }
15}

Handling tool calls

The key pattern: accumulate tool results, then send them all in reply.done — not immediately in tool.call. The agent speaks a transition phrase while waiting; sending results too early can cause timing issues.

1pending_tools: list[dict] = []
2
3# In your event loop:
4
5elif t == "tool.call":
6 name = event["name"]
7 args = event.get("args", {}) # args is a plain dict
8
9 # Execute your tool
10 if name == "get_weather":
11 result = {"temp_c": 22, "description": "Sunny"}
12 else:
13 result = {"error": "Unknown tool"}
14
15 # Accumulate — don't send yet
16 pending_tools.append({
17 "call_id": event["call_id"],
18 "result": result,
19 })
20
21elif t == "reply.done":
22 if event.get("status") == "interrupted":
23 # User barged in — discard pending results
24 pending_tools.clear()
25 elif pending_tools:
26 # Now send all tool results
27 for tool in pending_tools:
28 await ws.send(json.dumps({
29 "type": "tool.result",
30 "call_id": tool["call_id"],
31 "result": json.dumps(tool["result"]),
32 }))
33 pending_tools.clear()

Interruptions

When the user speaks mid-response, the server stops the agent and sends reply.done with status: "interrupted". The transcript.agent event will also fire with interrupted: true and text trimmed to what was actually spoken before the interruption. Discard any pending tool results — the agent is ready to listen again.

You can customize interruption behavior via turn_detection in session.update:

  • interrupt_response — set to false to disable barge-in entirely
  • min_interrupt_duration_ms — how long the user must speak before triggering an interruption (default: 600ms)
  • min_interrupt_words — minimum words the user must say before interrupting (default: 0)

Browser integration

Browsers cannot set the Authorization header on WebSocket connections. Use a server-side proxy:

Browser ──── ws:// ──── Your proxy ──── wss://agents.assemblyai.com
(no key) (adds auth)

Minimal Node.js proxy:

import http from "http";
import { WebSocket, WebSocketServer } from "ws";

const AAI_URL = "wss://agents.assemblyai.com/v1/realtime";
const API_KEY = process.env.ASSEMBLYAI_API_KEY;

const server = http.createServer();
const wss = new WebSocketServer({ server });

// Relay `data` to `target` only while its socket is still open.
const forward = (target) => (data) => {
  if (target.readyState === WebSocket.OPEN) target.send(data);
};

wss.on("connection", (clientWs) => {
  // The upstream connection carries the API key; the browser never sees it.
  const aaiWs = new WebSocket(AAI_URL, {
    headers: { Authorization: `Bearer ${API_KEY}` },
  });

  clientWs.on("message", forward(aaiWs));
  aaiWs.on("message", forward(clientWs));

  // Mirror closes in both directions so neither side is left dangling.
  aaiWs.on("close", (code, reason) => clientWs.close(code, reason));
  clientWs.on("close", () => aaiWs.close());
});

server.listen(8080);

Framework integrations

Pipecat

Copy pipecat_assemblyai_realtime.py into your project, then use AssemblyAIRealtimeLLMService as a drop-in LLMService.

1"""AssemblyAI Native Voice Agent Plugin for PipeCat."""
2
3from __future__ import annotations
4
5import asyncio
6import base64
7import json
8import logging
9from datetime import datetime, timezone
10
11import websockets
12import websockets.asyncio.client
13
14from pipecat.frames.frames import (
15 BotStartedSpeakingFrame,
16 BotStoppedSpeakingFrame,
17 CancelFrame,
18 EndFrame,
19 ErrorFrame,
20 Frame,
21 InputAudioRawFrame,
22 InterimTranscriptionFrame,
23 LLMContextFrame,
24 StartFrame,
25 TranscriptionFrame,
26 TTSAudioRawFrame,
27 TTSStartedFrame,
28 TTSStoppedFrame,
29 UserStartedSpeakingFrame,
30 UserStoppedSpeakingFrame,
31)
32from pipecat.processors.frame_processor import FrameDirection
33from pipecat.services.llm_service import LLMService
34
35logger = logging.getLogger(__name__)
36
37SAMPLE_RATE = 24000
38NUM_CHANNELS = 1
39
40
class AssemblyAIRealtimeLLMService(LLMService):
    """Pipecat LLMService backed by the AssemblyAI Voice Agent API.

    Bridges Pipecat frames to the realtime WebSocket protocol: input audio
    frames become ``input.audio`` events, and server events (VAD, transcripts,
    reply audio, tool calls) are translated back into Pipecat frames.
    """

    def __init__(self, *, url: str, api_key: str, system_prompt: str = "", **kwargs) -> None:
        super().__init__(**kwargs)
        self._url = url
        self._api_key = api_key
        self._system_prompt = system_prompt
        self._websocket = None            # live WebSocket connection, if any
        self._receive_task = None         # background task draining server events
        self._session_ready = False       # True once session.ready has arrived
        self._current_response_id = None  # reply_id of the in-flight agent reply
        self._bot_speaking = False
        self._user_transcript_buf = ""    # last interim user transcript pushed
        self._pending_tools: list[dict] = []  # tools queued until session.ready

    async def start(self, frame: StartFrame) -> None:
        """Connect to the Voice Agent API when the pipeline starts."""
        await super().start(frame)
        await self._connect()

    async def stop(self, frame: EndFrame) -> None:
        """Tear down the connection on normal pipeline shutdown."""
        await super().stop(frame)
        await self._disconnect()

    async def cancel(self, frame: CancelFrame) -> None:
        """Tear down the connection when the pipeline is cancelled."""
        await super().cancel(frame)
        await self._disconnect()

    async def _connect(self) -> None:
        """Open the WebSocket, start the receive loop, and push the prompt."""
        # The API authenticates with a Bearer token in the upgrade request
        # (see the Authentication docs); previously the raw key was sent
        # without the "Bearer " prefix.
        headers = {"Authorization": f"Bearer {self._api_key}"}
        try:
            self._websocket = await websockets.asyncio.client.connect(
                self._url, additional_headers=headers,
            )
            self._receive_task = asyncio.create_task(
                self._receive_task_handler(), name="AssemblyAIRealtime._recv",
            )
            logger.info(f"Connected to AssemblyAI Voice Agent API at {self._url}")
            if self._system_prompt:
                await self._send({"type": "session.update", "session": {"system_prompt": self._system_prompt}})
        except Exception as e:
            logger.error(f"Failed to connect: {e}")
            await self.push_frame(ErrorFrame(str(e)))

    async def _disconnect(self) -> None:
        """Cancel the receive loop, close the socket, and reset state."""
        if self._receive_task:
            self._receive_task.cancel()
            try:
                await self._receive_task
            except (asyncio.CancelledError, Exception):
                # Best-effort teardown: swallow cancellation and any error
                # raised while the task unwinds.
                pass
            self._receive_task = None
        if self._websocket:
            await self._websocket.close()
            self._websocket = None
        self._session_ready = False
        self._bot_speaking = False
        self._current_response_id = None

    async def _send(self, msg: dict) -> None:
        """JSON-encode and send one client event, surfacing send failures."""
        if self._websocket:
            try:
                await self._websocket.send(json.dumps(msg))
            except websockets.exceptions.ConnectionClosedOK:
                pass  # normal close race — nothing to report
            except Exception as e:
                logger.error(f"WebSocket send error: {e}")
                await self.push_frame(ErrorFrame(str(e)))

    async def process_frame(self, frame: Frame, direction: FrameDirection) -> None:
        """Route audio and context frames to the API; pass everything else on."""
        await super().process_frame(frame, direction)
        if isinstance(frame, InputAudioRawFrame):
            await self._send_user_audio(frame)
        elif isinstance(frame, LLMContextFrame):
            await self._handle_context(frame)
        else:
            await self.push_frame(frame, direction)

    async def _send_user_audio(self, frame: InputAudioRawFrame) -> None:
        """Forward mic audio as base64 input.audio (only after session.ready)."""
        if not self._session_ready:
            return
        await self._send({"type": "input.audio", "audio": base64.b64encode(frame.audio).decode()})

    async def _handle_context(self, frame: LLMContextFrame) -> None:
        """Mirror the first system message of the context into the session prompt."""
        for msg in frame.context.messages:
            if msg.get("role") == "system":
                content = msg.get("content", "")
                if isinstance(content, list):
                    # Multi-part content: concatenate the text parts.
                    content = " ".join(c.get("text", "") for c in content if isinstance(c, dict))
                if content:
                    await self._send({"type": "session.update", "session": {"system_prompt": content}})
                break
        await self.push_frame(frame, FrameDirection.DOWNSTREAM)

    async def _receive_task_handler(self) -> None:
        """Drain server events until the connection closes."""
        try:
            async for raw in self._websocket:
                try:
                    await self._handle_event(json.loads(raw))
                except Exception:
                    # One bad event must not kill the receive loop.
                    logger.exception("Error handling event")
        except websockets.exceptions.ConnectionClosed:
            logger.info("AssemblyAI Voice Agent API connection closed")

    async def _handle_event(self, event: dict) -> None:
        """Dispatch one server event to its handler (unknown types are ignored)."""
        t = event.get("type", "")
        if t != "reply.audio":  # audio chunks are too chatty to log
            logger.debug(f"← {t} {event}")
        dispatch = {
            "session.ready": self._on_session_ready,
            "input.speech.started": lambda _: self._on_speech_started(),
            "input.speech.stopped": lambda _: self._on_speech_stopped(),
            "transcript.user.delta": self._on_user_transcript_delta,
            "transcript.user": self._on_user_transcript,
            "reply.started": self._on_response_started,
            "reply.audio": self._on_response_audio,
            "transcript.agent": self._on_response_transcript,
            "reply.done": lambda _: self._on_response_done(),
            "tool.call": self._on_function_call,
            "session.error": self._on_error,
        }
        if handler := dispatch.get(t):
            await handler(event)

    async def _on_session_ready(self, event: dict) -> None:
        """Mark the session live and flush any tools registered early."""
        self._session_ready = True
        logger.info(f"Session ready | session_id={event.get('session_id', 'unknown')}")
        if self._pending_tools:
            await self._send({"type": "session.update", "session": {"tools": self._pending_tools}})
            self._pending_tools = []

    @staticmethod
    def _ts() -> str:
        """Return a UTC ISO-8601 timestamp for transcription frames."""
        return datetime.now(timezone.utc).isoformat()

    async def _on_speech_started(self) -> None:
        """VAD start: reset the interim buffer and notify upstream."""
        self._user_transcript_buf = ""
        await self.push_frame(UserStartedSpeakingFrame(), FrameDirection.UPSTREAM)

    async def _on_speech_stopped(self) -> None:
        """VAD stop: notify upstream."""
        await self.push_frame(UserStoppedSpeakingFrame(), FrameDirection.UPSTREAM)

    async def _on_user_transcript_delta(self, event: dict) -> None:
        """Push an interim transcription frame when the partial text changes."""
        text = event.get("text", "")
        if text and text != self._user_transcript_buf:
            self._user_transcript_buf = text
            await self.push_frame(InterimTranscriptionFrame(text=text, user_id="user", timestamp=self._ts()), FrameDirection.UPSTREAM)

    async def _on_user_transcript(self, event: dict) -> None:
        """Push the final user transcription frame."""
        text = event.get("text", "")
        self._user_transcript_buf = ""
        if text:
            logger.info(f"[User] {text}")
            await self.push_frame(TranscriptionFrame(text=text, user_id="user", timestamp=self._ts()), FrameDirection.UPSTREAM)

    async def _on_response_started(self, event: dict) -> None:
        """Agent reply began: emit TTS/bot started frames."""
        self._current_response_id = event.get("reply_id", "")
        self._bot_speaking = True
        await self.push_frame(TTSStartedFrame())
        await self.push_frame(BotStartedSpeakingFrame())

    async def _on_response_audio(self, event: dict) -> None:
        """Decode one base64 PCM16 chunk and push it as TTS audio."""
        data = base64.b64decode(event.get("data", ""))
        if data:
            await self.push_frame(TTSAudioRawFrame(audio=data, sample_rate=SAMPLE_RATE, num_channels=NUM_CHANNELS))

    async def _on_response_transcript(self, event: dict) -> None:
        """Log the agent's final transcript."""
        if text := event.get("text", ""):
            logger.info(f"[Agent] {text}")

    async def _on_response_done(self) -> None:
        """Agent reply finished (or was interrupted): emit stop frames."""
        self._current_response_id = None
        self._bot_speaking = False
        await self.push_frame(TTSStoppedFrame())
        await self.push_frame(BotStoppedSpeakingFrame())

    async def _on_function_call(self, event: dict) -> None:
        """Execute a registered tool and send tool.result back."""
        call_id = event.get("call_id", "")
        name = event.get("name", "")
        # Accept either a JSON string ("arguments") or a dict ("args").
        raw = event.get("arguments", event.get("args", {}))
        args = raw if isinstance(raw, str) else json.dumps(raw)
        logger.info(f"Tool call: {name}({args})")
        result = ""
        if self.has_function(name):
            try:
                result = await self._execute_function(name, call_id, args)
            except Exception as e:
                result = f"Error: {e}"
        await self._send({"type": "tool.result", "call_id": call_id, "result": result or ""})

    async def _execute_function(self, name: str, call_id: str, arguments: str) -> str:
        """Invoke a Pipecat-registered handler and return its result as text."""
        from pipecat.adapters.schemas.direct_function import DirectFunctionWrapper
        from pipecat.processors.aggregators.llm_context import LLMContext
        from pipecat.services.llm_service import FunctionCallParams
        args = json.loads(arguments) if isinstance(arguments, str) else arguments
        result_holder: list = []

        async def result_callback(result, **_kwargs):
            result_holder.append(result)

        params = FunctionCallParams(function_name=name, tool_call_id=call_id, arguments=args, llm=self, context=LLMContext(), result_callback=result_callback)
        item = self._functions[name]
        if isinstance(item.handler, DirectFunctionWrapper):
            await item.handler.invoke(args=args, params=params)
        else:
            await item.handler(params)
        return str(result_holder[0]) if result_holder else ""

    async def _on_error(self, event: dict) -> None:
        """Surface a session.error as an ErrorFrame."""
        msg = event.get("message") or str(event)
        logger.error(f"AssemblyAI Voice Agent API error: {msg}")
        await self.push_frame(ErrorFrame(msg))

    async def set_tools(self, tools: list[dict]) -> None:
        """Register tool schemas now, or queue them until session.ready."""
        if self._session_ready:
            await self._send({"type": "session.update", "session": {"tools": tools}})
        else:
            self._pending_tools = tools

LiveKit

Copy assemblyai_realtime.py into your project, then pass RealtimeModel to AgentSession.

1"""AssemblyAI Native Realtime Plugin — livekit.agents RealtimeModel/RealtimeSession."""
2
3from __future__ import annotations
4
5import asyncio
6import base64
7import json
8import logging
9import time
10from dataclasses import dataclass
11from typing import Literal
12
13import aiohttp
14from livekit import rtc
15from livekit.agents import APIConnectionError, llm, utils
16from livekit.agents.types import NOT_GIVEN, NotGivenOr
17from livekit.agents.utils import is_given
18
19logger = logging.getLogger(__name__)
20
21SAMPLE_RATE = 24000
22NUM_CHANNELS = 1
23_SAMPLES_PER_CHUNK = SAMPLE_RATE // 10
24
25
@dataclass
class _Generation:
    """Channels and bookkeeping for one in-flight agent reply."""

    response_id: str  # reply_id from the reply.started event
    msg_ch: utils.aio.Chan[llm.MessageGeneration]
    fn_ch: utils.aio.Chan[llm.FunctionCall]
    text_ch: utils.aio.Chan[str]
    audio_ch: utils.aio.Chan[rtc.AudioFrame]
    # Resolved with the modalities the reply actually produced.
    modalities_fut: asyncio.Future[list[Literal["text", "audio"]]]
34
35
def _serialize_tool(tool: llm.Tool) -> dict | None:
    """Convert a LiveKit tool into the flat schema the Voice Agent API expects.

    Returns ``None`` for tool kinds that cannot be serialized.
    """
    if isinstance(tool, llm.FunctionTool):
        return llm.utils.build_legacy_openai_schema(tool, internally_tagged=True)
    if isinstance(tool, llm.RawFunctionTool):
        schema = dict(tool.info.raw_schema)
        schema.pop("meta", None)
        schema["type"] = "function"
        return schema
    return None
45
46
47class _SessionExpiredError(Exception):
48 pass
49
50
class RealtimeModel(llm.RealtimeModel):
    """``llm.RealtimeModel`` implementation backed by the AssemblyAI Voice Agent API."""

    def __init__(self, *, url: str, api_key: str, sample_rate: int = SAMPLE_RATE, greeting: str | None = None) -> None:
        # Advertise what the Voice Agent API handles natively: server-side
        # turn detection, user transcription, and direct audio output.
        capabilities = llm.RealtimeCapabilities(
            turn_detection=True,
            user_transcription=True,
            audio_output=True,
            manual_function_calls=False,
            auto_tool_reply_generation=True,
            message_truncation=False,
        )
        super().__init__(capabilities=capabilities)
        self._url = url
        self._api_key = api_key
        self._sample_rate = sample_rate
        self._greeting = greeting
        self._http_session: aiohttp.ClientSession | None = None

    def _ensure_http_session(self) -> aiohttp.ClientSession:
        """Return a live aiohttp session, creating a fresh one if needed."""
        if self._http_session is None or self._http_session.closed:
            self._http_session = aiohttp.ClientSession()
        return self._http_session

    def session(self) -> RealtimeSession:
        """Create a new realtime session bound to this model."""
        return RealtimeSession(self)

    async def aclose(self) -> None:
        """Close the shared HTTP session, if one was ever created."""
        if self._http_session and not self._http_session.closed:
            await self._http_session.close()
76
77
class RealtimeSession(llm.RealtimeSession[Literal[()]]):
    """One realtime voice-agent conversation over the AssemblyAI WebSocket.

    A single background task (``_run``) owns the connection: it dials the
    server, pumps outbound events from ``_msg_ch``, feeds inbound events to
    ``_handle_event``, and reconnects with backoff when the socket drops.
    All public mutators are fire-and-forget: they enqueue a JSON event onto
    ``_msg_ch`` and return immediately.
    """

    def __init__(self, realtime_model: RealtimeModel) -> None:
        super().__init__(realtime_model)
        self._model = realtime_model
        self._tools = llm.ToolContext.empty()
        self._chat_ctx = llm.ChatContext.empty()
        # Outbound event queue; closing it (see aclose) shuts down the send task.
        self._msg_ch: utils.aio.Chan[dict] = utils.aio.Chan()
        # Streams for the server reply currently in flight, if any.
        self._current_gen: _Generation | None = None
        # Resolved when the server starts the reply requested by generate_reply().
        self._pending_reply_fut: asyncio.Future[llm.GenerationCreatedEvent] | None = None
        self._current_response_id: str | None = None
        # call_ids of tool.call events handed to the agent but not yet answered.
        self._pending_call_ids: set[str] = set()
        self._session_ready: bool = False
        # Server-assigned id, kept so a reconnect can attempt session.resume.
        self._session_id: str | None = None
        # Rechunks arbitrary input frames into fixed-size PCM16 chunks.
        self._bstream = utils.audio.AudioByteStream(SAMPLE_RATE, NUM_CHANNELS, samples_per_channel=_SAMPLES_PER_CHUNK)
        self._input_resampler: rtc.AudioResampler | None = None
        self._main_task = asyncio.create_task(self._run(), name="AssemblyAIRealtimeSession._run")

    @property
    def chat_ctx(self) -> llm.ChatContext:
        """The chat context last pushed via update_chat_ctx (not server state)."""
        return self._chat_ctx

    @property
    def tools(self) -> llm.ToolContext:
        # Return a copy so callers cannot mutate the session's registry.
        return self._tools.copy()

    def _send(self, msg: dict) -> None:
        """Enqueue *msg* for the send task; a no-op once the channel is closed."""
        try:
            self._msg_ch.send_nowait(msg)
        except Exception:
            # Silently drop: sends after aclose() (closed channel) are expected.
            pass

    def push_audio(self, frame: rtc.AudioFrame) -> None:
        """Resample, rechunk, and enqueue microphone audio as input.audio events.

        NOTE(review): events are queued even before session.ready; the protocol
        diagram says audio should flow only after session.ready — presumably the
        server tolerates (or buffers) early audio. TODO confirm.
        """
        for f in self._resample_audio(frame):
            for chunk in self._bstream.write(f.data.tobytes()):
                self._send({"type": "input.audio", "audio": base64.b64encode(chunk.data).decode()})

    def push_video(self, frame: rtc.VideoFrame) -> None:
        # Video input is not supported by this endpoint.
        pass

    def generate_reply(self, *, instructions: NotGivenOr[str] = NOT_GIVEN) -> asyncio.Future[llm.GenerationCreatedEvent]:
        """Ask the server to speak; the future resolves on the next reply.started.

        The future fails with RealtimeError if no reply starts within 5 seconds.
        NOTE(review): *instructions* is accepted for interface compatibility but
        not forwarded to the server — confirm whether that is intentional.
        """
        loop = asyncio.get_event_loop()
        fut: asyncio.Future[llm.GenerationCreatedEvent] = loop.create_future()
        self._pending_reply_fut = fut
        self._send({"type": "reply.create"})
        # `fut.done() or ...` short-circuits: only set the timeout error if
        # nothing (reply.started, cancellation) resolved the future first.
        handle = loop.call_later(5.0, lambda: fut.done() or fut.set_exception(llm.RealtimeError("generate_reply timed out.")))
        fut.add_done_callback(lambda _: handle.cancel())
        return fut

    def interrupt(self) -> None:
        """Cancel the reply currently being streamed, if any (barge-in)."""
        if self._current_response_id:
            self._send({"type": "reply.cancel", "reply_id": self._current_response_id})

    def commit_audio(self) -> None:
        # Server-side VAD decides turn boundaries; explicit commit is unnecessary.
        pass

    def clear_audio(self) -> None:
        # No client-side buffer to clear for this endpoint.
        pass

    def truncate(self, *, message_id: str, modalities: list[Literal["text", "audio"]], audio_end_ms: int, audio_transcript: NotGivenOr[str] = NOT_GIVEN) -> None:
        # Message truncation is not supported (see message_truncation=False capability).
        pass

    async def update_instructions(self, instructions: str) -> None:
        """Replace the system prompt (and re-send the configured greeting, if any)."""
        payload: dict = {"system_prompt": instructions}
        if self._model._greeting:
            payload["greeting"] = self._model._greeting
        self._send({"type": "session.update", "session": payload})

    async def update_tools(self, tools: list[llm.Tool]) -> None:
        """Register *tools* with the server and remember the callable ones locally."""
        # Tools that cannot be serialized for the wire are skipped.
        serialized = [s for t in tools if (s := _serialize_tool(t)) is not None]
        self._send({"type": "session.update", "session": {"tools": serialized}})
        self._tools = llm.ToolContext([t for t in tools if isinstance(t, (llm.FunctionTool, llm.RawFunctionTool))])

    def update_options(self, *, tool_choice: NotGivenOr[llm.ToolChoice | None] = NOT_GIVEN) -> None:
        """Forward a tool_choice change to the server; other options are fixed."""
        if is_given(tool_choice):
            self._send({"type": "session.update", "session": {"tool_choice": tool_choice}})

    async def update_chat_ctx(self, chat_ctx: llm.ChatContext) -> None:
        """Sync new chat items to the server.

        Only items not already in the local context are forwarded: tool results
        for calls we are waiting on become tool.result events; new user/system
        text becomes conversation.message events. Nothing is sent before
        session.ready, but the local context is always updated.
        """
        if self._session_ready:
            existing_ids = {item.id for item in self._chat_ctx.items}
            for item in chat_ctx.items:
                if item.id in existing_ids:
                    continue
                if isinstance(item, llm.FunctionCallOutput) and item.call_id in self._pending_call_ids:
                    self._send({"type": "tool.result", "call_id": item.call_id, "result": item.output})
                    self._pending_call_ids.discard(item.call_id)
                elif isinstance(item, llm.ChatMessage) and item.role in ("user", "system") and item.text_content:
                    self._send({"type": "conversation.message", "role": item.role, "content": item.text_content})
        self._chat_ctx = chat_ctx

    async def aclose(self) -> None:
        """Shut down: closing the channel ends the send task, which closes the ws."""
        self._msg_ch.close()
        await self._main_task

    @utils.log_exceptions(logger=logger)
    async def _run(self) -> None:
        """Connection loop: up to 4 attempts with 0.5/1/2s backoff between them."""
        for attempt in range(4):
            try:
                headers = {"Authorization": f"Bearer {self._model._api_key}"}
                ws = await self._model._ensure_http_session().ws_connect(self._model._url, headers=headers)
                # On reconnect, try to resume the previous server-side session.
                if self._session_id is not None:
                    await ws.send_str(json.dumps({"type": "session.resume", "session_id": self._session_id}))
                if self._model._greeting:
                    await ws.send_str(json.dumps({"type": "session.update", "session": {"greeting": self._model._greeting}}))
                # Reset per-connection state so a retry starts clean.
                self._session_ready = False
                self._current_response_id = None
                self._close_current_gen()
                if self._pending_reply_fut:
                    self._pending_reply_fut.cancel()
                    self._pending_reply_fut = None
                self._bstream = utils.audio.AudioByteStream(SAMPLE_RATE, NUM_CHANNELS, samples_per_channel=_SAMPLES_PER_CHUNK)
                # Set by the send task so the recv task can tell a deliberate
                # close (aclose) from an unexpected server disconnect.
                closing = False

                @utils.log_exceptions(logger=logger)
                async def _send_task() -> None:
                    # Drains _msg_ch until it is closed, then closes the socket.
                    nonlocal closing
                    async for msg in self._msg_ch:
                        try:
                            await ws.send_str(json.dumps(msg))
                        except Exception:
                            logger.exception("failed to send event")
                    closing = True
                    await ws.close()

                @utils.log_exceptions(logger=logger)
                async def _recv_task() -> None:
                    while True:
                        msg = await ws.receive()
                        if msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSE, aiohttp.WSMsgType.CLOSING):
                            if closing:
                                return
                            # Unexpected drop: raise so the outer loop retries.
                            raise APIConnectionError("AssemblyAI Realtime connection closed unexpectedly")
                        if msg.type != aiohttp.WSMsgType.TEXT:
                            continue
                        try:
                            self._handle_event(json.loads(msg.data))
                        except _SessionExpiredError:
                            # The old session is gone; retry without resuming it.
                            self._session_id = None
                            self._pending_call_ids.clear()
                            raise APIConnectionError("Session expired; retrying as fresh session")
                        except Exception:
                            logger.exception("failed to handle event")

                tasks = [asyncio.create_task(_recv_task()), asyncio.create_task(_send_task())]
                try:
                    done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
                    for task in done:
                        # Re-raise any task failure (e.g. APIConnectionError).
                        task.result()
                    return
                finally:
                    await utils.aio.cancel_and_wait(*tasks)
                    await ws.close()
            except APIConnectionError:
                if attempt >= 3:
                    raise
                await asyncio.sleep([0.5, 1.0, 2.0][attempt])

    def _handle_event(self, event: dict) -> None:
        """Translate one server event into the llm.RealtimeSession event model."""
        # Mirror every raw event under an "aai."-prefixed name for observers.
        self.emit(f"aai.{event.get('type', '')}", event)
        t = event.get("type", "")
        if t == "session.ready":
            self._session_ready = True
            self._session_id = event.get("session_id")
            # A reply requested before (re)connection completed: ask again now.
            if self._pending_reply_fut and not self._pending_reply_fut.done():
                self._send({"type": "reply.create"})
        elif t == "input.speech.started":
            self.emit("input_speech_started", llm.InputSpeechStartedEvent())
        elif t == "input.speech.stopped":
            self.emit("input_speech_stopped", llm.InputSpeechStoppedEvent(user_transcription_enabled=True))
        elif t == "transcript.user":
            self.emit("input_audio_transcription_completed", llm.InputTranscriptionCompleted(item_id=event.get("item_id", ""), transcript=event.get("text", ""), is_final=True))
        elif t == "reply.started":
            # A new agent turn begins: wire up text/audio/function streams.
            self._close_current_gen()
            response_id = event.get("reply_id", "")
            self._current_response_id = response_id
            text_ch: utils.aio.Chan[str] = utils.aio.Chan()
            audio_ch: utils.aio.Chan[rtc.AudioFrame] = utils.aio.Chan()
            modalities_fut = asyncio.get_event_loop().create_future()
            # This endpoint always streams audio output.
            modalities_fut.set_result(["audio"])
            msg_gen = llm.MessageGeneration(message_id=response_id, text_stream=text_ch, audio_stream=audio_ch, modalities=modalities_fut)
            msg_ch: utils.aio.Chan[llm.MessageGeneration] = utils.aio.Chan()
            fn_ch: utils.aio.Chan[llm.FunctionCall] = utils.aio.Chan()
            self._current_gen = _Generation(response_id=response_id, msg_ch=msg_ch, fn_ch=fn_ch, text_ch=text_ch, audio_ch=audio_ch, modalities_fut=modalities_fut)
            msg_ch.send_nowait(msg_gen)
            gen_ev = llm.GenerationCreatedEvent(message_stream=msg_ch, function_stream=fn_ch, user_initiated=False, response_id=response_id)
            # If generate_reply() is waiting, this turn fulfils it.
            if self._pending_reply_fut and not self._pending_reply_fut.done():
                gen_ev.user_initiated = True
                self._pending_reply_fut.set_result(gen_ev)
                self._pending_reply_fut = None
            self.emit("generation_created", gen_ev)
        elif t == "reply.audio":
            if gen := self._current_gen:
                if data := base64.b64decode(event.get("data", "")):
                    # PCM16 mono: 2 bytes per sample.
                    gen.audio_ch.send_nowait(rtc.AudioFrame(data=data, sample_rate=SAMPLE_RATE, num_channels=NUM_CHANNELS, samples_per_channel=len(data) // 2))
        elif t == "transcript.agent":
            # Full agent transcript arrives as one event; emit it and end the text stream.
            if gen := self._current_gen:
                if text := event.get("text", ""):
                    gen.text_ch.send_nowait(text)
                gen.text_ch.close()
        elif t == "reply.done":
            self._close_current_gen()
            self._current_response_id = None
        elif t == "tool.call":
            if gen := self._current_gen:
                call_id = event.get("call_id", "")
                # Deduplicate: a call_id is forwarded to the agent only once.
                if call_id not in self._pending_call_ids:
                    self._pending_call_ids.add(call_id)
                    gen.fn_ch.send_nowait(llm.FunctionCall(call_id=call_id, name=event.get("name", ""), arguments=json.dumps(event.get("args", {}))))
        elif t == "session.error":
            code = event.get("code", "")
            if code in ("session_not_found", "session_forbidden"):
                # Caught in _recv_task; triggers a fresh (non-resumed) reconnect.
                raise _SessionExpiredError(event.get("message", ""))
            self.emit("error", llm.RealtimeModelError(timestamp=time.time(), label=self._model.label, error=Exception(event.get("message", "unknown error")), recoverable=False))

    def _close_current_gen(self) -> None:
        """Close all streams of the in-flight generation, if any."""
        if gen := self._current_gen:
            for ch in (gen.audio_ch, gen.text_ch, gen.fn_ch, gen.msg_ch):
                ch.close()
            self._current_gen = None

    def _resample_audio(self, frame: rtc.AudioFrame):
        """Yield *frame* resampled to SAMPLE_RATE/NUM_CHANNELS (pass-through if it matches)."""
        # Drop a stale resampler if the input rate changed mid-stream.
        if self._input_resampler is not None and frame.sample_rate != self._input_resampler._input_rate:
            self._input_resampler = None
        # Lazily create a resampler only when conversion is actually needed.
        if self._input_resampler is None and (frame.sample_rate != SAMPLE_RATE or frame.num_channels != NUM_CHANNELS):
            self._input_resampler = rtc.AudioResampler(input_rate=frame.sample_rate, output_rate=SAMPLE_RATE, num_channels=NUM_CHANNELS)
        if self._input_resampler:
            yield from self._input_resampler.push(frame)
        else:
            yield frame

Event flow diagram (messages over a single WebSocket connection; time flows top to bottom)

Client Server
│ │
│── WebSocket connect ──────────────►│
│── session.update ─────────────────►│ (system prompt + tools + greeting)
│ │
│◄─── session.ready ────────────────│ (save session_id)
│ │
│── input.audio (stream) ──────────►│ (only after session.ready)
│── input.audio (stream) ──────────►│
│ │
│◄─── input.speech.started ─────────│
│◄─── transcript.user.delta ────────│
│◄─── input.speech.stopped ─────────│
│◄─── transcript.user ──────────────│
│ │
│◄─── reply.started ────────────────│
│◄─── reply.audio ──────────────────│
│◄─── transcript.agent ─────────────│
│◄─── reply.done ───────────────────│
│ │
│ [tool call flow] │
│◄─── tool.call ────────────────────│ (args is a JSON object/dict, not a string)
│◄─── reply.done ───────────────────│ ← send tool.result here
│── tool.result ────────────────────►│
│◄─── reply.started ────────────────│
│◄─── reply.audio ──────────────────│
│◄─── reply.done ───────────────────│