sql >> Databáze >  >> NoSQL >> MongoDB

Výkon MongoDB:Spouštění operací MongoDB Map-Reduce na sekundárních serverech

Map-reduce je možná nejuniverzálnější z agregačních operací, které MongoDB podporuje.

Map-Reduce je populární programovací model, který vznikl ve společnosti Google pro paralelní zpracování a agregaci velkých objemů dat. Podrobná diskuse o Map-Reduce je mimo rozsah tohoto článku, ale v podstatě se jedná o vícekrokový proces agregace. Nejdůležitější dva kroky jsou fáze mapy (zpracování každého dokumentu a odeslání výsledků) a fáze redukce (shromáždí výsledky vygenerované během fáze mapy).

MongoDB podporuje tři druhy agregačních operací:Map-Reduce, agregační kanál a jednoúčelové agregační příkazy. Pomocí tohoto srovnávacího dokumentu MongoDB můžete zjistit, který vyhovuje vašim potřebám. https://scalegrid.io/blog/mongodb-performance-running-mongodb-map-reduce-operations-on-secondaries/

V mém posledním příspěvku jsme na příkladech viděli, jak spustit agregační kanály na sekundárních serverech. V tomto příspěvku si projdeme spouštění úloh Map-Reduce na sekundárních replikách MongoDB.

MongoDB Map-Reduce

MongoDB podporuje spouštění úloh Map-Reduce na databázových serverech. To nabízí flexibilitu při psaní složitých agregačních úloh, které nelze tak snadno provádět prostřednictvím agregačních kanálů. MongoDB vám umožňuje psát vlastní mapu a redukovat funkce v Javascriptu, které lze předávat do databáze přes Mongo shell nebo jiného klienta. Na velkých a neustále rostoucích souborech dat lze dokonce zvážit spouštění přírůstkových úloh Map-Reduce, aby se předešlo pokaždé zpracování starších dat.

Historicky byly metody map a redukce prováděny v kontextu s jedním vláknem. Toto omezení však bylo ve verzi 2.4 odstraněno.

Proč spouštět úlohy Map-Reduce na sekundárním?

Stejně jako ostatní agregační úlohy je i Map-Reduce „dávková“ úloha náročná na zdroje, takže je vhodná pro spuštění na replikách pouze pro čtení. Námitky při tom jsou:

1) Mělo by být v pořádku používat mírně zastaralá data. Nebo můžete vyladit starost o zápis, abyste zajistili, že repliky budou vždy synchronizovány s primárním. Tato druhá možnost předpokládá, že zásah do výkonu zápisu je přijatelný.

2) Výstup úlohy Map-Reduce by neměl být zapsán do jiné kolekce v databázi, ale měl by být vrácen do aplikace (tj. žádné zápisy do databáze).

Podívejme se na to, jak to udělat pomocí příkladů, jak z prostředí mongo, tak z ovladače Java.

Zmenšení mapy na sadách replik

Soubor dat

Pro ilustraci použijeme poměrně jednoduchý soubor dat:denní výpis záznamů transakcí od prodejce. Ukázkový záznam vypadá takto:

RS-replica-0:PRIMARY> use test
switched to db test
RS-replica-0:PRIMARY> show tables
txns
RS-replica-0:PRIMARY> db.txns.findOne()
{
    "_id" : ObjectId("584a3b71cdc1cb061957289b"),
    "custid" : "cust_66",
    "txnval" : 100,
    "items" : [{"sku": sku1", "qty": 1, "pr": 100}, ...],
...
}

V našich příkladech spočítáme celkovou útratu daného zákazníka v daný den. Vzhledem k našemu schématu tedy budou metody map a redukce vypadat takto:

var mapFunction = function() { emit(this.custid, this.txnval); } // Emit the custid and txn value from each record
var reduceFunction = function(key, values) { return Array.sum(values); } // Sum all the txn values for a given custid

Po vytvoření našeho schématu se podívejme na Map-Reduce v akci.

MongoDB Shell

Aby bylo zajištěno, že úloha Map-Reduce bude provedena na sekundárním, měla by být preference čtení nastavena na sekundární . Jak jsme řekli výše, aby Map-Reduce běželo na sekundárním serveru, výstup výsledku musí být inline (Ve skutečnosti je to jediná povolená hodnota na sekundárních serverech). Podívejme se, jak to funguje.

$ mongo -u admin -p pwd --authenticationDatabase admin --host RS-replica-0/server-1.servers.example.com:27017,server-2.servers.example.com:27017
MongoDB shell version: 3.2.10
connecting to: RS-replica-0/server-1.servers.example.com:27017,server-2.servers.example.com:27017/test
2016-12-09T08:15:19.347+0000 I NETWORK  [thread1] Starting new replica set monitor for server-1.servers.example.com:27017,server-2.servers.example.com:27017
2016-12-09T08:15:19.349+0000 I NETWORK  [ReplicaSetMonitorWatcher] starting
RS-replica-0:PRIMARY> db.setSlaveOk()
RS-replica-0:PRIMARY> db.getMongo().setReadPref('secondary')
RS-replica-0:PRIMARY> db.getMongo().getReadPrefMode()
secondary
RS-replica-0:PRIMARY> var mapFunc = function() { emit(this.custid, this.txnval); }
RS-replica-0:PRIMARY> var reduceFunc = function(key, values) { return Array.sum(values); }
RS-replica-0:PRIMARY> db.txns.mapReduce(mapFunc, reduceFunc, {out: { inline: 1 }})
{
    "results" : [
        {
            "_id" : "cust_0",
            "value" : 72734
        },
        {
            "_id" : "cust_1",
            "value" : 67737
        },
...
    ]
    "timeMillis" : 215,
    "counts" : {
        "input" : 10000,
        "emit" : 10000,
        "reduce" : 909,
        "output" : 101
    },
    "ok" : 1

}

Nahlédnutí do protokolů na sekundárním zařízení potvrzuje, že úloha skutečně na sekundárním zařízení běžela.

...
2016-12-09T08:17:24.842+0000 D COMMAND  [conn344] mr ns: test.txns
2016-12-09T08:17:24.843+0000 I COMMAND  [conn344] command test.$cmd command: listCollections { listCollections: 1, filter: { name: "txns" }, cursor: {} } keyUpdates:0 writeConflicts:0 numYields:0 reslen:150 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 1, R: 1 } }, Collection: { acquireCount: { r: 1 } } } protocol:op_query 0ms
2016-12-09T08:17:24.865+0000 I COMMAND  [conn344] query test.system.js planSummary: EOF ntoreturn:0 ntoskip:0 keysExamined:0 docsExamined:0 cursorExhausted:1 keyUpdates:0 writeConflicts:0 numYields:0 nreturned:0 reslen:20 locks:{ Global: { acquireCount: { r: 6 } }, Database: { acquireCount: { r: 2, R: 1 } }, Collection: { acquireCount: { r: 2 } } } 0ms
2016-12-09T08:17:25.063+0000 I COMMAND  [conn344] command test.txns command: mapReduce { mapreduce: "txns", map: function () { emit(this.custid, this.txnval); }, reduce: function (key, values) { return Array.sum(values); }, out: { inline: 1.0 } } planSummary: COUNT keyUpdates:0 writeConflicts:0 numYields:78 reslen:4233 locks:{ Global: { acquireCount: { r: 366 } }, Database: { acquireCount: { r: 3, R: 180 } }, Collection: { acquireCount: { r: 3 } } } protocol:op_command 220ms
...

Java

Nyní zkusme spustit úlohu Map-Reduce na načtených replikách z aplikace Java. Na ovladači MongoDB Java stačí nastavení předvolby čtení. Výstup je ve výchozím nastavení inline, takže není třeba předávat žádné další parametry. Zde je příklad použití ovladače verze 3.2.2:

public class MapReduceExample {

    private static final String MONGO_END_POINT = "mongodb://admin:[email protected]:27017,server-2.servers.example.com:27017/admin?replicaSet=RS-replica-0";
    private static final String COL_NAME = "txns";
    private static final String DEF_DB = "test";

    public MapReduceExample() { }

    public static void main(String[] args) {
        MapReduceExample writer = new MapReduceExample();
        writer.mapReduce();
    }

    public static final String mapfunction = "function() { emit(this.custid, this.txnval); }";
    public static final String reducefunction = "function(key, values) { return Array.sum(values); }";

    private void mapReduce() {
        printer("Initializing...");
        Builder options = MongoClientOptions.builder().readPreference(ReadPreference.secondary());
        MongoClientURI uri = new MongoClientURI(MONGO_END_POINT, options);
        MongoClient client = new MongoClient(uri);
        MongoDatabase database = client.getDatabase(DEF_DB);
        MongoCollection collection = database.getCollection(COL_NAME);
        MapReduceIterable iterable = collection.mapReduce(mapfunction, reducefunction); // inline by default
        MongoCursor cursor = iterable.iterator();
        while (cursor.hasNext()) {
           Document result = cursor.next();
           printer("Customer: " + result.getString("_id") + ", Total Txn value: " + result.getDouble("value"));
        }
        printer("Done...");
    }
...
}

Jak je zřejmé z protokolů, úloha běžela na sekundárním:

...
2016-12-09T08:32:31.419+0000 D COMMAND  [conn371] mr ns: test.txns
2016-12-09T08:32:31.420+0000 I COMMAND  [conn371] command test.$cmd command: listCollections { listCollections: 1, filter: { name: "txns" }, cursor: {} } keyUpdates:0 writeConflicts:0 numYields:0 reslen:150 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 1, R: 1 } }, Collection: { acquireCount: { r: 1 } } } protocol:op_query 0ms
2016-12-09T08:32:31.444+0000 I COMMAND  [conn371] query test.system.js planSummary: EOF ntoreturn:0 ntoskip:0 keysExamined:0 docsExamined:0 cursorExhausted:1 keyUpdates:0 writeConflicts:0 numYields:0 nreturned:0 reslen:20 locks:{ Global: { acquireCount: { r: 6 } }, Database: { acquireCount: { r: 2, R: 1 } }, Collection: { acquireCount: { r: 2 } } } 0ms
2016-12-09T08:32:31.890+0000 I COMMAND  [conn371] command test.txns command: mapReduce { mapreduce: "txns", map: function() { emit(this.custid, this.txnval); }, reduce: function(key, values) { return Array.sum(values); }, out: { inline: 1 }, query: null, sort: null, finalize: null, scope: null, verbose: true } planSummary: COUNT keyUpdates:0 writeConflicts:0 numYields:156 reslen:4331 locks:{ Global: { acquireCount: { r: 722 } }, Database: { acquireCount: { r: 3, R: 358 } }, Collection: { acquireCount: { r: 3 } } } protocol:op_query 470ms
...

MongoDB Map-Reduce na sdílených clusterech

MongoDB podporuje Map-Reduce na sdílených clusterech, a to jak v případě, že je sdílená kolekce vstupem, tak i když je výstupem úlohy Map-Reduce. Momentálně však MongoDB nepodporuje spouštění úloh redukce map na sekundárních částech sdíleného clusteru. Tedy i když možnost out je nastaveno na inline Úlohy Map-Reduce poběží vždy na primárních položkách sdíleného clusteru. Tento problém je sledován prostřednictvím této chyby JIRA.

Syntaxe provádění úlohy Map-Reduce na sdíleném clusteru je stejná jako na sadě replik. Takže platí příklady uvedené ve výše uvedené části. Pokud je výše uvedený příklad Java spuštěn na sdíleném clusteru, objeví se na primárních položkách zprávy protokolu, které indikují, že tam byl příkaz spuštěn.

...
2016-11-24T08:46:30.828+0000 I COMMAND  [conn357] command test.$cmd command: mapreduce.shardedfinish { mapreduce.shardedfinish: { mapreduce: "txns", map: function() { emit(this.custid, this.txnval); }, reduce: function(key, values) { return Array.sum(values); }, out: { in
line: 1 }, query: null, sort: null, finalize: null, scope: null, verbose: true, $queryOptions: { $readPreference: { mode: "secondary" } } }, inputDB: "test", shardedOutputCollection: "tmp.mrs.txns_1479977190_0", shards: { Shard-0/primary.shard0.example.com:27017,secondary.shard0.example.com:27017: { result: "tmp.mrs.txns_1479977190_0", timeMillis: 123, timing: { mapTime: 51, emitLoop: 116, reduceTime: 9, mode: "mixed", total: 123 }, counts: { input: 9474, emit: 9474, reduce: 909, output: 101 }, ok: 1.0, $gleS
tats: { lastOpTime: Timestamp 1479977190000|103, electionId: ObjectId('7fffffff0000000000000001') } }, Shard-1/primary.shard1.example.com:27017,secondary.shard1.example.com:27017: { result: "tmp.mrs.txns_1479977190_0", timeMillis: 71, timing:
 { mapTime: 8, emitLoop: 63, reduceTime: 4, mode: "mixed", total: 71 }, counts: { input: 1526, emit: 1526, reduce: 197, output: 101 }, ok: 1.0, $gleStats: { lastOpTime: Timestamp 1479977190000|103, electionId: ObjectId('7fffffff0000000000000001') } } }, shardCounts: { Sha
rd-0/primary.shard0.example.com:27017,secondary.shard0.example.com:27017: { input: 9474, emit: 9474, reduce: 909, output: 101 }, Shard-1/primary.shard1.example.com:27017,secondary.shard1.example.com:27017: { inpu
t: 1526, emit: 1526, reduce: 197, output: 101 } }, counts: { emit: 11000, input: 11000, output: 202, reduce: 1106 } } keyUpdates:0 writeConflicts:0 numYields:0 reslen:4368 locks:{ Global: { acquireCount: { r: 2 } }, Database: { acquireCount: { r: 1 } }, Collection: { acqu
ireCount: { r: 1 } } } protocol:op_command 115ms
2016-11-24T08:46:30.830+0000 I COMMAND  [conn46] CMD: drop test.tmp.mrs.txns_1479977190_0
...

Navštivte prosím naši produktovou stránku MongoDB, kde se dozvíte o našem rozsáhlém seznamu funkcí.


  1. Filtrační prvky v Redis

  2. Je možné generovat dynamicky pojmenované kolekce MongoDB?

  3. Redis Publikovat/přihlásit se k odběru

  4. Jak zastavit mongo DB jedním příkazem