"""Parse DMARC aggregate feedback reports (XML) into nested namedtuples.

The public entry point is :func:`parse_dmarc`.  Every structural problem in a
report is signalled with :class:`MalformedReportException`.
"""
from collections import namedtuple

from defusedxml.ElementTree import fromstring as xmlparse


class MalformedReportException(Exception):
    """Raised when a report lacks a required element or holds an invalid value."""


# Result containers.  They are created once at import time instead of on every
# parse, so all parsed reports share the same classes.  Type names and field
# names are unchanged.
DateRangeType = namedtuple('DateRangeType', ['begin', 'end'])
ReportMetadataType = namedtuple(
    'ReportMetadataType',
    ['org_name', 'email', 'extra_contact_info', 'report_id', 'date_range',
     'error'])
PolicyPublishedType = namedtuple(
    'PolicyPublishedType', ['domain', 'adkim', 'aspf', 'p', 'sp', 'pct'])
PolicyOverrideReason = namedtuple('PolicyOverrideReason', ['type', 'comment'])
PolicyEvaluatedType = namedtuple(
    'PolicyEvaluatedType', ['disposition', 'dkim', 'spf', 'reason'])
RowType = namedtuple('RowType', ['source_ip', 'count', 'policy_evaluated'])
IdentifierType = namedtuple('IdentifierType', ['envelope_to', 'header_from'])
DKIMAuthResultType = namedtuple(
    'DKIMAuthResultType', ['domain', 'selector', 'result', 'human_result'])
SPFAuthResultType = namedtuple('SPFAuthResultType', ['domain', 'result'])
AuthResultType = namedtuple('AuthResultType', ['dkim', 'spf'])
RecordType = namedtuple('RecordType', ['row', 'identifiers', 'auth_results'])
Feedback = namedtuple(
    'feedback', ['report_metadata', 'policy_published', 'record'])

# Allowed values for enumerated elements.  These stay lists (not tuples or
# sets) because their repr is embedded in the "enum element invalid" message.
_DISPOSITIONS = ['none', 'quarantine', 'reject']
_ALIGNMENTS = ['r', 's']
_POLICY_RESULTS = ['pass', 'fail']
_OVERRIDE_TYPES = ['forwarded', 'sampled_out', 'trusted_forwarder',
                   'mailing_list', 'local_policy', 'other']
_DKIM_RESULTS = ['none', 'pass', 'fail', 'policy', 'neutral', 'temperror',
                 'permerror']
_SPF_RESULTS = ['none', 'neutral', 'pass', 'fail', 'softfail', 'temperror',
                'permerror']


def _text(root, name, mandatory=True, default=None):
    """Return the text of the first ``name`` child of ``root``.

    A child that is absent or has empty text counts as missing.

    Args:
        root: Element to search.
        name: Tag (or path) handed to ``root.find``.
        mandatory: When true, a missing element is an error.
        default: Value returned for a missing, non-mandatory element.

    Raises:
        MalformedReportException: The element is missing and mandatory.
    """
    elem = root.find(name)
    if elem is not None and elem.text:
        return elem.text
    if mandatory:
        raise MalformedReportException('element missing: {}'.format(name))
    return default


def _integer(root, name, mandatory=True, default=None):
    """Return the ``name`` child of ``root`` converted with ``int``.

    A missing optional element yields ``default``; a ``default`` other than
    ``None`` is itself passed through ``int``.

    Raises:
        MalformedReportException: The element is missing and mandatory, or
            its text is not a valid integer.
    """
    text = _text(root, name, mandatory, default)
    if text is None:
        # Optional element absent and no default given: int(None) would raise
        # an unrelated TypeError, so report "no value" instead.
        return None
    try:
        return int(text)
    except ValueError as exc:
        raise MalformedReportException(
            "integer element invalid: {} - '{}'".format(name, text)) from exc


def _enum(root, name, values, mandatory=True, default=None):
    """Return the ``name`` child of ``root`` if its text is one of ``values``.

    Returns ``None`` when the element is missing (and not mandatory) and the
    default is empty.  A non-empty ``default`` is validated like element text.

    Raises:
        MalformedReportException: The element is missing and mandatory, or
            its text is not contained in ``values``.
    """
    text = _text(root, name, mandatory, default)
    if not text:
        return None
    if text not in values:
        raise MalformedReportException(
            "enum element invalid: {} - '{}' expected: '{}'".format(
                name, text, values))
    return text


def _date_range(root):
    """Parse a ``date_range`` element into a ``DateRangeType``."""
    begin = _integer(root, 'begin')
    end = _integer(root, 'end')
    return DateRangeType(begin, end)


def _report_metadata(root):
    """Parse a ``report_metadata`` element into a ``ReportMetadataType``.

    The ``error`` field is a list holding the text of each ``error`` child
    (empty string for an empty element); it is an empty list when there are
    none.

    Raises:
        MalformedReportException: A required child, including ``date_range``,
            is missing or invalid.
    """
    org = _text(root, 'org_name')
    email = _text(root, 'email')
    extra = _text(root, 'extra_contact_info', mandatory=False)
    rid = _text(root, 'report_id')
    date_range = root.find('date_range')
    if date_range is None:
        # Without this check the None would surface as an AttributeError.
        raise MalformedReportException('element missing: date_range')
    drange = _date_range(date_range)
    # Each <error> element carries its message as its own text.
    errors = [err.text or '' for err in root.findall('error')]
    return ReportMetadataType(org, email, extra, rid, drange, errors)


def _policy_published(root):
    """Parse a ``policy_published`` element into a ``PolicyPublishedType``.

    ``adkim`` and ``aspf`` are optional; ``domain``, ``p``, ``sp`` and ``pct``
    are required.
    """
    domain = _text(root, 'domain')
    adkim = _enum(root, 'adkim', _ALIGNMENTS, mandatory=False)
    aspf = _enum(root, 'aspf', _ALIGNMENTS, mandatory=False)
    p = _enum(root, 'p', _DISPOSITIONS)
    sp = _enum(root, 'sp', _DISPOSITIONS)
    pct = _integer(root, 'pct')
    return PolicyPublishedType(domain, adkim, aspf, p, sp, pct)


def _policy_override_reason(root):
    """Parse a ``reason`` element into a ``PolicyOverrideReason``."""
    otype = _enum(root, 'type', _OVERRIDE_TYPES)
    comment = _text(root, 'comment', mandatory=False)
    return PolicyOverrideReason(otype, comment)


def _policy_evaluated(root):
    """Parse a ``policy_evaluated`` element into a ``PolicyEvaluatedType``.

    The ``reason`` field is always a list (possibly empty) and is built
    eagerly, so an invalid ``reason`` child fails here rather than later when
    the result is read.
    """
    disp = _enum(root, 'disposition', _DISPOSITIONS)
    dkim = _enum(root, 'dkim', _POLICY_RESULTS)
    spf = _enum(root, 'spf', _POLICY_RESULTS)
    reasons = [_policy_override_reason(elem)
               for elem in root.findall('reason')]
    return PolicyEvaluatedType(disp, dkim, spf, reasons)


def _row(root):
    """Parse a ``row`` element into a ``RowType``.

    Raises:
        MalformedReportException: ``source_ip``, ``count`` or
            ``policy_evaluated`` is missing or invalid.
    """
    source = _text(root, 'source_ip')
    count = _integer(root, 'count')
    poleval = root.find('policy_evaluated')
    # Compare with None: the truth value of an Element reflects whether it has
    # children, not whether it was found.
    if poleval is None:
        raise MalformedReportException('policy_evaluated missing')
    return RowType(source, count, _policy_evaluated(poleval))


def _identifiers(root):
    """Parse an ``identifiers`` element into an ``IdentifierType``."""
    envto = _text(root, 'envelope_to', mandatory=False)
    hdrfrom = _text(root, 'header_from')
    return IdentifierType(envto, hdrfrom)


def _dkim_auth(root):
    """Parse a ``dkim`` auth result into a ``DKIMAuthResultType``."""
    domain = _text(root, 'domain')
    selector = _text(root, 'selector', mandatory=False)
    result = _enum(root, 'result', _DKIM_RESULTS)
    human_result = _text(root, 'human_result', mandatory=False)
    return DKIMAuthResultType(domain, selector, result, human_result)


def _spf_auth(root):
    """Parse an ``spf`` auth result into an ``SPFAuthResultType``.

    The ``scope`` element (helo/mfrom) is intentionally not read; the code
    this replaces had it commented out and marked as deprecated.
    """
    domain = _text(root, 'domain')
    result = _enum(root, 'result', _SPF_RESULTS)
    return SPFAuthResultType(domain, result)


def _auth_results(root):
    """Parse an ``auth_results`` element into an ``AuthResultType``.

    Both fields are lists.  At least one ``spf`` child is required; ``dkim``
    children are optional.

    Raises:
        MalformedReportException: No ``spf`` child exists, or a ``dkim`` or
            ``spf`` child is invalid.
    """
    spf_elems = root.findall('spf')
    if not spf_elems:
        raise MalformedReportException('auth_results.spf missing')
    dkims = [_dkim_auth(elem) for elem in root.findall('dkim')]
    spfs = [_spf_auth(elem) for elem in spf_elems]
    return AuthResultType(dkims, spfs)


def _record(root):
    """Parse a ``record`` element into a ``RecordType``.

    Raises:
        MalformedReportException: ``row``, ``identifiers`` or ``auth_results``
            is missing, or any of them fails to parse.
    """
    row = root.find('row')
    idents = root.find('identifiers')
    results = root.find('auth_results')
    # Explicit None checks; see _row for why truthiness is not used.
    if row is None or idents is None or results is None:
        raise MalformedReportException(
            'row, identifiers or auth_results missing')
    return RecordType(_row(row), _identifiers(idents), _auth_results(results))


def _feedback(root):
    """Parse the ``feedback`` root element into a ``feedback`` namedtuple.

    Args:
        root: The already-parsed root element of the report.

    Returns:
        A namedtuple with ``report_metadata``, ``policy_published`` and
        ``record`` (a non-empty list of ``RecordType``).

    Raises:
        MalformedReportException: A required section is missing or any nested
            element is invalid.
    """
    meta = root.find('report_metadata')
    pol = root.find('policy_published')
    recs = root.findall('record')
    if meta is None or pol is None or not recs:
        raise MalformedReportException(
            'report_metadata, policy_published or record missing')
    return Feedback(
        _report_metadata(meta),
        _policy_published(pol),
        [_record(rec) for rec in recs])


def parse_dmarc(r):
    """Parse a DMARC aggregate report and return it as nested namedtuples.

    Args:
        r: XML source, passed unchanged to ``defusedxml``'s ``fromstring``.

    Returns:
        The ``feedback`` namedtuple produced by :func:`_feedback`.

    Raises:
        MalformedReportException: The root element is not ``feedback`` or the
            document does not have the expected structure.

    Errors raised by the XML parser itself are not caught here and propagate
    to the caller.
    """
    root = xmlparse(r)
    if root.tag != 'feedback':
        raise MalformedReportException('root is not feedback')
    return _feedback(root)