diff options
| -rwxr-xr-x | bin/emailcanary | 156 | ||||
| -rw-r--r-- | emailcanary/canarydb.py | 130 | ||||
| -rw-r--r-- | emailcanary/email-digest-sender.py | 42 | ||||
| -rw-r--r-- | setup.py | 8 | ||||
| -rw-r--r-- | tests/test_canary.py | 164 | ||||
| -rw-r--r-- | tests/test_canarydb.py | 232 | 
6 files changed, 366 insertions, 366 deletions
| diff --git a/bin/emailcanary b/bin/emailcanary index 2ad1d44..938ca8d 100755 --- a/bin/emailcanary +++ b/bin/emailcanary @@ -11,100 +11,100 @@ SUCCESS=0  FAILURE=1  def parse_args(): -	parser = argparse.ArgumentParser() -	parser.add_argument('-d', '--database', default='/etc/emailcanary.db', help='Specify the database to use.') -	parser.add_argument('-s', '--smtp', default='localhost:465', help='SMTP Server to send chirps to.') -	parser.add_argument('-f', '--from', dest='fromaddress', help='Specify the email address to send the ping from.') +    parser = argparse.ArgumentParser() +    parser.add_argument('-d', '--database', default='/etc/emailcanary.db', help='Specify the database to use.') +    parser.add_argument('-s', '--smtp', default='localhost:465', help='SMTP Server to send chirps to.') +    parser.add_argument('-f', '--from', dest='fromaddress', help='Specify the email address to send the ping from.') -	group = parser.add_mutually_exclusive_group(required=True) -	group.add_argument('--chirp', metavar='listAddress', help='Send an email to the given canary list address.') -	group.add_argument('--check', metavar='listAddress', help='Check the recepient addresses for the list address.') -	group.add_argument('--add', nargs=4, metavar=('listAddress', 'address', 'imapserver', 'password'), help='Add a new recepient and list.') -	group.add_argument('--remove', nargs='+', metavar=('listAddress', 'address'), help='Remove recepients from the list.') -	group.add_argument('--list', action='store_true', help='List all configured addresses.') +    group = parser.add_mutually_exclusive_group(required=True) +    group.add_argument('--chirp', metavar='listAddress', help='Send an email to the given canary list address.') +    group.add_argument('--check', metavar='listAddress', help='Check the recepient addresses for the list address.') +    group.add_argument('--add', nargs=4, metavar=('listAddress', 'address', 'imapserver', 'password'), help='Add a new recepient and list.') +    group.add_argument('--remove', nargs='+', metavar=('listAddress', 'address'), help='Remove recepients from the list.') +    group.add_argument('--list', action='store_true', help='List all configured addresses.') -	args = parser.parse_args() -	return args +    args = parser.parse_args() +    return args  def get_smtp(address): -	index = address.find(':') -	if index == -1: -		server = address -		port = 465 -	else: -		(server, port) = address.split(':') -	return smtplib.SMTP_SSL(server, port) +    index = address.find(':') +    if index == -1: +        server = address +        port = 465 +    else: +        (server, port) = address.split(':') +    return smtplib.SMTP_SSL(server, port)  def list(db): -	accounts = db.get_accounts() -	if len(accounts) == 0: -		print('No accounts configured.') -		return SUCCESS -	print("%-25s %-25s %-25s" % ('List Address', 'Recepient', 'IMAP Server')) -	print("-" * 80) -	for account in accounts: -		print("%-25s %-25s %-25s" % (account[0], account[1], account[2])) -	return SUCCESS +    accounts = db.get_accounts() +    if len(accounts) == 0: +        print('No accounts configured.') +        return SUCCESS +    print("%-25s %-25s %-25s" % ('List Address', 'Recepient', 'IMAP Server')) +    print("-" * 80) +    for account in accounts: +        print("%-25s %-25s %-25s" % (account[0], account[1], account[2])) +    return SUCCESS  def add(db, listAddress, recepient, imapserver, password): -	db.add_account(listAddress, recepient, imapserver, password) -	return SUCCESS +    db.add_account(listAddress, recepient, imapserver, password) +    return SUCCESS  def remove(db, listAddress, recepients): -	if len(recepients) == 0: -		recepients = db.get_accounts(listAddress) -	for address in recepients: -		db.remove_account(listAddress, address) -	return SUCCESS +    if len(recepients) == 0: +        recepients = db.get_accounts(listAddress) +    for address in recepients: +        db.remove_account(listAddress, address) +    return SUCCESS  def check(db, birdie, listAddress): -	missing = birdie.check(listAddress) -	missing = [x for x in missing if x[3].total_seconds() < 86400] -	missing = [x for x in missing if x[3].total_seconds() > 3600] -	if len(missing) == 0: -		return SUCCESS -	print("list recepient uuid time") -	for chirp in missing: -		# Only Print chirps which were missing for more than 15 minutes. -		print("%s %s %s %d" % (chirp[0], chirp[1], chirp[2], chirp[3].total_seconds())) -	return FAILURE +    missing = birdie.check(listAddress) +    missing = [x for x in missing if x[3].total_seconds() < 86400] +    missing = [x for x in missing if x[3].total_seconds() > 3600] +    if len(missing) == 0: +        return SUCCESS +    print("list recepient uuid time") +    for chirp in missing: +        # Only Print chirps which were missing for more than 15 minutes. +        print("%s %s %s %d" % (chirp[0], chirp[1], chirp[2], chirp[3].total_seconds())) +    return FAILURE  def main(): -	args = parse_args() -	if not args: -		return +    args = parse_args() +    if not args: +        return -	smtp = None -	db = None -	try: -		db = canarydb.CanaryDB(args.database) +    smtp = None +    db = None +    try: +        db = canarydb.CanaryDB(args.database) -		if args.list: -			return list(db) -		elif args.add: -			return add(db, args.add[0], args.add[1], args.add[2], args.add[3]) -		elif args.remove: -			return remove(db, args.remove[0], args.remove[1:]) -		else: -			smtp = get_smtp(args.smtp) -			birdie = canary.Canary(db, smtp, args.fromaddress) -			if args.chirp: -				birdie.chirp(args.chirp) -				return SUCCESS -			elif args.check: -				return check(db, birdie, args.check) -			else: -				raise Exception('Unknown action') +        if args.list: +            return list(db) +        elif args.add: +            return add(db, args.add[0], args.add[1], args.add[2], args.add[3]) +        elif args.remove: +            return remove(db, args.remove[0], args.remove[1:]) +        else: +            smtp = get_smtp(args.smtp) +            birdie = canary.Canary(db, smtp, args.fromaddress) +            if args.chirp: +                birdie.chirp(args.chirp) +                return SUCCESS +            elif args.check: +                return check(db, birdie, args.check) +            else: +                raise Exception('Unknown action') -	except Exception as e: -		sys.stderr.write("Error: %s\n" % (str(e))) -		traceback.print_exc() -		return FAILURE -	finally: -		if smtp: -			smtp.quit() -		if db: -			db.close() +    except Exception as e: +        sys.stderr.write("Error: %s\n" % (str(e))) +        traceback.print_exc() +        return FAILURE +    finally: +        if smtp: +            smtp.quit() +        if db: +            db.close()  if __name__ == "__main__": -	sys.exit(main()) +    sys.exit(main()) diff --git a/emailcanary/canarydb.py b/emailcanary/canarydb.py index 980431b..8bf944f 100644 --- a/emailcanary/canarydb.py +++ b/emailcanary/canarydb.py @@ -2,77 +2,77 @@ import sqlite3  from datetime import datetime  class CanaryDB: -	def __init__(self, filename): -		self.conn = sqlite3.connect(filename, detect_types=sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES) +    def __init__(self, filename): +        self.conn = sqlite3.connect(filename, detect_types=sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES) -		# Create Tables if necessary -		cursor = self.conn.cursor() -		cursor.execute("CREATE TABLE IF NOT EXISTS pings (listAddress, address, uuid, timesent timestamp, timereceived timestamp)") -		cursor.execute("CREATE TABLE IF NOT EXISTS accounts (list, address, imapserver, password)") +        # Create Tables if necessary +        cursor = self.conn.cursor() +        cursor.execute("CREATE TABLE IF NOT EXISTS pings (listAddress, address, uuid, timesent timestamp, timereceived timestamp)") +        cursor.execute("CREATE TABLE IF NOT EXISTS accounts (list, address, imapserver, password)") -	def close(self): -		self.conn.close() +    def close(self): +        self.conn.close() -	def add_account(self, listAddress, address, imapserver, password): -		cursor = self.conn.cursor() -		cursor.execute("INSERT INTO accounts (list, address, imapserver, password) VALUES (?, ?, ?, ?)", \ -			(listAddress, address, imapserver, password)) -		self.conn.commit() +    def add_account(self, listAddress, address, imapserver, password): +        cursor = self.conn.cursor() +        cursor.execute("INSERT INTO accounts (list, address, imapserver, password) VALUES (?, ?, ?, ?)", \ +            (listAddress, address, imapserver, password)) +        self.conn.commit() -	def remove_account(self, listAddress, address): -		cursor = self.conn.cursor() -		cursor.execute("DELETE FROM accounts WHERE list=? AND address=?", (listAddress, address)) -		self.conn.commit() +    def remove_account(self, listAddress, address): +        cursor = self.conn.cursor() +        cursor.execute("DELETE FROM accounts WHERE list=? AND address=?", (listAddress, address)) +        self.conn.commit() -	def get_accounts(self, listAddress = None): -		cursor = self.conn.cursor() -		if listAddress: -			cursor.execute("SELECT list, address, imapserver, password FROM accounts WHERE list=?", (listAddress,)); -		else: -			cursor.execute("SELECT list, address, imapserver, password FROM accounts"); -		results = list() -		for row in cursor: -			listAddress = row[0] -			address = row[1] -			imapserver = row[2] -			password = row[3] -			results.append((listAddress, address, imapserver, password)) -		return results +    def get_accounts(self, listAddress = None): +        cursor = self.conn.cursor() +        if listAddress: +            cursor.execute("SELECT list, address, imapserver, password FROM accounts WHERE list=?", (listAddress,)); +        else: +            cursor.execute("SELECT list, address, imapserver, password FROM accounts"); +        results = list() +        for row in cursor: +            listAddress = row[0] +            address = row[1] +            imapserver = row[2] +            password = row[3] +            results.append((listAddress, address, imapserver, password)) +        return results -	def get_recipients_for_list(self, listAddress): -		cursor = self.conn.cursor() -		cursor.execute("SELECT address FROM accounts WHERE list=?", (listAddress,)); -		results = list() -		for row in cursor: -			results.append(row[0]) -		return results +    def get_recipients_for_list(self, listAddress): +        cursor = self.conn.cursor() +        cursor.execute("SELECT address FROM accounts WHERE list=?", (listAddress,)); +        results = list() +        for row in cursor: +            results.append(row[0]) +        return results -	def ping(self, listAddress, address, time, uuid): -		cursor = self.conn.cursor() -		cursor.execute("INSERT INTO pings (listAddress, address, timesent, uuid) VALUES (?, ?, ?, ?)", \ -			(listAddress, address, time, uuid)) -		self.conn.commit() +    def ping(self, listAddress, address, time, uuid): +        cursor = self.conn.cursor() +        cursor.execute("INSERT INTO pings (listAddress, address, timesent, uuid) VALUES (?, ?, ?, ?)", \ +            (listAddress, address, time, uuid)) +        self.conn.commit() -	def pong(self, address, time, uuid): -		cursor = self.conn.cursor() -		cursor.execute("UPDATE pings SET timereceived=? WHERE address=? AND uuid=?", \ -			(time, address, uuid)) -		self.conn.commit() +    def pong(self, address, time, uuid): +        cursor = self.conn.cursor() +        cursor.execute("UPDATE pings SET timereceived=? WHERE address=? AND uuid=?", \ +            (time, address, uuid)) +        self.conn.commit() -	def get_missing_pongs(self, listAddress = None): -		'''Return a list of tupls of missing pongs. -		   Each tupl contains (listAddress, uuid, address, time since send)''' -		cursor = self.conn.cursor() -		if listAddress: -			cursor.execute("SELECT listAddress, uuid, address, timesent FROM pings WHERE timereceived IS NULL AND listAddress = ?", (listAddress,)); -		else: -			cursor.execute("SELECT listAddress, uuid, address, timesent FROM pings WHERE timereceived IS NULL"); -		now = datetime.now() -		results = list() -		for row in cursor: -			listAddress = row[0] -			uuid = row[1] -			address = row[2] -			delta = now - row[3] -			results.append((listAddress, uuid, address, delta)) -		return results +    def get_missing_pongs(self, listAddress = None): +        '''Return a list of tupls of missing pongs. +           Each tupl contains (listAddress, uuid, address, time since send)''' +        cursor = self.conn.cursor() +        if listAddress: +            cursor.execute("SELECT listAddress, uuid, address, timesent FROM pings WHERE timereceived IS NULL AND listAddress = ?", (listAddress,)); +        else: +            cursor.execute("SELECT listAddress, uuid, address, timesent FROM pings WHERE timereceived IS NULL"); +        now = datetime.now() +        results = list() +        for row in cursor: +            listAddress = row[0] +            uuid = row[1] +            address = row[2] +            delta = now - row[3] +            results.append((listAddress, uuid, address, delta)) +        return results diff --git a/emailcanary/email-digest-sender.py b/emailcanary/email-digest-sender.py index fa4fcb0..e1aad38 100644 --- a/emailcanary/email-digest-sender.py +++ b/emailcanary/email-digest-sender.py @@ -10,29 +10,29 @@ youve_got_mail = False  all_subjects = {}  for account in ACCOUNTS: -	mail = emailutils.get_imap(account[0], account[1], account[2]) -	these_subjects = [] -	for uid in emailutils.get_mail_uids(mail): -		message = emailutils.get_message(mail, uid) -		these_subjects.append(message['subject']) -		youve_got_mail = True -	all_subjects[account[1]] = these_subjects +    mail = emailutils.get_imap(account[0], account[1], account[2]) +    these_subjects = [] +    for uid in emailutils.get_mail_uids(mail): +        message = emailutils.get_message(mail, uid) +        these_subjects.append(message['subject']) +        youve_got_mail = True +    all_subjects[account[1]] = these_subjects  if youve_got_mail: -	msg = "" -	for account in all_subjects: -		msg = msg + "# Messages for %s\n" % account -		for subject in all_subjects[account]: -			msg = msg + " * %s\n" % subject -		msg = msg + "\n" +    msg = "" +    for account in all_subjects: +        msg = msg + "# Messages for %s\n" % account +        for subject in all_subjects[account]: +            msg = msg + " * %s\n" % subject +        msg = msg + "\n" -	digest_message = message.Message() -	digest_message.set_payload(msg) -	digest_message['From'] = DESTINATION -	digest_message['To'] = DESTINATION -	digest_message['Subject'] = "Email Digest" +    digest_message = message.Message() +    digest_message.set_payload(msg) +    digest_message['From'] = DESTINATION +    digest_message['To'] = DESTINATION +    digest_message['Subject'] = "Email Digest" -	s = smtplib.SMTP_SSL('localhost', 2465) -	s.sendmail(DESTINATION, DESTINATION, digest_message.as_string()) -	s.quit() +    s = smtplib.SMTP_SSL('localhost', 2465) +    s.sendmail(DESTINATION, DESTINATION, digest_message.as_string()) +    s.quit() @@ -12,11 +12,11 @@ setup(name='emailcanary',        packages = find_packages(),        include_package_data = True,        exclude_package_data = { '': ['README.txt'] }, -       +        scripts = ['bin/emailcanary'], -       +        license='MIT', -                   +        #setup_requires = ['python-stdeb', 'fakeroot', 'python-all'],        install_requires = ['setuptools'], -     )
\ No newline at end of file +     ) diff --git a/tests/test_canary.py b/tests/test_canary.py index d23f3fe..c0ba262 100644 --- a/tests/test_canary.py +++ b/tests/test_canary.py @@ -17,87 +17,87 @@ SERVER = "mail.example.com"  PASSWORD = "secret"  class TestCanary(unittest.TestCase): -	def setUp(self): -		self.db = mock.Mock(canarydb.CanaryDB) -		self.smtp = mock.Mock(smtplib.SMTP_SSL) -		self.canary = canary.Canary(self.db, self.smtp, FROM_ADDRESS) -		canary.emailutils.get_imap = mock.Mock() -		canary.emailutils.get_message = mock.Mock() -		canary.emailutils.get_mail_uids = mock.Mock() -		canary.emailutils.delete_message = mock.Mock() -		canary.emailutils.close = mock.Mock() - -	def tearDown(self): -		pass - -	def testChirp(self): -		# Setup Mock -		self.db.get_recipients_for_list.return_value = [USER_ADDRESS1, USER_ADDRESS2] - -		# Test chirp -		self.canary.chirp(LIST_ADDRESS) - -		# Assert DB updated -		self.db.get_recipients_for_list.assert_called_with(LIST_ADDRESS) -		self.db.ping.assert_has_calls( \ -			[mock.call(LIST_ADDRESS, USER_ADDRESS1, mock.ANY, mock.ANY), \ -			 mock.call(LIST_ADDRESS, USER_ADDRESS2, mock.ANY, mock.ANY)]) -		args = self.db.ping.call_args -		expectedSubject = "Canary Email " + args[0][3] - -		# Assert emails were sent -		self.assertEqual(1, self.smtp.sendmail.call_count) -		args = self.smtp.sendmail.call_args[0] -		self.assertEqual(FROM_ADDRESS, args[0]) -		self.assertEqual(LIST_ADDRESS, args[1]) -		msg = email.message_from_string(args[2]) -		self.assertEqual(FROM_ADDRESS, msg['From']) -		self.assertEqual(LIST_ADDRESS, msg['To']) -		self.assertEqual(expectedSubject, msg['Subject']) - -	def testCheck(self): -		# Setup mocks -		expectedUUID = "1234-5678-9012-3456" -		self.db.get_accounts.return_value = [ \ -			(LIST_ADDRESS, USER_ADDRESS1, SERVER, PASSWORD), \ -			(LIST_ADDRESS, USER_ADDRESS2, SERVER, PASSWORD)] -		canary.emailutils.get_mail_uids.return_value = [1] -		canary.emailutils.get_message.return_value = {'Subject': "Canary Email " + expectedUUID} - -		# Test check -		self.canary.check(LIST_ADDRESS) - -		# Assert DB calls -		self.db.get_accounts.assert_called_with(LIST_ADDRESS) -		self.db.pong.assert_has_calls([ \ -			mock.call(USER_ADDRESS1, mock.ANY, expectedUUID), \ -			mock.call(USER_ADDRESS2, mock.ANY, expectedUUID)]) - -		# Assert mail calls -		canary.emailutils.get_imap.assert_has_calls([ \ -			mock.call(SERVER, USER_ADDRESS1, PASSWORD), \ -			mock.call(SERVER, USER_ADDRESS2, PASSWORD)]) -		canary.emailutils.get_message.assert_called_with(canary.emailutils.get_imap.return_value, 1) -		canary.emailutils.delete_message.assert_called_with(canary.emailutils.get_imap.return_value, 1) -		canary.emailutils.close.assert_called_with(canary.emailutils.get_imap.return_value) - -	def testDontDeleteOtherMail(self): -		# Setup mocks -		self.db.get_accounts.return_value = [(LIST_ADDRESS, USER_ADDRESS1, SERVER, PASSWORD)] -		canary.emailutils.get_mail_uids.return_value = [1] -		canary.emailutils.get_message.return_value = {'Subject': "Buy Our New Widget"} - -		# Test check -		self.canary.check(LIST_ADDRESS) - -		# Assert DB calls -		self.db.get_accounts.assert_called_with(LIST_ADDRESS) -		self.db.pong.assert_not_called() - -		# Assert mail calls -		canary.emailutils.get_imap.assert_called_with(SERVER, USER_ADDRESS1, PASSWORD) -		canary.emailutils.get_message.assert_called_with(canary.emailutils.get_imap.return_value, 1) -		canary.emailutils.delete_message.assert_not_called() -		canary.emailutils.close.assert_called_with(canary.emailutils.get_imap.return_value) +    def setUp(self): +        self.db = mock.Mock(canarydb.CanaryDB) +        self.smtp = mock.Mock(smtplib.SMTP_SSL) +        self.canary = canary.Canary(self.db, self.smtp, FROM_ADDRESS) +        canary.emailutils.get_imap = mock.Mock() +        canary.emailutils.get_message = mock.Mock() +        canary.emailutils.get_mail_uids = mock.Mock() +        canary.emailutils.delete_message = mock.Mock() +        canary.emailutils.close = mock.Mock() + +    def tearDown(self): +        pass + +    def testChirp(self): +        # Setup Mock +        self.db.get_recipients_for_list.return_value = [USER_ADDRESS1, USER_ADDRESS2] + +        # Test chirp +        self.canary.chirp(LIST_ADDRESS) + +        # Assert DB updated +        self.db.get_recipients_for_list.assert_called_with(LIST_ADDRESS) +        self.db.ping.assert_has_calls( \ +            [mock.call(LIST_ADDRESS, USER_ADDRESS1, mock.ANY, mock.ANY), \ +             mock.call(LIST_ADDRESS, USER_ADDRESS2, mock.ANY, mock.ANY)]) +        args = self.db.ping.call_args +        expectedSubject = "Canary Email " + args[0][3] + +        # Assert emails were sent +        self.assertEqual(1, self.smtp.sendmail.call_count) +        args = self.smtp.sendmail.call_args[0] +        self.assertEqual(FROM_ADDRESS, args[0]) +        self.assertEqual(LIST_ADDRESS, args[1]) +        msg = email.message_from_string(args[2]) +        self.assertEqual(FROM_ADDRESS, msg['From']) +        self.assertEqual(LIST_ADDRESS, msg['To']) +        self.assertEqual(expectedSubject, msg['Subject']) + +    def testCheck(self): +        # Setup mocks +        expectedUUID = "1234-5678-9012-3456" +        self.db.get_accounts.return_value = [ \ +            (LIST_ADDRESS, USER_ADDRESS1, SERVER, PASSWORD), \ +            (LIST_ADDRESS, USER_ADDRESS2, SERVER, PASSWORD)] +        canary.emailutils.get_mail_uids.return_value = [1] +        canary.emailutils.get_message.return_value = {'Subject': "Canary Email " + expectedUUID} + +        # Test check +        self.canary.check(LIST_ADDRESS) + +        # Assert DB calls +        self.db.get_accounts.assert_called_with(LIST_ADDRESS) +        self.db.pong.assert_has_calls([ \ +            mock.call(USER_ADDRESS1, mock.ANY, expectedUUID), \ +            mock.call(USER_ADDRESS2, mock.ANY, expectedUUID)]) + +        # Assert mail calls +        canary.emailutils.get_imap.assert_has_calls([ \ +            mock.call(SERVER, USER_ADDRESS1, PASSWORD), \ +            mock.call(SERVER, USER_ADDRESS2, PASSWORD)]) +        canary.emailutils.get_message.assert_called_with(canary.emailutils.get_imap.return_value, 1) +        canary.emailutils.delete_message.assert_called_with(canary.emailutils.get_imap.return_value, 1) +        canary.emailutils.close.assert_called_with(canary.emailutils.get_imap.return_value) + +    def testDontDeleteOtherMail(self): +        # Setup mocks +        self.db.get_accounts.return_value = [(LIST_ADDRESS, USER_ADDRESS1, SERVER, PASSWORD)] +        canary.emailutils.get_mail_uids.return_value = [1] +        canary.emailutils.get_message.return_value = {'Subject': "Buy Our New Widget"} + +        # Test check +        self.canary.check(LIST_ADDRESS) + +        # Assert DB calls +        self.db.get_accounts.assert_called_with(LIST_ADDRESS) +        self.db.pong.assert_not_called() + +        # Assert mail calls +        canary.emailutils.get_imap.assert_called_with(SERVER, USER_ADDRESS1, PASSWORD) +        canary.emailutils.get_message.assert_called_with(canary.emailutils.get_imap.return_value, 1) +        canary.emailutils.delete_message.assert_not_called() +        canary.emailutils.close.assert_called_with(canary.emailutils.get_imap.return_value) diff --git a/tests/test_canarydb.py b/tests/test_canarydb.py index a51afdc..195387e 100644 --- a/tests/test_canarydb.py +++ b/tests/test_canarydb.py @@ -5,119 +5,119 @@ import datetime  from emailcanary import canarydb  class TestCanaryDB(unittest.TestCase): -	def setUp(self): -		self.tempdir = tempfile.mkdtemp() -		self.db = canarydb.CanaryDB(self.tempdir + "canary.db") - -	def tearDown(self): -		self.db.close() -		shutil.rmtree(self.tempdir) - -	def testPingCheckPong(self): -		listAddress = "list@example.com" -		address = "test@example.com" -		time = datetime.datetime(2015, 10, 24, 9, 00) -		uuid = "1234" -		expectedDelta = datetime.datetime.now() - time - -		# Record a Ping -		self.db.ping(listAddress, address, time, uuid) - -		# Check for missing pongs -		missing = self.db.get_missing_pongs() - -		self.assertEqual(1, len(missing)) -		firstMissing = missing[0] -		self.assertEqual(4, len(firstMissing)) -		self.assertEqual(listAddress, firstMissing[0]) -		self.assertEqual(uuid, firstMissing[1]) -		self.assertEqual(address, firstMissing[2]) -		delta = firstMissing[3].total_seconds() - expectedDelta.total_seconds() -		self.assertTrue(delta <= 10) - -		# Record a pong -		pongtime = datetime.datetime(2015, 10, 24, 9, 05) -		self.db.pong(address, pongtime, uuid) - -		# Check for missing pongs -		missing = self.db.get_missing_pongs() -		self.assertEqual(0, len(missing)) - -	def testCloseReopen(self): -		listAddress = "list@example.com" -		address = "test@example.com" -		time = datetime.datetime(2015, 10, 24, 9, 00) -		uuid = "1234" -		expectedDelta = datetime.datetime.now() - time - -		# Record a Ping -		self.db.ping(listAddress, address, time, uuid) - -		# Close, Reopen -		self.db.close() -		self.db = canarydb.CanaryDB(self.tempdir + "canary.db") - -		# Check for missing pongs -		missing = self.db.get_missing_pongs() - -		self.assertEqual(1, len(missing)) -		firstMissing = missing[0] -		self.assertEqual(4, len(firstMissing)) -		self.assertEqual(listAddress, firstMissing[0]) -		self.assertEqual(uuid, firstMissing[1]) -		self.assertEqual(address, firstMissing[2]) -		delta = firstMissing[3].total_seconds() - expectedDelta.total_seconds() -		self.assertTrue(delta <= 10) - -	def testAccounts(self): -		listAddress = "list@example.org" -		address = "user@example.net" -		imapserver = "imap.example.net" -		password = "secretpassword" - -		# Verify that no accounts exist -		accounts = self.db.get_accounts() -		self.assertEqual(0, len(accounts)) - -		# Add one account -		self.db.add_account(listAddress, address, imapserver, password) - -		# Verify that the account exists -		accounts = self.db.get_accounts() -		self.assertEqual(1, len(accounts)) -		self.assertEqual(listAddress, accounts[0][0]) -		self.assertEqual(address, accounts[0][1]) -		self.assertEqual(imapserver, accounts[0][2]) -		self.assertEqual(password, accounts[0][3]) - -		# Remove the account -		self.db.remove_account(listAddress, address) -		accounts = self.db.get_accounts() -		self.assertEqual(0, len(accounts)) - -	def testGetRecipientsForList(self): -		listAddress1 = "list1@example.org" -		listAddress2 = "list2@example.org" -		imapserver = "imap.example.net" -		password = "secretpassword" -		address1 = "user1@example.net" -		address2 = "user2@example.net" - -		# No accounts -		self.assertEqual([], self.db.get_recipients_for_list(listAddress1)); -		self.assertEqual([], self.db.get_recipients_for_list(listAddress2)); - -		# One account -		self.db.add_account(listAddress1, address1, imapserver, password) -		self.assertEqual([address1], self.db.get_recipients_for_list(listAddress1)); -		self.assertEqual([], self.db.get_recipients_for_list(listAddress2)); - -		# Two accounts -		self.db.add_account(listAddress1, address2, imapserver, password) -		self.assertEqual([address1, address2], self.db.get_recipients_for_list(listAddress1)); -		self.assertEqual([], self.db.get_recipients_for_list(listAddress2)); - -		# Two lists -		self.db.add_account(listAddress2, address1, imapserver, password) -		self.assertEqual([address1, address2], self.db.get_recipients_for_list(listAddress1)); -		self.assertEqual([address1], self.db.get_recipients_for_list(listAddress2)); +    def setUp(self): +        self.tempdir = tempfile.mkdtemp() +        self.db = canarydb.CanaryDB(self.tempdir + "canary.db") + +    def tearDown(self): +        self.db.close() +        shutil.rmtree(self.tempdir) + +    def testPingCheckPong(self): +        listAddress = "list@example.com" +        address = "test@example.com" +        time = datetime.datetime(2015, 10, 24, 9, 00) +        uuid = "1234" +        expectedDelta = datetime.datetime.now() - time + +        # Record a Ping +        self.db.ping(listAddress, address, time, uuid) + +        # Check for missing pongs +        missing = self.db.get_missing_pongs() + +        self.assertEqual(1, len(missing)) +        firstMissing = missing[0] +        self.assertEqual(4, len(firstMissing)) +        self.assertEqual(listAddress, firstMissing[0]) +        self.assertEqual(uuid, firstMissing[1]) +        self.assertEqual(address, firstMissing[2]) +        delta = firstMissing[3].total_seconds() - expectedDelta.total_seconds() +        self.assertTrue(delta <= 10) + +        # Record a pong +        pongtime = datetime.datetime(2015, 10, 24, 9, 05) +        self.db.pong(address, pongtime, uuid) + +        # Check for missing pongs +        missing = self.db.get_missing_pongs() +        self.assertEqual(0, len(missing)) + +    def testCloseReopen(self): +        listAddress = "list@example.com" +        address = "test@example.com" +        time = datetime.datetime(2015, 10, 24, 9, 00) +        uuid = "1234" +        expectedDelta = datetime.datetime.now() - time + +        # Record a Ping +        self.db.ping(listAddress, address, time, uuid) + +        # Close, Reopen +        self.db.close() +        self.db = canarydb.CanaryDB(self.tempdir + "canary.db") + +        # Check for missing pongs +        missing = self.db.get_missing_pongs() + +        self.assertEqual(1, len(missing)) +        firstMissing = missing[0] +        self.assertEqual(4, len(firstMissing)) +        self.assertEqual(listAddress, firstMissing[0]) +        self.assertEqual(uuid, firstMissing[1]) +        self.assertEqual(address, firstMissing[2]) +        delta = firstMissing[3].total_seconds() - expectedDelta.total_seconds() +        self.assertTrue(delta <= 10) + +    def testAccounts(self): +        listAddress = "list@example.org" +        address = "user@example.net" +        imapserver = "imap.example.net" +        password = "secretpassword" + +        # Verify that no accounts exist +        accounts = self.db.get_accounts() +        self.assertEqual(0, len(accounts)) + +        # Add one account +        self.db.add_account(listAddress, address, imapserver, password) + +        # Verify that the account exists +        accounts = self.db.get_accounts() +        self.assertEqual(1, len(accounts)) +        self.assertEqual(listAddress, accounts[0][0]) +        self.assertEqual(address, accounts[0][1]) +        self.assertEqual(imapserver, accounts[0][2]) +        self.assertEqual(password, accounts[0][3]) + +        # Remove the account +        self.db.remove_account(listAddress, address) +        accounts = self.db.get_accounts() +        self.assertEqual(0, len(accounts)) + +    def testGetRecipientsForList(self): +        listAddress1 = "list1@example.org" +        listAddress2 = "list2@example.org" +        imapserver = "imap.example.net" +        password = "secretpassword" +        address1 = "user1@example.net" +        address2 = "user2@example.net" + +        # No accounts +        self.assertEqual([], self.db.get_recipients_for_list(listAddress1)); +        self.assertEqual([], self.db.get_recipients_for_list(listAddress2)); + +        # One account +        self.db.add_account(listAddress1, address1, imapserver, password) +        self.assertEqual([address1], self.db.get_recipients_for_list(listAddress1)); +        self.assertEqual([], self.db.get_recipients_for_list(listAddress2)); + +        # Two accounts +        self.db.add_account(listAddress1, address2, imapserver, password) +        self.assertEqual([address1, address2], self.db.get_recipients_for_list(listAddress1)); +        self.assertEqual([], self.db.get_recipients_for_list(listAddress2)); + +        # Two lists +        self.db.add_account(listAddress2, address1, imapserver, password) +        self.assertEqual([address1, address2], self.db.get_recipients_for_list(listAddress1)); +        self.assertEqual([address1], self.db.get_recipients_for_list(listAddress2)); | 
