Skip to content

Instantly share code, notes, and snippets.

@kwindla
Created August 4, 2024 17:14
Show Gist options
  • Save kwindla/738ab60233588ad112560e38cd6afe96 to your computer and use it in GitHub Desktop.
Save kwindla/738ab60233588ad112560e38cd6afe96 to your computer and use it in GitHub Desktop.
RTVI simple bot example, 2024-08-04
import aiohttp
import asyncio
import os
import sys
from pipecat.frames.frames import EndFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.pipeline.runner import PipelineRunner
from pipecat.processors.frameworks.rtvi import (
RTVIConfig,
RTVIProcessor,
RTVIServiceConfig,
RTVIServiceOptionConfig)
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer, VADParams
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
# logger.add(sys.stderr, level="TRACE")
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Realtime AI",
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(
stop_secs=float(os.getenv("VAD_STOP_SECS", "0.3"))
)),
))
config = RTVIConfig(
config=[
RTVIServiceConfig(
service="llm",
options=[
RTVIServiceOptionConfig(
name="model",
value="gpt-4o"),
RTVIServiceOptionConfig(
name="messages",
value=[{
"role": "system",
"content": "You are a helpful assistant named Socrates. Briefly say hello!"
}]),
])])
rtai = RTVIProcessor(transport=transport)
rtai.setup_on_start(config=None, ctor_args={
"llm": {
"name": "LLM",
"api_key": os.getenv("OPENAI_API_KEY"),
},
"tts": {
"name": "TTS",
"api_key": os.getenv("CARTESIA_API_KEY"),
"voice_id": "79a125e8-cd45-4c13-8a67-188112f4dd22",
}
})
runner = PipelineRunner()
pipeline = Pipeline([transport.input(), rtai])
task = PipelineTask(
pipeline,
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
send_initial_empty_metrics=False,
report_only_initial_ttfb=False))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await rtai.update_config(config)
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
logger.info("Partcipant left. Exiting.")
await task.queue_frame(EndFrame())
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment