Commit 82d79389 authored by Michael Widenius's avatar Michael Widenius

Bug#37276 maria crash on insert around the time check table is run

Fixed several (but not all) issues found by the test program:
- ASSERT on row_length in ma_blockrec.c::_ma_compact_block_page()
- Fixed bug when splitting node pages
- Fixed hang in 'closeing tables' (conflicting mutex order) by ensuring we first take trnman lock and then share->intern_lock

storage/maria/ma_blockrec.c:
  When compacting a row page when allocating space for a new row, the min length of a the new block may be temporarly smaller than 'min_block_length'.
storage/maria/ma_check.c:
  More DBUG output
storage/maria/ma_checkpoint.c:
  Call new function _ma_remove_not_visible_states_with_lock() to ensure we first take lock on trnman and then on share->intern_lock
  +
storage/maria/ma_close.c:
  Added comment
storage/maria/ma_open.c:
  Added comment
storage/maria/ma_search.c:
  Copy also node data; Caused bug when splitting node pages
storage/maria/ma_state.c:
  Added _ma_remove_not_visible_states_with_lock() to ensure we take locks in right order
storage/maria/ma_state.h:
  Added new prototype
storage/maria/trnman.c:
  Added trnman_lock() and trnman_unlock().
  Needed by _ma_remove_not_visible_states_with_lock() to get mutex in right order
storage/maria/trnman_public.h:
  Added new prototypes
parent 6559f5c2
...@@ -1691,11 +1691,16 @@ static my_bool get_head_or_tail_page(MARIA_HA *info, ...@@ -1691,11 +1691,16 @@ static my_bool get_head_or_tail_page(MARIA_HA *info,
{ {
if (res->empty_space + res->length >= length) if (res->empty_space + res->length >= length)
{ {
/*
We can't just verify min_block_length here as the newly found block
may be smaller than min_block_length.
*/
_ma_compact_block_page(res->buff, block_size, res->rownr, 1, _ma_compact_block_page(res->buff, block_size, res->rownr, 1,
page_type == HEAD_PAGE ? page_type == HEAD_PAGE ?
info->trn->min_read_from : 0, info->trn->min_read_from : 0,
page_type == HEAD_PAGE ? page_type == HEAD_PAGE ?
share->base.min_block_length : 0); min(res->length, share->base.min_block_length) :
0);
/* All empty space are now after current position */ /* All empty space are now after current position */
dir= dir_entry_pos(res->buff, block_size, res->rownr); dir= dir_entry_pos(res->buff, block_size, res->rownr);
res->length= res->empty_space= uint2korr(dir+2); res->length= res->empty_space= uint2korr(dir+2);
......
...@@ -839,7 +839,8 @@ static int chk_index(HA_CHECK *param, MARIA_HA *info, MARIA_KEYDEF *keyinfo, ...@@ -839,7 +839,8 @@ static int chk_index(HA_CHECK *param, MARIA_HA *info, MARIA_KEYDEF *keyinfo,
page_flag= _ma_get_keypage_flag(share, buff); page_flag= _ma_get_keypage_flag(share, buff);
_ma_get_used_and_nod_with_flag(share, page_flag, buff, used_length, _ma_get_used_and_nod_with_flag(share, page_flag, buff, used_length,
nod_flag); nod_flag);
keypos= buff + share->keypage_header + nod_flag; old_keypos= buff + share->keypage_header;
keypos= old_keypos+ nod_flag;
endpos= buff + used_length; endpos= buff + used_length;
param->keydata+= used_length; param->keydata+= used_length;
...@@ -879,8 +880,11 @@ static int chk_index(HA_CHECK *param, MARIA_HA *info, MARIA_KEYDEF *keyinfo, ...@@ -879,8 +880,11 @@ static int chk_index(HA_CHECK *param, MARIA_HA *info, MARIA_KEYDEF *keyinfo,
next_page= _ma_kpos(nod_flag,keypos); next_page= _ma_kpos(nod_flag,keypos);
if (chk_index_down(param,info,keyinfo,next_page, if (chk_index_down(param,info,keyinfo,next_page,
temp_buff,keys,key_checksum,level+1)) temp_buff,keys,key_checksum,level+1))
{
DBUG_DUMP("page_data", old_keypos, (uint) (keypos - old_keypos));
goto err; goto err;
} }
}
old_keypos=keypos; old_keypos=keypos;
if (keypos >= endpos || if (keypos >= endpos ||
!(*keyinfo->get_key)(&tmp_key, page_flag, nod_flag, &keypos)) !(*keyinfo->get_key)(&tmp_key, page_flag, nod_flag, &keypos))
......
...@@ -1059,8 +1059,9 @@ static int collect_tables(LEX_STRING *str, LSN checkpoint_start_log_horizon) ...@@ -1059,8 +1059,9 @@ static int collect_tables(LEX_STRING *str, LSN checkpoint_start_log_horizon)
TODO: Only do this call if there has been # (10?) ended transactions TODO: Only do this call if there has been # (10?) ended transactions
since last call. since last call.
*/ */
share->state_history= _ma_remove_not_visible_states(share->state_history, pthread_mutex_unlock(&share->intern_lock);
0, 0); _ma_remove_not_visible_states_with_lock(share);
pthread_mutex_lock(&share->intern_lock);
if (share->in_checkpoint & MARIA_CHECKPOINT_SHOULD_FREE_ME) if (share->in_checkpoint & MARIA_CHECKPOINT_SHOULD_FREE_ME)
{ {
......
...@@ -129,7 +129,11 @@ int maria_close(register MARIA_HA *info) ...@@ -129,7 +129,11 @@ int maria_close(register MARIA_HA *info)
else else
share_can_be_freed= TRUE; share_can_be_freed= TRUE;
/* Remember share->history for future opens */ /*
Remember share->history for future opens
Here we take share->intern_lock followed by trans_lock but this is
safe as no other thread one can use 'share' here.
*/
share->state_history= _ma_remove_not_visible_states(share->state_history, share->state_history= _ma_remove_not_visible_states(share->state_history,
1, 0); 1, 0);
if (share->state_history) if (share->state_history)
......
...@@ -787,7 +787,10 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags) ...@@ -787,7 +787,10 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags)
hash_search(&maria_stored_state, hash_search(&maria_stored_state,
(uchar*) &share->state.create_rename_lsn, 0))) (uchar*) &share->state.create_rename_lsn, 0)))
{ {
/* Move history from hash to share */ /*
Move history from hash to share. This is safe to do as we
don't have a lock on share->intern_lock.
*/
share->state_history= share->state_history=
_ma_remove_not_visible_states(history->state_history, 0, 0); _ma_remove_not_visible_states(history->state_history, 0, 0);
history->state_history= 0; history->state_history= 0;
......
...@@ -1324,7 +1324,7 @@ uint _ma_get_binary_pack_key(MARIA_KEY *int_key, uint page_flag, uint nod_flag, ...@@ -1324,7 +1324,7 @@ uint _ma_get_binary_pack_key(MARIA_KEY *int_key, uint page_flag, uint nod_flag,
} }
/* Copy rest of data ptr and, if appropriate, trans_id and node_ptr */ /* Copy rest of data ptr and, if appropriate, trans_id and node_ptr */
memcpy(key, from, length); memcpy(key, from, length + nod_flag);
*page_pos= from + length + nod_flag; *page_pos= from + length + nod_flag;
DBUG_RETURN(int_key->data_length + int_key->ref_length); DBUG_RETURN(int_key->data_length + int_key->ref_length);
...@@ -1359,7 +1359,7 @@ uchar *_ma_skip_binary_pack_key(MARIA_KEY *key, uint page_flag, ...@@ -1359,7 +1359,7 @@ uchar *_ma_skip_binary_pack_key(MARIA_KEY *key, uint page_flag,
} }
/* /**
@brief Get key at position without knowledge of previous key @brief Get key at position without knowledge of previous key
@return pointer to next key @return pointer to next key
......
...@@ -110,6 +110,7 @@ my_bool _ma_setup_live_state(MARIA_HA *info) ...@@ -110,6 +110,7 @@ my_bool _ma_setup_live_state(MARIA_HA *info)
@param all 1 if we should delete the first state if it's @param all 1 if we should delete the first state if it's
visible for all. For the moment this is only used visible for all. For the moment this is only used
on close() of table. on close() of table.
@param trnman_is_locked Set to 1 if we have already a lock on trnman.
@notes @notes
The assumption is that items in the history list is ordered by The assumption is that items in the history list is ordered by
...@@ -122,6 +123,9 @@ my_bool _ma_setup_live_state(MARIA_HA *info) ...@@ -122,6 +123,9 @@ my_bool _ma_setup_live_state(MARIA_HA *info)
state as first state in the history. This is to allow us to just move state as first state in the history. This is to allow us to just move
the history from the global list to the share when we open the table. the history from the global list to the share when we open the table.
Note that if 'all' is set trnman_is_locked must be 0, becasue
trnman_get_min_trid() will take a lock on trnman.
@return @return
@retval Pointer to new history list @retval Pointer to new history list
*/ */
...@@ -157,6 +161,7 @@ MARIA_STATE_HISTORY ...@@ -157,6 +161,7 @@ MARIA_STATE_HISTORY
if (all && parent == &org_history->next) if (all && parent == &org_history->next)
{ {
DBUG_ASSERT(trnman_is_locked == 0);
/* There is only one state left. Delete this if it's visible for all */ /* There is only one state left. Delete this if it's visible for all */
if (last_trid < trnman_get_min_trid()) if (last_trid < trnman_get_min_trid())
{ {
...@@ -167,6 +172,29 @@ MARIA_STATE_HISTORY ...@@ -167,6 +172,29 @@ MARIA_STATE_HISTORY
DBUG_RETURN(org_history); DBUG_RETURN(org_history);
} }
/**
@brief Remove not used state history
@notes
share and trnman are not locked.
We must first lock trnman and then share->intern_lock. This is becasue
_ma_trnman_end_trans_hook() has a lock on trnman and then
takes share->intern_lock.
*/
void _ma_remove_not_visible_states_with_lock(MARIA_SHARE *share)
{
trnman_lock();
pthread_mutex_lock(&share->intern_lock);
share->state_history= _ma_remove_not_visible_states(share->state_history, 0,
1);
pthread_mutex_unlock(&share->intern_lock);
trnman_unlock();
}
/* /*
Free state history information from share->history and reset information Free state history information from share->history and reset information
to current state. to current state.
......
...@@ -76,3 +76,4 @@ my_bool _ma_trnman_end_trans_hook(TRN *trn, my_bool commit, ...@@ -76,3 +76,4 @@ my_bool _ma_trnman_end_trans_hook(TRN *trn, my_bool commit,
my_bool _ma_row_visible_always(MARIA_HA *info); my_bool _ma_row_visible_always(MARIA_HA *info);
my_bool _ma_row_visible_non_transactional_table(MARIA_HA *info); my_bool _ma_row_visible_non_transactional_table(MARIA_HA *info);
my_bool _ma_row_visible_transactional_table(MARIA_HA *info); my_bool _ma_row_visible_transactional_table(MARIA_HA *info);
void _ma_remove_not_visible_states_with_lock(struct st_maria_share *share);
...@@ -851,7 +851,7 @@ TrID trnman_get_max_trid() ...@@ -851,7 +851,7 @@ TrID trnman_get_max_trid()
} }
/** /**
Check if there exist an active transaction between two commit_id's @brief Check if there exist an active transaction between two commit_id's
@todo @todo
Improve speed of this. Improve speed of this.
...@@ -885,3 +885,22 @@ my_bool trnman_exists_active_transactions(TrID min_id, TrID max_id, ...@@ -885,3 +885,22 @@ my_bool trnman_exists_active_transactions(TrID min_id, TrID max_id,
pthread_mutex_unlock(&LOCK_trn_list); pthread_mutex_unlock(&LOCK_trn_list);
return ret; return ret;
} }
/**
lock transaction list
*/
void trnman_lock()
{
pthread_mutex_lock(&LOCK_trn_list);
}
/**
unlock transaction list
*/
void trnman_unlock()
{
pthread_mutex_unlock(&LOCK_trn_list);
}
...@@ -65,5 +65,7 @@ my_bool trnman_exists_active_transactions(TrID min_id, TrID max_id, ...@@ -65,5 +65,7 @@ my_bool trnman_exists_active_transactions(TrID min_id, TrID max_id,
#define TRANSID_SIZE 6 #define TRANSID_SIZE 6
#define transid_store(dst, id) int6store(dst,id) #define transid_store(dst, id) int6store(dst,id)
#define transid_korr(P) uint6korr(P) #define transid_korr(P) uint6korr(P)
void trnman_lock();
void trnman_unlock();
C_MODE_END C_MODE_END
#endif #endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment