sql >> Databáze >  >> RDS >> PostgreSQL

Django ORM uniká připojení při použití ThreadPoolExecutor

Můj odhad je, že ThreadPoolExecutor není to, co vytváří připojení DB, ale úlohy s vlákny jsou ty, které drží připojení. Už jsem se s tím musel vypořádat.

Nakonec jsem vytvořil tento obal, abych zajistil, že vlákna budou uzavřena ručně, kdykoli jsou úlohy prováděny v ThreadPoolExecutor. To by mělo být užitečné při zajišťování toho, aby spojení neunikly, zatím jsem při používání tohoto kódu žádný únik nezaznamenal.

from functools import wraps
from concurrent.futures import ThreadPoolExecutor
from django.db import connection

class DjangoConnectionThreadPoolExecutor(ThreadPoolExecutor):
    """
    When a function is passed into the ThreadPoolExecutor via either submit() or map(), 
    this will wrap the function, and make sure that close_django_db_connection() is called 
    inside the thread when it's finished so Django doesn't leak DB connections.

    Since map() calls submit(), only submit() needs to be overwritten.
    """
    def close_django_db_connection(self):
        connection.close()

    def generate_thread_closing_wrapper(self, fn):
        @wraps(fn)
        def new_func(*args, **kwargs):
            try:
                return fn(*args, **kwargs)
            finally:
                self.close_django_db_connection()
        return new_func

    def submit(*args, **kwargs):
        """
        I took the args filtering/unpacking logic from 
   
        https://github.com/python/cpython/blob/3.7/Lib/concurrent/futures/thread.py 
        
        so I can properly get the function object the same way it was done there.
        """
        if len(args) >= 2:
            self, fn, *args = args
            fn = self.generate_thread_closing_wrapper(fn=fn)
        elif not args:
            raise TypeError("descriptor 'submit' of 'ThreadPoolExecutor' object "
                        "needs an argument")
        elif 'fn' in kwargs:
            fn = self.generate_thread_closing_wrapper(fn=kwargs.pop('fn'))
            self, *args = args
    
        return super(self.__class__, self).submit(fn, *args, **kwargs)

Pak můžete použít toto:

    with DjangoConnectionThreadPoolExecutor(max_workers=15) as executor:
        results = list(executor.map(func, args_list))

...a buďte si jisti, že se spojení uzavřou.




  1. Pomocí indexu, pomocí dočasných, pomocí řazení souborů - jak to opravit?

  2. sqldeveloper vypíše správný výsledek pro count(*), zatímco sqlplus vygeneruje nulu

  3. Volání členské funkce addEagerConstraints() na float LARAVEL

  4. PHP:Iterujte přes více polí a sestavte SQL INSERT dotaz