sql >> Databáze >  >> RDS >> Sqlserver

Implementace přírůstkového zatížení pomocí Change Data Capture v SQL Server

Tento článek bude zajímavý pro ty, kteří se často musí potýkat s integrací dat.

Úvod

Předpokládejme, že existuje databáze, kde uživatelé vždy upravují data (aktualizují nebo odebírají). Možná tuto databázi používá velká aplikace, která neumožňuje upravovat strukturu tabulky. Úkolem je čas od času načíst data z této databáze do jiné databáze na jiném serveru. Nejjednodušší způsob, jak problém vyřešit, je načíst nová data ze zdrojové databáze do cílové databáze s předběžným vyčištěním cílové databáze. Tuto metodu můžete používat, pokud je čas na načtení dat přijatelný a nepřekračuje přednastavené termíny. Co když načtení dat trvá několik dní? Nestabilní komunikační kanály navíc vedou k situaci, kdy se zatížení dat zastaví a znovu se spustí. Pokud čelíte těmto překážkám, doporučuji zvážit jeden z algoritmů „obnovování dat“. To znamená, že došlo pouze k úpravám dat od posledního načtení.

CDC

V SQL Server 2008 společnost Microsoft představila mechanismus sledování dat nazvaný Change Data Capture (CDC). Obecně řečeno, cílem tohoto mechanismu je, že povolení CDC pro jakoukoli databázovou tabulku vytvoří systémovou tabulku ve stejné databázi s podobným názvem, jaký má původní tabulka (schéma bude následující:'cdc' jako prefix plus starý název schématu plus „_“ a konec „_CT.“ Například původní tabulka je dbo.Example, pak se systémová tabulka bude jmenovat cdc.dbo_Example_CT). Uloží všechna data, která byla změněna.

Chcete-li se ve skutečnosti ponořit hlouběji do CDC, zvažte příklad. Nejprve se však ujistěte, že SQL Agent, který používá CDC, funguje na testovací instanci SQL Server.

Kromě toho budeme uvažovat o skriptu, který vytvoří databázi a testovací tabulku, naplní tuto tabulku daty a povolí CDC pro tuto tabulku.

Abychom porozuměli a zjednodušili úkol, použijeme jednu instanci SQL Serveru bez distribuce zdrojové a cílové databáze na různé servery.

use mastergo-- vytvořte zdrojovou databázi, pokud neexistuje (vyberte * ze sys.databases, kde name ='db_src_cdc') vytvořte databázi db_src_cdcgouse db_src_cdcgo-- povolte CDC, pokud je zakázáno, pokud neexistuje (vyberte * ze sys.databases, kde název =db_name() and is_cdc_enabled=1) exec sys.sp_cdc_enable_dbgo-- vytvořit roli pro tabulky s CDCif neexistuje (vyberte * z sys.sysusers kde name ='CDC_Reader' a issqlrole=1) vytvořit roli CDC_Readergo-- vytvořit tabulkuif object_id('dbo.Example','U') je null vytvořit tabulku dbo.Příklad ( ID int omezení identity PK_Example primární klíč, Název varchar(200) není null )go-- naplnit tabulku vložit dbo.Example (Title) values( 'Jedna'),('Dva'),('Tři'),('Čtyři'),('Pět');go-- povolit CDC pro tabulku, pokud neexistuje (vyberte * ze sys.tables, kde is_tracked_by_cdc =1 a name ='Příklad') exec sys.sp_cdc_enable_table @source_schema ='dbo', @source_name ='Příklad', @role_name ='CDC_Reader'go-- naplňte tabulku nějakými daty. Něco změníme nebo smažemeupdate dbo.Exampleset Title =reverse(Title)where ID in (2,3,4);delete from dbo.Example where ID in (1,2);set identity_insert dbo.Example on;insert dbo. Příklad (ID, Titul) values(1,'Jedna'),(6,'Six');set identity_insert dbo.Example off;go

Nyní se podívejme, co máme po provedení tohoto skriptu v tabulkách dbo.Example a cdc.dbo_Example_CT (je třeba poznamenat, že CDC je asynchronní. Do tabulek, kde je sledování změn uloženo, se po určité době naplní data ).

vyberte * z dbo.Example;
ID Název---- ---------------------- 1 jedna 3 eerhT 4 ruoF 5 pět 6 šest
vyberte row_number() over ( rozdělení podle pořadí ID podle __$start_lsn desc, __$seqval desc ) jako __$rn, *from cdc.dbo_Example_CT;
__$rn __$start_lsn __$end_lsn __$seqval __$operace __$update_mask ID Název------ --------------------- ----------- --------------------- ----------- ---- ------------ --- ----------- 1 0x0000003A000000580005 NULL 0x0000003A000000580003 2 0x03 1 One 2 0x0000003A000000560006 NULL 0x0000003A000000560002 1 0x03 1 One 1 0x0000003A000000560006 NULL 0x0000003A000000560005 1 0x03 2 owT 2 0x0000003A000000540005 NULL 0x0000003A000000540002 3 0x02 2 Two 3 0x0000003A000000540005 NULL 0x0000003A000000540002 4 0x02 2 owT 1 0x0000003A000000540005 NULL 0x0000003A000000540003 3 0x02 3 Three 2 0x0000003A000000540005 NULL 0x0000003A000000540003 4 0x02 3 eerhT 1 0x0000003A000000540005 NULL 0x0000003A000000540004 3 0x02 4 Čtyři 2 0x0000003A000000540005 NULL 0x0000003A000000540004 4 0x02 4 ruoF 1 0x0000003A00000003A00000058000> 000003A00000058000> 0NU 0x00A0000058000 

Zvažte podrobně strukturu tabulky, ve které je uloženo sledování změn. Pole __ $start_lsn a __ $seqval jsou LSN (pořadové číslo protokolu v databázi) a číslo transakce v rámci transakce. V těchto polích je důležitá vlastnost, a to, že si můžeme být jisti, že záznam s vyšším LSN bude proveden později. Díky této vlastnosti můžeme snadno získat nejnovější stav každého záznamu v dotazu, filtrováním našeho výběru podle podmínky – kde __ $ rn =1.

Pole operace __$ obsahuje kód transakce:

  • 1 – záznam je smazán
  • 2 – záznam je vložen
  • 3, 4 – záznam je aktualizován. Stará data před aktualizací jsou 3, nová data jsou 4.

Kromě polí služeb s předponou «__$» jsou pole původní tabulky zcela duplikována. Tato informace nám stačí k tomu, abychom přistoupili k inkrementálnímu načítání.

Nastavení databáze pro načítání dat

Vytvořte tabulku v naší testovací cílové databázi, do které se budou načítat data, a také dodatečnou tabulku pro ukládání dat o protokolu zatížení.

use mastergo-- vytvořte cílovou databázi, pokud neexistuje (vyberte * ze sys.databases, kde name ='db_dst_cdc') vytvořte databázi db_dst_cdcgouse db_dst_cdcgo-- vytvořte tabulkuif object_id('dbo.Example','U') is null vytvořit tabulku dbo.Příklad ( ID int omezení PK_Example primární klíč, název varchar(200) není null )go-- vytvořte tabulku pro uložení logif načtení object_id('dbo.log_cdc','U') je null vytvořit tabulku dbo .log_cdc ( table_name nvarchar(512) not null, dt datetime not null default getdate(), lsn binary(10) not null default(0x0), constraint pk_log_cdc primární klíč (table_name,dt desc) )go

Rád bych vás upozornil na pole tabulky LOG_CDC:

  • TABLE_NAME uchovává informace o tom, která tabulka byla načtena (v budoucnu je možné načíst několik tabulek, z různých databází nebo dokonce z různých serverů; formát tabulky je ‚SERVER_NAME.DB_NAME.SCHEMA_NAME.TABLE_NAME‘
  • DT je ​​pole data a času načítání, které je pro přírůstkové zatížení volitelné. Bude však užitečný pro audit načítání.
  • LSN – po načtení tabulky potřebujeme v případě potřeby uložit informace o místě, kde začít další načítání. Po každém načtení tedy do tohoto sloupce přidáme nejnovější (maximální) __ $ start_lsn.

Algoritmus pro načítání dat

Jak je popsáno výše, pomocí dotazu můžeme pomocí okenních funkcí získat nejnovější stav tabulky. Pokud známe LSN posledního zatížení, můžeme při příštím načtení odfiltrovat ze zdroje všechna data, jejichž změny jsou vyšší než uložené LSN, pokud došlo alespoň k jednomu kompletnímu předchozímu načtení:

s incr_Example as( vyberte row_number() přes (rozdělení podle pořadí ID podle __$start_lsn desc, __$seqval desc ) jako __$rn, * z db_src_cdc.cdc.dbo_Example_CT kde __$operace <> 3 a __$ start_lsn> @lsn)vyberte * z incr_Example

Pak můžeme získat všechny záznamy pro kompletní zatížení, pokud není uloženo zatížení LSN:

s incr_Example as( vyberte row_number() přes (rozdělení podle pořadí ID podle __$start_lsn desc, __$seqval desc ) jako __$rn, * z db_src_cdc.cdc.dbo_Example_CT kde __$operace <> 3 a __$ start_lsn> @lsn), full_Example as( vyberte * z db_src_cdc.dbo.Příklad, kde @lsn je null)vyberte ID, název, __$operationfrom incr_Examplewhere __$rn =1union allselect ID, Title, 2 jako __$operacefrom full_Example před> 

V závislosti na hodnotě @LSN tedy tento dotaz zobrazí buď všechny poslední změny (vynechání těch průběžných) se stavem Odebráno nebo ne, nebo všechna data z původní tabulky s přidáním stavu 2 (nový záznam) – toto pole se používá pouze pro sjednocení dvou výběrů. S tímto dotazem můžeme snadno implementovat buď úplné načtení, nebo opětovné načtení pomocí příkazu MERGE (od verze SQL 2008).

Abychom se vyhnuli úzkým hrdlům, které mohou vytvářet alternativní procesy a načítali shodná data z různých tabulek (v budoucnu budeme načítat několik tabulek a případně mezi nimi mohou být relační vztahy), doporučuji použít DB snapshot na zdrojovou databázi ( další funkce SQL 2008).

Úplný text zatížení je následující:

[expand title=”Kód”]

/* Algoritmus načítání dat*/-- vytvořte snímek databáze, pokud existuje (vyberte * ze sys.databases, kde název ='db_src_cdc_ss' ) přetáhněte databázi db_src_cdc_ss;declare @query nvarchar(max);vyberte @query =N' vytvořit databázi db_src_cdc_ss dne ( jméno =N'''+jméno+ ''', název souboru =N'''+[název souboru]+'.ss'' ) jako snímek db_src_cdc'z db_src_cdc.sys.sysfiles, kde groupid =1; exec ( @query);-- čtení LSN z předchozího loaddeclare @lsn binary(10) =(vyberte max(lsn) z db_dst_cdc.dbo.log_cdc kde název_tabulky ='localhost.db_src_cdc.dbo.Example');-- vymazat tabulka před úplným načtenímif @lsn má hodnotu null zkrátit tabulku db_dst_cdc.dbo.Example;-- načíst processwith incr_Example as( select row_number() over (rozdělit podle pořadí ID podle __$start_lsn desc, __$seqval desc) jako __$rn , * z db_src_cdc_ss.cdc.dbo_Example_CT kde __$operace <> 3 a __$start_lsn> @lsn), full_Example as( vybrat * z db_src_cdc_ss.dbo.Příklad, kde @lsn je null, jako cte_ Název, __$operace z incr_Example kde __$rn =1 spojení všech vyberte ID, Titul, 2 jako __$operace z úplného_Příkladu) sloučit db_dst_cdc.dbo.Příklad jako trg pomocí cte_Example jako src na trg.ID=src.ID při shodě a __$operace =1, poté smazat při shodě a __$operace <> 1, poté aktualizovat sadu trg.Title =src.Titlepři neodpovídající cíli a __$operace <> 1 poté vložit hodnoty (ID, Název) (src.ID, src .Title);-- označte konec procesu načítání a nejnovější hodnoty LSNinsert db_dst_cdc.dbo.log_cdc (název_tabulky, lsn) ('localhost.db_src_cdc.dbo.Example', isnull((select max(__$start_lsn) z db_src_cdc_ss.cdc.dbo_Example_CT),0))-- smažte snímek databáze, pokud existuje (vyberte * ze sys.databases, kde name ='db_src_cdc_ss' ) přetáhněte databázi db_src_cdc_ss

[/expand]


  1. Sloučení datových souborů se Statistica, část 2

  2. Oracle po uzavření sady výsledků neodstraní kurzory

  3. Jak získat kalendářní čtvrtletí z data v TSQL

  4. Měl by MAMP vrátit ::1 jako IP na localhost?