V předchozích dvou dílech jsme představili model živé databáze pro podnik založený na předplatném a datový sklad (DWH), který bychom mohli použít pro vytváření sestav. I když je zřejmé, že by měly spolupracovat, mezi těmito dvěma modely nebylo žádné spojení. Dnes uděláme další krok a napíšeme kód pro přenos dat z živé databáze do našeho DWH.
Datové modely
Než se ponoříme do kódu, připomeňme si dva modely, se kterými budeme pracovat. Prvním je transakční datový model, který budeme používat k ukládání našich dat v reálném čase. Vzhledem k tomu, že provozujeme firmu založenou na předplatném, budeme muset ukládat podrobnosti o zákaznících a předplatném, objednávky zákazníků a stavy objednávek.
Je toho opravdu hodně, co bychom k tomuto modelu mohli přidat, jako je sledování plateb a ukládání historických dat (zejména změny v datech zákazníků a předplatitelů). Abych zdůraznil proces ETL (extrakce, transformace a načtení), chci tento model zachovat co nejjednodušší.
Použití transakčního datového modelu jako databáze sestav může v některých případech fungovat, ale nebude fungovat ve všech případech. Už jsme to zmínili, ale stojí za to si to zopakovat. Pokud chceme oddělit naše reportovací úkoly od našich real-time procesů, měli bychom vytvořit nějakou reportovací databázi. Jedním z řešení je datový sklad.
Náš DWH je soustředěn kolem čtyř tabulek faktů. První dva sledují počet zákazníků a předplatných na denní úrovni. Zbývající dva sledují počet dodávek a produkty zahrnuté v těchto dodávkách.
Můj předpoklad je, že náš proces ETL spustíme jednou denně. Nejprve naplníme tabulky dimenzí novými hodnotami (tam, kde je to potřeba). Poté vyplníme tabulky faktů.
Abychom se vyhnuli zbytečnému opakování, předvedu pouze kód, který naplní první dvě tabulky dimenzí a první dvě tabulky faktů. Zbývající tabulky lze naplnit pomocí velmi podobného kódu. Doporučuji vám, abyste si kód zapsali sami. Není lepší způsob, jak se naučit něco nového, než to zkusit.
Nápad:Tabulky dimenzí
Obecnou myšlenkou je vytvořit uložené procedury, které bychom mohli pravidelně používat k naplňování DWH – tabulek dimenzí i tabulek faktů. Tyto postupy přenesou data mezi dvěma databázemi na stejném serveru. To znamená, že některé dotazy uvnitř těchto procedur budou používat tabulky z obou databází. To se očekává; musíme porovnat stav DWH s aktuální DB a provést změny v DWH podle toho, co se děje v živé DB.
V našem DWH máme čtyři tabulky dimenzí:dim_time
, dim_city
, dim_product
a dim_delivery_status
.
Časová dimenze se vyplní přidáním předchozího data. Hlavním předpokladem je, že tento postup budeme spouštět denně, po uzavření obchodu.
Rozměry města a produktu budou záviset na aktuálních hodnotách uložených v city
a product
slovníky v živé databázi. Pokud do těchto slovníků něco přidáme, při příští aktualizaci DWH budou přidány nové hodnoty do tabulek rozměrů.
Poslední tabulkou dimenzí je dim_delivery_status
stůl. Nebude aktualizován, protože obsahuje pouze tři výchozí hodnoty. Dodávka je buď na cestě, je zrušena nebo doručena.
Nápad:Tabulky faktů
Naplňování tabulek faktů je vlastně ta pravá práce. Zatímco slovníky v živé databázi atribut časového razítka neobsahují, tabulky s daty vloženými v důsledku našich operací ano. Všimnete si dvou atributů časového razítka, time_inserted
a time_updated
, v datovém modelu.
Opět předpokládám, že import DWH úspěšně spustíme jednou denně. To nám umožňuje agregovat data na denní úrovni. Budeme počítat počty aktivních a zrušených zákazníků a předplatných, stejně jako dodávky a dodané produkty k danému datu.
Náš živý model funguje dobře, pokud po COB (uzavření obchodu) spustíme proceduru vložení. Přesto, pokud chceme větší flexibilitu, měli bychom v modelu provést nějaké změny. Jednou z takových změn by mohla být samostatná tabulka historie pro sledování přesného okamžiku, kdy se změnila jakákoli data týkající se zákazníků nebo předplatných. S naší současnou organizací budeme vědět, že ke změně došlo, ale nebudeme vědět, zda došlo k nějakým změnám před touto změnou (např. zákazník včera zrušil, po půlnoci znovu aktivoval svůj účet a dnes znovu zrušil) .
Vyplňování tabulek dimenzí
Jak již bylo zmíněno, budu vycházet z předpokladu, že import DWH spustíme přesně jednou denně. Pokud tomu tak není, budeme potřebovat další kód k odstranění nově vložených dat z tabulek dimenzí a faktů. U tabulek dimenzí by to bylo omezeno na smazání daného data.
Nejprve zkontrolujeme, zda dané datum existuje v dim_time
stůl. Pokud ne, přidáme do tabulky nový řádek; pokud ano, nemusíme nic dělat. Ve většině případů jsou všechna data vložena během počátečního produkčního nasazení. Ale půjdu s tímto příkladem pro vzdělávací účely.
Pro dim_city
a dim_product
rozměry, přidám pouze nové hodnoty, které zjistím v city
a product
tabulky. Nebudu provádět žádné mazání, protože na jakékoli dříve vložené hodnoty lze odkazovat v nějaké tabulce faktů. Mohli bychom jít s měkkým vymazáním, např. mít „aktivní“ příznak, který můžeme zapínat a vypínat.
U poslední tabulky dim_delivery_status
, neudělám nic, protože bude vždy obsahovat stejné tři hodnoty.
Níže uvedený kód vytvoří proceduru, která naplní tabulky dimenzí dim_time
a dim_city
.
Pro časovou dimenzi přidám včerejší datum. Vycházím z předpokladu, že proces ETL začíná hned po půlnoci. Zkontroluji, zda tato dimenze již existuje, a pokud ne, přidám nové datum do tabulky.
Pro dimenzi města použiji LEFT JOIN ke spojení dat z živé databáze a databáze DWH, abych určil, které řádky chybí. Poté do tabulky dimenzí přidám pouze jakákoli chybějící data. Stojí za zmínku, že existuje několik způsobů, jak zkontrolovat, zda byla data změněna. Tento proces se nazývá sběr dat změn nebo CDC. Běžnou metodou je kontrola aktualizovaných časových razítek nebo verzí. Existuje několik dalších způsobů, ale ty jsou mimo rozsah tohoto článku.
Pojďme se nyní podívat na kód, který je napsán pomocí syntaxe MySQL .
DROP PROCEDURE IF EXISTS p_update_dimensions// CREATE PROCEDURE p_update_dimensions () BEGIN SET @time_exists = 0; SET @time_date = DATE_ADD(DATE(NOW()), INTERVAL -1 DAY); -- procedure populates dimension tables with new values -- dim_time SET @time_exists = (SELECT COUNT(*) FROM subscription_dwh.dim_time dim_time WHERE dim_time.time_date = @time_date); IF (@time_exists = 0) THEN INSERT INTO subscription_dwh.`dim_time`(`time_date`, `time_year`, `time_month`, `time_week`, `time_weekday`, `ts`) SELECT @time_date AS time_date, YEAR(@time_date) AS time_year, MONTH(@time_date) AS time_month, WEEK(@time_date) AS time_week, WEEKDAY(@time_date) AS time_weekday, NOW() AS ts; END IF; -- dim_city INSERT INTO subscription_dwh.`dim_city`(`city_name`, `postal_code`, `country_name`, `ts`) SELECT city_live.city_name, city_live.postal_code, country_live.country_name, Now() FROM subscription_live.city city_live INNER JOIN subscription_live.country country_live ON city_live.country_id = country_live.id LEFT JOIN subscription_dwh.dim_city city_dwh ON city_live.city_name = city_dwh.city_name AND city_live.postal_code = city_dwh.postal_code AND country_live.country_name = city_dwh.country_name WHERE city_dwh.id IS NULL; END// -- CALL p_update_dimensions ()
Spuštění této procedury -- kterou provádíme pomocí komentované procedury CALL -- vloží do tabulek dimenzí nové datum a všechna chybějící města. Zkuste přidat svůj vlastní kód, abyste naplnili zbývající dvě tabulky dimenzí novými hodnotami.
Proces ETL v datovém skladu
Hlavní myšlenkou datového skladu je obsahovat agregovaná data v požadovaném formátu. Tento formát bychom samozřejmě měli znát ještě předtím, než vůbec začneme skladovat. Pokud uděláme vše podle plánu, můžeme získat všechny výhody, které nám DWH nabízí. Hlavní výhodou je vyšší výkon při spouštění dotazů. Naše dotazy pracují s menším počtem záznamů (protože jsou agregované) a běží v databázi přehledů (spíše než v živé).
Ale než se budeme moci dotazovat, musíme uložit fakta do naší databáze. Způsob, jakým to uděláme, závisí na tom, co s našimi daty budeme muset udělat později. Pokud před zahájením výstavby našeho DWH nebudeme mít dobrý celkový obraz, můžeme se brzy ocitnout v problémech! brzy.
Název tohoto procesu je ETL:E =Extract, T =Transform, L =Load. Získá data, transformuje je tak, aby vyhovovala struktuře DWH, a načte je do DWH. Abychom byli přesní, skutečný proces, který použijeme, je ELT:Extrahovat, Načíst, Transformovat. Protože používáme uložené procedury, extrahujeme data, načteme je a poté je transformujeme tak, aby vyhovovaly našim potřebám. Je dobré vědět, že i když se ETL a ELT mírně liší, termíny se někdy používají zaměnitelně.
Vyplnění tabulek faktů
Naplňování tabulek faktů je důvod, proč jsme skutečně tady. Dnes vyplním dvě tabulky faktů, fact_customer_subscribed
tabulka a fact_subscription_status
stůl. Zbývající dvě tabulky faktů si můžete vyzkoušet jako domácí úkol.
Než přejdeme k vyplnění tabulky faktů, musíme předpokládat, že tabulky dimenzí jsou naplněny novými hodnotami. Vyplnění tabulek faktů probíhá podle stejného vzoru. Protože mají stejnou strukturu, vysvětlím je oba společně.
Data seskupujeme podle dvou dimenzí:času a města. Časová dimenze bude nastavena na včera a ID souvisejícího záznamu najdeme v dim_time
tabulky porovnáním dat (poslední INNER JOIN v obou dotazech).
ID dim_city
se extrahuje spojením všech atributů, které tvoří UNIKÁTNÍ kombinaci v tabulce dimenzí (název města, PSČ a název země).
V tomto dotazu otestujeme hodnoty pomocí CASE a poté je SUMÍME. Pro aktivní a neaktivní zákazníky jsem datum netestoval. Pro tato pole jsem však vybral tak, jak jsou. U nových a zrušených účtů jsem testoval aktualizovaný čas.
DROP PROCEDURE IF EXISTS p_update_facts// CREATE PROCEDURE p_update_facts () BEGIN SET @time_date = DATE_ADD(DATE(NOW()), INTERVAL -1 DAY); -- procedure populates fact tables with new values -- fact_customer_subscribed INSERT INTO `fact_customer_subscribed`(`dim_city_id`, `dim_time_id`, `total_active`, `total_inactive`, `daily_new`, `daily_canceled`, `ts`) SELECT city_dwh.id AS dim_ctiy_id, time_dwh.id AS dim_time_id, SUM(CASE WHEN customer_live.active = 1 THEN 1 ELSE 0 END) AS total_active, SUM(CASE WHEN customer_live.active = 0 THEN 1 ELSE 0 END) AS total_inactive, SUM(CASE WHEN customer_live.active = 1 AND DATE(customer_live.time_updated) = @time_date THEN 1 ELSE 0 END) AS daily_new, SUM(CASE WHEN customer_live.active = 0 AND DATE(customer_live.time_updated) = @time_date THEN 1 ELSE 0 END) AS daily_canceled, MIN(NOW()) AS ts FROM subscription_live.`customer` customer_live INNER JOIN subscription_live.`city` city_live ON customer_live.city_id = city_live.id INNER JOIN subscription_live.`country` country_live ON city_live.country_id = country_live.id INNER JOIN subscription_dwh.dim_city city_dwh ON city_live.city_name = city_dwh.city_name AND city_live.postal_code = city_dwh.postal_code AND country_live.country_name = city_dwh.country_name INNER JOIN subscription_dwh.dim_time time_dwh ON time_dwh.time_date = @time_date GROUP BY city_dwh.id, time_dwh.id; -- fact_subscription_status INSERT INTO `fact_subscription_status`(`dim_city_id`, `dim_time_id`, `total_active`, `total_inactive`, `daily_new`, `daily_canceled`, `ts`) SELECT city_dwh.id AS dim_ctiy_id, time_dwh.id AS dim_time_id, SUM(CASE WHEN subscription_live.active = 1 THEN 1 ELSE 0 END) AS total_active, SUM(CASE WHEN subscription_live.active = 0 THEN 1 ELSE 0 END) AS total_inactive, SUM(CASE WHEN subscription_live.active = 1 AND DATE(subscription_live.time_updated) = @time_date THEN 1 ELSE 0 END) AS daily_new, SUM(CASE WHEN subscription_live.active = 0 AND DATE(subscription_live.time_updated) = @time_date THEN 1 ELSE 0 END) AS daily_canceled, MIN(NOW()) AS ts FROM subscription_live.`customer` customer_live INNER JOIN subscription_live.`subscription` subscription_live ON subscription_live.customer_id = customer_live.id INNER JOIN subscription_live.`city` city_live ON customer_live.city_id = city_live.id INNER JOIN subscription_live.`country` country_live ON city_live.country_id = country_live.id INNER JOIN subscription_dwh.dim_city city_dwh ON city_live.city_name = city_dwh.city_name AND city_live.postal_code = city_dwh.postal_code AND country_live.country_name = city_dwh.country_name INNER JOIN subscription_dwh.dim_time time_dwh ON time_dwh.time_date = @time_date GROUP BY city_dwh.id, time_dwh.id; END// -- CALL p_update_facts ()
Ještě jednou jsem okomentoval poslední řádek. Odeberte komentář a tento řádek můžete použít k volání procedury a vložení nových hodnot. Vezměte prosím na vědomí, že jsem neodstranil žádné existující staré hodnoty, takže tento postup nebude fungovat, pokud již hodnoty pro dané datum a město máme. To lze vyřešit provedením mazání před vložením.
Pamatujte, že musíme naplnit zbývající tabulky faktů v našem DWH. Doporučuji vám to zkusit sami!
Další věc, kterou bych určitě doporučil, je umístění celého procesu do transakce. To by zajistilo, že buď budou všechna vložení úspěšná, nebo nebudou provedeny žádné. To je velmi důležité, když se chceme vyhnout částečnému vkládání dat, např. pokud máme více procedur pro vkládání dimenzí a faktů a některé z nich odvedou svou práci, zatímco jiné selžou.
Co si myslíte?
Dnes jsme viděli, jak bychom mohli provést proces ELT/ETL a načíst data z živé databáze do datového skladu. Zatímco proces, který jsme předvedli, je značně zjednodušený, obsahuje všechny prvky potřebné k E(extrakci) dat, T(transformaci) do vhodného formátu a nakonec k L(naložení) do DWH. Co myslíš? Sdělte nám prosím své zkušenosti v komentářích níže.