/* Copyright (C) 2003 MySQL AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include "IPCConfig.hpp" #include <NdbOut.hpp> #include <NdbHost.h> #include <TransporterDefinitions.hpp> #include <TransporterRegistry.hpp> #include <Properties.hpp> #include <mgmapi_configuration.hpp> #include <mgmapi_config_parameters.h> #if defined DEBUG_TRANSPORTER #define DEBUG(t) ndbout << __FILE__ << ":" << __LINE__ << ":" << t << endl; #else #define DEBUG(t) #endif IPCConfig::IPCConfig(Properties * p) { theNoOfRemoteNodes = 0; the_ownId = 0; if(p != 0) props = new Properties(* p); else props = 0; } IPCConfig::~IPCConfig() { if(props != 0){ delete props; } } int IPCConfig::init(){ Uint32 nodeId; if(props == 0) return -1; if(!props->get("LocalNodeId", &nodeId)) { DEBUG( "Did not find local node id." ); return -1; } the_ownId = nodeId; Uint32 noOfConnections; if(!props->get("NoOfConnections", &noOfConnections)) { DEBUG( "Did not find noOfConnections." ); return -1; } for(Uint32 i = 0; i<noOfConnections; i++){ const Properties * tmp; Uint32 node1, node2; if(!props->get("Connection", i, &tmp)) { DEBUG( "Did not find Connection." ); return -1; } if(!tmp->get("NodeId1", &node1)) { DEBUG( "Did not find NodeId1." ); return -1; } if(!tmp->get("NodeId2", &node2)) { DEBUG( "Did not find NodeId2." ); return -1; } if(node1 == the_ownId && node2 != the_ownId) if(!addRemoteNodeId(node2)) { DEBUG( "addRemoteNodeId(node2) failed." ); return -1; } if(node1 != the_ownId && node2 == the_ownId) if(!addRemoteNodeId(node1)) { DEBUG( "addRemoteNodeId(node2) failed." ); return -1; } } return 0; } bool IPCConfig::addRemoteNodeId(NodeId nodeId){ for(int i = 0; i<theNoOfRemoteNodes; i++) if(theRemoteNodeIds[i] == nodeId) return false; theRemoteNodeIds[theNoOfRemoteNodes] = nodeId; theNoOfRemoteNodes++; return true; } /** * Returns no of transporters configured */ int IPCConfig::configureTransporters(TransporterRegistry * theTransporterRegistry){ int noOfTransportersCreated = 0; Uint32 noOfConnections; if(!props->get("NoOfConnections", &noOfConnections)) return -1; for (Uint32 i = 0; i < noOfConnections; i++){ const Properties * tmp; Uint32 nodeId1, nodeId2; const char * host1; const char * host2; if(!props->get("Connection", i, &tmp)) continue; if(!tmp->get("NodeId1", &nodeId1)) continue; if(!tmp->get("NodeId2", &nodeId2)) continue; if(nodeId1 != the_ownId && nodeId2 != the_ownId) continue; Uint32 sendSignalId; Uint32 compression; Uint32 checksum; if(!tmp->get("SendSignalId", &sendSignalId)) continue; if(!tmp->get("Checksum", &checksum)) continue; const char * type; if(!tmp->get("Type", &type)) continue; if(strcmp("SHM", type) == 0){ SHM_TransporterConfiguration conf; conf.localNodeId = the_ownId; conf.remoteNodeId = (nodeId1 != the_ownId ? nodeId1 : nodeId2); conf.checksum = checksum; conf.signalId = sendSignalId; if(!tmp->get("ShmKey", &conf.shmKey)) continue; if(!tmp->get("ShmSize", &conf.shmSize)) continue; if(!theTransporterRegistry->createTransporter(&conf)){ ndbout << "Failed to create SHM Transporter from: " << conf.localNodeId << " to: " << conf.remoteNodeId << endl; continue; } else { noOfTransportersCreated++; continue; } } else if(strcmp("SCI", type) == 0){ SCI_TransporterConfiguration conf; conf.localNodeId = the_ownId; conf.remoteNodeId = (nodeId1 != the_ownId ? nodeId1 : nodeId2); conf.checksum = checksum; conf.signalId = sendSignalId; if(!tmp->get("SendLimit", &conf.sendLimit)) continue; if(!tmp->get("SharedBufferSize", &conf.bufferSize)) continue; if(the_ownId == nodeId1){ if(!tmp->get("Node1_NoOfAdapters", &conf.nLocalAdapters)) continue; if(!tmp->get("Node2_Adapter", 0, &conf.remoteSciNodeId0)) continue; if(conf.nLocalAdapters > 1){ if(!tmp->get("Node2_Adapter", 1, &conf.remoteSciNodeId1)) continue; } } else { if(!tmp->get("Node2_NoOfAdapters", &conf.nLocalAdapters)) continue; if(!tmp->get("Node1_Adapter", 0, &conf.remoteSciNodeId0)) continue; if(conf.nLocalAdapters > 1){ if(!tmp->get("Node1_Adapter", 1, &conf.remoteSciNodeId1)) continue; } } if(!theTransporterRegistry->createTransporter(&conf)){ ndbout << "Failed to create SCI Transporter from: " << conf.localNodeId << " to: " << conf.remoteNodeId << endl; continue; } else { noOfTransportersCreated++; continue; } } if(!tmp->get("HostName1", &host1)) continue; if(!tmp->get("HostName2", &host2)) continue; Uint32 ownNodeId; Uint32 remoteNodeId; const char * ownHostName; const char * remoteHostName; if(nodeId1 == the_ownId){ ownNodeId = nodeId1; ownHostName = host1; remoteNodeId = nodeId2; remoteHostName = host2; } else if(nodeId2 == the_ownId){ ownNodeId = nodeId2; ownHostName = host2; remoteNodeId = nodeId1; remoteHostName = host1; } else { continue; } if(strcmp("TCP", type) == 0){ TCP_TransporterConfiguration conf; if(!tmp->get("PortNumber", &conf.port)) continue; if(!tmp->get("SendBufferSize", &conf.sendBufferSize)) continue; if(!tmp->get("MaxReceiveSize", &conf.maxReceiveSize)) continue; const char * proxy; if (tmp->get("Proxy", &proxy)) { if (strlen(proxy) > 0 && nodeId2 == the_ownId) { // TODO handle host:port conf.port = atoi(proxy); } } conf.sendBufferSize *= MAX_MESSAGE_SIZE; conf.maxReceiveSize *= MAX_MESSAGE_SIZE; conf.remoteHostName = remoteHostName; conf.localHostName = ownHostName; conf.remoteNodeId = remoteNodeId; conf.localNodeId = ownNodeId; conf.checksum = checksum; conf.signalId = sendSignalId; if(!theTransporterRegistry->createTransporter(&conf)){ ndbout << "Failed to create TCP Transporter from: " << ownNodeId << " to: " << remoteNodeId << endl; } else { noOfTransportersCreated++; } } else if(strcmp("OSE", type) == 0){ OSE_TransporterConfiguration conf; if(!tmp->get("PrioASignalSize", &conf.prioASignalSize)) continue; if(!tmp->get("PrioBSignalSize", &conf.prioBSignalSize)) continue; if(!tmp->get("ReceiveArraySize", &conf.receiveBufferSize)) continue; conf.remoteHostName = remoteHostName; conf.localHostName = ownHostName; conf.remoteNodeId = remoteNodeId; conf.localNodeId = ownNodeId; conf.checksum = checksum; conf.signalId = sendSignalId; if(!theTransporterRegistry->createTransporter(&conf)){ ndbout << "Failed to create OSE Transporter from: " << ownNodeId << " to: " << remoteNodeId << endl; } else { noOfTransportersCreated++; } } else { continue; } } return noOfTransportersCreated; } /** * Supply a nodeId, * and get next higher node id * Returns false if none found */ bool IPCConfig::getNextRemoteNodeId(NodeId & nodeId) const { NodeId returnNode = MAX_NODES + 1; for(int i = 0; i<theNoOfRemoteNodes; i++) if(theRemoteNodeIds[i] > nodeId){ if(theRemoteNodeIds[i] < returnNode){ returnNode = theRemoteNodeIds[i]; } } if(returnNode == (MAX_NODES + 1)) return false; nodeId = returnNode; return true; } Uint32 IPCConfig::getREPHBFrequency(NodeId id) const { const Properties * tmp; Uint32 out; /** * Todo: Fix correct heartbeat */ if (!props->get("Node", id, &tmp) || !tmp->get("HeartbeatIntervalRepRep", &out)) { DEBUG("Illegal Node or HeartbeatIntervalRepRep in config."); out = 10000; } return out; } const char* IPCConfig::getNodeType(NodeId id) const { const char * out; const Properties * tmp; if (!props->get("Node", id, &tmp) || !tmp->get("Type", &out)) { DEBUG("Illegal Node or NodeType in config."); out = "Unknown"; } return out; } #include <mgmapi.h> Uint32 IPCConfig::configureTransporters(Uint32 nodeId, const class ndb_mgm_configuration & config, class TransporterRegistry & tr){ DBUG_ENTER("IPCConfig::configureTransporters"); Uint32 noOfTransportersCreated= 0; ndb_mgm_configuration_iterator iter(config, CFG_SECTION_CONNECTION); for(iter.first(); iter.valid(); iter.next()){ Uint32 nodeId1, nodeId2, remoteNodeId; const char * remoteHostName= 0, * localHostName= 0; if(iter.get(CFG_CONNECTION_NODE_1, &nodeId1)) continue; if(iter.get(CFG_CONNECTION_NODE_2, &nodeId2)) continue; if(nodeId1 != nodeId && nodeId2 != nodeId) continue; remoteNodeId = (nodeId == nodeId1 ? nodeId2 : nodeId1); { const char * host1= 0, * host2= 0; iter.get(CFG_CONNECTION_HOSTNAME_1, &host1); iter.get(CFG_CONNECTION_HOSTNAME_2, &host2); localHostName = (nodeId == nodeId1 ? host1 : host2); remoteHostName = (nodeId == nodeId1 ? host2 : host1); } Uint32 sendSignalId = 1; Uint32 checksum = 1; if(iter.get(CFG_CONNECTION_SEND_SIGNAL_ID, &sendSignalId)) continue; if(iter.get(CFG_CONNECTION_CHECKSUM, &checksum)) continue; Uint32 type = ~0; if(iter.get(CFG_TYPE_OF_SECTION, &type)) continue; Uint32 server_port= 0; if(iter.get(CFG_CONNECTION_SERVER_PORT, &server_port)) break; if (nodeId <= nodeId1 && nodeId <= nodeId2) { tr.add_transporter_interface(localHostName, server_port); } DBUG_PRINT("info", ("Transporter between this node %d and node %d using port %d, signalId %d, checksum %d", nodeId, remoteNodeId, tmp_server_port, sendSignalId, checksum)); switch(type){ case CONNECTION_TYPE_SHM:{ SHM_TransporterConfiguration conf; conf.localNodeId = nodeId; conf.remoteNodeId = remoteNodeId; conf.checksum = checksum; conf.signalId = sendSignalId; if(iter.get(CFG_SHM_KEY, &conf.shmKey)) break; if(iter.get(CFG_SHM_BUFFER_MEM, &conf.shmSize)) break; conf.port= server_port; if(!tr.createTransporter(&conf)){ DBUG_PRINT("error", ("Failed to create SCI Transporter from %d to %d", conf.localNodeId, conf.remoteNodeId)); ndbout << "Failed to create SHM Transporter from: " << conf.localNodeId << " to: " << conf.remoteNodeId << endl; } else { noOfTransportersCreated++; } DBUG_PRINT("info", ("Created SHM Transporter using shmkey %d, buf size = %d", conf.shmKey, conf.shmSize)); break; } case CONNECTION_TYPE_SCI:{ SCI_TransporterConfiguration conf; const char * host1, * host2; conf.localNodeId = nodeId; conf.remoteNodeId = remoteNodeId; conf.checksum = checksum; conf.signalId = sendSignalId; conf.port= tmp_server_port; if(iter.get(CFG_SCI_HOSTNAME_1, &host1)) break; if(iter.get(CFG_SCI_HOSTNAME_2, &host2)) break; conf.localHostName = (nodeId == nodeId1 ? host1 : host2); conf.remoteHostName = (nodeId == nodeId1 ? host2 : host1); if(iter.get(CFG_SCI_SEND_LIMIT, &conf.sendLimit)) break; if(iter.get(CFG_SCI_BUFFER_MEM, &conf.bufferSize)) break; if (nodeId == nodeId1) { if(iter.get(CFG_SCI_HOST2_ID_0, &conf.remoteSciNodeId0)) break; if(iter.get(CFG_SCI_HOST2_ID_1, &conf.remoteSciNodeId1)) break; } else { if(iter.get(CFG_SCI_HOST1_ID_0, &conf.remoteSciNodeId0)) break; if(iter.get(CFG_SCI_HOST1_ID_1, &conf.remoteSciNodeId1)) break; } if (conf.remoteSciNodeId1 == 0) { conf.nLocalAdapters = 1; } else { conf.nLocalAdapters = 2; } if(!tr.createTransporter(&conf)){ DBUG_PRINT("error", ("Failed to create SCI Transporter from %d to %d", conf.localNodeId, conf.remoteNodeId)); ndbout << "Failed to create SCI Transporter from: " << conf.localNodeId << " to: " << conf.remoteNodeId << endl; } else { DBUG_PRINT("info", ("Created SCI Transporter: Adapters = %d, remote SCI node id %d", conf.nLocalAdapters, conf.remoteSciNodeId0)); DBUG_PRINT("info", ("Host 1 = %s, Host 2 = %s, sendLimit = %d, buf size = %d", conf.localHostName, conf.remoteHostName, conf.sendLimit, conf.bufferSize)); if (conf.nLocalAdapters > 1) { DBUG_PRINT("info", ("Fault-tolerant with 2 Remote Adapters, second remote SCI node id = %d", conf.remoteSciNodeId1)); } noOfTransportersCreated++; continue; } } case CONNECTION_TYPE_TCP:{ TCP_TransporterConfiguration conf; if(iter.get(CFG_TCP_SEND_BUFFER_SIZE, &conf.sendBufferSize)) break; if(iter.get(CFG_TCP_RECEIVE_BUFFER_SIZE, &conf.maxReceiveSize)) break; conf.port= server_port; const char * proxy; if (!iter.get(CFG_TCP_PROXY, &proxy)) { if (strlen(proxy) > 0 && nodeId2 == nodeId) { // TODO handle host:port conf.port = atoi(proxy); } } conf.localNodeId = nodeId; conf.remoteNodeId = remoteNodeId; conf.localHostName = localHostName; conf.remoteHostName = remoteHostName; conf.checksum = checksum; conf.signalId = sendSignalId; if(!tr.createTransporter(&conf)){ ndbout << "Failed to create TCP Transporter from: " << nodeId << " to: " << remoteNodeId << endl; } else { noOfTransportersCreated++; } DBUG_PRINT("info", ("Created TCP Transporter: sendBufferSize = %d, maxReceiveSize = %d", conf.sendBufferSize, conf.maxReceiveSize)); break; case CONNECTION_TYPE_OSE:{ OSE_TransporterConfiguration conf; if(iter.get(CFG_OSE_PRIO_A_SIZE, &conf.prioASignalSize)) break; if(iter.get(CFG_OSE_PRIO_B_SIZE, &conf.prioBSignalSize)) break; if(iter.get(CFG_OSE_RECEIVE_ARRAY_SIZE, &conf.receiveBufferSize)) break; conf.localNodeId = nodeId; conf.remoteNodeId = remoteNodeId; conf.localHostName = localHostName; conf.remoteHostName = remoteHostName; conf.checksum = checksum; conf.signalId = sendSignalId; if(!tr.createTransporter(&conf)){ ndbout << "Failed to create OSE Transporter from: " << nodeId << " to: " << remoteNodeId << endl; } else { noOfTransportersCreated++; } } default: ndbout << "Unknown transporter type from: " << nodeId << " to: " << remoteNodeId << endl; break; } } } DBUG_RETURN(noOfTransportersCreated); }