sql >> Databáze >  >> NoSQL >> HBase

Jak na to:Použijte HBase Hromadné načítání a proč

Apache HBase je o poskytování náhodného přístupu pro čtení/zápis k vašim velkým datům v reálném čase, ale jak tato data do HBase efektivně dostat? Intuitivně se o to nový uživatel pokusí prostřednictvím klientských rozhraní API nebo pomocí úlohy MapReduce s TableOutputFormat, ale tyto přístupy jsou problematické, jak se dozvíte níže. Místo toho se funkce hromadného načítání HBase mnohem snadněji používá a může vkládat stejné množství dat rychleji.

Tento blogový příspěvek představí základní koncepty funkce hromadného načítání, představí dva případy použití a navrhne dva příklady.

Přehled hromadného načítání

Pokud máte některý z těchto příznaků, hromadné načítání je pro vás pravděpodobně tou správnou volbou:

  • Potřebovali jste vyladit své MemStore tak, aby využívaly většinu paměti.
  • Potřebovali jste buď použít větší WAL, nebo je úplně obejít.
  • Vaše fronty na zhutnění a splachování se pohybují ve stovkách.
  • Váš GC je mimo kontrolu, protože vaše vložky se pohybují v MB.
  • Při importu dat vaše latence překročí vaši smlouvu SLA.

Většina z těchto příznaků se běžně označuje jako „růstové bolesti“. Použití hromadného načítání vám může pomoci se jim vyhnout.

V HBase-Speak, hromadné načítání je proces přípravy a načítání HFiles (vlastní formát souboru HBase) přímo do RegionServers, čímž se obchází cesta zápisu a tyto problémy se zcela vylučují. Tento proces je podobný ETL a vypadá takto:

1. Extrahujte data ze zdroje, obvykle textových souborů nebo jiné databáze. HBase tuto část procesu nespravuje. Jinými slovy, nemůžete HBase přikázat, aby připravila HFiles jejich přímým čtením z MySQL – spíše to musíte udělat svými vlastními prostředky. Můžete například spustit mysqldump na stole a nahrát výsledné soubory do HDFS nebo jen uchopit soubory protokolu HTTP Apache. V každém případě musí být vaše data před dalším krokem v HDFS.

2. Transformujte data do souborů HFiles. Tento krok vyžaduje úlohu MapReduce a pro většinu typů vstupů budete muset napsat Mapper sami. Úloha bude muset vygenerovat klíč řádku jako klíč a buď hodnotu KeyValue, Put nebo Delete jako hodnotu. Reduktor je ovládán HBase; nakonfigurujete jej pomocí HFileOutputFormat.configureIncrementalLoad() a provede následující:

  • Zkontroluje tabulku a nakonfiguruje rozdělení celkové objednávky
  • Nahraje soubor oddílů do clusteru a přidá jej do DistributedCache
  • Nastaví počet úkolů snížení tak, aby odpovídal aktuálnímu počtu oblastí
  • Nastaví výstupní třídu klíč/hodnota tak, aby odpovídala požadavkům HFileOutputFormat
  • Nastaví redukci tak, aby provedla příslušné řazení (buď KeyValueSortReducer nebo PutSortReducer)

V této fázi bude ve výstupní složce vytvořen jeden soubor HF pro každou oblast. Mějte na paměti, že vstupní data jsou téměř kompletně přepsána, takže budete potřebovat alespoň dvojnásobné množství dostupného místa na disku, než je velikost původní sady dat. Například pro 100GB mysqldump byste měli mít alespoň 200 GB volného místa na disku v HDFS. Na konci procesu můžete odstranit soubor výpisu.

3. Načtěte soubory do HBase tak, že sdělíte RegionServers, kde je mají najít. Toto je nejjednodušší krok. Vyžaduje použití LoadIncrementalHFiles (běžněji známého jako nástroj completebulkload) a předáním adresy URL, která vyhledá soubory v HDFS, načte každý soubor do příslušné oblasti prostřednictvím RegionServeru, který jej obsluhuje. V případě, že po vytvoření souborů došlo k rozdělení oblasti, nástroj automaticky rozdělí soubor HF podle nových hranic. Tento proces není příliš efektivní, takže pokud do vaší tabulky aktuálně zapisují jiné procesy, je nejlepší načíst soubory ihned po dokončení kroku transformace.

Zde je ukázka tohoto procesu. Datový tok jde z původního zdroje do HDFS, kde RegionServers jednoduše přesunou soubory do adresářů svých regionů.

Případy použití

Původní načtení datové sady: Všichni uživatelé migrující z jiného datového úložiště by měli zvážit tento případ použití. Nejprve musíte projít cvičením návrhu schématu tabulky a poté vytvořit samotnou tabulku, předem rozdělenou. Rozdělovací body musí brát v úvahu rozložení klíčů řádků a počet serverů RegionServer. V případě jakéhokoli vážného použití doporučuji přečíst si prezentaci mého kolegy Larse George o pokročilém návrhu schémat.

Výhodou je, že je mnohem rychlejší zapisovat soubory přímo, než procházet cestou zápisu RegionServeru (zápis do MemStore i WAL) a pak případně vyprázdnit, komprimovat a tak dále. Znamená to také, že nemusíte svůj cluster ladit pro zátěž s velkým množstvím zápisu a poté jej znovu ladit pro normální zátěž.

Přírůstkové zatížení: Řekněme, že máte nějakou datovou sadu, kterou aktuálně obsluhuje HBase, ale nyní potřebujete dávkově importovat více dat od třetí strany nebo máte noční úlohu, která generuje několik gigabajtů, které musíte vložit. Pravděpodobně není tak velká jako datová sada, kterou HBase již poskytuje, ale může ovlivnit 95. percentil vaší latence. Procházení normální cesty zápisu bude mít nepříznivý účinek v tom, že během importu spustí více vyprázdnění a komprimace než normálně. Tento dodatečný IO stres bude konkurovat vašim dotazům citlivým na latenci.

Příklady

Následující příklady můžete použít ve svém vlastním clusteru Hadoop, ale pokyny jsou poskytovány pro virtuální počítač Cloudera QuickStart, což je cluster s jedním uzlem, hostující OS a ukázková data a příklady zapracované do zařízení virtuálního počítače pro váš desktop.

Jakmile virtuální počítač spustíte, řekněte mu prostřednictvím webového rozhraní, které se automaticky otevře, aby nasadilo CDH a poté se ujistěte, že je spuštěna také služba HBase.

Vestavěný TSV Bulk Loader

HBase se dodává s úlohou MR, která může číst soubor hodnot oddělených oddělovači a odesílat přímo do tabulky HBase nebo vytvářet HFiles pro hromadné načítání. Tady budeme:

  1. Získejte ukázková data a nahrajte je do HDFS.
  2. Spusťte úlohu ImportTsv a transformujte soubor do více souborů HFiles podle předem nakonfigurované tabulky.
  3. Připravte a načtěte soubory v HBase.

Prvním krokem je otevřít konzolu a pomocí následujícího příkazu získat ukázková data:

curl -Ohttps://people.apache.org/~jdcryans/word_count.csv

Tento soubor jsem vytvořil spuštěním počítání slov na původním rukopisu tohoto blogového příspěvku a následným odesláním výsledku ve formátu csv bez jakýchkoli názvů sloupců. Nyní nahrajte soubor do HDFS:

hdfs dfs -put word_count.csv

Extrakční část hromadného načtení je nyní dokončena, musíte soubor transformovat. Nejprve musíte navrhnout stůl. Aby to nebylo jednoduché, říkejte tomu „počet slov“ – klíče řádku budou samotná slova a jediný sloupec bude obsahovat počet, v rodině, kterou nazýváme „f“. Nejlepším postupem při vytváření tabulky je rozdělit ji podle rozložení klíčů řádku, ale pro tento příklad vytvoříme pouze pět oblastí s dělicími body rovnoměrně rozmístěnými po prostoru klíčů. Otevřete prostředí hbase:

hbase shell

A spusťte následující příkaz k vytvoření tabulky:

create 'wordcount', {NAME => 'f'}, {SPLITS => ['g', 'm', 'r', 'w']}

Čtyři dělicí body vygenerují pět oblastí, kde první oblast začíná prázdným klíčem řádku. Abyste získali lepší dělící body, můžete také provést rychlou analýzu, abyste viděli, jak jsou slova skutečně distribuována, ale to nechám na vás.

Pokud nasměrujete prohlížeč svého virtuálního počítače na http://localhost:60010/, uvidíte naši nově vytvořenou tabulku a jejích pět oblastí, které jsou všechny přiřazeny k RegionServeru.

Nyní je čas udělat to těžké. Vyvoláním jara HBase na příkazovém řádku pomocí skriptu „hadoop“ se zobrazí seznam dostupných nástrojů. Ten, který chceme, se nazývá importtsv a má následující použití:

hadoop jar /usr/lib/hbase/hbase-0.94.6-cdh4.3.0-security.jar importtsv CHYBA:Chybný počet argumentů:0 Použití:importtsv -Dimporttsv.columns=a,b,c 

Příkazový řádek, který budeme používat, je následující:

hadoop jar /usr/lib/hbase/hbase-0.94.6-cdh4.3.0-security.jar importtsv-Dimporttsv.separator=,-Dimporttsv.bulk.output=output-Dimporttsv.columns=HBASE_ROW_KEY,f:count počet slov word_count.csv

Zde je přehled různých prvků konfigurace:

  • -Dimporttsv.separator=, určuje, že oddělovačem je čárka.
  • -Dimporttsv.bulk.output=output je relativní cesta k místu, kam budou soubory HFiles zapsány. Protože váš uživatel na virtuálním počítači je ve výchozím nastavení „cloudera“, znamená to, že soubory budou v /user/cloudera/output. Přeskočíte-li tuto možnost, úloha se zapíše přímo do HBase.
  • -Dimporttsv.columns=HBASE_ROW_KEY,f:count je seznam všech sloupců obsažených v tomto souboru. Klíč řádku je třeba identifikovat pomocí velkého řetězce HBASE_ROW_KEY; jinak nezahájí práci. (Rozhodl jsem se použít kvalifikátor „count“, ale může to být cokoliv jiného.)

Vzhledem k malé vstupní velikosti by úloha měla být dokončena do minuty. Všimněte si, že je spuštěno pět Reduktorů, jeden na region. Zde je výsledek na HDFS:

-rw-r--r--   3 cloudera cloudera         4265 2013-09-12 13:13 output/f/2c0724e0c8054b70bce11342dc91897b-rw-r--3:1era- 3 cloud 1 1 cloud Výstup/F/786198CA47AE406F9BE05C9EB09BEB36-RW-R-3 CLOUDERA Cloudera 2487 2013-09-12 13:14 13:14 13:14-1 Cloudere Cloudere Cloudera-3 Cloudera-3 Cloudera-3 Cloudera-3 Cloudera-3 Cloudera-1-3. :13 output/f/bb341f04c6d845e8bb95830e9946a914-rw-r--r--   3 cloudera cloudera         1336 2013-09-12 13:14 output/f/c3b67d2b4db3b657d899db3b657d899 

Jak vidíte, soubory aktuálně patří uživateli „cloudera“. Abychom je mohli načíst, musíme změnit vlastníka na „hbase“, jinak HBase nebude mít oprávnění soubory přesouvat. Spusťte následující příkaz:

sudo -u hdfs hdfs dfs -chown -R hbase:hbase/user/cloudera/output

V posledním kroku musíme použít nástroj completebulkload, abychom ukázali, kde jsou soubory a které tabulky načítáme:

počet výstupních slov hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles

Když se vrátíte do prostředí HBase, můžete spustit příkaz count, který vám ukáže, kolik řádků bylo načteno. Pokud jste zapomněli chown, příkaz se zablokuje.

Vlastní úloha MR

Hromadný nakladač TSV je dobrý pro prototypování, ale protože vše interpretuje jako řetězce a nepodporuje manipulaci s poli v době transformace, budete muset napsat svou vlastní úlohu MR. Můj kolega James Kinley, který pracuje jako architekt řešení v Evropě, napsal takovou zakázku, kterou použijeme pro náš další příklad. Údaje o této práci obsahují veřejné zprávy na Facebooku a Twitteru související s finále NBA 2010 (zápas 1) mezi Lakers a Celtics. Kód najdete zde. (VM Quick Start je dodáván s nainstalovaným git a maven, takže na něj můžete naklonovat úložiště.)

Když se podíváme na třídu Driver, nejdůležitější bity jsou následující:

job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(KeyValue.class);… // Automatická konfigurace rozdělovače a reduktoru HFileOutputFormat.configureIncrementalLoad(job, hTable);

Nejprve musí váš mapovač vytvořit výstup ImmutableBytesWritable, který obsahuje klíč řádku a výstupní hodnota může být KeyValue, Put nebo Delete. Druhý úryvek ukazuje, jak nakonfigurovat Reducer; je ve skutečnosti kompletně zpracován HFileOutputFormat. configureIncrementalLoad() podle popisu v části „Transformace“ dříve.

Třída HBaseKVMapper obsahuje pouze Mapper, který respektuje nakonfigurovaný výstupní klíč a hodnoty:

veřejná třída HBaseKVMapper rozšiřuje Mapper {

Abyste jej mohli spustit, budete muset zkompilovat projekt pomocí maven a uchopit datové soubory podle odkazů v README. (Obsahuje také shell skript pro vytvoření tabulky.) Před zahájením úlohy nezapomeňte nahrát soubory do HDFS a nastavit vaši třídu tak, aby si byla vědoma HBase, protože tentokrát nepoužijete její jar :

export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/etc/hbase/conf/:/usr/lib/hbase/*

Úlohu budete moci spustit pomocí příkazového řádku podobného tomuto:

hadoop jar hbase-examples-0.0.1-SNAPSHOT.jarcom.cloudera.examples.hbase.bulkimport.Driver -libjars/home/cloudera/.m2/repository/joda-time/joda-time/2.1/joda- time-2.1.jar,/home/cloudera/.m2/repository/net/sf/opencsv/opencsv/2.3/opencsv-2.3.jarRowFeeder\ for\ Celtics\ and\ Lakers\ Game\ 1.csv output2 NBAFinal2010

Jak vidíte, závislosti úlohy je třeba přidat samostatně. Nakonec můžete soubory načíst tak, že nejprve změníte jejich vlastníka a poté spustíte nástroj completebulkload:

sudo -u hdfs hdfs dfs -chown -R hbase:hbase/user/cloudera/output2hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles output2 NBAFinal2010

Potenciální problémy

Nedávno smazaná data se znovu objevují. K tomuto problému dochází, když je Delete vložen prostřednictvím hromadného načtení a je výrazně zhutněn, zatímco odpovídající Put je stále v MemStore. Data budou považována za smazaná, když je Delete v HFsouboru, ale jakmile budou odstraněna během komprimace, bude Put znovu viditelná. Máte-li takový případ použití, zvažte konfiguraci rodin sloupců tak, aby byly odstraněny buňky s KEEP_DELETED_CELLS v shellu nebo HColumnDescriptor.setKeepDeletedCells().

Hromadně načtená data nelze přepsat jiným hromadným načtením. K tomuto problému dochází, když se dva hromadně načtené soubory HFiles načtené v různých časech pokusí zapsat jinou hodnotu do stejné buňky, což znamená, že mají stejný klíč řádku, rodinu, kvalifikátor a časové razítko. Výsledkem je, že místo druhé bude vrácena první vložená hodnota. Tato chyba bude opravena v HBase 0.96.0 a CDH 5 (další hlavní verze CDH) a pracuje se na HBASE-8521 pro větev 0.94 a CDH 4.

Hromadné načítání spouští velké zhutnění. K tomuto problému dochází, když provádíte přírůstkové hromadné načítání a existuje dostatek hromadně načtených souborů ke spuštění menšího zhutnění (výchozí prahová hodnota je 3). Soubory HFiles jsou načteny s pořadovým číslem nastaveným na 0, takže se nejprve vyzvednou, když RegionServer vybírá soubory pro komprimaci, a kvůli chybě vybere také všechny zbývající soubory. Tento problém vážně ovlivní ty, kteří již mají velké oblasti (více GB) nebo kteří často načítají hromadné (každých několik hodin a méně), protože bude zhuštěno velké množství dat. HBase 0.96.0 má správnou opravu, stejně jako CDH 5; HBASE-8521 opravuje problém v 0.94, protože hromadně načítaným souborům HFiles je nyní přiřazeno správné pořadové číslo. HBASE-8283 lze aktivovat pomocí hbase.hstore.useExploringCompation po verzi 0.94.9 a CDH 4.4.0, aby se tento problém zmírnil tím, že jde pouze o chytřejší algoritmus výběru komprimace.

Hromadně načítaná data se nereplikují . Jelikož hromadné načítání obchází cestu zápisu, WAL se nezapisuje jako součást procesu. Replikace funguje tak, že čte soubory WAL, takže neuvidí hromadně načtená data – a totéž platí pro úpravy, které používají Put.setWriteToWAL(true). Jedním ze způsobů, jak to zvládnout, je poslat nezpracované soubory nebo HFiles do druhého clusteru a tam provést další zpracování.

Závěr

Cílem tohoto blogového příspěvku bylo představit vám základní koncepty hromadného načítání Apache HBase. Vysvětlili jsme, jak je tento proces podobný provádění ETL a že je mnohem lepší pro velké datové sady než použití normálního API, protože obchází cestu zápisu. Tyto dva příklady byly zahrnuty, aby ukázaly, jak lze jednoduché soubory TSV hromadně načíst do HBase a jak napsat svůj vlastní mapovač pro jiné datové formáty.

Nyní můžete zkusit udělat totéž pomocí grafického uživatelského rozhraní přes Hue.


  1. Po aktualizaci se změní pořadí pole MongoDB a pozice dokumentu

  2. Jak agregovat součet v MongoDB, abyste získali celkový počet?

  3. Přehled replikace Apache HBase

  4. Geograficky distribuované clustery MongoDB na AWS v regionu EU