Agora voice agent with AssemblyAI Universal-3 Pro Streaming



Build a real-time transcription bot for Agora channels using AssemblyAI Universal-3 Pro Streaming. A Python server joins the Agora channel as a silent observer, pulls raw PCM audio from every participant, and streams it directly to AssemblyAI's WebSocket API. Because each participant gets a separate stream, every transcript is tied to a speaker. You get 307ms P50 latency without touching your existing client code.
Why Agora + AssemblyAI?
Agora's Python Server SDK is designed for server-side bots: they join a channel, subscribe to participant audio as raw PCM frames, and can publish audio back. That PCM stream is exactly what AssemblyAI Universal-3 Pro Streaming expects.
Architecture
```
Browser/Mobile clients
        │  WebRTC (Agora SDK)
        ▼
Agora Channel
        │  server subscribes as bot user
        ▼
Python Server Bot
(agora-python-server-sdk)
        │  PcmAudioFrame per participant
        │  sample_rate=16000, pcm_s16le
        ▼
AssemblyAI Universal-3 Pro Streaming
wss://streaming.assemblyai.com/v3/ws
        │  Turn events with transcript
        ▼
Your application logic
(drive LLM, store transcript, trigger webhook)
```
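The `pcm_s16le` label in the diagram means signed 16-bit integer samples in little-endian byte order. The bot never builds audio itself, because Agora delivers these bytes ready-made. As an illustration only, this sketch encodes a generated 440 Hz tone in the same format and checks the size of one 10 ms frame. The helper name `encode_pcm_s16le` is hypothetical.

```python
import math
import struct

SAMPLE_RATE = 16000  # Hz, matching sample_rate=16000 in the diagram


def encode_pcm_s16le(samples: list[float]) -> bytes:
    """Encode floats in [-1.0, 1.0] as signed 16-bit little-endian PCM (hypothetical helper)."""
    clipped = [max(-1.0, min(1.0, s)) for s in samples]
    ints = [round(s * 32767) for s in clipped]
    # '<' = little-endian, 'h' = signed 16-bit integer, one per sample
    return struct.pack(f"<{len(ints)}h", *ints)


# 10 ms of a 440 Hz tone at 16 kHz mono = 160 samples
tone = [math.sin(2 * math.pi * 440 * n / SAMPLE_RATE) for n in range(SAMPLE_RATE // 100)]
frame = encode_pcm_s16le(tone)
print(len(frame))  # 160 samples x 2 bytes = 320
```

At 16 kHz mono, every 10 ms of audio is 320 bytes. That figure is useful when sizing buffers or checking frames received from Agora.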
Quick start
```bash
git clone https://github.com/assemblyai-examples/agora-universal-3-pro
cd agora-universal-3-pro
pip install agora-python-server-sdk websockets
cp .env.example .env
# Fill in AGORA_APP_ID, AGORA_APP_CERT, ASSEMBLYAI_API_KEY
python bot.py --channel my-channel
```
The bot joins my-channel, begins streaming audio to AssemblyAI, and prints transcripts to stdout. Press Ctrl+C to stop it. The bot then cancels its per-participant streams and leaves the channel.
Environment setup
```bash
# .env
AGORA_APP_ID=your_agora_app_id
AGORA_APP_CERT=your_agora_certificate
AGORA_CHANNEL=my-channel
AGORA_BOT_UID=9999  # any unique integer for the bot
ASSEMBLYAI_API_KEY=your_assemblyai_api_key
```

Get your Agora credentials from the Agora Console and your AssemblyAI API key from the AssemblyAI dashboard.
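The core integration reads everything from `os.environ`, so something has to load `.env` and honor the `--channel` flag from the quick start. The repo's `bot.py` may use a library such as python-dotenv for this, which is not confirmed here. The sketch below is a dependency-free stand-in under that assumption; `load_env_text` and `resolve_channel` are hypothetical helper names. It does the following:

- skips blank lines and `#` comment lines
- strips inline ` #` comments like the one on `AGORA_BOT_UID`
- never overrides variables already set in the shell

```python
import argparse


def load_env_text(text: str, environ: dict) -> None:
    """Parse KEY=VALUE lines into environ without overriding existing keys (hypothetical helper)."""
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue  # blank line, comment, or not an assignment
        key, value = line.split("=", 1)
        # Drop an inline comment such as "9999 # any unique integer for the bot"
        value = value.split(" #", 1)[0].strip()
        environ.setdefault(key.strip(), value)


def resolve_channel(argv: list[str], environ: dict) -> str:
    """Let --channel override AGORA_CHANNEL, mirroring `python bot.py --channel my-channel`."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--channel", default=environ.get("AGORA_CHANNEL"))
    args = parser.parse_args(argv)
    if not args.channel:
        parser.error("set AGORA_CHANNEL or pass --channel")  # exits with status 2
    return args.channel
```

In a script, you would call `load_env_text(open(".env").read(), os.environ)` and then set `os.environ["AGORA_CHANNEL"] = resolve_channel(sys.argv[1:], os.environ)` before `asyncio.run(main())`. Quoted values and `#` inside values are not handled, which is one reason to prefer a maintained library.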
Core integration
For each participant, the bot does three things concurrently: it pulls audio frames from Agora, forwards them to AssemblyAI, and handles transcript events.
```python
import asyncio
import json
import os

import websockets
from agora.rtc.agora_service import AgoraService, AgoraServiceConfig
from agora.rtc.rtc_connection import RTCConnConfig
from agora.rtc.agora_base import (
    ClientRoleType,
    ChannelProfileType,
    AudioScenarioType,
)

SAMPLE_RATE = 16000  # AssemblyAI Universal-3 Pro prefers 16 kHz
CHANNELS = 1  # mono

AAI_WS_URL = (
    "wss://streaming.assemblyai.com/v3/ws"
    f"?sample_rate={SAMPLE_RATE}"
    "&speech_model=u3-rt-pro"
    "&format_turns=true"
)


async def stream_participant(agora_channel, uid: int, api_key: str):
    """Open one AssemblyAI WebSocket per participant and forward their audio."""
    headers = {"Authorization": api_key}
    async with websockets.connect(AAI_WS_URL, additional_headers=headers) as ws:
        # The first message of every session is the Begin event
        begin = json.loads(await ws.recv())
        print(f"[uid={uid}] AAI session: {begin['id']}")

        async def send_audio():
            # Forward this participant's frames as binary WebSocket messages
            async for frame in agora_channel.get_audio_frames(uid):
                await ws.send(frame.data)  # raw PCM bytes

        async def recv_transcripts():
            # Skip partial Turn updates; print only completed turns
            async for message in ws:
                event = json.loads(message)
                if event["type"] == "Turn" and event.get("end_of_turn"):
                    print(f"[uid={uid}] {event['transcript']}")

        await asyncio.gather(send_audio(), recv_transcripts())


async def main():
    app_id = os.environ["AGORA_APP_ID"]
    app_cert = os.environ["AGORA_APP_CERT"]  # needed for production tokens
    channel = os.environ["AGORA_CHANNEL"]
    bot_uid = int(os.environ.get("AGORA_BOT_UID", "9999"))
    api_key = os.environ["ASSEMBLYAI_API_KEY"]

    cfg = AgoraServiceConfig()
    cfg.appid = app_id
    cfg.enable_audio_processor = True
    cfg.audio_scenario = AudioScenarioType.AUDIO_SCENARIO_CHORUS
    service = AgoraService()
    service.initialize(cfg)

    # Join as an audience member: the bot listens but never publishes
    conn_cfg = RTCConnConfig(
        client_role_type=ClientRoleType.CLIENT_ROLE_AUDIENCE,
        channel_profile=ChannelProfileType.CHANNEL_PROFILE_LIVE_BROADCASTING,
    )
    connection = service.create_rtc_connection(conn_cfg)
    connection.connect("", channel, str(bot_uid))  # empty token; see "Generating tokens for production"

    agora_channel = connection.get_local_user()
    # Per-participant (pre-mix) audio as 16 kHz mono, configured before subscribing
    agora_channel.set_playback_audio_frame_before_mixing_parameters(CHANNELS, SAMPLE_RATE)
    agora_channel.subscribe_all_audio()
    print(f"Bot joined channel '{channel}' as uid {bot_uid}. Waiting for speakers...")

    loop = asyncio.get_running_loop()
    active_streams: dict[int, asyncio.Task] = {}

    def start_stream(uid: int):
        if uid not in active_streams:
            active_streams[uid] = asyncio.create_task(
                stream_participant(agora_channel, uid, api_key)
            )

    def stop_stream(uid: int):
        task = active_streams.pop(uid, None)
        if task is not None:
            task.cancel()

    # SDK callbacks may fire on a native SDK thread with no running event loop,
    # so hand the work to the loop thread instead of calling create_task() there
    def on_user_joined(uid: int):
        loop.call_soon_threadsafe(start_stream, uid)

    def on_user_left(uid: int, reason: int):
        loop.call_soon_threadsafe(stop_stream, uid)

    connection.register_observer_callback("on_user_joined", on_user_joined)
    connection.register_observer_callback("on_user_offline", on_user_left)

    try:
        await asyncio.Event().wait()  # run until cancelled (e.g. Ctrl+C)
    finally:
        for task in active_streams.values():
            task.cancel()
        connection.disconnect()
        service.release()


if __name__ == "__main__":
    asyncio.run(main())
```
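SDK observer callbacks such as `on_user_joined` may run on a native SDK thread rather than on the asyncio loop thread. On such a thread, `asyncio.create_task` raises `RuntimeError: no running event loop`. The standalone sketch below isolates the fix of hopping back onto the loop with `loop.call_soon_threadsafe`. A plain `threading.Thread` stands in for the SDK. This is an assumption about Agora's threading model, not something the integration above states. `StreamRegistry`, `fake_stream`, and `demo` are hypothetical names.

```python
import asyncio
import threading


class StreamRegistry:
    """Track one asyncio task per participant; safe to call from any thread (hypothetical helper)."""

    def __init__(self, loop: asyncio.AbstractEventLoop, factory):
        self.loop = loop
        self.factory = factory  # coroutine function taking a uid
        self.tasks: dict[int, asyncio.Task] = {}

    def _start(self, uid: int) -> None:
        # Runs on the event-loop thread, so creating a task is legal here
        if uid not in self.tasks:
            self.tasks[uid] = self.loop.create_task(self.factory(uid))

    def _stop(self, uid: int) -> None:
        task = self.tasks.pop(uid, None)
        if task is not None:
            task.cancel()

    # Entry points for SDK callbacks, which may run on a foreign thread
    def on_user_joined(self, uid: int) -> None:
        self.loop.call_soon_threadsafe(self._start, uid)

    def on_user_left(self, uid: int, reason: int = 0) -> None:
        self.loop.call_soon_threadsafe(self._stop, uid)


async def demo() -> tuple[list[int], list[int]]:
    started: list[int] = []

    async def fake_stream(uid: int) -> None:
        started.append(uid)
        await asyncio.sleep(3600)  # stands in for a long-lived AssemblyAI session

    registry = StreamRegistry(asyncio.get_running_loop(), fake_stream)
    # Simulate the SDK firing join callbacks from its own thread
    sdk_thread = threading.Thread(
        target=lambda: (registry.on_user_joined(1), registry.on_user_joined(2))
    )
    sdk_thread.start()
    sdk_thread.join()
    await asyncio.sleep(0.05)  # let the loop run the scheduled starts
    registry.on_user_left(1)
    await asyncio.sleep(0.05)  # let the loop run the scheduled stop
    remaining = sorted(registry.tasks)

    tasks = list(registry.tasks.values())
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return sorted(started), remaining


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ([1, 2], [2])
```

Keeping all mutation of the task dictionary on the loop thread also avoids races between join and leave callbacks arriving from different threads.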
Audio format
Agora server-side audio frames are raw PCM. Configure the local user to deliver 16 kHz mono before subscribing. This matches the sample_rate in the AssemblyAI URL and 16-bit little-endian PCM, so no resampling is required:
```python
# Set the output format BEFORE calling subscribe_all_audio()
agora_channel.set_playback_audio_frame_before_mixing_parameters(
    num_of_channels=1,  # mono
    sample_rate=16000,  # 16 kHz, so no resampling is needed for AssemblyAI
)
agora_channel.subscribe_all_audio()

# Each PcmAudioFrame yielded by get_audio_frames() will have:
#   frame.data                → bytes (raw PCM, 16-bit LE)
#   frame.sample_rate         → 16000
#   frame.number_of_channels  → 1
#   frame.samples_per_channel → 160 (10 ms chunks, 320 bytes)
```
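Agora delivers audio in 10 ms frames of 320 bytes each. The core integration forwards each frame as-is. AssemblyAI's v3 streaming API has documented a 50 to 1000 ms range per audio message, so check the current limits. If short chunks are rejected, batch several frames per send.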
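If your AssemblyAI session rejects chunks as short as Agora's 10 ms frames, a small accumulator can regroup them before sending. This is a minimal sketch that assumes a 50 ms target: five 16 kHz mono frames, or 1,600 bytes. `FrameBatcher` and `bytes_for_ms` are hypothetical names.

```python
SAMPLE_RATE = 16000
BYTES_PER_SAMPLE = 2  # pcm_s16le
CHANNELS = 1


def bytes_for_ms(ms: int) -> int:
    """Byte length of `ms` milliseconds of 16 kHz mono 16-bit PCM."""
    return SAMPLE_RATE * CHANNELS * BYTES_PER_SAMPLE * ms // 1000


class FrameBatcher:
    """Accumulate small PCM frames and emit fixed-size chunks (hypothetical helper)."""

    def __init__(self, chunk_ms: int = 50):  # 50 ms target is an assumption
        self.chunk_bytes = bytes_for_ms(chunk_ms)
        self.buffer = bytearray()

    def push(self, frame: bytes) -> list[bytes]:
        """Add one frame; return zero or more full chunks ready to send."""
        self.buffer.extend(frame)
        chunks = []
        while len(self.buffer) >= self.chunk_bytes:
            chunks.append(bytes(self.buffer[: self.chunk_bytes]))
            del self.buffer[: self.chunk_bytes]
        return chunks

    def flush(self) -> bytes:
        """Return leftover audio (shorter than one chunk) and reset the buffer."""
        rest = bytes(self.buffer)
        self.buffer.clear()
        return rest
```

Inside `send_audio()`, the forwarding line becomes `for chunk in batcher.push(frame.data): await ws.send(chunk)`. The batching adds up to 40 ms of delay before each send. A leftover from `flush()` may itself be below the minimum, so pad it or drop it at shutdown.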
Handling transcripts
A Turn event with end_of_turn set to true arrives at a natural speech boundary and carries the complete, formatted transcript for that turn. Wire it to your LLM or store it however you need:
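```python
import json
import time


async def recv_transcripts(ws, uid: int):
    async for message in ws:
        event = json.loads(message)
        if event["type"] == "Turn" and event.get("end_of_turn"):
            transcript = event["transcript"]

            # Option 1: log it
            print(f"[uid={uid}] {transcript}")

            # Option 2: send to your LLM (send_to_llm is your own coroutine, not defined here)
            await send_to_llm(uid, transcript)

            # Option 3: store in a database (db is your own async client, not defined here).
            # Arrival time is recorded because Turn events may not carry a server timestamp.
            await db.insert(uid=uid, text=transcript, ts=time.time())
```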
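Keeping the event filtering in a pure function makes it testable without a live session. This sketch assumes the Turn fields used above (`type`, `end_of_turn`, `transcript`) plus `turn_is_formatted`. With `format_turns=true`, AssemblyAI's Universal-Streaming models have sent an unformatted end-of-turn message followed by a formatted copy. Whether Universal-3 Pro does the same is worth verifying, so the helper can optionally wait for the formatted copy. `final_transcript` is a hypothetical name.

```python
import json
from typing import Optional


def final_transcript(message: str, require_formatted: bool = False) -> Optional[str]:
    """Return the transcript of a completed turn, or None for any other message (hypothetical helper)."""
    event = json.loads(message)
    if event.get("type") != "Turn" or not event.get("end_of_turn"):
        return None  # partial Turn, Begin, Termination, etc.
    if require_formatted and not event.get("turn_is_formatted"):
        return None  # skip the unformatted copy and wait for the formatted one
    return event.get("transcript", "")
```

`recv_transcripts()` can then reduce to `text = final_transcript(message)` followed by `if text is not None:` and the handling options above.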
Terminating cleanly
Send a Terminate message to flush the final turn before closing. AssemblyAI replies with a Termination event that reports how much audio was processed:

```python
import json


async def close_stream(ws):
    # Ask AssemblyAI to finalize the session and flush any pending turn
    await ws.send(json.dumps({"type": "Terminate"}))
    # Drain remaining messages until the Termination event arrives
    async for message in ws:
        event = json.loads(message)
        if event["type"] == "Termination":
            print(f"Session closed. Audio processed: {event['audio_duration_seconds']}s")
            break
```
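`close_stream()` reads from the socket, so it must not run while `recv_transcripts()` is still iterating the same connection. The websockets library rejects two concurrent readers. The ordering below is one option:

1. Stop sending audio.
2. Let the receive loop exit.
3. Send Terminate.
4. Wait for Termination with a timeout, so a lost message cannot hang the bot.

The sketch covers the last two steps with a stand-in socket, so it runs without credentials. `FakeSocket`, `shutdown`, and the 5-second default timeout are assumptions for illustration.

```python
import asyncio
import json
from typing import Optional


class FakeSocket:
    """Stand-in for a websockets connection: records sends, replays queued replies."""

    def __init__(self, replies: list[str]):
        self.sent: list[str] = []
        self.replies = replies

    async def send(self, data: str) -> None:
        self.sent.append(data)

    def __aiter__(self):
        return self

    async def __anext__(self) -> str:
        if not self.replies:
            await asyncio.sleep(3600)  # simulate a server that never answers
        return self.replies.pop(0)


async def shutdown(ws, timeout: float = 5.0) -> Optional[float]:
    """Send Terminate and return audio_duration_seconds, or None on timeout or early close."""
    await ws.send(json.dumps({"type": "Terminate"}))

    async def wait_for_termination() -> Optional[float]:
        async for message in ws:
            event = json.loads(message)
            if event.get("type") == "Termination":
                return event.get("audio_duration_seconds")
        return None  # socket closed without a Termination event

    try:
        return await asyncio.wait_for(wait_for_termination(), timeout)
    except asyncio.TimeoutError:
        return None
```

Any final Turn events that arrive before Termination are skipped here. If you need them, run them through the same handler as `recv_transcripts()`.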
Generating tokens for production
Agora channels in production require a token. Generate one server-side using the Agora token builder:
```bash
pip install agora-token-builder
```

```python
import os
import time

from agora_token_builder import RtcTokenBuilder, Role_Subscriber


def generate_bot_token(app_id: str, app_cert: str, channel: str, uid: int) -> str:
    # Privileges expire at an absolute Unix timestamp, here one hour from now
    expire = int(time.time()) + 3600
    return RtcTokenBuilder.buildTokenWithUid(
        app_id, app_cert, channel, uid, Role_Subscriber, expire
    )


# Inside main(): channel, bot_uid and connection are the variables defined there
token = generate_bot_token(
    os.environ["AGORA_APP_ID"],
    os.environ["AGORA_APP_CERT"],
    channel,
    bot_uid,
)
connection.connect(token, channel, str(bot_uid))  # replaces connect("", ...)
```
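The token above expires one hour after it is built, so a bot that stays in a channel longer needs to renew it before then. This sketch only does the timing arithmetic. How you hand a new token to the connection depends on the Agora Python Server SDK, so check its docs for a renewal method. `token_schedule` and the 5-minute margin are assumptions.

```python
import time
from typing import Optional


def token_schedule(ttl_seconds: int = 3600, margin_seconds: int = 300,
                   now: Optional[float] = None) -> tuple[int, int]:
    """Return (expire_ts, renew_at) for a token built now (hypothetical helper).

    expire_ts is the absolute Unix timestamp to pass as the expiry argument;
    renew_at is when to request a replacement, margin_seconds earlier.
    """
    if margin_seconds >= ttl_seconds:
        raise ValueError("margin must be shorter than the token lifetime")
    issued = time.time() if now is None else now
    expire_ts = int(issued) + ttl_seconds
    return expire_ts, expire_ts - margin_seconds
```

In the bot, you would pass `expire_ts` to `buildTokenWithUid` in place of the inline calculation. You would then schedule the renewal with `loop.call_later(renew_at - time.time(), callback)`, where `callback` is your own function.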

