Commit 44b74074 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

refs #5104, merge ICP for MariaDB to main

git-svn-id: file:///svn/mysql/tokudb-engine/tokudb-engine@53424 c7de825b-a66e-492c-adef-691d508d4ae1
parent 388450f6
...@@ -300,6 +300,9 @@ ulong ha_tokudb::index_flags(uint idx, uint part, bool all_parts) const { ...@@ -300,6 +300,9 @@ ulong ha_tokudb::index_flags(uint idx, uint part, bool all_parts) const {
TOKUDB_DBUG_ENTER("ha_tokudb::index_flags"); TOKUDB_DBUG_ENTER("ha_tokudb::index_flags");
assert(table_share); assert(table_share);
ulong flags = (HA_READ_NEXT | HA_READ_PREV | HA_READ_ORDER | HA_KEYREAD_ONLY | HA_READ_RANGE); ulong flags = (HA_READ_NEXT | HA_READ_PREV | HA_READ_ORDER | HA_KEYREAD_ONLY | HA_READ_RANGE);
#ifdef MARIADB_BASE_VERSION
flags |= HA_DO_INDEX_COND_PUSHDOWN;
#endif
if (table_share->key_info[idx].flags & HA_CLUSTERING) { if (table_share->key_info[idx].flags & HA_CLUSTERING) {
flags |= HA_CLUSTERED_INDEX; flags |= HA_CLUSTERED_INDEX;
} }
...@@ -322,6 +325,7 @@ typedef struct smart_dbt_bf_info { ...@@ -322,6 +325,7 @@ typedef struct smart_dbt_bf_info {
bool need_val; bool need_val;
int direction; int direction;
THD* thd; THD* thd;
uchar* buf;
} *SMART_DBT_BF_INFO; } *SMART_DBT_BF_INFO;
typedef struct index_read_info { typedef struct index_read_info {
...@@ -1191,7 +1195,6 @@ static int generate_row_for_put( ...@@ -1191,7 +1195,6 @@ static int generate_row_for_put(
); );
} }
ha_tokudb::ha_tokudb(handlerton * hton, TABLE_SHARE * table_arg):handler(hton, table_arg) ha_tokudb::ha_tokudb(handlerton * hton, TABLE_SHARE * table_arg):handler(hton, table_arg)
// flags defined in sql\handler.h // flags defined in sql\handler.h
{ {
...@@ -1243,6 +1246,7 @@ ha_tokudb::ha_tokudb(handlerton * hton, TABLE_SHARE * table_arg):handler(hton, t ...@@ -1243,6 +1246,7 @@ ha_tokudb::ha_tokudb(handlerton * hton, TABLE_SHARE * table_arg):handler(hton, t
prelocked_left_range_size = 0; prelocked_left_range_size = 0;
prelocked_right_range_size = 0; prelocked_right_range_size = 0;
tokudb_active_index = MAX_KEY; tokudb_active_index = MAX_KEY;
invalidate_icp();
} }
ha_tokudb::~ha_tokudb() { ha_tokudb::~ha_tokudb() {
...@@ -4355,6 +4359,12 @@ void ha_tokudb::invalidate_bulk_fetch() { ...@@ -4355,6 +4359,12 @@ void ha_tokudb::invalidate_bulk_fetch() {
curr_range_query_buff_offset = 0; curr_range_query_buff_offset = 0;
} }
void ha_tokudb::invalidate_icp() {
toku_pushed_idx_cond = NULL;
toku_pushed_idx_cond_keyno = MAX_KEY;
icp_went_out_of_range = false;
}
volatile int ha_tokudb_index_init_wait = 0; // debug volatile int ha_tokudb_index_init_wait = 0; // debug
// //
...@@ -4458,6 +4468,7 @@ int ha_tokudb::index_end() { ...@@ -4458,6 +4468,7 @@ int ha_tokudb::index_end() {
num_var_cols_for_query = 0; num_var_cols_for_query = 0;
invalidate_bulk_fetch(); invalidate_bulk_fetch();
invalidate_icp();
doing_bulk_fetch = false; doing_bulk_fetch = false;
close_dsmrr(); close_dsmrr();
...@@ -4900,16 +4911,29 @@ int ha_tokudb::read_data_from_range_query_buff(uchar* buf, bool need_val) { ...@@ -4900,16 +4911,29 @@ int ha_tokudb::read_data_from_range_query_buff(uchar* buf, bool need_val) {
static int static int
smart_dbt_bf_callback(DBT const *key, DBT const *row, void *context) { smart_dbt_bf_callback(DBT const *key, DBT const *row, void *context) {
SMART_DBT_BF_INFO info = (SMART_DBT_BF_INFO)context; SMART_DBT_BF_INFO info = (SMART_DBT_BF_INFO)context;
return info->ha->fill_range_query_buf(info->need_val, key, row, info->direction, info->thd); return info->ha->fill_range_query_buf(info->need_val, key, row, info->direction, info->thd, info->buf);
} }
#ifdef MARIADB_BASE_VERSION
enum icp_result ha_tokudb::toku_handler_index_cond_check(Item* pushed_idx_cond)
{
enum icp_result res;
if (end_range && compare_key2(end_range) > 0) {
return ICP_OUT_OF_RANGE;
}
res = pushed_idx_cond->val_int() ? ICP_MATCH : ICP_NO_MATCH;
return res;
}
#endif
// fill in the range query buf for bulk fetch // fill in the range query buf for bulk fetch
int ha_tokudb::fill_range_query_buf( int ha_tokudb::fill_range_query_buf(
bool need_val, bool need_val,
DBT const *key, DBT const *key,
DBT const *row, DBT const *row,
int direction, int direction,
THD* thd THD* thd,
uchar* buf
) { ) {
int error; int error;
// //
...@@ -4919,6 +4943,32 @@ int ha_tokudb::fill_range_query_buf( ...@@ -4919,6 +4943,32 @@ int ha_tokudb::fill_range_query_buf(
uint32_t size_needed; uint32_t size_needed;
uint32_t user_defined_size = get_tokudb_read_buf_size(thd); uint32_t user_defined_size = get_tokudb_read_buf_size(thd);
uchar* curr_pos = NULL; uchar* curr_pos = NULL;
#ifdef MARIADB_BASE_VERSION
// if we have an index condition pushed down, we check it
if (toku_pushed_idx_cond && (tokudb_active_index == toku_pushed_idx_cond_keyno)) {
unpack_key(buf, key, tokudb_active_index);
enum icp_result result = toku_handler_index_cond_check(toku_pushed_idx_cond);
// If we have reason to stop, we set icp_went_out_of_range and get out
if (result == ICP_ABORTED_BY_USER || result == ICP_OUT_OF_RANGE || thd_killed(thd)) {
icp_went_out_of_range = true;
error = 0;
goto cleanup;
}
// otherwise, if we simply see that the current key is no match,
// we tell the cursor to continue and don't store
// the key locally
else if (result == ICP_NO_MATCH) {
error = TOKUDB_CURSOR_CONTINUE;
goto cleanup;
}
}
#endif
// at this point, if ICP is on, we have verified that the key is one
// we are interested in, so we proceed with placing the data
// into the range query buffer
if (need_val) { if (need_val) {
if (unpack_entire_row) { if (unpack_entire_row) {
size_needed = 2*sizeof(uint32_t) + key->size + row->size; size_needed = 2*sizeof(uint32_t) + key->size + row->size;
...@@ -5122,6 +5172,9 @@ int ha_tokudb::get_next(uchar* buf, int direction) { ...@@ -5122,6 +5172,9 @@ int ha_tokudb::get_next(uchar* buf, int direction) {
if ((bytes_used_in_range_query_buff - curr_range_query_buff_offset) > 0) { if ((bytes_used_in_range_query_buff - curr_range_query_buff_offset) > 0) {
error = read_data_from_range_query_buff(buf, need_val); error = read_data_from_range_query_buff(buf, need_val);
} }
else if (icp_went_out_of_range) {
error = HA_ERR_END_OF_FILE;
}
else { else {
invalidate_bulk_fetch(); invalidate_bulk_fetch();
if (doing_bulk_fetch) { if (doing_bulk_fetch) {
...@@ -5131,14 +5184,28 @@ int ha_tokudb::get_next(uchar* buf, int direction) { ...@@ -5131,14 +5184,28 @@ int ha_tokudb::get_next(uchar* buf, int direction) {
bf_info.direction = direction; bf_info.direction = direction;
bf_info.thd = ha_thd(); bf_info.thd = ha_thd();
bf_info.need_val = need_val; bf_info.need_val = need_val;
bf_info.buf = buf;
// //
// call c_getf_next with purpose of filling in range_query_buff // call c_getf_next with purpose of filling in range_query_buff
// //
rows_fetched_using_bulk_fetch = 0; rows_fetched_using_bulk_fetch = 0;
if (direction > 0) { // it is expected that we can do ICP in the smart_dbt_bf_callback
error = cursor->c_getf_next(cursor, flags, smart_dbt_bf_callback, &bf_info); // as a result, it's possible we don't return any data because
} else { // none of the rows matched the index condition. Therefore, we need
error = cursor->c_getf_prev(cursor, flags, smart_dbt_bf_callback, &bf_info); // this while loop. icp_out_of_range will be set if we hit a row that
// the index condition states is out of our range. When that hits,
// we know all the data in the buffer is the last data we will retrieve
while (bytes_used_in_range_query_buff == 0 && !icp_went_out_of_range && error == 0) {
if (direction > 0) {
error = cursor->c_getf_next(cursor, flags, smart_dbt_bf_callback, &bf_info);
} else {
error = cursor->c_getf_prev(cursor, flags, smart_dbt_bf_callback, &bf_info);
}
}
// if there is no data set and we went out of range,
// then there is nothing to return
if (bytes_used_in_range_query_buff == 0 && icp_went_out_of_range) {
error = HA_ERR_END_OF_FILE;
} }
if (bulk_fetch_iteration < HA_TOKU_BULK_FETCH_ITERATION_MAX) { if (bulk_fetch_iteration < HA_TOKU_BULK_FETCH_ITERATION_MAX) {
bulk_fetch_iteration++; bulk_fetch_iteration++;
...@@ -5793,6 +5860,7 @@ int ha_tokudb::reset(void) { ...@@ -5793,6 +5860,7 @@ int ha_tokudb::reset(void) {
key_read = 0; key_read = 0;
using_ignore = 0; using_ignore = 0;
reset_dsmrr(); reset_dsmrr();
invalidate_icp();
TOKUDB_DBUG_RETURN(0); TOKUDB_DBUG_RETURN(0);
} }
...@@ -8086,6 +8154,18 @@ void ha_tokudb::reset_dsmrr() { ...@@ -8086,6 +8154,18 @@ void ha_tokudb::reset_dsmrr() {
#endif #endif
} }
// we cache the information so we can do filtering ourselves,
// but as far as MySQL knows, we are not doing any filtering,
// so if we happen to miss filtering a row that does not match
// idx_cond_arg, MySQL will catch it.
// This allows us the ability to deal with only index_next and index_prev,
// and not need to worry about other index_XXX functions
Item* ha_tokudb::idx_cond_push(uint keyno_arg, Item* idx_cond_arg) {
toku_pushed_idx_cond_keyno = keyno_arg;
toku_pushed_idx_cond = idx_cond_arg;
return idx_cond_arg;
}
// table admin // table admin
#include "ha_tokudb_admin.cc" #include "ha_tokudb_admin.cc"
......
...@@ -134,6 +134,10 @@ class ha_tokudb : public handler { ...@@ -134,6 +134,10 @@ class ha_tokudb : public handler {
DsMrr_impl ds_mrr; DsMrr_impl ds_mrr;
#endif #endif
// For ICP. Cache our own copies
Item* toku_pushed_idx_cond;
uint toku_pushed_idx_cond_keyno; /* The index which the above condition is for */
bool icp_went_out_of_range;
// //
// last key returned by ha_tokudb's cursor // last key returned by ha_tokudb's cursor
...@@ -568,6 +572,10 @@ class ha_tokudb : public handler { ...@@ -568,6 +572,10 @@ class ha_tokudb : public handler {
#endif #endif
#endif #endif
// ICP introduced in MariaDB 5.5
Item* idx_cond_push(uint keyno, class Item* idx_cond);
#if TOKU_INCLUDE_ALTER_56 #if TOKU_INCLUDE_ALTER_56
public: public:
enum_alter_inplace_result check_if_supported_inplace_alter(TABLE *altered_table, Alter_inplace_info *ha_alter_info); enum_alter_inplace_result check_if_supported_inplace_alter(TABLE *altered_table, Alter_inplace_info *ha_alter_info);
...@@ -698,7 +706,8 @@ class ha_tokudb : public handler { ...@@ -698,7 +706,8 @@ class ha_tokudb : public handler {
DBT const *key, DBT const *key,
DBT const *row, DBT const *row,
int direction, int direction,
THD* thd THD* thd,
uchar* buf
); );
#if MYSQL_VERSION_ID >= 50521 #if MYSQL_VERSION_ID >= 50521
...@@ -712,7 +721,9 @@ class ha_tokudb : public handler { ...@@ -712,7 +721,9 @@ class ha_tokudb : public handler {
int __close(int mutex_is_locked); int __close(int mutex_is_locked);
int get_next(uchar* buf, int direction); int get_next(uchar* buf, int direction);
int read_data_from_range_query_buff(uchar* buf, bool need_val); int read_data_from_range_query_buff(uchar* buf, bool need_val);
enum icp_result toku_handler_index_cond_check(Item* pushed_idx_cond);
void invalidate_bulk_fetch(); void invalidate_bulk_fetch();
void invalidate_icp();
int delete_all_rows_internal(); int delete_all_rows_internal();
void close_dsmrr(); void close_dsmrr();
void reset_dsmrr(); void reset_dsmrr();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment