From: Ansgar Burchardt Date: Fri, 25 Mar 2011 15:15:42 +0000 (+0000) Subject: generate-filelist: Use multiprocessing X-Git-Url: https://err.no/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=0febf373c5ba12416e3eb77277feff4155cadcb0;p=dak generate-filelist: Use multiprocessing Signed-off-by: Ansgar Burchardt --- diff --git a/dak/generate_filelist.py b/dak/generate_filelist.py index 1f4d6654..2a6d218b 100755 --- a/dak/generate_filelist.py +++ b/dak/generate_filelist.py @@ -5,6 +5,7 @@ Generate file lists for apt-ftparchive. @contact: Debian FTP Master @copyright: 2009 Torsten Werner +@copyright: 2011 Ansgar Burchardt @license: GNU General Public License version 2 or later """ @@ -37,8 +38,8 @@ Generate file lists for apt-ftparchive. from daklib.dbconn import * from daklib.config import Config -from daklib.threadpool import ThreadPool -from daklib import utils +from daklib import utils, daklog +from multiprocessing import Pool import apt_pkg, os, stat, sys from daklib.lists import getSources, getBinaries, getArchAll @@ -64,37 +65,48 @@ def listPath(suite, component, architecture = None, type = None, file.truncate() return (file, timestamp) -def writeSourceList(args): - (suite, component, incremental_mode) = args +def writeSourceList(suite_id, component_id, incremental_mode): + session = DBConn().session() + suite = Suite.get(suite_id, session) + component = Component.get(component_id, session) (file, timestamp) = listPath(suite, component, incremental_mode = incremental_mode) - session = DBConn().session() + for _, filename in getSources(suite, component, session, timestamp): file.write(filename + '\n') session.close() file.close() + return "sources list for %s %s" % (suite.suite_name, component.component_name) -def writeAllList(args): - (suite, component, architecture, type, incremental_mode) = args +def writeAllList(suite_id, component_id, architecture_id, type, incremental_mode): + session = DBConn().session() + suite = Suite.get(suite_id, session) + component = Component.get(component_id, session) + architecture = Architecture.get(architecture_id, session) (file, timestamp) = listPath(suite, component, architecture, type, incremental_mode) - session = DBConn().session() + for _, filename in getArchAll(suite, component, architecture, type, session, timestamp): file.write(filename + '\n') session.close() file.close() + return "all list for %s %s (arch=%s, type=%s)" % (suite.suite_name, component.component_name, architecture.arch_string, type) -def writeBinaryList(args): - (suite, component, architecture, type, incremental_mode) = args +def writeBinaryList(suite_id, component_id, architecture_id, type, incremental_mode): + session = DBConn().session() + suite = Suite.get(suite_id, session) + component = Component.get(component_id, session) + architecture = Architecture.get(architecture_id, session) (file, timestamp) = listPath(suite, component, architecture, type, incremental_mode) - session = DBConn().session() + for _, filename in getBinaries(suite, component, architecture, type, session, timestamp): file.write(filename + '\n') session.close() file.close() + return "binary list for %s %s (arch=%s, type=%s)" % (suite.suite_name, component.component_name, architecture.arch_string, type) def usage(): print """Usage: dak generate_filelist [OPTIONS] @@ -114,6 +126,7 @@ Incremental mode appends only newer files to existing lists.""" def main(): cnf = Config() + Logger = daklog.Logger(cnf, 'generate-filelist') Arguments = [('h', "help", "Filelist::Options::Help"), ('s', "suite", "Filelist::Options::Suite", "HasArg"), ('c', "component", "Filelist::Options::Component", "HasArg"), @@ -140,36 +153,44 @@ def main(): Options = cnf.SubTree("Filelist::Options") if Options['Help']: usage() - threadpool = ThreadPool() + pool = Pool() query_suites = query_suites. \ filter(Suite.suite_name.in_(utils.split_args(Options['Suite']))) query_components = query_components. \ filter(Component.component_name.in_(utils.split_args(Options['Component']))) query_architectures = query_architectures. \ filter(Architecture.arch_string.in_(utils.split_args(Options['Architecture']))) + + def log(message): + Logger.log([message]) + for suite in query_suites: + suite_id = suite.suite_id for component in query_components: + component_id = component.component_id for architecture in query_architectures: + architecture_id = architecture.arch_id if architecture not in suite.architectures: pass elif architecture.arch_string == 'source': - threadpool.queueTask(writeSourceList, - (suite, component, Options['Incremental'])) + pool.apply_async(writeSourceList, + (suite_id, component_id, Options['Incremental']), callback=log) elif architecture.arch_string == 'all': - threadpool.queueTask(writeAllList, - (suite, component, architecture, 'deb', - Options['Incremental'])) - threadpool.queueTask(writeAllList, - (suite, component, architecture, 'udeb', - Options['Incremental'])) + pool.apply_async(writeAllList, + (suite_id, component_id, architecture_id, 'deb', + Options['Incremental']), callback=log) + pool.apply_async(writeAllList, + (suite_id, component_id, architecture_id, 'udeb', + Options['Incremental']), callback=log) else: # arch any - threadpool.queueTask(writeBinaryList, - (suite, component, architecture, 'deb', - Options['Incremental'])) - threadpool.queueTask(writeBinaryList, - (suite, component, architecture, 'udeb', - Options['Incremental'])) - threadpool.joinAll() + pool.apply_async(writeBinaryList, + (suite_id, component_id, architecture_id, 'deb', + Options['Incremental']), callback=log) + pool.apply_async(writeBinaryList, + (suite_id, component_id, architecture_id, 'udeb', + Options['Incremental']), callback=log) + pool.close() + pool.join() # this script doesn't change the database session.close()