sql >> Databáze >  >> NoSQL >> MongoDB

RDD BSONObject do DataFrame

val datapath = "path_to_bson_file.bson" 

import org.apache.hadoop.conf.Configuration

// Set up the configuration for reading from bson dump.
val bsonConfig = new Configuration()
bsonConfig.set("mongo.job.input.format", "com.mongodb.hadoop.BSONFileInputFormat")

// given with your spark session 
implicit lazy val sparkSession = initSpark()

// read the RDD[org.bson.BSONObject]
val bson_data_as_json_string = sparkSession.sparkContext.newAPIHadoopFile(datapath,
  classOf[com.mongodb.hadoop.BSONFileInputFormat].
    asSubclass(classOf[org.apache.hadoop.mapreduce.lib.input.FileInputFormat[Object, org.bson.BSONObject]]),
  classOf[Object],
  classOf[org.bson.BSONObject],
  bsonConfig).
  map{row => {
    // map BSON object to JSON string
    val json = com.mongodb.util.JSON.serialize(row._2)
    json
  }
}

// read into JSON spark Dataset:
val bson_data_as_json_dataset = sparkSession.sqlContext.read.json(bson_data_as_json_string)
// eval the schema:
bson_data_as_json_dataset.printSchema()


  1. SyntaxError:neplatné ID vlastnosti - MongoDB

  2. Jak mohu navrhnout schéma pro níže uvedený produkt pomocí mongoose?

  3. Budování škálovatelného procesu pomocí NiFi, Kafka a HBase na CDP

  4. Jak získám datum, které se uloží jako datum v MongoDB místo Int64?