SCRUM: 'Replication: PURGE LOGS with date' task

Added:

PURGE [MASTER] LOGS BEFORE date/date_expression
expire-logs-days option
  With this option old files are deleted when
      - mysqld is started
      - log is rotated
      - someone does FLUSH LOGS


parent d14db396
...@@ -64,6 +64,7 @@ static SYMBOL symbols[] = { ...@@ -64,6 +64,7 @@ static SYMBOL symbols[] = {
{ "AVG_ROW_LENGTH", SYM(AVG_ROW_LENGTH),0,0}, { "AVG_ROW_LENGTH", SYM(AVG_ROW_LENGTH),0,0},
{ "AUTO_INCREMENT", SYM(AUTO_INC),0,0}, { "AUTO_INCREMENT", SYM(AUTO_INC),0,0},
{ "BACKUP", SYM(BACKUP_SYM),0,0}, { "BACKUP", SYM(BACKUP_SYM),0,0},
{ "BEFORE", SYM(BEFORE_SYM),0,0},
{ "BEGIN", SYM(BEGIN_SYM),0,0}, { "BEGIN", SYM(BEGIN_SYM),0,0},
{ "BERKELEYDB", SYM(BERKELEY_DB_SYM),0,0}, { "BERKELEYDB", SYM(BERKELEY_DB_SYM),0,0},
{ "BDB", SYM(BERKELEY_DB_SYM),0,0}, { "BDB", SYM(BERKELEY_DB_SYM),0,0},
......
...@@ -746,6 +746,78 @@ err: ...@@ -746,6 +746,78 @@ err:
DBUG_RETURN(error); DBUG_RETURN(error);
} }
/*
Remove all logs before the given file date from disk and from the
index file.
SYNOPSIS
purge_logs_before_date()
thd Thread pointer
before_date Delete all log files before given date.
NOTES
If any of the logs before the deleted one is in use,
only purge logs up to this one.
RETURN VALUES
0 ok
LOG_INFO_PURGE_NO_ROTATE Binary file that can't be rotated
*/
int MYSQL_LOG::purge_logs_before_date(THD* thd, time_t purge_time)
{
int error;
LOG_INFO log_info;
MY_STAT stat_area;
DBUG_ENTER("purge_logs_before_date");
if (no_rotate)
DBUG_RETURN(LOG_INFO_PURGE_NO_ROTATE);
pthread_mutex_lock(&LOCK_index);
/*
Delete until we find curren file
or a file that is used or a file
that is older than purge_time.
*/
if ((error=find_log_pos(&log_info, NullS, 0 /*no mutex*/)))
goto err;
while (strcmp(log_file_name, log_info.log_file_name) &&
!log_in_use(log_info.log_file_name))
{
/* It's not fatal even if we can't delete a log file */
if (my_stat(log_info.log_file_name, &stat_area, MYF(0)) &&
stat_area.st_mtime < purge_time)
my_delete(log_info.log_file_name, MYF(0));
else
break;
if (find_next_log(&log_info, 0))
break;
}
/*
If we get killed -9 here, the sysadmin would have to edit
the log index file after restart - otherwise, this should be safe
*/
if (copy_up_file_and_fill(&index_file, log_info.index_file_start_offset))
{
error= LOG_INFO_IO;
goto err;
}
// now update offsets in index file for running threads
adjust_linfo_offsets(log_info.index_file_start_offset);
err:
pthread_mutex_unlock(&LOCK_index);
DBUG_RETURN(error);
}
/* /*
Create a new log file name Create a new log file name
...@@ -1033,6 +1105,7 @@ bool MYSQL_LOG::write(THD *thd,enum enum_server_command command, ...@@ -1033,6 +1105,7 @@ bool MYSQL_LOG::write(THD *thd,enum enum_server_command command,
bool MYSQL_LOG::write(Log_event* event_info) bool MYSQL_LOG::write(Log_event* event_info)
{ {
bool error=0; bool error=0;
bool should_rotate = 0;
DBUG_ENTER("MYSQL_LOG::write(event)"); DBUG_ENTER("MYSQL_LOG::write(event)");
if (!inited) // Can't use mutex if not init if (!inited) // Can't use mutex if not init
...@@ -1045,7 +1118,6 @@ bool MYSQL_LOG::write(Log_event* event_info) ...@@ -1045,7 +1118,6 @@ bool MYSQL_LOG::write(Log_event* event_info)
/* In most cases this is only called if 'is_open()' is true */ /* In most cases this is only called if 'is_open()' is true */
if (is_open()) if (is_open())
{ {
bool should_rotate = 0;
THD *thd=event_info->thd; THD *thd=event_info->thd;
const char *local_db = event_info->get_db(); const char *local_db = event_info->get_db();
#ifdef USING_TRANSACTIONS #ifdef USING_TRANSACTIONS
...@@ -1163,6 +1235,13 @@ err: ...@@ -1163,6 +1235,13 @@ err:
} }
pthread_mutex_unlock(&LOCK_log); pthread_mutex_unlock(&LOCK_log);
if (should_rotate && ~expire_logs_days)
{
long purge_time= time(0) - expire_logs_days*24*60*60;
if (purge_time >= 0)
error= purge_logs_before_date(current_thd, purge_time);
}
DBUG_RETURN(error); DBUG_RETURN(error);
} }
......
...@@ -703,6 +703,7 @@ extern ulong binlog_cache_size, max_binlog_cache_size, open_files_limit; ...@@ -703,6 +703,7 @@ extern ulong binlog_cache_size, max_binlog_cache_size, open_files_limit;
extern ulong max_binlog_size, rpl_recovery_rank, thread_cache_size; extern ulong max_binlog_size, rpl_recovery_rank, thread_cache_size;
extern ulong com_stat[(uint) SQLCOM_END], com_other, back_log; extern ulong com_stat[(uint) SQLCOM_END], com_other, back_log;
extern ulong specialflag, current_pid; extern ulong specialflag, current_pid;
extern ulong expire_logs_days;
extern uint test_flags,select_errors,ha_open_options; extern uint test_flags,select_errors,ha_open_options;
extern uint protocol_version,dropping_tables; extern uint protocol_version,dropping_tables;
......
...@@ -392,6 +392,7 @@ ulong max_connections,max_insert_delayed_threads,max_used_connections, ...@@ -392,6 +392,7 @@ ulong max_connections,max_insert_delayed_threads,max_used_connections,
max_connect_errors, max_user_connections = 0; max_connect_errors, max_user_connections = 0;
ulong thread_id=1L,current_pid; ulong thread_id=1L,current_pid;
ulong slow_launch_threads = 0; ulong slow_launch_threads = 0;
ulong expire_logs_days = ~0L;
char mysql_real_data_home[FN_REFLEN], char mysql_real_data_home[FN_REFLEN],
language[LIBLEN],reg_ext[FN_EXTLEN], language[LIBLEN],reg_ext[FN_EXTLEN],
...@@ -2158,6 +2159,12 @@ The server will not act as a slave."); ...@@ -2158,6 +2159,12 @@ The server will not act as a slave.");
open_log(&mysql_bin_log, glob_hostname, opt_bin_logname, "-bin", open_log(&mysql_bin_log, glob_hostname, opt_bin_logname, "-bin",
opt_binlog_index_name,LOG_BIN); opt_binlog_index_name,LOG_BIN);
using_update_log=1; using_update_log=1;
if (~expire_logs_days)
{
long purge_time= time(0) - expire_logs_days*24*60*60;
if (purge_time >= 0)
mysql_bin_log.purge_logs_before_date(current_thd, purge_time);
}
} }
...@@ -3216,7 +3223,8 @@ enum options { ...@@ -3216,7 +3223,8 @@ enum options {
OPT_BDB_MAX_LOCK, OPT_BDB_MAX_LOCK,
OPT_ENABLE_SHARED_MEMORY, OPT_ENABLE_SHARED_MEMORY,
OPT_SHARED_MEMORY_BASE_NAME, OPT_SHARED_MEMORY_BASE_NAME,
OPT_OLD_PASSWORDS OPT_OLD_PASSWORDS,
OPT_EXPIRE_LOGS_DAYS
}; };
...@@ -4019,6 +4027,11 @@ struct my_option my_long_options[] = ...@@ -4019,6 +4027,11 @@ struct my_option my_long_options[] =
(gptr*) &global_system_variables.net_wait_timeout, (gptr*) &global_system_variables.net_wait_timeout,
(gptr*) &max_system_variables.net_wait_timeout, 0, GET_ULONG, (gptr*) &max_system_variables.net_wait_timeout, 0, GET_ULONG,
REQUIRED_ARG, NET_WAIT_TIMEOUT, 1, LONG_TIMEOUT, 0, 1, 0}, REQUIRED_ARG, NET_WAIT_TIMEOUT, 1, LONG_TIMEOUT, 0, 1, 0},
{"expire_logs_days", OPT_EXPIRE_LOGS_DAYS,
"Logs will be rotated after expire-log-days days. ",
(gptr*) &expire_logs_days,
(gptr*) &expire_logs_days, 0, GET_ULONG,
REQUIRED_ARG, ~0L, 0, 99, 0, 1, 0},
{0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0} {0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}
}; };
...@@ -4064,6 +4077,7 @@ struct show_var_st status_vars[]= { ...@@ -4064,6 +4077,7 @@ struct show_var_st status_vars[]= {
{"Com_lock_tables", (char*) (com_stat+(uint) SQLCOM_LOCK_TABLES),SHOW_LONG}, {"Com_lock_tables", (char*) (com_stat+(uint) SQLCOM_LOCK_TABLES),SHOW_LONG},
{"Com_optimize", (char*) (com_stat+(uint) SQLCOM_OPTIMIZE),SHOW_LONG}, {"Com_optimize", (char*) (com_stat+(uint) SQLCOM_OPTIMIZE),SHOW_LONG},
{"Com_purge", (char*) (com_stat+(uint) SQLCOM_PURGE),SHOW_LONG}, {"Com_purge", (char*) (com_stat+(uint) SQLCOM_PURGE),SHOW_LONG},
{"Com_purge_before_date", (char*) (com_stat+(uint) SQLCOM_PURGE_BEFORE),SHOW_LONG},
{"Com_rename_table", (char*) (com_stat+(uint) SQLCOM_RENAME_TABLE),SHOW_LONG}, {"Com_rename_table", (char*) (com_stat+(uint) SQLCOM_RENAME_TABLE),SHOW_LONG},
{"Com_repair", (char*) (com_stat+(uint) SQLCOM_REPAIR),SHOW_LONG}, {"Com_repair", (char*) (com_stat+(uint) SQLCOM_REPAIR),SHOW_LONG},
{"Com_replace", (char*) (com_stat+(uint) SQLCOM_REPLACE),SHOW_LONG}, {"Com_replace", (char*) (com_stat+(uint) SQLCOM_REPLACE),SHOW_LONG},
......
...@@ -134,6 +134,7 @@ public: ...@@ -134,6 +134,7 @@ public:
void make_log_name(char* buf, const char* log_ident); void make_log_name(char* buf, const char* log_ident);
bool is_active(const char* log_file_name); bool is_active(const char* log_file_name);
int purge_logs(THD* thd, const char* to_log); int purge_logs(THD* thd, const char* to_log);
int purge_logs_before_date(THD* thd, time_t purge_time);
int purge_first_log(struct st_relay_log_info* rli); int purge_first_log(struct st_relay_log_info* rli);
bool reset_logs(THD* thd); bool reset_logs(THD* thd);
// if we are exiting, we also want to close the index file // if we are exiting, we also want to close the index file
......
...@@ -64,7 +64,7 @@ enum enum_sql_command { ...@@ -64,7 +64,7 @@ enum enum_sql_command {
SQLCOM_ROLLBACK, SQLCOM_COMMIT, SQLCOM_SLAVE_START, SQLCOM_SLAVE_STOP, SQLCOM_ROLLBACK, SQLCOM_COMMIT, SQLCOM_SLAVE_START, SQLCOM_SLAVE_STOP,
SQLCOM_BEGIN, SQLCOM_LOAD_MASTER_TABLE, SQLCOM_CHANGE_MASTER, SQLCOM_BEGIN, SQLCOM_LOAD_MASTER_TABLE, SQLCOM_CHANGE_MASTER,
SQLCOM_RENAME_TABLE, SQLCOM_BACKUP_TABLE, SQLCOM_RESTORE_TABLE, SQLCOM_RENAME_TABLE, SQLCOM_BACKUP_TABLE, SQLCOM_RESTORE_TABLE,
SQLCOM_RESET, SQLCOM_PURGE, SQLCOM_SHOW_BINLOGS, SQLCOM_RESET, SQLCOM_PURGE, SQLCOM_PURGE_BEFORE, SQLCOM_SHOW_BINLOGS,
SQLCOM_SHOW_OPEN_TABLES, SQLCOM_LOAD_MASTER_DATA, SQLCOM_SHOW_OPEN_TABLES, SQLCOM_LOAD_MASTER_DATA,
SQLCOM_HA_OPEN, SQLCOM_HA_CLOSE, SQLCOM_HA_READ, SQLCOM_HA_OPEN, SQLCOM_HA_CLOSE, SQLCOM_HA_READ,
SQLCOM_SHOW_SLAVE_HOSTS, SQLCOM_DELETE_MULTI, SQLCOM_UPDATE_MULTI, SQLCOM_SHOW_SLAVE_HOSTS, SQLCOM_DELETE_MULTI, SQLCOM_UPDATE_MULTI,
...@@ -413,6 +413,7 @@ typedef struct st_lex ...@@ -413,6 +413,7 @@ typedef struct st_lex
char *length,*dec,*change,*name; char *length,*dec,*change,*name;
char *backup_dir; /* For RESTORE/BACKUP */ char *backup_dir; /* For RESTORE/BACKUP */
char* to_log; /* For PURGE MASTER LOGS TO */ char* to_log; /* For PURGE MASTER LOGS TO */
time_t purge_time; /* For PURGE MASTER LOGS BEFORE */
char* x509_subject,*x509_issuer,*ssl_cipher; char* x509_subject,*x509_issuer,*ssl_cipher;
char* found_colon; /* For multi queries - next query */ char* found_colon; /* For multi queries - next query */
enum SSL_type ssl_type; /* defined in violite.h */ enum SSL_type ssl_type; /* defined in violite.h */
......
...@@ -1665,9 +1665,18 @@ mysql_execute_command(THD *thd) ...@@ -1665,9 +1665,18 @@ mysql_execute_command(THD *thd)
{ {
if (check_global_access(thd, SUPER_ACL)) if (check_global_access(thd, SUPER_ACL))
goto error; goto error;
// PURGE MASTER LOGS TO 'file'
res = purge_master_logs(thd, lex->to_log); res = purge_master_logs(thd, lex->to_log);
break; break;
} }
case SQLCOM_PURGE_BEFORE:
{
if (check_global_access(thd, SUPER_ACL))
goto error;
// PURGE MASTER LOGS BEFORE 'data'
res = purge_master_logs_before_date(thd, lex->purge_time);
break;
}
case SQLCOM_SHOW_WARNS: case SQLCOM_SHOW_WARNS:
{ {
res= mysqld_show_warnings(thd, (ulong) res= mysqld_show_warnings(thd, (ulong)
...@@ -2742,7 +2751,7 @@ mysql_execute_command(THD *thd) ...@@ -2742,7 +2751,7 @@ mysql_execute_command(THD *thd)
if (check_global_access(thd,RELOAD_ACL) || check_db_used(thd, tables)) if (check_global_access(thd,RELOAD_ACL) || check_db_used(thd, tables))
goto error; goto error;
/* error sending is deferred to reload_acl_and_cache */ /* error sending is deferred to reload_acl_and_cache */
reload_acl_and_cache(thd, lex->type, tables) ; reload_acl_and_cache(thd, lex->type, tables);
break; break;
case SQLCOM_KILL: case SQLCOM_KILL:
kill_one_thread(thd,lex->thread_id); kill_one_thread(thd,lex->thread_id);
...@@ -3804,6 +3813,12 @@ bool reload_acl_and_cache(THD *thd, ulong options, TABLE_LIST *tables) ...@@ -3804,6 +3813,12 @@ bool reload_acl_and_cache(THD *thd, ulong options, TABLE_LIST *tables)
mysql_log.new_file(1); mysql_log.new_file(1);
mysql_update_log.new_file(1); mysql_update_log.new_file(1);
mysql_bin_log.new_file(1); mysql_bin_log.new_file(1);
if (~expire_logs_days)
{
long purge_time= time(0) - expire_logs_days*24*60*60;
if (purge_time >= 0)
mysql_bin_log.purge_logs_before_date(thd, purge_time);
}
mysql_slow_log.new_file(1); mysql_slow_log.new_file(1);
if (ha_flush_logs()) if (ha_flush_logs())
result=1; result=1;
......
...@@ -256,15 +256,10 @@ bool log_in_use(const char* log_name) ...@@ -256,15 +256,10 @@ bool log_in_use(const char* log_name)
return result; return result;
} }
int purge_error_message(THD* thd, int res)
int purge_master_logs(THD* thd, const char* to_log)
{ {
char search_file_name[FN_REFLEN];
const char* errmsg = 0; const char* errmsg = 0;
mysql_bin_log.make_log_name(search_file_name, to_log);
int res = mysql_bin_log.purge_logs(thd, search_file_name);
switch(res) { switch(res) {
case 0: break; case 0: break;
case LOG_INFO_EOF: errmsg = "Target log not found in binlog index"; break; case LOG_INFO_EOF: errmsg = "Target log not found in binlog index"; break;
...@@ -288,10 +283,26 @@ binlog purge"; break; ...@@ -288,10 +283,26 @@ binlog purge"; break;
} }
else else
send_ok(thd); send_ok(thd);
return 0; return 0;
} }
int purge_master_logs(THD* thd, const char* to_log)
{
char search_file_name[FN_REFLEN];
mysql_bin_log.make_log_name(search_file_name, to_log);
int res = mysql_bin_log.purge_logs(thd, search_file_name);
return purge_error_message(thd, res);
}
int purge_master_logs_before_date(THD* thd, time_t purge_time)
{
int res = mysql_bin_log.purge_logs_before_date(thd, purge_time);
return purge_error_message(thd ,res);
}
/* /*
TODO: Clean up loop to only have one call to send_file() TODO: Clean up loop to only have one call to send_file()
*/ */
......
...@@ -33,6 +33,7 @@ int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1, ...@@ -33,6 +33,7 @@ int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1,
int reset_slave(THD *thd, MASTER_INFO* mi); int reset_slave(THD *thd, MASTER_INFO* mi);
int reset_master(THD* thd); int reset_master(THD* thd);
int purge_master_logs(THD* thd, const char* to_log); int purge_master_logs(THD* thd, const char* to_log);
int purge_master_logs_before_date(THD* thd, time_t purge_time);
bool log_in_use(const char* log_name); bool log_in_use(const char* log_name);
void adjust_linfo_offsets(my_off_t purge_offset); void adjust_linfo_offsets(my_off_t purge_offset);
int show_binlogs(THD* thd); int show_binlogs(THD* thd);
......
...@@ -528,6 +528,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b,int *yystacksize); ...@@ -528,6 +528,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b,int *yystacksize);
%token CIPHER_SYM %token CIPHER_SYM
%token HELP %token HELP
%token BEFORE_SYM
%left SET_VAR %left SET_VAR
%left OR_OR_CONCAT OR %left OR_OR_CONCAT OR
...@@ -3558,13 +3559,34 @@ purge: ...@@ -3558,13 +3559,34 @@ purge:
PURGE PURGE
{ {
LEX *lex=Lex; LEX *lex=Lex;
lex->sql_command = SQLCOM_PURGE;
lex->type=0; lex->type=0;
} purge_options
{}
;
purge_options:
LOGS_SYM
purge_option
| MASTER_SYM LOGS_SYM
purge_option;
purge_option:
TO_SYM TEXT_STRING
{
Lex->sql_command = SQLCOM_PURGE;
Lex->to_log = $2.str;
} }
MASTER_SYM LOGS_SYM TO_SYM TEXT_STRING | BEFORE_SYM expr
{ {
Lex->to_log = $6.str; if ($2->check_cols(1) || $2->fix_fields(Lex->thd, 0, &$2))
} ; {
net_printf(Lex->thd, ER_WRONG_ARGUMENTS, "PURGE LOGS BEFORE");
YYABORT;
}
Item *tmp= new Item_func_unix_timestamp($2);
Lex->sql_command = SQLCOM_PURGE_BEFORE;
Lex->purge_time= tmp->val_int();
};
/* kill threads */ /* kill threads */
...@@ -3574,6 +3596,7 @@ kill: ...@@ -3574,6 +3596,7 @@ kill:
LEX *lex=Lex; LEX *lex=Lex;
if ($2->check_cols(1) || $2->fix_fields(lex->thd, 0, &$2)) if ($2->check_cols(1) || $2->fix_fields(lex->thd, 0, &$2))
{ {
send_error(lex->thd, ER_SET_CONSTANTS_ONLY); send_error(lex->thd, ER_SET_CONSTANTS_ONLY);
YYABORT; YYABORT;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment