Commit 744a18eb authored by tomas@poseidon.ndb.mysql.com's avatar tomas@poseidon.ndb.mysql.com

Merge

parents eb3e306f 7805cb4d
......@@ -29,6 +29,7 @@ noinst_HEADERS = sql_string.h completion_hash.h my_readline.h \
client_priv.h
mysqladmin_SOURCES = mysqladmin.cc
mysql_SOURCES = mysql.cc readline.cc sql_string.cc completion_hash.cc
mysqladmin_SOURCES = mysqladmin.cc
mysql_LDADD = @readline_link@ @TERMCAP_LIB@ $(LDADD) $(CXXLDFLAGS)
mysqlbinlog_LDADD = $(LDADD) $(CXXLDFLAGS)
mysql_DEPENDENCIES= $(LIBRARIES) $(pkglib_LTLIBRARIES) $(DEPLIB)
......
......@@ -28,7 +28,6 @@ ndbapi/NdbIndexScanOperation.hpp \
ndbapi/ndberror.h
mgmapiinclude_HEADERS = \
mgmapi/LocalConfig.hpp \
mgmapi/mgmapi.h \
mgmapi/mgmapi_debug.h
......
......@@ -356,11 +356,28 @@ extern "C" {
/**
* Create a handle to a management server
*
* @return A management handle<br>
* or NULL if no management handle could be created.
* @param connect_string Connect string to the management server,
*
* @return A management handle<br>
* or NULL if no management handle could be created.
*/
NdbMgmHandle ndb_mgm_create_handle();
/**
* Set connecst string to management server
*
* @param handle Management handle
* @param connect_string Connect string to the management server,
*
* @return -1 on error.
*/
int ndb_mgm_set_connectstring(NdbMgmHandle handle,
const char *connect_string);
int ndb_mgm_get_configuration_nodeid(NdbMgmHandle handle);
int ndb_mgm_get_connected_port(NdbMgmHandle handle);
const char *ndb_mgm_get_connected_host(NdbMgmHandle handle);
/**
* Destroy a management server handle
*
......@@ -378,11 +395,10 @@ extern "C" {
* Connect to a management server
*
* @param handle Management handle.
* @param mgmsrv Hostname and port of the management server,
* "hostname:port".
* @return -1 on error.
*/
int ndb_mgm_connect(NdbMgmHandle handle, const char * mgmsrv);
int ndb_mgm_connect(NdbMgmHandle handle, int no_retries,
int retry_delay_in_seconds, int verbose);
/**
* Disconnect from a management server
......@@ -709,9 +725,7 @@ extern "C" {
void ndb_mgm_destroy_configuration(struct ndb_mgm_configuration *);
int ndb_mgm_alloc_nodeid(NdbMgmHandle handle,
unsigned version,
unsigned *pnodeid,
int nodetype);
unsigned version, int nodetype);
/**
* Config iterator
*/
......
......@@ -20,7 +20,6 @@
#include <ndb_types.h>
#include <mgmapi.h>
#include <BaseString.hpp>
#include <LocalConfig.hpp>
/**
* @class ConfigRetriever
......@@ -28,10 +27,11 @@
*/
class ConfigRetriever {
public:
ConfigRetriever(LocalConfig &local_config, Uint32 version, Uint32 nodeType);
ConfigRetriever(const char * _connect_string,
Uint32 version, Uint32 nodeType);
~ConfigRetriever();
int do_connect(int exit_on_connect_failure= false);
int do_connect(int no_retries, int retry_delay_in_seconds, int verbose);
/**
* Get configuration for current node.
......@@ -46,12 +46,14 @@ public:
*/
struct ndb_mgm_configuration * getConfig();
void resetError();
int hasError();
const char * getErrorString();
/**
* @return Node id of this node (as stated in local config or connectString)
*/
Uint32 allocNodeId();
Uint32 allocNodeId(int no_retries, int retry_delay_in_seconds);
/**
* Get config using socket
......@@ -68,22 +70,26 @@ public:
*/
bool verifyConfig(const struct ndb_mgm_configuration *, Uint32 nodeid);
Uint32 get_mgmd_port() const {return m_mgmd_port;};
const char *get_mgmd_host() const {return m_mgmd_host;};
Uint32 get_mgmd_port() const;
const char *get_mgmd_host() const;
Uint32 get_configuration_nodeid() const;
private:
BaseString errorString;
enum ErrorType {
CR_ERROR = 0,
CR_RETRY = 1
CR_NO_ERROR = 0,
CR_ERROR = 1,
CR_RETRY = 2
};
ErrorType latestErrorType;
void setError(ErrorType, const char * errorMsg);
struct LocalConfig& _localConfig;
Uint32 _ownNodeId;
Uint32 _ownNodeId;
/*
Uint32 m_mgmd_port;
const char *m_mgmd_host;
*/
Uint32 m_version;
Uint32 m_node_type;
......
......@@ -19,7 +19,6 @@
#define CLUSTER_CONNECTION_HPP
class TransporterFacade;
class LocalConfig;
class ConfigRetriever;
class NdbThread;
......@@ -38,7 +37,6 @@ private:
void connect_thread();
char *m_connect_string;
TransporterFacade *m_facade;
LocalConfig *m_local_config;
ConfigRetriever *m_config_retriever;
NdbThread *m_connect_thread;
int (*m_connect_callback)(void);
......
......@@ -20,7 +20,6 @@
#include <ConfigRetriever.hpp>
#include <SocketServer.hpp>
#include "LocalConfig.hpp"
#include <NdbSleep.h>
#include <NdbOut.hpp>
......@@ -45,90 +44,62 @@
//****************************************************************************
//****************************************************************************
ConfigRetriever::ConfigRetriever(LocalConfig &local_config,
ConfigRetriever::ConfigRetriever(const char * _connect_string,
Uint32 version, Uint32 node_type)
: _localConfig(local_config)
{
m_handle= 0;
m_version = version;
m_node_type = node_type;
_ownNodeId = _localConfig._ownNodeId;
}
_ownNodeId= 0;
ConfigRetriever::~ConfigRetriever(){
m_handle= ndb_mgm_create_handle();
if (m_handle == 0) {
setError(CR_ERROR, "Unable to allocate mgm handle");
return;
}
if (ndb_mgm_set_connectstring(m_handle, _connect_string))
{
setError(CR_ERROR, ndb_mgm_get_latest_error_desc(m_handle));
return;
}
resetError();
}
ConfigRetriever::~ConfigRetriever()
{
if (m_handle) {
ndb_mgm_disconnect(m_handle);
ndb_mgm_destroy_handle(&m_handle);
}
}
Uint32
ConfigRetriever::get_configuration_nodeid() const
{
return ndb_mgm_get_configuration_nodeid(m_handle);
}
Uint32 ConfigRetriever::get_mgmd_port() const
{
return ndb_mgm_get_connected_port(m_handle);
}
const char *ConfigRetriever::get_mgmd_host() const
{
return ndb_mgm_get_connected_host(m_handle);
}
//****************************************************************************
//****************************************************************************
int
ConfigRetriever::do_connect(int exit_on_connect_failure){
m_mgmd_port= 0;
m_mgmd_host= 0;
if(!m_handle)
m_handle= ndb_mgm_create_handle();
if (m_handle == 0) {
setError(CR_ERROR, "Unable to allocate mgm handle");
return -1;
}
int retry = 1;
int retry_max = 12; // Max number of retry attempts
int retry_interval= 5; // Seconds between each retry
while(retry < retry_max){
Uint32 type = CR_ERROR;
BaseString tmp;
for (unsigned int i = 0; i<_localConfig.ids.size(); i++){
MgmtSrvrId * m = &_localConfig.ids[i];
DBUG_PRINT("info",("trying %s:%d",
m->name.c_str(),
m->port));
switch(m->type){
case MgmId_TCP:
tmp.assfmt("%s:%d", m->name.c_str(), m->port);
if (ndb_mgm_connect(m_handle, tmp.c_str()) == 0) {
m_mgmd_port= m->port;
m_mgmd_host= m->name.c_str();
DBUG_PRINT("info",("connected to ndb_mgmd at %s:%d",
m_mgmd_host,
m_mgmd_port));
return 0;
}
setError(CR_RETRY, ndb_mgm_get_latest_error_desc(m_handle));
case MgmId_File:
break;
}
}
if(latestErrorType == CR_RETRY){
DBUG_PRINT("info",("CR_RETRY"));
if (exit_on_connect_failure)
return 1;
REPORT_WARNING("Failed to retrieve cluster configuration");
ndbout << "(Cause of failure: " << getErrorString() << ")" << endl;
ndbout << "Attempt " << retry << " of " << retry_max << ". "
<< "Trying again in "<< retry_interval <<" seconds..."
<< endl << endl;
NdbSleep_SecSleep(retry_interval);
} else {
break;
}
retry++;
}
ndb_mgm_destroy_handle(&m_handle);
m_handle= 0;
m_mgmd_port= 0;
m_mgmd_host= 0;
return -1;
ConfigRetriever::do_connect(int no_retries,
int retry_delay_in_seconds, int verbose)
{
return
(ndb_mgm_connect(m_handle,no_retries,retry_delay_in_seconds,verbose)==0) ?
0 : -1;
}
//****************************************************************************
......@@ -140,22 +111,9 @@ ConfigRetriever::getConfig() {
struct ndb_mgm_configuration * p = 0;
if(m_handle != 0){
if(m_handle != 0)
p = getConfig(m_handle);
} else {
for (unsigned int i = 0; i<_localConfig.ids.size(); i++){
MgmtSrvrId * m = &_localConfig.ids[i];
switch(m->type){
case MgmId_File:
p = getConfig(m->name.c_str());
break;
case MgmId_TCP:
break;
}
if(p)
break;
}
}
if(p == 0)
return 0;
......@@ -227,6 +185,16 @@ ConfigRetriever::setError(ErrorType et, const char * s){
latestErrorType = et;
}
void
ConfigRetriever::resetError(){
setError(CR_NO_ERROR,0);
}
int
ConfigRetriever::hasError()
{
return latestErrorType != CR_NO_ERROR;
}
const char *
ConfigRetriever::getErrorString(){
......@@ -341,16 +309,23 @@ ConfigRetriever::verifyConfig(const struct ndb_mgm_configuration * conf, Uint32
}
Uint32
ConfigRetriever::allocNodeId(){
unsigned nodeid= _ownNodeId;
if(m_handle != 0){
int res= ndb_mgm_alloc_nodeid(m_handle, m_version, &nodeid, m_node_type);
if(res != 0) {
setError(CR_ERROR, ndb_mgm_get_latest_error_desc(m_handle));
return 0;
ConfigRetriever::allocNodeId(int no_retries, int retry_delay_in_seconds)
{
_ownNodeId= 0;
if(m_handle != 0)
{
while (1)
{
int res= ndb_mgm_alloc_nodeid(m_handle, m_version, m_node_type);
if(res >= 0)
return _ownNodeId= (Uint32)res;
if (no_retries == 0)
break;
no_retries--;
NdbSleep_SecSleep(retry_delay_in_seconds);
}
}
return _ownNodeId= nodeid;
setError(CR_ERROR, ndb_mgm_get_latest_error_desc(m_handle));
} else
setError(CR_ERROR, "management server handle not initialized");
return 0;
}
......@@ -19,7 +19,6 @@
#include <ndb_version.h>
#include "Configuration.hpp"
#include <LocalConfig.hpp>
#include <TransporterRegistry.hpp>
#include "vm/SimBlockList.hpp"
......@@ -69,16 +68,9 @@ int main(int argc, char** argv)
return NRT_Default;
}
LocalConfig local_config;
if (!local_config.init(theConfig->getConnectString(),0)){
local_config.printError();
local_config.printUsage();
return NRT_Default;
}
{ // Do configuration
signal(SIGPIPE, SIG_IGN);
theConfig->fetch_configuration(local_config);
theConfig->fetch_configuration();
}
chdir(NdbConfig_get_path(0));
......@@ -141,7 +133,7 @@ int main(int argc, char** argv)
exit(0);
}
g_eventLogger.info("Ndb has terminated (pid %d) restarting", child);
theConfig->fetch_configuration(local_config);
theConfig->fetch_configuration();
}
g_eventLogger.info("Angel pid: %d ndb pid: %d", getppid(), getpid());
......
......@@ -17,7 +17,6 @@
#include <ndb_global.h>
#include <ndb_opts.h>
#include <LocalConfig.hpp>
#include "Configuration.hpp"
#include <ErrorHandlingMacros.hpp>
#include "GlobalData.hpp"
......@@ -189,7 +188,7 @@ Configuration::closeConfiguration(){
}
void
Configuration::fetch_configuration(LocalConfig &local_config){
Configuration::fetch_configuration(){
/**
* Fetch configuration from management server
*/
......@@ -199,8 +198,17 @@ Configuration::fetch_configuration(LocalConfig &local_config){
m_mgmd_port= 0;
m_mgmd_host= 0;
m_config_retriever= new ConfigRetriever(local_config, NDB_VERSION, NODE_TYPE_DB);
if(m_config_retriever->do_connect() == -1){
m_config_retriever= new ConfigRetriever(getConnectString(),
NDB_VERSION, NODE_TYPE_DB);
if (m_config_retriever->hasError())
{
ERROR_SET(fatal, ERR_INVALID_CONFIG,
"Could not connect initialize handle to management server",
m_config_retriever->getErrorString());
}
if(m_config_retriever->do_connect(12,5,1) == -1){
const char * s = m_config_retriever->getErrorString();
if(s == 0)
s = "No error given!";
......@@ -215,13 +223,7 @@ Configuration::fetch_configuration(LocalConfig &local_config){
ConfigRetriever &cr= *m_config_retriever;
if((globalData.ownId = cr.allocNodeId()) == 0){
for(Uint32 i = 0; i<3; i++){
NdbSleep_SecSleep(3);
if((globalData.ownId = cr.allocNodeId()) != 0)
break;
}
}
globalData.ownId = cr.allocNodeId(2 /*retry*/,3 /*delay*/);
if(globalData.ownId == 0){
ERROR_SET(fatal, ERR_INVALID_CONFIG,
......
......@@ -21,7 +21,6 @@
#include <ndb_types.h>
class ConfigRetriever;
class LocalConfig;
class Configuration {
public:
......@@ -33,7 +32,7 @@ public:
*/
bool init(int argc, char** argv);
void fetch_configuration(LocalConfig &local_config);
void fetch_configuration();
void setupConfiguration();
void closeConfiguration();
......
......@@ -14,7 +14,7 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <LocalConfig.hpp>
#include "LocalConfig.hpp"
#include <NdbEnv.h>
#include <NdbConfig.h>
#include <NdbAutoPtr.hpp>
......@@ -294,4 +294,19 @@ LocalConfig::readConnectString(const char * connectString,
return return_value;
}
char *
LocalConfig::makeConnectString(char *buf, int sz)
{
int p= BaseString::snprintf(buf,sz,"nodeid=%d", _ownNodeId);
for (int i = 0; (i < ids.size()) && (sz-p > 0); i++)
{
if (ids[i].type != MgmId_TCP)
continue;
p+=BaseString::snprintf(buf+p,sz-p,",%s:%d",
ids[i].name.c_str(), ids[i].port);
}
buf[sz-1]=0;
return buf;
}
template class Vector<MgmtSrvrId>;
......@@ -61,6 +61,7 @@ struct LocalConfig {
bool parseHostName(const char *buf);
bool parseFileName(const char *buf);
bool parseString(const char *buf, BaseString &err);
char * makeConnectString(char *buf, int sz);
};
#endif // LocalConfig_H
......
......@@ -20,6 +20,7 @@
#include <LocalConfig.hpp>
#include <NdbAutoPtr.hpp>
#include <NdbSleep.h>
#include <NdbTCP.h>
#include "mgmapi.h"
#include "mgmapi_debug.h"
......@@ -83,8 +84,8 @@ typedef Parser<ParserDummy> Parser_t;
#define NDB_MGM_MAX_ERR_DESC_SIZE 256
struct ndb_mgm_handle {
char * hostname;
unsigned short port;
char * connectstring;
int cfg_i;
int connected;
int last_error;
......@@ -95,7 +96,7 @@ struct ndb_mgm_handle {
NDB_SOCKET_TYPE socket;
char cfg_ptr[sizeof(LocalConfig)];
LocalConfig cfg;
#ifdef MGMAPI_LOG
FILE* logfile;
......@@ -148,14 +149,16 @@ ndb_mgm_create_handle()
h->connected = 0;
h->last_error = 0;
h->last_error_line = 0;
h->hostname = 0;
h->socket = NDB_INVALID_SOCKET;
h->read_timeout = 50000;
h->write_timeout = 100;
new (h->cfg_ptr) LocalConfig;
h->cfg_i = 0;
strncpy(h->last_error_desc, "No error", NDB_MGM_MAX_ERR_DESC_SIZE);
new (&(h->cfg)) LocalConfig;
h->cfg.init(0, 0);
#ifdef MGMAPI_LOG
h->logfile = 0;
#endif
......@@ -163,6 +166,23 @@ ndb_mgm_create_handle()
return h;
}
extern "C"
int
ndb_mgm_set_connectstring(NdbMgmHandle handle, const char * mgmsrv)
{
new (&(handle->cfg)) LocalConfig;
if (!handle->cfg.init(mgmsrv, 0) ||
handle->cfg.ids.size() == 0)
{
new (&(handle->cfg)) LocalConfig;
handle->cfg.init(0, 0); /* reset the LocalCongig */
SET_ERROR(handle, NDB_MGM_ILLEGAL_CONNECT_STRING, "");
return -1;
}
handle->cfg_i= 0;
return 0;
}
/**
* Destroy a handle
*/
......@@ -175,14 +195,13 @@ ndb_mgm_destroy_handle(NdbMgmHandle * handle)
if((* handle)->connected){
ndb_mgm_disconnect(* handle);
}
my_free((* handle)->hostname,MYF(MY_ALLOW_ZERO_PTR));
#ifdef MGMAPI_LOG
if ((* handle)->logfile != 0){
fclose((* handle)->logfile);
(* handle)->logfile = 0;
}
#endif
((LocalConfig*)((*handle)->cfg_ptr))->~LocalConfig();
(*handle)->cfg.~LocalConfig();
my_free((char*)* handle,MYF(MY_ALLOW_ZERO_PTR));
* handle = 0;
}
......@@ -314,7 +333,8 @@ ndb_mgm_call(NdbMgmHandle handle, const ParserRow<ParserDummy> *command_reply,
*/
extern "C"
int
ndb_mgm_connect(NdbMgmHandle handle, const char * mgmsrv)
ndb_mgm_connect(NdbMgmHandle handle, int no_retries,
int retry_delay_in_seconds, int verbose)
{
SET_ERROR(handle, NDB_MGM_NO_ERROR, "Executing: ndb_mgm_connect");
CHECK_HANDLE(handle, -1);
......@@ -331,36 +351,48 @@ ndb_mgm_connect(NdbMgmHandle handle, const char * mgmsrv)
/**
* Do connect
*/
LocalConfig *cfg= (LocalConfig*)(handle->cfg_ptr);
new (cfg) LocalConfig;
if (!cfg->init(mgmsrv, 0) ||
cfg->ids.size() == 0)
{
SET_ERROR(handle, NDB_MGM_ILLEGAL_CONNECT_STRING, "");
return -1;
}
LocalConfig &cfg= handle->cfg;
NDB_SOCKET_TYPE sockfd= NDB_INVALID_SOCKET;
Uint32 i;
for (i = 0; i < cfg->ids.size(); i++)
while (sockfd == NDB_INVALID_SOCKET)
{
if (cfg->ids[i].type != MgmId_TCP)
continue;
SocketClient s(cfg->ids[i].name.c_str(), cfg->ids[i].port);
sockfd = s.connect();
// do all the mgmt servers
for (i = 0; i < cfg.ids.size(); i++)
{
if (cfg.ids[i].type != MgmId_TCP)
continue;
SocketClient s(cfg.ids[i].name.c_str(), cfg.ids[i].port);
sockfd = s.connect();
if (sockfd != NDB_INVALID_SOCKET)
break;
}
if (sockfd != NDB_INVALID_SOCKET)
break;
}
if (sockfd == NDB_INVALID_SOCKET)
{
setError(handle, NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET, __LINE__,
"Unable to connect using connectstring %s", mgmsrv);
return -1;
if (verbose > 0) {
char buf[1024];
ndbout_c("Unable to connect with connect string: %s",
cfg.makeConnectString(buf,sizeof(buf)));
verbose= -1;
}
if (no_retries == 0) {
char buf[1024];
setError(handle, NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET, __LINE__,
"Unable to connect with connect string: %s",
cfg.makeConnectString(buf,sizeof(buf)));
return -1;
}
if (verbose == -1) {
ndbout << "retrying every " << retry_delay_in_seconds << " seconds:";
verbose= -2;
}
NdbSleep_SecSleep(retry_delay_in_seconds);
if (verbose == -2) {
ndbout << " " << no_retries;
}
no_retries--;
}
my_free(handle->hostname,MYF(MY_ALLOW_ZERO_PTR));
handle->hostname = my_strdup(cfg->ids[i].name.c_str(),MYF(MY_WME));
handle->port = cfg->ids[i].port;
handle->cfg_i = i;
handle->socket = sockfd;
handle->connected = 1;
......@@ -1068,7 +1100,9 @@ ndb_mgm_listen_event(NdbMgmHandle handle, int filter[])
};
CHECK_HANDLE(handle, -1);
SocketClient s(handle->hostname, handle->port);
const char *hostname= ndb_mgm_get_connected_host(handle);
int port= ndb_mgm_get_connected_port(handle);
SocketClient s(hostname, port);
const NDB_SOCKET_TYPE sockfd = s.connect();
if (sockfd < 0) {
setError(handle, NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET, __LINE__,
......@@ -1613,16 +1647,37 @@ ndb_mgm_destroy_configuration(struct ndb_mgm_configuration *cfg)
extern "C"
int
ndb_mgm_alloc_nodeid(NdbMgmHandle handle, unsigned int version, unsigned *pnodeid, int nodetype)
ndb_mgm_get_configuration_nodeid(NdbMgmHandle handle)
{
CHECK_HANDLE(handle, 0);
return handle->cfg._ownNodeId;
}
extern "C"
int ndb_mgm_get_connected_port(NdbMgmHandle handle)
{
return handle->cfg.ids[handle->cfg_i].port;
}
extern "C"
const char *ndb_mgm_get_connected_host(NdbMgmHandle handle)
{
return handle->cfg.ids[handle->cfg_i].name.c_str();
}
extern "C"
int
ndb_mgm_alloc_nodeid(NdbMgmHandle handle, unsigned int version, int nodetype)
{
CHECK_HANDLE(handle, 0);
CHECK_CONNECTED(handle, 0);
int nodeid= handle->cfg._ownNodeId;
Properties args;
args.put("version", version);
args.put("nodetype", nodetype);
args.put("nodeid", *pnodeid);
args.put("nodeid", nodeid);
args.put("user", "mysqld");
args.put("password", "mysqld");
args.put("public key", "a public key");
......@@ -1638,26 +1693,29 @@ ndb_mgm_alloc_nodeid(NdbMgmHandle handle, unsigned int version, unsigned *pnodei
prop= ndb_mgm_call(handle, reply, "get nodeid", &args);
CHECK_REPLY(prop, -1);
int res= -1;
nodeid= -1;
do {
const char * buf;
if(!prop->get("result", &buf) || strcmp(buf, "Ok") != 0){
const char *hostname= ndb_mgm_get_connected_host(handle);
unsigned port= ndb_mgm_get_connected_port(handle);
BaseString err;
err.assfmt("Could not alloc node id at %s port %d: %s",
handle->hostname, handle->port, buf);
hostname, port, buf);
setError(handle, NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET, __LINE__,
err.c_str());
break;
}
if(!prop->get("nodeid", pnodeid) != 0){
Uint32 _nodeid;
if(!prop->get("nodeid", &_nodeid) != 0){
ndbout_c("ERROR Message: <nodeid Unspecified>\n");
break;
}
res= 0;
nodeid= _nodeid;
}while(0);
delete prop;
return res;
return nodeid;
}
/*****************************************************************************
......
......@@ -153,7 +153,6 @@ private:
NdbMgmHandle m_mgmsrv;
bool connected;
const char *host;
int try_reconnect;
#ifdef HAVE_GLOBAL_REPLICATION
NdbRepHandle m_repserver;
......@@ -379,15 +378,16 @@ CommandInterpreter::CommandInterpreter(const char *_host)
m_mgmsrv = ndb_mgm_create_handle();
if(m_mgmsrv == NULL) {
ndbout_c("Cannot create handle to management server.");
exit(-1);
}
if (ndb_mgm_set_connectstring(m_mgmsrv, _host))
{
printError();
exit(-1);
}
connected = false;
try_reconnect = 0;
if (_host)
host= my_strdup(_host,MYF(MY_WME));
else
host= 0;
#ifdef HAVE_GLOBAL_REPLICATION
rep_host = NULL;
m_repserver = NULL;
......@@ -402,8 +402,6 @@ CommandInterpreter::~CommandInterpreter()
{
connected = false;
ndb_mgm_destroy_handle(&m_mgmsrv);
my_free((char *)host,MYF(MY_ALLOW_ZERO_PTR));
host = NULL;
}
static bool
......@@ -438,18 +436,8 @@ bool
CommandInterpreter::connect()
{
if(!connected) {
int tries = try_reconnect; // tries == 0 => infinite
while(!connected) {
if(ndb_mgm_connect(m_mgmsrv, host) == -1) {
ndbout << "Cannot connect to management server (" << host << ").";
tries--;
if (tries == 0)
break;
ndbout << "Retrying in 5 seconds." << endl;
NdbSleep_SecSleep(5);
} else
connected = true;
}
if(!ndb_mgm_connect(m_mgmsrv, try_reconnect-1, 5, 1))
connected = true;
}
return connected;
}
......
......@@ -30,9 +30,10 @@ extern "C" int add_history(const char *command); /* From readline directory */
#include <NdbMain.h>
#include <NdbHost.h>
#include <BaseString.hpp>
#include <NdbOut.hpp>
#include <mgmapi.h>
#include <ndb_version.h>
#include <LocalConfig.hpp>
#include "ndb_mgmclient.hpp"
......
......@@ -399,16 +399,20 @@ MgmtSrvr::getPort() const {
}
/* Constructor */
MgmtSrvr::MgmtSrvr(NodeId nodeId,
SocketServer *socket_server,
const BaseString &configFilename,
LocalConfig &local_config,
Config * config):
int MgmtSrvr::init()
{
if ( _ownNodeId > 0)
return 0;
return -1;
}
MgmtSrvr::MgmtSrvr(SocketServer *socket_server,
const char *config_filename,
const char *connect_string) :
_blockNumber(1), // Hard coded block number since it makes it easy to send
// signals to other management servers.
m_socket_server(socket_server),
_ownReference(0),
m_local_config(local_config),
theSignalIdleList(NULL),
theWaitState(WAIT_SUBSCRIBE_CONF),
m_statisticsListner(this)
......@@ -416,6 +420,8 @@ MgmtSrvr::MgmtSrvr(NodeId nodeId,
DBUG_ENTER("MgmtSrvr::MgmtSrvr");
_ownNodeId= 0;
_config = NULL;
_isStopThread = false;
......@@ -426,12 +432,43 @@ MgmtSrvr::MgmtSrvr(NodeId nodeId,
theFacade = 0;
m_newConfig = NULL;
m_configFilename = configFilename;
m_configFilename.assign(config_filename);
m_nextConfigGenerationNumber = 0;
_config = (config == 0 ? readConfig() : config);
m_config_retriever= new ConfigRetriever(connect_string,
NDB_VERSION, NDB_MGM_NODE_TYPE_MGM);
// first try to allocate nodeid from another management server
if(m_config_retriever->do_connect(0,0,0) == 0)
{
int tmp_nodeid= 0;
tmp_nodeid= m_config_retriever->allocNodeId(0 /*retry*/,0 /*delay*/);
if (tmp_nodeid == 0)
{
ndbout_c(m_config_retriever->getErrorString());
exit(-1);
}
// read config from other managent server
_config= fetchConfig();
if (_config == 0)
{
ndbout << m_config_retriever->getErrorString() << endl;
exit(-1);
}
_ownNodeId= tmp_nodeid;
}
if (_ownNodeId == 0)
{
// read config locally
_config= readConfig();
if (_config == 0) {
ndbout << "Unable to read config file" << endl;
exit(-1);
}
}
theMgmtWaitForResponseCondPtr = NdbCondition_Create();
m_configMutex = NdbMutex_Create();
......@@ -443,9 +480,11 @@ MgmtSrvr::MgmtSrvr(NodeId nodeId,
nodeTypes[i] = (enum ndb_mgm_node_type)-1;
m_connect_address[i].s_addr= 0;
}
{
ndb_mgm_configuration_iterator * iter = ndb_mgm_create_configuration_iterator
(config->m_configValues, CFG_SECTION_NODE);
ndb_mgm_configuration_iterator
*iter = ndb_mgm_create_configuration_iterator(_config->m_configValues,
CFG_SECTION_NODE);
for(ndb_mgm_first(iter); ndb_mgm_valid(iter); ndb_mgm_next(iter)){
unsigned type, id;
if(ndb_mgm_get_int_parameter(iter, CFG_TYPE_OF_SECTION, &type) != 0)
......@@ -478,8 +517,6 @@ MgmtSrvr::MgmtSrvr(NodeId nodeId,
}
_props = NULL;
_ownNodeId= 0;
NodeId tmp= nodeId;
BaseString error_string;
if ((m_node_id_mutex = NdbMutex_Create()) == 0)
......@@ -488,43 +525,25 @@ MgmtSrvr::MgmtSrvr(NodeId nodeId,
exit(-1);
}
#if 0
char my_hostname[256];
struct sockaddr_in tmp_addr;
SOCKET_SIZE_TYPE addrlen= sizeof(tmp_addr);
if (!g_no_nodeid_checks) {
if (gethostname(my_hostname, sizeof(my_hostname))) {
ndbout << "error: gethostname() - " << strerror(errno) << endl;
exit(-1);
}
if (Ndb_getInAddr(&(((sockaddr_in*)&tmp_addr)->sin_addr),my_hostname)) {
ndbout << "error: Ndb_getInAddr(" << my_hostname << ") - "
<< strerror(errno) << endl;
if (_ownNodeId == 0) // we did not get node id from other server
{
NodeId tmp= m_config_retriever->get_configuration_nodeid();
if (!alloc_node_id(&tmp, NDB_MGM_NODE_TYPE_MGM,
0, 0, error_string)){
ndbout << "Unable to obtain requested nodeid: "
<< error_string.c_str() << endl;
exit(-1);
}
_ownNodeId = tmp;
}
if (!alloc_node_id(&tmp, NDB_MGM_NODE_TYPE_MGM,
(struct sockaddr *)&tmp_addr,
&addrlen, error_string)){
ndbout << "Unable to obtain requested nodeid: "
<< error_string.c_str() << endl;
exit(-1);
}
#else
if (!alloc_node_id(&tmp, NDB_MGM_NODE_TYPE_MGM,
0, 0, error_string)){
ndbout << "Unable to obtain requested nodeid: "
<< error_string.c_str() << endl;
exit(-1);
}
#endif
_ownNodeId = tmp;
{
DBUG_PRINT("info", ("verifyConfig"));
ConfigRetriever cr(m_local_config, NDB_VERSION, NDB_MGM_NODE_TYPE_MGM);
if (!cr.verifyConfig(config->m_configValues, _ownNodeId)) {
ndbout << cr.getErrorString() << endl;
if (!m_config_retriever->verifyConfig(_config->m_configValues,
_ownNodeId))
{
ndbout << m_config_retriever->getErrorString() << endl;
exit(-1);
}
}
......@@ -657,6 +676,8 @@ MgmtSrvr::~MgmtSrvr()
NdbThread_WaitFor(m_signalRecvThread, &res);
NdbThread_Destroy(&m_signalRecvThread);
}
if (m_config_retriever)
delete m_config_retriever;
}
//****************************************************************************
......
......@@ -175,11 +175,10 @@ public:
/* Constructor */
MgmtSrvr(NodeId nodeId, /* Local nodeid */
SocketServer *socket_server,
const BaseString &config_filename, /* Where to save config */
LocalConfig &local_config, /* Ndb.cfg filename */
Config * config);
MgmtSrvr(SocketServer *socket_server,
const char *config_filename, /* Where to save config */
const char *connect_string);
int init();
NodeId getOwnNodeId() const {return _ownNodeId;};
/**
......@@ -538,7 +537,6 @@ private:
NdbMutex *m_configMutex;
const Config * _config;
Config * m_newConfig;
LocalConfig &m_local_config;
BaseString m_configFilename;
Uint32 m_nextConfigGenerationNumber;
......@@ -755,6 +753,9 @@ private:
Config *_props;
int send(class NdbApiSignal* signal, Uint32 node, Uint32 node_type);
ConfigRetriever *m_config_retriever;
public:
/**
* This method does not exist
......
......@@ -272,30 +272,20 @@ MgmtSrvr::saveConfig(const Config *conf) {
Config *
MgmtSrvr::readConfig() {
Config *conf = NULL;
if(m_configFilename.length() != 0) {
/* Use config file */
InitConfigFileParser parser;
conf = parser.parseConfig(m_configFilename.c_str());
if(conf == NULL) {
/* Try to get configuration from other MGM server */
return fetchConfig();
}
}
Config *conf;
InitConfigFileParser parser;
conf = parser.parseConfig(m_configFilename.c_str());
return conf;
}
Config *
MgmtSrvr::fetchConfig() {
ConfigRetriever cr(m_local_config, NDB_VERSION, NODE_TYPE_MGM);
struct ndb_mgm_configuration * tmp = cr.getConfig();
struct ndb_mgm_configuration * tmp = m_config_retriever->getConfig();
if(tmp != 0){
Config * conf = new Config();
conf->m_configValues = tmp;
return conf;
}
return 0;
}
......
......@@ -62,7 +62,6 @@ struct MgmGlobals {
int non_interactive;
int interactive;
const char * config_filename;
const char * local_config_filename;
/** Stuff found in environment or in local config */
NodeId localNodeId;
......@@ -70,9 +69,6 @@ struct MgmGlobals {
char * interface_name;
int port;
/** The configuration of the cluster */
Config * cluster_config;
/** The Mgmt Server */
MgmtSrvr * mgmObject;
......@@ -86,9 +82,6 @@ static MgmGlobals glob;
/******************************************************************************
* Function prototypes
******************************************************************************/
static bool readLocalConfig();
static bool readGlobalConfig();
/**
* Global variables
*/
......@@ -122,9 +115,6 @@ static struct my_option my_long_options[] =
{ "daemon", 'd', "Run ndb_mgmd in daemon mode (default)",
(gptr*) &glob.daemon, (gptr*) &glob.daemon, 0,
GET_BOOL, NO_ARG, 1, 0, 0, 0, 0, 0 },
{ "l", 'l', "Specify configuration file connect string (default Ndb.cfg if available)",
(gptr*) &glob.local_config_filename, (gptr*) &glob.local_config_filename, 0,
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
{ "interactive", 256, "Run interactive. Not supported but provided for testing purposes",
(gptr*) &glob.interactive, (gptr*) &glob.interactive, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
......@@ -212,29 +202,16 @@ int main(int argc, char** argv)
MgmApiService * mapi = new MgmApiService();
/****************************
* Read configuration files *
****************************/
LocalConfig local_config;
if(!local_config.init(opt_connect_str,glob.local_config_filename)){
local_config.printError();
goto error_end;
}
glob.localNodeId = local_config._ownNodeId;
glob.mgmObject = new MgmtSrvr(glob.socketServer,
glob.config_filename,
opt_connect_str);
if (!readGlobalConfig())
if (glob.mgmObject->init())
goto error_end;
glob.mgmObject = new MgmtSrvr(glob.localNodeId, glob.socketServer,
BaseString(glob.config_filename),
local_config,
glob.cluster_config);
chdir(NdbConfig_get_path(0));
glob.cluster_config = 0;
glob.localNodeId= glob.mgmObject->getOwnNodeId();
if (glob.localNodeId == 0) {
goto error_end;
}
......@@ -345,9 +322,7 @@ MgmGlobals::MgmGlobals(){
// Default values
port = 0;
config_filename = NULL;
local_config_filename = NULL;
interface_name = 0;
cluster_config = 0;
daemon = 1;
non_interactive = 0;
interactive = 0;
......@@ -360,27 +335,6 @@ MgmGlobals::~MgmGlobals(){
delete socketServer;
if (mgmObject)
delete mgmObject;
if (cluster_config)
delete cluster_config;
if (interface_name)
free(interface_name);
}
/**
* @fn readGlobalConfig
* @param glob : Global variables
* @return true if success, false otherwise.
*/
static bool
readGlobalConfig() {
if(glob.config_filename == NULL)
return false;
/* Use config file */
InitConfigFileParser parser;
glob.cluster_config = parser.parseConfig(glob.config_filename);
if(glob.cluster_config == 0){
return false;
}
return true;
}
......@@ -45,7 +45,6 @@ Ndb_cluster_connection::Ndb_cluster_connection(const char *connect_string)
else
m_connect_string= 0;
m_config_retriever= 0;
m_local_config= 0;
m_connect_thread= 0;
m_connect_callback= 0;
......@@ -125,38 +124,31 @@ int Ndb_cluster_connection::connect(int reconnect)
do {
if (m_config_retriever == 0)
{
if (m_local_config == 0) {
m_local_config= new LocalConfig();
if (!m_local_config->init(m_connect_string,0)) {
ndbout_c("Configuration error: Unable to retrieve local config");
m_local_config->printError();
m_local_config->printUsage();
DBUG_RETURN(-1);
}
}
m_config_retriever=
new ConfigRetriever(*m_local_config, NDB_VERSION, NODE_TYPE_API);
new ConfigRetriever(m_connect_string, NDB_VERSION, NODE_TYPE_API);
if (m_config_retriever->hasError())
{
printf("Could not connect initialize handle to management server",
m_config_retriever->getErrorString());
DBUG_RETURN(-1);
}
}
else
if (reconnect == 0)
DBUG_RETURN(0);
if (reconnect)
{
int r= m_config_retriever->do_connect(1);
int r= m_config_retriever->do_connect(0,0,0);
if (r == 1)
DBUG_RETURN(1); // mgmt server not up yet
if (r == -1)
break;
}
else
if(m_config_retriever->do_connect() == -1)
if(m_config_retriever->do_connect(12,5,1) == -1)
break;
Uint32 nodeId = m_config_retriever->allocNodeId();
for(Uint32 i = 0; nodeId == 0 && i<5; i++){
NdbSleep_SecSleep(3);
nodeId = m_config_retriever->allocNodeId();
}
Uint32 nodeId = m_config_retriever->allocNodeId(4/*retries*/,3/*delay*/);
if(nodeId == 0)
break;
ndb_mgm_configuration * props = m_config_retriever->getConfig();
......@@ -200,8 +192,6 @@ Ndb_cluster_connection::~Ndb_cluster_connection()
my_free(m_connect_string,MYF(MY_ALLOW_ZERO_PTR));
if (m_config_retriever)
delete m_config_retriever;
if (m_local_config)
delete m_local_config;
DBUG_VOID_RETURN;
}
......
......@@ -538,15 +538,19 @@ connect_ndb_mgm(atrt_process & proc){
}
BaseString tmp = proc.m_hostname;
tmp.appfmt(":%d", proc.m_ndb_mgm_port);
time_t start = time(0);
const time_t max_connect_time = 30;
do {
if(ndb_mgm_connect(handle, tmp.c_str()) != -1){
proc.m_ndb_mgm_handle = handle;
return true;
}
sleep(1);
} while(time(0) < (start + max_connect_time));
if (ndb_mgm_set_connectstring(handle,tmp.c_str()))
{
g_logger.critical("Unable to create parse connectstring");
return false;
}
if(ndb_mgm_connect(handle, 30, 1, 0) != -1)
{
proc.m_ndb_mgm_handle = handle;
return true;
}
g_logger.critical("Unable to connect to ndb mgm %s", tmp.c_str());
return false;
}
......
......@@ -69,16 +69,14 @@ NdbBackup::getBackupDataDirForNode(int _node_id){
/**
* Fetch configuration from management server
*/
LocalConfig lc;
if (!lc.init(0,0)) {
abort();
}
ConfigRetriever cr(lc, 0, NODE_TYPE_API);
ConfigRetriever cr(0, 0, NODE_TYPE_API);
ndb_mgm_configuration * p = 0;
BaseString tmp; tmp.assfmt("%s:%d", host.c_str(), port);
NdbMgmHandle handle = ndb_mgm_create_handle();
if(handle == 0 || ndb_mgm_connect(handle, tmp.c_str()) != 0 ||
if(handle == 0 ||
ndb_mgm_set_connectstring(handle,tmp.c_str()) != 0 ||
ndb_mgm_connect(handle,0,0,0) != 0 ||
(p = ndb_mgm_get_configuration(handle, 0)) == 0){
const char * s = 0;
......
......@@ -18,7 +18,6 @@
#include <NdbOut.hpp>
#include <NdbSleep.h>
#include <NdbTick.h>
#include <LocalConfig.hpp>
#include <mgmapi_debug.h>
#include <NDBT_Output.hpp>
#include <random.h>
......@@ -38,37 +37,7 @@ NdbRestarter::NdbRestarter(const char* _addr):
m_config(0)
{
if (_addr == NULL){
LocalConfig lcfg;
if(!lcfg.init()){
lcfg.printError();
lcfg.printUsage();
g_err << "NdbRestarter - Error parsing local config file" << endl;
return;
}
if (lcfg.ids.size() == 0){
g_err << "NdbRestarter - No management servers configured in local config file" << endl;
return;
}
for (int i = 0; i<lcfg.ids.size(); i++){
MgmtSrvrId * m = &lcfg.ids[i];
switch(m->type){
case MgmId_TCP:
char buf[255];
snprintf(buf, 255, "%s:%d", m->name.c_str(), m->port);
addr.assign(buf);
host.assign(m->name.c_str());
port = m->port;
return;
break;
case MgmId_File:
break;
default:
break;
}
}
addr.assign("");
} else {
addr.assign(_addr);
}
......@@ -397,7 +366,15 @@ NdbRestarter::connect(){
return -1;
}
g_info << "Connecting to mgmsrv at " << addr.c_str() << endl;
if (ndb_mgm_connect(handle, addr.c_str()) == -1) {
if (ndb_mgm_set_connectstring(handle,addr.c_str()))
{
MGMERR(handle);
g_err << "Connection to " << addr.c_str() << " failed" << endl;
return -1;
}
if (ndb_mgm_connect(handle, 0, 0, 0) == -1)
{
MGMERR(handle);
g_err << "Connection to " << addr.c_str() << " failed" << endl;
return -1;
......
......@@ -23,7 +23,6 @@
#include <NdbOut.hpp>
#include <NdbSleep.h>
#include <kernel/ndb_limits.h>
#include <LocalConfig.hpp>
#include <NDBT.hpp>
......@@ -85,39 +84,8 @@ int main(int argc, char** argv){
char buf[255];
_hostName = argv[0];
if (_hostName == NULL){
LocalConfig lcfg;
if(!lcfg.init(opt_connect_str, 0))
{
lcfg.printError();
lcfg.printUsage();
g_err << "Error parsing local config file" << endl;
return NDBT_ProgramExit(NDBT_FAILED);
}
for (unsigned i = 0; i<lcfg.ids.size();i++)
{
MgmtSrvrId * m = &lcfg.ids[i];
switch(m->type){
case MgmId_TCP:
snprintf(buf, 255, "%s:%d", m->name.c_str(), m->port);
_hostName = buf;
break;
case MgmId_File:
break;
default:
break;
}
if (_hostName != NULL)
break;
}
if (_hostName == NULL)
{
g_err << "No management servers configured in local config file" << endl;
return NDBT_ProgramExit(NDBT_FAILED);
}
}
if (_hostName == 0)
_hostName= opt_connect_str;
if (_no_contact) {
if (waitClusterStatus(_hostName, NDB_MGM_NODE_STATUS_NO_CONTACT, _timeout) != 0)
......@@ -210,13 +178,19 @@ waitClusterStatus(const char* _addr,
int _nodes[MAX_NDB_NODES];
int _num_nodes = 0;
handle = ndb_mgm_create_handle();
handle = ndb_mgm_create_handle();
if (handle == NULL){
g_err << "handle == NULL" << endl;
return -1;
}
g_info << "Connecting to mgmsrv at " << _addr << endl;
if (ndb_mgm_connect(handle, _addr) == -1) {
if (ndb_mgm_set_connectstring(handle, _addr))
{
MGMERR(handle);
g_err << "Connectstring " << _addr << " invalid" << endl;
return -1;
}
if (ndb_mgm_connect(handle,0,0,1)) {
MGMERR(handle);
g_err << "Connection to " << _addr << " failed" << endl;
return -1;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment