Commit a80d50bc authored by unknown's avatar unknown

Fixed locking issues around flushes.


storage/archive/ha_archive.cc:
  Added mutex around flush calls to make read's consistent (and not conflicting)
storage/archive/ha_archive.h:
  Fixed issues for fast count(*) call
parent 288a7a33
......@@ -436,6 +436,9 @@ int ha_archive::init_archive_writer()
}
/*
No locks are required because it is associated with just one handler instance
*/
int ha_archive::init_archive_reader()
{
DBUG_ENTER("ha_archive::init_archive_reader");
......@@ -794,15 +797,16 @@ int ha_archive::write_row(uchar *buf)
if (share->crashed)
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
if (!share->archive_write_open)
if (init_archive_writer())
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
ha_statistic_increment(&SSV::ha_write_count);
if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
table->timestamp_field->set_time();
pthread_mutex_lock(&share->mutex);
if (!share->archive_write_open)
if (init_archive_writer())
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
if (table->next_number_field && record == table->record[0])
{
KEY *mkey= &table->s->key_info[0]; // We only support one key right now
......@@ -992,24 +996,6 @@ int ha_archive::rnd_init(bool scan)
{
DBUG_PRINT("info", ("archive will retrieve %llu rows",
(unsigned long long) scan_rows));
stats.records= 0;
/*
If dirty, we lock, and then reset/flush the data.
I found that just calling azflush() doesn't always work.
*/
pthread_mutex_lock(&share->mutex);
scan_rows= share->rows_recorded;
if (share->dirty == TRUE)
{
if (share->dirty == TRUE)
{
DBUG_PRINT("ha_archive", ("archive flushing out rows for scan"));
azflush(&(share->archive_write), Z_SYNC_FLUSH);
share->dirty= FALSE;
}
}
pthread_mutex_unlock(&share->mutex);
if (read_data_header(&archive))
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
......@@ -1223,9 +1209,7 @@ int ha_archive::rnd_next(uchar *buf)
current_position= aztell(&archive);
rc= get_row(&archive, buf);
if (rc != HA_ERR_END_OF_FILE)
stats.records++;
table->status=rc ? STATUS_NOT_FOUND: 0;
DBUG_RETURN(rc);
}
......@@ -1461,12 +1445,33 @@ void ha_archive::update_create_info(HA_CREATE_INFO *create_info)
int ha_archive::info(uint flag)
{
DBUG_ENTER("ha_archive::info");
/*
If dirty, we lock, and then reset/flush the data.
I found that just calling azflush() doesn't always work.
*/
pthread_mutex_lock(&share->mutex);
if (share->dirty == TRUE)
{
if (share->dirty == TRUE)
{
DBUG_PRINT("ha_archive", ("archive flushing out rows for scan"));
azflush(&(share->archive_write), Z_SYNC_FLUSH);
share->dirty= FALSE;
}
}
/*
This should be an accurate number now, though bulk and delayed inserts can
cause the number to be inaccurate.
*/
stats.records= share->rows_recorded;
pthread_mutex_unlock(&share->mutex);
scan_rows= stats.records;
stats.deleted= 0;
DBUG_PRINT("ha_archive", ("Stats rows is %d\n", (int)stats.records));
/* Costs quite a bit more to get all information */
if (flag & HA_STATUS_TIME)
{
......@@ -1486,7 +1491,9 @@ int ha_archive::info(uint flag)
if (flag & HA_STATUS_AUTO)
{
init_archive_reader();
pthread_mutex_lock(&share->mutex);
azflush(&archive, Z_SYNC_FLUSH);
pthread_mutex_unlock(&share->mutex);
stats.auto_increment_value= archive.auto_increment;
}
......@@ -1554,7 +1561,9 @@ int ha_archive::check(THD* thd, HA_CHECK_OPT* check_opt)
old_proc_info= thd_proc_info(thd, "Checking table");
/* Flush any waiting data */
pthread_mutex_lock(&share->mutex);
azflush(&(share->archive_write), Z_SYNC_FLUSH);
pthread_mutex_unlock(&share->mutex);
/*
Now we will rewind the archive file so that we are positioned at the
......
......@@ -88,6 +88,8 @@ class ha_archive: public handler
{
return (HA_NO_TRANSACTIONS | HA_REC_NOT_IN_SEQ | HA_CAN_BIT_FIELD |
HA_BINLOG_ROW_CAPABLE | HA_BINLOG_STMT_CAPABLE |
HA_STATS_RECORDS_IS_EXACT |
HA_HAS_RECORDS |
HA_FILE_BASED | HA_CAN_INSERT_DELAYED | HA_CAN_GEOMETRY);
}
ulong index_flags(uint idx, uint part, bool all_parts) const
......@@ -101,6 +103,7 @@ class ha_archive: public handler
uint max_supported_keys() const { return 1; }
uint max_supported_key_length() const { return sizeof(ulonglong); }
uint max_supported_key_part_length() const { return sizeof(ulonglong); }
ha_rows records() { return share->rows_recorded; }
int index_init(uint keynr, bool sorted);
virtual int index_read(uchar * buf, const uchar * key,
uint key_len, enum ha_rkey_function find_flag);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment