Commit ddf50f7c authored by unknown's avatar unknown

Merge mronstrom@bk-internal.mysql.com:/home/bk/mysql-5.1-new

into  c-870ae253.1238-1-64736c10.cust.bredbandsbolaget.se:/home/pappa/wl2826


sql/partition_info.h:
  Auto merged
parents 1a6219fd d73562fb
......@@ -116,6 +116,7 @@ valgrind_flags="$valgrind_flags -DMYSQL_SERVER_SUFFIX=-valgrind-max"
# Used in -debug builds
debug_cflags="-DUNIV_MUST_NOT_INLINE -DEXTRA_DEBUG -DFORCE_INIT_OF_VARS "
debug_cflags="$debug_cflags -DSAFEMALLOC -DPEDANTIC_SAFEMALLOC -DSAFE_MUTEX"
error_inject="--with-error-inject "
#
# Base C++ flags for all builds
base_cxxflags="-felide-constructors -fno-exceptions -fno-rtti"
......
......@@ -4,6 +4,6 @@ path=`dirname $0`
. "$path/SETUP.sh" "$@" --with-debug=full
extra_flags="$pentium_cflags $debug_cflags"
extra_configs="$pentium_configs $debug_configs $max_configs"
extra_configs="$pentium_configs $debug_configs $max_configs $error_inject"
. "$path/FINISH.sh"
......@@ -666,6 +666,7 @@ else
AC_MSG_RESULT([no])
fi
MYSQL_SYS_LARGEFILE
# Types that must be checked AFTER large file support is checked
......@@ -1608,6 +1609,21 @@ else
CXXFLAGS="$OPTIMIZE_CXXFLAGS -DDBUG_OFF $CXXFLAGS"
fi
# If we should allow error injection tests
AC_ARG_WITH(error-inject,
[ --with-error-inject Enable error injection in MySQL Server],
[ with_error_inject=$withval ],
[ with_error_inject=no ])
if test $with_debug != "no"
then
if test "$with_error_inject" = "yes"
then
AC_DEFINE([ERROR_INJECT_SUPPORT], [1],
[Enable error injection in MySQL Server])
fi
fi
AC_ARG_WITH([fast-mutexes],
AC_HELP_STRING([--with-fast-mutexes],
[Compile with fast mutexes (default is disabled)]),
......
......@@ -559,7 +559,7 @@ extern File my_register_filename(File fd, const char *FileName,
enum file_type type_of_file,
uint error_message_number, myf MyFlags);
extern File my_create(const char *FileName,int CreateFlags,
int AccsesFlags, myf MyFlags);
int AccessFlags, myf MyFlags);
extern int my_close(File Filedes,myf MyFlags);
extern File my_dup(File file, myf MyFlags);
extern int my_mkdir(const char *dir, int Flags, myf MyFlags);
......
......@@ -165,6 +165,20 @@ ENGINE=NDB
PARTITION BY KEY(c3) PARTITIONS 5;
ALTER TABLE t1 COALESCE PARTITION 4;
DROP TABLE t1;
CREATE TABLE t1 (a int primary key)
ENGINE=NDB
PARTITION BY KEY(a);
ALTER TABLE t1 OPTIMIZE PARTITION p0;
ERROR HY000: Table storage engine for 't1' doesn't have this option
ALTER TABLE t1 CHECK PARTITION p0;
ERROR HY000: Table storage engine for 't1' doesn't have this option
ALTER TABLE t1 REPAIR PARTITION p0;
ERROR HY000: Table storage engine for 't1' doesn't have this option
ALTER TABLE t1 ANALYZE PARTITION p0;
ERROR HY000: Table storage engine for 't1' doesn't have this option
ALTER TABLE t1 REBUILD PARTITION p0;
ERROR HY000: Table storage engine for 't1' doesn't have this option
DROP TABLE t1;
CREATE TABLE t1 (
c1 MEDIUMINT NOT NULL AUTO_INCREMENT,
c2 TEXT NOT NULL,
......
......@@ -154,6 +154,24 @@ ALTER TABLE t1 COALESCE PARTITION 4;
DROP TABLE t1;
#
# Bug 16822: OPTIMIZE TABLE hangs test
#
CREATE TABLE t1 (a int primary key)
ENGINE=NDB
PARTITION BY KEY(a);
--error 1031
ALTER TABLE t1 OPTIMIZE PARTITION p0;
--error 1031
ALTER TABLE t1 CHECK PARTITION p0;
--error 1031
ALTER TABLE t1 REPAIR PARTITION p0;
--error 1031
ALTER TABLE t1 ANALYZE PARTITION p0;
--error 1031
ALTER TABLE t1 REBUILD PARTITION p0;
DROP TABLE t1;
#
# BUG 16806: ALTER TABLE fails
#
......
......@@ -4711,7 +4711,10 @@ int ha_ndbcluster::create(const char *name,
DBUG_RETURN(my_errno);
}
int ha_ndbcluster::create_handler_files(const char *file, HA_CREATE_INFO *info)
int ha_ndbcluster::create_handler_files(const char *file,
const char *old_name,
int action_flag,
HA_CREATE_INFO *info)
{
char path[FN_REFLEN];
const char *name;
......@@ -4723,6 +4726,10 @@ int ha_ndbcluster::create_handler_files(const char *file, HA_CREATE_INFO *info)
DBUG_ENTER("create_handler_files");
if (action_flag != CHF_INDEX_FLAG)
{
DBUG_RETURN(FALSE);
}
DBUG_PRINT("enter", ("file: %s", file));
if (!(ndb= get_ndb()))
DBUG_RETURN(HA_ERR_NO_CONNECTION);
......
......@@ -610,7 +610,8 @@ class ha_ndbcluster: public handler
int rename_table(const char *from, const char *to);
int delete_table(const char *name);
int create(const char *name, TABLE *form, HA_CREATE_INFO *info);
int create_handler_files(const char *file, HA_CREATE_INFO *info);
int create_handler_files(const char *file, const char *old_name,
int action_flag, HA_CREATE_INFO *info);
int get_default_no_partitions(ulonglong max_rows);
bool get_no_parts(const char *name, uint *no_parts);
void set_auto_partitions(partition_info *part_info);
......
......@@ -403,88 +403,6 @@ int ha_partition::ha_initialise()
/****************************************************************************
MODULE meta data changes
****************************************************************************/
/*
Create partition names
SYNOPSIS
create_partition_name()
out:out Created partition name string
in1 First part
in2 Second part
name_variant Normal, temporary or renamed partition name
RETURN VALUE
NONE
DESCRIPTION
This method is used to calculate the partition name, service routine to
the del_ren_cre_table method.
*/
#define NORMAL_PART_NAME 0
#define TEMP_PART_NAME 1
#define RENAMED_PART_NAME 2
static void create_partition_name(char *out, const char *in1,
const char *in2, uint name_variant,
bool translate)
{
char transl_part_name[FN_REFLEN];
const char *transl_part;
if (translate)
{
tablename_to_filename(in2, transl_part_name, FN_REFLEN);
transl_part= transl_part_name;
}
else
transl_part= in2;
if (name_variant == NORMAL_PART_NAME)
strxmov(out, in1, "#P#", transl_part, NullS);
else if (name_variant == TEMP_PART_NAME)
strxmov(out, in1, "#P#", transl_part, "#TMP#", NullS);
else if (name_variant == RENAMED_PART_NAME)
strxmov(out, in1, "#P#", transl_part, "#REN#", NullS);
}
/*
Create subpartition name
SYNOPSIS
create_subpartition_name()
out:out Created partition name string
in1 First part
in2 Second part
in3 Third part
name_variant Normal, temporary or renamed partition name
RETURN VALUE
NONE
DESCRIPTION
This method is used to calculate the subpartition name, service routine to
the del_ren_cre_table method.
*/
static void create_subpartition_name(char *out, const char *in1,
const char *in2, const char *in3,
uint name_variant)
{
char transl_part_name[FN_REFLEN], transl_subpart_name[FN_REFLEN];
tablename_to_filename(in2, transl_part_name, FN_REFLEN);
tablename_to_filename(in3, transl_subpart_name, FN_REFLEN);
if (name_variant == NORMAL_PART_NAME)
strxmov(out, in1, "#P#", transl_part_name,
"#SP#", transl_subpart_name, NullS);
else if (name_variant == TEMP_PART_NAME)
strxmov(out, in1, "#P#", transl_part_name,
"#SP#", transl_subpart_name, "#TMP#", NullS);
else if (name_variant == RENAMED_PART_NAME)
strxmov(out, in1, "#P#", transl_part_name,
"#SP#", transl_subpart_name, "#REN#", NullS);
}
/*
Delete a table
......@@ -576,7 +494,9 @@ int ha_partition::rename_table(const char *from, const char *to)
and types of engines in the partitions.
*/
int ha_partition::create_handler_files(const char *name,
int ha_partition::create_handler_files(const char *path,
const char *old_path,
int action_flag,
HA_CREATE_INFO *create_info)
{
DBUG_ENTER("ha_partition::create_handler_files()");
......@@ -585,10 +505,29 @@ int ha_partition::create_handler_files(const char *name,
We need to update total number of parts since we might write the handler
file as part of a partition management command
*/
if (create_handler_file(name))
if (action_flag == CHF_DELETE_FLAG ||
action_flag == CHF_RENAME_FLAG)
{
my_error(ER_CANT_CREATE_HANDLER_FILE, MYF(0));
DBUG_RETURN(1);
char name[FN_REFLEN];
char old_name[FN_REFLEN];
strxmov(name, path, ha_par_ext, NullS);
strxmov(old_name, old_path, ha_par_ext, NullS);
if ((action_flag == CHF_DELETE_FLAG &&
my_delete(name, MYF(MY_WME))) ||
(action_flag == CHF_RENAME_FLAG &&
my_rename(old_name, name, MYF(MY_WME))))
{
DBUG_RETURN(TRUE);
}
}
else if (action_flag == CHF_CREATE_FLAG)
{
if (create_handler_file(path))
{
my_error(ER_CANT_CREATE_HANDLER_FILE, MYF(0));
DBUG_RETURN(1);
}
}
DBUG_RETURN(0);
}
......@@ -654,45 +593,26 @@ int ha_partition::create(const char *name, TABLE *table_arg,
int ha_partition::drop_partitions(const char *path)
{
List_iterator<partition_element> part_it(m_part_info->partitions);
List_iterator<partition_element> temp_it(m_part_info->temp_partitions);
char part_name_buff[FN_REFLEN];
uint no_parts= m_part_info->partitions.elements;
uint part_count= 0;
uint no_subparts= m_part_info->no_subparts;
uint i= 0;
uint name_variant;
int error= 1;
bool reorged_parts= (m_reorged_parts > 0);
bool temp_partitions= (m_part_info->temp_partitions.elements > 0);
int ret_error;
int error= 0;
DBUG_ENTER("ha_partition::drop_partitions");
if (temp_partitions)
no_parts= m_part_info->temp_partitions.elements;
do
{
partition_element *part_elem;
if (temp_partitions)
{
/*
We need to remove the reorganised partitions that were put in the
temp_partitions-list.
*/
part_elem= temp_it++;
DBUG_ASSERT(part_elem->part_state == PART_TO_BE_DROPPED);
}
else
part_elem= part_it++;
if (part_elem->part_state == PART_TO_BE_DROPPED ||
part_elem->part_state == PART_IS_CHANGED)
partition_element *part_elem= part_it++;
if (part_elem->part_state == PART_TO_BE_DROPPED)
{
handler *file;
/*
This part is to be dropped, meaning the part or all its subparts.
*/
name_variant= NORMAL_PART_NAME;
if (part_elem->part_state == PART_IS_CHANGED ||
(part_elem->part_state == PART_TO_BE_DROPPED && temp_partitions))
name_variant= RENAMED_PART_NAME;
if (m_is_sub_partitioned)
{
List_iterator<partition_element> sub_it(part_elem->subpartitions);
......@@ -704,12 +624,10 @@ int ha_partition::drop_partitions(const char *path)
create_subpartition_name(part_name_buff, path,
part_elem->partition_name,
sub_elem->partition_name, name_variant);
if (reorged_parts)
file= m_reorged_file[part_count++];
else
file= m_file[part];
file= m_file[part];
DBUG_PRINT("info", ("Drop subpartition %s", part_name_buff));
error= file->delete_table((const char *) part_name_buff);
if ((ret_error= file->delete_table((const char *) part_name_buff)))
error= ret_error;
} while (++j < no_subparts);
}
else
......@@ -717,12 +635,10 @@ int ha_partition::drop_partitions(const char *path)
create_partition_name(part_name_buff, path,
part_elem->partition_name, name_variant,
TRUE);
if (reorged_parts)
file= m_reorged_file[part_count++];
else
file= m_file[i];
file= m_file[i];
DBUG_PRINT("info", ("Drop partition %s", part_name_buff));
error= file->delete_table((const char *) part_name_buff);
if ((ret_error= file->delete_table((const char *) part_name_buff)))
error= ret_error;
}
if (part_elem->part_state == PART_IS_CHANGED)
part_elem->part_state= PART_NORMAL;
......@@ -764,7 +680,8 @@ int ha_partition::rename_partitions(const char *path)
uint no_subparts= m_part_info->no_subparts;
uint i= 0;
uint j= 0;
int error= 1;
int error= 0;
int ret_error;
uint temp_partitions= m_part_info->temp_partitions.elements;
handler *file;
partition_element *part_elem, *sub_elem;
......@@ -772,6 +689,14 @@ int ha_partition::rename_partitions(const char *path)
if (temp_partitions)
{
/*
These are the reorganised partitions that have already been copied.
We delete the partitions and log the delete by inactivating the
delete log entry in the table log. We only need to synchronise
these writes before moving to the next loop since there is no
interaction among reorganised partitions, they cannot have the
same name.
*/
do
{
part_elem= temp_it++;
......@@ -782,39 +707,59 @@ int ha_partition::rename_partitions(const char *path)
{
sub_elem= sub_it++;
file= m_reorged_file[part_count++];
create_subpartition_name(part_name_buff, path,
part_elem->partition_name,
sub_elem->partition_name,
RENAMED_PART_NAME);
create_subpartition_name(norm_name_buff, path,
part_elem->partition_name,
sub_elem->partition_name,
NORMAL_PART_NAME);
DBUG_PRINT("info", ("Rename subpartition from %s to %s",
norm_name_buff, part_name_buff));
error= file->rename_table((const char *) norm_name_buff,
(const char *) part_name_buff);
DBUG_PRINT("info", ("Delete subpartition %s", norm_name_buff));
if ((ret_error= file->delete_table((const char *) norm_name_buff)))
error= ret_error;
else if (deactivate_ddl_log_entry(sub_elem->log_entry->entry_pos))
error= 1;
else
sub_elem->log_entry= NULL; /* Indicate success */
} while (++j < no_subparts);
}
else
{
file= m_reorged_file[part_count++];
create_partition_name(part_name_buff, path,
part_elem->partition_name, RENAMED_PART_NAME,
TRUE);
create_partition_name(norm_name_buff, path,
part_elem->partition_name, NORMAL_PART_NAME,
TRUE);
DBUG_PRINT("info", ("Rename partition from %s to %s",
norm_name_buff, part_name_buff));
error= file->rename_table((const char *) norm_name_buff,
(const char *) part_name_buff);
DBUG_PRINT("info", ("Delete partition %s", norm_name_buff));
if ((ret_error= file->delete_table((const char *) norm_name_buff)))
error= ret_error;
else if (deactivate_ddl_log_entry(part_elem->log_entry->entry_pos))
error= 1;
else
part_elem->log_entry= NULL; /* Indicate success */
}
} while (++i < temp_partitions);
VOID(sync_ddl_log());
}
i= 0;
do
{
/*
When state is PART_IS_CHANGED it means that we have created a new
TEMP partition that is to be renamed to normal partition name and
we are to delete the old partition with currently the normal name.
We perform this operation by
1) Delete old partition with normal partition name
2) Signal this in table log entry
3) Synch table log to ensure we have consistency in crashes
4) Rename temporary partition name to normal partition name
5) Signal this to table log entry
It is not necessary to synch the last state since a new rename
should not corrupt things if there was no temporary partition.
The only other parts we need to cater for are new parts that
replace reorganised parts. The reorganised parts were deleted
by the code above that goes through the temp_partitions list.
Thus the synch above makes it safe to simply perform step 4 and 5
for those entries.
*/
part_elem= part_it++;
if (part_elem->part_state == PART_IS_CHANGED ||
(part_elem->part_state == PART_IS_ADDED && temp_partitions))
......@@ -836,14 +781,12 @@ int ha_partition::rename_partitions(const char *path)
if (part_elem->part_state == PART_IS_CHANGED)
{
file= m_reorged_file[part_count++];
create_subpartition_name(part_name_buff, path,
part_elem->partition_name,
sub_elem->partition_name,
RENAMED_PART_NAME);
DBUG_PRINT("info", ("Rename subpartition from %s to %s",
norm_name_buff, part_name_buff));
error= file->rename_table((const char *) norm_name_buff,
(const char *) part_name_buff);
DBUG_PRINT("info", ("Delete subpartition %s", norm_name_buff));
if ((ret_error= file->delete_table((const char *) norm_name_buff)))
error= ret_error;
else if (deactivate_ddl_log_entry(sub_elem->log_entry->entry_pos))
error= 1;
VOID(sync_ddl_log());
}
file= m_new_file[part];
create_subpartition_name(part_name_buff, path,
......@@ -852,8 +795,13 @@ int ha_partition::rename_partitions(const char *path)
TEMP_PART_NAME);
DBUG_PRINT("info", ("Rename subpartition from %s to %s",
part_name_buff, norm_name_buff));
error= file->rename_table((const char *) part_name_buff,
(const char *) norm_name_buff);
if ((ret_error= file->rename_table((const char *) part_name_buff,
(const char *) norm_name_buff)))
error= ret_error;
else if (deactivate_ddl_log_entry(sub_elem->log_entry->entry_pos))
error= 1;
else
sub_elem->log_entry= NULL;
} while (++j < no_subparts);
}
else
......@@ -864,13 +812,12 @@ int ha_partition::rename_partitions(const char *path)
if (part_elem->part_state == PART_IS_CHANGED)
{
file= m_reorged_file[part_count++];
create_partition_name(part_name_buff, path,
part_elem->partition_name, RENAMED_PART_NAME,
TRUE);
DBUG_PRINT("info", ("Rename partition from %s to %s",
norm_name_buff, part_name_buff));
error= file->rename_table((const char *) norm_name_buff,
(const char *) part_name_buff);
DBUG_PRINT("info", ("Delete partition %s", norm_name_buff));
if ((ret_error= file->delete_table((const char *) norm_name_buff)))
error= ret_error;
else if (deactivate_ddl_log_entry(part_elem->log_entry->entry_pos))
error= 1;
VOID(sync_ddl_log());
}
file= m_new_file[i];
create_partition_name(part_name_buff, path,
......@@ -878,11 +825,17 @@ int ha_partition::rename_partitions(const char *path)
TRUE);
DBUG_PRINT("info", ("Rename partition from %s to %s",
part_name_buff, norm_name_buff));
error= file->rename_table((const char *) part_name_buff,
(const char *) norm_name_buff);
if ((ret_error= file->rename_table((const char *) part_name_buff,
(const char *) norm_name_buff)))
error= ret_error;
else if (deactivate_ddl_log_entry(part_elem->log_entry->entry_pos))
error= 1;
else
part_elem->log_entry= NULL;
}
}
} while (++i < no_parts);
VOID(sync_ddl_log());
DBUG_RETURN(error);
}
......@@ -1204,7 +1157,6 @@ int ha_partition::prepare_new_partition(TABLE *table,
error:
if (create_flag)
VOID(file->delete_table(part_name));
print_error(error, MYF(0));
DBUG_RETURN(error);
}
......@@ -1331,7 +1283,7 @@ int ha_partition::change_partitions(HA_CREATE_INFO *create_info,
(m_reorged_parts + 1))))
{
mem_alloc_error(sizeof(partition_element*)*(m_reorged_parts+1));
DBUG_RETURN(TRUE);
DBUG_RETURN(ER_OUTOFMEMORY);
}
/*
......@@ -1363,7 +1315,7 @@ int ha_partition::change_partitions(HA_CREATE_INFO *create_info,
(2*(no_remain_partitions + 1)))))
{
mem_alloc_error(sizeof(handler*)*2*(no_remain_partitions+1));
DBUG_RETURN(TRUE);
DBUG_RETURN(ER_OUTOFMEMORY);
}
m_added_file= &new_file_array[no_remain_partitions + 1];
......@@ -1435,7 +1387,7 @@ int ha_partition::change_partitions(HA_CREATE_INFO *create_info,
part_elem->engine_type)))
{
mem_alloc_error(sizeof(handler));
DBUG_RETURN(TRUE);
DBUG_RETURN(ER_OUTOFMEMORY);
}
} while (++j < no_subparts);
}
......@@ -1483,7 +1435,7 @@ int ha_partition::change_partitions(HA_CREATE_INFO *create_info,
(const char *)part_name_buff)))
{
cleanup_new_partition(part_count);
DBUG_RETURN(TRUE);
DBUG_RETURN(error);
}
m_added_file[part_count++]= new_file_array[part];
} while (++j < no_subparts);
......@@ -1499,7 +1451,7 @@ int ha_partition::change_partitions(HA_CREATE_INFO *create_info,
(const char *)part_name_buff)))
{
cleanup_new_partition(part_count);
DBUG_RETURN(TRUE);
DBUG_RETURN(error);
}
m_added_file[part_count++]= new_file_array[i];
}
......@@ -1605,8 +1557,7 @@ int ha_partition::copy_partitions(ulonglong *copied, ulonglong *deleted)
}
DBUG_RETURN(FALSE);
error:
print_error(result, MYF(0));
DBUG_RETURN(TRUE);
DBUG_RETURN(result);
}
......@@ -1873,8 +1824,8 @@ bool ha_partition::create_handler_file(const char *name)
{
part_elem= part_it++;
if (part_elem->part_state != PART_NORMAL &&
part_elem->part_state != PART_IS_ADDED &&
part_elem->part_state != PART_IS_CHANGED)
part_elem->part_state != PART_TO_BE_ADDED &&
part_elem->part_state != PART_CHANGED)
continue;
tablename_to_filename(part_elem->partition_name, part_name,
FN_REFLEN);
......@@ -1925,8 +1876,8 @@ bool ha_partition::create_handler_file(const char *name)
{
part_elem= part_it++;
if (part_elem->part_state != PART_NORMAL &&
part_elem->part_state != PART_IS_ADDED &&
part_elem->part_state != PART_IS_CHANGED)
part_elem->part_state != PART_TO_BE_ADDED &&
part_elem->part_state != PART_CHANGED)
continue;
if (!m_is_sub_partitioned)
{
......
......@@ -179,7 +179,8 @@ class ha_partition :public handler
virtual int rename_table(const char *from, const char *to);
virtual int create(const char *name, TABLE *form,
HA_CREATE_INFO *create_info);
virtual int create_handler_files(const char *name,
virtual int create_handler_files(const char *name,
const char *old_name, int action_flag,
HA_CREATE_INFO *create_info);
virtual void update_create_info(HA_CREATE_INFO *create_info);
virtual char *update_table_comment(const char *comment);
......
......@@ -632,6 +632,7 @@ typedef struct {
#define UNDEF_NODEGROUP 65535
class Item;
struct st_table_log_memory_entry;
class partition_info;
......@@ -639,7 +640,6 @@ struct st_partition_iter;
#define NOT_A_PARTITION_ID ((uint32)-1)
typedef struct st_ha_create_information
{
CHARSET_INFO *table_charset, *default_table_charset;
......@@ -1379,8 +1379,15 @@ class handler :public Sql_alloc
virtual void drop_table(const char *name);
virtual int create(const char *name, TABLE *form, HA_CREATE_INFO *info)=0;
virtual int create_handler_files(const char *name, HA_CREATE_INFO *info)
{ return FALSE;}
#define CHF_CREATE_FLAG 0
#define CHF_DELETE_FLAG 1
#define CHF_RENAME_FLAG 2
#define CHF_INDEX_FLAG 3
virtual int create_handler_files(const char *name, const char *old_name,
int action_flag, HA_CREATE_INFO *info)
{ return FALSE; }
virtual int change_partitions(HA_CREATE_INFO *create_info,
const char *path,
......
......@@ -613,6 +613,100 @@ struct Query_cache_query_flags
#define query_cache_invalidate_by_MyISAM_filename_ref NULL
#endif /*HAVE_QUERY_CACHE*/
/*
Error injector Macros to enable easy testing of recovery after failures
in various error cases.
*/
#ifndef ERROR_INJECT_SUPPORT
#define ERROR_INJECT(x) 0
#define ERROR_INJECT_ACTION(x,action) 0
#define ERROR_INJECT_CRASH(x) 0
#define ERROR_INJECT_VALUE(x) 0
#define ERROR_INJECT_VALUE_ACTION(x,action) 0
#define ERROR_INJECT_VALUE_CRASH(x) 0
#define SET_ERROR_INJECT_VALUE(x)
#else
inline bool check_and_unset_keyword(const char *dbug_str)
{
const char *extra_str= "-d,";
char total_str[200];
if (_db_strict_keyword_ (dbug_str))
{
strxmov(total_str, extra_str, dbug_str, NullS);
DBUG_SET(total_str);
return 1;
}
return 0;
}
inline bool
check_and_unset_inject_value(int value)
{
THD *thd= current_thd;
if (thd->error_inject_value == (uint)value)
{
thd->error_inject_value= 0;
return 1;
}
return 0;
}
/*
ERROR INJECT MODULE:
--------------------
These macros are used to insert macros from the application code.
The event that activates those error injections can be activated
from SQL by using:
SET SESSION dbug=+d,code;
After the error has been injected, the macros will automatically
remove the debug code, thus similar to using:
SET SESSION dbug=-d,code
from SQL.
ERROR_INJECT_CRASH will inject a crash of the MySQL Server if code
is set when macro is called. ERROR_INJECT_CRASH can be used in
if-statements, it will always return FALSE unless of course it
crashes in which case it doesn't return at all.
ERROR_INJECT_ACTION will inject the action specified in the action
parameter of the macro, before performing the action the code will
be removed such that no more events occur. ERROR_INJECT_ACTION
can also be used in if-statements and always returns FALSE.
ERROR_INJECT can be used in a normal if-statement, where the action
part is performed in the if-block. The macro returns TRUE if the
error was activated and otherwise returns FALSE. If activated the
code is removed.
Sometimes it is necessary to perform error inject actions as a serie
of events. In this case one can use one variable on the THD object.
Thus one sets this value by using e.g. SET_ERROR_INJECT_VALUE(100).
Then one can later test for it by using ERROR_INJECT_CRASH_VALUE,
ERROR_INJECT_ACTION_VALUE and ERROR_INJECT_VALUE. This have the same
behaviour as the above described macros except that they use the
error inject value instead of a code used by DBUG macros.
*/
#define SET_ERROR_INJECT_VALUE(x) \
current_thd->error_inject_value= (x)
#define ERROR_INJECT_CRASH(code) \
DBUG_EVALUATE_IF(code, (abort(), 0), 0)
#define ERROR_INJECT_ACTION(code, action) \
(check_and_unset_keyword(code) ? ((action), 0) : 0)
#define ERROR_INJECT(code) \
check_and_unset_keyword(code)
#define ERROR_INJECT_VALUE(value) \
check_and_unset_inject_value(value)
#define ERROR_INJECT_VALUE_ACTION(value,action) \
(check_and_unset_inject_value(value) ? (action) : 0)
#define ERROR_INJECT_VALUE_CRASH(value) \
ERROR_INJECT_VALUE_ACTION(value, (abort(), 0))
#endif
uint build_table_path(char *buff, size_t bufflen, const char *db,
const char *table, const char *ext);
void write_bin_log(THD *thd, bool clear_error,
......@@ -1090,6 +1184,16 @@ uint prep_alter_part_table(THD *thd, TABLE *table, ALTER_INFO *alter_info,
bool remove_table_from_cache(THD *thd, const char *db, const char *table,
uint flags);
#define NORMAL_PART_NAME 0
#define TEMP_PART_NAME 1
#define RENAMED_PART_NAME 2
void create_partition_name(char *out, const char *in1,
const char *in2, uint name_variant,
bool translate);
void create_subpartition_name(char *out, const char *in1,
const char *in2, const char *in3,
uint name_variant);
typedef struct st_lock_param_type
{
ulonglong copied;
......@@ -1109,14 +1213,94 @@ typedef struct st_lock_param_type
uint key_count;
uint db_options;
uint pack_frm_len;
partition_info *part_info;
} ALTER_PARTITION_PARAM_TYPE;
void mem_alloc_error(size_t size);
#define WFRM_INITIAL_WRITE 1
#define WFRM_CREATE_HANDLER_FILES 2
enum ddl_log_entry_code
{
/*
DDL_LOG_EXECUTE_CODE:
This is a code that indicates that this is a log entry to
be executed, from this entry a linked list of log entries
can be found and executed.
DDL_LOG_ENTRY_CODE:
An entry to be executed in a linked list from an execute log
entry.
DDL_IGNORE_LOG_ENTRY_CODE:
An entry that is to be ignored
*/
DDL_LOG_EXECUTE_CODE = 'e',
DDL_LOG_ENTRY_CODE = 'l',
DDL_IGNORE_LOG_ENTRY_CODE = 'i'
};
enum ddl_log_action_code
{
/*
The type of action that a DDL_LOG_ENTRY_CODE entry is to
perform.
DDL_LOG_DELETE_ACTION:
Delete an entity
DDL_LOG_RENAME_ACTION:
Rename an entity
DDL_LOG_REPLACE_ACTION:
Rename an entity after removing the previous entry with the
new name, that is replace this entry.
*/
DDL_LOG_DELETE_ACTION = 'd',
DDL_LOG_RENAME_ACTION = 'r',
DDL_LOG_REPLACE_ACTION = 's'
};
typedef struct st_ddl_log_entry
{
const char *name;
const char *from_name;
const char *handler_name;
uint next_entry;
uint entry_pos;
enum ddl_log_entry_code entry_type;
enum ddl_log_action_code action_type;
/*
Most actions have only one phase. REPLACE does however have two
phases. The first phase removes the file with the new name if
there was one there before and the second phase renames the
old name to the new name.
*/
char phase;
} DDL_LOG_ENTRY;
typedef struct st_ddl_log_memory_entry
{
uint entry_pos;
struct st_ddl_log_memory_entry *next_log_entry;
struct st_ddl_log_memory_entry *prev_log_entry;
struct st_ddl_log_memory_entry *next_active_log_entry;
} DDL_LOG_MEMORY_ENTRY;
bool write_ddl_log_entry(DDL_LOG_ENTRY *ddl_log_entry,
DDL_LOG_MEMORY_ENTRY **active_entry);
bool write_execute_ddl_log_entry(uint first_entry,
bool complete,
DDL_LOG_MEMORY_ENTRY **active_entry);
bool deactivate_ddl_log_entry(uint entry_no);
void release_ddl_log_memory_entry(DDL_LOG_MEMORY_ENTRY *log_entry);
bool sync_ddl_log();
void release_ddl_log();
void execute_ddl_log_recovery();
bool execute_ddl_log_entry(THD *thd, uint first_entry);
extern pthread_mutex_t LOCK_gdl;
#define WFRM_WRITE_SHADOW 1
#define WFRM_INSTALL_SHADOW 2
#define WFRM_PACK_FRM 4
bool mysql_write_frm(ALTER_PARTITION_PARAM_TYPE *lpt, uint flags);
bool abort_and_upgrade_lock(ALTER_PARTITION_PARAM_TYPE *lpt);
int abort_and_upgrade_lock(ALTER_PARTITION_PARAM_TYPE *lpt);
void close_open_tables_and_downgrade(ALTER_PARTITION_PARAM_TYPE *lpt);
void mysql_wait_completed_table(ALTER_PARTITION_PARAM_TYPE *lpt, TABLE *my_table);
......
......@@ -3605,6 +3605,7 @@ we force server id to 2, but this MySQL server will not act as a slave.");
unireg_abort(1);
}
}
execute_ddl_log_recovery();
create_shutdown_thread();
create_maintenance_thread();
......@@ -3656,6 +3657,7 @@ we force server id to 2, but this MySQL server will not act as a slave.");
pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
(void) pthread_mutex_unlock(&LOCK_thread_count);
release_ddl_log();
#if defined(__WIN__) && !defined(EMBEDDED_LIBRARY)
if (Service.IsNT() && start_mode)
Service.Stop();
......
......@@ -36,6 +36,8 @@ enum partition_state {
PART_IS_ADDED= 8
};
struct st_ddl_log_memory_entry;
class partition_element :public Sql_alloc {
public:
List<partition_element> subpartitions;
......@@ -44,6 +46,7 @@ class partition_element :public Sql_alloc {
ulonglong part_min_rows;
char *partition_name;
char *tablespace_name;
struct st_ddl_log_memory_entry *log_entry;
longlong range_value;
char* part_comment;
char* data_file_name;
......@@ -55,7 +58,8 @@ class partition_element :public Sql_alloc {
partition_element()
: part_max_rows(0), part_min_rows(0), partition_name(NULL),
tablespace_name(NULL), range_value(0), part_comment(NULL),
tablespace_name(NULL), log_entry(NULL),
range_value(0), part_comment(NULL),
data_file_name(NULL), index_file_name(NULL),
engine_type(NULL),part_state(PART_NORMAL),
nodegroup_id(UNDEF_NODEGROUP), has_null_value(FALSE)
......
......@@ -28,7 +28,7 @@ typedef int (*get_part_id_func)(partition_info *part_info,
longlong *func_value);
typedef uint32 (*get_subpart_id_func)(partition_info *part_info);
struct st_ddl_log_memory_entry;
class partition_info : public Sql_alloc
{
......@@ -76,7 +76,11 @@ class partition_info : public Sql_alloc
Item *subpart_expr;
Item *item_free_list;
struct st_ddl_log_memory_entry *first_log_entry;
struct st_ddl_log_memory_entry *exec_log_entry;
struct st_ddl_log_memory_entry *frm_log_entry;
/*
A bitmap of partitions used by the current query.
Usage pattern:
......@@ -191,6 +195,7 @@ class partition_info : public Sql_alloc
part_field_array(NULL), subpart_field_array(NULL),
full_part_field_array(NULL),
part_expr(NULL), subpart_expr(NULL), item_free_list(NULL),
first_log_entry(NULL), exec_log_entry(NULL), frm_log_entry(NULL),
list_array(NULL),
part_info_string(NULL),
part_func_string(NULL), subpart_func_string(NULL),
......
......@@ -5826,6 +5826,8 @@ ER_NDB_CANT_SWITCH_BINLOG_FORMAT
eng "The NDB cluster engine does not support changing the binlog format on the fly yet"
ER_PARTITION_NO_TEMPORARY
eng "Cannot create temporary table with partitions"
ER_DDL_LOG_ERROR
eng "Error in DDL log"
ER_NULL_IN_VALUES_LESS_THAN
eng "Not allowed to use NULL value in VALUES LESS THAN"
swe "Det r inte tilltet att anvnda NULL-vrden i VALUES LESS THAN"
......
......@@ -6140,9 +6140,8 @@ bool is_equal(const LEX_STRING *a, const LEX_STRING *b)
abort_and_upgrade_lock()
lpt Parameter passing struct
All parameters passed through the ALTER_PARTITION_PARAM_TYPE object
RETURN VALUES
TRUE Failure
FALSE Success
RETURN VALUE
0
DESCRIPTION
Remember old lock level (for possible downgrade later on), abort all
waiting threads and ensure that all keeping locks currently are
......@@ -6156,23 +6155,17 @@ bool is_equal(const LEX_STRING *a, const LEX_STRING *b)
old_lock_level Old lock level
*/
bool abort_and_upgrade_lock(ALTER_PARTITION_PARAM_TYPE *lpt)
int abort_and_upgrade_lock(ALTER_PARTITION_PARAM_TYPE *lpt)
{
uint flags= RTFC_WAIT_OTHER_THREAD_FLAG | RTFC_CHECK_KILLED_FLAG;
int error= FALSE;
DBUG_ENTER("abort_and_upgrade_locks");
lpt->old_lock_type= lpt->table->reginfo.lock_type;
VOID(pthread_mutex_lock(&LOCK_open));
mysql_lock_abort(lpt->thd, lpt->table, TRUE);
VOID(remove_table_from_cache(lpt->thd, lpt->db, lpt->table_name, flags));
if (lpt->thd->killed)
{
lpt->thd->no_warnings_for_error= 0;
error= TRUE;
}
VOID(pthread_mutex_unlock(&LOCK_open));
DBUG_RETURN(error);
DBUG_RETURN(0);
}
......
......@@ -223,6 +223,9 @@ THD::THD()
cuted_fields= sent_row_count= 0L;
limit_found_rows= 0;
statement_id_counter= 0UL;
#ifdef ERROR_INJECT_SUPPORT
error_inject_value= 0UL;
#endif
// Must be reset to handle error with THD's created for init of mysqld
lex->current_select= 0;
start_time=(time_t) 0;
......
......@@ -1099,6 +1099,9 @@ class THD :public Statement,
query_id_t query_id, warn_id;
ulong thread_id, col_access;
#ifdef ERROR_INJECT_SUPPORT
ulong error_inject_value;
#endif
/* Statement id is thread-wide. This counter is used to generate ids */
ulong statement_id_counter;
ulong rand_saved_seed1, rand_saved_seed2;
......
......@@ -1792,15 +1792,10 @@ char *generate_partition_syntax(partition_info *part_info,
char path[FN_REFLEN];
int err= 0;
List_iterator<partition_element> part_it(part_info->partitions);
List_iterator<partition_element> temp_it(part_info->temp_partitions);
File fptr;
char *buf= NULL; //Return buffer
uint use_temp= 0;
uint no_temp_parts= part_info->temp_partitions.elements;
bool write_part_state;
DBUG_ENTER("generate_partition_syntax");
write_part_state= (part_info->part_state && !part_info->part_state_len);
if (unlikely(((fptr= create_temp_file(path,mysql_tmpdir,"psy",
O_RDWR | O_BINARY | O_TRUNC |
O_TEMPORARY, MYF(MY_WME)))) < 0))
......@@ -1865,67 +1860,26 @@ char *generate_partition_syntax(partition_info *part_info,
err+= add_space(fptr);
}
}
no_parts= part_info->no_parts;
tot_no_parts= no_parts + no_temp_parts;
tot_no_parts= part_info->partitions.elements;
no_subparts= part_info->no_subparts;
if (write_all || (!part_info->use_default_partitions))
{
bool first= TRUE;
err+= add_begin_parenthesis(fptr);
i= 0;
do
{
/*
We need to do some clever list manipulation here since we have two
different needs for our list processing and here we take some of the
cost of using a simpler list processing for the other parts of the
code.
ALTER TABLE REORGANIZE PARTITIONS has the list of partitions to be
the final list as the main list and the reorganised partitions is in
the temporary partition list. Thus when finding the first part added
we insert the temporary list if there is such a list. If there is no
temporary list we are performing an ADD PARTITION.
*/
if (use_temp && use_temp <= no_temp_parts)
{
part_elem= temp_it++;
DBUG_ASSERT(no_temp_parts);
no_temp_parts--;
}
else if (use_temp)
{
DBUG_ASSERT(no_parts);
part_elem= save_part_elem;
use_temp= 0;
no_parts--;
}
else
part_elem= part_it++;
if (part_elem->part_state != PART_TO_BE_DROPPED &&
part_elem->part_state != PART_REORGED_DROPPED)
{
part_elem= part_it++;
if ((part_elem->part_state == PART_TO_BE_ADDED ||
part_elem->part_state == PART_IS_ADDED) && no_temp_parts)
{
save_part_elem= part_elem;
part_elem= temp_it++;
no_temp_parts--;
use_temp= 1;
}
else
if (!first)
{
DBUG_ASSERT(no_parts);
no_parts--;
}
}
if (part_elem->part_state != PART_IS_DROPPED)
{
if (write_part_state)
{
uint32 part_state_id= part_info->part_state_len;
part_info->part_state[part_state_id]= (uchar)part_elem->part_state;
part_info->part_state_len= part_state_id+1;
err+= add_comma(fptr);
err+= add_space(fptr);
}
first= FALSE;
err+= add_partition(fptr);
err+= add_name_string(fptr, part_elem->partition_name);
err+= add_space(fptr);
......@@ -1955,16 +1909,10 @@ char *generate_partition_syntax(partition_info *part_info,
err+= add_end_parenthesis(fptr);
} while (++j < no_subparts);
}
if (i != (tot_no_parts-1))
{
err+= add_comma(fptr);
err+= add_space(fptr);
}
}
if (i == (tot_no_parts-1))
err+= add_end_parenthesis(fptr);
} while (++i < tot_no_parts);
DBUG_ASSERT(!no_parts && !no_temp_parts);
}
if (err)
goto close_file;
......@@ -3525,27 +3473,6 @@ set_engine_all_partitions(partition_info *part_info,
}
} while (++i < part_info->no_parts);
}
/*
SYNOPSIS
fast_alter_partition_error_handler()
lpt Container for parameters
RETURN VALUES
None
DESCRIPTION
Support routine to clean up after failures of on-line ALTER TABLE
for partition management.
*/
static void fast_alter_partition_error_handler(ALTER_PARTITION_PARAM_TYPE *lpt)
{
DBUG_ENTER("fast_alter_partition_error_handler");
/* TODO: WL 2826 Error handling */
DBUG_VOID_RETURN;
}
/*
SYNOPSIS
fast_end_partition()
......@@ -3567,6 +3494,7 @@ static void fast_alter_partition_error_handler(ALTER_PARTITION_PARAM_TYPE *lpt)
static int fast_end_partition(THD *thd, ulonglong copied,
ulonglong deleted,
TABLE *table,
TABLE_LIST *table_list, bool is_empty,
ALTER_PARTITION_PARAM_TYPE *lpt,
bool written_bin_log)
......@@ -3594,7 +3522,7 @@ static int fast_end_partition(THD *thd, ulonglong copied,
send_ok(thd,copied+deleted,0L,tmp_name);
DBUG_RETURN(FALSE);
}
fast_alter_partition_error_handler(lpt);
table->file->print_error(error, MYF(0));
DBUG_RETURN(TRUE);
}
......@@ -3843,7 +3771,8 @@ uint prep_alter_part_table(THD *thd, TABLE *table, ALTER_INFO *alter_info,
after the change as before. Thus we can reply ok immediately
without any changes at all.
*/
DBUG_RETURN(fast_end_partition(thd, ULL(0), ULL(0), NULL,
DBUG_RETURN(fast_end_partition(thd, ULL(0), ULL(0),
table, NULL,
TRUE, NULL, FALSE));
}
else if (new_part_no > curr_part_no)
......@@ -4195,6 +4124,7 @@ that are reorganised.
my_error(ER_ROW_IS_REFERENCED, MYF(0));
DBUG_RETURN(TRUE);
}
tab_part_info->no_parts-= no_parts_dropped;
}
else if ((alter_info->flags & ALTER_OPTIMIZE_PARTITION) ||
(alter_info->flags & ALTER_ANALYZE_PARTITION) ||
......@@ -4239,6 +4169,11 @@ that are reorganised.
my_error(ER_DROP_PARTITION_NON_EXISTENT, MYF(0), ptr);
DBUG_RETURN(TRUE);
}
if (!(*fast_alter_partition))
{
table->file->print_error(HA_ERR_WRONG_COMMAND, MYF(0));
DBUG_RETURN(TRUE);
}
}
else if (alter_info->flags & ALTER_COALESCE_PARTITION)
{
......@@ -4663,14 +4598,22 @@ the generated partition syntax in a correct manner.
static bool mysql_change_partitions(ALTER_PARTITION_PARAM_TYPE *lpt)
{
char path[FN_REFLEN+1];
int error;
handler *file= lpt->table->file;
DBUG_ENTER("mysql_change_partitions");
build_table_filename(path, sizeof(path), lpt->db, lpt->table_name, "");
DBUG_RETURN(lpt->table->file->change_partitions(lpt->create_info, path,
&lpt->copied,
&lpt->deleted,
lpt->pack_frm_data,
lpt->pack_frm_len));
if ((error= file->change_partitions(lpt->create_info, path, &lpt->copied,
&lpt->deleted, lpt->pack_frm_data,
lpt->pack_frm_len)))
{
if (error != ER_OUTOFMEMORY)
file->print_error(error, MYF(0));
else
lpt->thd->fatal_error();
DBUG_RETURN(TRUE);
}
DBUG_RETURN(FALSE);
}
......@@ -4696,10 +4639,17 @@ static bool mysql_change_partitions(ALTER_PARTITION_PARAM_TYPE *lpt)
static bool mysql_rename_partitions(ALTER_PARTITION_PARAM_TYPE *lpt)
{
char path[FN_REFLEN+1];
int error;
DBUG_ENTER("mysql_rename_partitions");
build_table_filename(path, sizeof(path), lpt->db, lpt->table_name, "");
DBUG_RETURN(lpt->table->file->rename_partitions(path));
if ((error= lpt->table->file->rename_partitions(path)))
{
if (error != 1)
lpt->table->file->print_error(error, MYF(0));
DBUG_RETURN(TRUE);
}
DBUG_RETURN(FALSE);
}
......@@ -4730,11 +4680,13 @@ static bool mysql_drop_partitions(ALTER_PARTITION_PARAM_TYPE *lpt)
List_iterator<partition_element> part_it(part_info->partitions);
uint i= 0;
uint remove_count= 0;
int error;
DBUG_ENTER("mysql_drop_partitions");
build_table_filename(path, sizeof(path), lpt->db, lpt->table_name, "");
if (lpt->table->file->drop_partitions(path))
if ((error= lpt->table->file->drop_partitions(path)))
{
lpt->table->file->print_error(error, MYF(0));
DBUG_RETURN(TRUE);
}
do
......@@ -4751,6 +4703,767 @@ static bool mysql_drop_partitions(ALTER_PARTITION_PARAM_TYPE *lpt)
}
/*
Insert log entry into list
SYNOPSIS
insert_part_info_log_entry_list()
log_entry
RETURN VALUES
NONE
*/
static void insert_part_info_log_entry_list(partition_info *part_info,
DDL_LOG_MEMORY_ENTRY *log_entry)
{
log_entry->next_active_log_entry= part_info->first_log_entry;
part_info->first_log_entry= log_entry;
}
/*
Release all log entries for this partition info struct
SYNOPSIS
release_part_info_log_entries()
first_log_entry First log entry in list to release
RETURN VALUES
NONE
*/
static void release_part_info_log_entries(DDL_LOG_MEMORY_ENTRY *log_entry)
{
DBUG_ENTER("release_part_info_log_entries");
while (log_entry)
{
release_ddl_log_memory_entry(log_entry);
log_entry= log_entry->next_active_log_entry;
}
DBUG_VOID_RETURN;
}
/*
Log an delete/rename frm file
SYNOPSIS
write_log_replace_delete_frm()
lpt Struct for parameters
next_entry Next reference to use in log record
from_path Name to rename from
to_path Name to rename to
replace_flag TRUE if replace, else delete
RETURN VALUES
TRUE Error
FALSE Success
DESCRIPTION
Support routine that writes a replace or delete of an frm file into the
ddl log. It also inserts an entry that keeps track of used space into
the partition info object
*/
static bool write_log_replace_delete_frm(ALTER_PARTITION_PARAM_TYPE *lpt,
uint next_entry,
const char *from_path,
const char *to_path,
bool replace_flag)
{
DDL_LOG_ENTRY ddl_log_entry;
DDL_LOG_MEMORY_ENTRY *log_entry;
DBUG_ENTER("write_log_replace_delete_frm");
if (replace_flag)
ddl_log_entry.action_type= DDL_LOG_REPLACE_ACTION;
else
ddl_log_entry.action_type= DDL_LOG_DELETE_ACTION;
ddl_log_entry.next_entry= next_entry;
ddl_log_entry.handler_name= reg_ext;
ddl_log_entry.name= to_path;
if (replace_flag)
ddl_log_entry.from_name= from_path;
if (write_ddl_log_entry(&ddl_log_entry, &log_entry))
{
DBUG_RETURN(TRUE);
}
insert_part_info_log_entry_list(lpt->part_info, log_entry);
DBUG_RETURN(FALSE);
}
/*
Log final partition changes in change partition
SYNOPSIS
write_log_changed_partitions()
lpt Struct containing parameters
RETURN VALUES
TRUE Error
FALSE Success
DESCRIPTION
This code is used to perform safe ADD PARTITION for HASH partitions
and COALESCE for HASH partitions and REORGANIZE for any type of
partitions.
We prepare entries for all partitions except the reorganised partitions
in REORGANIZE partition, those are handled by
write_log_dropped_partitions. For those partitions that are replaced
special care is needed to ensure that this is performed correctly and
this requires a two-phased approach with this log as a helper for this.
This code is closely intertwined with the code in rename_partitions in
the partition handler.
*/
static bool write_log_changed_partitions(ALTER_PARTITION_PARAM_TYPE *lpt,
uint *next_entry, const char *path)
{
DDL_LOG_ENTRY ddl_log_entry;
partition_info *part_info= lpt->part_info;
DDL_LOG_MEMORY_ENTRY *log_entry;
char tmp_path[FN_LEN];
char normal_path[FN_LEN];
List_iterator<partition_element> part_it(part_info->partitions);
uint temp_partitions= part_info->temp_partitions.elements;
uint no_elements= part_info->partitions.elements;
uint i= 0;
DBUG_ENTER("write_log_changed_partitions");
do
{
partition_element *part_elem= part_it++;
if (part_elem->part_state == PART_IS_CHANGED ||
(part_elem->part_state == PART_IS_ADDED && temp_partitions))
{
if (part_info->is_sub_partitioned())
{
List_iterator<partition_element> sub_it(part_elem->subpartitions);
uint no_subparts= part_info->no_subparts;
uint j= 0;
do
{
partition_element *sub_elem= sub_it++;
ddl_log_entry.next_entry= *next_entry;
ddl_log_entry.handler_name=
ha_resolve_storage_engine_name(sub_elem->engine_type);
create_subpartition_name(tmp_path, path,
part_elem->partition_name,
sub_elem->partition_name,
TEMP_PART_NAME);
create_subpartition_name(normal_path, path,
part_elem->partition_name,
sub_elem->partition_name,
NORMAL_PART_NAME);
ddl_log_entry.name= normal_path;
ddl_log_entry.from_name= tmp_path;
if (part_elem->part_state == PART_IS_CHANGED)
ddl_log_entry.action_type= DDL_LOG_REPLACE_ACTION;
else
ddl_log_entry.action_type= DDL_LOG_RENAME_ACTION;
if (write_ddl_log_entry(&ddl_log_entry, &log_entry))
{
DBUG_RETURN(TRUE);
}
*next_entry= log_entry->entry_pos;
sub_elem->log_entry= log_entry;
insert_part_info_log_entry_list(part_info, log_entry);
} while (++j < no_subparts);
}
else
{
ddl_log_entry.next_entry= *next_entry;
ddl_log_entry.handler_name=
ha_resolve_storage_engine_name(part_elem->engine_type);
create_partition_name(tmp_path, path,
part_elem->partition_name,
TEMP_PART_NAME, TRUE);
create_partition_name(normal_path, path,
part_elem->partition_name,
NORMAL_PART_NAME, TRUE);
ddl_log_entry.name= normal_path;
ddl_log_entry.from_name= tmp_path;
if (part_elem->part_state == PART_IS_CHANGED)
ddl_log_entry.action_type= DDL_LOG_REPLACE_ACTION;
else
ddl_log_entry.action_type= DDL_LOG_RENAME_ACTION;
if (write_ddl_log_entry(&ddl_log_entry, &log_entry))
{
DBUG_RETURN(TRUE);
}
*next_entry= log_entry->entry_pos;
part_elem->log_entry= log_entry;
insert_part_info_log_entry_list(part_info, log_entry);
}
}
} while (++i < no_elements);
DBUG_RETURN(FALSE);
}
/*
Log dropped partitions
SYNOPSIS
write_log_dropped_partitions()
lpt Struct containing parameters
RETURN VALUES
TRUE Error
FALSE Success
*/
static bool write_log_dropped_partitions(ALTER_PARTITION_PARAM_TYPE *lpt,
uint *next_entry,
const char *path,
bool temp_list)
{
DDL_LOG_ENTRY ddl_log_entry;
partition_info *part_info= lpt->part_info;
DDL_LOG_MEMORY_ENTRY *log_entry;
char tmp_path[FN_LEN];
List_iterator<partition_element> part_it(part_info->partitions);
List_iterator<partition_element> temp_it(part_info->temp_partitions);
uint no_temp_partitions= part_info->temp_partitions.elements;
uint no_elements= part_info->partitions.elements;
uint i= 0;
DBUG_ENTER("write_log_dropped_partitions");
ddl_log_entry.action_type= DDL_LOG_DELETE_ACTION;
if (temp_list)
no_elements= no_temp_partitions;
while (no_elements--)
{
partition_element *part_elem;
if (temp_list)
part_elem= temp_it++;
else
part_elem= part_it++;
if (part_elem->part_state == PART_TO_BE_DROPPED ||
part_elem->part_state == PART_TO_BE_ADDED ||
part_elem->part_state == PART_CHANGED)
{
uint name_variant;
if (part_elem->part_state == PART_CHANGED ||
(part_elem->part_state == PART_TO_BE_ADDED &&
no_temp_partitions))
name_variant= TEMP_PART_NAME;
else
name_variant= NORMAL_PART_NAME;
if (part_info->is_sub_partitioned())
{
List_iterator<partition_element> sub_it(part_elem->subpartitions);
uint no_subparts= part_info->no_subparts;
uint j= 0;
do
{
partition_element *sub_elem= sub_it++;
ddl_log_entry.next_entry= *next_entry;
ddl_log_entry.handler_name=
ha_resolve_storage_engine_name(sub_elem->engine_type);
create_subpartition_name(tmp_path, path,
part_elem->partition_name,
sub_elem->partition_name,
name_variant);
ddl_log_entry.name= tmp_path;
if (write_ddl_log_entry(&ddl_log_entry, &log_entry))
{
DBUG_RETURN(TRUE);
}
*next_entry= log_entry->entry_pos;
if (temp_list)
sub_elem->log_entry= log_entry;
insert_part_info_log_entry_list(part_info, log_entry);
} while (++j < no_subparts);
}
else
{
ddl_log_entry.next_entry= *next_entry;
ddl_log_entry.handler_name=
ha_resolve_storage_engine_name(part_elem->engine_type);
create_partition_name(tmp_path, path,
part_elem->partition_name,
name_variant, TRUE);
ddl_log_entry.name= tmp_path;
if (write_ddl_log_entry(&ddl_log_entry, &log_entry))
{
DBUG_RETURN(TRUE);
}
*next_entry= log_entry->entry_pos;
if (temp_list)
part_elem->log_entry= log_entry;
insert_part_info_log_entry_list(part_info, log_entry);
}
}
}
DBUG_RETURN(FALSE);
}
/*
Set execute log entry in ddl log for this partitioned table
SYNOPSIS
set_part_info_exec_log_entry()
part_info Partition info object
exec_log_entry Log entry
RETURN VALUES
NONE
*/
static void set_part_info_exec_log_entry(partition_info *part_info,
DDL_LOG_MEMORY_ENTRY *exec_log_entry)
{
part_info->exec_log_entry= exec_log_entry;
exec_log_entry->next_active_log_entry= NULL;
}
/*
Write the log entry to ensure that the shadow frm file is removed at
crash.
SYNOPSIS
write_log_drop_shadow_frm()
lpt Struct containing parameters
install_frm Should we log action to install shadow frm or should
the action be to remove the shadow frm file.
RETURN VALUES
TRUE Error
FALSE Success
DESCRIPTION
Prepare an entry to the ddl log indicating a drop/install of the shadow frm
file and its corresponding handler file.
*/
static bool write_log_drop_shadow_frm(ALTER_PARTITION_PARAM_TYPE *lpt)
{
DDL_LOG_ENTRY ddl_log_entry;
partition_info *part_info= lpt->part_info;
DDL_LOG_MEMORY_ENTRY *log_entry;
DDL_LOG_MEMORY_ENTRY *exec_log_entry= NULL;
char shadow_path[FN_LEN];
DBUG_ENTER("write_log_drop_shadow_frm");
build_table_filename(shadow_path, sizeof(shadow_path), lpt->db,
lpt->table_name, "#");
pthread_mutex_lock(&LOCK_gdl);
if (write_log_replace_delete_frm(lpt, 0UL, NULL,
(const char*)shadow_path, FALSE))
goto error;
log_entry= part_info->first_log_entry;
if (write_execute_ddl_log_entry(log_entry->entry_pos,
FALSE, &exec_log_entry))
goto error;
pthread_mutex_unlock(&LOCK_gdl);
set_part_info_exec_log_entry(part_info, exec_log_entry);
DBUG_RETURN(FALSE);
error:
release_part_info_log_entries(part_info->first_log_entry);
pthread_mutex_unlock(&LOCK_gdl);
part_info->first_log_entry= NULL;
my_error(ER_DDL_LOG_ERROR, MYF(0));
DBUG_RETURN(TRUE);
}
/*
Log renaming of shadow frm to real frm name and dropping of old frm
SYNOPSIS
write_log_rename_frm()
lpt Struct containing parameters
RETURN VALUES
TRUE Error
FALSE Success
DESCRIPTION
Prepare an entry to ensure that we complete the renaming of the frm
file if failure occurs in the middle of the rename process.
*/
static bool write_log_rename_frm(ALTER_PARTITION_PARAM_TYPE *lpt)
{
DDL_LOG_ENTRY ddl_log_entry;
partition_info *part_info= lpt->part_info;
DDL_LOG_MEMORY_ENTRY *log_entry;
DDL_LOG_MEMORY_ENTRY *exec_log_entry= part_info->exec_log_entry;
char path[FN_LEN];
char shadow_path[FN_LEN];
DDL_LOG_MEMORY_ENTRY *old_first_log_entry= part_info->first_log_entry;
DBUG_ENTER("write_log_rename_frm");
part_info->first_log_entry= NULL;
build_table_filename(path, sizeof(path), lpt->db,
lpt->table_name, "");
build_table_filename(shadow_path, sizeof(shadow_path), lpt->db,
lpt->table_name, "#");
pthread_mutex_lock(&LOCK_gdl);
if (write_log_replace_delete_frm(lpt, 0UL, shadow_path, path, TRUE))
goto error;
log_entry= part_info->first_log_entry;
part_info->frm_log_entry= log_entry;
if (write_execute_ddl_log_entry(log_entry->entry_pos,
FALSE, &exec_log_entry))
goto error;
release_part_info_log_entries(old_first_log_entry);
pthread_mutex_unlock(&LOCK_gdl);
DBUG_RETURN(FALSE);
error:
release_part_info_log_entries(part_info->first_log_entry);
pthread_mutex_unlock(&LOCK_gdl);
part_info->first_log_entry= old_first_log_entry;
part_info->frm_log_entry= NULL;
my_error(ER_DDL_LOG_ERROR, MYF(0));
DBUG_RETURN(TRUE);
}
/*
Write the log entries to ensure that the drop partition command is completed
even in the presence of a crash.
SYNOPSIS
write_log_drop_partition()
lpt Struct containing parameters
RETURN VALUES
TRUE Error
FALSE Success
DESCRIPTION
Prepare entries to the ddl log indicating all partitions to drop and to
install the shadow frm file and remove the old frm file.
*/
static bool write_log_drop_partition(ALTER_PARTITION_PARAM_TYPE *lpt)
{
DDL_LOG_ENTRY ddl_log_entry;
partition_info *part_info= lpt->part_info;
DDL_LOG_MEMORY_ENTRY *log_entry;
DDL_LOG_MEMORY_ENTRY *exec_log_entry= part_info->exec_log_entry;
char tmp_path[FN_LEN];
char path[FN_LEN];
uint next_entry= 0;
DDL_LOG_MEMORY_ENTRY *old_first_log_entry= part_info->first_log_entry;
DBUG_ENTER("write_log_drop_partition");
part_info->first_log_entry= NULL;
build_table_filename(path, sizeof(path), lpt->db,
lpt->table_name, "");
build_table_filename(tmp_path, sizeof(tmp_path), lpt->db,
lpt->table_name, "#");
pthread_mutex_lock(&LOCK_gdl);
if (write_log_dropped_partitions(lpt, &next_entry, (const char*)path,
FALSE))
goto error;
if (write_log_replace_delete_frm(lpt, next_entry, (const char*)tmp_path,
(const char*)path, TRUE))
goto error;
log_entry= part_info->first_log_entry;
part_info->frm_log_entry= log_entry;
if (write_execute_ddl_log_entry(log_entry->entry_pos,
FALSE, &exec_log_entry))
goto error;
release_part_info_log_entries(old_first_log_entry);
pthread_mutex_unlock(&LOCK_gdl);
DBUG_RETURN(FALSE);
error:
release_part_info_log_entries(part_info->first_log_entry);
pthread_mutex_unlock(&LOCK_gdl);
part_info->first_log_entry= old_first_log_entry;
part_info->frm_log_entry= NULL;
my_error(ER_DDL_LOG_ERROR, MYF(0));
DBUG_RETURN(TRUE);
}
/*
Write the log entries to ensure that the add partition command is not
executed at all if a crash before it has completed
SYNOPSIS
write_log_add_change_partition()
lpt Struct containing parameters
RETURN VALUES
TRUE Error
FALSE Success
DESCRIPTION
Prepare entries to the ddl log indicating all partitions to drop and to
remove the shadow frm file.
We always inject entries backwards in the list in the ddl log since we
don't know the entry position until we have written it.
*/
static bool write_log_add_change_partition(ALTER_PARTITION_PARAM_TYPE *lpt)
{
partition_info *part_info= lpt->part_info;
DDL_LOG_MEMORY_ENTRY *log_entry;
DDL_LOG_MEMORY_ENTRY *exec_log_entry= NULL;
char tmp_path[FN_LEN];
char path[FN_LEN];
uint next_entry= 0;
DBUG_ENTER("write_log_add_change_partition");
build_table_filename(path, sizeof(path), lpt->db,
lpt->table_name, "");
build_table_filename(tmp_path, sizeof(tmp_path), lpt->db,
lpt->table_name, "#");
pthread_mutex_lock(&LOCK_gdl);
if (write_log_dropped_partitions(lpt, &next_entry, (const char*)path,
FALSE))
goto error;
if (write_log_replace_delete_frm(lpt, next_entry, NULL, tmp_path,
FALSE))
goto error;
log_entry= part_info->first_log_entry;
if (write_execute_ddl_log_entry(log_entry->entry_pos,
FALSE, &exec_log_entry))
goto error;
pthread_mutex_unlock(&LOCK_gdl);
set_part_info_exec_log_entry(part_info, exec_log_entry);
DBUG_RETURN(FALSE);
error:
release_part_info_log_entries(part_info->first_log_entry);
pthread_mutex_unlock(&LOCK_gdl);
part_info->first_log_entry= NULL;
my_error(ER_DDL_LOG_ERROR, MYF(0));
DBUG_RETURN(TRUE);
}
/*
Write description of how to complete the operation after first phase of
change partitions.
SYNOPSIS
write_log_final_change_partition()
lpt Struct containing parameters
RETURN VALUES
TRUE Error
FALSE Success
DESCRIPTION
We will write log entries that specify to remove all partitions reorganised,
to rename others to reflect the new naming scheme and to install the shadow
frm file.
*/
static bool write_log_final_change_partition(ALTER_PARTITION_PARAM_TYPE *lpt)
{
DDL_LOG_ENTRY ddl_log_entry;
partition_info *part_info= lpt->part_info;
DDL_LOG_MEMORY_ENTRY *log_entry;
DDL_LOG_MEMORY_ENTRY *exec_log_entry= part_info->exec_log_entry;
char path[FN_LEN];
char shadow_path[FN_LEN];
DDL_LOG_MEMORY_ENTRY *old_first_log_entry= part_info->first_log_entry;
uint next_entry= 0;
DBUG_ENTER("write_log_final_change_partition");
part_info->first_log_entry= NULL;
build_table_filename(path, sizeof(path), lpt->db,
lpt->table_name, "");
build_table_filename(shadow_path, sizeof(shadow_path), lpt->db,
lpt->table_name, "#");
pthread_mutex_lock(&LOCK_gdl);
if (write_log_dropped_partitions(lpt, &next_entry, (const char*)path,
TRUE))
goto error;
if (write_log_changed_partitions(lpt, &next_entry, (const char*)path))
goto error;
if (write_log_replace_delete_frm(lpt, 0UL, shadow_path, path, TRUE))
goto error;
log_entry= part_info->first_log_entry;
part_info->frm_log_entry= log_entry;
if (write_execute_ddl_log_entry(log_entry->entry_pos,
FALSE, &exec_log_entry))
goto error;
release_part_info_log_entries(old_first_log_entry);
pthread_mutex_unlock(&LOCK_gdl);
DBUG_RETURN(FALSE);
error:
release_part_info_log_entries(part_info->first_log_entry);
pthread_mutex_unlock(&LOCK_gdl);
part_info->first_log_entry= old_first_log_entry;
part_info->frm_log_entry= NULL;
my_error(ER_DDL_LOG_ERROR, MYF(0));
DBUG_RETURN(TRUE);
}
/*
Remove entry from ddl log and release resources for others to use
SYNOPSIS
write_log_completed()
lpt Struct containing parameters
RETURN VALUES
TRUE Error
FALSE Success
*/
static void write_log_completed(ALTER_PARTITION_PARAM_TYPE *lpt,
bool dont_crash)
{
partition_info *part_info= lpt->part_info;
uint count_loop= 0;
bool not_success;
DDL_LOG_MEMORY_ENTRY *log_entry= part_info->exec_log_entry;
DBUG_ENTER("write_log_completed");
DBUG_ASSERT(log_entry);
pthread_mutex_lock(&LOCK_gdl);
if (write_execute_ddl_log_entry(0UL, TRUE, &log_entry))
{
/*
Failed to write, Bad...
We have completed the operation but have log records to REMOVE
stuff that shouldn't be removed. What clever things could one do
here? An error output was written to the error output by the
above method so we don't do anything here.
*/
;
}
release_part_info_log_entries(part_info->first_log_entry);
release_part_info_log_entries(part_info->exec_log_entry);
pthread_mutex_unlock(&LOCK_gdl);
part_info->exec_log_entry= NULL;
part_info->first_log_entry= NULL;
DBUG_VOID_RETURN;
}
/*
Release all log entries
SYNOPSIS
release_log_entries()
part_info Partition info struct
RETURN VALUES
NONE
*/
static void release_log_entries(partition_info *part_info)
{
pthread_mutex_lock(&LOCK_gdl);
release_part_info_log_entries(part_info->first_log_entry);
release_part_info_log_entries(part_info->exec_log_entry);
pthread_mutex_unlock(&LOCK_gdl);
part_info->first_log_entry= NULL;
part_info->exec_log_entry= NULL;
}
/*
Handle errors for ALTER TABLE for partitioning
SYNOPSIS
handle_alter_part_error()
lpt Struct carrying parameters
not_completed Was request in complete phase when error occurred
RETURN VALUES
NONE
*/
void handle_alter_part_error(ALTER_PARTITION_PARAM_TYPE *lpt,
bool not_completed,
bool drop_partition,
bool frm_install)
{
partition_info *part_info= lpt->part_info;
DBUG_ENTER("handle_alter_part_error");
if (!part_info->first_log_entry &&
execute_ddl_log_entry(current_thd,
part_info->first_log_entry->entry_pos))
{
/*
We couldn't recover from error, most likely manual interaction
is required.
*/
write_log_completed(lpt, FALSE);
release_log_entries(part_info);
if (not_completed)
{
if (drop_partition)
{
/* Table is still ok, but we left a shadow frm file behind. */
push_warning_printf(lpt->thd, MYSQL_ERROR::WARN_LEVEL_WARN, 1,
"%s %s",
"Operation was unsuccessful, table is still intact,",
"but it is possible that a shadow frm file was left behind");
}
else
{
push_warning_printf(lpt->thd, MYSQL_ERROR::WARN_LEVEL_WARN, 1,
"%s %s %s %s",
"Operation was unsuccessful, table is still intact,",
"but it is possible that a shadow frm file was left behind.",
"It is also possible that temporary partitions are left behind,",
"these could be empty or more or less filled with records");
}
}
else
{
if (frm_install)
{
/*
Failed during install of shadow frm file, table isn't intact
and dropped partitions are still there
*/
push_warning_printf(lpt->thd, MYSQL_ERROR::WARN_LEVEL_WARN, 1,
"%s %s %s",
"Failed during alter of partitions, table is no longer intact.",
"The frm file is in an unknown state, and a backup",
"is required.");
}
else if (drop_partition)
{
/*
Table is ok, we have switched to new table but left dropped
partitions still in their places. We remove the log records and
ask the user to perform the action manually. We remove the log
records and ask the user to perform the action manually.
*/
push_warning_printf(lpt->thd, MYSQL_ERROR::WARN_LEVEL_WARN, 1,
"%s %s",
"Failed during drop of partitions, table is intact.",
"Manual drop of remaining partitions is required");
}
else
{
/*
We failed during renaming of partitions. The table is most
certainly in a very bad state so we give user warning and disable
the table by writing an ancient frm version into it.
*/
push_warning_printf(lpt->thd, MYSQL_ERROR::WARN_LEVEL_WARN, 1,
"%s %s %s",
"Failed during renaming of partitions. We are now in a position",
"where table is not reusable",
"Table is disabled by writing ancient frm file version into it");
}
}
}
else
{
release_log_entries(part_info);
if (not_completed)
{
/*
We hit an error before things were completed but managed
to recover from the error. An error occurred and we have
restored things to original so no need for further action.
*/
;
}
else
{
/*
We hit an error after we had completed most of the operation
and were successful in a second attempt so the operation
actually is successful now. We need to issue a warning that
even though we reported an error the operation was successfully
completed.
*/
push_warning_printf(lpt->thd, MYSQL_ERROR::WARN_LEVEL_WARN, 1,"%s %s",
"Operation was successfully completed by failure handling,",
"after failure of normal operation");
}
}
DBUG_VOID_RETURN;
}
/*
Actually perform the change requested by ALTER TABLE of partitions
previously prepared.
......@@ -4792,9 +5505,12 @@ uint fast_alter_partition_table(THD *thd, TABLE *table,
ALTER_PARTITION_PARAM_TYPE lpt_obj;
ALTER_PARTITION_PARAM_TYPE *lpt= &lpt_obj;
bool written_bin_log= TRUE;
bool not_completed= TRUE;
bool frm_install= FALSE;
DBUG_ENTER("fast_alter_partition_table");
lpt->thd= thd;
lpt->part_info= part_info;
lpt->create_info= create_info;
lpt->create_list= create_list;
lpt->key_list= key_list;
......@@ -4826,17 +5542,18 @@ uint fast_alter_partition_table(THD *thd, TABLE *table,
In this case it is enough to call optimise_partitions, there is no
need to change frm files or anything else.
*/
int error;
written_bin_log= FALSE;
if (((alter_info->flags & ALTER_OPTIMIZE_PARTITION) &&
(table->file->optimize_partitions(thd))) ||
(error= table->file->optimize_partitions(thd))) ||
((alter_info->flags & ALTER_ANALYZE_PARTITION) &&
(table->file->analyze_partitions(thd))) ||
(error= table->file->analyze_partitions(thd))) ||
((alter_info->flags & ALTER_CHECK_PARTITION) &&
(table->file->check_partitions(thd))) ||
(error= table->file->check_partitions(thd))) ||
((alter_info->flags & ALTER_REPAIR_PARTITION) &&
(table->file->repair_partitions(thd))))
(error= table->file->repair_partitions(thd))))
{
fast_alter_partition_error_handler(lpt);
table->file->print_error(error, MYF(0));
DBUG_RETURN(TRUE);
}
}
......@@ -4881,10 +5598,9 @@ uint fast_alter_partition_table(THD *thd, TABLE *table,
1) Write the new frm, pack it and then delete it
2) Perform the change within the handler
*/
if ((mysql_write_frm(lpt, WFRM_INITIAL_WRITE | WFRM_PACK_FRM)) ||
(mysql_change_partitions(lpt)))
if (mysql_write_frm(lpt, WFRM_WRITE_SHADOW | WFRM_PACK_FRM) ||
mysql_change_partitions(lpt))
{
fast_alter_partition_error_handler(lpt);
DBUG_RETURN(TRUE);
}
}
......@@ -4912,32 +5628,62 @@ uint fast_alter_partition_table(THD *thd, TABLE *table,
after a DROP PARTITION) if one ensured that failed accesses to the
dropped partitions was aborted for sure (thus only possible for
transactional engines).
1) Lock the table in TL_WRITE_ONLY to ensure all other accesses to
0) Write an entry that removes the shadow frm file if crash occurs
1) Write the new frm file as a shadow frm
2) Write the ddl log to ensure that the operation is completed
even in the presence of a MySQL Server crash
3) Lock the table in TL_WRITE_ONLY to ensure all other accesses to
the table have completed
2) Write the new frm file where the partitions have changed but are
still remaining with the state PART_TO_BE_DROPPED
3) Write the bin log
4) Prepare MyISAM handlers for drop of partitions
5) Ensure that any users that has opened the table but not yet
4) Write the bin log
Unfortunately the writing of the binlog is not synchronised with
other logging activities. So no matter in which order the binlog
is written compared to other activities there will always be cases
where crashes make strange things occur. In this placement it can
happen that the ALTER TABLE DROP PARTITION gets performed in the
master but not in the slaves if we have a crash, after writing the
ddl log but before writing the binlog. A solution to this would
require writing the statement first in the ddl log and then
when recovering from the crash read the binlog and insert it into
the binlog if not written already.
5) Install the previously written shadow frm file
6) Ensure that any users that has opened the table but not yet
reached the abort lock do that before downgrading the lock.
6) Drop the partitions
7) Write the frm file that the partition has been dropped
8) Wait until all accesses using the old frm file has completed
9) Complete query
7) Prepare MyISAM handlers for drop of partitions
8) Drop the partitions
9) Remove entries from ddl log
10) Wait until all accesses using the old frm file has completed
11) Complete query
We insert Error injections at all places where it could be interesting
to test if recovery is properly done.
*/
if ((abort_and_upgrade_lock(lpt)) ||
(mysql_write_frm(lpt, WFRM_INITIAL_WRITE)) ||
if (write_log_drop_shadow_frm(lpt) ||
ERROR_INJECT_CRASH("crash_drop_partition_1") ||
mysql_write_frm(lpt, WFRM_WRITE_SHADOW) ||
ERROR_INJECT_CRASH("crash_drop_partition_2") ||
write_log_drop_partition(lpt) ||
ERROR_INJECT_CRASH("crash_drop_partition_3") ||
(not_completed= FALSE) ||
abort_and_upgrade_lock(lpt) || /* Always returns 0 */
((!thd->lex->no_write_to_binlog) &&
(write_bin_log(thd, FALSE,
thd->query, thd->query_length), FALSE)) ||
(table->file->extra(HA_EXTRA_PREPARE_FOR_DELETE)) ||
thd->query, thd->query_length), FALSE)) ||
ERROR_INJECT_CRASH("crash_drop_partition_4") ||
(table->file->extra(HA_EXTRA_PREPARE_FOR_DELETE), FALSE) ||
ERROR_INJECT_CRASH("crash_drop_partition_5") ||
((frm_install= TRUE), FALSE) ||
mysql_write_frm(lpt, WFRM_INSTALL_SHADOW) ||
((frm_install= FALSE), FALSE) ||
(close_open_tables_and_downgrade(lpt), FALSE) ||
(mysql_drop_partitions(lpt)) ||
(mysql_write_frm(lpt, WFRM_CREATE_HANDLER_FILES)) ||
ERROR_INJECT_CRASH("crash_drop_partition_6") ||
mysql_drop_partitions(lpt) ||
ERROR_INJECT_CRASH("crash_drop_partition_7") ||
(write_log_completed(lpt, FALSE), FALSE) ||
ERROR_INJECT_CRASH("crash_drop_partition_8") ||
(mysql_wait_completed_table(lpt, table), FALSE))
{
fast_alter_partition_error_handler(lpt);
handle_alter_part_error(lpt, not_completed, TRUE, frm_install);
DBUG_RETURN(TRUE);
}
}
......@@ -4954,28 +5700,45 @@ uint fast_alter_partition_table(THD *thd, TABLE *table,
miss updates made by a transaction serialised before it that are
inserted into the new partition.
1) Write the new frm file where state of added partitions is
changed to PART_TO_BE_ADDED
0) Write an entry that removes the shadow frm file if crash occurs
1) Write the new frm file as a shadow frm file
2) Log the changes to happen in ddl log
2) Add the new partitions
3) Lock all partitions in TL_WRITE_ONLY to ensure that no users
are still using the old partitioning scheme. Wait until all
ongoing users have completed before progressing.
4) Write a new frm file of the table where the partitions are added
to the table.
5) Write binlog
6) Wait until all accesses using the old frm file has completed
7) Complete query
4) Write binlog
5) Now the change is completed except for the installation of the
new frm file. We thus write an action in the log to change to
the shadow frm file
6) Install the new frm file of the table where the partitions are
added to the table.
7) Wait until all accesses using the old frm file has completed
8) Remove entries from ddl log
9) Complete query
*/
if ((mysql_write_frm(lpt, WFRM_INITIAL_WRITE)) ||
(mysql_change_partitions(lpt)) ||
(abort_and_upgrade_lock(lpt)) ||
(mysql_write_frm(lpt, WFRM_CREATE_HANDLER_FILES)) ||
if (write_log_add_change_partition(lpt) ||
ERROR_INJECT_CRASH("crash_add_partition_1") ||
mysql_write_frm(lpt, WFRM_WRITE_SHADOW) ||
ERROR_INJECT_CRASH("crash_add_partition_2") ||
mysql_change_partitions(lpt) ||
ERROR_INJECT_CRASH("crash_add_partition_3") ||
abort_and_upgrade_lock(lpt) || /* Always returns 0 */
((!thd->lex->no_write_to_binlog) &&
(write_bin_log(thd, FALSE,
thd->query, thd->query_length), FALSE)) ||
(close_open_tables_and_downgrade(lpt), FALSE))
ERROR_INJECT_CRASH("crash_add_partition_4") ||
write_log_rename_frm(lpt) ||
(not_completed= FALSE) ||
ERROR_INJECT_CRASH("crash_add_partition_5") ||
((frm_install= TRUE), FALSE) ||
mysql_write_frm(lpt, WFRM_INSTALL_SHADOW) ||
ERROR_INJECT_CRASH("crash_add_partition_6") ||
(close_open_tables_and_downgrade(lpt), FALSE) ||
(write_log_completed(lpt, FALSE), FALSE) ||
ERROR_INJECT_CRASH("crash_add_partition_7"))
{
fast_alter_partition_error_handler(lpt);
handle_alter_part_error(lpt, not_completed, FALSE, frm_install);
DBUG_RETURN(TRUE);
}
}
......@@ -5012,44 +5775,57 @@ uint fast_alter_partition_table(THD *thd, TABLE *table,
use a lower lock level. This can be handled inside store_lock in the
respective handler.
1) Write the new frm file where state of added partitions is
changed to PART_TO_BE_ADDED and the reorganised partitions
are set in state PART_TO_BE_REORGED.
2) Add the new partitions
0) Write an entry that removes the shadow frm file if crash occurs
1) Write the shadow frm file of new partitioning
2) Log such that temporary partitions added in change phase are
removed in a crash situation
3) Add the new partitions
Copy from the reorganised partitions to the new partitions
3) Lock all partitions in TL_WRITE_ONLY to ensure that no users
4) Log that operation is completed and log all complete actions
needed to complete operation from here
5) Lock all partitions in TL_WRITE_ONLY to ensure that no users
are still using the old partitioning scheme. Wait until all
ongoing users have completed before progressing.
4) Prepare MyISAM handlers for rename and delete of partitions
5) Write a new frm file of the table where the partitions are
reorganised.
6) Rename the reorged partitions such that they are no longer
6) Prepare MyISAM handlers for rename and delete of partitions
7) Rename the reorged partitions such that they are no longer
used and rename those added to their real new names.
7) Write bin log
8) Wait until all accesses using the old frm file has completed
9) Drop the reorganised partitions
10)Write a new frm file of the table where the partitions are
reorganised.
11)Wait until all accesses using the old frm file has completed
12)Complete query
8) Write bin log
9) Install the shadow frm file
10) Wait until all accesses using the old frm file has completed
11) Drop the reorganised partitions
12) Remove log entry
13)Wait until all accesses using the old frm file has completed
14)Complete query
*/
if ((mysql_write_frm(lpt, WFRM_INITIAL_WRITE)) ||
(mysql_change_partitions(lpt)) ||
(abort_and_upgrade_lock(lpt)) ||
(mysql_write_frm(lpt, WFRM_CREATE_HANDLER_FILES)) ||
(table->file->extra(HA_EXTRA_PREPARE_FOR_DELETE)) ||
(mysql_rename_partitions(lpt)) ||
if (write_log_add_change_partition(lpt) ||
ERROR_INJECT_CRASH("crash_change_partition_1") ||
mysql_write_frm(lpt, WFRM_WRITE_SHADOW) ||
ERROR_INJECT_CRASH("crash_change_partition_2") ||
mysql_change_partitions(lpt) ||
ERROR_INJECT_CRASH("crash_change_partition_3") ||
write_log_final_change_partition(lpt) ||
ERROR_INJECT_CRASH("crash_change_partition_4") ||
(not_completed= FALSE) ||
abort_and_upgrade_lock(lpt) || /* Always returns 0 */
((!thd->lex->no_write_to_binlog) &&
(write_bin_log(thd, FALSE,
thd->query, thd->query_length), FALSE)) ||
ERROR_INJECT_CRASH("crash_change_partition_5") ||
(table->file->extra(HA_EXTRA_PREPARE_FOR_DELETE), FALSE) ||
ERROR_INJECT_CRASH("crash_change_partition_6") ||
mysql_rename_partitions(lpt) ||
((frm_install= TRUE), FALSE) ||
ERROR_INJECT_CRASH("crash_change_partition_7") ||
mysql_write_frm(lpt, WFRM_INSTALL_SHADOW) ||
ERROR_INJECT_CRASH("crash_change_partition_8") ||
(close_open_tables_and_downgrade(lpt), FALSE) ||
(mysql_drop_partitions(lpt)) ||
(mysql_write_frm(lpt, 0UL)) ||
ERROR_INJECT_CRASH("crash_change_partition_9") ||
(write_log_completed(lpt, FALSE), FALSE) ||
ERROR_INJECT_CRASH("crash_change_partition_10") ||
(mysql_wait_completed_table(lpt, table), FALSE))
{
fast_alter_partition_error_handler(lpt);
DBUG_RETURN(TRUE);
handle_alter_part_error(lpt, not_completed, FALSE, frm_install);
DBUG_RETURN(TRUE);
}
}
/*
......@@ -5057,7 +5833,7 @@ uint fast_alter_partition_table(THD *thd, TABLE *table,
user
*/
DBUG_RETURN(fast_end_partition(thd, lpt->copied, lpt->deleted,
table_list, FALSE, lpt,
table, table_list, FALSE, lpt,
written_bin_log));
}
#endif
......@@ -5709,5 +6485,85 @@ static uint32 get_next_subpartition_via_walking(PARTITION_ITERATOR *part_iter)
field->store(part_iter->field_vals.cur++, FALSE);
return part_iter->part_info->get_subpartition_id(part_iter->part_info);
}
/*
Create partition names
SYNOPSIS
create_partition_name()
out:out Created partition name string
in1 First part
in2 Second part
name_variant Normal, temporary or renamed partition name
RETURN VALUE
NONE
DESCRIPTION
This method is used to calculate the partition name, service routine to
the del_ren_cre_table method.
*/
void create_partition_name(char *out, const char *in1,
const char *in2, uint name_variant,
bool translate)
{
char transl_part_name[FN_REFLEN];
const char *transl_part;
if (translate)
{
tablename_to_filename(in2, transl_part_name, FN_REFLEN);
transl_part= transl_part_name;
}
else
transl_part= in2;
if (name_variant == NORMAL_PART_NAME)
strxmov(out, in1, "#P#", transl_part, NullS);
else if (name_variant == TEMP_PART_NAME)
strxmov(out, in1, "#P#", transl_part, "#TMP#", NullS);
else if (name_variant == RENAMED_PART_NAME)
strxmov(out, in1, "#P#", transl_part, "#REN#", NullS);
}
/*
Create subpartition name
SYNOPSIS
create_subpartition_name()
out:out Created partition name string
in1 First part
in2 Second part
in3 Third part
name_variant Normal, temporary or renamed partition name
RETURN VALUE
NONE
DESCRIPTION
This method is used to calculate the subpartition name, service routine to
the del_ren_cre_table method.
*/
void create_subpartition_name(char *out, const char *in1,
const char *in2, const char *in3,
uint name_variant)
{
char transl_part_name[FN_REFLEN], transl_subpart_name[FN_REFLEN];
tablename_to_filename(in2, transl_part_name, FN_REFLEN);
tablename_to_filename(in3, transl_subpart_name, FN_REFLEN);
if (name_variant == NORMAL_PART_NAME)
strxmov(out, in1, "#P#", transl_part_name,
"#SP#", transl_subpart_name, NullS);
else if (name_variant == TEMP_PART_NAME)
strxmov(out, in1, "#P#", transl_part_name,
"#SP#", transl_subpart_name, "#TMP#", NullS);
else if (name_variant == RENAMED_PART_NAME)
strxmov(out, in1, "#P#", transl_part_name,
"#SP#", transl_subpart_name, "#REN#", NullS);
}
#endif
......@@ -248,6 +248,904 @@ static int mysql_copy_key_list(List<Key> *orig_key,
DBUG_RETURN(FALSE);
}
/*
--------------------------------------------------------------------------
MODULE: DDL log
-----------------
This module is used to ensure that we can recover from crashes that occur
in the middle of a meta-data operation in MySQL. E.g. DROP TABLE t1, t2;
We need to ensure that both t1 and t2 are dropped and not only t1 and
also that each table drop is entirely done and not "half-baked".
To support this we create log entries for each meta-data statement in the
ddl log while we are executing. These entries are dropped when the
operation is completed.
At recovery those entries that were not completed will be executed.
There is only one ddl log in the system and it is protected by a mutex
and there is a global struct that contains information about its current
state.
History:
First version written in 2006 by Mikael Ronstrom
--------------------------------------------------------------------------
*/
typedef struct st_global_ddl_log
{
/*
We need to adjust buffer size to be able to handle downgrades/upgrades
where IO_SIZE has changed. We'll set the buffer size such that we can
handle that the buffer size was upto 4 times bigger in the version
that wrote the DDL log.
*/
char file_entry_buf[4*IO_SIZE];
char file_name_str[FN_REFLEN];
char *file_name;
DDL_LOG_MEMORY_ENTRY *first_free;
DDL_LOG_MEMORY_ENTRY *first_used;
uint num_entries;
File file_id;
uint name_len;
uint io_size;
bool inited;
bool recovery_phase;
} GLOBAL_DDL_LOG;
GLOBAL_DDL_LOG global_ddl_log;
pthread_mutex_t LOCK_gdl;
#define DDL_LOG_ENTRY_TYPE_POS 0
#define DDL_LOG_ACTION_TYPE_POS 1
#define DDL_LOG_PHASE_POS 2
#define DDL_LOG_NEXT_ENTRY_POS 4
#define DDL_LOG_NAME_POS 8
#define DDL_LOG_NUM_ENTRY_POS 0
#define DDL_LOG_NAME_LEN_POS 4
#define DDL_LOG_IO_SIZE_POS 8
/*
Read one entry from ddl log file
SYNOPSIS
read_ddl_log_file_entry()
entry_no Entry number to read
RETURN VALUES
TRUE Error
FALSE Success
*/
static bool read_ddl_log_file_entry(uint entry_no)
{
bool error= FALSE;
File file_id= global_ddl_log.file_id;
char *file_entry_buf= (char*)global_ddl_log.file_entry_buf;
uint io_size= global_ddl_log.io_size;
DBUG_ENTER("read_ddl_log_file_entry");
if (my_pread(file_id, file_entry_buf, io_size, io_size * entry_no,
MYF(MY_WME)) != io_size)
error= TRUE;
DBUG_RETURN(error);
}
/*
Write one entry from ddl log file
SYNOPSIS
write_ddl_log_file_entry()
entry_no Entry number to read
RETURN VALUES
TRUE Error
FALSE Success
*/
static bool write_ddl_log_file_entry(uint entry_no)
{
bool error= FALSE;
File file_id= global_ddl_log.file_id;
char *file_entry_buf= (char*)global_ddl_log.file_entry_buf;
DBUG_ENTER("write_ddl_log_file_entry");
if (my_pwrite(file_id, file_entry_buf,
IO_SIZE, IO_SIZE * entry_no, MYF(MY_WME)) != IO_SIZE)
error= TRUE;
DBUG_RETURN(error);
}
/*
Write ddl log header
SYNOPSIS
write_ddl_log_header()
RETURN VALUES
TRUE Error
FALSE Success
*/
static bool write_ddl_log_header()
{
uint16 const_var;
bool error= FALSE;
DBUG_ENTER("write_ddl_log_header");
int4store(&global_ddl_log.file_entry_buf[DDL_LOG_NUM_ENTRY_POS],
global_ddl_log.num_entries);
const_var= FN_LEN;
int4store(&global_ddl_log.file_entry_buf[DDL_LOG_NAME_LEN_POS],
const_var);
const_var= IO_SIZE;
int4store(&global_ddl_log.file_entry_buf[DDL_LOG_IO_SIZE_POS],
const_var);
if (write_ddl_log_file_entry(0UL))
{
sql_print_error("Error writing ddl log header");
DBUG_RETURN(TRUE);
}
VOID(sync_ddl_log());
DBUG_RETURN(error);
}
/*
Create ddl log file name
SYNOPSIS
create_ddl_log_file_name()
file_name Filename setup
RETURN VALUES
NONE
*/
static inline void create_ddl_log_file_name(char *file_name)
{
strxmov(file_name, mysql_data_home, "/", "ddl_log.log", NullS);
}
/*
Read header of ddl log file
SYNOPSIS
read_ddl_log_header()
RETURN VALUES
> 0 Last entry in ddl log
0 No entries in ddl log
DESCRIPTION
When we read the ddl log header we get information about maximum sizes
of names in the ddl log and we also get information about the number
of entries in the ddl log.
*/
static uint read_ddl_log_header()
{
char *file_entry_buf= (char*)global_ddl_log.file_entry_buf;
char file_name[FN_REFLEN];
uint entry_no;
bool successful_open= FALSE;
DBUG_ENTER("read_ddl_log_header");
bzero(file_entry_buf, sizeof(global_ddl_log.file_entry_buf));
global_ddl_log.inited= FALSE;
global_ddl_log.recovery_phase= TRUE;
global_ddl_log.io_size= IO_SIZE;
create_ddl_log_file_name(file_name);
if ((global_ddl_log.file_id= my_open(file_name,
O_RDWR | O_BINARY, MYF(MY_WME))) >= 0)
{
if (read_ddl_log_file_entry(0UL))
{
/* Write message into error log */
sql_print_error("Failed to read ddl log file in recovery");
}
else
successful_open= TRUE;
}
entry_no= uint4korr(&file_entry_buf[DDL_LOG_NUM_ENTRY_POS]);
global_ddl_log.name_len= uint4korr(&file_entry_buf[DDL_LOG_NAME_LEN_POS]);
if (successful_open)
{
global_ddl_log.io_size= uint4korr(&file_entry_buf[DDL_LOG_IO_SIZE_POS]);
DBUG_ASSERT(global_ddl_log.io_size <=
sizeof(global_ddl_log.file_entry_buf));
}
else
{
entry_no= 0;
}
global_ddl_log.first_free= NULL;
global_ddl_log.first_used= NULL;
global_ddl_log.num_entries= 0;
VOID(pthread_mutex_init(&LOCK_gdl, MY_MUTEX_INIT_FAST));
DBUG_RETURN(entry_no);
}
/*
Read a ddl log entry
SYNOPSIS
read_ddl_log_entry()
read_entry Number of entry to read
out:entry_info Information from entry
RETURN VALUES
TRUE Error
FALSE Success
DESCRIPTION
Read a specified entry in the ddl log
*/
bool read_ddl_log_entry(uint read_entry, DDL_LOG_ENTRY *ddl_log_entry)
{
char *file_entry_buf= (char*)&global_ddl_log.file_entry_buf;
uint inx;
uchar single_char;
DBUG_ENTER("read_ddl_log_entry");
if (read_ddl_log_file_entry(read_entry))
{
DBUG_RETURN(TRUE);
}
ddl_log_entry->entry_pos= read_entry;
single_char= file_entry_buf[DDL_LOG_ENTRY_TYPE_POS];
ddl_log_entry->entry_type= (enum ddl_log_entry_code)single_char;
single_char= file_entry_buf[DDL_LOG_ACTION_TYPE_POS];
ddl_log_entry->action_type= (enum ddl_log_action_code)single_char;
ddl_log_entry->phase= file_entry_buf[DDL_LOG_PHASE_POS];
ddl_log_entry->next_entry= uint4korr(&file_entry_buf[DDL_LOG_NEXT_ENTRY_POS]);
ddl_log_entry->name= &file_entry_buf[DDL_LOG_NAME_POS];
inx= DDL_LOG_NAME_POS + global_ddl_log.name_len;
ddl_log_entry->from_name= &file_entry_buf[inx];
inx+= global_ddl_log.name_len;
ddl_log_entry->handler_name= &file_entry_buf[inx];
DBUG_RETURN(FALSE);
}
/*
Initialise ddl log
SYNOPSIS
init_ddl_log()
RETURN VALUES
TRUE Error
FALSE Success
DESCRIPTION
Write the header of the ddl log file and length of names. Also set
number of entries to zero.
*/
static bool init_ddl_log()
{
bool error= FALSE;
char file_name[FN_REFLEN];
DBUG_ENTER("init_ddl_log");
if (global_ddl_log.inited)
{
DBUG_RETURN(FALSE);
}
global_ddl_log.io_size= IO_SIZE;
create_ddl_log_file_name(file_name);
if ((global_ddl_log.file_id= my_create(file_name,
CREATE_MODE,
O_RDWR | O_TRUNC | O_BINARY,
MYF(MY_WME))) < 0)
{
/* Couldn't create ddl log file, this is serious error */
sql_print_error("Failed to open ddl log file");
DBUG_RETURN(TRUE);
}
global_ddl_log.inited= TRUE;
if (write_ddl_log_header())
{
global_ddl_log.inited= FALSE;
DBUG_RETURN(TRUE);
}
DBUG_RETURN(FALSE);
}
/*
Execute one action in a ddl log entry
SYNOPSIS
execute_ddl_log_action()
ddl_log_entry Information in action entry to execute
RETURN VALUES
TRUE Error
FALSE Success
*/
static int execute_ddl_log_action(THD *thd, DDL_LOG_ENTRY *ddl_log_entry)
{
bool frm_action= FALSE;
LEX_STRING handler_name;
handler *file= NULL;
MEM_ROOT mem_root;
int error= TRUE;
char to_path[FN_REFLEN];
char from_path[FN_REFLEN];
char *par_ext= (char*)".par";
handlerton *hton;
DBUG_ENTER("execute_ddl_log_action");
if (ddl_log_entry->entry_type == DDL_IGNORE_LOG_ENTRY_CODE)
{
DBUG_RETURN(FALSE);
}
handler_name.str= (char*)ddl_log_entry->handler_name;
handler_name.length= strlen(ddl_log_entry->handler_name);
init_sql_alloc(&mem_root, TABLE_ALLOC_BLOCK_SIZE, 0);
if (!strcmp(ddl_log_entry->handler_name, reg_ext))
frm_action= TRUE;
else
{
TABLE_SHARE dummy;
hton= ha_resolve_by_name(thd, &handler_name);
if (!hton)
{
my_error(ER_ILLEGAL_HA, MYF(0), ddl_log_entry->handler_name);
goto error;
}
bzero(&dummy, sizeof(TABLE_SHARE));
file= get_new_handler(&dummy, &mem_root, hton);
if (!file)
{
mem_alloc_error(sizeof(handler));
goto error;
}
}
switch (ddl_log_entry->action_type)
{
case DDL_LOG_REPLACE_ACTION:
case DDL_LOG_DELETE_ACTION:
{
if (ddl_log_entry->phase == 0)
{
if (frm_action)
{
strxmov(to_path, ddl_log_entry->name, reg_ext, NullS);
if ((error= my_delete(to_path, MYF(MY_WME))))
{
if (my_errno != ENOENT)
break;
}
#ifdef WITH_PARTITION_STORAGE_ENGINE
strxmov(to_path, ddl_log_entry->name, par_ext, NullS);
VOID(my_delete(to_path, MYF(MY_WME)));
#endif
}
else
{
if ((error= file->delete_table(ddl_log_entry->name)))
{
if (error != ENOENT && error != HA_ERR_NO_SUCH_TABLE)
break;
}
}
if ((deactivate_ddl_log_entry(ddl_log_entry->entry_pos)))
break;
VOID(sync_ddl_log());
error= FALSE;
if (ddl_log_entry->action_type == DDL_LOG_DELETE_ACTION)
break;
}
DBUG_ASSERT(ddl_log_entry->action_type == DDL_LOG_REPLACE_ACTION);
/*
Fall through and perform the rename action of the replace
action. We have already indicated the success of the delete
action in the log entry by stepping up the phase.
*/
}
case DDL_LOG_RENAME_ACTION:
{
error= TRUE;
if (frm_action)
{
strxmov(to_path, ddl_log_entry->name, reg_ext, NullS);
strxmov(from_path, ddl_log_entry->from_name, reg_ext, NullS);
if (my_rename(from_path, to_path, MYF(MY_WME)))
break;
#ifdef WITH_PARTITION_STORAGE_ENGINE
strxmov(to_path, ddl_log_entry->name, par_ext, NullS);
strxmov(from_path, ddl_log_entry->from_name, par_ext, NullS);
VOID(my_rename(from_path, to_path, MYF(MY_WME)));
#endif
}
else
{
if (file->rename_table(ddl_log_entry->from_name,
ddl_log_entry->name))
break;
}
if ((deactivate_ddl_log_entry(ddl_log_entry->entry_pos)))
break;
VOID(sync_ddl_log());
error= FALSE;
break;
}
default:
DBUG_ASSERT(0);
break;
}
delete file;
error:
free_root(&mem_root, MYF(0));
DBUG_RETURN(error);
}
/*
Get a free entry in the ddl log
SYNOPSIS
get_free_ddl_log_entry()
out:active_entry A ddl log memory entry returned
RETURN VALUES
TRUE Error
FALSE Success
*/
static bool get_free_ddl_log_entry(DDL_LOG_MEMORY_ENTRY **active_entry,
bool *write_header)
{
DDL_LOG_MEMORY_ENTRY *used_entry;
DDL_LOG_MEMORY_ENTRY *first_used= global_ddl_log.first_used;
DBUG_ENTER("get_free_ddl_log_entry");
if (global_ddl_log.first_free == NULL)
{
if (!(used_entry= (DDL_LOG_MEMORY_ENTRY*)my_malloc(
sizeof(DDL_LOG_MEMORY_ENTRY), MYF(MY_WME))))
{
sql_print_error("Failed to allocate memory for ddl log free list");
DBUG_RETURN(TRUE);
}
global_ddl_log.num_entries++;
used_entry->entry_pos= global_ddl_log.num_entries;
*write_header= TRUE;
}
else
{
used_entry= global_ddl_log.first_free;
global_ddl_log.first_free= used_entry->next_log_entry;
*write_header= FALSE;
}
/*
Move from free list to used list
*/
used_entry->next_log_entry= first_used;
used_entry->prev_log_entry= NULL;
global_ddl_log.first_used= used_entry;
if (first_used)
first_used->prev_log_entry= used_entry;
*active_entry= used_entry;
DBUG_RETURN(FALSE);
}
/*
External interface methods for the DDL log Module
---------------------------------------------------
*/
/*
SYNOPSIS
write_ddl_log_entry()
ddl_log_entry Information about log entry
out:entry_written Entry information written into
RETURN VALUES
TRUE Error
FALSE Success
DESCRIPTION
A careful write of the ddl log is performed to ensure that we can
handle crashes occurring during CREATE and ALTER TABLE processing.
*/
bool write_ddl_log_entry(DDL_LOG_ENTRY *ddl_log_entry,
DDL_LOG_MEMORY_ENTRY **active_entry)
{
bool error, write_header;
DBUG_ENTER("write_ddl_log_entry");
if (init_ddl_log())
{
DBUG_RETURN(TRUE);
}
global_ddl_log.file_entry_buf[DDL_LOG_ENTRY_TYPE_POS]=
(char)DDL_LOG_ENTRY_CODE;
global_ddl_log.file_entry_buf[DDL_LOG_ACTION_TYPE_POS]=
(char)ddl_log_entry->action_type;
global_ddl_log.file_entry_buf[DDL_LOG_PHASE_POS]= 0;
int4store(&global_ddl_log.file_entry_buf[DDL_LOG_NEXT_ENTRY_POS],
ddl_log_entry->next_entry);
DBUG_ASSERT(strlen(ddl_log_entry->name) < FN_LEN);
strmake(&global_ddl_log.file_entry_buf[DDL_LOG_NAME_POS],
ddl_log_entry->name, FN_LEN - 1);
if (ddl_log_entry->action_type == DDL_LOG_RENAME_ACTION ||
ddl_log_entry->action_type == DDL_LOG_REPLACE_ACTION)
{
DBUG_ASSERT(strlen(ddl_log_entry->from_name) < FN_LEN);
strmake(&global_ddl_log.file_entry_buf[DDL_LOG_NAME_POS + FN_LEN],
ddl_log_entry->from_name, FN_LEN - 1);
}
else
global_ddl_log.file_entry_buf[DDL_LOG_NAME_POS + FN_LEN]= 0;
DBUG_ASSERT(strlen(ddl_log_entry->handler_name) < FN_LEN);
strmake(&global_ddl_log.file_entry_buf[DDL_LOG_NAME_POS + (2*FN_LEN)],
ddl_log_entry->handler_name, FN_LEN - 1);
if (get_free_ddl_log_entry(active_entry, &write_header))
{
DBUG_RETURN(TRUE);
}
error= FALSE;
if (write_ddl_log_file_entry((*active_entry)->entry_pos))
{
error= TRUE;
sql_print_error("Failed to write entry_no = %u",
(*active_entry)->entry_pos);
}
if (write_header && !error)
{
VOID(sync_ddl_log());
if (write_ddl_log_header())
error= TRUE;
}
if (error)
release_ddl_log_memory_entry(*active_entry);
DBUG_RETURN(error);
}
/*
Write final entry in the ddl log
SYNOPSIS
write_execute_ddl_log_entry()
first_entry First entry in linked list of entries
to execute, if 0 = NULL it means that
the entry is removed and the entries
are put into the free list.
complete Flag indicating we are simply writing
info about that entry has been completed
in:out:active_entry Entry to execute, 0 = NULL if the entry
is written first time and needs to be
returned. In this case the entry written
is returned in this parameter
RETURN VALUES
TRUE Error
FALSE Success
DESCRIPTION
This is the last write in the ddl log. The previous log entries have
already been written but not yet synched to disk.
We write a couple of log entries that describes action to perform.
This entries are set-up in a linked list, however only when a first
execute entry is put as the first entry these will be executed.
This routine writes this first
*/
bool write_execute_ddl_log_entry(uint first_entry,
bool complete,
DDL_LOG_MEMORY_ENTRY **active_entry)
{
bool write_header= FALSE;
char *file_entry_buf= (char*)global_ddl_log.file_entry_buf;
DBUG_ENTER("write_execute_ddl_log_entry");
if (init_ddl_log())
{
DBUG_RETURN(TRUE);
}
if (!complete)
{
/*
We haven't synched the log entries yet, we synch them now before
writing the execute entry. If complete is true we haven't written
any log entries before, we are only here to write the execute
entry to indicate it is done.
*/
VOID(sync_ddl_log());
file_entry_buf[DDL_LOG_ENTRY_TYPE_POS]= (char)DDL_LOG_EXECUTE_CODE;
}
else
file_entry_buf[DDL_LOG_ENTRY_TYPE_POS]= (char)DDL_IGNORE_LOG_ENTRY_CODE;
file_entry_buf[DDL_LOG_ACTION_TYPE_POS]= 0; /* Ignored for execute entries */
file_entry_buf[DDL_LOG_PHASE_POS]= 0;
int4store(&file_entry_buf[DDL_LOG_NEXT_ENTRY_POS], first_entry);
file_entry_buf[DDL_LOG_NAME_POS]= 0;
file_entry_buf[DDL_LOG_NAME_POS + FN_LEN]= 0;
file_entry_buf[DDL_LOG_NAME_POS + 2*FN_LEN]= 0;
if (!(*active_entry))
{
if (get_free_ddl_log_entry(active_entry, &write_header))
{
DBUG_RETURN(TRUE);
}
}
if (write_ddl_log_file_entry((*active_entry)->entry_pos))
{
sql_print_error("Error writing execute entry in ddl log");
release_ddl_log_memory_entry(*active_entry);
DBUG_RETURN(TRUE);
}
VOID(sync_ddl_log());
if (write_header)
{
if (write_ddl_log_header())
{
release_ddl_log_memory_entry(*active_entry);
DBUG_RETURN(TRUE);
}
}
DBUG_RETURN(FALSE);
}
/*
For complex rename operations we need to deactivate individual entries.
SYNOPSIS
deactivate_ddl_log_entry()
entry_no Entry position of record to change
RETURN VALUES
TRUE Error
FALSE Success
DESCRIPTION
During replace operations where we start with an existing table called
t1 and a replacement table called t1#temp or something else and where
we want to delete t1 and rename t1#temp to t1 this is not possible to
do in a safe manner unless the ddl log is informed of the phases in
the change.
Delete actions are 1-phase actions that can be ignored immediately after
being executed.
Rename actions from x to y is also a 1-phase action since there is no
interaction with any other handlers named x and y.
Replace action where drop y and x -> y happens needs to be a two-phase
action. Thus the first phase will drop y and the second phase will
rename x -> y.
*/
bool deactivate_ddl_log_entry(uint entry_no)
{
char *file_entry_buf= (char*)global_ddl_log.file_entry_buf;
DBUG_ENTER("deactivate_ddl_log_entry");
if (!read_ddl_log_file_entry(entry_no))
{
if (file_entry_buf[DDL_LOG_ENTRY_TYPE_POS] == DDL_LOG_ENTRY_CODE)
{
if (file_entry_buf[DDL_LOG_ACTION_TYPE_POS] == DDL_LOG_DELETE_ACTION ||
file_entry_buf[DDL_LOG_ACTION_TYPE_POS] == DDL_LOG_RENAME_ACTION ||
(file_entry_buf[DDL_LOG_ACTION_TYPE_POS] == DDL_LOG_REPLACE_ACTION &&
file_entry_buf[DDL_LOG_PHASE_POS] == 1))
file_entry_buf[DDL_LOG_ENTRY_TYPE_POS]= DDL_IGNORE_LOG_ENTRY_CODE;
else if (file_entry_buf[DDL_LOG_ACTION_TYPE_POS] == DDL_LOG_REPLACE_ACTION)
{
DBUG_ASSERT(file_entry_buf[DDL_LOG_PHASE_POS] == 0);
file_entry_buf[DDL_LOG_PHASE_POS]= 1;
}
else
{
DBUG_ASSERT(0);
}
if (write_ddl_log_file_entry(entry_no))
{
sql_print_error("Error in deactivating log entry. Position = %u",
entry_no);
DBUG_RETURN(TRUE);
}
}
}
else
{
sql_print_error("Failed in reading entry before deactivating it");
DBUG_RETURN(TRUE);
}
DBUG_RETURN(FALSE);
}
/*
Sync ddl log file
SYNOPSIS
sync_ddl_log()
RETURN VALUES
TRUE Error
FALSE Success
*/
bool sync_ddl_log()
{
bool error= FALSE;
DBUG_ENTER("sync_ddl_log");
if ((!global_ddl_log.recovery_phase) &&
init_ddl_log())
{
DBUG_RETURN(TRUE);
}
if (my_sync(global_ddl_log.file_id, MYF(0)))
{
/* Write to error log */
sql_print_error("Failed to sync ddl log");
error= TRUE;
}
DBUG_RETURN(error);
}
/*
Release a log memory entry
SYNOPSIS
release_ddl_log_memory_entry()
log_memory_entry Log memory entry to release
RETURN VALUES
NONE
*/
void release_ddl_log_memory_entry(DDL_LOG_MEMORY_ENTRY *log_entry)
{
DDL_LOG_MEMORY_ENTRY *first_free= global_ddl_log.first_free;
DDL_LOG_MEMORY_ENTRY *next_log_entry= log_entry->next_log_entry;
DDL_LOG_MEMORY_ENTRY *prev_log_entry= log_entry->prev_log_entry;
DBUG_ENTER("release_ddl_log_memory_entry");
global_ddl_log.first_free= log_entry;
log_entry->next_log_entry= first_free;
if (prev_log_entry)
prev_log_entry->next_log_entry= next_log_entry;
else
global_ddl_log.first_used= next_log_entry;
if (next_log_entry)
next_log_entry->prev_log_entry= prev_log_entry;
DBUG_VOID_RETURN;
}
/*
Execute one entry in the ddl log. Executing an entry means executing
a linked list of actions.
SYNOPSIS
execute_ddl_log_entry()
first_entry Reference to first action in entry
RETURN VALUES
TRUE Error
FALSE Success
*/
bool execute_ddl_log_entry(THD *thd, uint first_entry)
{
DDL_LOG_ENTRY ddl_log_entry;
uint read_entry= first_entry;
DBUG_ENTER("execute_ddl_log_entry");
pthread_mutex_lock(&LOCK_gdl);
do
{
if (read_ddl_log_entry(read_entry, &ddl_log_entry))
{
/* Write to error log and continue with next log entry */
sql_print_error("Failed to read entry = %u from ddl log",
read_entry);
break;
}
DBUG_ASSERT(ddl_log_entry.entry_type == DDL_LOG_ENTRY_CODE ||
ddl_log_entry.entry_type == DDL_IGNORE_LOG_ENTRY_CODE);
if (execute_ddl_log_action(thd, &ddl_log_entry))
{
/* Write to error log and continue with next log entry */
sql_print_error("Failed to execute action for entry = %u from ddl log",
read_entry);
break;
}
read_entry= ddl_log_entry.next_entry;
} while (read_entry);
pthread_mutex_unlock(&LOCK_gdl);
DBUG_RETURN(FALSE);
}
/*
Execute the ddl log at recovery of MySQL Server
SYNOPSIS
execute_ddl_log_recovery()
RETURN VALUES
NONE
*/
void execute_ddl_log_recovery()
{
uint num_entries, i;
THD *thd;
DDL_LOG_ENTRY ddl_log_entry;
char file_name[FN_REFLEN];
DBUG_ENTER("execute_ddl_log_recovery");
/*
To be able to run this from boot, we allocate a temporary THD
*/
if (!(thd=new THD))
DBUG_VOID_RETURN;
thd->thread_stack= (char*) &thd;
thd->store_globals();
num_entries= read_ddl_log_header();
for (i= 1; i < num_entries + 1; i++)
{
if (read_ddl_log_entry(i, &ddl_log_entry))
{
sql_print_error("Failed to read entry no = %u from ddl log",
i);
continue;
}
if (ddl_log_entry.entry_type == DDL_LOG_EXECUTE_CODE)
{
if (execute_ddl_log_entry(thd, ddl_log_entry.next_entry))
{
/* Real unpleasant scenario but we continue anyways. */
continue;
}
}
}
create_ddl_log_file_name(file_name);
VOID(my_delete(file_name, MYF(0)));
global_ddl_log.recovery_phase= FALSE;
delete thd;
/* Remember that we don't have a THD */
my_pthread_setspecific_ptr(THR_THD, 0);
DBUG_VOID_RETURN;
}
/*
Release all memory allocated to the ddl log
SYNOPSIS
release_ddl_log()
RETURN VALUES
NONE
*/
void release_ddl_log()
{
DDL_LOG_MEMORY_ENTRY *free_list= global_ddl_log.first_free;
DDL_LOG_MEMORY_ENTRY *used_list= global_ddl_log.first_used;
DBUG_ENTER("release_ddl_log");
pthread_mutex_lock(&LOCK_gdl);
while (used_list)
{
DDL_LOG_MEMORY_ENTRY *tmp= used_list->next_log_entry;
my_free((char*)used_list, MYF(0));
used_list= tmp;
}
while (free_list)
{
DDL_LOG_MEMORY_ENTRY *tmp= free_list->next_log_entry;
my_free((char*)free_list, MYF(0));
free_list= tmp;
}
VOID(my_close(global_ddl_log.file_id, MYF(0)));
pthread_mutex_unlock(&LOCK_gdl);
VOID(pthread_mutex_destroy(&LOCK_gdl));
DBUG_VOID_RETURN;
}
/*
---------------------------------------------------------------------------
END MODULE DDL log
--------------------
---------------------------------------------------------------------------
*/
/*
SYNOPSIS
......@@ -281,83 +1179,68 @@ bool mysql_write_frm(ALTER_PARTITION_PARAM_TYPE *lpt, uint flags)
*/
int error= 0;
char path[FN_REFLEN+1];
char shadow_path[FN_REFLEN+1];
char shadow_frm_name[FN_REFLEN+1];
char frm_name[FN_REFLEN+1];
DBUG_ENTER("mysql_write_frm");
if (flags & WFRM_INITIAL_WRITE)
/*
Build shadow frm file name
*/
build_table_filename(shadow_path, sizeof(shadow_path), lpt->db,
lpt->table_name, "#");
strxmov(shadow_frm_name, shadow_path, reg_ext, NullS);
if (flags & WFRM_WRITE_SHADOW)
{
error= mysql_copy_create_list(lpt->create_list,
&lpt->new_create_list);
error+= mysql_copy_key_list(lpt->key_list,
&lpt->new_key_list);
if (error)
if (mysql_copy_create_list(lpt->create_list,
&lpt->new_create_list) ||
mysql_copy_key_list(lpt->key_list,
&lpt->new_key_list) ||
mysql_prepare_table(lpt->thd, lpt->create_info,
&lpt->new_create_list,
&lpt->new_key_list,
/*tmp_table*/ 1,
&lpt->db_options,
lpt->table->file,
&lpt->key_info_buffer,
&lpt->key_count,
/*select_field_count*/ 0))
{
DBUG_RETURN(TRUE);
}
}
build_table_filename(path, sizeof(path), lpt->db, lpt->table_name, "");
strxmov(frm_name, path, reg_ext, NullS);
if ((flags & WFRM_INITIAL_WRITE) &&
(mysql_prepare_table(lpt->thd, lpt->create_info, &lpt->new_create_list,
&lpt->new_key_list,/*tmp_table*/ 1, &lpt->db_options,
lpt->table->file, &lpt->key_info_buffer,
&lpt->key_count, /*select_field_count*/ 0)))
{
DBUG_RETURN(TRUE);
}
#ifdef WITH_PARTITION_STORAGE_ENGINE
{
partition_info *part_info= lpt->table->part_info;
char *part_syntax_buf;
uint syntax_len, i;
bool any_unnormal_state= FALSE;
if (part_info)
{
uint max_part_state_len= part_info->partitions.elements +
part_info->temp_partitions.elements;
if (!(part_info->part_state= (uchar*)sql_alloc(max_part_state_len)))
{
DBUG_RETURN(TRUE);
}
part_info->part_state_len= 0;
if (!(part_syntax_buf= generate_partition_syntax(part_info,
&syntax_len,
TRUE, FALSE)))
{
DBUG_RETURN(TRUE);
}
for (i= 0; i < part_info->part_state_len; i++)
{
enum partition_state part_state=
(enum partition_state)part_info->part_state[i];
if (part_state != PART_NORMAL && part_state != PART_IS_ADDED)
any_unnormal_state= TRUE;
}
if (!any_unnormal_state)
partition_info *part_info= lpt->table->part_info;
char *part_syntax_buf;
uint syntax_len;
if (part_info)
{
part_info->part_state= NULL;
part_info->part_state_len= 0;
if (!(part_syntax_buf= generate_partition_syntax(part_info,
&syntax_len,
TRUE, FALSE)))
{
DBUG_RETURN(TRUE);
}
part_info->part_info_string= part_syntax_buf;
part_info->part_info_len= syntax_len;
}
part_info->part_info_string= part_syntax_buf;
part_info->part_info_len= syntax_len;
}
}
#endif
/*
We write the frm file with the LOCK_open mutex since otherwise we could
overwrite the frm file as another is reading it in open_table.
*/
lpt->create_info->table_options= lpt->db_options;
VOID(pthread_mutex_lock(&LOCK_open));
if ((mysql_create_frm(lpt->thd, frm_name, lpt->db, lpt->table_name,
lpt->create_info, lpt->new_create_list, lpt->key_count,
lpt->key_info_buffer, lpt->table->file)) ||
((flags & WFRM_CREATE_HANDLER_FILES) &&
lpt->table->file->create_handler_files(path, lpt->create_info)))
{
error= 1;
goto end;
/* Write shadow frm file */
lpt->create_info->table_options= lpt->db_options;
if ((mysql_create_frm(lpt->thd, shadow_frm_name, lpt->db,
lpt->table_name, lpt->create_info,
lpt->new_create_list, lpt->key_count,
lpt->key_info_buffer, lpt->table->file)) ||
lpt->table->file->create_handler_files(shadow_path, NULL,
CHF_CREATE_FLAG,
lpt->create_info))
{
my_delete(shadow_frm_name, MYF(0));
error= 1;
goto end;
}
}
if (flags & WFRM_PACK_FRM)
{
......@@ -369,7 +1252,7 @@ bool mysql_write_frm(ALTER_PARTITION_PARAM_TYPE *lpt, uint flags)
*/
const void *data= 0;
uint length= 0;
if (readfrm(path, &data, &length) ||
if (readfrm(shadow_path, &data, &length) ||
packfrm(data, length, &lpt->pack_frm_data, &lpt->pack_frm_len))
{
my_free((char*)data, MYF(MY_ALLOW_ZERO_PTR));
......@@ -378,11 +1261,56 @@ bool mysql_write_frm(ALTER_PARTITION_PARAM_TYPE *lpt, uint flags)
error= 1;
goto end;
}
error= my_delete(frm_name, MYF(MY_WME));
error= my_delete(shadow_frm_name, MYF(MY_WME));
}
/* Frm file have been updated to reflect the change about to happen. */
if (flags & WFRM_INSTALL_SHADOW)
{
#ifdef WITH_PARTITION_STORAGE_ENGINE
partition_info *part_info= lpt->part_info;
#endif
/*
Build frm file name
*/
build_table_filename(path, sizeof(path), lpt->db,
lpt->table_name, "");
strxmov(frm_name, path, reg_ext, NullS);
/*
When we are changing to use new frm file we need to ensure that we
don't collide with another thread in process to open the frm file.
We start by deleting the .frm file and possible .par file. Then we
write to the DDL log that we have completed the delete phase by
increasing the phase of the log entry. Next step is to rename the
new .frm file and the new .par file to the real name. After
completing this we write a new phase to the log entry that will
deactivate it.
*/
VOID(pthread_mutex_lock(&LOCK_open));
if (my_delete(frm_name, MYF(MY_WME)) ||
#ifdef WITH_PARTITION_STORAGE_ENGINE
lpt->table->file->create_handler_files(path, shadow_path,
CHF_DELETE_FLAG, NULL) ||
deactivate_ddl_log_entry(part_info->frm_log_entry->entry_pos) ||
(sync_ddl_log(), FALSE) ||
#endif
#ifdef WITH_PARTITION_STORAGE_ENGINE
my_rename(shadow_frm_name, frm_name, MYF(MY_WME)) ||
lpt->table->file->create_handler_files(path, shadow_path,
CHF_RENAME_FLAG, NULL))
#else
my_rename(shadow_frm_name, frm_name, MYF(MY_WME)))
#endif
{
error= 1;
}
VOID(pthread_mutex_unlock(&LOCK_open));
#ifdef WITH_PARTITION_STORAGE_ENGINE
deactivate_ddl_log_entry(part_info->frm_log_entry->entry_pos);
part_info->frm_log_entry= NULL;
VOID(sync_ddl_log());
#endif
}
end:
VOID(pthread_mutex_unlock(&LOCK_open));
DBUG_RETURN(error);
}
......@@ -4790,7 +5718,8 @@ bool mysql_alter_table(THD *thd,char *new_db, char *new_name,
error= (mysql_create_frm(thd, reg_path, db, table_name,
create_info, prepared_create_list, key_count,
key_info_buffer, table->file) ||
table->file->create_handler_files(path, create_info));
table->file->create_handler_files(reg_path, NULL, CHF_INDEX_FLAG,
create_info));
VOID(pthread_mutex_unlock(&LOCK_open));
if (error)
goto err;
......@@ -4836,7 +5765,8 @@ bool mysql_alter_table(THD *thd,char *new_db, char *new_name,
error= (mysql_create_frm(thd, reg_path, db, table_name,
create_info, prepared_create_list, key_count,
key_info_buffer, table->file) ||
table->file->create_handler_files(path, create_info));
table->file->create_handler_files(reg_path, NULL, CHF_INDEX_FLAG,
create_info));
VOID(pthread_mutex_unlock(&LOCK_open));
if (error)
goto err;
......@@ -5060,7 +5990,8 @@ bool mysql_alter_table(THD *thd,char *new_db, char *new_name,
VOID(pthread_mutex_lock(&LOCK_open));
}
/* Tell the handler that a new frm file is in place. */
if (table->file->create_handler_files(path, create_info))
if (table->file->create_handler_files(reg_path, NULL, CHF_INDEX_FLAG,
create_info))
{
VOID(pthread_mutex_unlock(&LOCK_open));
goto err;
......
......@@ -3558,75 +3558,14 @@ part_definition:
LEX *lex= Lex;
partition_info *part_info= lex->part_info;
partition_element *p_elem= new partition_element();
uint part_id= part_info->partitions.elements +
part_info->temp_partitions.elements;
enum partition_state part_state;
uint part_id= part_info->partitions.elements;
if (part_info->part_state)
part_state= (enum partition_state)part_info->part_state[part_id];
else
part_state= PART_NORMAL;
switch (part_state)
if (!p_elem || part_info->partitions.push_back(p_elem))
{
case PART_TO_BE_DROPPED:
/*
This part is currently removed so we keep it in a
temporary list for REPAIR TABLE to be able to handle
failures during drop partition process.
*/
case PART_TO_BE_ADDED:
/*
This part is currently being added so we keep it in a
temporary list for REPAIR TABLE to be able to handle
failures during add partition process.
*/
if (!p_elem || part_info->temp_partitions.push_back(p_elem))
{
mem_alloc_error(sizeof(partition_element));
YYABORT;
}
break;
case PART_IS_ADDED:
/*
Part has been added and is now a normal partition
*/
case PART_TO_BE_REORGED:
/*
This part is currently reorganised, it is still however
used so we keep it in the list of partitions. We do
however need the state to be able to handle REPAIR TABLE
after failures in the reorganisation process.
*/
case PART_REORGED_DROPPED:
/*
This part is currently reorganised as part of a
COALESCE PARTITION and it will be dropped without a new
replacement partition after completing the reorganisation.
*/
case PART_CHANGED:
/*
This part is currently split or merged as part of ADD
PARTITION for a hash partition or as part of COALESCE
PARTITION for a hash partitioned table.
*/
case PART_IS_CHANGED:
/*
This part has been split or merged as part of ADD
PARTITION for a hash partition or as part of COALESCE
PARTITION for a hash partitioned table.
*/
case PART_NORMAL:
if (!p_elem || part_info->partitions.push_back(p_elem))
{
mem_alloc_error(sizeof(partition_element));
YYABORT;
}
break;
default:
mem_alloc_error((part_id * 1000) + part_state);
YYABORT;
mem_alloc_error(sizeof(partition_element));
YYABORT;
}
p_elem->part_state= part_state;
p_elem->part_state= PART_NORMAL;
part_info->curr_part_elem= p_elem;
part_info->current_partition= p_elem;
part_info->use_default_partitions= FALSE;
......
......@@ -667,36 +667,17 @@ static int open_binary_frm(THD *thd, TABLE_SHARE *share, uchar *head,
#endif
next_chunk+= 5 + partition_info_len;
}
if (share->mysql_version > 50105 && next_chunk + 5 < buff_end)
#if MYSQL_VERSION_ID < 50200
if (share->mysql_version >= 50106 && share->mysql_version <= 50109)
{
/*
Partition state was introduced to support partition management in version 5.1.5
Partition state array was here in version 5.1.6 to 5.1.9, this code
makes it possible to load a 5.1.6 table in later versions. Can most
likely be removed at some point in time. Will only be used for
upgrades within 5.1 series of versions. Upgrade to 5.2 can only be
done from newer 5.1 versions.
*/
uint32 part_state_len= uint4korr(next_chunk);
#ifdef WITH_PARTITION_STORAGE_ENGINE
if ((share->part_state_len= part_state_len))
if (!(share->part_state=
(uchar*) memdup_root(&share->mem_root, next_chunk + 4,
part_state_len)))
{
my_free(buff, MYF(0));
goto err;
}
#else
if (part_state_len)
{
DBUG_PRINT("info", ("WITH_PARTITION_STORAGE_ENGINE is not defined"));
my_free(buff, MYF(0));
goto err;
}
#endif
next_chunk+= 4 + part_state_len;
}
#ifdef WITH_PARTITION_STORAGE_ENGINE
else
{
share->part_state_len= 0;
share->part_state= NULL;
next_chunk+= 4;
}
#endif
keyinfo= share->key_info;
......
......@@ -136,7 +136,6 @@ bool mysql_create_frm(THD *thd, const char *file_name,
if (part_info)
{
create_info->extra_size+= part_info->part_info_len;
create_info->extra_size+= part_info->part_state_len;
}
#endif
......@@ -209,12 +208,6 @@ bool mysql_create_frm(THD *thd, const char *file_name,
my_write(file, (const byte*)part_info->part_info_string,
part_info->part_info_len + 1, MYF_RW))
goto err;
DBUG_PRINT("info", ("Part state len = %d", part_info->part_state_len));
int4store(buff, part_info->part_state_len);
if (my_write(file, (const byte*)buff, 4, MYF_RW) ||
my_write(file, (const byte*)part_info->part_state,
part_info->part_state_len, MYF_RW))
goto err;
}
else
#endif
......@@ -330,7 +323,7 @@ int rea_create_table(THD *thd, const char *path,
// Make sure mysql_create_frm din't remove extension
DBUG_ASSERT(*fn_rext(frm_name));
if (file->create_handler_files(path, create_info))
if (file->create_handler_files(path, NULL, CHF_CREATE_FLAG, create_info))
goto err_handler;
if (!create_info->frm_only && ha_create_table(thd, path, db, table_name,
create_info,0))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment