Multichannel streams

Multichannel streaming audio

To transcribe multichannel streaming audio, we recommend creating a separate session for each channel. This approach maintains clear speaker separation and produces accurate, diarized transcriptions for conversations, phone calls, or interviews where each speaker is recorded on a separate channel.

The following code example demonstrates how to transcribe a dual-channel audio file into diarized, speaker-separated transcripts. The same approach applies to any multichannel audio stream, including streams with more than two channels.
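
Before diving into the full script, here is the core idea in isolation: deinterleave the stereo samples with numpy and keep one mono array per channel. This is a minimal sketch, assuming 16-bit PCM stereo audio; the file name stereo_call.wav is a placeholder:

import wave

import numpy as np

# Read all frames from a stereo WAV file (placeholder name).
with wave.open("stereo_call.wav", "rb") as wf:
    frames = wf.readframes(wf.getnframes())

# 16-bit stereo PCM interleaves samples as L, R, L, R, ...
samples = np.frombuffer(frames, dtype=np.int16)
stereo = samples.reshape(-1, 2)

speaker_1_audio = stereo[:, 0]  # channel 0 -> its own streaming session
speaker_2_audio = stereo[:, 1]  # channel 1 -> its own streaming session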

1. Install the required dependencies.

pip install assemblyai numpy pyaudio
2. Use this complete script to transcribe dual-channel audio with speaker separation:

import logging
import threading
import time
import wave
from typing import Type

import numpy as np
import pyaudio

import assemblyai as aai
from assemblyai.streaming.v3 import (
    BeginEvent,
    StreamingClient,
    StreamingClientOptions,
    StreamingError,
    StreamingEvents,
    StreamingParameters,
    TerminationEvent,
    TurnEvent,
)

# Configuration
API_KEY = "<YOUR_API_KEY>"
AUDIO_FILE_PATH = "<DUAL_CHANNEL_AUDIO_FILE_PATH>"

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class ChannelTranscriber:
    def __init__(self, channel_id, channel_name, sample_rate):
        self.channel_id = channel_id
        self.channel_name = channel_name
        self.sample_rate = sample_rate
        self.client = None
        self.audio_data = []
        self.current_turn_line = None
        self.line_count = 0
        self.streaming_done = threading.Event()

    def load_audio_channel(self):
        """Extract a single channel from the dual-channel audio file."""
        with wave.open(AUDIO_FILE_PATH, "rb") as wf:
            frames = wf.readframes(wf.getnframes())
            audio_array = np.frombuffer(frames, dtype=np.int16)

            if wf.getnchannels() == 2:
                # Stereo PCM is interleaved; reshape to (n_frames, 2)
                # and keep only this transcriber's channel.
                audio_array = audio_array.reshape(-1, 2)
                channel_audio = audio_array[:, self.channel_id]
            else:
                channel_audio = audio_array

        # Split into 50 ms chunks for streaming (chunk size derives
        # from the sample rate so the timing stays correct).
        FRAMES_PER_BUFFER = int(self.sample_rate * 0.05)
        for i in range(0, len(channel_audio), FRAMES_PER_BUFFER):
            chunk = channel_audio[i:i + FRAMES_PER_BUFFER]
            if len(chunk) < FRAMES_PER_BUFFER:
                chunk = np.pad(chunk, (0, FRAMES_PER_BUFFER - len(chunk)), "constant")
            self.audio_data.append(chunk.astype(np.int16).tobytes())

    def clear_current_line(self):
        if self.current_turn_line is not None:
            print("\r" + " " * 100 + "\r", end="", flush=True)

    def print_partial_transcript(self, words):
        self.clear_current_line()
        # Build the transcript from the individual words.
        word_texts = [word.text for word in words]
        transcript = " ".join(word_texts)
        partial_text = f"{self.channel_name}: {transcript}"
        print(partial_text, end="", flush=True)
        self.current_turn_line = len(partial_text)

    def print_final_transcript(self, transcript):
        self.clear_current_line()
        final_text = f"{self.channel_name}: {transcript}"
        print(final_text, flush=True)
        self.current_turn_line = None
        self.line_count += 1

    def on_begin(self, client: Type[StreamingClient], event: BeginEvent):
        """Called when the streaming session begins."""
        pass  # Session started

    def on_turn(self, client: Type[StreamingClient], event: TurnEvent):
        """Called when a turn is received."""
        transcript = event.transcript.strip() if event.transcript else ""
        formatted = event.turn_is_formatted
        words = event.words if event.words else []

        if transcript or words:
            if formatted:
                self.print_final_transcript(transcript)
            else:
                self.print_partial_transcript(words)

    def on_terminated(self, client: Type[StreamingClient], event: TerminationEvent):
        """Called when the session is terminated."""
        self.clear_current_line()
        self.streaming_done.set()

    def on_error(self, client: Type[StreamingClient], error: StreamingError):
        """Called when an error occurs."""
        print(f"\n{self.channel_name}: Error: {error}")
        self.streaming_done.set()

    def start_transcription(self):
        """Start the transcription for this channel."""
        self.load_audio_channel()

        # Create the streaming client.
        self.client = StreamingClient(
            StreamingClientOptions(
                api_key=API_KEY,
                api_host="streaming.assemblyai.com",
            )
        )

        # Register event handlers.
        self.client.on(StreamingEvents.Begin, self.on_begin)
        self.client.on(StreamingEvents.Turn, self.on_turn)
        self.client.on(StreamingEvents.Termination, self.on_terminated)
        self.client.on(StreamingEvents.Error, self.on_error)

        # Connect to the streaming service with turn detection configuration.
        self.client.connect(
            StreamingParameters(
                sample_rate=self.sample_rate,
                format_turns=True,
                end_of_turn_confidence_threshold=0.4,
                min_end_of_turn_silence_when_confident=160,
                max_turn_silence=400,
            )
        )

        # Yield audio chunks at real-time pace.
        def audio_generator():
            for chunk in self.audio_data:
                yield chunk
                time.sleep(0.05)  # 50 ms intervals

        try:
            # Stream the audio.
            self.client.stream(audio_generator())
        finally:
            # Disconnect and end the session.
            self.client.disconnect(terminate=True)
            self.streaming_done.set()

    def start_transcription_thread(self):
        """Start transcription in a separate thread."""
        thread = threading.Thread(target=self.start_transcription, daemon=True)
        thread.start()
        return thread


def play_audio_file():
    try:
        with wave.open(AUDIO_FILE_PATH, "rb") as wf:
            p = pyaudio.PyAudio()

            stream = p.open(
                format=p.get_format_from_width(wf.getsampwidth()),
                channels=wf.getnchannels(),
                rate=wf.getframerate(),
                output=True,
            )

            print(f"Playing audio: {AUDIO_FILE_PATH}")

            # Play the audio in chunks.
            chunk_size = 1024
            data = wf.readframes(chunk_size)

            while data:
                stream.write(data)
                data = wf.readframes(chunk_size)

            stream.stop_stream()
            stream.close()
            p.terminate()

            print("Audio playback finished")

    except Exception as e:
        print(f"Error playing audio: {e}")


def transcribe_multichannel():
    # Get the sample rate from the file.
    with wave.open(AUDIO_FILE_PATH, "rb") as wf:
        sample_rate = wf.getframerate()

    # Create a transcriber for each channel.
    transcriber_1 = ChannelTranscriber(0, "Speaker 1", sample_rate)
    transcriber_2 = ChannelTranscriber(1, "Speaker 2", sample_rate)

    # Start audio playback.
    audio_thread = threading.Thread(target=play_audio_file, daemon=True)
    audio_thread.start()

    # Start both transcriptions.
    thread_1 = transcriber_1.start_transcription_thread()
    thread_2 = transcriber_2.start_transcription_thread()

    # Wait for completion.
    thread_1.join()
    thread_2.join()
    audio_thread.join()


if __name__ == "__main__":
    transcribe_multichannel()
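
The script above hardcodes two channels, but the same pattern scales to any channel count: create one ChannelTranscriber, and therefore one streaming session, per channel. A minimal sketch, assuming load_audio_channel is generalized to reshape by wf.getnchannels() instead of assuming stereo:

def transcribe_n_channels():
    # Read the channel count and sample rate from the file.
    with wave.open(AUDIO_FILE_PATH, "rb") as wf:
        sample_rate = wf.getframerate()
        num_channels = wf.getnchannels()

    # One transcriber (one streaming session) per channel.
    transcribers = [
        ChannelTranscriber(i, f"Speaker {i + 1}", sample_rate)
        for i in range(num_channels)
    ]
    threads = [t.start_transcription_thread() for t in transcribers]

    # Wait for all sessions to finish.
    for thread in threads:
        thread.join()
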
Configure turn detection for your use case

The examples above use turn detection settings optimized for short responses and rapid back-and-forth conversations. To optimize for your specific audio scenario, you can adjust the turn detection parameters: end_of_turn_confidence_threshold controls how confident the model must be before ending a turn, min_end_of_turn_silence_when_confident is the silence (in milliseconds) required to end a turn once that confidence is reached, and max_turn_silence is the silence after which a turn ends regardless of confidence.

For configuration examples tailored to different use cases, refer to our Configuration examples.

Modify the StreamingParameters in the start_transcription method:

# Connect to the streaming service with turn detection configuration.
self.client.connect(
    StreamingParameters(
        sample_rate=self.sample_rate,
        format_turns=True,
        end_of_turn_confidence_threshold=0.4,
        min_end_of_turn_silence_when_confident=160,
        max_turn_silence=400,
    )
)
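
For example, a configuration tuned for longer, uninterrupted speech (dictation, presentations) might raise the confidence threshold and tolerate longer pauses. The values below are illustrative assumptions to tune against your own audio, not recommended settings:

# Illustrative sketch only: example values leaning toward longer monologues.
self.client.connect(
    StreamingParameters(
        sample_rate=self.sample_rate,
        format_turns=True,
        end_of_turn_confidence_threshold=0.7,  # assumed: require higher confidence
        min_end_of_turn_silence_when_confident=560,  # assumed: longer silence (ms)
        max_turn_silence=1280,  # assumed: tolerate longer pauses (ms)
    )
)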