From 8ca299a9245d85bcdc9d487e9155a3b83ac8be13 Mon Sep 17 00:00:00 2001 From: Jesse Morgan Date: Sat, 21 Jan 2023 08:45:21 -0800 Subject: Remove tabs --- bin/emailcanary | 156 ++++++++++++------------- emailcanary/canarydb.py | 130 ++++++++++----------- emailcanary/email-digest-sender.py | 42 +++---- setup.py | 8 +- tests/test_canary.py | 164 +++++++++++++------------- tests/test_canarydb.py | 232 ++++++++++++++++++------------------- 6 files changed, 366 insertions(+), 366 deletions(-) diff --git a/bin/emailcanary b/bin/emailcanary index 2ad1d44..938ca8d 100755 --- a/bin/emailcanary +++ b/bin/emailcanary @@ -11,100 +11,100 @@ SUCCESS=0 FAILURE=1 def parse_args(): - parser = argparse.ArgumentParser() - parser.add_argument('-d', '--database', default='/etc/emailcanary.db', help='Specify the database to use.') - parser.add_argument('-s', '--smtp', default='localhost:465', help='SMTP Server to send chirps to.') - parser.add_argument('-f', '--from', dest='fromaddress', help='Specify the email address to send the ping from.') + parser = argparse.ArgumentParser() + parser.add_argument('-d', '--database', default='/etc/emailcanary.db', help='Specify the database to use.') + parser.add_argument('-s', '--smtp', default='localhost:465', help='SMTP Server to send chirps to.') + parser.add_argument('-f', '--from', dest='fromaddress', help='Specify the email address to send the ping from.') - group = parser.add_mutually_exclusive_group(required=True) - group.add_argument('--chirp', metavar='listAddress', help='Send an email to the given canary list address.') - group.add_argument('--check', metavar='listAddress', help='Check the recepient addresses for the list address.') - group.add_argument('--add', nargs=4, metavar=('listAddress', 'address', 'imapserver', 'password'), help='Add a new recepient and list.') - group.add_argument('--remove', nargs='+', metavar=('listAddress', 'address'), help='Remove recepients from the list.') - group.add_argument('--list', action='store_true', help='List all configured addresses.') + group = parser.add_mutually_exclusive_group(required=True) + group.add_argument('--chirp', metavar='listAddress', help='Send an email to the given canary list address.') + group.add_argument('--check', metavar='listAddress', help='Check the recepient addresses for the list address.') + group.add_argument('--add', nargs=4, metavar=('listAddress', 'address', 'imapserver', 'password'), help='Add a new recepient and list.') + group.add_argument('--remove', nargs='+', metavar=('listAddress', 'address'), help='Remove recepients from the list.') + group.add_argument('--list', action='store_true', help='List all configured addresses.') - args = parser.parse_args() - return args + args = parser.parse_args() + return args def get_smtp(address): - index = address.find(':') - if index == -1: - server = address - port = 465 - else: - (server, port) = address.split(':') - return smtplib.SMTP_SSL(server, port) + index = address.find(':') + if index == -1: + server = address + port = 465 + else: + (server, port) = address.split(':') + return smtplib.SMTP_SSL(server, port) def list(db): - accounts = db.get_accounts() - if len(accounts) == 0: - print('No accounts configured.') - return SUCCESS - print("%-25s %-25s %-25s" % ('List Address', 'Recepient', 'IMAP Server')) - print("-" * 80) - for account in accounts: - print("%-25s %-25s %-25s" % (account[0], account[1], account[2])) - return SUCCESS + accounts = db.get_accounts() + if len(accounts) == 0: + print('No accounts configured.') + return SUCCESS + print("%-25s %-25s %-25s" % ('List Address', 'Recepient', 'IMAP Server')) + print("-" * 80) + for account in accounts: + print("%-25s %-25s %-25s" % (account[0], account[1], account[2])) + return SUCCESS def add(db, listAddress, recepient, imapserver, password): - db.add_account(listAddress, recepient, imapserver, password) - return SUCCESS + db.add_account(listAddress, recepient, imapserver, password) + return SUCCESS def remove(db, listAddress, recepients): - if len(recepients) == 0: - recepients = db.get_accounts(listAddress) - for address in recepients: - db.remove_account(listAddress, address) - return SUCCESS + if len(recepients) == 0: + recepients = db.get_accounts(listAddress) + for address in recepients: + db.remove_account(listAddress, address) + return SUCCESS def check(db, birdie, listAddress): - missing = birdie.check(listAddress) - missing = [x for x in missing if x[3].total_seconds() < 86400] - missing = [x for x in missing if x[3].total_seconds() > 3600] - if len(missing) == 0: - return SUCCESS - print("list recepient uuid time") - for chirp in missing: - # Only Print chirps which were missing for more than 15 minutes. - print("%s %s %s %d" % (chirp[0], chirp[1], chirp[2], chirp[3].total_seconds())) - return FAILURE + missing = birdie.check(listAddress) + missing = [x for x in missing if x[3].total_seconds() < 86400] + missing = [x for x in missing if x[3].total_seconds() > 3600] + if len(missing) == 0: + return SUCCESS + print("list recepient uuid time") + for chirp in missing: + # Only Print chirps which were missing for more than 15 minutes. + print("%s %s %s %d" % (chirp[0], chirp[1], chirp[2], chirp[3].total_seconds())) + return FAILURE def main(): - args = parse_args() - if not args: - return + args = parse_args() + if not args: + return - smtp = None - db = None - try: - db = canarydb.CanaryDB(args.database) + smtp = None + db = None + try: + db = canarydb.CanaryDB(args.database) - if args.list: - return list(db) - elif args.add: - return add(db, args.add[0], args.add[1], args.add[2], args.add[3]) - elif args.remove: - return remove(db, args.remove[0], args.remove[1:]) - else: - smtp = get_smtp(args.smtp) - birdie = canary.Canary(db, smtp, args.fromaddress) - if args.chirp: - birdie.chirp(args.chirp) - return SUCCESS - elif args.check: - return check(db, birdie, args.check) - else: - raise Exception('Unknown action') + if args.list: + return list(db) + elif args.add: + return add(db, args.add[0], args.add[1], args.add[2], args.add[3]) + elif args.remove: + return remove(db, args.remove[0], args.remove[1:]) + else: + smtp = get_smtp(args.smtp) + birdie = canary.Canary(db, smtp, args.fromaddress) + if args.chirp: + birdie.chirp(args.chirp) + return SUCCESS + elif args.check: + return check(db, birdie, args.check) + else: + raise Exception('Unknown action') - except Exception as e: - sys.stderr.write("Error: %s\n" % (str(e))) - traceback.print_exc() - return FAILURE - finally: - if smtp: - smtp.quit() - if db: - db.close() + except Exception as e: + sys.stderr.write("Error: %s\n" % (str(e))) + traceback.print_exc() + return FAILURE + finally: + if smtp: + smtp.quit() + if db: + db.close() if __name__ == "__main__": - sys.exit(main()) + sys.exit(main()) diff --git a/emailcanary/canarydb.py b/emailcanary/canarydb.py index 980431b..8bf944f 100644 --- a/emailcanary/canarydb.py +++ b/emailcanary/canarydb.py @@ -2,77 +2,77 @@ import sqlite3 from datetime import datetime class CanaryDB: - def __init__(self, filename): - self.conn = sqlite3.connect(filename, detect_types=sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES) + def __init__(self, filename): + self.conn = sqlite3.connect(filename, detect_types=sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES) - # Create Tables if necessary - cursor = self.conn.cursor() - cursor.execute("CREATE TABLE IF NOT EXISTS pings (listAddress, address, uuid, timesent timestamp, timereceived timestamp)") - cursor.execute("CREATE TABLE IF NOT EXISTS accounts (list, address, imapserver, password)") + # Create Tables if necessary + cursor = self.conn.cursor() + cursor.execute("CREATE TABLE IF NOT EXISTS pings (listAddress, address, uuid, timesent timestamp, timereceived timestamp)") + cursor.execute("CREATE TABLE IF NOT EXISTS accounts (list, address, imapserver, password)") - def close(self): - self.conn.close() + def close(self): + self.conn.close() - def add_account(self, listAddress, address, imapserver, password): - cursor = self.conn.cursor() - cursor.execute("INSERT INTO accounts (list, address, imapserver, password) VALUES (?, ?, ?, ?)", \ - (listAddress, address, imapserver, password)) - self.conn.commit() + def add_account(self, listAddress, address, imapserver, password): + cursor = self.conn.cursor() + cursor.execute("INSERT INTO accounts (list, address, imapserver, password) VALUES (?, ?, ?, ?)", \ + (listAddress, address, imapserver, password)) + self.conn.commit() - def remove_account(self, listAddress, address): - cursor = self.conn.cursor() - cursor.execute("DELETE FROM accounts WHERE list=? AND address=?", (listAddress, address)) - self.conn.commit() + def remove_account(self, listAddress, address): + cursor = self.conn.cursor() + cursor.execute("DELETE FROM accounts WHERE list=? AND address=?", (listAddress, address)) + self.conn.commit() - def get_accounts(self, listAddress = None): - cursor = self.conn.cursor() - if listAddress: - cursor.execute("SELECT list, address, imapserver, password FROM accounts WHERE list=?", (listAddress,)); - else: - cursor.execute("SELECT list, address, imapserver, password FROM accounts"); - results = list() - for row in cursor: - listAddress = row[0] - address = row[1] - imapserver = row[2] - password = row[3] - results.append((listAddress, address, imapserver, password)) - return results + def get_accounts(self, listAddress = None): + cursor = self.conn.cursor() + if listAddress: + cursor.execute("SELECT list, address, imapserver, password FROM accounts WHERE list=?", (listAddress,)); + else: + cursor.execute("SELECT list, address, imapserver, password FROM accounts"); + results = list() + for row in cursor: + listAddress = row[0] + address = row[1] + imapserver = row[2] + password = row[3] + results.append((listAddress, address, imapserver, password)) + return results - def get_recipients_for_list(self, listAddress): - cursor = self.conn.cursor() - cursor.execute("SELECT address FROM accounts WHERE list=?", (listAddress,)); - results = list() - for row in cursor: - results.append(row[0]) - return results + def get_recipients_for_list(self, listAddress): + cursor = self.conn.cursor() + cursor.execute("SELECT address FROM accounts WHERE list=?", (listAddress,)); + results = list() + for row in cursor: + results.append(row[0]) + return results - def ping(self, listAddress, address, time, uuid): - cursor = self.conn.cursor() - cursor.execute("INSERT INTO pings (listAddress, address, timesent, uuid) VALUES (?, ?, ?, ?)", \ - (listAddress, address, time, uuid)) - self.conn.commit() + def ping(self, listAddress, address, time, uuid): + cursor = self.conn.cursor() + cursor.execute("INSERT INTO pings (listAddress, address, timesent, uuid) VALUES (?, ?, ?, ?)", \ + (listAddress, address, time, uuid)) + self.conn.commit() - def pong(self, address, time, uuid): - cursor = self.conn.cursor() - cursor.execute("UPDATE pings SET timereceived=? WHERE address=? AND uuid=?", \ - (time, address, uuid)) - self.conn.commit() + def pong(self, address, time, uuid): + cursor = self.conn.cursor() + cursor.execute("UPDATE pings SET timereceived=? WHERE address=? AND uuid=?", \ + (time, address, uuid)) + self.conn.commit() - def get_missing_pongs(self, listAddress = None): - '''Return a list of tupls of missing pongs. - Each tupl contains (listAddress, uuid, address, time since send)''' - cursor = self.conn.cursor() - if listAddress: - cursor.execute("SELECT listAddress, uuid, address, timesent FROM pings WHERE timereceived IS NULL AND listAddress = ?", (listAddress,)); - else: - cursor.execute("SELECT listAddress, uuid, address, timesent FROM pings WHERE timereceived IS NULL"); - now = datetime.now() - results = list() - for row in cursor: - listAddress = row[0] - uuid = row[1] - address = row[2] - delta = now - row[3] - results.append((listAddress, uuid, address, delta)) - return results + def get_missing_pongs(self, listAddress = None): + '''Return a list of tupls of missing pongs. + Each tupl contains (listAddress, uuid, address, time since send)''' + cursor = self.conn.cursor() + if listAddress: + cursor.execute("SELECT listAddress, uuid, address, timesent FROM pings WHERE timereceived IS NULL AND listAddress = ?", (listAddress,)); + else: + cursor.execute("SELECT listAddress, uuid, address, timesent FROM pings WHERE timereceived IS NULL"); + now = datetime.now() + results = list() + for row in cursor: + listAddress = row[0] + uuid = row[1] + address = row[2] + delta = now - row[3] + results.append((listAddress, uuid, address, delta)) + return results diff --git a/emailcanary/email-digest-sender.py b/emailcanary/email-digest-sender.py index fa4fcb0..e1aad38 100644 --- a/emailcanary/email-digest-sender.py +++ b/emailcanary/email-digest-sender.py @@ -10,29 +10,29 @@ youve_got_mail = False all_subjects = {} for account in ACCOUNTS: - mail = emailutils.get_imap(account[0], account[1], account[2]) - these_subjects = [] - for uid in emailutils.get_mail_uids(mail): - message = emailutils.get_message(mail, uid) - these_subjects.append(message['subject']) - youve_got_mail = True - all_subjects[account[1]] = these_subjects + mail = emailutils.get_imap(account[0], account[1], account[2]) + these_subjects = [] + for uid in emailutils.get_mail_uids(mail): + message = emailutils.get_message(mail, uid) + these_subjects.append(message['subject']) + youve_got_mail = True + all_subjects[account[1]] = these_subjects if youve_got_mail: - msg = "" - for account in all_subjects: - msg = msg + "# Messages for %s\n" % account - for subject in all_subjects[account]: - msg = msg + " * %s\n" % subject - msg = msg + "\n" + msg = "" + for account in all_subjects: + msg = msg + "# Messages for %s\n" % account + for subject in all_subjects[account]: + msg = msg + " * %s\n" % subject + msg = msg + "\n" - digest_message = message.Message() - digest_message.set_payload(msg) - digest_message['From'] = DESTINATION - digest_message['To'] = DESTINATION - digest_message['Subject'] = "Email Digest" + digest_message = message.Message() + digest_message.set_payload(msg) + digest_message['From'] = DESTINATION + digest_message['To'] = DESTINATION + digest_message['Subject'] = "Email Digest" - s = smtplib.SMTP_SSL('localhost', 2465) - s.sendmail(DESTINATION, DESTINATION, digest_message.as_string()) - s.quit() + s = smtplib.SMTP_SSL('localhost', 2465) + s.sendmail(DESTINATION, DESTINATION, digest_message.as_string()) + s.quit() diff --git a/setup.py b/setup.py index bc53c06..fbc1a84 100644 --- a/setup.py +++ b/setup.py @@ -12,11 +12,11 @@ setup(name='emailcanary', packages = find_packages(), include_package_data = True, exclude_package_data = { '': ['README.txt'] }, - + scripts = ['bin/emailcanary'], - + license='MIT', - + #setup_requires = ['python-stdeb', 'fakeroot', 'python-all'], install_requires = ['setuptools'], - ) \ No newline at end of file + ) diff --git a/tests/test_canary.py b/tests/test_canary.py index d23f3fe..c0ba262 100644 --- a/tests/test_canary.py +++ b/tests/test_canary.py @@ -17,87 +17,87 @@ SERVER = "mail.example.com" PASSWORD = "secret" class TestCanary(unittest.TestCase): - def setUp(self): - self.db = mock.Mock(canarydb.CanaryDB) - self.smtp = mock.Mock(smtplib.SMTP_SSL) - self.canary = canary.Canary(self.db, self.smtp, FROM_ADDRESS) - canary.emailutils.get_imap = mock.Mock() - canary.emailutils.get_message = mock.Mock() - canary.emailutils.get_mail_uids = mock.Mock() - canary.emailutils.delete_message = mock.Mock() - canary.emailutils.close = mock.Mock() - - def tearDown(self): - pass - - def testChirp(self): - # Setup Mock - self.db.get_recipients_for_list.return_value = [USER_ADDRESS1, USER_ADDRESS2] - - # Test chirp - self.canary.chirp(LIST_ADDRESS) - - # Assert DB updated - self.db.get_recipients_for_list.assert_called_with(LIST_ADDRESS) - self.db.ping.assert_has_calls( \ - [mock.call(LIST_ADDRESS, USER_ADDRESS1, mock.ANY, mock.ANY), \ - mock.call(LIST_ADDRESS, USER_ADDRESS2, mock.ANY, mock.ANY)]) - args = self.db.ping.call_args - expectedSubject = "Canary Email " + args[0][3] - - # Assert emails were sent - self.assertEqual(1, self.smtp.sendmail.call_count) - args = self.smtp.sendmail.call_args[0] - self.assertEqual(FROM_ADDRESS, args[0]) - self.assertEqual(LIST_ADDRESS, args[1]) - msg = email.message_from_string(args[2]) - self.assertEqual(FROM_ADDRESS, msg['From']) - self.assertEqual(LIST_ADDRESS, msg['To']) - self.assertEqual(expectedSubject, msg['Subject']) - - def testCheck(self): - # Setup mocks - expectedUUID = "1234-5678-9012-3456" - self.db.get_accounts.return_value = [ \ - (LIST_ADDRESS, USER_ADDRESS1, SERVER, PASSWORD), \ - (LIST_ADDRESS, USER_ADDRESS2, SERVER, PASSWORD)] - canary.emailutils.get_mail_uids.return_value = [1] - canary.emailutils.get_message.return_value = {'Subject': "Canary Email " + expectedUUID} - - # Test check - self.canary.check(LIST_ADDRESS) - - # Assert DB calls - self.db.get_accounts.assert_called_with(LIST_ADDRESS) - self.db.pong.assert_has_calls([ \ - mock.call(USER_ADDRESS1, mock.ANY, expectedUUID), \ - mock.call(USER_ADDRESS2, mock.ANY, expectedUUID)]) - - # Assert mail calls - canary.emailutils.get_imap.assert_has_calls([ \ - mock.call(SERVER, USER_ADDRESS1, PASSWORD), \ - mock.call(SERVER, USER_ADDRESS2, PASSWORD)]) - canary.emailutils.get_message.assert_called_with(canary.emailutils.get_imap.return_value, 1) - canary.emailutils.delete_message.assert_called_with(canary.emailutils.get_imap.return_value, 1) - canary.emailutils.close.assert_called_with(canary.emailutils.get_imap.return_value) - - def testDontDeleteOtherMail(self): - # Setup mocks - self.db.get_accounts.return_value = [(LIST_ADDRESS, USER_ADDRESS1, SERVER, PASSWORD)] - canary.emailutils.get_mail_uids.return_value = [1] - canary.emailutils.get_message.return_value = {'Subject': "Buy Our New Widget"} - - # Test check - self.canary.check(LIST_ADDRESS) - - # Assert DB calls - self.db.get_accounts.assert_called_with(LIST_ADDRESS) - self.db.pong.assert_not_called() - - # Assert mail calls - canary.emailutils.get_imap.assert_called_with(SERVER, USER_ADDRESS1, PASSWORD) - canary.emailutils.get_message.assert_called_with(canary.emailutils.get_imap.return_value, 1) - canary.emailutils.delete_message.assert_not_called() - canary.emailutils.close.assert_called_with(canary.emailutils.get_imap.return_value) + def setUp(self): + self.db = mock.Mock(canarydb.CanaryDB) + self.smtp = mock.Mock(smtplib.SMTP_SSL) + self.canary = canary.Canary(self.db, self.smtp, FROM_ADDRESS) + canary.emailutils.get_imap = mock.Mock() + canary.emailutils.get_message = mock.Mock() + canary.emailutils.get_mail_uids = mock.Mock() + canary.emailutils.delete_message = mock.Mock() + canary.emailutils.close = mock.Mock() + + def tearDown(self): + pass + + def testChirp(self): + # Setup Mock + self.db.get_recipients_for_list.return_value = [USER_ADDRESS1, USER_ADDRESS2] + + # Test chirp + self.canary.chirp(LIST_ADDRESS) + + # Assert DB updated + self.db.get_recipients_for_list.assert_called_with(LIST_ADDRESS) + self.db.ping.assert_has_calls( \ + [mock.call(LIST_ADDRESS, USER_ADDRESS1, mock.ANY, mock.ANY), \ + mock.call(LIST_ADDRESS, USER_ADDRESS2, mock.ANY, mock.ANY)]) + args = self.db.ping.call_args + expectedSubject = "Canary Email " + args[0][3] + + # Assert emails were sent + self.assertEqual(1, self.smtp.sendmail.call_count) + args = self.smtp.sendmail.call_args[0] + self.assertEqual(FROM_ADDRESS, args[0]) + self.assertEqual(LIST_ADDRESS, args[1]) + msg = email.message_from_string(args[2]) + self.assertEqual(FROM_ADDRESS, msg['From']) + self.assertEqual(LIST_ADDRESS, msg['To']) + self.assertEqual(expectedSubject, msg['Subject']) + + def testCheck(self): + # Setup mocks + expectedUUID = "1234-5678-9012-3456" + self.db.get_accounts.return_value = [ \ + (LIST_ADDRESS, USER_ADDRESS1, SERVER, PASSWORD), \ + (LIST_ADDRESS, USER_ADDRESS2, SERVER, PASSWORD)] + canary.emailutils.get_mail_uids.return_value = [1] + canary.emailutils.get_message.return_value = {'Subject': "Canary Email " + expectedUUID} + + # Test check + self.canary.check(LIST_ADDRESS) + + # Assert DB calls + self.db.get_accounts.assert_called_with(LIST_ADDRESS) + self.db.pong.assert_has_calls([ \ + mock.call(USER_ADDRESS1, mock.ANY, expectedUUID), \ + mock.call(USER_ADDRESS2, mock.ANY, expectedUUID)]) + + # Assert mail calls + canary.emailutils.get_imap.assert_has_calls([ \ + mock.call(SERVER, USER_ADDRESS1, PASSWORD), \ + mock.call(SERVER, USER_ADDRESS2, PASSWORD)]) + canary.emailutils.get_message.assert_called_with(canary.emailutils.get_imap.return_value, 1) + canary.emailutils.delete_message.assert_called_with(canary.emailutils.get_imap.return_value, 1) + canary.emailutils.close.assert_called_with(canary.emailutils.get_imap.return_value) + + def testDontDeleteOtherMail(self): + # Setup mocks + self.db.get_accounts.return_value = [(LIST_ADDRESS, USER_ADDRESS1, SERVER, PASSWORD)] + canary.emailutils.get_mail_uids.return_value = [1] + canary.emailutils.get_message.return_value = {'Subject': "Buy Our New Widget"} + + # Test check + self.canary.check(LIST_ADDRESS) + + # Assert DB calls + self.db.get_accounts.assert_called_with(LIST_ADDRESS) + self.db.pong.assert_not_called() + + # Assert mail calls + canary.emailutils.get_imap.assert_called_with(SERVER, USER_ADDRESS1, PASSWORD) + canary.emailutils.get_message.assert_called_with(canary.emailutils.get_imap.return_value, 1) + canary.emailutils.delete_message.assert_not_called() + canary.emailutils.close.assert_called_with(canary.emailutils.get_imap.return_value) diff --git a/tests/test_canarydb.py b/tests/test_canarydb.py index a51afdc..195387e 100644 --- a/tests/test_canarydb.py +++ b/tests/test_canarydb.py @@ -5,119 +5,119 @@ import datetime from emailcanary import canarydb class TestCanaryDB(unittest.TestCase): - def setUp(self): - self.tempdir = tempfile.mkdtemp() - self.db = canarydb.CanaryDB(self.tempdir + "canary.db") - - def tearDown(self): - self.db.close() - shutil.rmtree(self.tempdir) - - def testPingCheckPong(self): - listAddress = "list@example.com" - address = "test@example.com" - time = datetime.datetime(2015, 10, 24, 9, 00) - uuid = "1234" - expectedDelta = datetime.datetime.now() - time - - # Record a Ping - self.db.ping(listAddress, address, time, uuid) - - # Check for missing pongs - missing = self.db.get_missing_pongs() - - self.assertEqual(1, len(missing)) - firstMissing = missing[0] - self.assertEqual(4, len(firstMissing)) - self.assertEqual(listAddress, firstMissing[0]) - self.assertEqual(uuid, firstMissing[1]) - self.assertEqual(address, firstMissing[2]) - delta = firstMissing[3].total_seconds() - expectedDelta.total_seconds() - self.assertTrue(delta <= 10) - - # Record a pong - pongtime = datetime.datetime(2015, 10, 24, 9, 05) - self.db.pong(address, pongtime, uuid) - - # Check for missing pongs - missing = self.db.get_missing_pongs() - self.assertEqual(0, len(missing)) - - def testCloseReopen(self): - listAddress = "list@example.com" - address = "test@example.com" - time = datetime.datetime(2015, 10, 24, 9, 00) - uuid = "1234" - expectedDelta = datetime.datetime.now() - time - - # Record a Ping - self.db.ping(listAddress, address, time, uuid) - - # Close, Reopen - self.db.close() - self.db = canarydb.CanaryDB(self.tempdir + "canary.db") - - # Check for missing pongs - missing = self.db.get_missing_pongs() - - self.assertEqual(1, len(missing)) - firstMissing = missing[0] - self.assertEqual(4, len(firstMissing)) - self.assertEqual(listAddress, firstMissing[0]) - self.assertEqual(uuid, firstMissing[1]) - self.assertEqual(address, firstMissing[2]) - delta = firstMissing[3].total_seconds() - expectedDelta.total_seconds() - self.assertTrue(delta <= 10) - - def testAccounts(self): - listAddress = "list@example.org" - address = "user@example.net" - imapserver = "imap.example.net" - password = "secretpassword" - - # Verify that no accounts exist - accounts = self.db.get_accounts() - self.assertEqual(0, len(accounts)) - - # Add one account - self.db.add_account(listAddress, address, imapserver, password) - - # Verify that the account exists - accounts = self.db.get_accounts() - self.assertEqual(1, len(accounts)) - self.assertEqual(listAddress, accounts[0][0]) - self.assertEqual(address, accounts[0][1]) - self.assertEqual(imapserver, accounts[0][2]) - self.assertEqual(password, accounts[0][3]) - - # Remove the account - self.db.remove_account(listAddress, address) - accounts = self.db.get_accounts() - self.assertEqual(0, len(accounts)) - - def testGetRecipientsForList(self): - listAddress1 = "list1@example.org" - listAddress2 = "list2@example.org" - imapserver = "imap.example.net" - password = "secretpassword" - address1 = "user1@example.net" - address2 = "user2@example.net" - - # No accounts - self.assertEqual([], self.db.get_recipients_for_list(listAddress1)); - self.assertEqual([], self.db.get_recipients_for_list(listAddress2)); - - # One account - self.db.add_account(listAddress1, address1, imapserver, password) - self.assertEqual([address1], self.db.get_recipients_for_list(listAddress1)); - self.assertEqual([], self.db.get_recipients_for_list(listAddress2)); - - # Two accounts - self.db.add_account(listAddress1, address2, imapserver, password) - self.assertEqual([address1, address2], self.db.get_recipients_for_list(listAddress1)); - self.assertEqual([], self.db.get_recipients_for_list(listAddress2)); - - # Two lists - self.db.add_account(listAddress2, address1, imapserver, password) - self.assertEqual([address1, address2], self.db.get_recipients_for_list(listAddress1)); - self.assertEqual([address1], self.db.get_recipients_for_list(listAddress2)); + def setUp(self): + self.tempdir = tempfile.mkdtemp() + self.db = canarydb.CanaryDB(self.tempdir + "canary.db") + + def tearDown(self): + self.db.close() + shutil.rmtree(self.tempdir) + + def testPingCheckPong(self): + listAddress = "list@example.com" + address = "test@example.com" + time = datetime.datetime(2015, 10, 24, 9, 00) + uuid = "1234" + expectedDelta = datetime.datetime.now() - time + + # Record a Ping + self.db.ping(listAddress, address, time, uuid) + + # Check for missing pongs + missing = self.db.get_missing_pongs() + + self.assertEqual(1, len(missing)) + firstMissing = missing[0] + self.assertEqual(4, len(firstMissing)) + self.assertEqual(listAddress, firstMissing[0]) + self.assertEqual(uuid, firstMissing[1]) + self.assertEqual(address, firstMissing[2]) + delta = firstMissing[3].total_seconds() - expectedDelta.total_seconds() + self.assertTrue(delta <= 10) + + # Record a pong + pongtime = datetime.datetime(2015, 10, 24, 9, 05) + self.db.pong(address, pongtime, uuid) + + # Check for missing pongs + missing = self.db.get_missing_pongs() + self.assertEqual(0, len(missing)) + + def testCloseReopen(self): + listAddress = "list@example.com" + address = "test@example.com" + time = datetime.datetime(2015, 10, 24, 9, 00) + uuid = "1234" + expectedDelta = datetime.datetime.now() - time + + # Record a Ping + self.db.ping(listAddress, address, time, uuid) + + # Close, Reopen + self.db.close() + self.db = canarydb.CanaryDB(self.tempdir + "canary.db") + + # Check for missing pongs + missing = self.db.get_missing_pongs() + + self.assertEqual(1, len(missing)) + firstMissing = missing[0] + self.assertEqual(4, len(firstMissing)) + self.assertEqual(listAddress, firstMissing[0]) + self.assertEqual(uuid, firstMissing[1]) + self.assertEqual(address, firstMissing[2]) + delta = firstMissing[3].total_seconds() - expectedDelta.total_seconds() + self.assertTrue(delta <= 10) + + def testAccounts(self): + listAddress = "list@example.org" + address = "user@example.net" + imapserver = "imap.example.net" + password = "secretpassword" + + # Verify that no accounts exist + accounts = self.db.get_accounts() + self.assertEqual(0, len(accounts)) + + # Add one account + self.db.add_account(listAddress, address, imapserver, password) + + # Verify that the account exists + accounts = self.db.get_accounts() + self.assertEqual(1, len(accounts)) + self.assertEqual(listAddress, accounts[0][0]) + self.assertEqual(address, accounts[0][1]) + self.assertEqual(imapserver, accounts[0][2]) + self.assertEqual(password, accounts[0][3]) + + # Remove the account + self.db.remove_account(listAddress, address) + accounts = self.db.get_accounts() + self.assertEqual(0, len(accounts)) + + def testGetRecipientsForList(self): + listAddress1 = "list1@example.org" + listAddress2 = "list2@example.org" + imapserver = "imap.example.net" + password = "secretpassword" + address1 = "user1@example.net" + address2 = "user2@example.net" + + # No accounts + self.assertEqual([], self.db.get_recipients_for_list(listAddress1)); + self.assertEqual([], self.db.get_recipients_for_list(listAddress2)); + + # One account + self.db.add_account(listAddress1, address1, imapserver, password) + self.assertEqual([address1], self.db.get_recipients_for_list(listAddress1)); + self.assertEqual([], self.db.get_recipients_for_list(listAddress2)); + + # Two accounts + self.db.add_account(listAddress1, address2, imapserver, password) + self.assertEqual([address1, address2], self.db.get_recipients_for_list(listAddress1)); + self.assertEqual([], self.db.get_recipients_for_list(listAddress2)); + + # Two lists + self.db.add_account(listAddress2, address1, imapserver, password) + self.assertEqual([address1, address2], self.db.get_recipients_for_list(listAddress1)); + self.assertEqual([address1], self.db.get_recipients_for_list(listAddress2)); -- cgit v1.2.3