From 446baea9b316b0d00ed947280ac712d70bbb7443 Mon Sep 17 00:00:00 2001 From: Tomasz Kramkowski Date: Sun, 18 Dec 2016 21:31:26 +0000 Subject: Init commit --- dmarc.py | 170 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 170 insertions(+) create mode 100644 dmarc.py (limited to 'dmarc.py') diff --git a/dmarc.py b/dmarc.py new file mode 100644 index 0000000..1ff327c --- /dev/null +++ b/dmarc.py @@ -0,0 +1,170 @@ +from collections import namedtuple +from defusedxml.ElementTree import fromstring as xmlparse + +class MalformedReportException(Exception): + pass + +def _text(elem, not_missing=True): + if elem is None: + if not_missing == True: + raise MalformedReportException('element missing') + return None + if not elem.text: + return '' + return elem.text + +def _integer(elem, not_missing=True): + text = _text(elem, not_missing) + try: + return int(text) + except ValueError: + raise MalformedReportException("integer element invalid: '{}'".format(text)) + +def _enum(elem, values, not_missing=True): + text = _text(elem, not_missing) + if not text: + return None + if text not in values: + raise MalformedReportException("enum element invalid: '{}' expected: '{}'".format(text, values)) + return text + +def _feedback(root): + Edisposition = ['none', 'quarantine', 'reject'] + + def reportmeta(root): + def daterange(root): + begin = _integer(root.find('begin')) + end = _integer(root.find('end')) + if not begin or not end: + return MalformedReportException('date range begin or end missing') + + nt = namedtuple('DateRangeType', ['begin', 'end']) + return nt(begin, end) + + org = _text(root.find('org_name')) + email = _text(root.find('email')) + extra = _text(root.find('extra_contact_info'), not_missing=False) + rid = _text(root.find('report_id')) + drange = daterange(root.find('date_range')) + errors = root.findall('error') + if errors: + errors = map(_text, errors) + + nt = namedtuple('ReportMetadataType', ['org_name', 'email', 'extra_contact_info', + 'report_id', 'date_range', 'error']) + return nt(org, email, extra, rid, drange, errors) + + def policypub(root): + Ealignment = ['r', 's'] + + domain = _text(root.find('domain')) + adkim = _enum(root.find('adkim'), Ealignment, not_missing=False) + aspf = _enum(root.find('aspf'), Ealignment, not_missing=False) + p = _enum(root.find('p'), Edisposition) + sp = _enum(root.find('sp'), Edisposition) + pct = _integer(root.find('pct')) + + nt = namedtuple('PolicyPublishedType', ['domain', 'adkim', 'aspf', 'p', 'sp', + 'pct']) + return nt(domain, adkim, aspf, p, sp, pct) + + def record(root): + def row(root): + def policyevaluated(root): + def polreason(root): + Etype = ['forwarded', 'sampled_out', 'trusted_forwarder', + 'mailing_list', 'local_policy', 'other'] + + otype = _enum(root.find('type'), Etype) + comm = _text(root.find('comment'), not_missing=False) + + nt = namedtuple('PolicyOverrideReason', ['type', 'comment']) + return nt(otype, comm) + + Eresult = ['pass', 'fail'] + disp = _enum(root.find('disposition'), Edisposition) + dkim = _enum(root.find('dkim'), Eresult) + spf = _enum(root.find('spf'), Eresult) + reason = root.findall('reason') + if reason: + reason = map(polreason, reason) + + nt = namedtuple('PolicyEvaluatedType', ['disposition', 'dkim', 'spf', + 'reason']) + return nt(disp, dkim, spf, reason) + + source = _text(root.find('source_ip')) + count = _integer(root.find('count')) + poleval = root.find('policy_evaluated') + if not poleval: + raise MalformedReportException('policy_evaluated missing') + + nt = namedtuple('RowType', ['source_ip', 'count', 'policy_evaluated']) + return nt(source, count, policyevaluated(poleval)) + + def identifier(root): + envto = _text(root.find('envelope_to'), not_missing=False) + hdrfrom = _text(root.find('header_from')) + + nt = namedtuple('IdentifierType', ['envelope_to', 'header_from']) + return nt(envto, hdrfrom) + + def authresult(root): + def dkimauth(root): + Eresult = ['none', 'pass', 'fail', 'policy', 'neutral', 'temperror', + 'permerror'] + domain = _text(root.find('domain')) + selector = _text(root.find('selector'), not_missing=False) + result = _enum(root.find('result'), Eresult) + human_result = _text(root.find('human_result'), not_missing=False) + nt = namedtuple('DKIMAuthResultType', ['domain', 'selector', 'result', + 'human_result']) + return nt(domain, selector, result, human_result) + + def spfauth(root): + Escope = ['helo', 'mfrom'] + Eresult = ['none', 'neutral', 'pass', 'fail', 'softfail', 'temperror', + 'permerror'] + + domain = _text(root.find('domain')) + result = _enum(root.find('result'), Eresult) + + nt = namedtuple('SPFAuthResultType', ['domain', 'result']) + return nt(domain, result) + + dkims = root.findall('dkim') + spfs = root.findall('spf') + if not spfs: + raise MalformedReportException('auth_results.spf missing') + else: + spfs = map(spfauth, spfs) + if dkims: + dkims = map(dkimauth, dkims) + + nt = namedtuple('AuthResultType', ['dkim', 'spf']) + return nt([*dkims], [*spfs]) + + r = root.find('row') + idents = root.find('identifiers') + results = root.find('auth_results') + if not r or not idents or not results: + raise MalformedReportException('row, identifiers or auth_results missing') + + nt = namedtuple('RecordType', ['row', 'identifiers', 'auth_results']) + return nt(row(r), identifier(idents), authresult(results)) + + meta = root.find('report_metadata') + pol = root.find('policy_published') + recs = root.findall('record') + if meta is None or pol is None or not recs: + raise MalformedReportException('report_metadata, policy_published or record missing') + + nt = namedtuple('feedback', ['report_metadata', 'policy_published', 'record']) + return nt(reportmeta(meta), policypub(pol), list(map(record, recs))) + +def parse_dmarc(r): + root = xmlparse(r) + if root.tag != 'feedback': + raise MalformedReportException('root is not feedback') + + return _feedback(root) -- cgit v1.2.3-54-g00ecf