#!/usr/bin/env python3

# dmarcstats.py - DMARC report display and statistic tool.
# Copyright (C) 2017 Tomasz Kramkowski

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

import sqlite3
from os.path import join as pathjoin
from flask import Flask, g, render_template, request, Markup, url_for
from datetime import datetime, timezone
from multiprocessing.pool import ThreadPool
from socket import getfqdn

sv = Flask(__name__)
sv.config.from_object(__name__)
sv.config.update({
    'database': pathjoin(sv.root_path, 'dmarc.db'),
})

# Value of ``PRAGMA user_version`` this code is written against.
_SCHEMA_VERSION = 1

# Upper bound on the number of concurrent reverse-DNS lookups.
_MAX_RDNS_THREADS = 50


def connect_db():
    """Open the configured SQLite database and check its schema version.

    Returns:
        A ``sqlite3.Connection`` whose rows are ``sqlite3.Row`` objects.

    Raises:
        RuntimeError: if ``PRAGMA user_version`` is not the supported
            version.  The connection is closed before raising.
    """
    conn = sqlite3.connect(sv.config['database'])
    try:
        version = conn.execute('PRAGMA user_version').fetchone()[0]
        # An explicit raise (rather than ``assert``) keeps the check
        # active when Python runs with -O.
        if version != _SCHEMA_VERSION:
            raise RuntimeError(
                'unsupported database schema version {} (expected {})'
                .format(version, _SCHEMA_VERSION))
    except Exception:
        # Do not leak the handle when the database is unusable.
        conn.close()
        raise
    conn.row_factory = sqlite3.Row
    return conn


def get_db():
    """Return the connection cached on ``flask.g``, opening it on first use."""
    if not hasattr(g, 'sqlite_db'):
        g.sqlite_db = connect_db()
    return g.sqlite_db


@sv.teardown_appcontext
def close_db(error):
    """Close the cached connection, if any, when the app context ends.

    ``error`` is supplied by Flask's teardown hook and is not used.
    """
    if hasattr(g, 'sqlite_db'):
        g.sqlite_db.close()


def format_isotime(value, sep=' ', timespec='auto'):
    """Jinja filter: render a POSIX timestamp as a naive UTC ISO 8601 string.

    Args:
        value: seconds since the epoch.
        sep: separator placed between the date and time parts.
        timespec: precision, passed through to ``datetime.isoformat``.
    """
    # ``datetime.utcfromtimestamp`` is deprecated since Python 3.12; build
    # an aware UTC datetime and drop the tzinfo so the rendered text (no
    # "+00:00" suffix) is the same as before.
    moment = datetime.fromtimestamp(value, tz=timezone.utc)
    return moment.replace(tzinfo=None).isoformat(sep=sep, timespec=timespec)


sv.jinja_env.filters['isotime'] = format_isotime

# Process-wide cache: address -> name returned by ``getfqdn``.
rlut = dict()


def bulk_rdns_worker(addr):
    """Resolve one address; return ``(addr, fqdn)`` so results can be keyed."""
    return addr, getfqdn(addr)


def bulk_rdns(addrs):
    """Resolve every not-yet-cached address in ``addrs`` and fill ``rlut``.

    Lookups run concurrently on a thread pool because each ``getfqdn``
    call blocks on the resolver.  Returns ``None``.
    """
    # ``dict.fromkeys`` removes duplicates while keeping first-seen order,
    # so an address that occurs in many rows is only looked up once.
    pending = [a for a in dict.fromkeys(addrs) if a not in rlut]
    if not pending:
        return

    # The ``with`` block releases the worker threads even when a lookup
    # raises; every result has been consumed by the time it exits.
    workers = min(len(pending), _MAX_RDNS_THREADS)
    with ThreadPool(processes=workers) as pool:
        for addr, dname in pool.imap(bulk_rdns_worker, pending, chunksize=1):
            rlut[addr] = dname


def format_ipaddr(value):
    """Jinja filter: return the cached name for ``value``, else ``value``."""
    return rlut.get(value, value)
sv.jinja_env.filters['fqdn'] = format_ipaddr


# Result keyword -> class name handed to the templates (presumably a CSS
# class; verify against the templates).
_TBLCLASS = {
    'pass': 'success',
    'policy': 'info',
    'none': 'notaclass',
    'neutral': 'info',
    'softfail': 'warning',
    'temperror': 'warning',
    'fail': 'danger',
    'permerror': 'danger',
}

# Alignment mode code -> human-readable label.
_ALIGNMENT = {
    'r': 'Relaxed',
    's': 'Strict',
    None: 'Unknown',
}

# Columns the feedback listing may be sorted by.  The chosen value is
# interpolated into SQL, so this whitelist is what prevents injection.
_FEEDBACK_ORDER_COLUMNS = (
    'rm_org_name', 'rm_date_begin', 'rm_date_end', 'nemails')
_FEEDBACK_DEFAULT_ORDER = 'rm_date_begin'


def format_tblclass(value):
    """Jinja filter: map a result keyword to its class name.

    Raises:
        KeyError: if ``value`` is not a known keyword.
    """
    return _TBLCLASS[value]


sv.jinja_env.filters['tblclass'] = format_tblclass


def format_alignment(value):
    """Jinja filter: map an alignment code (``'r'``, ``'s'``, ``None``).

    Raises:
        KeyError: if ``value`` is not a known code.
    """
    return _ALIGNMENT[value]


sv.jinja_env.filters['alignment'] = format_alignment


def order_header(name, dest, order, current):
    """Build a clickable column header that toggles the sort order.

    Args:
        name: text shown in the header.
        dest: endpoint name passed to ``url_for``.
        order: column this header sorts by.
        current: the ordering currently in effect (``'col'`` or
            ``'col desc'``).

    Returns:
        ``Markup`` for a link labelled with ``name`` and a sort icon.
    """
    descending = '{} desc'.format(order)
    icon = '△'
    destorder = order
    if order == current:
        # Already sorted ascending by this column: next click reverses it.
        icon = '▲'
        destorder = descending
    if descending == current:
        icon = '▼'
    href = url_for(dest, order=destorder)
    # NOTE(review): the source had a two-placeholder format string for three
    # arguments, so the icon was silently dropped and no link was produced.
    # The anchor markup is a reconstruction; ``Markup.format`` escapes each
    # argument before substitution.
    return Markup('<a href="{}">{} {}</a>').format(href, name, icon)


def format_pctclass(value, maximum, td=None, tw=None):
    """Jinja filter: classify ``value`` against two thresholds.

    Args:
        value: the quantity to classify.
        maximum: the value representing 100%.
        td: values below this are ``'danger'``; defaults to ``maximum / 3``.
        tw: values below this (and at least ``td``) are ``'warning'``;
            defaults to ``maximum / 3 * 2``.

    Returns:
        ``'danger'``, ``'warning'`` or ``'success'``.
    """
    # Compare with None so an explicit threshold of 0 is honoured instead
    # of being replaced by the default.
    if td is None:
        td = maximum / 3
    if tw is None:
        tw = maximum / 3 * 2

    if value < td:
        return 'danger'
    if value < tw:
        return 'warning'
    return 'success'


sv.jinja_env.filters['pctclass'] = format_pctclass


def _fetch_one_or_404(db, query, params):
    """Run ``query`` and return its first row, aborting with 404 if none."""
    # Imported here because the file-level import list has no ``abort``.
    from flask import abort

    row = db.execute(query, params).fetchone()
    if row is None:
        abort(404)
    return row


def _sum_counts(records, predicate=None):
    """Sum ``row_count`` over the records that satisfy ``predicate``."""
    if predicate is None:
        return sum(r['row_count'] for r in records)
    return sum(r['row_count'] for r in records if predicate(r))


@sv.route('/')
def index():
    """Render the landing page."""
    return render_template('index.html')


@sv.route('/feedback')
def feedback():
    """List all feedback reports with per-report pass totals.

    The optional ``order`` query parameter is ``'<column>'`` or
    ``'<column> desc'``; anything else falls back to the default order.
    """
    db = get_db()

    parts = request.args.get('order', _FEEDBACK_DEFAULT_ORDER).split()
    if not parts or parts[0] not in _FEEDBACK_ORDER_COLUMNS:
        # Unknown column or empty parameter: use the default ordering.
        order = _FEEDBACK_DEFAULT_ORDER
    elif len(parts) == 2 and parts[1] == 'desc':
        order = ' '.join(parts)
    else:
        order = parts[0]

    # ``order`` is limited to whitelisted identifiers plus an optional
    # literal "desc", so formatting it into the statement is safe.
    rows = db.execute('''
        SELECT f.*,
            SUM(row_count) AS nemails,
            SUM((row_pol_disposition = 'none') * row_count) AS pass_none,
            SUM((row_pol_dkim = 'pass' OR row_pol_spf = 'pass') * row_count) AS pass_dmarc,
            SUM((row_pol_dkim = 'pass') * row_count) AS pass_dkim,
            SUM((row_pol_spf = 'pass') * row_count) AS pass_spf
        FROM feedback AS f
        JOIN record AS r USING (feedbackid)
        GROUP BY feedbackid
        ORDER BY {}
        '''.format(order)).fetchall()

    return render_template('feedback/index.html', feedback=rows,
                           order=order, orderhdr=order_header)


@sv.route('/feedback/<feedbackid>')
def report(feedbackid=None):
    """Show one feedback report with its records and summed totals.

    Responds with 404 when no report has the given id.
    """
    db = get_db()
    report_row = _fetch_one_or_404(
        db, 'SELECT * FROM feedback WHERE feedbackid = ?', (feedbackid, ))
    records = db.execute('SELECT * FROM record WHERE feedbackid = ?',
                         (feedbackid, )).fetchall()

    # Warm the reverse-DNS cache for the addresses the template will show.
    bulk_rdns([r['row_source_ip'] for r in records])

    total = dict(
        records=_sum_counts(records),
        none=_sum_counts(
            records, lambda r: r['row_pol_disposition'] == 'none'),
        dmarc=_sum_counts(
            records,
            lambda r: (r['row_pol_dkim'] == 'pass'
                       or r['row_pol_spf'] == 'pass')),
        dkim=_sum_counts(records, lambda r: r['row_pol_dkim'] == 'pass'),
        spf=_sum_counts(records, lambda r: r['row_pol_spf'] == 'pass'))

    return render_template('feedback/report.html', feedback=report_row,
                           records=records, total=total)


@sv.route('/record/<recordid>')
def record(recordid):
    """Show one record with its policy reasons and DKIM/SPF results.

    Responds with 404 when no record has the given id.
    """
    db = get_db()
    record_row = _fetch_one_or_404(
        db, 'SELECT * FROM record WHERE recordid = ?', (recordid, ))
    bulk_rdns([record_row['row_source_ip']])

    reasons = db.execute('SELECT * FROM row_pol_reason WHERE recordid = ?',
                         (recordid, )).fetchall()
    resdkim = db.execute('SELECT * FROM res_dkim WHERE recordid = ?',
                         (recordid, )).fetchall()
    resspf = db.execute('SELECT * FROM res_spf WHERE recordid = ?',
                        (recordid, )).fetchall()

    return render_template('record/index.html', record=record_row,
                           reasons=reasons, resdkim=resdkim, resspf=resspf)


@sv.route('/feedback/<feedbackid>.xml')
def export(feedbackid):
    """Render one feedback report through the XML export template.

    Responds with 404 when no report has the given id.
    """
    db = get_db()
    report_row = _fetch_one_or_404(
        db, 'SELECT * FROM feedback WHERE feedbackid = ?', (feedbackid, ))
    records = db.execute('SELECT * FROM record WHERE feedbackid = ?',
                         (feedbackid, )).fetchall()
    reasons = db.execute('''
        SELECT r.* FROM row_pol_reason AS r
        JOIN record USING (recordid)
        WHERE feedbackid = ?''', (feedbackid, )).fetchall()
    resdkim = db.execute('''
        SELECT d.* FROM res_dkim AS d
        JOIN record USING (recordid)
        WHERE feedbackid = ?''', (feedbackid, )).fetchall()
    resspf = db.execute('''
        SELECT s.* FROM res_spf AS s
        JOIN record USING (recordid)
        WHERE feedbackid = ?''', (feedbackid, )).fetchall()

    return render_template('feedback/export.xml', feedback=report_row,
                           records=records, reasons=reasons,
                           resdkim=resdkim, resspf=resspf)


@sv.route('/stats')
def stats():
    """Render the statistics page."""
    return render_template('stats/index.html')