Tato část výukového programu podrobně popisuje, jak implementovat frontu úloh Redis pro zpracování textu.
Aktualizace:
- 02/12/2020:Upgradováno na Python verze 3.8.1 a také na nejnovější verze Redis, Python Redis a RQ. Podrobnosti viz níže. Zmiňte chybu v nejnovější verzi RQ a poskytněte řešení. Vyřešena chyba http před https.
- 03/22/2016:Upgradováno na Python verze 3.5.1 a také na nejnovější verze Redis, Python Redis a RQ. Podrobnosti naleznete níže.
- 22. 2. 2015:Přidána podpora Pythonu 3.
Box zdarma: Kliknutím sem získáte přístup k bezplatnému videonávodu Flask + Python, který vám krok za krokem ukáže, jak vytvořit webovou aplikaci Flask.
Pamatujte:Zde je to, co vytváříme – aplikaci Flask, která vypočítává dvojice slov a frekvence na základě textu z dané adresy URL.
- Část první:Nastavte místní vývojové prostředí a poté nasaďte pracovní i produkční prostředí na Heroku.
- Část druhá:Nastavení databáze PostgreSQL spolu s SQLAlchemy a Alembic pro zpracování migrací.
- Část třetí:Přidejte back-endovou logiku pro seškrabování a následné zpracování počtu slov z webové stránky pomocí knihoven požadavků, BeautifulSoup a Natural Language Toolkit (NLTK).
- Čtvrtá část:Implementace fronty úloh Redis pro zpracování textu. (aktuální )
- Část pátá:Nastavte Angular na front-endu, aby se průběžně dotazoval na back-endu, abyste zjistili, zda je zpracování dokončeno.
- Část šestá:Pusťte se na pracovní server na Heroku – nastavení Redis a podrobný popis, jak spustit dva procesy (webový a pracovní) na jednom Dyno.
- Část sedmá:Aktualizujte front-end, aby byl uživatelsky přívětivější.
- Část osmá:Vytvořte vlastní úhlovou směrnici pro zobrazení grafu rozdělení frekvence pomocí JavaScriptu a D3.
Potřebujete kód? Získejte to z úložiště.
Požadavky na instalaci
Použité nástroje:
- Redis (5.0.7)
- Python Redis (3.4.1)
- RQ (1.2.2) – jednoduchá knihovna pro vytváření fronty úloh
Začněte stažením a instalací Redis buď z oficiální stránky, nebo přes Homebrew (brew install redis
). Po instalaci spusťte server Redis:
$ redis-server
Dále nainstalujte Python Redis a RQ v novém okně terminálu:
$ cd flask-by-example
$ python -m pip install redis==3.4.1 rq==1.2.2
$ python -m pip freeze > requirements.txt
Nastavit Worker
Začněme vytvořením pracovního procesu, který bude naslouchat úlohám ve frontě. Vytvořte nový soubor worker.py a přidejte tento kód:
import os
import redis
from rq import Worker, Queue, Connection
listen = ['default']
redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379')
conn = redis.from_url(redis_url)
if __name__ == '__main__':
with Connection(conn):
worker = Worker(list(map(Queue, listen)))
worker.work()
Zde jsme čekali na frontu nazvanou default
a navázali připojení k serveru Redis na localhost:6379
.
Spusťte to v jiném okně terminálu:
$ cd flask-by-example
$ python worker.py
17:01:29 RQ worker started, version 0.5.6
17:01:29
17:01:29 *** Listening on default...
Nyní musíme aktualizovat naši app.py k odeslání úloh do fronty…
Aktualizujte app.py
Přidejte následující importy do app.py :
from rq import Queue
from rq.job import Job
from worker import conn
Poté aktualizujte sekci konfigurace:
app = Flask(__name__)
app.config.from_object(os.environ['APP_SETTINGS'])
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
db = SQLAlchemy(app)
q = Queue(connection=conn)
from models import *
q = Queue(connection=conn)
nastavit připojení Redis a inicializovat frontu na základě tohoto připojení.
Přesuňte funkci zpracování textu z naší cesty indexu do nové funkce nazvané count_and_save_words()
. Tato funkce přijímá jeden argument, adresu URL, kterou jí předáme, když ji zavoláme z naší indexové trasy.
def count_and_save_words(url):
errors = []
try:
r = requests.get(url)
except:
errors.append(
"Unable to get URL. Please make sure it's valid and try again."
)
return {"error": errors}
# text processing
raw = BeautifulSoup(r.text).get_text()
nltk.data.path.append('./nltk_data/') # set the path
tokens = nltk.word_tokenize(raw)
text = nltk.Text(tokens)
# remove punctuation, count raw words
nonPunct = re.compile('.*[A-Za-z].*')
raw_words = [w for w in text if nonPunct.match(w)]
raw_word_count = Counter(raw_words)
# stop words
no_stop_words = [w for w in raw_words if w.lower() not in stops]
no_stop_words_count = Counter(no_stop_words)
# save the results
try:
result = Result(
url=url,
result_all=raw_word_count,
result_no_stop_words=no_stop_words_count
)
db.session.add(result)
db.session.commit()
return result.id
except:
errors.append("Unable to add item to database.")
return {"error": errors}
@app.route('/', methods=['GET', 'POST'])
def index():
results = {}
if request.method == "POST":
# this import solves a rq bug which currently exists
from app import count_and_save_words
# get url that the person has entered
url = request.form['url']
if not url[:8].startswith(('https://', 'http://')):
url = 'http://' + url
job = q.enqueue_call(
func=count_and_save_words, args=(url,), result_ttl=5000
)
print(job.get_id())
return render_template('index.html', results=results)
Poznamenejte si následující kód:
job = q.enqueue_call(
func=count_and_save_words, args=(url,), result_ttl=5000
)
print(job.get_id())
Poznámka: Potřebujeme importovat
count_and_save_words
funkce v naší funkciindex
protože balíček RQ má aktuálně chybu, kde nenajde funkce ve stejném modulu.
Zde jsme použili frontu, kterou jsme dříve inicializovali a nazvali enqueue_call()
funkce. Tím byla do fronty přidána nová úloha a tato úloha spustila count_and_save_words()
funkce s URL jako argumentem. result_ttl=5000
argument line říká RQ, jak dlouho se má udržet výsledek úlohy po dobu - 5 000 sekund, v tomto případě. Poté jsme odeslali ID úlohy do terminálu. Toto ID je potřeba, abyste viděli, zda je zpracování úlohy dokončeno.
Pojďme pro to nastavit novou trasu…
Získat výsledky
@app.route("/results/<job_key>", methods=['GET'])
def get_results(job_key):
job = Job.fetch(job_key, connection=conn)
if job.is_finished:
return str(job.result), 200
else:
return "Nay!", 202
Pojďme to otestovat.
Spusťte server, přejděte na http://localhost:5000/, použijte adresu URL https://realpython.com a z terminálu získejte ID úlohy. Poté toto ID použijte v koncovém bodu „/results/“ – tj. http://localhost:5000/results/ef600206-3503-4b87-a436-ddd9438f2197.
Pokud před kontrolou stavu uplyne méně než 5 000 sekund, měli byste vidět identifikační číslo, které se vygeneruje, když přidáme výsledky do databáze:
# save the results
try:
from models import Result
result = Result(
url=url,
result_all=raw_word_count,
result_no_stop_words=no_stop_words_count
)
db.session.add(result)
db.session.commit()
return result.id
Nyní mírně upravíme trasu, abychom vrátili skutečné výsledky z databáze v JSON:
@app.route("/results/<job_key>", methods=['GET'])
def get_results(job_key):
job = Job.fetch(job_key, connection=conn)
if job.is_finished:
result = Result.query.filter_by(id=job.result).first()
results = sorted(
result.result_no_stop_words.items(),
key=operator.itemgetter(1),
reverse=True
)[:10]
return jsonify(results)
else:
return "Nay!", 202
Nezapomeňte přidat import:
from flask import jsonify
Vyzkoušejte to znovu. Pokud vše proběhlo v pořádku, měli byste ve svém prohlížeči vidět něco podobného:
[
[
"Python",
315
],
[
"intermediate",
167
],
[
"python",
161
],
[
"basics",
118
],
[
"web-dev",
108
],
[
"data-science",
51
],
[
"best-practices",
49
],
[
"advanced",
45
],
[
"django",
43
],
[
"flask",
41
]
]
Co bude dál?
Box zdarma: Kliknutím sem získáte přístup k bezplatnému videonávodu Flask + Python, který vám krok za krokem ukáže, jak vytvořit webovou aplikaci Flask.
V části 5 spojíme klienta a server tak, že do mixu přidáme Angular a vytvoříme poler, který každých pět sekund odešle požadavek na /results/<job_key>
koncový bod žádá o aktualizace. Jakmile budou data k dispozici, přidáme je do DOM.
Na zdraví!
Toto je spolupráce mezi Cam Linkem, spoluzakladatelem Startup Edmonton, a lidmi z Real Python