Commit 80f1c882 authored by Sergei Golubchik's avatar Sergei Golubchik

cleanup: Queue and Bounded_queue

Bounded_queue<> pretended to be a typesafe C++ wrapper
on top of pure C queues.h.

But it wasn't, it was tightly bounded to filesort and only useful there.

* implement Queue<> - a typesafe C++ wrapper on top of QUEUE
* move Bounded_queue to filesort.cc, remove pointless "generalizations"
  change it to use Queue.
* remove bounded_queue.h
* change subselect_rowid_merge_engine to use Queue, not QUEUE
parent 2a12af93
......@@ -76,7 +76,7 @@ uchar *queue_remove(QUEUE *queue,uint idx);
void queue_replace(QUEUE *queue,uint idx);
#define queue_remove_all(queue) { (queue)->elements= 0; }
#define queue_is_full(queue) (queue->elements == queue->max_elements)
#define queue_is_full(queue) ((queue)->elements == (queue)->max_elements)
void _downheap(QUEUE *queue, uint idx);
void queue_fix(QUEUE *queue);
#define is_queue_inited(queue) ((queue)->root != 0)
......
/* Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
#ifndef BOUNDED_QUEUE_INCLUDED
#define BOUNDED_QUEUE_INCLUDED
#include "my_base.h"
#include <my_sys.h>
#include "queues.h"
#include <string.h>
class Sort_param;
/**
A priority queue with a fixed, limited size.
This is a wrapper on top of QUEUE and the queue_xxx() functions.
It keeps the top-N elements which are inserted.
Elements of type Element_type are pushed into the queue.
For each element, we call a user-supplied keymaker_function,
to generate a key of type Key_type for the element.
Instances of Key_type are compared with the user-supplied compare_function.
The underlying QUEUE implementation needs one extra element for replacing
the lowest/highest element when pushing into a full queue.
*/
template<typename Element_type, typename Key_type>
class Bounded_queue
{
public:
Bounded_queue()
{
memset(&m_queue, 0, sizeof(m_queue));
}
~Bounded_queue()
{
delete_queue(&m_queue);
}
/**
Function for making sort-key from input data.
@param param Sort parameters.
@param to Where to put the key.
@param from The input data.
*/
typedef uint (*keymaker_function)(Sort_param *param,
Key_type *to,
Element_type *from,
bool packing_keys);
/**
Function for comparing two keys.
@param n Pointer to number of bytes to compare.
@param a First key.
@param b Second key.
@retval -1, 0, or 1 depending on whether the left argument is
less than, equal to, or greater than the right argument.
*/
typedef int (*compare_function)(size_t *n, Key_type **a, Key_type **b);
/**
Initialize the queue.
@param max_elements The size of the queue.
@param max_at_top Set to true if you want biggest element on top.
false: We keep the n largest elements.
pop() will return the smallest key in the result set.
true: We keep the n smallest elements.
pop() will return the largest key in the result set.
@param compare Compare function for elements, takes 3 arguments.
If NULL, we use get_ptr_compare(compare_length).
@param compare_length Length of the data (i.e. the keys) used for sorting.
@param keymaker Function which generates keys for elements.
@param sort_param Sort parameters.
@param sort_keys Array of pointers to keys to sort.
@retval 0 OK, 1 Could not allocate memory.
We do *not* take ownership of any of the input pointer arguments.
*/
int init(ha_rows max_elements, bool max_at_top,
compare_function compare, size_t compare_length,
keymaker_function keymaker, Sort_param *sort_param,
Key_type **sort_keys);
/**
Pushes an element on the queue.
If the queue is already full, we discard one element.
Calls keymaker_function to generate a key for the element.
@param element The element to be pushed.
*/
void push(Element_type *element);
/**
Removes the top element from the queue.
@retval Pointer to the (key of the) removed element.
@note This function is for unit testing, where we push elements into to the
queue, and test that the appropriate keys are retained.
Interleaving of push() and pop() operations has not been tested.
*/
Key_type **pop()
{
// Don't return the extra element to the client code.
if (queue_is_full((&m_queue)))
queue_remove(&m_queue, 0);
DBUG_ASSERT(m_queue.elements > 0);
if (m_queue.elements == 0)
return NULL;
return reinterpret_cast<Key_type**>(queue_remove(&m_queue, 0));
}
/**
The number of elements in the queue.
*/
uint num_elements() const { return m_queue.elements; }
/**
Is the queue initialized?
*/
bool is_initialized() const { return m_queue.max_elements > 0; }
private:
Key_type **m_sort_keys;
size_t m_compare_length;
keymaker_function m_keymaker;
Sort_param *m_sort_param;
st_queue m_queue;
};
template<typename Element_type, typename Key_type>
int Bounded_queue<Element_type, Key_type>::init(ha_rows max_elements,
bool max_at_top,
compare_function compare,
size_t compare_length,
keymaker_function keymaker,
Sort_param *sort_param,
Key_type **sort_keys)
{
DBUG_ASSERT(sort_keys != NULL);
m_sort_keys= sort_keys;
m_compare_length= compare_length;
m_keymaker= keymaker;
m_sort_param= sort_param;
// init_queue() takes an uint, and also does (max_elements + 1)
if (max_elements >= (UINT_MAX - 1))
return 1;
if (compare == NULL)
compare=
reinterpret_cast<compare_function>(get_ptr_compare(compare_length));
// We allocate space for one extra element, for replace when queue is full.
return init_queue(&m_queue, (uint) max_elements + 1,
0, max_at_top,
reinterpret_cast<queue_compare>(compare),
&m_compare_length, 0, 0);
}
template<typename Element_type, typename Key_type>
void Bounded_queue<Element_type, Key_type>::push(Element_type *element)
{
DBUG_ASSERT(is_initialized());
if (queue_is_full((&m_queue)))
{
// Replace top element with new key, and re-order the queue.
Key_type **pq_top= reinterpret_cast<Key_type **>(queue_top(&m_queue));
(void)(*m_keymaker)(m_sort_param, *pq_top, element, false);
queue_replace_top(&m_queue);
} else {
// Insert new key into the queue.
(*m_keymaker)(m_sort_param, m_sort_keys[m_queue.elements],
element, false);
queue_insert(&m_queue,
reinterpret_cast<uchar*>(&m_sort_keys[m_queue.elements]));
}
}
#endif // BOUNDED_QUEUE_INCLUDED
......@@ -31,37 +31,78 @@
#include "sql_base.h"
#include "sql_test.h" // TEST_filesort
#include "opt_range.h" // SQL_SELECT
#include "bounded_queue.h"
#include "filesort_utils.h"
#include "sql_select.h"
#include "debug_sync.h"
#include "sql_queue.h"
/* functions defined in this file */
static uint make_sortkey(Sort_param *, uchar *, uchar *, bool);
static uchar *read_buffpek_from_file(IO_CACHE *buffer_file, uint count,
uchar *buf);
static ha_rows find_all_keys(THD *thd, Sort_param *param, SQL_SELECT *select,
SORT_INFO *fs_info,
IO_CACHE *buffer_file,
IO_CACHE *tempfile,
Bounded_queue<uchar, uchar> *pq,
ha_rows *found_rows);
static bool write_keys(Sort_param *param, SORT_INFO *fs_info,
uint count, IO_CACHE *buffer_file, IO_CACHE *tempfile);
static uint make_sortkey(Sort_param *, uchar *, uchar *,
bool using_packed_sortkeys= false);
class Bounded_queue
{
public:
int init(ha_rows max_elements, size_t cmplen, Sort_param *sort_param,
uchar **sort_keys);
void push(uchar *element);
size_t num_elements() const { return m_queue.elements(); }
bool is_initialized() const { return m_queue.is_inited(); }
private:
uchar **m_sort_keys;
size_t m_compare_length;
Sort_param *m_sort_param;
Queue<uchar*,uchar*,size_t> m_queue;
};
int Bounded_queue::init(ha_rows max_elements, size_t cmplen,
Sort_param *sort_param, uchar **sort_keys)
{
DBUG_ASSERT(sort_keys != NULL);
m_sort_keys= sort_keys;
m_compare_length= cmplen;
m_sort_param= sort_param;
// init_queue() takes an uint, and also does (max_elements + 1)
if (max_elements >= UINT_MAX - 1)
return 1;
// We allocate space for one extra element, for replace when queue is full.
return m_queue.init((uint)max_elements + 1, 0, true,
(decltype(m_queue)::Queue_compare)get_ptr_compare(cmplen),
&m_compare_length);
}
void Bounded_queue::push(uchar *element)
{
DBUG_ASSERT(is_initialized());
if (m_queue.is_full())
{
// Replace top element with new key, and re-order the queue.
uchar **pq_top= m_queue.top();
make_sortkey(m_sort_param, *pq_top, element, 0);
m_queue.propagate_top();
} else {
// Insert new key into the queue.
make_sortkey(m_sort_param, m_sort_keys[m_queue.elements()], element, 0);
m_queue.push(&m_sort_keys[m_queue.elements()]);
}
}
static uchar *read_buffpek_from_file(IO_CACHE *, uint, uchar *);
static ha_rows find_all_keys(THD *, Sort_param *, SQL_SELECT *, SORT_INFO *,
IO_CACHE *, IO_CACHE *, Bounded_queue *,
ha_rows *);
static bool write_keys(Sort_param *, SORT_INFO *, uint, IO_CACHE *, IO_CACHE *);
static uint make_sortkey(Sort_param *param, uchar *to);
static uint make_packed_sortkey(Sort_param *param, uchar *to);
static void register_used_fields(Sort_param *param);
static bool save_index(Sort_param *param, uint count,
SORT_INFO *table_sort);
static bool save_index(Sort_param *, uint, SORT_INFO *);
static uint suffix_length(ulong string_length);
static uint sortlength(THD *thd, Sort_keys *sortorder,
bool *allow_packing_for_sortkeys);
static Addon_fields *get_addon_fields(TABLE *table, uint sortlength,
uint *addon_length,
uint *m_packable_length);
static uint sortlength(THD *, Sort_keys *, bool *);
static Addon_fields *get_addon_fields(TABLE *, uint, uint *, uint *);
static void store_key_part_length(uint32 num, uchar *to, uint bytes)
{
......@@ -219,7 +260,7 @@ SORT_INFO *filesort(THD *thd, TABLE *table, Filesort *filesort,
IO_CACHE tempfile, buffpek_pointers, *outfile;
Sort_param param;
bool allow_packing_for_sortkeys;
Bounded_queue<uchar, uchar> pq;
Bounded_queue pq;
SQL_SELECT *const select= filesort->select;
Sort_costs costs;
ha_rows limit_rows= filesort->limit;
......@@ -337,11 +378,7 @@ SORT_INFO *filesort(THD *thd, TABLE *table, Filesort *filesort,
point in doing lazy initialization).
*/
sort->init_record_pointers();
if (pq.init(param.limit_rows,
true, // max_at_top
NULL, // compare_function
compare_length,
&make_sortkey, &param, sort->get_sort_keys()))
if (pq.init(param.limit_rows, compare_length, &param, sort->get_sort_keys()))
{
/*
If we fail to init pq, we have to give up:
......@@ -883,10 +920,8 @@ static void dbug_print_record(TABLE *table, bool print_rowid)
*/
static ha_rows find_all_keys(THD *thd, Sort_param *param, SQL_SELECT *select,
SORT_INFO *fs_info,
IO_CACHE *buffpek_pointers,
IO_CACHE *tempfile,
Bounded_queue<uchar, uchar> *pq,
SORT_INFO *fs_info, IO_CACHE *buffpek_pointers,
IO_CACHE *tempfile, Bounded_queue *pq,
ha_rows *found_rows)
{
int error, quick_select;
......
......@@ -6659,9 +6659,8 @@ subselect_rowid_merge_engine::init(MY_BITMAP *non_null_key_parts,
if (merge_keys[i]->sort_keys())
return TRUE;
if (init_queue(&pq, merge_keys_count, 0, FALSE,
subselect_rowid_merge_engine::cmp_keys_by_cur_rownum, NULL,
0, 0))
if (pq.init(merge_keys_count, 0, false,
subselect_rowid_merge_engine::cmp_keys_by_cur_rownum))
return TRUE;
item->get_IN_subquery()->get_materialization_tracker()->
......@@ -6678,7 +6677,6 @@ subselect_rowid_merge_engine::~subselect_rowid_merge_engine()
my_free(row_num_to_rowid);
for (uint i= 0; i < merge_keys_count; i++)
delete merge_keys[i];
delete_queue(&pq);
if (tmp_table->file->inited == handler::RND)
tmp_table->file->ha_rnd_end();
}
......@@ -6720,11 +6718,11 @@ subselect_rowid_merge_engine::cmp_keys_by_null_selectivity(Ordered_key **k1,
*/
int
subselect_rowid_merge_engine::cmp_keys_by_cur_rownum(void *arg,
uchar *k1, uchar *k2)
subselect_rowid_merge_engine::cmp_keys_by_cur_rownum(void *, Ordered_key *k1,
Ordered_key *k2)
{
rownum_t r1= ((Ordered_key*) k1)->current();
rownum_t r2= ((Ordered_key*) k2)->current();
rownum_t r1= k1->current();
rownum_t r2= k2->current();
return (r1 < r2) ? -1 : (r1 > r2) ? 1 : 0;
}
......@@ -6835,7 +6833,7 @@ bool subselect_rowid_merge_engine::partial_match()
/* If there is a non-NULL key, it must be the first key in the keys array. */
DBUG_ASSERT(!non_null_key || (non_null_key && merge_keys[0] == non_null_key));
/* The prioryty queue for keys must be empty. */
DBUG_ASSERT(!pq.elements);
DBUG_ASSERT(pq.is_empty());
/* All data accesses during execution are via handler::ha_rnd_pos() */
if (unlikely(tmp_table->file->ha_rnd_init_with_error(0)))
......@@ -6862,7 +6860,7 @@ bool subselect_rowid_merge_engine::partial_match()
}
if (non_null_key)
queue_insert(&pq, (uchar *) non_null_key);
pq.push(non_null_key);
/*
Do not add the non_null_key, since it was already processed above.
*/
......@@ -6876,7 +6874,7 @@ bool subselect_rowid_merge_engine::partial_match()
bitmap_set_bit(&matching_outer_cols, merge_keys[i]->get_keyid());
}
else if (merge_keys[i]->lookup())
queue_insert(&pq, (uchar *) merge_keys[i]);
pq.push(merge_keys[i]);
}
/*
......@@ -6896,7 +6894,7 @@ bool subselect_rowid_merge_engine::partial_match()
there is a subquery row with NULLs in all unmatched columns,
then there is a partial match, otherwise the result is FALSE.
*/
if (count_nulls_in_search_key && !pq.elements)
if (count_nulls_in_search_key && pq.is_empty())
{
DBUG_ASSERT(!non_null_key);
/*
......@@ -6915,11 +6913,11 @@ bool subselect_rowid_merge_engine::partial_match()
non-null key doesn't have a match.
*/
if (!count_nulls_in_search_key &&
(!pq.elements ||
(pq.elements == 1 && non_null_key &&
(pq.is_empty() ||
(pq.elements() == 1 && non_null_key &&
max_null_in_any_row < merge_keys_count-1)))
{
if (!pq.elements)
if (pq.is_empty())
{
DBUG_ASSERT(!non_null_key);
/*
......@@ -6932,16 +6930,16 @@ bool subselect_rowid_merge_engine::partial_match()
goto end;
}
DBUG_ASSERT(pq.elements);
DBUG_ASSERT(!pq.is_empty());
min_key= (Ordered_key*) queue_remove_top(&pq);
min_key= pq.pop();
min_row_num= min_key->current();
bitmap_set_bit(&matching_keys, min_key->get_keyid());
bitmap_union(&matching_keys, &matching_outer_cols);
if (min_key->next_same())
queue_insert(&pq, (uchar *) min_key);
pq.push(min_key);
if (pq.elements == 0)
if (pq.is_empty())
{
/*
Check the only matching row of the only key min_key for NULL matches
......@@ -6953,7 +6951,7 @@ bool subselect_rowid_merge_engine::partial_match()
while (TRUE)
{
cur_key= (Ordered_key*) queue_remove_top(&pq);
cur_key= pq.pop();
cur_row_num= cur_key->current();
if (cur_row_num == min_row_num)
......@@ -6978,9 +6976,9 @@ bool subselect_rowid_merge_engine::partial_match()
}
if (cur_key->next_same())
queue_insert(&pq, (uchar *) cur_key);
pq.push(cur_key);
if (pq.elements == 0)
if (pq.is_empty())
{
/* Check the last row of the last column in PQ for NULL matches. */
res= test_null_row(min_row_num);
......@@ -6994,7 +6992,7 @@ bool subselect_rowid_merge_engine::partial_match()
end:
if (!has_covering_null_columns)
bitmap_clear_all(&matching_keys);
queue_remove_all(&pq);
pq.clear();
tmp_table->file->ha_rnd_end();
return res;
}
......
......@@ -19,7 +19,7 @@
/* subselect Item */
#include "item.h"
#include <queues.h>
#include "sql_queue.h"
class st_select_lex;
class st_select_lex_unit;
......@@ -1507,7 +1507,7 @@ class subselect_rowid_merge_engine: public subselect_partial_match_engine
Priority queue of Ordered_key indexes, one per NULLable column.
This queue is used by the partial match algorithm in method exec().
*/
QUEUE pq;
Queue<Ordered_key, Ordered_key> pq;
protected:
/*
Comparison function to compare keys in order of decreasing bitmap
......@@ -1518,7 +1518,7 @@ class subselect_rowid_merge_engine: public subselect_partial_match_engine
Comparison function used by the priority queue pq, the 'smaller' key
is the one with the smaller current row number.
*/
static int cmp_keys_by_cur_rownum(void *arg, uchar *k1, uchar *k2);
static int cmp_keys_by_cur_rownum(void *arg, Ordered_key *k1, Ordered_key *k2);
bool test_null_row(rownum_t row_num);
bool exists_complementing_null_row(MY_BITMAP *keys_to_complement);
......
......@@ -65,7 +65,7 @@ class PROFILING;
Implements a persistent FIFO using server List method names. Not
thread-safe. Intended to be used on thread-local data only.
*/
template <class T> class Queue
template <class T> class FIFO_Queue
{
private:
......@@ -78,7 +78,7 @@ template <class T> class Queue
struct queue_item *first, *last;
public:
Queue()
FIFO_Queue()
{
elements= 0;
first= last= NULL;
......@@ -95,7 +95,7 @@ template <class T> class Queue
elements= 0;
}
ulong elements; /* The count of items in the Queue */
ulong elements; /* The count of items in the FIFO_Queue */
void push_back(T *payload)
{
......@@ -129,7 +129,7 @@ template <class T> class Queue
if (first == NULL)
{
DBUG_PRINT("warning", ("tried to pop nonexistent item from Queue"));
DBUG_PRINT("warning", ("tried to pop nonexistent item from FIFO_Queue"));
return NULL;
}
......@@ -228,7 +228,7 @@ class QUERY_PROFILE
double m_start_time_usecs;
double m_end_time_usecs;
ulong m_seq_counter;
Queue<PROF_MEASUREMENT> entries;
FIFO_Queue<PROF_MEASUREMENT> entries;
QUERY_PROFILE(PROFILING *profiling_arg, const char *status_arg);
......@@ -268,7 +268,7 @@ class PROFILING
QUERY_PROFILE *current;
QUERY_PROFILE *last;
Queue<QUERY_PROFILE> history;
FIFO_Queue<QUERY_PROFILE> history;
query_id_t next_profile_id() { return(profile_id_counter++); }
......
/* Copyright (c) 2024, MariaDB plc
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
#ifndef QUEUE_INCLUDED
#define QUEUE_INCLUDED
#include "queues.h"
/**
A typesafe wrapper of QUEUE, a priority heap
*/
template<typename Element, typename Key, typename Param=void>
class Queue
{
public:
typedef int (*Queue_compare)(Param *, Key *, Key *);
Queue() { m_queue.root= 0; }
~Queue() { delete_queue(&m_queue); }
int init(uint max_elements, uint offset_to_key, bool max_at_top,
Queue_compare compare, Param *param= 0)
{
return init_queue(&m_queue, max_elements, offset_to_key, max_at_top,
(queue_compare)compare, param, 0, 0);
}
size_t elements() const { return m_queue.elements; }
bool is_inited() const { return is_queue_inited(&m_queue); }
bool is_full() const { return queue_is_full(&m_queue); }
bool is_empty() const { return elements() == 0; }
Element *top() const { return (Element*)queue_top(&m_queue); }
void push(Element *element) { queue_insert(&m_queue, (uchar*)element); }
Element *pop() { return (Element *)queue_remove_top(&m_queue); }
void clear() { queue_remove_all(&m_queue); }
void propagate_top() { queue_replace_top(&m_queue); }
void replace_top(Element *element)
{
queue_top(&m_queue)= (uchar*)element;
propagate_top();
}
private:
QUEUE m_queue;
};
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment