> ## Documentation Index
> Fetch the complete documentation index at: https://assemblyai.com/docs/llms.txt
> Use this file to discover all available pages before exploring further.

# Terminate Streaming Session After Inactivity

An often-overlooked aspect of implementing AssemblyAI's Streaming Speech-to-Text (STT) service is efficiently terminating transcription sessions. In this cookbook, you will learn how to terminate a Streaming session after any fixed duration of silence.

For the full code, refer to this [GitHub gist.](https://gist.github.com/m-ods/133f7c7e4c08abf97ae53dc7577cadf7)

## Quickstart

```python expandable theme={null}
import pyaudio
import websocket
import json
import os
import threading
import time
from urllib.parse import urlencode
from datetime import datetime

# --- Configuration ---
ASSEMBLYAI_API_KEY = os.environ["ASSEMBLYAI_API_KEY"]

CONNECTION_PARAMS = {
    "speech_model": "u3-rt-pro",
    "sample_rate": 16000,
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

# Audio Configuration
FRAMES_PER_BUFFER = 800  # 50ms of audio (0.05s * 16000Hz)
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHANNELS = 1
FORMAT = pyaudio.paInt16

# Global variables for audio stream and websocket
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()

# Silence tracking
last_transcript_received = datetime.now()
terminated = False

# --- WebSocket Event Handlers ---

def on_open(ws):
    """Called when the WebSocket connection is established."""
    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    def stream_audio():
        global stream
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                print(f"Error streaming audio: {e}")
                break
        print("Audio streaming stopped.")

    global audio_thread
    audio_thread = threading.Thread(target=stream_audio)
    audio_thread.daemon = True
    audio_thread.start()

def on_message(ws, message):
    global last_transcript_received, terminated

    try:
        data = json.loads(message)
        msg_type = data.get('type')

        if terminated and msg_type != "Termination":
            return

        if msg_type == "Begin":
            session_id = data.get('id')
            expires_at = data.get('expires_at')
            print(f"\nSession began: ID={session_id}, ExpiresAt={datetime.fromtimestamp(expires_at)}")
        elif msg_type == "Turn":
            transcript = data.get('transcript', '')
            if data.get('end_of_turn'):
                print('\r' + ' ' * 80 + '\r', end='')
                print(transcript)
            else:
                print(f"\r{transcript}", end='')

            # Update timestamp if meaningful speech received
            if transcript.strip():
                last_transcript_received = datetime.now()

            # Check for silence timeout
            silence_duration = (datetime.now() - last_transcript_received).total_seconds()
            if silence_duration > 5:
                print("No transcription received in 5 seconds. Terminating session...")
                try:
                    ws.send(json.dumps({"type": "Terminate"}))
                except Exception:
                    pass
                terminated = True
                stop_event.set()
                return

        elif msg_type == "Termination":
            audio_duration = data.get('audio_duration_seconds', 0)
            session_duration = data.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio Duration={audio_duration}s, Session Duration={session_duration}s")
    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")

def on_error(ws, error):
    """Called when a WebSocket error occurs."""
    print(f"\nWebSocket Error: {error}")
    stop_event.set()


def on_close(ws, close_status_code, close_msg):
    """Called when the WebSocket connection is closed."""
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    global stream, audio
    stop_event.set()

    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)

# --- Main Execution ---
def run():
    global audio, stream, ws_app

    # Initialize PyAudio
    audio = pyaudio.PyAudio()

    # Open microphone stream
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("Microphone stream opened successfully.")
        print("Speak into your microphone. Press Ctrl+C to stop.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        if audio:
            audio.terminate()
        return

    # Create WebSocketApp
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": ASSEMBLYAI_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run WebSocketApp in a separate thread to allow main thread to catch KeyboardInterrupt
    ws_thread = threading.Thread(target=ws_app.run_forever)
    ws_thread.daemon = True
    ws_thread.start()

    try:
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()

        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                print(f"Sending termination message: {json.dumps(terminate_message)}")
                ws_app.send(json.dumps(terminate_message))
                time.sleep(5)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        if ws_app:
            ws_app.close()

        ws_thread.join(timeout=2.0)

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    finally:
        if stream and stream.is_active():
            stream.stop_stream()
        if stream:
            stream.close()
        if audio:
            audio.terminate()
        print("Cleanup complete. Exiting.")


if __name__ == "__main__":
    run()
```

## Get Started

Before we begin, make sure you have an AssemblyAI account and an API key. You can [sign up for an AssemblyAI account](https://www.assemblyai.com/app) and get your API key from your dashboard.

## Step-by-step instructions

First, install the required packages.

```bash theme={null}
pip install websocket-client pyaudio
```

Import packages and set your API key.

```python theme={null}
import pyaudio
import websocket
import json
import os
import threading
import time
from urllib.parse import urlencode
from datetime import datetime

ASSEMBLYAI_API_KEY = os.environ["ASSEMBLYAI_API_KEY"]
```

### Audio configuration and global variables

```python theme={null}
CONNECTION_PARAMS = {
    "speech_model": "u3-rt-pro",
    "sample_rate": 16000,
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

# Audio Configuration
FRAMES_PER_BUFFER = 800  # 50ms of audio (0.05s * 16000Hz)
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHANNELS = 1
FORMAT = pyaudio.paInt16

# Global variables for audio stream and websocket
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()
```

## Implementing Speech Activity Checks

Our Streaming API emits a Turn Event each time speech is processed. You can use this behavior to detect inactivity and automatically terminate the session.

We track the timestamp of the most recent non-empty transcript. On every Turn Event, we:

* Update the timestamp if meaningful speech is received

* Check how many seconds have passed since the last valid transcript

* If that exceeds your timeout (e.g. 5 seconds), terminate the session

### Key Variables

```python theme={null}
last_transcript_received = datetime.now()
terminated = False
```

These are updated on every Turn event.

### WebSocket event handlers

#### Open WebSocket

```python expandable theme={null}
def on_open(ws):
    """Called when the WebSocket connection is established."""
    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    def stream_audio():
        global stream
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                print(f"Error streaming audio: {e}")
                break
        print("Audio streaming stopped.")

    global audio_thread
    audio_thread = threading.Thread(target=stream_audio)
    audio_thread.daemon = True
    audio_thread.start()
```

#### Handle WebSocket messages with silence detection

```python expandable theme={null}
def on_message(ws, message):
    global last_transcript_received, terminated

    try:
        data = json.loads(message)
        msg_type = data.get('type')

        if terminated and msg_type != "Termination":
            return

        if msg_type == "Begin":
            session_id = data.get('id')
            expires_at = data.get('expires_at')
            print(f"\nSession began: ID={session_id}, ExpiresAt={datetime.fromtimestamp(expires_at)}")
        elif msg_type == "Turn":
            transcript = data.get('transcript', '')
            if data.get('end_of_turn'):
                print('\r' + ' ' * 80 + '\r', end='')
                print(transcript)
            else:
                print(f"\r{transcript}", end='')

            # Update timestamp if meaningful speech received
            if transcript.strip():
                last_transcript_received = datetime.now()

            # Check for silence timeout
            silence_duration = (datetime.now() - last_transcript_received).total_seconds()
            if silence_duration > 5:
                print("No transcription received in 5 seconds. Terminating session...")
                try:
                    ws.send(json.dumps({"type": "Terminate"}))
                except Exception:
                    pass
                terminated = True
                stop_event.set()
                return

        elif msg_type == "Termination":
            audio_duration = data.get('audio_duration_seconds', 0)
            session_duration = data.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio Duration={audio_duration}s, Session Duration={session_duration}s")
    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")
```

This pattern ensures sessions are cleanly terminated after inactivity.

#### WebSocket error and close handlers

```python expandable theme={null}
def on_error(ws, error):
    """Called when a WebSocket error occurs."""
    print(f"\nWebSocket Error: {error}")
    stop_event.set()


def on_close(ws, close_status_code, close_msg):
    """Called when the WebSocket connection is closed."""
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    global stream, audio
    stop_event.set()

    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)
```

### Begin real-time STT transcription

```python expandable theme={null}
def run():
    global audio, stream, ws_app

    audio = pyaudio.PyAudio()

    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("Microphone stream opened successfully.")
        print("Speak into your microphone. Press Ctrl+C to stop.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        if audio:
            audio.terminate()
        return

    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": ASSEMBLYAI_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    ws_thread = threading.Thread(target=ws_app.run_forever)
    ws_thread.daemon = True
    ws_thread.start()

    try:
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()

        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                print(f"Sending termination message: {json.dumps(terminate_message)}")
                ws_app.send(json.dumps(terminate_message))
                time.sleep(5)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        if ws_app:
            ws_app.close()

        ws_thread.join(timeout=2.0)

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    finally:
        if stream and stream.is_active():
            stream.stop_stream()
        if stream:
            stream.close()
        if audio:
            audio.terminate()
        print("Cleanup complete. Exiting.")


if __name__ == "__main__":
    run()
```

## What You'll Observe

* Live transcription continues as long as there's speech

* After 5 seconds of silence, the session ends automatically

You can change the timeout value to suit your needs by modifying the `silence_duration > 5` check.
