
Building an Async Product Management API with FastAPI, Pydantic, and PostgreSQL

Learn how to create an asynchronous API for managing products using FastAPI, Pydantic for data validation, and PostgreSQL with connection pooling.

In this guide, you'll build an asynchronous product management API with FastAPI. You'll combine FastAPI's async capabilities with an asyncpg connection pool so that database connections are reused across requests instead of being opened for each one, which helps the API stay responsive under concurrent load.

Prerequisites

Before starting, ensure you have the following tools and services ready:

  • pip: Required for installing and managing Python packages, including uv for creating virtual environments. You can check whether pip is installed by running the following command:
    pip --version
  • Neon serverless Postgres: You will need a Neon account for provisioning and scaling your PostgreSQL database. If you don't have an account yet, sign up for one before continuing.

Setting up the Project

Follow these steps to set up your project and virtual environment:

  1. Create a uv project

    If you don't already have uv installed, you can install it with:

    pip install uv

    Once uv is installed, create a new project:

    uv init async_postgres

    This will create a new project directory called async_postgres. Open this directory in the code editor of your choice.

  2. Set Up the Virtual Environment

    You will now create and activate a virtual environment in which your project's dependencies will be installed.

    Linux/macOS:

    uv venv
    source .venv/bin/activate

    Windows:

    uv venv
    .venv\Scripts\activate

    You should now see (async_postgres) in your terminal prompt, which means that your virtual environment is activated.

  3. Install Dependencies

    Next, add all the necessary dependencies for your project:

    uv add python-dotenv asyncpg loguru fastapi uvicorn requests

    Each package does the following:

    • FastAPI: A web / API framework (Pydantic is installed along with it as a dependency)
    • AsyncPG: An asynchronous PostgreSQL client
    • Uvicorn: An ASGI server for our app
    • Loguru: A logging library
    • Python-dotenv: To load environment variables from a .env file
    • Requests: An HTTP client library
  4. Create the project structure

    Now, create the following directory structure to organize your project files:

    async_postgres
    ├── src/
    │ ├── database/
    │ │ └── postgres.py
    │ ├── models/
    │ │ └── product_models.py
    │ ├── routes/
    │ │ └── product_routes.py
    │ └── main.py
    ├── .env  
    ├── .python-version
    ├── README.md  
    ├── pyproject.toml  
    └── uv.lock

Setting up your Database

In this section, you will set up the connection pool, ensure your database schema is in place, and manage database connections effectively. To connect to your PostgreSQL database, you will use the asyncpg library for asynchronous database connections.

First, create a .env file in the root of your project to store the database connection URL. This file will hold environment-specific variables, such as the connection string to your Neon PostgreSQL database.

DATABASE_URL=postgres://user:password@your-neon-hostname.neon.tech/neondb?sslmode=require

Make sure to replace the placeholders (user, password, your-neon-hostname, etc.) with your actual Neon database credentials, which are available in the Neon Console.
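
As a quick sanity check, the connection string can be split into its parts with Python's standard library before the app tries to connect. The sketch below is only an illustration: describe_database_url is a hypothetical helper that is not part of the guide's project, and it parses the placeholder URL shown above rather than real credentials.

```python
from urllib.parse import parse_qs, urlsplit


def describe_database_url(url: str) -> dict:
    """Split a Postgres connection URL into its parts (hypothetical helper)."""
    parts = urlsplit(url)
    if parts.scheme not in ("postgres", "postgresql"):
        raise ValueError(f"Unexpected scheme: {parts.scheme!r}")
    query = parse_qs(parts.query)
    # The password is deliberately left out so the result is safe to log.
    return {
        "user": parts.username,
        "host": parts.hostname,
        "database": parts.path.lstrip("/"),
        "sslmode": query.get("sslmode", [None])[0],
    }


# Placeholder URL from this guide; a real app would read os.getenv("DATABASE_URL").
example_url = (
    "postgres://user:password@your-neon-hostname.neon.tech/neondb?sslmode=require"
)
info = describe_database_url(example_url)
print(info)
```

If the host or database name printed here does not match what the Neon Console shows, the .env file is the first place to look.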

In your project, the src/database/postgres.py file manages the connection to PostgreSQL using asyncpg and its connection pool, which is a mechanism for managing and reusing database connections efficiently. With this, you can use asynchronous queries, allowing the application to handle multiple requests concurrently.

import os
import asyncpg
import dotenv
from loguru import logger
from typing import Optional

dotenv.load_dotenv()

conn_pool: Optional[asyncpg.Pool] = None

async def init_postgres() -> None:
    """
    Initialize the PostgreSQL connection pool and create the products table if it doesn't exist.

    This function is meant to be called at the startup of the FastAPI app to
    initialize a connection pool to PostgreSQL and ensure that the required
    database schema is in place.
    """
    global conn_pool
    try:
        logger.info("Initializing PostgreSQL connection pool...")

        conn_pool = await asyncpg.create_pool(
            dsn=os.getenv("DATABASE_URL"), min_size=1, max_size=10
        )
        logger.info("PostgreSQL connection pool created successfully.")

    except Exception as e:
        logger.error(f"Error initializing PostgreSQL connection pool: {e}")
        raise
    try:
        async with conn_pool.acquire() as conn:
            create_table_query = """
            CREATE TABLE IF NOT EXISTS products (
                id SERIAL PRIMARY KEY,
                name VARCHAR(100) NOT NULL,
                price NUMERIC(10, 2) NOT NULL CHECK (price >= 0),
                quantity INT NOT NULL CHECK (quantity >= 0),
                description VARCHAR(255)
            );
            """
            async with conn.transaction():
                await conn.execute(create_table_query)
            logger.info("Products table ensured to exist.")

    except Exception as e:
        logger.error(f"Error creating the products table: {e}")
        raise


async def get_postgres() -> asyncpg.Pool:
    """
    Return the PostgreSQL connection pool.

    This function returns the connection pool object, from which individual
    connections can be acquired as needed for database operations. The caller
    is responsible for acquiring and releasing connections from the pool.

    Returns
    -------
    asyncpg.Pool
        The connection pool object to the PostgreSQL database.

    Raises
    ------
    ConnectionError
        Raised if the connection pool is not initialized.
    """
    # Reading a module-level variable needs no `global` declaration.
    if conn_pool is None:
        logger.error("Connection pool is not initialized.")
        raise ConnectionError("PostgreSQL connection pool is not initialized.")
    return conn_pool



async def close_postgres() -> None:
    """
    Close the PostgreSQL connection pool.

    This function should be called during the shutdown of the FastAPI app
    to properly close all connections in the pool and release resources.
    """
    global conn_pool
    if conn_pool is not None:
        try:
            logger.info("Closing PostgreSQL connection pool...")
            await conn_pool.close()
            logger.info("PostgreSQL connection pool closed successfully.")
        except Exception as e:
            logger.error(f"Error closing PostgreSQL connection pool: {e}")
            raise
    else:
        logger.warning("PostgreSQL connection pool was not initialized.")

init_postgres is responsible for opening the connection pool to the PostgreSQL database and setting up the required database schema. Specifically, it ensures that the necessary database tables (such as the products table) are created if they don’t already exist, preparing the application to start accepting requests.

To properly manage the lifecycle of the database, you also need a function that closes the connection pool when the API shuts down. close_postgres is responsible for gracefully closing all connections in the pool at that point.

Throughout your API you will also need access to the pool to acquire connections and run queries. get_postgres returns the active connection pool and raises an error if the pool has not been initialized. Passing the pool into each endpoint this way, rather than having the endpoint create it, is known as dependency injection.
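
To make the idea of connection reuse concrete, here is a toy pool written with asyncio only. It is an illustration under simplifying assumptions, not how asyncpg implements its pool: ToyPool, handle_request, and the "conn-N" strings are all hypothetical stand-ins. The point it shows is that many concurrent requests can share a small, capped number of connections.

```python
import asyncio


class ToyPool:
    """Illustrative stand-in for a connection pool; not asyncpg's implementation."""

    def __init__(self, max_size: int) -> None:
        self._idle: asyncio.Queue = asyncio.Queue()
        self._max_size = max_size
        self.created = 0  # how many "connections" were ever opened

    async def acquire(self) -> str:
        # Reuse an idle connection if one is available.
        if not self._idle.empty():
            return self._idle.get_nowait()
        # Open a new one while below the cap.
        if self.created < self._max_size:
            self.created += 1
            return f"conn-{self.created}"
        # Otherwise wait until another task releases one.
        return await self._idle.get()

    def release(self, conn: str) -> None:
        self._idle.put_nowait(conn)


async def handle_request(pool: ToyPool, used: list) -> None:
    conn = await pool.acquire()
    try:
        used.append(conn)
        await asyncio.sleep(0.01)  # pretend to run a query
    finally:
        pool.release(conn)  # always hand the connection back


async def main() -> tuple:
    pool = ToyPool(max_size=3)
    used: list = []
    # Ten concurrent "requests" share at most three connections.
    await asyncio.gather(*(handle_request(pool, used) for _ in range(10)))
    return pool.created, used


created, used = asyncio.run(main())
print(created, len(used))
```

In the real module, asyncpg.create_pool with min_size and max_size plays the role of ToyPool, and async with conn_pool.acquire() replaces the manual acquire/release pair.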

Defining the Pydantic Models

Pydantic is a data validation library in Python that ensures data entering or leaving your API is valid by enforcing constraints and data types.

In your application, you will define several models using Pydantic to represent the data for products. These models will be used to create, update, and manage products in the database, as well as handle validation when clients interact with your API. They go in src/models/product_models.py.

from pydantic import BaseModel, Field, ConfigDict
from typing import Optional


class Product(BaseModel):
    """
    Represents the product table in the database.
    """
    model_config = ConfigDict(from_attributes=True)
    id: int
    name: str
    price: float
    quantity: int
    description: Optional[str]


class ProductCreate(BaseModel):
    """
    Represents the required fields to create a new product.
    """
    name: str
    price: float = Field(..., ge=0)
    quantity: int = Field(..., ge=0)
    description: Optional[str] = Field(None, max_length=255)


class ProductUpdate(BaseModel):
    """
    Represents optional fields to update an existing product.
    Allows partial updates.
    """
    name: Optional[str] = None
    price: Optional[float] = Field(None, ge=0)
    quantity: Optional[int] = Field(None, ge=0)
    description: Optional[str] = Field(None, max_length=255)


class ProductStockUpdate(BaseModel):
    """
    Represents the stock update for a product's quantity.
    """
    quantity: int = Field(..., ge=0)
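
The following sketch shows what these constraints do in practice. It assumes Pydantic v2 (which the ConfigDict import above implies) and repeats trimmed copies of two of the models so that it runs on its own; the sample values are made up for illustration.

```python
from typing import Optional

from pydantic import BaseModel, Field, ValidationError


# Trimmed copies of the guide's models so the snippet is self-contained.
class ProductCreate(BaseModel):
    name: str
    price: float = Field(..., ge=0)
    quantity: int = Field(..., ge=0)
    description: Optional[str] = Field(None, max_length=255)


class ProductUpdate(BaseModel):
    name: Optional[str] = None
    price: Optional[float] = Field(None, ge=0)
    quantity: Optional[int] = Field(None, ge=0)
    description: Optional[str] = Field(None, max_length=255)


# A valid payload is parsed into typed attributes.
ok = ProductCreate(name="Test Product", price=9.99, quantity=100)

# A negative price violates ge=0 and raises ValidationError.
failed_fields = []
try:
    ProductCreate(name="Broken", price=-1, quantity=1)
    rejected = False
except ValidationError as exc:
    rejected = True
    failed_fields = [err["loc"][0] for err in exc.errors()]

# For partial updates, exclude_unset=True keeps only what the client sent.
patch = ProductUpdate(price=12.5)
sent_fields = patch.model_dump(exclude_unset=True)
print(ok.description, rejected, failed_fields, sent_fields)
```

When a model like this is used as a request body in FastAPI, a failed validation is reported to the client as a 422 response rather than reaching your endpoint code.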

Creating the API Endpoints

In this section, you will create the API endpoints that allow you to manage products in your PostgreSQL database. These endpoints will allow you to create, retrieve, update, delete, and manage product stock. You will leverage asynchronous database connections using asyncpg.

Each endpoint follows a similar flow for interacting with the database. You first get a connection from the connection pool, execute the desired query, and release the connection back to the pool. Because db_pool.acquire() is used as an async context manager, the connection is automatically returned to the pool after each operation.

The common database flow is as follows:

  1. Getting the Connection Pool:
    • You inject the connection pool using FastAPI's Depends() function, which passes the pool returned by get_postgres into the endpoint.
  2. Acquiring a Connection:
    • Using the connection pool, you acquire a connection by calling async with db_pool.acquire() as conn:. This ensures you obtain a database connection to run the query.
  3. Running the Query:
    • Once the connection is acquired, you run the query using methods such as fetchrow() (for single rows) or fetch() (for multiple rows) depending on the operation.
  4. Returning the Connection to the Pool:
    • Once the query is complete, the connection is automatically returned to the pool because the async with context manager handles the lifecycle of the connection.

The routes live in src/routes/product_routes.py:

from fastapi import HTTPException, Query, Path, Body, APIRouter, Depends
from models.product_models import Product, ProductCreate, ProductUpdate, ProductStockUpdate
from database.postgres import get_postgres
from typing import List
import asyncpg
from loguru import logger

product_router = APIRouter()


@product_router.post("/products", response_model=Product)
async def create_product(
    product: ProductCreate = Body(...),
    db_pool: asyncpg.Pool = Depends(get_postgres),
) -> Product:
    """
    Create a new product.

    Parameters
    ----------
    product : ProductCreate
        The product details to create.
    db_pool : asyncpg.Pool
        Database connection pool injected by dependency.

    Returns
    -------
    Product
        The newly created product.
    """
    query = """
    INSERT INTO products (name, price, quantity, description)
    VALUES ($1, $2, $3, $4)
    RETURNING id, name, price, quantity, description
    """
    try:
        async with db_pool.acquire() as conn:
            result = await conn.fetchrow(
                query,
                product.name,
                product.price,
                product.quantity,
                product.description,
            )

        if result:
            return Product(**dict(result))
        else:
            logger.error("Failed to create product")
            raise HTTPException(status_code=500, detail="Failed to create product")
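    except HTTPException:
        # Re-raise deliberate HTTP errors unchanged instead of wrapping them below.
        raise
    except Exception as e: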
        logger.error(f"Error during product creation: {e}")
        raise HTTPException(
            status_code=500, detail="Internal server error during product creation"
        )


@product_router.get("/products", response_model=List[Product])
async def get_all_products(
    db_pool: asyncpg.Pool = Depends(get_postgres),
) -> List[Product]:
    """
    Get a list of all products.

    Parameters
    ----------
    db_pool : asyncpg.Pool, optional
        Database connection pool injected by dependency.

    Returns
    -------
    List[Product]
        A list of all products in the inventory.
    """
    query = "SELECT id, name, price, quantity, description FROM products"

    try:
        async with db_pool.acquire() as conn:
            results = await conn.fetch(query)
            return [Product(**dict(result)) for result in results]
    except Exception as e:
        logger.error(f"Error fetching products: {e}")
        raise HTTPException(status_code=500, detail="Failed to retrieve products")


@product_router.get("/products/{id}", response_model=Product)
async def get_product_by_id(
    id: int = Path(..., ge=1),
    db_pool: asyncpg.Pool = Depends(get_postgres),
) -> Product:
    """
    Get a product by its ID.

    Parameters
    ----------
    id : int
        The ID of the product.
    db_pool : asyncpg.Pool, optional
        Database connection pool injected by dependency.

    Returns
    -------
    Product
        The product details for the given ID.
    """
    query = "SELECT id, name, price, quantity, description FROM products WHERE id = $1"

    try:
        async with db_pool.acquire() as conn:
            result = await conn.fetchrow(query, id)
            if result:
                return Product(**dict(result))
            else:
                logger.warning(f"Product with ID {id} not found")
                raise HTTPException(status_code=404, detail="Product not found")
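    except HTTPException:
        # Re-raise the 404 above unchanged; otherwise it would be turned into a 500.
        raise
    except Exception as e: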
        logger.error(f"Error fetching product by ID: {e}")
        raise HTTPException(
            status_code=500, detail="Internal server error during product retrieval"
        )


@product_router.put("/products/{id}", response_model=Product)
async def update_product(
    id: int = Path(..., ge=1),
    product: ProductUpdate = Body(...),
    db_pool: asyncpg.Pool = Depends(get_postgres),
) -> Product:
    """
    Update a product by its ID.

    Parameters
    ----------
    id : int
        The ID of the product to update.
    product : ProductUpdate
        The fields to update (partial updates allowed).
    db_pool : asyncpg.Pool, optional
        Database connection pool injected by dependency.

    Returns
    -------
    Product
        The updated product details.
    """
    query = """
    UPDATE products
    SET name = COALESCE($1, name),
        price = COALESCE($2, price),
        quantity = COALESCE($3, quantity),
        description = COALESCE($4, description)
    WHERE id = $5
    RETURNING id, name, price, quantity, description
    """

    try:
        async with db_pool.acquire() as conn:
            result = await conn.fetchrow(
                query,
                product.name,
                product.price,
                product.quantity,
                product.description,
                id,
            )
            if result:
                return Product(**dict(result))
            else:
                logger.warning(f"Product with ID {id} not found for update")
                raise HTTPException(status_code=404, detail="Product not found")
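    except HTTPException:
        # Re-raise the 404 above unchanged; otherwise it would be turned into a 500.
        raise
    except Exception as e: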
        logger.error(f"Error updating product: {e}")
        raise HTTPException(
            status_code=500, detail="Internal server error during product update"
        )


@product_router.delete("/products/{id}")
async def delete_product(
    id: int = Path(..., ge=1),
    db_pool: asyncpg.Pool = Depends(get_postgres)
) -> dict:
    """
    Delete a product by its ID.

    Parameters
    ----------
    id : int
        The ID of the product to delete.
    db_pool : asyncpg.Pool, optional
        Database connection pool injected by dependency.

    Returns
    -------
    dict
        A message indicating the product was deleted.
    """
    query = "DELETE FROM products WHERE id = $1 RETURNING id"

    try:
        async with db_pool.acquire() as conn:
            result = await conn.fetchrow(query, id)
            if result:
                return {"message": "Product deleted successfully"}
            else:
                logger.warning(f"Product with ID {id} not found for deletion")
                raise HTTPException(status_code=404, detail="Product not found")
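    except HTTPException:
        # Re-raise the 404 above unchanged; otherwise it would be turned into a 500.
        raise
    except Exception as e: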
        logger.error(f"Error deleting product: {e}")
        raise HTTPException(
            status_code=500, detail="Internal server error during product deletion"
        )


@product_router.patch("/products/{id}/stock", response_model=Product)
async def update_product_stock(
    id: int = Path(..., ge=1),
    stock: ProductStockUpdate = Body(...),
    db_pool: asyncpg.Pool = Depends(get_postgres),
) -> Product:
    """
    Update the stock (quantity) of a product by its ID.

    Parameters
    ----------
    id : int
        The ID of the product to update.
    stock : ProductStockUpdate
        The new quantity for the product.
    db_pool : asyncpg.Pool, optional
        Database connection pool injected by dependency.

    Returns
    -------
    Product
        The updated product with new stock quantity.
    """
    query = """
    UPDATE products
    SET quantity = $1
    WHERE id = $2
    RETURNING id, name, price, quantity, description
    """
    try:
        async with db_pool.acquire() as conn:
            result = await conn.fetchrow(query, stock.quantity, id)
            if result:
                return Product(**dict(result))
            else:
                raise HTTPException(status_code=404, detail="Product not found")
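    except HTTPException:
        # Re-raise the 404 above unchanged; otherwise it would be turned into a 500.
        raise
    except Exception as e: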
        logger.error(f"Error updating product stock: {e}")
        raise HTTPException(
            status_code=500, detail="Internal server error during product stock update"
        )


@product_router.get("/products/filter/price", response_model=List[Product])
async def filter_products_by_price(
    min_price: float = Query(...),
    max_price: float = Query(...),
    db_pool: asyncpg.Pool = Depends(get_postgres),
) -> List[Product]:
    """
    Get products within a specific price range.

    Parameters
    ----------
    min_price : float
        The minimum price for filtering.
    max_price : float
        The maximum price for filtering.
    db_pool : asyncpg.Pool, optional
        Database connection pool injected by dependency.

    Returns
    -------
    List[Product]
        A list of products within the specified price range.
    """
    query = """
    SELECT id, name, price, quantity, description
    FROM products
    WHERE price BETWEEN $1 AND $2
    """
    try:
        async with db_pool.acquire() as conn:
            results = await conn.fetch(query, min_price, max_price)
            return [Product(**dict(result)) for result in results]
    except Exception as e:
        logger.error(f"Error filtering products by price: {e}")
        raise HTTPException(
            status_code=500, detail="Internal server error during price filtering"
        )

The code defines the following endpoints:

  • POST /products: Creates a new product. It receives the product data (name, price, quantity, and description) and inserts it into the database. The newly created product is returned.

  • GET /products: Retrieves all products from the database. The response is a list of products, each containing its ID, name, price, quantity, and description.

  • GET /products/{id}: Retrieves a product by its unique ID. If the product exists, its details are returned; otherwise, a 404 error is raised.

  • PUT /products/{id}: Updates an existing product by its ID. The update can be partial, because the query uses COALESCE to keep the current value of any field that is not provided. The updated product is returned.

  • DELETE /products/{id}: Deletes a product by its ID. If the product is successfully deleted, a success message is returned.

  • PATCH /products/{id}/stock: Updates the stock (quantity) of a specific product by its ID. The updated product, with the new quantity, is returned.

  • GET /products/filter/price: Retrieves products within a specific price range. You pass min_price and max_price as query parameters, and the endpoint returns a list of products that fall within that range.
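
The COALESCE-based partial update used by PUT /products/{id} can be tried out without a PostgreSQL server. The sketch below uses Python's built-in sqlite3 module as a stand-in, which is an assumption made purely for illustration: the placeholders are ? instead of $1, and the column types are SQLite's, but COALESCE returns its first non-NULL argument in both databases.

```python
import sqlite3

# In-memory SQLite stands in for PostgreSQL here.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE products (id INTEGER PRIMARY KEY, name TEXT, price REAL, "
    "quantity INTEGER, description TEXT)"
)
conn.execute(
    "INSERT INTO products (name, price, quantity, description) VALUES (?, ?, ?, ?)",
    ("Test Product", 9.99, 100, "A test product"),
)

update_sql = """
UPDATE products
SET name = COALESCE(?, name),
    price = COALESCE(?, price),
    quantity = COALESCE(?, quantity),
    description = COALESCE(?, description)
WHERE id = ?
"""

# Only price is provided; None is sent as NULL, so the other columns keep their values.
conn.execute(update_sql, (None, 12.99, None, None, 1))
row = conn.execute(
    "SELECT name, price, quantity, description FROM products WHERE id = 1"
).fetchone()
print(row)
conn.close()
```

One consequence of this design is that a field sent as null is indistinguishable from a field that was omitted, so this endpoint cannot be used to clear an existing description.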

Running the Application

After setting up the database, models, and API routes, the next step is to run the FastAPI application. The main.py file is the entry point for the application, and Uvicorn starts and serves it.

The main.py file defines the FastAPI application, manages lifecycle events like starting and closing the PostgreSQL connection pool, and includes the product-related routes. Here, you will use the @asynccontextmanager decorator to manage the database connection pool lifecycle.

from fastapi import FastAPI
from contextlib import asynccontextmanager
from database.postgres import init_postgres, close_postgres
from routes.product_routes import product_router
import uvicorn


@asynccontextmanager
async def lifespan(app: FastAPI):
    await init_postgres()
    yield
    await close_postgres()


app: FastAPI = FastAPI(lifespan=lifespan, title="Async FastAPI PostgreSQL Inventory Manager")
app.include_router(product_router)

if __name__ == "__main__":
    uvicorn.run("main:app", host="0.0.0.0", port=8080, reload=True)
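
To see the order in which a lifespan-style context manager runs its startup and shutdown code, here is a sketch that uses only the standard library. lifespan_sketch and the events list are hypothetical names for illustration; FastAPI itself is not involved, and the try/finally is an optional addition that is not in the main.py above.

```python
import asyncio
from contextlib import asynccontextmanager

events: list = []


@asynccontextmanager
async def lifespan_sketch():
    # Code before `yield` runs once at startup (where init_postgres() is awaited).
    events.append("startup")
    try:
        yield
    finally:
        # Code after `yield` runs at shutdown (where close_postgres() is awaited).
        # The try/finally makes the cleanup run even if the body raises.
        events.append("shutdown")


async def main() -> None:
    async with lifespan_sketch():
        # While the context is open, the application would be serving requests.
        events.append("serving requests")


asyncio.run(main())
print(events)
```

FastAPI enters the lifespan context before it starts accepting requests and exits it when the server stops, which is why the pool is available for the whole time the app is serving.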

To run the application, use the following command:

uv run src/main.py

Once the server is running, you can access the API documentation and test the endpoints directly in your browser:

  • Interactive API Docs (Swagger UI):
    Visit http://127.0.0.1:8080/docs to access the automatically generated API documentation where you can test the endpoints.
  • Alternative Docs (ReDoc):
    Visit http://127.0.0.1:8080/redoc for another style of API documentation.

Testing the API

You can also test the API with tools such as httpie, curl, or Postman.

Below are examples of how to interact with the API using httpie, a command-line HTTP client.

  1. Create a Product

    Start by creating a new product:

    http POST http://127.0.0.1:8080/products name="Test Product" price:=9.99 quantity:=100 description="A test product"

    You should see a response with the created product data:

    {
        "id": 1,
        "name": "Test Product",
        "price": 9.99,
        "quantity": 100,
        "description": "A test product"
    }
  2. Retrieve All Products

    Next, retrieve all products from the database:

    http GET http://127.0.0.1:8080/products

    This will return a list of all products in the database:

    [
        {
            "id": 1,
            "name": "Test Product",
            "price": 9.99,
            "quantity": 100,
            "description": "A test product"
        }
    ]
  3. Retrieve a Specific Product by ID

    You can also retrieve a specific product by its ID:

    http GET http://127.0.0.1:8080/products/1

    This will return the product details for the product with ID 1:

    {
        "id": 1,
        "name": "Test Product",
        "price": 9.99,
        "quantity": 100,
        "description": "A test product"
    }
  4. Update a Product

    To update an existing product, use the following command:

    http PUT http://127.0.0.1:8080/products/1 name="Updated Product" price:=12.99 quantity:=150 description="An updated product description"

    This will return the updated product data:

    {
        "id": 1,
        "name": "Updated Product",
        "price": 12.99,
        "quantity": 150,
        "description": "An updated product description"
    }
  5. Update Product Stock

    You can also update just the stock (quantity) of a product:

    http PATCH http://127.0.0.1:8080/products/1/stock quantity:=200

    This will return the updated product with the new quantity:

    {
        "id": 1,
        "name": "Updated Product",
        "price": 12.99,
        "quantity": 200,
        "description": "An updated product description"
    }
  6. Filter Products by Price Range

    To filter products by a specific price range, use this command:

    http GET http://127.0.0.1:8080/products/filter/price min_price==5.00 max_price==15.00

    This will return products that fall within the specified price range:

    [
        {
            "id": 1,
            "name": "Updated Product",
            "price": 12.99,
            "quantity": 200,
            "description": "An updated product description"
        }
    ]
  7. Delete a Product

    To delete a product by its ID, use the following command:

    http DELETE http://127.0.0.1:8080/products/1

    If successful, you will receive a confirmation message:

    {
        "message": "Product deleted successfully"
    }
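
If you prefer to script these checks in Python, the same requests can be built with the standard library. This is a minimal sketch, not part of the guide's project: build_request and send are hypothetical helpers, and the snippet only constructs the requests so that it runs without a server. Calling send requires the API to be running on the host and port used above.

```python
import json
from urllib.parse import urlencode
from urllib.request import Request, urlopen

BASE_URL = "http://127.0.0.1:8080"  # host and port used in this guide


def build_request(method: str, path: str, body=None, params=None) -> Request:
    """Build (but do not send) a JSON request; hypothetical helper."""
    url = f"{BASE_URL}{path}"
    if params:
        url = f"{url}?{urlencode(params)}"
    data = json.dumps(body).encode("utf-8") if body is not None else None
    headers = {"Content-Type": "application/json"} if data is not None else {}
    return Request(url, data=data, headers=headers, method=method)


def send(request: Request):
    """Send a request and decode the JSON response (needs the server running)."""
    with urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))


create_req = build_request(
    "POST",
    "/products",
    body={
        "name": "Test Product",
        "price": 9.99,
        "quantity": 100,
        "description": "A test product",
    },
)
filter_req = build_request(
    "GET", "/products/filter/price", params={"min_price": 5.0, "max_price": 15.0}
)
print(create_req.get_method(), create_req.full_url)
print(filter_req.get_method(), filter_req.full_url)
```

With the server running, send(create_req) should return the created product as a dictionary, mirroring the httpie output in step 1.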

Conclusion

Using this guide, you have built a fully functional API for managing products using FastAPI, Pydantic, and PostgreSQL with asyncpg.

This stack provides a solid foundation for building high-performance and scalable web services. FastAPI's asynchronous support, combined with Pydantic's robust data validation and asyncpg's efficient database interactions, allows for fast and reliable API development.

As a next step, you can look at deploying this application in the cloud using technologies like Docker and Kubernetes, or implementing automated test, build, and deployment workflows using GitHub Actions.
