Commit a171e164 authored by Mikael Ronstrom's avatar Mikael Ronstrom

Merged in WL#5138

parents 543125e7 0c91c582
...@@ -42,15 +42,37 @@ ...@@ -42,15 +42,37 @@
#endif #endif
#ifndef MY_ATOMIC_NO_XADD #ifndef MY_ATOMIC_NO_XADD
#define make_atomic_add_body(S) \ #define make_atomic_add_body(S) make_atomic_add_body ## S
asm volatile (LOCK_prefix "; xadd %0, %1;" : "+r" (v) , "+m" (*a)) #define make_atomic_cas_body(S) make_atomic_cas_body ## S
#endif #endif
#define make_atomic_fas_body(S) \
asm volatile ("xchg %0, %1;" : "+q" (v) , "+m" (*a)) #define make_atomic_add_body32 \
#define make_atomic_cas_body(S) \ asm volatile (LOCK_prefix "; xadd %0, %1;" : "+r" (v) , "+m" (*a))
#define make_atomic_cas_body32 \
asm volatile (LOCK_prefix "; cmpxchg %3, %0; setz %2;" \ asm volatile (LOCK_prefix "; cmpxchg %3, %0; setz %2;" \
: "+m" (*a), "+a" (*cmp), "=q" (ret): "r" (set)) : "+m" (*a), "+a" (*cmp), "=q" (ret): "r" (set))
#define make_atomic_cas_bodyptr make_atomic_cas_body32
#ifndef __x86_64__
#define make_atomic_add_body64 make_atomic_add_body32
#define make_atomic_cas_body64 make_atomic_cas_body32
#else
#define make_atomic_add_body64 \
int64 tmp=*a; \
while (!my_atomic_cas64(a, &tmp, tmp+v)); \
v=tmp;
#define make_atomic_cas_body64 \
int32 ebx=(set & 0xFFFFFFFF), ecx=(set >> 32); \
asm volatile (LOCK_prefix "; cmpxchg8b %0; setz %2;" \
: "+m" (*a), "+A" (*cmp), "=q" (ret) \
:"b" (ebx), "c" (ecx))
#endif
#define make_atomic_fas_body(S) \
asm volatile ("xchg %0, %1;" : "+r" (v) , "+m" (*a))
#ifdef MY_ATOMIC_MODE_DUMMY #ifdef MY_ATOMIC_MODE_DUMMY
#define make_atomic_load_body(S) ret=*a #define make_atomic_load_body(S) ret=*a
#define make_atomic_store_body(S) *a=v #define make_atomic_store_body(S) *a=v
...@@ -66,5 +88,4 @@ ...@@ -66,5 +88,4 @@
#define make_atomic_store_body(S) \ #define make_atomic_store_body(S) \
asm volatile ("; xchg %0, %1;" : "+m" (*a), "+r" (v)) asm volatile ("; xchg %0, %1;" : "+m" (*a), "+r" (v))
#endif #endif
#endif /* ATOMIC_X86_GCC_INCLUDED */ #endif /* ATOMIC_X86_GCC_INCLUDED */
...@@ -37,7 +37,7 @@ ...@@ -37,7 +37,7 @@
my_atomic_store#(&var, what) my_atomic_store#(&var, what)
store 'what' in *var store 'what' in *var
'#' is substituted by a size suffix - 8, 16, 32, or ptr '#' is substituted by a size suffix - 8, 16, 32, 64, or ptr
(e.g. my_atomic_add8, my_atomic_fas32, my_atomic_casptr). (e.g. my_atomic_add8, my_atomic_fas32, my_atomic_casptr).
NOTE This operations are not always atomic, so they always must be NOTE This operations are not always atomic, so they always must be
...@@ -129,6 +129,7 @@ ...@@ -129,6 +129,7 @@
make_transparent_unions(8) make_transparent_unions(8)
make_transparent_unions(16) make_transparent_unions(16)
make_transparent_unions(32) make_transparent_unions(32)
make_transparent_unions(64)
make_transparent_unions(ptr) make_transparent_unions(ptr)
#undef uintptr #undef uintptr
#undef make_transparent_unions #undef make_transparent_unions
...@@ -140,10 +141,12 @@ make_transparent_unions(ptr) ...@@ -140,10 +141,12 @@ make_transparent_unions(ptr)
#define U_8 int8 #define U_8 int8
#define U_16 int16 #define U_16 int16
#define U_32 int32 #define U_32 int32
#define U_64 int64
#define U_ptr intptr #define U_ptr intptr
#define Uv_8 int8 #define Uv_8 int8
#define Uv_16 int16 #define Uv_16 int16
#define Uv_32 int32 #define Uv_32 int32
#define Uv_64 int64
#define Uv_ptr intptr #define Uv_ptr intptr
#define U_a volatile *a #define U_a volatile *a
#define U_cmp *cmp #define U_cmp *cmp
...@@ -217,6 +220,7 @@ make_atomic_cas(8) ...@@ -217,6 +220,7 @@ make_atomic_cas(8)
make_atomic_cas(16) make_atomic_cas(16)
#endif #endif
make_atomic_cas(32) make_atomic_cas(32)
make_atomic_cas(64)
make_atomic_cas(ptr) make_atomic_cas(ptr)
#ifdef MY_ATOMIC_HAS_8_16 #ifdef MY_ATOMIC_HAS_8_16
...@@ -224,12 +228,14 @@ make_atomic_add(8) ...@@ -224,12 +228,14 @@ make_atomic_add(8)
make_atomic_add(16) make_atomic_add(16)
#endif #endif
make_atomic_add(32) make_atomic_add(32)
make_atomic_add(64)
#ifdef MY_ATOMIC_HAS_8_16 #ifdef MY_ATOMIC_HAS_8_16
make_atomic_load(8) make_atomic_load(8)
make_atomic_load(16) make_atomic_load(16)
#endif #endif
make_atomic_load(32) make_atomic_load(32)
make_atomic_load(64)
make_atomic_load(ptr) make_atomic_load(ptr)
#ifdef MY_ATOMIC_HAS_8_16 #ifdef MY_ATOMIC_HAS_8_16
...@@ -237,6 +243,7 @@ make_atomic_fas(8) ...@@ -237,6 +243,7 @@ make_atomic_fas(8)
make_atomic_fas(16) make_atomic_fas(16)
#endif #endif
make_atomic_fas(32) make_atomic_fas(32)
make_atomic_fas(64)
make_atomic_fas(ptr) make_atomic_fas(ptr)
#ifdef MY_ATOMIC_HAS_8_16 #ifdef MY_ATOMIC_HAS_8_16
...@@ -244,6 +251,7 @@ make_atomic_store(8) ...@@ -244,6 +251,7 @@ make_atomic_store(8)
make_atomic_store(16) make_atomic_store(16)
#endif #endif
make_atomic_store(32) make_atomic_store(32)
make_atomic_store(64)
make_atomic_store(ptr) make_atomic_store(ptr)
#ifdef _atomic_h_cleanup_ #ifdef _atomic_h_cleanup_
...@@ -254,10 +262,12 @@ make_atomic_store(ptr) ...@@ -254,10 +262,12 @@ make_atomic_store(ptr)
#undef U_8 #undef U_8
#undef U_16 #undef U_16
#undef U_32 #undef U_32
#undef U_64
#undef U_ptr #undef U_ptr
#undef Uv_8 #undef Uv_8
#undef Uv_16 #undef Uv_16
#undef Uv_32 #undef Uv_32
#undef Uv_64
#undef Uv_ptr #undef Uv_ptr
#undef a #undef a
#undef cmp #undef cmp
......
...@@ -866,6 +866,8 @@ typedef SOCKET_SIZE_TYPE size_socket; ...@@ -866,6 +866,8 @@ typedef SOCKET_SIZE_TYPE size_socket;
#endif #endif
#endif /* defined (HAVE_LONG_LONG) && !defined(ULONGLONG_MAX)*/ #endif /* defined (HAVE_LONG_LONG) && !defined(ULONGLONG_MAX)*/
#define INT_MIN64 (~0x7FFFFFFFFFFFFFFFLL)
#define INT_MAX64 0x7FFFFFFFFFFFFFFFLL
#define INT_MIN32 (~0x7FFFFFFFL) #define INT_MIN32 (~0x7FFFFFFFL)
#define INT_MAX32 0x7FFFFFFFL #define INT_MAX32 0x7FFFFFFFL
#define UINT_MAX32 0xFFFFFFFFL #define UINT_MAX32 0xFFFFFFFFL
......
...@@ -133,9 +133,8 @@ post_init_event_thread(THD *thd) ...@@ -133,9 +133,8 @@ post_init_event_thread(THD *thd)
pthread_mutex_lock(&LOCK_thread_count); pthread_mutex_lock(&LOCK_thread_count);
threads.append(thd); threads.append(thd);
thread_count++; thread_count++;
thread_running++; inc_thread_running();
pthread_mutex_unlock(&LOCK_thread_count); pthread_mutex_unlock(&LOCK_thread_count);
return FALSE; return FALSE;
} }
...@@ -157,7 +156,7 @@ deinit_event_thread(THD *thd) ...@@ -157,7 +156,7 @@ deinit_event_thread(THD *thd)
DBUG_PRINT("exit", ("Event thread finishing")); DBUG_PRINT("exit", ("Event thread finishing"));
pthread_mutex_lock(&LOCK_thread_count); pthread_mutex_lock(&LOCK_thread_count);
thread_count--; thread_count--;
thread_running--; dec_thread_running();
delete thd; delete thd;
pthread_cond_broadcast(&COND_thread_count); pthread_cond_broadcast(&COND_thread_count);
pthread_mutex_unlock(&LOCK_thread_count); pthread_mutex_unlock(&LOCK_thread_count);
...@@ -418,7 +417,7 @@ Event_scheduler::start() ...@@ -418,7 +417,7 @@ Event_scheduler::start()
net_end(&new_thd->net); net_end(&new_thd->net);
pthread_mutex_lock(&LOCK_thread_count); pthread_mutex_lock(&LOCK_thread_count);
thread_count--; thread_count--;
thread_running--; dec_thread_running();
delete new_thd; delete new_thd;
pthread_cond_broadcast(&COND_thread_count); pthread_cond_broadcast(&COND_thread_count);
pthread_mutex_unlock(&LOCK_thread_count); pthread_mutex_unlock(&LOCK_thread_count);
...@@ -551,7 +550,7 @@ error: ...@@ -551,7 +550,7 @@ error:
net_end(&new_thd->net); net_end(&new_thd->net);
pthread_mutex_lock(&LOCK_thread_count); pthread_mutex_lock(&LOCK_thread_count);
thread_count--; thread_count--;
thread_running--; dec_thread_running();
delete new_thd; delete new_thd;
pthread_cond_broadcast(&COND_thread_count); pthread_cond_broadcast(&COND_thread_count);
pthread_mutex_unlock(&LOCK_thread_count); pthread_mutex_unlock(&LOCK_thread_count);
......
...@@ -3055,10 +3055,7 @@ int Query_log_event::do_apply_event(Relay_log_info const *rli, ...@@ -3055,10 +3055,7 @@ int Query_log_event::do_apply_event(Relay_log_info const *rli,
rpl_filter->db_ok(thd->db)) rpl_filter->db_ok(thd->db))
{ {
thd->set_time((time_t)when); thd->set_time((time_t)when);
thd->set_query((char*)query_arg, q_len_arg); thd->set_query_and_id((char*)query_arg, q_len_arg, next_query_id());
pthread_mutex_lock(&LOCK_thread_count);
thd->query_id = next_query_id();
pthread_mutex_unlock(&LOCK_thread_count);
thd->variables.pseudo_thread_id= thread_id; // for temp tables thd->variables.pseudo_thread_id= thread_id; // for temp tables
DBUG_PRINT("query",("%s", thd->query())); DBUG_PRINT("query",("%s", thd->query()));
...@@ -4580,9 +4577,7 @@ int Load_log_event::do_apply_event(NET* net, Relay_log_info const *rli, ...@@ -4580,9 +4577,7 @@ int Load_log_event::do_apply_event(NET* net, Relay_log_info const *rli,
if (rpl_filter->db_ok(thd->db)) if (rpl_filter->db_ok(thd->db))
{ {
thd->set_time((time_t)when); thd->set_time((time_t)when);
pthread_mutex_lock(&LOCK_thread_count); thd->set_query_id(next_query_id());
thd->query_id = next_query_id();
pthread_mutex_unlock(&LOCK_thread_count);
thd->warning_info->opt_clear_warning_info(thd->query_id); thd->warning_info->opt_clear_warning_info(thd->query_id);
TABLE_LIST tables; TABLE_LIST tables;
...@@ -8071,9 +8066,7 @@ int Table_map_log_event::do_apply_event(Relay_log_info const *rli) ...@@ -8071,9 +8066,7 @@ int Table_map_log_event::do_apply_event(Relay_log_info const *rli)
DBUG_ASSERT(rli->sql_thd == thd); DBUG_ASSERT(rli->sql_thd == thd);
/* Step the query id to mark what columns that are actually used. */ /* Step the query id to mark what columns that are actually used. */
pthread_mutex_lock(&LOCK_thread_count); thd->set_query_id(next_query_id());
thd->query_id= next_query_id();
pthread_mutex_unlock(&LOCK_thread_count);
if (!(memory= my_multi_malloc(MYF(MY_WME), if (!(memory= my_multi_malloc(MYF(MY_WME),
&table_list, (uint) sizeof(RPL_TABLE_LIST), &table_list, (uint) sizeof(RPL_TABLE_LIST),
......
...@@ -53,6 +53,7 @@ ...@@ -53,6 +53,7 @@
#include "sql_array.h" #include "sql_array.h"
#include "sql_plugin.h" #include "sql_plugin.h"
#include "scheduler.h" #include "scheduler.h"
#include <my_atomic.h>
class Parser_state; class Parser_state;
...@@ -85,11 +86,60 @@ typedef ulong nesting_map; /* Used for flags of nesting constructs */ ...@@ -85,11 +86,60 @@ typedef ulong nesting_map; /* Used for flags of nesting constructs */
typedef ulonglong nested_join_map; typedef ulonglong nested_join_map;
/* query_id */ /* query_id */
typedef ulonglong query_id_t; typedef int64 query_id_t;
extern query_id_t global_query_id; extern query_id_t global_query_id;
extern int32 thread_running;
extern my_atomic_rwlock_t global_query_id_lock;
extern my_atomic_rwlock_t thread_running_lock;
/* increment query_id and return it. */ /* increment query_id and return it. */
inline query_id_t next_query_id() { return global_query_id++; } inline query_id_t next_query_id()
{
query_id_t id;
my_atomic_rwlock_wrlock(&global_query_id_lock);
id= my_atomic_add64(&global_query_id, 1);
my_atomic_rwlock_wrunlock(&global_query_id_lock);
return (id+1);
}
inline query_id_t get_query_id()
{
query_id_t id;
my_atomic_rwlock_wrlock(&global_query_id_lock);
id= my_atomic_load64(&global_query_id);
my_atomic_rwlock_wrunlock(&global_query_id_lock);
return id;
}
inline int32
inc_thread_running()
{
int32 num_thread_running;
my_atomic_rwlock_wrlock(&thread_running_lock);
num_thread_running= my_atomic_add32(&thread_running, 1);
my_atomic_rwlock_wrunlock(&thread_running_lock);
return (num_thread_running+1);
}
inline int32
dec_thread_running()
{
int32 num_thread_running;
my_atomic_rwlock_wrlock(&thread_running_lock);
num_thread_running= my_atomic_add32(&thread_running, -1);
my_atomic_rwlock_wrunlock(&thread_running_lock);
return (num_thread_running-1);
}
inline int32
get_thread_running()
{
int32 num_thread_running;
my_atomic_rwlock_wrlock(&thread_running_lock);
num_thread_running= my_atomic_load32(&thread_running);
my_atomic_rwlock_wrunlock(&thread_running_lock);
return num_thread_running;
}
/* useful constants */ /* useful constants */
extern MYSQL_PLUGIN_IMPORT const key_map key_map_empty; extern MYSQL_PLUGIN_IMPORT const key_map key_map_empty;
...@@ -1930,7 +1980,7 @@ extern bool opt_ignore_builtin_innodb; ...@@ -1930,7 +1980,7 @@ extern bool opt_ignore_builtin_innodb;
extern my_bool opt_character_set_client_handshake; extern my_bool opt_character_set_client_handshake;
extern bool volatile abort_loop, shutdown_in_progress; extern bool volatile abort_loop, shutdown_in_progress;
extern bool in_bootstrap; extern bool in_bootstrap;
extern uint volatile thread_count, thread_running, global_read_lock; extern uint volatile thread_count, global_read_lock;
extern uint connection_count; extern uint connection_count;
extern my_bool opt_sql_bin_update, opt_safe_user_create, opt_no_mix_types; extern my_bool opt_sql_bin_update, opt_safe_user_create, opt_no_mix_types;
extern my_bool opt_safe_show_db, opt_local_infile, opt_myisam_use_mmap; extern my_bool opt_safe_show_db, opt_local_infile, opt_myisam_use_mmap;
......
...@@ -527,7 +527,8 @@ uint mysqld_port_timeout; ...@@ -527,7 +527,8 @@ uint mysqld_port_timeout;
uint delay_key_write_options, protocol_version; uint delay_key_write_options, protocol_version;
uint lower_case_table_names; uint lower_case_table_names;
uint tc_heuristic_recover= 0; uint tc_heuristic_recover= 0;
uint volatile thread_count, thread_running; uint volatile thread_count;
int32 thread_running;
ulonglong thd_startup_options; ulonglong thd_startup_options;
ulong back_log, connect_timeout, concurrency, server_id; ulong back_log, connect_timeout, concurrency, server_id;
ulong table_cache_size, table_def_size; ulong table_cache_size, table_def_size;
...@@ -543,6 +544,8 @@ ulonglong max_binlog_cache_size=0; ...@@ -543,6 +544,8 @@ ulonglong max_binlog_cache_size=0;
ulong query_cache_size=0; ulong query_cache_size=0;
ulong refresh_version; /* Increments on each reload */ ulong refresh_version; /* Increments on each reload */
query_id_t global_query_id; query_id_t global_query_id;
my_atomic_rwlock_t global_query_id_lock;
my_atomic_rwlock_t thread_running_lock;
ulong aborted_threads, aborted_connects; ulong aborted_threads, aborted_connects;
ulong delayed_insert_timeout, delayed_insert_limit, delayed_queue_size; ulong delayed_insert_timeout, delayed_insert_limit, delayed_queue_size;
ulong delayed_insert_threads, delayed_insert_writes, delayed_rows_in_use; ulong delayed_insert_threads, delayed_insert_writes, delayed_rows_in_use;
...@@ -1376,6 +1379,8 @@ void clean_up(bool print_message) ...@@ -1376,6 +1379,8 @@ void clean_up(bool print_message)
DBUG_PRINT("quit", ("Error messages freed")); DBUG_PRINT("quit", ("Error messages freed"));
/* Tell main we are ready */ /* Tell main we are ready */
logger.cleanup_end(); logger.cleanup_end();
my_atomic_rwlock_destroy(&global_query_id_lock);
my_atomic_rwlock_destroy(&thread_running_lock);
(void) pthread_mutex_lock(&LOCK_thread_count); (void) pthread_mutex_lock(&LOCK_thread_count);
DBUG_PRINT("quit", ("got thread count lock")); DBUG_PRINT("quit", ("got thread count lock"));
ready_to_exit=1; ready_to_exit=1;
...@@ -7783,6 +7788,8 @@ static int mysql_init_variables(void) ...@@ -7783,6 +7788,8 @@ static int mysql_init_variables(void)
what_to_log= ~ (1L << (uint) COM_TIME); what_to_log= ~ (1L << (uint) COM_TIME);
refresh_version= 1L; /* Increments on each reload */ refresh_version= 1L; /* Increments on each reload */
global_query_id= thread_id= 1L; global_query_id= thread_id= 1L;
my_atomic_rwlock_init(&global_query_id_lock);
my_atomic_rwlock_init(&thread_running_lock);
strmov(server_version, MYSQL_SERVER_VERSION); strmov(server_version, MYSQL_SERVER_VERSION);
myisam_recover_options_str= sql_mode_str= "OFF"; myisam_recover_options_str= sql_mode_str= "OFF";
myisam_stats_method_str= "nulls_unequal"; myisam_stats_method_str= "nulls_unequal";
......
...@@ -1338,7 +1338,7 @@ sp_head::execute(THD *thd) ...@@ -1338,7 +1338,7 @@ sp_head::execute(THD *thd)
/* To avoid wiping out thd->change_list on old_change_list destruction */ /* To avoid wiping out thd->change_list on old_change_list destruction */
old_change_list.empty(); old_change_list.empty();
thd->lex= old_lex; thd->lex= old_lex;
thd->query_id= old_query_id; thd->set_query_id(old_query_id);
DBUG_ASSERT(!thd->derived_tables); DBUG_ASSERT(!thd->derived_tables);
thd->derived_tables= old_derived_tables; thd->derived_tables= old_derived_tables;
thd->variables.sql_mode= save_sql_mode; thd->variables.sql_mode= save_sql_mode;
...@@ -2736,9 +2736,7 @@ sp_lex_keeper::reset_lex_and_exec_core(THD *thd, uint *nextp, ...@@ -2736,9 +2736,7 @@ sp_lex_keeper::reset_lex_and_exec_core(THD *thd, uint *nextp,
*/ */
thd->lex= m_lex; thd->lex= m_lex;
pthread_mutex_lock(&LOCK_thread_count); thd->set_query_id(next_query_id());
thd->query_id= next_query_id();
pthread_mutex_unlock(&LOCK_thread_count);
if (thd->prelocked_mode == NON_PRELOCKED) if (thd->prelocked_mode == NON_PRELOCKED)
{ {
......
...@@ -3269,6 +3269,26 @@ void THD::set_query(char *query_arg, uint32 query_length_arg) ...@@ -3269,6 +3269,26 @@ void THD::set_query(char *query_arg, uint32 query_length_arg)
pthread_mutex_unlock(&LOCK_thd_data); pthread_mutex_unlock(&LOCK_thd_data);
} }
/** Assign a new value to thd->query and thd->query_id. */
void THD::set_query_and_id(char *query_arg, uint32 query_length_arg,
query_id_t new_query_id)
{
pthread_mutex_lock(&LOCK_thd_data);
set_query_inner(query_arg, query_length_arg);
query_id= new_query_id;
pthread_mutex_unlock(&LOCK_thd_data);
}
/** Assign a new value to thd->query_id. */
void THD::set_query_id(query_id_t new_query_id)
{
pthread_mutex_lock(&LOCK_thd_data);
query_id= new_query_id;
pthread_mutex_unlock(&LOCK_thd_data);
}
/** /**
Mark transaction to rollback and mark error as fatal to a sub-statement. Mark transaction to rollback and mark error as fatal to a sub-statement.
......
...@@ -2314,10 +2314,13 @@ public: ...@@ -2314,10 +2314,13 @@ public:
virtual void set_statement(Statement *stmt); virtual void set_statement(Statement *stmt);
/** /**
Assign a new value to thd->query. Assign a new value to thd->query and thd->query_id.
Protected with LOCK_thd_data mutex. Protected with LOCK_thd_data mutex.
*/ */
void set_query(char *query_arg, uint32 query_length_arg); void set_query(char *query_arg, uint32 query_length_arg);
void set_query_and_id(char *query_arg, uint32 query_length_arg,
query_id_t new_query_id);
void set_query_id(query_id_t new_query_id);
private: private:
/** The current internal error handler for this thread, or NULL. */ /** The current internal error handler for this thread, or NULL. */
Internal_error_handler *m_internal_handler; Internal_error_handler *m_internal_handler;
......
...@@ -438,7 +438,7 @@ Sensitive_cursor::fetch(ulong num_rows) ...@@ -438,7 +438,7 @@ Sensitive_cursor::fetch(ulong num_rows)
thd->derived_tables= derived_tables; thd->derived_tables= derived_tables;
thd->open_tables= open_tables; thd->open_tables= open_tables;
thd->lock= lock; thd->lock= lock;
thd->query_id= query_id; thd->set_query_id(query_id);
thd->change_list= change_list; thd->change_list= change_list;
/* save references to memory allocated during fetch */ /* save references to memory allocated during fetch */
thd->set_n_backup_active_arena(this, &backup_arena); thd->set_n_backup_active_arena(this, &backup_arena);
......
...@@ -484,7 +484,7 @@ static void handle_bootstrap_impl(THD *thd) ...@@ -484,7 +484,7 @@ static void handle_bootstrap_impl(THD *thd)
query= (char *) thd->memdup_w_gap(buff, length + 1, query= (char *) thd->memdup_w_gap(buff, length + 1,
thd->db_length + 1 + thd->db_length + 1 +
QUERY_CACHE_FLAGS_SIZE); QUERY_CACHE_FLAGS_SIZE);
thd->set_query(query, length); thd->set_query_and_id(query, length, next_query_id());
DBUG_PRINT("query",("%-.4096s",thd->query())); DBUG_PRINT("query",("%-.4096s",thd->query()));
#if defined(ENABLED_PROFILING) #if defined(ENABLED_PROFILING)
thd->profiling.start_new_query(); thd->profiling.start_new_query();
...@@ -495,7 +495,6 @@ static void handle_bootstrap_impl(THD *thd) ...@@ -495,7 +495,6 @@ static void handle_bootstrap_impl(THD *thd)
We don't need to obtain LOCK_thread_count here because in bootstrap We don't need to obtain LOCK_thread_count here because in bootstrap
mode we have only one thread. mode we have only one thread.
*/ */
thd->query_id=next_query_id();
thd->set_time(); thd->set_time();
mysql_parse(thd, thd->query(), length, & found_semicolon); mysql_parse(thd, thd->query(), length, & found_semicolon);
close_thread_tables(thd); // Free tables close_thread_tables(thd); // Free tables
...@@ -983,29 +982,29 @@ bool dispatch_command(enum enum_server_command command, THD *thd, ...@@ -983,29 +982,29 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
thd->enable_slow_log= TRUE; thd->enable_slow_log= TRUE;
thd->lex->sql_command= SQLCOM_END; /* to avoid confusing VIEW detectors */ thd->lex->sql_command= SQLCOM_END; /* to avoid confusing VIEW detectors */
thd->set_time(); thd->set_time();
pthread_mutex_lock(&LOCK_thread_count); {
thd->query_id= global_query_id; query_id_t query_id;
switch( command ) { switch( command ) {
/* Ignore these statements. */ /* Ignore these statements. */
case COM_STATISTICS: case COM_STATISTICS:
case COM_PING: case COM_PING:
query_id= get_query_id();
break; break;
/* Only increase id on these statements but don't count them. */ /* Only increase id on these statements but don't count them. */
case COM_STMT_PREPARE: case COM_STMT_PREPARE:
case COM_STMT_CLOSE: case COM_STMT_CLOSE:
case COM_STMT_RESET: case COM_STMT_RESET:
next_query_id(); query_id= next_query_id() - 1;
break; break;
/* Increase id and count all other statements. */ /* Increase id and count all other statements. */
default: default:
statistic_increment(thd->status_var.questions, &LOCK_status); statistic_increment(thd->status_var.questions, &LOCK_status);
next_query_id(); query_id= next_query_id() - 1;
} }
thd->set_query_id(query_id);
thread_running++; }
inc_thread_running();
/* TODO: set thd->lex->sql_command to SQLCOM_END here */ /* TODO: set thd->lex->sql_command to SQLCOM_END here */
pthread_mutex_unlock(&LOCK_thread_count);
/** /**
Clear the set of flags that are expected to be cleared at the Clear the set of flags that are expected to be cleared at the
...@@ -1267,16 +1266,13 @@ bool dispatch_command(enum enum_server_command command, THD *thd, ...@@ -1267,16 +1266,13 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
thd->security_ctx->priv_user, thd->security_ctx->priv_user,
(char *) thd->security_ctx->host_or_ip); (char *) thd->security_ctx->host_or_ip);
thd->set_query(beginning_of_next_stmt, length); thd->set_query_and_id(beginning_of_next_stmt, length, next_query_id());
pthread_mutex_lock(&LOCK_thread_count);
/* /*
Count each statement from the client. Count each statement from the client.
*/ */
statistic_increment(thd->status_var.questions, &LOCK_status); statistic_increment(thd->status_var.questions, &LOCK_status);
thd->query_id= next_query_id();
thd->set_time(); /* Reset the query start time. */ thd->set_time(); /* Reset the query start time. */
/* TODO: set thd->lex->sql_command to SQLCOM_END here */ /* TODO: set thd->lex->sql_command to SQLCOM_END here */
pthread_mutex_unlock(&LOCK_thread_count);
mysql_parse(thd, beginning_of_next_stmt, length, &end_of_stmt); mysql_parse(thd, beginning_of_next_stmt, length, &end_of_stmt);
} }
...@@ -1590,9 +1586,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd, ...@@ -1590,9 +1586,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
thd_proc_info(thd, "cleaning up"); thd_proc_info(thd, "cleaning up");
thd->set_query(NULL, 0); thd->set_query(NULL, 0);
thd->command=COM_SLEEP; thd->command=COM_SLEEP;
pthread_mutex_lock(&LOCK_thread_count); // For process list dec_thread_running();
thread_running--;
pthread_mutex_unlock(&LOCK_thread_count);
thd_proc_info(thd, 0); thd_proc_info(thd, 0);
thd->packet.shrink(thd->variables.net_buffer_length); // Reclaim some memory thd->packet.shrink(thd->variables.net_buffer_length); // Reclaim some memory
free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC)); free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC));
......
...@@ -48,6 +48,34 @@ pthread_handler_t test_atomic_add(void *arg) ...@@ -48,6 +48,34 @@ pthread_handler_t test_atomic_add(void *arg)
return 0; return 0;
} }
volatile int64 a64;
/* add and sub a random number in a loop. Must get 0 at the end */
pthread_handler_t test_atomic_add64(void *arg)
{
int m= (*(int *)arg)/2;
GCC_BUG_WORKAROUND int64 x;
for (x= ((int64)(intptr)(&m)); m ; m--)
{
x= (x*m+0xfdecba987654321LL) & INT_MAX64;
my_atomic_rwlock_wrlock(&rwl);
my_atomic_add64(&a64, x);
my_atomic_rwlock_wrunlock(&rwl);
my_atomic_rwlock_wrlock(&rwl);
my_atomic_add64(&a64, -x);
my_atomic_rwlock_wrunlock(&rwl);
}
pthread_mutex_lock(&mutex);
if (!--running_threads)
{
bad= (a64 != 0);
pthread_cond_signal(&cond);
}
pthread_mutex_unlock(&mutex);
return 0;
}
/* /*
1. generate thread number 0..N-1 from b32 1. generate thread number 0..N-1 from b32
2. add it to bad 2. add it to bad
...@@ -128,7 +156,7 @@ pthread_handler_t test_atomic_cas(void *arg) ...@@ -128,7 +156,7 @@ pthread_handler_t test_atomic_cas(void *arg)
void do_tests() void do_tests()
{ {
plan(4); plan(6);
bad= my_atomic_initialize(); bad= my_atomic_initialize();
ok(!bad, "my_atomic_initialize() returned %d", bad); ok(!bad, "my_atomic_initialize() returned %d", bad);
...@@ -142,5 +170,14 @@ void do_tests() ...@@ -142,5 +170,14 @@ void do_tests()
b32= c32= 0; b32= c32= 0;
test_concurrently("my_atomic_cas32", test_atomic_cas, THREADS, CYCLES); test_concurrently("my_atomic_cas32", test_atomic_cas, THREADS, CYCLES);
{
int64 b=0x1000200030004000LL;
a64=0;
my_atomic_add64(&a64, b);
ok(a64==b, "add64");
}
a64=0;
test_concurrently("my_atomic_add64", test_atomic_add64, THREADS, CYCLES);
my_atomic_rwlock_destroy(&rwl); my_atomic_rwlock_destroy(&rwl);
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment