sql >> Databáze >  >> NoSQL >> HBase

Jak na to:Skenujte tabulky Salted Apache HBase pomocí rozsahů klíčů specifických pro region v MapReduce

Děkujeme Pengyu Wangovi, vývojáři softwaru ve společnosti FINRA, za povolení znovu publikovat tento příspěvek.

Tabulky Salted Apache HBase s předrozdělením jsou osvědčeným efektivním řešením HBase, které poskytuje jednotné rozložení pracovní zátěže mezi servery RegionServers a zabraňuje vzniku horkých míst při hromadných zápisech. V tomto provedení je klíč řádku vyroben s logickým klíčem plus sůl na začátku. Jedním ze způsobů generování soli je výpočet n (počet oblastí) modulo na hash kódu klíče logického řádku (datum atd.).

Klíče řádků pro solení

Například tabulka přijímající zatížení dat na denní bázi může používat logické klíče řádků začínající datem a my chceme tuto tabulku předem rozdělit na 1 000 oblastí. V tomto případě očekáváme, že vygenerujeme 1000 různých solí. Sůl lze vygenerovat například jako:

StringUtils.leftPad(Integer.toString(Math.abs(keyCore.hashCode() % 1000)), 3, "0") + "|" + logicalKey logickýKey =2015-04-26|abcrowKey =893|2015-04-26|abc

Výstup z hashCode() s modulo poskytuje náhodnost pro hodnotu soli od „000“ do „999“. S touto transformací klíče je tabulka při vytváření předem rozdělena na hranice soli. Díky tomu budou objemy řádků rovnoměrně rozloženy při načítání souborů HFiles pomocí hromadného zatížení MapReduce. Zaručuje, že klíče řádků se stejnou solí spadají do stejné oblasti.

V mnoha případech použití, jako je archivace dat, potřebujete skenovat nebo kopírovat data přes určitý rozsah logických klíčů (rozsah dat) pomocí úlohy MapReduce. Standardní tabulkové úlohy MapReduce se nastavují poskytnutím Scan instance s atributy klíčového rozsahu.

Scan scan =new Scan();scan.setCaching(1000);scan.setCacheBlocks(false);scan.setBatch(1000);scan.setMaxVersions(1);scan.setStartRow(Bytes.toBytes("2015- 04-26"));scan.setStopRow(Bytes.toBytes("2015-04-27"));/* Nastavení úlohy mapovače tabulek */TableMapReduceUtil.initTableMapperJob(tablename,scan,DataScanMapper.class,ImmutableBytesWritable.class, KeyValue.class,job, true, TableInputFormat.class);…

Nastavení takové úlohy se však pro solené předem dělené stoly stává náročným. Tlačítka řádku Start a Stop se budou pro každý region lišit, protože každý má jedinečnou sůl. A nemůžeme určit více rozsahů pro jeden Scan instance.

Abychom tento problém vyřešili, musíme se podívat na to, jak funguje tabulka MapReduce. Rámec MapReduce obecně vytváří jednu mapovou úlohu pro čtení a zpracování každého vstupního rozdělení. Každé rozdělení je generováno v InputFormat základní třídy, metodou getSplits() .

V úloze MapReduce tabulky HBase TableInputFormat se používá jako InputFormat . Uvnitř implementace je getSplits() metoda je přepsána pro načtení klíčů řádku začátku a konce z Scan instance. Vzhledem k tomu, že klíče počátečního a koncového řádku zahrnují více oblastí, je rozsah rozdělen hranicemi oblastí a vrací seznam TableSplit objekty, které pokrývají rozsah skenovacích klíčů. Místo toho, aby bylo založeno na HDFS bloku, TableSplit s jsou založeny na regionu. Přepsáním getSplits() jsme schopni ovládat TableSplit .

Vytváření vlastního formátu TableInputFormat

Chcete-li změnit chování getSplits() metoda, vlastní třída rozšiřující TableInputFormat je požadováno. Účel getSplits() zde je pokrýt rozsah logických klíčů v každém regionu, vytvořit jejich rozsah klíčů řady s jejich jedinečnou solí. Třída HTable poskytuje metodu getStartEndKeys() který vrací klíče počátečního a koncového řádku pro každou oblast. Z každého startovacího klíče analyzujte odpovídající sůl pro oblast.

Párovat klíče =table.getStartEndKeys();for (int i =0; i  

Konfigurace úlohy prochází rozsahem logických klíčů

TableInputFormat načte klíč start a stop z Scan instance. Protože nemůžeme použít Scan v naší úloze MapReduce bychom mohli použít Configuration místo toho stačí předat tyto dvě proměnné a pouze logický klíč start a stop (proměnnou může být datum nebo jiné obchodní informace). getSplits() metoda má JobContext argument, Instanci konfigurace lze číst jako context.getConfiguration() .

V ovladači MapReduce:

Konfigurace conf =getConf();conf =HBaseConfiguration.addHbaseResources(conf);conf.set("logical.scan.start", "2015-04-26");conf.set("logical.scan.stop ", "2015-04-27");

V Custom TableInputFormat :

@Override public List getSplits(JobContext context) vyvolá IOException {conf =context.getConfiguration();String scanStart =conf.get("logical.scan.start");String scanStop =conf.get("logical.scan .stop");…}

Zrekonstruujte rozsah Salted Key podle regionu

Nyní, když máme sůl a logickou klávesu start/stop pro každou oblast, můžeme znovu sestavit skutečný rozsah kláves řádku.

byte[] startRowKey =Bytes.toBytes(regionSalt + "|" + scanStart);byte[] endRowKey =Bytes.toBytes(regionSalt + "|" + scanStop);

Vytvoření rozdělení tabulky pro každou oblast

Pomocí rozsahu klíčů řádku nyní můžeme inicializovat TableSplit příklad pro region.

Rozdělení seznamu =new ArrayList(keys.getFirst().length);for (int i =0; i  

Ještě jedna věc, na kterou je třeba se podívat, je datová lokalita. Rámec používá informace o umístění v každém vstupním rozdělení k přiřazení mapové úlohy v místním hostiteli. Pro náš TableInputFormat , používáme metodu getTableRegionLocation() k načtení umístění regionu obsluhujícího klíč řádku.

Toto umístění je poté předáno do TableSplit konstruktér. To zajistí, že mapovač zpracovávající rozdělení tabulky je na stejném serveru regionu. Jedna metoda, nazvaná DNS.reverseDns() , vyžaduje adresu jmenného serveru HBase. Tento atribut je uložen v konfiguraci „hbase.nameserver.address “.

this.nameServer =context.getConfiguration().get("hbase.nameserver.address", null);…public String getTableRegionLocation(HTable table, byte[] rowKey) vyvolá IOException {HServerAddress regionServerAddress =table.getRegionLocation(rowKey ).getServerAddress();InetAddress regionAddress =regionServerAddress.getInetSocketAddress().getAddress();String regionLocation;try {regionLocation =reverseDNS(regionAddress);} catch (NamingException e) {regionLocation =regionServerAddress.getHostname();}return regionLocation; }chráněný řetězec reverseDNS(InetAddress IPAddress) vyvolá NamingException {String hostName =this.reverseDNSCacheMap.get(ipAddress);if (hostName ==null) {hostName =Strings.domainNamePointerToHostName(DNS.reverseDns(ipAddress, this.nameServer)); .reverseDNSCacheMap.put(ipAddress, hostName);}return hostName;}

Kompletní kód getSplits bude vypadat takto:

@Override public List getSplits(JobContext context) vyvolá IOException {conf =context.getConfiguration();table =getHTable(conf);if (table ==null) {throw new IOException("Nebyla poskytnuta žádná tabulka.");}// Získání adresy jmenného serveru a výchozí hodnota je null.this.nameServer =conf.get("hbase.nameserver.address", null);String scanStart =conf.get("region.scan.start");String scanStop =conf.get("region.scan.stop");Párovat klíče =table.getStartEndKeys();if (keys ==null || keys.getFirst() ==null || keys.getFirst(). length ==0) {throw new RuntimeException("Očekává se alespoň jedna oblast");}Rozdělení seznamu =new ArrayList(keys.getFirst().length);for (int i =0; i  

Použijte Custom TableInoutFormat v ovladači MapReduce

Nyní musíme nahradit TableInputFormat třídy s vlastním sestavením, které jsme použili pro nastavení úlohy tabulky MapReduce.

Konfigurace conf =getConf();conf =HBaseConfiguration.addHbaseResources(conf);HTableInterface status_table =new HTable(conf, status_tablename);conf.set("logical.scan.start", "2015-04-26");conf.set("logical.scan.stop", "2015-04-27");Scan scan =new Scan();scan.setCaching(1000);scan.setCacheBlocks(false);scan.setBatch(1000);scan.setMaxVersions(1);/* Nastavení úlohy mapovače tabulek */TableMapReduceUtil.initTableMapperJob(název_tabulky,scan,DataScanMapper.class,ImmutableBytesWritable.class,KeyValue.class,job, pree, MultiFormateTableIn); 

Vlastní přístup TableInputFormat poskytuje efektivní a škálovatelné skenování pro tabulky HBase, které jsou navrženy tak, aby využívaly sůl pro vyvážené zatížení dat. Vzhledem k tomu, že skenování může obejít jakékoli nesouvisející klíče řádků, bez ohledu na to, jak velká je tabulka, je složitost skenování omezena pouze na velikost cílových dat. Ve většině případů použití to může zaručit relativně konzistentní dobu zpracování s růstem tabulky.


  1. MongoDB log 10 $

  2. Ovladač Mongodb C# vrací pouze odpovídající dílčí dokumenty v poli

  3. Asynchronní provádění příkazů redis

  4. MongoDB forEach()