/* Copyright (C) 2006 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */

#include "rpl_utility.h"
17
#include "rpl_rli.h"
18

19 20 21 22 23 24 25 26
/*********************************************************************
 *                   table_def member definitions                    *
 *********************************************************************/

/*
  This function returns the field size in raw bytes based on the type
  and the encoded field data from the master's raw data.
*/
27
uint32 table_def::calc_field_size(uint col, uchar *master_data) const
28 29 30
{
  uint32 length;

31
  switch (type(col)) {
32
  case MYSQL_TYPE_NEWDECIMAL:
33
    length= my_decimal_get_binary_size(m_field_metadata[col] >> 8, 
34
                                       m_field_metadata[col] & 0xff);
35 36 37 38 39 40
    break;
  case MYSQL_TYPE_DECIMAL:
  case MYSQL_TYPE_FLOAT:
  case MYSQL_TYPE_DOUBLE:
    length= m_field_metadata[col];
    break;
41 42 43 44 45
  /*
    The cases for SET and ENUM are include for completeness, however
    both are mapped to type MYSQL_TYPE_STRING and their real types
    are encoded in the field metadata.
  */
46 47 48 49
  case MYSQL_TYPE_SET:
  case MYSQL_TYPE_ENUM:
  case MYSQL_TYPE_STRING:
  {
50 51
    uchar type= m_field_metadata[col] >> 8U;
    if ((type == MYSQL_TYPE_SET) || (type == MYSQL_TYPE_ENUM))
52 53 54
      length= m_field_metadata[col] & 0x00ff;
    else
    {
55 56 57 58 59 60 61
      /*
        We are reading the actual size from the master_data record
        because this field has the actual lengh stored in the first
        byte.
      */
      length= (uint) *master_data + 1;
      DBUG_ASSERT(length != 0);
62
    }
63
    break;
64
  }
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
  case MYSQL_TYPE_YEAR:
  case MYSQL_TYPE_TINY:
    length= 1;
    break;
  case MYSQL_TYPE_SHORT:
    length= 2;
    break;
  case MYSQL_TYPE_INT24:
    length= 3;
    break;
  case MYSQL_TYPE_LONG:
    length= 4;
    break;
#ifdef HAVE_LONG_LONG
  case MYSQL_TYPE_LONGLONG:
    length= 8;
    break;
#endif
  case MYSQL_TYPE_NULL:
    length= 0;
    break;
  case MYSQL_TYPE_NEWDATE:
    length= 3;
    break;
  case MYSQL_TYPE_DATE:
  case MYSQL_TYPE_TIME:
    length= 3;
    break;
  case MYSQL_TYPE_TIMESTAMP:
    length= 4;
    break;
  case MYSQL_TYPE_DATETIME:
    length= 8;
    break;
  case MYSQL_TYPE_BIT:
100
  {
101 102 103 104 105 106 107
    /*
      Decode the size of the bit field from the master.
        from_len is the length in bytes from the master
        from_bit_len is the number of extra bits stored in the master record
      If from_bit_len is not 0, add 1 to the length to account for accurate
      number of bytes needed.
    */
108 109
    uint from_len= (m_field_metadata[col] >> 8U) & 0x00ff;
    uint from_bit_len= m_field_metadata[col] & 0x00ff;
110
    DBUG_ASSERT(from_bit_len <= 7);
111
    length= from_len + ((from_bit_len > 0) ? 1 : 0);
112
    break;
113
  }
114
  case MYSQL_TYPE_VARCHAR:
115
  {
116
    length= m_field_metadata[col] > 255 ? 2 : 1; // c&p of Field_varstring::data_length()
117
    DBUG_ASSERT(uint2korr(master_data) > 0);
118
    length+= length == 1 ? (uint32) *master_data : uint2korr(master_data);
119
    break;
120
  }
121 122 123 124 125
  case MYSQL_TYPE_TINY_BLOB:
  case MYSQL_TYPE_MEDIUM_BLOB:
  case MYSQL_TYPE_LONG_BLOB:
  case MYSQL_TYPE_BLOB:
  case MYSQL_TYPE_GEOMETRY:
126
  {
127 128 129 130 131 132 133 134
#if 1
    /*
      BUG#29549: 
      This is currently broken for NDB, which is using big-endian
      order when packing length of BLOB. Once they have decided how to
      fix the issue, we can enable the code below to make sure to
      always read the length in little-endian order.
    */
135
    Field_blob fb(m_field_metadata[col]);
136 137 138 139 140 141 142 143 144 145 146 147 148
    length= fb.get_packed_size(master_data, TRUE);
#else
    /*
      Compute the length of the data. We cannot use get_length() here
      since it is dependent on the specific table (and also checks the
      packlength using the internal 'table' pointer) and replication
      is using a fixed format for storing data in the binlog.
    */
    switch (m_field_metadata[col]) {
    case 1:
      length= *master_data;
      break;
    case 2:
149
      length= uint2korr(master_data);
150 151 152 153 154 155 156 157 158 159 160 161 162 163
      break;
    case 3:
      length= uint3korr(master_data);
      break;
    case 4:
      length= uint4korr(master_data);
      break;
    default:
      DBUG_ASSERT(0);		// Should not come here
      break;
    }

    length+= m_field_metadata[col];
#endif
164 165
    break;
  }
166 167 168
  default:
    length= -1;
  }
unknown's avatar
unknown committed
169
  return length;
170 171 172 173 174 175 176
}

/*
  Is the definition compatible with a table?

*/
int
177
table_def::compatible_with(Relay_log_info const *rli_arg, TABLE *table)
178 179 180 181 182 183 184
  const
{
  /*
    We only check the initial columns for the tables.
  */
  uint const cols_to_check= min(table->s->fields, size());
  int error= 0;
185
  Relay_log_info const *rli= const_cast<Relay_log_info*>(rli_arg);
186 187 188 189 190 191 192 193 194 195

  TABLE_SHARE const *const tsh= table->s;

  for (uint col= 0 ; col < cols_to_check ; ++col)
  {
    if (table->field[col]->type() != type(col))
    {
      DBUG_ASSERT(col < size() && col < tsh->fields);
      DBUG_ASSERT(tsh->db.str && tsh->table_name.str);
      error= 1;
196 197 198 199 200 201 202
      char buf[256];
      my_snprintf(buf, sizeof(buf), "Column %d type mismatch - "
                  "received type %d, %s.%s has type %d",
                  col, type(col), tsh->db.str, tsh->table_name.str,
                  table->field[col]->type());
      rli->report(ERROR_LEVEL, ER_BINLOG_ROW_WRONG_TABLE_DEF,
                  ER(ER_BINLOG_ROW_WRONG_TABLE_DEF), buf);
203
    }
204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222
    /*
      Check the slave's field size against that of the master.
    */
    if (!error && 
        !table->field[col]->compatible_field_size(field_metadata(col)))
    {
      error= 1;
      char buf[256];
      my_snprintf(buf, sizeof(buf), "Column %d size mismatch - "
                  "master has size %d, %s.%s on slave has size %d."
                  " Master's column size should be <= the slave's "
                  "column size.", col,
                  table->field[col]->pack_length_from_metadata(
                                       m_field_metadata[col]),
                  tsh->db.str, tsh->table_name.str, 
                  table->field[col]->row_pack_length());
      rli->report(ERROR_LEVEL, ER_BINLOG_ROW_WRONG_TABLE_DEF,
                  ER(ER_BINLOG_ROW_WRONG_TABLE_DEF), buf);
    }
223 224 225 226
  }

  return error;
}