/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#include <ndb_global.h>
#include <my_pthread.h>

#include <TransporterRegistry.hpp>
#include "TransporterInternalDefinitions.hpp"

#include "Transporter.hpp"
#include <SocketAuthenticator.hpp>

#ifdef NDB_TCP_TRANSPORTER
#include "TCP_Transporter.hpp"
#endif

#ifdef NDB_OSE_TRANSPORTER
#include "OSE_Receiver.hpp"
#include "OSE_Transporter.hpp"
#endif

#ifdef NDB_SCI_TRANSPORTER
#include "SCI_Transporter.hpp"
#endif

#ifdef NDB_SHM_TRANSPORTER
#include "SHM_Transporter.hpp"
extern int g_ndb_shm_signum;
#endif

#include "TransporterCallback.hpp"
#include "NdbOut.hpp"
#include <NdbSleep.h>
#include <NdbTick.h>
#include <InputStream.hpp>
#include <OutputStream.hpp>

#include <mgmapi/mgmapi.h>
#include <mgmapi_internal.h>
#include <mgmapi/mgmapi_debug.h>

#include <EventLogger.hpp>
extern EventLogger g_eventLogger;

/**
 * Return the address stored in m_connect_address of the transporter
 * registered for node_id.
 *
 * No validation is performed: node_id is used directly as an index into
 * theTransporters and the entry is dereferenced unconditionally, so the
 * caller must pass a node id that has a transporter.
 */
struct in_addr
TransporterRegistry::get_connect_address(NodeId node_id) const
{
  const Transporter * const transporter = theTransporters[node_id];
  return transporter->m_connect_address;
}

/**
 * Handle a newly accepted socket on the transporter service.
 *
 * The socket is first authenticated (only when an authenticator is set)
 * and then handed to TransporterRegistry::connect_server(). If either step
 * fails the socket is closed here.
 *
 * @return always 0 - this function never creates a SocketServer::Session.
 */
SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd)
{
  DBUG_ENTER("SocketServer::Session * TransporterService::newSession");

  // connect_server() is only attempted when authentication did not fail.
  const bool auth_failed = m_auth && !m_auth->server_authenticate(sockfd);
  const bool accepted =
    !auth_failed && m_transporter_registry->connect_server(sockfd);

  if (!accepted)
  {
    NDB_CLOSE_SOCKET(sockfd);
  }

  DBUG_RETURN(0);
}

/**
 * Construct an empty registry with room for _maxTransporters nodes.
 *
 * @param callback                opaque pointer stored in callbackObj and
 *                                passed on to the transporter callbacks
 * @param _maxTransporters        size of every per-node array
 * @param sizeOfLongSignalMemory  not used by this constructor
 *
 * Every per-node slot starts with no transporter, state DISCONNECTED and
 * IO state NoHalt. theTransporterTypes is allocated but not initialised.
 */
TransporterRegistry::TransporterRegistry(void * callback,
					 unsigned _maxTransporters,
					 unsigned sizeOfLongSignalMemory)
{
  DBUG_ENTER("TransporterRegistry::TransporterRegistry");

  // Scalar state
  nodeIdSpecified = false;
  maxTransporters = _maxTransporters;
  sendCounter     = 1;
  m_mgm_handle    = 0;
  callbackObj     = callback;

  // Per-node and per-type tables, all sized by maxTransporters
  theTCPTransporters  = new TCP_Transporter * [maxTransporters];
  theSCITransporters  = new SCI_Transporter * [maxTransporters];
  theSHMTransporters  = new SHM_Transporter * [maxTransporters];
  theOSETransporters  = new OSE_Transporter * [maxTransporters];
  theTransporterTypes = new TransporterType   [maxTransporters];
  theTransporters     = new Transporter     * [maxTransporters];
  performStates       = new PerformState      [maxTransporters];
  ioStates            = new IOState           [maxTransporters];

  // Nothing is registered yet
  nTransporters    = 0;
  nTCPTransporters = 0;
  nSCITransporters = 0;
  nSHMTransporters = 0;
  nOSETransporters = 0;

  for (unsigned slot = 0; slot != maxTransporters; ++slot)
  {
    theTCPTransporters[slot] = NULL;
    theSCITransporters[slot] = NULL;
    theSHMTransporters[slot] = NULL;
    theOSETransporters[slot] = NULL;
    theTransporters[slot]    = NULL;
    performStates[slot]      = DISCONNECTED;
    ioStates[slot]           = NoHalt;
  }

  // OSE specific members
  theOSEReceiver       = 0;
  theOSEJunkSocketSend = 0;
  theOSEJunkSocketRecv = 0;

  DBUG_VOID_RETURN;
}

/**
 * Replace the management server handle held by the registry.
 *
 * A previously stored handle is destroyed with ndb_mgm_destroy_handle()
 * before h is stored. In debug builds the connectstring of the new handle
 * (or the fact that it is NULL) is traced.
 */
void TransporterRegistry::set_mgm_handle(NdbMgmHandle h)
{
  DBUG_ENTER("TransporterRegistry::set_mgm_handle");

  if (m_mgm_handle != 0)
  {
    ndb_mgm_destroy_handle(&m_mgm_handle);
  }
  m_mgm_handle= h;

#ifndef DBUG_OFF
  if (!h)
  {
    DBUG_PRINT("info",("handle set to NULL"));
  }
  else
  {
    char connectstring[256];
    DBUG_PRINT("info",("handle set with connectstring: %s",
		       ndb_mgm_get_connectstring(h, connectstring,
						 sizeof(connectstring))));
  }
#endif

  DBUG_VOID_RETURN;
}

/**
 * Tear down the registry: remove (and thereby delete) every registered
 * transporter, free the per-node tables, destroy the OSE receiver when
 * built with NDB_OSE_TRANSPORTER, and finally destroy the management
 * handle if one is held.
 */
TransporterRegistry::~TransporterRegistry()
{
  DBUG_ENTER("TransporterRegistry::~TransporterRegistry");

  // Transporters must go first, while the tables still exist.
  removeAll();

  delete[] theTCPTransporters;
  delete[] theSCITransporters;
  delete[] theSHMTransporters;
  delete[] theOSETransporters;
  delete[] theTransporterTypes;
  delete[] theTransporters;
  delete[] performStates;
  delete[] ioStates;

#ifdef NDB_OSE_TRANSPORTER
  if (theOSEReceiver != NULL)
  {
    theOSEReceiver->destroyPhantom();
    delete theOSEReceiver;
    theOSEReceiver = 0;
  }
#endif

  if (m_mgm_handle != 0)
  {
    ndb_mgm_destroy_handle(&m_mgm_handle);
  }

  DBUG_VOID_RETURN;
}

void
TransporterRegistry::removeAll(){
  for(unsigned i = 0; i<maxTransporters; i++){
    if(theTransporters[i] != NULL)
      removeTransporter(theTransporters[i]->getRemoteNodeId());
  }
}

void
TransporterRegistry::disconnectAll(){
  for(unsigned i = 0; i<maxTransporters; i++){
    if(theTransporters[i] != NULL)
      theTransporters[i]->doDisconnect();
  }
}

/**
 * Record nodeId as the local node id and mark the id as specified.
 *
 * @return always true
 */
bool
TransporterRegistry::init(NodeId nodeId) {
  DBUG_ENTER("TransporterRegistry::init");

  localNodeId     = nodeId;
  nodeIdSpecified = true;

  DEBUG("TransporterRegistry started node: " << localNodeId);

  DBUG_RETURN(true);
}

/**
 * Server side of the transporter handshake on an accepted socket.
 *
 * Protocol as implemented here:
 *  1. Read one line from the client: "<nodeId> [<transporter type>]".
 *     A line with only the node id is accepted (the type stays -1).
 *  2. Reject the connection unless nodeId is inside [0, maxTransporters),
 *     a transporter exists for it and its perform state is CONNECTING.
 *  3. Reply with "<local node id> <local transporter type>".
 *  4. If the client sent a type and it differs from ours, log the
 *     mismatch, wait up to one second on the socket and reject.
 *  5. Otherwise hand the socket to the transporter.
 *
 * @param sockfd  accepted socket
 * @return true when the socket was handed over to the transporter,
 *         false otherwise (the socket is not closed by this function)
 */
bool
TransporterRegistry::connect_server(NDB_SOCKET_TYPE sockfd)
{
  DBUG_ENTER("TransporterRegistry::connect_server");

  // Step 1: read the greeting line
  SocketInputStream s_input(sockfd);
  char line[256];
  if (s_input.gets(line, 256) == 0) {
    DBUG_PRINT("error", ("Could not get node id from client"));
    DBUG_RETURN(false);
  }

  int nodeId;
  int remote_transporter_type = -1;
  const int fields = sscanf(line, "%d %d", &nodeId, &remote_transporter_type);
  // fields == 2: node id and transporter type were both sent.
  // fields == 1: peer is running a version prior to 4.1.9; accepted, but
  //              transporter configuration compatibility cannot be checked.
  if (fields != 1 && fields != 2) {
    DBUG_PRINT("error", ("Error in node id from client"));
    DBUG_RETURN(false);
  }

  DBUG_PRINT("info", ("nodeId=%d remote_transporter_type=%d",
		      nodeId,remote_transporter_type));

  // Step 2: the node id must be valid and have an allocated transporter
  if (nodeId < 0 || nodeId >= (int)maxTransporters) {
    DBUG_PRINT("error", ("Node id out of range from client"));
    DBUG_RETURN(false);
  }

  Transporter * const t = theTransporters[nodeId];
  if (t == 0) {
    DBUG_PRINT("error", ("No transporter for this node id from client"));
    DBUG_RETURN(false);
  }

  // ... and we must actually be waiting for this node to connect
  if (performStates[nodeId] != TransporterRegistry::CONNECTING) {
    DBUG_PRINT("error", ("Transporter in wrong state for this node id from client"));
    DBUG_RETURN(false);
  }

  // Step 3: acknowledge with our own node id and transporter type
  SocketOutputStream s_output(sockfd);
  s_output.println("%d %d", t->getLocalNodeId(), t->m_type);

  // Step 4: compare transporter types when the peer supplied one
  const bool remote_type_known = (remote_transporter_type != -1);
  if (remote_type_known && remote_transporter_type != t->m_type)
  {
    DBUG_PRINT("error", ("Transporter types mismatch this=%d remote=%d",
			 t->m_type, remote_transporter_type));
    g_eventLogger.error("Incompatible configuration: Transporter type "
			"mismatch with node %d", nodeId);

    // wait for socket close for 1 second to let message arrive at client
    fd_set wait_set;
    FD_ZERO(&wait_set);
    FD_SET(sockfd, &wait_set);
    struct timeval timeout;
    timeout.tv_sec  = 1;
    timeout.tv_usec = 0;
    select(sockfd+1, &wait_set, 0, 0, &timeout);

    DBUG_RETURN(false);
  }
  if (!remote_type_known && t->m_type == tt_SHM_TRANSPORTER)
  {
    g_eventLogger.warning("Unable to verify transporter compatability with node %d", nodeId);
  }

  // Step 5: setup transporter (transporter responsible for closing sockfd)
  t->connect_server(sockfd);

  DBUG_RETURN(true);
}

/**
 * Create, initialise and register a TCP transporter described by config.
 *
 * If no local node id has been set yet, config->localNodeId becomes the
 * registry's local node id.
 *
 * @return true when the transporter was created and registered;
 *         false when config's local node id differs from the registry's,
 *         the remote node already has a transporter, construction or
 *         initTransporter() fails, or TCP support is not compiled in.
 */
bool
TransporterRegistry::createTCPTransporter(TransporterConfiguration *config) {
#ifdef NDB_TCP_TRANSPORTER

  if (!nodeIdSpecified)
    init(config->localNodeId);

  // Wrong local node, or the remote node is already served
  if (config->localNodeId != localNodeId ||
      theTransporters[config->remoteNodeId] != NULL)
    return false;

  TCP_Transporter * const tcp = new TCP_Transporter(*this,
						    config->tcp.sendBufferSize,
						    config->tcp.maxReceiveSize,
						    config->localHostName,
						    config->remoteHostName,
						    config->s_port,
						    config->isMgmConnection,
						    localNodeId,
						    config->remoteNodeId,
						    config->serverNodeId,
						    config->checksum,
						    config->signalId);
  if (tcp == NULL)
    return false;

  if (!tcp->initTransporter()) {
    delete tcp;
    return false;
  }

  // Register in the TCP list and in the per-node tables
  const NodeId remote = tcp->getRemoteNodeId();
  theTCPTransporters[nTCPTransporters] = tcp;
  theTransporters[remote]              = tcp;
  theTransporterTypes[remote]          = tt_TCP_TRANSPORTER;
  performStates[remote]                = DISCONNECTED;
  nTransporters++;
  nTCPTransporters++;

#if defined NDB_OSE || defined NDB_SOFTOSE
  tcp->theReceiverPid = theReceiverPid;
#endif

  return true;
#else
  return false;
#endif
}

/**
 * Create, initialise and register an OSE transporter described by conf.
 *
 * The shared OSE_Receiver is created on first use. If no local node id has
 * been set yet, conf->localNodeId becomes the registry's local node id.
 *
 * @return true when the transporter was created and registered;
 *         false when conf's local node id differs from the registry's,
 *         the remote node already has a transporter, construction or
 *         initTransporter() fails, or OSE support is not compiled in.
 */
bool
TransporterRegistry::createOSETransporter(TransporterConfiguration *conf) {
#ifdef NDB_OSE_TRANSPORTER

  if (!nodeIdSpecified)
    init(conf->localNodeId);

  // Wrong local node, or the remote node is already served
  if (conf->localNodeId != localNodeId ||
      theTransporters[conf->remoteNodeId] != NULL)
    return false;

  // One receiver is shared by all OSE transporters
  if (theOSEReceiver == NULL) {
    theOSEReceiver = new OSE_Receiver(this,
				      10,
				      localNodeId);
  }

  OSE_Transporter * const ose = new OSE_Transporter(conf->ose.prioASignalSize,
						    conf->ose.prioBSignalSize,
						    localNodeId,
						    conf->localHostName,
						    conf->remoteNodeId,
						    conf->serverNodeId,
						    conf->remoteHostName,
						    conf->checksum,
						    conf->signalId);
  if (ose == NULL)
    return false;

  if (!ose->initTransporter()) {
    delete ose;
    return false;
  }

  // Register in the OSE list and in the per-node tables
  const NodeId remote = ose->getRemoteNodeId();
  theOSETransporters[nOSETransporters] = ose;
  theTransporters[remote]              = ose;
  theTransporterTypes[remote]          = tt_OSE_TRANSPORTER;
  performStates[remote]                = DISCONNECTED;
  nTransporters++;
  nOSETransporters++;

  return true;
#else
  return false;
#endif
}

/**
 * Create, initialise and register an SCI transporter described by config.
 *
 * SCI_Transporter::initSCI() is called first; the process aborts if it
 * fails. If no local node id has been set yet, config->localNodeId becomes
 * the registry's local node id.
 *
 * @return true when the transporter was created and registered;
 *         false when config's local node id differs from the registry's,
 *         the remote node already has a transporter, construction or
 *         initTransporter() fails, or SCI support is not compiled in.
 */
bool
TransporterRegistry::createSCITransporter(TransporterConfiguration *config) {
#ifdef NDB_SCI_TRANSPORTER

  if (!SCI_Transporter::initSCI())
    abort();

  if (!nodeIdSpecified)
    init(config->localNodeId);

  // Wrong local node, or the remote node is already served
  if (config->localNodeId != localNodeId ||
      theTransporters[config->remoteNodeId] != NULL)
    return false;

  SCI_Transporter * const sci = new SCI_Transporter(*this,
						    config->localHostName,
						    config->remoteHostName,
						    config->s_port,
						    config->isMgmConnection,
						    config->sci.sendLimit,
						    config->sci.bufferSize,
						    config->sci.nLocalAdapters,
						    config->sci.remoteSciNodeId0,
						    config->sci.remoteSciNodeId1,
						    localNodeId,
						    config->remoteNodeId,
						    config->serverNodeId,
						    config->checksum,
						    config->signalId);
  if (sci == NULL)
    return false;

  if (!sci->initTransporter()) {
    delete sci;
    return false;
  }

  // Register in the SCI list and in the per-node tables
  const NodeId remote = sci->getRemoteNodeId();
  theSCITransporters[nSCITransporters] = sci;
  theTransporters[remote]              = sci;
  theTransporterTypes[remote]          = tt_SCI_TRANSPORTER;
  performStates[remote]                = DISCONNECTED;
  nTransporters++;
  nSCITransporters++;

  return true;
#else
  return false;
#endif
}

/**
 * Create, initialise and register a shared memory transporter described
 * by config.
 *
 * If no local node id has been set yet, config->localNodeId becomes the
 * registry's local node id. The first SHM transporter created also fixes
 * the process wide g_ndb_shm_signum and blocks that signal in the calling
 * thread; later transporters must be configured with the same signal.
 *
 * Every exit goes through DBUG_RETURN so that the DBUG_ENTER at the top is
 * always balanced (the early exits previously used a plain return).
 *
 * @return true when the transporter was created and registered;
 *         false when config's local node id differs from the registry's,
 *         the signal number differs from the one already chosen, the
 *         remote node already has a transporter, construction or
 *         initTransporter() fails, or SHM support is not compiled in.
 */
bool
TransporterRegistry::createSHMTransporter(TransporterConfiguration *config) {
  DBUG_ENTER("TransporterRegistry::createTransporter SHM");
#ifdef NDB_SHM_TRANSPORTER
  if(!nodeIdSpecified){
    init(config->localNodeId);
  }
  
  if(config->localNodeId != localNodeId)
  {
    DBUG_RETURN(false);
  }
  
  if (!g_ndb_shm_signum) {
    g_ndb_shm_signum= config->shm.signum;
    DBUG_PRINT("info",("Block signum %d",g_ndb_shm_signum));
    /**
     * Make sure to block g_ndb_shm_signum
     *   TransporterRegistry::init is run from "main" thread
     */
    sigset_t mask;
    sigemptyset(&mask);
    sigaddset(&mask, g_ndb_shm_signum);
    pthread_sigmask(SIG_BLOCK, &mask, 0);
  }

  // All SHM transporters in the process must share one signal number
  if(config->shm.signum != g_ndb_shm_signum)
  {
    DBUG_RETURN(false);
  }
  
  if(theTransporters[config->remoteNodeId] != NULL)
  {
    DBUG_RETURN(false);
  }

  SHM_Transporter * t = new SHM_Transporter(*this,
					    config->localHostName,
					    config->remoteHostName,
					    config->s_port,
					    config->isMgmConnection,
					    localNodeId,
					    config->remoteNodeId,
					    config->serverNodeId,
					    config->checksum,
					    config->signalId,
					    config->shm.shmKey,
					    config->shm.shmSize
					    );
  if (t == NULL)
  {
    DBUG_RETURN(false);
  }
  if (!t->initTransporter()) {
    delete t;
    DBUG_RETURN(false);
  }
  // Put the transporter in the transporter arrays
  theSHMTransporters[nSHMTransporters]      = t;
  theTransporters[t->getRemoteNodeId()]     = t;
  theTransporterTypes[t->getRemoteNodeId()] = tt_SHM_TRANSPORTER;
  performStates[t->getRemoteNodeId()]       = DISCONNECTED;
  
  nTransporters++;
  nSHMTransporters++;

  DBUG_RETURN(true);
#else
  DBUG_RETURN(false);
#endif
}


/**
 * Disconnect, unregister and delete the transporter for nodeId.
 *
 * Does nothing when no transporter is registered for nodeId. Otherwise:
 *  1. doDisconnect() is called on the transporter.
 *  2. The transporter is removed from the type specific list
 *     (theTCPTransporters, theSCITransporters, ...) by shifting all later
 *     entries one position down, and that list's counter is decremented.
 *     This step is only compiled in for the transporter types enabled at
 *     build time.
 *  3. nTransporters is decremented, the object is deleted and the
 *     theTransporters slot is reset to NULL.
 *
 * NOTE(review): the type specific counter is decremented even when the
 * search loop finds no matching entry; the code relies on the per-type
 * list and theTransporterTypes being consistent.
 */
void
TransporterRegistry::removeTransporter(NodeId nodeId) {

  DEBUG("Removing transporter from " << localNodeId
	<< " to " << nodeId);
  
  if(theTransporters[nodeId] == NULL)
    return;
  
  theTransporters[nodeId]->doDisconnect();
  
  const TransporterType type = theTransporterTypes[nodeId];

  // ind: first used to locate the entry, then to walk the entries after it
  int ind = 0;
  switch(type){
  case tt_TCP_TRANSPORTER:
#ifdef NDB_TCP_TRANSPORTER
    // Locate the entry ...
    for(; ind < nTCPTransporters; ind++)
      if(theTCPTransporters[ind]->getRemoteNodeId() == nodeId)
	break;
    // ... and close the gap by moving every following entry one step down
    ind++;
    for(; ind<nTCPTransporters; ind++)
      theTCPTransporters[ind-1] = theTCPTransporters[ind];
    nTCPTransporters --;
#endif
    break;
  case tt_SCI_TRANSPORTER:
#ifdef NDB_SCI_TRANSPORTER
    // Same locate-and-compact scheme as for TCP
    for(; ind < nSCITransporters; ind++)
      if(theSCITransporters[ind]->getRemoteNodeId() == nodeId)
	break;
    ind++;
    for(; ind<nSCITransporters; ind++)
      theSCITransporters[ind-1] = theSCITransporters[ind];
    nSCITransporters --;
#endif
    break;
  case tt_SHM_TRANSPORTER:
#ifdef NDB_SHM_TRANSPORTER
    // Same locate-and-compact scheme as for TCP
    for(; ind < nSHMTransporters; ind++)
      if(theSHMTransporters[ind]->getRemoteNodeId() == nodeId)
	break;
    ind++;
    for(; ind<nSHMTransporters; ind++)
      theSHMTransporters[ind-1] = theSHMTransporters[ind];
    nSHMTransporters --;
#endif
    break;
  case tt_OSE_TRANSPORTER:
#ifdef NDB_OSE_TRANSPORTER
    // Same locate-and-compact scheme as for TCP
    for(; ind < nOSETransporters; ind++)
      if(theOSETransporters[ind]->getRemoteNodeId() == nodeId)
	break;
    ind++;
    for(; ind<nOSETransporters; ind++)
      theOSETransporters[ind-1] = theOSETransporters[ind];
    nOSETransporters --;
#endif
    break;
  }
  
  nTransporters--;

  // Delete the transporter and remove it from theTransporters array
  delete theTransporters[nodeId];
  theTransporters[nodeId] = NULL;        
}

/**
 * Return get_free_buffer() of the transporter registered for node,
 * or 0 when no transporter is registered for it.
 */
Uint32
TransporterRegistry::get_free_buffer(Uint32 node) const
{
  Transporter * const t = theTransporters[node];
  if (likely(t != 0))
    return t->get_free_buffer();

  return 0;
}


/**
 * Pack a signal with linear sections into the send buffer of the
 * transporter for nodeId.
 *
 * @return SEND_UNKNOWN_NODE     no transporter is registered for nodeId
 *         SEND_BLOCKED          output to the node is halted (HaltOutput or
 *                               HaltIO) and the receiver block is neither
 *                               252 nor 4002
 *         SEND_DISCONNECTED     the transporter is not connected
 *         SEND_MESSAGE_TOO_BIG  packed length exceeds MAX_MESSAGE_SIZE
 *         SEND_BUFFER_FULL      no buffer space after the initial attempt
 *                               and 50 retries (TE_SIGNAL_LOST_SEND_BUFFER_FULL
 *                               is reported)
 *         SEND_OK               signal was packed; if that only succeeded on
 *                               a retry, TE_SEND_BUFFER_FULL is reported too
 */
SendStatus
TransporterRegistry::prepareSend(const SignalHeader * const signalHeader, 
				 Uint8 prio,
				 const Uint32 * const signalData,
				 NodeId nodeId, 
				 const LinearSectionPtr ptr[3]){

  Transporter * const t = theTransporters[nodeId];

  // Halted output is bypassed only for receiver blocks 252 and 4002.
  const bool sendAllowed =
    t != NULL &&
    (((ioStates[nodeId] != HaltOutput) && (ioStates[nodeId] != HaltIO)) ||
     (signalHeader->theReceiversBlockNumber == 252) ||
     (signalHeader->theReceiversBlockNumber == 4002));

  if(!sendAllowed){
    DEBUG("Discarding message to block: " 
	  << signalHeader->theReceiversBlockNumber 
	  << " node: " << nodeId);

    return (t == NULL) ? SEND_UNKNOWN_NODE : SEND_BLOCKED;
  }

  if(!t->isConnected()){
    DEBUG("Signal to " << nodeId << " lost(disconnect) ");
    return SEND_DISCONNECTED;
  }

  const Uint32 lenBytes = t->m_packer.getMessageLength(signalHeader, ptr);
  if(lenBytes > MAX_MESSAGE_SIZE)
    return SEND_MESSAGE_TOO_BIG;

  // Fast path: room is available right away
  Uint32 * insertPtr = t->getWritePtr(lenBytes, prio);
  if(insertPtr != 0){
    t->m_packer.pack(insertPtr, prio, signalHeader, signalData, ptr);
    t->updateWritePtr(lenBytes, prio);
    return SEND_OK;
  }

  /**
   * Slow path: the buffer is full, retry up to 50 times.
   * Between attempts we sleep, unless SHM or SCI transporters exist.
   *
   * @note: on linux/i386 the granularity is 10ms
   *        so sleepTime = 2 generates a 10 ms sleep.
   */
  const int sleepTime = 2;
  for(int attempt = 0; attempt < 50 && insertPtr == 0; attempt++){
    if((nSHMTransporters+nSCITransporters) == 0)
      NdbSleep_MilliSleep(sleepTime);
    insertPtr = t->getWritePtr(lenBytes, prio);
  }

  if(insertPtr == 0){
    WARNING("Signal to " << nodeId << " lost(buffer)");
    reportError(callbackObj, nodeId, TE_SIGNAL_LOST_SEND_BUFFER_FULL);
    return SEND_BUFFER_FULL;
  }

  // Send buffer was full, but the resend worked
  t->m_packer.pack(insertPtr, prio, signalHeader, signalData, ptr);
  t->updateWritePtr(lenBytes, prio);
  reportError(callbackObj, nodeId, TE_SEND_BUFFER_FULL);
  return SEND_OK;
}

/**
 * Pack a signal with segmented sections (stored in thePool) into the send
 * buffer of the transporter for nodeId.
 *
 * @return SEND_UNKNOWN_NODE     no transporter is registered for nodeId
 *         SEND_BLOCKED          output to the node is halted (HaltOutput or
 *                               HaltIO) and the receiver block is neither
 *                               252 nor 4002
 *         SEND_DISCONNECTED     the transporter is not connected
 *         SEND_MESSAGE_TOO_BIG  packed length exceeds MAX_MESSAGE_SIZE
 *         SEND_BUFFER_FULL      no buffer space after the initial attempt
 *                               and 50 retries (TE_SIGNAL_LOST_SEND_BUFFER_FULL
 *                               is reported)
 *         SEND_OK               signal was packed; if that only succeeded on
 *                               a retry, TE_SEND_BUFFER_FULL is reported too
 */
SendStatus
TransporterRegistry::prepareSend(const SignalHeader * const signalHeader, 
				 Uint8 prio,
				 const Uint32 * const signalData,
				 NodeId nodeId, 
				 class SectionSegmentPool & thePool,
				 const SegmentedSectionPtr ptr[3]){

  Transporter * const t = theTransporters[nodeId];

  // Halted output is bypassed only for receiver blocks 252 and 4002.
  const bool sendAllowed =
    t != NULL &&
    (((ioStates[nodeId] != HaltOutput) && (ioStates[nodeId] != HaltIO)) ||
     (signalHeader->theReceiversBlockNumber == 252) ||
     (signalHeader->theReceiversBlockNumber == 4002));

  if(!sendAllowed){
    DEBUG("Discarding message to block: " 
	  << signalHeader->theReceiversBlockNumber 
	  << " node: " << nodeId);

    return (t == NULL) ? SEND_UNKNOWN_NODE : SEND_BLOCKED;
  }

  if(!t->isConnected()){
    DEBUG("Signal to " << nodeId << " lost(disconnect) ");
    return SEND_DISCONNECTED;
  }

  const Uint32 lenBytes = t->m_packer.getMessageLength(signalHeader, ptr);
  if(lenBytes > MAX_MESSAGE_SIZE)
    return SEND_MESSAGE_TOO_BIG;

  // Fast path: room is available right away
  Uint32 * insertPtr = t->getWritePtr(lenBytes, prio);
  if(insertPtr != 0){
    t->m_packer.pack(insertPtr, prio, signalHeader, signalData, thePool, ptr);
    t->updateWritePtr(lenBytes, prio);
    return SEND_OK;
  }

  /**
   * Slow path: the buffer is full, retry up to 50 times.
   * Between attempts we sleep, unless SHM or SCI transporters exist.
   *
   * @note: on linux/i386 the granularity is 10ms
   *        so sleepTime = 2 generates a 10 ms sleep.
   */
  const int sleepTime = 2;
  for(int attempt = 0; attempt < 50 && insertPtr == 0; attempt++){
    if((nSHMTransporters+nSCITransporters) == 0)
      NdbSleep_MilliSleep(sleepTime);
    insertPtr = t->getWritePtr(lenBytes, prio);
  }

  if(insertPtr == 0){
    WARNING("Signal to " << nodeId << " lost(buffer)");
    reportError(callbackObj, nodeId, TE_SIGNAL_LOST_SEND_BUFFER_FULL);
    return SEND_BUFFER_FULL;
  }

  // Send buffer was full, but the resend worked
  t->m_packer.pack(insertPtr, prio, signalHeader, signalData, thePool, ptr);
  t->updateWritePtr(lenBytes, prio);
  reportError(callbackObj, nodeId, TE_SEND_BUFFER_FULL);
  return SEND_OK;
}

/**
 * One round of external I/O.
 *
 * Polls for incoming data for at most timeOutMillis; if the poll reports
 * anything, the data is received. After that, whether or not anything was
 * received, pending output is sent.
 */
void
TransporterRegistry::external_IO(Uint32 timeOutMillis) {
  const Uint32 dataAvailable = pollReceive(timeOutMillis);
  if (dataAvailable != 0) {
    performReceive();
  }

  performSend();
}

/**
 * Poll all transporter types for incoming data.
 *
 * @param timeOutMillis  longest time to wait; it is forced to 0 when SCI
 *                       transporters exist or when the first SHM poll
 *                       already found data
 * @return non-zero when some poll function returned non-zero, else 0
 *
 * When built with NDB_OSE_TRANSPORTER only poll_OSE() (with the timeout)
 * and poll_TCP(0) are used and the function returns right after them; the
 * remaining code is not reached in that configuration.
 */
Uint32
TransporterRegistry::pollReceive(Uint32 timeOutMillis){
  Uint32 retVal = 0;
#ifdef NDB_OSE_TRANSPORTER
  retVal |= poll_OSE(timeOutMillis);
  retVal |= poll_TCP(0);
  return retVal;
#endif
  
  // With SCI transporters present we never block in the polls below
  if((nSCITransporters) > 0)
  {
    timeOutMillis=0;
  }

#ifdef NDB_SHM_TRANSPORTER
  // First SHM check; if data is already there, do not wait in later polls
  if(nSHMTransporters > 0)
  {
    Uint32 res = poll_SHM(0);
    if(res)
    {
      retVal |= res;
      timeOutMillis = 0;
    }
  }
#endif

#ifdef NDB_TCP_TRANSPORTER
  // poll_TCP is also called when nothing has been found so far, even if
  // there are no TCP transporters; otherwise the previous select result
  // is cleared.
  if(nTCPTransporters > 0 || retVal == 0)
  {
    retVal |= poll_TCP(timeOutMillis);
  }
  else
    tcpReadSelectReply = 0;
#endif
#ifdef NDB_SCI_TRANSPORTER
  if(nSCITransporters > 0)
    retVal |= poll_SCI(timeOutMillis);
#endif
#ifdef NDB_SHM_TRANSPORTER
  // Second SHM check, only when nothing was found by any poll above
  if(nSHMTransporters > 0 && retVal == 0)
  {
    int res = poll_SHM(0);
    retVal |= res;
  }
#endif
  return retVal;
}


#ifdef NDB_SCI_TRANSPORTER
/**
 * Check whether any connected SCI transporter has data to read.
 *
 * @param timeOutMillis  not used; the check never waits
 * @return 1 as soon as one connected transporter reports data, else 0
 */
Uint32
TransporterRegistry::poll_SCI(Uint32 timeOutMillis)
{
  for (int idx = 0; idx < nSCITransporters; idx++) {
    SCI_Transporter * const sci = theSCITransporters[idx];
    if (sci->isConnected() && sci->hasDataToRead())
      return 1;
  }
  return 0;
}
#endif


#ifdef NDB_SHM_TRANSPORTER
static int g_shm_counter = 0;
/**
 * Check whether any connected SHM transporter has data to read.
 * The whole list is scanned up to 100 times before giving up.
 *
 * @param timeOutMillis  not used; the check spins instead of waiting
 * @return 1 as soon as one connected transporter reports data, else 0
 */
Uint32
TransporterRegistry::poll_SHM(Uint32 timeOutMillis)
{
  for (int spin = 0; spin < 100; spin++)
  {
    for (int idx = 0; idx < nSHMTransporters; idx++)
    {
      SHM_Transporter * const shm = theSHMTransporters[idx];
      if (shm->isConnected() && shm->hasDataToRead())
	return 1;
    }
  }
  return 0;
}
#endif

#ifdef NDB_OSE_TRANSPORTER
/**
 * Poll the OSE receiver for at most timeOutMillis.
 *
 * @return the result of OSE_Receiver::doReceive() when a receiver exists;
 *         otherwise the function sleeps for timeOutMillis and returns 0
 */
Uint32
TransporterRegistry::poll_OSE(Uint32 timeOutMillis)
{
  if (theOSEReceiver == NULL) {
    NdbSleep_MilliSleep(timeOutMillis);
    return 0;
  }
  return theOSEReceiver->doReceive(timeOutMillis);
}
#endif

#ifdef NDB_TCP_TRANSPORTER
/**
 * Wait with select() until a connected TCP transporter socket is readable
 * or timeOutMillis has passed.
 *
 * tcpReadset is rebuilt from the sockets of all connected TCP transporters
 * and is left holding the select() result for performReceive() to inspect;
 * the select() return value is stored in tcpReadSelectReply.
 *
 * @param timeOutMillis  select timeout in milliseconds
 * @return tcpReadSelectReply, i.e. the value returned by select()
 *         converted to Uint32
 */
Uint32 
TransporterRegistry::poll_TCP(Uint32 timeOutMillis)
{
  // Disabled shortcut ("false &&"): select() is called even when there
  // are no TCP transporters, in which case it only acts as a timed wait.
  if (false && nTCPTransporters == 0)
  {
    tcpReadSelectReply = 0;
    return 0;
  }
  
  struct timeval timeout;
#ifdef NDB_OSE
  // On OSE a timeout of 0 or 1 ms is replaced by 1025 microseconds
  
  if(timeOutMillis <= 1){
    timeout.tv_sec  = 0;
    timeout.tv_usec = 1025;
  } else {
    timeout.tv_sec  = timeOutMillis / 1000;
    timeout.tv_usec = (timeOutMillis % 1000) * 1000;
  }
#else  
  timeout.tv_sec  = timeOutMillis / 1000;
  timeout.tv_usec = (timeOutMillis % 1000) * 1000;
#endif

  // Starts at -1 so that the increment below yields 0 when no socket is added
  NDB_SOCKET_TYPE maxSocketValue = -1;
  
  // Needed for TCP/IP connections
  // The read- and writeset are used by select
  
  FD_ZERO(&tcpReadset);

  // Prepare for sending and receiving
  for (int i = 0; i < nTCPTransporters; i++) {
    TCP_Transporter * t = theTCPTransporters[i];
    
    // If the transporter is connected
    if (t->isConnected()) {
      
      const NDB_SOCKET_TYPE socket = t->getSocket();
      // Find the highest socket value. It will be used by select
      if (socket > maxSocketValue)
	maxSocketValue = socket;
      
      // Put the connected transporters in the socket read-set 
      FD_SET(socket, &tcpReadset);
    }
  }
  
  // The highest socket value plus one
  maxSocketValue++; 
  
  tcpReadSelectReply = select(maxSocketValue, &tcpReadset, 0, 0, &timeout);  
  // Disabled debug trace ("false &&") for select() interrupted by a signal
  if(false && tcpReadSelectReply == -1 && errno == EINTR)
    ndbout_c("woke-up by signal");

#ifdef NDB_WIN32
  // On Windows a failed select() is replaced by a sleep of the same length
  if(tcpReadSelectReply == SOCKET_ERROR)
  {
    NdbSleep_MilliSleep(timeOutMillis);
  }
#endif
  
  return tcpReadSelectReply;
}
#endif


/**
 * Receive and unpack pending data from all transporter types that are
 * compiled in.
 *
 * - OSE:  drains theOSEReceiver while it reports data.
 * - TCP:  only when the last poll_TCP() select reported readable sockets
 *         (tcpReadSelectReply > 0); reads from each connected transporter
 *         whose socket is set in tcpReadset.
 * - SCI / SHM: every connected transporter that passes checkConnected()
 *         is unpacked; there is no dependency on an earlier poll result.
 *
 * For TCP, SCI and SHM checkJobBuffer() is called before each transporter
 * is examined, and transporter_recv_from() is called before each unpack().
 */
void
TransporterRegistry::performReceive()
{
#ifdef NDB_OSE_TRANSPORTER
  if(theOSEReceiver != 0)
  {
    while(theOSEReceiver->hasData())
    {
      NodeId remoteNodeId;
      Uint32 * readPtr;
      Uint32 sz = theOSEReceiver->getReceiveData(&remoteNodeId, &readPtr);
      transporter_recv_from(callbackObj, remoteNodeId);
      Uint32 szUsed = unpack(readPtr,
			     sz,
			     remoteNodeId,
			     ioStates[remoteNodeId]);
#ifdef DEBUG_TRANSPORTER
      /**
       * OSE transporter can handle executions of
       *   half signals
       */
      assert(sz == szUsed);
#endif
      // Tell the receiver how much was consumed, then fetch more without waiting
      theOSEReceiver->updateReceiveDataPtr(szUsed);
      theOSEReceiver->doReceive(0);
      //      checkJobBuffer();
    }
  }
#endif

#ifdef NDB_TCP_TRANSPORTER
  if(tcpReadSelectReply > 0)
  {
    for (int i=0; i<nTCPTransporters; i++) 
    {
      checkJobBuffer();
      TCP_Transporter *t = theTCPTransporters[i];
      const NodeId nodeId = t->getRemoteNodeId();
      const NDB_SOCKET_TYPE socket    = t->getSocket();
      if(is_connected(nodeId)){
	if(t->isConnected() && FD_ISSET(socket, &tcpReadset)) 
	{
	  const int receiveSize = t->doReceive();
	  if(receiveSize > 0)
	  {
	    // Unpack what is buffered and report back how much was consumed
	    Uint32 * ptr;
	    Uint32 sz = t->getReceiveData(&ptr);
	    transporter_recv_from(callbackObj, nodeId);
	    Uint32 szUsed = unpack(ptr, sz, nodeId, ioStates[nodeId]);
	    t->updateReceiveDataPtr(szUsed);
          }
	}
      } 
    }
  }
#endif
  
#ifdef NDB_SCI_TRANSPORTER
  //performReceive
  //do prepareReceive on the SCI transporters  (prepareReceive(t,,,,))
  for (int i=0; i<nSCITransporters; i++) 
  {
    checkJobBuffer();
    SCI_Transporter  *t = theSCITransporters[i];
    const NodeId nodeId = t->getRemoteNodeId();
    if(is_connected(nodeId))
    {
      if(t->isConnected() && t->checkConnected())
      {
	// Unpack [readPtr, eodPtr) and store the new read position
	Uint32 * readPtr, * eodPtr;
	t->getReceivePtr(&readPtr, &eodPtr);
	transporter_recv_from(callbackObj, nodeId);
	Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]);
	t->updateReceivePtr(newPtr);
      }
    } 
  }
#endif
#ifdef NDB_SHM_TRANSPORTER
  for (int i=0; i<nSHMTransporters; i++) 
  {
    checkJobBuffer();
    SHM_Transporter *t = theSHMTransporters[i];
    const NodeId nodeId = t->getRemoteNodeId();
    if(is_connected(nodeId)){
      if(t->isConnected() && t->checkConnected())
      {
	// Unpack [readPtr, eodPtr) and store the new read position
	Uint32 * readPtr, * eodPtr;
	t->getReceivePtr(&readPtr, &eodPtr);
	transporter_recv_from(callbackObj, nodeId);
	Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]);
	t->updateReceivePtr(newPtr);
      }
    } 
  }
#endif
}

// Round-robin cursor: index of the TCP transporter performSend() starts
// with, so that the same transporter is not always served first.
static int x = 0;

/**
 * Send pending data on all transporter types that are compiled in and
 * reset sendCounter to 1.
 *
 * - OSE: doSend() on every connected transporter.
 * - TCP on NDB_OSE: select() for writability first; returns early (before
 *   the remaining sections) when no connected transporter has data or
 *   select() reports nothing writable.
 * - TCP: doSend() on every connected transporter with pending data,
 *   starting at the round-robin cursor x and wrapping around.
 * - SCI: doSend() on every connected transporter with pending data.
 * - SHM: doSend() on every connected transporter.
 *
 * Fixes compared to the previous version:
 * - the OSE condition used "&&&" (address-of applied to a bool result)
 *   instead of "&&";
 * - the cursor x is now reset whenever it is >= nTCPTransporters. It used
 *   to be reset only on equality, so with zero TCP transporters, or after
 *   transporters had been removed, it grew on every call until it
 *   overflowed and was then used as a negative array index.
 */
void
TransporterRegistry::performSend()
{
  int i; 
  sendCounter = 1;
  
#ifdef NDB_OSE_TRANSPORTER
  for (i = 0; i < nOSETransporters; i++)
  {
    OSE_Transporter *t = theOSETransporters[i]; 
    if(is_connected(t->getRemoteNodeId()) && (t->isConnected()))
    {
      t->doSend();
    }//if
  }//for
#endif
  
#ifdef NDB_TCP_TRANSPORTER
#ifdef NDB_OSE
  {
    int maxSocketValue = 0;
    
    // Needed for TCP/IP connections
    // The writeset are used by select
    fd_set writeset;
    FD_ZERO(&writeset);
    
    // Collect the sockets of connected transporters that have data to send
    for (i = 0; i < nTCPTransporters; i++) {
      TCP_Transporter * t = theTCPTransporters[i];
      
      if ((t->hasDataToSend()) && (t->isConnected())) {
	const int socket = t->getSocket();
	// Find the highest socket value. It will be used by select
	if (socket > maxSocketValue) {
	  maxSocketValue = socket;
	}//if
	FD_SET(socket, &writeset);
      }//if
    }//for
    
    // Nothing to send on any socket
    if(maxSocketValue == 0)
      return;
    
    // The highest socket value plus one
    maxSocketValue++; 
    struct timeval timeout = { 0, 1025 };
    Uint32 tmp = select(maxSocketValue, 0, &writeset, 0, &timeout);
    
    if (tmp == 0) 
    {
      return;
    }//if
    for (i = 0; i < nTCPTransporters; i++) {
      TCP_Transporter *t = theTCPTransporters[i];
      const NodeId nodeId = t->getRemoteNodeId();
      const int socket    = t->getSocket();
      if(is_connected(nodeId)){
	if(t->isConnected() && FD_ISSET(socket, &writeset)) {
	  t->doSend();
	}//if
      }//if
    }//for
  }
#endif
  // Serve transporters x .. n-1 first, then 0 .. x-1
  for (i = x; i < nTCPTransporters; i++) 
  {
    TCP_Transporter *t = theTCPTransporters[i];
    if (t && t->hasDataToSend() && t->isConnected() &&
	is_connected(t->getRemoteNodeId())) 
    {
      t->doSend();
    }
  }
  for (i = 0; i < x && i < nTCPTransporters; i++) 
  {
    TCP_Transporter *t = theTCPTransporters[i];
    if (t && t->hasDataToSend() && t->isConnected() &&
	is_connected(t->getRemoteNodeId())) 
    {
      t->doSend();
    }
  }
  // Advance the cursor; ">=" keeps it inside [0, nTCPTransporters) even
  // when the number of TCP transporters is zero or has shrunk.
  x++;
  if (x >= nTCPTransporters) x = 0;
#endif
#ifdef NDB_SCI_TRANSPORTER
  //scroll through the SCI transporters, 
  // get each transporter, check if connected, send data
  for (i=0; i<nSCITransporters; i++) {
    SCI_Transporter  *t = theSCITransporters[i];
    const NodeId nodeId = t->getRemoteNodeId();
    
    if(is_connected(nodeId))
    {
      if(t->isConnected() && t->hasDataToSend()) {
	t->doSend();
      } //if
    } //if
  }
#endif
  
#ifdef NDB_SHM_TRANSPORTER
  for (i=0; i<nSHMTransporters; i++) 
  {
    SHM_Transporter  *t = theSHMTransporters[i];
    const NodeId nodeId = t->getRemoteNodeId();
    if(is_connected(nodeId))
    {
      if(t->isConnected())
      {
	t->doSend();
      }
    }
  }
#endif
}

int
TransporterRegistry::forceSendCheck(int sendLimit){
  int tSendCounter = sendCounter;
  sendCounter = tSendCounter + 1;
  if (tSendCounter >= sendLimit) {
    performSend();
    sendCounter = 1;
    return 1;
  }//if
  return 0;
}//TransporterRegistry::forceSendCheck()

#ifdef DEBUG_TRANSPORTER
/**
 * Debug helper: print the number of transporters and, for each registered
 * transporter, its remote node id, perform state and IO state to ndbout.
 */
void
TransporterRegistry::printState(){
  ndbout << "-- TransporterRegistry -- " << endl << endl
	 << "Transporters = " << nTransporters << endl;

  for(int i = 0; i<maxTransporters; i++){
    Transporter * const t = theTransporters[i];
    if(t == NULL)
      continue;

    const NodeId remoteNodeId = t->getRemoteNodeId();
    ndbout << "Transporter: " << remoteNodeId 
	   << " PerformState: " << performStates[remoteNodeId]
	   << " IOState: " << ioStates[remoteNodeId] << endl;
  }
}
#endif

/**
 * Return the IO state recorded for nodeId.
 * nodeId is used as an index into ioStates without range checking.
 */
IOState
TransporterRegistry::ioState(NodeId nodeId) { 
  const IOState current = ioStates[nodeId];
  return current;
}

/**
 * Record state as the IO state of nodeId.
 * nodeId is used as an index into ioStates without range checking.
 */
void
TransporterRegistry::setIOState(NodeId nodeId, IOState state) {
  DEBUG("TransporterRegistry::setIOState("
	<< nodeId << ", " << state << ")");

  IOState & slot = ioStates[nodeId];
  slot = state;
}

static void * 
run_start_clients_C(void * me)
{
  ((TransporterRegistry*) me)->start_clients_thread();
  return 0;
}

/**
 * Request a connection to node_id (run by kernel thread).
 *
 * Sets the node's perform state to CONNECTING unless it is already
 * CONNECTED or CONNECTING, in which case nothing happens.
 */
void
TransporterRegistry::do_connect(NodeId node_id)
{
  PerformState &state = performStates[node_id];
  if (state == CONNECTED || state == CONNECTING)
    return;

  // DISCONNECTED or DISCONNECTING: start connecting
  DBUG_ENTER("TransporterRegistry::do_connect");
  DBUG_PRINT("info",("performStates[%d]=CONNECTING",node_id));
  state= CONNECTING;
  DBUG_VOID_RETURN;
}
/**
 * Request a disconnect from node_id.
 *
 * Sets the node's perform state to DISCONNECTING unless it is already
 * DISCONNECTED or DISCONNECTING, in which case nothing happens.
 */
void
TransporterRegistry::do_disconnect(NodeId node_id)
{
  PerformState &state = performStates[node_id];
  if (state == DISCONNECTED || state == DISCONNECTING)
    return;

  // CONNECTED or CONNECTING: start disconnecting
  DBUG_ENTER("TransporterRegistry::do_disconnect");
  DBUG_PRINT("info",("performStates[%d]=DISCONNECTING",node_id));
  state= DISCONNECTING;
  DBUG_VOID_RETURN;
}

/**
 * Mark node_id as CONNECTED and then notify the callback object through
 * reportConnect().
 */
void
TransporterRegistry::report_connect(NodeId node_id)
{
  DBUG_ENTER("TransporterRegistry::report_connect");
  DBUG_PRINT("info",("performStates[%d]=CONNECTED",node_id));

  // The state is updated before the callback is invoked
  PerformState &state = performStates[node_id];
  state = CONNECTED;
  reportConnect(callbackObj, node_id);

  DBUG_VOID_RETURN;
}

/**
 * Mark node_id as DISCONNECTED and then notify the callback object through
 * reportDisconnect(), passing errnum along.
 */
void
TransporterRegistry::report_disconnect(NodeId node_id, int errnum)
{
  DBUG_ENTER("TransporterRegistry::report_disconnect");
  DBUG_PRINT("info",("performStates[%d]=DISCONNECTED",node_id));

  // The state is updated before the callback is invoked
  PerformState &state = performStates[node_id];
  state = DISCONNECTED;
  reportDisconnect(callbackObj, node_id, errnum);

  DBUG_VOID_RETURN;
}

void
TransporterRegistry::update_connections()
{
  for (int i= 0, n= 0; n < nTransporters; i++){
    Transporter * t = theTransporters[i];
    if (!t)
      continue;
    n++;

    const NodeId nodeId = t->getRemoteNodeId();
    switch(performStates[nodeId]){
    case CONNECTED:
    case DISCONNECTED:
      break;
    case CONNECTING:
      if(t->isConnected())
	report_connect(nodeId);
      break;
    case DISCONNECTING:
      if(!t->isConnected())
	report_disconnect(nodeId, 0);
      break;
    }
  }
}

// run as own thread
void
TransporterRegistry::start_clients_thread()
{
  DBUG_ENTER("TransporterRegistry::start_clients_thread");
  while (m_run_start_clients_thread) {
    NdbSleep_MilliSleep(100);
    for (int i= 0, n= 0; n < nTransporters && m_run_start_clients_thread; i++){
      Transporter * t = theTransporters[i];
      if (!t)
	continue;
      n++;

      const NodeId nodeId = t->getRemoteNodeId();
      switch(performStates[nodeId]){
      case CONNECTING:
	if(!t->isConnected() && !t->isServer) {
	  bool connected= false;
	  /**
	   * First, we try to connect (if we have a port number).
	   */
	  if (t->get_s_port())
	    connected= t->connect_client();

	  /**
	   * If dynamic, get the port for connecting from the management server
	   */
	  if( !connected && t->get_s_port() <= 0) {	// Port is dynamic
	    int server_port= 0;
	    struct ndb_mgm_reply mgm_reply;

	    if(!ndb_mgm_is_connected(m_mgm_handle))
	      ndb_mgm_connect(m_mgm_handle, 0, 0, 0);
	    
	    if(ndb_mgm_is_connected(m_mgm_handle))
	    {
	      int res=
		ndb_mgm_get_connection_int_parameter(m_mgm_handle,
						     t->getRemoteNodeId(),
						     t->getLocalNodeId(),
						     CFG_CONNECTION_SERVER_PORT,
						     &server_port,
						     &mgm_reply);
	      DBUG_PRINT("info",("Got dynamic port %d for %d -> %d (ret: %d)",
				 server_port,t->getRemoteNodeId(),
				 t->getLocalNodeId(),res));
	      if( res >= 0 )
	      {
		/**
		 * Server_port == 0 just means that that a mgmt server
		 * has not received a new port yet. Keep the old.
		 */
		if (server_port)
		  t->set_s_port(server_port);
	      }
	      else if(ndb_mgm_is_connected(m_mgm_handle))
	      {
		ndbout_c("Failed to get dynamic port to connect to: %d", res);
		ndb_mgm_disconnect(m_mgm_handle);
	      }
	      else
	      {
		ndbout_c("Management server closed connection early. "
			 "It is probably being shut down (or has problems). "
			 "We will retry the connection.");
	      }
	    }
	    /** else
	     * We will not be able to get a new port unless
	     * the m_mgm_handle is connected. Note that not
	     * being connected is an ok state, just continue
	     * until it is able to connect. Continue using the
	     * old port until we can connect again and get a
	     * new port.
	     */
	  }
	}
	break;
      case DISCONNECTING:
	if(t->isConnected())
	  t->doDisconnect();
	break;
      default:
	break;
      }
    }
  }
  DBUG_VOID_RETURN;
}

/**
 * Start the thread that runs start_clients_thread().
 *
 * The run flag is raised before the thread is created and lowered
 * again if creation fails.
 *
 * @return true if the thread was created, false otherwise
 */
bool
TransporterRegistry::start_clients()
{
  m_run_start_clients_thread= true;
  m_start_clients_thread= NdbThread_Create(run_start_clients_C,
                                           (void**)this,
                                           32768,
                                           "ndb_start_clients",
                                           NDB_THREAD_PRIO_LOW);

  const bool created= (m_start_clients_thread != 0);
  if (!created)
    m_run_start_clients_thread= false;
  return created;
}

/**
 * Stop the client-connect thread, if one was started.
 *
 * Clears the run flag, waits for the thread to finish and destroys
 * the thread object.
 *
 * @return always true
 */
bool
TransporterRegistry::stop_clients()
{
  if (!m_start_clients_thread)
    return true; // no thread to stop

  m_run_start_clients_thread= false;

  void* exit_status;
  NdbThread_WaitFor(m_start_clients_thread, &exit_status);
  NdbThread_Destroy(&m_start_clients_thread);
  return true;
}

/**
 * Register an (interface, port) pair that start_service() should
 * listen on, unless an equivalent entry is already registered.
 *
 * @param remoteNodeId  remote node id stored with a newly added entry
 * @param interf        interface name; NULL or "" means "no specific
 *                      interface" (both are stored as NULL)
 * @param s_port        service port of the entry
 *
 * An existing entry counts as a duplicate only if its port equals
 * s_port, that port is non-zero, and the interface matches (same
 * string, or both NULL). Entries with port 0 are never treated as
 * duplicates, so each such call adds a new entry.
 *
 * The interf pointer itself is stored; the string is not copied.
 */
void
TransporterRegistry::add_transporter_interface(NodeId remoteNodeId,
                                               const char *interf,
                                               int s_port)
{
  DBUG_ENTER("TransporterRegistry::add_transporter_interface");
  // interf may legitimately be NULL; never hand NULL to a %s conversion
  DBUG_PRINT("enter",("interface=%s, s_port= %d",
                      interf ? interf : "(null)", s_port));
  if (interf && strlen(interf) == 0)
    interf= 0;

  for (unsigned i= 0; i < m_transporter_interface.size(); i++)
  {
    Transporter_interface &tmp= m_transporter_interface[i];
    if (s_port != tmp.m_s_service_port || tmp.m_s_service_port==0)
      continue;
    if (interf != 0 && tmp.m_interface != 0 &&
        strcmp(interf, tmp.m_interface) == 0)
    {
      DBUG_VOID_RETURN; // found match, no need to insert
    }
    if (interf == 0 && tmp.m_interface == 0)
    {
      DBUG_VOID_RETURN; // found match, no need to insert
    }
  }
  Transporter_interface t;
  t.m_remote_nodeId= remoteNodeId;
  t.m_s_service_port= s_port;
  t.m_interface= interf;
  m_transporter_interface.push_back(t);
  DBUG_PRINT("exit",("interface and port added"));
  DBUG_VOID_RETURN;
}

/**
 * Set up a TransporterService on socket_server for every registered
 * transporter interface.
 *
 * A negative m_s_service_port marks a dynamic port whose absolute value
 * is tried first. If setup fails for a non-positive port, a second
 * attempt is made with port 0 (the port variable is passed by address,
 * so setup may write back the port actually used). If setup fails for a
 * positive port, or the second attempt fails too, an error is printed
 * and false is returned.
 *
 * On success the entry's m_s_service_port is rewritten with the port in
 * use, negated if the original value was <= 0.
 *
 * @return false if interfaces are registered but the local node id has
 *         not been specified, or if any service could not be set up;
 *         true otherwise
 */
bool
TransporterRegistry::start_service(SocketServer& socket_server)
{
  DBUG_ENTER("TransporterRegistry::start_service");
  if (m_transporter_interface.size() > 0 && !nodeIdSpecified)
  {
    // NOTE: the message text names startReceiving although it is
    // emitted from start_service; text kept unchanged.
    ndbout_c("TransporterRegistry::startReceiving: localNodeId not specified");
    DBUG_RETURN(false);
  }

  for (unsigned i= 0; i < m_transporter_interface.size(); i++)
  {
    Transporter_interface &t= m_transporter_interface[i];

    unsigned short port= (unsigned short)t.m_s_service_port;
    if(t.m_s_service_port<0)
      port= -t.m_s_service_port; // is a dynamic port
    TransporterService *transporter_service =
      new TransporterService(new SocketAuthSimple("ndbd", "ndbd passwd"));
    if(!socket_server.setup(transporter_service,
                            &port, t.m_interface))
    {
      DBUG_PRINT("info", ("Trying new port"));
      port= 0;
      if(t.m_s_service_port>0
         || !socket_server.setup(transporter_service,
                                 &port, t.m_interface))
      {
        /*
         * If it wasn't a dynamically allocated port, or
         * our attempts at getting a new dynamic port failed
         */
        ndbout_c("Unable to setup transporter service port: %s:%d!\n"
                 "Please check if the port is already used,\n"
                 "(perhaps the node is already running)",
                 t.m_interface ? t.m_interface : "*", t.m_s_service_port);
        delete transporter_service;
        DBUG_RETURN(false);
      }
    }
    t.m_s_service_port= (t.m_s_service_port<=0)?-port:port; // -`ve if dynamic
    DBUG_PRINT("info", ("t.m_s_service_port = %d",t.m_s_service_port));
    transporter_service->setTransporterRegistry(this);
  }
  DBUG_RETURN(true);
}

#ifdef NDB_SHM_TRANSPORTER
/**
 * Signal handler installed by TransporterRegistry::startReceiving()
 * for g_ndb_shm_signum. It only bumps g_shm_counter; the signal
 * number argument is not used.
 */
static
RETSIGTYPE
shm_sig_handler(int signo)
{
  ++g_shm_counter;
}
#endif

/**
 * Prepare the registry for receiving.
 *
 * Depending on which transporter types are compiled in this
 *  - creates the OSE receiver phantom (NDB_OSE_TRANSPORTER),
 *  - creates the OSE junk receive socket (NDB_OSE),
 *  - records the current process as receiver for all TCP transporters
 *    (NDB_OSE / NDB_SOFTOSE),
 *  - records the own pid and, if g_ndb_shm_signum is non-zero, unblocks
 *    that signal for the calling thread and installs shm_sig_handler
 *    for it (NDB_SHM_TRANSPORTER). A failed installation is logged
 *    through g_eventLogger; it is not reported to the caller.
 */
void
TransporterRegistry::startReceiving()
{
  DBUG_ENTER("TransporterRegistry::startReceiving");
#ifdef NDB_OSE_TRANSPORTER
  if(theOSEReceiver != NULL){
    theOSEReceiver->createPhantom();
  }
#endif

#ifdef NDB_OSE
  theOSEJunkSocketRecv = socket(AF_INET, SOCK_STREAM, 0);
#endif

#if defined NDB_OSE || defined NDB_SOFTOSE
  theReceiverPid = current_process();
  for(int idx = 0; idx<nTCPTransporters; idx++)
    theTCPTransporters[idx]->theReceiverPid = theReceiverPid;
#endif

#ifdef NDB_SHM_TRANSPORTER
  m_shm_own_pid = getpid();
  if (g_ndb_shm_signum)
  {
    DBUG_PRINT("info",("Install signal handler for signum %d",
                       g_ndb_shm_signum));

    // Make sure the signal can be delivered to this thread
    sigset_t unblock_set;
    sigemptyset(&unblock_set);
    sigaddset(&unblock_set, g_ndb_shm_signum);
    pthread_sigmask(SIG_UNBLOCK, &unblock_set, 0);

    // Handler with an empty mask and no flags
    struct sigaction action;
    action.sa_handler = shm_sig_handler;
    sigemptyset(&action.sa_mask);
    action.sa_flags = 0;

    // Retry for as long as the call is interrupted
    int rc;
    do
    {
      rc = sigaction(g_ndb_shm_signum, &action, 0);
    } while (rc == -1 && errno == EINTR);

    if(rc != 0)
    {
      DBUG_PRINT("error",("Install failed"));
      g_eventLogger.error("Failed to install signal handler for"
                          " SHM transporter, signum %d, errno: %d (%s)",
                          g_ndb_shm_signum, errno, strerror(errno));
    }
  }
#endif // NDB_SHM_TRANSPORTER
  DBUG_VOID_RETURN;
}

/**
 * Tear down the receiving side: destroy the OSE receiver phantom (when
 * compiled in), disconnect all transporters and close the OSE junk
 * receive socket (when compiled in).
 */
void
TransporterRegistry::stopReceiving()
{
#ifdef NDB_OSE_TRANSPORTER
  if(theOSEReceiver != NULL)
  {
    theOSEReceiver->destroyPhantom();
  }
#endif

  /**
   * Disconnect all transporters, this includes detach from remote node
   * and since that must be done from the same process that called attach
   * it's done here in the receive thread
   */
  disconnectAll();

#if defined NDB_OSE || defined NDB_SOFTOSE
  if(theOSEJunkSocketRecv > 0)
  {
    close(theOSEJunkSocketRecv);
  }
  // Mark the socket as closed whether or not it was open
  theOSEJunkSocketRecv = -1;
#endif
}

/**
 * Prepare the sending side. Only does something on NDB_OSE /
 * NDB_SOFTOSE builds, where it creates the junk send socket.
 */
void
TransporterRegistry::startSending()
{
#if defined NDB_OSE || defined NDB_SOFTOSE
  theOSEJunkSocketSend = socket(AF_INET, SOCK_STREAM, 0);
#endif
}

/**
 * Tear down the sending side. Only does something on NDB_OSE /
 * NDB_SOFTOSE builds, where it closes the junk send socket (if open)
 * and marks it as closed.
 */
void
TransporterRegistry::stopSending()
{
#if defined NDB_OSE || defined NDB_SOFTOSE
  if(theOSEJunkSocketSend > 0)
  {
    close(theOSEJunkSocketSend);
  }
  theOSEJunkSocketSend = -1;
#endif
}

/**
 * Print the fields of a SignalHeader to out, one labelled field per
 * line, preceded by a banner line. theTrace is printed as an int.
 *
 * @return out, to allow chaining
 */
NdbOut & operator <<(NdbOut & out, SignalHeader & sh){
  out << "-- Signal Header --" << endl
      << "theLength:    " << sh.theLength << endl
      << "gsn:          " << sh.theVerId_signalNumber << endl
      << "recBlockNo:   " << sh.theReceiversBlockNumber << endl
      << "sendBlockRef: " << sh.theSendersBlockRef << endl
      << "sendersSig:   " << sh.theSendersSignalId << endl
      << "theSignalId:  " << sh.theSignalId << endl
      << "trace:        " << (int)sh.theTrace << endl;
  return out;
}

/**
 * Look up the transporter registered for nodeId.
 *
 * @return the entry of theTransporters at index nodeId; this is a plain
 *         array access with no range check on nodeId
 */
Transporter*
TransporterRegistry::get_transporter(NodeId nodeId)
{
  Transporter * const entry = theTransporters[nodeId];
  return entry;
}

/**
 * Connect the transporter towards the management server that the
 * handle *h refers to, by turning the management connection into a
 * transporter connection.
 *
 * @param h  management handle; it is dereferenced without a NULL check
 *           here, so the caller must pass a valid pointer
 * @return false if the management server's node id cannot be obtained
 *         or no transporter is registered for that node; otherwise the
 *         result of Transporter::connect_client() on the socket
 *         returned by connect_ndb_mgmd(h)
 *
 * Every exit goes through DBUG_RETURN so that the DBUG call stack set
 * up by DBUG_ENTER stays balanced on the error paths too.
 */
bool TransporterRegistry::connect_client(NdbMgmHandle *h)
{
  DBUG_ENTER("TransporterRegistry::connect_client(NdbMgmHandle)");

  Uint32 mgm_nodeid= ndb_mgm_get_mgmd_nodeid(*h);

  if(!mgm_nodeid)
  {
    ndbout_c("%s: %d", __FILE__, __LINE__);
    DBUG_RETURN(false);
  }
  Transporter * t = theTransporters[mgm_nodeid];
  if (!t)
  {
    ndbout_c("%s: %d", __FILE__, __LINE__);
    DBUG_RETURN(false);
  }
  DBUG_RETURN(t->connect_client(connect_ndb_mgmd(h)));
}

/**
 * Given a connected NdbMgmHandle, turns it into a transporter
 * and returns the socket.
 *
 * Before the conversion, every registered interface with a negative
 * (dynamic) service port is reported to the management server through
 * ndb_mgm_set_connection_int_parameter().
 *
 * @return the transporter socket, or NDB_INVALID_SOCKET if h or *h is
 *         NULL, a port could not be reported, or the conversion failed.
 *         On the latter two failures the handle is destroyed.
 */
NDB_SOCKET_TYPE TransporterRegistry::connect_ndb_mgmd(NdbMgmHandle *h)
{
  struct ndb_mgm_reply mgm_reply;

  if ( h==NULL || *h == NULL )
  {
    ndbout_c("%s: %d", __FILE__, __LINE__);
    return NDB_INVALID_SOCKET;
  }

  for (unsigned int idx= 0; idx < m_transporter_interface.size(); idx++)
  {
    Transporter_interface &iface= m_transporter_interface[idx];

    // Only dynamic ports (stored as negative values) are reported
    if (iface.m_s_service_port >= 0)
      continue;

    const int rc= ndb_mgm_set_connection_int_parameter(*h,
                                                       get_localNodeId(),
                                                       iface.m_remote_nodeId,
                                                       CFG_CONNECTION_SERVER_PORT,
                                                       iface.m_s_service_port,
                                                       &mgm_reply);
    if (rc >= 0)
      continue;

    ndbout_c("Error: %s: %d",
             ndb_mgm_get_latest_error_desc(*h),
             ndb_mgm_get_latest_error(*h));
    ndbout_c("%s: %d", __FILE__, __LINE__);
    ndb_mgm_destroy_handle(h);
    return NDB_INVALID_SOCKET;
  }

  /**
   * convert_to_transporter also disposes of the handle (i.e. we don't
   * leak memory here).
   */
  const NDB_SOCKET_TYPE sockfd= ndb_mgm_convert_to_transporter(h);
  if (sockfd != NDB_INVALID_SOCKET)
    return sockfd;

  ndbout_c("Error: %s: %d",
           ndb_mgm_get_latest_error_desc(*h),
           ndb_mgm_get_latest_error(*h));
  ndbout_c("%s: %d", __FILE__, __LINE__);
  ndb_mgm_destroy_handle(h);
  return sockfd;
}

/**
 * Given a SocketClient, creates a NdbMgmHandle, turns it into a transporter
 * and returns the socket.
 *
 * The handle is connected using a "host:port" connectstring built from
 * the SocketClient's server name and port.
 *
 * @return the transporter socket, or NDB_INVALID_SOCKET if the handle
 *         could not be created or connected, or if the conversion done
 *         by connect_ndb_mgmd(NdbMgmHandle*) failed
 */
NDB_SOCKET_TYPE TransporterRegistry::connect_ndb_mgmd(SocketClient *sc)
{
  NdbMgmHandle h= ndb_mgm_create_handle();
  if (h == NULL)
    return NDB_INVALID_SOCKET;

  /**
   * Set connectstring
   */
  {
    BaseString connectstring;
    connectstring.assfmt("%s:%u",sc->get_server_name(),sc->get_port());
    ndb_mgm_set_connectstring(h, connectstring.c_str());
  }

  if (ndb_mgm_connect(h, 0, 0, 0) >= 0)
    return connect_ndb_mgmd(&h);

  // Could not connect: release the handle created above
  ndb_mgm_destroy_handle(&h);
  return NDB_INVALID_SOCKET;
}

// Explicit instantiation of the Vector template for the element type
// used by m_transporter_interface.
template class Vector<TransporterRegistry::Transporter_interface>;