Agregační operace v MongoDB vám umožňují zpracovávat datové záznamy, seskupovat je a vracet jejich vypočítané výsledky. MongoDB podporuje tři druhy agregačních operací:
- Jednoúčelové agregační příkazy
- Map-Reduce
- Agregační kanál
Můžete použít tento srovnávací dokument MongoDB, abyste zjistili, který vyhovuje vašim potřebám.
Agregační kanál
Agregační kanál je rámec MongoDB, který poskytuje agregaci dat prostřednictvím kanálu zpracování dat. To znamená, že dokumenty se odesílají vícekrokovým kanálem, který v každém kroku filtruje, seskupuje a jinak transformuje dokumenty. Poskytuje SQL „GROUP BY…“. typ konstrukcí pro MongoDB, které běží v samotné databázi. Dokumentace agregace poskytuje užitečné příklady vytváření takových kanálů.
Proč spouštět agregace na sekundárním serveru?
Agregační kanály jsou operace náročné na zdroje – dává smysl přesunout úlohy agregace na sekundární položky sady replik MongoDB, když je v pořádku pracovat s mírně zastaralými daty. To obvykle platí pro „dávkové“ operace, protože neočekávají, že poběží na nejnovějších datech. Pokud je třeba výstup zapsat do kolekce, pak agregační úlohy běží pouze na primární, protože pouze primární je zapisovatelný do MongoDB.
V tomto příspěvku vám ukážeme, jak zajistit, aby se agregační kanály spouštěly na sekundárním místě jak z mongo shellu, tak z Java.
Spusťte agregační kanály na sekundárním z Mongo Shell a Java v MongoDBClick To TweetPoznámka:K předvedení našich příkladů používáme vzorovou datovou sadu poskytnutou MongoDB v jejich příkladu agregace PSČ. Můžete si jej stáhnout podle pokynů v příkladu.
Agregační kanál na sadách replik
Skořápka MongoDB
Nastavení předvolby čtení na sekundární dělá trik při spouštění agregační úlohy z mongo shellu. Pokusme se načíst všechny státy s populací větší než 10 milionů (1. agregace v příkladu PSČ). Shell i server používají MongoDB verze 3.2.10.
mongo -u admin -p <pwd> --authenticationDatabase admin --host RS-repl0-0/server-1.servers.example.com:27017,server-2.servers.example.com:27017 RS-repl0-0:PRIMARY> use test switched to db test RS-repl0-0:PRIMARY> db.setSlaveOk() // Ok to run commands on a slave RS-repl0-0:PRIMARY> db.getMongo().setReadPref('secondary') // Set read pref RS-repl0-0:PRIMARY> db.getMongo().getReadPrefMode() secondary RS-repl0-0:PRIMARY> db.zips.aggregate( [ ... { $group: { _id: "$state", totalPop: { $sum: "$pop" } } }, ... { $match: { totalPop: { $gte: 10*1000*1000 } } } ... ] ) { "_id" : "CA", "totalPop" : 29754890 } { "_id" : "FL", "totalPop" : 12686644 } { "_id" : "PA", "totalPop" : 11881643 } { "_id" : "NY", "totalPop" : 17990402 } { "_id" : "OH", "totalPop" : 10846517 } { "_id" : "IL", "totalPop" : 11427576 } { "_id" : "TX", "totalPop" : 16984601 }
Pohled do protokolů MongoDB (s povoleným protokolováním pro příkazy) na sekundárním místě ukazuje, že agregace skutečně běžela na sekundárním:
... 2016-12-05T06:20:14.783+0000 I COMMAND [conn200] command test.zips command: aggregate { aggregate: "zips", pipeline: [ { $group: { _id: "$state", totalPop: { $sum: "$pop" } } }, { $match: { totalPop: { $gte: 10000000.0 } } } ], cursor: {} } keyUpdates:0 writeConflicts:0 numYields:229 reslen:338 locks:{ Global: { acquireCount: { r: 466 } }, Database: { acquire Count: { r: 233 } }, Collection: { acquireCount: { r: 233 } } } protocol:op_command 49ms ...
Java
Z ovladače MongoDB Java opět stačí nastavení předvolby čtení. Zde je příklad použití ovladače verze 3.2.2:
public class AggregationChecker { /* * Data and code inspired from: * https://docs.mongodb.com/v3.2/tutorial/aggregation-zip-code-data-set/#return-states-with-populations-above-10-million */ private static final String MONGO_END_POINT = "mongodb://admin:[email protected]:27017,server-2.servers.example.com:27017/admin?replicaSet=RS-repl0-0"; private static final String COL_NAME = "zips"; private static final String DEF_DB = "test"; public AggregationChecker() { } public static void main(String[] args) { AggregationChecker writer = new AggregationChecker(); writer.aggregationJob(); } private void aggregationJob() { printer("Initializing..."); Builder options = MongoClientOptions.builder().readPreference(ReadPreference.secondary()); MongoClientURI uri = new MongoClientURI(MONGO_END_POINT, options); MongoClient client = new MongoClient(uri); try { final DB db = client.getDB(DEF_DB); final DBCollection coll = db.getCollection(COL_NAME); // Avg city pop by state: https://docs.mongodb.com/manual/tutorial/aggregation-zip-code-data-set/#return-average-city-population-by-state Iterable iterable = coll.aggregate( Arrays.asList( new BasicDBObject("$group", new BasicDBObject("_id", new BasicDBObject("state", "$state").append("city", "$city")).append("pop", new BasicDBObject("$sum", "$pop"))), new BasicDBObject("$group", new BasicDBObject("_id", "$_id.state").append("avgCityPop", new BasicDBObject("$avg", "$pop"))))).results(); for (DBObject entry : iterable) { printer(entry.toString()); } } finally { client.close(); } printer("Done..."); } ... }
Protokoly na sekundárním:
... 2016-12-01T10:54:18.667+0000 I COMMAND [conn4113] command test.zips command: aggregate { aggregate: "zipcodes", pipeline: [ { $group: { _id: { state: "$state", city: "$city" }, pop: { $sum: "$pop" } } }, { $group: { _id: "$_id.state", avgCityPop: { $avg: "$pop" } } } ] } keyUpdates:0 writeConflicts:0 numYields:229 reslen:2149 locks:{ Global: { acquireCount: { r: 466 } }, Database: { acquireCount: { r: 233 } }, Collection: { acquireCount: { r: 233 } } } protocol:op_query 103ms ...
Na primárním zařízení nebyla zaznamenána žádná operace.
Agregační kanál na sdílených klastrech
Agregační kanály jsou podporovány na sdílených clusterech. Podrobné chování je vysvětleno v dokumentaci. Z hlediska implementace je při použití agregačního kanálu malý rozdíl mezi sadou replik a sdíleným clusterem.
Jak nastavit agregační kanál na sdílených klastrech v MongoDBClick To TweetSkořápka MongoDB
Před importem dat do sdíleného clusteru povolte sdílení na kolekci.
mongos> sh.enableSharding("test") mongos> sh.shardCollection("test.zips", { "_id" : "hashed" } )
Poté jsou operace stejné jako u sady replik:
mongos> db.setSlaveOk() mongos> db.getMongo().setReadPref('secondary') mongos> db.getMongo().getReadPrefMode() secondary mongos> db.zips.aggregate( [ ... { $group: { _id: "$state", totalPop: { $sum: "$pop" } } }, ... { $match: { totalPop: { $gte: 10*1000*1000 } } } ... ] ) { "_id" : "TX", "totalPop" : 16984601 } { "_id" : "PA", "totalPop" : 11881643 } { "_id" : "CA", "totalPop" : 29754890 } { "_id" : "FL", "totalPop" : 12686644 } { "_id" : "NY", "totalPop" : 17990402 } { "_id" : "OH", "totalPop" : 10846517 } { "_id" : "IL", "totalPop" : 11427576 }
Protokoly z jednoho ze sekundárních serverů:
... 2016-12-02T05:46:24.627+0000 I COMMAND [conn242] command test.zips command: aggregate { aggregate: "zips", pipeline: [ { $group: { _id: "$state", totalPop: { $sum: "$pop" } } } ], fromRouter: true, cursor: { batchSize: 0 } } cursorid:44258973083 keyUpdates:0 writeConflicts:0 numYields:0 reslen:115 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 2 } }, Collection: { acquireCount: { r: 2 } } } protocol:op_query 0ms 2016-12-02T05:46:24.641+0000 I COMMAND [conn131] getmore test.zips query: { aggregate: "zips", pipeline: [ { $group: { _id: "$state", totalPop: { $sum: "$pop" } } } ], fromRouter: true, cursor: { batchSize: 0 } } planSummary: PIPELINE_PROXY cursorid:44258973083 ntoreturn:0 keysExamined:0 docsExamined:0 cursorExhausted:1 keyUpdates:0 writeConflicts:0 numYields:112 nreturned:51 reslen:1601 locks:{ Global: { acquireCount: { r: 230 } }, Database: { acquireCount: { r: 115 } }, Collection: { acquireCount: { r: 115 } } } 13ms ...
Java
Stejný kód jako v sadě replik funguje dobře se sdíleným clusterem. Stačí nahradit připojovací řetězec sady replik řetězcem ze sdíleného clusteru. Protokoly ze sekundárního souboru ukazují, že úloha byla skutečně spuštěna na sekundárních serverech:
... 2016-12-02T05:39:12.339+0000 I COMMAND [conn130] command test.zips command: aggregate { aggregate: "zips", pipeline: [ { $group: { _id: { state: "$state", city: "$city" }, pop: { $sum: "$pop" } } } ], fromRouter: true, cursor: { batchSize: 0 } } cursorid:44228970872 keyUpdates:0 writeConflicts:0 numYields:0 reslen:115 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 2 } }, Collection: { acquireCount: { r: 2 } } } protocol:op_query 0ms 2016-12-02T05:39:12.371+0000 I COMMAND [conn131] getmore test.zips query: { aggregate: "zips", pipeline: [ { $group: { _id: { state: "$state", city: "$city" }, pop: { $sum: "$pop" } } } ], fromRouter: true, cursor: { batchSize: 0 } } planSummary: PIPELINE_PROXY cursorid:44228970872 ntoreturn:0 keysExamined:0 docsExamined:0 cursorExhausted:1 keyUpdates:0 writeConflicts:0 numYields:112 nreturned:12902 reslen:741403 locks:{ Global: { acquireCount: { r: 230 } }, Database: { acquireCount: { r: 115 } }, Collection: { acquireCount: { r: 115 } } } 30ms ...
Byl tento obsah užitečný? Dejte nám vědět tweetováním na nás @scaledgridio a jako vždy, pokud máte nějaké dotazy, dejte nám vědět v komentářích níže. Ach jo! Nezapomeňte se podívat na naše hostingové produkty MongoDB, které mohou ušetřit až 40 % dlouhodobých nákladů na hosting MongoDB®.