Quick Start Guide
This guide demonstrates the three high-performance bulk upsert operations provided by sqlalchemy_upsert_kit using real examples with an in-memory SQLite database.
Understanding Upsert Operations
Upsert operations combine “insert” and “update” logic to handle data synchronization efficiently. The three strategies offered by this library each serve different use cases:
insert_or_ignore: Inserts only new records, ignores conflicts with existing data
insert_or_replace: Completely replaces existing records with new data
insert_or_merge: Selectively updates specified columns while preserving others
All operations use temporary table staging and SQL JOIN operations to achieve ~20x performance improvements over traditional row-by-row approaches.
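To make the staging idea concrete, here is a minimal sketch of an insert-or-ignore built from a temporary table and a LEFT JOIN. It uses only Python's standard `sqlite3` module and a simplified two-column table; the table name `staging` and the exact SQL are assumptions for illustration, not the library's actual implementation.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute('CREATE TABLE records (id INTEGER PRIMARY KEY, "desc" TEXT)')
conn.executemany("INSERT INTO records VALUES (?, ?)", [(1, "v1"), (2, "v1")])
conn.commit()

incoming = [(2, "v2"), (3, "v2")]  # id 2 conflicts, id 3 is new

# Step 1: create a temporary staging table (hypothetical name) for the batch.
conn.execute('CREATE TEMP TABLE staging (id INTEGER PRIMARY KEY, "desc" TEXT)')
with conn:  # commit on success, roll back on error
    # Load the whole batch with one bulk insert.
    conn.executemany("INSERT INTO staging VALUES (?, ?)", incoming)
    # Step 2: a LEFT JOIN keeps only staged rows with no match in the target.
    cursor = conn.execute(
        'INSERT INTO records (id, "desc") '
        'SELECT s.id, s."desc" FROM staging AS s '
        "LEFT JOIN records AS r ON r.id = s.id "
        "WHERE r.id IS NULL"
    )
    inserted = cursor.rowcount
ignored = len(incoming) - inserted
conn.execute("DROP TABLE staging")

rows = conn.execute('SELECT id, "desc" FROM records ORDER BY id').fetchall()
print(f"{ignored} ignored, {inserted} inserted: {rows}")
```

The batch reaches the database in one bulk insert and the conflict check is a single set-based statement, which is where an approach like this would save time compared with checking each row individually.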
Setting Up the Database
Let’s start by creating an in-memory SQLite database and setting up our test table:
[1]:
from datetime import datetime, timezone, timedelta
import sqlalchemy as sa
from sqlalchemy import MetaData, Table, Column, Integer, String, DateTime
from sqlalchemy_upsert_kit.sqlite.insert_or_ignore import insert_or_ignore
from sqlalchemy_upsert_kit.sqlite.insert_or_replace import insert_or_replace
from sqlalchemy_upsert_kit.sqlite.insert_or_merge import insert_or_merge
# Create in-memory SQLite database
engine = sa.create_engine("sqlite:///:memory:")
# Define our test table structure
metadata = MetaData()
records_table = Table(
    'records',
    metadata,
    Column('id', Integer, primary_key=True),
    Column('desc', String, nullable=True),
    Column('create_at', DateTime, nullable=False),
    Column('update_at', DateTime, nullable=False)
)
# Create the table
metadata.create_all(engine)
print("✅ Database and table created successfully")
✅ Database and table created successfully
Test Data Setup
We’ll use a classic test scenario with:
4 existing records (IDs 1-4) already in the database
3 input records for upsert operations
2 conflict records (IDs 3-4) that exist in both existing and input data
1 new record (ID 5) that doesn’t exist in the database
[2]:
# Helper function to display data in table format
import typing as T
from sqlalchemy_upsert_kit.tests.utils import pt_from_many_dict
def display_records(records: list[dict[str, T.Any]], title: str = "Records"):
    print(f"\n{title}:")
    print(pt_from_many_dict(records))
# Set up timestamps
create_time = datetime.now(timezone.utc).replace(microsecond=0)
update_time = create_time + timedelta(minutes=1)
# Prepare existing data (4 records with IDs 1-4)
existing_data = [
{'id': 1, 'desc': 'v1', 'create_at': create_time, 'update_at': create_time},
{'id': 2, 'desc': 'v1', 'create_at': create_time, 'update_at': create_time},
{'id': 3, 'desc': 'v1', 'create_at': create_time, 'update_at': create_time},
{'id': 4, 'desc': 'v1', 'create_at': create_time, 'update_at': create_time}
]
# Insert existing data
with engine.connect() as conn:
    conn.execute(records_table.insert(), existing_data)
    conn.commit()
# Prepare input data (3 records: 2 conflicts + 1 new)
input_data = [
{'id': 3, 'desc': 'v2', 'create_at': create_time, 'update_at': update_time}, # Conflict
{'id': 4, 'desc': 'v2', 'create_at': create_time, 'update_at': update_time}, # Conflict
{'id': 5, 'desc': 'v2', 'create_at': update_time, 'update_at': update_time} # New
]
# Display initial state
with engine.connect() as conn:
    existing_records = conn.execute(sa.select(records_table).order_by(records_table.c.id)).mappings().fetchall()
display_records([dict(r) for r in existing_records], "Existing Records in Database")
display_records(input_data, "Input Data for Upsert Operations")
print(f"\n📊 Data Summary:")
print(f" • Existing records: 4 (IDs 1-4)")
print(f" • Input records: 3 (IDs 3-5)")
print(f" • Conflict records: 2 (IDs 3-4 exist in both)")
print(f" • New records: 1 (ID 5 is new)")
Existing Records in Database:
+----+------+---------------------+---------------------+
| id | desc | create_at | update_at |
+----+------+---------------------+---------------------+
| 1 | v1 | 2025-07-02 16:25:01 | 2025-07-02 16:25:01 |
| 2 | v1 | 2025-07-02 16:25:01 | 2025-07-02 16:25:01 |
| 3 | v1 | 2025-07-02 16:25:01 | 2025-07-02 16:25:01 |
| 4 | v1 | 2025-07-02 16:25:01 | 2025-07-02 16:25:01 |
+----+------+---------------------+---------------------+
Input Data for Upsert Operations:
+----+------+---------------------------+---------------------------+
| id | desc | create_at | update_at |
+----+------+---------------------------+---------------------------+
| 3 | v2 | 2025-07-02 16:25:01+00:00 | 2025-07-02 16:26:01+00:00 |
| 4 | v2 | 2025-07-02 16:25:01+00:00 | 2025-07-02 16:26:01+00:00 |
| 5 | v2 | 2025-07-02 16:26:01+00:00 | 2025-07-02 16:26:01+00:00 |
+----+------+---------------------------+---------------------------+
📊 Data Summary:
• Existing records: 4 (IDs 1-4)
• Input records: 3 (IDs 3-5)
• Conflict records: 2 (IDs 3-4 exist in both)
• New records: 1 (ID 5 is new)
[7]:
def reset_database():
    # Restore the four original records so every demonstration starts from the same state
    with engine.connect() as conn:
        conn.execute(records_table.delete())
        conn.execute(records_table.insert(), existing_data)
        conn.commit()
    print("Database reset to original state for merge demonstration")
Operation 1: Insert or Ignore
Strategy: Insert only new records, ignore conflicts with existing data.
Expected Behavior:
IDs 3-4: Ignored (already exist)
ID 5: Inserted (new record)
Final result: 2 rows ignored, 1 row inserted
[9]:
print("\n" + "=" * 60)
print("🔍 OPERATION 1: INSERT OR IGNORE")
print("=" * 60)
# Reset database to original state for a clean demonstration
reset_database()
# Perform insert_or_ignore operation
ignored_rows, inserted_rows = insert_or_ignore(
engine=engine,
table=records_table,
values=input_data
)
print(f"Result: {ignored_rows} rows ignored, {inserted_rows} rows inserted")
# Display final state
with engine.connect() as conn:
    final_records = conn.execute(sa.select(records_table).order_by(records_table.c.id)).mappings().fetchall()
display_records(final_records, "Database After INSERT OR IGNORE")
print(f"\n✅ Verification:")
print(f" • Records 3-4: Preserved original data (desc='v1', original timestamps)")
print(f" • Record 5: Successfully inserted with new data")
print(f" • Total records: {len(final_records)} (was 4, added 1)")
============================================================
🔍 OPERATION 1: INSERT OR IGNORE
============================================================
Database reset to original state for merge demonstration
Result: 2 rows ignored, 1 rows inserted
Database After INSERT OR IGNORE:
+----+------+---------------------+---------------------+
| id | desc | create_at | update_at |
+----+------+---------------------+---------------------+
| 1 | v1 | 2025-07-02 16:25:01 | 2025-07-02 16:25:01 |
| 2 | v1 | 2025-07-02 16:25:01 | 2025-07-02 16:25:01 |
| 3 | v1 | 2025-07-02 16:25:01 | 2025-07-02 16:25:01 |
| 4 | v1 | 2025-07-02 16:25:01 | 2025-07-02 16:25:01 |
| 5 | v2 | 2025-07-02 16:26:01 | 2025-07-02 16:26:01 |
+----+------+---------------------+---------------------+
✅ Verification:
• Records 3-4: Preserved original data (desc='v1', original timestamps)
• Record 5: Successfully inserted with new data
• Total records: 5 (was 4, added 1)
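For comparison, SQLite's built-in `INSERT OR IGNORE` conflict clause reaches the same end state on this small data set. The sketch below uses the standard `sqlite3` module with a reduced two-column table; it illustrates the semantics only and makes no claim about how `insert_or_ignore` is implemented internally.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute('CREATE TABLE records (id INTEGER PRIMARY KEY, "desc" TEXT)')
# Existing records: IDs 1-4 with desc='v1'
conn.executemany(
    "INSERT INTO records VALUES (?, ?)", [(i, "v1") for i in range(1, 5)]
)
before = conn.execute("SELECT COUNT(*) FROM records").fetchone()[0]

# Input records: IDs 3-4 conflict, ID 5 is new
incoming = [(3, "v2"), (4, "v2"), (5, "v2")]
conn.executemany(
    'INSERT OR IGNORE INTO records (id, "desc") VALUES (?, ?)', incoming
)
conn.commit()

after = conn.execute("SELECT COUNT(*) FROM records").fetchone()[0]
inserted = after - before           # rows that were actually added
ignored = len(incoming) - inserted  # rows skipped because the ID existed
rows = conn.execute('SELECT id, "desc" FROM records ORDER BY id').fetchall()
print(f"{ignored} ignored, {inserted} inserted")
```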
Operation 2: Insert or Replace
Strategy: Completely replace existing records with new data, insert new records.
Expected Behavior:
IDs 3-4: Completely replaced with new data
ID 5: Inserted (new record, because the database is reset before this operation)
Final result: 2 rows replaced, 1 row inserted
[10]:
print("\n" + "=" * 60)
print("🔄 OPERATION 2: INSERT OR REPLACE")
print("=" * 60)
# Reset database to original state for a clean demonstration
reset_database()
# Perform insert_or_replace operation
replaced_rows, inserted_rows = insert_or_replace(
engine=engine,
table=records_table,
values=input_data
)
print(f"Result: {replaced_rows} rows replaced, {inserted_rows} rows inserted")
# Display final state
with engine.connect() as conn:
    final_records = conn.execute(sa.select(records_table).order_by(records_table.c.id)).mappings().fetchall()
display_records(final_records, "Database After INSERT OR REPLACE")
print(f"\n✅ Verification:")
print(f" • Records 1-2: Unchanged (not in input data)")
print(f" • Records 3-4: Completely replaced with new data (desc='v2', new update_at)")
print(f" • Record 5: Successfully inserted with new data")
print(f" • Total records: {len(final_records)} (was 4, added 1)")
============================================================
🔄 OPERATION 2: INSERT OR REPLACE
============================================================
Database reset to original state for merge demonstration
Result: 2 rows replaced, 1 rows inserted
Database After INSERT OR REPLACE:
+----+------+---------------------+---------------------+
| id | desc | create_at | update_at |
+----+------+---------------------+---------------------+
| 1 | v1 | 2025-07-02 16:25:01 | 2025-07-02 16:25:01 |
| 2 | v1 | 2025-07-02 16:25:01 | 2025-07-02 16:25:01 |
| 3 | v2 | 2025-07-02 16:25:01 | 2025-07-02 16:26:01 |
| 4 | v2 | 2025-07-02 16:25:01 | 2025-07-02 16:26:01 |
| 5 | v2 | 2025-07-02 16:26:01 | 2025-07-02 16:26:01 |
+----+------+---------------------+---------------------+
✅ Verification:
• Records 1-2: Unchanged (not in input data)
• Records 3-4: Completely replaced with new data (desc='v2', new update_at)
• Record 5: Successfully inserted with new data
• Total records: 5 (was 4, added 1)
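What "completely replace" means can be seen with SQLite's native `INSERT OR REPLACE`, which removes the conflicting row and inserts the new one rather than merging the two. In the sketch below (standard `sqlite3` module, simplified three-column table, assumed for illustration only) the replacement row deliberately omits `update_at`, and the old value is not carried over.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    'CREATE TABLE records (id INTEGER PRIMARY KEY, "desc" TEXT, update_at TEXT)'
)
conn.execute("INSERT INTO records VALUES (?, ?, ?)", (3, "v1", "16:25:01"))

# The replacement row omits update_at on purpose.
conn.execute(
    'INSERT OR REPLACE INTO records (id, "desc") VALUES (?, ?)', (3, "v2")
)
conn.commit()

row = conn.execute('SELECT id, "desc", update_at FROM records').fetchone()
print(row)  # update_at is None: the previous value did not survive
```

This is why replace-style input should carry every column you want to keep; columns you leave out fall back to their defaults instead of retaining the old data.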
Operation 3: Insert or Merge
Strategy: Selectively update specified columns while preserving others, insert new records.
Expected Behavior:
IDs 3-4: Update only the update_at column, preserve original desc values
ID 5: Inserted (new record)
Final result: 2 rows updated, 1 row inserted
[11]:
print("\n" + "=" * 60)
print("🔀 OPERATION 3: INSERT OR MERGE")
print("=" * 60)
# Reset database to original state for clean merge demonstration
reset_database()
# Perform insert_or_merge operation - only update 'update_at' column
updated_rows, inserted_rows = insert_or_merge(
engine=engine,
table=records_table,
values=input_data,
columns=["update_at"] # Only update this column, preserve others
)
print(f"Result: {updated_rows} rows updated, {inserted_rows} rows inserted")
# Display final state
with engine.connect() as conn:
    final_records = conn.execute(sa.select(records_table).order_by(records_table.c.id)).mappings().fetchall()
display_records(final_records, "Database After INSERT OR MERGE")
print(f"\n✅ Verification:")
print(f" • Records 1-2: Unchanged (not in input data)")
print(f" • Records 3-4: Selective update - desc='v1' preserved, update_at updated")
print(f" • Record 5: New record inserted with all input data")
print(f" • Total records: {len(final_records)} (was 4, added 1)")
============================================================
🔀 OPERATION 3: INSERT OR MERGE
============================================================
Database reset to original state for merge demonstration
Result: 2 rows updated, 1 rows inserted
Database After INSERT OR MERGE:
+----+------+---------------------+---------------------+
| id | desc | create_at | update_at |
+----+------+---------------------+---------------------+
| 1 | v1 | 2025-07-02 16:25:01 | 2025-07-02 16:25:01 |
| 2 | v1 | 2025-07-02 16:25:01 | 2025-07-02 16:25:01 |
| 3 | v1 | 2025-07-02 16:25:01 | 2025-07-02 16:26:01 |
| 4 | v1 | 2025-07-02 16:25:01 | 2025-07-02 16:26:01 |
| 5 | v2 | 2025-07-02 16:26:01 | 2025-07-02 16:26:01 |
+----+------+---------------------+---------------------+
✅ Verification:
• Records 1-2: Unchanged (not in input data)
• Records 3-4: Selective update - desc='v1' preserved, update_at updated
• Record 5: New record inserted with all input data
• Total records: 5 (was 4, added 1)
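The same selective update can be expressed in plain SQLite with an `ON CONFLICT ... DO UPDATE` clause (available in SQLite 3.24 and later). This sketch uses the standard `sqlite3` module and a simplified table with string timestamps; it shows the merge semantics only and is not presented as the library's internal query.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    'CREATE TABLE records (id INTEGER PRIMARY KEY, "desc" TEXT, update_at TEXT)'
)
conn.executemany(
    "INSERT INTO records VALUES (?, ?, ?)",
    [(3, "v1", "16:25:01"), (4, "v1", "16:25:01")],
)

incoming = [
    (3, "v2", "16:26:01"),  # conflict
    (4, "v2", "16:26:01"),  # conflict
    (5, "v2", "16:26:01"),  # new
]
# On conflict, overwrite only update_at; "desc" keeps its stored value.
conn.executemany(
    'INSERT INTO records (id, "desc", update_at) VALUES (?, ?, ?) '
    "ON CONFLICT(id) DO UPDATE SET update_at = excluded.update_at",
    incoming,
)
conn.commit()

merged = conn.execute(
    'SELECT id, "desc", update_at FROM records ORDER BY id'
).fetchall()
print(merged)
```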
Comparison Summary
Here’s how each operation handles the same input data:
| Operation | Conflict Records (3-4) | New Records (5) | Total Result |
|---|---|---|---|
| Insert or Ignore | Ignored (preserved) | Inserted | 2 ignored, 1 inserted |
| Insert or Replace | Completely replaced | Inserted | 2 replaced, 1 inserted |
| Insert or Merge | Selectively updated | Inserted | 2 updated, 1 inserted |
Note: The database is reset before each operation, so ID 5 is a new insertion in all three examples.
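The three conflict strategies can also be modeled in a few lines of plain Python, which is handy for reasoning about expected counts before touching a database. The `upsert` function below is a hypothetical in-memory model written for this guide; it is not part of sqlalchemy_upsert_kit.

```python
def upsert(existing, incoming, strategy, columns=()):
    """Model an upsert on rows keyed by 'id'.

    Returns (state, conflicts, inserted), where state maps id -> row.
    """
    if strategy not in ("ignore", "replace", "merge"):
        raise ValueError(f"unknown strategy: {strategy}")
    state = {row["id"]: dict(row) for row in existing}
    conflicts = inserted = 0
    for row in incoming:
        key = row["id"]
        if key not in state:
            state[key] = dict(row)  # new record: always inserted
            inserted += 1
            continue
        conflicts += 1
        if strategy == "replace":
            state[key] = dict(row)  # whole row overwritten
        elif strategy == "merge":
            # only the listed columns are overwritten
            state[key].update({col: row[col] for col in columns})
        # "ignore": the existing row stays untouched
    return state, conflicts, inserted


existing = [{"id": i, "desc": "v1", "update_at": 0} for i in range(1, 5)]
incoming = [{"id": i, "desc": "v2", "update_at": 1} for i in range(3, 6)]

ignore_state, ignored, ignore_inserted = upsert(existing, incoming, "ignore")
replace_state, replaced, replace_inserted = upsert(existing, incoming, "replace")
merge_state, updated, merge_inserted = upsert(
    existing, incoming, "merge", columns=["update_at"]
)
print(ignore_state[3], replace_state[3], merge_state[3])
```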
Transaction Management
All upsert operations are atomically executed within a single transaction, ensuring data consistency and integrity. The library supports two transaction modes:
Auto-Managed Transactions (Default)
# Library automatically manages the transaction
result = insert_or_ignore(engine, table, data)
# Transaction is committed automatically on success, rolled back on failure
User-Managed Transactions
# You control the transaction scope
with engine.connect() as conn:
    with conn.begin() as trans:
        # Other database operations may run here, as long as they do not commit
        result1 = insert_or_ignore(engine, table, data1, conn=conn, trans=trans)
        # Other database operations may run here, as long as they do not commit
        result2 = insert_or_replace(engine, table, data2, conn=conn, trans=trans)
        # Other database operations may run here, as long as they do not commit
        # All operations are part of the same transaction
    # Commit/rollback is handled by the context manager
For detailed information about transaction modes, error handling, and advanced usage patterns, refer to the transaction mode section of the documentation.
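The all-or-nothing behavior described in this section can be illustrated with the standard `sqlite3` module, whose connection context manager commits on success and rolls back on an exception. This is a minimal sketch of the general principle, assuming a single-column table; it does not reproduce the library's own transaction handling.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE records (id INTEGER PRIMARY KEY)")
conn.execute("INSERT INTO records VALUES (1)")
conn.commit()

try:
    with conn:  # one transaction for both statements
        conn.execute("INSERT INTO records VALUES (2)")  # succeeds
        conn.execute("INSERT INTO records VALUES (1)")  # primary key violation
except sqlite3.IntegrityError:
    rolled_back = True
else:
    rolled_back = False

# The successful insert of id 2 was rolled back along with the failed one.
ids = [row[0] for row in conn.execute("SELECT id FROM records ORDER BY id")]
print(rolled_back, ids)
```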
Next Steps
Continue reading the documentation to learn about advanced features, error handling, and best practices for production usage.