ndb - bug#21754 ndb api events: fix sizes for blobs + remove expensive loop

parent 454d5454
...@@ -65,6 +65,39 @@ print_std(const SubTableData * sdata, LinearSectionPtr ptr[3]) ...@@ -65,6 +65,39 @@ print_std(const SubTableData * sdata, LinearSectionPtr ptr[3])
} }
#endif #endif
// EventBufData
Uint32
EventBufData::get_blob_part_no() const
{
assert(ptr[0].sz > 2);
Uint32 pos = AttributeHeader(ptr[0].p[0]).getDataSize() +
AttributeHeader(ptr[0].p[1]).getDataSize();
Uint32 no = ptr[1].p[pos];
return no;
}
void
EventBufData::add_part_size(Uint32 & full_count, Uint32 & full_sz) const
{
Uint32 tmp_count = 0;
Uint32 tmp_sz = 0;
const EventBufData* data2 = m_next_blob;
while (data2 != 0) {
tmp_count++;
tmp_sz += data2->sz;
const EventBufData* data3 = data2->m_next;
while (data3 != 0) {
tmp_count++;
tmp_sz += data3->sz;
data3 = data3->m_next;
}
data2 = data2->m_next_blob;
}
full_count += tmp_count;
full_sz += tmp_sz;
}
/* /*
* Class NdbEventOperationImpl * Class NdbEventOperationImpl
* *
...@@ -1162,11 +1195,12 @@ NdbEventBuffer::nextEvent() ...@@ -1162,11 +1195,12 @@ NdbEventBuffer::nextEvent()
// set NdbEventOperation data // set NdbEventOperation data
op->m_data_item= data; op->m_data_item= data;
// remove item from m_available_data // remove item from m_available_data and return size
m_available_data.remove_first(); Uint32 full_count, full_sz;
m_available_data.remove_first(full_count, full_sz);
// add it to used list // add it to used list
m_used_data.append_used_data(data); m_used_data.append_used_data(data, full_count, full_sz);
#ifdef VM_TRACE #ifdef VM_TRACE
op->m_data_done_count++; op->m_data_done_count++;
...@@ -1840,7 +1874,7 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op, ...@@ -1840,7 +1874,7 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
op->m_has_error = 2; op->m_has_error = 2;
DBUG_RETURN_EVENT(-1); DBUG_RETURN_EVENT(-1);
} }
if (unlikely(copy_data(sdata, ptr, data))) if (unlikely(copy_data(sdata, ptr, data, NULL)))
{ {
op->m_has_error = 3; op->m_has_error = 3;
DBUG_RETURN_EVENT(-1); DBUG_RETURN_EVENT(-1);
...@@ -1872,7 +1906,7 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op, ...@@ -1872,7 +1906,7 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
} }
} }
// link blob event under main event // link blob event under main event
add_blob_data(main_data, data); add_blob_data(bucket, main_data, data);
} }
if (use_hash) if (use_hash)
{ {
...@@ -1886,7 +1920,7 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op, ...@@ -1886,7 +1920,7 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
else else
{ {
// event with same op, PK found, merge into old buffer // event with same op, PK found, merge into old buffer
if (unlikely(merge_data(sdata, ptr, data))) if (unlikely(merge_data(sdata, ptr, data, & bucket->m_data.m_sz)))
{ {
op->m_has_error = 3; op->m_has_error = 3;
DBUG_RETURN_EVENT(-1); DBUG_RETURN_EVENT(-1);
...@@ -1909,6 +1943,9 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op, ...@@ -1909,6 +1943,9 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
} }
} }
} }
#ifdef NDB_EVENT_VERIFY_SIZE
verify_size(bucket->m_data);
#endif
DBUG_RETURN_EVENT(0); DBUG_RETURN_EVENT(0);
} }
...@@ -1962,8 +1999,21 @@ NdbEventBuffer::alloc_data() ...@@ -1962,8 +1999,21 @@ NdbEventBuffer::alloc_data()
} }
// remove data from free list // remove data from free list
m_free_data = data->m_next; if (data->m_next_blob == 0)
m_free_data = data->m_next;
else {
EventBufData* data2 = data->m_next_blob;
if (data2->m_next == 0) {
data->m_next_blob = data2->m_next_blob;
data = data2;
} else {
EventBufData* data3 = data2->m_next;
data2->m_next = data3->m_next;
data = data3;
}
}
data->m_next = 0; data->m_next = 0;
data->m_next_blob = 0;
#ifdef VM_TRACE #ifdef VM_TRACE
m_free_data_count--; m_free_data_count--;
assert(m_free_data_sz >= data->sz); assert(m_free_data_sz >= data->sz);
...@@ -1975,7 +2025,9 @@ NdbEventBuffer::alloc_data() ...@@ -1975,7 +2025,9 @@ NdbEventBuffer::alloc_data()
// allocate initial or bigger memory area in EventBufData // allocate initial or bigger memory area in EventBufData
// takes sizes from given ptr and sets up data->ptr // takes sizes from given ptr and sets up data->ptr
int int
NdbEventBuffer::alloc_mem(EventBufData* data, LinearSectionPtr ptr[3]) NdbEventBuffer::alloc_mem(EventBufData* data,
LinearSectionPtr ptr[3],
Uint32 * change_sz)
{ {
DBUG_ENTER("NdbEventBuffer::alloc_mem"); DBUG_ENTER("NdbEventBuffer::alloc_mem");
DBUG_PRINT("info", ("ptr sz %u + %u + %u", ptr[0].sz, ptr[1].sz, ptr[2].sz)); DBUG_PRINT("info", ("ptr sz %u + %u + %u", ptr[0].sz, ptr[1].sz, ptr[2].sz));
...@@ -1988,6 +2040,8 @@ NdbEventBuffer::alloc_mem(EventBufData* data, LinearSectionPtr ptr[3]) ...@@ -1988,6 +2040,8 @@ NdbEventBuffer::alloc_mem(EventBufData* data, LinearSectionPtr ptr[3])
if (data->sz < alloc_size) if (data->sz < alloc_size)
{ {
Uint32 add_sz = alloc_size - data->sz;
NdbMem_Free((char*)data->memory); NdbMem_Free((char*)data->memory);
assert(m_total_alloc >= data->sz); assert(m_total_alloc >= data->sz);
m_total_alloc -= data->sz; m_total_alloc -= data->sz;
...@@ -1999,6 +2053,9 @@ NdbEventBuffer::alloc_mem(EventBufData* data, LinearSectionPtr ptr[3]) ...@@ -1999,6 +2053,9 @@ NdbEventBuffer::alloc_mem(EventBufData* data, LinearSectionPtr ptr[3])
DBUG_RETURN(-1); DBUG_RETURN(-1);
data->sz = alloc_size; data->sz = alloc_size;
m_total_alloc += data->sz; m_total_alloc += data->sz;
if (change_sz != NULL)
*change_sz += add_sz;
} }
Uint32* memptr = data->memory; Uint32* memptr = data->memory;
...@@ -2014,14 +2071,30 @@ NdbEventBuffer::alloc_mem(EventBufData* data, LinearSectionPtr ptr[3]) ...@@ -2014,14 +2071,30 @@ NdbEventBuffer::alloc_mem(EventBufData* data, LinearSectionPtr ptr[3])
DBUG_RETURN(0); DBUG_RETURN(0);
} }
void
NdbEventBuffer::dealloc_mem(EventBufData* data,
Uint32 * change_sz)
{
NdbMem_Free((char*)data->memory);
assert(m_total_alloc >= data->sz);
m_total_alloc -= data->sz;
if (change_sz != NULL) {
assert(*change_sz >= data->sz);
*change_sz -= data->sz;
}
data->memory = 0;
data->sz = 0;
}
int int
NdbEventBuffer::copy_data(const SubTableData * const sdata, NdbEventBuffer::copy_data(const SubTableData * const sdata,
LinearSectionPtr ptr[3], LinearSectionPtr ptr[3],
EventBufData* data) EventBufData* data,
Uint32 * change_sz)
{ {
DBUG_ENTER_EVENT("NdbEventBuffer::copy_data"); DBUG_ENTER_EVENT("NdbEventBuffer::copy_data");
if (alloc_mem(data, ptr) != 0) if (alloc_mem(data, ptr, change_sz) != 0)
DBUG_RETURN_EVENT(-1); DBUG_RETURN_EVENT(-1);
memcpy(data->sdata, sdata, sizeof(SubTableData)); memcpy(data->sdata, sdata, sizeof(SubTableData));
int i; int i;
...@@ -2093,7 +2166,8 @@ copy_attr(AttributeHeader ah, ...@@ -2093,7 +2166,8 @@ copy_attr(AttributeHeader ah,
int int
NdbEventBuffer::merge_data(const SubTableData * const sdata, NdbEventBuffer::merge_data(const SubTableData * const sdata,
LinearSectionPtr ptr2[3], LinearSectionPtr ptr2[3],
EventBufData* data) EventBufData* data,
Uint32 * change_sz)
{ {
DBUG_ENTER_EVENT("NdbEventBuffer::merge_data"); DBUG_ENTER_EVENT("NdbEventBuffer::merge_data");
...@@ -2102,7 +2176,7 @@ NdbEventBuffer::merge_data(const SubTableData * const sdata, ...@@ -2102,7 +2176,7 @@ NdbEventBuffer::merge_data(const SubTableData * const sdata,
int t1 = SubTableData::getOperation(data->sdata->requestInfo); int t1 = SubTableData::getOperation(data->sdata->requestInfo);
int t2 = SubTableData::getOperation(sdata->requestInfo); int t2 = SubTableData::getOperation(sdata->requestInfo);
if (t1 == Ev_t::enum_NUL) if (t1 == Ev_t::enum_NUL)
DBUG_RETURN_EVENT(copy_data(sdata, ptr2, data)); DBUG_RETURN_EVENT(copy_data(sdata, ptr2, data, change_sz));
Ev_t* tp = 0; Ev_t* tp = 0;
int i; int i;
...@@ -2142,6 +2216,8 @@ NdbEventBuffer::merge_data(const SubTableData * const sdata, ...@@ -2142,6 +2216,8 @@ NdbEventBuffer::merge_data(const SubTableData * const sdata,
DBUG_RETURN_EVENT(0); DBUG_RETURN_EVENT(0);
} }
// TODO: use old data items, avoid malloc/free on each merge
// save old data // save old data
EventBufData olddata = *data; EventBufData olddata = *data;
data->memory = 0; data->memory = 0;
...@@ -2158,7 +2234,7 @@ NdbEventBuffer::merge_data(const SubTableData * const sdata, ...@@ -2158,7 +2234,7 @@ NdbEventBuffer::merge_data(const SubTableData * const sdata,
{ {
if (loop == 1) if (loop == 1)
{ {
if (alloc_mem(data, ptr) != 0) if (alloc_mem(data, ptr, change_sz) != 0)
{ {
result = -1; result = -1;
goto end; goto end;
...@@ -2277,11 +2353,7 @@ NdbEventBuffer::merge_data(const SubTableData * const sdata, ...@@ -2277,11 +2353,7 @@ NdbEventBuffer::merge_data(const SubTableData * const sdata,
} }
end: end:
// free old data dealloc_mem(&olddata, change_sz);
NdbMem_Free((char*)olddata.memory);
assert(m_total_alloc >= olddata.sz);
m_total_alloc -= olddata.sz;
DBUG_RETURN_EVENT(result); DBUG_RETURN_EVENT(result);
} }
...@@ -2357,7 +2429,7 @@ NdbEventBuffer::get_main_data(Gci_container* bucket, ...@@ -2357,7 +2429,7 @@ NdbEventBuffer::get_main_data(Gci_container* bucket,
SubTableData sdata = *blob_data->sdata; SubTableData sdata = *blob_data->sdata;
sdata.tableId = main_op->m_eventImpl->m_tableImpl->m_id; sdata.tableId = main_op->m_eventImpl->m_tableImpl->m_id;
SubTableData::setOperation(sdata.requestInfo, NdbDictionary::Event::_TE_NUL); SubTableData::setOperation(sdata.requestInfo, NdbDictionary::Event::_TE_NUL);
if (copy_data(&sdata, ptr, main_data) != 0) if (copy_data(&sdata, ptr, main_data, NULL) != 0)
DBUG_RETURN_EVENT(-1); DBUG_RETURN_EVENT(-1);
hpos.data = main_data; hpos.data = main_data;
...@@ -2365,7 +2437,8 @@ NdbEventBuffer::get_main_data(Gci_container* bucket, ...@@ -2365,7 +2437,8 @@ NdbEventBuffer::get_main_data(Gci_container* bucket,
} }
void void
NdbEventBuffer::add_blob_data(EventBufData* main_data, NdbEventBuffer::add_blob_data(Gci_container* bucket,
EventBufData* main_data,
EventBufData* blob_data) EventBufData* blob_data)
{ {
DBUG_ENTER_EVENT("NdbEventBuffer::add_blob_data"); DBUG_ENTER_EVENT("NdbEventBuffer::add_blob_data");
...@@ -2389,6 +2462,9 @@ NdbEventBuffer::add_blob_data(EventBufData* main_data, ...@@ -2389,6 +2462,9 @@ NdbEventBuffer::add_blob_data(EventBufData* main_data,
blob_data->m_next = head->m_next; blob_data->m_next = head->m_next;
head->m_next = blob_data; head->m_next = blob_data;
} }
// adjust data list size
bucket->m_data.m_count += 1;
bucket->m_data.m_sz += blob_data->sz;
DBUG_VOID_RETURN_EVENT; DBUG_VOID_RETURN_EVENT;
} }
...@@ -2424,6 +2500,9 @@ NdbEventBuffer::move_data() ...@@ -2424,6 +2500,9 @@ NdbEventBuffer::move_data()
void void
NdbEventBuffer::free_list(EventBufData_list &list) NdbEventBuffer::free_list(EventBufData_list &list)
{ {
#ifdef NDB_EVENT_VERIFY_SIZE
verify_size(list);
#endif
// return list to m_free_data // return list to m_free_data
list.m_tail->m_next= m_free_data; list.m_tail->m_next= m_free_data;
m_free_data= list.m_head; m_free_data= list.m_head;
...@@ -2432,38 +2511,15 @@ NdbEventBuffer::free_list(EventBufData_list &list) ...@@ -2432,38 +2511,15 @@ NdbEventBuffer::free_list(EventBufData_list &list)
#endif #endif
m_free_data_sz+= list.m_sz; m_free_data_sz+= list.m_sz;
// free blobs XXX unacceptable performance, fix later
{
EventBufData* data = list.m_head;
while (1) {
while (data->m_next_blob != NULL) {
EventBufData* blob_head = data->m_next_blob;
data->m_next_blob = blob_head->m_next_blob;
blob_head->m_next_blob = NULL;
while (blob_head != NULL) {
EventBufData* blob_part = blob_head;
blob_head = blob_head->m_next;
blob_part->m_next = m_free_data;
m_free_data = blob_part;
#ifdef VM_TRACE
m_free_data_count++;
#endif
m_free_data_sz += blob_part->sz;
}
}
if (data == list.m_tail)
break;
data = data->m_next;
}
}
// list returned to m_free_data
list.m_head = list.m_tail = NULL; list.m_head = list.m_tail = NULL;
list.m_count = list.m_sz = 0; list.m_count = list.m_sz = 0;
} }
void EventBufData_list::append_list(EventBufData_list *list, Uint64 gci) void EventBufData_list::append_list(EventBufData_list *list, Uint64 gci)
{ {
#ifdef NDB_EVENT_VERIFY_SIZE
NdbEventBuffer::verify_size(*list);
#endif
move_gci_ops(list, gci); move_gci_ops(list, gci);
if (m_tail) if (m_tail)
...@@ -2702,6 +2758,29 @@ send_report: ...@@ -2702,6 +2758,29 @@ send_report:
#endif #endif
} }
#ifdef VM_TRACE
void
NdbEventBuffer::verify_size(const EventBufData* data, Uint32 count, Uint32 sz)
{
Uint32 tmp_count = 0;
Uint32 tmp_sz = 0;
while (data != 0) {
Uint32 full_count, full_sz;
data->get_full_size(full_count, full_sz);
tmp_count += full_count;
tmp_sz += full_sz;
data = data->m_next;
}
assert(tmp_count == count);
assert(tmp_sz == sz);
}
void
NdbEventBuffer::verify_size(const EventBufData_list & list)
{
verify_size(list.m_head, list.m_count, list.m_sz);
}
#endif
// hash table routines // hash table routines
// could optimize the all-fixed case // could optimize the all-fixed case
......
...@@ -40,6 +40,12 @@ ...@@ -40,6 +40,12 @@
#define DBUG_DUMP_EVENT(A,B,C) #define DBUG_DUMP_EVENT(A,B,C)
#endif #endif
#undef NDB_EVENT_VERIFY_SIZE
#ifdef VM_TRACE
// not much effect on performance, leave on
#define NDB_EVENT_VERIFY_SIZE
#endif
class NdbEventOperationImpl; class NdbEventOperationImpl;
struct EventBufData struct EventBufData
...@@ -54,9 +60,13 @@ struct EventBufData ...@@ -54,9 +60,13 @@ struct EventBufData
/* /*
* Blobs are stored in blob list (m_next_blob) where each entry * Blobs are stored in blob list (m_next_blob) where each entry
* is list of parts (m_next) in part number order. * is list of parts (m_next). TODO order by part number
*
* Processed data (m_used_data, m_free_data) keeps the old blob
* list intact. It is reconsumed when new data items are needed.
* *
* TODO order by part no and link for fast read and free_list * Data item lists keep track of item count and sum(sz) and
* these include both main items and blob parts.
*/ */
EventBufData *m_next; // Next wrt to global order or Next blob part EventBufData *m_next; // Next wrt to global order or Next blob part
...@@ -66,14 +76,22 @@ struct EventBufData ...@@ -66,14 +76,22 @@ struct EventBufData
Uint32 m_pkhash; // PK hash (without op) for fast compare Uint32 m_pkhash; // PK hash (without op) for fast compare
EventBufData() {} EventBufData() {}
// Get blob part number from blob data // Get blob part number from blob data
Uint32 get_blob_part_no() { Uint32 get_blob_part_no() const;
assert(ptr[0].sz > 2);
Uint32 pos = AttributeHeader(ptr[0].p[0]).getDataSize() + /*
AttributeHeader(ptr[0].p[1]).getDataSize(); * Main item does not include summary of parts (space / performance
Uint32 no = ptr[1].p[pos]; * tradeoff). The summary is needed when moving single data item.
return no; * It is not needed when moving entire list.
*/
void get_full_size(Uint32 & full_count, Uint32 & full_sz) const {
full_count = 1;
full_sz = sz;
if (m_next_blob != 0)
add_part_size(full_count, full_sz);
} }
void add_part_size(Uint32 & full_count, Uint32 & full_sz) const;
}; };
class EventBufData_list class EventBufData_list
...@@ -82,19 +100,22 @@ public: ...@@ -82,19 +100,22 @@ public:
EventBufData_list(); EventBufData_list();
~EventBufData_list(); ~EventBufData_list();
void remove_first(); // remove first and return its size
// append data and insert data into Gci_op list with add_gci_op void remove_first(Uint32 & full_count, Uint32 & full_sz);
void append_data(EventBufData *data); // for remove+append avoid double call to get_full_size()
void append_used_data(EventBufData *data, Uint32 full_count, Uint32 full_sz);
// append data and insert data but ignore Gci_op list // append data and insert data but ignore Gci_op list
void append_used_data(EventBufData *data); void append_used_data(EventBufData *data);
// append data and insert data into Gci_op list with add_gci_op
void append_data(EventBufData *data);
// append list to another, will call move_gci_ops // append list to another, will call move_gci_ops
void append_list(EventBufData_list *list, Uint64 gci); void append_list(EventBufData_list *list, Uint64 gci);
int is_empty(); int is_empty();
EventBufData *m_head, *m_tail; EventBufData *m_head, *m_tail;
unsigned m_count; Uint32 m_count;
unsigned m_sz; Uint32 m_sz;
/* /*
distinct ops per gci (assume no hash needed) distinct ops per gci (assume no hash needed)
...@@ -193,33 +214,47 @@ int EventBufData_list::is_empty() ...@@ -193,33 +214,47 @@ int EventBufData_list::is_empty()
} }
inline inline
void EventBufData_list::remove_first() void EventBufData_list::remove_first(Uint32 & full_count, Uint32 & full_sz)
{ {
m_count--; m_head->get_full_size(full_count, full_sz);
m_sz-= m_head->sz; #ifdef VM_TRACE
m_head= m_head->m_next; assert(m_count >= full_count);
assert(m_sz >= full_sz);
#endif
m_count -= full_count;
m_sz -= full_sz;
m_head = m_head->m_next;
if (m_head == 0) if (m_head == 0)
m_tail= 0; m_tail = 0;
} }
inline inline
void EventBufData_list::append_used_data(EventBufData *data) void EventBufData_list::append_used_data(EventBufData *data, Uint32 full_count, Uint32 full_sz)
{ {
data->m_next= 0; data->m_next = 0;
if (m_tail) if (m_tail)
m_tail->m_next= data; m_tail->m_next = data;
else else
{ {
#ifdef VM_TRACE #ifdef VM_TRACE
assert(m_head == 0);
assert(m_count == 0); assert(m_count == 0);
assert(m_sz == 0); assert(m_sz == 0);
#endif #endif
m_head= data; m_head = data;
} }
m_tail= data; m_tail = data;
m_count++; m_count += full_count;
m_sz+= data->sz; m_sz += full_sz;
}
inline
void EventBufData_list::append_used_data(EventBufData *data)
{
Uint32 full_count, full_sz;
data->get_full_size(full_count, full_sz);
append_used_data(data, full_count, full_sz);
} }
inline inline
...@@ -442,17 +477,24 @@ public: ...@@ -442,17 +477,24 @@ public:
// routines to copy/merge events // routines to copy/merge events
EventBufData* alloc_data(); EventBufData* alloc_data();
int alloc_mem(EventBufData* data, LinearSectionPtr ptr[3]); int alloc_mem(EventBufData* data,
LinearSectionPtr ptr[3],
Uint32 * change_sz);
void dealloc_mem(EventBufData* data,
Uint32 * change_sz);
int copy_data(const SubTableData * const sdata, int copy_data(const SubTableData * const sdata,
LinearSectionPtr ptr[3], LinearSectionPtr ptr[3],
EventBufData* data); EventBufData* data,
Uint32 * change_sz);
int merge_data(const SubTableData * const sdata, int merge_data(const SubTableData * const sdata,
LinearSectionPtr ptr[3], LinearSectionPtr ptr[3],
EventBufData* data); EventBufData* data,
Uint32 * change_sz);
int get_main_data(Gci_container* bucket, int get_main_data(Gci_container* bucket,
EventBufData_hash::Pos& hpos, EventBufData_hash::Pos& hpos,
EventBufData* blob_data); EventBufData* blob_data);
void add_blob_data(EventBufData* main_data, void add_blob_data(Gci_container* bucket,
EventBufData* main_data,
EventBufData* blob_data); EventBufData* blob_data);
void free_list(EventBufData_list &list); void free_list(EventBufData_list &list);
...@@ -478,9 +520,9 @@ public: ...@@ -478,9 +520,9 @@ public:
Gci_container m_complete_data; Gci_container m_complete_data;
EventBufData *m_free_data; EventBufData *m_free_data;
#ifdef VM_TRACE #ifdef VM_TRACE
unsigned m_free_data_count; Uint32 m_free_data_count;
#endif #endif
unsigned m_free_data_sz; Uint32 m_free_data_sz;
// user thread // user thread
EventBufData_list m_available_data; EventBufData_list m_available_data;
...@@ -493,6 +535,12 @@ public: ...@@ -493,6 +535,12 @@ public:
unsigned m_gci_slip_thresh; unsigned m_gci_slip_thresh;
NdbError m_error; NdbError m_error;
#ifdef VM_TRACE
static void verify_size(const EventBufData* data, Uint32 count, Uint32 sz);
static void verify_size(const EventBufData_list & list);
#endif
private: private:
int expand(unsigned sz); int expand(unsigned sz);
......
...@@ -161,6 +161,7 @@ static void ...@@ -161,6 +161,7 @@ static void
errdb() errdb()
{ {
uint any = 0; uint any = 0;
// g_ncc return no error...
if (g_ndb != 0) { if (g_ndb != 0) {
const NdbError& e = g_ndb->getNdbError(); const NdbError& e = g_ndb->getNdbError();
if (e.code != 0) if (e.code != 0)
...@@ -359,9 +360,9 @@ createtable(Tab& t) ...@@ -359,9 +360,9 @@ createtable(Tab& t)
} }
static int static int
createtable() createtables()
{ {
ll1("createtable"); ll1("createtables");
for (uint i = 0; i < maxtab(); i++) for (uint i = 0; i < maxtab(); i++)
chkrc(createtable(tab(i)) == 0); chkrc(createtable(tab(i)) == 0);
return 0; return 0;
...@@ -381,9 +382,9 @@ droptable(Tab& t) ...@@ -381,9 +382,9 @@ droptable(Tab& t)
} }
static int static int
droptable() droptables()
{ {
ll1("droptable"); ll1("droptables");
for (uint i = 0; i < maxtab(); i++) for (uint i = 0; i < maxtab(); i++)
chkrc(droptable(tab(i)) == 0); chkrc(droptable(tab(i)) == 0);
return 0; return 0;
...@@ -419,9 +420,9 @@ createevent(Tab& t) ...@@ -419,9 +420,9 @@ createevent(Tab& t)
} }
static int static int
createevent() createevents()
{ {
ll1("createevent"); ll1("createevents");
for (uint i = 0; i < maxtab(); i++) for (uint i = 0; i < maxtab(); i++)
chkrc(createevent(tab(i)) == 0); chkrc(createevent(tab(i)) == 0);
return 0; return 0;
...@@ -439,11 +440,14 @@ dropevent(Tab& t, bool force = false) ...@@ -439,11 +440,14 @@ dropevent(Tab& t, bool force = false)
} }
static int static int
dropevent(bool force = false) dropevents(bool force = false)
{ {
ll1("dropevent"); ll1("dropevents");
for (uint i = 0; i < maxtab(); i++) for (uint i = 0; i < maxtab(); i++) {
if (force && g_tablst[i] == 0)
continue;
chkrc(dropevent(tab(i), force) == 0 || force); chkrc(dropevent(tab(i), force) == 0 || force);
}
return 0; return 0;
} }
...@@ -1173,8 +1177,11 @@ static int ...@@ -1173,8 +1177,11 @@ static int
dropeventops(bool force = false) dropeventops(bool force = false)
{ {
ll1("dropeventops"); ll1("dropeventops");
for (uint i = 0; i < maxrun(); i++) for (uint i = 0; i < maxrun(); i++) {
if (force && g_runlst[i] == 0)
continue;
chkrc(dropeventop(run(i), force) == 0 || force); chkrc(dropeventop(run(i), force) == 0 || force);
}
return 0; return 0;
} }
...@@ -2139,8 +2146,8 @@ runtest() ...@@ -2139,8 +2146,8 @@ runtest()
{ {
setseed(-1); setseed(-1);
initrun(); initrun();
chkrc(createtable() == 0); chkrc(createtables() == 0);
chkrc(createevent() == 0); chkrc(createevents() == 0);
for (g_loop = 0; g_opts.loop == 0 || g_loop < g_opts.loop; g_loop++) { for (g_loop = 0; g_opts.loop == 0 || g_loop < g_opts.loop; g_loop++) {
ll0("=== loop " << g_loop << " ==="); ll0("=== loop " << g_loop << " ===");
setseed(g_loop); setseed(g_loop);
...@@ -2164,8 +2171,8 @@ runtest() ...@@ -2164,8 +2171,8 @@ runtest()
// time erases everything.. // time erases everything..
chkrc(waitgci(1) == 0); chkrc(waitgci(1) == 0);
} }
chkrc(dropevent() == 0); chkrc(dropevents() == 0);
chkrc(droptable() == 0); chkrc(droptables() == 0);
resetmem(); resetmem();
deleteops(); deleteops();
return 0; return 0;
...@@ -2287,6 +2294,16 @@ checkopts() ...@@ -2287,6 +2294,16 @@ checkopts()
return 0; return 0;
} }
static int
doconnect()
{
g_ncc = new Ndb_cluster_connection();
chkdb(g_ncc->connect(30) == 0);
g_ndb = new Ndb(g_ncc, "TEST_DB");
chkdb(g_ndb->init() == 0 && g_ndb->waitUntilReady(30) == 0);
return 0;
}
int int
main(int argc, char** argv) main(int argc, char** argv)
{ {
...@@ -2302,19 +2319,13 @@ main(int argc, char** argv) ...@@ -2302,19 +2319,13 @@ main(int argc, char** argv)
ret = handle_options(&argc, &argv, my_long_options, ndb_std_get_one_option); ret = handle_options(&argc, &argv, my_long_options, ndb_std_get_one_option);
if (ret != 0 || argc != 0 || checkopts() != 0) if (ret != 0 || argc != 0 || checkopts() != 0)
return NDBT_ProgramExit(NDBT_WRONGARGS); return NDBT_ProgramExit(NDBT_WRONGARGS);
g_ncc = new Ndb_cluster_connection(); if (doconnect() == 0 && runtest() == 0) {
if (g_ncc->connect(30) == 0) { delete g_ndb;
g_ndb = new Ndb(g_ncc, "TEST_DB"); delete g_ncc;
if (g_ndb->init() == 0 && g_ndb->waitUntilReady(30) == 0) { return NDBT_ProgramExit(NDBT_OK);
if (runtest() == 0) {
delete g_ndb;
delete g_ncc;
return NDBT_ProgramExit(NDBT_OK);
}
}
} }
dropeventops(true); dropeventops(true);
dropevent(true); dropevents(true);
delete g_ndb; delete g_ndb;
delete g_ncc; delete g_ncc;
return NDBT_ProgramExit(NDBT_FAILED); return NDBT_ProgramExit(NDBT_FAILED);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment