Úvod
Python je široce používán mezi datovými inženýry a datovými vědci k řešení nejrůznějších problémů od ETL/ELT potrubí až po vytváření modelů strojového učení. Apache HBase je efektivní systém pro ukládání dat pro mnoho pracovních postupů, ale přístup k těmto datům konkrétně prostřednictvím Pythonu může být problém. Pro datové profesionály, kteří chtějí využívat data uložená v HBase, lze s PySparkem pro základní operace použít nedávný upstream projekt „hbase-connectors“.
V této sérii blogů vysvětlíme, jak nakonfigurovat PySpark a HBase společně pro základní použití Spark i pro úlohy udržované v CDSW. Pro ty, kteří nejsou obeznámeni s CDSW, je to bezpečná, samoobslužná platforma pro vědu o podnikových datech pro datové vědce, aby mohli spravovat své vlastní analytické kanály, čímž urychlují projekty strojového učení od průzkumu až po produkci. Pro více informací o CDSW navštivte stránku produktu Cloudera Data Science Workbench.
V tomto příspěvku bude vysvětleno a předvedeno několik operací spolu s příkladem výstupu. Pro kontext jsou všechny ukázkové operace v tomto konkrétním blogovém příspěvku spuštěny s nasazením CDSW.
Předpoklady:
- Mějte CDP cluster s HBase a Spark
- Pokud budete postupovat podle příkladů prostřednictvím CDSW, budete jej potřebovat nainstalovaný – Instalace Cloudera Data Science Workbench
- Python 3 je nainstalován na každém uzlu ve stejné cestě
Konfigurace:
Nejprve je třeba HBase a Spark nakonfigurovat společně, aby dotazy Spark SQL fungovaly správně. Za tím účelem má dvě části:nejprve nakonfigurujte servery HBase Region Server prostřednictvím Cloudera Manager; a za druhé se ujistěte, že běhový modul Spark má vazby HBase. Jedna poznámka, kterou je třeba mít na paměti, je, že Cloudera Manager již nastavuje některé proměnné konfigurace a prostředí, aby za vás automaticky nasměroval Spark na HBase. Nicméně první krok konfigurace dotazů Spark SQL je společný pro všechny typy nasazení na clusterech CDP, ale druhý se mírně liší v závislosti na typu nasazení.
Konfigurace serverů regionu HBase
- Přejděte do Cloudera Manager a vyberte službu HBase.
- Vyhledejte „regionserver environment“
- Přidejte novou proměnnou prostředí pomocí úryvku pokročilé konfigurace RegionServer Environment (bezpečnostní ventil):
- Klíč:HBASE_CLASSPATH
- Hodnota:/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar:/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib /hbase-spark-protocol-shaded.jar:/opt/cloudera/parcels/CDH/jars/scala-library-2.11.12.jar
Ujistěte se, že používáte příslušná čísla verzí.
- Restartujte servery regionů.
Jakmile budete postupovat podle výše uvedených kroků, postupujte podle níže uvedených kroků v závislosti na tom, zda chcete nasazení CDSW nebo Non-CDSW.
Přidání vazeb HBase do prostředí Spark Runtime v nasazeních bez CDSW
Chcete-li správně nasadit shell nebo použít spark-submit, použijte následující příkazy, abyste zajistili, že spark má správné vazby HBase.
pyspark –jars /opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded. sklenice
spark-submit –jars /opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol- shaded.jar
Přidání vazeb HBase do Spark Runtime v nasazení CDSW
Chcete-li nakonfigurovat CDSW s HBase a PySpark, musíte provést několik kroků.
1) Ujistěte se, že je Python 3 nainstalován na každém uzlu clusteru a poznamenejte si cestu k němu
2) Vytvořte nový projekt v CDSW a použijte šablonu PySpark
3) Otevřete projekt, přejděte do Nastavení -> Motor -> Proměnné prostředí.
4) Nastavte PYSPARK3_DRIVER_PYTHON a PYSPARK3_PYTHON k cestě, kde je Python nainstalován na uzlech clusteru (cesta uvedená v kroku 1).
Níže je ukázka, jak by to mělo vypadat.
5) Ve svém projektu přejděte na Soubory -> spark-defaults.conf a otevřete jej ve Workbench
6) Zkopírujte a vložte řádek níže do tohoto souboru a před zahájením nové relace se ujistěte, že je uložen.
spark.jars=/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded.jar
V tomto okamžiku je CDSW nyní nakonfigurováno pro spouštění úloh PySpark na HBase! Zbytek tohoto příspěvku na blogu odkazuje na některé ukázkové operace na nasazení CDSW.
Příklady operací
Operace vložení
Existují dva způsoby, jak vložit a aktualizovat řádky do HBase. První a nejvíce doporučovanou metodou je sestavení katalogu, což je schéma, které bude mapovat sloupce tabulky HBase na datový rámec PySpark při specifikaci názvu tabulky a jmenného prostoru. Vytvoření tohoto uživatelem definovaného formátu JSON je nejpreferovanější metodou, protože jej lze použít i s jinými operacemi. Další informace o katalozích naleznete v této dokumentaci http://hbase.apache.org/book.html#_define_catalog. Druhá metoda používá specifický mapovací parametr nazvaný „hbase.columns.mapping“, který přebírá pouze řetězec párů klíč–hodnota.
- Používání katalogů
from pyspark.sql import Row from pyspark.sql import SparkSession spark = SparkSession\ .builder\ .appName("SampleApplication")\ .getOrCreate() tableCatalog = ''.join("""{ "table":{"namespace":"default", "name":"tblEmployee", "tableCoder":"PrimitiveType"}, "rowkey":"key", "columns":{ "key":{"cf":"rowkey", "col":"key", "type":"int"}, "empId":{"cf":"personal","col":"empId","type":"string"}, "empName":{"cf":"personal", "col":"empName", "type":"string"}, "empState":{"cf":"personal", "col":"empWeight", "type":"string"} } }""".split()) employee = [(10, 'jonD', 'Jon Daniels', 'CA'), (6, 'billR', 'Bill Robert', 'FL')] employeeRDD = spark.sparkContext.parallelize(employee) employeeMap = employeeRDD.map(lambda x: Row(key=int(x[0]), empId=x[1], empName=x[2], empState=x[3])) employeeDF = spark.createDataFrame(employeeMap) employeeDF.write.format("org.apache.hadoop.hbase.spark") \ .options(catalog=tableCatalog, newTable=5) \ .option("hbase.spark.use.hbasecontext", False) \ .save() # newTable refers to the NumberOfRegions which has to be > 3
Ověřte, že je v HBase vytvořena nová tabulka s názvem „tblEmployee“ jednoduchým otevřením prostředí HBase a provedením následujícího příkazu:
skenování ‚tblZaměstnanec‘, {‘LIMIT‘ => 2}
Pomocí katalogů můžete také snadno načíst tabulky HBase. O tom bude řeč v budoucí části.
- Použití hbase.columns.mapping
Při psaní datového rámce PySpark lze přidat volbu nazvanou „hbase.columns.mapping“, která obsahuje řetězec, který správně mapuje sloupce. Tato možnost vám umožňuje pouze vkládat řádky do existujících tabulek.
V prostředí HBase nejprve vytvořte tabulku create ‚tblEmployee2‘, ‚personal‘
Nyní v PySpark vložíme 2 řádky pomocí „hbase.columns.mapping“
from pyspark.sql import Row from pyspark.sql import SparkSession spark = SparkSession\ .builder\ .appName("SampleApplication")\ .getOrCreate() employee = [(10, 'jonD', 'Jon Daniels', 170.7), (6, 'billR', 'Bill Robert', 200.1)] employeeRDD = spark.sparkContext.parallelize(employee) employeeMap = employeeRDD.map(lambda x: Row(key=int(x[0]), empId=x[1], empName=x[2], empWeight=float(x[3]))) employeeDF = spark.createDataFrame(employeeMap) employeeDF.write.format("org.apache.hadoop.hbase.spark") \ .option("hbase.columns.mapping", "key INTEGER :key, empId STRING personal:empId, empName STRING personal:empName, empWeight FLOAT personal:empWeight") \ .option("hbase.table", "tblEmployee2") \ .option("hbase.spark.use.hbasecontext", False) \ .save()
Znovu ověřte, že nová tabulka s názvem „tblEmployee2“ obsahuje tyto nové řádky.
skenování ‚tblZaměstnanec2‘, {‘LIMIT‘ => 2}
Tím jsou naše příklady, jak vkládat řádky přes PySpark do tabulek HBase, dokončeny. V příštím díle proberu operace Get and Scan, PySpark SQL a některé odstraňování problémů. Do té doby byste si měli pořídit CDP cluster a projít si tyto příklady.