From 6a494158bd3a509449efeacfa12721ad1be7688e Mon Sep 17 00:00:00 2001
From: unknown <vva@eagle.mysql.r18.ru>
Date: Mon, 30 Dec 2002 01:46:48 +0400
Subject: [PATCH] processing thread specific queries

BitKeeper/etc/logging_ok:
  Logging to logging@openlogging.org accepted
---
 BitKeeper/etc/logging_ok          |  1 +
 mysql-test/r/rpl_temporary.result | 74 +++++++++++++++++++++++++++++++
 mysql-test/t/rpl_temporary.test   | 72 ++++++++++++++++++++++++++++++
 sql/log_event.cc                  | 10 +++--
 sql/log_event.h                   |  6 ++-
 sql/mysqld.cc                     |  1 +
 sql/set_var.cc                    |  6 +++
 sql/sql_base.cc                   |  9 ++--
 sql/sql_class.cc                  |  2 +-
 sql/sql_class.h                   | 11 ++---
 sql/sql_lex.cc                    |  1 +
 sql/sql_lex.h                     |  1 +
 sql/sql_table.cc                  |  2 +
 13 files changed, 181 insertions(+), 15 deletions(-)
 create mode 100644 mysql-test/r/rpl_temporary.result
 create mode 100644 mysql-test/t/rpl_temporary.test

diff --git a/BitKeeper/etc/logging_ok b/BitKeeper/etc/logging_ok
index 89391563f72..15f884302a1 100644
--- a/BitKeeper/etc/logging_ok
+++ b/BitKeeper/etc/logging_ok
@@ -90,6 +90,7 @@ tonu@x153.internalnet
 tonu@x3.internalnet
 venu@myvenu.com
 venu@work.mysql.com
+vva@eagle.mysql.r18.ru
 vva@genie.(none)
 walrus@mysql.com
 wax@mysql.com
diff --git a/mysql-test/r/rpl_temporary.result b/mysql-test/r/rpl_temporary.result
new file mode 100644
index 00000000000..a8a9fd92a87
--- /dev/null
+++ b/mysql-test/r/rpl_temporary.result
@@ -0,0 +1,74 @@
+stop slave;
+drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
+reset master;
+reset slave;
+drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
+start slave;
+drop table if exists t1;
+create table t1(f int);
+drop table if exists t2;
+create table t2(f int);
+insert into t1 values (1),(2),(3),(4),(5),(6),(7),(8),(9),(10);
+create temporary table t3(f int);
+insert into t3 select * from t1 where f<6;
+create temporary table t3(f int);
+insert into t2 select count(*) from t3;
+insert into t3 select * from t1 where f>=4;
+drop temporary table t3;
+insert into t2 select count(*) from t3;
+drop temporary table t3;
+select * from t2;
+f
+5
+7
+show binlog events;
+Log_name	Pos	Event_type	Server_id	Orig_log_pos	Info
+master-bin.000001	4	Start	1	4	Server ver: 4.1.0-alpha-debug-log, Binlog ver: 3
+master-bin.000001	79	Query	1	79	use `test`; create table t1(f int)
+master-bin.000001	136	Query	1	136	use `test`; create table t2(f int)
+master-bin.000001	193	Query	1	193	use `test`; insert into t1 values (1),(2),(3),(4),(5),(6),(7),(8),(9),(10)
+master-bin.000001	290	Query	1	290	use `test`; create temporary table t3(f int)
+master-bin.000001	357	Query	1	357	use `test`; insert into t3 select * from t1 where f<6
+master-bin.000001	433	Query	1	433	use `test`; create temporary table t3(f int)
+master-bin.000001	500	Query	1	500	use `test`; insert into t2 select count(*) from t3
+master-bin.000001	573	Query	1	573	use `test`; drop temporary table t3
+master-bin.000001	631	Query	1	631	use `test`; insert into t3 select * from t1 where f>=4
+master-bin.000001	708	Query	1	708	use `test`; insert into t2 select count(*) from t3
+master-bin.000001	781	Query	1	781	use `test`; drop temporary table t3
+drop table if exists t1;
+drop table if exists t2;
+use test;
+SET TIMESTAMP=1040323920;
+create table t1(f int);
+SET TIMESTAMP=1040323931;
+create table t2(f int);
+SET TIMESTAMP=1040323938;
+insert into t1 values (1),(2),(3),(4),(5),(6),(7),(8),(9),(10);
+SET TIMESTAMP=1040323945;
+SET @@session.pseudo_thread_id=1;
+create temporary table t3(f int);
+SET TIMESTAMP=1040323952;
+SET @@session.pseudo_thread_id=1;
+insert into t3 select * from t1 where f<6;
+SET TIMESTAMP=1040324145;
+SET @@session.pseudo_thread_id=2;
+create temporary table t3(f int);
+SET TIMESTAMP=1040324186;
+SET @@session.pseudo_thread_id=1;
+insert into t2 select count(*) from t3;
+SET TIMESTAMP=1040324200;
+SET @@session.pseudo_thread_id=2;
+insert into t3 select * from t1 where f>=4;
+SET TIMESTAMP=1040324211;
+SET @@session.pseudo_thread_id=1;
+drop temporary table t3;
+SET TIMESTAMP=1040324219;
+SET @@session.pseudo_thread_id=2;
+insert into t2 select count(*) from t3;
+SET TIMESTAMP=1040324224;
+SET @@session.pseudo_thread_id=2;
+drop temporary table t3;
+select * from t2;
+f
+5
+7
diff --git a/mysql-test/t/rpl_temporary.test b/mysql-test/t/rpl_temporary.test
new file mode 100644
index 00000000000..a924e628fd4
--- /dev/null
+++ b/mysql-test/t/rpl_temporary.test
@@ -0,0 +1,72 @@
+source include/master-slave.inc;
+
+connect (con1,localhost,root,,);
+connect (con2,localhost,root,,);
+
+drop table if exists t1;
+create table t1(f int);
+drop table if exists t2;
+create table t2(f int);
+insert into t1 values (1),(2),(3),(4),(5),(6),(7),(8),(9),(10);
+
+connection con1;
+create temporary table t3(f int);
+insert into t3 select * from t1 where f<6;
+
+connection con2;
+create temporary table t3(f int);
+
+connection con1;
+insert into t2 select count(*) from t3;
+
+connection con2;
+insert into t3 select * from t1 where f>=4;
+
+connection con1;
+drop temporary table t3;
+
+connection con2;
+insert into t2 select count(*) from t3;
+drop temporary table t3;
+
+select * from t2;
+
+show binlog events;
+
+drop table if exists t1;
+drop table if exists t2;
+
+use test;
+SET TIMESTAMP=1040323920;
+create table t1(f int);
+SET TIMESTAMP=1040323931;
+create table t2(f int);
+SET TIMESTAMP=1040323938;
+insert into t1 values (1),(2),(3),(4),(5),(6),(7),(8),(9),(10);
+
+SET TIMESTAMP=1040323945;
+SET @@session.pseudo_thread_id=1;
+create temporary table t3(f int);
+SET TIMESTAMP=1040323952;
+SET @@session.pseudo_thread_id=1;
+insert into t3 select * from t1 where f<6;
+SET TIMESTAMP=1040324145;
+SET @@session.pseudo_thread_id=2;
+create temporary table t3(f int);
+SET TIMESTAMP=1040324186;
+SET @@session.pseudo_thread_id=1;
+insert into t2 select count(*) from t3;
+SET TIMESTAMP=1040324200;
+SET @@session.pseudo_thread_id=2;
+insert into t3 select * from t1 where f>=4;
+SET TIMESTAMP=1040324211;
+SET @@session.pseudo_thread_id=1;
+drop temporary table t3;
+SET TIMESTAMP=1040324219;
+SET @@session.pseudo_thread_id=2;
+insert into t2 select count(*) from t3;
+SET TIMESTAMP=1040324224;
+SET @@session.pseudo_thread_id=2;
+drop temporary table t3;
+
+select * from t2;
\ No newline at end of file
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 5050bba9965..9fae7639549 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -728,7 +728,9 @@ int Query_log_event::write_data(IO_CACHE* file)
 #ifndef MYSQL_CLIENT
 Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
 				 ulong query_length, bool using_trans)
-  :Log_event(thd_arg, 0, using_trans), data_buf(0), query(query_arg),
+  :Log_event(thd_arg, !thd_arg->lex.tmp_table_used ? 
+	     0 : LOG_EVENT_THREAD_SPECIFIC_F, using_trans), 
+   data_buf(0), query(query_arg),
    db(thd_arg->db), q_len((uint32) query_length),
   error_code(thd_arg->killed ? ER_SERVER_SHUTDOWN: thd_arg->net.last_errno),
   thread_id(thd_arg->thread_id)
@@ -810,6 +812,8 @@ void Query_log_event::print(FILE* file, bool short_form, char* last_db)
   *end++=';';
   *end++='\n';
   my_fwrite(file, (byte*) buff, (uint) (end-buff),MYF(MY_NABP | MY_WME));
+  if (flags & LOG_EVENT_THREAD_SPECIFIC_F)
+    fprintf(file,"SET @@session.pseudo_thread_id=%lu;\n",(ulong)thread_id);
   my_fwrite(file, (byte*) query, q_len, MYF(MY_NABP | MY_WME));
   fprintf(file, ";\n");
 }
@@ -845,7 +849,7 @@ int Query_log_event::exec_event(struct st_relay_log_info* rli)
     thd->query_error= 0;			// clear error
     thd->clear_error();
     
-    thd->slave_proxy_id = thread_id;		// for temp tables
+    thd->variables.pseudo_thread_id= thread_id;		// for temp tables
 	
     /*
       Sanity check to make sure the master did not get a really bad
@@ -1468,7 +1472,7 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli)
       ex.skip_lines = skip_lines;
       List<Item> field_list;
       set_fields(field_list);
-      thd->slave_proxy_id = thd->thread_id;
+      thd->variables.pseudo_thread_id= thd->thread_id;
       if (net)
       {
 	// mysql_load will use thd->net to read the file
diff --git a/sql/log_event.h b/sql/log_event.h
index 20a134ab3cc..4fa18c5cec6 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -212,8 +212,10 @@ struct sql_ex_info
 
 #define BINLOG_MAGIC        "\xfe\x62\x69\x6e"
 
-#define LOG_EVENT_TIME_F           0x1
-#define LOG_EVENT_FORCED_ROTATE_F  0x2
+#define LOG_EVENT_TIME_F            0x1
+#define LOG_EVENT_FORCED_ROTATE_F   0x2
+#define LOG_EVENT_THREAD_SPECIFIC_F 0x4 /* query depends on thread  
+                                           (for example: TEMPORARY TABLE) */
 
 enum Log_event_type
 {
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index 9de04ac3454..19636531208 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -1894,6 +1894,7 @@ int main(int argc, char **argv)
 
   set_options();
   get_options(argc,argv);
+  max_system_variables.pseudo_thread_id= (ulong)~0;
   if (opt_log || opt_update_log || opt_slow_log || opt_bin_log)
     strcat(server_version,"-log");
   DBUG_PRINT("info",("%s  Ver %s for %s on %s\n",my_progname,
diff --git a/sql/set_var.cc b/sql/set_var.cc
index 691add191b2..669a0911192 100644
--- a/sql/set_var.cc
+++ b/sql/set_var.cc
@@ -21,6 +21,8 @@
 
   - If the variable is thread specific, add it to 'system_variables' struct.
     If not, add it to mysqld.cc and an declaration in 'mysql_priv.h'
+  - Don't forget to initialize new fields in global_system_variables and
+     max_system_variables!
   - Use one of the 'sys_var... classes from set_var.h or write a specific
     one for the variable type.
   - Define it in the 'variable definition list' in this file.
@@ -154,6 +156,8 @@ sys_var_thd_ulong	sys_max_error_count("max_error_count",
 					    &SV::max_error_count);
 sys_var_thd_ulong	sys_max_heap_table_size("max_heap_table_size",
 						&SV::max_heap_table_size);
+sys_var_thd_ulong       sys_pseudo_thread_id("pseudo_thread_id",
+					     &SV::pseudo_thread_id);
 sys_var_thd_ulonglong	sys_max_join_size("max_join_size",
 					  &SV::max_join_size,
 					  fix_max_join_size);
@@ -364,6 +368,7 @@ sys_var *sys_variables[]=
   &sys_net_retry_count,
   &sys_net_wait_timeout,
   &sys_net_write_timeout,
+  &sys_pseudo_thread_id,
   &sys_query_cache_size,
 #ifdef HAVE_QUERY_CACHE
   &sys_query_cache_limit,
@@ -512,6 +517,7 @@ struct show_var_st init_vars[]= {
   {"pid_file",                (char*) pidfile_name,                 SHOW_CHAR},
   {"port",                    (char*) &mysql_port,                  SHOW_INT},
   {"protocol_version",        (char*) &protocol_version,            SHOW_INT},
+  {sys_pseudo_thread_id.name, (char*) &sys_pseudo_thread_id,        SHOW_SYS},
   {sys_read_buff_size.name,   (char*) &sys_read_buff_size,	    SHOW_SYS},
   {sys_read_rnd_buff_size.name,(char*) &sys_read_rnd_buff_size,	    SHOW_SYS},
   {sys_rpl_recovery_rank.name,(char*) &sys_rpl_recovery_rank,       SHOW_SYS},
diff --git a/sql/sql_base.cc b/sql/sql_base.cc
index e2b36106fb0..7d0864d7adb 100644
--- a/sql/sql_base.cc
+++ b/sql/sql_base.cc
@@ -798,7 +798,7 @@ TABLE **find_temporary_table(THD *thd, const char *db, const char *table_name)
   uint	key_length= (uint) (strmov(strmov(key,db)+1,table_name)-key)+1;
   TABLE *table,**prev;
 
-  int4store(key+key_length,thd->slave_proxy_id);
+  int4store(key+key_length,thd->variables.pseudo_thread_id);
   key_length += 4;
 
   prev= &thd->temporary_tables;
@@ -838,7 +838,7 @@ bool rename_temporary_table(THD* thd, TABLE *table, const char *db,
     (strmov((table->real_name=strmov(table->table_cache_key=key,
 				     db)+1),
 	    table_name) - table->table_cache_key)+1;
-  int4store(key+table->key_length,thd->slave_proxy_id);
+  int4store(key+table->key_length,thd->variables.pseudo_thread_id);
   table->key_length += 4;
   return 0;
 }
@@ -992,7 +992,7 @@ TABLE *open_table(THD *thd,const char *db,const char *table_name,
   if (thd->killed)
     DBUG_RETURN(0);
   key_length= (uint) (strmov(strmov(key,db)+1,table_name)-key)+1;
-  int4store(key + key_length, thd->slave_proxy_id);
+  int4store(key + key_length, thd->variables.pseudo_thread_id);
 
   for (table=thd->temporary_tables; table ; table=table->next)
   {
@@ -1006,6 +1006,7 @@ TABLE *open_table(THD *thd,const char *db,const char *table_name,
 	DBUG_RETURN(0);
       }
       table->query_id=thd->query_id;
+      thd->lex.tmp_table_used= 1;
       goto reset;
     }
   }
@@ -1806,7 +1807,7 @@ TABLE *open_temporary_table(THD *thd, const char *path, const char *db,
 					 +1), table_name)
 				 - tmp_table->table_cache_key)+1;
   int4store(tmp_table->table_cache_key + tmp_table->key_length,
-	    thd->slave_proxy_id);
+	    thd->variables.pseudo_thread_id);
   tmp_table->key_length += 4;
 
   if (link_in_list)
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index ba6369b0022..12ea06fa220 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -100,7 +100,7 @@ THD::THD():user_time(0), fatal_error(0),
   start_time=(time_t) 0;
   current_linfo =  0;
   slave_thread = 0;
-  slave_proxy_id = 0;
+  variables.pseudo_thread_id= 0;
   file_id = 0;
   cond_count=0;
   warn_id= 0;
diff --git a/sql/sql_class.h b/sql/sql_class.h
index ca56d2dcdf5..084ae412b3a 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -370,6 +370,12 @@ struct system_variables
   ulong tmp_table_size;
   ulong tx_isolation;
 
+  /*
+    In slave thread we need to know in behalf of which
+    thread the query is being run to replicate temp tables properly
+  */
+  ulong pseudo_thread_id;
+
   my_bool log_warnings;
   my_bool low_priority_updates; 
 
@@ -522,11 +528,6 @@ class THD :public ilink {
     each thread that is using LOG_INFO needs to adjust the pointer to it
   */
   LOG_INFO*  current_linfo;
-  /*
-    In slave thread we need to know in behalf of which
-    thread the query is being run to replicate temp tables properly
-  */
-  ulong	     slave_proxy_id;
   NET*       slave_net;			// network connection from slave -> m.
   my_off_t   log_pos;
   /* Used by the sys_var class to store temporary values */
diff --git a/sql/sql_lex.cc b/sql/sql_lex.cc
index 833f36dbe9f..c1f44a85609 100644
--- a/sql/sql_lex.cc
+++ b/sql/sql_lex.cc
@@ -170,6 +170,7 @@ LEX *lex_start(THD *thd, uchar *buf,uint length)
   lex->slave_thd_opt=0;
   lex->sql_command=SQLCOM_END;
   lex->safe_to_cache_query= 1;
+  lex->tmp_table_used= 0;
   bzero(&lex->mi,sizeof(lex->mi));
   return lex;
 }
diff --git a/sql/sql_lex.h b/sql/sql_lex.h
index 0c761baffa3..385defbdf8e 100644
--- a/sql/sql_lex.h
+++ b/sql/sql_lex.h
@@ -459,6 +459,7 @@ typedef struct st_lex
   uint slave_thd_opt;
   CHARSET_INFO *charset;
   char *help_arg;
+  bool tmp_table_used;
 } LEX;
 
 
diff --git a/sql/sql_table.cc b/sql/sql_table.cc
index d343ccd39f5..7ce50ba0f96 100644
--- a/sql/sql_table.cc
+++ b/sql/sql_table.cc
@@ -190,6 +190,7 @@ int mysql_rm_table_part2(THD *thd, TABLE_LIST *tables, bool if_exists,
       wrong_tables.append(String(table->real_name,default_charset_info));
     }
   }
+  thd->lex.tmp_table_used= tmp_table_deleted;
   if (some_tables_deleted || tmp_table_deleted)
   {
     query_cache_invalidate3(thd, tables, 0);
@@ -814,6 +815,7 @@ int mysql_create_table(THD *thd,const char *db, const char *table_name,
       (void) rm_temporary_table(create_info->db_type, path);
       goto end;
     }
+    thd->lex.tmp_table_used= 1;
   }
   if (!tmp_table && !no_log)
   {
-- 
2.30.9