ÚPRAVA 2018-01-27:
Ukazuje se, že tento problém souvisí s DirectRunner. Pokud spustíte stejný kanál pomocí DataflowRunner, měli byste získat dávky, které jsou ve skutečnosti až 1 000 záznamů. DirectRunner vždy vytvoří svazky velikosti 1 po operaci seskupení.
Původní odpověď:
Narazil jsem na stejný problém při zápisu do cloudových databází pomocí JdbcIO Apache Beam. Problém je v tom, že zatímco JdbcIO podporuje zápis až 1 000 záznamů v jedné dávce, ve skutečnosti jsem nikdy neviděl, že by zapisovalo více než 1 řádek najednou (musím přiznat:ve vývojovém prostředí se vždy používal DirectRunner).
Přidal jsem proto do JdbcIO funkci, kde můžete sami ovládat velikost dávek seskupením dat a zapsáním každé skupiny jako jedné dávky. Níže je uveden příklad použití této funkce na základě původního příkladu WordCount Apache Beam.
p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
// Count words in input file(s)
.apply(new CountWords())
// Format as text
.apply(MapElements.via(new FormatAsTextFn()))
// Make key-value pairs with the first letter as the key
.apply(ParDo.of(new FirstLetterAsKey()))
// Group the words by first letter
.apply(GroupByKey.<String, String> create())
// Get a PCollection of only the values, discarding the keys
.apply(ParDo.of(new GetValues()))
// Write the words to the database
.apply(JdbcIO.<String> writeIterable()
.withDataSourceConfiguration(
JdbcIO.DataSourceConfiguration.create(options.getJdbcDriver(), options.getURL()))
.withStatement(INSERT_OR_UPDATE_SQL)
.withPreparedStatementSetter(new WordCountPreparedStatementSetter()));
Rozdíl oproti normální metodě zápisu JdbcIO je nová metoda writeIterable()
který trvá PCollection<Iterable<RowT>>
jako vstup namísto PCollection<RowT>
. Každá iterovatelná je zapsána jako jedna dávka do databáze.
Verzi JdbcIO s tímto doplňkem lze nalézt zde:https://github.com/olavloite/beam/blob/JdbcIOIterableWrite/sdks/java/io/jdbc/src/main/java /org/apache/beam/sdk/io/jdbc/JdbcIO.java
Celý ukázkový projekt obsahující výše uvedený příklad lze nalézt zde:https://github.com/ olavloite/příklad-nosníku
(Na Apache Beam také čeká na vyřízení požadavku na zahrnutí do projektu)