Build & Learn
August 11, 2025

Transcribe phone calls in real-time in Go with Twilio and AssemblyAI

Learn how to build a real-time phone call transcription system in Go using Twilio MediaStreams and AssemblyAI's Universal-Streaming Speech-to-Text API.

Marcus Olsson
Senior Developer Educator
Reviewed by
Ryan O'Connor
Senior Developer Educator

In this tutorial, you'll build a server application in Go that transcribes an incoming Twilio phone call into text in real-time using direct API calls to AssemblyAI's Universal-Streaming API.

You'll use a Twilio MediaStream to stream the voice data to your local server application. You'll pass the voice data to AssemblyAI's Universal-Streaming Speech-to-Text API to transcribe the call into text, and then print it in your terminal in real-time.

This tutorial walks you through the code, step-by-step, but you can also find the complete source code at the end of this page or on the GitHub repo here.

Before you get started

To complete this tutorial, you'll need:

Step 1: Set up ngrok

Twilio will need to access your server through a publicly available URL. In this tutorial, you'll use ngrok to create a publicly available URL for an application running on your local computer.

If you already have a preferred way to expose your application publicly, you may skip this step.

ngrok config add-authtoken <YOUR_TOKEN>

Open an ngrok tunnel for port `8080`. ngrok will only tunnel connections while the following command is running:

ngrok http 8080

You'll see something similar to the output below, where the URL next to Forwarding is the publicly available URL that forwards to your local 8080 port (https://84c5df474.ngrok-free.dev in the example output).

ngrok (Ctrl+C to quit)

Session Status                online
Account                       inconshreveable (Plan: Free)
Version                       3.0.0
Region                        United States (us)
Latency                       78ms
Web Interface                 http://127.0.0.1:4040
Forwarding                    https://84c5df474.ngrok-free.dev -> http://localhost:8080

Connections                   ttl     opn     rt1     rt5     p50     p90
                              0       0       0.00    0.00    0.00    0.00

Copy the `Forwarding` URL in your terminal output and save it for the next step.

Step 2: Set up Twilio and open an ngrok tunnel

You'll need to register a phone number with Twilio and configure it to call your server application whenever someone calls that number.

  • Sign up for a Twilio account
  • To get a Twilio number, go to your Twilio console and navigate to `Phone Numbers > Manage > Buy a number`. There, you'll see a list of numbers you can purchase for a small monthly fee. Select one and click `Buy`. You only need Voice capabilities for this tutorial.

Navigate to `Phone Numbers > Manage > Active numbers` and select the phone number you bought above. In the `Voice Configuration` section, set a `Webhook` for when a call comes in: paste the ngrok URL you copied in the previous step under URL and set the HTTP method to `POST`.

Then scroll down and click `Save Configuration` to save this change.

You have now configured your Twilio number to send a POST request to the ngrok URL when your number is called, and opened a tunnel that forwards this ngrok URL to port `8080` on your local machine.

With Twilio configured, it's time to build the Go application that responds to the incoming call and accepts the WebSocket stream carrying the caller's speech.

Step 3: Create a WebSocket server for Twilio media streams

In this step, you'll set up the Go server application to accept the Twilio MediaStream and connect to AssemblyAI's Universal-Streaming Speech-to-Text API.

Create and navigate into a new project directory:

mkdir realtime-transcription-go
cd realtime-transcription-go

Initialize your Go module:

go mod init realtime-transcription-go

Install the required dependencies:

go get github.com/gorilla/websocket
go get github.com/joho/godotenv

Create a new file called main.go with the following content:

package main

import (
    "context"
    "encoding/base64"
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "net/url"
    "os"
    "strings"
    "sync"
    "time"

    "github.com/gorilla/websocket"
    "github.com/joho/godotenv"
)

var (
    assemblyAIAPIKey = os.Getenv("ASSEMBLYAI_API_KEY")
    upgrader         = websocket.Upgrader{
        CheckOrigin: func(r *http.Request) bool {
            return true
        },
    }
)

func main() {
    // Load .env file
    err := godotenv.Load()
    if err != nil {
        log.Println("No .env file found")
    }

    assemblyAIAPIKey = os.Getenv("ASSEMBLYAI_API_KEY")

    if assemblyAIAPIKey == "" {
        log.Fatal("ASSEMBLYAI_API_KEY environment variable is required")
    }

    http.HandleFunc("/", twilio)
    http.HandleFunc("/media", media)
    
    log.Println("Server is running on port 8080")
    if err := http.ListenAndServe(":8080", nil); err != nil {
        log.Fatal(err)
    }
}

func twilio(w http.ResponseWriter, r *http.Request) {
    // Respond to Twilio request and initiate a Twilio MediaStream
}

func media(w http.ResponseWriter, r *http.Request) {
    // Serve the incoming WebSocket connection from Twilio
}

Create a `.env` file in your project directory and add your AssemblyAI API key:

ASSEMBLYAI_API_KEY=your_api_key_here

Step 4: Define message structures

AssemblyAI's Universal-Streaming API uses specific message formats. Add these struct definitions to handle the different message types:

type TwilioMessage struct {
    Event string `json:"event"`
    Media struct {
        Payload string `json:"payload"`
    } `json:"media"`
}

// Universal-Streaming message types
type BaseMessage struct {
    Type string `json:"type"`
}

type BeginMessage struct {
    Type      string  `json:"type"`
    ID        string  `json:"id"`
    ExpiresAt float64 `json:"expires_at"`
}

type TurnMessage struct {
    Type                string  `json:"type"`
    TurnOrder           int     `json:"turn_order"`
    TurnIsFormatted     bool    `json:"turn_is_formatted"`
    EndOfTurn           bool    `json:"end_of_turn"`
    Transcript          string  `json:"transcript"`
    EndOfTurnConfidence float64 `json:"end_of_turn_confidence"`
    Words               []Word  `json:"words"`
}

type Word struct {
    Text        string  `json:"text"`
    Start       int     `json:"start"`
    End         int     `json:"end"`
    Confidence  float64 `json:"confidence"`
    WordIsFinal bool    `json:"word_is_final"`
}

type TerminationMessage struct {
    Type                   string `json:"type"`
    AudioDurationSeconds   int    `json:"audio_duration_seconds"`
    SessionDurationSeconds int    `json:"session_duration_seconds"`
}

type TerminateMessage struct {
    Type string `json:"type"`
}
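
Because every Universal-Streaming message carries a `type` field, one common way to use these structs is to decode twice: first into `BaseMessage` to read the type, then into the concrete struct. The sketch below shows that pattern in isolation. The sample JSON is hand-written for illustration (not captured API output), the `TurnMessage` here is trimmed to three fields, and `describe` is a hypothetical helper name.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type BaseMessage struct {
	Type string `json:"type"`
}

// Trimmed-down version of the tutorial's TurnMessage.
type TurnMessage struct {
	Type       string `json:"type"`
	EndOfTurn  bool   `json:"end_of_turn"`
	Transcript string `json:"transcript"`
}

// describe (hypothetical helper) peeks at the "type" field first and
// decodes the full payload only for the types it understands.
func describe(data []byte) (string, error) {
	var base BaseMessage
	if err := json.Unmarshal(data, &base); err != nil {
		return "", fmt.Errorf("decode type: %w", err)
	}
	switch base.Type {
	case "Turn":
		var turn TurnMessage
		if err := json.Unmarshal(data, &turn); err != nil {
			return "", fmt.Errorf("decode turn: %w", err)
		}
		return fmt.Sprintf("turn (final=%t): %s", turn.EndOfTurn, turn.Transcript), nil
	default:
		return "other: " + base.Type, nil
	}
}

func main() {
	// Illustrative payload, not captured API output.
	sample := []byte(`{"type":"Turn","end_of_turn":true,"transcript":"hello"}`)
	out, err := describe(sample)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(out)
}
```

Decoding twice costs a little extra parsing, but it keeps each message struct simple and avoids one large struct with every possible field.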

Step 5: Implement TwiML response handler

When someone calls the phone number, Twilio makes an HTTP request to an endpoint where you define how to respond. You'll use TwiML to define the instructions that tell Twilio what to do when you receive an incoming call.

Update your twilio function:

func twilio(w http.ResponseWriter, r *http.Request) {
    log.Printf("Received %s request to / endpoint from %s", r.Method, r.RemoteAddr)


    if r.Method != "POST" && r.Method != "GET" {
        log.Printf("Method not allowed: %s", r.Method)
        w.WriteHeader(http.StatusMethodNotAllowed)
        return
    }


    host := r.Host
    wsProtocol := "wss"


    twiML := `<?xml version="1.0" encoding="UTF-8"?>
<Response>
    <Say>Now connecting to the streaming service.</Say>
    <Connect>
        <Stream url='%s://%s/media' />
    </Connect>
</Response>`


    w.Header().Set("Content-Type", "application/xml")
    responseXML := fmt.Sprintf(twiML, wsProtocol, host)
    log.Printf("Sending TwiML response with stream URL: %s://%s/media", wsProtocol, host)
    fmt.Fprint(w, responseXML)
}

The TwiML instructs Twilio to:

  1. Say a greeting message to the caller
  2. Connect to a WebSocket stream at the /media endpoint
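
If you want to check the TwiML without placing a call, one option is to drive a handler in-process with `net/http/httptest`. The sketch below is a simplified stand-in for the `twilio` function above: `streamTwiML`, `handler`, and `respond` are hypothetical names, and the host is a placeholder.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// streamTwiML mirrors the tutorial's template: greet the caller,
// then stream the call audio to wss://<host>/media.
func streamTwiML(host string) string {
	return fmt.Sprintf(`<?xml version="1.0" encoding="UTF-8"?>
<Response>
    <Say>Now connecting to the streaming service.</Say>
    <Connect>
        <Stream url='wss://%s/media' />
    </Connect>
</Response>`, host)
}

func handler(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/xml")
	fmt.Fprint(w, streamTwiML(r.Host))
}

// respond (hypothetical helper) sends a fake POST to the handler
// and returns the recorded content type and body.
func respond(host string) (contentType, body string) {
	req := httptest.NewRequest(http.MethodPost, "http://"+host+"/", nil)
	rec := httptest.NewRecorder()
	handler(rec, req)
	return rec.Header().Get("Content-Type"), rec.Body.String()
}

func main() {
	ct, body := respond("example.ngrok-free.app") // placeholder host
	fmt.Println(ct)
	fmt.Println(body)
}
```

The stream URL is built from the request's `Host` header, which is why the TwiML points back at whichever ngrok URL Twilio used to reach your server.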

Step 6: Create audio buffering system

Since Twilio sends very small audio chunks (around 20ms each) but AssemblyAI's Universal-Streaming API requires chunks between 50-1000ms, we need to buffer the audio. Add this audio buffer implementation:

const (
    MIN_BUFFER_SIZE = 320 // 40ms of 8 kHz μ-law audio (2 Twilio chunks); enables the timed flush
    MAX_BUFFER_SIZE = 400 // 50ms of audio; reaching this size triggers an immediate flush
)


type EnhancedAudioBuffer struct {
    buffer      []byte
    mutex       sync.Mutex
    minSize     int
    maxSize     int
    conn        *websocket.Conn
    isActive    bool
    connMutex   sync.RWMutex
    lastFlush   time.Time
    flushTimer  *time.Timer
    totalBytes  int64
    totalChunks int64
}


func NewEnhancedAudioBuffer(conn *websocket.Conn) *EnhancedAudioBuffer {
    return &EnhancedAudioBuffer{
        buffer:    make([]byte, 0),
        minSize:   MIN_BUFFER_SIZE,
        maxSize:   MAX_BUFFER_SIZE,
        conn:      conn,
        isActive:  true,
        lastFlush: time.Now(),
    }
}


func (eab *EnhancedAudioBuffer) AddAudio(audioData []byte) error {
    eab.mutex.Lock()
    defer eab.mutex.Unlock()


    if !eab.isConnectionActive() {
        return fmt.Errorf("connection is not active")
    }


    eab.buffer = append(eab.buffer, audioData...)
    eab.totalBytes += int64(len(audioData))
    eab.totalChunks++


    // Reset flush timer if it exists
    if eab.flushTimer != nil {
        eab.flushTimer.Stop()
    }


    // Immediate flush if buffer is at max size
    if len(eab.buffer) >= eab.maxSize {
        return eab.flushBuffer()
    }


    // Set timer to flush after 100ms if we have minimum data
    if len(eab.buffer) >= eab.minSize {
        eab.flushTimer = time.AfterFunc(100*time.Millisecond, func() {
            eab.mutex.Lock()
            defer eab.mutex.Unlock()
            if len(eab.buffer) > 0 {
                eab.flushBuffer()
            }
        })
    }


    return nil
}


func (eab *EnhancedAudioBuffer) flushBuffer() error {
    if len(eab.buffer) == 0 {
        return nil
    }


    if !eab.isConnectionActive() {
        return fmt.Errorf("connection is not active")
    }


    // Send the buffered audio
    bufferedAudio := make([]byte, len(eab.buffer))
    copy(bufferedAudio, eab.buffer)
    bufferSize := len(eab.buffer)


    // Clear the buffer before sending
    eab.buffer = eab.buffer[:0]
    eab.lastFlush = time.Now()


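    // Send while the caller still holds eab.mutex, so writes to the connection stay serialized
    eab.connMutex.RLock()
    conn, active := eab.conn, eab.isActive
    eab.connMutex.RUnlock()


    if conn != nil && active {
        err := conn.WriteMessage(websocket.BinaryMessage, bufferedAudio)
        if err != nil {
            log.Printf("Error sending %d bytes to AssemblyAI: %v", bufferSize, err)
            // The read lock is released above; taking the write lock while holding it would deadlock
            eab.setInactive()
            return err
        }
    }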


    return nil
}


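// FlushRemaining stops any pending timer and sends whatever audio is still buffered.
// The media handler in Step 11 calls it during cleanup.
func (eab *EnhancedAudioBuffer) FlushRemaining() error {
    eab.mutex.Lock()
    defer eab.mutex.Unlock()


    // Stop any pending timer
    if eab.flushTimer != nil {
        eab.flushTimer.Stop()
    }


    return eab.flushBuffer()
}


func (eab *EnhancedAudioBuffer) setInactive() {
    eab.connMutex.Lock()
    defer eab.connMutex.Unlock()
    eab.isActive = false
}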


func (eab *EnhancedAudioBuffer) isConnectionActive() bool {
    eab.connMutex.RLock()
    defer eab.connMutex.RUnlock()
    return eab.isActive && eab.conn != nil
}

Step 7: Create transcript display system

To handle clean display of partial and final transcripts, add this display manager:

type TranscriptDisplay struct {
    currentPartial     string
    mutex              sync.Mutex
    currentLinePrinted bool
}


func NewTranscriptDisplay() *TranscriptDisplay {
    return &TranscriptDisplay{}
}


func (td *TranscriptDisplay) UpdatePartial(text string) {
    td.mutex.Lock()
    defer td.mutex.Unlock()


    // Only update if text is not empty and different from current
    if strings.TrimSpace(text) == "" || text == td.currentPartial {
        return
    }


    td.currentPartial = text
    td.displayPartial()
}


func (td *TranscriptDisplay) AddFinal(text string) {
    td.mutex.Lock()
    defer td.mutex.Unlock()


    // Clear any partial text first
    if td.currentPartial != "" {
        fmt.Printf("\r%s\r", strings.Repeat(" ", len(td.currentPartial)+20))
    }


    // Print the final text on a new line
    if strings.TrimSpace(text) != "" {
        fmt.Printf("%s\n", text)
    }


    // Reset for next utterance
    td.currentPartial = ""
    td.currentLinePrinted = false
}


func (td *TranscriptDisplay) displayPartial() {
    // Clear the current line and redraw
    fmt.Printf("\r%s\r", strings.Repeat(" ", 100))
    if td.currentPartial != "" {
        fmt.Printf("%s", td.currentPartial)
        td.currentLinePrinted = true
    }
}


func (td *TranscriptDisplay) Clear() {
    td.mutex.Lock()
    defer td.mutex.Unlock()


    if td.currentLinePrinted {
        fmt.Printf("\r%s\r", strings.Repeat(" ", 100))
    }
    td.currentPartial = ""
    td.currentLinePrinted = false
}

Step 8: Connect to AssemblyAI Universal-Streaming

Now implement the WebSocket connection to AssemblyAI's Universal-Streaming Speech-to-Text API:

func connectToAssemblyAI() (*websocket.Conn, error) {
    // Build URL with query parameters for v3
    params := url.Values{}
    params.Add("sample_rate", "8000")
    params.Add("encoding", "pcm_mulaw")
    params.Add("format_turns", "true")


    // Optional: Configure turn detection parameters
    params.Add("end_of_turn_confidence_threshold", "0.7")
    params.Add("min_end_of_turn_silence_when_confident", "160")
    params.Add("max_turn_silence", "2400")


    wsURL := fmt.Sprintf("wss://streaming.assemblyai.com/v3/ws?%s", params.Encode())


    // Create WebSocket connection to AssemblyAI v3
    headers := http.Header{}
    headers.Add("Authorization", assemblyAIAPIKey)


    conn, _, err := websocket.DefaultDialer.Dial(wsURL, headers)
    if err != nil {
        return nil, fmt.Errorf("failed to connect to AssemblyAI: %w", err)
    }


    log.Println("Connected to AssemblyAI Universal-Streaming")
    return conn, nil
}

This function:

  • Builds the WebSocket URL with the required parameters for Universal-Streaming (v3)
  • Sets the sample rate to 8000 Hz (Twilio's format)
  • Specifies PCM μ-law encoding (Twilio's audio format)
  • Enables formatted turns for better transcript output
  • Configures turn detection parameters for natural conversation flow
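
To see what the query string looks like, the following sketch builds the URL with only the three core parameters from the function above. `streamingURL` is a hypothetical helper, not part of any SDK. Note that `url.Values.Encode` sorts parameters by key, so the order in the URL differs from the order in which they were added.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// streamingURL (hypothetical helper) assembles the WebSocket URL
// from the audio settings used in this tutorial.
func streamingURL(sampleRate int, encoding string, formatTurns bool) string {
	params := url.Values{}
	params.Set("sample_rate", strconv.Itoa(sampleRate))
	params.Set("encoding", encoding)
	params.Set("format_turns", strconv.FormatBool(formatTurns))
	// Encode sorts by key and percent-escapes values where needed.
	return "wss://streaming.assemblyai.com/v3/ws?" + params.Encode()
}

func main() {
	fmt.Println(streamingURL(8000, "pcm_mulaw", true))
	// wss://streaming.assemblyai.com/v3/ws?encoding=pcm_mulaw&format_turns=true&sample_rate=8000
}
```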

Step 9: Handle audio forwarding

Add the function to forward audio from Twilio to AssemblyAI:

func forwardAudioToAssemblyAI(audioBuffer *EnhancedAudioBuffer, payload string) error {
    // Decode base64 payload from Twilio
    audioData, err := base64.StdEncoding.DecodeString(payload)
    if err != nil {
        return fmt.Errorf("failed to decode audio data: %w", err)
    }


    // Validate audio chunk size
    if len(audioData) == 0 {
        return nil // Don't send empty chunks
    }


    if len(audioData) > 8000 { // More than 1 second of 8kHz audio
        log.Printf("Warning: audio chunk too large: %d bytes", len(audioData))
    }


    // Add to buffer instead of sending directly
    return audioBuffer.AddAudio(audioData)
}
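
The payload that reaches this function arrives inside a JSON `media` event, with the audio base64-encoded. The sketch below shows the full path from raw JSON to audio bytes. The message is hand-written for illustration and contains only the fields the tutorial's `TwilioMessage` struct reads; `extractAudio` is a hypothetical helper.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

type TwilioMessage struct {
	Event string `json:"event"`
	Media struct {
		Payload string `json:"payload"`
	} `json:"media"`
}

// extractAudio (hypothetical helper) returns the decoded audio bytes
// of a "media" event, or nil for any other event type.
func extractAudio(raw []byte) ([]byte, error) {
	var msg TwilioMessage
	if err := json.Unmarshal(raw, &msg); err != nil {
		return nil, fmt.Errorf("decode message: %w", err)
	}
	if msg.Event != "media" {
		return nil, nil
	}
	audio, err := base64.StdEncoding.DecodeString(msg.Media.Payload)
	if err != nil {
		return nil, fmt.Errorf("decode payload: %w", err)
	}
	return audio, nil
}

func main() {
	// Hand-written sample: "/////w==" is base64 for four 0xFF bytes.
	raw := []byte(`{"event":"media","media":{"payload":"/////w=="}}`)
	audio, err := extractAudio(raw)
	fmt.Println(len(audio), err) // 4 <nil>
}
```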

Step 10: Handle AssemblyAI responses

Implement the function to process responses from AssemblyAI:

func handleAssemblyAIResponses(ctx context.Context, conn *websocket.Conn,
    display *TranscriptDisplay, sessionMgr *SessionManager) {


    defer display.Clear()


    for {
        select {
        case <-ctx.Done():
            return
        default:
            _, data, err := conn.ReadMessage()
            if err != nil {
                log.Printf("Error reading from AssemblyAI: %v", err)
                sessionMgr.SetInactive()
                return
            }


            sessionMgr.UpdateLastMessage()


            // First parse to get message type
            var baseMsg BaseMessage
            if err := json.Unmarshal(data, &baseMsg); err != nil {
                log.Printf("Error unmarshaling base message: %v", err)
                continue
            }


            switch baseMsg.Type {
            case "Begin":
                var msg BeginMessage
                if err := json.Unmarshal(data, &msg); err != nil {
                    log.Printf("Error unmarshaling Begin message: %v", err)
                    continue
                }
                sessionMgr.SetSessionStarted(msg.ID)
                expiresTime := time.Unix(int64(msg.ExpiresAt), 0)
                log.Printf("Session started - ID: %s, Expires: %s",
                    msg.ID, expiresTime.Format(time.RFC3339))


            case "Turn":
                var msg TurnMessage
                if err := json.Unmarshal(data, &msg); err != nil {
                    log.Printf("Error unmarshaling Turn message: %v", err)
                    continue
                }


                // Handle transcript display
                if strings.TrimSpace(msg.Transcript) != "" {
                    if msg.TurnIsFormatted {
                        // Final formatted transcript
                        display.AddFinal(msg.Transcript)
                    } else if msg.EndOfTurn {
                        // Final but unformatted - wait for formatted version
                        // Don't display anything yet
                    } else {
                        // Partial transcript
                        display.UpdatePartial(msg.Transcript)
                    }
                }


            case "Termination":
                var msg TerminationMessage
                if err := json.Unmarshal(data, &msg); err != nil {
                    log.Printf("Error unmarshaling Termination message: %v", err)
                    continue
                }
                log.Printf("\nSession terminated - Audio: %ds, Session: %ds",
                    msg.AudioDurationSeconds, msg.SessionDurationSeconds)
                sessionMgr.SetInactive()
                return
            }
        }
    }
}

This function handles three types of messages from AssemblyAI:

  • Begin: Session establishment confirmation
  • Turn: Transcript data (partial and final)
  • Termination: Session end notification

The transcript handling logic ensures a smooth user experience by displaying partial transcripts in real-time and replacing them with final formatted versions.
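
The branching on `turn_is_formatted` and `end_of_turn` can be easier to follow as a small pure function. The sketch below restates the handler's decision in that form. `action` and its return strings are hypothetical, and the struct is trimmed to the three fields involved.

```go
package main

import (
	"fmt"
	"strings"
)

// Trimmed-down version of the tutorial's TurnMessage.
type TurnMessage struct {
	TurnIsFormatted bool
	EndOfTurn       bool
	Transcript      string
}

// action (hypothetical helper) names what the handler does with a Turn message.
func action(msg TurnMessage) string {
	switch {
	case strings.TrimSpace(msg.Transcript) == "":
		return "ignore"
	case msg.TurnIsFormatted:
		return "print final"
	case msg.EndOfTurn:
		return "wait for formatted"
	default:
		return "update partial"
	}
}

func main() {
	fmt.Println(action(TurnMessage{Transcript: "hello wor"}))
	fmt.Println(action(TurnMessage{Transcript: "hello world", EndOfTurn: true}))
	fmt.Println(action(TurnMessage{Transcript: "Hello world.", EndOfTurn: true, TurnIsFormatted: true}))
}
```

Run as written, this prints "update partial", "wait for formatted", and "print final", matching the three stages a turn passes through when `format_turns` is enabled.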

Step 11: Implement the media WebSocket handler

Now implement the complete media function that ties everything together:

func media(w http.ResponseWriter, r *http.Request) {
    log.Printf("Received request to /media endpoint - Method: %s", r.Method)

    if r.Method != "GET" {
        log.Printf("Wrong method for WebSocket upgrade: %s", r.Method)
        return
    }

    // Upgrade HTTP request to WebSocket for Twilio connection
    twilioConn, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        log.Println("unable to upgrade connection to websocket:", err)
        return
    }
    defer twilioConn.Close()
    log.Println("Successfully upgraded to WebSocket connection")

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Create session manager and display
    sessionMgr := NewSessionManager()
    display := NewTranscriptDisplay()
    defer sessionMgr.Close()

    // Connect to AssemblyAI Universal-Streaming
    assemblyAIConn, err := connectToAssemblyAI()
    if err != nil {
        log.Println("unable to connect to AssemblyAI:", err)
        return
    }

    // Create audio buffer
    audioBuffer := NewEnhancedAudioBuffer(assemblyAIConn)

    // Start goroutine to handle AssemblyAI responses
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        handleAssemblyAIResponses(ctx, assemblyAIConn, display, sessionMgr)
    }()

    defer func() {
        log.Println("Cleaning up connections...")

        // Flush any remaining audio while the buffer is still active
        if err := audioBuffer.FlushRemaining(); err != nil {
            log.Printf("Error flushing remaining audio: %v", err)
        }

        // Stop audio buffer
        audioBuffer.setInactive()

        // Send termination message before closing
        terminateMsg := TerminateMessage{Type: "Terminate"}
        if err := assemblyAIConn.WriteJSON(terminateMsg); err != nil {
            log.Printf("Error sending terminate message: %v", err)
        }

        // Give AssemblyAI a moment to reply, then close the connection.
        // Closing also unblocks the response goroutine if it is still reading.
        time.Sleep(200 * time.Millisecond)
        assemblyAIConn.Close()
        wg.Wait()
        log.Println("Cleanup completed")
    }()

    // Process Twilio messages
readLoop:
    for {
        var message TwilioMessage
        err := twilioConn.ReadJSON(&message)
        if err != nil {
            log.Printf("Unable to read twilio message: %v", err)
            break readLoop
        }

        switch message.Event {
        case "connected":
            log.Println("Twilio mediastream connected")
        case "start":
            log.Println("Twilio mediastream started")
        case "media":
            // Only process audio if session is active or not yet started
            if sessionMgr.IsActive() || sessionMgr.sessionID == "" {
                if err := forwardAudioToAssemblyAI(audioBuffer, message.Media.Payload); err != nil {
                    log.Printf("Unable to forward audio to AssemblyAI: %v", err)
                }
            }
        case "stop":
            log.Println("Twilio mediastream stopped")
            // A bare break would only leave the switch, so exit the labeled loop
            break readLoop
        default:
            log.Printf("Received unknown event type: %s", message.Event)
        }
    }

    // The deferred cleanup above terminates the session and waits for the response goroutine
    log.Println("WebSocket handler completed")
}
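
One Go detail matters when reading a message loop like the one above: inside a `switch`, a bare `break` exits only the `switch`, not the enclosing `for`. To stop the loop from within a `case`, Go needs a labeled `break`. The sketch below shows the difference in isolation; `countUntilStop` and the event list are hypothetical.

```go
package main

import "fmt"

// countUntilStop returns how many events were handled before "stop" arrived.
func countUntilStop(events []string) int {
	handled := 0
loop:
	for _, ev := range events {
		switch ev {
		case "stop":
			// Labeled break: leaves the for loop, not just the switch.
			break loop
		default:
			handled++
		}
	}
	return handled
}

func main() {
	events := []string{"connected", "start", "media", "stop", "media"}
	fmt.Println(countUntilStop(events)) // 3
}
```

Without the label, the loop would keep running after "stop" and also count the trailing "media" event.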

Step 12: Add session management

Steps 10 and 11 reference a `SessionManager` type that tracks the AssemblyAI connection state and logs a warning when the session goes quiet. Add it now:

type SessionManager struct {
    sessionID         string
    startTime         time.Time
    lastMessageTime   time.Time
    isActive          bool
    mutex             sync.RWMutex
    healthCheckTicker *time.Ticker
    ctx               context.Context
    cancel            context.CancelFunc
}


func NewSessionManager() *SessionManager {
    ctx, cancel := context.WithCancel(context.Background())
    sm := &SessionManager{
        isActive: false,
        ctx:      ctx,
        cancel:   cancel,
    }


    // Start health check ticker
    sm.healthCheckTicker = time.NewTicker(30 * time.Second)
    go sm.healthCheckLoop()


    return sm
}


func (sm *SessionManager) SetSessionStarted(sessionID string) {
    sm.mutex.Lock()
    defer sm.mutex.Unlock()


    sm.sessionID = sessionID
    sm.startTime = time.Now()
    sm.lastMessageTime = time.Now()
    sm.isActive = true


    log.Printf("Session started: %s", sessionID)
}


func (sm *SessionManager) UpdateLastMessage() {
    sm.mutex.Lock()
    defer sm.mutex.Unlock()
    sm.lastMessageTime = time.Now()
}


func (sm *SessionManager) IsActive() bool {
    sm.mutex.RLock()
    defer sm.mutex.RUnlock()
    return sm.isActive
}


func (sm *SessionManager) SetInactive() {
    sm.mutex.Lock()
    defer sm.mutex.Unlock()
    sm.isActive = false
}


func (sm *SessionManager) Close() {
    sm.mutex.Lock()
    defer sm.mutex.Unlock()


    sm.isActive = false
    if sm.healthCheckTicker != nil {
        sm.healthCheckTicker.Stop()
    }
    sm.cancel()


    if sm.sessionID != "" {
        duration := time.Since(sm.startTime)
        log.Printf("Session %s closed after %v", sm.sessionID, duration)
    }
}


func (sm *SessionManager) healthCheckLoop() {
    for {
        select {
        case <-sm.ctx.Done():
            return
        case <-sm.healthCheckTicker.C:
            sm.checkHealth()
        }
    }
}


func (sm *SessionManager) checkHealth() {
    sm.mutex.RLock()
    timeSinceLastMessage := time.Since(sm.lastMessageTime)
    isActive := sm.isActive
    sm.mutex.RUnlock()


    if isActive && timeSinceLastMessage > 5*time.Minute {
        log.Printf("Warning: No messages received for %v", timeSinceLastMessage)
    }
}

Step 13: Test your application

Now you're ready to test the complete application:

Start the server:

go run main.go

You should see output like:

[date] Server is running on port 8080
[date] AssemblyAI API Key is set: true

Call your Twilio phone number. If prompted by your operating system, allow the application to access the network.

Once instructed by the voice, start speaking to see your call transcribed in your server log:

[date] Received POST request to / endpoint from <INT>
[date] Sending TwiML response with stream URL: wss://<URL>.ngrok-free.app/media
[date] Received request to /media endpoint - Method: GET
[date] Successfully upgraded to WebSocket connection
[date] Connected to AssemblyAI Universal-Streaming
[date] Twilio mediastream connected
[date] Twilio mediastream started
[date] Session started: <SESSION_ID>
[date] Session started - ID: <SESSION_ID>, Expires: [date]
Hello.
[date] Audio stats: 300 chunks, 160.0 avg bytes/chunk, last buffer: 480 bytes
This is a test.                                                                                     
[date] Audio stats: 600 chunks, 160.0 avg bytes/chunk, last buffer: 480 bytes

Most of this logging can be useful for debugging purposes, but can be safely removed to show only the real-time transcript output.

Learn more

In this tutorial, you built a Go application that transcribes incoming phone calls in real-time using Twilio and AssemblyAI's Universal-Streaming Speech-to-Text API.

This implementation demonstrates how to:

  • Handle WebSocket connections for real-time audio streaming
  • Integrate with Twilio's MediaStream for phone call capture
  • Process audio data in mu-law format from Twilio
  • Connect directly to AssemblyAI's WebSocket API for streaming transcription

To keep up with more content like this, make sure you subscribe to our newsletter and join our Discord server.

Complete source code

Also available via the GitHub repo here.

package main

import (
	"context"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"net/url"
	"os"
	"strings"
	"sync"
	"time"

	"github.com/gorilla/websocket"
	"github.com/joho/godotenv"
)

type TwilioMessage struct {
	Event string `json:"event"`
	Media struct {
		Payload string `json:"payload"`
	} `json:"media"`
}

// Universal-Streaming message types
type BaseMessage struct {
	Type string `json:"type"`
}

type BeginMessage struct {
	Type      string  `json:"type"`
	ID        string  `json:"id"`
	ExpiresAt float64 `json:"expires_at"`
}

type TurnMessage struct {
	Type                string  `json:"type"`
	TurnOrder           int     `json:"turn_order"`
	TurnIsFormatted     bool    `json:"turn_is_formatted"`
	EndOfTurn           bool    `json:"end_of_turn"`
	Transcript          string  `json:"transcript"`
	EndOfTurnConfidence float64 `json:"end_of_turn_confidence"`
	Words               []Word  `json:"words"`
}

type Word struct {
	Text        string  `json:"text"`
	Start       int     `json:"start"`
	End         int     `json:"end"`
	Confidence  float64 `json:"confidence"`
	WordIsFinal bool    `json:"word_is_final"`
}

type TerminationMessage struct {
	Type                   string `json:"type"`
	AudioDurationSeconds   int    `json:"audio_duration_seconds"`
	SessionDurationSeconds int    `json:"session_duration_seconds"`
}

type TerminateMessage struct {
	Type string `json:"type"`
}

const (
	MIN_BUFFER_SIZE = 320 // 40ms minimum (2 Twilio chunks)
	MAX_BUFFER_SIZE = 400 // 50ms maximum
)

// TranscriptDisplay handles clean display of partial and final transcripts
type TranscriptDisplay struct {
	currentPartial     string
	mutex              sync.Mutex
	currentLinePrinted bool
}

func NewTranscriptDisplay() *TranscriptDisplay {
	return &TranscriptDisplay{}
}

func (td *TranscriptDisplay) UpdatePartial(text string) {
	td.mutex.Lock()
	defer td.mutex.Unlock()

	// Only update if text is not empty and different from current
	if strings.TrimSpace(text) == "" || text == td.currentPartial {
		return
	}

	td.currentPartial = text
	td.displayPartial()
}

func (td *TranscriptDisplay) AddFinal(text string) {
	td.mutex.Lock()
	defer td.mutex.Unlock()

	// Clear any partial text first
	if td.currentPartial != "" {
		fmt.Printf("\r%s\r", strings.Repeat(" ", len(td.currentPartial)+20))
	}

	// Print the final text on a new line
	if strings.TrimSpace(text) != "" {
		fmt.Printf("%s\n", text)
	}

	// Reset for next utterance
	td.currentPartial = ""
	td.currentLinePrinted = false
}

func (td *TranscriptDisplay) displayPartial() {
	// Clear the current line and redraw
	fmt.Printf("\r%s\r", strings.Repeat(" ", 100))
	if td.currentPartial != "" {
		fmt.Printf("%s", td.currentPartial)
		td.currentLinePrinted = true
	}
}

func (td *TranscriptDisplay) Clear() {
	td.mutex.Lock()
	defer td.mutex.Unlock()

	if td.currentLinePrinted {
		fmt.Printf("\r%s\r", strings.Repeat(" ", 100))
	}
	td.currentPartial = ""
	td.currentLinePrinted = false
}

// SessionManager handles AssemblyAI session state and health
type SessionManager struct {
	sessionID         string
	startTime         time.Time
	lastMessageTime   time.Time
	isActive          bool
	mutex             sync.RWMutex
	healthCheckTicker *time.Ticker
	ctx               context.Context
	cancel            context.CancelFunc
}

func NewSessionManager() *SessionManager {
	ctx, cancel := context.WithCancel(context.Background())
	sm := &SessionManager{
		isActive: false,
		ctx:      ctx,
		cancel:   cancel,
	}

	// Start health check ticker
	sm.healthCheckTicker = time.NewTicker(30 * time.Second)
	go sm.healthCheckLoop()

	return sm
}

func (sm *SessionManager) SetSessionStarted(sessionID string) {
	sm.mutex.Lock()
	defer sm.mutex.Unlock()

	sm.sessionID = sessionID
	sm.startTime = time.Now()
	sm.lastMessageTime = time.Now()
	sm.isActive = true

	log.Printf("Session started: %s", sessionID)
}

func (sm *SessionManager) UpdateLastMessage() {
	sm.mutex.Lock()
	defer sm.mutex.Unlock()
	sm.lastMessageTime = time.Now()
}

func (sm *SessionManager) IsActive() bool {
	sm.mutex.RLock()
	defer sm.mutex.RUnlock()
	return sm.isActive
}

func (sm *SessionManager) SetInactive() {
	sm.mutex.Lock()
	defer sm.mutex.Unlock()
	sm.isActive = false
}

func (sm *SessionManager) healthCheckLoop() {
	for {
		select {
		case <-sm.ctx.Done():
			return
		case <-sm.healthCheckTicker.C:
			sm.checkHealth()
		}
	}
}

func (sm *SessionManager) checkHealth() {
	sm.mutex.RLock()
	timeSinceLastMessage := time.Since(sm.lastMessageTime)
	isActive := sm.isActive
	sm.mutex.RUnlock()

	if isActive && timeSinceLastMessage > 5*time.Minute {
		log.Printf("Warning: No messages received for %v", timeSinceLastMessage)
	}
}

func (sm *SessionManager) Close() {
	sm.mutex.Lock()
	defer sm.mutex.Unlock()

	sm.isActive = false
	if sm.healthCheckTicker != nil {
		sm.healthCheckTicker.Stop()
	}
	sm.cancel()

	if sm.sessionID != "" {
		duration := time.Since(sm.startTime)
		log.Printf("Session %s closed after %v", sm.sessionID, duration)
	}
}

// EnhancedAudioBuffer with timing controls and health monitoring
type EnhancedAudioBuffer struct {
	buffer      []byte
	mutex       sync.Mutex
	minSize     int
	maxSize     int
	conn        *websocket.Conn
	isActive    bool
	connMutex   sync.RWMutex
	lastFlush   time.Time
	flushTimer  *time.Timer
	totalBytes  int64
	totalChunks int64
}

func NewEnhancedAudioBuffer(conn *websocket.Conn) *EnhancedAudioBuffer {
	return &EnhancedAudioBuffer{
		buffer:    make([]byte, 0),
		minSize:   MIN_BUFFER_SIZE,
		maxSize:   MAX_BUFFER_SIZE,
		conn:      conn,
		isActive:  true,
		lastFlush: time.Now(),
	}
}

func (eab *EnhancedAudioBuffer) AddAudio(audioData []byte) error {
	eab.mutex.Lock()
	defer eab.mutex.Unlock()

	if !eab.isConnectionActive() {
		return fmt.Errorf("connection is not active")
	}

	eab.buffer = append(eab.buffer, audioData...)
	eab.totalBytes += int64(len(audioData))
	eab.totalChunks++

	// Reset flush timer if it exists
	if eab.flushTimer != nil {
		eab.flushTimer.Stop()
	}

	// Immediate flush if buffer is at max size
	if len(eab.buffer) >= eab.maxSize {
		return eab.flushBuffer()
	}

	// Set timer to flush after 100ms if we have minimum data
	if len(eab.buffer) >= eab.minSize {
		eab.flushTimer = time.AfterFunc(100*time.Millisecond, func() {
			eab.mutex.Lock()
			defer eab.mutex.Unlock()
			if len(eab.buffer) > 0 {
				eab.flushBuffer()
			}
		})
	}

	return nil
}

func (eab *EnhancedAudioBuffer) flushBuffer() error {
	if len(eab.buffer) == 0 {
		return nil
	}

	if !eab.isConnectionActive() {
		return fmt.Errorf("connection is not active")
	}

	// Send the buffered audio
	bufferedAudio := make([]byte, len(eab.buffer))
	copy(bufferedAudio, eab.buffer)
	bufferSize := len(eab.buffer)

	// Clear the buffer before sending
	eab.buffer = eab.buffer[:0]
	eab.lastFlush = time.Now()

	// Snapshot the connection state, then release the read lock before
	// writing: setInactive takes the write lock, so calling it while the
	// read lock is still held would deadlock.
	eab.connMutex.RLock()
	conn, active := eab.conn, eab.isActive
	eab.connMutex.RUnlock()

	if conn != nil && active {
		err := conn.WriteMessage(websocket.BinaryMessage, bufferedAudio)
		if err != nil {
			log.Printf("Error sending %d bytes to AssemblyAI: %v", bufferSize, err)
			eab.setInactive()
			return err
		}

		// Log periodic stats
		if eab.totalChunks%100 == 0 {
			avgChunkSize := float64(eab.totalBytes) / float64(eab.totalChunks)
			log.Printf("Audio stats: %d chunks, %.1f avg bytes/chunk, last buffer: %d bytes",
				eab.totalChunks, avgChunkSize, bufferSize)
		}
	}

	return nil
}

func (eab *EnhancedAudioBuffer) FlushRemaining() error {
	eab.mutex.Lock()
	defer eab.mutex.Unlock()

	// Stop any pending timer
	if eab.flushTimer != nil {
		eab.flushTimer.Stop()
	}

	return eab.flushBuffer()
}

func (eab *EnhancedAudioBuffer) setInactive() {
	eab.connMutex.Lock()
	defer eab.connMutex.Unlock()
	eab.isActive = false
}

func (eab *EnhancedAudioBuffer) isConnectionActive() bool {
	eab.connMutex.RLock()
	defer eab.connMutex.RUnlock()
	return eab.isActive && eab.conn != nil
}

func (eab *EnhancedAudioBuffer) GetStats() (int64, int64, float64) {
	eab.mutex.Lock()
	defer eab.mutex.Unlock()

	avgSize := float64(0)
	if eab.totalChunks > 0 {
		avgSize = float64(eab.totalBytes) / float64(eab.totalChunks)
	}

	return eab.totalBytes, eab.totalChunks, avgSize
}

var (
	assemblyAIAPIKey = os.Getenv("ASSEMBLYAI_API_KEY")
	upgrader         = websocket.Upgrader{
		CheckOrigin: func(r *http.Request) bool {
			return true
		},
	}
)

func main() {
	// Load .env file
	err := godotenv.Load()
	if err != nil {
		log.Println("No .env file found")
	}

	assemblyAIAPIKey = os.Getenv("ASSEMBLYAI_API_KEY")

	if assemblyAIAPIKey == "" {
		log.Fatal("ASSEMBLYAI_API_KEY environment variable is required")
	}

	http.HandleFunc("/", twilio)
	http.HandleFunc("/media", media)

	log.Println("Server is running on port 8080")
	log.Printf("AssemblyAI API Key is set: %v", assemblyAIAPIKey != "")

	if err := http.ListenAndServe(":8080", nil); err != nil {
		log.Fatal(err)
	}
}

func twilio(w http.ResponseWriter, r *http.Request) {
	log.Printf("Received %s request to / endpoint from %s", r.Method, r.RemoteAddr)

	if r.Method != "POST" && r.Method != "GET" {
		log.Printf("Method not allowed: %s", r.Method)
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}

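	host := r.Host
	// Twilio Media Streams require a secure WebSocket (wss) URL. ngrok
	// terminates TLS before the request reaches this server, so r.TLS is
	// nil locally even though the public endpoint is HTTPS.
	wsProtocol := "wss"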

	twiML := `<?xml version="1.0" encoding="UTF-8"?>
<Response>
    <Say>Now connecting to the streaming service.</Say>
    <Connect>
        <Stream url='%s://%s/media' />
    </Connect>
</Response>`

	w.Header().Set("Content-Type", "application/xml")
	responseXML := fmt.Sprintf(twiML, wsProtocol, host)
	log.Printf("Sending TwiML response with stream URL: %s://%s/media", wsProtocol, host)
	fmt.Fprint(w, responseXML)
}

func media(w http.ResponseWriter, r *http.Request) {
	log.Printf("Received request to /media endpoint - Method: %s", r.Method)

	if r.Method != "GET" {
		log.Printf("Wrong method for WebSocket upgrade: %s", r.Method)
		return
	}

	// Upgrade HTTP request to WebSocket for Twilio connection
	twilioConn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Println("unable to upgrade connection to websocket:", err)
		return
	}
	defer twilioConn.Close()
	log.Println("Successfully upgraded to WebSocket connection")

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Create session manager and display
	sessionMgr := NewSessionManager()
	display := NewTranscriptDisplay()
	defer sessionMgr.Close()

	// Connect to AssemblyAI Universal-Streaming
	assemblyAIConn, err := connectToAssemblyAI()
	if err != nil {
		log.Println("unable to connect to AssemblyAI:", err)
		return
	}

	// Create enhanced audio buffer
	audioBuffer := NewEnhancedAudioBuffer(assemblyAIConn)

	defer func() {
		log.Println("Cleaning up connections...")

		// Flush any remaining audio while the buffer is still active;
		// once it is marked inactive, flushing returns an error
		if err := audioBuffer.FlushRemaining(); err != nil {
			log.Printf("Error flushing remaining audio: %v", err)
		}

		// Stop audio buffer
		audioBuffer.setInactive()

		// Print final stats
		totalBytes, totalChunks, avgSize := audioBuffer.GetStats()
		log.Printf("Final audio stats: %d bytes, %d chunks, %.1f avg bytes/chunk",
			totalBytes, totalChunks, avgSize)

		// Send termination message before closing
		terminateMsg := TerminateMessage{Type: "Terminate"}
		if err := assemblyAIConn.WriteJSON(terminateMsg); err != nil {
			log.Printf("Error sending terminate message: %v", err)
		}

		time.Sleep(200 * time.Millisecond)
		assemblyAIConn.Close()
		log.Println("Cleanup completed")
	}()

	// Start goroutine to handle AssemblyAI responses
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		handleAssemblyAIResponses(ctx, assemblyAIConn, display, sessionMgr)
	}()

	// Process Twilio messages. The loop is labeled so the "stop" case can
	// exit it; a bare break inside a switch only leaves the switch.
readLoop:
	for {
		var message TwilioMessage
		err := twilioConn.ReadJSON(&message)
		if err != nil {
			log.Printf("Unable to read twilio message: %v", err)
			cancel()
			break readLoop
		}

		switch message.Event {
		case "connected":
			log.Println("Twilio mediastream connected")
		case "start":
			log.Println("Twilio mediastream started")
		case "media":
			// Only process audio if session is active or not yet started
			if sessionMgr.IsActive() || sessionMgr.sessionID == "" {
				if err := forwardAudioToAssemblyAI(audioBuffer, message.Media.Payload); err != nil {
					log.Printf("Unable to forward audio to AssemblyAI: %v", err)
				}
			}
		case "stop":
			log.Println("Twilio mediastream stopped")
			cancel()
			break readLoop
		default:
			log.Printf("Received unknown event type: %s", message.Event)
		}
	}

	wg.Wait()
	log.Println("WebSocket handler completed")
}

func connectToAssemblyAI() (*websocket.Conn, error) {
	// Build URL with query parameters for v3
	params := url.Values{}
	params.Add("sample_rate", "8000")
	params.Add("encoding", "pcm_mulaw")
	params.Add("format_turns", "true")

	// Optional: Configure turn detection parameters
	params.Add("end_of_turn_confidence_threshold", "0.7")
	params.Add("min_end_of_turn_silence_when_confident", "160")
	params.Add("max_turn_silence", "2400")

	wsURL := fmt.Sprintf("wss://streaming.assemblyai.com/v3/ws?%s", params.Encode())

	// Create WebSocket connection to AssemblyAI v3
	headers := http.Header{}
	headers.Add("Authorization", assemblyAIAPIKey)

	conn, _, err := websocket.DefaultDialer.Dial(wsURL, headers)
	if err != nil {
		return nil, fmt.Errorf("failed to connect to AssemblyAI: %w", err)
	}

	log.Println("Connected to AssemblyAI Universal-Streaming")
	return conn, nil
}

func forwardAudioToAssemblyAI(audioBuffer *EnhancedAudioBuffer, payload string) error {
	// Decode base64 payload from Twilio
	audioData, err := base64.StdEncoding.DecodeString(payload)
	if err != nil {
		return fmt.Errorf("failed to decode audio data: %w", err)
	}

	// Validate audio chunk size
	if len(audioData) == 0 {
		return nil // Don't send empty chunks
	}

	if len(audioData) > 8000 { // More than 1 second of 8kHz audio
		log.Printf("Warning: audio chunk too large: %d bytes", len(audioData))
	}

	// Add to buffer instead of sending directly
	return audioBuffer.AddAudio(audioData)
}

func handleAssemblyAIResponses(ctx context.Context, conn *websocket.Conn,
	display *TranscriptDisplay, sessionMgr *SessionManager) {

	defer display.Clear()

	for {
		select {
		case <-ctx.Done():
			return
		default:
			_, data, err := conn.ReadMessage()
			if err != nil {
				log.Printf("Error reading from AssemblyAI: %v", err)
				sessionMgr.SetInactive()
				return
			}

			sessionMgr.UpdateLastMessage()

			// First parse to get message type
			var baseMsg BaseMessage
			if err := json.Unmarshal(data, &baseMsg); err != nil {
				log.Printf("Error unmarshaling base message: %v", err)
				continue
			}

			switch baseMsg.Type {
			case "Begin":
				var msg BeginMessage
				if err := json.Unmarshal(data, &msg); err != nil {
					log.Printf("Error unmarshaling Begin message: %v", err)
					continue
				}
				sessionMgr.SetSessionStarted(msg.ID)
				expiresTime := time.Unix(int64(msg.ExpiresAt), 0)
				log.Printf("Session started - ID: %s, Expires: %s",
					msg.ID, expiresTime.Format(time.RFC3339))

			case "Turn":
				var msg TurnMessage
				if err := json.Unmarshal(data, &msg); err != nil {
					log.Printf("Error unmarshaling Turn message: %v", err)
					continue
				}

				if strings.TrimSpace(msg.Transcript) != "" {
					if msg.TurnIsFormatted {
						// Final formatted transcript
						display.AddFinal(msg.Transcript)
					} else if msg.EndOfTurn {
						// Final but unformatted - wait for formatted version
						// Don't display anything yet
					} else {
						// Partial transcript
						display.UpdatePartial(msg.Transcript)
					}
				}

			case "Termination":
				var msg TerminationMessage
				if err := json.Unmarshal(data, &msg); err != nil {
					log.Printf("Error unmarshaling Termination message: %v", err)
					continue
				}
				log.Printf("\nSession terminated - Audio: %ds, Session: %ds",
					msg.AudioDurationSeconds, msg.SessionDurationSeconds)
				sessionMgr.SetInactive()
				return
			}
		}
	}
}
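
The Turn handling above can be checked without a live call. The following is a minimal standalone sketch, not part of the tutorial's source: the `turnMessage` struct, the `classifyTurn` helper, and the JSON field names (`type`, `transcript`, `end_of_turn`, `turn_is_formatted`) are assumptions chosen to mirror the fields the handler inspects. It reports which branch a raw message would take.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// turnMessage holds only the fields the handler inspects.
// The JSON tags are assumptions made for this sketch.
type turnMessage struct {
	Type            string `json:"type"`
	Transcript      string `json:"transcript"`
	EndOfTurn       bool   `json:"end_of_turn"`
	TurnIsFormatted bool   `json:"turn_is_formatted"`
}

// classifyTurn (hypothetical helper) reports how the handler would treat
// a raw message: "final", "partial", or "skip".
func classifyTurn(data []byte) (string, error) {
	var msg turnMessage
	if err := json.Unmarshal(data, &msg); err != nil {
		return "", err
	}
	switch {
	case msg.Type != "Turn", strings.TrimSpace(msg.Transcript) == "":
		return "skip", nil // not a Turn, or nothing to show
	case msg.TurnIsFormatted:
		return "final", nil // formatted transcript is displayed as final
	case msg.EndOfTurn:
		return "skip", nil // unformatted end of turn: wait for the formatted one
	default:
		return "partial", nil // in-progress transcript
	}
}

func main() {
	samples := []string{
		`{"type":"Turn","transcript":"hello wor","end_of_turn":false,"turn_is_formatted":false}`,
		`{"type":"Turn","transcript":"hello world","end_of_turn":true,"turn_is_formatted":false}`,
		`{"type":"Turn","transcript":"Hello, world.","end_of_turn":true,"turn_is_formatted":true}`,
	}
	for _, s := range samples {
		action, err := classifyTurn([]byte(s))
		if err != nil {
			fmt.Println("decode error:", err)
			continue
		}
		fmt.Println(action)
	}
}
```

Keeping the branch decision in a pure function like this makes it easy to cover with table-driven tests, separate from the WebSocket plumbing.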
