Commit 451b54e5 authored by tomas@poseidon.ndb.mysql.com's avatar tomas@poseidon.ndb.mysql.com

Merge tulin@bk-internal.mysql.com:/home/bk/mysql-5.0-ndb

into poseidon.ndb.mysql.com:/home/tomas/mysql-5.0-ndb-wl2278
parents d32c3d69 d9ed8141
...@@ -610,6 +610,19 @@ Suma::execSIGNAL_DROPPED_REP(Signal* signal){ ...@@ -610,6 +610,19 @@ Suma::execSIGNAL_DROPPED_REP(Signal* signal){
* *
*/ */
static unsigned
count_subscribers(const DLList<SumaParticipant::Subscriber> &subs)
{
unsigned n= 0;
SumaParticipant::SubscriberPtr i_subbPtr;
subs.first(i_subbPtr);
while(!i_subbPtr.isNull()){
n++;
subs.next(i_subbPtr);
}
return n;
}
void void
Suma::execDUMP_STATE_ORD(Signal* signal){ Suma::execDUMP_STATE_ORD(Signal* signal){
jamEntry(); jamEntry();
...@@ -664,6 +677,15 @@ Suma::execDUMP_STATE_ORD(Signal* signal){ ...@@ -664,6 +677,15 @@ Suma::execDUMP_STATE_ORD(Signal* signal){
infoEvent("Suma: c_dataBufferPool size: %d free: %d", infoEvent("Suma: c_dataBufferPool size: %d free: %d",
c_dataBufferPool.getSize(), c_dataBufferPool.getSize(),
c_dataBufferPool.getNoOfFree()); c_dataBufferPool.getNoOfFree());
infoEvent("Suma: c_metaSubscribers count: %d",
count_subscribers(c_metaSubscribers));
infoEvent("Suma: c_dataSubscribers count: %d",
count_subscribers(c_dataSubscribers));
infoEvent("Suma: c_prepDataSubscribers count: %d",
count_subscribers(c_prepDataSubscribers));
infoEvent("Suma: c_removeDataSubscribers count: %d",
count_subscribers(c_removeDataSubscribers));
} }
} }
......
...@@ -2530,6 +2530,7 @@ int ...@@ -2530,6 +2530,7 @@ int
NdbDictInterface::executeSubscribeEvent(class Ndb & ndb, NdbDictInterface::executeSubscribeEvent(class Ndb & ndb,
NdbEventImpl & evnt) NdbEventImpl & evnt)
{ {
DBUG_ENTER("NdbDictInterface::executeSubscribeEvent");
NdbApiSignal tSignal(m_reference); NdbApiSignal tSignal(m_reference);
// tSignal.theReceiversBlockNumber = SUMA; // tSignal.theReceiversBlockNumber = SUMA;
tSignal.theReceiversBlockNumber = DBDICT; tSignal.theReceiversBlockNumber = DBDICT;
...@@ -2544,7 +2545,7 @@ NdbDictInterface::executeSubscribeEvent(class Ndb & ndb, ...@@ -2544,7 +2545,7 @@ NdbDictInterface::executeSubscribeEvent(class Ndb & ndb,
sumaStart->subscriberData = evnt.m_bufferId & 0xFF; sumaStart->subscriberData = evnt.m_bufferId & 0xFF;
sumaStart->subscriberRef = m_reference; sumaStart->subscriberRef = m_reference;
return executeSubscribeEvent(&tSignal, NULL); DBUG_RETURN(executeSubscribeEvent(&tSignal, NULL));
} }
int int
......
...@@ -92,10 +92,7 @@ NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &N, ...@@ -92,10 +92,7 @@ NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &N,
NdbEventOperationImpl::~NdbEventOperationImpl() NdbEventOperationImpl::~NdbEventOperationImpl()
{ {
int i; int i;
if (sdata) NdbMem_Free(sdata); if (sdata) NdbMem_Free((char*)sdata);
for (i=0 ; i<3; i++) {
if (ptr[i].p) NdbMem_Free(ptr[i].p);
}
for (i=0 ; i<2; i++) { for (i=0 ; i<2; i++) {
NdbRecAttr *p = theFirstRecAttrs[i]; NdbRecAttr *p = theFirstRecAttrs[i];
while (p) { while (p) {
...@@ -856,39 +853,46 @@ void ...@@ -856,39 +853,46 @@ void
NdbGlobalEventBuffer::real_init (NdbGlobalEventBufferHandle *h, NdbGlobalEventBuffer::real_init (NdbGlobalEventBufferHandle *h,
int MAX_NUMBER_ACTIVE_EVENTS) int MAX_NUMBER_ACTIVE_EVENTS)
{ {
if (m_handlers.size() == 0) { // First init DBUG_ENTER("NdbGlobalEventBuffer::real_init");
DBUG_PRINT("enter",("m_handles.size()=%u %u", m_handlers.size(), h));
if (m_handlers.size() == 0)
{ // First init
DBUG_PRINT("info",("first to come"));
m_max = MAX_NUMBER_ACTIVE_EVENTS; m_max = MAX_NUMBER_ACTIVE_EVENTS;
m_buf = new BufItem[m_max]; m_buf = new BufItem[m_max];
// (BufItem *)NdbMem_Allocate(m_max*sizeof(BufItem));
for (int i=0; i<m_max; i++) { for (int i=0; i<m_max; i++) {
m_buf[i].gId= 0; m_buf[i].gId= 0;
} }
} }
assert(m_max == MAX_NUMBER_ACTIVE_EVENTS);
// TODO make sure we don't hit roof // TODO make sure we don't hit roof
// m_handlers[m_nhandlers] = h;
m_handlers.push_back(h); m_handlers.push_back(h);
// ndbout_c("NdbGlobalEventBuffer::real_init(), m_handles=%u %u", m_nhandlers, h); DBUG_VOID_RETURN;
} }
void void
NdbGlobalEventBuffer::real_remove(NdbGlobalEventBufferHandle *h) NdbGlobalEventBuffer::real_remove(NdbGlobalEventBufferHandle *h)
{ {
// ndbout_c("NdbGlobalEventBuffer::real_init_remove(), m_handles=%u %u", m_nhandlers, h); DBUG_ENTER("NdbGlobalEventBuffer::real_remove");
for (Uint32 i=0 ; i < m_handlers.size(); i++) { DBUG_PRINT("enter",("m_handles.size()=%u %u", m_handlers.size(), h));
// ndbout_c("%u %u %u", i, m_handlers[i], h); for (Uint32 i=0 ; i < m_handlers.size(); i++)
if (m_handlers[i] == h) { {
DBUG_PRINT("info",("m_handlers[%u] %u", i, m_handlers[i]));
if (m_handlers[i] == h)
{
m_handlers.erase(i); m_handlers.erase(i);
if (m_handlers.size() == 0) { if (m_handlers.size() == 0)
// ndbout_c("last to go"); {
DBUG_PRINT("info",("last to go"));
delete[] m_buf; delete[] m_buf;
m_buf = NULL; m_buf = NULL;
// NdbMem_Free((char*)m_buf);
} }
return; DBUG_VOID_RETURN;
} }
} }
ndbout_c("NdbGlobalEventBuffer::real_init_remove() non-existing handle"); ndbout_c("NdbGlobalEventBuffer::real_remove() non-existing handle");
exit(-1); DBUG_PRINT("error",("non-existing handle"));
abort();
DBUG_VOID_RETURN;
} }
int int
...@@ -1231,6 +1235,9 @@ int NdbGlobalEventBuffer::real_getDataL(const int bufferId, ...@@ -1231,6 +1235,9 @@ int NdbGlobalEventBuffer::real_getDataL(const int bufferId,
DBUG_RETURN(0); // nothing to get DBUG_RETURN(0); // nothing to get
} }
DBUG_PRINT("info",("ID(bufferId) %d NO(bufferId) %d e.b %d",
ID(bufferId), NO(bufferId), e.b));
if (copy_data_alloc(b.data[e.b].sdata, b.data[e.b].ptr, if (copy_data_alloc(b.data[e.b].sdata, b.data[e.b].ptr,
sdata, ptr)) sdata, ptr))
{ {
...@@ -1255,26 +1262,29 @@ NdbGlobalEventBuffer::copy_data_alloc(const SubTableData * const f_sdata, ...@@ -1255,26 +1262,29 @@ NdbGlobalEventBuffer::copy_data_alloc(const SubTableData * const f_sdata,
LinearSectionPtr t_ptr[3]) LinearSectionPtr t_ptr[3])
{ {
DBUG_ENTER("NdbGlobalEventBuffer::copy_data_alloc"); DBUG_ENTER("NdbGlobalEventBuffer::copy_data_alloc");
if (t_sdata == NULL) { unsigned sz4= (sizeof(SubTableData)+3)>>2;
t_sdata = (SubTableData *)NdbMem_Allocate(sizeof(SubTableData)); Uint32 *ptr= (Uint32*)NdbMem_Allocate((sz4 +
} f_ptr[0].sz +
f_ptr[1].sz +
f_ptr[2].sz) * sizeof(Uint32));
if (t_sdata)
NdbMem_Free((char*)t_sdata);
t_sdata= (SubTableData *)ptr;
memcpy(t_sdata,f_sdata,sizeof(SubTableData)); memcpy(t_sdata,f_sdata,sizeof(SubTableData));
ptr+= sz4;
for (int i = 0; i < 3; i++) { for (int i = 0; i < 3; i++) {
LinearSectionPtr & f_p = f_ptr[i]; LinearSectionPtr & f_p = f_ptr[i];
LinearSectionPtr & t_p = t_ptr[i]; LinearSectionPtr & t_p = t_ptr[i];
if (f_p.sz > 0) { if (f_p.sz > 0) {
if (t_p.p == NULL) { t_p.p= (Uint32 *)ptr;
t_p.p = (Uint32 *)NdbMem_Allocate(sizeof(Uint32)*f_p.sz);
} else if (t_p.sz != f_p.sz) {
NdbMem_Free(t_p.p);
t_p.p = (Uint32 *)NdbMem_Allocate(sizeof(Uint32)*f_p.sz);
}
memcpy(t_p.p, f_p.p, sizeof(Uint32)*f_p.sz); memcpy(t_p.p, f_p.p, sizeof(Uint32)*f_p.sz);
} else if (t_p.p != NULL) { ptr+= f_p.sz;
NdbMem_Free(t_p.p); t_p.sz= f_p.sz;
t_p.p = NULL; } else {
t_p.p= NULL;
t_p.sz= 0;
} }
t_p.sz = f_p.sz;
} }
DBUG_RETURN(0); DBUG_RETURN(0);
} }
......
...@@ -83,7 +83,7 @@ int runEventOperation(NDBT_Context* ctx, NDBT_Step* step) ...@@ -83,7 +83,7 @@ int runEventOperation(NDBT_Context* ctx, NDBT_Step* step)
EventOperationStats stats; EventOperationStats stats;
g_info << "***** Id " << tId << endl; g_info << "***** start Id " << tId << endl;
// sleep(tId); // sleep(tId);
...@@ -102,12 +102,13 @@ int runEventOperation(NDBT_Context* ctx, NDBT_Step* step) ...@@ -102,12 +102,13 @@ int runEventOperation(NDBT_Context* ctx, NDBT_Step* step)
ret = NDBT_FAILED; ret = NDBT_FAILED;
if (ret == NDBT_FAILED) { if (ret == NDBT_FAILED) {
ndbout << "n_inserts = " << stats.n_inserts << endl; g_info << "***** end Id " << tId << endl;
ndbout << "n_deletes = " << stats.n_deletes << endl; ndbout_c("n_inserts = %d (%d)", stats.n_inserts, records);
ndbout << "n_updates = " << stats.n_updates << endl; ndbout_c("n_deletes = %d (%d)", stats.n_deletes, records);
ndbout << "n_consecutive = " << stats.n_consecutive << endl; ndbout_c("n_updates = %d (%d)", stats.n_updates, records);
ndbout << "n_duplicates = " << stats.n_duplicates << endl; ndbout_c("n_consecutive = %d (%d)", stats.n_consecutive, 3);
ndbout << "n_inconsistent_gcis = " << stats.n_inconsistent_gcis << endl; ndbout_c("n_duplicates = %d (%d)", stats.n_duplicates, 0);
ndbout_c("n_inconsistent_gcis = %d (%d)", stats.n_inconsistent_gcis, 0);
} }
return ret; return ret;
...@@ -156,9 +157,6 @@ TESTCASE("BasicEventOperation", ...@@ -156,9 +157,6 @@ TESTCASE("BasicEventOperation",
"NOTE! No errors are allowed!" ){ "NOTE! No errors are allowed!" ){
INITIALIZER(runCreateEvent); INITIALIZER(runCreateEvent);
STEP(runEventOperation); STEP(runEventOperation);
STEP(runEventOperation);
STEP(runEventOperation);
STEP(runEventOperation);
STEP(runEventLoad); STEP(runEventLoad);
FINALIZER(runDropEvent); FINALIZER(runDropEvent);
} }
...@@ -169,19 +167,16 @@ TESTCASE("CreateDropEventOperation", ...@@ -169,19 +167,16 @@ TESTCASE("CreateDropEventOperation",
STEP(runCreateDropEventOperation); STEP(runCreateDropEventOperation);
FINALIZER(runDropEvent); FINALIZER(runDropEvent);
} }
NDBT_TESTSUITE_END(test_event);
#if 0
NDBT_TESTSUITE(test_event);
TESTCASE("ParallellEventOperation", TESTCASE("ParallellEventOperation",
"Verify that we can listen to Events in Parallell" "Verify that we can listen to Events in parallell"
"NOTE! No errors are allowed!" ){ "NOTE! No errors are allowed!" ){
INITIALIZER(runCreateAllEvent); INITIALIZER(runCreateEvent);
STEP(runEventOperation);
STEP(runEventOperation); STEP(runEventOperation);
STEP(runEventLoad);
FINALIZER(runDropEvent); FINALIZER(runDropEvent);
} }
NDBT_TESTSUITE_END(test_event); NDBT_TESTSUITE_END(test_event);
#endif
int main(int argc, const char** argv){ int main(int argc, const char** argv){
ndb_init(); ndb_init();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment