Nedávno společnost MongoDB vydala novou funkci od verze 3.6, Change Streams. To vám dává okamžitý přístup k vašim datům, což vám pomáhá zůstat v obraze o změnách vašich dat. V dnešním světě chce každý spíše okamžitá upozornění, než aby je dostával po několika hodinách nebo minutách. U některých aplikací je důležité posílat upozornění v reálném čase všem přihlášeným uživatelům o každé aktualizaci. MongoDB zavedením této funkce tento proces opravdu zjednodušil. V tomto článku se na několika příkladech seznámíme s proudem změn MongoDB a jeho aplikacemi.
Definování toků změn
Proudy změn nejsou nic jiného než proud jakýchkoli změn, ke kterým dojde v databázi nebo kolekci nebo dokonce v nasazení v reálném čase. Například kdykoli dojde k jakékoli aktualizaci (Vložit, Aktualizovat nebo Smazat) v konkrétní kolekci, MongoDB spustí událost změny se všemi daty, která byla upravena.
Proudy změn můžete definovat v jakékoli kolekci stejně jako jakékoli jiné běžné agregační operátory pomocí operátoru $changeStream a metody watch(). Můžete také definovat tok změn pomocí metody MongoCollection.watch().
Příklad
db.myCollection.watch()
Změna funkcí streamů
-
Změny filtrování
Změny můžete filtrovat a dostávat oznámení o událostech pouze pro některá cílená data.
Příklad:
pipeline = [ { $match: { name: "Bob" } } ]; changeStream = collection.watch(pipeline);
Tento kód zajistí, že budete dostávat aktualizace pouze pro záznamy, které mají jméno rovné Bobovi. Tímto způsobem můžete psát libovolné kanály pro filtrování toků změn.
-
Obnovení streamů změn
Tato funkce zajišťuje, že nedojde ke ztrátě dat v případě jakýchkoli selhání. Každá odpověď ve streamu obsahuje token obnovení, který lze použít k restartování streamu od určitého bodu. U některých častých selhání sítě se ovladač mongodb pokusí znovu navázat spojení s předplatiteli pomocí nejnovějšího tokenu obnovení. I když v případě úplného selhání aplikace by klienti měli udržovat token obnovení, aby bylo možné stream obnovit.
-
Objednané toky změn
MongoDB používá globální logické hodiny k řazení všech událostí toku změn napříč všemi replikami a fragmenty libovolného clusteru, takže příjemce vždy obdrží oznámení ve stejném pořadí, v jakém byly příkazy použity v databázi.
-
Události s úplnými dokumenty
MongoDB standardně vrací část odpovídajících dokumentů. Můžete však upravit konfiguraci toku změn, abyste obdrželi úplný dokument. Chcete-li tak učinit, předejte { fullDocument:“updateLookup”} metodě sledování.
Příklad:collection = db.collection("myColl") changeStream = collection.watch({ fullDocument: “updateLookup”})
-
Trvanlivost
Toky změn budou upozorňovat pouze na data, která jsou přiřazena většině replik. To zajistí, že události budou generovány většinovými perzistentními daty, což zajistí trvanlivost zprávy.
-
Zabezpečení/Řízení přístupu
Toky změn jsou velmi bezpečné. Uživatelé mohou vytvářet proudy změn pouze v kolekcích, u kterých mají oprávnění ke čtení. Můžete vytvářet proudy změn na základě uživatelských rolí.
Příklad toků změn
V tomto příkladu vytvoříme toky změn ve sbírce akcií, abychom byli upozorněni, když cena akcií překročí jakoukoli hranici.
-
Nastavit cluster
Chcete-li použít toky změn, musíme nejprve vytvořit sadu replik. Spusťte následující příkaz k vytvoření sady replik jednoho uzlu.
mongod --dbpath ./data --replSet “rs”
-
Vložte nějaké záznamy do kolekce Akcie
var docs = [ { ticker: "AAPL", price: 210 }, { ticker: "AAPL", price: 260 }, { ticker: "AAPL", price: 245 }, { ticker: "AAPL", price: 255 }, { ticker: "AAPL", price: 270 } ]; db.Stocks.insert(docs)
-
Nastavit prostředí uzlů a nainstalovat závislosti
mkdir mongo-proj && cd mongo-proj npm init -y npm install mongodb --save
-
Přihlaste se k odběru změn
Vytvořte jeden soubor index.js a vložte do něj následující kód.
const mongo = require("mongodb").MongoClient; mongo.connect("mongodb://localhost:27017/?replicaSet=rs0").then(client => { console.log("Connected to MongoDB server"); // Select DB and Collection const db = client.db("mydb"); const collection = db.collection("Stocks"); pipeline = [ { $match: { "fullDocument.price": { $gte: 250 } } } ]; // Define change stream const changeStream = collection.watch(pipeline); // start listen to changes changeStream.on("change", function(event) { console.log(JSON.stringify(event)); }); });
Nyní spusťte tento soubor:
node index.js
-
Chcete-li získat aktualizaci, vložte nový záznam do databáze
db.Stocks.insert({ ticker: “AAPL”, price: 280 })
Nyní zkontrolujte konzolu, obdržíte aktualizaci z MongoDB.
Příklad odpovědi:{ "_id":{ "_data":"825C5D51F70000000129295A1004E83608EE8F1B4FBABDCEE73D5BF31FC946645F696400645C5D51F73ACA83479B48DE6E0004"}, "operationType":"insert", "clusterTime":"6655565945622233089", "fullDocument":{ "_id":"5c5d51f73aca83479b48de6e", "ticker":"AAPL", "Price":300 }, "ns":{"db":"mydb","coll":"Stocks"}, "documentKey":{"_id":"5c5d51f73aca83479b48de6e"} }
Zde můžete změnit hodnotu parametru operationType pomocí následujících operací, abyste naslouchali různým typům změn v kolekci:
- Vložit
- Nahradit (kromě jedinečného ID)
- Aktualizovat
- Smazat
- Zneplatnit (kdykoli Mongo vrátí neplatný kurzor)
Další režimy toků změn
Toky změn můžete zahájit proti databázi a nasazení stejným způsobem jako proti kolekci. Tato funkce byla uvolněna z MongoDB verze 4.0. Zde jsou příkazy pro otevření toku změn proti databázi a nasazení.
Against DB: db.watch()
Against deployment: Mongo.watch()
Závěr
MongoDB Change Streams zjednodušuje integraci mezi frontendem a backendem v reálném čase a bezproblémově. Tato funkce vám může pomoci používat MongoDB pro model pubsub, takže už nemusíte spravovat nasazení Kafka nebo RabbitMQ. Pokud vaše aplikace vyžaduje informace v reálném čase, musíte se podívat na tuto funkci MongoDB. Doufám, že tento příspěvek vám pomůže začít s proudy změn MongoDB.