Commit dbe941e0 authored by Eugene Kosov's avatar Eugene Kosov

cleanup: os_thread_create -> std::thread

parent da342880
...@@ -129,7 +129,6 @@ struct datadir_thread_ctxt_t { ...@@ -129,7 +129,6 @@ struct datadir_thread_ctxt_t {
uint n_thread; uint n_thread;
uint *count; uint *count;
pthread_mutex_t* count_mutex; pthread_mutex_t* count_mutex;
os_thread_id_t id;
bool ret; bool ret;
}; };
...@@ -947,7 +946,7 @@ backup_file_printf(const char *filename, const char *fmt, ...) ...@@ -947,7 +946,7 @@ backup_file_printf(const char *filename, const char *fmt, ...)
static static
bool bool
run_data_threads(datadir_iter_t *it, os_thread_func_t func, uint n) run_data_threads(datadir_iter_t *it, void (*func)(datadir_thread_ctxt_t *ctxt), uint n)
{ {
datadir_thread_ctxt_t *data_threads; datadir_thread_ctxt_t *data_threads;
uint i, count; uint i, count;
...@@ -965,7 +964,7 @@ run_data_threads(datadir_iter_t *it, os_thread_func_t func, uint n) ...@@ -965,7 +964,7 @@ run_data_threads(datadir_iter_t *it, os_thread_func_t func, uint n)
data_threads[i].n_thread = i + 1; data_threads[i].n_thread = i + 1;
data_threads[i].count = &count; data_threads[i].count = &count;
data_threads[i].count_mutex = &count_mutex; data_threads[i].count_mutex = &count_mutex;
data_threads[i].id = os_thread_create(func, data_threads + i); std::thread(func, data_threads + i).detach();
} }
/* Wait for threads to exit */ /* Wait for threads to exit */
...@@ -2036,13 +2035,10 @@ decrypt_decompress_file(const char *filepath, uint thread_n) ...@@ -2036,13 +2035,10 @@ decrypt_decompress_file(const char *filepath, uint thread_n)
return(true); return(true);
} }
static static void decrypt_decompress_thread_func(datadir_thread_ctxt_t *ctxt)
os_thread_ret_t STDCALL
decrypt_decompress_thread_func(void *arg)
{ {
bool ret = true; bool ret = true;
datadir_node_t node; datadir_node_t node;
datadir_thread_ctxt_t *ctxt = (datadir_thread_ctxt_t *)(arg);
datadir_node_init(&node); datadir_node_init(&node);
...@@ -2072,9 +2068,6 @@ decrypt_decompress_thread_func(void *arg) ...@@ -2072,9 +2068,6 @@ decrypt_decompress_thread_func(void *arg)
pthread_mutex_unlock(ctxt->count_mutex); pthread_mutex_unlock(ctxt->count_mutex);
ctxt->ret = ret; ctxt->ret = ret;
os_thread_exit();
OS_THREAD_DUMMY_RETURN;
} }
bool bool
......
...@@ -799,7 +799,7 @@ wait_for_no_updates(MYSQL *connection, uint timeout, uint threshold) ...@@ -799,7 +799,7 @@ wait_for_no_updates(MYSQL *connection, uint timeout, uint threshold)
return(false); return(false);
} }
static os_thread_ret_t DECLARE_THREAD(kill_query_thread)(void*) static void kill_query_thread()
{ {
mysql_mutex_lock(&kill_query_thread_mutex); mysql_mutex_lock(&kill_query_thread_mutex);
...@@ -835,9 +835,6 @@ static os_thread_ret_t DECLARE_THREAD(kill_query_thread)(void*) ...@@ -835,9 +835,6 @@ static os_thread_ret_t DECLARE_THREAD(kill_query_thread)(void*)
kill_query_thread_running= false; kill_query_thread_running= false;
mysql_cond_signal(&kill_query_thread_stopped); mysql_cond_signal(&kill_query_thread_stopped);
mysql_mutex_unlock(&kill_query_thread_mutex); mysql_mutex_unlock(&kill_query_thread_mutex);
os_thread_exit();
OS_THREAD_DUMMY_RETURN;
} }
...@@ -849,7 +846,7 @@ static void start_query_killer() ...@@ -849,7 +846,7 @@ static void start_query_killer()
mysql_mutex_init(0, &kill_query_thread_mutex, nullptr); mysql_mutex_init(0, &kill_query_thread_mutex, nullptr);
mysql_cond_init(0, &kill_query_thread_stop, nullptr); mysql_cond_init(0, &kill_query_thread_stop, nullptr);
mysql_cond_init(0, &kill_query_thread_stopped, nullptr); mysql_cond_init(0, &kill_query_thread_stopped, nullptr);
os_thread_create(kill_query_thread); std::thread(kill_query_thread).detach();
} }
static void stop_query_killer() static void stop_query_killer()
......
...@@ -898,7 +898,6 @@ typedef struct { ...@@ -898,7 +898,6 @@ typedef struct {
uint num; uint num;
uint *count; uint *count;
pthread_mutex_t* count_mutex; pthread_mutex_t* count_mutex;
os_thread_id_t id;
CorruptedPages *corrupted_pages; CorruptedPages *corrupted_pages;
} data_thread_ctxt_t; } data_thread_ctxt_t;
...@@ -3013,7 +3012,7 @@ void backup_wait_for_lsn(lsn_t lsn) { ...@@ -3013,7 +3012,7 @@ void backup_wait_for_lsn(lsn_t lsn) {
extern lsn_t server_lsn_after_lock; extern lsn_t server_lsn_after_lock;
static os_thread_ret_t DECLARE_THREAD(log_copying_thread)(void*) static void log_copying_thread()
{ {
my_thread_init(); my_thread_init();
mysql_mutex_lock(&log_sys.mutex); mysql_mutex_lock(&log_sys.mutex);
...@@ -3027,8 +3026,6 @@ static os_thread_ret_t DECLARE_THREAD(log_copying_thread)(void*) ...@@ -3027,8 +3026,6 @@ static os_thread_ret_t DECLARE_THREAD(log_copying_thread)(void*)
log_copying_running= false; log_copying_running= false;
mysql_mutex_unlock(&log_sys.mutex); mysql_mutex_unlock(&log_sys.mutex);
my_thread_end(); my_thread_end();
os_thread_exit();
return 0;
} }
static bool have_io_watching_thread; static bool have_io_watching_thread;
...@@ -3096,15 +3093,9 @@ void dbug_mariabackup_event(const char *event,const char *key) ...@@ -3096,15 +3093,9 @@ void dbug_mariabackup_event(const char *event,const char *key)
} }
#endif // DBUG_OFF #endif // DBUG_OFF
/************************************************************************** /** Datafiles copying thread.*/
Datafiles copying thread.*/ static void data_copy_thread_func(data_thread_ctxt_t *ctxt) /* thread context */
static
os_thread_ret_t
DECLARE_THREAD(data_copy_thread_func)(
/*==================*/
void *arg) /* thread context */
{ {
data_thread_ctxt_t *ctxt = (data_thread_ctxt_t *) arg;
uint num = ctxt->num; uint num = ctxt->num;
fil_node_t* node; fil_node_t* node;
ut_ad(ctxt->corrupted_pages); ut_ad(ctxt->corrupted_pages);
...@@ -3136,8 +3127,6 @@ DECLARE_THREAD(data_copy_thread_func)( ...@@ -3136,8 +3127,6 @@ DECLARE_THREAD(data_copy_thread_func)(
pthread_mutex_unlock(ctxt->count_mutex); pthread_mutex_unlock(ctxt->count_mutex);
my_thread_end(); my_thread_end();
os_thread_exit();
OS_THREAD_DUMMY_RETURN;
} }
/************************************************************************ /************************************************************************
...@@ -4424,7 +4413,7 @@ static bool xtrabackup_backup_func() ...@@ -4424,7 +4413,7 @@ static bool xtrabackup_backup_func()
DBUG_MARIABACKUP_EVENT("before_innodb_log_copy_thread_started",0); DBUG_MARIABACKUP_EVENT("before_innodb_log_copy_thread_started",0);
mysql_cond_init(0, &log_copying_stop, nullptr); mysql_cond_init(0, &log_copying_stop, nullptr);
os_thread_create(log_copying_thread); std::thread(log_copying_thread).detach();
/* FLUSH CHANGED_PAGE_BITMAPS call */ /* FLUSH CHANGED_PAGE_BITMAPS call */
if (!flush_changed_page_bitmaps()) { if (!flush_changed_page_bitmaps()) {
...@@ -4466,8 +4455,7 @@ static bool xtrabackup_backup_func() ...@@ -4466,8 +4455,7 @@ static bool xtrabackup_backup_func()
data_threads[i].count = &count; data_threads[i].count = &count;
data_threads[i].count_mutex = &count_mutex; data_threads[i].count_mutex = &count_mutex;
data_threads[i].corrupted_pages = &corrupted_pages; data_threads[i].corrupted_pages = &corrupted_pages;
data_threads[i].id = os_thread_create(data_copy_thread_func, std::thread(data_copy_thread_func, data_threads + i).detach();
data_threads + i);
} }
/* Wait for threads to exit */ /* Wait for threads to exit */
......
...@@ -2025,11 +2025,9 @@ static ulint page_cleaner_flush_pages_recommendation(ulint last_pages_in, ...@@ -2025,11 +2025,9 @@ static ulint page_cleaner_flush_pages_recommendation(ulint last_pages_in,
return(n_pages); return(n_pages);
} }
/******************************************************************//** /** page_cleaner thread tasked with flushing dirty pages from the buffer
page_cleaner thread tasked with flushing dirty pages from the buffer pools. As of now we'll have only one coordinator. */
pools. As of now we'll have only one coordinator. static void buf_flush_page_cleaner()
@return a dummy parameter */
static os_thread_ret_t DECLARE_THREAD(buf_flush_page_cleaner)(void*)
{ {
my_thread_init(); my_thread_init();
#ifdef UNIV_PFS_THREAD #ifdef UNIV_PFS_THREAD
...@@ -2253,11 +2251,10 @@ static os_thread_ret_t DECLARE_THREAD(buf_flush_page_cleaner)(void*) ...@@ -2253,11 +2251,10 @@ static os_thread_ret_t DECLARE_THREAD(buf_flush_page_cleaner)(void*)
mysql_mutex_unlock(&buf_pool.flush_list_mutex); mysql_mutex_unlock(&buf_pool.flush_list_mutex);
my_thread_end(); my_thread_end();
/* We count the number of threads in os_thread_exit(). A created
thread should always use that to exit and not use return() to exit. */
os_thread_exit();
OS_THREAD_DUMMY_RETURN; #ifdef UNIV_PFS_THREAD
pfs_delete_thread();
#endif
} }
/** Initialize page_cleaner. */ /** Initialize page_cleaner. */
...@@ -2269,7 +2266,7 @@ ATTRIBUTE_COLD void buf_flush_page_cleaner_init() ...@@ -2269,7 +2266,7 @@ ATTRIBUTE_COLD void buf_flush_page_cleaner_init()
srv_operation == SRV_OPERATION_RESTORE_EXPORT); srv_operation == SRV_OPERATION_RESTORE_EXPORT);
buf_flush_sync_lsn= 0; buf_flush_sync_lsn= 0;
buf_page_cleaner_is_active= true; buf_page_cleaner_is_active= true;
os_thread_create(buf_flush_page_cleaner); std::thread(buf_flush_page_cleaner).detach();
} }
/** @return the number of dirty pages in the buffer pool */ /** @return the number of dirty pages in the buffer pool */
......
...@@ -2062,10 +2062,9 @@ static void fil_crypt_complete_rotate_space(rotate_thread_t* state) ...@@ -2062,10 +2062,9 @@ static void fil_crypt_complete_rotate_space(rotate_thread_t* state)
mysql_mutex_unlock(&crypt_data->mutex); mysql_mutex_unlock(&crypt_data->mutex);
} }
/*********************************************************************//** /** A thread which monitors global key state and rotates tablespaces
A thread which monitors global key state and rotates tablespaces accordingly accordingly */
@return a dummy parameter */ static void fil_crypt_thread()
static os_thread_ret_t DECLARE_THREAD(fil_crypt_thread)(void*)
{ {
mysql_mutex_lock(&fil_crypt_threads_mutex); mysql_mutex_lock(&fil_crypt_threads_mutex);
rotate_thread_t thr(srv_n_fil_crypt_threads_started++); rotate_thread_t thr(srv_n_fil_crypt_threads_started++);
...@@ -2144,12 +2143,9 @@ static os_thread_ret_t DECLARE_THREAD(fil_crypt_thread)(void*) ...@@ -2144,12 +2143,9 @@ static os_thread_ret_t DECLARE_THREAD(fil_crypt_thread)(void*)
pthread_cond_signal(&fil_crypt_cond); /* signal that we stopped */ pthread_cond_signal(&fil_crypt_cond); /* signal that we stopped */
mysql_mutex_unlock(&fil_crypt_threads_mutex); mysql_mutex_unlock(&fil_crypt_threads_mutex);
/* We count the number of threads in os_thread_exit(). A created #ifdef UNIV_PFS_THREAD
thread should always use that to exit and not use return() to exit. */ pfs_delete_thread();
#endif
os_thread_exit();
OS_THREAD_DUMMY_RETURN;
} }
/********************************************************************* /*********************************************************************
...@@ -2172,10 +2168,12 @@ fil_crypt_set_thread_cnt( ...@@ -2172,10 +2168,12 @@ fil_crypt_set_thread_cnt(
uint add = new_cnt - srv_n_fil_crypt_threads; uint add = new_cnt - srv_n_fil_crypt_threads;
srv_n_fil_crypt_threads = new_cnt; srv_n_fil_crypt_threads = new_cnt;
for (uint i = 0; i < add; i++) { for (uint i = 0; i < add; i++) {
std::thread thd(fil_crypt_thread);
ib::info() << "Creating #" ib::info() << "Creating #"
<< i+1 << " encryption thread id " << i+1 << " encryption thread id "
<< os_thread_create(fil_crypt_thread) << thd.get_id()
<< " total threads " << new_cnt << "."; << " total threads " << new_cnt << ".";
thd.detach();
} }
} else if (new_cnt < srv_n_fil_crypt_threads) { } else if (new_cnt < srv_n_fil_crypt_threads) {
srv_n_fil_crypt_threads = new_cnt; srv_n_fil_crypt_threads = new_cnt;
......
/***************************************************************************** /*****************************************************************************
Copyright (c) 1995, 2016, Oracle and/or its affiliates. All Rights Reserved. Copyright (c) 1995, 2016, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2017, 2020, MariaDB Corporation. Copyright (c) 2017, 2021, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify it under This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software the terms of the GNU General Public License as published by the Free Software
...@@ -28,48 +28,15 @@ Created 9/8/1995 Heikki Tuuri ...@@ -28,48 +28,15 @@ Created 9/8/1995 Heikki Tuuri
#pragma once #pragma once
#include "univ.i" #include "univ.i"
/* Possible fixed priorities for threads */
#define OS_THREAD_PRIORITY_NONE 100
#define OS_THREAD_PRIORITY_BACKGROUND 1
#define OS_THREAD_PRIORITY_NORMAL 2
#define OS_THREAD_PRIORITY_ABOVE_NORMAL 3
#ifdef _WIN32 #ifdef _WIN32
typedef DWORD os_thread_t;
typedef DWORD os_thread_id_t; /*!< In Windows the thread id typedef DWORD os_thread_id_t; /*!< In Windows the thread id
is an unsigned long int */ is an unsigned long int */
extern "C" {
typedef LPTHREAD_START_ROUTINE os_thread_func_t;
}
/** Macro for specifying a Windows thread start function. */
#define DECLARE_THREAD(func) WINAPI func
#else #else
typedef pthread_t os_thread_t;
typedef pthread_t os_thread_id_t; /*!< In Unix we use the thread typedef pthread_t os_thread_id_t; /*!< In Unix we use the thread
handle itself as the id of handle itself as the id of
the thread */ the thread */
extern "C" { typedef void* (*os_thread_func_t)(void*); }
/** Macro for specifying a POSIX thread start function. */
#define DECLARE_THREAD(func) func
#endif /* _WIN32 */ #endif /* _WIN32 */
/* Define a function pointer type to use in a typecast */
typedef void* (*os_posix_f_t) (void*);
#define os_thread_eq(a,b) IF_WIN(a == b, pthread_equal(a, b)) #define os_thread_eq(a,b) IF_WIN(a == b, pthread_equal(a, b))
#define os_thread_get_curr_id() IF_WIN(GetCurrentThreadId(), pthread_self()) #define os_thread_get_curr_id() IF_WIN(GetCurrentThreadId(), pthread_self())
/****************************************************************//**
Creates a new thread of execution. The execution starts from
the function given.
NOTE: We count the number of threads in os_thread_exit(). A created
thread should always use that to exit so thatthe thread count will be
decremented.
We do not return an error code because if there is one, we crash here. */
os_thread_t os_thread_create(os_thread_func_t func, void *arg= nullptr);
/** Detach and terminate the current thread. */
ATTRIBUTE_NORETURN void os_thread_exit();
...@@ -50,11 +50,8 @@ Rollback or clean up any incomplete transactions which were ...@@ -50,11 +50,8 @@ Rollback or clean up any incomplete transactions which were
encountered in crash recovery. If the transaction already was encountered in crash recovery. If the transaction already was
committed, then we clean up a possible insert undo log. If the committed, then we clean up a possible insert undo log. If the
transaction was not yet committed, then we roll it back. transaction was not yet committed, then we roll it back.
Note: this is done in a background thread. Note: this is done in a background thread. */
@return a dummy parameter */ void trx_rollback_all_recovered(void*);
extern "C"
os_thread_ret_t
DECLARE_THREAD(trx_rollback_all_recovered)(void*);
/*********************************************************************//** /*********************************************************************//**
Creates a rollback command node struct. Creates a rollback command node struct.
@return own: rollback node struct */ @return own: rollback node struct */
......
...@@ -26,74 +26,3 @@ Created 9/8/1995 Heikki Tuuri ...@@ -26,74 +26,3 @@ Created 9/8/1995 Heikki Tuuri
#include "univ.i" #include "univ.i"
#include "srv0srv.h" #include "srv0srv.h"
/****************************************************************//**
Creates a new thread of execution. The execution starts from
the function given.
NOTE: We count the number of threads in os_thread_exit(). A created
thread should always use that to exit so thatthe thread count will be
decremented.
We do not return an error code because if there is one, we crash here. */
os_thread_t os_thread_create(os_thread_func_t func, void *arg)
{
os_thread_id_t new_thread_id;
#ifdef _WIN32
HANDLE handle;
handle = CreateThread(NULL, /* no security attributes */
0, /* default size stack */
func,
arg,
0, /* thread runs immediately */
&new_thread_id);
if (!handle) {
/* If we cannot start a new thread, life has no meaning. */
ib::fatal() << "CreateThread returned " << GetLastError();
}
CloseHandle(handle);
return((os_thread_t)new_thread_id);
#else /* _WIN32 else */
pthread_attr_t attr;
int ret = pthread_attr_init(&attr);
if (UNIV_UNLIKELY(ret)) {
fprintf(stderr,
"InnoDB: Error: pthread_attr_init() returned %d\n",
ret);
abort();
}
ret = pthread_create(&new_thread_id, &attr, func, arg);
ut_a(ret == 0);
pthread_attr_destroy(&attr);
#endif /* not _WIN32 */
return((os_thread_t)new_thread_id);
}
/** Detach and terminate the current thread. */
ATTRIBUTE_NORETURN void os_thread_exit()
{
#ifdef UNIV_DEBUG_THREAD_CREATION
ib::info() << "Thread exits, id " << os_thread_get_curr_id();
#endif
#ifdef UNIV_PFS_THREAD
pfs_delete_thread();
#endif
#ifdef _WIN32
ExitThread(0);
#else
pthread_detach(pthread_self());
pthread_exit(NULL);
#endif
}
...@@ -1042,6 +1042,11 @@ static dberr_t find_and_check_log_file(bool &log_file_found) ...@@ -1042,6 +1042,11 @@ static dberr_t find_and_check_log_file(bool &log_file_found)
return DB_SUCCESS; return DB_SUCCESS;
} }
static tpool::task_group rollback_all_recovered_group(1);
static tpool::task rollback_all_recovered_task(trx_rollback_all_recovered,
nullptr,
&rollback_all_recovered_group);
/** Start InnoDB. /** Start InnoDB.
@param[in] create_new_db whether to create a new database @param[in] create_new_db whether to create a new database
@return DB_SUCCESS or error code */ @return DB_SUCCESS or error code */
...@@ -1785,7 +1790,7 @@ dberr_t srv_start(bool create_new_db) ...@@ -1785,7 +1790,7 @@ dberr_t srv_start(bool create_new_db)
/* Rollback incomplete non-DDL transactions */ /* Rollback incomplete non-DDL transactions */
trx_rollback_is_active = true; trx_rollback_is_active = true;
os_thread_create(trx_rollback_all_recovered); srv_thread_pool->submit_task(&rollback_all_recovered_task);
} }
} }
......
...@@ -800,19 +800,11 @@ Rollback or clean up any incomplete transactions which were ...@@ -800,19 +800,11 @@ Rollback or clean up any incomplete transactions which were
encountered in crash recovery. If the transaction already was encountered in crash recovery. If the transaction already was
committed, then we clean up a possible insert undo log. If the committed, then we clean up a possible insert undo log. If the
transaction was not yet committed, then we roll it back. transaction was not yet committed, then we roll it back.
Note: this is done in a background thread. Note: this is done in a background thread. */
@return a dummy parameter */ void trx_rollback_all_recovered(void*)
extern "C"
os_thread_ret_t
DECLARE_THREAD(trx_rollback_all_recovered)(void*)
{ {
my_thread_init();
ut_ad(!srv_read_only_mode); ut_ad(!srv_read_only_mode);
#ifdef UNIV_PFS_THREAD
pfs_register_thread(trx_rollback_clean_thread_key);
#endif /* UNIV_PFS_THREAD */
if (trx_sys.rw_trx_hash.size()) { if (trx_sys.rw_trx_hash.size()) {
ib::info() << "Starting in background the rollback of" ib::info() << "Starting in background the rollback of"
" recovered transactions"; " recovered transactions";
...@@ -822,14 +814,6 @@ DECLARE_THREAD(trx_rollback_all_recovered)(void*) ...@@ -822,14 +814,6 @@ DECLARE_THREAD(trx_rollback_all_recovered)(void*)
} }
trx_rollback_is_active = false; trx_rollback_is_active = false;
my_thread_end();
/* We count the number of threads in os_thread_exit(). A created
thread should always use that to exit and not use return() to exit. */
os_thread_exit();
OS_THREAD_DUMMY_RETURN;
} }
/****************************************************************//** /****************************************************************//**
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment