Commit 08ea984e authored by unknown's avatar unknown

ndb - wl#2972 part 1: merge events per event op, gci

storage/ndb/include/ndbapi/NdbDictionary.hpp:
  wl#2972 part 1: merge events per event op, gci
storage/ndb/include/ndbapi/NdbEventOperation.hpp:
  wl#2972 part 1: merge events per event op, gci
storage/ndb/ndbapi-examples/ndbapi_event/Makefile:
  wl#2972 part 1: merge events per event op, gci
storage/ndb/ndbapi-examples/ndbapi_event/ndbapi_event.cpp:
  wl#2972 part 1: merge events per event op, gci
storage/ndb/src/ndbapi/NdbEventOperation.cpp:
  wl#2972 part 1: merge events per event op, gci
storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp:
  wl#2972 part 1: merge events per event op, gci
storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp:
  wl#2972 part 1: merge events per event op, gci
storage/ndb/test/ndbapi/test_event.cpp:
  wl#2972 part 1: merge events per event op, gci
parent bf0a3bfc
Branches unavailable
Tags unavailable
No related merge requests found
...@@ -1027,7 +1027,8 @@ public: ...@@ -1027,7 +1027,8 @@ public:
_TE_CREATE=6, _TE_CREATE=6,
_TE_GCP_COMPLETE=7, _TE_GCP_COMPLETE=7,
_TE_CLUSTER_FAILURE=8, _TE_CLUSTER_FAILURE=8,
_TE_STOP=9 _TE_STOP=9,
_TE_NUL=10 // internal (INS o DEL within same GCI)
}; };
#endif #endif
/** /**
......
...@@ -93,6 +93,12 @@ public: ...@@ -93,6 +93,12 @@ public:
* Retrieve current state of the NdbEventOperation object * Retrieve current state of the NdbEventOperation object
*/ */
State getState(); State getState();
/**
* By default events on same NdbEventOperation within same GCI
* are merged into a single event. This can be changed with
* separateEvents(true).
*/
void separateEvents(bool flag);
/** /**
* Activates the NdbEventOperation to start receiving events. The * Activates the NdbEventOperation to start receiving events. The
......
TARGET = ndbapi_event TARGET = ndbapi_event
SRCS = ndbapi_event.cpp SRCS = ndbapi_event.cpp
OBJS = ndbapi_event.o OBJS = ndbapi_event.o
CXX = g++ CXX = g++ -g
CFLAGS = -c -Wall -fno-rtti -fno-exceptions CFLAGS = -c -Wall -fno-rtti -fno-exceptions
CXXFLAGS = CXXFLAGS =
DEBUG = DEBUG =
...@@ -17,7 +17,7 @@ $(TARGET): $(OBJS) ...@@ -17,7 +17,7 @@ $(TARGET): $(OBJS)
$(CXX) $(CXXFLAGS) $(LFLAGS) $(LIB_DIR) $(OBJS) -lndbclient -lmysqlclient_r -lmysys -lmystrings -lz $(SYS_LIB) -o $(TARGET) $(CXX) $(CXXFLAGS) $(LFLAGS) $(LIB_DIR) $(OBJS) -lndbclient -lmysqlclient_r -lmysys -lmystrings -lz $(SYS_LIB) -o $(TARGET)
$(TARGET).o: $(SRCS) $(TARGET).o: $(SRCS)
$(CXX) $(CFLAGS) -I$(INCLUDE_DIR) -I$(INCLUDE_DIR)/ndbapi $(SRCS) $(CXX) $(CFLAGS) -I$(INCLUDE_DIR) -I$(INCLUDE_DIR)/ndbapi -I$(TOP_SRCDIR)/include $(SRCS)
clean: clean:
rm -f *.o $(TARGET) rm -f *.o $(TARGET)
...@@ -58,24 +58,29 @@ ...@@ -58,24 +58,29 @@
/** /**
* *
* Assume that there is a table TAB0 which is being updated by * Assume that there is a table t0 which is being updated by
* another process (e.g. flexBench -l 0 -stdtables). * another process (e.g. flexBench -l 0 -stdtables).
* We want to monitor what happens with columns COL0, COL2, COL11 * We want to monitor what happens with columns c0,c1,c2,c3.
* *
* or together with the mysql client; * or together with the mysql client;
* *
* shell> mysql -u root * shell> mysql -u root
* mysql> create database TEST_DB; * mysql> create database TEST_DB;
* mysql> use TEST_DB; * mysql> use TEST_DB;
* mysql> create table TAB0 (COL0 int primary key, COL1 int, COL11 int) engine=ndb; * mysql> create table t0 (c0 int, c1 int, c2 char(4), c3 char(4),
* primary key(c0, c2)) engine ndb charset latin1;
* *
* In another window start ndbapi_event, wait until properly started * In another window start ndbapi_event, wait until properly started
* *
insert into TAB0 values (1,2,3); insert into t0 values (1, 2, 'a', 'b');
insert into TAB0 values (2,2,3); insert into t0 values (3, 4, 'c', 'd');
insert into TAB0 values (3,2,9); update t0 set c3 = 'e' where c0 = 1 and c2 = 'a'; -- use pk
update TAB0 set COL1=10 where COL0=1; update t0 set c3 = 'f'; -- use scan
delete from TAB0 where COL0=1; update t0 set c3 = 'F'; -- use scan update to 'same'
update t0 set c2 = 'g' where c0 = 1; -- update pk part
update t0 set c2 = 'G' where c0 = 1; -- update pk part to 'same'
update t0 set c0 = 5, c2 = 'H' where c0 = 3; -- update full PK
delete from t0;
* *
* you should see the data popping up in the example window * you should see the data popping up in the example window
* *
...@@ -92,9 +97,10 @@ int myCreateEvent(Ndb* myNdb, ...@@ -92,9 +97,10 @@ int myCreateEvent(Ndb* myNdb,
const char **eventColumnName, const char **eventColumnName,
const int noEventColumnName); const int noEventColumnName);
int main() int main(int argc, char** argv)
{ {
ndb_init(); ndb_init();
bool sep = argc > 1 && strcmp(argv[1], "-s") == 0;
Ndb_cluster_connection *cluster_connection= Ndb_cluster_connection *cluster_connection=
new Ndb_cluster_connection(); // Object representing the cluster new Ndb_cluster_connection(); // Object representing the cluster
...@@ -126,13 +132,15 @@ int main() ...@@ -126,13 +132,15 @@ int main()
if (myNdb->init() == -1) APIERROR(myNdb->getNdbError()); if (myNdb->init() == -1) APIERROR(myNdb->getNdbError());
const char *eventName= "CHNG_IN_TAB0"; const char *eventName= "CHNG_IN_t0";
const char *eventTableName= "TAB0"; const char *eventTableName= "t0";
const int noEventColumnName= 3; const int noEventColumnName= 4;
const char *eventColumnName[noEventColumnName]= const char *eventColumnName[noEventColumnName]=
{"COL0", {"c0",
"COL1", "c1",
"COL11"}; "c2",
"c3"
};
// Create events // Create events
myCreateEvent(myNdb, myCreateEvent(myNdb,
...@@ -142,13 +150,14 @@ int main() ...@@ -142,13 +150,14 @@ int main()
noEventColumnName); noEventColumnName);
int j= 0; int j= 0;
while (j < 5) { while (j < 99) {
// Start "transaction" for handling events // Start "transaction" for handling events
NdbEventOperation* op; NdbEventOperation* op;
printf("create EventOperation\n"); printf("create EventOperation\n");
if ((op = myNdb->createEventOperation(eventName)) == NULL) if ((op = myNdb->createEventOperation(eventName)) == NULL)
APIERROR(myNdb->getNdbError()); APIERROR(myNdb->getNdbError());
op->separateEvents(sep);
printf("get values\n"); printf("get values\n");
NdbRecAttr* recAttr[noEventColumnName]; NdbRecAttr* recAttr[noEventColumnName];
...@@ -175,32 +184,43 @@ int main() ...@@ -175,32 +184,43 @@ int main()
i++; i++;
switch (op->getEventType()) { switch (op->getEventType()) {
case NdbDictionary::Event::TE_INSERT: case NdbDictionary::Event::TE_INSERT:
printf("%u INSERT: ", i); printf("%u INSERT", i);
break; break;
case NdbDictionary::Event::TE_DELETE: case NdbDictionary::Event::TE_DELETE:
printf("%u DELETE: ", i); printf("%u DELETE", i);
break; break;
case NdbDictionary::Event::TE_UPDATE: case NdbDictionary::Event::TE_UPDATE:
printf("%u UPDATE: ", i); printf("%u UPDATE", i);
break; break;
default: default:
abort(); // should not happen abort(); // should not happen
} }
for (int i = 1; i < noEventColumnName; i++) { printf(" gci=%d\n", op->getGCI());
printf("post: ");
for (int i = 0; i < noEventColumnName; i++) {
if (recAttr[i]->isNULL() >= 0) { // we have a value if (recAttr[i]->isNULL() >= 0) { // we have a value
printf(" post[%u]=", i); if (recAttr[i]->isNULL() == 0) { // we have a non-null value
if (recAttr[i]->isNULL() == 0) // we have a non-null value if (i < 2)
printf("%u", recAttr[i]->u_32_value()); printf("%-5u", recAttr[i]->u_32_value());
else // we have a null value else
printf("NULL"); printf("%-5.4s", recAttr[i]->aRef());
} else // we have a null value
printf("%-5s", "NULL");
} else
printf("%-5s", "-");
} }
printf("\npre : ");
for (int i = 0; i < noEventColumnName; i++) {
if (recAttrPre[i]->isNULL() >= 0) { // we have a value if (recAttrPre[i]->isNULL() >= 0) { // we have a value
printf(" pre[%u]=", i); if (recAttrPre[i]->isNULL() == 0) { // we have a non-null value
if (recAttrPre[i]->isNULL() == 0) // we have a non-null value if (i < 2)
printf("%u", recAttrPre[i]->u_32_value()); printf("%-5u", recAttrPre[i]->u_32_value());
else // we have a null value else
printf("NULL"); printf("%-5.4s", recAttrPre[i]->aRef());
} } else // we have a null value
printf("%-5s", "NULL");
} else
printf("%-5s", "-");
} }
printf("\n"); printf("\n");
} }
......
...@@ -38,6 +38,11 @@ NdbEventOperation::State NdbEventOperation::getState() ...@@ -38,6 +38,11 @@ NdbEventOperation::State NdbEventOperation::getState()
return m_impl.getState(); return m_impl.getState();
} }
void NdbEventOperation::separateEvents(bool flag)
{
m_impl.m_separateEvents = flag;
}
NdbRecAttr * NdbRecAttr *
NdbEventOperation::getValue(const char *colName, char *aValue) NdbEventOperation::getValue(const char *colName, char *aValue)
{ {
......
...@@ -25,16 +25,19 @@ ...@@ -25,16 +25,19 @@
#define NDB_EVENT_OP_MAGIC_NUMBER 0xA9F301B4 #define NDB_EVENT_OP_MAGIC_NUMBER 0xA9F301B4
class NdbEventOperationImpl; class NdbEventOperationImpl;
struct EventBufData struct EventBufData
{ {
union { union {
SubTableData *sdata; SubTableData *sdata;
char *memory; Uint32 *memory;
}; };
LinearSectionPtr ptr[3]; LinearSectionPtr ptr[3];
unsigned sz; unsigned sz;
NdbEventOperationImpl *m_event_op; NdbEventOperationImpl *m_event_op;
EventBufData *m_next; // Next wrt to global order EventBufData *m_next; // Next wrt to global order
EventBufData *m_next_hash; // Next in per-GCI hash
Uint32 m_pkhash; // PK hash (without op) for fast compare
}; };
class EventBufData_list class EventBufData_list
...@@ -116,6 +119,34 @@ void EventBufData_list::append(const EventBufData_list &list) ...@@ -116,6 +119,34 @@ void EventBufData_list::append(const EventBufData_list &list)
m_sz+= list.m_sz; m_sz+= list.m_sz;
} }
// GCI bucket has also a hash over data, with key event op, table PK.
// It can only be appended to and is invalid after remove_first().
class EventBufData_hash
{
public:
struct Pos { // search result
Uint32 index; // index into hash array
EventBufData* data; // non-zero if found
Uint32 pkhash; // PK hash
};
static Uint32 getpkhash(NdbEventOperationImpl* op, LinearSectionPtr ptr[3]);
static bool getpkequal(NdbEventOperationImpl* op, LinearSectionPtr ptr1[3], LinearSectionPtr ptr2[3]);
void search(Pos& hpos, NdbEventOperationImpl* op, LinearSectionPtr ptr[3]);
void append(Pos& hpos, EventBufData* data);
enum { GCI_EVENT_HASH_SIZE = 101 };
EventBufData* m_hash[GCI_EVENT_HASH_SIZE];
};
inline
void EventBufData_hash::append(Pos& hpos, EventBufData* data)
{
data->m_next_hash = m_hash[hpos.index];
m_hash[hpos.index] = data;
}
struct Gci_container struct Gci_container
{ {
enum State enum State
...@@ -127,6 +158,7 @@ struct Gci_container ...@@ -127,6 +158,7 @@ struct Gci_container
Uint32 m_gcp_complete_rep_count; // Remaining SUB_GCP_COMPLETE_REP until done Uint32 m_gcp_complete_rep_count; // Remaining SUB_GCP_COMPLETE_REP until done
Uint64 m_gci; // GCI Uint64 m_gci; // GCI
EventBufData_list m_data; EventBufData_list m_data;
EventBufData_hash m_data_hash;
}; };
class NdbEventOperationImpl : public NdbEventOperation { class NdbEventOperationImpl : public NdbEventOperation {
...@@ -174,6 +206,8 @@ public: ...@@ -174,6 +206,8 @@ public:
Uint32 m_eventId; Uint32 m_eventId;
Uint32 m_oid; Uint32 m_oid;
bool m_separateEvents;
EventBufData *m_data_item; EventBufData *m_data_item;
void *m_custom_data; void *m_custom_data;
...@@ -212,7 +246,6 @@ public: ...@@ -212,7 +246,6 @@ public:
void add_op(); void add_op();
void remove_op(); void remove_op();
void init_gci_containers(); void init_gci_containers();
Uint32 m_active_op_count;
// accessed from the "receive thread" // accessed from the "receive thread"
int insertDataL(NdbEventOperationImpl *op, int insertDataL(NdbEventOperationImpl *op,
...@@ -233,10 +266,15 @@ public: ...@@ -233,10 +266,15 @@ public:
NdbEventOperationImpl *move_data(); NdbEventOperationImpl *move_data();
// used by both user thread and receive thread // routines to copy/merge events
int copy_data_alloc(const SubTableData * const f_sdata, EventBufData* alloc_data();
LinearSectionPtr f_ptr[3], int alloc_mem(EventBufData* data, LinearSectionPtr ptr[3]);
EventBufData *ev_buf); int copy_data(const SubTableData * const sdata,
LinearSectionPtr ptr[3],
EventBufData* data);
int merge_data(const SubTableData * const sdata,
LinearSectionPtr ptr[3],
EventBufData* data);
void free_list(EventBufData_list &list); void free_list(EventBufData_list &list);
...@@ -290,6 +328,8 @@ private: ...@@ -290,6 +328,8 @@ private:
// dropped event operations that have not yet // dropped event operations that have not yet
// been deleted // been deleted
NdbEventOperationImpl *m_dropped_ev_op; NdbEventOperationImpl *m_dropped_ev_op;
Uint32 m_active_op_count;
}; };
inline inline
......
...@@ -169,6 +169,7 @@ eventOperation(Ndb* pNdb, const NdbDictionary::Table &tab, void* pstats, int rec ...@@ -169,6 +169,7 @@ eventOperation(Ndb* pNdb, const NdbDictionary::Table &tab, void* pstats, int rec
g_err << function << "Event operation creation failed\n"; g_err << function << "Event operation creation failed\n";
return NDBT_FAILED; return NDBT_FAILED;
} }
pOp->separateEvents(true);
g_info << function << "get values\n"; g_info << function << "get values\n";
NdbRecAttr* recAttr[1024]; NdbRecAttr* recAttr[1024];
...@@ -370,6 +371,7 @@ int runCreateDropEventOperation(NDBT_Context* ctx, NDBT_Step* step) ...@@ -370,6 +371,7 @@ int runCreateDropEventOperation(NDBT_Context* ctx, NDBT_Step* step)
g_err << "Event operation creation failed\n"; g_err << "Event operation creation failed\n";
return NDBT_FAILED; return NDBT_FAILED;
} }
pOp->separateEvents(true);
g_info << "dropping event operation" << endl; g_info << "dropping event operation" << endl;
int res = pNdb->dropEventOperation(pOp); int res = pNdb->dropEventOperation(pOp);
...@@ -540,6 +542,7 @@ int runEventApplier(NDBT_Context* ctx, NDBT_Step* step) ...@@ -540,6 +542,7 @@ int runEventApplier(NDBT_Context* ctx, NDBT_Step* step)
g_err << "Event operation creation failed on %s" << buf << endl; g_err << "Event operation creation failed on %s" << buf << endl;
DBUG_RETURN(NDBT_FAILED); DBUG_RETURN(NDBT_FAILED);
} }
pOp->separateEvents(true);
int i; int i;
int n_columns= table->getNoOfColumns(); int n_columns= table->getNoOfColumns();
...@@ -1185,6 +1188,7 @@ static int createEventOperations(Ndb * ndb) ...@@ -1185,6 +1188,7 @@ static int createEventOperations(Ndb * ndb)
{ {
DBUG_RETURN(NDBT_FAILED); DBUG_RETURN(NDBT_FAILED);
} }
pOp->separateEvents(true);
int n_columns= pTabs[i]->getNoOfColumns(); int n_columns= pTabs[i]->getNoOfColumns();
for (int j = 0; j < n_columns; j++) for (int j = 0; j < n_columns; j++)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment