Streaming Diarization and Multichannel

Identify and label individual speakers in real time, or transcribe multichannel audio using the Streaming API.

Streaming Diarization

Streaming Diarization lets you identify and label individual speakers in real time directly from the Streaming API. Each Turn event includes a speaker_label field (e.g. A, B) indicating which speaker produced that transcript. Speaker accuracy improves over the course of a session as the model accumulates embedding context — so the longer the conversation, the better the labels.

Diarization is supported on all streaming models: u3-rt-pro, universal-streaming-english, and universal-streaming-multilingual.

Already using AssemblyAI streaming?

You can enable Streaming Diarization by adding speaker_labels: true to your connection parameters. No other changes are required — the speaker_label field will appear on every Turn event automatically.

Quickstart

Get started with Streaming Diarization using the code below. This example streams audio from your microphone and prints each turn with its speaker label.

1. Install the required libraries:

   $ pip install "assemblyai>=1.0.0" pyaudio
2. Create a new file main.py and paste the code below. Replace <YOUR_API_KEY> with your API key.

3. Run python main.py and speak into your microphone.

import logging
from typing import Type

import assemblyai as aai
from assemblyai.streaming.v3 import (
    BeginEvent,
    StreamingClient,
    StreamingClientOptions,
    StreamingError,
    StreamingEvents,
    StreamingParameters,
    TurnEvent,
    TerminationEvent,
)

api_key = "<YOUR_API_KEY>"
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def on_begin(self: Type[StreamingClient], event: BeginEvent):
    print(f"Session started: {event.id}")

def on_turn(self: Type[StreamingClient], event: TurnEvent):
    speaker = event.speaker_label or "UNKNOWN"
    print(f"[{speaker}] {event.transcript} (end_of_turn={event.end_of_turn})")

def on_terminated(self: Type[StreamingClient], event: TerminationEvent):
    print(
        f"Session terminated: {event.audio_duration_seconds} seconds of audio processed"
    )

def on_error(self: Type[StreamingClient], error: StreamingError):
    print(f"Error occurred: {error}")

def main():
    client = StreamingClient(
        StreamingClientOptions(
            api_key=api_key,
            api_host="streaming.assemblyai.com",
        )
    )

    client.on(StreamingEvents.Begin, on_begin)
    client.on(StreamingEvents.Turn, on_turn)
    client.on(StreamingEvents.Termination, on_terminated)
    client.on(StreamingEvents.Error, on_error)

    client.connect(
        StreamingParameters(
            sample_rate=16000,
            speech_model="u3-rt-pro",
            speaker_labels=True,
        )
    )

    try:
        client.stream(
            aai.extras.MicrophoneStream(sample_rate=16000)
        )
    finally:
        client.disconnect(terminate=True)

if __name__ == "__main__":
    main()

Configuration

Enable Streaming Diarization by adding speaker_labels: true to your connection parameters. You can optionally cap the number of speakers with max_speakers.

Parameter      | Type    | Default | Description
---------------|---------|---------|------------
speaker_labels | boolean | false   | Set to true to enable real-time speaker diarization.
max_speakers   | integer | (none)  | Optional. Hint the maximum number of speakers expected (1–10). Setting this accurately can improve assignment accuracy when you know the speaker count in advance.
{
  "speech_model": "u3-rt-pro",
  "speaker_labels": true,
  "max_speakers": 2
}

Diarization is supported on u3-rt-pro, universal-streaming-english, and universal-streaming-multilingual. You do not need to change your speech model to use it — just add speaker_labels: true.

Reading speaker labels

When diarization is enabled, every Turn event includes a speaker_label field with a label such as A, B, and so on.

{
  "type": "Turn",
  "transcript": "Good morning, thanks for joining the call.",
  "speaker_label": "A",
  "end_of_turn": true,
  "turn_is_formatted": true
}

If the model cannot confidently assign a speaker — typically for very short utterances at the start of a session — the speaker_label field may be null. Your application should handle this case gracefully.
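One way to handle this on the raw JSON message is a small guard that falls back to a placeholder name. The display_speaker helper below is a hypothetical sketch, not part of the SDK; it assumes the Turn payload shape shown above:

```python
def display_speaker(turn: dict) -> str:
    """Return a printable speaker name, tolerating a missing or null speaker_label."""
    label = turn.get("speaker_label")
    return f"Speaker {label}" if label else "Speaker UNKNOWN"

turn = {"type": "Turn", "transcript": "Yes.", "speaker_label": None, "end_of_turn": True}
print(f"[{display_speaker(turn)}] {turn['transcript']}")
```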

A typical multi-speaker exchange looks like this:

[A] Good morning, thanks for joining the call.
[B] Good morning. Happy to be here.
[A] So let's start with a quick overview of the project timeline.
[B] Sure. We're currently on track for the March deadline.
[A] Great. And how's the team handling the workload?
[C] It's been busy, but manageable. We brought on two new engineers last week.
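If you want a readable transcript rather than one line per turn, you can collapse consecutive final turns from the same speaker. The group_turns helper below is an illustrative sketch that assumes you collect (speaker_label, transcript) pairs from Turn events where end_of_turn is true:

```python
def group_turns(turns):
    """Merge consecutive (speaker_label, transcript) pairs from the same
    speaker into a single block per speaking segment."""
    blocks = []
    for speaker, text in turns:
        if blocks and blocks[-1][0] == speaker:
            # Same speaker as the previous block: append the text
            blocks[-1] = (speaker, blocks[-1][1] + " " + text)
        else:
            blocks.append((speaker, text))
    return blocks

turns = [
    ("A", "Good morning,"),
    ("A", "thanks for joining the call."),
    ("B", "Good morning. Happy to be here."),
]
for speaker, text in group_turns(turns):
    print(f"[{speaker}] {text}")
```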

How speaker accuracy improves over time

Streaming Diarization builds a speaker profile incrementally as audio flows in. In practice this means:

  • Early in a session, speaker assignments may be less stable, especially if the first few turns are short.
  • As the session progresses, the model accumulates richer speaker embeddings and assignments become more consistent.

For long-form use cases (call center, clinical scribe, meeting transcription), the model will settle into accurate, stable labels well before the end of the conversation.

Known limitations

Real-time diarization is an inherently harder problem than diarization for async transcription on pre-recorded audio. The following limitations apply to the current beta:

  • Short utterances — Turns shorter than ~3 words provide insufficient audio for a reliable speaker embedding. Single-word responses like “yes” or “no” may receive a low-confidence or incorrect label.
  • Overlapping speech — When two speakers talk simultaneously, the model cannot split the audio and will assign the turn to a single speaker. Performance degrades with frequent cross-talk.
  • Session start accuracy — The first 1–2 turns of a session may be misassigned because the model has not yet built up speaker profiles. This self-corrects quickly in practice.
  • Noisy environments — Background noise and microphone bleed between speakers can reduce embedding quality and lead to more frequent misassignments.

For the best results, use a microphone setup that minimizes cross-talk and background noise, and ensure each speaker produces at least a few complete sentences before you rely on per-turn labels for downstream processing.
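If downstream logic depends on per-turn labels, you can encode that warm-up advice directly: withhold trust in a speaker's label until that speaker has produced enough words. The LabelGate class and its 20-word threshold below are illustrative choices, not API parameters:

```python
class LabelGate:
    """Track words seen per speaker label and report a label as stable
    only after a warm-up threshold of observed words."""

    def __init__(self, min_words: int = 20):
        self.min_words = min_words
        self.word_counts = {}

    def observe(self, speaker_label, transcript) -> bool:
        """Record a final turn; return True once this speaker's label has
        accumulated enough context to be treated as stable."""
        if not speaker_label:
            return False  # null label: never stable
        count = self.word_counts.get(speaker_label, 0) + len(transcript.split())
        self.word_counts[speaker_label] = count
        return count >= self.min_words
```

A session loop would call gate.observe(event.speaker_label, event.transcript) on each end-of-turn event and only route the turn to label-sensitive processing once it returns True.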

Supported models

Model                              | speech_model value               | Diarization supported
-----------------------------------|----------------------------------|----------------------
Universal-3 Pro (Streaming)        | u3-rt-pro                        | Yes
Universal Streaming (English)      | universal-streaming-english      | Yes
Universal Streaming (Multilingual) | universal-streaming-multilingual | Yes

Multichannel streaming audio

To transcribe multichannel streaming audio, we recommend creating a separate session for each channel. This approach allows you to maintain clear speaker separation and get accurate diarized transcriptions for conversations, phone calls, or interviews where speakers are recorded on two different channels.

The following code example demonstrates how to transcribe a dual-channel audio file with diarized, speaker-separated transcripts. This same approach can be applied to any multi-channel audio stream, including those with more than two channels.
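The core of the per-channel approach is de-interleaving the stereo PCM: in a 16-bit stereo WAV, left and right samples alternate, two bytes each. As a minimal pure-Python sketch (the full script uses NumPy for this same step), the hypothetical split_stereo helper:

```python
def split_stereo(frames: bytes):
    """De-interleave 16-bit stereo PCM bytes into (left, right) channel bytes."""
    left, right = bytearray(), bytearray()
    for i in range(0, len(frames), 4):  # 4 bytes per stereo frame (2 x int16)
        left += frames[i:i + 2]
        right += frames[i + 2:i + 4]
    return bytes(left), bytes(right)

# Two stereo frames, little-endian: L=1, R=2, then L=3, R=4
frames = bytes([1, 0, 2, 0, 3, 0, 4, 0])
left, right = split_stereo(frames)
```

Each byte string can then be chunked and fed to its own streaming session.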

1. Install the required dependencies:

   $ pip install assemblyai numpy pyaudio
2. Use this complete script to transcribe dual-channel audio with speaker separation:

import logging
import threading
import time
import wave
from typing import Type

import numpy as np
import pyaudio

import assemblyai as aai
from assemblyai.streaming.v3 import (
    BeginEvent,
    StreamingClient,
    StreamingClientOptions,
    StreamingError,
    StreamingEvents,
    StreamingParameters,
    TerminationEvent,
    TurnEvent,
)

# Configuration
API_KEY = "<YOUR_API_KEY>"
AUDIO_FILE_PATH = "<DUAL_CHANNEL_AUDIO_FILE_PATH>"

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class ChannelTranscriber:
    def __init__(self, channel_id, channel_name, sample_rate):
        self.channel_id = channel_id
        self.channel_name = channel_name
        self.sample_rate = sample_rate
        self.client = None
        self.audio_data = []
        self.current_turn_line = None
        self.line_count = 0
        self.streaming_done = threading.Event()

    def load_audio_channel(self):
        """Extract a single channel from the dual-channel audio file."""
        with wave.open(AUDIO_FILE_PATH, 'rb') as wf:
            frames = wf.readframes(wf.getnframes())
            audio_array = np.frombuffer(frames, dtype=np.int16)

            if wf.getnchannels() == 2:
                audio_array = audio_array.reshape(-1, 2)
                channel_audio = audio_array[:, self.channel_id]
            else:
                channel_audio = audio_array  # mono file: use the audio as-is

            # Split into 50 ms chunks for streaming
            FRAMES_PER_BUFFER = int(self.sample_rate * 0.05)
            for i in range(0, len(channel_audio), FRAMES_PER_BUFFER):
                chunk = channel_audio[i:i + FRAMES_PER_BUFFER]
                if len(chunk) < FRAMES_PER_BUFFER:
                    chunk = np.pad(chunk, (0, FRAMES_PER_BUFFER - len(chunk)), 'constant')
                self.audio_data.append(chunk.astype(np.int16).tobytes())

    def clear_current_line(self):
        if self.current_turn_line is not None:
            print("\r" + " " * 100 + "\r", end="", flush=True)

    def print_partial_transcript(self, words):
        self.clear_current_line()
        # Build transcript from individual words
        word_texts = [word.text for word in words]
        transcript = ' '.join(word_texts)
        partial_text = f"{self.channel_name}: {transcript}"
        print(partial_text, end="", flush=True)
        self.current_turn_line = len(partial_text)

    def print_final_transcript(self, transcript):
        self.clear_current_line()
        final_text = f"{self.channel_name}: {transcript}"
        print(final_text, flush=True)
        self.current_turn_line = None
        self.line_count += 1

    def on_begin(self, client: Type[StreamingClient], event: BeginEvent):
        """Called when the streaming session begins."""
        pass  # Session started

    def on_turn(self, client: Type[StreamingClient], event: TurnEvent):
        """Called when a turn is received."""
        transcript = event.transcript.strip() if event.transcript else ''
        formatted = event.turn_is_formatted
        words = event.words if event.words else []

        if transcript or words:
            if formatted:
                self.print_final_transcript(transcript)
            else:
                self.print_partial_transcript(words)

    def on_terminated(self, client: Type[StreamingClient], event: TerminationEvent):
        """Called when the session is terminated."""
        self.clear_current_line()
        self.streaming_done.set()

    def on_error(self, client: Type[StreamingClient], error: StreamingError):
        """Called when an error occurs."""
        print(f"\n{self.channel_name}: Error: {error}")
        self.streaming_done.set()

    def start_transcription(self):
        """Start the transcription for this channel."""
        self.load_audio_channel()

        # Create streaming client
        self.client = StreamingClient(
            StreamingClientOptions(
                api_key=API_KEY,
                api_host="streaming.assemblyai.com",
            )
        )

        # Register event handlers
        self.client.on(StreamingEvents.Begin, self.on_begin)
        self.client.on(StreamingEvents.Turn, self.on_turn)
        self.client.on(StreamingEvents.Termination, self.on_terminated)
        self.client.on(StreamingEvents.Error, self.on_error)

        # Connect to the streaming service with turn detection configuration
        self.client.connect(
            StreamingParameters(
                sample_rate=self.sample_rate,
                format_turns=True,
                end_of_turn_confidence_threshold=0.4,
                min_turn_silence=160,
                max_turn_silence=400,
            )
        )

        # Yield audio chunks at real-time pace
        def audio_generator():
            for chunk in self.audio_data:
                yield chunk
                time.sleep(0.05)  # 50 ms intervals

        try:
            # Stream audio
            self.client.stream(audio_generator())
        finally:
            # Disconnect
            self.client.disconnect(terminate=True)
            self.streaming_done.set()

    def start_transcription_thread(self):
        """Start transcription in a separate thread."""
        thread = threading.Thread(target=self.start_transcription, daemon=True)
        thread.start()
        return thread


def play_audio_file():
    try:
        with wave.open(AUDIO_FILE_PATH, 'rb') as wf:
            p = pyaudio.PyAudio()

            stream = p.open(
                format=p.get_format_from_width(wf.getsampwidth()),
                channels=wf.getnchannels(),
                rate=wf.getframerate(),
                output=True
            )

            print(f"Playing audio: {AUDIO_FILE_PATH}")

            # Play audio in chunks
            chunk_size = 1024
            data = wf.readframes(chunk_size)

            while data:
                stream.write(data)
                data = wf.readframes(chunk_size)

            stream.stop_stream()
            stream.close()
            p.terminate()

            print("Audio playback finished")

    except Exception as e:
        print(f"Error playing audio: {e}")


def transcribe_multichannel():
    # Get the sample rate from the file
    with wave.open(AUDIO_FILE_PATH, 'rb') as wf:
        sample_rate = wf.getframerate()

    # Create a transcriber for each channel
    transcriber_1 = ChannelTranscriber(0, "Speaker 1", sample_rate)
    transcriber_2 = ChannelTranscriber(1, "Speaker 2", sample_rate)

    # Start audio playback
    audio_thread = threading.Thread(target=play_audio_file, daemon=True)
    audio_thread.start()

    # Start both transcriptions
    thread_1 = transcriber_1.start_transcription_thread()
    thread_2 = transcriber_2.start_transcription_thread()

    # Wait for completion
    thread_1.join()
    thread_2.join()
    audio_thread.join()


if __name__ == "__main__":
    transcribe_multichannel()

Configure turn detection for your use case

The examples above use turn detection settings optimized for short responses and rapid back-and-forth conversations. To optimize for your specific audio scenario, you can adjust the turn detection parameters.

For configuration examples tailored to different use cases, refer to our Configuration examples.

Modify the StreamingParameters in the start_transcription method:

# Connect to the streaming service with turn detection configuration
self.client.connect(
    StreamingParameters(
        sample_rate=self.sample_rate,
        format_turns=True,
        end_of_turn_confidence_threshold=0.4,
        min_turn_silence=160,
        max_turn_silence=400,
    )
)