sql >> Databáze >  >> RDS >> PostgreSQL

Zastavte se ve skriptu Python pomocí SQLAlchemy a multiprocessingu

Věřím, že TypeError pochází z multiprocessing 's get .

Z vašeho skriptu jsem odstranil veškerý kód DB. Podívejte se na toto:

import multiprocessing
import sqlalchemy.exc

def do(kwargs):
    i = kwargs['i']
    print i
    raise sqlalchemy.exc.ProgrammingError("", {}, None)
    return i


pool = multiprocessing.Pool(processes=5)               # start 4 worker processes
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append) # evaluate "f(10)" asynchronously

# Use get or wait?
# r.get()
r.wait()

pool.close()
pool.join()
print results

Pomocí r.wait vrátí očekávaný výsledek, ale pomocí r.get vyvolá TypeError . Jak je popsáno v dokumentech pythonu , použijte r.wait po map_async .

Upravit :Musím upravit svou předchozí odpověď. Nyní věřím TypeError pochází z SQLAlchemy. Upravil jsem svůj skript, aby chybu reprodukoval.

Úprava 2 :Zdá se, že problém je v multiprocessing.pool nehraje dobře, pokud jakýkoli pracovník vyvolá výjimku, jejíž konstruktor vyžaduje parametr (viz také zde ).

Upravil jsem svůj skript, abych to zdůraznil.

import multiprocessing

class BadExc(Exception):
    def __init__(self, a):
        '''Non-optional param in the constructor.'''
        self.a = a

class GoodExc(Exception):
    def __init__(self, a=None):
        '''Optional param in the constructor.'''
        self.a = a

def do(kwargs):
    i = kwargs['i']
    print i
    raise BadExc('a')
    # raise GoodExc('a')
    return i

pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
    # set a timeout in order to be able to catch C-c
    r.get(1e100)
except KeyboardInterrupt:
    pass
print results

Ve vašem případě, vzhledem k tomu, že váš kód vyvolává výjimku SQLAlchemy, jediné řešení, které mě napadá, je zachytit všechny výjimky v do a znovu vyvolejte normální Exception namísto. Něco takového:

import multiprocessing

class BadExc(Exception):
    def __init__(self, a):
        '''Non-optional param in the constructor.'''
        self.a = a

def do(kwargs):
    try:
        i = kwargs['i']
        print i
        raise BadExc('a')
        return i
    except Exception as e:
        raise Exception(repr(e))

pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
    # set a timeout in order to be able to catch C-c
    r.get(1e100)
except KeyboardInterrupt:
    pass
print results

Úprava 3 :takže se zdá, že jde o chybu v Pythonu , ale správné výjimky v SQLAlchemy by to mohly obejít:proto jsem nastolil problém s SQLAlchemy .

Jako řešení problému považuji řešení na konci Úpravy 2 by to udělalo (zabalení zpětných volání do pokusu s výjimkou a opětovného navýšení).



  1. Čekat na několik připojení db před spuštěním expresního serveru?

  2. Spring + Hibernate:Využití mezipaměti plánu dotazů

  3. Zjistěte, jak dlouho trvá vytvoření připojení pomocí PHP mysqli_real_connect()

  4. SQL Natural Join