Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
M
MariaDB
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
nexedi
MariaDB
Commits
57b6cb39
Commit
57b6cb39
authored
Jan 26, 2012
by
Vladislav Vaintroub
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Further review points and simplify Windows implementation
parent
7ed6530a
Changes
5
Show whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
123 additions
and
147 deletions
+123
-147
mysql-test/r/mysqld--help-notwin.result
mysql-test/r/mysqld--help-notwin.result
+2
-2
sql/sql_class.cc
sql/sql_class.cc
+2
-0
sql/sys_vars.cc
sql/sys_vars.cc
+4
-3
sql/threadpool_unix.cc
sql/threadpool_unix.cc
+71
-61
sql/threadpool_win.cc
sql/threadpool_win.cc
+44
-81
No files found.
mysql-test/r/mysqld--help-notwin.result
View file @
57b6cb39
...
@@ -934,7 +934,7 @@ lower-case-table-names 1
...
@@ -934,7 +934,7 @@ lower-case-table-names 1
master-info-file master.info
master-info-file master.info
master-retry-count 86400
master-retry-count 86400
master-verify-checksum FALSE
master-verify-checksum FALSE
max-allowed-packet
8388608
max-allowed-packet
1048576
max-binlog-cache-size 18446744073709547520
max-binlog-cache-size 18446744073709547520
max-binlog-size 1073741824
max-binlog-size 1073741824
max-binlog-stmt-cache-size 18446744073709547520
max-binlog-stmt-cache-size 18446744073709547520
...
@@ -945,7 +945,7 @@ max-error-count 64
...
@@ -945,7 +945,7 @@ max-error-count 64
max-heap-table-size 16777216
max-heap-table-size 16777216
max-join-size 18446744073709551615
max-join-size 18446744073709551615
max-length-for-sort-data 1024
max-length-for-sort-data 1024
max-long-data-size
8388608
max-long-data-size
1048576
max-prepared-stmt-count 16382
max-prepared-stmt-count 16382
max-relay-log-size 0
max-relay-log-size 0
max-seeks-for-key 18446744073709551615
max-seeks-for-key 18446744073709551615
...
...
sql/sql_class.cc
View file @
57b6cb39
...
@@ -3918,6 +3918,7 @@ extern "C" bool thd_sqlcom_can_generate_row_events(const MYSQL_THD thd)
...
@@ -3918,6 +3918,7 @@ extern "C" bool thd_sqlcom_can_generate_row_events(const MYSQL_THD thd)
SYNOPSIS
SYNOPSIS
thd_wait_begin()
thd_wait_begin()
thd Thread object
thd Thread object
Can be NULL, in this case current THD is used.
wait_type Type of wait
wait_type Type of wait
1 -- short wait (e.g. for mutex)
1 -- short wait (e.g. for mutex)
2 -- medium wait (e.g. for disk io)
2 -- medium wait (e.g. for disk io)
...
@@ -3945,6 +3946,7 @@ extern "C" void thd_wait_begin(MYSQL_THD thd, int wait_type)
...
@@ -3945,6 +3946,7 @@ extern "C" void thd_wait_begin(MYSQL_THD thd, int wait_type)
when they waking up from a sleep/stall.
when they waking up from a sleep/stall.
@param thd Thread handle
@param thd Thread handle
Can be NULL, in this case current THD is used.
*/
*/
extern
"C"
void
thd_wait_end
(
MYSQL_THD
thd
)
extern
"C"
void
thd_wait_end
(
MYSQL_THD
thd
)
{
{
...
...
sql/sys_vars.cc
View file @
57b6cb39
...
@@ -2245,7 +2245,7 @@ static bool fix_threadpool_stall_limit(sys_var*, THD*, enum_var_type)
...
@@ -2245,7 +2245,7 @@ static bool fix_threadpool_stall_limit(sys_var*, THD*, enum_var_type)
#ifdef _WIN32
#ifdef _WIN32
static
Sys_var_uint
Sys_threadpool_min_threads
(
static
Sys_var_uint
Sys_threadpool_min_threads
(
"thread_pool_min_threads"
,
"thread_pool_min_threads"
,
"Minimu
i
m number of threads in the thread pool."
,
"Minimum number of threads in the thread pool."
,
GLOBAL_VAR
(
threadpool_min_threads
),
CMD_LINE
(
REQUIRED_ARG
),
GLOBAL_VAR
(
threadpool_min_threads
),
CMD_LINE
(
REQUIRED_ARG
),
VALID_RANGE
(
1
,
256
),
DEFAULT
(
1
),
BLOCK_SIZE
(
1
),
VALID_RANGE
(
1
,
256
),
DEFAULT
(
1
),
BLOCK_SIZE
(
1
),
NO_MUTEX_GUARD
,
NOT_IN_BINLOG
,
ON_CHECK
(
0
),
NO_MUTEX_GUARD
,
NOT_IN_BINLOG
,
ON_CHECK
(
0
),
...
@@ -2267,8 +2267,9 @@ static Sys_var_uint Sys_threadpool_oversubscribe(
...
@@ -2267,8 +2267,9 @@ static Sys_var_uint Sys_threadpool_oversubscribe(
);
);
static
Sys_var_uint
Sys_threadpool_size
(
static
Sys_var_uint
Sys_threadpool_size
(
"thread_pool_size"
,
"thread_pool_size"
,
"Number of concurrently executing threads in the pool. "
"Number of thread groups in the pool. "
"Leaving value default (0) sets it to the number of processors."
,
"This parameter is roughly equivalent to maximum number of concurrently "
"executing threads (threads in a waiting state do not count as executing)."
,
GLOBAL_VAR
(
threadpool_size
),
CMD_LINE
(
REQUIRED_ARG
),
GLOBAL_VAR
(
threadpool_size
),
CMD_LINE
(
REQUIRED_ARG
),
VALID_RANGE
(
1
,
MAX_THREAD_GROUPS
),
DEFAULT
(
my_getncpus
()),
BLOCK_SIZE
(
1
),
VALID_RANGE
(
1
,
MAX_THREAD_GROUPS
),
DEFAULT
(
my_getncpus
()),
BLOCK_SIZE
(
1
),
NO_MUTEX_GUARD
,
NOT_IN_BINLOG
,
ON_CHECK
(
0
),
NO_MUTEX_GUARD
,
NOT_IN_BINLOG
,
ON_CHECK
(
0
),
...
...
sql/threadpool_unix.cc
View file @
57b6cb39
...
@@ -207,6 +207,7 @@ static void print_pool_blocked_message(bool);
...
@@ -207,6 +207,7 @@ static void print_pool_blocked_message(bool);
descriptors can be retrieved from native_events array, using
descriptors can be retrieved from native_events array, using
native_event_get_userdata() function.
native_event_get_userdata() function.
On Linux: epoll_wait()
On Linux: epoll_wait()
*/
*/
...
@@ -248,6 +249,11 @@ int io_poll_disassociate_fd(int pollfd, int fd)
...
@@ -248,6 +249,11 @@ int io_poll_disassociate_fd(int pollfd, int fd)
}
}
/*
Wrapper around epoll_wait.
NOTE - in case of EINTR, it restarts with original timeout. Since we use
either infinite or 0 timeouts, this is not critical
*/
int
io_poll_wait
(
int
pollfd
,
native_event
*
native_events
,
int
maxevents
,
int
io_poll_wait
(
int
pollfd
,
native_event
*
native_events
,
int
maxevents
,
int
timeout_ms
)
int
timeout_ms
)
{
{
...
@@ -260,6 +266,7 @@ int io_poll_wait(int pollfd, native_event *native_events, int maxevents,
...
@@ -260,6 +266,7 @@ int io_poll_wait(int pollfd, native_event *native_events, int maxevents,
return
ret
;
return
ret
;
}
}
static
void
*
native_event_get_userdata
(
native_event
*
event
)
static
void
*
native_event_get_userdata
(
native_event
*
event
)
{
{
return
event
->
data
.
ptr
;
return
event
->
data
.
ptr
;
...
@@ -364,8 +371,10 @@ int io_poll_wait(int pollfd, native_event *events, int maxevents, int timeout_ms
...
@@ -364,8 +371,10 @@ int io_poll_wait(int pollfd, native_event *events, int maxevents, int timeout_ms
(
timeout_ms
>=
0
)
?&
ts
:
NULL
);
(
timeout_ms
>=
0
)
?&
ts
:
NULL
);
}
}
while
(
ret
==
-
1
&&
errno
==
EINTR
);
while
(
ret
==
-
1
&&
errno
==
EINTR
);
return
nget
;
DBUG_ASSERT
(
nget
<
INT_MAX
);
return
(
int
)
nget
;
}
}
static
void
*
native_event_get_userdata
(
native_event
*
event
)
static
void
*
native_event_get_userdata
(
native_event
*
event
)
{
{
return
event
->
portev_user
;
return
event
->
portev_user
;
...
@@ -375,9 +384,8 @@ static void* native_event_get_userdata(native_event *event)
...
@@ -375,9 +384,8 @@ static void* native_event_get_userdata(native_event *event)
#endif
#endif
/* Dequeue element from a workqueue */
/* Dequeue element from a workqueue */
static
connection_t
*
queue_get
(
thread_group_t
*
thread_group
)
static
connection_t
*
queue_get
(
thread_group_t
*
thread_group
)
{
{
DBUG_ENTER
(
"queue_get"
);
DBUG_ENTER
(
"queue_get"
);
...
@@ -391,12 +399,12 @@ static connection_t *queue_get(thread_group_t *thread_group)
...
@@ -391,12 +399,12 @@ static connection_t *queue_get(thread_group_t *thread_group)
}
}
/*
/*
Handle wait timeout :
Handle wait timeout :
Find connections that have been idle for too long and kill them.
Find connections that have been idle for too long and kill them.
Also, recalculate time when next timeout check should run.
Also, recalculate time when next timeout check should run.
*/
*/
static
void
timeout_check
(
pool_timer_t
*
timer
)
static
void
timeout_check
(
pool_timer_t
*
timer
)
{
{
DBUG_ENTER
(
"timeout_check"
);
DBUG_ENTER
(
"timeout_check"
);
...
@@ -418,7 +426,7 @@ static void timeout_check(pool_timer_t *timer)
...
@@ -418,7 +426,7 @@ static void timeout_check(pool_timer_t *timer)
{
{
/*
/*
Connection does not have scheduler data. This happens for example
Connection does not have scheduler data. This happens for example
if THD belongs to a
nother
scheduler, that is listening to extra_port.
if THD belongs to a
different
scheduler, that is listening to extra_port.
*/
*/
continue
;
continue
;
}
}
...
@@ -458,18 +466,18 @@ static void timeout_check(pool_timer_t *timer)
...
@@ -458,18 +466,18 @@ static void timeout_check(pool_timer_t *timer)
static
void
*
timer_thread
(
void
*
param
)
static
void
*
timer_thread
(
void
*
param
)
{
{
uint
i
;
uint
i
;
pool_timer_t
*
timer
=
(
pool_timer_t
*
)
param
;
pool_timer_t
*
timer
=
(
pool_timer_t
*
)
param
;
timer
->
next_timeout_check
=
ULONGLONG_MAX
;
timer
->
current_microtime
=
microsecond_interval_timer
();
my_thread_init
();
my_thread_init
();
DBUG_ENTER
(
"timer_thread"
);
DBUG_ENTER
(
"timer_thread"
);
timer
->
next_timeout_check
=
ULONGLONG_MAX
;
timer
->
current_microtime
=
microsecond_interval_timer
();
for
(;;)
for
(;;)
{
{
struct
timespec
ts
;
struct
timespec
ts
;
int
err
;
int
err
;
set_timespec_nsec
(
ts
,
timer
->
tick_interval
*
1000000
);
set_timespec_nsec
(
ts
,
timer
->
tick_interval
*
1000000
);
mysql_mutex_lock
(
&
timer
->
mutex
);
mysql_mutex_lock
(
&
timer
->
mutex
);
err
=
mysql_cond_timedwait
(
&
timer
->
cond
,
&
timer
->
mutex
,
&
ts
);
err
=
mysql_cond_timedwait
(
&
timer
->
cond
,
&
timer
->
mutex
,
&
ts
);
...
@@ -543,7 +551,6 @@ void check_stall(thread_group_t *thread_group)
...
@@ -543,7 +551,6 @@ void check_stall(thread_group_t *thread_group)
}
}
static
void
start_timer
(
pool_timer_t
*
timer
)
static
void
start_timer
(
pool_timer_t
*
timer
)
{
{
pthread_t
thread_id
;
pthread_t
thread_id
;
...
@@ -555,6 +562,7 @@ static void start_timer(pool_timer_t* timer)
...
@@ -555,6 +562,7 @@ static void start_timer(pool_timer_t* timer)
DBUG_VOID_RETURN
;
DBUG_VOID_RETURN
;
}
}
static
void
stop_timer
(
pool_timer_t
*
timer
)
static
void
stop_timer
(
pool_timer_t
*
timer
)
{
{
DBUG_ENTER
(
"stop_timer"
);
DBUG_ENTER
(
"stop_timer"
);
...
@@ -664,42 +672,42 @@ static connection_t * listener(worker_thread_t *current_thread,
...
@@ -664,42 +672,42 @@ static connection_t * listener(worker_thread_t *current_thread,
thread_group
->
queue
.
push_back
(
c
);
thread_group
->
queue
.
push_back
(
c
);
}
}
if
(
listener_picks_event
)
{
/* Handle the first event. */
retval
=
(
connection_t
*
)
native_event_get_userdata
(
&
ev
[
0
]);
mysql_mutex_unlock
(
&
thread_group
->
mutex
);
break
;
}
if
(
thread_group
->
active_thread_count
==
0
&&
!
listener_picks_event
)
if
(
thread_group
->
active_thread_count
==
0
)
{
{
/* W
ake one worker thread
*/
/* W
e added some work items to queue, now wake a worker.
*/
if
(
wake_thread
(
thread_group
))
if
(
wake_thread
(
thread_group
))
{
{
/*
/*
Wake failed,
groups has no idle threads.
Wake failed,
hence groups has no idle threads. Now check if there are
Now check if the group has at least one work
er.
any threads in the group except listen
er.
*/
*/
if
(
thread_group
->
thread_count
==
1
&&
if
(
thread_group
->
thread_count
==
1
&&
thread_group
->
pending_thread_start_count
==
0
)
thread_group
->
pending_thread_start_count
==
0
)
{
{
/*
/*
Currently there is no worker thread in the group, as indicated by
Currently there is no worker thread in the group, as indicated by
thread_count == 1 (means listener is the only one thread in the
thread_count == 1 (this means listener is the only one thread in
group).
the group).
The queue is not empty, and listener is not going to handle
Rhe queue is not empty, and listener is not going to handle
events. In order to drain the queue, we create a worker here.
events. In order to drain the queue, we create a worker here.
Alternatively, we could just rely on timer to detect stall, but
Alternatively, we could just rely on timer to detect stall, and
this would be an inefficient, pointless delay.
create thread, but waiting for timer would be an inefficient and
pointless delay.
*/
*/
create_worker
(
thread_group
);
create_worker
(
thread_group
);
}
}
}
}
}
}
mysql_mutex_unlock
(
&
thread_group
->
mutex
);
mysql_mutex_unlock
(
&
thread_group
->
mutex
);
if
(
listener_picks_event
)
{
retval
=
(
connection_t
*
)
native_event_get_userdata
(
&
ev
[
0
]);
break
;
}
}
}
DBUG_RETURN
(
retval
);
DBUG_RETURN
(
retval
);
}
}
...
@@ -707,7 +715,7 @@ static connection_t * listener(worker_thread_t *current_thread,
...
@@ -707,7 +715,7 @@ static connection_t * listener(worker_thread_t *current_thread,
/*
/*
*
Creates a new worker thread.
Creates a new worker thread.
thread_mutex must be held when calling this function
thread_mutex must be held when calling this function
...
@@ -806,10 +814,10 @@ static int wake_or_create_thread(thread_group_t *thread_group)
...
@@ -806,10 +814,10 @@ static int wake_or_create_thread(thread_group_t *thread_group)
if
(
thread_group
->
active_thread_count
==
0
)
if
(
thread_group
->
active_thread_count
==
0
)
{
{
/*
/*
We're better off creating a new thread here with no delay,
We're better off creating a new thread here with no delay,
either there
either there is no workers at all, or they all are all blocking
are no workers at all, or they all are all blocking and there was no
and there was no sleeping thread to wakeup. It smells like deadlock
idle thread to wakeup. Smells like a potential deadlock or very slowly
or very slowly
executing requests, e.g sleeps or user locks.
executing requests, e.g sleeps or user locks.
*/
*/
DBUG_RETURN
(
create_worker
(
thread_group
));
DBUG_RETURN
(
create_worker
(
thread_group
));
}
}
...
@@ -862,7 +870,8 @@ void thread_group_destroy(thread_group_t *thread_group)
...
@@ -862,7 +870,8 @@ void thread_group_destroy(thread_group_t *thread_group)
/**
/**
Wake sleeping thread from waiting list
Wake sleeping thread from waiting list
*/
*/
static
int
wake_thread
(
thread_group_t
*
thread_group
)
static
int
wake_thread
(
thread_group_t
*
thread_group
)
{
{
DBUG_ENTER
(
"wake_thread"
);
DBUG_ENTER
(
"wake_thread"
);
...
@@ -879,16 +888,14 @@ static int wake_thread(thread_group_t *thread_group)
...
@@ -879,16 +888,14 @@ static int wake_thread(thread_group_t *thread_group)
}
}
/*
/*
*
Initiate shutdown for thread group.
Initiate shutdown for thread group.
The shutdown is asynchronous, we only care to wake all threads
The shutdown is asynchronous, we only care to wake all threads in here, so
in here, so they can finish. We do not wait here until threads
they can finish. We do not wait here until threads terminate. Final cleanup
terminate,
of the group (thread_group_destroy) will be done by the last exiting threads.
Final cleanup of the group (thread_group_destroy) will be done by
the last exiting threads.
*/
*/
static
void
thread_group_close
(
thread_group_t
*
thread_group
)
static
void
thread_group_close
(
thread_group_t
*
thread_group
)
{
{
DBUG_ENTER
(
"thread_group_close"
);
DBUG_ENTER
(
"thread_group_close"
);
...
@@ -938,6 +945,7 @@ static void thread_group_close(thread_group_t *thread_group)
...
@@ -938,6 +945,7 @@ static void thread_group_close(thread_group_t *thread_group)
perform login (this is done in worker threads).
perform login (this is done in worker threads).
*/
*/
static
void
queue_put
(
thread_group_t
*
thread_group
,
connection_t
*
connection
)
static
void
queue_put
(
thread_group_t
*
thread_group
,
connection_t
*
connection
)
{
{
DBUG_ENTER
(
"queue_put"
);
DBUG_ENTER
(
"queue_put"
);
...
@@ -949,14 +957,16 @@ static void queue_put(thread_group_t *thread_group, connection_t *connection)
...
@@ -949,14 +957,16 @@ static void queue_put(thread_group_t *thread_group, connection_t *connection)
wake_or_create_thread
(
thread_group
);
wake_or_create_thread
(
thread_group
);
}
}
mysql_mutex_unlock
(
&
thread_group
->
mutex
);
mysql_mutex_unlock
(
&
thread_group
->
mutex
);
DBUG_VOID_RETURN
;
DBUG_VOID_RETURN
;
}
}
/*
/*
This is used to prevent too many threads executing at the same time,
Prevent too many threads executing at the same time,if the workload is
if the workload is
not CPU bound.
not CPU bound.
*/
*/
static
bool
too_many_threads
(
thread_group_t
*
thread_group
)
static
bool
too_many_threads
(
thread_group_t
*
thread_group
)
{
{
return
(
thread_group
->
active_thread_count
>=
1
+
(
int
)
threadpool_oversubscribe
return
(
thread_group
->
active_thread_count
>=
1
+
(
int
)
threadpool_oversubscribe
...
@@ -964,7 +974,6 @@ static bool too_many_threads(thread_group_t *thread_group)
...
@@ -964,7 +974,6 @@ static bool too_many_threads(thread_group_t *thread_group)
}
}
/**
/**
Retrieve a connection with pending event.
Retrieve a connection with pending event.
...
@@ -981,17 +990,15 @@ static bool too_many_threads(thread_group_t *thread_group)
...
@@ -981,17 +990,15 @@ static bool too_many_threads(thread_group_t *thread_group)
@return
@return
connection with pending event. NULL is returned if timeout has expired,or on shutdown.
connection with pending event. NULL is returned if timeout has expired,or on shutdown.
*/
*/
connection_t
*
get_event
(
worker_thread_t
*
current_thread
,
connection_t
*
get_event
(
worker_thread_t
*
current_thread
,
thread_group_t
*
thread_group
,
struct
timespec
*
abstime
)
thread_group_t
*
thread_group
,
struct
timespec
*
abstime
)
{
{
DBUG_ENTER
(
"get_event"
);
DBUG_ENTER
(
"get_event"
);
connection_t
*
connection
=
NULL
;
connection_t
*
connection
=
NULL
;
int
err
=
0
;
int
err
=
0
;
mysql_mutex_lock
(
&
thread_group
->
mutex
);
mysql_mutex_lock
(
&
thread_group
->
mutex
);
DBUG_ASSERT
(
thread_group
->
active_thread_count
>=
0
);
DBUG_ASSERT
(
thread_group
->
active_thread_count
>=
0
);
do
do
...
@@ -1083,6 +1090,7 @@ connection_t *get_event(worker_thread_t *current_thread,
...
@@ -1083,6 +1090,7 @@ connection_t *get_event(worker_thread_t *current_thread,
Tells the pool that worker starts waiting on IO, lock, condition,
Tells the pool that worker starts waiting on IO, lock, condition,
sleep() or similar.
sleep() or similar.
*/
*/
void
wait_begin
(
thread_group_t
*
thread_group
)
void
wait_begin
(
thread_group_t
*
thread_group
)
{
{
DBUG_ENTER
(
"wait_begin"
);
DBUG_ENTER
(
"wait_begin"
);
...
@@ -1273,7 +1281,6 @@ static void set_next_timeout_check(ulonglong abstime)
...
@@ -1273,7 +1281,6 @@ static void set_next_timeout_check(ulonglong abstime)
/**
/**
Set wait timeout for connection.
Set wait timeout for connection.
*/
*/
static
void
set_wait_timeout
(
connection_t
*
c
)
static
void
set_wait_timeout
(
connection_t
*
c
)
{
{
DBUG_ENTER
(
"set_wait_timeout"
);
DBUG_ENTER
(
"set_wait_timeout"
);
...
@@ -1402,6 +1409,7 @@ static void handle_event(connection_t *connection)
...
@@ -1402,6 +1409,7 @@ static void handle_event(connection_t *connection)
/**
/**
Worker thread's main
Worker thread's main
*/
*/
static
void
*
worker_main
(
void
*
param
)
static
void
*
worker_main
(
void
*
param
)
{
{
...
@@ -1543,7 +1551,8 @@ void tp_set_threadpool_stall_limit(uint limit)
...
@@ -1543,7 +1551,8 @@ void tp_set_threadpool_stall_limit(uint limit)
Calculate number of idle/waiting threads in the pool.
Calculate number of idle/waiting threads in the pool.
Sum idle threads over all groups.
Sum idle threads over all groups.
Don't do any locking, it is not required for stats.
D
on't do any locking, it is not required for stats.
*/
*/
int
tp_get_idle_thread_count
()
int
tp_get_idle_thread_count
()
{
{
...
@@ -1601,7 +1610,8 @@ static void print_pool_blocked_message(bool max_threads_reached)
...
@@ -1601,7 +1610,8 @@ static void print_pool_blocked_message(bool max_threads_reached)
else
else
sql_print_error
(
create_thread_error_msg
,
my_errno
);
sql_print_error
(
create_thread_error_msg
,
my_errno
);
sql_print_information
(
"Threadpool has been blocked for %u seconds
\n
"
,(
uint
)(
now
-
pool_block_start
));
sql_print_information
(
"Threadpool has been blocked for %u seconds
\n
"
,
(
uint
)(
now
-
pool_block_start
));
/* avoid reperated messages for the same blocking situation */
/* avoid reperated messages for the same blocking situation */
msg_written
=
true
;
msg_written
=
true
;
}
}
...
...
sql/threadpool_win.cc
View file @
57b6cb39
...
@@ -17,6 +17,30 @@
...
@@ -17,6 +17,30 @@
#include <windows.h>
#include <windows.h>
/*
Threadpool API is not available on XP. We still want to compile a single
version on Windows, but use the latest functionality if available.
We cannot use threadpool functionality directly, since executable won't
start on XP and loader will complain about missing symbols.
We solve using the usual way it is done on Windows, i.e with dynamic loading.
We'll need to load a lot of function, and make this less painful with the
WEAK_SYMBOL macro below
*/
/*
WEAK_SYMBOL(return_type, function_name, argument_type1,..,argument_typeN)
Declare and load function pointer from kernel32. The name of the static
variable that holds the function pointer is my_<original function name>
This should be combined with
#define <original function name> my_<original function name>
so that one could use Widows APIs transparently, without worrying whether
they are present in a particular version or not.
Of course, prior to use of any function there should be a check for correct
Windows version, or check whether function pointer is not NULL.
*/
#define WEAK_SYMBOL(return_type, function, ...) \
#define WEAK_SYMBOL(return_type, function, ...) \
typedef return_type (WINAPI *pFN_##function)(__VA_ARGS__); \
typedef return_type (WINAPI *pFN_##function)(__VA_ARGS__); \
static pFN_##function my_##function = (pFN_##function) \
static pFN_##function my_##function = (pFN_##function) \
...
@@ -110,9 +134,7 @@ WEAK_SYMBOL(VOID, CloseThreadpoolWork, PTP_WORK pwk);
...
@@ -110,9 +134,7 @@ WEAK_SYMBOL(VOID, CloseThreadpoolWork, PTP_WORK pwk);
WEAK_SYMBOL
(
BOOL
,
SetThreadpoolStackInformation
,
PTP_POOL
,
WEAK_SYMBOL
(
BOOL
,
SetThreadpoolStackInformation
,
PTP_POOL
,
PTP_POOL_STACK_INFORMATION
);
PTP_POOL_STACK_INFORMATION
);
#define SetThreadpoolStackInformation my_SetThreadpoolStackInformation
#define SetThreadpoolStackInformation my_SetThreadpoolStackInformation
#endif
#else
/* _MSC_VER < 1600 */
#if _MSC_VER < 1600
#define SetThreadpoolCallbackPriority(env,prio)
#define SetThreadpoolCallbackPriority(env,prio)
typedef
enum
_TP_CALLBACK_PRIORITY
{
typedef
enum
_TP_CALLBACK_PRIORITY
{
TP_CALLBACK_PRIORITY_HIGH
,
TP_CALLBACK_PRIORITY_HIGH
,
...
@@ -158,8 +180,6 @@ static void CALLBACK shm_read_callback(PTP_CALLBACK_INSTANCE instance,
...
@@ -158,8 +180,6 @@ static void CALLBACK shm_read_callback(PTP_CALLBACK_INSTANCE instance,
static
void
CALLBACK
shm_close_callback
(
PTP_CALLBACK_INSTANCE
instance
,
static
void
CALLBACK
shm_close_callback
(
PTP_CALLBACK_INSTANCE
instance
,
PVOID
Context
,
PTP_WAIT
wait
,
TP_WAIT_RESULT
wait_result
);
PVOID
Context
,
PTP_WAIT
wait
,
TP_WAIT_RESULT
wait_result
);
#define CONNECTION_SIGNATURE 0xAFFEAFFE
static
void
check_thread_init
();
static
void
check_thread_init
();
/* Get current time as Windows time */
/* Get current time as Windows time */
...
@@ -178,21 +198,19 @@ static ulonglong now()
...
@@ -178,21 +198,19 @@ static ulonglong now()
struct
connection_t
struct
connection_t
{
{
THD
*
thd
;
THD
*
thd
;
bool
logged_in
;
HANDLE
handle
;
HANDLE
handle
;
OVERLAPPED
overlapped
;
OVERLAPPED
overlapped
;
/* absolute time for wait timeout (as Windows time) */
/* absolute time for wait timeout (as Windows time) */
volatile
ulonglong
timeout
;
volatile
ulonglong
timeout
;
PTP_CLEANUP_GROUP
cleanup_group
;
PTP_CLEANUP_GROUP
cleanup_group
;
TP_CALLBACK_ENVIRON
callback_environ
;
TP_CALLBACK_ENVIRON
callback_environ
;
PTP_IO
io
;
PTP_IO
io
;
PTP_TIMER
timer
;
PTP_TIMER
timer
;
PTP_WAIT
shm_read
;
PTP_WAIT
shm_read
;
bool
logged_in
;
};
};
void
init_connection
(
connection_t
*
connection
)
void
init_connection
(
connection_t
*
connection
)
{
{
connection
->
logged_in
=
false
;
connection
->
logged_in
=
false
;
...
@@ -208,6 +226,7 @@ void init_connection(connection_t *connection)
...
@@ -208,6 +226,7 @@ void init_connection(connection_t *connection)
connection
->
thd
=
0
;
connection
->
thd
=
0
;
}
}
int
init_io
(
connection_t
*
connection
,
THD
*
thd
)
int
init_io
(
connection_t
*
connection
,
THD
*
thd
)
{
{
connection
->
thd
=
thd
;
connection
->
thd
=
thd
;
...
@@ -237,7 +256,7 @@ int init_io(connection_t *connection, THD *thd)
...
@@ -237,7 +256,7 @@ int init_io(connection_t *connection, THD *thd)
if
(
connection
->
handle
)
if
(
connection
->
handle
)
{
{
/* Performance tweaks (s. MSDN documentation)*/
/* Performance tweaks (s. MSDN documentation)*/
UCHAR
flags
=
FILE_SKIP_SET_EVENT_ON_HANDLE
;
UCHAR
flags
=
FILE_SKIP_SET_EVENT_ON_HANDLE
;
if
(
skip_completion_port_on_success
)
if
(
skip_completion_port_on_success
)
{
{
flags
|=
FILE_SKIP_COMPLETION_PORT_ON_SUCCESS
;
flags
|=
FILE_SKIP_COMPLETION_PORT_ON_SUCCESS
;
...
@@ -245,7 +264,7 @@ int init_io(connection_t *connection, THD *thd)
...
@@ -245,7 +264,7 @@ int init_io(connection_t *connection, THD *thd)
(
void
)
SetFileCompletionNotificationModes
(
connection
->
handle
,
flags
);
(
void
)
SetFileCompletionNotificationModes
(
connection
->
handle
,
flags
);
/* Assign io completion callback */
/* Assign io completion callback */
connection
->
io
=
CreateThreadpoolIo
(
connection
->
handle
,
connection
->
io
=
CreateThreadpoolIo
(
connection
->
handle
,
io_completion_callback
,
connection
,
&
connection
->
callback_environ
);
io_completion_callback
,
connection
,
&
connection
->
callback_environ
);
if
(
!
connection
->
io
)
if
(
!
connection
->
io
)
{
{
...
@@ -253,7 +272,7 @@ int init_io(connection_t *connection, THD *thd)
...
@@ -253,7 +272,7 @@ int init_io(connection_t *connection, THD *thd)
return
-
1
;
return
-
1
;
}
}
}
}
connection
->
timer
=
CreateThreadpoolTimer
(
timer_callback
,
connection
,
connection
->
timer
=
CreateThreadpoolTimer
(
timer_callback
,
connection
,
&
connection
->
callback_environ
);
&
connection
->
callback_environ
);
if
(
!
connection
->
timer
)
if
(
!
connection
->
timer
)
{
{
...
@@ -354,6 +373,7 @@ int start_io(connection_t *connection, PTP_CALLBACK_INSTANCE instance)
...
@@ -354,6 +373,7 @@ int start_io(connection_t *connection, PTP_CALLBACK_INSTANCE instance)
return
-
1
;
return
-
1
;
}
}
int
login
(
connection_t
*
connection
,
PTP_CALLBACK_INSTANCE
instance
)
int
login
(
connection_t
*
connection
,
PTP_CALLBACK_INSTANCE
instance
)
{
{
if
(
threadpool_add_connection
(
connection
->
thd
)
==
0
if
(
threadpool_add_connection
(
connection
->
thd
)
==
0
...
@@ -380,21 +400,6 @@ void set_wait_timeout(connection_t *connection, ulonglong old_timeout)
...
@@ -380,21 +400,6 @@ void set_wait_timeout(connection_t *connection, ulonglong old_timeout)
connection
->
timeout
=
new_timeout
;
connection
->
timeout
=
new_timeout
;
}
}
/*
Terminates (idle) connection by closing the socket.
This will activate io_completion_callback() in a different thread
*/
void
post_kill_notification
(
connection_t
*
connection
)
{
check_thread_init
();
THD
*
thd
=
connection
->
thd
;
mysql_mutex_lock
(
&
thd
->
LOCK_thd_data
);
thd
->
killed
=
KILL_CONNECTION
;
vio_shutdown
(
thd
->
net
.
vio
,
SHUT_RDWR
);
thd
->
mysys_var
=
NULL
;
mysql_mutex_unlock
(
&
thd
->
LOCK_thd_data
);
}
/* Connection destructor */
/* Connection destructor */
void
destroy_connection
(
connection_t
*
connection
)
void
destroy_connection
(
connection_t
*
connection
)
...
@@ -438,7 +443,6 @@ static void check_thread_init()
...
@@ -438,7 +443,6 @@ static void check_thread_init()
if
(
FlsGetValue
(
fls
)
==
NULL
)
if
(
FlsGetValue
(
fls
)
==
NULL
)
{
{
FlsSetValue
(
fls
,
(
void
*
)
1
);
FlsSetValue
(
fls
,
(
void
*
)
1
);
my_thread_init
();
thread_created
++
;
thread_created
++
;
InterlockedIncrement
((
volatile
long
*
)
&
tp_stats
.
num_worker_threads
);
InterlockedIncrement
((
volatile
long
*
)
&
tp_stats
.
num_worker_threads
);
}
}
...
@@ -446,28 +450,14 @@ static void check_thread_init()
...
@@ -446,28 +450,14 @@ static void check_thread_init()
/*
/*
Take care of proper cleanup when threadpool threads exit.
Decrement number of threads when a thread exits .
We do not control how threads are created, thus it is our responsibility to
On Windows, FlsAlloc() provides the thread destruction callbacks.
check that my_thread_init() is called on thread initialization and
my_thread_end() on thread destruction. On Windows, FlsAlloc() provides the
thread destruction callbacks.
*/
*/
static
VOID
WINAPI
thread_destructor
(
void
*
data
)
static
VOID
WINAPI
thread_destructor
(
void
*
data
)
{
{
if
(
data
)
if
(
data
)
{
{
if
(
InterlockedDecrement
((
volatile
long
*
)
&
tp_stats
.
num_worker_threads
)
>=
0
)
InterlockedDecrement
((
volatile
long
*
)
&
tp_stats
.
num_worker_threads
);
{
/*
The above check for number of thread >= 0 is due to shutdown code (
see tp_end()) where we forcefully set num_worker_threads to 0, even
if not all threads have shut down yet to the point they would ran Fls
destructors, even after CloseThreadpool(). See also comment in tp_end().
*/
mysql_mutex_lock
(
&
LOCK_thread_count
);
my_thread_end
();
mysql_mutex_unlock
(
&
LOCK_thread_count
);
}
}
}
}
}
...
@@ -507,7 +497,7 @@ bool tp_init(void)
...
@@ -507,7 +497,7 @@ bool tp_init(void)
{
{
TP_POOL_STACK_INFORMATION
stackinfo
;
TP_POOL_STACK_INFORMATION
stackinfo
;
stackinfo
.
StackCommit
=
0
;
stackinfo
.
StackCommit
=
0
;
stackinfo
.
StackReserve
=
my_thread_stack_size
;
stackinfo
.
StackReserve
=
(
SIZE_T
)
my_thread_stack_size
;
if
(
!
SetThreadpoolStackInformation
(
pool
,
&
stackinfo
))
if
(
!
SetThreadpoolStackInformation
(
pool
,
&
stackinfo
))
{
{
tp_log_warning
(
"Can't set threadpool stack size"
,
tp_log_warning
(
"Can't set threadpool stack size"
,
...
@@ -520,45 +510,19 @@ bool tp_init(void)
...
@@ -520,45 +510,19 @@ bool tp_init(void)
}
}
/*
/*
*
Scheduler callback : Destroy the scheduler.
Scheduler callback : Destroy the scheduler.
*/
*/
extern
"C"
uint
THR_thread_count
;
extern
"C"
mysql_mutex_t
THR_LOCK_threads
;
extern
"C"
mysql_cond_t
THR_COND_threads
;
void
tp_end
(
void
)
void
tp_end
(
void
)
{
{
if
(
pool
)
if
(
pool
)
{
{
SetThreadpoolThreadMaximum
(
pool
,
0
);
SetThreadpoolThreadMaximum
(
pool
,
0
);
CloseThreadpool
(
pool
);
CloseThreadpool
(
pool
);
/*
Tell my_global_thread_end() we're complete.
This would not be necessary if CloseThreadpool() would synchronously
release all threads and wait until they disappear and call all their FLS
destructors . However, threads in the pool are released asynchronously
and might spend some time in the CRT shutdown code. Thus zero
num_worker_threads, to avoid thread destructor's my_thread_end()s after
this point.
*/
LONG
remaining_threads
=
InterlockedExchange
(
(
volatile
long
*
)
&
tp_stats
.
num_worker_threads
,
0
);
if
(
remaining_threads
)
{
mysql_mutex_lock
(
&
THR_LOCK_threads
);
THR_thread_count
-=
remaining_threads
;
mysql_cond_signal
(
&
THR_COND_threads
);
mysql_mutex_unlock
(
&
THR_LOCK_threads
);
}
}
}
}
}
/*
/*
*
Notify pool about connection being killed.
Notify pool about connection being killed.
*/
*/
void
tp_post_kill_notification
(
THD
*
thd
)
void
tp_post_kill_notification
(
THD
*
thd
)
...
@@ -606,7 +570,7 @@ static VOID CALLBACK io_completion_callback(PTP_CALLBACK_INSTANCE instance,
...
@@ -606,7 +570,7 @@ static VOID CALLBACK io_completion_callback(PTP_CALLBACK_INSTANCE instance,
DisassociateCurrentThreadFromCallback
(
instance
);
DisassociateCurrentThreadFromCallback
(
instance
);
destroy_connection
(
connection
);
destroy_connection
(
connection
);
my_
free
(
connection
);
free
(
connection
);
}
}
...
@@ -623,7 +587,7 @@ static void CALLBACK login_callback(PTP_CALLBACK_INSTANCE instance,
...
@@ -623,7 +587,7 @@ static void CALLBACK login_callback(PTP_CALLBACK_INSTANCE instance,
if
(
login
(
connection
,
instance
)
!=
0
)
if
(
login
(
connection
,
instance
)
!=
0
)
{
{
destroy_connection
(
connection
);
destroy_connection
(
connection
);
my_
free
(
connection
);
free
(
connection
);
}
}
}
}
...
@@ -688,8 +652,7 @@ static void CALLBACK shm_read_callback(PTP_CALLBACK_INSTANCE instance,
...
@@ -688,8 +652,7 @@ static void CALLBACK shm_read_callback(PTP_CALLBACK_INSTANCE instance,
*/
*/
void
tp_add_connection
(
THD
*
thd
)
void
tp_add_connection
(
THD
*
thd
)
{
{
bool
success
=
false
;
connection_t
*
con
=
(
connection_t
*
)
malloc
(
sizeof
(
connection_t
));
connection_t
*
con
=
(
connection_t
*
)
my_malloc
(
sizeof
(
connection_t
),
0
);
if
(
con
)
if
(
con
)
threads
.
append
(
thd
);
threads
.
append
(
thd
);
...
@@ -698,6 +661,7 @@ void tp_add_connection(THD *thd)
...
@@ -698,6 +661,7 @@ void tp_add_connection(THD *thd)
if
(
!
con
)
if
(
!
con
)
{
{
tp_log_warning
(
"Allocation failed"
,
"tp_add_connection"
);
tp_log_warning
(
"Allocation failed"
,
"tp_add_connection"
);
threadpool_remove_connection
(
thd
);
return
;
return
;
}
}
...
@@ -718,8 +682,7 @@ void tp_add_connection(THD *thd)
...
@@ -718,8 +682,7 @@ void tp_add_connection(THD *thd)
}
}
/**
/*
Sets the number of idle threads the thread pool maintains in anticipation of new
Sets the number of idle threads the thread pool maintains in anticipation of new
requests.
requests.
*/
*/
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment