Commit df4cd174 authored by Zardosht Kasheff, committed by Yoni Fogel

[t:3436], merge handlerton piece of bulk fetch to main line handlerton

git-svn-id: file:///svn/mysql/tokudb-engine/tokudb-engine@33944 c7de825b-a66e-492c-adef-691d508d4ae1
parent b67d0700
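In outline, this change adds bulk fetch to TokuDB range reads: a new cursor callback (fill_range_query_buf) packs many rows into a per-handler buffer (range_query_buff) in a single storage-engine call, and get_next() then drains that buffer through read_data_from_range_query_buff() without returning to the cursor for every row. The sketch below is not TokuDB code; it is a minimal, self-contained illustration of the length-prefixed framing the buffer uses (the real code additionally packs null bytes, fixed/variable columns, and blobs when only part of the row is needed), and buf_append/buf_read are hypothetical helper names.

#include <cstdint>
#include <cstring>
#include <utility>
#include <vector>

// Append one record as [u_int32_t size][payload] -- the framing that
// fill_range_query_buf() uses for the key (and, when a value is needed
// and unpack_entire_row is set, for the row as well).
static void buf_append(std::vector<unsigned char> &buf, const void *p, uint32_t n) {
    size_t off = buf.size();
    buf.resize(off + sizeof(n) + n);
    memcpy(&buf[off], &n, sizeof(n));
    memcpy(&buf[off + sizeof(n)], p, n);
}

// Read the record at *off and advance *off past it -- what
// read_data_from_range_query_buff() does with curr_range_query_buff_offset.
static std::pair<const unsigned char *, uint32_t>
buf_read(const std::vector<unsigned char> &buf, size_t *off) {
    uint32_t n;
    memcpy(&n, &buf[*off], sizeof(n));
    const unsigned char *payload = &buf[*off] + sizeof(n);
    *off += sizeof(n) + n;
    return std::make_pair(payload, n);
}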
@@ -323,6 +323,13 @@ typedef struct smart_dbt_info {
uint keynr; // index into share->key_file that represents DB we are currently operating on
} *SMART_DBT_INFO;
typedef struct smart_dbt_bf_info {
ha_tokudb* ha;
bool need_val;
int direction;
THD* thd;
} *SMART_DBT_BF_INFO;
typedef struct index_read_info {
struct smart_dbt_info smart_dbt_info;
int cmp;
@@ -1239,6 +1246,13 @@ ha_tokudb::ha_tokudb(handlerton * hton, TABLE_SHARE * table_arg):handler(hton, t
num_DBs_locked_in_bulk = false;
lock_count = 0;
use_write_locks = false;
range_query_buff = NULL;
size_range_query_buff = 0;
bytes_used_in_range_query_buff = 0;
curr_range_query_buff_offset = 0;
doing_bulk_fetch = false;
prelocked_left_range_size = 0;
prelocked_right_range_size = 0;
}
//
@@ -1730,6 +1744,7 @@ exit:
//
int ha_tokudb::open(const char *name, int mode, uint test_if_locked) {
TOKUDB_DBUG_ENTER("ha_tokudb::open %p %s", this, name);
THD* thd = ha_thd();
int error = 0;
int ret_val = 0;
@@ -1761,6 +1776,8 @@ int ha_tokudb::open(const char *name, int mode, uint test_if_locked) {
&key_buff, max_key_length,
&key_buff2, max_key_length,
&key_buff3, max_key_length,
&prelocked_left_range, max_key_length,
&prelocked_right_range, max_key_length,
&primary_key_buff, (hidden_primary_key ? 0 : max_key_length),
&fixed_cols_for_query, table_share->fields*sizeof(u_int32_t),
&var_cols_for_query, table_share->fields*sizeof(u_int32_t),
@@ -1771,6 +1788,13 @@ int ha_tokudb::open(const char *name, int mode, uint test_if_locked) {
goto exit;
}
size_range_query_buff = get_tokudb_read_buf_size(thd);
range_query_buff = (uchar *)my_malloc(size_range_query_buff, MYF(MY_WME));
if (range_query_buff == NULL) {
ret_val = 1;
goto exit;
}
alloced_rec_buff_length = table_share->rec_buff_length + table_share->fields;
rec_buff = (uchar *) my_malloc(alloced_rec_buff_length, MYF(MY_WME));
if (rec_buff == NULL) {
@@ -1841,6 +1865,8 @@ int ha_tokudb::open(const char *name, int mode, uint test_if_locked) {
exit:
if (ret_val) {
my_free(range_query_buff, MYF(MY_ALLOW_ZERO_PTR));
range_query_buff = NULL;
my_free(alloc_ptr, MYF(MY_ALLOW_ZERO_PTR));
alloc_ptr = NULL;
my_free(rec_buff, MYF(MY_ALLOW_ZERO_PTR));
@@ -2132,6 +2158,7 @@ int ha_tokudb::__close(int mutex_is_locked) {
my_free(rec_update_buff, MYF(MY_ALLOW_ZERO_PTR));
my_free(blob_buff, MYF(MY_ALLOW_ZERO_PTR));
my_free(alloc_ptr, MYF(MY_ALLOW_ZERO_PTR));
my_free(range_query_buff, MYF(MY_ALLOW_ZERO_PTR));
for (u_int32_t i = 0; i < sizeof(mult_key_buff)/sizeof(mult_key_buff[0]); i++) {
my_free(mult_key_buff[i], MYF(MY_ALLOW_ZERO_PTR));
}
@@ -4257,16 +4284,8 @@ void ha_tokudb::column_bitmaps_signal() {
//
int ha_tokudb::prepare_index_scan() {
int error = 0;
-DB* db = share->key_file[active_index];
HANDLE_INVALID_CURSOR();
-lockretryN(read_lock_wait_time){
-error = cursor->c_pre_acquire_range_lock(
-cursor,
-db->dbt_neg_infty(),
-db->dbt_pos_infty()
-);
-lockretry_wait;
-}
error = prelock_range(NULL, NULL);
if (error) { last_cursor_error = error; goto cleanup; }
range_lock_grabbed = true;
@@ -4286,9 +4305,12 @@ cleanup:
int ha_tokudb::prepare_index_key_scan( const uchar * key, uint key_len ) {
int error = 0;
DBT start_key, end_key;
THD* thd = ha_thd();
HANDLE_INVALID_CURSOR();
-pack_key(&start_key, active_index, key_buff, key, key_len, COL_NEG_INF);
-pack_key(&end_key, active_index, key_buff2, key, key_len, COL_POS_INF);
pack_key(&start_key, active_index, prelocked_left_range, key, key_len, COL_NEG_INF);
prelocked_left_range_size = start_key.size;
pack_key(&end_key, active_index, prelocked_right_range, key, key_len, COL_POS_INF);
prelocked_right_range_size = end_key.size;
lockretryN(read_lock_wait_time){
error = cursor->c_pre_acquire_range_lock(
@@ -4303,6 +4325,7 @@ int ha_tokudb::prepare_index_key_scan( const uchar * key, uint key_len ) {
}
range_lock_grabbed = true;
doing_bulk_fetch = (thd_sql_command(thd) == SQLCOM_SELECT);
error = 0;
cleanup:
if (error) {
@@ -4319,6 +4342,10 @@ cleanup:
return error;
}
void ha_tokudb::invalidate_bulk_fetch() {
bytes_used_in_range_query_buff= 0;
curr_range_query_buff_offset = 0;
}
//
@@ -4377,6 +4404,8 @@ int ha_tokudb::index_init(uint keynr, bool sorted) {
else {
unpack_entire_row = true;
}
invalidate_bulk_fetch();
doing_bulk_fetch = false;
error = 0;
exit:
TOKUDB_DBUG_RETURN(error);
@@ -4405,6 +4434,10 @@ int ha_tokudb::index_end() {
read_key = true;
num_fixed_cols_for_query = 0;
num_var_cols_for_query = 0;
invalidate_bulk_fetch();
doing_bulk_fetch = false;
TOKUDB_DBUG_RETURN(0);
}
@@ -4588,53 +4621,30 @@ int ha_tokudb::read_full_row(uchar * buf) {
// error otherwise
//
int ha_tokudb::index_next_same(uchar * buf, const uchar * key, uint keylen) {
TOKUDB_DBUG_ENTER("ha_tokudb::index_next_same %p", this); TOKUDB_DBUG_ENTER("ha_tokudb::index_next_same");
int error = 0; statistic_increment(table->in_use->status_var.ha_read_next_count, &LOCK_status);
struct smart_dbt_info info;
DBT curr_key; DBT curr_key;
DBT found_key; DBT found_key;
bool has_null; bool has_null;
int cmp; int cmp;
u_int32_t flags; int error = get_next(buf, 1);
THD* thd = ha_thd();
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);;
HANDLE_INVALID_CURSOR();
statistic_increment(table->in_use->status_var.ha_read_next_count, &LOCK_status);
info.ha = this;
info.buf = buf;
info.keynr = active_index;
pack_key(&curr_key, active_index, key_buff2, key, keylen, COL_ZERO);
flags = SET_PRELOCK_FLAG(0);
lockretryN(read_lock_wait_time){
error = cursor->c_getf_next(cursor, flags, SMART_DBT_CALLBACK, &info);
lockretry_wait;
}
error = handle_cursor_error(error, HA_ERR_END_OF_FILE,active_index);
if (error) {
goto cleanup;
}
if (!key_read && active_index != primary_key && !(table->key_info[active_index].flags & HA_CLUSTERING)) {
error = read_full_row(buf);
if (error) { if (error) {
goto cleanup; goto cleanup;
} }
}
// //
// now do the comparison // now do the comparison
// //
pack_key(&curr_key, active_index, key_buff2, key, keylen, COL_ZERO);
create_dbt_key_from_table(&found_key,active_index,key_buff3,buf,&has_null); create_dbt_key_from_table(&found_key,active_index,key_buff3,buf,&has_null);
cmp = tokudb_prefix_cmp_dbt_key(share->key_file[active_index], &curr_key, &found_key); cmp = tokudb_prefix_cmp_dbt_key(share->key_file[active_index], &curr_key, &found_key);
if (cmp) { if (cmp) {
error = HA_ERR_END_OF_FILE; error = HA_ERR_END_OF_FILE;
} }
trx->stmt_progress.queried++;
track_progress(thd); cleanup:
cleanup:
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(error);
} }
//
@@ -4656,6 +4666,7 @@ int ha_tokudb::index_next_same(uchar * buf, const uchar * key, uint keylen) {
//
int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_rkey_function find_flag) {
TOKUDB_DBUG_ENTER("ha_tokudb::index_read %p find %d", this, find_flag);
invalidate_bulk_fetch();
// TOKUDB_DBUG_DUMP("key=", key, key_len);
DBT row;
DBT lookup_key;
@@ -4768,27 +4779,358 @@ cleanup:
}
int ha_tokudb::read_data_from_range_query_buff(uchar* buf, bool need_val) {
// buffer has the next row, get it from there
int error;
uchar* curr_pos = range_query_buff+curr_range_query_buff_offset;
DBT curr_key;
bzero((void *) &curr_key, sizeof(curr_key));
-//
-// Reads the next row from the active index (cursor) into buf, and advances cursor
-// Parameters:
-// [out] buf - buffer for the next row, in MySQL format
-// Returns:
-// 0 on success
-// HA_ERR_END_OF_FILE if not found
-// error otherwise
-//
-int ha_tokudb::index_next(uchar * buf) {
-TOKUDB_DBUG_ENTER("ha_tokudb::index_next");
// get key info
u_int32_t key_size = *(u_int32_t *)curr_pos;
curr_pos += sizeof(key_size);
uchar* curr_key_buff = curr_pos;
curr_pos += key_size;
curr_key.data = curr_key_buff;
curr_key.size = key_size;
// if this is a covering index, this is all we need
if (this->key_read) {
assert(!need_val);
read_key_only(buf, active_index, &curr_key);
error = 0;
}
// we need to get more data
else {
DBT curr_val;
bzero((void *) &curr_val, sizeof(curr_val));
uchar* curr_val_buff = NULL;
u_int32_t val_size = 0;
// in this case, we don't have a val, we are simply extracting the pk
if (!need_val) {
curr_val.data = curr_val_buff;
curr_val.size = val_size;
extract_hidden_primary_key(active_index, &curr_key);
error = read_primary_key( buf, active_index, &curr_val, &curr_key);
}
else {
// need to extract a val and place it into buf
if (unpack_entire_row) {
// get val info
val_size = *(u_int32_t *)curr_pos;
curr_pos += sizeof(val_size);
curr_val_buff = curr_pos;
curr_pos += val_size;
curr_val.data = curr_val_buff;
curr_val.size = val_size;
extract_hidden_primary_key(active_index, &curr_key);
error = unpack_row(buf,&curr_val, &curr_key, active_index);
}
else {
if (!(hidden_primary_key && active_index == primary_key)) {
unpack_key(buf,&curr_key,active_index);
}
// read rows we care about
// first the null bytes;
memcpy(buf, curr_pos, table_share->null_bytes);
curr_pos += table_share->null_bytes;
// now the fixed sized rows
for (u_int32_t i = 0; i < num_fixed_cols_for_query; i++) {
uint field_index = fixed_cols_for_query[i];
Field* field = table->field[field_index];
unpack_fixed_field(
buf + field_offset(field, table),
curr_pos,
share->kc_info.field_lengths[field_index]
);
curr_pos += share->kc_info.field_lengths[field_index];
}
// now the variable sized rows
for (u_int32_t i = 0; i < num_var_cols_for_query; i++) {
uint field_index = var_cols_for_query[i];
Field* field = table->field[field_index];
u_int32_t field_len = *(u_int32_t *)curr_pos;
curr_pos += sizeof(field_len);
unpack_var_field(
buf + field_offset(field, table),
curr_pos,
field_len,
share->kc_info.length_bytes[field_index]
);
curr_pos += field_len;
}
// now the blobs
if (read_blobs) {
u_int32_t blob_size = *(u_int32_t *)curr_pos;
curr_pos += sizeof(blob_size);
error = unpack_blobs(
buf,
curr_pos,
blob_size,
true
);
curr_pos += blob_size;
if (error) {
invalidate_bulk_fetch();
goto exit;
}
}
error = 0;
}
}
}
curr_range_query_buff_offset = curr_pos - range_query_buff;
exit:
return error;
}
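Aside, not part of the diff: read_data_from_range_query_buff() only ever consumes what a previous cursor call buffered, so its caller has to choose between draining and refilling. A condensed, hypothetical restatement of that drain-or-refill pattern, which get_next() below implements against the real handler state; the names here are simplified stand-ins.

// Illustrative only: simplified state and callables, not the handler's API.
struct bulk_fetch_state {
    size_t bytes_used;    // plays the role of bytes_used_in_range_query_buff
    size_t read_offset;   // plays the role of curr_range_query_buff_offset
};

template <typename RefillFn, typename DecodeFn>
int next_buffered_row(bulk_fetch_state &s, RefillFn refill, DecodeFn decode) {
    if (s.read_offset >= s.bytes_used) {
        // buffer exhausted: reset it and let one cursor call repack many rows
        s.bytes_used = 0;
        s.read_offset = 0;
        int error = refill();
        if (error) return error;
    }
    // decode one row out of the buffer (read_data_from_range_query_buff)
    return decode();
}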
static int
smart_dbt_bf_callback(DBT const *key, DBT const *row, void *context) {
SMART_DBT_BF_INFO info = (SMART_DBT_BF_INFO)context;
return info->ha->fill_range_query_buf(info->need_val, key, row, info->direction, info->thd);
}
// fill in the range query buf for bulk fetch
int ha_tokudb::fill_range_query_buf(
bool need_val,
DBT const *key,
DBT const *row,
int direction,
THD* thd
) {
int error;
//
// first put the value into range_query_buf
//
u_int32_t size_remaining = size_range_query_buff - bytes_used_in_range_query_buff;
u_int32_t size_needed;
u_int32_t user_defined_size = get_tokudb_read_buf_size(thd);
uchar* curr_pos = NULL;
if (need_val) {
if (unpack_entire_row) {
size_needed = 2*sizeof(u_int32_t) + key->size + row->size;
}
else {
// this is an upper bound
size_needed = sizeof(u_int32_t) + // size of key length
key->size + row->size + //key and row
num_var_cols_for_query*(sizeof(u_int32_t)) + //lengths of varchars stored
sizeof(u_int32_t); //length of blobs
}
}
else {
size_needed = sizeof(u_int32_t) + key->size;
}
if (size_remaining < size_needed) {
range_query_buff = (uchar *)my_realloc(
(void *)range_query_buff,
bytes_used_in_range_query_buff+size_needed,
MYF(MY_WME)
);
if (range_query_buff == NULL) {
error = ENOMEM;
goto cleanup;
}
}
//
// now we know we have the size, let's fill the buffer, starting with the key
//
curr_pos = range_query_buff + bytes_used_in_range_query_buff;
*(u_int32_t *)curr_pos = key->size;
curr_pos += sizeof(u_int32_t);
memcpy(curr_pos, key->data, key->size);
curr_pos += key->size;
if (need_val) {
if (unpack_entire_row) {
*(u_int32_t *)curr_pos = row->size;
curr_pos += sizeof(u_int32_t);
memcpy(curr_pos, row->data, row->size);
curr_pos += row->size;
}
else {
// need to unpack just the data we care about
const uchar* fixed_field_ptr = (const uchar *) row->data;
fixed_field_ptr += table_share->null_bytes;
const uchar* var_field_offset_ptr = NULL;
const uchar* var_field_data_ptr = NULL;
var_field_offset_ptr = fixed_field_ptr + share->kc_info.mcp_info[active_index].fixed_field_size;
var_field_data_ptr = var_field_offset_ptr + share->kc_info.mcp_info[active_index].len_of_offsets;
// first the null bytes
memcpy(curr_pos, row->data, table_share->null_bytes);
curr_pos += table_share->null_bytes;
// now the fixed fields
//
// first the fixed fields
//
for (u_int32_t i = 0; i < num_fixed_cols_for_query; i++) {
uint field_index = fixed_cols_for_query[i];
memcpy(
curr_pos,
fixed_field_ptr + share->kc_info.cp_info[active_index][field_index].col_pack_val,
share->kc_info.field_lengths[field_index]
);
curr_pos += share->kc_info.field_lengths[field_index];
}
//
// now the var fields
//
for (u_int32_t i = 0; i < num_var_cols_for_query; i++) {
uint field_index = var_cols_for_query[i];
u_int32_t var_field_index = share->kc_info.cp_info[active_index][field_index].col_pack_val;
u_int32_t data_start_offset;
u_int32_t field_len;
get_var_field_info(
&field_len,
&data_start_offset,
var_field_index,
var_field_offset_ptr,
share->kc_info.num_offset_bytes
);
memcpy(curr_pos, &field_len, sizeof(field_len));
curr_pos += sizeof(field_len);
memcpy(curr_pos, var_field_data_ptr + data_start_offset, field_len);
curr_pos += field_len;
}
if (read_blobs) {
u_int32_t blob_offset = 0;
u_int32_t data_size = 0;
//
// now the blobs
//
get_blob_field_info(
&blob_offset,
share->kc_info.mcp_info[active_index].len_of_offsets,
var_field_data_ptr,
share->kc_info.num_offset_bytes
);
data_size = row->size - (u_int32_t)(var_field_data_ptr - (const uchar *)row->data);
memcpy(curr_pos, &data_size, sizeof(data_size));
curr_pos += sizeof(data_size);
memcpy(curr_pos, var_field_data_ptr + blob_offset, data_size);
curr_pos += data_size;
}
}
}
bytes_used_in_range_query_buff = curr_pos - range_query_buff;
assert(bytes_used_in_range_query_buff <= size_range_query_buff);
//
// now determine if we should continue with the bulk fetch
// we want to stop under these conditions:
// - we overran the prelocked range
// - we are close to the end of the buffer
//
if (bytes_used_in_range_query_buff + table_share->rec_buff_length > user_defined_size) {
error = 0;
goto cleanup;
}
if (direction > 0) {
// compare what we got to the right endpoint of prelocked range
// because we are searching keys in ascending order
if (prelocked_right_range_size == 0) {
error = TOKUDB_CURSOR_CONTINUE;
goto cleanup;
}
DBT right_range;
bzero(&right_range, sizeof(right_range));
right_range.size = prelocked_right_range_size;
right_range.data = prelocked_right_range;
int cmp = tokudb_cmp_dbt_key(
share->key_file[active_index],
key,
&right_range
);
error = (cmp > 0) ? 0 : TOKUDB_CURSOR_CONTINUE;
}
else {
// compare what we got to the left endpoint of prelocked range
// because we are searching keys in descending order
if (prelocked_left_range_size == 0) {
error = TOKUDB_CURSOR_CONTINUE;
goto cleanup;
}
DBT left_range;
bzero(&left_range, sizeof(left_range));
left_range.size = prelocked_left_range_size;
left_range.data = prelocked_left_range;
int cmp = tokudb_cmp_dbt_key(
share->key_file[active_index],
key,
&left_range
);
error = (cmp < 0) ? 0 : TOKUDB_CURSOR_CONTINUE;
}
cleanup:
return error;
}
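Aside, not part of the diff: the return value of fill_range_query_buf() doubles as the cursor's stop/continue signal. TOKUDB_CURSOR_CONTINUE asks c_getf_next/c_getf_prev to hand the callback another row, while 0 ends the bulk fetch once the buffer is nearly full or the key has left the prelocked range. Below is a standalone restatement of that decision, assuming the comparison semantics shown above; the constant's numeric value and the helper name are placeholders, not the engine's definitions.

// Placeholder value; the engine defines the real TOKUDB_CURSOR_CONTINUE.
static const int CURSOR_CONTINUE = 2;

// direction > 0: ascending scan, stop once the key passes the right bound (cmp > 0).
// direction < 0: descending scan, stop once the key passes the left bound (cmp < 0).
static int should_continue_bulk_fetch(bool buffer_nearly_full,
                                      bool range_bound_is_infinite,
                                      int cmp_key_vs_bound,
                                      int direction) {
    if (buffer_nearly_full) return 0;                      // flush what we have
    if (range_bound_is_infinite) return CURSOR_CONTINUE;   // prelocked to +/- infinity
    bool past_bound = (direction > 0) ? (cmp_key_vs_bound > 0)
                                      : (cmp_key_vs_bound < 0);
    return past_bound ? 0 : CURSOR_CONTINUE;
}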
int ha_tokudb::get_next(uchar* buf, int direction) {
int error = 0;
-struct smart_dbt_info info;
u_int32_t flags = SET_PRELOCK_FLAG(0);
THD* thd = ha_thd();
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);;
bool need_val;
HANDLE_INVALID_CURSOR();
-statistic_increment(table->in_use->status_var.ha_read_next_count, &LOCK_status);
// we need to read the val of what we retrieve if
// we do NOT have a covering index AND we are using a clustering secondary
// key
need_val = (this->key_read == 0) &&
(active_index == primary_key ||
table->key_info[active_index].flags & HA_CLUSTERING
);
if ((bytes_used_in_range_query_buff - curr_range_query_buff_offset) > 0) {
error = read_data_from_range_query_buff(buf, need_val);
}
else {
invalidate_bulk_fetch();
if (doing_bulk_fetch) {
struct smart_dbt_bf_info bf_info;
bf_info.ha = this;
// you need the val if you have a clustering index and key_read is not 0;
bf_info.direction = direction;
bf_info.thd = ha_thd();
bf_info.need_val = need_val;
//
// call c_getf_next with purpose of filling in range_query_buff
//
if (direction > 0) {
lockretryN(read_lock_wait_time){
error = cursor->c_getf_next(cursor, flags, smart_dbt_bf_callback, &bf_info);
lockretry_wait;
}
}
else {
lockretryN(read_lock_wait_time){
error = cursor->c_getf_prev(cursor, flags, smart_dbt_bf_callback, &bf_info);
lockretry_wait;
}
}
error = handle_cursor_error(error, HA_ERR_END_OF_FILE,active_index);
if (error) { goto cleanup; }
//
// now that range_query_buff is filled, read an element
//
error = read_data_from_range_query_buff(buf, need_val);
}
else {
struct smart_dbt_info info;
info.ha = this;
info.buf = buf;
info.keynr = active_index;
@@ -4798,16 +5140,41 @@ int ha_tokudb::index_next(uchar * buf) {
lockretry_wait;
}
error = handle_cursor_error(error, HA_ERR_END_OF_FILE,active_index);
}
}
//
-// still need to get entire contents of the row if operation done on
-// secondary DB and it was NOT a covering index
// at this point, one of two things has happened
// either we have unpacked the data into buf, and we
// are done, or we have unpacked the primary key
// into last_key, and we use the code below to
// read the full row by doing a point query into the
// main table.
//
if (!error && !key_read && (active_index != primary_key) && !(table->key_info[active_index].flags & HA_CLUSTERING) ) {
error = read_full_row(buf);
}
trx->stmt_progress.queried++;
track_progress(thd);
cleanup:
return error;
}
//
// Reads the next row from the active index (cursor) into buf, and advances cursor
// Parameters:
// [out] buf - buffer for the next row, in MySQL format
// Returns:
// 0 on success
// HA_ERR_END_OF_FILE if not found
// error otherwise
//
int ha_tokudb::index_next(uchar * buf) {
TOKUDB_DBUG_ENTER("ha_tokudb::index_next");
statistic_increment(table->in_use->status_var.ha_read_next_count, &LOCK_status);
int error = get_next(buf, 1);
TOKUDB_DBUG_RETURN(error);
}
@@ -4827,37 +5194,9 @@ int ha_tokudb::index_read_last(uchar * buf, const uchar * key, uint key_len) {
// error otherwise
//
int ha_tokudb::index_prev(uchar * buf) {
-TOKUDB_DBUG_ENTER("ha_tokudb::index_next");
TOKUDB_DBUG_ENTER("ha_tokudb::index_prev");
-int error = 0;
-struct smart_dbt_info info;
-u_int32_t flags = SET_PRELOCK_FLAG(0);
-THD* thd = ha_thd();
-tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);;
-HANDLE_INVALID_CURSOR();
statistic_increment(table->in_use->status_var.ha_read_next_count, &LOCK_status);
int error = get_next(buf, -1);
-info.ha = this;
-info.buf = buf;
-info.keynr = active_index;
-lockretryN(read_lock_wait_time){
-error = cursor->c_getf_prev(cursor, flags, SMART_DBT_CALLBACK, &info);
-lockretry_wait;
-}
-error = handle_cursor_error(error,HA_ERR_END_OF_FILE,active_index);
-//
-// still need to get entire contents of the row if operation done on
-// secondary DB and it was NOT a covering index
-//
-if (!error && !key_read && (active_index != primary_key) && !(table->key_info[active_index].flags & HA_CLUSTERING) ) {
-error = read_full_row(buf);
-}
-trx->stmt_progress.queried++;
-track_progress(thd);
-cleanup:
TOKUDB_DBUG_RETURN(error);
}
@@ -4872,6 +5211,7 @@ cleanup:
//
int ha_tokudb::index_first(uchar * buf) {
TOKUDB_DBUG_ENTER("ha_tokudb::index_first");
invalidate_bulk_fetch();
int error = 0;
struct smart_dbt_info info;
u_int32_t flags = SET_PRELOCK_FLAG(0);
@@ -4916,6 +5256,7 @@ cleanup:
//
int ha_tokudb::index_last(uchar * buf) {
TOKUDB_DBUG_ENTER("ha_tokudb::index_last");
invalidate_bulk_fetch();
int error = 0;
struct smart_dbt_info info;
u_int32_t flags = SET_PRELOCK_FLAG(0);
@@ -4967,11 +5308,7 @@ int ha_tokudb::rnd_init(bool scan) {
if (error) { goto cleanup;}
if (scan) {
-DB* db = share->key_file[primary_key];
-lockretryN(read_lock_wait_time){
-error = cursor->c_pre_acquire_range_lock(cursor, db->dbt_neg_infty(), db->dbt_pos_infty());
-lockretry_wait;
-}
error = prelock_range(NULL, NULL);
if (error) { goto cleanup; }
}
//
@@ -5011,33 +5348,8 @@ int ha_tokudb::rnd_end() {
//
int ha_tokudb::rnd_next(uchar * buf) {
TOKUDB_DBUG_ENTER("ha_tokudb::ha_tokudb::rnd_next");
-int error = 0;
-u_int32_t flags = SET_PRELOCK_FLAG(0);
-THD* thd = ha_thd();
-tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);;
-struct smart_dbt_info info;
-HANDLE_INVALID_CURSOR();
-//
-// The reason we do not just call index_next is that index_next
-// increments a different variable than we do here
-//
statistic_increment(table->in_use->status_var.ha_read_rnd_next_count, &LOCK_status);
int error = get_next(buf, 1);
-info.ha = this;
-info.buf = buf;
-info.keynr = primary_key;
-lockretryN(read_lock_wait_time){
-error = cursor->c_getf_next(cursor, flags, SMART_DBT_CALLBACK, &info);
-lockretry_wait;
-}
-error = handle_cursor_error(error, HA_ERR_END_OF_FILE,primary_key);
-trx->stmt_progress.queried++;
-track_progress(thd);
-cleanup:
TOKUDB_DBUG_RETURN(error);
}
@@ -5139,12 +5451,13 @@ cleanup:
int ha_tokudb::prelock_range( const key_range *start_key, const key_range *end_key) {
TOKUDB_DBUG_ENTER("ha_tokudb::prelock_range");
THD* thd = ha_thd();
int error = 0;
DBT start_dbt_key;
DBT end_dbt_key;
-uchar* start_key_buff = key_buff2;
-uchar* end_key_buff = key_buff3;
uchar* start_key_buff = prelocked_left_range;
uchar* end_key_buff = prelocked_right_range;
bzero((void *) &start_dbt_key, sizeof(start_dbt_key));
bzero((void *) &end_dbt_key, sizeof(end_dbt_key));
@@ -5159,6 +5472,10 @@ int ha_tokudb::prelock_range( const key_range *start_key, const key_range *end_k
pack_key(&start_dbt_key, active_index, start_key_buff, start_key->key, start_key->length, COL_NEG_INF);
break;
}
prelocked_left_range_size = start_dbt_key.size;
}
else {
prelocked_left_range_size = 0;
}
if (end_key) {
@@ -5170,6 +5487,10 @@ int ha_tokudb::prelock_range( const key_range *start_key, const key_range *end_k
pack_key(&end_dbt_key, active_index, end_key_buff, end_key->key, end_key->length, COL_POS_INF);
break;
}
prelocked_right_range_size = end_dbt_key.size;
}
else {
prelocked_right_range_size = 0;
}
lockretryN(read_lock_wait_time){
@@ -5193,6 +5514,12 @@ int ha_tokudb::prelock_range( const key_range *start_key, const key_range *end_k
goto cleanup;
}
//
// at this point, determine if we will be doing bulk fetch
// as of now, only do it if we are doing a select
//
doing_bulk_fetch = (thd_sql_command(thd) == SQLCOM_SELECT);
cleanup:
TOKUDB_DBUG_RETURN(error);
}
......
@@ -173,6 +173,13 @@ private:
uchar *rec_update_buff;
ulong alloced_update_rec_buff_length;
u_int32_t max_key_length;
uchar* range_query_buff; // range query buffer
u_int32_t size_range_query_buff; // size of the allocated range query buffer
u_int32_t bytes_used_in_range_query_buff; // number of bytes used in the range query buffer
u_int32_t curr_range_query_buff_offset; // current offset into the range query buffer for queries to read
bool doing_bulk_fetch;
//
// buffer used to temporarily store a "packed key"
// data pointer of a DBT will end up pointing to this
@@ -195,6 +202,15 @@ private:
//
uchar *primary_key_buff;
//
// ranges of prelocked area, used to know how much to bulk fetch
//
uchar *prelocked_left_range;
u_int32_t prelocked_left_range_size;
uchar *prelocked_right_range;
u_int32_t prelocked_right_range_size;
//
// individual key buffer for each index
//
@@ -609,10 +625,21 @@ public:
int check(THD *thd, HA_CHECK_OPT *check_opt);
int fill_range_query_buf(
bool need_val,
DBT const *key,
DBT const *row,
int direction,
THD* thd
);
private:
int read_full_row(uchar * buf);
int __close(int mutex_is_locked);
int read_last(uint keynr);
int get_next(uchar* buf, int direction);
int read_data_from_range_query_buff(uchar* buf, bool need_val);
void invalidate_bulk_fetch();
};
int open_status_dictionary(DB** ptr, const char* name, DB_TXN* txn);
......
@@ -139,6 +139,18 @@ static MYSQL_THDVAR_UINT(read_block_size,
1 // blocksize???
);
static MYSQL_THDVAR_UINT(read_buf_size,
0,
"fractal tree read block size",
NULL,
NULL,
128*1024, // default
4096, // min
1*1024*1024, // max
1 // blocksize???
);
void tokudb_checkpoint_lock(THD * thd);
void tokudb_checkpoint_unlock(THD * thd);
@@ -609,6 +621,10 @@ uint get_tokudb_read_block_size(THD* thd) {
return THDVAR(thd, read_block_size);
}
uint get_tokudb_read_buf_size(THD* thd) {
return THDVAR(thd, read_buf_size);
}
typedef struct txn_progress_info {
char status[200];
THD* thd;
@@ -1576,6 +1592,7 @@ static struct st_mysql_sys_var *tokudb_system_variables[] = {
MYSQL_SYSVAR(tmp_dir),
MYSQL_SYSVAR(block_size),
MYSQL_SYSVAR(read_block_size),
MYSQL_SYSVAR(read_buf_size),
NULL
};
......
@@ -20,6 +20,7 @@ bool get_create_index_online(THD* thd);
bool get_prelock_empty(THD* thd);
uint get_tokudb_block_size(THD* thd);
uint get_tokudb_read_block_size(THD* thd);
uint get_tokudb_read_buf_size(THD* thd);
extern HASH tokudb_open_tables;
extern pthread_mutex_t tokudb_mutex;
......