Skip to content

Instantly share code, notes, and snippets.

@jvelezmagic
Last active September 10, 2024 19:40
Show Gist options
  • Save jvelezmagic/f3653cc2ddab1c91e86751c8b423a1b6 to your computer and use it in GitHub Desktop.
Save jvelezmagic/f3653cc2ddab1c91e86751c8b423a1b6 to your computer and use it in GitHub Desktop.
QA Chatbot streaming with source documents example using FastAPI, LangChain Expression Language, OpenAI, and Chroma.
"""QA Chatbot streaming using FastAPI, LangChain Expression Language , OpenAI, and Chroma.
Features
--------
- Persistent Chat Memory:
Stores chat history in a local file.
- Persistent Vector Store:
Stores document embeddings in a local vector store.
- Standalone Question Generation:
Rephrases follow-up questions to standalone questions in their original language.
- Document Retrieval:
Searches and retrieves relevant documents based on user queries.
- Context-Aware Responses:
Generates responses based on a combined context from relevant documents.
- Streaming Responses:
Streams responses in real time either as plain text or as Server-Sent Events (SSE).
SSE also sends the relevant documents as context.
Next Steps
----------
- Add a proper exception handling mechanism during the streaming process.
- Add pruning to the conversation buffer memory to prevent it from growing too large.
- Combine documents using a more sophisticated method than simply concatenating them.
Usage
-----
1. Install dependencies:
```bash
pip install fastapi==0.99.1 uvicorn==0.23.2 python-dotenv==1.0.0 chromadb==0.4.5 tiktoken==0.4.0 langchain==0.0.257 openai==0.27.8
```
or
```bash
poetry install
```
2. Run the server:
```bash
uvicorn main:app --reload
```
3. curl the server:
With plain text:
```bash
curl --no-buffer -X 'POST' \
'http://localhost:8000/chat' \
-H 'accept: text/plain' \
-H 'Content-Type: application/json' \
-d '{
"session_id": "session_1",
"message": "who'\''s playing in the river?"
}'
```
With SSE:
```bash
curl --no-buffer -X 'POST' \
'http://localhost:8000/chat/sse/' \
-H 'accept: text/event-stream' \
-H 'Content-Type: application/json' \
-d '{
"session_id": "session_2",
"message": "who'\''s playing in the garden?"
}'
```
Cheers!
@jvelezmagic"""
import os
from functools import lru_cache
from typing import AsyncGenerator, Literal
from fastapi import Depends, FastAPI
from fastapi.responses import StreamingResponse
from langchain.chat_models import ChatOpenAI
from langchain.embeddings import OpenAIEmbeddings
from langchain.memory import ConversationBufferMemory, FileChatMessageHistory
from langchain.prompts import PromptTemplate
from langchain.schema import BaseChatMessageHistory, Document, format_document
from langchain.schema.output_parser import StrOutputParser
from langchain.vectorstores import Chroma
from pydantic import BaseModel, BaseSettings
class Settings(BaseSettings):
    """Application settings model built on pydantic ``BaseSettings``.

    The nested ``Config`` points the loader at a ``.env`` file.
    """

    # API key passed to the OpenAI embedding and chat clients in this module.
    openai_api_key: str

    class Config:  # type: ignore
        # Name and encoding of the dotenv file to read.
        env_file = ".env"
        env_file_encoding = "utf-8"
class ChatRequest(BaseModel):
    """Request body accepted by the ``/chat`` and ``/chat/sse/`` endpoints."""

    # Conversation identifier; the endpoints use it to name the history file.
    session_id: str
    # The user's message for this turn.
    message: str
class ChatSSEResponse(BaseModel):
    """One event emitted by the SSE chat stream."""

    # Event kind: retrieved context, stream start, a streamed chunk, stream
    # end, or an error report.
    type: Literal["context", "start", "streaming", "end", "error"]
    # Text payload, or the list of documents for a "context" event.
    value: str | list[Document]
@lru_cache()
def get_settings() -> Settings:
    """Return the application settings.

    The result is memoized by ``lru_cache``, so ``Settings`` is instantiated
    once and the same object is returned on later calls.
    """
    settings = Settings()  # type: ignore
    return settings
@lru_cache()
def get_vectorstore() -> Chroma:
    """Return the Chroma vector store used for document retrieval.

    The store uses OpenAI embeddings, the collection name ``"chroma"`` and the
    persist directory ``"chroma"``. The result is memoized by ``lru_cache``,
    so repeated calls return the same instance.
    """
    api_key = get_settings().openai_api_key
    embedder = OpenAIEmbeddings(openai_api_key=api_key)  # type: ignore
    return Chroma(
        collection_name="chroma",
        embedding_function=embedder,
        persist_directory="chroma",
    )
def combine_documents(
    docs: list[Document],
    document_prompt: PromptTemplate = PromptTemplate.from_template("{page_content}"),
    document_separator: str = "\n\n",
) -> str:
    """Render each document with ``document_prompt`` and join the results.

    Args:
        docs: Documents to combine.
        document_prompt: Template applied to every document; the default
            renders only ``page_content``.
        document_separator: String placed between rendered documents.

    Returns:
        The rendered documents joined by ``document_separator``.
    """
    rendered = (format_document(item, document_prompt) for item in docs)
    return document_separator.join(rendered)
# FastAPI application object; the startup hook and the chat routes defined
# below register themselves on it via decorators.
app = FastAPI(
title="QA Chatbot Streaming using FastAPI, LangChain Expression Language , OpenAI, and Chroma",
version="0.1.0",
)
@app.on_event("startup")
async def startup_event() -> None:
    """Prepare the vector store and the chat-history directory at startup.

    Seeds the Chroma collection with three sample sentences when it is empty,
    then makes sure the ``message_store`` directory exists.
    """
    vectorstore = get_vectorstore()
    # Chroma exposes no public count here, so the private collection is used.
    is_collection_empty: bool = vectorstore._collection.count() == 0  # type: ignore
    if is_collection_empty:
        vectorstore.add_texts(  # type: ignore
            texts=[
                "Cats are playing in the garden.",
                "Dogs are playing in the river.",
                "Dogs and cats are mortal enemies, but they often play together.",
            ]
        )
    # A separate exists() check followed by mkdir() races when several worker
    # processes start at once; exist_ok=True makes creation idempotent.
    os.makedirs("message_store", exist_ok=True)
async def generate_standalone_question(
    chat_history: str, question: str, settings: Settings
) -> str:
    """Ask the chat model to rewrite a follow-up as a standalone question.

    Args:
        chat_history: Prior conversation rendered as text.
        question: The follow-up input to rephrase.
        settings: Supplies the OpenAI API key.

    Returns:
        The model output parsed to a plain string.
    """
    rephrase_template = """Given the following conversation and a follow up question, rephrase the follow up question to be a standalone question, in its original language.
Chat History:
{chat_history}
Follow Up Input: {question}
Standalone question:"""
    rephrase_prompt = PromptTemplate.from_template(template=rephrase_template)
    chat_model = ChatOpenAI(temperature=0, openai_api_key=settings.openai_api_key)
    pipeline = rephrase_prompt | chat_model | StrOutputParser()  # type: ignore
    inputs = {"chat_history": chat_history, "question": question}
    return await pipeline.ainvoke(inputs)  # type: ignore
async def search_relevant_documents(query: str, k: int = 5) -> list[Document]:
    """Retrieve the documents most relevant to ``query`` from the vector store.

    Args:
        query: Text to search for.
        k: Maximum number of documents to return.

    Returns:
        The documents returned by the vector store retriever.
    """
    vectorstore = get_vectorstore()
    # The result count must be configured on the retriever itself. Passing
    # ``k`` as an extra keyword to ``aget_relevant_documents`` does not reach
    # the underlying similarity search, so the retriever's default was used.
    # NOTE(review): behaviour per LangChain's retriever API - confirm against
    # the pinned langchain version.
    retriever = vectorstore.as_retriever(search_kwargs={"k": k})
    return await retriever.aget_relevant_documents(query=query)
async def generate_response(
    context: str, chat_memory: BaseChatMessageHistory, message: str, settings: Settings
) -> AsyncGenerator[str, None]:
    """Stream the model's answer to ``message`` as plain-text chunks.

    Args:
        context: Text the answer must be based on.
        chat_memory: History that receives the user message and the full
            answer once streaming has finished.
        message: The user's question.
        settings: Supplies the OpenAI API key.

    Yields:
        The content of each streamed chunk, in order.
    """
    answer_prompt = PromptTemplate.from_template(
        """Answer the question based only on the following context:
{context}
Question: {question}"""
    )
    chat_model = ChatOpenAI(temperature=0, openai_api_key=settings.openai_api_key)
    pipeline = answer_prompt | chat_model  # type: ignore

    full_answer = ""
    prompt_inputs = {"context": context, "question": message}
    async for chunk in pipeline.astream(prompt_inputs):  # type: ignore
        yield chunk.content
        full_answer += chunk.content

    # Persist the turn only after the whole answer has been streamed.
    chat_memory.add_user_message(message=message)
    chat_memory.add_ai_message(message=full_answer)
async def generate_sse_response(
    context: list[Document],
    chat_memory: BaseChatMessageHistory,
    message: str,
    settings: Settings,
) -> AsyncGenerator[str, None]:
    """Stream the model's answer to ``message`` as Server-Sent Events.

    Event order: one ``context`` event carrying the retrieved documents, one
    ``start`` event, one ``streaming`` event per chunk, then ``end``. If
    anything raises after ``context`` was sent, a single ``error`` event with
    the exception text is emitted instead and the chat history is not updated.

    Args:
        context: Retrieved documents; sent to the client and used as the
            prompt context.
        chat_memory: History that receives the user message and the full
            answer after a successful stream.
        message: The user's question.
        settings: Supplies the OpenAI API key.

    Yields:
        SSE frames of the form ``data: <ChatSSEResponse JSON>`` followed by a
        blank line.
    """

    def as_event(kind: str, value: "str | list[Document]") -> str:
        # The event-stream format requires a "data:" field terminated by a
        # blank line; bare JSON is not dispatched by SSE clients.
        payload = ChatSSEResponse(type=kind, value=value).json()  # type: ignore
        return f"data: {payload}\n\n"

    prompt = PromptTemplate.from_template(
        """Answer the question based only on the following context:
{context}
Question: {question}"""
    )
    llm = ChatOpenAI(temperature=0, openai_api_key=settings.openai_api_key)
    chain = prompt | llm  # type: ignore
    response = ""

    yield as_event("context", context)
    try:
        yield as_event("start", "")
        # Feed the prompt the documents' text (as the /chat endpoint does)
        # rather than the repr of a list of Document objects.
        context_text = combine_documents(context)
        async for token in chain.astream(  # type: ignore
            {"context": context_text, "question": message}
        ):
            yield as_event("streaming", token.content)
            response += token.content
        yield as_event("end", "")
        chat_memory.add_user_message(message=message)
        chat_memory.add_ai_message(message=response)
    except Exception as e:  # TODO: Add proper exception handling
        yield as_event("error", str(e))
@app.post("/chat")
async def chat(
    request: ChatRequest, settings: Settings = Depends(get_settings)
) -> StreamingResponse:
    """Answer a chat message, streaming the reply as plain text.

    Loads the session's file-backed history, rephrases the message into a
    standalone question, retrieves relevant documents, and streams an answer
    based on their combined text.

    Raises:
        HTTPException: 400 when ``session_id`` is empty or contains a path
            separator or NUL character.
    """
    # Imported locally so this handler does not depend on a change to the
    # module's import block.
    from fastapi import HTTPException

    # session_id comes straight from the request body and is interpolated
    # into a file path; reject values that could escape ./message_store/.
    session_id = request.session_id
    if not session_id or any(ch in session_id for ch in ("/", "\\", "\x00")):
        raise HTTPException(status_code=400, detail="Invalid session_id")

    memory_key = f"./message_store/{session_id}.json"
    chat_memory = FileChatMessageHistory(file_path=memory_key)
    memory = ConversationBufferMemory(chat_memory=chat_memory, return_messages=False)
    standalone_question = await generate_standalone_question(
        chat_history=memory.buffer, question=request.message, settings=settings
    )
    relevant_documents = await search_relevant_documents(query=standalone_question)
    combined_documents = combine_documents(relevant_documents)
    return StreamingResponse(
        generate_response(
            context=combined_documents,
            chat_memory=chat_memory,
            message=request.message,
            settings=settings,
        ),
        media_type="text/plain",
    )
@app.post("/chat/sse/")
async def chat_sse(
    request: ChatRequest, settings: Settings = Depends(get_settings)
) -> StreamingResponse:
    """Answer a chat message, streaming the reply as ``text/event-stream``.

    Loads the session's file-backed history, rephrases the message into a
    standalone question, retrieves relevant documents (requesting ``k=2``),
    and streams events produced by ``generate_sse_response``.

    Raises:
        HTTPException: 400 when ``session_id`` is empty or contains a path
            separator or NUL character.
    """
    # Imported locally so this handler does not depend on a change to the
    # module's import block.
    from fastapi import HTTPException

    # session_id comes straight from the request body and is interpolated
    # into a file path; reject values that could escape ./message_store/.
    session_id = request.session_id
    if not session_id or any(ch in session_id for ch in ("/", "\\", "\x00")):
        raise HTTPException(status_code=400, detail="Invalid session_id")

    memory_key = f"./message_store/{session_id}.json"
    chat_memory = FileChatMessageHistory(file_path=memory_key)
    memory = ConversationBufferMemory(chat_memory=chat_memory, return_messages=False)
    standalone_question = await generate_standalone_question(
        chat_history=memory.buffer, question=request.message, settings=settings
    )
    relevant_documents = await search_relevant_documents(query=standalone_question, k=2)
    return StreamingResponse(
        generate_sse_response(
            context=relevant_documents,
            chat_memory=chat_memory,
            message=request.message,
            settings=settings,
        ),
        media_type="text/event-stream",
    )
[tool.poetry]
name = "langchain-language-expression-streaming-fastapi"
version = "0.1.0"
description = ""
authors = ["Jesús Vélez Santiago"]
packages = [{include = "app"}]
[tool.poetry.dependencies]
python = "^3.10"
langchain = "^0.0.257"
openai = "^0.27.8"
fastapi = "0.99.1"
uvicorn = "^0.23.2"
python-dotenv = "^1.0.0"
chromadb = "^0.4.5"
tiktoken = "^0.4.0"
[tool.poetry.group.dev.dependencies]
black = "^23.7.0"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
@Princekrampah
Copy link

Nice! Thanks a lot.

@krannnn
Copy link

krannnn commented Aug 9, 2023

@jvelezmagic Many thanks for this!
I'm facing a problem on the "/chat/sse" endpoint.
Postman is not giving any data back, the connection is established, but closed in a few sec (see attached screenshot).
BTW the "/chat " endpoints works on postman.

Appreciate your prompt response.
UPDATE: Would be great if you can help with how to stream the results from fastAPI endpoint to python or javascript to get the full content. Cheers!

@hugoppp
Copy link

hugoppp commented Aug 9, 2023

If anybody is getting the response as one big chunk on an nginx server but everything streams when running locally, you might try to add the X-Accel-Buffering: no header to the StreamingResponse

return StreamingResponse(
   generate_sse_response(...),
   media_type="text/event-stream",
   headers={"X-Accel-Buffering": "no"}
)

@ohmeow
Copy link

ohmeow commented Aug 9, 2023

@jvelezmagic Many thanks for this! I'm facing a problem on the "/chat/sse" endpoint. Postman is not giving any data back, the connection is established, but closed in a few sec (see attached screenshot). BTW the "/chat " endpoints works on postman ... Appreciate your prompt response. UPDATE: Would be great if you can help with how to stream the results from fastAPI endpoint to python or javascript to get the full content. Cheers!

Postman doesn't stream the results back ... I recommend using curl. For example:

curl -N -X POST -H "Accept: text/event-stream" -H "Content-Type: application/json" -d '{"conversation_id": "123", "message": "What are the cats doing?" }' http://localhost:8000/chat/sse

@Princekrampah
Copy link

I tried to use this with a ConversationChain, but it would not work. I simply added a ConversationChain in generate_sse_response. Any suggestions?

@Princekrampah
Copy link

Just found out how to do this.

from typing import AsyncGenerator, Literal
from pydantic import BaseModel

from langchain.chat_models import ChatOpenAI
from langchain.prompts import PromptTemplate
from langchain.schema import BaseChatMessageHistory, Document
from langchain.schema.output_parser import StrOutputParser
from langchain.memory import ConversationBufferMemory
from langchain.schema.runnable import RunnableMap
from langchain.prompts import ChatPromptTemplate
from langchain.chains import OpenAIModerationChain


from config import Settings


class ChatRequest(BaseModel):
    """Request body for a chat call."""

    # Identifier of the conversation session.
    session_id: str
    # The user's message for this turn.
    message: str


class ChatSSEResponse(BaseModel):
    """One event emitted by the SSE chat stream."""

    # Event kind; this snippet's generator emits "start", "streaming", "end"
    # and "error".
    type: Literal["context", "start", "streaming", "end", "error"]
    # Text payload, or a list of documents.
    value: str | list[Document]


async def generate_standalone_question(
        chat_history: str,
        question: str,
        settings: Settings) -> str:
    """
    Rephrase a follow-up question into a standalone question using the
    previous chat history.

    :param chat_history: History of chats, rendered as text
    :param question: Follow-up question the user wants answered
    :param settings: Settings model providing the OpenAI API key
    :return: The standalone question as a string
    """

    # The template's variables are taken from the placeholders in the string
    # itself; the former ``input=[]`` keyword was not a template parameter
    # and has been dropped.
    prompt = PromptTemplate.from_template(
        template="""Given the following conversation and a follow up question, rephrase the follow up question to be a standalone question, in its original language.
Chat History:
{chat_history}
Follow Up Input: {question}
Standalone question:"""
    )

    llm = ChatOpenAI(temperature=0, openai_api_key=settings.openai_api_key)

    chain = prompt | llm | StrOutputParser()  # type: ignore

    return await chain.ainvoke(  # type: ignore
        {
            "chat_history": chat_history,
            "question": question,
        }
    )


async def generate_sse_response(
        memory: ConversationBufferMemory,
        chat_memory: BaseChatMessageHistory,
        message: str,
        settings: Settings,
        prompt_template: ChatPromptTemplate,
        moderator_model: OpenAIModerationChain) -> AsyncGenerator[str, None]:
    """
    Generate the chat response as a stream of JSON-serialised
    ``ChatSSEResponse`` events: ``start``, one ``streaming`` event per chunk,
    then ``end``. On any exception a single ``error`` event carrying the
    exception text is emitted and the chat history is not updated.

    TODO: Generate good exception handling
    TODO: Adding the moderator to add a safeguard against violations, hate speech and violent content

    :param memory: Conversation memory; its ``history`` variable is fed to the prompt
    :param chat_memory: Message history that receives the user message and the full answer
    :param message: The user's message, passed to the chain as ``input``
    :param settings: Settings model providing the OpenAI API key
    :param prompt_template: The chat prompt template fed to the model, with system and human messages
    :param moderator_model: Moderation chain; currently unused (see TODO above)
    :return: Async generator yielding the JSON string of each event
    """

    llm = ChatOpenAI(temperature=0, openai_api_key=settings.openai_api_key)

    # first get the memory then expand the memory
    # https://python.langchain.com/docs/guides/expression_language/cookbook
    chain = RunnableMap({
        "input": lambda x: x["input"],
        "memory": memory.load_memory_variables
    }) | {
        "input": lambda x: x["input"],
        "history": lambda x: x["memory"]["history"]
    } | prompt_template | llm

    response = ""

    try:
        yield ChatSSEResponse(type="start", value="").json()
        async for token in chain.astream({"input": message}):  # type: ignore
            yield ChatSSEResponse(type="streaming", value=token.content).json()
            response += token.content

        yield ChatSSEResponse(type="end", value="").json()
        # Persist the turn only after the whole answer has been streamed.
        chat_memory.add_user_message(message=message)
        chat_memory.add_ai_message(message=response)

    except Exception as e:  # TODO: Add proper exception handling
        yield ChatSSEResponse(type="error", value=str(e)).json()

@pourmand1376
Copy link

If you get TypeError: unsupported operand type(s) for |: 'type' and 'types.GenericAlias' in line value: str | list[Document], you should replace it with value: Union[str, list[Document]].

@pourmand1376
Copy link

pourmand1376 commented Aug 15, 2023

Hi, if you want an MWE (Minimal Working Example), this is what I've extracted from the script.

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain.chat_models import ChatOpenAI
from langchain.prompts import PromptTemplate
from pydantic import BaseModel

from typing import AsyncGenerator


class ChatRequest(BaseModel):
    """Request body for the ``/chat`` endpoint."""

    # Identifier of the conversation session.
    session_id: str
    # The user's message; forwarded to the model as the question.
    message: str


# FastAPI application object; the /chat route below registers on it.
app = FastAPI(
    title="QA Chatbot Streaming using FastAPI, LangChain Expression Language , OpenAI, and Chroma",
    version="0.1.0",
)


async def generate_response(
    context: str,
    message: str,
) -> AsyncGenerator[str, None]:
    """Stream the model's answer to ``message`` as plain-text chunks.

    Args:
        context: Text the answer must be based on.
        message: The user's question.

    Yields:
        The content of each streamed chunk, in order.
    """
    prompt = PromptTemplate.from_template(
        """Answer the question based only on the following context:
{context}
Question: {question}"""
    )

    llm = ChatOpenAI(
        temperature=0,
        # Placeholder: YOUR_OPENAI_KEY is not defined in this snippet and
        # must be supplied by whoever uses it.
        openai_api_key=YOUR_OPENAI_KEY,
    )

    chain = prompt | llm  # type: ignore

    # Nothing consumes the full answer here, so chunks are yielded without
    # accumulating them.
    async for token in chain.astream({"context": context, "question": message}):  # type: ignore
        yield token.content


@app.post("/chat")
async def chat(
    request: ChatRequest,
) -> StreamingResponse:
    """Stream the model's answer to the request message as plain text.

    The answer is generated with an empty context string.
    """
    token_stream = generate_response(context="", message=request.message)
    return StreamingResponse(token_stream, media_type="text/plain")

@sowsan
Copy link

sowsan commented Dec 15, 2023

Great content! I am struggling to make this work with AgentExecutor. The output never comes back as streamed tokens. Any ideas on how to make the LCEL implementation of AgentExecutor stream?

agent_executor = AgentExecutor(agent=agent_chain, memory=memory, verbose=True, tools=tool_list,return_intermediate_steps=False)

async for token in agent_executor .astream({"input ": "Hello"}):
yield token.content
response += token.content

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment