> ## Documentation Index
> Fetch the complete documentation index at: https://assemblyai.com/docs/llms.txt
> Use this file to discover all available pages before exploring further.

# Migration guide: Speechmatics to AssemblyAI

This guide walks through the process of migrating from **Speechmatics** to **AssemblyAI** for streaming Speech-to-text.

### Get started

Before we begin, make sure you have an AssemblyAI account and an API key. You can [sign up](https://assemblyai.com/dashboard/signup) for a free account and get your API key from your dashboard.

## Side-by-side code comparison

Below is a side-by-side comparison of a basic Python code snippet to transcribe streaming audio by Speechmatics and AssemblyAI.

<Tabs groupId="language">
  <Tab language="speechmatics" title="Speechmatics">
    ```python expandable theme={null}
    import pyaudio
    import websocket
    import json
    import threading
    import time

    # --- Configuration ---

    YOUR_API_KEY = "YOUR-API-KEY" # Replace with your actual API key

    CONNECTION_PARAMS = {
        "language": "en",
        "enable_partials": True,
        "max_delay": 2.0
    }
    API_ENDPOINT = "wss://eu2.rt.speechmatics.com/v2/en"

    # Audio Configuration

    FRAMES_PER_BUFFER = 1024 # Chunk size
    SAMPLE_RATE = None # Will be set based on device capabilities
    CHANNELS = 1
    FORMAT = pyaudio.paFloat32 # Speechmatics uses float32 format

    # Global variables for audio stream and websocket

    audio = None
    stream = None
    ws_app = None
    audio_thread = None
    stop_event = threading.Event() # To signal the audio thread to stop
    audio_seq_no = 0 # Track number of audio chunks sent

    # --- WebSocket Event Handlers ---

    def on_open(ws):
        """Called when the WebSocket connection is established."""
        print("WebSocket connection opened.")
        print(f"Connected to: {API_ENDPOINT}")

        # Send StartRecognition message
        start_message = {
            "message": "StartRecognition",
            "audio_format": {
                "type": "raw",
                "encoding": "pcm_f32le",
                "sample_rate": SAMPLE_RATE
            },
            "transcription_config": {
                "language": CONNECTION_PARAMS["language"],
                "enable_partials": CONNECTION_PARAMS["enable_partials"],
                "max_delay": CONNECTION_PARAMS["max_delay"]
            }
        }
        ws.send(json.dumps(start_message))

    def on_message(ws, message):
        global audio_seq_no

        try:
            data = json.loads(message)
            msg_type = data.get('message')

            if msg_type == "RecognitionStarted":
                session_id = data.get('id')
                print(f"\nSession began: ID={session_id}")

                # Start sending audio data in a separate thread
                def stream_audio():
                    global audio_seq_no, stream
                    print("Starting audio streaming...")
                    while not stop_event.is_set():
                        try:
                            audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                            # Send audio data as binary message
                            ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
                            audio_seq_no += 1
                        except Exception as e:
                            print(f"Error streaming audio: {e}")
                            # If stream read fails, likely means it's closed, stop the loop
                            break
                    print("Audio streaming stopped.")

                global audio_thread
                audio_thread = threading.Thread(target=stream_audio)
                audio_thread.daemon = (
                    True  # Allow main thread to exit even if this thread is running
                )
                audio_thread.start()

            elif msg_type == "AddPartialTranscript":
                transcript = data.get('metadata', {}).get('transcript', '')
                if transcript:
                    print(f"\r{transcript}", end='')

            elif msg_type == "AddTranscript":
                transcript = data.get('metadata', {}).get('transcript', '')
                if transcript:
                    # Clear previous line for final messages
                    print('\r' + ' ' * 80 + '\r', end='')
                    print(transcript)

            elif msg_type == "EndOfTranscript":
                print("\nSession Terminated: Transcription complete")

            elif msg_type == "Error":
                error_type = data.get('type')
                reason = data.get('reason')
                print(f"\nWebSocket Error: {error_type} - {reason}")
                stop_event.set()

        except json.JSONDecodeError as e:
            print(f"Error decoding message: {e}")
        except Exception as e:
            print(f"Error handling message: {e}")

    def on_error(ws, error):
        """Called when a WebSocket error occurs."""
        print(f"\nWebSocket Error: {error}")  # Attempt to signal stop on error
        stop_event.set()

    def on_close(ws, close_status_code, close_msg):
        """Called when the WebSocket connection is closed."""
        print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")
        # Ensure audio resources are released
        global stream, audio
        stop_event.set()  # Signal audio thread just in case it's still running

        if stream:
            if stream.is_active():
                stream.stop_stream()
            stream.close()
            stream = None
        if audio:
            audio.terminate()
            audio = None
        # Try to join the audio thread to ensure clean exit
        if audio_thread and audio_thread.is_alive():
            audio_thread.join(timeout=1.0)

    # --- Main Execution ---

    def run():
        global audio, stream, ws_app, SAMPLE_RATE

        # Initialize PyAudio
        audio = pyaudio.PyAudio()

        # Get default input device (can alter to specify specific device)
        default_device = audio.get_default_input_device_info()
        device_index = default_device['index']
        SAMPLE_RATE = int(audio.get_device_info_by_index(device_index)['defaultSampleRate'])

        print(f"Using microphone: {default_device['name']}")

        # Open microphone stream
        try:
            stream = audio.open(
                input=True,
                frames_per_buffer=FRAMES_PER_BUFFER,
                channels=CHANNELS,
                format=FORMAT,
                rate=SAMPLE_RATE,
                input_device_index=device_index
            )
            print("Microphone stream opened successfully.")
            print("Speak into your microphone. Press Ctrl+C to stop.")
        except Exception as e:
            print(f"Error opening microphone stream: {e}")
            if audio:
                audio.terminate()
            return  # Exit if microphone cannot be opened

        # Create WebSocketApp
        ws_app = websocket.WebSocketApp(
            API_ENDPOINT,
            header={"Authorization": f"Bearer {YOUR_API_KEY}"},  # Speechmatics uses Bearer token
            on_open=on_open,
            on_message=on_message,
            on_error=on_error,
            on_close=on_close,
        )

        # Run WebSocketApp in a separate thread to allow main thread to catch KeyboardInterrupt
        ws_thread = threading.Thread(target=lambda: ws_app.run_forever(ping_interval=30, ping_timeout=10))
        ws_thread.daemon = True
        ws_thread.start()

        try:
            # Keep main thread alive until interrupted
            while ws_thread.is_alive():
                time.sleep(0.1)
        except KeyboardInterrupt:
            print("\nCtrl+C received. Stopping...")
            stop_event.set()  # Signal audio thread to stop

            # Send EndOfStream message to the server
            if ws_app and ws_app.sock and ws_app.sock.connected:
                try:
                    end_message = {
                        "message": "EndOfStream",
                        "last_seq_no": audio_seq_no
                    }
                    print(f"Sending termination message: {json.dumps(end_message)}")
                    ws_app.send(json.dumps(end_message))
                    # Give a moment for messages to process before forceful close
                    time.sleep(1)
                except Exception as e:
                    print(f"Error sending termination message: {e}")

            # Close the WebSocket connection (will trigger on_close)
            if ws_app:
                ws_app.close()

            # Wait for WebSocket thread to finish
            ws_thread.join(timeout=2.0)

        except Exception as e:
            print(f"\nAn unexpected error occurred: {e}")
            stop_event.set()
            if ws_app:
                ws_app.close()
            ws_thread.join(timeout=2.0)

        finally:
            # Final cleanup (already handled in on_close, but good as a fallback)
            if stream and stream.is_active():
                stream.stop_stream()
            if stream:
                stream.close()
            if audio:
                audio.terminate()
            print("Cleanup complete. Exiting.")

    if __name__ == "__main__":
        run()

    ```
  </Tab>

  <Tab language="aai" title="AssemblyAI">
    ```python expandable theme={null}
    import pyaudio
    import websocket
    import json
    import threading
    import time
    from urllib.parse import urlencode
    from datetime import datetime

    # --- Configuration ---
    YOUR_API_KEY = "YOUR-API-KEY"  # Replace with your actual API key

    CONNECTION_PARAMS = {
        "sample_rate": 16000,
        "speech_model": "u3-rt-pro",
    }
    API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
    API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

    # Audio Configuration
    FRAMES_PER_BUFFER = 800  # 50ms of audio (0.05s * 16000Hz)
    SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
    CHANNELS = 1
    FORMAT = pyaudio.paInt16

    # Global variables for audio stream and websocket
    audio = None
    stream = None
    ws_app = None
    audio_thread = None
    stop_event = threading.Event()  # To signal the audio thread to stop

    # --- WebSocket Event Handlers ---
    def on_open(ws):
        """Called when the WebSocket connection is established."""
        print("WebSocket connection opened.")
        print(f"Connected to: {API_ENDPOINT}")

        # Start sending audio data in a separate thread
        def stream_audio():
            global stream
            print("Starting audio streaming...")
            while not stop_event.is_set():
                try:
                    audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                    # Send audio data as binary message
                    ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
                except Exception as e:
                    print(f"Error streaming audio: {e}")
                    # If stream read fails, likely means it's closed, stop the loop
                    break
            print("Audio streaming stopped.")

        global audio_thread
        audio_thread = threading.Thread(target=stream_audio)
        audio_thread.daemon = (
            True  # Allow main thread to exit even if this thread is running
        )
        audio_thread.start()

    def on_message(ws, message):
        try:
            data = json.loads(message)
            msg_type = data.get('type')

            if msg_type == "Begin":
                session_id = data.get('id')
                expires_at = data.get('expires_at')
                print(f"\nSession began: ID={session_id}, ExpiresAt={datetime.fromtimestamp(expires_at)}")
            elif msg_type == "Turn":
                transcript = data.get('transcript', '')
                end_of_turn = data.get('end_of_turn', False)

                # Print final end-of-turn transcript
                if end_of_turn:
                    print('\r' + ' ' * 80 + '\r', end='')
                    print(transcript)
                else:
                    print(f"\r{transcript}", end='')
            elif msg_type == "Termination":
                audio_duration = data.get('audio_duration_seconds', 0)
                session_duration = data.get('session_duration_seconds', 0)
                print(f"\nSession Terminated: Audio Duration={audio_duration}s, Session Duration={session_duration}s")
        except json.JSONDecodeError as e:
            print(f"Error decoding message: {e}")
        except Exception as e:
            print(f"Error handling message: {e}")

    def on_error(ws, error):
        """Called when a WebSocket error occurs."""
        print(f"\nWebSocket Error: {error}")
        # Attempt to signal stop on error
        stop_event.set()

    def on_close(ws, close_status_code, close_msg):
        """Called when the WebSocket connection is closed."""
        print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")
        # Ensure audio resources are released
        global stream, audio
        stop_event.set()  # Signal audio thread just in case it's still running

        if stream:
            if stream.is_active():
                stream.stop_stream()
            stream.close()
            stream = None
        if audio:
            audio.terminate()
            audio = None
        # Try to join the audio thread to ensure clean exit
        if audio_thread and audio_thread.is_alive():
            audio_thread.join(timeout=1.0)

    # --- Main Execution ---
    def run():
        global audio, stream, ws_app

        # Initialize PyAudio
        audio = pyaudio.PyAudio()

        # Open microphone stream
        try:
            stream = audio.open(
                input=True,
                frames_per_buffer=FRAMES_PER_BUFFER,
                channels=CHANNELS,
                format=FORMAT,
                rate=SAMPLE_RATE,
            )
            print("Microphone stream opened successfully.")
            print("Speak into your microphone. Press Ctrl+C to stop.")
        except Exception as e:
            print(f"Error opening microphone stream: {e}")
            if audio:
                audio.terminate()
            return  # Exit if microphone cannot be opened

        # Create WebSocketApp
        ws_app = websocket.WebSocketApp(
            API_ENDPOINT,
            header={"Authorization": YOUR_API_KEY},
            on_open=on_open,
            on_message=on_message,
            on_error=on_error,
            on_close=on_close,
        )

        # Run WebSocketApp in a separate thread to allow main thread to catch KeyboardInterrupt
        ws_thread = threading.Thread(target=ws_app.run_forever)
        ws_thread.daemon = True
        ws_thread.start()

        try:
            # Keep main thread alive until interrupted
            while ws_thread.is_alive():
                time.sleep(0.1)
        except KeyboardInterrupt:
            print("\nCtrl+C received. Stopping...")
            stop_event.set()  # Signal audio thread to stop

            # Send termination message to the server
            if ws_app and ws_app.sock and ws_app.sock.connected:
                try:
                    terminate_message = {"type": "Terminate"}
                    print(f"Sending termination message: {json.dumps(terminate_message)}")
                    ws_app.send(json.dumps(terminate_message))
                    # Give a moment for messages to process before forceful close
                    time.sleep(5)
                except Exception as e:
                    print(f"Error sending termination message: {e}")

            # Close the WebSocket connection (will trigger on_close)
            if ws_app:
                ws_app.close()

            # Wait for WebSocket thread to finish
            ws_thread.join(timeout=2.0)

        except Exception as e:
            print(f"\nAn unexpected error occurred: {e}")
            stop_event.set()
            if ws_app:
                ws_app.close()
            ws_thread.join(timeout=2.0)

        finally:
            # Final cleanup (already handled in on_close, but good as a fallback)
            if stream and stream.is_active():
                stream.stop_stream()
            if stream:
                stream.close()
            if audio:
                audio.terminate()
            print("Cleanup complete. Exiting.")

    if __name__ == "__main__":
        run()
    ```
  </Tab>
</Tabs>

## Step 1: Install dependencies

<Steps>
  <Step>
    <Tabs groupId="language">
      <Tab language="speechmatics" title="Speechmatics">
        Install the required Python packages.

        ```bash theme={null}
        pip install websocket-client pyaudio
        ```
      </Tab>

      <Tab language="aai" title="AssemblyAI">
        Install the required Python packages:

        ```bash theme={null}
        pip install websocket-client pyaudio
        ```
      </Tab>
    </Tabs>
  </Step>
</Steps>

## Step 2: Configure the API key

In this step, you'll configure your API key to authenticate your requests.

<Steps>
  <Step>
    <Tabs groupId="language">
      <Tab language="speechmatics" title="Speechmatics">
        Navigate to [API Keys](https://portal.speechmatics.com/settings/api-keys) in your account settings and copy your API key.
      </Tab>

      <Tab language="aai" title="AssemblyAI">
        Navigate to [API Keys](https://www.assemblyai.com/dashboard/api-keys) in your account dashboard and copy your API key.
      </Tab>
    </Tabs>
  </Step>

  <Step>
    <Tabs groupId="language">
      <Tab language="speechmatics" title="Speechmatics">
        Store your API key in a variable. Replace `<YOUR_API_KEY>` with your copied API key.

        ```python theme={null}
        import pyaudio
        import websocket
        import json
        import threading
        import time

        YOUR_API_KEY = "YOUR-API-KEY"
        ```
      </Tab>

      <Tab language="aai" title="AssemblyAI">
        Store your API key in a variable. Replace `<YOUR_API_KEY>` with your copied API key.

        ```python theme={null}
        import pyaudio
        import websocket
        import json
        import threading
        import time
        from urllib.parse import urlencode
        from datetime import datetime

        YOUR_API_KEY = "YOUR-API-KEY"
        ```
      </Tab>
    </Tabs>
  </Step>

  <Accordion title="Authenticate With A Temporary Token">
    <Tabs groupId="language">
      <Tab language="speechmatics" title="Speechmatics">
        ```python theme={null}
        import requests

        def generate_temp_token(api_key, ttl=60):
            """Generate a temporary authentication token that expires after the specified time."""
            url = "https://mp.speechmatics.com/v1/api_keys?type=rt"
            headers = {
                "Content-Type": "application/json",
                "Authorization": f"Bearer {api_key}"
            }
            payload = {
                "ttl": ttl
            }

            response = requests.post(url, json=payload, headers=headers)
            data = response.json()
            return data.get("key_value")
        ```

        <Note>
          **Token usage**

          Instead of authorizing your request with `YOUR_API_KEY` (via request header), use the temporary token generated by this function when establishing the WebSocket connection.

          ```python theme={null}
            API_ENDPOINT= f"wss://eu2.rt.speechmatics.com/v2?jwt={generate_temp_token(api_key)}"
            ws_app = websocket.WebSocketApp(
              API_ENDPOINT,
              on_open=on_open,
              on_message=on_message,
              on_error=on_error,
              on_close=on_close,
            )
          ```
        </Note>
      </Tab>

      <Tab language="aai" title="AssemblyAI">
        ```python theme={null}
        import requests
        from urllib.parse import urlencode

        def generate_temp_token(api_key, expires_in_seconds=60):
          """Generate a temporary authentication token that expires after the specified time."""
          url = "https://streaming.assemblyai.com/v3/token"

          response = requests.get(
              f"{url}?{urlencode({'expires_in_seconds': expires_in_seconds})}",
              headers={"Authorization": api_key}
          )
          data = response.json()
          return data.get("token")

        ```

        <Note>
          **Token usage**

          Instead of authorizing your request with `YOUR_API_KEY` (via request header), use the temporary token generated by this function when establishing the WebSocket connection.

          ```python theme={null}
            API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}&token={generate_temp_token(api_key)}"
            ws_app = websocket.WebSocketApp(
              API_ENDPOINT,
              on_open=on_open,
              on_message=on_message,
              on_error=on_error,
              on_close=on_close,
            )
          ```
        </Note>
      </Tab>
    </Tabs>
  </Accordion>
</Steps>

## Step 3: Set up audio configuration

<Steps>
  <Step>
    Configure the audio settings for your microphone stream.

    <Tabs groupId="language">
      <Tab language="speechmatics" title="Speechmatics">
        ```python expandable theme={null}
        import pyaudio

        # Audio Configuration
        FRAMES_PER_BUFFER = 1024  # Chunk size
        SAMPLE_RATE = None  # Will be set based on device capabilities
        CHANNELS = 1
        FORMAT = pyaudio.paFloat32  # Speechmatics uses float32 format

        # Global variables for audio stream and websocket
        audio = None
        stream = None
        ws_app = None
        audio_thread = None
        stop_event = threading.Event()  # To signal the audio thread to stop
        audio_seq_no = 0  # Track number of audio chunks sent

        def run():
          global audio, stream, ws_app, SAMPLE_RATE

          # Initialize PyAudio
          audio = pyaudio.PyAudio()

          # Get default input device (can alter to specify specific device)
          default_device = audio.get_default_input_device_info()
          device_index = default_device['index']
          SAMPLE_RATE = int(audio.get_device_info_by_index(device_index)['defaultSampleRate'])

          print(f"Using microphone: {default_device['name']}")

          # Open microphone stream
          try:
              stream = audio.open(
                  input=True,
                  frames_per_buffer=FRAMES_PER_BUFFER,
                  channels=CHANNELS,
                  format=FORMAT,
                  rate=SAMPLE_RATE,
                  input_device_index=device_index
              )
              print("Microphone stream opened successfully.")
              print("Speak into your microphone. Press Ctrl+C to stop.")
          except Exception as e:
              print(f"Error opening microphone stream: {e}")
              if audio:
                  audio.terminate()
              return  # Exit if microphone cannot be opened
        ```

        <Tip>
          **Sample rate**

          **Speechmatics** recommends using a `16 kHz` sample rate for speech audio. Anything higher will be downsampled server-side.
        </Tip>
      </Tab>

      <Tab language="aai" title="AssemblyAI">
        ```python expandable theme={null}
        import pyaudio

        CONNECTION_PARAMS = {
            "sample_rate": 16000,
            "speech_model": "u3-rt-pro",
        }

        # Audio Configuration

        FRAMES_PER_BUFFER = 800 # 50ms of audio (0.05s \* 16000Hz)
        SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
        CHANNELS = 1
        FORMAT = pyaudio.paInt16

        # Global variables for audio stream and websocket

        audio = None
        stream = None
        ws_app = None
        audio_thread = None
        stop_event = threading.Event() # To signal the audio thread to stop

        def run():
            global audio, stream, ws_app

            # Initialize PyAudio
            audio = pyaudio.PyAudio()

            # Open microphone stream
            try:
                stream = audio.open(
                    input=True,
                    frames_per_buffer=FRAMES_PER_BUFFER,
                    channels=CHANNELS,
                    format=FORMAT,
                    rate=SAMPLE_RATE,
                )
                print("Microphone stream opened successfully.")
                print("Speak into your microphone. Press Ctrl+C to stop.")
            except Exception as e:
                print(f"Error opening microphone stream: {e}")
                if audio:
                    audio.terminate()
                return  # Exit if microphone cannot be opened

        ```

        <Tip>
          **Sample rate**

          Using a sample rate of `16 kHz` and encoding of `pcm_s16le` is recommended, as our STT model operates at a `16 kHz` sample rate.
          If the incoming audio uses a different rate, we perform additional sampling rate conversion under the hood, which might marginally increase latency.
        </Tip>
      </Tab>
    </Tabs>

    <Note>
      **Audio data format**

      If you want to stream data from elsewhere, make sure that your audio data is in the following format:

      * Single-channel
      * PCM16 (default) or Mu-law encoding
      * A sample rate that matches the value of the `sample_rate` parameter (16 kHz is recommended)
      * 50 milliseconds of audio per message (larger chunk sizes are workable, but may result in latency fluctuations)
    </Note>
  </Step>
</Steps>

## Step 4: Create event handlers

In this step, you’ll set up callback functions that handle the different events.

<Steps>
  <Step>
    Create functions to handle the events from the real-time service.

    <Tabs groupId="language">
      <Tab language="speechmatics" title="Speechmatics">
        ```python expandable theme={null}
        import json

        def on_open(ws):
            """Called when the WebSocket connection is established."""
            print("WebSocket connection opened.")
            print(f"Connected to: {API_ENDPOINT}")

            # Send StartRecognition message
            start_message = {
                "message": "StartRecognition",
                "audio_format": {
                    "type": "raw",
                    "encoding": "pcm_f32le",
                    "sample_rate": SAMPLE_RATE
                },
                "transcription_config": {
                    "language": CONNECTION_PARAMS["language"],
                    "enable_partials": CONNECTION_PARAMS["enable_partials"],
                    "max_delay": CONNECTION_PARAMS["max_delay"]
                }
            }
            ws.send(json.dumps(start_message))

        def on_error(ws, error):
            """Called when a WebSocket error occurs."""
            print(f"\nWebSocket Error: {error}")
            # Attempt to signal stop on error
            stop_event.set()

        def on_close(ws, close_status_code, close_msg):
            """Called when the WebSocket connection is closed."""
            print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")
            # Ensure audio resources are released
            global stream, audio
            stop_event.set()  # Signal audio thread just in case it's still running

            if stream:
                if stream.is_active():
                    stream.stop_stream()
                stream.close()
                stream = None
            if audio:
                audio.terminate()
                audio = None
            # Try to join the audio thread to ensure clean exit
            if audio_thread and audio_thread.is_alive():
                audio_thread.join(timeout=1.0)
        ```
      </Tab>

      <Tab language="aai" title="AssemblyAI">
        ```python expandable theme={null}
        import threading

        def on_open(ws):
            """Called when the WebSocket connection is established."""
            print("WebSocket connection opened.")
            print(f"Connected to: {API_ENDPOINT}")

            # Start sending audio data in a separate thread
            def stream_audio():
                global stream
                print("Starting audio streaming...")
                while not stop_event.is_set():
                    try:
                        audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                        # Send audio data as binary message
                        ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
                    except Exception as e:
                        print(f"Error streaming audio: {e}")
                        # If stream read fails, likely means it's closed, stop the loop
                        break
                print("Audio streaming stopped.")

            global audio_thread
            audio_thread = threading.Thread(target=stream_audio)
            audio_thread.daemon = (
                True  # Allow main thread to exit even if this thread is running
            )
            audio_thread.start()

        def on_error(ws, error):
            """Called when a WebSocket error occurs."""
            print(f"\nWebSocket Error: {error}")  # Attempt to signal stop on error
            stop_event.set()

        def on_close(ws, close_status_code, close_msg):
            """Called when the WebSocket connection is closed."""
            print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")
            # Ensure audio resources are released
            global stream, audio
            stop_event.set()  # Signal audio thread just in case it's still running

            if stream:
                if stream.is_active():
                    stream.stop_stream()
                stream.close()
                stream = None
            if audio:
                audio.terminate()
                audio = None
            # Try to join the audio thread to ensure clean exit
            if audio_thread and audio_thread.is_alive():
                audio_thread.join(timeout=1.0)

        ```
      </Tab>
    </Tabs>

    <Note>
      **Connection configuration**

      **Speechmatics** requires a **handshake** where the connection configuration is specified before audio is streamed. **AssemblyAI** allows you to configure the connection via query parameters in the URL and start streaming audio immediately.

      The **Speechmatics** handshake begins when `on_open` sends a `StartRecognition` message to configure the session. Audio streaming only starts after the `RecognitionStarted` message type is parsed and confirmed in the `on_message` callback.
    </Note>
  </Step>

  <Step>
    Create another function to handle transcripts.

    **Speechmatics** has separate partial (`AddPartialTranscript`) and final (`AddTranscript`) transcripts. The terminate session message is `EndOfTranscript`.

    **AssemblyAI** instead uses a `Turn` object with an `end_of_turn` boolean flag to indicate finality. The terminate session message is `Termination`.
    For more on the Turn object, see **Streaming** [Core concepts](/streaming/getting-started/transcribe-streaming-audio) section.

    <Tabs groupId="language">
      <Tab language="speechmatics" title="Speechmatics">
        ```python expandable theme={null}
        def on_message(ws, message):
            global audio_seq_no

            try:
                data = json.loads(message)
                msg_type = data.get('message')

                if msg_type == "RecognitionStarted":
                    session_id = data.get('id')
                    print(f"\nSession began: ID={session_id}")

                    # Start sending audio data in a separate thread
                    def stream_audio():
                        global audio_seq_no, stream
                        print("Starting audio streaming...")
                        while not stop_event.is_set():
                            try:
                                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                                # Send audio data as binary message
                                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
                                audio_seq_no += 1
                            except Exception as e:
                                print(f"Error streaming audio: {e}")
                                # If stream read fails, likely means it's closed, stop the loop
                                break
                        print("Audio streaming stopped.")

                    global audio_thread
                    audio_thread = threading.Thread(target=stream_audio)
                    audio_thread.daemon = (
                        True  # Allow main thread to exit even if this thread is running
                    )
                    audio_thread.start()

                elif msg_type == "AddPartialTranscript":
                    transcript = data.get('metadata', {}).get('transcript', '')
                    if transcript:
                        print(f"\r{transcript}", end='')

                elif msg_type == "AddTranscript":
                    transcript = data.get('metadata', {}).get('transcript', '')
                    if transcript:
                        # Clear previous line for final messages
                        print('\r' + ' ' * 80 + '\r', end='')
                        print(transcript)

                elif msg_type == "EndOfTranscript":
                    print("\nSession Terminated: Transcription complete")

                elif msg_type == "Error":
                    error_type = data.get('type')
                    reason = data.get('reason')
                    print(f"\nWebSocket Error: {error_type} - {reason}")
                    stop_event.set()

            except json.JSONDecodeError as e:
                print(f"Error decoding message: {e}")
            except Exception as e:
                print(f"Error handling message: {e}")
        ```
      </Tab>

      <Tab language="aai" title="AssemblyAI">
        ```python expandable theme={null}
        import json
        from datetime import datetime

        def on_message(ws, message):
            try:
                data = json.loads(message)
                msg_type = data.get('type')

                if msg_type == "Begin":
                    session_id = data.get('id')
                    expires_at = data.get('expires_at')
                    print(f"\nSession began: ID={session_id}, ExpiresAt={datetime.fromtimestamp(expires_at)}")
                elif msg_type == "Turn":
                    transcript = data.get('transcript', '')
                    end_of_turn = data.get('end_of_turn', False)

                    # Print final end-of-turn transcript
                    if end_of_turn:
                        print('\r' + ' ' * 80 + '\r', end='')
                        print(transcript)
                    else:
                        print(f"\r{transcript}", end='')
                elif msg_type == "Termination":
                    audio_duration = data.get('audio_duration_seconds', 0)
                    session_duration = data.get('session_duration_seconds', 0)
                    print(f"\nSession Terminated: Audio Duration={audio_duration}s, Session Duration={session_duration}s")
            except json.JSONDecodeError as e:
                print(f"Error decoding message: {e}")
            except Exception as e:
                print(f"Error handling message: {e}")

        ```
      </Tab>
    </Tabs>
  </Step>

  <Note>
    **Transcript message structure**

    Please note the difference in transcript message structure below:

    ```python theme={null}
    # Speechmatics
    {
      "message": "AddPartialTranscript",
      "metadata": {
        "transcript": "hello world"
      },
      # Other transcript data...
    }

    # AssemblyAI
    {
      "type": "Turn",
      "transcript": "hello world",
      "end_of_turn": false,
      # Other transcript data...
    }
    ```
  </Note>
</Steps>

## Step 5: Connect and start transcription

<Steps>
  <Step>
    To stream audio, establish a connection to the API via [WebSockets](https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API).

    <Tabs groupId="language">
      <Tab language="speechmatics" title="Speechmatics">
        Create a WebSocket connection to the Realtime service.

        ```python theme={null}
        def run():
            global audio, stream, ws_app, SAMPLE_RATE
            # Skipping audio/microphone setup code...

            # Create WebSocketApp
            ws_app = websocket.WebSocketApp(
                API_ENDPOINT,
                header={"Authorization": f"Bearer {YOUR_API_KEY}"},  # Speechmatics uses Bearer token
                on_open=on_open,
                on_message=on_message,
                on_error=on_error,
                on_close=on_close,
            )

            # Run WebSocketApp in a separate thread to allow main thread to catch KeyboardInterrupt
            ws_thread = threading.Thread(target=lambda: ws_app.run_forever(ping_interval=30, ping_timeout=10))
            ws_thread.daemon = True
            ws_thread.start()

        ```
      </Tab>

      <Tab language="aai" title="AssemblyAI">
        Create and run a WebSocket connection to the Realtime service.

        ```python expandable theme={null}
        import websocket
        import threading

        def run():
            global audio, stream, ws_app
            # Skipping audio/microphone setup code...

            # Create WebSocketApp
            ws_app = websocket.WebSocketApp(
                API_ENDPOINT,
                header={"Authorization": YOUR_API_KEY},
                on_open=on_open,
                on_message=on_message,
                on_error=on_error,
                on_close=on_close,
            )

            # Run WebSocketApp in a separate thread to allow main thread to catch KeyboardInterrupt
            ws_thread = threading.Thread(target=ws_app.run_forever)
            ws_thread.daemon = True
            ws_thread.start()
        ```
      </Tab>
    </Tabs>

    <Note>
      \*\*Authorization \*\*

      Note that while both services use an `Authorization` header to authenticate
      the WebSocket connection, **Speechmatics** uses a `Bearer` prefix, while
      **AssemblyAI** does not.
    </Note>
  </Step>
</Steps>

## Step 6: Close the connection

<Steps>
  <Step>
    Keep the main thread alive until interrupted, handle keyboard interrupts and thrown exceptions, and clean up upon closing of the WebSocket connection.

    <Tabs groupId="language">
      <Tab language="speechmatics" title="Speechmatics">
        ```python expandable theme={null}
        def run():
            global audio, stream, ws_app, SAMPLE_RATE
            # Skipping audio/microphone setup and WebSocket connection code...

            try:
                # Keep main thread alive until interrupted
                while ws_thread.is_alive():
                    time.sleep(0.1)
            except KeyboardInterrupt:
                print("\nCtrl+C received. Stopping...")
                stop_event.set()  # Signal audio thread to stop

                # Send EndOfStream message to the server
                if ws_app and ws_app.sock and ws_app.sock.connected:
                    try:
                        end_message = {
                            "message": "EndOfStream",
                            "last_seq_no": audio_seq_no
                        }
                        print(f"Sending termination message: {json.dumps(end_message)}")
                        ws_app.send(json.dumps(end_message))
                        # Give a moment for messages to process before forceful close
                        time.sleep(1)
                    except Exception as e:
                        print(f"Error sending termination message: {e}")

                # Close the WebSocket connection (will trigger on_close)
                if ws_app:
                    ws_app.close()

                # Wait for WebSocket thread to finish
                ws_thread.join(timeout=2.0)

            except Exception as e:
                print(f"\nAn unexpected error occurred: {e}")
                stop_event.set()
                if ws_app:
                    ws_app.close()
                ws_thread.join(timeout=2.0)

            finally:
                # Final cleanup (already handled in on_close, but good as a fallback)
                if stream and stream.is_active():
                    stream.stop_stream()
                if stream:
                    stream.close()
                if audio:
                    audio.terminate()
                print("Cleanup complete. Exiting.")
        ```
      </Tab>

      <Tab language="aai" title="AssemblyAI">
        ```python expandable theme={null}
        def run():
            global audio, stream, ws_app
            # Skipping audio/microphone setup and WebSocket connection code...

            try:
                # Keep main thread alive until interrupted
                while ws_thread.is_alive():
                    time.sleep(0.1)
            except KeyboardInterrupt:
                print("\nCtrl+C received. Stopping...")
                stop_event.set()  # Signal audio thread to stop

                # Send termination message to the server
                if ws_app and ws_app.sock and ws_app.sock.connected:
                    try:
                        terminate_message = {"type": "Terminate"}
                        print(f"Sending termination message: {json.dumps(terminate_message)}")
                        ws_app.send(json.dumps(terminate_message))
                        # Give a moment for messages to process before forceful close
                        time.sleep(5)
                    except Exception as e:
                        print(f"Error sending termination message: {e}")

                # Close the WebSocket connection (will trigger on_close)
                if ws_app:
                    ws_app.close()

                # Wait for WebSocket thread to finish
                ws_thread.join(timeout=2.0)

            except Exception as e:
                print(f"\nAn unexpected error occurred: {e}")
                stop_event.set()
                if ws_app:
                    ws_app.close()
                ws_thread.join(timeout=2.0)

            finally:
                # Final cleanup (already handled in on_close, but good as a fallback)
                if stream and stream.is_active():
                    stream.stop_stream()
                if stream:
                    stream.close()
                if audio:
                    audio.terminate()
                print("Cleanup complete. Exiting.")

        ```
      </Tab>
    </Tabs>

    The connection will close automatically when you press `Ctrl+C`. In both cases, the `on_close` handler will clean up the audio resources.
  </Step>
</Steps>

## Step 7: Execute the main function

Finally, run the main function to start the main execution.

<Tabs groupId="language">
  <Tab language="speechmatics" title="Speechmatics">
    ```python theme={null}
    if __name__ == "__main__":
        run()
    ```
  </Tab>

  <Tab language="aai" title="AssemblyAI">
    ```python theme={null}
    if __name__ == "__main__":
        run()
    ```
  </Tab>
</Tabs>

## Next steps

To learn more about both Streaming APIs, their key differences, and how to best migrate, see the following resources:

**AssemblyAI**

* [Universal-3 Pro Streaming](/streaming/getting-started/transcribe-streaming-audio)
* [Streaming Speech-to-Text](/streaming/getting-started/transcribe-streaming-audio)
* [Streaming API reference](/api-reference/streaming-api/universal-streaming)

**Speechmatics**

* [Transcribe in Real-Time](https://docs.speechmatics.com/introduction/rt-guide)
* [Using Microphone Input](https://docs.speechmatics.com/tutorials/using-mic)
* [Real-Time API Reference](https://docs.speechmatics.com/rt-api-ref)

## Need some help?

If you get stuck or have any other questions, contact our support team at `support@assemblyai.com` or create a [support ticket](https://www.assemblyai.com/contact/support).
