Commit 7de7c38f authored by unknown's avatar unknown

Many files:

  Improved concurrency for key cache reassignment


include/my_sys.h:
  Improved concurrency for key cache reassignment
include/myisam.h:
  Improved concurrency for key cache reassignment
myisam/mi_keycache.c:
  Improved concurrency for key cache reassignment
myisam/mi_locking.c:
  Improved concurrency for key cache reassignment
mysys/mf_keycache.c:
  Improved concurrency for key cache reassignment
sql/ha_myisam.cc:
  Improved concurrency for key cache reassignment
sql/sql_table.cc:
  Improved concurrency for key cache reassignment
parent 5a85cb66
...@@ -262,7 +262,8 @@ enum cache_type ...@@ -262,7 +262,8 @@ enum cache_type
enum flush_type enum flush_type
{ {
FLUSH_KEEP, FLUSH_RELEASE, FLUSH_IGNORE_CHANGED, FLUSH_FORCE_WRITE FLUSH_KEEP, FLUSH_RELEASE, FLUSH_IGNORE_CHANGED, FLUSH_FORCE_WRITE,
FLUSH_REMOVE
}; };
typedef struct st_record_cache /* Used when cacheing records */ typedef struct st_record_cache /* Used when cacheing records */
...@@ -527,6 +528,9 @@ typedef struct st_key_cache_var ...@@ -527,6 +528,9 @@ typedef struct st_key_cache_var
ulong cache_read; /* number of reads from files to the cache */ ulong cache_read; /* number of reads from files to the cache */
int blocks; /* max number of blocks in the cache */ int blocks; /* max number of blocks in the cache */
struct st_key_cache_asmt *assign_list; /* list of assignments to the cache */ struct st_key_cache_asmt *assign_list; /* list of assignments to the cache */
int assignments; /* number of not completed assignments */
void (*action)(void *); /* optional call back function */
void *extra_info; /* ptr to extra info */
} KEY_CACHE_VAR; } KEY_CACHE_VAR;
#define DEFAULT_KEY_CACHE_NAME "default" #define DEFAULT_KEY_CACHE_NAME "default"
......
...@@ -410,8 +410,9 @@ my_bool mi_test_if_sort_rep(MI_INFO *info, ha_rows rows, ulonglong key_map, ...@@ -410,8 +410,9 @@ my_bool mi_test_if_sort_rep(MI_INFO *info, ha_rows rows, ulonglong key_map,
int mi_init_bulk_insert(MI_INFO *info, ulong cache_size, ha_rows rows); int mi_init_bulk_insert(MI_INFO *info, ulong cache_size, ha_rows rows);
void mi_flush_bulk_insert(MI_INFO *info, uint inx); void mi_flush_bulk_insert(MI_INFO *info, uint inx);
void mi_end_bulk_insert(MI_INFO *info); void mi_end_bulk_insert(MI_INFO *info);
int mi_assign_to_keycache(MI_INFO *info, ulonglong key_map, int mi_assign_to_keycache(MI_INFO *info, ulonglong key_map,
KEY_CACHE_HANDLE *reg_keycache); KEY_CACHE_VAR *key_cache,
pthread_mutex_t *assign_lock);
int mi_preload(MI_INFO *info, ulonglong key_map, my_bool ignore_leaves); int mi_preload(MI_INFO *info, ulonglong key_map, my_bool ignore_leaves);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -39,24 +39,68 @@ ...@@ -39,24 +39,68 @@
of the table will be assigned to the specified key cache. of the table will be assigned to the specified key cache.
*/ */
typedef struct st_assign_extra_info
{
pthread_mutex_t *lock;
struct st_my_thread_var *waiting_thread;
} ASSIGN_EXTRA_INFO;
static void remove_key_cache_assign(void *arg)
{
KEY_CACHE_VAR *key_cache= (KEY_CACHE_VAR *) arg;
ASSIGN_EXTRA_INFO *extra_info= (ASSIGN_EXTRA_INFO *) key_cache->extra_info;
struct st_my_thread_var *waiting_thread;
pthread_mutex_t *lock= extra_info->lock;
pthread_mutex_lock(lock);
if (!(--key_cache->assignments) &&
(waiting_thread = extra_info->waiting_thread))
{
my_free(extra_info, MYF(0));
key_cache->extra_info= 0;
if (waiting_thread != my_thread_var)
pthread_cond_signal(&waiting_thread->suspend);
}
pthread_mutex_unlock(lock);
}
int mi_assign_to_keycache(MI_INFO *info, ulonglong key_map, int mi_assign_to_keycache(MI_INFO *info, ulonglong key_map,
KEY_CACHE_HANDLE *reg_keycache) KEY_CACHE_VAR *key_cache,
pthread_mutex_t *assign_lock)
{ {
ASSIGN_EXTRA_INFO *extra_info;
int error= 0; int error= 0;
MYISAM_SHARE* share= info->s; MYISAM_SHARE* share= info->s;
DBUG_ENTER("mi_assign_to_keycache"); DBUG_ENTER("mi_assign_to_keycache");
share->reg_keycache= reg_keycache; share->reg_keycache= &key_cache->cache;
pthread_mutex_lock(assign_lock);
if (!(extra_info= (ASSIGN_EXTRA_INFO *) key_cache->extra_info))
{
if (!(extra_info= (ASSIGN_EXTRA_INFO*) my_malloc(sizeof(ASSIGN_EXTRA_INFO),
MYF(MY_WME | MY_ZEROFILL))))
DBUG_RETURN(HA_ERR_OUT_OF_MEM);
key_cache->extra_info= extra_info;
key_cache->action= remove_key_cache_assign;
extra_info->lock= assign_lock;
}
key_cache->assignments++;
pthread_mutex_unlock(assign_lock);
if (!(info->lock_type == F_WRLCK && share->w_locks)) if (!(info->lock_type == F_WRLCK && share->w_locks))
{ {
if (flush_key_blocks(*share->keycache, share->kfile, FLUSH_RELEASE)) if (flush_key_blocks(*share->keycache, share->kfile, FLUSH_REMOVE))
{ {
error=my_errno; error=my_errno;
mi_mark_crashed(info); /* Mark that table must be checked */ mi_mark_crashed(info); /* Mark that table must be checked */
} }
share->keycache= reg_keycache; share->keycache= &key_cache->cache;
} }
else
{
extra_info->waiting_thread= my_thread_var;
}
DBUG_RETURN(error); DBUG_RETURN(error);
} }
......
...@@ -62,17 +62,12 @@ int mi_lock_database(MI_INFO *info, int lock_type) ...@@ -62,17 +62,12 @@ int mi_lock_database(MI_INFO *info, int lock_type)
/* /*
During a key cache reassignment the current and registered During a key cache reassignment the current and registered
key caches for the table are different. key caches for the table are different.
Although at present key cache ressignment is always
performed with a shared cache for the table acquired,
for future possible optimizations we still
handle this situation as if we could come to this point
during the ressignment (in non-reassignment thread).
*/ */
if (info->lock_type == F_WRLCK && !share->w_locks && if (info->lock_type == F_WRLCK && !share->w_locks &&
((switch_fl= share->keycache != share->reg_keycache) || ((switch_fl= share->keycache != share->reg_keycache) ||
!share->delay_key_write) && !share->delay_key_write) &&
flush_key_blocks(*share->keycache, share->kfile, flush_key_blocks(*share->keycache, share->kfile,
switch_fl ? FLUSH_RELEASE : FLUSH_KEEP)) switch_fl ? FLUSH_REMOVE : FLUSH_KEEP))
{ {
error=my_errno; error=my_errno;
mi_mark_crashed(info); /* Mark that table must be checked */ mi_mark_crashed(info); /* Mark that table must be checked */
......
...@@ -425,7 +425,7 @@ int init_key_cache(KEY_CACHE_HANDLE *pkeycache, uint key_cache_block_size, ...@@ -425,7 +425,7 @@ int init_key_cache(KEY_CACHE_HANDLE *pkeycache, uint key_cache_block_size,
keycache->min_warm_blocks= env && env->division_limit ? keycache->min_warm_blocks= env && env->division_limit ?
blocks * env->division_limit / 100 + 1 : blocks * env->division_limit / 100 + 1 :
blocks; blocks;
keycache->age_threshold= env || env->age_threshold ? keycache->age_threshold= env && env->age_threshold ?
blocks * env->age_threshold / 100 : blocks * env->age_threshold / 100 :
blocks; blocks;
...@@ -535,6 +535,7 @@ int resize_key_cache(KEY_CACHE_HANDLE *pkeycache, uint key_cache_block_size, ...@@ -535,6 +535,7 @@ int resize_key_cache(KEY_CACHE_HANDLE *pkeycache, uint key_cache_block_size,
void change_key_cache_param(KEY_CACHE_HANDLE keycache) void change_key_cache_param(KEY_CACHE_HANDLE keycache)
{ {
KEY_CACHE_VAR *env= keycache->env; KEY_CACHE_VAR *env= keycache->env;
DBUG_ENTER("change_key_cache_param");
if (!env) if (!env)
return; return;
...@@ -544,6 +545,7 @@ void change_key_cache_param(KEY_CACHE_HANDLE keycache) ...@@ -544,6 +545,7 @@ void change_key_cache_param(KEY_CACHE_HANDLE keycache)
if (env->age_threshold) if (env->age_threshold)
keycache->age_threshold= keycache->disk_blocks * keycache->age_threshold= keycache->disk_blocks *
env->age_threshold / 100; env->age_threshold / 100;
DBUG_VOID_RETURN;
} }
...@@ -1283,7 +1285,7 @@ restart: ...@@ -1283,7 +1285,7 @@ restart:
KEYCACHE_DBUG_PRINT("find_key_block", KEYCACHE_DBUG_PRINT("find_key_block",
("request waiting for old page to be saved")); ("request waiting for old page to be saved"));
{ {
struct st_my_thread_var *thread=my_thread_var; struct st_my_thread_var *thread= my_thread_var;
/* Put the request into the queue of those waiting for the old page */ /* Put the request into the queue of those waiting for the old page */
add_to_queue(&block->wqueue[COND_FOR_SAVED], thread); add_to_queue(&block->wqueue[COND_FOR_SAVED], thread);
/* Wait until the request can be resubmitted */ /* Wait until the request can be resubmitted */
...@@ -2038,6 +2040,7 @@ static int flush_cached_blocks(KEY_CACHE *keycache, ...@@ -2038,6 +2040,7 @@ static int flush_cached_blocks(KEY_CACHE *keycache,
int flush_key_blocks(KEY_CACHE_HANDLE keycache, int flush_key_blocks(KEY_CACHE_HANDLE keycache,
File file, enum flush_type type) File file, enum flush_type type)
{ {
KEY_CACHE_VAR *env;
BLOCK_LINK *cache_buff[FLUSH_CACHE],**cache; BLOCK_LINK *cache_buff[FLUSH_CACHE],**cache;
int last_errno= 0; int last_errno= 0;
...@@ -2213,9 +2216,14 @@ restart: ...@@ -2213,9 +2216,14 @@ restart:
keycache_pthread_mutex_unlock(&keycache->cache_lock); keycache_pthread_mutex_unlock(&keycache->cache_lock);
if (type == FLUSH_REMOVE && (env= keycache->env) && (env->action))
{
(*env->action)((void *) env);
}
#ifndef DBUG_OFF #ifndef DBUG_OFF
DBUG_EXECUTE("check_keycache", DBUG_EXECUTE("check_keycache",
test_key_cache(keycache, "end of flush_key_blocks", 0);); test_key_cache(keycache, "end of flush_key_blocks", 0););
#endif #endif
if (cache != cache_buff) if (cache != cache_buff)
my_free((gptr) cache, MYF(0)); my_free((gptr) cache, MYF(0));
......
...@@ -761,7 +761,7 @@ int ha_myisam::assign_to_keycache(THD* thd, HA_CHECK_OPT *check_opt) ...@@ -761,7 +761,7 @@ int ha_myisam::assign_to_keycache(THD* thd, HA_CHECK_OPT *check_opt)
reassign_key_cache(key_cache_asmt, new_key_cache); reassign_key_cache(key_cache_asmt, new_key_cache);
VOID(pthread_mutex_unlock(&LOCK_assign)); VOID(pthread_mutex_unlock(&LOCK_assign));
error= mi_assign_to_keycache(file, map, &new_key_cache->cache); error= mi_assign_to_keycache(file, map, new_key_cache, &LOCK_assign);
VOID(pthread_mutex_lock(&LOCK_assign)); VOID(pthread_mutex_lock(&LOCK_assign));
if (error && !key_cache_asmt->triggered) if (error && !key_cache_asmt->triggered)
......
...@@ -1620,15 +1620,19 @@ int reassign_keycache_tables(THD* thd, KEY_CACHE_VAR* src_cache, ...@@ -1620,15 +1620,19 @@ int reassign_keycache_tables(THD* thd, KEY_CACHE_VAR* src_cache,
{ {
if (key_cache_asmt->to_reassign) if (key_cache_asmt->to_reassign)
{ {
bool refresh;
VOID(pthread_mutex_unlock(&LOCK_assign)); VOID(pthread_mutex_unlock(&LOCK_assign));
bzero((byte *) &table, sizeof(table)); bzero((byte *) &table, sizeof(table));
table.option= dest_name; table.option= dest_name;
table.lock_type= TL_READ;
table.db= key_cache_asmt->db_name; table.db= key_cache_asmt->db_name;
table.alias= table.real_name= key_cache_asmt->table_name; table.alias= table.real_name= key_cache_asmt->table_name;
thd->open_options|= HA_OPEN_TO_ASSIGN; thd->open_options|= HA_OPEN_TO_ASSIGN;
table.table = open_ltable(thd, &table, TL_READ); while (!(table.table=open_table(thd,table.db,
table.real_name,table.alias,
&refresh)) && refresh) ;
thd->open_options&= ~HA_OPEN_TO_ASSIGN; thd->open_options&= ~HA_OPEN_TO_ASSIGN;
if (!table.table)
DBUG_RETURN(-1);
table.table->pos_in_table_list= &table; table.table->pos_in_table_list= &table;
key_cache_asmt->triggered= 1; key_cache_asmt->triggered= 1;
rc= table.table->file->assign_to_keycache(thd, 0); rc= table.table->file->assign_to_keycache(thd, 0);
...@@ -1642,6 +1646,18 @@ int reassign_keycache_tables(THD* thd, KEY_CACHE_VAR* src_cache, ...@@ -1642,6 +1646,18 @@ int reassign_keycache_tables(THD* thd, KEY_CACHE_VAR* src_cache,
else else
key_cache_asmt= key_cache_asmt->next; key_cache_asmt= key_cache_asmt->next;
} }
while (src_cache->assignments)
{
struct st_my_thread_var *waiting_thread= my_thread_var;
pthread_cond_wait(&waiting_thread->suspend, &LOCK_assign);
}
if (src_cache->extra_info)
{
my_free((char *) src_cache->extra_info, MYF(0));
src_cache->extra_info= 0;
}
if (remove_fl && !src_cache->assign_list && src_cache != &dflt_key_cache_var) if (remove_fl && !src_cache->assign_list && src_cache != &dflt_key_cache_var)
{ {
end_key_cache(&src_cache->cache, 1); end_key_cache(&src_cache->cache, 1);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment