From: Mike O'Connor Date: Fri, 30 Oct 2009 13:36:52 +0000 (+0000) Subject: i was trying to interrupt this with the keyboards and failing X-Git-Url: https://err.no/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=1ab6105c5cdf9847bd949c14c8c2d6697670ed2c;p=dak i was trying to interrupt this with the keyboards and failing --- diff --git a/dak/import_known_changes.py b/dak/import_known_changes.py index b403a808..5aa9a580 100755 --- a/dak/import_known_changes.py +++ b/dak/import_known_changes.py @@ -66,6 +66,8 @@ OPTIONS sys.exit(exit_code) def check_signature (sig_filename, data_filename=""): + fingerprint = None + keyrings = [ "/home/joerg/keyring/keyrings/debian-keyring.gpg", "/home/joerg/keyring/keyrings/debian-keyring.pgp", @@ -127,10 +129,17 @@ class OneAtATime(object): def __init__(self): self.next_in_line = None self.next_lock = threading.Condition() + self.die = False + + def plsDie(self): + self.die = True + self.next_lock.notify() def enqueue(self, next): self.next_lock.acquire() while self.next_in_line: + if self.die: + return self.next_lock.wait() assert( not self.next_in_line ) @@ -141,15 +150,19 @@ class OneAtATime(object): def dequeue(self): self.next_lock.acquire() while not self.next_in_line: + if self.die: + return self.next_lock.wait() - result = self.next_in_line - if isinstance(result, EndOfChanges): - return None + result = self.next_in_line self.next_in_line = None self.next_lock.notify() self.next_lock.release() + + if isinstance(result, EndOfChanges): + return None + return result class ChangesToImport(object): @@ -164,10 +177,11 @@ class ChangesToImport(object): class ChangesGenerator(threading.Thread): """enqueues changes files to be imported""" - def __init__(self, queue): + def __init__(self, parent, queue): threading.Thread.__init__(self) self.queue = queue self.session = DBConn().session() + self.parent = parent self.die = False def plsDie(self): @@ -185,26 +199,32 @@ class ChangesGenerator(threading.Thread): if not filenames: # Empty directory (or only subdirectories), next continue - if self.die: - return for changesfile in filenames: - if not changesfile.endswith(".changes"): - # Only interested in changes files. - continue - count += 1 - - if not get_knownchange(changesfile, self.session): - to_import = ChangesToImport(dirpath, changesfile, count) - self.queue.enqueue(to_import) + try: + if not changesfile.endswith(".changes"): + # Only interested in changes files. + continue + count += 1 + + if not get_knownchange(changesfile, self.session): + to_import = ChangesToImport(dirpath, changesfile, count) + if self.die: + return + self.queue.enqueue(to_import) + except KeyboardInterrupt: + print("got Ctrl-c in enqueue thread. terminating") + self.parent.plsDie() + sys.exit(1) self.queue.enqueue(EndOfChanges()) class ImportThread(threading.Thread): - def __init__(self, queue): + def __init__(self, parent, queue): threading.Thread.__init__(self) self.queue = queue self.session = DBConn().session() + self.parent = parent self.die = False def plsDie(self): @@ -231,17 +251,49 @@ class ImportThread(threading.Thread): except InvalidDscError, line: warn("syntax error in .dsc file '%s', line %s." % (f, line)) -# failure += 1 except ChangesUnicodeError: warn("found invalid changes file, not properly utf-8 encoded") -# failure += 1 - - print "Directory %s, file %7d, failures %3d. (%s)" % (dirpath[-10:], count, failure, changesfile) - + + except KeyboardInterrupt: + print("Caught C-c; on ImportThread. terminating.") + self.parent.plsDie() + sys.exit(1) + print("STUSTUSTUSTUSTU") + return except: traceback.print_exc() + self.parent.plsDie() + sys.exit(1) + +class ImportKnownChanges(object): + def __init__(self,num_threads): + self.queue = OneAtATime() + self.threads = [ ChangesGenerator(self,self.queue) ] + + for i in range(num_threads): + self.threads.append( ImportThread(self,self.queue) ) + + try: + for thread in self.threads: + thread.start() + + except KeyboardInterrupt: + print("Caught C-c; terminating.") + utils.warn("Caught C-c; terminating.") + self.plsDie() + + def plsDie(self): + traceback.print_stack90 + for thread in self.threads: + print( "STU: before ask %s to die" % thread ) + thread.plsDie() + print( "STU: after ask %s to die" % thread ) + + self.threads=[] + sys.exit(1) + def main(): cnf = Config() @@ -277,28 +329,10 @@ def main(): if Config().has_key( "%s::%s" %(options_prefix,"Concurrency")): num_threads = int(Config()[ "%s::%s" %(options_prefix,"Concurrency")]) + ImportKnownChanges(num_threads) - queue = OneAtATime() - threads = [ ChangesGenerator(queue) ] - - for i in range(num_threads): - threads.append( ImportThread(queue) ) - - try: - for thread in threads: - thread.start() - - for thread in thrads: - thread.join() - - except KeyboardInterrupt: - utils.warn("Caught C-c; terminating.") - for thread in threads: - thread.plsDie() - - for thread in threads: - thread.join() + if __name__ == '__main__': main() diff --git a/daklib/changes.py b/daklib/changes.py index 90ce2311..3eb842d2 100755 --- a/daklib/changes.py +++ b/daklib/changes.py @@ -192,7 +192,7 @@ class Changes(object): def mark_missing_fields(self): """add "missing" in fields which we will require for the known_changes table""" - for key in ['urgency', 'maintainer', 'fingerprint', 'changedby' ]: + for key in ['urgency', 'maintainer', 'fingerprint', 'changed-by' ]: if (not self.changes.has_key(key)) or (not self.changes[key]): self.changes[key]='missing'