sql >> Databáze >  >> NoSQL >> MongoDB

Streamování dat v reálném čase s MongoDB Change Streams

Nedávno společnost MongoDB vydala novou funkci od verze 3.6, Change Streams. To vám dává okamžitý přístup k vašim datům, což vám pomáhá zůstat v obraze o změnách vašich dat. V dnešním světě chce každý spíše okamžitá upozornění, než aby je dostával po několika hodinách nebo minutách. U některých aplikací je důležité posílat upozornění v reálném čase všem přihlášeným uživatelům o každé aktualizaci. MongoDB zavedením této funkce tento proces opravdu zjednodušil. V tomto článku se na několika příkladech seznámíme s proudem změn MongoDB a jeho aplikacemi.

Definování toků změn

Proudy změn nejsou nic jiného než proud jakýchkoli změn, ke kterým dojde v databázi nebo kolekci nebo dokonce v nasazení v reálném čase. Například kdykoli dojde k jakékoli aktualizaci (Vložit, Aktualizovat nebo Smazat) v konkrétní kolekci, MongoDB spustí událost změny se všemi daty, která byla upravena.

Proudy změn můžete definovat v jakékoli kolekci stejně jako jakékoli jiné běžné agregační operátory pomocí operátoru $changeStream a metody watch(). Můžete také definovat tok změn pomocí metody MongoCollection.watch().

Příklad

db.myCollection.watch()

Změna funkcí streamů

  • Změny filtrování

    Změny můžete filtrovat a dostávat oznámení o událostech pouze pro některá cílená data.

    Příklad:

    pipeline = [
       {
         $match: { name: "Bob" }
       } ];
    changeStream = collection.watch(pipeline);

    Tento kód zajistí, že budete dostávat aktualizace pouze pro záznamy, které mají jméno rovné Bobovi. Tímto způsobem můžete psát libovolné kanály pro filtrování toků změn.

  • Obnovení streamů změn

    Tato funkce zajišťuje, že nedojde ke ztrátě dat v případě jakýchkoli selhání. Každá odpověď ve streamu obsahuje token obnovení, který lze použít k restartování streamu od určitého bodu. U některých častých selhání sítě se ovladač mongodb pokusí znovu navázat spojení s předplatiteli pomocí nejnovějšího tokenu obnovení. I když v případě úplného selhání aplikace by klienti měli udržovat token obnovení, aby bylo možné stream obnovit.

  • Objednané toky změn

    MongoDB používá globální logické hodiny k řazení všech událostí toku změn napříč všemi replikami a fragmenty libovolného clusteru, takže příjemce vždy obdrží oznámení ve stejném pořadí, v jakém byly příkazy použity v databázi.

  • Události s úplnými dokumenty

    MongoDB standardně vrací část odpovídajících dokumentů. Můžete však upravit konfiguraci toku změn, abyste obdrželi úplný dokument. Chcete-li tak učinit, předejte { fullDocument:“updateLookup”} metodě sledování.
    Příklad:

    collection = db.collection("myColl")
    changeStream = collection.watch({ fullDocument: “updateLookup”})
  • Trvanlivost

    Toky změn budou upozorňovat pouze na data, která jsou přiřazena většině replik. To zajistí, že události budou generovány většinovými perzistentními daty, což zajistí trvanlivost zprávy.

  • Zabezpečení/Řízení přístupu

    Toky změn jsou velmi bezpečné. Uživatelé mohou vytvářet proudy změn pouze v kolekcích, u kterých mají oprávnění ke čtení. Můžete vytvářet proudy změn na základě uživatelských rolí.

Somenines Staňte se MongoDB DBA – Uvedení MongoDB do produkce Zjistěte, co potřebujete vědět, abyste mohli nasadit, monitorovat, spravovat a škálovat MongoDBDdownload zdarma

Příklad toků změn

V tomto příkladu vytvoříme toky změn ve sbírce akcií, abychom byli upozorněni, když cena akcií překročí jakoukoli hranici.

  • Nastavit cluster

    Chcete-li použít toky změn, musíme nejprve vytvořit sadu replik. Spusťte následující příkaz k vytvoření sady replik jednoho uzlu.

    mongod --dbpath ./data --replSet “rs”
  • Vložte nějaké záznamy do kolekce Akcie

    var docs = [
     { ticker: "AAPL", price: 210 },
     { ticker: "AAPL", price: 260 },
     { ticker: "AAPL", price: 245 },
     { ticker: "AAPL", price: 255 },
     { ticker: "AAPL", price: 270 }
    ];
    db.Stocks.insert(docs)
  • Nastavit prostředí uzlů a nainstalovat závislosti

    mkdir mongo-proj && cd mongo-proj
    npm init -y
    npm install mongodb --save
  • Přihlaste se k odběru změn

    Vytvořte jeden soubor index.js a vložte do něj následující kód.

    const mongo = require("mongodb").MongoClient;
    mongo.connect("mongodb://localhost:27017/?replicaSet=rs0").then(client => {
     console.log("Connected to MongoDB server");
     // Select DB and Collection
     const db = client.db("mydb");
     const collection = db.collection("Stocks");
     pipeline = [
       {
         $match: { "fullDocument.price": { $gte: 250 } }
       }
     ];
     // Define change stream
     const changeStream = collection.watch(pipeline);
     // start listen to changes
     changeStream.on("change", function(event) {
       console.log(JSON.stringify(event));
     });
    });

    Nyní spusťte tento soubor:

    node index.js
  • Chcete-li získat aktualizaci, vložte nový záznam do databáze

    db.Stocks.insert({ ticker: “AAPL”, price: 280 })

    Nyní zkontrolujte konzolu, obdržíte aktualizaci z MongoDB.
    Příklad odpovědi:

    {
    "_id":{
    "_data":"825C5D51F70000000129295A1004E83608EE8F1B4FBABDCEE73D5BF31FC946645F696400645C5D51F73ACA83479B48DE6E0004"},
    "operationType":"insert",
    "clusterTime":"6655565945622233089",
    "fullDocument":{
    "_id":"5c5d51f73aca83479b48de6e",
    "ticker":"AAPL",
    "Price":300
    },
    "ns":{"db":"mydb","coll":"Stocks"},
    "documentKey":{"_id":"5c5d51f73aca83479b48de6e"}
    }

Zde můžete změnit hodnotu parametru operationType pomocí následujících operací, abyste naslouchali různým typům změn v kolekci:

  • Vložit
  • Nahradit (kromě jedinečného ID)
  • Aktualizovat
  • Smazat
  • Zneplatnit (kdykoli Mongo vrátí neplatný kurzor)

Další režimy toků změn

Toky změn můžete zahájit proti databázi a nasazení stejným způsobem jako proti kolekci. Tato funkce byla uvolněna z MongoDB verze 4.0. Zde jsou příkazy pro otevření toku změn proti databázi a nasazení.

Against DB: db.watch()
Against deployment: Mongo.watch()

Závěr

MongoDB Change Streams zjednodušuje integraci mezi frontendem a backendem v reálném čase a bezproblémově. Tato funkce vám může pomoci používat MongoDB pro model pubsub, takže už nemusíte spravovat nasazení Kafka nebo RabbitMQ. Pokud vaše aplikace vyžaduje informace v reálném čase, musíte se podívat na tuto funkci MongoDB. Doufám, že tento příspěvek vám pomůže začít s proudy změn MongoDB.


  1. MongoDB countDocuments()

  2. Získejte upozornění na změněné dokumenty v mongodb

  3. Mongoose Jedinečné hodnoty ve vnořeném poli objektů

  4. vysvětlit() v Mongodb:rozdíly mezi nscanned a nscannedObjects