PostgreSQL je dobře známá jako nejpokročilejší opensource databáze a pomáhá vám spravovat vaše data bez ohledu na to, jak velká, malá nebo odlišná je datová sada, takže ji můžete použít ke správě nebo analýze vašich velkých dat, a samozřejmě existují několik způsobů, jak to umožnit, např. Apache Spark. V tomto blogu uvidíme, co je Apache Spark a jak jej můžeme použít pro práci s naší PostgreSQL databází.
Pro analýzu velkých dat máme dva různé typy analýz:
- Dávkové analýzy:Na základě údajů shromážděných za určité časové období.
- Analýza v reálném čase (streamování):Na základě okamžitých dat pro okamžitý výsledek.
Co je Apache Spark?
Apache Spark je sjednocený analytický engine pro rozsáhlé zpracování dat, který může pracovat na dávkové analýze i analýze v reálném čase rychleji a snadněji.
Poskytuje rozhraní API na vysoké úrovni v jazycích Java, Scala, Python a R a optimalizovaný engine, který podporuje obecné grafy provádění.
Součásti Apache SparkKnihovny Apache Spark
Apache Spark obsahuje různé knihovny:
- Spark SQL:Je to modul pro práci se strukturovanými daty pomocí SQL nebo DataFrame API. Poskytuje běžný způsob přístupu k různým zdrojům dat, včetně Hive, Avro, Parquet, ORC, JSON a JDBC. Můžete dokonce spojit data z těchto zdrojů.
- Spark Streaming:Usnadňuje vytváření škálovatelných streamovacích aplikací odolných proti chybám pomocí jazykového rozhraní API pro streamování zpracování, což vám umožňuje psát streamovací úlohy stejným způsobem, jako píšete dávkové úlohy. Podporuje Java, Scala a Python. Spark Streaming obnoví ztracenou práci i stav operátora ihned po vybalení, bez jakéhokoli dalšího kódu z vaší strany. Umožňuje znovu použít stejný kód pro dávkové zpracování, spojovat streamy s historickými daty nebo spouštět ad-hoc dotazy na stav streamu.
- MLib (Machine Learning):Je to škálovatelná knihovna pro strojové učení. MLlib obsahuje vysoce kvalitní algoritmy, které využívají iteraci a mohou přinést lepší výsledky než jednoprůchodové aproximace, které se někdy používají na MapReduce.
- GraphX:Jedná se o rozhraní API pro grafy a grafově paralelní výpočty. GraphX sjednocuje ETL, průzkumnou analýzu a iterativní grafové výpočty do jednoho systému. Můžete zobrazit stejná data jako grafy i kolekce, efektivně transformovat a spojovat grafy s RDD a psát vlastní iterativní grafové algoritmy pomocí rozhraní Pregel API.
Výhody Apache Spark
Podle oficiální dokumentace jsou některé výhody Apache Spark:
- Rychlost:Spouštějte úlohy 100x rychleji. Apache Spark dosahuje vysokého výkonu pro dávková i streamovaná data pomocí nejmodernějšího plánovače DAG (Direct Acyclic Graph), optimalizátoru dotazů a fyzického spouštěcího enginu.
- Snadné použití:Rychle pište aplikace v jazycích Java, Scala, Python, R a SQL. Spark nabízí více než 80 operátorů na vysoké úrovni, kteří usnadňují vytváření paralelních aplikací. Můžete jej používat interaktivně z prostředí Scala, Python, R a SQL.
- Obecnost:Kombinujte SQL, streamování a komplexní analýzy. Spark pohání řadu knihoven včetně SQL a DataFrames, MLlib pro strojové učení, GraphX a Spark Streaming. Tyto knihovny můžete hladce kombinovat ve stejné aplikaci.
- Běží všude:Spark běží na Hadoop, Apache Mesos, Kubernetes, samostatně nebo v cloudu. Může přistupovat k různým zdrojům dat. Spark můžete spustit pomocí jeho samostatného clusterového režimu, na EC2, na Hadoop YARN, na Mesos nebo na Kubernetes. Získejte přístup k datům v HDFS, Alluxio, Apache Cassandra, Apache HBase, Apache Hive a stovkách dalších zdrojů dat.
Nyní se podívejme, jak to můžeme integrovat s naší databází PostgreSQL.
Jak používat Apache Spark s PostgreSQL
Budeme předpokládat, že máte svůj PostgreSQL cluster spuštěný a spuštěný. Pro tento úkol použijeme server PostgreSQL 11 běžící na CentOS7.
Nejprve si vytvoříme testovací databázi na našem PostgreSQL serveru:
postgres=# CREATE DATABASE testing;
CREATE DATABASE
postgres=# \c testing
You are now connected to database "testing" as user "postgres".
Nyní vytvoříme tabulku s názvem t1:
testing=# CREATE TABLE t1 (id int, name text);
CREATE TABLE
A vložte tam nějaká data:
testing=# INSERT INTO t1 VALUES (1,'name1');
INSERT 0 1
testing=# INSERT INTO t1 VALUES (2,'name2');
INSERT 0 1
Zkontrolujte vytvořená data:
testing=# SELECT * FROM t1;
id | name
----+-------
1 | name1
2 | name2
(2 rows)
K připojení Apache Spark k naší databázi PostgreSQL použijeme konektor JDBC. Můžete si jej stáhnout zde.
$ wget https://jdbc.postgresql.org/download/postgresql-42.2.6.jar
Nyní nainstalujme Apache Spark. K tomu si musíme stáhnout balíčky spark odtud.
$ wget http://us.mirrors.quenda.co/apache/spark/spark-2.4.3/spark-2.4.3-bin-hadoop2.7.tgz
$ tar zxvf spark-2.4.3-bin-hadoop2.7.tgz
$ cd spark-2.4.3-bin-hadoop2.7/
Ke spuštění shellu Spark potřebujeme na našem serveru nainstalovanou JAVA:
$ yum install java
Takže teď můžeme spustit náš Spark Shell:
$ ./bin/spark-shell
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://ApacheSpark1:4040
Spark context available as 'sc' (master = local[*], app id = local-1563907528854).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.4.3
/_/
Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_212)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
Máme přístup k našemu kontextovému webovému rozhraní Spark dostupnému na portu 4040 na našem serveru:
Apache Spark UIDo shellu Spark musíme přidat ovladač PostgreSQL JDBC:
scala> :require /path/to/postgresql-42.2.6.jar
Added '/path/to/postgresql-42.2.6.jar' to classpath.
scala> import java.util.Properties
import java.util.Properties
A přidejte informace JDBC, které má Spark používat:
scala> val url = "jdbc:postgresql://localhost:5432/testing"
url: String = jdbc:postgresql://localhost:5432/testing
scala> val connectionProperties = new Properties()
connectionProperties: java.util.Properties = {}
scala> connectionProperties.setProperty("Driver", "org.postgresql.Driver")
res6: Object = null
Nyní můžeme provádět SQL dotazy. Nejprve definujme dotaz1 jako SELECT * FROM t1, naši testovací tabulku.
scala> val query1 = "(SELECT * FROM t1) as q1"
query1: String = (SELECT * FROM t1) as q1
A vytvořte DataFrame:
scala> val query1df = spark.read.jdbc(url, query1, connectionProperties)
query1df: org.apache.spark.sql.DataFrame = [id: int, name: string]
Nyní tedy můžeme provést akci přes tento DataFrame:
scala> query1df.show()
+---+-----+
| id| name|
+---+-----+
| 1|name1|
| 2|name2|
+---+-----+
scala> query1df.explain
== Physical Plan ==
*(1) Scan JDBCRelation((SELECT * FROM t1) as q1) [numPartitions=1] [id#19,name#20] PushedFilters: [], ReadSchema: struct<id:int,name:string>
Můžeme přidat další hodnoty a spustit to znovu, abychom se ujistili, že vrací aktuální hodnoty.
PostgreSQL
testing=# INSERT INTO t1 VALUES (10,'name10'), (11,'name11'), (12,'name12'), (13,'name13'), (14,'name14'), (15,'name15');
INSERT 0 6
testing=# SELECT * FROM t1;
id | name
----+--------
1 | name1
2 | name2
10 | name10
11 | name11
12 | name12
13 | name13
14 | name14
15 | name15
(8 rows)
Jiskra
scala> query1df.show()
+---+------+
| id| name|
+---+------+
| 1| name1|
| 2| name2|
| 10|name10|
| 11|name11|
| 12|name12|
| 13|name13|
| 14|name14|
| 15|name15|
+---+------+
V našem příkladu ukazujeme pouze to, jak Apache Spark pracuje s naší databází PostgreSQL, nikoli jak spravuje naše informace o velkých datech.
Závěr
V dnešní době je docela běžné mít ve společnosti problém spravovat velká data, a jak jsme viděli, můžeme použít Apache Spark, abychom se s tím vyrovnali a využili všechny funkce, které jsme zmínili dříve. Velká data jsou obrovský svět, takže si můžete prohlédnout oficiální dokumentaci, kde najdete další informace o použití Apache Spark a PostgreSQL a přizpůsobíte je svým požadavkům.