]> err.no Git - dak/commitdiff
Contents: replace multithreading by multiprocessing.
authorTorsten Werner <twerner@debian.org>
Tue, 22 Mar 2011 16:48:13 +0000 (16:48 +0000)
committerTorsten Werner <twerner@debian.org>
Tue, 22 Mar 2011 16:52:51 +0000 (16:52 +0000)
That avoids the global interpreter lock of CPython.

Signed-off-by: Torsten Werner <twerner@debian.org>
daklib/contents.py

index bb81589394f20fb77068eb8a04fd071ed25b6aa4..b56a9c6ce322a5913c7d767b3b13ced8aae820ad 100755 (executable)
@@ -27,11 +27,8 @@ Helper code for contents generation.
 
 from daklib.dbconn import *
 from daklib.config import Config
-from daklib.threadpool import ThreadPool
-from multiprocessing import Pool
 
-from sqlalchemy import desc, or_
-from sqlalchemy.exc import IntegrityError
+from multiprocessing import Pool
 from subprocess import Popen, PIPE
 
 import os.path
@@ -265,12 +262,12 @@ class ContentsScanner(object):
     ContentsScanner provides a threadsafe method scan() to scan the contents of
     a DBBinary object.
     '''
-    def __init__(self, binary):
+    def __init__(self, binary_id):
         '''
-        The argument binary is the actual DBBinary object that should be
-        scanned.
+        The argument binary_id is the id of the DBBinary object that
+        should be scanned.
         '''
-        self.binary_id = binary.binary_id
+        self.binary_id = binary_id
 
     def scan(self, dummy_arg = None):
         '''
@@ -302,10 +299,24 @@ class ContentsScanner(object):
         if limit is not None:
             query = query.limit(limit)
         processed = query.count()
-        threadpool = ThreadPool()
+        pool = Pool()
         for binary in query.yield_per(100):
-            threadpool.queueTask(ContentsScanner(binary).scan)
-        threadpool.joinAll()
+            pool.apply_async(scan_helper, (binary.binary_id, ))
+        pool.close()
+        pool.join()
         remaining = remaining()
         session.close()
         return { 'processed': processed, 'remaining': remaining }
+
+reset = False
+
+def scan_helper(binary_id):
+    '''
+    This function runs in a subprocess.
+    '''
+    global reset
+    if not reset:
+        DBConn().reset()
+        reset = True
+    scanner = ContentsScanner(binary_id)
+    scanner.scan()