voice-ai-engine-development by sickn33/antigravity-awesome-skills
npx skills add https://github.com/sickn33/antigravity-awesome-skills --skill voice-ai-engine-development
This skill guides you through building production-ready voice AI engines with real-time conversation capabilities. Voice AI engines enable natural, bidirectional conversations between users and AI agents through streaming audio processing, speech-to-text transcription, LLM-powered responses, and text-to-speech synthesis.
The core architecture uses an async queue-based worker pipeline where each component runs independently and communicates via asyncio.Queue objects, enabling concurrent processing, interrupt handling, and real-time streaming at every stage.
Use this skill when building real-time conversational voice agents that need streaming speech-to-text, LLM-generated responses, text-to-speech synthesis, and user interruption (barge-in) handling.
Every voice AI engine follows this pipeline:
Audio In → Transcriber → Agent → Synthesizer → Audio Out
(Worker 1) (Worker 2) (Worker 3)
Key Benefits: concurrent processing at every stage, interrupt handling across all in-flight work, and real-time streaming end to end.
Every worker follows this pattern:
class BaseWorker:
def __init__(self, input_queue, output_queue):
self.input_queue = input_queue # asyncio.Queue to consume from
self.output_queue = output_queue # asyncio.Queue to produce to
self.active = False
def start(self):
"""Start the worker's processing loop"""
self.active = True
asyncio.create_task(self._run_loop())
async def _run_loop(self):
"""Main processing loop - runs forever until terminated"""
while self.active:
item = await self.input_queue.get() # Block until item arrives
await self.process(item) # Process the item
async def process(self, item):
"""Override this - does the actual work"""
raise NotImplementedError
def terminate(self):
"""Stop the worker"""
self.active = False
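As a minimal sketch of how a concrete worker plugs into this pattern (UppercaseWorker and the queue wiring are illustrative, not part of the engine):

import asyncio

class UppercaseWorker(BaseWorker):
    async def process(self, item):
        # Do this stage's work, then hand the result to the next stage
        self.output_queue.put_nowait(item.upper())

async def main():
    in_q, out_q = asyncio.Queue(), asyncio.Queue()
    worker = UppercaseWorker(in_q, out_q)
    worker.start()                 # spawns _run_loop as a background task
    in_q.put_nowait("hello")
    print(await out_q.get())       # -> "HELLO"
    worker.terminate()             # loop re-checks self.active on its next iteration

asyncio.run(main())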
Purpose: Converts incoming audio chunks to text transcriptions
Interface Requirements:
class BaseTranscriber:
def __init__(self, transcriber_config):
self.input_queue = asyncio.Queue() # Audio chunks (bytes)
self.output_queue = asyncio.Queue() # Transcriptions
self.is_muted = False
def send_audio(self, chunk: bytes):
"""Client calls this to send audio"""
if not self.is_muted:
self.input_queue.put_nowait(chunk)
else:
# Send silence instead (prevents echo during bot speech)
self.input_queue.put_nowait(self.create_silent_chunk(len(chunk)))
def mute(self):
"""Called when bot starts speaking (prevents echo)"""
self.is_muted = True
def unmute(self):
"""Called when bot stops speaking"""
self.is_muted = False
Output Format:
class Transcription:
message: str # "Hello, how are you?"
confidence: float # 0.95
is_final: bool # True = complete sentence, False = partial
is_interrupt: bool # Set by TranscriptionsWorker
Supported Providers: Deepgram, AssemblyAI, Azure, Google (see the factory below)
Critical Implementation Details:
Run the audio sender and the result receiver as concurrent tasks with asyncio.gather(), as in the sketch below
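To make the sender/receiver concurrency concrete, here is a minimal sketch following the BaseWorker loop pattern; the endpoint URL and JSON message shape are placeholders rather than any specific provider's API:

import asyncio
import json
import websockets  # third-party: pip install websockets

class StreamingTranscriber(BaseTranscriber):
    async def _run_loop(self):
        # Placeholder endpoint; Deepgram, AssemblyAI, etc. each define
        # their own URL, auth headers, and message schema
        async with websockets.connect("wss://stt.example.com/stream") as ws:
            await asyncio.gather(self._sender(ws), self._receiver(ws))

    async def _sender(self, ws):
        # Forward raw audio chunks from the input queue to the STT socket
        while True:
            chunk = await self.input_queue.get()
            await ws.send(chunk)

    async def _receiver(self, ws):
        # Convert provider messages into Transcription objects downstream
        async for raw in ws:
            data = json.loads(raw)
            transcription = Transcription()
            transcription.message = data["text"]
            transcription.confidence = data.get("confidence", 1.0)
            transcription.is_final = data.get("is_final", False)
            transcription.is_interrupt = False
            self.output_queue.put_nowait(transcription)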
Purpose: Processes user input and generates conversational responses
Interface Requirements:
class BaseAgent:
def __init__(self, agent_config):
self.input_queue = asyncio.Queue() # TranscriptionAgentInput
self.output_queue = asyncio.Queue() # AgentResponse
self.transcript = None # Conversation history
async def generate_response(self, human_input, is_interrupt, conversation_id):
"""Override this - returns AsyncGenerator of responses"""
raise NotImplementedError
Why Streaming Responses? Streaming lets synthesis begin on the first tokens instead of waiting seconds for the full LLM completion, which sharply reduces perceived latency (see the latency example later in this document).
Supported Providers: OpenAI, Gemini (see the factory below)
Critical Implementation Details:
Maintain conversation history in a Transcript object
Stream responses as an AsyncGenerator, as in the sketch below
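A minimal streaming agent sketch using the OpenAI Python SDK; the model name is an illustrative choice, and Transcript is the history structure shown later in this document:

from openai import AsyncOpenAI

class OpenAIAgent(BaseAgent):
    def __init__(self, agent_config):
        super().__init__(agent_config)
        self.client = AsyncOpenAI()   # reads OPENAI_API_KEY from the environment
        self.transcript = Transcript()

    async def generate_response(self, human_input, is_interrupt, conversation_id):
        self.transcript.add_human_message(human_input)
        # Stream tokens so synthesis can start before the completion finishes
        stream = await self.client.chat.completions.create(
            model="gpt-4o-mini",  # illustrative model choice
            messages=self.transcript.to_openai_messages(),
            stream=True,
        )
        async for chunk in stream:
            delta = chunk.choices[0].delta.content
            if delta:
                yield delta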
Purpose: Converts agent text responses to speech audio
Interface Requirements:
class BaseSynthesizer:
async def create_speech(self, message: BaseMessage, chunk_size: int) -> SynthesisResult:
"""
Returns a SynthesisResult containing:
- chunk_generator: AsyncGenerator that yields audio chunks
- get_message_up_to: Function to get partial text (for interrupts)
"""
raise NotImplementedError
SynthesisResult Structure:
class SynthesisResult:
chunk_generator: AsyncGenerator[ChunkResult, None]
get_message_up_to: Callable[[float], str] # seconds → partial text
class ChunkResult:
chunk: bytes # Raw PCM audio
is_last_chunk: bool
Supported Providers: ElevenLabs, Azure, Google, Amazon Polly, Play.ht (see the factory below)
Critical Implementation Details:
Implement get_message_up_to() for interrupt handling, as in the sketch below
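One way to implement get_message_up_to() when the provider gives no word timestamps is to estimate from an average speaking rate; a minimal sketch (the words-per-second figure is an assumption):

def make_get_message_up_to(message: str, words_per_second: float = 2.5):
    """Map seconds of audio played to the prefix of text likely spoken.

    words_per_second is an assumed average rate; providers that return
    word-level timestamps can compute the cut-off exactly instead.
    """
    words = message.split()

    def get_message_up_to(seconds: float) -> str:
        spoken = max(0, int(seconds * words_per_second))
        return " ".join(words[:spoken])

    return get_message_up_to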
Purpose: Sends synthesized audio back to the client
CRITICAL: Rate Limiting for Interrupts
async def send_speech_to_output(self, message, synthesis_result,
stop_event, seconds_per_chunk):
chunk_idx = 0
async for chunk_result in synthesis_result.chunk_generator:
# Check for interrupt
if stop_event.is_set():
logger.debug(f"Interrupted after {chunk_idx} chunks")
message_sent = synthesis_result.get_message_up_to(
chunk_idx * seconds_per_chunk
)
return message_sent, True # cut_off = True
start_time = time.time()
# Send chunk to output device
self.output_device.consume_nonblocking(chunk_result.chunk)
# CRITICAL: Wait for chunk to play before sending next one
# This is what makes interrupts work!
speech_length = seconds_per_chunk
processing_time = time.time() - start_time
await asyncio.sleep(max(speech_length - processing_time, 0))
chunk_idx += 1
return message, False # cut_off = False
Why Rate Limiting? Without rate limiting, all audio chunks would be sent immediately: the whole utterance would already sit in the client's playback buffer, an interrupt could stop synthesis but not audio that was already sent, and there would be no way to tell how much the user actually heard.
By sending one chunk every N seconds, the loop stays in lockstep with real-time playback, so stop_event is checked between chunks and chunk_idx * seconds_per_chunk yields the partial message on cut-off.
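For example, with the 16 kHz, 16-bit linear PCM used in the WebSocket example below, seconds_per_chunk follows directly from the chunk size (the 8000-byte chunk is an assumed value):

SAMPLE_RATE = 16000       # Hz (matches the WebSocket example below)
BYTES_PER_SAMPLE = 2      # 16-bit linear PCM, mono
CHUNK_SIZE = 8000         # bytes per chunk (assumed)

seconds_per_chunk = CHUNK_SIZE / (SAMPLE_RATE * BYTES_PER_SAMPLE)  # 0.25 s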
The interrupt system is critical for natural conversations.
Scenario: Bot is saying "I think the weather will be nice today and tomorrow and—" when the user interrupts with "Stop".
Step 1: User starts speaking
# TranscriptionsWorker detects new transcription while bot speaking
async def process(self, transcription):
if not self.conversation.is_human_speaking: # Bot was speaking!
# Broadcast interrupt to all in-flight events
interrupted = self.conversation.broadcast_interrupt()
transcription.is_interrupt = interrupted
Step 2: broadcast_interrupt() stops everything
def broadcast_interrupt(self):
num_interrupts = 0
# Interrupt all queued events
while True:
try:
interruptible_event = self.interruptible_events.get_nowait()
if interruptible_event.interrupt(): # Sets interruption_event
num_interrupts += 1
except queue.Empty:
break
# Cancel current tasks
self.agent.cancel_current_task() # Stop generating text
self.agent_responses_worker.cancel_current_task() # Stop synthesizing
return num_interrupts > 0
Step 3: SynthesisResultsWorker detects interrupt
async def send_speech_to_output(self, synthesis_result, stop_event, ...):
async for chunk_result in synthesis_result.chunk_generator:
# Check stop_event (this is the interruption_event)
if stop_event.is_set():
logger.debug("Interrupted! Stopping speech.")
# Calculate what was actually spoken
seconds_spoken = chunk_idx * seconds_per_chunk
partial_message = synthesis_result.get_message_up_to(seconds_spoken)
# e.g., "I think the weather will be nice today"
return partial_message, True # cut_off = True
Step 4: Agent updates history
if cut_off:
# Update conversation history with partial message
self.agent.update_last_bot_message_on_cut_off(message_sent)
# History now shows:
# Bot: "I think the weather will be nice today" (incomplete)
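A minimal sketch of that update, assuming the Transcript structure shown later in this document:

def update_last_bot_message_on_cut_off(self, message_sent: str):
    # Walk backwards to the most recent bot message and replace its text
    # with what was actually spoken before the interrupt
    for msg in reversed(self.transcript.event_logs):
        if msg.sender == Sender.BOT:
            msg.text = message_sent
            break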
Every event in the pipeline is wrapped in an InterruptibleEvent:
class InterruptibleEvent:
def __init__(self, payload, is_interruptible=True):
self.payload = payload
self.is_interruptible = is_interruptible
self.interruption_event = threading.Event() # Initially not set
self.interrupted = False
def interrupt(self) -> bool:
"""Interrupt this event"""
if not self.is_interruptible:
return False
if not self.interrupted:
self.interruption_event.set() # Signal to stop!
self.interrupted = True
return True
return False
def is_interrupted(self) -> bool:
return self.interruption_event.is_set()
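Events typically enter the tracking queue through a small helper on the worker; a sketch (produce_interruptible_event is a hypothetical name, not confirmed by the source):

def produce_interruptible_event(self, payload, is_interruptible=True):
    # Wrap the payload so broadcast_interrupt() can reach it later,
    # then forward the same event to the next worker in the pipeline
    event = InterruptibleEvent(payload, is_interruptible)
    self.conversation.interruptible_events.put_nowait(event)
    self.output_queue.put_nowait(event)
    return event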
Support multiple providers with a factory pattern:
class VoiceHandler:
"""Multi-provider factory for voice components"""
def create_transcriber(self, agent_config: Dict):
"""Create transcriber based on transcriberProvider"""
provider = agent_config.get("transcriberProvider", "deepgram")
if provider == "deepgram":
return self._create_deepgram_transcriber(agent_config)
elif provider == "assemblyai":
return self._create_assemblyai_transcriber(agent_config)
elif provider == "azure":
return self._create_azure_transcriber(agent_config)
elif provider == "google":
return self._create_google_transcriber(agent_config)
else:
raise ValueError(f"Unknown transcriber provider: {provider}")
def create_agent(self, agent_config: Dict):
"""Create LLM agent based on llmProvider"""
provider = agent_config.get("llmProvider", "openai")
if provider == "openai":
return self._create_openai_agent(agent_config)
elif provider == "gemini":
return self._create_gemini_agent(agent_config)
else:
raise ValueError(f"Unknown LLM provider: {provider}")
def create_synthesizer(self, agent_config: Dict):
"""Create voice synthesizer based on voiceProvider"""
provider = agent_config.get("voiceProvider", "elevenlabs")
if provider == "elevenlabs":
return self._create_elevenlabs_synthesizer(agent_config)
elif provider == "azure":
return self._create_azure_synthesizer(agent_config)
elif provider == "google":
return self._create_google_synthesizer(agent_config)
elif provider == "polly":
return self._create_polly_synthesizer(agent_config)
elif provider == "playht":
return self._create_playht_synthesizer(agent_config)
else:
raise ValueError(f"Unknown voice provider: {provider}")
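Usage is then a matter of passing one config dict (keys as assumed by the factory above):

handler = VoiceHandler()
agent_config = {
    "transcriberProvider": "deepgram",
    "llmProvider": "openai",
    "voiceProvider": "elevenlabs",
}
transcriber = handler.create_transcriber(agent_config)
agent = handler.create_agent(agent_config)
synthesizer = handler.create_synthesizer(agent_config)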
Voice AI engines typically use WebSocket for bidirectional audio streaming:
@app.websocket("/conversation")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
    # Create voice components (agent_config is assumed to be loaded
    # elsewhere, e.g. from a query parameter or session lookup)
voice_handler = VoiceHandler()
transcriber = voice_handler.create_transcriber(agent_config)
agent = voice_handler.create_agent(agent_config)
synthesizer = voice_handler.create_synthesizer(agent_config)
# Create output device
output_device = WebsocketOutputDevice(
ws=websocket,
sampling_rate=16000,
audio_encoding=AudioEncoding.LINEAR16
)
# Create conversation orchestrator
conversation = StreamingConversation(
output_device=output_device,
transcriber=transcriber,
agent=agent,
synthesizer=synthesizer
)
# Start all workers
await conversation.start()
try:
# Receive audio from client
async for message in websocket.iter_bytes():
conversation.receive_audio(message)
except WebSocketDisconnect:
logger.info("Client disconnected")
finally:
await conversation.terminate()
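A minimal test client sketch for that endpoint (the silent PCM frames are placeholders for real microphone audio):

import asyncio
import websockets

async def stream_audio(url="ws://localhost:8000/conversation"):
    async with websockets.connect(url) as ws:
        # 100 ms of silence at 16 kHz / 16-bit mono = 3200 bytes per frame
        silent_frame = b"\x00" * 3200
        for _ in range(10):
            await ws.send(silent_frame)
            await asyncio.sleep(0.1)
        # Read back whatever synthesized audio the server returns
        reply = await ws.recv()
        print(f"received {len(reply)} bytes of audio")

asyncio.run(stream_audio())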
Problem: Bot's audio jumps or cuts off mid-response.
Cause: Sending text to the synthesizer in small chunks causes multiple TTS calls.
Solution: Buffer the entire LLM response before sending it to the synthesizer:
# ❌ Bad: Yields sentence-by-sentence
async for sentence in llm_stream:
yield GeneratedResponse(message=BaseMessage(text=sentence))
# ✅ Good: Buffer entire response
full_response = ""
async for chunk in llm_stream:
full_response += chunk
yield GeneratedResponse(message=BaseMessage(text=full_response))
Problem: Bot hears itself speaking and responds to its own audio.
Cause: The transcriber is not muted during bot speech.
Solution: Mute the transcriber when the bot starts speaking:
# Before sending audio to output
self.transcriber.mute()
# After audio playback complete
self.transcriber.unmute()
Problem: User can't interrupt the bot mid-sentence.
Cause: All audio chunks are sent at once instead of being rate-limited.
Solution: Rate-limit audio chunks to match real-time playback:
async for chunk in synthesis_result.chunk_generator:
start_time = time.time()
# Send chunk
output_device.consume_nonblocking(chunk)
# Wait for chunk duration before sending next
processing_time = time.time() - start_time
await asyncio.sleep(max(seconds_per_chunk - processing_time, 0))
Problem: Memory usage grows over time.
Cause: WebSocket connections or API streams are not properly closed.
Solution: Always use context managers and clean up:
try:
async with websockets.connect(url) as ws:
# Use websocket
pass
finally:
# Cleanup
await conversation.terminate()
await transcriber.terminate()
async def _run_loop(self):
while self.active:
try:
item = await self.input_queue.get()
await self.process(item)
except Exception as e:
logger.error(f"Worker error: {e}", exc_info=True)
# Don't crash the worker, continue processing
async def terminate(self):
"""Gracefully shut down all workers"""
self.active = False
# Stop all workers
self.transcriber.terminate()
self.agent.terminate()
self.synthesizer.terminate()
# Wait for queues to drain
await asyncio.sleep(0.5)
# Close connections
if self.websocket:
await self.websocket.close()
# Log key events
logger.info(f"🎤 [TRANSCRIBER] Received: '{transcription.message}'")
logger.info(f"🤖 [AGENT] Generating response...")
logger.info(f"🔊 [SYNTHESIZER] Synthesizing {len(text)} characters")
logger.info(f"⚠️ [INTERRUPT] User interrupted bot")
# Track metrics
metrics.increment("transcriptions.count")
metrics.timing("agent.response_time", duration)
metrics.gauge("active_conversations", count)
# Implement rate limiting for API calls
from aiolimiter import AsyncLimiter
rate_limiter = AsyncLimiter(max_rate=10, time_period=1) # 10 calls/second
async def call_api(self, data):
async with rate_limiter:
return await self.client.post(data)
# Producer
async def producer(queue):
while True:
item = await generate_item()
queue.put_nowait(item)
# Consumer
async def consumer(queue):
while True:
item = await queue.get()
await process_item(item)
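If the producer can outrun the consumer, a bounded queue adds backpressure; a variation on the pattern above (the maxsize value is an assumption):

queue = asyncio.Queue(maxsize=32)   # bounded: put() awaits when full

async def producer(queue):
    while True:
        item = await generate_item()
        await queue.put(item)       # blocks instead of growing without bound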
Instead of returning complete results:
# ❌ Bad: Wait for entire response
async def generate_response(prompt):
response = await openai.complete(prompt) # 5 seconds
return response
# ✅ Good: Stream chunks as they arrive
async def generate_response(prompt):
async for chunk in openai.complete(prompt, stream=True):
yield chunk # Yield after 0.1s, 0.2s, etc.
Maintain conversation history for context:
class Transcript:
    def __init__(self):
        # Use an instance attribute: a mutable class-level default
        # (event_logs: List[Message] = []) would be shared by all instances
        self.event_logs: List[Message] = []
    def add_human_message(self, text):
        self.event_logs.append(Message(sender=Sender.HUMAN, text=text))
    def add_bot_message(self, text):
        self.event_logs.append(Message(sender=Sender.BOT, text=text))
    def to_openai_messages(self):
        return [
            {"role": "user" if msg.sender == Sender.HUMAN else "assistant",
             "content": msg.text}
            for msg in self.event_logs
        ]
async def test_transcriber():
transcriber = DeepgramTranscriber(config)
# Mock audio input
audio_chunk = b'\x00\x01\x02...'
transcriber.send_audio(audio_chunk)
# Check output
transcription = await transcriber.output_queue.get()
assert transcription.message == "expected text"
async def test_full_pipeline():
# Create all components
conversation = create_test_conversation()
# Send test audio
conversation.receive_audio(test_audio_chunk)
# Wait for response
response = await wait_for_audio_output(timeout=5)
assert response is not None
async def test_interrupt():
    conversation = create_test_conversation()
    # generate_response returns an AsyncGenerator, so drive it in a
    # background task rather than awaiting it directly
    async def _drive():
        async for _ in conversation.agent.generate_response(
                "Tell me a long story", is_interrupt=False, conversation_id="test"):
            pass
    asyncio.create_task(_drive())
    # Interrupt mid-response
    await asyncio.sleep(1)  # Let it speak for 1 second
    conversation.broadcast_interrupt()
    # Verify partial message in transcript
    last_message = conversation.transcript.event_logs[-1]
    assert last_message.text != full_expected_message
When implementing a voice AI engine, lean on these related skills:
@websocket-patterns - For WebSocket implementation details
@async-python - For asyncio and async patterns
@streaming-apis - For streaming API integration
@audio-processing - For audio format conversion and processing
@systematic-debugging - For debugging complex async pipelines
Libraries:
asyncio - Async programming
websockets - WebSocket client/server
FastAPI - WebSocket server framework
pydub - Audio manipulation
numpy - Audio data processing
API Providers: Deepgram, AssemblyAI, Azure, Google (transcription); OpenAI, Gemini (LLM); ElevenLabs, Azure, Google, Amazon Polly, Play.ht (synthesis)
Building a voice AI engine requires an async worker pipeline, streaming at every stage, rate-limited audio output, and interrupt handling wired through every component.
The key insight: everything must stream and everything must be interruptible for natural, real-time conversations.