sql >> Databáze >  >> NoSQL >> MongoDB

Mongodb agregované řazení a omezení v rámci skupiny

Základní problém

Není to nejmoudřejší nápad zkoušet to udělat v agregačním rámci v současné době v dohledné blízké budoucnosti. Hlavní problém samozřejmě pochází z tohoto řádku v kódu, který již máte:

"items" : { "$push": "$$ROOT" }

A to přesně znamená, že v podstatě se musí stát, že všechny objekty v seskupovacím klíči musí být vloženy do pole, aby se dostaly k „horním N“ výsledkům v jakémkoli pozdějším kódu.

To se zjevně neškáluje, protože velikost samotného pole může velmi pravděpodobně překročit limit BSON 16 MB, a to bez ohledu na zbytek dat v seskupeném dokumentu. Hlavní háček je v tom, že není možné „omezit push“ jen na určitý počet položek. Právě s takovou věcí existuje dlouhodobý problém JIRA.

Už jen z tohoto důvodu je nejpraktičtějším přístupem spouštění jednotlivých dotazů na „horních N“ položek pro každý klíč seskupení. Nemusí to být ani .aggregate() výpisy (v závislosti na datech) a může to být opravdu cokoliv, co jednoduše omezuje požadované hodnoty "top N".

Nejlepší přístup

Zdá se, že vaše architektura je na node.js s mongoose , ale cokoli, co podporuje asynchronní IO a paralelní provádění dotazů, bude nejlepší volbou. Ideálně něco s vlastní knihovnou API, která podporuje kombinování výsledků těchto dotazů do jediné odpovědi.

Existuje například tento zjednodušený příkladový výpis využívající vaši architekturu a dostupné knihovny (zejména async ), který dělá tyto paralelní a kombinované výsledky přesně:

var async = require('async'),
    mongoose = require('mongoose'),
    Schema = mongoose.Schema;

mongoose.connect('mongodb://localhost/test');

var data = [
  { "merchant": 1, "rating": 1 },
  { "merchant": 1, "rating": 2 },
  { "merchant": 1, "rating": 3 },
  { "merchant": 2, "rating": 1 },
  { "merchant": 2, "rating": 2 },
  { "merchant": 2, "rating": 3 }
];

var testSchema = new Schema({
  merchant: Number,
  rating: Number
});

var Test = mongoose.model( 'Test', testSchema, 'test' );

async.series(
  [
    function(callback) {
      Test.remove({},callback);
    },
    function(callback) {
      async.each(data,function(item,callback) {
        Test.create(item,callback);
      },callback);
    },
    function(callback) {
      async.waterfall(
        [
          function(callback) {
            Test.distinct("merchant",callback);
          },
          function(merchants,callback) {
            async.concat(
              merchants,
              function(merchant,callback) {
                Test.find({ "merchant": merchant })
                  .sort({ "rating": -1 })
                  .limit(2)
                  .exec(callback);
              },
              function(err,results) {
                console.log(JSON.stringify(results,undefined,2));
                callback(err);
              }
            );
          }
        ],
        callback
      );
    }
  ],
  function(err) {
    if (err) throw err;
    mongoose.disconnect();
  }
);

Výsledkem jsou pouze 2 nejlepší výsledky pro každého obchodníka ve výstupu:

[
  {
    "_id": "560d153669fab495071553ce",
    "merchant": 1,
    "rating": 3,
    "__v": 0
  },
  {
    "_id": "560d153669fab495071553cd",
    "merchant": 1,
    "rating": 2,
    "__v": 0
  },
  {
    "_id": "560d153669fab495071553d1",
    "merchant": 2,
    "rating": 3,
    "__v": 0
  },
  {
    "_id": "560d153669fab495071553d0",
    "merchant": 2,
    "rating": 2,
    "__v": 0
  }
]

Je to opravdu nejefektivnější způsob, jak to zpracovat, i když to bude vyžadovat zdroje, protože stále jde o více dotazů. Pokud se však pokusíte uložit všechny dokumenty do pole a zpracovat je, nikde se neblíží zdrojům spotřebovaným v agregačním kanálu.

Souhrnný problém, nyní a blízká budoucnost

K tomuto řádku je možné, že počet dokumentů nezpůsobí porušení limitu BSON, že to lze provést. Metody s aktuálním vydáním MongoDB na to nejsou skvělé, ale nadcházející vydání (v době psaní to dělá větev 3.1.8 dev) alespoň zavádí $slice operátora do agregačního kanálu. Pokud jste tedy chytřejší ohledně operace agregace a používáte $sort nejprve lze již seřazené položky v poli snadno vybrat:

var async = require('async'),
    mongoose = require('mongoose'),
    Schema = mongoose.Schema;

mongoose.connect('mongodb://localhost/test');

var data = [
  { "merchant": 1, "rating": 1 },
  { "merchant": 1, "rating": 2 },
  { "merchant": 1, "rating": 3 },
  { "merchant": 2, "rating": 1 },
  { "merchant": 2, "rating": 2 },
  { "merchant": 2, "rating": 3 }
];

var testSchema = new Schema({
  merchant: Number,
  rating: Number
});

var Test = mongoose.model( 'Test', testSchema, 'test' );

async.series(
  [
    function(callback) {
      Test.remove({},callback);
    },
    function(callback) {
      async.each(data,function(item,callback) {
        Test.create(item,callback);
      },callback);
    },
    function(callback) {
      Test.aggregate(
        [
          { "$sort": { "merchant": 1, "rating": -1 } },
          { "$group": {
            "_id": "$merchant",
            "items": { "$push": "$$ROOT" }
          }},
          { "$project": {
            "items": { "$slice": [ "$items", 2 ] }
          }}
        ],
        function(err,results) {
          console.log(JSON.stringify(results,undefined,2));
          callback(err);
        }
      );
    }
  ],
  function(err) {
    if (err) throw err;
    mongoose.disconnect();
  }
);

Což vede ke stejnému základnímu výsledku, jako jsou první 2 položky „vyříznuty“ z pole, jakmile byly setříděny jako první.

V aktuálních verzích je to také ve skutečnosti "možné", ale se stejnými základními omezeními v tom, že to stále zahrnuje vložení veškerého obsahu do pole poté, co obsah nejprve seřadíte. Chce to jen "iterativní" přístup. Můžete to kódovat, abyste vytvořili agregační kanál pro větší položky, ale pouhé zobrazení „dvě“ by mělo ukázat, že to není opravdu skvělý nápad:

var async = require('async'),
    mongoose = require('mongoose'),
    Schema = mongoose.Schema;

mongoose.connect('mongodb://localhost/test');

var data = [
  { "merchant": 1, "rating": 1 },
  { "merchant": 1, "rating": 2 },
  { "merchant": 1, "rating": 3 },
  { "merchant": 2, "rating": 1 },
  { "merchant": 2, "rating": 2 },
  { "merchant": 2, "rating": 3 }
];

var testSchema = new Schema({
  merchant: Number,
  rating: Number
});

var Test = mongoose.model( 'Test', testSchema, 'test' );

async.series(
  [
    function(callback) {
      Test.remove({},callback);
    },
    function(callback) {
      async.each(data,function(item,callback) {
        Test.create(item,callback);
      },callback);
    },
    function(callback) {
      Test.aggregate(
        [
          { "$sort": { "merchant": 1, "rating": -1 } },
          { "$group": {
            "_id": "$merchant",
            "items": { "$push": "$$ROOT" }
          }},
          { "$unwind": "$items" },
          { "$group": {
            "_id": "$_id",
            "first": { "$first": "$items" },
            "items": { "$push": "$items" }
          }},
          { "$unwind": "$items" },
          { "$redact": {
            "$cond": [
              { "$eq": [ "$items", "$first" ] },
              "$$PRUNE",
              "$$KEEP"
            ]
          }},
          { "$group": {
            "_id": "$_id",
            "first": { "$first": "$first" },
            "second": { "$first": "$items" }
          }},
          { "$project": {
            "items": {
              "$map": {
                "input": ["A","B"],
                "as": "el",
                "in": {
                  "$cond": [
                    { "$eq": [ "$$el", "A" ] },
                    "$first",
                    "$second"
                  ]
                }
              }
            }
          }}
        ],
        function(err,results) {
          console.log(JSON.stringify(results,undefined,2));
          callback(err);
        }
      );
    }
  ],
  function(err) {
    if (err) throw err;
    mongoose.disconnect();
  }
);

A znovu, zatímco v dřívějších verzích je to "možné" (toto používá funkce představené ve verzi 2.6 ke zkrácení, protože jste již označili $$ROOT ), základními kroky je uložení pole a následné odstranění každé položky „ze zásobníku“ pomocí $first a porovnat to (a potenciálně i další) s položkami v poli, abyste je odstranili a pak dostali „další první“ položku z tohoto zásobníku, dokud nebude vaše „horní N“ nakonec hotové.

Závěr

Dokud nepřijde den, kdy existuje taková operace, která umožňuje položky v $push být agregační akumulátor omezen na určitý počet, pak to pro agregaci opravdu není praktická operace.

Můžete to udělat, pokud jsou data, která máte v těchto výsledcích, dostatečně malá, a může to být dokonce efektivnější než zpracování na straně klienta, pokud mají databázové servery dostatečné specifikace, aby poskytovaly skutečnou výhodu. Ale je pravděpodobné, že ani jedno nebude ve většině reálných aplikací rozumného použití.

Nejlepší je použít nejprve ukázanou možnost „paralelního dotazu“. Vždy se bude dobře škálovat a není třeba „kódovat“ takovou logiku, aby konkrétní seskupení nemuselo vrátit alespoň celkový počet „top N“ požadovaných položek, a vymyslet, jak je zachovat (mnohem delší příklad toho vynechaného ), protože jednoduše provede každý dotaz a zkombinuje výsledky.

Použijte paralelní dotazy. Bude to lepší než kódovaný přístup, který máte, a výrazně překoná agregační přístup. Dokud nebude existovat alespoň lepší možnost.



  1. Jak používáte Mongoose bez definování schématu?

  2. Nováček aplikací v reálném čase - Node.JS + Redis nebo RabbitMQ -> klient/server jak?

  3. Získejte vygenerovaný skript v ovladači MongoDB C#

  4. MongoDB Java Inserting Throws org.bson.codecs.configuration.CodecConfigurationException:Nelze najít kodek pro třídu io.github.ilkgunel.mongodb.Pojo