Commit ce4f4b8b authored by Yuchen Pei's avatar Yuchen Pei

MDEV-31788 Factor spider locking and unlocking code around sending queries

parent f4e02905
This diff is collapsed.
...@@ -2579,7 +2579,6 @@ void *spider_bg_conn_action( ...@@ -2579,7 +2579,6 @@ void *spider_bg_conn_action(
} else { } else {
sql_type = SPIDER_SQL_TYPE_HANDLER; sql_type = SPIDER_SQL_TYPE_HANDLER;
} }
pthread_mutex_assert_not_owner(&conn->mta_conn_mutex);
if (spider->use_fields) if (spider->use_fields)
{ {
if ((error_num = dbton_handler->set_sql_for_exec(sql_type, if ((error_num = dbton_handler->set_sql_for_exec(sql_type,
...@@ -2598,16 +2597,12 @@ void *spider_bg_conn_action( ...@@ -2598,16 +2597,12 @@ void *spider_bg_conn_action(
strmov(result_list->bgs_error_msg, spider_stmt_da_message(thd)); strmov(result_list->bgs_error_msg, spider_stmt_da_message(thd));
} }
} }
pthread_mutex_lock(&conn->mta_conn_mutex); /* todo: is it ok if the following statement is not locked? */
sql_type &= ~SPIDER_SQL_TYPE_TMP_SQL; sql_type &= ~SPIDER_SQL_TYPE_TMP_SQL;
DBUG_PRINT("info",("spider sql_type=%lu", sql_type)); DBUG_PRINT("info",("spider sql_type=%lu", sql_type));
if (!result_list->bgs_error) if (!result_list->bgs_error)
{ {
conn->need_mon = &spider->need_mons[conn->link_idx]; spider_lock_before_query(conn, &spider->need_mons[conn->link_idx]);
DBUG_ASSERT(!conn->mta_conn_mutex_lock_already);
DBUG_ASSERT(!conn->mta_conn_mutex_unlock_later);
conn->mta_conn_mutex_lock_already = TRUE;
conn->mta_conn_mutex_unlock_later = TRUE;
if (!(result_list->bgs_error = if (!(result_list->bgs_error =
spider_db_set_names(spider, conn, conn->link_idx))) spider_db_set_names(spider, conn, conn->link_idx)))
{ {
...@@ -2676,13 +2671,7 @@ void *spider_bg_conn_action( ...@@ -2676,13 +2671,7 @@ void *spider_bg_conn_action(
strmov(result_list->bgs_error_msg, strmov(result_list->bgs_error_msg,
spider_stmt_da_message(thd)); spider_stmt_da_message(thd));
} }
DBUG_ASSERT(conn->mta_conn_mutex_lock_already); spider_unlock_after_query(conn, 0);
DBUG_ASSERT(conn->mta_conn_mutex_unlock_later);
conn->mta_conn_mutex_lock_already = FALSE;
conn->mta_conn_mutex_unlock_later = FALSE;
pthread_mutex_unlock(&conn->mta_conn_mutex);
} else {
pthread_mutex_unlock(&conn->mta_conn_mutex);
} }
} else { } else {
spider->connection_ids[conn->link_idx] = conn->connection_id; spider->connection_ids[conn->link_idx] = conn->connection_id;
...@@ -2757,24 +2746,14 @@ void *spider_bg_conn_action( ...@@ -2757,24 +2746,14 @@ void *spider_bg_conn_action(
{ {
DBUG_PRINT("info",("spider bg exec sql start")); DBUG_PRINT("info",("spider bg exec sql start"));
spider = (ha_spider*) conn->bg_target; spider = (ha_spider*) conn->bg_target;
pthread_mutex_assert_not_owner(&conn->mta_conn_mutex); spider_lock_before_query(conn, &spider->need_mons[conn->link_idx]);
pthread_mutex_lock(&conn->mta_conn_mutex);
conn->need_mon = &spider->need_mons[conn->link_idx];
DBUG_ASSERT(!conn->mta_conn_mutex_lock_already);
DBUG_ASSERT(!conn->mta_conn_mutex_unlock_later);
conn->mta_conn_mutex_lock_already = TRUE;
conn->mta_conn_mutex_unlock_later = TRUE;
*conn->bg_error_num = spider_db_query_with_set_names( *conn->bg_error_num = spider_db_query_with_set_names(
conn->bg_sql_type, conn->bg_sql_type,
spider, spider,
conn, conn,
conn->link_idx conn->link_idx
); );
DBUG_ASSERT(conn->mta_conn_mutex_lock_already); spider_unlock_after_query(conn, 0);
DBUG_ASSERT(conn->mta_conn_mutex_unlock_later);
conn->mta_conn_mutex_lock_already = FALSE;
conn->mta_conn_mutex_unlock_later = FALSE;
pthread_mutex_unlock(&conn->mta_conn_mutex);
conn->bg_exec_sql = FALSE; conn->bg_exec_sql = FALSE;
continue; continue;
} }
...@@ -4145,3 +4124,42 @@ void spider_free_ipport_conn(void *info) ...@@ -4145,3 +4124,42 @@ void spider_free_ipport_conn(void *info)
} }
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
void spider_lock_before_query(SPIDER_CONN *conn, int *need_mon)
{
pthread_mutex_assert_not_owner(&conn->mta_conn_mutex);
pthread_mutex_lock(&conn->mta_conn_mutex);
conn->need_mon = need_mon;
DBUG_ASSERT(!conn->mta_conn_mutex_lock_already);
DBUG_ASSERT(!conn->mta_conn_mutex_unlock_later);
conn->mta_conn_mutex_lock_already = TRUE;
conn->mta_conn_mutex_unlock_later = TRUE;
}
int spider_unlock_after_query(SPIDER_CONN *conn, int ret)
{
DBUG_ASSERT(conn->mta_conn_mutex_lock_already);
DBUG_ASSERT(conn->mta_conn_mutex_unlock_later);
conn->mta_conn_mutex_lock_already = FALSE;
conn->mta_conn_mutex_unlock_later = FALSE;
pthread_mutex_unlock(&conn->mta_conn_mutex);
return ret;
}
int spider_unlock_after_query_1(SPIDER_CONN *conn)
{
DBUG_ASSERT(conn->mta_conn_mutex_lock_already);
DBUG_ASSERT(conn->mta_conn_mutex_unlock_later);
conn->mta_conn_mutex_lock_already = FALSE;
conn->mta_conn_mutex_unlock_later = FALSE;
return spider_db_errorno(conn);
}
int spider_unlock_after_query_2(SPIDER_CONN *conn, ha_spider *spider, int link_idx, TABLE *table)
{
DBUG_ASSERT(conn->mta_conn_mutex_lock_already);
DBUG_ASSERT(conn->mta_conn_mutex_unlock_later);
conn->mta_conn_mutex_lock_already = FALSE;
conn->mta_conn_mutex_unlock_later = FALSE;
return spider_db_store_result(spider, link_idx, table);
}
...@@ -24,6 +24,8 @@ ...@@ -24,6 +24,8 @@
#define SPIDER_SIMPLE_RECORDS 3 #define SPIDER_SIMPLE_RECORDS 3
#define SPIDER_SIMPLE_CHECKSUM_TABLE 4 #define SPIDER_SIMPLE_CHECKSUM_TABLE 4
struct TABLE;
/* /*
The SPIDER_CONN_LOOP_CHECK has been added to the loop_check queue to The SPIDER_CONN_LOOP_CHECK has been added to the loop_check queue to
check for self-reference. check for self-reference.
...@@ -454,3 +456,11 @@ SPIDER_CONN* spider_get_conn_from_idle_connection ...@@ -454,3 +456,11 @@ SPIDER_CONN* spider_get_conn_from_idle_connection
int *error_num int *error_num
); );
void spider_free_ipport_conn(void *info); void spider_free_ipport_conn(void *info);
void spider_lock_before_query(SPIDER_CONN *conn, int *need_mon);
int spider_unlock_after_query(SPIDER_CONN *conn, int ret);
int spider_unlock_after_query_1(SPIDER_CONN *conn);
int spider_unlock_after_query_2(SPIDER_CONN *conn, ha_spider *spider, int link_idx, TABLE *table);
This diff is collapsed.
This diff is collapsed.
...@@ -1191,25 +1191,15 @@ static int spider_send_query( ...@@ -1191,25 +1191,15 @@ static int spider_send_query(
} }
} else { } else {
#endif #endif
pthread_mutex_assert_not_owner(&conn->mta_conn_mutex);
if ((error_num = dbton_hdl->set_sql_for_exec( if ((error_num = dbton_hdl->set_sql_for_exec(
SPIDER_SQL_TYPE_SELECT_SQL, link_idx, link_idx_chain))) SPIDER_SQL_TYPE_SELECT_SQL, link_idx, link_idx_chain)))
DBUG_RETURN(error_num); DBUG_RETURN(error_num);
pthread_mutex_lock(&conn->mta_conn_mutex); spider_lock_before_query(conn, &spider->need_mons[link_idx]);
conn->need_mon = &spider->need_mons[link_idx];
DBUG_ASSERT(!conn->mta_conn_mutex_lock_already);
DBUG_ASSERT(!conn->mta_conn_mutex_unlock_later);
conn->mta_conn_mutex_lock_already = TRUE;
conn->mta_conn_mutex_unlock_later = TRUE;
if ((error_num = spider_db_set_names(spider, conn, if ((error_num = spider_db_set_names(spider, conn,
link_idx))) link_idx)))
if ((error_num = spider_db_set_names(spider, conn, link_idx))) if ((error_num = spider_db_set_names(spider, conn, link_idx)))
{ {
DBUG_ASSERT(conn->mta_conn_mutex_lock_already); spider_unlock_after_query(conn, 0);
DBUG_ASSERT(conn->mta_conn_mutex_unlock_later);
conn->mta_conn_mutex_lock_already = FALSE;
conn->mta_conn_mutex_unlock_later = FALSE;
pthread_mutex_unlock(&conn->mta_conn_mutex);
if (spider->need_mons[link_idx]) if (spider->need_mons[link_idx])
error_num = fields->ping_table_mon_from_table(link_idx_chain); error_num = fields->ping_table_mon_from_table(link_idx_chain);
if ((error_num = spider->check_error_mode_eof(error_num)) == if ((error_num = spider->check_error_mode_eof(error_num)) ==
...@@ -1227,11 +1217,7 @@ static int spider_send_query( ...@@ -1227,11 +1217,7 @@ static int spider_send_query(
spider->result_list.quick_mode, spider->result_list.quick_mode,
&spider->need_mons[link_idx])) &spider->need_mons[link_idx]))
{ {
DBUG_ASSERT(conn->mta_conn_mutex_lock_already); error_num= spider_unlock_after_query_1(conn);
DBUG_ASSERT(conn->mta_conn_mutex_unlock_later);
conn->mta_conn_mutex_lock_already = FALSE;
conn->mta_conn_mutex_unlock_later = FALSE;
error_num = spider_db_errorno(conn);
if (spider->need_mons[link_idx]) if (spider->need_mons[link_idx])
error_num = fields->ping_table_mon_from_table(link_idx_chain); error_num = fields->ping_table_mon_from_table(link_idx_chain);
if ((error_num = spider->check_error_mode_eof(error_num)) == if ((error_num = spider->check_error_mode_eof(error_num)) ==
...@@ -1243,13 +1229,9 @@ static int spider_send_query( ...@@ -1243,13 +1229,9 @@ static int spider_send_query(
DBUG_RETURN(error_num); DBUG_RETURN(error_num);
} }
spider->connection_ids[link_idx] = conn->connection_id; spider->connection_ids[link_idx] = conn->connection_id;
DBUG_ASSERT(conn->mta_conn_mutex_lock_already);
DBUG_ASSERT(conn->mta_conn_mutex_unlock_later);
conn->mta_conn_mutex_lock_already = FALSE;
conn->mta_conn_mutex_unlock_later = FALSE;
if (fields->is_first_link_ok_chain(link_idx_chain)) if (fields->is_first_link_ok_chain(link_idx_chain))
{ {
if ((error_num = spider_db_store_result(spider, link_idx, table))) if ((error_num = spider_unlock_after_query_2(conn, spider, link_idx, table)))
{ {
if (error_num != HA_ERR_END_OF_FILE && spider->need_mons[link_idx]) if (error_num != HA_ERR_END_OF_FILE && spider->need_mons[link_idx])
error_num = fields->ping_table_mon_from_table(link_idx_chain); error_num = fields->ping_table_mon_from_table(link_idx_chain);
...@@ -1266,7 +1248,7 @@ static int spider_send_query( ...@@ -1266,7 +1248,7 @@ static int spider_send_query(
} else } else
{ {
spider_db_discard_result(spider, link_idx, conn); spider_db_discard_result(spider, link_idx, conn);
pthread_mutex_unlock(&conn->mta_conn_mutex); spider_unlock_after_query(conn, 0);
} }
#ifndef WITHOUT_SPIDER_BG_SEARCH #ifndef WITHOUT_SPIDER_BG_SEARCH
} }
......
...@@ -5879,20 +5879,10 @@ int spider_open_all_tables( ...@@ -5879,20 +5879,10 @@ int spider_open_all_tables(
} }
conn->error_mode &= spider_param_error_read_mode(thd, 0); conn->error_mode &= spider_param_error_read_mode(thd, 0);
conn->error_mode &= spider_param_error_write_mode(thd, 0); conn->error_mode &= spider_param_error_write_mode(thd, 0);
pthread_mutex_assert_not_owner(&conn->mta_conn_mutex); spider_lock_before_query(conn, &mon_val);
pthread_mutex_lock(&conn->mta_conn_mutex);
conn->need_mon = &mon_val;
DBUG_ASSERT(!conn->mta_conn_mutex_lock_already);
DBUG_ASSERT(!conn->mta_conn_mutex_unlock_later);
conn->mta_conn_mutex_lock_already = TRUE;
conn->mta_conn_mutex_unlock_later = TRUE;
if ((error_num = spider_db_before_query(conn, &mon_val))) if ((error_num = spider_db_before_query(conn, &mon_val)))
{ {
DBUG_ASSERT(conn->mta_conn_mutex_lock_already); spider_unlock_after_query(conn, 0);
DBUG_ASSERT(conn->mta_conn_mutex_unlock_later);
conn->mta_conn_mutex_lock_already = FALSE;
conn->mta_conn_mutex_unlock_later = FALSE;
pthread_mutex_unlock(&conn->mta_conn_mutex);
spider_sys_index_end(table_tables); spider_sys_index_end(table_tables);
spider_close_sys_table(thd, table_tables, spider_close_sys_table(thd, table_tables,
&open_tables_backup, TRUE); &open_tables_backup, TRUE);
...@@ -5901,11 +5891,7 @@ int spider_open_all_tables( ...@@ -5901,11 +5891,7 @@ int spider_open_all_tables(
free_root(&mem_root, MYF(0)); free_root(&mem_root, MYF(0));
DBUG_RETURN(error_num); DBUG_RETURN(error_num);
} }
DBUG_ASSERT(conn->mta_conn_mutex_lock_already); spider_unlock_after_query(conn, 0);
DBUG_ASSERT(conn->mta_conn_mutex_unlock_later);
conn->mta_conn_mutex_lock_already = FALSE;
conn->mta_conn_mutex_unlock_later = FALSE;
pthread_mutex_unlock(&conn->mta_conn_mutex);
if (lock && spider_param_use_snapshot_with_flush_tables(thd) == 2) if (lock && spider_param_use_snapshot_with_flush_tables(thd) == 2)
{ {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment