Commit e8d9458f authored by petr@mysql.com's avatar petr@mysql.com

WL#3244 "CSV engine: convert mmap to read/write calls"

parent 593daae1
...@@ -4944,10 +4944,10 @@ val ...@@ -4944,10 +4944,10 @@ val
UPDATE bug13894 SET val=6 WHERE val=10; UPDATE bug13894 SET val=6 WHERE val=10;
SELECT * FROM bug13894; SELECT * FROM bug13894;
val val
5
11
6 6
6 6
5
11
DROP TABLE bug13894; DROP TABLE bug13894;
DROP TABLE IF EXISTS bug14672; DROP TABLE IF EXISTS bug14672;
CREATE TABLE bug14672 (c1 integer) engine = CSV; CREATE TABLE bug14672 (c1 integer) engine = CSV;
......
...@@ -61,7 +61,7 @@ ...@@ -61,7 +61,7 @@
/* The file extension */ /* The file extension */
#define CSV_EXT ".CSV" // The data file #define CSV_EXT ".CSV" // The data file
#define CSN_EXT ".CSN" // Files used during repair #define CSN_EXT ".CSN" // Files used during repair and update
#define CSM_EXT ".CSM" // Meta file #define CSM_EXT ".CSM" // Meta file
...@@ -114,9 +114,59 @@ handlerton tina_hton= { ...@@ -114,9 +114,59 @@ handlerton tina_hton= {
NULL, /* Fill FILES Table */ NULL, /* Fill FILES Table */
HTON_CAN_RECREATE, HTON_CAN_RECREATE,
NULL, /* binlog_func */ NULL, /* binlog_func */
NULL /* binlog_log_query */ NULL, /* binlog_log_query */
NULL /* release_temporary_latches */
}; };
off_t Transparent_file::read_next()
{
off_t bytes_read;
/*
No need to seek here, as the file managed by Transparent_file class
always points to upper_bound byte
*/
if ((bytes_read= my_read(filedes, buff, buff_size, MYF(0))) == MY_FILE_ERROR)
return -1;
/* end of file */
if (!bytes_read)
return -1;
lower_bound= upper_bound;
upper_bound+= bytes_read;
return lower_bound;
}
char Transparent_file::get_value(off_t offset)
{
off_t bytes_read;
/* check boundaries */
if ((lower_bound <= offset) && (offset < upper_bound))
return buff[offset - lower_bound];
else
{
VOID(my_seek(filedes, offset, MY_SEEK_SET, MYF(0)));
/* read appropriate portion of the file */
if ((bytes_read= my_read(filedes, buff, buff_size,
MYF(0))) == MY_FILE_ERROR)
return 0;
lower_bound= offset;
upper_bound= lower_bound + bytes_read;
/* end of file */
if (upper_bound == offset)
return 0;
return buff[0];
}
}
/***************************************************************************** /*****************************************************************************
** TINA tables ** TINA tables
*****************************************************************************/ *****************************************************************************/
...@@ -130,7 +180,7 @@ int sort_set (tina_set *a, tina_set *b) ...@@ -130,7 +180,7 @@ int sort_set (tina_set *a, tina_set *b)
We assume that intervals do not intersect. So, it is enought to compare We assume that intervals do not intersect. So, it is enought to compare
any two points. Here we take start of intervals for comparison. any two points. Here we take start of intervals for comparison.
*/ */
return ( a->begin > b->begin ? -1 : ( a->begin < b->begin ? 1 : 0 ) ); return ( a->begin > b->begin ? 1 : ( a->begin < b->begin ? -1 : 0 ) );
} }
static byte* tina_get_key(TINA_SHARE *share,uint *length, static byte* tina_get_key(TINA_SHARE *share,uint *length,
...@@ -140,48 +190,6 @@ static byte* tina_get_key(TINA_SHARE *share,uint *length, ...@@ -140,48 +190,6 @@ static byte* tina_get_key(TINA_SHARE *share,uint *length,
return (byte*) share->table_name; return (byte*) share->table_name;
} }
/*
Reloads the mmap file.
*/
int get_mmap(TINA_SHARE *share, int write)
{
DBUG_ENTER("ha_tina::get_mmap");
if (share->mapped_file && my_munmap(share->mapped_file,
share->file_stat.st_size))
DBUG_RETURN(1);
if (my_fstat(share->data_file, &share->file_stat, MYF(MY_WME)) == -1)
DBUG_RETURN(1);
if (share->file_stat.st_size)
{
if (write)
share->mapped_file= (byte *)my_mmap(NULL, share->file_stat.st_size,
PROT_READ|PROT_WRITE, MAP_SHARED,
share->data_file, 0);
else
share->mapped_file= (byte *)my_mmap(NULL, share->file_stat.st_size,
PROT_READ, MAP_PRIVATE,
share->data_file, 0);
if ((share->mapped_file == MAP_FAILED))
{
/*
Bad idea you think? See the problem is that nothing actually checks
the return value of ::rnd_init(), so tossing an error is about
it for us.
Never going to happen right? :)
*/
my_message(errno, "Woops, blew up opening a mapped file", 0);
DBUG_ASSERT(0);
DBUG_RETURN(1);
}
}
else
share->mapped_file= NULL;
DBUG_RETURN(0);
}
static int tina_init_func() static int tina_init_func()
{ {
...@@ -218,6 +226,7 @@ static TINA_SHARE *get_share(const char *table_name, TABLE *table) ...@@ -218,6 +226,7 @@ static TINA_SHARE *get_share(const char *table_name, TABLE *table)
{ {
TINA_SHARE *share; TINA_SHARE *share;
char meta_file_name[FN_REFLEN]; char meta_file_name[FN_REFLEN];
MY_STAT file_stat; /* Stat information for the data file */
char *tmp_name; char *tmp_name;
uint length; uint length;
...@@ -250,6 +259,8 @@ static TINA_SHARE *get_share(const char *table_name, TABLE *table) ...@@ -250,6 +259,8 @@ static TINA_SHARE *get_share(const char *table_name, TABLE *table)
share->table_name= tmp_name; share->table_name= tmp_name;
share->crashed= FALSE; share->crashed= FALSE;
share->rows_recorded= 0; share->rows_recorded= 0;
share->update_file_opened= FALSE;
share->tina_write_opened= FALSE;
strmov(share->table_name, table_name); strmov(share->table_name, table_name);
fn_format(share->data_file_name, table_name, "", CSV_EXT, fn_format(share->data_file_name, table_name, "", CSV_EXT,
MY_REPLACE_EXT|MY_UNPACK_FILENAME); MY_REPLACE_EXT|MY_UNPACK_FILENAME);
...@@ -277,27 +288,16 @@ static TINA_SHARE *get_share(const char *table_name, TABLE *table) ...@@ -277,27 +288,16 @@ static TINA_SHARE *get_share(const char *table_name, TABLE *table)
*/ */
if (read_meta_file(share->meta_file, &share->rows_recorded)) if (read_meta_file(share->meta_file, &share->rows_recorded))
share->crashed= TRUE; share->crashed= TRUE;
else
(void)write_meta_file(share->meta_file, share->rows_recorded, TRUE);
if ((share->data_file= my_open(share->data_file_name, O_RDWR|O_APPEND, if (my_stat(share->data_file_name, &file_stat, MYF(MY_WME)) == NULL)
MYF(0))) == -1)
goto error2; goto error2;
share->saved_data_file_length= file_stat.st_size;
share->mapped_file= NULL; // We don't know the state as we just allocated it
if (get_mmap(share, 0) > 0)
goto error3;
/* init file length value used by readers */
share->saved_data_file_length= share->file_stat.st_size;
} }
share->use_count++; share->use_count++;
pthread_mutex_unlock(&tina_mutex); pthread_mutex_unlock(&tina_mutex);
return share; return share;
error3:
my_close(share->data_file,MYF(0));
error2: error2:
thr_lock_delete(&share->lock); thr_lock_delete(&share->lock);
pthread_mutex_destroy(&share->mutex); pthread_mutex_destroy(&share->mutex);
...@@ -425,6 +425,30 @@ bool ha_tina::check_and_repair(THD *thd) ...@@ -425,6 +425,30 @@ bool ha_tina::check_and_repair(THD *thd)
} }
int ha_tina::init_tina_writer()
{
DBUG_ENTER("ha_tina::init_tina_writer");
/*
Mark the file as crashed. We will set the flag back when we close
the file. In the case of the crash it will remain marked crashed,
which enforce recovery.
*/
(void)write_meta_file(share->meta_file, share->rows_recorded, TRUE);
if ((share->tina_write_filedes=
my_open(share->data_file_name, O_RDWR|O_APPEND, MYF(0))) == -1)
{
DBUG_PRINT("info", ("Could not open tina file writes"));
share->crashed= TRUE;
DBUG_RETURN(1);
}
share->tina_write_opened= TRUE;
DBUG_RETURN(0);
}
bool ha_tina::is_crashed() const bool ha_tina::is_crashed() const
{ {
DBUG_ENTER("ha_tina::is_crashed"); DBUG_ENTER("ha_tina::is_crashed");
...@@ -445,10 +469,13 @@ static int free_share(TINA_SHARE *share) ...@@ -445,10 +469,13 @@ static int free_share(TINA_SHARE *share)
share->crashed ? TRUE :FALSE); share->crashed ? TRUE :FALSE);
if (my_close(share->meta_file, MYF(0))) if (my_close(share->meta_file, MYF(0)))
result_code= 1; result_code= 1;
if (share->mapped_file) if (share->tina_write_opened)
my_munmap(share->mapped_file, share->file_stat.st_size); {
share->mapped_file= NULL; if (my_close(share->tina_write_filedes, MYF(0)))
result_code= my_close(share->data_file,MYF(0)); result_code= 1;
share->tina_write_opened= FALSE;
}
hash_delete(&tina_open_tables, (byte*) share); hash_delete(&tina_open_tables, (byte*) share);
thr_lock_delete(&share->lock); thr_lock_delete(&share->lock);
pthread_mutex_destroy(&share->mutex); pthread_mutex_destroy(&share->mutex);
...@@ -468,16 +495,16 @@ int tina_end(ha_panic_function type) ...@@ -468,16 +495,16 @@ int tina_end(ha_panic_function type)
Finds the end of a line. Finds the end of a line.
Currently only supports files written on a UNIX OS. Currently only supports files written on a UNIX OS.
*/ */
byte * find_eoln(byte *data, off_t begin, off_t end)
off_t find_eoln_buff(Transparent_file *data_buff, off_t begin, off_t end)
{ {
for (off_t x= begin; x < end; x++) for (off_t x= begin; x < end; x++)
if (data[x] == '\n') if (data_buff->get_value(x) == '\n')
return data + x; return x;
return 0; return 0;
} }
static handler *tina_create_handler(TABLE_SHARE *table) static handler *tina_create_handler(TABLE_SHARE *table)
{ {
return new ha_tina(table); return new ha_tina(table);
...@@ -491,7 +518,7 @@ ha_tina::ha_tina(TABLE_SHARE *table_arg) ...@@ -491,7 +518,7 @@ ha_tina::ha_tina(TABLE_SHARE *table_arg)
They are not probably completely right. They are not probably completely right.
*/ */
current_position(0), next_position(0), local_saved_data_file_length(0), current_position(0), next_position(0), local_saved_data_file_length(0),
chain_alloced(0), chain_size(DEFAULT_CHAIN_LENGTH), file_buff(0), chain_alloced(0), chain_size(DEFAULT_CHAIN_LENGTH),
records_is_known(0) records_is_known(0)
{ {
/* Set our original buffers from pre-allocated memory */ /* Set our original buffers from pre-allocated memory */
...@@ -619,50 +646,50 @@ int ha_tina::chain_append() ...@@ -619,50 +646,50 @@ int ha_tina::chain_append()
*/ */
int ha_tina::find_current_row(byte *buf) int ha_tina::find_current_row(byte *buf)
{ {
byte *mapped_ptr; off_t end_offset, curr_offset= current_position;
byte *end_ptr;
DBUG_ENTER("ha_tina::find_current_row"); DBUG_ENTER("ha_tina::find_current_row");
mapped_ptr= (byte *)share->mapped_file + current_position;
/* /*
We do not read further then local_saved_data_file_length in order We do not read further then local_saved_data_file_length in order
not to conflict with undergoing concurrent insert. not to conflict with undergoing concurrent insert.
*/ */
if ((end_ptr= find_eoln(share->mapped_file, current_position, if ((end_offset= find_eoln_buff(file_buff, current_position,
local_saved_data_file_length)) == 0) local_saved_data_file_length)) == 0)
DBUG_RETURN(HA_ERR_END_OF_FILE); DBUG_RETURN(HA_ERR_END_OF_FILE);
for (Field **field=table->field ; *field ; field++) for (Field **field=table->field ; *field ; field++)
{ {
buffer.length(0); buffer.length(0);
if (*mapped_ptr == '"') if (file_buff->get_value(curr_offset) == '"')
mapped_ptr++; // Increment past the first quote curr_offset++; // Incrementpast the first quote
else else
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
for(;mapped_ptr != end_ptr; mapped_ptr++) for(;curr_offset != end_offset; curr_offset++)
{ {
// Need to convert line feeds! // Need to convert line feeds!
if (*mapped_ptr == '"' && if (file_buff->get_value(curr_offset) == '"' &&
(((mapped_ptr[1] == ',') && (mapped_ptr[2] == '"')) || (((file_buff->get_value(curr_offset + 1) == ',') &&
(mapped_ptr == end_ptr -1 ))) (file_buff->get_value(curr_offset + 2) == '"')) ||
(curr_offset == end_offset -1 )))
{ {
mapped_ptr += 2; // Move past the , and the " curr_offset+= 2; // Move past the , and the "
break; break;
} }
if (*mapped_ptr == '\\' && mapped_ptr != (end_ptr - 1)) if (file_buff->get_value(curr_offset) == '\\' &&
curr_offset != (end_offset - 1))
{ {
mapped_ptr++; curr_offset++;
if (*mapped_ptr == 'r') if (file_buff->get_value(curr_offset) == 'r')
buffer.append('\r'); buffer.append('\r');
else if (*mapped_ptr == 'n' ) else if (file_buff->get_value(curr_offset) == 'n' )
buffer.append('\n'); buffer.append('\n');
else if ((*mapped_ptr == '\\') || (*mapped_ptr == '"')) else if ((file_buff->get_value(curr_offset) == '\\') ||
buffer.append(*mapped_ptr); (file_buff->get_value(curr_offset) == '"'))
buffer.append(file_buff->get_value(curr_offset));
else /* This could only happed with an externally created file */ else /* This could only happed with an externally created file */
{ {
buffer.append('\\'); buffer.append('\\');
buffer.append(*mapped_ptr); buffer.append(file_buff->get_value(curr_offset));
} }
} }
else // ordinary symbol else // ordinary symbol
...@@ -671,14 +698,14 @@ int ha_tina::find_current_row(byte *buf) ...@@ -671,14 +698,14 @@ int ha_tina::find_current_row(byte *buf)
We are at final symbol and no last quote was found => We are at final symbol and no last quote was found =>
we are working with a damaged file. we are working with a damaged file.
*/ */
if (mapped_ptr == end_ptr -1) if (curr_offset == end_offset - 1)
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
buffer.append(*mapped_ptr); buffer.append(file_buff->get_value(curr_offset));
} }
} }
(*field)->store(buffer.ptr(), buffer.length(), system_charset_info); (*field)->store(buffer.ptr(), buffer.length(), system_charset_info);
} }
next_position= (end_ptr - share->mapped_file)+1; next_position= end_offset + 1;
/* Maybe use \N for null? */ /* Maybe use \N for null? */
memset(buf, 0, table->s->null_bytes); /* We do not implement nulls! */ memset(buf, 0, table->s->null_bytes); /* We do not implement nulls! */
...@@ -776,7 +803,7 @@ void ha_tina::get_status() ...@@ -776,7 +803,7 @@ void ha_tina::get_status()
void ha_tina::update_status() void ha_tina::update_status()
{ {
/* correct local_saved_data_file_length for writers */ /* correct local_saved_data_file_length for writers */
share->saved_data_file_length= share->file_stat.st_size; share->saved_data_file_length= local_saved_data_file_length;
} }
...@@ -827,6 +854,11 @@ int ha_tina::open(const char *name, int mode, uint open_options) ...@@ -827,6 +854,11 @@ int ha_tina::open(const char *name, int mode, uint open_options)
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
} }
if ((data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1)
DBUG_RETURN(0);
file_buff= new Transparent_file(data_file);
/* /*
Init locking. Pass handler object to the locking routines, Init locking. Pass handler object to the locking routines,
so that they could save/update local_saved_data_file_length value so that they could save/update local_saved_data_file_length value
...@@ -845,12 +877,14 @@ int ha_tina::open(const char *name, int mode, uint open_options) ...@@ -845,12 +877,14 @@ int ha_tina::open(const char *name, int mode, uint open_options)
/* /*
Close a database file. We remove ourselves from the shared strucutre. Close a database file. We remove ourselves from the shared strucutre.
If it is empty we destroy it and free the mapped file. If it is empty we destroy it.
*/ */
int ha_tina::close(void) int ha_tina::close(void)
{ {
int rc= 0;
DBUG_ENTER("ha_tina::close"); DBUG_ENTER("ha_tina::close");
DBUG_RETURN(free_share(share)); rc= my_close(data_file, MYF(0));
DBUG_RETURN(free_share(share) || rc);
} }
/* /*
...@@ -873,22 +907,17 @@ int ha_tina::write_row(byte * buf) ...@@ -873,22 +907,17 @@ int ha_tina::write_row(byte * buf)
size= encode_quote(buf); size= encode_quote(buf);
if (my_write(share->data_file, (byte*)buffer.ptr(), size, if (!share->tina_write_opened)
MYF(MY_WME | MY_NABP))) if (init_tina_writer())
DBUG_RETURN(-1); DBUG_RETURN(-1);
/* /* use pwrite, as concurrent reader could have changed the position */
Ok, this is means that we will be doing potentially bad things if (my_write(share->tina_write_filedes, buffer.ptr(), size,
during a bulk insert on some OS'es. What we need is a cleanup MYF(MY_WME | MY_NABP)))
call for ::write_row that would let us fix up everything after the bulk
insert. The archive handler does this with an extra mutx call, which
might be a solution for this.
*/
if (get_mmap(share, 0) > 0)
DBUG_RETURN(-1); DBUG_RETURN(-1);
/* update local copy of the max position to see our own changes */ /* update local copy of the max position to see our own changes */
local_saved_data_file_length= share->file_stat.st_size; local_saved_data_file_length+= size;
/* update shared info */ /* update shared info */
pthread_mutex_lock(&share->mutex); pthread_mutex_lock(&share->mutex);
...@@ -903,6 +932,23 @@ int ha_tina::write_row(byte * buf) ...@@ -903,6 +932,23 @@ int ha_tina::write_row(byte * buf)
} }
int ha_tina::open_update_temp_file_if_needed()
{
char updated_fname[FN_REFLEN];
if (!share->update_file_opened)
{
if ((update_temp_file=
my_create(fn_format(updated_fname, share->table_name,
"", CSN_EXT,
MY_REPLACE_EXT | MY_UNPACK_FILENAME),
0, O_RDWR | O_TRUNC, MYF(MY_WME))) < 0)
return 1;
share->update_file_opened= TRUE;
}
return 0;
}
/* /*
This is called for an update. This is called for an update.
Make sure you put in code to increment the auto increment, also Make sure you put in code to increment the auto increment, also
...@@ -926,16 +972,16 @@ int ha_tina::update_row(const byte * old_data, byte * new_data) ...@@ -926,16 +972,16 @@ int ha_tina::update_row(const byte * old_data, byte * new_data)
if (chain_append()) if (chain_append())
DBUG_RETURN(-1); DBUG_RETURN(-1);
if (my_write(share->data_file, (byte*)buffer.ptr(), size, if (open_update_temp_file_if_needed())
DBUG_RETURN(-1);
if (my_write(update_temp_file, (byte*)buffer.ptr(), size,
MYF(MY_WME | MY_NABP))) MYF(MY_WME | MY_NABP)))
DBUG_RETURN(-1); DBUG_RETURN(-1);
/* UPDATE should never happen on the log tables */ /* UPDATE should never happen on the log tables */
DBUG_ASSERT(!share->is_log_table); DBUG_ASSERT(!share->is_log_table);
/* update local copy of the max position to see our own changes */
local_saved_data_file_length= share->file_stat.st_size;
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -1001,6 +1047,8 @@ int ha_tina::rnd_init(bool scan) ...@@ -1001,6 +1047,8 @@ int ha_tina::rnd_init(bool scan)
{ {
DBUG_ENTER("ha_tina::rnd_init"); DBUG_ENTER("ha_tina::rnd_init");
/* set buffer to the beginning of the file */
file_buff->init_buff(data_file);
if (share->crashed) if (share->crashed)
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
...@@ -1008,11 +1056,6 @@ int ha_tina::rnd_init(bool scan) ...@@ -1008,11 +1056,6 @@ int ha_tina::rnd_init(bool scan)
records= 0; records= 0;
records_is_known= 0; records_is_known= 0;
chain_ptr= chain; chain_ptr= chain;
#ifdef HAVE_MADVISE
if (scan)
(void) madvise(share->mapped_file, share->file_stat.st_size,
MADV_SEQUENTIAL);
#endif
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -1042,8 +1085,11 @@ int ha_tina::rnd_next(byte *buf) ...@@ -1042,8 +1085,11 @@ int ha_tina::rnd_next(byte *buf)
ha_statistic_increment(&SSV::ha_read_rnd_next_count); ha_statistic_increment(&SSV::ha_read_rnd_next_count);
current_position= next_position; current_position= next_position;
if (!share->mapped_file)
/* don't scan an empty file */
if (!local_saved_data_file_length)
DBUG_RETURN(HA_ERR_END_OF_FILE); DBUG_RETURN(HA_ERR_END_OF_FILE);
if ((rc= find_current_row(buf))) if ((rc= find_current_row(buf)))
DBUG_RETURN(rc); DBUG_RETURN(rc);
...@@ -1112,6 +1158,22 @@ int ha_tina::extra(enum ha_extra_function operation) ...@@ -1112,6 +1158,22 @@ int ha_tina::extra(enum ha_extra_function operation)
DBUG_RETURN(0); DBUG_RETURN(0);
} }
/*
Set end_pos to the last valid byte of continuous area, closest
to the given "hole", stored in the buffer. "Valid" here means,
not listed in the chain of deleted records ("holes").
*/
bool ha_tina::get_write_pos(off_t *end_pos, tina_set *closest_hole)
{
if (closest_hole == chain_ptr) /* no more chains */
*end_pos= file_buff->end();
else
*end_pos= min(file_buff->end(),
closest_hole->begin);
return (closest_hole != chain_ptr) && (*end_pos == closest_hole->begin);
}
/* /*
Called after each table scan. In particular after deletes, Called after each table scan. In particular after deletes,
and updates. In the last case we employ chain of deleted and updates. In the last case we employ chain of deleted
...@@ -1120,53 +1182,99 @@ int ha_tina::extra(enum ha_extra_function operation) ...@@ -1120,53 +1182,99 @@ int ha_tina::extra(enum ha_extra_function operation)
*/ */
int ha_tina::rnd_end() int ha_tina::rnd_end()
{ {
char updated_fname[FN_REFLEN];
off_t file_buffer_start= 0;
DBUG_ENTER("ha_tina::rnd_end"); DBUG_ENTER("ha_tina::rnd_end");
records_is_known= 1; records_is_known= 1;
/* First position will be truncate position, second will be increment */
if ((chain_ptr - chain) > 0) if ((chain_ptr - chain) > 0)
{ {
tina_set *ptr; tina_set *ptr= chain;
size_t length;
/* /*
Setting up writable map, this will contain all of the data after the Re-read the beginning of a file (as the buffer should point to the
get_mmap call that we have added to the file. end of file after the scan).
*/ */
if (get_mmap(share, 1) > 0) file_buff->init_buff(data_file);
DBUG_RETURN(-1);
length= share->file_stat.st_size;
/* /*
The sort handles updates/deletes with random orders. The sort is needed when there were updates/deletes with random orders.
It also sorts so that we move the final blocks to the It sorts so that we move the firts blocks to the beginning.
beginning so that we move the smallest amount of data possible.
*/ */
qsort(chain, (size_t)(chain_ptr - chain), sizeof(tina_set), qsort(chain, (size_t)(chain_ptr - chain), sizeof(tina_set),
(qsort_cmp)sort_set); (qsort_cmp)sort_set);
for (ptr= chain; ptr < chain_ptr; ptr++)
off_t write_begin= 0, write_end;
/* create the file to write updated table if it wasn't yet created */
if (open_update_temp_file_if_needed())
DBUG_RETURN(-1);
/* write the file with updated info */
while ((file_buffer_start != -1)) // while not end of file
{ {
memmove(share->mapped_file + ptr->begin, share->mapped_file + ptr->end, bool in_hole= get_write_pos(&write_end, ptr);
length - (size_t)ptr->end);
length= length - (size_t)(ptr->end - ptr->begin); /* if there is something to write, write it */
if ((write_end - write_begin) &&
(my_write(update_temp_file,
file_buff->ptr() + (write_begin - file_buff->start()),
write_end - write_begin, MYF_RW)))
goto error;
if (in_hole)
{
/* skip hole */
while (file_buff->end() <= ptr->end && file_buffer_start != -1)
file_buffer_start= file_buff->read_next();
write_begin= ptr->end;
ptr++;
} }
else
write_begin= write_end;
if (write_end == file_buff->end())
file_buffer_start= file_buff->read_next(); /* shift the buffer */
}
if (my_close(update_temp_file, MYF(0)))
DBUG_RETURN(-1);
share->update_file_opened= FALSE;
/* Unmap the file before the new size is set */ if (share->tina_write_opened)
if (my_munmap(share->mapped_file, share->file_stat.st_size)) {
if (my_close(share->tina_write_filedes, MYF(0)))
DBUG_RETURN(-1); DBUG_RETURN(-1);
/* We set it to null so that get_mmap() won't try to unmap it */ /*
share->mapped_file= NULL; Mark that the writer fd is closed, so that init_tina_writer()
will reopen it later.
*/
share->tina_write_opened= FALSE;
}
/* Set the file to the new size */ /*
if (my_chsize(share->data_file, length, 0, MYF(MY_WME))) Close opened fildes's. Then move updated file in place
of the old datafile.
*/
if (my_close(data_file, MYF(0)) ||
my_rename(fn_format(updated_fname, share->table_name, "", CSN_EXT,
MY_REPLACE_EXT | MY_UNPACK_FILENAME),
share->data_file_name, MYF(0)))
DBUG_RETURN(-1); DBUG_RETURN(-1);
if (get_mmap(share, 0) > 0) /* Open the file again and sync it */
if (((data_file= my_open(share->data_file_name, O_RDONLY, MYF(0))) == -1)
|| my_sync(data_file, MYF(MY_WME)))
DBUG_RETURN(-1); DBUG_RETURN(-1);
} }
DBUG_RETURN(0); DBUG_RETURN(0);
error:
my_close(update_temp_file, MYF(0));
share->update_file_opened= FALSE;
DBUG_RETURN(-1);
} }
...@@ -1195,10 +1303,11 @@ int ha_tina::repair(THD* thd, HA_CHECK_OPT* check_opt) ...@@ -1195,10 +1303,11 @@ int ha_tina::repair(THD* thd, HA_CHECK_OPT* check_opt)
File repair_file; File repair_file;
int rc; int rc;
ha_rows rows_repaired= 0; ha_rows rows_repaired= 0;
off_t write_begin= 0, write_end;
DBUG_ENTER("ha_tina::repair"); DBUG_ENTER("ha_tina::repair");
/* empty file */ /* empty file */
if (!share->mapped_file) if (!share->saved_data_file_length)
{ {
share->rows_recorded= 0; share->rows_recorded= 0;
goto end; goto end;
...@@ -1207,12 +1316,15 @@ int ha_tina::repair(THD* thd, HA_CHECK_OPT* check_opt) ...@@ -1207,12 +1316,15 @@ int ha_tina::repair(THD* thd, HA_CHECK_OPT* check_opt)
if (!(buf= (byte*) my_malloc(table->s->reclength, MYF(MY_WME)))) if (!(buf= (byte*) my_malloc(table->s->reclength, MYF(MY_WME))))
DBUG_RETURN(HA_ERR_OUT_OF_MEM); DBUG_RETURN(HA_ERR_OUT_OF_MEM);
/* position buffer to the start of the file */
file_buff->init_buff(data_file);
/* /*
Local_saved_data_file_length is initialized during the lock phase. Local_saved_data_file_length is initialized during the lock phase.
Sometimes this is not getting executed before ::repair (e.g. for Sometimes this is not getting executed before ::repair (e.g. for
the log tables). We set it manually here. the log tables). We set it manually here.
*/ */
local_saved_data_file_length= share->file_stat.st_size; local_saved_data_file_length= share->saved_data_file_length;
/* set current position to the beginning of the file */ /* set current position to the beginning of the file */
current_position= next_position= 0; current_position= next_position= 0;
...@@ -1227,11 +1339,10 @@ int ha_tina::repair(THD* thd, HA_CHECK_OPT* check_opt) ...@@ -1227,11 +1339,10 @@ int ha_tina::repair(THD* thd, HA_CHECK_OPT* check_opt)
if (rc == HA_ERR_END_OF_FILE) if (rc == HA_ERR_END_OF_FILE)
{ {
/* All rows were read ok until end of file, the file does not need repair. */
/* /*
If rows_recorded != rows_repaired, we should update All rows were read ok until end of file, the file does not need repair.
rows_recorded value to the current amount of rows. If rows_recorded != rows_repaired, we should update rows_recorded value
to the current amount of rows.
*/ */
share->rows_recorded= rows_repaired; share->rows_recorded= rows_repaired;
goto end; goto end;
...@@ -1247,36 +1358,45 @@ int ha_tina::repair(THD* thd, HA_CHECK_OPT* check_opt) ...@@ -1247,36 +1358,45 @@ int ha_tina::repair(THD* thd, HA_CHECK_OPT* check_opt)
0, O_RDWR | O_TRUNC,MYF(MY_WME))) < 0) 0, O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
DBUG_RETURN(HA_ERR_CRASHED_ON_REPAIR); DBUG_RETURN(HA_ERR_CRASHED_ON_REPAIR);
if (my_write(repair_file, (byte*)share->mapped_file, current_position, file_buff->init_buff(data_file);
MYF(MY_NABP)))
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
my_close(repair_file, MYF(0));
/* we just truncated the file up to the first bad row. update rows count. */ /* we just truncated the file up to the first bad row. update rows count. */
share->rows_recorded= rows_repaired; share->rows_recorded= rows_repaired;
if (my_munmap(share->mapped_file, share->file_stat.st_size)) /* write repaired file */
while (1)
{
write_end= min(file_buff->end(), current_position);
if ((write_end - write_begin) &&
(my_write(repair_file, file_buff->ptr(),
write_end - write_begin, MYF_RW)))
DBUG_RETURN(-1); DBUG_RETURN(-1);
/* We set it to null so that get_mmap() won't try to unmap it */
share->mapped_file= NULL; write_begin= write_end;
if (write_end== current_position)
break;
else
file_buff->read_next(); /* shift the buffer */
}
/* /*
Close the "to"-file before renaming Close the files and rename repaired file to the datafile.
On Windows one cannot rename a file, which descriptor We have to close the files, as on Windows one cannot rename
is still open. EACCES will be returned when trying to delete a file, which descriptor is still open. EACCES will be returned
the "to"-file in my_rename() when trying to delete the "to"-file in my_rename().
*/ */
my_close(share->data_file,MYF(0)); if (my_close(data_file,MYF(0)) || my_close(repair_file, MYF(0)) ||
my_rename(repaired_fname, share->data_file_name, MYF(0)))
if (my_rename(repaired_fname, share->data_file_name, MYF(0)))
DBUG_RETURN(-1); DBUG_RETURN(-1);
/* Open the file again, it should now be repaired */ /* Open the file again, it should now be repaired */
if ((share->data_file= my_open(share->data_file_name, O_RDWR|O_APPEND, if ((data_file= my_open(share->data_file_name, O_RDWR|O_APPEND,
MYF(0))) == -1) MYF(0))) == -1)
DBUG_RETURN(-1); DBUG_RETURN(-1);
if (get_mmap(share, 0) > 0) /* Set new file size. The file size will be updated by ::update_status() */
DBUG_RETURN(-1); local_saved_data_file_length= (size_t) current_position;
end: end:
share->crashed= FALSE; share->crashed= FALSE;
...@@ -1295,17 +1415,12 @@ int ha_tina::delete_all_rows() ...@@ -1295,17 +1415,12 @@ int ha_tina::delete_all_rows()
if (!records_is_known) if (!records_is_known)
DBUG_RETURN(my_errno=HA_ERR_WRONG_COMMAND); DBUG_RETURN(my_errno=HA_ERR_WRONG_COMMAND);
/* Unmap the file before the new size is set */ if (!share->tina_write_opened)
if (share->mapped_file && my_munmap(share->mapped_file, if (init_tina_writer())
share->file_stat.st_size))
DBUG_RETURN(-1); DBUG_RETURN(-1);
share->mapped_file= NULL;
/* Truncate the file to zero size */ /* Truncate the file to zero size */
rc= my_chsize(share->data_file, 0, 0, MYF(MY_WME)); rc= my_chsize(share->tina_write_filedes, 0, 0, MYF(MY_WME));
if (get_mmap(share, 0) > 0)
DBUG_RETURN(-1);
records=0; records=0;
DBUG_RETURN(rc); DBUG_RETURN(rc);
...@@ -1367,12 +1482,15 @@ int ha_tina::check(THD* thd, HA_CHECK_OPT* check_opt) ...@@ -1367,12 +1482,15 @@ int ha_tina::check(THD* thd, HA_CHECK_OPT* check_opt)
if (!(buf= (byte*) my_malloc(table->s->reclength, MYF(MY_WME)))) if (!(buf= (byte*) my_malloc(table->s->reclength, MYF(MY_WME))))
DBUG_RETURN(HA_ERR_OUT_OF_MEM); DBUG_RETURN(HA_ERR_OUT_OF_MEM);
/* position buffer to the start of the file */
file_buff->init_buff(data_file);
/* /*
Local_saved_data_file_length is initialized during the lock phase. Local_saved_data_file_length is initialized during the lock phase.
Check does not use store_lock in certain cases. So, we set it Check does not use store_lock in certain cases. So, we set it
manually here. manually here.
*/ */
local_saved_data_file_length= share->file_stat.st_size; local_saved_data_file_length= share->saved_data_file_length;
/* set current position to the beginning of the file */ /* set current position to the beginning of the file */
current_position= next_position= 0; current_position= next_position= 0;
/* Read the file row-by-row. If everything is ok, repair is not needed. */ /* Read the file row-by-row. If everything is ok, repair is not needed. */
...@@ -1412,6 +1530,7 @@ mysql_declare_plugin(csv) ...@@ -1412,6 +1530,7 @@ mysql_declare_plugin(csv)
tina_init_func, /* Plugin Init */ tina_init_func, /* Plugin Init */
tina_done_func, /* Plugin Deinit */ tina_done_func, /* Plugin Deinit */
0x0100 /* 1.0 */, 0x0100 /* 1.0 */,
0
} }
mysql_declare_plugin_end; mysql_declare_plugin_end;
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#include <my_dir.h> #include <my_dir.h>
#define DEFAULT_CHAIN_LENGTH 512 #define DEFAULT_CHAIN_LENGTH 512
#define DEFAULT_FILE_WINDOW_SIZE 4096
/* /*
Version for file format. Version for file format.
1 - Initial Version. That is, the version when the metafile was introduced. 1 - Initial Version. That is, the version when the metafile was introduced.
...@@ -29,15 +30,12 @@ ...@@ -29,15 +30,12 @@
typedef struct st_tina_share { typedef struct st_tina_share {
char *table_name; char *table_name;
char data_file_name[FN_REFLEN]; char data_file_name[FN_REFLEN];
byte *mapped_file; /* mapped region of file */
uint table_name_length, use_count; uint table_name_length, use_count;
/* /*
Below flag is needed to make log tables work with concurrent insert. Below flag is needed to make log tables work with concurrent insert.
For more details see comment to ha_tina::update_status. For more details see comment to ha_tina::update_status.
*/ */
my_bool is_log_table; my_bool is_log_table;
MY_STAT file_stat; /* Stat information for the data file */
File data_file; /* Current open data file */
/* /*
Here we save the length of the file for readers. This is updated by Here we save the length of the file for readers. This is updated by
inserts, updates and deletes. The var is initialized along with the inserts, updates and deletes. The var is initialized along with the
...@@ -46,7 +44,10 @@ typedef struct st_tina_share { ...@@ -46,7 +44,10 @@ typedef struct st_tina_share {
off_t saved_data_file_length; off_t saved_data_file_length;
pthread_mutex_t mutex; pthread_mutex_t mutex;
THR_LOCK lock; THR_LOCK lock;
bool update_file_opened;
bool tina_write_opened;
File meta_file; /* Meta file we use */ File meta_file; /* Meta file we use */
File tina_write_filedes; /* File handler for readers */
bool crashed; /* Meta file is crashed */ bool crashed; /* Meta file is crashed */
ha_rows rows_recorded; /* Number of rows in tables */ ha_rows rows_recorded; /* Number of rows in tables */
} TINA_SHARE; } TINA_SHARE;
...@@ -56,6 +57,54 @@ struct tina_set { ...@@ -56,6 +57,54 @@ struct tina_set {
off_t end; off_t end;
}; };
class Transparent_file
{
File filedes;
byte *buff; /* in-memory window to the file or mmaped area */
/* current window sizes */
off_t lower_bound;
off_t upper_bound;
uint buff_size;
public:
Transparent_file(File filedes_arg) : lower_bound(0),
buff_size(DEFAULT_FILE_WINDOW_SIZE)
{
buff= (byte *) my_malloc(buff_size*sizeof(byte), MYF(MY_WME));
/* read the beginning of the file */
init_buff(filedes_arg);
}
~Transparent_file()
{ my_free(buff, MYF(MY_ALLOW_ZERO_PTR)); }
void init_buff(File filedes_arg)
{
filedes= filedes_arg;
/* read the beginning of the file */
lower_bound= 0;
VOID(my_seek(filedes, 0, MY_SEEK_SET, MYF(0)));
if (filedes && buff)
upper_bound= my_read(filedes, buff, buff_size, MYF(0));
}
byte *ptr()
{ return buff; }
off_t start()
{ return lower_bound; }
off_t end()
{ return upper_bound; }
/* get a char from the given position in the file */
char get_value (off_t offset);
/* shift a buffer windows to see the next part of the file */
off_t read_next();
};
class ha_tina: public handler class ha_tina: public handler
{ {
THR_LOCK_DATA lock; /* MySQL lock */ THR_LOCK_DATA lock; /* MySQL lock */
...@@ -64,6 +113,9 @@ class ha_tina: public handler ...@@ -64,6 +113,9 @@ class ha_tina: public handler
off_t next_position; /* Next position in the file scan */ off_t next_position; /* Next position in the file scan */
off_t local_saved_data_file_length; /* save position for reads */ off_t local_saved_data_file_length; /* save position for reads */
byte byte_buffer[IO_SIZE]; byte byte_buffer[IO_SIZE];
Transparent_file *file_buff;
File data_file; /* File handler for readers */
File update_temp_file;
String buffer; String buffer;
/* /*
The chain contains "holes" in the file, occured because of The chain contains "holes" in the file, occured because of
...@@ -77,12 +129,19 @@ class ha_tina: public handler ...@@ -77,12 +129,19 @@ class ha_tina: public handler
uint32 chain_size; uint32 chain_size;
bool records_is_known; bool records_is_known;
private:
bool get_write_pos(off_t *end_pos, tina_set *closest_hole);
int open_update_temp_file_if_needed();
int init_tina_writer();
public: public:
ha_tina(TABLE_SHARE *table_arg); ha_tina(TABLE_SHARE *table_arg);
~ha_tina() ~ha_tina()
{ {
if (chain_alloced) if (chain_alloced)
my_free((gptr)chain,0); my_free((gptr)chain, 0);
if (file_buff)
delete file_buff;
} }
const char *table_type() const { return "CSV"; } const char *table_type() const { return "CSV"; }
const char *index_type(uint inx) { return "NONE"; } const char *index_type(uint inx) { return "NONE"; }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment