sql >> Databáze >  >> NoSQL >> MongoDB

Ponořte Kafka Stream do MongoDB pomocí PySpark Structured Streaming

Našel jsem řešení. Protože jsem nemohl najít správný ovladač Mongo pro strukturované streamování, pracoval jsem na jiném řešení. Nyní používám přímé připojení k mongoDb a místo foreachbatch(.) používám „foreach(...)“. ...). Můj kód vypadá v souboru testSpark.py takto:

....
import pymongo
from pymongo import MongoClient

local_url = "mongodb://localhost:27017"


def write_machine_df_mongo(target_df):

    cluster = MongoClient(local_url)
    db = cluster["test_db"]
    collection = db.test1

    post = {
            "machine_id": target_df.machine_id,
            "proc_type": target_df.proc_type,
            "sensor1_id": target_df.sensor1_id,
            "sensor2_id": target_df.sensor2_id,
            "time": target_df.time,
            "sensor1_val": target_df.sensor1_val,
            "sensor2_val": target_df.sensor2_val,
            }

    collection.insert_one(post)

machine_df.writeStream\
    .outputMode("append")\
    .foreach(write_machine_df_mongo)\
    .start()



  1. Selhání ověření při pokusu o uložení do mongodb

  2. Jak používat Node.js k vytvoření tunelového připojení SSH k databázi MongoDB

  3. Mongoose, seřaďte dotaz podle vyplněného pole

  4. Delegát metatřídy není instancí