WL 2826: Error handling for ALTER TABLE for partitioning

Step 14: First version of table log for add/Drop partition
parent c12c0952
...@@ -391,88 +391,6 @@ int ha_partition::ha_initialise() ...@@ -391,88 +391,6 @@ int ha_partition::ha_initialise()
/**************************************************************************** /****************************************************************************
MODULE meta data changes MODULE meta data changes
****************************************************************************/ ****************************************************************************/
/*
Create partition names
SYNOPSIS
create_partition_name()
out:out Created partition name string
in1 First part
in2 Second part
name_variant Normal, temporary or renamed partition name
RETURN VALUE
NONE
DESCRIPTION
This method is used to calculate the partition name, service routine to
the del_ren_cre_table method.
*/
#define NORMAL_PART_NAME 0
#define TEMP_PART_NAME 1
#define RENAMED_PART_NAME 2
static void create_partition_name(char *out, const char *in1,
const char *in2, uint name_variant,
bool translate)
{
char transl_part_name[FN_REFLEN];
const char *transl_part;
if (translate)
{
tablename_to_filename(in2, transl_part_name, FN_REFLEN);
transl_part= transl_part_name;
}
else
transl_part= in2;
if (name_variant == NORMAL_PART_NAME)
strxmov(out, in1, "#P#", transl_part, NullS);
else if (name_variant == TEMP_PART_NAME)
strxmov(out, in1, "#P#", transl_part, "#TMP#", NullS);
else if (name_variant == RENAMED_PART_NAME)
strxmov(out, in1, "#P#", transl_part, "#REN#", NullS);
}
/*
Create subpartition name
SYNOPSIS
create_subpartition_name()
out:out Created partition name string
in1 First part
in2 Second part
in3 Third part
name_variant Normal, temporary or renamed partition name
RETURN VALUE
NONE
DESCRIPTION
This method is used to calculate the subpartition name, service routine to
the del_ren_cre_table method.
*/
static void create_subpartition_name(char *out, const char *in1,
const char *in2, const char *in3,
uint name_variant)
{
char transl_part_name[FN_REFLEN], transl_subpart_name[FN_REFLEN];
tablename_to_filename(in2, transl_part_name, FN_REFLEN);
tablename_to_filename(in3, transl_subpart_name, FN_REFLEN);
if (name_variant == NORMAL_PART_NAME)
strxmov(out, in1, "#P#", transl_part_name,
"#SP#", transl_subpart_name, NullS);
else if (name_variant == TEMP_PART_NAME)
strxmov(out, in1, "#P#", transl_part_name,
"#SP#", transl_subpart_name, "#TMP#", NullS);
else if (name_variant == RENAMED_PART_NAME)
strxmov(out, in1, "#P#", transl_part_name,
"#SP#", transl_subpart_name, "#REN#", NullS);
}
/* /*
Delete a table Delete a table
......
...@@ -799,6 +799,8 @@ typedef int (*get_partitions_in_range_iter)(partition_info *part_info, ...@@ -799,6 +799,8 @@ typedef int (*get_partitions_in_range_iter)(partition_info *part_info,
PARTITION_ITERATOR *part_iter); PARTITION_ITERATOR *part_iter);
struct TABLE_LOG_MEMORY_ENTRY;
class partition_info : public Sql_alloc class partition_info : public Sql_alloc
{ {
public: public:
...@@ -845,7 +847,9 @@ public: ...@@ -845,7 +847,9 @@ public:
Item *subpart_expr; Item *subpart_expr;
Item *item_free_list; Item *item_free_list;
TABLE_LOG_MEMORY_ENTRY *first_log_entry;
TABLE_LOG_MEMORY_ENTRY *exec_log_entry;
/* /*
A bitmap of partitions used by the current query. A bitmap of partitions used by the current query.
Usage pattern: Usage pattern:
......
...@@ -1135,6 +1135,16 @@ uint prep_alter_part_table(THD *thd, TABLE *table, ALTER_INFO *alter_info, ...@@ -1135,6 +1135,16 @@ uint prep_alter_part_table(THD *thd, TABLE *table, ALTER_INFO *alter_info,
bool remove_table_from_cache(THD *thd, const char *db, const char *table, bool remove_table_from_cache(THD *thd, const char *db, const char *table,
uint flags); uint flags);
#define NORMAL_PART_NAME 0
#define TEMP_PART_NAME 1
#define RENAMED_PART_NAME 2
void create_partition_name(char *out, const char *in1,
const char *in2, uint name_variant,
bool translate);
void create_subpartition_name(char *out, const char *in1,
const char *in2, const char *in3,
uint name_variant);
typedef struct st_lock_param_type typedef struct st_lock_param_type
{ {
ulonglong copied; ulonglong copied;
......
...@@ -5063,6 +5063,50 @@ static bool mysql_drop_partitions(ALTER_PARTITION_PARAM_TYPE *lpt) ...@@ -5063,6 +5063,50 @@ static bool mysql_drop_partitions(ALTER_PARTITION_PARAM_TYPE *lpt)
} }
/*
Insert log entry into list
SYNOPSIS
insert_part_info_log_entry_list()
log_entry
RETURN VALUES
NONE
*/
static
void
insert_part_info_log_entry_list(partition_info *part_info,
TABLE_LOG_MEMORY_ENTRY *log_entry)
{
log_entry->next_active_log_entry= part_info->first_log_entry;
part_info->first_log_entry= log_entry;
}
/*
Release all log entries for this partition info struct
SYNOPSIS
release_part_info_log_entries()
first_log_entry First log entry in list to release
RETURN VALUES
NONE
*/
static
void
release_part_info_log_entries(TABLE_LOG_MEMORY_ENTRY *log_entry)
{
DBUG_ENTER("release_part_info_log_entries");
while (log_entry)
{
release_table_log_memory_entry(log_entry);
log_entry= log_entry->next_log_entry;
}
part_info->first_log_entry= NULL;
DBUG_VOID_RETURN;
}
/* /*
Write the log entry to ensure that the shadow frm file is removed at Write the log entry to ensure that the shadow frm file is removed at
crash. crash.
...@@ -5082,11 +5126,111 @@ static bool mysql_drop_partitions(ALTER_PARTITION_PARAM_TYPE *lpt) ...@@ -5082,11 +5126,111 @@ static bool mysql_drop_partitions(ALTER_PARTITION_PARAM_TYPE *lpt)
bool bool
write_log_shadow_frm(ALTER_PARTITION_PARAM_TYPE *lpt, bool install_frm) write_log_shadow_frm(ALTER_PARTITION_PARAM_TYPE *lpt, bool install_frm)
{ {
TABLE_LOG_ENTRY table_log_entry;
partition_info *part_info= lpt->part_info;
TABLE_LOG_MEMORY_ENTRY *log_entry;
char shadow_path[FN_LEN];
DBUG_ENTER("write_log_shadow_frm"); DBUG_ENTER("write_log_shadow_frm");
lock_global_table_log(); lock_global_table_log();
do
{
build_table_filename(shadow_path, sizeof(shadow_path), lpt->db,
lpt->table_name, "#");
table_log_entry.action_type= 'd';
table_log_entry.next_entry= 0;
table_log_entry.handler_type= "frm";
table_log_entry.name= shadow_path;
if (write_table_log_entry(&table_log_entry, &log_entry))
break;
insert_part_info_log_entry_list(part_info, log_entry);
if (write_execute_table_log_entry(log_entry->entry_pos, &log_entry))
break;
part_info->exec_log_entry= log_entry;
unlock_global_table_log();
DBUG_RETURN(FALSE);
} while (TRUE);
release_part_info_log_entries(part_info->first_log_entry);
unlock_global_table_log(); unlock_global_table_log();
DBUG_RETURN(TRUE);
}
/*
Log dropped partitions
SYNOPSIS
write_log_dropped_partitions()
lpt Struct containing parameters
RETURN VALUES
TRUE Error
FALSE Success
*/
static
bool
write_log_dropped_partitions(ALTER_PARTITION_PARAM_TYPE *lpt,
uint *next_entry,
const char *path)
{
TABLE_LOG_ENTRY table_log_entry;
partition_info *part_info= lpt->part_info;
TABLE_LOG_MEMORY_ENTRY *log_entry;
char tmp_path[FN_LEN];
List_iterator<partition_element> part_it(part_info->partitions);
uint no_elements= part_info->partitions.elements;
DBUG_ENTER("write_log_dropped_partitions");
table_log_entry.action_type= 'd';
do
{
partition_element part_elem= part_it++;
if (part_elem->part_state == PART_TO_BE_DROPPED ||
part_elem->part_state == PART_TO_BE_ADDED)
{
if (is_sub_partitioned(part_info))
{
List_iterator<partition_element> sub_it(part_elem->subpartitions);
uint no_subparts= part_info->no_subparts;
do
{
partition_element *sub_elem= sub_it++;
table_log_entry.next_entry= *next_entry;
table_log_entry.handler_type=
ha_resolve_storage_engine_name(sub_elem->engine_type);
create_subpartition_name(tmp_path, path,
part_elem->partition_name,
sub_elem->partition_name,
NORMAL_PART_NAME);
table_log_entry.name= tmp_path;
if (write_table_log_entry(&table_log_entry, &log_entry))
{
DBUG_RETURN(TRUE);
}
*next_entry= log_entry->entry_pos;
insert_part_info_log_entry_list(part_info, log_entry);
} while (++i < no_subparts);
}
else
{
table_log_entry.next_entry= *next_entry;
table_log_entry.handler_type=
ha_resolve_storage_engine_name(part_elem->engine_type);
create_partition_name(tmp_path, path,
part_elem->partition_name,
NORMAL_PART_NAME, TRUE);
table_log_entry.name= tmp_path;
if (write_table_log_entry(&table_log_entry, &log_entry))
{
DBUG_RETURN(TRUE);
}
*next_entry= log_entry->entry_pos;
insert_part_info_log_entry_list(part_info, log_entry);
}
}
} while (++i < no_elements);
DBUG_RETURN(FALSE); DBUG_RETURN(FALSE);
error:
} }
...@@ -5108,11 +5252,47 @@ write_log_shadow_frm(ALTER_PARTITION_PARAM_TYPE *lpt, bool install_frm) ...@@ -5108,11 +5252,47 @@ write_log_shadow_frm(ALTER_PARTITION_PARAM_TYPE *lpt, bool install_frm)
bool bool
write_log_drop_partition(ALTER_PARTITION_PARAM_TYPE *lpt) write_log_drop_partition(ALTER_PARTITION_PARAM_TYPE *lpt)
{ {
TABLE_LOG_ENTRY table_log_entry;
partition_info *part_info= lpt->part_info;
TABLE_LOG_MEMORY_ENTRY *log_entry;
char tmp_path[FN_LEN];
char path[FN_LEN];
uint next_entry= 0;
TABLE_LOG_MEMORY_ENTRY *old_log_entry= part_info->first_log_entry;
DBUG_ENTER("write_log_drop_partition"); DBUG_ENTER("write_log_drop_partition");
part_info->first_log_entry= NULL;
build_table_filename(path, sizeof(path), lpt->db,
lpt->table_name, "");
lock_global_table_log(); lock_global_table_log();
do
{
if (write_log_dropped_partitions(lpt, &next_entry, (const char*)path))
break;
/*
At first we write an entry that installs the new frm file
*/
build_table_filename(tmp_path, sizeof(tmp_path), lpt->db,
lpt->table_name, "#");
table_log_entry.action_type= 'r';
table_log_entry.next_entry= next_entry;
table_log_entry.handler_type= "frm";
table_log_entry.name= path;
table_log_entry.from_name= tmp_path;
if (write_table_log_entry(&table_log_entry, &log_entry))
break;
insert_part_info_log_entry_list(part_info, log_entry);
log_entry= part_info->exec_log_entry;
if (write_execute_table_log_entry(log_entry->entry_pos, NULL))
break;
release_part_info_log_entries(old_first_log_entry);
unlock_global_table_log();
DBUG_RETURN(FALSE);
} while (TRUE);
release_part_info_log_entries(part_info->first_log_entry);
part_info->first_log_entry= old_log_entry;
unlock_global_table_log(); unlock_global_table_log();
DBUG_RETURN(FALSE); DBUG_RETURN(TRUE);
} }
...@@ -5129,16 +5309,48 @@ write_log_drop_partition(ALTER_PARTITION_PARAM_TYPE *lpt) ...@@ -5129,16 +5309,48 @@ write_log_drop_partition(ALTER_PARTITION_PARAM_TYPE *lpt)
DESCRIPTION DESCRIPTION
Prepare entries to the table log indicating all partitions to drop and to Prepare entries to the table log indicating all partitions to drop and to
remove the shadow frm file. remove the shadow frm file.
The removal of the shadow frm file is already in the log file so we only
need to link the new entries to the existing and carefully ensure that
the new linked list has first the dropped partitions and then the
drop of the shadow frm file.
We always inject entries backwards in the list in the table log since we
don't know the entry position until we have written it.
*/ */
bool bool
write_log_add_partition(ALTER_PARTITION_PARAM_TYPE *lpt) write_log_add_partition(ALTER_PARTITION_PARAM_TYPE *lpt)
{ {
DBUG_ENTER("write_log_add_partition"); DBUG_ENTER("write_log_add_partition");
TABLE_LOG_ENTRY table_log_entry;
partition_info *part_info= lpt->part_info;
TABLE_LOG_MEMORY_ENTRY *log_entry;
char tmp_path[FN_LEN];
char path[FN_LEN];
TABLE_LOG_MEMORY_ENTRY *old_log_entry= part_info->first_log_entry;
uint next_entry= old_log_entry->entry_pos;
/* Ensure we linked the existing entries at the back */
DBUG_ENTER("write_log_add_partition");
part_info->first_log_entry= NULL;
build_table_filename(path, sizeof(path), lpt->db,
lpt->table_name, "");
lock_global_table_log(); lock_global_table_log();
do
{
if (write_log_dropped_partitions(lpt, &next_entry, (const char*)path))
break;
log_entry= part_info->first_log_entry;
/* Ensure first entry is the last dropped partition */
if (write_execute_table_log_entry(log_entry->entry_pos, NULL))
break;
release_part_info_log_entries(old_first_log_entry);
unlock_global_table_log();
DBUG_RETURN(FALSE);
} while (TRUE);
release_part_info_log_entries(part_info->first_log_entry);
part_info->first_log_entry= old_log_entry;
unlock_global_table_log(); unlock_global_table_log();
DBUG_RETURN(FALSE); DBUG_RETURN(TRUE);
} }
...@@ -6182,5 +6394,87 @@ static uint32 get_next_subpartition_via_walking(PARTITION_ITERATOR *part_iter) ...@@ -6182,5 +6394,87 @@ static uint32 get_next_subpartition_via_walking(PARTITION_ITERATOR *part_iter)
part_iter->field_vals.start++; part_iter->field_vals.start++;
return part_iter->part_info->get_subpartition_id(part_iter->part_info); return part_iter->part_info->get_subpartition_id(part_iter->part_info);
} }
/*
Create partition names
SYNOPSIS
create_partition_name()
out:out Created partition name string
in1 First part
in2 Second part
name_variant Normal, temporary or renamed partition name
RETURN VALUE
NONE
DESCRIPTION
This method is used to calculate the partition name, service routine to
the del_ren_cre_table method.
*/
void
create_partition_name(char *out, const char *in1,
const char *in2, uint name_variant,
bool translate)
{
char transl_part_name[FN_REFLEN];
const char *transl_part;
if (translate)
{
tablename_to_filename(in2, transl_part_name, FN_REFLEN);
transl_part= transl_part_name;
}
else
transl_part= in2;
if (name_variant == NORMAL_PART_NAME)
strxmov(out, in1, "#P#", transl_part, NullS);
else if (name_variant == TEMP_PART_NAME)
strxmov(out, in1, "#P#", transl_part, "#TMP#", NullS);
else if (name_variant == RENAMED_PART_NAME)
strxmov(out, in1, "#P#", transl_part, "#REN#", NullS);
}
/*
Create subpartition name
SYNOPSIS
create_subpartition_name()
out:out Created partition name string
in1 First part
in2 Second part
in3 Third part
name_variant Normal, temporary or renamed partition name
RETURN VALUE
NONE
DESCRIPTION
This method is used to calculate the subpartition name, service routine to
the del_ren_cre_table method.
*/
void
create_subpartition_name(char *out, const char *in1,
const char *in2, const char *in3,
uint name_variant)
{
char transl_part_name[FN_REFLEN], transl_subpart_name[FN_REFLEN];
tablename_to_filename(in2, transl_part_name, FN_REFLEN);
tablename_to_filename(in3, transl_subpart_name, FN_REFLEN);
if (name_variant == NORMAL_PART_NAME)
strxmov(out, in1, "#P#", transl_part_name,
"#SP#", transl_subpart_name, NullS);
else if (name_variant == TEMP_PART_NAME)
strxmov(out, in1, "#P#", transl_part_name,
"#SP#", transl_subpart_name, "#TMP#", NullS);
else if (name_variant == RENAMED_PART_NAME)
strxmov(out, in1, "#P#", transl_part_name,
"#SP#", transl_subpart_name, "#REN#", NullS);
}
#endif #endif
...@@ -282,6 +282,7 @@ typedef struct st_global_table_log ...@@ -282,6 +282,7 @@ typedef struct st_global_table_log
File file_id; File file_id;
uint name_len; uint name_len;
uint handler_type_len; uint handler_type_len;
uint io_size;
} GLOBAL_TABLE_LOG; } GLOBAL_TABLE_LOG;
GLOBAL_TABLE_LOG global_table_log; GLOBAL_TABLE_LOG global_table_log;
...@@ -361,6 +362,8 @@ write_table_log_header() ...@@ -361,6 +362,8 @@ write_table_log_header()
int2store(&global_table_log.file_entry[4], const_var); int2store(&global_table_log.file_entry[4], const_var);
const_var= 32; const_var= 32;
int2store(&global_table_log.file_entry[6], const_var); int2store(&global_table_log.file_entry[6], const_var);
const_var= IO_SIZE;
int4store(&global_table_log.file_entry[8], const_var);
if (write_table_log_file_entry(0UL)) if (write_table_log_file_entry(0UL))
error= TRUE; error= TRUE;
DBUG_RETURN(error); DBUG_RETURN(error);
...@@ -384,9 +387,10 @@ read_table_log_file_entry(uint entry_no) ...@@ -384,9 +387,10 @@ read_table_log_file_entry(uint entry_no)
bool error= FALSE; bool error= FALSE;
File file_id= global_table_log.file_id; File file_id= global_table_log.file_id;
char *file_entry= (char*)global_table_log.file_entry; char *file_entry= (char*)global_table_log.file_entry;
uint io_size= global_table_log.io_size;
DBUG_ENTER("read_table_log_file_entry"); DBUG_ENTER("read_table_log_file_entry");
if (my_pread(file_id, file_entry, IO_SIZE, IO_SIZE * entry_no, MYF(0))) if (my_pread(file_id, file_entry, io_size, io_size * entry_no, MYF(0)))
error= TRUE; error= TRUE;
DBUG_RETURN(error); DBUG_RETURN(error);
} }
...@@ -429,6 +433,7 @@ read_table_log_header() ...@@ -429,6 +433,7 @@ read_table_log_header()
char *file_entry= (char*)global_table_log.file_entry; char *file_entry= (char*)global_table_log.file_entry;
char file_name[FN_REFLEN]; char file_name[FN_REFLEN];
uint entry_no; uint entry_no;
bool successful_open= FALSE;
DBUG_ENTER("read_table_log_header"); DBUG_ENTER("read_table_log_header");
bzero(file_entry, sizeof(global_table_log.file_entry)); bzero(file_entry, sizeof(global_table_log.file_entry));
...@@ -439,10 +444,14 @@ read_table_log_header() ...@@ -439,10 +444,14 @@ read_table_log_header()
{ {
/* Write message into error log */ /* Write message into error log */
} }
else
successful_open= TRUE;
} }
entry_no= uint4korr(&file_entry[0]); entry_no= uint4korr(&file_entry[0]);
global_table_log.name_len= uint2korr(&file_entry[4]); global_table_log.name_len= uint2korr(&file_entry[4]);
global_table_log.handler_type_len= uint2korr(&file_entry[6]); global_table_log.handler_type_len= uint2korr(&file_entry[6]);
if (successful_open)
global_table_log.io_size= uint4korr(&file_entry[8]);
global_table_log.first_free= NULL; global_table_log.first_free= NULL;
global_table_log.first_used= NULL; global_table_log.first_used= NULL;
global_table_log.no_entries= 0; global_table_log.no_entries= 0;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment