Skip to content

Chat app

Simple chat app example built with FastAPI.

Demonstrates:

  • reusing chat history
  • serializing messages

This demonstrates storing chat history between requests and using it to give the model context for new responses.

Most of the complex logic here is in chat_app.html which includes the page layout and JavaScript to handle the chat.

Running the Example

With dependencies installed and environment variables set, run:

python -m pydantic_ai_examples.chat_app
uv run -m pydantic_ai_examples.chat_app

Then open the app at localhost:8000.

TODO screenshot.

Example Code

chat_app.py
from collections.abc import Iterator
from dataclasses import dataclass
from pathlib import Path
from typing import Annotated

import fastapi
import logfire
from fastapi.responses import HTMLResponse, Response, StreamingResponse
from pydantic import Field, TypeAdapter

from pydantic_ai import Agent
from pydantic_ai.messages import Message, MessagesTypeAdapter, UserPrompt

# 'if-token-present' means nothing will be sent (and the example will work) if you don't have logfire configured
logfire.configure(send_to_logfire='if-token-present')

# The agent that answers chat prompts; the model is hard-coded to OpenAI's gpt-4o.
agent = Agent('openai:gpt-4o')

app = fastapi.FastAPI()
# Instrument the app so each request is traced in Logfire (no-op without a token, per above).
logfire.instrument_fastapi(app)


@app.get('/')
async def index() -> HTMLResponse:
    """Serve the single-page chat UI (`chat_app.html`, located next to this module)."""
    html_path = THIS_DIR / 'chat_app.html'
    return HTMLResponse(content=html_path.read_bytes())


@app.get('/chat/')
async def get_chat() -> Response:
    """Return the stored chat history as newline-delimited JSON, one message per line."""
    history = database.get_messages()
    payload = b'\n'.join(MessageTypeAdapter.dump_json(message) for message in history)
    return Response(payload, media_type='text/plain')


@app.post('/chat/')
async def post_chat(prompt: Annotated[str, fastapi.Form()]) -> StreamingResponse:
    """Run the agent on the submitted `prompt` and stream the messages back."""

    async def stream_messages():
        """Stream newline-delimited JSON `Message`s to the client."""
        # echo the user prompt first so the UI can render it straight away
        user_message = UserPrompt(content=prompt)
        yield MessageTypeAdapter.dump_json(user_message) + b'\n'
        # give the agent the stored history as context for this run
        history = list(database.get_messages())
        result = await agent.run(prompt, message_history=history)
        # persist the new messages (the user prompt and the agent response)
        database.add_messages(result.new_messages_json())
        # only the final message (the agent response) still needs streaming —
        # the user prompt was already sent above, so we can't just yield
        # `new_messages_json()` wholesale
        yield MessageTypeAdapter.dump_json(result.all_messages()[-1]) + b'\n'

    return StreamingResponse(stream_messages(), media_type='text/plain')


# Directory containing this module (and `chat_app.html`).
THIS_DIR = Path(__file__).parent
# Adapter for (de)serializing a single `Message`, discriminated by its `role` field.
MessageTypeAdapter: TypeAdapter[Message] = TypeAdapter(
    Annotated[Message, Field(discriminator='role')]
)


@dataclass
class Database:
    """Very rudimentary database to store chat messages in a JSON lines file."""

    # Path of the JSONL file; defaults to a hidden file next to this module.
    file: Path = THIS_DIR / '.chat_app_messages.jsonl'

    def add_messages(self, messages: bytes):
        """Append one JSON-encoded batch of messages as a new line."""
        with self.file.open('ab') as f:
            f.write(messages)
            f.write(b'\n')

    def get_messages(self) -> Iterator[Message]:
        """Yield every stored message, oldest first; yields nothing if no file exists yet."""
        if not self.file.exists():
            return
        with self.file.open('rb') as f:
            for raw_line in f:
                if not raw_line:
                    continue
                yield from MessagesTypeAdapter.validate_json(raw_line)


# Module-level singleton used by the route handlers above.
database = Database()


if __name__ == '__main__':
    import uvicorn

    # Serve the app with auto-reload, watching this examples directory for changes.
    uvicorn.run(
        'pydantic_ai_examples.chat_app:app', reload=True, reload_dirs=[str(THIS_DIR)]
    )
chat_app.html
<!DOCTYPE html>
<html lang="en">
<head>
  <meta charset="UTF-8">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <title>Chat App</title>
  <!-- Bootstrap provides the layout/utility classes used below -->
  <link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.3/dist/css/bootstrap.min.css" rel="stylesheet">
  <style>
    main {
      max-width: 700px;
    }
    /* prefix each user message with a bold label */
    #conversation .user::before {
      content: 'You asked: ';
      font-weight: bold;
      display: block;
    }
    /* prefix each model response with a bold label */
    #conversation .llm-response::before {
      content: 'AI Response: ';
      font-weight: bold;
      display: block;
    }
    /* loading spinner — invisible until the `active` class is added by the script */
    #spinner {
      opacity: 0;
      transition: opacity 500ms ease-in;
      width: 30px;
      height: 30px;
      border: 3px solid #222;
      border-bottom-color: transparent;
      border-radius: 50%;
      animation: rotation 1s linear infinite;
    }
    @keyframes rotation {
      0% { transform: rotate(0deg); }
      100% { transform: rotate(360deg); }
    }
    #spinner.active {
      opacity: 1;
    }
  </style>
</head>
<body>
  <main class="border rounded mx-auto my-5 p-4">
    <h1>Chat App</h1>
    <p>Ask me anything...</p>
    <!-- chat messages are appended here by the script -->
    <div id="conversation" class="px-2"></div>
    <div class="d-flex justify-content-center mb-3">
      <div id="spinner"></div>
    </div>
    <!-- submission is intercepted by the script; `prompt` is POSTed to /chat/ -->
    <form method="post">
      <input id="prompt-input" name="prompt" class="form-control"/>
      <div class="d-flex justify-content-end">
        <button class="btn btn-primary mt-2">Send</button>
      </div>
    </form>
    <!-- hidden until a request fails (see onError in the script) -->
    <div id="error" class="d-none text-danger">
      Error occurred, check the console for more information.
    </div>
  </main>
</body>
</html>
<script type="module">
  import { marked } from 'https://cdn.jsdelivr.net/npm/marked/lib/marked.esm.js';

  // Parse each non-empty JSON line into a message and render it into #conversation.
  function addMessages(lines) {
    const conversation = document.getElementById('conversation');
    lines
      .filter((line) => line.length > 1)
      .map((line) => JSON.parse(line))
      .forEach((message) => {
        const messageEl = document.createElement('div');
        // `message.role` doubles as a CSS class ('user' / 'llm-response')
        messageEl.classList.add('border-top', 'pt-2', message.role);
        messageEl.innerHTML = marked.parse(message.content);
        conversation.appendChild(messageEl);
      });
  }

  // Surface a failure: log it, reveal the error banner, and stop the spinner.
  function onError(error) {
    console.error(error);
    const errorEl = document.getElementById('error');
    errorEl.classList.remove('d-none');
    const spinnerEl = document.getElementById('spinner');
    spinnerEl.classList.remove('active');
  }

  // Read a newline-delimited-JSON streaming response, rendering complete lines
  // as they arrive, then re-enable the prompt input. Throws on a non-2xx status.
  async function fetchResponse(response) {
    if (!response.ok) {
      const text = await response.text();
      console.error(`Unexpected response: ${response.status}`, {response, text});
      throw new Error(`Unexpected response: ${response.status}`);
    }
    // Reuse ONE decoder with `stream: true`: a fresh `decode(value)` per chunk
    // would corrupt any multi-byte UTF-8 character split across chunk boundaries.
    const decoder = new TextDecoder();
    const reader = response.body.getReader();
    let text = '';
    while (true) {
      const {done, value} = await reader.read();
      if (done) {
        break;
      }
      text += decoder.decode(value, {stream: true});
      const lines = text.split('\n');
      if (lines.length > 1) {
        // render every complete line; keep the trailing partial line buffered
        addMessages(lines.slice(0, -1));
        text = lines[lines.length - 1];
      }
    }
    // flush any bytes still buffered in the decoder, then render the remainder
    text += decoder.decode();
    addMessages(text.split('\n'));
    let input = document.getElementById('prompt-input')
    input.disabled = false;
    input.focus();
  }

  // Handle a form submit: show the spinner, lock the input, POST the prompt,
  // and stream the reply into the conversation.
  async function onSubmit(e) {
    e.preventDefault();
    document.getElementById('spinner').classList.add('active');
    // capture the form data BEFORE clearing the input
    const formData = new FormData(e.target);

    const promptInput = document.getElementById('prompt-input');
    promptInput.value = '';
    promptInput.disabled = true;

    const response = await fetch('/chat/', {method: 'POST', body: formData});
    await fetchResponse(response);
    document.getElementById('spinner').classList.remove('active');
  }

  // call onSubmit when the form is submitted (e.g. user clicks the send button or hits Enter);
  // errors (including from the streamed response) are routed to onError
  document.querySelector('form').addEventListener('submit', (e) => onSubmit(e).catch(onError));

  // load existing messages on page load so history survives a refresh
  fetch('/chat/').then(fetchResponse).catch(onError);
</script>