Commit 17cec85f authored by pekka@mysql.com's avatar pekka@mysql.com

ndb - wl#2972 rbr blobs ndb api support

parent cd55dea8
...@@ -28,6 +28,7 @@ class NdbOperation; ...@@ -28,6 +28,7 @@ class NdbOperation;
class NdbRecAttr; class NdbRecAttr;
class NdbTableImpl; class NdbTableImpl;
class NdbColumnImpl; class NdbColumnImpl;
class NdbEventOperationImpl;
/** /**
* @class NdbBlob * @class NdbBlob
...@@ -71,6 +72,10 @@ class NdbColumnImpl; ...@@ -71,6 +72,10 @@ class NdbColumnImpl;
* writes. It avoids execute penalty if nothing is pending. It is not * writes. It avoids execute penalty if nothing is pending. It is not
* needed after execute (obviously) or after next scan result. * needed after execute (obviously) or after next scan result.
* *
* NdbBlob also supports reading post or pre blob data from events. The
* handle can be read after next event on main table has been retrieved.
* The data is available immediately. See NdbEventOperation.
*
* NdbBlob methods return -1 on error and 0 on success, and use output * NdbBlob methods return -1 on error and 0 on success, and use output
* parameters when necessary. * parameters when necessary.
* *
...@@ -145,6 +150,12 @@ public: ...@@ -145,6 +150,12 @@ public:
* then the callback is invoked. * then the callback is invoked.
*/ */
int setActiveHook(ActiveHook* activeHook, void* arg); int setActiveHook(ActiveHook* activeHook, void* arg);
/**
* Check if blob value is defined (NULL or not). Used as first call
* on event based blob. The argument is set to -1 for not defined.
* Unlike getNull() this does not cause error on the handle.
*/
int getDefined(int& isNull);
/** /**
* Check if blob is null. * Check if blob is null.
*/ */
...@@ -191,6 +202,11 @@ public: ...@@ -191,6 +202,11 @@ public:
* Get blob parts table name. Useful only to test programs. * Get blob parts table name. Useful only to test programs.
*/ */
static int getBlobTableName(char* btname, Ndb* anNdb, const char* tableName, const char* columnName); static int getBlobTableName(char* btname, Ndb* anNdb, const char* tableName, const char* columnName);
/**
* Get blob event name. The blob event is created if the main event
* monitors the blob column. The name includes main event name.
*/
static int getBlobEventName(char* bename, Ndb* anNdb, const char* eventName, const char* columnName);
/** /**
* Return error object. The error may be blob specific (below) or may * Return error object. The error may be blob specific (below) or may
* be copied from a failed implicit operation. * be copied from a failed implicit operation.
...@@ -217,17 +233,29 @@ private: ...@@ -217,17 +233,29 @@ private:
friend class NdbScanOperation; friend class NdbScanOperation;
friend class NdbDictionaryImpl; friend class NdbDictionaryImpl;
friend class NdbResultSet; // atNextResult friend class NdbResultSet; // atNextResult
friend class NdbEventBuffer;
friend class NdbEventOperationImpl;
#endif #endif
// state // state
State theState; State theState;
void setState(State newState); void setState(State newState);
// quick and dirty support for events (consider subclassing)
int theEventBlobVersion; // -1=normal blob 0=post event 1=pre event
// define blob table // define blob table
static void getBlobTableName(char* btname, const NdbTableImpl* t, const NdbColumnImpl* c); static void getBlobTableName(char* btname, const NdbTableImpl* t, const NdbColumnImpl* c);
static void getBlobTable(NdbTableImpl& bt, const NdbTableImpl* t, const NdbColumnImpl* c); static void getBlobTable(NdbTableImpl& bt, const NdbTableImpl* t, const NdbColumnImpl* c);
static void getBlobEventName(char* bename, const NdbEventImpl* e, const NdbColumnImpl* c);
static void getBlobEvent(NdbEventImpl& be, const NdbEventImpl* e, const NdbColumnImpl* c);
// ndb api stuff // ndb api stuff
Ndb* theNdb; Ndb* theNdb;
NdbTransaction* theNdbCon; NdbTransaction* theNdbCon;
NdbOperation* theNdbOp; NdbOperation* theNdbOp;
NdbEventOperationImpl* theEventOp;
NdbEventOperationImpl* theBlobEventOp;
NdbRecAttr* theBlobEventPkRecAttr;
NdbRecAttr* theBlobEventDistRecAttr;
NdbRecAttr* theBlobEventPartRecAttr;
NdbRecAttr* theBlobEventDataRecAttr;
const NdbTableImpl* theTable; const NdbTableImpl* theTable;
const NdbTableImpl* theAccessTable; const NdbTableImpl* theAccessTable;
const NdbTableImpl* theBlobTable; const NdbTableImpl* theBlobTable;
...@@ -263,6 +291,8 @@ private: ...@@ -263,6 +291,8 @@ private:
Buf theHeadInlineBuf; Buf theHeadInlineBuf;
Buf theHeadInlineCopyBuf; // for writeTuple Buf theHeadInlineCopyBuf; // for writeTuple
Buf thePartBuf; Buf thePartBuf;
Buf theBlobEventDataBuf;
Uint32 thePartNumber; // for event
Head* theHead; Head* theHead;
char* theInlineData; char* theInlineData;
NdbRecAttr* theHeadInlineRecAttr; NdbRecAttr* theHeadInlineRecAttr;
...@@ -306,6 +336,8 @@ private: ...@@ -306,6 +336,8 @@ private:
int readDataPrivate(char* buf, Uint32& bytes); int readDataPrivate(char* buf, Uint32& bytes);
int writeDataPrivate(const char* buf, Uint32 bytes); int writeDataPrivate(const char* buf, Uint32 bytes);
int readParts(char* buf, Uint32 part, Uint32 count); int readParts(char* buf, Uint32 part, Uint32 count);
int readTableParts(char* buf, Uint32 part, Uint32 count);
int readEventParts(char* buf, Uint32 part, Uint32 count);
int insertParts(const char* buf, Uint32 part, Uint32 count); int insertParts(const char* buf, Uint32 part, Uint32 count);
int updateParts(const char* buf, Uint32 part, Uint32 count); int updateParts(const char* buf, Uint32 part, Uint32 count);
int deleteParts(Uint32 part, Uint32 count); int deleteParts(Uint32 part, Uint32 count);
...@@ -317,19 +349,23 @@ private: ...@@ -317,19 +349,23 @@ private:
int invokeActiveHook(); int invokeActiveHook();
// blob handle maintenance // blob handle maintenance
int atPrepare(NdbTransaction* aCon, NdbOperation* anOp, const NdbColumnImpl* aColumn); int atPrepare(NdbTransaction* aCon, NdbOperation* anOp, const NdbColumnImpl* aColumn);
int atPrepare(NdbEventOperationImpl* anOp, NdbEventOperationImpl* aBlobOp, const NdbColumnImpl* aColumn, int version);
int prepareColumn();
int preExecute(NdbTransaction::ExecType anExecType, bool& batch); int preExecute(NdbTransaction::ExecType anExecType, bool& batch);
int postExecute(NdbTransaction::ExecType anExecType); int postExecute(NdbTransaction::ExecType anExecType);
int preCommit(); int preCommit();
int atNextResult(); int atNextResult();
int atNextEvent();
// errors // errors
void setErrorCode(int anErrorCode, bool invalidFlag = true); void setErrorCode(int anErrorCode, bool invalidFlag = true);
void setErrorCode(NdbOperation* anOp, bool invalidFlag = true); void setErrorCode(NdbOperation* anOp, bool invalidFlag = true);
void setErrorCode(NdbTransaction* aCon, bool invalidFlag = true); void setErrorCode(NdbTransaction* aCon, bool invalidFlag = true);
void setErrorCode(NdbEventOperationImpl* anOp, bool invalidFlag = true);
#ifdef VM_TRACE #ifdef VM_TRACE
int getOperationType() const; int getOperationType() const;
friend class NdbOut& operator<<(NdbOut&, const NdbBlob&); friend class NdbOut& operator<<(NdbOut&, const NdbBlob&);
#endif #endif
// list stuff
void next(NdbBlob* obj) { theNext= obj;} void next(NdbBlob* obj) { theNext= obj;}
NdbBlob* next() { return theNext;} NdbBlob* next() { return theNext;}
friend struct Ndb_free_list_t<NdbBlob>; friend struct Ndb_free_list_t<NdbBlob>;
......
...@@ -1124,7 +1124,7 @@ public: ...@@ -1124,7 +1124,7 @@ public:
_TE_NODE_FAILURE=10, _TE_NODE_FAILURE=10,
_TE_SUBSCRIBE=11, _TE_SUBSCRIBE=11,
_TE_UNSUBSCRIBE=12, _TE_UNSUBSCRIBE=12,
_TE_NUL=13 // internal (INS o DEL within same GCI) _TE_NUL=13 // internal (e.g. INS o DEL within same GCI)
}; };
#endif #endif
/** /**
...@@ -1261,6 +1261,24 @@ public: ...@@ -1261,6 +1261,24 @@ public:
*/ */
int getNoOfEventColumns() const; int getNoOfEventColumns() const;
/**
* The merge events flag is false by default. Setting it true
* implies that events are merged in following ways:
*
* - for given NdbEventOperation associated with this event,
* events on same PK within same GCI are merged into single event
*
* - a blob table event is created for each blob attribute
* and blob events are handled as part of main table events
*
* - blob post/pre data from the blob part events can be read
* via NdbBlob methods as a single value
*
* NOTE: Currently this flag is not inherited by NdbEventOperation
* and must be set on NdbEventOperation explicitly.
*/
void mergeEvents(bool flag);
/** /**
* Get object status * Get object status
*/ */
......
...@@ -150,6 +150,14 @@ public: ...@@ -150,6 +150,14 @@ public:
*/ */
NdbRecAttr *getPreValue(const char *anAttrName, char *aValue = 0); NdbRecAttr *getPreValue(const char *anAttrName, char *aValue = 0);
/**
* These methods replace getValue/getPreValue for blobs. Each
* method creates a blob handle NdbBlob. The handle supports only
* read operations. See NdbBlob.
*/
NdbBlob* getBlobHandle(const char *anAttrName);
NdbBlob* getPreBlobHandle(const char *anAttrName);
int isOverrun() const; int isOverrun() const;
/** /**
......
...@@ -4,7 +4,7 @@ OBJS = ndbapi_event.o ...@@ -4,7 +4,7 @@ OBJS = ndbapi_event.o
CXX = g++ -g CXX = g++ -g
CFLAGS = -c -Wall -fno-rtti -fno-exceptions CFLAGS = -c -Wall -fno-rtti -fno-exceptions
CXXFLAGS = CXXFLAGS =
DEBUG = DEBUG =# -DVM_TRACE
LFLAGS = -Wall LFLAGS = -Wall
TOP_SRCDIR = ../../../.. TOP_SRCDIR = ../../../..
INCLUDE_DIR = $(TOP_SRCDIR)/storage/ndb/include INCLUDE_DIR = $(TOP_SRCDIR)/storage/ndb/include
...@@ -16,8 +16,8 @@ SYS_LIB = ...@@ -16,8 +16,8 @@ SYS_LIB =
$(TARGET): $(OBJS) $(TARGET): $(OBJS)
$(CXX) $(CXXFLAGS) $(LFLAGS) $(LIB_DIR) $(OBJS) -lndbclient -lmysqlclient_r -lmysys -lmystrings -lz $(SYS_LIB) -o $(TARGET) $(CXX) $(CXXFLAGS) $(LFLAGS) $(LIB_DIR) $(OBJS) -lndbclient -lmysqlclient_r -lmysys -lmystrings -lz $(SYS_LIB) -o $(TARGET)
$(TARGET).o: $(SRCS) $(TARGET).o: $(SRCS) Makefile
$(CXX) $(CFLAGS) -I$(INCLUDE_DIR) -I$(INCLUDE_DIR)/ndbapi -I$(TOP_SRCDIR)/include $(SRCS) $(CXX) $(CFLAGS) $(DEBUG) -I$(INCLUDE_DIR) -I$(INCLUDE_DIR)/ndbapi -I$(TOP_SRCDIR)/include $(SRCS)
clean: clean:
rm -f *.o $(TARGET) rm -f *.o $(TARGET)
...@@ -54,26 +54,32 @@ ...@@ -54,26 +54,32 @@
#include <stdio.h> #include <stdio.h>
#include <iostream> #include <iostream>
#include <unistd.h> #include <unistd.h>
#ifdef VM_TRACE
#include <my_global.h>
#endif
#ifndef assert
#include <assert.h>
#endif
/** /**
* * Assume that there is a table which is being updated by
* Assume that there is a table t0 which is being updated by
* another process (e.g. flexBench -l 0 -stdtables). * another process (e.g. flexBench -l 0 -stdtables).
* We want to monitor what happens with columns c0,c1,c2,c3. * We want to monitor what happens with column values.
* *
* or together with the mysql client; * Or using the mysql client:
* *
* shell> mysql -u root * shell> mysql -u root
* mysql> create database TEST_DB; * mysql> create database TEST_DB;
* mysql> use TEST_DB; * mysql> use TEST_DB;
* mysql> create table t0 (c0 int, c1 int, c2 char(4), c3 char(4), * mysql> create table t0
* (c0 int, c1 int, c2 char(4), c3 char(4), c4 text,
* primary key(c0, c2)) engine ndb charset latin1; * primary key(c0, c2)) engine ndb charset latin1;
* *
* In another window start ndbapi_event, wait until properly started * In another window start ndbapi_event, wait until properly started
*
insert into t0 values (1, 2, 'a', 'b'); insert into t0 values (1, 2, 'a', 'b', null);
insert into t0 values (3, 4, 'c', 'd'); insert into t0 values (3, 4, 'c', 'd', null);
update t0 set c3 = 'e' where c0 = 1 and c2 = 'a'; -- use pk update t0 set c3 = 'e' where c0 = 1 and c2 = 'a'; -- use pk
update t0 set c3 = 'f'; -- use scan update t0 set c3 = 'f'; -- use scan
update t0 set c3 = 'F'; -- use scan update to 'same' update t0 set c3 = 'F'; -- use scan update to 'same'
...@@ -81,7 +87,18 @@ ...@@ -81,7 +87,18 @@
update t0 set c2 = 'G' where c0 = 1; -- update pk part to 'same' update t0 set c2 = 'G' where c0 = 1; -- update pk part to 'same'
update t0 set c0 = 5, c2 = 'H' where c0 = 3; -- update full PK update t0 set c0 = 5, c2 = 'H' where c0 = 3; -- update full PK
delete from t0; delete from t0;
*
insert ...; update ...; -- see events w/ same pk merged (if -m option)
delete ...; insert ...; -- there are 5 combinations ID IU DI UD UU
update ...; update ...;
-- text requires -m flag
set @a = repeat('a',256); -- inline size
set @b = repeat('b',2000); -- part size
set @c = repeat('c',2000*30); -- 30 parts
-- update the text field using combinations of @a, @b, @c ...
* you should see the data popping up in the example window * you should see the data popping up in the example window
* *
*/ */
...@@ -95,12 +112,18 @@ int myCreateEvent(Ndb* myNdb, ...@@ -95,12 +112,18 @@ int myCreateEvent(Ndb* myNdb,
const char *eventName, const char *eventName,
const char *eventTableName, const char *eventTableName,
const char **eventColumnName, const char **eventColumnName,
const int noEventColumnName); const int noEventColumnName,
bool merge_events);
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
ndb_init(); ndb_init();
bool merge_events = argc > 1 && strcmp(argv[1], "-m") == 0; bool merge_events = argc > 1 && strchr(argv[1], 'm') != 0;
#ifdef VM_TRACE
bool dbug = argc > 1 && strchr(argv[1], 'd') != 0;
if (dbug) DBUG_PUSH("d:t:");
if (dbug) putenv("API_SIGNAL_LOG=-");
#endif
Ndb_cluster_connection *cluster_connection= Ndb_cluster_connection *cluster_connection=
new Ndb_cluster_connection(); // Object representing the cluster new Ndb_cluster_connection(); // Object representing the cluster
...@@ -134,12 +157,13 @@ int main(int argc, char** argv) ...@@ -134,12 +157,13 @@ int main(int argc, char** argv)
const char *eventName= "CHNG_IN_t0"; const char *eventName= "CHNG_IN_t0";
const char *eventTableName= "t0"; const char *eventTableName= "t0";
const int noEventColumnName= 4; const int noEventColumnName= 5;
const char *eventColumnName[noEventColumnName]= const char *eventColumnName[noEventColumnName]=
{"c0", {"c0",
"c1", "c1",
"c2", "c2",
"c3" "c3",
"c4"
}; };
// Create events // Create events
...@@ -147,9 +171,14 @@ int main(int argc, char** argv) ...@@ -147,9 +171,14 @@ int main(int argc, char** argv)
eventName, eventName,
eventTableName, eventTableName,
eventColumnName, eventColumnName,
noEventColumnName); noEventColumnName,
merge_events);
int j= 0; // Normal values and blobs are unfortunately handled differently..
typedef union { NdbRecAttr* ra; NdbBlob* bh; } RA_BH;
int i, j, k, l;
j = 0;
while (j < 99) { while (j < 99) {
// Start "transaction" for handling events // Start "transaction" for handling events
...@@ -160,12 +189,17 @@ int main(int argc, char** argv) ...@@ -160,12 +189,17 @@ int main(int argc, char** argv)
op->mergeEvents(merge_events); op->mergeEvents(merge_events);
printf("get values\n"); printf("get values\n");
NdbRecAttr* recAttr[noEventColumnName]; RA_BH recAttr[noEventColumnName];
NdbRecAttr* recAttrPre[noEventColumnName]; RA_BH recAttrPre[noEventColumnName];
// primary keys should always be a part of the result // primary keys should always be a part of the result
for (int i = 0; i < noEventColumnName; i++) { for (i = 0; i < noEventColumnName; i++) {
recAttr[i] = op->getValue(eventColumnName[i]); if (i < 4) {
recAttrPre[i] = op->getPreValue(eventColumnName[i]); recAttr[i].ra = op->getValue(eventColumnName[i]);
recAttrPre[i].ra = op->getPreValue(eventColumnName[i]);
} else if (merge_events) {
recAttr[i].bh = op->getBlobHandle(eventColumnName[i]);
recAttrPre[i].bh = op->getPreBlobHandle(eventColumnName[i]);
}
} }
// set up the callbacks // set up the callbacks
...@@ -174,13 +208,16 @@ int main(int argc, char** argv) ...@@ -174,13 +208,16 @@ int main(int argc, char** argv)
if (op->execute()) if (op->execute())
APIERROR(op->getNdbError()); APIERROR(op->getNdbError());
int i= 0; NdbEventOperation* the_op = op;
while(i < 40) {
i= 0;
while (i < 40) {
// printf("now waiting for event...\n"); // printf("now waiting for event...\n");
int r= myNdb->pollEvents(1000); // wait for event or 1000 ms int r = myNdb->pollEvents(1000); // wait for event or 1000 ms
if (r > 0) { if (r > 0) {
// printf("got data! %d\n", r); // printf("got data! %d\n", r);
while ((op= myNdb->nextEvent())) { while ((op= myNdb->nextEvent())) {
assert(the_op == op);
i++; i++;
switch (op->getEventType()) { switch (op->getEventType()) {
case NdbDictionary::Event::TE_INSERT: case NdbDictionary::Event::TE_INSERT:
...@@ -195,40 +232,66 @@ int main(int argc, char** argv) ...@@ -195,40 +232,66 @@ int main(int argc, char** argv)
default: default:
abort(); // should not happen abort(); // should not happen
} }
printf(" gci=%d\n", op->getGCI()); printf(" gci=%d\n", (int)op->getGCI());
printf("post: "); for (k = 0; k <= 1; k++) {
for (int i = 0; i < noEventColumnName; i++) { printf(k == 0 ? "post: " : "pre : ");
if (recAttr[i]->isNULL() >= 0) { // we have a value for (l = 0; l < noEventColumnName; l++) {
if (recAttr[i]->isNULL() == 0) { // we have a non-null value if (l < 4) {
if (i < 2) NdbRecAttr* ra = k == 0 ? recAttr[l].ra : recAttrPre[l].ra;
printf("%-5u", recAttr[i]->u_32_value()); if (ra->isNULL() >= 0) { // we have a value
if (ra->isNULL() == 0) { // we have a non-null value
if (l < 2)
printf("%-5u", ra->u_32_value());
else else
printf("%-5.4s", recAttr[i]->aRef()); printf("%-5.4s", ra->aRef());
} else // we have a null value } else
printf("%-5s", "NULL"); printf("%-5s", "NULL");
} else } else
printf("%-5s", "-"); printf("%-5s", "-"); // no value
} else if (merge_events) {
int isNull;
NdbBlob* bh = k == 0 ? recAttr[l].bh : recAttrPre[l].bh;
bh->getDefined(isNull);
if (isNull >= 0) { // we have a value
if (! isNull) { // we have a non-null value
Uint64 length = 0;
bh->getLength(length);
// read into buffer
unsigned char* buf = new unsigned char [length];
memset(buf, 'X', length);
Uint32 n = length;
bh->readData(buf, n); // n is in/out
assert(n == length);
// pretty-print
bool first = true;
Uint32 i = 0;
while (i < n) {
unsigned char c = buf[i++];
Uint32 m = 1;
while (i < n && buf[i] == c)
i++, m++;
if (! first)
printf("+");
printf("%u%c", m, c);
first = false;
} }
printf("\npre : "); printf("[%u]", n);
for (int i = 0; i < noEventColumnName; i++) { delete [] buf;
if (recAttrPre[i]->isNULL() >= 0) { // we have a value } else
if (recAttrPre[i]->isNULL() == 0) { // we have a non-null value
if (i < 2)
printf("%-5u", recAttrPre[i]->u_32_value());
else
printf("%-5.4s", recAttrPre[i]->aRef());
} else // we have a null value
printf("%-5s", "NULL"); printf("%-5s", "NULL");
} else } else
printf("%-5s", "-"); printf("%-5s", "-"); // no value
}
} }
printf("\n"); printf("\n");
} }
}
} else } else
;//printf("timed out\n"); ;//printf("timed out\n");
} }
// don't want to listen to events anymore // don't want to listen to events anymore
if (myNdb->dropEventOperation(op)) APIERROR(myNdb->getNdbError()); if (myNdb->dropEventOperation(the_op)) APIERROR(myNdb->getNdbError());
the_op = 0;
j++; j++;
} }
...@@ -250,7 +313,8 @@ int myCreateEvent(Ndb* myNdb, ...@@ -250,7 +313,8 @@ int myCreateEvent(Ndb* myNdb,
const char *eventName, const char *eventName,
const char *eventTableName, const char *eventTableName,
const char **eventColumnNames, const char **eventColumnNames,
const int noEventColumnNames) const int noEventColumnNames,
bool merge_events)
{ {
NdbDictionary::Dictionary *myDict= myNdb->getDictionary(); NdbDictionary::Dictionary *myDict= myNdb->getDictionary();
if (!myDict) APIERROR(myNdb->getNdbError()); if (!myDict) APIERROR(myNdb->getNdbError());
...@@ -265,6 +329,7 @@ int myCreateEvent(Ndb* myNdb, ...@@ -265,6 +329,7 @@ int myCreateEvent(Ndb* myNdb,
// myEvent.addTableEvent(NdbDictionary::Event::TE_DELETE); // myEvent.addTableEvent(NdbDictionary::Event::TE_DELETE);
myEvent.addEventColumns(noEventColumnNames, eventColumnNames); myEvent.addEventColumns(noEventColumnNames, eventColumnNames);
myEvent.mergeEvents(merge_events);
// Add event to database // Add event to database
if (myDict->createEvent(myEvent) == 0) if (myDict->createEvent(myEvent) == 0)
......
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
#include <NdbBlob.hpp> #include <NdbBlob.hpp>
#include "NdbBlobImpl.hpp" #include "NdbBlobImpl.hpp"
#include <NdbScanOperation.hpp> #include <NdbScanOperation.hpp>
#include <NdbEventOperationImpl.hpp>
/* /*
* Reading index table directly (as a table) is faster but there are * Reading index table directly (as a table) is faster but there are
...@@ -147,6 +148,61 @@ NdbBlob::getBlobTable(NdbTableImpl& bt, const NdbTableImpl* t, const NdbColumnIm ...@@ -147,6 +148,61 @@ NdbBlob::getBlobTable(NdbTableImpl& bt, const NdbTableImpl* t, const NdbColumnIm
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
int
NdbBlob::getBlobEventName(char* bename, Ndb* anNdb, const char* eventName, const char* columnName)
{
NdbEventImpl* e = anNdb->theDictionary->m_impl.getEvent(eventName);
if (e == NULL)
return -1;
NdbColumnImpl* c = e->m_tableImpl->getColumn(columnName);
if (c == NULL)
return -1;
getBlobEventName(bename, e, c);
return 0;
}
void
NdbBlob::getBlobEventName(char* bename, const NdbEventImpl* e, const NdbColumnImpl* c)
{
// XXX events should have object id
snprintf(bename, MAX_TAB_NAME_SIZE, "NDB$BLOBEVENT_%s_%d", e->m_name.c_str(), (int)c->m_column_no);
}
void
NdbBlob::getBlobEvent(NdbEventImpl& be, const NdbEventImpl* e, const NdbColumnImpl* c)
{
DBUG_ENTER("NdbBlob::getBlobEvent");
// blob table
assert(c->m_blobTable != NULL);
const NdbTableImpl& bt = *c->m_blobTable;
// blob event name
char bename[NdbBlobImpl::BlobTableNameSize];
getBlobEventName(bename, e, c);
be.setName(bename);
be.setTable(bt);
// simple assigments
be.mi_type = e->mi_type;
be.m_dur = e->m_dur;
be.m_mergeEvents = e->m_mergeEvents;
// report unchanged data
// not really needed now since UPD is DEL o INS and we subscribe to all
be.setReport(NdbDictionary::Event::ER_ALL);
// columns PK - DIST - PART - DATA
{ const NdbColumnImpl* bc = bt.getColumn((Uint32)0);
be.addColumn(*bc);
}
{ const NdbColumnImpl* bc = bt.getColumn((Uint32)1);
be.addColumn(*bc);
}
{ const NdbColumnImpl* bc = bt.getColumn((Uint32)2);
be.addColumn(*bc);
}
{ const NdbColumnImpl* bc = bt.getColumn((Uint32)3);
be.addColumn(*bc);
}
DBUG_VOID_RETURN;
}
// initialization // initialization
NdbBlob::NdbBlob(Ndb*) NdbBlob::NdbBlob(Ndb*)
...@@ -158,9 +214,16 @@ void ...@@ -158,9 +214,16 @@ void
NdbBlob::init() NdbBlob::init()
{ {
theState = Idle; theState = Idle;
theEventBlobVersion = -1;
theNdb = NULL; theNdb = NULL;
theNdbCon = NULL; theNdbCon = NULL;
theNdbOp = NULL; theNdbOp = NULL;
theEventOp = NULL;
theBlobEventOp = NULL;
theBlobEventPkRecAttr = NULL;
theBlobEventDistRecAttr = NULL;
theBlobEventPartRecAttr = NULL;
theBlobEventDataRecAttr = NULL;
theTable = NULL; theTable = NULL;
theAccessTable = NULL; theAccessTable = NULL;
theBlobTable = NULL; theBlobTable = NULL;
...@@ -439,7 +502,7 @@ NdbBlob::getHeadFromRecAttr() ...@@ -439,7 +502,7 @@ NdbBlob::getHeadFromRecAttr()
DBUG_ENTER("NdbBlob::getHeadFromRecAttr"); DBUG_ENTER("NdbBlob::getHeadFromRecAttr");
assert(theHeadInlineRecAttr != NULL); assert(theHeadInlineRecAttr != NULL);
theNullFlag = theHeadInlineRecAttr->isNULL(); theNullFlag = theHeadInlineRecAttr->isNULL();
assert(theNullFlag != -1); assert(theEventBlobVersion >= 0 || theNullFlag != -1);
theLength = ! theNullFlag ? theHead->length : 0; theLength = ! theNullFlag ? theHead->length : 0;
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -543,6 +606,18 @@ NdbBlob::setActiveHook(ActiveHook activeHook, void* arg) ...@@ -543,6 +606,18 @@ NdbBlob::setActiveHook(ActiveHook activeHook, void* arg)
// misc operations // misc operations
int
NdbBlob::getDefined(int& isNull)
{
DBUG_ENTER("NdbBlob::getDefined");
if (theState == Prepared && theSetFlag) {
isNull = (theSetBuf == NULL);
DBUG_RETURN(0);
}
isNull = theNullFlag;
DBUG_RETURN(0);
}
int int
NdbBlob::getNull(bool& isNull) NdbBlob::getNull(bool& isNull)
{ {
...@@ -887,6 +962,18 @@ NdbBlob::readParts(char* buf, Uint32 part, Uint32 count) ...@@ -887,6 +962,18 @@ NdbBlob::readParts(char* buf, Uint32 part, Uint32 count)
{ {
DBUG_ENTER("NdbBlob::readParts"); DBUG_ENTER("NdbBlob::readParts");
DBUG_PRINT("info", ("part=%u count=%u", part, count)); DBUG_PRINT("info", ("part=%u count=%u", part, count));
int ret;
if (theEventBlobVersion == -1)
ret = readTableParts(buf, part, count);
else
ret = readEventParts(buf, part, count);
DBUG_RETURN(ret);
}
int
NdbBlob::readTableParts(char* buf, Uint32 part, Uint32 count)
{
DBUG_ENTER("NdbBlob::readTableParts");
Uint32 n = 0; Uint32 n = 0;
while (n < count) { while (n < count) {
NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTable); NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTable);
...@@ -906,6 +993,18 @@ NdbBlob::readParts(char* buf, Uint32 part, Uint32 count) ...@@ -906,6 +993,18 @@ NdbBlob::readParts(char* buf, Uint32 part, Uint32 count)
DBUG_RETURN(0); DBUG_RETURN(0);
} }
int
NdbBlob::readEventParts(char* buf, Uint32 part, Uint32 count)
{
DBUG_ENTER("NdbBlob::readEventParts");
int ret = theEventOp->readBlobParts(buf, this, part, count);
if (ret != 0) {
setErrorCode(theEventOp);
DBUG_RETURN(-1);
}
DBUG_RETURN(0);
}
int int
NdbBlob::insertParts(const char* buf, Uint32 part, Uint32 count) NdbBlob::insertParts(const char* buf, Uint32 part, Uint32 count)
{ {
...@@ -1094,48 +1193,12 @@ NdbBlob::atPrepare(NdbTransaction* aCon, NdbOperation* anOp, const NdbColumnImpl ...@@ -1094,48 +1193,12 @@ NdbBlob::atPrepare(NdbTransaction* aCon, NdbOperation* anOp, const NdbColumnImpl
theTable = anOp->m_currentTable; theTable = anOp->m_currentTable;
theAccessTable = anOp->m_accessTable; theAccessTable = anOp->m_accessTable;
theColumn = aColumn; theColumn = aColumn;
NdbDictionary::Column::Type partType = NdbDictionary::Column::Undefined; // prepare blob column and table
switch (theColumn->getType()) { if (prepareColumn() == -1)
case NdbDictionary::Column::Blob:
partType = NdbDictionary::Column::Binary;
theFillChar = 0x0;
break;
case NdbDictionary::Column::Text:
partType = NdbDictionary::Column::Char;
theFillChar = 0x20;
break;
default:
setErrorCode(NdbBlobImpl::ErrUsage);
DBUG_RETURN(-1);
}
// sizes
theInlineSize = theColumn->getInlineSize();
thePartSize = theColumn->getPartSize();
theStripeSize = theColumn->getStripeSize();
// sanity check
assert((NDB_BLOB_HEAD_SIZE << 2) == sizeof(Head));
assert(theColumn->m_attrSize * theColumn->m_arraySize == sizeof(Head) + theInlineSize);
if (thePartSize > 0) {
const NdbDictionary::Table* bt = NULL;
const NdbDictionary::Column* bc = NULL;
if (theStripeSize == 0 ||
(bt = theColumn->getBlobTable()) == NULL ||
(bc = bt->getColumn("DATA")) == NULL ||
bc->getType() != partType ||
bc->getLength() != (int)thePartSize) {
setErrorCode(NdbBlobImpl::ErrTable);
DBUG_RETURN(-1); DBUG_RETURN(-1);
} // extra buffers
theBlobTable = &NdbTableImpl::getImpl(*bt);
}
// buffers
theKeyBuf.alloc(theTable->m_keyLenInWords << 2);
theAccessKeyBuf.alloc(theAccessTable->m_keyLenInWords << 2); theAccessKeyBuf.alloc(theAccessTable->m_keyLenInWords << 2);
theHeadInlineBuf.alloc(sizeof(Head) + theInlineSize);
theHeadInlineCopyBuf.alloc(sizeof(Head) + theInlineSize); theHeadInlineCopyBuf.alloc(sizeof(Head) + theInlineSize);
thePartBuf.alloc(thePartSize);
theHead = (Head*)theHeadInlineBuf.data;
theInlineData = theHeadInlineBuf.data + sizeof(Head);
// handle different operation types // handle different operation types
bool supportedOp = false; bool supportedOp = false;
if (isKeyOp()) { if (isKeyOp()) {
...@@ -1189,6 +1252,99 @@ NdbBlob::atPrepare(NdbTransaction* aCon, NdbOperation* anOp, const NdbColumnImpl ...@@ -1189,6 +1252,99 @@ NdbBlob::atPrepare(NdbTransaction* aCon, NdbOperation* anOp, const NdbColumnImpl
DBUG_RETURN(0); DBUG_RETURN(0);
} }
int
NdbBlob::atPrepare(NdbEventOperationImpl* anOp, NdbEventOperationImpl* aBlobOp, const NdbColumnImpl* aColumn, int version)
{
DBUG_ENTER("NdbBlob::atPrepare [event]");
DBUG_PRINT("info", ("this=%p op=%p", this, anOp));
assert(theState == Idle);
assert(version == 0 || version == 1);
theEventBlobVersion = version;
// ndb api stuff
theNdb = anOp->m_ndb;
theEventOp = anOp;
theBlobEventOp = aBlobOp;
theTable = anOp->m_eventImpl->m_tableImpl;
theColumn = aColumn;
// prepare blob column and table
if (prepareColumn() == -1)
DBUG_RETURN(-1);
// extra buffers
theBlobEventDataBuf.alloc(thePartSize);
// prepare receive of head+inline
theHeadInlineRecAttr = theEventOp->getValue(aColumn, theHeadInlineBuf.data, version);
if (theHeadInlineRecAttr == NULL) {
setErrorCode(theEventOp);
DBUG_RETURN(-1);
}
// prepare receive of blob part
if ((theBlobEventPkRecAttr =
theBlobEventOp->getValue(theBlobTable->getColumn((Uint32)0),
theKeyBuf.data, version)) == NULL ||
(theBlobEventDistRecAttr =
theBlobEventOp->getValue(theBlobTable->getColumn((Uint32)1),
(char*)0, version)) == NULL ||
(theBlobEventPartRecAttr =
theBlobEventOp->getValue(theBlobTable->getColumn((Uint32)2),
(char*)&thePartNumber, version)) == NULL ||
(theBlobEventDataRecAttr =
theBlobEventOp->getValue(theBlobTable->getColumn((Uint32)3),
theBlobEventDataBuf.data, version)) == NULL) {
setErrorCode(theBlobEventOp);
DBUG_RETURN(-1);
}
setState(Prepared);
DBUG_RETURN(0);
}
int
NdbBlob::prepareColumn()
{
DBUG_ENTER("prepareColumn");
NdbDictionary::Column::Type partType = NdbDictionary::Column::Undefined;
switch (theColumn->getType()) {
case NdbDictionary::Column::Blob:
partType = NdbDictionary::Column::Binary;
theFillChar = 0x0;
break;
case NdbDictionary::Column::Text:
partType = NdbDictionary::Column::Char;
theFillChar = 0x20;
break;
default:
setErrorCode(NdbBlobImpl::ErrUsage);
DBUG_RETURN(-1);
}
// sizes
theInlineSize = theColumn->getInlineSize();
thePartSize = theColumn->getPartSize();
theStripeSize = theColumn->getStripeSize();
// sanity check
assert((NDB_BLOB_HEAD_SIZE << 2) == sizeof(Head));
assert(theColumn->m_attrSize * theColumn->m_arraySize == sizeof(Head) + theInlineSize);
if (thePartSize > 0) {
const NdbDictionary::Table* bt = NULL;
const NdbDictionary::Column* bc = NULL;
if (theStripeSize == 0 ||
(bt = theColumn->getBlobTable()) == NULL ||
(bc = bt->getColumn("DATA")) == NULL ||
bc->getType() != partType ||
bc->getLength() != (int)thePartSize) {
setErrorCode(NdbBlobImpl::ErrTable);
DBUG_RETURN(-1);
}
// blob table
theBlobTable = &NdbTableImpl::getImpl(*bt);
}
// these buffers are always used
theKeyBuf.alloc(theTable->m_keyLenInWords << 2);
theHeadInlineBuf.alloc(sizeof(Head) + theInlineSize);
theHead = (Head*)theHeadInlineBuf.data;
theInlineData = theHeadInlineBuf.data + sizeof(Head);
thePartBuf.alloc(thePartSize);
DBUG_RETURN(0);
}
/* /*
* Before execute of prepared operation. May add new operations before * Before execute of prepared operation. May add new operations before
* this one. May ask that this operation and all before it (a "batch") * this one. May ask that this operation and all before it (a "batch")
...@@ -1537,6 +1693,26 @@ NdbBlob::atNextResult() ...@@ -1537,6 +1693,26 @@ NdbBlob::atNextResult()
DBUG_RETURN(0); DBUG_RETURN(0);
} }
/*
* After next event on main table.
*/
int
NdbBlob::atNextEvent()
{
DBUG_ENTER("NdbBlob::atNextEvent");
DBUG_PRINT("info", ("this=%p op=%p blob op=%p version=%d", this, theEventOp, theBlobEventOp, theEventBlobVersion));
if (theState == Invalid)
DBUG_RETURN(-1);
assert(theEventBlobVersion >= 0);
getHeadFromRecAttr();
if (theNullFlag == -1) // value not defined
DBUG_RETURN(0);
if (setPos(0) == -1)
DBUG_RETURN(-1);
setState(Active);
DBUG_RETURN(0);
}
// misc // misc
const NdbDictionary::Column* const NdbDictionary::Column*
...@@ -1589,6 +1765,17 @@ NdbBlob::setErrorCode(NdbTransaction* aCon, bool invalidFlag) ...@@ -1589,6 +1765,17 @@ NdbBlob::setErrorCode(NdbTransaction* aCon, bool invalidFlag)
setErrorCode(code, invalidFlag); setErrorCode(code, invalidFlag);
} }
void
NdbBlob::setErrorCode(NdbEventOperationImpl* anOp, bool invalidFlag)
{
int code = 0;
if ((code = anOp->m_error.code) != 0)
;
else
code = NdbBlobImpl::ErrUnknown;
setErrorCode(code, invalidFlag);
}
// info about all blobs in this operation // info about all blobs in this operation
NdbBlob* NdbBlob*
......
...@@ -901,6 +901,11 @@ int NdbDictionary::Event::getNoOfEventColumns() const ...@@ -901,6 +901,11 @@ int NdbDictionary::Event::getNoOfEventColumns() const
return m_impl.getNoOfEventColumns(); return m_impl.getNoOfEventColumns();
} }
void NdbDictionary::Event::mergeEvents(bool flag)
{
m_impl.m_mergeEvents = flag;
}
NdbDictionary::Object::Status NdbDictionary::Object::Status
NdbDictionary::Event::getObjectStatus() const NdbDictionary::Event::getObjectStatus() const
{ {
......
...@@ -1072,6 +1072,7 @@ void NdbEventImpl::init() ...@@ -1072,6 +1072,7 @@ void NdbEventImpl::init()
m_tableId= RNIL; m_tableId= RNIL;
mi_type= 0; mi_type= 0;
m_dur= NdbDictionary::Event::ED_UNDEFINED; m_dur= NdbDictionary::Event::ED_UNDEFINED;
m_mergeEvents = false;
m_tableImpl= NULL; m_tableImpl= NULL;
m_rep= NdbDictionary::Event::ER_UPDATED; m_rep= NdbDictionary::Event::ER_UPDATED;
} }
...@@ -2036,7 +2037,7 @@ int ...@@ -2036,7 +2037,7 @@ int
NdbDictionaryImpl::addBlobTables(NdbTableImpl &t) NdbDictionaryImpl::addBlobTables(NdbTableImpl &t)
{ {
unsigned n= t.m_noOfBlobs; unsigned n= t.m_noOfBlobs;
DBUG_ENTER("NdbDictioanryImpl::addBlobTables"); DBUG_ENTER("NdbDictionaryImpl::addBlobTables");
// optimized for blob column being the last one // optimized for blob column being the last one
// and not looking for more than one if not neccessary // and not looking for more than one if not neccessary
for (unsigned i = t.m_columns.size(); i > 0 && n > 0;) { for (unsigned i = t.m_columns.size(); i > 0 && n > 0;) {
...@@ -3151,7 +3152,37 @@ NdbDictionaryImpl::createEvent(NdbEventImpl & evnt) ...@@ -3151,7 +3152,37 @@ NdbDictionaryImpl::createEvent(NdbEventImpl & evnt)
#endif #endif
// NdbDictInterface m_receiver; // NdbDictInterface m_receiver;
DBUG_RETURN(m_receiver.createEvent(m_ndb, evnt, 0 /* getFlag unset */)); if (m_receiver.createEvent(m_ndb, evnt, 0 /* getFlag unset */) != 0)
DBUG_RETURN(-1);
// Create blob events
if (evnt.m_mergeEvents && createBlobEvents(evnt) != 0) {
int save_code = m_error.code;
(void)dropEvent(evnt.m_name.c_str());
m_error.code = save_code;
DBUG_RETURN(-1);
}
DBUG_RETURN(0);
}
int
NdbDictionaryImpl::createBlobEvents(NdbEventImpl& evnt)
{
DBUG_ENTER("NdbDictionaryImpl::createBlobEvents");
NdbTableImpl& t = *evnt.m_tableImpl;
Uint32 n = t.m_noOfBlobs;
Uint32 i;
for (i = 0; i < evnt.m_columns.size() && n > 0; i++) {
NdbColumnImpl & c = *evnt.m_columns[i];
if (! c.getBlobType() || c.getPartSize() == 0)
continue;
n--;
NdbEventImpl blob_evnt;
NdbBlob::getBlobEvent(blob_evnt, &evnt, &c);
if (createEvent(blob_evnt) != 0)
DBUG_RETURN(-1);
}
DBUG_RETURN(0);
} }
int int
...@@ -3400,6 +3431,7 @@ NdbDictionaryImpl::getEvent(const char * eventName) ...@@ -3400,6 +3431,7 @@ NdbDictionaryImpl::getEvent(const char * eventName)
if ( attributeList_sz > table.getNoOfColumns() ) if ( attributeList_sz > table.getNoOfColumns() )
{ {
m_error.code = 241;
DBUG_PRINT("error",("Invalid version, too many columns")); DBUG_PRINT("error",("Invalid version, too many columns"));
delete ev; delete ev;
DBUG_RETURN(NULL); DBUG_RETURN(NULL);
...@@ -3409,6 +3441,7 @@ NdbDictionaryImpl::getEvent(const char * eventName) ...@@ -3409,6 +3441,7 @@ NdbDictionaryImpl::getEvent(const char * eventName)
for(unsigned id= 0; ev->m_columns.size() < attributeList_sz; id++) { for(unsigned id= 0; ev->m_columns.size() < attributeList_sz; id++) {
if ( id >= table.getNoOfColumns()) if ( id >= table.getNoOfColumns())
{ {
m_error.code = 241;
DBUG_PRINT("error",("Invalid version, column %d out of range", id)); DBUG_PRINT("error",("Invalid version, column %d out of range", id));
delete ev; delete ev;
DBUG_RETURN(NULL); DBUG_RETURN(NULL);
...@@ -3566,13 +3599,54 @@ NdbDictInterface::execSUB_START_REF(NdbApiSignal * signal, ...@@ -3566,13 +3599,54 @@ NdbDictInterface::execSUB_START_REF(NdbApiSignal * signal,
int int
NdbDictionaryImpl::dropEvent(const char * eventName) NdbDictionaryImpl::dropEvent(const char * eventName)
{ {
NdbEventImpl *ev= new NdbEventImpl(); DBUG_ENTER("NdbDictionaryImpl::dropEvent");
ev->setName(eventName); DBUG_PRINT("info", ("name=%s", eventName));
int ret= m_receiver.dropEvent(*ev);
delete ev;
// printf("__________________RET %u\n", ret); NdbEventImpl *evnt = getEvent(eventName); // allocated
return ret; if (evnt == NULL) {
if (m_error.code != 723 && // no such table
m_error.code != 241) // invalid table
DBUG_RETURN(-1);
DBUG_PRINT("info", ("no table, drop by name alone"));
evnt = new NdbEventImpl();
evnt->setName(eventName);
}
int ret = dropEvent(*evnt);
delete evnt;
DBUG_RETURN(ret);
}
int
NdbDictionaryImpl::dropEvent(const NdbEventImpl& evnt)
{
if (dropBlobEvents(evnt) != 0)
return -1;
if (m_receiver.dropEvent(evnt) != 0)
return -1;
return 0;
}
int
NdbDictionaryImpl::dropBlobEvents(const NdbEventImpl& evnt)
{
DBUG_ENTER("NdbDictionaryImpl::dropBlobEvents");
if (evnt.m_tableImpl != 0) {
const NdbTableImpl& t = *evnt.m_tableImpl;
Uint32 n = t.m_noOfBlobs;
Uint32 i;
for (i = 0; i < evnt.m_columns.size() && n > 0; i++) {
const NdbColumnImpl& c = *evnt.m_columns[i];
if (! c.getBlobType() || c.getPartSize() == 0)
continue;
n--;
char bename[MAX_TAB_NAME_SIZE];
NdbBlob::getBlobEventName(bename, &evnt, &c);
(void)dropEvent(bename);
}
} else {
// could loop over MAX_ATTRIBUTES_IN_TABLE ...
}
DBUG_RETURN(0);
} }
int int
......
...@@ -277,7 +277,6 @@ public: ...@@ -277,7 +277,6 @@ public:
NdbDictionary::Event::EventDurability getDurability() const; NdbDictionary::Event::EventDurability getDurability() const;
void setReport(NdbDictionary::Event::EventReport r); void setReport(NdbDictionary::Event::EventReport r);
NdbDictionary::Event::EventReport getReport() const; NdbDictionary::Event::EventReport getReport() const;
void addEventColumn(const NdbColumnImpl &c);
int getNoOfEventColumns() const; int getNoOfEventColumns() const;
void print() { void print() {
...@@ -295,6 +294,7 @@ public: ...@@ -295,6 +294,7 @@ public:
Uint32 mi_type; Uint32 mi_type;
NdbDictionary::Event::EventDurability m_dur; NdbDictionary::Event::EventDurability m_dur;
NdbDictionary::Event::EventReport m_rep; NdbDictionary::Event::EventReport m_rep;
bool m_mergeEvents;
NdbTableImpl *m_tableImpl; NdbTableImpl *m_tableImpl;
BaseString m_tableName; BaseString m_tableName;
...@@ -547,7 +547,10 @@ public: ...@@ -547,7 +547,10 @@ public:
NdbTableImpl * table); NdbTableImpl * table);
int createEvent(NdbEventImpl &); int createEvent(NdbEventImpl &);
int createBlobEvents(NdbEventImpl &);
int dropEvent(const char * eventName); int dropEvent(const char * eventName);
int dropEvent(const NdbEventImpl &);
int dropBlobEvents(const NdbEventImpl &);
int executeSubscribeEvent(NdbEventOperationImpl &); int executeSubscribeEvent(NdbEventOperationImpl &);
int stopSubscribeEvent(NdbEventOperationImpl &); int stopSubscribeEvent(NdbEventOperationImpl &);
......
...@@ -55,6 +55,18 @@ NdbEventOperation::getPreValue(const char *colName, char *aValue) ...@@ -55,6 +55,18 @@ NdbEventOperation::getPreValue(const char *colName, char *aValue)
return m_impl.getValue(colName, aValue, 1); return m_impl.getValue(colName, aValue, 1);
} }
NdbBlob *
NdbEventOperation::getBlobHandle(const char *colName)
{
return m_impl.getBlobHandle(colName, 0);
}
NdbBlob *
NdbEventOperation::getPreBlobHandle(const char *colName)
{
return m_impl.getBlobHandle(colName, 1);
}
int int
NdbEventOperation::execute() NdbEventOperation::execute()
{ {
......
...@@ -38,6 +38,7 @@ ...@@ -38,6 +38,7 @@
#include "DictCache.hpp" #include "DictCache.hpp"
#include <portlib/NdbMem.h> #include <portlib/NdbMem.h>
#include <NdbRecAttr.hpp> #include <NdbRecAttr.hpp>
#include <NdbBlob.hpp>
#include <NdbEventOperation.hpp> #include <NdbEventOperation.hpp>
#include "NdbEventOperationImpl.hpp" #include "NdbEventOperationImpl.hpp"
...@@ -48,6 +49,20 @@ static Gci_container g_empty_gci_container; ...@@ -48,6 +49,20 @@ static Gci_container g_empty_gci_container;
static const Uint32 ACTIVE_GCI_DIRECTORY_SIZE = 4; static const Uint32 ACTIVE_GCI_DIRECTORY_SIZE = 4;
static const Uint32 ACTIVE_GCI_MASK = ACTIVE_GCI_DIRECTORY_SIZE - 1; static const Uint32 ACTIVE_GCI_MASK = ACTIVE_GCI_DIRECTORY_SIZE - 1;
#ifdef VM_TRACE
static void
print_std(const SubTableData * sdata, LinearSectionPtr ptr[3])
{
printf("addr=%p gci=%d op=%d\n", (void*)sdata, sdata->gci, sdata->operation);
for (int i = 0; i <= 2; i++) {
printf("sec=%d addr=%p sz=%d\n", i, (void*)ptr[i].p, ptr[i].sz);
for (int j = 0; j < ptr[i].sz; j++)
printf("%08x ", ptr[i].p[j]);
printf("\n");
}
}
#endif
/* /*
* Class NdbEventOperationImpl * Class NdbEventOperationImpl
* *
...@@ -60,7 +75,7 @@ static const Uint32 ACTIVE_GCI_MASK = ACTIVE_GCI_DIRECTORY_SIZE - 1; ...@@ -60,7 +75,7 @@ static const Uint32 ACTIVE_GCI_MASK = ACTIVE_GCI_DIRECTORY_SIZE - 1;
#define DBUG_RETURN_EVENT(A) DBUG_RETURN(A) #define DBUG_RETURN_EVENT(A) DBUG_RETURN(A)
#define DBUG_VOID_RETURN_EVENT DBUG_VOID_RETURN #define DBUG_VOID_RETURN_EVENT DBUG_VOID_RETURN
#define DBUG_PRINT_EVENT(A,B) DBUG_PRINT(A,B) #define DBUG_PRINT_EVENT(A,B) DBUG_PRINT(A,B)
#define DBUG_DUMP_EVENT(A,B,C) DBUG_SUMP(A,B,C) #define DBUG_DUMP_EVENT(A,B,C) DBUG_DUMP(A,B,C)
#else #else
#define DBUG_ENTER_EVENT(A) #define DBUG_ENTER_EVENT(A)
#define DBUG_RETURN_EVENT(A) return(A) #define DBUG_RETURN_EVENT(A) return(A)
...@@ -92,6 +107,11 @@ NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &N, ...@@ -92,6 +107,11 @@ NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &N,
theCurrentDataAttrs[0] = NULL; theCurrentDataAttrs[0] = NULL;
theFirstDataAttrs[1] = NULL; theFirstDataAttrs[1] = NULL;
theCurrentDataAttrs[1] = NULL; theCurrentDataAttrs[1] = NULL;
theBlobList = NULL;
theBlobOpList = NULL;
theMainOp = NULL;
m_data_item= NULL; m_data_item= NULL;
m_eventImpl = NULL; m_eventImpl = NULL;
...@@ -117,7 +137,11 @@ NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &N, ...@@ -117,7 +137,11 @@ NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &N,
m_state= EO_CREATED; m_state= EO_CREATED;
#ifdef ndb_event_stores_merge_events_flag
m_mergeEvents = m_eventImpl->m_mergeEvents;
#else
m_mergeEvents = false; m_mergeEvents = false;
#endif
m_has_error= 0; m_has_error= 0;
...@@ -254,10 +278,183 @@ NdbEventOperationImpl::getValue(const NdbColumnImpl *tAttrInfo, char *aValue, in ...@@ -254,10 +278,183 @@ NdbEventOperationImpl::getValue(const NdbColumnImpl *tAttrInfo, char *aValue, in
DBUG_RETURN(tAttr); DBUG_RETURN(tAttr);
} }
NdbBlob*
NdbEventOperationImpl::getBlobHandle(const char *colName, int n)
{
DBUG_ENTER("NdbEventOperationImpl::getBlobHandle (colName)");
assert(m_mergeEvents);
if (m_state != EO_CREATED) {
ndbout_c("NdbEventOperationImpl::getBlobHandle may only be called between "
"instantiation and execute()");
DBUG_RETURN(NULL);
}
NdbColumnImpl *tAttrInfo = m_eventImpl->m_tableImpl->getColumn(colName);
if (tAttrInfo == NULL) {
ndbout_c("NdbEventOperationImpl::getBlobHandle attribute %s not found",colName);
DBUG_RETURN(NULL);
}
NdbBlob* bh = getBlobHandle(tAttrInfo, n);
DBUG_RETURN(bh);
}
NdbBlob*
NdbEventOperationImpl::getBlobHandle(const NdbColumnImpl *tAttrInfo, int n)
{
DBUG_ENTER("NdbEventOperationImpl::getBlobHandle");
DBUG_PRINT("info", ("attr=%s post/pre=%d", tAttrInfo->m_name.c_str(), n));
// as in NdbOperation, create only one instance
NdbBlob* tBlob = theBlobList;
NdbBlob* tLastBlob = NULL;
while (tBlob != NULL) {
if (tBlob->theColumn == tAttrInfo && tBlob->theEventBlobVersion == n)
DBUG_RETURN(tBlob);
tLastBlob = tBlob;
tBlob = tBlob->theNext;
}
// blob event name
char bename[MAX_TAB_NAME_SIZE];
NdbBlob::getBlobEventName(bename, m_eventImpl, tAttrInfo);
// find blob event op if any (it serves both post and pre handles)
assert(tAttrInfo->m_blobTable != NULL);
NdbEventOperationImpl* tBlobOp = theBlobOpList;
NdbEventOperationImpl* tLastBlopOp = NULL;
while (tBlobOp != NULL) {
if (strcmp(tBlobOp->m_eventImpl->m_name.c_str(), bename) == 0) {
assert(tBlobOp->m_eventImpl->m_tableImpl == tAttrInfo->m_blobTable);
break;
}
tLastBlopOp = tBlobOp;
tBlobOp = tBlobOp->theNextBlobOp;
}
DBUG_PRINT("info", ("%s op %s", tBlobOp ? " reuse" : " create", bename));
// create blob event op if not found
if (tBlobOp == NULL) {
NdbEventOperation* tmp = m_ndb->createEventOperation(bename);
if (tmp == NULL)
DBUG_RETURN(NULL);
tBlobOp = &tmp->m_impl;
// pointer to main table op
tBlobOp->theMainOp = this;
tBlobOp->m_mergeEvents = m_mergeEvents;
// add to list end
if (tLastBlopOp == NULL)
theBlobOpList = tBlobOp;
else
tLastBlopOp->theNextBlobOp = tBlobOp;
tBlobOp->theNextBlobOp = NULL;
}
tBlob = m_ndb->getNdbBlob();
if (tBlob == NULL)
DBUG_RETURN(NULL);
// calls getValue on inline and blob part
if (tBlob->atPrepare(this, tBlobOp, tAttrInfo, n) == -1) {
m_ndb->releaseNdbBlob(tBlob);
DBUG_RETURN(NULL);
}
// add to list end
if (tLastBlob == NULL)
theBlobList = tBlob;
else
tLastBlob->theNext = tBlob;
tBlob->theNext = NULL;
DBUG_RETURN(tBlob);
}
int
NdbEventOperationImpl::readBlobParts(char* buf, NdbBlob* blob,
Uint32 part, Uint32 count)
{
DBUG_ENTER_EVENT("NdbEventOperationImpl::readBlobParts");
DBUG_PRINT_EVENT("info", ("part=%u count=%u post/pre=%d",
part, count, blob->theEventBlobVersion));
NdbEventOperationImpl* blob_op = blob->theBlobEventOp;
EventBufData* main_data = m_data_item;
DBUG_PRINT_EVENT("info", ("main_data=%p", main_data));
assert(main_data != NULL);
// search for blob parts list head
EventBufData* head;
assert(m_data_item != NULL);
head = m_data_item->m_next_blob;
while (head != NULL)
{
if (head->m_event_op == blob_op)
{
DBUG_PRINT_EVENT("info", ("found blob parts head %p", head));
break;
}
head = head->m_next_blob;
}
Uint32 nparts = 0;
EventBufData* data = head;
// XXX optimize using part no ordering
while (data != NULL)
{
/*
* Hack part no directly out of buffer since it is not returned
* in pre data (PK buglet). For part data use receive_event().
* This means extra copy.
*/
blob_op->m_data_item = data;
int r = blob_op->receive_event();
assert(r > 0);
Uint32 no = data->get_blob_part_no();
Uint32 sz = blob->thePartSize;
const char* src = blob->theBlobEventDataBuf.data;
DBUG_PRINT_EVENT("info", ("part_data=%p part no=%u part sz=%u", data, no, sz));
if (part <= no && no < part + count)
{
DBUG_PRINT_EVENT("info", ("part within read range"));
memcpy(buf + (no - part) * sz, src, sz);
nparts++;
}
else
{
DBUG_PRINT_EVENT("info", ("part outside read range"));
}
data = data->m_next;
}
assert(nparts == count);
DBUG_RETURN_EVENT(0);
}
int int
NdbEventOperationImpl::execute() NdbEventOperationImpl::execute()
{ {
DBUG_ENTER("NdbEventOperationImpl::execute"); DBUG_ENTER("NdbEventOperationImpl::execute");
m_ndb->theEventBuffer->add_drop_lock();
int r = execute_nolock();
m_ndb->theEventBuffer->add_drop_unlock();
DBUG_RETURN(r);
}
int
NdbEventOperationImpl::execute_nolock()
{
DBUG_ENTER("NdbEventOperationImpl::execute_nolock");
DBUG_PRINT("info", ("this=%p type=%s", this, !theMainOp ? "main" : "blob"));
NdbDictionary::Dictionary *myDict = m_ndb->getDictionary(); NdbDictionary::Dictionary *myDict = m_ndb->getDictionary();
if (!myDict) { if (!myDict) {
m_error.code= m_ndb->getNdbError().code; m_error.code= m_ndb->getNdbError().code;
...@@ -266,17 +463,25 @@ NdbEventOperationImpl::execute() ...@@ -266,17 +463,25 @@ NdbEventOperationImpl::execute()
if (theFirstPkAttrs[0] == NULL && if (theFirstPkAttrs[0] == NULL &&
theFirstDataAttrs[0] == NULL) { // defaults to get all theFirstDataAttrs[0] == NULL) { // defaults to get all
} }
m_ndb->theEventBuffer->add_drop_lock();
m_magic_number= NDB_EVENT_OP_MAGIC_NUMBER; m_magic_number= NDB_EVENT_OP_MAGIC_NUMBER;
m_state= EO_EXECUTING; m_state= EO_EXECUTING;
mi_type= m_eventImpl->mi_type; mi_type= m_eventImpl->mi_type;
m_ndb->theEventBuffer->add_op(); m_ndb->theEventBuffer->add_op();
int r= NdbDictionaryImpl::getImpl(*myDict).executeSubscribeEvent(*this); int r= NdbDictionaryImpl::getImpl(*myDict).executeSubscribeEvent(*this);
if (r == 0) { if (r == 0) {
m_ndb->theEventBuffer->add_drop_unlock(); if (theMainOp == NULL) {
DBUG_PRINT("info", ("execute blob ops"));
NdbEventOperationImpl* blob_op = theBlobOpList;
while (blob_op != NULL) {
r = blob_op->execute_nolock();
if (r != 0)
break;
blob_op = blob_op->theNextBlobOp;
}
}
if (r == 0)
DBUG_RETURN(0); DBUG_RETURN(0);
} }
//Error //Error
...@@ -285,7 +490,6 @@ NdbEventOperationImpl::execute() ...@@ -285,7 +490,6 @@ NdbEventOperationImpl::execute()
m_magic_number= 0; m_magic_number= 0;
m_error.code= myDict->getNdbError().code; m_error.code= myDict->getNdbError().code;
m_ndb->theEventBuffer->remove_op(); m_ndb->theEventBuffer->remove_op();
m_ndb->theEventBuffer->add_drop_unlock();
DBUG_RETURN(r); DBUG_RETURN(r);
} }
...@@ -709,21 +913,6 @@ NdbEventBuffer::pollEvents(int aMillisecondNumber, Uint64 *latestGCI) ...@@ -709,21 +913,6 @@ NdbEventBuffer::pollEvents(int aMillisecondNumber, Uint64 *latestGCI)
return ret; return ret;
} }
#ifdef VM_TRACE
static void
print_std(const char* tag, const SubTableData * sdata, LinearSectionPtr ptr[3])
{
printf("%s\n", tag);
printf("addr=%p gci=%d op=%d\n", (void*)sdata, sdata->gci, sdata->operation);
for (int i = 0; i <= 2; i++) {
printf("sec=%d addr=%p sz=%d\n", i, (void*)ptr[i].p, ptr[i].sz);
for (int j = 0; j < ptr[i].sz; j++)
printf("%08x ", ptr[i].p[j]);
printf("\n");
}
}
#endif
NdbEventOperation * NdbEventOperation *
NdbEventBuffer::nextEvent() NdbEventBuffer::nextEvent()
{ {
...@@ -751,6 +940,7 @@ NdbEventBuffer::nextEvent() ...@@ -751,6 +940,7 @@ NdbEventBuffer::nextEvent()
while ((data= m_available_data.m_head)) while ((data= m_available_data.m_head))
{ {
NdbEventOperationImpl *op= data->m_event_op; NdbEventOperationImpl *op= data->m_event_op;
DBUG_PRINT_EVENT("info", ("available data=%p op=%p", data, op));
// set NdbEventOperation data // set NdbEventOperation data
op->m_data_item= data; op->m_data_item= data;
...@@ -767,7 +957,10 @@ NdbEventBuffer::nextEvent() ...@@ -767,7 +957,10 @@ NdbEventBuffer::nextEvent()
// NUL event is not returned // NUL event is not returned
if (data->sdata->operation == NdbDictionary::Event::_TE_NUL) if (data->sdata->operation == NdbDictionary::Event::_TE_NUL)
{
DBUG_PRINT_EVENT("info", ("skip _TE_NUL"));
continue; continue;
}
int r= op->receive_event(); int r= op->receive_event();
if (r > 0) if (r > 0)
...@@ -777,6 +970,12 @@ NdbEventBuffer::nextEvent() ...@@ -777,6 +970,12 @@ NdbEventBuffer::nextEvent()
#ifdef VM_TRACE #ifdef VM_TRACE
m_latest_command= m_latest_command_save; m_latest_command= m_latest_command_save;
#endif #endif
NdbBlob* tBlob = op->theBlobList;
while (tBlob != NULL)
{
(void)tBlob->atNextEvent();
tBlob = tBlob->theNext;
}
DBUG_RETURN_EVENT(op->m_facade); DBUG_RETURN_EVENT(op->m_facade);
} }
// the next event belonged to an event op that is no // the next event belonged to an event op that is no
...@@ -1161,7 +1360,7 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op, ...@@ -1161,7 +1360,7 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
DBUG_ENTER_EVENT("NdbEventBuffer::insertDataL"); DBUG_ENTER_EVENT("NdbEventBuffer::insertDataL");
Uint64 gci= sdata->gci; Uint64 gci= sdata->gci;
if ( likely((Uint32)op->mi_type & 1 << (Uint32)sdata->operation) ) if ( likely((Uint32)op->mi_type & (1 << (Uint32)sdata->operation)) )
{ {
Gci_container* bucket= find_bucket(&m_active_gci, gci); Gci_container* bucket= find_bucket(&m_active_gci, gci);
...@@ -1179,9 +1378,9 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op, ...@@ -1179,9 +1378,9 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
DBUG_RETURN_EVENT(0); DBUG_RETURN_EVENT(0);
} }
bool use_hash = const bool is_data_event =
op->m_mergeEvents &&
sdata->operation < NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT; sdata->operation < NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT;
const bool use_hash = op->m_mergeEvents && is_data_event;
// find position in bucket hash table // find position in bucket hash table
EventBufData* data = 0; EventBufData* data = 0;
...@@ -1207,10 +1406,38 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op, ...@@ -1207,10 +1406,38 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
op->m_has_error = 3; op->m_has_error = 3;
DBUG_RETURN_EVENT(-1); DBUG_RETURN_EVENT(-1);
} }
// add it to list and hash table data->m_event_op = op;
if (op->theMainOp == NULL || ! is_data_event)
{
bucket->m_data.append(data); bucket->m_data.append(data);
}
else
{
// find or create main event for this blob event
EventBufData_hash::Pos main_hpos;
int ret = get_main_data(bucket, main_hpos, data);
if (ret == -1)
{
op->m_has_error = 4;
DBUG_RETURN_EVENT(-1);
}
EventBufData* main_data = main_hpos.data;
if (ret != 0) // main event was created
{
main_data->m_event_op = op->theMainOp;
bucket->m_data.append(main_data);
if (use_hash)
{
main_data->m_pkhash = main_hpos.pkhash;
bucket->m_data_hash.append(main_hpos, main_data);
}
}
// link blob event under main event
add_blob_data(main_data, data);
}
if (use_hash) if (use_hash)
{ {
data->m_pkhash = hpos.pkhash;
bucket->m_data_hash.append(hpos, data); bucket->m_data_hash.append(hpos, data);
} }
#ifdef VM_TRACE #ifdef VM_TRACE
...@@ -1226,18 +1453,12 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op, ...@@ -1226,18 +1453,12 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
DBUG_RETURN_EVENT(-1); DBUG_RETURN_EVENT(-1);
} }
} }
data->m_event_op = op;
if (use_hash)
{
data->m_pkhash = hpos.pkhash;
}
DBUG_RETURN_EVENT(0); DBUG_RETURN_EVENT(0);
} }
#ifdef VM_TRACE #ifdef VM_TRACE
if ((Uint32)op->m_eventImpl->mi_type & 1 << (Uint32)sdata->operation) if ((Uint32)op->m_eventImpl->mi_type & (1 << (Uint32)sdata->operation))
{ {
// XXX never reached
DBUG_PRINT_EVENT("info",("Data arrived before ready eventId", op->m_eventId)); DBUG_PRINT_EVENT("info",("Data arrived before ready eventId", op->m_eventId));
DBUG_RETURN_EVENT(0); DBUG_RETURN_EVENT(0);
} }
...@@ -1300,6 +1521,8 @@ NdbEventBuffer::alloc_data() ...@@ -1300,6 +1521,8 @@ NdbEventBuffer::alloc_data()
int int
NdbEventBuffer::alloc_mem(EventBufData* data, LinearSectionPtr ptr[3]) NdbEventBuffer::alloc_mem(EventBufData* data, LinearSectionPtr ptr[3])
{ {
DBUG_ENTER("NdbEventBuffer::alloc_mem");
DBUG_PRINT("info", ("ptr sz %u + %u + %u", ptr[0].sz, ptr[1].sz, ptr[2].sz));
const Uint32 min_alloc_size = 128; const Uint32 min_alloc_size = 128;
Uint32 sz4 = (sizeof(SubTableData) + 3) >> 2; Uint32 sz4 = (sizeof(SubTableData) + 3) >> 2;
...@@ -1317,7 +1540,7 @@ NdbEventBuffer::alloc_mem(EventBufData* data, LinearSectionPtr ptr[3]) ...@@ -1317,7 +1540,7 @@ NdbEventBuffer::alloc_mem(EventBufData* data, LinearSectionPtr ptr[3])
data->memory = (Uint32*)NdbMem_Allocate(alloc_size); data->memory = (Uint32*)NdbMem_Allocate(alloc_size);
if (data->memory == 0) if (data->memory == 0)
return -1; DBUG_RETURN(-1);
data->sz = alloc_size; data->sz = alloc_size;
m_total_alloc += data->sz; m_total_alloc += data->sz;
} }
...@@ -1332,7 +1555,7 @@ NdbEventBuffer::alloc_mem(EventBufData* data, LinearSectionPtr ptr[3]) ...@@ -1332,7 +1555,7 @@ NdbEventBuffer::alloc_mem(EventBufData* data, LinearSectionPtr ptr[3])
memptr += ptr[i].sz; memptr += ptr[i].sz;
} }
return 0; DBUG_RETURN(0);
} }
int int
...@@ -1404,13 +1627,10 @@ copy_attr(AttributeHeader ah, ...@@ -1404,13 +1627,10 @@ copy_attr(AttributeHeader ah,
{ {
Uint32 k; Uint32 k;
for (k = 0; k < n; k++) for (k = 0; k < n; k++)
p1[j1++] = p2[j2++]; p1[j1 + k] = p2[j2 + k];
} }
else
{
j1 += n; j1 += n;
j2 += n; j2 += n;
}
} }
int int
...@@ -1443,8 +1663,8 @@ NdbEventBuffer::merge_data(const SubTableData * const sdata, ...@@ -1443,8 +1663,8 @@ NdbEventBuffer::merge_data(const SubTableData * const sdata,
data->sz = 0; data->sz = 0;
// compose ptr1 o ptr2 = ptr // compose ptr1 o ptr2 = ptr
LinearSectionPtr (&ptr1) [3] = olddata.ptr; LinearSectionPtr (&ptr1)[3] = olddata.ptr;
LinearSectionPtr (&ptr) [3] = data->ptr; LinearSectionPtr (&ptr)[3] = data->ptr;
// loop twice where first loop only sets sizes // loop twice where first loop only sets sizes
int loop; int loop;
...@@ -1458,7 +1678,7 @@ NdbEventBuffer::merge_data(const SubTableData * const sdata, ...@@ -1458,7 +1678,7 @@ NdbEventBuffer::merge_data(const SubTableData * const sdata,
data->sdata->operation = tp->t3; data->sdata->operation = tp->t3;
} }
ptr[0].sz = ptr[1].sz = ptr[3].sz = 0; ptr[0].sz = ptr[1].sz = ptr[2].sz = 0;
// copy pk from new version // copy pk from new version
{ {
...@@ -1573,6 +1793,113 @@ NdbEventBuffer::merge_data(const SubTableData * const sdata, ...@@ -1573,6 +1793,113 @@ NdbEventBuffer::merge_data(const SubTableData * const sdata,
DBUG_RETURN_EVENT(0); DBUG_RETURN_EVENT(0);
} }
/*
* Given blob part event, find main table event on inline part. It
* should exist (force in TUP) but may arrive later. If so, create
* NUL event on main table. The real event replaces it later.
*/
// write attribute headers for concatened PK
static void
split_concatenated_pk(const NdbTableImpl* t, Uint32* ah_buffer,
const Uint32* pk_buffer, Uint32 pk_sz)
{
Uint32 sz = 0; // words parsed so far
Uint32 n; // pk attr count
Uint32 i;
for (i = n = 0; i < t->m_columns.size() && n < t->m_noOfKeys; i++)
{
const NdbColumnImpl* c = t->getColumn(i);
assert(c != NULL);
if (! c->m_pk)
continue;
assert(sz < pk_sz);
Uint32 bytesize = c->m_attrSize * c->m_arraySize;
Uint32 lb, len;
bool ok = NdbSqlUtil::get_var_length(c->m_type, &pk_buffer[sz], bytesize,
lb, len);
assert(ok);
AttributeHeader ah(i, lb + len);
ah_buffer[n++] = ah.m_value;
sz += ah.getDataSize();
}
assert(n == t->m_noOfKeys && sz == pk_sz);
}
int
NdbEventBuffer::get_main_data(Gci_container* bucket,
EventBufData_hash::Pos& hpos,
EventBufData* blob_data)
{
DBUG_ENTER_EVENT("NdbEventBuffer::get_main_data");
NdbEventOperationImpl* main_op = blob_data->m_event_op->theMainOp;
assert(main_op != NULL);
const NdbTableImpl* mainTable = main_op->m_eventImpl->m_tableImpl;
// create LinearSectionPtr for main table key
LinearSectionPtr ptr[3];
Uint32 ah_buffer[NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY];
ptr[0].sz = mainTable->m_noOfKeys;
ptr[0].p = ah_buffer;
ptr[1].sz = AttributeHeader(blob_data->ptr[0].p[0]).getDataSize();
ptr[1].p = blob_data->ptr[1].p;
ptr[2].sz = 0;
ptr[2].p = 0;
split_concatenated_pk(mainTable, ptr[0].p, ptr[1].p, ptr[1].sz);
DBUG_DUMP_EVENT("ah", (char*)ptr[0].p, ptr[0].sz << 2);
DBUG_DUMP_EVENT("pk", (char*)ptr[1].p, ptr[1].sz << 2);
// search for main event buffer
bucket->m_data_hash.search(hpos, main_op, ptr);
if (hpos.data != NULL)
DBUG_RETURN_EVENT(0);
// not found, create a place-holder
EventBufData* main_data = alloc_data();
if (main_data == NULL)
DBUG_RETURN_EVENT(-1);
SubTableData sdata = *blob_data->sdata;
sdata.tableId = main_op->m_eventImpl->m_tableImpl->m_id;
sdata.operation = NdbDictionary::Event::_TE_NUL;
if (copy_data(&sdata, ptr, main_data) != 0)
DBUG_RETURN_EVENT(-1);
hpos.data = main_data;
DBUG_RETURN_EVENT(1);
}
void
NdbEventBuffer::add_blob_data(EventBufData* main_data,
EventBufData* blob_data)
{
DBUG_ENTER_EVENT("NdbEventBuffer::add_blob_data");
DBUG_PRINT_EVENT("info", ("main_data=%p blob_data=%p", main_data, blob_data));
EventBufData* head;
head = main_data->m_next_blob;
while (head != NULL)
{
if (head->m_event_op == blob_data->m_event_op)
break;
head = head->m_next_blob;
}
if (head == NULL)
{
head = blob_data;
head->m_next_blob = main_data->m_next_blob;
main_data->m_next_blob = head;
}
else
{
blob_data->m_next = head->m_next;
head->m_next = blob_data;
}
DBUG_VOID_RETURN_EVENT;
}
NdbEventOperationImpl * NdbEventOperationImpl *
NdbEventBuffer::move_data() NdbEventBuffer::move_data()
{ {
...@@ -1613,6 +1940,31 @@ NdbEventBuffer::free_list(EventBufData_list &list) ...@@ -1613,6 +1940,31 @@ NdbEventBuffer::free_list(EventBufData_list &list)
#endif #endif
m_free_data_sz+= list.m_sz; m_free_data_sz+= list.m_sz;
// free blobs XXX unacceptable performance, fix later
{
EventBufData* data = list.m_head;
while (1) {
while (data->m_next_blob != NULL) {
EventBufData* blob_head = data->m_next_blob;
data->m_next_blob = blob_head->m_next_blob;
blob_head->m_next_blob = NULL;
while (blob_head != NULL) {
EventBufData* blob_part = blob_head;
blob_head = blob_head->m_next;
blob_part->m_next = m_free_data;
m_free_data = blob_part;
#ifdef VM_TRACE
m_free_data_count++;
#endif
m_free_data_sz += blob_part->sz;
}
}
if (data == list.m_tail)
break;
data = data->m_next;
}
}
// list returned to m_free_data // list returned to m_free_data
new (&list) EventBufData_list; new (&list) EventBufData_list;
} }
...@@ -1649,6 +2001,14 @@ NdbEventBuffer::dropEventOperation(NdbEventOperation* tOp) ...@@ -1649,6 +2001,14 @@ NdbEventBuffer::dropEventOperation(NdbEventOperation* tOp)
m_dropped_ev_op->m_prev= op; m_dropped_ev_op->m_prev= op;
m_dropped_ev_op= op; m_dropped_ev_op= op;
// drop blob ops
while (op->theBlobOpList != NULL)
{
NdbEventOperationImpl* tBlobOp = op->theBlobOpList;
op->theBlobOpList = op->theBlobOpList->theNextBlobOp;
(void)m_ndb->dropEventOperation(tBlobOp);
}
// ToDo, take care of these to be deleted at the // ToDo, take care of these to be deleted at the
// appropriate time, after we are sure that there // appropriate time, after we are sure that there
// are _no_ more events coming // are _no_ more events coming
...@@ -1717,6 +2077,10 @@ send_report: ...@@ -1717,6 +2077,10 @@ send_report:
Uint32 Uint32
EventBufData_hash::getpkhash(NdbEventOperationImpl* op, LinearSectionPtr ptr[3]) EventBufData_hash::getpkhash(NdbEventOperationImpl* op, LinearSectionPtr ptr[3])
{ {
DBUG_ENTER_EVENT("EventBufData_hash::getpkhash");
DBUG_DUMP_EVENT("ah", (char*)ptr[0].p, ptr[0].sz << 2);
DBUG_DUMP_EVENT("pk", (char*)ptr[1].p, ptr[1].sz << 2);
const NdbTableImpl* tab = op->m_eventImpl->m_tableImpl; const NdbTableImpl* tab = op->m_eventImpl->m_tableImpl;
// in all cases ptr[0] = pk ah.. ptr[1] = pk ad.. // in all cases ptr[0] = pk ah.. ptr[1] = pk ad..
...@@ -1747,13 +2111,19 @@ EventBufData_hash::getpkhash(NdbEventOperationImpl* op, LinearSectionPtr ptr[3]) ...@@ -1747,13 +2111,19 @@ EventBufData_hash::getpkhash(NdbEventOperationImpl* op, LinearSectionPtr ptr[3])
(*cs->coll->hash_sort)(cs, dptr + lb, len, &nr1, &nr2); (*cs->coll->hash_sort)(cs, dptr + lb, len, &nr1, &nr2);
dptr += ((bytesize + 3) / 4) * 4; dptr += ((bytesize + 3) / 4) * 4;
} }
return nr1; DBUG_PRINT_EVENT("info", ("hash result=%08x", nr1));
DBUG_RETURN_EVENT(nr1);
} }
// this is seldom invoked
bool bool
EventBufData_hash::getpkequal(NdbEventOperationImpl* op, LinearSectionPtr ptr1[3], LinearSectionPtr ptr2[3]) EventBufData_hash::getpkequal(NdbEventOperationImpl* op, LinearSectionPtr ptr1[3], LinearSectionPtr ptr2[3])
{ {
DBUG_ENTER_EVENT("EventBufData_hash::getpkequal");
DBUG_DUMP_EVENT("ah1", (char*)ptr1[0].p, ptr1[0].sz << 2);
DBUG_DUMP_EVENT("pk1", (char*)ptr1[1].p, ptr1[1].sz << 2);
DBUG_DUMP_EVENT("ah2", (char*)ptr2[0].p, ptr2[0].sz << 2);
DBUG_DUMP_EVENT("pk2", (char*)ptr2[1].p, ptr2[1].sz << 2);
const NdbTableImpl* tab = op->m_eventImpl->m_tableImpl; const NdbTableImpl* tab = op->m_eventImpl->m_tableImpl;
Uint32 nkey = tab->m_noOfKeys; Uint32 nkey = tab->m_noOfKeys;
...@@ -1763,6 +2133,8 @@ EventBufData_hash::getpkequal(NdbEventOperationImpl* op, LinearSectionPtr ptr1[3 ...@@ -1763,6 +2133,8 @@ EventBufData_hash::getpkequal(NdbEventOperationImpl* op, LinearSectionPtr ptr1[3
const uchar* dptr1 = (uchar*)ptr1[1].p; const uchar* dptr1 = (uchar*)ptr1[1].p;
const uchar* dptr2 = (uchar*)ptr2[1].p; const uchar* dptr2 = (uchar*)ptr2[1].p;
bool equal = true;
while (nkey-- != 0) while (nkey-- != 0)
{ {
AttributeHeader ah1(*hptr1++); AttributeHeader ah1(*hptr1++);
...@@ -1787,16 +2159,22 @@ EventBufData_hash::getpkequal(NdbEventOperationImpl* op, LinearSectionPtr ptr1[3 ...@@ -1787,16 +2159,22 @@ EventBufData_hash::getpkequal(NdbEventOperationImpl* op, LinearSectionPtr ptr1[3
CHARSET_INFO* cs = col->m_cs ? col->m_cs : &my_charset_bin; CHARSET_INFO* cs = col->m_cs ? col->m_cs : &my_charset_bin;
int res = (cs->coll->strnncollsp)(cs, dptr1 + lb1, len1, dptr2 + lb2, len2, false); int res = (cs->coll->strnncollsp)(cs, dptr1 + lb1, len1, dptr2 + lb2, len2, false);
if (res != 0) if (res != 0)
return false; {
equal = false;
break;
}
dptr1 += ((bytesize1 + 3) / 4) * 4; dptr1 += ((bytesize1 + 3) / 4) * 4;
dptr2 += ((bytesize2 + 3) / 4) * 4; dptr2 += ((bytesize2 + 3) / 4) * 4;
} }
return true;
DBUG_PRINT_EVENT("info", ("equal=%s", equal ? "true" : "false"));
DBUG_RETURN_EVENT(equal);
} }
void void
EventBufData_hash::search(Pos& hpos, NdbEventOperationImpl* op, LinearSectionPtr ptr[3]) EventBufData_hash::search(Pos& hpos, NdbEventOperationImpl* op, LinearSectionPtr ptr[3])
{ {
DBUG_ENTER_EVENT("EventBufData_hash::search");
Uint32 pkhash = getpkhash(op, ptr); Uint32 pkhash = getpkhash(op, ptr);
Uint32 index = (op->m_oid ^ pkhash) % GCI_EVENT_HASH_SIZE; Uint32 index = (op->m_oid ^ pkhash) % GCI_EVENT_HASH_SIZE;
EventBufData* data = m_hash[index]; EventBufData* data = m_hash[index];
...@@ -1811,6 +2189,8 @@ EventBufData_hash::search(Pos& hpos, NdbEventOperationImpl* op, LinearSectionPtr ...@@ -1811,6 +2189,8 @@ EventBufData_hash::search(Pos& hpos, NdbEventOperationImpl* op, LinearSectionPtr
hpos.index = index; hpos.index = index;
hpos.data = data; hpos.data = data;
hpos.pkhash = pkhash; hpos.pkhash = pkhash;
DBUG_PRINT_EVENT("info", ("search result=%p", data));
DBUG_VOID_RETURN_EVENT;
} }
template class Vector<Gci_container>; template class Vector<Gci_container>;
......
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
#include <signaldata/SumaImpl.hpp> #include <signaldata/SumaImpl.hpp>
#include <transporter/TransporterDefinitions.hpp> #include <transporter/TransporterDefinitions.hpp>
#include <NdbRecAttr.hpp> #include <NdbRecAttr.hpp>
#include <AttributeHeader.hpp>
#define NDB_EVENT_OP_MAGIC_NUMBER 0xA9F301B4 #define NDB_EVENT_OP_MAGIC_NUMBER 0xA9F301B4
...@@ -35,9 +36,28 @@ struct EventBufData ...@@ -35,9 +36,28 @@ struct EventBufData
LinearSectionPtr ptr[3]; LinearSectionPtr ptr[3];
unsigned sz; unsigned sz;
NdbEventOperationImpl *m_event_op; NdbEventOperationImpl *m_event_op;
EventBufData *m_next; // Next wrt to global order
/*
* Blobs are stored in blob list (m_next_blob) where each entry
* is list of parts (m_next) in part number order.
*
* TODO order by part no and link for fast read and free_list
*/
EventBufData *m_next; // Next wrt to global order or Next blob part
EventBufData *m_next_blob; // First part in next blob
EventBufData *m_next_hash; // Next in per-GCI hash EventBufData *m_next_hash; // Next in per-GCI hash
Uint32 m_pkhash; // PK hash (without op) for fast compare Uint32 m_pkhash; // PK hash (without op) for fast compare
// Get blob part number from blob data
Uint32 get_blob_part_no() {
assert(ptr[0].sz > 2);
Uint32 pos = AttributeHeader(ptr[0].p[0]).getDataSize() +
AttributeHeader(ptr[0].p[1]).getDataSize();
Uint32 no = ptr[1].p[pos];
return no;
}
}; };
class EventBufData_list class EventBufData_list
...@@ -70,7 +90,6 @@ EventBufData_list::~EventBufData_list() ...@@ -70,7 +90,6 @@ EventBufData_list::~EventBufData_list()
{ {
} }
inline inline
int EventBufData_list::is_empty() int EventBufData_list::is_empty()
{ {
...@@ -173,9 +192,13 @@ public: ...@@ -173,9 +192,13 @@ public:
NdbEventOperation::State getState(); NdbEventOperation::State getState();
int execute(); int execute();
int execute_nolock();
int stop(); int stop();
NdbRecAttr *getValue(const char *colName, char *aValue, int n); NdbRecAttr *getValue(const char *colName, char *aValue, int n);
NdbRecAttr *getValue(const NdbColumnImpl *, char *aValue, int n); NdbRecAttr *getValue(const NdbColumnImpl *, char *aValue, int n);
NdbBlob *getBlobHandle(const char *colName, int n);
NdbBlob *getBlobHandle(const NdbColumnImpl *, int n);
int readBlobParts(char* buf, NdbBlob* blob, Uint32 part, Uint32 count);
int receive_event(); int receive_event();
Uint64 getGCI(); Uint64 getGCI();
Uint64 getLatestGCI(); Uint64 getLatestGCI();
...@@ -199,6 +222,13 @@ public: ...@@ -199,6 +222,13 @@ public:
NdbRecAttr *theFirstDataAttrs[2]; NdbRecAttr *theFirstDataAttrs[2];
NdbRecAttr *theCurrentDataAttrs[2]; NdbRecAttr *theCurrentDataAttrs[2];
NdbBlob* theBlobList;
union {
NdbEventOperationImpl* theBlobOpList;
NdbEventOperationImpl* theNextBlobOp;
};
NdbEventOperationImpl* theMainOp; // blob op pointer to main op
NdbEventOperation::State m_state; /* note connection to mi_type */ NdbEventOperation::State m_state; /* note connection to mi_type */
Uint32 mi_type; /* should be == 0 if m_state != EO_EXECUTING Uint32 mi_type; /* should be == 0 if m_state != EO_EXECUTING
* else same as in EventImpl * else same as in EventImpl
...@@ -275,6 +305,11 @@ public: ...@@ -275,6 +305,11 @@ public:
int merge_data(const SubTableData * const sdata, int merge_data(const SubTableData * const sdata,
LinearSectionPtr ptr[3], LinearSectionPtr ptr[3],
EventBufData* data); EventBufData* data);
int get_main_data(Gci_container* bucket,
EventBufData_hash::Pos& hpos,
EventBufData* blob_data);
void add_blob_data(EventBufData* main_data,
EventBufData* blob_data);
void free_list(EventBufData_list &list); void free_list(EventBufData_list &list);
......
...@@ -21,14 +21,7 @@ ...@@ -21,14 +21,7 @@
#include <my_sys.h> #include <my_sys.h>
#include <ndb_version.h> #include <ndb_version.h>
#if NDB_VERSION_D < MAKE_VERSION(5, 1, 0) // version >= 5.1 required
#define version50
#else
#undef version50
#endif
// until rbr in 5.1
#undef version51rbr
#if !defined(min) || !defined(max) #if !defined(min) || !defined(max)
#define min(x, y) ((x) < (y) ? (x) : (y)) #define min(x, y) ((x) < (y) ? (x) : (y))
...@@ -57,11 +50,11 @@ ...@@ -57,11 +50,11 @@
* There are other -no-* options, each added to isolate a specific bug. * There are other -no-* options, each added to isolate a specific bug.
* *
* There are 5 ways (ignoring NUL operand) to compose 2 ops: * There are 5 ways (ignoring NUL operand) to compose 2 ops:
* 5.0 bugs 5.1 bugs *
* INS o DEL = NUL * INS o DEL = NUL
* INS o UPD = INS type=INS * INS o UPD = INS
* DEL o INS = UPD type=INS type=INS * DEL o INS = UPD
* UPD o DEL = DEL no event * UPD o DEL = DEL
* UPD o UPD = UPD * UPD o UPD = UPD
*/ */
...@@ -73,17 +66,19 @@ struct Opts { ...@@ -73,17 +66,19 @@ struct Opts {
uint maxpk; uint maxpk;
my_bool no_blobs; my_bool no_blobs;
my_bool no_implicit_nulls; my_bool no_implicit_nulls;
my_bool no_missing_update;
my_bool no_multiops; my_bool no_multiops;
my_bool no_nulls; my_bool no_nulls;
my_bool one_blob; my_bool one_blob;
const char* opstring; const char* opstring;
uint seed; uint seed;
my_bool separate_events; my_bool separate_events;
uint tweak; // whatever's useful
my_bool use_table; my_bool use_table;
}; };
static Opts g_opts; static Opts g_opts;
static const uint g_maxpk = 100; static const uint g_maxpk = 1000;
static const uint g_maxopstringpart = 100; static const uint g_maxopstringpart = 100;
static const char* g_opstringpart[g_maxopstringpart]; static const char* g_opstringpart[g_maxopstringpart];
static uint g_opstringparts = 0; static uint g_opstringparts = 0;
...@@ -712,6 +707,20 @@ checkop(const Op* op, Uint32& pk1) ...@@ -712,6 +707,20 @@ checkop(const Op* op, Uint32& pk1)
if (! c.nullable) { if (! c.nullable) {
chkrc(ind0 <= 0 && ind1 <= 0); chkrc(ind0 <= 0 && ind1 <= 0);
} }
if (c.isblob()) {
// blob values must be from allowed chars
int j;
for (j = 0; j < 2; j++) {
const Data& d = op->data[j];
if (d.ind[i] == 0) {
const Data::Txt& t = *d.ptr[i].txt;
int k;
for (k = 0; k < t.len; k++) {
chkrc(strchr(g_charval, t.val[k]) != 0);
}
}
}
}
} }
return 0; return 0;
} }
...@@ -849,9 +858,8 @@ createevent() ...@@ -849,9 +858,8 @@ createevent()
const Col& c = g_col[i]; const Col& c = g_col[i];
evt.addEventColumn(c.name); evt.addEventColumn(c.name);
} }
#ifdef version51rbr evt.setReport(NdbDictionary::Event::ER_UPDATED);
evt.mergeEvents(! g_opts.separate_events); evt.mergeEvents(! g_opts.separate_events);
#endif
if (g_dic->getEvent(evt.getName()) != 0) if (g_dic->getEvent(evt.getName()) != 0)
chkdb(g_dic->dropEvent(evt.getName()) == 0); chkdb(g_dic->dropEvent(evt.getName()) == 0);
chkdb(g_dic->createEvent(evt) == 0); chkdb(g_dic->createEvent(evt) == 0);
...@@ -875,14 +883,8 @@ static int ...@@ -875,14 +883,8 @@ static int
createeventop() createeventop()
{ {
ll1("createeventop"); ll1("createeventop");
#ifdef version50
uint bsz = 10 * g_opts.maxops;
chkdb((g_evt_op = g_ndb->createEventOperation(g_evt->getName(), bsz)) != 0);
#else
chkdb((g_evt_op = g_ndb->createEventOperation(g_evt->getName())) != 0); chkdb((g_evt_op = g_ndb->createEventOperation(g_evt->getName())) != 0);
// available in gci merge changeset
g_evt_op->mergeEvents(! g_opts.separate_events); // not yet inherited g_evt_op->mergeEvents(! g_opts.separate_events); // not yet inherited
#endif
uint i; uint i;
for (i = 0; i < ncol(); i++) { for (i = 0; i < ncol(); i++) {
const Col& c = g_col[i]; const Col& c = g_col[i];
...@@ -891,10 +893,8 @@ createeventop() ...@@ -891,10 +893,8 @@ createeventop()
chkdb((g_ev_ra[0][i] = g_evt_op->getValue(c.name, (char*)d[0].ptr[i].v)) != 0); chkdb((g_ev_ra[0][i] = g_evt_op->getValue(c.name, (char*)d[0].ptr[i].v)) != 0);
chkdb((g_ev_ra[1][i] = g_evt_op->getPreValue(c.name, (char*)d[1].ptr[i].v)) != 0); chkdb((g_ev_ra[1][i] = g_evt_op->getPreValue(c.name, (char*)d[1].ptr[i].v)) != 0);
} else { } else {
#ifdef version51rbr
chkdb((g_ev_bh[0][i] = g_evt_op->getBlobHandle(c.name)) != 0); chkdb((g_ev_bh[0][i] = g_evt_op->getBlobHandle(c.name)) != 0);
chkdb((g_ev_bh[1][i] = g_evt_op->getPreBlobHandle(c.name)) != 0); chkdb((g_ev_bh[1][i] = g_evt_op->getPreBlobHandle(c.name)) != 0);
#endif
} }
} }
return 0; return 0;
...@@ -909,10 +909,10 @@ dropeventop() ...@@ -909,10 +909,10 @@ dropeventop()
return 0; return 0;
} }
// wait for event to be installed and for GCIs to pass
static int static int
waitgci() // wait for event to be installed and for at least 1 GCI to pass waitgci(uint ngci)
{ {
const uint ngci = 3;
ll1("waitgci " << ngci); ll1("waitgci " << ngci);
Uint32 gci[2]; Uint32 gci[2];
uint i = 0; uint i = 0;
...@@ -976,7 +976,6 @@ scantab() ...@@ -976,7 +976,6 @@ scantab()
if (! c.isblob()) { if (! c.isblob()) {
ind = ra[i]->isNULL(); ind = ra[i]->isNULL();
} else { } else {
#ifdef version51rbr
int ret; int ret;
ret = bh[i]->getDefined(ind); ret = bh[i]->getDefined(ind);
assert(ret == 0); assert(ret == 0);
...@@ -992,8 +991,10 @@ scantab() ...@@ -992,8 +991,10 @@ scantab()
Uint32 len = t.len; Uint32 len = t.len;
ret = bh[i]->readData(t.val, len); ret = bh[i]->readData(t.val, len);
assert(ret == 0 && len == t.len); assert(ret == 0 && len == t.len);
// to see the data, have to execute...
chkdb(g_con->execute(NoCommit) == 0);
assert(memchr(t.val, 'X', t.len) == 0);
} }
#endif
} }
assert(ind >= 0); assert(ind >= 0);
d0.ind[i] = ind; d0.ind[i] = ind;
...@@ -1042,7 +1043,7 @@ makedata(const Col& c, Data& d, Uint32 pk1, Op::Type t) ...@@ -1042,7 +1043,7 @@ makedata(const Col& c, Data& d, Uint32 pk1, Op::Type t)
} else if (t == Op::INS && ! g_opts.no_implicit_nulls && c.nullable && urandom(10, 100)) { } else if (t == Op::INS && ! g_opts.no_implicit_nulls && c.nullable && urandom(10, 100)) {
d.noop |= (1 << i); d.noop |= (1 << i);
d.ind[i] = 1; // implicit NULL value is known d.ind[i] = 1; // implicit NULL value is known
} else if (t == Op::UPD && urandom(10, 100)) { } else if (t == Op::UPD && ! g_opts.no_missing_update && urandom(10, 100)) {
d.noop |= (1 << i); d.noop |= (1 << i);
d.ind[i] = -1; // fixed up in caller d.ind[i] = -1; // fixed up in caller
} else if (! g_opts.no_nulls && c.nullable && urandom(10, 100)) { } else if (! g_opts.no_nulls && c.nullable && urandom(10, 100)) {
...@@ -1060,6 +1061,8 @@ makedata(const Col& c, Data& d, Uint32 pk1, Op::Type t) ...@@ -1060,6 +1061,8 @@ makedata(const Col& c, Data& d, Uint32 pk1, Op::Type t)
{ {
char* p = d.ptr[i].ch; char* p = d.ptr[i].ch;
uint u = urandom(g_charlen); uint u = urandom(g_charlen);
if (u == 0)
u = urandom(g_charlen); // 2x bias for non-empty
uint j; uint j;
for (j = 0; j < g_charlen; j++) { for (j = 0; j < g_charlen; j++) {
uint v = urandom(strlen(g_charval)); uint v = urandom(strlen(g_charval));
...@@ -1070,10 +1073,19 @@ makedata(const Col& c, Data& d, Uint32 pk1, Op::Type t) ...@@ -1070,10 +1073,19 @@ makedata(const Col& c, Data& d, Uint32 pk1, Op::Type t)
case NdbDictionary::Column::Text: case NdbDictionary::Column::Text:
{ {
Data::Txt& t = *d.ptr[i].txt; Data::Txt& t = *d.ptr[i].txt;
delete [] t.val;
t.val = 0;
if (g_opts.tweak & 1) {
uint u = 256 + 2000;
uint v = (g_opts.tweak & 2) ? 0 : urandom(strlen(g_charval));
t.val = new char [u];
t.len = u;
memset(t.val, g_charval[v], u);
break;
}
uint u = urandom(g_maxblobsize); uint u = urandom(g_maxblobsize);
u = urandom(u); // 4x bias for smaller blobs u = urandom(u); // 4x bias for smaller blobs
u = urandom(u); u = urandom(u);
delete [] t.val;
t.val = new char [u]; t.val = new char [u];
t.len = u; t.len = u;
uint j = 0; uint j = 0;
...@@ -1134,9 +1146,15 @@ makeops() ...@@ -1134,9 +1146,15 @@ makeops()
{ {
ll1("makeops"); ll1("makeops");
Uint32 pk1 = 0; Uint32 pk1 = 0;
while (g_usedops < g_opts.maxops && pk1 < g_opts.maxpk) { while (1) {
if (g_opts.opstring == 0) if (g_opts.opstring == 0) {
if (g_usedops >= g_opts.maxops) // use up ops
break;
pk1 = urandom(g_opts.maxpk); pk1 = urandom(g_opts.maxpk);
} else {
if (pk1 >= g_opts.maxpk) // use up pks
break;
}
ll2("makeops: pk1=" << pk1); ll2("makeops: pk1=" << pk1);
// total op on the pk so far // total op on the pk so far
// optype either NUL=initial/deleted or INS=created // optype either NUL=initial/deleted or INS=created
...@@ -1465,7 +1483,7 @@ matchevent(Op* ev) ...@@ -1465,7 +1483,7 @@ matchevent(Op* ev)
} }
if (tmpok) { if (tmpok) {
ok = gci_op->match = true; ok = gci_op->match = true;
ll2("===: match"); ll2("match");
} }
} }
pos++; pos++;
...@@ -1555,7 +1573,6 @@ geteventdata() ...@@ -1555,7 +1573,6 @@ geteventdata()
NdbRecAttr* ra = g_ev_ra[j][i]; NdbRecAttr* ra = g_ev_ra[j][i];
ind = ra->isNULL(); ind = ra->isNULL();
} else { } else {
#ifdef version51rbr
NdbBlob* bh = g_ev_bh[j][i]; NdbBlob* bh = g_ev_bh[j][i];
ret = bh->getDefined(ind); ret = bh->getDefined(ind);
assert(ret == 0); assert(ret == 0);
...@@ -1572,7 +1589,6 @@ geteventdata() ...@@ -1572,7 +1589,6 @@ geteventdata()
ret = bh->readData(t.val, len); ret = bh->readData(t.val, len);
assert(ret == 0 && len == t.len); assert(ret == 0 && len == t.len);
} }
#endif
} }
d[j].ind[i] = ind; d[j].ind[i] = ind;
} }
...@@ -1585,38 +1601,22 @@ runevents() ...@@ -1585,38 +1601,22 @@ runevents()
ll1("runevents"); ll1("runevents");
uint mspoll = 1000; uint mspoll = 1000;
uint npoll = 6; // strangely long delay uint npoll = 6; // strangely long delay
ll1("poll " << npoll);
while (npoll != 0) { while (npoll != 0) {
npoll--; npoll--;
int ret; int ret;
ll1("poll");
ret = g_ndb->pollEvents(mspoll); ret = g_ndb->pollEvents(mspoll);
if (ret <= 0) if (ret <= 0)
continue; continue;
while (1) { while (1) {
g_rec_ev->init(Op::EV); g_rec_ev->init(Op::EV);
#ifdef version50
int overrun = g_opts.maxops;
chkdb((ret = g_evt_op->next(&overrun)) >= 0);
chkrc(overrun == 0);
if (ret == 0)
break;
#else
NdbEventOperation* tmp_op = g_ndb->nextEvent(); NdbEventOperation* tmp_op = g_ndb->nextEvent();
if (tmp_op == 0) if (tmp_op == 0)
break; break;
reqrc(g_evt_op == tmp_op); reqrc(g_evt_op == tmp_op);
#endif
chkrc(seteventtype(g_rec_ev, g_evt_op->getEventType()) == 0); chkrc(seteventtype(g_rec_ev, g_evt_op->getEventType()) == 0);
geteventdata(); geteventdata();
g_rec_ev->gci = g_evt_op->getGCI(); g_rec_ev->gci = g_evt_op->getGCI();
#ifdef version50
// fix to match 5.1
if (g_rec_ev->type == Op::UPD) {
Uint32 pk1 = g_rec_ev->data[0].pk1;
makedata(getcol("pk1"), g_rec_ev->data[1], pk1, Op::UPD);
makedata(getcol("pk2"), g_rec_ev->data[1], pk1, Op::UPD);
}
#endif
// get indicators and blob value // get indicators and blob value
ll2("runevents: EVT: " << *g_rec_ev); ll2("runevents: EVT: " << *g_rec_ev);
// check basic sanity // check basic sanity
...@@ -1667,7 +1667,7 @@ runtest() ...@@ -1667,7 +1667,7 @@ runtest()
chkrc(createtable() == 0); chkrc(createtable() == 0);
chkrc(createevent() == 0); chkrc(createevent() == 0);
for (g_loop = 0; g_opts.loop == 0 || g_loop < g_opts.loop; g_loop++) { for (g_loop = 0; g_opts.loop == 0 || g_loop < g_opts.loop; g_loop++) {
ll0("loop " << g_loop); ll0("=== loop " << g_loop << " ===");
setseed(g_loop); setseed(g_loop);
resetmem(); resetmem();
chkrc(scantab() == 0); // alternative: save tot_op for loop > 0 chkrc(scantab() == 0); // alternative: save tot_op for loop > 0
...@@ -1675,7 +1675,7 @@ runtest() ...@@ -1675,7 +1675,7 @@ runtest()
g_rec_ev = getop(Op::EV); g_rec_ev = getop(Op::EV);
chkrc(createeventop() == 0); chkrc(createeventop() == 0);
chkdb(g_evt_op->execute() == 0); chkdb(g_evt_op->execute() == 0);
chkrc(waitgci() == 0); chkrc(waitgci(3) == 0);
chkrc(runops() == 0); chkrc(runops() == 0);
if (! g_opts.separate_events) if (! g_opts.separate_events)
chkrc(mergeops() == 0); chkrc(mergeops() == 0);
...@@ -1685,6 +1685,8 @@ runtest() ...@@ -1685,6 +1685,8 @@ runtest()
chkrc(matchevents() == 0); chkrc(matchevents() == 0);
chkrc(matchops() == 0); chkrc(matchops() == 0);
chkrc(dropeventop() == 0); chkrc(dropeventop() == 0);
// time erases everything..
chkrc(waitgci(1) == 0);
} }
chkrc(dropevent() == 0); chkrc(dropevent() == 0);
chkrc(droptable() == 0); chkrc(droptable() == 0);
...@@ -1703,41 +1705,48 @@ my_long_options[] = ...@@ -1703,41 +1705,48 @@ my_long_options[] =
{ "loglevel", 1002, "Logging level in this program (default 0)", { "loglevel", 1002, "Logging level in this program (default 0)",
(gptr*)&g_opts.loglevel, (gptr*)&g_opts.loglevel, 0, (gptr*)&g_opts.loglevel, (gptr*)&g_opts.loglevel, 0,
GET_INT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }, GET_INT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
{ "loop", 1003, "Number of test loops (default 2, 0=forever)", { "loop", 1003, "Number of test loops (default 3, 0=forever)",
(gptr*)&g_opts.loop, (gptr*)&g_opts.loop, 0, (gptr*)&g_opts.loop, (gptr*)&g_opts.loop, 0,
GET_INT, REQUIRED_ARG, 2, 0, 0, 0, 0, 0 }, GET_INT, REQUIRED_ARG, 3, 0, 0, 0, 0, 0 },
{ "maxops", 1004, "Approx number of PK operations (default 1000)", { "maxops", 1004, "Approx number of PK operations (default 1000)",
(gptr*)&g_opts.maxops, (gptr*)&g_opts.maxops, 0, (gptr*)&g_opts.maxops, (gptr*)&g_opts.maxops, 0,
GET_UINT, REQUIRED_ARG, 1000, 0, 0, 0, 0, 0 }, GET_UINT, REQUIRED_ARG, 1000, 0, 0, 0, 0, 0 },
{ "maxpk", 1005, "Number of different PK values (default 10)", { "maxpk", 1005, "Number of different PK values (default 10)",
(gptr*)&g_opts.maxpk, (gptr*)&g_opts.maxpk, 0, (gptr*)&g_opts.maxpk, (gptr*)&g_opts.maxpk, 0,
GET_UINT, REQUIRED_ARG, 10, 1, g_maxpk, 0, 0, 0 }, GET_UINT, REQUIRED_ARG, 10, 0, 0, 0, 0, 0 },
{ "no-blobs", 1006, "Omit blob attributes (5.0: true)", { "no-blobs", 1006, "Omit blob attributes (5.0: true)",
(gptr*)&g_opts.no_blobs, (gptr*)&g_opts.no_blobs, 0, (gptr*)&g_opts.no_blobs, (gptr*)&g_opts.no_blobs, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "no-implicit-nulls", 1007, "Insert must include NULL values explicitly", { "no-implicit-nulls", 1007, "Insert must include all attrs"
" i.e. no implicit NULLs",
(gptr*)&g_opts.no_implicit_nulls, (gptr*)&g_opts.no_implicit_nulls, 0, (gptr*)&g_opts.no_implicit_nulls, (gptr*)&g_opts.no_implicit_nulls, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "no-multiops", 1008, "Allow only 1 operation per commit", { "no-missing-update", 1008, "Update must include all non-PK attrs",
(gptr*)&g_opts.no_missing_update, (gptr*)&g_opts.no_missing_update, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "no-multiops", 1009, "Allow only 1 operation per commit",
(gptr*)&g_opts.no_multiops, (gptr*)&g_opts.no_multiops, 0, (gptr*)&g_opts.no_multiops, (gptr*)&g_opts.no_multiops, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "no-nulls", 1009, "Create no NULL values", { "no-nulls", 1010, "Create no NULL values",
(gptr*)&g_opts.no_nulls, (gptr*)&g_opts.no_nulls, 0, (gptr*)&g_opts.no_nulls, (gptr*)&g_opts.no_nulls, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "one-blob", 1010, "Only one blob attribute (defautt 2)", { "one-blob", 1011, "Only one blob attribute (default 2)",
(gptr*)&g_opts.one_blob, (gptr*)&g_opts.one_blob, 0, (gptr*)&g_opts.one_blob, (gptr*)&g_opts.one_blob, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "opstring", 1011, "Operations to run e.g. idiucdc (c is commit) or" { "opstring", 1012, "Operations to run e.g. idiucdc (c is commit) or"
" iuuc:uudc (the : separates loops)", " iuuc:uudc (the : separates loops)",
(gptr*)&g_opts.opstring, (gptr*)&g_opts.opstring, 0, (gptr*)&g_opts.opstring, (gptr*)&g_opts.opstring, 0,
GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }, GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
{ "seed", 1012, "Random seed (0=loop number, default -1=random)", { "seed", 1013, "Random seed (0=loop number, default -1=random)",
(gptr*)&g_opts.seed, (gptr*)&g_opts.seed, 0, (gptr*)&g_opts.seed, (gptr*)&g_opts.seed, 0,
GET_INT, REQUIRED_ARG, -1, 0, 0, 0, 0, 0 }, GET_INT, REQUIRED_ARG, -1, 0, 0, 0, 0, 0 },
{ "separate-events", 1013, "Do not combine events per GCI (5.0: true)", { "separate-events", 1014, "Do not combine events per GCI (5.0: true)",
(gptr*)&g_opts.separate_events, (gptr*)&g_opts.separate_events, 0, (gptr*)&g_opts.separate_events, (gptr*)&g_opts.separate_events, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "use-table", 1014, "Use existing table 'tem1'", { "tweak", 1015, "Whatever the source says",
(gptr*)&g_opts.tweak, (gptr*)&g_opts.tweak, 0,
GET_UINT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
{ "use-table", 1016, "Use existing table 'tem1'",
(gptr*)&g_opts.use_table, (gptr*)&g_opts.use_table, 0, (gptr*)&g_opts.use_table, (gptr*)&g_opts.use_table, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ 0, 0, 0, { 0, 0, 0,
...@@ -1754,9 +1763,10 @@ usage() ...@@ -1754,9 +1763,10 @@ usage()
static int static int
checkopts() checkopts()
{ {
#ifdef version50 if (g_opts.maxpk > g_maxpk) {
g_opts.separate_events = true; ll0("setting maxpk to " << g_maxpk);
#endif g_opts.maxpk = g_maxpk;
}
if (g_opts.separate_events) { if (g_opts.separate_events) {
g_opts.no_blobs = true; g_opts.no_blobs = true;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment