blob: a408e717d24d3e27af4d64d1c2d3290a4f656893 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
|
#!/usr/bin/env python3
from dmarc import parse_dmarc
from getopt import gnu_getopt
from gzip import decompress as gz_decompress
from io import BytesIO
from re import compile as re_compile
from sql import store_dmarc
from sys import stdin, stdout, argv
from zipfile import ZipFile
import email
# Accepted attachment names have the shape
#   <field>!<field>!<digits>!<digits>[!<field>].<ext>
# where <ext> is "xml", "xml.gz" or "zip" and a field holds no whitespace
# or "!".  The dots are escaped so only a literal "." introduces the
# extension; unescaped, "." matched any character (e.g. "a!b!1!2Xxml").
re_valid_filename = re_compile(r'^[^\s!]+![^\s!]+![0-9]+![0-9]+(![^\s!]+)?\.(xml(\.gz)?|zip)$')
# Value of the -l command-line option; stays None when the option is absent.
logfile = None
# Database file handed to store_dmarc(); main() overrides it with the first
# positional command-line argument when one is given.
dbfile = 'dmarc.db'
class FalseReportException(Exception):
    """Raised when a message or attachment is not an acceptable DMARC report."""
def is_valid_filename(f):
    """Tell whether *f* is an acceptable report attachment name.

    Args:
        f: Candidate file name.  May be None, which is what
            ``part.get_filename()`` yields for a part without a filename.

    Returns:
        True when ``re_valid_filename`` matches *f*; False otherwise,
        including when *f* is not a string.
    """
    # re.match() raises TypeError on None/bytes; treat those as "not valid"
    # so callers get a clean boolean instead of an unrelated exception.
    if not isinstance(f, str):
        return False
    return re_valid_filename.match(f) is not None
def zip2xml(data):
    """Extract the single report file from an in-memory zip archive.

    Args:
        data: Raw bytes of the zip archive.

    Returns:
        The uncompressed bytes of the archive's only member.

    Raises:
        FalseReportException: If the archive does not hold exactly one
            member, or if that member's name fails ``is_valid_filename``.
    """
    # The context manager closes the archive on every path, including the
    # validation failures below.
    with ZipFile(BytesIO(data)) as archive:
        names = archive.namelist()
        if len(names) != 1:
            raise FalseReportException('zip2xml: broken zip len({})'.format(len(names)))
        if not is_valid_filename(names[0]):
            # Reported separately so the message is not a misleading "len(1)".
            raise FalseReportException('zip2xml: invalid filename in zip')
        # NOTE(review): the member is decompressed fully into memory with no
        # size limit; the archive comes from e-mail, so consider a cap.
        return archive.read(names[0])
def gzip2xml(data):
    """Return the bytes obtained by gzip-decompressing *data*."""
    return gz_decompress(data)
# Maps an attachment's MIME content type to the callable that turns the
# attachment's raw bytes into report XML.
decoders = dict.fromkeys(
    ('application/zip', 'application/x-zip', 'application/x-zip-compressed'),
    zip2xml,
)
decoders.update(dict.fromkeys(('application/zlib', 'application/gzip'), gzip2xml))
# Plain XML needs no decoding; the payload is passed through unchanged.
decoders['text/xml'] = lambda data: data
def process_message(m):
    """Locate the report attached to message *m*, parse it and store it.

    The first non-multipart part is used whose content type has an entry in
    ``decoders``, which carries a Content-Disposition, and whose filename
    passes ``is_valid_filename``.  Its decoded payload is converted to XML by
    the matching decoder, parsed with ``parse_dmarc`` and handed to
    ``store_dmarc`` together with the module-level ``dbfile``.

    Args:
        m: Parsed e-mail message, as returned by ``email.message_from_file``.

    Raises:
        FalseReportException: If a non-multipart message has a content type
            without a decoder, or if no suitable attachment is found.
    """
    content_type = m.get_content_type()
    if m.get_content_maintype() != 'multipart' and content_type not in decoders:
        raise FalseReportException('invalid content type: {}'.format(content_type))

    att = None
    inv_fn = False  # set when a candidate part was rejected only for its name
    for part in m.walk():
        if part.get_content_maintype() == 'multipart':
            continue
        if part.get_content_type() not in decoders:
            continue
        if part.get_content_disposition() is None:
            continue
        # get_filename() returns None when the part has no filename; count
        # that as an invalid name instead of passing None to the regex.
        filename = part.get_filename()
        if filename is None or not is_valid_filename(filename):
            inv_fn = True
            continue
        att = part
        break

    if att is None:
        reason = ' (invalid filename)' if inv_fn else ''
        raise FalseReportException('attachment not found{}'.format(reason))

    # decode=True undoes the Content-Transfer-Encoding, giving raw bytes.
    xml = decoders[att.get_content_type()](att.get_payload(decode=True))
    dmarc = parse_dmarc(xml)
    store_dmarc(dbfile, dmarc)
def main():
    """Filter one e-mail from stdin to stdout, tagging it as report or not.

    Command line: ``[-l LOGFILE] [DBFILE]``.  The message read from stdin is
    passed to ``process_message``; it is then written to stdout with an
    ``X-DMARC-Report`` header set to ``True`` on success or to
    ``False; <repr of the exception>`` on failure.
    """
    global logfile, dbfile
    # argv is passed whole, so args[0] is the program name and args[1], when
    # present, is the database file.
    opts, args = gnu_getopt(argv, 'l:')
    for opt, value in opts:
        if opt == '-l':
            logfile = value
    if len(args) == 2:
        dbfile = args[1]

    # Parse before entering the try block: if parsing fails there is no
    # message to tag or emit, and the handlers below must not touch an
    # unbound name (which would raise NameError and hide the real error).
    m = email.message_from_file(stdin)
    # Assigning a header appends rather than replaces, so remove any
    # X-DMARC-Report header the sender supplied; otherwise a forged value
    # would be emitted alongside ours.  Deleting an absent header is a no-op.
    del m['X-DMARC-Report']
    try:
        process_message(m)
    except Exception as e:
        # Top-level boundary: any failure marks the mail as "not a report",
        # and the mail is still passed through.
        m['X-DMARC-Report'] = 'False; {}'.format(repr(e))
    else:
        m['X-DMARC-Report'] = 'True'
    finally:
        stdout.write(m.as_string())
# Run the filter only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
|