Reorganize community environments - Move lean_proof_env, router_env, and philosophical_rlaif_env.py to environments/community/ - Add comprehensive README for community environments - This organizes community-contributed environments into a dedicated community folder for better maintainability and discoverability

This commit is contained in:
Shannon Sands 2025-05-23 13:31:13 +10:00
parent 945ea30c3a
commit e85a170c34
53 changed files with 85 additions and 0 deletions

View file

@@ -0,0 +1,58 @@
import logging
import os
from pathlib import Path

from dotenv import load_dotenv  # single import (was imported twice)
from livekit.agents import Agent, AgentSession, JobContext, WorkerOptions, cli, mcp
from livekit.plugins import deepgram, openai, silero
from livekit.plugins.turn_detector.multilingual import MultilingualModel

# NOTE(review): these two load_dotenv calls resolve to *different* files —
# <dir>/../../.env and <dir>/../.env respectively. Probably only one is
# intended; confirm which .env the deployment actually relies on.
load_dotenv(os.path.join(os.path.dirname(__file__), '..', '..', '.env'))
logger = logging.getLogger("mcp-agent")
load_dotenv(dotenv_path=Path(__file__).parent.parent / '.env')
class MyAgent(Agent):
    """Voice assistant that exposes tools from a remote MCP server.

    Speech pipeline: Silero VAD -> Deepgram STT -> GPT-4o-mini -> OpenAI TTS,
    with multilingual end-of-turn detection.
    """

    def __init__(self) -> None:
        # WARNING(review): this URL appears to embed an access credential;
        # it should live in configuration, not in committed source.
        calendar_server = mcp.MCPServerHTTP(
            url="https://mcp.gumloop.com/gcalendar/cY3bcaFS1qNdeVBnj0XIhnP4FEp2%3Aae99858e75594251bea9e05f32bb99b3",
            timeout=5,
            client_session_timeout_seconds=5,
        )
        super().__init__(
            instructions=(
                "You can retrieve data via the MCP server. The interface is voice-based: "
                "accept spoken user queries and respond with synthesized speech."
            ),
            vad=silero.VAD.load(),
            stt=deepgram.STT(model="nova-3", language="multi"),
            llm=openai.LLM(model="gpt-4o-mini"),
            tts=openai.TTS(voice="ash"),
            turn_detection=MultilingualModel(),
            mcp_servers=[calendar_server],
        )

    async def on_enter(self):
        # Open the conversation as soon as the agent joins the session.
        self.session.generate_reply()
async def entrypoint(ctx: JobContext):
    """Job entrypoint: connect to the room and run MyAgent in a session."""
    await ctx.connect()
    pipeline = dict(
        vad=silero.VAD.load(),
        stt=deepgram.STT(model="nova-3", language="multi"),
        llm=openai.LLM(model="gpt-4o-mini"),
        tts=openai.TTS(voice="ash"),
        turn_detection=MultilingualModel(),
    )
    session = AgentSession(**pipeline)
    await session.start(agent=MyAgent(), room=ctx.room)


if __name__ == "__main__":
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint, agent_name="mcp-agent"))

View file

@@ -0,0 +1,147 @@
import os
import logging
import asyncio
from dotenv import load_dotenv
from livekit.agents import mcp
from livekit.agents.llm import ChatContext, ChatMessage, LLM # Removed ChatRole as using strings
from livekit.plugins import openai
logger = logging.getLogger("text-perplexity-agent")
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
load_dotenv(os.path.join(os.path.dirname(__file__), '..', '..', '.env'))
# --- Configure Perplexity MCP Server (as a function to allow async context management) ---
def get_perplexity_mcp_server():
    """Build an MCPServerStdio for the Perplexity tool server, or return None.

    Returns None (after logging why) when PERPLEXITY_API_KEY is missing or
    when the built Node server script cannot be found on disk.
    """
    api_key = os.environ.get("PERPLEXITY_API_KEY")
    if not api_key:
        logger.warning("⚠️ PERPLEXITY_API_KEY not set. Perplexity tools will be unavailable.")
        return None

    mcp_script_path = os.path.abspath(os.path.join(
        os.path.dirname(__file__), '..', 'tools', 'mcp', 'perplexity', 'perplexity-ask', 'dist', 'index.js'
    ))
    if not os.path.exists(mcp_script_path):
        logger.error(f"❌ MCP script not found at {mcp_script_path}. Make sure you\'ve run \'npm install && npm run build\' in the server directory.")
        logger.warning("⚠️ Perplexity tools will be unavailable.")
        return None

    logger.info(f"📂 Configuring Perplexity MCP server with script: {mcp_script_path}")
    return mcp.MCPServerStdio(
        name="PerplexityStdioServer",
        params={
            "command": "node",
            "args": [mcp_script_path],
            "cwd": os.path.dirname(mcp_script_path),
            "env": {"PERPLEXITY_API_KEY": os.environ.get("PERPLEXITY_API_KEY") or ""},
            "client_session_timeout_seconds": 30
        },
        client_session_timeout_seconds=30
    )
async def run_chat_loop(llm_instance: LLM, p_mcp_server: mcp.MCPServerStdio | None, initial_question: str | None = None):
    """Run an interactive stdin/stdout chat loop against *llm_instance*.

    Args:
        llm_instance: LLM used to answer questions.
        p_mcp_server: Already-created Perplexity MCP server, or None when
            Perplexity tools are unavailable.
        initial_question: Optional question asked automatically before the
            interactive loop starts.
    """
    chat_context = ChatContext()
    system_prompt = \
    """
    You are a specialized assistant for answering general knowledge questions, providing explanations,
    and performing web searches using the 'perplexity_ask' tool.
    When the user asks for information, facts, or to 'search the web', you are the designated expert.
    When calling the 'perplexity_ask' tool, ensure the 'messages' argument is an array containing a single object
    with 'role': 'user' and 'content' set to the user's question.
    For example: {"messages": [{"role": "user", "content": "What is the capital of France?"}]}
    You do not have other tools. Do not try to delegate.
    """
    chat_context.add_message(role="system", content=system_prompt)

    async def process_question(question: str):
        # Append the user turn, stream the assistant reply to stdout, then
        # record the full reply back into the shared chat context.
        logger.info(f"You: {question}")
        chat_context.add_message(role="user", content=question)
        full_response = ""
        logger.info("Agent:")
        mcp_servers_to_use = []
        if p_mcp_server:
            # MCPServerStdio is managed by async with in main, so it should be running
            mcp_servers_to_use.append(p_mcp_server)
            logger.info("Perplexity MCP Server is available for this query.")
        try:
            logger.info(f"DEBUG: Type of chat_context: {type(chat_context)}")
            logger.info(f"DEBUG: Attributes of chat_context: {dir(chat_context)}")
            # Pass messages from ChatContext and the list of mcp_servers
            async for chunk in llm_instance.chat(messages=chat_context.messages, mcp_servers=mcp_servers_to_use):
                if chunk.delta.content:
                    print(chunk.delta.content, end="", flush=True)
                    full_response += chunk.delta.content
                if chunk.delta.tool_calls:
                    logger.info(f"\n[Tool call detected: {chunk.delta.tool_calls}]")
        except Exception as e:
            # Best-effort: surface the error to the user and keep the loop alive.
            logger.error(f"Error during LLM chat: {e}")
            print(f"Sorry, I encountered an error: {e}")
            return
        print()
        chat_context.add_message(role="assistant", content=full_response)

    if initial_question:
        await process_question(initial_question)
    while True:
        try:
            # input() blocks; run it in a thread so the event loop stays free.
            user_input = await asyncio.to_thread(input, "You: ")
            if user_input.lower() in ["exit", "quit"]:
                logger.info("Exiting chat.")
                break
            if not user_input.strip():
                continue
            await process_question(user_input)
        except KeyboardInterrupt:
            logger.info("\nExiting chat due to interrupt.")
            break
        except EOFError:
            logger.info("\nExiting chat due to EOF.")
            break
async def main():
    """Main entrypoint for the text-based Perplexity agent."""
    logger.info("Starting Text-based Perplexity Agent...")
    llm_instance = openai.LLM(model="gpt-4o")
    p_mcp_server_instance = get_perplexity_mcp_server()  # may be None (no key / no script)
    test_question = "What is the capital of France?"  # smoke-test question asked first
    if p_mcp_server_instance:
        try:
            # await p_mcp_server_instance.connect() # Connect to MCP server -> Removed
            logger.info("Perplexity MCP Server instance created. Will be used by LLM if needed.")
            await run_chat_loop(llm_instance, p_mcp_server_instance, initial_question=test_question)
        finally:
            # Always release the stdio subprocess, even if the loop raised.
            logger.info("Closing Perplexity MCP server resources.") # Changed log message
            await p_mcp_server_instance.aclose() # Close MCP server connection
    else:
        # Without Perplexity tools the LLM still answers from its own knowledge.
        logger.warning("Running chat loop without Perplexity MCP server.")
        await run_chat_loop(llm_instance, None, initial_question=test_question)
    logger.info("Text-based Perplexity Agent finished.")
if __name__ == "__main__":
    # Warn early if the key is missing; the agent still starts (degraded mode).
    if not os.environ.get("PERPLEXITY_API_KEY"):
        logger.error("🔴 PERPLEXITY_API_KEY is not set in the environment.")
        logger.error("🔴 Please set it in your .env file for the agent to function correctly with Perplexity.")
    # NOTE(review): this duplicates the path logic in get_perplexity_mcp_server();
    # keeping the two copies in sync manually is fragile.
    if os.environ.get("PERPLEXITY_API_KEY"):
        mcp_script_path = os.path.abspath(os.path.join(
            os.path.dirname(__file__), '..', 'tools', 'mcp', 'perplexity', 'perplexity-ask', 'dist', 'index.js'
        ))
        if not os.path.exists(mcp_script_path):
            logger.error(f"❌ Critical: MCP script not found at {mcp_script_path}.")
            logger.error("❌ The agent cannot use Perplexity tools. Please build the MCP server ('npm install && npm run build' in its directory).")
            exit(1)  # hard stop: key present but tool server not built
    asyncio.run(main())

View file

@@ -0,0 +1,147 @@
import os
import logging
import asyncio
from dotenv import load_dotenv
from livekit.agents import mcp
from livekit.agents.llm import ChatContext, ChatMessage, LLM # Removed ChatRole as using strings
from livekit.plugins import openai
logger = logging.getLogger("text-perplexity-agent")
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
load_dotenv(os.path.join(os.path.dirname(__file__), '..', '..', '.env'))
# --- Configure Perplexity MCP Server (as a function to allow async context management) ---
def get_perplexity_mcp_server():
    """Build an MCPServerStdio for the Perplexity tool server, or return None.

    Returns None (after logging why) when PERPLEXITY_API_KEY is missing or
    when the built Node server script cannot be found on disk.
    """
    api_key = os.environ.get("PERPLEXITY_API_KEY")
    if not api_key:
        logger.warning("⚠️ PERPLEXITY_API_KEY not set. Perplexity tools will be unavailable.")
        return None

    mcp_script_path = os.path.abspath(os.path.join(
        os.path.dirname(__file__), '..', 'tools', 'mcp', 'perplexity', 'perplexity-ask', 'dist', 'index.js'
    ))
    if not os.path.exists(mcp_script_path):
        logger.error(f"❌ MCP script not found at {mcp_script_path}. Make sure you\'ve run \'npm install && npm run build\' in the server directory.")
        logger.warning("⚠️ Perplexity tools will be unavailable.")
        return None

    logger.info(f"📂 Configuring Perplexity MCP server with script: {mcp_script_path}")
    return mcp.MCPServerStdio(
        name="PerplexityStdioServer",
        params={
            "command": "node",
            "args": [mcp_script_path],
            "cwd": os.path.dirname(mcp_script_path),
            "env": {"PERPLEXITY_API_KEY": os.environ.get("PERPLEXITY_API_KEY") or ""},
            "client_session_timeout_seconds": 30
        },
        client_session_timeout_seconds=30
    )
async def run_chat_loop(llm_instance: LLM, p_mcp_server: mcp.MCPServerStdio | None, initial_question: str | None = None):
    """Run an interactive stdin/stdout chat loop against *llm_instance*.

    Args:
        llm_instance: LLM used to answer questions.
        p_mcp_server: Already-created Perplexity MCP server, or None when
            Perplexity tools are unavailable.
        initial_question: Optional question asked automatically before the
            interactive loop starts.
    """
    chat_context = ChatContext()
    system_prompt = \
    """
    You are a specialized assistant for answering general knowledge questions, providing explanations,
    and performing web searches using the 'perplexity_ask' tool.
    When the user asks for information, facts, or to 'search the web', you are the designated expert.
    When calling the 'perplexity_ask' tool, ensure the 'messages' argument is an array containing a single object
    with 'role': 'user' and 'content' set to the user's question.
    For example: {"messages": [{"role": "user", "content": "What is the capital of France?"}]}
    You do not have other tools. Do not try to delegate.
    """
    chat_context.add_message(role="system", content=system_prompt)

    async def process_question(question: str):
        # Append the user turn, stream the assistant reply to stdout, then
        # record the full reply back into the shared chat context.
        logger.info(f"You: {question}")
        chat_context.add_message(role="user", content=question)
        full_response = ""
        logger.info("Agent:")
        mcp_servers_to_use = []
        if p_mcp_server:
            # MCPServerStdio is managed by async with in main, so it should be running
            mcp_servers_to_use.append(p_mcp_server)
            logger.info("Perplexity MCP Server is available for this query.")
        try:
            logger.info(f"DEBUG: Type of chat_context: {type(chat_context)}")
            logger.info(f"DEBUG: Attributes of chat_context: {dir(chat_context)}")
            # Pass messages from ChatContext and the list of mcp_servers
            async for chunk in llm_instance.chat(messages=chat_context.messages, mcp_servers=mcp_servers_to_use):
                if chunk.delta.content:
                    print(chunk.delta.content, end="", flush=True)
                    full_response += chunk.delta.content
                if chunk.delta.tool_calls:
                    logger.info(f"\n[Tool call detected: {chunk.delta.tool_calls}]")
        except Exception as e:
            # Best-effort: surface the error to the user and keep the loop alive.
            logger.error(f"Error during LLM chat: {e}")
            print(f"Sorry, I encountered an error: {e}")
            return
        print()
        chat_context.add_message(role="assistant", content=full_response)

    if initial_question:
        await process_question(initial_question)
    while True:
        try:
            # input() blocks; run it in a thread so the event loop stays free.
            user_input = await asyncio.to_thread(input, "You: ")
            if user_input.lower() in ["exit", "quit"]:
                logger.info("Exiting chat.")
                break
            if not user_input.strip():
                continue
            await process_question(user_input)
        except KeyboardInterrupt:
            logger.info("\nExiting chat due to interrupt.")
            break
        except EOFError:
            logger.info("\nExiting chat due to EOF.")
            break
async def main():
    """Main entrypoint for the text-based Perplexity agent."""
    logger.info("Starting Text-based Perplexity Agent...")
    llm_instance = openai.LLM(model="gpt-4o")
    p_mcp_server_instance = get_perplexity_mcp_server()  # may be None (no key / no script)
    test_question = "What is the capital of France?"  # smoke-test question asked first
    if p_mcp_server_instance:
        try:
            # await p_mcp_server_instance.connect() # Connect to MCP server -> Removed
            logger.info("Perplexity MCP Server instance created. Will be used by LLM if needed.")
            await run_chat_loop(llm_instance, p_mcp_server_instance, initial_question=test_question)
        finally:
            # Always release the stdio subprocess, even if the loop raised.
            logger.info("Closing Perplexity MCP server resources.") # Changed log message
            await p_mcp_server_instance.aclose() # Close MCP server connection
    else:
        # Without Perplexity tools the LLM still answers from its own knowledge.
        logger.warning("Running chat loop without Perplexity MCP server.")
        await run_chat_loop(llm_instance, None, initial_question=test_question)
    logger.info("Text-based Perplexity Agent finished.")
if __name__ == "__main__":
    # Warn early if the key is missing; the agent still starts (degraded mode).
    if not os.environ.get("PERPLEXITY_API_KEY"):
        logger.error("🔴 PERPLEXITY_API_KEY is not set in the environment.")
        logger.error("🔴 Please set it in your .env file for the agent to function correctly with Perplexity.")
    # NOTE(review): this duplicates the path logic in get_perplexity_mcp_server();
    # keeping the two copies in sync manually is fragile.
    if os.environ.get("PERPLEXITY_API_KEY"):
        mcp_script_path = os.path.abspath(os.path.join(
            os.path.dirname(__file__), '..', 'tools', 'mcp', 'perplexity', 'perplexity-ask', 'dist', 'index.js'
        ))
        if not os.path.exists(mcp_script_path):
            logger.error(f"❌ Critical: MCP script not found at {mcp_script_path}.")
            logger.error("❌ The agent cannot use Perplexity tools. Please build the MCP server ('npm install && npm run build' in its directory).")
            exit(1)  # hard stop: key present but tool server not built
    asyncio.run(main())

View file

@@ -0,0 +1,97 @@
import os
import logging # Added logging
from dotenv import load_dotenv
from livekit.agents import JobContext, WorkerOptions, cli # Changed import
from livekit.agents import mcp # Corrected import for mcp
from livekit.agents.llm import ChatChunk, function_tool # Added function_tool for delegate_to_router_agent if it were defined here
from livekit.agents.voice import Agent, AgentSession
from livekit.plugins import deepgram, openai, silero
# Removed: from mcp_client import MCPServerStdio
# Removed: from mcp_client.agent_tools import MCPToolsIntegration
from livekit.plugins.turn_detector.multilingual import MultilingualModel # Added from official example
from livekit.agents import ChatContext, RunContext # Add ChatContext & RunContext import
from typing import Optional, List # Add Optional & List import
from livekit.agents import tts # Corrected import for tts module
from livekit.agents.types import NOT_GIVEN # Corrected import for NOT_GIVEN
from livekit.agents.utils.misc import is_given # Corrected import for is_given
logger = logging.getLogger("agent-math-official") # Added logger

# Path to the stdio MCP calculator server launched below as a Python subprocess.
mcp_script_path = os.path.abspath(os.path.join(
    os.path.dirname(__file__), '..', 'tools', 'mcp', 'calc', 'calc_server.py'
))
class CalculatorAgent(Agent):
    """A LiveKit agent that uses MCP tools from one or more MCP servers."""
    def __init__(self,
                 chat_ctx: ChatContext,
                 instructions: Optional[str] = None,
                 mcp_servers: Optional[list[mcp.MCPServer]] = None,
                 tts: Optional[tts.TTS] = NOT_GIVEN,
                 tools: Optional[List[function_tool]] = None): # Added tools parameter
        # NOTE(review): the `mcp_servers` and `tts` parameters are accepted but
        # never forwarded to super().__init__ below, so callers passing them
        # get no effect. Confirm whether they should be wired through or removed.
        final_instructions = instructions if instructions is not None else \
        """
        You are a specialist Math assistant. Your expertise is in solving mathematical problems,
        performing calculations, arithmetic, and answering questions about numbers.
        You have two calculation tools: 'multiply' and 'add'.
        When your current math task is complete, or if the user asks for something not related to math,
        you MUST use the 'delegate_to_router_agent' tool to return to the main assistant.
        """
        # Combine passed tools with any class-defined tools if necessary (none here for now)
        all_tools = tools if tools is not None else []
        super().__init__(
            instructions=final_instructions,
            chat_ctx=chat_ctx,
            allow_interruptions=True,
            mcp_servers=[
                mcp.MCPServerStdio(
                    command="python",
                    args=[mcp_script_path],
                )
                # MODIFIED: Removed chat_ctx=chat_ctx argument
            ],
            tools=all_tools # Pass the tools to the parent Agent class
        )
        # MCP tools are automatically integrated by AgentSession if mcp_servers is configured.
        # No need for MCPToolsIntegration or manually adding tools here.
    async def llm_node(self, chat_ctx, tools, model_settings):
        """Override the llm_node to say a message when a tool call is detected."""
        tool_call_detected = False
        async for chunk in super().llm_node(chat_ctx, tools, model_settings):
            # React only to the first tool-call chunk of the turn.
            if isinstance(chunk, ChatChunk) and chunk.delta and chunk.delta.tool_calls and not tool_call_detected:
                tool_call_detected = True
                # Example: if self.tts: self.session.say("Working on the math problem.")
                # Currently, Math agent does not say anything here.
            yield chunk
    async def on_enter(self):
        # when the agent is added to the session, we'll initiate the conversation by
        # using the LLM to generate a reply
        self.session.generate_reply()
async def entrypoint(ctx: JobContext):
    """Main entrypoint for the LiveKit agent application."""
    await ctx.connect() # Connect earlier as in official example
    # Directly configure AgentSession with mcp_servers
    session = AgentSession(
        vad=silero.VAD.load(), # Redundant if agent has it, but official example does this
        stt=deepgram.STT(model="nova-2", language="en-US"), # Consistent with agent
        llm=openai.LLM(model="gpt-4o"), # Consistent with agent
        tts=openai.TTS(voice="alloy"), # Consistent with agent
        turn_detection=MultilingualModel(), # Consistent with agent
    )
    # Instantiate the agent
    # NOTE(review): session._chat_ctx is a private SDK attribute; prefer a
    # public accessor if one exists.
    agent = CalculatorAgent(chat_ctx=session._chat_ctx)
    await session.start(agent=agent, room=ctx.room)
if __name__ == "__main__":
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))

View file

@@ -0,0 +1,67 @@
import logging
import os
from dotenv import load_dotenv
from pathlib import Path
from livekit.agents import Agent, AgentSession, JobContext, WorkerOptions, cli, mcp, ChatContext, function_tool, RunContext
from livekit.plugins import deepgram, openai, silero
from livekit.plugins.turn_detector.multilingual import MultilingualModel
from typing import Optional, List
load_dotenv(os.path.join(os.path.dirname(__file__), '..', '..', '.env'))
logger = logging.getLogger("calendar-agent")
class CalendarAgent(Agent):
    """Calendar specialist agent.

    Tool access comes from an optional Gumloop MCP HTTP server whose URL is
    read from GUMLOOP_CALENDAR_MCP_URL; without it the agent runs with no
    MCP-provided tools.
    """

    def __init__(self,
                 chat_ctx: ChatContext,
                 tools: Optional[List[function_tool]] = None) -> None:
        servers = []
        url = os.getenv("GUMLOOP_CALENDAR_MCP_URL")
        if url:
            servers.append(mcp.MCPServerHTTP(
                url=url,
                timeout=5,
                client_session_timeout_seconds=5,
            ))
        super().__init__(
            instructions=(
                "You are a Calendar specialist. You can help with scheduling, creating, modifying, or querying calendar events, appointments, and meetings. "
                "Use tools like 'create_calendar_event', 'get_calendar_events', etc., when available. "
                "If your task is complete or the user asks for something outside your calendar capabilities (e.g., math, web search), "
                "you MUST use the 'delegate_to_router_agent' tool to return to the main assistant."
            ),
            chat_ctx=chat_ctx,
            allow_interruptions=True,
            tools=tools if tools is not None else [],
            mcp_servers=servers,
        )

    async def on_enter(self):
        # Start the conversation as soon as the agent joins the session.
        self.session.generate_reply()
async def entrypoint(ctx: JobContext):
    """Connect to the room and run a CalendarAgent in a fresh AgentSession."""
    await ctx.connect()
    session = AgentSession(
        vad=silero.VAD.load(),
        stt=deepgram.STT(model="nova-3", language="multi"),
        llm=openai.LLM(model="gpt-4o-mini"),
        tts=openai.TTS(voice="ash"),
        turn_detection=MultilingualModel(),
    )
    agent = CalendarAgent(chat_ctx=session._chat_ctx)
    await session.start(agent=agent, room=ctx.room)


if __name__ == "__main__":
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))

View file

@@ -0,0 +1,76 @@
import logging
import os
from dotenv import load_dotenv
from pathlib import Path
from livekit.agents import Agent, AgentSession, JobContext, WorkerOptions, cli, mcp, ChatContext, RunContext, function_tool
from livekit.plugins import deepgram, openai, silero
from livekit.plugins.turn_detector.multilingual import MultilingualModel
import random
from typing import Optional, List
load_dotenv(os.path.join(os.path.dirname(__file__), '..', '..', '.env'))
from livekit import api
logger = logging.getLogger("caller-agent")
class CallerAgent(Agent):
    """Specialist agent that initiates outbound phone calls via LiveKit agent dispatch."""
    def __init__(self,
                 chat_ctx: ChatContext,
                 tools: Optional[List[function_tool]] = None) -> None:
        final_instructions = (
            "You are a Caller specialist. Your primary function is to initiate phone calls. " +
            "If the user asks to call someone, use the 'make_phone_call' tool. " +
            "Currently, you can only call a predefined contact (Sam at +16467085301). Confirm with the user if they want to call this specific contact. " +
            "If your task is complete or the user asks for something outside your calling capabilities (e.g., math, web search), " +
            "you MUST use the 'delegate_to_router_agent' tool to return to the main assistant."
        )
        # Expose the local call tool plus any externally supplied tools.
        agent_tools = [self.make_phone_call]
        all_tools = agent_tools + (tools if tools is not None else [])
        super().__init__(
            instructions=final_instructions,
            chat_ctx=chat_ctx,
            allow_interruptions=True,
            tools=all_tools
        )
        # LiveKit server API client used to dispatch the telephony worker.
        self.lkapi = api.LiveKitAPI()
    async def on_enter(self):
        # Kick off the conversation when the agent joins the session.
        self.session.generate_reply()
    @function_tool
    async def make_phone_call(self, context: RunContext, phone_number: str):
        """
        Call this function to make a phone call to a user number.
        Args:
            phone_number: The phone number to call.
        """
        # NOTE(review): the `phone_number` argument is ignored — the dispatch
        # metadata hardcodes +16467085301, matching the instructions above
        # ("only call a predefined contact"). Confirm this is still intended.
        await self.lkapi.agent_dispatch.create_dispatch(
            api.CreateAgentDispatchRequest(
                agent_name="my-telephony-agent",
                # Random 10-digit suffix keeps each outbound call in a fresh room.
                room=f"outbound-{''.join(str(random.randint(0, 9)) for _ in range(10))}",
                metadata='{"phone_number": "+16467085301"}' #HARDCODED
            )
        )
async def entrypoint(ctx: JobContext):
    """Connect to the room and run a CallerAgent in a fresh AgentSession."""
    await ctx.connect()
    session = AgentSession(
        vad=silero.VAD.load(),
        stt=deepgram.STT(model="nova-3", language="multi"),
        llm=openai.LLM(model="gpt-4o-mini"),
        tts=openai.TTS(voice="ash"),
        turn_detection=MultilingualModel(),
    )
    caller = CallerAgent(chat_ctx=session._chat_ctx)
    await session.start(agent=caller, room=ctx.room)


if __name__ == "__main__":
    # NOTE(review): agent_name "mcp-agent" looks copy-pasted from another
    # worker file — confirm this is the intended registration name.
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint, agent_name="mcp-agent"))

View file

@@ -0,0 +1,69 @@
import logging
import os
from dotenv import load_dotenv
from pathlib import Path
from livekit.agents import Agent, AgentSession, JobContext, WorkerOptions, cli, mcp, ChatContext, function_tool, RunContext
from livekit.plugins import deepgram, openai, silero
from livekit.plugins.turn_detector.multilingual import MultilingualModel
from typing import Optional, List
load_dotenv(os.path.join(os.path.dirname(__file__), '..', '..', '.env'))
logger = logging.getLogger("contact-agent")
class ContactAgent(Agent):
    """Contact-book specialist backed by an optional Zapier MCP HTTP server.

    The server URL is read from ZAPIER_CONTACT_MCP_URL; when it is absent a
    warning is logged and the agent runs without MCP-provided tools.
    """

    def __init__(self,
                 chat_ctx: ChatContext,
                 tools: Optional[List[function_tool]] = None) -> None:
        servers = []
        url = os.getenv("ZAPIER_CONTACT_MCP_URL")
        if url:
            servers.append(mcp.MCPServerHTTP(
                url=url,
                timeout=5,
                client_session_timeout_seconds=5,
            ))
        else:
            logger.warning("ZAPIER_CONTACT_MCP_URL not set. Contact agent may not have all its tools.")
        super().__init__(
            instructions=(
                "You are a Contact specialist. You can help find contact information such as phone numbers, email addresses, or other details for individuals. "
                "You can also add new contacts or update existing ones if tools like 'get_contact_details', 'add_contact', 'update_contact' are available. "
                "If your task is complete or the user asks for something outside your contact management capabilities (e.g., math, web search), "
                "you MUST use the 'delegate_to_router_agent' tool to return to the main assistant."
            ),
            chat_ctx=chat_ctx,
            allow_interruptions=True,
            mcp_servers=servers,
            tools=tools if tools is not None else [],
        )

    async def on_enter(self):
        # Start talking as soon as we join the session.
        self.session.generate_reply()
async def entrypoint(ctx: JobContext):
    """Connect to the room and run a ContactAgent in a fresh AgentSession."""
    await ctx.connect()
    session = AgentSession(
        vad=silero.VAD.load(),
        stt=deepgram.STT(model="nova-3", language="multi"),
        llm=openai.LLM(model="gpt-4o-mini"),
        tts=openai.TTS(voice="alloy"),
        turn_detection=MultilingualModel(),
    )
    agent = ContactAgent(chat_ctx=session._chat_ctx)
    await session.start(agent=agent, room=ctx.room)


if __name__ == "__main__":
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))

View file

@@ -0,0 +1,70 @@
import logging
import os
from dotenv import load_dotenv
from pathlib import Path
from livekit.agents import Agent, AgentSession, JobContext, WorkerOptions, cli, mcp, ChatContext, function_tool, RunContext
from livekit.plugins import deepgram, openai, silero
from livekit.plugins.turn_detector.multilingual import MultilingualModel
from typing import Optional, List
load_dotenv(os.path.join(os.path.dirname(__file__), '..', '..', '.env'))
logger = logging.getLogger("gmail-agent")
class GmailAgent(Agent):
    """Gmail specialist agent backed by an optional Gumloop MCP HTTP server.

    The server URL is read from GUMLOOP_GMAIL_MCP_URL; when it is absent a
    warning is logged and the agent runs without MCP-provided tools.
    """

    def __init__(self,
                 chat_ctx: ChatContext,
                 tools: Optional[List[function_tool]] = None) -> None:
        servers = []
        url = os.getenv("GUMLOOP_GMAIL_MCP_URL")
        if url:
            servers.append(mcp.MCPServerHTTP(
                url=url,
                timeout=5,
                client_session_timeout_seconds=5,
            ))
        else:
            logger.warning("GUMLOOP_GMAIL_MCP_URL not set. Gmail agent may not have all its tools.")
        super().__init__(
            instructions=(
                "You are a Gmail specialist. You can manage emails by reading, searching, sending, and updating them (e.g., marking as read/unread, moving to folders). "
                "Use tools like 'read_emails', 'send_email', and 'update_email' to interact with Gmail. "
                "If sending an email, you might need a recipient; you know Gabin (gabin.fay@gmail.com). "
                "If your task is complete or the user asks for something outside your email management capabilities (e.g., math, calendar), "
                "you MUST use the 'delegate_to_router_agent' tool to return to the main assistant."
            ),
            chat_ctx=chat_ctx,
            allow_interruptions=True,
            mcp_servers=servers,
            tools=tools if tools is not None else [],
        )

    async def on_enter(self):
        # Start the conversation when the agent joins the session.
        self.session.generate_reply()
async def entrypoint(ctx: JobContext):
    """Connect to the room and run a GmailAgent in a fresh AgentSession."""
    await ctx.connect()
    session = AgentSession(
        vad=silero.VAD.load(),
        stt=deepgram.STT(model="nova-3", language="multi"),
        llm=openai.LLM(model="gpt-4o-mini"),
        tts=openai.TTS(voice="alloy"),
        turn_detection=MultilingualModel(),
    )
    agent = GmailAgent(chat_ctx=session._chat_ctx)
    await session.start(agent=agent, room=ctx.room)


if __name__ == "__main__":
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))

View file

@@ -0,0 +1,143 @@
import os
import logging
import asyncio
from dotenv import load_dotenv
from livekit.agents import JobContext, WorkerOptions, cli, mcp, function_tool, RunContext
from livekit.agents.llm import ChatChunk, ChatContext, ChatMessage
from livekit.agents.voice import Agent, AgentSession
from livekit.plugins import openai, silero, deepgram
from livekit.plugins.turn_detector.multilingual import MultilingualModel
from typing import Optional, List
logger = logging.getLogger("go-agent-livekit")
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# Load environment variables from .env file
load_dotenv()
OPENAI_API_KEY = os.environ.get('OPENAI_API_KEY')
GOOGLE_MAPS_API_KEY = os.environ.get('GOOGLE_MAPS_API_KEY')
DEEPGRAM_API_KEY = os.environ.get('DEEPGRAM_API_KEY')
# Fail-soft startup checks: log loudly but keep running so partial
# functionality remains possible.
if not OPENAI_API_KEY:
    logger.critical("🔴 CRITICAL: OPENAI_API_KEY not found. OpenAI plugins will fail.")
if not GOOGLE_MAPS_API_KEY:
    logger.critical("🔴 CRITICAL: GOOGLE_MAPS_API_KEY not found. Google Maps MCP server will fail.")
if not DEEPGRAM_API_KEY:
    logger.warning("⚠️ WARNING: DEEPGRAM_API_KEY not found. Deepgram STT plugin may have issues.")
# Built Google Maps MCP server script (Node); must exist for maps tools.
mcp_script_path = os.path.abspath(os.path.join(
    os.path.dirname(__file__), '..', 'tools', 'mcp', 'google-maps', 'dist', 'index.js'
))
if not os.path.exists(mcp_script_path):
    logger.critical(f"CRITICAL: Google Maps MCP script not found at {mcp_script_path}. Agent cannot start tools.")
class GoAgent(Agent):
    """A LiveKit agent specialized in location-based queries using Google Maps via MCP."""
    def __init__(self,
                 chat_ctx: ChatContext,
                 tools: Optional[List[function_tool]] = None):
        final_instructions = (
            "You are the Go Agent, specialized in providing location-based information using Google Maps. "
            "You MUST use the available tools to fulfill user queries about locations, directions, distances, and places.\n\n"
            "RULE FOR LOCATION REQUESTS: When a user asks about finding a location, getting directions, calculating distances, "
            "or information about a place, you MUST use the appropriate Google Maps tool.\n\n"
            "Key tools available to you (provided by Google Maps MCP):\n"
            "- maps_geocode: Convert an address to coordinates (e.g., maps_geocode address=\"1600 Amphitheatre Parkway, Mountain View, CA\")\n"
            "- maps_reverse_geocode: Convert coordinates to an address (e.g., maps_reverse_geocode latitude=37.422 longitude=-122.084)\n"
            "- maps_search_places: Search for places (e.g., maps_search_places query=\"restaurants in London\")\n"
            "- maps_place_details: Get details for a place_id (e.g., maps_place_details place_id=\"ChIJN1t_tDeuEmsRUsoyG83frY4\")\n"
            "- maps_directions: Get directions (e.g., maps_directions origin=\"San Francisco\" destination=\"Los Angeles\" mode=\"driving\")\n"
            "- maps_distance_matrix: Calculate distances (e.g., maps_distance_matrix origins=\"New York,Washington D.C.\" destinations=\"Boston,Philadelphia\" mode=\"...\")\n\n"
            "RULE FOR TOOL RESULTS: After you receive results from a tool, you MUST analyze the data and provide a clear, "
            "helpful response. Format addresses and directions in a readable way, extract key information from place details, "
            "and always provide context for coordinates and distances.\n\n"
            "If a tool call fails or returns no relevant information, explain clearly to the user and suggest alternatives. "
            "If your task is complete or the user asks for something outside your location/maps capabilities (e.g., math, calendar), "
            "you MUST use the 'delegate_to_router_agent' tool to return to the main assistant."
        )
        all_tools = tools if tools is not None else []
        # Only attach the maps MCP server when both the API key and the built
        # Node script (checked at module import) are present.
        mcp_servers_list = []
        if GOOGLE_MAPS_API_KEY and os.path.exists(mcp_script_path):
            mcp_servers_list.append(
                mcp.MCPServerStdio(
                    command='node',
                    args=[mcp_script_path],
                    env={'GOOGLE_MAPS_API_KEY': GOOGLE_MAPS_API_KEY}
                )
            )
        else:
            logger.warning("Google Maps MCP server not configured due to missing API key or script path.")
        super().__init__(
            instructions=final_instructions,
            allow_interruptions=True,
            chat_ctx=chat_ctx,
            mcp_servers=mcp_servers_list,
            tools=all_tools
        )
        if not self.llm:
            logger.error("GoAgentLivekit initialized, but LLM might be missing if API key was not provided to plugin.")
    async def llm_node(self, chat_ctx: ChatContext, tools: list, model_settings: dict):
        """Override the llm_node to log tool calls or add custom behavior."""
        tool_call_detected_this_turn = False
        async for chunk in super().llm_node(chat_ctx, tools, model_settings):
            # Speak an acknowledgement on the first tool-call chunk of the turn.
            if isinstance(chunk, ChatChunk) and chunk.delta and chunk.delta.tool_calls and not tool_call_detected_this_turn:
                tool_call_detected_this_turn = True
                logger.info("GoAgentLivekit: LLM is attempting to call a tool. Informing user.")
                if hasattr(self, 'session') and self.session is not None:
                    self.session.say("Okay, let me check that for you.")
                else:
                    logger.warning("Agent has no session to 'say' through during tool call detection.")
            yield chunk
    async def on_enter(self):
        # when the agent is added to the session, we'll initiate the conversation by
        # using the LLM to generate a reply
        self.session.generate_reply()
async def entrypoint(ctx: JobContext):
    """Main entrypoint for the LiveKit Go Agent application."""
    logger.info(f"Go Agent LiveKit starting entrypoint for Job ID: {getattr(ctx.job, 'id', 'unknown')}")
    await ctx.connect()
    logger.info(f"Successfully connected to LiveKit room: {ctx.room.name if ctx.room else 'N/A'}")
    session = AgentSession(
        vad=silero.VAD.load(),
        stt=deepgram.STT(model="nova-2", language="en-US", api_key=os.environ.get('DEEPGRAM_API_KEY')),
        llm=openai.LLM(model="gpt-4o", api_key=OPENAI_API_KEY),
        tts=openai.TTS(voice="alloy", api_key=OPENAI_API_KEY),
        turn_detection=MultilingualModel(),
    )
    logger.info("AgentSession configured with Google Maps MCP server.")
    # NOTE(review): session._chat_ctx is a private SDK attribute; prefer a
    # public accessor if one exists.
    agent = GoAgent(chat_ctx=session._chat_ctx)
    logger.info("GoAgentLivekit instantiated.")
    logger.info(f"Starting AgentSession with agent for room: {ctx.room.name if ctx.room else 'N/A'}")
    await session.start(agent=agent, room=ctx.room)
    logger.info("AgentSession started. GoAgentLivekit is now running.")
if __name__ == "__main__":
    logger.info("Starting Go Agent LiveKit application via cli.run_app.")
    # Duplicate of the module-level DEEPGRAM_API_KEY check; kept as a reminder
    # right before the worker starts.
    if not os.environ.get('DEEPGRAM_API_KEY'):
        logger.warning("DEEPGRAM_API_KEY not found in environment. STT plugin may fail.")
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))

View file

@ -0,0 +1,152 @@
import os
import sys
import logging
from dotenv import load_dotenv
from livekit.agents import JobContext, WorkerOptions, cli, function_tool, RunContext
from livekit.agents import mcp
from livekit.agents.llm import ChatChunk
from livekit.agents.voice import Agent, AgentSession
from livekit.plugins import deepgram, openai, silero
from livekit.plugins.turn_detector.multilingual import MultilingualModel
from typing import Optional, List
from livekit.agents import tts
from livekit.agents.types import NOT_GIVEN
from livekit.agents.utils.misc import is_given
from pydantic import BaseModel, Field
from livekit.agents import ChatContext
logger = logging.getLogger("agent-spotify-official")
# Load environment variables from the repository-level .env (two levels up).
load_dotenv(os.path.join(os.path.dirname(__file__), '..', '..', '.env'))
# Make the project root importable so `engine.config` below resolves,
# regardless of the working directory this script is launched from.
_this_file_dir = os.path.dirname(os.path.abspath(__file__))
_stone_ui_dir = os.path.abspath(os.path.join(_this_file_dir, '..', '..'))
if _stone_ui_dir not in sys.path:
    sys.path.insert(0, _stone_ui_dir)
# Removed ANTHROPIC_API_KEY check as it seems unrelated to this OpenAI-based agent.
# NOTE: this import must come after the sys.path manipulation above.
from engine.config import settings
# --- Spotify Tool Input Models (Based on spotify-mcp-server README) ---
class PlayMusicInput(BaseModel):
    """Arguments for the Spotify MCP 'playMusic' tool.

    Per the field descriptions below, `uri` alone identifies the item and
    overrides `type` + `id`; `deviceId` is optional.
    """
    # Field descriptions are surfaced to the LLM as tool-parameter docs,
    # so their wording is behavior — do not edit casually.
    uri: Optional[str] = Field(None, description="Spotify URI of the item to play (e.g., spotify:track:...). Overrides type and id.")
    type: Optional[str] = Field(None, description="Type of item to play (track, album, artist, playlist)")
    id: Optional[str] = Field(None, description="Spotify ID of the item to play")
    deviceId: Optional[str] = Field(None, description="ID of the device to play on (optional)")
# Add other input models here as needed (e.g., SearchSpotifyInput, PlaylistInput etc.)
# --- Configure Spotify MCP Server ---
spotify_mcp_server = None
# Define the path to the BUILT MCP server script
# IMPORTANT: Ensure the MCP server is built (npm run build) and authenticated (npm run auth)
mcp_script_path = os.path.abspath(os.path.join(
os.path.dirname(__file__), '..', 'tools', 'mcp', 'spotify', 'build', 'index.js'
))
spotify_config_path = os.path.abspath(os.path.join(
os.path.dirname(__file__), '..', 'tools', 'mcp', 'spotify', 'spotify-config.json'
))
if not os.path.exists(mcp_script_path):
logger.error(f"❌ Spotify MCP script not found at {mcp_script_path}. Make sure you've run 'npm install && npm run build' in the server directory.")
logger.warning("⚠️ Spotify tools will be unavailable.")
elif not os.path.exists(spotify_config_path):
logger.error(f"❌ Spotify config file not found at {spotify_config_path}. Make sure you've run 'npm run auth' after setting credentials.")
logger.warning("⚠️ Spotify tools will likely be unavailable due to missing auth.")
else:
# Check if config contains tokens (basic check)
try:
with open(spotify_config_path, 'r') as f:
config_content = f.read()
if 'accessToken' not in config_content or 'refreshToken' not in config_content or 'run-npm auth' in config_content:
logger.warning(f"⚠️ Spotify config file at {spotify_config_path} seems incomplete or unauthenticated. Run 'npm run auth'.")
# We still configure the server, but it might fail at runtime
else:
logger.info("✅ Spotify config file seems authenticated.")
except Exception as e:
logger.error(f"Error reading Spotify config {spotify_config_path}: {e}")
logger.info(f"📂 Configuring Spotify MCP server with script: {mcp_script_path}")
spotify_mcp_server = mcp.MCPServerStdio(
'node', # Command to run the server
args=[mcp_script_path], # Argument is the script path
# No specific env vars needed here, reads from spotify-config.json
env={},
client_session_timeout_seconds=5*60
)
logger.info("✅ Spotify MCP Server configured (runtime auth check still needed).")
class ListenAgent(Agent):
    """A LiveKit agent that uses MCP tools from one or more MCP servers.

    Specializes in Spotify playback control via the module-level
    `spotify_mcp_server` (when it was successfully configured at import time).

    Args:
        chat_ctx: Conversation context carried over from the previous agent.
        instructions: Optional override for the built-in Spotify persona.
        tts: Optional TTS engine to speak with; NOT_GIVEN keeps the session default.
        tools: Extra function tools (e.g. delegate_to_router_agent) to expose.
    """
    def __init__(self,
                 chat_ctx: ChatContext,
                 instructions: Optional[str] = None,
                 tts: Optional[tts.TTS] = NOT_GIVEN,
                 tools: Optional[List[function_tool]] = None):
        # Fall back to the built-in Spotify persona when no instructions are supplied.
        final_instructions = instructions if instructions is not None else \
            ("You are the Listen Agent, specialized in controlling Spotify music playback. " +
             "You MUST use the available tools to fulfill user requests related to Spotify. " +
             "Available tools include 'playMusic', and potentially others like 'searchSpotify', 'pausePlayback', etc.\n\n" +
             "RULE FOR MUSIC REQUESTS: When a user asks to play music, search for music, control playback (pause, skip, etc.), " +
             "manage playlists, or ask what's playing, you MUST use the appropriate Spotify tool (like 'playMusic'). " +
             "Be precise with parameters like 'uri' or 'type' and 'id'. Infer parameters from the user query. If essential info is missing (like what to play), ask the user.\n\n" +
             "RULE FOR TOOL RESULTS: After a tool is successfully executed, you MUST confirm the action to the user (e.g., 'Okay, playing \'Bohemian Rhapsody\' now.'). " +
             "If a tool fails or returns an error, inform the user clearly. " +
             "If your task is complete or the user asks for something outside your Spotify capabilities (e.g., math, calendar), " +
             "you MUST use the 'delegate_to_router_agent' tool to return to the main assistant."
            )
        all_tools = tools if tools is not None else []
        # Only register the Spotify MCP server if module-level configuration succeeded.
        active_mcp_servers = []
        if spotify_mcp_server is not None:
            active_mcp_servers.append(spotify_mcp_server)
        super().__init__(
            instructions=final_instructions,
            chat_ctx=chat_ctx,
            allow_interruptions=True,
            # BUGFIX: the `tts` parameter was accepted but never forwarded, so a
            # caller-supplied TTS was silently ignored and llm_node's `self.tts`
            # check never saw it. NOT_GIVEN keeps the session default as before.
            tts=tts,
            mcp_servers=active_mcp_servers, # MODIFIED: Pass filtered list
            tools=all_tools # Pass the tools to the parent Agent class
        )
    async def llm_node(self, chat_ctx, tools, model_settings):
        """Override the llm_node to say a message when a tool call is detected."""
        # Speak the filler phrase at most once per turn.
        tool_call_detected = False
        async for chunk in super().llm_node(chat_ctx, tools, model_settings):
            if isinstance(chunk, ChatChunk) and chunk.delta and chunk.delta.tool_calls and not tool_call_detected:
                tool_call_detected = True
                # Use self.session.say() to make the agent speak, only if TTS is configured
                if self.tts: # Check if the agent has a TTS instance
                    self.session.say("Sure, I'll check that for you.")
            yield chunk
    async def on_enter(self):
        # when the agent is added to the session, we'll initiate the conversation by
        # using the LLM to generate a reply
        self.session.generate_reply()
async def entrypoint(ctx: JobContext):
    """Connect to the room and run a ListenAgent inside a fresh AgentSession."""
    await ctx.connect()
    # Voice pipeline: Silero VAD, Deepgram STT, GPT-4o, OpenAI TTS,
    # multilingual turn detection.
    session = AgentSession(
        turn_detection=MultilingualModel(),
        tts=openai.TTS(voice="alloy"),
        llm=openai.LLM(model="gpt-4o"),
        stt=deepgram.STT(model="nova-2", language="en-US"),
        vad=silero.VAD.load(),
    )
    listen_agent = ListenAgent(chat_ctx=session._chat_ctx)
    await session.start(agent=listen_agent, room=ctx.room)
if __name__ == "__main__":
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))

View file

@ -0,0 +1,220 @@
import asyncio
import logging
import os
from typing import List, Dict, Any, Annotated
from pathlib import Path
import aiohttp
from dotenv import load_dotenv
from livekit.agents import (
JobContext,
JobProcess,
WorkerOptions,
cli,
llm,
Agent,
AgentSession
)
from livekit import rtc, api
from livekit.plugins import deepgram, openai, silero
from mem0 import AsyncMemoryClient
# Load environment variables
load_dotenv(dotenv_path=Path(__file__).parent.parent.parent / '.env')
# Configure logging: set both this module's logger and the root handler
# to INFO with a timestamped format.
logger = logging.getLogger("memory-assistant")
logger.setLevel(logging.INFO)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Define a global user ID for simplicity — all memories are stored and
# searched under this single key regardless of the actual caller.
USER_ID = "voice_user"
# Initialize Mem0 memory client (reads its API key from the environment).
mem0 = AsyncMemoryClient()
async def _enrich_with_memory(last_user_msg: llm.ChatMessage, chat_ctx_to_modify: llm.ChatContext):
    """Store the user's message in Mem0 and inject relevant memories into the context.

    Adds `last_user_msg` to Mem0 under the global USER_ID, searches Mem0 with the
    same text, and — if anything is found — inserts a system "RAG" message into
    `chat_ctx_to_modify.items` immediately before `last_user_msg` (matched by
    object identity). All Mem0 failures are logged and swallowed: enrichment is
    best-effort and must never break the turn.
    """
    if not last_user_msg or not last_user_msg.text_content or not last_user_msg.text_content.strip():
        logger.info("No valid last user message content to process for memory.")
        return
    try:
        # Ensure last_user_msg.text_content is a string for mem0
        content_str = last_user_msg.text_content
        if not content_str or not content_str.strip():
            logger.info("User message content is empty after getting text_content.")
            return
        logger.info(f"[Mem0] Attempting to add memory for USER_ID '{USER_ID}': '{content_str}'")
        try:
            add_response = await mem0.add(
                [{"role": "user", "content": content_str}],
                user_id=USER_ID
            )
            logger.info(f"[Mem0] Successfully added memory. Response: {add_response}")
        except Exception as e:
            logger.error(f"[Mem0] Error adding memory: {e}", exc_info=True)
            # Decide if we should return or continue to search with potentially stale memory
            # For now, we'll continue to search.
        logger.info(f"[Mem0] Attempting to search memories for USER_ID '{USER_ID}' with query: '{content_str}'")
        results = []
        try:
            results = await mem0.search(
                content_str,
                user_id=USER_ID,
            )
            logger.info(f"[Mem0] Search complete. Found {len(results)} results: {results}")
        except Exception as e:
            logger.error(f"[Mem0] Error searching memory: {e}", exc_info=True)
        if results:
            # Concatenate all non-empty "memory" fields into one prompt fragment.
            memories_text = ' '.join([result["memory"] for result in results if result.get("memory")])
            if memories_text.strip():
                logger.info(f"Enriching with memory: {memories_text}")
                # Create the RAG message. Ensure content is a list of ChatContent (string is fine).
                # NOTE(review): "\\n" produces a literal backslash-n in the prompt
                # text, not a newline — confirm whether "\n" was intended.
                rag_msg_content = f"Relevant Memory from past interactions: {memories_text}\\nUser's current query is below."
                rag_msg = llm.ChatMessage(role="system", content=[rag_msg_content])
                # Insert RAG message before the last user message in the context's items list
                inserted = False
                # Access items via the .items property
                target_items_list = chat_ctx_to_modify.items
                # Scan backwards so we find the most recent occurrence first.
                for i in range(len(target_items_list) - 1, -1, -1):
                    if target_items_list[i] is last_user_msg: # Check object identity
                        target_items_list.insert(i, rag_msg)
                        inserted = True
                        logger.info(f"Inserted RAG message at index {i} in .items list")
                        break
                if not inserted:
                    logger.warning("Could not find last user message by identity in .items list. Appending RAG message.")
                    # NOTE(review): this identity re-check can never succeed here —
                    # the loop above already scanned every index — so only the
                    # append branch is reachable in practice.
                    if target_items_list and target_items_list[-1] is last_user_msg:
                        target_items_list.insert(len(target_items_list)-1, rag_msg)
                    else:
                        target_items_list.append(rag_msg)
    except Exception as e:
        logger.error(f"Error during memory enrichment: {e}", exc_info=True)
class MemoryAgent(Agent):
    """Voice assistant agent that enriches each user turn with Mem0 memories.

    The persona/system prompt lives in the ChatContext supplied by the caller;
    `on_user_turn_completed` injects retrieved memories before the LLM runs.
    """
    def __init__(self, chat_ctx: llm.ChatContext):
        super().__init__(
            chat_ctx=chat_ctx,
            instructions="You are a helpful voice assistant that can remember past interactions."
        )
        # System prompt is now managed by the chat_ctx passed to super().__init__
    async def on_enter(self):
        # Greet the user, then start the normal reply loop. Errors are logged
        # rather than propagated so a TTS hiccup doesn't kill the session.
        logger.info("MemoryAgent entered room.")
        try:
            # Say initial greeting
            await self.session.say(
                "Hello! I'm George. Can I help you plan an upcoming trip? ",
                allow_interruptions=True
            )
            # Start the main interaction loop
            self.session.generate_reply()
            logger.info("MemoryAgent started generate_reply loop.")
        except Exception as e:
            logger.error(f"Error in MemoryAgent.on_enter: {e}", exc_info=True)
    async def on_user_turn_completed(self, turn_ctx: llm.ChatContext, new_message: llm.ChatMessage):
        # Hook called by the framework after each completed user turn;
        # `turn_ctx` is the context that will be sent to the LLM for this turn.
        # NOTE(review): if text_content is None the .strip() below raises —
        # assumed unreachable for a completed user turn; confirm.
        logger.info(f"MemoryAgent.on_user_turn_completed called with new_message: '{new_message.text_content}'")
        if not new_message or not new_message.content or not new_message.text_content.strip():
            logger.info("No valid new_message content for memory enrichment.")
            return
        # The turn_ctx provided by the hook is the context *before* the new_message.
        # We need to add the new_message to it before enrichment,
        # so _enrich_with_memory can potentially place the RAG message *before* it.
        # The AgentActivity will use this modified turn_ctx for the LLM call.
        # It will also separately add the new_message to the agent's main context.
        # Let's make a working copy if direct modification isn't intended for the passed turn_ctx,
        # though the name temp_mutable_chat_ctx in AgentActivity suggests it's okay.
        # For safety and clarity in _enrich_with_memory, we'll operate on turn_ctx.
        # Add the new user message to the context that will be enriched
        turn_ctx.items.append(new_message) # new_message is already part of the main context by AgentActivity
                                           # but for _enrich_with_memory to find it (as last_user_msg)
                                           # and insert RAG before it in *this specific context copy*, it needs to be here.
                                           # AgentActivity also adds this new_message to the agent's _chat_ctx separately.
        logger.info(f"Context before enrichment (with new_message added): {turn_ctx.items}")
        # Enrich the context (which now includes new_message) with memories
        # _enrich_with_memory will find new_message as the last user message
        # and insert the RAG system message just before it in turn_ctx.items
        await _enrich_with_memory(new_message, turn_ctx)
        logger.info(f"Context after enrichment: {turn_ctx.items}")
        # No need to call self.update_chat_ctx() here.
        # The AgentActivity will use the modified turn_ctx for the LLM.
def prewarm_process(proc: JobProcess):
    """Preload the Silero VAD model into process userdata.

    Runs once per worker process so later sessions can reuse the model
    instead of paying the load cost on their first job.
    """
    logger.info("Prewarming VAD model.")
    vad_model = silero.VAD.load()
    proc.userdata["vad"] = vad_model
    logger.info("VAD model prewarmed.")
async def entrypoint(ctx: JobContext):
    """Job entrypoint: build the system prompt, the voice pipeline, and start
    a MemoryAgent session in the connected room. All failures are logged."""
    logger.info("Agent entrypoint started.")
    try:
        await ctx.connect()
        logger.info("Connected to LiveKit room.")
        # Define initial system context for the LLM
        initial_ctx = llm.ChatContext()
        system_prompt_text = (
            """
            You are a helpful voice assistant.
            You are a travel guide named George and will help the user to plan a travel trip of their dreams.
            You should help the user plan for various adventures like work retreats, family vacations or solo backpacking trips.
            You should be careful to not suggest anything that would be dangerous, illegal or inappropriate.
            You can remember past interactions and use them to inform your answers.
            Use semantic memory retrieval to provide contextually relevant responses.
            When relevant memory is provided, use it to enhance your response.
            """
        )
        initial_ctx.add_message(role="system", content=system_prompt_text)
        logger.info("Initial system context defined.")
        # VAD model loading logic remains the same: prefer the model prewarmed
        # by prewarm_process; fall back to a fresh load if it is absent.
        vad_model = ctx.proc.userdata.get("vad")
        if not vad_model:
            logger.info("VAD not prewarmed or not found in userdata, loading now.")
            vad_model = silero.VAD.load()
        else:
            logger.info("Using prewarmed VAD model.")
        custom_agent = MemoryAgent(chat_ctx=initial_ctx)
        # AgentSession constructor does NOT take 'agent'
        session = AgentSession(
            vad=vad_model,
            stt=deepgram.STT(model="nova-2", language="en"),
            llm=openai.LLM(model="gpt-4o-mini"),
            tts=openai.TTS(voice="alloy"),
        )
        logger.info("AgentSession created.")
        # Agent is passed to session.start()
        await session.start(agent=custom_agent, room=ctx.room)
        logger.info("Agent session started with MemoryAgent.")
    except Exception as e:
        logger.error(f"Error in agent entrypoint: {e}", exc_info=True)
# Run the application
# Run the application: register the entrypoint and the VAD prewarm hook with
# the LiveKit worker runner under a stable agent name.
if __name__ == "__main__":
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint,
                              prewarm_fnc=prewarm_process,
                              agent_name="mem0-voice-agent")) # Consistent agent name

View file

@ -0,0 +1,257 @@
import sys # Import sys for sys.exit
import logging
import os
from pathlib import Path
from livekit.agents import Agent, AgentSession, JobContext, WorkerOptions, cli, mcp
from livekit.plugins import deepgram, openai, silero
from livekit.plugins.turn_detector.multilingual import MultilingualModel
from dotenv import load_dotenv
load_dotenv(dotenv_path=Path(__file__).parent.parent / '.env')
logger = logging.getLogger("stone-agent")
from livekit.agents import (
Agent,
AgentSession,
ChatContext,
JobContext,
WorkerOptions,
cli,
mcp,
RunContext,
function_tool,
)
from livekit.agents.voice.agent import ModelSettings # Import ModelSettings
from livekit.agents.llm import (
ChatRole,
LLM,
ChatMessage
)
from livekit.plugins import deepgram, openai, silero, anthropic
from livekit.plugins.turn_detector.multilingual import MultilingualModel
from typing import List, Optional # Ensure List and Optional are imported for tool type hints
# Import the original FunctionAgents from the official agent files
# These files should be in the same directory as router_agent.py
from ask_agent import AskAgent
from calc_agent import CalculatorAgent
from calendar_agent import CalendarAgent
from caller_agent import CallerAgent
from contact_agent import ContactAgent
from gmail_agent import GmailAgent
from go_agent import GoAgent
from listen_agent import ListenAgent
# from mem_agent import MemoryAgent
# NOTE(review): this rebinds `logger` (earlier bound to "stone-agent") and
# calls load_dotenv a second time — the duplicates are harmless but could be
# consolidated.
logger = logging.getLogger("router-agent")
load_dotenv()
# Determine the absolute path for server scripts relative to this file
# NOTE(review): _current_dir appears unused in this file as shown — confirm
# before removing.
_current_dir = os.path.dirname(os.path.abspath(__file__))
# Shared tool handed to every specialist agent so it can return control to the
# RouterAgent. The docstring below is the LLM-facing tool description — do not
# reword it without intending to change routing behavior.
@function_tool
async def delegate_to_router_agent(context: RunContext, original_query: str = "User wants to talk about something else."):
    """
    Call this function to delegate the conversation back to the main RouterAgent.
    This is used when your current task is complete, or the user asks for functionality
    that you (the specialist agent) do not provide.
    Args:
        original_query: A brief description of why the delegation is happening or the user's last relevant query.
    """
    logger.info(f"Specialist Agent: Delegating back to RouterAgent. Reason/Query: '{original_query}'")
    # Try to access _chat_ctx via context.session, as context.agent was problematic
    if not hasattr(context, 'session') or context.session is None:
        logger.error("delegate_to_router_agent: RunContext does not have a valid 'session' attribute.")
        # This is a critical failure for context propagation.
        # Depending on desired behavior, could raise an error or attempt a recovery (though recovery is hard here).
        # For now, we'll let it fail if it tries to access _chat_ctx on a None session,
        # or re-raise a more specific error.
        raise AttributeError("RunContext is missing the session attribute, cannot retrieve ChatContext.")
    # Returning (new_agent, spoken_message) hands the session to RouterAgent,
    # carrying the current chat history along.
    return RouterAgent(chat_ctx=context.session._chat_ctx), "Okay, let me switch you back to the main assistant."
class RouterAgent(Agent):
    """Routes user queries to specialized agents."""
    # Every delegate_* tool below returns a (new_agent, spoken_handoff_message)
    # tuple; the framework switches the session to the returned agent. Each
    # specialist receives the shared chat context plus delegate_to_router_agent
    # so it can hand control back. Tool docstrings are LLM-facing descriptions —
    # keep their wording intact.
    def __init__(self, chat_ctx: ChatContext):
        super().__init__(
            instructions="""
            You are a router agent. Your primary responsibility is to understand the user's voice query
            and delegate it to the most appropriate specialist agent.
            - If the query is primarily about mathematics, calculations, arithmetic, or numbers,
              you MUST use the 'delegate_to_math_agent' tool.
            - For general knowledge questions, facts, explanations, requests to 'search the web', 'make a web search',
              or any other type of query not strictly mathematical, not about specific addresses/locations, and not covered by other specialists,
              you MUST use the 'delegate_to_perplexity_agent' tool.
            - If the query involves calendar events, scheduling, creating appointments, or asking about your schedule,
              you MUST use the 'delegate_to_calendar_agent' tool.
            - If the user explicitly asks to make a phone call,
              you MUST use the 'delegate_to_caller_agent' tool.
            - If the query is about finding contact information (like phone numbers or email addresses of people),
              you MUST use the 'delegate_to_contact_agent' tool.
            - For tasks related to managing emails (reading, sending, searching Gmail),
              you MUST use the 'delegate_to_gmail_agent' tool.
            - If the query is about locations, finding places, getting directions, looking up addresses, or anything map-related,
              you MUST use the 'delegate_to_go_agent' tool.
            - If the user wants to play music, control music playback, or anything related to Spotify,
              you MUST use the 'delegate_to_listen_agent' tool.
            Listen carefully to the user's query and make a clear decision.
            Do not attempt to answer the question yourself. Your sole job is to route.
            If uncertain, you can ask one clarifying question to determine the correct agent, but prefer to route directly if possible.
            """,
            allow_interruptions=True,
            chat_ctx=chat_ctx
        )
    async def on_enter(self):
        """Called when the RouterAgent starts. It will wait for user input."""
        logger.info("RouterAgent entered. Waiting for user query.")
        self.session.generate_reply()
    @function_tool
    async def delegate_to_math_agent(self, query: str):
        """
        Call this function to delegate a math-related query to the MathSpecialistAgent.
        Args:
            query: The user's original voice query that is mathematical in nature.
        """
        logger.info(f"RouterAgent: Delegating to MathSpecialistAgent for query: '{query}'")
        # Pass the delegate_to_router_agent tool to the CalculatorAgent
        math_agent = CalculatorAgent(
            chat_ctx=self.session._chat_ctx,
            tools=[delegate_to_router_agent] # Pass the tool
        )
        return math_agent, "Okay, I'll connect you with my math specialist for that."
    @function_tool
    async def delegate_to_perplexity_agent(self, query: str):
        """
        Call this function to delegate a query that needs to perform a web search to the Perplexity Agent.
        Args:
            query: The user's original voice query.
        """
        logger.info(f"RouterAgent: Delegating to AskAgent (for perplexity tasks) for query: '{query}'")
        try:
            perplexity_agent = AskAgent(
                chat_ctx=self.session._chat_ctx,
                tools=[delegate_to_router_agent] # Pass the tool
            )
            return perplexity_agent, "Alright, let me get my knowledge expert to help with that question."
        except AttributeError as e:
            # Surface construction failures instead of silently answering here.
            logger.error(f"Unexpected AttributeError: {e}")
            raise
    @function_tool
    async def delegate_to_calendar_agent(self, query: str):
        """
        Call this function to delegate a query about calendar events, scheduling, or appointments to the CalendarAgent.
        Args:
            query: The user's original voice query related to calendar.
        """
        logger.info(f"RouterAgent: Delegating to CalendarAgent for query: '{query}'")
        calendar_agent = CalendarAgent(
            chat_ctx=self.session._chat_ctx,
            tools=[delegate_to_router_agent] # Pass the tool
        )
        return calendar_agent, "Okay, let me check your calendar."
    @function_tool
    async def delegate_to_caller_agent(self, query: str):
        """
        Call this function to delegate a request to make a phone call to the CallerAgent.
        Args:
            query: The user's original voice query about making a call.
        """
        logger.info(f"RouterAgent: Delegating to CallerAgent for query: '{query}'")
        caller_agent = CallerAgent(
            chat_ctx=self.session._chat_ctx,
            tools=[delegate_to_router_agent] # Pass the tool
        )
        return caller_agent, "Sure, I can try to make that call for you."
    @function_tool
    async def delegate_to_contact_agent(self, query: str):
        """
        Call this function to delegate a query about finding or managing contact information to the ContactAgent.
        Args:
            query: The user's original voice query related to contacts.
        """
        logger.info(f"RouterAgent: Delegating to ContactAgent for query: '{query}'")
        contact_agent = ContactAgent(
            chat_ctx=self.session._chat_ctx,
            tools=[delegate_to_router_agent] # Pass the tool
        )
        return contact_agent, "Let me look up that contact information for you."
    @function_tool
    async def delegate_to_gmail_agent(self, query: str):
        """
        Call this function to delegate an email-related query (reading, sending, managing emails) to the GmailAgent.
        Args:
            query: The user's original voice query related to Gmail.
        """
        logger.info(f"RouterAgent: Delegating to GmailAgent for query: '{query}'")
        gmail_agent = GmailAgent(
            chat_ctx=self.session._chat_ctx,
            tools=[delegate_to_router_agent] # Pass the tool
        )
        return gmail_agent, "Okay, I'll check your emails."
    @function_tool
    async def delegate_to_go_agent(self, query: str):
        """
        Call this function to delegate a query about locations, directions, maps, or places to the GoAgent.
        Args:
            query: The user's original voice query related to maps or navigation.
        """
        logger.info(f"RouterAgent: Delegating to GoAgent for query: '{query}'")
        go_agent = GoAgent(
            chat_ctx=self.session._chat_ctx,
            tools=[delegate_to_router_agent] # Pass the tool
        )
        return go_agent, "Let me get my navigation expert for that."
    @function_tool
    async def delegate_to_listen_agent(self, query: str):
        """
        Call this function to delegate a request to play or control music (Spotify) to the ListenAgent.
        Args:
            query: The user's original voice query related to music or Spotify.
        """
        logger.info(f"RouterAgent: Delegating to ListenAgent for query: '{query}'")
        listen_agent = ListenAgent(
            chat_ctx=self.session._chat_ctx,
            tools=[delegate_to_router_agent] # Pass the tool
        )
        return listen_agent, "Okay, let's get some music playing."
async def entrypoint(ctx: JobContext):
    """Connect to LiveKit and start a session whose first agent is the RouterAgent."""
    await ctx.connect()
    logger.info("Router agent connected to LiveKit.")
    # Voice pipeline is owned by the session; specialist agents spawned later
    # inherit it. MCP servers are attached per-specialist, not here.
    session = AgentSession[None](
        turn_detection=MultilingualModel(),
        llm=openai.LLM(model="gpt-4o"),
        tts=openai.TTS(voice="alloy", model="tts-1-hd"),
        stt=openai.STT(model="gpt-4o-mini-transcribe", detect_language=True),
        vad=silero.VAD.load(),
    )
    logger.info("AgentSession configured. MCP servers will be managed by individual specialist agents.")
    router = RouterAgent(chat_ctx=session._chat_ctx)
    await session.start(agent=router, room=ctx.room)
    logger.info("RouterAgent session started.")
if __name__ == "__main__":
    # Setup basic logging if running directly
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    try:
        cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))
    except SystemExit: # Allow sys.exit() to pass through without logging as critical
        raise
    except Exception as e:
        # Last-resort handler: log anything the worker runner didn't catch.
        logger.critical(f"Unhandled exception at top level: {e}", exc_info=True)
        sys.exit(1) # Ensure exit with error code

View file

@ -0,0 +1,88 @@
import logging
import os
from dotenv import load_dotenv
from pathlib import Path
from livekit.agents import Agent, AgentSession, JobContext, WorkerOptions, cli, mcp, ChatContext
from livekit.plugins import deepgram, openai, silero
from livekit.plugins.turn_detector.multilingual import MultilingualModel
from dotenv import load_dotenv
from livekit.agents.llm import function_tool
from livekit import api, rtc
from livekit.agents import get_job_context
from livekit.agents import RunContext
# Add this function definition anywhere
async def hangup_call():
    """End the current call by deleting the LiveKit room for the active job.

    No-op when called outside a job context.
    """
    job_ctx = get_job_context()
    if job_ctx is None:
        # Not running in a job context
        return
    delete_request = api.DeleteRoomRequest(room=job_ctx.room.name)
    await job_ctx.api.room.delete_room(delete_request)
# NOTE(review): .env is loaded from two different candidate locations (two and
# one directory levels up) — values from the first load win for keys set in both.
load_dotenv(os.path.join(os.path.dirname(__file__), '..', '..', '.env'))
logger = logging.getLogger("mcp-agent")
load_dotenv(dotenv_path=Path(__file__).parent.parent / '.env')
class MyAgent(Agent):
    """Telephony voice agent exposing tools to hang up the active call.

    Tool docstrings are LLM-facing descriptions — do not reword them without
    intending to change when the model invokes each tool.
    """
    def __init__(self, chat_ctx: ChatContext) -> None:
        super().__init__(
            instructions=(
                "You can have phone calls. The interface is voice-based: "
                "accept spoken user queries and respond with synthesized speech."
            ),
            chat_ctx=chat_ctx
        )
    @function_tool
    async def end_call(self, ctx: RunContext):
        """Called when the user wants to end the call"""
        # let the agent finish speaking
        current_speech = ctx.session.current_speech
        if current_speech:
            await current_speech.wait_for_playout()
        await hangup_call()
    @function_tool
    async def end_call_finished_by_you(self, ctx: RunContext):
        """Called when you have accomplished your task and can end the call safely"""
        # NOTE(review): body duplicates end_call; only the tool name/description
        # differ (they give the LLM two distinct triggers for the same action).
        # let the agent finish speaking
        current_speech = ctx.session.current_speech
        if current_speech:
            await current_speech.wait_for_playout()
        await hangup_call()
    async def on_enter(self):
        # Open the conversation as soon as the agent joins the session.
        self.session.generate_reply()
async def entrypoint(ctx: JobContext):
    """Connect to the room, run MyAgent, and open with a proactive greeting."""
    await ctx.connect()
    # Voice pipeline: Silero VAD, multilingual Deepgram STT, GPT-4o-mini, OpenAI TTS.
    session = AgentSession(
        turn_detection=MultilingualModel(),
        tts=openai.TTS(voice="ash"),
        llm=openai.LLM(model="gpt-4o-mini"),
        stt=deepgram.STT(model="nova-3", language="multi"),
        vad=silero.VAD.load(),
    )
    telephony_agent = MyAgent(chat_ctx=session._chat_ctx)
    await session.start(agent=telephony_agent, room=ctx.room)
    await session.generate_reply(instructions="Greet the user and offer your assistance.")
if __name__ == "__main__":
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint,
                              agent_name="my-telephony-agent"))

View file

@ -0,0 +1,87 @@
import logging
import os
from dotenv import load_dotenv
from pathlib import Path
from livekit.agents import Agent, AgentSession, JobContext, WorkerOptions, cli, mcp, ChatContext
from livekit.plugins import deepgram, openai, silero
from livekit.plugins.turn_detector.multilingual import MultilingualModel
from dotenv import load_dotenv
from livekit import api
import json
# NOTE(review): .env is loaded from two different candidate locations (two and
# one directory levels up) — values from the first load win for keys set in both.
load_dotenv(os.path.join(os.path.dirname(__file__), '..', '..', '.env'))
logger = logging.getLogger("mcp-agent")
load_dotenv(dotenv_path=Path(__file__).parent.parent / '.env')
class MyAgent(Agent):
    """Minimal telephony voice agent used for inbound and outbound calls."""
    def __init__(self, chat_ctx: ChatContext) -> None:
        super().__init__(
            instructions=(
                "You can have phone calls. The interface is voice-based: "
                "accept spoken user queries and respond with synthesized speech."
            ),
            chat_ctx=chat_ctx
        )
    async def on_enter(self):
        # Open the conversation as soon as the agent joins the session.
        self.session.generate_reply()
async def entrypoint(ctx: JobContext):
    """Entrypoint for the telephony agent.

    If the job metadata carries a "phone_number", place an outbound SIP call
    and wait for it to be answered before starting the voice session; otherwise
    run as a regular (inbound/web) agent and greet the user proactively.
    """
    await ctx.connect()
    # If a phone number was provided, then place an outbound call
    # By having a condition like this, you can use the same agent for inbound/outbound telephony as well as web/mobile/etc.
    dial_info = json.loads(ctx.job.metadata)
    # BUGFIX: use .get() — jobs dispatched without a "phone_number" key previously
    # crashed with KeyError here, despite the `is not None` guard below clearly
    # intending the key to be optional.
    phone_number = dial_info.get("phone_number")
    # The participant's identity can be anything you want, but this example uses the phone number itself
    sip_participant_identity = phone_number
    if phone_number is not None:
        # The outbound call will be placed after this method is executed
        try:
            await ctx.api.sip.create_sip_participant(api.CreateSIPParticipantRequest(
                # This ensures the participant joins the correct room
                room_name=ctx.room.name,
                # This is the outbound trunk ID to use (i.e. which phone number the call will come from)
                # You can get this from LiveKit CLI with `lk sip outbound list`
                sip_trunk_id=os.environ.get("TWILIO_SIP_TRUNK_ID"),
                # The outbound phone number to dial and identity to use
                sip_call_to=phone_number,
                participant_identity=sip_participant_identity,
                # This will wait until the call is answered before returning
                wait_until_answered=True,
            ))
            print("call picked up successfully")
        except api.TwirpError as e:
            print(f"error creating SIP participant: {e.message}, "
                  f"SIP status: {e.metadata.get('sip_status_code')} "
                  f"{e.metadata.get('sip_status')}")
            ctx.shutdown()
            # BUGFIX: abort here — previously execution fell through and started
            # an AgentSession in a job that was already shutting down.
            return
    # Voice pipeline: Silero VAD, multilingual Deepgram STT, GPT-4o-mini, OpenAI TTS.
    session = AgentSession(
        vad=silero.VAD.load(),
        stt=deepgram.STT(model="nova-3", language="multi"),
        llm=openai.LLM(model="gpt-4o-mini"),
        tts=openai.TTS(voice="ash"),
        turn_detection=MultilingualModel(),
    )
    await session.start(agent=MyAgent(chat_ctx=session._chat_ctx), room=ctx.room)
    # For inbound/web sessions (no dialed number) the agent speaks first;
    # for outbound calls the callee's answer drives the first turn.
    if phone_number is None:
        await session.generate_reply(
            instructions="Greet the user and offer your assistance."
        )
if __name__ == "__main__":
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint,
                              agent_name="my-telephony-agent"))