Commit cfc4a2e7 authored by unknown's avatar unknown

fixed mutex bug ( or lack of it) when reading log entries on COM_BINLOG_DUMP

added error_code to query event, and checking to see if we get the
expected error on the slave
removed obsolete code from slave.cc


sql/log_event.cc:
  added mutex as an argument to FILE read_log_event functions
sql/log_event.h:
  added error_code for the query event
sql/mysqlbinlog.cc:
  fix for new form of read_log_event
sql/slave.cc:
  removed obsolte code with fetch nx table
  added check to see if the query runs with the same error on the slave
  as it did on the master
sql/sql_repl.cc:
  fixed mutex problem around read_log_event
parent ddae2009
...@@ -71,23 +71,39 @@ int Log_event::write_header(FILE* file) ...@@ -71,23 +71,39 @@ int Log_event::write_header(FILE* file)
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
int Log_event::read_log_event(FILE* file, String* packet) int Log_event::read_log_event(FILE* file, String* packet,
pthread_mutex_t* log_lock)
{ {
ulong data_len; ulong data_len;
char buf[LOG_EVENT_HEADER_LEN]; char buf[LOG_EVENT_HEADER_LEN];
if(log_lock)
pthread_mutex_lock(log_lock);
if (my_fread(file, (byte*)buf, sizeof(buf), MYF(MY_NABP))) if (my_fread(file, (byte*)buf, sizeof(buf), MYF(MY_NABP)))
return feof(file) ? LOG_READ_EOF: LOG_READ_IO; {
if(log_lock) pthread_mutex_unlock(log_lock);
return feof(file) ? LOG_READ_EOF: LOG_READ_IO;
}
data_len = uint4korr(buf + EVENT_LEN_OFFSET); data_len = uint4korr(buf + EVENT_LEN_OFFSET);
if (data_len < LOG_EVENT_HEADER_LEN || data_len > MAX_EVENT_LEN) if (data_len < LOG_EVENT_HEADER_LEN || data_len > MAX_EVENT_LEN)
return LOG_READ_BOGUS; {
if(log_lock) pthread_mutex_unlock(log_lock);
return LOG_READ_BOGUS;
}
packet->append(buf, sizeof(buf)); packet->append(buf, sizeof(buf));
data_len -= LOG_EVENT_HEADER_LEN; data_len -= LOG_EVENT_HEADER_LEN;
if (!data_len) if (!data_len)
return 0; // the event does not have a data section {
if(log_lock) pthread_mutex_unlock(log_lock);
return 0; // the event does not have a data section
}
if (packet->append(file, data_len, MYF(MY_WME|MY_NABP))) if (packet->append(file, data_len, MYF(MY_WME|MY_NABP)))
return feof(file) ? LOG_READ_BOGUS: LOG_READ_IO; {
if(log_lock)
pthread_mutex_unlock(log_lock);
return feof(file) ? LOG_READ_BOGUS: LOG_READ_IO;
}
if(log_lock) pthread_mutex_unlock(log_lock);
return 0; return 0;
} }
...@@ -95,14 +111,18 @@ int Log_event::read_log_event(FILE* file, String* packet) ...@@ -95,14 +111,18 @@ int Log_event::read_log_event(FILE* file, String* packet)
// allocates memory - the caller is responsible for clean-up // allocates memory - the caller is responsible for clean-up
Log_event* Log_event::read_log_event(FILE* file) Log_event* Log_event::read_log_event(FILE* file, pthread_mutex_t* log_lock)
{ {
time_t timestamp; time_t timestamp;
uint32 server_id; uint32 server_id;
char buf[LOG_EVENT_HEADER_LEN-4]; char buf[LOG_EVENT_HEADER_LEN-4];
if(log_lock) pthread_mutex_lock(log_lock);
if (my_fread(file, (byte *) buf, sizeof(buf), MY_NABP)) if (my_fread(file, (byte *) buf, sizeof(buf), MY_NABP))
return NULL; {
if(log_lock) pthread_mutex_unlock(log_lock);
return NULL;
}
timestamp = uint4korr(buf); timestamp = uint4korr(buf);
server_id = uint4korr(buf + 5); server_id = uint4korr(buf + 5);
...@@ -111,6 +131,8 @@ Log_event* Log_event::read_log_event(FILE* file) ...@@ -111,6 +131,8 @@ Log_event* Log_event::read_log_event(FILE* file)
case QUERY_EVENT: case QUERY_EVENT:
{ {
Query_log_event* q = new Query_log_event(file, timestamp, server_id); Query_log_event* q = new Query_log_event(file, timestamp, server_id);
if(log_lock) pthread_mutex_unlock(log_lock);
if (!q->query) if (!q->query)
{ {
delete q; delete q;
...@@ -123,6 +145,8 @@ Log_event* Log_event::read_log_event(FILE* file) ...@@ -123,6 +145,8 @@ Log_event* Log_event::read_log_event(FILE* file)
case LOAD_EVENT: case LOAD_EVENT:
{ {
Load_log_event* l = new Load_log_event(file, timestamp, server_id); Load_log_event* l = new Load_log_event(file, timestamp, server_id);
if(log_lock) pthread_mutex_unlock(log_lock);
if (!l->table_name) if (!l->table_name)
{ {
delete l; delete l;
...@@ -136,6 +160,8 @@ Log_event* Log_event::read_log_event(FILE* file) ...@@ -136,6 +160,8 @@ Log_event* Log_event::read_log_event(FILE* file)
case ROTATE_EVENT: case ROTATE_EVENT:
{ {
Rotate_log_event* r = new Rotate_log_event(file, timestamp, server_id); Rotate_log_event* r = new Rotate_log_event(file, timestamp, server_id);
if(log_lock) pthread_mutex_unlock(log_lock);
if (!r->new_log_ident) if (!r->new_log_ident)
{ {
delete r; delete r;
...@@ -148,6 +174,8 @@ Log_event* Log_event::read_log_event(FILE* file) ...@@ -148,6 +174,8 @@ Log_event* Log_event::read_log_event(FILE* file)
case INTVAR_EVENT: case INTVAR_EVENT:
{ {
Intvar_log_event* e = new Intvar_log_event(file, timestamp, server_id); Intvar_log_event* e = new Intvar_log_event(file, timestamp, server_id);
if(log_lock) pthread_mutex_unlock(log_lock);
if (e->type == INVALID_INT_EVENT) if (e->type == INVALID_INT_EVENT)
{ {
delete e; delete e;
...@@ -157,12 +185,25 @@ Log_event* Log_event::read_log_event(FILE* file) ...@@ -157,12 +185,25 @@ Log_event* Log_event::read_log_event(FILE* file)
return e; return e;
} }
case START_EVENT: return new Start_log_event(file, timestamp, server_id); case START_EVENT:
case STOP_EVENT: return new Stop_log_event(file, timestamp, server_id); {
default: return NULL; Start_log_event* e = new Start_log_event(file, timestamp, server_id);
if(log_lock) pthread_mutex_unlock(log_lock);
return e;
}
case STOP_EVENT:
{
Stop_log_event* e = new Stop_log_event(file, timestamp, server_id);
if(log_lock) pthread_mutex_unlock(log_lock);
return e;
}
default:
if(log_lock) pthread_mutex_unlock(log_lock);
return NULL;
} }
//impossible //impossible
if(log_lock) pthread_mutex_unlock(log_lock);
return NULL; return NULL;
} }
...@@ -356,6 +397,7 @@ Query_log_event::Query_log_event(FILE* file, time_t when_arg, ...@@ -356,6 +397,7 @@ Query_log_event::Query_log_event(FILE* file, time_t when_arg,
data_len -= QUERY_EVENT_OVERHEAD; data_len -= QUERY_EVENT_OVERHEAD;
exec_time = uint4korr(buf + 8); exec_time = uint4korr(buf + 8);
db_len = (uint)buf[12]; db_len = (uint)buf[12];
error_code = uint2korr(buf + 13);
if (!(data_buf = (char*) my_malloc(data_len+1, MYF(MY_WME)))) if (!(data_buf = (char*) my_malloc(data_len+1, MYF(MY_WME))))
return; return;
...@@ -384,11 +426,12 @@ Query_log_event::Query_log_event(const char* buf, int max_buf): ...@@ -384,11 +426,12 @@ Query_log_event::Query_log_event(const char* buf, int max_buf):
data_len -= QUERY_EVENT_OVERHEAD; data_len -= QUERY_EVENT_OVERHEAD;
exec_time = uint4korr(buf + 8); exec_time = uint4korr(buf + 8);
error_code = uint2korr(buf + 13);
if (!(data_buf = (char*) my_malloc( data_len + 1, MYF(MY_WME)))) if (!(data_buf = (char*) my_malloc( data_len + 1, MYF(MY_WME))))
return; return;
memcpy(data_buf, buf + 13, data_len); memcpy(data_buf, buf + QUERY_HEADER_LEN + 4, data_len);
thread_id = uint4korr(buf + 4); thread_id = uint4korr(buf + 4);
db = data_buf; db = data_buf;
db_len = (uint)buf[12]; db_len = (uint)buf[12];
...@@ -402,8 +445,8 @@ void Query_log_event::print(FILE* file, bool short_form) ...@@ -402,8 +445,8 @@ void Query_log_event::print(FILE* file, bool short_form)
if (!short_form) if (!short_form)
{ {
print_header(file); print_header(file);
fprintf(file, "\tQuery\tthread_id=%lu\texec_time=%lu\n", fprintf(file, "\tQuery\tthread_id=%lu\texec_time=%lu\terror_code=%d\n",
(ulong) thread_id, (ulong) exec_time); (ulong) thread_id, (ulong) exec_time, error_code);
} }
if(db && db[0]) if(db && db[0])
...@@ -423,7 +466,8 @@ int Query_log_event::write_data(FILE* file) ...@@ -423,7 +466,8 @@ int Query_log_event::write_data(FILE* file)
int4store(pos, exec_time); int4store(pos, exec_time);
pos += 4; pos += 4;
*pos++ = (char)db_len; *pos++ = (char)db_len;
int2store(pos, error_code);
pos += 2;
if (my_fwrite(file, (byte*) buf, (uint)(pos - buf), MYF(MY_NABP | MY_WME)) || if (my_fwrite(file, (byte*) buf, (uint)(pos - buf), MYF(MY_NABP | MY_WME)) ||
my_fwrite(file, (db) ? (byte*) db : (byte*)"", my_fwrite(file, (db) ? (byte*) db : (byte*)"",
......
...@@ -31,7 +31,8 @@ ...@@ -31,7 +31,8 @@
#define BINLOG_VERSION 1 #define BINLOG_VERSION 1
#define LOG_EVENT_HEADER_LEN 13 #define LOG_EVENT_HEADER_LEN 13
#define QUERY_HEADER_LEN (sizeof(uint32) + sizeof(uint32) + sizeof(uchar)) #define QUERY_HEADER_LEN (sizeof(uint32) + sizeof(uint32) + \
sizeof(uchar) + sizeof(uint16))
#define LOAD_HEADER_LEN (sizeof(uint32) + sizeof(uint32) + \ #define LOAD_HEADER_LEN (sizeof(uint32) + sizeof(uint32) + \
+ sizeof(uint32) + 2 + sizeof(uint32)) + sizeof(uint32) + 2 + sizeof(uint32))
#define EVENT_LEN_OFFSET 9 #define EVENT_LEN_OFFSET 9
...@@ -88,11 +89,13 @@ public: ...@@ -88,11 +89,13 @@ public:
void print_timestamp(FILE* file, time_t *ts = 0); void print_timestamp(FILE* file, time_t *ts = 0);
void print_header(FILE* file); void print_header(FILE* file);
static Log_event* read_log_event(FILE* file); // if mutex is 0, the read will proceed without mutex
static Log_event* read_log_event(FILE* file, pthread_mutex_t* log_lock);
static Log_event* read_log_event(const char* buf, int max_buf); static Log_event* read_log_event(const char* buf, int max_buf);
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
static int read_log_event(FILE* file, String* packet); static int read_log_event(FILE* file, String* packet,
pthread_mutex_t* log_lock);
#endif #endif
}; };
...@@ -109,12 +112,14 @@ public: ...@@ -109,12 +112,14 @@ public:
// we pass it here, so we would not have to call strlen() // we pass it here, so we would not have to call strlen()
// otherwise, set it to 0, in which case, we compute it with strlen() // otherwise, set it to 0, in which case, we compute it with strlen()
uint32 db_len; uint32 db_len;
uint16 error_code;
int thread_id; int thread_id;
#if !defined(MYSQL_CLIENT) #if !defined(MYSQL_CLIENT)
THD* thd; THD* thd;
Query_log_event(THD* thd_arg, const char* query_arg): Query_log_event(THD* thd_arg, const char* query_arg):
Log_event(thd_arg->start_time,0,0,thd_arg->server_id), data_buf(0), Log_event(thd_arg->start_time,0,0,thd_arg->server_id), data_buf(0),
query(query_arg), db(thd_arg->db), q_len(thd_arg->query_length), query(query_arg), db(thd_arg->db), q_len(thd_arg->query_length),
error_code(thd_arg->net.last_errno),
thread_id(thd_arg->thread_id), thd(thd_arg) thread_id(thd_arg->thread_id), thd(thd_arg)
{ {
time_t end_time; time_t end_time;
...@@ -142,6 +147,7 @@ public: ...@@ -142,6 +147,7 @@ public:
return q_len + db_len + 2 + return q_len + db_len + 2 +
sizeof(uint32) // thread_id sizeof(uint32) // thread_id
+ sizeof(uint32) // exec_time + sizeof(uint32) // exec_time
+ sizeof(uint16) // error_code
; ;
} }
......
...@@ -284,7 +284,9 @@ static void dump_remote_log_entries(const char* logname) ...@@ -284,7 +284,9 @@ static void dump_remote_log_entries(const char* logname)
break; // end of data break; // end of data
DBUG_PRINT("info",( "len= %u, net->read_pos[5] = %d\n", DBUG_PRINT("info",( "len= %u, net->read_pos[5] = %d\n",
len, net->read_pos[5])); len, net->read_pos[5]));
Log_event * ev = Log_event::read_log_event((const char*) net->read_pos + 1 , len); Log_event * ev = Log_event::read_log_event(
(const char*) net->read_pos + 1 ,
len);
if(ev) if(ev)
{ {
ev->print(stdout, short_form); ev->print(stdout, short_form);
...@@ -315,7 +317,7 @@ static void dump_local_log_entries(const char* logname) ...@@ -315,7 +317,7 @@ static void dump_local_log_entries(const char* logname)
while(1) while(1)
{ {
Log_event* ev = Log_event::read_log_event(file); Log_event* ev = Log_event::read_log_event(file, 0);
if(!ev) if(!ev)
if(!feof(file)) if(!feof(file))
die("Could not read entry at offset %ld : Error in log format or \ die("Could not read entry at offset %ld : Error in log format or \
......
...@@ -599,36 +599,23 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len) ...@@ -599,36 +599,23 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len)
thd->query_id = query_id++; thd->query_id = query_id++;
VOID(pthread_mutex_unlock(&LOCK_thread_count)); VOID(pthread_mutex_unlock(&LOCK_thread_count));
thd->last_nx_table = thd->last_nx_db = 0; thd->last_nx_table = thd->last_nx_db = 0;
for(;;) thd->query_error = 0; // clear error
{ thd->net.last_errno = 0;
thd->query_error = 0; // clear error thd->net.last_error[0] = 0;
thd->last_nx_table = thd->last_nx_db = 0; mysql_parse(thd, thd->query, q_len);
thd->net.last_errno = 0; int expected_error,actual_error;
thd->net.last_error[0] = 0; if((expected_error = qev->error_code) !=
mysql_parse(thd, thd->query, q_len); // try query (actual_error = thd->net.last_errno) && expected_error)
if(!thd->query_error || slave_killed(thd)) // break if ok
break;
// if not ok
if(thd->last_nx_table && thd->last_nx_db)
{ {
// for now, let's just fail if the table is not sql_print_error("Slave: did not get the expected error\
// there, and not try to be a smart alec... running query from master - expected: '%s', got '%s'",
ER(expected_error),
// if table was not there actual_error ? ER(actual_error):"no error"
//if(fetch_nx_table(thd,&glob_mi)) );
// try to to fetch from master thd->query_error = 1;
break; // if we can't, just break
} }
else else if(expected_error == actual_error)
break; // if failed for some other reason, bail out thd->query_error = 0;
// if fetched the table from master successfully
// we need to restore query info in thd because
// fetch_nx_table executes create table
thd->query = (char*)qev->query;
thd->set_time((time_t)qev->when);
thd->current_tablenr = 0;
}
} }
thd->db = 0;// prevent db from being freed thd->db = 0;// prevent db from being freed
thd->query = 0; // just to be sure thd->query = 0; // just to be sure
......
...@@ -121,7 +121,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags) ...@@ -121,7 +121,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags)
errmsg = "Could not find first log"; errmsg = "Could not find first log";
goto err; goto err;
} }
log = my_fopen(log_file_name, O_RDONLY, MYF(MY_WME)); log = my_fopen(log_file_name, O_RDONLY|O_BINARY, MYF(MY_WME));
if(!log) if(!log)
{ {
...@@ -143,14 +143,17 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags) ...@@ -143,14 +143,17 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags)
while(!net->error && net->vio != 0 && !thd->killed) while(!net->error && net->vio != 0 && !thd->killed)
{ {
while(!(error = Log_event::read_log_event(log, packet))) pthread_mutex_t *log_lock = mysql_bin_log.get_log_lock();
while(!(error = Log_event::read_log_event(log, packet, log_lock)))
{ {
if(my_net_write(net, (char*)packet->ptr(), packet->length()) ) if(my_net_write(net, (char*)packet->ptr(), packet->length()) )
{ {
errmsg = "Failed on my_net_write()"; errmsg = "Failed on my_net_write()";
goto err; goto err;
} }
DBUG_PRINT("info", ("log event code %d",(*packet)[LOG_EVENT_OFFSET+1] )); DBUG_PRINT("info", ("log event code %d",
(*packet)[LOG_EVENT_OFFSET+1] ));
if((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT) if((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
{ {
if(send_file(thd)) if(send_file(thd))
...@@ -168,7 +171,8 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags) ...@@ -168,7 +171,8 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags)
goto err; goto err;
} }
if(!(flags & BINLOG_DUMP_NON_BLOCK) && mysql_bin_log.is_active(log_file_name)) if(!(flags & BINLOG_DUMP_NON_BLOCK) &&
mysql_bin_log.is_active(log_file_name))
// block until there is more data in the log // block until there is more data in the log
// unless non-blocking mode requested // unless non-blocking mode requested
{ {
...@@ -183,7 +187,6 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags) ...@@ -183,7 +187,6 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags)
// if we did not miss anything, we just wait for other threads // if we did not miss anything, we just wait for other threads
// to signal us // to signal us
{ {
pthread_mutex_t *log_lock = mysql_bin_log.get_log_lock();
clearerr(log); clearerr(log);
// tell the kill thread how to wake us up // tell the kill thread how to wake us up
...@@ -196,18 +199,19 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags) ...@@ -196,18 +199,19 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags)
bool read_packet = 0, fatal_error = 0; bool read_packet = 0, fatal_error = 0;
pthread_mutex_lock(log_lock); // no one will update the log while we are reading // no one will update the log while we are reading
// now, but we'll be quick and just read one record // now, but we'll be quick and just read one record
switch(Log_event::read_log_event(log, packet, log_lock))
switch(Log_event::read_log_event(log, packet))
{ {
case 0: case 0:
read_packet = 1; // we read successfully, so we'll need to send it to the read_packet = 1;
// we read successfully, so we'll need to send it to the
// slave // slave
break; break;
case LOG_READ_EOF: case LOG_READ_EOF:
pthread_mutex_lock(log_lock);
pthread_cond_wait(&COND_binlog_update, log_lock); pthread_cond_wait(&COND_binlog_update, log_lock);
pthread_mutex_unlock(log_lock);
break; break;
default: default:
...@@ -215,7 +219,6 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags) ...@@ -215,7 +219,6 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags)
break; break;
} }
pthread_mutex_unlock(log_lock);
pthread_mutex_lock(&thd->mysys_var->mutex); pthread_mutex_lock(&thd->mysys_var->mutex);
thd->mysys_var->current_mutex= 0; thd->mysys_var->current_mutex= 0;
...@@ -275,7 +278,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags) ...@@ -275,7 +278,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags)
break; break;
(void) my_fclose(log, MYF(MY_WME)); (void) my_fclose(log, MYF(MY_WME));
log = my_fopen(log_file_name, O_RDONLY, MYF(MY_WME)); log = my_fopen(log_file_name, O_RDONLY|O_BINARY, MYF(MY_WME));
if(!log) if(!log)
goto err; goto err;
// fake Rotate_log event just in case it did not make it to the log // fake Rotate_log event just in case it did not make it to the log
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment