Věřím, že TypeError
pochází z multiprocessing
's get
.
Z vašeho skriptu jsem odstranil veškerý kód DB. Podívejte se na toto:
import multiprocessing
import sqlalchemy.exc
def do(kwargs):
i = kwargs['i']
print i
raise sqlalchemy.exc.ProgrammingError("", {}, None)
return i
pool = multiprocessing.Pool(processes=5) # start 4 worker processes
results = []
arglist = []
for i in range(10):
arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append) # evaluate "f(10)" asynchronously
# Use get or wait?
# r.get()
r.wait()
pool.close()
pool.join()
print results
Pomocí r.wait
vrátí očekávaný výsledek, ale pomocí r.get
vyvolá TypeError
. Jak je popsáno v dokumentech pythonu
, použijte r.wait
po map_async
.
Upravit :Musím upravit svou předchozí odpověď. Nyní věřím TypeError
pochází z SQLAlchemy. Upravil jsem svůj skript, aby chybu reprodukoval.
Úprava 2 :Zdá se, že problém je v multiprocessing.pool
nehraje dobře, pokud jakýkoli pracovník vyvolá výjimku, jejíž konstruktor vyžaduje parametr (viz také zde
).
Upravil jsem svůj skript, abych to zdůraznil.
import multiprocessing
class BadExc(Exception):
def __init__(self, a):
'''Non-optional param in the constructor.'''
self.a = a
class GoodExc(Exception):
def __init__(self, a=None):
'''Optional param in the constructor.'''
self.a = a
def do(kwargs):
i = kwargs['i']
print i
raise BadExc('a')
# raise GoodExc('a')
return i
pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
# set a timeout in order to be able to catch C-c
r.get(1e100)
except KeyboardInterrupt:
pass
print results
Ve vašem případě, vzhledem k tomu, že váš kód vyvolává výjimku SQLAlchemy, jediné řešení, které mě napadá, je zachytit všechny výjimky v do
a znovu vyvolejte normální Exception
namísto. Něco takového:
import multiprocessing
class BadExc(Exception):
def __init__(self, a):
'''Non-optional param in the constructor.'''
self.a = a
def do(kwargs):
try:
i = kwargs['i']
print i
raise BadExc('a')
return i
except Exception as e:
raise Exception(repr(e))
pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
# set a timeout in order to be able to catch C-c
r.get(1e100)
except KeyboardInterrupt:
pass
print results
Úprava 3 :takže se zdá, že jde o chybu v Pythonu , ale správné výjimky v SQLAlchemy by to mohly obejít:proto jsem nastolil problém s SQLAlchemy .
Jako řešení problému považuji řešení na konci Úpravy 2 by to udělalo (zabalení zpětných volání do pokusu s výjimkou a opětovného navýšení).