API Reference
This section provides detailed documentation for all public APIs in rapsqlite.
Core Classes
Module Functions
True async SQLite — no fake async, no GIL stalls.
rapsqlite provides true async SQLite operations for Python, backed by Rust, Tokio, and sqlx. Unlike libraries that wrap blocking database calls in async syntax, rapsqlite guarantees that all database operations execute outside the Python GIL, so event loops do not stall under load. It also supports per-connection, sqlite3-style type adapters and converters (register_adapter, register_converter) as well as custom aggregates and collations (create_aggregate, create_collation).
Example
Basic usage:
import asyncio
from rapsqlite import Connection

async def main():
    async with Connection("example.db") as conn:
        await conn.execute("CREATE TABLE test (id INTEGER PRIMARY KEY, value TEXT)")
        await conn.execute("INSERT INTO test (value) VALUES ('hello')")
        rows = await conn.fetch_all("SELECT * FROM test")
        print(rows)
        # Output: [[1, 'hello']]

asyncio.run(main())
Using the connect() function (aiosqlite-compatible):
import asyncio
from rapsqlite import connect

async def main():
    async with connect("example.db") as conn:
        await conn.execute("CREATE TABLE test (id INTEGER PRIMARY KEY, value TEXT)")
        await conn.execute("INSERT INTO test (value) VALUES ('hello')")
        rows = await conn.fetch_all("SELECT * FROM test")
        print(rows)
        # Output: [[1, 'hello']]

asyncio.run(main())
Transactions:
async with Connection("example.db") as conn:
    await conn.begin()
    try:
        await conn.execute("INSERT INTO users (name) VALUES ('Alice')")
        await conn.commit()
    except Exception:
        await conn.rollback()
- rapsqlite.ConnectionT
alias of Connection
- rapsqlite.connect(path: str | PathLike[str], *, pragmas: Any = None, timeout: float = 5.0, iter_chunk_size: int = 64, idle_timeout: int | None = None, loop: Any = None, aiosqlite_compat: bool = False, pool_size: int | None = None, **kwargs: Any) Connection[source]
Connect to a SQLite database.
This function matches the aiosqlite.connect() API for compatibility, allowing seamless migration from aiosqlite to rapsqlite.
- Parameters:
path – Path to the SQLite database file. Can be ":memory:" for an in-memory database, or a file path. Can also be in URI format: "file:path?param=value". The path is validated for security (non-empty, no null bytes).
pragmas – Optional dictionary of PRAGMA settings to apply on connection. These are applied when the connection pool is first created. Example: {"journal_mode": "WAL", "synchronous": "NORMAL", "foreign_keys": True}. See the SQLite PRAGMA documentation for available settings.
timeout – How long to wait (in seconds) when the database is locked by another process/thread before raising an error. Default: 5.0 seconds. This sets SQLite’s busy_timeout PRAGMA. Set to 0.0 to disable timeout. This matches aiosqlite and sqlite3’s timeout parameter.
iter_chunk_size – Chunk size for iteration (e.g. fetchmany). Default 64. Stored for use with cursor iteration; aiosqlite-compatible.
idle_timeout – Optional seconds. When set, connections idle in the pool longer than this are closed. None (default) means no idle timeout.
loop – Deprecated. Event loop (ignored). Accept-only for aiosqlite compatibility.
aiosqlite_compat – If True, set the default row_factory to tuple so that fetch_all, fetchone, and cursor fetchall/fetchone return tuples (like aiosqlite/sqlite3). Use this for a drop-in import rapsqlite as aiosqlite without changing code that expects tuple rows. Default False (rows are lists).
pool_size – Optional max connections in the shared pool for this path. Set before first use so the pool is created with this size (e.g. for high-concurrency tests). Default None (pool uses internal minimum).
**kwargs – Additional arguments (currently ignored, reserved for future use)
- Returns:
An async SQLite connection object that can be used as an async context manager. The connection uses lazy initialization: the actual database connection pool is created on first use.
- Return type:
Connection
Example
With timeout (aiosqlite compatibility):
async with connect("example.db", timeout=10.0) as conn:
    await conn.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)")
- Raises:
ValueError – If the database path is invalid (empty or contains null bytes)
OperationalError – If the database connection cannot be established (e.g., permission denied, disk full, etc.)
Examples
Basic usage:
async with connect("example.db") as conn:
    await conn.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)")
    await conn.execute("INSERT INTO test DEFAULT VALUES")
    rows = await conn.fetch_all("SELECT * FROM test")
    # rows = [[1]]
In-memory database:
async with connect(":memory:") as conn:
    await conn.execute("CREATE TABLE test (id INTEGER)")
    # Database exists only for the duration of the connection
With PRAGMA settings:
async with connect("example.db", pragmas={
    "journal_mode": "WAL",
    "synchronous": "NORMAL",
    "foreign_keys": True
}) as conn:
    await conn.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)")
URI format:
async with connect("file:example.db?mode=rwc") as conn:
    await conn.execute("CREATE TABLE test (id INTEGER)")
Note
The connection object supports the async context manager protocol. Using async with is recommended to ensure proper resource cleanup. All database operations execute outside the Python GIL, providing true async performance.
See also
Connection: For more advanced connection options including initialization hooks.
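To make the pragmas and timeout parameters more concrete, the following sketch applies the same settings with the standard-library sqlite3 module. It does not use rapsqlite and is not how rapsqlite applies them internally; apply_pragmas is a hypothetical helper, and the conversion of timeout seconds to busy_timeout milliseconds follows from SQLite's busy_timeout being expressed in milliseconds.

```python
import os
import sqlite3
import tempfile


def apply_pragmas(db: sqlite3.Connection, pragmas: dict) -> None:
    """Hypothetical helper: issue one PRAGMA statement per dictionary entry."""
    for name, value in pragmas.items():
        if isinstance(value, bool):
            value = "ON" if value else "OFF"
        # PRAGMA does not accept bound parameters, so the values are
        # interpolated; only pass trusted configuration here.
        db.execute(f"PRAGMA {name} = {value}")


path = os.path.join(tempfile.mkdtemp(), "example.db")
db = sqlite3.connect(path)

apply_pragmas(db, {"journal_mode": "WAL", "synchronous": "NORMAL", "foreign_keys": True})

# A timeout given in seconds maps onto busy_timeout, which is in milliseconds.
timeout = 10.0
db.execute(f"PRAGMA busy_timeout = {int(timeout * 1000)}")

# Read the settings back to confirm they took effect.
synchronous = db.execute("PRAGMA synchronous").fetchone()[0]
foreign_keys = db.execute("PRAGMA foreign_keys").fetchone()[0]
busy_timeout_ms = db.execute("PRAGMA busy_timeout").fetchone()[0]
db.close()
```

Reading the PRAGMA values back like this can be a quick way to check that a given configuration was accepted by SQLite.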
- class rapsqlite.PoolMetricsGauges[source]
Bases:
TypedDict
Return type for pool_metrics_gauges(); gauge names for Prometheus/custom metrics.
- async rapsqlite.pool_metrics_gauges(conn: Any) PoolMetricsGauges[source]
Return pool metrics as a dict of gauge names to values for Prometheus or custom metrics.
- rapsqlite.execute_iter(conn: Any, sql: str, parameters: Any | None = None, chunk_size: int | None = None) _StreamChunksIterator[source]
Return an async iterator that yields rows in chunks (streaming / memory-efficient).
Uses LIMIT/OFFSET under the hood so memory stays bounded by chunk_size. Single connection is used for the duration of iteration; closing the connection or cancelling the task stops iteration.
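The chunking idea can be sketched with the standard-library sqlite3 module. This is an illustration of LIMIT/OFFSET chunking only, not rapsqlite's implementation: iter_chunks is a hypothetical name, the sqlite3 calls here are blocking, and the sketch assumes the SQL has no LIMIT clause or trailing semicolon of its own.

```python
import asyncio
import sqlite3
from typing import Any, AsyncIterator, Sequence


async def iter_chunks(
    db: sqlite3.Connection,
    sql: str,
    parameters: Sequence[Any] = (),
    chunk_size: int = 64,
) -> AsyncIterator[list]:
    """Hypothetical sketch: yield rows in chunks of at most chunk_size."""
    offset = 0
    while True:
        # Only one chunk is held in memory at a time.
        chunk = db.execute(
            f"{sql} LIMIT ? OFFSET ?", [*parameters, chunk_size, offset]
        ).fetchall()
        if not chunk:
            return
        yield chunk
        offset += len(chunk)
        await asyncio.sleep(0)  # give other tasks a turn between chunks


async def demo() -> list:
    db = sqlite3.connect(":memory:")
    db.execute("CREATE TABLE test (id INTEGER PRIMARY KEY, value TEXT)")
    db.executemany("INSERT INTO test (value) VALUES (?)", [(f"v{i}",) for i in range(5)])
    sizes = []
    async for chunk in iter_chunks(db, "SELECT id, value FROM test ORDER BY id", chunk_size=2):
        sizes.append(len(chunk))
    db.close()
    return sizes


chunk_sizes = asyncio.run(demo())
```

An ORDER BY clause keeps chunk boundaries stable between queries; without one, SQLite does not guarantee a consistent row order across calls.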
- async rapsqlite.paginate(conn: Any, sql: str, parameters: Any | None = None, page_size: int = 64, offset: int = 0) list[list[Any]][source]
Fetch one page of rows from a SELECT query.
Uses LIMIT/OFFSET under the hood. For multiple pages, call with incrementing offset: paginate(conn, sql, params, 100, 0), then paginate(conn, sql, params, 100, 100), etc.
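The incrementing-offset pattern can be sketched as a loop that stops at the first empty page. The example below uses the standard-library sqlite3 module with a hypothetical fetch_page helper standing in for paginate, so it shows the offset arithmetic rather than rapsqlite itself.

```python
import sqlite3
from typing import Any, Sequence


def fetch_page(
    db: sqlite3.Connection,
    sql: str,
    parameters: Sequence[Any] = (),
    page_size: int = 64,
    offset: int = 0,
) -> list:
    """Hypothetical stand-in for paginate: one page via LIMIT/OFFSET."""
    cursor = db.execute(f"{sql} LIMIT ? OFFSET ?", [*parameters, page_size, offset])
    return [list(row) for row in cursor.fetchall()]


db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE items (id INTEGER PRIMARY KEY)")
for _ in range(10):
    db.execute("INSERT INTO items DEFAULT VALUES")

page_size = 4
offset = 0
pages = []
while True:
    page = fetch_page(db, "SELECT id FROM items ORDER BY id", page_size=page_size, offset=offset)
    if not page:
        break  # an empty page means there are no more rows
    pages.append(page)
    offset += page_size  # 0, 4, 8, ...
db.close()
```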
- async rapsqlite.analyze_query_plan(conn: Any, sql: str, parameters: Any | None = None) dict[str, Any][source]
Run EXPLAIN QUERY PLAN and return structured analysis.
- async rapsqlite.suggest_indexes(conn: Any, sql: str, parameters: Any | None = None) list[dict[str, Any]][source]
Suggest indexes when query plan indicates a full table scan.
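As background for both analyze_query_plan and suggest_indexes, the sketch below runs EXPLAIN QUERY PLAN through the standard-library sqlite3 module and checks for a full table scan before and after an index is created. The helpers plan_details and has_full_scan are hypothetical, and the structure rapsqlite actually returns is not shown here.

```python
import sqlite3
from typing import Any, Sequence


def plan_details(db: sqlite3.Connection, sql: str, parameters: Sequence[Any] = ()) -> list:
    """Hypothetical helper: return the detail text of each query plan row."""
    rows = db.execute(f"EXPLAIN QUERY PLAN {sql}", parameters).fetchall()
    return [row[3] for row in rows]  # the fourth column holds the description


def has_full_scan(details: list) -> bool:
    """A plan step starting with SCAN reads the whole table or index."""
    return any(detail.startswith("SCAN") for detail in details)


db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, email TEXT)")
query = "SELECT * FROM users WHERE email = ?"

before = plan_details(db, query, ["a@example.com"])

# Indexing the filtered column lets SQLite search instead of scan.
db.execute("CREATE INDEX idx_users_email ON users (email)")
after = plan_details(db, query, ["a@example.com"])
db.close()
```

The exact wording of plan rows varies between SQLite versions, so matching on the leading SCAN or SEARCH keyword tends to be more robust than comparing whole strings.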
- rapsqlite.in_clause_query(sql: str, values: list[Any] | tuple[Any, ...]) tuple[str, list[Any]][source]
Expand IN (?) to IN (?,?,…) for use with fetch_all.
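The expansion can be illustrated with a small pure-Python sketch. expand_in_clause is a hypothetical stand-in written for this example, not rapsqlite's implementation; rejecting an empty value list is a choice made for the sketch, and the real function's behavior in that case is not documented here.

```python
import sqlite3
from typing import Any, Sequence


def expand_in_clause(sql: str, values: Sequence[Any]) -> tuple[str, list[Any]]:
    """Hypothetical stand-in: replace the first 'IN (?)' with one '?' per value."""
    if not values:
        raise ValueError("values must not be empty")  # choice made for this sketch
    placeholders = ",".join("?" for _ in values)
    return sql.replace("IN (?)", f"IN ({placeholders})", 1), list(values)


db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
db.executemany("INSERT INTO users (name) VALUES (?)", [("Alice",), ("Bob",), ("Carol",)])

expanded_sql, params = expand_in_clause(
    "SELECT name FROM users WHERE id IN (?) ORDER BY id", (1, 3)
)
names = [row[0] for row in db.execute(expanded_sql, params)]
db.close()
```

Keeping the values as bound parameters, rather than formatting them into the SQL string, preserves protection against SQL injection.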
- rapsqlite.rows_to_dicts(rows: list[Any], columns: list[str] | tuple[str, ...] | None = None) list[dict[str, Any]][source]
Convert rows (list of list/tuple) to list of dicts using column names.
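The conversion amounts to pairing each row with the column names. The sketch below is a hypothetical pure-Python equivalent (rows_to_dicts_sketch), assuming the column names are passed explicitly; how rapsqlite resolves columns=None is not covered.

```python
from typing import Any, Sequence


def rows_to_dicts_sketch(
    rows: Sequence[Sequence[Any]], columns: Sequence[str]
) -> list[dict[str, Any]]:
    """Hypothetical equivalent: zip each row with the column names."""
    return [dict(zip(columns, row)) for row in rows]


rows = [[1, "hello"], [2, "world"]]
records = rows_to_dicts_sketch(rows, ["id", "value"])
```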
- async rapsqlite.timed_fetch_all(conn: Any, sql: str, parameters: Any | None = None, on_timing: Callable[[float, str], None] | None = None) list[list[Any]] | tuple[list[list[Any]], float][source]
Run fetch_all and record duration; optionally call on_timing(duration_secs, sql).
If on_timing is None, returns (rows, duration_secs). If on_timing is provided, calls on_timing(duration_secs, sql) and returns rows only.
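The two return shapes are easy to mix up, so here is a minimal sketch of the described behavior. timed_fetch_sketch and fake_fetch_all are hypothetical names; the fetch is simulated with asyncio.sleep instead of a real rapsqlite connection.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Optional


async def timed_fetch_sketch(
    fetch_all: Callable[[str], Awaitable[list]],
    sql: str,
    on_timing: Optional[Callable[[float, str], None]] = None,
) -> Any:
    """Hypothetical sketch of the return contract described above."""
    start = time.perf_counter()
    rows = await fetch_all(sql)
    duration_secs = time.perf_counter() - start
    if on_timing is None:
        return rows, duration_secs  # no callback: caller gets (rows, duration)
    on_timing(duration_secs, sql)   # callback: duration is reported, rows returned
    return rows


async def fake_fetch_all(sql: str) -> list:
    await asyncio.sleep(0.01)  # simulated query latency
    return [[1, "hello"]]


async def demo():
    timings = []
    with_duration = await timed_fetch_sketch(fake_fetch_all, "SELECT * FROM test")
    rows_only = await timed_fetch_sketch(
        fake_fetch_all,
        "SELECT * FROM test",
        on_timing=lambda secs, sql: timings.append((secs, sql)),
    )
    return with_duration, rows_only, timings


with_duration, rows_only, timings = asyncio.run(demo())
```

Callers that pass on_timing should not unpack the result as a tuple, since only the rows come back in that case.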
- async rapsqlite.transaction_retry(conn: Any, work: Any, max_retries: int = 5, initial_delay: float = 0.01, max_delay: float = 1.0) Any[source]
Run a transaction with retry on transient errors (e.g. SQLITE_BUSY, SQLITE_LOCKED).
work is a callable that returns an awaitable (e.g. an async function); it is invoked once per attempt so each retry runs fresh. Retries use exponential backoff.
Example
async with connect("app.db") as conn:
    async def do_work():
        await conn.execute("INSERT INTO t (x) VALUES (?)", ["a"])
    await transaction_retry(conn, do_work, max_retries=3)
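The retry behavior can be sketched as a loop that doubles the delay after each failed attempt, capped at max_delay. This is an illustration under stated assumptions, not rapsqlite's code: retry_with_backoff is a hypothetical name, transient errors are detected by a simple message check on sqlite3.OperationalError, max_retries is treated as the number of retries after the first attempt, and the begin/commit handling of a real transaction is left out.

```python
import asyncio
import sqlite3
from typing import Any, Awaitable, Callable


async def retry_with_backoff(
    work: Callable[[], Awaitable[Any]],
    max_retries: int = 5,
    initial_delay: float = 0.01,
    max_delay: float = 1.0,
) -> Any:
    """Hypothetical sketch: retry work() with exponential backoff."""
    delay = initial_delay
    for attempt in range(max_retries + 1):
        try:
            return await work()  # work() is called again on every attempt
        except sqlite3.OperationalError as exc:
            message = str(exc)
            transient = "locked" in message or "busy" in message  # heuristic
            if attempt == max_retries or not transient:
                raise
            await asyncio.sleep(delay)
            delay = min(delay * 2, max_delay)  # double the wait, up to the cap


attempts = []


async def flaky_work() -> str:
    attempts.append(len(attempts) + 1)
    if len(attempts) < 3:
        raise sqlite3.OperationalError("database is locked")
    return "committed"


result = asyncio.run(retry_with_backoff(flaky_work, max_retries=3, initial_delay=0.001))
```

Because work is a callable rather than an already-created coroutine, each attempt gets a fresh awaitable; a coroutine object cannot be awaited twice.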
- async rapsqlite.transaction_with_timeout(conn: Any, work: Any, timeout_secs: float = 30.0) Any[source]
Run a transaction with a timeout.
Wraps the transaction body in asyncio.wait_for. Raises asyncio.TimeoutError if the transaction (including work) exceeds timeout_secs.
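The timeout mechanism relies on asyncio.wait_for, which cancels the awaited work and raises asyncio.TimeoutError when the deadline passes. The sketch below shows that mechanism in isolation; run_with_timeout is a hypothetical name, and the transaction handling around the work is omitted.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def run_with_timeout(
    work: Callable[[], Awaitable[Any]], timeout_secs: float = 30.0
) -> Any:
    """Hypothetical sketch: bound the duration of work() with asyncio.wait_for."""
    return await asyncio.wait_for(work(), timeout=timeout_secs)


async def fast_work() -> str:
    return "done"


async def slow_work() -> str:
    await asyncio.sleep(0.5)  # longer than the timeout used below
    return "done"


async def demo():
    fast_result = await run_with_timeout(fast_work, timeout_secs=1.0)
    try:
        await run_with_timeout(slow_work, timeout_secs=0.01)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return fast_result, timed_out


fast_result, timed_out = asyncio.run(demo())
```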
- exception rapsqlite.DataError
Bases:
DatabaseError
- exception rapsqlite.InternalError
Bases:
DatabaseError
- exception rapsqlite.NotSupportedError
Bases:
DatabaseError