From: Torsten Werner Date: Tue, 22 Mar 2011 16:48:13 +0000 (+0000) Subject: Contents: replace multithreading by multiprocessing. X-Git-Url: https://err.no/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=8ba1b12293e55da930cc0be48c1616723ec59088;p=dak Contents: replace multithreading by multiprocessing. That avoids the global interpreter lock of CPython. Signed-off-by: Torsten Werner --- diff --git a/daklib/contents.py b/daklib/contents.py index bb815893..b56a9c6c 100755 --- a/daklib/contents.py +++ b/daklib/contents.py @@ -27,11 +27,8 @@ Helper code for contents generation. from daklib.dbconn import * from daklib.config import Config -from daklib.threadpool import ThreadPool -from multiprocessing import Pool -from sqlalchemy import desc, or_ -from sqlalchemy.exc import IntegrityError +from multiprocessing import Pool from subprocess import Popen, PIPE import os.path @@ -265,12 +262,12 @@ class ContentsScanner(object): ContentsScanner provides a threadsafe method scan() to scan the contents of a DBBinary object. ''' - def __init__(self, binary): + def __init__(self, binary_id): ''' - The argument binary is the actual DBBinary object that should be - scanned. + The argument binary_id is the id of the DBBinary object that + should be scanned. ''' - self.binary_id = binary.binary_id + self.binary_id = binary_id def scan(self, dummy_arg = None): ''' @@ -302,10 +299,24 @@ class ContentsScanner(object): if limit is not None: query = query.limit(limit) processed = query.count() - threadpool = ThreadPool() + pool = Pool() for binary in query.yield_per(100): - threadpool.queueTask(ContentsScanner(binary).scan) - threadpool.joinAll() + pool.apply_async(scan_helper, (binary.binary_id, )) + pool.close() + pool.join() remaining = remaining() session.close() return { 'processed': processed, 'remaining': remaining } + +reset = False + +def scan_helper(binary_id): + ''' + This function runs in a subprocess. + ''' + global reset + if not reset: + DBConn().reset() + reset = True + scanner = ContentsScanner(binary_id) + scanner.scan()