Commit d4b2c9bb authored by vinchen's avatar vinchen Committed by Kristian Nielsen

optimize the memory allocation for compressed binlog event

parent 640051e0
...@@ -789,7 +789,8 @@ int binlog_buf_compress(const char *src, char *dst, uint32 len, uint32 *comlen) ...@@ -789,7 +789,8 @@ int binlog_buf_compress(const char *src, char *dst, uint32 len, uint32 *comlen)
*/ */
int query_event_uncompress(const Format_description_log_event *description_event, bool contain_checksum, int query_event_uncompress(const Format_description_log_event *description_event, bool contain_checksum,
const char *src, char **dst, ulong *newlen) const char *src, char* buf, ulong buf_size, bool* is_malloc,
char **dst, ulong *newlen)
{ {
ulong len = uint4korr(src + EVENT_LEN_OFFSET); ulong len = uint4korr(src + EVENT_LEN_OFFSET);
const char *tmp = src; const char *tmp = src;
...@@ -810,37 +811,54 @@ int query_event_uncompress(const Format_description_log_event *description_event ...@@ -810,37 +811,54 @@ int query_event_uncompress(const Format_description_log_event *description_event
*newlen = (tmp - src) + un_len; *newlen = (tmp - src) + un_len;
if(contain_checksum) if(contain_checksum)
*newlen += BINLOG_CHECKSUM_LEN; *newlen += BINLOG_CHECKSUM_LEN;
*dst = (char *)my_malloc(ALIGN_SIZE(*newlen), MYF(MY_FAE)); uint32 alloc_size = ALIGN_SIZE(*newlen);
if (!*dst) char *new_dst = NULL;
{
return 1;
}
/* copy the head*/ *is_malloc = false;
memcpy(*dst, src , tmp - src); if (alloc_size <= buf_size)
if (binlog_buf_uncompress(tmp, *dst + (tmp - src), len - (tmp - src), &un_len)) {
new_dst = buf;
}
else
{
new_dst = (char *)my_malloc(alloc_size, MYF(MY_WME));
if (!new_dst)
return 1;
*is_malloc = true;
}
/* copy the head*/
memcpy(new_dst, src , tmp - src);
if (binlog_buf_uncompress(tmp, new_dst + (tmp - src), len - (tmp - src), &un_len))
{ {
my_free(*dst); if (*is_malloc)
my_free(new_dst);
*is_malloc = false;
return 1; return 1;
} }
(*dst)[EVENT_TYPE_OFFSET] = QUERY_EVENT; new_dst[EVENT_TYPE_OFFSET] = QUERY_EVENT;
int4store(*dst + EVENT_LEN_OFFSET, *newlen); int4store(new_dst + EVENT_LEN_OFFSET, *newlen);
if(contain_checksum){ if(contain_checksum){
ulong clear_len = *newlen - BINLOG_CHECKSUM_LEN; ulong clear_len = *newlen - BINLOG_CHECKSUM_LEN;
int4store(*dst + clear_len, my_checksum(0L, (uchar *)*dst, clear_len)); int4store(new_dst + clear_len, my_checksum(0L, (uchar *)new_dst, clear_len));
} }
*dst = new_dst;
return 0; return 0;
} }
int Row_log_event_uncompress(const Format_description_log_event *description_event, bool contain_checksum, int row_log_event_uncompress(const Format_description_log_event *description_event, bool contain_checksum,
const char *src, char **dst, ulong *newlen) const char *src, char* buf, ulong buf_size, bool* is_malloc,
char **dst, ulong *newlen)
{ {
Log_event_type type = (Log_event_type)(uchar)src[EVENT_TYPE_OFFSET]; Log_event_type type = (Log_event_type)(uchar)src[EVENT_TYPE_OFFSET];
ulong len = uint4korr(src + EVENT_LEN_OFFSET); ulong len = uint4korr(src + EVENT_LEN_OFFSET);
const char *tmp = src; const char *tmp = src;
char *buf = NULL; char *new_dst = NULL;
DBUG_ASSERT(LOG_EVENT_IS_ROW_COMPRESSED(type)); DBUG_ASSERT(LOG_EVENT_IS_ROW_COMPRESSED(type));
...@@ -884,28 +902,39 @@ int Row_log_event_uncompress(const Format_description_log_event *description_eve ...@@ -884,28 +902,39 @@ int Row_log_event_uncompress(const Format_description_log_event *description_eve
*newlen += BINLOG_CHECKSUM_LEN; *newlen += BINLOG_CHECKSUM_LEN;
uint32 alloc_size = ALIGN_SIZE(*newlen); uint32 alloc_size = ALIGN_SIZE(*newlen);
buf = (char *)my_malloc(alloc_size , MYF(MY_FAE));
if (!buf) *is_malloc = false;
{ if (alloc_size <= buf_size)
return 1; {
} new_dst = buf;
}
else
{
new_dst = (char *)my_malloc(alloc_size, MYF(MY_WME));
if (!new_dst)
return 1;
*is_malloc = true;
}
/* copy the head*/ /* copy the head*/
memcpy(buf, src , tmp - src); memcpy(new_dst, src , tmp - src);
/* uncompress the body */ /* uncompress the body */
if (binlog_buf_uncompress(tmp, buf + (tmp - src), len - (tmp - src), &un_len)) if (binlog_buf_uncompress(tmp, new_dst + (tmp - src), len - (tmp - src), &un_len))
{ {
my_free(buf); if (*is_malloc)
my_free(new_dst);
return 1; return 1;
} }
buf[EVENT_TYPE_OFFSET] = type; new_dst[EVENT_TYPE_OFFSET] = type;
int4store(buf + EVENT_LEN_OFFSET, *newlen); int4store(new_dst + EVENT_LEN_OFFSET, *newlen);
if(contain_checksum){ if(contain_checksum){
ulong clear_len = *newlen - BINLOG_CHECKSUM_LEN; ulong clear_len = *newlen - BINLOG_CHECKSUM_LEN;
int4store(buf + clear_len, my_checksum(0L, (uchar *)buf, clear_len)); int4store(new_dst + clear_len, my_checksum(0L, (uchar *)new_dst, clear_len));
} }
*dst = buf; *dst = new_dst;
return 0; return 0;
} }
...@@ -3504,14 +3533,15 @@ bool Query_compressed_log_event::write() ...@@ -3504,14 +3533,15 @@ bool Query_compressed_log_event::write()
{ {
const char *query_tmp = query; const char *query_tmp = query;
uint32 q_len_tmp = q_len; uint32 q_len_tmp = q_len;
uint32 alloc_size;
bool ret = true; bool ret = true;
q_len = binlog_get_compress_len(q_len); q_len = alloc_size = binlog_get_compress_len(q_len);
query = (char *)my_malloc(q_len, MYF(MY_FAE)); query = (char *)my_safe_alloca(alloc_size);
if(query && !binlog_buf_compress(query_tmp, (char *)query, q_len_tmp, &q_len)) if(query && !binlog_buf_compress(query_tmp, (char *)query, q_len_tmp, &q_len))
{ {
ret = Query_log_event::write(); ret = Query_log_event::write();
} }
my_free((void *)query); my_safe_afree((void *)query, alloc_size);
query = query_tmp; query = query_tmp;
q_len = q_len_tmp; q_len = q_len_tmp;
return ret; return ret;
...@@ -10786,14 +10816,15 @@ bool Rows_log_event::write_compressed() ...@@ -10786,14 +10816,15 @@ bool Rows_log_event::write_compressed()
uchar *m_rows_buf_tmp = m_rows_buf; uchar *m_rows_buf_tmp = m_rows_buf;
uchar *m_rows_cur_tmp = m_rows_cur; uchar *m_rows_cur_tmp = m_rows_cur;
bool ret = true; bool ret = true;
uint32 comlen = binlog_get_compress_len(m_rows_cur_tmp - m_rows_buf_tmp); uint32 comlen, alloc_size;
m_rows_buf = (uchar *)my_malloc(comlen, MYF(MY_FAE)); comlen = alloc_size = binlog_get_compress_len(m_rows_cur_tmp - m_rows_buf_tmp);
m_rows_buf = (uchar *)my_safe_alloca(alloc_size);
if(m_rows_buf && !binlog_buf_compress((const char *)m_rows_buf_tmp, (char *)m_rows_buf, m_rows_cur_tmp - m_rows_buf_tmp, &comlen)) if(m_rows_buf && !binlog_buf_compress((const char *)m_rows_buf_tmp, (char *)m_rows_buf, m_rows_cur_tmp - m_rows_buf_tmp, &comlen))
{ {
m_rows_cur = comlen + m_rows_buf; m_rows_cur = comlen + m_rows_buf;
ret = Log_event::write(); ret = Log_event::write();
} }
my_free(m_rows_buf); my_safe_afree(m_rows_buf, alloc_size);
m_rows_buf = m_rows_buf_tmp; m_rows_buf = m_rows_buf_tmp;
m_rows_cur = m_rows_cur_tmp; m_rows_cur = m_rows_cur_tmp;
return ret; return ret;
...@@ -12242,8 +12273,9 @@ void Write_rows_compressed_log_event::print(FILE *file, PRINT_EVENT_INFO* print_ ...@@ -12242,8 +12273,9 @@ void Write_rows_compressed_log_event::print(FILE *file, PRINT_EVENT_INFO* print_
{ {
char *new_buf; char *new_buf;
ulong len; ulong len;
if(!Row_log_event_uncompress(glob_description_event, checksum_alg, bool is_malloc = false;
temp_buf, &new_buf, &len)) if(!row_log_event_uncompress(glob_description_event, checksum_alg == BINLOG_CHECKSUM_ALG_CRC32,
temp_buf, NULL, 0, &is_malloc, &new_buf, &len))
{ {
free_temp_buf(); free_temp_buf();
register_temp_buf(new_buf, true); register_temp_buf(new_buf, true);
...@@ -12911,8 +12943,9 @@ void Delete_rows_compressed_log_event::print(FILE *file, ...@@ -12911,8 +12943,9 @@ void Delete_rows_compressed_log_event::print(FILE *file,
{ {
char *new_buf; char *new_buf;
ulong len; ulong len;
if(!Row_log_event_uncompress(glob_description_event, checksum_alg, bool is_malloc = false;
temp_buf, &new_buf, &len)) if(!row_log_event_uncompress(glob_description_event, checksum_alg == BINLOG_CHECKSUM_ALG_CRC32,
temp_buf, NULL, 0, &is_malloc, &new_buf, &len))
{ {
free_temp_buf(); free_temp_buf();
register_temp_buf(new_buf, true); register_temp_buf(new_buf, true);
...@@ -13167,8 +13200,9 @@ void Update_rows_compressed_log_event::print(FILE *file, PRINT_EVENT_INFO *print ...@@ -13167,8 +13200,9 @@ void Update_rows_compressed_log_event::print(FILE *file, PRINT_EVENT_INFO *print
{ {
char *new_buf; char *new_buf;
ulong len; ulong len;
if(!Row_log_event_uncompress(glob_description_event, checksum_alg, bool is_malloc = false;
temp_buf, &new_buf, &len)) if(!row_log_event_uncompress(glob_description_event, checksum_alg == BINLOG_CHECKSUM_ALG_CRC32,
temp_buf, NULL, 0, &is_malloc, &new_buf, &len))
{ {
free_temp_buf(); free_temp_buf();
register_temp_buf(new_buf, true); register_temp_buf(new_buf, true);
......
...@@ -5074,11 +5074,13 @@ int binlog_buf_uncompress(const char *src, char *dst, uint32 len, uint32 *newlen ...@@ -5074,11 +5074,13 @@ int binlog_buf_uncompress(const char *src, char *dst, uint32 len, uint32 *newlen
uint32 binlog_get_compress_len(uint32 len); uint32 binlog_get_compress_len(uint32 len);
uint32 binlog_get_uncompress_len(const char *buf); uint32 binlog_get_uncompress_len(const char *buf);
int query_event_uncompress(const Format_description_log_event *description_event, int query_event_uncompress(const Format_description_log_event *description_event, bool contain_checksum,
bool contain_checksum, const char *src, char **dst, ulong *newlen); const char *src, char* buf, ulong buf_size, bool* is_malloc,
char **dst, ulong *newlen);
int Row_log_event_uncompress(const Format_description_log_event *description_event, bool contain_checksum, int row_log_event_uncompress(const Format_description_log_event *description_event, bool contain_checksum,
const char *src, char **dst, ulong *newlen); const char *src, char* buf, ulong buf_size, bool* is_malloc,
char **dst, ulong *newlen);
#endif /* _log_event_h */ #endif /* _log_event_h */
...@@ -5664,7 +5664,10 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) ...@@ -5664,7 +5664,10 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
bool gtid_skip_enqueue= false; bool gtid_skip_enqueue= false;
bool got_gtid_event= false; bool got_gtid_event= false;
rpl_gtid event_gtid; rpl_gtid event_gtid;
bool compressed_event = FALSE; bool is_compress_event = false;
char* new_buf = NULL;
char new_buf_arr[4096];
bool is_malloc = false;
/* /*
FD_q must have been prepared for the first R_a event FD_q must have been prepared for the first R_a event
...@@ -6150,7 +6153,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) ...@@ -6150,7 +6153,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
*/ */
case QUERY_COMPRESSED_EVENT: case QUERY_COMPRESSED_EVENT:
inc_pos= event_len; inc_pos= event_len;
if (query_event_uncompress(rli->relay_log.description_event_for_queue, checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, buf, (char **)&buf, &event_len)) if (query_event_uncompress(rli->relay_log.description_event_for_queue, checksum_alg == BINLOG_CHECKSUM_ALG_CRC32,
buf, new_buf_arr, sizeof(new_buf_arr), &is_malloc, (char **)&new_buf, &event_len))
{ {
char llbuf[22]; char llbuf[22];
error = ER_BINLOG_UNCOMPRESS_ERROR; error = ER_BINLOG_UNCOMPRESS_ERROR;
...@@ -6159,7 +6163,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) ...@@ -6159,7 +6163,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
error_msg.append(llbuf, strlen(llbuf)); error_msg.append(llbuf, strlen(llbuf));
goto err; goto err;
} }
compressed_event = true; buf = new_buf;
is_compress_event = true;
goto default_action; goto default_action;
case WRITE_ROWS_COMPRESSED_EVENT: case WRITE_ROWS_COMPRESSED_EVENT:
...@@ -6170,7 +6175,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) ...@@ -6170,7 +6175,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
case DELETE_ROWS_COMPRESSED_EVENT_V1: case DELETE_ROWS_COMPRESSED_EVENT_V1:
inc_pos = event_len; inc_pos = event_len;
{ {
if (Row_log_event_uncompress(rli->relay_log.description_event_for_queue, checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, buf, (char **)&buf, &event_len)) if (row_log_event_uncompress(rli->relay_log.description_event_for_queue, checksum_alg == BINLOG_CHECKSUM_ALG_CRC32,
buf, new_buf_arr, sizeof(new_buf_arr), &is_malloc, (char **)&new_buf, &event_len))
{ {
char llbuf[22]; char llbuf[22];
error = ER_BINLOG_UNCOMPRESS_ERROR; error = ER_BINLOG_UNCOMPRESS_ERROR;
...@@ -6180,7 +6186,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) ...@@ -6180,7 +6186,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
goto err; goto err;
} }
} }
compressed_event = true; buf = new_buf;
is_compress_event = true;
goto default_action; goto default_action;
#ifndef DBUG_OFF #ifndef DBUG_OFF
...@@ -6233,7 +6240,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) ...@@ -6233,7 +6240,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
++mi->events_queued_since_last_gtid; ++mi->events_queued_since_last_gtid;
} }
if (!compressed_event) if (!is_compress_event)
inc_pos= event_len; inc_pos= event_len;
break; break;
...@@ -6429,8 +6436,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) ...@@ -6429,8 +6436,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
mi->report(ERROR_LEVEL, error, NULL, ER_DEFAULT(error), mi->report(ERROR_LEVEL, error, NULL, ER_DEFAULT(error),
error_msg.ptr()); error_msg.ptr());
if(compressed_event) if(is_malloc)
my_free((void *)buf); my_free((void *)new_buf);
DBUG_RETURN(error); DBUG_RETURN(error);
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment