Streaming Audio

Quickstart

In this quick guide you will learn how to use AssemblyAI’s Streaming Speech-to-Text feature to transcribe audio from your microphone.

To run this quickstart you will need:

  • Python installed
  • A valid AssemblyAI API key

To run the quickstart:

  1. Create a new Python file (for example, main.py) and paste the code provided below inside.
  2. Insert your API key where api_key is defined, replacing <YOUR_API_KEY>.
  3. Install the necessary libraries:

     pip install assemblyai pyaudio

  4. Run the file with python main.py.

import logging
from typing import Type

import assemblyai as aai
from assemblyai.streaming.v3 import (
    BeginEvent,
    StreamingClient,
    StreamingClientOptions,
    StreamingError,
    StreamingEvents,
    StreamingParameters,
    StreamingSessionParameters,
    TerminationEvent,
    TurnEvent,
)

api_key = "<YOUR_API_KEY>"

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def on_begin(self: Type[StreamingClient], event: BeginEvent):
    print(f"Session started: {event.id}")


def on_turn(self: Type[StreamingClient], event: TurnEvent):
    print(f"{event.transcript} ({event.end_of_turn})")

    if event.end_of_turn and not event.turn_is_formatted:
        params = StreamingSessionParameters(
            format_turns=True,
        )

        self.set_params(params)


def on_terminated(self: Type[StreamingClient], event: TerminationEvent):
    print(
        f"Session terminated: {event.audio_duration_seconds} seconds of audio processed"
    )


def on_error(self: Type[StreamingClient], error: StreamingError):
    print(f"Error occurred: {error}")


def main():
    client = StreamingClient(
        StreamingClientOptions(
            api_key=api_key,
            api_host="streaming.assemblyai.com",
        )
    )

    client.on(StreamingEvents.Begin, on_begin)
    client.on(StreamingEvents.Turn, on_turn)
    client.on(StreamingEvents.Termination, on_terminated)
    client.on(StreamingEvents.Error, on_error)

    client.connect(
        StreamingParameters(
            sample_rate=16000,
            format_turns=True,
        )
    )

    try:
        client.stream(
            aai.extras.MicrophoneStream(sample_rate=16000)
        )
    finally:
        client.disconnect(terminate=True)


if __name__ == "__main__":
    main()

Core concepts


Universal-Streaming is built around two core concepts: Turn objects and immutable transcriptions.

Turn object

A Turn object is intended to correspond to a speaking turn in the context of voice agent applications, and therefore it roughly corresponds to an utterance in a broader context. We assign a unique ID to each Turn object, which is included in our response. Specifically, the Universal-Streaming response is formatted as follows:

{
  "turn_order": 1,
  "turn_is_formatted": false,
  "end_of_turn": false,
  "transcript": "modern medicine is",
  "end_of_turn_confidence": 0.7,
  "words": [
    { "text": "modern", "word_is_final": true, ... },
    { "text": "medicine", "word_is_final": true, ... },
    { "text": "is", "word_is_final": true, ... },
    { "text": "amazing", "word_is_final": false, ... }
  ]
}

  • turn_order: Integer that increments with each new turn
  • turn_is_formatted: Boolean indicating whether the text in the transcript field is formatted. Text formatting is enabled when format_turns is set to true. It adds punctuation, applies casing, and performs inverse text normalization so that entities such as dates, times, and phone numbers are displayed in a human-friendly format
  • end_of_turn: Boolean indicating if this is the end of the current turn
  • transcript: String containing only finalized words
  • end_of_turn_confidence: Floating-point number (0-1) representing the confidence that the current turn has finished, i.e., that the current speaker has completed their turn
  • words: List of Word objects with individual metadata

Each Word object in the words array includes:

  • text: The string representation of the word
  • word_is_final: Boolean indicating whether the word is finalized; a finalized word will not be altered in future transcription responses
  • start: Timestamp for word start
  • end: Timestamp for word end
  • confidence: Confidence score for the word

Immutable transcription

As AssemblyAI’s streaming system receives audio, it returns transcription responses in real time using the format specified above. Unlike many other streaming speech-to-text models that implement the concept of partial/variable transcriptions to show transcripts in an ongoing manner, Universal-Streaming transcriptions are immutable. In other words, the text that has already been produced will not be overwritten in future transcription responses. Therefore, with Universal-Streaming, the transcriptions will be delivered in the following way:

→ hello my na
→ hello my name
→ hello my name
→ hello my name is
→ hello my name is zac
→ hello my name is zack

When the end of the current turn is detected, you receive a message with end_of_turn set to true. Additionally, if you enable text formatting, you will also receive a transcription response with turn_is_formatted set to true.

→ hello my name is zack (unformatted)
→ Hello my name is Zack. (formatted)

In this example, you may have noticed that the last word of each transcript may occasionally be a subword (“zac” in the example shown above). Each Word object has the word_is_final field to indicate whether the model is confident that the last word is a completed word. Note that, except for the last word, word_is_final is always true.
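
If you only want to act on text that will never change, you can filter on word_is_final in your Turn handler. Below is a minimal sketch, reusing the quickstart’s imports and assuming the SDK’s Word objects expose the same text and word_is_final fields shown in the response format above:

def on_turn(self: Type[StreamingClient], event: TurnEvent):
    # Keep only words the model has finalized; the trailing word may still
    # be an in-progress subword (word_is_final == False).
    finalized_words = [word.text for word in event.words if word.word_is_final]
    print("Finalized so far:", " ".join(finalized_words))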

Recommendations

  • Use an audio chunk size of 50ms. Larger chunk sizes are workable, but may result in latency fluctuations; see the sizing example below.
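
For reference, the chunk size in samples and bytes follows directly from the sample rate and encoding; a quick calculation assuming 16-bit PCM (pcm_s16le, 2 bytes per sample):

sample_rate = 16000                                  # Hz
chunk_ms = 50                                        # recommended chunk duration

samples_per_chunk = sample_rate * chunk_ms // 1000   # 800 samples
bytes_per_chunk = samples_per_chunk * 2              # 1600 bytes of pcm_s16le audio per message
print(samples_per_chunk, bytes_per_chunk)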

Voice agent use case

To optimize for latency, we recommend using the unformatted transcript as it’s received more quickly than the formatted version. In typical voice agent applications involving large language models (LLMs), the lack of formatting has little impact on the subsequent LLM processing.

Possible implementation strategy

Since all our transcripts are immutable, the data is immediately ready to be sent in the voice agent pipeline. Here’s one way to handle the conversation flow:

  1. When you receive a transcription response with the end_of_turn value being true but your Voice Agent (i.e., your own turn detection logic) hasn’t detected end of turn, save this data in a variable (let’s call it running_transcript).
  2. When the voice agent detects end of turn, combine the running_transcript with the latest partial transcript and send it to the LLM.
  3. Clear the running_transcript after sending, and be sure to ignore the next transcription with end_of_turn set to true that eventually arrives for the latest partial you used. This prevents duplicate information from being processed in future turns.

What you send to the voice agent should look like: running_transcript + ' ' + latest_partial
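
A minimal sketch of this bookkeeping is shown below. The send_to_llm() call and the two entry points (one fired per Turn message, one fired by your own turn-detection logic) are hypothetical placeholders for your agent’s pipeline, and event is assumed to carry the Turn fields described above:

running_transcript = []
ignore_next_final = False

def on_streaming_turn(event):
    """Handle each Turn message from the streaming API."""
    global ignore_next_final
    if event.end_of_turn:
        if ignore_next_final:
            # This final belongs to a partial we already forwarded to the LLM.
            ignore_next_final = False
        else:
            running_transcript.append(event.transcript)

def on_agent_end_of_turn(latest_partial):
    """Called when your own turn-detection logic fires."""
    global ignore_next_final
    send_to_llm(" ".join(running_transcript + [latest_partial]))  # hypothetical downstream call
    running_transcript.clear()
    ignore_next_final = True  # skip the final that will arrive for latest_partial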

Example flow

→ hello my na
→ hello my name
→ hello my name
→ hello my name is
→ hello my name is son
→ hello my name is sonny (final – added to running_transcript)

→ I
→ I work at
→ I work at assembly ai (final – added to running_transcript)

→ how
→ how can
→ how can I help
→ how can I help you today (latest partial, final not yet received)

<END_OF_TURN_DETECTED>
"hello my name is sonny I work at assembly ai how can I help you today" → sent to LLM

<running_transcript cleared>
<final for latest_partial not added to running_transcript>

Utilizing the ongoing transcriptions in this manner allows you to achieve the lowest possible latency for this step of your Voice Agent. Please reach out to the AssemblyAI team with any questions.

Instead of building your own logic for conversation flow handling, you can use AssemblyAI via integrations with tools like LiveKit and Pipecat. See the Voice agent orchestrators section of our docs for more information on using these orchestrators.

Live captioning use case

The default setting for Streaming Speech-to-Text is optimized for the Voice Agent use case, where you expect a single speaker, with long silences occurring while the agent is speaking.

For applications such as live captioning, where the input audio stream typically contains multiple speakers, it is usually beneficial to wait longer before detecting the end of a turn, which triggers text formatting.

When captioning conversations with multiple speakers, we recommend setting min_end_of_turn_silence_when_confident to 560 ms. By default, this is set to 160 ms.
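
With the Python SDK, this can be set at connection time, continuing from the quickstart’s client. This is a sketch that assumes StreamingParameters accepts the connection parameters listed in the reference below:

client.connect(
    StreamingParameters(
        sample_rate=16000,
        format_turns=True,
        # Wait longer before ending a turn (default is 160 ms), which suits
        # multi-speaker live captioning better than the Voice Agent default.
        min_end_of_turn_silence_when_confident=560,
    )
)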

Authenticate with a temporary token

If you need to authenticate on the client, you can avoid exposing your API key by using temporary authentication tokens. You should generate this token on your server and pass it to the client.

GET /v3/token

curl -G https://streaming.assemblyai.com/v3/token \
  -H "Authorization: <apiKey>" \
  -d expires_in_seconds=60

To generate a temporary token, call StreamingClient.create_temporary_token().

Use the expires_in_seconds parameter to specify the duration for which the token will remain valid. Optionally, use the max_session_duration_seconds parameter to specify the desired maximum duration for the session started using this token.

client = StreamingClient(
    StreamingClientOptions(
        api_key="<YOUR_API_KEY>",
        api_host="streaming.assemblyai.com",
    )
)

# Generate the token on your server and return it to the client
token = client.create_temporary_token(expires_in_seconds=60)

expires_in_seconds must be a value between 1 and 600 seconds. If specified, max_session_duration_seconds must be a value between 60 and 10800 seconds (the default is the maximum session duration of 3 hours).

The client should retrieve the token from the server and use the token to authenticate the transcriber.

Each token has a one-time use restriction and can only be used for a single session. Any usage associated with a temporary token will be attributed to the API key that generated it.

To use it, specify the token parameter when initializing the StreamingClient.

client = StreamingClient(
    StreamingClientOptions(
        token=token,
        api_host="streaming.assemblyai.com",
    )
)

Multichannel streaming audio

To transcribe multichannel streaming audio, we recommend creating a separate session for each channel. This approach allows you to maintain clear speaker separation and get accurate diarized transcriptions for conversations, phone calls, or interviews where speakers are recorded on two different channels.

The following code example demonstrates how to transcribe a dual-channel audio file with diarized, speaker-separated transcripts. This same approach can be applied to any multi-channel audio stream, including those with more than two channels.

  1. First, install the required dependencies:

     pip install websocket-client numpy pyaudio

  2. Use this complete script to transcribe dual-channel audio with speaker separation:

import websocket
import json
import threading
import numpy as np
import wave
import time
import pyaudio
from urllib.parse import urlencode

# Configuration
YOUR_API_KEY = "<YOUR_API_KEY>"
AUDIO_FILE_PATH = "<DUAL_CHANNEL_AUDIO_FILE_PATH>"
API_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_PARAMS = {
    "sample_rate": 8000,
    "format_turns": "true",
}

# Build API endpoint with URL encoding
API_ENDPOINT = f"{API_BASE_URL}?{urlencode(API_PARAMS)}"

class ChannelTranscriber:
    def __init__(self, channel_id, channel_name):
        self.channel_id = channel_id
        self.channel_name = channel_name
        self.ws_app = None
        self.audio_data = []
        self.current_turn_line = None
        self.line_count = 0

    def load_audio_channel(self):
        """Extract single channel from dual-channel audio file."""
        with wave.open(AUDIO_FILE_PATH, 'rb') as wf:
            frames = wf.readframes(wf.getnframes())
            audio_array = np.frombuffer(frames, dtype=np.int16)

            if wf.getnchannels() == 2:
                audio_array = audio_array.reshape(-1, 2)
                channel_audio = audio_array[:, self.channel_id]

                # Split into chunks for streaming
                FRAMES_PER_BUFFER = 400  # 50ms chunks
                for i in range(0, len(channel_audio), FRAMES_PER_BUFFER):
                    chunk = channel_audio[i:i+FRAMES_PER_BUFFER]
                    if len(chunk) < FRAMES_PER_BUFFER:
                        chunk = np.pad(chunk, (0, FRAMES_PER_BUFFER - len(chunk)), 'constant')
                    self.audio_data.append(chunk.astype(np.int16).tobytes())

    def on_open(self, ws):
        """Stream audio data when connection opens."""
        def stream_audio():
            for chunk in self.audio_data:
                ws.send(chunk, websocket.ABNF.OPCODE_BINARY)
                time.sleep(0.05)  # 50ms intervals

            # Send termination message
            terminate_message = {"type": "Terminate"}
            ws.send(json.dumps(terminate_message))

        threading.Thread(target=stream_audio, daemon=True).start()

    def clear_current_line(self):
        if self.current_turn_line is not None:
            print("\r" + " " * 100 + "\r", end="", flush=True)

    def print_partial_transcript(self, words):
        self.clear_current_line()
        # Build transcript from individual words
        word_texts = [word.get('text', '') for word in words]
        transcript = ' '.join(word_texts)
        partial_text = f"{self.channel_name}: {transcript}"
        print(partial_text, end="", flush=True)
        self.current_turn_line = len(partial_text)

    def print_final_transcript(self, transcript):
        self.clear_current_line()
        final_text = f"{self.channel_name}: {transcript}"
        print(final_text, flush=True)
        self.current_turn_line = None
        self.line_count += 1

    def on_message(self, ws, message):
        """Handle transcription results."""
        data = json.loads(message)
        msg_type = data.get('type')

        if msg_type == "Turn":
            transcript = data.get('transcript', '').strip()
            formatted = data.get('turn_is_formatted', False)
            words = data.get('words', [])

            if transcript or words:
                if formatted:
                    self.print_final_transcript(transcript)
                else:
                    self.print_partial_transcript(words)

    def start_transcription(self):
        self.load_audio_channel()

        self.ws_app = websocket.WebSocketApp(
            API_ENDPOINT,
            header={"Authorization": YOUR_API_KEY},
            on_open=self.on_open,
            on_message=self.on_message,
        )

        thread = threading.Thread(target=self.ws_app.run_forever, daemon=True)
        thread.start()
        return thread

def play_audio_file():
    try:
        with wave.open(AUDIO_FILE_PATH, 'rb') as wf:
            p = pyaudio.PyAudio()

            stream = p.open(
                format=p.get_format_from_width(wf.getsampwidth()),
                channels=wf.getnchannels(),
                rate=wf.getframerate(),
                output=True
            )

            print(f"Playing audio: {AUDIO_FILE_PATH}")

            # Play audio in chunks
            chunk_size = 1024
            data = wf.readframes(chunk_size)

            while data:
                stream.write(data)
                data = wf.readframes(chunk_size)

            stream.stop_stream()
            stream.close()
            p.terminate()

            print("Audio playback finished")

    except Exception as e:
        print(f"Error playing audio: {e}")


# Usage
def transcribe_multichannel():
    # Create transcribers for each channel
    transcriber_1 = ChannelTranscriber(0, "Speaker 1")
    transcriber_2 = ChannelTranscriber(1, "Speaker 2")

    # Start audio playback
    audio_thread = threading.Thread(target=play_audio_file, daemon=True)
    audio_thread.start()

    # Start both transcriptions
    thread_1 = transcriber_1.start_transcription()
    thread_2 = transcriber_2.start_transcription()

    # Wait for completion
    thread_1.join()
    thread_2.join()
    audio_thread.join()

if __name__ == "__main__":
    transcribe_multichannel()

Reference

Connection parameters

token (string)

Authenticate the session using a generated temporary token.

sample_rate (int, required)

The sample rate of the audio stream.

encoding (string, required)

The encoding of the audio stream. Allowed values: pcm_s16le, pcm_mulaw.

format_turns (boolean, defaults to False)

Whether to return formatted final transcripts. If enabled, formatted final transcripts will be emitted shortly following an end-of-turn detection.

end_of_turn_confidence_threshold (float, defaults to 0.7)

The confidence threshold (0.0 to 1.0) to use when determining if the end of a turn has been reached. Raise or lower the threshold based on how confident you’d like the model to be before end of turn is triggered by the confidence score.

min_end_of_turn_silence_when_confident (int, defaults to 160 ms)

The minimum amount of silence in milliseconds required to detect end of turn when confident. Increase or decrease this value to adjust how long we wait before triggering end of turn when confident.

max_turn_silence (int, defaults to 2400 ms)

The maximum amount of silence in milliseconds allowed in a turn before end of turn is triggered. Lower or raise this value to adjust how much silence is needed to trigger end of turn when it isn’t triggered by a high confidence score.
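
When connecting over a raw WebSocket rather than the SDK (as in the multichannel example above), these parameters are passed as query parameters on the connection URL. A sketch:

from urllib.parse import urlencode

params = {
    "sample_rate": 16000,
    "encoding": "pcm_s16le",
    "format_turns": "true",
    "end_of_turn_confidence_threshold": 0.7,
    "min_end_of_turn_silence_when_confident": 160,
    "max_turn_silence": 2400,
}
ws_url = f"wss://streaming.assemblyai.com/v3/ws?{urlencode(params)}"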

Audio requirements

The audio format must conform to the following requirements:

  • PCM16 or Mu-law encoding (See Specify the encoding)
  • A sample rate that matches the value of the sample_rate parameter
  • Single-channel
  • 50 milliseconds of audio per message (recommended)

Message types

You send:

1"UklGRtjIAABXQVZFZ"
1{
2 "type": "UpdateConfiguration",
3 "end_of_turn_confidence_threshold": 0.5
4}
1{ "type": "Terminate" }
1{ "type": "ForceEndpoint" }

You receive:

Session Begin message:

{
  "type": "Begin",
  "id": "cfd280c7-5a9b-4dd6-8c05-235ccfa3c97f",
  "expires_at": 1745483367
}

Turn message:

{
  "turn_order": 0,
  "turn_is_formatted": true,
  "end_of_turn": true,
  "transcript": "Hi, my name is Sonny.",
  "end_of_turn_confidence": 0.8095446228981018,
  "words": [
    {
      "start": 1440,
      "end": 1520,
      "text": "Hi,",
      "confidence": 0.9967870712280273,
      "word_is_final": true
    },
    {
      "start": 1600,
      "end": 1680,
      "text": "my",
      "confidence": 0.999546468257904,
      "word_is_final": true
    },
    {
      "start": 1600,
      "end": 1680,
      "text": "name",
      "confidence": 0.9597182273864746,
      "word_is_final": true
    },
    {
      "start": 1680,
      "end": 1760,
      "text": "is",
      "confidence": 0.8261497616767883,
      "word_is_final": true
    },
    {
      "start": 2320,
      "end": 3040,
      "text": "Sonny.",
      "confidence": 0.5737350583076477,
      "word_is_final": true
    }
  ],
  "type": "Turn"
}

For the full breakdown of the message sequence for a turn, see the Message sequence breakdown guide.

Session Termination message:

{
  "type": "Termination",
  "audio_duration_seconds": 2000,
  "session_duration_seconds": 2000
}

Common session errors and closures

In WebSocket-based connections, closures and errors represent different ways a connection can terminate. A closure is a normal, expected termination initiated by either the client or the server, whereas errors are terminations resulting from an unexpected problem like network issues, protocol mismatches, timeouts, or server-side issues. In the event of an error, the on_error callback is triggered just prior to on_close. If an error is not encountered, then only on_close is called.

When a session closes, the on_close callback receives a status code and reason detailing why the connection ended. This information is useful when attempting to debug issues or handle certain closure scenarios programmatically. The table below lists some common reasons for session closure, along with their corresponding codes and descriptions.

| Code | Reason | Description |
| --- | --- | --- |
| 3005 | Session Expired: Maximum session duration exceeded | Session exceeded the 3 hour limit (or the max session duration set by the temporary token). |
| 3005 | Input duration violation: <time> ms. Expected between 50 and 1000 ms | Audio chunk size less than 50 ms or greater than 1000 ms. |
| 3005 | Invalid Message Type: <message> | Unsupported message type. |
| 3005 | Invalid JSON: <json> | Message contains invalid JSON. |
| 3005 | Invalid Message: <message> | Message is not valid (i.e. '[]'). |
| 3005 | Audio Transmission Rate Exceeded: Received <time> sec. audio in <time> sec | Audio sent faster than real-time. |
| 3005 | Session Cancelled: An error occurred | Unknown server error. |
| 1008 | Unauthorized Connection: Too many concurrent sessions | Real-time concurrency limit exceeded. For more on concurrency limits, see your account's Rate Limits and how streaming concurrency works. |
| 1008 | Unauthorized Connection: Missing Authorization header | Missing or invalid API token. Your API tokens can be found on the API Keys page of your account dashboard. |
| 1008 | Unauthorized Connection: <reason> | Account-related issue (insufficient account balance, account temporarily disabled, etc.). |

Handling closed sessions

A common way to handle a closure such as 3005 - Session Expired: Maximum session duration exceeded is to parse the status code and reason in the on_close callback. If a specific code and reason are detected, you can then take appropriate action, such as opening a new session or logging useful debugging information.

Note that the on_error callback is not triggered in this case, as the session closes for a known reason and not due to encountering an error.
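
For example, with the websocket-client library used in the multichannel example, the code and reason arrive in the on_close callback. In the sketch below, start_new_session() is a hypothetical reconnect helper, and API_ENDPOINT, YOUR_API_KEY, and on_message are as defined in the multichannel script:

def on_close(ws, status_code, reason):
    print(f"Session closed: {status_code} - {reason}")
    if status_code == 3005 and reason and "Session Expired" in reason:
        # Maximum session duration reached; open a fresh session if the
        # audio source is still live.
        start_new_session()  # hypothetical reconnect helper

ws_app = websocket.WebSocketApp(
    API_ENDPOINT,                          # as defined in the multichannel example
    header={"Authorization": YOUR_API_KEY},
    on_message=on_message,
    on_close=on_close,
)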

If you believe your session received an error or closed due to a reason not listed above, please reach out to support@assemblyai.com with the session id and any further details.