Podle chyby již máte řetězec (již jste provedli df.selectExpr("CAST(value AS STRING)")
), takže byste měli zkusit získat událost Row jako String
, a nikoli Array[Byte]
Začněte změnou
val valueStr = new String(record.getAs[Array[Byte]]("value"))
do
val valueStr = record.getAs[String]("value")
Chápu, že už možná máte cluster pro spouštění kódu Spark, ale doporučil bych se ještě podívat na Konektor dřezu Kafka Connect Mongo takže nemusíte psát a udržovat svůj vlastní Mongo spisovatel v kódu Spark.
Nebo můžete zapsat datové sady Spark do mongo také přímo