Commit 1158fd9f authored by unknown's avatar unknown

"myisamchk -p" for parallel recover works (no extensive testing though)


include/my_sys.h:
  make [un]lock_io_cache functions, not macro
  some io_cache_share functions int->void
include/myisam.h:
  mi_repair_by_sort_r -> mi_repair_parallel
  MI_SORT_PARAM.master field for updating info->s.state struct
myisam/mi_check.c:
  mi_repair_by_sort_r -> mi_repair_parallel
  MI_SORT_PARAM.master field for updating info->s.state struct
myisam/sort.c:
  my_thread_init()/my_thread_end()
  misc bugfixes
mysys/mf_iocache.c:
  io_cache_share functions int->void
  comments added
  [un]lock_io_cache functions added
parent ea01d657
...@@ -322,29 +322,6 @@ typedef struct st_io_cache_share ...@@ -322,29 +322,6 @@ typedef struct st_io_cache_share
my_bool alloced; my_bool alloced;
#endif #endif
} IO_CACHE_SHARE; } IO_CACHE_SHARE;
#define lock_io_cache(info) \
( \
(errno=pthread_mutex_lock(&((info)->share->mutex))) ? -1 : ( \
(info)->share->count ? ( \
--((info)->share->count), \
pthread_cond_wait(&((info)->share->cond), \
&((info)->share->mutex)), \
(++((info)->share->count) ? \
pthread_mutex_unlock(&((info)->share->mutex)) : 0)) \
: 1 ) \
)
#define unlock_io_cache(info) \
( \
pthread_cond_broadcast(&((info)->share->cond)), \
pthread_mutex_unlock (&((info)->share->mutex)) \
)
/* -- to catch errors
#else
#define lock_io_cache(info)
#define unlock_io_cache(info)
*/
#endif #endif
typedef struct st_io_cache /* Used when cacheing files */ typedef struct st_io_cache /* Used when cacheing files */
...@@ -686,9 +663,9 @@ extern my_bool reinit_io_cache(IO_CACHE *info,enum cache_type type, ...@@ -686,9 +663,9 @@ extern my_bool reinit_io_cache(IO_CACHE *info,enum cache_type type,
extern int _my_b_read(IO_CACHE *info,byte *Buffer,uint Count); extern int _my_b_read(IO_CACHE *info,byte *Buffer,uint Count);
#ifdef THREAD #ifdef THREAD
extern int _my_b_read_r(IO_CACHE *info,byte *Buffer,uint Count); extern int _my_b_read_r(IO_CACHE *info,byte *Buffer,uint Count);
extern int init_io_cache_share(IO_CACHE *info, extern void init_io_cache_share(IO_CACHE *info,
IO_CACHE_SHARE *s, uint num_threads); IO_CACHE_SHARE *s, uint num_threads);
extern int remove_io_thread(IO_CACHE *info); extern void remove_io_thread(IO_CACHE *info);
#endif #endif
extern int _my_b_seq_read(IO_CACHE *info,byte *Buffer,uint Count); extern int _my_b_seq_read(IO_CACHE *info,byte *Buffer,uint Count);
extern int _my_b_net_read(IO_CACHE *info,byte *Buffer,uint Count); extern int _my_b_net_read(IO_CACHE *info,byte *Buffer,uint Count);
......
...@@ -382,7 +382,7 @@ typedef struct st_mi_sort_param ...@@ -382,7 +382,7 @@ typedef struct st_mi_sort_param
IO_CACHE tempfile, tempfile_for_exceptions; IO_CACHE tempfile, tempfile_for_exceptions;
DYNAMIC_ARRAY buffpek; DYNAMIC_ARRAY buffpek;
my_off_t pos,max_pos,filepos,start_recpos; my_off_t pos,max_pos,filepos,start_recpos;
my_bool fix_datafile; my_bool fix_datafile, master;
char *record; char *record;
char *tmpdir; char *tmpdir;
int (*key_cmp)(struct st_mi_sort_param *, const void *, const void *); int (*key_cmp)(struct st_mi_sort_param *, const void *, const void *);
...@@ -403,6 +403,8 @@ int mi_repair(MI_CHECK *param, register MI_INFO *info, ...@@ -403,6 +403,8 @@ int mi_repair(MI_CHECK *param, register MI_INFO *info,
int mi_sort_index(MI_CHECK *param, register MI_INFO *info, my_string name); int mi_sort_index(MI_CHECK *param, register MI_INFO *info, my_string name);
int mi_repair_by_sort(MI_CHECK *param, register MI_INFO *info, int mi_repair_by_sort(MI_CHECK *param, register MI_INFO *info,
const char * name, int rep_quick); const char * name, int rep_quick);
int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info,
const char * name, int rep_quick);
int change_to_newfile(const char * filename, const char * old_ext, int change_to_newfile(const char * filename, const char * old_ext,
const char * new_ext, uint raid_chunks, const char * new_ext, uint raid_chunks,
myf myflags); myf myflags);
......
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
along with this program; if not, write to the Free Software along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
/* Descript, check and repair of MyISAM tables */ /* Describe, check and repair of MyISAM tables */
#include "ftdefs.h" #include "ftdefs.h"
#include <m_ctype.h> #include <m_ctype.h>
...@@ -1186,7 +1186,8 @@ int mi_repair(MI_CHECK *param, register MI_INFO *info, ...@@ -1186,7 +1186,8 @@ int mi_repair(MI_CHECK *param, register MI_INFO *info,
param->read_cache.end_of_file=sort_info.filelength= param->read_cache.end_of_file=sort_info.filelength=
my_seek(info->dfile,0L,MY_SEEK_END,MYF(0)); my_seek(info->dfile,0L,MY_SEEK_END,MYF(0));
sort_info.dupp=0; sort_info.dupp=0;
sort_param.fix_datafile= (my_bool) (! rep_quick); sort_param.fix_datafile= (my_bool) (!rep_quick);
sort_param.master=1;
sort_info.max_records= ~(ha_rows) 0; sort_info.max_records= ~(ha_rows) 0;
set_data_file_type(&sort_info, share); set_data_file_type(&sort_info, share);
...@@ -1873,6 +1874,7 @@ int mi_repair_by_sort(MI_CHECK *param, register MI_INFO *info, ...@@ -1873,6 +1874,7 @@ int mi_repair_by_sort(MI_CHECK *param, register MI_INFO *info,
sort_param.tmpdir=param->tmpdir; sort_param.tmpdir=param->tmpdir;
sort_param.sort_info=&sort_info; sort_param.sort_info=&sort_info;
sort_param.fix_datafile= (my_bool) (! rep_quick); sort_param.fix_datafile= (my_bool) (! rep_quick);
sort_param.master =1;
del=info->state->del; del=info->state->del;
param->glob_crc=0; param->glob_crc=0;
...@@ -2082,7 +2084,7 @@ err: ...@@ -2082,7 +2084,7 @@ err:
/* same as mi_repair_by_sort */ /* same as mi_repair_by_sort */
/* but do it multithreaded */ /* but do it multithreaded */
int mi_repair_by_sort_r(MI_CHECK *param, register MI_INFO *info, int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info,
const char * name, int rep_quick) const char * name, int rep_quick)
{ {
int got_error; int got_error;
...@@ -2091,7 +2093,7 @@ int mi_repair_by_sort_r(MI_CHECK *param, register MI_INFO *info, ...@@ -2091,7 +2093,7 @@ int mi_repair_by_sort_r(MI_CHECK *param, register MI_INFO *info,
ha_rows start_records; ha_rows start_records;
my_off_t new_header_length,del; my_off_t new_header_length,del;
File new_file; File new_file;
MI_SORT_PARAM *sort_param=0, *sinfo; MI_SORT_PARAM *sort_param=0;
MYISAM_SHARE *share=info->s; MYISAM_SHARE *share=info->s;
ulong *rec_per_key_part; ulong *rec_per_key_part;
MI_KEYSEG *keyseg; MI_KEYSEG *keyseg;
...@@ -2099,7 +2101,7 @@ int mi_repair_by_sort_r(MI_CHECK *param, register MI_INFO *info, ...@@ -2099,7 +2101,7 @@ int mi_repair_by_sort_r(MI_CHECK *param, register MI_INFO *info,
IO_CACHE_SHARE io_share; IO_CACHE_SHARE io_share;
SORT_INFO sort_info; SORT_INFO sort_info;
ulonglong key_map=share->state.key_map; ulonglong key_map=share->state.key_map;
DBUG_ENTER("mi_repair_by_sort_r"); DBUG_ENTER("mi_repair_parallel");
start_records=info->state->records; start_records=info->state->records;
got_error=1; got_error=1;
...@@ -2244,6 +2246,7 @@ int mi_repair_by_sort_r(MI_CHECK *param, register MI_INFO *info, ...@@ -2244,6 +2246,7 @@ int mi_repair_by_sort_r(MI_CHECK *param, register MI_INFO *info,
sort_param[i].lock_in_memory=lock_memory; sort_param[i].lock_in_memory=lock_memory;
sort_param[i].tmpdir=param->tmpdir; sort_param[i].tmpdir=param->tmpdir;
sort_param[i].sort_info=&sort_info; sort_param[i].sort_info=&sort_info;
sort_param[i].master=0;
sort_param[i].fix_datafile=0; sort_param[i].fix_datafile=0;
sort_param[i].filepos=new_header_length; sort_param[i].filepos=new_header_length;
...@@ -2271,7 +2274,8 @@ int mi_repair_by_sort_r(MI_CHECK *param, register MI_INFO *info, ...@@ -2271,7 +2274,8 @@ int mi_repair_by_sort_r(MI_CHECK *param, register MI_INFO *info,
sort_param[i].key_length+=ft_max_word_len_for_sort-ft_max_word_len; sort_param[i].key_length+=ft_max_word_len_for_sort-ft_max_word_len;
} }
sort_info.total_keys=i; sort_info.total_keys=i;
sort_param[0].fix_datafile= ! rep_quick; sort_param[0].master= 1;
sort_param[0].fix_datafile= (my_bool)(! rep_quick);
sort_info.got_error=0; sort_info.got_error=0;
pthread_mutex_init(& sort_info.mutex, MY_MUTEX_INIT_FAST); pthread_mutex_init(& sort_info.mutex, MY_MUTEX_INIT_FAST);
...@@ -2289,7 +2293,7 @@ int mi_repair_by_sort_r(MI_CHECK *param, register MI_INFO *info, ...@@ -2289,7 +2293,7 @@ int mi_repair_by_sort_r(MI_CHECK *param, register MI_INFO *info,
In the second one all the threads will fill their sort_buffers In the second one all the threads will fill their sort_buffers
(and call write_keys) at the same time, putting more stress on i/o. (and call write_keys) at the same time, putting more stress on i/o.
*/ */
#if 1 #ifndef USING_FIRST_APPROACH
param->sort_buffer_length/sort_info.total_keys; param->sort_buffer_length/sort_info.total_keys;
#else #else
param->sort_buffer_length*sort_param[i].key_length/length; param->sort_buffer_length*sort_param[i].key_length/length;
...@@ -2454,7 +2458,8 @@ static int sort_key_read(MI_SORT_PARAM *sort_param, void *key) ...@@ -2454,7 +2458,8 @@ static int sort_key_read(MI_SORT_PARAM *sort_param, void *key)
if (info->state->records == sort_info->max_records) if (info->state->records == sort_info->max_records)
{ {
mi_check_print_error(sort_info->param, mi_check_print_error(sort_info->param,
"Found too many records; Can`t continue"); "Key %d - Found too many records; Can't continue",
sort_param->key+1);
DBUG_RETURN(1); DBUG_RETURN(1);
} }
sort_param->real_key_length=(info->s->rec_reflength+ sort_param->real_key_length=(info->s->rec_reflength+
...@@ -2543,7 +2548,8 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param) ...@@ -2543,7 +2548,8 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param)
if (!sort_param->fix_datafile) if (!sort_param->fix_datafile)
{ {
sort_param->filepos=sort_param->pos; sort_param->filepos=sort_param->pos;
share->state.split++; if (sort_param->master)
share->state.split++;
} }
sort_param->max_pos=(sort_param->pos+=share->base.pack_reclength); sort_param->max_pos=(sort_param->pos+=share->base.pack_reclength);
if (*sort_param->record) if (*sort_param->record)
...@@ -2553,7 +2559,7 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param) ...@@ -2553,7 +2559,7 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param)
mi_static_checksum(info,sort_param->record)); mi_static_checksum(info,sort_param->record));
DBUG_RETURN(0); DBUG_RETURN(0);
} }
if (!sort_param->fix_datafile) if (!sort_param->fix_datafile && sort_param->master)
{ {
info->state->del++; info->state->del++;
info->state->empty+=share->base.pack_reclength; info->state->empty+=share->base.pack_reclength;
...@@ -2699,7 +2705,8 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param) ...@@ -2699,7 +2705,8 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param)
} }
if (b_type & (BLOCK_DELETED | BLOCK_SYNC_ERROR)) if (b_type & (BLOCK_DELETED | BLOCK_SYNC_ERROR))
{ {
if (!sort_param->fix_datafile && (b_type & BLOCK_DELETED)) if (!sort_param->fix_datafile && sort_param->master &&
(b_type & BLOCK_DELETED))
{ {
info->state->empty+=block_info.block_len; info->state->empty+=block_info.block_len;
info->state->del++; info->state->del++;
...@@ -2718,7 +2725,7 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param) ...@@ -2718,7 +2725,7 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param)
continue; continue;
} }
if (!sort_param->fix_datafile) if (!sort_param->fix_datafile && sort_param->master)
share->state.split++; share->state.split++;
if (! found_record++) if (! found_record++)
{ {
...@@ -2860,7 +2867,8 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param) ...@@ -2860,7 +2867,8 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param)
if (!sort_param->fix_datafile) if (!sort_param->fix_datafile)
{ {
sort_param->filepos=sort_param->pos; sort_param->filepos=sort_param->pos;
share->state.split++; if (sort_param->master)
share->state.split++;
} }
sort_param->max_pos=(sort_param->pos=block_info.filepos+ sort_param->max_pos=(sort_param->pos=block_info.filepos+
block_info.rec_len); block_info.rec_len);
...@@ -2968,12 +2976,15 @@ int sort_write_record(MI_SORT_PARAM *sort_param) ...@@ -2968,12 +2976,15 @@ int sort_write_record(MI_SORT_PARAM *sort_param)
break; break;
} }
} }
info->state->records++; if (sort_param->master)
if ((param->testflag & T_WRITE_LOOP) &&
(info->state->records % WRITE_COUNT) == 0)
{ {
char llbuff[22]; info->state->records++;
printf("%s\r", llstr(info->state->records,llbuff)); VOID(fflush(stdout)); if ((param->testflag & T_WRITE_LOOP) &&
(info->state->records % WRITE_COUNT) == 0)
{
char llbuff[22];
printf("%s\r", llstr(info->state->records,llbuff)); VOID(fflush(stdout));
}
} }
DBUG_RETURN(0); DBUG_RETURN(0);
} /* sort_write_record */ } /* sort_write_record */
......
...@@ -216,9 +216,15 @@ static struct my_option my_long_options[] = ...@@ -216,9 +216,15 @@ static struct my_option my_long_options[] =
{"recover", 'r', {"recover", 'r',
"Can fix almost anything except unique keys that aren't unique.", "Can fix almost anything except unique keys that aren't unique.",
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
{"paraller-recover", 'p',
"Same as '-r' but creates all the keys in parallel",
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
{"safe-recover", 'o', {"safe-recover", 'o',
"Uses old recovery method; Slower than '-r' but can handle a couple of cases where '-r' reports that it can't fix the data file.", "Uses old recovery method; Slower than '-r' but can handle a couple of cases where '-r' reports that it can't fix the data file.",
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
{"sort-recover", 'n',
"Force recovering with sorting even if the temporary file was very big.",
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
{"start-check-pos", OPT_START_CHECK_POS, {"start-check-pos", OPT_START_CHECK_POS,
"No help available.", "No help available.",
0, 0, 0, GET_ULL, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, 0, 0, 0, GET_ULL, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
...@@ -244,9 +250,6 @@ static struct my_option my_long_options[] = ...@@ -244,9 +250,6 @@ static struct my_option my_long_options[] =
(gptr*) &check_param.opt_sort_key, (gptr*) &check_param.opt_sort_key,
(gptr*) &check_param.opt_sort_key, (gptr*) &check_param.opt_sort_key,
0, GET_UINT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, 0, GET_UINT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"sort-recover", 'n',
"Force recovering with sorting even if the temporary file was very big.",
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
{"tmpdir", 't', {"tmpdir", 't',
"Path for temporary files.", "Path for temporary files.",
(gptr*) &check_param.tmpdir, (gptr*) &check_param.tmpdir,
...@@ -514,6 +517,11 @@ get_one_option(int optid, ...@@ -514,6 +517,11 @@ get_one_option(int optid,
if (argument != disabled_my_option) if (argument != disabled_my_option)
check_param.testflag|= T_REP_BY_SORT; check_param.testflag|= T_REP_BY_SORT;
break; break;
case 'p':
check_param.testflag&= ~T_REP_ANY;
if (argument != disabled_my_option)
check_param.testflag|= T_REP_PARALLEL;
break;
case 'o': case 'o':
check_param.testflag&= ~T_REP_ANY; check_param.testflag&= ~T_REP_ANY;
check_param.force_sort= 0; check_param.force_sort= 0;
...@@ -864,8 +872,7 @@ static int myisamchk(MI_CHECK *param, my_string filename) ...@@ -864,8 +872,7 @@ static int myisamchk(MI_CHECK *param, my_string filename)
if (tmp != share->state.key_map) if (tmp != share->state.key_map)
info->update|=HA_STATE_CHANGED; info->update|=HA_STATE_CHANGED;
} }
if (rep_quick && chk_del(param, info, if (rep_quick && chk_del(param, info, param->testflag & ~T_VERBOSE))
param->testflag & ~T_VERBOSE))
{ {
if (param->testflag & T_FORCE_CREATE) if (param->testflag & T_FORCE_CREATE)
{ {
...@@ -881,14 +888,17 @@ static int myisamchk(MI_CHECK *param, my_string filename) ...@@ -881,14 +888,17 @@ static int myisamchk(MI_CHECK *param, my_string filename)
} }
if (!error) if (!error)
{ {
if ((param->testflag & T_REP_BY_SORT) && if ((param->testflag & (T_REP_BY_SORT | T_REP_PARALLEL)) &&
(share->state.key_map || (share->state.key_map ||
(rep_quick && !param->keys_in_use && !recreate)) && (rep_quick && !param->keys_in_use && !recreate)) &&
mi_test_if_sort_rep(info, info->state->records, mi_test_if_sort_rep(info, info->state->records,
info->s->state.key_map, info->s->state.key_map,
param->force_sort)) param->force_sort))
{ {
error=mi_repair_by_sort(param,info,filename,rep_quick); if (param->testflag & T_REP_BY_SORT)
error=mi_repair_by_sort(param,info,filename,rep_quick);
else
error=mi_repair_parallel(param,info,filename,rep_quick);
state_updated=1; state_updated=1;
} }
else if (param->testflag & T_REP_ANY) else if (param->testflag & T_REP_ANY)
......
...@@ -268,17 +268,23 @@ void *_thr_find_all_keys(MI_SORT_PARAM *info) ...@@ -268,17 +268,23 @@ void *_thr_find_all_keys(MI_SORT_PARAM *info)
uchar **sort_keys; uchar **sort_keys;
MI_KEYSEG *keyseg; MI_KEYSEG *keyseg;
error=1;
if (my_thread_init())
goto err;
if (info->sort_info->got_error)
goto err;
my_b_clear(&info->tempfile); my_b_clear(&info->tempfile);
my_b_clear(&info->tempfile_for_exceptions); my_b_clear(&info->tempfile_for_exceptions);
bzero((char*) &info->buffpek,sizeof(info->buffpek)); bzero((char*) &info->buffpek,sizeof(info->buffpek));
bzero((char*) &info->unique, sizeof(info->unique)); bzero((char*) &info->unique, sizeof(info->unique));
sort_keys= (uchar **) NULL; error= 1; sort_keys= (uchar **) NULL;
if (info->sort_info->got_error)
goto err;
memavl=max(info->sortbuff_size, MIN_SORT_MEMORY); memavl=max(info->sortbuff_size, MIN_SORT_MEMORY);
idx= info->sort_info->max_records; idx= info->sort_info->max_records;
sort_length= info->key_length; sort_length= info->key_length;
maxbuffer= 1;
while (memavl >= MIN_SORT_MEMORY) while (memavl >= MIN_SORT_MEMORY)
{ {
...@@ -327,7 +333,7 @@ void *_thr_find_all_keys(MI_SORT_PARAM *info) ...@@ -327,7 +333,7 @@ void *_thr_find_all_keys(MI_SORT_PARAM *info)
idx=error=0; idx=error=0;
sort_keys[0]=(uchar*) (sort_keys+keys); sort_keys[0]=(uchar*) (sort_keys+keys);
while(!(error=info->sort_info->got_error) || while(!(error=info->sort_info->got_error) &&
!(error=(*info->key_read)(info,sort_keys[idx]))) !(error=(*info->key_read)(info,sort_keys[idx])))
{ {
if (info->real_key_length > info->key_length) if (info->real_key_length > info->key_length)
...@@ -342,7 +348,6 @@ void *_thr_find_all_keys(MI_SORT_PARAM *info) ...@@ -342,7 +348,6 @@ void *_thr_find_all_keys(MI_SORT_PARAM *info)
if (write_keys(info,sort_keys,idx-1, if (write_keys(info,sort_keys,idx-1,
(BUFFPEK *)alloc_dynamic(&info->buffpek), &info->tempfile)) (BUFFPEK *)alloc_dynamic(&info->buffpek), &info->tempfile))
goto err; goto err;
sort_keys[0]=(uchar*) (sort_keys+keys); sort_keys[0]=(uchar*) (sort_keys+keys);
memcpy(sort_keys[0],sort_keys[idx-1],(size_t) info->key_length); memcpy(sort_keys[0],sort_keys[idx-1],(size_t) info->key_length);
idx=1; idx=1;
...@@ -378,6 +383,7 @@ ok: ...@@ -378,6 +383,7 @@ ok:
info->sort_info->threads_running--; info->sort_info->threads_running--;
pthread_cond_signal(& info->sort_info->cond); pthread_cond_signal(& info->sort_info->cond);
pthread_mutex_unlock(& info->sort_info->mutex); pthread_mutex_unlock(& info->sort_info->mutex);
my_thread_end();
return NULL; return NULL;
} /* _thr_find_all_keys */ } /* _thr_find_all_keys */
...@@ -389,12 +395,13 @@ int _thr_write_keys(MI_SORT_PARAM *sort_param) ...@@ -389,12 +395,13 @@ int _thr_write_keys(MI_SORT_PARAM *sort_param)
ulong *rec_per_key_part=param->rec_per_key_part; ulong *rec_per_key_part=param->rec_per_key_part;
int i, got_error=sort_info->got_error; int i, got_error=sort_info->got_error;
MI_INFO *info=sort_info->info; MI_INFO *info=sort_info->info;
MYISAM_SHARE *share=info->s; MYISAM_SHARE *share=info->s;
MI_SORT_PARAM *sinfo; MI_SORT_PARAM *sinfo;
byte *mergebuf=0; byte *mergebuf=0;
for (i=0, sinfo=sort_param ; i<sort_info->total_keys ; i++, sinfo++, for (i=0, sinfo=sort_param ; i<sort_info->total_keys ; i++,
rec_per_key_part+=sinfo->keyinfo->keysegs) rec_per_key_part+=sinfo->keyinfo->keysegs,
sinfo++)
{ {
if (!sinfo->sort_keys) if (!sinfo->sort_keys)
{ {
...@@ -417,10 +424,11 @@ int _thr_write_keys(MI_SORT_PARAM *sort_param) ...@@ -417,10 +424,11 @@ int _thr_write_keys(MI_SORT_PARAM *sort_param)
sinfo->sort_keys=0; sinfo->sort_keys=0;
} }
for (i=0, sinfo=sort_param ; i<sort_info->total_keys ; i++, sinfo++, for (i=0, sinfo=sort_param ; i<sort_info->total_keys ; i++,
delete_dynamic(& sinfo->buffpek), delete_dynamic(& sinfo->buffpek),
close_cached_file(& sinfo->tempfile), close_cached_file(& sinfo->tempfile),
close_cached_file(& sinfo->tempfile_for_exceptions)) close_cached_file(& sinfo->tempfile_for_exceptions),
sinfo++)
{ {
if (got_error) continue; if (got_error) continue;
if (sinfo->buffpek.elements) if (sinfo->buffpek.elements)
...@@ -520,8 +528,10 @@ static int NEAR_F write_keys(MI_SORT_PARAM *info, register uchar **sort_keys, ...@@ -520,8 +528,10 @@ static int NEAR_F write_keys(MI_SORT_PARAM *info, register uchar **sort_keys,
buffpek->count=count; buffpek->count=count;
for (end=sort_keys+count ; sort_keys != end ; sort_keys++) for (end=sort_keys+count ; sort_keys != end ; sort_keys++)
{
if (my_b_write(tempfile,(byte*) *sort_keys,(uint) sort_length)) if (my_b_write(tempfile,(byte*) *sort_keys,(uint) sort_length))
DBUG_RETURN(1); /* purecov: inspected */ DBUG_RETURN(1); /* purecov: inspected */
}
DBUG_RETURN(0); DBUG_RETURN(0);
} /* write_keys */ } /* write_keys */
...@@ -654,7 +664,7 @@ merge_buffers(MI_SORT_PARAM *info, uint keys, IO_CACHE *from_file, ...@@ -654,7 +664,7 @@ merge_buffers(MI_SORT_PARAM *info, uint keys, IO_CACHE *from_file,
if (init_queue(&queue,(uint) (Tb-Fb)+1,offsetof(BUFFPEK,key),0, if (init_queue(&queue,(uint) (Tb-Fb)+1,offsetof(BUFFPEK,key),0,
(int (*)(void*, byte *,byte*)) info->key_cmp, (int (*)(void*, byte *,byte*)) info->key_cmp,
(void*) info->sort_info)) (void*) info))
DBUG_RETURN(1); /* purecov: inspected */ DBUG_RETURN(1); /* purecov: inspected */
for (buffpek= Fb ; buffpek <= Tb ; buffpek++) for (buffpek= Fb ; buffpek <= Tb ; buffpek++)
......
...@@ -68,6 +68,9 @@ static void my_aiowait(my_aio_result *result); ...@@ -68,6 +68,9 @@ static void my_aiowait(my_aio_result *result);
#define unlock_append_buffer(info) #define unlock_append_buffer(info)
#endif #endif
#define IO_ROUND_UP(X) (((X)+IO_SIZE-1) & ~(IO_SIZE-1))
#define IO_ROUND_DN(X) ( (X) & ~(IO_SIZE-1))
static void static void
init_functions(IO_CACHE* info, enum cache_type type) init_functions(IO_CACHE* info, enum cache_type type)
{ {
...@@ -425,31 +428,71 @@ int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count) ...@@ -425,31 +428,71 @@ int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count)
} }
#ifdef THREAD #ifdef THREAD
int init_io_cache_share(IO_CACHE *info, IO_CACHE_SHARE *s, uint num_threads) /* Prepare IO_CACHE for shared use */
void init_io_cache_share(IO_CACHE *info, IO_CACHE_SHARE *s, uint num_threads)
{ {
DBUG_ASSERT(info->type == READ_CACHE); DBUG_ASSERT(info->type == READ_CACHE);
pthread_mutex_init(& s->mutex, MY_MUTEX_INIT_FAST); pthread_mutex_init(&s->mutex, MY_MUTEX_INIT_FAST);
pthread_cond_init (& s->cond, 0); pthread_cond_init (&s->cond, 0);
s->count=num_threads; s->count=num_threads-1;
s->active=0; /* to catch errors */ s->active=0; /* to catch errors */
info->share=s; info->share=s;
info->read_function=_my_b_read_r; info->read_function=_my_b_read_r;
} }
int remove_io_thread(IO_CACHE *info) /*
Remove a thread from shared access to IO_CACHE
Every thread should do that on exit for not
to deadlock other threads
*/
void remove_io_thread(IO_CACHE *info)
{ {
if (errno=pthread_mutex_lock(& info->share->mutex)) pthread_mutex_lock(&info->share->mutex);
return -1;
if (! info->share->count--) if (! info->share->count--)
pthread_cond_signal(& info->share->cond); pthread_cond_signal(&info->share->cond);
pthread_mutex_unlock(& info->share->mutex); pthread_mutex_unlock(&info->share->mutex);
return 0; return 0;
} }
static int lock_io_cache(IO_CACHE *info)
{
pthread_mutex_lock(&info->share->mutex);
if (!info->share->count)
return 1;
--(info->share->count);
pthread_cond_wait(&info->share->cond, &info->share->mutex);
/*
count can be -1 here, if one thread was removed (remove_io_cache)
while all others were locked (lock_io_cache).
If this is the case, this thread behaves as if count was 0 from the
very beginning, that is returns 1 and does not unlock the mutex.
*/
if (++(info->share->count))
return pthread_mutex_unlock(&info->share->mutex);
else
return 1;
}
static void unlock_io_cache(IO_CACHE *info)
{
pthread_cond_broadcast(&info->share->cond);
pthread_mutex_unlock(&info->share->mutex);
}
/*
Read from IO_CACHE when it is shared between several threads.
It works as follows: when a thread tries to read from a file
(that is, after using all the data from the (shared) buffer),
it just hangs on lock_io_cache(), wating for other threads.
When the very last thread attempts a read, lock_io_cache()
returns 1, the thread does actual IO and unlock_io_cache(),
which signals all the waiting threads that data is in the buffer.
*/
int _my_b_read_r(register IO_CACHE *info, byte *Buffer, uint Count) int _my_b_read_r(register IO_CACHE *info, byte *Buffer, uint Count)
{ {
my_off_t pos_in_file; my_off_t pos_in_file;
int length,diff_length,read_len; uint length,diff_length,read_len;
DBUG_ENTER("_my_b_read_r"); DBUG_ENTER("_my_b_read_r");
if ((read_len=(uint) (info->read_end-info->read_pos))) if ((read_len=(uint) (info->read_end-info->read_pos)))
...@@ -460,28 +503,41 @@ int _my_b_read_r(register IO_CACHE *info, byte *Buffer, uint Count) ...@@ -460,28 +503,41 @@ int _my_b_read_r(register IO_CACHE *info, byte *Buffer, uint Count)
Count-=read_len; Count-=read_len;
} }
#define IO_ROUND_UP(X) (((X)+IO_SIZE-1) & ~(IO_SIZE-1)) while (Count)
#define IO_ROUND_DN(X) ( (X) & ~(IO_SIZE-1)) {
while (Count) {
int cnt, len; int cnt, len;
pos_in_file= info->pos_in_file + (uint)(info->read_end - info->buffer); pos_in_file= info->pos_in_file + (uint)(info->read_end - info->buffer);
diff_length= pos_in_file & (IO_SIZE-1); diff_length= (uint)pos_in_file & (IO_SIZE-1);
length=IO_ROUND_UP(Count+diff_length)-diff_length; length=IO_ROUND_UP(Count+diff_length)-diff_length;
length=(length <= info->read_length) ? length=(length <= info->read_length) ?
length + IO_ROUND_DN(info->read_length - length) : length + IO_ROUND_DN(info->read_length - length) :
length - IO_ROUND_UP(length - info->read_length) ; length - IO_ROUND_UP(length - info->read_length) ;
if (info->type != READ_FIFO && (length > info->end_of_file - pos_in_file))
length=info->end_of_file - pos_in_file;
if (length == 0)
{
info->error=(int) read_len;
DBUG_RETURN(1);
}
if (lock_io_cache(info)) if (lock_io_cache(info))
{ {
#if 0 && SAFE_MUTEX
#define PRINT_LOCK(M) printf("Thread %d: mutex is %s\n", my_thread_id(), \
(((safe_mutex_t *)(M))->count ? "Locked" : "Unlocked"))
#else
#define PRINT_LOCK(M)
#endif
PRINT_LOCK(& info->share->mutex);
info->share->active=info; info->share->active=info;
if (info->seek_not_done) /* File touched, do seek */ if (info->seek_not_done) /* File touched, do seek */
VOID(my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0))); VOID(my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)));
len=my_read(info->file,info->buffer, length, info->myflags); len=(int)my_read(info->file,info->buffer, length, info->myflags);
info->read_end=info->buffer + (len == -1 ? 0 : len); info->read_end=info->buffer + (len == -1 ? 0 : len);
info->error=(len == length ? 0 : len); info->error=(len == (int)length ? 0 : len);
info->pos_in_file=pos_in_file; info->pos_in_file=pos_in_file;
unlock_io_cache(info); unlock_io_cache(info);
PRINT_LOCK(& info->share->mutex);
} }
else else
{ {
...@@ -489,15 +545,16 @@ int _my_b_read_r(register IO_CACHE *info, byte *Buffer, uint Count) ...@@ -489,15 +545,16 @@ int _my_b_read_r(register IO_CACHE *info, byte *Buffer, uint Count)
info->read_end= info->share->active->read_end; info->read_end= info->share->active->read_end;
info->pos_in_file= info->share->active->pos_in_file; info->pos_in_file= info->share->active->pos_in_file;
len= (info->error == -1 ? -1 : info->read_end-info->buffer); len= (info->error == -1 ? -1 : info->read_end-info->buffer);
PRINT_LOCK(& info->share->mutex);
} }
info->read_pos=info->buffer; info->read_pos=info->buffer;
info->seek_not_done=0; info->seek_not_done=0;
if (info->error) if (len <= 0)
{ {
info->error=read_len; info->error=(int)read_len;
DBUG_RETURN(1); DBUG_RETURN(1);
} }
cnt=(len > Count) ? Count : len; cnt=(len > Count) ? (int)Count : len;
memcpy(Buffer,info->read_pos, (size_t)cnt); memcpy(Buffer,info->read_pos, (size_t)cnt);
Count -=cnt; Count -=cnt;
Buffer+=cnt; Buffer+=cnt;
...@@ -1070,15 +1127,21 @@ int end_io_cache(IO_CACHE *info) ...@@ -1070,15 +1127,21 @@ int end_io_cache(IO_CACHE *info)
DBUG_ENTER("end_io_cache"); DBUG_ENTER("end_io_cache");
#ifdef THREAD #ifdef THREAD
/* simple protection against multi-close: destroying share first */
if (info->share) if (info->share)
if (pthread_cond_destroy (& info->share->cond) | {
pthread_mutex_destroy(& info->share->mutex)) #ifdef SAFE_MUTEX
/* simple protection against multi-close: destroying share first */
if (pthread_cond_destroy (&info->share->cond) |
pthread_mutex_destroy(&info->share->mutex))
{ {
DBUG_RETURN(1); DBUG_RETURN(1);
} }
else #else
info->share=0; pthread_cond_destroy (&info->share->cond);
pthread_mutex_destroy(&info->share->mutex);
#endif
info->share=0;
}
#endif #endif
if ((pre_close=info->pre_close)) if ((pre_close=info->pre_close))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment