Commit be6596bf authored by unknown's avatar unknown

ndb: added timeout handling to alloc node id to avoid the usage of purge stale sessions

parent 55ce5e30
...@@ -2378,6 +2378,8 @@ MgmtSrvr::repCommand(Uint32* repReqId, Uint32 request, bool waitCompleted) ...@@ -2378,6 +2378,8 @@ MgmtSrvr::repCommand(Uint32* repReqId, Uint32 request, bool waitCompleted)
MgmtSrvr::Allocated_resources::Allocated_resources(MgmtSrvr &m) MgmtSrvr::Allocated_resources::Allocated_resources(MgmtSrvr &m)
: m_mgmsrv(m) : m_mgmsrv(m)
{ {
m_reserved_nodes.clear();
m_alloc_timeout= 0;
} }
MgmtSrvr::Allocated_resources::~Allocated_resources() MgmtSrvr::Allocated_resources::~Allocated_resources()
...@@ -2396,9 +2398,22 @@ MgmtSrvr::Allocated_resources::~Allocated_resources() ...@@ -2396,9 +2398,22 @@ MgmtSrvr::Allocated_resources::~Allocated_resources()
} }
void void
MgmtSrvr::Allocated_resources::reserve_node(NodeId id) MgmtSrvr::Allocated_resources::reserve_node(NodeId id, NDB_TICKS timeout)
{ {
m_reserved_nodes.set(id); m_reserved_nodes.set(id);
m_alloc_timeout= NdbTick_CurrentMillisecond() + timeout;
}
bool
MgmtSrvr::Allocated_resources::is_timed_out(NDB_TICKS tick)
{
if (m_alloc_timeout && tick > m_alloc_timeout)
{
g_eventLogger.info("Mgmt server state: nodeid %d timed out.",
get_nodeid());
return true;
}
return false;
} }
NodeId NodeId
......
...@@ -106,7 +106,8 @@ public: ...@@ -106,7 +106,8 @@ public:
~Allocated_resources(); ~Allocated_resources();
// methods to reserve/allocate resources which // methods to reserve/allocate resources which
// will be freed when running destructor // will be freed when running destructor
void reserve_node(NodeId id); void reserve_node(NodeId id, NDB_TICKS timeout);
bool is_timed_out(NDB_TICKS tick);
bool is_reserved(NodeId nodeId) { return m_reserved_nodes.get(nodeId); } bool is_reserved(NodeId nodeId) { return m_reserved_nodes.get(nodeId); }
bool is_reserved(NodeBitmask mask) { return !mask.bitAND(m_reserved_nodes).isclear(); } bool is_reserved(NodeBitmask mask) { return !mask.bitAND(m_reserved_nodes).isclear(); }
bool isclear() { return m_reserved_nodes.isclear(); } bool isclear() { return m_reserved_nodes.isclear(); }
...@@ -114,6 +115,7 @@ public: ...@@ -114,6 +115,7 @@ public:
private: private:
MgmtSrvr &m_mgmsrv; MgmtSrvr &m_mgmsrv;
NodeBitmask m_reserved_nodes; NodeBitmask m_reserved_nodes;
NDB_TICKS m_alloc_timeout;
}; };
NdbMutex *m_node_id_mutex; NdbMutex *m_node_id_mutex;
......
...@@ -137,6 +137,7 @@ ParserRow<MgmApiSession> commands[] = { ...@@ -137,6 +137,7 @@ ParserRow<MgmApiSession> commands[] = {
MGM_ARG("public key", String, Mandatory, "Public key"), MGM_ARG("public key", String, Mandatory, "Public key"),
MGM_ARG("endian", String, Optional, "Endianness"), MGM_ARG("endian", String, Optional, "Endianness"),
MGM_ARG("name", String, Optional, "Name of connection"), MGM_ARG("name", String, Optional, "Name of connection"),
MGM_ARG("timeout", Int, Optional, "Timeout in seconds"),
MGM_CMD("get version", &MgmApiSession::getVersion, ""), MGM_CMD("get version", &MgmApiSession::getVersion, ""),
...@@ -259,6 +260,15 @@ ParserRow<MgmApiSession> commands[] = { ...@@ -259,6 +260,15 @@ ParserRow<MgmApiSession> commands[] = {
MGM_END() MGM_END()
}; };
struct PurgeStruct
{
NodeBitmask free_nodes;/* free nodes as reported
* by ndbd in apiRegReqConf
*/
BaseString *str;
NDB_TICKS tick;
};
MgmApiSession::MgmApiSession(class MgmtSrvr & mgm, NDB_SOCKET_TYPE sock) MgmApiSession::MgmApiSession(class MgmtSrvr & mgm, NDB_SOCKET_TYPE sock)
: SocketServer::Session(sock), m_mgmsrv(mgm) : SocketServer::Session(sock), m_mgmsrv(mgm)
{ {
...@@ -408,6 +418,7 @@ MgmApiSession::get_nodeid(Parser_t::Context &, ...@@ -408,6 +418,7 @@ MgmApiSession::get_nodeid(Parser_t::Context &,
{ {
const char *cmd= "get nodeid reply"; const char *cmd= "get nodeid reply";
Uint32 version, nodeid= 0, nodetype= 0xff; Uint32 version, nodeid= 0, nodetype= 0xff;
Uint32 timeout= 20; // default seconds timeout
const char * transporter; const char * transporter;
const char * user; const char * user;
const char * password; const char * password;
...@@ -425,6 +436,7 @@ MgmApiSession::get_nodeid(Parser_t::Context &, ...@@ -425,6 +436,7 @@ MgmApiSession::get_nodeid(Parser_t::Context &,
args.get("public key", &public_key); args.get("public key", &public_key);
args.get("endian", &endian); args.get("endian", &endian);
args.get("name", &name); args.get("name", &name);
args.get("timeout", &timeout);
endian_check.l = 1; endian_check.l = 1;
if(endian if(endian
...@@ -464,8 +476,24 @@ MgmApiSession::get_nodeid(Parser_t::Context &, ...@@ -464,8 +476,24 @@ MgmApiSession::get_nodeid(Parser_t::Context &,
NodeId tmp= nodeid; NodeId tmp= nodeid;
if(tmp == 0 || !m_allocated_resources->is_reserved(tmp)){ if(tmp == 0 || !m_allocated_resources->is_reserved(tmp)){
BaseString error_string; BaseString error_string;
if (!m_mgmsrv.alloc_node_id(&tmp, (enum ndb_mgm_node_type)nodetype, NDB_TICKS tick= 0;
&addr, &addrlen, error_string)){ while (!m_mgmsrv.alloc_node_id(&tmp, (enum ndb_mgm_node_type)nodetype,
&addr, &addrlen, error_string))
{
if (tick == 0)
{
// attempt to free any timed out reservations
tick= NdbTick_CurrentMillisecond();
struct PurgeStruct ps;
m_mgmsrv.get_connected_nodes(ps.free_nodes);
// invert connected_nodes to get free nodes
ps.free_nodes.bitXORC(NodeBitmask());
ps.str= 0;
ps.tick= tick;
m_mgmsrv.get_socket_server()->
foreachSession(stop_session_if_timed_out,&ps);
continue;
}
const char *alias; const char *alias;
const char *str; const char *str;
alias= ndb_mgm_get_node_type_alias_string((enum ndb_mgm_node_type) alias= ndb_mgm_get_node_type_alias_string((enum ndb_mgm_node_type)
...@@ -491,7 +519,7 @@ MgmApiSession::get_nodeid(Parser_t::Context &, ...@@ -491,7 +519,7 @@ MgmApiSession::get_nodeid(Parser_t::Context &,
m_output->println("nodeid: %u", tmp); m_output->println("nodeid: %u", tmp);
m_output->println("result: Ok"); m_output->println("result: Ok");
m_output->println(""); m_output->println("");
m_allocated_resources->reserve_node(tmp); m_allocated_resources->reserve_node(tmp, timeout*1000);
if (name) if (name)
g_eventLogger.info("Node %d: %s", tmp, name); g_eventLogger.info("Node %d: %s", tmp, name);
...@@ -1480,14 +1508,6 @@ MgmApiSession::listen_event(Parser<MgmApiSession>::Context & ctx, ...@@ -1480,14 +1508,6 @@ MgmApiSession::listen_event(Parser<MgmApiSession>::Context & ctx,
m_output->println(""); m_output->println("");
} }
struct PurgeStruct
{
NodeBitmask free_nodes;/* free nodes as reported
* by ndbd in apiRegReqConf
*/
BaseString *str;
};
void void
MgmApiSession::stop_session_if_not_connected(SocketServer::Session *_s, void *data) MgmApiSession::stop_session_if_not_connected(SocketServer::Session *_s, void *data)
{ {
...@@ -1495,7 +1515,20 @@ MgmApiSession::stop_session_if_not_connected(SocketServer::Session *_s, void *da ...@@ -1495,7 +1515,20 @@ MgmApiSession::stop_session_if_not_connected(SocketServer::Session *_s, void *da
struct PurgeStruct &ps= *(struct PurgeStruct *)data; struct PurgeStruct &ps= *(struct PurgeStruct *)data;
if (s->m_allocated_resources->is_reserved(ps.free_nodes)) if (s->m_allocated_resources->is_reserved(ps.free_nodes))
{ {
ps.str->appfmt(" %d", s->m_allocated_resources->get_nodeid()); if (ps.str)
ps.str->appfmt(" %d", s->m_allocated_resources->get_nodeid());
s->stopSession();
}
}
void
MgmApiSession::stop_session_if_timed_out(SocketServer::Session *_s, void *data)
{
MgmApiSession *s= (MgmApiSession *)_s;
struct PurgeStruct &ps= *(struct PurgeStruct *)data;
if (s->m_allocated_resources->is_reserved(ps.free_nodes) &&
s->m_allocated_resources->is_timed_out(ps.tick))
{
s->stopSession(); s->stopSession();
} }
} }
......
...@@ -30,6 +30,7 @@ ...@@ -30,6 +30,7 @@
class MgmApiSession : public SocketServer::Session class MgmApiSession : public SocketServer::Session
{ {
static void stop_session_if_timed_out(SocketServer::Session *_s, void *data);
static void stop_session_if_not_connected(SocketServer::Session *_s, void *data); static void stop_session_if_not_connected(SocketServer::Session *_s, void *data);
private: private:
typedef Parser<MgmApiSession> Parser_t; typedef Parser<MgmApiSession> Parser_t;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment