Commit 336f4cc8 authored by stewart@mysql.com's avatar stewart@mysql.com

Fixes for: use initial mgm connection as transporter connection

parent e88eba65
...@@ -994,7 +994,7 @@ extern "C" { ...@@ -994,7 +994,7 @@ extern "C" {
* *
* @note the socket is now able to be used as a transporter connection * @note the socket is now able to be used as a transporter connection
*/ */
NDB_SOCKET_TYPE ndb_mgm_convert_to_transporter(NdbMgmHandle handle); NDB_SOCKET_TYPE ndb_mgm_convert_to_transporter(NdbMgmHandle *handle);
/** /**
* Get the node id of the mgm server we're connected to * Get the node id of the mgm server we're connected to
......
...@@ -76,6 +76,7 @@ public: ...@@ -76,6 +76,7 @@ public:
const char *get_mgmd_host() const; const char *get_mgmd_host() const;
const char *get_connectstring(char *buf, int buf_sz) const; const char *get_connectstring(char *buf, int buf_sz) const;
NdbMgmHandle get_mgmHandle() { return m_handle; }; NdbMgmHandle get_mgmHandle() { return m_handle; };
NdbMgmHandle* get_mgmHandlePtr() { return &m_handle; };
Uint32 get_configuration_nodeid() const; Uint32 get_configuration_nodeid() const;
private: private:
......
...@@ -116,7 +116,7 @@ public: ...@@ -116,7 +116,7 @@ public:
*/ */
bool connect_server(NDB_SOCKET_TYPE sockfd); bool connect_server(NDB_SOCKET_TYPE sockfd);
int TransporterRegistry::connect_client(NdbMgmHandle h); bool connect_client(NdbMgmHandle *h);
/** /**
* Given a SocketClient, creates a NdbMgmHandle, turns it into a transporter * Given a SocketClient, creates a NdbMgmHandle, turns it into a transporter
...@@ -128,7 +128,7 @@ public: ...@@ -128,7 +128,7 @@ public:
* Given a connected NdbMgmHandle, turns it into a transporter * Given a connected NdbMgmHandle, turns it into a transporter
* and returns the socket. * and returns the socket.
*/ */
NDB_SOCKET_TYPE connect_ndb_mgmd(NdbMgmHandle h); NDB_SOCKET_TYPE connect_ndb_mgmd(NdbMgmHandle *h);
/** /**
* Remove all transporters * Remove all transporters
......
...@@ -1365,6 +1365,8 @@ TransporterRegistry::add_transporter_interface(NodeId remoteNodeId, ...@@ -1365,6 +1365,8 @@ TransporterRegistry::add_transporter_interface(NodeId remoteNodeId,
bool bool
TransporterRegistry::start_service(SocketServer& socket_server) TransporterRegistry::start_service(SocketServer& socket_server)
{ {
struct ndb_mgm_reply mgm_reply;
DBUG_ENTER("TransporterRegistry::start_service"); DBUG_ENTER("TransporterRegistry::start_service");
if (m_transporter_interface.size() > 0 && !nodeIdSpecified) if (m_transporter_interface.size() > 0 && !nodeIdSpecified)
{ {
...@@ -1372,6 +1374,11 @@ TransporterRegistry::start_service(SocketServer& socket_server) ...@@ -1372,6 +1374,11 @@ TransporterRegistry::start_service(SocketServer& socket_server)
DBUG_RETURN(false); DBUG_RETURN(false);
} }
if(!ndb_mgm_is_connected(m_mgm_handle))
ndb_mgm_connect(m_mgm_handle, 0, 0, 0);
if(!ndb_mgm_is_connected(m_mgm_handle))
DBUG_RETURN(false);
for (unsigned i= 0; i < m_transporter_interface.size(); i++) for (unsigned i= 0; i < m_transporter_interface.size(); i++)
{ {
Transporter_interface &t= m_transporter_interface[i]; Transporter_interface &t= m_transporter_interface[i];
...@@ -1404,6 +1411,18 @@ TransporterRegistry::start_service(SocketServer& socket_server) ...@@ -1404,6 +1411,18 @@ TransporterRegistry::start_service(SocketServer& socket_server)
} }
t.m_s_service_port= (t.m_s_service_port<=0)?-port:port; // -`ve if dynamic t.m_s_service_port= (t.m_s_service_port<=0)?-port:port; // -`ve if dynamic
DBUG_PRINT("info", ("t.m_s_service_port = %d",t.m_s_service_port)); DBUG_PRINT("info", ("t.m_s_service_port = %d",t.m_s_service_port));
if(t.m_s_service_port < 0
&& ndb_mgm_set_connection_int_parameter(m_mgm_handle,
get_localNodeId(),
t.m_remote_nodeId,
CFG_CONNECTION_SERVER_PORT,
t.m_s_service_port,
&mgm_reply) < 0)
{
delete transporter_service;
DBUG_RETURN(false);
}
transporter_service->setTransporterRegistry(this); transporter_service->setTransporterRegistry(this);
} }
DBUG_RETURN(true); DBUG_RETURN(true);
...@@ -1521,15 +1540,18 @@ TransporterRegistry::get_transporter(NodeId nodeId) { ...@@ -1521,15 +1540,18 @@ TransporterRegistry::get_transporter(NodeId nodeId) {
return theTransporters[nodeId]; return theTransporters[nodeId];
} }
int TransporterRegistry::connect_client(NdbMgmHandle h) bool TransporterRegistry::connect_client(NdbMgmHandle *h)
{ {
DBUG_ENTER("TransporterRegistry::connect_client(NdbMgmHandle)"); DBUG_ENTER("TransporterRegistry::connect_client(NdbMgmHandle)");
Uint32 mgm_nodeid= ndb_mgm_get_mgmd_nodeid(h); Uint32 mgm_nodeid= ndb_mgm_get_mgmd_nodeid(*h);
if(!mgm_nodeid)
return false;
Transporter * t = theTransporters[mgm_nodeid]; Transporter * t = theTransporters[mgm_nodeid];
if (!t) if (!t)
return -1; return false;
DBUG_RETURN(t->connect_client(connect_ndb_mgmd(h))); DBUG_RETURN(t->connect_client(connect_ndb_mgmd(h)));
} }
...@@ -1538,24 +1560,25 @@ int TransporterRegistry::connect_client(NdbMgmHandle h) ...@@ -1538,24 +1560,25 @@ int TransporterRegistry::connect_client(NdbMgmHandle h)
* Given a connected NdbMgmHandle, turns it into a transporter * Given a connected NdbMgmHandle, turns it into a transporter
* and returns the socket. * and returns the socket.
*/ */
NDB_SOCKET_TYPE TransporterRegistry::connect_ndb_mgmd(NdbMgmHandle h) NDB_SOCKET_TYPE TransporterRegistry::connect_ndb_mgmd(NdbMgmHandle *h)
{ {
struct ndb_mgm_reply mgm_reply; struct ndb_mgm_reply mgm_reply;
if ( h == NULL ) if ( h==NULL || *h == NULL )
{ {
return NDB_INVALID_SOCKET; return NDB_INVALID_SOCKET;
} }
for(unsigned int i=0;i < m_transporter_interface.size();i++) for(unsigned int i=0;i < m_transporter_interface.size();i++)
if (ndb_mgm_set_connection_int_parameter(h, if (m_transporter_interface[i].m_s_service_port < 0
&& ndb_mgm_set_connection_int_parameter(*h,
get_localNodeId(), get_localNodeId(),
m_transporter_interface[i].m_remote_nodeId, m_transporter_interface[i].m_remote_nodeId,
CFG_CONNECTION_SERVER_PORT, CFG_CONNECTION_SERVER_PORT,
m_transporter_interface[i].m_s_service_port, m_transporter_interface[i].m_s_service_port,
&mgm_reply) < 0) &mgm_reply) < 0)
{ {
ndb_mgm_destroy_handle(&h); ndb_mgm_destroy_handle(h);
return NDB_INVALID_SOCKET; return NDB_INVALID_SOCKET;
} }
...@@ -1565,7 +1588,7 @@ NDB_SOCKET_TYPE TransporterRegistry::connect_ndb_mgmd(NdbMgmHandle h) ...@@ -1565,7 +1588,7 @@ NDB_SOCKET_TYPE TransporterRegistry::connect_ndb_mgmd(NdbMgmHandle h)
*/ */
NDB_SOCKET_TYPE sockfd= ndb_mgm_convert_to_transporter(h); NDB_SOCKET_TYPE sockfd= ndb_mgm_convert_to_transporter(h);
if ( sockfd == NDB_INVALID_SOCKET) if ( sockfd == NDB_INVALID_SOCKET)
ndb_mgm_destroy_handle(&h); ndb_mgm_destroy_handle(h);
return sockfd; return sockfd;
} }
...@@ -1613,7 +1636,7 @@ NDB_SOCKET_TYPE TransporterRegistry::connect_ndb_mgmd(SocketClient *sc) ...@@ -1613,7 +1636,7 @@ NDB_SOCKET_TYPE TransporterRegistry::connect_ndb_mgmd(SocketClient *sc)
return NDB_INVALID_SOCKET; return NDB_INVALID_SOCKET;
} }
return connect_ndb_mgmd(h); return connect_ndb_mgmd(&h);
} }
template class Vector<TransporterRegistry::Transporter_interface>; template class Vector<TransporterRegistry::Transporter_interface>;
...@@ -154,6 +154,7 @@ Configuration::Configuration() ...@@ -154,6 +154,7 @@ Configuration::Configuration()
m_config_retriever= 0; m_config_retriever= 0;
m_clusterConfig= 0; m_clusterConfig= 0;
m_clusterConfigIter= 0; m_clusterConfigIter= 0;
m_mgmd_host= NULL;
} }
Configuration::~Configuration(){ Configuration::~Configuration(){
...@@ -166,6 +167,9 @@ Configuration::~Configuration(){ ...@@ -166,6 +167,9 @@ Configuration::~Configuration(){
if(_backupPath != NULL) if(_backupPath != NULL)
free(_backupPath); free(_backupPath);
if(m_mgmd_host)
free(m_mgmd_host);
if (m_config_retriever) { if (m_config_retriever) {
delete m_config_retriever; delete m_config_retriever;
} }
...@@ -210,8 +214,16 @@ Configuration::fetch_configuration(){ ...@@ -210,8 +214,16 @@ Configuration::fetch_configuration(){
ERROR_SET(fatal, ERR_INVALID_CONFIG, "Could not connect to ndb_mgmd", s); ERROR_SET(fatal, ERR_INVALID_CONFIG, "Could not connect to ndb_mgmd", s);
} }
{
m_mgmd_port= m_config_retriever->get_mgmd_port(); m_mgmd_port= m_config_retriever->get_mgmd_port();
m_mgmd_host= m_config_retriever->get_mgmd_host(); /**
* We copy the mgmd host as the handle is later
* destroyed, so a pointer won't work
*/
int len= strlen(m_config_retriever->get_mgmd_host());
m_mgmd_host= (char*)malloc(sizeof(char)*len);
strcpy(m_mgmd_host,m_config_retriever->get_mgmd_host());
}
ConfigRetriever &cr= *m_config_retriever; ConfigRetriever &cr= *m_config_retriever;
...@@ -313,7 +325,8 @@ Configuration::setupConfiguration(){ ...@@ -313,7 +325,8 @@ Configuration::setupConfiguration(){
ERROR_SET(fatal, ERR_INVALID_CONFIG, "Invalid configuration fetched", ERROR_SET(fatal, ERR_INVALID_CONFIG, "Invalid configuration fetched",
"No transporters configured"); "No transporters configured");
} }
if(!globalTransporterRegistry.connect_client(m_config_retriever->get_mgmHandle())) if(!globalTransporterRegistry.connect_client(
m_config_retriever->get_mgmHandlePtr()))
ERROR_SET(fatal, ERR_INVALID_CONFIG, "Connection to mgmd terminated before setup was complete", ERROR_SET(fatal, ERR_INVALID_CONFIG, "Connection to mgmd terminated before setup was complete",
"StopOnError missing"); "StopOnError missing");
} }
......
...@@ -67,7 +67,7 @@ public: ...@@ -67,7 +67,7 @@ public:
const ndb_mgm_configuration_iterator * getOwnConfigIterator() const; const ndb_mgm_configuration_iterator * getOwnConfigIterator() const;
Uint32 get_mgmd_port() const {return m_mgmd_port;}; Uint32 get_mgmd_port() const {return m_mgmd_port;};
const char *get_mgmd_host() const {return m_mgmd_host;}; char *get_mgmd_host() const {return m_mgmd_host;};
ConfigRetriever* get_config_retriever() { return m_config_retriever; }; ConfigRetriever* get_config_retriever() { return m_config_retriever; };
class LogLevel * m_logLevel; class LogLevel * m_logLevel;
...@@ -99,7 +99,7 @@ private: ...@@ -99,7 +99,7 @@ private:
bool _initialStart; bool _initialStart;
char * _connectString; char * _connectString;
Uint32 m_mgmd_port; Uint32 m_mgmd_port;
const char *m_mgmd_host; char *m_mgmd_host;
bool _daemonMode; bool _daemonMode;
void calcSizeAlt(class ConfigValues * ); void calcSizeAlt(class ConfigValues * );
......
...@@ -2189,21 +2189,21 @@ ndb_mgm_get_connection_int_parameter(NdbMgmHandle handle, ...@@ -2189,21 +2189,21 @@ ndb_mgm_get_connection_int_parameter(NdbMgmHandle handle,
extern "C" extern "C"
NDB_SOCKET_TYPE NDB_SOCKET_TYPE
ndb_mgm_convert_to_transporter(NdbMgmHandle handle) ndb_mgm_convert_to_transporter(NdbMgmHandle *handle)
{ {
NDB_SOCKET_TYPE s; NDB_SOCKET_TYPE s;
CHECK_HANDLE(handle, NDB_INVALID_SOCKET); CHECK_HANDLE((*handle), NDB_INVALID_SOCKET);
CHECK_CONNECTED(handle, NDB_INVALID_SOCKET); CHECK_CONNECTED((*handle), NDB_INVALID_SOCKET);
handle->connected= 0; // we pretend we're disconnected (*handle)->connected= 0; // we pretend we're disconnected
s= handle->socket; s= (*handle)->socket;
SocketOutputStream s_output(s); SocketOutputStream s_output(s);
s_output.println("transporter connect"); s_output.println("transporter connect");
s_output.println(""); s_output.println("");
ndb_mgm_destroy_handle(&handle); // set connected=0, so won't disconnect ndb_mgm_destroy_handle(handle); // set connected=0, so won't disconnect
return s; return s;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment