# -*- coding: utf-8 -*-
# Wendelin.core.bigfile | Tests for ZODB utilities and critical properties of ZODB itself
# Copyright (C) 2014-2022  Nexedi SA and Contributors.
#                          Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
from wendelin.lib.zodb import LivePersistent, deactivate_btree, dbclose, zconn_at, zstor_2zurl, zmajor, _zhasNXDPatch, dbstoropen, zurl_normalize_main
from wendelin.lib.testing import getTestDB
from wendelin.lib import testing
from persistent import Persistent, UPTODATE, GHOST, CHANGED
from ZODB import DB, POSException
from ZODB.FileStorage import FileStorage
from ZODB.MappingStorage import MappingStorage
from ZODB.DemoStorage import DemoStorage
from BTrees.IOBTree import IOBTree
import transaction
from transaction import TransactionManager
from golang import defer, func
from pytest import raises
import pytest; xfail = pytest.mark.xfail
from ZEO.ClientStorage import ClientStorage as ZEOStorage
import os
from six.moves.urllib.parse import quote_plus

from wendelin.lib.tests.testprog import zopenrace, zloadrace

testdb = None

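# dbopen opens the test database and returns its root object.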
def dbopen():
    return testdb.dbopen()

def setup_module():
    global testdb
    testdb = getTestDB()
    testdb.setup()

def teardown_module():
    testdb.teardown()

# like db.cacheDetail(), but {} instead of []
def cacheInfo(db):
    return dict(db.cacheDetail())

# key for cacheInfo() result
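# e.g. kkey(LivePersistent) == 'wendelin.lib.zodb.LivePersistent'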
def kkey(klass):
    return '%s.%s' % (klass.__module__, klass.__name__)

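# LivePersistent should stay live (non-ghost) across cache gc; only an
# invalidation at transaction boundary can reset it back to ghost.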
@func
def test_livepersistent():
    root = dbopen()
    transaction.commit()    # set root._p_jar
    db = root._p_jar.db()

    # ~~~ test `obj initially created` case
    root['live'] = lp = LivePersistent()
    assert lp._p_jar   is None          # connection does not know about it yet
    assert lp._p_state == UPTODATE      # object initially created in uptodate

    # should not be in cache yet & thus should stay after gc
    db.cacheMinimize()
    assert lp._p_jar   is None
    assert lp._p_state == UPTODATE
    ci = cacheInfo(db)
    assert kkey(LivePersistent) not in ci

    # should be registered to connection & cache after commit
    transaction.commit()
    assert lp._p_jar   is not None
    assert lp._p_state == UPTODATE
    ci = cacheInfo(db)
    assert ci[kkey(LivePersistent)] == 1

    # should stay that way after cache gc
    db.cacheMinimize()
    assert lp._p_jar   is not None
    assert lp._p_state == UPTODATE
    ci = cacheInfo(db)
    assert ci[kkey(LivePersistent)] == 1


    # ~~~ reopen & test `obj loaded from db` case
    dbclose(root)
    del root, db, lp

    root = dbopen()
    db = root._p_jar.db()

    # known to connection & cache & GHOST
    # right after first loading from DB
    lp = root['live']
    assert lp._p_jar   is not None
    assert lp._p_state is GHOST
    ci = cacheInfo(db)
    assert ci[kkey(LivePersistent)] == 1

    # should be UPTODATE for sure after read access
    getattr(lp, 'attr', None)
    assert lp._p_jar   is not None
    assert lp._p_state is UPTODATE
    ci = cacheInfo(db)
    assert ci[kkey(LivePersistent)] == 1

    # does not go back to ghost on cache gc
    db.cacheMinimize()
    assert lp._p_jar   is not None
    assert lp._p_state == UPTODATE
    ci = cacheInfo(db)
    assert ci[kkey(LivePersistent)] == 1

    # ok
    dbclose(root)
    del root, db, lp


    # demo that upon cache invalidation LivePersistent can go back to ghost
    root = dbopen()
    conn = root._p_jar
    db   = conn.db()
    conn.close()
    del root, conn

    tm1 = TransactionManager()
    tm2 = TransactionManager()

    conn1 = db.open(transaction_manager=tm1)
    root1 = conn1.root()
    defer(lambda: dbclose(root1))
    lp1 = root1['live']

    conn2 = db.open(transaction_manager=tm2)
    root2 = conn2.root()
    defer(conn2.close)
    lp2 = root2['live']

    # 2 connections are set up running in parallel, with the initial object state being ghost
    assert lp1._p_jar   is conn1
    assert lp2._p_jar   is conn2

    assert lp1._p_state is GHOST
    assert lp2._p_state is GHOST

    # conn1: modify  ghost -> changed
    lp1.attr = 1

    assert lp1._p_state is CHANGED
    assert lp2._p_state is GHOST

    # conn2: read    ghost -> uptodate
    assert getattr(lp1, 'attr', None) == 1
    assert getattr(lp2, 'attr', None) is None

    assert lp1._p_state is CHANGED
    assert lp2._p_state is UPTODATE

    # conn1: commit  changed -> uptodate; conn2 untouched
    tm1.commit()

    assert lp1._p_state is UPTODATE
    assert lp2._p_state is UPTODATE

    assert getattr(lp1, 'attr', None) == 1
    assert getattr(lp2, 'attr', None) is None

    # conn2: commit  (nothing changed - just transaction boundary)
    #                 uptodate -> ghost (invalidation)
    tm2.commit()

    assert lp1._p_state is UPTODATE
    assert lp2._p_state is GHOST

    assert getattr(lp1, 'attr', None) == 1

    # conn2: after reading, the state is again uptodate + changes from conn1 are here
    a = getattr(lp2, 'attr', None)
    assert lp2._p_state is UPTODATE
    assert a == 1

    del conn2, root2


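# XInt is a trivial Persistent object wrapping an integer.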
class XInt(Persistent):
    def __init__(self, i):
        self.i = i

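# objscachedv returns the list of objects currently in jar's live cache.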
def objscachedv(jar):
    return [obj for oid, obj in jar._cache.lru_items()]

@func
def test_deactivate_btree():
    root = dbopen()
    defer(lambda: dbclose(root))

    # init btree with many leaf nodes
    leafv = []
    root['btree'] = B = IOBTree()
    for i in range(10000):
        B[i] = xi = XInt(i)
        leafv.append(xi)
    transaction.commit()

    for npass in range(2):
        # access all elements making them live
        for _ in B.values():
            _._p_activate()

        # now B and/or some leaf nodes should be up-to-date and in cache
        cached = objscachedv(root._p_jar)
        nlive = 0
        for obj in [B] + leafv:
            if obj._p_state == UPTODATE:
                assert obj in cached
                nlive += 1
        assert nlive > 0

    # check how deactivate_btree() works depending on the initial BTree state
        if npass == 0:
            B._p_activate()
        else:
            B._p_deactivate()

        # after btree deactivation B & all leaf nodes should be in ghost state and not in cache
        deactivate_btree(B)
        cached = objscachedv(root._p_jar)
        for obj in [B] + leafv:
            assert obj._p_state == GHOST
            assert obj not in cached


# verify that zconn_at gives correct answer.
@func
def test_zconn_at():
    if zmajor == 4 and not _zhasNXDPatch('conn:MVCC-via-loadBefore-only'):
        pytest.xfail(reason="zconn_at needs https://lab.nexedi.com/nexedi/ZODB/merge_requests/1 to work on ZODB4")

    stor = testdb.getZODBStorage()
    defer(stor.close)
    db  = DB(stor)

    zsync(stor)
    at0 = stor.lastTransaction()

    # open connection, it must be viewing the database @at0
    tm1 = TransactionManager()
    conn1 = db.open(transaction_manager=tm1)
    assert zconn_at(conn1) == at0

    # open another simultaneous connection
    tm2 = TransactionManager()
    conn2 = db.open(transaction_manager=tm2)
    assert zconn_at(conn2) == at0

    # commit in conn1
    root1 = conn1.root()
    root1['z'] = 1
    tm1.commit()
    zsync(stor)
    at1 = stor.lastTransaction()

    # after commit conn1 view is updated; conn2 view stays @at0
    assert zconn_at(conn1) == at1
    assert zconn_at(conn2) == at0

    # reopen conn1 -> view @at1
    conn1.close()
    with raises(POSException.ConnectionStateError):
        zconn_at(conn1)
    assert zconn_at(conn2) == at0
    conn1_ = db.open(transaction_manager=tm1)
    assert conn1_ is conn1   # returned from DB pool
    assert zconn_at(conn1) == at1
    assert zconn_at(conn2) == at0
    conn1.close()

    # commit empty transaction - view stays in sync with storage head
    conn1_ = db.open(transaction_manager=tm1)
    assert conn1_ is conn1   # from DB pool
    assert zconn_at(conn1) == at1
    assert zconn_at(conn2) == at0
    tm1.commit()
    zsync(stor)
    at1_ = stor.lastTransaction()

    assert zconn_at(conn1) == at1_
    assert zconn_at(conn2) == at0


    # reopen conn2 -> view updated to @at1_
    conn2.close()
    conn2_ = db.open(transaction_manager=tm2)
    assert conn2_ is conn2  # from DB pool
    assert zconn_at(conn1) == at1_
    assert zconn_at(conn2) == at1_

    conn1.close()
    conn2.close()


    # verify with historic connection @at0
    tm_old = TransactionManager()
    defer(tm_old.abort)
    conn_at0 = db.open(transaction_manager=tm_old, at=at0)
    assert conn_at0 is not conn1
    assert conn_at0 is not conn2
    assert zconn_at(conn_at0) == at0


# verify that ZODB.Connection.onResyncCallback works
@func
def test_zodb_onresync():
    stor = testdb.getZODBStorage()
    defer(stor.close)
    db  = DB(stor)

    class T:
        def __init__(t):
            t.nresync = 0
        def on_connection_resync(t):
            t.nresync += 1

    t = T()

    conn = db.open()
    conn.onResyncCallback(t)
    assert t.nresync == 0

    # abort makes conn enter a new transaction
    transaction.abort()
    assert t.nresync == 1

    # close/reopen -> new transaction
    conn.close()
    assert t.nresync == 1
    conn_ = db.open()
    assert conn_ is conn
    assert t.nresync == 2

    # commit -> new transaction
    root = conn.root()
    root['r'] = 1
    assert t.nresync == 2
    transaction.commit()
    assert t.nresync == 3
    transaction.commit()
    assert t.nresync == 4
    transaction.commit()
    assert t.nresync == 5

    conn.close()


# verify that ZODB.Connection.onShutdownCallback works
@func
def test_zodb_onshutdown():
    stor = testdb.getZODBStorage()
    defer(stor.close)
    db  = DB(stor)

    class T:
        def __init__(t):
            t.nshutdown = 0
        def on_connection_shutdown(t):
            t.nshutdown += 1

    t1 = T()
    t2 = T()

    # conn1 stays alive outside of db.pool
    conn1 = db.open()
    conn1.onShutdownCallback(t1)

    # conn2 stays alive inside db.pool
    conn2 = db.open()
    conn2.onShutdownCallback(t2)
    conn2.close()

    assert t1.nshutdown == 0
    assert t2.nshutdown == 0

    # db.close triggers conn1 and conn2 shutdown
    db.close()
    assert t1.nshutdown == 1
    assert t2.nshutdown == 1


# test that zurl stays the same from one storage open to another.
def test_zurlstable():
    if not isinstance(testdb, (testing.TestDB_FileStorage, testing.TestDB_ZEO, testing.TestDB_NEO)):
        pytest.xfail(reason="zstor_2zurl is TODO for %r" % testdb)
    zurl0 = None
    for i in range(10):
        zstor = testdb.getZODBStorage()
        zurl  = zstor_2zurl(zstor)
        zstor.close()
        if i == 0:
            zurl0 = zurl
        else:
            assert zurl == zurl0


# test that a ZODB database opened via the storage's zurl provides access to the same data.
@func
def test_zurlsamedb():
    stor1 = testdb.getZODBStorage()
    defer(stor1.close)

    # skip on FileStorage - ZODB/py fails with LockError on attempt to create
    # two FileStorages connected to the same data.
    if isinstance(stor1, FileStorage):
        pytest.skip("skipping on FileStorage")

    zurl = zstor_2zurl(stor1)
    stor2 = dbstoropen(zurl)
    defer(stor2.close)

    db1 = DB(stor1)
    db2 = DB(stor2)

    # get/set retrieves or sets root['X'] = x.
    @func
    def set(db, x):
        conn = db.open();   defer(conn.close)
        root = conn.root()
        root['X'] = x
        transaction.commit()
    @func
    def get(db):
        zsync(db.storage)
        conn = db.open();   defer(conn.close)
        root = conn.root()
        return root['X']

    # stor1/stor2 should have the same data
    set(db1, 1)
    assert get(db2) == 1
    set(db1, 'abc')
    assert get(db2) == 'abc'


# ensure zstor_2zurl returns expected zurl.
def test_zstor_2zurl(tmpdir, neo_ssl_dict):
    # fs1 returns new FileStorage located in tmpdir.
    def fs1(name):
        return FileStorage("%s/%s" % (tmpdir, name))

    # zeo returns new ZEO client for specified storage name and server address.
    #
    # server_addr can be either:
    # - str (specifying address of UNIX socket), or
    # - (host, port) pair, specifying a TCP address.
    #
    # NOTE the client is returned without waiting until the server is connected.
    def zeo(storage_name, server_addr):
        if testing.TestDB_ZEO('').z5:
            return ZEOStorage(server_addr, storage=storage_name, wait=False)

        # For ZEO 4 it is better to use a mock storage, because otherwise we
        # would have to wait for the server for a long time. See the
        # respective part in the ZEO4 source code:
        #
        #   https://github.com/zopefoundation/ZEO/blob/4/src/ZEO/ClientStorage.py#L423-L430
        #
        # ..compared to ZEO5, which omits the else clause:
        #
        #   https://github.com/zopefoundation/ZEO/blob/5.3.0/src/ZEO/ClientStorage.py#L279-L286
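        #
        # The stand-in below only needs to look like a ZEO client to
        # zstor_2zurl: a type that appears as ZEO.ClientStorage, plus the
        # _addr/_storage attributes a real client carries.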
        zeo_storage = type(
            "ClientStorage",
            (object,),
            {
                "_addr": server_addr,
                "_storage": storage_name,
                "close": lambda self: None,
                "getName": lambda self: self._storage
            }
        )()
        type(zeo_storage).__module__ = "ZEO.ClientStorage"
        type(zeo_storage).__name__ = "ClientStorage"
        return zeo_storage

    # neo returns new NEO client for specified cluster name and master address.
    # NOTE: similarly to ZEO, the client is returned without waiting until the server nodes are connected.
    def neo(cluster_name, master_addr, ssl=0):
        # TODO revert to import neo globally after lab.nexedi.com/nexedi/neoppod/-/merge_requests/24 is landed
        from neo.client.Storage import Storage as NEOStorage
        kwargs = dict(master_nodes=master_addr, name=cluster_name)
        if ssl:
            kwargs.update(neo_ssl_dict)
        return NEOStorage(**kwargs)

    # demo returns new DemoStorage with specified base and delta.
    def demo(base, delta):
        return DemoStorage(base=base, changes=delta)

    # assert_zurl_is_correct verifies that zstor_2zurl(zstor) returns one of zurl_ok.
    # zstor is closed after the check.
    @func
    def assert_zurl_is_correct(zstor, *zurl_ok):
        defer(zstor.close)
        assert zstor_2zurl(zstor) in zurl_ok

    # sslp is the SSL part of the zurl query string for an encrypted NEO node
    q = quote_plus
    sslp = "&".join(("%s=%s" % (q(k), q(v)) for k, v in sorted(neo_ssl_dict.items())))
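    # e.g. sslp == "ca=%2F...%2Fca.crt&cert=%2F...%2Fnode.crt&key=%2F...%2Fnode.key"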

    _ = assert_zurl_is_correct
    _(fs1("test.fs"),                         "file://%s/test.fs" % tmpdir)           # FileStorage
    _(zeo("1",     "/path/to/zeo.sock"),      "zeo:///path/to/zeo.sock")              # ZEO/unix
    _(zeo("test",  "/path/to/zeo.sock"),      "zeo:///path/to/zeo.sock?storage=test") #   + non-default storage name
    _(zeo("1",     ("127.0.0.1", 1234)),      "zeo://127.0.0.1:1234")                 # ZEO/ip4
    _(zeo("test",  ("127.0.0.1", 1234)),      "zeo://127.0.0.1:1234?storage=test")    #   + non-default storage name
    _(zeo("1",     ("::1",       1234)),      "zeo://[::1]:1234")                     # ZEO/ip6
    _(zeo("test",  ("::1",       1234)),      "zeo://[::1]:1234?storage=test")        #   + non-default storage name
    _(neo("test",  "127.0.0.1:1234"),         "neo://test@127.0.0.1:1234")            # NEO/ip4
    _(neo("test",  "127.0.0.1:1234", 1),      "neos://test@127.0.0.1:1234?%s" % sslp) #   + ssl
    _(neo("test",  "[::1]:1234"),             "neo://test@[::1]:1234")                # NEO/ip6
    _(neo("test",  "[::1]:1234", 1),          "neos://test@[::1]:1234?%s" % sslp)     #   + ssl
    _(neo("test",  "[::1]:1234\n[::2]:1234"),                                         #   + 2 master nodes
           # Master order is not specified, so there are 2 acceptable zurls
           "neo://test@[::1]:1234,[::2]:1234", "neo://test@[::2]:1234,[::1]:1234")
    _(demo(zeo("base", ("1.2.3.4",  5)),                                              # DemoStorage
           fs1("delta.fs")),                  "demo:(zeo://1.2.3.4:5?storage=base)/(file://%s/delta.fs)" % tmpdir)

    # Test exceptions
    #   invalid storage
    with raises(ValueError, match="in-RAM storages are not supported"):
        zstor_2zurl(MappingStorage())

    #   invalid object
    with raises(NotImplementedError):
        zstor_2zurl("I am not a storage.")


@pytest.mark.parametrize(
    "zurl,zurl_norm_ok",
    [
        # FileStorage
        ("file://Data.fs", "file://Data.fs"),
        # ZEO
        ("zeo://localhost:9001", "zeo://localhost:9001"),
        # NEO
        ("neo://cluster@127.0.0.1:1234", "neo://cluster@127.0.0.1:1234"),
        #   > 1 master nodes \w different order
        ("neo://cluster@abc:1,def:2", "neo://cluster@abc:1,def:2"),
        ("neo://cluster@def:2,abc:1", "neo://cluster@abc:1,def:2"),
        #   Different SSL paths
        ("neos://cluster@xyz:1?ca=a&key=b&cert=c", "neos://cluster@xyz:1"),
        ("neos://cluster@xyz:1?ca=α&key=β&cert=γ", "neos://cluster@xyz:1"),
        #   neo:// with anything SSL-related in query -> neos://
        ("neo://cluster@xyz:1?cert=c", "neos://cluster@xyz:1"),
        #   any order of options should result in the same normalized URI
        ("neo://cluster@xyz:1?a=1&c=10&b=2", "neo://cluster@xyz:1?a=1&b=2&c=10"),
        ("neo://cluster@xyz:1?b=2&a=1&c=10", "neo://cluster@xyz:1?a=1&b=2&c=10"),
        #   client options
        ("neo://cluster@xyz:1?compress=1&read-only=true&logfile=abc.log&cache-size=1024", "neo://cluster@xyz:1"),
    ],
)
def test_zurl_normalize_main(zurl, zurl_norm_ok):
    assert zurl_normalize_main(zurl) == zurl_norm_ok
    # also verify that zurl_normalize_main is stable
    assert zurl_normalize_main(zurl_norm_ok) == zurl_norm_ok


# 'zurl_normalize_main' must explicitly raise an exception if an unsupported
# zodburi scheme is used.
def test_zurl_normalize_main_invalid_scheme():
    for uri in "https://test postgres://a:b@c:5432/d".split(" "):
        with pytest.raises(NotImplementedError):
            zurl_normalize_main(uri)


# neo_ssl_dict returns the paths of precomputed static SSL certificate
# files.
@pytest.fixture
def neo_ssl_dict():
    ssl_files_base_path = os.path.join(os.path.dirname(__file__), "testdata")
    return {
        k: os.path.join(ssl_files_base_path, v)
        for k, v in dict(ca="ca.crt", key="node.key", cert="node.crt").items()
    }


# ---- tests for critical properties of ZODB ----

# verify race in between Connection.open and invalidations.
def test_zodb_zopenrace_basic():
    # exercises mostly logic inside ZODB around ZODB.Connection
    zopenrace.test(MappingStorage())
def test_zodb_zopenrace():
    # exercises ZODB.Connection + particular storage implementation
    zopenrace.main()

# verify race in between loading and invalidations.
def test_zodb_zloadrace():
    # skip testing with FileStorage - in ZODB/py opening simultaneous read-write
    # connections to the same file is not supported and will raise LockError.
    _ = testdb.getZODBStorage()
    _.close()
    if isinstance(_, FileStorage):
        pytest.skip("skipping on FileStorage")

    zloadrace.main()


# ---- misc ----

# zsync syncs ZODB storage.
# It is a no-op if zstor does not support syncing (e.g. FileStorage has no .sync()).
def zsync(zstor):
    # ZEO's default sync is a no-op. We need to sync explicitly by pinging
    # the server. For ZEO 5 it would actually be sufficient to set the init
    # parameter 'server_sync' to True:
    #   https://github.com/zopefoundation/ZEO/blob/423cb8/src/ZEO/ClientStorage.py#L224-L246
    # But because our storage is already instantiated, this doesn't help.
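    #
    # A minimal sketch of that alternative for a hypothetical freshly-created
    # client (our storages come preconfigured from testdb, so we ping instead):
    #
    #   zstor = ZEOStorage(server_addr, server_sync=True)
    #   zstor.sync()    # now pings the server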
    if isinstance(zstor, ZEOStorage):
        # ZEO >= 5 provides ping:
        #   https://github.com/zopefoundation/ZEO/blob/423cb8/src/ZEO/ClientStorage.py#L472-L478
        # ZEO < 5: fall back to an equivalent ping
        getattr(zstor, 'ping', lambda: zstor._server.lastTransaction())()
    sync = getattr(zstor, 'sync', None)
    if sync is not None:
        sync()