#!/usr/bin/env python3
import sqlite3
from os.path import join as pathjoin
from flask import Flask, g, render_template, request, Markup, url_for
from datetime import datetime
from multiprocessing.pool import ThreadPool
from socket import getfqdn
sv = Flask(__name__)
sv.config.from_object(__name__)
sv.config.update({
'database': pathjoin(sv.root_path, 'dmarc.db'),
})
def connect_db():
rv = sqlite3.connect(sv.config['database'])
c = rv.cursor()
assert(c.execute('PRAGMA user_version').fetchone()[0] == 1)
rv.row_factory = sqlite3.Row
return rv
def get_db():
if not hasattr(g, 'sqlite_db'):
g.sqlite_db = connect_db()
return g.sqlite_db
@sv.teardown_appcontext
def close_db(error):
if hasattr(g, 'sqlite_db'):
g.sqlite_db.close()
def format_isotime(value, sep=' ', timespec='auto'):
return datetime.utcfromtimestamp(value).isoformat(sep=sep, timespec=timespec)
sv.jinja_env.filters['isotime'] = format_isotime
rlut = dict()
def bulk_rdns_worker(addr):
return addr, getfqdn(addr)
def bulk_rdns(addrs):
addrs = [a for a in addrs if a not in rlut.keys()]
if not addrs:
return
pool = ThreadPool(processes=min(len(addrs), 50))
for addr, dname in pool.imap(
bulk_rdns_worker,
addrs,
chunksize=1):
rlut[addr] = dname
pool.close()
def format_ipaddr(value):
try:
return rlut[value]
except KeyError:
return value
sv.jinja_env.filters['fqdn'] = format_ipaddr
def format_tblclass(value):
return {
'pass': 'success',
'policy': 'info',
'none': 'notaclass',
'neutral': 'info',
'softfail': 'warning',
'temperror': 'warning',
'fail': 'danger',
'permerror': 'danger',
}[value]
sv.jinja_env.filters['tblclass'] = format_tblclass
def format_alignment(value):
return { 'r': 'Relaxed', 's': 'Strict', None: 'Unknown' }[value]
sv.jinja_env.filters['alignment'] = format_alignment
def order_header(name, dest, order, current):
icon = '△'
destorder = order
if order == current:
icon = '▲'
destorder = '{} desc'.format(order)
if '{} desc'.format(order) == current:
icon = '▼'
href = url_for(dest, order=destorder)
return '{} {}'.format(href, name, icon)
def format_pctclass(value, maximum, td=None, tw=None):
td = td or maximum / 3
tw = tw or maximum / 3 * 2
if value < td:
return 'danger'
elif value < tw:
return 'warning'
else:
return 'success'
sv.jinja_env.filters['pctclass'] = format_pctclass
@sv.route('/')
def index():
return render_template('index.html')
@sv.route('/feedback')
def feedback():
db = get_db()
order = request.args.get('order', 'rm_date_begin').split()
if order[0] not in ('rm_org_name', 'rm_date_begin', 'rm_date_end',
'nemails'):
order = 'rm_date_begin'
if len(order) == 2 and order[1] == 'desc':
order = ' '.join(order)
else:
order = order[0]
feedback = db.execute('''
SELECT f.*, SUM(row_count) AS nemails,
SUM((row_pol_disposition = 'none') * row_count) AS pass_none,
SUM((row_pol_dkim = 'pass' OR row_pol_spf = 'pass') * row_count) AS pass_dmarc,
SUM((row_pol_dkim = 'pass') * row_count) AS pass_dkim,
SUM((row_pol_spf = 'pass') * row_count) AS pass_spf
FROM feedback AS f
JOIN record AS r USING (feedbackid)
GROUP BY feedbackid
ORDER BY {}
'''.format(order)).fetchall()
return render_template('feedback/index.html', feedback=feedback,
order=order, orderhdr=order_header)
@sv.route('/feedback/')
def report(feedbackid=None):
db = get_db()
feedback = db.execute('SELECT * FROM feedback WHERE feedbackid = ?',
(feedbackid, )).fetchone()
records = db.execute('SELECT * FROM record WHERE feedbackid = ?',
(feedbackid, )).fetchall()
bulk_rdns([r['row_source_ip'] for r in records])
total = dict(
records = sum(r['row_count'] for r in records),
none = sum(r['row_pol_disposition'] == 'none'
and r['row_count'] or 0 for r in records),
dmarc = sum((r['row_pol_dkim'] == 'pass' or
r['row_pol_spf'] == 'pass')
and r['row_count'] or 0 for r in records),
dkim = sum(r['row_pol_dkim'] == 'pass'
and r['row_count'] or 0 for r in records),
spf = sum(r['row_pol_spf'] == 'pass'
and r['row_count'] or 0 for r in records))
return render_template('feedback/report.html', feedback=feedback,
records=records, total=total)
@sv.route('/record/')
def record(recordid):
db = get_db()
record = db.execute('SELECT * FROM record WHERE recordid = ?',
(recordid, )).fetchone()
bulk_rdns([record['row_source_ip']])
reasons = db.execute('SELECT * FROM row_pol_reason WHERE recordid = ?',
(recordid, )).fetchall()
resdkim = db.execute('SELECT * FROM res_dkim WHERE recordid = ?',
(recordid, )).fetchall()
resspf = db.execute('SELECT * FROM res_spf WHERE recordid = ?',
(recordid, )).fetchall()
return render_template('record/index.html', record=record, reasons=reasons,
resdkim=resdkim, resspf=resspf)
@sv.route('/feedback/.xml')
def export(feedbackid):
db = get_db()
feedback = db.execute('SELECT * FROM feedback WHERE feedbackid = ?',
(feedbackid, )).fetchone()
records = db.execute('SELECT * FROM record WHERE feedbackid = ?',
(feedbackid, )).fetchall()
reasons = db.execute('''
SELECT r.* FROM row_pol_reason AS r
JOIN record USING (recordid)
WHERE feedbackid = ?''', (feedbackid, )).fetchall()
resdkim = db.execute('''
SELECT d.* FROM res_dkim AS d
JOIN record USING (recordid)
WHERE feedbackid = ?''', (feedbackid, )).fetchall()
resspf = db.execute('''
SELECT s.* FROM res_spf AS s
JOIN record USING (recordid)
WHERE feedbackid = ?''', (feedbackid, )).fetchall()
return render_template('feedback/export.xml', feedback=feedback,
records=records, reasons=reasons, resdkim=resdkim, resspf=resspf)
@sv.route('/stats')
def stats():
return render_template('stats/index.html')