Commit 0786b0d2 authored by Sergei Petrunia's avatar Sergei Petrunia

Make window function computation a part of the query plan

Added class Window_funcs_computation, with setup() method to setup
execution, and exec() to run window function computation.

setup() is currently trivial. In the future, it is expected to optimize
the number of sorting operations and passes that are done over the temp.
table.
parent 6ad9ac21
...@@ -233,7 +233,7 @@ static bool list_contains_unique_index(TABLE *table, ...@@ -233,7 +233,7 @@ static bool list_contains_unique_index(TABLE *table,
bool (*find_func) (Field *, void *), void *data); bool (*find_func) (Field *, void *), void *data);
static bool find_field_in_item_list (Field *field, void *data); static bool find_field_in_item_list (Field *field, void *data);
static bool find_field_in_order_list (Field *field, void *data); static bool find_field_in_order_list (Field *field, void *data);
int create_sort_index(THD *thd, JOIN *join, JOIN_TAB *tab); int create_sort_index(THD *thd, JOIN *join, JOIN_TAB *tab, Filesort *fsort);
static int remove_dup_with_compare(THD *thd, TABLE *entry, Field **field, static int remove_dup_with_compare(THD *thd, TABLE *entry, Field **field,
Item *having); Item *having);
static int remove_dup_with_hash_index(THD *thd,TABLE *table, static int remove_dup_with_hash_index(THD *thd,TABLE *table,
...@@ -2706,6 +2706,14 @@ JOIN::create_postjoin_aggr_table(JOIN_TAB *tab, List<Item> *table_fields, ...@@ -2706,6 +2706,14 @@ JOIN::create_postjoin_aggr_table(JOIN_TAB *tab, List<Item> *table_fields,
tab->table= table; tab->table= table;
table->reginfo.join_tab= tab; table->reginfo.join_tab= tab;
// psergey-todo: this is probably an incorrect place:
if (select_lex->window_funcs.elements)
{
tab->window_funcs= new Window_funcs_computation;
if (tab->window_funcs->setup(thd, &select_lex->window_funcs))
goto err;
}
/* if group or order on first table, sort first */ /* if group or order on first table, sort first */
if (group_list && simple_group) if (group_list && simple_group)
{ {
...@@ -19189,7 +19197,7 @@ JOIN_TAB::sort_table() ...@@ -19189,7 +19197,7 @@ JOIN_TAB::sort_table()
DBUG_ASSERT(join->ordered_index_usage != (filesort->order == join->order ? DBUG_ASSERT(join->ordered_index_usage != (filesort->order == join->order ?
JOIN::ordered_index_order_by : JOIN::ordered_index_order_by :
JOIN::ordered_index_group_by)); JOIN::ordered_index_group_by));
rc= create_sort_index(join->thd, join, this); rc= create_sort_index(join->thd, join, this, NULL);
return (rc != 0); return (rc != 0);
} }
...@@ -21160,8 +21168,8 @@ test_if_skip_sort_order(JOIN_TAB *tab,ORDER *order,ha_rows select_limit, ...@@ -21160,8 +21168,8 @@ test_if_skip_sort_order(JOIN_TAB *tab,ORDER *order,ha_rows select_limit,
thd Thread handler thd Thread handler
join Join with table to sort join Join with table to sort
join_tab What table to sort join_tab What table to sort
fsort Filesort object. NULL means "use tab->filesort".
IMPLEMENTATION IMPLEMENTATION
- If there is an index that can be used, the first non-const join_tab in - If there is an index that can be used, the first non-const join_tab in
'join' is modified to use this index. 'join' is modified to use this index.
...@@ -21177,17 +21185,19 @@ test_if_skip_sort_order(JOIN_TAB *tab,ORDER *order,ha_rows select_limit, ...@@ -21177,17 +21185,19 @@ test_if_skip_sort_order(JOIN_TAB *tab,ORDER *order,ha_rows select_limit,
*/ */
int int
create_sort_index(THD *thd, JOIN *join, JOIN_TAB *tab) create_sort_index(THD *thd, JOIN *join, JOIN_TAB *tab, Filesort *fsort)
{ {
ha_rows examined_rows; ha_rows examined_rows;
ha_rows found_rows; ha_rows found_rows;
ha_rows filesort_retval= HA_POS_ERROR; ha_rows filesort_retval= HA_POS_ERROR;
TABLE *table; TABLE *table;
SQL_SELECT *select; SQL_SELECT *select;
Filesort *fsort= tab->filesort;
bool quick_created= FALSE; bool quick_created= FALSE;
DBUG_ENTER("create_sort_index"); DBUG_ENTER("create_sort_index");
if (fsort == NULL)
fsort= tab->filesort;
// One row, no need to sort. make_tmp_tables_info should already handle this. // One row, no need to sort. make_tmp_tables_info should already handle this.
DBUG_ASSERT(!join->only_const_tables() && fsort); DBUG_ASSERT(!join->only_const_tables() && fsort);
table= tab->table; table= tab->table;
...@@ -25898,7 +25908,11 @@ AGGR_OP::end_send() ...@@ -25898,7 +25908,11 @@ AGGR_OP::end_send()
// Update ref array // Update ref array
join_tab->join->set_items_ref_array(*join_tab->ref_array); join_tab->join->set_items_ref_array(*join_tab->ref_array);
join->process_window_functions(&join->select_lex->window_funcs); if (join_tab->window_funcs)
{
join_tab->window_funcs->exec(join);
}
table->reginfo.lock_type= TL_UNLOCK; table->reginfo.lock_type= TL_UNLOCK;
bool in_first_read= true; bool in_first_read= true;
......
...@@ -423,6 +423,12 @@ typedef struct st_join_table { ...@@ -423,6 +423,12 @@ typedef struct st_join_table {
/* Sorting related info */ /* Sorting related info */
Filesort *filesort; Filesort *filesort;
/*
Non-NULL value means this join_tab must do window function computation
before reading.
*/
Window_funcs_computation* window_funcs;
bool used_for_window_func; bool used_for_window_func;
...@@ -1494,8 +1500,6 @@ class JOIN :public Sql_alloc ...@@ -1494,8 +1500,6 @@ class JOIN :public Sql_alloc
int init_execution(); int init_execution();
void exec(); void exec();
bool process_window_functions(List<Item_window_func> *window_funcs);
void exec_inner(); void exec_inner();
bool prepare_result(List<Item> **columns_list); bool prepare_result(List<Item> **columns_list);
int destroy(); int destroy();
...@@ -2270,5 +2274,5 @@ class Pushdown_query: public Sql_alloc ...@@ -2270,5 +2274,5 @@ class Pushdown_query: public Sql_alloc
bool test_if_order_compatible(SQL_I_List<ORDER> &a, SQL_I_List<ORDER> &b); bool test_if_order_compatible(SQL_I_List<ORDER> &a, SQL_I_List<ORDER> &b);
int test_if_group_changed(List<Cached_item> &list); int test_if_group_changed(List<Cached_item> &list);
int create_sort_index(THD *thd, JOIN *join, JOIN_TAB *tab); int create_sort_index(THD *thd, JOIN *join, JOIN_TAB *tab, Filesort *fsort);
#endif /* SQL_SELECT_INCLUDED */ #endif /* SQL_SELECT_INCLUDED */
...@@ -1439,6 +1439,197 @@ bool compute_two_pass_window_functions(Item_window_func *item_win, ...@@ -1439,6 +1439,197 @@ bool compute_two_pass_window_functions(Item_window_func *item_win,
return is_error; return is_error;
} }
/*
  Make a list that is a concatenation of two lists of ORDER elements.

  Every element of list1, followed by every element of list2, is
  shallow-copied (memcpy) into memory taken from mem_root. The input lists
  themselves are not modified.

  @param mem_root  memory root the copies are allocated on
  @param list1     first list, may be NULL
  @param list2     second list, may be NULL

  @return  head of the newly built list; NULL if both lists are empty or if
           an allocation failed
*/
static ORDER* concat_order_lists(MEM_ROOT *mem_root, ORDER *list1, ORDER *list2)
{
  ORDER *sources[2]= { list1, list2 };
  ORDER *res= NULL;      // first element in the new list
  ORDER **tail= &res;    // where the next copy has to be linked in

  /*
    Walk the two lists by index rather than by comparing list pointers, so
    the loop terminates even when list1 and list2 are the same list.
  */
  for (int i= 0; i < 2; i++)
  {
    for (ORDER *cur= sources[i]; cur; cur= cur->next)
    {
      ORDER *copy= (ORDER*) alloc_root(mem_root, sizeof(ORDER));
      if (!copy)
        return NULL;     // out of memory: do not memcpy() into NULL
      memcpy(copy, cur, sizeof(ORDER));
      /* The copied 'next' still points into the source list; detach it */
      copy->next= NULL;
      *tail= copy;
      tail= &copy->next;
    }
  }
  return res;
}
/*
  Prepare this runner for execution.

  Builds the sort criteria (the PARTITION BY list followed by the ORDER BY
  list of the window specification), creates the Filesort object that
  exec() passes to create_sort_index(), and picks the computation function
  that matches the type of the window function.

  @param thd  thread handle; its mem_root is used for allocations

  @return
    false  OK
    true   Error (allocation failure or unsupported window function type)
*/
bool Window_func_runner::setup(THD *thd)
{
  Window_spec *spec= win_func->window_spec;

  ORDER *sort_order= concat_order_lists(thd->mem_root,
                                        spec->partition_list->first,
                                        spec->order_list->first);
  filesort= new (thd->mem_root) Filesort(sort_order, HA_POS_ERROR, NULL);
  if (!filesort)
    return true;

  switch (win_func->window_func()->sum_func())
  {
  case Item_sum::ROW_NUMBER_FUNC:
  case Item_sum::RANK_FUNC:
  case Item_sum::DENSE_RANK_FUNC:
    /*
      One-pass window function computation, walk through the rows and
      assign values.
    */
    compute_func= compute_window_func_values;
    break;

  case Item_sum::PERCENT_RANK_FUNC:
  case Item_sum::CUME_DIST_FUNC:
    compute_func= compute_two_pass_window_functions;
    break;

  case Item_sum::COUNT_FUNC:
  case Item_sum::SUM_BIT_FUNC:
  case Item_sum::SUM_FUNC:
  case Item_sum::AVG_FUNC:
    /*
      Frame-aware window function computation. It does one pass, but
      uses three cursors -frame_start, current_row, and frame_end.
    */
    compute_func= compute_window_func_with_frames;
    break;

  default:
    /*
      No computation function exists for this function type. Report an
      error instead of leaving compute_func unset for exec() to call.
    */
    DBUG_ASSERT(0);
    return true;
  }

  first_run= true;
  return false;
}
/*
  Compute the value of window function for all rows.

  Sorts join->join_tab[join->top_join_tab_count].table using the Filesort
  object prepared by setup(), then reads the rows in sorted order and runs
  the selected computation function over them.

  @param join  the JOIN whose post-join table is to be processed

  @return
    false  OK
    true   Error (sorting, opening the read, or the computation failed)
*/
bool Window_func_runner::exec(JOIN *join)
{
  THD *thd= join->thd;

  /*
    We have to call setup_partition_border_check here.

    The reason is as follows:
    When computing the value of sorting criteria from OVER (PARTITION ...
    ORDER BY ...) clauses, we need to read temp.table fields.

    This is achieved by ORDER::item being Item** object, which points into
    ref_pointer_array.
    Ref_pointer_array initially points to source table fields.
    At execution phase, it is set to point to the temp.table fields.

    We need to use items after this step is done.
    TODO: an alternative is to use something that points into
    ref_pointer_array, too. Something like wrap order->item in an Item_ref
    object.
  */
  if (first_run)
  {
    win_func->setup_partition_border_check(thd);
    first_run= false;
  }

  /*
    Sort the table. A failure here must stop the computation: running the
    window function over unsorted rows would produce wrong values.
  */
  if (create_sort_index(thd, join, &join->join_tab[join->top_join_tab_count],
                        filesort))
    return true;

  win_func->set_phase_to_computation();

  /*
    Go through the sorted array and compute the window function
  */
  READ_RECORD info;
  TABLE *tbl= join->join_tab[join->top_join_tab_count].table;

  if (init_read_record(&info, thd, tbl, NULL/*select*/, 0, 1, FALSE))
    return true;

  bool is_error= compute_func(win_func, tbl, &info);

  /* This calls filesort_free_buffers(): */
  end_read_record(&info);

  //TODO: should this be moved to cleanup: ?
  free_io_cache(tbl);

  win_func->set_phase_to_retrieval();

  return is_error;
}
/*
  Create and set up one Window_func_runner for every window function in
  the list, and remember the runners for exec() and cleanup().

  @param thd           thread handle; its mem_root stores the list nodes
  @param window_funcs  window functions to be computed

  @return
    false  OK
    true   Error (a runner could not be created, set up, or stored)
*/
bool Window_funcs_computation::setup(THD *thd,
                                     List<Item_window_func> *window_funcs)
{
  List_iterator_fast<Item_window_func> it(*window_funcs);
  Item_window_func *item_win;

  // for each window function
  while ((item_win= it++))
  {
    // Create a runner and call setup for it
    Window_func_runner *runner= new Window_func_runner(item_win);
    if (!runner)
      return true;

    /*
      push_back() reports failure through its return value. A runner that
      is not in the list cannot be reached by cleanup(), so release it here.
    */
    if (runner->setup(thd) ||
        win_func_runners.push_back(runner, thd->mem_root))
    {
      runner->cleanup();
      delete runner;
      return true;
    }
  }
  return false;
}
/*
  Run every window function runner, in the order they were registered.

  @param join  the JOIN passed on to each runner's exec()

  @return
    false  all runners finished without error
    true   a runner reported an error; the remaining runners are not run
*/
bool Window_funcs_computation::exec(JOIN *join)
{
  List_iterator<Window_func_runner> runner_it(win_func_runners);

  for (Window_func_runner *cur= runner_it++; cur; cur= runner_it++)
  {
    if (cur->exec(join))
      return true;
  }
  return false;
}
/*
  Release all window function runners.

  Each runner is cleaned up and deleted, and the list is then emptied so
  that it no longer refers to deleted runners. This makes a repeated
  cleanup() call harmless.
*/
void Window_funcs_computation::cleanup()
{
  List_iterator<Window_func_runner> it(win_func_runners);
  Window_func_runner *runner;

  while ((runner= it++))
  {
    runner->cleanup();
    delete runner;
  }
  /* Drop the now-dangling pointers */
  win_func_runners.empty();
}
///////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////
// Unneeded comments (will be removed when we develop a replacement for // Unneeded comments (will be removed when we develop a replacement for
// the feature that was attempted here // the feature that was attempted here
...@@ -1564,112 +1755,3 @@ bool compute_two_pass_window_functions(Item_window_func *item_win, ...@@ -1564,112 +1755,3 @@ bool compute_two_pass_window_functions(Item_window_func *item_win,
else else
#endif #endif
/*
@brief
This function is called by JOIN::exec to compute window function values
@detail
JOIN::exec calls this after it has filled the temporary table with query
output. The temporary table has fields to store window function values.
For each window function in the list, the temporary table is sorted by
(partition_list, order_list) and then read in that order by the
computation function that matches the function type.
@param window_funcs  the window functions to compute
@return
false OK
true Error
*/
bool JOIN::process_window_functions(List<Item_window_func> *window_funcs)
{
List_iterator_fast<Item_window_func> it(*window_funcs);
Item_window_func *item_win;
/* Each window function gets its own sort and its own pass over the table */
while ((item_win= it++))
{
item_win->set_phase_to_computation();
Window_spec *spec = item_win->window_spec;
/*
The sorting criteria should be
(spec->partition_list, spec->order_list)
Connect the two lists for the duration of add_sorting_to_table()
call.
*/
DBUG_ASSERT(spec->partition_list->next[0] == NULL);
*(spec->partition_list->next)= spec->order_list->first;
/*
join_tab[top_join_tab_count].table is the temp. table where join
output was stored.
*/
// CAUTION: The sorting criteria list is not yet connected
// NOTE(review): the CAUTION above looks stale, the two lists are linked by
// the assignment a few lines up - confirm.
add_sorting_to_table(&join_tab[top_join_tab_count],
spec->partition_list->first);
join_tab[top_join_tab_count].used_for_window_func= true;
// NOTE(review): the return value of create_sort_index() is not checked here.
create_sort_index(this->thd, this, &join_tab[top_join_tab_count]);
/* Disconnect order_list from partition_list */
*(spec->partition_list->next)= NULL;
/*
Go through the sorted array and compute the window function
*/
READ_RECORD info;
TABLE *tbl= join_tab[top_join_tab_count].table;
// NOTE(review): this early return skips the filesort deletion and
// free_io_cache() call done at the end of the loop body.
if (init_read_record(&info, thd, tbl, select, 0, 1, FALSE))
return true;
bool is_error= false;
item_win->setup_partition_border_check(thd);
/* Pick the computation function based on the window function type */
Item_sum::Sumfunctype type= item_win->window_func()->sum_func();
switch (type) {
case Item_sum::ROW_NUMBER_FUNC:
case Item_sum::RANK_FUNC:
case Item_sum::DENSE_RANK_FUNC:
{
/*
One-pass window function computation, walk through the rows and
assign values.
*/
if (compute_window_func_values(item_win, tbl, &info))
is_error= true;
break;
}
case Item_sum::PERCENT_RANK_FUNC:
case Item_sum::CUME_DIST_FUNC:
{
/* These two types use the two-pass computation function */
if (compute_two_pass_window_functions(item_win, tbl, &info))
is_error= true;
break;
}
case Item_sum::COUNT_FUNC:
case Item_sum::SUM_BIT_FUNC:
case Item_sum::SUM_FUNC:
case Item_sum::AVG_FUNC:
{
/*
Frame-aware window function computation. It does one pass, but
uses three cursors -frame_start, current_row, and frame_end.
*/
if (compute_window_func_with_frames(item_win, tbl, &info))
is_error= true;
break;
}
default:
/* Any other function type is not expected here */
DBUG_ASSERT(0);
}
item_win->set_phase_to_retrieval();
/* This calls filesort_free_buffers(): */
end_read_record(&info);
/* Drop the per-function sort so the next iteration starts from scratch */
delete join_tab[top_join_tab_count].filesort;
join_tab[top_join_tab_count].filesort= NULL;
free_io_cache(tbl);
/* Resources are released before the error is reported */
if (is_error)
return true;
}
return false;
}
...@@ -4,6 +4,8 @@ ...@@ -4,6 +4,8 @@
#include "my_global.h" #include "my_global.h"
#include "item.h" #include "item.h"
#include "filesort.h"
#include "records.h"
class Item_window_func; class Item_window_func;
...@@ -134,4 +136,62 @@ int setup_windows(THD *thd, Ref_ptr_array ref_pointer_array, TABLE_LIST *tables, ...@@ -134,4 +136,62 @@ int setup_windows(THD *thd, Ref_ptr_array ref_pointer_array, TABLE_LIST *tables,
List<Item> &fields, List<Item> &all_fields, List<Item> &fields, List<Item> &all_fields,
List<Window_spec> &win_specs, List<Item_window_func> &win_funcs); List<Window_spec> &win_specs, List<Item_window_func> &win_funcs);
//////////////////////////////////////////////////////////////////////////////
// Classes that make window functions computation a part of SELECT's query plan
//////////////////////////////////////////////////////////////////////////////
/*
  Signature shared by the window function computation routines
  (compute_window_func_values, compute_two_pass_window_functions,
  compute_window_func_with_frames).

  item_win  the window function to compute
  tbl       the table whose rows are processed
  info      READ_RECORD used to read the rows

  The callers treat a true return value as an error.
*/
typedef bool (*window_compute_func_t)(Item_window_func *item_win,
TABLE *tbl, READ_RECORD *info);
/*
This handles computation of one window function.
Currently, we make a spearate filesort() call for each window function.
*/
class Window_func_runner : public Sql_alloc
{
Item_window_func *win_func;
/* Window function can be computed over this sorting */
Filesort *filesort;
/* The function to use for computation*/
window_compute_func_t compute_func;
bool first_run;
public:
Window_func_runner(Item_window_func *win_func_arg) :
win_func(win_func_arg)
{}
// Set things up. Create filesort structures, etc
bool setup(THD *thd);
// This sorts and runs the window function.
bool exec(JOIN *join);
void cleanup() { delete filesort; }
};
/*
  The "window function computation phase" of a query plan. One object of
  this class is responsible for computing all window functions of a SELECT.

  - The JOIN optimizer is expected to call setup() during query optimization.
  - JOIN::exec() should call exec() once it has collected the join output in
    a temporary table.
  - cleanup() releases the per-function runners.
*/
class Window_funcs_computation : public Sql_alloc
{
public:
  /* Create a runner for each window function; returns true on error */
  bool setup(THD *thd, List<Item_window_func> *window_funcs);

  /* Execute all runners; returns true on error */
  bool exec(JOIN *join);

  /* Clean up and delete the runners */
  void cleanup();

private:
  /* One runner per window function, filled by setup() */
  List<Window_func_runner> win_func_runners;
};
#endif /* SQL_WINDOW_INCLUDED */ #endif /* SQL_WINDOW_INCLUDED */
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment