Commit 3bd6e4b8 authored by Michael Widenius's avatar Michael Widenius

MDEV-3999: Valgrind errors 'invalid write' or assorted server crashes on...

MDEV-3999: Valgrind errors 'invalid write' or assorted server crashes on concurrent flow with partitioned Aria tables
MDEV-3989: Server crashes on import from MariaDB mysqldump export with partitioned Aria table.

Problem was that bulk insert in aria was not properly protected against concurrent selects.


storage/maria/ha_maria.cc:
  Move settings of file->state to _ma_block_start_trans() to ensure that lock_key_trees is not changed by a concurrent bulk_insert.
storage/maria/ma_check.c:
  Added DBUG_ASSERT()
storage/maria/ma_open.c:
  Set start_trans to ma_start_trans for default behaviour.
storage/maria/ma_pagecrc.c:
  Removed test for 'non_transactional' as a now_transactinal could be reset while a flush was happening.
storage/maria/ma_state.c:
  Moved setting of info->state from external_lock to start_trans to protect against concurrent running bulk inserts.
  This works as the other threads will wait in thr_lock() until bulk_insert is done and keys are re-generated.
storage/maria/ma_state.h:
  Added _ma_start_trans()
parent 0737932b
...@@ -2638,23 +2638,6 @@ int ha_maria::external_lock(THD *thd, int lock_type) ...@@ -2638,23 +2638,6 @@ int ha_maria::external_lock(THD *thd, int lock_type)
/* Transactional table */ /* Transactional table */
if (lock_type != F_UNLCK) if (lock_type != F_UNLCK)
{ {
if (!file->s->lock_key_trees) // If we don't use versioning
{
/*
We come here in the following cases:
- The table is a temporary table
- It's a table which is crash safe but not yet versioned, for
example a table with fulltext or rtree keys
Set the current state to point to save_state so that the
block_format code don't count the same record twice.
Copy also the current state. This may have been wrong if the
same file was used several times in the last statement
*/
file->state= file->state_start;
*file->state= file->s->state.state;
}
if (file->trn) if (file->trn)
{ {
/* This can only happen with tables created with clone() */ /* This can only happen with tables created with clone() */
......
...@@ -2424,11 +2424,10 @@ static void restore_table_state_after_repair(MARIA_HA *info, ...@@ -2424,11 +2424,10 @@ static void restore_table_state_after_repair(MARIA_HA *info,
{ {
maria_versioning(info, info->s->have_versioning); maria_versioning(info, info->s->have_versioning);
info->s->lock_key_trees= org_share->lock_key_trees; info->s->lock_key_trees= org_share->lock_key_trees;
DBUG_ASSERT(!info->s->have_versioning || info->s->lock_key_trees);
} }
/** /**
@brief Drop all indexes @brief Drop all indexes
......
...@@ -896,6 +896,8 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags) ...@@ -896,6 +896,8 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags)
share->row_is_visible= _ma_row_visible_always; share->row_is_visible= _ma_row_visible_always;
share->lock.get_status= _ma_reset_update_flag; share->lock.get_status= _ma_reset_update_flag;
share->lock.start_trans= _ma_start_trans;
if (!thr_lock_inited) if (!thr_lock_inited)
{ {
/* Probably a single threaded program; Don't use concurrent inserts */ /* Probably a single threaded program; Don't use concurrent inserts */
......
...@@ -358,8 +358,7 @@ my_bool maria_flush_log_for_page(uchar *page, ...@@ -358,8 +358,7 @@ my_bool maria_flush_log_for_page(uchar *page,
MARIA_SHARE *share= (MARIA_SHARE*) data_ptr; MARIA_SHARE *share= (MARIA_SHARE*) data_ptr;
DBUG_ENTER("maria_flush_log_for_page"); DBUG_ENTER("maria_flush_log_for_page");
/* share is 0 here only in unittest */ /* share is 0 here only in unittest */
DBUG_ASSERT(!share || (share->page_type == PAGECACHE_LSN_PAGE && DBUG_ASSERT(!share || share->page_type == PAGECACHE_LSN_PAGE);
share->now_transactional));
lsn= lsn_korr(page); lsn= lsn_korr(page);
if (translog_flush(lsn)) if (translog_flush(lsn))
DBUG_RETURN(1); DBUG_RETURN(1);
......
...@@ -59,6 +59,8 @@ my_bool _ma_setup_live_state(MARIA_HA *info) ...@@ -59,6 +59,8 @@ my_bool _ma_setup_live_state(MARIA_HA *info)
MARIA_STATE_HISTORY *history; MARIA_STATE_HISTORY *history;
DBUG_ENTER("_ma_setup_live_state"); DBUG_ENTER("_ma_setup_live_state");
DBUG_ASSERT(share->lock_key_trees);
if (maria_create_trn_hook(info)) if (maria_create_trn_hook(info))
DBUG_RETURN(1); DBUG_RETURN(1);
...@@ -377,6 +379,17 @@ void _ma_reset_update_flag(void *param, ...@@ -377,6 +379,17 @@ void _ma_reset_update_flag(void *param,
info->state->changed= 0; info->state->changed= 0;
} }
my_bool _ma_start_trans(void* param)
{
MARIA_HA *info=(MARIA_HA*) param;
if (!info->s->lock_key_trees)
{
info->state= info->state_start;
*info->state= info->s->state.state;
}
return 0;
}
/** /**
@brief Check if should allow concurrent inserts @brief Check if should allow concurrent inserts
...@@ -622,6 +635,22 @@ my_bool _ma_block_start_trans(void* param) ...@@ -622,6 +635,22 @@ my_bool _ma_block_start_trans(void* param)
*/ */
return _ma_setup_live_state(info); return _ma_setup_live_state(info);
} }
else
{
/*
We come here in the following cases:
- The table is a temporary table
- It's a table which is crash safe but not yet versioned, for
example a table with fulltext or rtree keys
Set the current state to point to save_state so that the
block_format code don't count the same record twice.
Copy also the current state. This may have been wrong if the
same file was used several times in the last statement
*/
info->state= info->state_start;
*info->state= info->s->state.state;
}
/* /*
Info->trn is set if this table is already handled and we are Info->trn is set if this table is already handled and we are
...@@ -668,9 +697,11 @@ my_bool _ma_block_start_trans_no_versioning(void* param) ...@@ -668,9 +697,11 @@ my_bool _ma_block_start_trans_no_versioning(void* param)
{ {
MARIA_HA *info=(MARIA_HA*) param; MARIA_HA *info=(MARIA_HA*) param;
DBUG_ENTER("_ma_block_get_status_no_version"); DBUG_ENTER("_ma_block_get_status_no_version");
DBUG_ASSERT(info->s->base.born_transactional); DBUG_ASSERT(info->s->base.born_transactional && !info->s->lock_key_trees);
info->state->changed= 0; /* from _ma_reset_update_flag() */ info->state->changed= 0; /* from _ma_reset_update_flag() */
info->state= info->state_start;
*info->state= info->s->state.state;
if (!info->trn) if (!info->trn)
{ {
/* /*
...@@ -689,18 +720,22 @@ my_bool _ma_block_start_trans_no_versioning(void* param) ...@@ -689,18 +720,22 @@ my_bool _ma_block_start_trans_no_versioning(void* param)
void maria_versioning(MARIA_HA *info, my_bool versioning) void maria_versioning(MARIA_HA *info, my_bool versioning)
{ {
MARIA_SHARE *share= info->s;
/* For now, this is a hack */ /* For now, this is a hack */
if (info->s->have_versioning) if (share->have_versioning)
{ {
enum thr_lock_type save_lock_type; enum thr_lock_type save_lock_type;
/* Assume is a non threaded application (for now) */ share->lock_key_trees= versioning;
info->s->lock_key_trees= 0;
/* Set up info->lock.type temporary for _ma_block_get_status() */ /* Set up info->lock.type temporary for _ma_block_get_status() */
save_lock_type= info->lock.type; save_lock_type= info->lock.type;
info->lock.type= versioning ? TL_WRITE_CONCURRENT_INSERT : TL_WRITE; info->lock.type= versioning ? TL_WRITE_CONCURRENT_INSERT : TL_WRITE;
_ma_block_get_status((void*) info, versioning); _ma_block_get_status((void*) info, versioning);
info->lock.type= save_lock_type; info->lock.type= save_lock_type;
info->state= info->state_start= &info->s->state.common; if (versioning)
info->state= &share->state.common;
else
info->state= &share->state.state; /* Change global values by default */
info->state_start= info->state; /* Initial values */
} }
} }
......
...@@ -66,6 +66,7 @@ void _ma_update_status_with_lock(MARIA_HA *info); ...@@ -66,6 +66,7 @@ void _ma_update_status_with_lock(MARIA_HA *info);
void _ma_restore_status(void *param); void _ma_restore_status(void *param);
void _ma_copy_status(void* to, void *from); void _ma_copy_status(void* to, void *from);
void _ma_reset_update_flag(void *param, my_bool concurrent_insert); void _ma_reset_update_flag(void *param, my_bool concurrent_insert);
my_bool _ma_start_trans(void* param);
my_bool _ma_check_status(void *param); my_bool _ma_check_status(void *param);
void _ma_block_get_status(void* param, my_bool concurrent_insert); void _ma_block_get_status(void* param, my_bool concurrent_insert);
void _ma_block_update_status(void *param); void _ma_block_update_status(void *param);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment