diff --git a/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp b/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp index ee0f194211b9846b347009b26770c48c748c5ca7..812f071e03702f7d56baa37e62a3ca8cc6994d80 100644 --- a/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp +++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp @@ -179,7 +179,8 @@ Dbtup::dealloc_tuple(Signal* signal, &disk, tmpptr, gci); } - if (! (bits & Tuple_header::LCP_SKIP) && lcpScan_ptr_i != RNIL) + if (! (bits & (Tuple_header::LCP_SKIP | Tuple_header::ALLOC)) && + lcpScan_ptr_i != RNIL) { ScanOpPtr scanOp; c_scanOpPool.getPtr(scanOp, lcpScan_ptr_i); diff --git a/storage/ndb/test/include/HugoTransactions.hpp b/storage/ndb/test/include/HugoTransactions.hpp index caed577f8c97572ac98d3fb54338e8e52f58d0df..e2b12f261a87c8a25abda121191b941134930382 100644 --- a/storage/ndb/test/include/HugoTransactions.hpp +++ b/storage/ndb/test/include/HugoTransactions.hpp @@ -20,7 +20,7 @@ #include <NDBT.hpp> #include <HugoCalculator.hpp> #include <HugoOperations.hpp> - +class NDBT_Stats; class HugoTransactions : public HugoOperations { public: @@ -109,10 +109,24 @@ public: void setRetryMax(int retryMax = 100) { m_retryMax = retryMax; } Uint32 m_latest_gci; + + void setStatsLatency(NDBT_Stats* stats) { m_stats_latency = stats; } + + // allows multiple threads to update separate batches + void setThrInfo(int thr_count, int thr_no) { + m_thr_count = thr_count; + m_thr_no = thr_no; + } + protected: NDBT_ResultRow row; int m_defaultScanUpdateMethod; int m_retryMax; + + NDBT_Stats* m_stats_latency; + + int m_thr_count; // 0 if no separation between threads + int m_thr_no; }; diff --git a/storage/ndb/test/include/NDBT_Thread.hpp b/storage/ndb/test/include/NDBT_Thread.hpp new file mode 100644 index 0000000000000000000000000000000000000000..5b724991b29105a4610ef3f67071969c22d2f0be --- /dev/null +++ b/storage/ndb/test/include/NDBT_Thread.hpp @@ -0,0 +1,226 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#ifndef NDB_THREAD_HPP +#define NDB_THREAD_HPP + +#include <NdbMutex.h> +#include <NdbCondition.h> +#include <NdbThread.h> + +// NDBT_Thread ctor -> NDBT_Thread_run -> thr.run() +extern "C" { +static void* NDBT_Thread_run(void* arg); +} + +// Function to run in a thread. + +typedef void NDBT_ThreadFunc(class NDBT_Thread&); + +/* + * NDBT_Thread + * + * Represents a thread. The thread pauses at startup. + * Main process sets a function to run. When the function + * returns, the thread pauses again to wait for a command. + * This allows main process to sync with the thread and + * exchange data with it. + * + * Input to thread is typically options. The input area + * is read-only in the thread. Output from thread is + * results such as statistics. Error code is handled + * separately. + * + * Pointer to Ndb object and method to create it are + * provided for convenience. + */ + +class NDBT_ThreadSet; + +class NDBT_Thread { +public: + NDBT_Thread(); + NDBT_Thread(NDBT_ThreadSet* thread_set, int thread_no); + void create(NDBT_ThreadSet* thread_set, int thread_no); + ~NDBT_Thread(); + + // if part of a set + inline NDBT_ThreadSet& get_thread_set() const { + assert(m_thread_set != 0); + return *m_thread_set; + } + inline int get_thread_no() const { + return m_thread_no; + } + + // { Wait -> Start -> Stop }+ -> Exit + enum State { + Wait = 1, // wait for command + Start, // run current function + Stop, // stopped (paused) when current function done + Exit // exit thread + }; + + // tell thread to start running current function + void start(); + // wait for thread to stop when function is done + void stop(); + // tell thread to exit + void exit(); + // collect thread after exit + void join(); + + // set function to run + inline void set_func(NDBT_ThreadFunc* func) { + m_func = func; + } + + // input area + inline void set_input(const void* input) { + m_input = input; + } + inline const void* get_input() const { + return m_input; + } + + // output area + inline void set_output(void* output) { + m_output = output; + } + inline void* get_output() const { + return m_output; + } + template <class T> inline void set_output() { + set_output(new T); + } + inline void delete_output() { + delete m_output; + m_output = 0; + } + + // thread-specific Ndb object + inline class Ndb* get_ndb() const { + return m_ndb; + } + int connect(class Ndb_cluster_connection*, const char* db = "TEST_DB"); + void disconnect(); + + // error code (OS, Ndb, other) + void clear_err() { + m_err = 0; + } + void set_err(int err) { + m_err = err; + } + int get_err() const { + return m_err; + } + +private: + friend class NDBT_ThreadSet; + friend void* NDBT_Thread_run(void* arg); + + enum { Magic = 0xabacadae }; + Uint32 m_magic; + + State m_state; + NDBT_ThreadSet* m_thread_set; + int m_thread_no; + + NDBT_ThreadFunc* m_func; + const void* m_input; + void* m_output; + class Ndb* m_ndb; + int m_err; + + // run the thread + void run(); + + void lock() { + NdbMutex_Lock(m_mutex); + } + void unlock() { + NdbMutex_Unlock(m_mutex); + } + + void wait() { + NdbCondition_Wait(m_cond, m_mutex); + } + void signal() { + NdbCondition_Signal(m_cond); + } + + NdbMutex* m_mutex; + NdbCondition* m_cond; + NdbThread* m_thread; + void* m_status; +}; + +/* + * A set of threads, indexed from 0 to count-1. Methods + * are applied to each thread (serially). Input area is + * common to all threads. Output areas are allocated + * separately according to a template class. + */ + +class NDBT_ThreadSet { +public: + NDBT_ThreadSet(int count); + ~NDBT_ThreadSet(); + + inline int get_count() const { + return m_count; + } + inline NDBT_Thread& get_thread(int n) { + assert(n < m_count && m_thread[n] != 0); + return *m_thread[n]; + } + + // tell each thread to start running + void start(); + // wait for each thread to stop + void stop(); + // tell each thread to exit + void exit(); + // collect each thread after exit + void join(); + + // set function to run in each thread + void set_func(NDBT_ThreadFunc* func); + + // set input area (same instance in each thread) + void set_input(const void* input); + + // set output areas + template <class T> inline void set_output() { + for (int n = 0; n < m_count; n++) { + NDBT_Thread& thr = *m_thread[n]; + thr.set_output<T>(); + } + } + void delete_output(); + + // thread-specific Ndb objects + int connect(class Ndb_cluster_connection*, const char* db = "TEST_DB"); + void disconnect(); + + int get_err() const; + +private: + int m_count; + NDBT_Thread** m_thread; +}; + +#endif diff --git a/storage/ndb/test/src/HugoTransactions.cpp b/storage/ndb/test/src/HugoTransactions.cpp index 456782f47268835c422a9e92b4bb80efe519d285..3a1600815e014043c3b14e46627720080ef0f272 100644 --- a/storage/ndb/test/src/HugoTransactions.cpp +++ b/storage/ndb/test/src/HugoTransactions.cpp @@ -14,8 +14,9 @@ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include "HugoTransactions.hpp" +#include <NDBT_Stats.hpp> #include <NdbSleep.h> - +#include <NdbTick.h> HugoTransactions::HugoTransactions(const NdbDictionary::Table& _tab, const NdbDictionary::Index* idx): @@ -24,6 +25,10 @@ HugoTransactions::HugoTransactions(const NdbDictionary::Table& _tab, m_defaultScanUpdateMethod = 3; setRetryMax(); + m_stats_latency = 0; + + m_thr_count = 0; + m_thr_no = -1; } HugoTransactions::~HugoTransactions(){ @@ -820,6 +825,16 @@ HugoTransactions::pkReadRecords(Ndb* pNdb, return NDBT_FAILED; } + MicroSecondTimer timer_start; + MicroSecondTimer timer_stop; + bool timer_active = + m_stats_latency != 0 && + r >= batch && // first batch is "warmup" + r + batch != records; // last batch is usually partial + + if (timer_active) + NdbTick_getMicroTimer(&timer_start); + if(pkReadRecord(pNdb, r, batch, lm) != NDBT_OK) { ERR(pTrans->getNdbError()); @@ -892,6 +907,12 @@ HugoTransactions::pkReadRecords(Ndb* pNdb, } closeTransaction(pNdb); + + if (timer_active) { + NdbTick_getMicroTimer(&timer_stop); + NDB_TICKS ticks = NdbTick_getMicrosPassed(timer_start, timer_stop); + m_stats_latency->addObservation((double)ticks); + } } deallocRows(); g_info << reads << " records read" << endl; @@ -913,9 +934,17 @@ HugoTransactions::pkUpdateRecords(Ndb* pNdb, allocRows(batch); g_info << "|- Updating records (batch=" << batch << ")..." << endl; + int batch_no = 0; while (r < records){ if(r + batch > records) batch = records - r; + + if (m_thr_count != 0 && m_thr_no != batch_no % m_thr_count) + { + r += batch; + batch_no++; + continue; + } if (retryAttempt >= m_retryMax){ g_info << "ERROR: has retried this operation " << retryAttempt @@ -963,6 +992,16 @@ HugoTransactions::pkUpdateRecords(Ndb* pNdb, return NDBT_FAILED; } + MicroSecondTimer timer_start; + MicroSecondTimer timer_stop; + bool timer_active = + m_stats_latency != 0 && + r >= batch && // first batch is "warmup" + r + batch != records; // last batch is usually partial + + if (timer_active) + NdbTick_getMicroTimer(&timer_start); + if(pIndexScanOp) { int rows_found = 0; @@ -1039,8 +1078,15 @@ HugoTransactions::pkUpdateRecords(Ndb* pNdb, } closeTransaction(pNdb); - + + if (timer_active) { + NdbTick_getMicroTimer(&timer_stop); + NDB_TICKS ticks = NdbTick_getMicrosPassed(timer_start, timer_stop); + m_stats_latency->addObservation((double)ticks); + } + r += batch; // Read next record + batch_no++; } deallocRows(); @@ -1228,10 +1274,18 @@ HugoTransactions::pkDelRecords(Ndb* pNdb, int check; g_info << "|- Deleting records..." << endl; + int batch_no = 0; while (r < records){ if(r + batch > records) batch = records - r; + if (m_thr_count != 0 && m_thr_no != batch_no % m_thr_count) + { + r += batch; + batch_no++; + continue; + } + if (retryAttempt >= m_retryMax){ g_info << "ERROR: has retried this operation " << retryAttempt << " times, failing!" << endl; @@ -1255,6 +1309,16 @@ HugoTransactions::pkDelRecords(Ndb* pNdb, return NDBT_FAILED; } + MicroSecondTimer timer_start; + MicroSecondTimer timer_stop; + bool timer_active = + m_stats_latency != 0 && + r >= batch && // first batch is "warmup" + r + batch != records; // last batch is usually partial + + if (timer_active) + NdbTick_getMicroTimer(&timer_start); + if(pkDeleteRecord(pNdb, r, batch) != NDBT_OK) { ERR(pTrans->getNdbError()); @@ -1303,9 +1367,15 @@ HugoTransactions::pkDelRecords(Ndb* pNdb, m_latest_gci = pTrans->getGCI(); } closeTransaction(pNdb); - - r += batch; // Read next record + if (timer_active) { + NdbTick_getMicroTimer(&timer_stop); + NDB_TICKS ticks = NdbTick_getMicrosPassed(timer_start, timer_stop); + m_stats_latency->addObservation((double)ticks); + } + + r += batch; // Read next record + batch_no++; } g_info << "|- " << deleted << " records deleted" << endl; diff --git a/storage/ndb/test/src/Makefile.am b/storage/ndb/test/src/Makefile.am index 37f6497e508f5b02601ba8412fc9ee7a2037f49a..a025579cb7238aff0617df8a3ff07f112936ebb5 100644 --- a/storage/ndb/test/src/Makefile.am +++ b/storage/ndb/test/src/Makefile.am @@ -24,7 +24,7 @@ libNDBT_a_SOURCES = \ NdbRestarter.cpp NdbRestarts.cpp NDBT_Output.cpp \ NdbBackup.cpp NdbConfig.cpp NdbGrep.cpp NDBT_Table.cpp \ NdbSchemaCon.cpp NdbSchemaOp.cpp getarg.c \ - CpcClient.cpp NdbMixRestarter.cpp + CpcClient.cpp NdbMixRestarter.cpp NDBT_Thread.cpp INCLUDES_LOC = -I$(top_srcdir)/storage/ndb/src/common/mgmcommon -I$(top_srcdir)/storage/ndb/include/mgmcommon -I$(top_srcdir)/storage/ndb/include/kernel -I$(top_srcdir)/storage/ndb/src/mgmapi diff --git a/storage/ndb/test/src/NDBT_Thread.cpp b/storage/ndb/test/src/NDBT_Thread.cpp new file mode 100644 index 0000000000000000000000000000000000000000..56cf2f6815b630e5ab8de8a87c29316958760921 --- /dev/null +++ b/storage/ndb/test/src/NDBT_Thread.cpp @@ -0,0 +1,283 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include <ndb_global.h> +#include <NDBT_Thread.hpp> +#include <NdbApi.hpp> + +NDBT_Thread::NDBT_Thread() +{ + create(0, -1); +} + +NDBT_Thread::NDBT_Thread(NDBT_ThreadSet* thread_set, int thread_no) +{ + create(thread_set, thread_no); +} + +void +NDBT_Thread::create(NDBT_ThreadSet* thread_set, int thread_no) +{ + m_magic = NDBT_Thread::Magic; + + m_state = Wait; + m_thread_set = thread_set; + m_thread_no = thread_no; + m_func = 0; + m_input = 0; + m_output = 0; + m_ndb = 0; + m_err = 0; + + m_mutex = NdbMutex_Create(); + assert(m_mutex != 0); + m_cond = NdbCondition_Create(); + assert(m_cond != 0); + + char buf[20]; + sprintf(buf, "NDBT_%04u"); + const char* name = strdup(buf); + assert(name != 0); + + unsigned stacksize = 512 * 1024; + NDB_THREAD_PRIO prio = NDB_THREAD_PRIO_LOW; + m_thread = NdbThread_Create(NDBT_Thread_run, + (void**)this, stacksize, name, prio); + assert(m_thread != 0); +} + +NDBT_Thread::~NDBT_Thread() +{ + if (m_thread != 0) { + NdbThread_Destroy(&m_thread); + m_thread = 0; + } + if (m_cond != 0) { + NdbCondition_Destroy(m_cond); + m_cond = 0; + } + if (m_mutex != 0) { + NdbMutex_Destroy(m_mutex); + m_mutex = 0; + } +} + +static void* +NDBT_Thread_run(void* arg) +{ + assert(arg != 0); + NDBT_Thread& thr = *(NDBT_Thread*)arg; + assert(thr.m_magic == NDBT_Thread::Magic); + thr.run(); + return 0; +} + +void +NDBT_Thread::run() +{ + while (1) { + lock(); + while (m_state != Start && m_state != Exit) { + wait(); + } + if (m_state == Exit) { + unlock(); + break; + } + (*m_func)(*this); + m_state = Stop; + signal(); + unlock(); + } +} + +// methods for main process + +void +NDBT_Thread::start() +{ + lock(); + m_state = Start; + signal(); + unlock(); +} + +void +NDBT_Thread::stop() +{ + lock(); + while (m_state != Stop) + wait(); + m_state = Wait; + unlock(); +} + +void +NDBT_Thread::exit() +{ + lock(); + m_state = Exit; + signal(); + unlock(); +}; + +void +NDBT_Thread::join() +{ + NdbThread_WaitFor(m_thread, &m_status); + m_thread = 0; +} + +int +NDBT_Thread::connect(class Ndb_cluster_connection* ncc, const char* db) +{ + m_ndb = new Ndb(ncc, db); + if (m_ndb->init() == -1 || + m_ndb->waitUntilReady() == -1) { + m_err = m_ndb->getNdbError().code; + return -1; + } + return 0; +} + +void +NDBT_Thread::disconnect() +{ + delete m_ndb; + m_ndb = 0; +} + +// set of threads + +NDBT_ThreadSet::NDBT_ThreadSet(int count) +{ + m_count = count; + m_thread = new NDBT_Thread* [count]; + for (int n = 0; n < count; n++) { + m_thread[n] = new NDBT_Thread(this, n); + } +} + +NDBT_ThreadSet::~NDBT_ThreadSet() +{ + delete_output(); + for (int n = 0; n < m_count; n++) { + delete m_thread[n]; + m_thread[n] = 0; + } + delete [] m_thread; +} + +void +NDBT_ThreadSet::start() +{ + for (int n = 0; n < m_count; n++) { + NDBT_Thread& thr = *m_thread[n]; + thr.start(); + } +} + +void +NDBT_ThreadSet::stop() +{ + for (int n = 0; n < m_count; n++) { + NDBT_Thread& thr = *m_thread[n]; + thr.stop(); + } +} + +void +NDBT_ThreadSet::exit() +{ + for (int n = 0; n < m_count; n++) { + NDBT_Thread& thr = *m_thread[n]; + thr.exit(); + } +} + +void +NDBT_ThreadSet::join() +{ + for (int n = 0; n < m_count; n++) { + NDBT_Thread& thr = *m_thread[n]; + thr.join(); + } +} + +void +NDBT_ThreadSet::set_func(NDBT_ThreadFunc* func) +{ + for (int n = 0; n < m_count; n++) { + NDBT_Thread& thr = *m_thread[n]; + thr.set_func(func); + } +} + +void +NDBT_ThreadSet::set_input(const void* input) +{ + for (int n = 0; n < m_count; n++) { + NDBT_Thread& thr = *m_thread[n]; + thr.set_input(input); + } +} + +void +NDBT_ThreadSet::delete_output() +{ + for (int n = 0; n < m_count; n++) { + if (m_thread[n] != 0) { + NDBT_Thread& thr = *m_thread[n]; + thr.delete_output(); + } + } +} + +int +NDBT_ThreadSet::connect(class Ndb_cluster_connection* ncc, const char* db) +{ + for (int n = 0; n < m_count; n++) { + assert(m_thread[n] != 0); + NDBT_Thread& thr = *m_thread[n]; + if (thr.connect(ncc, db) == -1) + return -1; + } + return 0; +} + +void +NDBT_ThreadSet::disconnect() +{ + for (int n = 0; n < m_count; n++) { + if (m_thread[n] != 0) { + NDBT_Thread& thr = *m_thread[n]; + thr.disconnect(); + } + } +} + +int +NDBT_ThreadSet::get_err() const +{ + for (int n = 0; n < m_count; n++) { + if (m_thread[n] != 0) { + NDBT_Thread& thr = *m_thread[n]; + int err = thr.get_err(); + if (err != 0) + return err; + } + } + return 0; +} diff --git a/storage/ndb/test/tools/hugoPkDelete.cpp b/storage/ndb/test/tools/hugoPkDelete.cpp index b185eacdddf37cb6cea05eebd6bd540b08174f66..aa8e6c654a73c3d0275243cd9615bbb406bff13b 100644 --- a/storage/ndb/test/tools/hugoPkDelete.cpp +++ b/storage/ndb/test/tools/hugoPkDelete.cpp @@ -20,22 +20,41 @@ #include <NdbApi.hpp> #include <NdbMain.h> #include <NDBT.hpp> +#include <NDBT_Thread.hpp> +#include <NDBT_Stats.hpp> #include <NdbSleep.h> #include <getarg.h> #include <HugoTransactions.hpp> +static NDBT_ThreadFunc hugoPkDelete; + +struct ThrInput { + const NdbDictionary::Table* pTab; + int records; + int batch; + int stats; +}; + +struct ThrOutput { + NDBT_Stats latency; +}; + int main(int argc, const char** argv){ ndb_init(); int _records = 0; int _loops = 1; - int _batch = 0; + int _threads = 1; + int _stats = 0; + int _batch = 1; const char* _tabname = NULL; int _help = 0; struct getargs args[] = { { "loops", 'l', arg_integer, &_loops, "number of times to run this program(0=infinite loop)", "loops" }, + { "threads", 't', arg_integer, &_threads, "number of threads (default 1)", "threads" }, + { "stats", 's', arg_flag, &_stats, "report latency per batch", "stats" }, // { "batch", 'b', arg_integer, &_batch, "batch value", "batch" }, { "records", 'r', arg_integer, &_records, "Number of records", "records" }, { "usage", '?', arg_flag, &_help, "Print help", "" } @@ -81,12 +100,57 @@ int main(int argc, const char** argv){ return NDBT_ProgramExit(NDBT_WRONGARGS); } - HugoTransactions hugoTrans(*pTab); + // threads + NDBT_ThreadSet ths(_threads); + + // create Ndb object for each thread + if (ths.connect(&con, "TEST_DB") == -1) { + ndbout << "connect failed: err=" << ths.get_err() << endl; + return NDBT_ProgramExit(NDBT_FAILED); + } + + // input is options + ThrInput input; + ths.set_input(&input); + input.pTab = pTab; + input.records = _records; + input.batch = _batch; + input.stats = _stats; + + // output is stats + ThrOutput output; + ths.set_output<ThrOutput>(); + int i = 0; - while (i<_loops || _loops==0) { + while (i < _loops || _loops == 0) { ndbout << i << ": "; - if (hugoTrans.pkDelRecords(&MyNdb, _records) != 0){ - return NDBT_ProgramExit(NDBT_FAILED); + + ths.set_func(hugoPkDelete); + ths.start(); + ths.stop(); + + if (ths.get_err()) + NDBT_ProgramExit(NDBT_FAILED); + + if (_stats) { + NDBT_Stats latency; + + // add stats from each thread + int n; + for (n = 0; n < ths.get_count(); n++) { + NDBT_Thread& thr = ths.get_thread(n); + ThrOutput* output = (ThrOutput*)thr.get_output(); + latency += output->latency; + } + + ndbout + << "latency per batch (us): " + << " samples=" << latency.getCount() + << " min=" << (int)latency.getMin() + << " max=" << (int)latency.getMax() + << " mean=" << (int)latency.getMean() + << " stddev=" << (int)latency.getStddev() + << endl; } i++; } @@ -94,3 +158,23 @@ int main(int argc, const char** argv){ return NDBT_ProgramExit(NDBT_OK); } +static void hugoPkDelete(NDBT_Thread& thr) +{ + const ThrInput* input = (const ThrInput*)thr.get_input(); + ThrOutput* output = (ThrOutput*)thr.get_output(); + + HugoTransactions hugoTrans(*input->pTab); + output->latency.reset(); + if (input->stats) + hugoTrans.setStatsLatency(&output->latency); + + NDBT_ThreadSet& ths = thr.get_thread_set(); + hugoTrans.setThrInfo(ths.get_count(), thr.get_thread_no()); + + int ret; + ret = hugoTrans.pkDelRecords(thr.get_ndb(), + input->records, + input->batch); + if (ret != 0) + thr.set_err(ret); +} diff --git a/storage/ndb/test/tools/hugoPkRead.cpp b/storage/ndb/test/tools/hugoPkRead.cpp index dd14203c16e0f034ad0d10bc5da2e5893f78f36c..232f55b35b8b5619244ed7d33540018605c52301 100644 --- a/storage/ndb/test/tools/hugoPkRead.cpp +++ b/storage/ndb/test/tools/hugoPkRead.cpp @@ -20,17 +20,33 @@ #include <NdbApi.hpp> #include <NdbMain.h> #include <NDBT.hpp> +#include <NDBT_Thread.hpp> +#include <NDBT_Stats.hpp> #include <NdbSleep.h> #include <getarg.h> #include <HugoTransactions.hpp> +static NDBT_ThreadFunc hugoPkRead; + +struct ThrInput { + const NdbDictionary::Table* pTab; + int records; + int batch; + int stats; +}; + +struct ThrOutput { + NDBT_Stats latency; +}; int main(int argc, const char** argv){ ndb_init(); int _records = 0; int _loops = 1; + int _threads = 1; + int _stats = 0; int _abort = 0; int _batch = 1; const char* _tabname = NULL; @@ -39,6 +55,8 @@ int main(int argc, const char** argv){ struct getargs args[] = { { "aborts", 'a', arg_integer, &_abort, "percent of transactions that are aborted", "abort%" }, { "loops", 'l', arg_integer, &_loops, "number of times to run this program(0=infinite loop)", "loops" }, + { "threads", 't', arg_integer, &_threads, "number of threads (default 1)", "threads" }, + { "stats", 's', arg_flag, &_stats, "report latency per batch", "stats" }, { "batch", 'b', arg_integer, &_batch, "batch value(not 0)", "batch" }, { "records", 'r', arg_integer, &_records, "Number of records", "records" }, { "usage", '?', arg_flag, &_help, "Print help", "" } @@ -64,6 +82,7 @@ int main(int argc, const char** argv){ { return NDBT_ProgramExit(NDBT_FAILED); } + Ndb MyNdb(&con, "TEST_DB" ); if(MyNdb.init() != 0){ @@ -81,12 +100,57 @@ int main(int argc, const char** argv){ return NDBT_ProgramExit(NDBT_WRONGARGS); } - HugoTransactions hugoTrans(*pTab); + // threads + NDBT_ThreadSet ths(_threads); + + // create Ndb object for each thread + if (ths.connect(&con, "TEST_DB") == -1) { + ndbout << "connect failed: err=" << ths.get_err() << endl; + return NDBT_ProgramExit(NDBT_FAILED); + } + + // input is options + ThrInput input; + ths.set_input(&input); + input.pTab = pTab; + input.records = _records; + input.batch = _batch; + input.stats = _stats; + + // output is stats + ThrOutput output; + ths.set_output<ThrOutput>(); + int i = 0; - while (i<_loops || _loops==0) { + while (i < _loops || _loops == 0) { ndbout << i << ": "; - if (hugoTrans.pkReadRecords(&MyNdb, _records, _batch) != 0){ - return NDBT_ProgramExit(NDBT_FAILED); + + ths.set_func(hugoPkRead); + ths.start(); + ths.stop(); + + if (ths.get_err()) + NDBT_ProgramExit(NDBT_FAILED); + + if (_stats) { + NDBT_Stats latency; + + // add stats from each thread + int n; + for (n = 0; n < ths.get_count(); n++) { + NDBT_Thread& thr = ths.get_thread(n); + ThrOutput* output = (ThrOutput*)thr.get_output(); + latency += output->latency; + } + + ndbout + << "latency per batch (us): " + << " samples=" << latency.getCount() + << " min=" << (int)latency.getMin() + << " max=" << (int)latency.getMax() + << " mean=" << (int)latency.getMean() + << " stddev=" << (int)latency.getStddev() + << endl; } i++; } @@ -94,3 +158,20 @@ int main(int argc, const char** argv){ return NDBT_ProgramExit(NDBT_OK); } +static void hugoPkRead(NDBT_Thread& thr) +{ + const ThrInput* input = (const ThrInput*)thr.get_input(); + ThrOutput* output = (ThrOutput*)thr.get_output(); + + HugoTransactions hugoTrans(*input->pTab); + output->latency.reset(); + if (input->stats) + hugoTrans.setStatsLatency(&output->latency); + + int ret; + ret = hugoTrans.pkReadRecords(thr.get_ndb(), + input->records, + input->batch); + if (ret != 0) + thr.set_err(ret); +} diff --git a/storage/ndb/test/tools/hugoPkUpdate.cpp b/storage/ndb/test/tools/hugoPkUpdate.cpp index 3e950bc96cd6f34c234bf7b941dd396beca3db76..b920a4f396ac14eff9355725a623b4c59b01b89c 100644 --- a/storage/ndb/test/tools/hugoPkUpdate.cpp +++ b/storage/ndb/test/tools/hugoPkUpdate.cpp @@ -20,24 +20,43 @@ #include <NdbApi.hpp> #include <NdbMain.h> #include <NDBT.hpp> +#include <NDBT_Thread.hpp> +#include <NDBT_Stats.hpp> #include <NdbSleep.h> #include <getarg.h> #include <HugoTransactions.hpp> +static NDBT_ThreadFunc hugoPkUpdate; + +struct ThrInput { + const NdbDictionary::Table* pTab; + int records; + int batch; + int stats; +}; + +struct ThrOutput { + NDBT_Stats latency; +}; + int main(int argc, const char** argv){ ndb_init(); int _records = 0; int _loops = 1; + int _threads = 1; + int _stats = 0; int _abort = 0; - int _batch = 0; + int _batch = 1; const char* _tabname = NULL, *db = 0; int _help = 0; struct getargs args[] = { { "aborts", 'a', arg_integer, &_abort, "percent of transactions that are aborted", "abort%" }, { "loops", 'l', arg_integer, &_loops, "number of times to run this program(0=infinite loop)", "loops" }, + { "threads", 't', arg_integer, &_threads, "number of threads (default 1)", "threads" }, + { "stats", 's', arg_flag, &_stats, "report latency per batch", "stats" }, // { "batch", 'b', arg_integer, &_batch, "batch value", "batch" }, { "records", 'r', arg_integer, &_records, "Number of records", "records" }, { "usage", '?', arg_flag, &_help, "Print help", "" }, @@ -83,16 +102,81 @@ int main(int argc, const char** argv){ return NDBT_ProgramExit(NDBT_WRONGARGS); } - HugoTransactions hugoTrans(*pTab); + // threads + NDBT_ThreadSet ths(_threads); + + // create Ndb object for each thread + if (ths.connect(&con, "TEST_DB") == -1) { + ndbout << "connect failed: err=" << ths.get_err() << endl; + return NDBT_ProgramExit(NDBT_FAILED); + } + + // input is options + ThrInput input; + ths.set_input(&input); + input.pTab = pTab; + input.records = _records; + input.batch = _batch; + input.stats = _stats; + + // output is stats + ThrOutput output; + ths.set_output<ThrOutput>(); + int i = 0; - while (i<_loops || _loops==0) { - ndbout << "loop " << i << ": "; - if (hugoTrans.pkUpdateRecords(&MyNdb, - _records) != 0){ - return NDBT_ProgramExit(NDBT_FAILED); + while (i < _loops || _loops == 0) { + ndbout << i << ": "; + + ths.set_func(hugoPkUpdate); + ths.start(); + ths.stop(); + + if (ths.get_err()) + NDBT_ProgramExit(NDBT_FAILED); + + if (_stats) { + NDBT_Stats latency; + + // add stats from each thread + int n; + for (n = 0; n < ths.get_count(); n++) { + NDBT_Thread& thr = ths.get_thread(n); + ThrOutput* output = (ThrOutput*)thr.get_output(); + latency += output->latency; + } + + ndbout + << "latency per batch (us): " + << " samples=" << latency.getCount() + << " min=" << (int)latency.getMin() + << " max=" << (int)latency.getMax() + << " mean=" << (int)latency.getMean() + << " stddev=" << (int)latency.getStddev() + << endl; } i++; } return NDBT_ProgramExit(NDBT_OK); } + +static void hugoPkUpdate(NDBT_Thread& thr) +{ + const ThrInput* input = (const ThrInput*)thr.get_input(); + ThrOutput* output = (ThrOutput*)thr.get_output(); + + HugoTransactions hugoTrans(*input->pTab); + output->latency.reset(); + if (input->stats) + hugoTrans.setStatsLatency(&output->latency); + + NDBT_ThreadSet& ths = thr.get_thread_set(); + hugoTrans.setThrInfo(ths.get_count(), thr.get_thread_no()); + + int ret; + ret = hugoTrans.pkUpdateRecords(thr.get_ndb(), + input->records, + input->batch); + if (ret != 0) + thr.set_err(ret); +}