Commit 4ae5e7e4 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

addresses #843

merge back into main branch

git-svn-id: file:///svn/mysql/tokudb-engine/src@4264 c7de825b-a66e-492c-adef-691d508d4ae1
parent 645f5457
......@@ -692,7 +692,7 @@ ha_tokudb::ha_tokudb(handlerton * hton, TABLE_SHARE * table_arg)
// flags defined in sql\handler.h
int_table_flags(HA_REC_NOT_IN_SEQ | HA_FAST_KEY_READ | HA_NULL_IN_KEY | HA_CAN_INDEX_BLOBS | HA_PRIMARY_KEY_IN_READ_INDEX |
HA_FILE_BASED | HA_CAN_GEOMETRY | HA_AUTO_PART_KEY | HA_TABLE_SCAN_ON_INDEX),
changed_rows(0), last_dup_key((uint) - 1), version(0), using_ignore(0) {
changed_rows(0), last_dup_key((uint) - 1), version(0), using_ignore(0),primary_key_offsets(NULL) {
}
static const char *ha_tokudb_exts[] = {
......@@ -954,6 +954,12 @@ static bool tokudb_key_cmp(TABLE * table, KEY * key_info, const uchar * key, uin
}
#endif
int primary_key_part_compare (const void* left, const void* right) {
PRIM_KEY_PART_INFO* left_part= (PRIM_KEY_PART_INFO *)left;
PRIM_KEY_PART_INFO* right_part = (PRIM_KEY_PART_INFO *)right;
return left_part->offset - right_part->offset;
}
//
// Open a secondary table, the key will be a secondary index, the data will be a primary key
//
......@@ -1061,6 +1067,32 @@ int ha_tokudb::open(const char *name, int mode, uint test_if_locked) {
my_free(alloc_ptr, MYF(0));
TOKUDB_DBUG_RETURN(1);
}
/* Make sorted list of primary key parts, if they exist*/
if (!hidden_primary_key) {
uint num_prim_key_parts = table_share->key_info[table_share->primary_key].key_parts;
primary_key_offsets = (PRIM_KEY_PART_INFO *)my_malloc(
num_prim_key_parts*sizeof(*primary_key_offsets),
MYF(MY_WME)
);
if (!primary_key_offsets) {
free_share(share, table, hidden_primary_key, 1);
my_free((char *) rec_buff, MYF(0));
my_free(alloc_ptr, MYF(0));
TOKUDB_DBUG_RETURN(1);
}
for (uint i = 0; i < table_share->key_info[table_share->primary_key].key_parts; i++) {
primary_key_offsets[i].offset = table_share->key_info[table_share->primary_key].key_part[i].offset;
primary_key_offsets[i].part_index = i;
}
qsort(
primary_key_offsets, // start of array
num_prim_key_parts, //num elements
sizeof(*primary_key_offsets), //size of each element
primary_key_part_compare
);
}
thr_lock_data_init(&share->lock, &lock, NULL);
bzero((void *) &current_row, sizeof(current_row));
......@@ -1077,6 +1109,7 @@ int ha_tokudb::open(const char *name, int mode, uint test_if_locked) {
free_share(share, table, hidden_primary_key, 1);
my_free((char *) rec_buff, MYF(0));
my_free(alloc_ptr, MYF(0));
if (primary_key_offsets) my_free(primary_key_offsets, MYF(0));
my_errno = error;
TOKUDB_DBUG_RETURN(1);
}
......@@ -1090,6 +1123,7 @@ int ha_tokudb::open(const char *name, int mode, uint test_if_locked) {
free_share(share, table, hidden_primary_key, 1);
my_free((char *) rec_buff, MYF(0));
my_free(alloc_ptr, MYF(0));
if (primary_key_offsets) my_free(primary_key_offsets, MYF(0));
my_errno = error;
TOKUDB_DBUG_RETURN(1);
}
......@@ -1106,6 +1140,7 @@ int ha_tokudb::open(const char *name, int mode, uint test_if_locked) {
free_share(share, table, hidden_primary_key, 1);
my_free((char *) rec_buff, MYF(0));
my_free(alloc_ptr, MYF(0));
if (primary_key_offsets) my_free(primary_key_offsets, MYF(0));
my_errno = error;
TOKUDB_DBUG_RETURN(1);
}
......@@ -1169,6 +1204,7 @@ int ha_tokudb::__close(int mutex_is_locked) {
TOKUDB_TRACE("close:%p\n", this);
my_free(rec_buff, MYF(MY_ALLOW_ZERO_PTR));
my_free(alloc_ptr, MYF(MY_ALLOW_ZERO_PTR));
my_free(primary_key_offsets, MYF(MY_ALLOW_ZERO_PTR));
ha_tokudb::reset(); // current_row buffer
TOKUDB_DBUG_RETURN(free_share(share, table, hidden_primary_key, mutex_is_locked));
}
......@@ -1218,29 +1254,114 @@ ulong ha_tokudb::max_row_length(const uchar * buf) {
int ha_tokudb::pack_row(DBT * row, const uchar * record) {
uchar *ptr;
int r = ENOSYS;
bzero((void *) row, sizeof(*row));
uint curr_skip_index;
KEY *key_info = table->key_info + primary_key;
my_bitmap_map *old_map = dbug_tmp_use_all_columns(table, table->write_set);
//
// two cases, fixed length row, and variable length row
// fixed length row is first below
//
if (share->fixed_length_row) {
row->data = (void *) record;
row->size = table_share->reclength;
return 0;
if (hidden_primary_key) {
row->data = (void *)record;
row->size = table_share->reclength;
r = 0;
goto cleanup;
}
else {
//
// if the primary key is not hidden, then it is part of the record
// because primary key information is already stored in the key
// that will be passed to the fractal tree, we do not copy
// components that belong to the primary key
//
if (fix_rec_buff_for_blob(table_share->reclength)) {
r = HA_ERR_OUT_OF_MEM;
goto cleanup;
}
uchar* tmp_dest = rec_buff;
const uchar* tmp_src = record;
uint i = 0;
//
// say we have 100 bytes in record, and bytes 25-50 and 75-90 belong to the primary key
// this for loop will do a memcpy [0,25], [51,75] and [90,100]
//
for (i =0; i < key_info->key_parts; i++){
uint curr_index = primary_key_offsets[i].part_index;
uint bytes_to_copy = record + key_info->key_part[curr_index].offset - tmp_src;
memcpy(tmp_dest,tmp_src, bytes_to_copy);
tmp_dest += bytes_to_copy;
tmp_src = record + key_info->key_part[curr_index].offset + key_info->key_part[curr_index].length;
}
memcpy(tmp_dest,tmp_src, record + table_share->reclength - tmp_src);
tmp_dest += record + table_share->reclength - tmp_src;
row->data = rec_buff;
row->size = (size_t) (tmp_dest - rec_buff);
r = 0;
goto cleanup;
}
}
if (table_share->blob_fields) {
if (fix_rec_buff_for_blob(max_row_length(record)))
return HA_ERR_OUT_OF_MEM;
if (fix_rec_buff_for_blob(max_row_length(record))) {
r = HA_ERR_OUT_OF_MEM;
goto cleanup;
}
}
/* Copy null bits */
memcpy(rec_buff, record, table_share->null_bytes);
ptr = rec_buff + table_share->null_bytes;
//
// assert that when the hidden primary key exists, primary_key_offsets is NULL
//
assert( (hidden_primary_key != 0) == (primary_key_offsets == NULL));
curr_skip_index = 0;
for (Field ** field = table->field; *field; field++) {
uint curr_field_offset = field_offset(*field);
//
// if the primary key is hidden, primary_key_offsets will be NULL and
// this clause will not execute
//
if (primary_key_offsets) {
uint curr_skip_offset = primary_key_offsets[curr_skip_index].offset;
if (curr_skip_offset == curr_field_offset) {
//
// we have hit a field that is a portion of the primary key
//
uint curr_key_index = primary_key_offsets[curr_skip_index].part_index;
curr_skip_index++;
//
// only choose to continue over the key if the key's length matches the field's length
// otherwise, we may have a situation where the column is a varchar(10), the
// key is only the first 3 characters, and we end up losing the last 7 bytes of the
// column
//
if (table->key_info[primary_key].key_part[curr_key_index].length == (*field)->field_length) {
continue;
}
}
}
ptr = (*field)->pack(ptr, (const uchar *)
(record + field_offset(*field)));
(record + curr_field_offset));
}
row->data = rec_buff;
row->size = (size_t) (ptr - rec_buff);
return 0;
r = 0;
cleanup:
dbug_tmp_restore_column_map(table->write_set, old_map);
return r;
}
//
......@@ -1249,21 +1370,88 @@ int ha_tokudb::pack_row(DBT * row, const uchar * record) {
// [out] record - row in MySQL format
// [in] row - row stored in DBT to be converted
//
void ha_tokudb::unpack_row(uchar * record, DBT * row) {
if (share->fixed_length_row)
memcpy(record, (void *) row->data, table_share->reclength);
void ha_tokudb::unpack_row(uchar * record, DBT* row, DBT* key) {
//
// two cases, fixed length row, and variable length row
// fixed length row is first below
//
if (share->fixed_length_row) {
if (hidden_primary_key) {
memcpy(record, (void *) row->data, table_share->reclength);
}
else {
my_bitmap_map *old_map = dbug_tmp_use_all_columns(table, table->write_set);
KEY *key_info = table_share->key_info + primary_key;
uchar* tmp_dest = record;
uchar* tmp_src = (uchar *)row->data;
uint i = 0;
//
// unpack_key will fill in parts of record that are part of the primary key
//
unpack_key(record, key, primary_key);
//
// produces the opposite effect to what happened in pack_row
// first we fill in the parts of record that are not part of the key
//
for (i =0; i < key_info->key_parts; i++){
uint curr_index = primary_key_offsets[i].part_index;
uint bytes_to_copy = record + key_info->key_part[curr_index].offset - tmp_dest;
memcpy(tmp_dest,tmp_src, bytes_to_copy);
tmp_src += bytes_to_copy;
tmp_dest = record + key_info->key_part[curr_index].offset + key_info->key_part[curr_index].length;
}
memcpy(tmp_dest,tmp_src, record + table_share->reclength - tmp_dest);
dbug_tmp_restore_column_map(table->write_set, old_map);
}
}
else {
/* Copy null bits */
my_bitmap_map *old_map = dbug_tmp_use_all_columns(table, table->write_set);
const uchar *ptr = (const uchar *) row->data;
memcpy(record, ptr, table_share->null_bytes);
ptr += table_share->null_bytes;
for (Field ** field = table->field; *field; field++)
if (primary_key_offsets) {
//
// unpack_key will fill in parts of record that are part of the primary key
//
unpack_key(record, key, primary_key);
}
//
// fill in parts of record that are not part of the key
//
uint curr_skip_index = 0;
for (Field ** field = table->field; *field; field++) {
uint curr_field_offset = field_offset(*field);
if (primary_key_offsets) {
uint curr_skip_offset = primary_key_offsets[curr_skip_index].offset;
if (curr_skip_offset == curr_field_offset) {
//
// we have hit a field that is a portion of the primary key
//
uint curr_key_index = primary_key_offsets[curr_skip_index].part_index;
curr_skip_index++;
//
// only choose to continue over the key if the key's length matches the field's length
// otherwise, we may have a situation where the column is a varchar(10), the
// key is only the first 3 characters, and we end up losing the last 7 bytes of the
// column
//
if (table->key_info[primary_key].key_part[curr_key_index].length == (*field)->field_length) {
continue;
}
}
}
ptr = (*field)->unpack(record + field_offset(*field), ptr);
}
dbug_tmp_restore_column_map(table->write_set, old_map);
}
}
//
// Store the key and the primary key into the row
// Parameters:
......@@ -2137,11 +2325,13 @@ int ha_tokudb::read_row(int error, uchar * buf, uint keynr, DBT * row, DBT * fou
table->status = STATUS_NOT_FOUND;
TOKUDB_DBUG_RETURN(error == DB_NOTFOUND ? HA_ERR_CRASHED : error);
}
row = &current_row;
// TOKUDB_DBUG_DUMP("key=", key.data, key.size);
// TOKUDB_DBUG_DUMP("row=", row->data, row->size);
unpack_row(buf, &current_row, &key);
}
else {
unpack_row(buf, row, found_key);
}
unpack_row(buf, row);
if (found_key) { DBUG_DUMP("read row key", (uchar *) found_key->data, found_key->size); }
TOKUDB_DBUG_RETURN(0);
}
......@@ -3356,7 +3546,7 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
error = cursor_ret_val;
goto cleanup;
}
unpack_row(tmp_record, &row);
unpack_row(tmp_record, &row, &current_primary_key);
for (uint i = 0; i < num_of_keys; i++) {
DBT secondary_key;
create_dbt_key_from_key(&secondary_key,&key_info[i], tmp_key_buff, tmp_record);
......
......@@ -31,6 +31,11 @@ typedef struct st_tokudb_share {
} TOKUDB_SHARE;
typedef struct st_prim_key_part_info {
uint offset;
uint part_index;
} PRIM_KEY_PART_INFO;
class ha_tokudb : public handler {
private:
THR_LOCK_DATA lock; ///< MySQL lock
......@@ -110,13 +115,15 @@ private:
uint hidden_primary_key;
uint version;
bool key_read, using_ignore;
PRIM_KEY_PART_INFO* primary_key_offsets;
bool fix_rec_buff_for_blob(ulong length);
#define TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH 5 // QQQ why 5?
uchar current_ident[TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH];
ulong max_row_length(const uchar * buf);
int pack_row(DBT * row, const uchar * record);
void unpack_row(uchar * record, DBT * row);
void unpack_row(uchar * record, DBT * row, DBT* key);
void unpack_key(uchar * record, DBT * key, uint index);
DBT* create_dbt_key_from_key(DBT * key, KEY* key_info, uchar * buff, const uchar * record, int key_length = MAX_KEY_LENGTH);
DBT *create_dbt_key_from_table(DBT * key, uint keynr, uchar * buff, const uchar * record, int key_length = MAX_KEY_LENGTH);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment