sql >> Databáze >  >> NoSQL >> HBase

Spark-on-HBase:Konektor HBase založený na DataFrame

Tento blogový příspěvek byl publikován na Hortonworks.com před sloučením s Cloudera. Některé odkazy, zdroje nebo reference již nemusí být přesné.

S hrdostí oznamujeme technickou ukázku konektoru Spark-HBase, vyvinutého společností Hortonworks ve spolupráci s agenturou Bloomberg.

Konektor Spark-HBase využívá rozhraní Data Source API (SPARK-3247) představené v Spark-1.2.0. Překlenuje propast mezi jednoduchým úložištěm klíčových hodnot HBase a komplexními relačními dotazy SQL a umožňuje uživatelům provádět komplexní analýzu dat nad HBase pomocí Spark. HBase DataFrame je standardní Spark DataFrame a je schopen komunikovat s jakýmikoli jinými datovými zdroji, jako je Hive, ORC, Parquet, JSON atd.

Pozadí

Existuje několik open source konektorů Spark HBase dostupných buď jako balíčky Spark, jako nezávislé projekty nebo v HBase trunku.

Spark přešel na rozhraní API Dataset/DataFrame, která poskytuje integrovanou optimalizaci plánu dotazů. Nyní koncoví uživatelé preferují použití rozhraní založeného na DataFrames/Datasets.

Konektor HBase v kmeni HBase má bohatou podporu na úrovni RDD, kupř. BulkPut atd., ale jeho podpora DataFrame není tak bohatá. HBase trunk konektor spoléhá na standardní HadoopRDD s vestavěným HBase TableInputFormat má určitá omezení výkonu. Kromě toho může být BulkGet prováděný v ovladači jediným bodem selhání.

Existuje několik dalších alternativních implementací. Vezměte Spark-SQL-on-HBase jako příklad. Aplikuje velmi pokročilé vlastní optimalizační techniky začleněním vlastního plánu optimalizace dotazů do standardního motoru Spark Catalyst, dodává RDD do HBase a provádí komplikované úkoly, jako je částečná agregace, uvnitř koprocesoru HBase. Tento přístup je schopen dosáhnout vysokého výkonu, ale je obtížné jej udržet kvůli jeho složitosti a rychlému vývoji Sparku. Také povolení spuštění libovolného kódu uvnitř koprocesoru může představovat bezpečnostní rizika.

Spark-on-HBase Connector (SHC) byl vyvinut k překonání těchto potenciálních úzkých míst a slabin. Implementuje standardní rozhraní Spark Datasource API a využívá motor Spark Catalyst pro optimalizaci dotazů. Paralelně je RDD vytvořeno od začátku namísto použití TableInputFormat za účelem dosažení vysokého výkonu. S tímto přizpůsobeným RDD lze použít a plně implementovat všechny kritické techniky, jako je ořezávání oddílů, ořezávání sloupců, predikátové posunutí dolů a umístění dat. Konstrukce velmi usnadňuje údržbu a zároveň dosahuje dobrého kompromisu mezi výkonem a jednoduchostí.

Architektura

Předpokládáme, že Spark a HBase jsou nasazeny ve stejném clusteru a spouštěcí programy Spark jsou umístěny společně s regionálními servery, jak je znázorněno na obrázku níže.

Obrázek 1. Architektura konektoru Spark-on-HBase

Na vysoké úrovni se konektor chová ke skenování a získávání podobným způsobem a obě akce se provádějí v exekutorech. Ovladač zpracuje dotaz, agreguje skenování/získání na základě metadat regionu a generuje úlohy podle regionu. Úlohy jsou odesílány preferovaným exekutorům umístěným společně s regionálním serverem a jsou prováděny paralelně v exekutorech, aby se dosáhlo lepší lokalizace dat a souběžnosti. Pokud oblast neobsahuje požadovaná data, není tomuto serveru přiřazena žádná úloha. Úloha se může skládat z více skenů a hromadných zisků a požadavky na data úlohy se načítají pouze z jednoho regionálního serveru a tento regionální server bude také preferovanou lokalitou pro úlohu. Všimněte si, že ovladač není zapojen do skutečného provádění úlohy s výjimkou úloh plánování. Tím se zabrání tomu, aby řidič byl úzkým hrdlem.

Katalog tabulek

Abychom přenesli tabulku HBase jako relační tabulku do Sparku, definujeme mapování mezi tabulkami HBase a Spark, které se nazývá Katalog tabulek. Tento katalog má dvě kritické části. Jedním je definice klíče řádku a druhým je mapování mezi sloupcem tabulky v Sparku a rodinou sloupců a kvalifikátorem sloupců v HBase. Podrobnosti naleznete v části Použití.

Nativní podpora Avro

Konektor nativně podporuje formát Avro, protože je velmi běžnou praxí uchovávat strukturovaná data do HBase jako bajtové pole. Uživatel může uložit záznam Avro přímo do HBase. Interně je schéma Avro automaticky převedeno na nativní datový typ Spark Catalyst. Všimněte si, že obě části klíč–hodnota v tabulce HBase lze definovat ve formátu Avro. Přesné použití najdete v příkladech/testovacích případech v repozitáři.

Predikátní rozšíření

Konektor pouze načítá požadované sloupce z regionálního serveru, aby se snížila režie sítě a zabránilo se redundantnímu zpracování v enginu Spark Catalyst. Stávající standardní filtry HBase se používají k provádění predikátového push-down bez využití schopnosti koprocesoru. Protože HBase nezná datový typ kromě bajtového pole a nekonzistenci pořadí mezi primitivními typy Java a bajtovým polem, musíme před nastavením filtru v operaci Skenování předzpracovat podmínku filtru, abychom předešli ztrátě dat. Uvnitř regionálního serveru jsou odfiltrovány záznamy, které neodpovídají podmínce dotazu.

Prořezávání oddílu

Vyjmutím klíče řádku z predikátů rozdělíme Scan/BulkGet do více nepřekrývajících se rozsahů, pouze servery regionu, které mají požadovaná data, provedou Scan/BulkGet. V současné době se prořezávání oddílu provádí na prvním rozměru klíčů řádků. Pokud je například klíč řádku „key1:key2:key3“, bude prořezávání oddílu založeno pouze na „key1“. Všimněte si, že podmínky WHERE je třeba definovat pečlivě. V opačném případě se prořezávání oddílu nemusí projevit. Například WHERE rowkey1> „abc“ OR sloupec =„xyz“ (kde rowkey1 je první rozměr klíče řádku a sloupec je běžný sloupec hbase) povede k úplnému prohledání, protože musíme pokrýt všechny rozsahy, protože z NEBO logika.

Lokalita dat

Když je exekutor Spark umístěn společně se servery regionu HBase, lokalizace dat je dosažena identifikací umístění serveru regionu a vynakládá maximální úsilí na společné umístění úlohy s regionálním serverem. Každý exekutor provede Scan/BulkGet na části dat umístěných společně na stejném hostiteli.

Skenovat a hromadně získat

Tyto dva operátory se uživatelům zpřístupní zadáním WHERE CLAUSE, např. sloupec WHERE> x a sloupec pro skenování a sloupec WHERE =x pro získání. Operace se provádějí v exekutorech a ovladač tyto operace pouze konstruuje. Interně jsou převedeny na skenování a/nebo získávání a Iterator[Row] se vrací do katalyzátorového motoru pro zpracování horní vrstvy.

Použití

Níže je uveden základní postup použití konektoru. Další podrobnosti a pokročilé případy použití, jako je Avro a podpora kompozitních klíčů, najdete v příkladech v úložišti.

1) Definujte katalog pro mapování schématu:

[code language="scala"]def Catalog =s"""{        |"table":{"namespace":"default", "name":"table1"},        |"rowkey":"key" ,        |"columns":{          |"col0":{"cf":"rowkey", "col":"key", "type":"string"},           |"col1":{"cf":"cf1 ", "col":"col1", "type":"boolean"},          |"col2":{"cf":"cf2", "col":"col2", "type":"double"}, |"col3":{"cf":"cf3", "col":"col3", "type":"float"},          |"col4":{"cf":"cf4", "col":" col4", "type":"int"},          |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},          |"col6":{" cf":"cf6", "col":"col6", "type":"smallint"},          |"col7":{"cf":"cf7", "col":"col7", "type":"string"},          |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}         |}      |}""".stripMargin[/code] 

2) Připravte data a naplňte tabulku HBase:
třída case HBaseRecord(col0:String, col1:Boolean,col2:Double, col3:Float,col4:Int,       col5:Long, col6:Short, col7:String, col8:Byte)

objekt HBaseRecord {def apply(i:Int, t:String):HBaseRecord ={ val s =s”””row${“%03d”.format(i)}”””       HBaseRecord(s, i % 2 ==0, i.toDouble, i.toFloat,  i, i.toLong, i.toShort,  s”String$i:$t”,      i.toByte) }}

val data =(0 až 255).map { i =>  HBaseRecord(i, “extra”)}

sc.parallelize(data).toDF.write.options(
 Map(HBaseTableCatalog.tableCatalog -> katalog, HBaseTableCatalog.newTable -> “5”))
 .format(“org.apache.spark. sql.execution.datasources.hbase”)
 .save()
 
3) Načtěte DataFrame:
def withCatalog(cat:String):DataFrame ={
 sqlContext
.read
 .options(Map(HBaseTableCatalog.tableCatalog->cat))
 .format( “org.apache.spark.sql.execution.datasources.hbase”)
 .load()
}

val df =withCatalog(katalog)

4) Jazyk integrovaný dotaz:
val s =df.filter((($”col0″ <=“řádek050″ &&$”col0”> “řádek040”) ||
 $”col0″ ===“řádek005” ||
 $”col0″ ===“řádek020” ||
 $”col0″ === ”r20” ||
 $”col0″ <=“řádek005”) &&
 ($”col4″ ===1 ||
 $”col4″ ===42))
 .select(“col0”, “col1”, “col4”)
s .ukázat

5) SQL dotaz:
df.registerTempTable(“table”)
sqlContext.sql(“select count(col1) from table”).show

Konfigurace Spark-Package

Uživatelé mohou používat konektor Spark-on-HBase jako standardní balíček Spark. K zahrnutí balíčku do vaší aplikace Spark použijte:

spark-shell, pyspark nebo spark-submit

> $SPARK_HOME/bin/spark-shell –balíčky zhzhan:shc:0.0.11-1.6.1-s_2.10

Uživatelé mohou balíček zahrnout také jako závislost do vašeho souboru SBT. Formát je spark-package-name:version

spDependencies +=“zhzhan/shc:0.0.11-1.6.1-s_2.10”

Spuštění v zabezpečeném clusteru

Pro spuštění v clusteru s podporou Kerberos musí uživatel zahrnout jar související s HBase do cesty třídy, protože načítání a obnovování tokenu HBase provádí Spark a je nezávislé na konektoru. Jinými slovy, uživatel potřebuje iniciovat prostředí normálním způsobem, buď prostřednictvím kinit nebo poskytnutím principu/klíčové tabulky. Následující příklady ukazují, jak spustit v zabezpečeném clusteru s režimem yarn-client i yarn-cluster. Pamatujte, že SPARK_CLASSPATH musí být nastaven pro oba režimy a ukázkový jar je pouze zástupný symbol pro Spark.

export SPARK_CLASSPATH=/usr/hdp/current/hbase-client/lib/hbase-common.jar:/usr/hdp/current/hbase-client/lib/hbase-client.jar:/usr/hdp/current/hbase- client/lib/hbase-server.jar:/usr/hdp/current/hbase-client/lib/hbase-protocol.jar:/usr/hdp/current/hbase-client/lib/guava-12.0.1.jar

Předpokládejme, že hrt_qa je bezhlavý účet, uživatel může použít následující příkaz pro kinit:

kinit -k -t /tmp/hrt_qa.headless.keytab hrt_qa

/usr/hdp/current/spark-client/bin/spark-submit –class org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource –master yarn-client –packages zhzhan:shc:0.0.11- 1.6.1-s_2.10 –počet-exekutorů 4 –ovladač-paměť 512m –exekutor-paměť 512m –exekutorská jádra 1 /usr/hdp/current/spark-client/lib/spark-examples-1.6.1.2.4.2. 0-106-hadoop2.7.1.2.4.2.0-106.jar

/usr/hdp/current/spark-client/bin/spark-submit –class org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource –master yarn-cluster –files /etc/hbase/conf/hbase -site.xml –balíčky zhzhan:shc:0.0.11-1.6.1-s_2.10 –počet-exekutorů 4 –ovladač-paměť 512m –exekutor-paměť 512m –exekutorská-jádra 1 /usr/hdp/current/spark- client/lib/spark-examples-1.6.1.2.4.2.0-106-hadoop2.7.1.2.4.2.0-106.jar

Spojení všeho dohromady

Právě jsme poskytli rychlý přehled o tom, jak HBase podporuje Spark na úrovni DataFrame. S DataFrame API mohou aplikace Spark pracovat s daty uloženými v tabulce HBase stejně snadno jako s jakýmikoli daty uloženými v jiných zdrojích dat. Díky této nové funkci mohou být data v tabulkách HBase snadno spotřebována aplikacemi Spark a dalšími interaktivními nástroji, např. uživatelé mohou spustit složitý SQL dotaz nad tabulkou HBase uvnitř Sparku, provést spojení tabulky s Dataframe nebo integrovat se Spark Streaming a implementovat složitější systém.

Co bude dál?

V současné době je konektor hostován v repozitáři Hortonworks a je publikován jako balíček Spark. Probíhá migrace do kmene Apache HBase. Během migrace jsme identifikovali některé kritické chyby v kmeni HBase a budou opraveny spolu se sloučením. Práce komunity je sledována zastřešujícím HBase JIRA HBASE-14789, včetně HBASE-14795 a HBASE-14796  za účelem optimalizace základní výpočetní architektury pro skenování a BulkGet,  HBASE-14801 poskytuje uživatelské rozhraní JSON pro snadné použití, HBASE-15336 cesta zápisu DataFrame, HBASE-15334 pro podporu Avro, HBASE-15333  pro podporu primitivních typů Java, jako je short, int, long, float a double atd., HBASE-15335 pro podporu složeného klíče řádku a HBASE-15572 přidat volitelnou sémantiku časového razítka. Těšíme se na výrobu budoucí verze konektoru, se kterou se s konektorem bude pracovat ještě snadněji.

Poděkování

Chceme poděkovat Hamel Kothari, Sudarshan Kadambi a týmu Bloomberg za to, že nás vedli v této práci a také nám pomohli tuto práci ověřit. Chceme také poděkovat komunitě HBase za poskytnutí zpětné vazby a zlepšení. A konečně, tato práce využila lekce z dřívějších integrací Spark HBase a chceme poděkovat jejich vývojářům za vydláždění cesty.

Odkaz:

SHC:https://github.com/hortonworks/shc-release

Spark-package:http://spark-packages.org/package/zhzhan/shc

Apache HBase: https://hbase.apache.org/

Apache Spark:http://spark.apache.org/


  1. Opravdu má být Redigo Redis Pool globální proměnnou?

  2. Dotazování na rozsah v Redis - Spring Data Redis

  3. HBase BlockCache 101

  4. MongoDB - Aktualizace nebo vložení objektu do pole