Commit e02b8608 authored by Vladislav Vaintroub's avatar Vladislav Vaintroub

Windows, generic threadpool cleanups

- get rid of casts between int and HANDLE.
- store file descriptor in TP_connection_generic to get
rid of multiple mysql_socket_getfd(c->thd->net.vio->mysql_socket))

Also support named pipes (no reason not to support, since it is easy)
parent 77872e45
......@@ -28,11 +28,19 @@
#endif
#ifdef HAVE_IOCP
#define OPTIONAL_IO_POLL_READ_PARAM &overlapped
#define OPTIONAL_IO_POLL_READ_PARAM this
#else
#define OPTIONAL_IO_POLL_READ_PARAM 0
#endif
#ifdef _WIN32
typedef HANDLE TP_file_handle;
#else
typedef int TP_file_handle;
#define INVALID_HANDLE_VALUE -1
#endif
#include <sql_connect.h>
#include <mysqld.h>
#include <debug_sync.h>
......@@ -59,10 +67,10 @@ typedef OVERLAPPED_ENTRY native_event;
#pragma warning (disable : 4312)
#endif
static void io_poll_close(int fd)
static void io_poll_close(TP_file_handle fd)
{
#ifdef _WIN32
CloseHandle((HANDLE)fd);
CloseHandle(fd);
#else
close(fd);
#endif
......@@ -151,14 +159,17 @@ struct TP_connection_generic:public TP_connection
TP_connection_generic **prev_in_queue;
ulonglong abs_wait_timeout;
ulonglong dequeue_time;
TP_file_handle fd;
bool bound_to_poll_descriptor;
int waiting;
#ifdef HAVE_IOCP
OVERLAPPED overlapped;
#endif
#ifdef _WIN32
enum_vio_type vio_type;
#endif
};
typedef TP_connection_generic TP_connection_generic;
typedef I_P_List<TP_connection_generic,
I_P_List_adapter<TP_connection_generic,
......@@ -177,7 +188,7 @@ struct thread_group_t
worker_list_t waiting_threads;
worker_thread_t *listener;
pthread_attr_t *pthread_attr;
int pollfd;
TP_file_handle pollfd;
int thread_count;
int active_thread_count;
int connection_count;
......@@ -245,11 +256,11 @@ static void print_pool_blocked_message(bool);
Creates an io_poll descriptor
On Linux: epoll_create()
- io_poll_associate_fd(int poll_fd, int fd, void *data, void *opt)
- io_poll_associate_fd(int poll_fd, TP_file_handle fd, void *data, void *opt)
Associate file descriptor with io poll descriptor
On Linux : epoll_ctl(..EPOLL_CTL_ADD))
- io_poll_disassociate_fd(int pollfd, int fd)
- io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd)
Associate file descriptor with io poll descriptor
On Linux: epoll_ctl(..EPOLL_CTL_DEL)
......@@ -259,7 +270,7 @@ static void print_pool_blocked_message(bool);
io_poll_associate_fd() was called.
On Linux : epoll_ctl(..EPOLL_CTL_MOD)
- io_poll_wait (int pollfd, native_event *native_events, int maxevents,
- io_poll_wait (TP_file_handle pollfd, native_event *native_events, int maxevents,
int timeout_ms)
wait until one or more descriptors added with io_poll_associate_fd()
......@@ -276,13 +287,13 @@ static void print_pool_blocked_message(bool);
/* Early 2.6 kernel did not have EPOLLRDHUP */
#define EPOLLRDHUP 0
#endif
static int io_poll_create()
static TP_file_handle io_poll_create()
{
return epoll_create(1);
}
int io_poll_associate_fd(int pollfd, int fd, void *data, void*)
int io_poll_associate_fd(TP_file_handle pollfd, TP_file_handle fd, void *data, void*)
{
struct epoll_event ev;
ev.data.u64= 0; /* Keep valgrind happy */
......@@ -293,7 +304,7 @@ int io_poll_associate_fd(int pollfd, int fd, void *data, void*)
int io_poll_start_read(int pollfd, int fd, void *data, void *)
int io_poll_start_read(TP_file_handle pollfd, TP_file_handle fd, void *data, void *)
{
struct epoll_event ev;
ev.data.u64= 0; /* Keep valgrind happy */
......@@ -302,7 +313,7 @@ int io_poll_start_read(int pollfd, int fd, void *data, void *)
return epoll_ctl(pollfd, EPOLL_CTL_MOD, fd, &ev);
}
int io_poll_disassociate_fd(int pollfd, int fd)
int io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd)
{
struct epoll_event ev;
return epoll_ctl(pollfd, EPOLL_CTL_DEL, fd, &ev);
......@@ -314,7 +325,7 @@ int io_poll_disassociate_fd(int pollfd, int fd)
NOTE - in case of EINTR, it restarts with original timeout. Since we use
either infinite or 0 timeouts, this is not critical
*/
int io_poll_wait(int pollfd, native_event *native_events, int maxevents,
int io_poll_wait(TP_file_handle pollfd, native_event *native_events, int maxevents,
int timeout_ms)
{
int ret;
......@@ -347,12 +358,12 @@ static void *native_event_get_userdata(native_event *event)
#endif
int io_poll_create()
TP_file_handle io_poll_create()
{
return kqueue();
}
int io_poll_start_read(int pollfd, int fd, void *data,void *)
int io_poll_start_read(TP_file_handle pollfd, TP_file_handle fd, void *data,void *)
{
struct kevent ke;
MY_EV_SET(&ke, fd, EVFILT_READ, EV_ADD|EV_ONESHOT,
......@@ -361,7 +372,7 @@ int io_poll_start_read(int pollfd, int fd, void *data,void *)
}
int io_poll_associate_fd(int pollfd, int fd, void *data,void *)
int io_poll_associate_fd(TP_file_handle pollfd, TP_file_handle fd, void *data,void *)
{
struct kevent ke;
MY_EV_SET(&ke, fd, EVFILT_READ, EV_ADD|EV_ONESHOT,
......@@ -370,7 +381,7 @@ int io_poll_associate_fd(int pollfd, int fd, void *data,void *)
}
int io_poll_disassociate_fd(int pollfd, int fd)
int io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd)
{
struct kevent ke;
MY_EV_SET(&ke,fd, EVFILT_READ, EV_DELETE, 0, 0, 0);
......@@ -378,7 +389,7 @@ int io_poll_disassociate_fd(int pollfd, int fd)
}
int io_poll_wait(int pollfd, struct kevent *events, int maxevents, int timeout_ms)
int io_poll_wait(TP_file_handle pollfd, struct kevent *events, int maxevents, int timeout_ms)
{
struct timespec ts;
int ret;
......@@ -403,27 +414,27 @@ static void* native_event_get_userdata(native_event *event)
#elif defined (__sun)
static int io_poll_create()
static TP_file_handle io_poll_create()
{
return port_create();
}
int io_poll_start_read(int pollfd, int fd, void *data, void *)
int io_poll_start_read(TP_file_handle pollfd, TP_file_handle fd, void *data, void *)
{
return port_associate(pollfd, PORT_SOURCE_FD, fd, POLLIN, data);
}
static int io_poll_associate_fd(int pollfd, int fd, void *data, void *)
static int io_poll_associate_fd(TP_file_handle pollfd, TP_file_handle fd, void *data, void *)
{
return io_poll_start_read(pollfd, fd, data, 0);
}
int io_poll_disassociate_fd(int pollfd, int fd)
int io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd)
{
return port_dissociate(pollfd, PORT_SOURCE_FD, fd);
}
int io_poll_wait(int pollfd, native_event *events, int maxevents, int timeout_ms)
int io_poll_wait(TP_file_handle pollfd, native_event *events, int maxevents, int timeout_ms)
{
struct timespec ts;
int ret;
......@@ -451,25 +462,32 @@ static void* native_event_get_userdata(native_event *event)
#elif defined(HAVE_IOCP)
static int io_poll_create()
static TP_file_handle io_poll_create()
{
HANDLE h= CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
return PtrToInt(h);
return CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
}
int io_poll_start_read(int pollfd, int fd, void *, void *opt)
int io_poll_start_read(TP_file_handle pollfd, TP_file_handle fd, void *, void *opt)
{
DWORD num_bytes = 0;
static char c;
TP_connection_generic *con= (TP_connection_generic *)opt;
OVERLAPPED *overlapped= &con->overlapped;
if (con->vio_type == VIO_TYPE_NAMEDPIPE)
{
if (ReadFile(fd, &c, 0, NULL, overlapped))
return 0;
}
else
{
WSABUF buf;
buf.buf= &c;
buf.len= 0;
DWORD flags=0;
WSABUF buf;
buf.buf= &c;
buf.len= 0;
DWORD flags=0;
if (WSARecv((SOCKET)fd, &buf, 1, &num_bytes, &flags, (OVERLAPPED *)opt, NULL) == 0)
return 0;
if (WSARecv((SOCKET)fd, &buf, 1,NULL, &flags,overlapped, NULL) == 0)
return 0;
}
if (GetLastError() == ERROR_IO_PENDING)
return 0;
......@@ -478,26 +496,26 @@ int io_poll_start_read(int pollfd, int fd, void *, void *opt)
}
static int io_poll_associate_fd(int pollfd, int fd, void *data, void *opt)
static int io_poll_associate_fd(TP_file_handle pollfd, TP_file_handle fd, void *data, void *opt)
{
HANDLE h= CreateIoCompletionPort(IntToPtr(fd), IntToPtr(pollfd), (ULONG_PTR)data, 0);
HANDLE h= CreateIoCompletionPort(fd, pollfd, (ULONG_PTR)data, 0);
if (!h)
return -1;
return io_poll_start_read(pollfd,fd, 0, opt);
}
int io_poll_disassociate_fd(int pollfd, int fd)
int io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd)
{
/* Not possible to unbind/rebind file descriptor in IOCP. */
return 0;
}
int io_poll_wait(int pollfd, native_event *events, int maxevents, int timeout_ms)
int io_poll_wait(TP_file_handle pollfd, native_event *events, int maxevents, int timeout_ms)
{
ULONG n;
BOOL ok = GetQueuedCompletionStatusEx((HANDLE)pollfd, events,
BOOL ok = GetQueuedCompletionStatusEx(pollfd, events,
maxevents, &n, timeout_ms, FALSE);
return ok ? (int)n : -1;
......@@ -1038,7 +1056,7 @@ int thread_group_init(thread_group_t *thread_group, pthread_attr_t* thread_attr)
DBUG_ENTER("thread_group_init");
thread_group->pthread_attr = thread_attr;
mysql_mutex_init(key_group_mutex, &thread_group->mutex, NULL);
thread_group->pollfd= -1;
thread_group->pollfd= INVALID_HANDLE_VALUE;
thread_group->shutdown_pipe[0]= -1;
thread_group->shutdown_pipe[1]= -1;
queue_init(thread_group);
......@@ -1049,10 +1067,10 @@ int thread_group_init(thread_group_t *thread_group, pthread_attr_t* thread_attr)
void thread_group_destroy(thread_group_t *thread_group)
{
mysql_mutex_destroy(&thread_group->mutex);
if (thread_group->pollfd != -1)
if (thread_group->pollfd != INVALID_HANDLE_VALUE)
{
io_poll_close(thread_group->pollfd);
thread_group->pollfd= -1;
thread_group->pollfd= INVALID_HANDLE_VALUE;
}
#ifndef HAVE_IOCP
for(int i=0; i < 2; i++)
......@@ -1109,7 +1127,7 @@ static int wake_listener(thread_group_t *thread_group)
if (write(thread_group->shutdown_pipe[1], &c, 1) < 0)
return -1;
#else
PostQueuedCompletionStatus((HANDLE)thread_group->pollfd, 0, 0, 0);
PostQueuedCompletionStatus(thread_group->pollfd, 0, 0, 0);
#endif
return 0;
}
......@@ -1432,6 +1450,16 @@ TP_connection_generic::TP_connection_generic(CONNECT *c):
, overlapped()
#endif
{
DBUG_ASSERT(c->vio);
#ifdef _WIN32
vio_type= c->vio->type;
fd= (vio_type == VIO_TYPE_NAMEDPIPE) ?
c->vio->hPipe: (TP_file_handle)mysql_socket_getfd(c->vio->mysql_socket);
#else
fd= mysql_socket_getfd(c->vio->mysql_socket);
#endif
/* Assign connection to a group. */
thread_group_t *group=
&all_groups[c->thread_id%group_count];
......@@ -1486,7 +1514,6 @@ static int change_group(TP_connection_generic *c,
thread_group_t *new_group)
{
int ret= 0;
int fd= (int)mysql_socket_getfd(c->thd->net.vio->mysql_socket);
DBUG_ASSERT(c->thread_group == old_group);
......@@ -1494,7 +1521,7 @@ static int change_group(TP_connection_generic *c,
mysql_mutex_lock(&old_group->mutex);
if (c->bound_to_poll_descriptor)
{
io_poll_disassociate_fd(old_group->pollfd,fd);
io_poll_disassociate_fd(old_group->pollfd,c->fd);
c->bound_to_poll_descriptor= false;
}
c->thread_group->connection_count--;
......@@ -1513,9 +1540,7 @@ static int change_group(TP_connection_generic *c,
int TP_connection_generic::start_io()
{
int fd= (int)mysql_socket_getfd(thd->net.vio->mysql_socket);
{
#ifndef HAVE_IOCP
/*
Usually, connection will stay in the same group for the entire
......@@ -1666,10 +1691,10 @@ int TP_pool_generic::set_pool_size(uint size)
{
thread_group_t *group= &all_groups[i];
mysql_mutex_lock(&group->mutex);
if (group->pollfd == -1)
if (group->pollfd == INVALID_HANDLE_VALUE)
{
group->pollfd= io_poll_create();
success= (group->pollfd >= 0);
success= (group->pollfd != INVALID_HANDLE_VALUE);
if(!success)
{
sql_print_error("io_poll_create() failed, errno=%d\n", errno);
......@@ -1707,7 +1732,7 @@ int TP_pool_generic::set_stall_limit(uint limit)
int TP_pool_generic::get_idle_thread_count()
{
int sum=0;
for (uint i= 0; i < threadpool_max_size && all_groups[i].pollfd >= 0; i++)
for (uint i= 0; i < threadpool_max_size && all_groups[i].pollfd != INVALID_HANDLE_VALUE; i++)
{
sum+= (all_groups[i].thread_count - all_groups[i].active_thread_count);
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment