Python

Python

Explore python code snippets and tutorials

Python

SQLAlchemy function to insert items in batches

SqlAlchemy function to insert items in batches

python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base,sessionmaker


# Base class that ORM models in this snippet inherit from.
Base = declarative_base()

# Engine pointing at the SQLite database file "example.db".
engine = create_engine('sqlite:///example.db')
# Session factory bound to that engine, and one module-level session created from it.
Session = sessionmaker(bind=engine)
session = Session()





class Product(Base):
    """ORM model mapped to the ``products`` table."""

    __tablename__ = 'products'
    # Integer primary key.
    id = Column(Integer, primary_key=True)
    # Product name.
    name = Column(String)
    # Price stored as an integer (unit is not specified in this snippet).
    price = Column(Integer)



def insert_in_batches(session, model, data, batch_size=100):
    """Bulk-insert ``data`` into ``model``'s table, committing once per batch.

    Args:
        session: Object exposing ``bulk_insert_mappings``, ``commit`` and
            ``rollback`` (a SQLAlchemy ``Session`` in this snippet).
        model: Mapped class the rows belong to.
        data: Sequence of dicts (must support ``len`` and slicing), one per row.
        batch_size: Maximum number of rows sent per batch; must be >= 1.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
        Exception: Whatever the session raises for a failing batch; that batch
            is rolled back first. Batches committed earlier stay committed.
    """
    # range() rejects a zero step with an unrelated-looking error and a negative
    # step would silently insert nothing, so validate explicitly.
    if batch_size < 1:
        raise ValueError(
            f"batch_size must be a positive integer, got {batch_size!r}"
        )

    total_records = len(data)
    # Ceiling division; only used for the progress message.
    total_batches = (total_records + batch_size - 1) // batch_size

    starts = range(0, total_records, batch_size)
    for batch_number, start in enumerate(starts, start=1):
        batch = data[start:start + batch_size]
        try:
            session.bulk_insert_mappings(model, batch)
            session.commit()
        except Exception:
            # Leave the session usable for the caller, then propagate.
            session.rollback()
            raise
        print(f"Inserted batch {batch_number}/{total_batches}")


products = [
    {"name": "Product A", "price": 10},
    {"name": "Product B", "price": 20},
    {"name": "Product C", "price": 30},
    # Assume there are many more products
]

# Create the mapped tables if they do not exist yet; without this the first
# insert into a fresh database fails because "products" is missing.
Base.metadata.create_all(engine)

# Insert the products in batches of 100
insert_in_batches(session, Product, products, batch_size=100)
Python

SQL query to json list in python

<p>Convert the result of a SQL query to a JSON list in Python with pyodbc.</p>

python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import pyodbc
import json

# SQL Server connection settings; the values here are placeholders.
# NOTE(review): credentials are hard-coded in source — consider loading them
# from the environment or a secrets store.
MS_SQL_Host = "host"
MS_SQL_Db = "db_name"
MS_SQL_User = "user"
MSSQL_Paswd = "pass"

class DecimalEncoder(json.JSONEncoder):
    """JSON encoder that serializes ``decimal.Decimal`` values as strings."""

    def default(self, obj):
        """Return a JSON-serializable stand-in for ``obj``.

        ``Decimal`` instances become their string form; every other type is
        delegated to the base class, which raises ``TypeError``.
        """
        # Imported locally: the surrounding snippet never imports Decimal, so
        # the isinstance check below would otherwise raise NameError.
        from decimal import Decimal

        if isinstance(obj, Decimal):
            return str(obj)
        return super().default(obj)

def write_json_data(data, filename):
    """Dump ``data`` as JSON into ``<filename>.json``.

    The file is written as UTF-8 with non-ASCII characters kept verbatim;
    values the standard encoder cannot handle go through ``DecimalEncoder``.
    """
    target_path = f"{filename}.json"
    with open(target_path, mode="w", encoding="utf-8") as out_file:
        json.dump(data, out_file, cls=DecimalEncoder, ensure_ascii=False)


def get_data(query, name):
    """Run ``query`` against SQL Server and write the rows to ``<name>.json``.

    Each row becomes a dict keyed by the column names reported in the cursor
    description; the list of dicts is handed to ``write_json_data``.

    Args:
        query: SQL text to execute; it must produce a result set.
        name: Output file name without the ``.json`` extension.

    Returns:
        The return value of ``write_json_data``.
    """
    connection_string = (
        "DRIVER={SQL Server};SERVER="
        + MS_SQL_Host
        + ";DATABASE="
        + MS_SQL_Db
        + ";UID="
        + MS_SQL_User
        + ";PWD="
        + MSSQL_Paswd
    )
    mssql_connection = pyodbc.connect(connection_string)
    # try/finally so the cursor and connection are released even when the
    # query or the fetch raises.
    try:
        mssql_cursor = mssql_connection.cursor()
        try:
            mssql_cursor.execute(query)
            columns = [column[0] for column in mssql_cursor.description]
            results = [dict(zip(columns, row)) for row in mssql_cursor.fetchall()]
        finally:
            mssql_cursor.close()
    finally:
        mssql_connection.close()
    return write_json_data(results, name)

# NOTE: get_data() returns the result of write_json_data(), which has no return
# statement, so `data` is None here; the rows are written to "data.json".
data = get_data("sql_query", 'data')
Python

Psycopg2 insert or update json object

<p>Insert or update a JSON object with psycopg2.</p>

python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def insert_update_json(db_table, column, obj, json_key, search_value):
    """Insert ``obj`` as JSON into ``db_table.column``, or update the matching row.

    A row matches when the text value stored under ``json_key`` in ``column``
    equals ``search_value``. If one is found, its ``column`` is overwritten
    with the serialized ``obj``; otherwise a new row is inserted.

    Args:
        db_table: Table name. Interpolated into the SQL text, so it must come
            from trusted code, never from user input.
        column: JSON column name. Same trust requirement as ``db_table``.
        obj: Mapping to serialize and store.
        json_key: Key looked up inside the stored JSON document.
        search_value: Value the key must equal for a row to count as existing.

    Errors are printed and the transaction is rolled back; nothing is raised.
    """
    try:
        # Serialize once; both branches store the same payload.
        payload = json.dumps({**obj}, ensure_ascii=False, cls=DecimalEncoder)

        # json_key is bound as a parameter instead of being spliced into a
        # quoted literal, so keys containing quotes cannot break or alter the
        # statement. str() keeps the original "look up by text key" semantics.
        cursor.execute(
            f"SELECT id FROM {db_table} WHERE {column}->>%s = %s",
            (str(json_key), search_value),
        )
        existing_data = cursor.fetchone()

        if existing_data:
            # Update existing record
            cursor.execute(
                f"UPDATE {db_table} SET {column} = %s WHERE id = %s",
                (payload, existing_data[0]),
            )
        else:
            # Insert new record
            cursor.execute(
                f"INSERT INTO {db_table} ({column}) VALUES (%s)",
                (payload,),
            )

        # Commit the transaction
        connection.commit()

    except Exception as e:
        # Best-effort behavior kept from the original: report and roll back.
        print(f"Error: {e}")
        connection.rollback()
Python

Psycopg2 reset tables

<p>To reset tables in PostgreSQL with psycopg2, you can use the following function:</p>

python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
import psycopg2
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT

# Open the PostgreSQL connection used by the helpers in this snippet.
# NOTE(review): User, Paswd, Host, Port and Db are not defined in the visible
# code — they must be supplied before this runs.
connection = psycopg2.connect(user=User,
                                password=Paswd,
                                host=Host,
                                port=Port,
                                database=Db)
# Autocommit isolation level: each statement is committed as it executes.
connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
# Module-level cursor created from that connection.
cursor = connection.cursor()


def reset_tables(tables):
    """Empty each table in ``tables`` and restart its id sequence at 1.

    The original ran a full ``DELETE`` immediately before ``TRUNCATE``; that
    row-by-row delete was redundant because ``TRUNCATE`` removes every row
    itself, so it has been dropped. Note that ``TRUNCATE`` does not fire
    ``ON DELETE`` triggers.

    Args:
        tables: Iterable of table names. Names are interpolated into the SQL
            text, so they must come from trusted code, never from user input.
    """
    for table in tables:
        # RESTART IDENTITY resets sequences owned by the table's columns.
        cursor.execute(f'TRUNCATE {table} RESTART IDENTITY;')
        # Kept from the original: also reset "<table>_id_seq" explicitly.
        # NOTE(review): this fails if no sequence with that name exists.
        cursor.execute(f'ALTER SEQUENCE {table}_id_seq RESTART WITH 1;')
        connection.commit()

# Example usage with two placeholder table names.
tables = ['table1', 'table2']
reset_tables(tables)
Python

Get or create object in python with psycopg postgresql

<p><code>get_or_create_obj</code> is a Python function that interacts with a database. It takes two arguments: <code>data</code>, which is a dictionary representing the …

python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
def get_or_create_obj(data, db_table):
    """Insert ``data`` as a row of ``db_table`` unless an equal row exists.

    A row counts as existing when every key in ``data`` equals the column of
    the same name. When none is found, the row is inserted with
    ``ON CONFLICT DO NOTHING`` and the transaction is committed. Nothing is
    returned in either case.

    Args:
        data: Non-empty mapping of column name to value. Column names are
            interpolated into the SQL text and must come from trusted code.
        db_table: Table to search and insert into. Same trust requirement.

    Raises:
        ValueError: If ``data`` is empty (no valid WHERE clause can be built).

    Note:
        A ``None`` value never matches, because ``column = NULL`` is not true
        in SQL; such rows are always treated as missing.
    """
    if not data:
        raise ValueError("data must contain at least one column/value pair")

    params = list(data.values())

    # Look the row up in the table the caller asked for; the original queried
    # a hard-coded "your_table" and ignored db_table here.
    conditions = " AND ".join(f"{key} = %s" for key in data)
    cursor.execute(f"SELECT * FROM {db_table} WHERE {conditions}", params)
    if cursor.fetchone() is not None:
        return

    columns = ', '.join(data)
    placeholders = ', '.join(['%s'] * len(data))
    insert_query = (
        f"INSERT INTO {db_table} ({columns}) VALUES ({placeholders}) "
        "ON CONFLICT DO NOTHING"
    )
    cursor.execute(insert_query, params)
    # Commit the transaction
    connection.commit()