sql >> Databáze >  >> NoSQL >> MongoDB

Import CSV pomocí schématu Mongoose

Můžete to udělat pomocí fast-csv získáním headers z definice schématu, která vrátí analyzované řádky jako "objekty". Ve skutečnosti máte nějaké neshody, takže jsem je označil opravami:

const fs = require('mz/fs');
const csv = require('fast-csv');

const { Schema } = mongoose = require('mongoose');

const uri = 'mongodb://localhost/test';

mongoose.Promise = global.Promise;
mongoose.set('debug', true);

const rankSchema = new Schema({
  serverid: Number,
  resetid: Number,
  rank: Number,
  name: String,
  land: String,         // <-- You have this as Number but it's a string
  networth: Number,
  tag: String,
  stuff: String,        // the empty field in the csv
  gov: String,
  gdi: Number,
  protection: Number,
  vacation: Number,
  alive: Number,
  deleted: Number
});

const Rank = mongoose.model('Rank', rankSchema);

const log = data => console.log(JSON.stringify(data, undefined, 2));

(async function() {

  try {
    const conn = await mongoose.connect(uri);

    await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));

    let headers = Object.keys(Rank.schema.paths)
      .filter(k => ['_id','__v'].indexOf(k) === -1);

    console.log(headers);

    await new Promise((resolve,reject) => {

      let buffer = [],
          counter = 0;

      let stream = fs.createReadStream('input.csv')
        .pipe(csv({ headers }))
        .on("error", reject)
        .on("data", async doc => {
          stream.pause();
          buffer.push(doc);
          counter++;
          log(doc);
          try {
            if ( counter > 10000 ) {
              await Rank.insertMany(buffer);
              buffer = [];
              counter = 0;
            }
          } catch(e) {
            stream.destroy(e);
          }

          stream.resume();

        })
        .on("end", async () => {
          try {
            if ( counter > 0 ) {
              await Rank.insertMany(buffer);
              buffer = [];
              counter = 0;
              resolve();
            }
          } catch(e) {
            stream.destroy(e);
          }
        });

    });


  } catch(e) {
    console.error(e)
  } finally {
    process.exit()
  }


})()

Pokud schéma skutečně odpovídá poskytnutému CSV, je to v pořádku. Toto jsou opravy, které vidím, ale pokud potřebujete skutečné názvy polí zarovnat jinak, musíte je upravit. Ale v podstatě tam bylo Number na pozici, kde je String a v podstatě další pole, o kterém předpokládám, že je prázdné v CSV.

Obecnou věcí je získání pole názvů polí ze schématu a předání je do možností při vytváření instance analyzátoru csv:

let headers = Object.keys(Rank.schema.paths)
  .filter(k => ['_id','__v'].indexOf(k) === -1);

let stream = fs.createReadStream('input.csv')
  .pipe(csv({ headers }))

Jakmile to skutečně uděláte, získáte zpět "Objekt" místo pole:

{
  "serverid": "9",
  "resetid": "1557",
  "rank": "358",
  "name": "286",
  "land": "Mutantville",
  "networth": "4368",
  "tag": "2358026",
  "stuff": "",
  "gov": "M",
  "gdi": "0",
  "protection": "0",
  "vacation": "0",
  "alive": "1",
  "deleted": "0"
}

Nedělejte si starosti s "typy", protože Mongoose přetypuje hodnoty podle schématu.

Zbytek se děje v obslužné rutině pro data událost. Pro maximální efektivitu používáme insertMany() zapisovat do databáze pouze jednou za 10 000 řádků. Jak to ve skutečnosti jde na server a procesy, závisí na verzi MongoDB, ale 10 000 by mělo být docela rozumných na základě průměrného počtu polí, která byste importovali pro jednu kolekci, pokud jde o „kompenzaci“ za využití paměti a zápis rozumný požadavek sítě. V případě potřeby číslo zmenšete.

Důležité je označit tato volání jako async funkce a await výsledek insertMany() před pokračováním. Také potřebujeme pause() stream a resume() u každé položky jinak riskujeme přepsání buffer dokumentů, které je třeba vložit před jejich skutečným odesláním. pause() a resume() je nutné vytvořit "protitlak" na potrubí, jinak položky jen "vycházejí" a střílejí data událost.

Kontrola 10 000 záznamů přirozeně vyžaduje, abychom to zkontrolovali jak při každé iteraci, tak při dokončení streamu, abychom vyprázdnili vyrovnávací paměť a odeslali všechny zbývající dokumenty na server.

To je skutečně to, co chcete udělat, protože určitě nechcete spustit asynchronní požadavek na server při „každé“ iteraci přes data událost nebo v podstatě bez čekání na dokončení každého požadavku. U „velmi malých souborů“ vám to projde tím, že to nebudete kontrolovat, ale u jakéhokoli reálného zatížení určitě překročíte zásobník hovorů kvůli „za letu“ asynchronním hovorům, která ještě nebyla dokončena.

Pro informaci – package.json použitý. mz je volitelný, protože se jedná pouze o modernizovaný Promise povolená knihovna standardních uzlových "vestavěných" knihoven, na které jsem prostě zvyklý. Kód je samozřejmě zcela zaměnitelný s fs modul.

{
  "description": "",
  "main": "index.js",
  "dependencies": {
    "fast-csv": "^2.4.1",
    "mongoose": "^5.1.1",
    "mz": "^2.7.0"
  },
  "keywords": [],
  "author": "",
  "license": "ISC"
}

Ve skutečnosti s Node v8.9.x a vyšší pak to můžeme ještě mnohem zjednodušit implementací AsyncIterator prostřednictvím stream-to-iterator modul. Stále je v Iterator<Promise<T>> režimu, ale mělo by to fungovat, dokud se Node v10.x nestane stabilním LTS:

const fs = require('mz/fs');
const csv = require('fast-csv');
const streamToIterator = require('stream-to-iterator');

const { Schema } = mongoose = require('mongoose');

const uri = 'mongodb://localhost/test';

mongoose.Promise = global.Promise;
mongoose.set('debug', true);

const rankSchema = new Schema({
  serverid: Number,
  resetid: Number,
  rank: Number,
  name: String,
  land: String,
  networth: Number,
  tag: String,
  stuff: String,        // the empty field
  gov: String,
  gdi: Number,
  protection: Number,
  vacation: Number,
  alive: Number,
  deleted: Number
});

const Rank = mongoose.model('Rank', rankSchema);

const log = data => console.log(JSON.stringify(data, undefined, 2));

(async function() {

  try {
    const conn = await mongoose.connect(uri);

    await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));

    let headers = Object.keys(Rank.schema.paths)
      .filter(k => ['_id','__v'].indexOf(k) === -1);

    //console.log(headers);

    let stream = fs.createReadStream('input.csv')
      .pipe(csv({ headers }));

    const iterator = await streamToIterator(stream).init();

    let buffer = [],
        counter = 0;

    for ( let docPromise of iterator ) {
      let doc = await docPromise;
      buffer.push(doc);
      counter++;

      if ( counter > 10000 ) {
        await Rank.insertMany(buffer);
        buffer = [];
        counter = 0;
      }
    }

    if ( counter > 0 ) {
      await Rank.insertMany(buffer);
      buffer = [];
      counter = 0;
    }

  } catch(e) {
    console.error(e)
  } finally {
    process.exit()
  }

})()

V zásadě je veškeré zpracování a pozastavení a obnovení streamu nahrazeno jednoduchým for smyčka:

const iterator = await streamToIterator(stream).init();

for ( let docPromise of iterator ) {
  let doc = await docPromise;
  // ... The things in the loop
}

Snadný! To se vyčistí v pozdější implementaci uzlu pomocí for..await..of až bude stabilnější. Ale výše uvedené běží v pořádku na od zadané verze a výše.



  1. Jak převést sadu replik MongoDB na samostatný server

  2. Top 5 výhod sdíleného hostingu MongoDB

  3. Jak zadat objednávku nebo řazení pomocí ovladače C# pro MongoDB?

  4. Sestavení indexu MongoDB – Zabránění uživatelům ve spouštění nových sestavení