Commit 46f9d590 authored by Vladislav Vaintroub's avatar Vladislav Vaintroub Committed by Dave Gosselin

mariadb-import, multithreading - improve error handling

In case of error in one connection, issue KILL CONNECTION for all others,
or server will still do LOAD FILE, possibly for hours, after we exited.
parent c7660290
......@@ -45,6 +45,9 @@
#include <my_dir.h>
tpool::thread_pool *thread_pool;
static std::vector<MYSQL *> all_tp_connections;
std::atomic<bool> aborting{false};
static void kill_tp_connections(MYSQL *mysql);
static void db_error_with_table(MYSQL *mysql, char *table);
static void db_error(MYSQL *mysql);
......@@ -479,9 +482,12 @@ static int exec_sql(MYSQL *mysql, const std::string& s)
{
if (mysql_query(mysql, s.c_str()))
{
fprintf(stdout,"Error: %d, %s, when using statement: %s\n",
mysql_errno(mysql), mysql_error(mysql), s.c_str());
db_error(mysql);
if (!aborting)
{
fprintf(stdout, "Error: %d, %s, when using statement: %s\n",
mysql_errno(mysql), mysql_error(mysql), s.c_str());
db_error(mysql);
}
return 1;
}
return 0;
......@@ -582,6 +588,8 @@ static int handle_one_table(const table_load_params *params, MYSQL *mysql)
DBUG_ENTER("handle_one_table");
DBUG_PRINT("enter",("datafile: %s",params->data_file.c_str()));
if (aborting)
return 1;
if (verbose && !params->sql_file.empty())
{
fprintf(stdout, "Executing SQL script %s\n", params->sql_file.c_str());
......@@ -828,20 +836,34 @@ static void db_disconnect(char *host, MYSQL *mysql)
static void safe_exit(int error, MYSQL *mysql)
{
if (error && ignore_errors)
return;
if (mysql)
mysql_close(mysql);
bool expected= false;
if (!aborting.compare_exchange_strong(expected, true))
return;
if (thread_pool)
{
/* dirty exit. some threads are running,
memory is not freed, openssl not deinitialized */
memory is not freed, openssl not deinitialized */
DBUG_ASSERT(error);
if (mysql)
{
/*
We still need tell server to kill all connections
so it does not keep busy with load.
*/
kill_tp_connections(mysql);
}
_exit(error);
}
if (mysql)
{
mysql_close(mysql);
}
mysql_library_end();
free_defaults(argv_to_free);
my_free(opt_password);
......@@ -853,6 +875,8 @@ static void safe_exit(int error, MYSQL *mysql)
static void db_error_with_table(MYSQL *mysql, char *table)
{
if (aborting)
return;
my_printf_error(0,"Error: %d, %s, when using table: %s",
MYF(0), mysql_errno(mysql), mysql_error(mysql), table);
safe_exit(1, mysql);
......@@ -947,16 +971,48 @@ void load_single_table(void *arg)
set_exitcode(error);
}
static void init_tp_connections(size_t n)
{
for (size_t i= 0; i < n; i++)
{
MYSQL *mysql=
db_connect(current_host, current_db, current_user, opt_password);
all_tp_connections.push_back(mysql);
}
}
static void close_tp_connections()
{
for (auto &conn : all_tp_connections)
{
db_disconnect(current_host, conn);
}
all_tp_connections.clear();
}
/*
If we end with an error, in one connection,
we need to kill all others.
Otherwise, server will still be busy with load,
when we already exited.
*/
static void kill_tp_connections(MYSQL *mysql)
{
for (auto &conn : all_tp_connections)
mysql_kill(mysql, mysql_thread_id(conn));
}
static void tpool_thread_init(void)
{
mysql_thread_init();
thread_local_mysql= db_connect(current_host,current_db,current_user,opt_password);
static std::atomic<size_t> next_connection(0);
assert(next_connection < all_tp_connections.size());
thread_local_mysql= all_tp_connections[next_connection++];
}
static void tpool_thread_exit(void)
{
if (thread_local_mysql)
db_disconnect(current_host,thread_local_mysql);
mysql_thread_end();
}
......@@ -1144,6 +1200,7 @@ int main(int argc, char **argv)
fatal_error("Too many connections, max value for --parallel is %d\n",
MAX_THREADS);
}
init_tp_connections(opt_use_threads);
thread_pool= tpool::create_thread_pool_generic(opt_use_threads,opt_use_threads);
thread_pool->set_thread_callbacks(tpool_thread_init,tpool_thread_exit);
......@@ -1155,6 +1212,7 @@ int main(int argc, char **argv)
thread_pool->submit_task(&t);
delete thread_pool;
close_tp_connections();
thread_pool= nullptr;
files_to_load.clear();
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment