SQLAlchemy


Python with SQL Commands


# Run a raw SQL string through SQLAlchemy and load the rows into a DataFrame.
import sqlalchemy as db
import pandas as pd
# Connection string. Host and port are two separate placeholders: writing
# them as one `{ip:port}` field makes `port` a format spec applied to `ip`,
# which raises "Invalid format specifier" for a string value.
# NOTE(review): username, password, ip, port and db_name are not defined in
# this snippet; they must be supplied before it runs.
conn_str = f"mysql+pymysql://{username}:{password}@{ip}:{port}/{db_name}"
# Define sql command
sql = """
select * 
from table
limit 10;
"""
# Get dataframe; the connection is closed when the `with` block exits.
with db.create_engine(conn_str).connect() as conn:
    df = pd.read_sql(sql, con=conn)
    

Python with functional SQL Commands


import sqlalchemy as db
import pandas as pd
from sqlalchemy.sql import select
from sqlalchemy import create_engine, MetaData, Table
from sqlalchemy import Column,Integer,String,ForeignKey
from sqlalchemy import join

# Read one table: reflect it from the database, then select its first two
# columns with a LIKE filter on the first one, capped at 10 rows.
engine = create_engine(conn_str)
metadata = MetaData()
t = Table(
    tablename,
    metadata,
    autoload=True,
    autoload_with=engine,
)
cols = list(t.c)
with engine.connect() as conn:
    # Build the statement step by step instead of as one chained expression.
    query = select([cols[0], cols[1]])
    query = query.where(cols[0].like(None))
    query = query.limit(10)
    result = conn.execute(query)
    for row in result:
        print(row)

# Join tables: select columns from two reflected tables joined on column `a`.
# NOTE(review): `t2 = ...` is a placeholder; presumably it should be reflected
# with Table(...) the same way as `t` — confirm before running.
t2 = ...
# Columns of a Table are reached through its `.c` collection, not as direct
# attributes of the Table object.
t3 = t.join(t2, t.c.a == t2.c.a)
# Parentheses keep the chained call one expression; the earlier form had no
# continuation after the first line, which is a syntax error.
query = (
    select([t.c.a, t.c.b, t2.c.c, t2.c.d])
    .select_from(t3)
    .where(t2.c.d.like("??"))
    .where(t.c.b == "??")
    .limit(10)
)
with engine.connect() as conn:
    result = conn.execute(query)
    # Rows are consumed while the connection is still open.
    for row in result:
        print(row)
    
Class with functional components

class queryConstruct(object):
    """Fluent builder for a SELECT over two joined, reflected tables.

    Each filter method narrows ``self.basicQuery`` and returns ``self`` so
    calls can be chained; ``getQueryResult`` executes the accumulated query.
    """

    def __init__(self, conn_str):
        """Create the engine, reflect both tables and build the base query.

        Args:
            conn_str: connection string passed to ``create_engine``.
        """
        self.engine = create_engine(conn_str)
        metadata = MetaData()
        # NOTE(review): t1name and t2name are not defined in this class; they
        # are expected to exist at module scope — confirm.
        self.t1 = Table(t1name,
                        metadata, autoload=True,
                        autoload_with=self.engine)
        self.t2 = Table(t2name,
                        metadata, autoload=True,
                        autoload_with=self.engine)
        self.c1 = self.t1.c
        self.c2 = self.t2.c
        # Join the two tables on their shared column `a`.
        self.t3 = self.t1.join(self.t2,
                               self.c1.a == self.c2.a)
        # NOTE(review): the column names a..i here, and Date/County/PostCode/
        # EPCType in the filters below, look like placeholders for the real
        # schema — verify against the actual tables.
        self.basicQuery = select([self.c1.a,
                                  self.c1.b, self.c1.c,
                                  self.c1.d, self.c1.e,
                                  self.c2.f, self.c1.g, self.c1.h,
                                  self.c2.i]).select_from(self.t3)

    def date(self, date):  # '2018-%'
        """Add a LIKE filter on the first table's ``Date`` column."""
        self.basicQuery = self.basicQuery.where(self.c1.Date.like(date))
        return self

    def county(self, county):  # 'Greater London'
        """Add an equality filter on the first table's ``County`` column."""
        self.basicQuery = self.basicQuery.where(self.c1.County == county)
        return self

    def postcode(self, postcode):  # 'N1C %'
        """Add a LIKE filter on the first table's ``PostCode`` column."""
        self.basicQuery = self.basicQuery.where(
            self.c1.PostCode.like(postcode))
        return self

    def limit(self, limit):  # 10
        """Apply ``limit`` to the query."""
        self.basicQuery = self.basicQuery.limit(limit)
        return self

    def offset(self, offset):
        """Apply ``offset`` to the query."""
        self.basicQuery = self.basicQuery.offset(offset)
        return self

    def epctype(self, epctype):
        """Add an equality filter on the second table's ``EPCType`` column."""
        self.basicQuery = self.basicQuery.where(self.c2.EPCType == epctype)
        return self

    def getQueryResult(self):
        """Execute the accumulated query and return all rows as a list.

        The rows are fetched inside the ``with`` block: the connection is
        closed on exit, so a result handed back unread could no longer be
        iterated by the caller.
        """
        with self.engine.connect() as conn:
            rows = conn.execute(self.basicQuery).fetchall()
        return rows


# Build the filtered query with chained calls, then run it and print each row.
q = queryConstruct(conn_str)
(
    q.date('2010-%')
    .epctype('NON-DOMESTIC')
    .postcode('NW1 %')
    .offset(10)
    .limit(10)
)

result = q.getQueryResult()
for row in result:
    print(row)
    

References