Commit 80e61ae2 authored by Sergei Golubchik's avatar Sergei Golubchik

cleanup: LOAD DATA replication support in IO_CACHE

remove some 14-year old code that added support for
LOAD DATA replication to IO_CACHE:
* three callbacks, of which only two were actually used and that
  were only needed for LOAD DATA replication but were
  tested in every IO_CACHE instance
* an additional opaque void * argument in IO_CACHE, also only
  used for LOAD DATA replication, but present everywhere
* the code to close IO_CACHE prematurely in LOAD DATA to have
  these callbacks called in the correct order and a long
  comment explaining what will happen if IO_CACHE is not
  closed prematurely
* a variable to track whether IO_CACHE was closed prematurely
  (to avoid double-closing it)
parent 91dab5dd
...@@ -364,7 +364,6 @@ typedef struct st_dynamic_string ...@@ -364,7 +364,6 @@ typedef struct st_dynamic_string
} DYNAMIC_STRING; } DYNAMIC_STRING;
struct st_io_cache; struct st_io_cache;
typedef int (*IO_CACHE_CALLBACK)(struct st_io_cache*);
typedef struct st_io_cache_share typedef struct st_io_cache_share
{ {
...@@ -460,23 +459,12 @@ typedef struct st_io_cache /* Used when cacheing files */ ...@@ -460,23 +459,12 @@ typedef struct st_io_cache /* Used when cacheing files */
results. Details to be documented later results. Details to be documented later
*/ */
enum cache_type type; enum cache_type type;
/*
Callbacks when the actual read I/O happens. These were added and
are currently used for binary logging of LOAD DATA INFILE - when a
block is read from the file, we create a block create/append event, and
when IO_CACHE is closed, we create an end event. These functions could,
of course be used for other things
*/
IO_CACHE_CALLBACK pre_read;
IO_CACHE_CALLBACK post_read;
IO_CACHE_CALLBACK pre_close;
/* /*
Counts the number of times, when we were forced to use disk. We use it to Counts the number of times, when we were forced to use disk. We use it to
increase the binlog_cache_disk_use and binlog_stmt_cache_disk_use status increase the binlog_cache_disk_use and binlog_stmt_cache_disk_use status
variables. variables.
*/ */
ulong disk_writes; ulong disk_writes;
void* arg; /* for use by pre/post_read */
char *file_name; /* if used with 'open_cached_file' */ char *file_name; /* if used with 'open_cached_file' */
const char *dir; const char *dir;
char prefix[3]; char prefix[3];
......
/* /*
Copyright (c) 2000, 2011, Oracle and/or its affiliates Copyright (c) 2000, 2011, Oracle and/or its affiliates
Copyright (c) 2010, 2015, MariaDB
This program is free software; you can redistribute it and/or modify This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
...@@ -157,8 +158,6 @@ int init_io_cache(IO_CACHE *info, File file, size_t cachesize, ...@@ -157,8 +158,6 @@ int init_io_cache(IO_CACHE *info, File file, size_t cachesize,
info->file= file; info->file= file;
info->type= TYPE_NOT_SET; /* Don't set it until mutex are created */ info->type= TYPE_NOT_SET; /* Don't set it until mutex are created */
info->pos_in_file= seek_offset; info->pos_in_file= seek_offset;
info->pre_close = info->pre_read = info->post_read = 0;
info->arg = 0;
info->alloced_buffer = 0; info->alloced_buffer = 0;
info->buffer=0; info->buffer=0;
info->seek_not_done= 0; info->seek_not_done= 0;
...@@ -1500,13 +1499,8 @@ int _my_b_async_read(register IO_CACHE *info, uchar *Buffer, size_t Count) ...@@ -1500,13 +1499,8 @@ int _my_b_async_read(register IO_CACHE *info, uchar *Buffer, size_t Count)
int _my_b_get(IO_CACHE *info) int _my_b_get(IO_CACHE *info)
{ {
uchar buff; uchar buff;
IO_CACHE_CALLBACK pre_read,post_read;
if ((pre_read = info->pre_read))
(*pre_read)(info);
if ((*(info)->read_function)(info,&buff,1)) if ((*(info)->read_function)(info,&buff,1))
return my_b_EOF; return my_b_EOF;
if ((post_read = info->post_read))
(*post_read)(info);
return (int) (uchar) buff; return (int) (uchar) buff;
} }
...@@ -1821,7 +1815,6 @@ int my_b_flush_io_cache(IO_CACHE *info, ...@@ -1821,7 +1815,6 @@ int my_b_flush_io_cache(IO_CACHE *info,
int end_io_cache(IO_CACHE *info) int end_io_cache(IO_CACHE *info)
{ {
int error=0; int error=0;
IO_CACHE_CALLBACK pre_close;
DBUG_ENTER("end_io_cache"); DBUG_ENTER("end_io_cache");
DBUG_PRINT("enter",("cache: 0x%lx", (ulong) info)); DBUG_PRINT("enter",("cache: 0x%lx", (ulong) info));
...@@ -1831,11 +1824,6 @@ int end_io_cache(IO_CACHE *info) ...@@ -1831,11 +1824,6 @@ int end_io_cache(IO_CACHE *info)
*/ */
DBUG_ASSERT(!info->share || !info->share->total_threads); DBUG_ASSERT(!info->share || !info->share->total_threads);
if ((pre_close=info->pre_close))
{
(*pre_close)(info);
info->pre_close= 0;
}
if (info->alloced_buffer) if (info->alloced_buffer)
{ {
info->alloced_buffer=0; info->alloced_buffer=0;
......
...@@ -74,8 +74,6 @@ class READ_INFO { ...@@ -74,8 +74,6 @@ class READ_INFO {
int field_term_char,line_term_char,enclosed_char,escape_char; int field_term_char,line_term_char,enclosed_char,escape_char;
int *stack,*stack_pos; int *stack,*stack_pos;
bool found_end_of_line,start_of_line,eof; bool found_end_of_line,start_of_line,eof;
bool need_end_io_cache;
IO_CACHE cache;
NET *io_net; NET *io_net;
int level; /* for load xml */ int level; /* for load xml */
...@@ -84,6 +82,7 @@ class READ_INFO { ...@@ -84,6 +82,7 @@ class READ_INFO {
uchar *row_start, /* Found row starts here */ uchar *row_start, /* Found row starts here */
*row_end; /* Found row ends here */ *row_end; /* Found row ends here */
CHARSET_INFO *read_charset; CHARSET_INFO *read_charset;
LOAD_FILE_IO_CACHE cache;
READ_INFO(File file,uint tot_length,CHARSET_INFO *cs, READ_INFO(File file,uint tot_length,CHARSET_INFO *cs,
String &field_term,String &line_start,String &line_term, String &field_term,String &line_start,String &line_term,
...@@ -101,25 +100,9 @@ class READ_INFO { ...@@ -101,25 +100,9 @@ class READ_INFO {
int read_xml(); int read_xml();
int clear_level(int level); int clear_level(int level);
/*
We need to force cache close before destructor is invoked to log
the last read block
*/
void end_io_cache()
{
::end_io_cache(&cache);
need_end_io_cache = 0;
}
my_off_t file_length() { return cache.end_of_file; } my_off_t file_length() { return cache.end_of_file; }
my_off_t position() { return my_b_tell(&cache); } my_off_t position() { return my_b_tell(&cache); }
/*
Either this method, or we need to make cache public
Arg must be set from mysql_load() since constructor does not see
either the table or THD value
*/
void set_io_cache_arg(void* arg) { cache.arg = arg; }
/** /**
skip all data till the eof. skip all data till the eof.
*/ */
...@@ -187,7 +170,6 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, ...@@ -187,7 +170,6 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
String *enclosed=ex->enclosed; String *enclosed=ex->enclosed;
bool is_fifo=0; bool is_fifo=0;
#ifndef EMBEDDED_LIBRARY #ifndef EMBEDDED_LIBRARY
LOAD_FILE_INFO lf_info;
killed_state killed_status; killed_state killed_status;
bool is_concurrent; bool is_concurrent;
#endif #endif
...@@ -453,11 +435,10 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, ...@@ -453,11 +435,10 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
#ifndef EMBEDDED_LIBRARY #ifndef EMBEDDED_LIBRARY
if (mysql_bin_log.is_open()) if (mysql_bin_log.is_open())
{ {
lf_info.thd = thd; read_info.cache.thd = thd;
lf_info.wrote_create_file = 0; read_info.cache.wrote_create_file = 0;
lf_info.last_pos_in_file = HA_POS_ERROR; read_info.cache.last_pos_in_file = HA_POS_ERROR;
lf_info.log_delayed= transactional_table; read_info.cache.log_delayed= transactional_table;
read_info.set_io_cache_arg((void*) &lf_info);
} }
#endif /*!EMBEDDED_LIBRARY*/ #endif /*!EMBEDDED_LIBRARY*/
...@@ -553,30 +534,12 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, ...@@ -553,30 +534,12 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
{ {
{ {
/* /*
Make sure last block (the one which caused the error) gets Make sure last block (the one which caused the error) gets
logged. This is needed because otherwise after write of (to logged.
the binlog, not to read_info (which is a cache))
Delete_file_log_event the bad block will remain in read_info
(because pre_read is not called at the end of the last
block; remember pre_read is called whenever a new block is
read from disk). At the end of mysql_load(), the destructor
of read_info will call end_io_cache() which will flush
read_info, so we will finally have this in the binlog:
Append_block # The last successfull block
Delete_file
Append_block # The failing block
which is nonsense.
Or could also be (for a small file)
Create_file # The failing block
which is nonsense (Delete_file is not written in this case, because:
Create_file has not been written, so Delete_file is not written, then
when read_info is destroyed end_io_cache() is called which writes
Create_file.
*/ */
read_info.end_io_cache(); log_loaded_block(&read_info.cache, 0, 0);
/* If the file was not empty, wrote_create_file is true */ /* If the file was not empty, wrote_create_file is true */
if (lf_info.wrote_create_file) if (read_info.cache.wrote_create_file)
{ {
int errcode= query_error_code(thd, killed_status == NOT_KILLED); int errcode= query_error_code(thd, killed_status == NOT_KILLED);
...@@ -626,12 +589,11 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, ...@@ -626,12 +589,11 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
else else
{ {
/* /*
As already explained above, we need to call end_io_cache() or the last As already explained above, we need to call log_loaded_block() to have
block will be logged only after Execute_load_query_log_event (which is the last block logged
wrong), when read_info is destroyed.
*/ */
read_info.end_io_cache(); log_loaded_block(&read_info.cache, 0, 0);
if (lf_info.wrote_create_file) if (read_info.cache.wrote_create_file)
{ {
int errcode= query_error_code(thd, killed_status == NOT_KILLED); int errcode= query_error_code(thd, killed_status == NOT_KILLED);
error= write_execute_load_query_log_event(thd, ex, error= write_execute_load_query_log_event(thd, ex,
...@@ -1350,7 +1312,7 @@ READ_INFO::READ_INFO(File file_par, uint tot_length, CHARSET_INFO *cs, ...@@ -1350,7 +1312,7 @@ READ_INFO::READ_INFO(File file_par, uint tot_length, CHARSET_INFO *cs,
String &enclosed_par, int escape, bool get_it_from_net, String &enclosed_par, int escape, bool get_it_from_net,
bool is_fifo) bool is_fifo)
:file(file_par), buffer(NULL), buff_length(tot_length), escape_char(escape), :file(file_par), buffer(NULL), buff_length(tot_length), escape_char(escape),
found_end_of_line(false), eof(false), need_end_io_cache(false), found_end_of_line(false), eof(false),
error(false), line_cuted(false), found_null(false), read_charset(cs) error(false), line_cuted(false), found_null(false), read_charset(cs)
{ {
/* /*
...@@ -1410,20 +1372,15 @@ READ_INFO::READ_INFO(File file_par, uint tot_length, CHARSET_INFO *cs, ...@@ -1410,20 +1372,15 @@ READ_INFO::READ_INFO(File file_par, uint tot_length, CHARSET_INFO *cs,
} }
else else
{ {
/*
init_io_cache() will not initialize read_function member
if the cache is READ_NET. So we work around the problem with a
manual assignment
*/
need_end_io_cache = 1;
#ifndef EMBEDDED_LIBRARY #ifndef EMBEDDED_LIBRARY
if (get_it_from_net) if (get_it_from_net)
cache.read_function = _my_b_net_read; cache.read_function = _my_b_net_read;
if (mysql_bin_log.is_open()) if (mysql_bin_log.is_open())
cache.pre_read = cache.pre_close = {
(IO_CACHE_CALLBACK) log_loaded_block; cache.real_read_function= cache.read_function;
cache.read_function= log_loaded_block;
}
#endif #endif
} }
} }
...@@ -1432,8 +1389,7 @@ READ_INFO::READ_INFO(File file_par, uint tot_length, CHARSET_INFO *cs, ...@@ -1432,8 +1389,7 @@ READ_INFO::READ_INFO(File file_par, uint tot_length, CHARSET_INFO *cs,
READ_INFO::~READ_INFO() READ_INFO::~READ_INFO()
{ {
if (need_end_io_cache) ::end_io_cache(&cache);
::end_io_cache(&cache);
my_free(buffer); my_free(buffer);
List_iterator<XML_TAG> xmlit(taglist); List_iterator<XML_TAG> xmlit(taglist);
XML_TAG *t; XML_TAG *t;
......
...@@ -4119,20 +4119,20 @@ bool show_binlogs(THD* thd) ...@@ -4119,20 +4119,20 @@ bool show_binlogs(THD* thd)
@retval 0 success @retval 0 success
@retval 1 failure @retval 1 failure
*/ */
int log_loaded_block(IO_CACHE* file) int log_loaded_block(IO_CACHE* file, uchar *Buffer, size_t Count)
{ {
DBUG_ENTER("log_loaded_block"); DBUG_ENTER("log_loaded_block");
LOAD_FILE_INFO *lf_info; LOAD_FILE_IO_CACHE *lf_info= static_cast<LOAD_FILE_IO_CACHE*>(file);
uint block_len; uint block_len;
/* buffer contains position where we started last read */ /* buffer contains position where we started last read */
uchar* buffer= (uchar*) my_b_get_buffer_start(file); uchar* buffer= (uchar*) my_b_get_buffer_start(file);
uint max_event_size= current_thd->variables.max_allowed_packet; uint max_event_size= lf_info->thd->variables.max_allowed_packet;
lf_info= (LOAD_FILE_INFO*) file->arg;
if (lf_info->thd->is_current_stmt_binlog_format_row()) if (lf_info->thd->is_current_stmt_binlog_format_row())
DBUG_RETURN(0); goto ret;
if (lf_info->last_pos_in_file != HA_POS_ERROR && if (lf_info->last_pos_in_file != HA_POS_ERROR &&
lf_info->last_pos_in_file >= my_b_get_pos_in_file(file)) lf_info->last_pos_in_file >= my_b_get_pos_in_file(file))
DBUG_RETURN(0); goto ret;
for (block_len= (uint) (my_b_get_bytes_in_buffer(file)); block_len > 0; for (block_len= (uint) (my_b_get_bytes_in_buffer(file)); block_len > 0;
buffer += MY_MIN(block_len, max_event_size), buffer += MY_MIN(block_len, max_event_size),
...@@ -4158,7 +4158,9 @@ int log_loaded_block(IO_CACHE* file) ...@@ -4158,7 +4158,9 @@ int log_loaded_block(IO_CACHE* file)
lf_info->wrote_create_file= 1; lf_info->wrote_create_file= 1;
} }
} }
DBUG_RETURN(0); ret:
int res= Buffer ? lf_info->real_read_function(file, Buffer, Count) : 0;
DBUG_RETURN(res);
} }
......
...@@ -56,14 +56,15 @@ extern int init_master_info(Master_info* mi); ...@@ -56,14 +56,15 @@ extern int init_master_info(Master_info* mi);
void kill_zombie_dump_threads(uint32 slave_server_id); void kill_zombie_dump_threads(uint32 slave_server_id);
int check_binlog_magic(IO_CACHE* log, const char** errmsg); int check_binlog_magic(IO_CACHE* log, const char** errmsg);
typedef struct st_load_file_info struct LOAD_FILE_IO_CACHE : public IO_CACHE
{ {
THD* thd; THD* thd;
my_off_t last_pos_in_file; my_off_t last_pos_in_file;
bool wrote_create_file, log_delayed; bool wrote_create_file, log_delayed;
} LOAD_FILE_INFO; int (*real_read_function)(struct st_io_cache *,uchar *,size_t);
};
int log_loaded_block(IO_CACHE* file); int log_loaded_block(IO_CACHE* file, uchar *Buffer, size_t Count);
int init_replication_sys_vars(); int init_replication_sys_vars();
void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, ushort flags); void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, ushort flags);
...@@ -80,6 +81,10 @@ int rpl_load_gtid_state(slave_connection_state *state, bool use_binlog); ...@@ -80,6 +81,10 @@ int rpl_load_gtid_state(slave_connection_state *state, bool use_binlog);
bool rpl_gtid_pos_check(THD *thd, char *str, size_t len); bool rpl_gtid_pos_check(THD *thd, char *str, size_t len);
bool rpl_gtid_pos_update(THD *thd, char *str, size_t len); bool rpl_gtid_pos_update(THD *thd, char *str, size_t len);
#else
struct LOAD_FILE_IO_CACHE : public IO_CACHE { };
#endif /* HAVE_REPLICATION */ #endif /* HAVE_REPLICATION */
#endif /* SQL_REPL_INCLUDED */ #endif /* SQL_REPL_INCLUDED */
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment