Commit 27737589 authored by Mats Kindahl's avatar Mats Kindahl

BUG#49618: Field length stored incorrectly in binary log

           for InnoDB
            
The class Field_bit_as_char stores the metadata for the
field incorrecly because bytes_in_rec and bit_len are set
to (field_length + 7 ) / 8 and 0 respectively, while
Field_bit has the correct values field_length / 8 and
field_length % 8.
            
Solved the problem by re-computing the values for the
metadata based on the field_length instead of using the
bytes_in_rec and bit_len variables.
            
To handle compatibility with old server, a table map
flag was added to indicate that the bit computation is
exact. If the flag is clear, the slave computes the
number of bytes required to store the bit field and
compares that instead, effectively allowing replication
*without conversion* from any field length that require
the same number of bytes to store.
parent 76903202
stop slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
reset master;
reset slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
start slave;
**** Resetting master and slave ****
include/stop_slave.inc
RESET SLAVE;
RESET MASTER;
include/start_slave.inc
CREATE TABLE t1(b1 BIT(1), b2 BIT(2), b3 BIT(3)) ENGINE=InnoDB;
INSERT INTO t1 VALUES (b'0', b'01', b'101');
Comparing tables master:test.t1 and slave:test.t1
DROP TABLE t1;
--innodb
\ No newline at end of file
--source include/have_binlog_format_row.inc
--source include/have_innodb.inc
--source include/master-slave.inc
#
# BUG#49618: Field length stored incorrectly in binary log for InnoDB
#
source include/reset_master_and_slave.inc;
connection master;
CREATE TABLE t1(b1 BIT(1), b2 BIT(2), b3 BIT(3)) ENGINE=InnoDB;
INSERT INTO t1 VALUES (b'0', b'01', b'101');
sync_slave_with_master;
let $diff_table_1=master:test.t1;
let $diff_table_2=slave:test.t1;
source include/diff_tables.inc;
connection master;
DROP TABLE t1;
sync_slave_with_master;
...@@ -1373,12 +1373,14 @@ bool Field::send_binary(Protocol *protocol) ...@@ -1373,12 +1373,14 @@ bool Field::send_binary(Protocol *protocol)
to the size of this field (the slave or destination). to the size of this field (the slave or destination).
@param field_metadata Encoded size in field metadata @param field_metadata Encoded size in field metadata
@param mflags Flags from the table map event for the table.
@retval 0 if this field's size is < the source field's size @retval 0 if this field's size is < the source field's size
@retval 1 if this field's size is >= the source field's size @retval 1 if this field's size is >= the source field's size
*/ */
int Field::compatible_field_size(uint field_metadata, int Field::compatible_field_size(uint field_metadata,
const Relay_log_info *rli_arg __attribute__((unused))) const Relay_log_info *rli_arg __attribute__((unused)),
uint16 mflags __attribute__((unused)))
{ {
uint const source_size= pack_length_from_metadata(field_metadata); uint const source_size= pack_length_from_metadata(field_metadata);
uint const destination_size= row_pack_length(); uint const destination_size= row_pack_length();
...@@ -2836,7 +2838,8 @@ uint Field_new_decimal::pack_length_from_metadata(uint field_metadata) ...@@ -2836,7 +2838,8 @@ uint Field_new_decimal::pack_length_from_metadata(uint field_metadata)
@retval 1 if this field's size is >= the source field's size @retval 1 if this field's size is >= the source field's size
*/ */
int Field_new_decimal::compatible_field_size(uint field_metadata, int Field_new_decimal::compatible_field_size(uint field_metadata,
const Relay_log_info * __attribute__((unused))) const Relay_log_info * __attribute__((unused)),
uint16 mflags __attribute__((unused)))
{ {
int compatible= 0; int compatible= 0;
uint const source_precision= (field_metadata >> 8U) & 0x00ff; uint const source_precision= (field_metadata >> 8U) & 0x00ff;
...@@ -6612,7 +6615,8 @@ check_field_for_37426(const void *param_arg) ...@@ -6612,7 +6615,8 @@ check_field_for_37426(const void *param_arg)
#endif #endif
int Field_string::compatible_field_size(uint field_metadata, int Field_string::compatible_field_size(uint field_metadata,
const Relay_log_info *rli_arg) const Relay_log_info *rli_arg,
uint16 mflags __attribute__((unused)))
{ {
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
const Check_field_param check_param = { this }; const Check_field_param check_param = { this };
...@@ -6620,7 +6624,7 @@ int Field_string::compatible_field_size(uint field_metadata, ...@@ -6620,7 +6624,7 @@ int Field_string::compatible_field_size(uint field_metadata,
check_field_for_37426, &check_param)) check_field_for_37426, &check_param))
return FALSE; // Not compatible field sizes return FALSE; // Not compatible field sizes
#endif #endif
return Field::compatible_field_size(field_metadata, rli_arg); return Field::compatible_field_size(field_metadata, rli_arg, mflags);
} }
...@@ -9172,8 +9176,13 @@ uint Field_bit::get_key_image(uchar *buff, uint length, imagetype type_arg) ...@@ -9172,8 +9176,13 @@ uint Field_bit::get_key_image(uchar *buff, uint length, imagetype type_arg)
*/ */
int Field_bit::do_save_field_metadata(uchar *metadata_ptr) int Field_bit::do_save_field_metadata(uchar *metadata_ptr)
{ {
*metadata_ptr= bit_len; /*
*(metadata_ptr + 1)= bytes_in_rec; Since this class and Field_bit_as_char have different ideas of
what should be stored here, we compute the values of the metadata
explicitly using the field_length.
*/
metadata_ptr[0]= field_length % 8;
metadata_ptr[1]= field_length / 8;
return 2; return 2;
} }
...@@ -9213,20 +9222,26 @@ uint Field_bit::pack_length_from_metadata(uint field_metadata) ...@@ -9213,20 +9222,26 @@ uint Field_bit::pack_length_from_metadata(uint field_metadata)
@retval 1 if this field's size is >= the source field's size @retval 1 if this field's size is >= the source field's size
*/ */
int Field_bit::compatible_field_size(uint field_metadata, int Field_bit::compatible_field_size(uint field_metadata,
const Relay_log_info * __attribute__((unused))) const Relay_log_info * __attribute__((unused)),
uint16 mflags)
{ {
int compatible= 0; uint from_bit_len= 8 * (field_metadata >> 8) + (field_metadata & 0xff);
uint const source_size= pack_length_from_metadata(field_metadata); uint to_bit_len= max_display_length();
uint const destination_size= row_pack_length();
uint const from_bit_len= field_metadata & 0x00ff; /*
uint const from_len= (field_metadata >> 8U) & 0x00ff; If the bit length exact flag is clear, we are dealing with an old
if ((bit_len == 0) || (from_bit_len == 0)) master, so we allow some less strict behaviour if replicating by
compatible= (source_size <= destination_size); moving both bit lengths to an even multiple of 8.
else if (from_bit_len > bit_len)
compatible= (from_len < bytes_in_rec); We do this by computing the number of bytes to store the field
else instead, and then compare the result.
compatible= ((from_bit_len <= bit_len) && (from_len <= bytes_in_rec)); */
return (compatible); if (!(mflags & Table_map_log_event::TM_BIT_LEN_EXACT_F)) {
from_bit_len= (from_bit_len + 7) / 8;
to_bit_len= (to_bit_len + 7) / 8;
}
return from_bit_len <= to_bit_len;
} }
......
...@@ -165,7 +165,7 @@ class Field ...@@ -165,7 +165,7 @@ class Field
*/ */
virtual uint32 pack_length_in_rec() const { return pack_length(); } virtual uint32 pack_length_in_rec() const { return pack_length(); }
virtual int compatible_field_size(uint field_metadata, virtual int compatible_field_size(uint field_metadata,
const Relay_log_info *); const Relay_log_info *, uint16 mflags);
virtual uint pack_length_from_metadata(uint field_metadata) virtual uint pack_length_from_metadata(uint field_metadata)
{ return field_metadata; } { return field_metadata; }
/* /*
...@@ -803,7 +803,7 @@ class Field_new_decimal :public Field_num { ...@@ -803,7 +803,7 @@ class Field_new_decimal :public Field_num {
uint pack_length_from_metadata(uint field_metadata); uint pack_length_from_metadata(uint field_metadata);
uint row_pack_length() { return pack_length(); } uint row_pack_length() { return pack_length(); }
int compatible_field_size(uint field_metadata, int compatible_field_size(uint field_metadata,
const Relay_log_info *rli); const Relay_log_info *rli, uint16 mflags);
uint is_equal(Create_field *new_field); uint is_equal(Create_field *new_field);
virtual const uchar *unpack(uchar* to, const uchar *from, virtual const uchar *unpack(uchar* to, const uchar *from,
uint param_data, bool low_byte_first); uint param_data, bool low_byte_first);
...@@ -1498,7 +1498,7 @@ class Field_string :public Field_longstr { ...@@ -1498,7 +1498,7 @@ class Field_string :public Field_longstr {
return (((field_metadata >> 4) & 0x300) ^ 0x300) + (field_metadata & 0x00ff); return (((field_metadata >> 4) & 0x300) ^ 0x300) + (field_metadata & 0x00ff);
} }
int compatible_field_size(uint field_metadata, int compatible_field_size(uint field_metadata,
const Relay_log_info *rli); const Relay_log_info *rli, uint16 mflags);
uint row_pack_length() { return (field_length + 1); } uint row_pack_length() { return (field_length + 1); }
int pack_cmp(const uchar *a,const uchar *b,uint key_length, int pack_cmp(const uchar *a,const uchar *b,uint key_length,
my_bool insert_or_update); my_bool insert_or_update);
...@@ -1962,7 +1962,7 @@ class Field_bit :public Field { ...@@ -1962,7 +1962,7 @@ class Field_bit :public Field {
uint row_pack_length() uint row_pack_length()
{ return (bytes_in_rec + ((bit_len > 0) ? 1 : 0)); } { return (bytes_in_rec + ((bit_len > 0) ? 1 : 0)); }
int compatible_field_size(uint field_metadata, int compatible_field_size(uint field_metadata,
const Relay_log_info *rli); const Relay_log_info *rli, uint16 mflags);
void sql_type(String &str) const; void sql_type(String &str) const;
virtual uchar *pack(uchar *to, const uchar *from, virtual uchar *pack(uchar *to, const uchar *from,
uint max_length, bool low_byte_first); uint max_length, bool low_byte_first);
......
...@@ -3847,11 +3847,8 @@ int THD::binlog_write_table_map(TABLE *table, bool is_trans) ...@@ -3847,11 +3847,8 @@ int THD::binlog_write_table_map(TABLE *table, bool is_trans)
DBUG_ASSERT(current_stmt_binlog_row_based && mysql_bin_log.is_open()); DBUG_ASSERT(current_stmt_binlog_row_based && mysql_bin_log.is_open());
DBUG_ASSERT(table->s->table_map_id != ULONG_MAX); DBUG_ASSERT(table->s->table_map_id != ULONG_MAX);
Table_map_log_event::flag_set const
flags= Table_map_log_event::TM_NO_FLAGS;
Table_map_log_event Table_map_log_event
the_event(this, table, table->s->table_map_id, is_trans, flags); the_event(this, table, table->s->table_map_id, is_trans);
if (is_trans && binlog_table_maps == 0) if (is_trans && binlog_table_maps == 0)
binlog_start_trans_and_stmt(); binlog_start_trans_and_stmt();
......
...@@ -7837,7 +7837,7 @@ int Table_map_log_event::save_field_metadata() ...@@ -7837,7 +7837,7 @@ int Table_map_log_event::save_field_metadata()
*/ */
#if !defined(MYSQL_CLIENT) #if !defined(MYSQL_CLIENT)
Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid, Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid,
bool is_transactional, uint16 flags) bool is_transactional)
: Log_event(thd, 0, true), : Log_event(thd, 0, true),
m_table(tbl), m_table(tbl),
m_dbnam(tbl->s->db.str), m_dbnam(tbl->s->db.str),
...@@ -7847,7 +7847,7 @@ Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid, ...@@ -7847,7 +7847,7 @@ Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid,
m_colcnt(tbl->s->fields), m_colcnt(tbl->s->fields),
m_memory(NULL), m_memory(NULL),
m_table_id(tid), m_table_id(tid),
m_flags(flags), m_flags(TM_BIT_LEN_EXACT_F),
m_data_size(0), m_data_size(0),
m_field_metadata(0), m_field_metadata(0),
m_field_metadata_size(0), m_field_metadata_size(0),
...@@ -8105,8 +8105,10 @@ int Table_map_log_event::do_apply_event(Relay_log_info const *rli) ...@@ -8105,8 +8105,10 @@ int Table_map_log_event::do_apply_event(Relay_log_info const *rli)
inside Relay_log_info::clear_tables_to_lock() by calling the inside Relay_log_info::clear_tables_to_lock() by calling the
table_def destructor explicitly. table_def destructor explicitly.
*/ */
new (&table_list->m_tabledef) table_def(m_coltype, m_colcnt, new (&table_list->m_tabledef)
m_field_metadata, m_field_metadata_size, m_null_bits); table_def(m_coltype, m_colcnt,
m_field_metadata, m_field_metadata_size,
m_null_bits, m_flags);
table_list->m_tabledef_valid= TRUE; table_list->m_tabledef_valid= TRUE;
/* /*
......
...@@ -3283,16 +3283,14 @@ class Table_map_log_event : public Log_event ...@@ -3283,16 +3283,14 @@ class Table_map_log_event : public Log_event
/* Special constants representing sets of flags */ /* Special constants representing sets of flags */
enum enum
{ {
TM_NO_FLAGS = 0U TM_NO_FLAGS = 0U,
TM_BIT_LEN_EXACT_F = (1U << 0)
}; };
void set_flags(flag_set flag) { m_flags |= flag; }
void clear_flags(flag_set flag) { m_flags &= ~flag; }
flag_set get_flags(flag_set flag) const { return m_flags & flag; } flag_set get_flags(flag_set flag) const { return m_flags & flag; }
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
Table_map_log_event(THD *thd, TABLE *tbl, ulong tid, Table_map_log_event(THD *thd, TABLE *tbl, ulong tid, bool is_transactional);
bool is_transactional, uint16 flags);
#endif #endif
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
Table_map_log_event(const char *buf, uint event_len, Table_map_log_event(const char *buf, uint event_len,
...@@ -3305,7 +3303,7 @@ class Table_map_log_event : public Log_event ...@@ -3305,7 +3303,7 @@ class Table_map_log_event : public Log_event
table_def *create_table_def() table_def *create_table_def()
{ {
return new table_def(m_coltype, m_colcnt, m_field_metadata, return new table_def(m_coltype, m_colcnt, m_field_metadata,
m_field_metadata_size, m_null_bits); m_field_metadata_size, m_null_bits, m_flags);
} }
ulong get_table_id() const { return m_table_id; } ulong get_table_id() const { return m_table_id; }
const char *get_table_name() const { return m_tblnam; } const char *get_table_name() const { return m_tblnam; }
......
...@@ -206,7 +206,7 @@ table_def::compatible_with(Relay_log_info const *rli_arg, TABLE *table) ...@@ -206,7 +206,7 @@ table_def::compatible_with(Relay_log_info const *rli_arg, TABLE *table)
Check the slave's field size against that of the master. Check the slave's field size against that of the master.
*/ */
if (!error && if (!error &&
!field->compatible_field_size(field_metadata(col), rli_arg)) !field->compatible_field_size(field_metadata(col), rli_arg, m_flags))
{ {
error= 1; error= 1;
char buf[256]; char buf[256];
......
...@@ -32,12 +32,6 @@ class Relay_log_info; ...@@ -32,12 +32,6 @@ class Relay_log_info;
- Extract and decode table definition data from the table map event - Extract and decode table definition data from the table map event
- Check if table definition in table map is compatible with table - Check if table definition in table map is compatible with table
definition on slave definition on slave
Currently, the only field type data available is an array of the
type operators that are present in the table map event.
@todo Add type operands to this structure to allow detection of
difference between, e.g., BIT(5) and BIT(10).
*/ */
class table_def class table_def
...@@ -59,9 +53,9 @@ class table_def ...@@ -59,9 +53,9 @@ class table_def
@param null_bitmap The bitmap of fields that can be null @param null_bitmap The bitmap of fields that can be null
*/ */
table_def(field_type *types, ulong size, uchar *field_metadata, table_def(field_type *types, ulong size, uchar *field_metadata,
int metadata_size, uchar *null_bitmap) int metadata_size, uchar *null_bitmap, uint16 flags)
: m_size(size), m_type(0), m_field_metadata_size(metadata_size), : m_size(size), m_type(0), m_field_metadata_size(metadata_size),
m_field_metadata(0), m_null_bits(0), m_memory(NULL) m_field_metadata(0), m_null_bits(0), m_flags(flags), m_memory(NULL)
{ {
m_memory= (uchar *)my_multi_malloc(MYF(MY_WME), m_memory= (uchar *)my_multi_malloc(MYF(MY_WME),
&m_type, size, &m_type, size,
...@@ -246,6 +240,7 @@ class table_def ...@@ -246,6 +240,7 @@ class table_def
uint m_field_metadata_size; uint m_field_metadata_size;
uint16 *m_field_metadata; uint16 *m_field_metadata;
uchar *m_null_bits; uchar *m_null_bits;
uint16 m_flags; // Table flags
uchar *m_memory; uchar *m_memory;
}; };
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment