################################################################################
import os
-import copy
+import datetime
import sys
+import traceback
import apt_pkg
from daklib.dbconn import *
-from daklib.queue import *
from daklib import daklog
from daklib import utils
from daklib.dak_exceptions import CantOpenError, AlreadyLockedError, CantGetLockError
from daklib.config import Config
-from daklib.changesutils import *
+from daklib.archive import ArchiveTransaction
+from daklib.urgencylog import UrgencyLog
+from daklib.textutils import fix_maintainer
# Globals
Options = None
################################################################################
-def do_comments(dir, srcqueue, opref, npref, line, fn, session):
+def do_comments(dir, srcqueue, opref, npref, line, fn, transaction):
+ session = transaction.session
for comm in [ x for x in os.listdir(dir) if x.startswith(opref) ]:
- lines = open("%s/%s" % (dir, comm)).readlines()
+ lines = open(os.path.join(dir, comm)).readlines()
if len(lines) == 0 or lines[0] != line + "\n": continue
- changes_files = [ x for x in os.listdir(".") if x.startswith(comm[len(opref):]+"_")
- and x.endswith(".changes") ]
- changes_files = sort_changes(changes_files, session)
- for f in changes_files:
- print "Processing changes file: %s" % f
- f = utils.validate_changes_file_arg(f, 0)
- if not f:
- print "Couldn't validate changes file %s" % f
- continue
- fn(f, srcqueue, "".join(lines[1:]), session)
-
- if opref != npref and not Options["No-Action"]:
+
+ # If the ACCEPT includes a _<arch> we only accept that .changes.
+ # Otherwise we accept all .changes that start with the given prefix
+ changes_prefix = comm[len(opref):]
+ if changes_prefix.count('_') < 2:
+ changes_prefix = changes_prefix + '_'
+ else:
+ changes_prefix = changes_prefix + '.changes'
+
+ uploads = session.query(PolicyQueueUpload).filter_by(policy_queue=srcqueue) \
+ .join(PolicyQueueUpload.changes).filter(DBChange.changesname.startswith(changes_prefix)) \
+ .order_by(PolicyQueueUpload.source_id)
+ for u in uploads:
+ print "Processing changes file: %s" % u.changes.changesname
+ fn(u, srcqueue, "".join(lines[1:]), transaction)
+
+ if opref != npref:
newcomm = npref + comm[len(opref):]
- os.rename("%s/%s" % (dir, comm), "%s/%s" % (dir, newcomm))
+ transaction.fs.move(os.path.join(dir, comm), os.path.join(dir, newcomm))
+
+################################################################################
+
+def try_or_reject(function):
+ def wrapper(upload, srcqueue, comments, transaction):
+ try:
+ function(upload, srcqueue, comments, transaction)
+ except Exception as e:
+ comments = 'An exception was raised while processing the package:\n{0}\nOriginal comments:\n{1}'.format(traceback.format_exc(), comments)
+ try:
+ transaction.rollback()
+ real_comment_reject(upload, srcqueue, comments, transaction)
+ except Exception as e:
+ comments = 'In addition an exception was raised while trying to reject the upload:\n{0}\nOriginal rejection:\n{1}'.format(traceback.format_exc(), comments)
+ transaction.rollback()
+ real_comment_reject(upload, srcqueue, comments, transaction, notify=False)
+ if not Options['No-Action']:
+ transaction.commit()
+ return wrapper
################################################################################
-def comment_accept(changes_file, srcqueue, comments, session):
- u = Upload()
- u.pkg.changes_file = changes_file
- u.load_changes(changes_file)
- u.update_subst()
+@try_or_reject
+def comment_accept(upload, srcqueue, comments, transaction):
+ for byhand in upload.byhand:
+ path = os.path.join(srcqueue.path, byhand.filename)
+ if os.path.exists(path):
+ raise Exception('E: cannot ACCEPT upload with unprocessed byhand file {0}'.format(byhand.filename))
+ cnf = Config()
+
+ fs = transaction.fs
+ session = transaction.session
+ changesname = upload.changes.changesname
+ allow_tainted = srcqueue.suite.archive.tainted
+
+ # We need overrides to get the target component
+ overridesuite = upload.target_suite
+ if overridesuite.overridesuite is not None:
+ overridesuite = session.query(Suite).filter_by(suite_name=overridesuite.overridesuite).one()
+
+ def binary_component_func(db_binary):
+ override = session.query(Override).filter_by(suite=overridesuite, package=db_binary.package) \
+ .join(OverrideType).filter(OverrideType.overridetype == db_binary.binarytype) \
+ .join(Component).one()
+ return override.component
+
+ def source_component_func(db_source):
+ override = session.query(Override).filter_by(suite=overridesuite, package=db_source.source) \
+ .join(OverrideType).filter(OverrideType.overridetype == 'dsc') \
+ .join(Component).one()
+ return override.component
+
+ all_target_suites = [upload.target_suite]
+ all_target_suites.extend([q.suite for q in upload.target_suite.copy_queues])
+
+ for suite in all_target_suites:
+ if upload.source is not None:
+ transaction.copy_source(upload.source, suite, source_component_func(upload.source), allow_tainted=allow_tainted)
+ for db_binary in upload.binaries:
+ transaction.copy_binary(db_binary, suite, binary_component_func(db_binary), allow_tainted=allow_tainted, extra_archives=[upload.target_suite.archive])
+
+ # Copy .changes if needed
+ if upload.target_suite.copychanges:
+ src = os.path.join(upload.policy_queue.path, upload.changes.changesname)
+ dst = os.path.join(upload.target_suite.path, upload.changes.changesname)
+ fs.copy(src, dst, mode=upload.target_suite.archive.mode)
+
+ if upload.source is not None and not Options['No-Action']:
+ urgency = upload.changes.urgency
+ if urgency not in cnf.value_list('Urgency::Valid'):
+ urgency = cnf['Urgency::Default']
+ UrgencyLog().log(upload.source.source, upload.source.version, urgency)
+
+ print " ACCEPT"
+ if not Options['No-Action']:
+ Logger.log(["Policy Queue ACCEPT", srcqueue.queue_name, changesname])
+
+ # Send announcement
+ subst = subst_for_upload(upload)
+ announce = ", ".join(upload.target_suite.announce or [])
+ tracking = cnf.get('Dinstall::TrackingServer')
+ if tracking and upload.source is not None:
+ announce = '{0}\nBcc: {1}@{2}'.format(announce, upload.changes.source, tracking)
+ subst['__ANNOUNCE_LIST_ADDRESS__'] = announce
+ message = utils.TemplateSubst(subst, os.path.join(cnf['Dir::Templates'], 'process-unchecked.announce'))
+ utils.send_mail(message)
+
+ # TODO: code duplication. Similar code is in process-upload.
+ if cnf.find_b('Dinstall::CloseBugs') and upload.changes.closes is not None and upload.source is not None:
+ for bugnum in upload.changes.closes:
+ subst['__BUG_NUMBER__'] = bugnum
+ message = utils.TemplateSubst(subst, os.path.join(cnf['Dir::Templates'], 'process-unchecked.bug-close'))
+ utils.send_mail(message)
+
+ del subst['__BUG_NUMBER__']
+
+ # TODO: code duplication. Similar code is in process-upload.
+ # Move .changes to done
+ src = os.path.join(upload.policy_queue.path, upload.changes.changesname)
+ now = datetime.datetime.now()
+ donedir = os.path.join(cnf['Dir::Done'], now.strftime('%Y/%m/%d'))
+ dst = os.path.join(donedir, upload.changes.changesname)
+ dst = utils.find_next_free(dst)
+ fs.copy(src, dst, mode=0o644)
+
+ remove_upload(upload, transaction)
+
+################################################################################
+
+@try_or_reject
+def comment_reject(*args):
+ real_comment_reject(*args)
+
+def real_comment_reject(upload, srcqueue, comments, transaction, notify=True):
+ cnf = Config()
+
+ fs = transaction.fs
+ session = transaction.session
+ changesname = upload.changes.changesname
+ queuedir = upload.policy_queue.path
+ rejectdir = cnf['Dir::Reject']
+
+ ### Copy files to reject/
+
+ poolfiles = [b.poolfile for b in upload.binaries]
+ if upload.source is not None:
+ poolfiles.extend([df.poolfile for df in upload.source.srcfiles])
+ # Not beautiful...
+ files = [ af.path for af in session.query(ArchiveFile) \
+ .filter_by(archive=upload.policy_queue.suite.archive) \
+ .join(ArchiveFile.file) \
+ .filter(PoolFile.file_id.in_([ f.file_id for f in poolfiles ])) ]
+ for byhand in upload.byhand:
+ path = os.path.join(queuedir, byhand.filename)
+ if os.path.exists(path):
+ files.append(path)
+ files.append(os.path.join(queuedir, changesname))
+
+ for fn in files:
+ dst = utils.find_next_free(os.path.join(rejectdir, os.path.basename(fn)))
+ fs.copy(fn, dst, link=True)
+
+ ### Write reason
+
+ dst = utils.find_next_free(os.path.join(rejectdir, '{0}.reason'.format(changesname)))
+ fh = fs.create(dst)
+ fh.write(comments)
+ fh.close()
+
+ ### Send mail notification
+
+ if notify:
+ subst = subst_for_upload(upload)
+ subst['__MANUAL_REJECT_MESSAGE__'] = ''
+ subst['__REJECT_MESSAGE__'] = comments
+ message = utils.TemplateSubst(subst, os.path.join(cnf['Dir::Templates'], 'queue.rejected'))
+ utils.send_mail(message)
+
+ print " REJECT"
if not Options["No-Action"]:
- destqueue = get_policy_queue('newstage', session)
- if changes_to_queue(u, srcqueue, destqueue, session):
- print " ACCEPT"
- Logger.log(["Policy Queue ACCEPT: %s: %s" % (srcqueue.queue_name, u.pkg.changes_file)])
- else:
- print "E: Failed to migrate %s" % u.pkg.changes_file
+ Logger.log(["Policy Queue REJECT", srcqueue.queue_name, upload.changes.changesname])
+
+ remove_upload(upload, transaction)
################################################################################
-def comment_reject(changes_file, srcqueue, comments, session):
- u = Upload()
- u.pkg.changes_file = changes_file
- u.load_changes(changes_file)
- u.update_subst()
+def remove_upload(upload, transaction):
+ fs = transaction.fs
+ session = transaction.session
+ changes = upload.changes
+
+ # Remove byhand and changes files. Binary and source packages will be
+ # removed from {bin,src}_associations and eventually removed by clean-suites automatically.
+ queuedir = upload.policy_queue.path
+ for byhand in upload.byhand:
+ path = os.path.join(queuedir, byhand.filename)
+ if os.path.exists(path):
+ fs.unlink(path)
+ session.delete(byhand)
+ fs.unlink(os.path.join(queuedir, upload.changes.changesname))
+
+ session.delete(upload)
+ session.delete(changes)
+ session.flush()
- u.rejects.append(comments)
+################################################################################
+def subst_for_upload(upload):
+ # TODO: similar code in process-upload
cnf = Config()
- bcc = "X-DAK: dak process-policy"
- if cnf.has_key("Dinstall::Bcc"):
- u.Subst["__BCC__"] = bcc + "\nBcc: %s" % (cnf["Dinstall::Bcc"])
- else:
- u.Subst["__BCC__"] = bcc
- if not Options["No-Action"]:
- u.do_reject(manual=0, reject_message='\n'.join(u.rejects))
- u.pkg.remove_known_changes(session=session)
- session.commit()
+ maintainer_field = upload.changes.changedby or upload.changes.maintainer
+ addresses = utils.mail_addresses_for_upload(upload.changes.maintainer, maintainer_field, upload.changes.fingerprint)
+
+ changes_path = os.path.join(upload.policy_queue.path, upload.changes.changesname)
+ changes_contents = open(changes_path, 'r').read()
+
+ bcc = 'X-DAK: dak process-policy'
+ if 'Dinstall::Bcc' in cnf:
+ bcc = '{0}\nBcc: {1}'.format(bcc, cnf['Dinstall::Bcc'])
- print " REJECT"
- Logger.log(["Policy Queue REJECT: %s: %s" % (srcqueue.queue_name, u.pkg.changes_file)])
+ subst = {
+ '__DISTRO__': cnf['Dinstall::MyDistribution'],
+ '__ADMIN_ADDRESS__': cnf['Dinstall::MyAdminAddress'],
+ '__CHANGES_FILENAME__': upload.changes.changesname,
+ '__SOURCE__': upload.changes.source,
+ '__VERSION__': upload.changes.version,
+ '__ARCHITECTURE__': upload.changes.architecture,
+ '__MAINTAINER__': maintainer_field,
+ '__MAINTAINER_FROM__': fix_maintainer(maintainer_field)[1],
+ '__MAINTAINER_TO__': ", ".join(addresses),
+ '__CC__': 'X-DAK-Rejection: manual or automatic',
+ '__REJECTOR_ADDRESS__': cnf['Dinstall::MyEmailAddress'],
+ '__BCC__': bcc,
+ '__BUG_SERVER__': cnf.get('Dinstall::BugServer'),
+ '__FILE_CONTENTS__': changes_contents,
+ }
+
+ override_maintainer = cnf.get('Dinstall::OverrideMaintainer')
+ if override_maintainer:
+ subst['__MAINTAINER_TO__'] = override_maintainer
+
+ return subst
+
+################################################################################
+
+def remove_unreferenced_binaries(policy_queue, transaction):
+ """Remove binaries that are no longer referenced by an upload
+
+ @type policy_queue: L{daklib.dbconn.PolicyQueue}
+
+ @type transaction: L{daklib.archive.ArchiveTransaction}
+ """
+ session = transaction.session
+ suite = policy_queue.suite
+
+ query = """
+ SELECT b.*
+ FROM binaries b
+ JOIN bin_associations ba ON b.id = ba.bin
+ WHERE ba.suite = :suite_id
+ AND NOT EXISTS (SELECT 1 FROM policy_queue_upload_binaries_map pqubm
+ JOIN policy_queue_upload pqu ON pqubm.policy_queue_upload_id = pqu.id
+ WHERE pqu.policy_queue_id = :policy_queue_id
+ AND pqubm.binary_id = b.id)"""
+ binaries = session.query(DBBinary).from_statement(query) \
+ .params({'suite_id': policy_queue.suite_id, 'policy_queue_id': policy_queue.policy_queue_id})
+
+ for binary in binaries:
+ Logger.log(["removed binary from policy queue", policy_queue.queue_name, binary.package, binary.version])
+ transaction.remove_binary(binary, suite)
+
+def remove_unreferenced_sources(policy_queue, transaction):
+ """Remove sources that are no longer referenced by an upload or a binary
+
+ @type policy_queue: L{daklib.dbconn.PolicyQueue}
+
+ @type transaction: L{daklib.archive.ArchiveTransaction}
+ """
+ session = transaction.session
+ suite = policy_queue.suite
+
+ query = """
+ SELECT s.*
+ FROM source s
+ JOIN src_associations sa ON s.id = sa.source
+ WHERE sa.suite = :suite_id
+ AND NOT EXISTS (SELECT 1 FROM policy_queue_upload pqu
+ WHERE pqu.policy_queue_id = :policy_queue_id
+ AND pqu.source_id = s.id)
+ AND NOT EXISTS (SELECT 1 FROM binaries b
+ JOIN bin_associations ba ON b.id = ba.bin
+ WHERE b.source = s.id
+ AND ba.suite = :suite_id)"""
+ sources = session.query(DBSource).from_statement(query) \
+ .params({'suite_id': policy_queue.suite_id, 'policy_queue_id': policy_queue.policy_queue_id})
+
+ for source in sources:
+ Logger.log(["removed source from policy queue", policy_queue.queue_name, source.source, source.version])
+ transaction.remove_source(source, suite)
################################################################################
if Options["Help"]:
usage()
+ Logger = daklog.Logger("process-policy")
if not Options["No-Action"]:
+ urgencylog = UrgencyLog()
+
+ with ArchiveTransaction() as transaction:
+ session = transaction.session
try:
- Logger = daklog.Logger("process-policy")
- except CantOpenError as e:
- Logger = None
+ pq = session.query(PolicyQueue).filter_by(queue_name=queue_name).one()
+ except NoResultFound:
+ print "E: Cannot find policy queue %s" % queue_name
+ sys.exit(1)
- # Find policy queue
- session.query(PolicyQueue)
+ commentsdir = os.path.join(pq.path, 'COMMENTS')
+ # The comments stuff relies on being in the right directory
+ os.chdir(pq.path)
- try:
- pq = session.query(PolicyQueue).filter_by(queue_name=queue_name).one()
- except NoResultFound:
- print "E: Cannot find policy queue %s" % queue_name
- sys.exit(1)
+ do_comments(commentsdir, pq, "ACCEPT.", "ACCEPTED.", "OK", comment_accept, transaction)
+ do_comments(commentsdir, pq, "ACCEPTED.", "ACCEPTED.", "OK", comment_accept, transaction)
+ do_comments(commentsdir, pq, "REJECT.", "REJECTED.", "NOTOK", comment_reject, transaction)
- commentsdir = os.path.join(pq.path, 'COMMENTS')
- # The comments stuff relies on being in the right directory
- os.chdir(pq.path)
- do_comments(commentsdir, pq, "ACCEPT.", "ACCEPTED.", "OK", comment_accept, session)
- do_comments(commentsdir, pq, "ACCEPTED.", "ACCEPTED.", "OK", comment_accept, session)
- do_comments(commentsdir, pq, "REJECT.", "REJECTED.", "NOTOK", comment_reject, session)
+ remove_unreferenced_binaries(pq, transaction)
+ remove_unreferenced_sources(pq, transaction)
+ if not Options['No-Action']:
+ urgencylog.close()
################################################################################