SQLAlchemy
Python with SQL Commands
# Import
import sqlalchemy as db
import pandas as pd
# Connection string
# `username`, `password`, `ip`, `port` and `db_name` must be defined beforehand.
# Host and port are separate fields: "{ip:port}" would be parsed as the
# variable `ip` with the format spec "port" and fail at runtime.
# NOTE: a password containing URL-special characters (e.g. "@" or "/") must
# be percent-encoded before it is embedded in the URL.
conn_str = f"mysql+pymysql://{username}:{password}@{ip}:{port}/{db_name}"
# Define sql command
sql = """
select *
from table
limit 10;
"""
# Get dataframe
# The connection is only needed while pandas reads the rows, so it is opened
# in a `with` block and released as soon as the DataFrame is built.
with db.create_engine(conn_str).connect() as conn:
    df = pd.read_sql(sql, con=conn)
Python with functional SQL Commands
import sqlalchemy as db
import pandas as pd
from sqlalchemy.sql import select
from sqlalchemy import create_engine, MetaData, Table
from sqlalchemy import Column,Integer,String,ForeignKey
from sqlalchemy import join
# Read one table
# `conn_str` and `tablename` must be defined beforehand.
engine = create_engine(conn_str)
metadata = MetaData()
# `autoload_with` alone triggers reflection; the separate `autoload=True`
# flag is deprecated in SQLAlchemy 1.4 and no longer accepted in 2.0.
t = Table(tablename, metadata, autoload_with=engine)
cols = list(t.c)
# Placeholder: replace with the LIKE pattern to filter on. A pattern of
# None would render as `LIKE NULL`, which never matches any row.
pattern = "%"
# select() takes its columns positionally (SQLAlchemy 1.4+); the list form
# select([...]) was removed in 2.0.
query = select(cols[0], cols[1]).where(cols[0].like(pattern)).limit(10)
with engine.connect() as conn:
    for row in conn.execute(query):
        print(row)

# Join tables
# `tablename2` is a placeholder for the second table, reflected like `t`.
t2 = Table(tablename2, metadata, autoload_with=engine)
# Columns live on the `.c` collection of a Table, not on the Table itself.
t3 = t.join(t2, t.c.a == t2.c.a)
# Parentheses let the chained calls span several lines without backslashes.
query = (
    select(t.c.a, t.c.b, t2.c.c, t2.c.d)
    .select_from(t3)
    .where(t2.c.d.like("??"))
    .where(t.c.b == "??")
    .limit(10)
)
with engine.connect() as conn:
    result = conn.execute(query)
    # Consume the rows while the connection is still open.
    for row in result:
        print(row)
Class with functional components
class queryConstruct(object):
    """Fluent builder for a SELECT over two reflected, joined tables.

    Every filter method replaces ``self.basicQuery`` with a narrowed query
    and returns ``self`` so that calls can be chained. ``getQueryResult``
    executes the accumulated query.
    """

    def __init__(self, conn_str):
        """Reflect both tables and prepare the base joined SELECT.

        The table names are read from the module-level names ``t1name`` and
        ``t2name``, which must be defined before instantiation.

        Args:
            conn_str: Database URL passed to ``create_engine``.
        """
        self.engine = create_engine(conn_str)
        metadata = MetaData()
        # `autoload_with` alone triggers reflection; `autoload=True` is
        # deprecated in SQLAlchemy 1.4 and no longer accepted in 2.0.
        self.t1 = Table(t1name, metadata, autoload_with=self.engine)
        self.t2 = Table(t2name, metadata, autoload_with=self.engine)
        self.c1 = self.t1.c
        self.c2 = self.t2.c
        self.t3 = self.t1.join(self.t2, self.c1.a == self.c2.a)
        # Columns are passed positionally (SQLAlchemy 1.4+); the list form
        # select([...]) was removed in 2.0.
        self.basicQuery = select(
            self.c1.a,
            self.c1.b,
            self.c1.c,
            self.c1.d,
            self.c1.e,
            self.c2.f,
            self.c1.g,
            self.c1.h,
            self.c2.i,
        ).select_from(self.t3)

    def _where(self, condition) -> "queryConstruct":
        """Add ``condition`` as a WHERE clause and return ``self``."""
        self.basicQuery = self.basicQuery.where(condition)
        return self

    def date(self, date: str) -> "queryConstruct":
        """Filter on ``Date`` of the first table with a LIKE pattern, e.g. '2018-%'."""
        return self._where(self.c1.Date.like(date))

    def county(self, county: str) -> "queryConstruct":
        """Filter on ``County`` of the first table by equality, e.g. 'Greater London'."""
        return self._where(self.c1.County == county)

    def postcode(self, postcode: str) -> "queryConstruct":
        """Filter on ``PostCode`` of the first table with a LIKE pattern, e.g. 'N1C %'."""
        return self._where(self.c1.PostCode.like(postcode))

    def limit(self, limit: int) -> "queryConstruct":
        """Apply ``limit`` to the query, e.g. 10."""
        self.basicQuery = self.basicQuery.limit(limit)
        return self

    def offset(self, offset: int) -> "queryConstruct":
        """Apply ``offset`` to the query."""
        self.basicQuery = self.basicQuery.offset(offset)
        return self

    def epctype(self, epctype: str) -> "queryConstruct":
        """Filter on ``EPCType`` of the second table by equality."""
        return self._where(self.c2.EPCType == epctype)

    def getQueryResult(self):
        """Execute the accumulated query and return all rows as a list.

        The rows are fetched inside the ``with`` block: once the block exits
        the connection is released, so the result must not be read lazily
        afterwards.
        """
        with self.engine.connect() as conn:
            return conn.execute(self.basicQuery).fetchall()
# Build the query step by step; each call hands back the same builder, so
# the filters can be chained in one parenthesised expression.
q = queryConstruct(conn_str)
(
    q.date('2010-%')
    .epctype('NON-DOMESTIC')
    .postcode('NW1 %')
    .offset(10)
    .limit(10)
)
# Run the accumulated query and print every row it returns.
result = q.getQueryResult()
for row in result:
    print(row)
References