/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#include <NdbOut.hpp>
#include "OSE_Receiver.hpp"
#include "OSE_Transporter.hpp"
#include "TransporterCallback.hpp"
#include <TransporterRegistry.hpp>
#include "TransporterInternalDefinitions.hpp"
#include <NdbStdio.h>

/**
 * Set up the receive buffer (ring of _recBufSize signal pointers), the
 * wait stack for out-of-order signals (same capacity) and the per-node
 * table of next expected sequence ids (all starting at 0).
 *
 * @param tr            registry used to look up transporters by node id
 * @param _recBufSize   capacity of both the receive buffer and the wait stack
 * @param _localNodeId  id of this node, used to build the name "ndb_node<id>"
 */
OSE_Receiver::OSE_Receiver(TransporterRegistry * tr,
                           int _recBufSize,
                           NodeId _localNodeId) {
  theTransporterRegistry = tr;

  recBufSize      = _recBufSize;
  recBufReadIndex = 0;
  recBufWriteIndex = 0;
  receiveBuffer   = new union SIGNAL * [recBufSize];

  waitStackCount = 0;
  waitStackSize  = _recBufSize;
  waitStack      = new union SIGNAL * [waitStackSize];

  nextSigId = new Uint32[MAX_NTRANSPORTERS];
  for (int i = 0; i < MAX_NTRANSPORTERS; i++)
    nextSigId[i] = 0;

  phantomCreated = false;
  localNodeId    = _localNodeId;
  snprintf(localHostName, sizeof(localHostName),
           "ndb_node%d", localNodeId);

  DEBUG("localNodeId = " << localNodeId << " -> localHostName = "
        << localHostName);
}

/**
 * Release every signal still owned by the receiver (both the ones queued
 * in the receive buffer and the ones parked on the wait stack), free the
 * arrays allocated by the constructor and kill the phantom process.
 */
OSE_Receiver::~OSE_Receiver(){
  // Signals still queued for delivery
  while(recBufReadIndex != recBufWriteIndex){
    free_buf(&receiveBuffer[recBufReadIndex]);
    recBufReadIndex = (recBufReadIndex + 1) % recBufSize;
  }
  delete [] receiveBuffer;

  // Out-of-order signals that never got their predecessor
  for(int i = 0; i < waitStackCount; i++)
    free_buf(&waitStack[i]);
  delete [] waitStack;

  delete [] nextSigId;

  destroyPhantom();
}

/**
 * Create (once) the phantom process named localHostName and return its pid.
 *
 * The redirection entry is refreshed on every call with the pid of the
 * calling process; it is only handed to create_process() the first time.
 * NOTE(review): redir.sig = 1 presumably is the entry count of the OSE
 * redirection table and redir.pid the default target - confirm against
 * the OSE create_process() documentation.
 */
PROCESS
OSE_Receiver::createPhantom(){
  redir.sig = 1;
  redir.pid = current_process();

  if(!phantomCreated){
    phantomPid = create_process
      (OS_PHANTOM,     // Type
       localHostName,  // Name
       NULL,           // Entry point
       0,              // Stack size
       0,              // Prio - Not used
       (OSTIME)0,      // Timeslice - Not used
       0,              // Block - current block
       &redir,
       (OSVECTOR)0,    // vector
       (OSUSER)0);     // user
    phantomCreated = true;
    DEBUG("Created phantom pid: " << hex << phantomPid);
  }
  return phantomPid;
}

/**
 * Kill the phantom process if one has been created.
 */
void
OSE_Receiver::destroyPhantom(){
  if(phantomCreated){
    DEBUG("Destroying phantom pid: " << hex << phantomPid);
    kill_proc(phantomPid);
    phantomCreated = false;
  }
}

/**
 * Signal selections passed to receive_w_tmo().
 * The leading element is the number of signal numbers that follow.
 */
static SIGSELECT PRIO_A_SIGNALS[] = { 6,
                                      NDB_TRANSPORTER_PRIO_A,
                                      NDB_TRANSPORTER_HUNT,
                                      NDB_TRANSPORTER_CONNECT_REQ,
                                      NDB_TRANSPORTER_CONNECT_REF,
                                      NDB_TRANSPORTER_CONNECT_CONF,
                                      NDB_TRANSPORTER_DISCONNECT_ORD
};

static SIGSELECT PRIO_B_SIGNALS[] = { 1,
                                      NDB_TRANSPORTER_DATA
};

/**
 * Check waitstack for signals that are next in sequence
 * Put any found signal in receive buffer
 * Returns true if one signal is found
 *
 * Returns false (after reporting TE_RECEIVE_BUFFER_FULL) if the matching
 * signal cannot be moved because the receive buffer is full; the signal
 * then stays on the wait stack.
 */
bool
OSE_Receiver::checkWaitStack(NodeId _nodeId){
  for(int i = 0; i < waitStackCount; i++){
    if (waitStack[i]->dataSignal.senderNodeId == _nodeId &&
        waitStack[i]->dataSignal.sigId == nextSigId[_nodeId]){

      ndbout_c("INFO: signal popped from waitStack, sigId = %d",
               waitStack[i]->dataSignal.sigId);

      if(isFull()){
        ndbout_c("ERROR: receiveBuffer is full");
        reportError(callbackObj, _nodeId, TE_RECEIVE_BUFFER_FULL);
        return false;
      }

      // The next signal was found, put it in the receive buffer
      insertReceiveBuffer(waitStack[i]);

      // Increase sequence id, set it to the next expected id
      nextSigId[_nodeId]++;

      // Move signals below up one step
      for(int j = i; j < waitStackCount-1; j++)
        waitStack[j] = waitStack[j+1];

      // Shrink first, then clear the vacated last slot. Clearing
      // waitStack[waitStackCount] before the decrement would touch the
      // element one past the last valid entry.
      waitStackCount--;
      waitStack[waitStackCount] = NULL;

      // return true since signal was found
      return true;
    }
  }
  return false;
}

/**
 * Clear waitstack for signals from node with _nodeId
 *
 * Every matching signal is freed, the remaining signals keep their
 * relative order, and the expected sequence id of the node is reset to 0.
 */
void
OSE_Receiver::clearWaitStack(NodeId _nodeId){
  // Single-pass compaction: 'kept' is the slot the next surviving signal
  // goes to. This visits every entry exactly once, so consecutive signals
  // from the same node are all removed.
  int kept = 0;
  for(int i = 0; i < waitStackCount; i++){
    if (waitStack[i]->dataSignal.senderNodeId == _nodeId){
      // Free signal buffer
      free_buf(&waitStack[i]);
    } else {
      waitStack[kept] = waitStack[i];
      kept++;
    }
  }

  // Clear the slots that are no longer in use
  for(int i = kept; i < waitStackCount; i++)
    waitStack[i] = NULL;
  waitStackCount = kept;

  nextSigId[_nodeId] = 0;
}

/**
 * Park an out-of-order signal on the wait stack.
 *
 * If the stack is full TE_WAIT_STACK_FULL is reported and the signal is
 * freed, since nobody else holds a reference to it after this call.
 */
inline
void
OSE_Receiver::insertWaitStack(union SIGNAL* _sig){
  // Valid indexes are 0 .. waitStackSize-1
  if (waitStackCount < waitStackSize){
    waitStack[waitStackCount] = _sig;
    waitStackCount++;
  } else {
    ndbout_c("ERROR: waitStack is full");
    reportError(callbackObj, localNodeId, TE_WAIT_STACK_FULL);
    free_buf(&_sig);
  }
}

/**
 * Receive at most one signal and dispatch it.
 *
 * Signals selected by PRIO_A_SIGNALS are polled first (timeout 0); only if
 * none is pending does the call wait up to timeOutMillis for a signal
 * selected by PRIO_B_SIGNALS.
 *
 * Data signals from connected transporters are placed in the receive
 * buffer in sequence-id order; out-of-order data signals are parked on the
 * wait stack. Connection management signals are forwarded to the
 * transporter and, when the transporter reports success, all buffered
 * signals from that node are discarded.
 *
 * @param timeOutMillis  timeout for the second receive call
 * @return false if the receive buffer is full, no signal arrived, or a
 *         data signal had an unusable sequence id; true otherwise
 */
bool
OSE_Receiver::doReceive(Uint32 timeOutMillis) {
  if(isFull())
    return false;

  union SIGNAL * sig = receive_w_tmo(0, PRIO_A_SIGNALS);
  if(sig == NIL){
    sig = receive_w_tmo(timeOutMillis, PRIO_B_SIGNALS);
    if(sig == NIL)
      return false;
  }

  DEBUG("Received signal: " << sig->sigNo << " "
        << sigNo2String(sig->sigNo));

  switch(sig->sigNo){
  case NDB_TRANSPORTER_PRIO_A:
    {
      OSE_Transporter * t = getTransporter(sig->dataSignal.senderNodeId);
      if (t != 0 && t->isConnected()){
        insertReceiveBuffer(sig);
      } else {
        free_buf(&sig);
      }
    }
    break;

  case NDB_TRANSPORTER_DATA:
    {
      OSE_Transporter * t = getTransporter(sig->dataSignal.senderNodeId);
      if (t != 0 && t->isConnected()){
        int nodeId = sig->dataSignal.senderNodeId;
        Uint32 currSigId = sig->dataSignal.sigId;

        /**
         * Check if signal is the next in sequence
         * nextSigId is always set to the next sigId to wait for
         */
        if (nextSigId[nodeId] == currSigId){

          // Insert in receive buffer
          insertReceiveBuffer(sig);

          // Increase sequence id, set it to the next expected id
          nextSigId[nodeId]++;

          // Check if there are any signal in the wait stack
          if (waitStackCount > 0){
            while(checkWaitStack(nodeId));
          }
        } else {
          // Signal was not received in correct order
          // Check values and put it in the waitStack
          ndbout_c("WARNING: sigId out of order,"
                   " currSigId = %d, nextSigId = %d",
                   currSigId, nextSigId[nodeId]);

          if (currSigId < nextSigId[nodeId]){
            // Current received sigId was smaller than nextSigId
            // There is no use to put it in the waitStack
            ndbout_c("ERROR: recieved sigId was smaller than nextSigId");
            reportError(callbackObj, nodeId, TE_TOO_SMALL_SIGID);
            // The signal is dropped here, so release its buffer
            free_buf(&sig);
            return false;
          }

          if (currSigId > (nextSigId[nodeId] + waitStackSize)){
            // Current sigId was larger than nextSigId + size of waitStack
            // we can never "save" so many signal's on the stack
            ndbout_c("ERROR: currSigId > (nextSigId + size of waitStack)");
            reportError(callbackObj, nodeId, TE_TOO_LARGE_SIGID);
            // The signal is dropped here, so release its buffer
            free_buf(&sig);
            return false;
          }

          // Insert in wait stack
          insertWaitStack(sig);
        }
      } else {
        free_buf(&sig);
      }
    }
    break;

  case NDB_TRANSPORTER_HUNT:
    {
      NdbTransporterHunt * s = (NdbTransporterHunt*)sig;
      OSE_Transporter * t = getTransporter(s->remoteNodeId);
      if(t != 0)
        t->huntReceived(s);
      free_buf(&sig);
    }
    break;

  case NDB_TRANSPORTER_CONNECT_REQ:
    {
      NdbTransporterConnectReq * s = (NdbTransporterConnectReq*)sig;
      OSE_Transporter * t = getTransporter(s->senderNodeId);
      if(t != 0){
        if(t->connectReq(s)){
          clearWaitStack(s->senderNodeId);
          clearRecvBuffer(s->senderNodeId);
        }
      }
      free_buf(&sig);
    }
    break;

  case NDB_TRANSPORTER_CONNECT_REF:
    {
      NdbTransporterConnectRef * s = (NdbTransporterConnectRef*)sig;
      OSE_Transporter * t = getTransporter(s->senderNodeId);
      if(t != 0){
        if(t->connectRef(s)){
          clearWaitStack(s->senderNodeId);
          clearRecvBuffer(s->senderNodeId);
        }
      }
      free_buf(&sig);
    }
    break;

  case NDB_TRANSPORTER_CONNECT_CONF:
    {
      NdbTransporterConnectConf * s = (NdbTransporterConnectConf*)sig;
      OSE_Transporter * t = getTransporter(s->senderNodeId);
      if(t != 0){
        if(t->connectConf(s)){
          clearWaitStack(s->senderNodeId);
          clearRecvBuffer(s->senderNodeId);
        }
      }
      free_buf(&sig);
    }
    break;

  case NDB_TRANSPORTER_DISCONNECT_ORD:
    {
      NdbTransporterDisconnectOrd * s = (NdbTransporterDisconnectOrd*)sig;
      OSE_Transporter * t = getTransporter(s->senderNodeId);
      if(t != 0){
        if(t->disconnectOrd(s)){
          clearWaitStack(s->senderNodeId);
          clearRecvBuffer(s->senderNodeId);
        }
      }
      free_buf(&sig);
    }
    break;
  }
  return true;
}

/**
 * Look up the OSE transporter for a node.
 *
 * @return the transporter, or 0 if nodeId is out of range or the node is
 *         not served by an OSE transporter
 */
OSE_Transporter *
OSE_Receiver::getTransporter(NodeId nodeId){
  // nodeId comes straight from a received signal and is later used to
  // index nextSigId (MAX_NTRANSPORTERS entries), so reject anything larger.
  // NOTE(review): assumes the registry arrays hold at least
  // MAX_NTRANSPORTERS entries - confirm against TransporterRegistry.
  if(nodeId >= MAX_NTRANSPORTERS)
    return 0;

  if(theTransporterRegistry->theTransporterTypes[nodeId] !=
     tt_OSE_TRANSPORTER)
    return 0;

  return (OSE_Transporter *)
    theTransporterRegistry->theTransporters[nodeId];
}

/**
 * Remove (and free) every signal sent by nodeId from the receive buffer,
 * keeping the other signals in their original order.
 */
void
OSE_Receiver::clearRecvBuffer(NodeId nodeId){
  int tmpIndex = 0;
  union SIGNAL** tmp = new union SIGNAL * [recBufSize];

  /**
   * Put all signal that I want to keep into tmp
   */
  while(recBufReadIndex != recBufWriteIndex){
    if(receiveBuffer[recBufReadIndex]->dataSignal.senderNodeId != nodeId){
      tmp[tmpIndex] = receiveBuffer[recBufReadIndex];
      tmpIndex++;
    } else {
      free_buf(&receiveBuffer[recBufReadIndex]);
    }
    recBufReadIndex = (recBufReadIndex + 1) % recBufSize;
  }

  /**
   * Put all signals that I kept back into receiveBuffer
   */
  for(int i = 0; i<tmpIndex; i++)
    insertReceiveBuffer(tmp[i]);

  delete [] tmp;
}