Commit ccf4a137 authored by unknown's avatar unknown

Formailized the row buffer structure, implemented new streaming format.


mysql-test/r/archive.result:
  Added cleanup for additional tables
mysql-test/t/archive.test:
  Added cleanup for additional tables.
storage/archive/ha_archive.cc:
  Rows are now proceeded with length. Added new record buffer structure and methods.
storage/archive/ha_archive.h:
  New structure for buffer
parent dd7e49e0
drop table if exists t1,t2,t3; drop table if exists t1,t2,t3,t4,t5;
CREATE TABLE t1 ( CREATE TABLE t1 (
Period smallint(4) unsigned zerofill DEFAULT '0000' NOT NULL, Period smallint(4) unsigned zerofill DEFAULT '0000' NOT NULL,
Varor_period smallint(4) unsigned DEFAULT '0' NOT NULL Varor_period smallint(4) unsigned DEFAULT '0' NOT NULL
......
...@@ -6,7 +6,7 @@ ...@@ -6,7 +6,7 @@
-- source include/have_binlog_format_mixed_or_statement.inc -- source include/have_binlog_format_mixed_or_statement.inc
--disable_warnings --disable_warnings
drop table if exists t1,t2,t3; drop table if exists t1,t2,t3,t4,t5;
--enable_warnings --enable_warnings
CREATE TABLE t1 ( CREATE TABLE t1 (
......
...@@ -146,6 +146,11 @@ static handler *archive_create_handler(handlerton *hton, ...@@ -146,6 +146,11 @@ static handler *archive_create_handler(handlerton *hton,
*/ */
#define ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT 2 #define ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT 2
/*
Size of header used for row
*/
#define ARCHIVE_ROW_HEADER_SIZE 4
static handler *archive_create_handler(handlerton *hton, static handler *archive_create_handler(handlerton *hton,
TABLE_SHARE *table, TABLE_SHARE *table,
MEM_ROOT *mem_root) MEM_ROOT *mem_root)
...@@ -248,6 +253,8 @@ int ha_archive::read_data_header(azio_stream *file_to_read) ...@@ -248,6 +253,8 @@ int ha_archive::read_data_header(azio_stream *file_to_read)
DBUG_PRINT("ha_archive::read_data_header", ("Check %u", data_buffer[0])); DBUG_PRINT("ha_archive::read_data_header", ("Check %u", data_buffer[0]));
DBUG_PRINT("ha_archive::read_data_header", ("Version %u", data_buffer[1])); DBUG_PRINT("ha_archive::read_data_header", ("Version %u", data_buffer[1]));
share->data_version= (uchar)data_buffer[1];
if ((data_buffer[0] != (uchar)ARCHIVE_CHECK_HEADER) && if ((data_buffer[0] != (uchar)ARCHIVE_CHECK_HEADER) &&
(data_buffer[1] != (uchar)ARCHIVE_VERSION)) (data_buffer[1] != (uchar)ARCHIVE_VERSION))
...@@ -283,6 +290,7 @@ error: ...@@ -283,6 +290,7 @@ error:
*rows will contain the current number of rows in the data file upon success. *rows will contain the current number of rows in the data file upon success.
*/ */
int ha_archive::read_meta_file(File meta_file, ha_rows *rows, int ha_archive::read_meta_file(File meta_file, ha_rows *rows,
uint *meta_version,
ulonglong *auto_increment, ulonglong *auto_increment,
ulonglong *forced_flushes, ulonglong *forced_flushes,
char *real_path) char *real_path)
...@@ -326,6 +334,8 @@ int ha_archive::read_meta_file(File meta_file, ha_rows *rows, ...@@ -326,6 +334,8 @@ int ha_archive::read_meta_file(File meta_file, ha_rows *rows,
DBUG_PRINT("ha_archive::read_meta_file", ("Real Path %s", real_path)); DBUG_PRINT("ha_archive::read_meta_file", ("Real Path %s", real_path));
DBUG_PRINT("ha_archive::read_meta_file", ("Dirty %d", (int)(*ptr))); DBUG_PRINT("ha_archive::read_meta_file", ("Dirty %d", (int)(*ptr)));
*meta_version= (uchar)meta_buffer[1];
if ((meta_buffer[0] != (uchar)ARCHIVE_CHECK_HEADER) || if ((meta_buffer[0] != (uchar)ARCHIVE_CHECK_HEADER) ||
((bool)(*ptr)== TRUE)) ((bool)(*ptr)== TRUE))
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
...@@ -446,7 +456,7 @@ ARCHIVE_SHARE *ha_archive::get_share(const char *table_name, ...@@ -446,7 +456,7 @@ ARCHIVE_SHARE *ha_archive::get_share(const char *table_name,
VOID(pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST)); VOID(pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST));
if ((share->meta_file= my_open(meta_file_name, O_RDWR, MYF(0))) == -1) if ((share->meta_file= my_open(meta_file_name, O_RDWR, MYF(0))) == -1)
share->crashed= TRUE; share->crashed= TRUE;
DBUG_PRINT("info", ("archive opening (1) up write at %s", DBUG_PRINT("ha_archive", ("archive opening (1) up write at %s",
share->data_file_name)); share->data_file_name));
/* /*
...@@ -454,6 +464,7 @@ ARCHIVE_SHARE *ha_archive::get_share(const char *table_name, ...@@ -454,6 +464,7 @@ ARCHIVE_SHARE *ha_archive::get_share(const char *table_name,
a write. a write.
*/ */
if (read_meta_file(share->meta_file, &share->rows_recorded, if (read_meta_file(share->meta_file, &share->rows_recorded,
&share->meta_version,
&share->auto_increment_value, &share->auto_increment_value,
&share->forced_flushes, &share->forced_flushes,
share->real_path)) share->real_path))
...@@ -468,7 +479,7 @@ ARCHIVE_SHARE *ha_archive::get_share(const char *table_name, ...@@ -468,7 +479,7 @@ ARCHIVE_SHARE *ha_archive::get_share(const char *table_name,
thr_lock_init(&share->lock); thr_lock_init(&share->lock);
} }
share->use_count++; share->use_count++;
DBUG_PRINT("info", ("archive table %.*s has %d open handles now", DBUG_PRINT("ha_archive", ("archive table %.*s has %d open handles now",
share->table_name_length, share->table_name, share->table_name_length, share->table_name,
share->use_count)); share->use_count));
if (share->crashed) if (share->crashed)
...@@ -487,7 +498,7 @@ int ha_archive::free_share(ARCHIVE_SHARE *share) ...@@ -487,7 +498,7 @@ int ha_archive::free_share(ARCHIVE_SHARE *share)
{ {
int rc= 0; int rc= 0;
DBUG_ENTER("ha_archive::free_share"); DBUG_ENTER("ha_archive::free_share");
DBUG_PRINT("info", ("archive table %.*s has %d open handles on entrance", DBUG_PRINT("ha_archive", ("archive table %.*s has %d open handles on entrance",
share->table_name_length, share->table_name, share->table_name_length, share->table_name,
share->use_count)); share->use_count));
...@@ -539,7 +550,7 @@ int ha_archive::init_archive_writer() ...@@ -539,7 +550,7 @@ int ha_archive::init_archive_writer()
if (!(azopen(&(share->archive_write), share->data_file_name, if (!(azopen(&(share->archive_write), share->data_file_name,
O_WRONLY|O_APPEND|O_BINARY))) O_WRONLY|O_APPEND|O_BINARY)))
{ {
DBUG_PRINT("info", ("Could not open archive write file")); DBUG_PRINT("ha_archive", ("Could not open archive write file"));
share->crashed= TRUE; share->crashed= TRUE;
DBUG_RETURN(1); DBUG_RETURN(1);
} }
...@@ -575,7 +586,7 @@ int ha_archive::open(const char *name, int mode, uint open_options) ...@@ -575,7 +586,7 @@ int ha_archive::open(const char *name, int mode, uint open_options)
int rc= 0; int rc= 0;
DBUG_ENTER("ha_archive::open"); DBUG_ENTER("ha_archive::open");
DBUG_PRINT("info", ("archive table was opened for crash: %s", DBUG_PRINT("ha_archive", ("archive table was opened for crash: %s",
(open_options & HA_OPEN_FOR_REPAIR) ? "yes" : "no")); (open_options & HA_OPEN_FOR_REPAIR) ? "yes" : "no"));
share= get_share(name, table, &rc); share= get_share(name, table, &rc);
...@@ -589,9 +600,17 @@ int ha_archive::open(const char *name, int mode, uint open_options) ...@@ -589,9 +600,17 @@ int ha_archive::open(const char *name, int mode, uint open_options)
DBUG_RETURN(rc); DBUG_RETURN(rc);
} }
thr_lock_data_init(&share->lock,&lock,NULL); record_buffer= create_record_buffer(table->s->reclength);
if (!record_buffer)
{
free_share(share);
DBUG_RETURN(HA_ERR_OUT_OF_MEM);
}
thr_lock_data_init(&share->lock, &lock, NULL);
DBUG_PRINT("info", ("archive data_file_name %s", share->data_file_name)); DBUG_PRINT("ha_archive", ("archive data_file_name %s", share->data_file_name));
if (!(azopen(&archive, share->data_file_name, O_RDONLY|O_BINARY))) if (!(azopen(&archive, share->data_file_name, O_RDONLY|O_BINARY)))
{ {
if (errno == EROFS || errno == EACCES) if (errno == EROFS || errno == EACCES)
...@@ -599,7 +618,7 @@ int ha_archive::open(const char *name, int mode, uint open_options) ...@@ -599,7 +618,7 @@ int ha_archive::open(const char *name, int mode, uint open_options)
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
} }
DBUG_PRINT("info", ("archive table was crashed %s", DBUG_PRINT("ha_archive", ("archive table was crashed %s",
rc == HA_ERR_CRASHED_ON_USAGE ? "yes" : "no")); rc == HA_ERR_CRASHED_ON_USAGE ? "yes" : "no"));
if (rc == HA_ERR_CRASHED_ON_USAGE && open_options & HA_OPEN_FOR_REPAIR) if (rc == HA_ERR_CRASHED_ON_USAGE && open_options & HA_OPEN_FOR_REPAIR)
{ {
...@@ -632,6 +651,8 @@ int ha_archive::close(void) ...@@ -632,6 +651,8 @@ int ha_archive::close(void)
int rc= 0; int rc= 0;
DBUG_ENTER("ha_archive::close"); DBUG_ENTER("ha_archive::close");
destroy_record_buffer(record_buffer);
/* First close stream */ /* First close stream */
if (azclose(&archive)) if (azclose(&archive))
rc= 1; rc= 1;
...@@ -676,7 +697,7 @@ int ha_archive::create(const char *name, TABLE *table_arg, ...@@ -676,7 +697,7 @@ int ha_archive::create(const char *name, TABLE *table_arg,
if (!(field->flags & AUTO_INCREMENT_FLAG)) if (!(field->flags & AUTO_INCREMENT_FLAG))
{ {
error= -1; error= -1;
DBUG_PRINT("info", ("Index error in creating archive table")); DBUG_PRINT("ha_archive", ("Index error in creating archive table"));
goto error; goto error;
} }
} }
...@@ -701,7 +722,7 @@ int ha_archive::create(const char *name, TABLE *table_arg, ...@@ -701,7 +722,7 @@ int ha_archive::create(const char *name, TABLE *table_arg,
if (create_info->data_file_name) if (create_info->data_file_name)
{ {
char linkname[FN_REFLEN]; char linkname[FN_REFLEN];
DBUG_PRINT("info", ("archive will create stream file %s", DBUG_PRINT("ha_archive", ("archive will create stream file %s",
create_info->data_file_name)); create_info->data_file_name));
fn_format(name_buff, create_info->data_file_name, "", ARZ, fn_format(name_buff, create_info->data_file_name, "", ARZ,
...@@ -762,37 +783,74 @@ int ha_archive::real_write_row(byte *buf, azio_stream *writer) ...@@ -762,37 +783,74 @@ int ha_archive::real_write_row(byte *buf, azio_stream *writer)
{ {
my_off_t written; my_off_t written;
uint *ptr, *end; uint *ptr, *end;
int r_pack_length;
byte size_buffer[ARCHIVE_ROW_HEADER_SIZE]; // Longest possible row length with blobs
DBUG_ENTER("ha_archive::real_write_row"); DBUG_ENTER("ha_archive::real_write_row");
written= azwrite(writer, buf, table->s->reclength); // We pack the row for writing
r_pack_length= pack_row(buf);
DBUG_PRINT("ha_archive",("Pack row length %d", r_pack_length));
// Store the size of the row before the row
bzero(size_buffer, ARCHIVE_ROW_HEADER_SIZE);
int4store(size_buffer, (int)r_pack_length);
DBUG_PRINT("ha_archive",("Pack %d %d %d %d", size_buffer[0], size_buffer[1], size_buffer[2], size_buffer[3]));
azwrite(writer, size_buffer, ARCHIVE_ROW_HEADER_SIZE);
written= azwrite(writer, record_buffer->buffer, r_pack_length);
DBUG_PRINT("ha_archive::real_write_row", ("Wrote %d bytes expected %d", DBUG_PRINT("ha_archive::real_write_row", ("Wrote %d bytes expected %d",
(uint32)written, (uint32)written,
(uint32)table->s->reclength)); (uint32)r_pack_length));
if (!delayed_insert || !bulk_insert) if (!delayed_insert || !bulk_insert)
share->dirty= TRUE; share->dirty= TRUE;
if (written != (my_off_t)table->s->reclength) if (written != (my_off_t)r_pack_length)
DBUG_RETURN(errno ? errno : -1); DBUG_RETURN(errno ? errno : -1);
/*
We should probably mark the table as damagaged if the record is written DBUG_RETURN(0);
but the blob fails. }
*/
for (ptr= table->s->blob_field, end= ptr + table->s->blob_fields ;
/* Calculate max length needed for row */
int ha_archive::max_row_length(const byte *buf)
{
ulonglong length= table->s->reclength + table->s->fields*2;
uint *ptr, *end;
for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ;
ptr != end ; ptr != end ;
ptr++) ptr++)
{ {
char *data_ptr; Field_blob *blob= ((Field_blob*) table->field[*ptr]);
uint32 size= ((Field_blob*) table->field[*ptr])->get_length(); length+= blob->get_length((char*) buf + blob->offset())+2;
}
if (size) return length;
{ }
((Field_blob*) table->field[*ptr])->get_ptr(&data_ptr);
written= azwrite(writer, data_ptr, (unsigned)size);
if (written != (my_off_t)size) int ha_archive::pack_row(const byte *record)
DBUG_RETURN(errno ? errno : -1); {
} byte *ptr;
DBUG_ENTER("ha_archive::pack_row");
if (table->s->blob_fields)
{
if (fix_rec_buff(max_row_length(record)))
DBUG_RETURN(HA_ERR_OUT_OF_MEM); /* purecov: inspected */
} }
DBUG_RETURN(0);
/* Copy null bits */
memcpy(record_buffer->buffer, record, table->s->null_bytes);
ptr= record_buffer->buffer + table->s->null_bytes;
for (Field **field=table->field ; *field ; field++)
ptr=(byte*) (*field)->pack((char*) ptr,
(char*) record + (*field)->offset());
DBUG_RETURN((size_t) (ptr - record_buffer->buffer));
} }
...@@ -809,7 +867,9 @@ int ha_archive::write_row(byte *buf) ...@@ -809,7 +867,9 @@ int ha_archive::write_row(byte *buf)
{ {
int rc; int rc;
byte *read_buf= NULL; byte *read_buf= NULL;
byte *ptr;
ulonglong temp_auto; ulonglong temp_auto;
DBUG_ENTER("ha_archive::write_row"); DBUG_ENTER("ha_archive::write_row");
if (share->crashed) if (share->crashed)
...@@ -866,12 +926,6 @@ int ha_archive::write_row(byte *buf) ...@@ -866,12 +926,6 @@ int ha_archive::write_row(byte *buf)
goto error; goto error;
} }
/*
Now we read and check all of the rows.
if (!memcmp(table->next_number_field->ptr, mfield->ptr, mfield->max_length()))
if ((longlong)temp_auto ==
mfield->val_int((char*)(read_buf + mfield->offset())))
*/
Field *mfield= table->next_number_field; Field *mfield= table->next_number_field;
while (!(get_row(&archive, read_buf))) while (!(get_row(&archive, read_buf)))
...@@ -899,37 +953,8 @@ int ha_archive::write_row(byte *buf) ...@@ -899,37 +953,8 @@ int ha_archive::write_row(byte *buf)
if (init_archive_writer()) if (init_archive_writer())
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
/*
Varchar structures are constant in size but are not cleaned up request
to request. The following sets all unused space to null to improve
compression.
*/
for (Field **field=table->field ; *field ; field++)
{
/*
Pack length will report 256 when you have 255 bytes
of data plus the single byte for length.
Probably could have added a method to say the number
of bytes taken up by field for the length data.
*/
uint32 actual_length= (*field)->data_length() +
((*field)->pack_length() > 256 ? 2 : 1);
if ((*field)->real_type() == MYSQL_TYPE_VARCHAR)
{
char *ptr= (*field)->ptr + actual_length;
DBUG_ASSERT(actual_length <= (*field)->pack_length());
uint32 to_free= (*field)->pack_length() - actual_length;
if (to_free > 0)
bzero(ptr, to_free);
}
}
share->rows_recorded++; share->rows_recorded++;
rc= real_write_row(buf, &(share->archive_write)); rc= real_write_row(buf, &(share->archive_write));
error: error:
pthread_mutex_unlock(&share->mutex); pthread_mutex_unlock(&share->mutex);
if (read_buf) if (read_buf)
...@@ -1054,7 +1079,7 @@ int ha_archive::rnd_init(bool scan) ...@@ -1054,7 +1079,7 @@ int ha_archive::rnd_init(bool scan)
if (scan) if (scan)
{ {
scan_rows= share->rows_recorded; scan_rows= share->rows_recorded;
DBUG_PRINT("info", ("archive will retrieve %llu rows", DBUG_PRINT("ha_archive", ("archive will retrieve %llu rows",
(unsigned long long)scan_rows)); (unsigned long long)scan_rows));
stats.records= 0; stats.records= 0;
...@@ -1067,7 +1092,7 @@ int ha_archive::rnd_init(bool scan) ...@@ -1067,7 +1092,7 @@ int ha_archive::rnd_init(bool scan)
pthread_mutex_lock(&share->mutex); pthread_mutex_lock(&share->mutex);
if (share->dirty == TRUE) if (share->dirty == TRUE)
{ {
DBUG_PRINT("info", ("archive flushing out rows for scan")); DBUG_PRINT("ha_archive", ("archive flushing out rows for scan"));
azflush(&(share->archive_write), Z_SYNC_FLUSH); azflush(&(share->archive_write), Z_SYNC_FLUSH);
share->forced_flushes++; share->forced_flushes++;
share->dirty= FALSE; share->dirty= FALSE;
...@@ -1088,16 +1113,90 @@ int ha_archive::rnd_init(bool scan) ...@@ -1088,16 +1113,90 @@ int ha_archive::rnd_init(bool scan)
positioned where you want it. positioned where you want it.
*/ */
int ha_archive::get_row(azio_stream *file_to_read, byte *buf) int ha_archive::get_row(azio_stream *file_to_read, byte *buf)
{
int rc;
DBUG_ENTER("ha_archive::get_row");
if (share->data_version == ARCHIVE_VERSION)
rc= get_row_version3(file_to_read, buf);
else
rc= get_row_version2(file_to_read, buf);
DBUG_PRINT("ha_archive", ("Return %d\n", rc));
DBUG_RETURN(rc);
}
/* Reallocate buffer if needed */
bool ha_archive::fix_rec_buff(int length)
{
if (! record_buffer->buffer || length > record_buffer->length)
{
byte *newptr;
if (!(newptr=(byte*) my_realloc((gptr) record_buffer->buffer, length,
MYF(MY_ALLOW_ZERO_PTR))))
return 1; /* purecov: inspected */
record_buffer->buffer= newptr;
record_buffer->length= length;
}
return 0;
}
int ha_archive::unpack_row(azio_stream *file_to_read, char *record)
{
DBUG_ENTER("ha_archive::unpack_row");
int read; // Bytes read, azread() returns int
byte size_buffer[ARCHIVE_ROW_HEADER_SIZE];
int row_len;
/* First we grab the length stored */
read= azread(file_to_read, (byte *)size_buffer, ARCHIVE_ROW_HEADER_SIZE);
if (read == Z_STREAM_ERROR)
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
/* If we read nothing we are at the end of the file */
if (read == 0 || read != ARCHIVE_ROW_HEADER_SIZE)
DBUG_RETURN(HA_ERR_END_OF_FILE);
row_len= sint4korr(size_buffer);
DBUG_PRINT("ha_archive",("Unpack row length %d -> %llu", row_len,
(unsigned long long)table->s->reclength));
fix_rec_buff(row_len);
if (azread(file_to_read, record_buffer->buffer, row_len) != row_len)
DBUG_RETURN(-1);
/* Copy null bits */
const char *ptr= (const char*) record_buffer->buffer;
memcpy(record, ptr, table->s->null_bytes);
ptr+= table->s->null_bytes;
for (Field **field=table->field ; *field ; field++)
ptr= (*field)->unpack(record + (*field)->offset(), ptr);
DBUG_RETURN(0);
}
int ha_archive::get_row_version3(azio_stream *file_to_read, byte *buf)
{
DBUG_ENTER("ha_archive::get_row_version3");
int returnable= unpack_row(file_to_read, buf);
DBUG_RETURN(returnable);
}
int ha_archive::get_row_version2(azio_stream *file_to_read, byte *buf)
{ {
int read; // Bytes read, azread() returns int int read; // Bytes read, azread() returns int
uint *ptr, *end; uint *ptr, *end;
char *last; char *last;
size_t total_blob_length= 0; size_t total_blob_length= 0;
MY_BITMAP *read_set= table->read_set; MY_BITMAP *read_set= table->read_set;
DBUG_ENTER("ha_archive::get_row"); DBUG_ENTER("ha_archive::get_row_version2");
read= azread(file_to_read, buf, table->s->reclength); read= azread(file_to_read, buf, table->s->reclength);
DBUG_PRINT("ha_archive::get_row", ("Read %d bytes expected %lu", read, DBUG_PRINT("ha_archive::get_row_version2", ("Read %d bytes expected %lu", read,
(unsigned long)table->s->reclength)); (unsigned long)table->s->reclength));
if (read == Z_STREAM_ERROR) if (read == Z_STREAM_ERROR)
...@@ -1266,8 +1365,11 @@ int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt) ...@@ -1266,8 +1365,11 @@ int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
if (check_opt->flags == T_EXTEND) if (check_opt->flags == T_EXTEND)
{ {
DBUG_PRINT("info", ("archive extended rebuild")); DBUG_PRINT("ha_archive", ("archive extended rebuild"));
byte *buf; byte *buf;
archive_record_buffer *write_buffer, *read_buffer, *original_buffer;
original_buffer= record_buffer;
/* /*
First we create a buffer that we can use for reading rows, and can pass First we create a buffer that we can use for reading rows, and can pass
...@@ -1279,6 +1381,15 @@ int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt) ...@@ -1279,6 +1381,15 @@ int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
goto error; goto error;
} }
read_buffer= create_record_buffer(record_buffer->length);
write_buffer= create_record_buffer(record_buffer->length);
if (!write_buffer || !read_buffer)
{
rc= HA_ERR_OUT_OF_MEM;
goto error;
}
/* /*
Now we will rewind the archive file so that we are positioned at the Now we will rewind the archive file so that we are positioned at the
start of the file. start of the file.
...@@ -1300,8 +1411,11 @@ int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt) ...@@ -1300,8 +1411,11 @@ int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
{ {
share->rows_recorded= 0; share->rows_recorded= 0;
stats.auto_increment_value= share->auto_increment_value= 0; stats.auto_increment_value= share->auto_increment_value= 0;
record_buffer= read_buffer;
while (!(rc= get_row(&archive, buf))) while (!(rc= get_row(&archive, buf)))
{ {
record_buffer= write_buffer;
real_write_row(buf, &writer); real_write_row(buf, &writer);
if (table->found_next_number_field) if (table->found_next_number_field)
{ {
...@@ -1313,18 +1427,24 @@ int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt) ...@@ -1313,18 +1427,24 @@ int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
auto_value; auto_value;
} }
share->rows_recorded++; share->rows_recorded++;
record_buffer= read_buffer;
} }
} }
DBUG_PRINT("info", ("recovered %llu archive rows",
DBUG_PRINT("ha_archive", ("recovered %llu archive rows",
(unsigned long long)share->rows_recorded)); (unsigned long long)share->rows_recorded));
record_buffer= original_buffer;
destroy_record_buffer(read_buffer);
destroy_record_buffer(write_buffer);
my_free((char*)buf, MYF(0)); my_free((char*)buf, MYF(0));
if (rc && rc != HA_ERR_END_OF_FILE) if (rc && rc != HA_ERR_END_OF_FILE)
goto error; goto error;
} }
else else
{ {
DBUG_PRINT("info", ("archive quick rebuild")); DBUG_PRINT("ha_archive", ("archive quick rebuild"));
/* /*
The quick method is to just read the data raw, and then compress it directly. The quick method is to just read the data raw, and then compress it directly.
*/ */
...@@ -1333,7 +1453,7 @@ int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt) ...@@ -1333,7 +1453,7 @@ int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
if (azrewind(&archive) == -1) if (azrewind(&archive) == -1)
{ {
rc= HA_ERR_CRASHED_ON_USAGE; rc= HA_ERR_CRASHED_ON_USAGE;
DBUG_PRINT("info", ("archive HA_ERR_CRASHED_ON_USAGE")); DBUG_PRINT("ha_archive", ("archive HA_ERR_CRASHED_ON_USAGE"));
goto error; goto error;
} }
...@@ -1359,12 +1479,12 @@ int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt) ...@@ -1359,12 +1479,12 @@ int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
know it failed. know it failed.
We also need to reopen our read descriptor since it has changed. We also need to reopen our read descriptor since it has changed.
*/ */
DBUG_PRINT("info", ("Reopening archive data file")); DBUG_PRINT("ha_archive", ("Reopening archive data file"));
if (!azopen(&(share->archive_write), share->data_file_name, if (!azopen(&(share->archive_write), share->data_file_name,
O_WRONLY|O_APPEND|O_BINARY) || O_WRONLY|O_APPEND|O_BINARY) ||
!azopen(&archive, share->data_file_name, O_RDONLY|O_BINARY)) !azopen(&archive, share->data_file_name, O_RDONLY|O_BINARY))
{ {
DBUG_PRINT("info", ("Could not open archive write file")); DBUG_PRINT("ha_archive", ("Could not open archive write file"));
rc= HA_ERR_CRASHED_ON_USAGE; rc= HA_ERR_CRASHED_ON_USAGE;
} }
...@@ -1577,6 +1697,36 @@ bool ha_archive::check_and_repair(THD *thd) ...@@ -1577,6 +1697,36 @@ bool ha_archive::check_and_repair(THD *thd)
DBUG_RETURN(repair(thd, &check_opt)); DBUG_RETURN(repair(thd, &check_opt));
} }
archive_record_buffer *ha_archive::create_record_buffer(ulonglong length)
{
DBUG_ENTER("ha_archive::create_record_buffer");
archive_record_buffer *r;
if (!(r=
(archive_record_buffer*) my_malloc(sizeof(archive_record_buffer),
MYF(MY_WME))))
{
DBUG_RETURN(NULL); /* purecov: inspected */
}
r->length= (int)length;
if (!(r->buffer= (byte*) my_malloc(r->length,
MYF(MY_WME))))
{
my_free((char*) r, MYF(MY_ALLOW_ZERO_PTR));
DBUG_RETURN(NULL); /* purecov: inspected */
}
DBUG_RETURN(r);
}
void ha_archive::destroy_record_buffer(archive_record_buffer *r)
{
DBUG_ENTER("ha_archive::destroy_record_buffer");
my_free((char*) r->buffer, MYF(MY_ALLOW_ZERO_PTR));
my_free((char*) r, MYF(MY_ALLOW_ZERO_PTR));
DBUG_VOID_RETURN;
}
struct st_mysql_storage_engine archive_storage_engine= struct st_mysql_storage_engine archive_storage_engine=
{ MYSQL_HANDLERTON_INTERFACE_VERSION }; { MYSQL_HANDLERTON_INTERFACE_VERSION };
...@@ -1590,7 +1740,7 @@ mysql_declare_plugin(archive) ...@@ -1590,7 +1740,7 @@ mysql_declare_plugin(archive)
PLUGIN_LICENSE_GPL, PLUGIN_LICENSE_GPL,
archive_db_init, /* Plugin Init */ archive_db_init, /* Plugin Init */
archive_db_done, /* Plugin Deinit */ archive_db_done, /* Plugin Deinit */
0x0100 /* 1.0 */, 0x0300 /* 1.0 */,
NULL, /* status variables */ NULL, /* status variables */
NULL, /* system variables */ NULL, /* system variables */
NULL /* config options */ NULL /* config options */
......
...@@ -27,6 +27,12 @@ ...@@ -27,6 +27,12 @@
ha_example.h. ha_example.h.
*/ */
typedef struct st_archive_record_buffer {
byte *buffer;
int length;
} archive_record_buffer;
typedef struct st_archive_share { typedef struct st_archive_share {
char *table_name; char *table_name;
char data_file_name[FN_REFLEN]; char data_file_name[FN_REFLEN];
...@@ -43,18 +49,23 @@ typedef struct st_archive_share { ...@@ -43,18 +49,23 @@ typedef struct st_archive_share {
ulonglong forced_flushes; ulonglong forced_flushes;
ulonglong mean_rec_length; ulonglong mean_rec_length;
char real_path[FN_REFLEN]; char real_path[FN_REFLEN];
uint meta_version;
uint data_version;
} ARCHIVE_SHARE; } ARCHIVE_SHARE;
/* /*
Version for file format. Version for file format.
1 - Initial Version 1 - Initial Version (Never Released)
2 - Stream Compression, seperate blobs, no packing
3 - One steam (row and blobs), with packing
*/ */
#define ARCHIVE_VERSION 2 #define ARCHIVE_VERSION 3
class ha_archive: public handler class ha_archive: public handler
{ {
THR_LOCK_DATA lock; /* MySQL lock */ THR_LOCK_DATA lock; /* MySQL lock */
ARCHIVE_SHARE *share; /* Shared lock info */ ARCHIVE_SHARE *share; /* Shared lock info */
azio_stream archive; /* Archive file we are working with */ azio_stream archive; /* Archive file we are working with */
my_off_t current_position; /* The position of the row we just read */ my_off_t current_position; /* The position of the row we just read */
byte byte_buffer[IO_SIZE]; /* Initial buffer for our string */ byte byte_buffer[IO_SIZE]; /* Initial buffer for our string */
...@@ -65,6 +76,10 @@ class ha_archive: public handler ...@@ -65,6 +76,10 @@ class ha_archive: public handler
const byte *current_key; const byte *current_key;
uint current_key_len; uint current_key_len;
uint current_k_offset; uint current_k_offset;
archive_record_buffer *record_buffer;
archive_record_buffer *create_record_buffer(ulonglong length);
void destroy_record_buffer(archive_record_buffer *r);
public: public:
ha_archive(handlerton *hton, TABLE_SHARE *table_arg); ha_archive(handlerton *hton, TABLE_SHARE *table_arg);
...@@ -105,7 +120,10 @@ public: ...@@ -105,7 +120,10 @@ public:
int rnd_next(byte *buf); int rnd_next(byte *buf);
int rnd_pos(byte * buf, byte *pos); int rnd_pos(byte * buf, byte *pos);
int get_row(azio_stream *file_to_read, byte *buf); int get_row(azio_stream *file_to_read, byte *buf);
int get_row_version2(azio_stream *file_to_read, byte *buf);
int get_row_version3(azio_stream *file_to_read, byte *buf);
int read_meta_file(File meta_file, ha_rows *rows, int read_meta_file(File meta_file, ha_rows *rows,
uint *meta_version,
ulonglong *auto_increment, ulonglong *auto_increment,
ulonglong *forced_flushes, ulonglong *forced_flushes,
char *real_path); char *real_path);
...@@ -137,5 +155,9 @@ public: ...@@ -137,5 +155,9 @@ public:
bool is_crashed() const; bool is_crashed() const;
int check(THD* thd, HA_CHECK_OPT* check_opt); int check(THD* thd, HA_CHECK_OPT* check_opt);
bool check_and_repair(THD *thd); bool check_and_repair(THD *thd);
int max_row_length(const byte *buf);
bool fix_rec_buff(int length);
int unpack_row(azio_stream *file_to_read, char *record);
int pack_row(const byte *record);
}; };
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment