Commit a00824b5 authored by unknown's avatar unknown

Cset exclude: marko@hundin.mysql.fi|ChangeSet|20040525171209|56870


BitKeeper/deleted/.del-srv0que.c~d1feebb77b5a9b96:
  Exclude
BitKeeper/deleted/.del-srv0que.h~f12ecb4b5afe203e:
  Exclude
innobase/include/que0que.h:
  Exclude
innobase/include/trx0roll.h:
  Exclude
innobase/include/trx0trx.h:
  Exclude
innobase/include/usr0sess.h:
  Exclude
innobase/que/que0que.c:
  Exclude
innobase/srv/Makefile.am:
  Exclude
innobase/include/Makefile.am:
  Exclude
innobase/srv/srv0srv.c:
  Exclude
innobase/trx/trx0purge.c:
  Exclude
innobase/trx/trx0roll.c:
  Exclude
innobase/trx/trx0trx.c:
  Exclude
innobase/usr/usr0sess.c:
  Exclude
parent 6a99971f
...@@ -43,7 +43,7 @@ noinst_HEADERS = btr0btr.h btr0btr.ic btr0cur.h btr0cur.ic \ ...@@ -43,7 +43,7 @@ noinst_HEADERS = btr0btr.h btr0btr.ic btr0cur.h btr0cur.ic \
row0purge.ic row0row.h row0row.ic row0sel.h row0sel.ic \ row0purge.ic row0row.h row0row.ic row0sel.h row0sel.ic \
row0types.h row0uins.h row0uins.ic row0umod.h row0umod.ic \ row0types.h row0uins.h row0uins.ic row0umod.h row0umod.ic \
row0undo.h row0undo.ic row0upd.h row0upd.ic row0vers.h \ row0undo.h row0undo.ic row0upd.h row0upd.ic row0vers.h \
row0vers.ic srv0srv.h srv0srv.ic srv0start.h \ row0vers.ic srv0que.h srv0srv.h srv0srv.ic srv0start.h \
sync0arr.h sync0arr.ic sync0rw.h \ sync0arr.h sync0arr.ic sync0rw.h \
sync0rw.ic sync0sync.h sync0sync.ic sync0types.h \ sync0rw.ic sync0sync.h sync0sync.ic sync0types.h \
thr0loc.h thr0loc.ic trx0purge.h trx0purge.ic trx0rec.h \ thr0loc.h thr0loc.ic trx0purge.h trx0purge.ic trx0rec.h \
......
...@@ -152,6 +152,17 @@ que_run_threads( ...@@ -152,6 +152,17 @@ que_run_threads(
/*============*/ /*============*/
que_thr_t* thr); /* in: query thread which is run initially */ que_thr_t* thr); /* in: query thread which is run initially */
/************************************************************************** /**************************************************************************
After signal handling is finished, returns control to a query graph error
handling routine. (Currently, just returns the control to the root of the
graph so that the graph can communicate an error message to the client.) */
void
que_fork_error_handle(
/*==================*/
trx_t* trx, /* in: trx */
que_t* fork); /* in: query graph which was run before signal
handling started, NULL not allowed */
/**************************************************************************
Handles an SQL error noticed during query thread execution. At the moment, Handles an SQL error noticed during query thread execution. At the moment,
does nothing! */ does nothing! */
...@@ -170,15 +181,18 @@ a single worker thread to execute it. This function should be used to end ...@@ -170,15 +181,18 @@ a single worker thread to execute it. This function should be used to end
the wait state of a query thread waiting for a lock or a stored procedure the wait state of a query thread waiting for a lock or a stored procedure
completion. */ completion. */
que_thr_t* void
que_thr_end_wait( que_thr_end_wait(
/*=============*/ /*=============*/
/* out: next query thread to run; que_thr_t* thr, /* in: query thread in the
NULL if none */
que_thr_t* thr); /* in: query thread in the
QUE_THR_LOCK_WAIT, QUE_THR_LOCK_WAIT,
or QUE_THR_PROCEDURE_WAIT, or or QUE_THR_PROCEDURE_WAIT, or
QUE_THR_SIG_REPLY_WAIT state */ QUE_THR_SIG_REPLY_WAIT state */
que_thr_t** next_thr); /* in/out: next query thread to run;
if the value which is passed in is
a pointer to a NULL pointer, then the
calling function can start running
a new query thread */
/************************************************************************** /**************************************************************************
Same as que_thr_end_wait, but no parameter next_thr available. */ Same as que_thr_end_wait, but no parameter next_thr available. */
......
...@@ -91,12 +91,16 @@ trx_undo_rec_release( ...@@ -91,12 +91,16 @@ trx_undo_rec_release(
/************************************************************************* /*************************************************************************
Starts a rollback operation. */ Starts a rollback operation. */
que_thr_t* void
trx_rollback( trx_rollback(
/*=========*/ /*=========*/
/* out: next query thread to run */
trx_t* trx, /* in: transaction */ trx_t* trx, /* in: transaction */
trx_sig_t* sig); /* in: signal starting the rollback */ trx_sig_t* sig, /* in: signal starting the rollback */
que_thr_t** next_thr);/* in/out: next query thread to run;
if the value which is passed in is
a pointer to a NULL pointer, then the
calling function can start running
a new query thread */
/*********************************************************************** /***********************************************************************
Rollback or clean up transactions which have no user session. If the Rollback or clean up transactions which have no user session. If the
transaction already was committed, then we clean up a possible insert transaction already was committed, then we clean up a possible insert
...@@ -108,12 +112,17 @@ trx_rollback_or_clean_all_without_sess(void); ...@@ -108,12 +112,17 @@ trx_rollback_or_clean_all_without_sess(void);
/******************************************************************** /********************************************************************
Finishes a transaction rollback. */ Finishes a transaction rollback. */
que_thr_t* void
trx_finish_rollback_off_kernel( trx_finish_rollback_off_kernel(
/*===========================*/ /*===========================*/
/* out: next query thread to run */
que_t* graph, /* in: undo graph which can now be freed */ que_t* graph, /* in: undo graph which can now be freed */
trx_t* trx); /* in: transaction */ trx_t* trx, /* in: transaction */
que_thr_t** next_thr);/* in/out: next query thread to run;
if the value which is passed in is
a pointer to a NULL pointer, then the
calling function can start running
a new query thread; if this parameter is
NULL, it is ignored */
/******************************************************************** /********************************************************************
Builds an undo 'query' graph for a transaction. The actual rollback is Builds an undo 'query' graph for a transaction. The actual rollback is
performed by executing this query graph like a query subprocedure call. performed by executing this query graph like a query subprocedure call.
......
...@@ -194,10 +194,9 @@ trx_end_lock_wait( ...@@ -194,10 +194,9 @@ trx_end_lock_wait(
/******************************************************************** /********************************************************************
Sends a signal to a trx object. */ Sends a signal to a trx object. */
que_thr_t* ibool
trx_sig_send( trx_sig_send(
/*=========*/ /*=========*/
/* out: next query thread to run */
/* out: TRUE if the signal was /* out: TRUE if the signal was
successfully delivered */ successfully delivered */
trx_t* trx, /* in: trx handle */ trx_t* trx, /* in: trx handle */
...@@ -207,17 +206,27 @@ trx_sig_send( ...@@ -207,17 +206,27 @@ trx_sig_send(
que_thr_t* receiver_thr, /* in: query thread which wants the que_thr_t* receiver_thr, /* in: query thread which wants the
reply, or NULL; if type is reply, or NULL; if type is
TRX_SIG_END_WAIT, this must be NULL */ TRX_SIG_END_WAIT, this must be NULL */
trx_savept_t* savept); /* in: possible rollback savepoint, or trx_savept_t* savept, /* in: possible rollback savepoint, or
NULL */ NULL */
que_thr_t** next_thr); /* in/out: next query thread to run;
if the value which is passed in is
a pointer to a NULL pointer, then the
calling function can start running
a new query thread; if the parameter
is NULL, it is ignored */
/******************************************************************** /********************************************************************
Send the reply message when a signal in the queue of the trx has Send the reply message when a signal in the queue of the trx has
been handled. */ been handled. */
que_thr_t* void
trx_sig_reply( trx_sig_reply(
/*==========*/ /*==========*/
/* out: next query thread to run */ trx_sig_t* sig, /* in: signal */
trx_sig_t* sig); /* in: signal */ que_thr_t** next_thr); /* in/out: next query thread to run;
if the value which is passed in is
a pointer to a NULL pointer, then the
calling function can start running
a new query thread */
/******************************************************************** /********************************************************************
Removes the signal object from a trx signal queue. */ Removes the signal object from a trx signal queue. */
...@@ -229,11 +238,15 @@ trx_sig_remove( ...@@ -229,11 +238,15 @@ trx_sig_remove(
/******************************************************************** /********************************************************************
Starts handling of a trx signal. */ Starts handling of a trx signal. */
que_thr_t* void
trx_sig_start_handle( trx_sig_start_handle(
/*=================*/ /*=================*/
/* out: next query thread to run, or NULL */ trx_t* trx, /* in: trx handle */
trx_t* trx); /* in: trx handle */ que_thr_t** next_thr); /* in/out: next query thread to run;
if the value which is passed in is
a pointer to a NULL pointer, then the
calling function can start running
a new query thread */
/******************************************************************** /********************************************************************
Ends signal handling. If the session is in the error state, and Ends signal handling. If the session is in the error state, and
trx->graph_before_signal_handling != NULL, returns control to the error trx->graph_before_signal_handling != NULL, returns control to the error
......
...@@ -38,6 +38,7 @@ sess_try_close( ...@@ -38,6 +38,7 @@ sess_try_close(
/* The session handle. All fields are protected by the kernel mutex */ /* The session handle. All fields are protected by the kernel mutex */
struct sess_struct{ struct sess_struct{
ulint state; /* state of the session */
trx_t* trx; /* transaction object permanently trx_t* trx; /* transaction object permanently
assigned for the session: the assigned for the session: the
transaction instance designated by the transaction instance designated by the
...@@ -48,6 +49,11 @@ struct sess_struct{ ...@@ -48,6 +49,11 @@ struct sess_struct{
session */ session */
}; };
/* Session states */
#define SESS_ACTIVE 1
#define SESS_ERROR 2 /* session contains an error message
which has not yet been communicated
to the client */
#ifndef UNIV_NONINL #ifndef UNIV_NONINL
#include "usr0sess.ic" #include "usr0sess.ic"
#endif #endif
......
...@@ -12,6 +12,7 @@ Created 5/27/1996 Heikki Tuuri ...@@ -12,6 +12,7 @@ Created 5/27/1996 Heikki Tuuri
#include "que0que.ic" #include "que0que.ic"
#endif #endif
#include "srv0que.h"
#include "usr0sess.h" #include "usr0sess.h"
#include "trx0trx.h" #include "trx0trx.h"
#include "trx0roll.h" #include "trx0roll.h"
...@@ -174,15 +175,19 @@ a single worker thread to execute it. This function should be used to end ...@@ -174,15 +175,19 @@ a single worker thread to execute it. This function should be used to end
the wait state of a query thread waiting for a lock or a stored procedure the wait state of a query thread waiting for a lock or a stored procedure
completion. */ completion. */
que_thr_t* void
que_thr_end_wait( que_thr_end_wait(
/*=============*/ /*=============*/
/* out: next query thread to run; que_thr_t* thr, /* in: query thread in the
NULL if none */
que_thr_t* thr) /* in: query thread in the
QUE_THR_LOCK_WAIT, QUE_THR_LOCK_WAIT,
or QUE_THR_PROCEDURE_WAIT, or or QUE_THR_PROCEDURE_WAIT, or
QUE_THR_SIG_REPLY_WAIT state */ QUE_THR_SIG_REPLY_WAIT state */
que_thr_t** next_thr) /* in/out: next query thread to run;
if the value which is passed in is
a pointer to a NULL pointer, then the
calling function can start running
a new query thread; if NULL is passed
as the parameter, it is ignored */
{ {
ibool was_active; ibool was_active;
...@@ -201,8 +206,17 @@ que_thr_end_wait( ...@@ -201,8 +206,17 @@ que_thr_end_wait(
que_thr_move_to_run_state(thr); que_thr_move_to_run_state(thr);
return(was_active ? NULL : thr); if (was_active) {
}
return;
}
if (next_thr && *next_thr == NULL) {
*next_thr = thr;
} else {
srv_que_task_enqueue_low(thr);
}
}
/************************************************************************** /**************************************************************************
Same as que_thr_end_wait, but no parameter next_thr available. */ Same as que_thr_end_wait, but no parameter next_thr available. */
...@@ -239,6 +253,8 @@ que_thr_end_wait_no_next_thr( ...@@ -239,6 +253,8 @@ que_thr_end_wait_no_next_thr(
for the lock to be released: */ for the lock to be released: */
srv_release_mysql_thread_if_suspended(thr); srv_release_mysql_thread_if_suspended(thr);
/* srv_que_task_enqueue_low(thr); */
} }
/************************************************************************** /**************************************************************************
...@@ -339,6 +355,48 @@ que_fork_start_command( ...@@ -339,6 +355,48 @@ que_fork_start_command(
return(NULL); return(NULL);
} }
/**************************************************************************
After signal handling is finished, returns control to a query graph error
handling routine. (Currently, just returns the control to the root of the
graph so that the graph can communicate an error message to the client.) */
void
que_fork_error_handle(
/*==================*/
trx_t* trx __attribute__((unused)), /* in: trx */
que_t* fork) /* in: query graph which was run before signal
handling started, NULL not allowed */
{
que_thr_t* thr;
#ifdef UNIV_SYNC_DEBUG
ut_ad(mutex_own(&kernel_mutex));
#endif /* UNIV_SYNC_DEBUG */
ut_ad(trx->sess->state == SESS_ERROR);
ut_ad(UT_LIST_GET_LEN(trx->reply_signals) == 0);
ut_ad(UT_LIST_GET_LEN(trx->wait_thrs) == 0);
thr = UT_LIST_GET_FIRST(fork->thrs);
while (thr != NULL) {
ut_ad(!thr->is_active);
ut_ad(thr->state != QUE_THR_SIG_REPLY_WAIT);
ut_ad(thr->state != QUE_THR_LOCK_WAIT);
thr->run_node = thr;
thr->prev_node = thr->child;
thr->state = QUE_THR_COMPLETED;
thr = UT_LIST_GET_NEXT(thrs, thr);
}
thr = UT_LIST_GET_FIRST(fork->thrs);
que_thr_move_to_run_state(thr);
srv_que_task_enqueue_low(thr);
}
/******************************************************************** /********************************************************************
Tests if all the query threads in the same fork have a given state. */ Tests if all the query threads in the same fork have a given state. */
UNIV_INLINE UNIV_INLINE
...@@ -707,18 +765,22 @@ this function may only be called from inside que_run_threads or ...@@ -707,18 +765,22 @@ this function may only be called from inside que_run_threads or
que_thr_check_if_switch! These restrictions exist to make the rollback code que_thr_check_if_switch! These restrictions exist to make the rollback code
easier to maintain. */ easier to maintain. */
static static
que_thr_t* void
que_thr_dec_refer_count( que_thr_dec_refer_count(
/*====================*/ /*====================*/
/* out: next query thread to run */ que_thr_t* thr, /* in: query thread */
que_thr_t* thr) /* in: query thread */ que_thr_t** next_thr) /* in/out: next query thread to run;
if the value which is passed in is
a pointer to a NULL pointer, then the
calling function can start running
a new query thread */
{ {
que_fork_t* fork; que_fork_t* fork;
trx_t* trx; trx_t* trx;
sess_t* sess; sess_t* sess;
ulint fork_type; ulint fork_type;
que_thr_t* next_thr = NULL; ibool stopped;
fork = thr->common.parent; fork = thr->common.parent;
trx = thr->graph->trx; trx = thr->graph->trx;
sess = trx->sess; sess = trx->sess;
...@@ -729,7 +791,9 @@ que_thr_dec_refer_count( ...@@ -729,7 +791,9 @@ que_thr_dec_refer_count(
if (thr->state == QUE_THR_RUNNING) { if (thr->state == QUE_THR_RUNNING) {
if (!que_thr_stop(thr)) { stopped = que_thr_stop(thr);
if (!stopped) {
/* The reason for the thr suspension or wait was /* The reason for the thr suspension or wait was
already canceled before we came here: continue already canceled before we came here: continue
running the thread */ running the thread */
...@@ -737,9 +801,15 @@ que_thr_dec_refer_count( ...@@ -737,9 +801,15 @@ que_thr_dec_refer_count(
/* fputs("!!!!!!!! Wait already ended: continue thr\n", /* fputs("!!!!!!!! Wait already ended: continue thr\n",
stderr); */ stderr); */
if (next_thr && *next_thr == NULL) {
*next_thr = thr;
} else {
srv_que_task_enqueue_low(thr);
}
mutex_exit(&kernel_mutex); mutex_exit(&kernel_mutex);
return(thr); return;
} }
} }
...@@ -755,7 +825,7 @@ que_thr_dec_refer_count( ...@@ -755,7 +825,7 @@ que_thr_dec_refer_count(
mutex_exit(&kernel_mutex); mutex_exit(&kernel_mutex);
return(next_thr); return;
} }
fork_type = fork->fork_type; fork_type = fork->fork_type;
...@@ -771,7 +841,7 @@ que_thr_dec_refer_count( ...@@ -771,7 +841,7 @@ que_thr_dec_refer_count(
ut_ad(UT_LIST_GET_LEN(trx->signals) > 0); ut_ad(UT_LIST_GET_LEN(trx->signals) > 0);
ut_ad(trx->handling_signals == TRUE); ut_ad(trx->handling_signals == TRUE);
next_thr = trx_finish_rollback_off_kernel(fork, trx); trx_finish_rollback_off_kernel(fork, trx, next_thr);
} else if (fork_type == QUE_FORK_PURGE) { } else if (fork_type == QUE_FORK_PURGE) {
...@@ -793,7 +863,7 @@ que_thr_dec_refer_count( ...@@ -793,7 +863,7 @@ que_thr_dec_refer_count(
zero, then we start processing a signal; from it we may get zero, then we start processing a signal; from it we may get
a new query thread to run */ a new query thread to run */
next_thr = trx_sig_start_handle(trx); trx_sig_start_handle(trx, next_thr);
} }
if (trx->handling_signals && UT_LIST_GET_LEN(trx->signals) == 0) { if (trx->handling_signals && UT_LIST_GET_LEN(trx->signals) == 0) {
...@@ -802,8 +872,6 @@ que_thr_dec_refer_count( ...@@ -802,8 +872,6 @@ que_thr_dec_refer_count(
} }
mutex_exit(&kernel_mutex); mutex_exit(&kernel_mutex);
return(next_thr);
} }
/************************************************************************** /**************************************************************************
...@@ -1175,7 +1243,6 @@ que_run_threads( ...@@ -1175,7 +1243,6 @@ que_run_threads(
/*-------------------------*/ /*-------------------------*/
next_thr = que_thr_step(thr); next_thr = que_thr_step(thr);
/*-------------------------*/ /*-------------------------*/
ut_a(next_thr == thr || next_thr == NULL);
/* Test the effect on performance of adding extra mutex /* Test the effect on performance of adding extra mutex
reservations */ reservations */
...@@ -1192,7 +1259,7 @@ que_run_threads( ...@@ -1192,7 +1259,7 @@ que_run_threads(
loop_count++; loop_count++;
if (next_thr != thr) { if (next_thr != thr) {
next_thr = que_thr_dec_refer_count(thr); que_thr_dec_refer_count(thr, &next_thr);
if (next_thr == NULL) { if (next_thr == NULL) {
......
...@@ -19,6 +19,6 @@ include ../include/Makefile.i ...@@ -19,6 +19,6 @@ include ../include/Makefile.i
noinst_LIBRARIES = libsrv.a noinst_LIBRARIES = libsrv.a
libsrv_a_SOURCES = srv0srv.c srv0start.c libsrv_a_SOURCES = srv0srv.c srv0que.c srv0start.c
EXTRA_PROGRAMS = EXTRA_PROGRAMS =
...@@ -34,6 +34,7 @@ Created 10/8/1995 Heikki Tuuri ...@@ -34,6 +34,7 @@ Created 10/8/1995 Heikki Tuuri
#include "sync0sync.h" #include "sync0sync.h"
#include "thr0loc.h" #include "thr0loc.h"
#include "que0que.h" #include "que0que.h"
#include "srv0que.h"
#include "log0recv.h" #include "log0recv.h"
#include "pars0pars.h" #include "pars0pars.h"
#include "usr0sess.h" #include "usr0sess.h"
......
...@@ -23,6 +23,7 @@ Created 3/26/1996 Heikki Tuuri ...@@ -23,6 +23,7 @@ Created 3/26/1996 Heikki Tuuri
#include "row0purge.h" #include "row0purge.h"
#include "row0upd.h" #include "row0upd.h"
#include "trx0rec.h" #include "trx0rec.h"
#include "srv0que.h"
#include "os0thread.h" #include "os0thread.h"
/* The global data structure coordinating a purge */ /* The global data structure coordinating a purge */
...@@ -1059,6 +1060,8 @@ trx_purge(void) ...@@ -1059,6 +1060,8 @@ trx_purge(void)
mutex_exit(&kernel_mutex); mutex_exit(&kernel_mutex);
/* srv_que_task_enqueue(thr2); */
if (srv_print_thread_releases) { if (srv_print_thread_releases) {
fputs("Starting purge\n", stderr); fputs("Starting purge\n", stderr);
......
...@@ -20,6 +20,7 @@ Created 3/26/1996 Heikki Tuuri ...@@ -20,6 +20,7 @@ Created 3/26/1996 Heikki Tuuri
#include "trx0rec.h" #include "trx0rec.h"
#include "que0que.h" #include "que0que.h"
#include "usr0sess.h" #include "usr0sess.h"
#include "srv0que.h"
#include "srv0start.h" #include "srv0start.h"
#include "row0undo.h" #include "row0undo.h"
#include "row0mysql.h" #include "row0mysql.h"
...@@ -931,15 +932,21 @@ trx_undo_rec_release( ...@@ -931,15 +932,21 @@ trx_undo_rec_release(
/************************************************************************* /*************************************************************************
Starts a rollback operation. */ Starts a rollback operation. */
que_thr_t* void
trx_rollback( trx_rollback(
/*=========*/ /*=========*/
/* out: next query thread to run */
trx_t* trx, /* in: transaction */ trx_t* trx, /* in: transaction */
trx_sig_t* sig) /* in: signal starting the rollback */ trx_sig_t* sig, /* in: signal starting the rollback */
que_thr_t** next_thr)/* in/out: next query thread to run;
if the value which is passed in is
a pointer to a NULL pointer, then the
calling function can start running
a new query thread; if the passed value is
NULL, the parameter is ignored */
{ {
que_t* roll_graph; que_t* roll_graph;
que_thr_t* thr; que_thr_t* thr;
/* que_thr_t* thr2; */
#ifdef UNIV_SYNC_DEBUG #ifdef UNIV_SYNC_DEBUG
ut_ad(mutex_own(&kernel_mutex)); ut_ad(mutex_own(&kernel_mutex));
...@@ -981,7 +988,18 @@ trx_rollback( ...@@ -981,7 +988,18 @@ trx_rollback(
thr = que_fork_start_command(roll_graph); thr = que_fork_start_command(roll_graph);
ut_ad(thr); ut_ad(thr);
return(thr);
/* thr2 = que_fork_start_command(roll_graph);
ut_ad(thr2); */
if (next_thr && (*next_thr == NULL)) {
*next_thr = thr;
/* srv_que_task_enqueue_low(thr2); */
} else {
srv_que_task_enqueue_low(thr);
/* srv_que_task_enqueue_low(thr2); */
}
} }
/******************************************************************** /********************************************************************
...@@ -1053,14 +1071,17 @@ trx_finish_error_processing( ...@@ -1053,14 +1071,17 @@ trx_finish_error_processing(
/************************************************************************* /*************************************************************************
Finishes a partial rollback operation. */ Finishes a partial rollback operation. */
static static
que_thr_t* void
trx_finish_partial_rollback_off_kernel( trx_finish_partial_rollback_off_kernel(
/*===================================*/ /*===================================*/
/* out: next query thread to run */ trx_t* trx, /* in: transaction */
trx_t* trx) /* in: transaction */ que_thr_t** next_thr)/* in/out: next query thread to run;
if the value which is passed in is a pointer
to a NULL pointer, then the calling function
can start running a new query thread; if this
parameter is NULL, it is ignored */
{ {
trx_sig_t* sig; trx_sig_t* sig;
que_thr_t* next_thr;
#ifdef UNIV_SYNC_DEBUG #ifdef UNIV_SYNC_DEBUG
ut_ad(mutex_own(&kernel_mutex)); ut_ad(mutex_own(&kernel_mutex));
...@@ -1071,26 +1092,29 @@ trx_finish_partial_rollback_off_kernel( ...@@ -1071,26 +1092,29 @@ trx_finish_partial_rollback_off_kernel(
/* Remove the signal from the signal queue and send reply message /* Remove the signal from the signal queue and send reply message
to it */ to it */
next_thr = trx_sig_reply(sig); trx_sig_reply(sig, next_thr);
trx_sig_remove(trx, sig); trx_sig_remove(trx, sig);
trx->que_state = TRX_QUE_RUNNING; trx->que_state = TRX_QUE_RUNNING;
return(next_thr);
} }
/******************************************************************** /********************************************************************
Finishes a transaction rollback. */ Finishes a transaction rollback. */
que_thr_t* void
trx_finish_rollback_off_kernel( trx_finish_rollback_off_kernel(
/*===========================*/ /*===========================*/
/* out: next query thread to run */
que_t* graph, /* in: undo graph which can now be freed */ que_t* graph, /* in: undo graph which can now be freed */
trx_t* trx) /* in: transaction */ trx_t* trx, /* in: transaction */
que_thr_t** next_thr)/* in/out: next query thread to run;
if the value which is passed in is
a pointer to a NULL pointer, then the
calling function can start running
a new query thread; if this parameter is
NULL, it is ignored */
{ {
trx_sig_t* sig; trx_sig_t* sig;
trx_sig_t* next_sig; trx_sig_t* next_sig;
que_thr_t* next_thr;
#ifdef UNIV_SYNC_DEBUG #ifdef UNIV_SYNC_DEBUG
ut_ad(mutex_own(&kernel_mutex)); ut_ad(mutex_own(&kernel_mutex));
...@@ -1105,13 +1129,15 @@ trx_finish_rollback_off_kernel( ...@@ -1105,13 +1129,15 @@ trx_finish_rollback_off_kernel(
if (sig->type == TRX_SIG_ROLLBACK_TO_SAVEPT) { if (sig->type == TRX_SIG_ROLLBACK_TO_SAVEPT) {
return(trx_finish_partial_rollback_off_kernel(trx)); trx_finish_partial_rollback_off_kernel(trx, next_thr);
return;
} else if (sig->type == TRX_SIG_ERROR_OCCURRED) { } else if (sig->type == TRX_SIG_ERROR_OCCURRED) {
trx_finish_error_processing(trx); trx_finish_error_processing(trx);
return(NULL); return;
} }
if (lock_print_waits) { if (lock_print_waits) {
...@@ -1125,23 +1151,19 @@ trx_finish_rollback_off_kernel( ...@@ -1125,23 +1151,19 @@ trx_finish_rollback_off_kernel(
send reply messages to them */ send reply messages to them */
trx->que_state = TRX_QUE_RUNNING; trx->que_state = TRX_QUE_RUNNING;
next_thr = NULL;
while (sig != NULL) { while (sig != NULL) {
next_sig = UT_LIST_GET_NEXT(signals, sig); next_sig = UT_LIST_GET_NEXT(signals, sig);
if (sig->type == TRX_SIG_TOTAL_ROLLBACK) { if (sig->type == TRX_SIG_TOTAL_ROLLBACK) {
ut_a(next_thr == NULL); trx_sig_reply(sig, next_thr);
next_thr = trx_sig_reply(sig);
trx_sig_remove(trx, sig); trx_sig_remove(trx, sig);
} }
sig = next_sig; sig = next_sig;
} }
return(next_thr);
} }
/************************************************************************* /*************************************************************************
...@@ -1174,6 +1196,7 @@ trx_rollback_step( ...@@ -1174,6 +1196,7 @@ trx_rollback_step(
que_thr_t* thr) /* in: query thread */ que_thr_t* thr) /* in: query thread */
{ {
roll_node_t* node; roll_node_t* node;
ibool success;
ulint sig_no; ulint sig_no;
trx_savept_t* savept; trx_savept_t* savept;
...@@ -1200,13 +1223,19 @@ trx_rollback_step( ...@@ -1200,13 +1223,19 @@ trx_rollback_step(
/* Send a rollback signal to the transaction */ /* Send a rollback signal to the transaction */
trx_sig_send(thr_get_trx(thr), sig_no, TRX_SIG_SELF, success = trx_sig_send(thr_get_trx(thr),
thr, savept); sig_no, TRX_SIG_SELF,
thr, savept, NULL);
thr->state = QUE_THR_SIG_REPLY_WAIT; thr->state = QUE_THR_SIG_REPLY_WAIT;
mutex_exit(&kernel_mutex); mutex_exit(&kernel_mutex);
if (!success) {
/* Error in delivering the rollback signal */
que_thr_handle_error(thr, DB_ERROR, NULL, 0);
}
return(NULL); return(NULL);
} }
......
...@@ -895,15 +895,18 @@ trx_assign_read_view( ...@@ -895,15 +895,18 @@ trx_assign_read_view(
/******************************************************************** /********************************************************************
Commits a transaction. NOTE that the kernel mutex is temporarily released. */ Commits a transaction. NOTE that the kernel mutex is temporarily released. */
static static
que_thr_t* void
trx_handle_commit_sig_off_kernel( trx_handle_commit_sig_off_kernel(
/*=============================*/ /*=============================*/
/* out: next query thread to run */ trx_t* trx, /* in: transaction */
trx_t* trx) /* in: transaction */ que_thr_t** next_thr) /* in/out: next query thread to run;
if the value which is passed in is
a pointer to a NULL pointer, then the
calling function can start running
a new query thread */
{ {
trx_sig_t* sig; trx_sig_t* sig;
trx_sig_t* next_sig; trx_sig_t* next_sig;
que_thr_t* next_thr = NULL;
#ifdef UNIV_SYNC_DEBUG #ifdef UNIV_SYNC_DEBUG
ut_ad(mutex_own(&kernel_mutex)); ut_ad(mutex_own(&kernel_mutex));
...@@ -925,8 +928,7 @@ trx_handle_commit_sig_off_kernel( ...@@ -925,8 +928,7 @@ trx_handle_commit_sig_off_kernel(
if (sig->type == TRX_SIG_COMMIT) { if (sig->type == TRX_SIG_COMMIT) {
ut_a(next_thr == NULL); trx_sig_reply(sig, next_thr);
next_thr = trx_sig_reply(sig);
trx_sig_remove(trx, sig); trx_sig_remove(trx, sig);
} }
...@@ -934,8 +936,6 @@ trx_handle_commit_sig_off_kernel( ...@@ -934,8 +936,6 @@ trx_handle_commit_sig_off_kernel(
} }
trx->que_state = TRX_QUE_RUNNING; trx->que_state = TRX_QUE_RUNNING;
return(next_thr);
} }
/*************************************************************** /***************************************************************
...@@ -997,6 +997,39 @@ trx_lock_wait_to_suspended( ...@@ -997,6 +997,39 @@ trx_lock_wait_to_suspended(
trx->que_state = TRX_QUE_RUNNING; trx->que_state = TRX_QUE_RUNNING;
} }
/***************************************************************
Moves the query threads in the sig reply wait list of trx to the SUSPENDED
state. */
static
void
trx_sig_reply_wait_to_suspended(
/*============================*/
trx_t* trx) /* in: transaction */
{
trx_sig_t* sig;
que_thr_t* thr;
#ifdef UNIV_SYNC_DEBUG
ut_ad(mutex_own(&kernel_mutex));
#endif /* UNIV_SYNC_DEBUG */
sig = UT_LIST_GET_FIRST(trx->reply_signals);
while (sig != NULL) {
thr = sig->receiver;
ut_ad(thr->state == QUE_THR_SIG_REPLY_WAIT);
thr->state = QUE_THR_SUSPENDED;
sig->receiver = NULL;
UT_LIST_REMOVE(reply_signals, trx->reply_signals, sig);
sig = UT_LIST_GET_FIRST(trx->reply_signals);
}
}
/********************************************************************* /*********************************************************************
Checks the compatibility of a new signal with the other signals in the Checks the compatibility of a new signal with the other signals in the
queue. */ queue. */
...@@ -1076,10 +1109,11 @@ trx_sig_is_compatible( ...@@ -1076,10 +1109,11 @@ trx_sig_is_compatible(
/******************************************************************** /********************************************************************
Sends a signal to a trx object. */ Sends a signal to a trx object. */
que_thr_t* ibool
trx_sig_send( trx_sig_send(
/*=========*/ /*=========*/
/* out: next query thread to run */ /* out: TRUE if the signal was
successfully delivered */
trx_t* trx, /* in: trx handle */ trx_t* trx, /* in: trx handle */
ulint type, /* in: signal type */ ulint type, /* in: signal type */
ulint sender, /* in: TRX_SIG_SELF or ulint sender, /* in: TRX_SIG_SELF or
...@@ -1087,8 +1121,14 @@ trx_sig_send( ...@@ -1087,8 +1121,14 @@ trx_sig_send(
que_thr_t* receiver_thr, /* in: query thread which wants the que_thr_t* receiver_thr, /* in: query thread which wants the
reply, or NULL; if type is reply, or NULL; if type is
TRX_SIG_END_WAIT, this must be NULL */ TRX_SIG_END_WAIT, this must be NULL */
trx_savept_t* savept) /* in: possible rollback savepoint, or trx_savept_t* savept, /* in: possible rollback savepoint, or
NULL */ NULL */
que_thr_t** next_thr) /* in/out: next query thread to run;
if the value which is passed in is
a pointer to a NULL pointer, then the
calling function can start running
a new query thread; if the parameter
is NULL, it is ignored */
{ {
trx_sig_t* sig; trx_sig_t* sig;
trx_t* receiver_trx; trx_t* receiver_trx;
...@@ -1098,7 +1138,14 @@ trx_sig_send( ...@@ -1098,7 +1138,14 @@ trx_sig_send(
ut_ad(mutex_own(&kernel_mutex)); ut_ad(mutex_own(&kernel_mutex));
#endif /* UNIV_SYNC_DEBUG */ #endif /* UNIV_SYNC_DEBUG */
ut_a(trx_sig_is_compatible(trx, type, sender)); if (!trx_sig_is_compatible(trx, type, sender)) {
/* The signal is not compatible with the other signals in
the queue: do nothing */
ut_error;
return(FALSE);
}
/* Queue the signal object */ /* Queue the signal object */
...@@ -1132,6 +1179,11 @@ trx_sig_send( ...@@ -1132,6 +1179,11 @@ trx_sig_send(
sig); sig);
} }
if (trx->sess->state == SESS_ERROR) {
trx_sig_reply_wait_to_suspended(trx);
}
if ((sender != TRX_SIG_SELF) || (type == TRX_SIG_BREAK_EXECUTION)) { if ((sender != TRX_SIG_SELF) || (type == TRX_SIG_BREAK_EXECUTION)) {
/* The following call will add a TRX_SIG_ERROR_OCCURRED /* The following call will add a TRX_SIG_ERROR_OCCURRED
...@@ -1146,10 +1198,10 @@ trx_sig_send( ...@@ -1146,10 +1198,10 @@ trx_sig_send(
if (UT_LIST_GET_FIRST(trx->signals) == sig) { if (UT_LIST_GET_FIRST(trx->signals) == sig) {
return(trx_sig_start_handle(trx)); trx_sig_start_handle(trx, next_thr);
} }
return(NULL); return(TRUE);
} }
/******************************************************************** /********************************************************************
...@@ -1171,18 +1223,27 @@ trx_end_signal_handling( ...@@ -1171,18 +1223,27 @@ trx_end_signal_handling(
trx->handling_signals = FALSE; trx->handling_signals = FALSE;
trx->graph = trx->graph_before_signal_handling; trx->graph = trx->graph_before_signal_handling;
if (trx->graph && (trx->sess->state == SESS_ERROR)) {
que_fork_error_handle(trx, trx->graph);
}
} }
/******************************************************************** /********************************************************************
Starts handling of a trx signal. */ Starts handling of a trx signal. */
que_thr_t* void
trx_sig_start_handle( trx_sig_start_handle(
/*=================*/ /*=================*/
/* out: next query thread to run, or NULL */ trx_t* trx, /* in: trx handle */
trx_t* trx) /* in: trx handle */ que_thr_t** next_thr) /* in/out: next query thread to run;
if the value which is passed in is
a pointer to a NULL pointer, then the
calling function can start running
a new query thread; if the parameter
is NULL, it is ignored */
{ {
que_thr_t* next_thr = NULL;
trx_sig_t* sig; trx_sig_t* sig;
ulint type; ulint type;
loop: loop:
...@@ -1198,7 +1259,7 @@ trx_sig_start_handle( ...@@ -1198,7 +1259,7 @@ trx_sig_start_handle(
trx_end_signal_handling(trx); trx_end_signal_handling(trx);
return(next_thr); return;
} }
if (trx->conc_state == TRX_NOT_STARTED) { if (trx->conc_state == TRX_NOT_STARTED) {
...@@ -1214,13 +1275,23 @@ trx_sig_start_handle( ...@@ -1214,13 +1275,23 @@ trx_sig_start_handle(
trx_lock_wait_to_suspended(trx); trx_lock_wait_to_suspended(trx);
} }
/* If the session is in the error state and this trx has threads
waiting for reply from signals, moves these threads to the suspended
state, canceling wait reservations; note that if the transaction has
sent a commit or rollback signal to itself, and its session is not in
the error state, then nothing is done here. */
if (trx->sess->state == SESS_ERROR) {
trx_sig_reply_wait_to_suspended(trx);
}
/* If there are no running query threads, we can start processing of a /* If there are no running query threads, we can start processing of a
signal, otherwise we have to wait until all query threads of this signal, otherwise we have to wait until all query threads of this
transaction are aware of the arrival of the signal. */ transaction are aware of the arrival of the signal. */
if (trx->n_active_thrs > 0) { if (trx->n_active_thrs > 0) {
return(NULL); return;
} }
if (trx->handling_signals == FALSE) { if (trx->handling_signals == FALSE) {
...@@ -1234,19 +1305,30 @@ trx_sig_start_handle( ...@@ -1234,19 +1305,30 @@ trx_sig_start_handle(
if (type == TRX_SIG_COMMIT) { if (type == TRX_SIG_COMMIT) {
next_thr = trx_handle_commit_sig_off_kernel(trx); trx_handle_commit_sig_off_kernel(trx, next_thr);
} else if ((type == TRX_SIG_TOTAL_ROLLBACK) } else if ((type == TRX_SIG_TOTAL_ROLLBACK)
|| (type == TRX_SIG_ROLLBACK_TO_SAVEPT) || (type == TRX_SIG_ROLLBACK_TO_SAVEPT)) {
|| (type == TRX_SIG_ERROR_OCCURRED)) {
trx_rollback(trx, sig, next_thr);
/* No further signals can be handled until the rollback
completes, therefore we return */
return;
} else if (type == TRX_SIG_ERROR_OCCURRED) {
trx_rollback(trx, sig, next_thr);
/* No further signals can be handled until the rollback /* No further signals can be handled until the rollback
completes, therefore we return */ completes, therefore we return */
return(trx_rollback(trx, sig)); return;
} else if (type == TRX_SIG_BREAK_EXECUTION) { } else if (type == TRX_SIG_BREAK_EXECUTION) {
next_thr = trx_sig_reply(sig); trx_sig_reply(sig, next_thr);
trx_sig_remove(trx, sig); trx_sig_remove(trx, sig);
} else { } else {
ut_error; ut_error;
...@@ -1259,14 +1341,17 @@ trx_sig_start_handle( ...@@ -1259,14 +1341,17 @@ trx_sig_start_handle(
Send the reply message when a signal in the queue of the trx has been Send the reply message when a signal in the queue of the trx has been
handled. */ handled. */
que_thr_t* void
trx_sig_reply( trx_sig_reply(
/*==========*/ /*==========*/
/* out: next query thread to run */ trx_sig_t* sig, /* in: signal */
trx_sig_t* sig) /* in: signal */ que_thr_t** next_thr) /* in/out: next query thread to run;
if the value which is passed in is
a pointer to a NULL pointer, then the
calling function can start running
a new query thread */
{ {
trx_t* receiver_trx; trx_t* receiver_trx;
que_thr_t* next_thr = NULL;
ut_ad(sig); ut_ad(sig);
#ifdef UNIV_SYNC_DEBUG #ifdef UNIV_SYNC_DEBUG
...@@ -1280,13 +1365,13 @@ trx_sig_reply( ...@@ -1280,13 +1365,13 @@ trx_sig_reply(
UT_LIST_REMOVE(reply_signals, receiver_trx->reply_signals, UT_LIST_REMOVE(reply_signals, receiver_trx->reply_signals,
sig); sig);
next_thr = que_thr_end_wait(sig->receiver); ut_ad(receiver_trx->sess->state != SESS_ERROR);
que_thr_end_wait(sig->receiver, next_thr);
sig->receiver = NULL; sig->receiver = NULL;
} }
return(next_thr);
} }
/******************************************************************** /********************************************************************
...@@ -1342,6 +1427,7 @@ trx_commit_step( ...@@ -1342,6 +1427,7 @@ trx_commit_step(
{ {
commit_node_t* node; commit_node_t* node;
que_thr_t* next_thr; que_thr_t* next_thr;
ibool success;
node = thr->run_node; node = thr->run_node;
...@@ -1356,15 +1442,22 @@ trx_commit_step( ...@@ -1356,15 +1442,22 @@ trx_commit_step(
node->state = COMMIT_NODE_WAIT; node->state = COMMIT_NODE_WAIT;
next_thr = NULL;
thr->state = QUE_THR_SIG_REPLY_WAIT; thr->state = QUE_THR_SIG_REPLY_WAIT;
/* Send the commit signal to the transaction */ /* Send the commit signal to the transaction */
next_thr = trx_sig_send(thr_get_trx(thr), TRX_SIG_COMMIT, success = trx_sig_send(thr_get_trx(thr), TRX_SIG_COMMIT,
TRX_SIG_SELF, thr, NULL); TRX_SIG_SELF, thr, NULL, &next_thr);
mutex_exit(&kernel_mutex); mutex_exit(&kernel_mutex);
if (!success) {
/* Error in delivering the commit signal */
que_thr_handle_error(thr, DB_ERROR, NULL, 0);
}
return(next_thr); return(next_thr);
} }
......
...@@ -37,6 +37,8 @@ sess_open(void) ...@@ -37,6 +37,8 @@ sess_open(void)
#endif /* UNIV_SYNC_DEBUG */ #endif /* UNIV_SYNC_DEBUG */
sess = mem_alloc(sizeof(sess_t)); sess = mem_alloc(sizeof(sess_t));
sess->state = SESS_ACTIVE;
sess->trx = trx_create(sess); sess->trx = trx_create(sess);
UT_LIST_INIT(sess->graphs); UT_LIST_INIT(sess->graphs);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment