Commit dcf21e94 authored by unknown's avatar unknown

BUG#21154 The management server consumes too much CPU

BUG#13987 Cluster: Loss of data nodes can cause high CPU usage from ndb_mgmd

fix the actual problem (getting incomplete line of data), introduced with previous
improvement.

also add list sessions, get session and get session id to mgmapi to allow
the implementation of a test case for this exact problem.


ndb/include/mgmapi/mgmapi.h:
  Add internal ndb_mgm_get_fd for use in test case
ndb/include/mgmapi/mgmapi_debug.h:
  Add internal get_session_id and get_session
ndb/include/util/InputStream.hpp:
  - fix warning when building with gcc 4
  - add mutex to be UNLOCKED when blocking (e.g. select(2))
    - this means we can list sessions in a threadsafe way
  - add this weird startover member to SocketInputStream
  	- this helps work out if we've read a newline yet and should start inserting
  	  into buffer from the start
ndb/include/util/Parser.hpp:
  add mutex to context to pass down to SocketInputStream
ndb/include/util/socket_io.h:
  readln_socket accepts mutex to UNLOCK around select(2)
ndb/src/common/util/InputStream.cpp:
  remove evil, add more.
  
  As
ndb/src/common/util/Parser.cpp:
  set mutex for passing down to InputStream to unlock on select(2).
  
  change way detecting of NoLine
ndb/src/common/util/socket_io.cpp:
  unlock a mutex around select so that we can nicely do thread safe
  listing of sessions.
  
  Always retrieve data from the OS so that we instantly get EOF on disconnect
  and don't end up spinning looking for a newline.
ndb/src/mgmapi/mgmapi.cpp:
  add internal ndb_mgm_get_fd() for internal testing
  
  internal/debug:
  ndb_mgm_get_session_id
  
  ndb_mgm_get_session
ndb/src/mgmsrv/Services.cpp:
  Add list sessions, get session id and get session.
  
  introduce a session mutex
ndb/src/mgmsrv/Services.hpp:
  Add list and get session.
  
  Add session_id to MgmApiSession.
ndb/test/ndbapi/testMgm.cpp:
  Add test for MgmApiSession disconnection (mgmd at 100%)
parent 81526834
...@@ -1071,6 +1071,19 @@ extern "C" { ...@@ -1071,6 +1071,19 @@ extern "C" {
*/ */
int ndb_mgm_end_session(NdbMgmHandle handle); int ndb_mgm_end_session(NdbMgmHandle handle);
/**
* ndb_mgm_get_fd
*
* get the file descriptor of the handle.
* INTERNAL ONLY.
* USE FOR TESTING. OTHER USES ARE NOT A GOOD IDEA.
*
* @param handle NDB management handle
* @return handle->socket
*
*/
int ndb_mgm_get_fd(NdbMgmHandle handle);
/** /**
* Get the node id of the mgm server we're connected to * Get the node id of the mgm server we're connected to
*/ */
......
...@@ -132,6 +132,20 @@ extern "C" { ...@@ -132,6 +132,20 @@ extern "C" {
const char * value, const char * value,
struct ndb_mgm_reply* reply); struct ndb_mgm_reply* reply);
Uint64 ndb_mgm_get_session_id(NdbMgmHandle handle);
struct NdbMgmSession {
Uint64 id;
Uint32 m_stopSelf;
Uint32 m_stop;
Uint32 nodeid;
Uint32 parser_buffer_len;
Uint32 parser_status;
};
int ndb_mgm_get_session(NdbMgmHandle handle, Uint64 id,
struct NdbMgmSession *s, int *len);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -19,13 +19,22 @@ ...@@ -19,13 +19,22 @@
#include <ndb_global.h> #include <ndb_global.h>
#include <NdbTCP.h> #include <NdbTCP.h>
#include <NdbMutex.h>
/** /**
* Input stream * Input stream
*/ */
class InputStream { class InputStream {
public: public:
InputStream() { m_mutex= NULL; };
virtual ~InputStream() {};
virtual char* gets(char * buf, int bufLen) = 0; virtual char* gets(char * buf, int bufLen) = 0;
/**
* Set the mutex to be UNLOCKED when blocking (e.g. select(2))
*/
void set_mutex(NdbMutex *m) { m_mutex= m; };
protected:
NdbMutex *m_mutex;
}; };
class FileInputStream : public InputStream { class FileInputStream : public InputStream {
...@@ -40,6 +49,7 @@ extern FileInputStream Stdin; ...@@ -40,6 +49,7 @@ extern FileInputStream Stdin;
class SocketInputStream : public InputStream { class SocketInputStream : public InputStream {
NDB_SOCKET_TYPE m_socket; NDB_SOCKET_TYPE m_socket;
unsigned m_timeout; unsigned m_timeout;
bool m_startover;
public: public:
SocketInputStream(NDB_SOCKET_TYPE socket, unsigned readTimeout = 1000); SocketInputStream(NDB_SOCKET_TYPE socket, unsigned readTimeout = 1000);
char* gets(char * buf, int bufLen); char* gets(char * buf, int bufLen);
......
...@@ -61,12 +61,15 @@ public: ...@@ -61,12 +61,15 @@ public:
/** /**
* Context for parse * Context for parse
*/ */
struct Context { class Context {
public:
Context() { m_mutex= NULL; };
ParserStatus m_status; ParserStatus m_status;
const ParserRow<T> * m_currentCmd; const ParserRow<T> * m_currentCmd;
const ParserRow<T> * m_currentArg; const ParserRow<T> * m_currentArg;
char * m_currentToken; char * m_currentToken;
char m_tokenBuffer[512]; char m_tokenBuffer[512];
NdbMutex *m_mutex;
Vector<const ParserRow<T> *> m_aliasUsed; Vector<const ParserRow<T> *> m_aliasUsed;
}; };
......
...@@ -21,12 +21,17 @@ ...@@ -21,12 +21,17 @@
#include <NdbTCP.h> #include <NdbTCP.h>
#include <NdbMutex.h>
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
int read_socket(NDB_SOCKET_TYPE, int timeout_ms, char *, int len); int read_socket(NDB_SOCKET_TYPE, int timeout_ms, char *, int len);
int readln_socket(NDB_SOCKET_TYPE, int timeout_ms, char *, int len);
int readln_socket(NDB_SOCKET_TYPE socket, int timeout_millis,
char * buf, int buflen, NdbMutex *mutex);
int write_socket(NDB_SOCKET_TYPE, int timeout_ms, const char[], int len); int write_socket(NDB_SOCKET_TYPE, int timeout_ms, const char[], int len);
int print_socket(NDB_SOCKET_TYPE, int timeout_ms, const char *, ...); int print_socket(NDB_SOCKET_TYPE, int timeout_ms, const char *, ...);
......
...@@ -36,26 +36,35 @@ FileInputStream::gets(char * buf, int bufLen){ ...@@ -36,26 +36,35 @@ FileInputStream::gets(char * buf, int bufLen){
SocketInputStream::SocketInputStream(NDB_SOCKET_TYPE socket, SocketInputStream::SocketInputStream(NDB_SOCKET_TYPE socket,
unsigned readTimeout) unsigned readTimeout)
: m_socket(socket) { : m_socket(socket) {
m_timeout = readTimeout; m_startover= true;
m_timeout = readTimeout;
} }
char* char*
SocketInputStream::gets(char * buf, int bufLen) { SocketInputStream::gets(char * buf, int bufLen) {
buf[0] = 77;
assert(bufLen >= 2); assert(bufLen >= 2);
int res = readln_socket(m_socket, m_timeout, buf, bufLen - 1); int offset= 0;
if(m_startover)
{
buf[0]= '\0';
m_startover= false;
}
else
offset= strlen(buf);
int res = readln_socket(m_socket, m_timeout, buf+offset, bufLen-offset, m_mutex);
if(res == 0)
{
buf[0]=0;
return buf;
}
m_startover= true;
if(res == -1) if(res == -1)
return 0; return 0;
if(res == 0 && buf[0] == 77){ // select return 0
buf[0] = 0;
} else if(res == 0 && buf[0] == 0){ // only newline
buf[0] = '\n';
buf[1] = 0;
} else {
int len = strlen(buf);
buf[len + 1] = '\0';
buf[len] = '\n';
}
return buf; return buf;
} }
...@@ -32,6 +32,7 @@ class ParseInputStream : public InputStream { ...@@ -32,6 +32,7 @@ class ParseInputStream : public InputStream {
char* gets(char * buf, int bufLen); char* gets(char * buf, int bufLen);
void push_back(const char *); void push_back(const char *);
void set_mutex(NdbMutex *m) { in.set_mutex(m); };
private: private:
InputStream & in; InputStream & in;
char * buffer; char * buffer;
...@@ -144,25 +145,32 @@ ParserImpl::run(Context * ctx, const class Properties ** pDst, ...@@ -144,25 +145,32 @@ ParserImpl::run(Context * ctx, const class Properties ** pDst,
{ {
DBUG_ENTER("ParserImpl::run"); DBUG_ENTER("ParserImpl::run");
input.set_mutex(ctx->m_mutex);
* pDst = 0; * pDst = 0;
bool ownStop = false; bool ownStop = false;
if(stop == 0) if(stop == 0)
stop = &ownStop; stop = &ownStop;
ctx->m_aliasUsed.clear(); ctx->m_aliasUsed.clear();
const unsigned sz = sizeof(ctx->m_tokenBuffer); const unsigned sz = sizeof(ctx->m_tokenBuffer);
ctx->m_currentToken = input.gets(ctx->m_tokenBuffer, sz); ctx->m_currentToken = input.gets(ctx->m_tokenBuffer, sz);
if(Eof(ctx->m_currentToken)){ if(Eof(ctx->m_currentToken)){
ctx->m_status = Parser<Dummy>::Eof; ctx->m_status = Parser<Dummy>::Eof;
DBUG_RETURN(false); DBUG_RETURN(false);
} }
if(ctx->m_currentToken[0] == 0){ int last= strlen(ctx->m_currentToken);
if(last>0)
last--;
if(ctx->m_currentToken[last] !='\n'){
ctx->m_status = Parser<Dummy>::NoLine; ctx->m_status = Parser<Dummy>::NoLine;
ctx->m_tokenBuffer[0]= '\0';
DBUG_RETURN(false); DBUG_RETURN(false);
} }
if(Empty(ctx->m_currentToken)){ if(Empty(ctx->m_currentToken)){
ctx->m_status = Parser<Dummy>::EmptyLine; ctx->m_status = Parser<Dummy>::EmptyLine;
DBUG_RETURN(false); DBUG_RETURN(false);
...@@ -174,14 +182,14 @@ ParserImpl::run(Context * ctx, const class Properties ** pDst, ...@@ -174,14 +182,14 @@ ParserImpl::run(Context * ctx, const class Properties ** pDst,
ctx->m_status = Parser<Dummy>::UnknownCommand; ctx->m_status = Parser<Dummy>::UnknownCommand;
DBUG_RETURN(false); DBUG_RETURN(false);
} }
Properties * p = new Properties(); Properties * p = new Properties();
bool invalidArgument = false; bool invalidArgument = false;
ctx->m_currentToken = input.gets(ctx->m_tokenBuffer, sz); ctx->m_currentToken = input.gets(ctx->m_tokenBuffer, sz);
while((! * stop) && while((! * stop) &&
!Eof(ctx->m_currentToken) && !Eof(ctx->m_currentToken) &&
!Empty(ctx->m_currentToken)){ !Empty(ctx->m_currentToken)){
if(ctx->m_currentToken[0] != 0){ if(ctx->m_currentToken[0] != 0){
trim(ctx->m_currentToken); trim(ctx->m_currentToken);
...@@ -193,7 +201,7 @@ ParserImpl::run(Context * ctx, const class Properties ** pDst, ...@@ -193,7 +201,7 @@ ParserImpl::run(Context * ctx, const class Properties ** pDst,
} }
ctx->m_currentToken = input.gets(ctx->m_tokenBuffer, sz); ctx->m_currentToken = input.gets(ctx->m_tokenBuffer, sz);
} }
if(invalidArgument){ if(invalidArgument){
char buf[sz]; char buf[sz];
char * tmp; char * tmp;
...@@ -204,13 +212,13 @@ ParserImpl::run(Context * ctx, const class Properties ** pDst, ...@@ -204,13 +212,13 @@ ParserImpl::run(Context * ctx, const class Properties ** pDst,
} }
DBUG_RETURN(false); DBUG_RETURN(false);
} }
if(* stop){ if(* stop){
delete p; delete p;
ctx->m_status = Parser<Dummy>::ExternalStop; ctx->m_status = Parser<Dummy>::ExternalStop;
DBUG_RETURN(false); DBUG_RETURN(false);
} }
if(!checkMandatory(ctx, p)){ if(!checkMandatory(ctx, p)){
ctx->m_status = Parser<Dummy>::MissingMandatoryArgument; ctx->m_status = Parser<Dummy>::MissingMandatoryArgument;
delete p; delete p;
...@@ -226,9 +234,9 @@ ParserImpl::run(Context * ctx, const class Properties ** pDst, ...@@ -226,9 +234,9 @@ ParserImpl::run(Context * ctx, const class Properties ** pDst,
tmp.put("name", alias->name); tmp.put("name", alias->name);
tmp.put("realName", alias->realName); tmp.put("realName", alias->realName);
p->put("$ALIAS", i, &tmp); p->put("$ALIAS", i, &tmp);
} }
p->put("$ALIAS", ctx->m_aliasUsed.size()); p->put("$ALIAS", ctx->m_aliasUsed.size());
ctx->m_status = Parser<Dummy>::Ok; ctx->m_status = Parser<Dummy>::Ok;
* pDst = p; * pDst = p;
DBUG_RETURN(true); DBUG_RETURN(true);
......
...@@ -49,7 +49,7 @@ read_socket(NDB_SOCKET_TYPE socket, int timeout_millis, ...@@ -49,7 +49,7 @@ read_socket(NDB_SOCKET_TYPE socket, int timeout_millis,
extern "C" extern "C"
int int
readln_socket(NDB_SOCKET_TYPE socket, int timeout_millis, readln_socket(NDB_SOCKET_TYPE socket, int timeout_millis,
char * buf, int buflen){ char * buf, int buflen, NdbMutex *mutex){
if(buflen <= 1) if(buflen <= 1)
return 0; return 0;
...@@ -65,7 +65,12 @@ readln_socket(NDB_SOCKET_TYPE socket, int timeout_millis, ...@@ -65,7 +65,12 @@ readln_socket(NDB_SOCKET_TYPE socket, int timeout_millis,
timeout.tv_sec = (timeout_millis / 1000); timeout.tv_sec = (timeout_millis / 1000);
timeout.tv_usec = (timeout_millis % 1000) * 1000; timeout.tv_usec = (timeout_millis % 1000) * 1000;
if(mutex)
NdbMutex_Unlock(mutex);
const int selectRes = select(socket + 1, &readset, 0, 0, &timeout); const int selectRes = select(socket + 1, &readset, 0, 0, &timeout);
if(mutex)
NdbMutex_Lock(mutex);
if(selectRes == 0){ if(selectRes == 0){
return 0; return 0;
} }
...@@ -75,7 +80,6 @@ readln_socket(NDB_SOCKET_TYPE socket, int timeout_millis, ...@@ -75,7 +80,6 @@ readln_socket(NDB_SOCKET_TYPE socket, int timeout_millis,
return -1; return -1;
} }
buf[0] = 0;
const int t = recv(socket, buf, buflen, MSG_PEEK); const int t = recv(socket, buf, buflen, MSG_PEEK);
if(t < 1) if(t < 1)
...@@ -87,27 +91,28 @@ readln_socket(NDB_SOCKET_TYPE socket, int timeout_millis, ...@@ -87,27 +91,28 @@ readln_socket(NDB_SOCKET_TYPE socket, int timeout_millis,
for(int i=0; i< t;i++) for(int i=0; i< t;i++)
{ {
if(buf[i] == '\n'){ if(buf[i] == '\n'){
recv(socket, buf, i+1, 0); int r= recv(socket, buf, i+1, 0);
buf[i] = 0; buf[i+1]= 0;
if(r < 1) {
fcntl(socket, F_SETFL, sock_flags);
return -1;
}
if(i > 0 && buf[i-1] == '\r'){ if(i > 0 && buf[i-1] == '\r'){
i--; buf[i-1] = '\n';
buf[i] = 0; buf[i]= '\0';
} }
fcntl(socket, F_SETFL, sock_flags); fcntl(socket, F_SETFL, sock_flags);
return t; return r;
} }
} }
if(t == (buflen - 1)){ int r= recv(socket, buf, t, 0);
recv(socket, buf, t, 0); if(r>=0)
buf[t] = 0; buf[r] = 0;
fcntl(socket, F_SETFL, sock_flags); fcntl(socket, F_SETFL, sock_flags);
return buflen; return r;
}
return 0;
} }
extern "C" extern "C"
......
...@@ -502,6 +502,18 @@ ndb_mgm_connect(NdbMgmHandle handle, int no_retries, ...@@ -502,6 +502,18 @@ ndb_mgm_connect(NdbMgmHandle handle, int no_retries,
DBUG_RETURN(0); DBUG_RETURN(0);
} }
/**
* Only used for low level testing
* Never to be used by end user.
* Or anybody who doesn't know exactly what they're doing.
*/
extern "C"
int
ndb_mgm_get_fd(NdbMgmHandle handle)
{
return handle->socket;
}
/** /**
* Disconnect from a mgm server * Disconnect from a mgm server
*/ */
...@@ -692,22 +704,16 @@ ndb_mgm_get_status(NdbMgmHandle handle) ...@@ -692,22 +704,16 @@ ndb_mgm_get_status(NdbMgmHandle handle)
SET_ERROR(handle, NDB_MGM_ILLEGAL_SERVER_REPLY, "Probably disconnected"); SET_ERROR(handle, NDB_MGM_ILLEGAL_SERVER_REPLY, "Probably disconnected");
return NULL; return NULL;
} }
if(buf[strlen(buf)-1] == '\n') if(strcmp("node status\n", buf) != 0) {
buf[strlen(buf)-1] = '\0';
if(strcmp("node status", buf) != 0) {
SET_ERROR(handle, NDB_MGM_ILLEGAL_NODE_STATUS, buf); SET_ERROR(handle, NDB_MGM_ILLEGAL_NODE_STATUS, buf);
return NULL; return NULL;
} }
if(!in.gets(buf, sizeof(buf))) if(!in.gets(buf, sizeof(buf)))
{ {
SET_ERROR(handle, NDB_MGM_ILLEGAL_SERVER_REPLY, "Probably disconnected"); SET_ERROR(handle, NDB_MGM_ILLEGAL_SERVER_REPLY, "Probably disconnected");
return NULL; return NULL;
} }
if(buf[strlen(buf)-1] == '\n')
buf[strlen(buf)-1] = '\0';
BaseString tmp(buf); BaseString tmp(buf);
Vector<BaseString> split; Vector<BaseString> split;
tmp.split(split, ":"); tmp.split(split, ":");
...@@ -715,7 +721,7 @@ ndb_mgm_get_status(NdbMgmHandle handle) ...@@ -715,7 +721,7 @@ ndb_mgm_get_status(NdbMgmHandle handle)
SET_ERROR(handle, NDB_MGM_ILLEGAL_NODE_STATUS, buf); SET_ERROR(handle, NDB_MGM_ILLEGAL_NODE_STATUS, buf);
return NULL; return NULL;
} }
if(!(split[0].trim() == "nodes")){ if(!(split[0].trim() == "nodes")){
SET_ERROR(handle, NDB_MGM_ILLEGAL_NODE_STATUS, buf); SET_ERROR(handle, NDB_MGM_ILLEGAL_NODE_STATUS, buf);
return NULL; return NULL;
...@@ -2280,7 +2286,6 @@ ndb_mgm_check_connection(NdbMgmHandle handle){ ...@@ -2280,7 +2286,6 @@ ndb_mgm_check_connection(NdbMgmHandle handle){
SocketOutputStream out(handle->socket); SocketOutputStream out(handle->socket);
SocketInputStream in(handle->socket, handle->read_timeout); SocketInputStream in(handle->socket, handle->read_timeout);
char buf[32]; char buf[32];
if (out.println("check connection")) if (out.println("check connection"))
goto ndb_mgm_check_connection_error; goto ndb_mgm_check_connection_error;
...@@ -2490,7 +2495,6 @@ int ndb_mgm_end_session(NdbMgmHandle handle) ...@@ -2490,7 +2495,6 @@ int ndb_mgm_end_session(NdbMgmHandle handle)
SocketInputStream in(handle->socket, handle->read_timeout); SocketInputStream in(handle->socket, handle->read_timeout);
char buf[32]; char buf[32];
in.gets(buf, sizeof(buf)); in.gets(buf, sizeof(buf));
DBUG_RETURN(0); DBUG_RETURN(0);
...@@ -2548,4 +2552,104 @@ int ndb_mgm_get_version(NdbMgmHandle handle, ...@@ -2548,4 +2552,104 @@ int ndb_mgm_get_version(NdbMgmHandle handle,
DBUG_RETURN(1); DBUG_RETURN(1);
} }
extern "C"
Uint64
ndb_mgm_get_session_id(NdbMgmHandle handle)
{
Uint64 session_id=0;
DBUG_ENTER("ndb_mgm_get_session_id");
CHECK_HANDLE(handle, 0);
CHECK_CONNECTED(handle, 0);
Properties args;
const ParserRow<ParserDummy> reply[]= {
MGM_CMD("get session id reply", NULL, ""),
MGM_ARG("id", Int, Mandatory, "Node ID"),
MGM_END()
};
const Properties *prop;
prop = ndb_mgm_call(handle, reply, "get session id", &args);
CHECK_REPLY(prop, 0);
if(!prop->get("id",&session_id)){
fprintf(handle->errstream, "Unable to get session id\n");
return 0;
}
delete prop;
DBUG_RETURN(session_id);
}
extern "C"
int
ndb_mgm_get_session(NdbMgmHandle handle, Uint64 id,
struct NdbMgmSession *s, int *len)
{
int retval= 0;
DBUG_ENTER("ndb_mgm_get_session");
CHECK_HANDLE(handle, 0);
CHECK_CONNECTED(handle, 0);
Properties args;
args.put("id", id);
const ParserRow<ParserDummy> reply[]= {
MGM_CMD("get session reply", NULL, ""),
MGM_ARG("id", Int, Mandatory, "Node ID"),
MGM_ARG("m_stopSelf", Int, Optional, "m_stopSelf"),
MGM_ARG("m_stop", Int, Optional, "stop session"),
MGM_ARG("nodeid", Int, Optional, "allocated node id"),
MGM_ARG("parser_buffer_len", Int, Optional, "waiting in buffer"),
MGM_ARG("parser_status", Int, Optional, "parser status"),
MGM_END()
};
const Properties *prop;
prop = ndb_mgm_call(handle, reply, "get session", &args);
CHECK_REPLY(prop, 0);
Uint64 r_id;
int rlen= 0;
if(!prop->get("id",&r_id)){
fprintf(handle->errstream, "Unable to get session id\n");
goto err;
}
s->id= r_id;
rlen+=sizeof(s->id);
if(prop->get("m_stopSelf",&(s->m_stopSelf)))
rlen+=sizeof(s->m_stopSelf);
else
goto err;
if(prop->get("m_stop",&(s->m_stop)))
rlen+=sizeof(s->m_stop);
else
goto err;
if(prop->get("nodeid",&(s->nodeid)))
rlen+=sizeof(s->nodeid);
else
goto err;
if(prop->get("parser_buffer_len",&(s->parser_buffer_len)))
{
rlen+=sizeof(s->parser_buffer_len);
if(prop->get("parser_status",&(s->parser_status)))
rlen+=sizeof(s->parser_status);
}
*len= rlen;
retval= 1;
err:
delete prop;
DBUG_RETURN(retval);
}
template class Vector<const ParserRow<ParserDummy>*>; template class Vector<const ParserRow<ParserDummy>*>;
...@@ -270,6 +270,13 @@ ParserRow<MgmApiSession> commands[] = { ...@@ -270,6 +270,13 @@ ParserRow<MgmApiSession> commands[] = {
MGM_ARG("length", Int, Mandatory, "Length"), MGM_ARG("length", Int, Mandatory, "Length"),
MGM_ARG("data", String, Mandatory, "Data"), MGM_ARG("data", String, Mandatory, "Data"),
MGM_CMD("list sessions", &MgmApiSession::listSessions, ""),
MGM_CMD("get session id", &MgmApiSession::getSessionId, ""),
MGM_CMD("get session", &MgmApiSession::getSession, ""),
MGM_ARG("id", Int, Mandatory, "SessionID"),
MGM_END() MGM_END()
}; };
...@@ -282,7 +289,7 @@ struct PurgeStruct ...@@ -282,7 +289,7 @@ struct PurgeStruct
NDB_TICKS tick; NDB_TICKS tick;
}; };
MgmApiSession::MgmApiSession(class MgmtSrvr & mgm, NDB_SOCKET_TYPE sock) MgmApiSession::MgmApiSession(class MgmtSrvr & mgm, NDB_SOCKET_TYPE sock, Uint64 session_id)
: SocketServer::Session(sock), m_mgmsrv(mgm) : SocketServer::Session(sock), m_mgmsrv(mgm)
{ {
DBUG_ENTER("MgmApiSession::MgmApiSession"); DBUG_ENTER("MgmApiSession::MgmApiSession");
...@@ -291,6 +298,9 @@ MgmApiSession::MgmApiSession(class MgmtSrvr & mgm, NDB_SOCKET_TYPE sock) ...@@ -291,6 +298,9 @@ MgmApiSession::MgmApiSession(class MgmtSrvr & mgm, NDB_SOCKET_TYPE sock)
m_parser = new Parser_t(commands, *m_input, true, true, true); m_parser = new Parser_t(commands, *m_input, true, true, true);
m_allocated_resources= new MgmtSrvr::Allocated_resources(m_mgmsrv); m_allocated_resources= new MgmtSrvr::Allocated_resources(m_mgmsrv);
m_stopSelf= 0; m_stopSelf= 0;
m_ctx= NULL;
m_session_id= session_id;
m_mutex= NdbMutex_Create();
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -314,6 +324,7 @@ MgmApiSession::~MgmApiSession() ...@@ -314,6 +324,7 @@ MgmApiSession::~MgmApiSession()
g_RestartServer= true; g_RestartServer= true;
if(m_stopSelf) if(m_stopSelf)
g_StopServer= true; g_StopServer= true;
NdbMutex_Destroy(m_mutex);
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -323,11 +334,19 @@ MgmApiSession::runSession() ...@@ -323,11 +334,19 @@ MgmApiSession::runSession()
DBUG_ENTER("MgmApiSession::runSession"); DBUG_ENTER("MgmApiSession::runSession");
Parser_t::Context ctx; Parser_t::Context ctx;
while(!m_stop) { ctx.m_mutex= m_mutex;
m_ctx= &ctx;
bool stop= false;
while(!stop) {
NdbMutex_Lock(m_mutex);
m_parser->run(ctx, *this); m_parser->run(ctx, *this);
if(ctx.m_currentToken == 0) if(ctx.m_currentToken == 0)
{
NdbMutex_Unlock(m_mutex);
break; break;
}
switch(ctx.m_status) { switch(ctx.m_status) {
case Parser_t::UnknownCommand: case Parser_t::UnknownCommand:
...@@ -348,13 +367,19 @@ MgmApiSession::runSession() ...@@ -348,13 +367,19 @@ MgmApiSession::runSession()
default: default:
break; break;
} }
}
stop= m_stop;
NdbMutex_Unlock(m_mutex);
};
NdbMutex_Lock(m_mutex);
m_ctx= NULL;
if(m_socket != NDB_INVALID_SOCKET) if(m_socket != NDB_INVALID_SOCKET)
{ {
NDB_CLOSE_SOCKET(m_socket); NDB_CLOSE_SOCKET(m_socket);
m_socket= NDB_INVALID_SOCKET; m_socket= NDB_INVALID_SOCKET;
} }
NdbMutex_Unlock(m_mutex);
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -1568,11 +1593,6 @@ MgmApiSession::listen_event(Parser<MgmApiSession>::Context & ctx, ...@@ -1568,11 +1593,6 @@ MgmApiSession::listen_event(Parser<MgmApiSession>::Context & ctx,
result = -1; result = -1;
goto done; goto done;
} }
m_mgmsrv.m_event_listner.add_listener(le);
m_stop = true;
m_socket = NDB_INVALID_SOCKET;
done: done:
m_output->println("listen event"); m_output->println("listen event");
...@@ -1580,6 +1600,13 @@ MgmApiSession::listen_event(Parser<MgmApiSession>::Context & ctx, ...@@ -1580,6 +1600,13 @@ MgmApiSession::listen_event(Parser<MgmApiSession>::Context & ctx,
if(result != 0) if(result != 0)
m_output->println("msg: %s", msg.c_str()); m_output->println("msg: %s", msg.c_str());
m_output->println(""); m_output->println("");
if(result==0)
{
m_mgmsrv.m_event_listner.add_listener(le);
m_stop = true;
m_socket = NDB_INVALID_SOCKET;
}
} }
void void
...@@ -1682,5 +1709,122 @@ MgmApiSession::report_event(Parser_t::Context &ctx, ...@@ -1682,5 +1709,122 @@ MgmApiSession::report_event(Parser_t::Context &ctx,
m_output->println(""); m_output->println("");
} }
void
MgmApiSession::list_session(SocketServer::Session *_s, void *data)
{
MgmApiSession *s= (MgmApiSession *)_s;
MgmApiSession *lister= (MgmApiSession*) data;
if(s!=lister)
NdbMutex_Lock(s->m_mutex);
Uint64 id= s->m_session_id;
lister->m_output->println("session: %llu",id);
lister->m_output->println("session.%llu.m_stopSelf: %d",id,s->m_stopSelf);
lister->m_output->println("session.%llu.m_stop: %d",id,s->m_stop);
lister->m_output->println("session.%llu.allocated.nodeid: %d",id,s->m_allocated_resources->get_nodeid());
if(s->m_ctx)
{
int l= strlen(s->m_ctx->m_tokenBuffer);
char *buf= (char*) malloc(2*l+1);
char *b= buf;
for(int i=0; i<l;i++)
if(s->m_ctx->m_tokenBuffer[i]=='\n')
{
*b++='\\';
*b++='n';
}
else
{
*b++= s->m_ctx->m_tokenBuffer[i];
}
*b= '\0';
lister->m_output->println("session.%llu.parser.buffer.len: %u",id,l);
lister->m_output->println("session.%llu.parser.buffer: %s",id,buf);
lister->m_output->println("session.%llu.parser.status: %d",id,s->m_ctx->m_status);
free(buf);
}
if(s!=lister)
NdbMutex_Unlock(s->m_mutex);
}
void
MgmApiSession::listSessions(Parser_t::Context &ctx,
Properties const &args) {
m_mgmsrv.get_socket_server()->foreachSession(list_session,(void*)this);
m_output->println("");
}
void
MgmApiSession::getSessionId(Parser_t::Context &ctx,
Properties const &args) {
m_output->println("get session id reply");
m_output->println("id: %llu",m_session_id);
m_output->println("");
}
struct get_session_param {
MgmApiSession *l;
Uint64 id;
int found;
};
void
MgmApiSession::get_session(SocketServer::Session *_s, void *data)
{
struct get_session_param *p= (struct get_session_param*)data;
MgmApiSession *s= (MgmApiSession *)_s;
if(s!=p->l)
NdbMutex_Lock(s->m_mutex);
if(p->id != s->m_session_id)
{
if(s!=p->l)
NdbMutex_Unlock(s->m_mutex);
return;
}
p->found= true;
p->l->m_output->println("id: %llu",s->m_session_id);
p->l->m_output->println("m_stopSelf: %d",s->m_stopSelf);
p->l->m_output->println("m_stop: %d",s->m_stop);
p->l->m_output->println("nodeid: %d",s->m_allocated_resources->get_nodeid());
if(s->m_ctx)
{
int l= strlen(s->m_ctx->m_tokenBuffer);
p->l->m_output->println("parser_buffer_len: %u",l);
p->l->m_output->println("parser_status: %d",s->m_ctx->m_status);
}
if(s!=p->l)
NdbMutex_Unlock(s->m_mutex);
}
void
MgmApiSession::getSession(Parser_t::Context &ctx,
Properties const &args) {
Uint64 id;
struct get_session_param p;
args.get("id", &id);
p.l= this;
p.id= id;
p.found= false;
m_output->println("get session reply");
m_mgmsrv.get_socket_server()->foreachSession(get_session,(void*)&p);
if(p.found==false)
m_output->println("id: 0");
m_output->println("");
}
template class MutexVector<int>; template class MutexVector<int>;
template class Vector<ParserRow<MgmApiSession> const*>; template class Vector<ParserRow<MgmApiSession> const*>;
...@@ -32,6 +32,8 @@ class MgmApiSession : public SocketServer::Session ...@@ -32,6 +32,8 @@ class MgmApiSession : public SocketServer::Session
{ {
static void stop_session_if_timed_out(SocketServer::Session *_s, void *data); static void stop_session_if_timed_out(SocketServer::Session *_s, void *data);
static void stop_session_if_not_connected(SocketServer::Session *_s, void *data); static void stop_session_if_not_connected(SocketServer::Session *_s, void *data);
static void list_session(SocketServer::Session *_s, void *data);
static void get_session(SocketServer::Session *_s, void *data);
private: private:
typedef Parser<MgmApiSession> Parser_t; typedef Parser<MgmApiSession> Parser_t;
...@@ -42,6 +44,11 @@ private: ...@@ -42,6 +44,11 @@ private:
MgmtSrvr::Allocated_resources *m_allocated_resources; MgmtSrvr::Allocated_resources *m_allocated_resources;
char m_err_str[1024]; char m_err_str[1024];
int m_stopSelf; // -1 is restart, 0 do nothing, 1 stop int m_stopSelf; // -1 is restart, 0 do nothing, 1 stop
NdbMutex *m_mutex;
// for listing sessions and other fun:
Parser_t::Context *m_ctx;
Uint64 m_session_id;
void getConfig_common(Parser_t::Context &ctx, void getConfig_common(Parser_t::Context &ctx,
const class Properties &args, const class Properties &args,
...@@ -50,7 +57,7 @@ private: ...@@ -50,7 +57,7 @@ private:
{ return m_mgmsrv.getErrorText(err_no, m_err_str, sizeof(m_err_str)); } { return m_mgmsrv.getErrorText(err_no, m_err_str, sizeof(m_err_str)); }
public: public:
MgmApiSession(class MgmtSrvr & mgm, NDB_SOCKET_TYPE sock); MgmApiSession(class MgmtSrvr & mgm, NDB_SOCKET_TYPE sock, Uint64 session_id);
virtual ~MgmApiSession(); virtual ~MgmApiSession();
void runSession(); void runSession();
...@@ -107,13 +114,20 @@ public: ...@@ -107,13 +114,20 @@ public:
void get_mgmd_nodeid(Parser_t::Context &ctx, Properties const &args); void get_mgmd_nodeid(Parser_t::Context &ctx, Properties const &args);
void report_event(Parser_t::Context &ctx, Properties const &args); void report_event(Parser_t::Context &ctx, Properties const &args);
void listSessions(Parser_t::Context &ctx, Properties const &args);
void getSessionId(Parser_t::Context &ctx, Properties const &args);
void getSession(Parser_t::Context &ctx, Properties const &args);
}; };
class MgmApiService : public SocketServer::Service { class MgmApiService : public SocketServer::Service {
class MgmtSrvr * m_mgmsrv; class MgmtSrvr * m_mgmsrv;
Uint64 m_next_session_id; // Protected by m_sessions mutex it SocketServer
public: public:
MgmApiService(){ MgmApiService(){
m_mgmsrv = 0; m_mgmsrv = 0;
m_next_session_id= 1;
} }
void setMgm(class MgmtSrvr * mgmsrv){ void setMgm(class MgmtSrvr * mgmsrv){
...@@ -121,7 +135,7 @@ public: ...@@ -121,7 +135,7 @@ public:
} }
SocketServer::Session * newSession(NDB_SOCKET_TYPE socket){ SocketServer::Session * newSession(NDB_SOCKET_TYPE socket){
return new MgmApiSession(* m_mgmsrv, socket); return new MgmApiSession(* m_mgmsrv, socket, m_next_session_id++);
} }
}; };
......
...@@ -21,6 +21,8 @@ ...@@ -21,6 +21,8 @@
#include <NdbRestarter.hpp> #include <NdbRestarter.hpp>
#include <Vector.hpp> #include <Vector.hpp>
#include <random.h> #include <random.h>
#include <mgmapi.h>
#include <mgmapi_debug.h>
int runLoadTable(NDBT_Context* ctx, NDBT_Step* step){ int runLoadTable(NDBT_Context* ctx, NDBT_Step* step){
...@@ -167,6 +169,44 @@ int runTestSingleUserMode(NDBT_Context* ctx, NDBT_Step* step){ ...@@ -167,6 +169,44 @@ int runTestSingleUserMode(NDBT_Context* ctx, NDBT_Step* step){
return result; return result;
} }
int runTestApiSession(NDBT_Context* ctx, NDBT_Step* step)
{
char *mgm= ctx->getRemoteMgm();
Uint64 session_id= 0;
NdbMgmHandle h;
h= ndb_mgm_create_handle();
ndb_mgm_set_connectstring(h, mgm);
ndb_mgm_connect(h,0,0,0);
int s= ndb_mgm_get_fd(h);
session_id= ndb_mgm_get_session_id(h);
ndbout << "MGM Session id: " << session_id << endl;
write(s,"get",3);
ndb_mgm_disconnect(h);
ndb_mgm_destroy_handle(&h);
struct NdbMgmSession sess;
int slen= sizeof(struct NdbMgmSession);
h= ndb_mgm_create_handle();
ndb_mgm_set_connectstring(h, mgm);
ndb_mgm_connect(h,0,0,0);
if(ndb_mgm_get_session(h,session_id,&sess,&slen))
{
ndbout << "Failed, session still exists" << endl;
ndb_mgm_disconnect(h);
ndb_mgm_destroy_handle(&h);
return NDBT_FAILED;
}
else
{
ndbout << "SUCCESS: session is gone" << endl;
ndb_mgm_disconnect(h);
ndb_mgm_destroy_handle(&h);
return NDBT_OK;
}
}
NDBT_TESTSUITE(testMgm); NDBT_TESTSUITE(testMgm);
...@@ -175,6 +215,11 @@ TESTCASE("SingleUserMode", ...@@ -175,6 +215,11 @@ TESTCASE("SingleUserMode",
INITIALIZER(runTestSingleUserMode); INITIALIZER(runTestSingleUserMode);
FINALIZER(runClearTable); FINALIZER(runClearTable);
} }
TESTCASE("ApiSessionFailure",
"Test failures in MGMAPI session"){
INITIALIZER(runTestApiSession);
}
NDBT_TESTSUITE_END(testMgm); NDBT_TESTSUITE_END(testMgm);
int main(int argc, const char** argv){ int main(int argc, const char** argv){
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment