Commit a7919a11 authored by unknown's avatar unknown

bug#2831 - --extenral-locking does not fully work with --myisam-recover.

Changed the semantics of open_count so that it is decremented
at every unlock (if it was incremented due to data changes).
So it indicates a crash, if it is non-zero after a lock.
The table will then be repaired.


myisam/mi_close.c:
  bug#2831 - --extenral-locking does not fully work with --myisam-recover.
  To avoid flushing the open_count at every unlock,
  we need to do so at close at least.
myisam/mi_locking.c:
  bug#2831 - --extenral-locking does not fully work with --myisam-recover.
  open_count is now decremented at unlock (from a writelock) with
  mi_unlock_open_count(). After every new lock the state is read
  from the index file and the open_count checked. If not zero,
  another server must have crashed, so the table is marked as crashed.
  In certain situations the decremented open_count mut be flushed to
  the index file. I tried to keep these extra flushes as seldom as possible.
sql/ha_myisam.cc:
  bug#2831 - --extenral-locking does not fully work with --myisam-recover.
  Added code to repair the table, if it is marked crashed after
  successful locking and the --myisam-recover option is set.
sql/sql_table.cc:
  This does not really belong to the bugfix for #2831.
  But it was detected during fixing the external locking.
parent c36356b6
...@@ -70,7 +70,8 @@ int mi_close(register MI_INFO *info) ...@@ -70,7 +70,8 @@ int mi_close(register MI_INFO *info)
error=my_errno; error=my_errno;
if (share->kfile >= 0) if (share->kfile >= 0)
{ {
if (share->mode != O_RDONLY && mi_is_crashed(info)) /* We must always flush the state with the current open_count. */
if (share->mode != O_RDONLY)
mi_state_info_write(share->kfile, &share->state, 1); mi_state_info_write(share->kfile, &share->state, 1);
if (my_close(share->kfile,MYF(0))) if (my_close(share->kfile,MYF(0)))
error = my_errno; error = my_errno;
......
...@@ -26,6 +26,9 @@ ...@@ -26,6 +26,9 @@
#include <errno.h> #include <errno.h>
#endif #endif
static int mi_unlock_open_count(MI_INFO *info, my_bool write_info);
/* lock table by F_UNLCK, F_RDLCK or F_WRLCK */ /* lock table by F_UNLCK, F_RDLCK or F_WRLCK */
int mi_lock_database(MI_INFO *info, int lock_type) int mi_lock_database(MI_INFO *info, int lock_type)
...@@ -35,7 +38,12 @@ int mi_lock_database(MI_INFO *info, int lock_type) ...@@ -35,7 +38,12 @@ int mi_lock_database(MI_INFO *info, int lock_type)
MYISAM_SHARE *share=info->s; MYISAM_SHARE *share=info->s;
uint flag; uint flag;
DBUG_ENTER("mi_lock_database"); DBUG_ENTER("mi_lock_database");
DBUG_PRINT("info",("lock_type: %d", lock_type)); DBUG_PRINT("enter",("mi_lock_database: lock_type %d, old lock %d"
", r_locks %u, w_locks %u", lock_type,
info->lock_type, share->r_locks, share->w_locks));
DBUG_PRINT("enter",("mi_lock_database: gl._changed %d, open_count %u '%s'",
share->global_changed, share->state.open_count,
share->index_file_name));
if (share->options & HA_OPTION_READ_ONLY_DATA || if (share->options & HA_OPTION_READ_ONLY_DATA ||
info->lock_type == lock_type) info->lock_type == lock_type)
...@@ -54,7 +62,6 @@ int mi_lock_database(MI_INFO *info, int lock_type) ...@@ -54,7 +62,6 @@ int mi_lock_database(MI_INFO *info, int lock_type)
{ {
switch (lock_type) { switch (lock_type) {
case F_UNLCK: case F_UNLCK:
DBUG_PRINT("info", ("old lock: %d", info->lock_type));
if (info->lock_type == F_RDLCK) if (info->lock_type == F_RDLCK)
count= --share->r_locks; count= --share->r_locks;
else else
...@@ -83,7 +90,8 @@ int mi_lock_database(MI_INFO *info, int lock_type) ...@@ -83,7 +90,8 @@ int mi_lock_database(MI_INFO *info, int lock_type)
share->state.process= share->last_process=share->this_process; share->state.process= share->last_process=share->this_process;
share->state.unique= info->last_unique= info->this_unique; share->state.unique= info->last_unique= info->this_unique;
share->state.update_count= info->last_loop= ++info->this_loop; share->state.update_count= info->last_loop= ++info->this_loop;
if (mi_state_info_write(share->kfile, &share->state, 1)) if (mi_unlock_open_count(info, FALSE) ||
mi_state_info_write(share->kfile, &share->state, 1))
error=my_errno; error=my_errno;
share->changed=0; share->changed=0;
if (myisam_flush) if (myisam_flush)
...@@ -98,6 +106,19 @@ int mi_lock_database(MI_INFO *info, int lock_type) ...@@ -98,6 +106,19 @@ int mi_lock_database(MI_INFO *info, int lock_type)
if (error) if (error)
mi_mark_crashed(info); mi_mark_crashed(info);
} }
else
{
/*
There are chances that _mi_mark_file_changed() has been called,
while share->changed remained FALSE. Consequently, we need to
clear the open_count even when share->changed is FALSE. Note,
that mi_unlock_open_count() will only clear the open_count when
it is set and only write the status to file, if it changes it
and we are running --with-external-locking.
*/
if (mi_unlock_open_count(info, ! my_disable_locking))
error= my_errno;
}
if (info->lock_type != F_EXTRA_LCK) if (info->lock_type != F_EXTRA_LCK)
{ {
if (share->r_locks) if (share->r_locks)
...@@ -122,10 +143,15 @@ int mi_lock_database(MI_INFO *info, int lock_type) ...@@ -122,10 +143,15 @@ int mi_lock_database(MI_INFO *info, int lock_type)
case F_RDLCK: case F_RDLCK:
if (info->lock_type == F_WRLCK) if (info->lock_type == F_WRLCK)
{ /* Change RW to READONLY */ { /* Change RW to READONLY */
/*
mysqld does not turn write locks to read locks,
so we're never here in mysqld.
*/
if (share->w_locks == 1) if (share->w_locks == 1)
{ {
flag=1; flag=1;
if (my_lock(share->kfile,lock_type,0L,F_TO_EOF, if (mi_unlock_open_count(info, ! my_disable_locking) ||
my_lock(share->kfile,lock_type,0L,F_TO_EOF,
MYF(MY_SEEK_NOT_DONE))) MYF(MY_SEEK_NOT_DONE)))
{ {
error=my_errno; error=my_errno;
...@@ -153,6 +179,14 @@ int mi_lock_database(MI_INFO *info, int lock_type) ...@@ -153,6 +179,14 @@ int mi_lock_database(MI_INFO *info, int lock_type)
my_errno=error; my_errno=error;
break; break;
} }
if (share->state.open_count)
{
DBUG_PRINT("error",("RD: Table has not been correctly unlocked"
": open_count %d '%s'",
share->state.open_count,
share->index_file_name));
mi_mark_crashed(info);
}
} }
VOID(_mi_test_if_changed(info)); VOID(_mi_test_if_changed(info));
share->r_locks++; share->r_locks++;
...@@ -198,6 +232,14 @@ int mi_lock_database(MI_INFO *info, int lock_type) ...@@ -198,6 +232,14 @@ int mi_lock_database(MI_INFO *info, int lock_type)
my_errno=error; my_errno=error;
break; break;
} }
if (share->state.open_count)
{
DBUG_PRINT("error",("WR: Table has not been correctly unlocked"
": open_count %d '%s'",
share->state.open_count,
share->index_file_name));
mi_mark_crashed(info);
}
} }
} }
} }
...@@ -459,3 +501,36 @@ int _mi_decrement_open_count(MI_INFO *info) ...@@ -459,3 +501,36 @@ int _mi_decrement_open_count(MI_INFO *info)
} }
return test(lock_error || write_error); return test(lock_error || write_error);
} }
/*
Decrement open_count in preparation for unlock.
SYNOPSIS
mi_unlock_open_count()
info Pointer to the MI_INFO structure.
write_info If info must be written when changed.
RETURN
0 OK
*/
static int mi_unlock_open_count(MI_INFO *info, my_bool write_info)
{
int rc= 0;
MYISAM_SHARE *share=info->s;
DBUG_ENTER("mi_unlock_open_count");
DBUG_PRINT("enter",("mi_unlock_open_count: gl._changed %d open_count %d '%s'",
share->global_changed, share->state.open_count,
share->index_file_name));
if (share->global_changed)
{
share->global_changed= 0;
if (share->state.open_count)
share->state.open_count--;
if (write_info)
rc= _mi_writeinfo(info, WRITEINFO_UPDATE_KEYFILE);
}
DBUG_RETURN(rc);
}
...@@ -1000,9 +1000,27 @@ int ha_myisam::delete_table(const char *name) ...@@ -1000,9 +1000,27 @@ int ha_myisam::delete_table(const char *name)
int ha_myisam::external_lock(THD *thd, int lock_type) int ha_myisam::external_lock(THD *thd, int lock_type)
{ {
return mi_lock_database(file, !table->tmp_table ? int rc;
while ((! (rc= mi_lock_database(file, !table->tmp_table ?
lock_type : ((lock_type == F_UNLCK) ? lock_type : ((lock_type == F_UNLCK) ?
F_UNLCK : F_EXTRA_LCK)); F_UNLCK : F_EXTRA_LCK)))) &&
mi_is_crashed(file) && (myisam_recover_options != HA_RECOVER_NONE))
{
/*
check_and_repair() implicitly write locks the table, unless a
LOCK TABLES is in effect. It should be safer to always write lock here.
The implicit lock by check_and_repair() will then be a no-op.
check_and_repair() does not restore the original lock, but unlocks the
table. So we have to get the requested lock type again. And then to
check, if the table has been crashed again meanwhile by another server.
If anything fails, we break.
*/
if (((lock_type != F_WRLCK) && (rc= mi_lock_database(file, F_WRLCK))) ||
(rc= check_and_repair(thd)))
break;
}
return rc;
} }
......
...@@ -2208,7 +2208,12 @@ copy_data_between_tables(TABLE *from,TABLE *to, ...@@ -2208,7 +2208,12 @@ copy_data_between_tables(TABLE *from,TABLE *to,
if (!(copy= new Copy_field[to->fields])) if (!(copy= new Copy_field[to->fields]))
DBUG_RETURN(-1); /* purecov: inspected */ DBUG_RETURN(-1); /* purecov: inspected */
to->file->external_lock(thd,F_WRLCK); if (to->file->external_lock(thd, F_WRLCK))
{
/* We must always unlock, even when lock failed. */
(void) to->file->external_lock(thd, F_UNLCK);
DBUG_RETURN(-1);
}
to->file->extra(HA_EXTRA_WRITE_CACHE); to->file->extra(HA_EXTRA_WRITE_CACHE);
from->file->info(HA_STATUS_VARIABLE); from->file->info(HA_STATUS_VARIABLE);
to->file->deactivate_non_unique_index(from->file->records); to->file->deactivate_non_unique_index(from->file->records);
...@@ -2308,11 +2313,12 @@ copy_data_between_tables(TABLE *from,TABLE *to, ...@@ -2308,11 +2313,12 @@ copy_data_between_tables(TABLE *from,TABLE *to,
error=1; error=1;
if (ha_commit(thd)) if (ha_commit(thd))
error=1; error=1;
if (to->file->external_lock(thd,F_UNLCK))
error=1;
err: err:
free_io_cache(from); free_io_cache(from);
*copied= found_count; *copied= found_count;
*deleted=delete_count; *deleted=delete_count;
/* we must always unlock the table on return. */
if (to->file->external_lock(thd,F_UNLCK))
error=1;
DBUG_RETURN(error > 0 ? -1 : 0); DBUG_RETURN(error > 0 ? -1 : 0);
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment