From: Mike O'Connor Date: Fri, 30 Oct 2009 10:54:11 +0000 (+0100) Subject: importing old changes files into knwon_changes with a separeate script, now allowing... X-Git-Url: https://err.no/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=89304ce4ded069d3e818dcf427e8fc867cb8ab13;p=dak importing old changes files into knwon_changes with a separeate script, now allowing some fields to be 'missing' --- diff --git a/dak/dak.py b/dak/dak.py index f9839ea0..e424836f 100755 --- a/dak/dak.py +++ b/dak/dak.py @@ -134,6 +134,8 @@ def init(): "Generate statistics"), ("bts-categorize", "Categorize uncategorized bugs filed against ftp.debian.org"), + ("import-known-changes", + "import old changes files into known_changes table"), ("add-user", "Add a user to the archive"), ] diff --git a/dak/import_known_changes.py b/dak/import_known_changes.py new file mode 100755 index 00000000..d95ad2c5 --- /dev/null +++ b/dak/import_known_changes.py @@ -0,0 +1,275 @@ +#!/usr/bin/env python +# coding=utf8 + +""" +Import known_changes files + +@contact: Debian FTP Master +@copyright: 2009 Mike O'Connor +@license: GNU General Public License version 2 or later +""" + +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +################################################################################ + + +################################################################################ + +import sys +import os +import logging +import threading +from daklib.dbconn import DBConn,get_knownchange + +from daklib.config import Config + +# where in dak.conf all of our configuration will be stowed +options_prefix = "KnownChanges" +options_prefix = "%s::Options" % options_prefix + +log = logging.getLogger() + +################################################################################ + + +def usage (exit_code=0): + print """Usage: dak import-known-changes [options] + +OPTIONS + -j n + run with n threads concurrently + + -v, --verbose + show verbose information messages + + -q, --quiet + supress all output but errors + +""" + sys.exit(exit_code) + +def check_signature (sig_filename, data_filename=""): + keyrings = [ + "/home/joerg/keyring/keyrings/debian-keyring.gpg", + "/home/joerg/keyring/keyrings/debian-keyring.pgp", + "/home/joerg/keyring/keyrings/debian-maintainers.gpg", + "/home/joerg/keyring/keyrings/debian-role-keys.gpg", + "/home/joerg/keyring/keyrings/emeritus-keyring.pgp", + "/home/joerg/keyring/keyrings/emeritus-keyring.gpg", + "/home/joerg/keyring/keyrings/removed-keys.gpg", + "/home/joerg/keyring/keyrings/removed-keys.pgp" + ] + + keyringargs = " ".join(["--keyring %s" % x for x in keyrings ]) + + # Build the command line + status_read, status_write = os.pipe() + cmd = "gpgv --status-fd %s %s %s" % (status_write, keyringargs, sig_filename) + + # Invoke gpgv on the file + (output, status, exit_status) = gpgv_get_status_output(cmd, status_read, status_write) + + # Process the status-fd output + (keywords, internal_error) = process_gpgv_output(status) + + # If we failed to parse the status-fd output, let's just whine and bail now + if internal_error: + warn("Couldn't parse signature") + return None + + # usually one would check for bad things here. We, however, do not care. + + # Next check gpgv exited with a zero return code + if exit_status: + warn("Couldn't parse signature") + return None + + # Sanity check the good stuff we expect + if not keywords.has_key("VALIDSIG"): + warn("Couldn't parse signature") + else: + args = keywords["VALIDSIG"] + if len(args) < 1: + warn("Couldn't parse signature") + else: + fingerprint = args[0] + + return fingerprint + + +class EndOfChanges(object): + """something enqueued to signify the last change""" + pass + + +class OneAtATime(object): + """ + a one space queue which sits between multiple possible producers + and multiple possible consumers + """ + def __init__(self): + self.next_in_line = None + self.next_lock = threading.Condition() + + def enqueue(self, next): + self.next_lock.acquire() + while self.next_in_line: + self.next_lock.wait() + + assert( not self.next_in_line ) + self.next_in_line = next + self.next_lock.notify() + self.next_lock.release() + + def dequeue(self): + self.next_lock.acquire() + while not self.next_in_line: + self.next_lock.wait() + result = self.next_in_line + + if isinstance(next, EndOfChanges): + return None + + self.next_in_line = None + self.next_lock.notify() + self.next_lock.release() + return result + +class ChangesToImport(object): + """A changes file to be enqueued to be processed""" + def __init__(self, queue, checkdir, changesfile, count): + self.queue = queue + self.checkdir = checkdir + self.changesfile = changesfile + self.count = count + +class ChangesGenerator(threading.Thread): + """enqueues changes files to be imported""" + def __init__(self, queue): + self.queue = queue + self.session = DBConn().session() + + def run(self): + cnf = Config() + for directory in [ "Accepted", "Byhand", "Done", "New", "ProposedUpdates", "OldProposedUpdates" ]: + checkdir = cnf["Dir::Queue::%s" % (directory) ] + if os.path.exists(checkdir): + print "Looking into %s" % (checkdir) + + for dirpath, dirnames, filenames in os.walk(checkdir, topdown=False): + if not filenames: + # Empty directory (or only subdirectories), next + continue + for changesfile in filenames: + if not changesfile.endswith(".changes"): + # Only interested in changes files. + continue + count += 1 + + if not get_knownchange(session): + self.queue.enqueue(ChangesToImport(directory, checkdir, changesfile, count)) + + self.queue.enqueue(EndOfChanges()) + +class ImportThread(threading.Thread): + def __init__(self, queue): + self.queue = queue + self.session = DBConn().session() + + def run(self): + while True: + try: + to_import = queue.dequeue() + if not to_import: + return + + print( "Directory %s, file %7d, failures %3d. (%s)" % (to_import.dirpath[-10:], to_import.count, failure, to_import.changesfile) ) + + changes = Changes() + changes.changes_file = to_import.changesfile + changesfile = os.path.join(to_import.dirpath, to_import.changesfile) + changes.changes = parse_changes(changesfile, signing_rules=-1) + changes.changes["fingerprint"] = check_signature(changesfile) + changes.add_known_changes(to_import.queue, self.session) + self.session.commit() + + except InvalidDscError, line: + warn("syntax error in .dsc file '%s', line %s." % (f, line)) + failure += 1 + + except ChangesUnicodeError: + warn("found invalid changes file, not properly utf-8 encoded") + failure += 1 + + print "Directory %s, file %7d, failures %3d. (%s)" % (dirpath[-10:], count, failure, changesfile) + + + +def main(): + cnf = Config() + + arguments = [('h',"help", "%s::%s" % (options_prefix,"Help")), + ('j',"concurrency", "%s::%s" % (options_prefix,"Concurrency"),"HasArg"), + ('q',"quiet", "%s::%s" % (options_prefix,"Quiet")), + ('v',"verbose", "%s::%s" % (options_prefix,"Verbose")), + ] + + args = apt_pkg.ParseCommandLine(cnf.Cnf, arguments,sys.argv) + + num_threads = 1 + + if (len(args) < 1) or not commands.has_key(args[0]): + usage() + + if cnf.has_key("%s::%s" % (options_prefix,"Help")): + usage() + + level=logging.INFO + if cnf.has_key("%s::%s" % (options_prefix,"Quiet")): + level=logging.ERROR + + elif cnf.has_key("%s::%s" % (options_prefix,"Verbose")): + level=logging.DEBUG + + + logging.basicConfig( level=level, + format='%(asctime)s %(levelname)s %(message)s', + stream = sys.stderr ) + + if Config().has_key( "%s::%s" %(options_prefix,"Concurrency")): + num_threads = int(Config()[ "%s::%s" %(options_prefix,"Suite")]) + + + queue = OneAtATime() + ChangesGenerator(queue).start() + + for i in range(num_threads): + ImportThread(queue).start() + +def which_suites(session): + """ + return a list of suites to operate on + """ + if Config().has_key( "%s::%s" %(options_prefix,"Suite")): + suites = utils.split_args(Config()[ "%s::%s" %(options_prefix,"Suite")]) + else: + suites = Config().SubTree("Suite").List() + + return [get_suite(s.lower(), session) for s in suites] + + +if __name__ == '__main__': + main() diff --git a/daklib/changes.py b/daklib/changes.py index c5ac64a9..1bca6dc1 100755 --- a/daklib/changes.py +++ b/daklib/changes.py @@ -189,9 +189,16 @@ class Changes(object): session.commit() session.close() + + def mark_missing_fields(self): + """add "missing" in fields which we will require for the known_changes table""" + for key in ['urgency', 'maintainer', 'fingerprint', 'changedby' ]: + if (not self.changes.has_key(key)) or (not self.changes[key]): + self.changes[key]='missing' + def add_known_changes(self, queue, session=None): + """add "missing" in fields which we will require for the known_changes table""" cnf = Config() - if session is None: session = DBConn().session() privatetrans = True @@ -200,6 +207,8 @@ class Changes(object): changesfile = os.path.join(dirpath, self.changes_file) filetime = datetime.datetime.fromtimestamp(os.path.getctime(changesfile)) + self.mark_missing_fields() + session.execute( """INSERT INTO known_changes (changesname, seen, source, binaries, architecture, version,