Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
M
mariadb
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Kirill Smelkov
mariadb
Commits
95e87f08
Commit
95e87f08
authored
Feb 16, 2006
by
andrey@lmy004
Browse files
Options
Browse Files
Download
Plain Diff
Merge ahristov@bk-internal.mysql.com:/home/bk/mysql-5.1-new
into lmy004.:/work/mysql-5.1-bug16406
parents
6892c2a5
2158add8
Changes
9
Hide whitespace changes
Inline
Side-by-side
Showing
9 changed files
with
704 additions
and
128 deletions
+704
-128
mysql-test/r/events_stress.result
mysql-test/r/events_stress.result
+46
-0
mysql-test/t/events.test
mysql-test/t/events.test
+0
-1
mysql-test/t/events_stress.test
mysql-test/t/events_stress.test
+80
-0
sql/event.cc
sql/event.cc
+217
-12
sql/event.h
sql/event.h
+52
-8
sql/event_executor.cc
sql/event_executor.cc
+162
-101
sql/event_priv.h
sql/event_priv.h
+8
-4
sql/event_timed.cc
sql/event_timed.cc
+137
-2
sql/sql_db.cc
sql/sql_db.cc
+2
-0
No files found.
mysql-test/r/events_stress.result
0 → 100644
View file @
95e87f08
CREATE DATABASE IF NOT EXISTS events_test;
CREATE DATABASE events_test2;
USE events_test2;
CREATE EVENT ev_drop1 ON SCHEDULE EVERY 10 MINUTE DISABLE DO SELECT 1;
CREATE EVENT ev_drop2 ON SCHEDULE EVERY 10 MINUTE DISABLE DO SELECT 1;
CREATE EVENT ev_drop3 ON SCHEDULE EVERY 10 MINUTE DISABLE DO SELECT 1;
USE events_test;
SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_test2';
COUNT(*)
3
DROP DATABASE events_test2;
SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_test2';
COUNT(*)
0
"Now testing stability - dropping db -> events while they are running"
CREATE DATABASE events_test2;
USE events_test2;
SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_test2';
COUNT(*)
1000
SET GLOBAL event_scheduler=1;
DROP DATABASE events_test2;
SET GLOBAL event_scheduler=0;
SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_test2';
COUNT(*)
0
CREATE DATABASE events_test3;
USE events_test3;
SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_test3';
COUNT(*)
950
CREATE DATABASE events_test4;
USE events_test4;
CREATE DATABASE events_test2;
USE events_test2;
SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_test2';
COUNT(*)
1050
DROP DATABASE events_test2;
SET GLOBAL event_scheduler=0;
DROP DATABASE events_test3;
SET GLOBAL event_scheduler=1;
DROP DATABASE events_test4;
SET GLOBAL event_scheduler=1;
USE events_test;
DROP DATABASE events_test;
mysql-test/t/events.test
View file @
95e87f08
...
@@ -274,7 +274,6 @@ drop event one_event;
...
@@ -274,7 +274,6 @@ drop event one_event;
--
echo
"Sleep a bit so the server closes the second connection"
--
echo
"Sleep a bit so the server closes the second connection"
--
sleep
2
--
sleep
2
create
event
e_26
on
schedule
at
'2017-01-01 00:00:00'
disable
do
set
@
a
=
5
;
create
event
e_26
on
schedule
at
'2017-01-01 00:00:00'
disable
do
set
@
a
=
5
;
select
db
,
name
,
body
,
definer
,
convert_tz
(
execute_at
,
'UTC'
,
'SYSTEM'
),
on_completion
from
mysql
.
event
;
select
db
,
name
,
body
,
definer
,
convert_tz
(
execute_at
,
'UTC'
,
'SYSTEM'
),
on_completion
from
mysql
.
event
;
drop
event
e_26
;
drop
event
e_26
;
...
...
mysql-test/t/events_stress.test
0 → 100644
View file @
95e87f08
CREATE
DATABASE
IF
NOT
EXISTS
events_test
;
#
# DROP DATABASE test start (bug #16406)
#
CREATE
DATABASE
events_test2
;
USE
events_test2
;
CREATE
EVENT
ev_drop1
ON
SCHEDULE
EVERY
10
MINUTE
DISABLE
DO
SELECT
1
;
CREATE
EVENT
ev_drop2
ON
SCHEDULE
EVERY
10
MINUTE
DISABLE
DO
SELECT
1
;
CREATE
EVENT
ev_drop3
ON
SCHEDULE
EVERY
10
MINUTE
DISABLE
DO
SELECT
1
;
USE
events_test
;
SELECT
COUNT
(
*
)
FROM
INFORMATION_SCHEMA
.
EVENTS
WHERE
EVENT_SCHEMA
=
'events_test2'
;
DROP
DATABASE
events_test2
;
SELECT
COUNT
(
*
)
FROM
INFORMATION_SCHEMA
.
EVENTS
WHERE
EVENT_SCHEMA
=
'events_test2'
;
--
echo
"Now testing stability - dropping db -> events while they are running"
CREATE
DATABASE
events_test2
;
USE
events_test2
;
--
disable_query_log
let
$
1
=
1000
;
while
(
$
1
)
{
eval
CREATE
EVENT
ev_drop
$
1
ON
SCHEDULE
EVERY
1
SECOND
DO
SELECT
$
1
;
dec
$
1
;
}
--
enable_query_log
SELECT
COUNT
(
*
)
FROM
INFORMATION_SCHEMA
.
EVENTS
WHERE
EVENT_SCHEMA
=
'events_test2'
;
SET
GLOBAL
event_scheduler
=
1
;
--
sleep
4
DROP
DATABASE
events_test2
;
SET
GLOBAL
event_scheduler
=
0
;
--
sleep
2
SELECT
COUNT
(
*
)
FROM
INFORMATION_SCHEMA
.
EVENTS
WHERE
EVENT_SCHEMA
=
'events_test2'
;
CREATE
DATABASE
events_test3
;
USE
events_test3
;
--
disable_query_log
let
$
1
=
950
;
while
(
$
1
)
{
eval
CREATE
EVENT
ev_drop
$
1
ON
SCHEDULE
EVERY
1
SECOND
DO
SELECT
$
1
;
dec
$
1
;
}
--
enable_query_log
SELECT
COUNT
(
*
)
FROM
INFORMATION_SCHEMA
.
EVENTS
WHERE
EVENT_SCHEMA
=
'events_test3'
;
--
sleep
3
CREATE
DATABASE
events_test4
;
USE
events_test4
;
--
disable_query_log
let
$
1
=
860
;
while
(
$
1
)
{
eval
CREATE
EVENT
ev_drop
$
1
ON
SCHEDULE
EVERY
1
SECOND
DO
SELECT
$
1
;
dec
$
1
;
}
--
enable_query_log
CREATE
DATABASE
events_test2
;
USE
events_test2
;
--
disable_query_log
let
$
1
=
1050
;
while
(
$
1
)
{
eval
CREATE
EVENT
ev_drop
$
1
ON
SCHEDULE
EVERY
1
SECOND
DO
SELECT
$
1
;
dec
$
1
;
}
--
enable_query_log
SELECT
COUNT
(
*
)
FROM
INFORMATION_SCHEMA
.
EVENTS
WHERE
EVENT_SCHEMA
=
'events_test2'
;
--
sleep
6
DROP
DATABASE
events_test2
;
SET
GLOBAL
event_scheduler
=
0
;
DROP
DATABASE
events_test3
;
SET
GLOBAL
event_scheduler
=
1
;
DROP
DATABASE
events_test4
;
SET
GLOBAL
event_scheduler
=
1
;
USE
events_test
;
#
# DROP DATABASE test end (bug #16406)
#
DROP
DATABASE
events_test
;
sql/event.cc
View file @
95e87f08
...
@@ -514,6 +514,28 @@ evex_open_event_table(THD *thd, enum thr_lock_type lock_type, TABLE **table)
...
@@ -514,6 +514,28 @@ evex_open_event_table(THD *thd, enum thr_lock_type lock_type, TABLE **table)
SYNOPSIS
SYNOPSIS
evex_db_find_event_aux()
evex_db_find_event_aux()
thd Thread context
et evet_timed object containing dbname, name & definer
table TABLE object for open mysql.event table.
RETURN VALUE
0 - Routine found
EVEX_KEY_NOT_FOUND - No routine with given name
*/
inline
int
evex_db_find_event_aux
(
THD
*
thd
,
event_timed
*
et
,
TABLE
*
table
)
{
return
evex_db_find_event_by_name
(
thd
,
et
->
dbname
,
et
->
name
,
et
->
definer
,
table
);
}
/*
Find row in open mysql.event table representing event
SYNOPSIS
evex_db_find_event_by_name()
thd Thread context
thd Thread context
dbname Name of event's database
dbname Name of event's database
rname Name of the event inside the db
rname Name of the event inside the db
...
@@ -525,13 +547,13 @@ evex_open_event_table(THD *thd, enum thr_lock_type lock_type, TABLE **table)
...
@@ -525,13 +547,13 @@ evex_open_event_table(THD *thd, enum thr_lock_type lock_type, TABLE **table)
*/
*/
int
int
evex_db_find_event_
aux
(
THD
*
thd
,
const
LEX_STRING
dbname
,
evex_db_find_event_
by_name
(
THD
*
thd
,
const
LEX_STRING
dbname
,
const
LEX_STRING
ev_name
,
const
LEX_STRING
ev_name
,
const
LEX_STRING
user_name
,
const
LEX_STRING
user_name
,
TABLE
*
table
)
TABLE
*
table
)
{
{
byte
key
[
MAX_KEY_LENGTH
];
byte
key
[
MAX_KEY_LENGTH
];
DBUG_ENTER
(
"evex_db_find_event_
aux
"
);
DBUG_ENTER
(
"evex_db_find_event_
by_name
"
);
DBUG_PRINT
(
"enter"
,
(
"name: %.*s"
,
ev_name
.
length
,
ev_name
.
str
));
DBUG_PRINT
(
"enter"
,
(
"name: %.*s"
,
ev_name
.
length
,
ev_name
.
str
));
/*
/*
...
@@ -710,7 +732,7 @@ db_create_event(THD *thd, event_timed *et, my_bool create_if_not,
...
@@ -710,7 +732,7 @@ db_create_event(THD *thd, event_timed *et, my_bool create_if_not,
}
}
DBUG_PRINT
(
"info"
,
(
"check existance of an event with the same name"
));
DBUG_PRINT
(
"info"
,
(
"check existance of an event with the same name"
));
if
(
!
evex_db_find_event_aux
(
thd
,
et
->
dbname
,
et
->
name
,
et
->
definer
,
table
))
if
(
!
evex_db_find_event_aux
(
thd
,
et
,
table
))
{
{
if
(
create_if_not
)
if
(
create_if_not
)
{
{
...
@@ -848,7 +870,7 @@ db_update_event(THD *thd, event_timed *et, sp_name *new_name)
...
@@ -848,7 +870,7 @@ db_update_event(THD *thd, event_timed *et, sp_name *new_name)
goto
err
;
goto
err
;
}
}
if
(
!
evex_db_find_event_
aux
(
thd
,
new_name
->
m_db
,
new_name
->
m_name
,
if
(
!
evex_db_find_event_
by_name
(
thd
,
new_name
->
m_db
,
new_name
->
m_name
,
et
->
definer
,
table
))
et
->
definer
,
table
))
{
{
my_error
(
ER_EVENT_ALREADY_EXISTS
,
MYF
(
0
),
new_name
->
m_name
.
str
);
my_error
(
ER_EVENT_ALREADY_EXISTS
,
MYF
(
0
),
new_name
->
m_name
.
str
);
...
@@ -861,8 +883,7 @@ db_update_event(THD *thd, event_timed *et, sp_name *new_name)
...
@@ -861,8 +883,7 @@ db_update_event(THD *thd, event_timed *et, sp_name *new_name)
overwrite the key and SE will tell us that it cannot find the already found
overwrite the key and SE will tell us that it cannot find the already found
row (copied into record[1] later
row (copied into record[1] later
*/
*/
if
(
EVEX_KEY_NOT_FOUND
==
evex_db_find_event_aux
(
thd
,
et
->
dbname
,
et
->
name
,
if
(
EVEX_KEY_NOT_FOUND
==
evex_db_find_event_aux
(
thd
,
et
,
table
))
et
->
definer
,
table
))
{
{
my_error
(
ER_EVENT_DOES_NOT_EXIST
,
MYF
(
0
),
et
->
name
.
str
);
my_error
(
ER_EVENT_DOES_NOT_EXIST
,
MYF
(
0
),
et
->
name
.
str
);
goto
err
;
goto
err
;
...
@@ -943,8 +964,8 @@ db_find_event(THD *thd, sp_name *name, LEX_STRING *definer, event_timed **ett,
...
@@ -943,8 +964,8 @@ db_find_event(THD *thd, sp_name *name, LEX_STRING *definer, event_timed **ett,
goto
done
;
goto
done
;
}
}
if
((
ret
=
evex_db_find_event_
aux
(
thd
,
name
->
m_db
,
name
->
m_name
,
*
definer
,
if
((
ret
=
evex_db_find_event_
by_name
(
thd
,
name
->
m_db
,
name
->
m_name
,
*
definer
,
table
)))
table
)))
{
{
my_error
(
ER_EVENT_DOES_NOT_EXIST
,
MYF
(
0
),
name
->
m_name
.
str
);
my_error
(
ER_EVENT_DOES_NOT_EXIST
,
MYF
(
0
),
name
->
m_name
.
str
);
goto
done
;
goto
done
;
...
@@ -1089,7 +1110,7 @@ evex_remove_from_cache(LEX_STRING *db, LEX_STRING *name, bool use_lock,
...
@@ -1089,7 +1110,7 @@ evex_remove_from_cache(LEX_STRING *db, LEX_STRING *name, bool use_lock,
if
(
!
sortcmp_lex_string
(
*
name
,
et
->
name
,
system_charset_info
)
&&
if
(
!
sortcmp_lex_string
(
*
name
,
et
->
name
,
system_charset_info
)
&&
!
sortcmp_lex_string
(
*
db
,
et
->
dbname
,
system_charset_info
))
!
sortcmp_lex_string
(
*
db
,
et
->
dbname
,
system_charset_info
))
{
{
if
(
!
et
->
is_running
())
if
(
et
->
can_spawn_now
())
{
{
DBUG_PRINT
(
"evex_remove_from_cache"
,
(
"not running - free and delete"
));
DBUG_PRINT
(
"evex_remove_from_cache"
,
(
"not running - free and delete"
));
et
->
free_sp
();
et
->
free_sp
();
...
@@ -1393,3 +1414,187 @@ evex_show_create_event(THD *thd, sp_name *spn, LEX_STRING definer)
...
@@ -1393,3 +1414,187 @@ evex_show_create_event(THD *thd, sp_name *spn, LEX_STRING definer)
DBUG_RETURN
(
ret
);
DBUG_RETURN
(
ret
);
}
}
/*
evex_drop_db_events - Drops all events in the selected database
thd - Thread
db - ASCIIZ the name of the database
Returns:
0 - OK
1 - Failed to delete a specific row
2 - Got NULL while reading db name from a row
Note:
The algo is the following
1. Go through the in-memory cache, if the scheduler is working
and for every event whose dbname matches the database we drop
check whether is currently in execution:
- event_timed::can_spawn() returns true -> the event is not
being executed in a child thread. The reason not to use
event_timed::is_running() is that the latter shows only if
it is being executed, which is 99% of the time in the thread
but there are some initiliazations before and after the
anonymous SP is being called. So if we delete in this moment
-=> *boom*, so we have to check whether the thread has been
spawned and can_spawn() is the right method.
- event_timed::can_spawn() returns false -> being runned ATM
just set the flags so it should drop itself.
*/
int
evex_drop_db_events
(
THD
*
thd
,
char
*
db
)
{
TABLE
*
table
;
READ_RECORD
read_record_info
;
MYSQL_LOCK
*
lock
;
int
ret
=
0
;
int
i
;
LEX_STRING
db_lex
=
{
db
,
strlen
(
db
)};
DBUG_ENTER
(
"evex_drop_db_events"
);
DBUG_PRINT
(
"info"
,(
"dropping events from %s"
,
db
));
VOID
(
pthread_mutex_lock
(
&
LOCK_event_arrays
));
if
((
ret
=
evex_open_event_table
(
thd
,
TL_WRITE
,
&
table
)))
{
sql_print_error
(
"Table mysql.event is damaged."
);
VOID
(
pthread_mutex_unlock
(
&
LOCK_event_arrays
));
DBUG_RETURN
(
SP_OPEN_TABLE_FAILED
);
}
DBUG_PRINT
(
"info"
,(
"%d elements in the queue"
,
evex_queue_num_elements
(
EVEX_EQ_NAME
)));
VOID
(
pthread_mutex_lock
(
&
LOCK_evex_running
));
if
(
!
evex_is_running
)
goto
skip_memory
;
for
(
i
=
0
;
i
<
evex_queue_num_elements
(
EVEX_EQ_NAME
);
++
i
)
{
event_timed
*
et
=
evex_queue_element
(
&
EVEX_EQ_NAME
,
i
,
event_timed
*
);
if
(
sortcmp_lex_string
(
et
->
dbname
,
db_lex
,
system_charset_info
))
continue
;
if
(
et
->
can_spawn_now_n_lock
(
thd
))
{
DBUG_PRINT
(
"info"
,(
"event %s not running - direct delete"
,
et
->
name
.
str
));
if
(
!
(
ret
=
evex_db_find_event_aux
(
thd
,
et
,
table
)))
{
DBUG_PRINT
(
"info"
,(
"event %s found on disk"
,
et
->
name
.
str
));
if
((
ret
=
table
->
file
->
ha_delete_row
(
table
->
record
[
0
])))
{
sql_print_error
(
"Error while deleting a row - dropping "
"a database. Skipping the rest."
);
my_error
(
ER_EVENT_DROP_FAILED
,
MYF
(
0
),
et
->
name
.
str
);
goto
end
;
}
DBUG_PRINT
(
"info"
,(
"deleted event [%s] num [%d]. Time to free mem"
,
et
->
name
.
str
,
i
));
}
else
if
(
ret
==
EVEX_KEY_NOT_FOUND
)
{
sql_print_error
(
"Expected to find event %s.%s of %s on disk-not there."
,
et
->
dbname
.
str
,
et
->
name
.
str
,
et
->
definer
.
str
);
}
et
->
free_sp
();
delete
et
;
et
=
0
;
/* no need to call et->spawn_unlock because we already cleaned et */
}
else
{
DBUG_PRINT
(
"info"
,(
"event %s is running. setting exec_no_more and dropped"
,
et
->
name
.
str
));
et
->
flags
|=
EVENT_EXEC_NO_MORE
;
et
->
dropped
=
TRUE
;
}
DBUG_PRINT
(
"info"
,(
"%d elements in the queue"
,
evex_queue_num_elements
(
EVEX_EQ_NAME
)));
evex_queue_delete_element
(
&
EVEX_EQ_NAME
,
i
);
// 1 is top
DBUG_PRINT
(
"info"
,(
"%d elements in the queue"
,
evex_queue_num_elements
(
EVEX_EQ_NAME
)));
/*
decrease so we start at the same position, there will be
less elements in the queue, it will still be ordered so on
next iteration it will be again i the current element or if
no more we finish.
*/
--
i
;
}
skip_memory:
/*
The reasoning behind having two loops is the following:
If there was only one loop, the table-scan, then for every element which
matches, the queue in memory has to be searched to remove the element.
While if we go first over the queue and remove what's in there we have only
one pass over it and after finishing it, moving to table-scan for the disabled
events. This needs quite less time and means quite less locking on
LOCK_event_arrays.
*/
DBUG_PRINT
(
"info"
,(
"Mem-cache checked, now going to db for disabled events"
));
/* only enabled events are in memory, so we go now and delete the rest */
init_read_record
(
&
read_record_info
,
thd
,
table
,
NULL
,
1
,
0
);
while
(
!
(
read_record_info
.
read_record
(
&
read_record_info
))
&&
!
ret
)
{
char
*
et_db
;
if
((
et_db
=
get_field
(
thd
->
mem_root
,
table
->
field
[
EVEX_FIELD_DB
]))
==
NULL
)
{
ret
=
2
;
break
;
}
LEX_STRING
et_db_lex
=
{
et_db
,
strlen
(
et_db
)};
if
(
!
sortcmp_lex_string
(
et_db_lex
,
db_lex
,
system_charset_info
))
{
event_timed
ett
;
char
*
ptr
;
if
((
ptr
=
get_field
(
thd
->
mem_root
,
table
->
field
[
EVEX_FIELD_STATUS
]))
==
NullS
)
{
sql_print_error
(
"Error while loading from mysql.event. "
"Table probably corrupted"
);
goto
end
;
}
/*
When not running nothing is in memory so we have to clean
everything.
We don't delete EVENT_ENABLED events when the scheduler is running
because maybe this is an event which we asked to drop itself when
it is finished and it hasn't finished yet, so we don't touch it.
It will drop itself. The not running ENABLED events has been already
deleted from ha_delete_row() above in the loop over the QUEUE
(in case the executor is running).
'D' stands for DISABLED, 'E' for ENABLED - it's an enum
*/
if
((
evex_is_running
&&
ptr
[
0
]
==
'D'
)
||
!
evex_is_running
)
{
DBUG_PRINT
(
"info"
,
(
"Dropping %s.%s"
,
et_db
,
ett
.
name
.
str
));
if
((
ret
=
table
->
file
->
ha_delete_row
(
table
->
record
[
0
])))
{
my_error
(
ER_EVENT_DROP_FAILED
,
MYF
(
0
),
ett
.
name
.
str
);
goto
end
;
}
}
}
}
DBUG_PRINT
(
"info"
,(
"Disk checked for disabled events. Finishing."
));
end:
VOID
(
pthread_mutex_unlock
(
&
LOCK_evex_running
));
VOID
(
pthread_mutex_unlock
(
&
LOCK_event_arrays
));
end_read_record
(
&
read_record_info
);
thd
->
version
--
;
// Force close to free memory
close_thread_tables
(
thd
);
DBUG_RETURN
(
ret
);
}
sql/event.h
View file @
95e87f08
...
@@ -79,6 +79,8 @@ class event_timed
...
@@ -79,6 +79,8 @@ class event_timed
{
{
event_timed
(
const
event_timed
&
);
/* Prevent use of these */
event_timed
(
const
event_timed
&
);
/* Prevent use of these */
void
operator
=
(
event_timed
&
);
void
operator
=
(
event_timed
&
);
my_bool
in_spawned_thread
;
ulong
locked_by_thread_id
;
my_bool
running
;
my_bool
running
;
pthread_mutex_t
LOCK_running
;
pthread_mutex_t
LOCK_running
;
...
@@ -117,13 +119,14 @@ public:
...
@@ -117,13 +119,14 @@ public:
bool
free_sphead_on_delete
;
bool
free_sphead_on_delete
;
uint
flags
;
//all kind of purposes
uint
flags
;
//all kind of purposes
event_timed
()
:
running
(
0
),
status_changed
(
false
),
last_executed_changed
(
false
),
event_timed
()
:
in_spawned_thread
(
0
),
locked_by_thread_id
(
0
),
expression
(
0
),
created
(
0
),
modified
(
0
),
running
(
0
),
status_changed
(
false
),
on_completion
(
MYSQL_EVENT_ON_COMPLETION_DROP
),
last_executed_changed
(
false
),
expression
(
0
),
created
(
0
),
status
(
MYSQL_EVENT_ENABLED
),
sphead
(
0
),
sql_mode
(
0
),
modified
(
0
),
on_completion
(
MYSQL_EVENT_ON_COMPLETION_DROP
),
body_begin
(
0
),
dropped
(
false
),
free_sphead_on_delete
(
true
),
status
(
MYSQL_EVENT_ENABLED
),
sphead
(
0
),
sql_mode
(
0
),
flags
(
0
)
body_begin
(
0
),
dropped
(
false
),
free_sphead_on_delete
(
true
),
flags
(
0
)
{
{
pthread_mutex_init
(
&
this
->
LOCK_running
,
MY_MUTEX_INIT_FAST
);
pthread_mutex_init
(
&
this
->
LOCK_running
,
MY_MUTEX_INIT_FAST
);
init
();
init
();
...
@@ -200,7 +203,44 @@ public:
...
@@ -200,7 +203,44 @@ public:
return
ret
;
return
ret
;
}
}
void
free_sp
()
/*
Checks whether the object is being used in a spawned thread.
This method is for very basic checking. Use ::can_spawn_now_n_lock()
for most of the cases.
*/
my_bool
can_spawn_now
()
{
my_bool
ret
;
VOID
(
pthread_mutex_lock
(
&
this
->
LOCK_running
));
ret
=
!
in_spawned_thread
;
VOID
(
pthread_mutex_unlock
(
&
this
->
LOCK_running
));
return
ret
;
}
/*
Checks whether this thread can lock the object for modification ->
preventing being spawned for execution, and locks if possible.
use ::can_spawn_now() only for basic checking because a race
condition may occur between the check and eventual modification (deletion)
of the object.
*/
my_bool
can_spawn_now_n_lock
(
THD
*
thd
);
int
spawn_unlock
(
THD
*
thd
);
int
spawn_now
(
void
*
(
*
thread_func
)(
void
*
));
void
spawn_thread_finish
(
THD
*
thd
);
void
free_sp
()
{
{
delete
sphead
;
delete
sphead
;
sphead
=
0
;
sphead
=
0
;
...
@@ -239,6 +279,10 @@ event_reconstruct_interval_expression(String *buf,
...
@@ -239,6 +279,10 @@ event_reconstruct_interval_expression(String *buf,
interval_type
interval
,
interval_type
interval
,
longlong
expression
);
longlong
expression
);
int
evex_drop_db_events
(
THD
*
thd
,
char
*
db
);
int
int
init_events
();
init_events
();
...
...
sql/event_executor.cc
View file @
95e87f08
...
@@ -18,6 +18,11 @@
...
@@ -18,6 +18,11 @@
#include "event.h"
#include "event.h"
#include "sp.h"
#include "sp.h"
#define WAIT_STATUS_READY 0
#define WAIT_STATUS_EMPTY_QUEUE 1
#define WAIT_STATUS_NEW_TOP_EVENT 2
#define WAIT_STATUS_STOP_EXECUTOR 3
/*
/*
Make this define DBUG_FAULTY_THR to be able to put breakpoints inside
Make this define DBUG_FAULTY_THR to be able to put breakpoints inside
...
@@ -294,6 +299,85 @@ init_event_thread(THD* thd)
...
@@ -294,6 +299,85 @@ init_event_thread(THD* thd)
}
}
/*
This function waits till the time next event in the queue should be
executed.
Returns
WAIT_STATUS_READY There is an event to be executed right now
WAIT_STATUS_EMPTY_QUEUE No events or the last event was dropped.
WAIT_STATUS_NEW_TOP_EVENT New event has entered the queue and scheduled
on top. Restart ticking.
WAIT_STATUS_STOP_EXECUTOR The thread was killed or SET global event_scheduler=0;
*/
static
int
executor_wait_till_next_event_exec
(
THD
*
thd
)
{
event_timed
*
et
;
TIME
time_now
;
int
t2sleep
;
DBUG_ENTER
(
"executor_wait_till_next_event_exec"
);
/*
now let's see how much time to sleep, we know there is at least 1
element in the queue.
*/
VOID
(
pthread_mutex_lock
(
&
LOCK_event_arrays
));
if
(
!
evex_queue_num_elements
(
EVEX_EQ_NAME
))
{
VOID
(
pthread_mutex_unlock
(
&
LOCK_event_arrays
));
DBUG_RETURN
(
1
);
}
et
=
evex_queue_first_element
(
&
EVEX_EQ_NAME
,
event_timed
*
);
DBUG_ASSERT
(
et
);
if
(
et
->
status
==
MYSQL_EVENT_DISABLED
)
{
DBUG_PRINT
(
"evex main thread"
,(
"Now it is disabled-exec no more"
));
if
(
et
->
dropped
)
et
->
drop
(
thd
);
delete
et
;
evex_queue_delete_element
(
&
EVEX_EQ_NAME
,
1
);
// 1 is top
VOID
(
pthread_mutex_unlock
(
&
LOCK_event_arrays
));
sql_print_information
(
"Event found disabled, dropping."
);
DBUG_RETURN
(
1
);
}
DBUG_PRINT
(
"evex main thread"
,(
"computing time to sleep till next exec"
));
// set the internal clock of thd
thd
->
end_time
();
my_tz_UTC
->
gmt_sec_to_TIME
(
&
time_now
,
thd
->
query_start
());
t2sleep
=
evex_time_diff
(
&
et
->
execute_at
,
&
time_now
);
VOID
(
pthread_mutex_unlock
(
&
LOCK_event_arrays
));
DBUG_PRINT
(
"evex main thread"
,(
"unlocked LOCK_event_arrays"
));
if
(
t2sleep
>
0
)
{
/*
We sleep t2sleep seconds but we check every second whether this thread
has been killed, or there is a new candidate
*/
while
(
t2sleep
--
&&
!
thd
->
killed
&&
event_executor_running_global_var
&&
evex_queue_num_elements
(
EVEX_EQ_NAME
)
&&
(
evex_queue_first_element
(
&
EVEX_EQ_NAME
,
event_timed
*
)
==
et
))
{
DBUG_PRINT
(
"evex main thread"
,(
"will sleep a bit more"
));
my_sleep
(
1000000
);
}
}
int
ret
=
0
;
if
(
!
evex_queue_num_elements
(
EVEX_EQ_NAME
))
ret
=
1
;
else
if
(
evex_queue_first_element
(
&
EVEX_EQ_NAME
,
event_timed
*
)
!=
et
)
ret
=
2
;
if
(
thd
->
killed
&&
event_executor_running_global_var
)
ret
=
3
;
DBUG_RETURN
(
ret
);
}
/*
/*
The main scheduler thread. Inits the priority queue on start and
The main scheduler thread. Inits the priority queue on start and
destroys it on thread shutdown. Forks child threads for every event
destroys it on thread shutdown. Forks child threads for every event
...
@@ -313,9 +397,9 @@ pthread_handler_t
...
@@ -313,9 +397,9 @@ pthread_handler_t
event_executor_main
(
void
*
arg
)
event_executor_main
(
void
*
arg
)
{
{
THD
*
thd
;
/* needs to be first for thread_stack */
THD
*
thd
;
/* needs to be first for thread_stack */
ulonglong
iter_num
=
0
;
uint
i
=
0
,
j
=
0
;
uint
i
=
0
,
j
=
0
;
my_ulonglong
cnt
=
0
;
my_ulonglong
cnt
=
0
;
TIME
time_now
;
DBUG_ENTER
(
"event_executor_main"
);
DBUG_ENTER
(
"event_executor_main"
);
DBUG_PRINT
(
"event_executor_main"
,
(
"EVEX thread started"
));
DBUG_PRINT
(
"event_executor_main"
,
(
"EVEX thread started"
));
...
@@ -330,15 +414,16 @@ event_executor_main(void *arg)
...
@@ -330,15 +414,16 @@ event_executor_main(void *arg)
if
(
sizeof
(
my_time_t
)
!=
sizeof
(
time_t
))
if
(
sizeof
(
my_time_t
)
!=
sizeof
(
time_t
))
{
{
sql_print_error
(
"sizeof(my_time_t) != sizeof(time_t) ."
sql_print_error
(
"
SCHEDULER:
sizeof(my_time_t) != sizeof(time_t) ."
"The scheduler will not work correctly. Stopping."
);
"The scheduler will not work correctly. Stopping."
);
DBUG_ASSERT
(
0
);
goto
err_no_thd
;
goto
err_no_thd
;
}
}
//TODO Andrey: Check for NULL
//TODO Andrey: Check for NULL
if
(
!
(
thd
=
new
THD
))
// note that contructor of THD uses DBUG_ !
if
(
!
(
thd
=
new
THD
))
// note that contructor of THD uses DBUG_ !
{
{
sql_print_error
(
"
Cannot create THD for event_executor_main
"
);
sql_print_error
(
"
SCHEDULER: Cannot create THD for the main thread.
"
);
goto
err_no_thd
;
goto
err_no_thd
;
}
}
thd
->
thread_stack
=
(
char
*
)
&
thd
;
// remember where our stack is
thd
->
thread_stack
=
(
char
*
)
&
thd
;
// remember where our stack is
...
@@ -346,7 +431,7 @@ event_executor_main(void *arg)
...
@@ -346,7 +431,7 @@ event_executor_main(void *arg)
pthread_detach_this_thread
();
pthread_detach_this_thread
();
if
(
init_event_thread
(
thd
))
if
(
init_event_thread
(
thd
))
goto
err
;
goto
finish
;
/*
/*
make this thread visible it has no vio -> show processlist won't see it
make this thread visible it has no vio -> show processlist won't see it
...
@@ -360,7 +445,7 @@ event_executor_main(void *arg)
...
@@ -360,7 +445,7 @@ event_executor_main(void *arg)
thread_running
++
;
thread_running
++
;
VOID
(
pthread_mutex_unlock
(
&
LOCK_thread_count
));
VOID
(
pthread_mutex_unlock
(
&
LOCK_thread_count
));
DBUG_PRINT
(
"EVEX main thread"
,
(
"Initing events_queue
y
"
));
DBUG_PRINT
(
"EVEX main thread"
,
(
"Initing events_queue"
));
/*
/*
eventually manifest that we are running, not to crashe because of
eventually manifest that we are running, not to crashe because of
...
@@ -376,15 +461,14 @@ event_executor_main(void *arg)
...
@@ -376,15 +461,14 @@ event_executor_main(void *arg)
thd
->
security_ctx
->
user
=
my_strdup
(
"event_scheduler"
,
MYF
(
0
));
thd
->
security_ctx
->
user
=
my_strdup
(
"event_scheduler"
,
MYF
(
0
));
if
(
evex_load_events_from_db
(
thd
))
if
(
evex_load_events_from_db
(
thd
))
goto
err
;
goto
finish
;
evex_main_thread_id
=
thd
->
thread_id
;
evex_main_thread_id
=
thd
->
thread_id
;
sql_print_information
(
"S
cheduler
thread started"
);
sql_print_information
(
"S
CHEDULER: Main
thread started"
);
while
(
!
thd
->
killed
)
while
(
!
thd
->
killed
)
{
{
TIME
time_now
;
TIME
time_now
;
my_time_t
now
;
event_timed
*
et
;
event_timed
*
et
;
cnt
++
;
cnt
++
;
...
@@ -393,7 +477,7 @@ event_executor_main(void *arg)
...
@@ -393,7 +477,7 @@ event_executor_main(void *arg)
thd
->
proc_info
=
"Sleeping"
;
thd
->
proc_info
=
"Sleeping"
;
if
(
!
event_executor_running_global_var
)
if
(
!
event_executor_running_global_var
)
{
{
sql_print_information
(
"S
cheduler a
sked to stop."
);
sql_print_information
(
"S
CHEDULER: A
sked to stop."
);
break
;
break
;
}
}
...
@@ -402,62 +486,31 @@ event_executor_main(void *arg)
...
@@ -402,62 +486,31 @@ event_executor_main(void *arg)
my_sleep
(
1000000
);
// sleep 1s
my_sleep
(
1000000
);
// sleep 1s
continue
;
continue
;
}
}
{
restart_ticking:
int
t2sleep
;
switch
(
executor_wait_till_next_event_exec
(
thd
))
{
/*
case
WAIT_STATUS_READY
:
// time to execute the event on top
now let's see how much time to sleep, we know there is at least 1
DBUG_PRINT
(
"evex main thread"
,(
"time to execute an event"
));
element in the queue.
break
;
*/
case
WAIT_STATUS_EMPTY_QUEUE
:
// no more events
VOID
(
pthread_mutex_lock
(
&
LOCK_event_arrays
));
DBUG_PRINT
(
"evex main thread"
,(
"no more events"
));
if
(
!
evex_queue_num_elements
(
EVEX_EQ_NAME
))
continue
;
{
break
;
VOID
(
pthread_mutex_unlock
(
&
LOCK_event_arrays
));
case
WAIT_STATUS_NEW_TOP_EVENT
:
// new event on top in the queue
continue
;
DBUG_PRINT
(
"evex main thread"
,(
"restart ticking"
));
}
goto
restart_ticking
;
et
=
evex_queue_first_element
(
&
EVEX_EQ_NAME
,
event_timed
*
);
case
WAIT_STATUS_STOP_EXECUTOR
:
if
(
et
->
status
==
MYSQL_EVENT_DISABLED
)
sql_print_information
(
"SCHEDULER: Asked to stop."
);
{
goto
finish
;
DBUG_PRINT
(
"evex main thread"
,(
"Now it is disabled-exec no more"
));
break
;
if
(
et
->
dropped
)
default:
et
->
drop
(
thd
);
DBUG_ASSERT
(
0
);
delete
et
;
evex_queue_delete_element
(
&
EVEX_EQ_NAME
,
1
);
// 1 is top
VOID
(
pthread_mutex_unlock
(
&
LOCK_event_arrays
));
sql_print_information
(
"Event found disabled, dropping."
);
continue
;
}
DBUG_PRINT
(
"evex main thread"
,(
"computing time to sleep till next exec"
));
time
((
time_t
*
)
&
now
);
my_tz_UTC
->
gmt_sec_to_TIME
(
&
time_now
,
now
);
t2sleep
=
evex_time_diff
(
&
et
->
execute_at
,
&
time_now
);
VOID
(
pthread_mutex_unlock
(
&
LOCK_event_arrays
));
DBUG_PRINT
(
"evex main thread"
,(
"unlocked LOCK_event_arrays"
));
if
(
t2sleep
>
0
)
{
/*
We sleep t2sleep seconds but we check every second whether this thread
has been killed, or there is a new candidate
*/
while
(
t2sleep
--
&&
!
thd
->
killed
&&
event_executor_running_global_var
&&
evex_queue_num_elements
(
EVEX_EQ_NAME
)
&&
(
evex_queue_first_element
(
&
EVEX_EQ_NAME
,
event_timed
*
)
==
et
))
{
DBUG_PRINT
(
"evex main thread"
,(
"will sleep a bit more"
));
my_sleep
(
1000000
);
}
}
if
(
!
event_executor_running_global_var
)
{
sql_print_information
(
"Scheduler asked to stop."
);
break
;
}
}
}
VOID
(
pthread_mutex_lock
(
&
LOCK_event_arrays
));
VOID
(
pthread_mutex_lock
(
&
LOCK_event_arrays
));
thd
->
end_time
();
my_tz_UTC
->
gmt_sec_to_TIME
(
&
time_now
,
thd
->
query_start
());
if
(
!
evex_queue_num_elements
(
EVEX_EQ_NAME
))
if
(
!
evex_queue_num_elements
(
EVEX_EQ_NAME
))
{
{
...
@@ -479,14 +532,14 @@ event_executor_main(void *arg)
...
@@ -479,14 +532,14 @@ event_executor_main(void *arg)
DBUG_PRINT
(
"evex main thread"
,(
"it's right time"
));
DBUG_PRINT
(
"evex main thread"
,(
"it's right time"
));
if
(
et
->
status
==
MYSQL_EVENT_ENABLED
)
if
(
et
->
status
==
MYSQL_EVENT_ENABLED
)
{
{
pthread_t
th
;
int
fork_ret_code
;
DBUG_PRINT
(
"evex main thread"
,
(
"[%10s] this exec at [%llu]"
,
et
->
name
.
str
,
DBUG_PRINT
(
"evex main thread"
,
(
"[%10s] this exec at [%llu]"
,
et
->
name
.
str
,
TIME_to_ulonglong_datetime
(
&
et
->
execute_at
)));
TIME_to_ulonglong_datetime
(
&
et
->
execute_at
)));
et
->
mark_last_executed
(
thd
);
et
->
mark_last_executed
(
thd
);
if
(
et
->
compute_next_execution_time
())
if
(
et
->
compute_next_execution_time
())
{
{
sql_print_error
(
"Error while computing time of %s.%s . "
sql_print_error
(
"
SCHEDULER:
Error while computing time of %s.%s . "
"Disabling after execution."
,
"Disabling after execution."
,
et
->
dbname
.
str
,
et
->
name
.
str
);
et
->
dbname
.
str
,
et
->
name
.
str
);
et
->
status
=
MYSQL_EVENT_DISABLED
;
et
->
status
=
MYSQL_EVENT_DISABLED
;
...
@@ -495,13 +548,23 @@ event_executor_main(void *arg)
...
@@ -495,13 +548,23 @@ event_executor_main(void *arg)
TIME_to_ulonglong_datetime
(
&
et
->
execute_at
)));
TIME_to_ulonglong_datetime
(
&
et
->
execute_at
)));
et
->
update_fields
(
thd
);
et
->
update_fields
(
thd
);
++
iter_num
;
DBUG_PRINT
(
"info"
,
(
" Spawning a thread %d"
,
iter_num
));
#ifndef DBUG_FAULTY_THR
#ifndef DBUG_FAULTY_THR
if
(
pthread_create
(
&
th
,
&
connection_attrib
,
event_executor_worker
,(
void
*
)
et
))
thread_safe_increment
(
workers_count
,
&
LOCK_workers_count
);
{
switch
((
fork_ret_code
=
et
->
spawn_now
(
event_executor_worker
)))
{
sql_print_error
(
"Problem while trying to create a thread"
);
case
EVENT_EXEC_CANT_FORK
:
UNLOCK_MUTEX_AND_BAIL_OUT
(
LOCK_event_arrays
,
err
);
thread_safe_decrement
(
workers_count
,
&
LOCK_workers_count
);
sql_print_error
(
"SCHEDULER: Problem while trying to create a thread"
);
UNLOCK_MUTEX_AND_BAIL_OUT
(
LOCK_event_arrays
,
finish
);
case
EVENT_EXEC_ALREADY_EXEC
:
thread_safe_decrement
(
workers_count
,
&
LOCK_workers_count
);
sql_print_information
(
"SCHEDULER: %s.%s in execution. Skip this time."
,
et
->
dbname
.
str
,
et
->
name
.
str
);
break
;
default:
DBUG_ASSERT
(
!
fork_ret_code
);
if
(
fork_ret_code
)
thread_safe_decrement
(
workers_count
,
&
LOCK_workers_count
);
break
;
}
}
#else
#else
event_executor_worker
((
void
*
)
et
);
event_executor_worker
((
void
*
)
et
);
...
@@ -511,22 +574,21 @@ event_executor_main(void *arg)
...
@@ -511,22 +574,21 @@ event_executor_main(void *arg)
et
->
flags
|=
EVENT_EXEC_NO_MORE
;
et
->
flags
|=
EVENT_EXEC_NO_MORE
;
if
((
et
->
flags
&
EVENT_EXEC_NO_MORE
)
||
et
->
status
==
MYSQL_EVENT_DISABLED
)
if
((
et
->
flags
&
EVENT_EXEC_NO_MORE
)
||
et
->
status
==
MYSQL_EVENT_DISABLED
)
evex_queue_delete_element
(
&
EVEX_EQ_NAME
,
1
);
// 1 is top
evex_queue_delete_element
(
&
EVEX_EQ_NAME
,
0
);
// 0 is top, internally 1
else
else
evex_queue_first_updated
(
&
EVEX_EQ_NAME
);
evex_queue_first_updated
(
&
EVEX_EQ_NAME
);
}
}
DBUG_PRINT
(
"evex main thread"
,(
"unlocking"
));
DBUG_PRINT
(
"evex main thread"
,(
"unlocking"
));
VOID
(
pthread_mutex_unlock
(
&
LOCK_event_arrays
));
VOID
(
pthread_mutex_unlock
(
&
LOCK_event_arrays
));
}
// while
}
// while
finish:
err:
// First manifest that this thread does not work and then destroy
// First manifest that this thread does not work and then destroy
VOID
(
pthread_mutex_lock
(
&
LOCK_evex_running
));
VOID
(
pthread_mutex_lock
(
&
LOCK_evex_running
));
evex_is_running
=
false
;
evex_is_running
=
false
;
evex_main_thread_id
=
0
;
evex_main_thread_id
=
0
;
VOID
(
pthread_mutex_unlock
(
&
LOCK_evex_running
));
VOID
(
pthread_mutex_unlock
(
&
LOCK_evex_running
));
sql_print_information
(
"Event scheduler stopping. Waiting for worker threads to finish."
);
/*
/*
TODO: A better will be with a conditional variable
TODO: A better will be with a conditional variable
...
@@ -535,21 +597,33 @@ err:
...
@@ -535,21 +597,33 @@ err:
Read workers_count without lock, no need for locking.
Read workers_count without lock, no need for locking.
In the worst case we have to wait 1sec more.
In the worst case we have to wait 1sec more.
*/
*/
while
(
workers_count
)
sql_print_information
(
"SCHEDULER: Stopping. Waiting for worker threads to finish."
);
my_sleep
(
1000000
);
// 1s
while
(
1
)
{
VOID
(
pthread_mutex_lock
(
&
LOCK_workers_count
));
if
(
!
workers_count
)
{
VOID
(
pthread_mutex_unlock
(
&
LOCK_workers_count
));
break
;
}
VOID
(
pthread_mutex_unlock
(
&
LOCK_workers_count
));
my_sleep
(
1000000
);
// 1s
}
/*
/*
LEX_STRINGs reside in the memory root and will be destroyed with it
.
First we free all objects ..
.
Hence no need of delete but only freeing of SP
Lock because a DROP DATABASE could be running in parallel and it locks on these
*/
*/
// First we free all objects ...
sql_print_information
(
"SCHEDULER: Emptying the queue."
);
VOID
(
pthread_mutex_lock
(
&
LOCK_event_arrays
));
for
(
i
=
0
;
i
<
evex_queue_num_elements
(
EVEX_EQ_NAME
);
++
i
)
for
(
i
=
0
;
i
<
evex_queue_num_elements
(
EVEX_EQ_NAME
);
++
i
)
{
{
event_timed
*
et
=
evex_queue_element
(
&
EVEX_EQ_NAME
,
i
,
event_timed
*
);
event_timed
*
et
=
evex_queue_element
(
&
EVEX_EQ_NAME
,
i
,
event_timed
*
);
et
->
free_sp
();
et
->
free_sp
();
delete
et
;
delete
et
;
}
}
// ... then we can thras the whole queue at once
VOID
(
pthread_mutex_unlock
(
&
LOCK_event_arrays
));
// ... then we can thrash the whole queue at once
evex_queue_destroy
(
&
EVEX_EQ_NAME
);
evex_queue_destroy
(
&
EVEX_EQ_NAME
);
thd
->
proc_info
=
"Clearing"
;
thd
->
proc_info
=
"Clearing"
;
...
@@ -573,7 +647,7 @@ err_no_thd:
...
@@ -573,7 +647,7 @@ err_no_thd:
VOID
(
pthread_mutex_unlock
(
&
LOCK_evex_running
));
VOID
(
pthread_mutex_unlock
(
&
LOCK_evex_running
));
free_root
(
&
evex_mem_root
,
MYF
(
0
));
free_root
(
&
evex_mem_root
,
MYF
(
0
));
sql_print_information
(
"
Event scheduler s
topped."
);
sql_print_information
(
"
SCHEDULER: S
topped."
);
#ifndef DBUG_FAULTY_THR
#ifndef DBUG_FAULTY_THR
my_thread_end
();
my_thread_end
();
...
@@ -600,9 +674,6 @@ event_executor_worker(void *event_void)
...
@@ -600,9 +674,6 @@ event_executor_worker(void *event_void)
MEM_ROOT
worker_mem_root
;
MEM_ROOT
worker_mem_root
;
DBUG_ENTER
(
"event_executor_worker"
);
DBUG_ENTER
(
"event_executor_worker"
);
VOID
(
pthread_mutex_lock
(
&
LOCK_workers_count
));
++
workers_count
;
VOID
(
pthread_mutex_unlock
(
&
LOCK_workers_count
));
init_alloc_root
(
&
worker_mem_root
,
MEM_ROOT_BLOCK_SIZE
,
MEM_ROOT_PREALLOC
);
init_alloc_root
(
&
worker_mem_root
,
MEM_ROOT_BLOCK_SIZE
,
MEM_ROOT_PREALLOC
);
...
@@ -611,7 +682,7 @@ event_executor_worker(void *event_void)
...
@@ -611,7 +682,7 @@ event_executor_worker(void *event_void)
if
(
!
(
thd
=
new
THD
))
// note that contructor of THD uses DBUG_ !
if
(
!
(
thd
=
new
THD
))
// note that contructor of THD uses DBUG_ !
{
{
sql_print_error
(
"
Cannot create a THD structure in a scheduler worker thread
"
);
sql_print_error
(
"
SCHEDULER: Cannot create a THD structure in an worker.
"
);
goto
err_no_thd
;
goto
err_no_thd
;
}
}
thd
->
thread_stack
=
(
char
*
)
&
thd
;
// remember where our stack is
thd
->
thread_stack
=
(
char
*
)
&
thd
;
// remember where our stack is
...
@@ -653,14 +724,7 @@ event_executor_worker(void *event_void)
...
@@ -653,14 +724,7 @@ event_executor_worker(void *event_void)
event
->
dbname
.
str
,
event
->
name
.
str
,
event
->
dbname
.
str
,
event
->
name
.
str
,
event
->
definer
.
str
);
event
->
definer
.
str
);
}
}
if
((
event
->
flags
&
EVENT_EXEC_NO_MORE
)
||
event
->
status
==
MYSQL_EVENT_DISABLED
)
event
->
spawn_thread_finish
(
thd
);
{
DBUG_PRINT
(
"event_executor_worker"
,
(
"%s exec no more. to drop=%d"
,
event
->
name
.
str
,
event
->
dropped
));
if
(
event
->
dropped
)
event
->
drop
(
thd
);
delete
event
;
}
err:
err:
...
@@ -689,10 +753,7 @@ err:
...
@@ -689,10 +753,7 @@ err:
err_no_thd:
err_no_thd:
free_root
(
&
worker_mem_root
,
MYF
(
0
));
free_root
(
&
worker_mem_root
,
MYF
(
0
));
thread_safe_decrement
(
workers_count
,
&
LOCK_workers_count
);
VOID
(
pthread_mutex_lock
(
&
LOCK_workers_count
));
--
workers_count
;
VOID
(
pthread_mutex_unlock
(
&
LOCK_workers_count
));
#ifndef DBUG_FAULTY_THR
#ifndef DBUG_FAULTY_THR
my_thread_end
();
my_thread_end
();
...
@@ -733,7 +794,7 @@ evex_load_events_from_db(THD *thd)
...
@@ -733,7 +794,7 @@ evex_load_events_from_db(THD *thd)
if
((
ret
=
evex_open_event_table
(
thd
,
TL_READ
,
&
table
)))
if
((
ret
=
evex_open_event_table
(
thd
,
TL_READ
,
&
table
)))
{
{
sql_print_error
(
"
Table mysql.event is damaged
."
);
sql_print_error
(
"
SCHEDULER: Table mysql.event is damaged. Can not open
."
);
DBUG_RETURN
(
SP_OPEN_TABLE_FAILED
);
DBUG_RETURN
(
SP_OPEN_TABLE_FAILED
);
}
}
...
@@ -753,7 +814,7 @@ evex_load_events_from_db(THD *thd)
...
@@ -753,7 +814,7 @@ evex_load_events_from_db(THD *thd)
if
((
ret
=
et
->
load_from_row
(
&
evex_mem_root
,
table
)))
if
((
ret
=
et
->
load_from_row
(
&
evex_mem_root
,
table
)))
{
{
sql_print_error
(
"Error while loading from mysql.event. "
sql_print_error
(
"
SCHEDULER:
Error while loading from mysql.event. "
"Table probably corrupted"
);
"Table probably corrupted"
);
goto
end
;
goto
end
;
}
}
...
@@ -769,7 +830,7 @@ evex_load_events_from_db(THD *thd)
...
@@ -769,7 +830,7 @@ evex_load_events_from_db(THD *thd)
if
((
ret
=
et
->
compile
(
thd
,
&
evex_mem_root
)))
if
((
ret
=
et
->
compile
(
thd
,
&
evex_mem_root
)))
{
{
sql_print_error
(
"Error while compiling %s.%s. Aborting load."
,
sql_print_error
(
"
SCHEDULER:
Error while compiling %s.%s. Aborting load."
,
et
->
dbname
.
str
,
et
->
name
.
str
);
et
->
dbname
.
str
,
et
->
name
.
str
);
goto
end
;
goto
end
;
}
}
...
@@ -777,8 +838,8 @@ evex_load_events_from_db(THD *thd)
...
@@ -777,8 +838,8 @@ evex_load_events_from_db(THD *thd)
// let's find when to be executed
// let's find when to be executed
if
(
et
->
compute_next_execution_time
())
if
(
et
->
compute_next_execution_time
())
{
{
sql_print_error
(
"
Error while computing execution time of %s.%s. Skipping"
,
sql_print_error
(
"
SCHEDULER: Error while computing execution time of %s.%s."
et
->
dbname
.
str
,
et
->
name
.
str
);
" Skipping"
,
et
->
dbname
.
str
,
et
->
name
.
str
);
continue
;
continue
;
}
}
...
@@ -799,7 +860,7 @@ end:
...
@@ -799,7 +860,7 @@ end:
thd
->
version
--
;
// Force close to free memory
thd
->
version
--
;
// Force close to free memory
close_thread_tables
(
thd
);
close_thread_tables
(
thd
);
sql_print_information
(
"S
cheduler l
oaded %d event%s"
,
count
,
(
count
==
1
)
?
""
:
"s"
);
sql_print_information
(
"S
CHEDULER: L
oaded %d event%s"
,
count
,
(
count
==
1
)
?
""
:
"s"
);
DBUG_PRINT
(
"info"
,
(
"Status code %d. Loaded %d event(s)"
,
ret
,
count
));
DBUG_PRINT
(
"info"
,
(
"Status code %d. Loaded %d event(s)"
,
ret
,
count
));
DBUG_RETURN
(
ret
);
DBUG_RETURN
(
ret
);
...
...
sql/event_priv.h
View file @
95e87f08
...
@@ -19,6 +19,10 @@
...
@@ -19,6 +19,10 @@
#include "mysql_priv.h"
#include "mysql_priv.h"
#define EVENT_EXEC_STARTED 0
#define EVENT_EXEC_ALREADY_EXEC 1
#define EVENT_EXEC_CANT_FORK 2
#define EVEX_USE_QUEUE
#define EVEX_USE_QUEUE
#define UNLOCK_MUTEX_AND_BAIL_OUT(__mutex, __label) \
#define UNLOCK_MUTEX_AND_BAIL_OUT(__mutex, __label) \
...
@@ -32,10 +36,10 @@ int
...
@@ -32,10 +36,10 @@ int
my_time_compare
(
TIME
*
a
,
TIME
*
b
);
my_time_compare
(
TIME
*
a
,
TIME
*
b
);
int
int
evex_db_find_event_
aux
(
THD
*
thd
,
const
LEX_STRING
dbname
,
evex_db_find_event_
by_name
(
THD
*
thd
,
const
LEX_STRING
dbname
,
const
LEX_STRING
r
name
,
const
LEX_STRING
ev_
name
,
const
LEX_STRING
definer
,
const
LEX_STRING
user_name
,
TABLE
*
table
);
TABLE
*
table
);
int
int
event_timed_compare_q
(
void
*
vptr
,
byte
*
a
,
byte
*
b
);
event_timed_compare_q
(
void
*
vptr
,
byte
*
a
,
byte
*
b
);
...
...
sql/event_timed.cc
View file @
95e87f08
...
@@ -908,7 +908,7 @@ event_timed::drop(THD *thd)
...
@@ -908,7 +908,7 @@ event_timed::drop(THD *thd)
Saves status and last_executed_at to the disk if changed.
Saves status and last_executed_at to the disk if changed.
SYNOPSIS
SYNOPSIS
event_timed::
drop
()
event_timed::
update_fields
()
thd - thread context
thd - thread context
RETURN VALUE
RETURN VALUE
...
@@ -945,7 +945,7 @@ event_timed::update_fields(THD *thd)
...
@@ -945,7 +945,7 @@ event_timed::update_fields(THD *thd)
}
}
if
((
ret
=
evex_db_find_event_
aux
(
thd
,
dbname
,
name
,
definer
,
table
)))
if
((
ret
=
evex_db_find_event_
by_name
(
thd
,
dbname
,
name
,
definer
,
table
)))
goto
done
;
goto
done
;
store_record
(
table
,
record
[
1
]);
store_record
(
table
,
record
[
1
]);
...
@@ -1204,6 +1204,7 @@ event_timed::compile(THD *thd, MEM_ROOT *mem_root)
...
@@ -1204,6 +1204,7 @@ event_timed::compile(THD *thd, MEM_ROOT *mem_root)
MEM_ROOT
*
tmp_mem_root
=
0
;
MEM_ROOT
*
tmp_mem_root
=
0
;
LEX
*
old_lex
=
thd
->
lex
,
lex
;
LEX
*
old_lex
=
thd
->
lex
,
lex
;
char
*
old_db
;
char
*
old_db
;
int
old_db_length
;
event_timed
*
ett
;
event_timed
*
ett
;
sp_name
*
spn
;
sp_name
*
spn
;
char
*
old_query
;
char
*
old_query
;
...
@@ -1237,7 +1238,9 @@ event_timed::compile(THD *thd, MEM_ROOT *mem_root)
...
@@ -1237,7 +1238,9 @@ event_timed::compile(THD *thd, MEM_ROOT *mem_root)
old_query_len
=
thd
->
query_length
;
old_query_len
=
thd
->
query_length
;
old_query
=
thd
->
query
;
old_query
=
thd
->
query
;
old_db
=
thd
->
db
;
old_db
=
thd
->
db
;
old_db_length
=
thd
->
db_length
;
thd
->
db
=
dbname
.
str
;
thd
->
db
=
dbname
.
str
;
thd
->
db_length
=
dbname
.
length
;
get_create_event
(
thd
,
&
show_create
);
get_create_event
(
thd
,
&
show_create
);
...
@@ -1303,3 +1306,135 @@ done:
...
@@ -1303,3 +1306,135 @@ done:
DBUG_RETURN
(
ret
);
DBUG_RETURN
(
ret
);
}
}
/*
Checks whether this thread can lock the object for modification ->
preventing being spawned for execution, and locks if possible.
use ::can_spawn_now() only for basic checking because a race
condition may occur between the check and eventual modification (deletion)
of the object.
Returns
true - locked
false - cannot lock
*/
my_bool
event_timed
::
can_spawn_now_n_lock
(
THD
*
thd
)
{
my_bool
ret
=
FALSE
;
VOID
(
pthread_mutex_lock
(
&
this
->
LOCK_running
));
if
(
!
in_spawned_thread
)
{
in_spawned_thread
=
TRUE
;
ret
=
TRUE
;
locked_by_thread_id
=
thd
->
thread_id
;
}
VOID
(
pthread_mutex_unlock
(
&
this
->
LOCK_running
));
return
ret
;
}
extern
pthread_attr_t
connection_attrib
;
/*
Checks whether is possible and forks a thread. Passes self as argument.
Returns
EVENT_EXEC_STARTED - OK
EVENT_EXEC_ALREADY_EXEC - Thread not forked, already working
EVENT_EXEC_CANT_FORK - Unable to spawn thread (error)
*/
int
event_timed
::
spawn_now
(
void
*
(
*
thread_func
)(
void
*
))
{
int
ret
=
EVENT_EXEC_STARTED
;
static
uint
exec_num
=
0
;
DBUG_ENTER
(
"event_timed::spawn_now"
);
DBUG_PRINT
(
"info"
,
(
"[%s.%s]"
,
dbname
.
str
,
name
.
str
));
VOID
(
pthread_mutex_lock
(
&
this
->
LOCK_running
));
if
(
!
in_spawned_thread
)
{
pthread_t
th
;
in_spawned_thread
=
true
;
if
(
pthread_create
(
&
th
,
&
connection_attrib
,
thread_func
,
(
void
*
)
this
))
{
DBUG_PRINT
(
"info"
,
(
"problem while spawning thread"
));
ret
=
EVENT_EXEC_CANT_FORK
;
in_spawned_thread
=
false
;
}
#ifndef DBUG_OFF
else
{
sql_print_information
(
"SCHEDULER: Started thread %d"
,
++
exec_num
);
DBUG_PRINT
(
"info"
,
(
"thread spawned"
));
}
#endif
}
else
{
DBUG_PRINT
(
"info"
,
(
"already in spawned thread. skipping"
));
ret
=
EVENT_EXEC_ALREADY_EXEC
;
}
VOID
(
pthread_mutex_unlock
(
&
this
->
LOCK_running
));
DBUG_RETURN
(
ret
);
}
void
event_timed
::
spawn_thread_finish
(
THD
*
thd
)
{
DBUG_ENTER
(
"event_timed::spawn_thread_finish"
);
VOID
(
pthread_mutex_lock
(
&
this
->
LOCK_running
));
in_spawned_thread
=
false
;
if
((
flags
&
EVENT_EXEC_NO_MORE
)
||
status
==
MYSQL_EVENT_DISABLED
)
{
DBUG_PRINT
(
"info"
,
(
"%s exec no more. to drop=%d"
,
name
.
str
,
dropped
));
if
(
dropped
)
drop
(
thd
);
VOID
(
pthread_mutex_unlock
(
&
this
->
LOCK_running
));
delete
this
;
DBUG_VOID_RETURN
;
}
VOID
(
pthread_mutex_unlock
(
&
this
->
LOCK_running
));
DBUG_VOID_RETURN
;
}
/*
Unlocks the object after it has been locked with ::can_spawn_now_n_lock()
Returns
0 - ok
1 - not locked by this thread
*/
int
event_timed
::
spawn_unlock
(
THD
*
thd
)
{
int
ret
=
0
;
VOID
(
pthread_mutex_lock
(
&
this
->
LOCK_running
));
if
(
!
in_spawned_thread
)
{
if
(
locked_by_thread_id
==
thd
->
thread_id
)
{
in_spawned_thread
=
FALSE
;
locked_by_thread_id
=
0
;
}
else
{
sql_print_error
(
"A thread tries to unlock when he hasn't locked. "
"thread_id=%ld locked by %ld"
,
thd
->
thread_id
,
locked_by_thread_id
);
DBUG_ASSERT
(
0
);
ret
=
1
;
}
}
VOID
(
pthread_mutex_unlock
(
&
this
->
LOCK_running
));
return
ret
;
}
sql/sql_db.cc
View file @
95e87f08
...
@@ -20,6 +20,7 @@
...
@@ -20,6 +20,7 @@
#include "mysql_priv.h"
#include "mysql_priv.h"
#include <mysys_err.h>
#include <mysys_err.h>
#include "sp.h"
#include "sp.h"
#include "event.h"
#include <my_dir.h>
#include <my_dir.h>
#include <m_ctype.h>
#include <m_ctype.h>
#ifdef __WIN__
#ifdef __WIN__
...
@@ -870,6 +871,7 @@ bool mysql_rm_db(THD *thd,char *db,bool if_exists, bool silent)
...
@@ -870,6 +871,7 @@ bool mysql_rm_db(THD *thd,char *db,bool if_exists, bool silent)
exit:
exit:
(
void
)
sp_drop_db_routines
(
thd
,
db
);
/* QQ Ignore errors for now */
(
void
)
sp_drop_db_routines
(
thd
,
db
);
/* QQ Ignore errors for now */
(
void
)
evex_drop_db_events
(
thd
,
db
);
/* QQ Ignore errors for now */
start_waiting_global_read_lock
(
thd
);
start_waiting_global_read_lock
(
thd
);
/*
/*
If this database was the client's selected database, we silently change the
If this database was the client's selected database, we silently change the
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment