1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
|
import datetime
import os
import shutil
import tempfile
import unittest

from emailcanary import canarydb
class TestCanaryDB(unittest.TestCase):
    """Exercise ``canarydb.CanaryDB`` against a throwaway database file.

    Every test runs with a fresh temporary directory that holds its own
    ``canary.db``; the directory is removed once the test has finished.
    """

    # Largest accepted difference, in seconds, between the elapsed time
    # reported by get_missing_pongs() and the one computed by the test.
    _MAX_DRIFT_SECONDS = 10

    def setUp(self):
        """Create a temporary directory and open a database inside it."""
        self.tempdir = tempfile.mkdtemp()
        # Registered right away so the directory is removed even when opening
        # the database below fails; cleanups run after tearDown().
        self.addCleanup(shutil.rmtree, self.tempdir)
        self.db = canarydb.CanaryDB(self._db_path())

    def tearDown(self):
        """Close the database opened by setUp() (or reopened by a test)."""
        self.db.close()

    def _db_path(self):
        """Return the path of the database file inside the temp directory."""
        # os.path.join keeps the file *inside* tempdir; plain string
        # concatenation would create it next to the directory instead.
        return os.path.join(self.tempdir, "canary.db")

    def _assert_single_missing_pong(self, list_address, address, uuid,
                                    expected_delta):
        """Assert that exactly one ping is still waiting for its pong.

        The single row returned by get_missing_pongs() must have four fields:
        list address, uuid, recipient address and an object exposing
        ``total_seconds()`` that is compared against ``expected_delta``.
        """
        missing = self.db.get_missing_pongs()
        self.assertEqual(1, len(missing))
        row = missing[0]
        self.assertEqual(4, len(row))
        self.assertEqual(list_address, row[0])
        self.assertEqual(uuid, row[1])
        self.assertEqual(address, row[2])
        drift = row[3].total_seconds() - expected_delta.total_seconds()
        # Check both directions: a large negative drift is just as wrong as a
        # large positive one.
        # NOTE(review): assumes CanaryDB measures elapsed time with the same
        # (local, naive) clock as this test -- confirm.
        self.assertLessEqual(abs(drift), self._MAX_DRIFT_SECONDS)

    def testPingCheckPong(self):
        """A ping is reported as missing until the matching pong arrives."""
        list_address = "list@example.com"
        address = "test@example.com"
        ping_time = datetime.datetime(2015, 10, 24, 9, 0)
        uuid = "1234"
        expected_delta = datetime.datetime.now() - ping_time

        # Record a ping.
        self.db.ping(list_address, address, ping_time, uuid)

        # The ping has no pong yet, so it must be reported.
        self._assert_single_missing_pong(
            list_address, address, uuid, expected_delta)

        # Record the pong.
        pong_time = datetime.datetime(2015, 10, 24, 9, 5)
        self.db.pong(address, pong_time, uuid)

        # Nothing is missing any more.
        missing = self.db.get_missing_pongs()
        self.assertEqual(0, len(missing))

    def testCloseReopen(self):
        """A recorded ping is still reported after closing and reopening."""
        list_address = "list@example.com"
        address = "test@example.com"
        ping_time = datetime.datetime(2015, 10, 24, 9, 0)
        uuid = "1234"
        expected_delta = datetime.datetime.now() - ping_time

        # Record a ping.
        self.db.ping(list_address, address, ping_time, uuid)

        # Close and reopen the same database file.
        self.db.close()
        self.db = canarydb.CanaryDB(self._db_path())

        # The ping must have been persisted.
        self._assert_single_missing_pong(
            list_address, address, uuid, expected_delta)

    def testAccounts(self):
        """An account can be added, read back and removed again."""
        list_address = "list@example.org"
        address = "user@example.net"
        imapserver = "imap.example.net"
        password = "secretpassword"

        # Verify that no accounts exist.
        accounts = self.db.get_accounts()
        self.assertEqual(0, len(accounts))

        # Add one account.
        self.db.add_account(list_address, address, imapserver, password)

        # Verify that the account exists with all four fields intact.
        accounts = self.db.get_accounts()
        self.assertEqual(1, len(accounts))
        account = accounts[0]
        self.assertEqual(list_address, account[0])
        self.assertEqual(address, account[1])
        self.assertEqual(imapserver, account[2])
        self.assertEqual(password, account[3])

        # Remove the account.
        self.db.remove_account(list_address, address)
        accounts = self.db.get_accounts()
        self.assertEqual(0, len(accounts))

    def testGetRecipientsForList(self):
        """Recipients are reported per list address."""
        list_address1 = "list1@example.org"
        list_address2 = "list2@example.org"
        imapserver = "imap.example.net"
        password = "secretpassword"
        address1 = "user1@example.net"
        address2 = "user2@example.net"

        def recipients(list_address):
            return self.db.get_recipients_for_list(list_address)

        # No accounts.
        self.assertEqual([], recipients(list_address1))
        self.assertEqual([], recipients(list_address2))

        # One account.
        self.db.add_account(list_address1, address1, imapserver, password)
        self.assertEqual([address1], recipients(list_address1))
        self.assertEqual([], recipients(list_address2))

        # Two accounts on the same list.
        self.db.add_account(list_address1, address2, imapserver, password)
        self.assertEqual([address1, address2], recipients(list_address1))
        self.assertEqual([], recipients(list_address2))

        # Two lists.
        self.db.add_account(list_address2, address1, imapserver, password)
        self.assertEqual([address1, address2], recipients(list_address1))
        self.assertEqual([address1], recipients(list_address2))
|