Commit e0100606 authored by sasha@mysql.sashanet.com's avatar sasha@mysql.sashanet.com

Merge work.mysql.com:/home/bk/mysql

into mysql.sashanet.com:/home/sasha/src/bk/mysql
parents 4dfa7cad de445fe9
...@@ -371,3 +371,5 @@ libmysql_r/thr_mutex.c ...@@ -371,3 +371,5 @@ libmysql_r/thr_mutex.c
libmysql_r/typelib.c libmysql_r/typelib.c
libmysql_r/violite.c libmysql_r/violite.c
linked_libmysql_r_sources linked_libmysql_r_sources
client/thimble
support-files/mysql-3.23.29-gamma.spec
...@@ -46,6 +46,9 @@ ...@@ -46,6 +46,9 @@
#define MAX_CONS 1024 #define MAX_CONS 1024
#define MAX_INCLUDE_DEPTH 16 #define MAX_INCLUDE_DEPTH 16
#define LAZY_GUESS_BUF_SIZE 8192 #define LAZY_GUESS_BUF_SIZE 8192
#define INIT_Q_LINES 1024
#define MIN_VAR_ALLOC 32
#define BLOCK_STACK_DEPTH 32
int record = 0, verbose = 0, silent = 0; int record = 0, verbose = 0, silent = 0;
const char* record_mode = "r"; const char* record_mode = "r";
...@@ -58,15 +61,44 @@ FILE* file_stack[MAX_INCLUDE_DEPTH]; ...@@ -58,15 +61,44 @@ FILE* file_stack[MAX_INCLUDE_DEPTH];
FILE** cur_file; FILE** cur_file;
FILE** file_stack_end; FILE** file_stack_end;
int block_stack[BLOCK_STACK_DEPTH];
int *cur_block, *block_stack_end;
DYNAMIC_ARRAY q_lines;
struct connection struct connection
{ {
MYSQL mysql; MYSQL mysql;
char *name; char *name;
}; };
typedef
struct
{
int read_lines,current_line;
} PARSER;
PARSER parser;
int block_ok = 1; /* set to 0 if the current block should not be executed */
int false_block_depth = 0;
typedef struct
{
char* name;
char* str_val;
int str_val_len;
int int_val;
int alloced_len;
int int_dirty; /* do not update string if int is updated until first read */
} VAR;
VAR var_reg[10];
/*Perl/shell-like variable registers */
struct connection cons[MAX_CONS]; struct connection cons[MAX_CONS];
struct connection* cur_con, *next_con, *cons_end; struct connection* cur_con, *next_con, *cons_end;
/* this should really be called command*/
struct query struct query
{ {
char q[MAX_QUERY]; char q[MAX_QUERY];
...@@ -75,11 +107,21 @@ struct query ...@@ -75,11 +107,21 @@ struct query
int abort_on_error; int abort_on_error;
uint expected_errno; uint expected_errno;
char record_file[MAX_RECORD_FILE]; char record_file[MAX_RECORD_FILE];
enum {Q_CONNECTION, Q_QUERY, Q_CONNECT,
Q_SLEEP, Q_INC, Q_DEC,Q_SOURCE,
Q_DISCONNECT,Q_LET, Q_ECHO, Q_WHILE, Q_END_BLOCK,
Q_UNKNOWN} type;
}; };
static void die(const char* fmt, ...); static void die(const char* fmt, ...);
int close_connection(struct query* q); int close_connection(struct query* q);
VAR* var_get(char* var_name, char* var_name_end, int raw);
void init_parser()
{
parser.current_line = parser.read_lines = 0;
memset(&var_reg,0, sizeof(var_reg));
}
int hex_val(int c) int hex_val(int c)
{ {
...@@ -91,6 +133,69 @@ int hex_val(int c) ...@@ -91,6 +133,69 @@ int hex_val(int c)
return -1; return -1;
} }
VAR* var_get(char* var_name, char* var_name_end, int raw)
{
int digit;
VAR* v;
if(*var_name++ != '$')
{
--var_name;
goto err;
}
digit = *var_name - '0';
if(!(digit < 10 && digit >= 0))
{
--var_name;
goto err;
}
v = var_reg + digit;
if(!raw && v->int_dirty)
{
sprintf(v->str_val, "%d", v->int_val);
v->int_dirty = 0;
}
return v;
err:
if(var_name_end)
*var_name_end = 0;
die("Unsupported variable name: %s", var_name);
return 0;
}
int var_set(char* var_name, char* var_name_end, char* var_val,
char* var_val_end)
{
int digit;
int val_len;
VAR* v;
if(*var_name++ != '$')
{
--var_name;
*var_name_end = 0;
die("Variable name in %s does not start with '$'", var_name);
}
digit = *var_name - '0';
if(!(digit < 10 && digit >= 0))
{
*var_name_end = 0;
die("Unsupported variable name: %s", var_name);
}
v = var_reg + digit;
if(v->alloced_len < (val_len = (int)(var_val_end - var_val)+1))
{
v->alloced_len = (val_len < MIN_VAR_ALLOC) ? MIN_VAR_ALLOC : val_len;
if(!(v->str_val =
v->str_val ? my_realloc(v->str_val, v->alloced_len, MYF(MY_WME)) :
my_malloc(v->alloced_len, MYF(MY_WME))))
die("Out of memory");
}
memcpy(v->str_val, var_val, val_len-1);
v->str_val_len = val_len;
v->str_val[val_len] = 0;
v->int_val = atoi(v->str_val);
return 0;
}
int open_file(const char* name) int open_file(const char* name)
{ {
if(*cur_file && ++cur_file == file_stack_end) if(*cur_file && ++cur_file == file_stack_end)
...@@ -116,19 +221,118 @@ int do_source(struct query* q) ...@@ -116,19 +221,118 @@ int do_source(struct query* q)
return open_file(name); return open_file(name);
} }
int eval_expr(VAR* v, char* p, char* p_end)
{
VAR* vp;
if((vp = var_get(p,p_end,0)))
{
memcpy(v, vp, sizeof(VAR));
return 0;
}
if(p_end)
*p_end = 0;
die("Invalid expr: %s", p);
return 1;
}
int do_inc(struct query* q)
{
char* p;
VAR* v;
p = (char*)q->q + q->first_word_len;
while(*p && isspace(*p)) p++;
v = var_get(p, 0, 1);
v->int_val++;
v->int_dirty = 1;
return 0;
}
int do_dec(struct query* q)
{
char* p;
VAR* v;
p = (char*)q->q + q->first_word_len;
while(*p && isspace(*p)) p++;
v = var_get(p, 0, 1);
v->int_val--;
v->int_dirty = 1;
return 0;
}
int do_echo(struct query* q)
{
char* p;
VAR v;
p = (char*)q->q + q->first_word_len;
while(*p && isspace(*p)) p++;
eval_expr(&v, p, 0); /* NULL terminated */
if(v.str_val_len > 1)
{
fflush(stdout);
write(1, v.str_val, v.str_val_len - 1);
}
write(1, "\n", 1);
return 0;
}
int do_let(struct query* q)
{
char* p, *var_name, *var_name_end, *var_val_start;
p = (char*)q->q + q->first_word_len;
while(*p && isspace(*p)) p++;
if(!*p)
die("Missing variable name in let\n");
var_name = p;
while(*p && (*p != '=' || isspace(*p)))
p++;
var_name_end = p;
if(*p == '=') p++;
while(*p && isspace(*p))
p++;
var_val_start = p;
while(*p && !isspace(*p))
p++;
return var_set(var_name, var_name_end, var_val_start, p);
}
int do_sleep(struct query* q) int do_sleep(struct query* q)
{ {
char* p, *arg; char* p, *arg;
struct timeval t;
int dec_mul = 1000000;
p = (char*)q->q + q->first_word_len; p = (char*)q->q + q->first_word_len;
while(*p && isspace(*p)) p++; while(*p && isspace(*p)) p++;
if(!*p) if(!*p)
die("Missing agument in sleep\n"); die("Missing agument in sleep\n");
arg = p; arg = p;
while(*p && !isspace(*p)) t.tv_sec = atoi(arg);
t.tv_usec = 0;
while(*p && *p != '.' && !isspace(*p))
p++; p++;
if(*p == '.')
{
char c;
char *p_end;
p++;
p_end = p + 6;
for(;p <= p_end; ++p)
{
c = *p - '0';
if(c < 10 && c >= 0)
{
t.tv_usec = t.tv_usec * 10 + c;
dec_mul /= 10;
}
else
break;
}
}
*p = 0; *p = 0;
t.tv_usec *= dec_mul;
return sleep(atoi(arg)); return select(0,0,0,0, &t);
} }
...@@ -238,6 +442,50 @@ int do_connect(struct query* q) ...@@ -238,6 +442,50 @@ int do_connect(struct query* q)
return 0; return 0;
} }
int do_done(struct query* q)
{
q->type = Q_END_BLOCK;
if(cur_block == block_stack)
die("Stray '}' - end of block before beginning");
if(block_ok)
parser.current_line = *--cur_block;
else
{
if(!--false_block_depth)
block_ok = 1;
++parser.current_line;
}
return 0;
}
int do_while(struct query* q)
{
char *p = q->q + q->first_word_len;
char* expr_start, *expr_end;
VAR v;
if(cur_block == block_stack_end)
die("Nesting too deep");
if(!block_ok)
{
++false_block_depth;
return 0;
}
expr_start = strchr(p, '(');
if(!expr_start)
die("missing '(' in while");
expr_end = strrchr(expr_start, ')');
if(!expr_end)
die("missing ')' in while");
eval_expr(&v, ++expr_start, --expr_end);
*cur_block++ = parser.current_line++;
if(!v.int_val)
{
block_ok = 0;
false_block_depth = 1;
}
return 0;
}
void close_cons() void close_cons()
{ {
for(--next_con; next_con >= cons; --next_con) for(--next_con; next_con >= cons; --next_con)
...@@ -328,7 +576,9 @@ int read_line(char* buf, int size) ...@@ -328,7 +576,9 @@ int read_line(char* buf, int size)
switch(state) switch(state)
{ {
case R_NORMAL: case R_NORMAL:
if(c == ';') if(c == ';' || c == '{') /* '{' allows some interesting syntax
* but we don't care, as long as the
* correct sytnax gets parsed right */
{ {
*p = 0; *p = 0;
return 0; return 0;
...@@ -355,6 +605,17 @@ int read_line(char* buf, int size) ...@@ -355,6 +605,17 @@ int read_line(char* buf, int size)
} }
else if(isspace(c)) else if(isspace(c))
no_save = 1; no_save = 1;
else if(c == '}')
{
*buf++ = '}';
*buf = 0;
return 0;
}
else if(c == ';' || c == '{')
{
*p = 0;
return 0;
}
else else
state = R_NORMAL; state = R_NORMAL;
break; break;
...@@ -404,18 +665,28 @@ int read_line(char* buf, int size) ...@@ -404,18 +665,28 @@ int read_line(char* buf, int size)
return feof(*cur_file); return feof(*cur_file);
} }
int read_query(struct query* q) int read_query(struct query** q_ptr)
{ {
char buf[MAX_QUERY]; char buf[MAX_QUERY];
char* p = buf,* p1 ; char* p = buf,* p1 ;
int c, expected_errno; int c, expected_errno;
struct query* q;
if(parser.current_line < parser.read_lines)
{
get_dynamic(&q_lines, (gptr)q_ptr, parser.current_line) ;
return 0;
}
if(!(*q_ptr=q=(struct query*)my_malloc(sizeof(*q), MYF(MY_WME)))
|| insert_dynamic(&q_lines, (gptr)&q)
)
die("Out of memory");
q->record_file[0] = 0; q->record_file[0] = 0;
q->abort_on_error = 1; q->abort_on_error = 1;
q->has_result_set = 0; q->has_result_set = 0;
q->first_word_len = 0; q->first_word_len = 0;
q->expected_errno = 0; q->expected_errno = 0;
q->type = Q_UNKNOWN;
if(read_line(buf, sizeof(buf))) if(read_line(buf, sizeof(buf)))
return 1; return 1;
if(*p == '!') if(*p == '!')
...@@ -452,7 +723,7 @@ int read_query(struct query* q) ...@@ -452,7 +723,7 @@ int read_query(struct query* q)
q->first_word_len = p1 - q->q; q->first_word_len = p1 - q->q;
strcpy(p1, p); strcpy(p1, p);
parser.read_lines++;
return 0; return 0;
} }
...@@ -834,12 +1105,56 @@ int check_first_word(struct query* q, const char* word, int len) ...@@ -834,12 +1105,56 @@ int check_first_word(struct query* q, const char* word, int len)
return 1; return 1;
} }
void get_query_type(struct query* q)
{
if(*q->q == '}')
{
q->type = Q_END_BLOCK;
return;
}
q->type = Q_QUERY;
switch(q->first_word_len)
{
case 3:
if(check_first_word(q, "inc", 3))
q->type = Q_INC;
else if(check_first_word(q, "dec", 3))
q->type = Q_DEC;
else if(check_first_word(q, "let", 3))
q->type = Q_LET;
break;
case 4:
if(check_first_word(q, "echo", 4))
q->type = Q_ECHO;
break;
case 5:
if(check_first_word(q, "sleep", 5))
q->type = Q_SLEEP;
else if(check_first_word(q, "while", 5))
q->type = Q_WHILE;
break;
case 6:
if(check_first_word(q, "source", 6))
q->type = Q_SOURCE;
break;
case 7:
if(check_first_word(q, "connect", 7))
q->type = Q_CONNECT;
break;
case 10:
if(check_first_word(q, "connection", 10))
q->type = Q_CONNECTION;
else if(check_first_word(q, "disconnect", 10))
q->type = Q_DISCONNECT;
break;
}
}
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
int error = 0; int error = 0;
struct query q; struct query* q;
MY_INIT(argv[0]); MY_INIT(argv[0]);
memset(cons, 0, sizeof(cons)); memset(cons, 0, sizeof(cons));
...@@ -850,6 +1165,11 @@ int main(int argc, char** argv) ...@@ -850,6 +1165,11 @@ int main(int argc, char** argv)
memset(file_stack, 0, sizeof(file_stack)); memset(file_stack, 0, sizeof(file_stack));
file_stack_end = file_stack + MAX_INCLUDE_DEPTH; file_stack_end = file_stack + MAX_INCLUDE_DEPTH;
cur_file = file_stack; cur_file = file_stack;
init_dynamic_array(&q_lines, sizeof(struct query*), INIT_Q_LINES,
INIT_Q_LINES);
memset(block_stack, 0, sizeof(block_stack));
block_stack_end = block_stack + BLOCK_STACK_DEPTH;
cur_block = block_stack;
parse_args(argc, argv); parse_args(argc, argv);
if(!*cur_file) if(!*cur_file)
...@@ -870,20 +1190,42 @@ int main(int argc, char** argv) ...@@ -870,20 +1190,42 @@ int main(int argc, char** argv)
if(!cur_con->name) if(!cur_con->name)
die("Out of memory"); die("Out of memory");
while(!read_query(&q)) for(;!read_query(&q);)
{ {
if(check_first_word(&q, "connect", 7)) int current_line_inc = 1, processed = 0;
do_connect(&q); if(q->type == Q_UNKNOWN)
else if(check_first_word(&q, "connection", 10)) get_query_type(q);
select_connection(&q); if(block_ok)
else if(check_first_word(&q, "disconnect", 10)) {
close_connection(&q); processed = 1;
else if(check_first_word(&q, "source", 6)) switch(q->type)
do_source(&q); {
else if(check_first_word(&q, "sleep", 5)) case Q_CONNECT: do_connect(q); break;
do_sleep(&q); case Q_CONNECTION: select_connection(q); break;
else case Q_DISCONNECT: close_connection(q); break;
error |= run_query(&cur_con->mysql, &q); case Q_SOURCE: do_source(q); break;
case Q_SLEEP: do_sleep(q); break;
case Q_INC: do_inc(q); break;
case Q_DEC: do_dec(q); break;
case Q_ECHO: do_echo(q); break;
case Q_LET: do_let(q); break;
case Q_QUERY: error |= run_query(&cur_con->mysql, q); break;
default: processed = 0; break;
}
}
if(!processed)
{
current_line_inc = 0;
switch(q->type)
{
case Q_WHILE: do_while(q); break;
case Q_END_BLOCK: do_done(q); break;
default: current_line_inc = 1; break;
}
}
parser.current_line += current_line_inc;
} }
close_cons(); close_cons();
......
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <unistd.h> #include <unistd.h>
#include "my_global.h"
static void spawn_stern_thread(pthread_t *t); static void spawn_stern_thread(pthread_t *t);
static int act_goofy(void); static int act_goofy(void);
...@@ -13,6 +14,7 @@ static struct { ...@@ -13,6 +14,7 @@ static struct {
int msg; int msg;
} comm = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, 0 }; } comm = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, 0 };
int int
main(void) main(void)
{ {
...@@ -65,7 +67,7 @@ static int act_goofy(void) ...@@ -65,7 +67,7 @@ static int act_goofy(void)
return ret; return ret;
} }
static void *be_stern(void *v __attribute((unused))) static void *be_stern(void *v __attribute__((unused)))
{ {
int msg; int msg;
for (;;) { for (;;) {
...@@ -87,3 +89,5 @@ static void *be_stern(void *v __attribute((unused))) ...@@ -87,3 +89,5 @@ static void *be_stern(void *v __attribute((unused)))
fputs("You are NOTHING!\n", stderr); fputs("You are NOTHING!\n", stderr);
return NULL; return NULL;
} }
Variable_name Value
Slave_open_temp_tables 0
...@@ -8,7 +8,7 @@ drop table if exists foo; ...@@ -8,7 +8,7 @@ drop table if exists foo;
create table foo(n int); create table foo(n int);
insert into foo values(1),(2); insert into foo values(1),(2);
connection slave; connection slave;
sleep 3; sleep 2;
use test; use test;
@r/3.23/rpl000001.a.result select * from foo; @r/3.23/rpl000001.a.result select * from foo;
@r/3.23/rpl000001.b.result select sum(length(word)) from words; @r/3.23/rpl000001.b.result select sum(length(word)) from words;
......
...@@ -7,5 +7,5 @@ set insert_id = 2000; ...@@ -7,5 +7,5 @@ set insert_id = 2000;
insert into x values (NULL),(NULL),(NULL); insert into x values (NULL),(NULL),(NULL);
connection slave; connection slave;
use test; use test;
sleep 1; sleep 0.5;
@r/3.23/rpl000002.result select * from x; @r/3.23/rpl000002.result select * from x;
...@@ -5,5 +5,5 @@ create table x(n int primary key); ...@@ -5,5 +5,5 @@ create table x(n int primary key);
!insert into x values (1),(2),(2); !insert into x values (1),(2),(2);
insert into x values (3); insert into x values (3);
connection slave; connection slave;
sleep 1; sleep 0.5;
@r/3.23/rpl000003.result select * from x; @r/3.23/rpl000003.result select * from x;
...@@ -8,5 +8,5 @@ INSERT into test SET name='Caleb', age=1; ...@@ -8,5 +8,5 @@ INSERT into test SET name='Caleb', age=1;
ALTER TABLE test ADD id int(8) ZEROFILL AUTO_INCREMENT PRIMARY KEY; ALTER TABLE test ADD id int(8) ZEROFILL AUTO_INCREMENT PRIMARY KEY;
@r/3.23/rpl000005.result select * from test; @r/3.23/rpl000005.result select * from test;
connection slave; connection slave;
sleep 1; sleep 0.5;
@r/3.23/rpl000005.result select * from test; @r/3.23/rpl000005.result select * from test;
--abort-slave-event-count=1
source t/include/master-slave.inc;
connection master;
drop table if exists x;
create table x(n int);
create temporary table t(n int);
insert into t values(1),(2),(3);
insert into x select * from t;
connection master1;
create temporary table t (n int);
insert into t values (4),(5);
insert into x select * from t;
disconnect master;
connection master1;
insert into x values(6);
disconnect master1;
connection slave;
let $1=12;
while ($1)
{
!slave start;
sleep 0.2;
dec $1;
}
@r/3.23/rpl000013.result select * from x;
@r/3.23/rpl000013.status.result show status like 'Slave_open_temp_tables';
...@@ -2217,7 +2217,7 @@ enum options { ...@@ -2217,7 +2217,7 @@ enum options {
OPT_SKIP_INNOBASE,OPT_SAFEMALLOC_MEM_LIMIT, OPT_SKIP_INNOBASE,OPT_SAFEMALLOC_MEM_LIMIT,
OPT_REPLICATE_DO_TABLE, OPT_REPLICATE_IGNORE_TABLE, OPT_REPLICATE_DO_TABLE, OPT_REPLICATE_IGNORE_TABLE,
OPT_REPLICATE_WILD_DO_TABLE, OPT_REPLICATE_WILD_IGNORE_TABLE, OPT_REPLICATE_WILD_DO_TABLE, OPT_REPLICATE_WILD_IGNORE_TABLE,
OPT_DISCONNECT_SLAVE_EVENT_COUNT OPT_DISCONNECT_SLAVE_EVENT_COUNT, OPT_ABORT_SLAVE_EVENT_COUNT
}; };
static struct option long_options[] = { static struct option long_options[] = {
...@@ -2277,6 +2277,8 @@ static struct option long_options[] = { ...@@ -2277,6 +2277,8 @@ static struct option long_options[] = {
// is a no-op // is a no-op
{"disconnect-slave-event-count", required_argument, 0, {"disconnect-slave-event-count", required_argument, 0,
(int) OPT_DISCONNECT_SLAVE_EVENT_COUNT}, (int) OPT_DISCONNECT_SLAVE_EVENT_COUNT},
{"abort-slave-event-count", required_argument, 0,
(int) OPT_ABORT_SLAVE_EVENT_COUNT},
#if !defined(DBUG_OFF) && defined(SAFEMALLOC) #if !defined(DBUG_OFF) && defined(SAFEMALLOC)
{"safemalloc-mem-limit", required_argument, 0, (int) {"safemalloc-mem-limit", required_argument, 0, (int)
OPT_SAFEMALLOC_MEM_LIMIT}, OPT_SAFEMALLOC_MEM_LIMIT},
...@@ -2883,6 +2885,11 @@ static void get_options(int argc,char **argv) ...@@ -2883,6 +2885,11 @@ static void get_options(int argc,char **argv)
case (int)OPT_DISCONNECT_SLAVE_EVENT_COUNT: case (int)OPT_DISCONNECT_SLAVE_EVENT_COUNT:
#ifndef DBUG_OFF #ifndef DBUG_OFF
disconnect_slave_event_count = atoi(optarg); disconnect_slave_event_count = atoi(optarg);
#endif
break;
case (int)OPT_ABORT_SLAVE_EVENT_COUNT:
#ifndef DBUG_OFF
abort_slave_event_count = atoi(optarg);
#endif #endif
break; break;
case (int) OPT_LOG_SLAVE_UPDATES: case (int) OPT_LOG_SLAVE_UPDATES:
......
...@@ -33,9 +33,13 @@ DYNAMIC_ARRAY replicate_wild_do_table, replicate_wild_ignore_table; ...@@ -33,9 +33,13 @@ DYNAMIC_ARRAY replicate_wild_do_table, replicate_wild_ignore_table;
bool do_table_inited = 0, ignore_table_inited = 0; bool do_table_inited = 0, ignore_table_inited = 0;
bool wild_do_table_inited = 0, wild_ignore_table_inited = 0; bool wild_do_table_inited = 0, wild_ignore_table_inited = 0;
bool table_rules_on = 0; bool table_rules_on = 0;
// when slave thread exits, we need to remember the temporary tables so we
// can re-use them on slave start
static TABLE* save_temporary_tables = 0;
#ifndef DBUG_OFF #ifndef DBUG_OFF
int disconnect_slave_event_count = 0; int disconnect_slave_event_count = 0, abort_slave_event_count = 0;
static int events_till_disconnect = -1; static int events_till_disconnect = -1, events_till_abort = -1;
static int stuck_count = 0; static int stuck_count = 0;
#endif #endif
...@@ -43,8 +47,8 @@ static int stuck_count = 0; ...@@ -43,8 +47,8 @@ static int stuck_count = 0;
static inline void skip_load_data_infile(NET* net); static inline void skip_load_data_infile(NET* net);
static inline bool slave_killed(THD* thd); static inline bool slave_killed(THD* thd);
static int init_slave_thread(THD* thd); static int init_slave_thread(THD* thd);
static void safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi); static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi);
static void safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi); static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi);
static int safe_sleep(THD* thd, int sec); static int safe_sleep(THD* thd, int sec);
static int request_table_dump(MYSQL* mysql, char* db, char* table); static int request_table_dump(MYSQL* mysql, char* db, char* table);
static int create_table_from_dump(THD* thd, NET* net, const char* db, static int create_table_from_dump(THD* thd, NET* net, const char* db,
...@@ -131,7 +135,7 @@ int tables_ok(THD* thd, TABLE_LIST* tables) ...@@ -131,7 +135,7 @@ int tables_ok(THD* thd, TABLE_LIST* tables)
int add_table_rule(HASH* h, const char* table_spec) int add_table_rule(HASH* h, const char* table_spec)
{ {
char* dot = strchr(table_spec, '.'); const char* dot = strchr(table_spec, '.');
if(!dot) return 1; if(!dot) return 1;
uint len = (uint)strlen(table_spec); uint len = (uint)strlen(table_spec);
if(!len) return 1; if(!len) return 1;
...@@ -148,7 +152,7 @@ int add_table_rule(HASH* h, const char* table_spec) ...@@ -148,7 +152,7 @@ int add_table_rule(HASH* h, const char* table_spec)
int add_wild_table_rule(DYNAMIC_ARRAY* a, const char* table_spec) int add_wild_table_rule(DYNAMIC_ARRAY* a, const char* table_spec)
{ {
char* dot = strchr(table_spec, '.'); const char* dot = strchr(table_spec, '.');
if(!dot) return 1; if(!dot) return 1;
uint len = (uint)strlen(table_spec); uint len = (uint)strlen(table_spec);
if(!len) return 1; if(!len) return 1;
...@@ -968,7 +972,7 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len) ...@@ -968,7 +972,7 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len)
pthread_handler_decl(handle_slave,arg __attribute__((unused))) pthread_handler_decl(handle_slave,arg __attribute__((unused)))
{ {
THD *thd;; // needs to be first for thread_stack THD *thd; // needs to be first for thread_stack
MYSQL *mysql = NULL ; MYSQL *mysql = NULL ;
if(!server_id) if(!server_id)
...@@ -985,6 +989,9 @@ pthread_handler_decl(handle_slave,arg __attribute__((unused))) ...@@ -985,6 +989,9 @@ pthread_handler_decl(handle_slave,arg __attribute__((unused)))
} }
slave_running = 1; slave_running = 1;
abort_slave = 0; abort_slave = 0;
#ifndef DBUG_OFF
events_till_abort = abort_slave_event_count;
#endif
pthread_mutex_unlock(&LOCK_slave); pthread_mutex_unlock(&LOCK_slave);
int error = 1; int error = 1;
...@@ -1001,7 +1008,7 @@ pthread_handler_decl(handle_slave,arg __attribute__((unused))) ...@@ -1001,7 +1008,7 @@ pthread_handler_decl(handle_slave,arg __attribute__((unused)))
if(init_slave_thread(thd) || init_master_info(&glob_mi)) if(init_slave_thread(thd) || init_master_info(&glob_mi))
goto err; goto err;
thd->thread_stack = (char*)&thd; // remember where our stack is thd->thread_stack = (char*)&thd; // remember where our stack is
thd->temporary_tables = save_temporary_tables; // restore temp tables
threads.append(thd); threads.append(thd);
DBUG_PRINT("info",("master info: log_file_name=%s, position=%d", DBUG_PRINT("info",("master info: log_file_name=%s, position=%d",
...@@ -1017,14 +1024,16 @@ pthread_handler_decl(handle_slave,arg __attribute__((unused))) ...@@ -1017,14 +1024,16 @@ pthread_handler_decl(handle_slave,arg __attribute__((unused)))
thd->proc_info = "connecting to master"; thd->proc_info = "connecting to master";
#ifndef DBUG_OFF #ifndef DBUG_OFF
sql_print_error("Slave thread initialized"); sql_print_error("Slave thread initialized");
#endif #endif
safe_connect(thd, mysql, &glob_mi); // we can get killed during safe_connect
// always report status on startup, even if we are not in debug if(!safe_connect(thd, mysql, &glob_mi))
sql_print_error("Slave: connected to master '%s@%s:%d',\ sql_print_error("Slave: connected to master '%s@%s:%d',\
replication started in log '%s' at position %ld", glob_mi.user, replication started in log '%s' at position %ld", glob_mi.user,
glob_mi.host, glob_mi.port, glob_mi.host, glob_mi.port,
RPL_LOG_NAME, RPL_LOG_NAME,
glob_mi.pos); glob_mi.pos);
else
goto err;
while(!slave_killed(thd)) while(!slave_killed(thd))
{ {
...@@ -1053,8 +1062,7 @@ pthread_handler_decl(handle_slave,arg __attribute__((unused))) ...@@ -1053,8 +1062,7 @@ pthread_handler_decl(handle_slave,arg __attribute__((unused)))
sql_print_error("Slave: failed dump request, reconnecting to \ sql_print_error("Slave: failed dump request, reconnecting to \
try again, log '%s' at postion %ld", RPL_LOG_NAME, try again, log '%s' at postion %ld", RPL_LOG_NAME,
last_failed_pos = glob_mi.pos ); last_failed_pos = glob_mi.pos );
safe_reconnect(thd, mysql, &glob_mi); if(safe_reconnect(thd, mysql, &glob_mi) || slave_killed(thd))
if(slave_killed(thd))
goto err; goto err;
continue; continue;
...@@ -1084,8 +1092,7 @@ try again, log '%s' at postion %ld", RPL_LOG_NAME, ...@@ -1084,8 +1092,7 @@ try again, log '%s' at postion %ld", RPL_LOG_NAME,
sql_print_error("Slave: Failed reading log event, \ sql_print_error("Slave: Failed reading log event, \
reconnecting to retry, log '%s' position %ld", RPL_LOG_NAME, reconnecting to retry, log '%s' position %ld", RPL_LOG_NAME,
last_failed_pos = glob_mi.pos); last_failed_pos = glob_mi.pos);
safe_reconnect(thd, mysql, &glob_mi); if(safe_reconnect(thd, mysql, &glob_mi) || slave_killed(thd))
if(slave_killed(thd))
goto err; goto err;
break; break;
} }
...@@ -1101,6 +1108,13 @@ reconnecting to retry, log '%s' position %ld", RPL_LOG_NAME, ...@@ -1101,6 +1108,13 @@ reconnecting to retry, log '%s' position %ld", RPL_LOG_NAME,
// abort the slave thread, when the problem is fixed, the user // abort the slave thread, when the problem is fixed, the user
// should restart the slave with mysqladmin start-slave // should restart the slave with mysqladmin start-slave
} }
#ifndef DBUG_OFF
if(abort_slave_event_count && !--events_till_abort)
{
sql_print_error("Slave: debugging abort");
goto err;
}
#endif
// successful exec with offset advance, // successful exec with offset advance,
// the slave repents and his sins are forgiven! // the slave repents and his sins are forgiven!
...@@ -1143,6 +1157,8 @@ position %ld", ...@@ -1143,6 +1157,8 @@ position %ld",
pthread_mutex_lock(&LOCK_slave); pthread_mutex_lock(&LOCK_slave);
slave_running = 0; slave_running = 0;
abort_slave = 0; abort_slave = 0;
save_temporary_tables = thd->temporary_tables;
thd->temporary_tables = 0; // remove tempation from destructor to close them
pthread_cond_broadcast(&COND_slave_stopped); // tell the world we are done pthread_cond_broadcast(&COND_slave_stopped); // tell the world we are done
pthread_mutex_unlock(&LOCK_slave); pthread_mutex_unlock(&LOCK_slave);
delete thd; delete thd;
...@@ -1151,13 +1167,14 @@ position %ld", ...@@ -1151,13 +1167,14 @@ position %ld",
DBUG_RETURN(0); // Can't return anything here DBUG_RETURN(0); // Can't return anything here
} }
static void safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi) static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
// will try to connect until successful // will try to connect until successful or slave killed
{ {
int slave_was_killed;
#ifndef DBUG_OFF #ifndef DBUG_OFF
events_till_disconnect = disconnect_slave_event_count; events_till_disconnect = disconnect_slave_event_count;
#endif #endif
while(!slave_killed(thd) && while(!(slave_was_killed = slave_killed(thd)) &&
!mc_mysql_connect(mysql, mi->host, mi->user, mi->password, 0, !mc_mysql_connect(mysql, mi->host, mi->user, mi->password, 0,
mi->port, 0, 0)) mi->port, 0, 0))
{ {
...@@ -1166,28 +1183,39 @@ static void safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi) ...@@ -1166,28 +1183,39 @@ static void safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
safe_sleep(thd, mi->connect_retry); safe_sleep(thd, mi->connect_retry);
} }
mysql_log.write(thd, COM_CONNECT_OUT, "%s@%s:%d", if(!slave_was_killed)
mysql_log.write(thd, COM_CONNECT_OUT, "%s@%s:%d",
mi->user, mi->host, mi->port); mi->user, mi->host, mi->port);
return slave_was_killed;
} }
// will try to connect until successful // will try to connect until successful or slave killed
static void safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi) static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
{ {
int slave_was_killed;
mi->pending = 0; // if we lost connection after reading a state set event mi->pending = 0; // if we lost connection after reading a state set event
// we will be re-reading it, so pending needs to be cleared // we will be re-reading it, so pending needs to be cleared
#ifndef DBUG_OFF #ifndef DBUG_OFF
events_till_disconnect = disconnect_slave_event_count; events_till_disconnect = disconnect_slave_event_count;
#endif #endif
while(!slave_killed(thd) && mc_mysql_reconnect(mysql)) while(!(slave_was_killed = slave_killed(thd)) && mc_mysql_reconnect(mysql))
{ {
sql_print_error("Slave thread: error connecting to master:\ sql_print_error("Slave thread: error re-connecting to master:\
%s, retry in %d sec", %s, last_errno=%d, retry in %d sec",
mc_mysql_error(mysql), mi->connect_retry); mc_mysql_error(mysql), errno, mi->connect_retry);
safe_sleep(thd, mi->connect_retry); safe_sleep(thd, mi->connect_retry);
} }
if(!slave_was_killed)
sql_print_error("Slave: reconnected to master '%s@%s:%d',\
replication resumed in log '%s' at position %ld", glob_mi.user,
glob_mi.host, glob_mi.port,
RPL_LOG_NAME,
glob_mi.pos);
return slave_was_killed;
} }
#ifdef __GNUC__ #ifdef __GNUC__
......
...@@ -95,7 +95,7 @@ extern bool do_table_inited, ignore_table_inited, ...@@ -95,7 +95,7 @@ extern bool do_table_inited, ignore_table_inited,
extern bool table_rules_on; extern bool table_rules_on;
#ifndef DBUG_OFF #ifndef DBUG_OFF
extern int disconnect_slave_event_count ; extern int disconnect_slave_event_count, abort_slave_event_count ;
#endif #endif
// the master variables are defaults read from my.cnf or command line // the master variables are defaults read from my.cnf or command line
......
...@@ -462,7 +462,7 @@ pthread_handler_decl(handle_bootstrap,arg) ...@@ -462,7 +462,7 @@ pthread_handler_decl(handle_bootstrap,arg)
thd->proc_info=0; thd->proc_info=0;
thd->version=refresh_version; thd->version=refresh_version;
thd->priv_user=thd->user="boot"; thd->priv_user=thd->user=(char*)"boot";
buff= (char*) thd->net.buff; buff= (char*) thd->net.buff;
init_sql_alloc(&thd->mem_root,8192,8192); init_sql_alloc(&thd->mem_root,8192,8192);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment