API Reference

This section provides detailed documentation for all public APIs in rapsqlite.

Core Classes

Module Functions

True async SQLite — no fake async, no GIL stalls.

rapsqlite provides true async SQLite operations for Python, backed by Rust, Tokio, and sqlx. Unlike libraries that wrap blocking database calls in async syntax, rapsqlite guarantees that all database operations execute outside the Python GIL, ensuring event loops never stall under load. Supports type adapters and converters (register_adapter, register_converter) and custom aggregates and collations (create_aggregate, create_collation) per-connection (sqlite3-style).

Example

Basic usage:

import asyncio
from rapsqlite import Connection

async def main():
    async with Connection("example.db") as conn:
        await conn.execute("CREATE TABLE test (id INTEGER PRIMARY KEY, value TEXT)")
        await conn.execute("INSERT INTO test (value) VALUES ('hello')")
        rows = await conn.fetch_all("SELECT * FROM test")
        print(rows)
        # Output: [[1, 'hello']]

asyncio.run(main())

Using the connect() function (aiosqlite-compatible):

import asyncio
from rapsqlite import connect

async def main():
    async with connect("example.db") as conn:
        await conn.execute("CREATE TABLE test (id INTEGER PRIMARY KEY, value TEXT)")
        await conn.execute("INSERT INTO test (value) VALUES ('hello')")
        rows = await conn.fetch_all("SELECT * FROM test")
        print(rows)
        # Output: [[1, 'hello']]

asyncio.run(main())

Transactions:

async with Connection("example.db") as conn:
    await conn.begin()
    try:
        await conn.execute("INSERT INTO users (name) VALUES ('Alice')")
        await conn.commit()
    except Exception:
        await conn.rollback()
        raise  # re-raise so the failure is not silently swallowed

rapsqlite.ConnectionT

alias of Connection

rapsqlite.CursorT

alias of Cursor

rapsqlite.connect(path: str | PathLike[str], *, pragmas: Any = None, timeout: float = 5.0, iter_chunk_size: int = 64, idle_timeout: int | None = None, loop: Any = None, aiosqlite_compat: bool = False, pool_size: int | None = None, **kwargs: Any) -> Connection

Connect to a SQLite database.

This function matches the aiosqlite.connect() API for compatibility, allowing seamless migration from aiosqlite to rapsqlite.

Parameters:
  • path – Path to the SQLite database file. Can be ":memory:" for an in-memory database, or a file path. Can also be a URI of the form "file:path?param=value". The path is validated for security (non-empty, no null bytes).

  • pragmas – Optional dictionary of PRAGMA settings to apply on connection. These are applied when the connection pool is first created. Example: {"journal_mode": "WAL", "synchronous": "NORMAL", "foreign_keys": True}. See SQLite PRAGMA documentation for available settings.

  • timeout – How long to wait (in seconds) when the database is locked by another process/thread before raising an error. Default: 5.0 seconds. This sets SQLite’s busy_timeout PRAGMA. Set to 0.0 to disable timeout. This matches aiosqlite and sqlite3’s timeout parameter.

  • iter_chunk_size – Chunk size for iteration (e.g. fetchmany). Default 64. Stored for use with cursor iteration; aiosqlite-compatible.

  • idle_timeout – Optional seconds. When set, connections idle in the pool longer than this are closed. None (default) means no idle timeout.

  • loop – Deprecated. Event loop (ignored). Accept-only for aiosqlite compatibility.

  • aiosqlite_compat – If True, sets the default row_factory to tuple so that fetch_all, fetchone, and the cursor's fetchall/fetchone return tuples (like aiosqlite/sqlite3). Use this for a drop-in "import rapsqlite as aiosqlite" without changing code that expects tuple rows. Default: False (rows are lists).

  • pool_size – Optional max connections in the shared pool for this path. Set before first use so the pool is created with this size (e.g. for high-concurrency tests). Default None (pool uses internal minimum).

  • **kwargs – Additional arguments (currently ignored, reserved for future use)

Returns:

An async SQLite connection object that can be used as an async context manager. The connection uses lazy initialization: the actual database connection pool is created on first use.

Return type:

Connection

Example

With timeout (aiosqlite compatibility):

async with connect("example.db", timeout=10.0) as conn:
    await conn.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)")

Raises:
  • ValueError – If the database path is invalid (empty or contains null bytes)

  • OperationalError – If the database connection cannot be established (e.g., permission denied, disk full, etc.)

Example

Basic usage:

async with connect("example.db") as conn:
    await conn.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)")
    await conn.execute("INSERT INTO test DEFAULT VALUES")
    rows = await conn.fetch_all("SELECT * FROM test")
    # rows = [[1]]

In-memory database:

async with connect(":memory:") as conn:
    await conn.execute("CREATE TABLE test (id INTEGER)")
    # Database exists only for the duration of the connection

With PRAGMA settings:

async with connect("example.db", pragmas={
    "journal_mode": "WAL",
    "synchronous": "NORMAL",
    "foreign_keys": True
}) as conn:
    await conn.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)")

URI format:

async with connect("file:example.db?mode=rwc") as conn:
    await conn.execute("CREATE TABLE test (id INTEGER)")
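
The pragmas and timeout parameters described above both come down to SQLite PRAGMA statements. The following is a minimal sketch of that mapping using the standard-library sqlite3 module rather than rapsqlite itself; apply_pragmas is a hypothetical helper, and how rapsqlite applies the settings internally is not shown here.

```python
import re
import sqlite3

# Hypothetical helper (not part of rapsqlite): issue one PRAGMA per dict entry.
_SAFE_TOKEN = re.compile(r"^[A-Za-z0-9_]+$")


def apply_pragmas(conn: sqlite3.Connection, pragmas: dict) -> None:
    for name, value in pragmas.items():
        # Booleans become ON/OFF; everything else is used as its string form.
        text = ("ON" if value else "OFF") if isinstance(value, bool) else str(value)
        # PRAGMA arguments cannot be bound as parameters, so validate before formatting.
        if not (_SAFE_TOKEN.match(name) and _SAFE_TOKEN.match(text)):
            raise ValueError(f"unsafe PRAGMA setting: {name}={value!r}")
        conn.execute(f"PRAGMA {name} = {text}")


conn = sqlite3.connect(":memory:")
apply_pragmas(
    conn,
    {"synchronous": "NORMAL", "foreign_keys": True, "busy_timeout": 5000},
)

# Read the settings back to confirm they took effect.
foreign_keys = conn.execute("PRAGMA foreign_keys").fetchone()[0]
synchronous = conn.execute("PRAGMA synchronous").fetchone()[0]
busy_timeout_ms = conn.execute("PRAGMA busy_timeout").fetchone()[0]
conn.close()
```

The busy_timeout PRAGMA is expressed in milliseconds, so a timeout of 5.0 seconds corresponds to a value of 5000.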

Note

The connection object supports async context manager protocol. It’s recommended to use async with to ensure proper resource cleanup. All database operations execute outside the Python GIL, providing true async performance.

See also

Connection: For more advanced connection options including initialization hooks.

class rapsqlite.PoolMetricsGauges

Bases: TypedDict

Return type for pool_metrics_gauges(); gauge names for Prometheus/custom metrics.

rapsqlite_pool_size: int
rapsqlite_pool_num_idle: int
rapsqlite_pool_in_use: int

async rapsqlite.pool_metrics_gauges(conn: Any) -> PoolMetricsGauges

Return pool metrics as a dict of gauge names to values for Prometheus or custom metrics.

rapsqlite.execute_iter(conn: Any, sql: str, parameters: Any | None = None, chunk_size: int | None = None) -> _StreamChunksIterator

Return an async iterator that yields rows in chunks (streaming / memory-efficient).

Uses LIMIT/OFFSET under the hood so memory stays bounded by chunk_size. A single connection is used for the duration of iteration; closing the connection or cancelling the task stops iteration.
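
To illustrate the chunking idea, here is a minimal sketch of an async generator that yields rows in LIMIT/OFFSET chunks. It uses the standard-library sqlite3 module as a blocking stand-in and is not rapsqlite's implementation; iter_chunks is a hypothetical name, and the sketch assumes the SQL has no trailing LIMIT clause or semicolon.

```python
import asyncio
import sqlite3
from typing import Any, AsyncIterator, Sequence


async def iter_chunks(
    conn: sqlite3.Connection,
    sql: str,
    parameters: Sequence[Any] = (),
    chunk_size: int = 64,
) -> AsyncIterator[list]:
    """Yield lists of at most chunk_size rows until the query is exhausted."""
    offset = 0
    while True:
        rows = conn.execute(
            f"{sql} LIMIT ? OFFSET ?", (*parameters, chunk_size, offset)
        ).fetchall()
        if not rows:
            return
        yield rows
        offset += len(rows)
        # Let other tasks run between chunks.
        await asyncio.sleep(0)


async def demo() -> list:
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE n (v INTEGER)")
    conn.executemany("INSERT INTO n (v) VALUES (?)", [(i,) for i in range(5)])
    sizes = []
    async for chunk in iter_chunks(conn, "SELECT v FROM n ORDER BY v", chunk_size=2):
        sizes.append(len(chunk))
    conn.close()
    return sizes


chunk_sizes = asyncio.run(demo())
```

Only one chunk is held in memory at a time, which is the property the description above relies on.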

async rapsqlite.paginate(conn: Any, sql: str, parameters: Any | None = None, page_size: int = 64, offset: int = 0) -> list[list[Any]]

Fetch one page of rows from a SELECT query.

Uses LIMIT/OFFSET under the hood. For multiple pages, call with incrementing offset: paginate(conn, sql, params, 100, 0), then paginate(conn, sql, params, 100, 100), etc.
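
The incrementing-offset pattern can be sketched as a loop that stops on the first short or empty page. This example uses the standard-library sqlite3 module with a hypothetical fetch_page stand-in, so it runs without rapsqlite; the stopping rule is one reasonable choice, not something the library prescribes.

```python
import sqlite3


def fetch_page(conn, sql, parameters=(), page_size=64, offset=0):
    """Hypothetical stand-in for paginate(): one LIMIT/OFFSET page as lists."""
    cursor = conn.execute(f"{sql} LIMIT ? OFFSET ?", (*parameters, page_size, offset))
    return [list(row) for row in cursor]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany(
    "INSERT INTO items (name) VALUES (?)", [(f"item-{i}",) for i in range(250)]
)

page_size = 100
offset = 0
pages = []
while True:
    page = fetch_page(
        conn, "SELECT id, name FROM items ORDER BY id", (), page_size, offset
    )
    if not page:
        break
    pages.append(page)
    if len(page) < page_size:
        break  # a short page means there is nothing left to fetch
    offset += page_size

page_lengths = [len(p) for p in pages]
conn.close()
```

The query includes ORDER BY because SQL does not guarantee row order without it, and pages fetched with different offsets could otherwise overlap or skip rows.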

async rapsqlite.analyze_query_plan(conn: Any, sql: str, parameters: Any | None = None) -> dict[str, Any]

Run EXPLAIN QUERY PLAN and return structured analysis.

async rapsqlite.suggest_indexes(conn: Any, sql: str, parameters: Any | None = None) -> list[dict[str, Any]]

Suggest indexes when query plan indicates a full table scan.
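
Both helpers above build on EXPLAIN QUERY PLAN. The sketch below shows the raw signal they would have to work from, using the standard-library sqlite3 module; plan_details and has_full_scan are hypothetical names, and the structure of the dicts that rapsqlite returns is not assumed here.

```python
import sqlite3


def plan_details(conn, sql, parameters=()):
    """Return the human-readable detail column of each EXPLAIN QUERY PLAN row."""
    # Each row is (id, parent, notused, detail).
    return [row[3] for row in conn.execute(f"EXPLAIN QUERY PLAN {sql}", parameters)]


def has_full_scan(details):
    """Heuristic: SQLite reports scans as 'SCAN ...' and lookups as 'SEARCH ...'."""
    return any(detail.startswith("SCAN") for detail in details)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT)")
query = "SELECT id FROM users WHERE email = ?"

# Without an index on email, the planner has to scan the whole table.
before = plan_details(conn, query, ("a@example.com",))

# After adding an index, the same query becomes an index search.
conn.execute("CREATE INDEX idx_users_email ON users (email)")
after = plan_details(conn, query, ("a@example.com",))
conn.close()
```

The exact wording of the detail text varies between SQLite versions, so matching on the leading SCAN or SEARCH keyword is more robust than comparing whole strings.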

rapsqlite.in_clause_query(sql: str, values: list[Any] | tuple[Any, ...]) -> tuple[str, list[Any]]

Expand IN (?) to IN (?,?,…) for use with fetch_all.
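
The expansion can be sketched as a small string rewrite. expand_in_clause below is a hypothetical helper written for illustration; how in_clause_query itself handles empty value lists or multiple IN (?) occurrences is not documented above, so the sketch simply rejects both cases.

```python
import sqlite3


def expand_in_clause(sql, values):
    """Replace a single 'IN (?)' with one placeholder per value."""
    if not values:
        raise ValueError("values must not be empty")
    if sql.count("IN (?)") != 1:
        raise ValueError("expected exactly one 'IN (?)' in the SQL")
    placeholders = ",".join("?" for _ in values)
    return sql.replace("IN (?)", f"IN ({placeholders})"), list(values)


expanded_sql, expanded_params = expand_in_clause(
    "SELECT id FROM t WHERE id IN (?) ORDER BY id", (1, 3, 5)
)

# Run the expanded query with the standard-library driver to check it binds correctly.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY)")
conn.executemany("INSERT INTO t (id) VALUES (?)", [(i,) for i in range(1, 7)])
matched = [row[0] for row in conn.execute(expanded_sql, expanded_params)]
conn.close()
```

Generating placeholders instead of interpolating the values keeps the query parameterized.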

rapsqlite.rows_to_dicts(rows: list[Any], columns: list[str] | tuple[str, ...] | None = None) -> list[dict[str, Any]]

Convert rows (list of list/tuple) to list of dicts using column names.
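
The conversion amounts to zipping column names with each row. A minimal sketch follows, with a hypothetical rows_as_dicts helper and column names taken from a standard-library sqlite3 cursor; what rows_to_dicts does when columns is None is not covered here.

```python
import sqlite3


def rows_as_dicts(rows, columns):
    """Pair each row's values with the column names, in order."""
    return [dict(zip(columns, row)) for row in rows]


conn = sqlite3.connect(":memory:")
cursor = conn.execute("SELECT 1 AS id, 'hello' AS value")
# cursor.description holds one 7-tuple per column; the name is the first item.
columns = [description[0] for description in cursor.description]
records = rows_as_dicts(cursor.fetchall(), columns)
conn.close()
```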

async rapsqlite.timed_fetch_all(conn: Any, sql: str, parameters: Any | None = None, on_timing: Callable[[float, str], None] | None = None) -> list[list[Any]] | tuple[list[list[Any]], float]

Run fetch_all and record duration; optionally call on_timing(duration_secs, sql).

If on_timing is None, returns (rows, duration_secs). If on_timing is provided, calls on_timing(duration_secs, sql) and returns rows only.
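
The two return shapes are easy to mix up, so here is a minimal sketch of the same contract. FakeConn and timed_fetch are hypothetical stand-ins so the example runs without a database; only the branching on on_timing mirrors the description above.

```python
import asyncio
import time


class FakeConn:
    """Stand-in connection whose fetch_all returns a fixed result."""

    async def fetch_all(self, sql, parameters=None):
        await asyncio.sleep(0.01)  # simulate query latency
        return [[1, "hello"]]


async def timed_fetch(conn, sql, parameters=None, on_timing=None):
    start = time.perf_counter()
    rows = await conn.fetch_all(sql, parameters)
    duration_secs = time.perf_counter() - start
    if on_timing is None:
        # No callback: the caller receives the duration alongside the rows.
        return rows, duration_secs
    # Callback given: report the timing and return the rows only.
    on_timing(duration_secs, sql)
    return rows


async def demo():
    conn = FakeConn()
    sql = "SELECT * FROM test"
    rows, secs = await timed_fetch(conn, sql)
    seen = []
    rows_only = await timed_fetch(
        conn, sql, on_timing=lambda d, s: seen.append((d, s))
    )
    return rows, secs, rows_only, seen


rows, secs, rows_only, seen = asyncio.run(demo())
```

Callers that unpack the result as a pair should do so only when no callback is passed.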

async rapsqlite.transaction_retry(conn: Any, work: Any, max_retries: int = 5, initial_delay: float = 0.01, max_delay: float = 1.0) -> Any

Run a transaction with retry on transient errors (e.g. SQLITE_BUSY, SQLITE_LOCKED).

work is a callable that returns an awaitable (e.g. an async function); it is invoked once per attempt so each retry runs fresh. Retries with exponential backoff.

Example

async with connect("app.db") as conn:
    async def do_work():
        await conn.execute("INSERT INTO t (x) VALUES (?)", ["a"])

    await transaction_retry(conn, do_work, max_retries=3)
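
The retry behaviour can be sketched as a loop with a doubling, capped delay. This is an illustration under stated assumptions, not rapsqlite's implementation: retry_with_backoff and is_transient are hypothetical names, the doubling factor is assumed (the text above only says "exponential backoff"), max_retries is treated as the number of retries after the first attempt, and the begin/commit/rollback handling is omitted.

```python
import asyncio
import sqlite3


def is_transient(exc: Exception) -> bool:
    """Treat 'database is locked' / 'busy' operational errors as retryable."""
    message = str(exc).lower()
    return isinstance(exc, sqlite3.OperationalError) and (
        "locked" in message or "busy" in message
    )


async def retry_with_backoff(work, max_retries=5, initial_delay=0.01, max_delay=1.0):
    delay = initial_delay
    for attempt in range(max_retries + 1):
        try:
            # work is called again on every attempt, so each retry starts fresh.
            return await work()
        except sqlite3.OperationalError as exc:
            if attempt == max_retries or not is_transient(exc):
                raise
            await asyncio.sleep(delay)
            delay = min(delay * 2, max_delay)  # assumed doubling, capped at max_delay


calls = {"count": 0}


async def flaky_work():
    """Fail twice with a lock error, then succeed."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise sqlite3.OperationalError("database is locked")
    return "committed"


result = asyncio.run(retry_with_backoff(flaky_work, max_retries=3, initial_delay=0.001))
```

Passing a callable rather than an already-created coroutine matters here, because a coroutine object can only be awaited once.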

async rapsqlite.transaction_with_timeout(conn: Any, work: Any, timeout_secs: float = 30.0) -> Any

Run a transaction with a timeout.

Wraps the transaction body in asyncio.wait_for. Raises asyncio.TimeoutError if the transaction (including work) exceeds timeout_secs.
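
The timeout mechanism described above is plain asyncio.wait_for. The following minimal sketch shows how a caller would observe the timeout; run_with_timeout is a hypothetical stand-in that wraps only the work callable, and whatever transaction cleanup rapsqlite performs on timeout is not modelled.

```python
import asyncio


async def run_with_timeout(work, timeout_secs=30.0):
    """Await work() but give up after timeout_secs."""
    return await asyncio.wait_for(work(), timeout=timeout_secs)


async def fast_work():
    return "done"


async def slow_work():
    await asyncio.sleep(1.0)  # longer than the timeout used below
    return "done"


async def demo():
    ok = await run_with_timeout(fast_work, timeout_secs=0.5)
    try:
        await run_with_timeout(slow_work, timeout_secs=0.05)
        timed_out = False
    except asyncio.TimeoutError:
        # wait_for cancels the pending work before raising.
        timed_out = True
    return ok, timed_out


ok, timed_out = asyncio.run(demo())
```

Because wait_for cancels the wrapped task, the work callable should tolerate cancellation at any await point.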

exception rapsqlite.InterfaceError

Bases: Error

exception rapsqlite.DataError

Bases: DatabaseError

exception rapsqlite.InternalError

Bases: DatabaseError

exception rapsqlite.NotSupportedError

Bases: DatabaseError