Transcribe phone calls in real-time in Go with Twilio and AssemblyAI
Learn how to build a real-time phone call transcription system in Go using Twilio MediaStreams and AssemblyAI's Universal-Streaming Speech-to-Text API.



In this tutorial, you'll build a server application in Go that transcribes an incoming Twilio phone call into text in real-time using direct API calls to AssemblyAI's Universal-Streaming API.
You'll use a Twilio MediaStream to stream the voice data to your local server application. You'll pass the voice data to AssemblyAI's Universal-Streaming Speech-to-Text API to transcribe the call into text, and then print it in your terminal in real-time.
This tutorial walks you through the code, step-by-step, but you can also find the complete source code at the end of this page or on the GitHub repo here.
Before you get started
To complete this tutorial, you'll need:
- An AssemblyAI account
- Signing up gets you $50 in free credits and access to Universal-Streaming by default!
- Go installed
- A Twilio account with a phone number
Step 1: Set up ngrok
Twilio will need to access your server through a publicly available URL. In this tutorial, you'll use ngrok to create a publicly available URL for an application running on your local computer.
If you already have a preferred way to expose your application publicly, you may skip this step.
- Sign up for an ngrok account
- Install ngrok for your platform
- Authenticate your ngrok agent using your authtoken:
ngrok config add-authtoken <YOUR_TOKEN>
Open an ngrok tunnel for port `8080`. ngrok will only tunnel connections while the following command is running:
ngrok http 8080
You'll see something similar to the output below, where the URL next to Forwarding is the publicly available URL that forwards to your local 8080 port (https://84c5df474.ngrok-free.dev in the example output).
ngrok                                                     (Ctrl+C to quit)
Session Status                online
Account                       inconshreveable (Plan: Free)
Version                       3.0.0
Region                        United States (us)
Latency                       78ms
Web Interface                 http://127.0.0.1:4040
Forwarding                    https://84c5df474.ngrok-free.dev -> http://localhost:8080
Connections                   ttl     opn     rt1     rt5     p50     p90
                              0       0       0.00    0.00    0.00    0.00
Copy the `Forwarding` URL in your terminal output and save it for the next step.
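Twilio reaches your server through two URLs derived from this forwarding URL. The incoming-call webhook uses the `https://` address. The audio stream uses a `wss://` address ending in `/media`, the path this tutorial's server registers. The sketch below shows that mapping with Go's `net/url`. `streamURL` is a hypothetical helper for checking the URL by hand. The server itself doesn't need it, because the handler in Step 5 builds the stream URL from the request's `Host` header.

```go
package main

import (
	"fmt"
	"net/url"
)

// streamURL converts an https forwarding URL (for example, from ngrok)
// into the wss:// URL Twilio would open for the MediaStream.
// The "/media" path matches the handler registered in main.go.
func streamURL(forwarding string) (string, error) {
	u, err := url.Parse(forwarding)
	if err != nil {
		return "", err
	}
	if u.Scheme != "https" {
		return "", fmt.Errorf("expected an https URL, got scheme %q", u.Scheme)
	}
	u.Scheme = "wss"
	u.Path = "/media"
	return u.String(), nil
}

func main() {
	s, err := streamURL("https://84c5df474.ngrok-free.dev")
	if err != nil {
		panic(err)
	}
	fmt.Println(s)
}
```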
Step 2: Set up Twilio
You'll need to register a phone number with Twilio and configure it to call your server application whenever someone calls that number.
- Sign up for a Twilio account
- To get a Twilio number, open your Twilio console and go to `Phone Numbers > Manage > Buy a number`. You'll see a list of numbers you can purchase for a small monthly fee. Select one and click `Buy`. You only need Voice capabilities for this tutorial.

Navigate to `Phone Numbers > Manage > Active numbers` and select the number you just bought. In the `Voice Configuration` section, set a `Webhook` for when a call comes in. Paste the ngrok `Forwarding` URL you copied in the previous step into the URL field, and set the HTTP method to `POST`.

Then scroll down and click `Save Configuration` to save this change.
You have now configured your Twilio number to send a POST request to the ngrok URL when your number is called, and opened a tunnel that forwards this ngrok URL to port `8080` on your local machine.
Next, you'll write the Go server application that answers the call and receives the incoming audio over a WebSocket.
Step 3: Create a WebSocket server for Twilio media streams
In this step, you'll set up the Go server application to accept the Twilio MediaStream and connect to AssemblyAI's Universal-Streaming Speech-to-Text API.
Create and navigate into a new project directory:
mkdir realtime-transcription-go
cd realtime-transcription-go
Initialize your Go module:
go mod init realtime-transcription-go
Install the required dependencies:
go get github.com/gorilla/websocket
go get github.com/joho/godotenv
Create a new file called main.go with the following content:
package main
import (
	"context"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"net/url"
	"os"
	"strings"
	"sync"
	"time"
	"github.com/gorilla/websocket"
	"github.com/joho/godotenv"
)
var (
	assemblyAIAPIKey = os.Getenv("ASSEMBLYAI_API_KEY")
	upgrader         = websocket.Upgrader{
		// Accept WebSocket connections from any origin (Twilio's servers)
		CheckOrigin: func(r *http.Request) bool {
			return true
		},
	}
)
func main() {
	// Load .env file
	err := godotenv.Load()
	if err != nil {
		log.Println("No .env file found")
	}
	assemblyAIAPIKey = os.Getenv("ASSEMBLYAI_API_KEY")
	if assemblyAIAPIKey == "" {
		log.Fatal("ASSEMBLYAI_API_KEY environment variable is required")
	}
	http.HandleFunc("/", twilio)
	http.HandleFunc("/media", media)
	log.Println("Server is running on port 8080")
	log.Printf("AssemblyAI API Key is set: %v", assemblyAIAPIKey != "")
	if err := http.ListenAndServe(":8080", nil); err != nil {
		log.Fatal(err)
	}
}
func twilio(w http.ResponseWriter, r *http.Request) {
	// Respond to Twilio request and initiate a Twilio MediaStream
}
func media(w http.ResponseWriter, r *http.Request) {
	// Serve the incoming WebSocket connection from Twilio
}
Create a `.env` file in your project directory and add your AssemblyAI API key:
ASSEMBLYAI_API_KEY=your_api_key_here
Step 4: Define message structures
Twilio and AssemblyAI's Universal-Streaming API both send JSON messages in specific formats. Add these struct definitions to decode Twilio's media events and the different Universal-Streaming message types:
type TwilioMessage struct {
	Event string `json:"event"`
	Media struct {
		Payload string `json:"payload"`
	} `json:"media"`
}
// Universal-Streaming message types
type BaseMessage struct {
	Type string `json:"type"`
}
type BeginMessage struct {
	Type      string  `json:"type"`
	ID        string  `json:"id"`
	ExpiresAt float64 `json:"expires_at"`
}
type TurnMessage struct {
	Type                string  `json:"type"`
	TurnOrder           int     `json:"turn_order"`
	TurnIsFormatted     bool    `json:"turn_is_formatted"`
	EndOfTurn           bool    `json:"end_of_turn"`
	Transcript          string  `json:"transcript"`
	EndOfTurnConfidence float64 `json:"end_of_turn_confidence"`
	Words               []Word  `json:"words"`
}
type Word struct {
	Text        string  `json:"text"`
	Start       int     `json:"start"`
	End         int     `json:"end"`
	Confidence  float64 `json:"confidence"`
	WordIsFinal bool    `json:"word_is_final"`
}
type TerminationMessage struct {
	Type                   string `json:"type"`
	AudioDurationSeconds   int    `json:"audio_duration_seconds"`
	SessionDurationSeconds int    `json:"session_duration_seconds"`
}
type TerminateMessage struct {
	Type string `json:"type"`
}
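You can try the two-step decoding pattern these structs support offline: read `type` into `BaseMessage`, then unmarshal into the matching struct. The `sampleTurn` payload below is hand-written from the struct tags, not a captured AssemblyAI response, so treat its values as illustrative.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type BaseMessage struct {
	Type string `json:"type"`
}

type Word struct {
	Text        string  `json:"text"`
	Start       int     `json:"start"`
	End         int     `json:"end"`
	Confidence  float64 `json:"confidence"`
	WordIsFinal bool    `json:"word_is_final"`
}

type TurnMessage struct {
	Type                string  `json:"type"`
	TurnOrder           int     `json:"turn_order"`
	TurnIsFormatted     bool    `json:"turn_is_formatted"`
	EndOfTurn           bool    `json:"end_of_turn"`
	Transcript          string  `json:"transcript"`
	EndOfTurnConfidence float64 `json:"end_of_turn_confidence"`
	Words               []Word  `json:"words"`
}

// sampleTurn is an illustrative payload, not real API output.
const sampleTurn = `{"type":"Turn","turn_order":0,"turn_is_formatted":true,
"end_of_turn":true,"transcript":"Hello.","end_of_turn_confidence":0.92,
"words":[{"text":"Hello.","start":400,"end":720,"confidence":0.98,"word_is_final":true}]}`

// decodeTurn checks the message type first, then decodes the full Turn message.
func decodeTurn(data []byte) (TurnMessage, error) {
	var base BaseMessage
	if err := json.Unmarshal(data, &base); err != nil {
		return TurnMessage{}, err
	}
	if base.Type != "Turn" {
		return TurnMessage{}, fmt.Errorf("unexpected message type %q", base.Type)
	}
	var msg TurnMessage
	err := json.Unmarshal(data, &msg)
	return msg, err
}

func main() {
	msg, err := decodeTurn([]byte(sampleTurn))
	if err != nil {
		panic(err)
	}
	fmt.Printf("%q formatted=%v words=%d\n", msg.Transcript, msg.TurnIsFormatted, len(msg.Words))
}
```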
Step 5: Implement TwiML response handler
When someone calls the phone number, Twilio makes an HTTP request to an endpoint where you define how to respond. You'll use TwiML to define the instructions that tell Twilio what to do when you receive an incoming call.
Update your twilio function:
func twilio(w http.ResponseWriter, r *http.Request) {
	log.Printf("Received %s request to / endpoint from %s", r.Method, r.RemoteAddr)
	if r.Method != "POST" && r.Method != "GET" {
		log.Printf("Method not allowed: %s", r.Method)
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}
	// ngrok serves the public URL over TLS, so Twilio connects to the
	// stream with wss even though this server listens on plain HTTP.
	host := r.Host
	wsProtocol := "wss"
	twiML := `<?xml version="1.0" encoding="UTF-8"?>
<Response>
  <Say>Now connecting to the streaming service.</Say>
  <Connect>
    <Stream url='%s://%s/media' />
  </Connect>
</Response>`
	w.Header().Set("Content-Type", "application/xml")
	responseXML := fmt.Sprintf(twiML, wsProtocol, host)
	log.Printf("Sending TwiML response with stream URL: %s://%s/media", wsProtocol, host)
	fmt.Fprint(w, responseXML)
}
The TwiML instructs Twilio to:
- Say a greeting message to the caller
- Connect to a WebSocket stream at the /media endpoint
Step 6: Create audio buffering system
Twilio sends very small audio chunks (about 20 ms each, or 160 bytes of 8 kHz μ-law audio). AssemblyAI's Universal-Streaming API expects chunks between 50 and 1000 ms, so you need to buffer the audio before sending it. Add this audio buffer implementation:
const (
	MIN_BUFFER_SIZE = 320 // 40ms of 8kHz μ-law (2 Twilio chunks); arms the 100ms flush timer
	MAX_BUFFER_SIZE = 400 // 50ms; flush immediately once at least this much is buffered
)
type EnhancedAudioBuffer struct {
	buffer      []byte
	mutex       sync.Mutex // guards buffer, timer and counters; serializes writes to conn
	minSize     int
	maxSize     int
	conn        *websocket.Conn
	isActive    bool
	connMutex   sync.RWMutex // guards conn and isActive
	lastFlush   time.Time
	flushTimer  *time.Timer
	totalBytes  int64
	totalChunks int64
}
func NewEnhancedAudioBuffer(conn *websocket.Conn) *EnhancedAudioBuffer {
	return &EnhancedAudioBuffer{
		buffer:    make([]byte, 0),
		minSize:   MIN_BUFFER_SIZE,
		maxSize:   MAX_BUFFER_SIZE,
		conn:      conn,
		isActive:  true,
		lastFlush: time.Now(),
	}
}
func (eab *EnhancedAudioBuffer) AddAudio(audioData []byte) error {
	eab.mutex.Lock()
	defer eab.mutex.Unlock()
	if !eab.isConnectionActive() {
		return fmt.Errorf("connection is not active")
	}
	eab.buffer = append(eab.buffer, audioData...)
	eab.totalBytes += int64(len(audioData))
	eab.totalChunks++
	// Reset flush timer if it exists
	if eab.flushTimer != nil {
		eab.flushTimer.Stop()
	}
	// Immediate flush if buffer is at max size
	if len(eab.buffer) >= eab.maxSize {
		return eab.flushBuffer()
	}
	// Set timer to flush after 100ms if we have minimum data
	if len(eab.buffer) >= eab.minSize {
		eab.flushTimer = time.AfterFunc(100*time.Millisecond, func() {
			eab.mutex.Lock()
			defer eab.mutex.Unlock()
			if len(eab.buffer) > 0 {
				eab.flushBuffer()
			}
		})
	}
	return nil
}
// flushBuffer sends the buffered audio. The caller must hold eab.mutex,
// which also ensures only one goroutine writes to the WebSocket at a time.
func (eab *EnhancedAudioBuffer) flushBuffer() error {
	if len(eab.buffer) == 0 {
		return nil
	}
	// Snapshot the connection state and release connMutex before writing:
	// setInactive takes the write lock, so calling it while still holding
	// the read lock would deadlock.
	eab.connMutex.RLock()
	conn, active := eab.conn, eab.isActive
	eab.connMutex.RUnlock()
	if conn == nil || !active {
		return fmt.Errorf("connection is not active")
	}
	// Copy the buffered audio and clear the buffer before sending
	bufferedAudio := make([]byte, len(eab.buffer))
	copy(bufferedAudio, eab.buffer)
	bufferSize := len(eab.buffer)
	eab.buffer = eab.buffer[:0]
	eab.lastFlush = time.Now()
	if err := conn.WriteMessage(websocket.BinaryMessage, bufferedAudio); err != nil {
		log.Printf("Error sending %d bytes to AssemblyAI: %v", bufferSize, err)
		eab.setInactive()
		return err
	}
	return nil
}
// FlushRemaining stops any pending timer and sends whatever is left in the buffer.
func (eab *EnhancedAudioBuffer) FlushRemaining() error {
	eab.mutex.Lock()
	defer eab.mutex.Unlock()
	if eab.flushTimer != nil {
		eab.flushTimer.Stop()
	}
	return eab.flushBuffer()
}
func (eab *EnhancedAudioBuffer) setInactive() {
	eab.connMutex.Lock()
	defer eab.connMutex.Unlock()
	eab.isActive = false
}
func (eab *EnhancedAudioBuffer) isConnectionActive() bool {
	eab.connMutex.RLock()
	defer eab.connMutex.RUnlock()
	return eab.isActive && eab.conn != nil
}
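A quick calculation shows what these thresholds mean in practice. At 8,000 samples per second and one byte per μ-law sample, audio arrives at 8 bytes per millisecond, so each 20 ms Twilio chunk is 160 bytes. The buffer crosses `MAX_BUFFER_SIZE` (400 bytes) on the third chunk, so size-triggered flushes carry 480 bytes (60 ms). That is inside the 50 to 1000 ms window and matches the `last buffer: 480 bytes` log line in the sample output. The sketch below replays only the size-based branch of `AddAudio`. It leaves out the 100 ms timer branch, which can send a shorter 320-byte (40 ms) chunk if audio stops arriving.

```go
package main

import "fmt"

const (
	bytesPerMs      = 8   // 8,000 samples/s x 1 byte per μ-law sample
	twilioFrameSize = 160 // one 20 ms Twilio chunk
	maxBufferSize   = 400 // MAX_BUFFER_SIZE in the tutorial
)

// flushSizes replays the size-triggered flush in AddAudio for n frames
// and returns the size of every immediate flush.
func flushSizes(frames int) []int {
	var sizes []int
	buffered := 0
	for i := 0; i < frames; i++ {
		buffered += twilioFrameSize
		if buffered >= maxBufferSize {
			sizes = append(sizes, buffered)
			buffered = 0
		}
	}
	return sizes
}

func main() {
	for _, s := range flushSizes(9) {
		fmt.Printf("%d bytes = %d ms\n", s, s/bytesPerMs)
	}
}
```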
Step 7: Create transcript display system
To handle clean display of partial and final transcripts, add this display manager:
type TranscriptDisplay struct {
	currentPartial     string
	mutex              sync.Mutex
	currentLinePrinted bool
}
func NewTranscriptDisplay() *TranscriptDisplay {
	return &TranscriptDisplay{}
}
func (td *TranscriptDisplay) UpdatePartial(text string) {
	td.mutex.Lock()
	defer td.mutex.Unlock()
	// Only update if text is not empty and different from current
	if strings.TrimSpace(text) == "" || text == td.currentPartial {
		return
	}
	td.currentPartial = text
	td.displayPartial()
}
func (td *TranscriptDisplay) AddFinal(text string) {
	td.mutex.Lock()
	defer td.mutex.Unlock()
	// Clear any partial text first
	if td.currentPartial != "" {
		fmt.Printf("\r%s\r", strings.Repeat(" ", len(td.currentPartial)+20))
	}
	// Print the final text on a new line
	if strings.TrimSpace(text) != "" {
		fmt.Printf("%s\n", text)
	}
	// Reset for next utterance
	td.currentPartial = ""
	td.currentLinePrinted = false
}
func (td *TranscriptDisplay) displayPartial() {
	// Clear the current line and redraw
	fmt.Printf("\r%s\r", strings.Repeat(" ", 100))
	if td.currentPartial != "" {
		fmt.Printf("%s", td.currentPartial)
		td.currentLinePrinted = true
	}
}
func (td *TranscriptDisplay) Clear() {
	td.mutex.Lock()
	defer td.mutex.Unlock()
	if td.currentLinePrinted {
		fmt.Printf("\r%s\r", strings.Repeat(" ", 100))
	}
	td.currentPartial = ""
	td.currentLinePrinted = false
}
Step 8: Connect to AssemblyAI Universal-Streaming
Now implement the WebSocket connection to AssemblyAI's Universal-Streaming Speech-to-Text API:
func connectToAssemblyAI() (*websocket.Conn, error) {
	// Build URL with query parameters for v3
	params := url.Values{}
	params.Add("sample_rate", "8000")
	params.Add("encoding", "pcm_mulaw")
	params.Add("format_turns", "true")
	// Optional: Configure turn detection parameters
	params.Add("end_of_turn_confidence_threshold", "0.7")
	params.Add("min_end_of_turn_silence_when_confident", "160")
	params.Add("max_turn_silence", "2400")
	wsURL := fmt.Sprintf("wss://streaming.assemblyai.com/v3/ws?%s", params.Encode())
	// Create WebSocket connection to AssemblyAI v3
	headers := http.Header{}
	headers.Add("Authorization", assemblyAIAPIKey)
	conn, _, err := websocket.DefaultDialer.Dial(wsURL, headers)
	if err != nil {
		return nil, fmt.Errorf("failed to connect to AssemblyAI: %w", err)
	}
	log.Println("Connected to AssemblyAI Universal-Streaming")
	return conn, nil
}
This function:
- Builds the WebSocket URL with the required parameters for Universal-Streaming (v3)
- Sets the sample rate to 8000 Hz (Twilio's format)
- Specifies PCM μ-law encoding (Twilio's audio format)
- Enables formatted turns for better transcript output
- Configures turn detection parameters for natural conversation flow
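To see exactly what this function sends, the sketch below rebuilds the same query string and handshake header without opening a connection. `url.Values.Encode` sorts parameters by key, so their order in the URL differs from the order of the `Add` calls. Query parameters are matched by name, so the order doesn't affect the request.

```go
package main

import (
	"fmt"
	"net/http"
	"net/url"
)

// streamingURL reproduces the URL built in connectToAssemblyAI.
func streamingURL() string {
	params := url.Values{}
	params.Add("sample_rate", "8000")
	params.Add("encoding", "pcm_mulaw")
	params.Add("format_turns", "true")
	params.Add("end_of_turn_confidence_threshold", "0.7")
	params.Add("min_end_of_turn_silence_when_confident", "160")
	params.Add("max_turn_silence", "2400")
	// Encode sorts keys alphabetically before joining them with '&'.
	return "wss://streaming.assemblyai.com/v3/ws?" + params.Encode()
}

// authHeader builds the handshake header; as in the tutorial, the API key
// is sent as the Authorization value.
func authHeader(apiKey string) http.Header {
	h := http.Header{}
	h.Set("Authorization", apiKey)
	return h
}

func main() {
	fmt.Println(streamingURL())
	fmt.Println(authHeader("your_api_key_here").Get("Authorization"))
}
```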
Step 9: Handle audio forwarding
Add the function to forward audio from Twilio to AssemblyAI:
func forwardAudioToAssemblyAI(audioBuffer *EnhancedAudioBuffer, payload string) error {
	// Decode base64 payload from Twilio
	audioData, err := base64.StdEncoding.DecodeString(payload)
	if err != nil {
		return fmt.Errorf("failed to decode audio data: %w", err)
	}
	// Validate audio chunk size
	if len(audioData) == 0 {
		return nil // Don't send empty chunks
	}
	if len(audioData) > 8000 { // More than 1 second of 8kHz audio
		log.Printf("Warning: audio chunk too large: %d bytes", len(audioData))
	}
	// Add to buffer instead of sending directly
	return audioBuffer.AddAudio(audioData)
}
Step 10: Handle AssemblyAI responses
Implement the function to process responses from AssemblyAI:
func handleAssemblyAIResponses(ctx context.Context, conn *websocket.Conn,
	display *TranscriptDisplay, sessionMgr *SessionManager) {
	defer display.Clear()
	for {
		select {
		case <-ctx.Done():
			return
		default:
			// ReadMessage blocks until a message arrives or the connection
			// fails, so ctx is only checked between messages.
			_, data, err := conn.ReadMessage()
			if err != nil {
				log.Printf("Error reading from AssemblyAI: %v", err)
				sessionMgr.SetInactive()
				return
			}
			sessionMgr.UpdateLastMessage()
			// First parse to get message type
			var baseMsg BaseMessage
			if err := json.Unmarshal(data, &baseMsg); err != nil {
				log.Printf("Error unmarshaling base message: %v", err)
				continue
			}
			switch baseMsg.Type {
			case "Begin":
				var msg BeginMessage
				if err := json.Unmarshal(data, &msg); err != nil {
					log.Printf("Error unmarshaling Begin message: %v", err)
					continue
				}
				sessionMgr.SetSessionStarted(msg.ID)
				expiresTime := time.Unix(int64(msg.ExpiresAt), 0)
				log.Printf("Session started - ID: %s, Expires: %s",
					msg.ID, expiresTime.Format(time.RFC3339))
			case "Turn":
				var msg TurnMessage
				if err := json.Unmarshal(data, &msg); err != nil {
					log.Printf("Error unmarshaling Turn message: %v", err)
					continue
				}
				// Handle transcript display
				if strings.TrimSpace(msg.Transcript) != "" {
					if msg.TurnIsFormatted {
						// Final formatted transcript
						display.AddFinal(msg.Transcript)
					} else if msg.EndOfTurn {
						// Final but unformatted: wait for the formatted
						// version and don't display anything yet
					} else {
						// Partial transcript
						display.UpdatePartial(msg.Transcript)
					}
				}
			case "Termination":
				var msg TerminationMessage
				if err := json.Unmarshal(data, &msg); err != nil {
					log.Printf("Error unmarshaling Termination message: %v", err)
					continue
				}
				log.Printf("\nSession terminated - Audio: %ds, Session: %ds",
					msg.AudioDurationSeconds, msg.SessionDurationSeconds)
				sessionMgr.SetInactive()
				return
			}
		}
	}
}
This function handles three types of messages from AssemblyAI:
- Begin: Session establishment confirmation
- Turn: Transcript data (partial and final)
- Termination: Session end notification
The transcript handling logic ensures a smooth user experience by displaying partial transcripts in real-time and replacing them with final formatted versions.
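The branching above can be isolated into a pure function to make the display rules explicit. The `turn` type and `action` function are hypothetical helpers for illustration. The message sequence in `main` is an assumed example of how a turn evolves with `format_turns=true`: several partials, then an unformatted end-of-turn message, then the formatted final.

```go
package main

import (
	"fmt"
	"strings"
)

// turn holds the TurnMessage fields that drive the display decision.
type turn struct {
	Transcript      string
	EndOfTurn       bool
	TurnIsFormatted bool
}

// action mirrors the branching in handleAssemblyAIResponses.
func action(t turn) string {
	switch {
	case strings.TrimSpace(t.Transcript) == "":
		return "ignore"
	case t.TurnIsFormatted:
		return "final"
	case t.EndOfTurn:
		return "wait" // unformatted end of turn; the formatted copy follows
	default:
		return "partial"
	}
}

func main() {
	sequence := []turn{
		{Transcript: "hello"},
		{Transcript: "hello this is a test"},
		{Transcript: "hello this is a test", EndOfTurn: true},
		{Transcript: "Hello, this is a test.", EndOfTurn: true, TurnIsFormatted: true},
	}
	for _, t := range sequence {
		fmt.Printf("%-8s %s\n", action(t), t.Transcript)
	}
}
```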
Step 11: Implement the media WebSocket handler
Now implement the complete media function that ties everything together:
func media(w http.ResponseWriter, r *http.Request) {
	log.Printf("Received request to /media endpoint - Method: %s", r.Method)
	if r.Method != "GET" {
		log.Printf("Wrong method for WebSocket upgrade: %s", r.Method)
		return
	}
	// Upgrade HTTP request to WebSocket for Twilio connection
	twilioConn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Println("unable to upgrade connection to websocket:", err)
		return
	}
	defer twilioConn.Close()
	log.Println("Successfully upgraded to WebSocket connection")
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	// Create session manager and display
	sessionMgr := NewSessionManager()
	display := NewTranscriptDisplay()
	defer sessionMgr.Close()
	// Connect to AssemblyAI Universal-Streaming
	assemblyAIConn, err := connectToAssemblyAI()
	if err != nil {
		log.Println("unable to connect to AssemblyAI:", err)
		return
	}
	defer assemblyAIConn.Close()
	// Create audio buffer
	audioBuffer := NewEnhancedAudioBuffer(assemblyAIConn)
	// Start goroutine to handle AssemblyAI responses
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		handleAssemblyAIResponses(ctx, assemblyAIConn, display, sessionMgr)
	}()
	// Process Twilio messages until the stream stops or the connection fails
loop:
	for {
		var message TwilioMessage
		if err := twilioConn.ReadJSON(&message); err != nil {
			log.Printf("Unable to read twilio message: %v", err)
			break
		}
		switch message.Event {
		case "connected":
			log.Println("Twilio mediastream connected")
		case "start":
			log.Println("Twilio mediastream started")
		case "media":
			// Only process audio if session is active or not yet started
			if sessionMgr.IsActive() || sessionMgr.sessionID == "" {
				if err := forwardAudioToAssemblyAI(audioBuffer, message.Media.Payload); err != nil {
					log.Printf("Unable to forward audio to AssemblyAI: %v", err)
				}
			}
		case "stop":
			log.Println("Twilio mediastream stopped")
			break loop // a bare break would only exit the switch
		default:
			log.Printf("Received unknown event type: %s", message.Event)
		}
	}
	log.Println("Cleaning up connections...")
	// Flush remaining audio before marking the buffer inactive;
	// flushBuffer refuses to send once the buffer is inactive.
	if err := audioBuffer.FlushRemaining(); err != nil {
		log.Printf("Error flushing remaining audio: %v", err)
	}
	audioBuffer.setInactive()
	// Ask AssemblyAI to end the session. The response goroutine returns on the
	// Termination message; the read deadline keeps it from blocking forever.
	if err := assemblyAIConn.WriteJSON(TerminateMessage{Type: "Terminate"}); err != nil {
		log.Printf("Error sending terminate message: %v", err)
	}
	assemblyAIConn.SetReadDeadline(time.Now().Add(5 * time.Second))
	wg.Wait()
	log.Println("WebSocket handler completed")
}
Step 12: Add session management
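Finally, add the session manager. It tracks the AssemblyAI connection state and logs a warning if no messages arrive for more than five minutes. The handlers from Steps 10 and 11 use it, so the program won't compile without it: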
type SessionManager struct {
	sessionID         string
	startTime         time.Time
	lastMessageTime   time.Time
	isActive          bool
	mutex             sync.RWMutex
	healthCheckTicker *time.Ticker
	ctx               context.Context
	cancel            context.CancelFunc
}
func NewSessionManager() *SessionManager {
	ctx, cancel := context.WithCancel(context.Background())
	sm := &SessionManager{
		isActive: false,
		ctx:      ctx,
		cancel:   cancel,
	}
	// Start health check ticker
	sm.healthCheckTicker = time.NewTicker(30 * time.Second)
	go sm.healthCheckLoop()
	return sm
}
func (sm *SessionManager) SetSessionStarted(sessionID string) {
	sm.mutex.Lock()
	defer sm.mutex.Unlock()
	sm.sessionID = sessionID
	sm.startTime = time.Now()
	sm.lastMessageTime = time.Now()
	sm.isActive = true
	log.Printf("Session started: %s", sessionID)
}
func (sm *SessionManager) UpdateLastMessage() {
	sm.mutex.Lock()
	defer sm.mutex.Unlock()
	sm.lastMessageTime = time.Now()
}
func (sm *SessionManager) IsActive() bool {
	sm.mutex.RLock()
	defer sm.mutex.RUnlock()
	return sm.isActive
}
func (sm *SessionManager) SetInactive() {
	sm.mutex.Lock()
	defer sm.mutex.Unlock()
	sm.isActive = false
}
func (sm *SessionManager) Close() {
	sm.mutex.Lock()
	defer sm.mutex.Unlock()
	sm.isActive = false
	if sm.healthCheckTicker != nil {
		sm.healthCheckTicker.Stop()
	}
	sm.cancel()
	if sm.sessionID != "" {
		duration := time.Since(sm.startTime)
		log.Printf("Session %s closed after %v", sm.sessionID, duration)
	}
}
func (sm *SessionManager) healthCheckLoop() {
	for {
		select {
		case <-sm.ctx.Done():
			return
		case <-sm.healthCheckTicker.C:
			sm.checkHealth()
		}
	}
}
func (sm *SessionManager) checkHealth() {
	sm.mutex.RLock()
	timeSinceLastMessage := time.Since(sm.lastMessageTime)
	isActive := sm.isActive
	sm.mutex.RUnlock()
	if isActive && timeSinceLastMessage > 5*time.Minute {
		log.Printf("Warning: No messages received for %v", timeSinceLastMessage)
	}
}
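The media handler reads `sessionMgr.sessionID` directly, while the response goroutine may write that field in `SetSessionStarted`. Running with `go run -race` can therefore report a data race. One way to avoid it is to expose the check as a method that takes the read lock. The sketch below shows this with a trimmed-down, hypothetical `session` type. `ShouldForward` is an illustrative name, not part of the tutorial's code.

```go
package main

import (
	"fmt"
	"sync"
)

// session is a trimmed-down SessionManager used only for this sketch.
type session struct {
	mu        sync.RWMutex
	sessionID string
	isActive  bool
}

func (s *session) SetSessionStarted(id string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.sessionID = id
	s.isActive = true
}

func (s *session) SetInactive() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.isActive = false
}

// ShouldForward reports, under the read lock, whether audio should still be
// sent: before the Begin message arrives, or while the session is active.
func (s *session) ShouldForward() bool {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.isActive || s.sessionID == ""
}

func main() {
	s := &session{}
	var wg sync.WaitGroup
	wg.Add(1)
	go func() { // stands in for the AssemblyAI response goroutine
		defer wg.Done()
		s.SetSessionStarted("session-123")
	}()
	wg.Wait()
	fmt.Println(s.ShouldForward())
	s.SetInactive()
	fmt.Println(s.ShouldForward())
}
```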
Step 13: Test your application
Now you're ready to test the complete application:
Start the server:
go run main.go
You should see output like:
[date] Server is running on port 8080
[date] AssemblyAI API Key is set: true
Call your Twilio phone number. If prompted by your operating system, allow the application to access the network.
When the voice prompt finishes, start speaking. Your call is transcribed in real-time in your server log:
[date] Received POST request to / endpoint from <INT>
[date] Sending TwiML response with stream URL: wss://<URL>.ngrok-free.app/media
[date] Received request to /media endpoint - Method: GET
[date] Successfully upgraded to WebSocket connection
[date] Connected to AssemblyAI Universal-Streaming
[date] Twilio mediastream connected
[date] Twilio mediastream started
[date] Session started: <SESSION_ID>
[date] Session started - ID: <SESSION_ID>, Expires: [date]
Hello.
[date] Audio stats: 300 chunks, 160.0 avg bytes/chunk, last buffer: 480 bytes
This is a test.
[date] Audio stats: 600 chunks, 160.0 avg bytes/chunk, last buffer: 480 bytes
Most of this logging is useful for debugging, but you can safely remove it so that only the real-time transcript is shown.
Learn more
In this tutorial, you built a Go application that transcribes incoming phone calls in real-time using Twilio and AssemblyAI's Universal-Streaming Speech-to-Text API.
This implementation demonstrates how to:
- Handle WebSocket connections for real-time audio streaming
- Integrate with Twilio's MediaStream for phone call capture
- Process audio data in mu-law format from Twilio
- Connect directly to AssemblyAI's WebSocket API for streaming transcription
To keep up with more content like this, make sure you subscribe to our newsletter and join our Discord server.
Complete source code
Also available via the GitHub repo here.
package main
import (
	"context"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"net/url"
	"os"
	"strings"
	"sync"
	"time"
	"github.com/gorilla/websocket"
	"github.com/joho/godotenv"
)
type TwilioMessage struct {
	Event string `json:"event"`
	Media struct {
		Payload string `json:"payload"`
	} `json:"media"`
}
// Universal-Streaming message types
type BaseMessage struct {
	Type string `json:"type"`
}
type BeginMessage struct {
	Type      string  `json:"type"`
	ID        string  `json:"id"`
	ExpiresAt float64 `json:"expires_at"`
}
type TurnMessage struct {
	Type                string  `json:"type"`
	TurnOrder           int     `json:"turn_order"`
	TurnIsFormatted     bool    `json:"turn_is_formatted"`
	EndOfTurn           bool    `json:"end_of_turn"`
	Transcript          string  `json:"transcript"`
	EndOfTurnConfidence float64 `json:"end_of_turn_confidence"`
	Words               []Word  `json:"words"`
}
type Word struct {
	Text        string  `json:"text"`
	Start       int     `json:"start"`
	End         int     `json:"end"`
	Confidence  float64 `json:"confidence"`
	WordIsFinal bool    `json:"word_is_final"`
}
type TerminationMessage struct {
	Type                   string `json:"type"`
	AudioDurationSeconds   int    `json:"audio_duration_seconds"`
	SessionDurationSeconds int    `json:"session_duration_seconds"`
}
type TerminateMessage struct {
	Type string `json:"type"`
}
const (
	MIN_BUFFER_SIZE = 320 // 40ms of 8kHz μ-law (2 Twilio chunks); arms the 100ms flush timer
	MAX_BUFFER_SIZE = 400 // 50ms; flush immediately once at least this much is buffered
)
// TranscriptDisplay handles clean display of partial and final transcripts
type TranscriptDisplay struct {
	currentPartial     string
	mutex              sync.Mutex
	currentLinePrinted bool
}
func NewTranscriptDisplay() *TranscriptDisplay {
	return &TranscriptDisplay{}
}
func (td *TranscriptDisplay) UpdatePartial(text string) {
	td.mutex.Lock()
	defer td.mutex.Unlock()
	// Only update if text is not empty and different from current
	if strings.TrimSpace(text) == "" || text == td.currentPartial {
		return
	}
	td.currentPartial = text
	td.displayPartial()
}
func (td *TranscriptDisplay) AddFinal(text string) {
	td.mutex.Lock()
	defer td.mutex.Unlock()
	// Clear any partial text first
	if td.currentPartial != "" {
		fmt.Printf("\r%s\r", strings.Repeat(" ", len(td.currentPartial)+20))
	}
	// Print the final text on a new line
	if strings.TrimSpace(text) != "" {
		fmt.Printf("%s\n", text)
	}
	// Reset for next utterance
	td.currentPartial = ""
	td.currentLinePrinted = false
}
func (td *TranscriptDisplay) displayPartial() {
	// Clear the current line and redraw
	fmt.Printf("\r%s\r", strings.Repeat(" ", 100))
	if td.currentPartial != "" {
		fmt.Printf("%s", td.currentPartial)
		td.currentLinePrinted = true
	}
}
func (td *TranscriptDisplay) Clear() {
	td.mutex.Lock()
	defer td.mutex.Unlock()
	if td.currentLinePrinted {
		fmt.Printf("\r%s\r", strings.Repeat(" ", 100))
	}
	td.currentPartial = ""
	td.currentLinePrinted = false
}
// SessionManager handles AssemblyAI session state and health
type SessionManager struct {
	sessionID         string
	startTime         time.Time
	lastMessageTime   time.Time
	isActive          bool
	mutex             sync.RWMutex
	healthCheckTicker *time.Ticker
	ctx               context.Context
	cancel            context.CancelFunc
}
func NewSessionManager() *SessionManager {
	ctx, cancel := context.WithCancel(context.Background())
	sm := &SessionManager{
		isActive: false,
		ctx:      ctx,
		cancel:   cancel,
	}
	// Start health check ticker
	sm.healthCheckTicker = time.NewTicker(30 * time.Second)
	go sm.healthCheckLoop()
	return sm
}
func (sm *SessionManager) SetSessionStarted(sessionID string) {
	sm.mutex.Lock()
	defer sm.mutex.Unlock()
	sm.sessionID = sessionID
	sm.startTime = time.Now()
	sm.lastMessageTime = time.Now()
	sm.isActive = true
	log.Printf("Session started: %s", sessionID)
}
func (sm *SessionManager) UpdateLastMessage() {
	sm.mutex.Lock()
	defer sm.mutex.Unlock()
	sm.lastMessageTime = time.Now()
}
func (sm *SessionManager) IsActive() bool {
	sm.mutex.RLock()
	defer sm.mutex.RUnlock()
	return sm.isActive
}
func (sm *SessionManager) SetInactive() {
	sm.mutex.Lock()
	defer sm.mutex.Unlock()
	sm.isActive = false
}
func (sm *SessionManager) healthCheckLoop() {
	for {
		select {
		case <-sm.ctx.Done():
			return
		case <-sm.healthCheckTicker.C:
			sm.checkHealth()
		}
	}
}
func (sm *SessionManager) checkHealth() {
	sm.mutex.RLock()
	timeSinceLastMessage := time.Since(sm.lastMessageTime)
	isActive := sm.isActive
	sm.mutex.RUnlock()
	if isActive && timeSinceLastMessage > 5*time.Minute {
		log.Printf("Warning: No messages received for %v", timeSinceLastMessage)
	}
}
func (sm *SessionManager) Close() {
	sm.mutex.Lock()
	defer sm.mutex.Unlock()
	sm.isActive = false
	if sm.healthCheckTicker != nil {
		sm.healthCheckTicker.Stop()
	}
	sm.cancel()
	if sm.sessionID != "" {
		duration := time.Since(sm.startTime)
		log.Printf("Session %s closed after %v", sm.sessionID, duration)
	}
}
// EnhancedAudioBuffer with timing controls and health monitoring
type EnhancedAudioBuffer struct {
	buffer      []byte
	mutex       sync.Mutex // guards buffer, timer and counters; serializes writes to conn
	minSize     int
	maxSize     int
	conn        *websocket.Conn
	isActive    bool
	connMutex   sync.RWMutex // guards conn and isActive
	lastFlush   time.Time
	flushTimer  *time.Timer
	totalBytes  int64
	totalChunks int64
}
func NewEnhancedAudioBuffer(conn *websocket.Conn) *EnhancedAudioBuffer {
	return &EnhancedAudioBuffer{
		buffer:    make([]byte, 0),
		minSize:   MIN_BUFFER_SIZE,
		maxSize:   MAX_BUFFER_SIZE,
		conn:      conn,
		isActive:  true,
		lastFlush: time.Now(),
	}
}
func (eab *EnhancedAudioBuffer) AddAudio(audioData []byte) error {
	eab.mutex.Lock()
	defer eab.mutex.Unlock()
	if !eab.isConnectionActive() {
		return fmt.Errorf("connection is not active")
	}
	eab.buffer = append(eab.buffer, audioData...)
	eab.totalBytes += int64(len(audioData))
	eab.totalChunks++
	// Reset flush timer if it exists
	if eab.flushTimer != nil {
		eab.flushTimer.Stop()
	}
	// Immediate flush if buffer is at max size
	if len(eab.buffer) >= eab.maxSize {
		return eab.flushBuffer()
	}
	// Set timer to flush after 100ms if we have minimum data
	if len(eab.buffer) >= eab.minSize {
		eab.flushTimer = time.AfterFunc(100*time.Millisecond, func() {
			eab.mutex.Lock()
			defer eab.mutex.Unlock()
			if len(eab.buffer) > 0 {
				eab.flushBuffer()
			}
		})
	}
	return nil
}
// flushBuffer sends the buffered audio. The caller must hold eab.mutex,
// which also ensures only one goroutine writes to the WebSocket at a time.
func (eab *EnhancedAudioBuffer) flushBuffer() error {
	if len(eab.buffer) == 0 {
		return nil
	}
	// Snapshot the connection state and release connMutex before writing:
	// setInactive takes the write lock, so calling it while still holding
	// the read lock would deadlock.
	eab.connMutex.RLock()
	conn, active := eab.conn, eab.isActive
	eab.connMutex.RUnlock()
	if conn == nil || !active {
		return fmt.Errorf("connection is not active")
	}
	// Copy the buffered audio and clear the buffer before sending
	bufferedAudio := make([]byte, len(eab.buffer))
	copy(bufferedAudio, eab.buffer)
	bufferSize := len(eab.buffer)
	eab.buffer = eab.buffer[:0]
	eab.lastFlush = time.Now()
	if err := conn.WriteMessage(websocket.BinaryMessage, bufferedAudio); err != nil {
		log.Printf("Error sending %d bytes to AssemblyAI: %v", bufferSize, err)
		eab.setInactive()
		return err
	}
	// Log periodic stats
	if eab.totalChunks%100 == 0 {
		avgChunkSize := float64(eab.totalBytes) / float64(eab.totalChunks)
		log.Printf("Audio stats: %d chunks, %.1f avg bytes/chunk, last buffer: %d bytes",
			eab.totalChunks, avgChunkSize, bufferSize)
	}
	return nil
}
// FlushRemaining stops any pending timer and sends whatever is left in the buffer.
func (eab *EnhancedAudioBuffer) FlushRemaining() error {
	eab.mutex.Lock()
	defer eab.mutex.Unlock()
	// Stop any pending timer
	if eab.flushTimer != nil {
		eab.flushTimer.Stop()
	}
	return eab.flushBuffer()
}
func (eab *EnhancedAudioBuffer) setInactive() {
	eab.connMutex.Lock()
	defer eab.connMutex.Unlock()
	eab.isActive = false
}
func (eab *EnhancedAudioBuffer) isConnectionActive() bool {
	eab.connMutex.RLock()
	defer eab.connMutex.RUnlock()
	return eab.isActive && eab.conn != nil
}
func (eab *EnhancedAudioBuffer) GetStats() (int64, int64, float64) {
	eab.mutex.Lock()
	defer eab.mutex.Unlock()
	avgSize := float64(0)
	if eab.totalChunks > 0 {
		avgSize = float64(eab.totalBytes) / float64(eab.totalChunks)
	}
	return eab.totalBytes, eab.totalChunks, avgSize
}
var (
	assemblyAIAPIKey = os.Getenv("ASSEMBLYAI_API_KEY")
	upgrader         = websocket.Upgrader{
		// Accept WebSocket connections from any origin (Twilio's servers)
		CheckOrigin: func(r *http.Request) bool {
			return true
		},
	}
)
func main() {
	// Load .env file
	err := godotenv.Load()
	if err != nil {
		log.Println("No .env file found")
	}
	assemblyAIAPIKey = os.Getenv("ASSEMBLYAI_API_KEY")
	if assemblyAIAPIKey == "" {
		log.Fatal("ASSEMBLYAI_API_KEY environment variable is required")
	}
	http.HandleFunc("/", twilio)
	http.HandleFunc("/media", media)
	log.Println("Server is running on port 8080")
	log.Printf("AssemblyAI API Key is set: %v", assemblyAIAPIKey != "")
	if err := http.ListenAndServe(":8080", nil); err != nil {
		log.Fatal(err)
	}
}
func twilio(w http.ResponseWriter, r *http.Request) {
	log.Printf("Received %s request to / endpoint from %s", r.Method, r.RemoteAddr)
	if r.Method != "POST" && r.Method != "GET" {
		log.Printf("Method not allowed: %s", r.Method)
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}
	// ngrok serves the public URL over TLS, so Twilio connects to the
	// stream with wss even though this server listens on plain HTTP.
	host := r.Host
	wsProtocol := "wss"
	twiML := `<?xml version="1.0" encoding="UTF-8"?>
<Response>
  <Say>Now connecting to the streaming service.</Say>
  <Connect>
    <Stream url='%s://%s/media' />
  </Connect>
</Response>`
	w.Header().Set("Content-Type", "application/xml")
	responseXML := fmt.Sprintf(twiML, wsProtocol, host)
	log.Printf("Sending TwiML response with stream URL: %s://%s/media", wsProtocol, host)
	fmt.Fprint(w, responseXML)
}
func media(w http.ResponseWriter, r *http.Request) {
log.Printf("Received request to /media endpoint - Method:
%s", r.Method)
if r.Method != "GET" {
log.Printf("Wrong method for WebSocket upgrade: %s",
r.Method)
return
}
// Upgrade HTTP request to WebSocket for Twilio connection
twilioConn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println("unable to upgrade connection to
websocket:", err)
return
}
defer twilioConn.Close()
log.Println("Successfully upgraded to WebSocket connection")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Create session manager and display
sessionMgr := NewSessionManager()
display := NewTranscriptDisplay()
defer sessionMgr.Close()
// Connect to AssemblyAI Universal-Streaming
assemblyAIConn, err := connectToAssemblyAI()
if err != nil {
log.Println("unable to connect to AssemblyAI:", err)
return
}
// Create enhanced audio buffer
audioBuffer := NewEnhancedAudioBuffer(assemblyAIConn)
	defer func() {
		log.Println("Cleaning up connections...")
		// Stop audio buffer
		audioBuffer.setInactive()
		// Print final stats
		totalBytes, totalChunks, avgSize := audioBuffer.GetStats()
		log.Printf("Final audio stats: %d bytes, %d chunks, %.1f avg bytes/chunk",
			totalBytes, totalChunks, avgSize)
		// Flush any remaining audio
		if err := audioBuffer.FlushRemaining(); err != nil {
			log.Printf("Error flushing remaining audio: %v", err)
		}
		// Send termination message before closing
		terminateMsg := TerminateMessage{Type: "Terminate"}
		if err := assemblyAIConn.WriteJSON(terminateMsg); err != nil {
			log.Printf("Error sending terminate message: %v", err)
		}
		// Give AssemblyAI a moment to process Terminate before closing the socket
		time.Sleep(200 * time.Millisecond)
		assemblyAIConn.Close()
		log.Println("Cleanup completed")
	}()
	// Start goroutine to handle AssemblyAI responses
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		handleAssemblyAIResponses(ctx, assemblyAIConn, display, sessionMgr)
	}()
	// Process Twilio messages until the stream stops or the socket closes.
	// The loop is labeled because a bare break inside a switch only exits the switch.
readLoop:
	for {
		var message TwilioMessage
		if err := twilioConn.ReadJSON(&message); err != nil {
			log.Printf("Unable to read Twilio message: %v", err)
			cancel()
			break readLoop
		}
		switch message.Event {
		case "connected":
			log.Println("Twilio mediastream connected")
		case "start":
			log.Println("Twilio mediastream started")
		case "media":
			// Only process audio if the session is active or has not started yet
			if sessionMgr.IsActive() || sessionMgr.sessionID == "" {
				if err := forwardAudioToAssemblyAI(audioBuffer, message.Media.Payload); err != nil {
					log.Printf("Unable to forward audio to AssemblyAI: %v", err)
				}
			}
		case "stop":
			log.Println("Twilio mediastream stopped")
			cancel()
			break readLoop
		default:
			log.Printf("Received unknown event type: %s", message.Event)
		}
	}
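	// handleAssemblyAIResponses may be blocked in conn.ReadMessage, which does
	// not observe ctx. A read deadline makes that read return, so wg.Wait
	// cannot hang indefinitely after the call ends.
	if err := assemblyAIConn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil {
		log.Printf("Unable to set AssemblyAI read deadline: %v", err)
	}
	wg.Wait()
	log.Println("WebSocket handler completed")
}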
func connectToAssemblyAI() (*websocket.Conn, error) {
	// Build URL with query parameters for v3
	params := url.Values{}
	params.Add("sample_rate", "8000")   // Twilio streams 8 kHz audio...
	params.Add("encoding", "pcm_mulaw") // ...encoded as 8-bit μ-law
	params.Add("format_turns", "true")
	// Optional: Configure turn detection parameters
	params.Add("end_of_turn_confidence_threshold", "0.7")
	params.Add("min_end_of_turn_silence_when_confident", "160")
	params.Add("max_turn_silence", "2400")
	wsURL := fmt.Sprintf("wss://streaming.assemblyai.com/v3/ws?%s", params.Encode())
	// Create WebSocket connection to AssemblyAI v3
	headers := http.Header{}
	headers.Add("Authorization", assemblyAIAPIKey)
	conn, _, err := websocket.DefaultDialer.Dial(wsURL, headers)
	if err != nil {
		return nil, fmt.Errorf("failed to connect to AssemblyAI: %w", err)
	}
	log.Println("Connected to AssemblyAI Universal-Streaming")
	return conn, nil
}
func forwardAudioToAssemblyAI(audioBuffer *EnhancedAudioBuffer, payload string) error {
	// Decode base64 payload from Twilio
	audioData, err := base64.StdEncoding.DecodeString(payload)
	if err != nil {
		return fmt.Errorf("failed to decode audio data: %w", err)
	}
	// Validate audio chunk size
	if len(audioData) == 0 {
		return nil // Don't send empty chunks
	}
	if len(audioData) > 8000 { // More than 1 second of 8 kHz, 8-bit μ-law audio
		log.Printf("Warning: audio chunk too large: %d bytes", len(audioData))
	}
	// Add to buffer instead of sending directly
	return audioBuffer.AddAudio(audioData)
}
func handleAssemblyAIResponses(ctx context.Context, conn *websocket.Conn,
	display *TranscriptDisplay, sessionMgr *SessionManager) {
	defer display.Clear()
	for {
		select {
		case <-ctx.Done():
			return
		default:
			_, data, err := conn.ReadMessage()
			if err != nil {
				log.Printf("Error reading from AssemblyAI: %v", err)
				sessionMgr.SetInactive()
				return
			}
			sessionMgr.UpdateLastMessage()
			// First parse to get message type
			var baseMsg BaseMessage
			if err := json.Unmarshal(data, &baseMsg); err != nil {
				log.Printf("Error unmarshaling base message: %v", err)
				continue
			}
			switch baseMsg.Type {
			case "Begin":
				var msg BeginMessage
				if err := json.Unmarshal(data, &msg); err != nil {
					log.Printf("Error unmarshaling Begin message: %v", err)
					continue
				}
				sessionMgr.SetSessionStarted(msg.ID)
				expiresTime := time.Unix(int64(msg.ExpiresAt), 0)
				log.Printf("Session started - ID: %s, Expires: %s",
					msg.ID, expiresTime.Format(time.RFC3339))
			case "Turn":
				var msg TurnMessage
				if err := json.Unmarshal(data, &msg); err != nil {
					log.Printf("Error unmarshaling Turn message: %v", err)
					continue
				}
				if strings.TrimSpace(msg.Transcript) != "" {
					if msg.TurnIsFormatted {
						// Final formatted transcript
						display.AddFinal(msg.Transcript)
					} else if msg.EndOfTurn {
						// Final but unformatted: wait for the formatted
						// version and don't display anything yet
					} else {
						// Partial transcript
						display.UpdatePartial(msg.Transcript)
					}
				}
			case "Termination":
				var msg TerminationMessage
				if err := json.Unmarshal(data, &msg); err != nil {
					log.Printf("Error unmarshaling Termination message: %v", err)
					continue
				}
				log.Printf("\nSession terminated - Audio: %ds, Session: %ds",
					msg.AudioDurationSeconds, msg.SessionDurationSeconds)
				sessionMgr.SetInactive()
				return
			}
		}
	}
}
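Sometimes you need to see the exact URL that `connectToAssemblyAI` dials, for example in a log line or a test. The query string is built with `url.Values`, and its `Encode` method sorts parameters by key, so the result does not depend on the order of the `Add` calls. The sketch below rebuilds that URL with the same parameter values as the listing. `streamingURL` is a hypothetical helper name.

```go
package main

import (
	"fmt"
	"net/url"
)

// streamingURL rebuilds the Universal-Streaming v3 endpoint with the same
// query parameters that connectToAssemblyAI sends.
func streamingURL() string {
	params := url.Values{}
	params.Add("sample_rate", "8000")   // Twilio streams 8 kHz audio
	params.Add("encoding", "pcm_mulaw") // encoded as 8-bit μ-law
	params.Add("format_turns", "true")
	params.Add("end_of_turn_confidence_threshold", "0.7")
	params.Add("min_end_of_turn_silence_when_confident", "160")
	params.Add("max_turn_silence", "2400")
	// Encode sorts by key, so the output is deterministic.
	return "wss://streaming.assemblyai.com/v3/ws?" + params.Encode()
}

func main() {
	fmt.Println(streamingURL())
}
```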
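`forwardAudioToAssemblyAI` passes each decoded Twilio frame to `EnhancedAudioBuffer` instead of sending it straight to AssemblyAI. That type is defined earlier in the complete source and is not reproduced here. The sketch below is a simplified, hypothetical stand-in (`chunker`) that shows only the accumulation idea. It assumes the following:

- Twilio's 8 kHz, 8-bit μ-law audio uses one byte per sample, so 8,000 bytes is one second.
- Twilio frames are typically 20 ms (160 bytes).
- The target chunk size is an assumed 100 ms. AssemblyAI may reject chunks that are too short, so check its current streaming docs before picking a value.

```go
package main

import "fmt"

// Twilio Media Streams carry 8 kHz, 8-bit μ-law audio: one byte per sample.
const bytesPerSecond = 8000

// chunker is a hypothetical, simplified stand-in for EnhancedAudioBuffer:
// it accumulates small frames and releases them once minBytes are collected.
type chunker struct {
	buf      []byte
	minBytes int
}

// newChunker returns a chunker that emits chunks of at least minMillis ms.
func newChunker(minMillis int) *chunker {
	return &chunker{minBytes: bytesPerSecond * minMillis / 1000}
}

// add appends a frame and returns a ready chunk, or nil if more audio is needed.
func (c *chunker) add(frame []byte) []byte {
	c.buf = append(c.buf, frame...)
	if len(c.buf) < c.minBytes {
		return nil
	}
	out := c.buf
	c.buf = nil // start a fresh slice so the returned chunk is never overwritten
	return out
}

// flush returns whatever audio is left over, e.g. when the call ends.
func (c *chunker) flush() []byte {
	out := c.buf
	c.buf = nil
	return out
}

// durationMillis converts a μ-law byte count into milliseconds of audio.
func durationMillis(n int) int { return n * 1000 / bytesPerSecond }

func main() {
	c := newChunker(100)       // assumed 100 ms target, i.e. 800 bytes
	frame := make([]byte, 160) // one 20 ms Twilio-sized frame of silence
	for i := 1; i <= 6; i++ {
		if chunk := c.add(frame); chunk != nil {
			fmt.Printf("frame %d: send %d bytes (%d ms)\n", i, len(chunk), durationMillis(len(chunk)))
		}
	}
	fmt.Printf("left over: %d bytes\n", len(c.flush()))
}
```

Any audio still buffered when the call ends must be sent explicitly. The listing does this with `FlushRemaining` before it sends `Terminate`.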
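The `Turn` branch in `handleAssemblyAIResponses` applies a small decision rule:

- Ignore blank transcripts.
- Commit formatted turns as final.
- Skip an unformatted end-of-turn message while its formatted copy is on the way (the listing requests formatting with `format_turns=true`).
- Show everything else as a partial result.

The sketch below pulls that rule into a pure function so it can be unit-tested without a live connection. It reads only the three v3 `Turn` fields the listing uses (`transcript`, `end_of_turn`, `turn_is_formatted`). The `turn`, `action`, and `classify` names are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// turn holds the Turn-message fields the listing's handler reads;
// any other fields in the JSON are ignored.
type turn struct {
	Transcript      string `json:"transcript"`
	EndOfTurn       bool   `json:"end_of_turn"`
	TurnIsFormatted bool   `json:"turn_is_formatted"`
}

// action says what the display should do with a Turn message.
type action string

const (
	ignore  action = "ignore"  // blank text, or an unformatted final awaiting its formatted copy
	partial action = "partial" // overwrite the in-progress line
	final   action = "final"   // commit the formatted transcript
)

// classify mirrors the decision made in handleAssemblyAIResponses.
func classify(raw []byte) (action, string, error) {
	var t turn
	if err := json.Unmarshal(raw, &t); err != nil {
		return ignore, "", err
	}
	switch {
	case strings.TrimSpace(t.Transcript) == "":
		return ignore, "", nil
	case t.TurnIsFormatted:
		return final, t.Transcript, nil
	case t.EndOfTurn:
		return ignore, "", nil
	default:
		return partial, t.Transcript, nil
	}
}

func main() {
	msgs := []string{
		`{"type":"Turn","transcript":"hello wor","end_of_turn":false,"turn_is_formatted":false}`,
		`{"type":"Turn","transcript":"hello world","end_of_turn":true,"turn_is_formatted":false}`,
		`{"type":"Turn","transcript":"Hello, world.","end_of_turn":true,"turn_is_formatted":true}`,
	}
	for _, m := range msgs {
		a, text, err := classify([]byte(m))
		if err != nil {
			panic(err)
		}
		fmt.Printf("%-7s %q\n", a, text)
	}
}
```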