> ## Documentation Index
> Fetch the complete documentation index at: https://assemblyai.com/docs/llms.txt
> Use this file to discover all available pages before exploring further.

# Transcribe audio files with Streaming

This guide shows you how to transcribe **WAV audio files** with varying sample rates using our Streaming API.

<Note>
  If you're streaming a pre-recorded file for benchmarking or testing, see [Stream a pre-recorded file in real time](/streaming/guides/stream_prerecorded_file_realtime) for wall-clock pacing that more closely simulates live microphone input.
</Note>

## Quickstart

Here is the complete Python script to transcribe a WAV audio file using the Streaming API.

```python expandable theme={null}
import websocket
import json
import threading
import time
import wave
import sys
import os
from urllib.parse import urlencode
from pathlib import Path

# --- Configuration ---
ASSEMBLYAI_API_KEY = os.environ["ASSEMBLYAI_API_KEY"]  # Read from the environment
AUDIO_FILE = "audio.wav"  # Relative or absolute path to the WAV file to stream
SAMPLE_RATE = 48000  # Must equal the sample rate of AUDIO_FILE
SAVE_TRANSCRIPT_TO_FILE = True  # False skips writing the transcript file
PLAY_AUDIO = True  # False streams the file without playing it locally

# The connection parameters are sent as the query string of the endpoint URL
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
CONNECTION_PARAMS = {
    "speech_model": "u3-rt-pro",
    "sample_rate": SAMPLE_RATE,
}
API_ENDPOINT = API_ENDPOINT_BASE_URL + "?" + urlencode(CONNECTION_PARAMS)

# State shared between the WebSocket callbacks and the streaming thread
ws_app = None
audio_thread = None
stop_event = threading.Event()  # Set when streaming should stop

# Details collected during the session; written out by save_transcript()
session_data = dict(
    session_id=None,
    audio_file=AUDIO_FILE,
    audio_duration_seconds=None,
    turns=[],
)

# --- Helper Functions ---

def validate_audio_file(filepath, sample_rate):
    """Validate audio file before streaming.

    Checks that ``filepath`` is a readable mono, 16-bit PCM WAV file whose
    sample rate equals ``sample_rate``. On any failure an error message (and,
    where it helps, an ffmpeg conversion hint) is printed to stderr and the
    process exits with status 1. Returns ``None`` when the file is usable.
    """
    convert_cmd = f"ffmpeg -i {filepath} -ar {sample_rate} -ac 1 output.wav"

    file_ext = Path(filepath).suffix.lower()
    if file_ext != ".wav":
        print(f"Error: Only WAV files are supported. Got: {file_ext}", file=sys.stderr)
        print(f"Convert your file to WAV using: {convert_cmd}", file=sys.stderr)
        sys.exit(1)

    # Report a missing or unreadable file as a clean error, not a traceback.
    try:
        wav_file = wave.open(filepath, 'rb')
    except OSError as e:
        print(f"Error: Could not open audio file {filepath}: {e}", file=sys.stderr)
        sys.exit(1)
    except (wave.Error, EOFError) as e:
        # wave.Error: not a RIFF/WAVE file or a non-PCM format; EOFError: truncated file
        print(f"Error: {filepath} is not a valid PCM WAV file: {e}", file=sys.stderr)
        print(f"Convert your file to WAV using: {convert_cmd}", file=sys.stderr)
        sys.exit(1)

    with wav_file:
        if wav_file.getnchannels() != 1:
            print("Error: Only mono audio is supported", file=sys.stderr)
            print(f"Convert your file to mono using: {convert_cmd}", file=sys.stderr)
            sys.exit(1)

        # The connection does not set an `encoding` parameter, so the API's
        # default applies (16-bit PCM per the API reference). Any other sample
        # width would be streamed as garbage.
        sample_width = wav_file.getsampwidth()
        if sample_width != 2:
            print(f"Error: Only 16-bit PCM audio is supported. Got: {sample_width * 8}-bit", file=sys.stderr)
            print(f"Convert your file using: ffmpeg -i {filepath} -ar {sample_rate} -ac 1 -c:a pcm_s16le output.wav", file=sys.stderr)
            sys.exit(1)

        file_sample_rate = wav_file.getframerate()
        if file_sample_rate != sample_rate:
            print(f"Error: File sample rate ({file_sample_rate}) doesn't match expected rate ({sample_rate})", file=sys.stderr)
            print(f"Either update SAMPLE_RATE to {file_sample_rate}, or convert your file using: {convert_cmd}", file=sys.stderr)
            sys.exit(1)


def save_transcript():
    """Save the transcript to a text file in the current working directory.

    The file is named ``{audio_file_stem}_{session_id}.txt``; ``unknown`` is
    used in the name when no session ID was recorded. It contains the session
    ID, the audio file path, the recorded audio duration, and one numbered
    line per completed turn from ``session_data["turns"]``.
    """
    audio_name = Path(session_data["audio_file"]).stem
    session_id = session_data["session_id"] or "unknown"
    output_file = f"{audio_name}_{session_id}.txt"

    # Write UTF-8 explicitly: transcripts can contain non-ASCII characters that
    # a platform's default encoding may be unable to represent.
    with open(output_file, "w", encoding="utf-8") as f:
        f.write(f"AssemblyAI Session ID: {session_data['session_id']}\n")
        f.write(f"Audio file: {session_data['audio_file']}\n")
        f.write(f"Audio duration: {session_data['audio_duration_seconds']} seconds\n")
        f.write("See all available parameters and defaults at https://www.assemblyai.com/docs/api-reference/streaming-api/universal-3-pro-streaming#request.query\n\n")
        f.write("\nTranscription Output\n")
        for i, turn in enumerate(session_data["turns"], 1):
            f.write(f"[Turn #{i}]: {turn}\n")

    print(f"Transcript saved to {output_file}")


# --- WebSocket Event Handlers ---

def on_open(ws):
    """Called when the WebSocket connection is established.

    Starts a daemon thread that sends AUDIO_FILE over ``ws`` in 50ms binary
    chunks (optionally playing it locally when PLAY_AUDIO is true) and then
    sends a Terminate message so the session ends.
    """
    global audio_thread

    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    def open_audio_player():
        """Return ``(pyaudio_instance, output_stream)``, or ``(None, None)`` if playback is unavailable."""
        try:
            import pyaudio
        except ImportError:
            print("Warning: pyaudio not installed. Audio playback disabled.", file=sys.stderr)
            print("Install with: pip install pyaudio", file=sys.stderr)
            return None, None

        pa = None
        try:
            pa = pyaudio.PyAudio()
            with wave.open(AUDIO_FILE, 'rb') as wav_file:
                player = pa.open(
                    format=pa.get_format_from_width(wav_file.getsampwidth()),
                    channels=wav_file.getnchannels(),
                    rate=wav_file.getframerate(),
                    output=True,
                )
        except Exception as e:
            # Playback is optional: a failure here (for example, no usable
            # output device) must not stop the file from being transcribed.
            if pa is not None:
                pa.terminate()
            print(f"Warning: could not open audio output ({e}). Audio playback disabled.", file=sys.stderr)
            return None, None
        return pa, player

    def stream_file():
        """Send the file chunk by chunk, then ask the server to end the session."""
        chunk_duration = 0.05  # 50ms chunks
        pa, audio_player = open_audio_player() if PLAY_AUDIO else (None, None)
        finished = False  # True only when the whole file has been sent

        try:
            with wave.open(AUDIO_FILE, 'rb') as wav_file:
                frames_per_chunk = int(SAMPLE_RATE * chunk_duration)

                while not stop_event.is_set():
                    frames = wav_file.readframes(frames_per_chunk)
                    if not frames:
                        finished = True
                        break

                    # Pace the stream: playing the chunk is expected to take
                    # about as long as the chunk itself; otherwise sleep.
                    if audio_player:
                        audio_player.write(frames)
                    else:
                        time.sleep(chunk_duration)

                    ws.send(frames, websocket.ABNF.OPCODE_BINARY)
        except Exception as e:
            # Report the failure instead of letting the thread die with a traceback.
            print(f"Error while streaming audio: {e}", file=sys.stderr)
        finally:
            if audio_player:
                audio_player.stop_stream()
                audio_player.close()
                pa.terminate()

        # stop_event is set by the error/close handlers and on Ctrl+C; in those
        # cases shutdown is already being handled elsewhere.
        if stop_event.is_set():
            return

        if finished:
            print("File streaming complete. Waiting for final transcripts...")
        try:
            ws.send(json.dumps({"type": "Terminate"}))
        except Exception as e:
            print(f"Error sending termination message: {e}", file=sys.stderr)

    audio_thread = threading.Thread(target=stream_file, daemon=True)
    audio_thread.start()


def on_message(ws, message):
    """Handle one JSON message received from the streaming API.

    ``Begin`` records the session ID, ``Turn`` prints the transcript (and
    stores it when the turn has ended), and ``Termination`` records the
    processed audio duration. Errors are printed rather than raised.
    """
    try:
        payload = json.loads(message)
        kind = payload.get('type')

        if kind == "Termination":
            seconds = payload.get('audio_duration_seconds', 0)
            session_data["audio_duration_seconds"] = seconds
            print(f"Session terminated: {seconds} seconds of audio processed")
        elif kind == "Begin":
            new_session_id = payload.get('id')
            session_data["session_id"] = new_session_id
            print(f"Session ID: {new_session_id}\n")
        elif kind == "Turn":
            text = payload.get('transcript', '')
            # Turns without any text are ignored entirely
            if text:
                if payload.get('end_of_turn'):
                    print(f"[Final]: {text}\n")
                    session_data["turns"].append(text)
                else:
                    print(f"[Partial]: {text}")
    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")


def on_error(ws, error):
    """Report a WebSocket error and set ``stop_event`` so streaming stops."""
    error_report = f"\nWebSocket Error: {error}"
    print(error_report)
    stop_event.set()


def on_close(ws, close_status_code, close_msg):
    """Handle the WebSocket connection closing.

    Prints the close status, sets ``stop_event`` so the streaming loop exits,
    saves the transcript when SAVE_TRANSCRIPT_TO_FILE is true, and waits up to
    one second for the audio thread if it is still running.
    """
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")
    stop_event.set()

    if SAVE_TRANSCRIPT_TO_FILE:
        save_transcript()

    streamer = audio_thread
    if not streamer or not streamer.is_alive():
        return
    streamer.join(timeout=1.0)


# --- Main Execution ---
def run():
    """Validate the audio file, open the streaming connection, and wait for it to end.

    The WebSocket runs on a daemon thread while the main thread polls it. On
    Ctrl+C a Terminate message is sent (if the socket is still connected)
    before the connection is closed.
    """
    global ws_app

    # Check the file first so an unusable file never opens a connection
    validate_audio_file(AUDIO_FILE, SAMPLE_RATE)

    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": ASSEMBLYAI_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # The socket's event loop gets its own thread; this one just watches it
    socket_thread = threading.Thread(target=ws_app.run_forever, daemon=True)
    socket_thread.start()

    def close_and_wait():
        """Close the socket (if any) and give its thread up to two seconds to exit."""
        if ws_app:
            ws_app.close()
        socket_thread.join(timeout=2.0)

    try:
        while socket_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()

        sock = ws_app.sock if ws_app else None
        if sock and sock.connected:
            try:
                ws_app.send(json.dumps({"type": "Terminate"}))
                # Presumably leaves time for the server's final messages to arrive
                time.sleep(2)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        close_and_wait()

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        close_and_wait()

    finally:
        print("Cleanup complete. Exiting.")


if __name__ == "__main__":
    run()
```

## Step-by-step guide

Before we begin, make sure you have an AssemblyAI account and an API key. You can [sign up](https://assemblyai.com/dashboard/signup) and get your API key from your [dashboard](https://www.assemblyai.com/dashboard/api-keys).

### Install and import packages

Install the required packages. PyAudio is optional: it is only needed for audio playback during streaming, and the script falls back to streaming without playback if it isn't installed.

```bash theme={null}
pip install websocket-client
pip install pyaudio
```

Import packages.

```python theme={null}
import websocket
import json
import os
import threading
import time
import wave
import sys
from urllib.parse import urlencode
from pathlib import Path
```

### Configure settings

Set your `ASSEMBLYAI_API_KEY` environment variable.

Set `AUDIO_FILE` to the relative or absolute path of your audio file, and set `SAMPLE_RATE` to match your file's sample rate.

```python expandable theme={null}
ASSEMBLYAI_API_KEY = os.environ["ASSEMBLYAI_API_KEY"]  # Read from the environment
AUDIO_FILE = "audio.wav"  # Relative or absolute path to the WAV file to stream
SAMPLE_RATE = 48000  # Must equal the sample rate of AUDIO_FILE
SAVE_TRANSCRIPT_TO_FILE = True  # False skips writing the transcript file
PLAY_AUDIO = True  # False streams the file without playing it locally

# The connection parameters are sent as the query string of the endpoint URL
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
CONNECTION_PARAMS = {
    "speech_model": "u3-rt-pro",
    "sample_rate": SAMPLE_RATE,
}
API_ENDPOINT = API_ENDPOINT_BASE_URL + "?" + urlencode(CONNECTION_PARAMS)

# State shared between the WebSocket callbacks and the streaming thread
ws_app = None
audio_thread = None
stop_event = threading.Event()  # Set when streaming should stop

# Details collected during the session; written out by save_transcript()
session_data = dict(
    session_id=None,
    audio_file=AUDIO_FILE,
    audio_duration_seconds=None,
    turns=[],
)
```

### Helper functions

The following helper functions are used to validate audio files and save the transcript output:

* `validate_audio_file()` - Validates that the audio file is a mono WAV file with the expected sample rate.
* `save_transcript()` - Saves the transcript to a text file after the session ends.

```python expandable theme={null}
def validate_audio_file(filepath, sample_rate):
    """Validate audio file before streaming.

    Checks that ``filepath`` is a readable mono, 16-bit PCM WAV file whose
    sample rate equals ``sample_rate``. On any failure an error message (and,
    where it helps, an ffmpeg conversion hint) is printed to stderr and the
    process exits with status 1. Returns ``None`` when the file is usable.
    """
    convert_cmd = f"ffmpeg -i {filepath} -ar {sample_rate} -ac 1 output.wav"

    file_ext = Path(filepath).suffix.lower()
    if file_ext != ".wav":
        print(f"Error: Only WAV files are supported. Got: {file_ext}", file=sys.stderr)
        print(f"Convert your file to WAV using: {convert_cmd}", file=sys.stderr)
        sys.exit(1)

    # Report a missing or unreadable file as a clean error, not a traceback.
    try:
        wav_file = wave.open(filepath, 'rb')
    except OSError as e:
        print(f"Error: Could not open audio file {filepath}: {e}", file=sys.stderr)
        sys.exit(1)
    except (wave.Error, EOFError) as e:
        # wave.Error: not a RIFF/WAVE file or a non-PCM format; EOFError: truncated file
        print(f"Error: {filepath} is not a valid PCM WAV file: {e}", file=sys.stderr)
        print(f"Convert your file to WAV using: {convert_cmd}", file=sys.stderr)
        sys.exit(1)

    with wav_file:
        if wav_file.getnchannels() != 1:
            print("Error: Only mono audio is supported", file=sys.stderr)
            print(f"Convert your file to mono using: {convert_cmd}", file=sys.stderr)
            sys.exit(1)

        # The connection does not set an `encoding` parameter, so the API's
        # default applies (16-bit PCM per the API reference). Any other sample
        # width would be streamed as garbage.
        sample_width = wav_file.getsampwidth()
        if sample_width != 2:
            print(f"Error: Only 16-bit PCM audio is supported. Got: {sample_width * 8}-bit", file=sys.stderr)
            print(f"Convert your file using: ffmpeg -i {filepath} -ar {sample_rate} -ac 1 -c:a pcm_s16le output.wav", file=sys.stderr)
            sys.exit(1)

        file_sample_rate = wav_file.getframerate()
        if file_sample_rate != sample_rate:
            print(f"Error: File sample rate ({file_sample_rate}) doesn't match expected rate ({sample_rate})", file=sys.stderr)
            print(f"Either update SAMPLE_RATE to {file_sample_rate}, or convert your file using: {convert_cmd}", file=sys.stderr)
            sys.exit(1)


def save_transcript():
    """Save the transcript to a text file in the current working directory.

    The file is named ``{audio_file_stem}_{session_id}.txt``; ``unknown`` is
    used in the name when no session ID was recorded. It contains the session
    ID, the audio file path, the recorded audio duration, and one numbered
    line per completed turn from ``session_data["turns"]``.
    """
    audio_name = Path(session_data["audio_file"]).stem
    session_id = session_data["session_id"] or "unknown"
    output_file = f"{audio_name}_{session_id}.txt"

    # Write UTF-8 explicitly: transcripts can contain non-ASCII characters that
    # a platform's default encoding may be unable to represent.
    with open(output_file, "w", encoding="utf-8") as f:
        f.write(f"AssemblyAI Session ID: {session_data['session_id']}\n")
        f.write(f"Audio file: {session_data['audio_file']}\n")
        f.write(f"Audio duration: {session_data['audio_duration_seconds']} seconds\n")
        f.write("See all available parameters and defaults at https://www.assemblyai.com/docs/api-reference/streaming-api/universal-3-pro-streaming#request.query\n\n")
        f.write("\nTranscription Output\n")
        for i, turn in enumerate(session_data["turns"], 1):
            f.write(f"[Turn #{i}]: {turn}\n")

    print(f"Transcript saved to {output_file}")
```

### WebSocket event handlers

#### Open WebSocket and stream audio file

When the connection opens, we start a background thread that reads the WAV file in 50ms chunks and sends them over the WebSocket. If `PLAY_AUDIO` is enabled, the audio is also played through your speakers.

```python expandable theme={null}
def on_open(ws):
    """Called when the WebSocket connection is established.

    Starts a daemon thread that sends AUDIO_FILE over ``ws`` in 50ms binary
    chunks (optionally playing it locally when PLAY_AUDIO is true) and then
    sends a Terminate message so the session ends.
    """
    global audio_thread

    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    def open_audio_player():
        """Return ``(pyaudio_instance, output_stream)``, or ``(None, None)`` if playback is unavailable."""
        try:
            import pyaudio
        except ImportError:
            print("Warning: pyaudio not installed. Audio playback disabled.", file=sys.stderr)
            print("Install with: pip install pyaudio", file=sys.stderr)
            return None, None

        pa = None
        try:
            pa = pyaudio.PyAudio()
            with wave.open(AUDIO_FILE, 'rb') as wav_file:
                player = pa.open(
                    format=pa.get_format_from_width(wav_file.getsampwidth()),
                    channels=wav_file.getnchannels(),
                    rate=wav_file.getframerate(),
                    output=True,
                )
        except Exception as e:
            # Playback is optional: a failure here (for example, no usable
            # output device) must not stop the file from being transcribed.
            if pa is not None:
                pa.terminate()
            print(f"Warning: could not open audio output ({e}). Audio playback disabled.", file=sys.stderr)
            return None, None
        return pa, player

    def stream_file():
        """Send the file chunk by chunk, then ask the server to end the session."""
        chunk_duration = 0.05  # 50ms chunks
        pa, audio_player = open_audio_player() if PLAY_AUDIO else (None, None)
        finished = False  # True only when the whole file has been sent

        try:
            with wave.open(AUDIO_FILE, 'rb') as wav_file:
                frames_per_chunk = int(SAMPLE_RATE * chunk_duration)

                while not stop_event.is_set():
                    frames = wav_file.readframes(frames_per_chunk)
                    if not frames:
                        finished = True
                        break

                    # Pace the stream: playing the chunk is expected to take
                    # about as long as the chunk itself; otherwise sleep.
                    if audio_player:
                        audio_player.write(frames)
                    else:
                        time.sleep(chunk_duration)

                    ws.send(frames, websocket.ABNF.OPCODE_BINARY)
        except Exception as e:
            # Report the failure instead of letting the thread die with a traceback.
            print(f"Error while streaming audio: {e}", file=sys.stderr)
        finally:
            if audio_player:
                audio_player.stop_stream()
                audio_player.close()
                pa.terminate()

        # stop_event is set by the error/close handlers and on Ctrl+C; in those
        # cases shutdown is already being handled elsewhere.
        if stop_event.is_set():
            return

        if finished:
            print("File streaming complete. Waiting for final transcripts...")
        try:
            ws.send(json.dumps({"type": "Terminate"}))
        except Exception as e:
            print(f"Error sending termination message: {e}", file=sys.stderr)

    audio_thread = threading.Thread(target=stream_file, daemon=True)
    audio_thread.start()
```

#### Handle WebSocket messages

```python expandable theme={null}
def on_message(ws, message):
    """Handle one JSON message received from the streaming API.

    ``Begin`` records the session ID, ``Turn`` prints the transcript (and
    stores it when the turn has ended), and ``Termination`` records the
    processed audio duration. Errors are printed rather than raised.
    """
    try:
        payload = json.loads(message)
        kind = payload.get('type')

        if kind == "Termination":
            seconds = payload.get('audio_duration_seconds', 0)
            session_data["audio_duration_seconds"] = seconds
            print(f"Session terminated: {seconds} seconds of audio processed")
        elif kind == "Begin":
            new_session_id = payload.get('id')
            session_data["session_id"] = new_session_id
            print(f"Session ID: {new_session_id}\n")
        elif kind == "Turn":
            text = payload.get('transcript', '')
            # Turns without any text are ignored entirely
            if text:
                if payload.get('end_of_turn'):
                    print(f"[Final]: {text}\n")
                    session_data["turns"].append(text)
                else:
                    print(f"[Partial]: {text}")
    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")
```

#### WebSocket error and close handlers

```python theme={null}
def on_error(ws, error):
    """Report a WebSocket error and set ``stop_event`` so streaming stops."""
    error_report = f"\nWebSocket Error: {error}"
    print(error_report)
    stop_event.set()


def on_close(ws, close_status_code, close_msg):
    """Handle the WebSocket connection closing.

    Prints the close status, sets ``stop_event`` so the streaming loop exits,
    saves the transcript when SAVE_TRANSCRIPT_TO_FILE is true, and waits up to
    one second for the audio thread if it is still running.
    """
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")
    stop_event.set()

    if SAVE_TRANSCRIPT_TO_FILE:
        save_transcript()

    streamer = audio_thread
    if not streamer or not streamer.is_alive():
        return
    streamer.join(timeout=1.0)
```

### Connect and stream the file

```python expandable theme={null}
def run():
    """Validate the audio file, open the streaming connection, and wait for it to end.

    The WebSocket runs on a daemon thread while the main thread polls it. On
    Ctrl+C a Terminate message is sent (if the socket is still connected)
    before the connection is closed.
    """
    global ws_app

    # Check the file first so an unusable file never opens a connection
    validate_audio_file(AUDIO_FILE, SAMPLE_RATE)

    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": ASSEMBLYAI_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # The socket's event loop gets its own thread; this one just watches it
    socket_thread = threading.Thread(target=ws_app.run_forever, daemon=True)
    socket_thread.start()

    def close_and_wait():
        """Close the socket (if any) and give its thread up to two seconds to exit."""
        if ws_app:
            ws_app.close()
        socket_thread.join(timeout=2.0)

    try:
        while socket_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()

        sock = ws_app.sock if ws_app else None
        if sock and sock.connected:
            try:
                ws_app.send(json.dumps({"type": "Terminate"}))
                # Presumably leaves time for the server's final messages to arrive
                time.sleep(2)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        close_and_wait()

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        close_and_wait()

    finally:
        print("Cleanup complete. Exiting.")


if __name__ == "__main__":
    run()
```

The session will terminate once the file is finished streaming. If `SAVE_TRANSCRIPT_TO_FILE` is enabled (default), the transcript will be saved to `{audio_filename}_{session_id}.txt` in the current working directory.

<Note>
  The `AUDIO_FILE` path can be either relative (e.g., `audio.wav`) or absolute
  (e.g., `/path/to/audio.wav`).
</Note>

## Example output

Here's an example of what the console output looks like when streaming an audio file:

```console expandable theme={null}
WebSocket connection opened.
Connected to: wss://streaming.assemblyai.com/v3/ws?speech_model=u3-rt-pro&sample_rate=48000

Session ID: f37d7c4e-6be9-47ed-b6fc-7600fc78e34d

[Partial]: the
[Partial]: the quick
[Partial]: the quick brown
[Partial]: the quick brown fox
[Partial]: the quick brown fox jumps
[Partial]: the quick brown fox jumps over
[Partial]: the quick brown fox jumps over the
[Partial]: the quick brown fox jumps over the lazy
[Partial]: The quick brown fox jumps over the lazy dog
[Final]: The quick brown fox jumps over the lazy dog.

[Partial]: It
[Partial]: It is
[Partial]: It is a
[Partial]: It is a common
[Partial]: It is a common typing
[Partial]: It is a common typing test
[Final]: It is a common typing test.

File streaming complete. Waiting for final transcripts...
Session terminated: 7.52 seconds of audio processed

WebSocket Disconnected: Status=1000, Msg=None
Transcript saved to audio_f37d7c4e-6be9-47ed-b6fc-7600fc78e34d.txt
Cleanup complete. Exiting.
```

The output shows:

* **Partial transcripts**: Real-time updates as words are recognized
* **Final transcripts**: The complete turn with proper capitalization and punctuation
