Commit 9bc6ed86 authored by unknown's avatar unknown

Fixes for: use initial mgm connection as transporter connection


ndb/include/mgmapi/mgmapi.h:
  ndb_mgm_convert_to_transporter may destroy the handle, now takes a pointer.
ndb/include/mgmcommon/ConfigRetriever.hpp:
  If outside code is going to manipulate the NdbMgmHandle, allow it to do it
  via a get_mgmHandlePtr() call
ndb/include/transporter/TransporterRegistry.hpp:
  connect_client and connect_ndb_mgmd may destroy the handle, now they take a pointer.
ndb/src/common/transporter/TransporterRegistry.cpp:
  When start_service is binding to ports, report back the port numbers.
  
  We need this here now, as we re-use the initial mgm connection as a transporter, which
  is connected *before* start_service has allocated the dynamic port numbers. So the creation
  of this early transporter cannot be used to send the dynamic ports to the mgmd.
  
  We connect to the mgm server (using the handle that will be used in the client_Connect thread)
  if needed. This is thread safe as start_service is only ever called once, before
  the client connect thread starts.
  
  connect_client,connect_ndb_mgmd may destroy the NdbMgmHandle. It now accepts a pointer to it.
ndb/src/kernel/vm/Configuration.cpp:
  Copy the m_mgmd_host string from the config_retreiver as the NdbMgmHandle in the
  ConfigRetreiver will be destroyed later (along with the host string).
  
  globalTransporterRegistry.connect_client will destroy the mgm handle, use a pointer to the handle.
ndb/src/kernel/vm/Configuration.hpp:
  allow the dynamic allocation of m_mgmd_host.
ndb/src/mgmapi/mgmapi.cpp:
  accept a pointer for ndb_mgm_convert_to_transporter as we destroy the handle.
parent bb5a2f28
...@@ -994,7 +994,7 @@ extern "C" { ...@@ -994,7 +994,7 @@ extern "C" {
* *
* @note the socket is now able to be used as a transporter connection * @note the socket is now able to be used as a transporter connection
*/ */
NDB_SOCKET_TYPE ndb_mgm_convert_to_transporter(NdbMgmHandle handle); NDB_SOCKET_TYPE ndb_mgm_convert_to_transporter(NdbMgmHandle *handle);
/** /**
* Get the node id of the mgm server we're connected to * Get the node id of the mgm server we're connected to
......
...@@ -76,6 +76,7 @@ public: ...@@ -76,6 +76,7 @@ public:
const char *get_mgmd_host() const; const char *get_mgmd_host() const;
const char *get_connectstring(char *buf, int buf_sz) const; const char *get_connectstring(char *buf, int buf_sz) const;
NdbMgmHandle get_mgmHandle() { return m_handle; }; NdbMgmHandle get_mgmHandle() { return m_handle; };
NdbMgmHandle* get_mgmHandlePtr() { return &m_handle; };
Uint32 get_configuration_nodeid() const; Uint32 get_configuration_nodeid() const;
private: private:
......
...@@ -116,7 +116,7 @@ public: ...@@ -116,7 +116,7 @@ public:
*/ */
bool connect_server(NDB_SOCKET_TYPE sockfd); bool connect_server(NDB_SOCKET_TYPE sockfd);
int TransporterRegistry::connect_client(NdbMgmHandle h); bool connect_client(NdbMgmHandle *h);
/** /**
* Given a SocketClient, creates a NdbMgmHandle, turns it into a transporter * Given a SocketClient, creates a NdbMgmHandle, turns it into a transporter
...@@ -128,7 +128,7 @@ public: ...@@ -128,7 +128,7 @@ public:
* Given a connected NdbMgmHandle, turns it into a transporter * Given a connected NdbMgmHandle, turns it into a transporter
* and returns the socket. * and returns the socket.
*/ */
NDB_SOCKET_TYPE connect_ndb_mgmd(NdbMgmHandle h); NDB_SOCKET_TYPE connect_ndb_mgmd(NdbMgmHandle *h);
/** /**
* Remove all transporters * Remove all transporters
......
...@@ -1365,6 +1365,8 @@ TransporterRegistry::add_transporter_interface(NodeId remoteNodeId, ...@@ -1365,6 +1365,8 @@ TransporterRegistry::add_transporter_interface(NodeId remoteNodeId,
bool bool
TransporterRegistry::start_service(SocketServer& socket_server) TransporterRegistry::start_service(SocketServer& socket_server)
{ {
struct ndb_mgm_reply mgm_reply;
DBUG_ENTER("TransporterRegistry::start_service"); DBUG_ENTER("TransporterRegistry::start_service");
if (m_transporter_interface.size() > 0 && !nodeIdSpecified) if (m_transporter_interface.size() > 0 && !nodeIdSpecified)
{ {
...@@ -1372,6 +1374,11 @@ TransporterRegistry::start_service(SocketServer& socket_server) ...@@ -1372,6 +1374,11 @@ TransporterRegistry::start_service(SocketServer& socket_server)
DBUG_RETURN(false); DBUG_RETURN(false);
} }
if(!ndb_mgm_is_connected(m_mgm_handle))
ndb_mgm_connect(m_mgm_handle, 0, 0, 0);
if(!ndb_mgm_is_connected(m_mgm_handle))
DBUG_RETURN(false);
for (unsigned i= 0; i < m_transporter_interface.size(); i++) for (unsigned i= 0; i < m_transporter_interface.size(); i++)
{ {
Transporter_interface &t= m_transporter_interface[i]; Transporter_interface &t= m_transporter_interface[i];
...@@ -1404,6 +1411,18 @@ TransporterRegistry::start_service(SocketServer& socket_server) ...@@ -1404,6 +1411,18 @@ TransporterRegistry::start_service(SocketServer& socket_server)
} }
t.m_s_service_port= (t.m_s_service_port<=0)?-port:port; // -`ve if dynamic t.m_s_service_port= (t.m_s_service_port<=0)?-port:port; // -`ve if dynamic
DBUG_PRINT("info", ("t.m_s_service_port = %d",t.m_s_service_port)); DBUG_PRINT("info", ("t.m_s_service_port = %d",t.m_s_service_port));
if(t.m_s_service_port < 0
&& ndb_mgm_set_connection_int_parameter(m_mgm_handle,
get_localNodeId(),
t.m_remote_nodeId,
CFG_CONNECTION_SERVER_PORT,
t.m_s_service_port,
&mgm_reply) < 0)
{
delete transporter_service;
DBUG_RETURN(false);
}
transporter_service->setTransporterRegistry(this); transporter_service->setTransporterRegistry(this);
} }
DBUG_RETURN(true); DBUG_RETURN(true);
...@@ -1521,15 +1540,18 @@ TransporterRegistry::get_transporter(NodeId nodeId) { ...@@ -1521,15 +1540,18 @@ TransporterRegistry::get_transporter(NodeId nodeId) {
return theTransporters[nodeId]; return theTransporters[nodeId];
} }
int TransporterRegistry::connect_client(NdbMgmHandle h) bool TransporterRegistry::connect_client(NdbMgmHandle *h)
{ {
DBUG_ENTER("TransporterRegistry::connect_client(NdbMgmHandle)"); DBUG_ENTER("TransporterRegistry::connect_client(NdbMgmHandle)");
Uint32 mgm_nodeid= ndb_mgm_get_mgmd_nodeid(h); Uint32 mgm_nodeid= ndb_mgm_get_mgmd_nodeid(*h);
if(!mgm_nodeid)
return false;
Transporter * t = theTransporters[mgm_nodeid]; Transporter * t = theTransporters[mgm_nodeid];
if (!t) if (!t)
return -1; return false;
DBUG_RETURN(t->connect_client(connect_ndb_mgmd(h))); DBUG_RETURN(t->connect_client(connect_ndb_mgmd(h)));
} }
...@@ -1538,24 +1560,25 @@ int TransporterRegistry::connect_client(NdbMgmHandle h) ...@@ -1538,24 +1560,25 @@ int TransporterRegistry::connect_client(NdbMgmHandle h)
* Given a connected NdbMgmHandle, turns it into a transporter * Given a connected NdbMgmHandle, turns it into a transporter
* and returns the socket. * and returns the socket.
*/ */
NDB_SOCKET_TYPE TransporterRegistry::connect_ndb_mgmd(NdbMgmHandle h) NDB_SOCKET_TYPE TransporterRegistry::connect_ndb_mgmd(NdbMgmHandle *h)
{ {
struct ndb_mgm_reply mgm_reply; struct ndb_mgm_reply mgm_reply;
if ( h == NULL ) if ( h==NULL || *h == NULL )
{ {
return NDB_INVALID_SOCKET; return NDB_INVALID_SOCKET;
} }
for(unsigned int i=0;i < m_transporter_interface.size();i++) for(unsigned int i=0;i < m_transporter_interface.size();i++)
if (ndb_mgm_set_connection_int_parameter(h, if (m_transporter_interface[i].m_s_service_port < 0
&& ndb_mgm_set_connection_int_parameter(*h,
get_localNodeId(), get_localNodeId(),
m_transporter_interface[i].m_remote_nodeId, m_transporter_interface[i].m_remote_nodeId,
CFG_CONNECTION_SERVER_PORT, CFG_CONNECTION_SERVER_PORT,
m_transporter_interface[i].m_s_service_port, m_transporter_interface[i].m_s_service_port,
&mgm_reply) < 0) &mgm_reply) < 0)
{ {
ndb_mgm_destroy_handle(&h); ndb_mgm_destroy_handle(h);
return NDB_INVALID_SOCKET; return NDB_INVALID_SOCKET;
} }
...@@ -1565,7 +1588,7 @@ NDB_SOCKET_TYPE TransporterRegistry::connect_ndb_mgmd(NdbMgmHandle h) ...@@ -1565,7 +1588,7 @@ NDB_SOCKET_TYPE TransporterRegistry::connect_ndb_mgmd(NdbMgmHandle h)
*/ */
NDB_SOCKET_TYPE sockfd= ndb_mgm_convert_to_transporter(h); NDB_SOCKET_TYPE sockfd= ndb_mgm_convert_to_transporter(h);
if ( sockfd == NDB_INVALID_SOCKET) if ( sockfd == NDB_INVALID_SOCKET)
ndb_mgm_destroy_handle(&h); ndb_mgm_destroy_handle(h);
return sockfd; return sockfd;
} }
...@@ -1613,7 +1636,7 @@ NDB_SOCKET_TYPE TransporterRegistry::connect_ndb_mgmd(SocketClient *sc) ...@@ -1613,7 +1636,7 @@ NDB_SOCKET_TYPE TransporterRegistry::connect_ndb_mgmd(SocketClient *sc)
return NDB_INVALID_SOCKET; return NDB_INVALID_SOCKET;
} }
return connect_ndb_mgmd(h); return connect_ndb_mgmd(&h);
} }
template class Vector<TransporterRegistry::Transporter_interface>; template class Vector<TransporterRegistry::Transporter_interface>;
...@@ -154,6 +154,7 @@ Configuration::Configuration() ...@@ -154,6 +154,7 @@ Configuration::Configuration()
m_config_retriever= 0; m_config_retriever= 0;
m_clusterConfig= 0; m_clusterConfig= 0;
m_clusterConfigIter= 0; m_clusterConfigIter= 0;
m_mgmd_host= NULL;
} }
Configuration::~Configuration(){ Configuration::~Configuration(){
...@@ -166,6 +167,9 @@ Configuration::~Configuration(){ ...@@ -166,6 +167,9 @@ Configuration::~Configuration(){
if(_backupPath != NULL) if(_backupPath != NULL)
free(_backupPath); free(_backupPath);
if(m_mgmd_host)
free(m_mgmd_host);
if (m_config_retriever) { if (m_config_retriever) {
delete m_config_retriever; delete m_config_retriever;
} }
...@@ -210,8 +214,16 @@ Configuration::fetch_configuration(){ ...@@ -210,8 +214,16 @@ Configuration::fetch_configuration(){
ERROR_SET(fatal, ERR_INVALID_CONFIG, "Could not connect to ndb_mgmd", s); ERROR_SET(fatal, ERR_INVALID_CONFIG, "Could not connect to ndb_mgmd", s);
} }
{
m_mgmd_port= m_config_retriever->get_mgmd_port(); m_mgmd_port= m_config_retriever->get_mgmd_port();
m_mgmd_host= m_config_retriever->get_mgmd_host(); /**
* We copy the mgmd host as the handle is later
* destroyed, so a pointer won't work
*/
int len= strlen(m_config_retriever->get_mgmd_host());
m_mgmd_host= (char*)malloc(sizeof(char)*len);
strcpy(m_mgmd_host,m_config_retriever->get_mgmd_host());
}
ConfigRetriever &cr= *m_config_retriever; ConfigRetriever &cr= *m_config_retriever;
...@@ -313,7 +325,8 @@ Configuration::setupConfiguration(){ ...@@ -313,7 +325,8 @@ Configuration::setupConfiguration(){
ERROR_SET(fatal, ERR_INVALID_CONFIG, "Invalid configuration fetched", ERROR_SET(fatal, ERR_INVALID_CONFIG, "Invalid configuration fetched",
"No transporters configured"); "No transporters configured");
} }
if(!globalTransporterRegistry.connect_client(m_config_retriever->get_mgmHandle())) if(!globalTransporterRegistry.connect_client(
m_config_retriever->get_mgmHandlePtr()))
ERROR_SET(fatal, ERR_INVALID_CONFIG, "Connection to mgmd terminated before setup was complete", ERROR_SET(fatal, ERR_INVALID_CONFIG, "Connection to mgmd terminated before setup was complete",
"StopOnError missing"); "StopOnError missing");
} }
......
...@@ -67,7 +67,7 @@ public: ...@@ -67,7 +67,7 @@ public:
const ndb_mgm_configuration_iterator * getOwnConfigIterator() const; const ndb_mgm_configuration_iterator * getOwnConfigIterator() const;
Uint32 get_mgmd_port() const {return m_mgmd_port;}; Uint32 get_mgmd_port() const {return m_mgmd_port;};
const char *get_mgmd_host() const {return m_mgmd_host;}; char *get_mgmd_host() const {return m_mgmd_host;};
ConfigRetriever* get_config_retriever() { return m_config_retriever; }; ConfigRetriever* get_config_retriever() { return m_config_retriever; };
class LogLevel * m_logLevel; class LogLevel * m_logLevel;
...@@ -99,7 +99,7 @@ private: ...@@ -99,7 +99,7 @@ private:
bool _initialStart; bool _initialStart;
char * _connectString; char * _connectString;
Uint32 m_mgmd_port; Uint32 m_mgmd_port;
const char *m_mgmd_host; char *m_mgmd_host;
bool _daemonMode; bool _daemonMode;
void calcSizeAlt(class ConfigValues * ); void calcSizeAlt(class ConfigValues * );
......
...@@ -2189,21 +2189,21 @@ ndb_mgm_get_connection_int_parameter(NdbMgmHandle handle, ...@@ -2189,21 +2189,21 @@ ndb_mgm_get_connection_int_parameter(NdbMgmHandle handle,
extern "C" extern "C"
NDB_SOCKET_TYPE NDB_SOCKET_TYPE
ndb_mgm_convert_to_transporter(NdbMgmHandle handle) ndb_mgm_convert_to_transporter(NdbMgmHandle *handle)
{ {
NDB_SOCKET_TYPE s; NDB_SOCKET_TYPE s;
CHECK_HANDLE(handle, NDB_INVALID_SOCKET); CHECK_HANDLE((*handle), NDB_INVALID_SOCKET);
CHECK_CONNECTED(handle, NDB_INVALID_SOCKET); CHECK_CONNECTED((*handle), NDB_INVALID_SOCKET);
handle->connected= 0; // we pretend we're disconnected (*handle)->connected= 0; // we pretend we're disconnected
s= handle->socket; s= (*handle)->socket;
SocketOutputStream s_output(s); SocketOutputStream s_output(s);
s_output.println("transporter connect"); s_output.println("transporter connect");
s_output.println(""); s_output.println("");
ndb_mgm_destroy_handle(&handle); // set connected=0, so won't disconnect ndb_mgm_destroy_handle(handle); // set connected=0, so won't disconnect
return s; return s;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment