Commit 6cc3eee4 authored by unknown's avatar unknown

wl2405.patch


storage/ndb/include/ndbapi/Ndb.hpp:
  Import patch wl2405.patch
storage/ndb/include/ndbapi/NdbIndexScanOperation.hpp:
  Import patch wl2405.patch
storage/ndb/include/ndbapi/NdbScanOperation.hpp:
  Import patch wl2405.patch
storage/ndb/include/portlib/NdbThread.h:
  Import patch wl2405.patch
storage/ndb/src/common/portlib/NdbThread.c:
  Import patch wl2405.patch
storage/ndb/src/common/transporter/TransporterRegistry.cpp:
  Import patch wl2405.patch
storage/ndb/src/ndbapi/Ndb.cpp:
  Import patch wl2405.patch
storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp:
  Import patch wl2405.patch
storage/ndb/src/ndbapi/NdbScanOperation.cpp:
  Import patch wl2405.patch
storage/ndb/src/ndbapi/NdbWaiter.hpp:
  Import patch wl2405.patch
storage/ndb/src/ndbapi/Ndbif.cpp:
  Import patch wl2405.patch
storage/ndb/src/ndbapi/Ndbinit.cpp:
  Import patch wl2405.patch
storage/ndb/src/ndbapi/TransporterFacade.cpp:
  Import patch wl2405.patch
storage/ndb/src/ndbapi/TransporterFacade.hpp:
  Import patch wl2405.patch
parent bc2776d9
......@@ -984,6 +984,8 @@ class BaseString;
class NdbEventOperation;
class NdbBlob;
class NdbReceiver;
class TransporterFacade;
class PollGuard;
typedef void (* NdbEventCallback)(NdbEventOperation*, Ndb*, void*);
......@@ -1462,7 +1464,12 @@ public:
/*****************************************************************************
* These are service routines used by the other classes in the NDBAPI.
****************************************************************************/
Uint32 get_cond_wait_index() { return cond_wait_index; }
void set_cond_wait_index(Uint32 index) { cond_wait_index = index; }
private:
Uint32 cond_wait_index;
Ndb *cond_signal_ndb;
void cond_signal();
void setup(Ndb_cluster_connection *ndb_cluster_connection,
const char* aCatalogName, const char* aSchemaName);
......@@ -1513,13 +1520,11 @@ private:
// synchronous and asynchronous interface
void handleReceivedSignal(NdbApiSignal* anApiSignal, struct LinearSectionPtr ptr[3]);
// Receive response signals
int receiveResponse(int waitTime = WAITFOR_RESPONSE_TIMEOUT);
int sendRecSignal(Uint16 aNodeId,
Uint32 aWaitState,
NdbApiSignal* aSignal,
Uint32 nodeSequence);
Uint32 nodeSequence,
Uint32 *ret_conn_seq= 0);
// Sets Restart GCI in Ndb object
void RestartGCI(int aRestartGCI);
......@@ -1576,7 +1581,9 @@ private:
Uint32 pollCompleted(NdbTransaction** aCopyArray);
void sendPrepTrans(int forceSend);
void reportCallback(NdbTransaction** aCopyArray, Uint32 aNoOfComplTrans);
void waitCompletedTransactions(int milliSecs, int noOfEventsToWaitFor);
int poll_trans(int milliSecs, int noOfEventsToWaitFor, PollGuard *pg);
void waitCompletedTransactions(int milliSecs, int noOfEventsToWaitFor,
PollGuard *pg);
void completedTransaction(NdbTransaction* aTransaction);
void completedScanTransaction(NdbTransaction* aTransaction);
......
......@@ -161,7 +161,7 @@ private:
void fix_get_values();
int next_result_ordered(bool fetchAllowed, bool forceSend = false);
int send_next_scan_ordered(Uint32 idx, bool forceSend = false);
int send_next_scan_ordered(Uint32 idx);
int compare(Uint32 key, Uint32 cols, const NdbReceiver*, const NdbReceiver*);
Uint32 m_sort_columns;
......
......@@ -21,6 +21,7 @@
class NdbBlob;
class NdbResultSet;
class PollGuard;
/**
* @class NdbScanOperation
......@@ -183,7 +184,8 @@ protected:
int nextResultImpl(bool fetchAllowed = true, bool forceSend = false);
virtual void release();
int close_impl(class TransporterFacade*, bool forceSend = false);
int close_impl(class TransporterFacade*, bool forceSend,
PollGuard *poll_guard);
// Overloaded methods from NdbCursorOperation
int executeCursor(int ProcessorId);
......@@ -192,7 +194,6 @@ protected:
int init(const NdbTableImpl* tab, NdbTransaction*);
int prepareSend(Uint32 TC_ConnectPtr, Uint64 TransactionId);
int doSend(int ProcessorId);
void checkForceSend(bool forceSend);
virtual void setErrorCode(int aErrorCode);
virtual void setErrorCodeAbort(int aErrorCode);
......@@ -234,7 +235,7 @@ protected:
Uint32 m_sent_receivers_count; // NOTE needs mutex to access
NdbReceiver** m_sent_receivers; // receive thread puts them here
int send_next_scan(Uint32 cnt, bool close, bool forceSend = false);
int send_next_scan(Uint32 cnt, bool close);
void receiver_delivered(NdbReceiver*);
void receiver_completed(NdbReceiver*);
void execCLOSE_SCAN_REP();
......
......@@ -37,6 +37,14 @@ typedef size_t NDB_THREAD_STACKSIZE;
struct NdbThread;
/*
Method to block/unblock thread from receiving KILL signal with
signum set in g_ndb_shm_signum in a portable manner.
*/
#ifdef NDB_SHM_TRANSPORTER
void NdbThread_set_shm_sigmask(bool block);
#endif
/**
* Create a thread
*
......
......@@ -36,6 +36,27 @@ struct NdbThread
void * object;
};
#ifdef NDB_SHM_TRANSPORTER
void NdbThread_set_shm_sigmask(bool block)
{
DBUG_ENTER("NdbThread_set_shm_sigmask");
if (g_ndb_shm_signum)
{
sigset_t mask;
DBUG_PRINT("info",("Block signum %d",g_ndb_shm_signum));
sigemptyset(&mask);
sigaddset(&mask, g_ndb_shm_signum);
if (block)
pthread_sigmask(SIG_BLOCK, &mask, 0);
else
pthread_sigmask(SIG_UNBLOCK, &mask, 0);
}
DBUG_VOID_RETURN;
}
#endif
static
void*
ndb_thread_wrapper(void* _ss){
......@@ -43,14 +64,7 @@ ndb_thread_wrapper(void* _ss){
{
DBUG_ENTER("ndb_thread_wrapper");
#ifdef NDB_SHM_TRANSPORTER
if (g_ndb_shm_signum)
{
sigset_t mask;
DBUG_PRINT("info",("Block signum %d",g_ndb_shm_signum));
sigemptyset(&mask);
sigaddset(&mask, g_ndb_shm_signum);
pthread_sigmask(SIG_BLOCK, &mask, 0);
}
NdbThread_set_shm_sigmask(true);
#endif
{
void *ret;
......
......@@ -457,10 +457,7 @@ TransporterRegistry::createSHMTransporter(TransporterConfiguration *config) {
* Make sure to block g_ndb_shm_signum
* TransporterRegistry::init is run from "main" thread
*/
sigset_t mask;
sigemptyset(&mask);
sigaddset(&mask, g_ndb_shm_signum);
pthread_sigmask(SIG_BLOCK, &mask, 0);
NdbThread_set_shm_sigmask(true);
}
if(config->shm.signum != g_ndb_shm_signum)
......@@ -1490,11 +1487,9 @@ TransporterRegistry::startReceiving()
DBUG_PRINT("info",("Install signal handler for signum %d",
g_ndb_shm_signum));
struct sigaction sa;
NdbThread_set_shm_sigmask(false);
sigemptyset(&sa.sa_mask);
sigaddset(&sa.sa_mask, g_ndb_shm_signum);
pthread_sigmask(SIG_UNBLOCK, &sa.sa_mask, 0);
sa.sa_handler = shm_sig_handler;
sigemptyset(&sa.sa_mask);
sa.sa_flags = 0;
int ret;
while((ret = sigaction(g_ndb_shm_signum, &sa, 0)) == -1 && errno == EINTR);
......
......@@ -173,23 +173,9 @@ Ndb::NDB_connect(Uint32 tNode)
tSignal->setData(theMyRef, 2); // Set my block reference
tNdbCon->Status(NdbTransaction::Connecting); // Set status to connecting
Uint32 nodeSequence;
{ // send and receive signal
Guard guard(tp->theMutexPtr);
nodeSequence = tp->getNodeSequence(tNode);
bool node_is_alive = tp->get_node_alive(tNode);
if (node_is_alive) {
tReturnCode = tp->sendSignal(tSignal, tNode);
releaseSignal(tSignal);
if (tReturnCode != -1) {
theImpl->theWaiter.m_node = tNode;
theImpl->theWaiter.m_state = WAIT_TC_SEIZE;
tReturnCode = receiveResponse();
}//if
} else {
releaseSignal(tSignal);
tReturnCode = -1;
}//if
}
tReturnCode= sendRecSignal(tNode, WAIT_TC_SEIZE, tSignal,
0, &nodeSequence);
releaseSignal(tSignal);
if ((tReturnCode == 0) && (tNdbCon->Status() == NdbTransaction::Connected)) {
//************************************************
// Send and receive was successful
......
......@@ -989,7 +989,13 @@ NdbDictInterface::dictSignal(NdbApiSignal* signal,
m_buffer.clear();
// Protected area
m_transporter->lock_mutex();
/*
The PollGuard has an implicit call of unlock_and_signal through the
~PollGuard method. This method is called implicitly by the compiler
in all places where the object is out of context due to a return,
break, continue or simply end of statement block
*/
PollGuard poll_guard(m_transporter, &m_waiter, refToBlock(m_reference));
Uint32 aNodeId;
if (useMasterNodeId) {
if ((m_masterNodeId == 0) ||
......@@ -1002,7 +1008,6 @@ NdbDictInterface::dictSignal(NdbApiSignal* signal,
}
if(aNodeId == 0){
m_error.code= 4009;
m_transporter->unlock_mutex();
DBUG_RETURN(-1);
}
{
......@@ -1023,21 +1028,15 @@ NdbDictInterface::dictSignal(NdbApiSignal* signal,
r = m_transporter->sendSignal(signal, aNodeId);
}
if(r != 0){
m_transporter->unlock_mutex();
continue;
}
}
m_error.code= 0;
m_waiter.m_node = aNodeId;
m_waiter.m_state = wst;
m_waiter.wait(theWait);
m_transporter->unlock_mutex();
int ret_val= poll_guard.wait_n_unlock(theWait, aNodeId, wst);
// End of Protected area
if(m_waiter.m_state == NO_WAIT && m_error.code == 0){
if(ret_val == 0 && m_error.code == 0){
// Normal return
DBUG_RETURN(0);
}
......@@ -1045,7 +1044,7 @@ NdbDictInterface::dictSignal(NdbApiSignal* signal,
/**
* Handle error codes
*/
if(m_waiter.m_state == WAIT_NODE_FAILURE)
if(ret_val == -2) //WAIT_NODE_FAILURE
continue;
if(m_waiter.m_state == WST_WAIT_TIMEOUT)
......@@ -3166,26 +3165,28 @@ NdbDictInterface::listObjects(NdbApiSignal* signal)
for (Uint32 i = 0; i < RETRIES; i++) {
m_buffer.clear();
// begin protected
m_transporter->lock_mutex();
/*
The PollGuard has an implicit call of unlock_and_signal through the
~PollGuard method. This method is called implicitly by the compiler
in all places where the object is out of context due to a return,
break, continue or simply end of statement block
*/
PollGuard poll_guard(m_transporter, &m_waiter, refToBlock(m_reference));
Uint16 aNodeId = m_transporter->get_an_alive_node();
if (aNodeId == 0) {
m_error.code= 4009;
m_transporter->unlock_mutex();
return -1;
}
if (m_transporter->sendSignal(signal, aNodeId) != 0) {
m_transporter->unlock_mutex();
continue;
}
m_error.code= 0;
m_waiter.m_node = aNodeId;
m_waiter.m_state = WAIT_LIST_TABLES_CONF;
m_waiter.wait(WAITFOR_RESPONSE_TIMEOUT);
m_transporter->unlock_mutex();
int ret_val= poll_guard.wait_n_unlock(WAITFOR_RESPONSE_TIMEOUT,
aNodeId, WAIT_LIST_TABLES_CONF);
// end protected
if (m_waiter.m_state == NO_WAIT && m_error.code == 0)
if (ret_val == 0 && m_error.code == 0)
return 0;
if (m_waiter.m_state == WAIT_NODE_FAILURE)
if (ret_val == -2) //WAIT_NODE_FAILURE
continue;
return -1;
}
......
......@@ -460,13 +460,20 @@ int NdbScanOperation::nextResultImpl(bool fetchAllowed, bool forceSend)
Uint32 nodeId = theNdbCon->theDBnode;
TransporterFacade* tp = TransporterFacade::instance();
Guard guard(tp->theMutexPtr);
/*
The PollGuard has an implicit call of unlock_and_signal through the
~PollGuard method. This method is called implicitly by the compiler
in all places where the object is out of context due to a return,
break, continue or simply end of statement block
*/
PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
theNdb->theNdbBlockNumber);
if(theError.code)
return -1;
Uint32 seq = theNdbCon->theNodeSequence;
if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false,
forceSend) == 0){
if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false) == 0)
{
idx = m_current_api_receiver;
last = m_api_receivers_count;
......@@ -495,10 +502,9 @@ int NdbScanOperation::nextResultImpl(bool fetchAllowed, bool forceSend)
/**
* No completed...
*/
theNdb->theImpl->theWaiter.m_node = nodeId;
theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) {
int ret_code= poll_guard.wait_scan(WAITFOR_SCAN_TIMEOUT, nodeId,
forceSend);
if (ret_code == 0 && seq == tp->getNodeSequence(nodeId)) {
continue;
} else {
idx = last;
......@@ -557,8 +563,8 @@ int NdbScanOperation::nextResultImpl(bool fetchAllowed, bool forceSend)
}
int
NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag,
bool forceSend){
NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag)
{
if(cnt > 0){
NdbApiSignal tSignal(theNdb->theMyRef);
tSignal.setSignal(GSN_SCAN_NEXTREQ);
......@@ -605,9 +611,6 @@ NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag,
ret = tp->sendSignal(&tSignal, nodeId);
}
}
if (!ret) checkForceSend(forceSend);
m_sent_receivers_count = last + sent;
m_api_receivers_count -= cnt;
m_current_api_receiver = 0;
......@@ -617,15 +620,6 @@ NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag,
return 0;
}
void NdbScanOperation::checkForceSend(bool forceSend)
{
if (forceSend) {
TransporterFacade::instance()->forceSend(theNdb->theNdbBlockNumber);
} else {
TransporterFacade::instance()->checkForceSend(theNdb->theNdbBlockNumber);
}//if
}
int
NdbScanOperation::prepareSend(Uint32 TC_ConnectPtr, Uint64 TransactionId)
{
......@@ -661,9 +655,15 @@ void NdbScanOperation::close(bool forceSend, bool releaseOp)
m_sent_receivers_count);
TransporterFacade* tp = TransporterFacade::instance();
Guard guard(tp->theMutexPtr);
close_impl(tp, forceSend);
/*
The PollGuard has an implicit call of unlock_and_signal through the
~PollGuard method. This method is called implicitly by the compiler
in all places where the object is out of context due to a return,
break, continue or simply end of statement block
*/
PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
theNdb->theNdbBlockNumber);
close_impl(tp, forceSend, &poll_guard);
}
NdbConnection* tCon = theNdbCon;
......@@ -1338,20 +1338,26 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed,
if(fetchAllowed){
if(DEBUG_NEXT_RESULT) ndbout_c("performing fetch...");
TransporterFacade* tp = TransporterFacade::instance();
Guard guard(tp->theMutexPtr);
/*
The PollGuard has an implicit call of unlock_and_signal through the
~PollGuard method. This method is called implicitly by the compiler
in all places where the object is out of context due to a return,
break, continue or simply end of statement block
*/
PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
theNdb->theNdbBlockNumber);
if(theError.code)
return -1;
Uint32 seq = theNdbCon->theNodeSequence;
Uint32 nodeId = theNdbCon->theDBnode;
if(seq == tp->getNodeSequence(nodeId) &&
!send_next_scan_ordered(s_idx, forceSend)){
!send_next_scan_ordered(s_idx)){
Uint32 tmp = m_sent_receivers_count;
s_idx = m_current_api_receiver;
while(m_sent_receivers_count > 0 && !theError.code){
theNdb->theImpl->theWaiter.m_node = nodeId;
theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) {
int ret_code= poll_guard.wait_scan(WAITFOR_SCAN_TIMEOUT, nodeId,
forceSend);
if (ret_code == 0 && seq == tp->getNodeSequence(nodeId)) {
continue;
}
if(DEBUG_NEXT_RESULT) ndbout_c("return -1");
......@@ -1438,7 +1444,8 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed,
}
int
NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx, bool forceSend){
NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx)
{
if(idx == theParallelism)
return 0;
......@@ -1476,12 +1483,13 @@ NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx, bool forceSend){
TransporterFacade * tp = TransporterFacade::instance();
tSignal.setLength(4+1);
int ret= tp->sendSignal(&tSignal, nodeId);
if (!ret) checkForceSend(forceSend);
return ret;
}
int
NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){
NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend,
PollGuard *poll_guard)
{
Uint32 seq = theNdbCon->theNodeSequence;
Uint32 nodeId = theNdbCon->theDBnode;
......@@ -1496,9 +1504,8 @@ NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){
*/
while(theError.code == 0 && m_sent_receivers_count)
{
theNdb->theImpl->theWaiter.m_node = nodeId;
theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
int return_code= poll_guard->wait_scan(WAITFOR_SCAN_TIMEOUT, nodeId,
false);
switch(return_code){
case 0:
break;
......@@ -1555,7 +1562,7 @@ NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){
}
// Send close scan
if(send_next_scan(api+conf, true, forceSend) == -1)
if(send_next_scan(api+conf, true) == -1)
{
theNdbCon->theReleaseOnClose = true;
return -1;
......@@ -1566,9 +1573,8 @@ NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){
*/
while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count)
{
theNdb->theImpl->theWaiter.m_node = nodeId;
theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
int return_code= poll_guard->wait_scan(WAITFOR_SCAN_TIMEOUT, nodeId,
forceSend);
switch(return_code){
case 0:
break;
......@@ -1608,12 +1614,19 @@ NdbScanOperation::restart(bool forceSend)
{
TransporterFacade* tp = TransporterFacade::instance();
Guard guard(tp->theMutexPtr);
/*
The PollGuard has an implicit call of unlock_and_signal through the
~PollGuard method. This method is called implicitly by the compiler
in all places where the object is out of context due to a return,
break, continue or simply end of statement block
*/
PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
theNdb->theNdbBlockNumber);
Uint32 nodeId = theNdbCon->theDBnode;
{
int res;
if((res= close_impl(tp, forceSend)))
if((res= close_impl(tp, forceSend, &poll_guard)))
{
return res;
}
......@@ -1627,7 +1640,6 @@ NdbScanOperation::restart(bool forceSend)
theError.code = 0;
if (doSendScan(nodeId) == -1)
return -1;
return 0;
}
......@@ -1637,8 +1649,15 @@ NdbIndexScanOperation::reset_bounds(bool forceSend){
{
TransporterFacade* tp = TransporterFacade::instance();
Guard guard(tp->theMutexPtr);
res= close_impl(tp, forceSend);
/*
The PollGuard has an implicit call of unlock_and_signal through the
~PollGuard method. This method is called implicitly by the compiler
in all places where the object is out of context due to a return,
break, continue or simply end of statement block
*/
PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
theNdb->theNdbBlockNumber);
res= close_impl(tp, forceSend, &poll_guard);
}
if(!res)
......
......@@ -54,10 +54,19 @@ public:
void wait(int waitTime);
void nodeFail(Uint32 node);
void signal(Uint32 state);
void cond_signal();
void set_poll_owner(bool poll_owner) { m_poll_owner= poll_owner; }
Uint32 get_state() { return m_state; }
void set_state(Uint32 state) { m_state= state; }
void set_node(Uint32 node) { m_node= node; }
Uint32 get_cond_wait_index() { return m_cond_wait_index; }
void set_cond_wait_index(Uint32 index) { m_cond_wait_index= index; }
Uint32 m_node;
Uint32 m_state;
void * m_mutex;
bool m_poll_owner;
Uint32 m_cond_wait_index;
struct NdbCondition * m_condition;
};
......@@ -65,22 +74,8 @@ inline
void
NdbWaiter::wait(int waitTime)
{
const bool forever = (waitTime == -1);
const NDB_TICKS maxTime = NdbTick_CurrentMillisecond() + waitTime;
while (1) {
if (m_state == NO_WAIT || m_state == WAIT_NODE_FAILURE)
break;
if (forever) {
NdbCondition_Wait(m_condition, (NdbMutex*)m_mutex);
} else {
if (waitTime <= 0) {
m_state = WST_WAIT_TIMEOUT;
break;
}
NdbCondition_WaitTimeout(m_condition, (NdbMutex*)m_mutex, waitTime);
waitTime = maxTime - NdbTick_CurrentMillisecond();
}
}
assert(!m_poll_owner);
NdbCondition_WaitTimeout(m_condition, (NdbMutex*)m_mutex, waitTime);
}
inline
......@@ -88,7 +83,8 @@ void
NdbWaiter::nodeFail(Uint32 aNodeId){
if (m_state != NO_WAIT && m_node == aNodeId){
m_state = WAIT_NODE_FAILURE;
NdbCondition_Signal(m_condition);
if (!m_poll_owner)
NdbCondition_Signal(m_condition);
}
}
......@@ -96,7 +92,14 @@ inline
void
NdbWaiter::signal(Uint32 state){
m_state = state;
NdbCondition_Signal(m_condition);
if (!m_poll_owner)
NdbCondition_Signal(m_condition);
}
inline
void
NdbWaiter::cond_signal()
{
NdbCondition_Signal(m_condition);
}
#endif
This diff is collapsed.
......@@ -86,6 +86,9 @@ void Ndb::setup(Ndb_cluster_connection *ndb_cluster_connection,
theFirstTransId= 0;
theMyRef= 0;
cond_wait_index = TransporterFacade::MAX_NO_THREADS;
cond_signal_ndb = NULL;
fullyQualifiedNames = true;
#ifdef POORMANSPURIFY
......@@ -217,6 +220,8 @@ NdbWaiter::NdbWaiter(){
m_node = 0;
m_state = NO_WAIT;
m_mutex = 0;
m_poll_owner= false;
m_cond_wait_index= TransporterFacade::MAX_NO_THREADS;
m_condition = NdbCondition_Create();
}
......
......@@ -34,6 +34,7 @@ class ConfigRetriever;
class Ndb;
class NdbApiSignal;
class NdbWaiter;
typedef void (* ExecuteFunction)(void *, NdbApiSignal *, LinearSectionPtr ptr[3]);
typedef void (* NodeStatusFunction)(void *, Uint32, bool nodeAlive, bool nfComplete);
......@@ -47,6 +48,11 @@ extern "C" {
class TransporterFacade
{
public:
/**
* Max number of Ndb objects.
* (Ndb objects should not be shared by different threads.)
*/
STATIC_CONST( MAX_NO_THREADS = 4711 );
TransporterFacade();
virtual ~TransporterFacade();
bool init(Uint32, const ndb_mgm_configuration *);
......@@ -114,10 +120,44 @@ public:
TransporterRegistry* get_registry() { return theTransporterRegistry;};
/*
When a thread has sent its signals and is ready to wait for reception
of these it does normally always wait on a conditional mutex and
the actual reception is handled by the receiver thread in the NDB API.
With the below new methods and variables each thread has the possibility
of becoming owner of the "right" to poll for signals. Effectually this
means that the thread acts temporarily as a receiver thread.
For the thread that succeeds in grabbing this "ownership" it will avoid
a number of expensive calls to conditional mutex and even more expensive
context switches to wake up.
When an owner of the poll "right" has completed its own task it is likely
that there are others still waiting. In this case we pick one of the
threads as new owner of the poll "right". Since we want to switch owner
as seldom as possible we always pick the last thread which is likely to
be the last to complete its reception.
*/
void external_poll(Uint32 wait_time);
NdbWaiter* get_poll_owner(void) const { return poll_owner; }
void set_poll_owner(NdbWaiter* new_owner) { poll_owner= new_owner; }
Uint32 put_in_cond_wait_queue(NdbWaiter *aWaiter);
void remove_from_cond_wait_queue(NdbWaiter *aWaiter);
NdbWaiter* rem_last_from_cond_wait_queue();
// heart beat received from a node (e.g. a signal came)
void hb_received(NodeId n);
private:
void init_cond_wait_queue();
struct CondWaitQueueElement {
NdbWaiter *cond_wait_object;
Uint32 next_cond_wait;
Uint32 prev_cond_wait;
};
NdbWaiter *poll_owner;
CondWaitQueueElement cond_wait_array[MAX_NO_THREADS];
Uint32 first_in_cond_wait;
Uint32 first_free_cond_wait;
Uint32 last_in_cond_wait;
/* End poll owner stuff */
/**
* Send a signal unconditional of node status (used by ClusterMgr)
*/
......@@ -172,12 +212,6 @@ private:
/**
* Block number handling
*/
public:
/**
* Max number of Ndb objects.
* (Ndb objects should not be shared by different threads.)
*/
STATIC_CONST( MAX_NO_THREADS = 4711 );
private:
struct ThreadData {
......@@ -245,6 +279,24 @@ public:
GlobalDictCache m_globalDictCache;
};
class PollGuard
{
public:
PollGuard(TransporterFacade *tp, NdbWaiter *aWaiter, Uint32 block_no);
~PollGuard() { unlock_and_signal(); }
int wait_n_unlock(int wait_time, NodeId nodeId, Uint32 state,
bool forceSend= false);
int wait_for_input_in_loop(int wait_time, bool forceSend);
void wait_for_input(int wait_time);
int wait_scan(int wait_time, NodeId nodeId, bool forceSend);
void unlock_and_signal();
private:
TransporterFacade *m_tp;
NdbWaiter *m_waiter;
Uint32 m_block_no;
bool m_locked;
};
inline
TransporterFacade*
TransporterFacade::instance()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment