Performance Guide
This guide covers performance tuning, optimization strategies, and best practices for getting the best performance from rapsqlite.
Connection Pooling
Understanding Connection Pools
rapsqlite uses connection pooling internally. The default pool size is 1, but you can configure it:
from rapsqlite import Connection
# Create connection with custom pool size
conn = Connection("example.db")
conn.pool_size = 5 # Allow up to 5 concurrent connections
conn.connection_timeout = 30 # 30 second timeout for acquiring connections
When to Increase Pool Size
Concurrent operations: If you have many concurrent database operations, increase pool size
Long-running queries: If queries take a long time, more connections allow better concurrency
Mixed read/write workloads: Separate connections for reads and writes
Pool Size Guidelines
Default (1): Good for single-threaded async applications or low concurrency
Small (2-5): Good for moderate concurrency, most web applications
Large (10+): Only for high-concurrency scenarios, be mindful of SQLite’s write serialization
Note: SQLite serializes writes, so increasing pool size mainly helps with concurrent reads.
Single connection vs larger pool
Single connection (pool_size=1, default): Use when you have one logical worker (e.g. one async task doing DB work at a time), or when concurrency is low. Session-connection reuse means one connection handles many sequential queries efficiently.
Larger pool (2–5 or more): Use when many concurrent coroutines perform database operations at the same time (e.g. many concurrent HTTP requests each hitting the DB). Increase
pool_sizebefore the first operation; it cannot be changed after the pool is created.
Prepared Statement Caching
rapsqlite automatically benefits from prepared statement caching provided by sqlx (the underlying database library). sqlx caches prepared statements per connection, meaning:
Automatic caching: Prepared statements are cached automatically - no configuration needed
Per-connection cache: Each connection in the pool maintains its own cache
Query normalization: rapsqlite normalizes queries (removes extra whitespace) to maximize cache hits
Performance benefit: Repeated queries with the same structure reuse prepared statements
To maximize cache hits:
# Good: Same query structure, different parameters
for user_id in user_ids:
await conn.fetch_all("SELECT * FROM users WHERE id = ?", [user_id])
# sqlx caches the prepared statement after first execution
# Also good: Query normalization handles whitespace differences
await conn.execute("INSERT INTO test (value) VALUES (?)", ["a"])
await conn.execute("INSERT INTO test (value) VALUES (?)", ["b"])
# Both queries are normalized and benefit from the same prepared statement
Best practices:
Use consistent query formatting (normalization happens automatically)
Reuse the same query strings with different parameters
Keep connections alive for repeated queries (connection pooling helps)
Use parameterized queries (required for prepared statements)
Avoid dynamic query building when possible (reduces cache hits)
Performance impact:
Prepared statement caching provides significant performance benefits for repeated queries:
First execution: Statement is prepared and cached
Subsequent executions: Reuses cached prepared statement (much faster)
Typical improvement: 2-5x faster for repeated queries vs. unique queries
PRAGMA Optimization
Configure SQLite for your workload:
# For read-heavy workloads
async with connect("example.db", pragmas={
"journal_mode": "WAL", # Write-Ahead Logging
"synchronous": "NORMAL", # Balance safety and performance
"cache_size": "-64000", # 64MB cache (negative = KB)
}) as conn:
# Your operations
pass
# For write-heavy workloads
async with connect("example.db", pragmas={
"journal_mode": "WAL",
"synchronous": "FULL", # Maximum safety
"wal_autocheckpoint": "1000", # Checkpoint every 1000 pages
}) as conn:
# Your operations
pass
Batch Operations
Use execute_many() for batch inserts:
# Good: Single transaction, prepared statement reuse
params = [[f"user_{i}"] for i in range(1000)]
await conn.execute_many("INSERT INTO users (name) VALUES (?)", params)
# Bad: Many individual transactions
for i in range(1000):
await conn.execute("INSERT INTO users (name) VALUES (?)", [f"user_{i}"])
Connection Reuse
Reuse connections when possible:
# Good: Reuse connection
async with connect("example.db") as conn:
for i in range(100):
await conn.execute("INSERT INTO test (value) VALUES (?)", [i])
# Bad: Create new connection for each operation
for i in range(100):
async with connect("example.db") as conn:
await conn.execute("INSERT INTO test (value) VALUES (?)", [i])
Use Appropriate Fetch Methods
fetch_all(): When you need all rowsfetch_one(): When you expect exactly one rowfetch_optional(): When you might have zero or one rowCursor.fetchmany(): When processing large result sets in chunks
Performance Monitoring
Query Timing
import time
start = time.perf_counter()
rows = await conn.fetch_all("SELECT * FROM users")
elapsed = time.perf_counter() - start
print(f"Query took {elapsed * 1000:.2f}ms")
Connection Pool Metrics
Monitor connection pool usage by tracking:
Number of concurrent operations
Connection acquisition timeouts
Pool size vs. actual usage
Troubleshooting Performance Issues
Slow Queries
Check if using parameterized queries: String formatting prevents prepared statement caching
Verify indexes: Use
get_indexes()to check table indexesAnalyze query plans: Use
EXPLAIN QUERY PLANto understand query executionCheck PRAGMA settings: Ensure appropriate settings for your workload
High Memory Usage
Reduce pool size: Smaller pool uses less memory
Use fetchmany(): Process large result sets in chunks
Clear query cache: Connection close clears prepared statement cache
Database Locked Errors
Increase timeout: Set
connection_timeouthigherUse WAL mode:
journal_mode = "WAL"allows concurrent readsReduce transaction duration: Keep transactions short
Implement retry logic: See error handling section in Advanced Usage Guide
Measuring performance and regression testing
Query timing: Use
timed_fetch_all(conn, sql, parameters, on_timing=...)fromrapsqliteto record query duration, or wrap calls withtime.perf_counter(). See Advanced Usage Guide for monitoring and trace callbacks.Regression tests: The test suite includes performance regression tests in
tests/test_performance.py. Run withpytest tests/ -m performanceto validate performance characteristics after changes.
Further Reading
Advanced Usage Guide - Advanced usage patterns and best practices
Migration Guide: aiosqlite to rapsqlite - Migrating from aiosqlite (performance impact of row format, pool, and PRAGMAs)