Commit 5fa9664b authored by Venkatesh Duggirala's avatar Venkatesh Duggirala

Bug#17632978 SLAVE CRASHES IF ROW EVENT IS CORRUPTED

(MYSQLBINLOG -V CRASHES WITH THAT BINLOG)

Problem: If slave receives a corrupted row event,
slave server is crashing.

Analysis: When slave is unpacking the row event, it is
not validating the data before applying the event. If the
data is corrupted for eg: the length of a field is wrong,
it could end up reading wrong data leading to a crash.
A similar problem happens when mysqlbinlog tool is used
against a corrupted binlog using '-v' option. Due to -v
option, the tool tries to print the values of all the
fields. Corrupted field length could lead to a crash.

Fix: Before unpacking the field, a verification
will be made on the length. If it falls into the event
range, only then it will be unpacked. Otherwise,
"ER_SLAVE_CORRUPT_EVENT" error will be thrown.
Incase mysqlbinlog -v case, the field value will not be
printed and the processing of the file will be stopped.

sql/field.h:
  Removed a function which is not required anymore
sql/log_event.cc:
  Adding a validation on the field length before
  the tool tries to print the value.
sql/log_event.h:
  Changing unpack_row call according to the new arguments
sql/log_event_old.h:
  Changing unpack_row call according to the new arguments
sql/rpl_record.cc:
  Adding a new argument 'row_end' which tells
  the end position of the complete data in the
  row event. It will be used to do validation
  before doing 'unpack' field.
sql/rpl_record.h:
  Adding a new argument 'row_end' which tells
  the end position of the complete data in the
  row event. It will be used to do validation
  before doing 'unpack' field.
sql/rpl_utility.cc:
  Now calc_field_size() is required for client too.
parent afd24eb6
#ifndef FIELD_INCLUDED #ifndef FIELD_INCLUDED
#define FIELD_INCLUDED #define FIELD_INCLUDED
/* Copyright (c) 2000, 2012, Oracle and/or its affiliates. All rights reserved. /* Copyright (c) 2000, 2013, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or modify This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
...@@ -1802,18 +1802,6 @@ class Field_blob :public Field_longstr { ...@@ -1802,18 +1802,6 @@ class Field_blob :public Field_longstr {
{ {
store_length(ptr, packlength, number); store_length(ptr, packlength, number);
} }
/**
Return the packed length plus the length of the data.
This is used to determine the size of the data plus the
packed length portion in the row data.
@returns The length in the row plus the size of the data.
*/
uint32 get_packed_size(const uchar *ptr_arg, bool low_byte_first)
{return packlength + get_length(ptr_arg, packlength, low_byte_first);}
inline uint32 get_length(uint row_offset= 0) inline uint32 get_length(uint row_offset= 0)
{ return get_length(ptr+row_offset, this->packlength, table->s->db_low_byte_first); } { return get_length(ptr+row_offset, this->packlength, table->s->db_low_byte_first); }
uint32 get_length(const uchar *ptr, uint packlength, bool low_byte_first); uint32 get_length(const uchar *ptr, uint packlength, bool low_byte_first);
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#ifdef MYSQL_CLIENT #ifdef MYSQL_CLIENT
#include "sql_priv.h" #include "sql_priv.h"
#include "mysqld_error.h"
#else #else
...@@ -1945,6 +1946,14 @@ Rows_log_event::print_verbose_one_row(IO_CACHE *file, table_def *td, ...@@ -1945,6 +1946,14 @@ Rows_log_event::print_verbose_one_row(IO_CACHE *file, table_def *td,
else else
{ {
my_b_printf(file, "### @%d=", i + 1); my_b_printf(file, "### @%d=", i + 1);
size_t fsize= td->calc_field_size((uint)i, (uchar*) value);
if (value + fsize > m_rows_end)
{
my_b_printf(file, "***Corrupted replication event was detected."
" Not printing the value***\n");
value+= fsize;
return 0;
}
size_t size= log_event_print_value(file, value, size_t size= log_event_print_value(file, value,
td->type(i), td->field_metadata(i), td->type(i), td->field_metadata(i),
typestr, sizeof(typestr)); typestr, sizeof(typestr));
......
...@@ -3732,12 +3732,8 @@ class Rows_log_event : public Log_event ...@@ -3732,12 +3732,8 @@ class Rows_log_event : public Log_event
DBUG_ASSERT(m_table); DBUG_ASSERT(m_table);
ASSERT_OR_RETURN_ERROR(m_curr_row < m_rows_end, HA_ERR_CORRUPT_EVENT); ASSERT_OR_RETURN_ERROR(m_curr_row < m_rows_end, HA_ERR_CORRUPT_EVENT);
int const result= ::unpack_row(rli, m_table, m_width, m_curr_row, &m_cols, return ::unpack_row(rli, m_table, m_width, m_curr_row, &m_cols,
&m_curr_row_end, &m_master_reclength); &m_curr_row_end, &m_master_reclength, m_rows_end);
if (m_curr_row_end > m_rows_end)
my_error(ER_SLAVE_CORRUPT_EVENT, MYF(0));
ASSERT_OR_RETURN_ERROR(m_curr_row_end <= m_rows_end, HA_ERR_CORRUPT_EVENT);
return result;
} }
#endif #endif
......
...@@ -203,10 +203,8 @@ class Old_rows_log_event : public Log_event ...@@ -203,10 +203,8 @@ class Old_rows_log_event : public Log_event
{ {
DBUG_ASSERT(m_table); DBUG_ASSERT(m_table);
ASSERT_OR_RETURN_ERROR(m_curr_row < m_rows_end, HA_ERR_CORRUPT_EVENT); ASSERT_OR_RETURN_ERROR(m_curr_row < m_rows_end, HA_ERR_CORRUPT_EVENT);
int const result= ::unpack_row(rli, m_table, m_width, m_curr_row, &m_cols, return ::unpack_row(rli, m_table, m_width, m_curr_row, &m_cols,
&m_curr_row_end, &m_master_reclength); &m_curr_row_end, &m_master_reclength, m_rows_end);
ASSERT_OR_RETURN_ERROR(m_curr_row_end <= m_rows_end, HA_ERR_CORRUPT_EVENT);
return result;
} }
#endif #endif
......
/* Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. /* Copyright (c) 2007, 2013, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or modify This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
...@@ -169,11 +169,15 @@ pack_row(TABLE *table, MY_BITMAP const* cols, ...@@ -169,11 +169,15 @@ pack_row(TABLE *table, MY_BITMAP const* cols,
@param row_data @param row_data
Packed row data Packed row data
@param cols Pointer to bitset describing columns to fill in @param cols Pointer to bitset describing columns to fill in
@param row_end Pointer to variable that will hold the value of the @param curr_row_end
one-after-end position for the row Pointer to variable that will hold the value of the
one-after-end position for the current row
@param master_reclength @param master_reclength
Pointer to variable that will be set to the length of the Pointer to variable that will be set to the length of the
record on the master side record on the master side
@param row_end
Pointer to variable that will hold the value of the
end position for the data in the row event
@retval 0 No error @retval 0 No error
...@@ -185,7 +189,8 @@ int ...@@ -185,7 +189,8 @@ int
unpack_row(Relay_log_info const *rli, unpack_row(Relay_log_info const *rli,
TABLE *table, uint const colcnt, TABLE *table, uint const colcnt,
uchar const *const row_data, MY_BITMAP const *cols, uchar const *const row_data, MY_BITMAP const *cols,
uchar const **const row_end, ulong *const master_reclength) uchar const **const current_row_end, ulong *const master_reclength,
uchar const *const row_end)
{ {
DBUG_ENTER("unpack_row"); DBUG_ENTER("unpack_row");
DBUG_ASSERT(row_data); DBUG_ASSERT(row_data);
...@@ -303,6 +308,13 @@ unpack_row(Relay_log_info const *rli, ...@@ -303,6 +308,13 @@ unpack_row(Relay_log_info const *rli,
#ifndef DBUG_OFF #ifndef DBUG_OFF
uchar const *const old_pack_ptr= pack_ptr; uchar const *const old_pack_ptr= pack_ptr;
#endif #endif
uint32 len= tabledef->calc_field_size(i, (uchar *) pack_ptr);
if ( pack_ptr + len > row_end )
{
pack_ptr+= len;
my_error(ER_SLAVE_CORRUPT_EVENT, MYF(0));
DBUG_RETURN(ER_SLAVE_CORRUPT_EVENT);
}
pack_ptr= f->unpack(f->ptr, pack_ptr, metadata, TRUE); pack_ptr= f->unpack(f->ptr, pack_ptr, metadata, TRUE);
DBUG_PRINT("debug", ("field: %s; metadata: 0x%x;" DBUG_PRINT("debug", ("field: %s; metadata: 0x%x;"
" pack_ptr: 0x%lx; pack_ptr': 0x%lx; bytes: %d", " pack_ptr: 0x%lx; pack_ptr': 0x%lx; bytes: %d",
...@@ -369,6 +381,11 @@ unpack_row(Relay_log_info const *rli, ...@@ -369,6 +381,11 @@ unpack_row(Relay_log_info const *rli,
uint32 len= tabledef->calc_field_size(i, (uchar *) pack_ptr); uint32 len= tabledef->calc_field_size(i, (uchar *) pack_ptr);
DBUG_DUMP("field_data", pack_ptr, len); DBUG_DUMP("field_data", pack_ptr, len);
pack_ptr+= len; pack_ptr+= len;
if ( pack_ptr > row_end )
{
my_error(ER_SLAVE_CORRUPT_EVENT, MYF(0));
DBUG_RETURN(ER_SLAVE_CORRUPT_EVENT);
}
} }
null_mask <<= 1; null_mask <<= 1;
} }
...@@ -382,7 +399,7 @@ unpack_row(Relay_log_info const *rli, ...@@ -382,7 +399,7 @@ unpack_row(Relay_log_info const *rli,
DBUG_DUMP("row_data", row_data, pack_ptr - row_data); DBUG_DUMP("row_data", row_data, pack_ptr - row_data);
*row_end = pack_ptr; *current_row_end = pack_ptr;
if (master_reclength) if (master_reclength)
{ {
if (*field_ptr) if (*field_ptr)
......
/* Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. /* Copyright (c) 2007, 2013, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or modify This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
...@@ -32,7 +32,8 @@ size_t pack_row(TABLE* table, MY_BITMAP const* cols, ...@@ -32,7 +32,8 @@ size_t pack_row(TABLE* table, MY_BITMAP const* cols,
int unpack_row(Relay_log_info const *rli, int unpack_row(Relay_log_info const *rli,
TABLE *table, uint const colcnt, TABLE *table, uint const colcnt,
uchar const *const row_data, MY_BITMAP const *cols, uchar const *const row_data, MY_BITMAP const *cols,
uchar const **const row_end, ulong *const master_reclength); uchar const **const curr_row_end, ulong *const master_reclength,
uchar const *const row_end);
// Fill table's record[0] with default values. // Fill table's record[0] with default values.
int prepare_record(TABLE *const table, const uint skip, const bool check); int prepare_record(TABLE *const table, const uint skip, const bool check);
......
...@@ -186,7 +186,7 @@ int compare_lengths(Field *field, enum_field_types source_type, uint16 metadata) ...@@ -186,7 +186,7 @@ int compare_lengths(Field *field, enum_field_types source_type, uint16 metadata)
DBUG_PRINT("result", ("%d", result)); DBUG_PRINT("result", ("%d", result));
DBUG_RETURN(result); DBUG_RETURN(result);
} }
#endif //MYSQL_CLIENT
/********************************************************************* /*********************************************************************
* table_def member definitions * * table_def member definitions *
*********************************************************************/ *********************************************************************/
...@@ -285,7 +285,6 @@ uint32 table_def::calc_field_size(uint col, uchar *master_data) const ...@@ -285,7 +285,6 @@ uint32 table_def::calc_field_size(uint col, uchar *master_data) const
case MYSQL_TYPE_VARCHAR: case MYSQL_TYPE_VARCHAR:
{ {
length= m_field_metadata[col] > 255 ? 2 : 1; // c&p of Field_varstring::data_length() length= m_field_metadata[col] > 255 ? 2 : 1; // c&p of Field_varstring::data_length()
DBUG_ASSERT(uint2korr(master_data) > 0);
length+= length == 1 ? (uint32) *master_data : uint2korr(master_data); length+= length == 1 ? (uint32) *master_data : uint2korr(master_data);
break; break;
} }
...@@ -295,17 +294,6 @@ uint32 table_def::calc_field_size(uint col, uchar *master_data) const ...@@ -295,17 +294,6 @@ uint32 table_def::calc_field_size(uint col, uchar *master_data) const
case MYSQL_TYPE_BLOB: case MYSQL_TYPE_BLOB:
case MYSQL_TYPE_GEOMETRY: case MYSQL_TYPE_GEOMETRY:
{ {
#if 1
/*
BUG#29549:
This is currently broken for NDB, which is using big-endian
order when packing length of BLOB. Once they have decided how to
fix the issue, we can enable the code below to make sure to
always read the length in little-endian order.
*/
Field_blob fb(m_field_metadata[col]);
length= fb.get_packed_size(master_data, TRUE);
#else
/* /*
Compute the length of the data. We cannot use get_length() here Compute the length of the data. We cannot use get_length() here
since it is dependent on the specific table (and also checks the since it is dependent on the specific table (and also checks the
...@@ -331,7 +319,6 @@ uint32 table_def::calc_field_size(uint col, uchar *master_data) const ...@@ -331,7 +319,6 @@ uint32 table_def::calc_field_size(uint col, uchar *master_data) const
} }
length+= m_field_metadata[col]; length+= m_field_metadata[col];
#endif
break; break;
} }
default: default:
...@@ -340,7 +327,7 @@ uint32 table_def::calc_field_size(uint col, uchar *master_data) const ...@@ -340,7 +327,7 @@ uint32 table_def::calc_field_size(uint col, uchar *master_data) const
return length; return length;
} }
#ifndef MYSQL_CLIENT
/** /**
*/ */
void show_sql_type(enum_field_types type, uint16 metadata, String *str, CHARSET_INFO *field_cs) void show_sql_type(enum_field_types type, uint16 metadata, String *str, CHARSET_INFO *field_cs)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment