sql >> Databáze >  >> NoSQL >> Redis

Konverze datového rámce na RDD[(String, String)]

Pokud chcete namapovat řádek na jiný prvek RDD, můžete použít df.map(row => ...) k převodu datového rámce na RDD.

Například:

val df = Seq(("table1",432),
      ("table2",567),
      ("table3",987),
      ("table1",789)).
      toDF("tablename", "Code").toDF()

    df.show()

    +---------+----+
|tablename|Code|
+---------+----+
|   table1| 432|
|   table2| 567|
|   table3| 987|
|   table1| 789|
+---------+----+

    val rddDf = df.map(r => (r(0), r(1))).rdd // Type:RDD[(Any,Any)]

    OR

    val rdd = df.map(r => (r(0).toString, r(1).toString)).rdd  //Type: RDD[(String,String)]

Viz https://community.hortonworks.com/questions/106500/error-in-spark-streaming-kafka-integration-structu.html ohledně AnalysisException:Dotazy se zdroji streamování musí být provedeny pomocí writeStream.start()

Musíte počkat na ukončení dotazu pomocí dotazu.awaitTermination() Aby se zabránilo ukončení procesu, když je dotaz aktivní.




  1. Řetězec projektu agregace MongoDB na ObjectId

  2. Jak používat $unset a $set v kombinaci v mongoDB

  3. Přehled ověření schématu MongoDB

  4. Správa připojení