Commit dabcb601 authored by Jim Fulton's avatar Jim Fulton

client side and server config and tweaks

parent accd8c02
......@@ -34,6 +34,7 @@ import BTrees.OOBTree
import zc.lockfile
import ZODB
import ZODB.BaseStorage
import ZODB.ConflictResolution
import ZODB.interfaces
import zope.interface
import six
......@@ -75,7 +76,7 @@ def get_timestamp(prev_ts=None):
MB = 1024**2
@zope.interface.implementer(ZODB.interfaces.IMultiCommitStorage)
class ClientStorage(object):
class ClientStorage(ZODB.ConflictResolution.ConflictResolvingStorage):
"""A storage class that is a network client to a remote storage.
This is a faithful implementation of the Storage API.
......@@ -331,6 +332,7 @@ class ClientStorage(object):
The storage isn't really ready to use until after this call.
"""
super(ClientStorage, self).registerDB(db)
self._db = db
def is_connected(self, test=False):
......@@ -722,8 +724,27 @@ class ClientStorage(object):
"""
tbuf = self._check_trans(txn, 'tpc_vote')
try:
for oid in self._call('vote', id(txn)) or ():
tbuf.serial(oid, ResolvedSerial)
conflicts = True
vote_attempts = 0
while conflicts and vote_attempts < 9: # 9? Mainly avoid inf. loop
conflicts = False
for oid in self._call('vote', id(txn)) or ():
if isinstance(oid, dict):
# Conflict, let's try to resolve it
conflicts = True
conflict = oid
oid = conflict['oid']
committed, read = conflict['serials']
data = self.tryToResolveConflict(
oid, committed, read, conflict['data'])
self._async('storea', oid, committed, data, id(txn))
tbuf.resolve(oid, data)
else:
tbuf.serial(oid, ResolvedSerial)
vote_attempts += 1
except POSException.ConflictError as err:
oid = getattr(err, 'oid', None)
if oid is not None:
......@@ -745,8 +766,8 @@ class ClientStorage(object):
if tbuf.exception:
raise tbuf.exception
if tbuf.resolved:
return list(tbuf.resolved)
if tbuf.server_resolved or tbuf.client_resolved:
return list(tbuf.server_resolved) + list(tbuf.client_resolved)
else:
return None
......
......@@ -89,8 +89,7 @@ class ZEOStorage:
def __init__(self, server, read_only=0):
self.server = server
self.client_side_conflict_resolution = (
server.client_side_conflict_resolution)
self.client_conflict_resolution = server.client_conflict_resolution
# timeout and stats will be initialized in register()
self.read_only = read_only
# The authentication protocol may define extra methods.
......@@ -442,7 +441,7 @@ class ZEOStorage:
try:
serials = self.storage.tpc_vote(self.transaction)
except ConflictError as err:
if (self.client_side_conflict_resolution and
if (self.client_conflict_resolution and
err.oid and err.serials and err.data
):
self.conflicts[err.oid] = dict(
......@@ -586,7 +585,7 @@ class ZEOStorage:
self.storage.storeBlob(
oid, serial, data, blobfile, '', self.transaction)
except ConflictError as err:
if self.client_side_conflict_resolution and err.serials:
if self.client_conflict_resolution and err.serials:
self.conflicts[oid] = dict(
oid=oid, serials=err.serials, data=data)
else:
......@@ -594,7 +593,6 @@ class ZEOStorage:
else:
if oid in self.conflicts:
del self.conflicts[oid]
self.serials.append(oid)
if serial != b"\0\0\0\0\0\0\0\0":
self.invalidated.append(oid)
......@@ -714,7 +712,7 @@ class StorageServer:
invalidation_age=None,
transaction_timeout=None,
ssl=None,
client_side_conflict_resolution=False,
client_conflict_resolution=False,
):
"""StorageServer constructor.
......@@ -785,13 +783,13 @@ class StorageServer:
for name, storage in storages.items():
self._setup_invq(name, storage)
storage.registerDB(StorageServerDB(self, name))
if client_side_conflict_resolution:
if client_conflict_resolution:
# XXX this may go away later, when storages grow
# configuration for this.
storage.tryToResolveConflict = never_resolve_conflict
self.invalidation_age = invalidation_age
self.zeo_storages_by_storage_id = {} # {storage_id -> [ZEOStorage]}
self.client_side_conflict_resolution = client_side_conflict_resolution
self.client_conflict_resolution = client_conflict_resolution
if addr is not None:
self.acceptor = Acceptor(self, addr, ssl)
......
......@@ -46,7 +46,8 @@ class TransactionBuffer:
# stored are builtin types -- strings or None.
self.pickler = Pickler(self.file, 1)
self.pickler.fast = 1
self.resolved = set() # {oid}
self.server_resolved = set() # {oid}
self.client_resolved = {} # {oid -> buffer_record_number}
self.exception = None
def close(self):
......@@ -59,11 +60,17 @@ class TransactionBuffer:
# Estimate per-record cache size
self.size = self.size + (data and len(data) or 0) + 31
def resolve(self, oid, data):
"""Record client-resolved data
"""
self.store(oid, data)
self.client_resolved[oid] = self.count - 1
def serial(self, oid, serial):
if isinstance(serial, Exception):
self.exception = serial # This transaction will never be committed
elif serial == ResolvedSerial:
self.resolved.add(oid)
self.server_resolved.add(oid)
def storeBlob(self, oid, blobfilename):
self.blobs.append((oid, blobfilename))
......@@ -71,7 +78,8 @@ class TransactionBuffer:
def __iter__(self):
self.file.seek(0)
unpickler = Unpickler(self.file)
resolved = self.resolved
server_resolved = self.server_resolved
client_resolved = self.client_resolved
# Gaaaa, this is awkward. There can be entries in serials that
# aren't in the buffer, because undo. Entries can be repeated
......@@ -81,10 +89,11 @@ class TransactionBuffer:
seen = set()
for i in range(self.count):
oid, data = unpickler.load()
seen.add(oid)
yield oid, data, oid in resolved
if client_resolved.get(oid, i) == i:
seen.add(oid)
yield oid, data, oid in server_resolved
# We may have leftover oids because undo
for oid in resolved:
for oid in server_resolved:
if oid not in seen:
yield oid, None, True
......@@ -98,6 +98,9 @@ class ZEOOptionsMixin:
self.add("address", "zeo.address.address",
required="no server address specified; use -a or -C")
self.add("read_only", "zeo.read_only", default=0)
self.add("client_conflict_resolution",
"zeo.client_conflict_resolution",
default=0)
self.add("invalidation_queue_size", "zeo.invalidation_queue_size",
default=100)
self.add("invalidation_age", "zeo.invalidation_age")
......@@ -339,6 +342,7 @@ def create_server(storages, options):
options.address,
storages,
read_only = options.read_only,
client_conflict_resolution=options.client_conflict_resolution,
invalidation_queue_size = options.invalidation_queue_size,
invalidation_age = options.invalidation_age,
transaction_timeout = options.transaction_timeout,
......
......@@ -107,6 +107,14 @@
<metadefault>$INSTANCE/var/ZEO.pid (or $clienthome/ZEO.pid)</metadefault>
</key>
<key name="client-conflict-resolution" datatype="boolean"
required="no" default="false">
<description>
Flag indicating whether the server should return conflict
errors to the client, for resolution there.
</description>
</key>
</sectiontype>
</component>
......@@ -52,7 +52,7 @@ class ZEOConfig:
for name in (
'invalidation_queue_size', 'invalidation_age',
'transaction_timeout', 'pid_filename',
'ssl_certificate', 'ssl_key',
'ssl_certificate', 'ssl_key', 'client_conflict_resolution',
):
v = getattr(self, name, None)
if v:
......
......@@ -7,6 +7,8 @@ from ZODB.DemoStorage import DemoStorage
from ZODB.utils import p64, z64, maxtid
from ZODB.broken import find_global
import ZEO
from .utils import StorageServer
class Var(object):
......@@ -61,7 +63,7 @@ class ClientSideConflictResolutionTests(zope.testing.setupstack.TestCase):
# resolve conflicts:
server = StorageServer(
self, DemoStorage(), client_side_conflict_resolution=True)
self, DemoStorage(), client_conflict_resolution=True)
zs = server.zs
# 2 non-conflicting transactions:
......@@ -97,7 +99,7 @@ class ClientSideConflictResolutionTests(zope.testing.setupstack.TestCase):
# tid2 as the starting tid:
ob.change(1)
zs.storea(ob._p_oid, tid2, writer.serialize(ob), 3)
self.assertEqual(zs.vote(3), [ob._p_oid])
self.assertEqual(zs.vote(3), [])
tid3 = server.unpack_result(zs.tpc_finish(3))
server.assert_calls(self, ('info', {'size': Var(), 'length': 1}))
......@@ -106,5 +108,43 @@ class ClientSideConflictResolutionTests(zope.testing.setupstack.TestCase):
self.assertEqual(reader.getClassName(p), 'BTrees.Length.Length')
self.assertEqual(reader.getState(p), 3)
def test_client_side(self):
# First, traditional:
addr, stop = ZEO.server('data.fs')
db = ZEO.DB(addr)
with db.transaction() as conn:
conn.root.l = Length(0)
conn2 = db.open()
conn2.root.l.change(1)
with db.transaction() as conn:
conn.root.l.change(1)
conn2.transaction_manager.commit()
self.assertEqual(conn2.root.l.value, 2)
db.close(); stop()
# Now, do conflict resolution on the client.
addr2, stop = ZEO.server(
storage_conf='<mappingstorage>\n</mappingstorage>\n',
zeo_conf=dict(client_conflict_resolution=True),
)
db = ZEO.DB(addr2)
with db.transaction() as conn:
conn.root.l = Length(0)
conn2 = db.open()
conn2.root.l.change(1)
with db.transaction() as conn:
conn.root.l.change(1)
self.assertEqual(conn2.root.l.value, 1)
conn2.transaction_manager.commit()
self.assertEqual(conn2.root.l.value, 2)
db.close(); stop()
def test_suite():
return unittest.makeSuite(ClientSideConflictResolutionTests)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment