insert_or_replace

class sqlalchemy_upsert_kit.sqlite.insert_or_replace.InsertOrReplaceExcutor(engine: sqlalchemy.engine.base.Engine, table: sqlalchemy.sql.schema.Table, values: list[dict[str, Any]], metadata: sqlalchemy.sql.schema.MetaData | None, temp_table_name: str | None, conn: sqlalchemy.engine.base.Connection | None, trans: sqlalchemy.engine.base.Transaction | None, columns: list[str] | None, _raise_on_temp_table_create: bool, _raise_on_temp_data_insert: bool, _raise_on_target_delete: bool, _raise_on_target_insert: bool, _raise_on_temp_table_drop: bool, _raise_on_merge_update: bool, _ignored_rows: int = 0, _replaced_rows: int = 0, _updated_rows: int = 0, _inserted_rows: int = 0, _temp_table_created: bool = False)[source]
apply_strategy(conn: Connection, trans: Transaction)[source]

Apply the upsert strategy-specific logic.

This abstract method must be implemented by subclasses to define their specific upsert behavior. The method is called after the temporary table has been created and populated with candidate data.

Implementation Requirements:

Subclasses should implement the core database operations that define their upsert strategy:

  • INSERT OR IGNORE: Use LEFT JOIN to insert only non-conflicting records

  • INSERT OR REPLACE: Delete conflicting records, then insert all records

  • UPSERT/MERGE: Update existing records, insert new ones

State Management:

Implementations should update the appropriate result counters:

  • self._ignored_rows - Records ignored (INSERT OR IGNORE)

  • self._replaced_rows - Records replaced (INSERT OR REPLACE)

  • self._updated_rows - Records updated (UPSERT/MERGE)

  • self._inserted_rows - New records inserted

Error Handling:

Implementations can use testing flags for controlled failure simulation:

  • self._raise_on_target_delete - Simulate deletion failures

  • self._raise_on_target_insert - Simulate insertion failures

Parameters:
  • conn – Database connection within active transaction

  • trans – Active transaction context

Raises:

Example Implementation:

def apply_strategy(self, conn, trans):
    # INSERT OR IGNORE strategy
    stmt = self.table.insert().from_select(
        list(self._temp_table.columns.keys()),
        sa.select(self._temp_table).select_from(
            self._temp_table.outerjoin(self.table, ...)
        ).where(self.table.c[self.pk_name].is_(None))
    )
    result = conn.execute(stmt)
    self._inserted_rows = result.rowcount or 0
    self._ignored_rows = len(self.values) - self._inserted_rows
sqlalchemy_upsert_kit.sqlite.insert_or_replace.insert_or_replace(engine: Engine, table: Table, values: list[dict[str, Any]], metadata: MetaData | None = None, temp_table_name: str | None = None, conn: Connection | None = None, trans: Transaction | None = None, _raise_on_temp_table_create: bool = False, _raise_on_temp_data_insert: bool = False, _raise_on_target_delete: bool = False, _raise_on_target_insert: bool = False, _raise_on_temp_table_drop: bool = False) tuple[int, int][source]

Perform high-performance bulk INSERT-OR-REPLACE operation using temporary table.

This function performs bulk upsert operations: replaces existing records entirely with new data and inserts records that don’t exist. This is equivalent to “INSERT OR REPLACE” or complete record replacement but works more efficiently for large datasets.

Algorithm:

  1. Creates temporary table and loads all candidate data

  2. Uses JOIN to identify conflicting records in target table

  3. Deletes conflicting records from target table

  4. Bulk inserts all records from temporary table (both new and replacement)

  5. Cleans up temporary resources

This approach is ideal for:

  • Full synchronization from authoritative data source

  • Complete data refresh scenarios

  • When new data should completely replace existing records

Transaction Management:

This function supports both auto-managed and user-managed transaction modes. See the module-level documentation for detailed explanations of each mode.

Parameters:
  • engine – SQLAlchemy engine for database connection

  • table – Target table for upsert operation

  • values – Records to insert or replace. Must include primary key values for conflict detection.

  • metadata – Optional metadata instance for temporary table isolation. If None, a new MetaData instance is created for clean separation.

  • temp_table_name – Optional custom name for temporary table. If None, generates unique name with timestamp to avoid conflicts.

  • conn – Optional database connection for user-managed transaction mode. Must be provided together with trans parameter.

  • trans – Optional transaction for user-managed transaction mode. Must be provided together with conn parameter.

Returns:

Tuple of (replaced_rows, inserted_rows): - replaced_rows: Number of existing records that were replaced - inserted_rows: Number of new records that were inserted

Raises:
  • ValueError – When conn and trans parameters are provided inconsistently (one is None while the other is not)

  • UpsertTestError – When testing flags are enabled and corresponding operations fail

Examples:

Auto-managed transaction (default mode):

# Function manages its own transaction
updated, inserted = insert_or_replace(engine, users_table, new_data)

User-managed transaction mode:

# Operation is part of larger transaction
with engine.connect() as conn:
    with conn.begin() as trans:
        # Other operations...
        updated, inserted = insert_or_replace(
            engine, users_table, new_data, conn=conn, trans=trans
        )
        # More operations...

Complete replacement example:

# Target table has records with id=1,2,3
new_data = [
    {'id': 2, 'name': 'Bob Updated'},    # Exists - will be replaced
    {'id': 4, 'name': 'Charlie'},        # New - will be inserted
    {'id': 5, 'name': 'David'},          # New - will be inserted
]
updated, inserted = insert_or_replace(engine, users_table, new_data)
# Result: updated=1, inserted=2
Performance Comparison:

Traditional row-by-row approach (100K records): ~300 seconds This method (100K records): ~15 seconds Performance gain: ~20x faster

Note

Parameters prefixed with _raise_on_ are exclusively for testing error handling and cleanup behavior. Never use these in production code.

Warning

This operation completely replaces existing records. All fields of conflicting records (including historical fields like timestamps) will be overwritten with new data.