Commit 4e56931a authored by unknown's avatar unknown

removed NdbMgmHandle from TransporterRegistry constructor

changed to set NdbMgmHandle from IPPConfig, and use info from configuration, instead of mgm handle from config retrieval


ndb/src/ndbapi/Ndb.cpp:
  added som debug printout
ndb/src/ndbapi/NdbTransaction.cpp:
  added some debug printout
parent e26a01e6
......@@ -97,12 +97,11 @@ public:
/**
* Constructor
*/
TransporterRegistry(NdbMgmHandle mgm_handle=NULL,
void * callback = 0 ,
TransporterRegistry(void * callback = 0 ,
unsigned maxTransporters = MAX_NTRANSPORTERS,
unsigned sizeOfLongSignalMemory = 100);
void set_mgm_handle(NdbMgmHandle h) { m_mgm_handle = h; };
void set_mgm_handle(NdbMgmHandle h);
NdbMgmHandle get_mgm_handle(void) { return m_mgm_handle; };
bool init(NodeId localNodeId);
......
......@@ -170,6 +170,35 @@ IPCConfig::configureTransporters(Uint32 nodeId,
DBUG_ENTER("IPCConfig::configureTransporters");
/**
* Iterate over all MGM's an construct a connectstring
* create mgm_handle and give it to the Transporter Registry
*/
{
const char *separator= "";
BaseString connect_string;
ndb_mgm_configuration_iterator iter(config, CFG_SECTION_NODE);
for(iter.first(); iter.valid(); iter.next())
{
Uint32 type;
if(iter.get(CFG_TYPE_OF_SECTION, &type)) continue;
if(type != NODE_TYPE_MGM) continue;
const char* hostname;
Uint32 port;
if(iter.get(CFG_NODE_HOST, &hostname)) continue;
if( strlen(hostname) == 0 ) continue;
if(iter.get(CFG_MGM_PORT, &port)) continue;
connect_string.appfmt("%s%s:port",separator,hostname,port);
separator= ",";
}
NdbMgmHandle h= ndb_mgm_create_handle();
if ( h && connect_string.length() > 0 )
{
ndb_mgm_set_connectstring(h,connect_string.c_str());
tr.set_mgm_handle(h);
}
}
Uint32 noOfTransportersCreated= 0;
ndb_mgm_configuration_iterator iter(config, CFG_SECTION_CONNECTION);
......
......@@ -71,15 +71,14 @@ SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd)
DBUG_RETURN(0);
}
TransporterRegistry::TransporterRegistry(NdbMgmHandle mgm_handle,
void * callback,
TransporterRegistry::TransporterRegistry(void * callback,
unsigned _maxTransporters,
unsigned sizeOfLongSignalMemory) {
nodeIdSpecified = false;
maxTransporters = _maxTransporters;
sendCounter = 1;
m_mgm_handle = mgm_handle;
m_mgm_handle= 0;
callbackObj=callback;
......@@ -114,6 +113,27 @@ TransporterRegistry::TransporterRegistry(NdbMgmHandle mgm_handle,
theOSEJunkSocketRecv = 0;
}
void TransporterRegistry::set_mgm_handle(NdbMgmHandle h)
{
DBUG_ENTER("TransporterRegistry::set_mgm_handle");
if (m_mgm_handle)
ndb_mgm_destroy_handle(&m_mgm_handle);
m_mgm_handle= h;
#ifndef DBUG_OFF
if (h)
{
char buf[256];
DBUG_PRINT("info",("handle set with connectstring: %s",
ndb_mgm_get_connectstring(h,buf, sizeof(buf))));
}
else
{
DBUG_PRINT("info",("handle set to NULL"));
}
#endif
DBUG_VOID_RETURN;
};
TransporterRegistry::~TransporterRegistry() {
removeAll();
......@@ -134,6 +154,8 @@ TransporterRegistry::~TransporterRegistry() {
theOSEReceiver = 0;
}
#endif
if (m_mgm_handle)
ndb_mgm_destroy_handle(&m_mgm_handle);
}
void
......@@ -1211,40 +1233,32 @@ TransporterRegistry::start_clients_thread()
switch(performStates[nodeId]){
case CONNECTING:
if(!t->isConnected() && !t->isServer) {
int result= 0;
bool connected= false;
/**
* First, we try to connect (if we have a port number).
*/
if (t->get_s_port())
result= t->connect_client();
connected= t->connect_client();
if (result<0 && t->get_s_port()!=0)
g_eventLogger.warning("Error while trying to make connection "
"(Node %u to %u via port %u) "
"error: %d. Retrying...",
t->getRemoteNodeId(),
t->getLocalNodeId(),
t->get_s_port());
/**
* If dynamic, get the port for connecting from the management server
*/
if(t->get_s_port() <= 0) { // Port is dynamic
if( !connected && t->get_s_port() <= 0) { // Port is dynamic
int server_port= 0;
struct ndb_mgm_reply mgm_reply;
int res;
int res= -1;
if(!ndb_mgm_is_connected(m_mgm_handle))
if(ndb_mgm_connect(m_mgm_handle, 0, 0, 0)<0)
ndbout_c("Failed to reconnect to management server");
res= ndb_mgm_get_connection_int_parameter(m_mgm_handle,
t->getRemoteNodeId(),
t->getLocalNodeId(),
CFG_CONNECTION_SERVER_PORT,
&server_port,
&mgm_reply);
else
res=
ndb_mgm_get_connection_int_parameter(m_mgm_handle,
t->getRemoteNodeId(),
t->getLocalNodeId(),
CFG_CONNECTION_SERVER_PORT,
&server_port,
&mgm_reply);
DBUG_PRINT("info",("Got dynamic port %d for %d -> %d (ret: %d)",
server_port,t->getRemoteNodeId(),
t->getLocalNodeId(),res));
......
......@@ -92,10 +92,6 @@ int main(int argc, char** argv)
}
}
globalTransporterRegistry.set_mgm_handle(theConfig
->get_config_retriever()
->get_mgmHandle());
#ifndef NDB_WIN32
for(pid_t child = fork(); child != 0; child = fork()){
/**
......
......@@ -576,7 +576,7 @@ MgmtSrvr::start(BaseString &error_string)
}
}
theFacade= TransporterFacade::theFacadeInstance
= new TransporterFacade(m_config_retriever->get_mgmHandle());
= new TransporterFacade();
if(theFacade == 0) {
DEBUG("MgmtSrvr.cpp: theFacade is NULL.");
......
......@@ -84,7 +84,9 @@ NdbTransaction* Ndb::doConnect(Uint32 tConNode)
} else if (TretCode != 0) {
tAnyAlive= 1;
}//if
DBUG_PRINT("info",("tried node %d TretCode %d", tNode, TretCode));
DBUG_PRINT("info",("tried node %d, TretCode %d, error code %d, %s",
tNode, TretCode, getNdbError().code,
getNdbError().message));
}
}
else // just do a regular round robin
......
......@@ -1420,14 +1420,18 @@ Remark: Sets TC Connect pointer.
int
NdbTransaction::receiveTCSEIZEREF(NdbApiSignal* aSignal)
{
DBUG_ENTER("NdbTransaction::receiveTCSEIZEREF");
if (theStatus != Connecting)
{
return -1;
DBUG_RETURN(-1);
} else
{
theStatus = ConnectFailure;
theNdb->theError.code = aSignal->readData(2);
return 0;
DBUG_PRINT("info",("error code %d, %s",
theNdb->getNdbError().code,
theNdb->getNdbError().message));
DBUG_RETURN(0);
}
}//NdbTransaction::receiveTCSEIZEREF()
......
......@@ -466,8 +466,7 @@ void TransporterFacade::threadMainReceive(void)
theTransporterRegistry->stopReceiving();
}
TransporterFacade::TransporterFacade(NdbMgmHandle mgm_handle) :
m_mgm_handle(mgm_handle),
TransporterFacade::TransporterFacade() :
theTransporterRegistry(0),
theStopReceive(0),
theSendThread(NULL),
......@@ -496,7 +495,7 @@ bool
TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props)
{
theOwnId = nodeId;
theTransporterRegistry = new TransporterRegistry(m_mgm_handle,this);
theTransporterRegistry = new TransporterRegistry(this);
const int res = IPCConfig::configureTransporters(nodeId,
* props,
......
......@@ -47,7 +47,7 @@ extern "C" {
class TransporterFacade
{
public:
TransporterFacade(NdbMgmHandle mgm_handle);
TransporterFacade();
virtual ~TransporterFacade();
bool init(Uint32, const ndb_mgm_configuration *);
......@@ -133,7 +133,6 @@ private:
bool isConnected(NodeId aNodeId);
void doStop();
NdbMgmHandle m_mgm_handle;
TransporterRegistry* theTransporterRegistry;
SocketServer m_socket_server;
int sendPerformedLastInterval;
......
......@@ -284,7 +284,7 @@ Ndb_cluster_connection_impl::Ndb_cluster_connection_impl(const char *
m_transporter_facade=
TransporterFacade::theFacadeInstance=
new TransporterFacade(m_config_retriever->get_mgmHandle());
new TransporterFacade();
DBUG_VOID_RETURN;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment