insert_or_replace
- class sqlalchemy_upsert_kit.sqlite.insert_or_replace.InsertOrReplaceExcutor(engine: sqlalchemy.engine.base.Engine, table: sqlalchemy.sql.schema.Table, values: list[dict[str, Any]], metadata: sqlalchemy.sql.schema.MetaData | None, temp_table_name: str | None, conn: sqlalchemy.engine.base.Connection | None, trans: sqlalchemy.engine.base.Transaction | None, columns: list[str] | None, _raise_on_temp_table_create: bool, _raise_on_temp_data_insert: bool, _raise_on_target_delete: bool, _raise_on_target_insert: bool, _raise_on_temp_table_drop: bool, _raise_on_merge_update: bool, _ignored_rows: int = 0, _replaced_rows: int = 0, _updated_rows: int = 0, _inserted_rows: int = 0, _temp_table_created: bool = False)
- apply_strategy(conn: Connection, trans: Transaction)
Apply the upsert strategy-specific logic.
This abstract method must be implemented by subclasses to define their specific upsert behavior. The method is called after the temporary table has been created and populated with candidate data.
Implementation Requirements:
Subclasses should implement the core database operations that define their upsert strategy:
INSERT OR IGNORE: Use LEFT JOIN to insert only non-conflicting records
INSERT OR REPLACE: Delete conflicting records, then insert all records
UPSERT/MERGE: Update existing records, insert new ones
State Management:
Implementations should update the appropriate result counters:
self._ignored_rows - Records ignored (INSERT OR IGNORE)
self._replaced_rows - Records replaced (INSERT OR REPLACE)
self._updated_rows - Records updated (UPSERT/MERGE)
self._inserted_rows - New records inserted
Error Handling:
Implementations can use testing flags for controlled failure simulation:
self._raise_on_target_delete - Simulate deletion failures
self._raise_on_target_insert - Simulate insertion failures
- Parameters:
conn – Database connection within active transaction
trans – Active transaction context
- Raises:
UpsertTestError – When testing flags are enabled
NotImplementedError – If subclass doesn’t implement this method
Example Implementation:
def apply_strategy(self, conn, trans):
    # INSERT OR IGNORE strategy
    stmt = self.table.insert().from_select(
        list(self._temp_table.columns.keys()),
        sa.select(self._temp_table).select_from(
            self._temp_table.outerjoin(self.table, ...)
        ).where(self.table.c[self.pk_name].is_(None))
    )
    result = conn.execute(stmt)
    self._inserted_rows = result.rowcount or 0
    self._ignored_rows = len(self.values) - self._inserted_rows
- sqlalchemy_upsert_kit.sqlite.insert_or_replace.insert_or_replace(engine: Engine, table: Table, values: list[dict[str, Any]], metadata: MetaData | None = None, temp_table_name: str | None = None, conn: Connection | None = None, trans: Transaction | None = None, _raise_on_temp_table_create: bool = False, _raise_on_temp_data_insert: bool = False, _raise_on_target_delete: bool = False, _raise_on_target_insert: bool = False, _raise_on_temp_table_drop: bool = False) tuple[int, int]
Perform a high-performance bulk INSERT-OR-REPLACE operation using a temporary table.
This function performs bulk upsert operations: replaces existing records entirely with new data and inserts records that don’t exist. This is equivalent to “INSERT OR REPLACE” or complete record replacement but works more efficiently for large datasets.
Algorithm:
Creates temporary table and loads all candidate data
Uses JOIN to identify conflicting records in target table
Deletes conflicting records from target table
Bulk inserts all records from temporary table (both new and replacement)
Cleans up temporary resources
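The five steps above can be sketched with the standard-library sqlite3 module. This is an illustration of the general technique, not the library's implementation: the users table, the tmp_users name, and the insert_or_replace_sketch helper are hypothetical, and conflicts are found with an IN subquery rather than the JOIN the algorithm describes.

```python
import sqlite3


def insert_or_replace_sketch(conn, rows):
    """Replace-or-insert `rows` into `users` via a temporary staging table.

    Hypothetical helper for illustration only; not part of sqlalchemy_upsert_kit.
    """
    cur = conn.cursor()
    # Step 1: create the temporary table and load all candidate rows.
    cur.execute("CREATE TEMP TABLE tmp_users (id INTEGER PRIMARY KEY, name TEXT)")
    cur.executemany("INSERT INTO tmp_users (id, name) VALUES (:id, :name)", rows)
    # Steps 2-3: delete target rows whose key also appears in the temp table.
    # (An IN subquery is used here as a simplification of the JOIN.)
    cur.execute("DELETE FROM users WHERE id IN (SELECT id FROM tmp_users)")
    replaced = cur.rowcount
    # Step 4: bulk insert every staged row, both new and replacement.
    cur.execute("INSERT INTO users (id, name) SELECT id, name FROM tmp_users")
    inserted = cur.rowcount - replaced
    # Step 5: clean up the temporary table.
    cur.execute("DROP TABLE tmp_users")
    conn.commit()
    return replaced, inserted


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany(
    "INSERT INTO users (id, name) VALUES (?, ?)",
    [(1, "Alice"), (2, "Bob"), (3, "Carol")],
)

new_data = [
    {"id": 2, "name": "Bob Updated"},  # exists, so it is replaced
    {"id": 4, "name": "Charlie"},      # new
    {"id": 5, "name": "David"},        # new
]
replaced, inserted = insert_or_replace_sketch(conn, new_data)
```

Counting the deleted rows first and subtracting them from the bulk insert count is one way to split the result into replaced and inserted rows. How the library derives its own counters is not shown in this section.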
This approach is ideal for:
Full synchronization from authoritative data source
Complete data refresh scenarios
When new data should completely replace existing records
Transaction Management:
This function supports both auto-managed and user-managed transaction modes. See the module-level documentation for detailed explanations of each mode.
- Parameters:
engine – SQLAlchemy engine for database connection
table – Target table for upsert operation
values – Records to insert or replace. Must include primary key values for conflict detection.
metadata – Optional metadata instance for temporary table isolation. If None, a new MetaData instance is created for clean separation.
temp_table_name – Optional custom name for temporary table. If None, generates unique name with timestamp to avoid conflicts.
conn – Optional database connection for user-managed transaction mode. Must be provided together with the trans parameter.
trans – Optional transaction for user-managed transaction mode. Must be provided together with the conn parameter.
- Returns:
Tuple of (replaced_rows, inserted_rows):
replaced_rows: Number of existing records that were replaced
inserted_rows: Number of new records that were inserted
- Raises:
ValueError – When conn and trans parameters are provided inconsistently (one is None while the other is not)
UpsertTestError – When testing flags are enabled and corresponding operations fail
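The ValueError condition amounts to requiring that conn and trans are either both given or both omitted. A minimal sketch of such a check follows. The check_conn_trans helper and its error message are hypothetical and are not taken from the library's source.

```python
def check_conn_trans(conn, trans):
    """Return True for user-managed mode, False for auto-managed mode.

    Hypothetical helper: raises ValueError when exactly one of the two
    arguments is None, mirroring the documented inconsistency rule.
    """
    if (conn is None) != (trans is None):
        raise ValueError("conn and trans must be provided together or both omitted")
    return conn is not None
```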
Examples:
Auto-managed transaction (default mode):
# Function manages its own transaction
replaced, inserted = insert_or_replace(engine, users_table, new_data)
User-managed transaction mode:
# Operation is part of a larger transaction
with engine.connect() as conn:
    with conn.begin() as trans:
        # Other operations...
        replaced, inserted = insert_or_replace(
            engine, users_table, new_data,
            conn=conn, trans=trans
        )
        # More operations...
Complete replacement example:
# Target table has records with id=1,2,3
new_data = [
    {'id': 2, 'name': 'Bob Updated'},  # Exists - will be replaced
    {'id': 4, 'name': 'Charlie'},      # New - will be inserted
    {'id': 5, 'name': 'David'},        # New - will be inserted
]
replaced, inserted = insert_or_replace(engine, users_table, new_data)
# Result: replaced=1, inserted=2
- Performance Comparison:
Traditional row-by-row approach (100K records): ~300 seconds
This method (100K records): ~15 seconds
Performance gain: ~20x faster
Note
Parameters prefixed with _raise_on_ are exclusively for testing error handling and cleanup behavior. Never use these in production code.
Warning
This operation completely replaces existing records. All fields of conflicting records (including historical fields like timestamps) will be overwritten with new data.
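The effect described in this warning can be seen with SQLite's native INSERT OR REPLACE statement, used here through the standard-library sqlite3 module as a stand-in for the same replace semantics. The users table and its created_at column are hypothetical. Whether this library fills omitted columns with NULL in the same way is an assumption and is not confirmed by this section.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, created_at TEXT)"
)
conn.execute("INSERT INTO users VALUES (2, 'Bob', '2020-01-01')")

# Replace the row without supplying created_at: the old row is removed
# and a fresh one is written, so the original timestamp is not kept.
conn.execute(
    "INSERT OR REPLACE INTO users (id, name) VALUES (?, ?)",
    (2, "Bob Updated"),
)
row = conn.execute(
    "SELECT id, name, created_at FROM users WHERE id = 2"
).fetchone()
print(row)  # (2, 'Bob Updated', None)
```

To keep a historical value across a replacement, read it from the existing row and include it explicitly in the new record.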