executor

Abstract Base Class for High-Performance SQLAlchemy Bulk Upsert Operations

This module provides a Template Method pattern implementation for bulk upsert operations using temporary table staging. The abstraction eliminates code duplication while allowing different upsert strategies to implement their specific logic.

Abstraction Strategy:

The UpsertExecutor class abstracts the common workflow shared by all bulk upsert operations while requiring subclasses to implement only their strategy-specific logic. This design provides:

  1. Code Reuse: ~80% of upsert logic is common (transaction management, temp table lifecycle, error handling, cleanup)

  2. Strategy Isolation: Each upsert strategy (INSERT OR IGNORE, INSERT OR REPLACE, UPSERT/MERGE) implements only its core database operations

  3. Consistent Behavior: All strategies share the same transaction modes, error handling, and cleanup patterns

  4. Easy Extension: Adding new upsert strategies requires minimal code

Template Method Pattern:

The execution flow follows this pattern:

UpsertExecutor.run()
├── Transaction Management (auto vs user-managed)
├── UpsertExecutor.execute_operation()
│   ├── Create temporary staging table
│   ├── Bulk load candidate data into temp table
│   ├── Subclass.apply_strategy() ← **Strategy-specific logic**
│   └── Cleanup temporary table
└── Error handling and rollback cleanup
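
The flow above can be illustrated with a minimal, library-independent sketch of the Template Method pattern. The class names (`StagedOperation`, `IgnoreStrategy`, `FailingStrategy`) and the step labels are hypothetical and only record the order of steps; they are not the module's actual implementation.

```python
from abc import ABC, abstractmethod


class StagedOperation(ABC):
    """Hypothetical stand-in for the executor; records each step it runs."""

    def __init__(self):
        self.steps = []

    def run(self):
        # Fixed workflow: only apply_strategy() varies between subclasses.
        self.steps.append("create_temp_table")
        try:
            self.steps.append("insert_temp_data")
            self.apply_strategy()
            self.steps.append("cleanup_on_success")
        except Exception:
            # Failure path: clean up, then re-raise the original error.
            self.steps.append("cleanup_on_failure")
            raise
        return self.steps

    @abstractmethod
    def apply_strategy(self):
        """Strategy-specific step supplied by each subclass."""


class IgnoreStrategy(StagedOperation):
    def apply_strategy(self):
        self.steps.append("insert_or_ignore")


class FailingStrategy(StagedOperation):
    def apply_strategy(self):
        raise RuntimeError("simulated failure")
```

Because `apply_strategy()` is abstract, the base class cannot be instantiated directly; each strategy supplies only that one step.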

Supported Strategies:

  • INSERT OR IGNORE: Inserts only new records, ignores conflicts

  • INSERT OR REPLACE: Replaces entire conflicting records with new data

  • UPSERT/MERGE: Updates specific fields of existing records, inserts new ones

Transaction Management:

All strategies support two transaction modes:

  • Auto-managed: The executor creates and manages its own transaction

  • User-managed: The executor operates within the caller’s existing transaction

Database Compatibility:

This module is designed for SQLite, but the abstraction pattern can be extended to other database systems by implementing database-specific executors.

Implementation Notes:

  • Uses dataclasses for clean state management (eliminates nonlocal variables)

  • Cached properties for efficient mode detection

  • Comprehensive error handling with proper cleanup in all scenarios

  • Testing infrastructure with controlled failure injection

class sqlalchemy_upsert_kit.sqlite.executor.UpsertExecutor(engine: Engine, table: Table, values: list[dict[str, Any]], metadata: MetaData | None, temp_table_name: str | None, conn: Connection | None, trans: Transaction | None, columns: list[str] | None, _raise_on_temp_table_create: bool, _raise_on_temp_data_insert: bool, _raise_on_target_delete: bool, _raise_on_target_insert: bool, _raise_on_temp_table_drop: bool, _raise_on_merge_update: bool, _ignored_rows: int = 0, _replaced_rows: int = 0, _updated_rows: int = 0, _inserted_rows: int = 0, _temp_table_created: bool = False)[source]

Abstract base class for high-performance bulk upsert operations using temporary tables.

This class implements the Template Method pattern, providing a common framework for all upsert strategies while requiring subclasses to implement only their strategy-specific database operations.

Template Method Flow:

  1. Setup: Create temporary table and validate parameters

  2. Data Loading: Bulk insert candidate records into staging table

  3. Strategy Execution: Subclass implements specific upsert logic

  4. Cleanup: Remove temporary resources and handle errors

Subclass Requirements:

Subclasses must implement apply_strategy() to define their specific upsert behavior (INSERT OR IGNORE, INSERT OR REPLACE, UPSERT/MERGE, etc.).

State Management:

All operation state is managed through dataclass fields, eliminating the need for nonlocal variables and providing clean, testable state tracking.

Transaction Modes:

  • Auto-managed: conn=None, trans=None - Creates own transaction

  • User-managed: conn=Connection, trans=Transaction - Uses caller’s transaction
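
A minimal sketch of how the two modes could be detected, assuming that `conn` and `trans` must be supplied together and that an inconsistent pair raises `ValueError`. `TransactionMode` is a hypothetical helper, and plain objects stand in for the SQLAlchemy `Connection` and `Transaction`.

```python
import dataclasses
from functools import cached_property
from typing import Any, Optional


@dataclasses.dataclass
class TransactionMode:
    """Hypothetical helper; conn and trans stand in for SQLAlchemy objects."""

    conn: Optional[Any] = None
    trans: Optional[Any] = None

    def __post_init__(self):
        # Both must be given together, or both omitted.
        if (self.conn is None) != (self.trans is None):
            raise ValueError("conn and trans must be provided together")

    @cached_property
    def user_managed(self) -> bool:
        # Computed once, then cached on the instance.
        return self.conn is not None and self.trans is not None

    @cached_property
    def auto_managed(self) -> bool:
        return not self.user_managed
```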

Example Usage:

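import dataclasses

from sqlalchemy_upsert_kit.sqlite.executor import UpsertExecutor

@dataclasses.dataclass
class MyUpsertExecutor(UpsertExecutor):
    def apply_strategy(self, conn, trans):
        # Implement the strategy-specific upsert logic here
        pass

# engine, table, and data are assumed to be defined by the caller
executor = MyUpsertExecutor.new(engine, table, data)
ignored, inserted = executor.run()

Parameters: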
  • engine – SQLAlchemy engine for database connection

  • table – Target table for upsert operation

  • values – Records to insert or replace. Must include primary key values for conflict detection.

  • metadata – Optional metadata instance for temporary table isolation. If None, a new MetaData instance is created for clean separation.

  • temp_table_name – Optional custom name for temporary table. If None, generates unique name with timestamp to avoid conflicts.

  • conn – Optional database connection for user-managed transaction mode. Must be provided together with trans parameter.

  • trans – Optional transaction for user-managed transaction mode. Must be provided together with conn parameter.

  • _raise_on_temp_table_create – Testing only - Simulate temp table creation failure

  • _raise_on_temp_data_insert – Testing only - Simulate temp data insertion failure

  • _raise_on_target_delete – Testing only - Simulate target deletion failure

  • _raise_on_target_insert – Testing only - Simulate target insertion failure

  • _raise_on_temp_table_drop – Testing only - Simulate temp table cleanup failure

  • _ignored_rows – Number of records ignored during operation (INSERT OR IGNORE).

  • _replaced_rows – Number of records replaced during operation (INSERT OR REPLACE).

  • _updated_rows – Number of records updated during operation (UPSERT/MERGE).

  • _inserted_rows – Number of new records inserted during operation

  • _temp_table – Temporary staging table created during operation.

  • _temp_table_created – Flag tracking whether temporary table was successfully created.

classmethod new(engine: Engine, table: Table, values: list[dict[str, Any]], metadata: MetaData | None = None, temp_table_name: str | None = None, conn: Connection | None = None, trans: Transaction | None = None, columns: list[str] | None = None, _raise_on_temp_table_create: bool = False, _raise_on_temp_data_insert: bool = False, _raise_on_target_delete: bool = False, _raise_on_target_insert: bool = False, _raise_on_temp_table_drop: bool = False, _raise_on_merge_update: bool = False)[source]

Factory method to create UpsertExecutor instances with sensible defaults.

This method provides default values for optional parameters and creates a properly configured executor instance ready for operation.
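
As an illustration of the factory idea, the sketch below uses a reduced, hypothetical dataclass (`ExecutorConfig`) whose `new()` classmethod fills in defaults. The `temp_upsert_` prefix and the nanosecond timestamp are assumptions made for the example; the actual naming scheme used by the library is not documented here.

```python
import dataclasses
import time
from typing import Any, Optional


@dataclasses.dataclass
class ExecutorConfig:
    """Hypothetical, reduced stand-in for the executor's dataclass fields."""

    values: list
    temp_table_name: str
    raise_on_temp_table_create: bool
    inserted_rows: int = 0  # result counter, updated during the operation

    @classmethod
    def new(
        cls,
        values: list,
        temp_table_name: Optional[str] = None,
        raise_on_temp_table_create: bool = False,
    ) -> "ExecutorConfig":
        if temp_table_name is None:
            # Assumed naming scheme: fixed prefix plus a timestamp.
            temp_table_name = f"temp_upsert_{time.time_ns()}"
        return cls(
            values=values,
            temp_table_name=temp_table_name,
            raise_on_temp_table_create=raise_on_temp_table_create,
        )
```

Callers pass only what they need, while the dataclass constructor itself keeps every field explicit.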

property user_managed: bool

Check if executor is operating in user-managed transaction mode.

property auto_managed: bool

Check if executor is operating in auto-managed transaction mode.

property pk_name: str

Get the primary key column name from the target table.

clone_temp_table()[source]

Create a temporary table with the same structure as the target table.

This method clones the target table’s structure into a temporary table that will be used for staging data during the upsert operation.

create_temp_table(conn: Connection)[source]

Create the temporary staging table for bulk data operations.

Parameters:

conn – Database connection to use for table creation

insert_temp_data(conn: Connection)[source]

Bulk insert candidate records into temporary staging table.

This operation loads all candidate records into the staging area, enabling efficient bulk processing through JOIN operations.

Parameters:

conn – Database connection to use for data insertion
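
The staging step can be sketched with the standard-library `sqlite3` module instead of SQLAlchemy. The table names (`users`, `users_staging`) are hypothetical, and `CREATE TEMP TABLE ... AS SELECT ... WHERE 0` is just one simple way to copy a column layout (it does not copy constraints); it is not necessarily how `clone_temp_table()` works.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")

# Clone the column layout only; "WHERE 0" copies no rows.
conn.execute("CREATE TEMP TABLE users_staging AS SELECT * FROM users WHERE 0")

# Bulk load all candidate records in a single executemany() call.
candidates = [{"id": 1, "name": "alice"}, {"id": 2, "name": "bob"}]
conn.executemany(
    "INSERT INTO users_staging (id, name) VALUES (:id, :name)", candidates
)

staged = conn.execute("SELECT COUNT(*) FROM users_staging").fetchone()[0]
columns = [d[0] for d in conn.execute("SELECT * FROM users_staging").description]
```

Once the candidates sit in a table of their own, the strategy step can compare them against the target with a single set-based statement rather than one query per row.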

cleanup_temp_table_on_success(conn: Connection)[source]

Clean up temporary table after successful operation.

This method removes the temporary table and cleans up metadata within the same connection context, ensuring proper cleanup in the success path.

Parameters:

conn – Database connection to use for cleanup

cleanup_temp_table_on_failure()[source]

Clean up temporary table using a fresh connection after transaction failure.

This method is called when cleanup needs to happen outside the main transaction context, typically in error scenarios. It uses a fresh connection to avoid SQLite database lock issues that can occur when the original transaction has been rolled back.

Why Fresh Connection:

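SQLite itself supports transactional DDL, but Python's sqlite3 driver in its default transaction mode does not open a transaction before DDL statements (CREATE/DROP TABLE), so they can take effect immediately, outside the surrounding transaction. When the main transaction rolls back, the temporary table may therefore still exist while the original connection is locked. A fresh connection ensures the table can still be cleaned up.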

Error Handling:

Cleanup failures are suppressed to avoid masking the original exception that caused the operation to fail. This prevents cleanup issues from hiding the root cause of problems.
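
A rough sketch of this cleanup path, written against the standard-library `sqlite3` module. It assumes the staging table is an ordinary table in a file-based database, since a SQLite `TEMP` table is private to the connection that created it. The helper name `drop_staging_quietly` and the table name are hypothetical.

```python
import contextlib
import os
import sqlite3
import tempfile


def drop_staging_quietly(db_path, table_name):
    """Drop a leftover staging table on a fresh connection, ignoring errors."""
    # Suppress everything so a cleanup problem never hides the original error.
    with contextlib.suppress(Exception):
        fresh = sqlite3.connect(db_path)
        try:
            fresh.execute(f'DROP TABLE IF EXISTS "{table_name}"')
            fresh.commit()
        finally:
            fresh.close()


db_path = os.path.join(tempfile.mkdtemp(), "demo.db")
main = sqlite3.connect(db_path)
main.execute("CREATE TABLE staging_demo (id INTEGER PRIMARY KEY)")
main.commit()

original_error = None
try:
    main.execute("INSERT INTO staging_demo VALUES (1)")
    raise RuntimeError("simulated failure in the strategy step")
except RuntimeError as exc:
    original_error = exc
    main.rollback()  # release locks before the fresh connection runs
    drop_staging_quietly(db_path, "staging_demo")

leftover = main.execute(
    "SELECT name FROM sqlite_master WHERE name = 'staging_demo'"
).fetchall()
main.close()
```

The original exception is kept in `original_error`; in a real executor it would be re-raised after cleanup.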

abstract apply_strategy(conn: Connection, trans: Transaction)[source]

Apply the upsert strategy-specific logic.

This abstract method must be implemented by subclasses to define their specific upsert behavior. The method is called after the temporary table has been created and populated with candidate data.

Implementation Requirements:

Subclasses should implement the core database operations that define their upsert strategy:

  • INSERT OR IGNORE: Use LEFT JOIN to insert only non-conflicting records

  • INSERT OR REPLACE: Delete conflicting records, then insert all records

  • UPSERT/MERGE: Update existing records, insert new ones
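
The three strategies can be sketched in plain SQL through the standard-library `sqlite3` module. The tables `t` (target) and `s` (staging), the helper names, and the returned tuples are hypothetical and chosen to mirror the counters described below; the library's own SQLAlchemy statements may differ.

```python
import sqlite3


def setup():
    """Build a target table t and a staging table s with one overlapping id."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT)")
    conn.execute("CREATE TEMP TABLE s (id INTEGER PRIMARY KEY, name TEXT)")
    conn.executemany("INSERT INTO t VALUES (?, ?)", [(1, "old-1"), (2, "old-2")])
    conn.executemany("INSERT INTO s VALUES (?, ?)", [(2, "new-2"), (3, "new-3")])
    return conn


# Insert only staged rows whose id has no match in the target (LEFT JOIN + IS NULL).
INSERT_MISSING = (
    "INSERT INTO t (id, name) "
    "SELECT s.id, s.name FROM s LEFT JOIN t ON t.id = s.id WHERE t.id IS NULL"
)


def insert_or_ignore(conn):
    inserted = conn.execute(INSERT_MISSING).rowcount
    staged = conn.execute("SELECT COUNT(*) FROM s").fetchone()[0]
    return staged - inserted, inserted  # (ignored, inserted)


def insert_or_replace(conn):
    # Delete conflicting rows first, then insert every staged row.
    replaced = conn.execute("DELETE FROM t WHERE id IN (SELECT id FROM s)").rowcount
    total = conn.execute("INSERT INTO t (id, name) SELECT id, name FROM s").rowcount
    return replaced, total - replaced  # (replaced, inserted)


def merge(conn):
    # Update rows that already exist, then insert the rest.
    updated = conn.execute(
        "UPDATE t SET name = (SELECT s.name FROM s WHERE s.id = t.id) "
        "WHERE id IN (SELECT id FROM s)"
    ).rowcount
    inserted = conn.execute(INSERT_MISSING).rowcount
    return updated, inserted  # (updated, inserted)
```

With the sample data, each strategy touches one existing row (id 2) and adds one new row (id 3); only the treatment of the existing row differs.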

State Management:

Implementations should update the appropriate result counters:

  • self._ignored_rows - Records ignored (INSERT OR IGNORE)

  • self._replaced_rows - Records replaced (INSERT OR REPLACE)

  • self._updated_rows - Records updated (UPSERT/MERGE)

  • self._inserted_rows - New records inserted

Error Handling:

Implementations can use testing flags for controlled failure simulation:

  • self._raise_on_target_delete - Simulate deletion failures

  • self._raise_on_target_insert - Simulate insertion failures

Parameters:
  • conn – Database connection within active transaction

  • trans – Active transaction context

Raises:

Example Implementation:

import sqlalchemy as sa

def apply_strategy(self, conn, trans):
    # INSERT OR IGNORE strategy: insert only staged rows with no match in the target
    stmt = self.table.insert().from_select(
        list(self._temp_table.columns.keys()),
        sa.select(self._temp_table).select_from(
            # LEFT JOIN staging to target; the join condition is elided here
            self._temp_table.outerjoin(self.table, ...)
        ).where(self.table.c[self.pk_name].is_(None))
    )
    result = conn.execute(stmt)
    # Update the result counters read by the caller
    self._inserted_rows = result.rowcount or 0
    self._ignored_rows = len(self.values) - self._inserted_rows

execute_operation(conn: Connection, trans: Transaction)[source]

Execute the complete upsert operation within the provided transaction context.

This method implements the Template Method pattern, orchestrating the common workflow while delegating strategy-specific logic to subclasses. It operates within either a user-managed or auto-managed transaction depending on how the executor was configured.

Execution Flow:

  1. Create Staging: Create temporary table for bulk data processing

  2. Load Data: Bulk insert all candidate records into staging area

  3. Apply Strategy: Execute subclass-specific upsert logic

  4. Cleanup: Remove temporary resources

Error Handling:

If any step fails, the method re-raises the original exception. Cleanup of temporary tables is handled by the caller based on transaction mode (auto vs user-managed).

Performance Notes:

The temporary table approach provides significant performance benefits over row-by-row operations by enabling efficient bulk SQL operations and leveraging database JOIN optimizations.

Parameters:
  • conn – Database connection within active transaction

  • trans – Active transaction context

Returns:

Tuple of operation results (varies by strategy)

Raises:

run()[source]

Execute the complete upsert operation with appropriate transaction management.

This is the main entry point for upsert operations. It handles both auto-managed and user-managed transaction modes, ensuring proper error handling and cleanup in all scenarios.

Transaction Modes:

  • Auto-managed (conn=None, trans=None): Creates and manages its own database transaction. Automatically commits on success and rolls back on failure.

  • User-managed (conn=Connection, trans=Transaction): Operates within the caller’s existing transaction. The caller is responsible for committing or rolling back the transaction.

Error Handling:

In both modes, temporary table cleanup is handled appropriately:

  • Auto-managed: Uses fresh connection after transaction rollback

  • User-managed: Cleans up but preserves caller’s transaction state
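
The split of responsibilities between the two modes can be sketched as follows, using the standard-library `sqlite3` module. The `run` function here is a hypothetical, simplified runner that takes only a connection (the real executor also takes a transaction object), and `operation` is any callable that accepts a connection.

```python
import sqlite3


def run(operation, db_path=":memory:", conn=None):
    """Hypothetical runner: `operation` is any callable taking a connection."""
    if conn is not None:
        # User-managed: reuse the caller's connection and leave the
        # commit/rollback decision to the caller.
        return operation(conn)

    # Auto-managed: open a connection, commit on success, roll back on failure.
    own = sqlite3.connect(db_path)
    try:
        result = operation(own)
        own.commit()
        return result
    except Exception:
        own.rollback()
        raise
    finally:
        own.close()


def add_row(c):
    return c.execute("INSERT INTO t VALUES (1)").rowcount


# User-managed usage: the insert stays pending until the caller decides.
caller = sqlite3.connect(":memory:")
caller.execute("CREATE TABLE t (id INTEGER PRIMARY KEY)")
rows_added = run(add_row, conn=caller)
still_open = caller.in_transaction
caller.rollback()
rows_after_rollback = caller.execute("SELECT COUNT(*) FROM t").fetchone()[0]
```

In the user-managed case the runner never commits, so the caller's rollback discards the inserted row.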

Usage Examples:

# Auto-managed transaction
executor = MyUpsertExecutor.new(engine, table, data)
result = executor.run()

# User-managed transaction
with engine.connect() as conn:
    with conn.begin() as trans:
        executor = MyUpsertExecutor.new(
            engine, table, data, conn=conn, trans=trans
        )
        result = executor.run()
        # Additional operations...
        # trans.commit() handled by context manager

Returns:

Tuple of operation results (varies by strategy):

  • INSERT OR IGNORE: (_ignored_rows, _inserted_rows)

  • INSERT OR REPLACE: (_replaced_rows, _inserted_rows)

  • UPSERT/MERGE: (_updated_rows, _inserted_rows)

Raises:
  • ValueError – When transaction mode parameters are inconsistent

  • UpsertTestError – When testing flags are enabled

  • Exception – Any database or application errors during operation