diff options
author | Tomasz Kramkowski <tk@the-tk.com> | 2016-12-18 21:31:26 +0000 |
---|---|---|
committer | Tomasz Kramkowski <tk@the-tk.com> | 2016-12-18 21:31:26 +0000 |
commit | 446baea9b316b0d00ed947280ac712d70bbb7443 (patch) | |
tree | d724a55f50cf5c0ff6bb8523074e1a73db80d957 | |
download | dmarcpipe-446baea9b316b0d00ed947280ac712d70bbb7443.tar.gz dmarcpipe-446baea9b316b0d00ed947280ac712d70bbb7443.tar.xz dmarcpipe-446baea9b316b0d00ed947280ac712d70bbb7443.zip |
Init commit
-rw-r--r-- | .gitignore | 1 | ||||
-rw-r--r-- | dmarc.py | 170 | ||||
-rwxr-xr-x | dmarcpipe.py | 92 | ||||
-rw-r--r-- | sql.py | 144 |
4 files changed, 407 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..bee8a64 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +__pycache__ diff --git a/dmarc.py b/dmarc.py new file mode 100644 index 0000000..1ff327c --- /dev/null +++ b/dmarc.py @@ -0,0 +1,170 @@ +from collections import namedtuple +from defusedxml.ElementTree import fromstring as xmlparse + +class MalformedReportException(Exception): + pass + +def _text(elem, not_missing=True): + if elem is None: + if not_missing == True: + raise MalformedReportException('element missing') + return None + if not elem.text: + return '' + return elem.text + +def _integer(elem, not_missing=True): + text = _text(elem, not_missing) + try: + return int(text) + except ValueError: + raise MalformedReportException("integer element invalid: '{}'".format(text)) + +def _enum(elem, values, not_missing=True): + text = _text(elem, not_missing) + if not text: + return None + if text not in values: + raise MalformedReportException("enum element invalid: '{}' expected: '{}'".format(text, values)) + return text + +def _feedback(root): + Edisposition = ['none', 'quarantine', 'reject'] + + def reportmeta(root): + def daterange(root): + begin = _integer(root.find('begin')) + end = _integer(root.find('end')) + if not begin or not end: + return MalformedReportException('date range begin or end missing') + + nt = namedtuple('DateRangeType', ['begin', 'end']) + return nt(begin, end) + + org = _text(root.find('org_name')) + email = _text(root.find('email')) + extra = _text(root.find('extra_contact_info'), not_missing=False) + rid = _text(root.find('report_id')) + drange = daterange(root.find('date_range')) + errors = root.findall('error') + if errors: + errors = map(_text, errors) + + nt = namedtuple('ReportMetadataType', ['org_name', 'email', 'extra_contact_info', + 'report_id', 'date_range', 'error']) + return nt(org, email, extra, rid, drange, errors) + + def policypub(root): + Ealignment = ['r', 's'] + + domain = _text(root.find('domain')) + adkim = _enum(root.find('adkim'), Ealignment, not_missing=False) + aspf = _enum(root.find('aspf'), Ealignment, not_missing=False) + p = _enum(root.find('p'), Edisposition) + sp = _enum(root.find('sp'), Edisposition) + pct = _integer(root.find('pct')) + + nt = namedtuple('PolicyPublishedType', ['domain', 'adkim', 'aspf', 'p', 'sp', + 'pct']) + return nt(domain, adkim, aspf, p, sp, pct) + + def record(root): + def row(root): + def policyevaluated(root): + def polreason(root): + Etype = ['forwarded', 'sampled_out', 'trusted_forwarder', + 'mailing_list', 'local_policy', 'other'] + + otype = _enum(root.find('type'), Etype) + comm = _text(root.find('comment'), not_missing=False) + + nt = namedtuple('PolicyOverrideReason', ['type', 'comment']) + return nt(otype, comm) + + Eresult = ['pass', 'fail'] + disp = _enum(root.find('disposition'), Edisposition) + dkim = _enum(root.find('dkim'), Eresult) + spf = _enum(root.find('spf'), Eresult) + reason = root.findall('reason') + if reason: + reason = map(polreason, reason) + + nt = namedtuple('PolicyEvaluatedType', ['disposition', 'dkim', 'spf', + 'reason']) + return nt(disp, dkim, spf, reason) + + source = _text(root.find('source_ip')) + count = _integer(root.find('count')) + poleval = root.find('policy_evaluated') + if not poleval: + raise MalformedReportException('policy_evaluated missing') + + nt = namedtuple('RowType', ['source_ip', 'count', 'policy_evaluated']) + return nt(source, count, policyevaluated(poleval)) + + def identifier(root): + envto = _text(root.find('envelope_to'), not_missing=False) + hdrfrom = _text(root.find('header_from')) + + nt = namedtuple('IdentifierType', ['envelope_to', 'header_from']) + return nt(envto, hdrfrom) + + def authresult(root): + def dkimauth(root): + Eresult = ['none', 'pass', 'fail', 'policy', 'neutral', 'temperror', + 'permerror'] + domain = _text(root.find('domain')) + selector = _text(root.find('selector'), not_missing=False) + result = _enum(root.find('result'), Eresult) + human_result = _text(root.find('human_result'), not_missing=False) + nt = namedtuple('DKIMAuthResultType', ['domain', 'selector', 'result', + 'human_result']) + return nt(domain, selector, result, human_result) + + def spfauth(root): + Escope = ['helo', 'mfrom'] + Eresult = ['none', 'neutral', 'pass', 'fail', 'softfail', 'temperror', + 'permerror'] + + domain = _text(root.find('domain')) + result = _enum(root.find('result'), Eresult) + + nt = namedtuple('SPFAuthResultType', ['domain', 'result']) + return nt(domain, result) + + dkims = root.findall('dkim') + spfs = root.findall('spf') + if not spfs: + raise MalformedReportException('auth_results.spf missing') + else: + spfs = map(spfauth, spfs) + if dkims: + dkims = map(dkimauth, dkims) + + nt = namedtuple('AuthResultType', ['dkim', 'spf']) + return nt([*dkims], [*spfs]) + + r = root.find('row') + idents = root.find('identifiers') + results = root.find('auth_results') + if not r or not idents or not results: + raise MalformedReportException('row, identifiers or auth_results missing') + + nt = namedtuple('RecordType', ['row', 'identifiers', 'auth_results']) + return nt(row(r), identifier(idents), authresult(results)) + + meta = root.find('report_metadata') + pol = root.find('policy_published') + recs = root.findall('record') + if meta is None or pol is None or not recs: + raise MalformedReportException('report_metadata, policy_published or record missing') + + nt = namedtuple('feedback', ['report_metadata', 'policy_published', 'record']) + return nt(reportmeta(meta), policypub(pol), list(map(record, recs))) + +def parse_dmarc(r): + root = xmlparse(r) + if root.tag != 'feedback': + raise MalformedReportException('root is not feedback') + + return _feedback(root) diff --git a/dmarcpipe.py b/dmarcpipe.py new file mode 100755 index 0000000..5e26d34 --- /dev/null +++ b/dmarcpipe.py @@ -0,0 +1,92 @@ +#!/usr/bin/env python3 + +from dmarc import parse_dmarc +from getopt import gnu_getopt +from gzip import decompress as gz_decompress +from io import BytesIO +from re import compile as re_compile +from sql import store_dmarc +from sys import stdin, stdout, argv +from zipfile import ZipFile +import email + +default_enc = 'US-ASCII' +policy_domain = 'the-tk.com' +re_valid_filename = re_compile('^[^\\s!]+![^\\s!]+![0-9]+![0-9]+(![^\\s!]+)?.(xml(.gz)?|zip)$') +logfile = None +dbfile = 'dmarc.db' + +class FalseReportException(Exception): + pass + +def is_valid_filename(f): + if re_valid_filename.match(f) is not None: + return True + return False + +def zip2xml(data): + z = ZipFile(BytesIO(data)) + n = z.namelist() + if len(n) != 1 or not is_valid_filename(n[0]): + raise FalseReportException('zip2xml: broken zip len({})'.format(len(n))) + return z.open(n[0]).read() + +gzip2xml = lambda data: gz_decompress(data) + +decoders = { + 'application/zip': zip2xml, + 'application/x-zip': zip2xml, + 'application/x-zip-compressed': zip2xml, + 'application/zlib': gzip2xml, + 'application/gzip': gzip2xml, + 'text/xml': lambda data: data, + } + +def process_message(m): + if m.get_content_maintype() != 'multipart' and \ + m.get_content_type() not in decoders.keys(): + raise FalseReportException('invalid content type: {}'.format(m.get_content_type())) + + att = None + inv_fn = False + + for part in m.walk(): + if part.get_content_maintype() == 'multipart': + continue + if part.get_content_type() not in decoders.keys(): + continue + if part.get('Content-Disposition') is None: + continue + if not is_valid_filename(part.get_filename()): + inv_fn = True + continue + att = part + break + + if att is None: + raise FalseReportException('attachment not found{}'.format(inv_fn == True and ' (invalid filename)' or '')) + + xml = decoders[att.get_content_type()](att.get_payload(decode=True)) + dmarc = parse_dmarc(xml) + store_dmarc(dbfile, dmarc) + +def main(): + global logfile, dbfile + opts, args = gnu_getopt(argv, 'l:') + for o in opts: + if o[0] == '-l': + logfile = o[1] + if len(args) == 2: + print(args[1]) + dbfile = args[1] + m = email.message_from_file(stdin) + try: + process_message(m) + m['X-DMARC-Report'] = 'True' + except Exception as e: + m['X-DMARC-Report'] = 'False; {}'.format(repr(e)) + finally: + stdout.write(m.as_string()) + +if __name__ == '__main__': + main() @@ -0,0 +1,144 @@ +import sqlite3 + +_schema = ''' +CREATE TABLE feedback ( + feedbackid INTEGER PRIMARY KEY AUTOINCREMENT, + rm_org_name TEXT NOT NULL, + rm_email TEXT NOT NULL, + rm_extra_contact_info TEXT, + rm_report_id TEXT UNIQUE NOT NULL, + rm_date_begin INTEGER NOT NULL, + rm_date_end INTEGER NOT NULL, + pp_domain TEXT NOT NULL, + pp_adkim TEXT, + pp_aspf TEXT, + pp_p TEXT NOT NULL, + pp_sp TEXT NOT NULL, + pp_pct TEXT NOT NULL +); + +CREATE TABLE rm_error ( + rm_errorid INTEGER PRIMARY KEY AUTOINCREMENT, + feedbackid INTEGER NOT NULL, + error TEXT NOT NULL, + FOREIGN KEY(feedbackid) REFERENCES feedback +); + +CREATE TABLE record ( + recordid INTEGER PRIMARY KEY AUTOINCREMENT, + feedbackid INTEGER NOT NULL, + row_source_ip TEXT NOT NULL, + row_count INTEGER NOT NULL, + row_pol_disposition TEXT NOT NULL, + row_pol_dkim TEXT NOT NULL, + row_pol_spf TEXT NOT NULL, + ids_envelope_to TEXT, + ids_header_from TEXT NOT NULL, + FOREIGN KEY(feedbackid) REFERENCES feedback +); + +CREATE TABLE row_pol_reason ( + row_pol_reasonid INTEGER PRIMARY KEY AUTOINCREMENT, + recordid INTEGER NOT NULL, + type TEXT NOT NULL, + comment TEXT, + FOREIGN KEY(recordid) REFERENCES record +); + +CREATE TABLE res_dkim ( + res_dkimid INTEGER PRIMARY KEY AUTOINCREMENT, + recordid INTEGER NOT NULL, + domain TEXT NOT NULL, + selector TEXT, + result TEXT NOT NULL, + human_result TEXT, + FOREIGN KEY(recordid) REFERENCES record +); + +CREATE TABLE res_spf ( + res_spfid INTEGER PRIMARY KEY AUTOINCREMENT, + recordid INTEGER NOT NULL, + domain TEXT NOT NULL, + result TEXT NOT NULL, + FOREIGN KEY(recordid) REFERENCES record +); +''' + +def _init_db(conn): + c = conn.cursor() + c.execute('PRAGMA foreign_keys = ON') + version = c.execute('PRAGMA user_version').fetchone()[0] + if version == 1: + return + c.executescript(_schema) + c.execute('PRAGMA user_version = 1') + +def _flatten(t): + for v in t: + if isinstance(v, tuple): + for vv in flatten(v): + yield vv + else: + yield v + +def _insert_dmarc(conn, dmarc): + c = conn.cursor() + c.execute('''INSERT INTO feedback (rm_org_name, rm_email, + rm_extra_contact_info, rm_report_id, rm_date_begin, rm_date_end, + pp_domain, pp_adkim, pp_aspf, pp_p, pp_sp, pp_pct) VALUES + (?,?,?,?,?,?,?,?,?,?,?,?)''', ( + dmarc.report_metadata.org_name, + dmarc.report_metadata.email, + dmarc.report_metadata.extra_contact_info, + dmarc.report_metadata.report_id, + *_flatten(dmarc.report_metadata.date_range), + *_flatten(dmarc.policy_published) + )) + feedback = c.lastrowid + c.executemany('INSERT INTO rm_error (feedbackid, error) VALUES (?,?)', ( + *(( + feedback, e + ) for e in dmarc.report_metadata.error + ), + )) + for rec in dmarc.record: + c.execute('''INSERT INTO record (feedbackid, row_source_ip, row_count, + row_pol_disposition, row_pol_dkim, row_pol_spf, ids_envelope_to, + ids_header_from) VALUES (?,?,?,?,?,?,?,?)''', ( + feedback, + rec.row.source_ip, + rec.row.count, + rec.row.policy_evaluated.disposition, + rec.row.policy_evaluated.dkim, + rec.row.policy_evaluated.spf, + *_flatten(rec.identifiers) + )) + record = c.lastrowid + c.executemany('''INSERT INTO row_pol_reason (recordid, type, comment) + VALUES (?,?,?)''', ( + *(( + record, r.type, r.comment + ) for r in rec.row.policy_evaluated.reason + ), + )) + c.executemany('''INSERT INTO res_dkim (recordid, domain, selector, + result, human_result) VALUES (?,?,?,?,?)''', ( + *(( + record, *_flatten(d) + ) for d in rec.auth_results.dkim + ), + )) + c.executemany('''INSERT INTO res_spf (recordid, domain, result) + VALUES (?,?,?)''', ( + *(( + record, *_flatten(s) + ) for s in rec.auth_results.spf + ), + )) + +def store_dmarc(dbfile, dmarc): + with sqlite3.connect(dbfile) as conn: + with conn: + _init_db(conn) + with conn: + _insert_dmarc(conn, dmarc) |