Commit 54587c20 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

addresses #1158

add 'infinity byte' to beginning of keys

git-svn-id: file:///svn/mysql/tokudb-engine/src@6162 c7de825b-a66e-492c-adef-691d508d4ae1
parent d601c582
......@@ -800,14 +800,23 @@ cleanup:
}
static int tokudb_compare_two_keys(KEY *key, const DBT * new_key, const DBT * saved_key, bool cmp_prefix) {
uchar *new_key_ptr = (uchar *) new_key->data;
uchar *saved_key_ptr = (uchar *) saved_key->data;
uchar new_key_inf_val = *(uchar *) new_key->data;
uchar saved_key_inf_val = *(uchar *) saved_key->data;
//
// first byte is "infinity" byte
//
uchar *new_key_ptr = (uchar *)(new_key->data) + 1;
uchar *saved_key_ptr = (uchar *)(saved_key->data) + 1;
KEY_PART_INFO *key_part = key->key_part, *end = key_part + key->key_parts;
uint key_length = new_key->size;
uint saved_key_length = saved_key->size;
int ret_val;
//
// do not include the inf val at the beginning
//
uint new_key_length = new_key->size - sizeof(uchar);
uint saved_key_length = saved_key->size - sizeof(uchar);
//DBUG_DUMP("key_in_index", saved_key_ptr, saved_key->size);
for (; key_part != end && (int) key_length > 0 && (int) saved_key_length > 0; key_part++) {
for (; key_part != end && (int) new_key_length > 0 && (int) saved_key_length > 0; key_part++) {
int cmp;
uint new_key_field_length;
uint saved_key_field_length;
......@@ -817,22 +826,40 @@ static int tokudb_compare_two_keys(KEY *key, const DBT * new_key, const DBT * sa
if (*new_key_ptr != *saved_key_ptr) {
return ((int) *new_key_ptr - (int) *saved_key_ptr); }
saved_key_ptr++;
key_length--;
new_key_length--;
saved_key_length--;
if (!*new_key_ptr++) { continue; }
}
new_key_field_length = key_part->field->packed_col_length(new_key_ptr, key_part->length);
saved_key_field_length = key_part->field->packed_col_length(saved_key_ptr, key_part->length);
assert( key_length >= new_key_field_length);
assert(new_key_length >= new_key_field_length);
assert(saved_key_length >= saved_key_field_length);
if ((cmp = key_part->field->pack_cmp(new_key_ptr, saved_key_ptr, key_part->length, 0)))
return cmp;
new_key_ptr += new_key_field_length;
key_length -= new_key_field_length;
new_key_length -= new_key_field_length;
saved_key_ptr += saved_key_field_length;
saved_key_length -= saved_key_field_length;
}
return cmp_prefix ? 0 : key_length - saved_key_length;
if (cmp_prefix || (new_key_length == 0 && saved_key_length == 0) ) {
ret_val = 0;
}
//
// at this point, one SHOULD be 0
//
else if (new_key_length == 0 && saved_key_length > 0) {
ret_val = (new_key_inf_val == COL_POS_INF ) ? 1 : -1;
}
else if (new_key_length > 0 && saved_key_length == 0) {
ret_val = (saved_key_inf_val == COL_POS_INF ) ? -1 : 1;
}
//
// this should never happen, perhaps we should assert(false)
//
else {
ret_val = new_key_length - saved_key_length;
}
return ret_val;
}
static int tokudb_cmp_packed_key(DB *file, const DBT *keya, const DBT *keyb) {
......@@ -1158,7 +1185,8 @@ int ha_tokudb::open(const char *name, int mode, uint test_if_locked) {
key_used_on_scan = primary_key;
/* Need some extra memory in case of packed keys */
max_key_length = table_share->max_key_length + MAX_REF_PARTS * 3;
// the "+ 1" is for the first byte that states +/- infinity
max_key_length = table_share->max_key_length + MAX_REF_PARTS * 3 + sizeof(uchar);
if (!(alloc_ptr =
my_multi_malloc(MYF(MY_WME),
&key_buff, max_key_length,
......@@ -1273,7 +1301,12 @@ int ha_tokudb::open(const char *name, int mode, uint test_if_locked) {
/* Calculate pack_length of primary key */
share->fixed_length_primary_key = 1;
if (!hidden_primary_key) {
ref_length = 0;
//
// I realize this is incredibly confusing, and refactoring should take
// care of this, but we need to set the ref_length to start at 1, to account for
// the "infinity byte" in keys.
//
ref_length = sizeof(uchar);
KEY_PART_INFO *key_part = table->key_info[primary_key].key_part;
KEY_PART_INFO *end = key_part + table->key_info[primary_key].key_parts;
for (; key_part != end; key_part++)
......@@ -1284,7 +1317,7 @@ int ha_tokudb::open(const char *name, int mode, uint test_if_locked) {
share->ref_length = ref_length;
error = get_status();
if (error) {
if (error || share->version < HA_TOKU_VERSION) {
__close(1);
TOKUDB_DBUG_RETURN(1);
}
......@@ -1761,7 +1794,7 @@ void ha_tokudb::unpack_row(uchar * record, DBT const *row, DBT const *key) {
void ha_tokudb::unpack_key(uchar * record, DBT const *key, uint index) {
KEY *key_info = table->key_info + index;
KEY_PART_INFO *key_part = key_info->key_part, *end = key_part + key_info->key_parts;
uchar *pos = (uchar *) key->data;
uchar *pos = (uchar *) key->data + 1;
for (; key_part != end; key_part++) {
if (key_part->null_bit) {
......@@ -1808,6 +1841,15 @@ DBT* ha_tokudb::create_dbt_key_from_key(DBT * key, KEY* key_info, uchar * buff,
my_bitmap_map *old_map = dbug_tmp_use_all_columns(table, table->write_set);
key->data = buff;
//
// first put the "infinity" byte at beginning. States if missing columns are implicitly
// positive infinity or negative infinity. For this, because we are creating key
// from a row, there is no way that columns can be missing, so in practice,
// this will be meaningless. Might as well put in a value
//
*buff++ = COL_NEG_INF;
*has_null = false;
for (; key_part != end && key_length > 0; key_part++) {
//
......@@ -1820,7 +1862,7 @@ DBT* ha_tokudb::create_dbt_key_from_key(DBT * key, KEY* key_info, uchar * buff,
uint null_offset = (uint) ((char*) key_part->field->null_ptr
- (char*) table->record[0]);
if (record[null_offset] & key_part->field->null_bit) {
*buff++ = 0;
*buff++ = NULL_COL_VAL;
*has_null = true;
//
// fractal tree does not handle this falg at the moment
......@@ -1829,7 +1871,7 @@ DBT* ha_tokudb::create_dbt_key_from_key(DBT * key, KEY* key_info, uchar * buff,
//key->flags |= DB_DBT_DUPOK;
continue;
}
*buff++ = 1; // Store NOT NULL marker
*buff++ = NONNULL_COL_VAL; // Store NOT NULL marker
}
//
// accessing field_offset(key_part->field) instead off key_part->offset
......@@ -1860,6 +1902,7 @@ DBT* ha_tokudb::create_dbt_key_from_key(DBT * key, KEY* key_info, uchar * buff,
// [out] buff - buffer that will hold the data for key (unless
// we have a hidden primary key)
// [in] record - row from which to create the key
// [out] has_null - says if the key has a NULL value for one of its columns
// key_length - currently set to MAX_KEY_LENGTH, is it size of buff?
// Returns:
// the parameter key
......@@ -1889,7 +1932,7 @@ DBT *ha_tokudb::create_dbt_key_from_table(DBT * key, uint keynr, uchar * buff, c
// Returns:
// the parameter key
//
DBT *ha_tokudb::pack_key(DBT * key, uint keynr, uchar * buff, const uchar * key_ptr, uint key_length) {
DBT *ha_tokudb::pack_key(DBT * key, uint keynr, uchar * buff, const uchar * key_ptr, uint key_length, uchar inf_byte) {
TOKUDB_DBUG_ENTER("ha_tokudb::pack_key");
KEY *key_info = table->key_info + keynr;
KEY_PART_INFO *key_part = key_info->key_part;
......@@ -1899,20 +1942,22 @@ DBT *ha_tokudb::pack_key(DBT * key, uint keynr, uchar * buff, const uchar * key_
bzero((void *) key, sizeof(*key));
key->data = buff;
//
// first put the "infinity" byte at beginning. States if missing columns are implicitly
// positive infinity or negative infinity
//
*buff++ = inf_byte;
for (; key_part != end && (int) key_length > 0; key_part++) {
uint offset = 0;
if (key_part->null_bit) {
if (!(*buff++ = (*key_ptr == 0))) // Store 0 if NULL
{
if (!(*key_ptr == 0)) {
*buff++ = NULL_COL_VAL;
key_length -= key_part->store_length;
key_ptr += key_part->store_length;
//
// fractal tree does not handle this falg at the moment
// so commenting out for now
//
//key->flags |= DB_DBT_DUPOK;
continue;
}
*buff++ = NONNULL_COL_VAL;
offset = 1; // Data is at key_ptr+1
}
buff = key_part->field->pack_key_from_key_image(buff, (uchar *) key_ptr + offset,
......@@ -2830,7 +2875,7 @@ int ha_tokudb::index_read_idx(uchar * buf, uint keynr, const uchar * key, uint k
current_row.flags = DB_DBT_REALLOC;
active_index = MAX_KEY;
error = share->key_file[keynr]->get(share->key_file[keynr], transaction, pack_key(&last_key, keynr, key_buff, key, key_len), &current_row, 0);
error = share->key_file[keynr]->get(share->key_file[keynr], transaction, pack_key(&last_key, keynr, key_buff, key, key_len, COL_NEG_INF), &current_row, 0);
if (error == DB_NOTFOUND || error == DB_KEYEMPTY) {
error = HA_ERR_KEY_NOT_FOUND;
goto cleanup;
......@@ -2971,7 +3016,7 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_
table->in_use->status_var.ha_read_key_count++;
bzero((void *) &row, sizeof(row));
pack_key(&last_key, active_index, key_buff, key, key_len);
pack_key(&last_key, active_index, key_buff, key, key_len, COL_NEG_INF);
info.ha = this;
info.buf = buf;
......@@ -2984,7 +3029,7 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_
error = cursor->c_get(cursor, &last_key, &row, DB_SET_RANGE);
if (error == 0) {
DBT orig_key;
pack_key(&orig_key, active_index, key_buff2, key, key_len);
pack_key(&orig_key, active_index, key_buff2, key, key_len, COL_NEG_INF);
if (tokudb_prefix_cmp_packed_key(share->key_file[active_index], &orig_key, &last_key))
error = DB_NOTFOUND;
}
......@@ -3014,7 +3059,7 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_
error = cursor->c_get(cursor, &last_key, &row, DB_SET_RANGE);
if (error == 0) {
DBT orig_key;
pack_key(&orig_key, active_index, key_buff2, key, key_len);
pack_key(&orig_key, active_index, key_buff2, key, key_len, COL_NEG_INF);
if (tokudb_prefix_cmp_packed_key(share->key_file[active_index], &orig_key, &last_key) != 0)
error = cursor->c_get(cursor, &last_key, &row, DB_PREV);
}
......@@ -3294,7 +3339,11 @@ DBT *ha_tokudb::get_pos(DBT * to, uchar * pos) {
/* We don't need to set app_data here */
bzero((void *) to, sizeof(*to));
//
// this should really be done through pack_key functions
//
to->data = pos;
*pos++ = COL_NEG_INF;
if (share->fixed_length_primary_key)
to->size = ref_length;
else {
......@@ -3349,20 +3398,21 @@ int ha_tokudb::read_range_first(
const DBT* start_dbt_data = NULL;
DBT end_dbt_key;
const DBT* end_dbt_data = NULL;
uchar start_key_buff [table_share->max_key_length + MAX_REF_PARTS * 3];
uchar end_key_buff [table_share->max_key_length + MAX_REF_PARTS * 3];
uchar start_key_buff [table_share->max_key_length + MAX_REF_PARTS * 3 + sizeof(uchar)];
uchar end_key_buff [table_share->max_key_length + MAX_REF_PARTS * 3 + sizeof(uchar)];
bzero((void *) &start_dbt_key, sizeof(start_dbt_key));
bzero((void *) &end_dbt_key, sizeof(end_dbt_key));
range_lock_grabbed = false;
if (start_key) {
pack_key(&start_dbt_key, active_index, start_key_buff, start_key->key, start_key->length);
switch (start_key->flag) {
case HA_READ_AFTER_KEY:
pack_key(&start_dbt_key, active_index, start_key_buff, start_key->key, start_key->length, COL_POS_INF);
start_dbt_data = share->key_file[active_index]->dbt_pos_infty();
break;
default:
pack_key(&start_dbt_key, active_index, start_key_buff, start_key->key, start_key->length, COL_NEG_INF);
start_dbt_data = share->key_file[active_index]->dbt_neg_infty();
break;
}
......@@ -3372,12 +3422,13 @@ int ha_tokudb::read_range_first(
}
if (end_key) {
pack_key(&end_dbt_key, active_index, end_key_buff, end_key->key, end_key->length);
switch (end_key->flag) {
case HA_READ_BEFORE_KEY:
pack_key(&end_dbt_key, active_index, end_key_buff, end_key->key, end_key->length, COL_NEG_INF);
end_dbt_data = share->key_file[active_index]->dbt_neg_infty();
break;
default:
pack_key(&end_dbt_key, active_index, end_key_buff, end_key->key, end_key->length, COL_POS_INF);
end_dbt_data = share->key_file[active_index]->dbt_pos_infty();
break;
}
......@@ -4178,7 +4229,7 @@ ha_rows ha_tokudb::records_in_range(uint keynr, key_range* start_key, key_range*
// So, we call key_range64 on the key, and the key that is after it.
//
if (start_key) {
pack_key(&key, keynr, key_buff, start_key->key, start_key->length);
pack_key(&key, keynr, key_buff, start_key->key, start_key->length, COL_NEG_INF);
error = kfile->key_range64(
kfile,
transaction,
......@@ -4235,7 +4286,7 @@ ha_rows ha_tokudb::records_in_range(uint keynr, key_range* start_key, key_range*
}
if (end_key) {
pack_key(&key, keynr, key_buff, end_key->key, end_key->length);
pack_key(&key, keynr, key_buff, end_key->key, end_key->length, COL_NEG_INF);
error = kfile->key_range64(
kfile,
transaction,
......
......@@ -38,7 +38,7 @@ typedef struct st_tokudb_share {
uint ai_field_index;
} TOKUDB_SHARE;
#define HA_TOKU_VERSION 1
#define HA_TOKU_VERSION 2
//
// no capabilities yet
//
......@@ -57,6 +57,19 @@ typedef enum {
hatoku_ai_create_value
} HA_METADATA_KEY ;
//
// for storing NULL byte in keys
//
#define NULL_COL_VAL 0
#define NONNULL_COL_VAL 1
//
// for storing if rest of key is +/- infinity
//
#define COL_NEG_INF 0
#define COL_POS_INF 1
typedef struct st_prim_key_part_info {
uint offset;
uint part_index;
......@@ -172,7 +185,7 @@ private:
void unpack_key(uchar * record, DBT const *key, uint index);
DBT* create_dbt_key_from_key(DBT * key, KEY* key_info, uchar * buff, const uchar * record, bool* has_null, int key_length = MAX_KEY_LENGTH);
DBT *create_dbt_key_from_table(DBT * key, uint keynr, uchar * buff, const uchar * record, bool* has_null, int key_length = MAX_KEY_LENGTH);
DBT *pack_key(DBT * key, uint keynr, uchar * buff, const uchar * key_ptr, uint key_length);
DBT *pack_key(DBT * key, uint keynr, uchar * buff, const uchar * key_ptr, uint key_length, uchar inf_byte);
int remove_key(DB_TXN * trans, uint keynr, const uchar * record, DBT * prim_key);
int remove_keys(DB_TXN * trans, const uchar * record, DBT * prim_key, key_map * keys);
int key_cmp(uint keynr, const uchar * old_row, const uchar * new_row);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment