Use this file to discover all available pages before exploring further.
In this guide, we’ll show you how to generate Speaker Labels using Pyannote with an AssemblyAI transcript. This can be used to generate Speaker Labels for languages we currently do not support for speaker labelling.
import osimport assemblyai as aaifrom pyannote.audio import Pipelineimport torchimport pandas as pdimport numpy as np# Assign your API keysHUGGING_FACE_TOKEN = os.getenv("HF_TOKEN")ASSEMBLYAI_API_KEY = os.getenv("ASSEMBLYAI_API_KEY")# Authenticate with AssemblyAIaai.settings.api_key = ASSEMBLYAI_API_KEYdef transcribe_audio(audio_file, language="en"): """ Transcribe an audio file using AssemblyAI. Args: audio_file (str): Path to the audio file. language (str, optional): Language code for transcription. Defaults to "en". Returns: aai.Transcript: The transcription result. """ transcriber = aai.Transcriber(config=aai.TranscriptionConfig(speech_models=["universal-3-pro", "universal-2"], language_code=language)) transcript = transcriber.transcribe(audio_file) print(f"Transcript ID: {transcript.id}") return transcriptdef get_speaker_labels(audio_file, transcript: aai.Transcript): """ Perform speaker diarization on an audio file and combine results with the transcript. Args: audio_file (str): Path to the audio file. transcript (aai.Transcript): The transcription result from AssemblyAI. Returns: str: A formatted string containing the transcript with speaker labels and timestamps. """ # Initialize the speaker diarization pipeline with GPU support device = torch.device("cuda" if torch.cuda.is_available() else "cpu") pipeline = Pipeline.from_pretrained( "pyannote/speaker-diarization", use_auth_token=HUGGING_FACE_TOKEN, ) if pipeline is None: raise ValueError("Failed to initialize the pipeline. Please check your authentication token and internet connection.") else: pipeline = pipeline.to(device) # Apply the pipeline to the audio file diarization = pipeline(audio_file) # Create a dictionary to store speaker segments speaker_segments = {} # Process diarization results for turn, _, speaker in diarization.itertracks(yield_label=True): start, end = turn.start, turn.end if speaker not in speaker_segments: speaker_segments[speaker] = [] speaker_segments[speaker].append((start, end)) # Convert speaker_segments to a DataFrame diarize_df = pd.DataFrame([(speaker, start, end) for speaker, segments in speaker_segments.items() for start, end in segments], columns=['speaker', 'start', 'end']) # Assign speakers to transcript words for word in transcript.words: word_start = float(word.start) / 1000 word_end = float(word.end) / 1000 overlaps = diarize_df[ (diarize_df['start'] <= word_end) & (diarize_df['end'] >= word_start) ].copy() if not overlaps.empty: overlaps['overlap'] = np.minimum(overlaps['end'], word_end) - np.maximum(overlaps['start'], word_start) word.speaker = overlaps.loc[overlaps['overlap'].idxmax(), 'speaker'] else: word.speaker = "Unknown" full_transcript = '' # Update segment speakers based on the majority speaker of its words for segment in transcript.get_sentences(): segment_start = float(segment.start) / 1000 segment_end = float(segment.end) / 1000 overlaps = diarize_df[ (diarize_df['start'] <= segment_end) & (diarize_df['end'] >= segment_start) ].copy() if not overlaps.empty: overlaps['overlap'] = np.minimum(overlaps['end'], segment_end) - np.maximum(overlaps['start'], segment_start) segment.speaker = overlaps.loc[overlaps['overlap'].idxmax(), 'speaker'] speaker_label = segment.speaker.replace('SPEAKER_', 'SPEAKER ') full_transcript += f'[{format_timestamp(segment_start)}] {speaker_label}: {segment.text}\n' else: segment.speaker = "Unknown" full_transcript += f'[{format_timestamp(segment_start)}] Unknown: {segment.text}\n' return full_transcriptdef format_timestamp(seconds): """ Convert seconds to a formatted timestamp string (HH:MM:SS). Args: seconds (float): Time in seconds. Returns: str: Formatted timestamp string. """ hours, remainder = divmod(int(seconds), 3600) minutes, seconds = divmod(remainder, 60) return f"{hours:02d}:{minutes:02d}:{seconds:02d}"audio_file = "audio.wav" # your local file pathtranscript: aai.Transcript = transcribe_audio(audio_file, language="hr") # select a language codetranscript_with_speakers = get_speaker_labels(audio_file, transcript)print(transcript_with_speakers)
Before we begin, make sure you have an AssemblyAI account and an API key. You can sign up for a free account and get your API key from your dashboard.You’ll also need a HuggingFace account and API key. You can sign up for a free account and get your API key here. Create a Read type API token to ensure the necessary permissions are enabled.
Browse to the
speaker-diarization and
segmentation model pages and
accept the Gated Model Terms & Conditions by entering your
Company/University, Website and Use Case details in order to gain
access to the use of these models.
Import the necessary dependencies, assign your API keys and authenticate with AssemblyAI.
import osimport assemblyai as aaifrom pyannote.audio import Pipelineimport torchimport pandas as pdimport numpy as np# Assign your API keysHUGGING_FACE_TOKEN = os.getenv("HF_TOKEN")ASSEMBLYAI_API_KEY = os.getenv("ASSEMBLYAI_API_KEY")# Authenticate with AssemblyAIaai.settings.api_key = ASSEMBLYAI_API_KEY
Create the transcribe_audio function, this will handle the transcription process with AssemblyAI.
def transcribe_audio(audio_file, language="en"): """ Transcribe an audio file using AssemblyAI. Args: audio_file (str): Path to the audio file. language (str, optional): Language code for transcription. Defaults to "en". Returns: aai.Transcript: The transcription result. """ transcriber = aai.Transcriber(config=aai.TranscriptionConfig(speech_models=["universal-3-pro", "universal-2"], language_code=language)) transcript = transcriber.transcribe(audio_file) print(f"Transcript ID: {transcript.id}") return transcript
Create the get_speaker_labelsfunction, this will handle the speaker diarization model processing to generate the custom speaker labels for the transcript.Firstly, it initializes and applies the pipeline to the audio file.Secondly, it processes the diarization results and converts the speaker segments into a DataFrame so we can compare the results with the transcript.Lastly, the speaker segments are compared and assigned to the words and sentences of the transcript to create the speaker labelled transcript.
def get_speaker_labels(audio_file, transcript: aai.Transcript): """ Perform speaker diarization on an audio file and combine results with the transcript. Args: audio_file (str): Path to the audio file. transcript (aai.Transcript): The transcription result from AssemblyAI. Returns: str: A formatted string containing the transcript with speaker labels and timestamps. """ # Initialize the speaker diarization pipeline with GPU support device = torch.device("cuda" if torch.cuda.is_available() else "cpu") pipeline = Pipeline.from_pretrained( "pyannote/speaker-diarization", use_auth_token=HUGGING_FACE_TOKEN, ) if pipeline is None: raise ValueError("Failed to initialize the pipeline. Please check your authentication token and internet connection.") else: pipeline = pipeline.to(device) # Apply the pipeline to the audio file diarization = pipeline(audio_file) # Create a dictionary to store speaker segments speaker_segments = {} # Process diarization results for turn, _, speaker in diarization.itertracks(yield_label=True): start, end = turn.start, turn.end if speaker not in speaker_segments: speaker_segments[speaker] = [] speaker_segments[speaker].append((start, end)) # Convert speaker_segments to a DataFrame diarize_df = pd.DataFrame([(speaker, start, end) for speaker, segments in speaker_segments.items() for start, end in segments], columns=['speaker', 'start', 'end']) # Assign speakers to transcript words for word in transcript.words: word_start = float(word.start) / 1000 word_end = float(word.end) / 1000 overlaps = diarize_df[ (diarize_df['start'] <= word_end) & (diarize_df['end'] >= word_start) ].copy() if not overlaps.empty: overlaps['overlap'] = np.minimum(overlaps['end'], word_end) - np.maximum(overlaps['start'], word_start) word.speaker = overlaps.loc[overlaps['overlap'].idxmax(), 'speaker'] else: word.speaker = "Unknown" full_transcript = '' # Update segment speakers based on the majority speaker of its words for segment in transcript.get_sentences(): segment_start = float(segment.start) / 1000 segment_end = float(segment.end) / 1000 overlaps = diarize_df[ (diarize_df['start'] <= segment_end) & (diarize_df['end'] >= segment_start) ].copy() if not overlaps.empty: overlaps['overlap'] = np.minimum(overlaps['end'], segment_end) - np.maximum(overlaps['start'], segment_start) segment.speaker = overlaps.loc[overlaps['overlap'].idxmax(), 'speaker'] speaker_label = segment.speaker.replace('SPEAKER_', 'SPEAKER ') full_transcript += f'[{format_timestamp(segment_start)}] {speaker_label}: {segment.text}\n' else: segment.speaker = "Unknown" full_transcript += f'[{format_timestamp(segment_start)}] Unknown: {segment.text}\n' return full_transcript
How can I set the number of speakers?
If you know the number of speakers in advance, you can use the num_speakers parameter to set the number of speakers:
# Apply the pipeline to the audio filediarization = pipeline(audio_file, num_speakers=4)
You can also provide upper/lower bands on the number of speakers using the min_speakers and max_speakers parameters:
# Apply the pipeline to the audio filediarization = pipeline(audio_file, min_speakers=2, max_speakers=5)
Create the format_timestamp, this will handle the timestamps conversion to improve the readability of the final speaker labelled transcript.
def format_timestamp(seconds): """ Convert seconds to a formatted timestamp string (HH:MM:SS). Args: seconds (float): Time in seconds. Returns: str: Formatted timestamp string. """ hours, remainder = divmod(int(seconds), 3600) minutes, seconds = divmod(remainder, 60) return f"{hours:02d}:{minutes:02d}:{seconds:02d}"
Finally, select a local file and call the functions to generate and print your custom Speaker Labelled transcript.
audio_file = "audio.wav" # your local file pathtranscript: aai.Transcript = transcribe_audio(audio_file, language="hr") # select a language codetranscript_with_speakers = get_speaker_labels(audio_file, transcript)print(transcript_with_speakers)
Here’s an example speaker labelled output from a Croatian file:
[00:00:05] SPEAKER 04: Nalazimo se u Centro Zagreba, u parku Zrinjevac, gdje je kao što vidite jako ljepo, vreme je prekrasno, a danas ćemo ljude pitati što im se sviđa u Zagrebu ili što im se možda ne sviđa u Zagrebu.[00:00:42] SPEAKER 04: Dobar dan, može jednokratko pitanje samo.[00:00:46] SPEAKER 04: Može?[00:00:48] SPEAKER 04: Evo lako, što vam se najviše sviđa u Zagrebu?[00:00:50] SPEAKER 07: Što mi se najviše sviđa u Zagrebu?[00:00:53] SPEAKER 07: E sad, teško pitanje, ali trenutno mi se najviše sviđa što nije klasična jesen, nego više prođeče u zraku.[00:01:06] SPEAKER 07: Dobre.[00:01:09] SPEAKER 07: Može sigurnost još uvijek s osišam sigurno u Zagrebu.[00:01:13] SPEAKER 04: I po noći?[00:01:15] SPEAKER 07: Pa po noći ne šetam baš toliko po noći, ali centar grada mi je dosta siguran, osvijetljen i to mi je okej.