Děkujeme Pengyu Wangovi, vývojáři softwaru ve společnosti FINRA, za povolení znovu publikovat tento příspěvek.
Tabulky Salted Apache HBase s předrozdělením jsou osvědčeným efektivním řešením HBase, které poskytuje jednotné rozložení pracovní zátěže mezi servery RegionServers a zabraňuje vzniku horkých míst při hromadných zápisech. V tomto provedení je klíč řádku vyroben s logickým klíčem plus sůl na začátku. Jedním ze způsobů generování soli je výpočet n (počet oblastí) modulo na hash kódu klíče logického řádku (datum atd.).
Klíče řádků pro solení
Například tabulka přijímající zatížení dat na denní bázi může používat logické klíče řádků začínající datem a my chceme tuto tabulku předem rozdělit na 1 000 oblastí. V tomto případě očekáváme, že vygenerujeme 1000 různých solí. Sůl lze vygenerovat například jako:
StringUtils.leftPad(Integer.toString(Math.abs(keyCore.hashCode() % 1000)), 3, "0") + "|" + logicalKey logickýKey =2015-04-26|abcrowKey =893|2015-04-26|abc
Výstup z hashCode() s modulo poskytuje náhodnost pro hodnotu soli od „000“ do „999“. S touto transformací klíče je tabulka při vytváření předem rozdělena na hranice soli. Díky tomu budou objemy řádků rovnoměrně rozloženy při načítání souborů HFiles pomocí hromadného zatížení MapReduce. Zaručuje, že klíče řádků se stejnou solí spadají do stejné oblasti.
V mnoha případech použití, jako je archivace dat, potřebujete skenovat nebo kopírovat data přes určitý rozsah logických klíčů (rozsah dat) pomocí úlohy MapReduce. Standardní tabulkové úlohy MapReduce se nastavují poskytnutím Scan instance s atributy klíčového rozsahu.
Scan scan =new Scan();scan.setCaching(1000);scan.setCacheBlocks(false);scan.setBatch(1000);scan.setMaxVersions(1);scan.setStartRow(Bytes.toBytes("2015- 04-26"));scan.setStopRow(Bytes.toBytes("2015-04-27"));/* Nastavení úlohy mapovače tabulek */TableMapReduceUtil.initTableMapperJob(tablename,scan,DataScanMapper.class,ImmutableBytesWritable.class, KeyValue.class,job, true, TableInputFormat.class);…
Nastavení takové úlohy se však pro solené předem dělené stoly stává náročným. Tlačítka řádku Start a Stop se budou pro každý region lišit, protože každý má jedinečnou sůl. A nemůžeme určit více rozsahů pro jeden Scan instance.
Abychom tento problém vyřešili, musíme se podívat na to, jak funguje tabulka MapReduce. Rámec MapReduce obecně vytváří jednu mapovou úlohu pro čtení a zpracování každého vstupního rozdělení. Každé rozdělení je generováno v InputFormat základní třídy, metodou getSplits() .
V úloze MapReduce tabulky HBase TableInputFormat se používá jako InputFormat . Uvnitř implementace je getSplits() metoda je přepsána pro načtení klíčů řádku začátku a konce z Scan instance. Vzhledem k tomu, že klíče počátečního a koncového řádku zahrnují více oblastí, je rozsah rozdělen hranicemi oblastí a vrací seznam TableSplit objekty, které pokrývají rozsah skenovacích klíčů. Místo toho, aby bylo založeno na HDFS bloku, TableSplit s jsou založeny na regionu. Přepsáním getSplits() jsme schopni ovládat TableSplit .
Vytváření vlastního formátu TableInputFormat
Chcete-li změnit chování getSplits() metoda, vlastní třída rozšiřující TableInputFormat je požadováno. Účel getSplits() zde je pokrýt rozsah logických klíčů v každém regionu, vytvořit jejich rozsah klíčů řady s jejich jedinečnou solí. Třída HTable poskytuje metodu getStartEndKeys() který vrací klíče počátečního a koncového řádku pro každou oblast. Z každého startovacího klíče analyzujte odpovídající sůl pro oblast.
Párovat klíče =table.getStartEndKeys();for (int i =0; iKonfigurace úlohy prochází rozsahem logických klíčů
TableInputFormatnačte klíč start a stop zScaninstance. Protože nemůžeme použítScanv naší úloze MapReduce bychom mohli použítConfigurationmísto toho stačí předat tyto dvě proměnné a pouze logický klíč start a stop (proměnnou může být datum nebo jiné obchodní informace).getSplits()metoda máJobContextargument, Instanci konfigurace lze číst jakocontext.getConfiguration().V ovladači MapReduce:
Konfigurace conf =getConf();conf =HBaseConfiguration.addHbaseResources(conf);conf.set("logical.scan.start", "2015-04-26");conf.set("logical.scan.stop ", "2015-04-27");V
Custom TableInputFormat:@Override public List getSplits(JobContext context) vyvolá IOException {conf =context.getConfiguration();String scanStart =conf.get("logical.scan.start");String scanStop =conf.get("logical.scan .stop");…}Zrekonstruujte rozsah Salted Key podle regionu
Nyní, když máme sůl a logickou klávesu start/stop pro každou oblast, můžeme znovu sestavit skutečný rozsah kláves řádku.
byte[] startRowKey =Bytes.toBytes(regionSalt + "|" + scanStart);byte[] endRowKey =Bytes.toBytes(regionSalt + "|" + scanStop);Vytvoření rozdělení tabulky pro každou oblast
Pomocí rozsahu klíčů řádku nyní můžeme inicializovat
TableSplitpříklad pro region.Rozdělení seznamu =new ArrayList(keys.getFirst().length);for (int i =0; iJeště jedna věc, na kterou je třeba se podívat, je datová lokalita. Rámec používá informace o umístění v každém vstupním rozdělení k přiřazení mapové úlohy v místním hostiteli. Pro náš
TableInputFormat, používáme metodugetTableRegionLocation()k načtení umístění regionu obsluhujícího klíč řádku.Toto umístění je poté předáno do
TableSplitkonstruktér. To zajistí, že mapovač zpracovávající rozdělení tabulky je na stejném serveru regionu. Jedna metoda, nazvanáDNS.reverseDns(), vyžaduje adresu jmenného serveru HBase. Tento atribut je uložen v konfiguraci „hbase.nameserver.address“.this.nameServer =context.getConfiguration().get("hbase.nameserver.address", null);…public String getTableRegionLocation(HTable table, byte[] rowKey) vyvolá IOException {HServerAddress regionServerAddress =table.getRegionLocation(rowKey ).getServerAddress();InetAddress regionAddress =regionServerAddress.getInetSocketAddress().getAddress();String regionLocation;try {regionLocation =reverseDNS(regionAddress);} catch (NamingException e) {regionLocation =regionServerAddress.getHostname();}return regionLocation; }chráněný řetězec reverseDNS(InetAddress IPAddress) vyvolá NamingException {String hostName =this.reverseDNSCacheMap.get(ipAddress);if (hostName ==null) {hostName =Strings.domainNamePointerToHostName(DNS.reverseDns(ipAddress, this.nameServer)); .reverseDNSCacheMap.put(ipAddress, hostName);}return hostName;}Kompletní kód
getSplitsbude vypadat takto:@Override public List getSplits(JobContext context) vyvolá IOException {conf =context.getConfiguration();table =getHTable(conf);if (table ==null) {throw new IOException("Nebyla poskytnuta žádná tabulka.");}// Získání adresy jmenného serveru a výchozí hodnota je null.this.nameServer =conf.get("hbase.nameserver.address", null);String scanStart =conf.get("region.scan.start");String scanStop =conf.get("region.scan.stop");Párovat klíče =table.getStartEndKeys();if (keys ==null || keys.getFirst() ==null || keys.getFirst(). length ==0) {throw new RuntimeException("Očekává se alespoň jedna oblast");}Rozdělení seznamu =new ArrayList(keys.getFirst().length);for (int i =0; iPoužijte Custom TableInoutFormat v ovladači MapReduce
Nyní musíme nahradit
TableInputFormattřídy s vlastním sestavením, které jsme použili pro nastavení úlohy tabulky MapReduce.Konfigurace conf =getConf();conf =HBaseConfiguration.addHbaseResources(conf);HTableInterface status_table =new HTable(conf, status_tablename);conf.set("logical.scan.start", "2015-04-26");conf.set("logical.scan.stop", "2015-04-27");Scan scan =new Scan();scan.setCaching(1000);scan.setCacheBlocks(false);scan.setBatch(1000);scan.setMaxVersions(1);/* Nastavení úlohy mapovače tabulek */TableMapReduceUtil.initTableMapperJob(název_tabulky,scan,DataScanMapper.class,ImmutableBytesWritable.class,KeyValue.class,job, pree, MultiFormateTableIn);Vlastní přístup
TableInputFormatposkytuje efektivní a škálovatelné skenování pro tabulky HBase, které jsou navrženy tak, aby využívaly sůl pro vyvážené zatížení dat. Vzhledem k tomu, že skenování může obejít jakékoli nesouvisející klíče řádků, bez ohledu na to, jak velká je tabulka, je složitost skenování omezena pouze na velikost cílových dat. Ve většině případů použití to může zaručit relativně konzistentní dobu zpracování s růstem tabulky.