Commit 89ad11ae authored by Sergei Golubchik's avatar Sergei Golubchik

merge

parents 8a655c60 4150dfda
...@@ -2314,7 +2314,8 @@ int ha_maria::external_lock(THD *thd, int lock_type) ...@@ -2314,7 +2314,8 @@ int ha_maria::external_lock(THD *thd, int lock_type)
trnman_new_statement(trn); trnman_new_statement(trn);
} }
if (file->s->lock.get_status) /* If handler uses versioning */
if (file->s->lock_key_trees)
{ {
if (_ma_setup_live_state(file)) if (_ma_setup_live_state(file))
DBUG_RETURN(HA_ERR_OUT_OF_MEM); DBUG_RETURN(HA_ERR_OUT_OF_MEM);
...@@ -2513,7 +2514,8 @@ int ha_maria::implicit_commit(THD *thd, bool new_trn) ...@@ -2513,7 +2514,8 @@ int ha_maria::implicit_commit(THD *thd, bool new_trn)
if (handler->s->base.born_transactional) if (handler->s->base.born_transactional)
{ {
_ma_set_trn_for_table(handler, trn); _ma_set_trn_for_table(handler, trn);
if (handler->s->lock.get_status) /* If handler uses versioning */
if (handler->s->lock_key_trees)
{ {
if (_ma_setup_live_state(handler)) if (_ma_setup_live_state(handler))
error= HA_ERR_OUT_OF_MEM; error= HA_ERR_OUT_OF_MEM;
...@@ -3074,6 +3076,16 @@ static int mark_recovery_success(void) ...@@ -3074,6 +3076,16 @@ static int mark_recovery_success(void)
} }
/*
Return 1 if table has changed during the current transaction
*/
bool ha_maria::is_changed() const
{
return file->state->changed;
}
static int ha_maria_init(void *p) static int ha_maria_init(void *p)
{ {
int res; int res;
......
...@@ -139,6 +139,7 @@ class ha_maria :public handler ...@@ -139,6 +139,7 @@ class ha_maria :public handler
int repair(THD * thd, HA_CHECK_OPT * check_opt); int repair(THD * thd, HA_CHECK_OPT * check_opt);
bool check_and_repair(THD * thd); bool check_and_repair(THD * thd);
bool is_crashed() const; bool is_crashed() const;
bool is_changed() const;
bool auto_repair() const { return 1; } bool auto_repair() const { return 1; }
int optimize(THD * thd, HA_CHECK_OPT * check_opt); int optimize(THD * thd, HA_CHECK_OPT * check_opt);
int restore(THD * thd, HA_CHECK_OPT * check_opt); int restore(THD * thd, HA_CHECK_OPT * check_opt);
......
...@@ -185,8 +185,12 @@ static CONTROL_FILE_ERROR create_control_file(const char *name, ...@@ -185,8 +185,12 @@ static CONTROL_FILE_ERROR create_control_file(const char *name,
files around (indeed it could be that the control file alone was deleted files around (indeed it could be that the control file alone was deleted
or not restored, and we should not go on with life at this point). or not restored, and we should not go on with life at this point).
TODO: For now we trust (this is alpha version), but for beta if would Things should still be relatively safe as if someone tries to use
be great to verify. an old table with a new control file the different uuid:s between
the files will cause ma_open() to generate an HA_ERR_OLD_FILE
error. When used from mysqld this will cause the table to be open
in repair mode which will remove all dependencies between the
table and the old control file.
We could have a tool which can rebuild the control file, by reading the We could have a tool which can rebuild the control file, by reading the
directory of logs, finding the newest log, reading it to find last directory of logs, finding the newest log, reading it to find last
......
...@@ -116,6 +116,7 @@ int maria_delete(MARIA_HA *info,const uchar *record) ...@@ -116,6 +116,7 @@ int maria_delete(MARIA_HA *info,const uchar *record)
info->update= HA_STATE_CHANGED+HA_STATE_DELETED+HA_STATE_ROW_CHANGED; info->update= HA_STATE_CHANGED+HA_STATE_DELETED+HA_STATE_ROW_CHANGED;
share->state.changed|= (STATE_NOT_OPTIMIZED_ROWS | STATE_NOT_MOVABLE | share->state.changed|= (STATE_NOT_OPTIMIZED_ROWS | STATE_NOT_MOVABLE |
STATE_NOT_ZEROFILLED); STATE_NOT_ZEROFILLED);
info->state->changed=1;
mi_sizestore(lastpos, info->cur_row.lastpos); mi_sizestore(lastpos, info->cur_row.lastpos);
VOID(_ma_writeinfo(info,WRITEINFO_UPDATE_KEYFILE)); VOID(_ma_writeinfo(info,WRITEINFO_UPDATE_KEYFILE));
......
...@@ -61,7 +61,7 @@ static int _ma_put_key_in_record(MARIA_HA *info,uint keynr,uchar *record); ...@@ -61,7 +61,7 @@ static int _ma_put_key_in_record(MARIA_HA *info,uint keynr,uchar *record);
if trid < 256-12 if trid < 256-12
one byte one byte
else else
one byte prefix (256-length_of_trid_in_bytes) followed by data one byte prefix length_of_trid_in_bytes + 249 followed by data
in high-byte-first order in high-byte-first order
Prefix bytes 244 to 249 are reserved for negative transid, that can be used Prefix bytes 244 to 249 are reserved for negative transid, that can be used
...@@ -69,6 +69,25 @@ static int _ma_put_key_in_record(MARIA_HA *info,uint keynr,uchar *record); ...@@ -69,6 +69,25 @@ static int _ma_put_key_in_record(MARIA_HA *info,uint keynr,uchar *record);
We have to store transid in high-byte-first order to be able to do a We have to store transid in high-byte-first order to be able to do a
fast byte-per-byte comparision of them without packing them up. fast byte-per-byte comparision of them without packing them up.
For example, assuming we the following data:
key_data: 1 (4 byte integer)
pointer_to_row: 2 << 8 + 3 = 515 (page 2, row 3)
table_create_transid 1000 Defined at create table time
transid 1010 Transaction that created row
delete_transid 2011 Transaction that deleted row
In addition we assume the table is created with a data pointer length
of 4 bytes (this is automatically calculated based on the medium
length of rows and the given max number of rows)
The binary data for the key would then look like this in hex:
00 00 00 01 Key data (1 stored high byte first)
00 00 00 47 (515 << 1) + 1 ; The last 1 is marker that key cont.
15 ((1000-1010) << 1) + 1 ; The last 1 is marker that key cont.
FB 07 E6 length byte and ((2011 - 1000) << 1) = 07 E6
*/ */
uint transid_store_packed(MARIA_HA *info, uchar *to, ulonglong trid) uint transid_store_packed(MARIA_HA *info, uchar *to, ulonglong trid)
...@@ -76,7 +95,7 @@ uint transid_store_packed(MARIA_HA *info, uchar *to, ulonglong trid) ...@@ -76,7 +95,7 @@ uint transid_store_packed(MARIA_HA *info, uchar *to, ulonglong trid)
uchar *start; uchar *start;
uint length; uint length;
uchar buff[8]; uchar buff[8];
DBUG_ASSERT(trid < (LL(1) << (MAX_PACK_TRANSID_SIZE*8))); DBUG_ASSERT(trid < (LL(1) << (MARIA_MAX_PACK_TRANSID_SIZE*8)));
DBUG_ASSERT(trid >= info->s->state.create_trid); DBUG_ASSERT(trid >= info->s->state.create_trid);
trid= (trid - info->s->state.create_trid) << 1; trid= (trid - info->s->state.create_trid) << 1;
...@@ -84,7 +103,7 @@ uint transid_store_packed(MARIA_HA *info, uchar *to, ulonglong trid) ...@@ -84,7 +103,7 @@ uint transid_store_packed(MARIA_HA *info, uchar *to, ulonglong trid)
/* Mark that key contains transid */ /* Mark that key contains transid */
to[-1]|= 1; to[-1]|= 1;
if (trid < MIN_TRANSID_PACK_PREFIX) if (trid < MARIA_MIN_TRANSID_PACK_OFFSET)
{ {
to[0]= (uchar) trid; to[0]= (uchar) trid;
return 1; return 1;
...@@ -100,7 +119,8 @@ uint transid_store_packed(MARIA_HA *info, uchar *to, ulonglong trid) ...@@ -100,7 +119,8 @@ uint transid_store_packed(MARIA_HA *info, uchar *to, ulonglong trid)
} while (trid); } while (trid);
length= (uint) (to - buff); length= (uint) (to - buff);
start[0]= (uchar) (256 - length); /* Store length prefix */ /* Store length prefix */
start[0]= (uchar) (length + MARIA_TRANSID_PACK_OFFSET);
start++; start++;
/* Copy things in high-byte-first order to output buffer */ /* Copy things in high-byte-first order to output buffer */
do do
...@@ -127,12 +147,13 @@ ulonglong transid_get_packed(MARIA_SHARE *share, const uchar *from) ...@@ -127,12 +147,13 @@ ulonglong transid_get_packed(MARIA_SHARE *share, const uchar *from)
ulonglong value; ulonglong value;
uint length; uint length;
if (from[0] < MIN_TRANSID_PACK_PREFIX) if (from[0] < MARIA_MIN_TRANSID_PACK_OFFSET)
value= (ulonglong) from[0]; value= (ulonglong) from[0];
else else
{ {
value= 0; value= 0;
for (length= (uint) (256 - from[0]), value= (ulonglong) from[1], from+=2; for (length= (uint) (from[0] - MARIA_TRANSID_PACK_OFFSET),
value= (ulonglong) from[1], from+=2;
--length ; --length ;
from++) from++)
value= (value << 8) + ((ulonglong) *from); value= (value << 8) + ((ulonglong) *from);
......
...@@ -121,6 +121,8 @@ struct st_translog_buffer ...@@ -121,6 +121,8 @@ struct st_translog_buffer
in case of flush by LSN it can be offset + size - TRANSLOG_PAGE_SIZE) in case of flush by LSN it can be offset + size - TRANSLOG_PAGE_SIZE)
*/ */
TRANSLOG_ADDRESS next_buffer_offset; TRANSLOG_ADDRESS next_buffer_offset;
/* Previous buffer offset to detect it flush finish */
TRANSLOG_ADDRESS prev_buffer_offset;
/* /*
How much is written (or will be written when copy_to_buffer_in_progress How much is written (or will be written when copy_to_buffer_in_progress
become 0) to this buffer become 0) to this buffer
...@@ -135,12 +137,12 @@ struct st_translog_buffer ...@@ -135,12 +137,12 @@ struct st_translog_buffer
/* list of waiting buffer ready threads */ /* list of waiting buffer ready threads */
struct st_my_thread_var *waiting_flush; struct st_my_thread_var *waiting_flush;
/* /*
Pointer on the buffer which overlap with this one (due to flush of If true then previous buffer overlap with this one (due to flush of
loghandler, the last page of that buffer is the same as the first page loghandler, the last page of that buffer is the same as the first page
of this buffer) and have to be written first (because contain old of this buffer) and have to be written first (because contain old
content of page which present in both buffers) content of page which present in both buffers)
*/ */
struct st_translog_buffer *overlay; my_bool overlay;
uint buffer_no; uint buffer_no;
/* /*
Lock for the buffer. Lock for the buffer.
...@@ -175,6 +177,14 @@ struct st_translog_buffer ...@@ -175,6 +177,14 @@ struct st_translog_buffer
With file and offset it allow detect buffer changes With file and offset it allow detect buffer changes
*/ */
uint8 ver; uint8 ver;
/*
When previous buffer sent to disk it set its address here to allow
to detect when it is done
(we have to keep it in this buffer to lock buffers only in one direction).
*/
TRANSLOG_ADDRESS prev_sent_to_disk;
pthread_cond_t prev_sent_to_disk_cond;
}; };
...@@ -1421,9 +1431,12 @@ static my_bool translog_buffer_init(struct st_translog_buffer *buffer) ...@@ -1421,9 +1431,12 @@ static my_bool translog_buffer_init(struct st_translog_buffer *buffer)
/* list of waiting buffer ready threads */ /* list of waiting buffer ready threads */
buffer->waiting_flush= 0; buffer->waiting_flush= 0;
/* lock for the buffer. Current buffer also lock the handler */ /* lock for the buffer. Current buffer also lock the handler */
if (pthread_mutex_init(&buffer->mutex, MY_MUTEX_INIT_FAST)) if (pthread_mutex_init(&buffer->mutex, MY_MUTEX_INIT_FAST) ||
pthread_cond_init(&buffer->prev_sent_to_disk_cond, 0))
DBUG_RETURN(1); DBUG_RETURN(1);
buffer->is_closing_buffer= 0; buffer->is_closing_buffer= 0;
buffer->prev_sent_to_disk= LSN_IMPOSSIBLE;
buffer->prev_buffer_offset= LSN_IMPOSSIBLE;
buffer->ver= 0; buffer->ver= 0;
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -2100,10 +2113,12 @@ static my_bool translog_buffer_next(TRANSLOG_ADDRESS *horizon, ...@@ -2100,10 +2113,12 @@ static my_bool translog_buffer_next(TRANSLOG_ADDRESS *horizon,
{ {
translog_lock_assert_owner(); translog_lock_assert_owner();
translog_start_buffer(new_buffer, cursor, new_buffer_no); translog_start_buffer(new_buffer, cursor, new_buffer_no);
} new_buffer->prev_buffer_offset=
log_descriptor.buffers[old_buffer_no].next_buffer_offset= new_buffer->offset; log_descriptor.buffers[old_buffer_no].offset;
new_buffer->prev_last_lsn= new_buffer->prev_last_lsn=
BUFFER_MAX_LSN(log_descriptor.buffers + old_buffer_no); BUFFER_MAX_LSN(log_descriptor.buffers + old_buffer_no);
}
log_descriptor.buffers[old_buffer_no].next_buffer_offset= new_buffer->offset;
DBUG_PRINT("info", ("prev_last_lsn set to (%lu,0x%lx) buffer: 0x%lx", DBUG_PRINT("info", ("prev_last_lsn set to (%lu,0x%lx) buffer: 0x%lx",
LSN_IN_PARTS(new_buffer->prev_last_lsn), LSN_IN_PARTS(new_buffer->prev_last_lsn),
(ulong) new_buffer)); (ulong) new_buffer));
...@@ -2117,14 +2132,16 @@ static my_bool translog_buffer_next(TRANSLOG_ADDRESS *horizon, ...@@ -2117,14 +2132,16 @@ static my_bool translog_buffer_next(TRANSLOG_ADDRESS *horizon,
SYNOPSIS SYNOPSIS
translog_set_sent_to_disk() translog_set_sent_to_disk()
lsn LSN to assign buffer buffer which we have sent to disk
in_buffers to assign to in_buffers_only
TODO: use atomic operations if possible (64bit architectures?) TODO: use atomic operations if possible (64bit architectures?)
*/ */
static void translog_set_sent_to_disk(LSN lsn, TRANSLOG_ADDRESS in_buffers) static void translog_set_sent_to_disk(struct st_translog_buffer *buffer)
{ {
LSN lsn= buffer->last_lsn;
TRANSLOG_ADDRESS in_buffers= buffer->next_buffer_offset;
DBUG_ENTER("translog_set_sent_to_disk"); DBUG_ENTER("translog_set_sent_to_disk");
pthread_mutex_lock(&log_descriptor.sent_to_disk_lock); pthread_mutex_lock(&log_descriptor.sent_to_disk_lock);
DBUG_PRINT("enter", ("lsn: (%lu,0x%lx) in_buffers: (%lu,0x%lx) " DBUG_PRINT("enter", ("lsn: (%lu,0x%lx) in_buffers: (%lu,0x%lx) "
...@@ -2415,6 +2432,51 @@ static uint16 translog_get_total_chunk_length(uchar *page, uint16 offset) ...@@ -2415,6 +2432,51 @@ static uint16 translog_get_total_chunk_length(uchar *page, uint16 offset)
} }
} }
/*
@brief Waits previous buffer flush finish
@param buffer buffer for check
@retval 0 previous buffer flushed and this thread have to flush this one
@retval 1 previous buffer flushed and this buffer flushed by other thread too
*/
my_bool translog_prev_buffer_flush_wait(struct st_translog_buffer *buffer)
{
TRANSLOG_ADDRESS offset= buffer->offset;
TRANSLOG_FILE *file= buffer->file;
uint8 ver= buffer->ver;
DBUG_ENTER("translog_prev_buffer_flush_wait");
DBUG_PRINT("enter", ("buffer: 0x%lx #%u offset: (%lu,0x%lx) "
"prev sent: (%lu,0x%lx) prev offset: (%lu,0x%lx)",
(ulong) buffer, (uint) buffer->buffer_no,
LSN_IN_PARTS(buffer->offset),
LSN_IN_PARTS(buffer->prev_sent_to_disk),
LSN_IN_PARTS(buffer->prev_buffer_offset)));
translog_buffer_lock_assert_owner(buffer);
/*
if prev_sent_to_disk == LSN_IMPOSSIBLE then
prev_buffer_offset should be LSN_IMPOSSIBLE
because it means that this buffer was never used
*/
DBUG_ASSERT((buffer->prev_sent_to_disk == LSN_IMPOSSIBLE &&
buffer->prev_buffer_offset == LSN_IMPOSSIBLE) ||
buffer->prev_sent_to_disk != LSN_IMPOSSIBLE);
if (buffer->prev_buffer_offset != buffer->prev_sent_to_disk)
{
do {
pthread_cond_wait(&buffer->prev_sent_to_disk_cond, &buffer->mutex);
if (buffer->file != file || buffer->offset != offset ||
buffer->ver != ver)
{
translog_buffer_unlock(buffer);
DBUG_RETURN(1); /* some the thread flushed the buffer already */
}
} while(buffer->prev_buffer_offset != buffer->prev_sent_to_disk);
}
DBUG_RETURN(0);
}
/* /*
Flush given buffer Flush given buffer
...@@ -2460,39 +2522,8 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer) ...@@ -2460,39 +2522,8 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer)
if (buffer->file != file || buffer->offset != offset || buffer->ver != ver) if (buffer->file != file || buffer->offset != offset || buffer->ver != ver)
DBUG_RETURN(0); /* some the thread flushed the buffer already */ DBUG_RETURN(0); /* some the thread flushed the buffer already */
if (buffer->overlay && buffer->overlay->file == buffer->file && if (buffer->overlay && translog_prev_buffer_flush_wait(buffer))
cmp_translog_addr(buffer->overlay->offset + buffer->overlay->size, DBUG_RETURN(0); /* some the thread flushed the buffer already */
buffer->offset) > 0)
{
/*
This can't happen for normal translog_flush,
only during destroying the loghandler
*/
struct st_translog_buffer *overlay= buffer->overlay;
TRANSLOG_ADDRESS buffer_offset= buffer->offset;
TRANSLOG_FILE *fl= buffer->file;
uint8 ver= buffer->ver;
translog_buffer_unlock(buffer);
translog_buffer_lock(overlay);
/* rechecks under mutex protection that overlay is still our overlay */
if (buffer->overlay->file == fl &&
cmp_translog_addr(buffer->overlay->offset + buffer->overlay->size,
buffer_offset) > 0)
{
translog_wait_for_buffer_free(overlay);
}
translog_buffer_unlock(overlay);
translog_buffer_lock(buffer);
if (buffer->file != fl || buffer_offset != buffer->offset ||
ver != buffer->ver)
{
/*
This means that somebody else flushed the buffer while we was
waiting for overlay then for locking buffer again.
*/
DBUG_RETURN(0);
}
}
/* /*
Send page by page in the pagecache what we are going to write on the Send page by page in the pagecache what we are going to write on the
...@@ -2553,10 +2584,34 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer) ...@@ -2553,10 +2584,34 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer)
file->is_sync= 0; file->is_sync= 0;
if (LSN_OFFSET(buffer->last_lsn) != 0) /* if buffer->last_lsn is set */ if (LSN_OFFSET(buffer->last_lsn) != 0) /* if buffer->last_lsn is set */
translog_set_sent_to_disk(buffer->last_lsn, {
buffer->next_buffer_offset); if (translog_prev_buffer_flush_wait(buffer))
DBUG_RETURN(0); /* some the thread flushed the buffer already */
translog_set_sent_to_disk(buffer);
}
else else
translog_set_only_in_buffers(buffer->next_buffer_offset); translog_set_only_in_buffers(buffer->next_buffer_offset);
/* say to next buffer that we are finished */
{
struct st_translog_buffer *next_buffer=
log_descriptor.buffers + ((buffer->buffer_no + 1) % TRANSLOG_BUFFERS_NO);
if (likely(translog_status == TRANSLOG_OK)){
translog_buffer_lock(next_buffer);
next_buffer->prev_sent_to_disk= buffer->offset;
translog_buffer_unlock(next_buffer);
pthread_cond_broadcast(&next_buffer->prev_sent_to_disk_cond);
}
else
{
/*
It is shutdown =>
1) there is only one thread
2) mutexes of other buffers can be destroyed => we can't use them
*/
next_buffer->prev_sent_to_disk= buffer->offset;
}
}
/* Free buffer */ /* Free buffer */
buffer->file= NULL; buffer->file= NULL;
buffer->overlay= 0; buffer->overlay= 0;
...@@ -4640,6 +4695,7 @@ static my_bool translog_advance_pointer(int pages, uint16 last_page_data) ...@@ -4640,6 +4695,7 @@ static my_bool translog_advance_pointer(int pages, uint16 last_page_data)
} }
translog_start_buffer(new_buffer, &log_descriptor.bc, new_buffer_no); translog_start_buffer(new_buffer, &log_descriptor.bc, new_buffer_no);
old_buffer->next_buffer_offset= new_buffer->offset; old_buffer->next_buffer_offset= new_buffer->offset;
new_buffer->prev_buffer_offset= old_buffer->offset;
translog_buffer_unlock(old_buffer); translog_buffer_unlock(old_buffer);
offset-= min_offset; offset-= min_offset;
} }
...@@ -7355,7 +7411,7 @@ static void translog_force_current_buffer_to_finish() ...@@ -7355,7 +7411,7 @@ static void translog_force_current_buffer_to_finish()
log_descriptor.bc.ptr+= current_page_fill; log_descriptor.bc.ptr+= current_page_fill;
log_descriptor.bc.buffer->size= log_descriptor.bc.current_page_fill= log_descriptor.bc.buffer->size= log_descriptor.bc.current_page_fill=
current_page_fill; current_page_fill;
new_buffer->overlay= old_buffer; new_buffer->overlay= 1;
} }
else else
translog_new_page_header(&log_descriptor.horizon, &log_descriptor.bc); translog_new_page_header(&log_descriptor.horizon, &log_descriptor.bc);
...@@ -7428,8 +7484,8 @@ static void translog_force_current_buffer_to_finish() ...@@ -7428,8 +7484,8 @@ static void translog_force_current_buffer_to_finish()
memcpy(new_buffer->buffer, data, current_page_fill); memcpy(new_buffer->buffer, data, current_page_fill);
} }
old_buffer->next_buffer_offset= new_buffer->offset; old_buffer->next_buffer_offset= new_buffer->offset;
translog_buffer_lock(new_buffer); translog_buffer_lock(new_buffer);
new_buffer->prev_buffer_offset= old_buffer->offset;
translog_buffer_decrease_writers(new_buffer); translog_buffer_decrease_writers(new_buffer);
translog_buffer_unlock(new_buffer); translog_buffer_unlock(new_buffer);
......
...@@ -449,7 +449,7 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags) ...@@ -449,7 +449,7 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags)
/* Ensure we have space in the key buffer for transaction id's */ /* Ensure we have space in the key buffer for transaction id's */
if (share->base.born_transactional) if (share->base.born_transactional)
share->base.max_key_length= ALIGN_SIZE(share->base.max_key_length + share->base.max_key_length= ALIGN_SIZE(share->base.max_key_length +
MAX_PACK_TRANSID_SIZE); MARIA_MAX_PACK_TRANSID_SIZE);
/* /*
If page cache is not initialized, then assume we will create the If page cache is not initialized, then assume we will create the
...@@ -824,6 +824,7 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags) ...@@ -824,6 +824,7 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags)
VOID(my_rwlock_init(&share->mmap_lock, NULL)); VOID(my_rwlock_init(&share->mmap_lock, NULL));
share->row_is_visible= _ma_row_visible_always; share->row_is_visible= _ma_row_visible_always;
share->lock.get_status= _ma_reset_update_flag;
if (!thr_lock_inited) if (!thr_lock_inited)
{ {
/* Probably a single threaded program; Don't use concurrent inserts */ /* Probably a single threaded program; Don't use concurrent inserts */
......
...@@ -96,6 +96,8 @@ my_bool _ma_setup_live_state(MARIA_HA *info) ...@@ -96,6 +96,8 @@ my_bool _ma_setup_live_state(MARIA_HA *info)
pthread_mutex_unlock(&share->intern_lock); pthread_mutex_unlock(&share->intern_lock);
/* The current item can't be deleted as it's the first one visible for us */ /* The current item can't be deleted as it's the first one visible for us */
tables->state_start= tables->state_current= history->state; tables->state_start= tables->state_current= history->state;
tables->state_current.changed= 0;
DBUG_PRINT("info", ("records: %ld", (ulong) tables->state_start.records)); DBUG_PRINT("info", ("records: %ld", (ulong) tables->state_start.records));
end: end:
...@@ -262,6 +264,7 @@ void _ma_get_status(void* param, my_bool concurrent_insert) ...@@ -262,6 +264,7 @@ void _ma_get_status(void* param, my_bool concurrent_insert)
#endif #endif
info->state_save= info->s->state.state; info->state_save= info->s->state.state;
info->state= &info->state_save; info->state= &info->state_save;
info->state->changed= 0;
info->append_insert_at_end= concurrent_insert; info->append_insert_at_end= concurrent_insert;
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -315,6 +318,14 @@ void _ma_copy_status(void* to, void *from) ...@@ -315,6 +318,14 @@ void _ma_copy_status(void* to, void *from)
} }
void _ma_reset_update_flag(void *param,
my_bool concurrent_insert __attribute__((unused)))
{
MARIA_HA *info=(MARIA_HA*) param;
info->state->changed= 0;
}
/** /**
@brief Check if should allow concurrent inserts @brief Check if should allow concurrent inserts
......
...@@ -24,6 +24,7 @@ typedef struct st_maria_status_info ...@@ -24,6 +24,7 @@ typedef struct st_maria_status_info
my_off_t key_file_length; my_off_t key_file_length;
my_off_t data_file_length; my_off_t data_file_length;
ha_checksum checksum; ha_checksum checksum;
my_bool changed;
} MARIA_STATUS_INFO; } MARIA_STATUS_INFO;
...@@ -62,6 +63,7 @@ void _ma_get_status(void* param, my_bool concurrent_insert); ...@@ -62,6 +63,7 @@ void _ma_get_status(void* param, my_bool concurrent_insert);
void _ma_update_status(void* param); void _ma_update_status(void* param);
void _ma_restore_status(void *param); void _ma_restore_status(void *param);
void _ma_copy_status(void* to, void *from); void _ma_copy_status(void* to, void *from);
void _ma_reset_update_flag(void *param, my_bool concurrent_insert);
my_bool _ma_check_status(void *param); my_bool _ma_check_status(void *param);
void _ma_block_get_status(void* param, my_bool concurrent_insert); void _ma_block_get_status(void* param, my_bool concurrent_insert);
void _ma_block_update_status(void *param); void _ma_block_update_status(void *param);
......
...@@ -173,6 +173,7 @@ int maria_update(register MARIA_HA *info, const uchar *oldrec, uchar *newrec) ...@@ -173,6 +173,7 @@ int maria_update(register MARIA_HA *info, const uchar *oldrec, uchar *newrec)
*/ */
info->update= (HA_STATE_CHANGED | HA_STATE_ROW_CHANGED | key_changed); info->update= (HA_STATE_CHANGED | HA_STATE_ROW_CHANGED | key_changed);
share->state.changed|= STATE_NOT_MOVABLE | STATE_NOT_ZEROFILLED; share->state.changed|= STATE_NOT_MOVABLE | STATE_NOT_ZEROFILLED;
info->state->changed= 1;
/* /*
Every Maria function that updates Maria table must end with Every Maria function that updates Maria table must end with
......
...@@ -289,6 +289,7 @@ int maria_write(MARIA_HA *info, uchar *record) ...@@ -289,6 +289,7 @@ int maria_write(MARIA_HA *info, uchar *record)
info->update= (HA_STATE_CHANGED | HA_STATE_AKTIV | HA_STATE_WRITTEN | info->update= (HA_STATE_CHANGED | HA_STATE_AKTIV | HA_STATE_WRITTEN |
HA_STATE_ROW_CHANGED); HA_STATE_ROW_CHANGED);
share->state.changed|= STATE_NOT_MOVABLE | STATE_NOT_ZEROFILLED; share->state.changed|= STATE_NOT_MOVABLE | STATE_NOT_ZEROFILLED;
info->state->changed= 1;
info->cur_row.lastpos= filepos; info->cur_row.lastpos= filepos;
VOID(_ma_writeinfo(info, WRITEINFO_UPDATE_KEYFILE)); VOID(_ma_writeinfo(info, WRITEINFO_UPDATE_KEYFILE));
......
...@@ -146,14 +146,15 @@ typedef struct st_maria_state_info ...@@ -146,14 +146,15 @@ typedef struct st_maria_state_info
#define MARIA_KEYDEF_SIZE (2+ 5*2) #define MARIA_KEYDEF_SIZE (2+ 5*2)
#define MARIA_UNIQUEDEF_SIZE (2+1+1) #define MARIA_UNIQUEDEF_SIZE (2+1+1)
#define HA_KEYSEG_SIZE (6+ 2*2 + 4*2) #define HA_KEYSEG_SIZE (6+ 2*2 + 4*2)
#define MARIA_MAX_KEY_BUFF (HA_MAX_KEY_BUFF + MAX_PACK_TRANSID_SIZE) #define MARIA_MAX_KEY_BUFF (HA_MAX_KEY_BUFF + MARIA_MAX_PACK_TRANSID_SIZE)
#define MARIA_COLUMNDEF_SIZE (2*7+1+1+4) #define MARIA_COLUMNDEF_SIZE (2*7+1+1+4)
#define MARIA_BASE_INFO_SIZE (MY_UUID_SIZE + 5*8 + 6*4 + 11*2 + 6 + 5*2 + 1 + 16) #define MARIA_BASE_INFO_SIZE (MY_UUID_SIZE + 5*8 + 6*4 + 11*2 + 6 + 5*2 + 1 + 16)
#define MARIA_INDEX_BLOCK_MARGIN 16 /* Safety margin for .MYI tables */ #define MARIA_INDEX_BLOCK_MARGIN 16 /* Safety margin for .MYI tables */
/* Internal management bytes needed to store 2 keys on an index page */ /* Internal management bytes needed to store 2 transid/key on an index page */
#define MAX_PACK_TRANSID_SIZE (TRANSID_SIZE+1) #define MARIA_MAX_PACK_TRANSID_SIZE (TRANSID_SIZE+1)
#define MIN_TRANSID_PACK_PREFIX (256-TRANSID_SIZE*2) #define MARIA_TRANSID_PACK_OFFSET (256- TRANSID_SIZE - 1)
#define MARIA_INDEX_OVERHEAD_SIZE (MAX_PACK_TRANSID_SIZE * 2) #define MARIA_MIN_TRANSID_PACK_OFFSET (MARIA_TRANSID_PACK_OFFSET-TRANSID_SIZE)
#define MARIA_INDEX_OVERHEAD_SIZE (MARIA_MAX_PACK_TRANSID_SIZE * 2)
#define MARIA_DELETE_KEY_NR 255 /* keynr for deleted blocks */ #define MARIA_DELETE_KEY_NR 255 /* keynr for deleted blocks */
/* /*
...@@ -941,8 +942,8 @@ extern my_bool _ma_compact_keypage(MARIA_HA *info, MARIA_KEYDEF *keyinfo, ...@@ -941,8 +942,8 @@ extern my_bool _ma_compact_keypage(MARIA_HA *info, MARIA_KEYDEF *keyinfo,
extern uint transid_store_packed(MARIA_HA *info, uchar *to, ulonglong trid); extern uint transid_store_packed(MARIA_HA *info, uchar *to, ulonglong trid);
extern ulonglong transid_get_packed(MARIA_SHARE *share, const uchar *from); extern ulonglong transid_get_packed(MARIA_SHARE *share, const uchar *from);
#define transid_packed_length(data) \ #define transid_packed_length(data) \
((data)[0] < MIN_TRANSID_PACK_PREFIX ? 1 : \ ((data)[0] < MARIA_MIN_TRANSID_PACK_OFFSET ? 1 : \
(uint) (257 - (uchar) (data)[0])) (uint) ((uchar) (data)[0]) - (MARIA_TRANSID_PACK_OFFSET - 1))
#define key_has_transid(key) (*(key) & 1) #define key_has_transid(key) (*(key) & 1)
extern MARIA_KEY *_ma_make_key(MARIA_HA *info, MARIA_KEY *int_key, uint keynr, extern MARIA_KEY *_ma_make_key(MARIA_HA *info, MARIA_KEY *int_key, uint keynr,
......
...@@ -37,6 +37,13 @@ static TRN committed_list_min, committed_list_max; ...@@ -37,6 +37,13 @@ static TRN committed_list_min, committed_list_max;
/* a counter, used to generate transaction ids */ /* a counter, used to generate transaction ids */
static TrID global_trid_generator; static TrID global_trid_generator;
/*
The minimum existing transaction id for trnman_get_min_trid()
The default value is used when transaction manager not initialize;
Probably called from maria_chk
*/
static TrID trid_min_read_from= ~(TrID) 0;
/* the mutex for everything above */ /* the mutex for everything above */
static pthread_mutex_t LOCK_trn_list; static pthread_mutex_t LOCK_trn_list;
...@@ -158,6 +165,7 @@ int trnman_init(TrID initial_trid) ...@@ -158,6 +165,7 @@ int trnman_init(TrID initial_trid)
pool= 0; pool= 0;
global_trid_generator= initial_trid; global_trid_generator= initial_trid;
trid_min_read_from= initial_trid;
lf_hash_init(&trid_to_trn, sizeof(TRN*), LF_HASH_UNIQUE, lf_hash_init(&trid_to_trn, sizeof(TRN*), LF_HASH_UNIQUE,
0, 0, trn_get_hash_key, 0); 0, 0, trn_get_hash_key, 0);
DBUG_PRINT("info", ("pthread_mutex_init LOCK_trn_list")); DBUG_PRINT("info", ("pthread_mutex_init LOCK_trn_list"));
...@@ -303,6 +311,7 @@ TRN *trnman_new_trn(WT_THD *wt) ...@@ -303,6 +311,7 @@ TRN *trnman_new_trn(WT_THD *wt)
if (!trn->pins) if (!trn->pins)
{ {
trnman_free_trn(trn); trnman_free_trn(trn);
pthread_mutex_unlock(&LOCK_trn_list);
return 0; return 0;
} }
...@@ -315,6 +324,7 @@ TRN *trnman_new_trn(WT_THD *wt) ...@@ -315,6 +324,7 @@ TRN *trnman_new_trn(WT_THD *wt)
trn->next= &active_list_max; trn->next= &active_list_max;
trn->prev= active_list_max.prev; trn->prev= active_list_max.prev;
active_list_max.prev= trn->prev->next= trn; active_list_max.prev= trn->prev->next= trn;
trid_min_read_from= active_list_min.next->min_read_from;
DBUG_PRINT("info", ("pthread_mutex_unlock LOCK_trn_list")); DBUG_PRINT("info", ("pthread_mutex_unlock LOCK_trn_list"));
pthread_mutex_unlock(&LOCK_trn_list); pthread_mutex_unlock(&LOCK_trn_list);
...@@ -437,6 +447,8 @@ my_bool trnman_end_trn(TRN *trn, my_bool commit) ...@@ -437,6 +447,8 @@ my_bool trnman_end_trn(TRN *trn, my_bool commit)
trn->next= free_me; trn->next= free_me;
free_me= trn; free_me= trn;
} }
trid_min_read_from= active_list_min.next->min_read_from;
if ((*trnman_end_trans_hook)(trn, commit, if ((*trnman_end_trans_hook)(trn, commit,
active_list_min.next != &active_list_max)) active_list_min.next != &active_list_max))
res= -1; res= -1;
...@@ -787,25 +799,14 @@ TRN *trnman_get_any_trn() ...@@ -787,25 +799,14 @@ TRN *trnman_get_any_trn()
/** /**
Returns the minimum existing transaction id Returns the minimum existing transaction id. May return a too small
number in race conditions, but this is ok as the value is used to
@notes remove not visible transid from index/rows.
This can only be called when we have at least one running transaction.
*/ */
TrID trnman_get_min_trid() TrID trnman_get_min_trid()
{ {
TrID min_read_from; return trid_min_read_from;
if (short_trid_to_active_trn == NULL)
{
/* Transaction manager not initialize; Probably called from maria_chk */
return ~(TrID) 0;
}
pthread_mutex_lock(&LOCK_trn_list);
min_read_from= active_list_min.next->min_read_from;
pthread_mutex_unlock(&LOCK_trn_list);
return min_read_from;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment