Commit 181b01cb authored by Jim Fulton's avatar Jim Fulton

Fixed a threading bug in the StorageServerDB implementation that cause

the fan-out to fail.
parent b053b2a8
...@@ -5,6 +5,12 @@ ...@@ -5,6 +5,12 @@
3.10.0a2 (2010-??-??) 3.10.0a2 (2010-??-??)
===================== =====================
Bugs Fixed
----------
- When using using a ClientStorage in a Storage server, there was a
threading bug that caused clients to get disconnected.
New Features New Features
------------ ------------
......
...@@ -786,13 +786,6 @@ class StorageServerDB: ...@@ -786,13 +786,6 @@ class StorageServerDB:
raise StorageServerError("Versions aren't supported.") raise StorageServerError("Versions aren't supported.")
storage_id = self.storage_id storage_id = self.storage_id
self.server.invalidate(None, storage_id, tid, oids) self.server.invalidate(None, storage_id, tid, oids)
for zeo_server in self.server.connections.get(storage_id, ())[:]:
try:
zeo_server.connection.poll()
except ZEO.zrpc.error.DisconnectedError:
pass
else:
break # We only need to pull one :)
def invalidateCache(self): def invalidateCache(self):
self.server._invalidateCache(self.storage_id) self.server._invalidateCache(self.storage_id)
......
...@@ -10,25 +10,29 @@ ZEO servers for us and another one that picks ports. ...@@ -10,25 +10,29 @@ ZEO servers for us and another one that picks ports.
We'll start the first server: We'll start the first server:
>>> (_, port0), adminaddr0 = start_server( >>> (_, port0), adminaddr0 = start_server(
... '<filestorage>\npath fs\n</filestorage>', keep=1) ... '<filestorage>\npath fs\nblob-dir blobs\n</filestorage>', keep=1)
Then we'll start 2 others that use this one: Then we'll start 2 others that use this one:
>>> addr1, _ = start_server('<zeoclient>\nserver %s\n</zeoclient>' % port0) >>> addr1, _ = start_server(
>>> addr2, _ = start_server('<zeoclient>\nserver %s\n</zeoclient>' % port0) ... '<zeoclient>\nserver %s\nblob-dir b1\n</zeoclient>' % port0)
>>> addr2, _ = start_server(
... '<zeoclient>\nserver %s\nblob-dir b2\n</zeoclient>' % port0)
Now, let's create some client storages that connect to these: Now, let's create some client storages that connect to these:
>>> import ZEO, transaction >>> import os, ZEO, ZODB.blob, ZODB.POSException, transaction
>>> db1 = ZEO.DB(addr1) >>> db0 = ZEO.DB(port0, blob_dir='cb0')
>>> db1 = ZEO.DB(addr1, blob_dir='cb1')
>>> tm1 = transaction.TransactionManager() >>> tm1 = transaction.TransactionManager()
>>> c1 = db1.open(transaction_manager=tm1) >>> c1 = db1.open(transaction_manager=tm1)
>>> r1 = c1.root() >>> r1 = c1.root()
>>> r1 >>> r1
{} {}
>>> db2 = ZEO.DB(addr2) >>> db2 = ZEO.DB(addr2, blob_dir='cb2')
>>> tm2 = transaction.TransactionManager() >>> tm2 = transaction.TransactionManager()
>>> c2 = db2.open(transaction_manager=tm2) >>> c2 = db2.open(transaction_manager=tm2)
>>> r2 = c2.root() >>> r2 = c2.root()
...@@ -43,7 +47,12 @@ If we update c1, we'll eventually see the change in c2: ...@@ -43,7 +47,12 @@ If we update c1, we'll eventually see the change in c2:
>>> r1[1].v = 1000 >>> r1[1].v = 1000
>>> r1[2] = persistent.mapping.PersistentMapping() >>> r1[2] = persistent.mapping.PersistentMapping()
>>> r1[2].v = -1000 >>> r1[2].v = -1000
>>> r1[3] = ZODB.blob.Blob('x'*4111222)
>>> for i in range(1000, 2000):
... r1[i] = persistent.mapping.PersistentMapping()
... r1[i].v = 0
>>> tm1.commit() >>> tm1.commit()
>>> blob_id = r1[3]._p_oid, r1[1]._p_serial
>>> import time >>> import time
>>> for i in range(100): >>> for i in range(100):
...@@ -51,6 +60,8 @@ If we update c1, we'll eventually see the change in c2: ...@@ -51,6 +60,8 @@ If we update c1, we'll eventually see the change in c2:
... if 1 in r2: ... if 1 in r2:
... break ... break
... time.sleep(0.01) ... time.sleep(0.01)
>>> tm2.abort()
>>> r2[1].v >>> r2[1].v
1000 1000
...@@ -61,25 +72,52 @@ If we update c1, we'll eventually see the change in c2: ...@@ -61,25 +72,52 @@ If we update c1, we'll eventually see the change in c2:
Now, let's see if we can break it. :) Now, let's see if we can break it. :)
>>> def f(): >>> def f():
... c = db1.open(transaction.TransactionManager())
... r = c.root()
... i = 0
... while i < 100:
... r[1].v -= 1
... r[2].v += 1
... try:
... c.transaction_manager.commit()
... i += 1
... except ZODB.POSException.ConflictError:
... c.transaction_manager.abort()
... c.close()
>>> def g():
... c = db0.open(transaction.TransactionManager())
... r = c.root()
... for i in range(100): ... for i in range(100):
... r1[1].v -= 1 ... for j in range(1000, 2000):
... r1[2].v += 1 ... r[j].v += 1
... tm1.commit() ... c.transaction_manager.commit()
... time.sleep(0.01) ... c.close()
>>> import threading >>> import threading
>>> thread = threading.Thread(target=f) >>> threadf = threading.Thread(target=f)
>>> thread.start() >>> threadg = threading.Thread(target=f)
>>> threadf.start()
>>> for i in range(1000): >>> threadg.start()
>>> s2 = db2.storage
>>> start_time = time.time()
>>> while time.time() - start_time < 999:
... t = tm2.begin() ... t = tm2.begin()
... if r2[1].v + r2[2].v: ... if r2[1].v + r2[2].v:
... print 'oops', r2[1], r2[2] ... print 'oops', r2[1], r2[2]
... if r1[1].v == 900: ... if r2[1].v == 900:
... break # we caught up ... break # we caught up
... time.sleep(0.01) ... path = s2.fshelper.getBlobFilename(*blob_id)
... if os.path.exists(path):
... ZODB.blob.remove_committed(path)
... s2._server.sendBlob(*blob_id)
... else: print 'Dang'
>>> thread.join() >>> threadf.join()
>>> threadg.join()
If we shutdown and restart the source server, the variables will be If we shutdown and restart the source server, the variables will be
invalidated: invalidated:
...@@ -109,5 +147,6 @@ invalidated: ...@@ -109,5 +147,6 @@ invalidated:
Cleanup: Cleanup:
>>> db0.close()
>>> db1.close() >>> db1.close()
>>> db2.close() >>> db2.close()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment