> ## Documentation Index
> Fetch the complete documentation index at: https://assemblyai.com/docs/llms.txt
> Use this file to discover all available pages before exploring further.

# Streaming Migration Guide: Gladia to AssemblyAI

This guide walks through the process of migrating from Gladia to AssemblyAI for transcribing streaming audio.

## Get Started

Before we begin, make sure you have an AssemblyAI account and an API key. You can [sign up](https://assemblyai.com/dashboard/signup) for a free account and get your API key from your [AssemblyAI dashboard](https://www.assemblyai.com/app/api-keys).

## Side-By-Side Code Comparison

Below is a side-by-side comparison of basic snippets that transcribe live microphone audio with Gladia and with AssemblyAI:

<Tabs groupId="language">
  <Tab language="gladia" title="Gladia">
    ```python expandable theme={null}
    import asyncio
    import base64
    import json
    import signal
    from datetime import time
    import pyaudio
    import requests
    from websockets.asyncio.client import connect
    from websockets.exceptions import ConnectionClosedOK

    # Constants
    GLADIA_API_KEY = "<YOUR_GLADIA_API_KEY>"
    GLADIA_API_URL = "https://api.gladia.io"

    # Audio configuration
    CHANNELS = 1
    FORMAT = pyaudio.paInt16
    FRAMES_PER_BUFFER = 3200
    SAMPLE_RATE = 16_000

    async def main():
        # Initialize the session
        config = {
            "encoding": "wav/pcm",
            "sample_rate": SAMPLE_RATE,
            "bit_depth": 16,
            "channels": CHANNELS,
            "language_config": {
                "languages": ["en"],
            },
        }

        # Start the live session
        response = requests.post(
            f"{GLADIA_API_URL}/v2/live",
            headers={"X-Gladia-Key": GLADIA_API_KEY},
            json=config,
            timeout=3,
        )

        if not response.ok:
            print(f"{response.status_code}: {response.text or response.reason}")
            exit(response.status_code)

        session_data = response.json()

        # Connect to the websocket
        async with connect(session_data["url"]) as websocket:
            print("\n################ Begin session ################\n")

            # Handle Ctrl+C to stop recording
            loop = asyncio.get_running_loop()
            loop.add_signal_handler(
                signal.SIGINT,
                lambda: loop.create_task(stop_recording(websocket)),
            )

            # Create tasks for sending audio and receiving transcripts
            send_audio_task = asyncio.create_task(send_audio(websocket))
            receive_transcript_task = asyncio.create_task(receive_transcript(websocket))

            await asyncio.wait([send_audio_task, receive_transcript_task])

    async def stop_recording(websocket):
        print(">>>>> Ending the recording…")
        await websocket.send(json.dumps({"type": "stop_recording"}))
        await asyncio.sleep(0)

    async def send_audio(websocket):
        # Initialize PyAudio
        p = pyaudio.PyAudio()

        # Open audio stream
        stream = p.open(
            format=FORMAT,
            channels=CHANNELS,
            rate=SAMPLE_RATE,
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
        )

        # Send audio chunks
        while True:
            data = stream.read(FRAMES_PER_BUFFER)
            data = base64.b64encode(data).decode("utf-8")
            json_data = json.dumps({"type": "audio_chunk", "data": {"chunk": str(data)}})
            try:
                await websocket.send(json_data)
                await asyncio.sleep(0.1)  # Send audio every 100ms
            except ConnectionClosedOK:
                return

    async def receive_transcript(websocket):
        # Process incoming messages
        async for message in websocket:
            content = json.loads(message)

            # Print transcripts
            if content["type"] == "transcript" and content["data"]["is_final"]:
                text = content["data"]["utterance"]["text"].strip()
                print(f"Final: {text}")

            # Print final results
            if content["type"] == "post_final_transcript":
                print("\n################ End of session ################\n")
                print(json.dumps(content, indent=2, ensure_ascii=False))

    if __name__ == "__main__":
        asyncio.run(main())
    ```
  </Tab>

  <Tab language="aai" title="AssemblyAI">
    ```python expandable theme={null}
    import pyaudio
    import websocket
    import json
    import threading
    import time
    from urllib.parse import urlencode
    from datetime import datetime

    # --- Configuration ---
    YOUR_API_KEY = "YOUR-API-KEY"  # Replace with your actual API key

    CONNECTION_PARAMS = {
        "sample_rate": 16000,
        "speech_model": "u3-rt-pro",
    }
    API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
    API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

    # Audio Configuration
    FRAMES_PER_BUFFER = 800  # 50ms of audio (0.05s * 16000Hz)
    SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
    CHANNELS = 1
    FORMAT = pyaudio.paInt16

    # Global variables for audio stream and websocket
    audio = None
    stream = None
    ws_app = None
    audio_thread = None
    stop_event = threading.Event()  # To signal the audio thread to stop

    # --- WebSocket Event Handlers ---


    def on_open(ws):
        """Called when the WebSocket connection is established."""
        print("WebSocket connection opened.")
        print(f"Connected to: {API_ENDPOINT}")

        # Start sending audio data in a separate thread
        def stream_audio():
            global stream
            print("Starting audio streaming...")
            while not stop_event.is_set():
                try:
                    audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                    # Send audio data as binary message
                    ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
                except Exception as e:
                    print(f"Error streaming audio: {e}")
                    # If stream read fails, likely means it's closed, stop the loop
                    break
            print("Audio streaming stopped.")

        global audio_thread
        audio_thread = threading.Thread(target=stream_audio)
        audio_thread.daemon = (
            True  # Allow main thread to exit even if this thread is running
        )
        audio_thread.start()

    def on_message(ws, message):
        try:
            data = json.loads(message)
            msg_type = data.get('type')

            if msg_type == "Begin":
                session_id = data.get('id')
                expires_at = data.get('expires_at')
                print(f"\nSession began: ID={session_id}, ExpiresAt={datetime.fromtimestamp(expires_at)}")
            elif msg_type == "Turn":
                transcript = data.get('transcript', '')
                end_of_turn = data.get('end_of_turn', False)

                # Print final end-of-turn transcript
                if end_of_turn:
                    print('\r' + ' ' * 80 + '\r', end='')
                    print(transcript)
                else:
                    print(f"\r{transcript}", end='')
            elif msg_type == "Termination":
                audio_duration = data.get('audio_duration_seconds', 0)
                session_duration = data.get('session_duration_seconds', 0)
                print(f"\nSession Terminated: Audio Duration={audio_duration}s, Session Duration={session_duration}s")
        except json.JSONDecodeError as e:
            print(f"Error decoding message: {e}")
        except Exception as e:
            print(f"Error handling message: {e}")

    def on_error(ws, error):
        """Called when a WebSocket error occurs."""
        print(f"\nWebSocket Error: {error}")
        # Attempt to signal stop on error
        stop_event.set()


    def on_close(ws, close_status_code, close_msg):
        """Called when the WebSocket connection is closed."""
        print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")
        # Ensure audio resources are released
        global stream, audio
        stop_event.set()  # Signal audio thread just in case it's still running

        if stream:
            if stream.is_active():
                stream.stop_stream()
            stream.close()
            stream = None
        if audio:
            audio.terminate()
            audio = None
        # Try to join the audio thread to ensure clean exit
        if audio_thread and audio_thread.is_alive():
            audio_thread.join(timeout=1.0)


    # --- Main Execution ---
    def run():
        global audio, stream, ws_app

        # Initialize PyAudio
        audio = pyaudio.PyAudio()

        # Open microphone stream
        try:
            stream = audio.open(
                input=True,
                frames_per_buffer=FRAMES_PER_BUFFER,
                channels=CHANNELS,
                format=FORMAT,
                rate=SAMPLE_RATE,
            )
            print("Microphone stream opened successfully.")
            print("Speak into your microphone. Press Ctrl+C to stop.")
        except Exception as e:
            print(f"Error opening microphone stream: {e}")
            if audio:
                audio.terminate()
            return  # Exit if microphone cannot be opened

        # Create WebSocketApp
        ws_app = websocket.WebSocketApp(
            API_ENDPOINT,
            header={"Authorization": YOUR_API_KEY},
            on_open=on_open,
            on_message=on_message,
            on_error=on_error,
            on_close=on_close,
        )

        # Run WebSocketApp in a separate thread to allow main thread to catch KeyboardInterrupt
        ws_thread = threading.Thread(target=ws_app.run_forever)
        ws_thread.daemon = True
        ws_thread.start()

        try:
            # Keep main thread alive until interrupted
            while ws_thread.is_alive():
                time.sleep(0.1)
        except KeyboardInterrupt:
            print("\nCtrl+C received. Stopping...")
            stop_event.set()  # Signal audio thread to stop

            # Send termination message to the server
            if ws_app and ws_app.sock and ws_app.sock.connected:
                try:
                    terminate_message = {"type": "Terminate"}
                    print(f"Sending termination message: {json.dumps(terminate_message)}")
                    ws_app.send(json.dumps(terminate_message))
                    # Give a moment for messages to process before forceful close
                    time.sleep(5)
                except Exception as e:
                    print(f"Error sending termination message: {e}")

            # Close the WebSocket connection (will trigger on_close)
            if ws_app:
                ws_app.close()

            # Wait for WebSocket thread to finish
            ws_thread.join(timeout=2.0)

        except Exception as e:
            print(f"\nAn unexpected error occurred: {e}")
            stop_event.set()
            if ws_app:
                ws_app.close()
            ws_thread.join(timeout=2.0)

        finally:
            # Final cleanup (already handled in on_close, but good as a fallback)
            if stream and stream.is_active():
                stream.stop_stream()
            if stream:
                stream.close()
            if audio:
                audio.terminate()
            print("Cleanup complete. Exiting.")


    if __name__ == "__main__":
        run()

    ```
  </Tab>
</Tabs>

## Authentication

<Tabs groupId="language">
  <Tab language="gladia" title="Gladia">
    ```python theme={null}
    import asyncio
    import base64
    import json
    import signal
    from datetime import time
    import pyaudio
    import requests
    from websockets.asyncio.client import connect
    from websockets.exceptions import ConnectionClosedOK

    GLADIA_API_KEY = "<YOUR_GLADIA_API_KEY>"
    ```
  </Tab>

  <Tab language="aai" title="AssemblyAI">
    ```python theme={null}
    import pyaudio
    import websocket
    import json
    import threading
    import time
    from urllib.parse import urlencode
    from datetime import datetime

    YOUR_API_KEY = "YOUR-API-KEY"
    ```
  </Tab>
</Tabs>

<Note>
  **Protect Your API Key**

  For improved security, store your API key as an environment variable.
</Note>

## Connection Parameters & Microphone Setup

<Tabs groupId="language">
  <Tab language="gladia" title="Gladia">
    ```python theme={null}
    GLADIA_API_URL = "https://api.gladia.io"

    CHANNELS = 1
    FORMAT = pyaudio.paInt16
    FRAMES_PER_BUFFER = 3200
    SAMPLE_RATE = 16_000

    config = {
        "encoding": "wav/pcm",
        "sample_rate": SAMPLE_RATE,
        "bit_depth": 16,
        "channels": CHANNELS,
        "language_config": {
            "languages": ["en"]
        },
    }
    ```
  </Tab>

  <Tab language="aai" title="AssemblyAI">
    ```python theme={null}
    CONNECTION_PARAMS = {
        "sample_rate": 16000,
        "speech_model": "u3-rt-pro",
    }
    API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
    API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

    # Audio Configuration
    FRAMES_PER_BUFFER = 800  # 50ms of audio (0.05s * 16000Hz)
    SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
    CHANNELS = 1
    FORMAT = pyaudio.paInt16

    # Global variables for audio stream and websocket
    audio = None
    stream = None
    ws_app = None
    audio_thread = None
    stop_event = threading.Event()  # To signal the audio thread to stop
    ```
  </Tab>
</Tabs>

Helpful information about our streaming model:

* **Universal-3 Pro Model** — Connect to `wss://streaming.assemblyai.com/v3/ws` with `speech_model=u3-rt-pro` to use our latest, highest-accuracy streaming model — Universal-3 Pro.

* **Built-in Formatting** — Universal-3 Pro always returns formatted transcripts with smart punctuation & casing. No extra parameter is needed.

* **Partial Transcripts** — AssemblyAI streams interim results automatically. Universal-3 Pro emits partials during periods of silence, with at most one partial per silence period.

## Open Microphone Stream & Create WebSocket

<Tabs groupId="language">
  <Tab language="gladia" title="Gladia">
    ```python expandable theme={null}

    # Start the live session
    response = requests.post(
        f"{GLADIA_API_URL}/v2/live",
        headers={"X-Gladia-Key": GLADIA_API_KEY},
        json=config,
        timeout=3,
    )

    if not response.ok:
        print(f"{response.status_code}: {response.text or response.reason}")
        exit(response.status_code)

    session_data = response.json()

    async def send_audio(websocket):
        # Initialize PyAudio
        p = pyaudio.PyAudio()

        # Open audio stream
        stream = p.open(
            format=FORMAT,
            channels=CHANNELS,
            rate=SAMPLE_RATE,
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
        )

        # Send audio chunks
        while True:
            data = stream.read(FRAMES_PER_BUFFER)
            data = base64.b64encode(data).decode("utf-8")
            json_data = json.dumps({"type": "audio_chunk", "data": {"chunk": str(data)}})
            try:
                await websocket.send(json_data)
                await asyncio.sleep(0.1)  # Send audio every 100ms
            except ConnectionClosedOK:
                return
    ```
  </Tab>

  <Tab language="aai" title="AssemblyAI">
    ```python expandable theme={null}
    global audio, stream, ws_app
    # Initialize PyAudio
    audio = pyaudio.PyAudio()

    # Open microphone stream
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("Microphone stream opened successfully.")
        print("Speak into your microphone. Press Ctrl+C to stop.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        if audio:
            audio.terminate()
        return  # Exit if microphone cannot be opened

    # Create WebSocketApp
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )
    # Run WebSocketApp in a separate thread to allow main thread to catch KeyboardInterrupt
    ws_thread = threading.Thread(target=ws_app.run_forever)
    ws_thread.daemon = True
    ws_thread.start()

    ```

    <Note>
      **WebSocket Error Handling**

      Capture and log any errors emitted by the WebSocket connection to streamline troubleshooting and maintain smooth operation.

      ```python theme={null}
      def on_error(ws, error):
          """Called when a WebSocket error occurs."""
          print(f'\nWebSocket Error: {error}')
          # Attempt to signal stop on error
          stop_event.set()
      ```
    </Note>
  </Tab>
</Tabs>

## Open WebSocket

<Tabs groupId="language">
  <Tab language="gladia" title="Gladia">
    ```python theme={null}
    # Connect to Websocket
    async with connect(session_data["url"]) as websocket:
        print("\n################ Begin session ################\n")
        
        # Create tasks for sending audio and receiving transcripts
        send_audio_task = asyncio.create_task(send_audio(websocket))
        receive_transcript_task = asyncio.create_task(receive_transcript(websocket))
        
        await asyncio.wait([send_audio_task, receive_transcript_task])
    ```
  </Tab>

  <Tab language="aai" title="AssemblyAI">
    ```python expandable theme={null}
    def on_open(ws):
        """Called when the WebSocket connection is established."""
        print("WebSocket connection opened.")
        print(f"Connected to: {API_ENDPOINT}")

        # Start sending audio data in a separate thread
        def stream_audio():
            global stream
            print("Starting audio streaming...")
            while not stop_event.is_set():
                try:
                    audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                    # Send audio data as binary message
                    ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
                except Exception as e:
                    print(f"Error streaming audio: {e}")
                    # If stream read fails, likely means it's closed, stop the loop
                    break
            print("Audio streaming stopped.")

        global audio_thread
        audio_thread = threading.Thread(target=stream_audio)
        audio_thread.daemon = (
            True  # Allow main thread to exit even if this thread is running
        )
        audio_thread.start()
    ```
  </Tab>
</Tabs>

## Receive Messages from WebSocket

<Tabs groupId="language">
  <Tab language="gladia" title="Gladia">
    ```python theme={null}
    async def receive_transcript(websocket):
        # Process incoming messages
        async for message in websocket:
            content = json.loads(message)
            
            # Print transcripts
            if content["type"] == "transcript" and content["data"]["is_final"]:
                text = content["data"]["utterance"]["text"].strip()
                print(f"Final: {text}")
                
            # Print final results
            if content["type"] == "post_final_transcript":
                print("\n################ End of session ################\n")
                print(json.dumps(content, indent=2, ensure_ascii=False))
    ```
  </Tab>

  <Tab language="aai" title="AssemblyAI">
    ```python expandable theme={null}
    def on_message(ws, message):
        try:
            data = json.loads(message)
            msg_type = data.get('type')

            if msg_type == "Begin":
                session_id = data.get('id')
                expires_at = data.get('expires_at')
                print(f"\nSession began: ID={session_id}, ExpiresAt={datetime.fromtimestamp(expires_at)}")
            elif msg_type == "Turn":
                transcript = data.get('transcript', '')
                end_of_turn = data.get('end_of_turn', False)

                # Print final end-of-turn transcript
                if end_of_turn:
                    print('\r' + ' ' * 80 + '\r', end='')
                    print(transcript)
                else:
                    print(f"\r{transcript}", end='')
            elif msg_type == "Termination":
                audio_duration = data.get('audio_duration_seconds', 0)
                session_duration = data.get('session_duration_seconds', 0)
                print(f"\nSession Terminated: Audio Duration={audio_duration}s, Session Duration={session_duration}s")
        except json.JSONDecodeError as e:
            print(f"Error decoding message: {e}")
        except Exception as e:
            print(f"Error handling message: {e}")
    ```
  </Tab>
</Tabs>

Helpful information about AssemblyAI’s message payloads:

* **Clear Message Types** – Instead of checking `is_final`, you’ll receive explicit `"Begin"`, `"Turn"`, and `"Termination"` events, making your logic simpler and more readable.

* **Session Metadata Up-Front** – The first message, `"Begin"`, delivers the session `id` and an `expires_at` timestamp so you can immediately log or surface these for tracing or billing.

* **End-of-Turn Detection** – Each `"Turn"` object includes an `end_of_turn` boolean. When `end_of_turn` is `true`, the transcript is a final, formatted result. When `false`, it is a partial transcript. Universal-3 Pro always returns formatted transcripts with smart punctuation & casing built in.

## Close the WebSocket

<Tabs groupId="language">
  <Tab language="gladia" title="Gladia">
    ```python theme={null}
    async def stop_recording(websocket):
        print(">>>>> Ending the recording…")
        await websocket.send(json.dumps({"type": "stop_recording"}))
        await asyncio.sleep(0)
    ```
  </Tab>

  <Tab language="aai" title="AssemblyAI">
    ```python theme={null}
    def on_close(ws, close_status_code, close_msg):
        """Called when the WebSocket connection is closed."""
        print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")
        # Ensure audio resources are released
        global stream, audio
        stop_event.set()  # Signal audio thread just in case it's still running

        if stream:
            if stream.is_active():
                stream.stop_stream()
            stream.close()
            stream = None
        if audio:
            audio.terminate()
            audio = None
        # Try to join the audio thread to ensure clean exit
        if audio_thread and audio_thread.is_alive():
            audio_thread.join(timeout=1.0)
    ```
  </Tab>
</Tabs>

Helpful information about AssemblyAI’s WebSocket Closure:

* **Connection Diagnostics** - If the socket closes unexpectedly, AssemblyAI supplies both a status code and a reason message (`close_status_code`, `close_msg`), so you know immediately whether the server timed out, refused authentication, or encountered a different error.

## Session Shutdown

<Tabs groupId="language">
  <Tab language="gladia" title="Gladia">
    ```python theme={null}
    async with connect(session_data["url"]) as websocket:
        ...
        # Handle Ctrl+C to stop recording
        loop = asyncio.get_running_loop()
        loop.add_signal_handler(
            signal.SIGINT,
            lambda: loop.create_task(stop_recording(websocket)),
        )
    ```
  </Tab>

  <Tab language="aai" title="AssemblyAI">
    ```python expandable theme={null}
    try:
        # Keep main thread alive until interrupted
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()  # Signal audio thread to stop

        # Send termination message to the server
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                print(f"Sending termination message: {json.dumps(terminate_message)}")
                ws_app.send(json.dumps(terminate_message))
                # Give a moment for messages to process before forceful close
                time.sleep(5)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        # Close the WebSocket connection (will trigger on_close)
        if ws_app:
            ws_app.close()

        # Wait for WebSocket thread to finish
        ws_thread.join(timeout=2.0)

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    finally:
        # Final cleanup (already handled in on_close, but good as a fallback)
        if stream and stream.is_active():
            stream.stop_stream()
        if stream:
            stream.close()
        if audio:
            audio.terminate()
        print("Cleanup complete. Exiting.")
    ```
  </Tab>
</Tabs>

Helpful information to know about AssemblyAI’s shutdown:

* **JSON Payload Difference** - When closing the stream with AssemblyAI, your JSON payload will be `{ "type": "Terminate" }` instead of `{ "type": "stop_recording" }`.

* **No Metadata Race Condition** - AssemblyAI provides session info at "Begin" and doesn’t append extra data at shutdown, making the exit faster and less error-prone.

## Resources

For additional information about using AssemblyAI's Streaming Speech-To-Text API, you can also refer to:

* [Universal-3 Pro Streaming](/streaming/getting-started/transcribe-streaming-audio)
* [Streaming Guide](/streaming/getting-started/transcribe-streaming-audio)
