Commit aa8b15b7 authored by unknown's avatar unknown

[PATCH] WL#3704 mgmapi timeouts: Change to have total timeout for call, not per request

use portable method, getting milliseconds between calls - Linux would
let us do funky stuff by getting the timeout from select(2). Everywhere
else sucks and doesn't let us do that :(

Index: ndb-work/storage/ndb/include/util/InputStream.hpp
===================================================================


storage/ndb/include/util/InputStream.hpp:
  WL#3704 mgmapi timeouts: Change to have total timeout for call, not per request
storage/ndb/include/util/OutputStream.hpp:
  WL#3704 mgmapi timeouts: Change to have total timeout for call, not per request
storage/ndb/include/util/socket_io.h:
  WL#3704 mgmapi timeouts: Change to have total timeout for call, not per request
storage/ndb/src/common/util/InputStream.cpp:
  WL#3704 mgmapi timeouts: Change to have total timeout for call, not per request
storage/ndb/src/common/util/OutputStream.cpp:
  WL#3704 mgmapi timeouts: Change to have total timeout for call, not per request
storage/ndb/src/common/util/socket_io.cpp:
  WL#3704 mgmapi timeouts: Change to have total timeout for call, not per request
storage/ndb/src/mgmsrv/Services.cpp:
  WL#3704 mgmapi timeouts: Change to have total timeout for call, not per request
parent c59722ad
...@@ -49,6 +49,7 @@ extern FileInputStream Stdin; ...@@ -49,6 +49,7 @@ extern FileInputStream Stdin;
class SocketInputStream : public InputStream { class SocketInputStream : public InputStream {
NDB_SOCKET_TYPE m_socket; NDB_SOCKET_TYPE m_socket;
unsigned m_timeout_ms; unsigned m_timeout_ms;
unsigned m_timeout_remain;
bool m_startover; bool m_startover;
bool m_timedout; bool m_timedout;
public: public:
......
...@@ -46,6 +46,7 @@ class SocketOutputStream : public OutputStream { ...@@ -46,6 +46,7 @@ class SocketOutputStream : public OutputStream {
NDB_SOCKET_TYPE m_socket; NDB_SOCKET_TYPE m_socket;
unsigned m_timeout_ms; unsigned m_timeout_ms;
bool m_timedout; bool m_timedout;
unsigned m_timeout_remain;
public: public:
SocketOutputStream(NDB_SOCKET_TYPE socket, unsigned write_timeout_ms = 1000); SocketOutputStream(NDB_SOCKET_TYPE socket, unsigned write_timeout_ms = 1000);
virtual ~SocketOutputStream() {} virtual ~SocketOutputStream() {}
......
...@@ -28,15 +28,20 @@ extern "C" { ...@@ -28,15 +28,20 @@ extern "C" {
int read_socket(NDB_SOCKET_TYPE, int timeout_ms, char *, int len); int read_socket(NDB_SOCKET_TYPE, int timeout_ms, char *, int len);
int readln_socket(NDB_SOCKET_TYPE socket, int timeout_millis, int readln_socket(NDB_SOCKET_TYPE socket, int timeout_millis, int *time,
char * buf, int buflen, NdbMutex *mutex); char * buf, int buflen, NdbMutex *mutex);
int write_socket(NDB_SOCKET_TYPE, int timeout_ms, const char[], int len); int write_socket(NDB_SOCKET_TYPE, int timeout_ms, int *time,
const char[], int len);
int print_socket(NDB_SOCKET_TYPE, int timeout_ms, const char *, ...);
int println_socket(NDB_SOCKET_TYPE, int timeout_ms, const char *, ...); int print_socket(NDB_SOCKET_TYPE, int timeout_ms, int *time,
int vprint_socket(NDB_SOCKET_TYPE, int timeout_ms, const char *, va_list); const char *, ...);
int vprintln_socket(NDB_SOCKET_TYPE, int timeout_ms, const char *, va_list); int println_socket(NDB_SOCKET_TYPE, int timeout_ms, int *time,
const char *, ...);
int vprint_socket(NDB_SOCKET_TYPE, int timeout_ms, int *time,
const char *, va_list);
int vprintln_socket(NDB_SOCKET_TYPE, int timeout_ms, int *time,
const char *, va_list);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -37,7 +37,8 @@ SocketInputStream::SocketInputStream(NDB_SOCKET_TYPE socket, ...@@ -37,7 +37,8 @@ SocketInputStream::SocketInputStream(NDB_SOCKET_TYPE socket,
unsigned read_timeout_ms) unsigned read_timeout_ms)
: m_socket(socket) { : m_socket(socket) {
m_startover= true; m_startover= true;
m_timeout_ms = read_timeout_ms; m_timeout_remain= m_timeout_ms = read_timeout_ms;
m_timedout= false; m_timedout= false;
} }
...@@ -55,9 +56,13 @@ SocketInputStream::gets(char * buf, int bufLen) { ...@@ -55,9 +56,13 @@ SocketInputStream::gets(char * buf, int bufLen) {
else else
offset= strlen(buf); offset= strlen(buf);
int res = readln_socket(m_socket, m_timeout_ms, buf+offset, bufLen-offset, m_mutex); int time= 0;
int res = readln_socket(m_socket, m_timeout_remain, &time,
buf+offset, bufLen-offset, m_mutex);
if(res == 0) if(res >= 0)
m_timeout_remain-=time;
if(res == 0 || m_timeout_remain<=0)
{ {
m_timedout= true; m_timedout= true;
buf[0]=0; buf[0]=0;
......
...@@ -44,7 +44,7 @@ FileOutputStream::println(const char * fmt, ...){ ...@@ -44,7 +44,7 @@ FileOutputStream::println(const char * fmt, ...){
SocketOutputStream::SocketOutputStream(NDB_SOCKET_TYPE socket, SocketOutputStream::SocketOutputStream(NDB_SOCKET_TYPE socket,
unsigned write_timeout_ms){ unsigned write_timeout_ms){
m_socket = socket; m_socket = socket;
m_timeout_ms = write_timeout_ms; m_timeout_remain= m_timeout_ms = write_timeout_ms;
m_timedout= false; m_timedout= false;
} }
...@@ -55,12 +55,18 @@ SocketOutputStream::print(const char * fmt, ...){ ...@@ -55,12 +55,18 @@ SocketOutputStream::print(const char * fmt, ...){
if(timedout()) if(timedout())
return -1; return -1;
int time= 0;
va_start(ap, fmt); va_start(ap, fmt);
const int ret = vprint_socket(m_socket, m_timeout_ms, fmt, ap); int ret = vprint_socket(m_socket, m_timeout_ms, &time, fmt, ap);
va_end(ap); va_end(ap);
if (errno==ETIMEDOUT) if(ret >= 0)
m_timeout_remain-=time;
if(errno==ETIMEDOUT || m_timeout_remain<=0)
{
m_timedout= true; m_timedout= true;
ret= -1;
}
return ret; return ret;
} }
...@@ -71,12 +77,18 @@ SocketOutputStream::println(const char * fmt, ...){ ...@@ -71,12 +77,18 @@ SocketOutputStream::println(const char * fmt, ...){
if(timedout()) if(timedout())
return -1; return -1;
int time= 0;
va_start(ap, fmt); va_start(ap, fmt);
const int ret = vprintln_socket(m_socket, m_timeout_ms, fmt, ap); int ret = vprintln_socket(m_socket, m_timeout_ms, &time, fmt, ap);
va_end(ap); va_end(ap);
if (errno==ETIMEDOUT) if(ret >= 0)
m_timeout_remain-=time;
if (errno==ETIMEDOUT || m_timeout_remain<=0)
{
m_timedout= true; m_timedout= true;
ret= -1;
}
return ret; return ret;
} }
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include <NdbTCP.h> #include <NdbTCP.h>
#include <socket_io.h> #include <socket_io.h>
#include <NdbOut.hpp> #include <NdbOut.hpp>
#include <NdbTick.h>
extern "C" extern "C"
int int
...@@ -47,7 +48,7 @@ read_socket(NDB_SOCKET_TYPE socket, int timeout_millis, ...@@ -47,7 +48,7 @@ read_socket(NDB_SOCKET_TYPE socket, int timeout_millis,
extern "C" extern "C"
int int
readln_socket(NDB_SOCKET_TYPE socket, int timeout_millis, readln_socket(NDB_SOCKET_TYPE socket, int timeout_millis, int *time,
char * buf, int buflen, NdbMutex *mutex){ char * buf, int buflen, NdbMutex *mutex){
if(buflen <= 1) if(buflen <= 1)
return 0; return 0;
...@@ -62,7 +63,10 @@ readln_socket(NDB_SOCKET_TYPE socket, int timeout_millis, ...@@ -62,7 +63,10 @@ readln_socket(NDB_SOCKET_TYPE socket, int timeout_millis,
if(mutex) if(mutex)
NdbMutex_Unlock(mutex); NdbMutex_Unlock(mutex);
Uint64 tick= NdbTick_CurrentMillisecond();
const int selectRes = select(socket + 1, &readset, 0, 0, &timeout); const int selectRes = select(socket + 1, &readset, 0, 0, &timeout);
*time= NdbTick_CurrentMillisecond() - tick;
if(mutex) if(mutex)
NdbMutex_Lock(mutex); NdbMutex_Lock(mutex);
...@@ -126,9 +130,13 @@ readln_socket(NDB_SOCKET_TYPE socket, int timeout_millis, ...@@ -126,9 +130,13 @@ readln_socket(NDB_SOCKET_TYPE socket, int timeout_millis,
FD_ZERO(&readset); FD_ZERO(&readset);
FD_SET(socket, &readset); FD_SET(socket, &readset);
timeout.tv_sec = (timeout_millis / 1000); timeout.tv_sec = ((timeout_millis - *time) / 1000);
timeout.tv_usec = (timeout_millis % 1000) * 1000; timeout.tv_usec = ((timeout_millis - *time) % 1000) * 1000;
tick= NdbTick_CurrentMillisecond();
const int selectRes = select(socket + 1, &readset, 0, 0, &timeout); const int selectRes = select(socket + 1, &readset, 0, 0, &timeout);
*time= NdbTick_CurrentMillisecond() - tick;
if(selectRes != 1){ if(selectRes != 1){
return -1; return -1;
} }
...@@ -139,7 +147,7 @@ readln_socket(NDB_SOCKET_TYPE socket, int timeout_millis, ...@@ -139,7 +147,7 @@ readln_socket(NDB_SOCKET_TYPE socket, int timeout_millis,
extern "C" extern "C"
int int
write_socket(NDB_SOCKET_TYPE socket, int timeout_millis, write_socket(NDB_SOCKET_TYPE socket, int timeout_millis, int *time,
const char buf[], int len){ const char buf[], int len){
fd_set writeset; fd_set writeset;
FD_ZERO(&writeset); FD_ZERO(&writeset);
...@@ -148,7 +156,11 @@ write_socket(NDB_SOCKET_TYPE socket, int timeout_millis, ...@@ -148,7 +156,11 @@ write_socket(NDB_SOCKET_TYPE socket, int timeout_millis,
timeout.tv_sec = (timeout_millis / 1000); timeout.tv_sec = (timeout_millis / 1000);
timeout.tv_usec = (timeout_millis % 1000) * 1000; timeout.tv_usec = (timeout_millis % 1000) * 1000;
Uint64 tick= NdbTick_CurrentMillisecond();
const int selectRes = select(socket + 1, 0, &writeset, 0, &timeout); const int selectRes = select(socket + 1, 0, &writeset, 0, &timeout);
*time= NdbTick_CurrentMillisecond() - tick;
if(selectRes != 1){ if(selectRes != 1){
return -1; return -1;
} }
...@@ -167,9 +179,13 @@ write_socket(NDB_SOCKET_TYPE socket, int timeout_millis, ...@@ -167,9 +179,13 @@ write_socket(NDB_SOCKET_TYPE socket, int timeout_millis,
FD_ZERO(&writeset); FD_ZERO(&writeset);
FD_SET(socket, &writeset); FD_SET(socket, &writeset);
timeout.tv_sec = 1; timeout.tv_sec = ((timeout_millis - *time) / 1000);
timeout.tv_usec = 0; timeout.tv_usec = ((timeout_millis - *time) % 1000) * 1000;
Uint64 tick= NdbTick_CurrentMillisecond();
const int selectRes2 = select(socket + 1, 0, &writeset, 0, &timeout); const int selectRes2 = select(socket + 1, 0, &writeset, 0, &timeout);
*time= NdbTick_CurrentMillisecond() - tick;
if(selectRes2 != 1){ if(selectRes2 != 1){
return -1; return -1;
} }
...@@ -180,11 +196,11 @@ write_socket(NDB_SOCKET_TYPE socket, int timeout_millis, ...@@ -180,11 +196,11 @@ write_socket(NDB_SOCKET_TYPE socket, int timeout_millis,
extern "C" extern "C"
int int
print_socket(NDB_SOCKET_TYPE socket, int timeout_millis, print_socket(NDB_SOCKET_TYPE socket, int timeout_millis, int *time,
const char * fmt, ...){ const char * fmt, ...){
va_list ap; va_list ap;
va_start(ap, fmt); va_start(ap, fmt);
int ret = vprint_socket(socket, timeout_millis, fmt, ap); int ret = vprint_socket(socket, timeout_millis, time, fmt, ap);
va_end(ap); va_end(ap);
return ret; return ret;
...@@ -192,18 +208,18 @@ print_socket(NDB_SOCKET_TYPE socket, int timeout_millis, ...@@ -192,18 +208,18 @@ print_socket(NDB_SOCKET_TYPE socket, int timeout_millis,
extern "C" extern "C"
int int
println_socket(NDB_SOCKET_TYPE socket, int timeout_millis, println_socket(NDB_SOCKET_TYPE socket, int timeout_millis, int *time,
const char * fmt, ...){ const char * fmt, ...){
va_list ap; va_list ap;
va_start(ap, fmt); va_start(ap, fmt);
int ret = vprintln_socket(socket, timeout_millis, fmt, ap); int ret = vprintln_socket(socket, timeout_millis, time, fmt, ap);
va_end(ap); va_end(ap);
return ret; return ret;
} }
extern "C" extern "C"
int int
vprint_socket(NDB_SOCKET_TYPE socket, int timeout_millis, vprint_socket(NDB_SOCKET_TYPE socket, int timeout_millis, int *time,
const char * fmt, va_list ap){ const char * fmt, va_list ap){
char buf[1000]; char buf[1000];
char *buf2 = buf; char *buf2 = buf;
...@@ -221,7 +237,7 @@ vprint_socket(NDB_SOCKET_TYPE socket, int timeout_millis, ...@@ -221,7 +237,7 @@ vprint_socket(NDB_SOCKET_TYPE socket, int timeout_millis,
} else } else
return 0; return 0;
int ret = write_socket(socket, timeout_millis, buf2, size); int ret = write_socket(socket, timeout_millis, time, buf2, size);
if(buf2 != buf) if(buf2 != buf)
free(buf2); free(buf2);
return ret; return ret;
...@@ -229,7 +245,7 @@ vprint_socket(NDB_SOCKET_TYPE socket, int timeout_millis, ...@@ -229,7 +245,7 @@ vprint_socket(NDB_SOCKET_TYPE socket, int timeout_millis,
extern "C" extern "C"
int int
vprintln_socket(NDB_SOCKET_TYPE socket, int timeout_millis, vprintln_socket(NDB_SOCKET_TYPE socket, int timeout_millis, int *time,
const char * fmt, va_list ap){ const char * fmt, va_list ap){
char buf[1000]; char buf[1000];
char *buf2 = buf; char *buf2 = buf;
...@@ -249,7 +265,7 @@ vprintln_socket(NDB_SOCKET_TYPE socket, int timeout_millis, ...@@ -249,7 +265,7 @@ vprintln_socket(NDB_SOCKET_TYPE socket, int timeout_millis,
} }
buf2[size-1]='\n'; buf2[size-1]='\n';
int ret = write_socket(socket, timeout_millis, buf2, size); int ret = write_socket(socket, timeout_millis, time, buf2, size);
if(buf2 != buf) if(buf2 != buf)
free(buf2); free(buf2);
return ret; return ret;
......
...@@ -1334,23 +1334,24 @@ Ndb_mgmd_event_service::log(int eventType, const Uint32* theData, NodeId nodeId) ...@@ -1334,23 +1334,24 @@ Ndb_mgmd_event_service::log(int eventType, const Uint32* theData, NodeId nodeId)
{ {
if(threshold <= m_clients[i].m_logLevel.getLogLevel(cat)) if(threshold <= m_clients[i].m_logLevel.getLogLevel(cat))
{ {
NDB_SOCKET_TYPE fd= m_clients[i].m_socket; if(m_clients[i].m_socket==NDB_INVALID_SOCKET)
if(fd != NDB_INVALID_SOCKET) continue;
{
SocketOutputStream out(m_clients[i].m_socket);
int r; int r;
if (m_clients[i].m_parsable) if (m_clients[i].m_parsable)
r= println_socket(fd, r= out.println(str.c_str());
MAX_WRITE_TIMEOUT, str.c_str());
else else
r= println_socket(fd, r= out.println(m_text);
MAX_WRITE_TIMEOUT, m_text);
if (r == -1) { if (r<0)
copy.push_back(fd); {
copy.push_back(m_clients[i].m_socket);
m_clients.erase(i, false); m_clients.erase(i, false);
} }
} }
} }
}
m_clients.unlock(); m_clients.unlock();
if ((n= (int)copy.size())) if ((n= (int)copy.size()))
...@@ -1398,14 +1399,16 @@ Ndb_mgmd_event_service::check_listeners() ...@@ -1398,14 +1399,16 @@ Ndb_mgmd_event_service::check_listeners()
m_clients.lock(); m_clients.lock();
for(i= m_clients.size() - 1; i >= 0; i--) for(i= m_clients.size() - 1; i >= 0; i--)
{ {
int fd= m_clients[i].m_socket; if(m_clients[i].m_socket==NDB_INVALID_SOCKET)
DBUG_PRINT("info",("%d %d",i,fd)); continue;
char buf[1];
buf[0]=0; SocketOutputStream out(m_clients[i].m_socket);
if (fd != NDB_INVALID_SOCKET &&
println_socket(fd,MAX_WRITE_TIMEOUT,"<PING>") == -1) DBUG_PRINT("info",("%d %d",i,m_clients[i].m_socket));
if(out.println("<PING>") < 0)
{ {
NDB_CLOSE_SOCKET(fd); NDB_CLOSE_SOCKET(m_clients[i].m_socket);
m_clients.erase(i, false); m_clients.erase(i, false);
n=1; n=1;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment