Fix formatting issues in community environments - Applied black, isort, trailing whitespace, and end-of-file fixes - Remaining flake8 issues (unused imports, line length) noted for future cleanup

This commit is contained in:
Shannon Sands 2025-05-23 13:33:13 +10:00
parent e85a170c34
commit 7f2e1a4f90
34 changed files with 1560 additions and 821 deletions

View file

@ -1,3 +1,3 @@
# Engine
This directory contains the backend services and AI agents for the Pebble (Stone UI) project.
This directory contains the backend services and AI agents for the Pebble (Stone UI) project.

View file

@ -1,19 +1,19 @@
import logging
import os
from dotenv import load_dotenv
from pathlib import Path
from dotenv import load_dotenv
from livekit.agents import Agent, AgentSession, JobContext, WorkerOptions, cli, mcp
from livekit.plugins import deepgram, openai, silero
from livekit.plugins.turn_detector.multilingual import MultilingualModel
from dotenv import load_dotenv
load_dotenv(os.path.join(os.path.dirname(__file__), '..', '..', '.env'))
load_dotenv(os.path.join(os.path.dirname(__file__), "..", "..", ".env"))
logger = logging.getLogger("mcp-agent")
load_dotenv(dotenv_path=Path(__file__).parent.parent / '.env')
load_dotenv(dotenv_path=Path(__file__).parent.parent / ".env")
class MyAgent(Agent):
def __init__(self) -> None:
@ -28,17 +28,18 @@ class MyAgent(Agent):
tts=openai.TTS(voice="ash"),
turn_detection=MultilingualModel(),
mcp_servers=[
mcp.MCPServerHTTP(
url="https://mcp.gumloop.com/gcalendar/cY3bcaFS1qNdeVBnj0XIhnP4FEp2%3Aae99858e75594251bea9e05f32bb99b3",
timeout=5,
client_session_timeout_seconds=5,
),
]
mcp.MCPServerHTTP(
url="https://mcp.gumloop.com/gcalendar/cY3bcaFS1qNdeVBnj0XIhnP4FEp2%3Aae99858e75594251bea9e05f32bb99b3",
timeout=5,
client_session_timeout_seconds=5,
),
],
)
async def on_enter(self):
self.session.generate_reply()
async def entrypoint(ctx: JobContext):
await ctx.connect()
@ -54,5 +55,4 @@ async def entrypoint(ctx: JobContext):
if __name__ == "__main__":
cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint,
agent_name="mcp-agent"))
cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint, agent_name="mcp-agent"))

View file

@ -1,55 +1,84 @@
import os
import logging
import asyncio
import logging
import os
from dotenv import load_dotenv
from livekit.agents import mcp
from livekit.agents.llm import ChatContext, ChatMessage, LLM # Removed ChatRole as using strings
from livekit.agents.llm import ( # Removed ChatRole as using strings
LLM,
ChatContext,
ChatMessage,
)
from livekit.plugins import openai
logger = logging.getLogger("text-perplexity-agent")
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
load_dotenv(os.path.join(os.path.dirname(__file__), "..", "..", ".env"))
load_dotenv(os.path.join(os.path.dirname(__file__), '..', '..', '.env'))
# --- Configure Perplexity MCP Server (as a function to allow async context management) ---
def get_perplexity_mcp_server():
if os.environ.get("PERPLEXITY_API_KEY"):
mcp_script_path = os.path.abspath(os.path.join(
os.path.dirname(__file__), '..', 'tools', 'mcp', 'perplexity', 'perplexity-ask', 'dist', 'index.js'
))
mcp_script_path = os.path.abspath(
os.path.join(
os.path.dirname(__file__),
"..",
"tools",
"mcp",
"perplexity",
"perplexity-ask",
"dist",
"index.js",
)
)
if not os.path.exists(mcp_script_path):
logger.error(f"❌ MCP script not found at {mcp_script_path}. Make sure you\'ve run \'npm install && npm run build\' in the server directory.")
logger.error(
f"❌ MCP script not found at {mcp_script_path}. Make sure you've run 'npm install && npm run build' in the server directory."
)
logger.warning("⚠️ Perplexity tools will be unavailable.")
return None
else:
logger.info(f"📂 Configuring Perplexity MCP server with script: {mcp_script_path}")
logger.info(
f"📂 Configuring Perplexity MCP server with script: {mcp_script_path}"
)
return mcp.MCPServerStdio(
name="PerplexityStdioServer",
name="PerplexityStdioServer",
params={
"command": "node",
"args": [mcp_script_path],
"cwd": os.path.dirname(mcp_script_path),
"env": {"PERPLEXITY_API_KEY": os.environ.get("PERPLEXITY_API_KEY") or ""},
"client_session_timeout_seconds": 30
"env": {
"PERPLEXITY_API_KEY": os.environ.get("PERPLEXITY_API_KEY") or ""
},
"client_session_timeout_seconds": 30,
},
client_session_timeout_seconds=30
client_session_timeout_seconds=30,
)
else:
logger.warning("⚠️ PERPLEXITY_API_KEY not set. Perplexity tools will be unavailable.")
logger.warning(
"⚠️ PERPLEXITY_API_KEY not set. Perplexity tools will be unavailable."
)
return None
async def run_chat_loop(llm_instance: LLM, p_mcp_server: mcp.MCPServerStdio | None, initial_question: str = None):
async def run_chat_loop(
llm_instance: LLM,
p_mcp_server: mcp.MCPServerStdio | None,
initial_question: str = None,
):
"""Runs a text-based chat loop with the LLM and Perplexity tool."""
chat_context = ChatContext()
system_prompt = \
"""
You are a specialized assistant for answering general knowledge questions, providing explanations,
system_prompt = """
You are a specialized assistant for answering general knowledge questions, providing explanations,
and performing web searches using the 'perplexity_ask' tool.
When the user asks for information, facts, or to 'search the web', you are the designated expert.
When calling the 'perplexity_ask' tool, ensure the 'messages' argument is an array containing a single object
When calling the 'perplexity_ask' tool, ensure the 'messages' argument is an array containing a single object
with 'role': 'user' and 'content' set to the user's question.
For example: {"messages": [{"role": "user", "content": "What is the capital of France?"}]}
You do not have other tools. Do not try to delegate.
@ -59,10 +88,10 @@ async def run_chat_loop(llm_instance: LLM, p_mcp_server: mcp.MCPServerStdio | No
async def process_question(question: str):
logger.info(f"You: {question}")
chat_context.add_message(role="user", content=question)
full_response = ""
logger.info("Agent:")
mcp_servers_to_use = []
if p_mcp_server:
# MCPServerStdio is managed by async with in main, so it should be running
@ -73,7 +102,9 @@ async def run_chat_loop(llm_instance: LLM, p_mcp_server: mcp.MCPServerStdio | No
logger.info(f"DEBUG: Type of chat_context: {type(chat_context)}")
logger.info(f"DEBUG: Attributes of chat_context: {dir(chat_context)}")
# Pass messages from ChatContext and the list of mcp_servers
async for chunk in llm_instance.chat(messages=chat_context.messages, mcp_servers=mcp_servers_to_use):
async for chunk in llm_instance.chat(
messages=chat_context.messages, mcp_servers=mcp_servers_to_use
):
if chunk.delta.content:
print(chunk.delta.content, end="", flush=True)
full_response += chunk.delta.content
@ -84,7 +115,7 @@ async def run_chat_loop(llm_instance: LLM, p_mcp_server: mcp.MCPServerStdio | No
print(f"Sorry, I encountered an error: {e}")
return
print()
print()
chat_context.add_message(role="assistant", content=full_response)
if initial_question:
@ -102,46 +133,67 @@ async def run_chat_loop(llm_instance: LLM, p_mcp_server: mcp.MCPServerStdio | No
except KeyboardInterrupt:
logger.info("\nExiting chat due to interrupt.")
break
except EOFError:
except EOFError:
logger.info("\nExiting chat due to EOF.")
break
async def main():
"""Main entrypoint for the text-based Perplexity agent."""
logger.info("Starting Text-based Perplexity Agent...")
llm_instance = openai.LLM(model="gpt-4o")
p_mcp_server_instance = get_perplexity_mcp_server()
test_question = "What is the capital of France?"
if p_mcp_server_instance:
try:
# await p_mcp_server_instance.connect() # Connect to MCP server -> Removed
logger.info("Perplexity MCP Server instance created. Will be used by LLM if needed.")
await run_chat_loop(llm_instance, p_mcp_server_instance, initial_question=test_question)
logger.info(
"Perplexity MCP Server instance created. Will be used by LLM if needed."
)
await run_chat_loop(
llm_instance, p_mcp_server_instance, initial_question=test_question
)
finally:
logger.info("Closing Perplexity MCP server resources.") # Changed log message
await p_mcp_server_instance.aclose() # Close MCP server connection
logger.info(
"Closing Perplexity MCP server resources."
) # Changed log message
await p_mcp_server_instance.aclose() # Close MCP server connection
else:
logger.warning("Running chat loop without Perplexity MCP server.")
await run_chat_loop(llm_instance, None, initial_question=test_question)
logger.info("Text-based Perplexity Agent finished.")
if __name__ == "__main__":
if not os.environ.get("PERPLEXITY_API_KEY"):
logger.error("🔴 PERPLEXITY_API_KEY is not set in the environment.")
logger.error("🔴 Please set it in your .env file for the agent to function correctly with Perplexity.")
logger.error(
"🔴 Please set it in your .env file for the agent to function correctly with Perplexity."
)
if os.environ.get("PERPLEXITY_API_KEY"):
mcp_script_path = os.path.abspath(os.path.join(
os.path.dirname(__file__), '..', 'tools', 'mcp', 'perplexity', 'perplexity-ask', 'dist', 'index.js'
))
mcp_script_path = os.path.abspath(
os.path.join(
os.path.dirname(__file__),
"..",
"tools",
"mcp",
"perplexity",
"perplexity-ask",
"dist",
"index.js",
)
)
if not os.path.exists(mcp_script_path):
logger.error(f"❌ Critical: MCP script not found at {mcp_script_path}.")
logger.error("❌ The agent cannot use Perplexity tools. Please build the MCP server ('npm install && npm run build' in its directory).")
logger.error(
"❌ The agent cannot use Perplexity tools. Please build the MCP server ('npm install && npm run build' in its directory)."
)
exit(1)
asyncio.run(main())
asyncio.run(main())

View file

@ -1,55 +1,84 @@
import os
import logging
import asyncio
import logging
import os
from dotenv import load_dotenv
from livekit.agents import mcp
from livekit.agents.llm import ChatContext, ChatMessage, LLM # Removed ChatRole as using strings
from livekit.agents.llm import ( # Removed ChatRole as using strings
LLM,
ChatContext,
ChatMessage,
)
from livekit.plugins import openai
logger = logging.getLogger("text-perplexity-agent")
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
load_dotenv(os.path.join(os.path.dirname(__file__), "..", "..", ".env"))
load_dotenv(os.path.join(os.path.dirname(__file__), '..', '..', '.env'))
# --- Configure Perplexity MCP Server (as a function to allow async context management) ---
def get_perplexity_mcp_server():
if os.environ.get("PERPLEXITY_API_KEY"):
mcp_script_path = os.path.abspath(os.path.join(
os.path.dirname(__file__), '..', 'tools', 'mcp', 'perplexity', 'perplexity-ask', 'dist', 'index.js'
))
mcp_script_path = os.path.abspath(
os.path.join(
os.path.dirname(__file__),
"..",
"tools",
"mcp",
"perplexity",
"perplexity-ask",
"dist",
"index.js",
)
)
if not os.path.exists(mcp_script_path):
logger.error(f"❌ MCP script not found at {mcp_script_path}. Make sure you\'ve run \'npm install && npm run build\' in the server directory.")
logger.error(
f"❌ MCP script not found at {mcp_script_path}. Make sure you've run 'npm install && npm run build' in the server directory."
)
logger.warning("⚠️ Perplexity tools will be unavailable.")
return None
else:
logger.info(f"📂 Configuring Perplexity MCP server with script: {mcp_script_path}")
logger.info(
f"📂 Configuring Perplexity MCP server with script: {mcp_script_path}"
)
return mcp.MCPServerStdio(
name="PerplexityStdioServer",
name="PerplexityStdioServer",
params={
"command": "node",
"args": [mcp_script_path],
"cwd": os.path.dirname(mcp_script_path),
"env": {"PERPLEXITY_API_KEY": os.environ.get("PERPLEXITY_API_KEY") or ""},
"client_session_timeout_seconds": 30
"env": {
"PERPLEXITY_API_KEY": os.environ.get("PERPLEXITY_API_KEY") or ""
},
"client_session_timeout_seconds": 30,
},
client_session_timeout_seconds=30
client_session_timeout_seconds=30,
)
else:
logger.warning("⚠️ PERPLEXITY_API_KEY not set. Perplexity tools will be unavailable.")
logger.warning(
"⚠️ PERPLEXITY_API_KEY not set. Perplexity tools will be unavailable."
)
return None
async def run_chat_loop(llm_instance: LLM, p_mcp_server: mcp.MCPServerStdio | None, initial_question: str = None):
async def run_chat_loop(
llm_instance: LLM,
p_mcp_server: mcp.MCPServerStdio | None,
initial_question: str = None,
):
"""Runs a text-based chat loop with the LLM and Perplexity tool."""
chat_context = ChatContext()
system_prompt = \
"""
You are a specialized assistant for answering general knowledge questions, providing explanations,
system_prompt = """
You are a specialized assistant for answering general knowledge questions, providing explanations,
and performing web searches using the 'perplexity_ask' tool.
When the user asks for information, facts, or to 'search the web', you are the designated expert.
When calling the 'perplexity_ask' tool, ensure the 'messages' argument is an array containing a single object
When calling the 'perplexity_ask' tool, ensure the 'messages' argument is an array containing a single object
with 'role': 'user' and 'content' set to the user's question.
For example: {"messages": [{"role": "user", "content": "What is the capital of France?"}]}
You do not have other tools. Do not try to delegate.
@ -59,10 +88,10 @@ async def run_chat_loop(llm_instance: LLM, p_mcp_server: mcp.MCPServerStdio | No
async def process_question(question: str):
logger.info(f"You: {question}")
chat_context.add_message(role="user", content=question)
full_response = ""
logger.info("Agent:")
mcp_servers_to_use = []
if p_mcp_server:
# MCPServerStdio is managed by async with in main, so it should be running
@ -73,7 +102,9 @@ async def run_chat_loop(llm_instance: LLM, p_mcp_server: mcp.MCPServerStdio | No
logger.info(f"DEBUG: Type of chat_context: {type(chat_context)}")
logger.info(f"DEBUG: Attributes of chat_context: {dir(chat_context)}")
# Pass messages from ChatContext and the list of mcp_servers
async for chunk in llm_instance.chat(messages=chat_context.messages, mcp_servers=mcp_servers_to_use):
async for chunk in llm_instance.chat(
messages=chat_context.messages, mcp_servers=mcp_servers_to_use
):
if chunk.delta.content:
print(chunk.delta.content, end="", flush=True)
full_response += chunk.delta.content
@ -84,7 +115,7 @@ async def run_chat_loop(llm_instance: LLM, p_mcp_server: mcp.MCPServerStdio | No
print(f"Sorry, I encountered an error: {e}")
return
print()
print()
chat_context.add_message(role="assistant", content=full_response)
if initial_question:
@ -102,46 +133,67 @@ async def run_chat_loop(llm_instance: LLM, p_mcp_server: mcp.MCPServerStdio | No
except KeyboardInterrupt:
logger.info("\nExiting chat due to interrupt.")
break
except EOFError:
except EOFError:
logger.info("\nExiting chat due to EOF.")
break
async def main():
"""Main entrypoint for the text-based Perplexity agent."""
logger.info("Starting Text-based Perplexity Agent...")
llm_instance = openai.LLM(model="gpt-4o")
p_mcp_server_instance = get_perplexity_mcp_server()
test_question = "What is the capital of France?"
if p_mcp_server_instance:
try:
# await p_mcp_server_instance.connect() # Connect to MCP server -> Removed
logger.info("Perplexity MCP Server instance created. Will be used by LLM if needed.")
await run_chat_loop(llm_instance, p_mcp_server_instance, initial_question=test_question)
logger.info(
"Perplexity MCP Server instance created. Will be used by LLM if needed."
)
await run_chat_loop(
llm_instance, p_mcp_server_instance, initial_question=test_question
)
finally:
logger.info("Closing Perplexity MCP server resources.") # Changed log message
await p_mcp_server_instance.aclose() # Close MCP server connection
logger.info(
"Closing Perplexity MCP server resources."
) # Changed log message
await p_mcp_server_instance.aclose() # Close MCP server connection
else:
logger.warning("Running chat loop without Perplexity MCP server.")
await run_chat_loop(llm_instance, None, initial_question=test_question)
logger.info("Text-based Perplexity Agent finished.")
if __name__ == "__main__":
if not os.environ.get("PERPLEXITY_API_KEY"):
logger.error("🔴 PERPLEXITY_API_KEY is not set in the environment.")
logger.error("🔴 Please set it in your .env file for the agent to function correctly with Perplexity.")
logger.error(
"🔴 Please set it in your .env file for the agent to function correctly with Perplexity."
)
if os.environ.get("PERPLEXITY_API_KEY"):
mcp_script_path = os.path.abspath(os.path.join(
os.path.dirname(__file__), '..', 'tools', 'mcp', 'perplexity', 'perplexity-ask', 'dist', 'index.js'
))
mcp_script_path = os.path.abspath(
os.path.join(
os.path.dirname(__file__),
"..",
"tools",
"mcp",
"perplexity",
"perplexity-ask",
"dist",
"index.js",
)
)
if not os.path.exists(mcp_script_path):
logger.error(f"❌ Critical: MCP script not found at {mcp_script_path}.")
logger.error("❌ The agent cannot use Perplexity tools. Please build the MCP server ('npm install && npm run build' in its directory).")
logger.error(
"❌ The agent cannot use Perplexity tools. Please build the MCP server ('npm install && npm run build' in its directory)."
)
exit(1)
asyncio.run(main())
asyncio.run(main())

View file

@ -1,50 +1,70 @@
import logging # Added logging
import os
import logging # Added logging
from typing import List, Optional # Add Optional & List import
from dotenv import load_dotenv
from livekit.agents import JobContext, WorkerOptions, cli # Changed import
from livekit.agents import mcp # Corrected import for mcp
from livekit.agents.llm import ChatChunk, function_tool # Added function_tool for delegate_to_router_agent if it were defined here
from livekit.agents import mcp # Corrected import for mcp
from livekit.agents import tts # Corrected import for tts module
from livekit.agents import ( # Changed import; Add ChatContext & RunContext import
ChatContext,
JobContext,
RunContext,
WorkerOptions,
cli,
)
from livekit.agents.llm import ( # Added function_tool for delegate_to_router_agent if it were defined here
ChatChunk,
function_tool,
)
from livekit.agents.types import NOT_GIVEN # Corrected import for NOT_GIVEN
from livekit.agents.utils.misc import is_given # Corrected import for is_given
from livekit.agents.voice import Agent, AgentSession
from livekit.plugins import deepgram, openai, silero
# Removed: from mcp_client import MCPServerStdio
# Removed: from mcp_client.agent_tools import MCPToolsIntegration
from livekit.plugins.turn_detector.multilingual import MultilingualModel # Added from official example
from livekit.agents import ChatContext, RunContext # Add ChatContext & RunContext import
from typing import Optional, List # Add Optional & List import
from livekit.agents import tts # Corrected import for tts module
from livekit.agents.types import NOT_GIVEN # Corrected import for NOT_GIVEN
from livekit.agents.utils.misc import is_given # Corrected import for is_given
from livekit.plugins.turn_detector.multilingual import ( # Added from official example
MultilingualModel,
)
logger = logging.getLogger("agent-math-official") # Added logger
logger = logging.getLogger("agent-math-official") # Added logger
mcp_script_path = os.path.abspath(
os.path.join(
os.path.dirname(__file__), "..", "tools", "mcp", "calc", "calc_server.py"
)
)
mcp_script_path = os.path.abspath(os.path.join(
os.path.dirname(__file__), '..', 'tools', 'mcp', 'calc', 'calc_server.py'
))
class CalculatorAgent(Agent):
"""A LiveKit agent that uses MCP tools from one or more MCP servers."""
def __init__(self,
chat_ctx: ChatContext,
instructions: Optional[str] = None,
mcp_servers: Optional[list[mcp.MCPServer]] = None,
tts: Optional[tts.TTS] = NOT_GIVEN,
tools: Optional[List[function_tool]] = None): # Added tools parameter
final_instructions = instructions if instructions is not None else \
"""
You are a specialist Math assistant. Your expertise is in solving mathematical problems,
def __init__(
self,
chat_ctx: ChatContext,
instructions: Optional[str] = None,
mcp_servers: Optional[list[mcp.MCPServer]] = None,
tts: Optional[tts.TTS] = NOT_GIVEN,
tools: Optional[List[function_tool]] = None,
): # Added tools parameter
final_instructions = (
instructions
if instructions is not None
else """
You are a specialist Math assistant. Your expertise is in solving mathematical problems,
performing calculations, arithmetic, and answering questions about numbers.
You have two calculation tools: 'multiply' and 'add'.
When your current math task is complete, or if the user asks for something not related to math,
you MUST use the 'delegate_to_router_agent' tool to return to the main assistant.
"""
)
# Combine passed tools with any class-defined tools if necessary (none here for now)
all_tools = tools if tools is not None else []
super().__init__(
instructions=final_instructions,
instructions=final_instructions,
chat_ctx=chat_ctx,
allow_interruptions=True,
mcp_servers=[
@ -54,7 +74,7 @@ class CalculatorAgent(Agent):
)
# MODIFIED: Removed chat_ctx=chat_ctx argument
],
tools=all_tools # Pass the tools to the parent Agent class
tools=all_tools, # Pass the tools to the parent Agent class
)
# MCP tools are automatically integrated by AgentSession if mcp_servers is configured.
# No need for MCPToolsIntegration or manually adding tools here.
@ -64,7 +84,12 @@ class CalculatorAgent(Agent):
tool_call_detected = False
async for chunk in super().llm_node(chat_ctx, tools, model_settings):
if isinstance(chunk, ChatChunk) and chunk.delta and chunk.delta.tool_calls and not tool_call_detected:
if (
isinstance(chunk, ChatChunk)
and chunk.delta
and chunk.delta.tool_calls
and not tool_call_detected
):
tool_call_detected = True
# Example: if self.tts: self.session.say("Working on the math problem.")
# Currently, Math agent does not say anything here.
@ -75,17 +100,18 @@ class CalculatorAgent(Agent):
# using the LLM to generate a reply
self.session.generate_reply()
async def entrypoint(ctx: JobContext):
"""Main entrypoint for the LiveKit agent application."""
await ctx.connect() # Connect earlier as in official example
await ctx.connect() # Connect earlier as in official example
# Directly configure AgentSession with mcp_servers
session = AgentSession(
vad=silero.VAD.load(), # Redundant if agent has it, but official example does this
stt=deepgram.STT(model="nova-2", language="en-US"), # Consistent with agent
llm=openai.LLM(model="gpt-4o"), # Consistent with agent
tts=openai.TTS(voice="alloy"), # Consistent with agent
turn_detection=MultilingualModel(), # Consistent with agent
vad=silero.VAD.load(), # Redundant if agent has it, but official example does this
stt=deepgram.STT(model="nova-2", language="en-US"), # Consistent with agent
llm=openai.LLM(model="gpt-4o"), # Consistent with agent
tts=openai.TTS(voice="alloy"), # Consistent with agent
turn_detection=MultilingualModel(), # Consistent with agent
)
# Instantiate the agent
@ -93,5 +119,6 @@ async def entrypoint(ctx: JobContext):
await session.start(agent=agent, room=ctx.room)
if __name__ == "__main__":
cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))
cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))

View file

@ -1,30 +1,41 @@
import logging
import os
from dotenv import load_dotenv
from pathlib import Path
from livekit.agents import Agent, AgentSession, JobContext, WorkerOptions, cli, mcp, ChatContext, function_tool, RunContext
from typing import List, Optional
from dotenv import load_dotenv
from livekit.agents import (
Agent,
AgentSession,
ChatContext,
JobContext,
RunContext,
WorkerOptions,
cli,
function_tool,
mcp,
)
from livekit.plugins import deepgram, openai, silero
from livekit.plugins.turn_detector.multilingual import MultilingualModel
from typing import Optional, List
load_dotenv(os.path.join(os.path.dirname(__file__), '..', '..', '.env'))
load_dotenv(os.path.join(os.path.dirname(__file__), "..", "..", ".env"))
logger = logging.getLogger("calendar-agent")
class CalendarAgent(Agent):
def __init__(self,
chat_ctx: ChatContext,
tools: Optional[List[function_tool]] = None) -> None:
def __init__(
self, chat_ctx: ChatContext, tools: Optional[List[function_tool]] = None
) -> None:
final_instructions = (
"You are a Calendar specialist. You can help with scheduling, creating, modifying, or querying calendar events, appointments, and meetings. "
"Use tools like 'create_calendar_event', 'get_calendar_events', etc., when available. "
"If your task is complete or the user asks for something outside your calendar capabilities (e.g., math, web search), "
"you MUST use the 'delegate_to_router_agent' tool to return to the main assistant."
)
"You are a Calendar specialist. You can help with scheduling, creating, modifying, or querying calendar events, appointments, and meetings. "
"Use tools like 'create_calendar_event', 'get_calendar_events', etc., when available. "
"If your task is complete or the user asks for something outside your calendar capabilities (e.g., math, web search), "
"you MUST use the 'delegate_to_router_agent' tool to return to the main assistant."
)
all_tools = tools if tools is not None else []
mcp_servers_list = []
@ -43,12 +54,13 @@ class CalendarAgent(Agent):
chat_ctx=chat_ctx,
allow_interruptions=True,
tools=all_tools,
mcp_servers=mcp_servers_list
mcp_servers=mcp_servers_list,
)
async def on_enter(self):
self.session.generate_reply()
async def entrypoint(ctx: JobContext):
await ctx.connect()
@ -64,4 +76,4 @@ async def entrypoint(ctx: JobContext):
if __name__ == "__main__":
cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))
cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))

View file

@ -1,45 +1,57 @@
import logging
import os
from dotenv import load_dotenv
import random
from pathlib import Path
from livekit.agents import Agent, AgentSession, JobContext, WorkerOptions, cli, mcp, ChatContext, RunContext, function_tool
from typing import List, Optional
from dotenv import load_dotenv
from livekit.agents import (
Agent,
AgentSession,
ChatContext,
JobContext,
RunContext,
WorkerOptions,
cli,
function_tool,
mcp,
)
from livekit.plugins import deepgram, openai, silero
from livekit.plugins.turn_detector.multilingual import MultilingualModel
import random
from typing import Optional, List
load_dotenv(os.path.join(os.path.dirname(__file__), '..', '..', '.env'))
load_dotenv(os.path.join(os.path.dirname(__file__), "..", "..", ".env"))
from livekit import api
logger = logging.getLogger("caller-agent")
class CallerAgent(Agent):
def __init__(self,
chat_ctx: ChatContext,
tools: Optional[List[function_tool]] = None) -> None:
def __init__(
self, chat_ctx: ChatContext, tools: Optional[List[function_tool]] = None
) -> None:
final_instructions = (
"You are a Caller specialist. Your primary function is to initiate phone calls. " +
"If the user asks to call someone, use the 'make_phone_call' tool. " +
"Currently, you can only call a predefined contact (Sam at +16467085301). Confirm with the user if they want to call this specific contact. " +
"If your task is complete or the user asks for something outside your calling capabilities (e.g., math, web search), " +
"you MUST use the 'delegate_to_router_agent' tool to return to the main assistant."
)
agent_tools = [self.make_phone_call]
"You are a Caller specialist. Your primary function is to initiate phone calls. "
+ "If the user asks to call someone, use the 'make_phone_call' tool. "
+ "Currently, you can only call a predefined contact (Sam at +16467085301). Confirm with the user if they want to call this specific contact. "
+ "If your task is complete or the user asks for something outside your calling capabilities (e.g., math, web search), "
+ "you MUST use the 'delegate_to_router_agent' tool to return to the main assistant."
)
agent_tools = [self.make_phone_call]
all_tools = agent_tools + (tools if tools is not None else [])
super().__init__(
instructions=final_instructions,
chat_ctx=chat_ctx,
allow_interruptions=True,
tools=all_tools
tools=all_tools,
)
self.lkapi = api.LiveKitAPI()
async def on_enter(self):
self.session.generate_reply()
@function_tool
async def make_phone_call(self, context: RunContext, phone_number: str):
@ -49,13 +61,13 @@ class CallerAgent(Agent):
phone_number: The phone number to call.
"""
await self.lkapi.agent_dispatch.create_dispatch(
api.CreateAgentDispatchRequest(
agent_name="my-telephony-agent",
room=f"outbound-{''.join(str(random.randint(0, 9)) for _ in range(10))}",
metadata='{"phone_number": "+16467085301"}' #HARDCODED
api.CreateAgentDispatchRequest(
agent_name="my-telephony-agent",
room=f"outbound-{''.join(str(random.randint(0, 9)) for _ in range(10))}",
metadata='{"phone_number": "+16467085301"}', # HARDCODED
)
)
)
async def entrypoint(ctx: JobContext):
await ctx.connect()
@ -72,5 +84,4 @@ async def entrypoint(ctx: JobContext):
if __name__ == "__main__":
cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint,
agent_name="mcp-agent"))
cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint, agent_name="mcp-agent"))

View file

@ -1,30 +1,41 @@
import logging
import os
from dotenv import load_dotenv
from pathlib import Path
from livekit.agents import Agent, AgentSession, JobContext, WorkerOptions, cli, mcp, ChatContext, function_tool, RunContext
from typing import List, Optional
from dotenv import load_dotenv
from livekit.agents import (
Agent,
AgentSession,
ChatContext,
JobContext,
RunContext,
WorkerOptions,
cli,
function_tool,
mcp,
)
from livekit.plugins import deepgram, openai, silero
from livekit.plugins.turn_detector.multilingual import MultilingualModel
from typing import Optional, List
load_dotenv(os.path.join(os.path.dirname(__file__), '..', '..', '.env'))
load_dotenv(os.path.join(os.path.dirname(__file__), "..", "..", ".env"))
logger = logging.getLogger("contact-agent")
class ContactAgent(Agent):
def __init__(self,
chat_ctx: ChatContext,
tools: Optional[List[function_tool]] = None) -> None:
def __init__(
self, chat_ctx: ChatContext, tools: Optional[List[function_tool]] = None
) -> None:
final_instructions = (
"You are a Contact specialist. You can help find contact information such as phone numbers, email addresses, or other details for individuals. " +
"You can also add new contacts or update existing ones if tools like 'get_contact_details', 'add_contact', 'update_contact' are available. " +
"If your task is complete or the user asks for something outside your contact management capabilities (e.g., math, web search), " +
"you MUST use the 'delegate_to_router_agent' tool to return to the main assistant."
)
"You are a Contact specialist. You can help find contact information such as phone numbers, email addresses, or other details for individuals. "
+ "You can also add new contacts or update existing ones if tools like 'get_contact_details', 'add_contact', 'update_contact' are available. "
+ "If your task is complete or the user asks for something outside your contact management capabilities (e.g., math, web search), "
+ "you MUST use the 'delegate_to_router_agent' tool to return to the main assistant."
)
all_tools = tools if tools is not None else []
mcp_servers_list = []
@ -38,19 +49,22 @@ class ContactAgent(Agent):
)
)
else:
logger.warning("ZAPIER_CONTACT_MCP_URL not set. Contact agent may not have all its tools.")
logger.warning(
"ZAPIER_CONTACT_MCP_URL not set. Contact agent may not have all its tools."
)
super().__init__(
instructions=final_instructions,
chat_ctx=chat_ctx,
allow_interruptions=True,
mcp_servers=mcp_servers_list,
tools=all_tools
tools=all_tools,
)
async def on_enter(self):
self.session.generate_reply()
async def entrypoint(ctx: JobContext):
await ctx.connect()
@ -66,4 +80,4 @@ async def entrypoint(ctx: JobContext):
if __name__ == "__main__":
cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))
cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))

View file

@ -1,31 +1,42 @@
import logging
import os
from dotenv import load_dotenv
from pathlib import Path
from livekit.agents import Agent, AgentSession, JobContext, WorkerOptions, cli, mcp, ChatContext, function_tool, RunContext
from typing import List, Optional
from dotenv import load_dotenv
from livekit.agents import (
Agent,
AgentSession,
ChatContext,
JobContext,
RunContext,
WorkerOptions,
cli,
function_tool,
mcp,
)
from livekit.plugins import deepgram, openai, silero
from livekit.plugins.turn_detector.multilingual import MultilingualModel
from typing import Optional, List
load_dotenv(os.path.join(os.path.dirname(__file__), '..', '..', '.env'))
load_dotenv(os.path.join(os.path.dirname(__file__), "..", "..", ".env"))
logger = logging.getLogger("gmail-agent")
class GmailAgent(Agent):
def __init__(self,
chat_ctx: ChatContext,
tools: Optional[List[function_tool]] = None) -> None:
def __init__(
self, chat_ctx: ChatContext, tools: Optional[List[function_tool]] = None
) -> None:
final_instructions = (
"You are a Gmail specialist. You can manage emails by reading, searching, sending, and updating them (e.g., marking as read/unread, moving to folders). " +
"Use tools like 'read_emails', 'send_email', and 'update_email' to interact with Gmail. " +
"If sending an email, you might need a recipient; you know Gabin (gabin.fay@gmail.com). " +
"If your task is complete or the user asks for something outside your email management capabilities (e.g., math, calendar), " +
"you MUST use the 'delegate_to_router_agent' tool to return to the main assistant."
)
"You are a Gmail specialist. You can manage emails by reading, searching, sending, and updating them (e.g., marking as read/unread, moving to folders). "
+ "Use tools like 'read_emails', 'send_email', and 'update_email' to interact with Gmail. "
+ "If sending an email, you might need a recipient; you know Gabin (gabin.fay@gmail.com). "
+ "If your task is complete or the user asks for something outside your email management capabilities (e.g., math, calendar), "
+ "you MUST use the 'delegate_to_router_agent' tool to return to the main assistant."
)
all_tools = tools if tools is not None else []
mcp_servers_list = []
@ -39,19 +50,22 @@ class GmailAgent(Agent):
)
)
else:
logger.warning("GUMLOOP_GMAIL_MCP_URL not set. Gmail agent may not have all its tools.")
logger.warning(
"GUMLOOP_GMAIL_MCP_URL not set. Gmail agent may not have all its tools."
)
super().__init__(
instructions=final_instructions,
chat_ctx=chat_ctx,
allow_interruptions=True,
mcp_servers=mcp_servers_list,
tools=all_tools
tools=all_tools,
)
async def on_enter(self):
self.session.generate_reply()
async def entrypoint(ctx: JobContext):
await ctx.connect()
@ -67,4 +81,4 @@ async def entrypoint(ctx: JobContext):
if __name__ == "__main__":
cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))
cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))

View file

@ -1,106 +1,141 @@
import os
import logging
import asyncio
from dotenv import load_dotenv
import logging
import os
from typing import List, Optional
from livekit.agents import JobContext, WorkerOptions, cli, mcp, function_tool, RunContext
from dotenv import load_dotenv
from livekit.agents import (
JobContext,
RunContext,
WorkerOptions,
cli,
function_tool,
mcp,
)
from livekit.agents.llm import ChatChunk, ChatContext, ChatMessage
from livekit.agents.voice import Agent, AgentSession
from livekit.plugins import openai, silero, deepgram
from livekit.plugins import deepgram, openai, silero
from livekit.plugins.turn_detector.multilingual import MultilingualModel
from typing import Optional, List
logger = logging.getLogger("go-agent-livekit")
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
# Load environment variables from .env file
load_dotenv()
OPENAI_API_KEY = os.environ.get('OPENAI_API_KEY')
GOOGLE_MAPS_API_KEY = os.environ.get('GOOGLE_MAPS_API_KEY')
DEEPGRAM_API_KEY = os.environ.get('DEEPGRAM_API_KEY')
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
GOOGLE_MAPS_API_KEY = os.environ.get("GOOGLE_MAPS_API_KEY")
DEEPGRAM_API_KEY = os.environ.get("DEEPGRAM_API_KEY")
if not OPENAI_API_KEY:
logger.critical("🔴 CRITICAL: OPENAI_API_KEY not found. OpenAI plugins will fail.")
if not GOOGLE_MAPS_API_KEY:
logger.critical("🔴 CRITICAL: GOOGLE_MAPS_API_KEY not found. Google Maps MCP server will fail.")
logger.critical(
"🔴 CRITICAL: GOOGLE_MAPS_API_KEY not found. Google Maps MCP server will fail."
)
if not DEEPGRAM_API_KEY:
logger.warning("⚠️ WARNING: DEEPGRAM_API_KEY not found. Deepgram STT plugin may have issues.")
logger.warning(
"⚠️ WARNING: DEEPGRAM_API_KEY not found. Deepgram STT plugin may have issues."
)
mcp_script_path = os.path.abspath(os.path.join(
os.path.dirname(__file__), '..', 'tools', 'mcp', 'google-maps', 'dist', 'index.js'
))
mcp_script_path = os.path.abspath(
os.path.join(
os.path.dirname(__file__),
"..",
"tools",
"mcp",
"google-maps",
"dist",
"index.js",
)
)
if not os.path.exists(mcp_script_path):
logger.critical(f"CRITICAL: Google Maps MCP script not found at {mcp_script_path}. Agent cannot start tools.")
logger.critical(
f"CRITICAL: Google Maps MCP script not found at {mcp_script_path}. Agent cannot start tools."
)
class GoAgent(Agent):
"""A LiveKit agent specialized in location-based queries using Google Maps via MCP."""
def __init__(self,
chat_ctx: ChatContext,
tools: Optional[List[function_tool]] = None):
def __init__(
self, chat_ctx: ChatContext, tools: Optional[List[function_tool]] = None
):
final_instructions = (
"You are the Go Agent, specialized in providing location-based information using Google Maps. "
"You MUST use the available tools to fulfill user queries about locations, directions, distances, and places.\n\n"
"RULE FOR LOCATION REQUESTS: When a user asks about finding a location, getting directions, calculating distances, "
"or information about a place, you MUST use the appropriate Google Maps tool.\n\n"
"Key tools available to you (provided by Google Maps MCP):\n"
"- maps_geocode: Convert an address to coordinates (e.g., maps_geocode address=\"1600 Amphitheatre Parkway, Mountain View, CA\")\n"
"- maps_reverse_geocode: Convert coordinates to an address (e.g., maps_reverse_geocode latitude=37.422 longitude=-122.084)\n"
"- maps_search_places: Search for places (e.g., maps_search_places query=\"restaurants in London\")\n"
"- maps_place_details: Get details for a place_id (e.g., maps_place_details place_id=\"ChIJN1t_tDeuEmsRUsoyG83frY4\")\n"
"- maps_directions: Get directions (e.g., maps_directions origin=\"San Francisco\" destination=\"Los Angeles\" mode=\"driving\")\n"
"- maps_distance_matrix: Calculate distances (e.g., maps_distance_matrix origins=\"New York,Washington D.C.\" destinations=\"Boston,Philadelphia\" mode=\"...\")\n\n"
"RULE FOR TOOL RESULTS: After you receive results from a tool, you MUST analyze the data and provide a clear, "
"helpful response. Format addresses and directions in a readable way, extract key information from place details, "
"and always provide context for coordinates and distances.\n\n"
"If a tool call fails or returns no relevant information, explain clearly to the user and suggest alternatives. "
"If your task is complete or the user asks for something outside your location/maps capabilities (e.g., math, calendar), "
"you MUST use the 'delegate_to_router_agent' tool to return to the main assistant."
)
"You are the Go Agent, specialized in providing location-based information using Google Maps. "
"You MUST use the available tools to fulfill user queries about locations, directions, distances, and places.\n\n"
"RULE FOR LOCATION REQUESTS: When a user asks about finding a location, getting directions, calculating distances, "
"or information about a place, you MUST use the appropriate Google Maps tool.\n\n"
"Key tools available to you (provided by Google Maps MCP):\n"
'- maps_geocode: Convert an address to coordinates (e.g., maps_geocode address="1600 Amphitheatre Parkway, Mountain View, CA")\n'
"- maps_reverse_geocode: Convert coordinates to an address (e.g., maps_reverse_geocode latitude=37.422 longitude=-122.084)\n"
'- maps_search_places: Search for places (e.g., maps_search_places query="restaurants in London")\n'
'- maps_place_details: Get details for a place_id (e.g., maps_place_details place_id="ChIJN1t_tDeuEmsRUsoyG83frY4")\n'
'- maps_directions: Get directions (e.g., maps_directions origin="San Francisco" destination="Los Angeles" mode="driving")\n'
'- maps_distance_matrix: Calculate distances (e.g., maps_distance_matrix origins="New York,Washington D.C." destinations="Boston,Philadelphia" mode="...")\n\n'
"RULE FOR TOOL RESULTS: After you receive results from a tool, you MUST analyze the data and provide a clear, "
"helpful response. Format addresses and directions in a readable way, extract key information from place details, "
"and always provide context for coordinates and distances.\n\n"
"If a tool call fails or returns no relevant information, explain clearly to the user and suggest alternatives. "
"If your task is complete or the user asks for something outside your location/maps capabilities (e.g., math, calendar), "
"you MUST use the 'delegate_to_router_agent' tool to return to the main assistant."
)
all_tools = tools if tools is not None else []
mcp_servers_list = []
if GOOGLE_MAPS_API_KEY and os.path.exists(mcp_script_path):
mcp_servers_list.append(
mcp.MCPServerStdio(
command='node',
command="node",
args=[mcp_script_path],
env={'GOOGLE_MAPS_API_KEY': GOOGLE_MAPS_API_KEY}
env={"GOOGLE_MAPS_API_KEY": GOOGLE_MAPS_API_KEY},
)
)
else:
logger.warning("Google Maps MCP server not configured due to missing API key or script path.")
logger.warning(
"Google Maps MCP server not configured due to missing API key or script path."
)
super().__init__(
instructions=final_instructions,
allow_interruptions=True,
chat_ctx=chat_ctx,
mcp_servers=mcp_servers_list,
tools=all_tools
mcp_servers=mcp_servers_list,
tools=all_tools,
)
if not self.llm:
logger.error("GoAgentLivekit initialized, but LLM might be missing if API key was not provided to plugin.")
logger.error(
"GoAgentLivekit initialized, but LLM might be missing if API key was not provided to plugin."
)
async def llm_node(self, chat_ctx: ChatContext, tools: list, model_settings: dict):
"""Override the llm_node to log tool calls or add custom behavior."""
tool_call_detected_this_turn = False
async for chunk in super().llm_node(chat_ctx, tools, model_settings):
if isinstance(chunk, ChatChunk) and chunk.delta and chunk.delta.tool_calls and not tool_call_detected_this_turn:
if (
isinstance(chunk, ChatChunk)
and chunk.delta
and chunk.delta.tool_calls
and not tool_call_detected_this_turn
):
tool_call_detected_this_turn = True
logger.info("GoAgentLivekit: LLM is attempting to call a tool. Informing user.")
if hasattr(self, 'session') and self.session is not None:
logger.info(
"GoAgentLivekit: LLM is attempting to call a tool. Informing user."
)
if hasattr(self, "session") and self.session is not None:
self.session.say("Okay, let me check that for you.")
else:
logger.warning("Agent has no session to 'say' through during tool call detection.")
logger.warning(
"Agent has no session to 'say' through during tool call detection."
)
yield chunk
async def on_enter(self):
@ -108,20 +143,23 @@ class GoAgent(Agent):
# using the LLM to generate a reply
self.session.generate_reply()
async def entrypoint(ctx: JobContext):
"""Main entrypoint for the LiveKit Go Agent application."""
logger.info(f"Go Agent LiveKit starting entrypoint for Job ID: {getattr(ctx.job, 'id', 'unknown')}")
logger.info(
f"Go Agent LiveKit starting entrypoint for Job ID: {getattr(ctx.job, 'id', 'unknown')}"
)
await ctx.connect()
logger.info(f"Successfully connected to LiveKit room: {ctx.room.name if ctx.room else 'N/A'}")
logger.info(
f"Successfully connected to LiveKit room: {ctx.room.name if ctx.room else 'N/A'}"
)
session = AgentSession(
vad=silero.VAD.load(),
stt=deepgram.STT(model="nova-2", language="en-US", api_key=os.environ.get('DEEPGRAM_API_KEY')),
stt=deepgram.STT(
model="nova-2", language="en-US", api_key=os.environ.get("DEEPGRAM_API_KEY")
),
llm=openai.LLM(model="gpt-4o", api_key=OPENAI_API_KEY),
tts=openai.TTS(voice="alloy", api_key=OPENAI_API_KEY),
turn_detection=MultilingualModel(),
@ -131,13 +169,18 @@ async def entrypoint(ctx: JobContext):
agent = GoAgent(chat_ctx=session._chat_ctx)
logger.info("GoAgentLivekit instantiated.")
logger.info(f"Starting AgentSession with agent for room: {ctx.room.name if ctx.room else 'N/A'}")
logger.info(
f"Starting AgentSession with agent for room: {ctx.room.name if ctx.room else 'N/A'}"
)
await session.start(agent=agent, room=ctx.room)
logger.info("AgentSession started. GoAgentLivekit is now running.")
if __name__ == "__main__":
logger.info("Starting Go Agent LiveKit application via cli.run_app.")
if not os.environ.get('DEEPGRAM_API_KEY'):
logger.warning("DEEPGRAM_API_KEY not found in environment. STT plugin may fail.")
cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))
if not os.environ.get("DEEPGRAM_API_KEY"):
logger.warning(
"DEEPGRAM_API_KEY not found in environment. STT plugin may fail."
)
cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))

View file

@ -1,27 +1,33 @@
import logging
import os
import sys
import logging
from dotenv import load_dotenv
from livekit.agents import JobContext, WorkerOptions, cli, function_tool, RunContext
from livekit.agents import mcp
from livekit.agents.llm import ChatChunk
from livekit.agents.voice import Agent, AgentSession
from livekit.plugins import deepgram, openai, silero
from typing import List, Optional
from livekit.plugins.turn_detector.multilingual import MultilingualModel
from typing import Optional, List
from livekit.agents import tts
from dotenv import load_dotenv
from livekit.agents import (
ChatContext,
JobContext,
RunContext,
WorkerOptions,
cli,
function_tool,
mcp,
tts,
)
from livekit.agents.llm import ChatChunk
from livekit.agents.types import NOT_GIVEN
from livekit.agents.utils.misc import is_given
from livekit.agents.voice import Agent, AgentSession
from livekit.plugins import deepgram, openai, silero
from livekit.plugins.turn_detector.multilingual import MultilingualModel
from pydantic import BaseModel, Field
from livekit.agents import ChatContext
logger = logging.getLogger("agent-spotify-official")
load_dotenv(os.path.join(os.path.dirname(__file__), '..', '..', '.env'))
load_dotenv(os.path.join(os.path.dirname(__file__), "..", "..", ".env"))
_this_file_dir = os.path.dirname(os.path.abspath(__file__))
_stone_ui_dir = os.path.abspath(os.path.join(_this_file_dir, '..', '..'))
_stone_ui_dir = os.path.abspath(os.path.join(_this_file_dir, "..", ".."))
if _stone_ui_dir not in sys.path:
sys.path.insert(0, _stone_ui_dir)
@ -29,12 +35,21 @@ if _stone_ui_dir not in sys.path:
from engine.config import settings
# --- Spotify Tool Input Models (Based on spotify-mcp-server README) ---
class PlayMusicInput(BaseModel):
uri: Optional[str] = Field(None, description="Spotify URI of the item to play (e.g., spotify:track:...). Overrides type and id.")
type: Optional[str] = Field(None, description="Type of item to play (track, album, artist, playlist)")
uri: Optional[str] = Field(
None,
description="Spotify URI of the item to play (e.g., spotify:track:...). Overrides type and id.",
)
type: Optional[str] = Field(
None, description="Type of item to play (track, album, artist, playlist)"
)
id: Optional[str] = Field(None, description="Spotify ID of the item to play")
deviceId: Optional[str] = Field(None, description="ID of the device to play on (optional)")
deviceId: Optional[str] = Field(
None, description="ID of the device to play on (optional)"
)
# Add other input models here as needed (e.g., SearchSpotifyInput, PlaylistInput etc.)
@ -43,39 +58,58 @@ spotify_mcp_server = None
# Define the path to the BUILT MCP server script
# IMPORTANT: Ensure the MCP server is built (npm run build) and authenticated (npm run auth)
mcp_script_path = os.path.abspath(os.path.join(
os.path.dirname(__file__), '..', 'tools', 'mcp', 'spotify', 'build', 'index.js'
))
spotify_config_path = os.path.abspath(os.path.join(
os.path.dirname(__file__), '..', 'tools', 'mcp', 'spotify', 'spotify-config.json'
))
mcp_script_path = os.path.abspath(
os.path.join(
os.path.dirname(__file__), "..", "tools", "mcp", "spotify", "build", "index.js"
)
)
spotify_config_path = os.path.abspath(
os.path.join(
os.path.dirname(__file__),
"..",
"tools",
"mcp",
"spotify",
"spotify-config.json",
)
)
if not os.path.exists(mcp_script_path):
logger.error(f"❌ Spotify MCP script not found at {mcp_script_path}. Make sure you've run 'npm install && npm run build' in the server directory.")
logger.error(
f"❌ Spotify MCP script not found at {mcp_script_path}. Make sure you've run 'npm install && npm run build' in the server directory."
)
logger.warning("⚠️ Spotify tools will be unavailable.")
elif not os.path.exists(spotify_config_path):
logger.error(f"❌ Spotify config file not found at {spotify_config_path}. Make sure you've run 'npm run auth' after setting credentials.")
logger.error(
f"❌ Spotify config file not found at {spotify_config_path}. Make sure you've run 'npm run auth' after setting credentials."
)
logger.warning("⚠️ Spotify tools will likely be unavailable due to missing auth.")
else:
# Check if config contains tokens (basic check)
try:
with open(spotify_config_path, 'r') as f:
with open(spotify_config_path, "r") as f:
config_content = f.read()
if 'accessToken' not in config_content or 'refreshToken' not in config_content or 'run-npm auth' in config_content:
logger.warning(f"⚠️ Spotify config file at {spotify_config_path} seems incomplete or unauthenticated. Run 'npm run auth'.")
# We still configure the server, but it might fail at runtime
if (
"accessToken" not in config_content
or "refreshToken" not in config_content
or "run-npm auth" in config_content
):
logger.warning(
f"⚠️ Spotify config file at {spotify_config_path} seems incomplete or unauthenticated. Run 'npm run auth'."
)
# We still configure the server, but it might fail at runtime
else:
logger.info("✅ Spotify config file seems authenticated.")
logger.info("✅ Spotify config file seems authenticated.")
except Exception as e:
logger.error(f"Error reading Spotify config {spotify_config_path}: {e}")
logger.info(f"📂 Configuring Spotify MCP server with script: {mcp_script_path}")
spotify_mcp_server = mcp.MCPServerStdio(
'node', # Command to run the server
args=[mcp_script_path], # Argument is the script path
"node", # Command to run the server
args=[mcp_script_path], # Argument is the script path
# No specific env vars needed here, reads from spotify-config.json
env={},
client_session_timeout_seconds=5*60
client_session_timeout_seconds=5 * 60,
)
logger.info("✅ Spotify MCP Server configured (runtime auth check still needed).")
@ -83,25 +117,31 @@ else:
class ListenAgent(Agent):
"""A LiveKit agent that uses MCP tools from one or more MCP servers."""
def __init__(self,
chat_ctx: ChatContext,
instructions: Optional[str] = None,
tts: Optional[tts.TTS] = NOT_GIVEN,
tools: Optional[List[function_tool]] = None):
def __init__(
self,
chat_ctx: ChatContext,
instructions: Optional[str] = None,
tts: Optional[tts.TTS] = NOT_GIVEN,
tools: Optional[List[function_tool]] = None,
):
final_instructions = instructions if instructions is not None else \
("You are the Listen Agent, specialized in controlling Spotify music playback. " +
"You MUST use the available tools to fulfill user requests related to Spotify. " +
"Available tools include 'playMusic', and potentially others like 'searchSpotify', 'pausePlayback', etc.\n\n" +
"RULE FOR MUSIC REQUESTS: When a user asks to play music, search for music, control playback (pause, skip, etc.), " +
"manage playlists, or ask what's playing, you MUST use the appropriate Spotify tool (like 'playMusic'). " +
"Be precise with parameters like 'uri' or 'type' and 'id'. Infer parameters from the user query. If essential info is missing (like what to play), ask the user.\n\n" +
"RULE FOR TOOL RESULTS: After a tool is successfully executed, you MUST confirm the action to the user (e.g., 'Okay, playing \'Bohemian Rhapsody\' now.'). " +
"If a tool fails or returns an error, inform the user clearly. " +
"If your task is complete or the user asks for something outside your Spotify capabilities (e.g., math, calendar), " +
"you MUST use the 'delegate_to_router_agent' tool to return to the main assistant."
final_instructions = (
instructions
if instructions is not None
else (
"You are the Listen Agent, specialized in controlling Spotify music playback. "
+ "You MUST use the available tools to fulfill user requests related to Spotify. "
+ "Available tools include 'playMusic', and potentially others like 'searchSpotify', 'pausePlayback', etc.\n\n"
+ "RULE FOR MUSIC REQUESTS: When a user asks to play music, search for music, control playback (pause, skip, etc.), "
+ "manage playlists, or ask what's playing, you MUST use the appropriate Spotify tool (like 'playMusic'). "
+ "Be precise with parameters like 'uri' or 'type' and 'id'. Infer parameters from the user query. If essential info is missing (like what to play), ask the user.\n\n"
+ "RULE FOR TOOL RESULTS: After a tool is successfully executed, you MUST confirm the action to the user (e.g., 'Okay, playing 'Bohemian Rhapsody' now.'). "
+ "If a tool fails or returns an error, inform the user clearly. "
+ "If your task is complete or the user asks for something outside your Spotify capabilities (e.g., math, calendar), "
+ "you MUST use the 'delegate_to_router_agent' tool to return to the main assistant."
)
)
all_tools = tools if tools is not None else []
active_mcp_servers = []
@ -109,11 +149,11 @@ class ListenAgent(Agent):
active_mcp_servers.append(spotify_mcp_server)
super().__init__(
instructions=final_instructions,
instructions=final_instructions,
chat_ctx=chat_ctx,
allow_interruptions=True,
mcp_servers=active_mcp_servers, # MODIFIED: Pass filtered list
tools=all_tools # Pass the tools to the parent Agent class
mcp_servers=active_mcp_servers, # MODIFIED: Pass filtered list
tools=all_tools, # Pass the tools to the parent Agent class
)
async def llm_node(self, chat_ctx, tools, model_settings):
@ -121,18 +161,24 @@ class ListenAgent(Agent):
tool_call_detected = False
async for chunk in super().llm_node(chat_ctx, tools, model_settings):
if isinstance(chunk, ChatChunk) and chunk.delta and chunk.delta.tool_calls and not tool_call_detected:
if (
isinstance(chunk, ChatChunk)
and chunk.delta
and chunk.delta.tool_calls
and not tool_call_detected
):
tool_call_detected = True
# Use self.session.say() to make the agent speak, only if TTS is configured
if self.tts: # Check if the agent has a TTS instance
self.session.say("Sure, I'll check that for you.")
yield chunk
async def on_enter(self):
# when the agent is added to the session, we'll initiate the conversation by
# using the LLM to generate a reply
self.session.generate_reply()
async def entrypoint(ctx: JobContext):
"""Main entrypoint for the LiveKit agent application."""
await ctx.connect()
@ -148,5 +194,6 @@ async def entrypoint(ctx: JobContext):
agent = ListenAgent(chat_ctx=session._chat_ctx)
await session.start(agent=agent, room=ctx.room)
if __name__ == "__main__":
cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))
cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))

View file

@ -1,33 +1,32 @@
import asyncio
import logging
import os
from typing import List, Dict, Any, Annotated
from pathlib import Path
from typing import Annotated, Any, Dict, List
import aiohttp
from dotenv import load_dotenv
from livekit import api, rtc
from livekit.agents import (
Agent,
AgentSession,
JobContext,
JobProcess,
WorkerOptions,
cli,
llm,
Agent,
AgentSession
)
from livekit import rtc, api
from livekit.plugins import deepgram, openai, silero
from mem0 import AsyncMemoryClient
# Load environment variables
load_dotenv(dotenv_path=Path(__file__).parent.parent.parent / '.env')
load_dotenv(dotenv_path=Path(__file__).parent.parent.parent / ".env")
# Configure logging
logger = logging.getLogger("memory-assistant")
logger.setLevel(logging.INFO)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
# Define a global user ID for simplicity
@ -36,9 +35,16 @@ USER_ID = "voice_user"
# Initialize Mem0 memory client
mem0 = AsyncMemoryClient()
async def _enrich_with_memory(last_user_msg: llm.ChatMessage, chat_ctx_to_modify: llm.ChatContext):
async def _enrich_with_memory(
last_user_msg: llm.ChatMessage, chat_ctx_to_modify: llm.ChatContext
):
"""Add memories and Augment chat context with relevant memories"""
if not last_user_msg or not last_user_msg.text_content or not last_user_msg.text_content.strip():
if (
not last_user_msg
or not last_user_msg.text_content
or not last_user_msg.text_content.strip()
):
logger.info("No valid last user message content to process for memory.")
return
@ -49,11 +55,12 @@ async def _enrich_with_memory(last_user_msg: llm.ChatMessage, chat_ctx_to_modify
logger.info("User message content is empty after getting text_content.")
return
logger.info(f"[Mem0] Attempting to add memory for USER_ID '{USER_ID}': '{content_str}'")
logger.info(
f"[Mem0] Attempting to add memory for USER_ID '{USER_ID}': '{content_str}'"
)
try:
add_response = await mem0.add(
[{"role": "user", "content": content_str}],
user_id=USER_ID
[{"role": "user", "content": content_str}], user_id=USER_ID
)
logger.info(f"[Mem0] Successfully added memory. Response: {add_response}")
except Exception as e:
@ -61,53 +68,61 @@ async def _enrich_with_memory(last_user_msg: llm.ChatMessage, chat_ctx_to_modify
# Decide if we should return or continue to search with potentially stale memory
# For now, we'll continue to search.
logger.info(f"[Mem0] Attempting to search memories for USER_ID '{USER_ID}' with query: '{content_str}'")
logger.info(
f"[Mem0] Attempting to search memories for USER_ID '{USER_ID}' with query: '{content_str}'"
)
results = []
try:
results = await mem0.search(
content_str,
user_id=USER_ID,
)
logger.info(f"[Mem0] Search complete. Found {len(results)} results: {results}")
logger.info(
f"[Mem0] Search complete. Found {len(results)} results: {results}"
)
except Exception as e:
logger.error(f"[Mem0] Error searching memory: {e}", exc_info=True)
if results:
memories_text = ' '.join([result["memory"] for result in results if result.get("memory")])
memories_text = " ".join(
[result["memory"] for result in results if result.get("memory")]
)
if memories_text.strip():
logger.info(f"Enriching with memory: {memories_text}")
# Create the RAG message. Ensure content is a list of ChatContent (string is fine).
rag_msg_content = f"Relevant Memory from past interactions: {memories_text}\\nUser's current query is below."
rag_msg = llm.ChatMessage(role="system", content=[rag_msg_content])
# Insert RAG message before the last user message in the context's items list
inserted = False
# Access items via the .items property
target_items_list = chat_ctx_to_modify.items
target_items_list = chat_ctx_to_modify.items
for i in range(len(target_items_list) - 1, -1, -1):
if target_items_list[i] is last_user_msg: # Check object identity
if target_items_list[i] is last_user_msg: # Check object identity
target_items_list.insert(i, rag_msg)
inserted = True
logger.info(f"Inserted RAG message at index {i} in .items list")
break
if not inserted:
logger.warning("Could not find last user message by identity in .items list. Appending RAG message.")
logger.warning(
"Could not find last user message by identity in .items list. Appending RAG message."
)
if target_items_list and target_items_list[-1] is last_user_msg:
target_items_list.insert(len(target_items_list)-1, rag_msg)
else:
target_items_list.append(rag_msg)
target_items_list.insert(len(target_items_list) - 1, rag_msg)
else:
target_items_list.append(rag_msg)
except Exception as e:
logger.error(f"Error during memory enrichment: {e}", exc_info=True)
class MemoryAgent(Agent):
def __init__(self, chat_ctx: llm.ChatContext):
super().__init__(
chat_ctx=chat_ctx,
instructions="You are a helpful voice assistant that can remember past interactions."
instructions="You are a helpful voice assistant that can remember past interactions.",
)
# System prompt is now managed by the chat_ctx passed to super().__init__
@ -117,7 +132,7 @@ class MemoryAgent(Agent):
# Say initial greeting
await self.session.say(
"Hello! I'm George. Can I help you plan an upcoming trip? ",
allow_interruptions=True
allow_interruptions=True,
)
# Start the main interaction loop
self.session.generate_reply()
@ -125,10 +140,18 @@ class MemoryAgent(Agent):
except Exception as e:
logger.error(f"Error in MemoryAgent.on_enter: {e}", exc_info=True)
async def on_user_turn_completed(self, turn_ctx: llm.ChatContext, new_message: llm.ChatMessage):
logger.info(f"MemoryAgent.on_user_turn_completed called with new_message: '{new_message.text_content}'")
async def on_user_turn_completed(
self, turn_ctx: llm.ChatContext, new_message: llm.ChatMessage
):
logger.info(
f"MemoryAgent.on_user_turn_completed called with new_message: '{new_message.text_content}'"
)
if not new_message or not new_message.content or not new_message.text_content.strip():
if (
not new_message
or not new_message.content
or not new_message.text_content.strip()
):
logger.info("No valid new_message content for memory enrichment.")
return
@ -137,34 +160,40 @@ class MemoryAgent(Agent):
# so _enrich_with_memory can potentially place the RAG message *before* it.
# The AgentActivity will use this modified turn_ctx for the LLM call.
# It will also separately add the new_message to the agent's main context.
# Let's make a working copy if direct modification isn't intended for the passed turn_ctx,
# though the name temp_mutable_chat_ctx in AgentActivity suggests it's okay.
# For safety and clarity in _enrich_with_memory, we'll operate on turn_ctx.
# Add the new user message to the context that will be enriched
turn_ctx.items.append(new_message) # new_message is already part of the main context by AgentActivity
# but for _enrich_with_memory to find it (as last_user_msg)
# and insert RAG before it in *this specific context copy*, it needs to be here.
# AgentActivity also adds this new_message to the agent's _chat_ctx separately.
logger.info(f"Context before enrichment (with new_message added): {turn_ctx.items}")
# Add the new user message to the context that will be enriched
turn_ctx.items.append(
new_message
) # new_message is already part of the main context by AgentActivity
# but for _enrich_with_memory to find it (as last_user_msg)
# and insert RAG before it in *this specific context copy*, it needs to be here.
# AgentActivity also adds this new_message to the agent's _chat_ctx separately.
logger.info(
f"Context before enrichment (with new_message added): {turn_ctx.items}"
)
# Enrich the context (which now includes new_message) with memories
# _enrich_with_memory will find new_message as the last user message
# and insert the RAG system message just before it in turn_ctx.items
await _enrich_with_memory(new_message, turn_ctx)
logger.info(f"Context after enrichment: {turn_ctx.items}")
# No need to call self.update_chat_ctx() here.
# The AgentActivity will use the modified turn_ctx for the LLM.
def prewarm_process(proc: JobProcess):
logger.info("Prewarming VAD model.")
# Preload silero VAD in memory to speed up session start
proc.userdata["vad"] = silero.VAD.load()
logger.info("VAD model prewarmed.")
async def entrypoint(ctx: JobContext):
logger.info("Agent entrypoint started.")
try:
@ -173,17 +202,15 @@ async def entrypoint(ctx: JobContext):
# Define initial system context for the LLM
initial_ctx = llm.ChatContext()
system_prompt_text = (
"""
system_prompt_text = """
You are a helpful voice assistant.
You are a travel guide named George and will help the user to plan a travel trip of their dreams.
You should help the user plan for various adventures like work retreats, family vacations or solo backpacking trips.
You are a travel guide named George and will help the user to plan a travel trip of their dreams.
You should help the user plan for various adventures like work retreats, family vacations or solo backpacking trips.
You should be careful to not suggest anything that would be dangerous, illegal or inappropriate.
You can remember past interactions and use them to inform your answers.
Use semantic memory retrieval to provide contextually relevant responses.
Use semantic memory retrieval to provide contextually relevant responses.
When relevant memory is provided, use it to enhance your response.
"""
)
initial_ctx.add_message(role="system", content=system_prompt_text)
logger.info("Initial system context defined.")
@ -194,7 +221,7 @@ async def entrypoint(ctx: JobContext):
vad_model = silero.VAD.load()
else:
logger.info("Using prewarmed VAD model.")
custom_agent = MemoryAgent(chat_ctx=initial_ctx)
# AgentSession constructor does NOT take 'agent'
@ -205,7 +232,7 @@ async def entrypoint(ctx: JobContext):
tts=openai.TTS(voice="alloy"),
)
logger.info("AgentSession created.")
# Agent is passed to session.start()
await session.start(agent=custom_agent, room=ctx.room)
logger.info("Agent session started with MemoryAgent.")
@ -213,8 +240,13 @@ async def entrypoint(ctx: JobContext):
except Exception as e:
logger.error(f"Error in agent entrypoint: {e}", exc_info=True)
# Run the application
if __name__ == "__main__":
cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint,
prewarm_fnc=prewarm_process,
agent_name="mem0-voice-agent")) # Consistent agent name
cli.run_app(
WorkerOptions(
entrypoint_fnc=entrypoint,
prewarm_fnc=prewarm_process,
agent_name="mem0-voice-agent",
)
) # Consistent agent name

View file

@ -1,37 +1,22 @@
import sys # Import sys for sys.exit
import logging
import os
import sys # Import sys for sys.exit
from pathlib import Path
from dotenv import load_dotenv
from livekit.agents import Agent, AgentSession, JobContext, WorkerOptions, cli, mcp
from livekit.plugins import deepgram, openai, silero
from livekit.plugins.turn_detector.multilingual import MultilingualModel
from dotenv import load_dotenv
load_dotenv(dotenv_path=Path(__file__).parent.parent / '.env')
load_dotenv(dotenv_path=Path(__file__).parent.parent / ".env")
logger = logging.getLogger("stone-agent")
from livekit.agents import (
Agent,
AgentSession,
ChatContext,
JobContext,
WorkerOptions,
cli,
mcp,
RunContext,
function_tool,
from typing import ( # Ensure List and Optional are imported for tool type hints
List,
Optional,
)
from livekit.agents.voice.agent import ModelSettings # Import ModelSettings
from livekit.agents.llm import (
ChatRole,
LLM,
ChatMessage
)
from livekit.plugins import deepgram, openai, silero, anthropic
from livekit.plugins.turn_detector.multilingual import MultilingualModel
from typing import List, Optional # Ensure List and Optional are imported for tool type hints
# Import the original FunctionAgents from the official agent files
# These files should be in the same directory as router_agent.py
@ -43,6 +28,22 @@ from contact_agent import ContactAgent
from gmail_agent import GmailAgent
from go_agent import GoAgent
from listen_agent import ListenAgent
from livekit.agents import (
Agent,
AgentSession,
ChatContext,
JobContext,
RunContext,
WorkerOptions,
cli,
function_tool,
mcp,
)
from livekit.agents.llm import LLM, ChatMessage, ChatRole
from livekit.agents.voice.agent import ModelSettings # Import ModelSettings
from livekit.plugins import anthropic, deepgram, openai, silero
from livekit.plugins.turn_detector.multilingual import MultilingualModel
# from mem_agent import MemoryAgent
logger = logging.getLogger("router-agent")
@ -51,8 +52,12 @@ load_dotenv()
# Determine the absolute path for server scripts relative to this file
_current_dir = os.path.dirname(os.path.abspath(__file__))
@function_tool
async def delegate_to_router_agent(context: RunContext, original_query: str = "User wants to talk about something else."):
async def delegate_to_router_agent(
context: RunContext,
original_query: str = "User wants to talk about something else.",
):
"""
Call this function to delegate the conversation back to the main RouterAgent.
This is used when your current task is complete, or the user asks for functionality
@ -60,20 +65,31 @@ async def delegate_to_router_agent(context: RunContext, original_query: str = "U
Args:
original_query: A brief description of why the delegation is happening or the user's last relevant query.
"""
logger.info(f"Specialist Agent: Delegating back to RouterAgent. Reason/Query: '{original_query}'")
logger.info(
f"Specialist Agent: Delegating back to RouterAgent. Reason/Query: '{original_query}'"
)
# Try to access _chat_ctx via context.session, as context.agent was problematic
if not hasattr(context, 'session') or context.session is None:
logger.error("delegate_to_router_agent: RunContext does not have a valid 'session' attribute.")
if not hasattr(context, "session") or context.session is None:
logger.error(
"delegate_to_router_agent: RunContext does not have a valid 'session' attribute."
)
# This is a critical failure for context propagation.
# Depending on desired behavior, could raise an error or attempt a recovery (though recovery is hard here).
# For now, we'll let it fail if it tries to access _chat_ctx on a None session,
# For now, we'll let it fail if it tries to access _chat_ctx on a None session,
# or re-raise a more specific error.
raise AttributeError("RunContext is missing the session attribute, cannot retrieve ChatContext.")
raise AttributeError(
"RunContext is missing the session attribute, cannot retrieve ChatContext."
)
return (
RouterAgent(chat_ctx=context.session._chat_ctx),
"Okay, let me switch you back to the main assistant.",
)
return RouterAgent(chat_ctx=context.session._chat_ctx), "Okay, let me switch you back to the main assistant."
class RouterAgent(Agent):
"""Routes user queries to specialized agents."""
def __init__(self, chat_ctx: ChatContext):
super().__init__(
instructions="""
@ -101,7 +117,7 @@ class RouterAgent(Agent):
If uncertain, you can ask one clarifying question to determine the correct agent, but prefer to route directly if possible.
""",
allow_interruptions=True,
chat_ctx=chat_ctx
chat_ctx=chat_ctx,
)
async def on_enter(self):
@ -109,7 +125,6 @@ class RouterAgent(Agent):
logger.info("RouterAgent entered. Waiting for user query.")
self.session.generate_reply()
@function_tool
async def delegate_to_math_agent(self, query: str):
"""
@ -117,11 +132,13 @@ class RouterAgent(Agent):
Args:
query: The user's original voice query that is mathematical in nature.
"""
logger.info(f"RouterAgent: Delegating to MathSpecialistAgent for query: '{query}'")
logger.info(
f"RouterAgent: Delegating to MathSpecialistAgent for query: '{query}'"
)
# Pass the delegate_to_router_agent tool to the CalculatorAgent
math_agent = CalculatorAgent(
chat_ctx=self.session._chat_ctx,
tools=[delegate_to_router_agent] # Pass the tool
tools=[delegate_to_router_agent], # Pass the tool
)
return math_agent, "Okay, I'll connect you with my math specialist for that."
@ -132,13 +149,18 @@ class RouterAgent(Agent):
Args:
query: The user's original voice query.
"""
logger.info(f"RouterAgent: Delegating to AskAgent (for perplexity tasks) for query: '{query}'")
logger.info(
f"RouterAgent: Delegating to AskAgent (for perplexity tasks) for query: '{query}'"
)
try:
perplexity_agent = AskAgent(
chat_ctx=self.session._chat_ctx,
tools=[delegate_to_router_agent] # Pass the tool
tools=[delegate_to_router_agent], # Pass the tool
)
return (
perplexity_agent,
"Alright, let me get my knowledge expert to help with that question.",
)
return perplexity_agent, "Alright, let me get my knowledge expert to help with that question."
except AttributeError as e:
logger.error(f"Unexpected AttributeError: {e}")
raise
@ -153,7 +175,7 @@ class RouterAgent(Agent):
logger.info(f"RouterAgent: Delegating to CalendarAgent for query: '{query}'")
calendar_agent = CalendarAgent(
chat_ctx=self.session._chat_ctx,
tools=[delegate_to_router_agent] # Pass the tool
tools=[delegate_to_router_agent], # Pass the tool
)
return calendar_agent, "Okay, let me check your calendar."
@ -167,7 +189,7 @@ class RouterAgent(Agent):
logger.info(f"RouterAgent: Delegating to CallerAgent for query: '{query}'")
caller_agent = CallerAgent(
chat_ctx=self.session._chat_ctx,
tools=[delegate_to_router_agent] # Pass the tool
tools=[delegate_to_router_agent], # Pass the tool
)
return caller_agent, "Sure, I can try to make that call for you."
@ -181,7 +203,7 @@ class RouterAgent(Agent):
logger.info(f"RouterAgent: Delegating to ContactAgent for query: '{query}'")
contact_agent = ContactAgent(
chat_ctx=self.session._chat_ctx,
tools=[delegate_to_router_agent] # Pass the tool
tools=[delegate_to_router_agent], # Pass the tool
)
return contact_agent, "Let me look up that contact information for you."
@ -195,7 +217,7 @@ class RouterAgent(Agent):
logger.info(f"RouterAgent: Delegating to GmailAgent for query: '{query}'")
gmail_agent = GmailAgent(
chat_ctx=self.session._chat_ctx,
tools=[delegate_to_router_agent] # Pass the tool
tools=[delegate_to_router_agent], # Pass the tool
)
return gmail_agent, "Okay, I'll check your emails."
@ -209,7 +231,7 @@ class RouterAgent(Agent):
logger.info(f"RouterAgent: Delegating to GoAgent for query: '{query}'")
go_agent = GoAgent(
chat_ctx=self.session._chat_ctx,
tools=[delegate_to_router_agent] # Pass the tool
tools=[delegate_to_router_agent], # Pass the tool
)
return go_agent, "Let me get my navigation expert for that."
@ -223,10 +245,11 @@ class RouterAgent(Agent):
logger.info(f"RouterAgent: Delegating to ListenAgent for query: '{query}'")
listen_agent = ListenAgent(
chat_ctx=self.session._chat_ctx,
tools=[delegate_to_router_agent] # Pass the tool
tools=[delegate_to_router_agent], # Pass the tool
)
return listen_agent, "Okay, let's get some music playing."
async def entrypoint(ctx: JobContext):
"""Main entrypoint for the multi-agent LiveKit application."""
await ctx.connect()
@ -237,21 +260,27 @@ async def entrypoint(ctx: JobContext):
stt=openai.STT(model="gpt-4o-mini-transcribe", detect_language=True),
tts=openai.TTS(voice="alloy", model="tts-1-hd"),
llm=openai.LLM(model="gpt-4o"),
turn_detection=MultilingualModel()
turn_detection=MultilingualModel(),
)
logger.info(
"AgentSession configured. MCP servers will be managed by individual specialist agents."
)
logger.info("AgentSession configured. MCP servers will be managed by individual specialist agents.")
initial_agent = RouterAgent(chat_ctx=session._chat_ctx)
await session.start(agent=initial_agent, room=ctx.room)
logger.info("RouterAgent session started.")
if __name__ == "__main__":
# Setup basic logging if running directly
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
try:
cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))
except SystemExit: # Allow sys.exit() to pass through without logging as critical
except SystemExit: # Allow sys.exit() to pass through without logging as critical
raise
except Exception as e:
logger.critical(f"Unhandled exception at top level: {e}", exc_info=True)
sys.exit(1) # Ensure exit with error code
sys.exit(1) # Ensure exit with error code

View file

@ -1,15 +1,24 @@
import logging
import os
from dotenv import load_dotenv
from pathlib import Path
from livekit.agents import Agent, AgentSession, JobContext, WorkerOptions, cli, mcp, ChatContext
from dotenv import load_dotenv
from livekit import api, rtc
from livekit.agents import (
Agent,
AgentSession,
ChatContext,
JobContext,
RunContext,
WorkerOptions,
cli,
get_job_context,
mcp,
)
from livekit.agents.llm import function_tool
from livekit.plugins import deepgram, openai, silero
from livekit.plugins.turn_detector.multilingual import MultilingualModel
from dotenv import load_dotenv
from livekit.agents.llm import function_tool
from livekit import api, rtc
from livekit.agents import get_job_context
from livekit.agents import RunContext
# Add this function definition anywhere
async def hangup_call():
@ -17,7 +26,7 @@ async def hangup_call():
if ctx is None:
# Not running in a job context
return
await ctx.api.room.delete_room(
api.DeleteRoomRequest(
room=ctx.room.name,
@ -25,12 +34,13 @@ async def hangup_call():
)
load_dotenv(os.path.join(os.path.dirname(__file__), '..', '..', '.env'))
load_dotenv(os.path.join(os.path.dirname(__file__), "..", "..", ".env"))
logger = logging.getLogger("mcp-agent")
load_dotenv(dotenv_path=Path(__file__).parent.parent / '.env')
load_dotenv(dotenv_path=Path(__file__).parent.parent / ".env")
class MyAgent(Agent):
def __init__(self, chat_ctx: ChatContext) -> None:
@ -39,7 +49,7 @@ class MyAgent(Agent):
"You can have phone calls. The interface is voice-based: "
"accept spoken user queries and respond with synthesized speech."
),
chat_ctx=chat_ctx
chat_ctx=chat_ctx,
)
@function_tool
@ -65,6 +75,7 @@ class MyAgent(Agent):
async def on_enter(self):
self.session.generate_reply()
async def entrypoint(ctx: JobContext):
await ctx.connect()
@ -79,10 +90,11 @@ async def entrypoint(ctx: JobContext):
await session.start(agent=MyAgent(chat_ctx=session._chat_ctx), room=ctx.room)
await session.generate_reply(
instructions="Greet the user and offer your assistance."
)
instructions="Greet the user and offer your assistance."
)
if __name__ == "__main__":
cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint,
agent_name="my-telephony-agent"))
cli.run_app(
WorkerOptions(entrypoint_fnc=entrypoint, agent_name="my-telephony-agent")
)

View file

@ -1,21 +1,29 @@
import json
import logging
import os
from dotenv import load_dotenv
from pathlib import Path
from livekit.agents import Agent, AgentSession, JobContext, WorkerOptions, cli, mcp, ChatContext
from livekit.plugins import deepgram, openai, silero
from livekit.plugins.turn_detector.multilingual import MultilingualModel
from dotenv import load_dotenv
from livekit import api
import json
from livekit.agents import (
Agent,
AgentSession,
ChatContext,
JobContext,
WorkerOptions,
cli,
mcp,
)
from livekit.plugins import deepgram, openai, silero
from livekit.plugins.turn_detector.multilingual import MultilingualModel
load_dotenv(os.path.join(os.path.dirname(__file__), '..', '..', '.env'))
load_dotenv(os.path.join(os.path.dirname(__file__), "..", "..", ".env"))
logger = logging.getLogger("mcp-agent")
load_dotenv(dotenv_path=Path(__file__).parent.parent / '.env')
load_dotenv(dotenv_path=Path(__file__).parent.parent / ".env")
class MyAgent(Agent):
def __init__(self, chat_ctx: ChatContext) -> None:
@ -24,16 +32,17 @@ class MyAgent(Agent):
"You can have phone calls. The interface is voice-based: "
"accept spoken user queries and respond with synthesized speech."
),
chat_ctx=chat_ctx
chat_ctx=chat_ctx,
)
async def on_enter(self):
self.session.generate_reply()
async def entrypoint(ctx: JobContext):
await ctx.connect()
# If a phone number was provided, then place an outbound call
# If a phone number was provided, then place an outbound call
# By having a condition like this, you can use the same agent for inbound/outbound telephony as well as web/mobile/etc.
dial_info = json.loads(ctx.job.metadata)
phone_number = dial_info["phone_number"]
@ -43,27 +52,28 @@ async def entrypoint(ctx: JobContext):
if phone_number is not None:
# The outbound call will be placed after this method is executed
try:
await ctx.api.sip.create_sip_participant(api.CreateSIPParticipantRequest(
# This ensures the participant joins the correct room
room_name=ctx.room.name,
# This is the outbound trunk ID to use (i.e. which phone number the call will come from)
# You can get this from LiveKit CLI with `lk sip outbound list`
sip_trunk_id=os.environ.get("TWILIO_SIP_TRUNK_ID"),
# The outbound phone number to dial and identity to use
sip_call_to=phone_number,
participant_identity=sip_participant_identity,
# This will wait until the call is answered before returning
wait_until_answered=True,
))
await ctx.api.sip.create_sip_participant(
api.CreateSIPParticipantRequest(
# This ensures the participant joins the correct room
room_name=ctx.room.name,
# This is the outbound trunk ID to use (i.e. which phone number the call will come from)
# You can get this from LiveKit CLI with `lk sip outbound list`
sip_trunk_id=os.environ.get("TWILIO_SIP_TRUNK_ID"),
# The outbound phone number to dial and identity to use
sip_call_to=phone_number,
participant_identity=sip_participant_identity,
# This will wait until the call is answered before returning
wait_until_answered=True,
)
)
print("call picked up successfully")
except api.TwirpError as e:
print(f"error creating SIP participant: {e.message}, "
f"SIP status: {e.metadata.get('sip_status_code')} "
f"{e.metadata.get('sip_status')}")
print(
f"error creating SIP participant: {e.message}, "
f"SIP status: {e.metadata.get('sip_status_code')} "
f"{e.metadata.get('sip_status')}"
)
ctx.shutdown()
session = AgentSession(
@ -83,5 +93,6 @@ async def entrypoint(ctx: JobContext):
if __name__ == "__main__":
cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint,
agent_name="my-telephony-agent"))
cli.run_app(
WorkerOptions(entrypoint_fnc=entrypoint, agent_name="my-telephony-agent")
)

View file

@ -1 +1 @@
# This file makes Python treat the directory as a package.
# This file makes Python treat the directory as a package.

View file

@ -1,5 +1,6 @@
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
# Add any specific settings required by the application here
# For example:
@ -10,4 +11,5 @@ class Settings(BaseSettings):
env_file = ".env"
env_file_encoding = "utf-8"
settings = Settings()
settings = Settings()

View file

@ -71,4 +71,4 @@ allow-direct-references = true
# Assuming these contain the core library code. Adjust if needed.
packages = ["engine/agents", "engine/api", "engine/config", "engine/tools"] # Adjusted paths
# If main.py and __init__.py should also be part of the package, might need adjustment
# e.g., include '.' or specific file paths if they are not just scripts.
# e.g., include '.' or specific file paths if they are not just scripts.

View file

@ -19,4 +19,4 @@ export LIVEKIT_ROOM="stone-router-voice-agent"
# Start the agent
cd engine
python agents/stone_agent.py
python agents/stone_agent.py

View file

@ -73,4 +73,4 @@ To add a new MCP implementation:
git submodule add <repository-url> tools/mcp/<service-name>
```
2. Update this README.md file to include information about the new submodule
2. Update this README.md file to include information about the new submodule

View file

@ -1,18 +1,19 @@
from mcp.server.fastmcp import FastMCP
import logging
import os
from mcp.server.fastmcp import FastMCP
# Setup logging to a file
# Adjust the log file path if necessary, perhaps to be relative to this script's location
# or a dedicated logs directory.
log_file_path = os.path.join(os.path.dirname(__file__), 'math_server_official.log')
log_file_path = os.path.join(os.path.dirname(__file__), "math_server_official.log")
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(name)s - %(message)s',
format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
handlers=[
logging.FileHandler(log_file_path, mode='w'), # 'w' to overwrite each run
logging.StreamHandler()
]
logging.FileHandler(log_file_path, mode="w"), # 'w' to overwrite each run
logging.StreamHandler(),
],
)
logger = logging.getLogger(__name__)
@ -20,17 +21,21 @@ mcp = FastMCP("Official Math Server 🚀")
@mcp.tool()
def add(a: int, b: int) -> int: # Changed return type hint to int
def add(a: int, b: int) -> int: # Changed return type hint to int
"""Add two numbers and return the result"""
logger.info(f"Executing add tool with a={a}, b={b}")
return a + b
@mcp.tool()
def multiply(a: int, b: int) -> int: # Changed return type hint to int
def multiply(a: int, b: int) -> int: # Changed return type hint to int
"""Multiply two numbers and return the result"""
logger.info(f"Executing multiply tool with a={a}, b={b}")
return a * b
if __name__ == "__main__":
logger.info(f"Starting Official MCP math_server.py with STDIO transport... Log file: {log_file_path}")
mcp.run(transport="stdio") # Ensure stdio transport is used as in server_stdio.py
logger.info(
f"Starting Official MCP math_server.py with STDIO transport... Log file: {log_file_path}"
)
mcp.run(transport="stdio") # Ensure stdio transport is used as in server_stdio.py

View file

@ -22,4 +22,4 @@ WORKDIR /app
RUN npm ci --ignore-scripts --omit-dev
ENTRYPOINT ["node", "dist/index.js"]
ENTRYPOINT ["node", "dist/index.js"]

View file

@ -51,4 +51,4 @@ The integration is implemented in `agents/stone_agent.py` within the `delegate_t
## Testing
Tests for the Google Maps integration are available in `tests/ai/test_maps_integration.py`.
Tests for the Google Maps integration are available in `tests/ai/test_maps_integration.py`.

View file

@ -143,7 +143,7 @@ async function performChatCompletion(
model: model, // Model identifier passed as parameter
messages: messages,
// Additional parameters can be added here if required (e.g., max_tokens, temperature, etc.)
// See the Sonar API documentation for more details:
// See the Sonar API documentation for more details:
// https://docs.perplexity.ai/api-reference/chat-completions
};
@ -182,7 +182,7 @@ async function performChatCompletion(
throw new Error(`Failed to parse JSON response from Perplexity API: ${jsonError}`);
}
// Directly retrieve the main message content from the response
// Directly retrieve the main message content from the response
let messageContent = data.choices[0].message.content;
// If citations are provided, append them to the message content

View file

@ -7,7 +7,7 @@ A lightweight [Model Context Protocol (MCP)](https://modelcontextprotocol.io) se
<details>
<summary>Contents</summary>
- [Example Interactions](#example-interactions)
- [Tools](#tools)
- [Read Operations](#read-operations)