Commit 9a40c5bf authored by unknown's avatar unknown

WL 2826: Error handling of ALTER TABLE for partitioning

After review changes


mysql-test/r/ndb_partition_key.result:
  Fixed result file
sql/ha_ndbcluster.cc:
  Fixed interface to create_handler_files
sql/ha_ndbcluster.h:
  Fixed interface to create_handler_files
sql/ha_partition.cc:
  Fixed interface to create_handler_files and made it two-stage for rename
  Removed print_error and now it's used by MySQL Server parts instead
sql/ha_partition.h:
  Fixed interface to create_handler_files
sql/mysql_priv.h:
  Fixed error injects
  Externalised Global DDL log mutex
  Some interface changes
sql/mysqld.cc:
  Moved close of DDL log until all user threads been closed
sql/sql_base.cc:
  Interface changes
sql/sql_partition.cc:
  Moved print_error to mysql server part
sql/sql_table.cc:
  Lots of after review changes
sql/table.cc:
  Fixed upgrade code
parent b7b95ecf
......@@ -178,6 +178,7 @@ ALTER TABLE t1 ANALYZE PARTITION p0;
ERROR HY000: Table storage engine for 't1' doesn't have this option
ALTER TABLE t1 REBUILD PARTITION p0;
ERROR HY000: Table storage engine for 't1' doesn't have this option
DROP TABLE t1;
CREATE TABLE t1 (
c1 MEDIUMINT NOT NULL AUTO_INCREMENT,
c2 TEXT NOT NULL,
......
......@@ -4689,7 +4689,7 @@ int ha_ndbcluster::create(const char *name,
int ha_ndbcluster::create_handler_files(const char *file,
const char *old_name,
bool rename_flag)
int action_flag)
{
const char *name;
Ndb* ndb;
......@@ -4700,7 +4700,7 @@ int ha_ndbcluster::create_handler_files(const char *file,
DBUG_ENTER("create_handler_files");
if (rename_flag)
if (action_flag)
{
DBUG_RETURN(FALSE);
}
......
......@@ -612,7 +612,7 @@ class ha_ndbcluster: public handler
int delete_table(const char *name);
int create(const char *name, TABLE *form, HA_CREATE_INFO *info);
int create_handler_files(const char *file, const char *old_name,
bool rename_flag);
int action_flag);
int get_default_no_partitions(ulonglong max_rows);
bool get_no_parts(const char *name, uint *no_parts);
void set_auto_partitions(partition_info *part_info);
......
......@@ -495,7 +495,7 @@ int ha_partition::rename_table(const char *from, const char *to)
int ha_partition::create_handler_files(const char *path,
const char *old_path,
bool rename_flag)
int action_flag)
{
DBUG_ENTER("ha_partition::create_handler_files()");
......@@ -503,15 +503,17 @@ int ha_partition::create_handler_files(const char *path,
We need to update total number of parts since we might write the handler
file as part of a partition management command
*/
if (rename_flag)
if (action_flag)
{
char name[FN_REFLEN];
char old_name[FN_REFLEN];
strxmov(name, path, ha_par_ext, NullS);
strxmov(old_name, old_path, ha_par_ext, NullS);
if (my_delete(name, MYF(MY_WME)) ||
my_rename(old_name, name, MYF(MY_WME)))
if ((action_flag == CHF_DELETE_FLAG &&
my_delete(name, MYF(MY_WME))) ||
(action_flag == CHF_RENAME_FLAG &&
my_rename(old_name, name, MYF(MY_WME))))
{
DBUG_RETURN(TRUE);
}
......@@ -1153,7 +1155,6 @@ int ha_partition::prepare_new_partition(TABLE *table,
error:
if (create_flag)
VOID(file->delete_table(part_name));
print_error(error, MYF(0));
DBUG_RETURN(error);
}
......@@ -1280,7 +1281,7 @@ int ha_partition::change_partitions(HA_CREATE_INFO *create_info,
(m_reorged_parts + 1))))
{
mem_alloc_error(sizeof(partition_element*)*(m_reorged_parts+1));
DBUG_RETURN(TRUE);
DBUG_RETURN(ER_OUTOFMEMORY);
}
/*
......@@ -1312,7 +1313,7 @@ int ha_partition::change_partitions(HA_CREATE_INFO *create_info,
(2*(no_remain_partitions + 1)))))
{
mem_alloc_error(sizeof(handler*)*2*(no_remain_partitions+1));
DBUG_RETURN(TRUE);
DBUG_RETURN(ER_OUTOFMEMORY);
}
m_added_file= &new_file_array[no_remain_partitions + 1];
......@@ -1384,7 +1385,7 @@ int ha_partition::change_partitions(HA_CREATE_INFO *create_info,
part_elem->engine_type)))
{
mem_alloc_error(sizeof(handler));
DBUG_RETURN(TRUE);
DBUG_RETURN(ER_OUTOFMEMORY);
}
} while (++j < no_subparts);
}
......@@ -1432,7 +1433,7 @@ int ha_partition::change_partitions(HA_CREATE_INFO *create_info,
(const char *)part_name_buff)))
{
cleanup_new_partition(part_count);
DBUG_RETURN(TRUE);
DBUG_RETURN(error);
}
m_added_file[part_count++]= new_file_array[part];
} while (++j < no_subparts);
......@@ -1448,7 +1449,7 @@ int ha_partition::change_partitions(HA_CREATE_INFO *create_info,
(const char *)part_name_buff)))
{
cleanup_new_partition(part_count);
DBUG_RETURN(TRUE);
DBUG_RETURN(error);
}
m_added_file[part_count++]= new_file_array[i];
}
......@@ -1554,8 +1555,7 @@ int ha_partition::copy_partitions(ulonglong *copied, ulonglong *deleted)
}
DBUG_RETURN(FALSE);
error:
print_error(result, MYF(0));
DBUG_RETURN(TRUE);
DBUG_RETURN(result);
}
......
......@@ -180,7 +180,7 @@ public:
virtual int create(const char *name, TABLE *form,
HA_CREATE_INFO *create_info);
virtual int create_handler_files(const char *name,
const char *old_name, bool rename_flag);
const char *old_name, int action_flag);
virtual void update_create_info(HA_CREATE_INFO *create_info);
virtual char *update_table_comment(const char *comment);
virtual int change_partitions(HA_CREATE_INFO *create_info,
......
......@@ -633,8 +633,7 @@ struct Query_cache_query_flags
#else
inline bool
my_error_inject_name(const char *dbug_str)
inline bool check_and_unset_keyword(const char *dbug_str)
{
const char *extra_str= "-d,";
char total_str[200];
......@@ -649,7 +648,7 @@ my_error_inject_name(const char *dbug_str)
inline bool
my_error_inject(int value)
check_and_unset_inject_value(int value)
{
THD *thd= current_thd;
if (thd->error_inject_value == (uint)value)
......@@ -700,15 +699,15 @@ my_error_inject(int value)
#define ERROR_INJECT_CRASH(code) \
DBUG_EVALUATE_IF(code, (abort(), 0), 0)
#define ERROR_INJECT_ACTION(code, action) \
(my_error_inject_name(code) ? ((action), 0) : 0)
(check_and_unset_keyword(code) ? ((action), 0) : 0)
#define ERROR_INJECT(code) \
my_error_inject_name(code)
check_and_unset_keyword(code)
#define ERROR_INJECT_VALUE(value) \
my_error_inject(value)
check_and_unset_inject_value(value)
#define ERROR_INJECT_VALUE_ACTION(value,action) \
(my_error_inject(value) ? (action) : 0)
(check_and_unset_inject_value(value) ? (action) : 0)
#define ERROR_INJECT_VALUE_CRASH(value) \
(my_error_inject(value) ? abort() : 0)
ERROR_INJECT_VALUE_ACTION(value, (abort(), 0))
#endif
......@@ -1300,14 +1299,14 @@ bool sync_ddl_log();
void release_ddl_log();
void execute_ddl_log_recovery();
bool execute_ddl_log_entry(THD *thd, uint first_entry);
void lock_global_ddl_log();
void unlock_global_ddl_log();
extern pthread_mutex_t LOCK_gdl;
#define WFRM_WRITE_SHADOW 1
#define WFRM_INSTALL_SHADOW 2
#define WFRM_PACK_FRM 4
bool mysql_write_frm(ALTER_PARTITION_PARAM_TYPE *lpt, uint flags);
void abort_and_upgrade_lock(ALTER_PARTITION_PARAM_TYPE *lpt);
int abort_and_upgrade_lock(ALTER_PARTITION_PARAM_TYPE *lpt);
void close_open_tables_and_downgrade(ALTER_PARTITION_PARAM_TYPE *lpt);
void mysql_wait_completed_table(ALTER_PARTITION_PARAM_TYPE *lpt, TABLE *my_table);
......
......@@ -3696,7 +3696,6 @@ we force server id to 2, but this MySQL server will not act as a slave.");
/* (void) pthread_attr_destroy(&connection_attrib); */
DBUG_PRINT("quit",("Exiting main thread"));
release_ddl_log();
#ifndef __WIN__
#ifdef EXTRA_DEBUG2
......@@ -3718,6 +3717,7 @@ we force server id to 2, but this MySQL server will not act as a slave.");
pthread_cond_wait(&COND_thread_count,&LOCK_thread_count);
(void) pthread_mutex_unlock(&LOCK_thread_count);
release_ddl_log();
#if defined(__WIN__) && !defined(EMBEDDED_LIBRARY)
if (Service.IsNT() && start_mode)
Service.Stop();
......
......@@ -6140,9 +6140,8 @@ bool is_equal(const LEX_STRING *a, const LEX_STRING *b)
abort_and_upgrade_lock()
lpt Parameter passing struct
All parameters passed through the ALTER_PARTITION_PARAM_TYPE object
RETURN VALUES
TRUE Failure
FALSE Success
RETURN VALUE
0
DESCRIPTION
Remember old lock level (for possible downgrade later on), abort all
waiting threads and ensure that all keeping locks currently are
......@@ -6156,7 +6155,7 @@ bool is_equal(const LEX_STRING *a, const LEX_STRING *b)
old_lock_level Old lock level
*/
void abort_and_upgrade_lock(ALTER_PARTITION_PARAM_TYPE *lpt)
int abort_and_upgrade_lock(ALTER_PARTITION_PARAM_TYPE *lpt)
{
uint flags= RTFC_WAIT_OTHER_THREAD_FLAG | RTFC_CHECK_KILLED_FLAG;
DBUG_ENTER("abort_and_upgrade_locks");
......@@ -6166,7 +6165,7 @@ void abort_and_upgrade_lock(ALTER_PARTITION_PARAM_TYPE *lpt)
mysql_lock_abort(lpt->thd, lpt->table, TRUE);
VOID(remove_table_from_cache(lpt->thd, lpt->db, lpt->table_name, flags));
VOID(pthread_mutex_unlock(&LOCK_open));
DBUG_VOID_RETURN;
DBUG_RETURN(0);
}
......
This diff is collapsed.
......@@ -270,6 +270,12 @@ static int mysql_copy_key_list(List<Key> *orig_key,
typedef struct st_global_ddl_log
{
/*
We need to adjust buffer size to be able to handle downgrades/upgrades
where IO_SIZE has changed. We'll set the buffer size such that we can
handle that the buffer size was upto 4 times bigger in the version
that wrote the DDL log.
*/
char file_entry_buf[4*IO_SIZE];
char file_name_str[FN_REFLEN];
char *file_name;
......@@ -278,7 +284,6 @@ typedef struct st_global_ddl_log
uint num_entries;
File file_id;
uint name_len;
uint handler_name_len;
uint io_size;
bool inited;
bool recovery_phase;
......@@ -296,8 +301,7 @@ pthread_mutex_t LOCK_gdl;
#define DDL_LOG_NUM_ENTRY_POS 0
#define DDL_LOG_NAME_LEN_POS 4
#define DDL_LOG_HANDLER_TYPE_POS 8
#define DDL_LOG_IO_SIZE_POS 12
#define DDL_LOG_IO_SIZE_POS 8
/*
Read one entry from ddl log file
......@@ -368,9 +372,6 @@ static bool write_ddl_log_header()
const_var= FN_LEN;
int4store(&global_ddl_log.file_entry_buf[DDL_LOG_NAME_LEN_POS],
const_var);
const_var= DDL_LOG_HANDLER_TYPE_LEN;
int4store(&global_ddl_log.file_entry_buf[DDL_LOG_HANDLER_TYPE_POS],
const_var);
const_var= IO_SIZE;
int4store(&global_ddl_log.file_entry_buf[DDL_LOG_IO_SIZE_POS],
const_var);
......@@ -424,7 +425,8 @@ static uint read_ddl_log_header()
global_ddl_log.inited= FALSE;
global_ddl_log.recovery_phase= TRUE;
create_ddl_log_file_name(file_name);
if (!(my_open(file_name, O_RDWR | O_BINARY, MYF(MY_WME))))
if (!(global_ddl_log.file_id= my_open(file_name,
O_RDWR | O_BINARY, MYF(MY_WME))))
{
if (read_ddl_log_file_entry(0UL))
{
......@@ -436,10 +438,12 @@ static uint read_ddl_log_header()
}
entry_no= uint4korr(&file_entry_buf[DDL_LOG_NUM_ENTRY_POS]);
global_ddl_log.name_len= uint4korr(&file_entry_buf[DDL_LOG_NAME_LEN_POS]);
global_ddl_log.handler_name_len=
uint4korr(&file_entry_buf[DDL_LOG_HANDLER_TYPE_POS]);
if (successful_open)
if (successful_open)
{
global_ddl_log.io_size= uint4korr(&file_entry_buf[DDL_LOG_IO_SIZE_POS]);
DBUG_ASSERT(global_ddl_log.io_size <=
sizeof(global_ddl_log.file_entry_buf));
}
else
{
global_ddl_log.io_size= IO_SIZE;
......@@ -790,7 +794,9 @@ bool write_ddl_log_entry(DDL_LOG_ENTRY *ddl_log_entry,
to execute, if 0 = NULL it means that
the entry is removed and the entries
are put into the free list.
in:out:exec_entry Entry to execute, 0 = NULL if the entry
complete Flag indicating we are simply writing
info about that entry has been completed
in:out:active_entry Entry to execute, 0 = NULL if the entry
is written first time and needs to be
returned. In this case the entry written
is returned in this parameter
......@@ -1003,12 +1009,11 @@ bool execute_ddl_log_entry(THD *thd, uint first_entry)
uint read_entry= first_entry;
DBUG_ENTER("execute_ddl_log_entry");
lock_global_ddl_log();
pthread_mutex_lock(&LOCK_gdl);
do
{
if (read_ddl_log_entry(read_entry, &ddl_log_entry))
{
DBUG_ASSERT(0);
/* Write to error log and continue with next log entry */
sql_print_error("Failed to read entry = %u from ddl log",
read_entry);
......@@ -1019,7 +1024,6 @@ bool execute_ddl_log_entry(THD *thd, uint first_entry)
if (execute_ddl_log_action(thd, &ddl_log_entry))
{
DBUG_ASSERT(0);
/* Write to error log and continue with next log entry */
sql_print_error("Failed to execute action for entry = %u from ddl log",
read_entry);
......@@ -1027,7 +1031,7 @@ bool execute_ddl_log_entry(THD *thd, uint first_entry)
}
read_entry= ddl_log_entry.next_entry;
} while (read_entry);
unlock_global_ddl_log();
pthread_mutex_unlock(&LOCK_gdl);
DBUG_RETURN(FALSE);
}
......@@ -1061,7 +1065,6 @@ void execute_ddl_log_recovery()
{
if (read_ddl_log_entry(i, &ddl_log_entry))
{
DBUG_ASSERT(0);
sql_print_error("Failed to read entry no = %u from ddl log",
i);
continue;
......@@ -1071,7 +1074,6 @@ void execute_ddl_log_recovery()
if (execute_ddl_log_entry(thd, ddl_log_entry.next_entry))
{
/* Real unpleasant scenario but we continue anyways. */
DBUG_ASSERT(0);
continue;
}
}
......@@ -1100,7 +1102,7 @@ void release_ddl_log()
DDL_LOG_MEMORY_ENTRY *used_list= global_ddl_log.first_used;
DBUG_ENTER("release_ddl_log");
lock_global_ddl_log();
pthread_mutex_lock(&LOCK_gdl);
while (used_list)
{
DDL_LOG_MEMORY_ENTRY *tmp= used_list->next_log_entry;
......@@ -1114,46 +1116,12 @@ void release_ddl_log()
free_list= tmp;
}
VOID(my_close(global_ddl_log.file_id, MYF(0)));
unlock_global_ddl_log();
pthread_mutex_unlock(&LOCK_gdl);
VOID(pthread_mutex_destroy(&LOCK_gdl));
DBUG_VOID_RETURN;
}
/*
Lock mutex for global ddl log
SYNOPSIS
lock_global_ddl_log()
RETURN VALUES
NONE
*/
void lock_global_ddl_log()
{
DBUG_ENTER("lock_global_ddl_log");
VOID(pthread_mutex_lock(&LOCK_gdl));
DBUG_VOID_RETURN;
}
/*
Unlock mutex for global ddl log
SYNOPSIS
unlock_global_ddl_log()
RETURN VALUES
NONE
*/
void unlock_global_ddl_log()
{
DBUG_ENTER("unlock_global_ddl_log");
VOID(pthread_mutex_unlock(&LOCK_gdl));
DBUG_VOID_RETURN;
}
/*
---------------------------------------------------------------------------
......@@ -1296,11 +1264,14 @@ bool mysql_write_frm(ALTER_PARTITION_PARAM_TYPE *lpt, uint flags)
VOID(pthread_mutex_lock(&LOCK_open));
if (my_delete(frm_name, MYF(MY_WME)) ||
#ifdef WITH_PARTITION_STORAGE_ENGINE
lpt->table->file->create_handler_files(path, shadow_path,
CHF_DELETE_FLAG) ||
deactivate_ddl_log_entry(part_info->frm_log_entry->entry_pos) ||
(sync_ddl_log(), FALSE) ||
#endif
my_rename(shadow_frm_name, frm_name, MYF(MY_WME)) ||
lpt->table->file->create_handler_files(path, shadow_path, TRUE))
lpt->table->file->create_handler_files(path, shadow_path,
CHF_RENAME_FLAG))
{
error= 1;
}
......
......@@ -667,15 +667,16 @@ static int open_binary_frm(THD *thd, TABLE_SHARE *share, uchar *head,
#endif
next_chunk+= 5 + partition_info_len;
}
#if 1
if (share->mysql_version == 50106 ||
share->mysql_version == 50107)
#if MYSQL_VERSION_ID < 50200
if (share->mysql_version >= 50106 && share->mysql_version <= 50109)
{
/*
Partition state array was here in version 5.1.6, this code makes
it possible to load a 5.1.6 table in later versions. Can most
likely be removed at some point in time.
*/
Partition state array was here in version 5.1.6 to 5.1.9, this code
makes it possible to load a 5.1.6 table in later versions. Can most
likely be removed at some point in time. Will only be used for
upgrades within 5.1 series of versions. Upgrade to 5.2 can only be
done from newer 5.1 versions.
*/
next_chunk+= 4;
}
#endif
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment