WL 2826: Error handling of ALTER TABLE for partitioning

More review fixes
parent f8088c18
......@@ -1266,6 +1266,12 @@ typedef struct st_ddl_log_entry
uint entry_pos;
enum ddl_log_entry_code entry_type;
enum ddl_log_action_code action_type;
/*
Most actions have only one phase. REPLACE does however have two
phases. The first phase removes the file with the new name if
there was one there before and the second phase renames the
old name to the new name.
*/
char phase;
} DDL_LOG_ENTRY;
......
......@@ -5634,7 +5634,8 @@ static void write_log_completed(ALTER_PARTITION_PARAM_TYPE *lpt,
Failed to write, Bad...
We have completed the operation but have log records to REMOVE
stuff that shouldn't be removed. What clever things could one do
here?
here? An error output was written to the error output by the
above method so we don't do anything here.
*/
;
}
......
......@@ -281,6 +281,7 @@ typedef struct st_global_ddl_log
uint handler_name_len;
uint io_size;
bool inited;
bool recovery_phase;
} GLOBAL_DDL_LOG;
GLOBAL_DDL_LOG global_ddl_log;
......@@ -374,9 +375,11 @@ static bool write_ddl_log_header()
int4store(&global_ddl_log.file_entry_buf[DDL_LOG_IO_SIZE_POS],
const_var);
if (write_ddl_log_file_entry(0UL))
error= TRUE;
if (!error)
VOID(sync_ddl_log());
{
sql_print_error("Error writing ddl log header");
DBUG_RETURN(TRUE);
}
VOID(sync_ddl_log());
DBUG_RETURN(error);
}
......@@ -419,16 +422,20 @@ static uint read_ddl_log_header()
bzero(file_entry_buf, sizeof(global_ddl_log.file_entry_buf));
global_ddl_log.inited= FALSE;
global_ddl_log.recovery_phase= TRUE;
create_ddl_log_file_name(file_name);
if (!(my_open(file_name, O_RDONLY | O_BINARY, MYF(MY_WME))))
if (!(my_open(file_name, O_RDWR | O_BINARY, MYF(MY_WME))))
{
if (read_ddl_log_file_entry(0UL))
{
; /* Write message into error log */
/* Write message into error log */
sql_print_error("Failed to read ddl log file in recovery");
}
else
successful_open= TRUE;
}
else
sql_print_error("Failed to open ddl log file in recovery");
entry_no= uint4korr(&file_entry_buf[DDL_LOG_NUM_ENTRY_POS]);
global_ddl_log.name_len= uint4korr(&file_entry_buf[DDL_LOG_NAME_LEN_POS]);
global_ddl_log.handler_name_len=
......@@ -436,7 +443,10 @@ static uint read_ddl_log_header()
if (successful_open)
global_ddl_log.io_size= uint4korr(&file_entry_buf[DDL_LOG_IO_SIZE_POS]);
else
{
global_ddl_log.io_size= IO_SIZE;
entry_no= 0;
}
global_ddl_log.first_free= NULL;
global_ddl_log.first_used= NULL;
global_ddl_log.num_entries= 0;
......@@ -466,7 +476,6 @@ bool read_ddl_log_entry(uint read_entry, DDL_LOG_ENTRY *ddl_log_entry)
if (read_ddl_log_file_entry(read_entry))
{
/* Error handling */
DBUG_RETURN(TRUE);
}
ddl_log_entry->entry_pos= read_entry;
......@@ -508,23 +517,21 @@ static bool init_ddl_log()
DBUG_RETURN(FALSE);
}
global_ddl_log.io_size= IO_SIZE;
create_ddl_log_file_name(file_name);
VOID(my_delete(file_name, MYF(0)));
if ((global_ddl_log.file_id= my_create(file_name,
CREATE_MODE,
O_RDWR | O_TRUNC | O_BINARY,
MYF(MY_WME))) < 0)
{
/* Couldn't create ddl log file, this is serious error */
abort();
sql_print_error("Failed to open ddl log file");
DBUG_RETURN(TRUE);
}
if (write_ddl_log_header())
{
/* Write to error log */
error= TRUE;
DBUG_RETURN(TRUE);
}
global_ddl_log.inited= TRUE;
DBUG_RETURN(error);
DBUG_RETURN(FALSE);
}
......@@ -538,7 +545,7 @@ static bool init_ddl_log()
FALSE Success
*/
static bool execute_ddl_log_action(DDL_LOG_ENTRY *ddl_log_entry)
static bool execute_ddl_log_action(THD *thd, DDL_LOG_ENTRY *ddl_log_entry)
{
bool frm_action= FALSE;
LEX_STRING handler_name;
......@@ -557,7 +564,7 @@ static bool execute_ddl_log_action(DDL_LOG_ENTRY *ddl_log_entry)
}
handler_name.str= (char*)ddl_log_entry->handler_name;
handler_name.length= strlen(ddl_log_entry->handler_name);
hton= ha_resolve_by_name(current_thd, &handler_name);
hton= ha_resolve_by_name(thd, &handler_name);
if (!hton)
{
my_error(ER_ILLEGAL_HA, MYF(0), ddl_log_entry->handler_name);
......@@ -579,38 +586,40 @@ static bool execute_ddl_log_action(DDL_LOG_ENTRY *ddl_log_entry)
}
switch (ddl_log_entry->action_type)
{
case DDL_LOG_DELETE_ACTION:
case DDL_LOG_REPLACE_ACTION:
case DDL_LOG_DELETE_ACTION:
{
if (ddl_log_entry->action_type == DDL_LOG_DELETE_ACTION ||
(ddl_log_entry->action_type == DDL_LOG_REPLACE_ACTION &&
ddl_log_entry->phase == 0UL))
if (ddl_log_entry->phase == 0)
{
if (frm_action)
{
strxmov(path, ddl_log_entry->name, reg_ext, NullS);
if (my_delete(path, MYF(MY_WME)))
break;
#ifdef WITH_PARTITION_STORAGE_ENGINE
strxmov(path, ddl_log_entry->name, par_ext, NullS);
if (my_delete(path, MYF(MY_WME)))
break;
VOID(my_delete(path, MYF(MY_WME)));
#endif
}
else
{
if (file->delete_table(ddl_log_entry->name))
break;
}
if ((!deactivate_ddl_log_entry(ddl_log_entry->entry_pos)))
;
else
if ((deactivate_ddl_log_entry(ddl_log_entry->entry_pos)))
{
VOID(sync_ddl_log());
error= FALSE;
}
break;
if (ddl_log_entry->action_type == DDL_LOG_DELETE_ACTION)
break;
}
if (ddl_log_entry->action_type == DDL_LOG_DELETE_ACTION)
break;
DBUG_ASSERT(ddl_log_entry->action_type == DDL_LOG_REPLACE_ACTION);
/*
Fall through and perform the rename action of the replace
action. We have already indicated the success of the delete
action in the log entry by stepping up the phase.
*/
}
case DDL_LOG_RENAME_ACTION:
{
......@@ -621,23 +630,22 @@ static bool execute_ddl_log_action(DDL_LOG_ENTRY *ddl_log_entry)
strxmov(from_path, ddl_log_entry->from_name, reg_ext, NullS);
if (my_rename(path, from_path, MYF(MY_WME)))
break;
#ifdef WITH_PARTITION_STORAGE_ENGINE
strxmov(path, ddl_log_entry->name, par_ext, NullS);
strxmov(from_path, ddl_log_entry->from_name, par_ext, NullS);
if (my_rename(path, from_path, MYF(MY_WME)))
break;
VOID(my_rename(path, from_path, MYF(MY_WME)));
#endif
}
else
{
if (file->rename_table(ddl_log_entry->name,
ddl_log_entry->from_name))
break;
if ((!deactivate_ddl_log_entry(ddl_log_entry->entry_pos)))
;
else
{
VOID(sync_ddl_log());
error= FALSE;
}
}
if ((deactivate_ddl_log_entry(ddl_log_entry->entry_pos)))
{
VOID(sync_ddl_log());
error= FALSE;
}
break;
}
......@@ -665,7 +673,6 @@ error:
static bool get_free_ddl_log_entry(DDL_LOG_MEMORY_ENTRY **active_entry,
bool *write_header)
{
uint entry_no;
DDL_LOG_MEMORY_ENTRY *used_entry;
DDL_LOG_MEMORY_ENTRY *first_used= global_ddl_log.first_used;
DBUG_ENTER("get_free_ddl_log_entry");
......@@ -675,17 +682,17 @@ static bool get_free_ddl_log_entry(DDL_LOG_MEMORY_ENTRY **active_entry,
if (!(used_entry= (DDL_LOG_MEMORY_ENTRY*)my_malloc(
sizeof(DDL_LOG_MEMORY_ENTRY), MYF(MY_WME))))
{
sql_print_error("Failed to allocate memory for ddl log free list");
DBUG_RETURN(TRUE);
}
global_ddl_log.num_entries++;
used_entry->entry_pos= entry_no= global_ddl_log.num_entries;
used_entry->entry_pos= global_ddl_log.num_entries;
*write_header= TRUE;
}
else
{
used_entry= global_ddl_log.first_free;
global_ddl_log.first_free= used_entry->next_log_entry;
entry_no= used_entry->entry_pos;
*write_header= FALSE;
}
/*
......@@ -759,12 +766,17 @@ bool write_ddl_log_entry(DDL_LOG_ENTRY *ddl_log_entry,
}
error= FALSE;
if (write_ddl_log_file_entry((*active_entry)->entry_pos))
{
error= TRUE;
sql_print_error("Failed to write entry_no = %u",
(*active_entry)->entry_pos);
}
if (write_header && !error)
{
VOID(sync_ddl_log());
if (write_ddl_log_header())
error= TRUE;
VOID(sync_ddl_log());
}
if (error)
release_ddl_log_memory_entry(*active_entry);
......@@ -791,6 +803,10 @@ bool write_ddl_log_entry(DDL_LOG_ENTRY *ddl_log_entry,
DESCRIPTION
This is the last write in the ddl log. The previous log entries have
already been written but not yet synched to disk.
We write a couple of log entries that describes action to perform.
This entries are set-up in a linked list, however only when a first
execute entry is put as the first entry these will be executed.
This routine writes this first
*/
bool write_execute_ddl_log_entry(uint first_entry,
......@@ -807,6 +823,12 @@ bool write_execute_ddl_log_entry(uint first_entry,
}
if (!complete)
{
/*
We haven't synched the log entries yet, we synch them now before
writing the execute entry. If complete is true we haven't written
any log entries before, we are only here to write the execute
entry to indicate it is done.
*/
VOID(sync_ddl_log());
file_entry_buf[DDL_LOG_ENTRY_TYPE_POS]= DDL_LOG_EXECUTE_CODE;
}
......@@ -827,6 +849,7 @@ bool write_execute_ddl_log_entry(uint first_entry,
}
if (write_ddl_log_file_entry((*active_entry)->entry_pos))
{
sql_print_error("Error writing execute entry in ddl log");
release_ddl_log_memory_entry(*active_entry);
DBUG_RETURN(TRUE);
}
......@@ -838,6 +861,7 @@ bool write_execute_ddl_log_entry(uint first_entry,
release_ddl_log_memory_entry(*active_entry);
DBUG_RETURN(TRUE);
}
VOID(sync_ddl_log());
}
DBUG_RETURN(FALSE);
}
......@@ -869,7 +893,6 @@ bool write_execute_ddl_log_entry(uint first_entry,
bool deactivate_ddl_log_entry(uint entry_no)
{
bool error= TRUE;
char *file_entry_buf= (char*)global_ddl_log.file_entry_buf;
DBUG_ENTER("deactivate_ddl_log_entry");
......@@ -891,11 +914,20 @@ bool deactivate_ddl_log_entry(uint entry_no)
{
DBUG_ASSERT(0);
}
if (!write_ddl_log_file_entry(entry_no))
error= FALSE;
if (write_ddl_log_file_entry(entry_no))
{
sql_print_error("Error in deactivating log entry. Position = %u",
entry_no);
DBUG_RETURN(TRUE);
}
}
}
DBUG_RETURN(error);
else
{
sql_print_error("Failed in reading entry before deactivating it");
DBUG_RETURN(TRUE);
}
DBUG_RETURN(FALSE);
}
......@@ -913,13 +945,15 @@ bool sync_ddl_log()
bool error= FALSE;
DBUG_ENTER("sync_ddl_log");
if (init_ddl_log())
if ((!global_ddl_log.recovery_phase) &&
init_ddl_log())
{
DBUG_RETURN(TRUE);
}
if (my_sync(global_ddl_log.file_id, MYF(0)))
{
/* Write to error log */
sql_print_error("Failed to sync ddl log");
error= TRUE;
}
DBUG_RETURN(error);
......@@ -966,7 +1000,7 @@ void release_ddl_log_memory_entry(DDL_LOG_MEMORY_ENTRY *log_entry)
FALSE Success
*/
bool execute_ddl_log_entry(uint first_entry)
bool execute_ddl_log_entry(THD *thd, uint first_entry)
{
DDL_LOG_ENTRY ddl_log_entry;
uint read_entry= first_entry;
......@@ -979,15 +1013,19 @@ bool execute_ddl_log_entry(uint first_entry)
{
DBUG_ASSERT(0);
/* Write to error log and continue with next log entry */
sql_print_error("Failed to read entry = %u from ddl log",
read_entry);
break;
}
DBUG_ASSERT(ddl_log_entry.entry_type == DDL_LOG_ENTRY_CODE ||
ddl_log_entry.entry_type == DDL_IGNORE_LOG_ENTRY_CODE);
if (execute_ddl_log_action(&ddl_log_entry))
if (execute_ddl_log_action(thd, &ddl_log_entry))
{
DBUG_ASSERT(0);
/* Write to error log and continue with next log entry */
sql_print_error("Failed to execute action for entry = %u from ddl log",
read_entry);
break;
}
read_entry= ddl_log_entry.next_entry;
......@@ -1011,29 +1049,40 @@ void execute_ddl_log_recovery()
DDL_LOG_ENTRY ddl_log_entry;
DBUG_ENTER("execute_ddl_log_recovery");
/*
To be able to run this from boot, we allocate a temporary THD
*/
if (!(thd=new THD))
DBUG_VOID_RETURN;
thd->thread_stack= (char*) &thd;
thd->store_globals();
num_entries= read_ddl_log_header();
for (i= 0; i < num_entries; i++)
{
if (read_ddl_log_entry(i, &ddl_log_entry))
{
DBUG_ASSERT(0);
/* Write to error log */
break;
sql_print_error("Failed to read entry no = %u from ddl log",
i);
continue;
}
if (ddl_log_entry.entry_type == DDL_LOG_EXECUTE_CODE)
{
if (execute_ddl_log_entry(ddl_log_entry.next_entry))
{
/*
Currently errors are either crashing or ignored so we should
never end up here
*/
abort();
DBUG_VOID_RETURN;
/* Real unpleasant scenario but we continue anyways. */
DBUG_ASSERT(0);
continue;
}
}
}
VOID(init_ddl_log());
create_ddl_log_file_name(file_name);
VOID(my_delete(file_name, MYF(0)));
global_ddl_log.recovery_phase= FALSE;
delete thd;
/* Remember that we don't have a THD */
my_pthread_setspecific_ptr(THR_THD, 0);
DBUG_VOID_RETURN;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment