Commit c568e253 authored by Jan Lindström's avatar Jan Lindström Committed by GitHub

Merge pull request #1185 from codership/10.4-wsrep_schema_cleanup

Cleanup wsrep_schema and remove all references to wsrep_thd_pool
parents 677a1e7c 047754a7
......@@ -303,7 +303,6 @@ extern mysql_mutex_t LOCK_wsrep_slave_threads;
extern mysql_mutex_t LOCK_wsrep_desync;
extern mysql_mutex_t LOCK_wsrep_SR_pool;
extern mysql_mutex_t LOCK_wsrep_SR_store;
extern mysql_mutex_t LOCK_wsrep_thd_pool;
extern mysql_mutex_t LOCK_wsrep_config_state;
extern my_bool wsrep_emulate_bin_log;
extern int wsrep_to_isolation;
......@@ -330,7 +329,6 @@ extern PSI_mutex_key key_LOCK_wsrep_slave_threads;
extern PSI_mutex_key key_LOCK_wsrep_desync;
extern PSI_mutex_key key_LOCK_wsrep_SR_pool;
extern PSI_mutex_key key_LOCK_wsrep_SR_store;
extern PSI_mutex_key key_LOCK_wsrep_thd_pool;
extern PSI_mutex_key key_LOCK_wsrep_global_seqno;
extern PSI_mutex_key key_LOCK_wsrep_thd_queue;
extern PSI_cond_key key_COND_wsrep_thd_queue;
......
/* Copyright (C) 2015-2017 Codership Oy <info@codership.com>
/* Copyright (C) 2015-2019 Codership Oy <info@codership.com>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
......@@ -584,8 +584,6 @@ static void wsrep_init_thd_for_schema(THD *thd)
thd->real_id=pthread_self(); // Keep purify happy
WSREP_DEBUG("Wsrep_thd_pool: creating system thread: %lld",
(long long)thd->thread_id);
thd->prior_thr_create_utime= thd->start_utime= thd->thr_create_utime;
(void) mysql_mutex_unlock(&LOCK_thread_count);
......@@ -1115,7 +1113,7 @@ int Wsrep_schema::remove_fragments(THD* thd,
DBUG_RETURN(ret);
}
int Wsrep_schema::replay_transaction(THD* thd,
int Wsrep_schema::replay_transaction(THD* orig_thd,
Relay_log_info* rli,
const wsrep::ws_meta& ws_meta,
const std::vector<wsrep::seqno>& fragments)
......@@ -1123,8 +1121,13 @@ int Wsrep_schema::replay_transaction(THD* thd,
DBUG_ENTER("Wsrep_schema::replay_transaction");
DBUG_ASSERT(!fragments.empty());
Wsrep_schema_impl::wsrep_off wsrep_off(thd);
Wsrep_schema_impl::binlog_off binlog_off(thd);
THD thd(next_thread_id(), true);
thd.thread_stack= (orig_thd ? orig_thd->thread_stack :
(char*) &thd);
Wsrep_schema_impl::wsrep_off wsrep_off(&thd);
Wsrep_schema_impl::binlog_off binlog_off(&thd);
Wsrep_schema_impl::thd_context_switch thd_context_switch(orig_thd, &thd);
int ret= 1;
int error;
......@@ -1135,11 +1138,11 @@ int Wsrep_schema::replay_transaction(THD* thd,
for (std::vector<wsrep::seqno>::const_iterator i= fragments.begin();
i != fragments.end(); ++i)
{
Wsrep_schema_impl::init_stmt(thd);
if ((error= Wsrep_schema_impl::open_for_read(thd, sr_table_str.c_str(), &frag_table)))
Wsrep_schema_impl::init_stmt(&thd);
if ((error= Wsrep_schema_impl::open_for_read(&thd, sr_table_str.c_str(), &frag_table)))
{
WSREP_WARN("Could not open SR table for read: %d", error);
Wsrep_schema_impl::finish_stmt(thd);
Wsrep_schema_impl::finish_stmt(&thd);
DBUG_RETURN(1);
}
......@@ -1169,20 +1172,28 @@ int Wsrep_schema::replay_transaction(THD* thd,
String buf;
frag_table->field[4]->val_str(&buf);
Wsrep_schema_impl::end_index_scan(frag_table);
Wsrep_schema_impl::finish_stmt(thd);
ret= wsrep_apply_events(thd, rli, buf.c_ptr_safe(), buf.length());
if (ret)
{
WSREP_WARN("Wsrep_schema::replay_transaction: failed to apply fragments");
break;
Wsrep_schema_impl::thd_context_switch thd_context_switch(&thd, orig_thd);
ret= wsrep_apply_events(orig_thd, rli, buf.c_ptr_quick(), buf.length());
if (ret)
{
WSREP_WARN("Wsrep_schema::replay_transaction: failed to apply fragments");
break;
}
}
Wsrep_schema_impl::init_stmt(thd);
if ((error= Wsrep_schema_impl::open_for_write(thd, sr_table_str.c_str(), &frag_table)))
Wsrep_schema_impl::end_index_scan(frag_table);
Wsrep_schema_impl::finish_stmt(&thd);
Wsrep_schema_impl::init_stmt(&thd);
if ((error= Wsrep_schema_impl::open_for_write(&thd,
sr_table_str.c_str(),
&frag_table)))
{
WSREP_WARN("Could not open SR table for write: %d", error);
Wsrep_schema_impl::finish_stmt(thd);
Wsrep_schema_impl::finish_stmt(&thd);
DBUG_RETURN(1);
}
error= Wsrep_schema_impl::init_for_index_scan(frag_table,
......@@ -1206,7 +1217,7 @@ int Wsrep_schema::replay_transaction(THD* thd,
break;
}
Wsrep_schema_impl::end_index_scan(frag_table);
Wsrep_schema_impl::finish_stmt(thd);
Wsrep_schema_impl::finish_stmt(&thd);
}
DBUG_RETURN(ret);
......@@ -1215,14 +1226,14 @@ int Wsrep_schema::replay_transaction(THD* thd,
int Wsrep_schema::recover_sr_transactions(THD *orig_thd)
{
DBUG_ENTER("Wsrep_schema::recover_sr_transactions");
THD storage_thd(true, true);
THD storage_thd(next_thread_id(), true);
storage_thd.thread_stack= (orig_thd ? orig_thd->thread_stack :
(char*) &storage_thd);
TABLE* frag_table= 0;
TABLE* cluster_table= 0;
Wsrep_storage_service storage_service(&storage_thd);
Wsrep_schema_impl::binlog_off binlog_off(&storage_thd);
Wsrep_schema_impl::wsrep_off binglog_off(&storage_thd);
Wsrep_schema_impl::wsrep_off wsrep_off(&storage_thd);
Wsrep_schema_impl::thd_context_switch thd_context_switch(orig_thd,
&storage_thd);
Wsrep_server_state& server_state(Wsrep_server_state::instance());
......@@ -1233,13 +1244,9 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd)
Wsrep_schema_impl::init_stmt(&storage_thd);
storage_thd.wsrep_skip_locking= FALSE;
/*
Open the table for reading and writing so that fragments without
valid seqno can be deleted.
*/
if (Wsrep_schema_impl::open_for_write(&storage_thd,
cluster_table_str.c_str(),
&cluster_table) ||
if (Wsrep_schema_impl::open_for_read(&storage_thd,
cluster_table_str.c_str(),
&cluster_table) ||
Wsrep_schema_impl::init_for_scan(cluster_table))
{
Wsrep_schema_impl::finish_stmt(&storage_thd);
......@@ -1273,10 +1280,15 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd)
storage_thd.wsrep_skip_locking= TRUE;
Wsrep_schema_impl::init_stmt(&storage_thd);
if (Wsrep_schema_impl::open_for_read(&storage_thd, sr_table_str.c_str(), &frag_table) ||
/*
Open the table for reading and writing so that fragments without
valid seqno can be deleted.
*/
if (Wsrep_schema_impl::open_for_write(&storage_thd, sr_table_str.c_str(), &frag_table) ||
Wsrep_schema_impl::init_for_scan(frag_table))
{
WSREP_ERROR("Failed to open SR table for read");
WSREP_ERROR("Failed to open SR table for write");
goto out;
}
......@@ -1309,7 +1321,7 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd)
String data_str;
(void)frag_table->field[4]->val_str(&data_str);
wsrep::const_buffer data(data_str.c_ptr(), data_str.length());
wsrep::const_buffer data(data_str.c_ptr_quick(), data_str.length());
wsrep::ws_meta ws_meta(gtid,
wsrep::stid(server_id,
transaction_id,
......@@ -1319,14 +1331,13 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd)
wsrep::high_priority_service* applier;
if (!(applier= server_state.find_streaming_applier(server_id,
transaction_id)))
transaction_id)))
{
DBUG_ASSERT(wsrep::starts_transaction(flags));
THD* thd= new THD(true, true);
THD* thd= new THD(next_thread_id(), true);
thd->thread_stack= (char*)&storage_thd;
mysql_mutex_lock(&LOCK_thread_count);
thd->thread_id= next_thread_id();
thd->real_id= pthread_self();
mysql_mutex_unlock(&LOCK_thread_count);
......
/* Copyright (C) 2015-2018 Codership Oy <info@codership.com>
/* Copyright (C) 2015-2019 Codership Oy <info@codership.com>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
......@@ -20,13 +20,9 @@
/* wsrep-lib */
#include "wsrep_types.h"
#include "mysqld.h"
#include "thr_lock.h" /* enum thr_lock_type */
#include "wsrep_mysqld.h"
#include <string>
/*
Forward decls
*/
......@@ -64,14 +60,6 @@ class Wsrep_schema
*/
Wsrep_view restore_view(THD* thd, const Wsrep_id& own_id) const;
/*
Append transaction fragment to fragment storage.
Starts a trx using a THD from thd_pool, does not commit.
Should be followed by a call to update_frag_seqno(), or
release_SR_thd() if wsrep->certify() fails.
*/
THD* append_frag(const wsrep_trx_meta_t&, uint32_t,
const unsigned char*, size_t);
/**
Append transaction fragment to fragment storage.
Transaction must have been started for THD before this call.
......@@ -145,11 +133,6 @@ class Wsrep_schema
*/
int recover_sr_transactions(THD* orig_thd);
/*
Close wsrep schema.
*/
void close();
private:
/* Non-copyable */
Wsrep_schema(const Wsrep_schema&);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment