Commit 8642812c authored by Mats Kindahl's avatar Mats Kindahl

Bug #52131: SET and ENUM stored endian-dependent in binary log

Replication SET and ENUM fields from a big-endian to a little-
endian machine (or the opposite) that are represented using
more than 1 byte (SET fields with more than 8 members or ENUM
fields with more than 256 constants) will fail to replicate
correctly when using row-based replication.

The reason is that there are no pack() or unpack() functions
for Field_set or Field_enum, which make them rely on Field::pack
and Field::unpack. These functions pack data as strings, but
since Field_set and Field_enum use integral types for
representation, the fields are stored incorrectly on big-endian
machines.

This patch adds Field_enum::pack and Field_enum::unpack
functions that store the integral value correctly in the binary
log even on big-endian machines. Since Field_set inherits from
Field_enum, it will use the same functions for packing and
unpacking the field.

sql/field.cc:
  Removing some obsolete debug printouts and adding Field_enum::pack
  and Field_enum::unpack functions.
sql/field.h:
  Adding helper functions for packing and unpacking 16- and
  24-bit integral types.
  
  Field_short::pack and Field_short::unpack now use these functions.
sql/rpl_record.cc:
  Removing some obsolete debug printouts and adding some
  more useful ones.
parent 4cce72a9
...@@ -7725,12 +7725,6 @@ void Field_blob::sql_type(String &res) const ...@@ -7725,12 +7725,6 @@ void Field_blob::sql_type(String &res) const
uchar *Field_blob::pack(uchar *to, const uchar *from, uchar *Field_blob::pack(uchar *to, const uchar *from,
uint max_length, bool low_byte_first) uint max_length, bool low_byte_first)
{ {
DBUG_ENTER("Field_blob::pack");
DBUG_PRINT("enter", ("to: 0x%lx; from: 0x%lx;"
" max_length: %u; low_byte_first: %d",
(ulong) to, (ulong) from,
max_length, low_byte_first));
DBUG_DUMP("record", from, table->s->reclength);
uchar *save= ptr; uchar *save= ptr;
ptr= (uchar*) from; ptr= (uchar*) from;
uint32 length=get_length(); // Length of from string uint32 length=get_length(); // Length of from string
...@@ -7751,8 +7745,7 @@ uchar *Field_blob::pack(uchar *to, const uchar *from, ...@@ -7751,8 +7745,7 @@ uchar *Field_blob::pack(uchar *to, const uchar *from,
memcpy(to+packlength, from,length); memcpy(to+packlength, from,length);
} }
ptr=save; // Restore org row pointer ptr=save; // Restore org row pointer
DBUG_DUMP("packed", to, packlength + length); return to+packlength+length;
DBUG_RETURN(to+packlength+length);
} }
...@@ -8396,6 +8389,50 @@ uint Field_enum::is_equal(Create_field *new_field) ...@@ -8396,6 +8389,50 @@ uint Field_enum::is_equal(Create_field *new_field)
} }
uchar *Field_enum::pack(uchar *to, const uchar *from,
uint max_length, bool low_byte_first)
{
DBUG_ENTER("Field_enum::pack");
DBUG_PRINT("debug", ("packlength: %d", packlength));
DBUG_DUMP("from", from, packlength);
switch (packlength)
{
case 1:
*to = *from;
DBUG_RETURN(to + 1);
case 2: DBUG_RETURN(pack_int16(to, from, low_byte_first));
case 3: DBUG_RETURN(pack_int24(to, from, low_byte_first));
case 4: DBUG_RETURN(pack_int32(to, from, low_byte_first));
case 8: DBUG_RETURN(pack_int64(to, from, low_byte_first));
default:
DBUG_ASSERT(0);
}
}
const uchar *Field_enum::unpack(uchar *to, const uchar *from,
uint param_data, bool low_byte_first)
{
DBUG_ENTER("Field_enum::unpack");
DBUG_PRINT("debug", ("packlength: %d", packlength));
DBUG_DUMP("from", from, packlength);
switch (packlength)
{
case 1:
*to = *from;
DBUG_RETURN(from + 1);
case 2: DBUG_RETURN(unpack_int16(to, from, low_byte_first));
case 3: DBUG_RETURN(unpack_int24(to, from, low_byte_first));
case 4: DBUG_RETURN(unpack_int32(to, from, low_byte_first));
case 8: DBUG_RETURN(unpack_int64(to, from, low_byte_first));
default:
DBUG_ASSERT(0);
}
}
/** /**
@return @return
returns 1 if the fields are equally defined returns 1 if the fields are equally defined
......
...@@ -554,6 +554,48 @@ class Field ...@@ -554,6 +554,48 @@ class Field
{ return 0; } { return 0; }
protected: protected:
static void handle_int16(uchar *to, const uchar *from,
bool low_byte_first_from, bool low_byte_first_to)
{
int16 val;
#ifdef WORDS_BIGENDIAN
if (low_byte_first_from)
val = sint2korr(from);
else
#endif
shortget(val, from);
#ifdef WORDS_BIGENDIAN
if (low_byte_first_to)
int2store(to, val);
else
#endif
shortstore(to, val);
}
static void handle_int24(uchar *to, const uchar *from,
bool low_byte_first_from, bool low_byte_first_to)
{
int32 val;
#ifdef WORDS_BIGENDIAN
if (low_byte_first_from)
val = sint3korr(from);
else
#endif
val= (from[0] << 16) + (from[1] << 8) + from[2];
#ifdef WORDS_BIGENDIAN
if (low_byte_first_to)
int2store(to, val);
else
#endif
{
to[0]= 0xFF & (val >> 16);
to[1]= 0xFF & (val >> 8);
to[2]= 0xFF & val;
}
}
/* /*
Helper function to pack()/unpack() int32 values Helper function to pack()/unpack() int32 values
*/ */
...@@ -598,6 +640,32 @@ class Field ...@@ -598,6 +640,32 @@ class Field
longlongstore(to, val); longlongstore(to, val);
} }
uchar *pack_int16(uchar *to, const uchar *from, bool low_byte_first_to)
{
handle_int16(to, from, table->s->db_low_byte_first, low_byte_first_to);
return to + sizeof(int16);
}
const uchar *unpack_int16(uchar* to, const uchar *from,
bool low_byte_first_from)
{
handle_int16(to, from, low_byte_first_from, table->s->db_low_byte_first);
return from + sizeof(int16);
}
uchar *pack_int24(uchar *to, const uchar *from, bool low_byte_first_to)
{
handle_int24(to, from, table->s->db_low_byte_first, low_byte_first_to);
return to + 3;
}
const uchar *unpack_int24(uchar* to, const uchar *from,
bool low_byte_first_from)
{
handle_int24(to, from, low_byte_first_from, table->s->db_low_byte_first);
return from + 3;
}
uchar *pack_int32(uchar *to, const uchar *from, bool low_byte_first_to) uchar *pack_int32(uchar *to, const uchar *from, bool low_byte_first_to)
{ {
handle_int32(to, from, table->s->db_low_byte_first, low_byte_first_to); handle_int32(to, from, table->s->db_low_byte_first, low_byte_first_to);
...@@ -918,41 +986,13 @@ class Field_short :public Field_num { ...@@ -918,41 +986,13 @@ class Field_short :public Field_num {
virtual uchar *pack(uchar* to, const uchar *from, virtual uchar *pack(uchar* to, const uchar *from,
uint max_length, bool low_byte_first) uint max_length, bool low_byte_first)
{ {
int16 val; return pack_int16(to, from, low_byte_first);
#ifdef WORDS_BIGENDIAN
if (table->s->db_low_byte_first)
val = sint2korr(from);
else
#endif
shortget(val, from);
#ifdef WORDS_BIGENDIAN
if (low_byte_first)
int2store(to, val);
else
#endif
shortstore(to, val);
return to + sizeof(val);
} }
virtual const uchar *unpack(uchar* to, const uchar *from, virtual const uchar *unpack(uchar* to, const uchar *from,
uint param_data, bool low_byte_first) uint param_data, bool low_byte_first)
{ {
int16 val; return unpack_int16(to, from, low_byte_first);
#ifdef WORDS_BIGENDIAN
if (low_byte_first)
val = sint2korr(from);
else
#endif
shortget(val, from);
#ifdef WORDS_BIGENDIAN
if (table->s->db_low_byte_first)
int2store(to, val);
else
#endif
shortstore(to, val);
return from + sizeof(val);
} }
}; };
...@@ -1895,6 +1935,12 @@ class Field_enum :public Field_str { ...@@ -1895,6 +1935,12 @@ class Field_enum :public Field_str {
bool has_charset(void) const { return TRUE; } bool has_charset(void) const { return TRUE; }
/* enum and set are sorted as integers */ /* enum and set are sorted as integers */
CHARSET_INFO *sort_charset(void) const { return &my_charset_bin; } CHARSET_INFO *sort_charset(void) const { return &my_charset_bin; }
virtual uchar *pack(uchar *to, const uchar *from,
uint max_length, bool low_byte_first);
virtual const uchar *unpack(uchar *to, const uchar *from,
uint param_data, bool low_byte_first);
private: private:
int do_save_field_metadata(uchar *first_byte); int do_save_field_metadata(uchar *first_byte);
uint is_equal(Create_field *new_field); uint is_equal(Create_field *new_field);
......
...@@ -78,8 +78,6 @@ pack_row(TABLE *table, MY_BITMAP const* cols, ...@@ -78,8 +78,6 @@ pack_row(TABLE *table, MY_BITMAP const* cols,
unsigned int null_mask= 1U; unsigned int null_mask= 1U;
for ( ; (field= *p_field) ; p_field++) for ( ; (field= *p_field) ; p_field++)
{ {
DBUG_PRINT("debug", ("null_mask=%d; null_ptr=%p; row_data=%p; null_byte_count=%d",
null_mask, null_ptr, row_data, null_byte_count));
if (bitmap_is_set(cols, p_field - table->field)) if (bitmap_is_set(cols, p_field - table->field))
{ {
my_ptrdiff_t offset; my_ptrdiff_t offset;
...@@ -110,6 +108,7 @@ pack_row(TABLE *table, MY_BITMAP const* cols, ...@@ -110,6 +108,7 @@ pack_row(TABLE *table, MY_BITMAP const* cols,
field->field_name, field->real_type(), field->field_name, field->real_type(),
(ulong) old_pack_ptr, (ulong) pack_ptr, (ulong) old_pack_ptr, (ulong) pack_ptr,
(int) (pack_ptr - old_pack_ptr))); (int) (pack_ptr - old_pack_ptr)));
DBUG_DUMP("packed_data", old_pack_ptr, pack_ptr - old_pack_ptr);
} }
null_mask <<= 1; null_mask <<= 1;
...@@ -380,8 +379,11 @@ unpack_row(Relay_log_info const *rli, ...@@ -380,8 +379,11 @@ unpack_row(Relay_log_info const *rli,
} }
DBUG_ASSERT(null_mask & 0xFF); // One of the 8 LSB should be set DBUG_ASSERT(null_mask & 0xFF); // One of the 8 LSB should be set
if (!((null_bits & null_mask) && tabledef->maybe_null(i))) if (!((null_bits & null_mask) && tabledef->maybe_null(i))) {
pack_ptr+= tabledef->calc_field_size(i, (uchar *) pack_ptr); uint32 len= tabledef->calc_field_size(i, (uchar *) pack_ptr);
DBUG_DUMP("field_data", pack_ptr, len);
pack_ptr+= len;
}
null_mask <<= 1; null_mask <<= 1;
} }
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment