Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
C
cython
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Xavier Thompson
cython
Commits
3de2efaf
Commit
3de2efaf
authored
Jun 20, 2019
by
gsamain
Committed by
Xavier Thompson
Aug 17, 2020
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
CyObjects utility for locks
parent
ef4bbc54
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
288 additions
and
0 deletions
+288
-0
Cython/Utility/CyObjects.cpp
Cython/Utility/CyObjects.cpp
+288
-0
No files found.
Cython/Utility/CyObjects.cpp
View file @
3de2efaf
...
...
@@ -17,14 +17,60 @@
using
namespace
std
;
#define CyObject_ATOMIC_REFCOUNT_TYPE atomic_int
#include <pthread.h>
#include <sys/types.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <vector>
struct
ThreadStorage
{
pid_t
thread_id
;
unsigned
int
read_count
;
unsigned
int
write_count
;
};
class
RecursiveUpgradeableRWLock
{
pthread_rwlock_t
rw_lock
;
pthread_mutex_t
upgrade_lock
;
// Notes: This could be a rw_lock
pthread_mutex_t
thread_count_lock
;
std
::
vector
<
ThreadStorage
>
thread_count
;
protected:
ThreadStorage
&
get_or_init_thread_count
(
pid_t
thread_id
);
public:
RecursiveUpgradeableRWLock
()
{
pthread_rwlock_init
(
&
this
->
rw_lock
,
NULL
);
pthread_mutex_init
(
&
this
->
upgrade_lock
,
NULL
);
pthread_mutex_init
(
&
this
->
thread_count_lock
,
NULL
);
// Reserve space for up to 8 threads
this
->
thread_count
.
reserve
(
8
);
}
void
wlock
();
void
rlock
();
void
unlock
();
int
tryrlock
();
int
trywlock
();
};
class
CyObject
{
private:
CyObject_ATOMIC_REFCOUNT_TYPE
ob_refcnt
;
//pthread_rwlock_t ob_lock;
RecursiveUpgradeableRWLock
ob_lock
;
public:
CyObject
()
:
ob_refcnt
(
1
)
{}
virtual
~
CyObject
()
{}
void
CyObject_INCREF
();
int
CyObject_DECREF
();
void
CyObject_RLOCK
();
void
CyObject_WLOCK
();
void
CyObject_UNLOCK
();
int
CyObject_TRYRLOCK
();
int
CyObject_TRYWLOCK
();
};
static
inline
int
_Cy_DECREF
(
CyObject
*
op
)
{
...
...
@@ -35,6 +81,26 @@
op
->
CyObject_INCREF
();
}
static
inline
void
_Cy_RLOCK
(
CyObject
*
op
)
{
op
->
CyObject_RLOCK
();
}
static
inline
void
_Cy_WLOCK
(
CyObject
*
op
)
{
op
->
CyObject_WLOCK
();
}
static
inline
void
_Cy_UNLOCK
(
CyObject
*
op
)
{
if
(
op
!=
NULL
)
op
->
CyObject_UNLOCK
();
}
static
inline
int
_Cy_TRYRLOCK
(
CyObject
*
op
)
{
return
op
->
CyObject_TRYRLOCK
();
}
static
inline
int
_Cy_TRYWLOCK
(
CyObject
*
op
)
{
return
op
->
CyObject_TRYWLOCK
();
}
/* Cast argument to CyObject* type. */
#define _CyObject_CAST(op) op
...
...
@@ -45,6 +111,11 @@
#define Cy_XGOTREF(op)
#define Cy_GIVEREF(op)
#define Cy_XGIVEREF(op)
#define Cy_RLOCK(op) _Cy_RLOCK(op)
#define Cy_WLOCK(op) _Cy_WLOCK(op)
#define Cy_UNLOCK(op) _Cy_UNLOCK(op)
#define Cy_TRYRLOCK(op) _Cy_TRYRLOCK(op)
#define Cy_TRYWLOCK(op) _Cy_TRYWLOCK(op)
#endif
#endif
...
...
@@ -60,6 +131,198 @@
#error C++ needed for cython+ nogil classes
#endif
/* __cplusplus */
ThreadStorage
&
RecursiveUpgradeableRWLock
::
get_or_init_thread_count
(
pid_t
thread_id
)
{
int
first_empty_index
=
-
1
;
int
match_index
=
-
1
;
pthread_mutex_lock
(
&
this
->
thread_count_lock
);
for
(
unsigned
int
i
=
0
;
i
<
this
->
thread_count
.
size
();
++
i
)
{
if
(
this
->
thread_count
[
i
].
thread_id
==
thread_id
)
match_index
=
i
;
if
(
first_empty_index
<
0
&&
this
->
thread_count
[
i
].
thread_id
==
0
)
first_empty_index
=
i
;
}
if
(
match_index
<
0
)
{
// We must get a new entry. The question is: do we have to reallocate space ?
// First, create the temporary entry
ThreadStorage
tmp_thread_entry
;
tmp_thread_entry
.
thread_id
=
thread_id
;
tmp_thread_entry
.
read_count
=
0
;
tmp_thread_entry
.
write_count
=
0
;
if
(
first_empty_index
<
0
)
{
// We have to reallocate space
match_index
=
this
->
thread_count
.
size
();
this
->
thread_count
.
push_back
(
tmp_thread_entry
);
}
else
{
// We can reuse an existing and empty cell
match_index
=
first_empty_index
;
this
->
thread_count
[
match_index
]
=
tmp_thread_entry
;
}
}
pthread_mutex_unlock
(
&
this
->
thread_count_lock
);
return
this
->
thread_count
[
match_index
];
}
void
RecursiveUpgradeableRWLock
::
wlock
()
{
pid_t
my_tid
=
syscall
(
SYS_gettid
);
ThreadStorage
&
my_counts
=
this
->
get_or_init_thread_count
(
my_tid
);
bool
has_read_lock
=
my_counts
.
read_count
;
bool
has_write_lock
=
my_counts
.
write_count
;
int
mutex_trylock_error
=
-
1
;
if
(
!
has_write_lock
)
{
if
(
has_read_lock
)
{
mutex_trylock_error
=
pthread_mutex_trylock
(
&
this
->
upgrade_lock
);
// As you may have noticed, this is a trylock above, not a blocking lock.
// This is because we could generate a deadlock:
// Imagine 2 threads T1 and T2, both holding a read lock on the same lock.
// Now, T1 tries to upgrade. So it holds the mutex, then unlock it's read lock,
// then tries to take a write lock. As T2 still has a read lock, T1 blocks,
// waiting for T2 to release it's read lock.
// Now, imagine that, instead of releasing, T2 tries to upgrade.
// It will first try to take the mutex. And won't succeed, as T1 holds it.
// This annoying mutex is here to avoid snatching when upgrading.
// Indeed, if you imagine T1 holding a read lock, T1 tries to upgrade,
// and right after T3 tries to write-lock (from nothing).
// As T1 is releasing then taking the write lock, T3 could take the write lock
// before T1, which is not really what's intented for an upgradable lock.
// The strategy here is to allow an "all is right" case by trying to lock
// first in a non-blocking manner. If it succeeds, hurray, our lock
// won't be snatched, we can continue by releasing the read lock.
// If it doesn't, to avoid a potential deadlock, we first release the read lock
// then try to hold the mutex again. Our lock will be snatched.
// So, in either case, we unlock the read lock here.
pthread_rwlock_unlock
(
&
this
->
rw_lock
);
}
if
(
mutex_trylock_error
!=
0
)
// Two cases: failed upgrading, or trying to acquire a write lock without previous lock.
// In both situations, we're trying here to acquire a write lock from nothing,
// as we already dropped read lock in the failed upgrading case,
// so blocking is allowed here (can't deadlock as we don't own other locks)
pthread_mutex_lock
(
&
this
->
upgrade_lock
);
pthread_rwlock_wrlock
(
&
this
->
rw_lock
);
pthread_mutex_unlock
(
&
this
->
upgrade_lock
);
}
// If we already have the write lock we directly jump here
++
my_counts
.
write_count
;
}
void
RecursiveUpgradeableRWLock
::
rlock
()
{
pid_t
my_tid
=
syscall
(
SYS_gettid
);
ThreadStorage
&
my_counts
=
this
->
get_or_init_thread_count
(
my_tid
);
bool
has_read_lock
=
my_counts
.
read_count
;
bool
has_write_lock
=
my_counts
.
write_count
;
if
(
!
has_write_lock
&&
!
has_read_lock
)
{
pthread_mutex_lock
(
&
this
->
upgrade_lock
);
pthread_rwlock_rdlock
(
&
this
->
rw_lock
);
pthread_mutex_unlock
(
&
this
->
upgrade_lock
);
}
// If we already have a lock (read or write), we directly jump here
++
my_counts
.
read_count
;
}
void
RecursiveUpgradeableRWLock
::
unlock
()
{
pid_t
my_tid
=
syscall
(
SYS_gettid
);
ThreadStorage
&
my_counts
=
this
->
get_or_init_thread_count
(
my_tid
);
bool
has_read_lock
=
my_counts
.
read_count
;
bool
has_write_lock
=
my_counts
.
write_count
;
if
(
has_read_lock
)
{
--
my_counts
.
read_count
;
}
else
if
(
has_write_lock
)
{
--
my_counts
.
write_count
;
}
if
(
!
my_counts
.
write_count
&&
!
my_counts
.
read_count
)
{
pthread_rwlock_unlock
(
&
this
->
rw_lock
);
my_counts
.
thread_id
=
0
;
}
}
int
RecursiveUpgradeableRWLock
::
tryrlock
()
{
int
rw_trylock_error
;
pid_t
my_tid
=
syscall
(
SYS_gettid
);
ThreadStorage
&
my_counts
=
this
->
get_or_init_thread_count
(
my_tid
);
bool
has_read_lock
=
my_counts
.
read_count
;
bool
has_write_lock
=
my_counts
.
write_count
;
if
(
!
has_write_lock
&&
!
has_read_lock
)
{
pthread_mutex_lock
(
&
this
->
upgrade_lock
);
rw_trylock_error
=
pthread_rwlock_tryrdlock
(
&
this
->
rw_lock
);
pthread_mutex_unlock
(
&
this
->
upgrade_lock
);
if
(
rw_trylock_error
)
return
rw_trylock_error
;
}
++
my_counts
.
read_count
;
return
0
;
}
int
RecursiveUpgradeableRWLock
::
trywlock
()
{
int
rw_trylock_error
;
int
mutex_trylock_error
;
pid_t
my_tid
=
syscall
(
SYS_gettid
);
ThreadStorage
&
my_counts
=
this
->
get_or_init_thread_count
(
my_tid
);
bool
has_read_lock
=
my_counts
.
read_count
;
bool
has_write_lock
=
my_counts
.
write_count
;
if
(
!
has_write_lock
)
{
if
(
has_read_lock
)
{
mutex_trylock_error
=
pthread_mutex_trylock
(
&
this
->
upgrade_lock
);
if
(
mutex_trylock_error
)
{
// In contrast to the blocking write lock,
// if we fail here we do want to keep the read lock.
return
mutex_trylock_error
;
}
// Here, we have the lock -> try to upgrade
pthread_rwlock_unlock
(
&
this
->
rw_lock
);
rw_trylock_error
=
pthread_rwlock_trywrlock
(
&
this
->
rw_lock
);
if
(
rw_trylock_error
)
{
// Get the read lock again. As we have the mutex, no one
// is trying to upgrade nor to acquire a lock,
// so the call here should return immediately
pthread_rwlock_rdlock
(
&
this
->
rw_lock
);
pthread_mutex_unlock
(
&
this
->
upgrade_lock
);
return
rw_trylock_error
;
}
pthread_mutex_unlock
(
&
this
->
upgrade_lock
);
}
mutex_trylock_error
=
pthread_mutex_trylock
(
&
this
->
upgrade_lock
);
if
(
mutex_trylock_error
)
{
// Keep previous state, so we will indeed keep read-lock
// if we had one.
return
mutex_trylock_error
;
}
if
(
has_read_lock
)
pthread_rwlock_unlock
(
&
this
->
rw_lock
);
rw_trylock_error
=
pthread_rwlock_trywrlock
(
&
this
->
rw_lock
);
if
(
rw_trylock_error
)
{
if
(
has_read_lock
)
{
// Get the read lock again. As we have the mutex, no one
// is trying to upgrade nor to acquire a lock,
// so the call here should return immediately
pthread_rwlock_rdlock
(
&
this
->
rw_lock
);
}
pthread_mutex_unlock
(
&
this
->
upgrade_lock
);
return
rw_trylock_error
;
}
pthread_mutex_unlock
(
&
this
->
upgrade_lock
);
}
++
my_counts
.
write_count
;
return
0
;
}
void
CyObject
::
CyObject_INCREF
()
{
atomic_fetch_add
(
&
(
this
->
ob_refcnt
),
1
);
...
...
@@ -73,3 +336,28 @@ int CyObject::CyObject_DECREF()
}
return
0
;
}
void
CyObject
::
CyObject_RLOCK
()
{
this
->
ob_lock
.
rlock
();
}
void
CyObject
::
CyObject_WLOCK
()
{
this
->
ob_lock
.
wlock
();
}
void
CyObject
::
CyObject_UNLOCK
()
{
this
->
ob_lock
.
unlock
();
}
int
CyObject
::
CyObject_TRYRLOCK
()
{
return
this
->
ob_lock
.
tryrlock
();
}
int
CyObject
::
CyObject_TRYWLOCK
()
{
return
this
->
ob_lock
.
trywlock
();
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment