Commit 8207d5ce authored by unknown's avatar unknown

Impl5 of WL2278 - dynamic port allocation for cluster nodes

Treat the management server specially.

It should always be the server in a transporter as we then have a known
port to connect to on node restart.

allows a mgm connection (i.e. to the management server port, our known port)
to be transformed into a transporter connection.

Also, clean up the struct TransporterConfiguration (used to be a struct for
each transporter type. now there's just one)


ndb/include/transporter/TransporterDefinitions.hpp:
  Clean up XXX_TransporterConfiguration and create one TransporterConfiguration structure.
  
  Makes things easier to read (especially in IPCConfig.cpp)
ndb/include/transporter/TransporterRegistry.hpp:
  add get_mgm_handle (to compliment the set_mgm_handle function)
  
  clean up createTransporter to use just one TransporterConfiguration struct
ndb/include/util/SocketClient.hpp:
  Introduce connect_without_auth() to ignore any authentication method that may have been set.
ndb/src/common/mgmcommon/IPCConfig.cpp:
  Remove dead IPCConfig::configureTransporters(TransporterRegistry*)
  
  Fixup IPCConfig::configureTransporters(Uint32 nodeId...)
  - use the 'one struct TransporterConfiguration to rule them all'
  - make MGM node the server
  - fix switch statement for transporter types
    - close } in strange place
    - possible inadvertent fall through
ndb/src/common/transporter/OSE_Transporter.cpp:
  a partial fix for the introduction of new parameters.
  
  OSE shouldn't build how it is now. Better to keep the build broken than have it build and fail strangely at runtime.
ndb/src/common/transporter/OSE_Transporter.hpp:
  a partial fix for the introduction of new parameters.
  
  OSE shouldn't build how it is now. Better to keep the build broken than have it build and fail strangely at runtime.
ndb/src/common/transporter/SCI_Transporter.cpp:
  should be correct for SCI transporter.
ndb/src/common/transporter/SCI_Transporter.hpp:
  should be correct for SCI transporter
ndb/src/common/transporter/SHM_Transporter.cpp:
  add new parameters for Transporter constructor
ndb/src/common/transporter/SHM_Transporter.hpp:
  add new parameters for Transporter constructor
ndb/src/common/transporter/TCP_Transporter.cpp:
  add new parameters for Transporter constructor
ndb/src/common/transporter/TCP_Transporter.hpp:
  add new parameters for Transporter constructor
ndb/src/common/transporter/Transporter.cpp:
  Add new parameters
   - isMgmConnection
  	requires transforming from mgm to transporter
   - serverNodeId
  	node id that will serve as the server
  
  Treat connection differently if isMgmConnection (send a special mgm command first)
ndb/src/common/transporter/Transporter.hpp:
  add fields to constructor
  
  add isMgmConnection member (if true, have to transform a mgm connection)
ndb/src/common/transporter/TransporterRegistry.cpp:
  createTransporter -> createTCPTransporter (etc)
  
  add extra transporter constructor parameters (from config)
  
  modify to use changes to TransporterConfiguration
ndb/src/common/util/SocketClient.cpp:
  SocketClient::connect_without_auth()
  
  Temporarily disables authentication and connects.
  This is useful if you're trying to change what this
  SocketClient object is for (e.g. from mgm to ndb)
ndb/src/common/util/SocketServer.cpp:
  Don't runSession or close socket when entering sessionThread if m_stopped
ndb/src/mgmsrv/ConfigInfo.cpp:
  fixPortNumber
  - Get port number from the MGM node as it will always be the server
ndb/src/mgmsrv/MgmtSrvr.cpp:
  transporter_connect(sockfd)
  - transform this mgm connection into a transporter connection
ndb/src/mgmsrv/MgmtSrvr.hpp:
  prototype for transporter_connect
ndb/src/mgmsrv/Services.cpp:
  add command: "transporter connect"
  
  stops the MgmApiSession and replaces it with a transporter connection
ndb/src/mgmsrv/Services.hpp:
  prototype for transporter_connect
parent 098fc224
...@@ -49,60 +49,41 @@ enum SendStatus { ...@@ -49,60 +49,41 @@ enum SendStatus {
const Uint32 MAX_MESSAGE_SIZE = (12+4+4+(4*25)+(3*4)+4*4096); const Uint32 MAX_MESSAGE_SIZE = (12+4+4+(4*25)+(3*4)+4*4096);
/** /**
* TCP Transporter Configuration * TransporterConfiguration
*
* used for setting up a transporter. the union member specific is for
* information specific to a transporter type.
*/ */
struct TCP_TransporterConfiguration { struct TransporterConfiguration {
Uint32 port; Uint32 port;
const char *remoteHostName; const char *remoteHostName;
const char *localHostName; const char *localHostName;
NodeId remoteNodeId; NodeId remoteNodeId;
NodeId localNodeId; NodeId localNodeId;
Uint32 sendBufferSize; // Size of SendBuffer of priority B NodeId serverNodeId;
Uint32 maxReceiveSize; // Maximum no of bytes to receive
bool checksum; bool checksum;
bool signalId; bool signalId;
}; bool isMgmConnection; // is a mgm connection, requires transforming
/** union { // Transporter specific configuration information
* SHM Transporter Configuration
*/
struct SHM_TransporterConfiguration {
Uint32 port;
const char *remoteHostName;
const char *localHostName;
NodeId remoteNodeId;
NodeId localNodeId;
bool checksum;
bool signalId;
struct {
Uint32 sendBufferSize; // Size of SendBuffer of priority B
Uint32 maxReceiveSize; // Maximum no of bytes to receive
} tcp;
struct {
Uint32 shmKey; Uint32 shmKey;
Uint32 shmSize; Uint32 shmSize;
int signum; int signum;
}; } shm;
/**
* OSE Transporter Configuration
*/
struct OSE_TransporterConfiguration {
const char *remoteHostName;
const char *localHostName;
NodeId remoteNodeId;
NodeId localNodeId;
bool checksum;
bool signalId;
struct {
Uint32 prioASignalSize; Uint32 prioASignalSize;
Uint32 prioBSignalSize; Uint32 prioBSignalSize;
Uint32 receiveBufferSize; // In number of signals } ose;
};
/** struct {
* SCI Transporter Configuration
*/
struct SCI_TransporterConfiguration {
const char *remoteHostName;
const char *localHostName;
Uint32 port;
Uint32 sendLimit; // Packet size Uint32 sendLimit; // Packet size
Uint32 bufferSize; // Buffer size Uint32 bufferSize; // Buffer size
...@@ -110,13 +91,8 @@ struct SCI_TransporterConfiguration { ...@@ -110,13 +91,8 @@ struct SCI_TransporterConfiguration {
Uint32 remoteSciNodeId0; // SCInodeId for adapter 1 Uint32 remoteSciNodeId0; // SCInodeId for adapter 1
Uint32 remoteSciNodeId1; // SCInodeId for adapter 2 Uint32 remoteSciNodeId1; // SCInodeId for adapter 2
} sci;
NodeId localNodeId; // Local node Id };
NodeId remoteNodeId; // Remote node Id
bool checksum;
bool signalId;
}; };
struct SignalHeader { struct SignalHeader {
......
...@@ -102,6 +102,7 @@ public: ...@@ -102,6 +102,7 @@ public:
unsigned sizeOfLongSignalMemory = 100); unsigned sizeOfLongSignalMemory = 100);
void set_mgm_handle(NdbMgmHandle h) { m_mgm_handle = h; }; void set_mgm_handle(NdbMgmHandle h) { m_mgm_handle = h; };
NdbMgmHandle get_mgm_handle(void) { return m_mgm_handle; };
bool init(NodeId localNodeId); bool init(NodeId localNodeId);
...@@ -179,10 +180,10 @@ public: ...@@ -179,10 +180,10 @@ public:
* started, startServer is called. A transporter of the selected kind * started, startServer is called. A transporter of the selected kind
* is created and it is put in the transporter arrays. * is created and it is put in the transporter arrays.
*/ */
bool createTransporter(struct TCP_TransporterConfiguration * config); bool createTCPTransporter(struct TransporterConfiguration * config);
bool createTransporter(struct SCI_TransporterConfiguration * config); bool createSCITransporter(struct TransporterConfiguration * config);
bool createTransporter(struct SHM_TransporterConfiguration * config); bool createSHMTransporter(struct TransporterConfiguration * config);
bool createTransporter(struct OSE_TransporterConfiguration * config); bool createOSETransporter(struct TransporterConfiguration * config);
/** /**
* prepareSend * prepareSend
......
...@@ -36,6 +36,7 @@ public: ...@@ -36,6 +36,7 @@ public:
m_servaddr.sin_port = htons(m_port); m_servaddr.sin_port = htons(m_port);
}; };
NDB_SOCKET_TYPE connect(); NDB_SOCKET_TYPE connect();
NDB_SOCKET_TYPE connect_without_auth();
bool close(); bool close();
}; };
......
...@@ -110,175 +110,6 @@ IPCConfig::addRemoteNodeId(NodeId nodeId){ ...@@ -110,175 +110,6 @@ IPCConfig::addRemoteNodeId(NodeId nodeId){
return true; return true;
} }
/**
* Returns no of transporters configured
*/
int
IPCConfig::configureTransporters(TransporterRegistry * theTransporterRegistry){
int noOfTransportersCreated = 0;
Uint32 noOfConnections;
if(!props->get("NoOfConnections", &noOfConnections)) return -1;
for (Uint32 i = 0; i < noOfConnections; i++){
const Properties * tmp;
Uint32 nodeId1, nodeId2;
const char * host1;
const char * host2;
if(!props->get("Connection", i, &tmp)) continue;
if(!tmp->get("NodeId1", &nodeId1)) continue;
if(!tmp->get("NodeId2", &nodeId2)) continue;
if(nodeId1 != the_ownId && nodeId2 != the_ownId) continue;
Uint32 sendSignalId;
Uint32 compression;
Uint32 checksum;
if(!tmp->get("SendSignalId", &sendSignalId)) continue;
if(!tmp->get("Checksum", &checksum)) continue;
const char * type;
if(!tmp->get("Type", &type)) continue;
if(strcmp("SHM", type) == 0){
SHM_TransporterConfiguration conf;
conf.localNodeId = the_ownId;
conf.remoteNodeId = (nodeId1 != the_ownId ? nodeId1 : nodeId2);
conf.checksum = checksum;
conf.signalId = sendSignalId;
if(!tmp->get("ShmKey", &conf.shmKey)) continue;
if(!tmp->get("ShmSize", &conf.shmSize)) continue;
if(!theTransporterRegistry->createTransporter(&conf)){
ndbout << "Failed to create SHM Transporter from: "
<< conf.localNodeId << " to: " << conf.remoteNodeId << endl;
continue;
} else {
noOfTransportersCreated++;
continue;
}
} else if(strcmp("SCI", type) == 0){
SCI_TransporterConfiguration conf;
conf.localNodeId = the_ownId;
conf.remoteNodeId = (nodeId1 != the_ownId ? nodeId1 : nodeId2);
conf.checksum = checksum;
conf.signalId = sendSignalId;
if(!tmp->get("SendLimit", &conf.sendLimit)) continue;
if(!tmp->get("SharedBufferSize", &conf.bufferSize)) continue;
if(the_ownId == nodeId1){
if(!tmp->get("Node1_NoOfAdapters", &conf.nLocalAdapters)) continue;
if(!tmp->get("Node2_Adapter", 0, &conf.remoteSciNodeId0)) continue;
if(conf.nLocalAdapters > 1){
if(!tmp->get("Node2_Adapter", 1, &conf.remoteSciNodeId1)) continue;
}
} else {
if(!tmp->get("Node2_NoOfAdapters", &conf.nLocalAdapters)) continue;
if(!tmp->get("Node1_Adapter", 0, &conf.remoteSciNodeId0)) continue;
if(conf.nLocalAdapters > 1){
if(!tmp->get("Node1_Adapter", 1, &conf.remoteSciNodeId1)) continue;
}
}
if(!theTransporterRegistry->createTransporter(&conf)){
ndbout << "Failed to create SCI Transporter from: "
<< conf.localNodeId << " to: " << conf.remoteNodeId << endl;
continue;
} else {
noOfTransportersCreated++;
continue;
}
}
if(!tmp->get("HostName1", &host1)) continue;
if(!tmp->get("HostName2", &host2)) continue;
Uint32 ownNodeId;
Uint32 remoteNodeId;
const char * ownHostName;
const char * remoteHostName;
if(nodeId1 == the_ownId){
ownNodeId = nodeId1;
ownHostName = host1;
remoteNodeId = nodeId2;
remoteHostName = host2;
} else if(nodeId2 == the_ownId){
ownNodeId = nodeId2;
ownHostName = host2;
remoteNodeId = nodeId1;
remoteHostName = host1;
} else {
continue;
}
if(strcmp("TCP", type) == 0){
TCP_TransporterConfiguration conf;
if(!tmp->get("PortNumber", &conf.port)) continue;
if(!tmp->get("SendBufferSize", &conf.sendBufferSize)) continue;
if(!tmp->get("MaxReceiveSize", &conf.maxReceiveSize)) continue;
const char * proxy;
if (tmp->get("Proxy", &proxy)) {
if (strlen(proxy) > 0 && nodeId2 == the_ownId) {
// TODO handle host:port
conf.port = atoi(proxy);
}
}
conf.sendBufferSize *= MAX_MESSAGE_SIZE;
conf.maxReceiveSize *= MAX_MESSAGE_SIZE;
conf.remoteHostName = remoteHostName;
conf.localHostName = ownHostName;
conf.remoteNodeId = remoteNodeId;
conf.localNodeId = ownNodeId;
conf.checksum = checksum;
conf.signalId = sendSignalId;
if(!theTransporterRegistry->createTransporter(&conf)){
ndbout << "Failed to create TCP Transporter from: "
<< ownNodeId << " to: " << remoteNodeId << endl;
} else {
noOfTransportersCreated++;
}
} else if(strcmp("OSE", type) == 0){
OSE_TransporterConfiguration conf;
if(!tmp->get("PrioASignalSize", &conf.prioASignalSize))
continue;
if(!tmp->get("PrioBSignalSize", &conf.prioBSignalSize))
continue;
if(!tmp->get("ReceiveArraySize", &conf.receiveBufferSize))
continue;
conf.remoteHostName = remoteHostName;
conf.localHostName = ownHostName;
conf.remoteNodeId = remoteNodeId;
conf.localNodeId = ownNodeId;
conf.checksum = checksum;
conf.signalId = sendSignalId;
if(!theTransporterRegistry->createTransporter(&conf)){
ndbout << "Failed to create OSE Transporter from: "
<< ownNodeId << " to: " << remoteNodeId << endl;
} else {
noOfTransportersCreated++;
}
} else {
continue;
}
}
return noOfTransportersCreated;
}
/** /**
* Supply a nodeId, * Supply a nodeId,
* and get next higher node id * and get next higher node id
...@@ -335,6 +166,8 @@ Uint32 ...@@ -335,6 +166,8 @@ Uint32
IPCConfig::configureTransporters(Uint32 nodeId, IPCConfig::configureTransporters(Uint32 nodeId,
const class ndb_mgm_configuration & config, const class ndb_mgm_configuration & config,
class TransporterRegistry & tr){ class TransporterRegistry & tr){
TransporterConfiguration conf;
DBUG_ENTER("IPCConfig::configureTransporters"); DBUG_ENTER("IPCConfig::configureTransporters");
Uint32 noOfTransportersCreated= 0; Uint32 noOfTransportersCreated= 0;
...@@ -368,9 +201,35 @@ IPCConfig::configureTransporters(Uint32 nodeId, ...@@ -368,9 +201,35 @@ IPCConfig::configureTransporters(Uint32 nodeId,
Uint32 server_port= 0; Uint32 server_port= 0;
if(iter.get(CFG_CONNECTION_SERVER_PORT, &server_port)) break; if(iter.get(CFG_CONNECTION_SERVER_PORT, &server_port)) break;
if (nodeId <= nodeId1 && nodeId <= nodeId2) {
/*
We check the node type. MGM node becomes server.
*/
Uint32 node1type, node2type;
ndb_mgm_configuration_iterator node1iter(config, CFG_SECTION_NODE);
ndb_mgm_configuration_iterator node2iter(config, CFG_SECTION_NODE);
node1iter.find(CFG_NODE_ID,nodeId1);
node2iter.find(CFG_NODE_ID,nodeId2);
node1iter.get(CFG_TYPE_OF_SECTION,&node1type);
node2iter.get(CFG_TYPE_OF_SECTION,&node2type);
conf.serverNodeId= (nodeId1 < nodeId2)? nodeId1:nodeId2;
conf.isMgmConnection= false;
if(node2type==NODE_TYPE_MGM)
{
conf.isMgmConnection= true;
conf.serverNodeId= nodeId2;
}
else if(node1type==NODE_TYPE_MGM)
{
conf.isMgmConnection= true;
conf.serverNodeId= nodeId1;
}
else if (nodeId == conf.serverNodeId) {
tr.add_transporter_interface(remoteNodeId, localHostName, server_port); tr.add_transporter_interface(remoteNodeId, localHostName, server_port);
} }
DBUG_PRINT("info", ("Transporter between this node %d and node %d using port %d, signalId %d, checksum %d", DBUG_PRINT("info", ("Transporter between this node %d and node %d using port %d, signalId %d, checksum %d",
nodeId, remoteNodeId, server_port, sendSignalId, checksum)); nodeId, remoteNodeId, server_port, sendSignalId, checksum));
/* /*
...@@ -386,27 +245,24 @@ IPCConfig::configureTransporters(Uint32 nodeId, ...@@ -386,27 +245,24 @@ IPCConfig::configureTransporters(Uint32 nodeId,
if((int)server_port<0) if((int)server_port<0)
server_port= -server_port; server_port= -server_port;
switch(type){
case CONNECTION_TYPE_SHM:{
SHM_TransporterConfiguration conf;
conf.localNodeId = nodeId; conf.localNodeId = nodeId;
conf.remoteNodeId = remoteNodeId; conf.remoteNodeId = remoteNodeId;
conf.checksum = checksum; conf.checksum = checksum;
conf.signalId = sendSignalId; conf.signalId = sendSignalId;
conf.port = server_port;
conf.localHostName = localHostName;
conf.remoteHostName = remoteHostName;
switch(type){
case CONNECTION_TYPE_SHM:
if(iter.get(CFG_SHM_KEY, &conf.shm.shmKey)) break;
if(iter.get(CFG_SHM_BUFFER_MEM, &conf.shm.shmSize)) break;
if(iter.get(CFG_SHM_KEY, &conf.shmKey)) break;
if(iter.get(CFG_SHM_BUFFER_MEM, &conf.shmSize)) break;
{
Uint32 tmp; Uint32 tmp;
if(iter.get(CFG_SHM_SIGNUM, &tmp)) break; if(iter.get(CFG_SHM_SIGNUM, &tmp)) break;
conf.signum= tmp; conf.shm.signum= tmp;
}
conf.port= server_port;
conf.localHostName = localHostName;
conf.remoteHostName = remoteHostName;
if(!tr.createTransporter(&conf)){ if(!tr.createSHMTransporter(&conf)){
DBUG_PRINT("error", ("Failed to create SHM Transporter from %d to %d", DBUG_PRINT("error", ("Failed to create SHM Transporter from %d to %d",
conf.localNodeId, conf.remoteNodeId)); conf.localNodeId, conf.remoteNodeId));
ndbout << "Failed to create SHM Transporter from: " ndbout << "Failed to create SHM Transporter from: "
...@@ -414,60 +270,53 @@ IPCConfig::configureTransporters(Uint32 nodeId, ...@@ -414,60 +270,53 @@ IPCConfig::configureTransporters(Uint32 nodeId,
} else { } else {
noOfTransportersCreated++; noOfTransportersCreated++;
} }
DBUG_PRINT("info", ("Created SHM Transporter using shmkey %d, buf size = %d", DBUG_PRINT("info", ("Created SHM Transporter using shmkey %d, "
conf.shmKey, conf.shmSize)); "buf size = %d", conf.shm.shmKey, conf.shm.shmSize));
break;
}
case CONNECTION_TYPE_SCI:{
SCI_TransporterConfiguration conf;
conf.localNodeId = nodeId;
conf.remoteNodeId = remoteNodeId;
conf.checksum = checksum;
conf.signalId = sendSignalId;
conf.port= server_port;
conf.localHostName = localHostName; break;
conf.remoteHostName = remoteHostName;
if(iter.get(CFG_SCI_SEND_LIMIT, &conf.sendLimit)) break; case CONNECTION_TYPE_SCI:
if(iter.get(CFG_SCI_BUFFER_MEM, &conf.bufferSize)) break; if(iter.get(CFG_SCI_SEND_LIMIT, &conf.sci.sendLimit)) break;
if(iter.get(CFG_SCI_BUFFER_MEM, &conf.sci.bufferSize)) break;
if (nodeId == nodeId1) { if (nodeId == nodeId1) {
if(iter.get(CFG_SCI_HOST2_ID_0, &conf.remoteSciNodeId0)) break; if(iter.get(CFG_SCI_HOST2_ID_0, &conf.sci.remoteSciNodeId0)) break;
if(iter.get(CFG_SCI_HOST2_ID_1, &conf.remoteSciNodeId1)) break; if(iter.get(CFG_SCI_HOST2_ID_1, &conf.sci.remoteSciNodeId1)) break;
} else { } else {
if(iter.get(CFG_SCI_HOST1_ID_0, &conf.remoteSciNodeId0)) break; if(iter.get(CFG_SCI_HOST1_ID_0, &conf.sci.remoteSciNodeId0)) break;
if(iter.get(CFG_SCI_HOST1_ID_1, &conf.remoteSciNodeId1)) break; if(iter.get(CFG_SCI_HOST1_ID_1, &conf.sci.remoteSciNodeId1)) break;
} }
if (conf.remoteSciNodeId1 == 0) { if (conf.sci.remoteSciNodeId1 == 0) {
conf.nLocalAdapters = 1; conf.sci.nLocalAdapters = 1;
} else { } else {
conf.nLocalAdapters = 2; conf.sci.nLocalAdapters = 2;
} }
if(!tr.createTransporter(&conf)){ if(!tr.createSCITransporter(&conf)){
DBUG_PRINT("error", ("Failed to create SCI Transporter from %d to %d", DBUG_PRINT("error", ("Failed to create SCI Transporter from %d to %d",
conf.localNodeId, conf.remoteNodeId)); conf.localNodeId, conf.remoteNodeId));
ndbout << "Failed to create SCI Transporter from: " ndbout << "Failed to create SCI Transporter from: "
<< conf.localNodeId << " to: " << conf.remoteNodeId << endl; << conf.localNodeId << " to: " << conf.remoteNodeId << endl;
} else { } else {
DBUG_PRINT("info", ("Created SCI Transporter: Adapters = %d, remote SCI node id %d", DBUG_PRINT("info", ("Created SCI Transporter: Adapters = %d, "
conf.nLocalAdapters, conf.remoteSciNodeId0)); "remote SCI node id %d",
DBUG_PRINT("info", ("Host 1 = %s, Host 2 = %s, sendLimit = %d, buf size = %d", conf.sci.nLocalAdapters, conf.sci.remoteSciNodeId0));
conf.localHostName, conf.remoteHostName, conf.sendLimit, conf.bufferSize)); DBUG_PRINT("info", ("Host 1 = %s, Host 2 = %s, sendLimit = %d, "
if (conf.nLocalAdapters > 1) { "buf size = %d", conf.localHostName,
DBUG_PRINT("info", ("Fault-tolerant with 2 Remote Adapters, second remote SCI node id = %d", conf.remoteHostName, conf.sci.sendLimit,
conf.remoteSciNodeId1)); conf.sci.bufferSize));
if (conf.sci.nLocalAdapters > 1) {
DBUG_PRINT("info", ("Fault-tolerant with 2 Remote Adapters, "
"second remote SCI node id = %d",
conf.sci.remoteSciNodeId1));
} }
noOfTransportersCreated++; noOfTransportersCreated++;
continue; continue;
} }
} break;
case CONNECTION_TYPE_TCP:{
TCP_TransporterConfiguration conf;
if(iter.get(CFG_TCP_SEND_BUFFER_SIZE, &conf.sendBufferSize)) break; case CONNECTION_TYPE_TCP:
if(iter.get(CFG_TCP_RECEIVE_BUFFER_SIZE, &conf.maxReceiveSize)) break; if(iter.get(CFG_TCP_SEND_BUFFER_SIZE, &conf.tcp.sendBufferSize)) break;
if(iter.get(CFG_TCP_RECEIVE_BUFFER_SIZE, &conf.tcp.maxReceiveSize)) break;
conf.port= server_port;
const char * proxy; const char * proxy;
if (!iter.get(CFG_TCP_PROXY, &proxy)) { if (!iter.get(CFG_TCP_PROXY, &proxy)) {
if (strlen(proxy) > 0 && nodeId2 == nodeId) { if (strlen(proxy) > 0 && nodeId2 == nodeId) {
...@@ -476,50 +325,35 @@ IPCConfig::configureTransporters(Uint32 nodeId, ...@@ -476,50 +325,35 @@ IPCConfig::configureTransporters(Uint32 nodeId,
} }
} }
conf.localNodeId = nodeId; if(!tr.createTCPTransporter(&conf)){
conf.remoteNodeId = remoteNodeId;
conf.localHostName = localHostName;
conf.remoteHostName = remoteHostName;
conf.checksum = checksum;
conf.signalId = sendSignalId;
if(!tr.createTransporter(&conf)){
ndbout << "Failed to create TCP Transporter from: " ndbout << "Failed to create TCP Transporter from: "
<< nodeId << " to: " << remoteNodeId << endl; << nodeId << " to: " << remoteNodeId << endl;
} else { } else {
noOfTransportersCreated++; noOfTransportersCreated++;
} }
DBUG_PRINT("info", ("Created TCP Transporter: sendBufferSize = %d, maxReceiveSize = %d", DBUG_PRINT("info", ("Created TCP Transporter: sendBufferSize = %d, "
conf.sendBufferSize, conf.maxReceiveSize)); "maxReceiveSize = %d", conf.tcp.sendBufferSize,
conf.tcp.maxReceiveSize));
break; break;
case CONNECTION_TYPE_OSE:{ case CONNECTION_TYPE_OSE:
OSE_TransporterConfiguration conf; if(iter.get(CFG_OSE_PRIO_A_SIZE, &conf.ose.prioASignalSize)) break;
if(iter.get(CFG_OSE_PRIO_B_SIZE, &conf.ose.prioBSignalSize)) break;
if(iter.get(CFG_OSE_PRIO_A_SIZE, &conf.prioASignalSize)) break;
if(iter.get(CFG_OSE_PRIO_B_SIZE, &conf.prioBSignalSize)) break;
if(iter.get(CFG_OSE_RECEIVE_ARRAY_SIZE, &conf.receiveBufferSize)) break;
conf.localNodeId = nodeId;
conf.remoteNodeId = remoteNodeId;
conf.localHostName = localHostName;
conf.remoteHostName = remoteHostName;
conf.checksum = checksum;
conf.signalId = sendSignalId;
if(!tr.createTransporter(&conf)){ if(!tr.createOSETransporter(&conf)){
ndbout << "Failed to create OSE Transporter from: " ndbout << "Failed to create OSE Transporter from: "
<< nodeId << " to: " << remoteNodeId << endl; << nodeId << " to: " << remoteNodeId << endl;
} else { } else {
noOfTransportersCreated++; noOfTransportersCreated++;
} }
} break;
default: default:
ndbout << "Unknown transporter type from: " << nodeId << ndbout << "Unknown transporter type from: " << nodeId <<
" to: " << remoteNodeId << endl; " to: " << remoteNodeId << endl;
break; break;
} } // switch
} } // for
}
DBUG_RETURN(noOfTransportersCreated); DBUG_RETURN(noOfTransportersCreated);
} }
...@@ -32,6 +32,7 @@ OSE_Transporter::OSE_Transporter(int _prioASignalSize, ...@@ -32,6 +32,7 @@ OSE_Transporter::OSE_Transporter(int _prioASignalSize,
NodeId localNodeId, NodeId localNodeId,
const char * lHostName, const char * lHostName,
NodeId remoteNodeId, NodeId remoteNodeId,
NodeId serverNodeId,
const char * rHostName, const char * rHostName,
int byteorder, int byteorder,
bool compression, bool compression,
...@@ -40,6 +41,7 @@ OSE_Transporter::OSE_Transporter(int _prioASignalSize, ...@@ -40,6 +41,7 @@ OSE_Transporter::OSE_Transporter(int _prioASignalSize,
Uint32 reportFreq) : Uint32 reportFreq) :
Transporter(localNodeId, Transporter(localNodeId,
remoteNodeId, remoteNodeId,
serverNodeId,
byteorder, byteorder,
compression, compression,
checksum, checksum,
......
...@@ -48,6 +48,7 @@ public: ...@@ -48,6 +48,7 @@ public:
NodeId localNodeId, NodeId localNodeId,
const char * lHostName, const char * lHostName,
NodeId remoteNodeId, NodeId remoteNodeId,
NodeId serverNodeId,
const char * rHostName, const char * rHostName,
int byteorder, int byteorder,
bool compression, bool compression,
......
...@@ -34,6 +34,7 @@ SCI_Transporter::SCI_Transporter(TransporterRegistry &t_reg, ...@@ -34,6 +34,7 @@ SCI_Transporter::SCI_Transporter(TransporterRegistry &t_reg,
const char *lHostName, const char *lHostName,
const char *rHostName, const char *rHostName,
int r_port, int r_port,
bool isMgmConnection,
Uint32 packetSize, Uint32 packetSize,
Uint32 bufferSize, Uint32 bufferSize,
Uint32 nAdapters, Uint32 nAdapters,
...@@ -41,12 +42,13 @@ SCI_Transporter::SCI_Transporter(TransporterRegistry &t_reg, ...@@ -41,12 +42,13 @@ SCI_Transporter::SCI_Transporter(TransporterRegistry &t_reg,
Uint16 remoteSciNodeId1, Uint16 remoteSciNodeId1,
NodeId _localNodeId, NodeId _localNodeId,
NodeId _remoteNodeId, NodeId _remoteNodeId,
NodeId serverNodeId,
bool chksm, bool chksm,
bool signalId, bool signalId,
Uint32 reportFreq) : Uint32 reportFreq) :
Transporter(t_reg, tt_SCI_TRANSPORTER, Transporter(t_reg, tt_SCI_TRANSPORTER,
lHostName, rHostName, r_port, _localNodeId, lHostName, rHostName, r_port, isMgmConnection, _localNodeId,
_remoteNodeId, 0, false, chksm, signalId) _remoteNodeId, serverNodeID, 0, false, chksm, signalId)
{ {
DBUG_ENTER("SCI_Transporter::SCI_Transporter"); DBUG_ENTER("SCI_Transporter::SCI_Transporter");
m_PacketSize = (packetSize + 3)/4 ; m_PacketSize = (packetSize + 3)/4 ;
......
...@@ -139,6 +139,7 @@ private: ...@@ -139,6 +139,7 @@ private:
const char *local_host, const char *local_host,
const char *remote_host, const char *remote_host,
int port, int port,
bool isMgmConnection,
Uint32 packetSize, Uint32 packetSize,
Uint32 bufferSize, Uint32 bufferSize,
Uint32 nAdapters, Uint32 nAdapters,
...@@ -146,6 +147,7 @@ private: ...@@ -146,6 +147,7 @@ private:
Uint16 remoteSciNodeId1, Uint16 remoteSciNodeId1,
NodeId localNodeID, NodeId localNodeID,
NodeId remoteNodeID, NodeId remoteNodeID,
NodeId serverNodeId,
bool checksum, bool checksum,
bool signalId, bool signalId,
Uint32 reportFreq = 4096); Uint32 reportFreq = 4096);
......
...@@ -32,14 +32,17 @@ SHM_Transporter::SHM_Transporter(TransporterRegistry &t_reg, ...@@ -32,14 +32,17 @@ SHM_Transporter::SHM_Transporter(TransporterRegistry &t_reg,
const char *lHostName, const char *lHostName,
const char *rHostName, const char *rHostName,
int r_port, int r_port,
bool isMgmConnection,
NodeId lNodeId, NodeId lNodeId,
NodeId rNodeId, NodeId rNodeId,
NodeId serverNodeId,
bool checksum, bool checksum,
bool signalId, bool signalId,
key_t _shmKey, key_t _shmKey,
Uint32 _shmSize) : Uint32 _shmSize) :
Transporter(t_reg, tt_SHM_TRANSPORTER, Transporter(t_reg, tt_SHM_TRANSPORTER,
lHostName, rHostName, r_port, lNodeId, rNodeId, lHostName, rHostName, r_port, isMgmConnection,
lNodeId, rNodeId, serverNodeId,
0, false, checksum, signalId), 0, false, checksum, signalId),
shmKey(_shmKey), shmKey(_shmKey),
shmSize(_shmSize) shmSize(_shmSize)
......
...@@ -36,8 +36,10 @@ public: ...@@ -36,8 +36,10 @@ public:
const char *lHostName, const char *lHostName,
const char *rHostName, const char *rHostName,
int r_port, int r_port,
bool isMgmConnection,
NodeId lNodeId, NodeId lNodeId,
NodeId rNodeId, NodeId rNodeId,
NodeId serverNodeId,
bool checksum, bool checksum,
bool signalId, bool signalId,
key_t shmKey, key_t shmKey,
......
...@@ -68,12 +68,15 @@ TCP_Transporter::TCP_Transporter(TransporterRegistry &t_reg, ...@@ -68,12 +68,15 @@ TCP_Transporter::TCP_Transporter(TransporterRegistry &t_reg,
const char *lHostName, const char *lHostName,
const char *rHostName, const char *rHostName,
int r_port, int r_port,
bool isMgmConnection,
NodeId lNodeId, NodeId lNodeId,
NodeId rNodeId, NodeId rNodeId,
NodeId serverNodeId,
bool chksm, bool signalId, bool chksm, bool signalId,
Uint32 _reportFreq) : Uint32 _reportFreq) :
Transporter(t_reg, tt_TCP_TRANSPORTER, Transporter(t_reg, tt_TCP_TRANSPORTER,
lHostName, rHostName, r_port, lNodeId, rNodeId, lHostName, rHostName, r_port, isMgmConnection,
lNodeId, rNodeId, serverNodeId,
0, false, chksm, signalId), 0, false, chksm, signalId),
m_sendBuffer(sendBufSize) m_sendBuffer(sendBufSize)
{ {
......
...@@ -50,8 +50,10 @@ private: ...@@ -50,8 +50,10 @@ private:
const char *lHostName, const char *lHostName,
const char *rHostName, const char *rHostName,
int r_port, int r_port,
bool isMgmConnection,
NodeId lHostId, NodeId lHostId,
NodeId rHostId, NodeId rHostId,
NodeId serverNodeId,
bool checksum, bool signalId, bool checksum, bool signalId,
Uint32 reportFreq = 4096); Uint32 reportFreq = 4096);
......
...@@ -32,12 +32,14 @@ Transporter::Transporter(TransporterRegistry &t_reg, ...@@ -32,12 +32,14 @@ Transporter::Transporter(TransporterRegistry &t_reg,
const char *lHostName, const char *lHostName,
const char *rHostName, const char *rHostName,
int r_port, int r_port,
bool _isMgmConnection,
NodeId lNodeId, NodeId lNodeId,
NodeId rNodeId, NodeId rNodeId,
NodeId serverNodeId,
int _byteorder, int _byteorder,
bool _compression, bool _checksum, bool _signalId) bool _compression, bool _checksum, bool _signalId)
: m_r_port(r_port), remoteNodeId(rNodeId), localNodeId(lNodeId), : m_r_port(r_port), remoteNodeId(rNodeId), localNodeId(lNodeId),
isServer(lNodeId < rNodeId), isServer(lNodeId==serverNodeId), isMgmConnection(_isMgmConnection),
m_packer(_signalId, _checksum), m_packer(_signalId, _checksum),
m_type(_type), m_type(_type),
m_transporter_registry(t_reg) m_transporter_registry(t_reg)
...@@ -109,22 +111,46 @@ Transporter::connect_server(NDB_SOCKET_TYPE sockfd) { ...@@ -109,22 +111,46 @@ Transporter::connect_server(NDB_SOCKET_TYPE sockfd) {
bool bool
Transporter::connect_client() { Transporter::connect_client() {
NDB_SOCKET_TYPE sockfd;
if(m_connected) if(m_connected)
return true; return true;
NDB_SOCKET_TYPE sockfd = m_socket_client->connect();
if (sockfd == NDB_INVALID_SOCKET) DBUG_ENTER("Transporter::connect_client");
DBUG_PRINT("info",("port %d isMgmConnection=%d",m_r_port,isMgmConnection));
if(isMgmConnection)
sockfd= m_socket_client->connect_without_auth();
else
sockfd= m_socket_client->connect();
if(sockfd<0)
return false; return false;
DBUG_ENTER("Transporter::connect_client"); SocketOutputStream s_output(sockfd);
SocketInputStream s_input(sockfd);
if(isMgmConnection)
{
/*
We issue the magic command to the management server to
switch into transporter mode.
*/
s_output.println("transporter connect");
s_output.println("");
}
if (sockfd == NDB_INVALID_SOCKET)
return false;
// send info about own id // send info about own id
// send info about own transporter type // send info about own transporter type
SocketOutputStream s_output(sockfd);
s_output.println("%d %d", localNodeId, m_type); s_output.println("%d %d", localNodeId, m_type);
// get remote id // get remote id
int nodeId, remote_transporter_type= -1; int nodeId, remote_transporter_type= -1;
SocketInputStream s_input(sockfd);
char buf[256]; char buf[256];
if (s_input.gets(buf, 256) == 0) { if (s_input.gets(buf, 256) == 0) {
NDB_CLOSE_SOCKET(sockfd); NDB_CLOSE_SOCKET(sockfd);
......
...@@ -89,8 +89,10 @@ protected: ...@@ -89,8 +89,10 @@ protected:
const char *lHostName, const char *lHostName,
const char *rHostName, const char *rHostName,
int r_port, int r_port,
bool isMgmConnection,
NodeId lNodeId, NodeId lNodeId,
NodeId rNodeId, NodeId rNodeId,
NodeId serverNodeId,
int byteorder, int byteorder,
bool compression, bool compression,
bool checksum, bool checksum,
...@@ -133,6 +135,12 @@ protected: ...@@ -133,6 +135,12 @@ protected:
private: private:
/**
* means that we transform an MGM connection into
* a transporter connection
*/
bool isMgmConnection;
SocketClient *m_socket_client; SocketClient *m_socket_client;
protected: protected:
......
...@@ -248,7 +248,7 @@ TransporterRegistry::connect_server(NDB_SOCKET_TYPE sockfd) ...@@ -248,7 +248,7 @@ TransporterRegistry::connect_server(NDB_SOCKET_TYPE sockfd)
} }
bool bool
TransporterRegistry::createTransporter(TCP_TransporterConfiguration *config) { TransporterRegistry::createTCPTransporter(TransporterConfiguration *config) {
#ifdef NDB_TCP_TRANSPORTER #ifdef NDB_TCP_TRANSPORTER
if(!nodeIdSpecified){ if(!nodeIdSpecified){
...@@ -262,13 +262,15 @@ TransporterRegistry::createTransporter(TCP_TransporterConfiguration *config) { ...@@ -262,13 +262,15 @@ TransporterRegistry::createTransporter(TCP_TransporterConfiguration *config) {
return false; return false;
TCP_Transporter * t = new TCP_Transporter(*this, TCP_Transporter * t = new TCP_Transporter(*this,
config->sendBufferSize, config->tcp.sendBufferSize,
config->maxReceiveSize, config->tcp.maxReceiveSize,
config->localHostName, config->localHostName,
config->remoteHostName, config->remoteHostName,
config->port, config->port,
config->isMgmConnection,
localNodeId, localNodeId,
config->remoteNodeId, config->remoteNodeId,
config->serverNodeId,
config->checksum, config->checksum,
config->signalId); config->signalId);
if (t == NULL) if (t == NULL)
...@@ -297,7 +299,7 @@ TransporterRegistry::createTransporter(TCP_TransporterConfiguration *config) { ...@@ -297,7 +299,7 @@ TransporterRegistry::createTransporter(TCP_TransporterConfiguration *config) {
} }
bool bool
TransporterRegistry::createTransporter(OSE_TransporterConfiguration *conf) { TransporterRegistry::createOSETransporter(TransporterConfiguration *conf) {
#ifdef NDB_OSE_TRANSPORTER #ifdef NDB_OSE_TRANSPORTER
if(!nodeIdSpecified){ if(!nodeIdSpecified){
...@@ -316,11 +318,12 @@ TransporterRegistry::createTransporter(OSE_TransporterConfiguration *conf) { ...@@ -316,11 +318,12 @@ TransporterRegistry::createTransporter(OSE_TransporterConfiguration *conf) {
localNodeId); localNodeId);
} }
OSE_Transporter * t = new OSE_Transporter(conf->prioASignalSize, OSE_Transporter * t = new OSE_Transporter(conf->ose.prioASignalSize,
conf->prioBSignalSize, conf->ose.prioBSignalSize,
localNodeId, localNodeId,
conf->localHostName, conf->localHostName,
conf->remoteNodeId, conf->remoteNodeId,
conf->serverNodeId,
conf->remoteHostName, conf->remoteHostName,
conf->checksum, conf->checksum,
conf->signalId); conf->signalId);
...@@ -346,7 +349,7 @@ TransporterRegistry::createTransporter(OSE_TransporterConfiguration *conf) { ...@@ -346,7 +349,7 @@ TransporterRegistry::createTransporter(OSE_TransporterConfiguration *conf) {
} }
bool bool
TransporterRegistry::createTransporter(SCI_TransporterConfiguration *config) { TransporterRegistry::createSCITransporter(TransporterConfiguration *config) {
#ifdef NDB_SCI_TRANSPORTER #ifdef NDB_SCI_TRANSPORTER
if(!SCI_Transporter::initSCI()) if(!SCI_Transporter::initSCI())
...@@ -366,13 +369,15 @@ TransporterRegistry::createTransporter(SCI_TransporterConfiguration *config) { ...@@ -366,13 +369,15 @@ TransporterRegistry::createTransporter(SCI_TransporterConfiguration *config) {
config->localHostName, config->localHostName,
config->remoteHostName, config->remoteHostName,
config->port, config->port,
config->sendLimit, config->isMgmConnection,
config->bufferSize, config->sci.sendLimit,
config->nLocalAdapters, config->sci.bufferSize,
config->remoteSciNodeId0, config->sci.nLocalAdapters,
config->remoteSciNodeId1, config->sci.remoteSciNodeId0,
config->sci.remoteSciNodeId1,
localNodeId, localNodeId,
config->remoteNodeId, config->remoteNodeId,
config->serverNodeId,
config->checksum, config->checksum,
config->signalId); config->signalId);
...@@ -397,7 +402,7 @@ TransporterRegistry::createTransporter(SCI_TransporterConfiguration *config) { ...@@ -397,7 +402,7 @@ TransporterRegistry::createTransporter(SCI_TransporterConfiguration *config) {
} }
bool bool
TransporterRegistry::createTransporter(SHM_TransporterConfiguration *config) { TransporterRegistry::createSHMTransporter(TransporterConfiguration *config) {
DBUG_ENTER("TransporterRegistry::createTransporter SHM"); DBUG_ENTER("TransporterRegistry::createTransporter SHM");
#ifdef NDB_SHM_TRANSPORTER #ifdef NDB_SHM_TRANSPORTER
if(!nodeIdSpecified){ if(!nodeIdSpecified){
...@@ -408,7 +413,7 @@ TransporterRegistry::createTransporter(SHM_TransporterConfiguration *config) { ...@@ -408,7 +413,7 @@ TransporterRegistry::createTransporter(SHM_TransporterConfiguration *config) {
return false; return false;
if (!g_ndb_shm_signum) { if (!g_ndb_shm_signum) {
g_ndb_shm_signum= config->signum; g_ndb_shm_signum= config->shm.signum;
DBUG_PRINT("info",("Block signum %d",g_ndb_shm_signum)); DBUG_PRINT("info",("Block signum %d",g_ndb_shm_signum));
/** /**
* Make sure to block g_ndb_shm_signum * Make sure to block g_ndb_shm_signum
...@@ -420,7 +425,7 @@ TransporterRegistry::createTransporter(SHM_TransporterConfiguration *config) { ...@@ -420,7 +425,7 @@ TransporterRegistry::createTransporter(SHM_TransporterConfiguration *config) {
pthread_sigmask(SIG_BLOCK, &mask, 0); pthread_sigmask(SIG_BLOCK, &mask, 0);
} }
if(config->signum != g_ndb_shm_signum) if(config->shm.signum != g_ndb_shm_signum)
return false; return false;
if(theTransporters[config->remoteNodeId] != NULL) if(theTransporters[config->remoteNodeId] != NULL)
...@@ -430,12 +435,14 @@ TransporterRegistry::createTransporter(SHM_TransporterConfiguration *config) { ...@@ -430,12 +435,14 @@ TransporterRegistry::createTransporter(SHM_TransporterConfiguration *config) {
config->localHostName, config->localHostName,
config->remoteHostName, config->remoteHostName,
config->port, config->port,
config->isMgmConnection,
localNodeId, localNodeId,
config->remoteNodeId, config->remoteNodeId,
config->serverNodeId,
config->checksum, config->checksum,
config->signalId, config->signalId,
config->shmKey, config->shm.shmKey,
config->shmSize config->shm.shmSize
); );
if (t == NULL) if (t == NULL)
return false; return false;
......
...@@ -60,6 +60,27 @@ SocketClient::init() ...@@ -60,6 +60,27 @@ SocketClient::init()
return true; return true;
} }
/**
* SocketClient::connect_without_auth()
*
* Temporarily disables authentication and connects.
* This is useful if you're trying to change what this
* SocketClient object is for (e.g. from mgm to ndb)
*/
NDB_SOCKET_TYPE
SocketClient::connect_without_auth()
{
SocketAuthenticator *tmp;
NDB_SOCKET_TYPE retval;
tmp= m_auth;
m_auth= NULL;
retval= connect();
m_auth= tmp;
return retval;
}
NDB_SOCKET_TYPE NDB_SOCKET_TYPE
SocketClient::connect() SocketClient::connect()
{ {
......
...@@ -333,12 +333,19 @@ sessionThread_C(void* _sc){ ...@@ -333,12 +333,19 @@ sessionThread_C(void* _sc){
return 0; return 0;
} }
/**
* may have m_stopped set if we're transforming a mgm
* connection into a transporter connection.
*/
if(!si->m_stopped)
{
if(!si->m_stop){ if(!si->m_stop){
si->m_stopped = false; si->m_stopped = false;
si->runSession(); si->runSession();
} else { } else {
NDB_CLOSE_SOCKET(si->m_socket); NDB_CLOSE_SOCKET(si->m_socket);
} }
}
si->m_stopped = true; si->m_stopped = true;
my_thread_end(); my_thread_end();
......
...@@ -3132,8 +3132,8 @@ fixPortNumber(InitConfigFileParser::Context & ctx, const char * data){ ...@@ -3132,8 +3132,8 @@ fixPortNumber(InitConfigFileParser::Context & ctx, const char * data){
const Properties * node; const Properties * node;
require(ctx.m_config->get("Node", id1, &node)); require(ctx.m_config->get("Node", id1, &node));
BaseString hostname(hostName1); BaseString hostname(hostName1);
// require(node->get("HostName", hostname));
if (hostname.c_str()[0] == 0) { if (hostname.c_str()[0] == 0) {
ctx.reportError("Hostname required on nodeid %d since it will " ctx.reportError("Hostname required on nodeid %d since it will "
...@@ -3142,6 +3142,19 @@ fixPortNumber(InitConfigFileParser::Context & ctx, const char * data){ ...@@ -3142,6 +3142,19 @@ fixPortNumber(InitConfigFileParser::Context & ctx, const char * data){
} }
Uint32 port= 0; Uint32 port= 0;
const char * type1;
const char * type2;
const Properties * node2;
node->get("Type", &type1);
ctx.m_config->get("Node", id2, &node2);
node2->get("Type", &type2);
if(strcmp(type1, MGM_TOKEN)==0)
node->get("PortNumber",&port);
else if(strcmp(type2, MGM_TOKEN)==0)
node2->get("PortNumber",&port);
if (!node->get("ServerPort", &port) && if (!node->get("ServerPort", &port) &&
!ctx.m_userProperties.get("ServerPort_", id1, &port)) { !ctx.m_userProperties.get("ServerPort_", id1, &port)) {
ctx.m_currentSection->put("PortNumber", port); ctx.m_currentSection->put("PortNumber", port);
......
...@@ -2883,6 +2883,11 @@ MgmtSrvr::getConnectionDbParameter(int node1, ...@@ -2883,6 +2883,11 @@ MgmtSrvr::getConnectionDbParameter(int node1,
DBUG_RETURN(1); DBUG_RETURN(1);
} }
void MgmtSrvr::transporter_connect(NDB_SOCKET_TYPE sockfd)
{
theFacade->get_registry()->connect_server(sockfd);
}
int MgmtSrvr::set_connect_string(const char *str) int MgmtSrvr::set_connect_string(const char *str)
{ {
return ndb_mgm_set_connectstring(m_config_retriever->get_mgmHandle(),str); return ndb_mgm_set_connectstring(m_config_retriever->get_mgmHandle(),str);
......
...@@ -515,6 +515,8 @@ public: ...@@ -515,6 +515,8 @@ public:
int set_connect_string(const char *str); int set_connect_string(const char *str);
void transporter_connect(NDB_SOCKET_TYPE sockfd);
ConfigRetriever *get_config_retriever() { return m_config_retriever; }; ConfigRetriever *get_config_retriever() { return m_config_retriever; };
const char *get_connect_address(Uint32 node_id) { return inet_ntoa(m_connect_address[node_id]); } const char *get_connect_address(Uint32 node_id) { return inet_ntoa(m_connect_address[node_id]); }
......
...@@ -264,6 +264,8 @@ ParserRow<MgmApiSession> commands[] = { ...@@ -264,6 +264,8 @@ ParserRow<MgmApiSession> commands[] = {
MGM_CMD("check connection", &MgmApiSession::check_connection, ""), MGM_CMD("check connection", &MgmApiSession::check_connection, ""),
MGM_CMD("transporter connect", &MgmApiSession::transporter_connect, ""),
MGM_END() MGM_END()
}; };
...@@ -1538,5 +1540,17 @@ MgmApiSession::check_connection(Parser_t::Context &ctx, ...@@ -1538,5 +1540,17 @@ MgmApiSession::check_connection(Parser_t::Context &ctx,
m_output->println(""); m_output->println("");
} }
void
MgmApiSession::transporter_connect(Parser_t::Context &ctx,
Properties const &args) {
NDB_SOCKET_TYPE s= m_socket;
m_stop= true;
m_stopped= true; // force a stop (no closing socket)
m_socket= -1; // so nobody closes it
m_mgmsrv.transporter_connect(s);
}
template class MutexVector<int>; template class MutexVector<int>;
template class Vector<ParserRow<MgmApiSession> const*>; template class Vector<ParserRow<MgmApiSession> const*>;
...@@ -98,6 +98,8 @@ public: ...@@ -98,6 +98,8 @@ public:
void purge_stale_sessions(Parser_t::Context &ctx, const class Properties &args); void purge_stale_sessions(Parser_t::Context &ctx, const class Properties &args);
void check_connection(Parser_t::Context &ctx, const class Properties &args); void check_connection(Parser_t::Context &ctx, const class Properties &args);
void transporter_connect(Parser_t::Context &ctx, Properties const &args);
void repCommand(Parser_t::Context &ctx, const class Properties &args); void repCommand(Parser_t::Context &ctx, const class Properties &args);
}; };
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment