Commit 130cd50d authored by unknown's avatar unknown

Merge mysqldev@production.mysql.com:my/mysql-4.1-release

into poseidon.ndb.mysql.com:/home/tomas/mysql-4.1-release

parents 7dbae42b d4679992
...@@ -238,6 +238,8 @@ public: ...@@ -238,6 +238,8 @@ public:
}; };
Vector<Transporter_interface> m_transporter_interface; Vector<Transporter_interface> m_transporter_interface;
void add_transporter_interface(const char *interf, unsigned short port); void add_transporter_interface(const char *interf, unsigned short port);
struct in_addr get_connect_address(NodeId node_id) const;
protected: protected:
private: private:
......
...@@ -131,16 +131,14 @@ ConfigRetriever::getConfig() { ...@@ -131,16 +131,14 @@ ConfigRetriever::getConfig() {
} }
ndb_mgm_configuration * ndb_mgm_configuration *
ConfigRetriever::getConfig(NdbMgmHandle m_handle){ ConfigRetriever::getConfig(NdbMgmHandle m_handle)
{
ndb_mgm_configuration * conf = ndb_mgm_get_configuration(m_handle,m_version); ndb_mgm_configuration * conf = ndb_mgm_get_configuration(m_handle,m_version);
if(conf == 0){ if(conf == 0)
{
setError(CR_ERROR, ndb_mgm_get_latest_error_desc(m_handle)); setError(CR_ERROR, ndb_mgm_get_latest_error_desc(m_handle));
return 0; return 0;
} }
ndb_mgm_disconnect(m_handle);
return conf; return conf;
} }
......
...@@ -74,6 +74,7 @@ Transporter::Transporter(TransporterRegistry &t_reg, ...@@ -74,6 +74,7 @@ Transporter::Transporter(TransporterRegistry &t_reg,
m_connected = false; m_connected = false;
m_timeOutMillis = 1000; m_timeOutMillis = 1000;
m_connect_address.s_addr= 0;
if (isServer) if (isServer)
m_socket_client= 0; m_socket_client= 0;
else else
...@@ -98,6 +99,13 @@ Transporter::connect_server(NDB_SOCKET_TYPE sockfd) { ...@@ -98,6 +99,13 @@ Transporter::connect_server(NDB_SOCKET_TYPE sockfd) {
DBUG_RETURN(true); // TODO assert(0); DBUG_RETURN(true); // TODO assert(0);
} }
{
struct sockaddr addr;
SOCKET_SIZE_TYPE addrlen= sizeof(addr);
int r= getpeername(sockfd, &addr, &addrlen);
m_connect_address= ((struct sockaddr_in *)&addr)->sin_addr;
}
bool res = connect_server_impl(sockfd); bool res = connect_server_impl(sockfd);
if(res){ if(res){
m_connected = true; m_connected = true;
...@@ -164,6 +172,13 @@ Transporter::connect_client() { ...@@ -164,6 +172,13 @@ Transporter::connect_client() {
g_eventLogger.warning("Unable to verify transporter compatability with node %d", nodeId); g_eventLogger.warning("Unable to verify transporter compatability with node %d", nodeId);
} }
{
struct sockaddr addr;
SOCKET_SIZE_TYPE addrlen= sizeof(addr);
int r= getpeername(sockfd, &addr, &addrlen);
m_connect_address= ((struct sockaddr_in *)&addr)->sin_addr;
}
bool res = connect_client_impl(sockfd); bool res = connect_client_impl(sockfd);
if(res){ if(res){
m_connected = true; m_connected = true;
......
...@@ -122,6 +122,7 @@ protected: ...@@ -122,6 +122,7 @@ protected:
private: private:
SocketClient *m_socket_client; SocketClient *m_socket_client;
struct in_addr m_connect_address;
protected: protected:
Uint32 getErrorCount(); Uint32 getErrorCount();
......
...@@ -51,6 +51,12 @@ extern int g_ndb_shm_signum; ...@@ -51,6 +51,12 @@ extern int g_ndb_shm_signum;
#include <EventLogger.hpp> #include <EventLogger.hpp>
extern EventLogger g_eventLogger; extern EventLogger g_eventLogger;
struct in_addr
TransporterRegistry::get_connect_address(NodeId node_id) const
{
return theTransporters[node_id]->m_connect_address;
}
SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd) SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd)
{ {
DBUG_ENTER("SocketServer::Session * TransporterService::newSession"); DBUG_ENTER("SocketServer::Session * TransporterService::newSession");
......
...@@ -455,11 +455,13 @@ static int do_event_thread; ...@@ -455,11 +455,13 @@ static int do_event_thread;
static void* static void*
event_thread_run(void* m) event_thread_run(void* m)
{ {
DBUG_ENTER("event_thread_run");
NdbMgmHandle handle= *(NdbMgmHandle*)m; NdbMgmHandle handle= *(NdbMgmHandle*)m;
int filter[] = { 15, NDB_MGM_EVENT_CATEGORY_BACKUP, 0 }; int filter[] = { 15, NDB_MGM_EVENT_CATEGORY_BACKUP, 0 };
int fd = ndb_mgm_listen_event(handle, filter); int fd = ndb_mgm_listen_event(handle, filter);
if (fd > 0) if (fd != NDB_INVALID_SOCKET)
{ {
do_event_thread= 1; do_event_thread= 1;
char *tmp= 0; char *tmp= 0;
...@@ -468,20 +470,26 @@ event_thread_run(void* m) ...@@ -468,20 +470,26 @@ event_thread_run(void* m)
do { do {
if (tmp == 0) NdbSleep_MilliSleep(10); if (tmp == 0) NdbSleep_MilliSleep(10);
if((tmp = in.gets(buf, 1024))) if((tmp = in.gets(buf, 1024)))
{
const char ping_token[]= "<PING>";
if (memcmp(ping_token,tmp,sizeof(ping_token)-1))
ndbout << tmp; ndbout << tmp;
}
} while(do_event_thread); } while(do_event_thread);
NDB_CLOSE_SOCKET(fd);
} }
else else
{ {
do_event_thread= -1; do_event_thread= -1;
} }
return NULL; DBUG_RETURN(NULL);
} }
bool bool
CommandInterpreter::connect() CommandInterpreter::connect()
{ {
DBUG_ENTER("CommandInterpreter::connect");
if(!m_connected) if(!m_connected)
{ {
if(!ndb_mgm_connect(m_mgmsrv, try_reconnect-1, 5, 1)) if(!ndb_mgm_connect(m_mgmsrv, try_reconnect-1, 5, 1))
...@@ -512,8 +520,19 @@ CommandInterpreter::connect() ...@@ -512,8 +520,19 @@ CommandInterpreter::connect()
do_event_thread == 0 || do_event_thread == 0 ||
do_event_thread == -1) do_event_thread == -1)
{ {
printf("Warning, event thread startup failed, degraded printouts as result\n"); DBUG_PRINT("info",("Warning, event thread startup failed, "
"degraded printouts as result, errno=%d",
errno));
printf("Warning, event thread startup failed, "
"degraded printouts as result, errno=%d\n", errno);
do_event_thread= 0; do_event_thread= 0;
if (m_event_thread)
{
void *res;
NdbThread_WaitFor(m_event_thread, &res);
NdbThread_Destroy(&m_event_thread);
}
ndb_mgm_disconnect(m_mgmsrv2);
} }
} }
else else
...@@ -521,6 +540,8 @@ CommandInterpreter::connect() ...@@ -521,6 +540,8 @@ CommandInterpreter::connect()
printf("Warning, event connect failed, degraded printouts as result\n"); printf("Warning, event connect failed, degraded printouts as result\n");
} }
m_connected= true; m_connected= true;
DBUG_PRINT("info",("Connected to Management Server at: %s:%d",
host,port));
if (m_verbose) if (m_verbose)
{ {
printf("Connected to Management Server at: %s:%d\n", printf("Connected to Management Server at: %s:%d\n",
...@@ -528,12 +549,13 @@ CommandInterpreter::connect() ...@@ -528,12 +549,13 @@ CommandInterpreter::connect()
} }
} }
} }
return m_connected; DBUG_RETURN(m_connected);
} }
bool bool
CommandInterpreter::disconnect() CommandInterpreter::disconnect()
{ {
DBUG_ENTER("CommandInterpreter::disconnect");
if (m_event_thread) { if (m_event_thread) {
void *res; void *res;
do_event_thread= 0; do_event_thread= 0;
...@@ -550,7 +572,7 @@ CommandInterpreter::disconnect() ...@@ -550,7 +572,7 @@ CommandInterpreter::disconnect()
} }
m_connected= false; m_connected= false;
} }
return true; DBUG_RETURN(true);
} }
//***************************************************************************** //*****************************************************************************
......
...@@ -2124,6 +2124,24 @@ MgmtSrvr::getNodeType(NodeId nodeId) const ...@@ -2124,6 +2124,24 @@ MgmtSrvr::getNodeType(NodeId nodeId) const
return nodeTypes[nodeId]; return nodeTypes[nodeId];
} }
const char *MgmtSrvr::get_connect_address(Uint32 node_id)
{
if (m_connect_address[node_id].s_addr == 0 &&
theFacade && theFacade->theTransporterRegistry &&
theFacade->theClusterMgr &&
getNodeType(node_id) == NDB_MGM_NODE_TYPE_NDB)
{
const ClusterMgr::Node &node=
theFacade->theClusterMgr->getNodeInfo(node_id);
if (node.connected)
{
m_connect_address[node_id]=
theFacade->theTransporterRegistry->get_connect_address(node_id);
}
}
return inet_ntoa(m_connect_address[node_id]);
}
void void
MgmtSrvr::get_connected_nodes(NodeBitmask &connected_nodes) const MgmtSrvr::get_connected_nodes(NodeBitmask &connected_nodes) const
{ {
......
...@@ -60,6 +60,7 @@ public: ...@@ -60,6 +60,7 @@ public:
} }
void add_listener(const Event_listener&); void add_listener(const Event_listener&);
void check_listeners();
void update_max_log_level(const LogLevel&); void update_max_log_level(const LogLevel&);
void update_log_level(const LogLevel&); void update_log_level(const LogLevel&);
...@@ -508,7 +509,7 @@ public: ...@@ -508,7 +509,7 @@ public:
int setDbParameter(int node, int parameter, const char * value, BaseString&); int setDbParameter(int node, int parameter, const char * value, BaseString&);
const char *get_connect_address(Uint32 node_id) { return inet_ntoa(m_connect_address[node_id]); } const char *get_connect_address(Uint32 node_id);
void get_connected_nodes(NodeBitmask &connected_nodes) const; void get_connected_nodes(NodeBitmask &connected_nodes) const;
SocketServer *get_socket_server() { return m_socket_server; } SocketServer *get_socket_server() { return m_socket_server; }
......
...@@ -253,15 +253,19 @@ ParserRow<MgmApiSession> commands[] = { ...@@ -253,15 +253,19 @@ ParserRow<MgmApiSession> commands[] = {
}; };
MgmApiSession::MgmApiSession(class MgmtSrvr & mgm, NDB_SOCKET_TYPE sock) MgmApiSession::MgmApiSession(class MgmtSrvr & mgm, NDB_SOCKET_TYPE sock)
: SocketServer::Session(sock), m_mgmsrv(mgm) { : SocketServer::Session(sock), m_mgmsrv(mgm)
{
DBUG_ENTER("MgmApiSession::MgmApiSession");
m_input = new SocketInputStream(sock); m_input = new SocketInputStream(sock);
m_output = new SocketOutputStream(sock); m_output = new SocketOutputStream(sock);
m_parser = new Parser_t(commands, *m_input, true, true, true); m_parser = new Parser_t(commands, *m_input, true, true, true);
m_allocated_resources= new MgmtSrvr::Allocated_resources(m_mgmsrv); m_allocated_resources= new MgmtSrvr::Allocated_resources(m_mgmsrv);
DBUG_VOID_RETURN;
} }
MgmApiSession::~MgmApiSession() MgmApiSession::~MgmApiSession()
{ {
DBUG_ENTER("MgmApiSession::~MgmApiSession");
if (m_input) if (m_input)
delete m_input; delete m_input;
if (m_output) if (m_output)
...@@ -270,10 +274,19 @@ MgmApiSession::~MgmApiSession() ...@@ -270,10 +274,19 @@ MgmApiSession::~MgmApiSession()
delete m_parser; delete m_parser;
if (m_allocated_resources) if (m_allocated_resources)
delete m_allocated_resources; delete m_allocated_resources;
if(m_socket != NDB_INVALID_SOCKET)
{
NDB_CLOSE_SOCKET(m_socket);
m_socket= NDB_INVALID_SOCKET;
}
DBUG_VOID_RETURN;
} }
void void
MgmApiSession::runSession() { MgmApiSession::runSession()
{
DBUG_ENTER("MgmApiSession::runSession");
Parser_t::Context ctx; Parser_t::Context ctx;
while(!m_stop) { while(!m_stop) {
m_parser->run(ctx, *this); m_parser->run(ctx, *this);
...@@ -301,8 +314,13 @@ MgmApiSession::runSession() { ...@@ -301,8 +314,13 @@ MgmApiSession::runSession() {
break; break;
} }
} }
if(m_socket >= 0) if(m_socket != NDB_INVALID_SOCKET)
{
NDB_CLOSE_SOCKET(m_socket); NDB_CLOSE_SOCKET(m_socket);
m_socket= NDB_INVALID_SOCKET;
}
DBUG_VOID_RETURN;
} }
#ifdef MGM_GET_CONFIG_BACKWARDS_COMPAT #ifdef MGM_GET_CONFIG_BACKWARDS_COMPAT
...@@ -1236,7 +1254,7 @@ Ndb_mgmd_event_service::log(int eventType, const Uint32* theData, NodeId nodeId) ...@@ -1236,7 +1254,7 @@ Ndb_mgmd_event_service::log(int eventType, const Uint32* theData, NodeId nodeId)
Uint32 threshold; Uint32 threshold;
LogLevel::EventCategory cat; LogLevel::EventCategory cat;
Logger::LoggerLevel severity; Logger::LoggerLevel severity;
int i; int i, n;
DBUG_ENTER("Ndb_mgmd_event_service::log"); DBUG_ENTER("Ndb_mgmd_event_service::log");
DBUG_PRINT("enter",("eventType=%d, nodeid=%d", eventType, nodeId)); DBUG_PRINT("enter",("eventType=%d, nodeid=%d", eventType, nodeId));
...@@ -1248,28 +1266,30 @@ Ndb_mgmd_event_service::log(int eventType, const Uint32* theData, NodeId nodeId) ...@@ -1248,28 +1266,30 @@ Ndb_mgmd_event_service::log(int eventType, const Uint32* theData, NodeId nodeId)
Vector<NDB_SOCKET_TYPE> copy; Vector<NDB_SOCKET_TYPE> copy;
m_clients.lock(); m_clients.lock();
for(i = m_clients.size() - 1; i >= 0; i--){ for(i = m_clients.size() - 1; i >= 0; i--)
if(threshold <= m_clients[i].m_logLevel.getLogLevel(cat)){ {
if(m_clients[i].m_socket != NDB_INVALID_SOCKET && if(threshold <= m_clients[i].m_logLevel.getLogLevel(cat))
println_socket(m_clients[i].m_socket, {
MAX_WRITE_TIMEOUT, m_text) == -1){ int fd= m_clients[i].m_socket;
copy.push_back(m_clients[i].m_socket); if(fd != NDB_INVALID_SOCKET &&
println_socket(fd, MAX_WRITE_TIMEOUT, m_text) == -1)
{
copy.push_back(fd);
m_clients.erase(i, false); m_clients.erase(i, false);
} }
} }
} }
m_clients.unlock(); m_clients.unlock();
for(i = 0; (unsigned)i < copy.size(); i++){ if ((n= (int)copy.size()))
{
for(i= 0; i < n; i++)
NDB_CLOSE_SOCKET(copy[i]); NDB_CLOSE_SOCKET(copy[i]);
}
if(copy.size()){
LogLevel tmp; tmp.clear(); LogLevel tmp; tmp.clear();
m_clients.lock(); m_clients.lock();
for(i = 0; (unsigned)i < m_clients.size(); i++){ for(i= m_clients.size() - 1; i >= 0; i--)
tmp.set_max(m_clients[i].m_logLevel); tmp.set_max(m_clients[i].m_logLevel);
}
m_clients.unlock(); m_clients.unlock();
update_log_level(tmp); update_log_level(tmp);
} }
...@@ -1297,9 +1317,48 @@ Ndb_mgmd_event_service::update_log_level(const LogLevel &tmp) ...@@ -1297,9 +1317,48 @@ Ndb_mgmd_event_service::update_log_level(const LogLevel &tmp)
} }
void void
Ndb_mgmd_event_service::add_listener(const Event_listener& client){ Ndb_mgmd_event_service::check_listeners()
{
int i, n= 0;
DBUG_ENTER("Ndb_mgmd_event_service::check_listeners");
m_clients.lock();
for(i= m_clients.size() - 1; i >= 0; i--)
{
int fd= m_clients[i].m_socket;
DBUG_PRINT("info",("%d %d",i,fd));
char buf[1];
buf[0]=0;
if (fd != NDB_INVALID_SOCKET &&
println_socket(fd,MAX_WRITE_TIMEOUT,"<PING>") == -1)
{
NDB_CLOSE_SOCKET(fd);
m_clients.erase(i, false);
n=1;
}
}
if (n)
{
LogLevel tmp; tmp.clear();
for(i= m_clients.size() - 1; i >= 0; i--)
tmp.set_max(m_clients[i].m_logLevel);
update_log_level(tmp);
}
m_clients.unlock();
DBUG_VOID_RETURN;
}
void
Ndb_mgmd_event_service::add_listener(const Event_listener& client)
{
DBUG_ENTER("Ndb_mgmd_event_service::add_listener");
DBUG_PRINT("enter",("client.m_socket: %d", client.m_socket));
check_listeners();
m_clients.push_back(client); m_clients.push_back(client);
update_max_log_level(client.m_logLevel); update_max_log_level(client.m_logLevel);
DBUG_VOID_RETURN;
} }
void void
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment