Building a High-Performance Sensor Data API with FastAPI and Postgres' TimescaleDB Extension
Create an API for streaming, storing, and querying sensor data using Postgres TimescaleDB and FastAPI
In this guide, you'll build a high-performance API for streaming, storing, and querying sensor data using FastAPI and TimescaleDB for efficient time-series data storage. By combining FastAPI with TimescaleDB's advanced time-series features, you'll be able to maintain low latency queries even at the petabyte scale, making it perfect for things like IoT systems that generate large volumes of sensor data.
Prerequisites
Before starting, ensure you have the following tools and services ready:
pip
: Required for installing and managing Python packages, including uv for creating virtual environments. You can check ifpip
is installed by running the following command:- Neon serverless Postgres : you will need a Neon account for provisioning and scaling your
PostgreSQL
database. If you don't have an account yet, sign up here.
Setting up the Project
Follow these steps to set up your project and virtual environment:
-
Create a
uv
projectIf you don't already have uv installed, you can install it with:
Once
uv
is installed, create a new project:This will create a new project directory called
timescale_fastapi
. Open this directory in your code editor of your choice. -
Set up the virtual environment.
You will now create and activate a virtual environment in which your project's dependencies will beinstalled.
You should see
(timescale_fastapi)
in your terminal now, this means that your virtual environment is activated. -
Install dependencies.
Next, add all the necessary dependencies for your project:
uv add python-dotenv asyncpg loguru fastapi uvicorn requests
where each package does the following:
FastAPI
: A Web / API frameworkAsyncPG
: An asynchronous PostgreSQL clientUvicorn
: An ASGI server for our appLoguru
: A logging libraryPython-dotenv
: To load environment variables from a .env file
-
Create the project structure.
Create the following directory structure to organize your project files:
timescale_fastapi ├── src/ │ ├── database/ │ │ └── postgres.py │ ├── models/ │ │ └── product_models.py │ ├── routes/ │ │ └── product_routes.py │ └── main.py ├── .env ├── .python-version ├── README.md ├── pyproject.toml └── uv.lock
Setting up your Database
In this section, you will set up the TimescaleDB
extension using Neon's console, add the database's schema, and create the database connection pool and lifecycle management logic in FastAPI. Optionally, you can also add some mock data to test your API endpoints.
Given TimescaleDB is an extension on top of vanilla Postgres, you must first add the extension by running the following SQL in the SQL Editor
tab of the Neon console.
CREATE EXTENSION IF NOT EXISTS timescaledb;
Next, you will add the necessary tables to your database with:
CREATE TABLE IF NOT EXISTS sensors (
sensor_id SERIAL PRIMARY KEY,
sensor_type VARCHAR(50) NOT NULL,
description VARCHAR(255),
location VARCHAR(255)
);
CREATE TABLE IF NOT EXISTS sensor_data (
sensor_id INT REFERENCES sensors(sensor_id),
value FLOAT NOT NULL,
time TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY(sensor_id, time)
);
One of TimescaleDB's core features is Hypertables
, which is an optimized abstraction for handling large time-series data. It partitions your data into chunks based on time, allowing efficient storage, querying, and performance at scale. By converting the sensor_data table into a hypertable, TimescaleDB will manage the underlying chunking and indexing automatically.
To convert the sensor_data
table into a hypertable, use the following command:
SELECT create_hypertable('sensor_data', 'time');
Now that the schema is ready, you can optionally populate the database with some sample sensor data. First, insert the metadata for two sensors:
INSERT INTO sensors (sensor_type, description, location)
VALUES
('temperature', 'Outdoor temperature sensor', 'Backyard'),
('humidity', 'Indoor humidity sensor', 'Living Room');
Next, generate time-series data for the past 14 days with one-minute intervals for both sensors. Here's how you can insert random data for each sensor using Timescales generate_series()
feature.
INSERT INTO sensor_data (sensor_id, value, time)
SELECT 1 as sensor_id,
15 + random() * 15 AS value,
generate_series(
now() - INTERVAL '14 days',
now(),
INTERVAL '1 minute'
) AS time;
INSERT INTO sensor_data (sensor_id, value, time)
SELECT 2 as sensor_id,
40 + random() * 20 AS value,
generate_series(
now() - INTERVAL '14 days',
now(),
INTERVAL '1 minute'
) AS time;
With your schema and sample data in place, you're now ready to connect to your database in the FastAPI application. To do this you must create a .env
file in the root of the project to hold environment-specific variables, such as the connection string to your Neon PostgreSQL database.
DATABASE_URL=postgres://user:password@your-neon-hostname.neon.tech/neondb?sslmode=require
Make sure to replace the placeholders (user, password, your-neon-hostname, etc.) with your actual Neon database credentials, which are available in the console.
In your project, the database.py
file manages the connection to PostgreSQL
using asyncpg
and its connection pool, which is a mechanism for managing and reusing database connections efficiently. With this, you can use asynchronous queries, allowing the application to handle multiple requests concurrently.
import os
import asyncpg
import dotenv
from loguru import logger
from typing import Optional
dotenv.load_dotenv()
conn_pool: Optional[asyncpg.Pool] = None
async def init_postgres() -> None:
"""
Initialize the PostgreSQL connection pool and create the products table if it doesn't exist.
"""
global conn_pool
try:
logger.info("Initializing PostgreSQL connection pool...")
conn_pool = await asyncpg.create_pool(
dsn=os.getenv("DATABASE_URL"), min_size=1, max_size=10
)
logger.info("PostgreSQL connection pool created successfully.")
except Exception as e:
logger.error(f"Error initializing PostgreSQL connection pool: {e}")
raise
async def get_postgres() -> asyncpg.Pool:
"""
Get a reference to the PostgreSQL connection pool.
Returns
-------
asyncpg.Pool
The connection pool object to the PostgreSQL database.
"""
global conn_pool
if conn_pool is None:
logger.error("Connection pool is not initialized.")
raise ConnectionError("PostgreSQL connection pool is not initialized.")
try:
return conn_pool
except Exception as e:
logger.error(f"Failed to return PostgreSQL connection pool: {e}")
raise
async def close_postgres() -> None:
"""
Close the PostgreSQL connection pool.
"""
global conn_pool
if conn_pool is not None:
try:
logger.info("Closing PostgreSQL connection pool...")
await conn_pool.close()
logger.info("PostgreSQL connection pool closed successfully.")
except Exception as e:
logger.error(f"Error closing PostgreSQL connection pool: {e}")
raise
else:
logger.warning("PostgreSQL connection pool was not initialized.")
init_postgres
is responsible for opening the connection pool to the PostgreSQL
database and close_postgres
is responsible for gracefully closing all connections in the pool when the FastAPI
app shuts down to properly manage the lifecycle of the database.
Throughout your API you will also need access to the pool to get connection instances and run queries. get_postgres
returns the active connection pool. If the pool is not initialized, an error is raised.
Defining the Pydantic Models
Now, you will create Pydantic
models to define the structure of the data your API expects and returns, automatically validating incoming requests and responses versus the defined format.
from pydantic import BaseModel
from typing import List
from datetime import datetime, date
class SensorData(BaseModel):
value: float
timestamp: datetime
class SensorDataBatch(BaseModel):
data: List[SensorData]
class SensorCreate(BaseModel):
sensor_type: str
description: str
location: str
class SensorDailyStatsResponse(BaseModel):
day: date
sensor_id: int
avg_value: float
min_value: float
max_value: float
reading_count: int
median_value: float
iqr_value: float
Each of the models represent the following:
SensorData
: A single sensor reading, including the value recorded and the timestamp when the reading occurredSensorDataBatch
: A batch of data points, to support batch streaming in your APISensorCreate
: The fields for creating a new sensorSensorDailyStatsResponse
: The daily sensor statistics
Creating the API Endpoints
In this section, you will define the FastAPI endpoints that allow you to manage sensor data. These endpoints handle tasks like creating new sensors, streaming sensor data (both single points and batches), and querying daily statistics for a specific sensor. With these endpoints, you can efficiently manage and analyze sensor data using TimescaleDB’s time-series capabilities.
from fastapi import HTTPException, Path, Body, APIRouter, Depends
from database.postgres import get_postgres
from typing import Union, List
from asyncpg import Pool
from loguru import logger
from models.sensor_models import (
SensorData,
SensorDataBatch,
SensorCreate,
SensorDailyStatsResponse,
)
sensor_router = APIRouter()
@sensor_router.post("/sensors")
async def create_sensor(
sensor: SensorCreate = Body(...), db: Pool = Depends(get_postgres)
):
"""
Create a new sensor.
Parameters
----------
sensor : SensorCreate
The sensor details (type, description, and location) to create.
db : asyncpg.Pool
Database connection pool injected by dependency.
Returns
-------
dict
A dictionary containing the newly created sensor ID and a success message.
"""
insert_query = """
INSERT INTO sensors (sensor_type, description, location)
VALUES ($1, $2, $3)
RETURNING sensor_id;
"""
logger.info(
f"Creating new sensor with type: {sensor.sensor_type}, location: {sensor.location}"
)
async with db.acquire() as conn:
sensor_id = await conn.fetchval(
insert_query, sensor.sensor_type, sensor.description, sensor.location
)
if sensor_id is None:
logger.error("Failed to create sensor.")
raise HTTPException(status_code=500, detail="Failed to create sensor")
logger.info(f"Sensor created successfully with ID: {sensor_id}")
return {"sensor_id": sensor_id, "message": "Sensor created successfully."}
@sensor_router.post("/sensor_data/{sensor_id}")
async def stream_sensor_data(
sensor_id: int = Path(...),
sensor_data: Union[SensorData, SensorDataBatch] = Body(...),
db: Pool = Depends(get_postgres),
):
"""
Stream sensor data (single or batch) for a specific sensor.
Parameters
----------
sensor_id : int
The ID of the sensor to associate the data with.
sensor_data : Union[SensorData, SensorDataBatch]
The sensor data to stream, which can be either a single data point or a batch.
db : asyncpg.Pool
Database connection pool injected by dependency.
Returns
-------
dict
A success message once the data is streamed.
"""
insert_query = """
INSERT INTO sensor_data (sensor_id, value, time)
VALUES ($1, $2, $3);
"""
logger.info(f"Streaming data for sensor_id: {sensor_id}")
async with db.acquire() as conn:
async with conn.transaction():
if isinstance(sensor_data, SensorDataBatch):
for data in sensor_data.data:
logger.debug(f"Batch data: {data.value} at {data.timestamp}")
await conn.execute(
insert_query, sensor_id, data.value, data.timestamp
)
elif isinstance(sensor_data, SensorData):
logger.debug(
f"Single data: {sensor_data.value} at {sensor_data.timestamp}"
)
await conn.execute(
insert_query, sensor_id, sensor_data.value, sensor_data.timestamp
)
logger.info(f"Sensor data streamed successfully for sensor_id: {sensor_id}")
return {"message": "Sensor data streamed successfully."}
@sensor_router.get(
"/daily_avg/{sensor_id}", response_model=List[SensorDailyStatsResponse]
)
async def get_sensor_daily_avg(
sensor_id: int = Path(..., description="The ID of the sensor"),
db: Pool = Depends(get_postgres),
):
"""
Query daily statistics (min, max, median, IQR) for a specific sensor over the last 7 days.
Parameters
----------
sensor_id : int
The ID of the sensor.
db : asyncpg.Pool
Database connection pool injected by dependency.
Returns
-------
List[SensorDailyStatsResponse]
A list of daily sensor statistics (average, min, max, median, IQR).
"""
query = """
WITH sensor_stats AS (
SELECT
time_bucket('1 day', time) AS day,
sensor_id,
avg(value) AS avg_value,
min(value) AS min_value,
max(value) AS max_value,
count(*) AS reading_count,
percentile_cont(0.5) WITHIN GROUP (ORDER BY value) AS median_value,
percentile_cont(0.75) WITHIN GROUP (ORDER BY value) -
percentile_cont(0.25) WITHIN GROUP (ORDER BY value) AS iqr_value
FROM sensor_data
WHERE sensor_id = $1
GROUP BY day, sensor_id
)
SELECT * FROM sensor_stats
ORDER BY day DESC
LIMIT 7;
"""
async with db.acquire() as conn:
rows = await conn.fetch(query, sensor_id)
if not rows:
raise HTTPException(status_code=404, detail="No data found for this sensor.")
return [
SensorDailyStatsResponse(
day=row["day"],
sensor_id=row["sensor_id"],
avg_value=row["avg_value"],
min_value=row["min_value"],
max_value=row["max_value"],
reading_count=row["reading_count"],
median_value=row["median_value"],
iqr_value=row["iqr_value"],
)
for row in rows
]
The code defines endpoints for:
-
POST /sensors
: This endpoint creates a new sensor by providing the sensor type, description, and location. -
POST /sensor_data/{sensor_id}
: Streams sensor data for a specific sensor. The data can be a single point or a batch. -
GET /daily_avg/{sensor_id}
: Retrieves daily statistics (average, min, max, median, IQR) for the given sensor over the last 7 days.
In the query for the sensor statistics, the data is able to be partitioned quickly with Timescale's time_bucket()
function by using the indexes generated when you created the hypertable. Likewise, you can easily calculate things like the interquartile range (IQR) using Timescale-specific functions.
Running the Application
After setting up the database, models, and API routes, the next step is to run the FastAPI
application and test it out.
The main.py
file defines the FastAPI
application, manages the database lifecycle, and includes the routes you created above.
from fastapi import FastAPI
from contextlib import asynccontextmanager
from database.postgres import init_postgres, close_postgres
from routes.sensor_routes import sensor_router
@asynccontextmanager
async def lifespan(app: FastAPI):
await init_postgres()
yield
await close_postgres()
app: FastAPI = FastAPI(lifespan=lifespan, title="FastAPI TimescaleDB Sensor Data API")
app.include_router(sensor_router)
To run the application, use uvicorn CLI with the following command:
uvicorn main:app --host 0.0.0.0 --port 8080
Once the server is running, you can access the API documentation and test the endpoints directly in your browser:
- Interactive API Docs (Swagger UI):
Visithttp://127.0.0.1:8080/docs
to access the automatically generated API documentation where you can test the endpoints. - Alternative Docs (ReDoc):
Visithttp://127.0.0.1:8080/redoc
for another style of API documentation.
Testing the API
You can test your application using HTTPie
, a command-line tool for making HTTP requests. The following steps will guide you through creating sensors, streaming data, and querying sensor statistics.
-
Retrieve sensor statistics for pre-generated data (optional).
If you followed the optional data generation steps, you can retrieve daily statistics for the pre-generated sensors:
http GET http://127.0.0.1:8080/daily_avg/1
http GET http://127.0.0.1:8080/daily_avg/2
These commands will return the daily statistics (average, min, max, median, and IQR) for the pre-generated temperature and humidity sensors over the last 7 days.
-
Create a new sensor.
Start by creating a new sensor (e.g., a temperature sensor for the living room):
http POST http://127.0.0.1:8080/sensors sensor_type="temperature" description="Living room temperature sensor" location="Living Room"
You should see a response confirming the creation of the sensor with a unique ID:
{ "sensor_id": 3, "message": "Sensor created successfully." }
-
Stream a single sensor data point.
Stream a single data point for the newly created sensor (
sensor_id = 3
):http POST http://127.0.0.1:8080/sensor_data/3 value:=23.5 timestamp="2024-10-12T14:29:00"
You should get a response indicating success:
{ "message": "Sensor data streamed successfully." }
-
Stream a batch of sensor data.
You can also stream multiple sensor data points in a batch for the same sensor:
http POST http://127.0.0.1:8080/sensor_data/3 data:='[{"value": 22.5, "timestamp": "2024-10-12T14:30:00"}, {"value": 22.7, "timestamp": "2024-10-12T14:31:00"}]'
This will send two data points to the sensor. The response will confirm successful streaming of the batch data:
{ "message": "Sensor data streamed successfully." }
-
Retrieve daily statistics for the new sensor.
After streaming the sensor data, you can retrieve the daily statistics for the new sensor (
sensor_id = 3
):http GET http://127.0.0.1:8080/daily_avg/3
This will return daily statistics (average, min, max, median, and IQR) for the new sensor over the last 7 days:
[ { "day": "2024-10-12", "sensor_id": 3, "avg_value": 22.6, "min_value": 22.5, "max_value": 22.7, "reading_count": 2, "median_value": 22.6, "iqr_value": 0.2 } ]
By following these steps, you can easily create sensors, stream sensor data, and query statistics from your API. For sensors with pre-generated data, you can retrieve the statistics immediately. For new sensors, you can stream data and retrieve their daily stats dynamically.
Conclusion
Now, you have created and tested an API for managing, streaming, and querying sensor data into TimescaleDB
using FastAPI
. By leveraging TimescaleDB for time-series data storage, you now have a high-performance solution for handling sensor data at scale.
As a next step, you can look into streaming data into the database using a distributed event platform like Kafka
or Red Panda
, or using Timescale
to monitor the sensor data with Apache Superset
or Grafana
.