from collections import namedtuple from defusedxml.ElementTree import fromstring as xmlparse class MalformedReportException(Exception): pass def _text(elem, not_missing=True): if elem is None: if not_missing == True: raise MalformedReportException('element missing') return None if not elem.text: return '' return elem.text def _integer(elem, not_missing=True): text = _text(elem, not_missing) try: return int(text) except ValueError: raise MalformedReportException("integer element invalid: '{}'".format(text)) def _enum(elem, values, not_missing=True): text = _text(elem, not_missing) if not text: return None if text not in values: raise MalformedReportException("enum element invalid: '{}' expected: '{}'".format(text, values)) return text def _feedback(root): Edisposition = ['none', 'quarantine', 'reject'] def reportmeta(root): def daterange(root): begin = _integer(root.find('begin')) end = _integer(root.find('end')) if not begin or not end: return MalformedReportException('date range begin or end missing') nt = namedtuple('DateRangeType', ['begin', 'end']) return nt(begin, end) org = _text(root.find('org_name')) email = _text(root.find('email')) extra = _text(root.find('extra_contact_info'), not_missing=False) rid = _text(root.find('report_id')) drange = daterange(root.find('date_range')) errors = root.findall('error') if errors: errors = map(_text, errors) nt = namedtuple('ReportMetadataType', ['org_name', 'email', 'extra_contact_info', 'report_id', 'date_range', 'error']) return nt(org, email, extra, rid, drange, errors) def policypub(root): Ealignment = ['r', 's'] domain = _text(root.find('domain')) adkim = _enum(root.find('adkim'), Ealignment, not_missing=False) aspf = _enum(root.find('aspf'), Ealignment, not_missing=False) p = _enum(root.find('p'), Edisposition) sp = _enum(root.find('sp'), Edisposition) pct = _integer(root.find('pct')) nt = namedtuple('PolicyPublishedType', ['domain', 'adkim', 'aspf', 'p', 'sp', 'pct']) return nt(domain, adkim, aspf, p, sp, pct) def record(root): def row(root): def policyevaluated(root): def polreason(root): Etype = ['forwarded', 'sampled_out', 'trusted_forwarder', 'mailing_list', 'local_policy', 'other'] otype = _enum(root.find('type'), Etype) comm = _text(root.find('comment'), not_missing=False) nt = namedtuple('PolicyOverrideReason', ['type', 'comment']) return nt(otype, comm) Eresult = ['pass', 'fail'] disp = _enum(root.find('disposition'), Edisposition) dkim = _enum(root.find('dkim'), Eresult) spf = _enum(root.find('spf'), Eresult) reason = root.findall('reason') if reason: reason = map(polreason, reason) nt = namedtuple('PolicyEvaluatedType', ['disposition', 'dkim', 'spf', 'reason']) return nt(disp, dkim, spf, reason) source = _text(root.find('source_ip')) count = _integer(root.find('count')) poleval = root.find('policy_evaluated') if not poleval: raise MalformedReportException('policy_evaluated missing') nt = namedtuple('RowType', ['source_ip', 'count', 'policy_evaluated']) return nt(source, count, policyevaluated(poleval)) def identifier(root): envto = _text(root.find('envelope_to'), not_missing=False) hdrfrom = _text(root.find('header_from')) nt = namedtuple('IdentifierType', ['envelope_to', 'header_from']) return nt(envto, hdrfrom) def authresult(root): def dkimauth(root): Eresult = ['none', 'pass', 'fail', 'policy', 'neutral', 'temperror', 'permerror'] domain = _text(root.find('domain')) selector = _text(root.find('selector'), not_missing=False) result = _enum(root.find('result'), Eresult) human_result = _text(root.find('human_result'), not_missing=False) nt = namedtuple('DKIMAuthResultType', ['domain', 'selector', 'result', 'human_result']) return nt(domain, selector, result, human_result) def spfauth(root): Escope = ['helo', 'mfrom'] Eresult = ['none', 'neutral', 'pass', 'fail', 'softfail', 'temperror', 'permerror'] domain = _text(root.find('domain')) result = _enum(root.find('result'), Eresult) nt = namedtuple('SPFAuthResultType', ['domain', 'result']) return nt(domain, result) dkims = root.findall('dkim') spfs = root.findall('spf') if not spfs: raise MalformedReportException('auth_results.spf missing') else: spfs = map(spfauth, spfs) if dkims: dkims = map(dkimauth, dkims) nt = namedtuple('AuthResultType', ['dkim', 'spf']) return nt([*dkims], [*spfs]) r = root.find('row') idents = root.find('identifiers') results = root.find('auth_results') if not r or not idents or not results: raise MalformedReportException('row, identifiers or auth_results missing') nt = namedtuple('RecordType', ['row', 'identifiers', 'auth_results']) return nt(row(r), identifier(idents), authresult(results)) meta = root.find('report_metadata') pol = root.find('policy_published') recs = root.findall('record') if meta is None or pol is None or not recs: raise MalformedReportException('report_metadata, policy_published or record missing') nt = namedtuple('feedback', ['report_metadata', 'policy_published', 'record']) return nt(reportmeta(meta), policypub(pol), list(map(record, recs))) def parse_dmarc(r): root = xmlparse(r) if root.tag != 'feedback': raise MalformedReportException('root is not feedback') return _feedback(root)