Závažné upozornění Nikdy předtím jsem tuto knihovnu nepoužil a moje znalost některých konceptů na nízké úrovni trochu... chybí. Většinou si pročítám tutoriál. Jsem si docela jistý, že každý, kdo dělal asynchronní práci, si to přečte a zasměje se, ale pro ostatní to může být užitečný výchozí bod. Varujte, emptor!
Začněme něčím trochu jednodušším a demonstrujeme si, jak Stream
funguje. Můžeme převést iterátor Result
s do streamu:
extern crate futures;
use futures::Future;
use futures::stream::{self, Stream};
fn main() {
let payloads: Vec<Result<String, ()>> = vec![Ok("a".into()), Ok("b".into())];
let payloads = stream::iter(payloads.into_iter());
let foo = payloads
.and_then(|payload| futures::finished(println!("{}", payload)))
.for_each(|_| Ok(()));
foo.forget();
}
To nám ukazuje jeden způsob, jak stream konzumovat. Používáme and_then
udělat něco s každým nákladem (zde ho jen vytisknout) a poté for_each
převést Stream
zpět do Future
. Budoucnost pak můžeme spustit voláním podivně pojmenovaného forget
metoda.
Dále je třeba spojit knihovnu Redis do mixu a zpracovat pouze jednu zprávu. Od get_message()
metoda blokuje, musíme do mixu zavést nějaká vlákna. V tomto typu asynchronního systému není dobrý nápad provádět velké množství práce, protože vše ostatní bude zablokováno. Například:
Pokud není zařízeno jinak, mělo by být zajištěno, že implementace této funkce skončí velmi rychle .
V ideálním světě by byla redis crate postavena na vrcholu knihovny, jako je budoucnost, a toto vše by nativně odhalila.
extern crate redis;
extern crate futures;
use std::thread;
use futures::Future;
use futures::stream::{self, Stream};
fn main() {
let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");
let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");
let (tx, payloads) = stream::channel();
let redis_thread = thread::spawn(move || {
let msg = pubsub.get_message().expect("Unable to get message");
let payload: Result<String, _> = msg.get_payload();
tx.send(payload).forget();
});
let foo = payloads
.and_then(|payload| futures::finished(println!("{}", payload)))
.for_each(|_| Ok(()));
foo.forget();
redis_thread.join().expect("unable to join to thread");
}
Mé chápání je zde nejasnější. V samostatném vláknu zprávu zablokujeme a jakmile ji obdržíme, vložíme ji do kanálu. Nerozumím tomu, proč se musíme držet rukojeti vlákna. Očekával bych, že foo.forget
by se sám zablokoval a čekal, dokud nebude stream prázdný.
V připojení telnet k serveru Redis odešlete toto:
publish rust awesome
A uvidíte, že to funguje. Přidání tiskových příkazů ukazuje, že (pro mě) je foo.forget
příkaz je spuštěn před vytvořením vlákna.
Více zpráv je složitější. Sender
spotřebovává se, aby zabránil generující straně dostat se příliš daleko před konzumní stranu. Toho je dosaženo vrácením další budoucnosti z send
! Potřebujeme ho odtamtud přesunout zpět, abychom ho mohli znovu použít pro další opakování cyklu:
extern crate redis;
extern crate futures;
use std::thread;
use std::sync::mpsc;
use futures::Future;
use futures::stream::{self, Stream};
fn main() {
let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");
let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");
let (tx, payloads) = stream::channel();
let redis_thread = thread::spawn(move || {
let mut tx = tx;
while let Ok(msg) = pubsub.get_message() {
let payload: Result<String, _> = msg.get_payload();
let (next_tx_tx, next_tx_rx) = mpsc::channel();
tx.send(payload).and_then(move |new_tx| {
next_tx_tx.send(new_tx).expect("Unable to send successor channel tx");
futures::finished(())
}).forget();
tx = next_tx_rx.recv().expect("Unable to receive successor channel tx");
}
});
let foo = payloads
.and_then(|payload| futures::finished(println!("{}", payload)))
.for_each(|_| Ok(()));
foo.forget();
redis_thread.join().expect("unable to join to thread");
}
Jsem si jistý, že v průběhu času bude existovat více ekosystémů pro tento typ vzájemné spolupráce. Například bedna futures-cpupool by mohla pravděpodobně být rozšířen tak, aby podporoval podobný případ použití.