Commit 01d8c667 authored by ingo@mysql.com's avatar ingo@mysql.com

Merge mysql.com:/home/mydev/mysql-5.1

into  mysql.com:/home/mydev/mysql-5.1-wl1563
parents a1dd235f 582488e8
......@@ -21,7 +21,7 @@ INITIAL_SIZE 16M
UNDO_BUFFER_SIZE = 1M
ENGINE=NDB;
--error 1502
--error ER_CREATE_TABLESPACE_FAILED
CREATE LOGFILE GROUP lg1
ADD UNDOFILE 'undofile.dat'
INITIAL_SIZE 16M
......@@ -32,7 +32,7 @@ ALTER LOGFILE GROUP lg1
ADD UNDOFILE 'undofile02.dat'
INITIAL_SIZE 4M ENGINE NDB;
--error 1507
--error ER_ALTER_TABLESPACE_FAILED
ALTER LOGFILE GROUP lg1
ADD UNDOFILE 'undofile02.dat'
INITIAL_SIZE 4M ENGINE=NDB;
......@@ -43,20 +43,20 @@ USE LOGFILE GROUP lg1
INITIAL_SIZE 12M
ENGINE NDB;
--error 1502 # Bug 16158
--error ER_CREATE_TABLESPACE_FAILED
CREATE TABLESPACE ts1
ADD DATAFILE 'datafile.dat'
USE LOGFILE GROUP lg1
INITIAL_SIZE 12M
ENGINE NDB;
# Currently a bug, bug#16158
ALTER TABLESPACE ts1
ADD DATAFILE 'datafile2.dat'
INITIAL_SIZE 12M
ENGINE=NDB;
--error 1507 # Currently a bug, bug#16158
--error ER_ALTER_TABLESPACE_FAILED
ALTER TABLESPACE ts1
ADD DATAFILE 'datafile2.dat'
INITIAL_SIZE 12M
......@@ -67,7 +67,7 @@ CREATE TABLE t1
tablespace ts1 storage disk
engine ndb;
--error 1050
--error ER_TABLE_EXISTS_ERROR
CREATE TABLE t1
(pk1 int not null primary key, b int not null, c int not null)
tablespace ts1 storage disk
......@@ -79,7 +79,7 @@ ALTER TABLESPACE ts1
DROP DATAFILE 'datafile2.dat'
ENGINE=NDB;
--error 1507
--error ER_ALTER_TABLESPACE_FAILED
ALTER TABLESPACE ts1
DROP DATAFILE 'datafile2.dat'
ENGINE=NDB;
......@@ -88,7 +88,7 @@ ALTER TABLESPACE ts1
DROP DATAFILE 'datafile.dat'
ENGINE=NDB;
--error 1507
--error ER_ALTER_TABLESPACE_FAILED
ALTER TABLESPACE ts1
DROP DATAFILE 'datafile.dat'
ENGINE=NDB;
......@@ -96,14 +96,14 @@ ENGINE=NDB;
DROP TABLESPACE ts1
ENGINE=NDB;
--error 1503
--error ER_DROP_TABLESPACE_FAILED
DROP TABLESPACE ts1
ENGINE=NDB;
DROP LOGFILE GROUP lg1
ENGINE=NDB;
--error 1503
--error ER_DROP_TABLESPACE_FAILED
DROP LOGFILE GROUP lg1
ENGINE=NDB;
--echo **** End Duplicate Statement Testing ****
......
......@@ -183,13 +183,20 @@ event_executor_main(void *arg)
// needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
my_thread_init();
if (sizeof(my_time_t) != sizeof(time_t))
{
sql_print_error("sizeof(my_time_t) != sizeof(time_t) ."
"The scheduler will not work correctly. Stopping.");
goto err_no_thd;
}
//TODO Andrey: Check for NULL
if (!(thd = new THD)) // note that contructor of THD uses DBUG_ !
{
sql_print_error("Cannot create THD for event_executor_main");
goto err_no_thd;
}
}
thd->thread_stack = (char*)&thd; // remember where our stack is
pthread_detach_this_thread();
......@@ -275,7 +282,7 @@ event_executor_main(void *arg)
}
DBUG_PRINT("evex main thread",("computing time to sleep till next exec"));
time(&now);
time((time_t *)&now);
my_tz_UTC->gmt_sec_to_TIME(&time_now, now);
t2sleep= evex_time_diff(&et->execute_at, &time_now);
VOID(pthread_mutex_unlock(&LOCK_event_arrays));
......
......@@ -537,7 +537,7 @@ event_timed::compute_next_execution_time()
}
goto ret;
}
time(&now);
time((time_t *)&now);
my_tz_UTC->gmt_sec_to_TIME(&time_now, now);
/*
sql_print_information("[%s.%s]", dbname.str, name.str);
......@@ -703,7 +703,7 @@ event_timed::mark_last_executed()
TIME time_now;
my_time_t now;
time(&now);
time((time_t *)&now);
my_tz_UTC->gmt_sec_to_TIME(&time_now, now);
last_executed= time_now; // was execute_at
......
......@@ -1051,7 +1051,8 @@ public:
_TE_CREATE=6,
_TE_GCP_COMPLETE=7,
_TE_CLUSTER_FAILURE=8,
_TE_STOP=9
_TE_STOP=9,
_TE_NUL=10 // internal (INS o DEL within same GCI)
};
#endif
/**
......
......@@ -93,6 +93,12 @@ public:
* Retrieve current state of the NdbEventOperation object
*/
State getState();
/**
* By default events on same NdbEventOperation within same GCI
* are merged into a single event. This can be changed with
* separateEvents(true).
*/
void separateEvents(bool flag);
/**
* Activates the NdbEventOperation to start receiving events. The
......
TARGET = ndbapi_event
SRCS = ndbapi_event.cpp
OBJS = ndbapi_event.o
CXX = g++
CXX = g++ -g
CFLAGS = -c -Wall -fno-rtti -fno-exceptions
CXXFLAGS =
DEBUG =
......@@ -17,7 +17,7 @@ $(TARGET): $(OBJS)
$(CXX) $(CXXFLAGS) $(LFLAGS) $(LIB_DIR) $(OBJS) -lndbclient -lmysqlclient_r -lmysys -lmystrings -lz $(SYS_LIB) -o $(TARGET)
$(TARGET).o: $(SRCS)
$(CXX) $(CFLAGS) -I$(INCLUDE_DIR) -I$(INCLUDE_DIR)/ndbapi $(SRCS)
$(CXX) $(CFLAGS) -I$(INCLUDE_DIR) -I$(INCLUDE_DIR)/ndbapi -I$(TOP_SRCDIR)/include $(SRCS)
clean:
rm -f *.o $(TARGET)
......@@ -58,24 +58,29 @@
/**
*
* Assume that there is a table TAB0 which is being updated by
* Assume that there is a table t0 which is being updated by
* another process (e.g. flexBench -l 0 -stdtables).
* We want to monitor what happens with columns COL0, COL2, COL11
* We want to monitor what happens with columns c0,c1,c2,c3.
*
* or together with the mysql client;
*
* shell> mysql -u root
* mysql> create database TEST_DB;
* mysql> use TEST_DB;
* mysql> create table TAB0 (COL0 int primary key, COL1 int, COL11 int) engine=ndb;
* mysql> create table t0 (c0 int, c1 int, c2 char(4), c3 char(4),
* primary key(c0, c2)) engine ndb charset latin1;
*
* In another window start ndbapi_event, wait until properly started
*
insert into TAB0 values (1,2,3);
insert into TAB0 values (2,2,3);
insert into TAB0 values (3,2,9);
update TAB0 set COL1=10 where COL0=1;
delete from TAB0 where COL0=1;
insert into t0 values (1, 2, 'a', 'b');
insert into t0 values (3, 4, 'c', 'd');
update t0 set c3 = 'e' where c0 = 1 and c2 = 'a'; -- use pk
update t0 set c3 = 'f'; -- use scan
update t0 set c3 = 'F'; -- use scan update to 'same'
update t0 set c2 = 'g' where c0 = 1; -- update pk part
update t0 set c2 = 'G' where c0 = 1; -- update pk part to 'same'
update t0 set c0 = 5, c2 = 'H' where c0 = 3; -- update full PK
delete from t0;
*
* you should see the data popping up in the example window
*
......@@ -92,9 +97,10 @@ int myCreateEvent(Ndb* myNdb,
const char **eventColumnName,
const int noEventColumnName);
int main()
int main(int argc, char** argv)
{
ndb_init();
bool sep = argc > 1 && strcmp(argv[1], "-s") == 0;
Ndb_cluster_connection *cluster_connection=
new Ndb_cluster_connection(); // Object representing the cluster
......@@ -126,13 +132,15 @@ int main()
if (myNdb->init() == -1) APIERROR(myNdb->getNdbError());
const char *eventName= "CHNG_IN_TAB0";
const char *eventTableName= "TAB0";
const int noEventColumnName= 3;
const char *eventName= "CHNG_IN_t0";
const char *eventTableName= "t0";
const int noEventColumnName= 4;
const char *eventColumnName[noEventColumnName]=
{"COL0",
"COL1",
"COL11"};
{"c0",
"c1",
"c2",
"c3"
};
// Create events
myCreateEvent(myNdb,
......@@ -142,13 +150,14 @@ int main()
noEventColumnName);
int j= 0;
while (j < 5) {
while (j < 99) {
// Start "transaction" for handling events
NdbEventOperation* op;
printf("create EventOperation\n");
if ((op = myNdb->createEventOperation(eventName)) == NULL)
APIERROR(myNdb->getNdbError());
op->separateEvents(sep);
printf("get values\n");
NdbRecAttr* recAttr[noEventColumnName];
......@@ -175,34 +184,45 @@ int main()
i++;
switch (op->getEventType()) {
case NdbDictionary::Event::TE_INSERT:
printf("%u INSERT: ", i);
printf("%u INSERT", i);
break;
case NdbDictionary::Event::TE_DELETE:
printf("%u DELETE: ", i);
printf("%u DELETE", i);
break;
case NdbDictionary::Event::TE_UPDATE:
printf("%u UPDATE: ", i);
printf("%u UPDATE", i);
break;
default:
abort(); // should not happen
}
for (int i = 1; i < noEventColumnName; i++) {
printf(" gci=%d\n", op->getGCI());
printf("post: ");
for (int i = 0; i < noEventColumnName; i++) {
if (recAttr[i]->isNULL() >= 0) { // we have a value
printf(" post[%u]=", i);
if (recAttr[i]->isNULL() == 0) // we have a non-null value
printf("%u", recAttr[i]->u_32_value());
else // we have a null value
printf("NULL");
}
if (recAttr[i]->isNULL() == 0) { // we have a non-null value
if (i < 2)
printf("%-5u", recAttr[i]->u_32_value());
else
printf("%-5.4s", recAttr[i]->aRef());
} else // we have a null value
printf("%-5s", "NULL");
} else
printf("%-5s", "-");
}
printf("\npre : ");
for (int i = 0; i < noEventColumnName; i++) {
if (recAttrPre[i]->isNULL() >= 0) { // we have a value
printf(" pre[%u]=", i);
if (recAttrPre[i]->isNULL() == 0) // we have a non-null value
printf("%u", recAttrPre[i]->u_32_value());
else // we have a null value
printf("NULL");
}
if (recAttrPre[i]->isNULL() == 0) { // we have a non-null value
if (i < 2)
printf("%-5u", recAttrPre[i]->u_32_value());
else
printf("%-5.4s", recAttrPre[i]->aRef());
} else // we have a null value
printf("%-5s", "NULL");
} else
printf("%-5s", "-");
}
printf("\n");
printf("\n");
}
} else
;//printf("timed out\n");
......
......@@ -132,7 +132,7 @@ void Dbtup::updatePackedList(Signal* signal, Uint16 hostId)
void Dbtup::sendReadAttrinfo(Signal* signal,
KeyReqStruct *req_struct,
Uint32 ToutBufIndex,
const Operationrec * const regOperPtr)
const Operationrec *regOperPtr)
{
if(ToutBufIndex == 0)
return;
......
......@@ -242,8 +242,8 @@ Dbtup::alloc_page(Tablerec* tabPtrP, Fragrecord* fragPtrP,
}
Uint32*
Dbtup::alloc_fix_rowid(Fragrecord* const regFragPtr,
Tablerec* const regTabPtr,
Dbtup::alloc_fix_rowid(Fragrecord* regFragPtr,
Tablerec* regTabPtr,
Local_key* key,
Uint32 * out_frag_page_id)
{
......
......@@ -89,7 +89,7 @@
//
// The full page range struct
Uint32 Dbtup::getEmptyPage(Fragrecord* const regFragPtr)
Uint32 Dbtup::getEmptyPage(Fragrecord* regFragPtr)
{
Uint32 pageId = regFragPtr->emptyPrimPage.firstItem;
if (pageId == RNIL) {
......@@ -108,7 +108,7 @@ Uint32 Dbtup::getEmptyPage(Fragrecord* const regFragPtr)
return pageId;
}//Dbtup::getEmptyPage()
Uint32 Dbtup::getRealpid(Fragrecord* const regFragPtr, Uint32 logicalPageId)
Uint32 Dbtup::getRealpid(Fragrecord* regFragPtr, Uint32 logicalPageId)
{
PageRangePtr grpPageRangePtr;
Uint32 loopLimit;
......@@ -241,7 +241,7 @@ bool Dbtup::insertPageRangeTab(Fragrecord* const regFragPtr,
}//Dbtup::insertPageRangeTab()
void Dbtup::releaseFragPages(Fragrecord* const regFragPtr)
void Dbtup::releaseFragPages(Fragrecord* regFragPtr)
{
if (regFragPtr->rootPageRange == RNIL) {
ljam();
......@@ -349,7 +349,7 @@ void Dbtup::initFragRange(Fragrecord* const regFragPtr)
regFragPtr->nextStartRange = 0;
}//initFragRange()
Uint32 Dbtup::allocFragPages(Fragrecord* const regFragPtr, Uint32 tafpNoAllocRequested)
Uint32 Dbtup::allocFragPages(Fragrecord* regFragPtr, Uint32 tafpNoAllocRequested)
{
Uint32 tafpPagesAllocated = 0;
while (true) {
......
......@@ -28,7 +28,7 @@
#define ljamEntry() { jamEntryLine(3000 + __LINE__); }
void
Dbtup::setUpQueryRoutines(Tablerec* const regTabPtr)
Dbtup::setUpQueryRoutines(Tablerec *regTabPtr)
{
Uint32 startDescriptor= regTabPtr->tabDescriptor;
ndbrequire((startDescriptor + (regTabPtr->m_no_of_attributes << ZAD_LOG_SIZE))
......
......@@ -356,8 +356,8 @@ Dbtup::dropTrigger(Tablerec* table, const DropTrigReq* req)
/* ---------------------------------------------------------------- */
void
Dbtup::checkImmediateTriggersAfterInsert(KeyReqStruct *req_struct,
Operationrec* const regOperPtr,
Tablerec* const regTablePtr)
Operationrec *regOperPtr,
Tablerec *regTablePtr)
{
if(refToBlock(req_struct->TC_ref) != DBTC) {
return;
......@@ -374,8 +374,8 @@ Dbtup::checkImmediateTriggersAfterInsert(KeyReqStruct *req_struct,
void
Dbtup::checkImmediateTriggersAfterUpdate(KeyReqStruct *req_struct,
Operationrec* const regOperPtr,
Tablerec* const regTablePtr)
Operationrec* regOperPtr,
Tablerec* regTablePtr)
{
if(refToBlock(req_struct->TC_ref) != DBTC) {
return;
......@@ -399,8 +399,8 @@ Dbtup::checkImmediateTriggersAfterUpdate(KeyReqStruct *req_struct,
void
Dbtup::checkImmediateTriggersAfterDelete(KeyReqStruct *req_struct,
Operationrec* const regOperPtr,
Tablerec* const regTablePtr)
Operationrec* regOperPtr,
Tablerec* regTablePtr)
{
if(refToBlock(req_struct->TC_ref) != DBTC) {
return;
......@@ -444,8 +444,8 @@ void Dbtup::checkDeferredTriggers(Signal* signal,
/* */
/* ---------------------------------------------------------------- */
void Dbtup::checkDetachedTriggers(KeyReqStruct *req_struct,
Operationrec* const regOperPtr,
Tablerec* const regTablePtr)
Operationrec* regOperPtr,
Tablerec* regTablePtr)
{
Uint32 save_type = regOperPtr->op_struct.op_type;
Tuple_header *save_ptr = req_struct->m_tuple_ptr;
......@@ -1049,9 +1049,9 @@ void Dbtup::sendFireTrigOrd(Signal* signal,
int
Dbtup::executeTuxInsertTriggers(Signal* signal,
Operationrec* const regOperPtr,
Fragrecord* const regFragPtr,
Tablerec* const regTabPtr)
Operationrec* regOperPtr,
Fragrecord* regFragPtr,
Tablerec* regTabPtr)
{
TuxMaintReq* const req = (TuxMaintReq*)signal->getDataPtrSend();
// fill in constant part
......@@ -1066,9 +1066,9 @@ Dbtup::executeTuxInsertTriggers(Signal* signal,
int
Dbtup::executeTuxUpdateTriggers(Signal* signal,
Operationrec* const regOperPtr,
Fragrecord* const regFragPtr,
Tablerec* const regTabPtr)
Operationrec* regOperPtr,
Fragrecord* regFragPtr,
Tablerec* regTabPtr)
{
TuxMaintReq* const req = (TuxMaintReq*)signal->getDataPtrSend();
// fill in constant part
......@@ -1139,8 +1139,8 @@ Dbtup::executeTuxDeleteTriggers(Signal* signal,
void
Dbtup::executeTuxCommitTriggers(Signal* signal,
Operationrec* regOperPtr,
Fragrecord* const regFragPtr,
Tablerec* const regTabPtr)
Fragrecord* regFragPtr,
Tablerec* regTabPtr)
{
TuxMaintReq* const req = (TuxMaintReq*)signal->getDataPtrSend();
Uint32 tupVersion;
......@@ -1174,8 +1174,8 @@ Dbtup::executeTuxCommitTriggers(Signal* signal,
void
Dbtup::executeTuxAbortTriggers(Signal* signal,
Operationrec* regOperPtr,
Fragrecord* const regFragPtr,
Tablerec* const regTabPtr)
Fragrecord* regFragPtr,
Tablerec* regTabPtr)
{
TuxMaintReq* const req = (TuxMaintReq*)signal->getDataPtrSend();
// get version
......
......@@ -83,6 +83,8 @@ Ndbd_mem_manager::init(Uint32 pages)
release(start+1, end - 1 - start);
}
return 0;
}
void
......
......@@ -38,6 +38,11 @@ NdbEventOperation::State NdbEventOperation::getState()
return m_impl.getState();
}
void NdbEventOperation::separateEvents(bool flag)
{
m_impl.m_separateEvents = flag;
}
NdbRecAttr *
NdbEventOperation::getValue(const char *colName, char *aValue)
{
......
......@@ -104,6 +104,8 @@ NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &N,
m_state= EO_CREATED;
m_separateEvents = false;
m_has_error= 0;
DBUG_PRINT("exit",("this: 0x%x oid: %u", this, m_oid));
......@@ -693,6 +695,21 @@ NdbEventBuffer::pollEvents(int aMillisecondNumber, Uint64 *latestGCI)
return ret;
}
#ifdef VM_TRACE
static void
print_std(const char* tag, const SubTableData * sdata, LinearSectionPtr ptr[3])
{
printf("%s\n", tag);
printf("addr=%p gci=%d op=%d\n", (void*)sdata, sdata->gci, sdata->operation);
for (int i = 0; i <= 2; i++) {
printf("sec=%d addr=%p sz=%d\n", i, (void*)ptr[i].p, ptr[i].sz);
for (int j = 0; j < ptr[i].sz; j++)
printf("%08x ", ptr[i].p[j]);
printf("\n");
}
}
#endif
NdbEventOperation *
NdbEventBuffer::nextEvent()
{
......@@ -734,6 +751,10 @@ NdbEventBuffer::nextEvent()
op->m_data_done_count++;
#endif
// NUL event is not returned
if (data->sdata->operation == NdbDictionary::Event::_TE_NUL)
continue;
int r= op->receive_event();
if (r > 0)
{
......@@ -1099,13 +1120,15 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
DBUG_ENTER("NdbEventBuffer::insertDataL");
Uint64 gci= sdata->gci;
EventBufData *data= m_free_data;
if ( likely((Uint32)op->mi_type & 1 << (Uint32)sdata->operation) )
{
Gci_container* bucket= find_bucket(&m_active_gci, gci);
DBUG_PRINT("info", ("data insertion in eventId %d", op->m_eventId));
DBUG_PRINT("info", ("gci=%d tab=%d op=%d node=%d",
sdata->gci, sdata->tableId, sdata->operation,
sdata->req_nodeid));
if (unlikely(bucket == 0))
{
......@@ -1116,61 +1139,65 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
DBUG_RETURN(0);
}
if (unlikely(data == 0))
bool use_hash =
! op->m_separateEvents &&
sdata->operation < NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT;
// find position in bucket hash table
EventBufData* data = 0;
EventBufData_hash::Pos hpos;
if (use_hash)
{
#ifdef VM_TRACE
assert(m_free_data_count == 0);
assert(m_free_data_sz == 0);
#endif
expand(4000);
reportStatus();
bucket->m_data_hash.search(hpos, op, ptr);
data = hpos.data;
}
data= m_free_data;
if (data == 0)
{
// allocate new result buffer
data = alloc_data();
if (unlikely(data == 0))
{
#ifdef VM_TRACE
printf("m_latest_command: %s\n", m_latest_command);
printf("no free data, m_latestGCI %lld\n",
m_latestGCI);
printf("m_free_data_count %d\n", m_free_data_count);
printf("m_available_data_count %d first gci %d last gci %d\n",
m_available_data.m_count,
m_available_data.m_head ? m_available_data.m_head->sdata->gci : 0,
m_available_data.m_tail ? m_available_data.m_tail->sdata->gci : 0);
printf("m_used_data_count %d\n", m_used_data.m_count);
#endif
op->m_has_error= 2;
DBUG_RETURN(-1); // TODO handle this, overrun, or, skip?
op->m_has_error = 2;
DBUG_RETURN(-1);
}
}
// remove data from free list
m_free_data= data->m_next;
if (unlikely(copy_data(sdata, ptr, data)))
{
op->m_has_error = 3;
DBUG_RETURN(-1);
}
// add it to list and hash table
bucket->m_data.append(data);
if (use_hash)
{
bucket->m_data_hash.append(hpos, data);
}
#ifdef VM_TRACE
m_free_data_count--;
assert(m_free_data_sz >= data->sz);
op->m_data_count++;
#endif
m_free_data_sz-= data->sz;
if (unlikely(copy_data_alloc(sdata, ptr, data)))
}
else
{
op->m_has_error= 3;
DBUG_RETURN(-1);
// event with same op, PK found, merge into old buffer
if (unlikely(merge_data(sdata, ptr, data)))
{
op->m_has_error = 3;
DBUG_RETURN(-1);
}
}
data->m_event_op = op;
if (use_hash)
{
data->m_pkhash = hpos.pkhash;
}
// add it to received data
bucket->m_data.append(data);
data->m_event_op= op;
#ifdef VM_TRACE
op->m_data_count++;
#endif
DBUG_RETURN(0);
}
#ifdef VM_TRACE
if ((Uint32)op->m_eventImpl->mi_type & 1 << (Uint32)sdata->operation)
{
// XXX never reached
DBUG_PRINT("info",("Data arrived before ready eventId", op->m_eventId));
DBUG_RETURN(0);
}
......@@ -1183,80 +1210,325 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
#endif
}
int
NdbEventBuffer::copy_data_alloc(const SubTableData * const f_sdata,
LinearSectionPtr f_ptr[3],
EventBufData *ev_buf)
// allocate EventBufData
EventBufData*
NdbEventBuffer::alloc_data()
{
DBUG_ENTER("NdbEventBuffer::copy_data_alloc");
const unsigned min_alloc_size= 128;
const unsigned sz4= (sizeof(SubTableData)+3)>>2;
Uint32 f_ptr_sz_0= f_ptr[0].sz;
Uint32 f_ptr_sz_1= f_ptr[1].sz;
Uint32 f_ptr_sz_2= f_ptr[2].sz;
LinearSectionPtr *t_ptr= ev_buf->ptr;
SubTableData *sdata= ev_buf->sdata;
const unsigned alloc_size= (sz4 +
f_ptr_sz_0 +
f_ptr_sz_1 +
f_ptr_sz_2) * sizeof(Uint32);
Uint32 *ptr;
if (alloc_size > min_alloc_size)
DBUG_ENTER("alloc_data");
EventBufData* data = m_free_data;
if (unlikely(data == 0))
{
if (sdata)
#ifdef VM_TRACE
assert(m_free_data_count == 0);
assert(m_free_data_sz == 0);
#endif
expand(4000);
reportStatus();
data = m_free_data;
if (unlikely(data == 0))
{
NdbMem_Free((char*)sdata);
#ifdef VM_TRACE
assert(m_total_alloc >= ev_buf->sz);
printf("m_latest_command: %s\n", m_latest_command);
printf("no free data, m_latestGCI %lld\n",
m_latestGCI);
printf("m_free_data_count %d\n", m_free_data_count);
printf("m_available_data_count %d first gci %d last gci %d\n",
m_available_data.m_count,
m_available_data.m_head ? m_available_data.m_head->sdata->gci : 0,
m_available_data.m_tail ? m_available_data.m_tail->sdata->gci : 0);
printf("m_used_data_count %d\n", m_used_data.m_count);
#endif
m_total_alloc-= ev_buf->sz;
DBUG_RETURN(0); // TODO handle this, overrun, or, skip?
}
ptr= (Uint32*)NdbMem_Allocate(alloc_size);
ev_buf->sdata= (SubTableData *)ptr;
ev_buf->sz= alloc_size;
m_total_alloc+= alloc_size;
}
else /* alloc_size <= min_alloc_size */
// remove data from free list
m_free_data = data->m_next;
data->m_next = 0;
#ifdef VM_TRACE
m_free_data_count--;
assert(m_free_data_sz >= data->sz);
#endif
m_free_data_sz -= data->sz;
DBUG_RETURN(data);
}
// allocate initial or bigger memory area in EventBufData
// takes sizes from given ptr and sets up data->ptr
int
NdbEventBuffer::alloc_mem(EventBufData* data, LinearSectionPtr ptr[3])
{
const Uint32 min_alloc_size = 128;
Uint32 sz4 = (sizeof(SubTableData) + 3) >> 2;
Uint32 alloc_size = (sz4 + ptr[0].sz + ptr[1].sz + ptr[2].sz) << 2;
if (alloc_size < min_alloc_size)
alloc_size = min_alloc_size;
if (data->sz < alloc_size)
{
NdbMem_Free((char*)data->memory);
assert(m_total_alloc >= data->sz);
m_total_alloc -= data->sz;
data->memory = 0;
data->sz = 0;
data->memory = (Uint32*)NdbMem_Allocate(alloc_size);
if (data->memory == 0)
return -1;
data->sz = alloc_size;
m_total_alloc += data->sz;
}
Uint32* memptr = data->memory;
memptr += sz4;
int i;
for (i = 0; i <= 2; i++)
{
if (sdata)
ptr= (Uint32*)sdata;
else
{
ptr= (Uint32*)NdbMem_Allocate(min_alloc_size);
ev_buf->sdata= (SubTableData *)ptr;
ev_buf->sz= min_alloc_size;
m_total_alloc+= min_alloc_size;
}
data->ptr[i].p = memptr;
data->ptr[i].sz = ptr[i].sz;
memptr += ptr[i].sz;
}
memcpy(ptr,f_sdata,sizeof(SubTableData));
ptr+= sz4;
return 0;
}
int
NdbEventBuffer::copy_data(const SubTableData * const sdata,
LinearSectionPtr ptr[3],
EventBufData* data)
{
DBUG_ENTER("NdbEventBuffer::copy_data");
t_ptr->p= ptr;
t_ptr->sz= f_ptr_sz_0;
if (alloc_mem(data, ptr) != 0)
DBUG_RETURN(-1);
memcpy(data->sdata, sdata, sizeof(SubTableData));
int i;
for (i = 0; i <= 2; i++)
memcpy(data->ptr[i].p, ptr[i].p, ptr[i].sz << 2);
DBUG_RETURN(0);
}
memcpy(ptr, f_ptr[0].p, sizeof(Uint32)*f_ptr_sz_0);
ptr+= f_ptr_sz_0;
t_ptr++;
static struct Ev_t {
enum {
INS = NdbDictionary::Event::_TE_INSERT,
DEL = NdbDictionary::Event::_TE_DELETE,
UPD = NdbDictionary::Event::_TE_UPDATE,
NUL = NdbDictionary::Event::_TE_NUL,
ERR = 255
};
int t1, t2, t3;
} ev_t[] = {
{ Ev_t::INS, Ev_t::INS, Ev_t::ERR },
{ Ev_t::INS, Ev_t::DEL, Ev_t::NUL }, //ok
{ Ev_t::INS, Ev_t::UPD, Ev_t::INS }, //ok
{ Ev_t::DEL, Ev_t::INS, Ev_t::UPD }, //ok
{ Ev_t::DEL, Ev_t::DEL, Ev_t::ERR },
{ Ev_t::DEL, Ev_t::UPD, Ev_t::ERR },
{ Ev_t::UPD, Ev_t::INS, Ev_t::ERR },
{ Ev_t::UPD, Ev_t::DEL, Ev_t::DEL }, //ok
{ Ev_t::UPD, Ev_t::UPD, Ev_t::UPD } //ok
};
t_ptr->p= ptr;
t_ptr->sz= f_ptr_sz_1;
/*
* | INS | DEL | UPD
* 0 | pk ah + all ah | pk ah | pk ah + new ah
* 1 | pk ad + all ad | old pk ad | new pk ad + new ad
* 2 | empty | old non-pk ah+ad | old ah+ad
*/
memcpy(ptr, f_ptr[1].p, sizeof(Uint32)*f_ptr_sz_1);
ptr+= f_ptr_sz_1;
t_ptr++;
static AttributeHeader
copy_head(Uint32& i1, Uint32* p1, Uint32& i2, const Uint32* p2,
Uint32 flags)
{
AttributeHeader ah(p2[i2]);
bool do_copy = (flags & 1);
if (do_copy)
p1[i1] = p2[i2];
i1++;
i2++;
return ah;
}
if (f_ptr_sz_2)
static void
copy_attr(AttributeHeader ah,
Uint32& j1, Uint32* p1, Uint32& j2, const Uint32* p2,
Uint32 flags)
{
bool do_copy = (flags & 1);
bool with_head = (flags & 2);
Uint32 n = with_head + ah.getDataSize();
if (do_copy)
{
t_ptr->p= ptr;
t_ptr->sz= f_ptr_sz_2;
memcpy(ptr, f_ptr[2].p, sizeof(Uint32)*f_ptr_sz_2);
Uint32 k;
for (k = 0; k < n; k++)
p1[j1++] = p2[j2++];
}
else
{
t_ptr->p= 0;
t_ptr->sz= 0;
j1 += n;
j2 += n;
}
}
int
NdbEventBuffer::merge_data(const SubTableData * const sdata,
LinearSectionPtr ptr2[3],
EventBufData* data)
{
DBUG_ENTER("NdbEventBuffer::merge_data");
Uint32 nkey = data->m_event_op->m_eventImpl->m_tableImpl->m_noOfKeys;
int t1 = data->sdata->operation;
int t2 = sdata->operation;
if (t1 == Ev_t::NUL)
DBUG_RETURN(copy_data(sdata, ptr2, data));
Ev_t* tp = 0;
int i;
for (i = 0; i < sizeof(ev_t)/sizeof(ev_t[0]); i++) {
if (ev_t[i].t1 == t1 && ev_t[i].t2 == t2) {
tp = &ev_t[i];
break;
}
}
assert(tp != 0 && tp->t3 != Ev_t::ERR);
// save old data
EventBufData olddata = *data;
data->memory = 0;
data->sz = 0;
// compose ptr1 o ptr2 = ptr
LinearSectionPtr (&ptr1) [3] = olddata.ptr;
LinearSectionPtr (&ptr) [3] = data->ptr;
// loop twice where first loop only sets sizes
int loop;
for (loop = 0; loop <= 1; loop++)
{
if (loop == 1)
{
if (alloc_mem(data, ptr) != 0)
DBUG_RETURN(-1);
*data->sdata = *sdata;
data->sdata->operation = tp->t3;
}
ptr[0].sz = ptr[1].sz = ptr[3].sz = 0;
// copy pk from new version
{
AttributeHeader ah;
Uint32 i = 0;
Uint32 j = 0;
Uint32 i2 = 0;
Uint32 j2 = 0;
while (i < nkey)
{
ah = copy_head(i, ptr[0].p, i2, ptr2[0].p, loop);
copy_attr(ah, j, ptr[1].p, j2, ptr2[1].p, loop);
}
ptr[0].sz = i;
ptr[1].sz = j;
}
// merge after values, new version overrides
if (tp->t3 != Ev_t::DEL)
{
AttributeHeader ah;
Uint32 i = ptr[0].sz;
Uint32 j = ptr[1].sz;
Uint32 i1 = 0;
Uint32 j1 = 0;
Uint32 i2 = nkey;
Uint32 j2 = ptr[1].sz;
while (i1 < nkey)
{
j1 += AttributeHeader(ptr1[0].p[i1++]).getDataSize();
}
while (1)
{
bool b1 = (i1 < ptr1[0].sz);
bool b2 = (i2 < ptr2[0].sz);
if (b1 && b2)
{
Uint32 id1 = AttributeHeader(ptr1[0].p[i1]).getAttributeId();
Uint32 id2 = AttributeHeader(ptr2[0].p[i2]).getAttributeId();
if (id1 < id2)
b2 = false;
else if (id1 > id2)
b1 = false;
else
{
j1 += AttributeHeader(ptr1[0].p[i1++]).getDataSize();
b1 = false;
}
}
if (b1)
{
ah = copy_head(i, ptr[0].p, i1, ptr1[0].p, loop);
copy_attr(ah, j, ptr[1].p, j1, ptr1[1].p, loop);
}
else if (b2)
{
ah = copy_head(i, ptr[0].p, i2, ptr2[0].p, loop);
copy_attr(ah, j, ptr[1].p, j2, ptr2[1].p, loop);
}
else
break;
}
ptr[0].sz = i;
ptr[1].sz = j;
}
// merge before values, old version overrides
if (tp->t3 != Ev_t::INS)
{
AttributeHeader ah;
Uint32 k = 0;
Uint32 k1 = 0;
Uint32 k2 = 0;
while (1)
{
bool b1 = (k1 < ptr1[2].sz);
bool b2 = (k2 < ptr2[2].sz);
if (b1 && b2)
{
Uint32 id1 = AttributeHeader(ptr1[2].p[k1]).getAttributeId();
Uint32 id2 = AttributeHeader(ptr2[2].p[k2]).getAttributeId();
if (id1 < id2)
b2 = false;
else if (id1 > id2)
b1 = false;
else
{
k2 += 1 + AttributeHeader(ptr2[2].p[k2]).getDataSize();
b2 = false;
}
}
if (b1)
{
ah = AttributeHeader(ptr1[2].p[k1]);
copy_attr(ah, k, ptr[2].p, k1, ptr1[2].p, loop | 2);
}
else if (b2)
{
ah = AttributeHeader(ptr2[2].p[k2]);
copy_attr(ah, k, ptr[2].p, k2, ptr2[2].p, loop | 2);
}
else
break;
}
ptr[2].sz = k;
}
}
// free old data
NdbMem_Free((char*)olddata.memory);
DBUG_RETURN(0);
}
......@@ -1399,5 +1671,107 @@ send_report:
#endif
}
// hash table routines
// could optimize the all-fixed case
Uint32
EventBufData_hash::getpkhash(NdbEventOperationImpl* op, LinearSectionPtr ptr[3])
{
const NdbTableImpl* tab = op->m_eventImpl->m_tableImpl;
// in all cases ptr[0] = pk ah.. ptr[1] = pk ad..
// for pk update (to equivalent pk) post/pre values give same hash
Uint32 nkey = tab->m_noOfKeys;
assert(nkey != 0 && nkey <= ptr[0].sz);
const Uint32* hptr = ptr[0].p;
const uchar* dptr = (uchar*)ptr[1].p;
// hash registers
ulong nr1 = 0;
ulong nr2 = 0;
while (nkey-- != 0)
{
AttributeHeader ah(*hptr++);
Uint32 bytesize = ah.getByteSize();
assert(dptr + bytesize <= (uchar*)(ptr[1].p + ptr[1].sz));
Uint32 i = ah.getAttributeId();
const NdbColumnImpl* col = tab->getColumn(i);
assert(col != 0);
Uint32 lb, len;
bool ok = NdbSqlUtil::get_var_length(col->m_type, dptr, bytesize, lb, len);
assert(ok);
CHARSET_INFO* cs = col->m_cs ? col->m_cs : &my_charset_bin;
(*cs->coll->hash_sort)(cs, dptr + lb, len, &nr1, &nr2);
dptr += bytesize;
}
return nr1;
}
// this is seldom invoked
bool
EventBufData_hash::getpkequal(NdbEventOperationImpl* op, LinearSectionPtr ptr1[3], LinearSectionPtr ptr2[3])
{
const NdbTableImpl* tab = op->m_eventImpl->m_tableImpl;
Uint32 nkey = tab->m_noOfKeys;
assert(nkey != 0 && nkey <= ptr1[0].sz && nkey <= ptr2[0].sz);
const Uint32* hptr1 = ptr1[0].p;
const Uint32* hptr2 = ptr2[0].p;
const uchar* dptr1 = (uchar*)ptr1[1].p;
const uchar* dptr2 = (uchar*)ptr2[1].p;
while (nkey-- != 0)
{
AttributeHeader ah1(*hptr1++);
AttributeHeader ah2(*hptr2++);
// sizes can differ on update of varchar endspace
Uint32 bytesize1 = ah1.getByteSize();
Uint32 bytesize2 = ah1.getByteSize();
assert(dptr1 + bytesize1 <= (uchar*)(ptr1[1].p + ptr1[1].sz));
assert(dptr2 + bytesize2 <= (uchar*)(ptr2[1].p + ptr2[1].sz));
assert(ah1.getAttributeId() == ah2.getAttributeId());
Uint32 i = ah1.getAttributeId();
const NdbColumnImpl* col = tab->getColumn(i);
assert(col != 0);
Uint32 lb1, len1;
bool ok1 = NdbSqlUtil::get_var_length(col->m_type, dptr1, bytesize1, lb1, len1);
Uint32 lb2, len2;
bool ok2 = NdbSqlUtil::get_var_length(col->m_type, dptr2, bytesize2, lb2, len2);
assert(ok1 && ok2 && lb1 == lb2);
CHARSET_INFO* cs = col->m_cs ? col->m_cs : &my_charset_bin;
int res = (cs->coll->strnncollsp)(cs, dptr1 + lb1, len1, dptr2 + lb2, len2, false);
if (res != 0)
return false;
dptr1 += bytesize1;
dptr2 += bytesize2;
}
return true;
}
void
EventBufData_hash::search(Pos& hpos, NdbEventOperationImpl* op, LinearSectionPtr ptr[3])
{
Uint32 pkhash = getpkhash(op, ptr);
Uint32 index = (op->m_oid ^ pkhash) % GCI_EVENT_HASH_SIZE;
EventBufData* data = m_hash[index];
while (data != 0)
{
if (data->m_event_op == op &&
data->m_pkhash == pkhash &&
getpkequal(op, data->ptr, ptr))
break;
data = data->m_next_hash;
}
hpos.index = index;
hpos.data = data;
hpos.pkhash = pkhash;
}
template class Vector<Gci_container>;
template class Vector<NdbEventBuffer::EventBufData_chunk*>;
......@@ -25,16 +25,19 @@
#define NDB_EVENT_OP_MAGIC_NUMBER 0xA9F301B4
class NdbEventOperationImpl;
struct EventBufData
{
union {
SubTableData *sdata;
char *memory;
Uint32 *memory;
};
LinearSectionPtr ptr[3];
unsigned sz;
NdbEventOperationImpl *m_event_op;
EventBufData *m_next; // Next wrt to global order
EventBufData *m_next_hash; // Next in per-GCI hash
Uint32 m_pkhash; // PK hash (without op) for fast compare
};
class EventBufData_list
......@@ -116,6 +119,34 @@ void EventBufData_list::append(const EventBufData_list &list)
m_sz+= list.m_sz;
}
// GCI bucket has also a hash over data, with key event op, table PK.
// It can only be appended to and is invalid after remove_first().
class EventBufData_hash
{
public:
struct Pos { // search result
Uint32 index; // index into hash array
EventBufData* data; // non-zero if found
Uint32 pkhash; // PK hash
};
static Uint32 getpkhash(NdbEventOperationImpl* op, LinearSectionPtr ptr[3]);
static bool getpkequal(NdbEventOperationImpl* op, LinearSectionPtr ptr1[3], LinearSectionPtr ptr2[3]);
void search(Pos& hpos, NdbEventOperationImpl* op, LinearSectionPtr ptr[3]);
void append(Pos& hpos, EventBufData* data);
enum { GCI_EVENT_HASH_SIZE = 101 };
EventBufData* m_hash[GCI_EVENT_HASH_SIZE];
};
inline
void EventBufData_hash::append(Pos& hpos, EventBufData* data)
{
data->m_next_hash = m_hash[hpos.index];
m_hash[hpos.index] = data;
}
struct Gci_container
{
enum State
......@@ -127,6 +158,7 @@ struct Gci_container
Uint32 m_gcp_complete_rep_count; // Remaining SUB_GCP_COMPLETE_REP until done
Uint64 m_gci; // GCI
EventBufData_list m_data;
EventBufData_hash m_data_hash;
};
class NdbEventOperationImpl : public NdbEventOperation {
......@@ -173,6 +205,8 @@ public:
*/
Uint32 m_eventId;
Uint32 m_oid;
bool m_separateEvents;
EventBufData *m_data_item;
......@@ -212,7 +246,6 @@ public:
void add_op();
void remove_op();
void init_gci_containers();
Uint32 m_active_op_count;
// accessed from the "receive thread"
int insertDataL(NdbEventOperationImpl *op,
......@@ -233,10 +266,15 @@ public:
NdbEventOperationImpl *move_data();
// used by both user thread and receive thread
int copy_data_alloc(const SubTableData * const f_sdata,
LinearSectionPtr f_ptr[3],
EventBufData *ev_buf);
// routines to copy/merge events
EventBufData* alloc_data();
int alloc_mem(EventBufData* data, LinearSectionPtr ptr[3]);
int copy_data(const SubTableData * const sdata,
LinearSectionPtr ptr[3],
EventBufData* data);
int merge_data(const SubTableData * const sdata,
LinearSectionPtr ptr[3],
EventBufData* data);
void free_list(EventBufData_list &list);
......@@ -290,6 +328,8 @@ private:
// dropped event operations that have not yet
// been deleted
NdbEventOperationImpl *m_dropped_ev_op;
Uint32 m_active_op_count;
};
inline
......
......@@ -169,6 +169,7 @@ eventOperation(Ndb* pNdb, const NdbDictionary::Table &tab, void* pstats, int rec
g_err << function << "Event operation creation failed\n";
return NDBT_FAILED;
}
pOp->separateEvents(true);
g_info << function << "get values\n";
NdbRecAttr* recAttr[1024];
......@@ -380,6 +381,7 @@ int runCreateDropEventOperation(NDBT_Context* ctx, NDBT_Step* step)
g_err << "Event operation creation failed\n";
return NDBT_FAILED;
}
pOp->separateEvents(true);
g_info << "dropping event operation" << endl;
int res = pNdb->dropEventOperation(pOp);
......@@ -550,6 +552,7 @@ int runEventApplier(NDBT_Context* ctx, NDBT_Step* step)
g_err << "Event operation creation failed on %s" << buf << endl;
DBUG_RETURN(NDBT_FAILED);
}
pOp->separateEvents(true);
int i;
int n_columns= table->getNoOfColumns();
......@@ -1195,6 +1198,7 @@ static int createEventOperations(Ndb * ndb)
{
DBUG_RETURN(NDBT_FAILED);
}
pOp->separateEvents(true);
int n_columns= pTabs[i]->getNoOfColumns();
for (int j = 0; j < n_columns; j++)
......
......@@ -473,9 +473,9 @@ struct Op { // single or composite
Kind kind;
Type type;
Op* next_op; // within one commit
Op* next_com; // next commit chain or next event
Op* next_com; // next commit chain
Op* next_gci; // groups commit chains (unless --separate-events)
Op* next_ev;
Op* next_ev; // next event
Op* next_free; // free list
bool free; // on free list
uint num_op;
......@@ -564,6 +564,8 @@ static NdbRecAttr* g_ev_ra[2][g_maxcol]; // 0-post 1-pre
static NdbBlob* g_ev_bh[2][g_maxcol]; // 0-post 1-pre
static Op* g_rec_ev;
static uint g_ev_pos[g_maxpk];
static uint g_num_gci = 0;
static uint g_num_ev = 0;
static Op*
getop(Op::Kind a_kind)
......@@ -651,6 +653,7 @@ resetmem()
}
}
assert(g_usedops == 0);
g_num_gci = g_num_ev = 0;
}
struct Comp {
......@@ -877,9 +880,8 @@ createeventop()
chkdb((g_evt_op = g_ndb->createEventOperation(g_evt->getName(), bsz)) != 0);
#else
chkdb((g_evt_op = g_ndb->createEventOperation(g_evt->getName())) != 0);
#ifdef version51rbr
// available in gci merge changeset
g_evt_op->separateEvents(g_opts.separate_events); // not yet inherited
#endif
#endif
uint i;
for (i = 0; i < ncol(); i++) {
......@@ -1203,8 +1205,9 @@ makeops()
// copy to gci level
copyop(com_op, gci_op);
tot_op->num_com += 1;
g_num_gci += 1;
}
ll1("makeops: used ops = " << g_usedops);
ll1("makeops: used ops = " << g_usedops << " com ops = " << g_num_gci);
}
static int
......@@ -1341,12 +1344,13 @@ mergeops()
gci_op2 = gci_op2->next_gci;
freeop(tmp_op);
mergecnt++;
assert(g_num_gci != 0);
g_num_gci--;
}
gci_op = gci_op->next_gci = gci_op2;
}
}
ll1("mergeops: used ops = " << g_usedops);
ll1("mergeops: merged " << mergecnt << " gci entries");
ll1("mergeops: used ops = " << g_usedops << " gci ops = " << g_num_gci);
return 0;
}
......@@ -1504,27 +1508,37 @@ matchevents()
static int
matchops()
{
ll1("matchops");
uint nomatch = 0;
Uint32 pk1;
for (pk1 = 0; pk1 < g_opts.maxpk; pk1++) {
Op* tot_op = g_pk_op[pk1];
if (tot_op == 0)
continue;
Op* com_op = tot_op->next_com;
while (com_op != 0) {
if (com_op->type != Op::NUL && ! com_op->match) {
Op* gci_op = tot_op->next_gci;
while (gci_op != 0) {
if (gci_op->type == Op::NUL) {
ll2("GCI: " << *gci_op << " [skip NUL]");
} else if (gci_op->match) {
ll2("GCI: " << *gci_op << " [match OK]");
} else {
ll0("GCI: " << *gci_op);
Op* com_op = gci_op->next_com;
assert(com_op != 0);
ll0("COM: " << *com_op);
Op* op = com_op->next_op;
assert(op != 0);
while (op != 0) {
ll0("---: " << *op);
ll0("OP : " << *op);
op = op->next_op;
}
ll0("no matching event");
return -1;
nomatch++;
}
com_op = com_op->next_com;
gci_op = gci_op->next_gci;
}
}
chkrc(nomatch == 0);
return 0;
}
......@@ -1619,9 +1633,10 @@ runevents()
Op* ev = getop(Op::EV);
copyop(g_rec_ev, ev);
last_ev->next_ev = ev;
g_num_ev++;
}
}
ll1("runevents: used ops = " << g_usedops);
ll1("runevents: used ops = " << g_usedops << " events = " << g_num_ev);
return 0;
}
......@@ -1666,6 +1681,7 @@ runtest()
chkrc(mergeops() == 0);
cmppostpre();
chkrc(runevents() == 0);
ll0("counts: gci = " << g_num_gci << " ev = " << g_num_ev);
chkrc(matchevents() == 0);
chkrc(matchops() == 0);
chkrc(dropeventop() == 0);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment