Row level lock code for the archive storage engine.

parent ef7bdbf4
...@@ -42,7 +42,18 @@ ...@@ -42,7 +42,18 @@
handle bulk inserts as well (that is if someone was trying to read at handle bulk inserts as well (that is if someone was trying to read at
the same time since we would want to flush). the same time since we would want to flush).
No attempts at durability are made. You can corrupt your data. A "meta" file is kept. All this file does is contain information on
the number of rows.
No attempts at durability are made. You can corrupt your data. A repair
method was added to repair the meta file that stores row information,
but if your data file gets corrupted I haven't solved that. I could
create a repair that would solve this, but do you want to take a
chance of loosing your data?
Locks are row level, and you will get a consistant read. Transactions
will be added later (they are not that hard to add at this
stage).
For performance as far as table scans go it is quite fast. I don't have For performance as far as table scans go it is quite fast. I don't have
good numbers but locally it has out performed both Innodb and MyISAM. For good numbers but locally it has out performed both Innodb and MyISAM. For
...@@ -71,14 +82,35 @@ ...@@ -71,14 +82,35 @@
Add truncate table command. Add truncate table command.
Implement versioning, should be easy. Implement versioning, should be easy.
Allow for errors, find a way to mark bad rows. Allow for errors, find a way to mark bad rows.
See if during an optimize you can make the table smaller.
Talk to the gzip guys, come up with a writable format so that updates are doable Talk to the gzip guys, come up with a writable format so that updates are doable
without switching to a block method. without switching to a block method.
Add optional feature so that rows can be flushed at interval (which will cause less Add optional feature so that rows can be flushed at interval (which will cause less
compression but may speed up ordered searches). compression but may speed up ordered searches).
Checkpoint the meta file to allow for faster rebuilds.
Dirty open (right now the meta file is repaired if a crash occured).
Transactions.
Option to allow for dirty reads, this would lower the sync calls, which would make
inserts a lot faster, but would mean highly arbitrary reads.
-Brian -Brian
*/ */
/*
Notes on file formats.
The Meta file is layed out as:
check - Just an int of 254 to make sure that the the file we are opening was
never corrupted.
version - The current version of the file format.
rows - This is an unsigned long long which is the number of rows in the data
file.
check point - Reserved for future use
dirty - Status of the file, whether or not its values are the latest. This flag
is what causes a repair to occur
The data file:
check - Just an int of 254 to make sure that the the file we are opening was
never corrupted.
version - The current version of the file format.
data - The data is stored in a "row +blobs" format.
/* Variables for archive share methods */ /* Variables for archive share methods */
pthread_mutex_t archive_mutex; pthread_mutex_t archive_mutex;
...@@ -86,8 +118,11 @@ static HASH archive_open_tables; ...@@ -86,8 +118,11 @@ static HASH archive_open_tables;
static int archive_init= 0; static int archive_init= 0;
/* The file extension */ /* The file extension */
#define ARZ ".ARZ" #define ARZ ".ARZ" // The data file
#define ARN ".ARN" #define ARN ".ARN" // Files used during an optimize call
#define ARM ".ARM" // Meta file
#define META_BUFFER_SIZE 24 // Size of the data used in the meta file
#define CHECK_HEADER 254 // The number we use to determine corruption
/* /*
Used for hash table that tracks open tables. Used for hash table that tracks open tables.
...@@ -99,14 +134,132 @@ static byte* archive_get_key(ARCHIVE_SHARE *share,uint *length, ...@@ -99,14 +134,132 @@ static byte* archive_get_key(ARCHIVE_SHARE *share,uint *length,
return (byte*) share->table_name; return (byte*) share->table_name;
} }
/*
This method reads the header of a datafile and returns whether or not it was successful.
*/
int ha_archive::read_data_header(gzFile file_to_read)
{
int check; // We use this to check the header
DBUG_ENTER("ha_archive::read_data_header");
if (gzrewind(file_to_read) == -1)
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
if (gzread(file_to_read, &check, sizeof(int)) != sizeof(int))
DBUG_RETURN(errno ? errno : -1);
if (check != CHECK_HEADER)
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
if (gzread(file_to_read, &version, sizeof(version)) != sizeof(version))
DBUG_RETURN(errno ? errno : -1);
DBUG_RETURN(0);
}
/* /*
Example of simple lock controls. This method writes out the header of a datafile and returns whether or not it was successful.
See ha_example.cc for a description. */
int ha_archive::write_data_header(gzFile file_to_write)
{
int check= CHECK_HEADER;
DBUG_ENTER("ha_archive::write_data_header");
if (gzwrite(file_to_write, &check, sizeof(int)) != sizeof(int))
goto error;
if (gzwrite(file_to_write, &version, sizeof(int)) != sizeof(version))
goto error;
DBUG_RETURN(0);
error:
DBUG_RETURN(errno);
}
/*
This method reads the header of a meta file and returns whether or not it was successful.
*rows will contain the current number of rows in the data file upon success.
*/
int ha_archive::read_meta_file(File meta_file, ulonglong *rows)
{
size_t size= sizeof(ulonglong) + sizeof(version) + sizeof(bool); // calculate
byte meta_buffer[META_BUFFER_SIZE];
bool dirty;
int check;
ulonglong check_point;
DBUG_ENTER("ha_archive::read_meta_file");
/*
Format of the meta file is:
version
number of rows
byte showing if the file was stored
*/
VOID(my_seek(meta_file, 0, MY_SEEK_SET, MYF(0)));
if (my_read(meta_file, meta_buffer, size, MYF(MY_WME | MY_NABP)))
DBUG_RETURN(-1);
/*
Parse out the meta data, we ignore version at the moment
*/
memcpy(&check, meta_buffer + sizeof(int), sizeof(int));
longlongstore(rows, meta_buffer + sizeof(version) + sizeof(int));
longlongstore(&check_point, meta_buffer + sizeof(version) + sizeof(int) +
sizeof(ulonglong));
memcpy(&dirty, meta_buffer+sizeof(ulonglong) + sizeof(version) +
sizeof(ulonglong) + sizeof(int), sizeof(bool));
if (dirty == TRUE)
DBUG_RETURN(-1);
my_sync(meta_file, MYF(MY_WME));
DBUG_RETURN(0);
}
/*
This method writes out the header of a meta file and returns whether or not it was successful.
By setting dirty you say whether or not the file represents the actual state of the data file.
Upon ::open() we set to dirty, and upon ::close() we set to clean. If we determine during
a read that the file was dirty we will force a rebuild of this file.
*/
int ha_archive::write_meta_file(File meta_file, ulonglong rows, bool dirty)
{
char meta_buffer[META_BUFFER_SIZE];
ulonglong check_port= 0;
size_t size= sizeof(ulonglong) + sizeof(version) + sizeof(bool) +
sizeof(ulonglong); // calculate length of data
DBUG_ENTER("ha_archive::write_meta_file");
/*
Format of the meta file is:
version
number of rows
byte showing if the file was stored
*/
version= ARCHIVE_VERSION;
memcpy(meta_buffer, &version, sizeof(version));
longlongstore(meta_buffer + sizeof(version), rows); // Position past version
longlongstore(meta_buffer + sizeof(version) + sizeof(ulonglong), check_port);
memcpy(meta_buffer+sizeof(ulonglong) + sizeof(version) + + sizeof(ulonglong), &dirty, sizeof(bool));
VOID(my_seek(meta_file, 0, MY_SEEK_SET, MYF(0)));
if (my_write(meta_file, meta_buffer, size, MYF(MY_WME | MY_NABP)))
DBUG_RETURN(-1);
my_sync(meta_file, MYF(MY_WME));
DBUG_RETURN(0);
}
/*
We create the shared memory space that we will use for the open table.
See ha_example.cc for a longer description.
*/ */
static ARCHIVE_SHARE *get_share(const char *table_name, TABLE *table) ARCHIVE_SHARE *ha_archive::get_share(const char *table_name, TABLE *table)
{ {
ARCHIVE_SHARE *share; ARCHIVE_SHARE *share;
char meta_file_name[FN_REFLEN];
uint length; uint length;
char *tmp_name; char *tmp_name;
...@@ -143,33 +296,62 @@ static ARCHIVE_SHARE *get_share(const char *table_name, TABLE *table) ...@@ -143,33 +296,62 @@ static ARCHIVE_SHARE *get_share(const char *table_name, TABLE *table)
return NULL; return NULL;
} }
share->use_count=0; share->use_count= 0;
share->table_name_length=length; share->table_name_length= length;
share->table_name=tmp_name; share->table_name= tmp_name;
fn_format(share->data_file_name,table_name,"",ARZ,MY_REPLACE_EXT|MY_UNPACK_FILENAME); fn_format(share->data_file_name,table_name,"",ARZ,MY_REPLACE_EXT|MY_UNPACK_FILENAME);
fn_format(meta_file_name,table_name,"",ARM,MY_REPLACE_EXT|MY_UNPACK_FILENAME);
strmov(share->table_name,table_name); strmov(share->table_name,table_name);
/*
We will use this lock for rows.
*/
VOID(pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST));
if ((share->meta_file= my_open(meta_file_name, O_RDWR, MYF(0))) == -1)
goto error;
if (read_meta_file(share->meta_file, &share->rows_recorded))
{
/*
The problem here is that for some reason, probably a crash, the meta
file has been corrupted. So what do we do? Well we try to rebuild it
ourself. Once that happens, we reread it, but if that fails we just
call it quits and return an error.
*/
if (rebuild_meta_file(share->table_name, share->meta_file))
goto error;
if (read_meta_file(share->meta_file, &share->rows_recorded))
goto error;
}
/*
After we read, we set the file to dirty. When we close, we will do the
opposite.
*/
(void)write_meta_file(share->meta_file, share->rows_recorded, TRUE);
/* /*
It is expensive to open and close the data files and since you can't have It is expensive to open and close the data files and since you can't have
a gzip file that can be both read and written we keep a writer open a gzip file that can be both read and written we keep a writer open
that is shared amoung all open tables. that is shared amoung all open tables.
*/ */
if ((share->archive_write= gzopen(share->data_file_name, "ab")) == NULL) if ((share->archive_write= gzopen(share->data_file_name, "ab")) == NULL)
goto error; goto error2;
if (my_hash_insert(&archive_open_tables, (byte*) share)) if (my_hash_insert(&archive_open_tables, (byte*) share))
goto error; goto error2;
thr_lock_init(&share->lock); thr_lock_init(&share->lock);
if (pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST)) if (pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST))
goto error2; goto error3;
} }
share->use_count++; share->use_count++;
pthread_mutex_unlock(&archive_mutex); pthread_mutex_unlock(&archive_mutex);
return share; return share;
error2: error3:
VOID(pthread_mutex_destroy(&share->mutex));
thr_lock_delete(&share->lock); thr_lock_delete(&share->lock);
/* We close, but ignore errors since we already have errors */ /* We close, but ignore errors since we already have errors */
(void)gzclose(share->archive_write); (void)gzclose(share->archive_write);
error2:
my_close(share->meta_file,MYF(0));
error: error:
pthread_mutex_unlock(&archive_mutex); pthread_mutex_unlock(&archive_mutex);
my_free((gptr) share, MYF(0)); my_free((gptr) share, MYF(0));
...@@ -179,10 +361,10 @@ error: ...@@ -179,10 +361,10 @@ error:
/* /*
Free lock controls. Free the share.
See ha_example.cc for a description. See ha_example.cc for a description.
*/ */
static int free_share(ARCHIVE_SHARE *share) int ha_archive::free_share(ARCHIVE_SHARE *share)
{ {
int rc= 0; int rc= 0;
pthread_mutex_lock(&archive_mutex); pthread_mutex_lock(&archive_mutex);
...@@ -190,7 +372,8 @@ static int free_share(ARCHIVE_SHARE *share) ...@@ -190,7 +372,8 @@ static int free_share(ARCHIVE_SHARE *share)
{ {
hash_delete(&archive_open_tables, (byte*) share); hash_delete(&archive_open_tables, (byte*) share);
thr_lock_delete(&share->lock); thr_lock_delete(&share->lock);
pthread_mutex_destroy(&share->mutex); VOID(pthread_mutex_destroy(&share->mutex));
(void)write_meta_file(share->meta_file, share->rows_recorded, FALSE);
if (gzclose(share->archive_write) == Z_ERRNO) if (gzclose(share->archive_write) == Z_ERRNO)
rc= 1; rc= 1;
my_free((gptr) share, MYF(0)); my_free((gptr) share, MYF(0));
...@@ -205,7 +388,7 @@ static int free_share(ARCHIVE_SHARE *share) ...@@ -205,7 +388,7 @@ static int free_share(ARCHIVE_SHARE *share)
We just implement one additional file extension. We just implement one additional file extension.
*/ */
const char **ha_archive::bas_ext() const const char **ha_archive::bas_ext() const
{ static const char *ext[]= { ARZ, ARN, NullS }; return ext; } { static const char *ext[]= { ARZ, ARN, ARM, NullS }; return ext; }
/* /*
...@@ -213,7 +396,6 @@ const char **ha_archive::bas_ext() const ...@@ -213,7 +396,6 @@ const char **ha_archive::bas_ext() const
Create/get our shared structure. Create/get our shared structure.
Init out lock. Init out lock.
We open the file we will read from. We open the file we will read from.
Set the size of ref_length.
*/ */
int ha_archive::open(const char *name, int mode, uint test_if_locked) int ha_archive::open(const char *name, int mode, uint test_if_locked)
{ {
...@@ -284,36 +466,58 @@ int ha_archive::close(void) ...@@ -284,36 +466,58 @@ int ha_archive::close(void)
int ha_archive::create(const char *name, TABLE *table_arg, int ha_archive::create(const char *name, TABLE *table_arg,
HA_CREATE_INFO *create_info) HA_CREATE_INFO *create_info)
{ {
File create_file; File create_file; // We use to create the datafile and the metafile
char name_buff[FN_REFLEN]; char name_buff[FN_REFLEN];
size_t written; size_t written;
int error; int error;
DBUG_ENTER("ha_archive::create"); DBUG_ENTER("ha_archive::create");
/*
Right now version for the meta file and the data file is the same.
*/
version= ARCHIVE_VERSION;
if ((create_file= my_create(fn_format(name_buff,name,"",ARM,
MY_REPLACE_EXT|MY_UNPACK_FILENAME),0,
O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
{
error= my_errno;
goto error;
}
write_meta_file(create_file, 0, FALSE);
my_close(create_file,MYF(0));
/*
We reuse name_buff since it is available.
*/
if ((create_file= my_create(fn_format(name_buff,name,"",ARZ, if ((create_file= my_create(fn_format(name_buff,name,"",ARZ,
MY_REPLACE_EXT|MY_UNPACK_FILENAME),0, MY_REPLACE_EXT|MY_UNPACK_FILENAME),0,
O_RDWR | O_TRUNC,MYF(MY_WME))) < 0) O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
{ {
error= my_errno; error= my_errno;
goto err; goto error;
} }
if ((archive= gzdopen(create_file, "ab")) == NULL) if ((archive= gzdopen(create_file, "ab")) == NULL)
{ {
error= errno; error= errno;
delete_table(name); delete_table(name);
goto err; goto error;
} }
version= ARCHIVE_VERSION; if (write_data_header(archive))
written= gzwrite(archive, &version, sizeof(version));
if (gzclose(archive) || written != sizeof(version))
{ {
error= errno; gzclose(archive);
delete_table(name); goto error2;
goto err;
} }
if (gzclose(archive))
goto error2;
DBUG_RETURN(0); DBUG_RETURN(0);
err: error2:
error= errno;
delete_table(name);
error:
/* Return error number, if we got one */ /* Return error number, if we got one */
DBUG_RETURN(error ? error : -1); DBUG_RETURN(error ? error : -1);
} }
...@@ -330,30 +534,41 @@ err: ...@@ -330,30 +534,41 @@ err:
*/ */
int ha_archive::write_row(byte * buf) int ha_archive::write_row(byte * buf)
{ {
char *pos;
z_off_t written; z_off_t written;
DBUG_ENTER("ha_archive::write_row"); DBUG_ENTER("ha_archive::write_row");
statistic_increment(ha_write_count,&LOCK_status); statistic_increment(ha_write_count,&LOCK_status);
if (table->timestamp_default_now) if (table->timestamp_default_now)
update_timestamp(buf+table->timestamp_default_now-1); update_timestamp(buf+table->timestamp_default_now-1);
pthread_mutex_lock(&share->mutex);
written= gzwrite(share->archive_write, buf, table->reclength); written= gzwrite(share->archive_write, buf, table->reclength);
share->dirty= TRUE; share->dirty= TRUE;
if (written != table->reclength) if (written != table->reclength)
DBUG_RETURN(errno ? errno : -1); goto error;
/*
We should probably mark the table as damagaged if the record is written
but the blob fails.
*/
for (Field_blob **field=table->blob_field ; *field ; field++) for (Field_blob **field=table->blob_field ; *field ; field++)
{ {
char *ptr; char *ptr;
uint32 size= (*field)->get_length(); uint32 size= (*field)->get_length();
(*field)->get_ptr(&ptr); if (size)
written= gzwrite(share->archive_write, ptr, (unsigned)size); {
if (written != size) (*field)->get_ptr(&ptr);
DBUG_RETURN(errno ? errno : -1); written= gzwrite(share->archive_write, ptr, (unsigned)size);
if (written != size)
goto error;
}
} }
share->rows_recorded++;
pthread_mutex_unlock(&share->mutex);
DBUG_RETURN(0); DBUG_RETURN(0);
error:
pthread_mutex_unlock(&share->mutex);
DBUG_RETURN(errno ? errno : -1);
} }
...@@ -369,37 +584,28 @@ int ha_archive::rnd_init(bool scan) ...@@ -369,37 +584,28 @@ int ha_archive::rnd_init(bool scan)
int read; // gzread() returns int, and we use this to check the header int read; // gzread() returns int, and we use this to check the header
/* We rewind the file so that we can read from the beginning if scan */ /* We rewind the file so that we can read from the beginning if scan */
if(scan) if (scan)
{ {
scan_rows= share->rows_recorded;
records= 0; records= 0;
if (gzrewind(archive))
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
}
/* /*
If dirty, we lock, and then reset/flush the data. If dirty, we lock, and then reset/flush the data.
I found that just calling gzflush() doesn't always work. I found that just calling gzflush() doesn't always work.
*/ */
if (share->dirty == TRUE)
{
pthread_mutex_lock(&share->mutex);
if (share->dirty == TRUE) if (share->dirty == TRUE)
{ {
gzflush(share->archive_write, Z_SYNC_FLUSH); pthread_mutex_lock(&share->mutex);
share->dirty= FALSE; if (share->dirty == TRUE)
{
gzflush(share->archive_write, Z_SYNC_FLUSH);
share->dirty= FALSE;
}
pthread_mutex_unlock(&share->mutex);
} }
pthread_mutex_unlock(&share->mutex);
} if (read_data_header(archive))
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
/*
At the moment we just check the size of version to make sure the header is
intact.
*/
if (scan)
{
read= gzread(archive, &version, sizeof(version));
if (read != sizeof(version))
DBUG_RETURN(errno ? errno : -1);
} }
DBUG_RETURN(0); DBUG_RETURN(0);
...@@ -410,14 +616,19 @@ int ha_archive::rnd_init(bool scan) ...@@ -410,14 +616,19 @@ int ha_archive::rnd_init(bool scan)
This is the method that is used to read a row. It assumes that the row is This is the method that is used to read a row. It assumes that the row is
positioned where you want it. positioned where you want it.
*/ */
int ha_archive::get_row(byte *buf) int ha_archive::get_row(gzFile file_to_read, byte *buf)
{ {
int read; // Bytes read, gzread() returns int int read; // Bytes read, gzread() returns int
char *last; char *last;
size_t total_blob_length= 0; size_t total_blob_length= 0;
Field_blob **field;
DBUG_ENTER("ha_archive::get_row"); DBUG_ENTER("ha_archive::get_row");
read= gzread(archive, buf, table->reclength); read= gzread(file_to_read, buf, table->reclength);
DBUG_PRINT("ha_archive::get_row", ("Read %d bytes", read));
if (read == Z_STREAM_ERROR)
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
/* If we read nothing we are at the end of the file */ /* If we read nothing we are at the end of the file */
if (read == 0) if (read == 0)
...@@ -428,7 +639,7 @@ int ha_archive::get_row(byte *buf) ...@@ -428,7 +639,7 @@ int ha_archive::get_row(byte *buf)
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
/* Calculate blob length, we use this for our buffer */ /* Calculate blob length, we use this for our buffer */
for (Field_blob **field=table->blob_field; *field ; field++) for (field=table->blob_field; *field ; field++)
total_blob_length += (*field)->get_length(); total_blob_length += (*field)->get_length();
/* Adjust our row buffer if we need be */ /* Adjust our row buffer if we need be */
...@@ -436,14 +647,17 @@ int ha_archive::get_row(byte *buf) ...@@ -436,14 +647,17 @@ int ha_archive::get_row(byte *buf)
last= (char *)buffer.ptr(); last= (char *)buffer.ptr();
/* Loop through our blobs and read them */ /* Loop through our blobs and read them */
for (Field_blob **field=table->blob_field; *field ; field++) for (field=table->blob_field; *field ; field++)
{ {
size_t size= (*field)->get_length(); size_t size= (*field)->get_length();
read= gzread(archive, last, size); if (size)
if ((size_t) read != size) {
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); read= gzread(file_to_read, last, size);
(*field)->set_ptr(size, last); if ((size_t) read != size)
last += size; DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
(*field)->set_ptr(size, last);
last += size;
}
} }
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -459,9 +673,15 @@ int ha_archive::rnd_next(byte *buf) ...@@ -459,9 +673,15 @@ int ha_archive::rnd_next(byte *buf)
int rc; int rc;
DBUG_ENTER("ha_archive::rnd_next"); DBUG_ENTER("ha_archive::rnd_next");
if (!scan_rows)
DBUG_RETURN(HA_ERR_END_OF_FILE);
scan_rows--;
statistic_increment(ha_read_rnd_next_count,&LOCK_status); statistic_increment(ha_read_rnd_next_count,&LOCK_status);
current_position= gztell(archive); current_position= gztell(archive);
rc= get_row(buf); rc= get_row(archive, buf);
if (rc != HA_ERR_END_OF_FILE) if (rc != HA_ERR_END_OF_FILE)
records++; records++;
...@@ -474,6 +694,7 @@ int ha_archive::rnd_next(byte *buf) ...@@ -474,6 +694,7 @@ int ha_archive::rnd_next(byte *buf)
each call to ha_archive::rnd_next() if an ordering of the rows is each call to ha_archive::rnd_next() if an ordering of the rows is
needed. needed.
*/ */
void ha_archive::position(const byte *record) void ha_archive::position(const byte *record)
{ {
DBUG_ENTER("ha_archive::position"); DBUG_ENTER("ha_archive::position");
...@@ -496,13 +717,70 @@ int ha_archive::rnd_pos(byte * buf, byte *pos) ...@@ -496,13 +717,70 @@ int ha_archive::rnd_pos(byte * buf, byte *pos)
current_position= ha_get_ptr(pos, ref_length); current_position= ha_get_ptr(pos, ref_length);
z_off_t seek= gzseek(archive, current_position, SEEK_SET); z_off_t seek= gzseek(archive, current_position, SEEK_SET);
DBUG_RETURN(get_row(buf)); DBUG_RETURN(get_row(archive, buf));
}
/*
This method rebuilds the meta file. It does this by walking the datafile and
rewriting the meta file.
*/
int ha_archive::rebuild_meta_file(char *table_name, File meta_file)
{
int rc;
byte *buf;
ulonglong rows_recorded= 0;
gzFile rebuild_file; /* Archive file we are working with */
char data_file_name[FN_REFLEN];
DBUG_ENTER("ha_archive::rebuild_meta_file");
/*
Open up the meta file to recreate it.
*/
fn_format(data_file_name, table_name, "", ARZ,
MY_REPLACE_EXT|MY_UNPACK_FILENAME);
if ((rebuild_file= gzopen(data_file_name, "rb")) == NULL)
DBUG_RETURN(errno ? errno : -1);
if (rc= read_data_header(rebuild_file))
goto error;
/*
We malloc up the buffer we will use for counting the rows.
I know, this malloc'ing memory but this should be a very
rare event.
*/
if (!(buf= (byte*) my_malloc(table->rec_buff_length > sizeof(ulonglong) +1 ?
table->rec_buff_length : sizeof(ulonglong) +1 ,
MYF(MY_WME))))
{
rc= HA_ERR_CRASHED_ON_USAGE;
goto error;
}
while (!(rc= get_row(rebuild_file, buf)))
rows_recorded++;
/*
Only if we reach the end of the file do we assume we can rewrite.
At this point we reset rc to a non-message state.
*/
if (rc == HA_ERR_END_OF_FILE)
{
(void)write_meta_file(meta_file, rows_recorded, FALSE);
rc= 0;
}
my_free((gptr) buf, MYF(0));
error:
gzclose(rebuild_file);
DBUG_RETURN(rc);
} }
/* /*
The table can become fragmented if data was inserted, read, and then The table can become fragmented if data was inserted, read, and then
inserted again. What we do is open up the file and recompress it completely. inserted again. What we do is open up the file and recompress it completely.
*/ */
int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt) int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
{ {
DBUG_ENTER("ha_archive::optimize"); DBUG_ENTER("ha_archive::optimize");
...@@ -512,7 +790,8 @@ int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt) ...@@ -512,7 +790,8 @@ int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
char writer_filename[FN_REFLEN]; char writer_filename[FN_REFLEN];
/* Lets create a file to contain the new data */ /* Lets create a file to contain the new data */
fn_format(writer_filename,share->table_name,"",ARN, MY_REPLACE_EXT|MY_UNPACK_FILENAME); fn_format(writer_filename, share->table_name, "", ARN,
MY_REPLACE_EXT|MY_UNPACK_FILENAME);
/* Closing will cause all data waiting to be flushed, to be flushed */ /* Closing will cause all data waiting to be flushed, to be flushed */
gzclose(share->archive_write); gzclose(share->archive_write);
...@@ -547,6 +826,59 @@ int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt) ...@@ -547,6 +826,59 @@ int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
DBUG_RETURN(0); DBUG_RETURN(0);
} }
/*
No transactions yet, so this is pretty dull.
*/
int ha_archive::external_lock(THD *thd, int lock_type)
{
DBUG_ENTER("ha_archive::external_lock");
DBUG_RETURN(0);
}
/*
Below is an example of how to setup row level locking.
*/
THR_LOCK_DATA **ha_archive::store_lock(THD *thd,
THR_LOCK_DATA **to,
enum thr_lock_type lock_type)
{
if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK) {
/*
Here is where we get into the guts of a row level lock.
If TL_UNLOCK is set
If we are not doing a LOCK TABLE or DISCARD/IMPORT
TABLESPACE, then allow multiple writers
*/
if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
lock_type <= TL_WRITE) && !thd->in_lock_tables
&& !thd->tablespace_op) {
lock_type = TL_WRITE_ALLOW_WRITE;
}
/*
In queries of type INSERT INTO t1 SELECT ... FROM t2 ...
MySQL would use the lock TL_READ_NO_INSERT on t2, and that
would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts
to t2. Convert the lock to a normal read lock to allow
concurrent inserts to t2.
*/
if (lock_type == TL_READ_NO_INSERT && !thd->in_lock_tables) {
lock_type = TL_READ;
}
lock.type=lock_type;
}
*to++= &lock;
return to;
}
/****************************************************************************** /******************************************************************************
Everything below here is default, please look at ha_example.cc for Everything below here is default, please look at ha_example.cc for
...@@ -616,8 +948,8 @@ void ha_archive::info(uint flag) ...@@ -616,8 +948,8 @@ void ha_archive::info(uint flag)
DBUG_ENTER("ha_archive::info"); DBUG_ENTER("ha_archive::info");
/* This is a lie, but you don't want the optimizer to see zero or 1 */ /* This is a lie, but you don't want the optimizer to see zero or 1 */
if (records < 2) records= share->rows_recorded;
records= 2; deleted= 0;
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -634,23 +966,6 @@ int ha_archive::reset(void) ...@@ -634,23 +966,6 @@ int ha_archive::reset(void)
DBUG_RETURN(0); DBUG_RETURN(0);
} }
int ha_archive::external_lock(THD *thd, int lock_type)
{
DBUG_ENTER("ha_archive::external_lock");
DBUG_RETURN(0);
}
THR_LOCK_DATA **ha_archive::store_lock(THD *thd,
THR_LOCK_DATA **to,
enum thr_lock_type lock_type)
{
if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
lock.type=lock_type;
*to++= &lock;
return to;
}
ha_rows ha_archive::records_in_range(uint inx, key_range *min_key, ha_rows ha_archive::records_in_range(uint inx, key_range *min_key,
key_range *max_key) key_range *max_key)
{ {
......
...@@ -32,8 +32,10 @@ typedef struct st_archive_share { ...@@ -32,8 +32,10 @@ typedef struct st_archive_share {
uint table_name_length,use_count; uint table_name_length,use_count;
pthread_mutex_t mutex; pthread_mutex_t mutex;
THR_LOCK lock; THR_LOCK lock;
File meta_file; /* Meta file we use */
gzFile archive_write; /* Archive file we are working with */ gzFile archive_write; /* Archive file we are working with */
bool dirty; /* Flag for if a flush should occur */ bool dirty; /* Flag for if a flush should occur */
ulonglong rows_recorded; /* Number of rows in tables */
} ARCHIVE_SHARE; } ARCHIVE_SHARE;
/* /*
...@@ -50,7 +52,8 @@ class ha_archive: public handler ...@@ -50,7 +52,8 @@ class ha_archive: public handler
z_off_t current_position; /* The position of the row we just read */ z_off_t current_position; /* The position of the row we just read */
byte byte_buffer[IO_SIZE]; /* Initial buffer for our string */ byte byte_buffer[IO_SIZE]; /* Initial buffer for our string */
String buffer; /* Buffer used for blob storage */ String buffer; /* Buffer used for blob storage */
unsigned int version; /* Used for recording version */ uint version; /* Used for recording version */
ulonglong scan_rows; /* Number of rows left in scan */
public: public:
ha_archive(TABLE *table): handler(table) ha_archive(TABLE *table): handler(table)
...@@ -104,7 +107,14 @@ public: ...@@ -104,7 +107,14 @@ public:
int rnd_init(bool scan=1); int rnd_init(bool scan=1);
int rnd_next(byte *buf); int rnd_next(byte *buf);
int rnd_pos(byte * buf, byte *pos); int rnd_pos(byte * buf, byte *pos);
int get_row(byte *buf); int get_row(gzFile file_to_read, byte *buf);
int read_meta_file(File meta_file, ulonglong *rows);
int write_meta_file(File meta_file, ulonglong rows, bool dirty);
ARCHIVE_SHARE *get_share(const char *table_name, TABLE *table);
int free_share(ARCHIVE_SHARE *share);
int rebuild_meta_file(char *table_name, File meta_file);
int read_data_header(gzFile file_to_read);
int write_data_header(gzFile file_to_write);
void position(const byte *record); void position(const byte *record);
void info(uint); void info(uint);
int extra(enum ha_extra_function operation); int extra(enum ha_extra_function operation);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment