sql >> Databáze >  >> NoSQL >> Redis

Flask by example – Implementace Redis Task Queue

Tato část výukového programu podrobně popisuje, jak implementovat frontu úloh Redis pro zpracování textu.

Aktualizace:

  • 02/12/2020:Upgradováno na Python verze 3.8.1 a také na nejnovější verze Redis, Python Redis a RQ. Podrobnosti viz níže. Zmiňte chybu v nejnovější verzi RQ a poskytněte řešení. Vyřešena chyba http před https.
  • 03/22/2016:Upgradováno na Python verze 3.5.1 a také na nejnovější verze Redis, Python Redis a RQ. Podrobnosti naleznete níže.
  • 22. 2. 2015:Přidána podpora Pythonu 3.

Box zdarma: Kliknutím sem získáte přístup k bezplatnému videonávodu Flask + Python, který vám krok za krokem ukáže, jak vytvořit webovou aplikaci Flask.

Pamatujte:Zde je to, co vytváříme – aplikaci Flask, která vypočítává dvojice slov a frekvence na základě textu z dané adresy URL.

  1. Část první:Nastavte místní vývojové prostředí a poté nasaďte pracovní i produkční prostředí na Heroku.
  2. Část druhá:Nastavení databáze PostgreSQL spolu s SQLAlchemy a Alembic pro zpracování migrací.
  3. Část třetí:Přidejte back-endovou logiku pro seškrabování a následné zpracování počtu slov z webové stránky pomocí knihoven požadavků, BeautifulSoup a Natural Language Toolkit (NLTK).
  4. Čtvrtá část:Implementace fronty úloh Redis pro zpracování textu. (aktuální )
  5. Část pátá:Nastavte Angular na front-endu, aby se průběžně dotazoval na back-endu, abyste zjistili, zda je zpracování dokončeno.
  6. Část šestá:Pusťte se na pracovní server na Heroku – nastavení Redis a podrobný popis, jak spustit dva procesy (webový a pracovní) na jednom Dyno.
  7. Část sedmá:Aktualizujte front-end, aby byl uživatelsky přívětivější.
  8. Část osmá:Vytvořte vlastní úhlovou směrnici pro zobrazení grafu rozdělení frekvence pomocí JavaScriptu a D3.

Potřebujete kód? Získejte to z úložiště.


Požadavky na instalaci

Použité nástroje:

  • Redis (5.0.7)
  • Python Redis (3.4.1)
  • RQ (1.2.2) – jednoduchá knihovna pro vytváření fronty úloh

Začněte stažením a instalací Redis buď z oficiální stránky, nebo přes Homebrew (brew install redis ). Po instalaci spusťte server Redis:

$ redis-server

Dále nainstalujte Python Redis a RQ v novém okně terminálu:

$ cd flask-by-example
$ python -m pip install redis==3.4.1 rq==1.2.2
$ python -m pip freeze > requirements.txt


Nastavit Worker

Začněme vytvořením pracovního procesu, který bude naslouchat úlohám ve frontě. Vytvořte nový soubor worker.py a přidejte tento kód:

import os

import redis
from rq import Worker, Queue, Connection

listen = ['default']

redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379')

conn = redis.from_url(redis_url)

if __name__ == '__main__':
    with Connection(conn):
        worker = Worker(list(map(Queue, listen)))
        worker.work()

Zde jsme čekali na frontu nazvanou default a navázali připojení k serveru Redis na localhost:6379 .

Spusťte to v jiném okně terminálu:

$ cd flask-by-example
$ python worker.py
17:01:29 RQ worker started, version 0.5.6
17:01:29
17:01:29 *** Listening on default...

Nyní musíme aktualizovat naši app.py k odeslání úloh do fronty…



Aktualizujte app.py

Přidejte následující importy do app.py :

from rq import Queue
from rq.job import Job
from worker import conn

Poté aktualizujte sekci konfigurace:

app = Flask(__name__)
app.config.from_object(os.environ['APP_SETTINGS'])
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
db = SQLAlchemy(app)

q = Queue(connection=conn)

from models import *

q = Queue(connection=conn) nastavit připojení Redis a inicializovat frontu na základě tohoto připojení.

Přesuňte funkci zpracování textu z naší cesty indexu do nové funkce nazvané count_and_save_words() . Tato funkce přijímá jeden argument, adresu URL, kterou jí předáme, když ji zavoláme z naší indexové trasy.

def count_and_save_words(url):

    errors = []

    try:
        r = requests.get(url)
    except:
        errors.append(
            "Unable to get URL. Please make sure it's valid and try again."
        )
        return {"error": errors}

    # text processing
    raw = BeautifulSoup(r.text).get_text()
    nltk.data.path.append('./nltk_data/')  # set the path
    tokens = nltk.word_tokenize(raw)
    text = nltk.Text(tokens)

    # remove punctuation, count raw words
    nonPunct = re.compile('.*[A-Za-z].*')
    raw_words = [w for w in text if nonPunct.match(w)]
    raw_word_count = Counter(raw_words)

    # stop words
    no_stop_words = [w for w in raw_words if w.lower() not in stops]
    no_stop_words_count = Counter(no_stop_words)

    # save the results
    try:
        result = Result(
            url=url,
            result_all=raw_word_count,
            result_no_stop_words=no_stop_words_count
        )
        db.session.add(result)
        db.session.commit()
        return result.id
    except:
        errors.append("Unable to add item to database.")
        return {"error": errors}


@app.route('/', methods=['GET', 'POST'])
def index():
    results = {}
    if request.method == "POST":
        # this import solves a rq bug which currently exists
        from app import count_and_save_words

        # get url that the person has entered
        url = request.form['url']
        if not url[:8].startswith(('https://', 'http://')):
            url = 'http://' + url
        job = q.enqueue_call(
            func=count_and_save_words, args=(url,), result_ttl=5000
        )
        print(job.get_id())

    return render_template('index.html', results=results)

Poznamenejte si následující kód:

job = q.enqueue_call(
    func=count_and_save_words, args=(url,), result_ttl=5000
)
print(job.get_id())

Poznámka: Potřebujeme importovat count_and_save_words funkce v naší funkci index protože balíček RQ má aktuálně chybu, kde nenajde funkce ve stejném modulu.

Zde jsme použili frontu, kterou jsme dříve inicializovali a nazvali enqueue_call() funkce. Tím byla do fronty přidána nová úloha a tato úloha spustila count_and_save_words() funkce s URL jako argumentem. result_ttl=5000 argument line říká RQ, jak dlouho se má udržet výsledek úlohy po dobu - 5 000 sekund, v tomto případě. Poté jsme odeslali ID úlohy do terminálu. Toto ID je potřeba, abyste viděli, zda je zpracování úlohy dokončeno.

Pojďme pro to nastavit novou trasu…



Získat výsledky

@app.route("/results/<job_key>", methods=['GET'])
def get_results(job_key):

    job = Job.fetch(job_key, connection=conn)

    if job.is_finished:
        return str(job.result), 200
    else:
        return "Nay!", 202

Pojďme to otestovat.

Spusťte server, přejděte na http://localhost:5000/, použijte adresu URL https://realpython.com a z terminálu získejte ID úlohy. Poté toto ID použijte v koncovém bodu „/results/“ – tj. http://localhost:5000/results/ef600206-3503-4b87-a436-ddd9438f2197.

Pokud před kontrolou stavu uplyne méně než 5 000 sekund, měli byste vidět identifikační číslo, které se vygeneruje, když přidáme výsledky do databáze:

# save the results
try:
    from models import Result
    result = Result(
        url=url,
        result_all=raw_word_count,
        result_no_stop_words=no_stop_words_count
    )
    db.session.add(result)
    db.session.commit()
    return result.id

Nyní mírně upravíme trasu, abychom vrátili skutečné výsledky z databáze v JSON:

@app.route("/results/<job_key>", methods=['GET'])
def get_results(job_key):

    job = Job.fetch(job_key, connection=conn)

    if job.is_finished:
        result = Result.query.filter_by(id=job.result).first()
        results = sorted(
            result.result_no_stop_words.items(),
            key=operator.itemgetter(1),
            reverse=True
        )[:10]
        return jsonify(results)
    else:
        return "Nay!", 202

Nezapomeňte přidat import:

from flask import jsonify

Vyzkoušejte to znovu. Pokud vše proběhlo v pořádku, měli byste ve svém prohlížeči vidět něco podobného:

[
  [
    "Python", 
    315
  ], 
  [
    "intermediate", 
    167
  ], 
  [
    "python", 
    161
  ], 
  [
    "basics", 
    118
  ], 
  [
    "web-dev", 
    108
  ], 
  [
    "data-science", 
    51
  ], 
  [
    "best-practices", 
    49
  ], 
  [
    "advanced", 
    45
  ], 
  [
    "django", 
    43
  ], 
  [
    "flask", 
    41
  ]
]


Co bude dál?

Box zdarma: Kliknutím sem získáte přístup k bezplatnému videonávodu Flask + Python, který vám krok za krokem ukáže, jak vytvořit webovou aplikaci Flask.

V části 5 spojíme klienta a server tak, že do mixu přidáme Angular a vytvoříme poler, který každých pět sekund odešle požadavek na /results/<job_key> koncový bod žádá o aktualizace. Jakmile budou data k dispozici, přidáme je do DOM.

Na zdraví!

Toto je spolupráce mezi Cam Linkem, spoluzakladatelem Startup Edmonton, a lidmi z Real Python



  1. Jak používat řazené seznamy Redis

  2. Mongodb:Nepodařilo se připojit k serveru při prvním připojení

  3. Seřazená sada pevné velikosti v Redis?

  4. Jak implementovat server push v rámci Flask?