Commit a1055ab3 authored by Marko Mäkelä

MDEV-29043 mariabackup --compress hangs

Even though commit b817afaa passed
the test mariabackup.compress_qpress, that test turned out to be
too small to reveal one more problem that had previously been prevented
by the existence of ctrl_mutex. I did not realize that there can be
multiple concurrent callers to compress_write(). One of them is the
log copying thread; further callers are data file copying threads
(default: --parallel=1).

By default, there is only one compression worker thread
(--compress-threads=1).

compress_write(): Fix a race condition between threads that would
use the same worker thread object. Make thd->data_avail contain the
thread identifier of the submitter, and add thd->avail_cond to
notify other compress_write() threads that are waiting for a slot.
parent 32167225
...@@ -34,9 +34,10 @@ typedef struct { ...@@ -34,9 +34,10 @@ typedef struct {
pthread_t id; pthread_t id;
uint num; uint num;
pthread_mutex_t data_mutex; pthread_mutex_t data_mutex;
pthread_cond_t avail_cond;
pthread_cond_t data_cond; pthread_cond_t data_cond;
pthread_cond_t done_cond; pthread_cond_t done_cond;
my_bool data_avail; pthread_t data_avail;
my_bool cancelled; my_bool cancelled;
const char *from; const char *from;
size_t from_len; size_t from_len;
...@@ -197,9 +198,13 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len) ...@@ -197,9 +198,13 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len)
threads = comp_ctxt->threads; threads = comp_ctxt->threads;
nthreads = comp_ctxt->nthreads; nthreads = comp_ctxt->nthreads;
const pthread_t self = pthread_self();
ptr = (const char *) buf; ptr = (const char *) buf;
while (len > 0) { while (len > 0) {
uint max_thread; bool wait = nthreads == 1;
retry:
bool submitted = false;
/* Send data to worker threads for compression */ /* Send data to worker threads for compression */
for (i = 0; i < nthreads; i++) { for (i = 0; i < nthreads; i++) {
...@@ -208,16 +213,33 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len) ...@@ -208,16 +213,33 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len)
thd = threads + i; thd = threads + i;
pthread_mutex_lock(&thd->data_mutex); pthread_mutex_lock(&thd->data_mutex);
if (thd->data_avail == pthread_t(~0UL)) {
} else if (!wait) {
skip:
pthread_mutex_unlock(&thd->data_mutex);
continue;
} else {
for (;;) {
pthread_cond_wait(&thd->avail_cond,
&thd->data_mutex);
if (thd->data_avail
== pthread_t(~0UL)) {
break;
}
goto skip;
}
}
chunk_len = (len > COMPRESS_CHUNK_SIZE) ? chunk_len = (len > COMPRESS_CHUNK_SIZE) ?
COMPRESS_CHUNK_SIZE : len; COMPRESS_CHUNK_SIZE : len;
thd->from = ptr; thd->from = ptr;
thd->from_len = chunk_len; thd->from_len = chunk_len;
thd->data_avail = TRUE; thd->data_avail = self;
pthread_cond_signal(&thd->data_cond); pthread_cond_signal(&thd->data_cond);
pthread_mutex_unlock(&thd->data_mutex); pthread_mutex_unlock(&thd->data_mutex);
submitted = true;
len -= chunk_len; len -= chunk_len;
if (len == 0) { if (len == 0) {
break; break;
...@@ -225,13 +247,20 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len) ...@@ -225,13 +247,20 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len)
ptr += chunk_len; ptr += chunk_len;
} }
max_thread = (i < nthreads) ? i : nthreads - 1; if (!submitted) {
wait = true;
goto retry;
}
/* Reap and stream the compressed data */ for (i = 0; i < nthreads; i++) {
for (i = 0; i <= max_thread; i++) {
thd = threads + i; thd = threads + i;
pthread_mutex_lock(&thd->data_mutex); pthread_mutex_lock(&thd->data_mutex);
if (thd->data_avail != self) {
pthread_mutex_unlock(&thd->data_mutex);
continue;
}
while (!thd->to_len) { while (!thd->to_len) {
pthread_cond_wait(&thd->done_cond, pthread_cond_wait(&thd->done_cond,
&thd->data_mutex); &thd->data_mutex);
...@@ -249,6 +278,8 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len) ...@@ -249,6 +278,8 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len)
} }
thd->to_len = 0; thd->to_len = 0;
thd->data_avail = pthread_t(~0UL);
pthread_cond_signal(&thd->avail_cond);
pthread_mutex_unlock(&thd->data_mutex); pthread_mutex_unlock(&thd->data_mutex);
if (fail) { if (fail) {
...@@ -336,6 +367,7 @@ destroy_worker_thread(comp_thread_ctxt_t *thd) ...@@ -336,6 +367,7 @@ destroy_worker_thread(comp_thread_ctxt_t *thd)
pthread_join(thd->id, NULL); pthread_join(thd->id, NULL);
pthread_cond_destroy(&thd->avail_cond);
pthread_cond_destroy(&thd->data_cond); pthread_cond_destroy(&thd->data_cond);
pthread_cond_destroy(&thd->done_cond); pthread_cond_destroy(&thd->done_cond);
pthread_mutex_destroy(&thd->data_mutex); pthread_mutex_destroy(&thd->data_mutex);
...@@ -363,11 +395,14 @@ create_worker_threads(uint n) ...@@ -363,11 +395,14 @@ create_worker_threads(uint n)
/* Initialize and data mutex and condition var */ /* Initialize and data mutex and condition var */
if (pthread_mutex_init(&thd->data_mutex, NULL) || if (pthread_mutex_init(&thd->data_mutex, NULL) ||
pthread_cond_init(&thd->avail_cond, NULL) ||
pthread_cond_init(&thd->data_cond, NULL) || pthread_cond_init(&thd->data_cond, NULL) ||
pthread_cond_init(&thd->done_cond, NULL)) { pthread_cond_init(&thd->done_cond, NULL)) {
goto err; goto err;
} }
thd->data_avail = pthread_t(~0UL);
if (pthread_create(&thd->id, NULL, compress_worker_thread_func, if (pthread_create(&thd->id, NULL, compress_worker_thread_func,
thd)) { thd)) {
msg("compress: pthread_create() failed: " msg("compress: pthread_create() failed: "
...@@ -409,13 +444,13 @@ compress_worker_thread_func(void *arg) ...@@ -409,13 +444,13 @@ compress_worker_thread_func(void *arg)
pthread_mutex_lock(&thd->data_mutex); pthread_mutex_lock(&thd->data_mutex);
while (1) { while (1) {
while (!thd->data_avail && !thd->cancelled) { while (!thd->cancelled
&& (thd->to_len || thd->data_avail == pthread_t(~0UL))) {
pthread_cond_wait(&thd->data_cond, &thd->data_mutex); pthread_cond_wait(&thd->data_cond, &thd->data_mutex);
} }
if (thd->cancelled) if (thd->cancelled)
break; break;
thd->data_avail = FALSE;
thd->to_len = qlz_compress(thd->from, thd->to, thd->from_len, thd->to_len = qlz_compress(thd->from, thd->to, thd->from_len,
&thd->state); &thd->state);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment