Commit 04796392 authored by Sergei Petrunia, committed by Vicențiu Ciorbaru

MDEV-9736: Window functions: multiple cursors to read filesort result

Add support for having multiple IO_CACHEs with type=READ_CACHE share the
file they are reading from.
Each IO_CACHE keeps its own in-memory buffer. When one of them does a read
or seek operation on the file, it notifies the other IO_CACHEs that the
file position has changed.

Make Rowid_seq_cursor use cloned IO_CACHE when reading filesort result.
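
For illustration only (not part of this patch): a minimal sketch of how the new
calls are intended to be combined, assuming an already-opened read-only file
descriptor fd, a placeholder rowid length ROW_LEN, and the standard mysys
IO_CACHE API (init_io_cache, my_b_read, end_io_cache).

  enum { ROW_LEN= 16 };                  /* placeholder record length */
  IO_CACHE master, slave;
  uchar row1[ROW_LEN], row2[ROW_LEN];

  /* The master is a plain, non-shared READ_CACHE over the file. */
  if (init_io_cache(&master, fd, 0, READ_CACHE, 0L, 0, MYF(MY_WME)))
    return 1;

  /* Clone it: the slave gets its own buffer but reads the same file. */
  if (init_slave_io_cache(&master, &slave))
    return 1;                            /* out of memory */

  my_b_read(&master, row1, ROW_LEN);     /* read through the master ...         */
  my_b_read(&slave,  row2, ROW_LEN);     /* ... and independently via the slave */

  /* Reposition the slave without disturbing the master's position. */
  seek_io_cache(&slave, 10 * ROW_LEN);

  /* The slave owns only its buffer; the master still owns the file. */
  end_slave_io_cache(&slave);
  end_io_cache(&master);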
parent 6e401572
...@@ -472,6 +472,8 @@ typedef struct st_io_cache /* Used when cacheing files */
  const char *dir;
  char prefix[3];
  File file; /* file descriptor */

  struct st_io_cache *next_file_user;
  /*
    seek_not_done is set by my_b_seek() to inform the upcoming read/write
    operation that a seek needs to be preformed prior to the actual I/O
...@@ -802,6 +804,11 @@ extern my_bool reinit_io_cache(IO_CACHE *info,enum cache_type type,
extern void setup_io_cache(IO_CACHE* info);
extern void init_io_cache_share(IO_CACHE *read_cache, IO_CACHE_SHARE *cshare,
                                IO_CACHE *write_cache, uint num_threads);

extern int init_slave_io_cache(IO_CACHE *master, IO_CACHE *slave);
void end_slave_io_cache(IO_CACHE *cache);
void seek_io_cache(IO_CACHE *cache, my_off_t needed_offset);

extern void remove_io_thread(IO_CACHE *info);
extern int _my_b_async_read(IO_CACHE *info,uchar *Buffer,size_t Count);
extern int my_b_append(IO_CACHE *info,const uchar *Buffer,size_t Count);
...
create table t0 (a int);
insert into t0 values (0),(1),(2),(3),(4),(5),(6),(7),(8),(9);
create table t1(a int);
insert into t1 select A.a + B.a* 10 + C.a * 100 from t0 A, t0 B, t0 C;
create table t10 (a int, b int, c int);
insert into t10
select
A.a + 1000*B.a,
A.a + 1000*B.a,
A.a + 1000*B.a
from t1 A, t0 B
order by A.a+1000*B.a;
#################################################################
## Try a basic example
flush status;
create table t21 as
select
sum(b) over (order by a rows between 2 preceding and 2 following) as SUM_B
from
t10;
show status like 'Sort_merge_passes';
Variable_name Value
Sort_merge_passes 0
set sort_buffer_size=1024;
flush status;
create table t22 as
select
sum(b) over (order by a rows between 2 preceding and 2 following) as SUM_B
from
t10;
show status like 'Sort_merge_passes';
Variable_name Value
Sort_merge_passes 35
include/diff_tables.inc [t21, t22]
drop table t21, t22;
#################################################################
# Try many cursors
set sort_buffer_size=default;
flush status;
create table t21 as
select
sum(b) over (order by a rows between 2 preceding and 2 following) as SUM_B1,
sum(b) over (order by a rows between 5 preceding and 5 following) as SUM_B2,
sum(b) over (order by a rows between 20 preceding and 20 following) as SUM_B3
from
t10;
show status like 'Sort_merge_passes';
Variable_name Value
Sort_merge_passes 0
set sort_buffer_size=1024;
flush status;
create table t22 as
select
sum(b) over (order by a rows between 2 preceding and 2 following) as SUM_B1,
sum(b) over (order by a rows between 5 preceding and 5 following) as SUM_B2,
sum(b) over (order by a rows between 20 preceding and 20 following) as SUM_B3
from
t10;
show status like 'Sort_merge_passes';
Variable_name Value
Sort_merge_passes 35
include/diff_tables.inc [t21, t22]
drop table t21, t22;
#################################################################
# Try having cursors pointing at different IO_CACHE pages
# in the IO_CACHE
set sort_buffer_size=default;
flush status;
create table t21 as
select
a,
sum(b) over (order by a range between 5000 preceding and 5000 following) as SUM_B1
from
t10;
show status like 'Sort_merge_passes';
Variable_name Value
Sort_merge_passes 0
set sort_buffer_size=1024;
flush status;
create table t22 as
select
a,
sum(b) over (order by a range between 5000 preceding and 5000 following) as SUM_B1
from
t10;
show status like 'Sort_merge_passes';
Variable_name Value
Sort_merge_passes 35
include/diff_tables.inc [t21, t22]
drop table t21, t22;
#################################################################
drop table t10;
drop table t0,t1;
#
# Tests for window functions over big datasets.
# "Big" here is "big enough so that filesort result doesn't fit in a
# memory buffer".
#
#
create table t0 (a int);
insert into t0 values (0),(1),(2),(3),(4),(5),(6),(7),(8),(9);
create table t1(a int);
insert into t1 select A.a + B.a* 10 + C.a * 100 from t0 A, t0 B, t0 C;
create table t10 (a int, b int, c int);
insert into t10
select
A.a + 1000*B.a,
A.a + 1000*B.a,
A.a + 1000*B.a
from t1 A, t0 B
order by A.a+1000*B.a;
--echo #################################################################
--echo ## Try a basic example
flush status;
create table t21 as
select
sum(b) over (order by a rows between 2 preceding and 2 following) as SUM_B
from
t10;
show status like 'Sort_merge_passes';
set sort_buffer_size=1024;
flush status;
create table t22 as
select
sum(b) over (order by a rows between 2 preceding and 2 following) as SUM_B
from
t10;
show status like 'Sort_merge_passes';
let $diff_tables= t21, t22;
source include/diff_tables.inc;
drop table t21, t22;
--echo #################################################################
--echo # Try many cursors
set sort_buffer_size=default;
flush status;
create table t21 as
select
sum(b) over (order by a rows between 2 preceding and 2 following) as SUM_B1,
sum(b) over (order by a rows between 5 preceding and 5 following) as SUM_B2,
sum(b) over (order by a rows between 20 preceding and 20 following) as SUM_B3
from
t10;
show status like 'Sort_merge_passes';
set sort_buffer_size=1024;
flush status;
create table t22 as
select
sum(b) over (order by a rows between 2 preceding and 2 following) as SUM_B1,
sum(b) over (order by a rows between 5 preceding and 5 following) as SUM_B2,
sum(b) over (order by a rows between 20 preceding and 20 following) as SUM_B3
from
t10;
show status like 'Sort_merge_passes';
let $diff_tables= t21, t22;
source include/diff_tables.inc;
drop table t21, t22;
--echo #################################################################
--echo # Try having cursors pointing at different IO_CACHE pages
--echo # in the IO_CACHE
set sort_buffer_size=default;
flush status;
create table t21 as
select
a,
sum(b) over (order by a range between 5000 preceding and 5000 following) as SUM_B1
from
t10;
show status like 'Sort_merge_passes';
set sort_buffer_size=1024;
flush status;
create table t22 as
select
a,
sum(b) over (order by a range between 5000 preceding and 5000 following) as SUM_B1
from
t10;
show status like 'Sort_merge_passes';
let $diff_tables= t21, t22;
source include/diff_tables.inc;
drop table t21, t22;
--echo #################################################################
drop table t10;
drop table t0,t1;
...@@ -193,6 +193,7 @@ int init_io_cache(IO_CACHE *info, File file, size_t cachesize,
  info->alloced_buffer = 0;
  info->buffer=0;
  info->seek_not_done= 0;
  info->next_file_user= NULL;
  if (file >= 0)
  {
...@@ -328,6 +329,101 @@ int init_io_cache(IO_CACHE *info, File file, size_t cachesize,
  DBUG_RETURN(0);
} /* init_io_cache */

/*
  Initialize the slave IO_CACHE to read the same file (and data)
  as master does.

  One can create multiple slaves from a single master. Every slave and master
  will have independent file positions.

  The master must be a non-shared READ_CACHE.
  It is assumed that no more reads are done after a master and/or a slave
  has been freed (this limitation can be easily lifted).
*/

int init_slave_io_cache(IO_CACHE *master, IO_CACHE *slave)
{
  uchar *slave_buf;
  DBUG_ASSERT(master->type == READ_CACHE);
  DBUG_ASSERT(!master->share);
  DBUG_ASSERT(master->alloced_buffer);

  if (!(slave_buf= (uchar*)my_malloc(master->buffer_length, MYF(0))))
  {
    return 1;
  }
  memcpy(slave, master, sizeof(IO_CACHE));
  slave->buffer= slave_buf;

  memcpy(slave->buffer, master->buffer, master->buffer_length);
  slave->read_pos= slave->buffer + (master->read_pos - master->buffer);
  slave->read_end= slave->buffer + (master->read_end - master->buffer);

  DBUG_ASSERT(master->current_pos == &master->read_pos);
  slave->current_pos= &slave->read_pos;
  DBUG_ASSERT(master->current_end == &master->read_end);
  slave->current_end= &slave->read_end;

  if (master->next_file_user)
  {
    IO_CACHE *p;
    for (p= master->next_file_user;
         p->next_file_user !=master;
         p= p->next_file_user)
    {}

    p->next_file_user= slave;
    slave->next_file_user= master;
  }
  else
  {
    slave->next_file_user= master;
    master->next_file_user= slave;
  }

  return 0;
}


void end_slave_io_cache(IO_CACHE *cache)
{
  my_free(cache->buffer);
}


/*
  Seek a read io cache to a given offset
*/
void seek_io_cache(IO_CACHE *cache, my_off_t needed_offset)
{
  my_off_t cached_data_start= cache->pos_in_file;
  my_off_t cached_data_end= cache->pos_in_file + (cache->read_pos -
                                                  cache->buffer);

  if (needed_offset >= cached_data_start &&
      needed_offset < cached_data_end)
  {
    /*
      The offset we're seeking to is in the buffer.
      Move buffer's read position accordingly
    */
    cache->read_pos= cache->buffer + (needed_offset - cached_data_start);
  }
  else
  {
    if (needed_offset > cache->end_of_file)
      needed_offset= cache->end_of_file;

    /*
      The offset we're seeking to is not in the buffer.
      - Set the buffer to be exhausted.
      - Make the next read to a mysql_file_seek() call to the required
        offset (but still use aligned reads).
    */
    cache->read_pos= cache->read_end;
    cache->seek_not_done= 1;
    cache->pos_in_file= (needed_offset / IO_SIZE) * IO_SIZE;
  }
}

/* Wait until current request is ready */

#ifdef HAVE_AIOWAIT
...@@ -583,6 +679,17 @@ int _my_b_cache_read(IO_CACHE *info, uchar *Buffer, size_t Count)
    {
      /* No error, reset seek_not_done flag. */
      info->seek_not_done= 0;

      if (info->next_file_user)
      {
        IO_CACHE *c;
        for (c= info->next_file_user;
             c!= info;
             c= c->next_file_user)
        {
          c->seek_not_done= 1;
        }
      }
    }
    else
    {
...@@ -671,7 +778,19 @@ int _my_b_cache_read(IO_CACHE *info, uchar *Buffer, size_t Count)
      DBUG_RETURN(0); /* EOF */
    }
  }
  else
  {
    if (info->next_file_user)
    {
      IO_CACHE *c;
      for (c= info->next_file_user;
           c!= info;
           c= c->next_file_user)
      {
        c->seek_not_done= 1;
      }
    }
    if ((length= mysql_file_read(info->file,info->buffer, max_length,
                                 info->myflags)) < Count ||
        length == (size_t) -1)
    {
...@@ -688,6 +807,7 @@ int _my_b_cache_read(IO_CACHE *info, uchar *Buffer, size_t Count)
      info->seek_not_done=1;
      DBUG_RETURN(1);
    }
  }
  /*
    Count is the remaining number of bytes requested.
    length is the amount of data in the cache.
...
...@@ -515,17 +515,6 @@ void order_window_funcs_by_window_specs(List<Item_window_func> *win_func_list)
// note: make rr_from_pointers static again when not need it here anymore
int rr_from_pointers(READ_RECORD *info);

/*
  A temporary way to clone READ_RECORD structures until Monty provides the real
  one.
*/
bool clone_read_record(const READ_RECORD *src, READ_RECORD *dst)
{
  //DBUG_ASSERT(src->table->sort.record_pointers);
  DBUG_ASSERT(src->read_record == rr_from_pointers);
  memcpy(dst, src, sizeof(READ_RECORD));
  return false;
}

/////////////////////////////////////////////////////////////////////////////
...@@ -540,68 +529,145 @@ bool clone_read_record(const READ_RECORD *src, READ_RECORD *dst)
class Rowid_seq_cursor
{
public:
  Rowid_seq_cursor() : io_cache(NULL), ref_buffer(0) {}
  virtual ~Rowid_seq_cursor()
  {
    if (ref_buffer)
      my_free(ref_buffer);
    if (io_cache)
    {
      end_slave_io_cache(io_cache);
      my_free(io_cache);
      io_cache= NULL;
    }
  }

private:
  /* Length of one rowid element */
  size_t ref_length;

  /* If io_cache=!NULL, use it */
  IO_CACHE *io_cache;
  uchar *ref_buffer;   /* Buffer for the last returned rowid */
  uint rownum;         /* Number of the rowid that is about to be returned */
  bool cache_eof;      /* whether we've reached EOF */

  /* The following are used when we are reading from an array of pointers */
  uchar *cache_start;
  uchar *cache_pos;
  uchar *cache_end;

public:
  void init(READ_RECORD *info)
  {
    ref_length= info->ref_length;
    if (info->read_record == rr_from_pointers)
    {
      io_cache= NULL;
      cache_start= info->cache_pos;
      cache_pos=   info->cache_pos;
      cache_end=   info->cache_end;
    }
    else
    {
      //DBUG_ASSERT(info->read_record == rr_from_tempfile);
      rownum= 0;
      cache_eof= false;
      io_cache= (IO_CACHE*)my_malloc(sizeof(IO_CACHE), MYF(0));
      init_slave_io_cache(info->io_cache, io_cache);

      ref_buffer= (uchar*)my_malloc(ref_length, MYF(0));
    }
  }

  virtual int next()
  {
    if (io_cache)
    {
      if (cache_eof)
        return 1;

      if (my_b_read(io_cache,ref_buffer,ref_length))
      {
        cache_eof= 1; // TODO: remove cache_eof
        return -1;
      }
      rownum++;
      return 0;
    }
    else
    {
      /* Allow multiple next() calls in EOF state. */
      if (cache_pos == cache_end)
        return -1;
      cache_pos+= ref_length;
      DBUG_ASSERT(cache_pos <= cache_end);
    }
    return 0;
  }

  virtual int prev()
  {
    if (io_cache)
    {
      if (rownum == 0)
        return -1;

      move_to(rownum - 1);
      return 0;
    }
    else
    {
      /* Allow multiple prev() calls when positioned at the start. */
      if (cache_pos == cache_start)
        return -1;
      cache_pos-= ref_length;
      DBUG_ASSERT(cache_pos >= cache_start);
      return 0;
    }
  }

  ha_rows get_rownum() const
  {
    if (io_cache)
      return rownum;
    else
      return (cache_pos - cache_start) / ref_length;
  }

  void move_to(ha_rows row_number)
  {
    if (io_cache)
    {
      seek_io_cache(io_cache, row_number * ref_length);
      rownum= row_number;
      next();
    }
    else
    {
      cache_pos= MY_MIN(cache_end, cache_start + row_number * ref_length);
      DBUG_ASSERT(cache_pos <= cache_end);
    }
  }

protected:
  bool at_eof()
  {
    if (io_cache)
    {
      return cache_eof;
    }
    else
      return (cache_pos == cache_end);
  }

  uchar *get_curr_rowid()
  {
    if (io_cache)
      return ref_buffer;
    else
      return cache_pos;
  }
};
...