Commit 465960c2 authored by unknown's avatar unknown

ndb - wl#2972 rbr blobs: write blob data to binlog


mysql-test/t/disabled.def:
  rbr blobs: write data + dict cache workarounds
sql/ha_ndbcluster.cc:
  rbr blobs: write data + dict cache workarounds
sql/ha_ndbcluster.h:
  rbr blobs: write data + dict cache workarounds
sql/ha_ndbcluster_binlog.cc:
  rbr blobs: write data + dict cache workarounds
storage/ndb/include/ndbapi/NdbDictionary.hpp:
  rbr blobs: write data + dict cache workarounds
storage/ndb/src/ndbapi/NdbBlob.cpp:
  rbr blobs: write data + dict cache workarounds
storage/ndb/src/ndbapi/NdbDictionary.cpp:
  rbr blobs: write data + dict cache workarounds
storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp:
  rbr blobs: write data + dict cache workarounds
storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp:
  rbr blobs: write data + dict cache workarounds
parent 97c6ff7b
...@@ -28,3 +28,7 @@ ndb_autodiscover : Needs to be fixed w.r.t binlog ...@@ -28,3 +28,7 @@ ndb_autodiscover : Needs to be fixed w.r.t binlog
ndb_autodiscover2 : Needs to be fixed w.r.t binlog ndb_autodiscover2 : Needs to be fixed w.r.t binlog
system_mysql_db : Needs fixing system_mysql_db : Needs fixing
system_mysql_db_fix : Needs fixing system_mysql_db_fix : Needs fixing
#ndb_alter_table_row : sometimes wrong error 1015!=1046
ndb_gis : garbled msgs from corrupt THD* + partitioning problem
# vim: set filetype=conf:
...@@ -35,6 +35,11 @@ ...@@ -35,6 +35,11 @@
#include "ha_ndbcluster_binlog.h" #include "ha_ndbcluster_binlog.h"
#ifdef ndb_dynamite
#undef assert
#define assert(x) do { if(x) break; ::printf("%s %d: assert failed: %s\n", __FILE__, __LINE__, #x); ::fflush(stdout); ::signal(SIGABRT,SIG_DFL); ::abort(); ::kill(::getpid(),6); ::kill(::getpid(),9); } while (0)
#endif
// options from from mysqld.cc // options from from mysqld.cc
extern my_bool opt_ndb_optimized_node_selection; extern my_bool opt_ndb_optimized_node_selection;
extern const char *opt_ndbcluster_connectstring; extern const char *opt_ndbcluster_connectstring;
...@@ -791,10 +796,20 @@ int g_get_ndb_blobs_value(NdbBlob *ndb_blob, void *arg) ...@@ -791,10 +796,20 @@ int g_get_ndb_blobs_value(NdbBlob *ndb_blob, void *arg)
if (ndb_blob->blobsNextBlob() != NULL) if (ndb_blob->blobsNextBlob() != NULL)
DBUG_RETURN(0); DBUG_RETURN(0);
ha_ndbcluster *ha= (ha_ndbcluster *)arg; ha_ndbcluster *ha= (ha_ndbcluster *)arg;
DBUG_RETURN(ha->get_ndb_blobs_value(ndb_blob)); int ret= get_ndb_blobs_value(ha->table, ha->m_value,
ha->m_blobs_buffer, ha->m_blobs_buffer_size,
0);
DBUG_RETURN(ret);
} }
int ha_ndbcluster::get_ndb_blobs_value(NdbBlob *last_ndb_blob) /*
This routine is shared by injector. There is no common blobs buffer
so the buffer and length are passed by reference. Injector also
passes a record pointer diff.
*/
int get_ndb_blobs_value(TABLE* table, NdbValue* value_array,
byte*& buffer, uint& buffer_size,
my_ptrdiff_t ptrdiff)
{ {
DBUG_ENTER("get_ndb_blobs_value"); DBUG_ENTER("get_ndb_blobs_value");
...@@ -803,14 +818,17 @@ int ha_ndbcluster::get_ndb_blobs_value(NdbBlob *last_ndb_blob) ...@@ -803,14 +818,17 @@ int ha_ndbcluster::get_ndb_blobs_value(NdbBlob *last_ndb_blob)
for (int loop= 0; loop <= 1; loop++) for (int loop= 0; loop <= 1; loop++)
{ {
uint32 offset= 0; uint32 offset= 0;
for (uint i= 0; i < table_share->fields; i++) for (uint i= 0; i < table->s->fields; i++)
{ {
Field *field= table->field[i]; Field *field= table->field[i];
NdbValue value= m_value[i]; NdbValue value= value_array[i];
if (value.ptr != NULL && (field->flags & BLOB_FLAG)) if (value.ptr != NULL && (field->flags & BLOB_FLAG))
{ {
Field_blob *field_blob= (Field_blob *)field; Field_blob *field_blob= (Field_blob *)field;
NdbBlob *ndb_blob= value.blob; NdbBlob *ndb_blob= value.blob;
int isNull;
ndb_blob->getDefined(isNull);
if (isNull == 0) { // XXX -1 should be allowed only for events
Uint64 blob_len= 0; Uint64 blob_len= 0;
if (ndb_blob->getLength(blob_len) != 0) if (ndb_blob->getLength(blob_len) != 0)
DBUG_RETURN(-1); DBUG_RETURN(-1);
...@@ -820,27 +838,31 @@ int ha_ndbcluster::get_ndb_blobs_value(NdbBlob *last_ndb_blob) ...@@ -820,27 +838,31 @@ int ha_ndbcluster::get_ndb_blobs_value(NdbBlob *last_ndb_blob)
blob_size+= 8 - blob_size % 8; blob_size+= 8 - blob_size % 8;
if (loop == 1) if (loop == 1)
{ {
char *buf= m_blobs_buffer + offset; char *buf= buffer + offset;
uint32 len= 0xffffffff; // Max uint32 uint32 len= 0xffffffff; // Max uint32
DBUG_PRINT("value", ("read blob ptr=%lx len=%u", DBUG_PRINT("info", ("read blob ptr=%p len=%u",
buf, (uint) blob_len)); buf, (uint) blob_len));
if (ndb_blob->readData(buf, len) != 0) if (ndb_blob->readData(buf, len) != 0)
DBUG_RETURN(-1); DBUG_RETURN(-1);
DBUG_ASSERT(len == blob_len); DBUG_ASSERT(len == blob_len);
// Ugly hack assumes only ptr needs to be changed
field_blob->ptr += ptrdiff;
field_blob->set_ptr(len, buf); field_blob->set_ptr(len, buf);
field_blob->ptr -= ptrdiff;
} }
offset+= blob_size; offset+= blob_size;
} }
} }
if (loop == 0 && offset > m_blobs_buffer_size) }
if (loop == 0 && offset > buffer_size)
{ {
my_free(m_blobs_buffer, MYF(MY_ALLOW_ZERO_PTR)); my_free(buffer, MYF(MY_ALLOW_ZERO_PTR));
m_blobs_buffer_size= 0; buffer_size= 0;
DBUG_PRINT("value", ("allocate blobs buffer size %u", offset)); DBUG_PRINT("info", ("allocate blobs buffer size %u", offset));
m_blobs_buffer= my_malloc(offset, MYF(MY_WME)); buffer= my_malloc(offset, MYF(MY_WME));
if (m_blobs_buffer == NULL) if (buffer == NULL)
DBUG_RETURN(-1); DBUG_RETURN(-1);
m_blobs_buffer_size= offset; buffer_size= offset;
} }
} }
DBUG_RETURN(0); DBUG_RETURN(0);
...@@ -2713,15 +2735,23 @@ void ndb_unpack_record(TABLE *table, NdbValue *value, ...@@ -2713,15 +2735,23 @@ void ndb_unpack_record(TABLE *table, NdbValue *value,
else else
{ {
NdbBlob *ndb_blob= (*value).blob; NdbBlob *ndb_blob= (*value).blob;
bool isNull= TRUE; int isNull;
#ifndef DBUG_OFF ndb_blob->getDefined(isNull);
int ret= if (isNull != 0)
#endif {
ndb_blob->getNull(isNull); uint col_no = ndb_blob->getColumn()->getColumnNo();
DBUG_ASSERT(ret == 0); if (isNull == 1)
if (isNull) {
DBUG_PRINT("info",("[%u] NULL", col_no))
field->set_null(row_offset); field->set_null(row_offset);
} }
else
{
DBUG_PRINT("info",("[%u] UNDEFINED", col_no));
bitmap_clear_bit(defined, col_no);
}
}
}
} }
} }
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
...@@ -4713,6 +4743,7 @@ int ha_ndbcluster::alter_table_name(const char *to) ...@@ -4713,6 +4743,7 @@ int ha_ndbcluster::alter_table_name(const char *to)
NDBDICT *dict= ndb->getDictionary(); NDBDICT *dict= ndb->getDictionary();
const NDBTAB *orig_tab= (const NDBTAB *) m_table; const NDBTAB *orig_tab= (const NDBTAB *) m_table;
DBUG_ENTER("alter_table_name"); DBUG_ENTER("alter_table_name");
DBUG_PRINT("info", ("from: %s to: %s", orig_tab->getName(), to));
NdbDictionary::Table new_tab= *orig_tab; NdbDictionary::Table new_tab= *orig_tab;
new_tab.setName(to); new_tab.setName(to);
......
...@@ -25,6 +25,9 @@ ...@@ -25,6 +25,9 @@
#pragma interface /* gcc class implementation */ #pragma interface /* gcc class implementation */
#endif #endif
/* Blob tables and events are internal to NDB and must never be accessed */
#define IS_NDB_BLOB_PREFIX(A) is_prefix(A, "NDB$BLOB")
#include <NdbApi.hpp> #include <NdbApi.hpp>
#include <ndbapi_limits.h> #include <ndbapi_limits.h>
...@@ -78,6 +81,10 @@ typedef struct ndb_index_data { ...@@ -78,6 +81,10 @@ typedef struct ndb_index_data {
typedef union { const NdbRecAttr *rec; NdbBlob *blob; void *ptr; } NdbValue; typedef union { const NdbRecAttr *rec; NdbBlob *blob; void *ptr; } NdbValue;
int get_ndb_blobs_value(TABLE* table, NdbValue* value_array,
byte*& buffer, uint& buffer_size,
my_ptrdiff_t ptrdiff);
typedef enum { typedef enum {
NSS_INITIAL= 0, NSS_INITIAL= 0,
NSS_DROPPED, NSS_DROPPED,
...@@ -114,6 +121,7 @@ typedef struct st_ndbcluster_share { ...@@ -114,6 +121,7 @@ typedef struct st_ndbcluster_share {
#ifdef HAVE_NDB_BINLOG #ifdef HAVE_NDB_BINLOG
/* NDB_SHARE.flags */ /* NDB_SHARE.flags */
#define NSF_HIDDEN_PK 1 /* table has hidden primary key */ #define NSF_HIDDEN_PK 1 /* table has hidden primary key */
#define NSF_BLOB_FLAG 2 /* table has blob attributes */
#define NSF_NO_BINLOG 4 /* table should not be binlogged */ #define NSF_NO_BINLOG 4 /* table should not be binlogged */
#endif #endif
......
...@@ -23,6 +23,11 @@ ...@@ -23,6 +23,11 @@
#include "slave.h" #include "slave.h"
#include "ha_ndbcluster_binlog.h" #include "ha_ndbcluster_binlog.h"
#ifdef ndb_dynamite
#undef assert
#define assert(x) do { if(x) break; ::printf("%s %d: assert failed: %s\n", __FILE__, __LINE__, #x); ::fflush(stdout); ::signal(SIGABRT,SIG_DFL); ::abort(); ::kill(::getpid(),6); ::kill(::getpid(),9); } while (0)
#endif
/* /*
defines for cluster replication table names defines for cluster replication table names
*/ */
...@@ -237,6 +242,8 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table) ...@@ -237,6 +242,8 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table)
DBUG_ASSERT(_table != 0); DBUG_ASSERT(_table != 0);
if (_table->s->primary_key == MAX_KEY) if (_table->s->primary_key == MAX_KEY)
share->flags|= NSF_HIDDEN_PK; share->flags|= NSF_HIDDEN_PK;
if (_table->s->blob_fields != 0)
share->flags|= NSF_BLOB_FLAG;
return; return;
} }
while (1) while (1)
...@@ -316,6 +323,8 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table) ...@@ -316,6 +323,8 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table)
} }
if (table->s->primary_key == MAX_KEY) if (table->s->primary_key == MAX_KEY)
share->flags|= NSF_HIDDEN_PK; share->flags|= NSF_HIDDEN_PK;
if (table->s->blob_fields != 0)
share->flags|= NSF_BLOB_FLAG;
break; break;
} }
} }
...@@ -1622,6 +1631,7 @@ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key, ...@@ -1622,6 +1631,7 @@ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key,
NDB_SHARE *share) NDB_SHARE *share)
{ {
DBUG_ENTER("ndbcluster_create_binlog_setup"); DBUG_ENTER("ndbcluster_create_binlog_setup");
DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(table_name));
pthread_mutex_lock(&ndbcluster_mutex); pthread_mutex_lock(&ndbcluster_mutex);
...@@ -1713,6 +1723,10 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab, ...@@ -1713,6 +1723,10 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab,
const char *event_name, NDB_SHARE *share) const char *event_name, NDB_SHARE *share)
{ {
DBUG_ENTER("ndbcluster_create_event"); DBUG_ENTER("ndbcluster_create_event");
DBUG_PRINT("info", ("table=%s version=%d event=%s share=%s",
ndbtab->getName(), ndbtab->getObjectVersion(),
event_name, share ? share->key : "(nil)"));
DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(ndbtab->getName()));
if (!share) if (!share)
{ {
DBUG_PRINT("info", ("share == NULL")); DBUG_PRINT("info", ("share == NULL"));
...@@ -1730,7 +1744,14 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab, ...@@ -1730,7 +1744,14 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab,
my_event.addTableEvent(NDBEVENT::TE_ALL); my_event.addTableEvent(NDBEVENT::TE_ALL);
if (share->flags & NSF_HIDDEN_PK) if (share->flags & NSF_HIDDEN_PK)
{ {
/* No primary key, susbscribe for all attributes */ if (share->flags & NSF_BLOB_FLAG)
{
sql_print_error("NDB Binlog: logging of table %s "
"with no PK and blob attributes is not supported",
share->key);
DBUG_RETURN(-1);
}
/* No primary key, subscribe for all attributes */
my_event.setReport(NDBEVENT::ER_ALL); my_event.setReport(NDBEVENT::ER_ALL);
DBUG_PRINT("info", ("subscription all")); DBUG_PRINT("info", ("subscription all"));
} }
...@@ -1749,6 +1770,8 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab, ...@@ -1749,6 +1770,8 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab,
DBUG_PRINT("info", ("subscription all and subscribe")); DBUG_PRINT("info", ("subscription all and subscribe"));
} }
} }
if (share->flags & NSF_BLOB_FLAG)
my_event.mergeEvents(true);
/* add all columns to the event */ /* add all columns to the event */
int n_cols= ndbtab->getNoOfColumns(); int n_cols= ndbtab->getNoOfColumns();
...@@ -1837,6 +1860,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab, ...@@ -1837,6 +1860,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
*/ */
DBUG_ENTER("ndbcluster_create_event_ops"); DBUG_ENTER("ndbcluster_create_event_ops");
DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(ndbtab->getName()));
DBUG_ASSERT(share != 0); DBUG_ASSERT(share != 0);
...@@ -1857,22 +1881,6 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab, ...@@ -1857,22 +1881,6 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
} }
TABLE *table= share->table; TABLE *table= share->table;
if (table)
{
/*
Logging of blob tables is not yet implemented, it would require:
1. setup of events also on the blob attribute tables
2. collect the pieces of the blob into one from an epoch to
provide a full blob to binlog
*/
if (table->s->blob_fields)
{
sql_print_error("NDB Binlog: logging of blob table %s "
"is not supported", share->key);
share->flags|= NSF_NO_BINLOG;
DBUG_RETURN(0);
}
}
int do_schema_share= 0, do_apply_status_share= 0; int do_schema_share= 0, do_apply_status_share= 0;
int retries= 100; int retries= 100;
...@@ -1910,37 +1918,64 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab, ...@@ -1910,37 +1918,64 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
DBUG_RETURN(-1); DBUG_RETURN(-1);
} }
if (share->flags & NSF_BLOB_FLAG)
op->mergeEvents(true); // currently not inherited from event
if (share->flags & NSF_BLOB_FLAG)
{
/*
* Given servers S1 S2, following results in out-of-date
* event->m_tableImpl and column->m_blobTable.
*
* S1: create table t1(a int primary key);
* S2: drop table t1;
* S1: create table t2(a int primary key, b blob);
* S1: alter table t2 add x int;
* S1: alter table t2 drop x;
*
* TODO fix at right place before we get here
*/
ndb->getDictionary()->fix_blob_events(ndbtab, event_name);
}
int n_columns= ndbtab->getNoOfColumns(); int n_columns= ndbtab->getNoOfColumns();
int n_fields= table ? table->s->fields : 0; int n_fields= table ? table->s->fields : 0; // XXX ???
for (int j= 0; j < n_columns; j++) for (int j= 0; j < n_columns; j++)
{ {
const char *col_name= ndbtab->getColumn(j)->getName(); const char *col_name= ndbtab->getColumn(j)->getName();
NdbRecAttr *attr0, *attr1; NdbValue attr0, attr1;
if (j < n_fields) if (j < n_fields)
{ {
Field *f= share->table->field[j]; Field *f= share->table->field[j];
if (is_ndb_compatible_type(f)) if (is_ndb_compatible_type(f))
{ {
DBUG_PRINT("info", ("%s compatible", col_name)); DBUG_PRINT("info", ("%s compatible", col_name));
attr0= op->getValue(col_name, f->ptr); attr0.rec= op->getValue(col_name, f->ptr);
attr1= op->getPreValue(col_name, (f->ptr-share->table->record[0]) + attr1.rec= op->getPreValue(col_name,
(f->ptr - share->table->record[0]) +
share->table->record[1]); share->table->record[1]);
} }
else else if (! (f->flags & BLOB_FLAG))
{ {
DBUG_PRINT("info", ("%s non compatible", col_name)); DBUG_PRINT("info", ("%s non compatible", col_name));
attr0= op->getValue(col_name); attr0.rec= op->getValue(col_name);
attr1= op->getPreValue(col_name); attr1.rec= op->getPreValue(col_name);
}
else
{
DBUG_PRINT("info", ("%s blob", col_name));
attr0.blob= op->getBlobHandle(col_name);
attr1.blob= op->getPreBlobHandle(col_name);
} }
} }
else else
{ {
DBUG_PRINT("info", ("%s hidden key", col_name)); DBUG_PRINT("info", ("%s hidden key", col_name));
attr0= op->getValue(col_name); attr0.rec= op->getValue(col_name);
attr1= op->getPreValue(col_name); attr1.rec= op->getPreValue(col_name);
} }
share->ndb_value[0][j].rec= attr0; share->ndb_value[0][j].ptr= attr0.ptr;
share->ndb_value[1][j].rec= attr1; share->ndb_value[1][j].ptr= attr1.ptr;
} }
op->setCustomData((void *) share); // set before execute op->setCustomData((void *) share); // set before execute
share->op= op; // assign op in NDB_SHARE share->op= op; // assign op in NDB_SHARE
...@@ -2229,12 +2264,27 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp, ...@@ -2229,12 +2264,27 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
(saves moving data about many times) (saves moving data about many times)
*/ */
/*
for now malloc/free blobs buffer each time
TODO if possible share single permanent buffer with handlers
*/
byte* blobs_buffer[2] = { 0, 0 };
uint blobs_buffer_size[2] = { 0, 0 };
switch(pOp->getEventType()) switch(pOp->getEventType())
{ {
case NDBEVENT::TE_INSERT: case NDBEVENT::TE_INSERT:
row.n_inserts++; row.n_inserts++;
DBUG_PRINT("info", ("INSERT INTO %s", share->key)); DBUG_PRINT("info", ("INSERT INTO %s", share->key));
{ {
if (share->flags & NSF_BLOB_FLAG)
{
my_ptrdiff_t ptrdiff= 0;
int ret= get_ndb_blobs_value(table, share->ndb_value[0],
blobs_buffer[0], blobs_buffer_size[0],
ptrdiff);
DBUG_ASSERT(ret == 0);
}
ndb_unpack_record(table, share->ndb_value[0], &b, table->record[0]); ndb_unpack_record(table, share->ndb_value[0], &b, table->record[0]);
trans.write_row(::server_id, injector::transaction::table(table, true), trans.write_row(::server_id, injector::transaction::table(table, true),
&b, n_fields, table->record[0]); &b, n_fields, table->record[0]);
...@@ -2261,6 +2311,14 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp, ...@@ -2261,6 +2311,14 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
key key
*/ */
if (share->flags & NSF_BLOB_FLAG)
{
my_ptrdiff_t ptrdiff= table->record[n] - table->record[0];
int ret= get_ndb_blobs_value(table, share->ndb_value[n],
blobs_buffer[n], blobs_buffer_size[n],
ptrdiff);
DBUG_ASSERT(ret == 0);
}
ndb_unpack_record(table, share->ndb_value[n], &b, table->record[n]); ndb_unpack_record(table, share->ndb_value[n], &b, table->record[n]);
print_records(table, table->record[n]); print_records(table, table->record[n]);
trans.delete_row(::server_id, injector::transaction::table(table, true), trans.delete_row(::server_id, injector::transaction::table(table, true),
...@@ -2271,13 +2329,21 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp, ...@@ -2271,13 +2329,21 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
row.n_updates++; row.n_updates++;
DBUG_PRINT("info", ("UPDATE %s", share->key)); DBUG_PRINT("info", ("UPDATE %s", share->key));
{ {
if (share->flags & NSF_BLOB_FLAG)
{
my_ptrdiff_t ptrdiff= 0;
int ret= get_ndb_blobs_value(table, share->ndb_value[0],
blobs_buffer[0], blobs_buffer_size[0],
ptrdiff);
DBUG_ASSERT(ret == 0);
}
ndb_unpack_record(table, share->ndb_value[0], ndb_unpack_record(table, share->ndb_value[0],
&b, table->record[0]); &b, table->record[0]);
print_records(table, table->record[0]); print_records(table, table->record[0]);
if (table->s->primary_key != MAX_KEY) if (table->s->primary_key != MAX_KEY)
{ {
/* /*
since table has a primary key, we can to a write since table has a primary key, we can do a write
using only after values using only after values
*/ */
trans.write_row(::server_id, injector::transaction::table(table, true), trans.write_row(::server_id, injector::transaction::table(table, true),
...@@ -2289,6 +2355,14 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp, ...@@ -2289,6 +2355,14 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
mysql server cannot handle the ndb hidden key and mysql server cannot handle the ndb hidden key and
therefore needs the before image as well therefore needs the before image as well
*/ */
if (share->flags & NSF_BLOB_FLAG)
{
my_ptrdiff_t ptrdiff= table->record[1] - table->record[0];
int ret= get_ndb_blobs_value(table, share->ndb_value[1],
blobs_buffer[1], blobs_buffer_size[1],
ptrdiff);
DBUG_ASSERT(ret == 0);
}
ndb_unpack_record(table, share->ndb_value[1], &b, table->record[1]); ndb_unpack_record(table, share->ndb_value[1], &b, table->record[1]);
print_records(table, table->record[1]); print_records(table, table->record[1]);
trans.update_row(::server_id, trans.update_row(::server_id,
...@@ -2305,6 +2379,12 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp, ...@@ -2305,6 +2379,12 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
break; break;
} }
if (share->flags & NSF_BLOB_FLAG)
{
my_free(blobs_buffer[0], MYF(MY_ALLOW_ZERO_PTR));
my_free(blobs_buffer[1], MYF(MY_ALLOW_ZERO_PTR));
}
return 0; return 0;
} }
...@@ -2544,6 +2624,9 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) ...@@ -2544,6 +2624,9 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
Binlog_index_row row; Binlog_index_row row;
while (pOp != NULL) while (pOp != NULL)
{ {
// sometimes get TE_ALTER with invalid table
DBUG_ASSERT(pOp->getEventType() == NdbDictionary::Event::TE_ALTER ||
! IS_NDB_BLOB_PREFIX(pOp->getTable()->getName()));
ndb-> ndb->
setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip); setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip);
ndb->setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage); ndb->setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage);
...@@ -2684,6 +2767,7 @@ err: ...@@ -2684,6 +2767,7 @@ err:
DBUG_PRINT("info",("removing all event operations")); DBUG_PRINT("info",("removing all event operations"));
while ((op= ndb->getEventOperation())) while ((op= ndb->getEventOperation()))
{ {
DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(op->getTable()->getName()));
DBUG_PRINT("info",("removing event operation on %s", DBUG_PRINT("info",("removing event operation on %s",
op->getEvent()->getName())); op->getEvent()->getName()));
NDB_SHARE *share= (NDB_SHARE*) op->getCustomData(); NDB_SHARE *share= (NDB_SHARE*) op->getCustomData();
......
...@@ -883,6 +883,7 @@ public: ...@@ -883,6 +883,7 @@ public:
private: private:
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
friend class NdbDictionaryImpl;
friend class NdbTableImpl; friend class NdbTableImpl;
#endif #endif
class NdbTableImpl & m_impl; class NdbTableImpl & m_impl;
...@@ -1764,6 +1765,7 @@ public: ...@@ -1764,6 +1765,7 @@ public:
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
const Table * getTable(const char * name, void **data) const; const Table * getTable(const char * name, void **data) const;
void set_local_table_data_size(unsigned sz); void set_local_table_data_size(unsigned sz);
void fix_blob_events(const Table* table, const char* ev_name);
#endif #endif
}; };
}; };
......
...@@ -1327,10 +1327,10 @@ NdbBlob::prepareColumn() ...@@ -1327,10 +1327,10 @@ NdbBlob::prepareColumn()
assert((NDB_BLOB_HEAD_SIZE << 2) == sizeof(Head)); assert((NDB_BLOB_HEAD_SIZE << 2) == sizeof(Head));
assert(theColumn->m_attrSize * theColumn->m_arraySize == sizeof(Head) + theInlineSize); assert(theColumn->m_attrSize * theColumn->m_arraySize == sizeof(Head) + theInlineSize);
if (thePartSize > 0) { if (thePartSize > 0) {
const NdbDictionary::Table* bt = NULL; const NdbTableImpl* bt = NULL;
const NdbDictionary::Column* bc = NULL; const NdbColumnImpl* bc = NULL;
if (theStripeSize == 0 || if (theStripeSize == 0 ||
(bt = theColumn->getBlobTable()) == NULL || (bt = theColumn->m_blobTable) == NULL ||
(bc = bt->getColumn("DATA")) == NULL || (bc = bt->getColumn("DATA")) == NULL ||
bc->getType() != partType || bc->getType() != partType ||
bc->getLength() != (int)thePartSize) { bc->getLength() != (int)thePartSize) {
......
...@@ -1478,6 +1478,12 @@ NdbDictionary::Dictionary::getNdbError() const { ...@@ -1478,6 +1478,12 @@ NdbDictionary::Dictionary::getNdbError() const {
return m_impl.getNdbError(); return m_impl.getNdbError();
} }
void
NdbDictionary::Dictionary::fix_blob_events(const Table* table, const char* ev_name)
{
m_impl.fix_blob_events(table, ev_name);
}
// printers // printers
NdbOut& NdbOut&
......
...@@ -3398,12 +3398,14 @@ NdbDictionaryImpl::getEvent(const char * eventName) ...@@ -3398,12 +3398,14 @@ NdbDictionaryImpl::getEvent(const char * eventName)
if (ev->m_tableId == info->m_table_impl->m_id && if (ev->m_tableId == info->m_table_impl->m_id &&
ev->m_tableVersion == info->m_table_impl->m_version) ev->m_tableVersion == info->m_table_impl->m_version)
break; break;
DBUG_PRINT("error",("%s: retry=%d: "
"table version mismatch, event: [%u,%u] table: [%u,%u]",
ev->getTableName(), retry,
ev->m_tableId, ev->m_tableVersion,
info->m_table_impl->m_id, info->m_table_impl->m_version));
if (retry) if (retry)
{ {
m_error.code= 241; m_error.code= 241;
DBUG_PRINT("error",("%s: table version mismatch, event: [%u,%u] table: [%u,%u]",
ev->getTableName(), ev->m_tableId, ev->m_tableVersion,
info->m_table_impl->m_id, info->m_table_impl->m_version));
delete ev; delete ev;
DBUG_RETURN(NULL); DBUG_RETURN(NULL);
} }
...@@ -3607,7 +3609,7 @@ NdbDictionaryImpl::dropEvent(const char * eventName) ...@@ -3607,7 +3609,7 @@ NdbDictionaryImpl::dropEvent(const char * eventName)
if (m_error.code != 723 && // no such table if (m_error.code != 723 && // no such table
m_error.code != 241) // invalid table m_error.code != 241) // invalid table
DBUG_RETURN(-1); DBUG_RETURN(-1);
DBUG_PRINT("info", ("no table, drop by name alone")); DBUG_PRINT("info", ("no table err=%d, drop by name alone", m_error.code));
evnt = new NdbEventImpl(); evnt = new NdbEventImpl();
evnt->setName(eventName); evnt->setName(eventName);
} }
...@@ -3644,7 +3646,17 @@ NdbDictionaryImpl::dropBlobEvents(const NdbEventImpl& evnt) ...@@ -3644,7 +3646,17 @@ NdbDictionaryImpl::dropBlobEvents(const NdbEventImpl& evnt)
(void)dropEvent(bename); (void)dropEvent(bename);
} }
} else { } else {
// could loop over MAX_ATTRIBUTES_IN_TABLE ... // loop over MAX_ATTRIBUTES_IN_TABLE ...
Uint32 i;
for (i = 0; i < MAX_ATTRIBUTES_IN_TABLE; i++) {
char bename[MAX_TAB_NAME_SIZE];
// XXX should get name from NdbBlob
sprintf(bename, "NDB$BLOBEVENT_%s_%u", evnt.getName(), i);
NdbEventImpl* bevnt = new NdbEventImpl();
bevnt->setName(bename);
(void)m_receiver.dropEvent(*bevnt);
delete bevnt;
}
} }
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -4631,6 +4643,30 @@ NdbDictInterface::parseFileInfo(NdbFileImpl &dst, ...@@ -4631,6 +4643,30 @@ NdbDictInterface::parseFileInfo(NdbFileImpl &dst,
return 0; return 0;
} }
// XXX temp
void
NdbDictionaryImpl::fix_blob_events(const NdbDictionary::Table* table, const char* ev_name)
{
const NdbTableImpl& t = table->m_impl;
const NdbEventImpl* ev = getEvent(ev_name);
assert(ev != NULL && ev->m_tableImpl == &t);
Uint32 i;
for (i = 0; i < t.m_columns.size(); i++) {
assert(t.m_columns[i] != NULL);
const NdbColumnImpl& c = *t.m_columns[i];
if (! c.getBlobType() || c.getPartSize() == 0)
continue;
char bename[200];
NdbBlob::getBlobEventName(bename, ev, &c);
// following fixes dict cache blob table
NdbEventImpl* bev = getEvent(bename);
if (c.m_blobTable != bev->m_tableImpl) {
// XXX const violation
((NdbColumnImpl*)&c)->m_blobTable = bev->m_tableImpl;
}
}
}
template class Vector<int>; template class Vector<int>;
template class Vector<Uint16>; template class Vector<Uint16>;
template class Vector<Uint32>; template class Vector<Uint32>;
......
...@@ -592,6 +592,9 @@ public: ...@@ -592,6 +592,9 @@ public:
NdbDictInterface m_receiver; NdbDictInterface m_receiver;
Ndb & m_ndb; Ndb & m_ndb;
// XXX temp
void fix_blob_events(const NdbDictionary::Table* table, const char* ev_name);
private: private:
NdbIndexImpl * getIndexImpl(const char * name, NdbIndexImpl * getIndexImpl(const char * name,
const BaseString& internalName); const BaseString& internalName);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment