Nejsem odborník na mongodb, ale na základě příkladů, které jsem viděl, je to vzor, který bych zkusil.
Vynechal jsem události jiné než data, protože se zdá, že hlavním problémem je omezení této události.
var cursor = db.collection('mycollection').find({});
const cursorNext = new Rx.BehaviourSubject('next'); // signal first batch then wait
const nextBatch = () => {
if(cursor.hasNext()) {
cursorNext.next('next');
}
});
cursorNext
.switchMap(() => // wait for cursorNext to signal
Rx.Observable.fromPromise(cursor.next()) // get a single doc
.repeat() // get another
.takeWhile(() => cursor.hasNext() ) // stop taking if out of data
.take(batchSize) // until full batch
.toArray() // combine into a single emit
)
.map(docsBatch => {
// do something with the batch
// return docsBatch or modified doscBatch
})
... // other operators?
.subscribe(x => {
...
nextBatch();
});
Snažím se dát dohromady test tohoto Rx toku bez mongodb, mezitím vám to může dát nějaké nápady.