From: Ansgar Burchardt Date: Thu, 31 May 2012 19:22:33 +0000 (+0200) Subject: dak/process_upload.py: update for multi-archive changes X-Git-Url: https://err.no/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=78051a3d482cdc0d938f053ad860c0c22b6ec62e;p=dak dak/process_upload.py: update for multi-archive changes --- diff --git a/dak/process_upload.py b/dak/process_upload.py index 1908ecb5..d20267ec 100755 --- a/dak/process_upload.py +++ b/dak/process_upload.py @@ -159,24 +159,27 @@ Checks Debian packages from Incoming ## Queue builds +import datetime from errno import EACCES, EAGAIN import fcntl import os import sys import traceback import apt_pkg +import time from sqlalchemy.orm.exc import NoResultFound from daklib import daklog -from daklib.queue import * -from daklib.queue_install import * -from daklib import utils from daklib.dbconn import * from daklib.urgencylog import UrgencyLog from daklib.summarystats import SummaryStats -from daklib.holding import Holding from daklib.config import Config -from daklib.regexes import re_match_expired +import daklib.utils as utils +from daklib.textutils import fix_maintainer +from daklib.regexes import * + +import daklib.archive +import daklib.upload ############################################################################### @@ -198,27 +201,229 @@ def usage (exit_code=0): ############################################################################### -def byebye(): - if not Options["No-Action"]: - # Clean out the queue files - session = DBConn().session() - session.execute("DELETE FROM changes_pending_files WHERE id NOT IN (SELECT file_id FROM changes_pending_files_map )") - session.commit() +def try_or_reject(function): + """Try to call function or reject the upload if that fails + """ + def wrapper(directory, upload, *args, **kwargs): + try: + return function(directory, upload, *args, **kwargs) + except Exception as e: + try: + reason = "There was an uncaught exception when processing your upload:\n{0}\nAny original reject reason follows below.".format(traceback.format_exc()) + upload.rollback() + return real_reject(directory, upload, reason=reason) + except Exception as e: + reason = "In addition there was an exception when rejecting the package:\n{0}\nPrevious reasons:\n{1}".format(traceback.format_exc(), reason) + upload.rollback() + return real_reject(directory, upload, reason=reason, notify=False) + + return wrapper + +def subst_for_upload(upload): + cnf = Config() + + changes = upload.changes + control = upload.changes.changes + + if upload.final_suites is None or len(upload.final_suites) == 0: + suite_name = '(unknown)' + else: + suite_names = [] + for suite in upload.final_suites: + if suite.policy_queue: + suite_names.append("{0}->{1}".format(suite.suite_name, suite.policy_queue.queue_name)) + else: + suite_names.append(suite.suite_name) + suite_name = ','.join(suite_names) + + maintainer_field = control.get('Changed-By', control.get('Maintainer', cnf['Dinstall::MyEmailAddress'])) + maintainer = fix_maintainer(maintainer_field) + addresses = utils.mail_addresses_for_upload(control.get('Maintainer', cnf['Dinstall::MyEmailAddress']), maintainer_field, changes.primary_fingerprint) + + bcc = 'X-DAK: dak process-upload' + if 'Dinstall::Bcc' in cnf: + bcc = '{0}\nBcc: {1}'.format(bcc, cnf['Dinstall::Bcc']) + + subst = { + '__DISTRO__': cnf['Dinstall::MyDistribution'], + '__ADMIN_ADDRESS__': cnf['Dinstall::MyAdminAddress'], + + '__CHANGES_FILENAME__': upload.changes.filename, + + '__SOURCE__': control.get('Source', '(unknown)'), + '__ARCHITECTURE__': control.get('Architecture', '(unknown)'), + '__VERSION__': control.get('Version', '(unknown)'), + + '__SUITE__': suite_name, + + '__DAK_ADDRESS__': cnf['Dinstall::MyEmailAddress'], + '__MAINTAINER_FROM__': maintainer[1], + '__MAINTAINER_TO__': ", ".join(addresses), + '__MAINTAINER__': maintainer_field, + '__BCC__': bcc, + + '__BUG_SERVER__': cnf.get('Dinstall::BugServer'), + + # TODO: don't use private member + '__FILE_CONTENTS__': upload.changes._signed_file.contents, + + # __REJECT_MESSAGE__ + } + + override_maintainer = cnf.get('Dinstall::OverrideMaintainer') + if override_maintainer: + subst['__MAINTAINER_TO__'] = subst['__MAINTAINER_FROM__'] = override_maintainer + + return subst + +@try_or_reject +def accept(directory, upload): + cnf = Config() + + Logger.log(['ACCEPT', upload.changes.filename]) + + upload.install() + + accepted_to_real_suite = False + for suite in upload.final_suites: + accepted_to_real_suite = accepted_to_real_suite or suite.policy_queue is None + + control = upload.changes.changes + if 'source' in upload.changes.architectures and not Options['No-Action']: + urgency = control.get('Urgency') + if urgency not in cnf.value_list('Urgency::Valid'): + urgency = cnf['Urgency::Default'] + UrgencyLog().log(control['Source'], control['Version'], urgency) + + # send mail to maintainer + subst = subst_for_upload(upload) + message = utils.TemplateSubst(subst, os.path.join(cnf['Dir::Templates'], 'process-unchecked.accepted')) + utils.send_mail(message) + + # send mail to announce lists and tracking server + subst = subst_for_upload(upload) + announce = set() + for suite in upload.final_suites: + if suite.policy_queue is None: + continue + announce.update(suite.announce or []) + announce_address = ", ".join(announce) + tracking = cnf.get('Dinstall::TrackingServer') + if tracking and 'source' in upload.changes.architectures: + announce_address = '{0}\nBcc: {1}@{2}'.format(announce_address, control['Source'], tracking) + message = utils.TemplateSubst(subst, os.path.join(cnf['Dir::Templates'], 'process-unchecked.announce')) + utils.send_mail(message) + + # Only close bugs for uploads that were not redirected to a policy queue. + # process-policy will close bugs for those once they are accepted. + subst = subst_for_upload(upload) + if accepted_to_real_suite and cnf.find_b('Dinstall::CloseBugs') and upload.changes.source is not None: + for bugnum in upload.changes.closed_bugs: + subst['__BUG_NUMBER__'] = str(bugnum) + + message = utils.TemplateSubst(subst, os.path.join(cnf['Dir::Templates'], 'process-unchecked.bug-close')) + utils.send_mail(message) + + del subst['__BUG_NUMBER__'] + + # Move .changes to done, but only for uploads that were accepted to a + # real suite. process-policy will handle this for uploads to queues. + if accepted_to_real_suite: + src = os.path.join(upload.directory, upload.changes.filename) + + now = datetime.datetime.now() + donedir = os.path.join(cnf['Dir::Done'], now.strftime('%Y/%m/%d')) + dst = os.path.join(donedir, upload.changes.filename) + dst = utils.find_next_free(dst) + + upload.transaction.fs.copy(src, dst, mode=0o644) + + SummaryStats().accept_count += 1 + SummaryStats().accept_bytes += upload.changes.bytes + +@try_or_reject +def accept_to_new(directory, upload): + cnf = Config() + + Logger.log(['ACCEPT-TO-NEW', upload.changes.filename]) + + upload.install_to_new() + # TODO: tag bugs pending, send announcement + + subst = subst_for_upload(upload) + message = utils.TemplateSubst(subst, os.path.join(cnf['Dir::Templates'], 'process-unchecked.new')) + utils.send_mail(message) + + SummaryStats().accept_count += 1 + SummaryStats().accept_bytes += upload.changes.bytes + +@try_or_reject +def reject(directory, upload, reason=None, notify=True): + real_reject(directory, upload, reason, notify) + +def real_reject(directory, upload, reason=None, notify=True): + # XXX: rejection itself should go to daklib.archive.ArchiveUpload + cnf = Config() + + Logger.log(['REJECT', upload.changes.filename]) + + fs = upload.transaction.fs + rejectdir = cnf['Dir::Reject'] + files = [ f.filename for f in upload.changes.files.itervalues() ] + files.append(upload.changes.filename) + for fn in files: + src = os.path.join(upload.directory, fn) + dst = utils.find_next_free(os.path.join(rejectdir, fn)) + fs.copy(src, dst) + + if upload.reject_reasons is not None: + if reason is None: + reason = '' + reason = reason + '\n' + '\n'.join(upload.reject_reasons) + + if reason is None: + reason = '(Unknown reason. Please check logs.)' + + dst = utils.find_next_free(os.path.join(rejectdir, '{0}.reason'.format(upload.changes.filename))) + fh = fs.create(dst) + fh.write(reason) + fh.close() + + # TODO: fix + if notify: + subst = subst_for_upload(upload) + subst['__REJECTOR_ADDRESS__'] = cnf['Dinstall::MyEmailAddress'] + subst['__MANUAL_REJECT_MESSAGE__'] = '' + subst['__REJECT_MESSAGE__'] = reason + subst['__CC__'] = 'X-DAK-Rejection: automatic (moo)' + + message = utils.TemplateSubst(subst, os.path.join(cnf['Dir::Templates'], 'queue.rejected')) + utils.send_mail(message) + + SummaryStats().reject_count += 1 + +############################################################################### + +def action(directory, upload): + changes = upload.changes + processed = True -def action(u, session): global Logger cnf = Config() - holding = Holding() - # changes["distribution"] may not exist in corner cases - # (e.g. unreadable changes files) - if not u.pkg.changes.has_key("distribution") or not isinstance(u.pkg.changes["distribution"], dict): - u.pkg.changes["distribution"] = {} + okay = upload.check() - (summary, short_summary) = u.build_summaries() + summary = changes.changes.get('Changes', '') + + package_info = [] + if okay: + if changes.source is not None: + package_info.append("source:{0}".format(changes.source.dsc['Source'])) + for binary in changes.binaries: + package_info.append("binary:{0}".format(binary.control['Package'])) (prompt, answer) = ("", "XXX") if Options["No-Action"] or Options["Automatic"]: @@ -226,63 +431,36 @@ def action(u, session): queuekey = '' - pi = u.package_info() + print summary + print + print "\n".join(package_info) + print - try: - chg = session.query(DBChange).filter_by(changesname=os.path.basename(u.pkg.changes_file)).one() - except NoResultFound as e: - chg = None + if len(upload.reject_reasons) > 0: + print "Reason:" + print "\n".join(upload.reject_reasons) + print - if len(u.rejects) > 0: - if u.upload_too_new(): - print "SKIP (too new)\n" + pi, + path = os.path.join(directory, changes.filename) + created = os.stat(path).st_mtime + now = time.time() + too_new = (now - created < int(cnf['Dinstall::SkipTime'])) + + if too_new: + print "SKIP (too new)" prompt = "[S]kip, Quit ?" else: - print "REJECT\n" + pi prompt = "[R]eject, Skip, Quit ?" if Options["Automatic"]: answer = 'R' + elif upload.new: + prompt = "[N]ew, Skip, Quit ?" + if Options['Automatic']: + answer = 'N' else: - # Are we headed for NEW / BYHAND / AUTOBYHAND? - # Note that policy queues are no longer handled here - qu = determine_target(u) - if qu: - print "%s for %s\n%s%s" % ( qu.upper(), ", ".join(u.pkg.changes["distribution"].keys()), pi, summary) - queuekey = qu[0].upper() - if queuekey in "RQSA": - queuekey = "D" - prompt = "[D]ivert, Skip, Quit ?" - else: - prompt = "[%s]%s, Skip, Quit ?" % (queuekey, qu[1:].lower()) - if Options["Automatic"]: - answer = queuekey - else: - # Does suite have a policy_queue configured - divert = False - for s in u.pkg.changes["distribution"].keys(): - suite = get_suite(s, session) - if suite.policy_queue: - if not chg or chg.approved_for_id != suite.policy_queue.policy_queue_id: - # This routine will check whether the upload is a binary - # upload when the source is already in the target suite. If - # so, we skip the policy queue, otherwise we go there. - divert = package_to_suite(u, suite.suite_name, session=session) - if divert: - print "%s for %s\n%s%s" % ( suite.policy_queue.queue_name.upper(), - ", ".join(u.pkg.changes["distribution"].keys()), - pi, summary) - queuekey = "P" - prompt = "[P]olicy, Skip, Quit ?" - policyqueue = suite.policy_queue - if Options["Automatic"]: - answer = 'P' - break - - if not divert: - print "ACCEPT\n" + pi + summary, - prompt = "[A]ccept, Skip, Quit ?" - if Options["Automatic"]: - answer = 'A' + prompt = "[A]ccept, Skip, Quit ?" + if Options['Automatic']: + answer = 'A' while prompt.find(answer) == -1: answer = utils.our_raw_input(prompt) @@ -292,131 +470,86 @@ def action(u, session): answer = answer[:1].upper() if answer == 'R': - os.chdir(u.pkg.directory) - u.do_reject(0, pi) + reject(directory, upload) elif answer == 'A': - if not chg: - chg = u.pkg.add_known_changes(holding.holding_dir, session=session, logger=Logger) - session.commit() - u.accept(summary, short_summary, session) - u.check_override() - chg.clean_from_queue() - session.commit() - u.remove() - elif answer == 'P': - if not chg: - chg = u.pkg.add_known_changes(holding.holding_dir, session=session, logger=Logger) - package_to_queue(u, summary, short_summary, policyqueue, chg, session) - session.commit() - u.remove() - elif answer == queuekey: - if not chg: - chg = u.pkg.add_known_changes(holding.holding_dir, session=session, logger=Logger) - QueueInfo[qu]["process"](u, summary, short_summary, chg, session) - session.commit() - u.remove() + # upload.try_autobyhand must not be run with No-Action. + if Options['No-Action']: + accept(directory, upload) + elif upload.try_autobyhand(): + accept(directory, upload) + else: + print "W: redirecting to BYHAND as automatic processing failed." + accept_to_new(directory, upload) + elif answer == 'N': + accept_to_new(directory, upload) elif answer == 'Q': - byebye() sys.exit(0) + elif answer == 'S': + processed = False + + #raise Exception("FAIL") + if not Options['No-Action']: + upload.commit() - session.commit() + return processed ############################################################################### -def cleanup(): - h = Holding() - if not Options["No-Action"]: - h.clean() +def unlink_if_exists(path): + try: + os.unlink(path) + except OSError as e: + if e.errno != errno.ENOENT: + raise -def process_it(changes_file, session): +def process_it(directory, changes, keyrings, session): global Logger - Logger.log(["Processing changes file", changes_file]) + print "\n{0}\n".format(changes.filename) + Logger.log(["Processing changes file", changes.filename]) cnf = Config() - holding = Holding() - - # TODO: Actually implement using pending* tables so that we don't lose track - # of what is where - - u = Upload() - u.pkg.changes_file = changes_file - u.pkg.directory = os.getcwd() - u.logger = Logger - origchanges = os.path.abspath(u.pkg.changes_file) - # Some defaults in case we can't fully process the .changes file - u.pkg.changes["maintainer2047"] = cnf["Dinstall::MyEmailAddress"] - u.pkg.changes["changedby2047"] = cnf["Dinstall::MyEmailAddress"] + #u.pkg.changes["maintainer2047"] = cnf["Dinstall::MyEmailAddress"] + #u.pkg.changes["changedby2047"] = cnf["Dinstall::MyEmailAddress"] # debian-{devel-,}-changes@lists.debian.org toggles writes access based on this header bcc = "X-DAK: dak process-upload" - if cnf.has_key("Dinstall::Bcc"): - u.Subst["__BCC__"] = bcc + "\nBcc: %s" % (cnf["Dinstall::Bcc"]) - else: - u.Subst["__BCC__"] = bcc + #if cnf.has_key("Dinstall::Bcc"): + # u.Subst["__BCC__"] = bcc + "\nBcc: %s" % (cnf["Dinstall::Bcc"]) + #else: + # u.Subst["__BCC__"] = bcc + + with daklib.archive.ArchiveUpload(directory, changes, keyrings) as upload: + processed = action(directory, upload) + if processed and not Options['No-Action']: + unlink_if_exists(os.path.join(directory, changes.filename)) + for fn in changes.files: + unlink_if_exists(os.path.join(directory, fn)) - # Remember where we are so we can come back after cd-ing into the - # holding directory. TODO: Fix this stupid hack - u.prevdir = os.getcwd() +############################################################################### - try: - # If this is the Real Thing(tm), copy things into a private - # holding directory first to avoid replacable file races. - if not Options["No-Action"]: - holding.chdir_to_holding() - - # Absolutize the filename to avoid the requirement of being in the - # same directory as the .changes file. - holding.copy_to_holding(origchanges) - - # Relativize the filename so we use the copy in holding - # rather than the original... - changespath = os.path.basename(u.pkg.changes_file) - else: - changespath = origchanges +def process_changes(changes_filenames): + session = DBConn().session() + keyrings = session.query(Keyring).filter_by(active=True).order_by(Keyring.priority) + keyring_files = [ k.keyring_name for k in keyrings ] + + changes = [] + for fn in changes_filenames: + try: + directory, filename = os.path.split(fn) + c = daklib.upload.Changes(directory, filename, keyring_files) + changes.append([directory, c]) + except Exception as e: + Logger.log([filename, "Error while loading changes: {0}".format(e)]) - (u.pkg.changes["fingerprint"], rejects) = utils.check_signature(changespath) + changes.sort(key=lambda x: x[1]) - if u.pkg.changes["fingerprint"]: - valid_changes_p = u.load_changes(changespath) - else: - for reason in rejects: - if re_match_expired.match(reason): - # Hrm, key expired. Lets see if we can still parse the .changes before - # we reject. Then we would be able to mail the maintainer, instead of - # just silently dropping the upload. - u.load_changes(changespath) - valid_changes_p = False - u.rejects.extend(rejects) - - if valid_changes_p: - u.check_distributions() - u.check_files(not Options["No-Action"]) - valid_dsc_p = u.check_dsc(not Options["No-Action"]) - if valid_dsc_p and not Options["No-Action"]: - u.check_source() - u.check_hashes() - if valid_dsc_p and not Options["No-Action"] and not len(u.rejects): - u.check_lintian() - u.check_urgency() - u.check_timestamps() - u.check_signed_by_key() - - action(u, session) - - except (SystemExit, KeyboardInterrupt): - cleanup() - raise - - except: - print "ERROR" - traceback.print_exc(file=sys.stderr) - - cleanup() - # Restore previous WD - os.chdir(u.prevdir) + for directory, c in changes: + process_it(directory, c, keyring_files, session) + + session.rollback() ############################################################################### @@ -426,8 +559,6 @@ def main(): cnf = Config() summarystats = SummaryStats() - DBConn() - Arguments = [('a',"automatic","Dinstall::Options::Automatic"), ('h',"help","Dinstall::Options::Help"), ('n',"no-action","Dinstall::Options::No-Action"), @@ -485,15 +616,7 @@ def main(): else: Logger.log(["Using changes files from command-line", len(changes_files)]) - # Sort the .changes files so that we process sourceful ones first - changes_files.sort(utils.changes_compare) - - # Process the changes files - for changes_file in changes_files: - print "\n" + changes_file - session = DBConn().session() - process_it(changes_file, session) - session.close() + process_changes(changes_files) if summarystats.accept_count: sets = "set" @@ -510,8 +633,6 @@ def main(): print "Rejected %d package %s." % (summarystats.reject_count, sets) Logger.log(["rejected", summarystats.reject_count]) - byebye() - if not Options["No-Action"]: urgencylog.close()