sql >> Databáze >  >> RDS >> Mysql

Google Dataflow (Apache beam) hromadné vložení JdbcIO do mysql databáze

ÚPRAVA 2018-01-27:

Ukazuje se, že tento problém souvisí s DirectRunner. Pokud spustíte stejný kanál pomocí DataflowRunner, měli byste získat dávky, které jsou ve skutečnosti až 1 000 záznamů. DirectRunner vždy vytvoří svazky velikosti 1 po operaci seskupení.

Původní odpověď:

Narazil jsem na stejný problém při zápisu do cloudových databází pomocí JdbcIO Apache Beam. Problém je v tom, že zatímco JdbcIO podporuje zápis až 1 000 záznamů v jedné dávce, ve skutečnosti jsem nikdy neviděl, že by zapisovalo více než 1 řádek najednou (musím přiznat:ve vývojovém prostředí se vždy používal DirectRunner).

Přidal jsem proto do JdbcIO funkci, kde můžete sami ovládat velikost dávek seskupením dat a zapsáním každé skupiny jako jedné dávky. Níže je uveden příklad použití této funkce na základě původního příkladu WordCount Apache Beam.

p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
    // Count words in input file(s)
    .apply(new CountWords())
    // Format as text
    .apply(MapElements.via(new FormatAsTextFn()))
    // Make key-value pairs with the first letter as the key
    .apply(ParDo.of(new FirstLetterAsKey()))
    // Group the words by first letter
    .apply(GroupByKey.<String, String> create())
    // Get a PCollection of only the values, discarding the keys
    .apply(ParDo.of(new GetValues()))
    // Write the words to the database
    .apply(JdbcIO.<String> writeIterable()
            .withDataSourceConfiguration(
                JdbcIO.DataSourceConfiguration.create(options.getJdbcDriver(), options.getURL()))
            .withStatement(INSERT_OR_UPDATE_SQL)
            .withPreparedStatementSetter(new WordCountPreparedStatementSetter()));

Rozdíl oproti normální metodě zápisu JdbcIO je nová metoda writeIterable() který trvá PCollection<Iterable<RowT>> jako vstup namísto PCollection<RowT> . Každá iterovatelná je zapsána jako jedna dávka do databáze.

Verzi JdbcIO s tímto doplňkem lze nalézt zde:https://github.com/olavloite/beam/blob/JdbcIOIterableWrite/sdks/java/io/jdbc/src/main/java /org/apache/beam/sdk/io/jdbc/JdbcIO.java

Celý ukázkový projekt obsahující výše uvedený příklad lze nalézt zde:https://github.com/ olavloite/příklad-nosníku

(Na Apache Beam také čeká na vyřízení požadavku na zahrnutí do projektu)




  1. Jak vytvořit Cron úlohu pro zálohování MySQL a FTP zálohy na můj záložní server?

  2. Fulltextový dotaz s jedinou uvozovkou

  3. Výhody PostgreSQL

  4. Jak zjistím svou MySQL URL, hostitele, port a uživatelské jméno?