Můžete to udělat pomocí fast-csv získáním headers
z definice schématu, která vrátí analyzované řádky jako "objekty". Ve skutečnosti máte nějaké neshody, takže jsem je označil opravami:
const fs = require('mz/fs');
const csv = require('fast-csv');
const { Schema } = mongoose = require('mongoose');
const uri = 'mongodb://localhost/test';
mongoose.Promise = global.Promise;
mongoose.set('debug', true);
const rankSchema = new Schema({
serverid: Number,
resetid: Number,
rank: Number,
name: String,
land: String, // <-- You have this as Number but it's a string
networth: Number,
tag: String,
stuff: String, // the empty field in the csv
gov: String,
gdi: Number,
protection: Number,
vacation: Number,
alive: Number,
deleted: Number
});
const Rank = mongoose.model('Rank', rankSchema);
const log = data => console.log(JSON.stringify(data, undefined, 2));
(async function() {
try {
const conn = await mongoose.connect(uri);
await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));
let headers = Object.keys(Rank.schema.paths)
.filter(k => ['_id','__v'].indexOf(k) === -1);
console.log(headers);
await new Promise((resolve,reject) => {
let buffer = [],
counter = 0;
let stream = fs.createReadStream('input.csv')
.pipe(csv({ headers }))
.on("error", reject)
.on("data", async doc => {
stream.pause();
buffer.push(doc);
counter++;
log(doc);
try {
if ( counter > 10000 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
}
} catch(e) {
stream.destroy(e);
}
stream.resume();
})
.on("end", async () => {
try {
if ( counter > 0 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
resolve();
}
} catch(e) {
stream.destroy(e);
}
});
});
} catch(e) {
console.error(e)
} finally {
process.exit()
}
})()
Pokud schéma skutečně odpovídá poskytnutému CSV, je to v pořádku. Toto jsou opravy, které vidím, ale pokud potřebujete skutečné názvy polí zarovnat jinak, musíte je upravit. Ale v podstatě tam bylo Number
na pozici, kde je String
a v podstatě další pole, o kterém předpokládám, že je prázdné v CSV.
Obecnou věcí je získání pole názvů polí ze schématu a předání je do možností při vytváření instance analyzátoru csv:
let headers = Object.keys(Rank.schema.paths)
.filter(k => ['_id','__v'].indexOf(k) === -1);
let stream = fs.createReadStream('input.csv')
.pipe(csv({ headers }))
Jakmile to skutečně uděláte, získáte zpět "Objekt" místo pole:
{
"serverid": "9",
"resetid": "1557",
"rank": "358",
"name": "286",
"land": "Mutantville",
"networth": "4368",
"tag": "2358026",
"stuff": "",
"gov": "M",
"gdi": "0",
"protection": "0",
"vacation": "0",
"alive": "1",
"deleted": "0"
}
Nedělejte si starosti s "typy", protože Mongoose přetypuje hodnoty podle schématu.
Zbytek se děje v obslužné rutině pro data
událost. Pro maximální efektivitu používáme insertMany()
zapisovat do databáze pouze jednou za 10 000 řádků. Jak to ve skutečnosti jde na server a procesy, závisí na verzi MongoDB, ale 10 000 by mělo být docela rozumných na základě průměrného počtu polí, která byste importovali pro jednu kolekci, pokud jde o „kompenzaci“ za využití paměti a zápis rozumný požadavek sítě. V případě potřeby číslo zmenšete.
Důležité je označit tato volání jako async
funkce a await
výsledek insertMany()
před pokračováním. Také potřebujeme pause()
stream a resume()
u každé položky jinak riskujeme přepsání buffer
dokumentů, které je třeba vložit před jejich skutečným odesláním. pause()
a resume()
je nutné vytvořit "protitlak" na potrubí, jinak položky jen "vycházejí" a střílejí data
událost.
Kontrola 10 000 záznamů přirozeně vyžaduje, abychom to zkontrolovali jak při každé iteraci, tak při dokončení streamu, abychom vyprázdnili vyrovnávací paměť a odeslali všechny zbývající dokumenty na server.
To je skutečně to, co chcete udělat, protože určitě nechcete spustit asynchronní požadavek na server při „každé“ iteraci přes data
událost nebo v podstatě bez čekání na dokončení každého požadavku. U „velmi malých souborů“ vám to projde tím, že to nebudete kontrolovat, ale u jakéhokoli reálného zatížení určitě překročíte zásobník hovorů kvůli „za letu“ asynchronním hovorům, která ještě nebyla dokončena.
Pro informaci – package.json
použitý. mz
je volitelný, protože se jedná pouze o modernizovaný Promise
povolená knihovna standardních uzlových "vestavěných" knihoven, na které jsem prostě zvyklý. Kód je samozřejmě zcela zaměnitelný s fs
modul.
{
"description": "",
"main": "index.js",
"dependencies": {
"fast-csv": "^2.4.1",
"mongoose": "^5.1.1",
"mz": "^2.7.0"
},
"keywords": [],
"author": "",
"license": "ISC"
}
Ve skutečnosti s Node v8.9.x a vyšší pak to můžeme ještě mnohem zjednodušit implementací AsyncIterator
prostřednictvím stream-to-iterator
modul. Stále je v Iterator<Promise<T>>
režimu, ale mělo by to fungovat, dokud se Node v10.x nestane stabilním LTS:
const fs = require('mz/fs');
const csv = require('fast-csv');
const streamToIterator = require('stream-to-iterator');
const { Schema } = mongoose = require('mongoose');
const uri = 'mongodb://localhost/test';
mongoose.Promise = global.Promise;
mongoose.set('debug', true);
const rankSchema = new Schema({
serverid: Number,
resetid: Number,
rank: Number,
name: String,
land: String,
networth: Number,
tag: String,
stuff: String, // the empty field
gov: String,
gdi: Number,
protection: Number,
vacation: Number,
alive: Number,
deleted: Number
});
const Rank = mongoose.model('Rank', rankSchema);
const log = data => console.log(JSON.stringify(data, undefined, 2));
(async function() {
try {
const conn = await mongoose.connect(uri);
await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));
let headers = Object.keys(Rank.schema.paths)
.filter(k => ['_id','__v'].indexOf(k) === -1);
//console.log(headers);
let stream = fs.createReadStream('input.csv')
.pipe(csv({ headers }));
const iterator = await streamToIterator(stream).init();
let buffer = [],
counter = 0;
for ( let docPromise of iterator ) {
let doc = await docPromise;
buffer.push(doc);
counter++;
if ( counter > 10000 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
}
}
if ( counter > 0 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
}
} catch(e) {
console.error(e)
} finally {
process.exit()
}
})()
V zásadě je veškeré zpracování a pozastavení a obnovení streamu nahrazeno jednoduchým for
smyčka:
const iterator = await streamToIterator(stream).init();
for ( let docPromise of iterator ) {
let doc = await docPromise;
// ... The things in the loop
}
Snadný! To se vyčistí v pozdější implementaci uzlu pomocí for..await..of
až bude stabilnější. Ale výše uvedené běží v pořádku na od zadané verze a výše.