executor
Abstract Base Class for High-Performance SQLAlchemy Bulk Upsert Operations
This module provides a Template Method pattern implementation for bulk upsert operations using temporary table staging. The abstraction eliminates code duplication while allowing different upsert strategies to implement their specific logic.
Abstraction Strategy:
The UpsertExecutor class abstracts the common workflow shared by all bulk upsert
operations while requiring subclasses to implement only their strategy-specific logic.
This design provides:
Code Reuse: ~80% of upsert logic is common (transaction management, temp table lifecycle, error handling, cleanup)
Strategy Isolation: Each upsert strategy (INSERT OR IGNORE, INSERT OR REPLACE, UPSERT/MERGE) implements only its core database operations
Consistent Behavior: All strategies share the same transaction modes, error handling, and cleanup patterns
Easy Extension: Adding new upsert strategies requires minimal code
Template Method Pattern:
The execution flow follows this pattern:
UpsertExecutor.run()
├── Transaction Management (auto vs user-managed)
├── UpsertExecutor.execute_operation()
│ ├── Create temporary staging table
│ ├── Bulk load candidate data into temp table
│ ├── Subclass.apply_strategy() ← **Strategy-specific logic**
│ └── Cleanup temporary table
└── Error handling and rollback cleanup
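The flow above can be illustrated with a minimal, database-free sketch of the Template Method pattern. `SketchExecutor` and `CountingExecutor` are hypothetical names and are not part of sqlalchemy_upsert_kit. Each step records a string so the call order is visible. The real executor also cleans up the temporary table on failure, which this sketch leaves out.

```python
import abc
import dataclasses


@dataclasses.dataclass
class SketchExecutor(abc.ABC):
    """Hypothetical, simplified stand-in for UpsertExecutor (no database)."""

    values: list[dict]
    steps: list[str] = dataclasses.field(default_factory=list)

    def execute_operation(self) -> tuple[int, int]:
        # Common workflow shared by every strategy.
        self.steps.append("create temp table")
        self.steps.append("load temp data")
        result = self.apply_strategy()  # strategy-specific hook
        self.steps.append("drop temp table")
        return result

    def run(self) -> tuple[int, int]:
        # Stand-in for transaction management around the operation.
        self.steps.append("begin")
        try:
            result = self.execute_operation()
        except Exception:
            self.steps.append("rollback")
            raise
        self.steps.append("commit")
        return result

    @abc.abstractmethod
    def apply_strategy(self) -> tuple[int, int]:
        """Subclasses provide only this step."""


@dataclasses.dataclass
class CountingExecutor(SketchExecutor):
    def apply_strategy(self) -> tuple[int, int]:
        self.steps.append("apply strategy")
        return 0, len(self.values)  # (ignored, inserted)


executor = CountingExecutor(values=[{"id": 1}, {"id": 2}])
print(executor.run())  # (0, 2)
print(executor.steps)
```

Only `apply_strategy()` differs between subclasses. The ordering, transaction bracketing and error path live once in the base class.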
Supported Strategies:
INSERT OR IGNORE: Inserts only new records, ignores conflicts
INSERT OR REPLACE: Replaces entire conflicting records with new data
UPSERT/MERGE: Updates specific fields of existing records, inserts new ones
Transaction Management:
All strategies support dual-mode operation:
Auto-managed: Function creates and manages its own transaction
User-managed: Function operates within caller’s existing transaction
Database Compatibility:
This module is designed for SQLite but the abstraction pattern can be extended to other database systems by implementing database-specific executors.
Implementation Notes:
Uses dataclasses for clean state management (eliminates nonlocal variables)
Cached properties for efficient mode detection
Comprehensive error handling with proper cleanup in all scenarios
Testing infrastructure with controlled failure injection
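The first two notes and the failure-injection idea can be sketched with a plain dataclass. Fields hold the state, `functools.cached_property` computes the transaction mode once, and a boolean flag injects a failure for tests. `ModeSketch`, `is_user_managed` and this `UpsertTestError` definition are assumptions for illustration only.

This sketch validates the conn/trans pairing eagerly in `__post_init__`. The documentation attributes the `ValueError` to `run()`, so the library's check may live elsewhere.

```python
from __future__ import annotations

import dataclasses
from functools import cached_property


class UpsertTestError(Exception):
    """Stand-in for the module's testing exception (definition assumed)."""


@dataclasses.dataclass
class ModeSketch:
    # Hypothetical subset of the executor's fields.
    conn: object | None = None
    trans: object | None = None
    _raise_on_temp_table_create: bool = False
    _inserted_rows: int = 0

    def __post_init__(self) -> None:
        # conn and trans must be supplied together or not at all.
        if (self.conn is None) != (self.trans is None):
            raise ValueError("conn and trans must be provided together")

    @cached_property
    def is_user_managed(self) -> bool:
        # Evaluated on first access, then stored on the instance.
        return self.conn is not None

    def create_temp_table(self) -> None:
        # Controlled failure injection for tests.
        if self._raise_on_temp_table_create:
            raise UpsertTestError("simulated temp table creation failure")
        # ... real table creation would happen here ...


print(ModeSketch().is_user_managed)                     # False
print(ModeSketch(conn="c", trans="t").is_user_managed)  # True
try:
    ModeSketch(conn="c")  # trans missing
except ValueError as exc:
    print(exc)  # conn and trans must be provided together
```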
- class sqlalchemy_upsert_kit.sqlite.executor.UpsertExecutor(engine: Engine, table: Table, values: list[dict[str, Any]], metadata: MetaData | None, temp_table_name: str | None, conn: Connection | None, trans: Transaction | None, columns: list[str] | None, _raise_on_temp_table_create: bool, _raise_on_temp_data_insert: bool, _raise_on_target_delete: bool, _raise_on_target_insert: bool, _raise_on_temp_table_drop: bool, _raise_on_merge_update: bool, _ignored_rows: int = 0, _replaced_rows: int = 0, _updated_rows: int = 0, _inserted_rows: int = 0, _temp_table_created: bool = False)
Abstract base class for high-performance bulk upsert operations using temporary tables.
This class implements the Template Method pattern, providing a common framework for all upsert strategies while requiring subclasses to implement only their strategy-specific database operations.
Template Method Flow:
Setup: Create temporary table and validate parameters
Data Loading: Bulk insert candidate records into staging table
Strategy Execution: Subclass implements specific upsert logic
Cleanup: Remove temporary resources and handle errors
Subclass Requirements:
Subclasses must implement apply_strategy() to define their specific upsert behavior (INSERT OR IGNORE, INSERT OR REPLACE, UPSERT/MERGE, etc.).
State Management:
All operation state is managed through dataclass fields, eliminating the need for nonlocal variables and providing clean, testable state tracking.
Transaction Modes:
Auto-managed: conn=None, trans=None - Creates its own transaction
User-managed: conn=Connection, trans=Transaction - Uses the caller’s transaction
Example Usage:
import dataclasses
from sqlalchemy_upsert_kit.sqlite.executor import UpsertExecutor

@dataclasses.dataclass
class MyUpsertExecutor(UpsertExecutor):
    def apply_strategy(self, conn, trans):
        # Implement specific upsert logic here
        pass

executor = MyUpsertExecutor.new(engine, table, data)
ignored, inserted = executor.run()
- Parameters:
engine – SQLAlchemy engine for database connection
table – Target table for upsert operation
values – Candidate records for the upsert operation. Must include primary key values for conflict detection.
metadata – Optional metadata instance for temporary table isolation. If None, a new MetaData instance is created for clean separation.
temp_table_name – Optional custom name for temporary table. If None, generates unique name with timestamp to avoid conflicts.
conn – Optional database connection for user-managed transaction mode. Must be provided together with the trans parameter.
trans – Optional transaction for user-managed transaction mode. Must be provided together with the conn parameter.
_raise_on_temp_table_create – Testing only - Simulate temp table creation failure
_raise_on_temp_data_insert – Testing only - Simulate temp data insertion failure
_raise_on_target_delete – Testing only - Simulate target deletion failure
_raise_on_target_insert – Testing only - Simulate target insertion failure
_raise_on_temp_table_drop – Testing only - Simulate temp table cleanup failure
_ignored_rows – Number of records ignored during operation (INSERT OR IGNORE).
_replaced_rows – Number of records replaced during operation (INSERT OR REPLACE).
_updated_rows – Number of records updated during operation (UPSERT/MERGE).
_inserted_rows – Number of new records inserted during operation
_temp_table – Temporary staging table created during operation.
_temp_table_created – Flag tracking whether temporary table was successfully created.
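When `temp_table_name=None`, a unique name might be built from the target table name and a timestamp. The format below and the function `make_temp_table_name` are hypothetical. A random suffix is added as an extra safeguard, because a timestamp alone can collide when two operations start within the same second.

```python
import time
import uuid


def make_temp_table_name(table_name: str) -> str:
    """Hypothetical naming scheme for an auto-generated staging table."""
    # Timestamp makes names sortable and readable when debugging.
    timestamp = time.strftime("%Y%m%d%H%M%S")
    # Random suffix keeps two calls within the same second distinct.
    suffix = uuid.uuid4().hex[:8]
    return f"temp_{table_name}_{timestamp}_{suffix}"


name = make_temp_table_name("users")
print(name)
```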
- classmethod new(engine: Engine, table: Table, values: list[dict[str, Any]], metadata: MetaData | None = None, temp_table_name: str | None = None, conn: Connection | None = None, trans: Transaction | None = None, columns: list[str] | None = None, _raise_on_temp_table_create: bool = False, _raise_on_temp_data_insert: bool = False, _raise_on_target_delete: bool = False, _raise_on_target_insert: bool = False, _raise_on_temp_table_drop: bool = False, _raise_on_merge_update: bool = False)
Factory method to create UpsertExecutor instances with sensible defaults.
This method provides default values for optional parameters and creates a properly configured executor instance ready for operation.
- clone_temp_table()
Create a temporary table with the same structure as the target table.
This method clones the target table’s structure into a temporary table that will be used for staging data during the upsert operation.
- create_temp_table(conn: Connection)
Create the temporary staging table for bulk data operations.
- Parameters:
conn – Database connection to use for table creation
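Here is a sketch of structure cloning with raw SQLite. It assumes staging only needs column names, declared types and the primary key. It reads `PRAGMA table_info` and then issues a `CREATE TABLE`. The function name and table names are hypothetical.

In SQLAlchemy terms, `Table.to_metadata()` (1.4+) can copy a table definition into a separate `MetaData` under a new name. The source does not say which approach the library uses.

```python
import sqlite3


def clone_table_structure(conn: sqlite3.Connection, source: str, target: str) -> None:
    """Create `target` with the column names, declared types and primary key of `source`.

    NOT NULL, defaults, indexes and foreign keys are deliberately not copied.
    Table names are interpolated, so they must come from trusted code.
    """
    # Each row is (cid, name, type, notnull, dflt_value, pk); pk > 0 marks PK columns.
    cols = conn.execute(f'PRAGMA table_info("{source}")').fetchall()
    col_defs = [f'"{name}" {ctype}'.rstrip() for _, name, ctype, *_ in cols]
    pk_cols = [c[1] for c in sorted(cols, key=lambda c: c[5]) if c[5] > 0]
    if pk_cols:
        quoted = ", ".join(f'"{c}"' for c in pk_cols)
        col_defs.append(f"PRIMARY KEY ({quoted})")
    conn.execute(f'CREATE TABLE "{target}" ({", ".join(col_defs)})')


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT NOT NULL)")
clone_table_structure(conn, "users", "temp_users")
info = conn.execute('PRAGMA table_info("temp_users")').fetchall()
print([(row[1], row[2], row[5]) for row in info])
# [('id', 'INTEGER', 1), ('name', 'TEXT', 0)]
```

Keeping the primary key on the staging table is what makes the later JOINs on the key cheap.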
- insert_temp_data(conn: Connection)
Bulk insert candidate records into temporary staging table.
This operation loads all candidate records into the staging area, enabling efficient bulk processing through JOIN operations.
- Parameters:
conn – Database connection to use for data insertion
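The bulk load can be illustrated with `sqlite3`'s `executemany`, which accepts a list of dicts when the SQL uses named placeholders. `insert_staging_rows` is a hypothetical helper, and it assumes every record shares the same keys. With SQLAlchemy, the comparable call is typically `conn.execute(temp_table.insert(), values)`, which passes the list as multiple parameter sets.

```python
import sqlite3


def insert_staging_rows(conn: sqlite3.Connection, table: str, values: list[dict]) -> int:
    """Bulk-load a list of dicts into `table` with a single executemany call."""
    if not values:
        return 0
    columns = list(values[0])  # assume every record has the same keys
    col_list = ", ".join(f'"{c}"' for c in columns)
    placeholders = ", ".join(f":{c}" for c in columns)  # named parameters
    sql = f'INSERT INTO "{table}" ({col_list}) VALUES ({placeholders})'
    conn.executemany(sql, values)
    return len(values)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE temp_users (id INTEGER PRIMARY KEY, name TEXT)")
loaded = insert_staging_rows(
    conn, "temp_users", [{"id": 1, "name": "alice"}, {"id": 2, "name": "bob"}]
)
print(loaded, conn.execute("SELECT COUNT(*) FROM temp_users").fetchone()[0])  # 2 2
```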
- cleanup_temp_table_on_success(conn: Connection)
Clean up temporary table after successful operation.
This method removes the temporary table and cleans up metadata within the same connection context, ensuring proper cleanup in the success path.
- Parameters:
conn – Database connection to use for cleanup
- cleanup_temp_table_on_failure()
Clean up temporary table using a fresh connection after transaction failure.
This method is called when cleanup needs to happen outside the main transaction context, typically in error scenarios. It uses a fresh connection to avoid SQLite database lock issues that can occur when the original transaction has been rolled back.
Why Fresh Connection:
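SQLite DDL operations (CREATE/DROP TABLE) issued through Python's sqlite3 driver in its default (legacy) transaction mode are not wrapped in a transaction when they run before the first DML statement, so they take effect immediately. When the main transaction rolls back, the temporary table may therefore still exist, while the original connection may be locked. A fresh connection ensures the cleanup can still run.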
Error Handling:
Cleanup failures are suppressed to avoid masking the original exception that caused the operation to fail. This prevents cleanup issues from hiding the root cause of problems.
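Here is a minimal sketch of the fresh-connection, error-suppressing cleanup. It uses a file-backed SQLite database, because a second connection cannot see another connection's `:memory:` database. The helpers `drop_table_with_fresh_connection` and `failing_operation` are hypothetical.

The staging table is committed explicitly so it survives the rollback. This reproduces the situation described above without depending on driver-specific DDL behavior. `contextlib.suppress` keeps cleanup errors from replacing the original exception.

```python
import contextlib
import os
import sqlite3
import tempfile


def drop_table_with_fresh_connection(db_path: str, table_name: str) -> None:
    """Best-effort DROP on a new connection; any cleanup error is swallowed."""
    with contextlib.suppress(Exception):
        fresh = sqlite3.connect(db_path)
        try:
            fresh.execute(f'DROP TABLE IF EXISTS "{table_name}"')
            fresh.commit()
        finally:
            fresh.close()


def failing_operation(db_path: str) -> None:
    conn = sqlite3.connect(db_path)
    try:
        conn.execute("CREATE TABLE temp_users (id INTEGER)")
        conn.commit()  # the staging table now exists independently of later work
        conn.execute("INSERT INTO temp_users VALUES (1)")
        raise RuntimeError("strategy step failed")  # simulated failure
    except Exception:
        conn.rollback()
        conn.close()
        drop_table_with_fresh_connection(db_path, "temp_users")
        raise  # the original error propagates, not a cleanup error


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "demo.db")
    try:
        failing_operation(path)
    except RuntimeError as exc:
        error_message = str(exc)
    check = sqlite3.connect(path)
    leftover = check.execute(
        "SELECT COUNT(*) FROM sqlite_master WHERE name = 'temp_users'"
    ).fetchone()[0]
    check.close()

print(error_message, leftover)  # strategy step failed 0
```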
- abstract apply_strategy(conn: Connection, trans: Transaction)
Apply the upsert strategy-specific logic.
This abstract method must be implemented by subclasses to define their specific upsert behavior. The method is called after the temporary table has been created and populated with candidate data.
Implementation Requirements:
Subclasses should implement the core database operations that define their upsert strategy:
INSERT OR IGNORE: Use LEFT JOIN to insert only non-conflicting records
INSERT OR REPLACE: Delete conflicting records, then insert all records
UPSERT/MERGE: Update existing records, insert new ones
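The three strategies can be sketched as raw SQL against a staging table. This version uses the standard `sqlite3` module rather than SQLAlchemy Core. The table names, the sample data, and the choice of which count to report as "inserted" for INSERT OR REPLACE are assumptions.

The MERGE variant updates through a correlated subquery because that form works on SQLite versions older than 3.33, which introduced `UPDATE ... FROM`. The source does not state which form the library emits.

```python
import sqlite3

COLUMNS = "id INTEGER PRIMARY KEY, name TEXT, email TEXT"


def make_demo_db() -> sqlite3.Connection:
    # Target has two rows; staging has one conflicting (id 2) and one new (id 3) row.
    conn = sqlite3.connect(":memory:")
    conn.execute(f"CREATE TABLE users ({COLUMNS})")
    conn.execute(f"CREATE TABLE temp_users ({COLUMNS})")
    conn.executemany("INSERT INTO users VALUES (?, ?, ?)",
                     [(1, "alice", "a@old"), (2, "bob", "b@old")])
    conn.executemany("INSERT INTO temp_users VALUES (?, ?, ?)",
                     [(2, "bobby", "b@new"), (3, "carol", "c@new")])
    return conn


# Insert staged rows whose primary key is not yet in the target (LEFT JOIN).
NEW_ROWS = ("INSERT INTO users SELECT t.* FROM temp_users t "
            "LEFT JOIN users u ON u.id = t.id WHERE u.id IS NULL")


def insert_or_ignore(conn):
    inserted = conn.execute(NEW_ROWS).rowcount
    staged = conn.execute("SELECT COUNT(*) FROM temp_users").fetchone()[0]
    return staged - inserted, inserted  # (ignored, inserted)


def insert_or_replace(conn):
    # Remove every conflicting target row, then insert all staged rows.
    replaced = conn.execute(
        "DELETE FROM users WHERE id IN (SELECT id FROM temp_users)").rowcount
    total = conn.execute("INSERT INTO users SELECT * FROM temp_users").rowcount
    return replaced, total - replaced  # (replaced, newly inserted)


def merge(conn, update_columns=("email",)):
    # Overwrite only the chosen (trusted) columns of existing rows.
    assignments = ", ".join(
        f"{c} = (SELECT t.{c} FROM temp_users t WHERE t.id = users.id)"
        for c in update_columns)
    updated = conn.execute(
        f"UPDATE users SET {assignments} "
        "WHERE id IN (SELECT id FROM temp_users)").rowcount
    inserted = conn.execute(NEW_ROWS).rowcount
    return updated, inserted  # (updated, inserted)


for strategy in (insert_or_ignore, insert_or_replace, merge):
    conn = make_demo_db()
    counts = strategy(conn)
    row2 = conn.execute("SELECT name, email FROM users WHERE id = 2").fetchone()
    print(strategy.__name__, counts, row2)
# insert_or_ignore (1, 1) ('bob', 'b@old')
# insert_or_replace (1, 1) ('bobby', 'b@new')
# merge (1, 1) ('bob', 'b@new')
```

Row 2 makes the difference visible. IGNORE keeps it untouched, REPLACE swaps the whole record, and MERGE changes only `email`.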
State Management:
Implementations should update the appropriate result counters:
self._ignored_rows - Records ignored (INSERT OR IGNORE)
self._replaced_rows - Records replaced (INSERT OR REPLACE)
self._updated_rows - Records updated (UPSERT/MERGE)
self._inserted_rows - New records inserted
Error Handling:
Implementations can use testing flags for controlled failure simulation:
self._raise_on_target_delete - Simulate deletion failures
self._raise_on_target_insert - Simulate insertion failures
- Parameters:
conn – Database connection within active transaction
trans – Active transaction context
- Raises:
UpsertTestError – When testing flags are enabled
NotImplementedError – If subclass doesn’t implement this method
Example Implementation:
import sqlalchemy as sa

def apply_strategy(self, conn, trans):
    # INSERT OR IGNORE strategy
    stmt = self.table.insert().from_select(
        list(self._temp_table.columns.keys()),
        sa.select(self._temp_table).select_from(
            self._temp_table.outerjoin(self.table, ...)
        ).where(self.table.c[self.pk_name].is_(None))
    )
    result = conn.execute(stmt)
    self._inserted_rows = result.rowcount or 0
    self._ignored_rows = len(self.values) - self._inserted_rows
- execute_operation(conn: Connection, trans: Transaction)
Execute the complete upsert operation within the provided transaction context.
This method implements the Template Method pattern, orchestrating the common workflow while delegating strategy-specific logic to subclasses. It operates within either a user-managed or auto-managed transaction depending on how the executor was configured.
Execution Flow:
Create Staging: Create temporary table for bulk data processing
Load Data: Bulk insert all candidate records into staging area
Apply Strategy: Execute subclass-specific upsert logic
Cleanup: Remove temporary resources
Error Handling:
If any step fails, the method re-raises the original exception. Cleanup of temporary tables is handled by the caller based on transaction mode (auto vs user-managed).
Performance Notes:
The temporary table approach provides significant performance benefits over row-by-row operations by enabling efficient bulk SQL operations and leveraging database JOIN optimizations.
- Parameters:
conn – Database connection within active transaction
trans – Active transaction context
- Returns:
Tuple of operation results (varies by strategy)
- Raises:
UpsertTestError – When testing flags are enabled
Exception – Any database or application errors during operation
- run()
Execute the complete upsert operation with appropriate transaction management.
This is the main entry point for upsert operations. It handles both auto-managed and user-managed transaction modes, ensuring proper error handling and cleanup in all scenarios.
Transaction Modes:
Auto-managed (conn=None, trans=None): Creates and manages its own database transaction. Automatically commits on success and rolls back on failure.
User-managed (conn=Connection, trans=Transaction): Operates within the caller’s existing transaction. The caller is responsible for committing or rolling back the transaction.
Error Handling:
In both modes, temporary table cleanup is handled appropriately:
- Auto-managed: Uses a fresh connection after transaction rollback
- User-managed: Cleans up but preserves the caller’s transaction state
Usage Examples:
# Auto-managed transaction
executor = MyUpsertExecutor.new(engine, table, data)
result = executor.run()

# User-managed transaction
with engine.connect() as conn:
    with conn.begin() as trans:
        executor = MyUpsertExecutor.new(
            engine, table, data, conn=conn, trans=trans
        )
        result = executor.run()
        # Additional operations...
        # trans.commit() handled by context manager
- Returns:
Tuple of operation results (varies by strategy):
- INSERT OR IGNORE: (_ignored_rows, _inserted_rows)
- INSERT OR REPLACE: (_replaced_rows, _inserted_rows)
- UPSERT/MERGE: (_updated_rows, _inserted_rows)
- Raises:
ValueError – When transaction mode parameters are inconsistent
UpsertTestError – When testing flags are enabled
Exception – Any database or application errors during operation