Commit e32ce570 authored by Yoni Fogel's avatar Yoni Fogel

Addresses #648

Updated ha_tokudb::index_read and comparison functions based on code review.
Also put in some (very informal) comments about how index_read is supposed to work.

git-svn-id: file:///svn/mysql/tokudb-engine/src@3275 c7de825b-a66e-492c-adef-691d508d4ae1
parent 62a3b58b
......@@ -684,33 +684,34 @@ static int tokudb_cmp_packed_key(DB * file, const DBT * new_key, const DBT * sav
//DBUG_DUMP("key_in_index", saved_key_ptr, saved_key->size);
for (; key_part != end && (int) key_length > 0 && (int) saved_key_length > 0; key_part++) {
int cmp;
uint length;
uint new_key_field_length;
uint saved_key_field_length;
if (key_part->null_bit) {
assert(new_key_ptr < (uchar *) new_key->data + new_key->size);
assert(new_key_ptr < (uchar *) new_key->data + new_key->size);
assert(saved_key_ptr < (uchar *) saved_key->data + saved_key->size);
if (*new_key_ptr != *saved_key_ptr++)
return ((int) *new_key_ptr - (int) saved_key_ptr[-1]);
if (*new_key_ptr != *saved_key_ptr) {
return ((int) *new_key_ptr - (int) *saved_key_ptr); }
saved_key_ptr++;
key_length--;
saved_key_length--;
if (!*new_key_ptr++)
continue;
if (!*new_key_ptr++) { continue; }
}
new_key_field_length = key_part->field->packed_col_length(new_key_ptr, key_part->length);
saved_key_field_length = key_part->field->packed_col_length(saved_key_ptr, key_part->length);
assert( key_length >= new_key_field_length);
assert(saved_key_length >= saved_key_field_length);
if ((cmp = key_part->field->pack_cmp(new_key_ptr, saved_key_ptr, key_part->length, 0)))
return cmp;
length = key_part->field->packed_col_length(new_key_ptr, key_part->length);
new_key_ptr += length;
key_length -= length;
length = key_part->field->packed_col_length(saved_key_ptr, key_part->length);
saved_key_ptr += length;
saved_key_length -= length;
}
if (key_length < saved_key_length)
return -1;
if (key_length > saved_key_length)
return 1;
return 0;
new_key_ptr += new_key_field_length;
key_length -= new_key_field_length;
saved_key_ptr += saved_key_field_length;
saved_key_length -= saved_key_field_length;
}
return key_length - saved_key_length;
}
//TODO: QQQ Only do one direction for prefix.
//TODO: QQQ Combine this and previous function.
static int tokudb_prefix_cmp_packed_key(DB * file, const DBT * new_key, const DBT * saved_key) {
assert(file->app_private != 0);
KEY *key = (KEY *) file->app_private;
......@@ -723,25 +724,28 @@ static int tokudb_prefix_cmp_packed_key(DB * file, const DBT * new_key, const DB
//DBUG_DUMP("key_in_index", saved_key_ptr, saved_key->size);
for (; key_part != end && (int) key_length > 0 && (int) saved_key_length > 0; key_part++) {
int cmp;
uint length;
uint new_key_field_length;
uint saved_key_field_length;
if (key_part->null_bit) {
assert(new_key_ptr < (uchar *) new_key->data + new_key->size);
assert(new_key_ptr < (uchar *) new_key->data + new_key->size);
assert(saved_key_ptr < (uchar *) saved_key->data + saved_key->size);
if (*new_key_ptr != *saved_key_ptr++)
return ((int) *new_key_ptr - (int) saved_key_ptr[-1]);
if (*new_key_ptr != *saved_key_ptr) {
return ((int) *new_key_ptr - (int) *saved_key_ptr); }
saved_key_ptr++;
key_length--;
saved_key_length--;
if (!*new_key_ptr++)
continue;
if (!*new_key_ptr++) { continue; }
}
new_key_field_length = key_part->field->packed_col_length(new_key_ptr, key_part->length);
saved_key_field_length = key_part->field->packed_col_length(saved_key_ptr, key_part->length);
assert( key_length >= new_key_field_length);
assert(saved_key_length >= saved_key_field_length);
if ((cmp = key_part->field->pack_cmp(new_key_ptr, saved_key_ptr, key_part->length, 0)))
return cmp;
length = key_part->field->packed_col_length(new_key_ptr, key_part->length);
new_key_ptr += length;
key_length -= length;
length = key_part->field->packed_col_length(saved_key_ptr, key_part->length);
saved_key_ptr += length;
saved_key_length -= length;
new_key_ptr += new_key_field_length;
key_length -= new_key_field_length;
saved_key_ptr += saved_key_field_length;
saved_key_length -= saved_key_field_length;
}
return 0;
}
......@@ -1746,6 +1750,8 @@ int ha_tokudb::index_read_idx(uchar * buf, uint keynr, const uchar * key, uint k
TOKUDB_DBUG_RETURN(read_row(key_file[keynr]->get(key_file[keynr], transaction, pack_key(&last_key, keynr, key_buff, key, key_len), &current_row, 0), buf, keynr, &current_row, &last_key, 0));
}
//TODO: QQQ Function to tell if a key+keylen is the entire key (loop through the schema), see comparison function for ideas.
int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_rkey_function find_flag) {
TOKUDB_DBUG_ENTER("ha_tokudb::index_read");
DBT row;
......@@ -1754,6 +1760,67 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_
table->in_use->status_var.ha_read_key_count++;
bzero((void *) &row, sizeof(row));
pack_key(&last_key, active_index, key_buff, key, key_len);
/*
if (full_key) {
switch (find_flag) {
case (HA_READ_PREFIX): //Synonym for HA_READ_KEY_EXACT
case (HA_READ_KEY_EXACT):
Just c_get DB_SET, return.
case (HA_READ_AFTER_KEY):
c_get DB_SET_RANGE. If EQUAL to query, then do DB_NEXT (or is it DB_NEXT_NODUP?)
case (HA_READ_KEY_OR_NEXT):
c_get DB_SET_RANGE
case (HA_READ_BEFORE_KEY):
c_get DB_SET_RANGE, then do DB_PREV (or is it DB_PREV_NODUP)?
case (HA_READ_KEY_OR_PREV):
c_get DB_SET_RANGE. If NOT EQUAL to query, then do DB_PREV (or is it DB_PREV_NODUP?)
case (HA_READ_PREFIX_LAST_OR_PREV):
c_get DB_SET_RANGE. If NOT EQUAL to query, then do DB_PREV (or is it DB_PREV_NODUP?)
if if WAS equal to the query, then
if (NO_DUP db, just return it) else:
do DB_NEXT_NODUP
if found, do DB_PREV and return
else do DB_LAST and return
case (HA_READ_PREFIX_LAST):
c_get DB_SET. if !found, return NOT FOUND
if (NO_DUP db, just return it) else:
do c_get DB_NEXT_NODUP
if found, do DB_PREV and return.
else do DB_LAST and return.
default: Crash a lot.
}
else {
// Not full key
switch (find_flag) {
case (HA_READ_PREFIX): //Synonym for HA_READ_KEY_EXACT
case (HA_READ_KEY_EXACT):
c_get DB_SET_RANGE, then check a prefix
case (HA_READ_AFTER_KEY):
c_get DB_SET_RANGE, then:
while (found && query is prefix of 'dbtfound') do:
c_get DB_NEXT_NODUP (Definitely NEXT_NODUP since we care about key only).
case (HA_READ_KEY_OR_NEXT):
c_get SET_RANGE
case (HA_READ_BEFORE_KEY):
c_get DB_SET_RANGE, then do DB_PREV (or is it DB_PREV_NODUP)?
case (HA_READ_KEY_OR_PREV):
c_get DB_SET_RANGE. If query not a prefix of found, then DB_PREV (or is it DB_PREV_NODUP?)
case (HA_READ_PREFIX_LAST_OR_PREV):
c_get DB_SET_RANGE, then:
if (found && query is prefix of whatever found) do:
c_get DB_NEXT till not prefix (and return the one that was)
if (found originally but was not prefix of whatever found) do:
c_get DB_PREV
case (HA_READ_PREFIX_LAST):
c_get DB_SET_RANGE. if !found, or query not prefix of what found, return NOT FOUND
whlie query is prefix of whatfound, do c_get DB_NEXT till not.. then return the last one that was.
default: Crash a lot.
}
}
Note that sometimes if not found, will need things like DB_FIRST or DB_LAST
TODO: QQQ maybe need to pass true/1 as last parameter of read_row (this would make it
return END_OF_FILE instead of just NOT_FOUND
*/
switch (find_flag) {
case HA_READ_KEY_EXACT:
......@@ -1764,34 +1831,30 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_
if (tokudb_prefix_cmp_packed_key(share->key_file[active_index], &orig_key, &last_key))
error = DB_NOTFOUND;
}
error = read_row(error, buf, active_index, &row, 0, 0);
break;
case HA_READ_AFTER_KEY:
error = cursor->c_get(cursor, &last_key, &row, DB_SET_RANGE);
if (error == 0) {
DBT orig_key;
pack_key(&orig_key, active_index, key_buff2, key, key_len);
for (;;) {
pack_key(&orig_key, active_index, key_buff2, key, key_len);
if (tokudb_prefix_cmp_packed_key(share->key_file[active_index], &orig_key, &last_key) != 0)
break;
error = cursor->c_get(cursor, &last_key, &row, DB_NEXT);
error = cursor->c_get(cursor, &last_key, &row, DB_NEXT_NODUP);
if (error != 0)
break;
}
}
error = read_row(error, buf, active_index, &row, 0, 0);
break;
case HA_READ_BEFORE_KEY:
error = cursor->c_get(cursor, &last_key, &row, DB_SET_RANGE);
if (error == 0)
error = cursor->c_get(cursor, &last_key, &row, DB_PREV);
else
else if (error == DB_NOTFOUND)
error = cursor->c_get(cursor, &last_key, &row, DB_LAST);
error = read_row(error, buf, active_index, &row, 0, 0);
break;
case HA_READ_KEY_OR_NEXT:
error = cursor->c_get(cursor, &last_key, &row, DB_SET_RANGE);
error = read_row(error, buf, active_index, &row, 0, 0);
break;
case HA_READ_KEY_OR_PREV:
error = cursor->c_get(cursor, &last_key, &row, DB_SET_RANGE);
......@@ -1800,33 +1863,36 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_
pack_key(&orig_key, active_index, key_buff2, key, key_len);
if (tokudb_prefix_cmp_packed_key(share->key_file[active_index], &orig_key, &last_key) != 0)
error = cursor->c_get(cursor, &last_key, &row, DB_PREV);
} else
}
else if (error == DB_NOTFOUND)
error = cursor->c_get(cursor, &last_key, &row, DB_LAST);
error = read_row(error, buf, active_index, &row, 0, 0);
break;
case HA_READ_PREFIX_LAST_OR_PREV:
error = cursor->c_get(cursor, &last_key, &row, DB_SET_RANGE);
if (error == 0) {
DBT orig_key;
pack_key(&orig_key, active_index, key_buff2, key, key_len);
for (;;) {
pack_key(&orig_key, active_index, key_buff2, key, key_len);
if (tokudb_prefix_cmp_packed_key(share->key_file[active_index], &orig_key, &last_key) != 0)
break;
error = cursor->c_get(cursor, &last_key, &row, DB_NEXT);
error = cursor->c_get(cursor, &last_key, &row, DB_NEXT_NODUP);
if (error != 0)
break;
}
if (error == 0)
error = cursor->c_get(cursor, &last_key, &row, DB_PREV);
else if (error == DB_NOTFOUND)
error = cursor->c_get(cursor, &last_key, &row, DB_LAST);
}
error = read_row(error, buf, active_index, &row, 0, 0);
else if (error == DB_NOTFOUND)
error = cursor->c_get(cursor, &last_key, &row, DB_LAST);
break;
default:
printf("%d:%s:%d:unsupported:%d\n", my_tid(), __FILE__, __LINE__, find_flag);
error = HA_ERR_UNSUPPORTED;
break;
}
error = read_row(error, buf, active_index, &row, 0, find_flag != HA_READ_KEY_EXACT);
if (error && (tokudb_debug & TOKUDB_DEBUG_ERROR))
printf("%d:%s:%d:error:%d:%d\n", my_tid(), __FILE__, __LINE__, error, find_flag);
TOKUDB_DBUG_RETURN(error);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment