From 889116189eedbfcfecee16ee25a0ed9c86565da9 Mon Sep 17 00:00:00 2001 From: Tomasz Kramkowski Date: Sun, 9 Apr 2017 20:48:08 +0200 Subject: dmarc: clean up and improve error reporting --- dmarc.py | 81 ++++++++++++++++++++++++++++++++-------------------------------- 1 file changed, 40 insertions(+), 41 deletions(-) (limited to 'dmarc.py') diff --git a/dmarc.py b/dmarc.py index 1ff327c..3543a2c 100644 --- a/dmarc.py +++ b/dmarc.py @@ -4,28 +4,29 @@ from defusedxml.ElementTree import fromstring as xmlparse class MalformedReportException(Exception): pass -def _text(elem, not_missing=True): - if elem is None: - if not_missing == True: - raise MalformedReportException('element missing') - return None +def _text(root, name, mandatory=True, default=None): + elem = root.find(name) + if elem is None or not elem.text: + if mandatory == True: + raise MalformedReportException('element missing: {}'.format(name)) + return default if not elem.text: return '' return elem.text -def _integer(elem, not_missing=True): - text = _text(elem, not_missing) +def _integer(root, name, mandatory=True, default=None): + text = _text(root, name, mandatory, default) try: return int(text) except ValueError: - raise MalformedReportException("integer element invalid: '{}'".format(text)) + raise MalformedReportException("integer element invalid: {} - '{}'".format(name, text)) -def _enum(elem, values, not_missing=True): - text = _text(elem, not_missing) +def _enum(root, name, values, mandatory=True, default=None): + text = _text(root, name, mandatory, default) if not text: return None if text not in values: - raise MalformedReportException("enum element invalid: '{}' expected: '{}'".format(text, values)) + raise MalformedReportException("enum element invalid: {} - '{}' expected: '{}'".format(name, text, values)) return text def _feedback(root): @@ -33,18 +34,16 @@ def _feedback(root): def reportmeta(root): def daterange(root): - begin = _integer(root.find('begin')) - end = _integer(root.find('end')) - if not begin or not end: - return MalformedReportException('date range begin or end missing') + begin = _integer(root, 'begin') + end = _integer(root, 'end') nt = namedtuple('DateRangeType', ['begin', 'end']) return nt(begin, end) - org = _text(root.find('org_name')) - email = _text(root.find('email')) - extra = _text(root.find('extra_contact_info'), not_missing=False) - rid = _text(root.find('report_id')) + org = _text(root, 'org_name') + email = _text(root, 'email') + extra = _text(root, 'extra_contact_info', mandatory=False) + rid = _text(root, 'report_id') drange = daterange(root.find('date_range')) errors = root.findall('error') if errors: @@ -57,12 +56,12 @@ def _feedback(root): def policypub(root): Ealignment = ['r', 's'] - domain = _text(root.find('domain')) - adkim = _enum(root.find('adkim'), Ealignment, not_missing=False) - aspf = _enum(root.find('aspf'), Ealignment, not_missing=False) - p = _enum(root.find('p'), Edisposition) - sp = _enum(root.find('sp'), Edisposition) - pct = _integer(root.find('pct')) + domain = _text(root, 'domain') + adkim = _enum(root, 'adkim', Ealignment, mandatory=False) + aspf = _enum(root, 'aspf', Ealignment, mandatory=False) + p = _enum(root, 'p', Edisposition) + sp = _enum(root, 'sp', Edisposition) + pct = _integer(root, 'pct') nt = namedtuple('PolicyPublishedType', ['domain', 'adkim', 'aspf', 'p', 'sp', 'pct']) @@ -75,16 +74,16 @@ def _feedback(root): Etype = ['forwarded', 'sampled_out', 'trusted_forwarder', 'mailing_list', 'local_policy', 'other'] - otype = _enum(root.find('type'), Etype) - comm = _text(root.find('comment'), not_missing=False) + otype = _enum(root, 'type', Etype) + comm = _text(root, 'comment', mandatory=False) nt = namedtuple('PolicyOverrideReason', ['type', 'comment']) return nt(otype, comm) Eresult = ['pass', 'fail'] - disp = _enum(root.find('disposition'), Edisposition) - dkim = _enum(root.find('dkim'), Eresult) - spf = _enum(root.find('spf'), Eresult) + disp = _enum(root, 'disposition', Edisposition) + dkim = _enum(root, 'dkim', Eresult) + spf = _enum(root, 'spf', Eresult) reason = root.findall('reason') if reason: reason = map(polreason, reason) @@ -93,8 +92,8 @@ def _feedback(root): 'reason']) return nt(disp, dkim, spf, reason) - source = _text(root.find('source_ip')) - count = _integer(root.find('count')) + source = _text(root, 'source_ip') + count = _integer(root, 'count') poleval = root.find('policy_evaluated') if not poleval: raise MalformedReportException('policy_evaluated missing') @@ -103,8 +102,8 @@ def _feedback(root): return nt(source, count, policyevaluated(poleval)) def identifier(root): - envto = _text(root.find('envelope_to'), not_missing=False) - hdrfrom = _text(root.find('header_from')) + envto = _text(root, 'envelope_to', mandatory=False) + hdrfrom = _text(root, 'header_from') nt = namedtuple('IdentifierType', ['envelope_to', 'header_from']) return nt(envto, hdrfrom) @@ -113,21 +112,21 @@ def _feedback(root): def dkimauth(root): Eresult = ['none', 'pass', 'fail', 'policy', 'neutral', 'temperror', 'permerror'] - domain = _text(root.find('domain')) - selector = _text(root.find('selector'), not_missing=False) - result = _enum(root.find('result'), Eresult) - human_result = _text(root.find('human_result'), not_missing=False) + domain = _text(root, 'domain') + selector = _text(root, 'selector', mandatory=False) + result = _enum(root, 'result', Eresult) + human_result = _text(root, 'human_result', mandatory=False) nt = namedtuple('DKIMAuthResultType', ['domain', 'selector', 'result', 'human_result']) return nt(domain, selector, result, human_result) def spfauth(root): - Escope = ['helo', 'mfrom'] + # Escope = ['helo', 'mfrom'] #Deprecated Eresult = ['none', 'neutral', 'pass', 'fail', 'softfail', 'temperror', 'permerror'] - domain = _text(root.find('domain')) - result = _enum(root.find('result'), Eresult) + domain = _text(root, 'domain') + result = _enum(root, 'result', Eresult) nt = namedtuple('SPFAuthResultType', ['domain', 'result']) return nt(domain, result) -- cgit v1.2.3-54-g00ecf