import unittest

class ConfigTestBase:
    """Mix-in that protects the ``App.config._config`` module global.

    ``setUp`` remembers whatever object is currently installed there and
    ``tearDown`` puts it back, so tests may swap in a stub through
    ``_makeConfig`` without affecting each other.
    """

    def setUp(self):
        """Remember the configuration object installed before the test."""
        import App.config
        config_module = App.config
        self._old_config = config_module._config

    def tearDown(self):
        """Reinstall the configuration object remembered by ``setUp``."""
        import App.config
        config_module = App.config
        config_module._config = self._old_config

    def _makeConfig(self, **kw):
        """Install and return a stub configuration.

        The keyword arguments become the name -> database mapping of the
        stub's ``dbtab`` attribute (a ``DummyDBTab``).
        """
        import App.config

        class DummyConfig:
            pass

        stub = DummyConfig()
        stub.dbtab = DummyDBTab(kw)
        App.config._config = stub
        return stub

class FakeConnectionTests(unittest.TestCase):
    """Tests for ``App.ApplicationManager.FakeConnection``."""

    def _getTargetClass(self):
        # The import is deferred until a test actually asks for the class.
        from App.ApplicationManager import FakeConnection as target
        return target

    def _makeOne(self, db, parent_jar):
        # Build the object under test from the two constructor arguments.
        klass = self._getTargetClass()
        return klass(db, parent_jar)

    def test_holds_db(self):
        # db() must hand back the very object given to the constructor.
        database = object()
        connection = self._makeOne(database, object())
        self.assertTrue(connection.db() is database)

class DatabaseChooserTests(ConfigTestBase, unittest.TestCase):
    """Tests for ``App.ApplicationManager.DatabaseChooser``.

    Every test installs a stub configuration (via ``_makeConfig``) that
    knows three databases named 'foo', 'bar' and 'qux'.
    """

    def _getTargetClass(self):
        # The import is deferred until a test actually asks for the class.
        from App.ApplicationManager import DatabaseChooser as target
        return target

    def _makeOne(self, id):
        klass = self._getTargetClass()
        return klass(id)

    def _makeRoot(self):
        """Return a minimal acquisition parent that is its own root."""
        from ExtensionClass import Base

        class Root(Base):
            _p_jar = None

            def getPhysicalRoot(self):
                return self

        return Root()

    def _makeWrapped(self):
        """Return a chooser with id 'test', wrapped in a fresh root."""
        return self._makeOne('test').__of__(self._makeRoot())

    def test_getDatabaseNames_sorted(self):
        self._makeConfig(foo=object(), bar=object(), qux=object())
        chooser = self._makeOne('test')
        names = list(chooser.getDatabaseNames())
        self.assertEqual(names, ['bar', 'foo', 'qux'])

    def test___getitem___miss(self):
        self._makeConfig(foo=object(), bar=object(), qux=object())
        chooser = self._makeOne('test')
        self.assertRaises(KeyError, chooser.__getitem__, 'nonesuch')

    def test___getitem___hit(self):
        from App.ApplicationManager import AltDatabaseManager
        from App.ApplicationManager import FakeConnection
        foo_db = object()
        self._makeConfig(foo=foo_db, bar=object(), qux=object())
        chooser = self._makeWrapped()
        manager = chooser['foo']
        # The result is a manager named after the database and wrapped
        # in the chooser ...
        self.assertTrue(isinstance(manager, AltDatabaseManager))
        self.assertEqual(manager.id, 'foo')
        self.assertTrue(manager.aq_parent is chooser)
        # ... whose jar is a fake connection fronting the configured db.
        jar = manager._p_jar
        self.assertTrue(isinstance(jar, FakeConnection))
        self.assertTrue(jar.db() is foo_db)

    def test___bobo_traverse___miss(self):
        self._makeConfig(foo=object(), bar=object(), qux=object())
        chooser = self._makeOne('test')
        traverse = chooser.__bobo_traverse__
        self.assertRaises(AttributeError, traverse, None, 'nonesuch')

    def test___bobo_traverse___hit_db(self):
        from App.ApplicationManager import AltDatabaseManager
        from App.ApplicationManager import FakeConnection
        foo_db = object()
        self._makeConfig(foo=foo_db, bar=object(), qux=object())
        chooser = self._makeWrapped()
        manager = chooser.__bobo_traverse__(None, 'foo')
        # Same expectations as for item access with a known name.
        self.assertTrue(isinstance(manager, AltDatabaseManager))
        self.assertEqual(manager.id, 'foo')
        self.assertTrue(manager.aq_parent is chooser)
        jar = manager._p_jar
        self.assertTrue(isinstance(jar, FakeConnection))
        self.assertTrue(jar.db() is foo_db)

    def test___bobo_traverse___miss_db_hit_attr(self):
        self._makeConfig(foo=object(), bar=object(), qux=object())
        chooser = self._makeWrapped()
        # 'spam' is not a database name, only an attribute of the chooser.
        marker = object()
        chooser.spam = marker
        self.assertTrue(chooser.__bobo_traverse__(None, 'spam') is marker)

    def test_tpValues(self):
        from App.ApplicationManager import AltDatabaseManager
        self._makeConfig(foo=object(), bar=object(), qux=object())
        chooser = self._makeWrapped()
        values = chooser.tpValues()
        self.assertEqual(len(values), 3)
        # One manager per database, in name order, none with a jar.
        for position, expected_id in enumerate(['bar', 'foo', 'qux']):
            manager = values[position]
            self.assertTrue(isinstance(manager, AltDatabaseManager))
            self.assertEqual(manager.id, expected_id)
            self.assertEqual(manager._p_jar, None)


class DebugManagerTests(unittest.TestCase):
    """Tests for ``App.ApplicationManager.DebugManager``.

    Several tests register throw-away modules in ``sys.modules`` (see
    ``_makeModuleClasses``); the fixture snapshots ``sys.modules`` before
    each test and restores that snapshot afterwards.
    """

    def setUp(self):
        import sys
        self._sys = sys
        self._old_sys_modules = sys.modules.copy()

    def tearDown(self):
        # Put sys.modules back exactly as setUp found it.
        self._sys.modules.clear()
        self._sys.modules.update(self._old_sys_modules)

    def _getTargetClass(self):
        from App.ApplicationManager import DebugManager
        return DebugManager

    def _makeOne(self, id):
        return self._getTargetClass()(id)

    def _makeModuleClasses(self):
        """Register fake modules 'foo' and 'qux' and return their classes.

        Returns the tuple ``(Foo, Bar, Baz)``: ``Foo`` and ``Bar`` are
        attached to module 'foo', ``Baz`` to module 'qux', and each
        class's ``__module__`` is set to match.
        """
        import sys
        import types
        from ExtensionClass import Base
        class Foo(Base):
            pass
        class Bar(Base):
            pass
        class Baz(Base):
            pass
        foo = sys.modules['foo'] = types.ModuleType('foo')
        foo.Foo = Foo
        Foo.__module__ = 'foo'
        foo.Bar = Bar
        Bar.__module__ = 'foo'
        qux = sys.modules['qux'] = types.ModuleType('qux')
        qux.Baz = Baz
        Baz.__module__ = 'qux'
        return Foo, Bar, Baz

    def test_refcount_no_limit(self):
        import sys
        dm = self._makeOne('test')
        Foo, Bar, Baz = self._makeModuleClasses()
        pairs = dm.refcount()
        # XXX : Ugly empiricism here:  I don't know why the count is up 1.
        foo_count = sys.getrefcount(Foo)
        self.assertTrue((foo_count+1, 'foo.Foo') in pairs)
        bar_count = sys.getrefcount(Bar)
        self.assertTrue((bar_count+1, 'foo.Bar') in pairs)
        baz_count = sys.getrefcount(Baz)
        self.assertTrue((baz_count+1, 'qux.Baz') in pairs)

    def test_refdict(self):
        import sys
        dm = self._makeOne('test')
        Foo, Bar, Baz = self._makeModuleClasses()
        mapping = dm.refdict()
        # XXX : Ugly empiricism here:  I don't know why the count is up 1.
        foo_count = sys.getrefcount(Foo)
        self.assertEqual(mapping['foo.Foo'], foo_count+1)
        bar_count = sys.getrefcount(Bar)
        self.assertEqual(mapping['foo.Bar'], bar_count+1)
        baz_count = sys.getrefcount(Baz)
        self.assertEqual(mapping['qux.Baz'], baz_count+1)

    def test_rcsnapshot(self):
        import sys
        import App.ApplicationManager
        from DateTime.DateTime import DateTime
        dm = self._makeOne('test')
        Foo, Bar, Baz = self._makeModuleClasses()
        before = DateTime()
        dm.rcsnapshot()
        after = DateTime()
        # The snapshot timestamp must fall between the two readings.
        self.assertTrue(before <= App.ApplicationManager._v_rst <= after)
        mapping = App.ApplicationManager._v_rcs
        # XXX : Ugly empiricism here:  I don't know why the count is up 1.
        foo_count = sys.getrefcount(Foo)
        self.assertEqual(mapping['foo.Foo'], foo_count+1)
        bar_count = sys.getrefcount(Bar)
        self.assertEqual(mapping['foo.Bar'], bar_count+1)
        baz_count = sys.getrefcount(Baz)
        self.assertEqual(mapping['qux.Baz'], baz_count+1)

    def test_rcdate(self):
        import App.ApplicationManager
        dummy = object()
        # Remember the current value (None if the global is absent) so the
        # module is left as found even when rcdate() raises.
        _old_rst = getattr(App.ApplicationManager, '_v_rst', None)
        App.ApplicationManager._v_rst = dummy
        try:
            dm = self._makeOne('test')
            found = dm.rcdate()
        finally:
            App.ApplicationManager._v_rst = _old_rst
        self.assertTrue(found is dummy)

    def test_rcdeltas(self):
        dm = self._makeOne('test')
        dm.rcsnapshot()
        Foo, Bar, Baz = self._makeModuleClasses()
        mappings = dm.rcdeltas()
        self.assertTrue(len(mappings))
        mapping = mappings[0]
        self.assertTrue('rc' in mapping)
        self.assertTrue('pc' in mapping)
        self.assertEqual(mapping['delta'], mapping['rc'] - mapping['pc'])

    #def test_dbconnections(self):  XXX -- TOO UGLY TO TEST
    #def test_manage_profile_stats(self):  XXX -- TOO UGLY TO TEST

    def test_manage_profile_reset(self):
        import sys
        from ZPublisher import Publish
        # 'self' doubles as the "attribute was not there" marker.
        _old_sys__ps_ = getattr(sys, '_ps_', self)
        _old_Publish_pstat = getattr(Publish, '_pstat', self)
        sys._ps_ = Publish._pstat = object()
        try:
            dm = self._makeOne('test')
            dm.manage_profile_reset()
            # Read the results now:  the 'finally' clause below puts the
            # pre-test values back, and the assertions must not end up
            # inspecting those instead of what the reset produced.
            found_sys__ps_ = sys._ps_
            found_Publish_pstat = Publish._pstat
        finally:
            # Restore a previous value, or remove an attribute that did
            # not exist before the test started.
            if _old_sys__ps_ is not self:
                sys._ps_ = _old_sys__ps_
            elif hasattr(sys, '_ps_'):
                del sys._ps_
            if _old_Publish_pstat is not self:
                Publish._pstat = _old_Publish_pstat
            elif hasattr(Publish, '_pstat'):
                del Publish._pstat
        self.assertTrue(found_sys__ps_ is None)
        self.assertTrue(found_Publish_pstat is None)

    def test_manage_getSysPath(self):
        import sys
        dm = self._makeOne('test')
        self.assertEqual(dm.manage_getSysPath(), list(sys.path))


class DBProxyTestsBase:
    """Shared tests for objects that report on / pack the db behind _p_jar.

    Concrete test cases mix this in and supply ``_getTargetClass``.
    """

    def _makeOne(self):
        klass = self._getTargetClass()
        return klass()

    def _makeJar(self, dbname, dbsize):
        """Return a stand-in jar whose ``db()`` yields a ``DummyDB``.

        ``dbname`` and ``dbsize`` are handed to the ``DummyDB``
        constructor unchanged.
        """
        class Jar:
            def __init__(self, database):
                self._db = database

            def db(self):
                return self._db

        return Jar(DummyDB(dbname, dbsize))

    def test_db_name(self):
        proxy = self._makeOne()
        proxy._p_jar = self._makeJar('foo', '')
        self.assertEqual(proxy.db_name(), 'foo')

    def test_db_size_string(self):
        # A string size is expected back unchanged.
        proxy = self._makeOne()
        proxy._p_jar = self._makeJar('foo', 'super')
        self.assertEqual(proxy.db_size(), 'super')

    def test_db_size_lt_1_meg(self):
        proxy = self._makeOne()
        proxy._p_jar = self._makeJar('foo', 4497)
        self.assertEqual(proxy.db_size(), '4.4K')

    def test_db_size_gt_1_meg(self):
        two_megs = 2048 * 1024
        proxy = self._makeOne()
        proxy._p_jar = self._makeJar('foo', two_megs + 123240)
        self.assertEqual(proxy.db_size(), '2.1M')

    def test_manage_pack(self):
        one_day = 86400
        proxy = self._makeOne()
        jar = self._makeJar('foo', '')
        proxy._p_jar = jar
        # With "now" pinned at two days, packing one day is expected to
        # reach the database as a pack time of one day.
        proxy.manage_pack(1, _when=one_day * 2)
        self.assertEqual(jar._db._packed, one_day)

class ApplicationManagerTests(ConfigTestBase,
                              DBProxyTestsBase,
                              unittest.TestCase,
                             ):
    """Tests for ``App.ApplicationManager.ApplicationManager``.

    Besides the configuration handling inherited from ``ConfigTestBase``,
    the fixture tracks temporary directories created through
    ``_makeTempdir`` and removes them in ``tearDown``.
    """

    def setUp(self):
        ConfigTestBase.setUp(self)
        self._tempdirs = ()

    def tearDown(self):
        import shutil
        try:
            for tempdir in self._tempdirs:
                shutil.rmtree(tempdir)
        finally:
            # Restore the global configuration even if cleaning up a
            # temporary directory failed.
            ConfigTestBase.tearDown(self)

    def _getTargetClass(self):
        from App.ApplicationManager import ApplicationManager
        return ApplicationManager

    def _makeTempdir(self):
        """Create a temporary directory that tearDown will remove."""
        import tempfile
        tmp = tempfile.mkdtemp()
        self._tempdirs += (tmp,)
        return tmp

    def _makeFile(self, dir, name, text):
        """Create directory ``dir`` and write ``text`` to ``name`` in it.

        Returns the full path of the new file.  ``os.makedirs`` raises if
        ``dir`` already exists.
        """
        import os
        os.makedirs(dir)
        fqn = os.path.join(dir, name)
        f = open(fqn, 'w')
        try:
            f.write(text)
        finally:
            # Close the handle even when the write fails.
            f.close()
        return fqn

    def test_version_txt(self):
        from App.version_txt import version_txt
        am = self._makeOne()
        self.assertEqual(am.version_txt(), version_txt())

    def test_sys_version(self):
        import sys
        am = self._makeOne()
        self.assertEqual(am.sys_version(), sys.version)

    def test_sys_platform(self):
        import sys
        am = self._makeOne()
        self.assertEqual(am.sys_platform(), sys.platform)

    def test__canCopy(self):
        am = self._makeOne()
        self.assertFalse(am._canCopy())

    def test_manage_app(self):
        import sys
        from zExceptions import Redirect
        am = self._makeOne()
        try:
            am.manage_app('http://example.com/foo')
        except Redirect:
            # The instance is fetched through sys.exc_info() because that
            # spelling parses on every Python version, unlike
            # "except Redirect, v" or "except Redirect as v".
            v = sys.exc_info()[1]
            self.assertEqual(v.args, ('http://example.com/foo/manage',))
        else:
            self.fail('Redirect not raised')

    def test_process_time_seconds(self):
        am = self._makeOne()
        am.process_start = 0
        self.assertEqual(am.process_time(0).strip(), '0 sec')
        self.assertEqual(am.process_time(1).strip(), '1 sec')
        self.assertEqual(am.process_time(2).strip(), '2 sec')

    def test_process_time_minutes(self):
        am = self._makeOne()
        am.process_start = 0
        self.assertEqual(am.process_time(60).strip(), '1 min 0 sec')
        self.assertEqual(am.process_time(61).strip(), '1 min 1 sec')
        self.assertEqual(am.process_time(62).strip(), '1 min 2 sec')
        self.assertEqual(am.process_time(120).strip(), '2 min 0 sec')
        self.assertEqual(am.process_time(121).strip(), '2 min 1 sec')
        self.assertEqual(am.process_time(122).strip(), '2 min 2 sec')

    def test_process_time_hours(self):
        am = self._makeOne()
        am.process_start = 0
        n1 = 60 * 60
        n2 = n1 * 2
        # Note the doubled blank in the expected text when the minutes
        # part is empty.
        self.assertEqual(am.process_time(n1).strip(),
                         '1 hour  0 sec')
        self.assertEqual(am.process_time(n1 + 61).strip(),
                         '1 hour 1 min 1 sec')
        self.assertEqual(am.process_time(n2 + 1).strip(),
                         '2 hours  1 sec')
        self.assertEqual(am.process_time(n2 + 122).strip(),
                         '2 hours 2 min 2 sec')

    def test_process_time_days(self):
        am = self._makeOne()
        am.process_start = 0
        n1 = 60 * 60 * 24
        n2 = n1 * 2
        # Note the tripled blank in the expected text when both the hours
        # and the minutes parts are empty.
        self.assertEqual(am.process_time(n1).strip(),
                         '1 day   0 sec')
        self.assertEqual(am.process_time(n1 + 3661).strip(),
                         '1 day 1 hour 1 min 1 sec')
        self.assertEqual(am.process_time(n2 + 1).strip(),
                         '2 days   1 sec')
        self.assertEqual(am.process_time(n2 + 7322).strip(),
                         '2 days 2 hours 2 min 2 sec')

    def test_thread_get_ident(self):
        try:
            import thread
        except ImportError:
            # Python 3 renamed the module to '_thread'.
            import _thread as thread
        am = self._makeOne()
        self.assertEqual(am.thread_get_ident(), thread.get_ident())

    #def test_manage_restart(self):  XXX -- TOO UGLY TO TEST
    # NOTE(review): the marker above was listed twice; the second copy
    #     presumably stood for a different method -- confirm which one.

    def test_revert_points(self):
        am = self._makeOne()
        self.assertEqual(list(am.revert_points()), [])

    def test_version_list(self):
        # XXX this method is too stupid to live:  returning a bare list
        #     of versions without even tying them to the products?
        #     and what about products living outside SOFTWARE_HOME?
        #     Nobody calls it, either
        import os
        am = self._makeOne()
        config = self._makeConfig()
        swdir = config.softwarehome = self._makeTempdir()
        foodir = os.path.join(swdir, 'Products', 'foo')
        self._makeFile(foodir, 'VERSION.TXT', '1.2')
        bardir = os.path.join(swdir, 'Products', 'bar')
        self._makeFile(bardir, 'VERSION.txt', '3.4')
        bazdir = os.path.join(swdir, 'Products', 'baz')
        self._makeFile(bazdir, 'version.txt', '5.6')
        versions = am.version_list()
        self.assertEqual(versions, ['3.4', '5.6', '1.2'])

    def test_getSOFTWARE_HOME_missing(self):
        am = self._makeOne()
        # The stub configuration has no 'softwarehome' attribute.
        self._makeConfig()
        self.assertEqual(am.getSOFTWARE_HOME(), None)

    def test_getSOFTWARE_HOME_present(self):
        am = self._makeOne()
        config = self._makeConfig()
        swdir = config.softwarehome = self._makeTempdir()
        self.assertEqual(am.getSOFTWARE_HOME(), swdir)

    def test_getZOPE_HOME_missing(self):
        am = self._makeOne()
        # The stub configuration has no 'zopehome' attribute.
        self._makeConfig()
        self.assertEqual(am.getZOPE_HOME(), None)

    def test_getZOPE_HOME_present(self):
        am = self._makeOne()
        config = self._makeConfig()
        zopedir = config.zopehome = self._makeTempdir()
        self.assertEqual(am.getZOPE_HOME(), zopedir)

    def test_getINSTANCE_HOME(self):
        am = self._makeOne()
        config = self._makeConfig()
        instdir = config.instancehome = self._makeTempdir()
        self.assertEqual(am.getINSTANCE_HOME(), instdir)

    def test_getCLIENT_HOME(self):
        am = self._makeOne()
        config = self._makeConfig()
        cldir = config.clienthome = self._makeTempdir()
        self.assertEqual(am.getCLIENT_HOME(), cldir)

    def test_getServers(self):
        from asyncore import socket_map

        class DummySocketServer:
            def __init__(self, port):
                self.port = port

        class AnotherSocketServer(DummySocketServer):
            pass

        class NotAServer:
            pass

        am = self._makeOne()
        _old_socket_map = socket_map.copy()

        # Every change to the shared map happens inside the 'try' so that
        # its original contents always come back.
        try:
            socket_map.clear()
            socket_map['foo'] = DummySocketServer(45)
            socket_map['bar'] = AnotherSocketServer(57)
            socket_map['qux'] = NotAServer()
            pairs = am.getServers()
        finally:
            socket_map.clear()
            socket_map.update(_old_socket_map)

        self.assertEqual(len(pairs), 2)
        self.assertTrue((str(DummySocketServer), 'Port: 45') in pairs)
        self.assertTrue((str(AnotherSocketServer), 'Port: 57') in pairs)

    #def test_objectIds(self):  XXX -- TOO UGLY TO TEST (BBB for Zope 2.3!!)


class AltDatabaseManagerTests(DBProxyTestsBase, unittest.TestCase):
    """Run the shared ``DBProxyTestsBase`` tests on AltDatabaseManager."""

    def _getTargetClass(self):
        # The import is deferred until a test actually asks for the class.
        from App.ApplicationManager import AltDatabaseManager as target
        return target


class DummyDBTab:
    """Stand-in database table backed by a plain name -> database dict."""

    def __init__(self, databases=None):
        # Any falsy argument (None, empty mapping) is replaced by a new
        # empty dict.
        if not databases:
            databases = {}
        self._databases = databases

    def listDatabaseNames(self):
        """Return the keys of the backing mapping, in its own order."""
        known = self._databases
        return known.keys()

    def hasDatabase(self, name):
        """Return whether ``name`` is a key of the backing mapping."""
        known = self._databases
        return name in known

    def getDatabase(self, name):
        """Return the object stored under ``name``; KeyError if absent."""
        known = self._databases
        return known[name]

class DummyDB:
    """Stand-in database that reports a canned name and size.

    It also records the argument of the most recent ``pack`` call.
    """

    # Stays None until pack() is called on the instance.
    _packed = None

    def __init__(self, name, size):
        self._name, self._size = name, size

    def getName(self):
        """Return the name given to the constructor."""
        return self._name

    def getSize(self):
        """Return the size given to the constructor, unchanged."""
        return self._size

    def pack(self, when):
        """Record ``when`` on the instance as ``_packed``."""
        self._packed = when

def test_suite():
    """Return a suite holding every test case class of this module.

    The classes are loaded with ``defaultTestLoader.loadTestsFromTestCase``
    rather than ``unittest.makeSuite``, which is deprecated and no longer
    available in recent Python versions.
    """
    load = unittest.defaultTestLoader.loadTestsFromTestCase
    return unittest.TestSuite([
        load(FakeConnectionTests),
        load(DatabaseChooserTests),
        load(DebugManagerTests),
        load(ApplicationManagerTests),
        load(AltDatabaseManagerTests),
    ])