Commit ac28b470 authored by stewart@mysql.com's avatar stewart@mysql.com

Impl4 of WL2278 - Dynamic port allocation of cluster nodes

When a node restarts it starts over again with fetching the configuration
-  It is not sure that it can use the "old dynamically allocated port number" again.
-  It should however try to reuse the old one, if not possible it should
allocate a new one. One has to be able to distinguish between portnumbers
specified originally in the config, and ones that has been dynamically added
(the latter may be changed if "busy", but the first cannot be changed).

We use negative portnumbers for ports that are ok to change.
parent f3ef9071
...@@ -146,7 +146,7 @@ extern "C" { ...@@ -146,7 +146,7 @@ extern "C" {
int node1, int node1,
int node2, int node2,
int param, int param,
unsigned value, int value,
struct ndb_mgm_reply* reply); struct ndb_mgm_reply* reply);
/** /**
...@@ -165,7 +165,7 @@ extern "C" { ...@@ -165,7 +165,7 @@ extern "C" {
int node1, int node1,
int node2, int node2,
int param, int param,
Uint32 *value, int *value,
struct ndb_mgm_reply* reply); struct ndb_mgm_reply* reply);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -232,12 +232,12 @@ public: ...@@ -232,12 +232,12 @@ public:
class Transporter_interface { class Transporter_interface {
public: public:
NodeId m_remote_nodeId; NodeId m_remote_nodeId;
unsigned short m_service_port; int m_service_port;
const char *m_interface; const char *m_interface;
}; };
Vector<Transporter_interface> m_transporter_interface; Vector<Transporter_interface> m_transporter_interface;
void add_transporter_interface(NodeId remoteNodeId, const char *interf, void add_transporter_interface(NodeId remoteNodeId, const char *interf,
unsigned short port); int port);
Transporter* get_transporter(NodeId nodeId); Transporter* get_transporter(NodeId nodeId);
NodeId get_localNodeId() { return localNodeId; }; NodeId get_localNodeId() { return localNodeId; };
......
...@@ -371,6 +371,9 @@ IPCConfig::configureTransporters(Uint32 nodeId, ...@@ -371,6 +371,9 @@ IPCConfig::configureTransporters(Uint32 nodeId,
} }
DBUG_PRINT("info", ("Transporter between this node %d and node %d using port %d, signalId %d, checksum %d", DBUG_PRINT("info", ("Transporter between this node %d and node %d using port %d, signalId %d, checksum %d",
nodeId, remoteNodeId, server_port, sendSignalId, checksum)); nodeId, remoteNodeId, server_port, sendSignalId, checksum));
if((int)server_port<0)
server_port= -server_port; // A dynamic port
switch(type){ switch(type){
case CONNECTION_TYPE_SHM:{ case CONNECTION_TYPE_SHM:{
SHM_TransporterConfiguration conf; SHM_TransporterConfiguration conf;
......
...@@ -1191,7 +1191,7 @@ TransporterRegistry::start_clients_thread() ...@@ -1191,7 +1191,7 @@ TransporterRegistry::start_clients_thread()
case CONNECTING: case CONNECTING:
if(!t->isConnected() && !t->isServer) { if(!t->isConnected() && !t->isServer) {
if(t->get_r_port() <= 0) { // Port is dynamic if(t->get_r_port() <= 0) { // Port is dynamic
Uint32 server_port= 0; int server_port= 0;
struct ndb_mgm_reply mgm_reply; struct ndb_mgm_reply mgm_reply;
int res; int res;
...@@ -1201,9 +1201,13 @@ TransporterRegistry::start_clients_thread() ...@@ -1201,9 +1201,13 @@ TransporterRegistry::start_clients_thread()
CFG_CONNECTION_SERVER_PORT, CFG_CONNECTION_SERVER_PORT,
&server_port, &server_port,
&mgm_reply); &mgm_reply);
DBUG_PRINT("info",("Got dynamic port %u for %d -> %d (ret: %d)", DBUG_PRINT("info",("Got %s port %u for %d -> %d (ret: %d)",
(server_port<=0)?"dynamic":"static",
server_port,t->getRemoteNodeId(), server_port,t->getRemoteNodeId(),
t->getLocalNodeId(),res)); t->getLocalNodeId(),res));
if(server_port<0)
server_port = -server_port; // was a dynamic port
if(res>=0) if(res>=0)
t->set_r_port(server_port); t->set_r_port(server_port);
else else
...@@ -1265,7 +1269,7 @@ TransporterRegistry::stop_clients() ...@@ -1265,7 +1269,7 @@ TransporterRegistry::stop_clients()
void void
TransporterRegistry::add_transporter_interface(NodeId remoteNodeId, TransporterRegistry::add_transporter_interface(NodeId remoteNodeId,
const char *interf, const char *interf,
unsigned short port) int port)
{ {
DBUG_ENTER("TransporterRegistry::add_transporter_interface"); DBUG_ENTER("TransporterRegistry::add_transporter_interface");
DBUG_PRINT("enter",("interface=%s, port= %d", interf, port)); DBUG_PRINT("enter",("interface=%s, port= %d", interf, port));
...@@ -1309,18 +1313,34 @@ TransporterRegistry::start_service(SocketServer& socket_server) ...@@ -1309,18 +1313,34 @@ TransporterRegistry::start_service(SocketServer& socket_server)
{ {
Transporter_interface &t= m_transporter_interface[i]; Transporter_interface &t= m_transporter_interface[i];
unsigned short port= t.m_service_port;
if(t.m_service_port<0)
port= -t.m_service_port; // is a dynamic port
TransporterService *transporter_service = TransporterService *transporter_service =
new TransporterService(new SocketAuthSimple("ndbd", "ndbd passwd")); new TransporterService(new SocketAuthSimple("ndbd", "ndbd passwd"));
if(!socket_server.setup(transporter_service, if(!socket_server.setup(transporter_service,
&t.m_service_port, t.m_interface)) &port, t.m_interface))
{ {
ndbout_c("Unable to setup transporter service port: %s:%d!\n" DBUG_PRINT("info", ("Trying new port"));
"Please check if the port is already used,\n" port= 0;
"(perhaps the node is already running)", if(t.m_service_port>0
t.m_interface ? t.m_interface : "*", t.m_service_port); || !socket_server.setup(transporter_service,
delete transporter_service; &port, t.m_interface))
return false; {
/*
* If it wasn't a dynamically allocated port, or
* our attempts at getting a new dynamic port failed
*/
ndbout_c("Unable to setup transporter service port: %s:%d!\n"
"Please check if the port is already used,\n"
"(perhaps the node is already running)",
t.m_interface ? t.m_interface : "*", t.m_service_port);
delete transporter_service;
return false;
}
} }
t.m_service_port= (t.m_service_port<=0)?-port:port; // -`ve if dynamic
DBUG_PRINT("info", ("t.m_service_port = %d",t.m_service_port));
transporter_service->setTransporterRegistry(this); transporter_service->setTransporterRegistry(this);
} }
return true; return true;
......
...@@ -2050,7 +2050,7 @@ ndb_mgm_set_connection_int_parameter(NdbMgmHandle handle, ...@@ -2050,7 +2050,7 @@ ndb_mgm_set_connection_int_parameter(NdbMgmHandle handle,
int node1, int node1,
int node2, int node2,
int param, int param,
unsigned value, int value,
struct ndb_mgm_reply* mgmreply){ struct ndb_mgm_reply* mgmreply){
DBUG_ENTER("ndb_mgm_set_connection_int_parameter"); DBUG_ENTER("ndb_mgm_set_connection_int_parameter");
CHECK_HANDLE(handle, 0); CHECK_HANDLE(handle, 0);
...@@ -2060,7 +2060,7 @@ ndb_mgm_set_connection_int_parameter(NdbMgmHandle handle, ...@@ -2060,7 +2060,7 @@ ndb_mgm_set_connection_int_parameter(NdbMgmHandle handle,
args.put("node1", node1); args.put("node1", node1);
args.put("node2", node2); args.put("node2", node2);
args.put("param", param); args.put("param", param);
args.put("value", value); args.put("value", (Uint32)value);
const ParserRow<ParserDummy> reply[]= { const ParserRow<ParserDummy> reply[]= {
MGM_CMD("set connection parameter reply", NULL, ""), MGM_CMD("set connection parameter reply", NULL, ""),
...@@ -2093,7 +2093,7 @@ ndb_mgm_get_connection_int_parameter(NdbMgmHandle handle, ...@@ -2093,7 +2093,7 @@ ndb_mgm_get_connection_int_parameter(NdbMgmHandle handle,
int node1, int node1,
int node2, int node2,
int param, int param,
Uint32 *value, int *value,
struct ndb_mgm_reply* mgmreply){ struct ndb_mgm_reply* mgmreply){
DBUG_ENTER("ndb_mgm_get_connection_int_parameter"); DBUG_ENTER("ndb_mgm_get_connection_int_parameter");
CHECK_HANDLE(handle, -1); CHECK_HANDLE(handle, -1);
...@@ -2125,7 +2125,7 @@ ndb_mgm_get_connection_int_parameter(NdbMgmHandle handle, ...@@ -2125,7 +2125,7 @@ ndb_mgm_get_connection_int_parameter(NdbMgmHandle handle,
res= 0; res= 0;
} while(0); } while(0);
if(!prop->get("value",value)){ if(!prop->get("value",(Uint32*)value)){
ndbout_c("Unable to get value"); ndbout_c("Unable to get value");
res = -4; res = -4;
} }
......
...@@ -2849,7 +2849,7 @@ int ...@@ -2849,7 +2849,7 @@ int
MgmtSrvr::getConnectionDbParameter(int node1, MgmtSrvr::getConnectionDbParameter(int node1,
int node2, int node2,
int param, int param,
unsigned *value, int *value,
BaseString& msg){ BaseString& msg){
DBUG_ENTER("MgmtSrvr::getConnectionDbParameter"); DBUG_ENTER("MgmtSrvr::getConnectionDbParameter");
...@@ -2874,12 +2874,12 @@ MgmtSrvr::getConnectionDbParameter(int node1, ...@@ -2874,12 +2874,12 @@ MgmtSrvr::getConnectionDbParameter(int node1,
return -1; return -1;
} }
if(iter.get(param, value) < 0) { if(iter.get(param, (Uint32*)value) < 0) {
msg.assign("Unable to get current value of parameter"); msg.assign("Unable to get current value of parameter");
return -1; return -1;
} }
msg.assfmt("%u",*value); msg.assfmt("%d",*value);
DBUG_RETURN(1); DBUG_RETURN(1);
} }
......
...@@ -510,7 +510,7 @@ public: ...@@ -510,7 +510,7 @@ public:
int setConnectionDbParameter(int node1, int node2, int param, int value, int setConnectionDbParameter(int node1, int node2, int param, int value,
BaseString& msg); BaseString& msg);
int getConnectionDbParameter(int node1, int node2, int param, int getConnectionDbParameter(int node1, int node2, int param,
unsigned *value, BaseString& msg); int *value, BaseString& msg);
int set_connect_string(const char *str); int set_connect_string(const char *str);
......
...@@ -1373,7 +1373,7 @@ void ...@@ -1373,7 +1373,7 @@ void
MgmApiSession::getConnectionParameter(Parser_t::Context &ctx, MgmApiSession::getConnectionParameter(Parser_t::Context &ctx,
Properties const &args) { Properties const &args) {
BaseString node1, node2, param; BaseString node1, node2, param;
unsigned value = 0; int value = 0;
args.get("node1", node1); args.get("node1", node1);
args.get("node2", node2); args.get("node2", node2);
...@@ -1387,7 +1387,7 @@ MgmApiSession::getConnectionParameter(Parser_t::Context &ctx, ...@@ -1387,7 +1387,7 @@ MgmApiSession::getConnectionParameter(Parser_t::Context &ctx,
result); result);
m_output->println("get connection parameter reply"); m_output->println("get connection parameter reply");
m_output->println("value: %u", value); m_output->println("value: %d", value);
m_output->println("result: %s", (ret>0)?"Ok":result.c_str()); m_output->println("result: %s", (ret>0)?"Ok":result.c_str());
m_output->println(""); m_output->println("");
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment