Source code for sqlalchemy_upsert_kit.sqlite.insert_or_ignore

# -*- coding: utf-8 -*-

import typing as T
import dataclasses

import sqlalchemy as sa

from ..exc import UpsertTestError

from .executor import UpsertExecutor


[docs] @dataclasses.dataclass class InsertOrIgnoreExecutor(UpsertExecutor):
[docs] def apply_strategy( self, conn: sa.Connection, trans: sa.Transaction, ): if self._raise_on_target_insert: # Testing flag raise UpsertTestError("error on target insertion") stmt = self.table.insert().from_select( list(self._temp_table.columns.keys()), sa.select(self._temp_table) .select_from( self._temp_table.outerjoin( # LEFT JOIN to find non-matches self.table, self._temp_table.c[self.pk_name] == self.table.c[self.pk_name], ) ) .where( self.table.c[self.pk_name].is_(None) ), # Only insert where no match exists ) res = conn.execute(stmt) try: self._inserted_rows = res.rowcount if res.rowcount is not None else 0 self._ignored_rows = len(self.values) - self._inserted_rows except: # pragma: no cover self._inserted_rows = 0 self._ignored_rows = len(self.values)
[docs] def insert_or_ignore( engine: sa.Engine, table: sa.Table, values: list[dict[str, T.Any]], metadata: T.Optional[sa.MetaData] = None, temp_table_name: T.Optional[str] = None, conn: T.Optional[sa.Connection] = None, trans: T.Optional[sa.Transaction] = None, _raise_on_temp_table_create: bool = False, _raise_on_temp_data_insert: bool = False, _raise_on_target_insert: bool = False, _raise_on_temp_table_drop: bool = False, ) -> tuple[int, int]: """ Perform high-performance bulk INSERT-IF-NOT-EXISTS operation using temporary table. This function performs conditional bulk insertion: only inserts records whose primary keys don't already exist in the target table. This is equivalent to "INSERT IGNORE" or "INSERT ... ON CONFLICT DO NOTHING" but works more efficiently. **Algorithm**: 1. Creates temporary table and loads all candidate data 2. Uses LEFT JOIN to identify records not in target table 3. Bulk inserts only the non-conflicting records 4. Cleans up temporary resources This approach is ideal for: - Incremental data loading where duplicates should be ignored - ETL processes that need idempotent behavior - Syncing data from external sources **Transaction Management**: This function supports both auto-managed and user-managed transaction modes. See the module-level documentation for detailed explanations of each mode. :param engine: SQLAlchemy engine for database connection :param table: Target table for conditional insertion :param values: Records to insert if they don't exist. Must include primary key values for conflict detection. :param metadata: Optional metadata instance for temporary table isolation. If None, a new MetaData instance is created for clean separation. :param temp_table_name: Optional custom name for temporary table. If None, generates unique name with timestamp to avoid conflicts. :param conn: Optional database connection for user-managed transaction mode. Must be provided together with ``trans`` parameter. :param trans: Optional transaction for user-managed transaction mode. Must be provided together with ``conn`` parameter. :returns: Tuple of (ignored_rows, inserted_rows): - ignored_rows: Number of records that were not inserted (already existed) - inserted_rows: Number of new records successfully inserted :raises ValueError: When conn and trans parameters are provided inconsistently (one is None while the other is not) :raises UpsertTestError: When testing flags are enabled and corresponding operations fail **Examples**: Auto-managed transaction (default mode):: # Function manages its own transaction ignored, inserted = insert_or_ignore(engine, users_table, new_data) User-managed transaction mode:: # Operation is part of larger transaction with engine.connect() as conn: with conn.begin() as trans: # Other operations... ignored, inserted = insert_or_ignore( engine, users_table, new_data, conn=conn, trans=trans ) # More operations... Conflict detection example:: # Target table has records with id=1,2,3 new_data = [ {'id': 2, 'name': 'Bob'}, # Exists - will be ignored {'id': 4, 'name': 'Charlie'}, # New - will be inserted {'id': 5, 'name': 'David'}, # New - will be inserted ] ignored, inserted = insert_or_ignore(engine, users_table, new_data) # Result: ignored=1, inserted=2 .. note:: Parameters prefixed with ``_raise_on_`` are exclusively for testing error handling and cleanup behavior. Never use these in production code. """ if not values: # pragma: no cover return 0, 0 # No-op for empty data executor = InsertOrIgnoreExecutor.new( engine=engine, table=table, values=values, metadata=metadata, temp_table_name=temp_table_name, conn=conn, trans=trans, _raise_on_temp_table_create=_raise_on_temp_table_create, _raise_on_temp_data_insert=_raise_on_temp_data_insert, _raise_on_target_insert=_raise_on_target_insert, _raise_on_temp_table_drop=_raise_on_temp_table_drop, ) executor.run() ignored_rows, inserted_rows = executor._ignored_rows, executor._inserted_rows return ignored_rows, inserted_rows