/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#include "SignalSender.hpp"
#include <NdbSleep.h>
#include <SignalLoggerManager.hpp>
#include <signaldata/NFCompleteRep.hpp>
#include <signaldata/NodeFailRep.hpp>

/**
 * Construct an empty signal.
 *
 * The whole object is zero-filled first, then the ownership flag is stored.
 *
 * @param dealloc  when true, the destructor frees the section buffers
 *                 referenced by ptr[0..2].
 */
SimpleSignal::SimpleSignal(bool dealloc){
  memset(this, 0, sizeof(* this));
  deallocSections = dealloc;
}

/**
 * Release the section buffers, but only if this signal owns them
 * (deallocSections was set at construction).
 */
SimpleSignal::~SimpleSignal(){
  if(!deallocSections)
    return;

  // delete[] on a null pointer is a no-op, so no explicit null test is needed.
  for(Uint32 i = 0; i < 3; i++)
    delete [] ptr[i].p;
}

/**
 * Fill in the signal header.
 *
 * @param ss        sender; its own reference is reduced with refToBlock()
 *                  and stored as the sender's block reference
 * @param trace     stored in header.theTrace
 * @param recBlock  stored in header.theReceiversBlockNumber
 * @param gsn       stored in header.theVerId_signalNumber
 * @param len       stored in header.theLength
 */
void
SimpleSignal::set(class SignalSender& ss,
                  Uint8 trace, Uint16 recBlock, Uint16 gsn, Uint32 len){
  header.theTrace = trace;
  header.theReceiversBlockNumber = recBlock;
  header.theVerId_signalNumber = gsn;
  header.theLength = len;
  header.theSendersBlockRef = refToBlock(ss.getOwnRef());
}

/**
 * Dump the signal to a stream: header, signal data, then every section as
 * rows of up to seven hexadecimal words.
 *
 * @param out  destination stream
 */
void
SimpleSignal::print(FILE * out){
  fprintf(out, "---- Signal ----------------\n");
  SignalLoggerManager::printSignalHeader(out, header, 0, 0, false);
  SignalLoggerManager::printSignalData(out, header, theData);
  for(Uint32 i = 0; i < header.m_noOfSections; i++){
    Uint32 len = ptr[i].sz;
    // Both arguments are unsigned 32-bit values, hence %u rather than %d.
    fprintf(out, " --- Section %u size=%u ---\n", i, len);
    Uint32 * signalData = ptr[i].p;

    // Full rows of seven words.
    while(len >= 7){
      fprintf(out,
              " H\'%.8x H\'%.8x H\'%.8x H\'%.8x H\'%.8x H\'%.8x H\'%.8x\n",
              signalData[0], signalData[1], signalData[2], signalData[3],
              signalData[4], signalData[5], signalData[6]);
      len -= 7;
      signalData += 7;
    }

    // Remaining (fewer than seven) words on a final row.
    if(len > 0){
      fprintf(out, " H\'%.8x", signalData[0]);
      for(Uint32 j = 1; j < len; j++)
        fprintf(out, " H\'%.8x", signalData[j]);
      fprintf(out, "\n");
    }
  }
}

/**
 * Register this sender with the transporter facade.
 *
 * Opens a block on the facade with execSignal / execNodeStatus as callbacks
 * and remembers the block number returned by open().
 *
 * @param facade  transporter facade used for all communication
 */
SignalSender::SignalSender(TransporterFacade *facade)
  : m_lock(0)
{
  m_cond = NdbCondition_Create();
  theFacade = facade;
  m_blockNo = theFacade->open(this, execSignal, execNodeStatus);
  assert(m_blockNo > 0);
}

/**
 * Release the lock if it is still held, close the block on the facade and
 * delete every signal still tracked in the job and used buffers.
 */
SignalSender::~SignalSender(){
  int i;
  if (m_lock)
    unlock();
  theFacade->close(m_blockNo,0);
  // free these _after_ closing theFacade to ensure that
  // we delete all signals
  for (i= m_jobBuffer.size()-1; i>= 0; i--)
    delete m_jobBuffer[i];
  for (i= m_usedBuffer.size()-1; i>= 0; i--)
    delete m_usedBuffer[i];
  NdbCondition_Destroy(m_cond);
}

/**
 * Lock the facade mutex.
 *
 * @return 0 on success, -1 if NdbMutex_Lock() reports failure
 */
int SignalSender::lock()
{
  if (NdbMutex_Lock(theFacade->theMutexPtr))
    return -1;
  m_lock= 1;
  return 0;
}

/**
 * Unlock the facade mutex.
 *
 * @return 0 on success, -1 if NdbMutex_Unlock() reports failure
 */
int SignalSender::unlock()
{
  if (NdbMutex_Unlock(theFacade->theMutexPtr))
    return -1;
  m_lock= 0;
  return 0;
}

/** @return reference built from this sender's block number and the facade's own node id. */
Uint32
SignalSender::getOwnRef() const {
  return numberToRef(m_blockNo, theFacade->ownId());
}

/** @return whatever the facade's get_an_alive_node() returns. */
Uint32
SignalSender::getAliveNode() const{
  return theFacade->get_an_alive_node();
}

/** @return the cluster manager's node information for nodeId. */
const ClusterMgr::Node &
SignalSender::getNodeInfo(Uint16 nodeId) const {
  return theFacade->theClusterMgr->getNodeInfo(nodeId);
}

/** @return the cluster manager's count of connected nodes. */
Uint32
SignalSender::getNoOfConnectedNodes() const {
  return theFacade->theClusterMgr->getNoOfConnectedNodes();
}

/**
 * Hand a signal to the transporter registry for delivery to nodeId.
 *
 * @param nodeId  destination node
 * @param s       signal to send (header, data and sections are passed on)
 * @return status returned by TransporterRegistry::prepareSend()
 */
SendStatus
SignalSender::sendSignal(Uint16 nodeId, const SimpleSignal * s){
  return theFacade->theTransporterRegistry->prepareSend(&s->header,
                                                        1, // JBB
                                                        &s->theData[0],
                                                        nodeId,
                                                        &s->ptr[0]);
}

/**
 * Wait until the matcher t accepts a signal from the job buffer.
 *
 * The matcher's check() is expected to remove the signal it returns from the
 * job buffer. Every signal handed out is recorded in m_usedBuffer so that the
 * destructor can delete it; this now also covers a signal that is already
 * queued on entry, which was previously returned untracked and never freed.
 *
 * Waiting uses NdbCondition_WaitTimeout() on the facade mutex, so the caller
 * presumably must hold that mutex (see lock()) -- confirm against callers.
 *
 * @param timeOutMillis  maximum time to wait; 0 means wait indefinitely,
 *                       re-checking every 10 ms
 * @param t              matcher providing check(Vector<SimpleSignal*>&)
 * @return the matching signal, or 0 on timeout
 */
template<class T>
SimpleSignal *
SignalSender::waitFor(Uint32 timeOutMillis, T & t)
{
  SimpleSignal * s = t.check(m_jobBuffer);
  if(s != 0){
    // Track the signal exactly as the wait loop below does.
    m_usedBuffer.push_back(s);
    return s;
  }

  NDB_TICKS now = NdbTick_CurrentMillisecond();
  NDB_TICKS stop = now + timeOutMillis;
  Uint32 wait = (timeOutMillis == 0 ? 10 : timeOutMillis);
  do {
    NdbCondition_WaitTimeout(m_cond,
                             theFacade->theMutexPtr,
                             wait);

    s = t.check(m_jobBuffer);
    if(s != 0){
      m_usedBuffer.push_back(s);
      return s;
    }

    now = NdbTick_CurrentMillisecond();
    // Remaining time; only used again if the loop condition below holds.
    wait = (timeOutMillis == 0 ? 10 : stop - now);
  } while(stop > now || timeOutMillis == 0);

  return 0;
}

/** Matcher that accepts the oldest queued signal, whatever it is. */
class WaitForAny {
public:
  /** Remove and return the first signal in the buffer, or 0 if it is empty. */
  SimpleSignal * check(Vector<SimpleSignal*> & m_jobBuffer){
    if(m_jobBuffer.size() > 0){
      SimpleSignal * s = m_jobBuffer[0];
      m_jobBuffer.erase(0);
      return s;
    }
    return 0;
  }
};

/**
 * Wait for any signal.
 *
 * @param timeOutMillis  see the templated waitFor()
 * @return the first available signal, or 0 on timeout
 */
SimpleSignal *
SignalSender::waitFor(Uint32 timeOutMillis){
  WaitForAny w;
  return waitFor(timeOutMillis, w);
}

/** Matcher that accepts the first queued signal sent from a given node. */
class WaitForNode {
public:
  Uint32 m_nodeId;

  /**
   * Remove and return the first signal whose sender reference maps to
   * m_nodeId, or 0 if no such signal is queued.
   */
  SimpleSignal * check(Vector<SimpleSignal*> & m_jobBuffer){
    Uint32 len = m_jobBuffer.size();
    for(Uint32 i = 0; i < len; i++){
      if(refToNode(m_jobBuffer[i]->header.theSendersBlockRef) == m_nodeId){
        SimpleSignal * s = m_jobBuffer[i];
        m_jobBuffer.erase(i);
        return s;
      }
    }
    return 0;
  }
};

/**
 * Wait for a signal sent from a specific node.
 *
 * @param nodeId         node whose signal is wanted
 * @param timeOutMillis  see the templated waitFor()
 * @return the matching signal, or 0 on timeout
 */
SimpleSignal *
SignalSender::waitFor(Uint16 nodeId, Uint32 timeOutMillis){
  WaitForNode w;
  w.m_nodeId = nodeId;
  return waitFor(timeOutMillis, w);
}

#include <NdbApiSignal.hpp>

/**
 * Facade callback for an incoming signal.
 *
 * Copies the header, the signal data and every section into a freshly
 * allocated SimpleSignal that owns its sections, queues it on the job buffer
 * and signals the condition that waitFor() waits on.
 *
 * @param signalSender  the SignalSender registered with the facade
 * @param signal        incoming signal
 * @param ptr           incoming sections
 */
void
SignalSender::execSignal(void* signalSender,
                         NdbApiSignal* signal,
                         class LinearSectionPtr ptr[3]){
  SimpleSignal * s = new SimpleSignal(true);
  s->header = * signal;
  memcpy(&s->theData[0], signal->getDataPtr(),
         sizeof(Uint32) * s->header.theLength);
  for(Uint32 i = 0; i < s->header.m_noOfSections; i++){
    s->ptr[i].p = new Uint32[ptr[i].sz];
    s->ptr[i].sz = ptr[i].sz;
    memcpy(s->ptr[i].p, ptr[i].p, sizeof(Uint32) * ptr[i].sz);
  }
  SignalSender * ss = static_cast<SignalSender*>(signalSender);
  ss->m_jobBuffer.push_back(s);
  NdbCondition_Signal(ss->m_cond);
}

/**
 * Facade callback for a node status change.
 *
 * Nothing is done when the node is alive. Otherwise a signal is synthesised
 * (NF_COMPLETEREP when nfCompleted is set, NODE_FAILREP if not), queued on
 * the job buffer, and the condition that waitFor() waits on is signalled.
 *
 * @param signalSender  the SignalSender registered with the facade
 * @param nodeId        node whose status changed
 * @param alive         true if the node is connected
 * @param nfCompleted   true if the node shutdown is complete
 */
void
SignalSender::execNodeStatus(void* signalSender,
                             Uint32 nodeId,
                             bool alive,
                             bool nfCompleted){
  if (alive) {
    // node connected
    return;
  }

  SimpleSignal * s = new SimpleSignal(true);
  SignalSender * ss = static_cast<SignalSender*>(signalSender);

  // node disconnected
  if(nfCompleted)
  {
    // node shutdown complete
    s->header.theVerId_signalNumber = GSN_NF_COMPLETEREP;
    NFCompleteRep *rep = (NFCompleteRep *)s->getDataPtrSend();
    rep->blockNo = 0;
    rep->nodeId = 0;
    rep->failedNodeId = nodeId;
    rep->unused = 0;
    rep->from = 0;
  }
  else
  {
    // node failure
    s->header.theVerId_signalNumber = GSN_NODE_FAILREP;
    NodeFailRep *rep = (NodeFailRep *)s->getDataPtrSend();
    rep->failNo = 0;
    rep->masterNodeId = 0;
    rep->noOfNodes = 1;
    NodeBitmask::clear(rep->theNodes);
    NodeBitmask::set(rep->theNodes,nodeId);
  }

  ss->m_jobBuffer.push_back(s);
  NdbCondition_Signal(ss->m_cond);
}

// Explicit instantiations of the templated waitFor() for the two matchers
// defined in this file (skipped for the compiler version tested below).
#if __SUNPRO_CC != 0x560
template SimpleSignal* SignalSender::waitFor<WaitForNode>(unsigned, WaitForNode&);
template SimpleSignal* SignalSender::waitFor<WaitForAny>(unsigned, WaitForAny&);
#endif

template class Vector<SimpleSignal*>;