summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTomasz Kramkowski <tk@the-tk.com>2016-12-18 21:31:26 +0000
committerTomasz Kramkowski <tk@the-tk.com>2016-12-18 21:31:26 +0000
commit446baea9b316b0d00ed947280ac712d70bbb7443 (patch)
treed724a55f50cf5c0ff6bb8523074e1a73db80d957
downloaddmarcpipe-446baea9b316b0d00ed947280ac712d70bbb7443.tar.gz
dmarcpipe-446baea9b316b0d00ed947280ac712d70bbb7443.tar.xz
dmarcpipe-446baea9b316b0d00ed947280ac712d70bbb7443.zip
Init commit
-rw-r--r--.gitignore1
-rw-r--r--dmarc.py170
-rwxr-xr-xdmarcpipe.py92
-rw-r--r--sql.py144
4 files changed, 407 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..bee8a64
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+__pycache__
diff --git a/dmarc.py b/dmarc.py
new file mode 100644
index 0000000..1ff327c
--- /dev/null
+++ b/dmarc.py
@@ -0,0 +1,170 @@
+from collections import namedtuple
+from defusedxml.ElementTree import fromstring as xmlparse
+
+class MalformedReportException(Exception):
+ pass
+
+def _text(elem, not_missing=True):
+ if elem is None:
+ if not_missing == True:
+ raise MalformedReportException('element missing')
+ return None
+ if not elem.text:
+ return ''
+ return elem.text
+
+def _integer(elem, not_missing=True):
+ text = _text(elem, not_missing)
+ try:
+ return int(text)
+ except ValueError:
+ raise MalformedReportException("integer element invalid: '{}'".format(text))
+
+def _enum(elem, values, not_missing=True):
+ text = _text(elem, not_missing)
+ if not text:
+ return None
+ if text not in values:
+ raise MalformedReportException("enum element invalid: '{}' expected: '{}'".format(text, values))
+ return text
+
+def _feedback(root):
+ Edisposition = ['none', 'quarantine', 'reject']
+
+ def reportmeta(root):
+ def daterange(root):
+ begin = _integer(root.find('begin'))
+ end = _integer(root.find('end'))
+ if not begin or not end:
+ return MalformedReportException('date range begin or end missing')
+
+ nt = namedtuple('DateRangeType', ['begin', 'end'])
+ return nt(begin, end)
+
+ org = _text(root.find('org_name'))
+ email = _text(root.find('email'))
+ extra = _text(root.find('extra_contact_info'), not_missing=False)
+ rid = _text(root.find('report_id'))
+ drange = daterange(root.find('date_range'))
+ errors = root.findall('error')
+ if errors:
+ errors = map(_text, errors)
+
+ nt = namedtuple('ReportMetadataType', ['org_name', 'email', 'extra_contact_info',
+ 'report_id', 'date_range', 'error'])
+ return nt(org, email, extra, rid, drange, errors)
+
+ def policypub(root):
+ Ealignment = ['r', 's']
+
+ domain = _text(root.find('domain'))
+ adkim = _enum(root.find('adkim'), Ealignment, not_missing=False)
+ aspf = _enum(root.find('aspf'), Ealignment, not_missing=False)
+ p = _enum(root.find('p'), Edisposition)
+ sp = _enum(root.find('sp'), Edisposition)
+ pct = _integer(root.find('pct'))
+
+ nt = namedtuple('PolicyPublishedType', ['domain', 'adkim', 'aspf', 'p', 'sp',
+ 'pct'])
+ return nt(domain, adkim, aspf, p, sp, pct)
+
+ def record(root):
+ def row(root):
+ def policyevaluated(root):
+ def polreason(root):
+ Etype = ['forwarded', 'sampled_out', 'trusted_forwarder',
+ 'mailing_list', 'local_policy', 'other']
+
+ otype = _enum(root.find('type'), Etype)
+ comm = _text(root.find('comment'), not_missing=False)
+
+ nt = namedtuple('PolicyOverrideReason', ['type', 'comment'])
+ return nt(otype, comm)
+
+ Eresult = ['pass', 'fail']
+ disp = _enum(root.find('disposition'), Edisposition)
+ dkim = _enum(root.find('dkim'), Eresult)
+ spf = _enum(root.find('spf'), Eresult)
+ reason = root.findall('reason')
+ if reason:
+ reason = map(polreason, reason)
+
+ nt = namedtuple('PolicyEvaluatedType', ['disposition', 'dkim', 'spf',
+ 'reason'])
+ return nt(disp, dkim, spf, reason)
+
+ source = _text(root.find('source_ip'))
+ count = _integer(root.find('count'))
+ poleval = root.find('policy_evaluated')
+ if not poleval:
+ raise MalformedReportException('policy_evaluated missing')
+
+ nt = namedtuple('RowType', ['source_ip', 'count', 'policy_evaluated'])
+ return nt(source, count, policyevaluated(poleval))
+
+ def identifier(root):
+ envto = _text(root.find('envelope_to'), not_missing=False)
+ hdrfrom = _text(root.find('header_from'))
+
+ nt = namedtuple('IdentifierType', ['envelope_to', 'header_from'])
+ return nt(envto, hdrfrom)
+
+ def authresult(root):
+ def dkimauth(root):
+ Eresult = ['none', 'pass', 'fail', 'policy', 'neutral', 'temperror',
+ 'permerror']
+ domain = _text(root.find('domain'))
+ selector = _text(root.find('selector'), not_missing=False)
+ result = _enum(root.find('result'), Eresult)
+ human_result = _text(root.find('human_result'), not_missing=False)
+ nt = namedtuple('DKIMAuthResultType', ['domain', 'selector', 'result',
+ 'human_result'])
+ return nt(domain, selector, result, human_result)
+
+ def spfauth(root):
+ Escope = ['helo', 'mfrom']
+ Eresult = ['none', 'neutral', 'pass', 'fail', 'softfail', 'temperror',
+ 'permerror']
+
+ domain = _text(root.find('domain'))
+ result = _enum(root.find('result'), Eresult)
+
+ nt = namedtuple('SPFAuthResultType', ['domain', 'result'])
+ return nt(domain, result)
+
+ dkims = root.findall('dkim')
+ spfs = root.findall('spf')
+ if not spfs:
+ raise MalformedReportException('auth_results.spf missing')
+ else:
+ spfs = map(spfauth, spfs)
+ if dkims:
+ dkims = map(dkimauth, dkims)
+
+ nt = namedtuple('AuthResultType', ['dkim', 'spf'])
+ return nt([*dkims], [*spfs])
+
+ r = root.find('row')
+ idents = root.find('identifiers')
+ results = root.find('auth_results')
+ if not r or not idents or not results:
+ raise MalformedReportException('row, identifiers or auth_results missing')
+
+ nt = namedtuple('RecordType', ['row', 'identifiers', 'auth_results'])
+ return nt(row(r), identifier(idents), authresult(results))
+
+ meta = root.find('report_metadata')
+ pol = root.find('policy_published')
+ recs = root.findall('record')
+ if meta is None or pol is None or not recs:
+ raise MalformedReportException('report_metadata, policy_published or record missing')
+
+ nt = namedtuple('feedback', ['report_metadata', 'policy_published', 'record'])
+ return nt(reportmeta(meta), policypub(pol), list(map(record, recs)))
+
+def parse_dmarc(r):
+ root = xmlparse(r)
+ if root.tag != 'feedback':
+ raise MalformedReportException('root is not feedback')
+
+ return _feedback(root)
diff --git a/dmarcpipe.py b/dmarcpipe.py
new file mode 100755
index 0000000..5e26d34
--- /dev/null
+++ b/dmarcpipe.py
@@ -0,0 +1,92 @@
+#!/usr/bin/env python3
+
+from dmarc import parse_dmarc
+from getopt import gnu_getopt
+from gzip import decompress as gz_decompress
+from io import BytesIO
+from re import compile as re_compile
+from sql import store_dmarc
+from sys import stdin, stdout, argv
+from zipfile import ZipFile
+import email
+
+default_enc = 'US-ASCII'
+policy_domain = 'the-tk.com'
+re_valid_filename = re_compile('^[^\\s!]+![^\\s!]+![0-9]+![0-9]+(![^\\s!]+)?.(xml(.gz)?|zip)$')
+logfile = None
+dbfile = 'dmarc.db'
+
+class FalseReportException(Exception):
+ pass
+
+def is_valid_filename(f):
+ if re_valid_filename.match(f) is not None:
+ return True
+ return False
+
+def zip2xml(data):
+ z = ZipFile(BytesIO(data))
+ n = z.namelist()
+ if len(n) != 1 or not is_valid_filename(n[0]):
+ raise FalseReportException('zip2xml: broken zip len({})'.format(len(n)))
+ return z.open(n[0]).read()
+
+gzip2xml = lambda data: gz_decompress(data)
+
+decoders = {
+ 'application/zip': zip2xml,
+ 'application/x-zip': zip2xml,
+ 'application/x-zip-compressed': zip2xml,
+ 'application/zlib': gzip2xml,
+ 'application/gzip': gzip2xml,
+ 'text/xml': lambda data: data,
+ }
+
+def process_message(m):
+ if m.get_content_maintype() != 'multipart' and \
+ m.get_content_type() not in decoders.keys():
+ raise FalseReportException('invalid content type: {}'.format(m.get_content_type()))
+
+ att = None
+ inv_fn = False
+
+ for part in m.walk():
+ if part.get_content_maintype() == 'multipart':
+ continue
+ if part.get_content_type() not in decoders.keys():
+ continue
+ if part.get('Content-Disposition') is None:
+ continue
+ if not is_valid_filename(part.get_filename()):
+ inv_fn = True
+ continue
+ att = part
+ break
+
+ if att is None:
+ raise FalseReportException('attachment not found{}'.format(inv_fn == True and ' (invalid filename)' or ''))
+
+ xml = decoders[att.get_content_type()](att.get_payload(decode=True))
+ dmarc = parse_dmarc(xml)
+ store_dmarc(dbfile, dmarc)
+
+def main():
+ global logfile, dbfile
+ opts, args = gnu_getopt(argv, 'l:')
+ for o in opts:
+ if o[0] == '-l':
+ logfile = o[1]
+ if len(args) == 2:
+ print(args[1])
+ dbfile = args[1]
+ m = email.message_from_file(stdin)
+ try:
+ process_message(m)
+ m['X-DMARC-Report'] = 'True'
+ except Exception as e:
+ m['X-DMARC-Report'] = 'False; {}'.format(repr(e))
+ finally:
+ stdout.write(m.as_string())
+
+if __name__ == '__main__':
+ main()
diff --git a/sql.py b/sql.py
new file mode 100644
index 0000000..4207d4f
--- /dev/null
+++ b/sql.py
@@ -0,0 +1,144 @@
+import sqlite3
+
+_schema = '''
+CREATE TABLE feedback (
+ feedbackid INTEGER PRIMARY KEY AUTOINCREMENT,
+ rm_org_name TEXT NOT NULL,
+ rm_email TEXT NOT NULL,
+ rm_extra_contact_info TEXT,
+ rm_report_id TEXT UNIQUE NOT NULL,
+ rm_date_begin INTEGER NOT NULL,
+ rm_date_end INTEGER NOT NULL,
+ pp_domain TEXT NOT NULL,
+ pp_adkim TEXT,
+ pp_aspf TEXT,
+ pp_p TEXT NOT NULL,
+ pp_sp TEXT NOT NULL,
+ pp_pct TEXT NOT NULL
+);
+
+CREATE TABLE rm_error (
+ rm_errorid INTEGER PRIMARY KEY AUTOINCREMENT,
+ feedbackid INTEGER NOT NULL,
+ error TEXT NOT NULL,
+ FOREIGN KEY(feedbackid) REFERENCES feedback
+);
+
+CREATE TABLE record (
+ recordid INTEGER PRIMARY KEY AUTOINCREMENT,
+ feedbackid INTEGER NOT NULL,
+ row_source_ip TEXT NOT NULL,
+ row_count INTEGER NOT NULL,
+ row_pol_disposition TEXT NOT NULL,
+ row_pol_dkim TEXT NOT NULL,
+ row_pol_spf TEXT NOT NULL,
+ ids_envelope_to TEXT,
+ ids_header_from TEXT NOT NULL,
+ FOREIGN KEY(feedbackid) REFERENCES feedback
+);
+
+CREATE TABLE row_pol_reason (
+ row_pol_reasonid INTEGER PRIMARY KEY AUTOINCREMENT,
+ recordid INTEGER NOT NULL,
+ type TEXT NOT NULL,
+ comment TEXT,
+ FOREIGN KEY(recordid) REFERENCES record
+);
+
+CREATE TABLE res_dkim (
+ res_dkimid INTEGER PRIMARY KEY AUTOINCREMENT,
+ recordid INTEGER NOT NULL,
+ domain TEXT NOT NULL,
+ selector TEXT,
+ result TEXT NOT NULL,
+ human_result TEXT,
+ FOREIGN KEY(recordid) REFERENCES record
+);
+
+CREATE TABLE res_spf (
+ res_spfid INTEGER PRIMARY KEY AUTOINCREMENT,
+ recordid INTEGER NOT NULL,
+ domain TEXT NOT NULL,
+ result TEXT NOT NULL,
+ FOREIGN KEY(recordid) REFERENCES record
+);
+'''
+
+def _init_db(conn):
+ c = conn.cursor()
+ c.execute('PRAGMA foreign_keys = ON')
+ version = c.execute('PRAGMA user_version').fetchone()[0]
+ if version == 1:
+ return
+ c.executescript(_schema)
+ c.execute('PRAGMA user_version = 1')
+
+def _flatten(t):
+ for v in t:
+ if isinstance(v, tuple):
+ for vv in flatten(v):
+ yield vv
+ else:
+ yield v
+
+def _insert_dmarc(conn, dmarc):
+ c = conn.cursor()
+ c.execute('''INSERT INTO feedback (rm_org_name, rm_email,
+ rm_extra_contact_info, rm_report_id, rm_date_begin, rm_date_end,
+ pp_domain, pp_adkim, pp_aspf, pp_p, pp_sp, pp_pct) VALUES
+ (?,?,?,?,?,?,?,?,?,?,?,?)''', (
+ dmarc.report_metadata.org_name,
+ dmarc.report_metadata.email,
+ dmarc.report_metadata.extra_contact_info,
+ dmarc.report_metadata.report_id,
+ *_flatten(dmarc.report_metadata.date_range),
+ *_flatten(dmarc.policy_published)
+ ))
+ feedback = c.lastrowid
+ c.executemany('INSERT INTO rm_error (feedbackid, error) VALUES (?,?)', (
+ *((
+ feedback, e
+ ) for e in dmarc.report_metadata.error
+ ),
+ ))
+ for rec in dmarc.record:
+ c.execute('''INSERT INTO record (feedbackid, row_source_ip, row_count,
+ row_pol_disposition, row_pol_dkim, row_pol_spf, ids_envelope_to,
+ ids_header_from) VALUES (?,?,?,?,?,?,?,?)''', (
+ feedback,
+ rec.row.source_ip,
+ rec.row.count,
+ rec.row.policy_evaluated.disposition,
+ rec.row.policy_evaluated.dkim,
+ rec.row.policy_evaluated.spf,
+ *_flatten(rec.identifiers)
+ ))
+ record = c.lastrowid
+ c.executemany('''INSERT INTO row_pol_reason (recordid, type, comment)
+ VALUES (?,?,?)''', (
+ *((
+ record, r.type, r.comment
+ ) for r in rec.row.policy_evaluated.reason
+ ),
+ ))
+ c.executemany('''INSERT INTO res_dkim (recordid, domain, selector,
+ result, human_result) VALUES (?,?,?,?,?)''', (
+ *((
+ record, *_flatten(d)
+ ) for d in rec.auth_results.dkim
+ ),
+ ))
+ c.executemany('''INSERT INTO res_spf (recordid, domain, result)
+ VALUES (?,?,?)''', (
+ *((
+ record, *_flatten(s)
+ ) for s in rec.auth_results.spf
+ ),
+ ))
+
+def store_dmarc(dbfile, dmarc):
+ with sqlite3.connect(dbfile) as conn:
+ with conn:
+ _init_db(conn)
+ with conn:
+ _insert_dmarc(conn, dmarc)