wl2405.patch

parent aadcf0d3
...@@ -984,6 +984,8 @@ class BaseString; ...@@ -984,6 +984,8 @@ class BaseString;
class NdbEventOperation; class NdbEventOperation;
class NdbBlob; class NdbBlob;
class NdbReceiver; class NdbReceiver;
class TransporterFacade;
class PollGuard;
typedef void (* NdbEventCallback)(NdbEventOperation*, Ndb*, void*); typedef void (* NdbEventCallback)(NdbEventOperation*, Ndb*, void*);
...@@ -1462,7 +1464,12 @@ public: ...@@ -1462,7 +1464,12 @@ public:
/***************************************************************************** /*****************************************************************************
* These are service routines used by the other classes in the NDBAPI. * These are service routines used by the other classes in the NDBAPI.
****************************************************************************/ ****************************************************************************/
Uint32 get_cond_wait_index() { return cond_wait_index; }
void set_cond_wait_index(Uint32 index) { cond_wait_index = index; }
private: private:
Uint32 cond_wait_index;
Ndb *cond_signal_ndb;
void cond_signal();
void setup(Ndb_cluster_connection *ndb_cluster_connection, void setup(Ndb_cluster_connection *ndb_cluster_connection,
const char* aCatalogName, const char* aSchemaName); const char* aCatalogName, const char* aSchemaName);
...@@ -1513,13 +1520,11 @@ private: ...@@ -1513,13 +1520,11 @@ private:
// synchronous and asynchronous interface // synchronous and asynchronous interface
void handleReceivedSignal(NdbApiSignal* anApiSignal, struct LinearSectionPtr ptr[3]); void handleReceivedSignal(NdbApiSignal* anApiSignal, struct LinearSectionPtr ptr[3]);
// Receive response signals
int receiveResponse(int waitTime = WAITFOR_RESPONSE_TIMEOUT);
int sendRecSignal(Uint16 aNodeId, int sendRecSignal(Uint16 aNodeId,
Uint32 aWaitState, Uint32 aWaitState,
NdbApiSignal* aSignal, NdbApiSignal* aSignal,
Uint32 nodeSequence); Uint32 nodeSequence,
Uint32 *ret_conn_seq= 0);
// Sets Restart GCI in Ndb object // Sets Restart GCI in Ndb object
void RestartGCI(int aRestartGCI); void RestartGCI(int aRestartGCI);
...@@ -1576,7 +1581,9 @@ private: ...@@ -1576,7 +1581,9 @@ private:
Uint32 pollCompleted(NdbTransaction** aCopyArray); Uint32 pollCompleted(NdbTransaction** aCopyArray);
void sendPrepTrans(int forceSend); void sendPrepTrans(int forceSend);
void reportCallback(NdbTransaction** aCopyArray, Uint32 aNoOfComplTrans); void reportCallback(NdbTransaction** aCopyArray, Uint32 aNoOfComplTrans);
void waitCompletedTransactions(int milliSecs, int noOfEventsToWaitFor); int poll_trans(int milliSecs, int noOfEventsToWaitFor, PollGuard *pg);
void waitCompletedTransactions(int milliSecs, int noOfEventsToWaitFor,
PollGuard *pg);
void completedTransaction(NdbTransaction* aTransaction); void completedTransaction(NdbTransaction* aTransaction);
void completedScanTransaction(NdbTransaction* aTransaction); void completedScanTransaction(NdbTransaction* aTransaction);
......
...@@ -161,7 +161,7 @@ private: ...@@ -161,7 +161,7 @@ private:
void fix_get_values(); void fix_get_values();
int next_result_ordered(bool fetchAllowed, bool forceSend = false); int next_result_ordered(bool fetchAllowed, bool forceSend = false);
int send_next_scan_ordered(Uint32 idx, bool forceSend = false); int send_next_scan_ordered(Uint32 idx);
int compare(Uint32 key, Uint32 cols, const NdbReceiver*, const NdbReceiver*); int compare(Uint32 key, Uint32 cols, const NdbReceiver*, const NdbReceiver*);
Uint32 m_sort_columns; Uint32 m_sort_columns;
......
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
class NdbBlob; class NdbBlob;
class NdbResultSet; class NdbResultSet;
class PollGuard;
/** /**
* @class NdbScanOperation * @class NdbScanOperation
...@@ -183,7 +184,8 @@ protected: ...@@ -183,7 +184,8 @@ protected:
int nextResultImpl(bool fetchAllowed = true, bool forceSend = false); int nextResultImpl(bool fetchAllowed = true, bool forceSend = false);
virtual void release(); virtual void release();
int close_impl(class TransporterFacade*, bool forceSend = false); int close_impl(class TransporterFacade*, bool forceSend,
PollGuard *poll_guard);
// Overloaded methods from NdbCursorOperation // Overloaded methods from NdbCursorOperation
int executeCursor(int ProcessorId); int executeCursor(int ProcessorId);
...@@ -192,7 +194,6 @@ protected: ...@@ -192,7 +194,6 @@ protected:
int init(const NdbTableImpl* tab, NdbTransaction*); int init(const NdbTableImpl* tab, NdbTransaction*);
int prepareSend(Uint32 TC_ConnectPtr, Uint64 TransactionId); int prepareSend(Uint32 TC_ConnectPtr, Uint64 TransactionId);
int doSend(int ProcessorId); int doSend(int ProcessorId);
void checkForceSend(bool forceSend);
virtual void setErrorCode(int aErrorCode); virtual void setErrorCode(int aErrorCode);
virtual void setErrorCodeAbort(int aErrorCode); virtual void setErrorCodeAbort(int aErrorCode);
...@@ -234,7 +235,7 @@ protected: ...@@ -234,7 +235,7 @@ protected:
Uint32 m_sent_receivers_count; // NOTE needs mutex to access Uint32 m_sent_receivers_count; // NOTE needs mutex to access
NdbReceiver** m_sent_receivers; // receive thread puts them here NdbReceiver** m_sent_receivers; // receive thread puts them here
int send_next_scan(Uint32 cnt, bool close, bool forceSend = false); int send_next_scan(Uint32 cnt, bool close);
void receiver_delivered(NdbReceiver*); void receiver_delivered(NdbReceiver*);
void receiver_completed(NdbReceiver*); void receiver_completed(NdbReceiver*);
void execCLOSE_SCAN_REP(); void execCLOSE_SCAN_REP();
......
...@@ -37,6 +37,14 @@ typedef size_t NDB_THREAD_STACKSIZE; ...@@ -37,6 +37,14 @@ typedef size_t NDB_THREAD_STACKSIZE;
struct NdbThread; struct NdbThread;
/*
Method to block/unblock thread from receiving KILL signal with
signum set in g_ndb_shm_signum in a portable manner.
*/
#ifdef NDB_SHM_TRANSPORTER
void NdbThread_set_shm_sigmask(bool block);
#endif
/** /**
* Create a thread * Create a thread
* *
......
...@@ -36,21 +36,35 @@ struct NdbThread ...@@ -36,21 +36,35 @@ struct NdbThread
void * object; void * object;
}; };
static
void*
ndb_thread_wrapper(void* _ss){
my_thread_init();
{
DBUG_ENTER("ndb_thread_wrapper");
#ifdef NDB_SHM_TRANSPORTER #ifdef NDB_SHM_TRANSPORTER
void NdbThread_set_shm_sigmask(bool block)
{
DBUG_ENTER("NdbThread_set_shm_sigmask");
if (g_ndb_shm_signum) if (g_ndb_shm_signum)
{ {
sigset_t mask; sigset_t mask;
DBUG_PRINT("info",("Block signum %d",g_ndb_shm_signum)); DBUG_PRINT("info",("Block signum %d",g_ndb_shm_signum));
sigemptyset(&mask); sigemptyset(&mask);
sigaddset(&mask, g_ndb_shm_signum); sigaddset(&mask, g_ndb_shm_signum);
if (block)
pthread_sigmask(SIG_BLOCK, &mask, 0); pthread_sigmask(SIG_BLOCK, &mask, 0);
else
pthread_sigmask(SIG_UNBLOCK, &mask, 0);
} }
DBUG_VOID_RETURN;
}
#endif
static
void*
ndb_thread_wrapper(void* _ss){
my_thread_init();
{
DBUG_ENTER("ndb_thread_wrapper");
#ifdef NDB_SHM_TRANSPORTER
NdbThread_set_shm_sigmask(true);
#endif #endif
{ {
void *ret; void *ret;
......
...@@ -457,10 +457,7 @@ TransporterRegistry::createSHMTransporter(TransporterConfiguration *config) { ...@@ -457,10 +457,7 @@ TransporterRegistry::createSHMTransporter(TransporterConfiguration *config) {
* Make sure to block g_ndb_shm_signum * Make sure to block g_ndb_shm_signum
* TransporterRegistry::init is run from "main" thread * TransporterRegistry::init is run from "main" thread
*/ */
sigset_t mask; NdbThread_set_shm_sigmask(true);
sigemptyset(&mask);
sigaddset(&mask, g_ndb_shm_signum);
pthread_sigmask(SIG_BLOCK, &mask, 0);
} }
if(config->shm.signum != g_ndb_shm_signum) if(config->shm.signum != g_ndb_shm_signum)
...@@ -1490,11 +1487,9 @@ TransporterRegistry::startReceiving() ...@@ -1490,11 +1487,9 @@ TransporterRegistry::startReceiving()
DBUG_PRINT("info",("Install signal handler for signum %d", DBUG_PRINT("info",("Install signal handler for signum %d",
g_ndb_shm_signum)); g_ndb_shm_signum));
struct sigaction sa; struct sigaction sa;
NdbThread_set_shm_sigmask(false);
sigemptyset(&sa.sa_mask); sigemptyset(&sa.sa_mask);
sigaddset(&sa.sa_mask, g_ndb_shm_signum);
pthread_sigmask(SIG_UNBLOCK, &sa.sa_mask, 0);
sa.sa_handler = shm_sig_handler; sa.sa_handler = shm_sig_handler;
sigemptyset(&sa.sa_mask);
sa.sa_flags = 0; sa.sa_flags = 0;
int ret; int ret;
while((ret = sigaction(g_ndb_shm_signum, &sa, 0)) == -1 && errno == EINTR); while((ret = sigaction(g_ndb_shm_signum, &sa, 0)) == -1 && errno == EINTR);
......
...@@ -173,23 +173,9 @@ Ndb::NDB_connect(Uint32 tNode) ...@@ -173,23 +173,9 @@ Ndb::NDB_connect(Uint32 tNode)
tSignal->setData(theMyRef, 2); // Set my block reference tSignal->setData(theMyRef, 2); // Set my block reference
tNdbCon->Status(NdbTransaction::Connecting); // Set status to connecting tNdbCon->Status(NdbTransaction::Connecting); // Set status to connecting
Uint32 nodeSequence; Uint32 nodeSequence;
{ // send and receive signal tReturnCode= sendRecSignal(tNode, WAIT_TC_SEIZE, tSignal,
Guard guard(tp->theMutexPtr); 0, &nodeSequence);
nodeSequence = tp->getNodeSequence(tNode);
bool node_is_alive = tp->get_node_alive(tNode);
if (node_is_alive) {
tReturnCode = tp->sendSignal(tSignal, tNode);
releaseSignal(tSignal); releaseSignal(tSignal);
if (tReturnCode != -1) {
theImpl->theWaiter.m_node = tNode;
theImpl->theWaiter.m_state = WAIT_TC_SEIZE;
tReturnCode = receiveResponse();
}//if
} else {
releaseSignal(tSignal);
tReturnCode = -1;
}//if
}
if ((tReturnCode == 0) && (tNdbCon->Status() == NdbTransaction::Connected)) { if ((tReturnCode == 0) && (tNdbCon->Status() == NdbTransaction::Connected)) {
//************************************************ //************************************************
// Send and receive was successful // Send and receive was successful
......
...@@ -989,7 +989,13 @@ NdbDictInterface::dictSignal(NdbApiSignal* signal, ...@@ -989,7 +989,13 @@ NdbDictInterface::dictSignal(NdbApiSignal* signal,
m_buffer.clear(); m_buffer.clear();
// Protected area // Protected area
m_transporter->lock_mutex(); /*
The PollGuard has an implicit call of unlock_and_signal through the
~PollGuard method. This method is called implicitly by the compiler
in all places where the object is out of context due to a return,
break, continue or simply end of statement block
*/
PollGuard poll_guard(m_transporter, &m_waiter, refToBlock(m_reference));
Uint32 aNodeId; Uint32 aNodeId;
if (useMasterNodeId) { if (useMasterNodeId) {
if ((m_masterNodeId == 0) || if ((m_masterNodeId == 0) ||
...@@ -1002,7 +1008,6 @@ NdbDictInterface::dictSignal(NdbApiSignal* signal, ...@@ -1002,7 +1008,6 @@ NdbDictInterface::dictSignal(NdbApiSignal* signal,
} }
if(aNodeId == 0){ if(aNodeId == 0){
m_error.code= 4009; m_error.code= 4009;
m_transporter->unlock_mutex();
DBUG_RETURN(-1); DBUG_RETURN(-1);
} }
{ {
...@@ -1023,21 +1028,15 @@ NdbDictInterface::dictSignal(NdbApiSignal* signal, ...@@ -1023,21 +1028,15 @@ NdbDictInterface::dictSignal(NdbApiSignal* signal,
r = m_transporter->sendSignal(signal, aNodeId); r = m_transporter->sendSignal(signal, aNodeId);
} }
if(r != 0){ if(r != 0){
m_transporter->unlock_mutex();
continue; continue;
} }
} }
m_error.code= 0; m_error.code= 0;
int ret_val= poll_guard.wait_n_unlock(theWait, aNodeId, wst);
m_waiter.m_node = aNodeId;
m_waiter.m_state = wst;
m_waiter.wait(theWait);
m_transporter->unlock_mutex();
// End of Protected area // End of Protected area
if(m_waiter.m_state == NO_WAIT && m_error.code == 0){ if(ret_val == 0 && m_error.code == 0){
// Normal return // Normal return
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -1045,7 +1044,7 @@ NdbDictInterface::dictSignal(NdbApiSignal* signal, ...@@ -1045,7 +1044,7 @@ NdbDictInterface::dictSignal(NdbApiSignal* signal,
/** /**
* Handle error codes * Handle error codes
*/ */
if(m_waiter.m_state == WAIT_NODE_FAILURE) if(ret_val == -2) //WAIT_NODE_FAILURE
continue; continue;
if(m_waiter.m_state == WST_WAIT_TIMEOUT) if(m_waiter.m_state == WST_WAIT_TIMEOUT)
...@@ -3166,26 +3165,28 @@ NdbDictInterface::listObjects(NdbApiSignal* signal) ...@@ -3166,26 +3165,28 @@ NdbDictInterface::listObjects(NdbApiSignal* signal)
for (Uint32 i = 0; i < RETRIES; i++) { for (Uint32 i = 0; i < RETRIES; i++) {
m_buffer.clear(); m_buffer.clear();
// begin protected // begin protected
m_transporter->lock_mutex(); /*
The PollGuard has an implicit call of unlock_and_signal through the
~PollGuard method. This method is called implicitly by the compiler
in all places where the object is out of context due to a return,
break, continue or simply end of statement block
*/
PollGuard poll_guard(m_transporter, &m_waiter, refToBlock(m_reference));
Uint16 aNodeId = m_transporter->get_an_alive_node(); Uint16 aNodeId = m_transporter->get_an_alive_node();
if (aNodeId == 0) { if (aNodeId == 0) {
m_error.code= 4009; m_error.code= 4009;
m_transporter->unlock_mutex();
return -1; return -1;
} }
if (m_transporter->sendSignal(signal, aNodeId) != 0) { if (m_transporter->sendSignal(signal, aNodeId) != 0) {
m_transporter->unlock_mutex();
continue; continue;
} }
m_error.code= 0; m_error.code= 0;
m_waiter.m_node = aNodeId; int ret_val= poll_guard.wait_n_unlock(WAITFOR_RESPONSE_TIMEOUT,
m_waiter.m_state = WAIT_LIST_TABLES_CONF; aNodeId, WAIT_LIST_TABLES_CONF);
m_waiter.wait(WAITFOR_RESPONSE_TIMEOUT);
m_transporter->unlock_mutex();
// end protected // end protected
if (m_waiter.m_state == NO_WAIT && m_error.code == 0) if (ret_val == 0 && m_error.code == 0)
return 0; return 0;
if (m_waiter.m_state == WAIT_NODE_FAILURE) if (ret_val == -2) //WAIT_NODE_FAILURE
continue; continue;
return -1; return -1;
} }
......
...@@ -460,13 +460,20 @@ int NdbScanOperation::nextResultImpl(bool fetchAllowed, bool forceSend) ...@@ -460,13 +460,20 @@ int NdbScanOperation::nextResultImpl(bool fetchAllowed, bool forceSend)
Uint32 nodeId = theNdbCon->theDBnode; Uint32 nodeId = theNdbCon->theDBnode;
TransporterFacade* tp = TransporterFacade::instance(); TransporterFacade* tp = TransporterFacade::instance();
Guard guard(tp->theMutexPtr); /*
The PollGuard has an implicit call of unlock_and_signal through the
~PollGuard method. This method is called implicitly by the compiler
in all places where the object is out of context due to a return,
break, continue or simply end of statement block
*/
PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
theNdb->theNdbBlockNumber);
if(theError.code) if(theError.code)
return -1; return -1;
Uint32 seq = theNdbCon->theNodeSequence; Uint32 seq = theNdbCon->theNodeSequence;
if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false, if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false) == 0)
forceSend) == 0){ {
idx = m_current_api_receiver; idx = m_current_api_receiver;
last = m_api_receivers_count; last = m_api_receivers_count;
...@@ -495,10 +502,9 @@ int NdbScanOperation::nextResultImpl(bool fetchAllowed, bool forceSend) ...@@ -495,10 +502,9 @@ int NdbScanOperation::nextResultImpl(bool fetchAllowed, bool forceSend)
/** /**
* No completed... * No completed...
*/ */
theNdb->theImpl->theWaiter.m_node = nodeId; int ret_code= poll_guard.wait_scan(WAITFOR_SCAN_TIMEOUT, nodeId,
theNdb->theImpl->theWaiter.m_state = WAIT_SCAN; forceSend);
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); if (ret_code == 0 && seq == tp->getNodeSequence(nodeId)) {
if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) {
continue; continue;
} else { } else {
idx = last; idx = last;
...@@ -557,8 +563,8 @@ int NdbScanOperation::nextResultImpl(bool fetchAllowed, bool forceSend) ...@@ -557,8 +563,8 @@ int NdbScanOperation::nextResultImpl(bool fetchAllowed, bool forceSend)
} }
int int
NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag, NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag)
bool forceSend){ {
if(cnt > 0){ if(cnt > 0){
NdbApiSignal tSignal(theNdb->theMyRef); NdbApiSignal tSignal(theNdb->theMyRef);
tSignal.setSignal(GSN_SCAN_NEXTREQ); tSignal.setSignal(GSN_SCAN_NEXTREQ);
...@@ -605,9 +611,6 @@ NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag, ...@@ -605,9 +611,6 @@ NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag,
ret = tp->sendSignal(&tSignal, nodeId); ret = tp->sendSignal(&tSignal, nodeId);
} }
} }
if (!ret) checkForceSend(forceSend);
m_sent_receivers_count = last + sent; m_sent_receivers_count = last + sent;
m_api_receivers_count -= cnt; m_api_receivers_count -= cnt;
m_current_api_receiver = 0; m_current_api_receiver = 0;
...@@ -617,15 +620,6 @@ NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag, ...@@ -617,15 +620,6 @@ NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag,
return 0; return 0;
} }
void NdbScanOperation::checkForceSend(bool forceSend)
{
if (forceSend) {
TransporterFacade::instance()->forceSend(theNdb->theNdbBlockNumber);
} else {
TransporterFacade::instance()->checkForceSend(theNdb->theNdbBlockNumber);
}//if
}
int int
NdbScanOperation::prepareSend(Uint32 TC_ConnectPtr, Uint64 TransactionId) NdbScanOperation::prepareSend(Uint32 TC_ConnectPtr, Uint64 TransactionId)
{ {
...@@ -661,9 +655,15 @@ void NdbScanOperation::close(bool forceSend, bool releaseOp) ...@@ -661,9 +655,15 @@ void NdbScanOperation::close(bool forceSend, bool releaseOp)
m_sent_receivers_count); m_sent_receivers_count);
TransporterFacade* tp = TransporterFacade::instance(); TransporterFacade* tp = TransporterFacade::instance();
Guard guard(tp->theMutexPtr); /*
close_impl(tp, forceSend); The PollGuard has an implicit call of unlock_and_signal through the
~PollGuard method. This method is called implicitly by the compiler
in all places where the object is out of context due to a return,
break, continue or simply end of statement block
*/
PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
theNdb->theNdbBlockNumber);
close_impl(tp, forceSend, &poll_guard);
} }
NdbConnection* tCon = theNdbCon; NdbConnection* tCon = theNdbCon;
...@@ -1338,20 +1338,26 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed, ...@@ -1338,20 +1338,26 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed,
if(fetchAllowed){ if(fetchAllowed){
if(DEBUG_NEXT_RESULT) ndbout_c("performing fetch..."); if(DEBUG_NEXT_RESULT) ndbout_c("performing fetch...");
TransporterFacade* tp = TransporterFacade::instance(); TransporterFacade* tp = TransporterFacade::instance();
Guard guard(tp->theMutexPtr); /*
The PollGuard has an implicit call of unlock_and_signal through the
~PollGuard method. This method is called implicitly by the compiler
in all places where the object is out of context due to a return,
break, continue or simply end of statement block
*/
PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
theNdb->theNdbBlockNumber);
if(theError.code) if(theError.code)
return -1; return -1;
Uint32 seq = theNdbCon->theNodeSequence; Uint32 seq = theNdbCon->theNodeSequence;
Uint32 nodeId = theNdbCon->theDBnode; Uint32 nodeId = theNdbCon->theDBnode;
if(seq == tp->getNodeSequence(nodeId) && if(seq == tp->getNodeSequence(nodeId) &&
!send_next_scan_ordered(s_idx, forceSend)){ !send_next_scan_ordered(s_idx)){
Uint32 tmp = m_sent_receivers_count; Uint32 tmp = m_sent_receivers_count;
s_idx = m_current_api_receiver; s_idx = m_current_api_receiver;
while(m_sent_receivers_count > 0 && !theError.code){ while(m_sent_receivers_count > 0 && !theError.code){
theNdb->theImpl->theWaiter.m_node = nodeId; int ret_code= poll_guard.wait_scan(WAITFOR_SCAN_TIMEOUT, nodeId,
theNdb->theImpl->theWaiter.m_state = WAIT_SCAN; forceSend);
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); if (ret_code == 0 && seq == tp->getNodeSequence(nodeId)) {
if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) {
continue; continue;
} }
if(DEBUG_NEXT_RESULT) ndbout_c("return -1"); if(DEBUG_NEXT_RESULT) ndbout_c("return -1");
...@@ -1438,7 +1444,8 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed, ...@@ -1438,7 +1444,8 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed,
} }
int int
NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx, bool forceSend){ NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx)
{
if(idx == theParallelism) if(idx == theParallelism)
return 0; return 0;
...@@ -1476,12 +1483,13 @@ NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx, bool forceSend){ ...@@ -1476,12 +1483,13 @@ NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx, bool forceSend){
TransporterFacade * tp = TransporterFacade::instance(); TransporterFacade * tp = TransporterFacade::instance();
tSignal.setLength(4+1); tSignal.setLength(4+1);
int ret= tp->sendSignal(&tSignal, nodeId); int ret= tp->sendSignal(&tSignal, nodeId);
if (!ret) checkForceSend(forceSend);
return ret; return ret;
} }
int int
NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){ NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend,
PollGuard *poll_guard)
{
Uint32 seq = theNdbCon->theNodeSequence; Uint32 seq = theNdbCon->theNodeSequence;
Uint32 nodeId = theNdbCon->theDBnode; Uint32 nodeId = theNdbCon->theDBnode;
...@@ -1496,9 +1504,8 @@ NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){ ...@@ -1496,9 +1504,8 @@ NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){
*/ */
while(theError.code == 0 && m_sent_receivers_count) while(theError.code == 0 && m_sent_receivers_count)
{ {
theNdb->theImpl->theWaiter.m_node = nodeId; int return_code= poll_guard->wait_scan(WAITFOR_SCAN_TIMEOUT, nodeId,
theNdb->theImpl->theWaiter.m_state = WAIT_SCAN; false);
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
switch(return_code){ switch(return_code){
case 0: case 0:
break; break;
...@@ -1555,7 +1562,7 @@ NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){ ...@@ -1555,7 +1562,7 @@ NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){
} }
// Send close scan // Send close scan
if(send_next_scan(api+conf, true, forceSend) == -1) if(send_next_scan(api+conf, true) == -1)
{ {
theNdbCon->theReleaseOnClose = true; theNdbCon->theReleaseOnClose = true;
return -1; return -1;
...@@ -1566,9 +1573,8 @@ NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){ ...@@ -1566,9 +1573,8 @@ NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){
*/ */
while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count) while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count)
{ {
theNdb->theImpl->theWaiter.m_node = nodeId; int return_code= poll_guard->wait_scan(WAITFOR_SCAN_TIMEOUT, nodeId,
theNdb->theImpl->theWaiter.m_state = WAIT_SCAN; forceSend);
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
switch(return_code){ switch(return_code){
case 0: case 0:
break; break;
...@@ -1608,12 +1614,19 @@ NdbScanOperation::restart(bool forceSend) ...@@ -1608,12 +1614,19 @@ NdbScanOperation::restart(bool forceSend)
{ {
TransporterFacade* tp = TransporterFacade::instance(); TransporterFacade* tp = TransporterFacade::instance();
Guard guard(tp->theMutexPtr); /*
The PollGuard has an implicit call of unlock_and_signal through the
~PollGuard method. This method is called implicitly by the compiler
in all places where the object is out of context due to a return,
break, continue or simply end of statement block
*/
PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
theNdb->theNdbBlockNumber);
Uint32 nodeId = theNdbCon->theDBnode; Uint32 nodeId = theNdbCon->theDBnode;
{ {
int res; int res;
if((res= close_impl(tp, forceSend))) if((res= close_impl(tp, forceSend, &poll_guard)))
{ {
return res; return res;
} }
...@@ -1627,7 +1640,6 @@ NdbScanOperation::restart(bool forceSend) ...@@ -1627,7 +1640,6 @@ NdbScanOperation::restart(bool forceSend)
theError.code = 0; theError.code = 0;
if (doSendScan(nodeId) == -1) if (doSendScan(nodeId) == -1)
return -1; return -1;
return 0; return 0;
} }
...@@ -1637,8 +1649,15 @@ NdbIndexScanOperation::reset_bounds(bool forceSend){ ...@@ -1637,8 +1649,15 @@ NdbIndexScanOperation::reset_bounds(bool forceSend){
{ {
TransporterFacade* tp = TransporterFacade::instance(); TransporterFacade* tp = TransporterFacade::instance();
Guard guard(tp->theMutexPtr); /*
res= close_impl(tp, forceSend); The PollGuard has an implicit call of unlock_and_signal through the
~PollGuard method. This method is called implicitly by the compiler
in all places where the object is out of context due to a return,
break, continue or simply end of statement block
*/
PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
theNdb->theNdbBlockNumber);
res= close_impl(tp, forceSend, &poll_guard);
} }
if(!res) if(!res)
......
...@@ -54,10 +54,19 @@ public: ...@@ -54,10 +54,19 @@ public:
void wait(int waitTime); void wait(int waitTime);
void nodeFail(Uint32 node); void nodeFail(Uint32 node);
void signal(Uint32 state); void signal(Uint32 state);
void cond_signal();
void set_poll_owner(bool poll_owner) { m_poll_owner= poll_owner; }
Uint32 get_state() { return m_state; }
void set_state(Uint32 state) { m_state= state; }
void set_node(Uint32 node) { m_node= node; }
Uint32 get_cond_wait_index() { return m_cond_wait_index; }
void set_cond_wait_index(Uint32 index) { m_cond_wait_index= index; }
Uint32 m_node; Uint32 m_node;
Uint32 m_state; Uint32 m_state;
void * m_mutex; void * m_mutex;
bool m_poll_owner;
Uint32 m_cond_wait_index;
struct NdbCondition * m_condition; struct NdbCondition * m_condition;
}; };
...@@ -65,22 +74,8 @@ inline ...@@ -65,22 +74,8 @@ inline
void void
NdbWaiter::wait(int waitTime) NdbWaiter::wait(int waitTime)
{ {
const bool forever = (waitTime == -1); assert(!m_poll_owner);
const NDB_TICKS maxTime = NdbTick_CurrentMillisecond() + waitTime;
while (1) {
if (m_state == NO_WAIT || m_state == WAIT_NODE_FAILURE)
break;
if (forever) {
NdbCondition_Wait(m_condition, (NdbMutex*)m_mutex);
} else {
if (waitTime <= 0) {
m_state = WST_WAIT_TIMEOUT;
break;
}
NdbCondition_WaitTimeout(m_condition, (NdbMutex*)m_mutex, waitTime); NdbCondition_WaitTimeout(m_condition, (NdbMutex*)m_mutex, waitTime);
waitTime = maxTime - NdbTick_CurrentMillisecond();
}
}
} }
inline inline
...@@ -88,6 +83,7 @@ void ...@@ -88,6 +83,7 @@ void
NdbWaiter::nodeFail(Uint32 aNodeId){ NdbWaiter::nodeFail(Uint32 aNodeId){
if (m_state != NO_WAIT && m_node == aNodeId){ if (m_state != NO_WAIT && m_node == aNodeId){
m_state = WAIT_NODE_FAILURE; m_state = WAIT_NODE_FAILURE;
if (!m_poll_owner)
NdbCondition_Signal(m_condition); NdbCondition_Signal(m_condition);
} }
} }
...@@ -96,7 +92,14 @@ inline ...@@ -96,7 +92,14 @@ inline
void void
NdbWaiter::signal(Uint32 state){ NdbWaiter::signal(Uint32 state){
m_state = state; m_state = state;
if (!m_poll_owner)
NdbCondition_Signal(m_condition); NdbCondition_Signal(m_condition);
} }
inline
void
NdbWaiter::cond_signal()
{
NdbCondition_Signal(m_condition);
}
#endif #endif
This diff is collapsed.
...@@ -86,6 +86,9 @@ void Ndb::setup(Ndb_cluster_connection *ndb_cluster_connection, ...@@ -86,6 +86,9 @@ void Ndb::setup(Ndb_cluster_connection *ndb_cluster_connection,
theFirstTransId= 0; theFirstTransId= 0;
theMyRef= 0; theMyRef= 0;
cond_wait_index = TransporterFacade::MAX_NO_THREADS;
cond_signal_ndb = NULL;
fullyQualifiedNames = true; fullyQualifiedNames = true;
#ifdef POORMANSPURIFY #ifdef POORMANSPURIFY
...@@ -217,6 +220,8 @@ NdbWaiter::NdbWaiter(){ ...@@ -217,6 +220,8 @@ NdbWaiter::NdbWaiter(){
m_node = 0; m_node = 0;
m_state = NO_WAIT; m_state = NO_WAIT;
m_mutex = 0; m_mutex = 0;
m_poll_owner= false;
m_cond_wait_index= TransporterFacade::MAX_NO_THREADS;
m_condition = NdbCondition_Create(); m_condition = NdbCondition_Create();
} }
......
...@@ -34,6 +34,7 @@ class ConfigRetriever; ...@@ -34,6 +34,7 @@ class ConfigRetriever;
class Ndb; class Ndb;
class NdbApiSignal; class NdbApiSignal;
class NdbWaiter;
typedef void (* ExecuteFunction)(void *, NdbApiSignal *, LinearSectionPtr ptr[3]); typedef void (* ExecuteFunction)(void *, NdbApiSignal *, LinearSectionPtr ptr[3]);
typedef void (* NodeStatusFunction)(void *, Uint32, bool nodeAlive, bool nfComplete); typedef void (* NodeStatusFunction)(void *, Uint32, bool nodeAlive, bool nfComplete);
...@@ -47,6 +48,11 @@ extern "C" { ...@@ -47,6 +48,11 @@ extern "C" {
class TransporterFacade class TransporterFacade
{ {
public: public:
/**
* Max number of Ndb objects.
* (Ndb objects should not be shared by different threads.)
*/
STATIC_CONST( MAX_NO_THREADS = 4711 );
TransporterFacade(); TransporterFacade();
virtual ~TransporterFacade(); virtual ~TransporterFacade();
bool init(Uint32, const ndb_mgm_configuration *); bool init(Uint32, const ndb_mgm_configuration *);
...@@ -114,10 +120,44 @@ public: ...@@ -114,10 +120,44 @@ public:
TransporterRegistry* get_registry() { return theTransporterRegistry;}; TransporterRegistry* get_registry() { return theTransporterRegistry;};
/*
When a thread has sent its signals and is ready to wait for reception
of these it does normally always wait on a conditional mutex and
the actual reception is handled by the receiver thread in the NDB API.
With the below new methods and variables each thread has the possibility
of becoming owner of the "right" to poll for signals. Effectually this
means that the thread acts temporarily as a receiver thread.
For the thread that succeeds in grabbing this "ownership" it will avoid
a number of expensive calls to conditional mutex and even more expensive
context switches to wake up.
When an owner of the poll "right" has completed its own task it is likely
that there are others still waiting. In this case we pick one of the
threads as new owner of the poll "right". Since we want to switch owner
as seldom as possible we always pick the last thread which is likely to
be the last to complete its reception.
*/
void external_poll(Uint32 wait_time);
NdbWaiter* get_poll_owner(void) const { return poll_owner; }
void set_poll_owner(NdbWaiter* new_owner) { poll_owner= new_owner; }
Uint32 put_in_cond_wait_queue(NdbWaiter *aWaiter);
void remove_from_cond_wait_queue(NdbWaiter *aWaiter);
NdbWaiter* rem_last_from_cond_wait_queue();
// heart beat received from a node (e.g. a signal came) // heart beat received from a node (e.g. a signal came)
void hb_received(NodeId n); void hb_received(NodeId n);
private: private:
void init_cond_wait_queue();
struct CondWaitQueueElement {
NdbWaiter *cond_wait_object;
Uint32 next_cond_wait;
Uint32 prev_cond_wait;
};
NdbWaiter *poll_owner;
CondWaitQueueElement cond_wait_array[MAX_NO_THREADS];
Uint32 first_in_cond_wait;
Uint32 first_free_cond_wait;
Uint32 last_in_cond_wait;
/* End poll owner stuff */
/** /**
* Send a signal unconditional of node status (used by ClusterMgr) * Send a signal unconditional of node status (used by ClusterMgr)
*/ */
...@@ -172,12 +212,6 @@ private: ...@@ -172,12 +212,6 @@ private:
/** /**
* Block number handling * Block number handling
*/ */
public:
/**
* Max number of Ndb objects.
* (Ndb objects should not be shared by different threads.)
*/
STATIC_CONST( MAX_NO_THREADS = 4711 );
private: private:
struct ThreadData { struct ThreadData {
...@@ -245,6 +279,24 @@ public: ...@@ -245,6 +279,24 @@ public:
GlobalDictCache m_globalDictCache; GlobalDictCache m_globalDictCache;
}; };
class PollGuard
{
public:
PollGuard(TransporterFacade *tp, NdbWaiter *aWaiter, Uint32 block_no);
~PollGuard() { unlock_and_signal(); }
int wait_n_unlock(int wait_time, NodeId nodeId, Uint32 state,
bool forceSend= false);
int wait_for_input_in_loop(int wait_time, bool forceSend);
void wait_for_input(int wait_time);
int wait_scan(int wait_time, NodeId nodeId, bool forceSend);
void unlock_and_signal();
private:
TransporterFacade *m_tp;
NdbWaiter *m_waiter;
Uint32 m_block_no;
bool m_locked;
};
inline inline
TransporterFacade* TransporterFacade*
TransporterFacade::instance() TransporterFacade::instance()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment