Commit 047754a7 authored by Daniele Sciascia's avatar Daniele Sciascia

Cleanup wsrep_schema and remove all references to wsrep_thd_pool

* Removed all references related to wsrep_thd_pool (which was removed)

* Removed unused declarations in wsrep_schema.h

* The following would result invalid reads in
  Wsrep_schema::replay_transaction():
  ```
  frag_table->field[4]->val_str(&buf);

  Wsrep_schema_impl::end_index_scan(frag_table);
  Wsrep_schema_impl::finish_stmt(thd);
  ret= wsrep_apply_events(thd, rli, buf.c_ptr_safe(), buf.length());
  ```

  because `buf` was accessed after closing the table. The fix is to
  perform storage reads using a different THD.

* In Wsrep_schema::recover_sr_transactions(), cluster_table was opened
  for write, however it is only read here. And frag_table was opened
  for read, wereas write is potentially needed.
  Also, avoid copy caused by String::c_ptr() to zero terminate the c
  string, use c_ptr_quick instead.
parent a081a998
...@@ -303,7 +303,6 @@ extern mysql_mutex_t LOCK_wsrep_slave_threads; ...@@ -303,7 +303,6 @@ extern mysql_mutex_t LOCK_wsrep_slave_threads;
extern mysql_mutex_t LOCK_wsrep_desync; extern mysql_mutex_t LOCK_wsrep_desync;
extern mysql_mutex_t LOCK_wsrep_SR_pool; extern mysql_mutex_t LOCK_wsrep_SR_pool;
extern mysql_mutex_t LOCK_wsrep_SR_store; extern mysql_mutex_t LOCK_wsrep_SR_store;
extern mysql_mutex_t LOCK_wsrep_thd_pool;
extern mysql_mutex_t LOCK_wsrep_config_state; extern mysql_mutex_t LOCK_wsrep_config_state;
extern my_bool wsrep_emulate_bin_log; extern my_bool wsrep_emulate_bin_log;
extern int wsrep_to_isolation; extern int wsrep_to_isolation;
...@@ -330,7 +329,6 @@ extern PSI_mutex_key key_LOCK_wsrep_slave_threads; ...@@ -330,7 +329,6 @@ extern PSI_mutex_key key_LOCK_wsrep_slave_threads;
extern PSI_mutex_key key_LOCK_wsrep_desync; extern PSI_mutex_key key_LOCK_wsrep_desync;
extern PSI_mutex_key key_LOCK_wsrep_SR_pool; extern PSI_mutex_key key_LOCK_wsrep_SR_pool;
extern PSI_mutex_key key_LOCK_wsrep_SR_store; extern PSI_mutex_key key_LOCK_wsrep_SR_store;
extern PSI_mutex_key key_LOCK_wsrep_thd_pool;
extern PSI_mutex_key key_LOCK_wsrep_global_seqno; extern PSI_mutex_key key_LOCK_wsrep_global_seqno;
extern PSI_mutex_key key_LOCK_wsrep_thd_queue; extern PSI_mutex_key key_LOCK_wsrep_thd_queue;
extern PSI_cond_key key_COND_wsrep_thd_queue; extern PSI_cond_key key_COND_wsrep_thd_queue;
......
/* Copyright (C) 2015-2017 Codership Oy <info@codership.com> /* Copyright (C) 2015-2019 Codership Oy <info@codership.com>
This program is free software; you can redistribute it and/or modify This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
...@@ -584,8 +584,6 @@ static void wsrep_init_thd_for_schema(THD *thd) ...@@ -584,8 +584,6 @@ static void wsrep_init_thd_for_schema(THD *thd)
thd->real_id=pthread_self(); // Keep purify happy thd->real_id=pthread_self(); // Keep purify happy
WSREP_DEBUG("Wsrep_thd_pool: creating system thread: %lld",
(long long)thd->thread_id);
thd->prior_thr_create_utime= thd->start_utime= thd->thr_create_utime; thd->prior_thr_create_utime= thd->start_utime= thd->thr_create_utime;
(void) mysql_mutex_unlock(&LOCK_thread_count); (void) mysql_mutex_unlock(&LOCK_thread_count);
...@@ -1115,7 +1113,7 @@ int Wsrep_schema::remove_fragments(THD* thd, ...@@ -1115,7 +1113,7 @@ int Wsrep_schema::remove_fragments(THD* thd,
DBUG_RETURN(ret); DBUG_RETURN(ret);
} }
int Wsrep_schema::replay_transaction(THD* thd, int Wsrep_schema::replay_transaction(THD* orig_thd,
Relay_log_info* rli, Relay_log_info* rli,
const wsrep::ws_meta& ws_meta, const wsrep::ws_meta& ws_meta,
const std::vector<wsrep::seqno>& fragments) const std::vector<wsrep::seqno>& fragments)
...@@ -1123,8 +1121,13 @@ int Wsrep_schema::replay_transaction(THD* thd, ...@@ -1123,8 +1121,13 @@ int Wsrep_schema::replay_transaction(THD* thd,
DBUG_ENTER("Wsrep_schema::replay_transaction"); DBUG_ENTER("Wsrep_schema::replay_transaction");
DBUG_ASSERT(!fragments.empty()); DBUG_ASSERT(!fragments.empty());
Wsrep_schema_impl::wsrep_off wsrep_off(thd); THD thd(next_thread_id(), true);
Wsrep_schema_impl::binlog_off binlog_off(thd); thd.thread_stack= (orig_thd ? orig_thd->thread_stack :
(char*) &thd);
Wsrep_schema_impl::wsrep_off wsrep_off(&thd);
Wsrep_schema_impl::binlog_off binlog_off(&thd);
Wsrep_schema_impl::thd_context_switch thd_context_switch(orig_thd, &thd);
int ret= 1; int ret= 1;
int error; int error;
...@@ -1135,11 +1138,11 @@ int Wsrep_schema::replay_transaction(THD* thd, ...@@ -1135,11 +1138,11 @@ int Wsrep_schema::replay_transaction(THD* thd,
for (std::vector<wsrep::seqno>::const_iterator i= fragments.begin(); for (std::vector<wsrep::seqno>::const_iterator i= fragments.begin();
i != fragments.end(); ++i) i != fragments.end(); ++i)
{ {
Wsrep_schema_impl::init_stmt(thd); Wsrep_schema_impl::init_stmt(&thd);
if ((error= Wsrep_schema_impl::open_for_read(thd, sr_table_str.c_str(), &frag_table))) if ((error= Wsrep_schema_impl::open_for_read(&thd, sr_table_str.c_str(), &frag_table)))
{ {
WSREP_WARN("Could not open SR table for read: %d", error); WSREP_WARN("Could not open SR table for read: %d", error);
Wsrep_schema_impl::finish_stmt(thd); Wsrep_schema_impl::finish_stmt(&thd);
DBUG_RETURN(1); DBUG_RETURN(1);
} }
...@@ -1169,20 +1172,28 @@ int Wsrep_schema::replay_transaction(THD* thd, ...@@ -1169,20 +1172,28 @@ int Wsrep_schema::replay_transaction(THD* thd,
String buf; String buf;
frag_table->field[4]->val_str(&buf); frag_table->field[4]->val_str(&buf);
Wsrep_schema_impl::end_index_scan(frag_table);
Wsrep_schema_impl::finish_stmt(thd);
ret= wsrep_apply_events(thd, rli, buf.c_ptr_safe(), buf.length());
if (ret)
{ {
WSREP_WARN("Wsrep_schema::replay_transaction: failed to apply fragments"); Wsrep_schema_impl::thd_context_switch thd_context_switch(&thd, orig_thd);
break;
ret= wsrep_apply_events(orig_thd, rli, buf.c_ptr_quick(), buf.length());
if (ret)
{
WSREP_WARN("Wsrep_schema::replay_transaction: failed to apply fragments");
break;
}
} }
Wsrep_schema_impl::init_stmt(thd);
if ((error= Wsrep_schema_impl::open_for_write(thd, sr_table_str.c_str(), &frag_table))) Wsrep_schema_impl::end_index_scan(frag_table);
Wsrep_schema_impl::finish_stmt(&thd);
Wsrep_schema_impl::init_stmt(&thd);
if ((error= Wsrep_schema_impl::open_for_write(&thd,
sr_table_str.c_str(),
&frag_table)))
{ {
WSREP_WARN("Could not open SR table for write: %d", error); WSREP_WARN("Could not open SR table for write: %d", error);
Wsrep_schema_impl::finish_stmt(thd); Wsrep_schema_impl::finish_stmt(&thd);
DBUG_RETURN(1); DBUG_RETURN(1);
} }
error= Wsrep_schema_impl::init_for_index_scan(frag_table, error= Wsrep_schema_impl::init_for_index_scan(frag_table,
...@@ -1206,7 +1217,7 @@ int Wsrep_schema::replay_transaction(THD* thd, ...@@ -1206,7 +1217,7 @@ int Wsrep_schema::replay_transaction(THD* thd,
break; break;
} }
Wsrep_schema_impl::end_index_scan(frag_table); Wsrep_schema_impl::end_index_scan(frag_table);
Wsrep_schema_impl::finish_stmt(thd); Wsrep_schema_impl::finish_stmt(&thd);
} }
DBUG_RETURN(ret); DBUG_RETURN(ret);
...@@ -1215,14 +1226,14 @@ int Wsrep_schema::replay_transaction(THD* thd, ...@@ -1215,14 +1226,14 @@ int Wsrep_schema::replay_transaction(THD* thd,
int Wsrep_schema::recover_sr_transactions(THD *orig_thd) int Wsrep_schema::recover_sr_transactions(THD *orig_thd)
{ {
DBUG_ENTER("Wsrep_schema::recover_sr_transactions"); DBUG_ENTER("Wsrep_schema::recover_sr_transactions");
THD storage_thd(true, true); THD storage_thd(next_thread_id(), true);
storage_thd.thread_stack= (orig_thd ? orig_thd->thread_stack : storage_thd.thread_stack= (orig_thd ? orig_thd->thread_stack :
(char*) &storage_thd); (char*) &storage_thd);
TABLE* frag_table= 0; TABLE* frag_table= 0;
TABLE* cluster_table= 0; TABLE* cluster_table= 0;
Wsrep_storage_service storage_service(&storage_thd); Wsrep_storage_service storage_service(&storage_thd);
Wsrep_schema_impl::binlog_off binlog_off(&storage_thd); Wsrep_schema_impl::binlog_off binlog_off(&storage_thd);
Wsrep_schema_impl::wsrep_off binglog_off(&storage_thd); Wsrep_schema_impl::wsrep_off wsrep_off(&storage_thd);
Wsrep_schema_impl::thd_context_switch thd_context_switch(orig_thd, Wsrep_schema_impl::thd_context_switch thd_context_switch(orig_thd,
&storage_thd); &storage_thd);
Wsrep_server_state& server_state(Wsrep_server_state::instance()); Wsrep_server_state& server_state(Wsrep_server_state::instance());
...@@ -1233,13 +1244,9 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd) ...@@ -1233,13 +1244,9 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd)
Wsrep_schema_impl::init_stmt(&storage_thd); Wsrep_schema_impl::init_stmt(&storage_thd);
storage_thd.wsrep_skip_locking= FALSE; storage_thd.wsrep_skip_locking= FALSE;
/* if (Wsrep_schema_impl::open_for_read(&storage_thd,
Open the table for reading and writing so that fragments without cluster_table_str.c_str(),
valid seqno can be deleted. &cluster_table) ||
*/
if (Wsrep_schema_impl::open_for_write(&storage_thd,
cluster_table_str.c_str(),
&cluster_table) ||
Wsrep_schema_impl::init_for_scan(cluster_table)) Wsrep_schema_impl::init_for_scan(cluster_table))
{ {
Wsrep_schema_impl::finish_stmt(&storage_thd); Wsrep_schema_impl::finish_stmt(&storage_thd);
...@@ -1273,10 +1280,15 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd) ...@@ -1273,10 +1280,15 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd)
storage_thd.wsrep_skip_locking= TRUE; storage_thd.wsrep_skip_locking= TRUE;
Wsrep_schema_impl::init_stmt(&storage_thd); Wsrep_schema_impl::init_stmt(&storage_thd);
if (Wsrep_schema_impl::open_for_read(&storage_thd, sr_table_str.c_str(), &frag_table) ||
/*
Open the table for reading and writing so that fragments without
valid seqno can be deleted.
*/
if (Wsrep_schema_impl::open_for_write(&storage_thd, sr_table_str.c_str(), &frag_table) ||
Wsrep_schema_impl::init_for_scan(frag_table)) Wsrep_schema_impl::init_for_scan(frag_table))
{ {
WSREP_ERROR("Failed to open SR table for read"); WSREP_ERROR("Failed to open SR table for write");
goto out; goto out;
} }
...@@ -1309,7 +1321,7 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd) ...@@ -1309,7 +1321,7 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd)
String data_str; String data_str;
(void)frag_table->field[4]->val_str(&data_str); (void)frag_table->field[4]->val_str(&data_str);
wsrep::const_buffer data(data_str.c_ptr(), data_str.length()); wsrep::const_buffer data(data_str.c_ptr_quick(), data_str.length());
wsrep::ws_meta ws_meta(gtid, wsrep::ws_meta ws_meta(gtid,
wsrep::stid(server_id, wsrep::stid(server_id,
transaction_id, transaction_id,
...@@ -1319,14 +1331,13 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd) ...@@ -1319,14 +1331,13 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd)
wsrep::high_priority_service* applier; wsrep::high_priority_service* applier;
if (!(applier= server_state.find_streaming_applier(server_id, if (!(applier= server_state.find_streaming_applier(server_id,
transaction_id))) transaction_id)))
{ {
DBUG_ASSERT(wsrep::starts_transaction(flags)); DBUG_ASSERT(wsrep::starts_transaction(flags));
THD* thd= new THD(true, true); THD* thd= new THD(next_thread_id(), true);
thd->thread_stack= (char*)&storage_thd; thd->thread_stack= (char*)&storage_thd;
mysql_mutex_lock(&LOCK_thread_count); mysql_mutex_lock(&LOCK_thread_count);
thd->thread_id= next_thread_id();
thd->real_id= pthread_self(); thd->real_id= pthread_self();
mysql_mutex_unlock(&LOCK_thread_count); mysql_mutex_unlock(&LOCK_thread_count);
......
/* Copyright (C) 2015-2018 Codership Oy <info@codership.com> /* Copyright (C) 2015-2019 Codership Oy <info@codership.com>
This program is free software; you can redistribute it and/or modify This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
...@@ -20,13 +20,9 @@ ...@@ -20,13 +20,9 @@
/* wsrep-lib */ /* wsrep-lib */
#include "wsrep_types.h" #include "wsrep_types.h"
#include "mysqld.h" #include "mysqld.h"
#include "thr_lock.h" /* enum thr_lock_type */
#include "wsrep_mysqld.h" #include "wsrep_mysqld.h"
#include <string>
/* /*
Forward decls Forward decls
*/ */
...@@ -64,14 +60,6 @@ class Wsrep_schema ...@@ -64,14 +60,6 @@ class Wsrep_schema
*/ */
Wsrep_view restore_view(THD* thd, const Wsrep_id& own_id) const; Wsrep_view restore_view(THD* thd, const Wsrep_id& own_id) const;
/*
Append transaction fragment to fragment storage.
Starts a trx using a THD from thd_pool, does not commit.
Should be followed by a call to update_frag_seqno(), or
release_SR_thd() if wsrep->certify() fails.
*/
THD* append_frag(const wsrep_trx_meta_t&, uint32_t,
const unsigned char*, size_t);
/** /**
Append transaction fragment to fragment storage. Append transaction fragment to fragment storage.
Transaction must have been started for THD before this call. Transaction must have been started for THD before this call.
...@@ -145,11 +133,6 @@ class Wsrep_schema ...@@ -145,11 +133,6 @@ class Wsrep_schema
*/ */
int recover_sr_transactions(THD* orig_thd); int recover_sr_transactions(THD* orig_thd);
/*
Close wsrep schema.
*/
void close();
private: private:
/* Non-copyable */ /* Non-copyable */
Wsrep_schema(const Wsrep_schema&); Wsrep_schema(const Wsrep_schema&);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment