# testAuth.py
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Test suite for AuthZEO."""

import os
import tempfile
import time
import unittest

from ZEO import zeopasswd
from ZEO.Exceptions import ClientDisconnected
from ZEO.tests.ConnectionTests import CommonSetupTearDown

class AuthTest(CommonSetupTearDown):
    """Authentication tests shared by the protocol-specific subclasses.

    Concrete subclasses must define ``protocol`` (the name handed to the
    server configuration and to ``zeopasswd``) and ``dbclass`` (the
    password database class).  They may also override ``realm``.
    """

    __super_getServerConfig = CommonSetupTearDown.getServerConfig
    __super_setUp = CommonSetupTearDown.setUp
    __super_tearDown = CommonSetupTearDown.tearDown

    realm = None

    def setUp(self):
        """Create a password database with user "foo", then run base setUp."""
        # tempfile.mktemp() only hands back a name that another process
        # could claim before it is used.  A private directory made by
        # mkdtemp() gives a path that does not exist yet (as before) but
        # cannot be taken over by anyone else.
        self.__pwdir = tempfile.mkdtemp()
        self.pwfile = os.path.join(self.__pwdir, "passwd")
        if self.realm:
            self.pwdb = self.dbclass(self.pwfile, self.realm)
        else:
            self.pwdb = self.dbclass(self.pwfile)
        self.pwdb.add_user("foo", "bar")
        self.pwdb.save()
        self._checkZEOpasswd()
        # The base setUp runs last: getServerConfig() below points the
        # server at self.pwfile, so the file has to be written first.
        self.__super_setUp()

    def _checkZEOpasswd(self):
        """Run ``zeopasswd.main`` against the password file.

        It is called twice: first with ``-d foo``, then with ``foo bar``.
        For the "plaintext" protocol the ``Database`` class from
        ``ZEO.auth.base`` is passed explicitly as second argument.
        """
        args = ["-f", self.pwfile, "-p", self.protocol]
        if self.protocol == "plaintext":
            from ZEO.auth.base import Database
            zeopasswd.main(args + ["-d", "foo"], Database)
            zeopasswd.main(args + ["foo", "bar"], Database)
        else:
            zeopasswd.main(args + ["-d", "foo"])
            zeopasswd.main(args + ["foo", "bar"])

    def tearDown(self):
        """Run the base tearDown, then delete the password file."""
        try:
            self.__super_tearDown()
        finally:
            # Clean up even when the base-class teardown raises.
            os.remove(self.pwfile)
            os.rmdir(self.__pwdir)

    def getConfig(self, path, create, read_only):
        """Return a fixed ``<mappingstorage>`` section; arguments are ignored."""
        return "<mappingstorage 1/>"

    def getServerConfig(self, addr, ro_svr):
        """Return the base server config extended with the auth settings."""
        zconf = self.__super_getServerConfig(addr, ro_svr)
        zconf.authentication_protocol = self.protocol
        zconf.authentication_database = self.pwfile
        zconf.authentication_realm = self.realm
        return zconf

    def wait(self):
        """Poll until ``self._storage.test_connection`` becomes true.

        The attribute is checked every 0.1 seconds, at most 25 times; the
        test fails if it never becomes true.
        """
        for _ in range(25):
            time.sleep(0.1)
            if self._storage.test_connection:
                return
        self.fail("Timed out waiting for client to authenticate")

    def testOK(self):
        # Open the storage without blocking (wait=0) and poll with
        # self.wait() instead of sleeping for a fixed time.
        self._storage = self.openClientStorage(wait=0, username="foo",
                                               password="bar", realm=self.realm)
        self.wait()

        self.assertTrue(self._storage._connection)
        self._storage._connection.poll()
        self.assertTrue(self._storage.is_connected())
        # Make a call to make sure the mechanism is working
        self._storage.versions()

    def testNOK(self):
        # Same as testOK, but with a password that is not in the database.
        self._storage = self.openClientStorage(wait=0, username="foo",
                                               password="noogie",
                                               realm=self.realm)
        self.wait()
        # If the test established a connection, then it failed.
        self.assertFalse(self._storage._connection)

    def testUnauthenticatedMessage(self):
        # Test that an unauthenticated message is rejected by the server
        # if it was sent after the connection was authenticated.
        self._storage = self.openClientStorage(wait=0, username="foo",
                                               password="bar", realm=self.realm)
        self.wait()
        self._storage.versions()
        # Manually clear the state of the hmac connection
        self._storage._connection._SizedMessageAsyncConnection__hmac_send = None
        # Once the client stops using the hmac, it should be disconnected.
        self.assertRaises(ClientDisconnected, self._storage.versions)

class PlainTextAuth(AuthTest):
    """Run the AuthTest cases with the "plaintext" protocol."""

    # Importing inside the class body binds ``ZEO`` in the class
    # namespace, which the ``dbclass`` assignment below relies on.
    import ZEO.tests.auth_plaintext

    realm = "Plaintext Realm"
    protocol = "plaintext"
    dbclass = ZEO.tests.auth_plaintext.Database
    database = "authdb.sha"

class DigestAuth(AuthTest):
    """Run the AuthTest cases with the "digest" protocol."""

    # Importing inside the class body binds ``ZEO`` in the class
    # namespace, which the ``dbclass`` assignment below relies on.
    import ZEO.auth.auth_digest

    realm = "Digest Realm"
    protocol = "digest"
    dbclass = ZEO.auth.auth_digest.DigestDatabase
    database = "authdb.digest"

# Concrete test case classes that test_suite() collects.
test_classes = [PlainTextAuth, DigestAuth]

def test_suite():
    suite = unittest.TestSuite()
    for klass in test_classes:
        sub = unittest.makeSuite(klass)
        suite.addTest(sub)
    return suite

# When executed as a script, run the suite returned by test_suite().
if __name__ == "__main__":
    unittest.main(defaultTest='test_suite')