Commit 647a01c5 authored by Zardosht Kasheff, committed by Yoni Fogel

addresses #1116 #1658

merge to main

git-svn-id: file:///svn/mysql/tokudb-engine/src@11407 c7de825b-a66e-492c-adef-691d508d4ae1
parent 382639fa
......@@ -44,6 +44,10 @@ static inline void thd_data_set(THD *thd, int slot, void *data) {
#include "hatoku_hton.h"
#include <mysql/plugin.h>
static const char *ha_tokudb_exts[] = {
ha_tokudb_ext,
NullS
};
#define declare_lockretry \
int lockretrycount;
......@@ -59,14 +63,41 @@ static inline void thd_data_set(THD *thd, int slot, void *data) {
usleep((lockretrycount<4 ? (1<<lockretrycount) : (1<<3)) * 1024); \
} while (0)
//
// This offset is calculated starting from AFTER the NULL bytes
//
inline uint get_var_len_offset(TOKUDB_SHARE* share, TABLE_SHARE* table_share, uint keynr) {
uint offset = 0;
for (uint i = 0; i < table_share->fields; i++) {
if (share->field_lengths[i] && !bitmap_is_set(&share->key_filters[keynr],i)) {
offset += share->field_lengths[i];
}
}
return offset;
}
inline uint get_len_of_offsets(TOKUDB_SHARE* share, TABLE_SHARE* table_share, uint keynr) {
uint len = 0;
for (uint i = 0; i < table_share->fields; i++) {
if (share->length_bytes[i] && !bitmap_is_set(&share->key_filters[keynr],i)) {
len += share->num_offset_bytes;
}
}
return len;
}
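//
// These two helpers describe the packed row layout used throughout this file
// (see pack_row below):
//
//   [null bytes][fixed fields][var field end-offsets][var field data][blobs]
//
// get_var_len_offset is the total size of the fixed fields that survive
// keynr's key filter, i.e. where the offset section begins, and
// get_len_of_offsets is the size of the offset section itself. A toy
// illustration (hypothetical schema, not from this tree):
//
//   CREATE TABLE t (a INT, b VARCHAR(10), c INT, PRIMARY KEY(a));
//   -- packing a row for the primary key dictionary: 'a' is filtered out,
//   -- so get_var_len_offset = 4 (just 'c'), and with 1-byte offsets
//   -- get_len_of_offsets = 1 (one end-offset for 'b')
//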
/** @brief
    Simple lock controls. The "share" created here is a structure that is
    passed to each tokudb handler; it holds the pieces used for locking,
    which every handler needs in order to function.
*/
static TOKUDB_SHARE *get_share(const char *table_name, TABLE * table) {
TOKUDB_SHARE *share;
static TOKUDB_SHARE *get_share(const char *table_name, struct st_table_share *table_share) {
TOKUDB_SHARE *share = NULL;
int error = 0;
uint length;
uint i = 0;
pthread_mutex_lock(&tokudb_mutex);
length = (uint) strlen(table_name);
......@@ -91,22 +122,58 @@ static TOKUDB_SHARE *get_share(const char *table_name, TABLE * table) {
share->table_name = tmp_name;
strmov(share->table_name, table_name);
//
// initialize all of the bitmaps
//
for (i = 0; i < MAX_KEY + 1; i++) {
error = bitmap_init(
&share->key_filters[i],
NULL,
table_share->fields,
false
);
if (error) {
goto exit;
}
}
//
// create the field lengths
//
share->field_lengths = (uchar *)my_malloc(table_share->fields, MYF(MY_WME | MY_ZEROFILL));
share->length_bytes= (uchar *)my_malloc(table_share->fields, MYF(MY_WME | MY_ZEROFILL));
share->blob_fields= (u_int32_t *)my_malloc(table_share->fields*sizeof(u_int32_t), MYF(MY_WME | MY_ZEROFILL));
if (share->field_lengths == NULL ||
share->length_bytes == NULL ||
share->blob_fields == NULL ) {
goto exit;
}
bzero((void *) share->key_file, sizeof(share->key_file));
if (my_hash_insert(&tokudb_open_tables, (uchar *) share))
goto error;
error = my_hash_insert(&tokudb_open_tables, (uchar *) share);
if (error) {
goto exit;
}
thr_lock_init(&share->lock);
pthread_mutex_init(&share->mutex, MY_MUTEX_INIT_FAST);
}
pthread_mutex_unlock(&tokudb_mutex);
exit:
if (error) {
for (i = 0; i < MAX_KEY + 1; i++) {
bitmap_free(&share->key_filters[i]);
}
my_free(share->field_lengths, MYF(MY_ALLOW_ZERO_PTR));
my_free(share->length_bytes, MYF(MY_ALLOW_ZERO_PTR));
my_free(share->blob_fields, MYF(MY_ALLOW_ZERO_PTR));
pthread_mutex_destroy(&share->mutex);
my_free((uchar *) share, MYF(0));
share = NULL;
}
return share;
error:
pthread_mutex_destroy(&share->mutex);
my_free((uchar *) share, MYF(0));
return NULL;
}
static int free_share(TOKUDB_SHARE * share, bool mutex_is_locked) {
......@@ -136,6 +203,17 @@ static int free_share(TOKUDB_SHARE * share, bool mutex_is_locked) {
share->key_file[i] = NULL;
}
}
for (uint i = 0; i < MAX_KEY+1; i++) {
bitmap_free(&share->key_filters[i]);
}
for (uint i = 0; i < MAX_KEY+1; i++) {
my_free(share->cp_info[i], MYF(MY_ALLOW_ZERO_PTR));
}
my_free(share->field_lengths, MYF(MY_ALLOW_ZERO_PTR));
my_free(share->length_bytes, MYF(MY_ALLOW_ZERO_PTR));
my_free(share->blob_fields, MYF(MY_ALLOW_ZERO_PTR));
if (share->status_block && (error = share->status_block->close(share->status_block, 0))) {
result = error;
......@@ -185,30 +263,7 @@ static void make_name(char *newname, const char *tablename, const char *dictname
goto cleanup; \
}
ha_tokudb::ha_tokudb(handlerton * hton, TABLE_SHARE * table_arg):handler(hton, table_arg)
// flags defined in sql\handler.h
{
int_table_flags = HA_REC_NOT_IN_SEQ | HA_FAST_KEY_READ | HA_NULL_IN_KEY | HA_CAN_INDEX_BLOBS | HA_PRIMARY_KEY_IN_READ_INDEX |
HA_FILE_BASED | HA_AUTO_PART_KEY | HA_TABLE_SCAN_ON_INDEX |HA_BINLOG_ROW_CAPABLE | HA_BINLOG_STMT_CAPABLE;
alloc_ptr = NULL;
rec_buff = NULL;
transaction = NULL;
added_rows = 0;
deleted_rows = 0;
last_dup_key = UINT_MAX;
using_ignore = 0;
last_cursor_error = 0;
range_lock_grabbed = false;
primary_key_offsets = NULL;
num_added_rows_in_stmt = 0;
num_deleted_rows_in_stmt = 0;
num_updated_rows_in_stmt = 0;
}
static const char *ha_tokudb_exts[] = {
ha_tokudb_ext,
NullS
};
/*
* returns NULL terminated file extension string
......@@ -230,12 +285,6 @@ ulong ha_tokudb::index_flags(uint idx, uint part, bool all_parts) const {
int primary_key_part_compare (const void* left, const void* right) {
PRIM_KEY_PART_INFO* left_part= (PRIM_KEY_PART_INFO *)left;
PRIM_KEY_PART_INFO* right_part = (PRIM_KEY_PART_INFO *)right;
return left_part->offset - right_part->offset;
}
//
// struct that will be used as a context for smart DBT callbacks
// contains parameters needed to complete the smart DBT cursor call
......@@ -263,7 +312,7 @@ typedef struct smart_dbt_ai_info {
static int smart_dbt_ai_callback (DBT const *key, DBT const *row, void *context) {
SMART_DBT_AI_INFO info = (SMART_DBT_AI_INFO)context;
info->ha->unpack_row(info->buf,row,key, true);
info->ha->unpack_row(info->buf,row,key, info->ha->primary_key);
//
// copy the key to prim_key
// This will be used as the data value for elements in the secondary index
......@@ -504,6 +553,258 @@ inline u_int32_t toku_iso_to_txn_flag (HA_TOKU_ISO_LEVEL lvl) {
int filter_key_part_compare (const void* left, const void* right) {
FILTER_KEY_PART_INFO* left_part= (FILTER_KEY_PART_INFO *)left;
FILTER_KEY_PART_INFO* right_part = (FILTER_KEY_PART_INFO *)right;
return left_part->offset - right_part->offset;
}
//
// Be very careful with the parameters passed to this function. There is no
// guarantee that key and table have proper info set; I had to verify it in
// the debugger.
//
void set_key_filter(MY_BITMAP* key_filter, KEY* key, TABLE* table, bool get_offset_from_keypart) {
FILTER_KEY_PART_INFO parts[MAX_REF_PARTS];
uint curr_skip_index = 0;
for (uint i = 0; i < key->key_parts; i++) {
//
// horrendous hack due to bugs in MySQL: we cannot always reliably
// get the offset from the same source
//
parts[i].offset = get_offset_from_keypart ? key->key_part[i].offset : field_offset(key->key_part[i].field, table);
parts[i].part_index = i;
}
qsort(
parts, // start of array
key->key_parts, //num elements
sizeof(*parts), //size of each element
filter_key_part_compare
);
for (uint i = 0; i < table->s->fields; i++) {
Field* field = table->field[i];
uint curr_field_offset = field_offset(field, table);
if (curr_skip_index < key->key_parts) {
uint curr_skip_offset = 0;
curr_skip_offset = parts[curr_skip_index].offset;
if (curr_skip_offset == curr_field_offset) {
//
// we have hit a field that is a portion of the primary key
//
uint curr_key_index = parts[curr_skip_index].part_index;
curr_skip_index++;
//
// only skip over the key part if its length matches the field's length;
// otherwise, we may have a situation where the column is a varchar(10),
// the key covers only the first 3 characters, and we would end up losing
// the last 7 characters of the column
//
TOKU_TYPE toku_type;
toku_type = mysql_to_toku_type(field);
switch(toku_type) {
case(toku_type_blob):
break;
case(toku_type_varbinary):
case(toku_type_varstring):
case(toku_type_fixbinary):
case(toku_type_fixstring):
if (key->key_part[curr_key_index].length == field->field_length) {
bitmap_set_bit(key_filter,i);
}
break;
default:
bitmap_set_bit(key_filter,i);
break;
}
}
}
}
}
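//
// A short example of the filter computed above (hypothetical schema): with
// PRIMARY KEY(a) on (a INT, b VARCHAR(10)), the bit for 'a' is set in the
// primary key's filter, so pack_row omits 'a' from the stored row and it is
// recovered from the key at read time. For a prefix key such as KEY(b(3)),
// the bit is NOT set: the key holds only the first 3 characters, so the row
// must still store the full column.
//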
inline uchar* pack_fixed_field(
uchar* to_tokudb,
const uchar* from_mysql,
u_int32_t num_bytes
)
{
switch (num_bytes) {
case (1):
memcpy(to_tokudb, from_mysql, 1);
break;
case (2):
memcpy(to_tokudb, from_mysql, 2);
break;
case (3):
memcpy(to_tokudb, from_mysql, 3);
break;
case (4):
memcpy(to_tokudb, from_mysql, 4);
break;
case (8):
memcpy(to_tokudb, from_mysql, 8);
break;
default:
memcpy(to_tokudb, from_mysql, num_bytes);
break;
}
return to_tokudb+num_bytes;
}
inline const uchar* unpack_fixed_field(
uchar* to_mysql,
const uchar* from_tokudb,
u_int32_t num_bytes
)
{
switch (num_bytes) {
case (1):
memcpy(to_mysql, from_tokudb, 1);
break;
case (2):
memcpy(to_mysql, from_tokudb, 2);
break;
case (3):
memcpy(to_mysql, from_tokudb, 3);
break;
case (4):
memcpy(to_mysql, from_tokudb, 4);
break;
case (8):
memcpy(to_mysql, from_tokudb, 8);
break;
default:
memcpy(to_mysql, from_tokudb, num_bytes);
break;
}
return from_tokudb+num_bytes;
}
inline uchar* pack_var_field(
uchar* to_tokudb_offset_ptr, //location where offset data is going to be written
uchar* to_tokudb_data,
uchar* to_tokudb_offset_start, //location where offset starts
const uchar * from_mysql,
u_int32_t mysql_length_bytes,
u_int32_t offset_bytes
)
{
uint data_length = 0;
u_int32_t offset = 0;
switch(mysql_length_bytes) {
case(1):
data_length = from_mysql[0];
break;
case(2):
data_length = uint2korr(from_mysql);
break;
default:
assert(false);
break;
}
memcpy(to_tokudb_data, from_mysql + mysql_length_bytes, data_length);
//
// for offset, we pack the offset where the data ENDS!
//
offset = to_tokudb_data + data_length - to_tokudb_offset_start;
switch(offset_bytes) {
case (1):
to_tokudb_offset_ptr[0] = (uchar)offset;
break;
case (2):
int2store(to_tokudb_offset_ptr,offset);
break;
default:
assert(false);
break;
}
return to_tokudb_data + data_length;
}
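//
// Because each stored offset is where a field's data ENDS, a field's length
// can be recovered as the difference between adjacent offsets, with no
// per-field length bytes in the packed row. A sketch with two varchars
// holding "abc" and "de" (assuming 1-byte offsets):
//
//   offsets: [3][5]   data: [a b c d e]
//   field 0: starts at 0, ends at 3 -> length 3
//   field 1: starts at 3, ends at 5 -> length 2
//
// unpack_row below passes exactly that difference to unpack_var_field as
// from_tokudb_data_len.
//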
inline void unpack_var_field(
uchar* to_mysql,
const uchar* from_tokudb_data,
u_int32_t from_tokudb_data_len,
u_int32_t mysql_length_bytes
)
{
//
// store the length
//
switch (mysql_length_bytes) {
case(1):
to_mysql[0] = (uchar)from_tokudb_data_len;
break;
case(2):
int2store(to_mysql, from_tokudb_data_len);
break;
default:
assert(false);
break;
}
//
// store the data
//
memcpy(to_mysql+mysql_length_bytes, from_tokudb_data, from_tokudb_data_len);
}
ha_tokudb::ha_tokudb(handlerton * hton, TABLE_SHARE * table_arg):handler(hton, table_arg)
// flags defined in sql\handler.h
{
int_table_flags = HA_REC_NOT_IN_SEQ | HA_FAST_KEY_READ | HA_NULL_IN_KEY | HA_CAN_INDEX_BLOBS | HA_PRIMARY_KEY_IN_READ_INDEX |
HA_FILE_BASED | HA_AUTO_PART_KEY | HA_TABLE_SCAN_ON_INDEX |HA_BINLOG_ROW_CAPABLE | HA_BINLOG_STMT_CAPABLE;
alloc_ptr = NULL;
rec_buff = NULL;
transaction = NULL;
cursor = NULL;
fixed_cols_for_query = NULL;
var_cols_for_query = NULL;
num_fixed_cols_for_query = 0;
num_var_cols_for_query = 0;
unpack_entire_row = true;
read_blobs = false;
read_key = false;
added_rows = 0;
deleted_rows = 0;
last_dup_key = UINT_MAX;
using_ignore = 0;
last_cursor_error = 0;
range_lock_grabbed = false;
num_added_rows_in_stmt = 0;
num_deleted_rows_in_stmt = 0;
num_updated_rows_in_stmt = 0;
}
//
// states whether the table has an auto increment column; if so, sets *index to that column's position
// Parameters:
// [out] index - if an auto inc column exists, this param is set to its position in the table; if not, it is unchanged
// Returns:
// true if an auto inc column exists, false otherwise
//
bool ha_tokudb::has_auto_increment_flag(uint* index) {
//
// check to see if we have auto increment field
//
bool ai_found = false;
uint ai_index = 0;
for (uint i = 0; i < table_share->fields; i++, ai_index++) {
Field* field = table->field[i];
if (field->flags & AUTO_INCREMENT_FLAG) {
ai_found = true;
*index = ai_index;
break;
}
}
return ai_found;
}
//
// Open a secondary table, the key will be a secondary index, the data will be a primary key
......@@ -553,6 +854,257 @@ cleanup:
return error;
}
int initialize_col_pack_info(TOKUDB_SHARE* share, TABLE_SHARE* table_share, uint keynr) {
int error = ENOSYS;
//
// set up the cp_info
//
assert(share->cp_info[keynr] == NULL);
share->cp_info[keynr] = (COL_PACK_INFO *)my_malloc(
table_share->fields*sizeof(COL_PACK_INFO),
MYF(MY_WME | MY_ZEROFILL)
);
if (share->cp_info[keynr] == NULL) {
error = ENOMEM;
goto exit;
}
{
u_int32_t curr_fixed_offset = 0;
u_int32_t curr_var_index = 0;
for (uint j = 0; j < table_share->fields; j++) {
COL_PACK_INFO* curr = &share->cp_info[keynr][j];
//
// need to set the offsets / indexes
// offsets are calculated AFTER the NULL bytes
//
if (!bitmap_is_set(&share->key_filters[keynr],j)) {
if (share->field_lengths[j]) {
curr->col_pack_val = curr_fixed_offset;
curr_fixed_offset += share->field_lengths[j];
}
else if (share->length_bytes[j]) {
curr->col_pack_val = curr_var_index;
curr_var_index++;
}
}
}
//
// set up the mcp_info
//
share->mcp_info[keynr].var_len_offset = get_var_len_offset(
share,
table_share,
keynr
);
share->mcp_info[keynr].len_of_offsets = get_len_of_offsets(
share,
table_share,
keynr
);
error = 0;
}
exit:
return error;
}
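//
// Continuing the toy schema from above (a INT PRIMARY KEY, b VARCHAR(10),
// c INT), for the primary key dictionary: 'a' is filtered out,
// cp_info[pk] for 'c' gets col_pack_val = 0 (its offset among the fixed
// fields), cp_info[pk] for 'b' gets col_pack_val = 0 (its index among the
// var fields), mcp_info[pk].var_len_offset = 4, and with 1-byte offsets
// mcp_info[pk].len_of_offsets = 1.
//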
int ha_tokudb::initialize_share(
const char* name,
int mode
)
{
int error = 0;
char* newname = NULL;
char name_buff[FN_REFLEN];
u_int64_t num_rows = 0;
u_int32_t curr_blob_field_index = 0;
uint open_flags = (mode == O_RDONLY ? DB_RDONLY : 0) | DB_THREAD;
open_flags += DB_AUTO_COMMIT;
DBUG_PRINT("info", ("share->use_count %u", share->use_count));
newname = (char *)my_malloc(strlen(name) + NAME_CHAR_LEN,MYF(MY_WME));
if (newname == NULL) {
error = ENOMEM;
goto exit;
}
//
// initialize share->num_offset_bytes
//
if (table_share->reclength < 256) {
share->num_offset_bytes = 1;
}
else {
share->num_offset_bytes = 2;
}
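//
// Example: the stored offsets are ends of var-field data relative to the
// start of the var data section, so they are bounded by the record length;
// a table with reclength 100 fits every offset in one byte, while
// reclength 300 needs two.
//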
//
// fill in field_lengths; 0 means the field is variable sized
// fill in length_bytes; 0 means the field is fixed size or a blob
//
for (uint i = 0; i < table_share->fields; i++) {
Field* field = table_share->field[i];
TOKU_TYPE toku_type = mysql_to_toku_type(field);
uint32 pack_length = 0;
switch (toku_type) {
case toku_type_int:
case toku_type_double:
case toku_type_float:
case toku_type_fixbinary:
case toku_type_fixstring:
pack_length = field->pack_length();
assert(pack_length < 256);
share->field_lengths[i] = (uchar)pack_length;
share->length_bytes[i] = 0;
break;
case toku_type_blob:
share->field_lengths[i] = 0;
share->length_bytes[i] = 0;
share->blob_fields[curr_blob_field_index] = i;
curr_blob_field_index++;
break;
case toku_type_varstring:
case toku_type_varbinary:
//
// meaning it is variable sized
//
share->field_lengths[i] = 0;
share->length_bytes[i] = (uchar)((Field_varstring *)field)->length_bytes;
break;
default:
assert(false);
}
}
share->num_blobs = curr_blob_field_index;
for (uint i = 0; i < table_share->keys + test(hidden_primary_key); i++) {
//
// do the cluster/primary key filtering calculations
//
if (! (i==primary_key && hidden_primary_key) ){
if ( i == primary_key ) {
set_key_filter(
&share->key_filters[primary_key],
&table_share->key_info[primary_key],
table,
true
);
}
if (table_share->key_info[i].flags & HA_CLUSTERING) {
set_key_filter(
&share->key_filters[i],
&table_share->key_info[i],
table,
true
);
if (!hidden_primary_key) {
set_key_filter(
&share->key_filters[i],
&table_share->key_info[primary_key],
table,
true
);
}
}
}
if (i == primary_key || table_share->key_info[i].flags & HA_CLUSTERING) {
error = initialize_col_pack_info(share,table_share,i);
if (error) {
goto exit;
}
}
}
error = db_create(&share->file, db_env, 0);
if (error) {
goto exit;
}
//
// set comparison function for main.tokudb
//
share->file->set_bt_compare(share->file, tokudb_cmp_dbt_key);
make_name(newname, name, "main");
fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME);
error = share->file->open(share->file, 0, name_buff, NULL, DB_BTREE, open_flags, 0);
if (error) {
goto exit;
}
if (tokudb_debug & TOKUDB_DEBUG_OPEN) {
TOKUDB_TRACE("open:%s:file=%p\n", newname, share->file);
}
/* Open other keys; These are part of the share structure */
share->key_file[primary_key] = share->file;
share->key_type[primary_key] = hidden_primary_key ? DB_YESOVERWRITE : DB_NOOVERWRITE;
for (uint i = 0; i < table_share->keys; i++) {
if (i != primary_key) {
error = open_secondary_table(
&share->key_file[i],
&table_share->key_info[i],
name,
mode,
&share->key_type[i]
);
if (error) {
goto exit;
}
}
}
if (!hidden_primary_key) {
//
// We need to set the ref_length to start at 5, to account for
// the "infinity byte" in keys, and for placing the DBT size in the first four bytes
//
ref_length = sizeof(u_int32_t) + sizeof(uchar);
KEY_PART_INFO *key_part = table->key_info[primary_key].key_part;
KEY_PART_INFO *end = key_part + table->key_info[primary_key].key_parts;
for (; key_part != end; key_part++) {
ref_length += key_part->field->max_packed_col_length(key_part->length);
}
share->status |= STATUS_PRIMARY_KEY_INIT;
}
share->ref_length = ref_length;
error = get_status();
if (error) {
goto exit;
}
if (share->version < HA_TOKU_VERSION) {
error = ENOSYS;
goto exit;
}
error = estimate_num_rows(share->file,&num_rows);
//
// estimate_num_rows should not fail under normal conditions
//
if (error == 0) {
share->rows = num_rows;
}
else {
goto exit;
}
//
// initialize auto increment data
//
share->has_auto_inc = has_auto_increment_flag(&share->ai_field_index);
if (share->has_auto_inc) {
init_auto_increment();
}
error = 0;
exit:
my_free(newname, MYF(MY_ALLOW_ZERO_PTR));
return error;
}
//
......@@ -570,22 +1122,14 @@ int ha_tokudb::open(const char *name, int mode, uint test_if_locked) {
TOKUDB_DBUG_ENTER("ha_tokudb::open %p %s", this, name);
TOKUDB_OPEN();
char name_buff[FN_REFLEN];
uint open_flags = (mode == O_RDONLY ? DB_RDONLY : 0) | DB_THREAD;
uint max_key_length;
int error;
char* newname = NULL;
int error = 0;
int ret_val = 0;
transaction = NULL;
cursor = NULL;
open_flags += DB_AUTO_COMMIT;
newname = (char *)my_malloc(strlen(name) + NAME_CHAR_LEN,MYF(MY_WME));
if (newname == NULL) {
TOKUDB_DBUG_RETURN(1);
}
/* Open primary key */
hidden_primary_key = 0;
if ((primary_key = table_share->primary_key) >= MAX_KEY) {
......@@ -594,66 +1138,41 @@ int ha_tokudb::open(const char *name, int mode, uint test_if_locked) {
key_used_on_scan = MAX_KEY;
hidden_primary_key = TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH;
ref_length = TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH + sizeof(u_int32_t);
}
}
else {
key_used_on_scan = primary_key;
}
/* Need some extra memory in case of packed keys */
// the "+ 1" is for the first byte that states +/- infinity
// multiply everything by 2 to account for clustered keys having a key and primary key together
max_key_length = 2*(table_share->max_key_length + MAX_REF_PARTS * 3 + sizeof(uchar));
if (!(alloc_ptr =
my_multi_malloc(MYF(MY_WME),
&key_buff, max_key_length,
&key_buff2, max_key_length,
&primary_key_buff, (hidden_primary_key ? 0 : max_key_length),
NullS))) {
my_free(newname, MYF(MY_ALLOW_ZERO_PTR));
TOKUDB_DBUG_RETURN(1);
}
if (!(rec_buff = (uchar *) my_malloc((alloced_rec_buff_length = table_share->rec_buff_length), MYF(MY_WME)))) {
my_free(newname, MYF(MY_ALLOW_ZERO_PTR));
my_free(alloc_ptr, MYF(0));
alloc_ptr = NULL;
TOKUDB_DBUG_RETURN(1);
alloc_ptr = my_multi_malloc(MYF(MY_WME),
&key_buff, max_key_length,
&key_buff2, max_key_length,
&primary_key_buff, (hidden_primary_key ? 0 : max_key_length),
&fixed_cols_for_query, table_share->fields*sizeof(u_int32_t),
&var_cols_for_query, table_share->fields*sizeof(u_int32_t),
NullS
);
if (alloc_ptr == NULL) {
ret_val = 1;
goto exit;
}
/* Init shared structure */
if (!(share = get_share(name, table))) {
my_free((char *) rec_buff, MYF(0));
rec_buff = NULL;
my_free(alloc_ptr, MYF(0));
alloc_ptr = NULL;
my_free(newname, MYF(MY_ALLOW_ZERO_PTR));
TOKUDB_DBUG_RETURN(1);
alloced_rec_buff_length = table_share->rec_buff_length + table_share->fields;
rec_buff = (uchar *) my_malloc(alloced_rec_buff_length, MYF(MY_WME));
if (rec_buff == NULL) {
ret_val = 1;
goto exit;
}
/* Make sorted list of primary key parts, if they exist*/
if (!hidden_primary_key) {
uint num_prim_key_parts = table_share->key_info[table_share->primary_key].key_parts;
primary_key_offsets = (PRIM_KEY_PART_INFO *)my_malloc(
num_prim_key_parts*sizeof(*primary_key_offsets),
MYF(MY_WME)
);
if (!primary_key_offsets) {
free_share(share, 1);
my_free((char *) rec_buff, MYF(0));
rec_buff = NULL;
my_free(alloc_ptr, MYF(0));
alloc_ptr = NULL;
my_free(newname, MYF(MY_ALLOW_ZERO_PTR));
TOKUDB_DBUG_RETURN(1);
}
for (uint i = 0; i < table_share->key_info[table_share->primary_key].key_parts; i++) {
primary_key_offsets[i].offset = table_share->key_info[table_share->primary_key].key_part[i].offset;
primary_key_offsets[i].part_index = i;
}
qsort(
primary_key_offsets, // start of array
num_prim_key_parts, //num elements
sizeof(*primary_key_offsets), //size of each element
primary_key_part_compare
);
/* Init shared structure */
share = get_share(name, table_share);
if (share == NULL) {
ret_val = 1;
goto exit;
}
thr_lock_data_init(&share->lock, &lock, NULL);
......@@ -666,100 +1185,14 @@ int ha_tokudb::open(const char *name, int mode, uint test_if_locked) {
this, share, share->file, table, table->s, share->use_count);
}
if (!share->use_count++) {
DBUG_PRINT("info", ("share->use_count %u", share->use_count));
if ((error = db_create(&share->file, db_env, 0))) {
free_share(share, 1);
my_free((char *) rec_buff, MYF(0));
rec_buff = NULL;
my_free(alloc_ptr, MYF(0));
alloc_ptr = NULL;
if (primary_key_offsets) {
my_free(primary_key_offsets, MYF(0));
primary_key_offsets = NULL;
}
my_errno = error;
my_free(newname, MYF(MY_ALLOW_ZERO_PTR));
TOKUDB_DBUG_RETURN(1);
}
share->file->set_bt_compare(share->file, tokudb_cmp_dbt_key);
make_name(newname, name, "main");
fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME);
if ((error = share->file->open(share->file, 0, name_buff, NULL, DB_BTREE, open_flags, 0))) {
ret_val = initialize_share(
name,
mode
);
if (ret_val) {
free_share(share, 1);
my_free((char *) rec_buff, MYF(0));
rec_buff = NULL;
my_free(alloc_ptr, MYF(0));
alloc_ptr = NULL;
if (primary_key_offsets) {
my_free(primary_key_offsets, MYF(0));
primary_key_offsets = NULL;
}
my_errno = error;
my_free(newname, MYF(MY_ALLOW_ZERO_PTR));
TOKUDB_DBUG_RETURN(1);
}
if (tokudb_debug & TOKUDB_DEBUG_OPEN)
TOKUDB_TRACE("open:%s:file=%p\n", newname, share->file);
/* Open other keys; These are part of the share structure */
share->key_file[primary_key] = share->file;
share->key_type[primary_key] = hidden_primary_key ? DB_YESOVERWRITE : DB_NOOVERWRITE;
DB **ptr = share->key_file;
for (uint i = 0; i < table_share->keys; i++, ptr++) {
if (i != primary_key) {
if ((error = open_secondary_table(ptr,&table_share->key_info[i],name,mode,&share->key_type[i]))) {
__close(1);
my_free(newname, MYF(MY_ALLOW_ZERO_PTR));
TOKUDB_DBUG_RETURN(1);
}
}
}
if (!hidden_primary_key) {
//
// I realize this is incredibly confusing, and refactoring should take
// care of this, but we need to set the ref_length to start at 5, to account for
// the "infinity byte" in keys, and for placing the DBT size in the first four bytes
//
ref_length = sizeof(u_int32_t) + sizeof(uchar);
KEY_PART_INFO *key_part = table->key_info[primary_key].key_part;
KEY_PART_INFO *end = key_part + table->key_info[primary_key].key_parts;
for (; key_part != end; key_part++) {
ref_length += key_part->field->max_packed_col_length(key_part->length);
}
share->status |= STATUS_PRIMARY_KEY_INIT;
}
share->ref_length = ref_length;
error = get_status();
if (error || share->version < HA_TOKU_VERSION) {
__close(1);
my_free(newname, MYF(MY_ALLOW_ZERO_PTR));
TOKUDB_DBUG_RETURN(1);
}
//////////////////////
u_int64_t num_rows = 0;
int error = estimate_num_rows(share->file,&num_rows);
//
// estimate_num_rows should not fail under normal conditions
//
if (error == 0) {
share->rows = num_rows;
}
else {
__close(1);
my_free(newname, MYF(MY_ALLOW_ZERO_PTR));
TOKUDB_DBUG_RETURN(1);
}
//
// initialize auto increment data
//
share->has_auto_inc = has_auto_increment_flag(&share->ai_field_index);
if (share->has_auto_inc) {
init_auto_increment();
pthread_mutex_unlock(&share->mutex);
goto exit;
}
}
ref_length = share->ref_length; // If second open
......@@ -767,14 +1200,22 @@ int ha_tokudb::open(const char *name, int mode, uint test_if_locked) {
key_read = false;
stats.block_size = 1<<20; // QQQ Tokudb DB block size
share->fixed_length_row = !(table_share->db_create_options & HA_OPTION_PACK_RECORD);
init_hidden_prim_key_info();
info(HA_STATUS_NO_LOCK | HA_STATUS_VARIABLE | HA_STATUS_CONST);
my_free(newname, MYF(MY_ALLOW_ZERO_PTR));
TOKUDB_DBUG_RETURN(0);
exit:
if (ret_val) {
my_free(alloc_ptr, MYF(MY_ALLOW_ZERO_PTR));
alloc_ptr = NULL;
my_free(rec_buff, MYF(MY_ALLOW_ZERO_PTR));
rec_buff = NULL;
if (error) {
my_errno = error;
}
}
TOKUDB_DBUG_RETURN(ret_val);
}
//
......@@ -847,30 +1288,6 @@ cleanup:
return error;
}
//
// states whether the table has an auto increment column; if so, sets *index to that column's position
// Parameters:
// [out] index - if an auto inc column exists, this param is set to its position in the table; if not, it is unchanged
// Returns:
// true if an auto inc column exists, false otherwise
//
bool ha_tokudb::has_auto_increment_flag(uint* index) {
//
// check to see if we have auto increment field
//
bool ai_found = false;
uint ai_index = 0;
for (uint i = 0; i < table_share->fields; i++, ai_index++) {
Field* field = table->field[i];
if (field->flags & AUTO_INCREMENT_FLAG) {
ai_found = true;
*index = ai_index;
break;
}
}
return ai_found;
}
//
// helper function to write a piece of metadata in to status.tokudb
//
......@@ -957,10 +1374,8 @@ int ha_tokudb::__close(int mutex_is_locked) {
TOKUDB_TRACE("close:%p\n", this);
my_free(rec_buff, MYF(MY_ALLOW_ZERO_PTR));
my_free(alloc_ptr, MYF(MY_ALLOW_ZERO_PTR));
my_free(primary_key_offsets, MYF(MY_ALLOW_ZERO_PTR));
rec_buff = NULL;
alloc_ptr = NULL;
primary_key_offsets = NULL;
ha_tokudb::reset(); // current_row buffer
TOKUDB_DBUG_RETURN(free_share(share, mutex_is_locked));
}
......@@ -1010,65 +1425,18 @@ ulong ha_tokudb::max_row_length(const uchar * buf) {
int ha_tokudb::pack_row(
DBT * row,
const uchar* record,
bool strip_pk
const uchar* record,
uint index
)
{
uchar *ptr;
uchar* fixed_field_ptr = NULL;
uchar* var_field_offset_ptr = NULL;
uchar* start_field_data_ptr = NULL;
uchar* var_field_data_ptr = NULL;
int r = ENOSYS;
bzero((void *) row, sizeof(*row));
uint curr_skip_index;
KEY *key_info = &table->key_info[primary_key];
my_bitmap_map *old_map = dbug_tmp_use_all_columns(table, table->write_set);
//
// two cases, fixed length row, and variable length row
// fixed length row is first below
//
if (share->fixed_length_row) {
if (hidden_primary_key || !strip_pk) {
row->data = (void *)record;
row->size = table_share->reclength;
r = 0;
goto cleanup;
}
else {
//
// if the primary key is not hidden, then it is part of the record
// because primary key information is already stored in the key
// that will be passed to the fractal tree, we do not copy
// components that belong to the primary key
//
if (fix_rec_buff_for_blob(table_share->reclength)) {
r = HA_ERR_OUT_OF_MEM;
goto cleanup;
}
uchar* tmp_dest = rec_buff;
const uchar* tmp_src = record;
uint i = 0;
//
// say we have 100 bytes in record, and bytes 25-50 and 75-90 belong to the primary key
// this for loop will do a memcpy [0,25], [51,75] and [90,100]
//
for (i =0; i < key_info->key_parts; i++){
uint curr_index = primary_key_offsets[i].part_index;
uint bytes_to_copy = record + key_info->key_part[curr_index].offset - tmp_src;
memcpy(tmp_dest,tmp_src, bytes_to_copy);
tmp_dest += bytes_to_copy;
tmp_src = record + key_info->key_part[curr_index].offset + key_info->key_part[curr_index].length;
}
memcpy(tmp_dest,tmp_src, record + table_share->reclength - tmp_src);
tmp_dest += record + table_share->reclength - tmp_src;
row->data = rec_buff;
row->size = (size_t) (tmp_dest - rec_buff);
r = 0;
goto cleanup;
}
}
if (table_share->blob_fields) {
if (fix_rec_buff_for_blob(max_row_length(record))) {
......@@ -1079,55 +1447,54 @@ int ha_tokudb::pack_row(
/* Copy null bits */
memcpy(rec_buff, record, table_share->null_bytes);
ptr = rec_buff + table_share->null_bytes;
fixed_field_ptr = rec_buff + table_share->null_bytes;
var_field_offset_ptr = fixed_field_ptr + share->mcp_info[index].var_len_offset;
start_field_data_ptr = var_field_offset_ptr + share->mcp_info[index].len_of_offsets;
var_field_data_ptr = var_field_offset_ptr + share->mcp_info[index].len_of_offsets;
//
// assert that when the hidden primary key exists, primary_key_offsets is NULL
//
assert( (hidden_primary_key != 0) == (primary_key_offsets == NULL));
curr_skip_index = 0;
for (Field ** field = table->field; *field; field++) {
uint curr_field_offset = field_offset(*field, table);
//
// if the primary key is hidden, primary_key_offsets will be NULL and
// this clause will not execute
//
if (primary_key_offsets &&
strip_pk &&
curr_skip_index < table_share->key_info[table_share->primary_key].key_parts
) {
uint curr_skip_offset = primary_key_offsets[curr_skip_index].offset;
if (curr_skip_offset == curr_field_offset) {
//
// we have hit a field that is a portion of the primary key
//
uint curr_key_index = primary_key_offsets[curr_skip_index].part_index;
curr_skip_index++;
//
// only choose to continue over the key if the key's length matches the field's length
// otherwise, we may have a situation where the column is a varchar(10), the
// key is only the first 3 characters, and we end up losing the last 7 bytes of the
// column
//
if (table->key_info[primary_key].key_part[curr_key_index].length == (*field)->field_length) {
continue;
}
}
}
if (is_null_field(table, *field, record)) {
for (uint i = 0; i < table_share->fields; i++) {
Field* field = table->field[i];
uint curr_field_offset = field_offset(field, table);
if (bitmap_is_set(&share->key_filters[index],i)) {
continue;
}
ptr = (*field)->pack(ptr, (const uchar *)
(record + curr_field_offset));
if (share->field_lengths[i]) {
fixed_field_ptr = pack_fixed_field(
fixed_field_ptr,
record + curr_field_offset,
share->field_lengths[i]
);
}
else if (share->length_bytes[i]) {
var_field_data_ptr = pack_var_field(
var_field_offset_ptr,
var_field_data_ptr,
start_field_data_ptr,
record + curr_field_offset,
share->length_bytes[i],
share->num_offset_bytes
);
var_field_offset_ptr += share->num_offset_bytes;
}
}
for (uint i = 0; i < share->num_blobs; i++) {
Field* field = table->field[share->blob_fields[i]];
var_field_data_ptr = field->pack(
var_field_data_ptr,
record + field_offset(field, table)
);
}
row->data = rec_buff;
row->size = (size_t) (ptr - rec_buff);
row->size = (size_t) (var_field_data_ptr - rec_buff);
r = 0;
cleanup:
dbug_tmp_restore_column_map(table->write_set, old_map);
return r;
}
......@@ -1140,97 +1507,189 @@ cleanup:
void ha_tokudb::unpack_row(
uchar* record,
DBT const *row,
DBT const *key,
bool pk_stripped
DBT const *key,
uint index
)
{
//
// two cases, fixed length row, and variable length row
// fixed length row is first below
//
if (share->fixed_length_row) {
if (hidden_primary_key || !pk_stripped) {
memcpy(record, (void *) row->data, table_share->reclength);
}
else {
my_bitmap_map *old_map = dbug_tmp_use_all_columns(table, table->write_set);
KEY *key_info = &table_share->key_info[primary_key];
/* Copy null bits */
my_bitmap_map *old_map = dbug_tmp_use_all_columns(table, table->write_set);
const uchar* fixed_field_ptr = (const uchar *) row->data;
const uchar* var_field_offset_ptr = NULL;
const uchar* var_field_data_ptr = NULL;
u_int32_t data_end_offset = 0;
memcpy(record, fixed_field_ptr, table_share->null_bytes);
fixed_field_ptr += table_share->null_bytes;
uchar* tmp_dest = record;
uchar* tmp_src = (uchar *)row->data;
uint i = 0;
var_field_offset_ptr = fixed_field_ptr + share->mcp_info[index].var_len_offset;
var_field_data_ptr = var_field_offset_ptr + share->mcp_info[index].len_of_offsets;
//
// unpack_key will fill in parts of record that are part of the primary key
//
unpack_key(record, key, primary_key);
//
// unpack the key, if necessary
//
if (!(hidden_primary_key && index == primary_key)) {
unpack_key(record,key,index);
}
u_int32_t last_offset = 0;
//
// we have two methods of unpacking, one if we need to unpack the entire row
// the second if we unpack a subset of the entire row
// first method here is if we unpack the entire row
//
if (unpack_entire_row) {
//
// fill in parts of record that are not part of the key
//
for (uint i = 0; i < table_share->fields; i++) {
Field* field = table->field[i];
if (bitmap_is_set(&share->key_filters[index],i)) {
continue;
}
if (share->field_lengths[i]) {
fixed_field_ptr = unpack_fixed_field(
record + field_offset(field, table),
fixed_field_ptr,
share->field_lengths[i]
);
}
//
// produces the opposite effect to what happened in pack_row
// first we fill in the parts of record that are not part of the key
// here, we DO modify var_field_data_ptr or var_field_offset_ptr
// as we unpack variable sized fields
//
for (i =0; i < key_info->key_parts; i++){
uint curr_index = primary_key_offsets[i].part_index;
uint bytes_to_copy = record + key_info->key_part[curr_index].offset - tmp_dest;
memcpy(tmp_dest,tmp_src, bytes_to_copy);
tmp_src += bytes_to_copy;
tmp_dest = record + key_info->key_part[curr_index].offset + key_info->key_part[curr_index].length;
}
memcpy(tmp_dest,tmp_src, record + table_share->reclength - tmp_dest);
dbug_tmp_restore_column_map(table->write_set, old_map);
else if (share->length_bytes[i]) {
switch (share->num_offset_bytes) {
case (1):
data_end_offset = var_field_offset_ptr[0];
break;
case (2):
data_end_offset = uint2korr(var_field_offset_ptr);
break;
default:
assert(false);
break;
}
unpack_var_field(
record + field_offset(field, table),
var_field_data_ptr,
data_end_offset - last_offset,
share->length_bytes[i]
);
var_field_offset_ptr += share->num_offset_bytes;
var_field_data_ptr += data_end_offset - last_offset;
last_offset = data_end_offset;
}
}
for (uint i = 0; i < share->num_blobs; i++) {
Field* field = table->field[share->blob_fields[i]];
var_field_data_ptr = field->unpack(
record + field_offset(field, table),
var_field_data_ptr
);
}
}
//
// in this case, we unpack only what is specified
// in fixed_cols_for_query and var_cols_for_query
//
else {
/* Copy null bits */
my_bitmap_map *old_map = dbug_tmp_use_all_columns(table, table->write_set);
const uchar *ptr = (const uchar *) row->data;
memcpy(record, ptr, table_share->null_bytes);
ptr += table_share->null_bytes;
if (primary_key_offsets && pk_stripped) {
//
// unpack_key will fill in parts of record that are part of the primary key
//
unpack_key(record, key, primary_key);
//
// first the fixed fields
//
for (u_int32_t i = 0; i < num_fixed_cols_for_query; i++) {
uint field_index = fixed_cols_for_query[i];
Field* field = table->field[field_index];
unpack_fixed_field(
record + field_offset(field, table),
fixed_field_ptr + share->cp_info[index][field_index].col_pack_val,
share->field_lengths[field_index]
);
}
//
// fill in parts of record that are not part of the key
// now the var fields
// here, we do NOT modify var_field_data_ptr or var_field_offset_ptr
//
uint curr_skip_index = 0;
for (Field ** field = table->field; *field; field++) {
uint curr_field_offset = field_offset(*field, table);
if (primary_key_offsets &&
pk_stripped &&
curr_skip_index < table_share->key_info[table_share->primary_key].key_parts
) {
uint curr_skip_offset = primary_key_offsets[curr_skip_index].offset;
if (curr_skip_offset == curr_field_offset) {
//
// we have hit a field that is a portion of the primary key
//
uint curr_key_index = primary_key_offsets[curr_skip_index].part_index;
curr_skip_index++;
//
// only choose to continue over the key if the key's length matches the field's length
// otherwise, we may have a situation where the column is a varchar(10), the
// key is only the first 3 characters, and we end up losing the last 7 bytes of the
// column
//
if (table->key_info[primary_key].key_part[curr_key_index].length == (*field)->field_length) {
continue;
}
for (u_int32_t i = 0; i < num_var_cols_for_query; i++) {
uint field_index = var_cols_for_query[i];
Field* field = table->field[field_index];
u_int32_t var_field_index = share->cp_info[index][field_index].col_pack_val;
u_int32_t data_start_offset;
switch (share->num_offset_bytes) {
case (1):
data_end_offset = (var_field_offset_ptr + var_field_index)[0];
break;
case (2):
data_end_offset = uint2korr(var_field_offset_ptr + 2*var_field_index);
break;
default:
assert(false);
break;
}
if (var_field_index) {
switch (share->num_offset_bytes) {
case (1):
data_start_offset = (var_field_offset_ptr + var_field_index - 1)[0];
break;
case (2):
data_start_offset = uint2korr(var_field_offset_ptr + 2*(var_field_index-1));
break;
default:
assert(false);
break;
}
}
//
// null bytes MUST have been copied before doing this
//
if (is_null_field(table, *field, record)) {
continue;
else {
data_start_offset = 0;
}
unpack_var_field(
record + field_offset(field, table),
var_field_data_ptr + data_start_offset,
data_end_offset - data_start_offset,
share->length_bytes[field_index]
);
}
//
// now the blobs
//
//
// need to set var_field_data_ptr to point to the beginning of the blobs,
// which is at the end of the variable-length data (if any exists). If there
// is no variable-length data, the offset computed below stays 0 and
// var_field_data_ptr is already set correctly
//
if (share->mcp_info[index].len_of_offsets) {
switch (share->num_offset_bytes) {
case (1):
data_end_offset = (var_field_data_ptr - 1)[0];
break;
case (2):
data_end_offset = uint2korr(var_field_data_ptr - 2);
break;
default:
assert(false);
break;
}
ptr = (*field)->unpack(record + field_offset(*field, table), ptr);
var_field_data_ptr += data_end_offset;
}
for (uint i = 0; i < share->num_blobs; i++) {
Field* field = table->field[share->blob_fields[i]];
var_field_data_ptr = field->unpack(
record + field_offset(field, table),
var_field_data_ptr
);
}
dbug_tmp_restore_column_map(table->write_set, old_map);
}
dbug_tmp_restore_column_map(table->write_set, old_map);
}
u_int32_t ha_tokudb::place_key_into_mysql_buff(
......@@ -1767,7 +2226,7 @@ int ha_tokudb::write_row(uchar * record) {
pthread_mutex_unlock(&share->mutex);
}
if ((error = pack_row(&row, (const uchar *) record, true))){
if ((error = pack_row(&row, (const uchar *) record, primary_key))){
goto cleanup;
}
......@@ -1823,7 +2282,6 @@ int ha_tokudb::write_row(uchar * record) {
// now insertion for rest of indexes
//
for (uint keynr = 0; keynr < table_share->keys; keynr++) {
bool cluster_row_created = false;
if (keynr == primary_key) {
continue;
}
......@@ -1834,11 +2292,8 @@ int ha_tokudb::write_row(uchar * record) {
put_flags = DB_YESOVERWRITE;
}
if (table->key_info[keynr].flags & HA_CLUSTERING) {
if (!cluster_row_created) {
if ((error = pack_row(&row, (const uchar *) record, false))){
goto cleanup;
}
cluster_row_created = true;
if ((error = pack_row(&row, (const uchar *) record, keynr))){
goto cleanup;
}
lockretry {
error = share->key_file[keynr]->put(
......@@ -1939,7 +2394,7 @@ int ha_tokudb::update_primary_key(DB_TXN * trans, bool primary_key_changed, cons
// Primary key changed or we are updating a key that can have duplicates.
// Delete the old row and add a new one
if (!(error = remove_key(trans, primary_key, old_row, old_key))) {
if (!(error = pack_row(&row, new_row, true))) {
if (!(error = pack_row(&row, new_row, primary_key))) {
if ((error = share->file->put(share->file, trans, new_key, &row, share->key_type[primary_key]))) {
// Probably a duplicated key; restore old key and row if needed
last_dup_key = primary_key;
......@@ -1949,7 +2404,7 @@ int ha_tokudb::update_primary_key(DB_TXN * trans, bool primary_key_changed, cons
}
else {
// Primary key didn't change; just update the row data
if (!(error = pack_row(&row, new_row, true))) {
if (!(error = pack_row(&row, new_row, primary_key))) {
error = share->file->put(share->file, trans, new_key, &row, 0);
}
}
......@@ -2038,7 +2493,6 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
}
// Update all other keys
for (uint keynr = 0; keynr < table_share->keys; keynr++) {
bool cluster_row_created = false;
if (keynr == primary_key) {
continue;
}
......@@ -2053,11 +2507,8 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
put_flags = DB_YESOVERWRITE;
}
if (table->key_info[keynr].flags & HA_CLUSTERING) {
if (!cluster_row_created) {
if ((error = pack_row(&row, (const uchar *) new_row, false))){
goto cleanup;
}
cluster_row_created = true;
if ((error = pack_row(&row, (const uchar *) new_row, keynr))){
goto cleanup;
}
error = share->key_file[keynr]->put(
share->key_file[keynr],
......@@ -2227,6 +2678,68 @@ int ha_tokudb::delete_row(const uchar * record) {
TOKUDB_DBUG_RETURN(error);
}
//
// takes as input table->read_set and table->write_set
// and stores the list of field indexes that unpack_row needs
// to read in the member variables fixed_cols_for_query
// and var_cols_for_query
//
void ha_tokudb::set_query_columns(uint keynr) {
u_int32_t curr_fixed_col_index = 0;
u_int32_t curr_var_col_index = 0;
read_key = false;
read_blobs = false;
//
// key_index is the index whose key filter applies to the rows we will read:
// rows in a clustering index are packed against that index's filter,
// everything else against the primary key's filter
//
uint key_index = 0;
if (keynr == primary_key) {
key_index = primary_key;
}
else {
key_index = (table->key_info[keynr].flags & HA_CLUSTERING ? keynr : primary_key);
}
for (uint i = 0; i < table_share->fields; i++) {
if (bitmap_is_set(table->read_set,i) ||
bitmap_is_set(table->write_set,i)
)
{
if (bitmap_is_set(&share->key_filters[key_index],i)) {
read_key = true;
}
else {
//
// if fixed field length
//
if (share->field_lengths[i] != 0) {
//
// save the offset into the list
//
fixed_cols_for_query[curr_fixed_col_index] = i;
curr_fixed_col_index++;
}
//
// varchar or varbinary
//
else if (share->length_bytes[i] != 0) {
var_cols_for_query[curr_var_col_index] = i;
curr_var_col_index++;
}
//
// it is a blob
//
else {
read_blobs = true;
}
}
}
}
num_fixed_cols_for_query = curr_fixed_col_index;
num_var_cols_for_query = curr_var_col_index;
}
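//
// Example of the effect (hypothetical query): for SELECT c FROM t on the toy
// schema above, scanning the primary key, only the bit for 'c' is set in
// read_set, so fixed_cols_for_query holds just c's index,
// num_var_cols_for_query is 0, read_blobs is false, and unpack_row can skip
// 'b' entirely.
//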
//
// Notification that a scan of the entire secondary table is about
// to take place. Will pre-acquire a table read lock.
......@@ -2264,6 +2777,7 @@ cleanup:
int ha_tokudb::index_init(uint keynr, bool sorted) {
TOKUDB_DBUG_ENTER("ha_tokudb::index_init %p %d", this, keynr);
int error;
THD* thd = ha_thd();
DBUG_PRINT("enter", ("table: '%s' key: %d", table_share->table_name.str, keynr));
/*
......@@ -2282,8 +2796,19 @@ int ha_tokudb::index_init(uint keynr, bool sorted) {
if ((error = share->key_file[keynr]->cursor(share->key_file[keynr], transaction, &cursor, 0))) {
last_cursor_error = error;
cursor = NULL; // Safety
goto exit;
}
bzero((void *) &last_key, sizeof(last_key));
if (thd_sql_command(thd) == SQLCOM_SELECT) {
set_query_columns(keynr);
unpack_entire_row = false;
}
else {
unpack_entire_row = true;
}
error = 0;
exit:
TOKUDB_DBUG_RETURN(error);
}
......@@ -2399,16 +2924,10 @@ void ha_tokudb::read_primary_key(uchar * buf, uint keynr, DBT const *row, DBT co
memcpy(key_buff, row->data, row->size);
}
//
// case we read from clustered key, so unpack the entire row
//
else if (keynr != primary_key && (table->key_info[keynr].flags & HA_CLUSTERING)) {
unpack_row(buf, row, found_key, false);
}
//
// case we read from the primary key, so unpack the entire row
// else read from clustered/primary key
//
else {
unpack_row(buf, row, found_key, true);
unpack_row(buf, row, found_key, keynr);
}
if (found_key) { DBUG_DUMP("read row key", (uchar *) found_key->data, found_key->size); }
DBUG_VOID_RETURN;
......@@ -2433,7 +2952,7 @@ int ha_tokudb::read_full_row(uchar * buf) {
table->status = STATUS_NOT_FOUND;
TOKUDB_DBUG_RETURN(error == DB_NOTFOUND ? HA_ERR_CRASHED : error);
}
unpack_row(buf, &current_row, &last_key, true);
unpack_row(buf, &current_row, &last_key, primary_key);
TOKUDB_DBUG_RETURN(0);
}
......@@ -2480,7 +2999,7 @@ int ha_tokudb::read_row(uchar * buf, uint keynr, DBT const *row, DBT const *foun
// in this case we have a clustered key, so no need to do pt query
//
if (table->key_info[keynr].flags & HA_CLUSTERING) {
unpack_row(buf, row, found_key, false);
unpack_row(buf, row, found_key, keynr);
TOKUDB_DBUG_RETURN(0);
}
//
......@@ -2499,7 +3018,7 @@ int ha_tokudb::read_row(uchar * buf, uint keynr, DBT const *row, DBT const *foun
table->status = STATUS_NOT_FOUND;
TOKUDB_DBUG_RETURN(error == DB_NOTFOUND ? HA_ERR_CRASHED : error);
}
unpack_row(buf, &current_row, &key, true);
unpack_row(buf, &current_row, &key, primary_key);
}
else {
//
......@@ -2509,7 +3028,7 @@ int ha_tokudb::read_row(uchar * buf, uint keynr, DBT const *row, DBT const *foun
unpack_key(buf, found_key, keynr);
}
else {
unpack_row(buf, row, found_key, true);
unpack_row(buf, row, found_key, primary_key);
}
}
if (found_key) { DBUG_DUMP("read row key", (uchar *) found_key->data, found_key->size); }
......@@ -3192,7 +3711,7 @@ void ha_tokudb::position(const uchar * record) {
if (hidden_primary_key) {
DBUG_ASSERT(ref_length == (TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH + sizeof(u_int32_t)));
memcpy_fixed(ref + sizeof(u_int32_t), current_ident, TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH);
*(u_int32_t *)ref = TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH;
*(u_int32_t *)ref = TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH;
}
else {
bool has_null;
......@@ -3564,7 +4083,6 @@ THR_LOCK_DATA **ha_tokudb::store_lock(THD * thd, THR_LOCK_DATA ** to, enum thr_l
DBUG_RETURN(to);
}
int toku_dbt_up(DB*,
u_int32_t old_version, const DBT *old_descriptor, const DBT *old_key, const DBT *old_val,
u_int32_t new_version, const DBT *new_descriptor, const DBT *new_key, const DBT *new_val) {
......@@ -3572,6 +4090,7 @@ int toku_dbt_up(DB*,
return 0;
}
static int create_sub_table(const char *table_name, int flags , DBT* row_descriptor) {
TOKUDB_DBUG_ENTER("create_sub_table");
int error;
......@@ -3777,7 +4296,7 @@ int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_in
bzero(&row_descriptor, sizeof(row_descriptor));
row_desc_buff = (uchar *)my_malloc(2*(table_share->fields * 6)+10 ,MYF(MY_WME));
row_desc_buff = (uchar *)my_malloc(2*(form->s->fields * 6)+10 ,MYF(MY_WME));
if (row_desc_buff == NULL){ error = ENOMEM; goto cleanup;}
dirname = (char *)my_malloc(get_name_length(name) + NAME_CHAR_LEN,MYF(MY_WME));
......@@ -3880,6 +4399,7 @@ int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_in
if (!(error = (db_create(&status_block, db_env, 0)))) {
make_name(newname, name, "status");
fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME);
//
// create a row descriptor that is the same as a hidden primary key
//
......@@ -4451,12 +4971,40 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
num_files_created++;
}
for (uint i = 0; i < table_share->keys + test(hidden_primary_key); i++) {
}
//
// open all the DB files and set the appropriate variables in share
// they go to the end of share->key_file
//
curr_index = curr_num_DBs;
for (uint i = 0; i < num_of_keys; i++, curr_index++) {
if (key_info[i].flags & HA_CLUSTERING) {
set_key_filter(
&share->key_filters[curr_index],
&key_info[i],
table_arg,
false
);
if (!hidden_primary_key) {
set_key_filter(
&share->key_filters[curr_index],
&table_arg->key_info[primary_key],
table_arg,
false
);
}
error = initialize_col_pack_info(share,table_arg->s,curr_index);
if (error) {
goto cleanup;
}
}
error = open_secondary_table(
&share->key_file[curr_index],
&key_info[i],
......@@ -4519,7 +5067,7 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
info.buf = tmp_record;
info.pk_index = primary_key; // needed so that clustering indexes being created will have right pk info
unpack_entire_row = true;
cursor_ret_val = tmp_cursor->c_getf_next(tmp_cursor, DB_PRELOCKED, smart_dbt_ai_callback, &info);
while (cursor_ret_val != DB_NOTFOUND) {
if (cursor_ret_val) {
......@@ -4528,7 +5076,6 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
}
for (uint i = 0; i < num_of_keys; i++) {
bool cluster_row_created = false;
DBT secondary_key, row;
bool has_null = false;
create_dbt_key_from_key(&secondary_key,&key_info[i], tmp_key_buff, tmp_record, &has_null);
......@@ -4539,11 +5086,8 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
}
if (key_info[i].flags & HA_CLUSTERING) {
if (!cluster_row_created) {
if ((error = pack_row(&row, (const uchar *) tmp_record, false))){
goto cleanup;
}
cluster_row_created = true;
if ((error = pack_row(&row, (const uchar *) tmp_record, curr_index))){
goto cleanup;
}
error = share->key_file[curr_index]->put(share->key_file[curr_index], txn, &secondary_key, &row, put_flags);
}
......
......@@ -5,6 +5,15 @@
#include <db.h>
#include "hatoku_cmp.h"
typedef struct st_col_pack_info {
u_int32_t col_pack_val; //offset if fixed, pack_index if var
} COL_PACK_INFO;
typedef struct st_multi_col_pack_info {
u_int32_t var_len_offset; //where the fixed length stuff ends and the offsets for var stuff begins
u_int32_t len_of_offsets; //length of the offset bytes in a packed row
} MULTI_COL_PACK_INFO;
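//
// In the packed row layout ([null bytes][fixed fields][offsets][var data][blobs]),
// var_len_offset is the size of the fixed-field section and len_of_offsets
// the size of the offset section; both are stored per dictionary because
// each key filters different columns out of the row.
//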
typedef struct st_tokudb_share {
char *table_name;
uint table_name_length, use_count;
......@@ -36,7 +45,6 @@ typedef struct st_tokudb_share {
u_int32_t key_type[MAX_KEY +1];
uint status, version, capabilities;
uint ref_length;
bool fixed_length_row;
//
// whether table has an auto increment column
//
......@@ -45,6 +53,15 @@ typedef struct st_tokudb_share {
// index of auto increment column in table->field, if auto_inc exists
//
uint ai_field_index;
MY_BITMAP key_filters[MAX_KEY+1];
uchar* field_lengths; //stores the field lengths of fixed size fields (255 max)
uchar* length_bytes; // stores the length of lengths of varchars and varbinaries
u_int32_t* blob_fields; // list of indexes of blob fields
u_int32_t num_blobs;
MULTI_COL_PACK_INFO mcp_info[MAX_KEY+1];
COL_PACK_INFO* cp_info[MAX_KEY+1];
u_int32_t num_offset_bytes; //number of bytes needed to encode the offset
} TOKUDB_SHARE;
#define HA_TOKU_VERSION 2
......@@ -60,16 +77,16 @@ typedef struct st_tokudb_share {
// in status.tokudb
//
typedef ulonglong HA_METADATA_KEY;
#define hatoku_version 0
#define hatoku_capabilities 1
#define hatoku_max_ai 2 //maximum auto increment value found so far
#define hatoku_ai_create_value 3
typedef ulonglong HA_METADATA_KEY;
#define hatoku_version 0
#define hatoku_capabilities 1
#define hatoku_max_ai 2 //maximum auto increment value found so far
#define hatoku_ai_create_value 3
typedef struct st_prim_key_part_info {
typedef struct st_filter_key_part_info {
uint offset;
uint part_index;
} PRIM_KEY_PART_INFO;
} FILTER_KEY_PART_INFO;
typedef enum {
lock_read = 0,
......@@ -125,6 +142,19 @@ private:
//
uchar *primary_key_buff;
bool unpack_entire_row;
//
// buffers (and their sizes) that will hold the indexes
// of fields that need to be read for a query
//
u_int32_t* fixed_cols_for_query;
u_int32_t num_fixed_cols_for_query;
u_int32_t* var_cols_for_query;
u_int32_t num_var_cols_for_query;
bool read_blobs;
bool read_key;
//
// transaction used by ha_tokudb's cursor
//
......@@ -159,11 +189,6 @@ private:
ulonglong num_deleted_rows_in_stmt;
ulonglong num_updated_rows_in_stmt;
//
// index into key_file that holds DB* that is indexed on
// the primary_key. this->key_file[primary_index] == this->file
//
uint primary_key;
uint last_dup_key;
//
// if set to 0, then the primary key is not hidden
......@@ -186,8 +211,6 @@ private:
//
bool range_lock_grabbed;
PRIM_KEY_PART_INFO* primary_key_offsets;
//
// buffer for updating the status of long insert, delete, and update
// statements. Right now, the messages are
......@@ -200,7 +223,11 @@ private:
uchar current_ident[TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH];
ulong max_row_length(const uchar * buf);
int pack_row(DBT * row, const uchar * record, bool strip_pk);
int pack_row(
DBT * row,
const uchar* record,
uint index
);
u_int32_t place_key_into_mysql_buff(KEY* key_info, uchar * record, uchar* data);
void unpack_key(uchar * record, DBT const *key, uint index);
u_int32_t place_key_into_dbt_buff(KEY* key_info, uchar * buff, const uchar * record, bool* has_null, int key_length);
......@@ -222,6 +249,12 @@ private:
int update_max_auto_inc(DB* db, ulonglong val);
int write_auto_inc_create(DB* db, ulonglong val);
void init_auto_increment();
int initialize_share(
const char* name,
int mode
);
void set_query_columns(uint keynr);
public:
......@@ -361,10 +394,21 @@ public:
void read_key_only(uchar * buf, uint keynr, DBT const *row, DBT const *found_key);
void read_primary_key(uchar * buf, uint keynr, DBT const *row, DBT const *found_key);
int read_row(uchar * buf, uint keynr, DBT const *row, DBT const *found_key);
void unpack_row(uchar * record, DBT const *row, DBT const *key, bool pk_stripped);
void unpack_row(
uchar* record,
DBT const *row,
DBT const *key,
uint index
);
int heavi_ret_val;
//
// index into key_file that holds DB* that is indexed on
// the primary_key. this->key_file[primary_index] == this->file
//
uint primary_key;
private:
int read_full_row(uchar * buf);
int __close(int mutex_is_locked);
......
......@@ -27,7 +27,7 @@ typedef enum {
} TOKU_TYPE;
inline TOKU_TYPE mysql_to_toku_type (enum_field_types mysql_type);
inline TOKU_TYPE mysql_to_toku_type (Field* field);
uchar* pack_toku_key_field(
......@@ -51,6 +51,7 @@ uchar* unpack_toku_key_field(
u_int32_t key_part_length
);
//
// for storing NULL byte in keys
//
......