import sqlite3 _schema = ''' CREATE TABLE feedback ( feedbackid INTEGER PRIMARY KEY AUTOINCREMENT, rm_org_name TEXT NOT NULL, rm_email TEXT NOT NULL, rm_extra_contact_info TEXT, rm_report_id TEXT UNIQUE NOT NULL, rm_date_begin INTEGER NOT NULL, rm_date_end INTEGER NOT NULL, pp_domain TEXT NOT NULL, pp_adkim TEXT, pp_aspf TEXT, pp_p TEXT NOT NULL, pp_sp TEXT NOT NULL, pp_pct TEXT NOT NULL ); CREATE TABLE rm_error ( rm_errorid INTEGER PRIMARY KEY AUTOINCREMENT, feedbackid INTEGER NOT NULL, error TEXT NOT NULL, FOREIGN KEY(feedbackid) REFERENCES feedback ); CREATE TABLE record ( recordid INTEGER PRIMARY KEY AUTOINCREMENT, feedbackid INTEGER NOT NULL, row_source_ip TEXT NOT NULL, row_count INTEGER NOT NULL, row_pol_disposition TEXT NOT NULL, row_pol_dkim TEXT NOT NULL, row_pol_spf TEXT NOT NULL, ids_envelope_to TEXT, ids_header_from TEXT NOT NULL, FOREIGN KEY(feedbackid) REFERENCES feedback ); CREATE TABLE row_pol_reason ( row_pol_reasonid INTEGER PRIMARY KEY AUTOINCREMENT, recordid INTEGER NOT NULL, type TEXT NOT NULL, comment TEXT, FOREIGN KEY(recordid) REFERENCES record ); CREATE TABLE res_dkim ( res_dkimid INTEGER PRIMARY KEY AUTOINCREMENT, recordid INTEGER NOT NULL, domain TEXT NOT NULL, selector TEXT, result TEXT NOT NULL, human_result TEXT, FOREIGN KEY(recordid) REFERENCES record ); CREATE TABLE res_spf ( res_spfid INTEGER PRIMARY KEY AUTOINCREMENT, recordid INTEGER NOT NULL, domain TEXT NOT NULL, result TEXT NOT NULL, FOREIGN KEY(recordid) REFERENCES record ); ''' def _init_db(conn): c = conn.cursor() c.execute('PRAGMA foreign_keys = ON') version = c.execute('PRAGMA user_version').fetchone()[0] if version == 1: return assert(version == 0) c.executescript(_schema) c.execute('PRAGMA user_version = 1') def _flatten(t): for v in t: if isinstance(v, tuple): for vv in flatten(v): yield vv else: yield v def _insert_dmarc(conn, dmarc): c = conn.cursor() c.execute('''INSERT INTO feedback (rm_org_name, rm_email, rm_extra_contact_info, rm_report_id, rm_date_begin, rm_date_end, pp_domain, pp_adkim, pp_aspf, pp_p, pp_sp, pp_pct) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)''', ( dmarc.report_metadata.org_name, dmarc.report_metadata.email, dmarc.report_metadata.extra_contact_info, dmarc.report_metadata.report_id, *_flatten(dmarc.report_metadata.date_range), *_flatten(dmarc.policy_published) )) feedback = c.lastrowid c.executemany('INSERT INTO rm_error (feedbackid, error) VALUES (?,?)', ( *(( feedback, e ) for e in dmarc.report_metadata.error ), )) for rec in dmarc.record: c.execute('''INSERT INTO record (feedbackid, row_source_ip, row_count, row_pol_disposition, row_pol_dkim, row_pol_spf, ids_envelope_to, ids_header_from) VALUES (?,?,?,?,?,?,?,?)''', ( feedback, rec.row.source_ip, rec.row.count, rec.row.policy_evaluated.disposition, rec.row.policy_evaluated.dkim, rec.row.policy_evaluated.spf, *_flatten(rec.identifiers) )) record = c.lastrowid c.executemany('''INSERT INTO row_pol_reason (recordid, type, comment) VALUES (?,?,?)''', ( *(( record, r.type, r.comment ) for r in rec.row.policy_evaluated.reason ), )) c.executemany('''INSERT INTO res_dkim (recordid, domain, selector, result, human_result) VALUES (?,?,?,?,?)''', ( *(( record, *_flatten(d) ) for d in rec.auth_results.dkim ), )) c.executemany('''INSERT INTO res_spf (recordid, domain, result) VALUES (?,?,?)''', ( *(( record, *_flatten(s) ) for s in rec.auth_results.spf ), )) def store_dmarc(dbfile, dmarc): with sqlite3.connect(dbfile) as conn: with conn: _init_db(conn) with conn: _insert_dmarc(conn, dmarc)