WL 2826: Error handling of ALTER TABLE for partitioning

Make lots of fixes to handle the most complex case of reorganising
of partitions where two-phased processes are needed in some cases.
parent abe4952f
......@@ -577,7 +577,6 @@ int ha_partition::create(const char *name, TABLE *table_arg,
int ha_partition::drop_partitions(const char *path)
{
List_iterator<partition_element> part_it(m_part_info->partitions);
List_iterator<partition_element> temp_it(m_part_info->temp_partitions);
char part_name_buff[FN_REFLEN];
uint no_parts= m_part_info->partitions.elements;
uint part_count= 0;
......@@ -585,37 +584,18 @@ int ha_partition::drop_partitions(const char *path)
uint i= 0;
uint name_variant;
int error= 0;
bool reorged_parts= (m_reorged_parts > 0);
bool temp_partitions= (m_part_info->temp_partitions.elements > 0);
DBUG_ENTER("ha_partition::drop_partitions");
if (temp_partitions)
no_parts= m_part_info->temp_partitions.elements;
do
{
partition_element *part_elem;
if (temp_partitions)
{
/*
We need to remove the reorganised partitions that were put in the
temp_partitions-list.
*/
part_elem= temp_it++;
DBUG_ASSERT(part_elem->part_state == PART_TO_BE_DROPPED);
}
else
part_elem= part_it++;
if (part_elem->part_state == PART_TO_BE_DROPPED ||
part_elem->part_state == PART_IS_CHANGED)
partition_element *part_elem= part_it++;
if (part_elem->part_state == PART_TO_BE_DROPPED)
{
handler *file;
/*
This part is to be dropped, meaning the part or all its subparts.
*/
name_variant= NORMAL_PART_NAME;
if (part_elem->part_state == PART_IS_CHANGED ||
(part_elem->part_state == PART_TO_BE_DROPPED && temp_partitions))
name_variant= RENAMED_PART_NAME;
if (m_is_sub_partitioned)
{
List_iterator<partition_element> sub_it(part_elem->subpartitions);
......@@ -627,10 +607,7 @@ int ha_partition::drop_partitions(const char *path)
create_subpartition_name(part_name_buff, path,
part_elem->partition_name,
sub_elem->partition_name, name_variant);
if (reorged_parts)
file= m_reorged_file[part_count++];
else
file= m_file[part];
file= m_file[part];
DBUG_PRINT("info", ("Drop subpartition %s", part_name_buff));
error+= file->delete_table((const char *) part_name_buff);
} while (++j < no_subparts);
......@@ -640,10 +617,7 @@ int ha_partition::drop_partitions(const char *path)
create_partition_name(part_name_buff, path,
part_elem->partition_name, name_variant,
TRUE);
if (reorged_parts)
file= m_reorged_file[part_count++];
else
file= m_file[i];
file= m_file[i];
DBUG_PRINT("info", ("Drop partition %s", part_name_buff));
error+= file->delete_table((const char *) part_name_buff);
}
......@@ -695,6 +669,14 @@ int ha_partition::rename_partitions(const char *path)
if (temp_partitions)
{
/*
These are the reorganised partitions that have already been copied.
We delete the partitions and log the delete by inactivating the
delete log entry in the table log. We only need to synchronise
these writes before moving to the next loop since there is no
interaction among reorganised partitions, they cannot have the
same name.
*/
do
{
part_elem= temp_it++;
......@@ -705,39 +687,57 @@ int ha_partition::rename_partitions(const char *path)
{
sub_elem= sub_it++;
file= m_reorged_file[part_count++];
create_subpartition_name(part_name_buff, path,
part_elem->partition_name,
sub_elem->partition_name,
RENAMED_PART_NAME);
create_subpartition_name(norm_name_buff, path,
part_elem->partition_name,
sub_elem->partition_name,
NORMAL_PART_NAME);
DBUG_PRINT("info", ("Rename subpartition from %s to %s",
norm_name_buff, part_name_buff));
error+= file->rename_table((const char *) norm_name_buff,
(const char *) part_name_buff);
DBUG_PRINT("info", ("Delete subpartition %s", norm_name_buff));
if (file->delete_table((const char *) norm_name_buff) ||
inactivate_table_log_entry(sub_elem->log_entry->entry_pos))
error= 1;
else
sub_elem->log_entry= NULL; /* Indicate success */
} while (++j < no_subparts);
}
else
{
file= m_reorged_file[part_count++];
create_partition_name(part_name_buff, path,
part_elem->partition_name, RENAMED_PART_NAME,
TRUE);
create_partition_name(norm_name_buff, path,
part_elem->partition_name, NORMAL_PART_NAME,
TRUE);
DBUG_PRINT("info", ("Rename partition from %s to %s",
norm_name_buff, part_name_buff));
error+= file->rename_table((const char *) norm_name_buff,
(const char *) part_name_buff);
DBUG_PRINT("info", ("Delete partition %s", norm_name_buff));
if (file->delete_table((const char *) norm_name_buff) ||
inactivate_table_log_entry(sub_elem->log_entry->entry_pos))
error= 1;
else
sub_elem->log_entry= NULL; /* Indicate success */
}
} while (++i < temp_partitions);
VOID(sync_table_log());
}
i= 0;
do
{
/*
When state is PART_IS_CHANGED it means that we have created a new
TEMP partition that is to be renamed to normal partition name and
we are to delete the old partition with currently the normal name.
We perform this operation by
1) Delete old partition with normal partition name
2) Signal this in table log entry
3) Synch table log to ensure we have consistency in crashes
4) Rename temporary partition name to normal partition name
5) Signal this to table log entry
It is not necessary to synch the last state since a new rename
should not corrupt things if there was no temporary partition.
The only other parts we need to cater for are new parts that
replace reorganised parts. The reorganised parts were deleted
by the code above that goes through the temp_partitions list.
Thus the synch above makes it safe to simply perform step 4 and 5
for those entries.
*/
part_elem= part_it++;
if (part_elem->part_state == PART_IS_CHANGED ||
(part_elem->part_state == PART_IS_ADDED && temp_partitions))
......@@ -759,14 +759,11 @@ int ha_partition::rename_partitions(const char *path)
if (part_elem->part_state == PART_IS_CHANGED)
{
file= m_reorged_file[part_count++];
create_subpartition_name(part_name_buff, path,
part_elem->partition_name,
sub_elem->partition_name,
RENAMED_PART_NAME);
DBUG_PRINT("info", ("Rename subpartition from %s to %s",
norm_name_buff, part_name_buff));
error+= file->rename_table((const char *) norm_name_buff,
(const char *) part_name_buff);
DBUG_PRINT("info", ("Delete subpartition %s", norm_name_buff));
if (file->delete_table((const char *) norm_name_buff) ||
inactivate_table_log_entry(sub_elem->log_entry->entry_pos))
error= 1;
VOID(synch_table_log());
}
file= m_new_file[part];
create_subpartition_name(part_name_buff, path,
......@@ -775,8 +772,12 @@ int ha_partition::rename_partitions(const char *path)
TEMP_PART_NAME);
DBUG_PRINT("info", ("Rename subpartition from %s to %s",
part_name_buff, norm_name_buff));
error+= file->rename_table((const char *) part_name_buff,
(const char *) norm_name_buff);
if (file->rename_table((const char *) norm_name_buff,
(const char *) part_name_buff) ||
inactivate_table_log_entry(sub_elem->log_entry->entry_pos))
error= 1;
else
sub_elem->log_entry= NULL;
} while (++j < no_subparts);
}
else
......@@ -787,13 +788,11 @@ int ha_partition::rename_partitions(const char *path)
if (part_elem->part_state == PART_IS_CHANGED)
{
file= m_reorged_file[part_count++];
create_partition_name(part_name_buff, path,
part_elem->partition_name, RENAMED_PART_NAME,
TRUE);
DBUG_PRINT("info", ("Rename partition from %s to %s",
norm_name_buff, part_name_buff));
error+= file->rename_table((const char *) norm_name_buff,
(const char *) part_name_buff);
DBUG_PRINT("info", ("Delete subpartition %s", norm_name_buff));
if (file->delete_table((const char *) norm_name_buff) ||
inactivate_table_log_entry(part_elem->log_entry->entry_pos))
error= 1;
VOID(synch_table_log());
}
file= m_new_file[i];
create_partition_name(part_name_buff, path,
......@@ -801,11 +800,16 @@ int ha_partition::rename_partitions(const char *path)
TRUE);
DBUG_PRINT("info", ("Rename partition from %s to %s",
part_name_buff, norm_name_buff));
error+= file->rename_table((const char *) part_name_buff,
(const char *) norm_name_buff);
if (file->rename_table((const char *) norm_name_buff,
(const char *) part_name_buff) ||
inactivate_table_log_entry(part_elem->log_entry->entry_pos))
error= 1;
else
part_elem->log_entry= NULL;
}
}
} while (++i < no_parts);
VOID(synch_table_log());
DBUG_RETURN(error);
}
......
......@@ -667,6 +667,7 @@ class partition_element :public Sql_alloc {
ulonglong part_min_rows;
char *partition_name;
char *tablespace_name;
TABLE_LOG_MEMORY_ENTRY *log_entry;
longlong range_value;
char* part_comment;
char* data_file_name;
......@@ -677,7 +678,8 @@ class partition_element :public Sql_alloc {
partition_element()
: part_max_rows(0), part_min_rows(0), partition_name(NULL),
tablespace_name(NULL), range_value(0), part_comment(NULL),
tablespace_name(NULL), log_entry(0),
range_value(0), part_comment(NULL),
data_file_name(NULL), index_file_name(NULL),
engine_type(NULL),part_state(PART_NORMAL),
nodegroup_id(UNDEF_NODEGROUP)
......
......@@ -1177,6 +1177,8 @@ typedef struct st_table_log_entry
uint next_entry;
char action_type;
char entry_type;
char phase;
char not_used;
} TABLE_LOG_ENTRY;
typedef struct st_table_log_memory_entry
......@@ -1187,11 +1189,22 @@ typedef struct st_table_log_memory_entry
struct st_table_log_memory_entry *next_active_log_entry;
} TABLE_LOG_MEMORY_ENTRY;
#define TLOG_EXECUTE_CODE 'e'
#define TLOG_LOG_ENTRY_CODE 'l'
#define TLOG_IGNORE_LOG_ENTRY_CODE 'i'
#define TLOG_DELETE_ACTION_CODE 'd'
#define TLOG_RENAME_ACTION_CODE 'r'
#define TLOG_REPLACE_ACTION_CODE 's'
#define TLOG_HANDLER_TYPE_LEN 32
bool write_table_log_entry(TABLE_LOG_ENTRY *table_log_entry,
TABLE_LOG_MEMORY_ENTRY **active_entry);
bool write_execute_table_log_entry(uint first_entry,
bool complete,
TABLE_LOG_MEMORY_ENTRY **active_entry);
bool inactivate_table_log_entry(uint entry_no);
void release_table_log_memory_entry(TABLE_LOG_MEMORY_ENTRY *log_entry);
void release_table_log();
void execute_table_log_recovery();
......
......@@ -5135,9 +5135,9 @@ write_log_rename_delete_frm(ALTER_PARTITION_PARAM_TYPE *lpt,
DBUG_ENTER("write_log_rename_frm");
if (rename_flag)
table_log_entry.action_type= 'r';
table_log_entry.action_type= TLOG_RENAME_ACTION_CODE;
else
table_log_entry.action_type= 'd';
table_log_entry.action_type= TLOG_DELETE_ACTION_CODE;
table_log_entry.next_entry= next_entry;
table_log_entry.handler_type= "frm";
table_log_entry.name= to_path;
......@@ -5160,6 +5160,18 @@ write_log_rename_delete_frm(ALTER_PARTITION_PARAM_TYPE *lpt,
RETURN VALUES
TRUE Error
FALSE Success
DESCRIPTION
This code is used to perform safe ADD PARTITION for HASH partitions
and COALESCE for HASH partitions and REORGANIZE for any type of
partitions.
We prepare entries for all partitions except the reorganised partitions
in REORGANIZE partition, those are handled by
write_log_dropped_partitions. For those partitions that are replaced
special care is needed to ensure that this is performed correctly and
this requires a two-phased approach with this log as a helper for this.
This code is closely intertwined with the code in rename_partitions in
the partition handler.
*/
static
......@@ -5167,7 +5179,84 @@ bool
write_log_changed_partitions(ALTER_PARTITION_PARAM_TYPE *lpt,
uint *next_entry, const char *path)
{
TABLE_LOG_ENTRY table_log_entry;
partition_info *part_info= lpt->part_info;
TABLE_LOG_MEMORY_ENTRY *log_entry;
char tmp_path[FN_LEN];
char normal_path[FN_LEN];
List_iterator<partition_element> part_it(part_info->partitions);
uint temp_partitions= part_info->temp_partitions.elements;
uint no_elements= part_info->partitions.elements;
uint i= 0;
DBUG_ENTER("write_log_changed_partitions");
do
{
partition_element *part_elem= part_it++;
if (part_elem->part_state == PART_IS_CHANGED ||
(part_elem->part_state == PART_IS_ADDED && temp_partitions))
{
if (is_sub_partitioned(part_info))
{
List_iterator<partition_element> sub_it(part_elem->subpartitions);
uint no_subparts= part_info->no_subparts;
uint j= 0;
do
{
partition_element *sub_elem= sub_it++;
table_log_entry.next_entry= *next_entry;
table_log_entry.handler_type=
ha_resolve_storage_engine_name(sub_elem->engine_type);
create_subpartition_name(tmp_path, path,
part_elem->partition_name,
sub_elem->partition_name,
TEMP_PART_NAME);
create_subpartition_name(normal_path, path,
part_elem->partition_name,
sub_elem->partition_name,
NORMAL_PART_NAME);
table_log_entry.name= norm_path;
table_log_entry.from_name= tmp_path;
if (part_elem->part_state == PART_IS_CHANGED)
table_log_entry->action_type= TLOG_REPLACE_ACTION_CODE;
else
table_log_entry->action_type= TLOG_RENAME_ACTION_CODE;
if (write_table_log_entry(&table_log_entry, &log_entry))
{
DBUG_RETURN(TRUE);
}
*next_entry= log_entry->entry_pos;
sub_elem->log_entry= log_entry;
insert_part_info_log_entry_list(part_info, log_entry);
} while (++j < no_subparts);
}
else
{
table_log_entry.next_entry= *next_entry;
table_log_entry.handler_type=
ha_resolve_storage_engine_name(part_elem->engine_type);
create_partition_name(tmp_path, path,
part_elem->partition_name,
TEMP_PART_NAME, TRUE);
create_partition_name(normal_path, path,
part_elem->partition_name,
NORMAL_PART_NAME, TRUE);
table_log_entry.name= normal_path;
table_log_entry.from_name= tmp_path;
if (part_elem->part_state == PART_IS_CHANGED)
table_log_entry->action_type= TLOG_REPLACE_ACTION_CODE;
else
table_log_entry->action_type= TLOG_RENAME_ACTION_CODE;
if (write_table_log_entry(&table_log_entry, &log_entry))
{
DBUG_RETURN(TRUE);
}
*next_entry= log_entry->entry_pos;
part_elem->table_log_entry= log_entry;
insert_part_info_log_entry_list(part_info, log_entry);
}
}
} while (++i < no_elements)
DBUG_RETURN(FALSE);
}
......@@ -5200,7 +5289,7 @@ write_log_dropped_partitions(ALTER_PARTITION_PARAM_TYPE *lpt,
uint i= 0;
DBUG_ENTER("write_log_dropped_partitions");
table_log_entry.action_type= 'd';
table_log_entry.action_type= TLOG_DELETE_ACTION_CODE;
if (temp_list)
no_elements= no_temp_partitions;
while (no_elements--)
......@@ -5242,6 +5331,8 @@ write_log_dropped_partitions(ALTER_PARTITION_PARAM_TYPE *lpt,
DBUG_RETURN(TRUE);
}
*next_entry= log_entry->entry_pos;
if (temp_list)
sub_elem->table_log_entry= log_entry;
insert_part_info_log_entry_list(part_info, log_entry);
} while (++j < no_subparts);
}
......@@ -5259,6 +5350,8 @@ write_log_dropped_partitions(ALTER_PARTITION_PARAM_TYPE *lpt,
DBUG_RETURN(TRUE);
}
*next_entry= log_entry->entry_pos;
if (temp_list)
part_elem->table_log_entry= log_entry;
insert_part_info_log_entry_list(part_info, log_entry);
}
}
......@@ -5961,10 +6054,8 @@ uint fast_alter_partition_table(THD *thd, TABLE *table,
ERROR_INJECT_CRASH("crash_change_partition_8") ||
(close_open_tables_and_downgrade(lpt), FALSE) ||
ERROR_INJECT_CRASH("crash_change_partition_9") ||
mysql_drop_partitions(lpt) ||
ERROR_INJECT_CRASH("crash_change_partition_10") ||
write_log_completed(lpt) ||
ERROR_INJECT_CRASH("crash_change_partition_11") ||
ERROR_INJECT_CRASH("crash_change_partition_10") ||
(mysql_wait_completed_table(lpt, table), FALSE))
{
abort();
......
......@@ -289,28 +289,38 @@ GLOBAL_TABLE_LOG global_table_log;
pthread_mutex_t LOCK_gtl;
#define TLOG_ENTRY_TYPE_POS 0
#define TLOG_ACTION_TYPE_POS 1
#define TLOG_PHASE_POS 2
#define TLOG_NEXT_ENTRY_POS 4
#define TLOG_NO_ENTRY_POS 0
#define TLOG_NAME_LEN_POS 4
#define TLOG_HANDLER_TYPE_POS 8
#define TLOG_IO_SIZE_POS 12
/*
Sync table log file
Read one entry from table log file
SYNOPSIS
sync_table_log()
read_table_log_file_entry()
entry_no Entry number to read
RETURN VALUES
TRUE Error
FALSE Success
TRUE Error
FALSE Success
*/
static
bool
sync_table_log()
read_table_log_file_entry(uint entry_no)
{
bool error= FALSE;
DBUG_ENTER("sync_table_log");
File file_id= global_table_log.file_id;
char *file_entry= (char*)global_table_log.file_entry;
uint io_size= global_table_log.io_size;
DBUG_ENTER("read_table_log_file_entry");
if (my_sync(global_table_log.file_id, MYF(0)))
{
/* Write to error log */
if (my_pread(file_id, file_entry, io_size, io_size * entry_no, MYF(0)))
error= TRUE;
}
DBUG_RETURN(error);
}
......@@ -358,13 +368,17 @@ write_table_log_header()
bool error= FALSE;
DBUG_ENTER("write_table_log_header");
int4store(&global_table_log.file_entry[0], global_table_log.no_entries);
int4store(&global_table_log.file_entry[TLOG_NO_ENTRY_POS],
global_table_log.no_entries);
const_var= FN_LEN;
int2store(&global_table_log.file_entry[4], const_var);
const_var= 32;
int2store(&global_table_log.file_entry[6], const_var);
int4store(&global_table_log.file_entry[TLOG_NAME_LEN_POS],
const_var);
const_var= TLOG_HANDLER_TYPE_LEN;
int4store(&global_table_log.file_entry[TLOG_HANDLER_TYPE_POS],
const_var);
const_var= IO_SIZE;
int4store(&global_table_log.file_entry[8], const_var);
int4store(&global_table_log.file_entry[TLOG_IO_SIZE_POS],
const_var);
if (write_table_log_file_entry(0UL))
error= TRUE;
if (!error)
......@@ -373,32 +387,6 @@ write_table_log_header()
}
/*
Read one entry from table log file
SYNOPSIS
read_table_log_file_entry()
entry_no Entry number to read
RETURN VALUES
TRUE Error
FALSE Success
*/
static
bool
read_table_log_file_entry(uint entry_no)
{
bool error= FALSE;
File file_id= global_table_log.file_id;
char *file_entry= (char*)global_table_log.file_entry;
uint io_size= global_table_log.io_size;
DBUG_ENTER("read_table_log_file_entry");
if (my_pread(file_id, file_entry, io_size, io_size * entry_no, MYF(0)))
error= TRUE;
DBUG_RETURN(error);
}
/*
Create table log file name
SYNOPSIS
......@@ -450,11 +438,12 @@ read_table_log_header()
else
successful_open= TRUE;
}
entry_no= uint4korr(&file_entry[0]);
global_table_log.name_len= uint2korr(&file_entry[4]);
global_table_log.handler_type_len= uint2korr(&file_entry[6]);
entry_no= uint4korr(&file_entry[TLOG_NO_ENTRY_POS]);
global_table_log.name_len= uint4korr(&file_entry[TLOG_NAME_LEN_POS]);
global_table_log.handler_type_len=
uint4korr(&file_entry[TLOG_HANDLER_TYPE_POS]);
if (successful_open)
global_table_log.io_size= uint4korr(&file_entry[8]);
global_table_log.io_size= uint4korr(&file_entry[TLOG_IO_SIZE_POS]);
global_table_log.first_free= NULL;
global_table_log.first_used= NULL;
global_table_log.no_entries= 0;
......@@ -488,11 +477,12 @@ read_table_log_entry(uint read_entry, TABLE_LOG_ENTRY *table_log_entry)
/* Error handling */
DBUG_RETURN(TRUE);
}
table_log_entry->entry_type= file_entry[0];
table_log_entry->action_type= file_entry[1];
table_log_entry->next_entry= uint4korr(&file_entry[2]);
table_log_entry->name= &file_entry[6];
inx= 6 + global_table_log.name_len;
table_log_entry->entry_type= file_entry[TLOG_ENTRY_TYPE_POS];
table_log_entry->action_type= file_entry[TLOG_ACTION_TYPE_POS];
table_log_entry->phase= file_entry[TLOG_PHASE_POS];
table_log_entry->next_entry= uint4korr(&file_entry[TLOG_NEXT_ENTRY_POS]);
table_log_entry->name= &file_entry[TLOG_NAME_POS];
inx= TLOG_NAME_POS + global_table_log.name_len;
table_log_entry->from_name= &file_entry[inx];
inx+= global_table_log.name_len;
table_log_entry->handler_type= &file_entry[inx];
......@@ -637,23 +627,27 @@ write_table_log_entry(TABLE_LOG_ENTRY *table_log_entry,
bool error, write_header;
DBUG_ENTER("write_table_log_entry");
global_table_log.file_entry[0]= 'i';
global_table_log.file_entry[1]= table_log_entry->action_type;
int4store(&global_table_log.file_entry[2],
global_table_log.file_entry[TLOG_ENTRY_TYPE_POS]= TLOG_LOG_ENTRY_CODE;
global_table_log.file_entry[TLOG_ACTION_TYPE_POS]=
table_log_entry->action_type;
global_table_log.file_entry[TLOG_PHASE_POS]= 0;
int4store(&global_table_log.file_entry[TLOG_NEXT_ENTRY_POS],
table_log_entry->next_entry);
DBUG_ASSERT(strlen(table_log_entry->name) < FN_LEN);
strncpy(&global_table_log.file_entry[6], table_log_entry->name, FN_LEN);
if (table_log_entry->action_type == 'r')
strncpy(&global_table_log.file_entry[TLOG_NAME_POS],
table_log_entry->name, FN_LEN);
if (table_log_entry->action_type == TLOG_RENAME_ACTION_CODE ||
table_log_entry->action_type == TLOG_REPLACE_ACTION_CODE)
{
DBUG_ASSERT(strlen(table_log_entry->from_name) < FN_LEN);
strncpy(&global_table_log.file_entry[6 + FN_LEN],
strncpy(&global_table_log.file_entry[TLOG_NAME_POS + FN_LEN],
table_log_entry->from_name, FN_LEN);
}
else
global_table_log.file_entry[6 + FN_LEN]= 0;
global_table_log.file_entry[TLOG_NAME_POS + FN_LEN]= 0;
DBUG_ASSERT(strlen(table_log_entry->handler_type) < FN_LEN);
strncpy(&global_table_log.file_entry[6 + (2*FN_LEN)],
table_log_entry->handler_type, FN_LEN);
strncpy(&global_table_log.file_entry[TLOG_NAME_POS + (2*FN_LEN)],
table_log_entry->handler_type, FN_LEN);
if (get_free_table_log_entry(active_entry, &write_header))
{
DBUG_RETURN(TRUE);
......@@ -705,15 +699,16 @@ write_execute_table_log_entry(uint first_entry,
if (!complete)
{
VOID(sync_table_log());
file_entry[0]= 'e';
file_entry[TLOG_ENTRY_TYPE_POS]= TLOG_EXECUTE_CODE;
}
else
file_entry[0]= 'i';
file_entry[1]= 0; /* Ignored for execute entries */
int4store(&file_entry[2], first_entry);
file_entry[6]= 0;
file_entry[6 + FN_LEN]= 0;
file_entry[6 + 2*FN_LEN]= 0;
file_entry[TLOG_ENTRY_TYPE_POS]= TLOG_IGNORE_LOG_ENTRY_CODE;
file_entry[TLOG_ACTION_TYPE_POS]= 0; /* Ignored for execute entries */
file_entry[TLOG_PHASE_POS]= 0;
int4store(&file_entry[TLOG_NEXT_ENTRY_POS], first_entry);
file_entry[TLOG_NAME_POS]= 0;
file_entry[TLOG_NAME_POS + FN_LEN]= 0;
file_entry[TLOG_NAME_POS + 2*FN_LEN]= 0;
if (!(*active_entry))
{
if (get_free_table_log_entry(active_entry, &write_header))
......@@ -739,6 +734,88 @@ write_execute_table_log_entry(uint first_entry,
}
/*
For complex rename operations we need to inactivate individual entries.
SYNOPSIS
inactivate_table_log_entry()
entry_no Entry position of record to change
RETURN VALUES
TRUE Error
FALSE Success
DESCRIPTION
During replace operations where we start with an existing table called
t1 and a replacement table called t1#temp or something else and where
we want to delete t1 and rename t1#temp to t1 this is not possible to
do in a safe manner unless the table log is informed of the phases in
the change.
Delete actions are 1-phase actions that can be ignored immediately after
being executed.
Rename actions from x to y is also a 1-phase action since there is no
interaction with any other handlers named x and y.
Replace action where drop y and x -> y happens needs to be a two-phase
action. Thus the first phase will drop y and the second phase will
rename x -> y.
*/
bool
inactivate_table_log_entry(uint entry_no)
{
bool error= TRUE;
const char *file_entry= (const char*)global_table_log.file_entry
DBUG_ENTER("inactivate_table_log_entry");
if (!read_table_log_file_entry(entry_no))
{
if (file_entry[TLOG_ENTRY_POS] == TLOG_LOG_ENTRY_CODE)
{
if (file_entry[TLOG_ACTION_TYPE_POS] == TLOG_DELETE_ACTION_CODE ||
file_entry[TLOG_ACTION_TYPE_POS] == TLOG_RENAME_ACTION_CODE ||
(file_entry[TLOG_ACTION_TYPE_POS] == TLOG_REPLACE_ACTION_CODE &&
file_entry[TLOG_PHASE_POS] == 1))
file_entry[TLOG_ENTRY_TYPE_POS]= TLOG_IGNORE_LOG_ENTRY_POS;
else if (file_entry[TLOG_ACTION_TYPE_POS] == TLOG_REPLACE_ACTION_CODE)
{
DBUG_ASSERT(file_entry[TLOG_PHASE_POS] == 0);
file_entry[TLOG_PHASE_POS]= 1;
}
else
{
DBUG_ASSERT(0);
}
if (!write_table_log_file_entry(entry_no))
error= FALSE;
}
}
DBUG_RETURN(error);
}
/*
Sync table log file
SYNOPSIS
sync_table_log()
RETURN VALUES
TRUE Error
FALSE Success
*/
static
bool
sync_table_log()
{
bool error= FALSE;
DBUG_ENTER("sync_table_log");
if (my_sync(global_table_log.file_id, MYF(0)))
{
/* Write to error log */
error= TRUE;
}
DBUG_RETURN(error);
}
/*
Release a log memory entry
SYNOPSIS
......@@ -795,7 +872,8 @@ execute_table_log_entry(uint first_entry)
/* Write to error log and continue with next log entry */
break;
}
DBUG_ASSERT(table_log_entry.entry_type == 'i');
DBUG_ASSERT(table_log_entry.entry_type == TLOG_LOG_ENTRY_CODE ||
table_log_entry.entry_type == TLOG_IGNORE_LOG_ENTRY_CODE);
if (execute_table_log_action(&table_log_entry))
{
DBUG_ASSERT(0);
......@@ -831,7 +909,7 @@ execute_table_log_recovery()
/* Write to error log */
break;
}
if (table_log_entry.entry_type == 'e')
if (table_log_entry.entry_type == TLOG_EXECUTE_CODE)
{
if (execute_table_log_entry(table_log_entry.next_entry))
{
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment