sqlite

High-Performance SQLAlchemy Bulk Operations Module for SQLite

This module provides optimized bulk operations for SQLAlchemy using temporary tables. The methods are designed for large datasets, where staging the data and applying it with set-based statements is typically much faster than traditional row-by-row operations.

Implementation Notes

Temporary Table Strategy:

All bulk operations use temporary tables as staging areas to achieve optimal performance. Temporary tables are created with unique names to avoid conflicts in concurrent environments. Comprehensive cleanup ensures no temporary tables are left behind, even when errors occur.
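The staging pattern described above can be sketched as follows. This is a minimal illustration using the standard-library sqlite3 module, not the module's actual code: the helper names (`make_temp_table_name`, `run_with_staging_table`, `table_exists`), the name format, and the use of a regular table as the staging area are all assumptions made for the example.

```python
import sqlite3
import time
import uuid


def make_temp_table_name(prefix: str = "temp_upsert") -> str:
    """Build a staging-table name that is unlikely to collide across callers."""
    # Hypothetical scheme: millisecond timestamp plus a short random suffix.
    return f"{prefix}_{int(time.time() * 1000)}_{uuid.uuid4().hex[:8]}"


def table_exists(conn: sqlite3.Connection, name: str) -> bool:
    """Report whether a regular table with this name currently exists."""
    rows = conn.execute(
        "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?", (name,)
    ).fetchall()
    return bool(rows)


def run_with_staging_table(
    conn: sqlite3.Connection, name: str, fail: bool = False
) -> None:
    """Create a staging table, use it, and drop it whether or not an error occurs."""
    conn.execute(f'CREATE TABLE "{name}" (id INTEGER PRIMARY KEY, name TEXT)')
    try:
        conn.execute(f'INSERT INTO "{name}" (id, name) VALUES (?, ?)', (1, "Alice"))
        if fail:
            raise RuntimeError("simulated failure while the staging table exists")
    finally:
        # The finally clause runs on success and on error alike.
        conn.execute(f'DROP TABLE IF EXISTS "{name}"')
```

The `try`/`finally` block is what keeps the staging table from outliving a failed operation; the random suffix guards against two callers generating a name within the same millisecond.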

SQLite DDL Behavior:

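SQLite DDL operations (CREATE/DROP TABLE) are treated here as non-transactional: depending on how the Python sqlite3 driver handles transactions, they may take effect immediately instead of rolling back with the surrounding transaction. The cleanup logic accounts for this by using fresh connections when necessary to avoid database locks during error scenarios.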

Testing Infrastructure:

Functions include boolean parameters prefixed with _raise_on_ that are exclusively for testing purposes. These parameters inject controlled failures at specific points in the operation flow to verify error handling and cleanup behavior. These parameters should never be used in production code.
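To illustrate how such a flag can drive a cleanup test, here is a small self-contained sketch. It does not use the library: `SimulatedFailure` is a hypothetical stand-in for `UpsertTestError`, and `load_with_staging` is an invented function that merely mirrors the flag naming convention.

```python
import sqlite3


class SimulatedFailure(Exception):
    """Hypothetical stand-in for the library's UpsertTestError."""


def load_with_staging(
    conn: sqlite3.Connection,
    rows: list[int],
    _raise_on_temp_data_insert: bool = False,
) -> int:
    """Stage rows in a temp table; optionally fail right before the insert."""
    conn.execute("CREATE TEMP TABLE staging (id INTEGER PRIMARY KEY)")
    try:
        if _raise_on_temp_data_insert:
            # Injected failure: lets a test check that cleanup still happens.
            raise SimulatedFailure("injected failure before staging insert")
        cur = conn.executemany(
            "INSERT INTO staging (id) VALUES (?)", [(r,) for r in rows]
        )
        return cur.rowcount
    finally:
        conn.execute("DROP TABLE IF EXISTS staging")
```

A test would call the function with the flag set, assert that the exception is raised, and then assert that no table named `staging` remains.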

sqlalchemy_upsert_kit.sqlite.insert_or_ignore(engine: Engine, table: Table, values: list[dict[str, Any]], metadata: MetaData | None = None, temp_table_name: str | None = None, conn: Connection | None = None, trans: Transaction | None = None, _raise_on_temp_table_create: bool = False, _raise_on_temp_data_insert: bool = False, _raise_on_target_insert: bool = False, _raise_on_temp_table_drop: bool = False) → tuple[int, int]

Perform a high-performance bulk INSERT-IF-NOT-EXISTS operation using a temporary table.

This function performs conditional bulk insertion: it inserts only those records whose primary keys do not already exist in the target table. The result is equivalent to “INSERT OR IGNORE” or “INSERT … ON CONFLICT DO NOTHING”, but the work is done with set-based statements against a staging table rather than row by row.

Algorithm:

  1. Creates temporary table and loads all candidate data

  2. Uses LEFT JOIN to identify records not in target table

  3. Bulk inserts only the non-conflicting records

  4. Cleans up temporary resources
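The four steps above can be sketched at the SQL level as follows. This is an illustration using the standard-library sqlite3 module and raw SQL rather than SQLAlchemy; the table and column names (`users`, `staging`, `id`, `name`) are assumptions, and the statements the library actually emits may differ.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany(
    "INSERT INTO users (id, name) VALUES (?, ?)",
    [(1, "Alice"), (2, "Bob"), (3, "Carol")],
)

candidates = [(2, "Bob"), (4, "Charlie"), (5, "David")]

# Step 1: create the staging table and load every candidate row.
conn.execute("CREATE TEMP TABLE staging (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany("INSERT INTO staging (id, name) VALUES (?, ?)", candidates)

# Steps 2-3: the LEFT JOIN keeps staged rows with no match in the target,
# and INSERT ... SELECT writes them in a single statement.
cur = conn.execute(
    """
    INSERT INTO users (id, name)
    SELECT s.id, s.name
    FROM staging AS s
    LEFT JOIN users AS u ON u.id = s.id
    WHERE u.id IS NULL
    """
)
inserted = cur.rowcount
ignored = len(candidates) - inserted

# Step 4: drop the staging table and commit.
conn.execute("DROP TABLE staging")
conn.commit()
```

Deriving the ignored count as the staged count minus the inserted count is one way to produce the pair of numbers the function returns; whether the library computes it this way is not stated in the documentation.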

This approach is ideal for:

  • Incremental data loading where duplicates should be ignored

  • ETL processes that need idempotent behavior

  • Syncing data from external sources

Transaction Management:

This function supports both auto-managed and user-managed transaction modes. See the module-level documentation for detailed explanations of each mode.

Parameters:
  • engine – SQLAlchemy engine for database connection

  • table – Target table for conditional insertion

  • values – Records to insert if they don’t exist. Must include primary key values for conflict detection.

  • metadata – Optional metadata instance for temporary table isolation. If None, a new MetaData instance is created for clean separation.

  • temp_table_name – Optional custom name for the temporary table. If None, a unique name containing a timestamp is generated to avoid conflicts.

  • conn – Optional database connection for user-managed transaction mode. Must be provided together with trans parameter.

  • trans – Optional transaction for user-managed transaction mode. Must be provided together with conn parameter.

Returns:

Tuple of (ignored_rows, inserted_rows):

  • ignored_rows: Number of records that were not inserted (already existed)

  • inserted_rows: Number of new records successfully inserted

Raises:
  • ValueError – When conn and trans parameters are provided inconsistently (one is None while the other is not)

  • UpsertTestError – When testing flags are enabled and corresponding operations fail
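The consistency rule behind the ValueError can be expressed as a small guard. The sketch below is an assumption about how such a check might look, not the library's code; `check_transaction_args` is a hypothetical helper name.

```python
from typing import Any, Optional


def check_transaction_args(conn: Optional[Any], trans: Optional[Any]) -> bool:
    """Return True for user-managed mode, False for auto-managed mode.

    Raises ValueError when exactly one of conn and trans is provided.
    """
    if (conn is None) != (trans is None):
        raise ValueError(
            "conn and trans must be provided together or both left as None"
        )
    return conn is not None
```

Comparing the two `is None` results catches both inconsistent combinations with a single condition.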

Examples:

Auto-managed transaction (default mode):

from sqlalchemy_upsert_kit.sqlite import insert_or_ignore

# Function manages its own transaction
ignored, inserted = insert_or_ignore(engine, users_table, new_data)

User-managed transaction mode:

# Operation is part of larger transaction
with engine.connect() as conn:
    with conn.begin() as trans:
        # Other operations...
        ignored, inserted = insert_or_ignore(
            engine, users_table, new_data, conn=conn, trans=trans
        )
        # More operations...

Conflict detection example:

# Target table has records with id=1,2,3
new_data = [
    {'id': 2, 'name': 'Bob'},      # Exists - will be ignored
    {'id': 4, 'name': 'Charlie'},  # New - will be inserted
    {'id': 5, 'name': 'David'},    # New - will be inserted
]
ignored, inserted = insert_or_ignore(engine, users_table, new_data)
# Result: ignored=1, inserted=2

Note

Parameters prefixed with _raise_on_ are exclusively for testing error handling and cleanup behavior. Never use these in production code.

sqlalchemy_upsert_kit.sqlite.insert_or_replace(engine: Engine, table: Table, values: list[dict[str, Any]], metadata: MetaData | None = None, temp_table_name: str | None = None, conn: Connection | None = None, trans: Transaction | None = None, _raise_on_temp_table_create: bool = False, _raise_on_temp_data_insert: bool = False, _raise_on_target_delete: bool = False, _raise_on_target_insert: bool = False, _raise_on_temp_table_drop: bool = False) → tuple[int, int]

Perform a high-performance bulk INSERT-OR-REPLACE operation using a temporary table.

This function performs bulk upsert operations: it replaces existing records entirely with the new data and inserts records that do not yet exist. The result is equivalent to “INSERT OR REPLACE”, that is, complete record replacement, but is designed to be more efficient for large datasets.

Algorithm:

  1. Creates temporary table and loads all candidate data

  2. Uses JOIN to identify conflicting records in target table

  3. Deletes conflicting records from target table

  4. Bulk inserts all records from temporary table (both new and replacement)

  5. Cleans up temporary resources
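The five steps above can be sketched at the SQL level as follows. As before, this uses the standard-library sqlite3 module with assumed table and column names, and is not the library's implementation. Because SQLite's DELETE statement has no JOIN clause, the sketch identifies conflicting rows with an `IN` subquery; this is a substitution made for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany(
    "INSERT INTO users (id, name) VALUES (?, ?)",
    [(1, "Alice"), (2, "Bob"), (3, "Carol")],
)

candidates = [(2, "Bob Updated"), (4, "Charlie"), (5, "David")]

# Step 1: create the staging table and load every candidate row.
conn.execute("CREATE TEMP TABLE staging (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany("INSERT INTO staging (id, name) VALUES (?, ?)", candidates)

# Steps 2-3: delete target rows whose primary key also appears in staging.
cur = conn.execute("DELETE FROM users WHERE id IN (SELECT id FROM staging)")
replaced = cur.rowcount

# Step 4: insert every staged row, both replacements and new records.
cur = conn.execute("INSERT INTO users (id, name) SELECT id, name FROM staging")
inserted = cur.rowcount - replaced

# Step 5: drop the staging table and commit.
conn.execute("DROP TABLE staging")
conn.commit()
```

Since the delete and the insert run in the same transaction here, other readers never observe the target table with the conflicting rows missing.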

This approach is ideal for:

  • Full synchronization from authoritative data source

  • Complete data refresh scenarios

  • When new data should completely replace existing records

Transaction Management:

This function supports both auto-managed and user-managed transaction modes. See the module-level documentation for detailed explanations of each mode.

Parameters:
  • engine – SQLAlchemy engine for database connection

  • table – Target table for upsert operation

  • values – Records to insert or replace. Must include primary key values for conflict detection.

  • metadata – Optional metadata instance for temporary table isolation. If None, a new MetaData instance is created for clean separation.

  • temp_table_name – Optional custom name for the temporary table. If None, a unique name containing a timestamp is generated to avoid conflicts.

  • conn – Optional database connection for user-managed transaction mode. Must be provided together with trans parameter.

  • trans – Optional transaction for user-managed transaction mode. Must be provided together with conn parameter.

Returns:

Tuple of (replaced_rows, inserted_rows):

  • replaced_rows: Number of existing records that were replaced

  • inserted_rows: Number of new records that were inserted

Raises:
  • ValueError – When conn and trans parameters are provided inconsistently (one is None while the other is not)

  • UpsertTestError – When testing flags are enabled and corresponding operations fail

Examples:

Auto-managed transaction (default mode):

from sqlalchemy_upsert_kit.sqlite import insert_or_replace

# Function manages its own transaction
updated, inserted = insert_or_replace(engine, users_table, new_data)

User-managed transaction mode:

# Operation is part of larger transaction
with engine.connect() as conn:
    with conn.begin() as trans:
        # Other operations...
        updated, inserted = insert_or_replace(
            engine, users_table, new_data, conn=conn, trans=trans
        )
        # More operations...

Complete replacement example:

# Target table has records with id=1,2,3
new_data = [
    {'id': 2, 'name': 'Bob Updated'},    # Exists - will be replaced
    {'id': 4, 'name': 'Charlie'},        # New - will be inserted
    {'id': 5, 'name': 'David'},          # New - will be inserted
]
updated, inserted = insert_or_replace(engine, users_table, new_data)
# Result: updated=1, inserted=2

Performance Comparison:

  • Traditional row-by-row approach (100K records): ~300 seconds

  • This method (100K records): ~15 seconds

  • Performance gain: ~20x faster

Note

Parameters prefixed with _raise_on_ are exclusively for testing error handling and cleanup behavior. Never use these in production code.

Warning

This operation completely replaces existing records. All fields of conflicting records (including historical fields like timestamps) will be overwritten with new data.
