/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#include <ndb_global.h>
#include <my_pthread.h>

#include "MgmtSrvr.hpp"
#include "MgmtErrorReporter.hpp"
#include <ConfigRetriever.hpp>

#include <NdbOut.hpp>
#include <NdbApiSignal.hpp>
#include <kernel_types.h>
#include <RefConvert.hpp>
#include <BlockNumbers.h>
#include <GlobalSignalNumbers.h>
#include <signaldata/TestOrd.hpp>
#include <signaldata/TamperOrd.hpp>
#include <signaldata/StartOrd.hpp>
#include <signaldata/ApiVersion.hpp>
#include <signaldata/ResumeReq.hpp>
#include <signaldata/SetLogLevelOrd.hpp>
#include <signaldata/EventSubscribeReq.hpp>
#include <signaldata/EventReport.hpp>
#include <signaldata/DumpStateOrd.hpp>
#include <signaldata/BackupSignalData.hpp>
#include <signaldata/ManagementServer.hpp>
#include <signaldata/NFCompleteRep.hpp>
#include <signaldata/NodeFailRep.hpp>
#include <signaldata/AllocNodeId.hpp>
#include <NdbSleep.h>
#include <EventLogger.hpp>
#include <DebuggerNames.hpp>
#include <ndb_version.h>

#include <SocketServer.hpp>
#include <NdbConfig.h>

#include <NdbAutoPtr.hpp>

#include <ndberror.h>

#include <mgmapi.h>
#include <mgmapi_configuration.hpp>
#include <mgmapi_config_parameters.h>
#include <m_string.h>

#include <SignalSender.hpp>

extern bool g_StopServer;
extern bool g_RestartServer;

//#define MGM_SRV_DEBUG
#ifdef MGM_SRV_DEBUG
#define DEBUG(x) do ndbout << x << endl; while(0)
#else
#define DEBUG(x)
#endif

#define INIT_SIGNAL_SENDER(ss,nodeId) \
  SignalSender ss(theFacade); \
  ss.lock(); /* lock will be released on exit */ \
  {\
    int result = okToSendTo(nodeId, true);\
    if (result != 0) {\
      return result;\
    }\
  }

extern int global_flag_send_heartbeat_now;
extern int g_no_nodeid_checks;
extern my_bool opt_core;

static void require(bool v)
{
  if(!v)
  {
    if (opt_core)
      abort();
    else
      exit(-1);
  }
}

void *
MgmtSrvr::logLevelThread_C(void* m)
{
  MgmtSrvr *mgm = (MgmtSrvr*)m;
  mgm->logLevelThreadRun();
  return 0;
}

extern EventLogger g_eventLogger;

static NdbOut&
operator<<(NdbOut& out, const LogLevel & ll)
{
  out << "[LogLevel: ";
  for(size_t i = 0; i<LogLevel::LOGLEVEL_CATEGORIES; i++)
    out << ll.getLogLevel((LogLevel::EventCategory)i) << " ";
  out << "]";
  return out;
}

void
MgmtSrvr::logLevelThreadRun() 
{
  while (!_isStopThread) {
    /**
     * Handle started nodes
     */
    EventSubscribeReq req;
    req = m_event_listner[0].m_logLevel;
    req.blockRef = _ownReference;

    SetLogLevelOrd ord;
    
    m_started_nodes.lock();
    while(m_started_nodes.size() > 0){
      Uint32 node = m_started_nodes[0];
      m_started_nodes.erase(0, false);
      m_started_nodes.unlock();

      setEventReportingLevelImpl(node, req);
      
      ord = m_nodeLogLevel[node];
      setNodeLogLevelImpl(node, ord);
      
      m_started_nodes.lock();
    }				 
    m_started_nodes.unlock();
    
    m_log_level_requests.lock();
    while(m_log_level_requests.size() > 0){
      req = m_log_level_requests[0];
      m_log_level_requests.erase(0, false);
      m_log_level_requests.unlock();
      
      LogLevel tmp;
      tmp = req;
      
      if(req.blockRef == 0){
	req.blockRef = _ownReference;
	setEventReportingLevelImpl(0, req);
      } else {
	ord = req;
	setNodeLogLevelImpl(req.blockRef, ord);
      }
      m_log_level_requests.lock();
    }      
    m_log_level_requests.unlock();
    NdbSleep_MilliSleep(_logLevelThreadSleep);  
  }
}

void
MgmtSrvr::startEventLog() 
{
  NdbMutex_Lock(m_configMutex);

  g_eventLogger.setCategory("MgmSrvr");

  ndb_mgm_configuration_iterator 
    iter(* _config->m_configValues, CFG_SECTION_NODE);

  if(iter.find(CFG_NODE_ID, _ownNodeId) != 0){
    NdbMutex_Unlock(m_configMutex);
    return;
  }
  
  const char * tmp;
  char errStr[100];
  int err= 0;
  BaseString logdest;
  char *clusterLog= NdbConfig_ClusterLogFileName(_ownNodeId);
  NdbAutoPtr<char> tmp_aptr(clusterLog);

  if(iter.get(CFG_LOG_DESTINATION, &tmp) == 0){
    logdest.assign(tmp);
  }
  NdbMutex_Unlock(m_configMutex);
  
  if(logdest.length() == 0 || logdest == "") {
    logdest.assfmt("FILE:filename=%s,maxsize=1000000,maxfiles=6", 
		   clusterLog);
  }
  errStr[0]='\0';
  if(!g_eventLogger.addHandler(logdest, &err, sizeof(errStr), errStr)) {
    ndbout << "Warning: could not add log destination \""
           << logdest.c_str() << "\". Reason: ";
    if(err)
      ndbout << strerror(err);
    if(err && errStr[0]!='\0')
      ndbout << ", ";
    if(errStr[0]!='\0')
      ndbout << errStr;
    ndbout << endl;
  }
}

void 
MgmtSrvr::stopEventLog() 
{
  // Nothing yet
}

class ErrorItem 
{
public:
  int _errorCode;
  const char * _errorText;
};

bool
MgmtSrvr::setEventLogFilter(int severity, int enable)
{
  Logger::LoggerLevel level = (Logger::LoggerLevel)severity;
  if (enable > 0) {
    g_eventLogger.enable(level);
  } else if (enable == 0) {
    g_eventLogger.disable(level);
  } else if (g_eventLogger.isEnable(level)) {
    g_eventLogger.disable(level);
  } else {
    g_eventLogger.enable(level);
  }
  return g_eventLogger.isEnable(level);
}

bool 
MgmtSrvr::isEventLogFilterEnabled(int severity) 
{
  return g_eventLogger.isEnable((Logger::LoggerLevel)severity);
}

static ErrorItem errorTable[] = 
{
  {MgmtSrvr::NO_CONTACT_WITH_PROCESS, "No contact with the process (dead ?)."},
  {MgmtSrvr::PROCESS_NOT_CONFIGURED, "The process is not configured."},
  {MgmtSrvr::WRONG_PROCESS_TYPE, 
   "The process has wrong type. Expected a DB process."},
  {MgmtSrvr::COULD_NOT_ALLOCATE_MEMORY, "Could not allocate memory."},
  {MgmtSrvr::SEND_OR_RECEIVE_FAILED, "Send to process or receive failed."},
  {MgmtSrvr::INVALID_LEVEL, "Invalid level. Should be between 1 and 30."},
  {MgmtSrvr::INVALID_ERROR_NUMBER, "Invalid error number. Should be >= 0."},
  {MgmtSrvr::INVALID_TRACE_NUMBER, "Invalid trace number."},
  {MgmtSrvr::NOT_IMPLEMENTED, "Not implemented."},
  {MgmtSrvr::INVALID_BLOCK_NAME, "Invalid block name"},

  {MgmtSrvr::CONFIG_PARAM_NOT_EXIST, 
   "The configuration parameter does not exist for the process type."},
  {MgmtSrvr::CONFIG_PARAM_NOT_UPDATEABLE, 
   "The configuration parameter is not possible to update."},
  {MgmtSrvr::VALUE_WRONG_FORMAT_INT_EXPECTED, 
   "Incorrect value. Expected integer."},
  {MgmtSrvr::VALUE_TOO_LOW, "Value is too low."},
  {MgmtSrvr::VALUE_TOO_HIGH, "Value is too high."},
  {MgmtSrvr::VALUE_WRONG_FORMAT_BOOL_EXPECTED, 
   "Incorrect value. Expected TRUE or FALSE."},

  {MgmtSrvr::CONFIG_FILE_OPEN_WRITE_ERROR, 
   "Could not open configuration file for writing."},
  {MgmtSrvr::CONFIG_FILE_OPEN_READ_ERROR, 
   "Could not open configuration file for reading."},
  {MgmtSrvr::CONFIG_FILE_WRITE_ERROR, 
   "Write error when writing configuration file."},
  {MgmtSrvr::CONFIG_FILE_READ_ERROR, 
   "Read error when reading configuration file."},
  {MgmtSrvr::CONFIG_FILE_CLOSE_ERROR, "Could not close configuration file."},

  {MgmtSrvr::CONFIG_CHANGE_REFUSED_BY_RECEIVER, 
   "The change was refused by the receiving process."},
  {MgmtSrvr::COULD_NOT_SYNC_CONFIG_CHANGE_AGAINST_PHYSICAL_MEDIUM, 
   "The change could not be synced against physical medium."},
  {MgmtSrvr::CONFIG_FILE_CHECKSUM_ERROR, 
   "The config file is corrupt. Checksum error."},
  {MgmtSrvr::NOT_POSSIBLE_TO_SEND_CONFIG_UPDATE_TO_PROCESS_TYPE, 
   "It is not possible to send an update of a configuration variable "
   "to this kind of process."},
  {MgmtSrvr::NODE_SHUTDOWN_IN_PROGESS, "Node shutdown in progress" },
  {MgmtSrvr::SYSTEM_SHUTDOWN_IN_PROGRESS, "System shutdown in progress" },
  {MgmtSrvr::NODE_SHUTDOWN_WOULD_CAUSE_SYSTEM_CRASH,
   "Node shutdown would cause system crash" },
  {MgmtSrvr::UNSUPPORTED_NODE_SHUTDOWN,
   "Unsupported multi node shutdown. Abort option required." },
  {MgmtSrvr::NODE_NOT_API_NODE, "The specified node is not an API node." },
  {MgmtSrvr::OPERATION_NOT_ALLOWED_START_STOP, 
   "Operation not allowed while nodes are starting or stopping."},
  {MgmtSrvr::NO_CONTACT_WITH_DB_NODES, "No contact with database nodes" }
};

int MgmtSrvr::translateStopRef(Uint32 errCode)
{
  switch(errCode){
  case StopRef::NodeShutdownInProgress:
    return NODE_SHUTDOWN_IN_PROGESS;
    break;
  case StopRef::SystemShutdownInProgress:
    return SYSTEM_SHUTDOWN_IN_PROGRESS;
    break;
  case StopRef::NodeShutdownWouldCauseSystemCrash:
    return NODE_SHUTDOWN_WOULD_CAUSE_SYSTEM_CRASH;
    break;
  case StopRef::UnsupportedNodeShutdown:
    return UNSUPPORTED_NODE_SHUTDOWN;
    break;
  }
  return 4999;
}

static int noOfErrorCodes = sizeof(errorTable) / sizeof(ErrorItem);

int 
MgmtSrvr::getNodeCount(enum ndb_mgm_node_type type) const 
{
  int count = 0;
  NodeId nodeId = 0;

  while (getNextNodeId(&nodeId, type)) {
    count++;
  }
  return count;
}

int 
MgmtSrvr::getPort() const
{
  if(NdbMutex_Lock(m_configMutex))
    return 0;

  ndb_mgm_configuration_iterator 
    iter(* _config->m_configValues, CFG_SECTION_NODE);

  if(iter.find(CFG_NODE_ID, getOwnNodeId()) != 0){
    ndbout << "Could not retrieve configuration for Node " 
	   << getOwnNodeId() << " in config file." << endl 
	   << "Have you set correct NodeId for this node?" << endl;
    NdbMutex_Unlock(m_configMutex);
    return 0;
  }

  unsigned type;
  if(iter.get(CFG_TYPE_OF_SECTION, &type) != 0 ||
     type != NODE_TYPE_MGM){
    ndbout << "Local node id " << getOwnNodeId()
	   << " is not defined as management server" << endl
	   << "Have you set correct NodeId for this node?" << endl;
    NdbMutex_Unlock(m_configMutex);
    return 0;
  }
  
  Uint32 port = 0;
  if(iter.get(CFG_MGM_PORT, &port) != 0){
    ndbout << "Could not find PortNumber in the configuration file." << endl;
    NdbMutex_Unlock(m_configMutex);
    return 0;
  }

  NdbMutex_Unlock(m_configMutex);

  return port;
}

/* Constructor */
int MgmtSrvr::init()
{
  if ( _ownNodeId > 0)
    return 0;
  return -1;
}

MgmtSrvr::MgmtSrvr(SocketServer *socket_server,
		   const char *config_filename,
		   const char *connect_string) :
  _blockNumber(1), // Hard coded block number since it makes it easy to send
                   // signals to other management servers.
  m_socket_server(socket_server),
  _ownReference(0),
  theSignalIdleList(NULL),
  theWaitState(WAIT_SUBSCRIBE_CONF),
  m_local_mgm_handle(0),
  m_event_listner(this),
  m_master_node(0)
{
    
  DBUG_ENTER("MgmtSrvr::MgmtSrvr");

  _ownNodeId= 0;

  _config     = NULL;

  _isStopThread        = false;
  _logLevelThread      = NULL;
  _logLevelThreadSleep = 500;

  theFacade = 0;

  m_newConfig = NULL;
  if (config_filename)
    m_configFilename.assign(config_filename);

  m_nextConfigGenerationNumber = 0;

  m_config_retriever= new ConfigRetriever(connect_string,
					  NDB_VERSION, NDB_MGM_NODE_TYPE_MGM);
  // if connect_string explicitly given or
  // no config filename is given then
  // first try to allocate nodeid from another management server
  if ((connect_string || config_filename == NULL) &&
      (m_config_retriever->do_connect(0,0,0) == 0))
  {
    int tmp_nodeid= 0;
    tmp_nodeid= m_config_retriever->allocNodeId(0 /*retry*/,0 /*delay*/);
    if (tmp_nodeid == 0)
    {
      ndbout_c(m_config_retriever->getErrorString());
      require(false);
    }
    // read config from other managent server
    _config= fetchConfig();
    if (_config == 0)
    {
      ndbout << m_config_retriever->getErrorString() << endl;
      require(false);
    }
    _ownNodeId= tmp_nodeid;
  }

  if (_ownNodeId == 0)
  {
    // read config locally
    _config= readConfig();
    if (_config == 0) {
      ndbout << "Unable to read config file" << endl;
      exit(-1);
    }
  }

  theMgmtWaitForResponseCondPtr = NdbCondition_Create();

  m_configMutex = NdbMutex_Create();

  /**
   * Fill the nodeTypes array
   */
  for(Uint32 i = 0; i<MAX_NODES; i++) {
    nodeTypes[i] = (enum ndb_mgm_node_type)-1;
    m_connect_address[i].s_addr= 0;
  }

  {
    ndb_mgm_configuration_iterator
      iter(* _config->m_configValues, CFG_SECTION_NODE);

    for(iter.first(); iter.valid(); iter.next()){
      unsigned type, id;
      if(iter.get(CFG_TYPE_OF_SECTION, &type) != 0)
	continue;
      
      if(iter.get(CFG_NODE_ID, &id) != 0)
	continue;
      
      MGM_REQUIRE(id < MAX_NODES);
      
      switch(type){
      case NODE_TYPE_DB:
	nodeTypes[id] = NDB_MGM_NODE_TYPE_NDB;
	break;
      case NODE_TYPE_API:
	nodeTypes[id] = NDB_MGM_NODE_TYPE_API;
	break;
      case NODE_TYPE_MGM:
	nodeTypes[id] = NDB_MGM_NODE_TYPE_MGM;
	break;
      default:
	break;
      }
    }
  }

  _props = NULL;
  BaseString error_string;

  if ((m_node_id_mutex = NdbMutex_Create()) == 0)
  {
    ndbout << "mutex creation failed line = " << __LINE__ << endl;
    require(false);
  }

  if (_ownNodeId == 0) // we did not get node id from other server
  {
    NodeId tmp= m_config_retriever->get_configuration_nodeid();

    if (!alloc_node_id(&tmp, NDB_MGM_NODE_TYPE_MGM,
		       0, 0, error_string)){
      ndbout << "Unable to obtain requested nodeid: "
	     << error_string.c_str() << endl;
      require(false);
    }
    _ownNodeId = tmp;
  }

  {
    DBUG_PRINT("info", ("verifyConfig"));
    if (!m_config_retriever->verifyConfig(_config->m_configValues,
					  _ownNodeId))
    {
      ndbout << m_config_retriever->getErrorString() << endl;
      require(false);
    }
  }

  // Setup clusterlog as client[0] in m_event_listner
  {
    Ndb_mgmd_event_service::Event_listener se;
    se.m_socket = NDB_INVALID_SOCKET;
    for(size_t t = 0; t<LogLevel::LOGLEVEL_CATEGORIES; t++){
      se.m_logLevel.setLogLevel((LogLevel::EventCategory)t, 7);
    }
    se.m_logLevel.setLogLevel(LogLevel::llError, 15);
    se.m_logLevel.setLogLevel(LogLevel::llConnection, 8);
    se.m_logLevel.setLogLevel(LogLevel::llBackup, 15);
    m_event_listner.m_clients.push_back(se);
    m_event_listner.m_logLevel = se.m_logLevel;
  }
  
  DBUG_VOID_RETURN;
}


//****************************************************************************
//****************************************************************************
bool 
MgmtSrvr::check_start() 
{
  if (_config == 0) {
    DEBUG("MgmtSrvr.cpp: _config is NULL.");
    return false;
  }

  return true;
}

bool 
MgmtSrvr::start(BaseString &error_string)
{
  int mgm_connect_result;

  DBUG_ENTER("MgmtSrvr::start");
  if (_props == NULL) {
    if (!check_start()) {
      error_string.append("MgmtSrvr.cpp: check_start() failed.");
      DBUG_RETURN(false);
    }
  }
  theFacade= new TransporterFacade();
  
  if(theFacade == 0) {
    DEBUG("MgmtSrvr.cpp: theFacade is NULL.");
    error_string.append("MgmtSrvr.cpp: theFacade is NULL.");
    DBUG_RETURN(false);
  }  
  if ( theFacade->start_instance
       (_ownNodeId, (ndb_mgm_configuration*)_config->m_configValues) < 0) {
    DEBUG("MgmtSrvr.cpp: TransporterFacade::start_instance < 0.");
    DBUG_RETURN(false);
  }

  MGM_REQUIRE(_blockNumber == 1);

  // Register ourself at TransporterFacade to be able to receive signals
  // and to be notified when a database process has died.
  _blockNumber = theFacade->open(this,
				 signalReceivedNotification,
				 nodeStatusNotification);
  
  if(_blockNumber == -1){
    DEBUG("MgmtSrvr.cpp: _blockNumber is -1.");
    error_string.append("MgmtSrvr.cpp: _blockNumber is -1.");
    theFacade->stop_instance();
    theFacade = 0;
    DBUG_RETURN(false);
  }

  if((mgm_connect_result= connect_to_self()) < 0)
  {
    ndbout_c("Unable to connect to our own ndb_mgmd (Error %d)",
             mgm_connect_result);
    ndbout_c("This is probably a bug.");
  }

  TransporterRegistry *reg = theFacade->get_registry();
  for(unsigned int i=0;i<reg->m_transporter_interface.size();i++) {
    BaseString msg;
    DBUG_PRINT("info",("Setting dynamic port %d->%d : %d",
		       reg->get_localNodeId(),
		       reg->m_transporter_interface[i].m_remote_nodeId,
		       reg->m_transporter_interface[i].m_s_service_port
		       )
	       );
    int res = setConnectionDbParameter((int)reg->get_localNodeId(),
				       (int)reg->m_transporter_interface[i]
				            .m_remote_nodeId,
				       (int)CFG_CONNECTION_SERVER_PORT,
				       reg->m_transporter_interface[i]
				            .m_s_service_port,
					 msg);
    DBUG_PRINT("info",("Set result: %d: %s",res,msg.c_str()));
  }

  _ownReference = numberToRef(_blockNumber, _ownNodeId);
  
  startEventLog();
  // Set the initial confirmation count for subscribe requests confirm
  // from NDB nodes in the cluster.
  //
  // Loglevel thread
  _logLevelThread = NdbThread_Create(logLevelThread_C,
				     (void**)this,
				     32768,
				     "MgmtSrvr_Loglevel",
				     NDB_THREAD_PRIO_LOW);

  DBUG_RETURN(true);
}


//****************************************************************************
//****************************************************************************
MgmtSrvr::~MgmtSrvr() 
{
  if(theFacade != 0){
    theFacade->stop_instance();
    delete theFacade;
    theFacade = 0;
  }

  stopEventLog();

  NdbMutex_Destroy(m_node_id_mutex);
  NdbCondition_Destroy(theMgmtWaitForResponseCondPtr);
  NdbMutex_Destroy(m_configMutex);

  if(m_newConfig != NULL)
    free(m_newConfig);

  if(_config != NULL)
    delete _config;

  // End set log level thread
  void* res = 0;
  _isStopThread = true;

  if (_logLevelThread != NULL) {
    NdbThread_WaitFor(_logLevelThread, &res);
    NdbThread_Destroy(&_logLevelThread);
  }

  if (m_config_retriever)
    delete m_config_retriever;
}

//****************************************************************************
//****************************************************************************

int MgmtSrvr::okToSendTo(NodeId nodeId, bool unCond) 
{
  if(nodeId == 0 || getNodeType(nodeId) != NDB_MGM_NODE_TYPE_NDB)
    return WRONG_PROCESS_TYPE;
  // Check if we have contact with it
  if(unCond){
    if(theFacade->theClusterMgr->getNodeInfo(nodeId).connected)
      return 0;
  }
  else if (theFacade->get_node_alive(nodeId) == true)
    return 0;
  return NO_CONTACT_WITH_PROCESS;
}

void report_unknown_signal(SimpleSignal *signal)
{
  g_eventLogger.error("Unknown signal received. SignalNumber: "
		      "%i from (%d, %x)",
		      signal->readSignalNumber(),
		      refToNode(signal->header.theSendersBlockRef),
		      refToBlock(signal->header.theSendersBlockRef));
}

/*****************************************************************************
 * Starting and stopping database nodes
 ****************************************************************************/

int 
MgmtSrvr::start(int nodeId)
{
  INIT_SIGNAL_SENDER(ss,nodeId);
  
  SimpleSignal ssig;
  StartOrd* const startOrd = CAST_PTR(StartOrd, ssig.getDataPtrSend());
  ssig.set(ss,TestOrd::TraceAPI, CMVMI, GSN_START_ORD, StartOrd::SignalLength);
  startOrd->restartInfo = 0;
  
  return ss.sendSignal(nodeId, &ssig) == SEND_OK ? 0 : SEND_OR_RECEIVE_FAILED;
}

/*****************************************************************************
 * Version handling
 *****************************************************************************/

int 
MgmtSrvr::versionNode(int nodeId, Uint32 &version, const char **address)
{
  version= 0;
  if (getOwnNodeId() == nodeId)
  {
    /**
     * If we're inquiring about our own node id,
     * We know what version we are (version implies connected for mgm)
     * but would like to find out from elsewhere what address they're using
     * to connect to us. This means that secondary mgm servers
     * can list ip addresses for mgm servers.
     *
     * If we don't get an address (i.e. no db nodes),
     * we get the address from the configuration.
     */
    sendVersionReq(nodeId, version, address);
    version= NDB_VERSION;
    if(!*address)
    {
      ndb_mgm_configuration_iterator
	iter(*_config->m_configValues, CFG_SECTION_NODE);
      unsigned tmp= 0;
      for(iter.first();iter.valid();iter.next())
      {
	if(iter.get(CFG_NODE_ID, &tmp)) require(false);
	if((unsigned)nodeId!=tmp)
	  continue;
	if(iter.get(CFG_NODE_HOST, address)) require(false);
	break;
      }
    }
  }
  else if (getNodeType(nodeId) == NDB_MGM_NODE_TYPE_NDB)
  {
    ClusterMgr::Node node= theFacade->theClusterMgr->getNodeInfo(nodeId);
    if(node.connected)
      version= node.m_info.m_version;
    *address= get_connect_address(nodeId);
  }
  else if (getNodeType(nodeId) == NDB_MGM_NODE_TYPE_API ||
	   getNodeType(nodeId) == NDB_MGM_NODE_TYPE_MGM)
  {
    return sendVersionReq(nodeId, version, address);
  }

  return 0;
}

int 
MgmtSrvr::sendVersionReq(int v_nodeId, Uint32 &version, const char **address)
{
  SignalSender ss(theFacade);
  ss.lock();

  SimpleSignal ssig;
  ApiVersionReq* req = CAST_PTR(ApiVersionReq, ssig.getDataPtrSend());
  req->senderRef = ss.getOwnRef();
  req->nodeId = v_nodeId;
  ssig.set(ss, TestOrd::TraceAPI, QMGR, GSN_API_VERSION_REQ, 
	   ApiVersionReq::SignalLength);

  int do_send = 1;
  NodeId nodeId;

  while (1)
  {
    if (do_send)
    {
      bool next;
      nodeId = 0;

      while((next = getNextNodeId(&nodeId, NDB_MGM_NODE_TYPE_NDB)) == true &&
	    okToSendTo(nodeId, true) != 0);

      const ClusterMgr::Node &node=
	theFacade->theClusterMgr->getNodeInfo(nodeId);
      if(next && node.m_state.startLevel != NodeState::SL_STARTED)
      {
	NodeId tmp=nodeId;
	while((next = getNextNodeId(&nodeId, NDB_MGM_NODE_TYPE_NDB)) == true &&
	      okToSendTo(nodeId, true) != 0);
	if(!next)
	  nodeId= tmp;
      }

      if(!next) return NO_CONTACT_WITH_DB_NODES;

      if (ss.sendSignal(nodeId, &ssig) != SEND_OK) {
	return SEND_OR_RECEIVE_FAILED;
      }
      do_send = 0;
    }

    SimpleSignal *signal = ss.waitFor();

    int gsn = signal->readSignalNumber();
    switch (gsn) {
    case GSN_API_VERSION_CONF: {
      const ApiVersionConf * const conf = 
	CAST_CONSTPTR(ApiVersionConf, signal->getDataPtr());
      assert(conf->nodeId == v_nodeId);
      version = conf->version;
      struct in_addr in;
      in.s_addr= conf->inet_addr;
      *address= inet_ntoa(in);
      return 0;
    }
    case GSN_NF_COMPLETEREP:{
      const NFCompleteRep * const rep =
	CAST_CONSTPTR(NFCompleteRep, signal->getDataPtr());
      if (rep->failedNodeId == nodeId)
	do_send = 1; // retry with other node
      continue;
    }
    case GSN_NODE_FAILREP:{
      const NodeFailRep * const rep =
	CAST_CONSTPTR(NodeFailRep, signal->getDataPtr());
      if (NodeBitmask::get(rep->theNodes,nodeId))
	do_send = 1; // retry with other node
      continue;
    }
    default:
      report_unknown_signal(signal);
      return SEND_OR_RECEIVE_FAILED;
    }
    break;
  } // while(1)

  return 0;
}

int MgmtSrvr::sendStopMgmd(NodeId nodeId,
			   bool abort,
			   bool stop,
			   bool restart,
			   bool nostart,
			   bool initialStart)
{
  const char* hostname;
  Uint32 port;
  BaseString connect_string;

  {
    Guard g(m_configMutex);
    {
      ndb_mgm_configuration_iterator
        iter(* _config->m_configValues, CFG_SECTION_NODE);

      if(iter.first())                       return SEND_OR_RECEIVE_FAILED;
      if(iter.find(CFG_NODE_ID, nodeId))     return SEND_OR_RECEIVE_FAILED;
      if(iter.get(CFG_NODE_HOST, &hostname)) return SEND_OR_RECEIVE_FAILED;
    }
    {
      ndb_mgm_configuration_iterator
        iter(* _config->m_configValues, CFG_SECTION_NODE);

      if(iter.first())                   return SEND_OR_RECEIVE_FAILED;
      if(iter.find(CFG_NODE_ID, nodeId)) return SEND_OR_RECEIVE_FAILED;
      if(iter.get(CFG_MGM_PORT, &port))  return SEND_OR_RECEIVE_FAILED;
    }
    if( strlen(hostname) == 0 )
      return SEND_OR_RECEIVE_FAILED;
  }
  connect_string.assfmt("%s:%u",hostname,port);

  DBUG_PRINT("info",("connect string: %s",connect_string.c_str()));

  NdbMgmHandle h= ndb_mgm_create_handle();
  if ( h && connect_string.length() > 0 )
  {
    ndb_mgm_set_connectstring(h,connect_string.c_str());
    if(ndb_mgm_connect(h,1,0,0))
    {
      DBUG_PRINT("info",("failed ndb_mgm_connect"));
      return SEND_OR_RECEIVE_FAILED;
    }
    if(!restart)
    {
      if(ndb_mgm_stop(h, 1, (const int*)&nodeId) < 0)
      {
        return SEND_OR_RECEIVE_FAILED;
      }
    }
    else
    {
      int nodes[1];
      nodes[0]= (int)nodeId;
      if(ndb_mgm_restart2(h, 1, nodes, initialStart, nostart, abort) < 0)
      {
        return SEND_OR_RECEIVE_FAILED;
      }
    }
  }
  ndb_mgm_destroy_handle(&h);

  return 0;
}

/*
 * Common method for handeling all STOP_REQ signalling that
 * is used by Stopping, Restarting and Single user commands
 *
 * In the event that we need to stop a mgmd, we create a mgm
 * client connection to that mgmd and stop it that way.
 * This allows us to stop mgm servers when there isn't any real
 * distributed communication up.
 */

int MgmtSrvr::sendSTOP_REQ(const Vector<NodeId> &node_ids,
			   NodeBitmask &stoppedNodes,
			   Uint32 singleUserNodeId,
			   bool abort,
			   bool stop,
			   bool restart,
			   bool nostart,
			   bool initialStart)
{
  int error = 0;
  DBUG_ENTER("MgmtSrvr::sendSTOP_REQ");
  DBUG_PRINT("enter", ("no of nodes: %d  singleUseNodeId: %d  "
                       "abort: %d  stop: %d  restart: %d  "
                       "nostart: %d  initialStart: %d",
                       node_ids.size(), singleUserNodeId,
                       abort, stop, restart, nostart, initialStart));

  stoppedNodes.clear();

  SignalSender ss(theFacade);
  ss.lock(); // lock will be released on exit

  SimpleSignal ssig;
  StopReq* const stopReq = CAST_PTR(StopReq, ssig.getDataPtrSend());
  ssig.set(ss, TestOrd::TraceAPI, NDBCNTR, GSN_STOP_REQ, StopReq::SignalLength);

  stopReq->requestInfo = 0;
  stopReq->apiTimeout = 5000;
  stopReq->transactionTimeout = 1000;
  stopReq->readOperationTimeout = 1000;
  stopReq->operationTimeout = 1000;
  stopReq->senderData = 12;
  stopReq->senderRef = ss.getOwnRef();
  if (singleUserNodeId)
  {
    stopReq->singleuser = 1;
    stopReq->singleUserApi = singleUserNodeId;
    StopReq::setSystemStop(stopReq->requestInfo, false);
    StopReq::setPerformRestart(stopReq->requestInfo, false);
    StopReq::setStopAbort(stopReq->requestInfo, false);
  }
  else
  {
    stopReq->singleuser = 0;
    StopReq::setSystemStop(stopReq->requestInfo, stop);
    StopReq::setPerformRestart(stopReq->requestInfo, restart);
    StopReq::setStopAbort(stopReq->requestInfo, abort);
    StopReq::setNoStart(stopReq->requestInfo, nostart);
    StopReq::setInitialStart(stopReq->requestInfo, initialStart);
  }

  // send the signals
  NodeBitmask nodes;
  NodeId nodeId= 0;
  int use_master_node= 0;
  int do_send= 0;
  int do_stop_self= 0;
  NdbNodeBitmask nodes_to_stop;
  {
    for (unsigned i= 0; i < node_ids.size(); i++)
    {
      nodeId= node_ids[i];
      if (getNodeType(nodeId) != NDB_MGM_NODE_TYPE_MGM)
        nodes_to_stop.set(nodeId);
      else if (nodeId != getOwnNodeId())
      {
        error= sendStopMgmd(nodeId, abort, stop, restart,
                            nostart, initialStart);
        if (error == 0)
          stoppedNodes.set(nodeId);
      }
      else
        do_stop_self= 1;;
    }
  }
  int no_of_nodes_to_stop= nodes_to_stop.count();
  if (node_ids.size())
  {
    if (no_of_nodes_to_stop)
    {
      do_send= 1;
      if (no_of_nodes_to_stop == 1)
      {
        nodeId= nodes_to_stop.find(0);
      }
      else // multi node stop, send to master
      {
        use_master_node= 1;
        nodes_to_stop.copyto(NdbNodeBitmask::Size, stopReq->nodes);
        StopReq::setStopNodes(stopReq->requestInfo, 1);
      }
    }
  }
  else
  {
    nodeId= 0;
    while(getNextNodeId(&nodeId, NDB_MGM_NODE_TYPE_NDB))
    {
      if(okToSendTo(nodeId, true) == 0)
      {
	SendStatus result = ss.sendSignal(nodeId, &ssig);
	if (result == SEND_OK)
	  nodes.set(nodeId);
      }
    }
    nodeId= 0;
    while(getNextNodeId(&nodeId, NDB_MGM_NODE_TYPE_MGM))
    {
      if(nodeId==getOwnNodeId())
        continue;
      if(sendStopMgmd(nodeId, abort, stop, restart, nostart, initialStart)==0)
        stoppedNodes.set(nodeId);
    }
  }

  // now wait for the replies
  while (!nodes.isclear() || do_send)
  {
    if (do_send)
    {
      int r;
      assert(nodes.count() == 0);
      if (use_master_node)
        nodeId= m_master_node;
      if ((r= okToSendTo(nodeId, true)) != 0)
      {
        bool next;
        if (!use_master_node)
          DBUG_RETURN(r);
        m_master_node= nodeId= 0;
        while((next= getNextNodeId(&nodeId, NDB_MGM_NODE_TYPE_NDB)) == true &&
              (r= okToSendTo(nodeId, true)) != 0);
        if (!next)
          DBUG_RETURN(NO_CONTACT_WITH_DB_NODES);
      }
      if (ss.sendSignal(nodeId, &ssig) != SEND_OK)
        DBUG_RETURN(SEND_OR_RECEIVE_FAILED);
      nodes.set(nodeId);
      do_send= 0;
    }
    SimpleSignal *signal = ss.waitFor();
    int gsn = signal->readSignalNumber();
    switch (gsn) {
    case GSN_STOP_REF:{
      const StopRef * const ref = CAST_CONSTPTR(StopRef, signal->getDataPtr());
      const NodeId nodeId = refToNode(signal->header.theSendersBlockRef);
#ifdef VM_TRACE
      ndbout_c("Node %d refused stop", nodeId);
#endif
      assert(nodes.get(nodeId));
      nodes.clear(nodeId);
      if (ref->errorCode == StopRef::MultiNodeShutdownNotMaster)
      {
        assert(use_master_node);
        m_master_node= ref->masterNodeId;
        do_send= 1;
        continue;
      }
      error = translateStopRef(ref->errorCode);
      break;
    }
    case GSN_STOP_CONF:{
      const StopConf * const ref = CAST_CONSTPTR(StopConf, signal->getDataPtr());
      const NodeId nodeId = refToNode(signal->header.theSendersBlockRef);
#ifdef VM_TRACE
      ndbout_c("Node %d single user mode", nodeId);
#endif
      assert(nodes.get(nodeId));
      if (singleUserNodeId != 0)
      {
        stoppedNodes.set(nodeId);
      }
      else
      {
        assert(no_of_nodes_to_stop > 1);
        stoppedNodes.bitOR(nodes_to_stop);
      }
      nodes.clear(nodeId);
      break;
    }
    case GSN_NF_COMPLETEREP:{
      const NFCompleteRep * const rep =
	CAST_CONSTPTR(NFCompleteRep, signal->getDataPtr());
#ifdef VM_TRACE
      ndbout_c("Node %d fail completed", rep->failedNodeId);
#endif
      break;
    }
    case GSN_NODE_FAILREP:{
      const NodeFailRep * const rep =
	CAST_CONSTPTR(NodeFailRep, signal->getDataPtr());
      NodeBitmask failedNodes;
      failedNodes.assign(NodeBitmask::Size, rep->theNodes);
#ifdef VM_TRACE
      {
	ndbout << "Failed nodes:";
	for (unsigned i = 0; i < 32*NodeBitmask::Size; i++)
	  if(failedNodes.get(i))
	    ndbout << " " << i;
	ndbout << endl;
      }
#endif
      failedNodes.bitAND(nodes);
      if (!failedNodes.isclear())
      {
	nodes.bitANDC(failedNodes); // clear the failed nodes
	if (singleUserNodeId == 0)
	  stoppedNodes.bitOR(failedNodes);
      }
      break;
    }
    default:
      report_unknown_signal(signal);
#ifdef VM_TRACE
      ndbout_c("Unknown signal %d", gsn);
#endif
      DBUG_RETURN(SEND_OR_RECEIVE_FAILED);
    }
  }
  if (!error && do_stop_self)
  {
    if (restart)
      g_RestartServer= true;
    g_StopServer= true;
  }
  DBUG_RETURN(error);
}

/*
 * Stop one nodes
 */

int MgmtSrvr::stopNodes(const Vector<NodeId> &node_ids,
                        int *stopCount, bool abort)
{
  if (!abort)
  {
    NodeId nodeId = 0;
    ClusterMgr::Node node;
    while(getNextNodeId(&nodeId, NDB_MGM_NODE_TYPE_NDB))
    {
      node = theFacade->theClusterMgr->getNodeInfo(nodeId);
      if((node.m_state.startLevel != NodeState::SL_STARTED) && 
	 (node.m_state.startLevel != NodeState::SL_NOTHING))
	return OPERATION_NOT_ALLOWED_START_STOP;
    }
  }
  NodeBitmask nodes;
  int ret= sendSTOP_REQ(node_ids,
                        nodes,
                        0,
                        abort,
                        false,
                        false,
                        false,
                        false);
  if (stopCount)
    *stopCount= nodes.count();
  return ret;
}

/*
 * Perform system shutdown
 */

int MgmtSrvr::stop(int * stopCount, bool abort)
{
  NodeBitmask nodes;
  Vector<NodeId> node_ids;
  int ret = sendSTOP_REQ(node_ids,
			 nodes,
			 0,
			 abort,
			 true,
			 false,
			 false,
			 false);
  if (stopCount)
    *stopCount = nodes.count();
  return ret;
}

/*
 * Enter single user mode on all live nodes
 */

int MgmtSrvr::enterSingleUser(int * stopCount, Uint32 singleUserNodeId)
{
  if (getNodeType(singleUserNodeId) != NDB_MGM_NODE_TYPE_API)
    return NODE_NOT_API_NODE;
  NodeId nodeId = 0;
  ClusterMgr::Node node;
  while(getNextNodeId(&nodeId, NDB_MGM_NODE_TYPE_NDB))
  {
    node = theFacade->theClusterMgr->getNodeInfo(nodeId);
    if((node.m_state.startLevel != NodeState::SL_STARTED) && 
       (node.m_state.startLevel != NodeState::SL_NOTHING))
      return OPERATION_NOT_ALLOWED_START_STOP;
  }
  NodeBitmask nodes;
  Vector<NodeId> node_ids;
  int ret = sendSTOP_REQ(node_ids,
			 nodes,
			 singleUserNodeId,
			 false,
			 false,
			 false,
			 false,
			 false);
  if (stopCount)
    *stopCount = nodes.count();
  return ret;
}

/*
 * Perform node restart
 */

int MgmtSrvr::restartNodes(const Vector<NodeId> &node_ids,
                           int * stopCount, bool nostart,
                           bool initialStart, bool abort)
{
  NodeBitmask nodes;
  int ret= sendSTOP_REQ(node_ids,
                        nodes,
                        0,
                        abort,
                        false,
                        true,
                        nostart,
                        initialStart);
  if (stopCount)
    *stopCount = nodes.count();
  return ret;
}

/*
 * Perform system restart
 */

int MgmtSrvr::restart(bool nostart, bool initialStart, 
		      bool abort, int * stopCount )
{
  NodeBitmask nodes;
  Vector<NodeId> node_ids;
  int ret = sendSTOP_REQ(node_ids,
			 nodes,
			 0,
			 abort,
			 true,
			 true,
			 true,
			 initialStart);

  if (ret)
    return ret;

  if (stopCount)
    *stopCount = nodes.count();

#ifdef VM_TRACE
    ndbout_c("Stopped %d nodes", nodes.count());
#endif
  /**
   * Here all nodes were correctly stopped,
   * so we wait for all nodes to be contactable
   */
  int waitTime = 12000;
  NodeId nodeId = 0;
  NDB_TICKS maxTime = NdbTick_CurrentMillisecond() + waitTime;

  ndbout_c(" %d", nodes.get(1));
  ndbout_c(" %d", nodes.get(2));

  while(getNextNodeId(&nodeId, NDB_MGM_NODE_TYPE_NDB)) {
    if (!nodes.get(nodeId))
      continue;
    enum ndb_mgm_node_status s;
    s = NDB_MGM_NODE_STATUS_NO_CONTACT;
#ifdef VM_TRACE
    ndbout_c("Waiting for %d not started", nodeId);
#endif
    while (s != NDB_MGM_NODE_STATUS_NOT_STARTED && waitTime > 0) {
      Uint32 startPhase = 0, version = 0, dynamicId = 0, nodeGroup = 0;
      Uint32 connectCount = 0;
      bool system;
      const char *address;
      status(nodeId, &s, &version, &startPhase, 
	     &system, &dynamicId, &nodeGroup, &connectCount, &address);
      NdbSleep_MilliSleep(100);  
      waitTime = (maxTime - NdbTick_CurrentMillisecond());
    }
  }
  
  if(nostart)
    return 0;
  
  /**
   * Now we start all database nodes (i.e. we make them non-idle)
   * We ignore the result we get from the start command.
   */
  nodeId = 0;
  while(getNextNodeId(&nodeId, NDB_MGM_NODE_TYPE_NDB)) {
    if (!nodes.get(nodeId))
      continue;
    int result;
    result = start(nodeId);
    DEBUG("Starting node " << nodeId << " with result " << result);
    /**
     * Errors from this call are deliberately ignored.
     * Maybe the user only wanted to restart a subset of the nodes.
     * It is also easy for the user to check which nodes have 
     * started and which nodes have not.
     */
  }
  
  return 0;
}

int
MgmtSrvr::exitSingleUser(int * stopCount, bool abort)
{
  NodeId nodeId = 0;
  int count = 0;

  SignalSender ss(theFacade);
  ss.lock(); // lock will be released on exit

  SimpleSignal ssig;
  ResumeReq* const resumeReq = 
    CAST_PTR(ResumeReq, ssig.getDataPtrSend());
  ssig.set(ss,TestOrd::TraceAPI, NDBCNTR, GSN_RESUME_REQ, 
	   ResumeReq::SignalLength);
  resumeReq->senderData = 12;
  resumeReq->senderRef = ss.getOwnRef();

  while(getNextNodeId(&nodeId, NDB_MGM_NODE_TYPE_NDB)){
    if(okToSendTo(nodeId, true) == 0){
      SendStatus result = ss.sendSignal(nodeId, &ssig);
      if (result == SEND_OK)
	count++;
    }
  }

  if(stopCount != 0)
    * stopCount = count;

  return 0;
}

/*****************************************************************************
 * Status
 ****************************************************************************/

#include <ClusterMgr.hpp>

int 
MgmtSrvr::status(int nodeId, 
                 ndb_mgm_node_status * _status, 
		 Uint32 * version,
		 Uint32 * _phase, 
		 bool * _system,
		 Uint32 * dynamic,
		 Uint32 * nodegroup,
		 Uint32 * connectCount,
		 const char **address)
{
  if (getNodeType(nodeId) == NDB_MGM_NODE_TYPE_API ||
      getNodeType(nodeId) == NDB_MGM_NODE_TYPE_MGM) {
    versionNode(nodeId, *version, address);
  } else {
    *address= get_connect_address(nodeId);
  }

  const ClusterMgr::Node node = 
    theFacade->theClusterMgr->getNodeInfo(nodeId);

  if(!node.connected){
    * _status = NDB_MGM_NODE_STATUS_NO_CONTACT;
    return 0;
  }
  
  if (getNodeType(nodeId) == NDB_MGM_NODE_TYPE_NDB) {
    * version = node.m_info.m_version;
  }

  * dynamic = node.m_state.dynamicId;
  * nodegroup = node.m_state.nodeGroup;
  * connectCount = node.m_info.m_connectCount;
  
  switch(node.m_state.startLevel){
  case NodeState::SL_CMVMI:
    * _status = NDB_MGM_NODE_STATUS_NOT_STARTED;
    * _phase = 0;
    return 0;
    break;
  case NodeState::SL_STARTING:
    * _status     = NDB_MGM_NODE_STATUS_STARTING;
    * _phase = node.m_state.starting.startPhase;
    return 0;
    break;
  case NodeState::SL_STARTED:
    * _status = NDB_MGM_NODE_STATUS_STARTED;
    * _phase = 0;
    return 0;
    break;
  case NodeState::SL_STOPPING_1:
    * _status = NDB_MGM_NODE_STATUS_SHUTTING_DOWN;
    * _phase = 1;
    * _system = node.m_state.stopping.systemShutdown != 0;
    return 0;
    break;
  case NodeState::SL_STOPPING_2:
    * _status = NDB_MGM_NODE_STATUS_SHUTTING_DOWN;
    * _phase = 2;
    * _system = node.m_state.stopping.systemShutdown != 0;
    return 0;
    break;
  case NodeState::SL_STOPPING_3:
    * _status = NDB_MGM_NODE_STATUS_SHUTTING_DOWN;
    * _phase = 3;
    * _system = node.m_state.stopping.systemShutdown != 0;
    return 0;
    break;
  case NodeState::SL_STOPPING_4:
    * _status = NDB_MGM_NODE_STATUS_SHUTTING_DOWN;
    * _phase = 4;
    * _system = node.m_state.stopping.systemShutdown != 0;
    return 0;
    break;
  case NodeState::SL_SINGLEUSER:
    * _status = NDB_MGM_NODE_STATUS_SINGLEUSER;
    * _phase  = 0;
    return 0;
    break;
  default:
    * _status = NDB_MGM_NODE_STATUS_UNKNOWN;
    * _phase = 0;
    return 0;
  }
  
  return -1;
}

int 
MgmtSrvr::setEventReportingLevelImpl(int nodeId, 
				     const EventSubscribeReq& ll)
{
  INIT_SIGNAL_SENDER(ss,nodeId);

  SimpleSignal ssig;
  EventSubscribeReq * dst = 
    CAST_PTR(EventSubscribeReq, ssig.getDataPtrSend());
  ssig.set(ss,TestOrd::TraceAPI, CMVMI, GSN_EVENT_SUBSCRIBE_REQ,
	   EventSubscribeReq::SignalLength);
  *dst = ll;

  send(ss,ssig,nodeId,NODE_TYPE_DB);

#if 0
  while (1)
  {
    SimpleSignal *signal = ss.waitFor();
    int gsn = signal->readSignalNumber();
    switch (gsn) { 
    case GSN_EVENT_SUBSCRIBE_CONF:{
      break;
    }
    case GSN_EVENT_SUBSCRIBE_REF:{
      return SEND_OR_RECEIVE_FAILED;
    }
    case GSN_NF_COMPLETEREP:{
      const NFCompleteRep * const rep =
	CAST_CONSTPTR(NFCompleteRep, signal->getDataPtr());
      if (rep->failedNodeId == nodeId)
	return SEND_OR_RECEIVE_FAILED;
      break;
    }
    case GSN_NODE_FAILREP:{
      const NodeFailRep * const rep =
	CAST_CONSTPTR(NodeFailRep, signal->getDataPtr());
      if (NodeBitmask::get(rep->theNodes,nodeId))
	return SEND_OR_RECEIVE_FAILED;
      break;
    }
    default:
      report_unknown_signal(signal);
      return SEND_OR_RECEIVE_FAILED;
    }

  }
#endif
  return 0;
}

//****************************************************************************
//****************************************************************************
int 
MgmtSrvr::setNodeLogLevelImpl(int nodeId, const SetLogLevelOrd & ll)
{
  INIT_SIGNAL_SENDER(ss,nodeId);

  SimpleSignal ssig;
  ssig.set(ss,TestOrd::TraceAPI, CMVMI, GSN_SET_LOGLEVELORD,
	   SetLogLevelOrd::SignalLength);
  SetLogLevelOrd* const dst = CAST_PTR(SetLogLevelOrd, ssig.getDataPtrSend());
  *dst = ll;
  
  return ss.sendSignal(nodeId, &ssig) == SEND_OK ? 0 : SEND_OR_RECEIVE_FAILED;
}

int
MgmtSrvr::send(SignalSender &ss, SimpleSignal &ssig, Uint32 node, Uint32 node_type){
  Uint32 max = (node == 0) ? MAX_NODES : node + 1;
  
  for(; node < max; node++){
    while(nodeTypes[node] != (int)node_type && node < max) node++;
    if(nodeTypes[node] != (int)node_type)
      break;
    ss.sendSignal(node, &ssig);
  }
  return 0;
}

//****************************************************************************
//****************************************************************************

int 
MgmtSrvr::insertError(int nodeId, int errorNo) 
{
  if (errorNo < 0) {
    return INVALID_ERROR_NUMBER;
  }

  INIT_SIGNAL_SENDER(ss,nodeId);
  
  SimpleSignal ssig;
  ssig.set(ss,TestOrd::TraceAPI, CMVMI, GSN_TAMPER_ORD, 
	   TamperOrd::SignalLength);
  TamperOrd* const tamperOrd = CAST_PTR(TamperOrd, ssig.getDataPtrSend());
  tamperOrd->errorNo = errorNo;

  return ss.sendSignal(nodeId, &ssig) == SEND_OK ? 0 : SEND_OR_RECEIVE_FAILED;
}



//****************************************************************************
//****************************************************************************

int 
MgmtSrvr::setTraceNo(int nodeId, int traceNo)
{
  if (traceNo < 0) {
    return INVALID_TRACE_NUMBER;
  }

  INIT_SIGNAL_SENDER(ss,nodeId);

  SimpleSignal ssig;
  ssig.set(ss,TestOrd::TraceAPI, CMVMI, GSN_TEST_ORD, TestOrd::SignalLength);
  TestOrd* const testOrd = CAST_PTR(TestOrd, ssig.getDataPtrSend());
  testOrd->clear();
  // Assume TRACE command causes toggling. Not really defined... ? TODO
  testOrd->setTraceCommand(TestOrd::Toggle, 
			   (TestOrd::TraceSpecification)traceNo);

  return ss.sendSignal(nodeId, &ssig) == SEND_OK ? 0 : SEND_OR_RECEIVE_FAILED;
}

//****************************************************************************
//****************************************************************************

int 
MgmtSrvr::getBlockNumber(const BaseString &blockName) 
{
  short bno = getBlockNo(blockName.c_str());
  if(bno != 0)
    return bno;
  return -1;
}

//****************************************************************************
//****************************************************************************

int 
MgmtSrvr::setSignalLoggingMode(int nodeId, LogMode mode, 
			       const Vector<BaseString>& blocks)
{
  INIT_SIGNAL_SENDER(ss,nodeId);

  // Convert from MgmtSrvr format...

  TestOrd::Command command;
  if (mode == Off) {
    command = TestOrd::Off;
  }
  else {
    command = TestOrd::On;
  }

  TestOrd::SignalLoggerSpecification logSpec;
  switch (mode) {
  case In:
    logSpec = TestOrd::InputSignals;
    break;
  case Out:
    logSpec = TestOrd::OutputSignals;
    break;
  case InOut:
    logSpec = TestOrd::InputOutputSignals;
    break;
  case Off:
    // In MgmtSrvr interface it's just possible to switch off all logging, both
    // "in" and "out" (this should probably be changed).
    logSpec = TestOrd::InputOutputSignals;
    break;
  default:
    ndbout_c("Unexpected value %d, MgmtSrvr::setSignalLoggingMode, line %d",
	     (unsigned)mode, __LINE__);
    assert(false);
    return -1;
  }

  SimpleSignal ssig;
  ssig.set(ss,TestOrd::TraceAPI, CMVMI, GSN_TEST_ORD, TestOrd::SignalLength);

  TestOrd* const testOrd = CAST_PTR(TestOrd, ssig.getDataPtrSend());
  testOrd->clear();
  
  if (blocks.size() == 0 || blocks[0] == "ALL") {
    // Logg command for all blocks
    testOrd->addSignalLoggerCommand(command, logSpec);
  } else {
    for(unsigned i = 0; i < blocks.size(); i++){
      int blockNumber = getBlockNumber(blocks[i]);
      if (blockNumber == -1) {
        return INVALID_BLOCK_NAME;
      }
      testOrd->addSignalLoggerCommand(blockNumber, command, logSpec);
    } // for
  } // else

  return ss.sendSignal(nodeId, &ssig) == SEND_OK ? 0 : SEND_OR_RECEIVE_FAILED;
}

/*****************************************************************************
 * Signal tracing
 *****************************************************************************/
int MgmtSrvr::startSignalTracing(int nodeId)
{
  INIT_SIGNAL_SENDER(ss,nodeId);
  
  SimpleSignal ssig;
  ssig.set(ss,TestOrd::TraceAPI, CMVMI, GSN_TEST_ORD, TestOrd::SignalLength);

  TestOrd* const testOrd = CAST_PTR(TestOrd, ssig.getDataPtrSend());
  testOrd->clear();
  testOrd->setTestCommand(TestOrd::On);

  return ss.sendSignal(nodeId, &ssig) == SEND_OK ? 0 : SEND_OR_RECEIVE_FAILED;
}

int 
MgmtSrvr::stopSignalTracing(int nodeId) 
{
  INIT_SIGNAL_SENDER(ss,nodeId);

  SimpleSignal ssig;
  ssig.set(ss,TestOrd::TraceAPI, CMVMI, GSN_TEST_ORD, TestOrd::SignalLength);
  TestOrd* const testOrd = CAST_PTR(TestOrd, ssig.getDataPtrSend());
  testOrd->clear();
  testOrd->setTestCommand(TestOrd::Off);

  return ss.sendSignal(nodeId, &ssig) == SEND_OK ? 0 : SEND_OR_RECEIVE_FAILED;
}


/*****************************************************************************
 * Dump state
 *****************************************************************************/

int
MgmtSrvr::dumpState(int nodeId, const char* args)
{
  // Convert the space separeted args 
  // string to an int array
  Uint32 args_array[25];
  Uint32 numArgs = 0;

  char buf[10];  
  int b  = 0;
  memset(buf, 0, 10);
  for (size_t i = 0; i <= strlen(args); i++){
    if (args[i] == ' ' || args[i] == 0){
      args_array[numArgs] = atoi(buf);
      numArgs++;
      memset(buf, 0, 10);
      b = 0;
    } else {
      buf[b] = args[i];
      b++;
    }    
  }
  
  return dumpState(nodeId, args_array, numArgs);
}

int
MgmtSrvr::dumpState(int nodeId, const Uint32 args[], Uint32 no)
{
  INIT_SIGNAL_SENDER(ss,nodeId);

  const Uint32 len = no > 25 ? 25 : no;
  
  SimpleSignal ssig;
  DumpStateOrd * const dumpOrd = 
    CAST_PTR(DumpStateOrd, ssig.getDataPtrSend());
  ssig.set(ss,TestOrd::TraceAPI, CMVMI, GSN_DUMP_STATE_ORD, len);
  for(Uint32 i = 0; i<25; i++){
    if (i < len)
      dumpOrd->args[i] = args[i];
    else
      dumpOrd->args[i] = 0;
  }
  
  return ss.sendSignal(nodeId, &ssig) == SEND_OK ? 0 : SEND_OR_RECEIVE_FAILED;
}


//****************************************************************************
//****************************************************************************

const char* MgmtSrvr::getErrorText(int errorCode, char *buf, int buf_sz)
{

  for (int i = 0; i < noOfErrorCodes; ++i) {
    if (errorCode == errorTable[i]._errorCode) {
      BaseString::snprintf(buf, buf_sz, errorTable[i]._errorText);
      buf[buf_sz-1]= 0;
      return buf;
    }
  }

  ndb_error_string(errorCode, buf, buf_sz);
  buf[buf_sz-1]= 0;

  return buf;
}

void 
MgmtSrvr::handleReceivedSignal(NdbApiSignal* signal)
{
  // The way of handling a received signal is taken from the Ndb class.
  int gsn = signal->readSignalNumber();

  switch (gsn) {
  case GSN_EVENT_SUBSCRIBE_CONF:
    break;
  case GSN_EVENT_SUBSCRIBE_REF:
    break;
  case GSN_EVENT_REP:
  {
    eventReport(signal->getDataPtr());
    break;
  }

  case GSN_NF_COMPLETEREP:
    break;
  case GSN_NODE_FAILREP:
    break;

  default:
    g_eventLogger.error("Unknown signal received. SignalNumber: "
			"%i from (%d, %x)",
			gsn,
			refToNode(signal->theSendersBlockRef),
			refToBlock(signal->theSendersBlockRef));
  }
  
  if (theWaitState == NO_WAIT) {
    NdbCondition_Signal(theMgmtWaitForResponseCondPtr);
  }
}

void
MgmtSrvr::handleStatus(NodeId nodeId, bool alive, bool nfComplete)
{
  DBUG_ENTER("MgmtSrvr::handleStatus");
  Uint32 theData[25];
  EventReport *rep = (EventReport *)theData;

  theData[1] = nodeId;
  if (alive) {
    m_started_nodes.push_back(nodeId);
    rep->setEventType(NDB_LE_Connected);
  } else {
    rep->setEventType(NDB_LE_Connected);
    if(nfComplete)
    {
      DBUG_VOID_RETURN;
    }
  }
  rep->setNodeId(_ownNodeId);
  eventReport(theData);
  DBUG_VOID_RETURN;
}

//****************************************************************************
//****************************************************************************

void 
MgmtSrvr::signalReceivedNotification(void* mgmtSrvr, 
                                     NdbApiSignal* signal,
				     LinearSectionPtr ptr[3]) 
{
  ((MgmtSrvr*)mgmtSrvr)->handleReceivedSignal(signal);
}


//****************************************************************************
//****************************************************************************
void 
MgmtSrvr::nodeStatusNotification(void* mgmSrv, Uint32 nodeId, 
				 bool alive, bool nfComplete)
{
  DBUG_ENTER("MgmtSrvr::nodeStatusNotification");
  DBUG_PRINT("enter",("nodeid= %d, alive= %d, nfComplete= %d", nodeId, alive, nfComplete));
  ((MgmtSrvr*)mgmSrv)->handleStatus(nodeId, alive, nfComplete);
  DBUG_VOID_RETURN;
}

enum ndb_mgm_node_type 
MgmtSrvr::getNodeType(NodeId nodeId) const 
{
  if(nodeId >= MAX_NODES)
    return (enum ndb_mgm_node_type)-1;
  
  return nodeTypes[nodeId];
}

const char *MgmtSrvr::get_connect_address(Uint32 node_id)
{
  if (m_connect_address[node_id].s_addr == 0 &&
      theFacade && theFacade->theTransporterRegistry &&
      theFacade->theClusterMgr &&
      getNodeType(node_id) == NDB_MGM_NODE_TYPE_NDB) 
  {
    const ClusterMgr::Node &node=
      theFacade->theClusterMgr->getNodeInfo(node_id);
    if (node.connected)
    {
      m_connect_address[node_id]=
	theFacade->theTransporterRegistry->get_connect_address(node_id);
    }
  }
  return inet_ntoa(m_connect_address[node_id]);  
}

void
MgmtSrvr::get_connected_nodes(NodeBitmask &connected_nodes) const
{
  if (theFacade && theFacade->theClusterMgr) 
  {
    for(Uint32 i = 0; i < MAX_NODES; i++)
    {
      if (getNodeType(i) == NDB_MGM_NODE_TYPE_NDB)
      {
	const ClusterMgr::Node &node= theFacade->theClusterMgr->getNodeInfo(i);
	connected_nodes.bitOR(node.m_state.m_connected_nodes);
      }
    }
  }
}

int
MgmtSrvr::alloc_node_id_req(Uint32 free_node_id)
{
  SignalSender ss(theFacade);
  ss.lock(); // lock will be released on exit

  SimpleSignal ssig;
  AllocNodeIdReq* req = CAST_PTR(AllocNodeIdReq, ssig.getDataPtrSend());
  ssig.set(ss, TestOrd::TraceAPI, QMGR, GSN_ALLOC_NODEID_REQ,
	   AllocNodeIdReq::SignalLength);
  
  req->senderRef = ss.getOwnRef();
  req->senderData = 19;
  req->nodeId = free_node_id;

  int do_send = 1;
  NodeId nodeId = 0;
  while (1)
  {
    if (nodeId == 0)
    {
      bool next;
      while((next = getNextNodeId(&nodeId, NDB_MGM_NODE_TYPE_NDB)) == true &&
            theFacade->get_node_alive(nodeId) == false);
      if (!next)
        return NO_CONTACT_WITH_DB_NODES;
      do_send = 1;
    }
    if (do_send)
    {
      if (ss.sendSignal(nodeId, &ssig) != SEND_OK) {
        return SEND_OR_RECEIVE_FAILED;
      }
      do_send = 0;
    }
    
    SimpleSignal *signal = ss.waitFor();

    int gsn = signal->readSignalNumber();
    switch (gsn) {
    case GSN_ALLOC_NODEID_CONF:
    {
      const AllocNodeIdConf * const conf =
        CAST_CONSTPTR(AllocNodeIdConf, signal->getDataPtr());
      return 0;
    }
    case GSN_ALLOC_NODEID_REF:
    {
      const AllocNodeIdRef * const ref =
        CAST_CONSTPTR(AllocNodeIdRef, signal->getDataPtr());
      if (ref->errorCode == AllocNodeIdRef::NotMaster ||
          ref->errorCode == AllocNodeIdRef::Busy)
      {
        do_send = 1;
        nodeId = refToNode(ref->masterRef);
        continue;
      }
      return ref->errorCode;
    }
    case GSN_NF_COMPLETEREP:
    {
      const NFCompleteRep * const rep =
        CAST_CONSTPTR(NFCompleteRep, signal->getDataPtr());
#ifdef VM_TRACE
      ndbout_c("Node %d fail completed", rep->failedNodeId);
#endif
      if (rep->failedNodeId == nodeId)
        nodeId = 0;
      continue;
    }
    case GSN_NODE_FAILREP:{
      // ignore NF_COMPLETEREP will come
      continue;
    }
    default:
      report_unknown_signal(signal);
      return SEND_OR_RECEIVE_FAILED;
    }
  }
  return 0;
}

bool
MgmtSrvr::alloc_node_id(NodeId * nodeId, 
			enum ndb_mgm_node_type type,
			struct sockaddr *client_addr, 
			SOCKET_SIZE_TYPE *client_addr_len,
			BaseString &error_string)
{
  DBUG_ENTER("MgmtSrvr::alloc_node_id");
  DBUG_PRINT("enter", ("nodeid=%d, type=%d, client_addr=%d",
		       *nodeId, type, client_addr));
  if (g_no_nodeid_checks) {
    if (*nodeId == 0) {
      error_string.appfmt("no-nodeid-checks set in management server.\n"
			  "node id must be set explicitly in connectstring");
      DBUG_RETURN(false);
    }
    DBUG_RETURN(true);
  }
  Guard g(m_node_id_mutex);
  int no_mgm= 0;
  NodeBitmask connected_nodes(m_reserved_nodes);
  get_connected_nodes(connected_nodes);
  {
    for(Uint32 i = 0; i < MAX_NODES; i++)
      if (getNodeType(i) == NDB_MGM_NODE_TYPE_MGM)
	no_mgm++;
  }
  bool found_matching_id= false;
  bool found_matching_type= false;
  bool found_free_node= false;
  unsigned id_found= 0;
  const char *config_hostname= 0;
  struct in_addr config_addr= {0};
  int r_config_addr= -1;
  unsigned type_c= 0;

  if(NdbMutex_Lock(m_configMutex))
  {
    error_string.appfmt("unable to lock configuration mutex");
    return false;
  }
  ndb_mgm_configuration_iterator
    iter(* _config->m_configValues, CFG_SECTION_NODE);
  for(iter.first(); iter.valid(); iter.next()) {
    unsigned tmp= 0;
    if(iter.get(CFG_NODE_ID, &tmp)) require(false);
    if (*nodeId && *nodeId != tmp)
      continue;
    found_matching_id= true;
    if(iter.get(CFG_TYPE_OF_SECTION, &type_c)) require(false);
    if(type_c != (unsigned)type)
      continue;
    found_matching_type= true;
    if (connected_nodes.get(tmp))
      continue;
    found_free_node= true;
    if(iter.get(CFG_NODE_HOST, &config_hostname)) require(false);
    if (config_hostname && config_hostname[0] == 0)
      config_hostname= 0;
    else if (client_addr) {
      // check hostname compatability
      const void *tmp_in= &(((sockaddr_in*)client_addr)->sin_addr);
      if((r_config_addr= Ndb_getInAddr(&config_addr, config_hostname)) != 0
	 || memcmp(&config_addr, tmp_in, sizeof(config_addr)) != 0) {
	struct in_addr tmp_addr;
	if(Ndb_getInAddr(&tmp_addr, "localhost") != 0
	   || memcmp(&tmp_addr, tmp_in, sizeof(config_addr)) != 0) {
	  // not localhost
#if 0
	  ndbout << "MgmtSrvr::getFreeNodeId compare failed for \""
		 << config_hostname
		 << "\" id=" << tmp << endl;
#endif
	  continue;
	}
	// connecting through localhost
	// check if config_hostname is local
	if (!SocketServer::tryBind(0,config_hostname)) {
	  continue;
	}
      }
    } else { // client_addr == 0
      if (!SocketServer::tryBind(0,config_hostname)) {
	continue;
      }
    }
    if (*nodeId != 0 ||
	type != NDB_MGM_NODE_TYPE_MGM ||
	no_mgm == 1) { // any match is ok

      if (config_hostname == 0 &&
	  *nodeId == 0 &&
	  type != NDB_MGM_NODE_TYPE_MGM)
      {
	if (!id_found) // only set if not set earlier
	  id_found= tmp;
	continue; /* continue looking for a nodeid with specified
		   * hostname
		   */
      }
      assert(id_found == 0);
      id_found= tmp;
      break;
    }
    if (id_found) { // mgmt server may only have one match
      error_string.appfmt("Ambiguous node id's %d and %d.\n"
			  "Suggest specifying node id in connectstring,\n"
			  "or specifying unique host names in config file.",
			  id_found, tmp);
      NdbMutex_Unlock(m_configMutex);
      DBUG_RETURN(false);
    }
    if (config_hostname == 0) {
      error_string.appfmt("Ambiguity for node id %d.\n"
			  "Suggest specifying node id in connectstring,\n"
			  "or specifying unique host names in config file,\n"
			  "or specifying just one mgmt server in config file.",
			  tmp);
      DBUG_RETURN(false);
    }
    id_found= tmp; // mgmt server matched, check for more matches
  }
  NdbMutex_Unlock(m_configMutex);

  if (id_found && client_addr != 0)
  {
    int res = alloc_node_id_req(id_found);
    unsigned save_id_found = id_found;
    switch (res)
    {
    case 0:
      // ok continue
      break;
    case NO_CONTACT_WITH_DB_NODES:
      // ok continue
      break;
    default:
      // something wrong
      id_found = 0;
      break;

    }
    if (id_found == 0)
    {
      char buf[128];
      ndb_error_string(res, buf, sizeof(buf));
      error_string.appfmt("Cluster refused allocation of id %d. Error: %d (%s).",
			  save_id_found, res, buf);
      g_eventLogger.warning("Cluster refused allocation of id %d. "
                            "Connection from ip %s. "
                            "Returned error string \"%s\"", save_id_found,
                            inet_ntoa(((struct sockaddr_in *)(client_addr))->sin_addr),
                            error_string.c_str());
      DBUG_RETURN(false);
    }
  }

  if (id_found)
  {
    *nodeId= id_found;
    DBUG_PRINT("info", ("allocating node id %d",*nodeId));
    {
      int r= 0;
      if (client_addr)
	m_connect_address[id_found]=
	  ((struct sockaddr_in *)client_addr)->sin_addr;
      else if (config_hostname)
	r= Ndb_getInAddr(&(m_connect_address[id_found]), config_hostname);
      else {
	char name[256];
	r= gethostname(name, sizeof(name));
	if (r == 0) {
	  name[sizeof(name)-1]= 0;
	  r= Ndb_getInAddr(&(m_connect_address[id_found]), name);
	}
      }
      if (r)
	m_connect_address[id_found].s_addr= 0;
    }
    m_reserved_nodes.set(id_found);
    if (theFacade && id_found != theFacade->ownId())
    {
      /**
       * Make sure we're ready to accept connections from this node
       */
      theFacade->lock_mutex();
      theFacade->doConnect(id_found);
      theFacade->unlock_mutex();
    }
    
    char tmp_str[128];
    m_reserved_nodes.getText(tmp_str);
    g_eventLogger.info("Mgmt server state: nodeid %d reserved for ip %s, m_reserved_nodes %s.",
		       id_found, get_connect_address(id_found), tmp_str);
    DBUG_RETURN(true);
  }

  if (found_matching_type && !found_free_node) {
    // we have a temporary error which might be due to that 
    // we have got the latest connect status from db-nodes.  Force update.
    global_flag_send_heartbeat_now= 1;
  }

  BaseString type_string, type_c_string;
  {
    const char *alias, *str;
    alias= ndb_mgm_get_node_type_alias_string(type, &str);
    type_string.assfmt("%s(%s)", alias, str);
    alias= ndb_mgm_get_node_type_alias_string((enum ndb_mgm_node_type)type_c,
					      &str);
    type_c_string.assfmt("%s(%s)", alias, str);
  }

  if (*nodeId == 0) {
    if (found_matching_id)
      if (found_matching_type)
	if (found_free_node)
	  error_string.appfmt("Connection done from wrong host ip %s.",
			      (client_addr)?
			        inet_ntoa(((struct sockaddr_in *)
					 (client_addr))->sin_addr):"");
	else
	  error_string.appfmt("No free node id found for %s.",
			      type_string.c_str());
      else
	error_string.appfmt("No %s node defined in config file.",
			    type_string.c_str());
    else
      error_string.append("No nodes defined in config file.");
  } else {
    if (found_matching_id)
      if (found_matching_type)
	if (found_free_node) {
	  // have to split these into two since inet_ntoa overwrites itself
	  error_string.appfmt("Connection with id %d done from wrong host ip %s,",
			      *nodeId, inet_ntoa(((struct sockaddr_in *)
						  (client_addr))->sin_addr));
	  error_string.appfmt(" expected %s(%s).", config_hostname,
			      r_config_addr ?
			      "lookup failed" : inet_ntoa(config_addr));
	} else
	  error_string.appfmt("Id %d already allocated by another node.",
			      *nodeId);
      else
	error_string.appfmt("Id %d configured as %s, connect attempted as %s.",
			    *nodeId, type_c_string.c_str(),
			    type_string.c_str());
    else
      error_string.appfmt("No node defined with id=%d in config file.",
			  *nodeId);
  }

  g_eventLogger.warning("Allocate nodeid (%d) failed. Connection from ip %s. "
			"Returned error string \"%s\"",
			*nodeId,
			client_addr != 0 ? inet_ntoa(((struct sockaddr_in *)(client_addr))->sin_addr) : "<none>",
			error_string.c_str());

  NodeBitmask connected_nodes2;
  get_connected_nodes(connected_nodes2);
  {
    BaseString tmp_connected, tmp_not_connected;
    for(Uint32 i = 0; i < MAX_NODES; i++)
    {
      if (connected_nodes2.get(i))
      {
	if (!m_reserved_nodes.get(i))
	  tmp_connected.appfmt(" %d", i);
      }
      else if (m_reserved_nodes.get(i))
      {
	tmp_not_connected.appfmt(" %d", i);
      }
    }
    if (tmp_connected.length() > 0)
      g_eventLogger.info("Mgmt server state: node id's %s connected but not reserved", 
			 tmp_connected.c_str());
    if (tmp_not_connected.length() > 0)
      g_eventLogger.info("Mgmt server state: node id's %s not connected but reserved",
			 tmp_not_connected.c_str());
  }
  DBUG_RETURN(false);
}

bool
MgmtSrvr::getNextNodeId(NodeId * nodeId, enum ndb_mgm_node_type type) const 
{
  NodeId tmp = * nodeId;

  tmp++;
  while(nodeTypes[tmp] != type && tmp < MAX_NODES)
    tmp++;
  
  if(tmp == MAX_NODES){
    return false;
  }

  * nodeId = tmp;
  return true;
}

#include "Services.hpp"

void
MgmtSrvr::eventReport(const Uint32 * theData)
{
  const EventReport * const eventReport = (EventReport *)&theData[0];
  
  NodeId nodeId = eventReport->getNodeId();
  Ndb_logevent_type type = eventReport->getEventType();
  // Log event
  g_eventLogger.log(type, theData, nodeId, 
		    &m_event_listner[0].m_logLevel);  
  m_event_listner.log(type, theData, nodeId);
}

/***************************************************************************
 * Backup
 ***************************************************************************/

int
MgmtSrvr::startBackup(Uint32& backupId, int waitCompleted)
{
  SignalSender ss(theFacade);
  ss.lock(); // lock will be released on exit

  NodeId nodeId = m_master_node;
  if (okToSendTo(nodeId, false) != 0)
  {
    bool next;
    nodeId = m_master_node = 0;
    while((next = getNextNodeId(&nodeId, NDB_MGM_NODE_TYPE_NDB)) == true &&
          okToSendTo(nodeId, false) != 0);
    if(!next)
      return NO_CONTACT_WITH_DB_NODES;
  }

  SimpleSignal ssig;
  BackupReq* req = CAST_PTR(BackupReq, ssig.getDataPtrSend());
  ssig.set(ss, TestOrd::TraceAPI, BACKUP, GSN_BACKUP_REQ, 
	   BackupReq::SignalLength);
  
  req->senderData = 19;
  req->backupDataLen = 0;
  assert(waitCompleted < 3);
  req->flags = waitCompleted & 0x3;

  BackupEvent event;
  int do_send = 1;
  while (1) {
    if (do_send)
    {
      if (ss.sendSignal(nodeId, &ssig) != SEND_OK) {
	return SEND_OR_RECEIVE_FAILED;
      }
      if (waitCompleted == 0)
	return 0;
      do_send = 0;
    }
    SimpleSignal *signal = ss.waitFor();

    int gsn = signal->readSignalNumber();
    switch (gsn) {
    case GSN_BACKUP_CONF:{
      const BackupConf * const conf = 
	CAST_CONSTPTR(BackupConf, signal->getDataPtr());
      event.Event = BackupEvent::BackupStarted;
      event.Started.BackupId = conf->backupId;
      event.Nodes = conf->nodes;
#ifdef VM_TRACE
      ndbout_c("Backup(%d) master is %d", conf->backupId,
	       refToNode(signal->header.theSendersBlockRef));
#endif
      backupId = conf->backupId;
      if (waitCompleted == 1)
	return 0;
      // wait for next signal
      break;
    }
    case GSN_BACKUP_COMPLETE_REP:{
      const BackupCompleteRep * const rep = 
	CAST_CONSTPTR(BackupCompleteRep, signal->getDataPtr());
#ifdef VM_TRACE
      ndbout_c("Backup(%d) completed %d", rep->backupId);
#endif
      event.Event = BackupEvent::BackupCompleted;
      event.Completed.BackupId = rep->backupId;
    
      event.Completed.NoOfBytes = rep->noOfBytes;
      event.Completed.NoOfLogBytes = rep->noOfLogBytes;
      event.Completed.NoOfRecords = rep->noOfRecords;
      event.Completed.NoOfLogRecords = rep->noOfLogRecords;
      event.Completed.stopGCP = rep->stopGCP;
      event.Completed.startGCP = rep->startGCP;
      event.Nodes = rep->nodes;

      backupId = rep->backupId;
      return 0;
    }
    case GSN_BACKUP_REF:{
      const BackupRef * const ref = 
	CAST_CONSTPTR(BackupRef, signal->getDataPtr());
      if(ref->errorCode == BackupRef::IAmNotMaster){
	m_master_node = nodeId = refToNode(ref->masterRef);
#ifdef VM_TRACE
	ndbout_c("I'm not master resending to %d", nodeId);
#endif
	do_send = 1; // try again
	continue;
      }
      event.Event = BackupEvent::BackupFailedToStart;
      event.FailedToStart.ErrorCode = ref->errorCode;
      return ref->errorCode;
    }
    case GSN_BACKUP_ABORT_REP:{
      const BackupAbortRep * const rep = 
	CAST_CONSTPTR(BackupAbortRep, signal->getDataPtr());
      event.Event = BackupEvent::BackupAborted;
      event.Aborted.Reason = rep->reason;
      event.Aborted.BackupId = rep->backupId;
      event.Aborted.ErrorCode = rep->reason;
#ifdef VM_TRACE
      ndbout_c("Backup %d aborted", rep->backupId);
#endif
      return rep->reason;
    }
    case GSN_NF_COMPLETEREP:{
      const NFCompleteRep * const rep =
	CAST_CONSTPTR(NFCompleteRep, signal->getDataPtr());
#ifdef VM_TRACE
      ndbout_c("Node %d fail completed", rep->failedNodeId);
#endif
      if (rep->failedNodeId == nodeId ||
	  waitCompleted == 1)
	return 1326;
      // wait for next signal
      // master node will report aborted backup
      break;
    }
    case GSN_NODE_FAILREP:{
      const NodeFailRep * const rep =
	CAST_CONSTPTR(NodeFailRep, signal->getDataPtr());
      if (NodeBitmask::get(rep->theNodes,nodeId) ||
	  waitCompleted == 1)
	return 1326;
      // wait for next signal
      // master node will report aborted backup
      break;
    }
    default:
      report_unknown_signal(signal);
      return SEND_OR_RECEIVE_FAILED;
    }
  }
}

int 
MgmtSrvr::abortBackup(Uint32 backupId)
{
  SignalSender ss(theFacade);

  bool next;
  NodeId nodeId = 0;
  while((next = getNextNodeId(&nodeId, NDB_MGM_NODE_TYPE_NDB)) == true &&
	theFacade->get_node_alive(nodeId) == false);
  
  if(!next){
    return NO_CONTACT_WITH_DB_NODES;
  }
  
  SimpleSignal ssig;

  AbortBackupOrd* ord = CAST_PTR(AbortBackupOrd, ssig.getDataPtrSend());
  ssig.set(ss, TestOrd::TraceAPI, BACKUP, GSN_ABORT_BACKUP_ORD, 
	   AbortBackupOrd::SignalLength);
  
  ord->requestType = AbortBackupOrd::ClientAbort;
  ord->senderData = 19;
  ord->backupId = backupId;
  
  return ss.sendSignal(nodeId, &ssig) == SEND_OK ? 0 : SEND_OR_RECEIVE_FAILED;
}


MgmtSrvr::Allocated_resources::Allocated_resources(MgmtSrvr &m)
  : m_mgmsrv(m)
{
}

MgmtSrvr::Allocated_resources::~Allocated_resources()
{
  Guard g(m_mgmsrv.m_node_id_mutex);
  if (!m_reserved_nodes.isclear()) {
    m_mgmsrv.m_reserved_nodes.bitANDC(m_reserved_nodes); 
    // node has been reserved, force update signal to ndb nodes
    global_flag_send_heartbeat_now= 1;

    char tmp_str[128];
    m_mgmsrv.m_reserved_nodes.getText(tmp_str);
    g_eventLogger.info("Mgmt server state: nodeid %d freed, m_reserved_nodes %s.",
		       get_nodeid(), tmp_str);
  }
}

void
MgmtSrvr::Allocated_resources::reserve_node(NodeId id)
{
  m_reserved_nodes.set(id);
}

NodeId
MgmtSrvr::Allocated_resources::get_nodeid() const
{
  for(Uint32 i = 0; i < MAX_NODES; i++)
  {
    if (m_reserved_nodes.get(i))
      return i;
  }
  return 0;
}

int
MgmtSrvr::setDbParameter(int node, int param, const char * value,
			 BaseString& msg){

  if(NdbMutex_Lock(m_configMutex))
    return -1;

  /**
   * Check parameter
   */
  ndb_mgm_configuration_iterator
    iter(* _config->m_configValues, CFG_SECTION_NODE);
  if(iter.first() != 0){
    msg.assign("Unable to find node section (iter.first())");
    NdbMutex_Unlock(m_configMutex);
    return -1;
  }
  
  Uint32 type = NODE_TYPE_DB + 1;
  if(node != 0){
    if(iter.find(CFG_NODE_ID, node) != 0){
      msg.assign("Unable to find node (iter.find())");
      NdbMutex_Unlock(m_configMutex);
      return -1;
    }
    if(iter.get(CFG_TYPE_OF_SECTION, &type) != 0){
      msg.assign("Unable to get node type(iter.get(CFG_TYPE_OF_SECTION))");
      NdbMutex_Unlock(m_configMutex);
      return -1;
    }
  } else {
    do {
      if(iter.get(CFG_TYPE_OF_SECTION, &type) != 0){
	msg.assign("Unable to get node type(iter.get(CFG_TYPE_OF_SECTION))");
	NdbMutex_Unlock(m_configMutex);
	return -1;
      }
      if(type == NODE_TYPE_DB)
	break;
    } while(iter.next() == 0);
  }
  
  if(type != NODE_TYPE_DB){
    msg.assfmt("Invalid node type or no such node (%d %d)", 
	       type, NODE_TYPE_DB);
    NdbMutex_Unlock(m_configMutex);
    return -1;
  }

  int p_type;
  unsigned val_32;
  Uint64 val_64;
  const char * val_char;
  do {
    p_type = 0;
    if(iter.get(param, &val_32) == 0){
      val_32 = atoi(value);
      break;
    }
    
    p_type++;
    if(iter.get(param, &val_64) == 0){
      val_64 = strtoll(value, 0, 10);
      break;
    }
    p_type++;
    if(iter.get(param, &val_char) == 0){
      val_char = value;
      break;
    }
    msg.assign("Could not get parameter");
    NdbMutex_Unlock(m_configMutex);
    return -1;
  } while(0);
  
  bool res = false;
  do {
    int ret = iter.get(CFG_TYPE_OF_SECTION, &type);
    assert(ret == 0);
    
    if(type != NODE_TYPE_DB)
      continue;
    
    Uint32 node;
    ret = iter.get(CFG_NODE_ID, &node);
    assert(ret == 0);
    
    ConfigValues::Iterator i2(_config->m_configValues->m_config, 
			      iter.m_config);
    switch(p_type){
    case 0:
      res = i2.set(param, val_32);
      ndbout_c("Updating node %d param: %d to %d",  node, param, val_32);
      break;
    case 1:
      res = i2.set(param, val_64);
      ndbout_c("Updating node %d param: %d to %Ld",  node, param, val_32);
      break;
    case 2:
      res = i2.set(param, val_char);
      ndbout_c("Updating node %d param: %d to %s",  node, param, val_char);
      break;
    default:
      require(false);
    }
    assert(res);
  } while(node == 0 && iter.next() == 0);

  msg.assign("Success");
  NdbMutex_Unlock(m_configMutex);
  return 0;
}
int
MgmtSrvr::setConnectionDbParameter(int node1, 
				   int node2,
				   int param,
				   int value,
				   BaseString& msg){
  Uint32 current_value,new_value;

  DBUG_ENTER("MgmtSrvr::setConnectionDbParameter");

  if(NdbMutex_Lock(m_configMutex))
  {
    DBUG_RETURN(-1);
  }

  ndb_mgm_configuration_iterator 
    iter(* _config->m_configValues, CFG_SECTION_CONNECTION);

  if(iter.first() != 0){
    msg.assign("Unable to find connection section (iter.first())");
    NdbMutex_Unlock(m_configMutex);
    DBUG_RETURN(-1);
  }

  for(;iter.valid();iter.next()) {
    Uint32 n1,n2;
    iter.get(CFG_CONNECTION_NODE_1, &n1);
    iter.get(CFG_CONNECTION_NODE_2, &n2);
    if((n1 == (unsigned)node1 && n2 == (unsigned)node2)
       || (n1 == (unsigned)node2 && n2 == (unsigned)node1))
      break;
  }
  if(!iter.valid()) {
    msg.assign("Unable to find connection between nodes");
    NdbMutex_Unlock(m_configMutex);
    DBUG_RETURN(-2);
  }
  
  if(iter.get(param, &current_value) != 0) {
    msg.assign("Unable to get current value of parameter");
    NdbMutex_Unlock(m_configMutex);
    DBUG_RETURN(-3);
  }

  ConfigValues::Iterator i2(_config->m_configValues->m_config, 
			    iter.m_config);

  if(i2.set(param, (unsigned)value) == false) {
    msg.assign("Unable to set new value of parameter");
    NdbMutex_Unlock(m_configMutex);
    DBUG_RETURN(-4);
  }
  
  if(iter.get(param, &new_value) != 0) {
    msg.assign("Unable to get parameter after setting it.");
    NdbMutex_Unlock(m_configMutex);
    DBUG_RETURN(-5);
  }

  msg.assfmt("%u -> %u",current_value,new_value);
  NdbMutex_Unlock(m_configMutex);
  DBUG_RETURN(1);
}


int
MgmtSrvr::getConnectionDbParameter(int node1, 
				   int node2,
				   int param,
				   int *value,
				   BaseString& msg){
  DBUG_ENTER("MgmtSrvr::getConnectionDbParameter");

  if(NdbMutex_Lock(m_configMutex))
  {
    DBUG_RETURN(-1);
  }

  ndb_mgm_configuration_iterator
    iter(* _config->m_configValues, CFG_SECTION_CONNECTION);

  if(iter.first() != 0){
    msg.assign("Unable to find connection section (iter.first())");
    NdbMutex_Unlock(m_configMutex);
    DBUG_RETURN(-1);
  }

  for(;iter.valid();iter.next()) {
    Uint32 n1=0,n2=0;
    iter.get(CFG_CONNECTION_NODE_1, &n1);
    iter.get(CFG_CONNECTION_NODE_2, &n2);
    if((n1 == (unsigned)node1 && n2 == (unsigned)node2)
       || (n1 == (unsigned)node2 && n2 == (unsigned)node1))
      break;
  }
  if(!iter.valid()) {
    msg.assign("Unable to find connection between nodes");
    NdbMutex_Unlock(m_configMutex);
    DBUG_RETURN(-1);
  }
  
  if(iter.get(param, (Uint32*)value) != 0) {
    msg.assign("Unable to get current value of parameter");
    NdbMutex_Unlock(m_configMutex);
    DBUG_RETURN(-1);
  }

  msg.assfmt("%d",*value);
  NdbMutex_Unlock(m_configMutex);
  DBUG_RETURN(1);
}

void MgmtSrvr::transporter_connect(NDB_SOCKET_TYPE sockfd)
{
  if (theFacade->get_registry()->connect_server(sockfd))
  {
    /**
     * Force an update_connections() so that the
     * ClusterMgr and TransporterFacade is up to date
     * with the new connection.
     * Important for correct node id reservation handling
     */
    NdbMutex_Lock(theFacade->theMutexPtr);
    theFacade->get_registry()->update_connections();
    NdbMutex_Unlock(theFacade->theMutexPtr);
  }
}

int MgmtSrvr::connect_to_self(void)
{
  int r= 0;
  m_local_mgm_handle= ndb_mgm_create_handle();
  snprintf(m_local_mgm_connect_string,sizeof(m_local_mgm_connect_string),
           "localhost:%u",getPort());
  ndb_mgm_set_connectstring(m_local_mgm_handle, m_local_mgm_connect_string);

  if((r= ndb_mgm_connect(m_local_mgm_handle, 0, 0, 0)) < 0)
  {
    ndb_mgm_destroy_handle(&m_local_mgm_handle);
    return r;
  }
  // TransporterRegistry now owns this NdbMgmHandle and will destroy it.
  theFacade->get_registry()->set_mgm_handle(m_local_mgm_handle);

  return 0;
}



template class MutexVector<unsigned short>;
template class MutexVector<Ndb_mgmd_event_service::Event_listener>;
template class MutexVector<EventSubscribeReq>;