Commit b91ad17c authored by unknown's avatar unknown

MWL#116: Code simplifications for TC_LOG_MMAP.

Make TC_LOG_MMAP (and TC_LOG_DUMMY) derive directly from TC_LOG, avoiding the
inheritance hierarchy TC_LOG_queued->TC_LOG_unordered.

Put the wakeup facility for commit_ordered() calls into the THD class.

Some renaming to get better names.
parent b357fca4
...@@ -5778,43 +5778,12 @@ TC_LOG_queued::reverse_queue(TC_LOG_queued::TC_group_commit_entry *queue) ...@@ -5778,43 +5778,12 @@ TC_LOG_queued::reverse_queue(TC_LOG_queued::TC_group_commit_entry *queue)
return prev; return prev;
} }
void int TC_LOG_MMAP::log_and_order(THD *thd, my_xid xid, bool all,
TC_LOG_queued::group_commit_wait_for_wakeup(TC_group_commit_entry *entry) bool need_prepare_ordered,
{ bool need_commit_ordered)
THD *thd= entry->thd;
pthread_mutex_lock(&thd->LOCK_commit_ordered);
while (!entry->group_commit_ready)
pthread_cond_wait(&thd->COND_commit_ordered,
&thd->LOCK_commit_ordered);
pthread_mutex_unlock(&thd->LOCK_commit_ordered);
}
void
TC_LOG_queued::group_commit_wakeup_other(TC_group_commit_entry *other)
{
THD *thd= other->thd;
pthread_mutex_lock(&thd->LOCK_commit_ordered);
other->group_commit_ready= TRUE;
pthread_cond_signal(&thd->COND_commit_ordered);
pthread_mutex_unlock(&thd->LOCK_commit_ordered);
}
TC_LOG_unordered::TC_LOG_unordered() : group_commit_queue_busy(0)
{
pthread_cond_init(&COND_queue_busy, 0);
}
TC_LOG_unordered::~TC_LOG_unordered()
{
pthread_cond_destroy(&COND_queue_busy);
}
int TC_LOG_unordered::log_and_order(THD *thd, my_xid xid, bool all,
bool need_prepare_ordered,
bool need_commit_ordered)
{ {
int cookie; int cookie;
struct TC_group_commit_entry entry; struct commit_entry entry;
bool is_group_commit_leader; bool is_group_commit_leader;
LINT_INIT(is_group_commit_leader); LINT_INIT(is_group_commit_leader);
...@@ -5828,18 +5797,18 @@ int TC_LOG_unordered::log_and_order(THD *thd, my_xid xid, bool all, ...@@ -5828,18 +5797,18 @@ int TC_LOG_unordered::log_and_order(THD *thd, my_xid xid, bool all,
Must put us in queue so we can run_commit_ordered() in same sequence Must put us in queue so we can run_commit_ordered() in same sequence
as we did run_prepare_ordered(). as we did run_prepare_ordered().
*/ */
thd->clear_wakeup_ready();
entry.thd= thd; entry.thd= thd;
entry.group_commit_ready= false; commit_entry *previous_queue= commit_ordered_queue;
TC_group_commit_entry *previous_queue= group_commit_queue;
entry.next= previous_queue; entry.next= previous_queue;
group_commit_queue= &entry; commit_ordered_queue= &entry;
is_group_commit_leader= (previous_queue == NULL); is_group_commit_leader= (previous_queue == NULL);
} }
pthread_mutex_unlock(&LOCK_prepare_ordered); pthread_mutex_unlock(&LOCK_prepare_ordered);
} }
if (xid) if (xid)
cookie= log_xid(thd, xid); cookie= log_one_transaction(xid);
else else
cookie= 0; cookie= 0;
...@@ -5859,24 +5828,32 @@ int TC_LOG_unordered::log_and_order(THD *thd, my_xid xid, bool all, ...@@ -5859,24 +5828,32 @@ int TC_LOG_unordered::log_and_order(THD *thd, my_xid xid, bool all,
{ {
/* The first in queue starts the ball rolling. */ /* The first in queue starts the ball rolling. */
pthread_mutex_lock(&LOCK_prepare_ordered); pthread_mutex_lock(&LOCK_prepare_ordered);
while (group_commit_queue_busy) while (commit_ordered_queue_busy)
pthread_cond_wait(&COND_queue_busy, &LOCK_prepare_ordered); pthread_cond_wait(&COND_queue_busy, &LOCK_prepare_ordered);
TC_group_commit_entry *queue= group_commit_queue; commit_entry *queue= commit_ordered_queue;
group_commit_queue= NULL; commit_ordered_queue= NULL;
/* /*
Mark the queue busy while we bounce it from one thread to the Mark the queue busy while we bounce it from one thread to the
next. next.
*/ */
group_commit_queue_busy= TRUE; commit_ordered_queue_busy= true;
pthread_mutex_unlock(&LOCK_prepare_ordered); pthread_mutex_unlock(&LOCK_prepare_ordered);
queue= reverse_queue(queue); /* Reverse the queue list so we get correct order. */
DBUG_ASSERT(queue == &entry && queue->thd == thd); commit_entry *prev= NULL;
while (queue)
{
commit_entry *next= queue->next;
queue->next= prev;
prev= queue;
queue= next;
}
DBUG_ASSERT(prev == &entry && prev->thd == thd);
} }
else else
{ {
/* Not first in queue; just wait until previous thread wakes us up. */ /* Not first in queue; just wait until previous thread wakes us up. */
group_commit_wait_for_wakeup(&entry); thd->wait_for_wakeup_ready();
} }
} }
...@@ -5890,15 +5867,15 @@ int TC_LOG_unordered::log_and_order(THD *thd, my_xid xid, bool all, ...@@ -5890,15 +5867,15 @@ int TC_LOG_unordered::log_and_order(THD *thd, my_xid xid, bool all,
if (need_prepare_ordered) if (need_prepare_ordered)
{ {
TC_group_commit_entry *next= entry.next; commit_entry *next= entry.next;
if (next) if (next)
{ {
group_commit_wakeup_other(next); next->thd->signal_wakeup_ready();
} }
else else
{ {
pthread_mutex_lock(&LOCK_prepare_ordered); pthread_mutex_lock(&LOCK_prepare_ordered);
group_commit_queue_busy= FALSE; commit_ordered_queue_busy= false;
pthread_cond_signal(&COND_queue_busy); pthread_cond_signal(&COND_queue_busy);
pthread_mutex_unlock(&LOCK_prepare_ordered); pthread_mutex_unlock(&LOCK_prepare_ordered);
} }
...@@ -5940,9 +5917,9 @@ int TC_LOG_group_commit::log_and_order(THD *thd, my_xid xid, bool all, ...@@ -5940,9 +5917,9 @@ int TC_LOG_group_commit::log_and_order(THD *thd, my_xid xid, bool all,
struct TC_group_commit_entry entry; struct TC_group_commit_entry entry;
bool is_group_commit_leader; bool is_group_commit_leader;
thd->clear_wakeup_ready();
entry.thd= thd; entry.thd= thd;
entry.all= all; entry.all= all;
entry.group_commit_ready= false;
entry.xid_error= 0; entry.xid_error= 0;
pthread_mutex_lock(&LOCK_prepare_ordered); pthread_mutex_lock(&LOCK_prepare_ordered);
...@@ -6019,7 +5996,7 @@ int TC_LOG_group_commit::log_and_order(THD *thd, my_xid xid, bool all, ...@@ -6019,7 +5996,7 @@ int TC_LOG_group_commit::log_and_order(THD *thd, my_xid xid, bool all,
*/ */
TC_group_commit_entry *next= current->next; TC_group_commit_entry *next= current->next;
if (current != &entry) // Don't wake up ourself if (current != &entry) // Don't wake up ourself
group_commit_wakeup_other(current); current->thd->signal_wakeup_ready();
current= next; current= next;
} while (current != NULL); } while (current != NULL);
DEBUG_SYNC(thd, "commit_after_group_run_commit_ordered"); DEBUG_SYNC(thd, "commit_after_group_run_commit_ordered");
...@@ -6029,7 +6006,7 @@ int TC_LOG_group_commit::log_and_order(THD *thd, my_xid xid, bool all, ...@@ -6029,7 +6006,7 @@ int TC_LOG_group_commit::log_and_order(THD *thd, my_xid xid, bool all,
else else
{ {
/* If not leader, just wait until leader wakes us up. */ /* If not leader, just wait until leader wakes us up. */
group_commit_wait_for_wakeup(&entry); thd->wait_for_wakeup_ready();
} }
/* /*
...@@ -6181,6 +6158,7 @@ int TC_LOG_MMAP::open(const char *opt_name) ...@@ -6181,6 +6158,7 @@ int TC_LOG_MMAP::open(const char *opt_name)
pthread_mutex_init(&LOCK_pool, MY_MUTEX_INIT_FAST); pthread_mutex_init(&LOCK_pool, MY_MUTEX_INIT_FAST);
pthread_cond_init(&COND_active, 0); pthread_cond_init(&COND_active, 0);
pthread_cond_init(&COND_pool, 0); pthread_cond_init(&COND_pool, 0);
pthread_cond_init(&COND_queue_busy, 0);
inited=6; inited=6;
...@@ -6188,6 +6166,8 @@ int TC_LOG_MMAP::open(const char *opt_name) ...@@ -6188,6 +6166,8 @@ int TC_LOG_MMAP::open(const char *opt_name)
active=pages; active=pages;
pool=pages+1; pool=pages+1;
pool_last=pages+npages-1; pool_last=pages+npages-1;
commit_ordered_queue= NULL;
commit_ordered_queue_busy= false;
return 0; return 0;
...@@ -6293,7 +6273,7 @@ int TC_LOG_MMAP::overflow() ...@@ -6293,7 +6273,7 @@ int TC_LOG_MMAP::overflow()
to the position in memory where xid was logged to. to the position in memory where xid was logged to.
*/ */
int TC_LOG_MMAP::log_xid(THD *thd, my_xid xid) int TC_LOG_MMAP::log_one_transaction(my_xid xid)
{ {
int err; int err;
PAGE *p; PAGE *p;
...@@ -6462,6 +6442,8 @@ void TC_LOG_MMAP::close() ...@@ -6462,6 +6442,8 @@ void TC_LOG_MMAP::close()
pthread_mutex_destroy(&LOCK_active); pthread_mutex_destroy(&LOCK_active);
pthread_mutex_destroy(&LOCK_pool); pthread_mutex_destroy(&LOCK_pool);
pthread_cond_destroy(&COND_pool); pthread_cond_destroy(&COND_pool);
pthread_cond_destroy(&COND_active);
pthread_cond_destroy(&COND_queue_busy);
case 5: case 5:
data[0]='A'; // garble the first (signature) byte, in case my_delete fails data[0]='A'; // garble the first (signature) byte, in case my_delete fails
case 4: case 4:
......
...@@ -91,11 +91,6 @@ class TC_LOG_queued: public TC_LOG ...@@ -91,11 +91,6 @@ class TC_LOG_queued: public TC_LOG
THD *thd; THD *thd;
/* This is the `all' parameter for ha_commit_trans() etc. */ /* This is the `all' parameter for ha_commit_trans() etc. */
bool all; bool all;
/*
Flag set true when it is time for this thread to wake up after group
commit. Used with THD::LOCK_commit_ordered and THD::COND_commit_ordered.
*/
bool group_commit_ready;
/* /*
Set by TC_LOG_group_commit::group_log_xid(), to return per-thd error and Set by TC_LOG_group_commit::group_log_xid(), to return per-thd error and
cookie. cookie.
...@@ -105,9 +100,6 @@ class TC_LOG_queued: public TC_LOG ...@@ -105,9 +100,6 @@ class TC_LOG_queued: public TC_LOG
TC_group_commit_entry * reverse_queue(TC_group_commit_entry *queue); TC_group_commit_entry * reverse_queue(TC_group_commit_entry *queue);
void group_commit_wait_for_wakeup(TC_group_commit_entry *entry);
void group_commit_wakeup_other(TC_group_commit_entry *other);
/* /*
This is a queue of threads waiting for being allowed to commit. This is a queue of threads waiting for being allowed to commit.
Access to the queue must be protected by LOCK_prepare_ordered. Access to the queue must be protected by LOCK_prepare_ordered.
...@@ -115,36 +107,6 @@ class TC_LOG_queued: public TC_LOG ...@@ -115,36 +107,6 @@ class TC_LOG_queued: public TC_LOG
TC_group_commit_entry *group_commit_queue; TC_group_commit_entry *group_commit_queue;
}; };
class TC_LOG_unordered: public TC_LOG_queued
{
public:
TC_LOG_unordered();
~TC_LOG_unordered();
int log_and_order(THD *thd, my_xid xid, bool all,
bool need_prepare_ordered, bool need_commit_ordered);
protected:
virtual int log_xid(THD *thd, my_xid xid)=0;
private:
/*
This flag and condition is used to reserve the queue while threads in it
each run the commit_ordered() methods one after the other. Only once the
last commit_ordered() in the queue is done can we start on a new queue
run.
Since we start this process in the first thread in the queue and finish in
the last (and possibly different) thread, we need a condition variable for
this (we cannot unlock a mutex in a different thread than the one who
locked it).
The condition is used together with the LOCK_prepare_ordered mutex.
*/
my_bool group_commit_queue_busy;
pthread_cond_t COND_queue_busy;
};
class TC_LOG_group_commit: public TC_LOG_queued class TC_LOG_group_commit: public TC_LOG_queued
{ {
public: public:
...@@ -206,18 +168,28 @@ class TC_LOG_group_commit: public TC_LOG_queued ...@@ -206,18 +168,28 @@ class TC_LOG_group_commit: public TC_LOG_queued
pthread_mutex_t LOCK_group_commit; pthread_mutex_t LOCK_group_commit;
}; };
class TC_LOG_DUMMY: public TC_LOG_unordered // use it to disable the logging class TC_LOG_DUMMY: public TC_LOG // use it to disable the logging
{ {
public: public:
TC_LOG_DUMMY() {} TC_LOG_DUMMY() {}
int open(const char *opt_name) { return 0; } int open(const char *opt_name) { return 0; }
void close() { } void close() { }
int log_xid(THD *thd, my_xid xid) { return 1; } /*
TC_LOG_DUMMY is only used when there are <= 1 XA-capable engines, and we
only use internal XA during commit when >= 2 XA-capable engines
participate.
*/
int log_and_order(THD *thd, my_xid xid, bool all,
bool need_prepare_ordered, bool need_commit_ordered)
{
DBUG_ASSERT(0 /* Internal error - TC_LOG_DUMMY::log_and_order() called */);
return 1;
}
void unlog(ulong cookie, my_xid xid) { } void unlog(ulong cookie, my_xid xid) { }
}; };
#ifdef HAVE_MMAP #ifdef HAVE_MMAP
class TC_LOG_MMAP: public TC_LOG_unordered class TC_LOG_MMAP: public TC_LOG
{ {
public: // only to keep Sun Forte on sol9x86 happy public: // only to keep Sun Forte on sol9x86 happy
typedef enum { typedef enum {
...@@ -238,6 +210,13 @@ class TC_LOG_MMAP: public TC_LOG_unordered ...@@ -238,6 +210,13 @@ class TC_LOG_MMAP: public TC_LOG_unordered
pthread_cond_t cond; // to wait for a sync pthread_cond_t cond; // to wait for a sync
} PAGE; } PAGE;
/* List of THDs for which to invoke commit_ordered(), in order. */
struct commit_entry
{
struct commit_entry *next;
THD *thd;
};
char logname[FN_REFLEN]; char logname[FN_REFLEN];
File fd; File fd;
my_off_t file_length; my_off_t file_length;
...@@ -252,16 +231,38 @@ class TC_LOG_MMAP: public TC_LOG_unordered ...@@ -252,16 +231,38 @@ class TC_LOG_MMAP: public TC_LOG_unordered
*/ */
pthread_mutex_t LOCK_active, LOCK_pool, LOCK_sync; pthread_mutex_t LOCK_active, LOCK_pool, LOCK_sync;
pthread_cond_t COND_pool, COND_active; pthread_cond_t COND_pool, COND_active;
/*
Queue of threads that need to call commit_ordered().
Access to this queue must be protected by LOCK_prepare_ordered.
*/
commit_entry *commit_ordered_queue;
/*
This flag and condition is used to reserve the queue while threads in it
each run the commit_ordered() methods one after the other. Only once the
last commit_ordered() in the queue is done can we start on a new queue
run.
Since we start this process in the first thread in the queue and finish in
the last (and possibly different) thread, we need a condition variable for
this (we cannot unlock a mutex in a different thread than the one who
locked it).
The condition is used together with the LOCK_prepare_ordered mutex.
*/
my_bool commit_ordered_queue_busy;
pthread_cond_t COND_queue_busy;
public: public:
TC_LOG_MMAP(): inited(0) {} TC_LOG_MMAP(): inited(0) {}
int open(const char *opt_name); int open(const char *opt_name);
void close(); void close();
int log_xid(THD *thd, my_xid xid); int log_and_order(THD *thd, my_xid xid, bool all,
bool need_prepare_ordered, bool need_commit_ordered);
void unlog(ulong cookie, my_xid xid); void unlog(ulong cookie, my_xid xid);
int recover(); int recover();
private: private:
int log_one_transaction(my_xid xid);
void get_active_from_pool(); void get_active_from_pool();
int sync(); int sync();
int overflow(); int overflow();
......
...@@ -704,8 +704,8 @@ THD::THD() ...@@ -704,8 +704,8 @@ THD::THD()
active_vio = 0; active_vio = 0;
#endif #endif
pthread_mutex_init(&LOCK_thd_data, MY_MUTEX_INIT_FAST); pthread_mutex_init(&LOCK_thd_data, MY_MUTEX_INIT_FAST);
pthread_mutex_init(&LOCK_commit_ordered, MY_MUTEX_INIT_FAST); pthread_mutex_init(&LOCK_wakeup_ready, MY_MUTEX_INIT_FAST);
pthread_cond_init(&COND_commit_ordered, 0); pthread_cond_init(&COND_wakeup_ready, 0);
/* Variables with default values */ /* Variables with default values */
proc_info="login"; proc_info="login";
...@@ -1037,8 +1037,8 @@ THD::~THD() ...@@ -1037,8 +1037,8 @@ THD::~THD()
free_root(&transaction.mem_root,MYF(0)); free_root(&transaction.mem_root,MYF(0));
#endif #endif
mysys_var=0; // Safety (shouldn't be needed) mysys_var=0; // Safety (shouldn't be needed)
pthread_cond_destroy(&COND_commit_ordered); pthread_cond_destroy(&COND_wakeup_ready);
pthread_mutex_destroy(&LOCK_commit_ordered); pthread_mutex_destroy(&LOCK_wakeup_ready);
pthread_mutex_destroy(&LOCK_thd_data); pthread_mutex_destroy(&LOCK_thd_data);
#ifndef DBUG_OFF #ifndef DBUG_OFF
dbug_sentry= THD_SENTRY_GONE; dbug_sentry= THD_SENTRY_GONE;
...@@ -4009,6 +4009,25 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg, ...@@ -4009,6 +4009,25 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg,
DBUG_RETURN(0); DBUG_RETURN(0);
} }
void
THD::wait_for_wakeup_ready()
{
pthread_mutex_lock(&LOCK_wakeup_ready);
while (!wakeup_ready)
pthread_cond_wait(&COND_wakeup_ready, &LOCK_wakeup_ready);
pthread_mutex_unlock(&LOCK_wakeup_ready);
}
void
THD::signal_wakeup_ready()
{
pthread_mutex_lock(&LOCK_wakeup_ready);
wakeup_ready= true;
pthread_cond_signal(&COND_wakeup_ready);
pthread_mutex_unlock(&LOCK_wakeup_ready);
}
bool Discrete_intervals_list::append(ulonglong start, ulonglong val, bool Discrete_intervals_list::append(ulonglong start, ulonglong val,
ulonglong incr) ulonglong incr)
{ {
......
...@@ -1447,10 +1447,6 @@ class THD :public Statement, ...@@ -1447,10 +1447,6 @@ class THD :public Statement,
/* container for handler's private per-connection data */ /* container for handler's private per-connection data */
Ha_data ha_data[MAX_HA]; Ha_data ha_data[MAX_HA];
/* Mutex and condition for waking up threads after group commit. */
pthread_mutex_t LOCK_commit_ordered;
pthread_cond_t COND_commit_ordered;
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
int binlog_setup_trx_data(); int binlog_setup_trx_data();
...@@ -2380,6 +2376,14 @@ class THD :public Statement, ...@@ -2380,6 +2376,14 @@ class THD :public Statement,
LEX_STRING get_invoker_user() { return invoker_user; } LEX_STRING get_invoker_user() { return invoker_user; }
LEX_STRING get_invoker_host() { return invoker_host; } LEX_STRING get_invoker_host() { return invoker_host; }
bool has_invoker() { return invoker_user.length > 0; } bool has_invoker() { return invoker_user.length > 0; }
void clear_wakeup_ready() { wakeup_ready= false; }
/*
Sleep waiting for others to wake us up with signal_wakeup_ready().
Must call clear_wakeup_ready() before waiting.
*/
void wait_for_wakeup_ready();
/* Wake this thread up from wait_for_wakeup_ready(). */
void signal_wakeup_ready();
private: private:
/** The current internal error handler for this thread, or NULL. */ /** The current internal error handler for this thread, or NULL. */
Internal_error_handler *m_internal_handler; Internal_error_handler *m_internal_handler;
...@@ -2418,6 +2422,16 @@ class THD :public Statement, ...@@ -2418,6 +2422,16 @@ class THD :public Statement,
*/ */
LEX_STRING invoker_user; LEX_STRING invoker_user;
LEX_STRING invoker_host; LEX_STRING invoker_host;
/*
Flag, mutex and condition for a thread to wait for a signal from another
thread.
Currently used to wait for group commit to complete, can also be used for
other purposes.
*/
bool wakeup_ready;
pthread_mutex_t LOCK_wakeup_ready;
pthread_cond_t COND_wakeup_ready;
}; };
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment