sql >> Databáze >  >> NoSQL >> MongoDB

Uložte velmi velký CSV do mongoDB pomocí mongoose

Vítejte ve streamování. To, co opravdu chcete, je „stream událostí“, který zpracuje váš vstup „po částech“ a samozřejmě ideálně pomocí společného oddělovače, jako je znak „nový řádek“, který právě používáte.

Pro skutečně efektivní věci můžete přidat použití MongoDB "Bulk API" vložky, aby vaše načítání bylo co nejrychlejší, aniž byste zabírali veškerou paměť počítače nebo cykly CPU.

Neobhajuji, protože existují různá dostupná řešení, ale zde je seznam, který využívá linku - balíček input-stream aby byla část "zakončování linky" jednoduchá.

Definice schémat pouze podle "příkladu":

var LineInputStream = require("line-input-stream"),
    fs = require("fs"),
    async = require("async"),
    mongoose = require("mongoose"),
    Schema = mongoose.Schema;

var entrySchema = new Schema({},{ strict: false })

var Entry = mongoose.model( "Schema", entrySchema );

var stream = LineInputStream(fs.createReadStream("data.txt",{ flags: "r" }));

stream.setDelimiter("\n");

mongoose.connection.on("open",function(err,conn) { 

    // lower level method, needs connection
    var bulk = Entry.collection.initializeOrderedBulkOp();
    var counter = 0;

    stream.on("error",function(err) {
        console.log(err); // or otherwise deal with it
    });

    stream.on("line",function(line) {

        async.series(
            [
                function(callback) {
                    var row = line.split(",");     // split the lines on delimiter
                    var obj = {};             
                    // other manipulation

                    bulk.insert(obj);  // Bulk is okay if you don't need schema
                                       // defaults. Or can just set them.

                    counter++;

                    if ( counter % 1000 == 0 ) {
                        stream.pause();
                        bulk.execute(function(err,result) {
                            if (err) callback(err);
                            // possibly do something with result
                            bulk = Entry.collection.initializeOrderedBulkOp();
                            stream.resume();
                            callback();
                        });
                    } else {
                        callback();
                    }
               }
           ],
           function (err) {
               // each iteration is done
           }
       );

    });

    stream.on("end",function() {

        if ( counter % 1000 != 0 )
            bulk.execute(function(err,result) {
                if (err) throw err;   // or something
                // maybe look at result
            });
    });

});

Takže obecně rozhraní "stream" tam "rozbije vstup", aby zpracovalo "jeden řádek po druhém". To vám zabrání načíst vše najednou.

Hlavní části jsou "Bulk Operations API" z MongoDB. To vám umožní "zařadit" mnoho operací najednou před skutečným odesláním na server. Takže v tomto případě s použitím "modulo" jsou zápisy odesílány pouze na 1000 zpracovaných záznamů. Do limitu 16 MB BSON můžete opravdu dělat cokoli, ale udržujte jej zvládnutelný.

Kromě hromadných operací je k dispozici další „omezovač“ z async knihovna. Není to ve skutečnosti vyžadováno, ale zajišťuje to, že v podstatě nikdy není zpracováno více než „modulo limit“ dokumentů. Obecná dávková „vložení“ nemají žádné náklady na vstup IO kromě paměti, ale volání „execute“ znamenají, že IO zpracovává. Takže raději čekáme, než abychom řadili další věci do fronty.

Určitě existují lepší řešení, která můžete najít pro data typu CSV „zpracování proudu“, což se zdá být. Ale obecně vám to poskytuje koncepty, jak to udělat paměťově efektivním způsobem, aniž byste museli jíst cykly CPU.



  1. Vnořená pole v Mongoose

  2. RedisClient LUA API

  3. Jak zakázat protokolování java ovladače mongoDB?

  4. Ukládání java 8 LocalDate do mongo DB