Děkujeme Pengyu Wangovi, vývojáři softwaru ve společnosti FINRA, za povolení znovu publikovat tento příspěvek.
Tabulky Salted Apache HBase s předrozdělením jsou osvědčeným efektivním řešením HBase, které poskytuje jednotné rozložení pracovní zátěže mezi servery RegionServers a zabraňuje vzniku horkých míst při hromadných zápisech. V tomto provedení je klíč řádku vyroben s logickým klíčem plus sůl na začátku. Jedním ze způsobů generování soli je výpočet n (počet oblastí) modulo na hash kódu klíče logického řádku (datum atd.).
Klíče řádků pro solení
Například tabulka přijímající zatížení dat na denní bázi může používat logické klíče řádků začínající datem a my chceme tuto tabulku předem rozdělit na 1 000 oblastí. V tomto případě očekáváme, že vygenerujeme 1000 různých solí. Sůl lze vygenerovat například jako:
StringUtils.leftPad(Integer.toString(Math.abs(keyCore.hashCode() % 1000)), 3, "0") + "|" + logicalKey logickýKey =2015-04-26|abcrowKey =893|2015-04-26|abc
Výstup z hashCode()
s modulo poskytuje náhodnost pro hodnotu soli od „000“ do „999“. S touto transformací klíče je tabulka při vytváření předem rozdělena na hranice soli. Díky tomu budou objemy řádků rovnoměrně rozloženy při načítání souborů HFiles pomocí hromadného zatížení MapReduce. Zaručuje, že klíče řádků se stejnou solí spadají do stejné oblasti.
V mnoha případech použití, jako je archivace dat, potřebujete skenovat nebo kopírovat data přes určitý rozsah logických klíčů (rozsah dat) pomocí úlohy MapReduce. Standardní tabulkové úlohy MapReduce se nastavují poskytnutím Scan
instance s atributy klíčového rozsahu.
Scan scan =new Scan();scan.setCaching(1000);scan.setCacheBlocks(false);scan.setBatch(1000);scan.setMaxVersions(1);scan.setStartRow(Bytes.toBytes("2015- 04-26"));scan.setStopRow(Bytes.toBytes("2015-04-27"));/* Nastavení úlohy mapovače tabulek */TableMapReduceUtil.initTableMapperJob(tablename,scan,DataScanMapper.class,ImmutableBytesWritable.class, KeyValue.class,job, true, TableInputFormat.class);…
Nastavení takové úlohy se však pro solené předem dělené stoly stává náročným. Tlačítka řádku Start a Stop se budou pro každý region lišit, protože každý má jedinečnou sůl. A nemůžeme určit více rozsahů pro jeden Scan
instance.
Abychom tento problém vyřešili, musíme se podívat na to, jak funguje tabulka MapReduce. Rámec MapReduce obecně vytváří jednu mapovou úlohu pro čtení a zpracování každého vstupního rozdělení. Každé rozdělení je generováno v InputFormat
základní třídy, metodou getSplits()
.
V úloze MapReduce tabulky HBase TableInputFormat
se používá jako InputFormat
. Uvnitř implementace je getSplits()
metoda je přepsána pro načtení klíčů řádku začátku a konce z Scan
instance. Vzhledem k tomu, že klíče počátečního a koncového řádku zahrnují více oblastí, je rozsah rozdělen hranicemi oblastí a vrací seznam TableSplit
objekty, které pokrývají rozsah skenovacích klíčů. Místo toho, aby bylo založeno na HDFS bloku, TableSplit
s jsou založeny na regionu. Přepsáním getSplits()
jsme schopni ovládat TableSplit
.
Vytváření vlastního formátu TableInputFormat
Chcete-li změnit chování getSplits()
metoda, vlastní třída rozšiřující TableInputFormat
je požadováno. Účel getSplits()
zde je pokrýt rozsah logických klíčů v každém regionu, vytvořit jejich rozsah klíčů řady s jejich jedinečnou solí. Třída HTable poskytuje metodu getStartEndKeys()
který vrací klíče počátečního a koncového řádku pro každou oblast. Z každého startovacího klíče analyzujte odpovídající sůl pro oblast.
Párovat klíče =table.getStartEndKeys();for (int i =0; iKonfigurace úlohy prochází rozsahem logických klíčů
TableInputFormat
načte klíč start a stop zScan
instance. Protože nemůžeme použítScan
v naší úloze MapReduce bychom mohli použítConfiguration
místo toho stačí předat tyto dvě proměnné a pouze logický klíč start a stop (proměnnou může být datum nebo jiné obchodní informace).getSplits()
metoda máJobContext
argument, Instanci konfigurace lze číst jakocontext.getConfiguration()
.V ovladači MapReduce:
Konfigurace conf =getConf();conf =HBaseConfiguration.addHbaseResources(conf);conf.set("logical.scan.start", "2015-04-26");conf.set("logical.scan.stop ", "2015-04-27");V
Custom TableInputFormat
:@Override public List getSplits(JobContext context) vyvolá IOException {conf =context.getConfiguration();String scanStart =conf.get("logical.scan.start");String scanStop =conf.get("logical.scan .stop");…}Zrekonstruujte rozsah Salted Key podle regionu
Nyní, když máme sůl a logickou klávesu start/stop pro každou oblast, můžeme znovu sestavit skutečný rozsah kláves řádku.
byte[] startRowKey =Bytes.toBytes(regionSalt + "|" + scanStart);byte[] endRowKey =Bytes.toBytes(regionSalt + "|" + scanStop);Vytvoření rozdělení tabulky pro každou oblast
Pomocí rozsahu klíčů řádku nyní můžeme inicializovat
TableSplit
příklad pro region.Rozdělení seznamu =new ArrayList(keys.getFirst().length);for (int i =0; iJeště jedna věc, na kterou je třeba se podívat, je datová lokalita. Rámec používá informace o umístění v každém vstupním rozdělení k přiřazení mapové úlohy v místním hostiteli. Pro náš
TableInputFormat
, používáme metodugetTableRegionLocation()
k načtení umístění regionu obsluhujícího klíč řádku.Toto umístění je poté předáno do
TableSplit
konstruktér. To zajistí, že mapovač zpracovávající rozdělení tabulky je na stejném serveru regionu. Jedna metoda, nazvanáDNS.reverseDns()
, vyžaduje adresu jmenného serveru HBase. Tento atribut je uložen v konfiguraci „hbase.nameserver.address
“.this.nameServer =context.getConfiguration().get("hbase.nameserver.address", null);…public String getTableRegionLocation(HTable table, byte[] rowKey) vyvolá IOException {HServerAddress regionServerAddress =table.getRegionLocation(rowKey ).getServerAddress();InetAddress regionAddress =regionServerAddress.getInetSocketAddress().getAddress();String regionLocation;try {regionLocation =reverseDNS(regionAddress);} catch (NamingException e) {regionLocation =regionServerAddress.getHostname();}return regionLocation; }chráněný řetězec reverseDNS(InetAddress IPAddress) vyvolá NamingException {String hostName =this.reverseDNSCacheMap.get(ipAddress);if (hostName ==null) {hostName =Strings.domainNamePointerToHostName(DNS.reverseDns(ipAddress, this.nameServer)); .reverseDNSCacheMap.put(ipAddress, hostName);}return hostName;}Kompletní kód
getSplits
bude vypadat takto:@Override public List getSplits(JobContext context) vyvolá IOException {conf =context.getConfiguration();table =getHTable(conf);if (table ==null) {throw new IOException("Nebyla poskytnuta žádná tabulka.");}// Získání adresy jmenného serveru a výchozí hodnota je null.this.nameServer =conf.get("hbase.nameserver.address", null);String scanStart =conf.get("region.scan.start");String scanStop =conf.get("region.scan.stop");Párovat klíče =table.getStartEndKeys();if (keys ==null || keys.getFirst() ==null || keys.getFirst(). length ==0) {throw new RuntimeException("Očekává se alespoň jedna oblast");}Rozdělení seznamu =new ArrayList(keys.getFirst().length);for (int i =0; iPoužijte Custom TableInoutFormat v ovladači MapReduce
Nyní musíme nahradit
TableInputFormat
třídy s vlastním sestavením, které jsme použili pro nastavení úlohy tabulky MapReduce.Konfigurace conf =getConf();conf =HBaseConfiguration.addHbaseResources(conf);HTableInterface status_table =new HTable(conf, status_tablename);conf.set("logical.scan.start", "2015-04-26");conf.set("logical.scan.stop", "2015-04-27");Scan scan =new Scan();scan.setCaching(1000);scan.setCacheBlocks(false);scan.setBatch(1000);scan.setMaxVersions(1);/* Nastavení úlohy mapovače tabulek */TableMapReduceUtil.initTableMapperJob(název_tabulky,scan,DataScanMapper.class,ImmutableBytesWritable.class,KeyValue.class,job, pree, MultiFormateTableIn);Vlastní přístup
TableInputFormat
poskytuje efektivní a škálovatelné skenování pro tabulky HBase, které jsou navrženy tak, aby využívaly sůl pro vyvážené zatížení dat. Vzhledem k tomu, že skenování může obejít jakékoli nesouvisející klíče řádků, bez ohledu na to, jak velká je tabulka, je složitost skenování omezena pouze na velikost cílových dat. Ve většině případů použití to může zaručit relativně konzistentní dobu zpracování s růstem tabulky.