sql >> Databáze >  >> NoSQL >> MongoDB

Opakování výsledků pomocí externího volání API a findOneAndUpdate

Základní věc, kterou opravdu postrádáte, je, že metody rozhraní Mongoose API také používají "Sliby" , ale zdá se, že pouze kopírujete z dokumentace nebo starých příkladů pomocí zpětných volání. Řešením je převést na používání pouze Promises.

Práce s Promises

Model.find({},{ _id: 1, tweet: 1}).then(tweets => 
  Promise.all(
    tweets.map(({ _id, tweet }) => 
      api.petition(tweet).then(result =>   
       TweetModel.findOneAndUpdate({ _id }, { result }, { new: true })
         .then( updated => { console.log(updated); return updated })
      )
    )
  )
)
.then( updatedDocs => {
  // do something with array of updated documents
})
.catch(e => console.error(e))

Kromě obecné konverze ze zpětných volání je hlavní změnou použití Promise.all() vyřešit výstup z Array.map() zpracovává se na základě výsledků z .find() místo for smyčka. To je vlastně jeden z největších problémů vašeho pokusu, protože for nemůže ve skutečnosti ovládat, kdy se asynchronní funkce vyřeší. Dalším problémem je „směšování zpětných volání“, ale to je to, co zde obecně řešíme pouze pomocí Promises.

V rámci Array.map() vrátíme Promise z volání API, zřetězeného na findOneAndUpdate() což je vlastně aktualizace dokumentu. Používáme také new: true skutečně vrátit upravený dokument.

Promise.all() umožňuje "pole slibů" vyřešit a vrátit řadu výsledků. Ty vidíte jako updatedDocs . Další výhodou je, že vnitřní metody budou spouštět „paralelně“ a ne sériově. To obvykle znamená rychlejší rozlišení, i když to vyžaduje několik dalších zdrojů.

Všimněte si také, že používáme "projekci" { _id: 1, tweet: 1 } vrátit pouze tato dvě pole z Model.find() výsledek, protože ty jsou jediné použité ve zbývajících hovorech. Tím se ušetří vrácení celého dokumentu pro každý výsledek, když nepoužijete ostatní hodnoty.

Můžete jednoduše vrátit Promise z findOneAndUpdate() , ale právě přidávám console.log() takže můžete vidět, že výstup v tomto bodě spouští.

Běžné produkční použití by se bez něj mělo obejít:

Model.find({},{ _id: 1, tweet: 1}).then(tweets => 
  Promise.all(
    tweets.map(({ _id, tweet }) => 
      api.petition(tweet).then(result =>   
       TweetModel.findOneAndUpdate({ _id }, { result }, { new: true })
      )
    )
  )
)
.then( updatedDocs => {
  // do something with array of updated documents
})
.catch(e => console.error(e))

Další "vychytávkou" by mohlo být použití "bluebird" implementace Promise.map() , který oba kombinuje společný Array.map() na Promise (s) implementace se schopností řídit „souběh“ probíhajících paralelních volání:

const Promise = require("bluebird");

Model.find({},{ _id: 1, tweet: 1}).then(tweets => 
  Promise.map(tweets, ({ _id, tweet }) => 
    api.petition(tweet).then(result =>   
      TweetModel.findOneAndUpdate({ _id }, { result }, { new: true })
    ),
    { concurrency: 5 }
  )
)
.then( updatedDocs => {
  // do something with array of updated documents
})
.catch(e => console.error(e))

Alternativa k "paralelnímu" by se spouštěla ​​v pořadí. To může být zváženo, pokud příliš mnoho výsledků způsobuje příliš mnoho volání API a volání zpět do databáze:

Model.find({},{ _id: 1, tweet: 1}).then(tweets => {
  let updatedDocs = [];
  return tweets.reduce((o,{ _id, tweet }) => 
    o.then(() => api.petition(tweet))
      .then(result => TweetModel.findByIdAndUpdate(_id, { result }, { new: true })
      .then(updated => updatedDocs.push(updated))
    ,Promise.resolve()
  ).then(() => updatedDocs);
})
.then( updatedDocs => {
  // do something with array of updated documents
})
.catch(e => console.error(e))

Tam můžeme použít Array.reduce() „spojit“ sliby dohromady, což jim umožní řešit postupně. Všimněte si, že pole výsledků je zachováno v rozsahu a nahrazeno konečným .then() připojené na konec spojeného řetězce, protože takovou techniku ​​potřebujete ke „sbírání“ výsledků ze slibů, které se řeší v různých bodech tohoto „řetězce“.

Asynchronní/Čeká

V moderních prostředích od NodeJS V8.x, což je ve skutečnosti aktuální vydání LTS a již nějakou dobu existuje, máte ve skutečnosti podporu pro async/await . To vám umožní přirozeněji psát tok

try {
  let tweets = await Model.find({},{ _id: 1, tweet: 1});

  let updatedDocs = await Promise.all(
    tweets.map(({ _id, tweet }) => 
      api.petition(tweet).then(result =>   
        TweetModel.findByIdAndUpdate(_id, { result }, { new: true })
      )
    )
  );

  // Do something with results
} catch(e) {
  console.error(e);
}

Nebo dokonce možná zpracujte postupně, pokud jsou zdroje problémem:

try {
  let cursor = Model.collection.find().project({ _id: 1, tweet: 1 });

  while ( await cursor.hasNext() ) {
    let { _id, tweet } = await cursor.next();
    let result = await api.petition(tweet);
    let updated = await TweetModel.findByIdAndUpdate(_id, { result },{ new: true });
    // do something with updated document
  }

} catch(e) {
  console.error(e)
}

Upozorňujeme také, že findByIdAndUpdate() lze také použít jako shodu s _id je již implikováno, takže jako první argument nepotřebujete celý dokument dotazu.

BulkWrite

Jako poslední poznámku, pokud vlastně nepotřebujete aktualizované dokumenty jako odpověď vůbec, pak bulkWrite() je lepší volba a umožňuje zápisům obecně zpracovat na serveru v jediném požadavku:

Model.find({},{ _id: 1, tweet: 1}).then(tweets => 
  Promise.all(
    tweets.map(({ _id, tweet }) => api.petition(tweet).then(result => ({ _id, result }))
  )
).then( results =>
  Tweetmodel.bulkWrite(
    results.map(({ _id, result }) => 
      ({ updateOne: { filter: { _id }, update: { $set: { result } } } })
    )
  )
)
.catch(e => console.error(e))

Nebo přes async/await syntaxe:

try {
  let tweets = await Model.find({},{ _id: 1, tweet: 1});

  let writeResult = await Tweetmodel.bulkWrite(
    (await Promise.all(
      tweets.map(({ _id, tweet }) => api.petition(tweet).then(result => ({ _id, result }))
    )).map(({ _id, result }) =>
      ({ updateOne: { filter: { _id }, update: { $set: { result } } } })
    )
  );
} catch(e) {
  console.error(e);
}

V podstatě všechny výše uvedené kombinace se dají upravit jako bulkWrite() metoda bere "pole" instrukcí, takže toto pole můžete sestavit ze zpracovaných volání API ze všech výše uvedených metod.




  1. mongoDB, nelze číst z konfiguračního souboru -- config v jiné složce / Odinstalovat?

  2. Udržování otevřeného připojení Redis pomocí BookSleeve

  3. Nejlepší middleware pro ukládání relací pro Express + MongoDB

  4. Složené indexy pro dotaz OR + řazení v mongodb