Commit 46bb6f65 authored by tomas@poseidon.ndb.mysql.com's avatar tomas@poseidon.ndb.mysql.com

Merge tulin@bk-internal.mysql.com:/home/bk/mysql-5.0

into  poseidon.ndb.mysql.com:/home/tomas/mysql-5.0-main
parents 5e978cc0 868f7df7
...@@ -105,7 +105,8 @@ public: ...@@ -105,7 +105,8 @@ public:
void stopSessions(bool wait = false); void stopSessions(bool wait = false);
void foreachSession(void (*f)(Session*, void*), void *data); void foreachSession(void (*f)(Session*, void*), void *data);
void checkSessions();
private: private:
struct SessionInstance { struct SessionInstance {
Service * m_service; Service * m_service;
...@@ -116,12 +117,13 @@ private: ...@@ -116,12 +117,13 @@ private:
Service * m_service; Service * m_service;
NDB_SOCKET_TYPE m_socket; NDB_SOCKET_TYPE m_socket;
}; };
MutexVector<SessionInstance> m_sessions; NdbLockable m_session_mutex;
Vector<SessionInstance> m_sessions;
MutexVector<ServiceInstance> m_services; MutexVector<ServiceInstance> m_services;
unsigned m_maxSessions; unsigned m_maxSessions;
void doAccept(); void doAccept();
void checkSessions(); void checkSessionsImpl();
void startSession(SessionInstance &); void startSession(SessionInstance &);
/** /**
......
...@@ -184,9 +184,12 @@ SocketServer::doAccept(){ ...@@ -184,9 +184,12 @@ SocketServer::doAccept(){
SessionInstance s; SessionInstance s;
s.m_service = si.m_service; s.m_service = si.m_service;
s.m_session = si.m_service->newSession(childSock); s.m_session = si.m_service->newSession(childSock);
if(s.m_session != 0){ if(s.m_session != 0)
{
m_session_mutex.lock();
m_sessions.push_back(s); m_sessions.push_back(s);
startSession(m_sessions.back()); startSession(m_sessions.back());
m_session_mutex.unlock();
} }
continue; continue;
...@@ -240,10 +243,13 @@ void ...@@ -240,10 +243,13 @@ void
SocketServer::doRun(){ SocketServer::doRun(){
while(!m_stopThread){ while(!m_stopThread){
checkSessions(); m_session_mutex.lock();
checkSessionsImpl();
if(m_sessions.size() < m_maxSessions){ if(m_sessions.size() < m_maxSessions){
m_session_mutex.unlock();
doAccept(); doAccept();
} else { } else {
m_session_mutex.unlock();
NdbSleep_MilliSleep(200); NdbSleep_MilliSleep(200);
} }
} }
...@@ -276,17 +282,30 @@ transfer(NDB_SOCKET_TYPE sock){ ...@@ -276,17 +282,30 @@ transfer(NDB_SOCKET_TYPE sock){
void void
SocketServer::foreachSession(void (*func)(SocketServer::Session*, void *), void *data) SocketServer::foreachSession(void (*func)(SocketServer::Session*, void *), void *data)
{ {
m_session_mutex.lock();
for(int i = m_sessions.size() - 1; i >= 0; i--){ for(int i = m_sessions.size() - 1; i >= 0; i--){
(*func)(m_sessions[i].m_session, data); (*func)(m_sessions[i].m_session, data);
} }
checkSessions(); m_session_mutex.unlock();
} }
void void
SocketServer::checkSessions(){ SocketServer::checkSessions()
for(int i = m_sessions.size() - 1; i >= 0; i--){ {
if(m_sessions[i].m_session->m_stopped){ m_session_mutex.lock();
if(m_sessions[i].m_thread != 0){ checkSessionsImpl();
m_session_mutex.unlock();
}
void
SocketServer::checkSessionsImpl()
{
for(int i = m_sessions.size() - 1; i >= 0; i--)
{
if(m_sessions[i].m_session->m_stopped)
{
if(m_sessions[i].m_thread != 0)
{
void* ret; void* ret;
NdbThread_WaitFor(m_sessions[i].m_thread, &ret); NdbThread_WaitFor(m_sessions[i].m_thread, &ret);
NdbThread_Destroy(&m_sessions[i].m_thread); NdbThread_Destroy(&m_sessions[i].m_thread);
...@@ -301,19 +320,26 @@ SocketServer::checkSessions(){ ...@@ -301,19 +320,26 @@ SocketServer::checkSessions(){
void void
SocketServer::stopSessions(bool wait){ SocketServer::stopSessions(bool wait){
int i; int i;
m_session_mutex.lock();
for(i = m_sessions.size() - 1; i>=0; i--) for(i = m_sessions.size() - 1; i>=0; i--)
{ {
m_sessions[i].m_session->stopSession(); m_sessions[i].m_session->stopSession();
m_sessions[i].m_session->m_stop = true; // to make sure m_sessions[i].m_session->m_stop = true; // to make sure
} }
m_session_mutex.unlock();
for(i = m_services.size() - 1; i>=0; i--) for(i = m_services.size() - 1; i>=0; i--)
m_services[i].m_service->stopSessions(); m_services[i].m_service->stopSessions();
if(wait){ if(wait){
m_session_mutex.lock();
while(m_sessions.size() > 0){ while(m_sessions.size() > 0){
checkSessions(); checkSessionsImpl();
m_session_mutex.unlock();
NdbSleep_MilliSleep(100); NdbSleep_MilliSleep(100);
m_session_mutex.lock();
} }
m_session_mutex.unlock();
} }
} }
...@@ -348,4 +374,4 @@ sessionThread_C(void* _sc){ ...@@ -348,4 +374,4 @@ sessionThread_C(void* _sc){
} }
template class MutexVector<SocketServer::ServiceInstance>; template class MutexVector<SocketServer::ServiceInstance>;
template class MutexVector<SocketServer::SessionInstance>; template class Vector<SocketServer::SessionInstance>;
...@@ -49,7 +49,9 @@ extern EventLogger g_eventLogger; ...@@ -49,7 +49,9 @@ extern EventLogger g_eventLogger;
enum ndbd_options { enum ndbd_options {
OPT_INITIAL = NDB_STD_OPTIONS_LAST, OPT_INITIAL = NDB_STD_OPTIONS_LAST,
OPT_NODAEMON, OPT_NODAEMON,
OPT_FOREGROUND OPT_FOREGROUND,
OPT_NOWAIT_NODES,
OPT_INITIAL_START
}; };
NDB_STD_OPTS_VARS; NDB_STD_OPTS_VARS;
...@@ -88,11 +90,11 @@ static struct my_option my_long_options[] = ...@@ -88,11 +90,11 @@ static struct my_option my_long_options[] =
" (implies --nodaemon)", " (implies --nodaemon)",
(gptr*) &_foreground, (gptr*) &_foreground, 0, (gptr*) &_foreground, (gptr*) &_foreground, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "nowait-nodes", NO_ARG, { "nowait-nodes", OPT_NOWAIT_NODES,
"Nodes that will not be waited for during start", "Nodes that will not be waited for during start",
(gptr*) &_nowait_nodes, (gptr*) &_nowait_nodes, 0, (gptr*) &_nowait_nodes, (gptr*) &_nowait_nodes, 0,
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
{ "initial-start", NO_ARG, { "initial-start", OPT_INITIAL_START,
"Perform initial start", "Perform initial start",
(gptr*) &_initialstart, (gptr*) &_initialstart, 0, (gptr*) &_initialstart, (gptr*) &_initialstart, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
......
...@@ -501,6 +501,7 @@ MgmApiSession::get_nodeid(Parser_t::Context &, ...@@ -501,6 +501,7 @@ MgmApiSession::get_nodeid(Parser_t::Context &,
ps.tick= tick; ps.tick= tick;
m_mgmsrv.get_socket_server()-> m_mgmsrv.get_socket_server()->
foreachSession(stop_session_if_timed_out,&ps); foreachSession(stop_session_if_timed_out,&ps);
m_mgmsrv.get_socket_server()->checkSessions();
error_string = ""; error_string = "";
continue; continue;
} }
...@@ -1558,6 +1559,7 @@ MgmApiSession::purge_stale_sessions(Parser_t::Context &ctx, ...@@ -1558,6 +1559,7 @@ MgmApiSession::purge_stale_sessions(Parser_t::Context &ctx,
ps.free_nodes.bitXORC(NodeBitmask()); // invert connected_nodes to get free nodes ps.free_nodes.bitXORC(NodeBitmask()); // invert connected_nodes to get free nodes
m_mgmsrv.get_socket_server()->foreachSession(stop_session_if_not_connected,&ps); m_mgmsrv.get_socket_server()->foreachSession(stop_session_if_not_connected,&ps);
m_mgmsrv.get_socket_server()->checkSessions();
m_output->println("purge stale sessions reply"); m_output->println("purge stale sessions reply");
if (str.length() > 0) if (str.length() > 0)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment