Commit 59ada157 authored by mats@romeo.(none)'s avatar mats@romeo.(none)

BUG#19033 (RBR: slave does not handle schema changes correctly):

Since checking table compatibility before locking the table, there were
potential that a table could be locked that did not have a definition
that was compatible with the table on the slave.

This patch adds a check just after the table was locked to ensure that
the table is (still) compatible with the table on the slave.
parent 6712c096
...@@ -32,15 +32,6 @@ ...@@ -32,15 +32,6 @@
#include <mysql/plugin.h> #include <mysql/plugin.h>
/*
Define placement versions of operator new and operator delete since
we cannot be sure that the <new> include exists.
*/
inline void *operator new(size_t, void *ptr) { return ptr; }
inline void *operator new[](size_t, void *ptr) { return ptr; }
inline void operator delete(void*, void*) { /* Do nothing */ }
inline void operator delete[](void*, void*) { /* Do nothing */ }
/* max size of the log message */ /* max size of the log message */
#define MAX_LOG_BUFFER_SIZE 1024 #define MAX_LOG_BUFFER_SIZE 1024
#define MAX_USER_HOST_SIZE 512 #define MAX_USER_HOST_SIZE 512
...@@ -148,8 +139,8 @@ class binlog_trx_data { ...@@ -148,8 +139,8 @@ class binlog_trx_data {
*/ */
void truncate(my_off_t pos) void truncate(my_off_t pos)
{ {
DBUG_PRINT("info", ("truncating to position %lu", pos)); DBUG_PRINT("info", ("truncating to position %ld", pos));
DBUG_PRINT("info", ("before_stmt_pos=%lu", pos)); DBUG_PRINT("info", ("before_stmt_pos=%lu", (void*) pos));
delete pending(); delete pending();
set_pending(0); set_pending(0);
reinit_io_cache(&trans_log, WRITE_CACHE, pos, 0, 0); reinit_io_cache(&trans_log, WRITE_CACHE, pos, 0, 0);
...@@ -3481,9 +3472,9 @@ int THD::binlog_flush_transaction_cache() ...@@ -3481,9 +3472,9 @@ int THD::binlog_flush_transaction_cache()
{ {
DBUG_ENTER("binlog_flush_transaction_cache"); DBUG_ENTER("binlog_flush_transaction_cache");
binlog_trx_data *trx_data= (binlog_trx_data*) ha_data[binlog_hton->slot]; binlog_trx_data *trx_data= (binlog_trx_data*) ha_data[binlog_hton->slot];
DBUG_PRINT("enter", ("trx_data=0x%lu", trx_data)); DBUG_PRINT("enter", ("trx_data=0x%lu", (void*) trx_data));
if (trx_data) if (trx_data)
DBUG_PRINT("enter", ("trx_data->before_stmt_pos=%u", DBUG_PRINT("enter", ("trx_data->before_stmt_pos=%d",
trx_data->before_stmt_pos)); trx_data->before_stmt_pos));
/* /*
......
...@@ -5753,15 +5753,45 @@ int Rows_log_event::exec_event(st_relay_log_info *rli) ...@@ -5753,15 +5753,45 @@ int Rows_log_event::exec_event(st_relay_log_info *rli)
DBUG_RETURN(error); DBUG_RETURN(error);
} }
} }
/*
When the open and locking succeeded, we check all tables to
ensure that they still have the correct type.
We can use a down cast here since we know that every table added
to the tables_to_lock is a RPL_TABLE_LIST.
*/
{
RPL_TABLE_LIST *ptr= static_cast<RPL_TABLE_LIST*>(rli->tables_to_lock);
for ( ; ptr ; ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_global))
{
if (ptr->m_tabledef.compatible_with(rli, ptr->table))
{
mysql_unlock_tables(thd, thd->lock);
thd->lock= 0;
thd->query_error= 1;
rli->clear_tables_to_lock();
DBUG_RETURN(ERR_BAD_TABLE_DEF);
}
}
}
/* /*
When the open and locking succeeded, we add all the tables to ... and then we add all the tables to the table map and remove
the table map and remove them from tables to lock. them from tables to lock.
We also invalidate the query cache for all the tables, since We also invalidate the query cache for all the tables, since
they will now be changed. they will now be changed.
TODO [/Matz]: Maybe the query cache should not be invalidated
here? It might be that a table is not changed, even though it
was locked for the statement. We do know that each
Rows_log_event contain at least one row, so after processing one
Rows_log_event, we can invalidate the query cache for the
associated table.
*/ */
TABLE_LIST *ptr; for (TABLE_LIST *ptr= rli->tables_to_lock ; ptr ; ptr= ptr->next_global)
for (ptr= rli->tables_to_lock ; ptr ; ptr= ptr->next_global)
{ {
rli->m_table_map.set_table(ptr->table_id, ptr->table); rli->m_table_map.set_table(ptr->table_id, ptr->table);
} }
...@@ -6214,11 +6244,11 @@ int Table_map_log_event::exec_event(st_relay_log_info *rli) ...@@ -6214,11 +6244,11 @@ int Table_map_log_event::exec_event(st_relay_log_info *rli)
thd->query_id= next_query_id(); thd->query_id= next_query_id();
pthread_mutex_unlock(&LOCK_thread_count); pthread_mutex_unlock(&LOCK_thread_count);
TABLE_LIST *table_list; RPL_TABLE_LIST *table_list;
char *db_mem, *tname_mem; char *db_mem, *tname_mem;
void *const memory= void *const memory=
my_multi_malloc(MYF(MY_WME), my_multi_malloc(MYF(MY_WME),
&table_list, sizeof(TABLE_LIST), &table_list, sizeof(RPL_TABLE_LIST),
&db_mem, NAME_LEN + 1, &db_mem, NAME_LEN + 1,
&tname_mem, NAME_LEN + 1, &tname_mem, NAME_LEN + 1,
NULL); NULL);
...@@ -6264,11 +6294,27 @@ int Table_map_log_event::exec_event(st_relay_log_info *rli) ...@@ -6264,11 +6294,27 @@ int Table_map_log_event::exec_event(st_relay_log_info *rli)
} }
/* /*
Open the table if it is not already open and add the table to table map. Open the table if it is not already open and add the table to
Note that for any table that should not be replicated, a filter is needed. table map. Note that for any table that should not be
replicated, a filter is needed.
The creation of a new TABLE_LIST is used to up-cast the
table_list consisting of RPL_TABLE_LIST items. This will work
since the only case where the argument to open_tables() is
changed, is when thd->lex->query_tables == table_list, i.e.,
when the statement requires prelocking. Since this is not
executed when a statement is executed, this case will not occur.
As a precaution, an assertion is added to ensure that the bad
case is not a fact.
Either way, the memory in the list is *never* released
internally in the open_tables() function, hence we take a copy
of the pointer to make sure that it's not lost.
*/ */
uint count; uint count;
if ((error= open_tables(thd, &table_list, &count, 0))) DBUG_ASSERT(thd->lex->query_tables != table_list);
TABLE_LIST *tmp_table_list= table_list;
if ((error= open_tables(thd, &tmp_table_list, &count, 0)))
{ {
if (thd->query_error || thd->is_fatal_error) if (thd->query_error || thd->is_fatal_error)
{ {
...@@ -6295,14 +6341,12 @@ int Table_map_log_event::exec_event(st_relay_log_info *rli) ...@@ -6295,14 +6341,12 @@ int Table_map_log_event::exec_event(st_relay_log_info *rli)
*/ */
DBUG_ASSERT(m_table->in_use); DBUG_ASSERT(m_table->in_use);
table_def const def(m_coltype, m_colcnt); /*
if (def.compatible_with(rli, m_table)) Use placement new to construct the table_def instance in the
{ memory allocated for it inside table_list.
thd->query_error= 1; */
error= ERR_BAD_TABLE_DEF; const table_def *const def=
goto err; new (&table_list->m_tabledef) table_def(m_coltype, m_colcnt);
/* purecov: end */
}
/* /*
We record in the slave's information that the table should be We record in the slave's information that the table should be
......
...@@ -1721,14 +1721,17 @@ class Table_map_log_event : public Log_event ...@@ -1721,14 +1721,17 @@ class Table_map_log_event : public Log_event
TYPE_CODE = TABLE_MAP_EVENT TYPE_CODE = TABLE_MAP_EVENT
}; };
/**
Enumeration of the errors that can be returned.
*/
enum enum_error enum enum_error
{ {
ERR_OPEN_FAILURE = -1, /* Failure to open table */ ERR_OPEN_FAILURE = -1, /**< Failure to open table */
ERR_OK = 0, /* No error */ ERR_OK = 0, /**< No error */
ERR_TABLE_LIMIT_EXCEEDED = 1, /* No more room for tables */ ERR_TABLE_LIMIT_EXCEEDED = 1, /**< No more room for tables */
ERR_OUT_OF_MEM = 2, /* Out of memory */ ERR_OUT_OF_MEM = 2, /**< Out of memory */
ERR_BAD_TABLE_DEF = 3, /* Table definition does not match */ ERR_BAD_TABLE_DEF = 3, /**< Table definition does not match */
ERR_RBR_TO_SBR = 4 /* daisy-chanining RBR to SBR not allowed */ ERR_RBR_TO_SBR = 4 /**< daisy-chanining RBR to SBR not allowed */
}; };
enum enum_flag enum enum_flag
...@@ -1808,7 +1811,7 @@ class Table_map_log_event : public Log_event ...@@ -1808,7 +1811,7 @@ class Table_map_log_event : public Log_event
Row level log event class. Row level log event class.
Common base class for all row-level log events. Common base class for all row-containing log events.
RESPONSIBILITIES RESPONSIBILITIES
...@@ -1822,6 +1825,19 @@ class Table_map_log_event : public Log_event ...@@ -1822,6 +1825,19 @@ class Table_map_log_event : public Log_event
class Rows_log_event : public Log_event class Rows_log_event : public Log_event
{ {
public: public:
/**
Enumeration of the errors that can be returned.
*/
enum enum_error
{
ERR_OPEN_FAILURE = -1, /**< Failure to open table */
ERR_OK = 0, /**< No error */
ERR_TABLE_LIMIT_EXCEEDED = 1, /**< No more room for tables */
ERR_OUT_OF_MEM = 2, /**< Out of memory */
ERR_BAD_TABLE_DEF = 3, /**< Table definition does not match */
ERR_RBR_TO_SBR = 4 /**< daisy-chanining RBR to SBR not allowed */
};
/* /*
These definitions allow you to combine the flags into an These definitions allow you to combine the flags into an
appropriate flag set using the normal bitwise operators. The appropriate flag set using the normal bitwise operators. The
...@@ -1829,7 +1845,6 @@ class Rows_log_event : public Log_event ...@@ -1829,7 +1845,6 @@ class Rows_log_event : public Log_event
accepted by the compiler, which is then used to set the real set accepted by the compiler, which is then used to set the real set
of flags. of flags.
*/ */
enum enum_flag enum enum_flag
{ {
/* Last event of a statement */ /* Last event of a statement */
......
...@@ -24,97 +24,96 @@ ...@@ -24,97 +24,96 @@
#include "mysql_priv.h" #include "mysql_priv.h"
uint32 uint32
field_length_from_packed(enum_field_types const field_type, field_length_from_packed(enum_field_types field_type, byte const *data);
byte const *const data);
/* /**
A table definition from the master. A table definition from the master.
RESPONSIBILITIES The responsibilities of this class is:
- Extract and decode table definition data from the table map event - Extract and decode table definition data from the table map event
- Check if table definition in table map is compatible with table - Check if table definition in table map is compatible with table
definition on slave definition on slave
DESCRIPTION Currently, the only field type data available is an array of the
type operators that are present in the table map event.
Currently, the only field type data available is an array of the
type operators that are present in the table map event.
TODO
Add type operands to this structure to allow detection of @todo Add type operands to this structure to allow detection of
difference between, e.g., BIT(5) and BIT(10). difference between, e.g., BIT(5) and BIT(10).
*/ */
class table_def class table_def
{ {
public: public:
/* /**
Convenience declaration of the type of the field type data in a Convenience declaration of the type of the field type data in a
table map event. table map event.
*/ */
typedef unsigned char field_type; typedef unsigned char field_type;
/* /**
Constructor. Constructor.
SYNOPSIS @param types Array of types
table_def() @param size Number of elements in array 'types'
types Array of types
size Number of elements in array 'types'
*/ */
table_def(field_type *types, my_size_t size) table_def(field_type *types, my_size_t size)
: m_type(types), m_size(size) : m_type(new unsigned char [size]), m_size(size)
{ {
if (m_type)
memcpy(m_type, types, size);
else
m_size= 0;
} }
/* ~table_def() {
Return the number of fields there is type data for. if (m_type)
delete [] m_type;
#ifndef DBUG_OFF
m_type= 0;
m_size= 0;
#endif
}
SYNOPSIS /**
size() Return the number of fields there is type data for.
RETURN VALUE @return The number of fields that there is type data for.
The number of fields that there is type data for.
*/ */
my_size_t size() const { return m_size; } my_size_t size() const { return m_size; }
/* /*
Return a representation of the type data for one field. Return a representation of the type data for one field.
SYNOPSIS @param index Field index to return data for
type()
i Field index to return data for
RETURN VALUE
Will return a representation of the type data for field @return Will return a representation of the type data for field
'i'. Currently, only the type identifier is returned. <code>index</code>. Currently, only the type identifier is
returned.
*/ */
field_type type(my_ptrdiff_t i) const { return m_type[i]; } field_type type(my_ptrdiff_t index) const
{
DBUG_ASSERT(0 <= index);
DBUG_ASSERT(static_cast<my_size_t>(index) < m_size);
return m_type[index];
}
/* /**
Decide if the table definition is compatible with a table. Decide if the table definition is compatible with a table.
SYNOPSIS Compare the definition with a table to see if it is compatible
compatible_with() with it.
rli Pointer to relay log info
table Pointer to table to compare with.
DESCRIPTION
Compare the definition with a table to see if it is compatible
with it. A table definition is compatible with a table if:
A table definition is compatible with a table if:
- the columns types of the table definition is a (not - the columns types of the table definition is a (not
necessarily proper) prefix of the column type of the table, or necessarily proper) prefix of the column type of the table, or
- the other way around - the other way around
RETURN VALUE @param rli Pointer to relay log info
1 if the table definition is not compatible with 'table' @param table Pointer to table to compare with.
0 if the table definition is compatible with 'table'
@retval 1 if the table definition is not compatible with @c table
@retval 0 if the table definition is compatible with @c table
*/ */
int compatible_with(RELAY_LOG_INFO *rli, TABLE *table) const; int compatible_with(RELAY_LOG_INFO *rli, TABLE *table) const;
...@@ -123,4 +122,14 @@ class table_def ...@@ -123,4 +122,14 @@ class table_def
field_type *m_type; // Array of type descriptors field_type *m_type; // Array of type descriptors
}; };
/**
Extend the normal table list with a few new fields needed by the
slave thread, but nowhere else.
*/
struct RPL_TABLE_LIST
: public st_table_list
{
table_def m_tabledef;
};
#endif /* RPL_UTILITY_H */ #endif /* RPL_UTILITY_H */
...@@ -214,6 +214,15 @@ extern I_List<THD> threads; ...@@ -214,6 +214,15 @@ extern I_List<THD> threads;
#define SLAVE_IO 1 #define SLAVE_IO 1
#define SLAVE_SQL 2 #define SLAVE_SQL 2
/*
Define placement versions of operator new and operator delete since
we cannot be sure that the <new> include exists.
*/
inline void *operator new(size_t, void *ptr) { return ptr; }
inline void *operator new[](size_t, void *ptr) { return ptr; }
inline void operator delete(void*, void*) { /* Do nothing */ }
inline void operator delete[](void*, void*) { /* Do nothing */ }
#endif #endif
...@@ -3097,7 +3097,7 @@ void select_create::send_error(uint errcode,const char *err) ...@@ -3097,7 +3097,7 @@ void select_create::send_error(uint errcode,const char *err)
thd->current_stmt_binlog_row_based ? "is" : "is NOT")); thd->current_stmt_binlog_row_based ? "is" : "is NOT"));
DBUG_PRINT("info", DBUG_PRINT("info",
("Current table (at 0x%lu) %s a temporary (or non-existant) table", ("Current table (at 0x%lu) %s a temporary (or non-existant) table",
table, (void*) table,
table && !table->s->tmp_table ? "is NOT" : "is")); table && !table->s->tmp_table ? "is NOT" : "is"));
DBUG_PRINT("info", DBUG_PRINT("info",
("Table %s prior to executing this statement", ("Table %s prior to executing this statement",
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment