Commit 3066c377 authored by Sergey Petrunya's avatar Sergey Petrunya

DS-MRR improvements: review feedback

- Switch from one bi-directional buffer class to two 
  virtual inheritance-based forward and backward buffer classes.
parent 51f90976
......@@ -865,12 +865,12 @@ LEFT JOIN
(t1,t2)
ON t3.a=1 AND t3.b=t2.b AND t2.b=t4.b;
a b a b a b
4 2 1 2 4 2
4 2 1 2 3 2
4 2 1 2 4 2
4 2 1 2 3 2
4 2 1 2 4 2
4 2 1 2 3 2
4 2 1 2 4 2
NULL NULL 2 2 3 2
NULL NULL 2 2 4 2
EXPLAIN EXTENDED
......@@ -1105,8 +1105,8 @@ t0.b=t1.b AND
(t8.b=t9.b OR t8.c IS NULL) AND
(t9.a=1);
a b a b a b a b a b a b a b a b a b a b
1 2 3 2 4 2 1 2 4 2 2 2 6 2 2 2 0 2 1 2
1 2 3 2 4 2 1 2 3 2 2 2 6 2 2 2 0 2 1 2
1 2 3 2 4 2 1 2 4 2 2 2 6 2 2 2 0 2 1 2
1 2 3 2 4 2 1 2 3 2 3 1 6 2 1 1 NULL NULL 1 1
1 2 3 2 4 2 1 2 4 2 3 1 6 2 1 1 NULL NULL 1 1
1 2 3 2 4 2 1 2 3 2 3 1 6 2 1 1 NULL NULL 1 2
......@@ -1785,8 +1785,8 @@ ON t7.b=t8.b AND t6.b < 10
ON t6.b >= 2 AND t5.b=t7.b AND
(t8.a > 0 OR t8.c IS NULL);
a b a b a b a b
2 2 3 2 2 2 1 2
2 2 1 2 2 2 1 2
2 2 3 2 2 2 1 2
1 1 1 2 1 1 NULL NULL
1 1 3 2 1 1 NULL NULL
3 3 NULL NULL NULL NULL NULL NULL
......
......@@ -352,14 +352,14 @@ Thimble Smith Happy 3 3
Lilliana Angelovska NULL NULL NULL
select t1.name, t2.name, t2.id,t3.id from t1 right join t2 on (t1.id = t2.owner) right join t1 as t3 on t3.id=t2.owner;
name name id id
Antonio Paz Perrito 2 1
Antonio Paz El Gato 1 1
Antonio Paz Perrito 2 1
Thimble Smith Happy 3 3
NULL NULL NULL 2
select t1.name, t2.name, t2.id, t2.owner, t3.id from t1 left join t2 on (t1.id = t2.owner) right join t1 as t3 on t3.id=t2.owner;
name name id owner id
Antonio Paz Perrito 2 1 1
Antonio Paz El Gato 1 1 1
Antonio Paz Perrito 2 1 1
Thimble Smith Happy 3 3 3
NULL NULL NULL NULL 2
drop table t1,t2;
......@@ -413,9 +413,9 @@ insert into t2 values (1, 2, 3),(2, 2, 8), (4,3,9),(3,2,10);
select t1.*, t2.* from t1 left join t2 on t1.n = t2.n and
t1.m = t2.m where t1.n = 1;
n m o n m o
1 2 9 1 2 3
1 2 7 1 2 3
1 2 11 1 2 3
1 2 7 1 2 3
1 2 9 1 2 3
1 3 9 NULL NULL NULL
select t1.*, t2.* from t1 left join t2 on t1.n = t2.n and
t1.m = t2.m where t1.n = 1 order by t1.o;
......
......@@ -284,131 +284,6 @@ int handler::multi_range_read_next(char **range_info)
}
/****************************************************************************
* SimpleBuffer class implementation (used by DS-MRR code)
***************************************************************************/
void SimpleBuffer::setup_writing(uchar **data1, size_t len1,
uchar **data2, size_t len2)
{
write_ptr1= data1;
size1= len1;
write_ptr2= data2;
size2= len2;
}
void SimpleBuffer::write()
{
if (is_reverse() && write_ptr2)
write(*write_ptr2, size2);
write(*write_ptr1, size1);
if (!is_reverse() && write_ptr2)
write(*write_ptr2, size2);
}
void SimpleBuffer::write(const uchar *data, size_t bytes)
{
DBUG_ASSERT(have_space_for(bytes));
if (direction == -1)
write_pos -= bytes;
memcpy(write_pos, data, bytes);
if (direction == 1)
write_pos += bytes;
}
bool SimpleBuffer::can_write()
{
return have_space_for(size1 + (write_ptr2 ? size2 : 0));
}
bool SimpleBuffer::have_space_for(size_t bytes)
{
if (direction == 1)
return (write_pos + bytes < end);
else
return (write_pos - bytes >= start);
}
size_t SimpleBuffer::used_size()
{
return (direction == 1)? write_pos - read_pos : read_pos - write_pos;
}
void SimpleBuffer::setup_reading(uchar **data1, size_t len1,
uchar **data2, size_t len2)
{
read_ptr1= data1;
DBUG_ASSERT(len1 == size1);
read_ptr2= data2;
DBUG_ASSERT(len2 == size2);
}
bool SimpleBuffer::read()
{
if (!have_data(size1 + (read_ptr2 ? size2 : 0)))
return TRUE;
*read_ptr1= read(size1);
if (read_ptr2)
*read_ptr2= read(size2);
return FALSE;
}
uchar *SimpleBuffer::read(size_t bytes)
{
DBUG_ASSERT(have_data(bytes));
uchar *res;
if (direction == 1)
{
res= read_pos;
read_pos += bytes;
return res;
}
else
{
read_pos= read_pos - bytes;
return read_pos;
}
}
bool SimpleBuffer::have_data(size_t bytes)
{
return (direction == 1)? (write_pos - read_pos >= (ptrdiff_t)bytes) :
(read_pos - write_pos >= (ptrdiff_t)bytes);
}
void SimpleBuffer::reset_for_writing()
{
if (direction == 1)
write_pos= read_pos= start;
else
write_pos= read_pos= end;
}
uchar *SimpleBuffer::end_of_space()
{
if (direction == 1)
return start;
else
return end;
}
/****************************************************************************
* DS-MRR implementation
***************************************************************************/
......@@ -492,7 +367,7 @@ int DsMrr_impl::dsmrr_init(handler *h_arg, RANGE_SEQ_IF *seq_funcs,
*/
full_buf= buf->buffer;
full_buf_end= buf->buffer_end;
rowid_buffer.set_buffer_space(full_buf, full_buf_end, SimpleBuffer::FORWARD);
rowid_buffer.set_buffer_space(full_buf, full_buf_end);
if (do_sort_keys)
{
......@@ -504,7 +379,7 @@ int DsMrr_impl::dsmrr_init(handler *h_arg, RANGE_SEQ_IF *seq_funcs,
dsmrr_fill_key_buffer();
if (dsmrr_eof && !do_rndpos_scan)
buf->end_of_used_area= key_buffer.end_of_space();
buf->end_of_used_area= key_buffer->end_of_space();
}
if (!do_rndpos_scan)
......@@ -646,9 +521,9 @@ void DsMrr_impl::dsmrr_close()
}
static int rowid_cmp(void *h, uchar *a, uchar *b)
static int rowid_cmp_reverse(void *h, uchar *a, uchar *b)
{
return ((handler*)h)->cmp_ref(a, b);
return - ((handler*)h)->cmp_ref(a, b);
}
......@@ -667,8 +542,6 @@ static int rowid_cmp(void *h, uchar *a, uchar *b)
post-condition:
rowid buffer is not empty, or key source is exhausted.
@param h Table handler
@retval 0 OK, the next portion of rowids is in the buffer,
properly ordered
@retval other Error
......@@ -689,8 +562,8 @@ int DsMrr_impl::dsmrr_fill_rowid_buffer()
last_identical_rowid= NULL;
if (do_sort_keys && key_buffer.is_reverse())
key_buffer.flip();
//if (do_sort_keys && key_buffer.is_reverse())
// key_buffer.flip();
while (rowid_buffer.can_write())
{
......@@ -721,7 +594,7 @@ int DsMrr_impl::dsmrr_fill_rowid_buffer()
dsmrr_eof= test(res == HA_ERR_END_OF_FILE);
/* Sort the buffer contents by rowid */
rowid_buffer.sort((qsort2_cmp)rowid_cmp, (void*)h);
rowid_buffer.sort((qsort2_cmp)rowid_cmp_reverse, (void*)h);
rowid_buffer.setup_reading(&rowid, h->ref_length,
is_mrr_assoc? (uchar**)&rowids_range_id: NULL, sizeof(void*));
......@@ -729,14 +602,6 @@ int DsMrr_impl::dsmrr_fill_rowid_buffer()
}
void SimpleBuffer::sort(qsort2_cmp cmp_func, void *cmp_func_arg)
{
uint elem_size= size1 + (write_ptr2 ? size2 : 0);
uint n_elements= used_size() / elem_size;
my_qsort2(used_area(), n_elements, elem_size, cmp_func, cmp_func_arg);
}
/*
my_qsort2-compatible function to compare key tuples
*/
......@@ -787,6 +652,10 @@ int DsMrr_impl::key_tuple_cmp(void* arg, uchar* key1, uchar* key2)
return 0;
}
int DsMrr_impl::key_tuple_cmp_reverse(void* arg, uchar* key1, uchar* key2)
{
return -key_tuple_cmp(arg, key1, key2);
}
/*
Setup key/rowid buffer sizes based on sample_key
......@@ -812,12 +681,13 @@ void DsMrr_impl::setup_buffer_sizes(key_range *sample_key)
my_count_bits(sample_key->keypart_map));
if (!do_rndpos_scan)
{
/* Give all space to key buffer. */
key_buffer.set_buffer_space(full_buf, full_buf_end, SimpleBuffer::FORWARD);
/* Give all space to forward key buffer. */
key_buffer= &forward_key_buf;
identical_key_it= &forward_key_it;
key_buffer->set_buffer_space(full_buf, full_buf_end);
/* Just in case, tell rowid buffer that it has zero size: */
rowid_buffer.set_buffer_space(full_buf_end, full_buf_end,
SimpleBuffer::FORWARD);
rowid_buffer.set_buffer_space(full_buf_end, full_buf_end);
return;
}
......@@ -860,10 +730,10 @@ void DsMrr_impl::setup_buffer_sizes(key_range *sample_key)
}
rowid_buffer_end= full_buf + bytes_for_rowids;
rowid_buffer.set_buffer_space(full_buf, rowid_buffer_end,
SimpleBuffer::FORWARD);
key_buffer.set_buffer_space(rowid_buffer_end, full_buf_end,
SimpleBuffer::BACKWARD);
rowid_buffer.set_buffer_space(full_buf, rowid_buffer_end);
key_buffer= &backward_key_buf;
identical_key_it= &backward_key_it;
key_buffer->set_buffer_space(rowid_buffer_end, full_buf_end);
}
......@@ -892,7 +762,7 @@ void DsMrr_impl::dsmrr_fill_key_buffer()
uchar **range_info_ptr= (uchar**)&cur_range.ptr;
DBUG_ENTER("DsMrr_impl::dsmrr_fill_key_buffer");
DBUG_ASSERT(!know_key_tuple_params || key_buffer.is_empty());
DBUG_ASSERT(!know_key_tuple_params || key_buffer->is_empty());
uchar *key_ptr;
if (know_key_tuple_params)
......@@ -903,18 +773,18 @@ void DsMrr_impl::dsmrr_fill_key_buffer()
We're using two buffers and both of them are empty now. Restore the
original sizes
*/
rowid_buffer.set_buffer_space(full_buf, rowid_buffer_end,
SimpleBuffer::FORWARD);
key_buffer.set_buffer_space(rowid_buffer_end, full_buf_end,
SimpleBuffer::BACKWARD);
rowid_buffer.set_buffer_space(full_buf, rowid_buffer_end);
key_buffer= &backward_key_buf;
identical_key_it= &backward_key_it;
key_buffer->set_buffer_space(rowid_buffer_end, full_buf_end);
}
key_buffer.reset_for_writing();
key_buffer.setup_writing(&key_ptr, key_size_in_keybuf,
key_buffer->reset_for_writing();
key_buffer->setup_writing(&key_ptr, key_size_in_keybuf,
is_mrr_assoc? (uchar**)&range_info_ptr : NULL,
sizeof(uchar*));
}
while ((!know_key_tuple_params || key_buffer.can_write()) &&
while ((!know_key_tuple_params || key_buffer->can_write()) &&
!(res= h->mrr_funcs.next(h->mrr_iter, &cur_range)))
{
DBUG_ASSERT(cur_range.range_flag & EQ_RANGE);
......@@ -923,10 +793,10 @@ void DsMrr_impl::dsmrr_fill_key_buffer()
/* This only happens when we've just started filling the buffer */
setup_buffer_sizes(&cur_range.start_key);
know_key_tuple_params= TRUE;
key_buffer.setup_writing(&key_ptr, key_size_in_keybuf,
key_buffer->setup_writing(&key_ptr, key_size_in_keybuf,
is_mrr_assoc? (uchar**)&range_info_ptr : NULL,
sizeof(uchar*));
DBUG_ASSERT(key_buffer.can_write());
DBUG_ASSERT(key_buffer->can_write());
}
/* Put key, or {key, range_id} pair into the buffer */
......@@ -935,14 +805,17 @@ void DsMrr_impl::dsmrr_fill_key_buffer()
else
key_ptr=(uchar*) cur_range.start_key.key;
key_buffer.write();
key_buffer->write();
}
dsmrr_eof= test(res);
key_buffer.sort((qsort2_cmp)DsMrr_impl::key_tuple_cmp, (void*)this);
key_buffer->sort((key_buffer->type() == Lifo_buffer::FORWARD)?
(qsort2_cmp)DsMrr_impl::key_tuple_cmp_reverse :
(qsort2_cmp)DsMrr_impl::key_tuple_cmp,
(void*)this);
key_buffer.setup_reading(&cur_index_tuple, key_size_in_keybuf,
key_buffer->setup_reading(&cur_index_tuple, key_size_in_keybuf,
is_mrr_assoc? (uchar**)&cur_range_info: NULL,
sizeof(void*));
......@@ -959,7 +832,7 @@ void DsMrr_impl::dsmrr_fill_key_buffer()
void DsMrr_impl::reallocate_buffer_space()
{
uchar *unused_start, *unused_end;
key_buffer.remove_unused_space(&unused_start, &unused_end);
key_buffer->remove_unused_space(&unused_start, &unused_end);
rowid_buffer.grow(unused_start, unused_end);
}
......@@ -1000,7 +873,7 @@ int DsMrr_impl::dsmrr_next_from_index(char **range_info_arg)
while (in_identical_keys_range)
{
/* This will read to (cur_index_tuple, cur_range_info): */
res2= identical_key_it.read_next();
res2= identical_key_it->read_next();
DBUG_ASSERT(!res2);
if (cur_index_tuple == last_identical_key_ptr)
......@@ -1038,7 +911,7 @@ int DsMrr_impl::dsmrr_next_from_index(char **range_info_arg)
if (last_identical_key_ptr)
{
in_identical_keys_range= TRUE;
identical_key_it.init(&key_buffer);
identical_key_it->init(key_buffer);
cur_range_info= first_identical_range_info;
}
......@@ -1053,12 +926,12 @@ int DsMrr_impl::dsmrr_next_from_index(char **range_info_arg)
if (last_identical_key_ptr)
{
/* key_buffer.read() reads to (cur_index_tuple, cur_range_info) */
while (!key_buffer.read() && (cur_index_tuple != last_identical_key_ptr)) {}
while (!key_buffer->read() && (cur_index_tuple != last_identical_key_ptr)) {}
last_identical_key_ptr= NULL;
}
/* First, make sure we have a range at start of the buffer */
if (key_buffer.is_empty())
if (key_buffer->is_empty())
{
if (dsmrr_eof)
{
......@@ -1072,7 +945,7 @@ int DsMrr_impl::dsmrr_next_from_index(char **range_info_arg)
if (!do_rndpos_scan)
dsmrr_fill_key_buffer();
if (key_buffer.is_empty())
if (key_buffer->is_empty())
{
res= HA_ERR_END_OF_FILE;
goto end;
......@@ -1088,7 +961,7 @@ int DsMrr_impl::dsmrr_next_from_index(char **range_info_arg)
reallocate_buffer_space();
/* Get the next range to scan */
key_buffer.read(); // reads to (cur_index_tuple, cur_range_info)
key_buffer->read(); // reads to (cur_index_tuple, cur_range_info)
key_in_buf= cur_index_tuple;
if (use_key_pointers)
......@@ -1106,9 +979,9 @@ int DsMrr_impl::dsmrr_next_from_index(char **range_info_arg)
/* Check if subsequent keys in the key buffer are the same as this one */
{
char *save_cur_range_info= cur_range_info;
identical_key_it.init(&key_buffer);
identical_key_it->init(key_buffer);
last_identical_key_ptr= NULL;
while (!identical_key_it.read_next())
while (!identical_key_it->read_next())
{
if (key_tuple_cmp(this, key_in_buf, cur_index_tuple))
break;
......@@ -1119,7 +992,7 @@ int DsMrr_impl::dsmrr_next_from_index(char **range_info_arg)
if (last_identical_key_ptr)
{
in_identical_keys_range= TRUE;
identical_key_it.init(&key_buffer);
identical_key_it->init(key_buffer);
first_identical_range_info= cur_range_info;
}
}
......@@ -1178,7 +1051,7 @@ int DsMrr_impl::dsmrr_next(char **range_info)
{
if (do_sort_keys)
{
if (!key_buffer.is_empty() || in_index_range)
if (!key_buffer->is_empty() || in_index_range)
{
/* There are some sorted keys left. Use them to get rowids */
if ((res= dsmrr_fill_rowid_buffer()))
......@@ -1239,9 +1112,9 @@ int DsMrr_impl::dsmrr_next(char **range_info)
Note: this implies that SQL layer doesn't touch table->record[0]
between calls.
*/
SimpleBuffer::PeekIterator identical_rowid_it;
identical_rowid_it.init(&rowid_buffer);
while (!identical_rowid_it.read_next()) // reads to (rowid, ...)
Forward_iterator it;
it.init(&rowid_buffer);
while (!it.read_next()) // reads to (rowid, ...)
{
if (h2->cmp_ref(rowid, cur_rowid))
break;
......
......@@ -46,64 +46,12 @@
storage and has better performance when reading data in rowid order.
*/
class Forward_lifo_buffer;
class Backward_lifo_buffer;
/*
An in-memory buffer used by DS-MRR implementation.
- The buffer contains fixed-size elements. The elements are either atomic
byte sequences or pairs.
- The buffer resides in memory provided by the user. It is possible to
= dynamically (ie. between write operations) add ajacent memory space to
the buffer
= dynamically remove unused space from the buffer.
- Buffer can be set to be either "forward" or "backward".
The intent of the last two properties is to allow to have two buffers on
adjacent memory space, one is being read from (and so its space shrinks)
while the other is being written to (and so it needs more and more space).
Illustration of forward buffer operation:
+-- next read will read from here
|
| +-- next write will write to here
v v
*--------------*===============*----------------*
| ^ | ^ | |
| | read_pos | write_pos |
start | | end
| |
usused space user data
For reverse buffer, start/end have the same meaning, but reading and
writing is done from end to start.
*/
class SimpleBuffer
class Lifo_buffer
{
public:
enum enum_direction {
BACKWARD=-1, /* buffer is filled/read from bigger to smaller memory addresses */
FORWARD=1 /* buffer is filled/read from smaller to bigger memory addresses */
};
private:
enum_direction direction;
uchar *start; /* points to start of buffer space */
uchar *end; /* points to just beyond the end of buffer space */
/*
Forward buffer: points to the start of the data that will be read next
Backward buffer: points to just beyond the end of the data that will be
read next.
*/
uchar *read_pos;
/*
Forward buffer: points to just after the end of the used area.
Backward buffer: points to the start of used area.
*/
uchar *write_pos;
protected:
/*
Data to be written. write() call will assume that (*write_ptr1) points to
size1 bytes of data to be written.
......@@ -123,63 +71,153 @@ class SimpleBuffer
uchar **read_ptr1;
uchar **read_ptr2;
uchar *start; /* points to start of buffer space */
uchar *end; /* points to just beyond the end of buffer space */
public:
/* Write-mode functions */
void setup_writing(uchar **data1, size_t len1,
uchar **data2, size_t len2);
void reset_for_writing();
bool can_write();
void write();
/* Read-mode functions */
bool is_empty() { return used_size() == 0; }
void setup_reading(uchar **data1, size_t len1,
uchar **data2, size_t len2);
bool read();
/* Misc functions */
void sort(qsort2_cmp cmp_func, void *cmp_func_arg);
bool is_reverse() { return direction == BACKWARD; }
uchar *end_of_space();
enum enum_direction {
BACKWARD=-1, /* buffer is filled/read from bigger to smaller memory addresses */
FORWARD=1 /* buffer is filled/read from smaller to bigger memory addresses */
};
virtual enum_direction type() = 0;
/* Buffer space control functions */
void set_buffer_space(uchar *start_arg, uchar *end_arg, enum_direction direction_arg)
void set_buffer_space(uchar *start_arg, uchar *end_arg)
{
start= start_arg;
end= end_arg;
direction= direction_arg;
TRASH(start, end - start);
reset_for_writing();
}
/*
Stop using/return the unneded space (the one that we have already wrote
to read from).
*/
void remove_unused_space(uchar **unused_start, uchar **unused_end)
void setup_writing(uchar **data1, size_t len1, uchar **data2, size_t len2)
{
if (direction == 1)
write_ptr1= data1;
size1= len1;
write_ptr2= data2;
size2= len2;
}
void setup_reading(uchar **data1, size_t len1, uchar **data2, size_t len2)
{
*unused_start= start;
*unused_end= read_pos;
start= read_pos;
read_ptr1= data1;
DBUG_ASSERT(len1 == size1);
read_ptr2= data2;
DBUG_ASSERT(len2 == size2);
}
else
//virtual void write_bytes(const uchar *data, size_t bytes)=0;
virtual bool read() = 0;
virtual void write() = 0;
bool can_write()
{
return have_space_for(size1 + (write_ptr2 ? size2 : 0));
}
bool is_empty() { return used_size() == 0; }
virtual size_t used_size() = 0;
void sort(qsort2_cmp cmp_func, void *cmp_func_arg)
{
*unused_start= read_pos;
*unused_end= end;
end= read_pos;
uint elem_size= size1 + (write_ptr2 ? size2 : 0);
uint n_elements= used_size() / elem_size;
my_qsort2(used_area(), n_elements, elem_size, cmp_func, cmp_func_arg);
}
virtual void reset_for_writing() = 0;
virtual uchar *end_of_space() = 0;
bool have_data(size_t bytes)
{
return (used_size() >= bytes);
}
virtual bool have_space_for(size_t bytes) = 0;
//virtual uchar *read_bytes(size_t bytes) = 0;
virtual void remove_unused_space(uchar **unused_start, uchar **unused_end)=0;
virtual uchar *used_area() = 0;
class Iterator
{
public:
virtual void init(Lifo_buffer *buf) = 0;
/*
Read the next value. The calling convention is the same as buf->read()
has.
void flip()
RETURN
FALSE - Ok
TRUE - EOF, reached the end of the buffer
*/
virtual bool read_next()= 0;
virtual ~Iterator() {}
protected:
Lifo_buffer *buf;
virtual uchar *get_next(size_t nbytes)=0;
};
virtual ~Lifo_buffer() {};
friend class Forward_iterator;
friend class Backward_iterator;
};
class Forward_lifo_buffer: public Lifo_buffer
{
uchar *pos;
public:
enum_direction type() { return FORWARD; }
size_t used_size()
{
return pos - start;
}
void reset_for_writing()
{
pos= start;
}
uchar *end_of_space() { return pos; }
bool have_space_for(size_t bytes)
{
uchar *tmp= read_pos;
read_pos= write_pos;
write_pos= tmp;
direction= (direction == FORWARD)? BACKWARD: FORWARD;
return (pos + bytes < end);
}
void write()
{
write_bytes(*write_ptr1, size1);
if (write_ptr2)
write_bytes(*write_ptr2, size2);
}
void write_bytes(const uchar *data, size_t bytes)
{
DBUG_ASSERT(have_space_for(bytes));
memcpy(pos, data, bytes);
pos += bytes;
}
uchar *read_bytes(size_t bytes)
{
DBUG_ASSERT(have_data(bytes));
pos= pos - bytes;
return pos;
}
bool read()
{
if (!have_data(size1 + (read_ptr2 ? size2 : 0)))
return TRUE;
if (read_ptr2)
*read_ptr2= read_bytes(size2);
*read_ptr1= read_bytes(size1);
return FALSE;
}
/*
Stop using/return the unneded space (the one that we have already wrote
to read from).
*/
void remove_unused_space(uchar **unused_start, uchar **unused_end)
{
DBUG_ASSERT(0); /* Don't need this yet */
}
void grow(uchar *unused_start, uchar *unused_end)
{
/*
......@@ -189,50 +227,151 @@ class SimpleBuffer
*/
DBUG_ASSERT(unused_end >= unused_start);
TRASH(unused_start, unused_end - unused_start);
if (direction == 1 && end == unused_start)
{
DBUG_ASSERT(end == unused_start);
end= unused_end;
}
else if (direction == -1 && start == unused_end)
/* Return pointer to start of the memory area that is occupied by the data */
uchar *used_area() { return start; }
friend class Forward_iterator;
};
class Forward_iterator : public Lifo_buffer::Iterator
{
uchar *pos;
/* Return pointer to next chunk of nbytes bytes and avance over it */
uchar *get_next(size_t nbytes)
{
start= unused_start;
if (pos - nbytes < ((Forward_lifo_buffer*)buf)->start)
return NULL;
pos -= nbytes;
return pos;
}
public:
bool read_next()
{
uchar *res;
if (buf->read_ptr2)
{
if ((res= get_next(buf->size2)))
{
*(buf->read_ptr2)= res;
*buf->read_ptr1= get_next(buf->size1);
return FALSE;
}
}
else
DBUG_ASSERT(0); /* Attempt to grow buffer in wrong direction */
{
if ((res= get_next(buf->size1)))
{
*(buf->read_ptr1)= res;
return FALSE;
}
}
return TRUE; /* EOF */
}
void init(Lifo_buffer *buf_arg)
{
DBUG_ASSERT(buf_arg->type() == Lifo_buffer::FORWARD);
buf= buf_arg;
pos= ((Forward_lifo_buffer*)buf)->pos;
}
};
class Backward_lifo_buffer: public Lifo_buffer
{
uchar *pos;
public:
enum_direction type() { return BACKWARD; }
size_t used_size()
{
return end - pos;
}
void reset_for_writing()
{
pos= end;
}
uchar *end_of_space() { return end; }
bool have_space_for(size_t bytes)
{
return (pos - bytes >= start);
}
void write()
{
if (write_ptr2)
write_bytes(*write_ptr2, size2);
write_bytes(*write_ptr1, size1);
}
void write_bytes(const uchar *data, size_t bytes)
{
DBUG_ASSERT(have_space_for(bytes));
pos -= bytes;
memcpy(pos, data, bytes);
}
bool read()
{
if (!have_data(size1 + (read_ptr2 ? size2 : 0)))
return TRUE;
*read_ptr1= read_bytes(size1);
if (read_ptr2)
*read_ptr2= read_bytes(size2);
return FALSE;
}
uchar *read_bytes(size_t bytes)
{
DBUG_ASSERT(have_data(bytes));
uchar *ret= pos;
pos= pos + bytes;
return ret;
}
/*
An iterator to do look at what we're about to read from the buffer without
actually reading it.
Stop using/return the unneded space (the one that we have already wrote
to and have read from).
*/
class PeekIterator
void remove_unused_space(uchar **unused_start, uchar **unused_end)
{
*unused_start= start;
*unused_end= pos;
start= pos;
}
void grow(uchar *unused_start, uchar *unused_end)
{
SimpleBuffer *buf; /* The buffer we're iterating over*/
/*
if buf->direction==FORWARD : pointer to what to return next
if buf->direction==BACKWARD : pointer to the end of what is to be
returned next
Passed memory area can be meaningfully used for growing the buffer if:
- it is adjacent to buffer space we're using
- it is on the end towards which we grow.
*/
uchar *pos;
public:
/*
Initialize the iterator. After intiialization, the first read_next() call
will read what buf_arg->read() would read.
DBUG_ASSERT(unused_end >= unused_start);
TRASH(unused_start, unused_end - unused_start);
DBUG_ASSERT(start == unused_end);
start= unused_start;
*/
void init(SimpleBuffer *buf_arg)
{
buf= buf_arg;
pos= buf->read_pos;
DBUG_ASSERT(0); //Not used
}
/* Return pointer to start of the memory area that is occupied by the data */
uchar *used_area() { return pos; }
friend class Backward_iterator;
};
/*
Read the next value. The calling convention is the same as buf->read()
has.
RETURN
FALSE - Ok
TRUE - EOF, reached the end of the buffer
*/
class Backward_iterator : public Lifo_buffer::Iterator
{
uchar *pos;
/* Return pointer to next chunk of nbytes bytes and advance over it */
uchar *get_next(size_t nbytes)
{
if (pos + nbytes > ((Backward_lifo_buffer*)buf)->end)
return NULL;
uchar *res= pos;
pos += nbytes;
return res;
}
public:
bool read_next()
{
/*
......@@ -249,39 +388,45 @@ class SimpleBuffer
}
return TRUE; /* EOF */
}
private:
/* Return pointer to next chunk of nbytes bytes and avance over it */
uchar *get_next(size_t nbytes)
{
if (buf->direction == 1)
{
if (pos + nbytes > buf->write_pos)
return NULL;
uchar *res= pos;
pos += nbytes;
return res;
}
else
void init(Lifo_buffer *buf_arg)
{
if (pos - nbytes < buf->write_pos)
return NULL;
pos -= nbytes;
return pos;
}
DBUG_ASSERT(buf_arg->type() == Lifo_buffer::BACKWARD);
buf= buf_arg;
pos= ((Backward_lifo_buffer*)buf)->pos;
}
};
};
private:
bool have_space_for(size_t bytes);
/* Return pointer to start of the memory area that is occupied by the data */
uchar *used_area() { return (direction == FORWARD)? read_pos : write_pos; }
size_t used_size();
void write(const uchar *data, size_t bytes);
uchar *read(size_t bytes);
bool have_data(size_t bytes);
};
/*
An in-memory buffer used by DS-MRR implementation.
- The buffer contains fixed-size elements. The elements are either atomic
byte sequences or pairs.
- The buffer resides in memory provided by the user. It is possible to
= dynamically (ie. between write operations) add ajacent memory space to
the buffer
= dynamically remove unused space from the buffer.
- Buffer can be set to be either "forward" or "backward".
The intent of the last two properties is to allow to have two buffers on
adjacent memory space, one is being read from (and so its space shrinks)
while the other is being written to (and so it needs more and more space).
Illustration of forward buffer operation:
+-- next read will read from here
|
| +-- next write will write to here
v v
*--------------*===============*----------------*
| ^ | ^ | |
| | read_pos | write_pos |
start | | end
| |
usused space user data
For reverse buffer, start/end have the same meaning, but reading and
writing is done from end to start.
*/
/*
DS-MRR implementation for one table. Create/use one object of this class for
......@@ -439,8 +584,18 @@ class DsMrr_impl
/* TRUE<=> we're in a middle of enumerating records for a key range */
bool in_index_range;
/*
One of the following two is used for key buffer: forward is used when
we only need key buffer, backward is used when we need both key and rowid
buffers.
*/
Forward_lifo_buffer forward_key_buf;
Forward_iterator forward_key_it;
Backward_lifo_buffer backward_key_buf;
Backward_iterator backward_key_it;
/* Buffer to store (key, range_id) pairs */
SimpleBuffer key_buffer;
Lifo_buffer *key_buffer;
/* key_buffer.read() reads */
uchar *cur_index_tuple;
......@@ -489,7 +644,7 @@ class DsMrr_impl
and do that by walking from current buffer read position until we get
last_identical_key_ptr.
*/
SimpleBuffer::PeekIterator identical_key_it;
Lifo_buffer::Iterator *identical_key_it;
/** rnd_pos() scan and rowid buffer-related members **/
......@@ -498,7 +653,7 @@ class DsMrr_impl
Buffer to store (rowid, range_id) pairs, or just rowids if
is_mrr_assoc==FALSE
*/
SimpleBuffer rowid_buffer;
Forward_lifo_buffer rowid_buffer;
/* rowid_buffer.read() will set the following: */
uchar *rowid;
......@@ -522,6 +677,7 @@ class DsMrr_impl
uint *buffer_size, COST_VECT *cost);
bool check_cpk_scan(THD *thd, uint keyno, uint mrr_flags);
static int key_tuple_cmp(void* arg, uchar* key1, uchar* key2);
static int key_tuple_cmp_reverse(void* arg, uchar* key1, uchar* key2);
int dsmrr_fill_rowid_buffer();
void dsmrr_fill_key_buffer();
int dsmrr_next_from_index(char **range_info);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment