Commit 3da2197f authored by Xavier Thompson's avatar Xavier Thompson

Change cypclass 'Object Invocation Lock' implementation

parent d50089e2
......@@ -676,7 +676,7 @@ def inject_cypclass_refcount_macros():
def inject_cypclass_lock_macros():
blocking_macro_type = PyrexTypes.CFuncType(PyrexTypes.c_void_type, [PyrexTypes.CFuncTypeArg("obj", PyrexTypes.cy_object_type, None)], nogil = 1)
for macro in ("Cy_RLOCK", "Cy_WLOCK", "Cy_UNLOCK"):
for macro in ("Cy_RLOCK", "Cy_WLOCK", "Cy_UNWLOCK", "Cy_UNRLOCK"):
builtin_scope.declare_builtin_cfunction(macro, blocking_macro_type, macro)
nonblocking_macro_type = PyrexTypes.CFuncType(PyrexTypes.c_int_type, [PyrexTypes.CFuncTypeArg("obj", PyrexTypes.cy_object_type, None)], nogil = 1)
for macro in ("Cy_TRYRLOCK", "Cy_TRYWLOCK"):
......
......@@ -465,11 +465,6 @@ class CypclassLockTransform(Visitor.EnvTransform):
self.transform.rlocked[entry] += 1
elif state == 'wlocked':
self.transform.wlocked[entry] += 1
elif state == 'unlocked':
if self.rlocked > 0:
self.transform.rlocked[entry] -= 1
elif self.wlocked > 0:
self.transform.wlocked[entry] -= 1
def __exit__(self, *args):
entry = self.entry
......
......@@ -13959,7 +13959,10 @@ class CoerceToLockedTempNode(CoerceToTempNode):
code.putln("Cy_WLOCK(%s);" % self.result())
def generate_disposal_code(self, code):
code.putln("Cy_UNLOCK(%s);" % self.result())
if self.rlock_only:
code.putln("Cy_UNRLOCK(%s);" % self.result())
else:
code.putln("Cy_UNWLOCK(%s);" % self.result())
super(CoerceToLockedTempNode, self).generate_disposal_code(code)
......
......@@ -923,7 +923,7 @@ class ModuleNode(Nodes.Node, Nodes.BlockNode):
def generate_cyp_class_deferred_definitions(self, env, code, definition):
"""
Generate all cypclass method definitions, deferred till now
Generate all cypclass method definitions, deferred till now.
"""
for entry, scope in env.iter_cypclass_entries_and_scopes():
......@@ -945,7 +945,7 @@ class ModuleNode(Nodes.Node, Nodes.BlockNode):
def generate_cyp_class_attrs_destructor_definition(self, entry, code):
"""
Generate destructor definition for the given cypclass entry
Generate destructor definition for the given cypclass entry.
"""
scope = entry.type.scope
......@@ -963,7 +963,7 @@ class ModuleNode(Nodes.Node, Nodes.BlockNode):
def generate_cyp_class_activate_function(self, entry, code):
"""
Generate activate function for activable cypclass entries
Generate activate function for activable cypclass entries.
"""
active_self_entry = entry.type.scope.lookup_here("<active_self>")
......@@ -1013,7 +1013,7 @@ class ModuleNode(Nodes.Node, Nodes.BlockNode):
def generate_cyp_class_activated_class(self, entry, code):
"""
Generate activated class
Generate activated cypclass.
"""
from . import Builtin
......@@ -1082,7 +1082,7 @@ class ModuleNode(Nodes.Node, Nodes.BlockNode):
code.putln("if (this->%s != NULL) {" % queue_attr_cname)
code.putln("Cy_WLOCK(%s);" % queue_attr_cname)
code.putln("this->%s->push(message);" % queue_attr_cname)
code.putln("Cy_UNLOCK(%s);" % queue_attr_cname)
code.putln("Cy_UNWLOCK(%s);" % queue_attr_cname)
code.putln("} else {")
code.putln("/* We should definitely shout here */")
code.putln('fprintf(stderr, "Acthon error: No queue to push to for %s remote call !\\n");' % reified_function_entry.name)
......@@ -1095,7 +1095,7 @@ class ModuleNode(Nodes.Node, Nodes.BlockNode):
def generate_cyp_class_reifying_entries(self, entry, code):
"""
Generate code to reify the cypclass entry ? -> TODO what does this do exactly ?
Generate code to reify the cypclass entries.
"""
target_object_type = entry.type
......@@ -1214,7 +1214,7 @@ class ModuleNode(Nodes.Node, Nodes.BlockNode):
code.putln("if (this->%s != NULL) {" % sync_attr_cname)
code.putln("if (!Cy_TRYRLOCK(this->%s)) {" % sync_attr_cname)
code.putln("%s = this->%s->isActivable();" % (sync_result, sync_attr_cname))
code.putln("Cy_UNLOCK(this->%s);" % sync_attr_cname)
code.putln("Cy_UNRLOCK(this->%s);" % sync_attr_cname)
code.putln("}")
code.putln("if (%s == 0) return 0;" % sync_result)
code.putln("}")
......@@ -1294,19 +1294,22 @@ class ModuleNode(Nodes.Node, Nodes.BlockNode):
num_unlock = 0
# Target object first, then arguments
code.putln("if (%s > %s) {" % (trylock_result, num_unlock))
code.putln("Cy_UNLOCK(this->%s);" % target_object_cname)
unlock_op = "Cy_UNRLOCK" if reified_function_entry.type.is_const_method else "Cy_UNWLOCK"
code.putln("%s(this->%s);" % (unlock_op, target_object_cname))
num_unlock += 1
for i, narg in enumerate(func_type.args[:narg_count]):
if narg.type.is_cyp_class:
code.putln("if (%s > %s) {" % (trylock_result, num_unlock))
code.putln("Cy_UNLOCK(this->%s);" % narg.cname)
unlock_op = "Cy_UNRLOCK" if narg.type.is_const else "Cy_UNWLOCK"
code.putln("%s(this->%s);" % (unlock_op, narg.cname))
num_unlock += 1
if opt_arg_count and num_optional_if:
code.putln("if (this->%s != NULL) {" % opt_arg_name)
for opt_idx, optarg in enumerate(func_type.args[narg_count:]):
if optarg.type.is_cyp_class:
code.putln("if (%s > %s) {" % (trylock_result, num_unlock))
code.putln("Cy_UNLOCK(this->%s->%s);" % (opt_arg_name, func_type.opt_arg_cname(optarg.name)))
unlock_op = "Cy_UNRLOCK" if optarg.type.is_const else "Cy_UNWLOCK"
code.putln("%s(this->%s->%s);" % (unlock_op, opt_arg_name, func_type.opt_arg_cname(optarg.name)))
num_unlock += 1
# Note: we do not respect the semantic order of end-blocks here for simplification purpose.
# This one is for the "not NULL opt arg" check
......@@ -1327,8 +1330,10 @@ class ModuleNode(Nodes.Node, Nodes.BlockNode):
", ".join("this->%s" % arg_cname for arg_cname in reified_call_args_list)
)
)
code.putln("Cy_UNLOCK(this->%s);" % target_object_cname)
put_cypclass_op_on_narg_optarg(lambda _: "Cy_UNLOCK", reified_function_entry.type, Naming.optional_args_cname, code)
unlock_op = "Cy_UNRLOCK" if reified_function_entry.type.is_const_method else "Cy_UNWLOCK"
code.putln("%s(this->%s);" % (unlock_op, target_object_cname))
arg_unlocker = lambda arg: "Cy_UNRLOCK" if arg.type.is_const else "Cy_UNWLOCK"
put_cypclass_op_on_narg_optarg(arg_unlocker, reified_function_entry.type, Naming.optional_args_cname, code)
code.putln("/* Push result in the result object */")
if does_return:
code.putln("Cy_WLOCK(this->%s);" % result_attr_cname)
......@@ -1336,7 +1341,7 @@ class ModuleNode(Nodes.Node, Nodes.BlockNode):
code.putln("this->%s->pushIntResult(result);" % result_attr_cname)
else:
code.putln("this->%s->pushVoidStarResult((void*)result);" % result_attr_cname)
code.putln("Cy_UNLOCK(this->%s);" % result_attr_cname)
code.putln("Cy_UNWLOCK(this->%s);" % result_attr_cname)
code.putln("return 1;")
code.putln("}")
......@@ -1351,7 +1356,7 @@ class ModuleNode(Nodes.Node, Nodes.BlockNode):
def generate_cyp_class_wrapper_definition(self, type, wrapper_entry, constructor_entry, new_entry, alloc_entry, code):
"""
Generate cypclass constructor wrapper ? -> TODO what does this do exactly ?
Generate the cypclass constructor wrapper.
"""
if type.templates:
......
......@@ -8511,7 +8511,7 @@ class EnsureGILNode(GILExitNode):
code.put_ensure_gil(declare_gilstate=False)
class LockCypclassNode(StatNode):
# 'with rlocked / wlocked [cypclass object]' or 'with unlocked [cypclass object]' statement
# 'with rlocked / wlocked [cypclass object]
#
# state string 'rlocked' or 'wlocked' or 'unlocked'
# obj ExprNode the (un)locked object
......@@ -8539,13 +8539,10 @@ class LockCypclassNode(StatNode):
self.body.generate_execution_code(code)
# We must unlock if we held a lock previously, and relock if we unlocked.
if self.state != "unlocked":
code.putln("Cy_UNLOCK(%s);" % self.obj.result())
elif self.was_wlocked:
code.putln("Cy_WLOCK(%s);" % self.obj.result())
elif self.was_rlocked:
code.putln("Cy_RLOCK(%s);" % self.obj.result())
if self.state == "rlocked":
code.putln("Cy_UNRLOCK(%s);" % self.obj.result())
elif self.state == "wlocked":
code.putln("Cy_UNWLOCK(%s);" % self.obj.result())
def cython_view_utility_code():
......
......@@ -14,8 +14,14 @@
#ifdef __cplusplus
#if __cplusplus >= 201103L
#include <atomic>
#include <cstdint>
using namespace std;
#define CyObject_ATOMIC_REFCOUNT_TYPE atomic_int
#define CyObject_NO_OWNER -1
#define CyObject_MANY_OWNERS -2
#define CyObject_MAX_READERS (1 << 30)
#define CyObject_LOCK_ERROR_OTHER_WRITER (1 << 0)
#define CyObject_LOCK_ERROR_OTHER_READER (1 << 1)
#include <pthread.h>
......@@ -28,32 +34,26 @@
#include <type_traits>
struct ThreadStorage {
pid_t thread_id;
unsigned int read_count;
unsigned int write_count;
};
class RecursiveUpgradeableRWLock {
pthread_rwlock_t rw_lock;
pthread_mutex_t upgrade_lock;
// Notes: This could be a rw_lock
pthread_mutex_t thread_count_lock;
std::vector<ThreadStorage> thread_count;
protected:
ThreadStorage& get_or_init_thread_count(pid_t thread_id);
pthread_mutex_t guard;
pthread_cond_t wait_readers_depart;
pthread_cond_t wait_writer_depart;
atomic<pid_t> owner_id;
atomic_int32_t readers_nb;
uint32_t write_count;
public:
RecursiveUpgradeableRWLock()
{
pthread_rwlock_init(&this->rw_lock, NULL);
pthread_mutex_init(&this->upgrade_lock, NULL);
pthread_mutex_init(&this->thread_count_lock, NULL);
// Reserve space for up to 8 threads
this->thread_count.reserve(8);
RecursiveUpgradeableRWLock() {
pthread_mutex_init(&this->guard, NULL);
pthread_cond_init(&this->wait_readers_depart, NULL);
pthread_cond_init(&this->wait_writer_depart, NULL);
this->owner_id = CyObject_NO_OWNER;
this->readers_nb = 0;
this->write_count = 0;
}
void wlock();
void rlock();
void unlock();
void unwlock();
void unrlock();
int tryrlock();
int trywlock();
};
......@@ -65,7 +65,6 @@
class CyObject : public CyPyObject {
private:
CyObject_ATOMIC_REFCOUNT_TYPE nogil_ob_refcnt;
//pthread_rwlock_t ob_lock;
RecursiveUpgradeableRWLock ob_lock;
public:
CyObject(): nogil_ob_refcnt(1) {}
......@@ -75,7 +74,8 @@
int CyObject_GETREF();
void CyObject_RLOCK();
void CyObject_WLOCK();
void CyObject_UNLOCK();
void CyObject_UNRLOCK();
void CyObject_UNWLOCK();
int CyObject_TRYRLOCK();
int CyObject_TRYWLOCK();
};
......@@ -138,62 +138,71 @@
static inline void Cy_INCREF(T) {}
template <typename T, typename std::enable_if<std::is_convertible<T, CyObject*>::value, int>::type = 0>
static inline void Cy_DECREF(T &op) {
if(op->CyObject_DECREF())
op = NULL;
static inline void Cy_DECREF(T &ob) {
if(ob->CyObject_DECREF())
ob = NULL;
}
template <typename T, typename std::enable_if<std::is_convertible<T, CyObject*>::value, int>::type = 0>
static inline void Cy_XDECREF(T &op) {
if (op != NULL) {
if(op->CyObject_DECREF())
op = NULL;
static inline void Cy_XDECREF(T &ob) {
if (ob != NULL) {
if(ob->CyObject_DECREF())
ob = NULL;
}
}
template <typename T, typename std::enable_if<std::is_convertible<T, CyObject*>::value, int>::type = 0>
static inline void Cy_INCREF(T op) {
if (op != NULL)
op->CyObject_INCREF();
static inline void Cy_INCREF(T ob) {
if (ob != NULL)
ob->CyObject_INCREF();
}
static inline int _Cy_GETREF(CyObject *op) {
return op->CyObject_GETREF();
static inline int _Cy_GETREF(CyObject *ob) {
return ob->CyObject_GETREF();
}
static inline void _Cy_RLOCK(CyObject *op) {
if (op != NULL) {
op->CyObject_RLOCK();
static inline void _Cy_RLOCK(CyObject *ob) {
if (ob != NULL) {
ob->CyObject_RLOCK();
}
else {
fprintf(stderr, "ERROR: trying to read lock NULL !\n");
}
}
static inline void _Cy_WLOCK(CyObject *op) {
if (op != NULL) {
op->CyObject_WLOCK();
static inline void _Cy_WLOCK(CyObject *ob) {
if (ob != NULL) {
ob->CyObject_WLOCK();
}
else {
fprintf(stderr, "ERROR: trying to write lock NULL !\n");
}
}
static inline void _Cy_UNLOCK(CyObject *op) {
if (op != NULL) {
op->CyObject_UNLOCK();
static inline void _Cy_UNRLOCK(CyObject *ob) {
if (ob != NULL) {
ob->CyObject_UNRLOCK();
}
else {
fprintf(stderr, "ERROR: trying to unrlock NULL !\n");
}
}
static inline void _Cy_UNWLOCK(CyObject *ob) {
if (ob != NULL) {
ob->CyObject_UNWLOCK();
}
else {
fprintf(stderr, "ERROR: trying to unlock NULL !\n");
fprintf(stderr, "ERROR: trying to unwlock NULL !\n");
}
}
static inline int _Cy_TRYRLOCK(CyObject *op) {
return op->CyObject_TRYRLOCK();
static inline int _Cy_TRYRLOCK(CyObject *ob) {
return ob->CyObject_TRYRLOCK();
}
static inline int _Cy_TRYWLOCK(CyObject *op) {
return op->CyObject_TRYWLOCK();
static inline int _Cy_TRYWLOCK(CyObject *ob) {
return ob->CyObject_TRYWLOCK();
}
/*
......@@ -269,18 +278,19 @@
/* Cast argument to CyObject* type. */
#define _CyObject_CAST(op) op
#define Cy_GETREF(op) (_Cy_GETREF(_CyObject_CAST(op)))
#define Cy_GOTREF(op)
#define Cy_XGOTREF(op)
#define Cy_GIVEREF(op)
#define Cy_XGIVEREF(op)
#define Cy_RLOCK(op) _Cy_RLOCK(op)
#define Cy_WLOCK(op) _Cy_WLOCK(op)
#define Cy_UNLOCK(op) _Cy_UNLOCK(op)
#define Cy_TRYRLOCK(op) _Cy_TRYRLOCK(op)
#define Cy_TRYWLOCK(op) _Cy_TRYWLOCK(op)
#define _CyObject_CAST(ob) ob
#define Cy_GETREF(ob) (_Cy_GETREF(_CyObject_CAST(ob)))
#define Cy_GOTREF(ob)
#define Cy_XGOTREF(ob)
#define Cy_GIVEREF(ob)
#define Cy_XGIVEREF(ob)
#define Cy_RLOCK(ob) _Cy_RLOCK(ob)
#define Cy_WLOCK(ob) _Cy_WLOCK(ob)
#define Cy_UNRLOCK(ob) _Cy_UNRLOCK(ob)
#define Cy_UNWLOCK(ob) _Cy_UNWLOCK(ob)
#define Cy_TRYRLOCK(ob) _Cy_TRYRLOCK(ob)
#define Cy_TRYWLOCK(ob) _Cy_TRYWLOCK(ob)
#endif
#endif
......@@ -297,200 +307,164 @@
#endif /* __cplusplus */
void RecursiveUpgradeableRWLock::rlock() {
pid_t caller_id = syscall(SYS_gettid);
ThreadStorage& RecursiveUpgradeableRWLock::get_or_init_thread_count(pid_t thread_id)
{
int first_empty_index = -1;
int match_index = -1;
pthread_mutex_lock(&this->thread_count_lock);
for (unsigned int i = 0; i < this->thread_count.size(); ++i) {
if (this->thread_count[i].thread_id == thread_id)
match_index = i;
if (first_empty_index < 0 && this->thread_count[i].thread_id == 0)
first_empty_index = i;
}
if (match_index < 0) {
// We must get a new entry. The question is: do we have to reallocate space ?
// First, create the temporary entry
ThreadStorage tmp_thread_entry;
tmp_thread_entry.thread_id = thread_id;
tmp_thread_entry.read_count = 0;
tmp_thread_entry.write_count = 0;
if (first_empty_index < 0) {
// We have to reallocate space
match_index = this->thread_count.size();
this->thread_count.push_back(tmp_thread_entry);
} else {
// We can reuse an existing and empty cell
match_index = first_empty_index;
this->thread_count[match_index] = tmp_thread_entry;
}
}
pthread_mutex_unlock(&this->thread_count_lock);
return this->thread_count[match_index];
}
if (this->owner_id == caller_id) {
++this->readers_nb;
return;
}
pthread_mutex_lock(&this->guard);
void RecursiveUpgradeableRWLock::wlock() {
pid_t my_tid = syscall(SYS_gettid);
ThreadStorage& my_counts = this->get_or_init_thread_count(my_tid);
bool has_read_lock = my_counts.read_count;
bool has_write_lock = my_counts.write_count;
int mutex_trylock_error = -1;
if (!has_write_lock) {
if (has_read_lock) {
mutex_trylock_error = pthread_mutex_trylock(&this->upgrade_lock);
// As you may have noticed, this is a trylock above, not a blocking lock.
// This is because we could generate a deadlock:
// Imagine 2 threads T1 and T2, both holding a read lock on the same lock.
// Now, T1 tries to upgrade. So it holds the mutex, then unlock it's read lock,
// then tries to take a write lock. As T2 still has a read lock, T1 blocks,
// waiting for T2 to release it's read lock.
// Now, imagine that, instead of releasing, T2 tries to upgrade.
// It will first try to take the mutex. And won't succeed, as T1 holds it.
// This annoying mutex is here to avoid snatching when upgrading.
// Indeed, if you imagine T1 holding a read lock, T1 tries to upgrade,
// and right after T3 tries to write-lock (from nothing).
// As T1 is releasing then taking the write lock, T3 could take the write lock
// before T1, which is not really what's intented for an upgradable lock.
// The strategy here is to allow an "all is right" case by trying to lock
// first in a non-blocking manner. If it succeeds, hurray, our lock
// won't be snatched, we can continue by releasing the read lock.
// If it doesn't, to avoid a potential deadlock, we first release the read lock
// then try to hold the mutex again. Our lock will be snatched.
// So, in either case, we unlock the read lock here.
pthread_rwlock_unlock(&this->rw_lock);
}
if (mutex_trylock_error != 0)
// Two cases: failed upgrading, or trying to acquire a write lock without previous lock.
// In both situations, we're trying here to acquire a write lock from nothing,
// as we already dropped read lock in the failed upgrading case,
// so blocking is allowed here (can't deadlock as we don't own other locks)
pthread_mutex_lock(&this->upgrade_lock);
pthread_rwlock_wrlock(&this->rw_lock);
pthread_mutex_unlock(&this->upgrade_lock);
}
// If we already have the write lock we directly jump here
++my_counts.write_count;
while (this->write_count > 0) {
pthread_cond_wait(&this->wait_writer_depart, &this->guard);
}
this->owner_id = this->readers_nb++ ? CyObject_MANY_OWNERS : caller_id;
pthread_mutex_unlock(&this->guard);
}
void RecursiveUpgradeableRWLock::rlock() {
pid_t my_tid = syscall(SYS_gettid);
int RecursiveUpgradeableRWLock::tryrlock() {
pid_t caller_id = syscall(SYS_gettid);
if (this->owner_id == caller_id) {
++this->readers_nb;
return 0;
}
ThreadStorage& my_counts = this->get_or_init_thread_count(my_tid);
bool has_read_lock = my_counts.read_count;
bool has_write_lock = my_counts.write_count;
// we must lock here, because a trylock could fail also when another thread is currently read-locking or read-unlocking
// but this means we might miss a writer arriving and leaving
pthread_mutex_lock(&this->guard);
if (!has_write_lock && !has_read_lock) {
pthread_mutex_lock(&this->upgrade_lock);
pthread_rwlock_rdlock(&this->rw_lock);
pthread_mutex_unlock(&this->upgrade_lock);
if (this->write_count > 0) {
return CyObject_LOCK_ERROR_OTHER_WRITER;
pthread_mutex_unlock(&this->guard);
}
// If we already have a lock (read or write), we directly jump here
++my_counts.read_count;
this->owner_id = this->readers_nb++ ? CyObject_MANY_OWNERS : caller_id;
pthread_mutex_unlock(&this->guard);
return 0;
}
void RecursiveUpgradeableRWLock::unlock() {
pid_t my_tid = syscall(SYS_gettid);
void RecursiveUpgradeableRWLock::unrlock() {
ThreadStorage& my_counts = this->get_or_init_thread_count(my_tid);
bool has_read_lock = my_counts.read_count;
bool has_write_lock = my_counts.write_count;
pthread_mutex_lock(&this->guard);
if (has_read_lock) {
--my_counts.read_count;
if (--this->readers_nb == 0) {
if (this->write_count == 0) {
this->owner_id = CyObject_NO_OWNER;
}
else if (has_write_lock) {
--my_counts.write_count;
// broadcast to wake up all the waiting writers
pthread_cond_broadcast(&this->wait_readers_depart);
}
else {
fprintf(stderr, "ERROR: trying to unlock already unlocked CyObject !\n");
pthread_mutex_unlock(&this->guard);
}
void RecursiveUpgradeableRWLock::wlock() {
pid_t caller_id = syscall(SYS_gettid);
if (this->owner_id == caller_id) {
if (this->write_count) {
++this->write_count;
return;
}
if (!my_counts.write_count && !my_counts.read_count) {
pthread_rwlock_unlock(&this->rw_lock);
my_counts.thread_id = 0;
}
}
int RecursiveUpgradeableRWLock::tryrlock() {
int rw_trylock_error;
pid_t my_tid = syscall(SYS_gettid);
ThreadStorage& my_counts = this->get_or_init_thread_count(my_tid);
bool has_read_lock = my_counts.read_count;
bool has_write_lock = my_counts.write_count;
if (!has_write_lock && !has_read_lock) {
pthread_mutex_lock(&this->upgrade_lock);
rw_trylock_error = pthread_rwlock_tryrdlock(&this->rw_lock);
pthread_mutex_unlock(&this->upgrade_lock);
if (rw_trylock_error) return rw_trylock_error;
}
++my_counts.read_count;
return 0;
pthread_mutex_lock(&this->guard);
if (this->owner_id != caller_id) {
// we wait for readers first and bottleneck the writers after
// this works for a readers preferring approach
// because the other way around would require setting write_count to 1 before waiting on the readers
// in order to ensure only one writer at a times goes beyound waiting for the other writers to leave
// this would be more in line with a writers preferring approach, but that can deadlock when a thread recurses on a readlock
// this also means that we must broadcast when the readers leave in order to give all the waiting writers a chance to wake up
// new readers might still pass before some of the waiting writers, but then they'll just wait some more
// OTOH, if the new readers come, not broadcasting would let some writers wait forever
while (this->readers_nb > 0) {
pthread_cond_wait(&this->wait_readers_depart, &this->guard);
}
while (this->write_count > 0) {
pthread_cond_wait(&this->wait_writer_depart, &this->guard);
}
this->owner_id = caller_id;
}
this->write_count = 1;
pthread_mutex_unlock(&this->guard);
}
int RecursiveUpgradeableRWLock::trywlock() {
int rw_trylock_error;
int mutex_trylock_error;
pid_t my_tid = syscall(SYS_gettid);
ThreadStorage& my_counts = this->get_or_init_thread_count(my_tid);
bool has_read_lock = my_counts.read_count;
bool has_write_lock = my_counts.write_count;
if (!has_write_lock) {
if (has_read_lock) {
mutex_trylock_error = pthread_mutex_trylock(&this->upgrade_lock);
if (mutex_trylock_error) {
// In contrast to the blocking write lock,
// if we fail here we do want to keep the read lock.
return mutex_trylock_error;
}
// Here, we have the lock -> try to upgrade
pthread_rwlock_unlock(&this->rw_lock);
rw_trylock_error = pthread_rwlock_trywrlock(&this->rw_lock);
if (rw_trylock_error) {
// Get the read lock again. As we have the mutex, no one
// is trying to upgrade nor to acquire a lock,
// so the call here should return immediately
pthread_rwlock_rdlock(&this->rw_lock);
pthread_mutex_unlock(&this->upgrade_lock);
return rw_trylock_error;
}
pthread_mutex_unlock(&this->upgrade_lock);
}
mutex_trylock_error = pthread_mutex_trylock(&this->upgrade_lock);
if (mutex_trylock_error) {
// Keep previous state, so we will indeed keep read-lock
// if we had one.
return mutex_trylock_error;
}
if (has_read_lock)
pthread_rwlock_unlock(&this->rw_lock);
rw_trylock_error = pthread_rwlock_trywrlock(&this->rw_lock);
if (rw_trylock_error) {
if (has_read_lock) {
// Get the read lock again. As we have the mutex, no one
// is trying to upgrade nor to acquire a lock,
// so the call here should return immediately
pthread_rwlock_rdlock(&this->rw_lock);
}
pthread_mutex_unlock(&this->upgrade_lock);
return rw_trylock_error;
}
pthread_mutex_unlock(&this->upgrade_lock);
}
++my_counts.write_count;
pid_t caller_id = syscall(SYS_gettid);
if (this->owner_id == caller_id) {
if (this->write_count) {
++this->write_count;
return 0;
}
}
if (pthread_mutex_trylock(&this->guard) != 0) {
// another thread is currently doing a lock operation on this lock, but we don't know what it is.
// we could choose to do a blocking lock instead of a trylock here.
// that way we would not detect when an unlock overlaps with this trylock,
// but also all the contending readers and / or writers could leave while we block,
// so we wouldn't detect those contentions either.
return CyObject_LOCK_ERROR_OTHER_WRITER | CyObject_LOCK_ERROR_OTHER_READER;
}
if (this->owner_id != caller_id) {
if (this->readers_nb > 0) {
pthread_mutex_unlock(&this->guard);
return CyObject_LOCK_ERROR_OTHER_READER;
}
if (this->write_count > 0) {
pthread_mutex_unlock(&this->guard);
return CyObject_LOCK_ERROR_OTHER_WRITER;
}
this->owner_id = caller_id;
}
this->write_count = 1;
pthread_mutex_unlock(&this->guard);
return 0;
}
void RecursiveUpgradeableRWLock::unwlock() {
pthread_mutex_lock(&this->guard);
if (--this->write_count == 0) {
if (this->readers_nb == 0) {
this->owner_id = CyObject_NO_OWNER;
}
// broadcast to wake up all the waiting readers, + maybe one waiting writer
// more efficient to count r waiting readers and w waiting writers and signal n + (w > 0) times
pthread_cond_broadcast(&this->wait_writer_depart);
}
pthread_mutex_unlock(&this->guard);
}
void CyObject::CyObject_INCREF()
{
atomic_fetch_add(&(this->nogil_ob_refcnt), 1);
......@@ -520,11 +494,6 @@ void CyObject::CyObject_WLOCK()
this->ob_lock.wlock();
}
void CyObject::CyObject_UNLOCK()
{
this->ob_lock.unlock();
}
int CyObject::CyObject_TRYRLOCK()
{
return this->ob_lock.tryrlock();
......@@ -534,6 +503,15 @@ int CyObject::CyObject_TRYWLOCK()
{
return this->ob_lock.trywlock();
}
void CyObject::CyObject_UNRLOCK()
{
this->ob_lock.unrlock();
}
void CyObject::CyObject_UNWLOCK()
{
this->ob_lock.unwlock();
}
ActhonMessageInterface::ActhonMessageInterface(ActhonSyncInterface* sync_method,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment