Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
N
neoppod
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Carlos Ramos Carreño
neoppod
Commits
87eca1e0
Commit
87eca1e0
authored
Mar 09, 2019
by
Julien Muchembled
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
client: replace global load lock by a per-oid one
parent
c7cdcf87
Changes
5
Show whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
100 additions
and
75 deletions
+100
-75
neo/client/app.py
neo/client/app.py
+37
-35
neo/client/handlers/master.py
neo/client/handlers/master.py
+12
-19
neo/debug.py
neo/debug.py
+1
-4
neo/tests/threaded/__init__.py
neo/tests/threaded/__init__.py
+8
-6
neo/tests/threaded/test.py
neo/tests/threaded/test.py
+42
-11
No files found.
neo/client/app.py
View file @
87eca1e0
...
...
@@ -17,7 +17,7 @@
import
heapq
import
random
import
time
from
collections
import
defaultdict
try
:
from
ZODB._compat
import
dumps
,
loads
,
_protocol
except
ImportError
:
...
...
@@ -79,7 +79,7 @@ class Application(ThreadedApplication):
# no self-assigned UUID, primary master will supply us one
self
.
_cache
=
ClientCache
()
if
cache_size
is
None
else
\
ClientCache
(
max_size
=
cache_size
)
self
.
_loading
_oid
=
None
self
.
_loading
=
defaultdict
(
lambda
:
(
Lock
(),
[]))
self
.
new_oid_list
=
()
self
.
last_oid
=
'
\
0
'
*
8
self
.
storage_event_handler
=
storage
.
StorageEventHandler
(
self
)
...
...
@@ -90,19 +90,13 @@ class Application(ThreadedApplication):
self
.
notifications_handler
=
master
.
PrimaryNotificationsHandler
(
self
)
self
.
_txn_container
=
TransactionContainer
()
# Lock definition :
# _load_lock is used to make loading and storing atomic
lock
=
Lock
()
self
.
_load_lock_acquire
=
lock
.
acquire
self
.
_load_lock_release
=
lock
.
release
# _oid_lock is used in order to not call multiple oid
# generation at the same time
lock
=
Lock
()
self
.
_oid_lock_acquire
=
lock
.
acquire
self
.
_oid_lock_release
=
lock
.
release
lock
=
Lock
()
# _cache_lock is used for the client cache
self
.
_cache_lock_acquire
=
lock
.
acquire
self
.
_cache_lock_release
=
lock
.
release
self
.
_cache_lock
=
Lock
()
# _connecting_to_master_node is used to prevent simultaneous master
# node connection attempts
self
.
_connecting_to_master_node
=
Lock
()
...
...
@@ -398,21 +392,28 @@ class Application(ThreadedApplication):
"""
# TODO:
# - rename parameters (here? and in handlers & packet definitions)
acquire
=
self
.
_cache_lock_acquire
release
=
self
.
_cache_lock_release
# XXX: Consider using a more fine-grained lock.
self
.
_load_lock_acquire
()
try
:
acquire
()
acquired
=
False
lock
=
self
.
_cache_lock
try
:
while
1
:
with
lock
:
result
=
self
.
_loadFromCache
(
oid
,
tid
,
before_tid
)
if
result
:
return
result
self
.
_loading_oid
=
oid
self
.
_loading_invalidated
=
[]
finally
:
release
()
load_lock
=
self
.
_loading
[
oid
][
0
]
acquired
=
load_lock
.
acquire
(
0
)
# Several concurrent cache misses for the same oid are probably
# for the same tid so we use a per-oid lock to avoid asking the
# same data to the storage node.
if
acquired
:
# The first thread does load from storage,
# and fills cache with the response.
break
# The other threads wait for the first one to complete and
# loop, possibly resulting in a new cache miss if a different
# tid is actually wanted or if the data was too big.
with
load_lock
:
pass
# While the cache lock is released, an arbitrary number of
# invalidations may be processed, for this oid or not. And at this
# precise moment, if both tid and before_tid are None (which is
...
...
@@ -428,20 +429,24 @@ class Application(ThreadedApplication):
# we got from master.
before_tid
=
p64
(
u64
(
self
.
last_tid
)
+
1
)
data
,
tid
,
next_tid
,
_
=
self
.
_loadFromStorage
(
oid
,
tid
,
before_tid
)
acquire
()
try
:
if
self
.
_loading_oid
:
with
lock
:
loading
=
self
.
_loading
.
pop
(
oid
,
None
)
if
loading
:
assert
loading
[
0
]
is
load_lock
if
not
next_tid
:
for
t
in
self
.
_loading_invalidated
:
for
t
in
loading
[
1
]
:
if
tid
<
t
:
next_tid
=
t
break
self
.
_cache
.
store
(
oid
,
data
,
tid
,
next_tid
)
# Else, we just reconnected to the master.
finally
:
release
()
finally
:
self
.
_load_lock_release
()
load_lock
.
release
()
except
:
if
acquired
:
with
lock
:
self
.
_loading
.
pop
(
oid
,
None
)
load_lock
.
release
()
raise
return
data
,
tid
,
next_tid
def
_loadFromStorage
(
self
,
oid
,
at_tid
,
before_tid
):
...
...
@@ -986,11 +991,8 @@ class Application(ThreadedApplication):
# It should not be otherwise required (clients should be free to load
# old data as long as it is available in cache, event if it was pruned
# by a pack), so don't bother invalidating on other clients.
self
.
_cache_lock_acquire
()
try
:
with
self
.
_cache_lock
:
self
.
_cache
.
clear
()
finally
:
self
.
_cache_lock_release
()
def
getLastTID
(
self
,
oid
):
return
self
.
load
(
oid
)[
1
]
...
...
neo/client/handlers/master.py
View file @
87eca1e0
...
...
@@ -52,8 +52,7 @@ class PrimaryNotificationsHandler(MTEventHandler):
# Either we're connecting or we already know the last tid
# via invalidations.
assert
app
.
master_conn
is
None
,
app
.
master_conn
app
.
_cache_lock_acquire
()
try
:
with
app
.
_cache_lock
:
if
app_last_tid
<
ltid
:
app
.
_cache
.
clear_current
()
# In the past, we tried not to invalidate the
...
...
@@ -67,9 +66,7 @@ class PrimaryNotificationsHandler(MTEventHandler):
app
.
_cache
.
clear
()
# Make sure a parallel load won't refill the cache
# with garbage.
app
.
_loading_oid
=
app
.
_loading_invalidated
=
None
finally
:
app
.
_cache_lock_release
()
app
.
_loading
.
clear
()
db
=
app
.
getDB
()
db
is
None
or
db
.
invalidateCache
()
app
.
last_tid
=
ltid
...
...
@@ -80,22 +77,20 @@ class PrimaryNotificationsHandler(MTEventHandler):
app
.
last_tid
=
tid
# Update cache
cache
=
app
.
_cache
app
.
_cache_lock_acquire
()
try
:
with
app
.
_cache_lock
:
invalidate
=
app
.
_cache
.
invalidate
loading
=
app
.
_loading_oid
loading
_get
=
app
.
_loading
.
get
for
oid
,
data
in
cache_dict
.
iteritems
():
# Update ex-latest value in cache
invalidate
(
oid
,
tid
)
if
oid
==
loading
:
app
.
_loading_invalidated
.
append
(
tid
)
loading
=
loading_get
(
oid
)
if
loading
:
loading
[
1
].
append
(
tid
)
if
data
is
not
None
:
# Store in cache with no next_tid
cache
.
store
(
oid
,
data
,
tid
,
None
)
if
callback
is
not
None
:
callback
(
tid
)
finally
:
app
.
_cache_lock_release
()
def
connectionClosed
(
self
,
conn
):
app
=
self
.
app
...
...
@@ -124,19 +119,17 @@ class PrimaryNotificationsHandler(MTEventHandler):
if
app
.
ignore_invalidations
:
return
app
.
last_tid
=
tid
app
.
_cache_lock_acquire
()
try
:
with
app
.
_cache_lock
:
invalidate
=
app
.
_cache
.
invalidate
loading
=
app
.
_loading_oid
loading
_get
=
app
.
_loading
.
get
for
oid
in
oid_list
:
invalidate
(
oid
,
tid
)
if
oid
==
loading
:
app
.
_loading_invalidated
.
append
(
tid
)
loading
=
loading_get
(
oid
)
if
loading
:
loading
[
1
].
append
(
tid
)
db
=
app
.
getDB
()
if
db
is
not
None
:
db
.
invalidate
(
tid
,
oid_list
)
finally
:
app
.
_cache_lock_release
()
def
notifyPartitionChanges
(
self
,
conn
,
ptid
,
cell_list
):
if
self
.
app
.
pt
.
filled
():
...
...
neo/debug.py
View file @
87eca1e0
...
...
@@ -197,8 +197,7 @@ elif IF == 'trace-cache':
@
defer
def
profile
(
app
):
app
.
_cache_lock_acquire
()
try
:
with
app
.
_cache_lock
:
cache
=
app
.
_cache
if
type
(
cache
)
is
ClientCache
:
app
.
_cache
=
CacheTracer
(
cache
,
'%s-%s.neo-cache-trace'
%
...
...
@@ -206,5 +205,3 @@ elif IF == 'trace-cache':
app
.
_cache
.
clear
()
else
:
app
.
_cache
=
cache
.
close
()
finally
:
app
.
_cache_lock_release
()
neo/tests/threaded/__init__.py
View file @
87eca1e0
...
...
@@ -1072,8 +1072,7 @@ class NEOThreadedTest(NeoTestBase):
def
run
(
self
):
try
:
apply
(
*
self
.
__target
)
self
.
__exc_info
=
None
self
.
__result
=
apply
(
*
self
.
__target
)
except
:
self
.
__exc_info
=
sys
.
exc_info
()
if
self
.
__exc_info
[
0
]
is
NEOThreadedTest
.
failureException
:
...
...
@@ -1081,7 +1080,10 @@ class NEOThreadedTest(NeoTestBase):
def
join
(
self
,
timeout
=
None
):
threading
.
Thread
.
join
(
self
,
timeout
)
if
not
self
.
is_alive
()
and
self
.
__exc_info
:
if
not
self
.
is_alive
():
try
:
return
self
.
__result
except
AttributeError
:
etype
,
value
,
tb
=
self
.
__exc_info
del
self
.
__exc_info
raise
etype
,
value
,
tb
...
...
neo/tests/threaded/test.py
View file @
87eca1e0
...
...
@@ -1009,7 +1009,7 @@ class Test(NEOThreadedTest):
x
=
r
[
''
]
=
PCounter
()
t
.
commit
()
tid1
=
x
.
_p_serial
nonlocal_
=
[
0
,
1
]
nonlocal_
=
[
0
,
0
,
0
]
l1
=
threading
.
Lock
();
l1
.
acquire
()
l2
=
threading
.
Lock
();
l2
.
acquire
()
def
invalidateObjects
(
orig
,
*
args
):
...
...
@@ -1019,12 +1019,26 @@ class Test(NEOThreadedTest):
nonlocal_
[
0
]
+=
1
if
nonlocal_
[
0
]
==
2
:
l2
.
release
()
def
_cache_lock_release
(
orig
):
orig
()
if
nonlocal_
[
1
]:
nonlocal_
[
1
]
=
0
l1
.
release
()
class
CacheLock
(
object
):
def
__init__
(
self
,
client
):
self
.
_lock
=
client
.
_cache_lock
def
__enter__
(
self
):
self
.
_lock
.
acquire
()
def
__exit__
(
self
,
t
,
v
,
tb
):
count
=
nonlocal_
[
1
]
nonlocal_
[
1
]
=
count
+
1
self
.
_lock
.
release
()
if
count
==
0
:
load_same
.
start
()
l2
.
acquire
()
elif
count
==
1
:
load_other
.
start
()
def
_loadFromStorage
(
orig
,
*
args
):
count
=
nonlocal_
[
2
]
nonlocal_
[
2
]
=
count
+
1
if
not
count
:
l1
.
release
()
return
orig
(
*
args
)
with
cluster
.
newClient
()
as
client
,
\
Patch
(
client
.
notifications_handler
,
invalidateObjects
=
invalidateObjects
):
...
...
@@ -1043,17 +1057,34 @@ class Test(NEOThreadedTest):
r
.
_p_changed
=
1
t
.
commit
()
self
.
assertEqual
(
tid1
,
client
.
last_tid
)
with
Patch
(
client
,
_cache_lock_release
=
_cache_lock_release
):
load_same
=
self
.
newPausedThread
(
client
.
load
,
x
.
_p_oid
)
load_other
=
self
.
newPausedThread
(
client
.
load
,
r
.
_p_oid
)
with
Patch
(
client
,
_cache_lock
=
CacheLock
(
client
)),
\
Patch
(
client
,
_loadFromStorage
=
_loadFromStorage
):
# 1. Just after having found nothing in cache, the worker
# thread asks the poll thread to get notified about
# invalidations for the loading oid.
# <context switch>
# <context switch>
(l1)
# 2. Both invalidations are processed. -> last_tid=tid3
# <context switch>
# <context switch>
(l2)
# 3. The worker thread loads before tid3+1.
# The poll thread notified [tid2], which must be ignored.
self
.
assertEqual
((
tid2
,
None
),
client
.
load
(
x
.
_p_oid
)[
1
:])
self
.
assertEqual
(
nonlocal_
,
[
2
,
0
])
# In parallel, 2 other loads are done (both cache misses):
# - one for the same oid, which waits for first load to
# complete and in particular fill cache, in order to
# avoid asking the same data to the storage node
# - another for a different oid, which doesn't wait, as shown
# by the fact that it returns an old record (i.e. before any
# invalidation packet is processed)
loaded
=
client
.
load
(
x
.
_p_oid
)
self
.
assertEqual
((
tid2
,
None
),
loaded
[
1
:])
self
.
assertEqual
(
loaded
,
load_same
.
join
())
self
.
assertEqual
((
tid1
,
r
.
_p_serial
),
load_other
.
join
()[
1
:])
# To summary:
# - 3 concurrent loads starting with cache misses
# - 2 loads from storage
# - 1 load ending with a cache hit
self
.
assertEqual
(
nonlocal_
,
[
2
,
8
,
2
])
@
with_cluster
(
storage_count
=
2
,
partitions
=
2
)
def
testReadVerifyingStorage
(
self
,
cluster
):
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment