sql >> Databáze >  >> RDS >> Mysql

Použití Pythonu a MySQL v procesu ETL:Použití Pythonu a SQLAlchemy

V předchozích dvou článcích této série jsme diskutovali o tom, jak používat Python a SQLAlchemy k provádění procesu ETL. Dnes uděláme totéž, ale tentokrát s použitím Pythonu a SQL Alchemy bez příkazů SQL v textovém formátu. To nám umožní používat SQLAlchemy bez ohledu na databázový stroj, ke kterému jsme připojeni. Takže začněme.

Dnes budeme diskutovat o tom, jak provést proces ETL pomocí Pythonu a SQLAlchemy. Vytvoříme skript pro extrakci denních dat z naší provozní databáze, transformujeme je a poté načteme do našeho datového skladu.

Toto je třetí článek ze série. Pokud jste nečetli první dva články (Using Python and MySQL in the ETL Process and SQLAlchemy), důrazně vám doporučuji, abyste tak učinili, než budete pokračovat.

Celá tato řada je pokračováním naší řady datových skladů:

  • Vytvoření DWH, Část první:Předplacený obchodní datový model
  • Vytvoření DWH, část 2:Předplacený obchodní datový model
  • Vytvoření datového skladu, část 3:Předplacený obchodní datový model

Dobře, pojďme se pustit do dnešního tématu. Nejprve se podívejme na datové modely.

Datové modely



Provozní (živý) databázový datový model




Datový model DWH


Toto jsou dva datové modely, které budeme používat. Další informace o datových skladech (DWH) najdete v těchto článcích:

  • Hvězdné schéma
  • Schéma sněhových vloček
  • Hvězdné schéma vs. schéma sněhové vločky

Proč SQLAlchemy?

Celá myšlenka SQLAlchemy spočívá v tom, že poté, co importujeme databáze, nepotřebujeme kód SQL, který je specifický pro související databázový stroj. Místo toho můžeme importovat objekty do SQLAlchemy a použít syntaxi SQLAlchemy pro příkazy. To nám umožní používat stejný jazyk bez ohledu na to, ke kterému databázovému stroji jsme připojeni. Hlavní výhodou je, že se vývojář nemusí starat o rozdíly mezi různými databázovými stroji. Váš program SQLAlchemy bude fungovat úplně stejně (s menšími změnami), pokud migrujete na jiný databázový stroj.

Rozhodl jsem se pro komunikaci s dočasným úložištěm a mezi různými databázemi používat pouze příkazy SQLAlchemy a seznamy Pythonu. Klíčovým důvodem tohoto rozhodnutí je, že 1) seznamy Pythonu jsou dobře známé a 2) kód by byl čitelný i pro ty, kdo nemají znalosti Pythonu.

To neznamená, že SQLAlchemy je perfektní. Má určitá omezení, o kterých budeme diskutovat později. Prozatím se podívejme na kód níže:

Spuštění skriptu a výsledku

Toto je příkaz Pythonu používaný k volání našeho skriptu. Skript kontroluje data v provozní databázi, porovnává hodnoty s DWH a importuje nové hodnoty. V tomto příkladu aktualizujeme hodnoty ve dvou tabulkách dimenzí a jedné tabulce faktů; skript vrátí příslušný výstup. Celý skript je napsán tak, že jej můžete spouštět vícekrát denně. Smaže „stará“ data pro daný den a nahradí je novými.

Pojďme analyzovat celý skript, počínaje shora.

Import SQLAlchemy

První věc, kterou musíme udělat, je importovat moduly, které použijeme ve skriptu. Obvykle své moduly importujete při psaní skriptu. Ve většině případů nebudete hned na začátku přesně vědět, které moduly budete potřebovat.

from datetime import date

# import SQLAlchemy
from sqlalchemy import create_engine, select, MetaData, Table, and_, func, case

Importovali jsme datetime Pythonu modul, který nám dodává třídy, které pracují s daty.

Dále máme sqlalchemy modul. Nebudeme importovat celý modul, pouze věci, které potřebujeme – některé specifické pro SQLAlchemy (create_engine , MetaData , Table ), některé části příkazů SQL (select , and_ , case ) a func , což nám umožňuje používat funkce jako count() a sum() .

Připojování k databázím

Potřebujeme se připojit ke dvěma databázím na našem serveru. V případě potřeby jsme se mohli připojit k více databázím (MySQL, SQL Server nebo jakékoli jiné) z různých serverů. V tomto případě jsou obě databáze databáze MySQL a jsou uloženy na mém místním počítači.

# connect to databases
engine_live = sqlalchemy.create_engine('mysql+pymysql://:@localhost:3306/subscription_live')
connection_live = engine_live.connect()
engine_dwh = sqlalchemy.create_engine('mysql+pymysql://:@localhost:3306/subscription_dwh')
connection_dwh = engine_dwh.connect()

metadata = MetaData(bind=None)

Vytvořili jsme dva motory a dvě připojení. Nebudu zde zacházet do podrobností, protože jsme to již vysvětlili v předchozím článku.

Aktualizace dim_time Rozměr

Cíl:Vložte včerejší datum, pokud již není vloženo do tabulky.

V našem skriptu aktualizujeme dvě tabulky dimenzí novými hodnotami. Zbytek se řídí stejným vzorem, takže to projdeme pouze jednou; nemusíme ještě několikrát zapisovat téměř stejný kód.

Myšlenka je velmi jednoduchá. Vždy spustíme skript pro vložení nových dat za včerejšek. Proto musíme zkontrolovat, zda bylo toto datum vloženo do tabulky dimenzí. Pokud už tam je, neuděláme nic; pokud ne, doplníme. Podívejme se na kód pro aktualizaci dim_time tabulka.

Nejprve zkontrolujeme, zda datum existuje. Pokud neexistuje, přidáme. Začneme uložením včerejšího data do proměnné. V Pythonu to uděláte takto:

yesterday = date.fromordinal(date.today().toordinal()-1)
yesterday_str = str(yesterday)

První řádek obsahuje aktuální datum, převede ho na číselnou hodnotu, odečte od této hodnoty 1 a převede tuto číselnou hodnotu zpět na datum (včera =dnes – 1 ). Druhý řádek ukládá datum v textovém formátu.

Dále otestujeme, zda je datum již v databázi:

table_dim_time = Table('dim_time', metadata, autoload = True, autoload_with = engine_dwh)
stmt = select([table_dim_time]).where(table_dim_time.columns.time_date == yesterday_str)
result = connection_dwh.execute(stmt).fetchall()
date_exists = len(result)

Po načtení tabulky spustíme dotaz, který by měl vrátit všechny řádky z tabulky dimenzí, kde se hodnota času a data rovná včerejšku. Výsledek může mít 0 (v tabulce takové datum není) nebo 1 řádek (datum již v tabulce je).

Pokud datum ještě není v tabulce, použijeme k jeho přidání příkaz insert():

if date_exists == 0:
  print("New value added.")
  stmt = table_dim_time.insert().values(time_date=yesterday, time_year=yesterday.year, time_month=yesterday.month, time_week=yesterday.isocalendar()[1], time_weekday=yesterday.weekday())
  connection_dwh.execute(stmt)
else:
  print("No new values.")

Jedna nová věc, na kterou bych rád poukázal, je použití. .year , .month , .isocalendar()[1] a .weekday získat dataparts.

Aktualizace dim_city Rozměr

Cíl:Vložit nová města, pokud nějaká existují (tj. porovnat seznam měst v živé databázi se seznamem měst v DWH a přidat chybějící).

Aktualizace dim_time rozměr byl docela jednoduchý. Jednoduše jsme otestovali, zda je datum v tabulce, a vložili jej, pokud tam již nebylo. K testování hodnoty v databázi DWH jsme použili proměnnou Pythonu (včera ). Tento proces použijeme znovu, ale tentokrát se seznamy.

Protože neexistuje snadný způsob, jak kombinovat tabulky z různých databází v jediném dotazu SQLAlchemy, nemůžeme použít přístup popsaný v části 1 této série. Proto budeme potřebovat objekt pro uložení hodnot potřebných ke komunikaci mezi těmito dvěma databázemi. Rozhodl jsem se použít seznamy, protože jsou běžné a dělají svou práci.

Nejprve načteme country a city tabulky z živé databáze do příslušných objektů.

# dim_city
print("\nUpdating... dim_city")
table_city = Table('city', metadata, autoload = True, autoload_with = engine_live)
table_country = Table('country', metadata, autoload = True, autoload_with = engine_live)
table_dim_city = Table('dim_city', metadata, autoload = True, autoload_with = engine_dwh)

Dále načteme dim_city tabulky z DWH do seznamu:

# load whole dwh table in the list
stmt = select([table_dim_city]);
table_dim_city_list = connection_dwh.execute(stmt).fetchall()

Potom uděláme totéž pro hodnoty z živé databáze. Připojíme se k tabulkám country a city takže máme všechna potřebná data v tomto seznamu:

# load all live values in the list
stmt = select([table_city.columns.city_name, table_city.columns.postal_code, table_country.columns.country_name])\
	.select_from(table_city\
	.join(table_country))
table_city_list = connection_live.execute(stmt).fetchall()

Nyní projdeme seznam obsahující data z živé databáze. Pro každý záznam porovnáme hodnoty (city_name , postal_code a country_name ). Pokud takové hodnoty nenajdeme, přidáme nový záznam do dim_city tabulka.

# loop through live_db table
# for each record test if it is missing in the dwh table
new_values_added = 0
for city in table_city_list:
	id = -1;
	for dim_city in table_dim_city_list:
		if city[0] == dim_city[1] and city[1] == dim_city[2] and city[2] == dim_city[3]:
			id = dim_city[0]
	if id == -1:
		stmt = table_dim_city.insert().values(city_name=city[0], postal_code=city[1], country_name=city[2])
		connection_dwh.execute(stmt)
		new_values_added = 1
if new_values_added == 0:
	print("No new values.")
else:
	print("New value(s) added.")

Abychom zjistili, zda je hodnota již v DWH, otestovali jsme kombinaci atributů, které by měly být jedinečné. (Primární klíč z živé databáze nám zde příliš nepomůže.) Podobný kód můžeme použít k aktualizaci dalších slovníků. Není to nejhezčí řešení, ale stále je to docela elegantní. A udělá přesně to, co potřebujeme.

Aktualizace fact_customer_subscribed Tabulka

Cíl:Pokud máme stará data pro včerejší datum, nejprve je smažte. Přidejte včerejší data do DWH – bez ohledu na to, zda jsme v předchozím kroku něco smazali nebo ne.

Po aktualizaci všech tabulek dimenzí bychom měli aktualizovat tabulky faktů. V našem skriptu aktualizujeme pouze jednu tabulku faktů. Odůvodnění je stejné jako v předchozí části:aktualizace ostatních tabulek by probíhala podle stejného vzoru, takže bychom většinou opakovali kód.

Před vložením hodnot do tabulky faktů potřebujeme znát hodnoty souvisejících klíčů z tabulek dimenzí. Za tímto účelem znovu načteme dimenze do seznamů a porovnáme je s hodnotami z živé databáze.

První věc, kterou uděláme, je načíst zákazníka a fact_customer_subscribed tabulky do objektů:

# fact_customer_subscribed
print("\nUpdating... fact_customer_subscribed")

table_customer = Table('customer', metadata, autoload = True, autoload_with = engine_live)
table_fact_customer_subscribed = Table('fact_customer_subscribed', metadata, autoload = True, autoload_with = engine_dwh)

Nyní budeme muset najít klíče pro související časovou dimenzi. Protože vždy vkládáme data za včerejšek, vyhledáme toto datum v dim_time tabulky a použijte její ID. Dotaz vrátí 1 řádek a ID je na první pozici (index začíná od 0, takže result[0][0] ):

# find key for the dim_time dimension
stmt = select([table_dim_time]).where(table_dim_time.columns.time_date == yesterday)
result = connection_dwh.execute(stmt).fetchall()
dim_time_id = result[0][0]

Po tu dobu odstraníme všechny související záznamy z tabulky faktů:

# delete any existing data in the fact table for that time dimension value
stmt = table_fact_customer_subscribed.delete().where(table_fact_customer_subscribed.columns.dim_time_id == dim_time_id)
connection_dwh.execute(stmt)

Dobře, nyní máme ID časové dimenze uložené v dim_time_id variabilní. Bylo to snadné, protože můžeme mít pouze jednu hodnotu časové dimenze. Příběh bude jiný pro rozměr města. Nejprve načteme vše hodnoty, které potřebujeme – hodnoty, které jednoznačně popisují město (nikoli ID), a agregované hodnoty:

# prepare data for insert
stmt = select([table_city.columns.city_name, table_city.columns.postal_code, table_country.columns.country_name, func.sum(case([(table_customer.columns.active == 1, 1)], else_=0)).label('total_active'), func.sum(case([(table_customer.columns.active == 0, 1)], else_=0)).label('total_inactive'), func.sum(case([(and_(table_customer.columns.active == 1, func.date(table_customer.columns.time_updated) == yesterday), 1)], else_=0)).label('daily_new'), func.sum(case([(and_(table_customer.columns.active == 0, func.date(table_customer.columns.time_updated) == yesterday), 1)], else_=0)).label('daily_canceled')])\
	.select_from(table_customer\
	.join(table_city)\
	.join(table_country))\
	.group_by(table_city.columns.city_name, table_city.columns.postal_code, table_country.columns.country_name)

Na výše uvedeném dotazu bych chtěl zdůraznit několik věcí:

  • func.sum(...) je SUM(...) ze „standardního SQL“.
  • case(...) syntaxe používá and_ před podmínkami, ne mezi nimi.
  • .label(...) funguje jako SQL AS alias.
  • Používáme \ přesunout na další řádek a zvýšit čitelnost dotazu. (Věřte mi, bez lomítka je to skoro nečitelné – zkusil jsem to :) )
  • .group_by(...) hraje roli GROUP BY SQL.

Dále projdeme každý záznam vrácený pomocí předchozího dotazu. U každého záznamu porovnáme hodnoty, které jednoznačně definují město (city_name , postal_code , country_name ) s hodnotami uloženými v seznamu vytvořeném z DWH dim_city stůl. Pokud se všechny tři hodnoty shodují, uložíme ID ze seznamu a použijeme ho při vkládání nových dat. Tímto způsobem budeme mít pro každý záznam ID pro obě dimenze:

# loop through all new records
# use time dimension
# for each record find key for city dimension
# insert row
new_values = connection_live.execute(stmt).fetchall()
for new_value in new_values:
	dim_city_id = -1;
	for dim_city in table_dim_city_list:
		if new_value[0] == dim_city[1] and new_value[1] == dim_city[2] and new_value[2] == dim_city[3]:
			dim_city_id = dim_city[0]
	if dim_city_id > 0:	
		stmt_insert = table_fact_customer_subscribed.insert().values(dim_city_id=dim_city_id, dim_time_id=dim_time_id, total_active=new_value[3], total_inactive=new_value[4], daily_new=new_value[5], daily_canceled=new_value[6])
		connection_dwh.execute(stmt_insert)
		dim_city_id = -1
print("Completed.")

A je to. Aktualizovali jsme náš DWH. Skript by byl mnohem delší, kdybychom aktualizovali všechny tabulky dimenzí a faktů. Složitost by také byla větší, pokud by se tabulka faktů vztahovala k více tabulkám dimenzí. V takovém případě bychom potřebovali pro smyčka pro každou tabulku rozměrů.

To nefunguje!

Byl jsem velmi zklamaný, když jsem napsal tento skript a pak jsem zjistil, že něco takového nebude fungovat:

stmt = select([table_city.columns.city_name])\
	.select_from(table_city\
	.outerjoin(table_dim_city, table_city.columns.city_name == table_dim_city.columns.city_name))\
	.where(table_dim_city.columns.id.is_(None))

V tomto příkladu se pokouším použít tabulky ze dvou různých databází. Pokud vytvoříme dvě samostatná připojení, první připojení „neuvidí“ tabulky z jiného připojení. Pokud se připojíme přímo k serveru a ne k databázi, nebudeme moci načíst tabulky.

Dokud se to nezmění (doufejme, že brzy), budete muset pro komunikaci mezi těmito dvěma databázemi používat nějakou strukturu (např. to, co jsme udělali dnes). To komplikuje kód, protože musíte nahradit jeden dotaz dvěma seznamy a vnořenými pro smyčky.

Podělte se o své myšlenky o SQLAlchemy a Pythonu

Toto byl poslední článek z této série. Ale kdo ví? Možná v nadcházejících článcích vyzkoušíme jiný přístup, takže zůstaňte naladěni. Mezitím se prosím podělte o své názory na SQLAlchemy a Python v kombinaci s databázemi. Co si myslíte, že nám v tomto článku chybí? co byste dodal? Řekněte nám to v komentářích níže.

Zde si můžete stáhnout kompletní skript, který jsme použili v tomto článku.

A zvláštní poděkování patří Dirku J Bosmanovi (@dirkjobosman), který doporučil tuto sérii článků.


  1. Snížení datového rizika pomocí maskování dat

  2. SQL Right Join

  3. Chyba ORA-00932 při použití výběru s polem Union a CLOB

  4. Jak zahodit cizí klíč v SQLite