Commit 3ab99c52 authored by Xavier Thompson's avatar Xavier Thompson

Change cypclass 'Object Invocation Lock' implementation

parent 7ec14202
...@@ -676,7 +676,7 @@ def inject_cypclass_refcount_macros(): ...@@ -676,7 +676,7 @@ def inject_cypclass_refcount_macros():
def inject_cypclass_lock_macros(): def inject_cypclass_lock_macros():
blocking_macro_type = PyrexTypes.CFuncType(PyrexTypes.c_void_type, [PyrexTypes.CFuncTypeArg("obj", PyrexTypes.cy_object_type, None)], nogil = 1) blocking_macro_type = PyrexTypes.CFuncType(PyrexTypes.c_void_type, [PyrexTypes.CFuncTypeArg("obj", PyrexTypes.cy_object_type, None)], nogil = 1)
for macro in ("Cy_RLOCK", "Cy_WLOCK", "Cy_UNLOCK"): for macro in ("Cy_RLOCK", "Cy_WLOCK", "Cy_UNWLOCK", "Cy_UNRLOCK"):
builtin_scope.declare_builtin_cfunction(macro, blocking_macro_type, macro) builtin_scope.declare_builtin_cfunction(macro, blocking_macro_type, macro)
nonblocking_macro_type = PyrexTypes.CFuncType(PyrexTypes.c_int_type, [PyrexTypes.CFuncTypeArg("obj", PyrexTypes.cy_object_type, None)], nogil = 1) nonblocking_macro_type = PyrexTypes.CFuncType(PyrexTypes.c_int_type, [PyrexTypes.CFuncTypeArg("obj", PyrexTypes.cy_object_type, None)], nogil = 1)
for macro in ("Cy_TRYRLOCK", "Cy_TRYWLOCK"): for macro in ("Cy_TRYRLOCK", "Cy_TRYWLOCK"):
......
...@@ -465,11 +465,6 @@ class CypclassLockTransform(Visitor.EnvTransform): ...@@ -465,11 +465,6 @@ class CypclassLockTransform(Visitor.EnvTransform):
self.transform.rlocked[entry] += 1 self.transform.rlocked[entry] += 1
elif state == 'wlocked': elif state == 'wlocked':
self.transform.wlocked[entry] += 1 self.transform.wlocked[entry] += 1
elif state == 'unlocked':
if self.rlocked > 0:
self.transform.rlocked[entry] -= 1
elif self.wlocked > 0:
self.transform.wlocked[entry] -= 1
def __exit__(self, *args): def __exit__(self, *args):
entry = self.entry entry = self.entry
......
...@@ -14007,7 +14007,10 @@ class CoerceToLockedTempNode(CoerceToTempNode): ...@@ -14007,7 +14007,10 @@ class CoerceToLockedTempNode(CoerceToTempNode):
code.putln("Cy_WLOCK(%s);" % self.result()) code.putln("Cy_WLOCK(%s);" % self.result())
def generate_disposal_code(self, code): def generate_disposal_code(self, code):
code.putln("Cy_UNLOCK(%s);" % self.result()) if self.rlock_only:
code.putln("Cy_UNRLOCK(%s);" % self.result())
else:
code.putln("Cy_UNWLOCK(%s);" % self.result())
super(CoerceToLockedTempNode, self).generate_disposal_code(code) super(CoerceToLockedTempNode, self).generate_disposal_code(code)
......
...@@ -926,7 +926,7 @@ class ModuleNode(Nodes.Node, Nodes.BlockNode): ...@@ -926,7 +926,7 @@ class ModuleNode(Nodes.Node, Nodes.BlockNode):
def generate_cyp_class_deferred_definitions(self, env, code, definition): def generate_cyp_class_deferred_definitions(self, env, code, definition):
""" """
Generate all cypclass method definitions, deferred till now Generate all cypclass method definitions, deferred till now.
""" """
for entry, scope in env.iter_cypclass_entries_and_scopes(): for entry, scope in env.iter_cypclass_entries_and_scopes():
...@@ -948,7 +948,7 @@ class ModuleNode(Nodes.Node, Nodes.BlockNode): ...@@ -948,7 +948,7 @@ class ModuleNode(Nodes.Node, Nodes.BlockNode):
def generate_cyp_class_attrs_destructor_definition(self, entry, code): def generate_cyp_class_attrs_destructor_definition(self, entry, code):
""" """
Generate destructor definition for the given cypclass entry Generate destructor definition for the given cypclass entry.
""" """
scope = entry.type.scope scope = entry.type.scope
...@@ -966,7 +966,7 @@ class ModuleNode(Nodes.Node, Nodes.BlockNode): ...@@ -966,7 +966,7 @@ class ModuleNode(Nodes.Node, Nodes.BlockNode):
def generate_cyp_class_activate_function(self, entry, code): def generate_cyp_class_activate_function(self, entry, code):
""" """
Generate activate function for activable cypclass entries Generate activate function for activable cypclass entries.
""" """
active_self_entry = entry.type.scope.lookup_here("<active_self>") active_self_entry = entry.type.scope.lookup_here("<active_self>")
...@@ -1016,7 +1016,7 @@ class ModuleNode(Nodes.Node, Nodes.BlockNode): ...@@ -1016,7 +1016,7 @@ class ModuleNode(Nodes.Node, Nodes.BlockNode):
def generate_cyp_class_activated_class(self, entry, code): def generate_cyp_class_activated_class(self, entry, code):
""" """
Generate activated class Generate activated cypclass.
""" """
from . import Builtin from . import Builtin
...@@ -1085,7 +1085,7 @@ class ModuleNode(Nodes.Node, Nodes.BlockNode): ...@@ -1085,7 +1085,7 @@ class ModuleNode(Nodes.Node, Nodes.BlockNode):
code.putln("if (this->%s != NULL) {" % queue_attr_cname) code.putln("if (this->%s != NULL) {" % queue_attr_cname)
code.putln("Cy_WLOCK(%s);" % queue_attr_cname) code.putln("Cy_WLOCK(%s);" % queue_attr_cname)
code.putln("this->%s->push(message);" % queue_attr_cname) code.putln("this->%s->push(message);" % queue_attr_cname)
code.putln("Cy_UNLOCK(%s);" % queue_attr_cname) code.putln("Cy_UNWLOCK(%s);" % queue_attr_cname)
code.putln("} else {") code.putln("} else {")
code.putln("/* We should definitely shout here */") code.putln("/* We should definitely shout here */")
code.putln('fprintf(stderr, "Acthon error: No queue to push to for %s remote call !\\n");' % reified_function_entry.name) code.putln('fprintf(stderr, "Acthon error: No queue to push to for %s remote call !\\n");' % reified_function_entry.name)
...@@ -1098,7 +1098,7 @@ class ModuleNode(Nodes.Node, Nodes.BlockNode): ...@@ -1098,7 +1098,7 @@ class ModuleNode(Nodes.Node, Nodes.BlockNode):
def generate_cyp_class_reifying_entries(self, entry, code): def generate_cyp_class_reifying_entries(self, entry, code):
""" """
Generate code to reify the cypclass entry ? -> TODO what does this do exactly ? Generate code to reify the cypclass entries.
""" """
target_object_type = entry.type target_object_type = entry.type
...@@ -1217,7 +1217,7 @@ class ModuleNode(Nodes.Node, Nodes.BlockNode): ...@@ -1217,7 +1217,7 @@ class ModuleNode(Nodes.Node, Nodes.BlockNode):
code.putln("if (this->%s != NULL) {" % sync_attr_cname) code.putln("if (this->%s != NULL) {" % sync_attr_cname)
code.putln("if (!Cy_TRYRLOCK(this->%s)) {" % sync_attr_cname) code.putln("if (!Cy_TRYRLOCK(this->%s)) {" % sync_attr_cname)
code.putln("%s = this->%s->isActivable();" % (sync_result, sync_attr_cname)) code.putln("%s = this->%s->isActivable();" % (sync_result, sync_attr_cname))
code.putln("Cy_UNLOCK(this->%s);" % sync_attr_cname) code.putln("Cy_UNRLOCK(this->%s);" % sync_attr_cname)
code.putln("}") code.putln("}")
code.putln("if (%s == 0) return 0;" % sync_result) code.putln("if (%s == 0) return 0;" % sync_result)
code.putln("}") code.putln("}")
...@@ -1297,19 +1297,22 @@ class ModuleNode(Nodes.Node, Nodes.BlockNode): ...@@ -1297,19 +1297,22 @@ class ModuleNode(Nodes.Node, Nodes.BlockNode):
num_unlock = 0 num_unlock = 0
# Target object first, then arguments # Target object first, then arguments
code.putln("if (%s > %s) {" % (trylock_result, num_unlock)) code.putln("if (%s > %s) {" % (trylock_result, num_unlock))
code.putln("Cy_UNLOCK(this->%s);" % target_object_cname) unlock_op = "Cy_UNRLOCK" if reified_function_entry.type.is_const_method else "Cy_UNWLOCK"
code.putln("%s(this->%s);" % (unlock_op, target_object_cname))
num_unlock += 1 num_unlock += 1
for i, narg in enumerate(func_type.args[:narg_count]): for i, narg in enumerate(func_type.args[:narg_count]):
if narg.type.is_cyp_class: if narg.type.is_cyp_class:
code.putln("if (%s > %s) {" % (trylock_result, num_unlock)) code.putln("if (%s > %s) {" % (trylock_result, num_unlock))
code.putln("Cy_UNLOCK(this->%s);" % narg.cname) unlock_op = "Cy_UNRLOCK" if narg.type.is_const else "Cy_UNWLOCK"
code.putln("%s(this->%s);" % (unlock_op, narg.cname))
num_unlock += 1 num_unlock += 1
if opt_arg_count and num_optional_if: if opt_arg_count and num_optional_if:
code.putln("if (this->%s != NULL) {" % opt_arg_name) code.putln("if (this->%s != NULL) {" % opt_arg_name)
for opt_idx, optarg in enumerate(func_type.args[narg_count:]): for opt_idx, optarg in enumerate(func_type.args[narg_count:]):
if optarg.type.is_cyp_class: if optarg.type.is_cyp_class:
code.putln("if (%s > %s) {" % (trylock_result, num_unlock)) code.putln("if (%s > %s) {" % (trylock_result, num_unlock))
code.putln("Cy_UNLOCK(this->%s->%s);" % (opt_arg_name, func_type.opt_arg_cname(optarg.name))) unlock_op = "Cy_UNRLOCK" if optarg.type.is_const else "Cy_UNWLOCK"
code.putln("%s(this->%s->%s);" % (unlock_op, opt_arg_name, func_type.opt_arg_cname(optarg.name)))
num_unlock += 1 num_unlock += 1
# Note: we do not respect the semantic order of end-blocks here for simplification purpose. # Note: we do not respect the semantic order of end-blocks here for simplification purpose.
# This one is for the "not NULL opt arg" check # This one is for the "not NULL opt arg" check
...@@ -1330,8 +1333,10 @@ class ModuleNode(Nodes.Node, Nodes.BlockNode): ...@@ -1330,8 +1333,10 @@ class ModuleNode(Nodes.Node, Nodes.BlockNode):
", ".join("this->%s" % arg_cname for arg_cname in reified_call_args_list) ", ".join("this->%s" % arg_cname for arg_cname in reified_call_args_list)
) )
) )
code.putln("Cy_UNLOCK(this->%s);" % target_object_cname) unlock_op = "Cy_UNRLOCK" if reified_function_entry.type.is_const_method else "Cy_UNWLOCK"
put_cypclass_op_on_narg_optarg(lambda _: "Cy_UNLOCK", reified_function_entry.type, Naming.optional_args_cname, code) code.putln("%s(this->%s);" % (unlock_op, target_object_cname))
arg_unlocker = lambda arg: "Cy_UNRLOCK" if arg.type.is_const else "Cy_UNWLOCK"
put_cypclass_op_on_narg_optarg(arg_unlocker, reified_function_entry.type, Naming.optional_args_cname, code)
code.putln("/* Push result in the result object */") code.putln("/* Push result in the result object */")
if does_return: if does_return:
code.putln("Cy_WLOCK(this->%s);" % result_attr_cname) code.putln("Cy_WLOCK(this->%s);" % result_attr_cname)
...@@ -1339,7 +1344,7 @@ class ModuleNode(Nodes.Node, Nodes.BlockNode): ...@@ -1339,7 +1344,7 @@ class ModuleNode(Nodes.Node, Nodes.BlockNode):
code.putln("this->%s->pushIntResult(result);" % result_attr_cname) code.putln("this->%s->pushIntResult(result);" % result_attr_cname)
else: else:
code.putln("this->%s->pushVoidStarResult((void*)result);" % result_attr_cname) code.putln("this->%s->pushVoidStarResult((void*)result);" % result_attr_cname)
code.putln("Cy_UNLOCK(this->%s);" % result_attr_cname) code.putln("Cy_UNWLOCK(this->%s);" % result_attr_cname)
code.putln("return 1;") code.putln("return 1;")
code.putln("}") code.putln("}")
...@@ -1354,7 +1359,7 @@ class ModuleNode(Nodes.Node, Nodes.BlockNode): ...@@ -1354,7 +1359,7 @@ class ModuleNode(Nodes.Node, Nodes.BlockNode):
def generate_cyp_class_wrapper_definition(self, type, wrapper_entry, constructor_entry, new_entry, alloc_entry, code): def generate_cyp_class_wrapper_definition(self, type, wrapper_entry, constructor_entry, new_entry, alloc_entry, code):
""" """
Generate cypclass constructor wrapper ? -> TODO what does this do exactly ? Generate the cypclass constructor wrapper.
""" """
if type.templates: if type.templates:
......
...@@ -8546,7 +8546,7 @@ class EnsureGILNode(GILExitNode): ...@@ -8546,7 +8546,7 @@ class EnsureGILNode(GILExitNode):
code.put_ensure_gil(declare_gilstate=False) code.put_ensure_gil(declare_gilstate=False)
class LockCypclassNode(StatNode): class LockCypclassNode(StatNode):
# 'with rlocked / wlocked [cypclass object]' or 'with unlocked [cypclass object]' statement # 'with rlocked / wlocked [cypclass object]
# #
# state string 'rlocked' or 'wlocked' or 'unlocked' # state string 'rlocked' or 'wlocked' or 'unlocked'
# obj ExprNode the (un)locked object # obj ExprNode the (un)locked object
...@@ -8574,13 +8574,10 @@ class LockCypclassNode(StatNode): ...@@ -8574,13 +8574,10 @@ class LockCypclassNode(StatNode):
self.body.generate_execution_code(code) self.body.generate_execution_code(code)
# We must unlock if we held a lock previously, and relock if we unlocked. if self.state == "rlocked":
if self.state != "unlocked": code.putln("Cy_UNRLOCK(%s);" % self.obj.result())
code.putln("Cy_UNLOCK(%s);" % self.obj.result()) elif self.state == "wlocked":
elif self.was_wlocked: code.putln("Cy_UNWLOCK(%s);" % self.obj.result())
code.putln("Cy_WLOCK(%s);" % self.obj.result())
elif self.was_rlocked:
code.putln("Cy_RLOCK(%s);" % self.obj.result())
def cython_view_utility_code(): def cython_view_utility_code():
......
...@@ -14,8 +14,14 @@ ...@@ -14,8 +14,14 @@
#ifdef __cplusplus #ifdef __cplusplus
#if __cplusplus >= 201103L #if __cplusplus >= 201103L
#include <atomic> #include <atomic>
#include <cstdint>
using namespace std; using namespace std;
#define CyObject_ATOMIC_REFCOUNT_TYPE atomic_int #define CyObject_ATOMIC_REFCOUNT_TYPE atomic_int
#define CyObject_NO_OWNER -1
#define CyObject_MANY_OWNERS -2
#define CyObject_MAX_READERS (1 << 30)
#define CyObject_LOCK_ERROR_OTHER_WRITER (1 << 0)
#define CyObject_LOCK_ERROR_OTHER_READER (1 << 1)
#include <pthread.h> #include <pthread.h>
...@@ -28,32 +34,26 @@ ...@@ -28,32 +34,26 @@
#include <type_traits> #include <type_traits>
struct ThreadStorage {
pid_t thread_id;
unsigned int read_count;
unsigned int write_count;
};
class RecursiveUpgradeableRWLock { class RecursiveUpgradeableRWLock {
pthread_rwlock_t rw_lock; pthread_mutex_t guard;
pthread_mutex_t upgrade_lock; pthread_cond_t wait_readers_depart;
// Notes: This could be a rw_lock pthread_cond_t wait_writer_depart;
pthread_mutex_t thread_count_lock; atomic<pid_t> owner_id;
std::vector<ThreadStorage> thread_count; atomic_int32_t readers_nb;
protected: uint32_t write_count;
ThreadStorage& get_or_init_thread_count(pid_t thread_id);
public: public:
RecursiveUpgradeableRWLock() RecursiveUpgradeableRWLock() {
{ pthread_mutex_init(&this->guard, NULL);
pthread_rwlock_init(&this->rw_lock, NULL); pthread_cond_init(&this->wait_readers_depart, NULL);
pthread_mutex_init(&this->upgrade_lock, NULL); pthread_cond_init(&this->wait_writer_depart, NULL);
pthread_mutex_init(&this->thread_count_lock, NULL); this->owner_id = CyObject_NO_OWNER;
// Reserve space for up to 8 threads this->readers_nb = 0;
this->thread_count.reserve(8); this->write_count = 0;
} }
void wlock(); void wlock();
void rlock(); void rlock();
void unlock(); void unwlock();
void unrlock();
int tryrlock(); int tryrlock();
int trywlock(); int trywlock();
}; };
...@@ -65,7 +65,6 @@ ...@@ -65,7 +65,6 @@
class CyObject : public CyPyObject { class CyObject : public CyPyObject {
private: private:
CyObject_ATOMIC_REFCOUNT_TYPE nogil_ob_refcnt; CyObject_ATOMIC_REFCOUNT_TYPE nogil_ob_refcnt;
//pthread_rwlock_t ob_lock;
RecursiveUpgradeableRWLock ob_lock; RecursiveUpgradeableRWLock ob_lock;
public: public:
CyObject(): nogil_ob_refcnt(1) {} CyObject(): nogil_ob_refcnt(1) {}
...@@ -75,7 +74,8 @@ ...@@ -75,7 +74,8 @@
int CyObject_GETREF(); int CyObject_GETREF();
void CyObject_RLOCK(); void CyObject_RLOCK();
void CyObject_WLOCK(); void CyObject_WLOCK();
void CyObject_UNLOCK(); void CyObject_UNRLOCK();
void CyObject_UNWLOCK();
int CyObject_TRYRLOCK(); int CyObject_TRYRLOCK();
int CyObject_TRYWLOCK(); int CyObject_TRYWLOCK();
}; };
...@@ -138,62 +138,71 @@ ...@@ -138,62 +138,71 @@
static inline void Cy_INCREF(T) {} static inline void Cy_INCREF(T) {}
template <typename T, typename std::enable_if<std::is_convertible<T, CyObject*>::value, int>::type = 0> template <typename T, typename std::enable_if<std::is_convertible<T, CyObject*>::value, int>::type = 0>
static inline void Cy_DECREF(T &op) { static inline void Cy_DECREF(T &ob) {
if(op->CyObject_DECREF()) if(ob->CyObject_DECREF())
op = NULL; ob = NULL;
} }
template <typename T, typename std::enable_if<std::is_convertible<T, CyObject*>::value, int>::type = 0> template <typename T, typename std::enable_if<std::is_convertible<T, CyObject*>::value, int>::type = 0>
static inline void Cy_XDECREF(T &op) { static inline void Cy_XDECREF(T &ob) {
if (op != NULL) { if (ob != NULL) {
if(op->CyObject_DECREF()) if(ob->CyObject_DECREF())
op = NULL; ob = NULL;
} }
} }
template <typename T, typename std::enable_if<std::is_convertible<T, CyObject*>::value, int>::type = 0> template <typename T, typename std::enable_if<std::is_convertible<T, CyObject*>::value, int>::type = 0>
static inline void Cy_INCREF(T op) { static inline void Cy_INCREF(T ob) {
if (op != NULL) if (ob != NULL)
op->CyObject_INCREF(); ob->CyObject_INCREF();
} }
static inline int _Cy_GETREF(CyObject *op) { static inline int _Cy_GETREF(CyObject *ob) {
return op->CyObject_GETREF(); return ob->CyObject_GETREF();
} }
static inline void _Cy_RLOCK(CyObject *op) { static inline void _Cy_RLOCK(CyObject *ob) {
if (op != NULL) { if (ob != NULL) {
op->CyObject_RLOCK(); ob->CyObject_RLOCK();
} }
else { else {
fprintf(stderr, "ERROR: trying to read lock NULL !\n"); fprintf(stderr, "ERROR: trying to read lock NULL !\n");
} }
} }
static inline void _Cy_WLOCK(CyObject *op) { static inline void _Cy_WLOCK(CyObject *ob) {
if (op != NULL) { if (ob != NULL) {
op->CyObject_WLOCK(); ob->CyObject_WLOCK();
} }
else { else {
fprintf(stderr, "ERROR: trying to write lock NULL !\n"); fprintf(stderr, "ERROR: trying to write lock NULL !\n");
} }
} }
static inline void _Cy_UNLOCK(CyObject *op) { static inline void _Cy_UNRLOCK(CyObject *ob) {
if (op != NULL) { if (ob != NULL) {
op->CyObject_UNLOCK(); ob->CyObject_UNRLOCK();
}
else {
fprintf(stderr, "ERROR: trying to unrlock NULL !\n");
}
}
static inline void _Cy_UNWLOCK(CyObject *ob) {
if (ob != NULL) {
ob->CyObject_UNWLOCK();
} }
else { else {
fprintf(stderr, "ERROR: trying to unlock NULL !\n"); fprintf(stderr, "ERROR: trying to unwlock NULL !\n");
} }
} }
static inline int _Cy_TRYRLOCK(CyObject *op) { static inline int _Cy_TRYRLOCK(CyObject *ob) {
return op->CyObject_TRYRLOCK(); return ob->CyObject_TRYRLOCK();
} }
static inline int _Cy_TRYWLOCK(CyObject *op) { static inline int _Cy_TRYWLOCK(CyObject *ob) {
return op->CyObject_TRYWLOCK(); return ob->CyObject_TRYWLOCK();
} }
/* /*
...@@ -269,18 +278,19 @@ ...@@ -269,18 +278,19 @@
/* Cast argument to CyObject* type. */ /* Cast argument to CyObject* type. */
#define _CyObject_CAST(op) op #define _CyObject_CAST(ob) ob
#define Cy_GETREF(op) (_Cy_GETREF(_CyObject_CAST(op))) #define Cy_GETREF(ob) (_Cy_GETREF(_CyObject_CAST(ob)))
#define Cy_GOTREF(op) #define Cy_GOTREF(ob)
#define Cy_XGOTREF(op) #define Cy_XGOTREF(ob)
#define Cy_GIVEREF(op) #define Cy_GIVEREF(ob)
#define Cy_XGIVEREF(op) #define Cy_XGIVEREF(ob)
#define Cy_RLOCK(op) _Cy_RLOCK(op) #define Cy_RLOCK(ob) _Cy_RLOCK(ob)
#define Cy_WLOCK(op) _Cy_WLOCK(op) #define Cy_WLOCK(ob) _Cy_WLOCK(ob)
#define Cy_UNLOCK(op) _Cy_UNLOCK(op) #define Cy_UNRLOCK(ob) _Cy_UNRLOCK(ob)
#define Cy_TRYRLOCK(op) _Cy_TRYRLOCK(op) #define Cy_UNWLOCK(ob) _Cy_UNWLOCK(ob)
#define Cy_TRYWLOCK(op) _Cy_TRYWLOCK(op) #define Cy_TRYRLOCK(ob) _Cy_TRYRLOCK(ob)
#define Cy_TRYWLOCK(ob) _Cy_TRYWLOCK(ob)
#endif #endif
#endif #endif
...@@ -297,200 +307,164 @@ ...@@ -297,200 +307,164 @@
#endif /* __cplusplus */ #endif /* __cplusplus */
void RecursiveUpgradeableRWLock::rlock() {
pid_t caller_id = syscall(SYS_gettid);
ThreadStorage& RecursiveUpgradeableRWLock::get_or_init_thread_count(pid_t thread_id) if (this->owner_id == caller_id) {
{ ++this->readers_nb;
int first_empty_index = -1; return;
int match_index = -1;
pthread_mutex_lock(&this->thread_count_lock);
for (unsigned int i = 0; i < this->thread_count.size(); ++i) {
if (this->thread_count[i].thread_id == thread_id)
match_index = i;
if (first_empty_index < 0 && this->thread_count[i].thread_id == 0)
first_empty_index = i;
} }
if (match_index < 0) { pthread_mutex_lock(&this->guard);
// We must get a new entry. The question is: do we have to reallocate space ?
while (this->write_count > 0) {
// First, create the temporary entry pthread_cond_wait(&this->wait_writer_depart, &this->guard);
ThreadStorage tmp_thread_entry;
tmp_thread_entry.thread_id = thread_id;
tmp_thread_entry.read_count = 0;
tmp_thread_entry.write_count = 0;
if (first_empty_index < 0) {
// We have to reallocate space
match_index = this->thread_count.size();
this->thread_count.push_back(tmp_thread_entry);
} else {
// We can reuse an existing and empty cell
match_index = first_empty_index;
this->thread_count[match_index] = tmp_thread_entry;
}
} }
pthread_mutex_unlock(&this->thread_count_lock);
return this->thread_count[match_index];
}
this->owner_id = this->readers_nb++ ? CyObject_MANY_OWNERS : caller_id;
void RecursiveUpgradeableRWLock::wlock() { pthread_mutex_unlock(&this->guard);
pid_t my_tid = syscall(SYS_gettid);
ThreadStorage& my_counts = this->get_or_init_thread_count(my_tid);
bool has_read_lock = my_counts.read_count;
bool has_write_lock = my_counts.write_count;
int mutex_trylock_error = -1;
if (!has_write_lock) {
if (has_read_lock) {
mutex_trylock_error = pthread_mutex_trylock(&this->upgrade_lock);
// As you may have noticed, this is a trylock above, not a blocking lock.
// This is because we could generate a deadlock:
// Imagine 2 threads T1 and T2, both holding a read lock on the same lock.
// Now, T1 tries to upgrade. So it holds the mutex, then unlock it's read lock,
// then tries to take a write lock. As T2 still has a read lock, T1 blocks,
// waiting for T2 to release it's read lock.
// Now, imagine that, instead of releasing, T2 tries to upgrade.
// It will first try to take the mutex. And won't succeed, as T1 holds it.
// This annoying mutex is here to avoid snatching when upgrading.
// Indeed, if you imagine T1 holding a read lock, T1 tries to upgrade,
// and right after T3 tries to write-lock (from nothing).
// As T1 is releasing then taking the write lock, T3 could take the write lock
// before T1, which is not really what's intented for an upgradable lock.
// The strategy here is to allow an "all is right" case by trying to lock
// first in a non-blocking manner. If it succeeds, hurray, our lock
// won't be snatched, we can continue by releasing the read lock.
// If it doesn't, to avoid a potential deadlock, we first release the read lock
// then try to hold the mutex again. Our lock will be snatched.
// So, in either case, we unlock the read lock here.
pthread_rwlock_unlock(&this->rw_lock);
}
if (mutex_trylock_error != 0)
// Two cases: failed upgrading, or trying to acquire a write lock without previous lock.
// In both situations, we're trying here to acquire a write lock from nothing,
// as we already dropped read lock in the failed upgrading case,
// so blocking is allowed here (can't deadlock as we don't own other locks)
pthread_mutex_lock(&this->upgrade_lock);
pthread_rwlock_wrlock(&this->rw_lock);
pthread_mutex_unlock(&this->upgrade_lock);
}
// If we already have the write lock we directly jump here
++my_counts.write_count;
} }
void RecursiveUpgradeableRWLock::rlock() { int RecursiveUpgradeableRWLock::tryrlock() {
pid_t my_tid = syscall(SYS_gettid); pid_t caller_id = syscall(SYS_gettid);
if (this->owner_id == caller_id) {
++this->readers_nb;
return 0;
}
ThreadStorage& my_counts = this->get_or_init_thread_count(my_tid); // we must lock here, because a trylock could fail also when another thread is currently read-locking or read-unlocking
bool has_read_lock = my_counts.read_count; // but this means we might miss a writer arriving and leaving
bool has_write_lock = my_counts.write_count; pthread_mutex_lock(&this->guard);
if (!has_write_lock && !has_read_lock) { if (this->write_count > 0) {
pthread_mutex_lock(&this->upgrade_lock); return CyObject_LOCK_ERROR_OTHER_WRITER;
pthread_rwlock_rdlock(&this->rw_lock); pthread_mutex_unlock(&this->guard);
pthread_mutex_unlock(&this->upgrade_lock);
} }
// If we already have a lock (read or write), we directly jump here
++my_counts.read_count; this->owner_id = this->readers_nb++ ? CyObject_MANY_OWNERS : caller_id;
pthread_mutex_unlock(&this->guard);
return 0;
} }
void RecursiveUpgradeableRWLock::unlock() { void RecursiveUpgradeableRWLock::unrlock() {
pid_t my_tid = syscall(SYS_gettid);
ThreadStorage& my_counts = this->get_or_init_thread_count(my_tid); pthread_mutex_lock(&this->guard);
bool has_read_lock = my_counts.read_count;
bool has_write_lock = my_counts.write_count;
if (has_read_lock) { if (--this->readers_nb == 0) {
--my_counts.read_count; if (this->write_count == 0) {
} this->owner_id = CyObject_NO_OWNER;
else if (has_write_lock) { }
--my_counts.write_count;
} // broadcast to wake up all the waiting writers
else { pthread_cond_broadcast(&this->wait_readers_depart);
fprintf(stderr, "ERROR: trying to unlock already unlocked CyObject !\n");
return;
}
if (!my_counts.write_count && !my_counts.read_count) {
pthread_rwlock_unlock(&this->rw_lock);
my_counts.thread_id = 0;
} }
pthread_mutex_unlock(&this->guard);
} }
int RecursiveUpgradeableRWLock::tryrlock() { void RecursiveUpgradeableRWLock::wlock() {
int rw_trylock_error; pid_t caller_id = syscall(SYS_gettid);
pid_t my_tid = syscall(SYS_gettid);
ThreadStorage& my_counts = this->get_or_init_thread_count(my_tid); if (this->owner_id == caller_id) {
bool has_read_lock = my_counts.read_count; if (this->write_count) {
bool has_write_lock = my_counts.write_count; ++this->write_count;
if (!has_write_lock && !has_read_lock) { return;
pthread_mutex_lock(&this->upgrade_lock); }
rw_trylock_error = pthread_rwlock_tryrdlock(&this->rw_lock);
pthread_mutex_unlock(&this->upgrade_lock);
if (rw_trylock_error) return rw_trylock_error;
} }
++my_counts.read_count;
return 0; pthread_mutex_lock(&this->guard);
if (this->owner_id != caller_id) {
// we wait for readers first and bottleneck the writers after
// this works for a readers preferring approach
// because the other way around would require setting write_count to 1 before waiting on the readers
// in order to ensure only one writer at a times goes beyound waiting for the other writers to leave
// this would be more in line with a writers preferring approach, but that can deadlock when a thread recurses on a readlock
// this also means that we must broadcast when the readers leave in order to give all the waiting writers a chance to wake up
// new readers might still pass before some of the waiting writers, but then they'll just wait some more
// OTOH, if the new readers come, not broadcasting would let some writers wait forever
while (this->readers_nb > 0) {
pthread_cond_wait(&this->wait_readers_depart, &this->guard);
}
while (this->write_count > 0) {
pthread_cond_wait(&this->wait_writer_depart, &this->guard);
}
this->owner_id = caller_id;
}
this->write_count = 1;
pthread_mutex_unlock(&this->guard);
} }
int RecursiveUpgradeableRWLock::trywlock() { int RecursiveUpgradeableRWLock::trywlock() {
int rw_trylock_error; pid_t caller_id = syscall(SYS_gettid);
int mutex_trylock_error;
pid_t my_tid = syscall(SYS_gettid); if (this->owner_id == caller_id) {
ThreadStorage& my_counts = this->get_or_init_thread_count(my_tid); if (this->write_count) {
bool has_read_lock = my_counts.read_count; ++this->write_count;
bool has_write_lock = my_counts.write_count; return 0;
if (!has_write_lock) {
if (has_read_lock) {
mutex_trylock_error = pthread_mutex_trylock(&this->upgrade_lock);
if (mutex_trylock_error) {
// In contrast to the blocking write lock,
// if we fail here we do want to keep the read lock.
return mutex_trylock_error;
}
// Here, we have the lock -> try to upgrade
pthread_rwlock_unlock(&this->rw_lock);
rw_trylock_error = pthread_rwlock_trywrlock(&this->rw_lock);
if (rw_trylock_error) {
// Get the read lock again. As we have the mutex, no one
// is trying to upgrade nor to acquire a lock,
// so the call here should return immediately
pthread_rwlock_rdlock(&this->rw_lock);
pthread_mutex_unlock(&this->upgrade_lock);
return rw_trylock_error;
}
pthread_mutex_unlock(&this->upgrade_lock);
} }
mutex_trylock_error = pthread_mutex_trylock(&this->upgrade_lock); }
if (mutex_trylock_error) {
// Keep previous state, so we will indeed keep read-lock if (pthread_mutex_trylock(&this->guard) != 0) {
// if we had one. // another thread is currently doing a lock operation on this lock, but we don't know what it is.
return mutex_trylock_error; // we could choose to do a blocking lock instead of a trylock here.
// that way we would not detect when an unlock overlaps with this trylock,
// but also all the contending readers and / or writers could leave while we block,
// so we wouldn't detect those contentions either.
return CyObject_LOCK_ERROR_OTHER_WRITER | CyObject_LOCK_ERROR_OTHER_READER;
}
if (this->owner_id != caller_id) {
if (this->readers_nb > 0) {
pthread_mutex_unlock(&this->guard);
return CyObject_LOCK_ERROR_OTHER_READER;
} }
if (has_read_lock)
pthread_rwlock_unlock(&this->rw_lock); if (this->write_count > 0) {
rw_trylock_error = pthread_rwlock_trywrlock(&this->rw_lock); pthread_mutex_unlock(&this->guard);
if (rw_trylock_error) { return CyObject_LOCK_ERROR_OTHER_WRITER;
if (has_read_lock) {
// Get the read lock again. As we have the mutex, no one
// is trying to upgrade nor to acquire a lock,
// so the call here should return immediately
pthread_rwlock_rdlock(&this->rw_lock);
}
pthread_mutex_unlock(&this->upgrade_lock);
return rw_trylock_error;
} }
pthread_mutex_unlock(&this->upgrade_lock);
this->owner_id = caller_id;
} }
++my_counts.write_count;
this->write_count = 1;
pthread_mutex_unlock(&this->guard);
return 0; return 0;
} }
void RecursiveUpgradeableRWLock::unwlock() {
pthread_mutex_lock(&this->guard);
if (--this->write_count == 0) {
if (this->readers_nb == 0) {
this->owner_id = CyObject_NO_OWNER;
}
// broadcast to wake up all the waiting readers, + maybe one waiting writer
// more efficient to count r waiting readers and w waiting writers and signal n + (w > 0) times
pthread_cond_broadcast(&this->wait_writer_depart);
}
pthread_mutex_unlock(&this->guard);
}
void CyObject::CyObject_INCREF() void CyObject::CyObject_INCREF()
{ {
atomic_fetch_add(&(this->nogil_ob_refcnt), 1); atomic_fetch_add(&(this->nogil_ob_refcnt), 1);
...@@ -520,11 +494,6 @@ void CyObject::CyObject_WLOCK() ...@@ -520,11 +494,6 @@ void CyObject::CyObject_WLOCK()
this->ob_lock.wlock(); this->ob_lock.wlock();
} }
void CyObject::CyObject_UNLOCK()
{
this->ob_lock.unlock();
}
int CyObject::CyObject_TRYRLOCK() int CyObject::CyObject_TRYRLOCK()
{ {
return this->ob_lock.tryrlock(); return this->ob_lock.tryrlock();
...@@ -534,6 +503,15 @@ int CyObject::CyObject_TRYWLOCK() ...@@ -534,6 +503,15 @@ int CyObject::CyObject_TRYWLOCK()
{ {
return this->ob_lock.trywlock(); return this->ob_lock.trywlock();
} }
void CyObject::CyObject_UNRLOCK()
{
this->ob_lock.unrlock();
}
void CyObject::CyObject_UNWLOCK()
{
this->ob_lock.unwlock();
}
ActhonMessageInterface::ActhonMessageInterface(ActhonSyncInterface* sync_method, ActhonMessageInterface::ActhonMessageInterface(ActhonSyncInterface* sync_method,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment