Commit 1a3db0e2 authored by Vladislav Vaintroub's avatar Vladislav Vaintroub

Fix threadpool after it was broken by MDEV-6150

parent a8d97fb8
......@@ -5,11 +5,11 @@ select 1;
1
set global debug_dbug='+d,simulate_failed_connection_1';
connect(localhost,root,,test,MASTER_PORT,MASTER_SOCKET);
ERROR HY000: Lost connection to MySQL server at 'reading initial communication packet', system error: 95 "Operation not supported"
Got one of the listed errors
set global debug_dbug=@old_debug;
set global debug_dbug='+d,simulate_failed_connection_2';
connect(localhost,root,,test,MASTER_PORT,MASTER_SOCKET);
ERROR HY000: Lost connection to MySQL server at 'reading initial communication packet', system error: 95 "Operation not supported"
Got one of the listed errors
set global debug_dbug=@old_debug;
select 1;
1
......@@ -19,11 +19,11 @@ select 1;
1
set global debug_dbug='+d,simulate_failed_connection_1';
connect(localhost,root,,test,MASTER_PORT,MASTER_SOCKET);
ERROR HY000: Lost connection to MySQL server at 'reading initial communication packet', system error: 95 "Operation not supported"
Got one of the listed errors
set global debug_dbug=@old_debug;
set global debug_dbug='+d,simulate_failed_connection_2';
connect(localhost,root,,test,MASTER_PORT,MASTER_SOCKET);
ERROR HY000: Lost connection to MySQL server at 'reading initial communication packet', system error: 95 "Operation not supported"
Got one of the listed errors
set global debug_dbug=@old_debug;
select 1;
1
......@@ -37,7 +37,7 @@ select 1;
1
set global debug_dbug='+d,simulate_failed_connection_2';
connect(localhost,root,,test,MASTER_PORT,MASTER_SOCKET);
ERROR HY000: Lost connection to MySQL server at 'reading initial communication packet', system error: 95 "Operation not supported"
Got one of the listed errors
show status like "Threads_connected";
Variable_name Value
Threads_connected 1
......
......@@ -16,14 +16,15 @@ disconnect con1;
connection default;
set global debug_dbug='+d,simulate_failed_connection_1';
--replace_result $MASTER_MYSOCK MASTER_SOCKET $MASTER_MYPORT MASTER_PORT
--error 2013
--error 1041,2013
connect(con1,localhost,root,,test,,);
connection default;
set global debug_dbug=@old_debug;
set global debug_dbug='+d,simulate_failed_connection_2';
--replace_result $MASTER_MYSOCK MASTER_SOCKET $MASTER_MYPORT MASTER_PORT
--error 2013
--error 1041,2013
connect(con1,localhost,root,,test,,);
--enable_result_log
connection default;
set global debug_dbug=@old_debug;
connect(con1,localhost,root,,test,,);
......@@ -38,13 +39,13 @@ disconnect con1;
connection default;
set global debug_dbug='+d,simulate_failed_connection_1';
--replace_result $MASTER_MYSOCK MASTER_SOCKET $MASTER_EXTRA_PORT MASTER_PORT
--error 2013
--error 1041,2013
connect(con1,localhost,root,,test,$MASTER_EXTRA_PORT,);
connection default;
set global debug_dbug=@old_debug;
set global debug_dbug='+d,simulate_failed_connection_2';
--replace_result $MASTER_MYSOCK MASTER_SOCKET $MASTER_EXTRA_PORT MASTER_PORT
--error 2013
--error 1041,2013
connect(con1,localhost,root,,test,$MASTER_EXTRA_PORT,);
connection default;
set global debug_dbug=@old_debug;
......@@ -66,7 +67,7 @@ disconnect con2;
connection default;
set global debug_dbug='+d,simulate_failed_connection_2';
--replace_result $MASTER_MYSOCK MASTER_SOCKET $MASTER_EXTRA_PORT MASTER_PORT
--error 2013
--error 1041,2013
connect(con1,localhost,root,,test,$MASTER_EXTRA_PORT,);
connection default;
......
......@@ -30,7 +30,7 @@ extern uint threadpool_oversubscribe; /* Maximum active threads in group */
extern void threadpool_cleanup_connection(THD *thd);
extern void threadpool_remove_connection(THD *thd);
extern int threadpool_process_request(THD *thd);
extern int threadpool_add_connection(THD *thd);
extern THD* threadpool_add_connection(CONNECT *connect, void *scheduled_data);
/*
Functions used by scheduler.
......
......@@ -94,7 +94,7 @@ struct Worker_thread_context
/*
Attach/associate the connection with the OS thread,
*/
static bool thread_attach(THD* thd)
static void thread_attach(THD* thd)
{
pthread_setspecific(THR_KEY_mysys,thd->mysys_var);
thd->thread_stack=(char*)&thd;
......@@ -103,13 +103,14 @@ static bool thread_attach(THD* thd)
if (PSI_server)
PSI_server->set_thread(thd->event_scheduler.m_psi);
#endif
return 0;
}
int threadpool_add_connection(THD *thd)
THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data)
{
int retval=1;
THD *thd= NULL;
int error=1;
Worker_thread_context worker_context;
worker_context.save();
......@@ -120,13 +121,23 @@ int threadpool_add_connection(THD *thd)
pthread_setspecific(THR_KEY_mysys, 0);
my_thread_init();
thd->mysys_var= (st_my_thread_var *)pthread_getspecific(THR_KEY_mysys);
if (!thd->mysys_var)
st_my_thread_var* mysys_var= (st_my_thread_var *)pthread_getspecific(THR_KEY_mysys);
DBUG_EXECUTE_IF("simulate_failed_connection_1", mysys_var= NULL; my_thread_end(););
if (!mysys_var ||!(thd= connect->create_thd()))
{
/* Out of memory? */
connect->close_and_delete();
if (mysys_var)
{
my_thread_end();
}
worker_context.restore();
return 1;
return NULL;
}
delete connect;
add_to_active_threads(thd);
thd->mysys_var= mysys_var;
thd->event_scheduler.data= scheduler_data;
/* Create new PSI thread for use with the THD. */
#ifdef HAVE_PSI_INTERFACE
......@@ -157,14 +168,19 @@ int threadpool_add_connection(THD *thd)
*/
if (thd_is_connection_alive(thd))
{
retval= 0;
error= 0;
thd->net.reading_or_writing= 1;
thd->skip_wait_timeout= true;
}
}
}
if (error)
{
threadpool_cleanup_connection(thd);
thd= NULL;
}
worker_context.restore();
return retval;
return thd;
}
/*
......
......@@ -116,6 +116,7 @@ struct connection_t
connection_t *next_in_queue;
connection_t **prev_in_queue;
ulonglong abs_wait_timeout;
CONNECT* connect;
bool logged_in;
bool bound_to_poll_descriptor;
bool waiting;
......@@ -1215,6 +1216,7 @@ connection_t *alloc_connection()
connection->logged_in= false;
connection->bound_to_poll_descriptor= false;
connection->abs_wait_timeout= ULONGLONG_MAX;
connection->thd= 0;
}
DBUG_RETURN(connection);
}
......@@ -1228,26 +1230,19 @@ connection_t *alloc_connection()
void tp_add_connection(CONNECT *connect)
{
connection_t *connection;
THD *thd;
DBUG_ENTER("tp_add_connection");
if (!(connection= alloc_connection()) || !(thd= connect->create_thd()))
connection= alloc_connection();
if (!connection)
{
my_free(connection);
connect->close_and_delete();
DBUG_VOID_RETURN;
}
connection->thd= thd;
delete connect;
add_to_active_threads(thd);
connection->connect= connect;
thd->event_scheduler.data= connection;
/* Assign connection to a group. */
thread_group_t *group=
&all_groups[thd->thread_id%group_count];
&all_groups[connect->thread_id%group_count];
connection->thread_group=group;
mysql_mutex_lock(&group->mutex);
......@@ -1271,9 +1266,12 @@ static void connection_abort(connection_t *connection)
{
DBUG_ENTER("connection_abort");
thread_group_t *group= connection->thread_group;
threadpool_remove_connection(connection->thd);
if (connection->thd)
{
threadpool_remove_connection(connection->thd);
}
mysql_mutex_lock(&group->mutex);
group->connection_count--;
mysql_mutex_unlock(&group->mutex);
......@@ -1442,7 +1440,8 @@ static void handle_event(connection_t *connection)
if (!connection->logged_in)
{
err= threadpool_add_connection(connection->thd);
connection->thd = threadpool_add_connection(connection->connect, connection);
err= (connection->thd == NULL);
connection->logged_in= true;
}
else
......
......@@ -226,11 +226,12 @@ struct connection_t
PTP_WAIT shm_read;
/* Callback instance, used to inform treadpool about long callbacks */
PTP_CALLBACK_INSTANCE callback_instance;
CONNECT* connect;
bool logged_in;
};
void init_connection(connection_t *connection, THD *thd)
void init_connection(connection_t *connection, CONNECT *connect)
{
connection->logged_in = false;
connection->handle= 0;
......@@ -240,11 +241,11 @@ void init_connection(connection_t *connection, THD *thd)
connection->logged_in = false;
connection->timeout= ULONGLONG_MAX;
connection->callback_instance= 0;
connection->thd= 0;
memset(&connection->overlapped, 0, sizeof(OVERLAPPED));
InitializeThreadpoolEnvironment(&connection->callback_environ);
SetThreadpoolCallbackPool(&connection->callback_environ, pool);
connection->thd = thd;
thd->event_scheduler.data= connection;
connection->connect= connect;
}
......@@ -397,8 +398,8 @@ int start_io(connection_t *connection, PTP_CALLBACK_INSTANCE instance)
int login(connection_t *connection, PTP_CALLBACK_INSTANCE instance)
{
if (threadpool_add_connection(connection->thd) == 0
&& init_io(connection, connection->thd) == 0
if ((connection->thd= threadpool_add_connection(connection->connect, connection))
&& init_io(connection, connection->thd) == 0
&& start_io(connection, instance) == 0)
{
return 0;
......@@ -660,22 +661,16 @@ static void CALLBACK shm_read_callback(PTP_CALLBACK_INSTANCE instance,
void tp_add_connection(CONNECT *connect)
{
THD *thd;
connection_t *con;
if (!(con = (connection_t *) malloc(sizeof(connection_t))) ||
!(thd= connect->create_thd()))
connection_t *con;
con= (connection_t *)malloc(sizeof(connection_t));
if (!con)
{
tp_log_warning("Allocation failed", "tp_add_connection");
free(con);
connect->close_and_delete();
return;
}
delete connect;
add_to_active_threads(thd);
init_connection(con, thd);
init_connection(con, connect);
/* Try to login asynchronously, using threads in the pool */
PTP_WORK wrk = CreateThreadpoolWork(login_callback,con, &con->callback_environ);
......@@ -687,7 +682,7 @@ void tp_add_connection(CONNECT *connect)
else
{
/* Likely memory pressure */
threadpool_cleanup_connection(thd);
connect->close_and_delete();
}
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment