diff --git a/storage/ndb/test/tools/listen.cpp b/storage/ndb/test/tools/listen.cpp index 661193bf4b8221389a68069c97af2b3a90d0ecb9..97c307e9c15f5fced1d48079143a59b5cdfbec7f 100644 --- a/storage/ndb/test/tools/listen.cpp +++ b/storage/ndb/test/tools/listen.cpp @@ -22,6 +22,128 @@ #include <getarg.h> +#define BATCH_SIZE 128 +struct Table_info +{ + Uint32 id; +}; + +struct Trans_arg +{ + Ndb *ndb; + NdbTransaction *trans; + Uint32 bytes_batched; +}; + +Vector< Vector<NdbRecAttr*> > event_values; +Vector< Vector<NdbRecAttr*> > event_pre_values; +Vector<struct Table_info> table_infos; + +static void do_begin(Ndb *ndb, struct Trans_arg &trans_arg) +{ + trans_arg.ndb = ndb; + trans_arg.trans = ndb->startTransaction(); + trans_arg.bytes_batched = 0; +} + +static void do_equal(NdbOperation *op, + NdbEventOperation *pOp) +{ + struct Table_info *ti = (struct Table_info *)pOp->getCustomData(); + Vector<NdbRecAttr*> &ev = event_values[ti->id]; + const NdbDictionary::Table *tab= pOp->getTable(); + unsigned i, n_columns = tab->getNoOfColumns(); + for (i= 0; i < n_columns; i++) + { + if (tab->getColumn(i)->getPrimaryKey() && + op->equal(i, ev[i]->aRef())) + { + abort(); + } + } +} + +static void do_set_value(NdbOperation *op, + NdbEventOperation *pOp) +{ + struct Table_info *ti = (struct Table_info *)pOp->getCustomData(); + Vector<NdbRecAttr*> &ev = event_values[ti->id]; + const NdbDictionary::Table *tab= pOp->getTable(); + unsigned i, n_columns = tab->getNoOfColumns(); + for (i= 0; i < n_columns; i++) + { + if (!tab->getColumn(i)->getPrimaryKey() && + op->setValue(i, ev[i]->aRef())) + { + abort(); + } + } +} + +static void do_insert(struct Trans_arg &trans_arg, NdbEventOperation *pOp) +{ + if (!trans_arg.trans) + return; + + NdbOperation *op = + trans_arg.trans->getNdbOperation(pOp->getEvent()->getTableName()); + op->writeTuple(); + + do_equal(op, pOp); + do_set_value(op, pOp); + + trans_arg.bytes_batched++; + if (trans_arg.bytes_batched > BATCH_SIZE) + { + trans_arg.trans->execute(NdbTransaction::NoCommit); + trans_arg.bytes_batched = 0; + } +} +static void do_update(struct Trans_arg &trans_arg, NdbEventOperation *pOp) +{ + if (!trans_arg.trans) + return; + + NdbOperation *op = + trans_arg.trans->getNdbOperation(pOp->getEvent()->getTableName()); + op->writeTuple(); + + do_equal(op, pOp); + do_set_value(op, pOp); + + trans_arg.bytes_batched++; + if (trans_arg.bytes_batched > BATCH_SIZE) + { + trans_arg.trans->execute(NdbTransaction::NoCommit); + trans_arg.bytes_batched = 0; + } +} +static void do_delete(struct Trans_arg &trans_arg, NdbEventOperation *pOp) +{ + if (!trans_arg.trans) + return; + + NdbOperation *op = + trans_arg.trans->getNdbOperation(pOp->getEvent()->getTableName()); + op->deleteTuple(); + + do_equal(op, pOp); + + trans_arg.bytes_batched++; + if (trans_arg.bytes_batched > BATCH_SIZE) + { + trans_arg.trans->execute(NdbTransaction::NoCommit); + trans_arg.bytes_batched = 0; + } +} +static void do_commit(struct Trans_arg &trans_arg) +{ + if (!trans_arg.trans) + return; + trans_arg.trans->execute(NdbTransaction::Commit); + trans_arg.ndb->closeTransaction(trans_arg.trans); +} + int main(int argc, const char** argv){ ndb_init(); @@ -29,8 +151,14 @@ main(int argc, const char** argv){ int _help = 0; const char* db = 0; + const char* connectstring1 = 0; + const char* connectstring2 = 0; struct getargs args[] = { + { "connectstring1", 'c', + arg_string, &connectstring1, "connectstring1", "" }, + { "connectstring2", 'C', + arg_string, &connectstring2, "connectstring2", "" }, { "database", 'd', arg_string, &db, "Database", "" }, { "usage", '?', arg_flag, &_help, "Print help", "" } }; @@ -46,7 +174,7 @@ main(int argc, const char** argv){ } // Connect to Ndb - Ndb_cluster_connection con; + Ndb_cluster_connection con(connectstring1); if(con.connect(12, 5, 1) != 0) { return NDBT_ProgramExit(NDBT_FAILED); @@ -61,12 +189,35 @@ main(int argc, const char** argv){ // Connect to Ndb and wait for it to become ready while(MyNdb.waitUntilReady() != 0) ndbout << "Waiting for ndb to become ready..." << endl; - + + Ndb_cluster_connection *con2 = NULL; + Ndb *ndb2 = NULL; + if (connectstring2) + { + con2 = new Ndb_cluster_connection(connectstring2); + + if(con2->connect(12, 5, 1) != 0) + { + return NDBT_ProgramExit(NDBT_FAILED); + } + ndb2 = new Ndb( con2, db ? db : "TEST_DB" ); + + if(ndb2->init() != 0){ + ERR(ndb2->getNdbError()); + return NDBT_ProgramExit(NDBT_FAILED); + } + + // Connect to Ndb and wait for it to become ready + while(ndb2->waitUntilReady() != 0) + ndbout << "Waiting for ndb to become ready..." << endl; + } + int result = 0; NdbDictionary::Dictionary *myDict = MyNdb.getDictionary(); Vector<NdbDictionary::Event*> events; Vector<NdbEventOperation*> event_ops; + int sz = 0; for(i= optind; i<argc; i++) { const NdbDictionary::Table* table= myDict->getTable(argv[i]); @@ -121,12 +272,23 @@ main(int argc, const char** argv){ goto end; } + event_values.push_back(Vector<NdbRecAttr *>()); + event_pre_values.push_back(Vector<NdbRecAttr *>()); for (int a = 0; a < table->getNoOfColumns(); a++) { - pOp->getValue(table->getColumn(a)->getName()); - pOp->getPreValue(table->getColumn(a)->getName()); + event_values[sz]. + push_back(pOp->getValue(table->getColumn(a)->getName())); + event_pre_values[sz]. + push_back(pOp->getPreValue(table->getColumn(a)->getName())); } event_ops.push_back(pOp); + { + struct Table_info ti; + ti.id = sz; + table_infos.push_back(ti); + } + pOp->setCustomData((void *)&table_infos[sz]); + sz++; } for(i= 0; i<(int)event_ops.size(); i++) @@ -140,6 +302,7 @@ main(int argc, const char** argv){ } } + struct Trans_arg trans_arg; while(true) { while(MyNdb.pollEvents(100) == 0); @@ -149,18 +312,26 @@ main(int argc, const char** argv){ { Uint64 gci= pOp->getGCI(); Uint64 cnt_i= 0, cnt_u= 0, cnt_d= 0; + if (ndb2) + do_begin(ndb2, trans_arg); do { switch(pOp->getEventType()) { case NdbDictionary::Event::TE_INSERT: cnt_i++; + if (ndb2) + do_insert(trans_arg, pOp); break; case NdbDictionary::Event::TE_DELETE: cnt_d++; + if (ndb2) + do_delete(trans_arg, pOp); break; case NdbDictionary::Event::TE_UPDATE: cnt_u++; + if (ndb2) + do_update(trans_arg, pOp); break; case NdbDictionary::Event::TE_CLUSTER_FAILURE: break; @@ -180,12 +351,21 @@ main(int argc, const char** argv){ abort(); } } while ((pOp= MyNdb.nextEvent()) && gci == pOp->getGCI()); + if (ndb2) + do_commit(trans_arg); ndbout_c("GCI: %lld events: %lld(I) %lld(U) %lld(D)", gci, cnt_i, cnt_u, cnt_d); } } end: + if (ndb2) + delete ndb2; + if (con2) + delete con2; return NDBT_ProgramExit(NDBT_OK); } +template class Vector<struct Table_info>; +template class Vector<NdbRecAttr*>; +template class Vector< Vector<NdbRecAttr*> >; template class Vector<NdbDictionary::Event*>; template class Vector<NdbEventOperation*>;