WL 2826: Error handling of ALTER TABLE for partitioning

Loads of review comments fixed
inactivate => deactivate
table log => ddl log
Commented on Error Inject Module added
Put various #defines into enums
Fixed abort_and_upgrade_lock, removed unnecessary parameter
Fixed mysqlish method intro's
Fixed warning statements
5.1.7 was released still with partition states in clear text

Fixed io_size bug
Fixed bug in open that TRUNCATED before reading :)
file_entry => file_entry_buf
Don't open DDL log until first write call to DDL log
handler_type => handler_name
no => num
parent ac10cffe
......@@ -701,7 +701,7 @@ int ha_partition::rename_partitions(const char *path)
DBUG_PRINT("info", ("Delete subpartition %s", norm_name_buff));
if ((ret_error= file->delete_table((const char *) norm_name_buff)))
error= ret_error;
else if (inactivate_table_log_entry(sub_elem->log_entry->entry_pos))
else if (deactivate_ddl_log_entry(sub_elem->log_entry->entry_pos))
error= 1;
else
sub_elem->log_entry= NULL; /* Indicate success */
......@@ -716,13 +716,13 @@ int ha_partition::rename_partitions(const char *path)
DBUG_PRINT("info", ("Delete partition %s", norm_name_buff));
if ((ret_error= file->delete_table((const char *) norm_name_buff)))
error= ret_error;
else if (inactivate_table_log_entry(part_elem->log_entry->entry_pos))
else if (deactivate_ddl_log_entry(part_elem->log_entry->entry_pos))
error= 1;
else
part_elem->log_entry= NULL; /* Indicate success */
}
} while (++i < temp_partitions);
VOID(sync_table_log());
VOID(sync_ddl_log());
}
i= 0;
do
......@@ -771,9 +771,9 @@ int ha_partition::rename_partitions(const char *path)
DBUG_PRINT("info", ("Delete subpartition %s", norm_name_buff));
if ((ret_error= file->delete_table((const char *) norm_name_buff)))
error= ret_error;
else if (inactivate_table_log_entry(sub_elem->log_entry->entry_pos))
else if (deactivate_ddl_log_entry(sub_elem->log_entry->entry_pos))
error= 1;
VOID(sync_table_log());
VOID(sync_ddl_log());
}
file= m_new_file[part];
create_subpartition_name(part_name_buff, path,
......@@ -785,7 +785,7 @@ int ha_partition::rename_partitions(const char *path)
if ((ret_error= file->rename_table((const char *) part_name_buff,
(const char *) norm_name_buff)))
error= ret_error;
else if (inactivate_table_log_entry(sub_elem->log_entry->entry_pos))
else if (deactivate_ddl_log_entry(sub_elem->log_entry->entry_pos))
error= 1;
else
sub_elem->log_entry= NULL;
......@@ -802,9 +802,9 @@ int ha_partition::rename_partitions(const char *path)
DBUG_PRINT("info", ("Delete partition %s", norm_name_buff));
if ((ret_error= file->delete_table((const char *) norm_name_buff)))
error= ret_error;
else if (inactivate_table_log_entry(part_elem->log_entry->entry_pos))
else if (deactivate_ddl_log_entry(part_elem->log_entry->entry_pos))
error= 1;
VOID(sync_table_log());
VOID(sync_ddl_log());
}
file= m_new_file[i];
create_partition_name(part_name_buff, path,
......@@ -815,14 +815,14 @@ int ha_partition::rename_partitions(const char *path)
if ((ret_error= file->rename_table((const char *) part_name_buff,
(const char *) norm_name_buff)))
error= ret_error;
else if (inactivate_table_log_entry(part_elem->log_entry->entry_pos))
else if (deactivate_ddl_log_entry(part_elem->log_entry->entry_pos))
error= 1;
else
part_elem->log_entry= NULL;
}
}
} while (++i < no_parts);
VOID(sync_table_log());
VOID(sync_ddl_log());
DBUG_RETURN(error);
}
......
......@@ -631,9 +631,6 @@ struct Query_cache_query_flags
#else
#define SET_ERROR_INJECT_VALUE(x) \
current_thd->error_inject_value= (x)
inline bool
my_error_inject_name(const char *dbug_str)
{
......@@ -661,6 +658,43 @@ my_error_inject(int value)
return 0;
}
/*
ERROR INJECT MODULE:
--------------------
These macros are used to insert macros from the application code.
The event that activates those error injections can be activated
from SQL by using:
SET SESSION dbug=+d,code;
After the error has been injected, the macros will automatically
remove the debug code, thus similar to using:
SET SESSION dbug=-d,code
from SQL.
ERROR_INJECT_CRASH will inject a crash of the MySQL Server if code
is set when macro is called. ERROR_INJECT_CRASH can be used in
if-statements, it will always return FALSE unless of course it
crashes in which case it doesn't return at all.
ERROR_INJECT_ACTION will inject the action specified in the action
parameter of the macro, before performing the action the code will
be removed such that no more events occur. ERROR_INJECT_ACTION
can also be used in if-statements and always returns FALSE.
ERROR_INJECT can be used in a normal if-statement, where the action
part is performed in the if-block. The macro returns TRUE if the
error was activated and otherwise returns FALSE. If activated the
code is removed.
Sometimes it is necessary to perform error inject actions as a serie
of events. In this case one can use one variable on the THD object.
Thus one sets this value by using e.g. SET_ERROR_INJECT_VALUE(100).
Then one can later test for it by using ERROR_INJECT_CRASH_VALUE,
ERROR_INJECT_ACTION_VALUE and ERROR_INJECT_VALUE. This have the same
behaviour as the above described macros except that they use the
error inject value instead of a code used by DBUG macros.
*/
#define SET_ERROR_INJECT_VALUE(x) \
current_thd->error_inject_value= (x)
#define ERROR_INJECT_CRASH(code) \
DBUG_EVALUATE_IF(code, (abort(), 0), 0)
#define ERROR_INJECT_ACTION(code, action) \
......@@ -1186,57 +1220,86 @@ typedef struct st_lock_param_type
void mem_alloc_error(size_t size);
typedef struct st_table_log_entry
enum ddl_log_entry_code
{
/*
DDL_LOG_EXECUTE_CODE:
This is a code that indicates that this is a log entry to
be executed, from this entry a linked list of log entries
can be found and executed.
DDL_LOG_ENTRY_CODE:
An entry to be executed in a linked list from an execute log
entry.
DDL_IGNORE_LOG_ENTRY_CODE:
An entry that is to be ignored
*/
DDL_LOG_EXECUTE_CODE = 'e',
DDL_LOG_ENTRY_CODE = 'l',
DDL_IGNORE_LOG_ENTRY_CODE = 'i'
};
enum ddl_log_action_code
{
/*
The type of action that a DDL_LOG_ENTRY_CODE entry is to
perform.
DDL_LOG_DELETE_ACTION:
Delete an entity
DDL_LOG_RENAME_ACTION:
Rename an entity
DDL_LOG_REPLACE_ACTION:
Rename an entity after removing the previous entry with the
new name, that is replace this entry.
*/
DDL_LOG_DELETE_ACTION = 'd',
DDL_LOG_RENAME_ACTION = 'r',
DDL_LOG_REPLACE_ACTION = 's'
};
typedef struct st_ddl_log_entry
{
const char *name;
const char *from_name;
const char *handler_type;
const char *handler_name;
uint next_entry;
uint entry_pos;
char action_type;
char entry_type;
enum ddl_log_entry_code entry_type;
enum ddl_log_action_code action_type;
char phase;
char not_used;
} TABLE_LOG_ENTRY;
} DDL_LOG_ENTRY;
typedef struct st_table_log_memory_entry
typedef struct st_ddl_log_memory_entry
{
uint entry_pos;
struct st_table_log_memory_entry *next_log_entry;
struct st_table_log_memory_entry *prev_log_entry;
struct st_table_log_memory_entry *next_active_log_entry;
} TABLE_LOG_MEMORY_ENTRY;
struct st_ddl_log_memory_entry *next_log_entry;
struct st_ddl_log_memory_entry *prev_log_entry;
struct st_ddl_log_memory_entry *next_active_log_entry;
} DDL_LOG_MEMORY_ENTRY;
#define TLOG_EXECUTE_CODE 'e'
#define TLOG_LOG_ENTRY_CODE 'l'
#define TLOG_IGNORE_LOG_ENTRY_CODE 'i'
#define TLOG_DELETE_ACTION_CODE 'd'
#define TLOG_RENAME_ACTION_CODE 'r'
#define TLOG_REPLACE_ACTION_CODE 's'
#define TLOG_HANDLER_TYPE_LEN 32
#define DDL_LOG_HANDLER_TYPE_LEN 32
bool write_table_log_entry(TABLE_LOG_ENTRY *table_log_entry,
TABLE_LOG_MEMORY_ENTRY **active_entry);
bool write_execute_table_log_entry(uint first_entry,
bool write_ddl_log_entry(DDL_LOG_ENTRY *ddl_log_entry,
DDL_LOG_MEMORY_ENTRY **active_entry);
bool write_execute_ddl_log_entry(uint first_entry,
bool complete,
TABLE_LOG_MEMORY_ENTRY **active_entry);
bool inactivate_table_log_entry(uint entry_no);
void release_table_log_memory_entry(TABLE_LOG_MEMORY_ENTRY *log_entry);
bool sync_table_log();
void release_table_log();
void execute_table_log_recovery();
bool execute_table_log_entry(uint first_entry);
void lock_global_table_log();
void unlock_global_table_log();
DDL_LOG_MEMORY_ENTRY **active_entry);
bool deactivate_ddl_log_entry(uint entry_no);
void release_ddl_log_memory_entry(DDL_LOG_MEMORY_ENTRY *log_entry);
bool sync_ddl_log();
void release_ddl_log();
void execute_ddl_log_recovery();
bool execute_ddl_log_entry(uint first_entry);
void lock_global_ddl_log();
void unlock_global_ddl_log();
#define WFRM_WRITE_SHADOW 1
#define WFRM_INSTALL_SHADOW 2
#define WFRM_PACK_FRM 4
bool mysql_write_frm(ALTER_PARTITION_PARAM_TYPE *lpt, uint flags);
bool abort_and_upgrade_lock(ALTER_PARTITION_PARAM_TYPE *lpt,
bool can_be_killed);
void abort_and_upgrade_lock(ALTER_PARTITION_PARAM_TYPE *lpt);
void close_open_tables_and_downgrade(ALTER_PARTITION_PARAM_TYPE *lpt);
void mysql_wait_completed_table(ALTER_PARTITION_PARAM_TYPE *lpt, TABLE *my_table);
......
......@@ -3665,7 +3665,7 @@ we force server id to 2, but this MySQL server will not act as a slave.");
unireg_abort(1);
}
}
execute_table_log_recovery();
execute_ddl_log_recovery();
create_shutdown_thread();
create_maintenance_thread();
......@@ -3696,7 +3696,7 @@ we force server id to 2, but this MySQL server will not act as a slave.");
/* (void) pthread_attr_destroy(&connection_attrib); */
DBUG_PRINT("quit",("Exiting main thread"));
release_table_log();
release_ddl_log();
#ifndef __WIN__
#ifdef EXTRA_DEBUG2
......
......@@ -36,7 +36,7 @@ enum partition_state {
PART_IS_ADDED= 8
};
struct st_table_log_memory_entry;
struct st_ddl_log_memory_entry;
class partition_element :public Sql_alloc {
public:
......@@ -46,7 +46,7 @@ public:
ulonglong part_min_rows;
char *partition_name;
char *tablespace_name;
struct st_table_log_memory_entry *log_entry;
struct st_ddl_log_memory_entry *log_entry;
longlong range_value;
char* part_comment;
char* data_file_name;
......
......@@ -28,7 +28,7 @@ typedef int (*get_part_id_func)(partition_info *part_info,
longlong *func_value);
typedef uint32 (*get_subpart_id_func)(partition_info *part_info);
struct st_table_log_memory_entry;
struct st_ddl_log_memory_entry;
class partition_info : public Sql_alloc
{
......@@ -77,9 +77,9 @@ public:
Item *item_free_list;
struct st_table_log_memory_entry *first_log_entry;
struct st_table_log_memory_entry *exec_log_entry;
struct st_table_log_memory_entry *frm_log_entry;
struct st_ddl_log_memory_entry *first_log_entry;
struct st_ddl_log_memory_entry *exec_log_entry;
struct st_ddl_log_memory_entry *frm_log_entry;
/*
A bitmap of partitions used by the current query.
......
......@@ -5826,5 +5826,5 @@ ER_NDB_CANT_SWITCH_BINLOG_FORMAT
eng "The NDB cluster engine does not support changing the binlog format on the fly yet"
ER_PARTITION_NO_TEMPORARY
eng "Cannot create temporary table with partitions"
ER_TABLE_LOG_ERROR
eng "Error in table log"
ER_DDL_LOG_ERROR
eng "Error in DDL log"
......@@ -6124,24 +6124,17 @@ bool is_equal(const LEX_STRING *a, const LEX_STRING *b)
old_lock_level Old lock level
*/
bool abort_and_upgrade_lock(ALTER_PARTITION_PARAM_TYPE *lpt,
bool can_be_killed)
void abort_and_upgrade_lock(ALTER_PARTITION_PARAM_TYPE *lpt)
{
uint flags= RTFC_WAIT_OTHER_THREAD_FLAG | RTFC_CHECK_KILLED_FLAG;
int error= FALSE;
DBUG_ENTER("abort_and_upgrade_locks");
lpt->old_lock_type= lpt->table->reginfo.lock_type;
VOID(pthread_mutex_lock(&LOCK_open));
mysql_lock_abort(lpt->thd, lpt->table, TRUE);
VOID(remove_table_from_cache(lpt->thd, lpt->db, lpt->table_name, flags));
if (can_be_killed && lpt->thd->killed)
{
lpt->thd->no_warnings_for_error= 0;
error= TRUE;
}
VOID(pthread_mutex_unlock(&LOCK_open));
DBUG_RETURN(error);
DBUG_VOID_RETURN;
}
......
......@@ -5037,10 +5037,8 @@ static bool mysql_drop_partitions(ALTER_PARTITION_PARAM_TYPE *lpt)
NONE
*/
static
void
insert_part_info_log_entry_list(partition_info *part_info,
TABLE_LOG_MEMORY_ENTRY *log_entry)
static void insert_part_info_log_entry_list(partition_info *part_info,
DDL_LOG_MEMORY_ENTRY *log_entry)
{
log_entry->next_active_log_entry= part_info->first_log_entry;
part_info->first_log_entry= log_entry;
......@@ -5056,15 +5054,13 @@ insert_part_info_log_entry_list(partition_info *part_info,
NONE
*/
static
void
release_part_info_log_entries(TABLE_LOG_MEMORY_ENTRY *log_entry)
static void release_part_info_log_entries(DDL_LOG_MEMORY_ENTRY *log_entry)
{
DBUG_ENTER("release_part_info_log_entries");
while (log_entry)
{
release_table_log_memory_entry(log_entry);
release_ddl_log_memory_entry(log_entry);
log_entry= log_entry->next_active_log_entry;
}
DBUG_VOID_RETURN;
......@@ -5084,32 +5080,30 @@ release_part_info_log_entries(TABLE_LOG_MEMORY_ENTRY *log_entry)
FALSE Success
DESCRIPTION
Support routine that writes a replace or delete of an frm file into the
table log. It also inserts an entry that keeps track of used space into
ddl log. It also inserts an entry that keeps track of used space into
the partition info object
*/
static
bool
write_log_replace_delete_frm(ALTER_PARTITION_PARAM_TYPE *lpt,
uint next_entry,
const char *from_path,
const char *to_path,
bool replace_flag)
static bool write_log_replace_delete_frm(ALTER_PARTITION_PARAM_TYPE *lpt,
uint next_entry,
const char *from_path,
const char *to_path,
bool replace_flag)
{
TABLE_LOG_ENTRY table_log_entry;
TABLE_LOG_MEMORY_ENTRY *log_entry;
DDL_LOG_ENTRY ddl_log_entry;
DDL_LOG_MEMORY_ENTRY *log_entry;
DBUG_ENTER("write_log_replace_delete_frm");
if (replace_flag)
table_log_entry.action_type= TLOG_REPLACE_ACTION_CODE;
ddl_log_entry.action_type= DDL_LOG_REPLACE_ACTION;
else
table_log_entry.action_type= TLOG_DELETE_ACTION_CODE;
table_log_entry.next_entry= next_entry;
table_log_entry.handler_type= "frm";
table_log_entry.name= to_path;
ddl_log_entry.action_type= DDL_LOG_DELETE_ACTION;
ddl_log_entry.next_entry= next_entry;
ddl_log_entry.handler_name= "frm";
ddl_log_entry.name= to_path;
if (replace_flag)
table_log_entry.from_name= from_path;
if (write_table_log_entry(&table_log_entry, &log_entry))
ddl_log_entry.from_name= from_path;
if (write_ddl_log_entry(&ddl_log_entry, &log_entry))
{
DBUG_RETURN(TRUE);
}
......@@ -5140,14 +5134,12 @@ write_log_replace_delete_frm(ALTER_PARTITION_PARAM_TYPE *lpt,
the partition handler.
*/
static
bool
write_log_changed_partitions(ALTER_PARTITION_PARAM_TYPE *lpt,
uint *next_entry, const char *path)
static bool write_log_changed_partitions(ALTER_PARTITION_PARAM_TYPE *lpt,
uint *next_entry, const char *path)
{
TABLE_LOG_ENTRY table_log_entry;
DDL_LOG_ENTRY ddl_log_entry;
partition_info *part_info= lpt->part_info;
TABLE_LOG_MEMORY_ENTRY *log_entry;
DDL_LOG_MEMORY_ENTRY *log_entry;
char tmp_path[FN_LEN];
char normal_path[FN_LEN];
List_iterator<partition_element> part_it(part_info->partitions);
......@@ -5170,8 +5162,8 @@ write_log_changed_partitions(ALTER_PARTITION_PARAM_TYPE *lpt,
do
{
partition_element *sub_elem= sub_it++;
table_log_entry.next_entry= *next_entry;
table_log_entry.handler_type=
ddl_log_entry.next_entry= *next_entry;
ddl_log_entry.handler_name=
ha_resolve_storage_engine_name(sub_elem->engine_type);
create_subpartition_name(tmp_path, path,
part_elem->partition_name,
......@@ -5181,13 +5173,13 @@ write_log_changed_partitions(ALTER_PARTITION_PARAM_TYPE *lpt,
part_elem->partition_name,
sub_elem->partition_name,
NORMAL_PART_NAME);
table_log_entry.name= normal_path;
table_log_entry.from_name= tmp_path;
ddl_log_entry.name= normal_path;
ddl_log_entry.from_name= tmp_path;
if (part_elem->part_state == PART_IS_CHANGED)
table_log_entry.action_type= TLOG_REPLACE_ACTION_CODE;
ddl_log_entry.action_type= DDL_LOG_REPLACE_ACTION;
else
table_log_entry.action_type= TLOG_RENAME_ACTION_CODE;
if (write_table_log_entry(&table_log_entry, &log_entry))
ddl_log_entry.action_type= DDL_LOG_RENAME_ACTION;
if (write_ddl_log_entry(&ddl_log_entry, &log_entry))
{
DBUG_RETURN(TRUE);
}
......@@ -5198,8 +5190,8 @@ write_log_changed_partitions(ALTER_PARTITION_PARAM_TYPE *lpt,
}
else
{
table_log_entry.next_entry= *next_entry;
table_log_entry.handler_type=
ddl_log_entry.next_entry= *next_entry;
ddl_log_entry.handler_name=
ha_resolve_storage_engine_name(part_elem->engine_type);
create_partition_name(tmp_path, path,
part_elem->partition_name,
......@@ -5207,13 +5199,13 @@ write_log_changed_partitions(ALTER_PARTITION_PARAM_TYPE *lpt,
create_partition_name(normal_path, path,
part_elem->partition_name,
NORMAL_PART_NAME, TRUE);
table_log_entry.name= normal_path;
table_log_entry.from_name= tmp_path;
ddl_log_entry.name= normal_path;
ddl_log_entry.from_name= tmp_path;
if (part_elem->part_state == PART_IS_CHANGED)
table_log_entry.action_type= TLOG_REPLACE_ACTION_CODE;
ddl_log_entry.action_type= DDL_LOG_REPLACE_ACTION;
else
table_log_entry.action_type= TLOG_RENAME_ACTION_CODE;
if (write_table_log_entry(&table_log_entry, &log_entry))
ddl_log_entry.action_type= DDL_LOG_RENAME_ACTION;
if (write_ddl_log_entry(&ddl_log_entry, &log_entry))
{
DBUG_RETURN(TRUE);
}
......@@ -5237,16 +5229,14 @@ write_log_changed_partitions(ALTER_PARTITION_PARAM_TYPE *lpt,
FALSE Success
*/
static
bool
write_log_dropped_partitions(ALTER_PARTITION_PARAM_TYPE *lpt,
uint *next_entry,
const char *path,
bool temp_list)
static bool write_log_dropped_partitions(ALTER_PARTITION_PARAM_TYPE *lpt,
uint *next_entry,
const char *path,
bool temp_list)
{
TABLE_LOG_ENTRY table_log_entry;
DDL_LOG_ENTRY ddl_log_entry;
partition_info *part_info= lpt->part_info;
TABLE_LOG_MEMORY_ENTRY *log_entry;
DDL_LOG_MEMORY_ENTRY *log_entry;
char tmp_path[FN_LEN];
List_iterator<partition_element> part_it(part_info->partitions);
List_iterator<partition_element> temp_it(part_info->temp_partitions);
......@@ -5255,7 +5245,7 @@ write_log_dropped_partitions(ALTER_PARTITION_PARAM_TYPE *lpt,
uint i= 0;
DBUG_ENTER("write_log_dropped_partitions");
table_log_entry.action_type= TLOG_DELETE_ACTION_CODE;
ddl_log_entry.action_type= DDL_LOG_DELETE_ACTION;
if (temp_list)
no_elements= no_temp_partitions;
while (no_elements--)
......@@ -5284,15 +5274,15 @@ write_log_dropped_partitions(ALTER_PARTITION_PARAM_TYPE *lpt,
do
{
partition_element *sub_elem= sub_it++;
table_log_entry.next_entry= *next_entry;
table_log_entry.handler_type=
ddl_log_entry.next_entry= *next_entry;
ddl_log_entry.handler_name=
ha_resolve_storage_engine_name(sub_elem->engine_type);
create_subpartition_name(tmp_path, path,
part_elem->partition_name,
sub_elem->partition_name,
name_variant);
table_log_entry.name= tmp_path;
if (write_table_log_entry(&table_log_entry, &log_entry))
ddl_log_entry.name= tmp_path;
if (write_ddl_log_entry(&ddl_log_entry, &log_entry))
{
DBUG_RETURN(TRUE);
}
......@@ -5304,14 +5294,14 @@ write_log_dropped_partitions(ALTER_PARTITION_PARAM_TYPE *lpt,
}
else
{
table_log_entry.next_entry= *next_entry;
table_log_entry.handler_type=
ddl_log_entry.next_entry= *next_entry;
ddl_log_entry.handler_name=
ha_resolve_storage_engine_name(part_elem->engine_type);
create_partition_name(tmp_path, path,
part_elem->partition_name,
name_variant, TRUE);
table_log_entry.name= tmp_path;
if (write_table_log_entry(&table_log_entry, &log_entry))
ddl_log_entry.name= tmp_path;
if (write_ddl_log_entry(&ddl_log_entry, &log_entry))
{
DBUG_RETURN(TRUE);
}
......@@ -5327,7 +5317,7 @@ write_log_dropped_partitions(ALTER_PARTITION_PARAM_TYPE *lpt,
/*
Set execute log entry in table log for this partitioned table
Set execute log entry in ddl log for this partitioned table
SYNOPSIS
set_part_info_exec_log_entry()
part_info Partition info object
......@@ -5336,10 +5326,8 @@ write_log_dropped_partitions(ALTER_PARTITION_PARAM_TYPE *lpt,
NONE
*/
static
void
set_part_info_exec_log_entry(partition_info *part_info,
TABLE_LOG_MEMORY_ENTRY *exec_log_entry)
static void set_part_info_exec_log_entry(partition_info *part_info,
DDL_LOG_MEMORY_ENTRY *exec_log_entry)
{
part_info->exec_log_entry= exec_log_entry;
exec_log_entry->next_active_log_entry= NULL;
......@@ -5358,41 +5346,38 @@ set_part_info_exec_log_entry(partition_info *part_info,
TRUE Error
FALSE Success
DESCRIPTION
Prepare an entry to the table log indicating a drop/install of the shadow frm
Prepare an entry to the ddl log indicating a drop/install of the shadow frm
file and its corresponding handler file.
*/
static
bool
write_log_drop_shadow_frm(ALTER_PARTITION_PARAM_TYPE *lpt)
static bool write_log_drop_shadow_frm(ALTER_PARTITION_PARAM_TYPE *lpt)
{
TABLE_LOG_ENTRY table_log_entry;
DDL_LOG_ENTRY ddl_log_entry;
partition_info *part_info= lpt->part_info;
TABLE_LOG_MEMORY_ENTRY *log_entry;
TABLE_LOG_MEMORY_ENTRY *exec_log_entry= NULL;
DDL_LOG_MEMORY_ENTRY *log_entry;
DDL_LOG_MEMORY_ENTRY *exec_log_entry= NULL;
char shadow_path[FN_LEN];
DBUG_ENTER("write_log_drop_shadow_frm");
build_table_filename(shadow_path, sizeof(shadow_path), lpt->db,
lpt->table_name, "#");
lock_global_table_log();
do
{
if (write_log_replace_delete_frm(lpt, 0UL, NULL,
(const char*)shadow_path, FALSE))
break;
log_entry= part_info->first_log_entry;
if (write_execute_table_log_entry(log_entry->entry_pos,
FALSE, &exec_log_entry))
break;
unlock_global_table_log();
set_part_info_exec_log_entry(part_info, exec_log_entry);
DBUG_RETURN(FALSE);
} while (TRUE);
lock_global_ddl_log();
if (write_log_replace_delete_frm(lpt, 0UL, NULL,
(const char*)shadow_path, FALSE))
goto error;
log_entry= part_info->first_log_entry;
if (write_execute_ddl_log_entry(log_entry->entry_pos,
FALSE, &exec_log_entry))
goto error;
unlock_global_ddl_log();
set_part_info_exec_log_entry(part_info, exec_log_entry);
DBUG_RETURN(FALSE);
error:
release_part_info_log_entries(part_info->first_log_entry);
unlock_global_table_log();
unlock_global_ddl_log();
part_info->first_log_entry= NULL;
my_error(ER_TABLE_LOG_ERROR, MYF(0));
my_error(ER_DDL_LOG_ERROR, MYF(0));
DBUG_RETURN(TRUE);
}
......@@ -5410,17 +5395,15 @@ write_log_drop_shadow_frm(ALTER_PARTITION_PARAM_TYPE *lpt)
file if failure occurs in the middle of the rename process.
*/
static
bool
write_log_rename_frm(ALTER_PARTITION_PARAM_TYPE *lpt)
static bool write_log_rename_frm(ALTER_PARTITION_PARAM_TYPE *lpt)
{
TABLE_LOG_ENTRY table_log_entry;
DDL_LOG_ENTRY ddl_log_entry;
partition_info *part_info= lpt->part_info;
TABLE_LOG_MEMORY_ENTRY *log_entry;
TABLE_LOG_MEMORY_ENTRY *exec_log_entry= part_info->exec_log_entry;
DDL_LOG_MEMORY_ENTRY *log_entry;
DDL_LOG_MEMORY_ENTRY *exec_log_entry= part_info->exec_log_entry;
char path[FN_LEN];
char shadow_path[FN_LEN];
TABLE_LOG_MEMORY_ENTRY *old_first_log_entry= part_info->first_log_entry;
DDL_LOG_MEMORY_ENTRY *old_first_log_entry= part_info->first_log_entry;
DBUG_ENTER("write_log_rename_frm");
part_info->first_log_entry= NULL;
......@@ -5428,25 +5411,24 @@ write_log_rename_frm(ALTER_PARTITION_PARAM_TYPE *lpt)
lpt->table_name, "");
build_table_filename(shadow_path, sizeof(shadow_path), lpt->db,
lpt->table_name, "#");
lock_global_table_log();
do
{
if (write_log_replace_delete_frm(lpt, 0UL, path, shadow_path, FALSE))
break;
log_entry= part_info->first_log_entry;
part_info->frm_log_entry= log_entry;
if (write_execute_table_log_entry(log_entry->entry_pos,
FALSE, &exec_log_entry))
break;
release_part_info_log_entries(old_first_log_entry);
unlock_global_table_log();
DBUG_RETURN(FALSE);
} while (TRUE);
lock_global_ddl_log();
if (write_log_replace_delete_frm(lpt, 0UL, path, shadow_path, FALSE))
goto error;
log_entry= part_info->first_log_entry;
part_info->frm_log_entry= log_entry;
if (write_execute_ddl_log_entry(log_entry->entry_pos,
FALSE, &exec_log_entry))
goto error;
release_part_info_log_entries(old_first_log_entry);
unlock_global_ddl_log();
DBUG_RETURN(FALSE);
error:
release_part_info_log_entries(part_info->first_log_entry);
unlock_global_table_log();
unlock_global_ddl_log();
part_info->first_log_entry= old_first_log_entry;
part_info->frm_log_entry= NULL;
my_error(ER_TABLE_LOG_ERROR, MYF(0));
my_error(ER_DDL_LOG_ERROR, MYF(0));
DBUG_RETURN(TRUE);
}
......@@ -5462,22 +5444,20 @@ write_log_rename_frm(ALTER_PARTITION_PARAM_TYPE *lpt)
TRUE Error
FALSE Success
DESCRIPTION
Prepare entries to the table log indicating all partitions to drop and to
Prepare entries to the ddl log indicating all partitions to drop and to
install the shadow frm file and remove the old frm file.
*/
static
bool
write_log_drop_partition(ALTER_PARTITION_PARAM_TYPE *lpt)
static bool write_log_drop_partition(ALTER_PARTITION_PARAM_TYPE *lpt)
{
TABLE_LOG_ENTRY table_log_entry;
DDL_LOG_ENTRY ddl_log_entry;
partition_info *part_info= lpt->part_info;
TABLE_LOG_MEMORY_ENTRY *log_entry;
TABLE_LOG_MEMORY_ENTRY *exec_log_entry= part_info->exec_log_entry;
DDL_LOG_MEMORY_ENTRY *log_entry;
DDL_LOG_MEMORY_ENTRY *exec_log_entry= part_info->exec_log_entry;
char tmp_path[FN_LEN];
char path[FN_LEN];
uint next_entry= 0;
TABLE_LOG_MEMORY_ENTRY *old_first_log_entry= part_info->first_log_entry;
DDL_LOG_MEMORY_ENTRY *old_first_log_entry= part_info->first_log_entry;
DBUG_ENTER("write_log_drop_partition");
part_info->first_log_entry= NULL;
......@@ -5485,29 +5465,28 @@ write_log_drop_partition(ALTER_PARTITION_PARAM_TYPE *lpt)
lpt->table_name, "");
build_table_filename(tmp_path, sizeof(tmp_path), lpt->db,
lpt->table_name, "#");
lock_global_table_log();
do
{
if (write_log_dropped_partitions(lpt, &next_entry, (const char*)path,
FALSE))
break;
if (write_log_replace_delete_frm(lpt, next_entry, (const char*)path,
(const char*)tmp_path, TRUE))
break;
log_entry= part_info->first_log_entry;
part_info->frm_log_entry= log_entry;
if (write_execute_table_log_entry(log_entry->entry_pos,
FALSE, &exec_log_entry))
break;
release_part_info_log_entries(old_first_log_entry);
unlock_global_table_log();
DBUG_RETURN(FALSE);
} while (TRUE);
lock_global_ddl_log();
if (write_log_dropped_partitions(lpt, &next_entry, (const char*)path,
FALSE))
goto error;
if (write_log_replace_delete_frm(lpt, next_entry, (const char*)path,
(const char*)tmp_path, TRUE))
goto error;
log_entry= part_info->first_log_entry;
part_info->frm_log_entry= log_entry;
if (write_execute_ddl_log_entry(log_entry->entry_pos,
FALSE, &exec_log_entry))
goto error;
release_part_info_log_entries(old_first_log_entry);
unlock_global_ddl_log();
DBUG_RETURN(FALSE);
error:
release_part_info_log_entries(part_info->first_log_entry);
unlock_global_table_log();
unlock_global_ddl_log();
part_info->first_log_entry= old_first_log_entry;
part_info->frm_log_entry= NULL;
my_error(ER_TABLE_LOG_ERROR, MYF(0));
my_error(ER_DDL_LOG_ERROR, MYF(0));
DBUG_RETURN(TRUE);
}
......@@ -5523,19 +5502,17 @@ write_log_drop_partition(ALTER_PARTITION_PARAM_TYPE *lpt)
TRUE Error
FALSE Success
DESCRIPTION
Prepare entries to the table log indicating all partitions to drop and to
Prepare entries to the ddl log indicating all partitions to drop and to
remove the shadow frm file.
We always inject entries backwards in the list in the table log since we
We always inject entries backwards in the list in the ddl log since we
don't know the entry position until we have written it.
*/
static
bool
write_log_add_change_partition(ALTER_PARTITION_PARAM_TYPE *lpt)
static bool write_log_add_change_partition(ALTER_PARTITION_PARAM_TYPE *lpt)
{
partition_info *part_info= lpt->part_info;
TABLE_LOG_MEMORY_ENTRY *log_entry;
TABLE_LOG_MEMORY_ENTRY *exec_log_entry= NULL;
DDL_LOG_MEMORY_ENTRY *log_entry;
DDL_LOG_MEMORY_ENTRY *exec_log_entry= NULL;
char tmp_path[FN_LEN];
char path[FN_LEN];
uint next_entry= 0;
......@@ -5545,27 +5522,26 @@ write_log_add_change_partition(ALTER_PARTITION_PARAM_TYPE *lpt)
lpt->table_name, "");
build_table_filename(tmp_path, sizeof(tmp_path), lpt->db,
lpt->table_name, "#");
lock_global_table_log();
do
{
if (write_log_dropped_partitions(lpt, &next_entry, (const char*)path,
FALSE))
break;
if (write_log_replace_delete_frm(lpt, next_entry, NULL, tmp_path,
FALSE))
break;
log_entry= part_info->first_log_entry;
if (write_execute_table_log_entry(log_entry->entry_pos,
FALSE, &exec_log_entry))
break;
unlock_global_table_log();
set_part_info_exec_log_entry(part_info, exec_log_entry);
DBUG_RETURN(FALSE);
} while (TRUE);
lock_global_ddl_log();
if (write_log_dropped_partitions(lpt, &next_entry, (const char*)path,
FALSE))
goto error;
if (write_log_replace_delete_frm(lpt, next_entry, NULL, tmp_path,
FALSE))
goto error;
log_entry= part_info->first_log_entry;
if (write_execute_ddl_log_entry(log_entry->entry_pos,
FALSE, &exec_log_entry))
goto error;
unlock_global_ddl_log();
set_part_info_exec_log_entry(part_info, exec_log_entry);
DBUG_RETURN(FALSE);
error:
release_part_info_log_entries(part_info->first_log_entry);
unlock_global_table_log();
unlock_global_ddl_log();
part_info->first_log_entry= NULL;
my_error(ER_TABLE_LOG_ERROR, MYF(0));
my_error(ER_DDL_LOG_ERROR, MYF(0));
DBUG_RETURN(TRUE);
}
......@@ -5586,17 +5562,15 @@ write_log_add_change_partition(ALTER_PARTITION_PARAM_TYPE *lpt)
frm file.
*/
static
bool
write_log_final_change_partition(ALTER_PARTITION_PARAM_TYPE *lpt)
static bool write_log_final_change_partition(ALTER_PARTITION_PARAM_TYPE *lpt)
{
TABLE_LOG_ENTRY table_log_entry;
DDL_LOG_ENTRY ddl_log_entry;
partition_info *part_info= lpt->part_info;
TABLE_LOG_MEMORY_ENTRY *log_entry;
TABLE_LOG_MEMORY_ENTRY *exec_log_entry= part_info->exec_log_entry;
DDL_LOG_MEMORY_ENTRY *log_entry;
DDL_LOG_MEMORY_ENTRY *exec_log_entry= part_info->exec_log_entry;
char path[FN_LEN];
char shadow_path[FN_LEN];
TABLE_LOG_MEMORY_ENTRY *old_first_log_entry= part_info->first_log_entry;
DDL_LOG_MEMORY_ENTRY *old_first_log_entry= part_info->first_log_entry;
uint next_entry= 0;
DBUG_ENTER("write_log_final_change_partition");
......@@ -5605,36 +5579,35 @@ write_log_final_change_partition(ALTER_PARTITION_PARAM_TYPE *lpt)
lpt->table_name, "");
build_table_filename(shadow_path, sizeof(shadow_path), lpt->db,
lpt->table_name, "#");
lock_global_table_log();
do
{
if (write_log_dropped_partitions(lpt, &next_entry, (const char*)path,
TRUE))
break;
if (write_log_changed_partitions(lpt, &next_entry, (const char*)path))
break;
if (write_log_replace_delete_frm(lpt, 0UL, path, shadow_path, FALSE))
break;
log_entry= part_info->first_log_entry;
part_info->frm_log_entry= log_entry;
if (write_execute_table_log_entry(log_entry->entry_pos,
FALSE, &exec_log_entry))
break;
release_part_info_log_entries(old_first_log_entry);
unlock_global_table_log();
DBUG_RETURN(FALSE);
} while (TRUE);
lock_global_ddl_log();
if (write_log_dropped_partitions(lpt, &next_entry, (const char*)path,
TRUE))
goto error;
if (write_log_changed_partitions(lpt, &next_entry, (const char*)path))
goto error;
if (write_log_replace_delete_frm(lpt, 0UL, path, shadow_path, FALSE))
goto error;
log_entry= part_info->first_log_entry;
part_info->frm_log_entry= log_entry;
if (write_execute_ddl_log_entry(log_entry->entry_pos,
FALSE, &exec_log_entry))
goto error;
release_part_info_log_entries(old_first_log_entry);
unlock_global_ddl_log();
DBUG_RETURN(FALSE);
error:
release_part_info_log_entries(part_info->first_log_entry);
unlock_global_table_log();
unlock_global_ddl_log();
part_info->first_log_entry= old_first_log_entry;
part_info->frm_log_entry= NULL;
my_error(ER_TABLE_LOG_ERROR, MYF(0));
my_error(ER_DDL_LOG_ERROR, MYF(0));
DBUG_RETURN(TRUE);
}
/*
Remove entry from table log and release resources for others to use
Remove entry from ddl log and release resources for others to use
SYNOPSIS
write_log_completed()
......@@ -5644,37 +5617,30 @@ write_log_final_change_partition(ALTER_PARTITION_PARAM_TYPE *lpt)
FALSE Success
*/
static
void
write_log_completed(ALTER_PARTITION_PARAM_TYPE *lpt, bool dont_crash)
static void write_log_completed(ALTER_PARTITION_PARAM_TYPE *lpt,
bool dont_crash)
{
partition_info *part_info= lpt->part_info;
uint count_loop= 0;
bool not_success;
TABLE_LOG_MEMORY_ENTRY *log_entry= part_info->exec_log_entry;
DDL_LOG_MEMORY_ENTRY *log_entry= part_info->exec_log_entry;
DBUG_ENTER("write_log_completed");
DBUG_ASSERT(log_entry);
lock_global_table_log();
do
{
if (!(not_success= write_execute_table_log_entry(0UL, TRUE, &log_entry)))
break;
my_sleep(1);
} while (count_loop++ < 20);
if (not_success && !dont_crash)
lock_global_ddl_log();
if (write_execute_ddl_log_entry(0UL, TRUE, &log_entry))
{
/*
Failed to write 20 consecutive attempts to write. Bad...
Failed to write, Bad...
We have completed the operation but have log records to REMOVE
stuff that shouldn't be removed. What clever things could one do
here?
*/
abort();
;
}
release_part_info_log_entries(part_info->first_log_entry);
release_part_info_log_entries(part_info->exec_log_entry);
unlock_global_table_log();
unlock_global_ddl_log();
part_info->exec_log_entry= NULL;
part_info->first_log_entry= NULL;
DBUG_VOID_RETURN;
......@@ -5690,14 +5656,12 @@ write_log_completed(ALTER_PARTITION_PARAM_TYPE *lpt, bool dont_crash)
NONE
*/
static
void
release_log_entries(partition_info *part_info)
static void release_log_entries(partition_info *part_info)
{
lock_global_table_log();
lock_global_ddl_log();
release_part_info_log_entries(part_info->first_log_entry);
release_part_info_log_entries(part_info->exec_log_entry);
unlock_global_table_log();
unlock_global_ddl_log();
part_info->first_log_entry= NULL;
part_info->exec_log_entry= NULL;
}
......@@ -5713,43 +5677,41 @@ release_log_entries(partition_info *part_info)
NONE
*/
void
handle_alter_part_error(ALTER_PARTITION_PARAM_TYPE *lpt, bool not_completed,
bool drop_partition, bool frm_install)
void handle_alter_part_error(ALTER_PARTITION_PARAM_TYPE *lpt,
bool not_completed,
bool drop_partition,
bool frm_install)
{
partition_info *part_info= lpt->part_info;
DBUG_ENTER("handle_alter_part_error");
if (!part_info->first_log_entry &&
execute_table_log_entry(part_info->first_log_entry->entry_pos))
execute_ddl_log_entry(part_info->first_log_entry->entry_pos))
{
/*
We couldn't recover from error, most likely manual interaction is required.
We couldn't recover from error, most likely manual interaction
is required.
*/
write_log_completed(lpt, FALSE);
release_log_entries(part_info);
if (not_completed)
{
char *text1=
(char*)"Operation was unsuccessful, table is still intact, ";
if (drop_partition)
{
/* Table is still ok, but we left a shadow frm file behind. */
char *text2=
(char*)"but it is possible that a shadow frm file was left behind";
push_warning_printf(lpt->thd, MYSQL_ERROR::WARN_LEVEL_WARN, 1,
"%s \n %s", text1, text2);
"%s %s",
"Operation was unsuccessful, table is still intact,",
"but it is possible that a shadow frm file was left behind");
}
else
{
char *text2=
(char*)"but it is possible that a shadow frm file was left behind.";
char *text3=
(char*)"It is also possible that temporary partitions are left behind, ";
char *text4=
(char*)"these could be empty or more or less filled with records";
push_warning_printf(lpt->thd, MYSQL_ERROR::WARN_LEVEL_WARN, 1,
"%s \n %s \n %s \n %s", text1, text2, text3, text4);
"%s %s %s %s",
"Operation was unsuccessful, table is still intact,",
"but it is possible that a shadow frm file was left behind.",
"It is also possible that temporary partitions are left behind,",
"these could be empty or more or less filled with records");
}
}
else
......@@ -5760,48 +5722,39 @@ handle_alter_part_error(ALTER_PARTITION_PARAM_TYPE *lpt, bool not_completed,
Failed during install of shadow frm file, table isn't intact
and dropped partitions are still there
*/
char *text1=
(char*)"Failed during alter of partitions, table is no longer intact, ";
char *text2=
(char*)"The frm file is in an unknown state, and a backup";
char *text3=
(char*)" is required.";
push_warning_printf(lpt->thd, MYSQL_ERROR::WARN_LEVEL_WARN, 1,
"%s \n %s%s\n", text1, text2, text3);
"%s %s %s",
"Failed during alter of partitions, table is no longer intact.",
"The frm file is in an unknown state, and a backup",
"is required.");
}
else if (drop_partition)
{
/*
Table is ok, we have switched to new table but left dropped partitions
still in their places. We remove the log records and ask the user to
perform the action manually. We remove the log records and ask the user
to perform the action manually.
Table is ok, we have switched to new table but left dropped
partitions still in their places. We remove the log records and
ask the user to perform the action manually. We remove the log
records and ask the user to perform the action manually.
*/
char *text1=
(char*)"Failed during drop of partitions, table is intact, ";
char *text2=
(char*)"Manual drop of remaining partitions is required";
push_warning_printf(lpt->thd, MYSQL_ERROR::WARN_LEVEL_WARN, 1,
"%s\n%s", text1, text2);
"%s %s",
"Failed during drop of partitions, table is intact.",
"Manual drop of remaining partitions is required");
}
else
{
/*
We failed during renaming of partitions. The table is most certainly in
a very bad state so we give user warning and disable the table by
writing an ancient frm version into it.
We failed during renaming of partitions. The table is most
certainly in a very bad state so we give user warning and disable
the table by writing an ancient frm version into it.
*/
char *text1=
(char*)"Failed during renaming of partitions. We are now in a position";
char *text2=
(char*)" where table is not reusable";
char *text3=
(char*)"Table is disabled by writing ancient frm file version into it";
push_warning_printf(lpt->thd, MYSQL_ERROR::WARN_LEVEL_WARN, 1,
"%s%s\n%s", text1, text2, text3);
"%s %s %s",
"Failed during renaming of partitions. We are now in a position",
"where table is not reusable",
"Table is disabled by writing ancient frm file version into it");
}
}
}
else
{
......@@ -5824,8 +5777,9 @@ handle_alter_part_error(ALTER_PARTITION_PARAM_TYPE *lpt, bool not_completed,
even though we reported an error the operation was successfully
completed.
*/
push_warning(lpt->thd, MYSQL_ERROR::WARN_LEVEL_WARN, 1,
"Operation was successfully completed after failure of normal operation");
push_warning_printf(lpt->thd, MYSQL_ERROR::WARN_LEVEL_WARN, 1,"%s %s",
"Operation was successfully completed by failure handling,",
"after failure of normal operation");
}
}
DBUG_VOID_RETURN;
......@@ -5999,7 +5953,7 @@ uint fast_alter_partition_table(THD *thd, TABLE *table,
0) Write an entry that removes the shadow frm file if crash occurs
1) Write the new frm file as a shadow frm
2) Write the table log to ensure that the operation is completed
2) Write the ddl log to ensure that the operation is completed
even in the presence of a MySQL Server crash
3) Lock the table in TL_WRITE_ONLY to ensure all other accesses to
the table have completed
......@@ -6010,8 +5964,8 @@ uint fast_alter_partition_table(THD *thd, TABLE *table,
where crashes make strange things occur. In this placement it can
happen that the ALTER TABLE DROP PARTITION gets performed in the
master but not in the slaves if we have a crash, after writing the
table log but before writing the binlog. A solution to this would
require writing the statement first in the table log and then
ddl log but before writing the binlog. A solution to this would
require writing the statement first in the ddl log and then
when recovering from the crash read the binlog and insert it into
the binlog if not written already.
5) Install the previously written shadow frm file
......@@ -6019,7 +5973,7 @@ uint fast_alter_partition_table(THD *thd, TABLE *table,
reached the abort lock do that before downgrading the lock.
7) Prepare MyISAM handlers for drop of partitions
8) Drop the partitions
9) Remove entries from table log
9) Remove entries from ddl log
10) Wait until all accesses using the old frm file has completed
11) Complete query
......@@ -6033,7 +5987,7 @@ uint fast_alter_partition_table(THD *thd, TABLE *table,
write_log_drop_partition(lpt) ||
ERROR_INJECT_CRASH("crash_drop_partition_3") ||
((not_completed= FALSE), FALSE) ||
(abort_and_upgrade_lock(lpt, FALSE), FALSE) ||
(abort_and_upgrade_lock(lpt), FALSE) ||
((!thd->lex->no_write_to_binlog) &&
(write_bin_log(thd, FALSE,
thd->query, thd->query_length), FALSE)) ||
......@@ -6070,7 +6024,7 @@ uint fast_alter_partition_table(THD *thd, TABLE *table,
0) Write an entry that removes the shadow frm file if crash occurs
1) Write the new frm file as a shadow frm file
2) Log the changes to happen in table log
2) Log the changes to happen in ddl log
2) Add the new partitions
3) Lock all partitions in TL_WRITE_ONLY to ensure that no users
are still using the old partitioning scheme. Wait until all
......@@ -6082,7 +6036,7 @@ uint fast_alter_partition_table(THD *thd, TABLE *table,
6) Install the new frm file of the table where the partitions are
added to the table.
7) Wait until all accesses using the old frm file has completed
8) Remove entries from table log
8) Remove entries from ddl log
9) Complete query
*/
if (write_log_add_change_partition(lpt) ||
......@@ -6091,7 +6045,7 @@ uint fast_alter_partition_table(THD *thd, TABLE *table,
ERROR_INJECT_CRASH("crash_add_partition_2") ||
mysql_change_partitions(lpt) ||
ERROR_INJECT_CRASH("crash_add_partition_3") ||
(abort_and_upgrade_lock(lpt, FALSE), FALSE) ||
(abort_and_upgrade_lock(lpt), FALSE) ||
((!thd->lex->no_write_to_binlog) &&
(write_bin_log(thd, FALSE,
thd->query, thd->query_length), FALSE)) ||
......@@ -6174,7 +6128,7 @@ uint fast_alter_partition_table(THD *thd, TABLE *table,
write_log_final_change_partition(lpt) ||
ERROR_INJECT_CRASH("crash_change_partition_4") ||
((not_completed= FALSE), FALSE) ||
(abort_and_upgrade_lock(lpt, FALSE), FALSE) ||
(abort_and_upgrade_lock(lpt), FALSE) ||
((!thd->lex->no_write_to_binlog) &&
(write_bin_log(thd, FALSE,
thd->query, thd->query_length), FALSE)) ||
......
......@@ -244,7 +244,7 @@ static int mysql_copy_key_list(List<Key> *orig_key,
/*
--------------------------------------------------------------------------
MODULE: Table log
MODULE: DDL log
-----------------
This module is used to ensure that we can recover from crashes that occur
......@@ -253,12 +253,12 @@ static int mysql_copy_key_list(List<Key> *orig_key,
also that each table drop is entirely done and not "half-baked".
To support this we create log entries for each meta-data statement in the
table log while we are executing. These entries are dropped when the
ddl log while we are executing. These entries are dropped when the
operation is completed.
At recovery those entries that were not completed will be executed.
There is only one table log in the system and it is protected by a mutex
There is only one ddl log in the system and it is protected by a mutex
and there is a global struct that contains information about its current
state.
......@@ -268,82 +268,79 @@ static int mysql_copy_key_list(List<Key> *orig_key,
*/
typedef struct st_global_table_log
typedef struct st_global_ddl_log
{
char file_entry[IO_SIZE];
char file_entry_buf[4*IO_SIZE];
char file_name_str[FN_REFLEN];
char *file_name;
TABLE_LOG_MEMORY_ENTRY *first_free;
TABLE_LOG_MEMORY_ENTRY *first_used;
uint no_entries;
DDL_LOG_MEMORY_ENTRY *first_free;
DDL_LOG_MEMORY_ENTRY *first_used;
uint num_entries;
File file_id;
uint name_len;
uint handler_type_len;
uint handler_name_len;
uint io_size;
} GLOBAL_TABLE_LOG;
bool inited;
} GLOBAL_DDL_LOG;
GLOBAL_TABLE_LOG global_table_log;
GLOBAL_DDL_LOG global_ddl_log;
pthread_mutex_t LOCK_gtl;
pthread_mutex_t LOCK_gdl;
#define TLOG_ENTRY_TYPE_POS 0
#define TLOG_ACTION_TYPE_POS 1
#define TLOG_PHASE_POS 2
#define TLOG_NEXT_ENTRY_POS 4
#define TLOG_NAME_POS 8
#define DDL_LOG_ENTRY_TYPE_POS 0
#define DDL_LOG_ACTION_TYPE_POS 1
#define DDL_LOG_PHASE_POS 2
#define DDL_LOG_NEXT_ENTRY_POS 4
#define DDL_LOG_NAME_POS 8
#define TLOG_NO_ENTRY_POS 0
#define TLOG_NAME_LEN_POS 4
#define TLOG_HANDLER_TYPE_POS 8
#define TLOG_IO_SIZE_POS 12
#define DDL_LOG_NUM_ENTRY_POS 0
#define DDL_LOG_NAME_LEN_POS 4
#define DDL_LOG_HANDLER_TYPE_POS 8
#define DDL_LOG_IO_SIZE_POS 12
/*
Read one entry from table log file
Read one entry from ddl log file
SYNOPSIS
read_table_log_file_entry()
read_ddl_log_file_entry()
entry_no Entry number to read
RETURN VALUES
TRUE Error
FALSE Success
*/
static
bool
read_table_log_file_entry(uint entry_no)
static bool read_ddl_log_file_entry(uint entry_no)
{
bool error= FALSE;
File file_id= global_table_log.file_id;
char *file_entry= (char*)global_table_log.file_entry;
uint io_size= global_table_log.io_size;
DBUG_ENTER("read_table_log_file_entry");
File file_id= global_ddl_log.file_id;
char *file_entry_buf= (char*)global_ddl_log.file_entry_buf;
uint io_size= global_ddl_log.io_size;
DBUG_ENTER("read_ddl_log_file_entry");
if (my_pread(file_id, file_entry, io_size, io_size * entry_no,
MYF(MY_WME)) != IO_SIZE)
if (my_pread(file_id, file_entry_buf, io_size, io_size * entry_no,
MYF(MY_WME)) != io_size)
error= TRUE;
DBUG_RETURN(error);
}
/*
Write one entry from table log file
Write one entry from ddl log file
SYNOPSIS
write_table_log_file_entry()
write_ddl_log_file_entry()
entry_no Entry number to read
RETURN VALUES
TRUE Error
FALSE Success
*/
static
bool
write_table_log_file_entry(uint entry_no)
static bool write_ddl_log_file_entry(uint entry_no)
{
bool error= FALSE;
File file_id= global_table_log.file_id;
char *file_entry= (char*)global_table_log.file_entry;
DBUG_ENTER("write_table_log_file_entry");
File file_id= global_ddl_log.file_id;
char *file_entry_buf= (char*)global_ddl_log.file_entry_buf;
DBUG_ENTER("write_ddl_log_file_entry");
if (my_pwrite(file_id, file_entry,
if (my_pwrite(file_id, file_entry_buf,
IO_SIZE, IO_SIZE * entry_no, MYF(MY_WME)) != IO_SIZE)
error= TRUE;
DBUG_RETURN(error);
......@@ -351,200 +348,197 @@ write_table_log_file_entry(uint entry_no)
/*
Write table log header
Write ddl log header
SYNOPSIS
write_table_log_header()
write_ddl_log_header()
RETURN VALUES
TRUE Error
FALSE Success
*/
static
bool
write_table_log_header()
static bool write_ddl_log_header()
{
uint16 const_var;
bool error= FALSE;
DBUG_ENTER("write_table_log_header");
DBUG_ENTER("write_ddl_log_header");
int4store(&global_table_log.file_entry[TLOG_NO_ENTRY_POS],
global_table_log.no_entries);
int4store(&global_ddl_log.file_entry_buf[DDL_LOG_NUM_ENTRY_POS],
global_ddl_log.num_entries);
const_var= FN_LEN;
int4store(&global_table_log.file_entry[TLOG_NAME_LEN_POS],
int4store(&global_ddl_log.file_entry_buf[DDL_LOG_NAME_LEN_POS],
const_var);
const_var= TLOG_HANDLER_TYPE_LEN;
int4store(&global_table_log.file_entry[TLOG_HANDLER_TYPE_POS],
const_var= DDL_LOG_HANDLER_TYPE_LEN;
int4store(&global_ddl_log.file_entry_buf[DDL_LOG_HANDLER_TYPE_POS],
const_var);
const_var= IO_SIZE;
int4store(&global_table_log.file_entry[TLOG_IO_SIZE_POS],
int4store(&global_ddl_log.file_entry_buf[DDL_LOG_IO_SIZE_POS],
const_var);
if (write_table_log_file_entry(0UL))
if (write_ddl_log_file_entry(0UL))
error= TRUE;
if (!error)
VOID(sync_table_log());
VOID(sync_ddl_log());
DBUG_RETURN(error);
}
/*
Create table log file name
Create ddl log file name
SYNOPSIS
create_table_log_file_name()
create_ddl_log_file_name()
file_name Filename setup
RETURN VALUES
NONE
*/
static
void
create_table_log_file_name(char *file_name)
static inline void create_ddl_log_file_name(char *file_name)
{
strxmov(file_name, mysql_data_home, "/", "table_log.log", NullS);
strxmov(file_name, mysql_data_home, "/", "ddl_log.log", NullS);
}
/*
Read header of table log file
Read header of ddl log file
SYNOPSIS
read_table_log_header()
read_ddl_log_header()
RETURN VALUES
> 0 Last entry in table log
0 No entries in table log
> 0 Last entry in ddl log
0 No entries in ddl log
DESCRIPTION
When we read the table log header we get information about maximum sizes
of names in the table log and we also get information about the number
of entries in the table log.
When we read the ddl log header we get information about maximum sizes
of names in the ddl log and we also get information about the number
of entries in the ddl log.
*/
static
uint
read_table_log_header()
static uint read_ddl_log_header()
{
char *file_entry= (char*)global_table_log.file_entry;
char *file_entry_buf= (char*)global_ddl_log.file_entry_buf;
char file_name[FN_REFLEN];
uint entry_no;
bool successful_open= FALSE;
DBUG_ENTER("read_table_log_header");
DBUG_ENTER("read_ddl_log_header");
bzero(file_entry, sizeof(global_table_log.file_entry));
create_table_log_file_name(file_name);
if (!(my_open(file_name, O_RDWR | O_TRUNC | O_BINARY, MYF(MY_WME))))
bzero(file_entry_buf, sizeof(global_ddl_log.file_entry_buf));
global_ddl_log.inited= FALSE;
create_ddl_log_file_name(file_name);
if (!(my_open(file_name, O_RDONLY | O_BINARY, MYF(MY_WME))))
{
if (read_table_log_file_entry(0UL))
if (read_ddl_log_file_entry(0UL))
{
; /* Write message into error log */
}
else
successful_open= TRUE;
}
entry_no= uint4korr(&file_entry[TLOG_NO_ENTRY_POS]);
global_table_log.name_len= uint4korr(&file_entry[TLOG_NAME_LEN_POS]);
global_table_log.handler_type_len=
uint4korr(&file_entry[TLOG_HANDLER_TYPE_POS]);
entry_no= uint4korr(&file_entry_buf[DDL_LOG_NUM_ENTRY_POS]);
global_ddl_log.name_len= uint4korr(&file_entry_buf[DDL_LOG_NAME_LEN_POS]);
global_ddl_log.handler_name_len=
uint4korr(&file_entry_buf[DDL_LOG_HANDLER_TYPE_POS]);
if (successful_open)
global_table_log.io_size= uint4korr(&file_entry[TLOG_IO_SIZE_POS]);
global_ddl_log.io_size= uint4korr(&file_entry_buf[DDL_LOG_IO_SIZE_POS]);
else
global_table_log.io_size= IO_SIZE;
global_table_log.first_free= NULL;
global_table_log.first_used= NULL;
global_table_log.no_entries= 0;
VOID(pthread_mutex_init(&LOCK_gtl, MY_MUTEX_INIT_FAST));
global_ddl_log.io_size= IO_SIZE;
global_ddl_log.first_free= NULL;
global_ddl_log.first_used= NULL;
global_ddl_log.num_entries= 0;
VOID(pthread_mutex_init(&LOCK_gdl, MY_MUTEX_INIT_FAST));
DBUG_RETURN(entry_no);
}
/*
Read a table log entry
Read a ddl log entry
SYNOPSIS
read_table_log_entry()
read_ddl_log_entry()
read_entry Number of entry to read
out:entry_info Information from entry
RETURN VALUES
TRUE Error
FALSE Success
DESCRIPTION
Read a specified entry in the table log
Read a specified entry in the ddl log
*/
bool
read_table_log_entry(uint read_entry, TABLE_LOG_ENTRY *table_log_entry)
bool read_ddl_log_entry(uint read_entry, DDL_LOG_ENTRY *ddl_log_entry)
{
char *file_entry= (char*)&global_table_log.file_entry;
char *file_entry_buf= (char*)&global_ddl_log.file_entry_buf;
uint inx;
DBUG_ENTER("read_table_log_entry");
DBUG_ENTER("read_ddl_log_entry");
if (read_table_log_file_entry(read_entry))
if (read_ddl_log_file_entry(read_entry))
{
/* Error handling */
DBUG_RETURN(TRUE);
}
table_log_entry->entry_pos= read_entry;
table_log_entry->entry_type= file_entry[TLOG_ENTRY_TYPE_POS];
table_log_entry->action_type= file_entry[TLOG_ACTION_TYPE_POS];
table_log_entry->phase= file_entry[TLOG_PHASE_POS];
table_log_entry->next_entry= uint4korr(&file_entry[TLOG_NEXT_ENTRY_POS]);
table_log_entry->name= &file_entry[TLOG_NAME_POS];
inx= TLOG_NAME_POS + global_table_log.name_len;
table_log_entry->from_name= &file_entry[inx];
inx+= global_table_log.name_len;
table_log_entry->handler_type= &file_entry[inx];
ddl_log_entry->entry_pos= read_entry;
ddl_log_entry->entry_type=
(enum ddl_log_entry_code)file_entry_buf[DDL_LOG_ENTRY_TYPE_POS];
ddl_log_entry->action_type=
(enum ddl_log_action_code)file_entry_buf[DDL_LOG_ACTION_TYPE_POS];
ddl_log_entry->phase= file_entry_buf[DDL_LOG_PHASE_POS];
ddl_log_entry->next_entry= uint4korr(&file_entry_buf[DDL_LOG_NEXT_ENTRY_POS]);
ddl_log_entry->name= &file_entry_buf[DDL_LOG_NAME_POS];
inx= DDL_LOG_NAME_POS + global_ddl_log.name_len;
ddl_log_entry->from_name= &file_entry_buf[inx];
inx+= global_ddl_log.name_len;
ddl_log_entry->handler_name= &file_entry_buf[inx];
DBUG_RETURN(FALSE);
}
/*
Initialise table log
Initialise ddl log
SYNOPSIS
init_table_log()
init_ddl_log()
RETURN VALUES
TRUE Error
FALSE Success
DESCRIPTION
Write the header of the table log file and length of names. Also set
Write the header of the ddl log file and length of names. Also set
number of entries to zero.
*/
static
bool
init_table_log()
static bool init_ddl_log()
{
bool error= FALSE;
char file_name[FN_REFLEN];
DBUG_ENTER("init_table_log");
DBUG_ENTER("init_ddl_log");
global_table_log.io_size= IO_SIZE;
create_table_log_file_name(file_name);
if (global_ddl_log.inited)
{
DBUG_RETURN(FALSE);
}
global_ddl_log.io_size= IO_SIZE;
create_ddl_log_file_name(file_name);
VOID(my_delete(file_name, MYF(0)));
if ((global_table_log.file_id= my_create(file_name,
CREATE_MODE,
O_RDWR | O_TRUNC | O_BINARY,
MYF(MY_WME))) < 0)
if ((global_ddl_log.file_id= my_create(file_name,
CREATE_MODE,
O_RDWR | O_TRUNC | O_BINARY,
MYF(MY_WME))) < 0)
{
/* Couldn't create table log file, this is serious error */
/* Couldn't create ddl log file, this is serious error */
abort();
}
if (write_table_log_header())
if (write_ddl_log_header())
{
/* Write to error log */
error= TRUE;
}
global_ddl_log.inited= TRUE;
DBUG_RETURN(error);
}
/*
Execute one action in a table log entry
Execute one action in a ddl log entry
SYNOPSIS
execute_table_log_action()
table_log_entry Information in action entry to execute
execute_ddl_log_action()
ddl_log_entry Information in action entry to execute
RETURN VALUES
TRUE Error
FALSE Success
*/
static
bool
execute_table_log_action(TABLE_LOG_ENTRY *table_log_entry)
static bool execute_ddl_log_action(DDL_LOG_ENTRY *ddl_log_entry)
{
bool frm_action= FALSE;
LEX_STRING handler_name;
......@@ -555,22 +549,22 @@ execute_table_log_action(TABLE_LOG_ENTRY *table_log_entry)
char from_path[FN_REFLEN];
char *par_ext= (char*)".par";
handlerton *hton;
DBUG_ENTER("execute_table_log_action");
DBUG_ENTER("execute_ddl_log_action");
if (table_log_entry->entry_type == TLOG_IGNORE_LOG_ENTRY_CODE)
if (ddl_log_entry->entry_type == DDL_IGNORE_LOG_ENTRY_CODE)
{
DBUG_RETURN(FALSE);
}
handler_name.str= (char*)table_log_entry->handler_type;
handler_name.length= strlen(table_log_entry->handler_type);
handler_name.str= (char*)ddl_log_entry->handler_name;
handler_name.length= strlen(ddl_log_entry->handler_name);
hton= ha_resolve_by_name(current_thd, &handler_name);
if (!hton)
{
my_error(ER_ILLEGAL_HA, MYF(0), table_log_entry->handler_type);
my_error(ER_ILLEGAL_HA, MYF(0), ddl_log_entry->handler_name);
DBUG_RETURN(TRUE);
}
init_sql_alloc(&mem_root, TABLE_ALLOC_BLOCK_SIZE, 0);
if (strcmp("frm", table_log_entry->handler_type))
if (strcmp("frm", ddl_log_entry->handler_name))
frm_action= TRUE;
else
{
......@@ -583,65 +577,65 @@ execute_table_log_action(TABLE_LOG_ENTRY *table_log_entry)
goto error;
}
}
switch (table_log_entry->action_type)
switch (ddl_log_entry->action_type)
{
case TLOG_DELETE_ACTION_CODE:
case TLOG_REPLACE_ACTION_CODE:
case DDL_LOG_DELETE_ACTION:
case DDL_LOG_REPLACE_ACTION:
{
if (table_log_entry->action_type == TLOG_DELETE_ACTION_CODE ||
(table_log_entry->action_type == TLOG_REPLACE_ACTION_CODE &&
table_log_entry->phase == 0UL))
if (ddl_log_entry->action_type == DDL_LOG_DELETE_ACTION ||
(ddl_log_entry->action_type == DDL_LOG_REPLACE_ACTION &&
ddl_log_entry->phase == 0UL))
{
if (frm_action)
{
strxmov(path, table_log_entry->name, reg_ext, NullS);
strxmov(path, ddl_log_entry->name, reg_ext, NullS);
if (my_delete(path, MYF(MY_WME)))
break;
strxmov(path, table_log_entry->name, par_ext, NullS);
strxmov(path, ddl_log_entry->name, par_ext, NullS);
if (my_delete(path, MYF(MY_WME)))
break;
}
else
{
if (file->delete_table(table_log_entry->name))
if (file->delete_table(ddl_log_entry->name))
break;
}
if ((!inactivate_table_log_entry(table_log_entry->entry_pos)))
if ((!deactivate_ddl_log_entry(ddl_log_entry->entry_pos)))
;
else
{
VOID(sync_table_log());
VOID(sync_ddl_log());
error= FALSE;
}
break;
}
if (table_log_entry->action_type == TLOG_DELETE_ACTION_CODE)
if (ddl_log_entry->action_type == DDL_LOG_DELETE_ACTION)
break;
}
case TLOG_RENAME_ACTION_CODE:
case DDL_LOG_RENAME_ACTION:
{
error= TRUE;
if (frm_action)
{
strxmov(path, table_log_entry->name, reg_ext, NullS);
strxmov(from_path, table_log_entry->from_name, reg_ext, NullS);
strxmov(path, ddl_log_entry->name, reg_ext, NullS);
strxmov(from_path, ddl_log_entry->from_name, reg_ext, NullS);
if (my_rename(path, from_path, MYF(MY_WME)))
break;
strxmov(path, table_log_entry->name, par_ext, NullS);
strxmov(from_path, table_log_entry->from_name, par_ext, NullS);
strxmov(path, ddl_log_entry->name, par_ext, NullS);
strxmov(from_path, ddl_log_entry->from_name, par_ext, NullS);
if (my_rename(path, from_path, MYF(MY_WME)))
break;
}
else
{
if (file->rename_table(table_log_entry->name,
table_log_entry->from_name))
if (file->rename_table(ddl_log_entry->name,
ddl_log_entry->from_name))
break;
if ((!inactivate_table_log_entry(table_log_entry->entry_pos)))
if ((!deactivate_ddl_log_entry(ddl_log_entry->entry_pos)))
;
else
{
VOID(sync_table_log());
VOID(sync_ddl_log());
error= FALSE;
}
}
......@@ -659,40 +653,38 @@ error:
/*
Get a free entry in the table log
Get a free entry in the ddl log
SYNOPSIS
get_free_table_log_entry()
out:active_entry A table log memory entry returned
get_free_ddl_log_entry()
out:active_entry A ddl log memory entry returned
RETURN VALUES
TRUE Error
FALSE Success
*/
static
bool
get_free_table_log_entry(TABLE_LOG_MEMORY_ENTRY **active_entry,
bool *write_header)
static bool get_free_ddl_log_entry(DDL_LOG_MEMORY_ENTRY **active_entry,
bool *write_header)
{
uint entry_no;
TABLE_LOG_MEMORY_ENTRY *used_entry;
TABLE_LOG_MEMORY_ENTRY *first_used= global_table_log.first_used;
DBUG_ENTER("get_free_table_log_entry");
DDL_LOG_MEMORY_ENTRY *used_entry;
DDL_LOG_MEMORY_ENTRY *first_used= global_ddl_log.first_used;
DBUG_ENTER("get_free_ddl_log_entry");
if (global_table_log.first_free == NULL)
if (global_ddl_log.first_free == NULL)
{
if (!(used_entry= (TABLE_LOG_MEMORY_ENTRY*)my_malloc(
sizeof(TABLE_LOG_MEMORY_ENTRY), MYF(MY_WME))))
if (!(used_entry= (DDL_LOG_MEMORY_ENTRY*)my_malloc(
sizeof(DDL_LOG_MEMORY_ENTRY), MYF(MY_WME))))
{
DBUG_RETURN(TRUE);
}
global_table_log.no_entries++;
used_entry->entry_pos= entry_no= global_table_log.no_entries;
global_ddl_log.num_entries++;
used_entry->entry_pos= entry_no= global_ddl_log.num_entries;
*write_header= TRUE;
}
else
{
used_entry= global_table_log.first_free;
global_table_log.first_free= used_entry->next_log_entry;
used_entry= global_ddl_log.first_free;
global_ddl_log.first_free= used_entry->next_log_entry;
entry_no= used_entry->entry_pos;
*write_header= FALSE;
}
......@@ -701,7 +693,7 @@ get_free_table_log_entry(TABLE_LOG_MEMORY_ENTRY **active_entry,
*/
used_entry->next_log_entry= first_used;
used_entry->prev_log_entry= NULL;
global_table_log.first_used= used_entry;
global_ddl_log.first_used= used_entry;
if (first_used)
first_used->prev_log_entry= used_entry;
......@@ -711,76 +703,79 @@ get_free_table_log_entry(TABLE_LOG_MEMORY_ENTRY **active_entry,
/*
External interface methods for the Table log Module
External interface methods for the DDL log Module
---------------------------------------------------
*/
/*
SYNOPSIS
write_table_log_entry()
table_log_entry Information about log entry
out:entry_written Entry information written into
write_ddl_log_entry()
ddl_log_entry Information about log entry
out:entry_written Entry information written into
RETURN VALUES
TRUE Error
FALSE Success
DESCRIPTION
A careful write of the table log is performed to ensure that we can
A careful write of the ddl log is performed to ensure that we can
handle crashes occurring during CREATE and ALTER TABLE processing.
*/
bool
write_table_log_entry(TABLE_LOG_ENTRY *table_log_entry,
TABLE_LOG_MEMORY_ENTRY **active_entry)
bool write_ddl_log_entry(DDL_LOG_ENTRY *ddl_log_entry,
DDL_LOG_MEMORY_ENTRY **active_entry)
{
bool error, write_header;
DBUG_ENTER("write_table_log_entry");
global_table_log.file_entry[TLOG_ENTRY_TYPE_POS]= TLOG_LOG_ENTRY_CODE;
global_table_log.file_entry[TLOG_ACTION_TYPE_POS]=
table_log_entry->action_type;
global_table_log.file_entry[TLOG_PHASE_POS]= 0;
int4store(&global_table_log.file_entry[TLOG_NEXT_ENTRY_POS],
table_log_entry->next_entry);
DBUG_ASSERT(strlen(table_log_entry->name) < FN_LEN);
strncpy(&global_table_log.file_entry[TLOG_NAME_POS],
table_log_entry->name, FN_LEN);
if (table_log_entry->action_type == TLOG_RENAME_ACTION_CODE ||
table_log_entry->action_type == TLOG_REPLACE_ACTION_CODE)
{
DBUG_ASSERT(strlen(table_log_entry->from_name) < FN_LEN);
strncpy(&global_table_log.file_entry[TLOG_NAME_POS + FN_LEN],
table_log_entry->from_name, FN_LEN);
DBUG_ENTER("write_ddl_log_entry");
if (init_ddl_log())
{
DBUG_RETURN(TRUE);
}
global_ddl_log.file_entry_buf[DDL_LOG_ENTRY_TYPE_POS]= DDL_LOG_ENTRY_CODE;
global_ddl_log.file_entry_buf[DDL_LOG_ACTION_TYPE_POS]=
ddl_log_entry->action_type;
global_ddl_log.file_entry_buf[DDL_LOG_PHASE_POS]= 0;
int4store(&global_ddl_log.file_entry_buf[DDL_LOG_NEXT_ENTRY_POS],
ddl_log_entry->next_entry);
DBUG_ASSERT(strlen(ddl_log_entry->name) < FN_LEN);
strncpy(&global_ddl_log.file_entry_buf[DDL_LOG_NAME_POS],
ddl_log_entry->name, FN_LEN);
if (ddl_log_entry->action_type == DDL_LOG_RENAME_ACTION ||
ddl_log_entry->action_type == DDL_LOG_REPLACE_ACTION)
{
DBUG_ASSERT(strlen(ddl_log_entry->from_name) < FN_LEN);
strncpy(&global_ddl_log.file_entry_buf[DDL_LOG_NAME_POS + FN_LEN],
ddl_log_entry->from_name, FN_LEN);
}
else
global_table_log.file_entry[TLOG_NAME_POS + FN_LEN]= 0;
DBUG_ASSERT(strlen(table_log_entry->handler_type) < FN_LEN);
strncpy(&global_table_log.file_entry[TLOG_NAME_POS + (2*FN_LEN)],
table_log_entry->handler_type, FN_LEN);
if (get_free_table_log_entry(active_entry, &write_header))
global_ddl_log.file_entry_buf[DDL_LOG_NAME_POS + FN_LEN]= 0;
DBUG_ASSERT(strlen(ddl_log_entry->handler_name) < FN_LEN);
strncpy(&global_ddl_log.file_entry_buf[DDL_LOG_NAME_POS + (2*FN_LEN)],
ddl_log_entry->handler_name, FN_LEN);
if (get_free_ddl_log_entry(active_entry, &write_header))
{
DBUG_RETURN(TRUE);
}
error= FALSE;
if (write_table_log_file_entry((*active_entry)->entry_pos))
if (write_ddl_log_file_entry((*active_entry)->entry_pos))
error= TRUE;
if (write_header && !error)
{
VOID(sync_table_log());
if (write_table_log_header())
VOID(sync_ddl_log());
if (write_ddl_log_header())
error= TRUE;
}
if (error)
release_table_log_memory_entry(*active_entry);
release_ddl_log_memory_entry(*active_entry);
DBUG_RETURN(error);
}
/*
Write final entry in the table log
Write final entry in the ddl log
SYNOPSIS
write_execute_table_log_entry()
write_execute_ddl_log_entry()
first_entry First entry in linked list of entries
to execute, if 0 = NULL it means that
the entry is removed and the entries
......@@ -794,50 +789,53 @@ write_table_log_entry(TABLE_LOG_ENTRY *table_log_entry,
FALSE Success
DESCRIPTION
This is the last write in the table log. The previous log entries have
This is the last write in the ddl log. The previous log entries have
already been written but not yet synched to disk.
*/
bool
write_execute_table_log_entry(uint first_entry,
bool complete,
TABLE_LOG_MEMORY_ENTRY **active_entry)
bool write_execute_ddl_log_entry(uint first_entry,
bool complete,
DDL_LOG_MEMORY_ENTRY **active_entry)
{
bool write_header= FALSE;
char *file_entry= (char*)global_table_log.file_entry;
DBUG_ENTER("write_execute_table_log_entry");
char *file_entry_buf= (char*)global_ddl_log.file_entry_buf;
DBUG_ENTER("write_execute_ddl_log_entry");
if (init_ddl_log())
{
DBUG_RETURN(TRUE);
}
if (!complete)
{
VOID(sync_table_log());
file_entry[TLOG_ENTRY_TYPE_POS]= TLOG_EXECUTE_CODE;
VOID(sync_ddl_log());
file_entry_buf[DDL_LOG_ENTRY_TYPE_POS]= DDL_LOG_EXECUTE_CODE;
}
else
file_entry[TLOG_ENTRY_TYPE_POS]= TLOG_IGNORE_LOG_ENTRY_CODE;
file_entry[TLOG_ACTION_TYPE_POS]= 0; /* Ignored for execute entries */
file_entry[TLOG_PHASE_POS]= 0;
int4store(&file_entry[TLOG_NEXT_ENTRY_POS], first_entry);
file_entry[TLOG_NAME_POS]= 0;
file_entry[TLOG_NAME_POS + FN_LEN]= 0;
file_entry[TLOG_NAME_POS + 2*FN_LEN]= 0;
file_entry_buf[DDL_LOG_ENTRY_TYPE_POS]= DDL_IGNORE_LOG_ENTRY_CODE;
file_entry_buf[DDL_LOG_ACTION_TYPE_POS]= 0; /* Ignored for execute entries */
file_entry_buf[DDL_LOG_PHASE_POS]= 0;
int4store(&file_entry_buf[DDL_LOG_NEXT_ENTRY_POS], first_entry);
file_entry_buf[DDL_LOG_NAME_POS]= 0;
file_entry_buf[DDL_LOG_NAME_POS + FN_LEN]= 0;
file_entry_buf[DDL_LOG_NAME_POS + 2*FN_LEN]= 0;
if (!(*active_entry))
{
if (get_free_table_log_entry(active_entry, &write_header))
if (get_free_ddl_log_entry(active_entry, &write_header))
{
DBUG_RETURN(TRUE);
}
}
if (write_table_log_file_entry((*active_entry)->entry_pos))
if (write_ddl_log_file_entry((*active_entry)->entry_pos))
{
release_table_log_memory_entry(*active_entry);
release_ddl_log_memory_entry(*active_entry);
DBUG_RETURN(TRUE);
}
VOID(sync_table_log());
VOID(sync_ddl_log());
if (write_header)
{
if (write_table_log_header())
if (write_ddl_log_header())
{
release_table_log_memory_entry(*active_entry);
release_ddl_log_memory_entry(*active_entry);
DBUG_RETURN(TRUE);
}
}
......@@ -846,9 +844,9 @@ write_execute_table_log_entry(uint first_entry,
/*
For complex rename operations we need to inactivate individual entries.
For complex rename operations we need to deactivate individual entries.
SYNOPSIS
inactivate_table_log_entry()
deactivate_ddl_log_entry()
entry_no Entry position of record to change
RETURN VALUES
TRUE Error
......@@ -857,7 +855,7 @@ write_execute_table_log_entry(uint first_entry,
During replace operations where we start with an existing table called
t1 and a replacement table called t1#temp or something else and where
we want to delete t1 and rename t1#temp to t1 this is not possible to
do in a safe manner unless the table log is informed of the phases in
do in a safe manner unless the ddl log is informed of the phases in
the change.
Delete actions are 1-phase actions that can be ignored immediately after
......@@ -869,32 +867,31 @@ write_execute_table_log_entry(uint first_entry,
rename x -> y.
*/
bool
inactivate_table_log_entry(uint entry_no)
bool deactivate_ddl_log_entry(uint entry_no)
{
bool error= TRUE;
char *file_entry= (char*)global_table_log.file_entry;
DBUG_ENTER("inactivate_table_log_entry");
char *file_entry_buf= (char*)global_ddl_log.file_entry_buf;
DBUG_ENTER("deactivate_ddl_log_entry");
if (!read_table_log_file_entry(entry_no))
if (!read_ddl_log_file_entry(entry_no))
{
if (file_entry[TLOG_ENTRY_TYPE_POS] == TLOG_LOG_ENTRY_CODE)
if (file_entry_buf[DDL_LOG_ENTRY_TYPE_POS] == DDL_LOG_ENTRY_CODE)
{
if (file_entry[TLOG_ACTION_TYPE_POS] == TLOG_DELETE_ACTION_CODE ||
file_entry[TLOG_ACTION_TYPE_POS] == TLOG_RENAME_ACTION_CODE ||
(file_entry[TLOG_ACTION_TYPE_POS] == TLOG_REPLACE_ACTION_CODE &&
file_entry[TLOG_PHASE_POS] == 1))
file_entry[TLOG_ENTRY_TYPE_POS]= TLOG_IGNORE_LOG_ENTRY_CODE;
else if (file_entry[TLOG_ACTION_TYPE_POS] == TLOG_REPLACE_ACTION_CODE)
if (file_entry_buf[DDL_LOG_ACTION_TYPE_POS] == DDL_LOG_DELETE_ACTION ||
file_entry_buf[DDL_LOG_ACTION_TYPE_POS] == DDL_LOG_RENAME_ACTION ||
(file_entry_buf[DDL_LOG_ACTION_TYPE_POS] == DDL_LOG_REPLACE_ACTION &&
file_entry_buf[DDL_LOG_PHASE_POS] == 1))
file_entry_buf[DDL_LOG_ENTRY_TYPE_POS]= DDL_IGNORE_LOG_ENTRY_CODE;
else if (file_entry_buf[DDL_LOG_ACTION_TYPE_POS] == DDL_LOG_REPLACE_ACTION)
{
DBUG_ASSERT(file_entry[TLOG_PHASE_POS] == 0);
file_entry[TLOG_PHASE_POS]= 1;
DBUG_ASSERT(file_entry_buf[DDL_LOG_PHASE_POS] == 0);
file_entry_buf[DDL_LOG_PHASE_POS]= 1;
}
else
{
DBUG_ASSERT(0);
}
if (!write_table_log_file_entry(entry_no))
if (!write_ddl_log_file_entry(entry_no))
error= FALSE;
}
}
......@@ -903,21 +900,24 @@ inactivate_table_log_entry(uint entry_no)
/*
Sync table log file
Sync ddl log file
SYNOPSIS
sync_table_log()
sync_ddl_log()
RETURN VALUES
TRUE Error
FALSE Success
*/
bool
sync_table_log()
bool sync_ddl_log()
{
bool error= FALSE;
DBUG_ENTER("sync_table_log");
DBUG_ENTER("sync_ddl_log");
if (my_sync(global_table_log.file_id, MYF(0)))
if (init_ddl_log())
{
DBUG_RETURN(TRUE);
}
if (my_sync(global_ddl_log.file_id, MYF(0)))
{
/* Write to error log */
error= TRUE;
......@@ -929,27 +929,26 @@ sync_table_log()
/*
Release a log memory entry
SYNOPSIS
release_table_log_memory_entry()
release_ddl_log_memory_entry()
log_memory_entry Log memory entry to release
RETURN VALUES
NONE
*/
void
release_table_log_memory_entry(TABLE_LOG_MEMORY_ENTRY *log_entry)
void release_ddl_log_memory_entry(DDL_LOG_MEMORY_ENTRY *log_entry)
{
TABLE_LOG_MEMORY_ENTRY *first_free= global_table_log.first_free;
TABLE_LOG_MEMORY_ENTRY *next_log_entry= log_entry->next_log_entry;
TABLE_LOG_MEMORY_ENTRY *prev_log_entry= log_entry->prev_log_entry;
DBUG_ENTER("release_table_log_memory_entry");
DDL_LOG_MEMORY_ENTRY *first_free= global_ddl_log.first_free;
DDL_LOG_MEMORY_ENTRY *next_log_entry= log_entry->next_log_entry;
DDL_LOG_MEMORY_ENTRY *prev_log_entry= log_entry->prev_log_entry;
DBUG_ENTER("release_ddl_log_memory_entry");
global_table_log.first_free= log_entry;
global_ddl_log.first_free= log_entry;
log_entry->next_log_entry= first_free;
if (prev_log_entry)
prev_log_entry->next_log_entry= next_log_entry;
else
global_table_log.first_used= next_log_entry;
global_ddl_log.first_used= next_log_entry;
if (next_log_entry)
next_log_entry->prev_log_entry= prev_log_entry;
DBUG_VOID_RETURN;
......@@ -957,75 +956,73 @@ release_table_log_memory_entry(TABLE_LOG_MEMORY_ENTRY *log_entry)
/*
Execute one entry in the table log. Executing an entry means executing
Execute one entry in the ddl log. Executing an entry means executing
a linked list of actions.
SYNOPSIS
execute_table_log_entry()
execute_ddl_log_entry()
first_entry Reference to first action in entry
RETURN VALUES
TRUE Error
FALSE Success
*/
bool
execute_table_log_entry(uint first_entry)
bool execute_ddl_log_entry(uint first_entry)
{
TABLE_LOG_ENTRY table_log_entry;
DDL_LOG_ENTRY ddl_log_entry;
uint read_entry= first_entry;
DBUG_ENTER("execute_table_log_entry");
DBUG_ENTER("execute_ddl_log_entry");
lock_global_table_log();
lock_global_ddl_log();
do
{
if (read_table_log_entry(read_entry, &table_log_entry))
if (read_ddl_log_entry(read_entry, &ddl_log_entry))
{
DBUG_ASSERT(0);
/* Write to error log and continue with next log entry */
break;
}
DBUG_ASSERT(table_log_entry.entry_type == TLOG_LOG_ENTRY_CODE ||
table_log_entry.entry_type == TLOG_IGNORE_LOG_ENTRY_CODE);
DBUG_ASSERT(ddl_log_entry.entry_type == DDL_LOG_ENTRY_CODE ||
ddl_log_entry.entry_type == DDL_IGNORE_LOG_ENTRY_CODE);
if (execute_table_log_action(&table_log_entry))
if (execute_ddl_log_action(&ddl_log_entry))
{
DBUG_ASSERT(0);
/* Write to error log and continue with next log entry */
break;
}
read_entry= table_log_entry.next_entry;
read_entry= ddl_log_entry.next_entry;
} while (read_entry);
unlock_global_table_log();
unlock_global_ddl_log();
DBUG_RETURN(FALSE);
}
/*
Execute the table log at recovery of MySQL Server
Execute the ddl log at recovery of MySQL Server
SYNOPSIS
execute_table_log_recovery()
execute_ddl_log_recovery()
RETURN VALUES
NONE
*/
void
execute_table_log_recovery()
void execute_ddl_log_recovery()
{
uint no_entries, i;
TABLE_LOG_ENTRY table_log_entry;
DBUG_ENTER("execute_table_log_recovery");
uint num_entries, i;
DDL_LOG_ENTRY ddl_log_entry;
DBUG_ENTER("execute_ddl_log_recovery");
no_entries= read_table_log_header();
for (i= 0; i < no_entries; i++)
num_entries= read_ddl_log_header();
for (i= 0; i < num_entries; i++)
{
if (read_table_log_entry(i, &table_log_entry))
if (read_ddl_log_entry(i, &ddl_log_entry))
{
DBUG_ASSERT(0);
/* Write to error log */
break;
}
if (table_log_entry.entry_type == TLOG_EXECUTE_CODE)
if (ddl_log_entry.entry_type == DDL_LOG_EXECUTE_CODE)
{
if (execute_table_log_entry(table_log_entry.next_entry))
if (execute_ddl_log_entry(ddl_log_entry.next_entry))
{
/*
Currently errors are either crashing or ignored so we should
......@@ -1036,78 +1033,75 @@ execute_table_log_recovery()
}
}
}
VOID(init_table_log());
VOID(init_ddl_log());
DBUG_VOID_RETURN;
}
/*
Release all memory allocated to the table log
Release all memory allocated to the ddl log
SYNOPSIS
release_table_log()
release_ddl_log()
RETURN VALUES
NONE
*/
void
release_table_log()
void release_ddl_log()
{
TABLE_LOG_MEMORY_ENTRY *free_list= global_table_log.first_free;
TABLE_LOG_MEMORY_ENTRY *used_list= global_table_log.first_used;
DBUG_ENTER("release_table_log");
DDL_LOG_MEMORY_ENTRY *free_list= global_ddl_log.first_free;
DDL_LOG_MEMORY_ENTRY *used_list= global_ddl_log.first_used;
DBUG_ENTER("release_ddl_log");
lock_global_table_log();
lock_global_ddl_log();
while (used_list)
{
TABLE_LOG_MEMORY_ENTRY *tmp= used_list->next_log_entry;
DDL_LOG_MEMORY_ENTRY *tmp= used_list->next_log_entry;
my_free((char*)used_list, MYF(0));
used_list= tmp;
}
while (free_list)
{
TABLE_LOG_MEMORY_ENTRY *tmp= free_list->next_log_entry;
DDL_LOG_MEMORY_ENTRY *tmp= free_list->next_log_entry;
my_free((char*)free_list, MYF(0));
free_list= tmp;
}
VOID(my_close(global_table_log.file_id, MYF(0)));
unlock_global_table_log();
VOID(pthread_mutex_destroy(&LOCK_gtl));
VOID(my_close(global_ddl_log.file_id, MYF(0)));
unlock_global_ddl_log();
VOID(pthread_mutex_destroy(&LOCK_gdl));
DBUG_VOID_RETURN;
}
/*
Lock mutex for global table log
Lock mutex for global ddl log
SYNOPSIS
lock_global_table_log()
lock_global_ddl_log()
RETURN VALUES
NONE
*/
void
lock_global_table_log()
void lock_global_ddl_log()
{
DBUG_ENTER("lock_global_table_log");
DBUG_ENTER("lock_global_ddl_log");
VOID(pthread_mutex_lock(&LOCK_gtl));
VOID(pthread_mutex_lock(&LOCK_gdl));
DBUG_VOID_RETURN;
}
/*
Unlock mutex for global table log
Unlock mutex for global ddl log
SYNOPSIS
unlock_global_table_log()
unlock_global_ddl_log()
RETURN VALUES
NONE
*/
void
unlock_global_table_log()
void unlock_global_ddl_log()
{
DBUG_ENTER("unlock_global_table_log");
DBUG_ENTER("unlock_global_ddl_log");
VOID(pthread_mutex_unlock(&LOCK_gtl));
VOID(pthread_mutex_unlock(&LOCK_gdl));
DBUG_VOID_RETURN;
}
......@@ -1115,7 +1109,7 @@ unlock_global_table_log()
/*
---------------------------------------------------------------------------
END MODULE Table log
END MODULE DDL log
--------------------
---------------------------------------------------------------------------
......@@ -1254,8 +1248,8 @@ bool mysql_write_frm(ALTER_PARTITION_PARAM_TYPE *lpt, uint flags)
VOID(pthread_mutex_lock(&LOCK_open));
if (my_delete(frm_name, MYF(MY_WME)) ||
#ifdef WITH_PARTITION_STORAGE_ENGINE
inactivate_table_log_entry(part_info->frm_log_entry->entry_pos) ||
(sync_table_log(), FALSE) ||
deactivate_ddl_log_entry(part_info->frm_log_entry->entry_pos) ||
(sync_ddl_log(), FALSE) ||
#endif
my_rename(shadow_frm_name, frm_name, MYF(MY_WME)) ||
lpt->table->file->create_handler_files(path, shadow_path, TRUE))
......@@ -1264,9 +1258,9 @@ bool mysql_write_frm(ALTER_PARTITION_PARAM_TYPE *lpt, uint flags)
}
VOID(pthread_mutex_unlock(&LOCK_open));
#ifdef WITH_PARTITION_STORAGE_ENGINE
inactivate_table_log_entry(part_info->frm_log_entry->entry_pos);
deactivate_ddl_log_entry(part_info->frm_log_entry->entry_pos);
part_info->frm_log_entry= NULL;
VOID(sync_table_log());
VOID(sync_ddl_log());
#endif
}
......
......@@ -667,8 +667,9 @@ static int open_binary_frm(THD *thd, TABLE_SHARE *share, uchar *head,
#endif
next_chunk+= 5 + partition_info_len;
}
#if 0
if (share->mysql_version == 50106)
#if 1
if (share->mysql_version == 50106 ||
share->mysql_version == 50107)
{
/*
Partition state array was here in version 5.1.6, this code makes
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment