Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
M
MariaDB
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
nexedi
MariaDB
Commits
bfb0e284
Commit
bfb0e284
authored
Jun 17, 2004
by
heikki@hundin.mysql.fi
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Cset exclude: marko@hundin.mysql.fi|ChangeSet|20040525171209|56870
parent
46d0e677
Changes
12
Hide whitespace changes
Inline
Side-by-side
Showing
12 changed files
with
338 additions
and
101 deletions
+338
-101
innobase/include/Makefile.am
innobase/include/Makefile.am
+1
-1
innobase/include/que0que.h
innobase/include/que0que.h
+18
-4
innobase/include/trx0roll.h
innobase/include/trx0roll.h
+15
-6
innobase/include/trx0trx.h
innobase/include/trx0trx.h
+22
-9
innobase/include/usr0sess.h
innobase/include/usr0sess.h
+6
-0
innobase/que/que0que.c
innobase/que/que0que.c
+87
-20
innobase/srv/Makefile.am
innobase/srv/Makefile.am
+1
-1
innobase/srv/srv0srv.c
innobase/srv/srv0srv.c
+1
-0
innobase/trx/trx0purge.c
innobase/trx/trx0purge.c
+3
-0
innobase/trx/trx0roll.c
innobase/trx/trx0roll.c
+53
-24
innobase/trx/trx0trx.c
innobase/trx/trx0trx.c
+129
-36
innobase/usr/usr0sess.c
innobase/usr/usr0sess.c
+2
-0
No files found.
innobase/include/Makefile.am
View file @
bfb0e284
...
...
@@ -43,7 +43,7 @@ noinst_HEADERS = btr0btr.h btr0btr.ic btr0cur.h btr0cur.ic \
row0purge.ic row0row.h row0row.ic row0sel.h row0sel.ic
\
row0types.h row0uins.h row0uins.ic row0umod.h row0umod.ic
\
row0undo.h row0undo.ic row0upd.h row0upd.ic row0vers.h
\
row0vers.ic srv0srv.h srv0srv.ic srv0start.h
\
row0vers.ic srv0
que.h srv0
srv.h srv0srv.ic srv0start.h
\
sync0arr.h sync0arr.ic sync0rw.h
\
sync0rw.ic sync0sync.h sync0sync.ic sync0types.h
\
thr0loc.h thr0loc.ic trx0purge.h trx0purge.ic trx0rec.h
\
...
...
innobase/include/que0que.h
View file @
bfb0e284
...
...
@@ -152,6 +152,17 @@ que_run_threads(
/*============*/
que_thr_t
*
thr
);
/* in: query thread which is run initially */
/**************************************************************************
After signal handling is finished, returns control to a query graph error
handling routine. (Currently, just returns the control to the root of the
graph so that the graph can communicate an error message to the client.) */
void
que_fork_error_handle
(
/*==================*/
trx_t
*
trx
,
/* in: trx */
que_t
*
fork
);
/* in: query graph which was run before signal
handling started, NULL not allowed */
/**************************************************************************
Handles an SQL error noticed during query thread execution. At the moment,
does nothing! */
...
...
@@ -170,15 +181,18 @@ a single worker thread to execute it. This function should be used to end
the wait state of a query thread waiting for a lock or a stored procedure
completion. */
que_thr_t
*
void
que_thr_end_wait
(
/*=============*/
/* out: next query thread to run;
NULL if none */
que_thr_t
*
thr
);
/* in: query thread in the
que_thr_t
*
thr
,
/* in: query thread in the
QUE_THR_LOCK_WAIT,
or QUE_THR_PROCEDURE_WAIT, or
QUE_THR_SIG_REPLY_WAIT state */
que_thr_t
**
next_thr
);
/* in/out: next query thread to run;
if the value which is passed in is
a pointer to a NULL pointer, then the
calling function can start running
a new query thread */
/**************************************************************************
Same as que_thr_end_wait, but no parameter next_thr available. */
...
...
innobase/include/trx0roll.h
View file @
bfb0e284
...
...
@@ -91,12 +91,16 @@ trx_undo_rec_release(
/*************************************************************************
Starts a rollback operation. */
que_thr_t
*
void
trx_rollback
(
/*=========*/
/* out: next query thread to run */
trx_t
*
trx
,
/* in: transaction */
trx_sig_t
*
sig
);
/* in: signal starting the rollback */
trx_sig_t
*
sig
,
/* in: signal starting the rollback */
que_thr_t
**
next_thr
);
/* in/out: next query thread to run;
if the value which is passed in is
a pointer to a NULL pointer, then the
calling function can start running
a new query thread */
/***********************************************************************
Rollback or clean up transactions which have no user session. If the
transaction already was committed, then we clean up a possible insert
...
...
@@ -108,12 +112,17 @@ trx_rollback_or_clean_all_without_sess(void);
/********************************************************************
Finishes a transaction rollback. */
que_thr_t
*
void
trx_finish_rollback_off_kernel
(
/*===========================*/
/* out: next query thread to run */
que_t
*
graph
,
/* in: undo graph which can now be freed */
trx_t
*
trx
);
/* in: transaction */
trx_t
*
trx
,
/* in: transaction */
que_thr_t
**
next_thr
);
/* in/out: next query thread to run;
if the value which is passed in is
a pointer to a NULL pointer, then the
calling function can start running
a new query thread; if this parameter is
NULL, it is ignored */
/********************************************************************
Builds an undo 'query' graph for a transaction. The actual rollback is
performed by executing this query graph like a query subprocedure call.
...
...
innobase/include/trx0trx.h
View file @
bfb0e284
...
...
@@ -194,10 +194,9 @@ trx_end_lock_wait(
/********************************************************************
Sends a signal to a trx object. */
que_thr_t
*
ibool
trx_sig_send
(
/*=========*/
/* out: next query thread to run */
/* out: TRUE if the signal was
successfully delivered */
trx_t
*
trx
,
/* in: trx handle */
...
...
@@ -207,17 +206,27 @@ trx_sig_send(
que_thr_t
*
receiver_thr
,
/* in: query thread which wants the
reply, or NULL; if type is
TRX_SIG_END_WAIT, this must be NULL */
trx_savept_t
*
savept
);
/* in: possible rollback savepoint, or
trx_savept_t
*
savept
,
/* in: possible rollback savepoint, or
NULL */
que_thr_t
**
next_thr
);
/* in/out: next query thread to run;
if the value which is passed in is
a pointer to a NULL pointer, then the
calling function can start running
a new query thread; if the parameter
is NULL, it is ignored */
/********************************************************************
Send the reply message when a signal in the queue of the trx has
been handled. */
que_thr_t
*
void
trx_sig_reply
(
/*==========*/
/* out: next query thread to run */
trx_sig_t
*
sig
);
/* in: signal */
trx_sig_t
*
sig
,
/* in: signal */
que_thr_t
**
next_thr
);
/* in/out: next query thread to run;
if the value which is passed in is
a pointer to a NULL pointer, then the
calling function can start running
a new query thread */
/********************************************************************
Removes the signal object from a trx signal queue. */
...
...
@@ -229,11 +238,15 @@ trx_sig_remove(
/********************************************************************
Starts handling of a trx signal. */
que_thr_t
*
void
trx_sig_start_handle
(
/*=================*/
/* out: next query thread to run, or NULL */
trx_t
*
trx
);
/* in: trx handle */
trx_t
*
trx
,
/* in: trx handle */
que_thr_t
**
next_thr
);
/* in/out: next query thread to run;
if the value which is passed in is
a pointer to a NULL pointer, then the
calling function can start running
a new query thread */
/********************************************************************
Ends signal handling. If the session is in the error state, and
trx->graph_before_signal_handling != NULL, returns control to the error
...
...
innobase/include/usr0sess.h
View file @
bfb0e284
...
...
@@ -38,6 +38,7 @@ sess_try_close(
/* The session handle. All fields are protected by the kernel mutex */
struct
sess_struct
{
ulint
state
;
/* state of the session */
trx_t
*
trx
;
/* transaction object permanently
assigned for the session: the
transaction instance designated by the
...
...
@@ -48,6 +49,11 @@ struct sess_struct{
session */
};
/* Session states */
#define SESS_ACTIVE 1
#define SESS_ERROR 2
/* session contains an error message
which has not yet been communicated
to the client */
#ifndef UNIV_NONINL
#include "usr0sess.ic"
#endif
...
...
innobase/que/que0que.c
View file @
bfb0e284
...
...
@@ -12,6 +12,7 @@ Created 5/27/1996 Heikki Tuuri
#include "que0que.ic"
#endif
#include "srv0que.h"
#include "usr0sess.h"
#include "trx0trx.h"
#include "trx0roll.h"
...
...
@@ -174,15 +175,19 @@ a single worker thread to execute it. This function should be used to end
the wait state of a query thread waiting for a lock or a stored procedure
completion. */
que_thr_t
*
void
que_thr_end_wait
(
/*=============*/
/* out: next query thread to run;
NULL if none */
que_thr_t
*
thr
)
/* in: query thread in the
que_thr_t
*
thr
,
/* in: query thread in the
QUE_THR_LOCK_WAIT,
or QUE_THR_PROCEDURE_WAIT, or
QUE_THR_SIG_REPLY_WAIT state */
que_thr_t
**
next_thr
)
/* in/out: next query thread to run;
if the value which is passed in is
a pointer to a NULL pointer, then the
calling function can start running
a new query thread; if NULL is passed
as the parameter, it is ignored */
{
ibool
was_active
;
...
...
@@ -201,8 +206,17 @@ que_thr_end_wait(
que_thr_move_to_run_state
(
thr
);
return
(
was_active
?
NULL
:
thr
);
}
if
(
was_active
)
{
return
;
}
if
(
next_thr
&&
*
next_thr
==
NULL
)
{
*
next_thr
=
thr
;
}
else
{
srv_que_task_enqueue_low
(
thr
);
}
}
/**************************************************************************
Same as que_thr_end_wait, but no parameter next_thr available. */
...
...
@@ -239,6 +253,8 @@ que_thr_end_wait_no_next_thr(
for the lock to be released: */
srv_release_mysql_thread_if_suspended
(
thr
);
/* srv_que_task_enqueue_low(thr); */
}
/**************************************************************************
...
...
@@ -339,6 +355,48 @@ que_fork_start_command(
return
(
NULL
);
}
/**************************************************************************
After signal handling is finished, returns control to a query graph error
handling routine. (Currently, just returns the control to the root of the
graph so that the graph can communicate an error message to the client.) */
void
que_fork_error_handle
(
/*==================*/
trx_t
*
trx
__attribute__
((
unused
)),
/* in: trx */
que_t
*
fork
)
/* in: query graph which was run before signal
handling started, NULL not allowed */
{
que_thr_t
*
thr
;
#ifdef UNIV_SYNC_DEBUG
ut_ad
(
mutex_own
(
&
kernel_mutex
));
#endif
/* UNIV_SYNC_DEBUG */
ut_ad
(
trx
->
sess
->
state
==
SESS_ERROR
);
ut_ad
(
UT_LIST_GET_LEN
(
trx
->
reply_signals
)
==
0
);
ut_ad
(
UT_LIST_GET_LEN
(
trx
->
wait_thrs
)
==
0
);
thr
=
UT_LIST_GET_FIRST
(
fork
->
thrs
);
while
(
thr
!=
NULL
)
{
ut_ad
(
!
thr
->
is_active
);
ut_ad
(
thr
->
state
!=
QUE_THR_SIG_REPLY_WAIT
);
ut_ad
(
thr
->
state
!=
QUE_THR_LOCK_WAIT
);
thr
->
run_node
=
thr
;
thr
->
prev_node
=
thr
->
child
;
thr
->
state
=
QUE_THR_COMPLETED
;
thr
=
UT_LIST_GET_NEXT
(
thrs
,
thr
);
}
thr
=
UT_LIST_GET_FIRST
(
fork
->
thrs
);
que_thr_move_to_run_state
(
thr
);
srv_que_task_enqueue_low
(
thr
);
}
/********************************************************************
Tests if all the query threads in the same fork have a given state. */
UNIV_INLINE
...
...
@@ -707,18 +765,22 @@ this function may only be called from inside que_run_threads or
que_thr_check_if_switch! These restrictions exist to make the rollback code
easier to maintain. */
static
que_thr_t
*
void
que_thr_dec_refer_count
(
/*====================*/
/* out: next query thread to run */
que_thr_t
*
thr
)
/* in: query thread */
que_thr_t
*
thr
,
/* in: query thread */
que_thr_t
**
next_thr
)
/* in/out: next query thread to run;
if the value which is passed in is
a pointer to a NULL pointer, then the
calling function can start running
a new query thread */
{
que_fork_t
*
fork
;
trx_t
*
trx
;
sess_t
*
sess
;
ulint
fork_type
;
que_thr_t
*
next_thr
=
NULL
;
ibool
stopped
;
fork
=
thr
->
common
.
parent
;
trx
=
thr
->
graph
->
trx
;
sess
=
trx
->
sess
;
...
...
@@ -729,7 +791,9 @@ que_thr_dec_refer_count(
if
(
thr
->
state
==
QUE_THR_RUNNING
)
{
if
(
!
que_thr_stop
(
thr
))
{
stopped
=
que_thr_stop
(
thr
);
if
(
!
stopped
)
{
/* The reason for the thr suspension or wait was
already canceled before we came here: continue
running the thread */
...
...
@@ -737,9 +801,15 @@ que_thr_dec_refer_count(
/* fputs("!!!!!!!! Wait already ended: continue thr\n",
stderr); */
if
(
next_thr
&&
*
next_thr
==
NULL
)
{
*
next_thr
=
thr
;
}
else
{
srv_que_task_enqueue_low
(
thr
);
}
mutex_exit
(
&
kernel_mutex
);
return
(
thr
)
;
return
;
}
}
...
...
@@ -755,7 +825,7 @@ que_thr_dec_refer_count(
mutex_exit
(
&
kernel_mutex
);
return
(
next_thr
)
;
return
;
}
fork_type
=
fork
->
fork_type
;
...
...
@@ -771,7 +841,7 @@ que_thr_dec_refer_count(
ut_ad
(
UT_LIST_GET_LEN
(
trx
->
signals
)
>
0
);
ut_ad
(
trx
->
handling_signals
==
TRUE
);
next_thr
=
trx_finish_rollback_off_kernel
(
fork
,
trx
);
trx_finish_rollback_off_kernel
(
fork
,
trx
,
next_thr
);
}
else
if
(
fork_type
==
QUE_FORK_PURGE
)
{
...
...
@@ -793,7 +863,7 @@ que_thr_dec_refer_count(
zero, then we start processing a signal; from it we may get
a new query thread to run */
next_thr
=
trx_sig_start_handle
(
trx
);
trx_sig_start_handle
(
trx
,
next_thr
);
}
if
(
trx
->
handling_signals
&&
UT_LIST_GET_LEN
(
trx
->
signals
)
==
0
)
{
...
...
@@ -802,8 +872,6 @@ que_thr_dec_refer_count(
}
mutex_exit
(
&
kernel_mutex
);
return
(
next_thr
);
}
/**************************************************************************
...
...
@@ -1175,7 +1243,6 @@ que_run_threads(
/*-------------------------*/
next_thr
=
que_thr_step
(
thr
);
/*-------------------------*/
ut_a
(
next_thr
==
thr
||
next_thr
==
NULL
);
/* Test the effect on performance of adding extra mutex
reservations */
...
...
@@ -1192,7 +1259,7 @@ que_run_threads(
loop_count
++
;
if
(
next_thr
!=
thr
)
{
next_thr
=
que_thr_dec_refer_count
(
thr
);
que_thr_dec_refer_count
(
thr
,
&
next_
thr
);
if
(
next_thr
==
NULL
)
{
...
...
innobase/srv/Makefile.am
View file @
bfb0e284
...
...
@@ -19,6 +19,6 @@ include ../include/Makefile.i
noinst_LIBRARIES
=
libsrv.a
libsrv_a_SOURCES
=
srv0srv.c srv0start.c
libsrv_a_SOURCES
=
srv0srv.c srv0
que.c srv0
start.c
EXTRA_PROGRAMS
=
innobase/srv/srv0srv.c
View file @
bfb0e284
...
...
@@ -34,6 +34,7 @@ Created 10/8/1995 Heikki Tuuri
#include "sync0sync.h"
#include "thr0loc.h"
#include "que0que.h"
#include "srv0que.h"
#include "log0recv.h"
#include "pars0pars.h"
#include "usr0sess.h"
...
...
innobase/trx/trx0purge.c
View file @
bfb0e284
...
...
@@ -23,6 +23,7 @@ Created 3/26/1996 Heikki Tuuri
#include "row0purge.h"
#include "row0upd.h"
#include "trx0rec.h"
#include "srv0que.h"
#include "os0thread.h"
/* The global data structure coordinating a purge */
...
...
@@ -1059,6 +1060,8 @@ trx_purge(void)
mutex_exit
(
&
kernel_mutex
);
/* srv_que_task_enqueue(thr2); */
if
(
srv_print_thread_releases
)
{
fputs
(
"Starting purge
\n
"
,
stderr
);
...
...
innobase/trx/trx0roll.c
View file @
bfb0e284
...
...
@@ -20,6 +20,7 @@ Created 3/26/1996 Heikki Tuuri
#include "trx0rec.h"
#include "que0que.h"
#include "usr0sess.h"
#include "srv0que.h"
#include "srv0start.h"
#include "row0undo.h"
#include "row0mysql.h"
...
...
@@ -931,15 +932,21 @@ trx_undo_rec_release(
/*************************************************************************
Starts a rollback operation. */
que_thr_t
*
void
trx_rollback
(
/*=========*/
/* out: next query thread to run */
trx_t
*
trx
,
/* in: transaction */
trx_sig_t
*
sig
)
/* in: signal starting the rollback */
trx_sig_t
*
sig
,
/* in: signal starting the rollback */
que_thr_t
**
next_thr
)
/* in/out: next query thread to run;
if the value which is passed in is
a pointer to a NULL pointer, then the
calling function can start running
a new query thread; if the passed value is
NULL, the parameter is ignored */
{
que_t
*
roll_graph
;
que_thr_t
*
thr
;
/* que_thr_t* thr2; */
#ifdef UNIV_SYNC_DEBUG
ut_ad
(
mutex_own
(
&
kernel_mutex
));
...
...
@@ -981,7 +988,18 @@ trx_rollback(
thr
=
que_fork_start_command
(
roll_graph
);
ut_ad
(
thr
);
return
(
thr
);
/* thr2 = que_fork_start_command(roll_graph);
ut_ad(thr2); */
if
(
next_thr
&&
(
*
next_thr
==
NULL
))
{
*
next_thr
=
thr
;
/* srv_que_task_enqueue_low(thr2); */
}
else
{
srv_que_task_enqueue_low
(
thr
);
/* srv_que_task_enqueue_low(thr2); */
}
}
/********************************************************************
...
...
@@ -1053,14 +1071,17 @@ trx_finish_error_processing(
/*************************************************************************
Finishes a partial rollback operation. */
static
que_thr_t
*
void
trx_finish_partial_rollback_off_kernel
(
/*===================================*/
/* out: next query thread to run */
trx_t
*
trx
)
/* in: transaction */
trx_t
*
trx
,
/* in: transaction */
que_thr_t
**
next_thr
)
/* in/out: next query thread to run;
if the value which is passed in is a pointer
to a NULL pointer, then the calling function
can start running a new query thread; if this
parameter is NULL, it is ignored */
{
trx_sig_t
*
sig
;
que_thr_t
*
next_thr
;
#ifdef UNIV_SYNC_DEBUG
ut_ad
(
mutex_own
(
&
kernel_mutex
));
...
...
@@ -1071,26 +1092,29 @@ trx_finish_partial_rollback_off_kernel(
/* Remove the signal from the signal queue and send reply message
to it */
next_thr
=
trx_sig_reply
(
sig
);
trx_sig_reply
(
sig
,
next_thr
);
trx_sig_remove
(
trx
,
sig
);
trx
->
que_state
=
TRX_QUE_RUNNING
;
return
(
next_thr
);
}
/********************************************************************
Finishes a transaction rollback. */
que_thr_t
*
void
trx_finish_rollback_off_kernel
(
/*===========================*/
/* out: next query thread to run */
que_t
*
graph
,
/* in: undo graph which can now be freed */
trx_t
*
trx
)
/* in: transaction */
trx_t
*
trx
,
/* in: transaction */
que_thr_t
**
next_thr
)
/* in/out: next query thread to run;
if the value which is passed in is
a pointer to a NULL pointer, then the
calling function can start running
a new query thread; if this parameter is
NULL, it is ignored */
{
trx_sig_t
*
sig
;
trx_sig_t
*
next_sig
;
que_thr_t
*
next_thr
;
#ifdef UNIV_SYNC_DEBUG
ut_ad
(
mutex_own
(
&
kernel_mutex
));
...
...
@@ -1105,13 +1129,15 @@ trx_finish_rollback_off_kernel(
if
(
sig
->
type
==
TRX_SIG_ROLLBACK_TO_SAVEPT
)
{
return
(
trx_finish_partial_rollback_off_kernel
(
trx
));
trx_finish_partial_rollback_off_kernel
(
trx
,
next_thr
);
return
;
}
else
if
(
sig
->
type
==
TRX_SIG_ERROR_OCCURRED
)
{
trx_finish_error_processing
(
trx
);
return
(
NULL
)
;
return
;
}
if
(
lock_print_waits
)
{
...
...
@@ -1125,23 +1151,19 @@ trx_finish_rollback_off_kernel(
send reply messages to them */
trx
->
que_state
=
TRX_QUE_RUNNING
;
next_thr
=
NULL
;
while
(
sig
!=
NULL
)
{
next_sig
=
UT_LIST_GET_NEXT
(
signals
,
sig
);
if
(
sig
->
type
==
TRX_SIG_TOTAL_ROLLBACK
)
{
ut_a
(
next_thr
==
NULL
);
next_thr
=
trx_sig_reply
(
sig
);
trx_sig_reply
(
sig
,
next_thr
);
trx_sig_remove
(
trx
,
sig
);
}
sig
=
next_sig
;
}
return
(
next_thr
);
}
/*************************************************************************
...
...
@@ -1174,6 +1196,7 @@ trx_rollback_step(
que_thr_t
*
thr
)
/* in: query thread */
{
roll_node_t
*
node
;
ibool
success
;
ulint
sig_no
;
trx_savept_t
*
savept
;
...
...
@@ -1200,13 +1223,19 @@ trx_rollback_step(
/* Send a rollback signal to the transaction */
trx_sig_send
(
thr_get_trx
(
thr
),
sig_no
,
TRX_SIG_SELF
,
thr
,
savept
);
success
=
trx_sig_send
(
thr_get_trx
(
thr
),
sig_no
,
TRX_SIG_SELF
,
thr
,
savept
,
NULL
);
thr
->
state
=
QUE_THR_SIG_REPLY_WAIT
;
mutex_exit
(
&
kernel_mutex
);
if
(
!
success
)
{
/* Error in delivering the rollback signal */
que_thr_handle_error
(
thr
,
DB_ERROR
,
NULL
,
0
);
}
return
(
NULL
);
}
...
...
innobase/trx/trx0trx.c
View file @
bfb0e284
...
...
@@ -895,15 +895,18 @@ trx_assign_read_view(
/********************************************************************
Commits a transaction. NOTE that the kernel mutex is temporarily released. */
static
que_thr_t
*
void
trx_handle_commit_sig_off_kernel
(
/*=============================*/
/* out: next query thread to run */
trx_t
*
trx
)
/* in: transaction */
trx_t
*
trx
,
/* in: transaction */
que_thr_t
**
next_thr
)
/* in/out: next query thread to run;
if the value which is passed in is
a pointer to a NULL pointer, then the
calling function can start running
a new query thread */
{
trx_sig_t
*
sig
;
trx_sig_t
*
next_sig
;
que_thr_t
*
next_thr
=
NULL
;
#ifdef UNIV_SYNC_DEBUG
ut_ad
(
mutex_own
(
&
kernel_mutex
));
...
...
@@ -925,8 +928,7 @@ trx_handle_commit_sig_off_kernel(
if
(
sig
->
type
==
TRX_SIG_COMMIT
)
{
ut_a
(
next_thr
==
NULL
);
next_thr
=
trx_sig_reply
(
sig
);
trx_sig_reply
(
sig
,
next_thr
);
trx_sig_remove
(
trx
,
sig
);
}
...
...
@@ -934,8 +936,6 @@ trx_handle_commit_sig_off_kernel(
}
trx
->
que_state
=
TRX_QUE_RUNNING
;
return
(
next_thr
);
}
/***************************************************************
...
...
@@ -997,6 +997,39 @@ trx_lock_wait_to_suspended(
trx
->
que_state
=
TRX_QUE_RUNNING
;
}
/***************************************************************
Moves the query threads in the sig reply wait list of trx to the SUSPENDED
state. */
static
void
trx_sig_reply_wait_to_suspended
(
/*============================*/
trx_t
*
trx
)
/* in: transaction */
{
trx_sig_t
*
sig
;
que_thr_t
*
thr
;
#ifdef UNIV_SYNC_DEBUG
ut_ad
(
mutex_own
(
&
kernel_mutex
));
#endif
/* UNIV_SYNC_DEBUG */
sig
=
UT_LIST_GET_FIRST
(
trx
->
reply_signals
);
while
(
sig
!=
NULL
)
{
thr
=
sig
->
receiver
;
ut_ad
(
thr
->
state
==
QUE_THR_SIG_REPLY_WAIT
);
thr
->
state
=
QUE_THR_SUSPENDED
;
sig
->
receiver
=
NULL
;
UT_LIST_REMOVE
(
reply_signals
,
trx
->
reply_signals
,
sig
);
sig
=
UT_LIST_GET_FIRST
(
trx
->
reply_signals
);
}
}
/*********************************************************************
Checks the compatibility of a new signal with the other signals in the
queue. */
...
...
@@ -1076,10 +1109,11 @@ trx_sig_is_compatible(
/********************************************************************
Sends a signal to a trx object. */
que_thr_t
*
ibool
trx_sig_send
(
/*=========*/
/* out: next query thread to run */
/* out: TRUE if the signal was
successfully delivered */
trx_t
*
trx
,
/* in: trx handle */
ulint
type
,
/* in: signal type */
ulint
sender
,
/* in: TRX_SIG_SELF or
...
...
@@ -1087,8 +1121,14 @@ trx_sig_send(
que_thr_t
*
receiver_thr
,
/* in: query thread which wants the
reply, or NULL; if type is
TRX_SIG_END_WAIT, this must be NULL */
trx_savept_t
*
savept
)
/* in: possible rollback savepoint, or
trx_savept_t
*
savept
,
/* in: possible rollback savepoint, or
NULL */
que_thr_t
**
next_thr
)
/* in/out: next query thread to run;
if the value which is passed in is
a pointer to a NULL pointer, then the
calling function can start running
a new query thread; if the parameter
is NULL, it is ignored */
{
trx_sig_t
*
sig
;
trx_t
*
receiver_trx
;
...
...
@@ -1098,7 +1138,14 @@ trx_sig_send(
ut_ad
(
mutex_own
(
&
kernel_mutex
));
#endif
/* UNIV_SYNC_DEBUG */
ut_a
(
trx_sig_is_compatible
(
trx
,
type
,
sender
));
if
(
!
trx_sig_is_compatible
(
trx
,
type
,
sender
))
{
/* The signal is not compatible with the other signals in
the queue: do nothing */
ut_error
;
return
(
FALSE
);
}
/* Queue the signal object */
...
...
@@ -1132,6 +1179,11 @@ trx_sig_send(
sig
);
}
if
(
trx
->
sess
->
state
==
SESS_ERROR
)
{
trx_sig_reply_wait_to_suspended
(
trx
);
}
if
((
sender
!=
TRX_SIG_SELF
)
||
(
type
==
TRX_SIG_BREAK_EXECUTION
))
{
/* The following call will add a TRX_SIG_ERROR_OCCURRED
...
...
@@ -1146,10 +1198,10 @@ trx_sig_send(
if
(
UT_LIST_GET_FIRST
(
trx
->
signals
)
==
sig
)
{
return
(
trx_sig_start_handle
(
trx
)
);
trx_sig_start_handle
(
trx
,
next_thr
);
}
return
(
NULL
);
return
(
TRUE
);
}
/********************************************************************
...
...
@@ -1171,18 +1223,27 @@ trx_end_signal_handling(
trx
->
handling_signals
=
FALSE
;
trx
->
graph
=
trx
->
graph_before_signal_handling
;
if
(
trx
->
graph
&&
(
trx
->
sess
->
state
==
SESS_ERROR
))
{
que_fork_error_handle
(
trx
,
trx
->
graph
);
}
}
/********************************************************************
Starts handling of a trx signal. */
que_thr_t
*
void
trx_sig_start_handle
(
/*=================*/
/* out: next query thread to run, or NULL */
trx_t
*
trx
)
/* in: trx handle */
trx_t
*
trx
,
/* in: trx handle */
que_thr_t
**
next_thr
)
/* in/out: next query thread to run;
if the value which is passed in is
a pointer to a NULL pointer, then the
calling function can start running
a new query thread; if the parameter
is NULL, it is ignored */
{
que_thr_t
*
next_thr
=
NULL
;
trx_sig_t
*
sig
;
ulint
type
;
loop:
...
...
@@ -1198,7 +1259,7 @@ trx_sig_start_handle(
trx_end_signal_handling
(
trx
);
return
(
next_thr
)
;
return
;
}
if
(
trx
->
conc_state
==
TRX_NOT_STARTED
)
{
...
...
@@ -1214,13 +1275,23 @@ trx_sig_start_handle(
trx_lock_wait_to_suspended
(
trx
);
}
/* If the session is in the error state and this trx has threads
waiting for reply from signals, moves these threads to the suspended
state, canceling wait reservations; note that if the transaction has
sent a commit or rollback signal to itself, and its session is not in
the error state, then nothing is done here. */
if
(
trx
->
sess
->
state
==
SESS_ERROR
)
{
trx_sig_reply_wait_to_suspended
(
trx
);
}
/* If there are no running query threads, we can start processing of a
signal, otherwise we have to wait until all query threads of this
transaction are aware of the arrival of the signal. */
if
(
trx
->
n_active_thrs
>
0
)
{
return
(
NULL
)
;
return
;
}
if
(
trx
->
handling_signals
==
FALSE
)
{
...
...
@@ -1234,19 +1305,30 @@ trx_sig_start_handle(
if
(
type
==
TRX_SIG_COMMIT
)
{
next_thr
=
trx_handle_commit_sig_off_kernel
(
trx
);
trx_handle_commit_sig_off_kernel
(
trx
,
next_thr
);
}
else
if
((
type
==
TRX_SIG_TOTAL_ROLLBACK
)
||
(
type
==
TRX_SIG_ROLLBACK_TO_SAVEPT
)
||
(
type
==
TRX_SIG_ERROR_OCCURRED
))
{
||
(
type
==
TRX_SIG_ROLLBACK_TO_SAVEPT
))
{
trx_rollback
(
trx
,
sig
,
next_thr
);
/* No further signals can be handled until the rollback
completes, therefore we return */
return
;
}
else
if
(
type
==
TRX_SIG_ERROR_OCCURRED
)
{
trx_rollback
(
trx
,
sig
,
next_thr
);
/* No further signals can be handled until the rollback
completes, therefore we return */
return
(
trx_rollback
(
trx
,
sig
))
;
return
;
}
else
if
(
type
==
TRX_SIG_BREAK_EXECUTION
)
{
next_thr
=
trx_sig_reply
(
sig
);
trx_sig_reply
(
sig
,
next_thr
);
trx_sig_remove
(
trx
,
sig
);
}
else
{
ut_error
;
...
...
@@ -1259,14 +1341,17 @@ trx_sig_start_handle(
Send the reply message when a signal in the queue of the trx has been
handled. */
que_thr_t
*
void
trx_sig_reply
(
/*==========*/
/* out: next query thread to run */
trx_sig_t
*
sig
)
/* in: signal */
trx_sig_t
*
sig
,
/* in: signal */
que_thr_t
**
next_thr
)
/* in/out: next query thread to run;
if the value which is passed in is
a pointer to a NULL pointer, then the
calling function can start running
a new query thread */
{
trx_t
*
receiver_trx
;
que_thr_t
*
next_thr
=
NULL
;
trx_t
*
receiver_trx
;
ut_ad
(
sig
);
#ifdef UNIV_SYNC_DEBUG
...
...
@@ -1280,13 +1365,13 @@ trx_sig_reply(
UT_LIST_REMOVE
(
reply_signals
,
receiver_trx
->
reply_signals
,
sig
);
next_thr
=
que_thr_end_wait
(
sig
->
receiver
);
ut_ad
(
receiver_trx
->
sess
->
state
!=
SESS_ERROR
);
que_thr_end_wait
(
sig
->
receiver
,
next_thr
);
sig
->
receiver
=
NULL
;
}
return
(
next_thr
);
}
/********************************************************************
...
...
@@ -1342,6 +1427,7 @@ trx_commit_step(
{
commit_node_t
*
node
;
que_thr_t
*
next_thr
;
ibool
success
;
node
=
thr
->
run_node
;
...
...
@@ -1356,15 +1442,22 @@ trx_commit_step(
node
->
state
=
COMMIT_NODE_WAIT
;
next_thr
=
NULL
;
thr
->
state
=
QUE_THR_SIG_REPLY_WAIT
;
/* Send the commit signal to the transaction */
next_thr
=
trx_sig_send
(
thr_get_trx
(
thr
),
TRX_SIG_COMMIT
,
TRX_SIG_SELF
,
thr
,
NULL
);
success
=
trx_sig_send
(
thr_get_trx
(
thr
),
TRX_SIG_COMMIT
,
TRX_SIG_SELF
,
thr
,
NULL
,
&
next_thr
);
mutex_exit
(
&
kernel_mutex
);
if
(
!
success
)
{
/* Error in delivering the commit signal */
que_thr_handle_error
(
thr
,
DB_ERROR
,
NULL
,
0
);
}
return
(
next_thr
);
}
...
...
innobase/usr/usr0sess.c
View file @
bfb0e284
...
...
@@ -37,6 +37,8 @@ sess_open(void)
#endif
/* UNIV_SYNC_DEBUG */
sess
=
mem_alloc
(
sizeof
(
sess_t
));
sess
->
state
=
SESS_ACTIVE
;
sess
->
trx
=
trx_create
(
sess
);
UT_LIST_INIT
(
sess
->
graphs
);
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment