Commit f29224c6 authored by serg@serg.mysql.com's avatar serg@serg.mysql.com

fixed bug in parallel repair

parent f83aed74
...@@ -324,7 +324,7 @@ typedef struct st_io_cache_share ...@@ -324,7 +324,7 @@ typedef struct st_io_cache_share
/* to sync on reads into buffer */ /* to sync on reads into buffer */
pthread_mutex_t mutex; pthread_mutex_t mutex;
pthread_cond_t cond; pthread_cond_t cond;
int count; int count, total;
/* actual IO_CACHE that filled the buffer */ /* actual IO_CACHE that filled the buffer */
struct st_io_cache *active; struct st_io_cache *active;
#ifdef NOT_YET_IMPLEMENTED #ifdef NOT_YET_IMPLEMENTED
......
...@@ -2129,7 +2129,7 @@ int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info, ...@@ -2129,7 +2129,7 @@ int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info,
const char * name, int rep_quick) const char * name, int rep_quick)
{ {
int got_error; int got_error;
uint i,key, total_key_length; uint i,key, total_key_length, istep;
ulong rec_length; ulong rec_length;
ha_rows start_records; ha_rows start_records;
my_off_t new_header_length,del; my_off_t new_header_length,del;
...@@ -2264,8 +2264,8 @@ int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info, ...@@ -2264,8 +2264,8 @@ int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info,
info->state->records=info->state->del=share->state.split=0; info->state->records=info->state->del=share->state.split=0;
info->state->empty=0; info->state->empty=0;
for (i=key=0 ; key < share->base.keys ; for (i=key=0, istep=1 ; key < share->base.keys ;
rec_per_key_part+=sort_param[i].keyinfo->keysegs, i++, key++) rec_per_key_part+=sort_param[i].keyinfo->keysegs, i+=istep, key++)
{ {
sort_param[i].key=key; sort_param[i].key=key;
sort_param[i].keyinfo=share->keyinfo+key; sort_param[i].keyinfo=share->keyinfo+key;
...@@ -2276,9 +2276,10 @@ int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info, ...@@ -2276,9 +2276,10 @@ int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info,
(char*) (share->state.rec_per_key_part+ (char*) (share->state.rec_per_key_part+
(uint) (rec_per_key_part - param->rec_per_key_part)), (uint) (rec_per_key_part - param->rec_per_key_part)),
sort_param[i].keyinfo->keysegs*sizeof(*rec_per_key_part)); sort_param[i].keyinfo->keysegs*sizeof(*rec_per_key_part));
i--; istep=0;
continue; continue;
} }
istep=1;
if ((!(param->testflag & T_SILENT))) if ((!(param->testflag & T_SILENT)))
printf ("- Fixing index %d\n",key+1); printf ("- Fixing index %d\n",key+1);
sort_param[i].key_read= ((sort_param[i].keyinfo->flag & HA_FULLTEXT) ? sort_param[i].key_read= ((sort_param[i].keyinfo->flag & HA_FULLTEXT) ?
......
...@@ -441,7 +441,7 @@ void init_io_cache_share(IO_CACHE *info, IO_CACHE_SHARE *s, uint num_threads) ...@@ -441,7 +441,7 @@ void init_io_cache_share(IO_CACHE *info, IO_CACHE_SHARE *s, uint num_threads)
DBUG_ASSERT(info->type == READ_CACHE); DBUG_ASSERT(info->type == READ_CACHE);
pthread_mutex_init(&s->mutex, MY_MUTEX_INIT_FAST); pthread_mutex_init(&s->mutex, MY_MUTEX_INIT_FAST);
pthread_cond_init (&s->cond, 0); pthread_cond_init (&s->cond, 0);
s->count=num_threads-1; s->total=s->count=num_threads-1;
s->active=0; /* to catch errors */ s->active=0; /* to catch errors */
info->share=s; info->share=s;
info->read_function=_my_b_read_r; info->read_function=_my_b_read_r;
...@@ -456,32 +456,36 @@ void init_io_cache_share(IO_CACHE *info, IO_CACHE_SHARE *s, uint num_threads) ...@@ -456,32 +456,36 @@ void init_io_cache_share(IO_CACHE *info, IO_CACHE_SHARE *s, uint num_threads)
*/ */
void remove_io_thread(IO_CACHE *info) void remove_io_thread(IO_CACHE *info)
{ {
pthread_mutex_lock(&info->share->mutex); IO_CACHE_SHARE *s=info->share;
if (! info->share->count--)
pthread_cond_signal(&info->share->cond); pthread_mutex_lock(&s->mutex);
pthread_mutex_unlock(&info->share->mutex); s->total--;
if (! s->count--)
pthread_cond_signal(&s->cond);
pthread_mutex_unlock(&s->mutex);
} }
static int lock_io_cache(IO_CACHE *info) static int lock_io_cache(IO_CACHE *info)
{ {
pthread_mutex_lock(&info->share->mutex); int total;
if (!info->share->count) IO_CACHE_SHARE *s=info->share;
return 1;
--(info->share->count); pthread_mutex_lock(&s->mutex);
pthread_cond_wait(&info->share->cond, &info->share->mutex); if (!s->count)
/*
count can be -1 here, if one thread was removed (remove_io_thread)
while all others were locked (lock_io_cache).
If this is the case, this thread behaves as if count was 0 from the
very beginning, that is returns 1 and does not unlock the mutex.
*/
if (++(info->share->count))
{ {
pthread_mutex_unlock(&info->share->mutex); s->count=s->total;
return 0; return 1;
} }
total=s->total;
s->count--;
pthread_cond_wait(&s->cond, &s->mutex);
if (s->total < total)
return 1; return 1;
pthread_mutex_unlock(&s->mutex);
return 0;
} }
static void unlock_io_cache(IO_CACHE *info) static void unlock_io_cache(IO_CACHE *info)
...@@ -1132,19 +1136,15 @@ int end_io_cache(IO_CACHE *info) ...@@ -1132,19 +1136,15 @@ int end_io_cache(IO_CACHE *info)
DBUG_ENTER("end_io_cache"); DBUG_ENTER("end_io_cache");
#ifdef THREAD #ifdef THREAD
/*
if IO_CACHE is shared between several threads, only one
thread needs to call end_io_cache() - just as init_io_cache()
should be called only once and then memcopy'ed
*/
if (info->share) if (info->share)
{ {
#ifdef SAFE_MUTEX
/* simple protection against multi-close: destroying share first */
if (pthread_cond_destroy (&info->share->cond) |
pthread_mutex_destroy(&info->share->mutex))
{
DBUG_RETURN(1);
}
#else
pthread_cond_destroy (&info->share->cond); pthread_cond_destroy (&info->share->cond);
pthread_mutex_destroy(&info->share->mutex); pthread_mutex_destroy(&info->share->mutex);
#endif
info->share=0; info->share=0;
} }
#endif #endif
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment