Commit ec171a94 authored by Alexander Barkov's avatar Alexander Barkov

MDEV-20844 RBR from binary(16) to inet6 fails with error 171: The event was...

MDEV-20844 RBR from binary(16) to inet6 fails with error 171: The event was corrupt, leading to illegal data being read

This patch changes the way how INET6 is packed to the RBR binary log:
- from fixed length 16 bytes
- to BINARY(16) compatible variable length style
  with trailing 0x00 byte compression.

This is to make INET6 fully compatible with BINARY(16) in RBR binary logs,
so RBR replication works in this scenarios:

- Old master BINARY(16) -> New slave INET6
- New master INET6      -> Old slave BINARY(16)

A new class StringPack was added to share the code between
Field_string and Field_inet6.
parent 9a833dc6
...@@ -114,6 +114,7 @@ SET(SQL_EMBEDDED_SOURCES emb_qcache.cc libmysqld.c lib_sql.cc ...@@ -114,6 +114,7 @@ SET(SQL_EMBEDDED_SOURCES emb_qcache.cc libmysqld.c lib_sql.cc
../sql/compat56.cc ../sql/compat56.cc
../sql/sql_type.cc ../sql/sql_type.h ../sql/sql_type.cc ../sql/sql_type.h
../sql/sql_mode.cc ../sql/sql_mode.cc
../sql/sql_type_string.cc
../sql/sql_type_json.cc ../sql/sql_type_json.cc
../sql/sql_type_geom.cc ../sql/sql_type_geom.cc
../sql/table_cache.cc ../sql/mf_iocache_encr.cc ../sql/table_cache.cc ../sql/mf_iocache_encr.cc
......
include/master-slave.inc
[connection master]
#
# Start of 10.5 tests
#
#
# MDEV-20844 RBR from binary(16) to inet6 fails with error 171: The event was corrupt, leading to illegal data being read
#
CREATE TABLE t1 (a BINARY(16));
connection slave;
ALTER TABLE t1 MODIFY a INET6;
connection master;
INSERT INTO t1 VALUES (INET6_ATON('::'));
INSERT INTO t1 VALUES (INET6_ATON('::192.168.0.1'));
INSERT INTO t1 VALUES (INET6_ATON('ffff::'));
INSERT INTO t1 VALUES (INET6_ATON('ffff::192.168.0.1'));
SELECT INET6_NTOA(a) FROM t1 ORDER BY a;
INET6_NTOA(a)
::
::192.168.0.1
ffff::
ffff::c0a8:1
connection slave;
SELECT * FROM t1 ORDER BY a;
a
::
::192.168.0.1
ffff::
ffff::c0a8:1
connection master;
DROP TABLE t1;
#
# End of 10.5 tests
#
include/rpl_end.inc
--source include/have_binlog_format_row.inc
--source include/master-slave.inc
--echo #
--echo # Start of 10.5 tests
--echo #
--echo #
--echo # MDEV-20844 RBR from binary(16) to inet6 fails with error 171: The event was corrupt, leading to illegal data being read
--echo #
CREATE TABLE t1 (a BINARY(16));
--sync_slave_with_master
ALTER TABLE t1 MODIFY a INET6;
--connection master
INSERT INTO t1 VALUES (INET6_ATON('::'));
INSERT INTO t1 VALUES (INET6_ATON('::192.168.0.1'));
INSERT INTO t1 VALUES (INET6_ATON('ffff::'));
INSERT INTO t1 VALUES (INET6_ATON('ffff::192.168.0.1'));
SELECT INET6_NTOA(a) FROM t1 ORDER BY a;
--sync_slave_with_master
SELECT * FROM t1 ORDER BY a;
--connection master
DROP TABLE t1;
--echo #
--echo # End of 10.5 tests
--echo #
--source include/rpl_end.inc
include/master-slave.inc
[connection master]
#
# Start of 10.5 tests
#
#
# MDEV-20844 RBR from binary(16) to inet6 fails with error 171: The event was corrupt, leading to illegal data being read
#
CREATE TABLE t1 (a INET6);
connection slave;
ALTER TABLE t1 MODIFY a BINARY(16);
connection master;
INSERT INTO t1 VALUES ('::');
INSERT INTO t1 VALUES ('::192.168.0.1');
INSERT INTO t1 VALUES ('ffff::');
INSERT INTO t1 VALUES ('ffff::192.168.0.1');
SELECT a FROM t1 ORDER BY a;
a
::
::192.168.0.1
ffff::
ffff::c0a8:1
connection slave;
SELECT INET6_NTOA(a) FROM t1 ORDER BY a;
INET6_NTOA(a)
::
::192.168.0.1
ffff::
ffff::c0a8:1
connection master;
DROP TABLE t1;
#
# End of 10.5 tests
#
include/rpl_end.inc
--source include/have_binlog_format_row.inc
--source include/master-slave.inc
--echo #
--echo # Start of 10.5 tests
--echo #
--echo #
--echo # MDEV-20844 RBR from binary(16) to inet6 fails with error 171: The event was corrupt, leading to illegal data being read
--echo #
CREATE TABLE t1 (a INET6);
--sync_slave_with_master
ALTER TABLE t1 MODIFY a BINARY(16);
--connection master
INSERT INTO t1 VALUES ('::');
INSERT INTO t1 VALUES ('::192.168.0.1');
INSERT INTO t1 VALUES ('ffff::');
INSERT INTO t1 VALUES ('ffff::192.168.0.1');
SELECT a FROM t1 ORDER BY a;
--sync_slave_with_master
SELECT INET6_NTOA(a) FROM t1 ORDER BY a;
--connection master
DROP TABLE t1;
--echo #
--echo # End of 10.5 tests
--echo #
--source include/rpl_end.inc
...@@ -970,6 +970,30 @@ class Field_inet6: public Field ...@@ -970,6 +970,30 @@ class Field_inet6: public Field
&my_charset_bin); &my_charset_bin);
} }
uchar *pack(uchar *to, const uchar *from, uint max_length) override
{
DBUG_PRINT("debug", ("Packing field '%s'", field_name.str));
return StringPack(&my_charset_bin, Inet6::binary_length()).
pack(to, from, max_length);
}
const uchar *unpack(uchar *to, const uchar *from, const uchar *from_end,
uint param_data) override
{
return StringPack(&my_charset_bin, Inet6::binary_length()).
unpack(to, from, from_end, param_data);
}
uint max_packed_col_length(uint max_length)
{
return StringPack::max_packed_col_length(max_length);
}
uint packed_col_length(const uchar *data_ptr, uint length)
{
return StringPack::packed_col_length(data_ptr, length);
}
/**********/ /**********/
uint size_of() const override { return sizeof(*this); } uint size_of() const override { return sizeof(*this); }
}; };
......
...@@ -136,6 +136,7 @@ SET (SQL_SOURCE ...@@ -136,6 +136,7 @@ SET (SQL_SOURCE
semisync.cc semisync_master.cc semisync_slave.cc semisync.cc semisync_master.cc semisync_slave.cc
semisync_master_ack_receiver.cc semisync_master_ack_receiver.cc
sql_type.cc sql_mode.cc sql_type_json.cc sql_type.cc sql_mode.cc sql_type_json.cc
sql_type_string.cc
sql_type_geom.cc sql_type_geom.cc
item_windowfunc.cc sql_window.cc item_windowfunc.cc sql_window.cc
sql_cte.cc sql_cte.cc
......
...@@ -7395,39 +7395,8 @@ void Field_string::sql_rpl_type(String *res) const ...@@ -7395,39 +7395,8 @@ void Field_string::sql_rpl_type(String *res) const
uchar *Field_string::pack(uchar *to, const uchar *from, uint max_length) uchar *Field_string::pack(uchar *to, const uchar *from, uint max_length)
{ {
size_t length= MY_MIN(field_length,max_length); DBUG_PRINT("debug", ("Packing field '%s'", field_name.str));
size_t local_char_length= Field_string::char_length(); return StringPack(field_charset(), field_length).pack(to, from, max_length);
DBUG_PRINT("debug", ("Packing field '%s' - length: %zu ", field_name.str,
length));
if (length > local_char_length)
local_char_length= my_charpos(field_charset(), from, from + length,
local_char_length);
set_if_smaller(length, local_char_length);
/*
TODO: change charset interface to add a new function that does
the following or add a flag to lengthsp to do it itself
(this is for not packing padding adding bytes in BINARY
fields).
*/
if (mbmaxlen() == 1)
{
while (length && from[length-1] == field_charset()->pad_char)
length --;
}
else
length= field_charset()->cset->lengthsp(field_charset(),
(const char*) from, length);
// Length always stored little-endian
*to++= (uchar) length;
if (field_length > 255)
*to++= (uchar) (length >> 8);
// Store the actual bytes of the string
memcpy(to, from, length);
return to+length;
} }
...@@ -7454,47 +7423,8 @@ const uchar * ...@@ -7454,47 +7423,8 @@ const uchar *
Field_string::unpack(uchar *to, const uchar *from, const uchar *from_end, Field_string::unpack(uchar *to, const uchar *from, const uchar *from_end,
uint param_data) uint param_data)
{ {
uint from_length, length; return StringPack(field_charset(), field_length).unpack(to, from, from_end,
param_data);
/*
Compute the declared length of the field on the master. This is
used to decide if one or two bytes should be read as length.
*/
if (param_data)
from_length= (((param_data >> 4) & 0x300) ^ 0x300) + (param_data & 0x00ff);
else
from_length= field_length;
DBUG_PRINT("debug",
("param_data: 0x%x, field_length: %u, from_length: %u",
param_data, field_length, from_length));
/*
Compute the actual length of the data by reading one or two bits
(depending on the declared field length on the master).
*/
if (from_length > 255)
{
if (from + 2 > from_end)
return 0;
length= uint2korr(from);
from+= 2;
}
else
{
if (from + 1 > from_end)
return 0;
length= (uint) *from++;
}
if (from + length > from_end || length > field_length)
return 0;
memcpy(to, from, length);
// Pad the string with the pad character of the fields charset
field_charset()->cset->fill(field_charset(),
(char*) to + length,
field_length - length,
field_charset()->pad_char);
return from+length;
} }
...@@ -7552,15 +7482,13 @@ Binlog_type_info Field_string::binlog_type_info() const ...@@ -7552,15 +7482,13 @@ Binlog_type_info Field_string::binlog_type_info() const
uint Field_string::packed_col_length(const uchar *data_ptr, uint length) uint Field_string::packed_col_length(const uchar *data_ptr, uint length)
{ {
if (length > 255) return StringPack::packed_col_length(data_ptr, length);
return uint2korr(data_ptr)+2;
return (uint) *data_ptr + 1;
} }
uint Field_string::max_packed_col_length(uint max_length) uint Field_string::max_packed_col_length(uint max_length)
{ {
return (max_length > 255 ? 2 : 1)+max_length; return StringPack::max_packed_col_length(max_length);
} }
......
...@@ -25,6 +25,7 @@ ...@@ -25,6 +25,7 @@
#include "sql_array.h" #include "sql_array.h"
#include "sql_const.h" #include "sql_const.h"
#include "sql_time.h" #include "sql_time.h"
#include "sql_type_string.h"
#include "sql_type_real.h" #include "sql_type_real.h"
#include "compat56.h" #include "compat56.h"
C_MODE_START C_MODE_START
......
/*
Copyright (c) 2019 MariaDB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */
#include "mariadb.h"
#include "sql_class.h"
#include "sql_type_string.h"
uchar *
StringPack::pack(uchar *to, const uchar *from, uint max_length) const
{
size_t length= MY_MIN(m_octet_length, max_length);
size_t local_char_length= char_length();
DBUG_PRINT("debug", ("length: %zu ", length));
if (length > local_char_length)
local_char_length= my_charpos(charset(), from, from + length,
local_char_length);
set_if_smaller(length, local_char_length);
/*
TODO: change charset interface to add a new function that does
the following or add a flag to lengthsp to do it itself
(this is for not packing padding adding bytes in BINARY
fields).
*/
if (mbmaxlen() == 1)
{
while (length && from[length-1] == charset()->pad_char)
length --;
}
else
length= charset()->cset->lengthsp(charset(), (const char*) from, length);
// Length always stored little-endian
*to++= (uchar) length;
if (m_octet_length > 255)
*to++= (uchar) (length >> 8);
// Store the actual bytes of the string
memcpy(to, from, length);
return to+length;
}
const uchar *
StringPack::unpack(uchar *to, const uchar *from, const uchar *from_end,
uint param_data) const
{
uint from_length, length;
/*
Compute the declared length of the field on the master. This is
used to decide if one or two bytes should be read as length.
*/
if (param_data)
from_length= (((param_data >> 4) & 0x300) ^ 0x300) + (param_data & 0x00ff);
else
from_length= m_octet_length;
DBUG_PRINT("debug",
("param_data: 0x%x, field_length: %u, from_length: %u",
param_data, m_octet_length, from_length));
/*
Compute the actual length of the data by reading one or two bits
(depending on the declared field length on the master).
*/
if (from_length > 255)
{
if (from + 2 > from_end)
return 0;
length= uint2korr(from);
from+= 2;
}
else
{
if (from + 1 > from_end)
return 0;
length= (uint) *from++;
}
if (from + length > from_end || length > m_octet_length)
return 0;
memcpy(to, from, length);
// Pad the string with the pad character of the fields charset
charset()->cset->fill(charset(),
(char*) to + length,
m_octet_length - length,
charset()->pad_char);
return from+length;
}
/* Copyright (c) 2019 MariaDB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
#ifndef SQL_TYPE_STRING_INCLUDED
#define SQL_TYPE_STRING_INCLUDED
class StringPack
{
CHARSET_INFO *m_cs;
uint32 m_octet_length;
CHARSET_INFO *charset() const { return m_cs; }
uint mbmaxlen() const { return m_cs->mbmaxlen; };
uint32 char_length() const { return m_octet_length / mbmaxlen(); }
public:
StringPack(CHARSET_INFO *cs, uint32 octet_length)
:m_cs(cs),
m_octet_length(octet_length)
{ }
uchar *pack(uchar *to, const uchar *from, uint max_length) const;
const uchar *unpack(uchar *to, const uchar *from, const uchar *from_end,
uint param_data) const;
public:
static uint max_packed_col_length(uint max_length)
{
return (max_length > 255 ? 2 : 1) + max_length;
}
static uint packed_col_length(const uchar *data_ptr, uint length)
{
if (length > 255)
return uint2korr(data_ptr)+2;
return (uint) *data_ptr + 1;
}
};
#endif // SQL_TYPE_STRING_INCLUDED
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment