Commit d75d6490 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

addresses #846

merge back into main branch

git-svn-id: file:///svn/mysql/tokudb-engine/src@4138 c7de825b-a66e-492c-adef-691d508d4ae1
parent 5924dbb1
...@@ -171,6 +171,7 @@ static bool tokudb_show_status(handlerton * hton, THD * thd, stat_print_fn * pri ...@@ -171,6 +171,7 @@ static bool tokudb_show_status(handlerton * hton, THD * thd, stat_print_fn * pri
static int tokudb_close_connection(handlerton * hton, THD * thd); static int tokudb_close_connection(handlerton * hton, THD * thd);
static int tokudb_commit(handlerton * hton, THD * thd, bool all); static int tokudb_commit(handlerton * hton, THD * thd, bool all);
static int tokudb_rollback(handlerton * hton, THD * thd, bool all); static int tokudb_rollback(handlerton * hton, THD * thd, bool all);
static uint tokudb_alter_table_flags(uint flags);
#if 0 #if 0
static int tokudb_rollback_to_savepoint(handlerton * hton, THD * thd, void *savepoint); static int tokudb_rollback_to_savepoint(handlerton * hton, THD * thd, void *savepoint);
static int tokudb_savepoint(handlerton * hton, THD * thd, void *savepoint); static int tokudb_savepoint(handlerton * hton, THD * thd, void *savepoint);
...@@ -218,7 +219,7 @@ static int tokudb_init_func(void *p) { ...@@ -218,7 +219,7 @@ static int tokudb_init_func(void *p) {
tokudb_hton->panic = tokudb_end; tokudb_hton->panic = tokudb_end;
tokudb_hton->flush_logs = tokudb_flush_logs; tokudb_hton->flush_logs = tokudb_flush_logs;
tokudb_hton->show_status = tokudb_show_status; tokudb_hton->show_status = tokudb_show_status;
tokudb_hton->alter_table_flags = tokudb_alter_table_flags;
#if 0 #if 0
if (!tokudb_tmpdir) if (!tokudb_tmpdir)
tokudb_tmpdir = mysql_tmpdir; tokudb_tmpdir = mysql_tmpdir;
...@@ -366,30 +367,27 @@ static TOKUDB_SHARE *get_share(const char *table_name, TABLE * table) { ...@@ -366,30 +367,27 @@ static TOKUDB_SHARE *get_share(const char *table_name, TABLE * table) {
if (!(share = (TOKUDB_SHARE *) hash_search(&tokudb_open_tables, (uchar *) table_name, length))) { if (!(share = (TOKUDB_SHARE *) hash_search(&tokudb_open_tables, (uchar *) table_name, length))) {
ulong *rec_per_key; ulong *rec_per_key;
char *tmp_name; char *tmp_name;
DB **key_file;
u_int32_t *key_type; u_int32_t *key_type;
uint keys = table->s->keys; uint num_keys = table->s->keys;
if (!(share = (TOKUDB_SHARE *) if (!(share = (TOKUDB_SHARE *)
my_multi_malloc(MYF(MY_WME | MY_ZEROFILL), my_multi_malloc(MYF(MY_WME | MY_ZEROFILL),
&share, sizeof(*share), &share, sizeof(*share),
&tmp_name, length + 1, &tmp_name, length + 1,
&rec_per_key, keys * sizeof(ha_rows), &rec_per_key, num_keys * sizeof(ha_rows),
&key_file, (keys + 1) * sizeof(*key_file), &key_type, (num_keys + 1) * sizeof(u_int32_t),
&key_type, (keys + 1) * sizeof(u_int32_t),
NullS))) { NullS))) {
pthread_mutex_unlock(&tokudb_mutex); pthread_mutex_unlock(&tokudb_mutex);
return NULL; return NULL;
} }
share->use_count = 0; share->use_count = 0;
share->table_name_length = length; share->table_name_length = length;
share->table_name = tmp_name; share->table_name = tmp_name;
strmov(share->table_name, table_name); strmov(share->table_name, table_name);
share->rec_per_key = rec_per_key; share->rec_per_key = rec_per_key;
share->key_file = key_file;
share->key_type = key_type; share->key_type = key_type;
bzero((void *) share->key_file, sizeof(share->key_file));
if (my_hash_insert(&tokudb_open_tables, (uchar *) share)) if (my_hash_insert(&tokudb_open_tables, (uchar *) share))
goto error; goto error;
...@@ -409,7 +407,7 @@ static TOKUDB_SHARE *get_share(const char *table_name, TABLE * table) { ...@@ -409,7 +407,7 @@ static TOKUDB_SHARE *get_share(const char *table_name, TABLE * table) {
static int free_share(TOKUDB_SHARE * share, TABLE * table, uint hidden_primary_key, bool mutex_is_locked) { static int free_share(TOKUDB_SHARE * share, TABLE * table, uint hidden_primary_key, bool mutex_is_locked) {
int error, result = 0; int error, result = 0;
uint keys = table->s->keys + test(hidden_primary_key); uint num_keys = table->s->keys + test(hidden_primary_key);
pthread_mutex_lock(&tokudb_mutex); pthread_mutex_lock(&tokudb_mutex);
...@@ -422,7 +420,7 @@ static int free_share(TOKUDB_SHARE * share, TABLE * table, uint hidden_primary_k ...@@ -422,7 +420,7 @@ static int free_share(TOKUDB_SHARE * share, TABLE * table, uint hidden_primary_k
/* this does share->file->close() implicitly */ /* this does share->file->close() implicitly */
update_status(share, table); update_status(share, table);
for (uint i = 0; i < keys; i++) { for (uint i = 0; i < num_keys; i++) {
if (tokudb_debug & TOKUDB_DEBUG_OPEN) if (tokudb_debug & TOKUDB_DEBUG_OPEN)
TOKUDB_TRACE("dbclose:%p\n", key_file[i]); TOKUDB_TRACE("dbclose:%p\n", key_file[i]);
if (key_file[i] && (error = key_file[i]->close(key_file[i], 0))) if (key_file[i] && (error = key_file[i]->close(key_file[i], 0)))
...@@ -646,6 +644,20 @@ void tokudb_cleanup_log_files(void) { ...@@ -646,6 +644,20 @@ void tokudb_cleanup_log_files(void) {
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
//
// *******NOTE*****
// If the flags HA_ONLINE_DROP_INDEX and HA_ONLINE_DROP_UNIQUE_INDEX
// are ever added, prepare_drop_index and final_drop_index will need to be modified
// so that the actual deletion of DB's is done in final_drop_index and not prepare_drop_index
//
static uint tokudb_alter_table_flags(uint flags)
{
return (HA_ONLINE_ADD_INDEX_NO_WRITES| HA_ONLINE_DROP_INDEX_NO_WRITES |
HA_ONLINE_ADD_UNIQUE_INDEX_NO_WRITES| HA_ONLINE_DROP_UNIQUE_INDEX_NO_WRITES);
}
static int get_name_length(const char *name) { static int get_name_length(const char *name) {
int n = 0; int n = 0;
const char *newname = name; const char *newname = name;
...@@ -672,9 +684,10 @@ static void make_name(char *newname, const char *tablename, const char *dictname ...@@ -672,9 +684,10 @@ static void make_name(char *newname, const char *tablename, const char *dictname
nn += sprintf(nn, "/%s%s", dictname, ha_tokudb_ext); nn += sprintf(nn, "/%s%s", dictname, ha_tokudb_ext);
} }
ha_tokudb::ha_tokudb(handlerton * hton, TABLE_SHARE * table_arg) ha_tokudb::ha_tokudb(handlerton * hton, TABLE_SHARE * table_arg)
: :
handler(hton, table_arg), alloc_ptr(0), rec_buff(0), file(0), handler(hton, table_arg), alloc_ptr(0), rec_buff(0),
// flags defined in sql\handler.h // flags defined in sql\handler.h
int_table_flags(HA_REC_NOT_IN_SEQ | HA_FAST_KEY_READ | HA_NULL_IN_KEY | HA_CAN_INDEX_BLOBS | HA_PRIMARY_KEY_IN_READ_INDEX | int_table_flags(HA_REC_NOT_IN_SEQ | HA_FAST_KEY_READ | HA_NULL_IN_KEY | HA_CAN_INDEX_BLOBS | HA_PRIMARY_KEY_IN_READ_INDEX |
HA_FILE_BASED | HA_CAN_GEOMETRY | HA_AUTO_PART_KEY | HA_TABLE_SCAN_ON_INDEX), HA_FILE_BASED | HA_CAN_GEOMETRY | HA_AUTO_PART_KEY | HA_TABLE_SCAN_ON_INDEX),
...@@ -940,6 +953,60 @@ static bool tokudb_key_cmp(TABLE * table, KEY * key_info, const uchar * key, uin ...@@ -940,6 +953,60 @@ static bool tokudb_key_cmp(TABLE * table, KEY * key_info, const uchar * key, uin
} }
#endif #endif
//
// Open a secondary table, the key will be a secondary index, the data will be a primary key
//
int ha_tokudb::open_secondary_table(DB** ptr, KEY* key_info, const char* name, int mode, u_int32_t* key_type) {
int error = ENOSYS;
char part[MAX_ALIAS_NAME + 10];
char name_buff[FN_REFLEN];
uint open_flags = (mode == O_RDONLY ? DB_RDONLY : 0) | DB_THREAD;
char newname[strlen(name) + 32];
DBT cmp_byte_stream;
if (tokudb_init_flags & DB_INIT_TXN)
open_flags += DB_AUTO_COMMIT;
if ((error = db_create(ptr, db_env, 0))) {
my_errno = error;
goto cleanup;
}
sprintf(part, "key-%s", key_info->name);
make_name(newname, name, part);
fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME);
*key_type = key_info->flags & HA_NOSAME ? DB_NOOVERWRITE : DB_YESOVERWRITE;
(*ptr)->app_private = (void *) (key_info);
if (tokudb_debug & TOKUDB_DEBUG_SAVE_TRACE) {
bzero((void *) &cmp_byte_stream, sizeof(cmp_byte_stream));
cmp_byte_stream.flags = DB_DBT_MALLOC;
if ((error = tokutrace_db_get_cmp_byte_stream(*ptr, &cmp_byte_stream))) {
my_errno = error;
goto cleanup;
}
(*ptr)->set_bt_compare(*ptr, tokudb_cmp_packed_key);
my_free(cmp_byte_stream.data, MYF(0));
}
else
(*ptr)->set_bt_compare(*ptr, tokudb_cmp_packed_key);
if (!(key_info->flags & HA_NOSAME)) {
DBUG_PRINT("info", ("Setting DB_DUP+DB_DUPSORT for key %s\n", key_info->name));
(*ptr)->set_flags(*ptr, DB_DUP + DB_DUPSORT);
(*ptr)->api_internal = share->file->app_private;
(*ptr)->set_dup_compare(*ptr, hidden_primary_key ? tokudb_cmp_hidden_key : tokudb_cmp_primary_key);
}
if ((error = (*ptr)->open(*ptr, 0, name_buff, NULL, DB_BTREE, open_flags, 0))) {
my_errno = error;
goto cleanup;
}
if (tokudb_debug & TOKUDB_DEBUG_OPEN) {
TOKUDB_TRACE("open:%s:file=%p\n", newname, *ptr);
}
cleanup:
return error;
}
// //
// Creates and opens a handle to a table which already exists in a tokudb // Creates and opens a handle to a table which already exists in a tokudb
// database. // database.
...@@ -994,13 +1061,10 @@ int ha_tokudb::open(const char *name, int mode, uint test_if_locked) { ...@@ -994,13 +1061,10 @@ int ha_tokudb::open(const char *name, int mode, uint test_if_locked) {
TOKUDB_DBUG_RETURN(1); TOKUDB_DBUG_RETURN(1);
} }
thr_lock_data_init(&share->lock, &lock, NULL); thr_lock_data_init(&share->lock, &lock, NULL);
key_file = share->key_file;
key_type = share->key_type;
bzero((void *) &current_row, sizeof(current_row)); bzero((void *) &current_row, sizeof(current_row));
/* Fill in shared structure, if needed */ /* Fill in shared structure, if needed */
pthread_mutex_lock(&share->mutex); pthread_mutex_lock(&share->mutex);
file = share->file;
if (tokudb_debug & TOKUDB_DEBUG_OPEN) if (tokudb_debug & TOKUDB_DEBUG_OPEN)
TOKUDB_TRACE("tokudbopen:%p:share=%p:file=%p:table=%p:table->s=%p:%d\n", TOKUDB_TRACE("tokudbopen:%p:share=%p:file=%p:table=%p:table->s=%p:%d\n",
this, share, share->file, table, table->s, share->use_count); this, share, share->file, table, table->s, share->use_count);
...@@ -1008,37 +1072,36 @@ int ha_tokudb::open(const char *name, int mode, uint test_if_locked) { ...@@ -1008,37 +1072,36 @@ int ha_tokudb::open(const char *name, int mode, uint test_if_locked) {
DBUG_PRINT("info", ("share->use_count %u", share->use_count)); DBUG_PRINT("info", ("share->use_count %u", share->use_count));
DBT cmp_byte_stream; DBT cmp_byte_stream;
if ((error = db_create(&file, db_env, 0))) { if ((error = db_create(&share->file, db_env, 0))) {
free_share(share, table, hidden_primary_key, 1); free_share(share, table, hidden_primary_key, 1);
my_free((char *) rec_buff, MYF(0)); my_free((char *) rec_buff, MYF(0));
my_free(alloc_ptr, MYF(0)); my_free(alloc_ptr, MYF(0));
my_errno = error; my_errno = error;
TOKUDB_DBUG_RETURN(1); TOKUDB_DBUG_RETURN(1);
} }
share->file = file;
if (!hidden_primary_key) if (!hidden_primary_key)
file->app_private = (void *) (table_share->key_info + table_share->primary_key); share->file->app_private = (void *) (table_share->key_info + table_share->primary_key);
if (tokudb_debug & TOKUDB_DEBUG_SAVE_TRACE) { if (tokudb_debug & TOKUDB_DEBUG_SAVE_TRACE) {
bzero((void *) &cmp_byte_stream, sizeof(cmp_byte_stream)); bzero((void *) &cmp_byte_stream, sizeof(cmp_byte_stream));
cmp_byte_stream.flags = DB_DBT_MALLOC; cmp_byte_stream.flags = DB_DBT_MALLOC;
if ((error = tokutrace_db_get_cmp_byte_stream(file, &cmp_byte_stream))) { if ((error = tokutrace_db_get_cmp_byte_stream(share->file, &cmp_byte_stream))) {
free_share(share, table, hidden_primary_key, 1); free_share(share, table, hidden_primary_key, 1);
my_free((char *) rec_buff, MYF(0)); my_free((char *) rec_buff, MYF(0));
my_free(alloc_ptr, MYF(0)); my_free(alloc_ptr, MYF(0));
my_errno = error; my_errno = error;
TOKUDB_DBUG_RETURN(1); TOKUDB_DBUG_RETURN(1);
} }
file->set_bt_compare(file, (hidden_primary_key ? tokudb_cmp_hidden_key : tokudb_cmp_packed_key)); share->file->set_bt_compare(share->file, (hidden_primary_key ? tokudb_cmp_hidden_key : tokudb_cmp_packed_key));
my_free(cmp_byte_stream.data, MYF(0)); my_free(cmp_byte_stream.data, MYF(0));
} }
else else
file->set_bt_compare(file, (hidden_primary_key ? tokudb_cmp_hidden_key : tokudb_cmp_packed_key)); share->file->set_bt_compare(share->file, (hidden_primary_key ? tokudb_cmp_hidden_key : tokudb_cmp_packed_key));
char newname[strlen(name) + 32]; char newname[strlen(name) + 32];
make_name(newname, name, "main"); make_name(newname, name, "main");
fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME); fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME);
if ((error = file->open(file, 0, name_buff, NULL, DB_BTREE, open_flags, 0))) { if ((error = share->file->open(share->file, 0, name_buff, NULL, DB_BTREE, open_flags, 0))) {
free_share(share, table, hidden_primary_key, 1); free_share(share, table, hidden_primary_key, 1);
my_free((char *) rec_buff, MYF(0)); my_free((char *) rec_buff, MYF(0));
my_free(alloc_ptr, MYF(0)); my_free(alloc_ptr, MYF(0));
...@@ -1046,53 +1109,19 @@ int ha_tokudb::open(const char *name, int mode, uint test_if_locked) { ...@@ -1046,53 +1109,19 @@ int ha_tokudb::open(const char *name, int mode, uint test_if_locked) {
TOKUDB_DBUG_RETURN(1); TOKUDB_DBUG_RETURN(1);
} }
if (tokudb_debug & TOKUDB_DEBUG_OPEN) if (tokudb_debug & TOKUDB_DEBUG_OPEN)
TOKUDB_TRACE("open:%s:file=%p\n", newname, file); TOKUDB_TRACE("open:%s:file=%p\n", newname, share->file);
/* Open other keys; These are part of the share structure */ /* Open other keys; These are part of the share structure */
key_file[primary_key] = file; share->key_file[primary_key] = share->file;
key_type[primary_key] = hidden_primary_key ? 0 : DB_NOOVERWRITE; share->key_type[primary_key] = hidden_primary_key ? 0 : DB_NOOVERWRITE;
DB **ptr = key_file; DB **ptr = share->key_file;
for (uint i = 0, used_keys = 0; i < table_share->keys; i++, ptr++) { for (uint i = 0; i < table_share->keys; i++, ptr++) {
char part[16];
if (i != primary_key) { if (i != primary_key) {
if ((error = db_create(ptr, db_env, 0))) { if ((error = open_secondary_table(ptr,&table_share->key_info[i],name,mode,&share->key_type[i]))) {
__close(1);
my_errno = error;
TOKUDB_DBUG_RETURN(1);
}
sprintf(part, "key%d", ++used_keys);
make_name(newname, name, part);
fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME);
key_type[i] = table->key_info[i].flags & HA_NOSAME ? DB_NOOVERWRITE : DB_YESOVERWRITE;
(*ptr)->app_private = (void *) (table_share->key_info + i);
if (tokudb_debug & TOKUDB_DEBUG_SAVE_TRACE) {
bzero((void *) &cmp_byte_stream, sizeof(cmp_byte_stream));
cmp_byte_stream.flags = DB_DBT_MALLOC;
if ((error = tokutrace_db_get_cmp_byte_stream(*ptr, &cmp_byte_stream))) {
__close(1);
my_errno = error;
TOKUDB_DBUG_RETURN(1);
}
(*ptr)->set_bt_compare(*ptr, tokudb_cmp_packed_key);
my_free(cmp_byte_stream.data, MYF(0));
}
else
(*ptr)->set_bt_compare(*ptr, tokudb_cmp_packed_key);
if (!(table->key_info[i].flags & HA_NOSAME)) {
DBUG_PRINT("info", ("Setting DB_DUP+DB_DUPSORT for key %u", i));
(*ptr)->set_flags(*ptr, DB_DUP + DB_DUPSORT);
(*ptr)->api_internal = file->app_private;
(*ptr)->set_dup_compare(*ptr, hidden_primary_key ? tokudb_cmp_hidden_key : tokudb_cmp_primary_key);
}
if ((error = (*ptr)->open(*ptr, 0, name_buff, NULL, DB_BTREE, open_flags, 0))) {
__close(1); __close(1);
my_errno = error;
TOKUDB_DBUG_RETURN(1); TOKUDB_DBUG_RETURN(1);
} }
if (tokudb_debug & TOKUDB_DEBUG_OPEN)
TOKUDB_TRACE("open:%s:file=%p\n", newname, *ptr);
} }
} }
/* Calculate pack_length of primary key */ /* Calculate pack_length of primary key */
...@@ -1275,12 +1304,13 @@ void ha_tokudb::unpack_key(uchar * record, DBT * key, uint index) { ...@@ -1275,12 +1304,13 @@ void ha_tokudb::unpack_key(uchar * record, DBT * key, uint index) {
} }
} }
// //
// Create a packed key from a row. This key will be written as such // Create a packed key from a row. This key will be written as such
// to the index tree. This will never fail as the key buffer is pre-allocated. // to the index tree. This will never fail as the key buffer is pre-allocated.
// Parameters: // Parameters:
// [out] key - DBT that holds the key // [out] key - DBT that holds the key
// keynr - index for which to create the key // [in] key_info - holds data about the key, such as it's length and offset into record
// [out] buff - buffer that will hold the data for key (unless // [out] buff - buffer that will hold the data for key (unless
// we have a hidden primary key) // we have a hidden primary key)
// [in] record - row from which to create the key // [in] record - row from which to create the key
...@@ -1289,23 +1319,19 @@ void ha_tokudb::unpack_key(uchar * record, DBT * key, uint index) { ...@@ -1289,23 +1319,19 @@ void ha_tokudb::unpack_key(uchar * record, DBT * key, uint index) {
// the parameter key // the parameter key
// //
DBT *ha_tokudb::create_key(DBT * key, uint keynr, uchar * buff, const uchar * record, int key_length) { DBT* ha_tokudb::create_dbt_key_from_key(DBT * key, KEY* key_info, uchar * buff, const uchar * record, int key_length) {
TOKUDB_DBUG_ENTER("ha_tokudb::create_key");
bzero((void *) key, sizeof(*key));
if (hidden_primary_key && keynr == primary_key) {
key->data = current_ident;
key->size = TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH;
DBUG_RETURN(key);
}
KEY *key_info = table->key_info + keynr;
KEY_PART_INFO *key_part = key_info->key_part; KEY_PART_INFO *key_part = key_info->key_part;
KEY_PART_INFO *end = key_part + key_info->key_parts; KEY_PART_INFO *end = key_part + key_info->key_parts;
my_bitmap_map *old_map = dbug_tmp_use_all_columns(table, table->write_set); my_bitmap_map *old_map = dbug_tmp_use_all_columns(table, table->write_set);
key->data = buff; key->data = buff;
for (; key_part != end && key_length > 0; key_part++) { for (; key_part != end && key_length > 0; key_part++) {
if (key_part->null_bit) { //
// accessing key_part->field->null_bit instead off key_part->null_bit
// because key_part->null_bit is not set in add_index
// filed ticket 862 to look into this
//
if (key_part->field->null_bit) {
/* Store 0 if the key part is a NULL part */ /* Store 0 if the key part is a NULL part */
if (record[key_part->null_offset] & key_part->null_bit) { if (record[key_part->null_offset] & key_part->null_bit) {
*buff++ = 0; *buff++ = 0;
...@@ -1318,7 +1344,12 @@ DBT *ha_tokudb::create_key(DBT * key, uint keynr, uchar * buff, const uchar * re ...@@ -1318,7 +1344,12 @@ DBT *ha_tokudb::create_key(DBT * key, uint keynr, uchar * buff, const uchar * re
} }
*buff++ = 1; // Store NOT NULL marker *buff++ = 1; // Store NOT NULL marker
} }
buff = key_part->field->pack_key(buff, (uchar *) (record + key_part->offset), //
// accessing field_offset(key_part->field) instead off key_part->offset
// because key_part->offset is SET INCORRECTLY in add_index
// filed ticket 862 to look into this
//
buff = key_part->field->pack_key(buff, (uchar *) (record + field_offset(key_part->field)),
#if MYSQL_VERSION_ID < 50123 #if MYSQL_VERSION_ID < 50123
key_part->length); key_part->length);
#else #else
...@@ -1329,7 +1360,32 @@ DBT *ha_tokudb::create_key(DBT * key, uint keynr, uchar * buff, const uchar * re ...@@ -1329,7 +1360,32 @@ DBT *ha_tokudb::create_key(DBT * key, uint keynr, uchar * buff, const uchar * re
key->size = (buff - (uchar *) key->data); key->size = (buff - (uchar *) key->data);
DBUG_DUMP("key", (uchar *) key->data, key->size); DBUG_DUMP("key", (uchar *) key->data, key->size);
dbug_tmp_restore_column_map(table->write_set, old_map); dbug_tmp_restore_column_map(table->write_set, old_map);
DBUG_RETURN(key); return key;
}
//
// Create a packed key from a row. This key will be written as such
// to the index tree. This will never fail as the key buffer is pre-allocated.
// Parameters:
// [out] key - DBT that holds the key
// keynr - index for which to create the key
// [out] buff - buffer that will hold the data for key (unless
// we have a hidden primary key)
// [in] record - row from which to create the key
// key_length - currently set to MAX_KEY_LENGTH, is it size of buff?
// Returns:
// the parameter key
//
DBT *ha_tokudb::create_dbt_key_from_table(DBT * key, uint keynr, uchar * buff, const uchar * record, int key_length) {
TOKUDB_DBUG_ENTER("ha_tokudb::create_dbt_key_from_table");
bzero((void *) key, sizeof(*key));
if (hidden_primary_key && keynr == primary_key) {
key->data = current_ident;
key->size = TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH;
DBUG_RETURN(key);
}
DBUG_RETURN(create_dbt_key_from_key(key, &table->key_info[keynr],buff,record,key_length));
} }
...@@ -1613,36 +1669,38 @@ int ha_tokudb::write_row(uchar * record) { ...@@ -1613,36 +1669,38 @@ int ha_tokudb::write_row(uchar * record) {
get_auto_primary_key(current_ident); get_auto_primary_key(current_ident);
} }
u_int32_t put_flags = key_type[primary_key]; u_int32_t put_flags = share->key_type[primary_key];
THD *thd = ha_thd(); THD *thd = ha_thd();
if (thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS)) { if (thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS)) {
put_flags = DB_YESOVERWRITE; put_flags = DB_YESOVERWRITE;
} }
if (table_share->keys + test(hidden_primary_key) == 1) { if (table_share->keys + test(hidden_primary_key) == 1) {
error = file->put(file, transaction, create_key(&prim_key, primary_key, key_buff, record), &row, put_flags); error = share->file->put(share->file, transaction, create_dbt_key_from_table(&prim_key, primary_key, key_buff, record), &row, put_flags);
last_dup_key = primary_key; last_dup_key = primary_key;
} else { } else {
DB_TXN *sub_trans = transaction; DB_TXN *sub_trans = transaction;
/* QQQ Don't use sub transactions in temporary tables */ /* QQQ Don't use sub transactions in temporary tables */
for (uint retry = 0; retry < tokudb_trans_retry; retry++) { for (uint retry = 0; retry < tokudb_trans_retry; retry++) {
key_map changed_keys(0); key_map changed_keys(0);
if (!(error = file->put(file, sub_trans, create_key(&prim_key, primary_key, key_buff, record), &row, put_flags))) { if (!(error = share->file->put(share->file, sub_trans, create_dbt_key_from_table(&prim_key, primary_key, key_buff, record), &row, put_flags))) {
changed_keys.set_bit(primary_key); changed_keys.set_bit(primary_key);
for (uint keynr = 0; keynr < table_share->keys; keynr++) { for (uint keynr = 0; keynr < table_share->keys; keynr++) {
if (keynr == primary_key) if (keynr == primary_key)
continue; continue;
put_flags = key_type[keynr]; put_flags = share->key_type[keynr];
if (put_flags == DB_NOOVERWRITE && thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS)) if (put_flags == DB_NOOVERWRITE && thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS))
put_flags = DB_YESOVERWRITE; put_flags = DB_YESOVERWRITE;
if ((error = key_file[keynr]->put(key_file[keynr], sub_trans, create_key(&key, keynr, key_buff2, record), &prim_key, put_flags))) { if ((error = share->key_file[keynr]->put(share->key_file[keynr], sub_trans, create_dbt_key_from_table(&key, keynr, key_buff2, record), &prim_key, put_flags))) {
last_dup_key = keynr; last_dup_key = keynr;
break; break;
} }
changed_keys.set_bit(keynr); changed_keys.set_bit(keynr);
} }
} else }
else {
last_dup_key = primary_key; last_dup_key = primary_key;
}
if (error) { if (error) {
/* Remove inserted row */ /* Remove inserted row */
DBUG_PRINT("error", ("Got error %d", error)); DBUG_PRINT("error", ("Got error %d", error));
...@@ -1711,12 +1769,12 @@ int ha_tokudb::update_primary_key(DB_TXN * trans, bool primary_key_changed, cons ...@@ -1711,12 +1769,12 @@ int ha_tokudb::update_primary_key(DB_TXN * trans, bool primary_key_changed, cons
// Delete the old row and add a new one // Delete the old row and add a new one
if (!(error = remove_key(trans, primary_key, old_row, old_key))) { if (!(error = remove_key(trans, primary_key, old_row, old_key))) {
if (!(error = pack_row(&row, new_row))) { if (!(error = pack_row(&row, new_row))) {
if ((error = file->put(file, trans, new_key, &row, key_type[primary_key]))) { if ((error = share->file->put(share->file, trans, new_key, &row, share->key_type[primary_key]))) {
// Probably a duplicated key; restore old key and row if needed // Probably a duplicated key; restore old key and row if needed
last_dup_key = primary_key; last_dup_key = primary_key;
if (local_using_ignore) { if (local_using_ignore) {
int new_error; int new_error;
if ((new_error = pack_row(&row, old_row)) || (new_error = file->put(file, trans, old_key, &row, key_type[primary_key]))) if ((new_error = pack_row(&row, old_row)) || (new_error = share->file->put(share->file, trans, old_key, &row, share->key_type[primary_key])))
error = new_error; // fatal error error = new_error; // fatal error
} }
} }
...@@ -1725,7 +1783,7 @@ int ha_tokudb::update_primary_key(DB_TXN * trans, bool primary_key_changed, cons ...@@ -1725,7 +1783,7 @@ int ha_tokudb::update_primary_key(DB_TXN * trans, bool primary_key_changed, cons
} else { } else {
// Primary key didn't change; just update the row data // Primary key didn't change; just update the row data
if (!(error = pack_row(&row, new_row))) if (!(error = pack_row(&row, new_row)))
error = file->put(file, trans, new_key, &row, 0); error = share->file->put(share->file, trans, new_key, &row, 0);
} }
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(error);
} }
...@@ -1756,7 +1814,7 @@ int ha_tokudb::restore_keys(DB_TXN * trans, key_map * changed_keys, uint primary ...@@ -1756,7 +1814,7 @@ int ha_tokudb::restore_keys(DB_TXN * trans, key_map * changed_keys, uint primary
if (changed_keys->is_set(keynr)) { if (changed_keys->is_set(keynr)) {
if (changed_keys->is_prefix(1) && (error = remove_key(trans, keynr, new_row, new_key))) if (changed_keys->is_prefix(1) && (error = remove_key(trans, keynr, new_row, new_key)))
break; break;
if ((error = key_file[keynr]->put(key_file[keynr], trans, create_key(&tmp_key, keynr, key_buff2, old_row), old_key, key_type[keynr]))) if ((error = share->key_file[keynr]->put(share->key_file[keynr], trans, create_dbt_key_from_table(&tmp_key, keynr, key_buff2, old_row), old_key, share->key_type[keynr])))
break; break;
} }
} }
...@@ -1795,10 +1853,10 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) { ...@@ -1795,10 +1853,10 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
prim_key.size = TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH; prim_key.size = TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH;
old_prim_key = prim_key; old_prim_key = prim_key;
} else { } else {
create_key(&prim_key, primary_key, key_buff, new_row); create_dbt_key_from_table(&prim_key, primary_key, key_buff, new_row);
if ((primary_key_changed = key_cmp(primary_key, old_row, new_row))) if ((primary_key_changed = key_cmp(primary_key, old_row, new_row)))
create_key(&old_prim_key, primary_key, primary_key_buff, old_row); create_dbt_key_from_table(&old_prim_key, primary_key, primary_key_buff, old_row);
else else
old_prim_key = prim_key; old_prim_key = prim_key;
} }
...@@ -1817,7 +1875,7 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) { ...@@ -1817,7 +1875,7 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
TOKUDB_DBUG_RETURN(error); // Fatal error TOKUDB_DBUG_RETURN(error); // Fatal error
} }
changed_keys.set_bit(keynr); changed_keys.set_bit(keynr);
if ((error = key_file[keynr]->put(key_file[keynr], sub_trans, create_key(&key, keynr, key_buff2, new_row), &prim_key, key_type[keynr]))) { if ((error = share->key_file[keynr]->put(share->key_file[keynr], sub_trans, create_dbt_key_from_table(&key, keynr, key_buff2, new_row), &prim_key, share->key_type[keynr]))) {
last_dup_key = keynr; last_dup_key = keynr;
break; break;
} }
...@@ -1873,7 +1931,7 @@ int ha_tokudb::remove_key(DB_TXN * trans, uint keynr, const uchar * record, DBT ...@@ -1873,7 +1931,7 @@ int ha_tokudb::remove_key(DB_TXN * trans, uint keynr, const uchar * record, DBT
else if (keynr == primary_key || ((table->key_info[keynr].flags & (HA_NOSAME | HA_NULL_PART_KEY)) == HA_NOSAME)) { // Unique key else if (keynr == primary_key || ((table->key_info[keynr].flags & (HA_NOSAME | HA_NULL_PART_KEY)) == HA_NOSAME)) { // Unique key
DBUG_PRINT("Unique key", ("index: %d", keynr)); DBUG_PRINT("Unique key", ("index: %d", keynr));
DBUG_ASSERT(keynr == primary_key || prim_key->data != key_buff2); DBUG_ASSERT(keynr == primary_key || prim_key->data != key_buff2);
error = key_file[keynr]->del(key_file[keynr], trans, keynr == primary_key ? prim_key : create_key(&key, keynr, key_buff2, record), 0); error = share->key_file[keynr]->del(share->key_file[keynr], trans, keynr == primary_key ? prim_key : create_dbt_key_from_table(&key, keynr, key_buff2, record), 0);
} else { } else {
/* QQQ use toku_db_delboth(key_file[keynr], key, val, trans); /* QQQ use toku_db_delboth(key_file[keynr], key, val, trans);
To delete the not duplicated key, we need to open an cursor on the To delete the not duplicated key, we need to open an cursor on the
...@@ -1882,8 +1940,8 @@ int ha_tokudb::remove_key(DB_TXN * trans, uint keynr, const uchar * record, DBT ...@@ -1882,8 +1940,8 @@ int ha_tokudb::remove_key(DB_TXN * trans, uint keynr, const uchar * record, DBT
*/ */
DBUG_ASSERT(keynr != primary_key && prim_key->data != key_buff2); DBUG_ASSERT(keynr != primary_key && prim_key->data != key_buff2);
DBC *tmp_cursor; DBC *tmp_cursor;
if (!(error = key_file[keynr]->cursor(key_file[keynr], trans, &tmp_cursor, 0))) { if (!(error = share->key_file[keynr]->cursor(share->key_file[keynr], trans, &tmp_cursor, 0))) {
if (!(error = tmp_cursor->c_get(tmp_cursor, create_key(&key, keynr, key_buff2, record), prim_key, DB_GET_BOTH))) { if (!(error = tmp_cursor->c_get(tmp_cursor, create_dbt_key_from_table(&key, keynr, key_buff2, record), prim_key, DB_GET_BOTH))) {
DBUG_DUMP("cget key", (uchar *) key.data, key.size); DBUG_DUMP("cget key", (uchar *) key.data, key.size);
error = tmp_cursor->c_del(tmp_cursor, 0); error = tmp_cursor->c_del(tmp_cursor, 0);
} }
...@@ -1936,7 +1994,7 @@ int ha_tokudb::delete_row(const uchar * record) { ...@@ -1936,7 +1994,7 @@ int ha_tokudb::delete_row(const uchar * record) {
key_map keys = table_share->keys_in_use; key_map keys = table_share->keys_in_use;
statistic_increment(table->in_use->status_var.ha_delete_count, &LOCK_status); statistic_increment(table->in_use->status_var.ha_delete_count, &LOCK_status);
create_key(&prim_key, primary_key, key_buff, record); create_dbt_key_from_table(&prim_key, primary_key, key_buff, record);
if (hidden_primary_key) if (hidden_primary_key)
keys.set_bit(primary_key); keys.set_bit(primary_key);
...@@ -1983,8 +2041,8 @@ int ha_tokudb::index_init(uint keynr, bool sorted) { ...@@ -1983,8 +2041,8 @@ int ha_tokudb::index_init(uint keynr, bool sorted) {
} }
active_index = keynr; active_index = keynr;
DBUG_ASSERT(keynr <= table->s->keys); DBUG_ASSERT(keynr <= table->s->keys);
DBUG_ASSERT(key_file[keynr]); DBUG_ASSERT(share->key_file[keynr]);
if ((error = key_file[keynr]->cursor(key_file[keynr], transaction, &cursor, table->reginfo.lock_type > TL_WRITE_ALLOW_READ ? 0 : 0))) if ((error = share->key_file[keynr]->cursor(share->key_file[keynr], transaction, &cursor, table->reginfo.lock_type > TL_WRITE_ALLOW_READ ? 0 : 0)))
cursor = NULL; // Safety cursor = NULL; // Safety
bzero((void *) &last_key, sizeof(last_key)); bzero((void *) &last_key, sizeof(last_key));
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(error);
...@@ -2074,7 +2132,7 @@ int ha_tokudb::read_row(int error, uchar * buf, uint keynr, DBT * row, DBT * fou ...@@ -2074,7 +2132,7 @@ int ha_tokudb::read_row(int error, uchar * buf, uint keynr, DBT * row, DBT * fou
// Read the data into current_row // Read the data into current_row
// //
current_row.flags = DB_DBT_REALLOC; current_row.flags = DB_DBT_REALLOC;
if ((error = file->get(file, transaction, &key, &current_row, 0))) { if ((error = share->file->get(share->file, transaction, &key, &current_row, 0))) {
table->status = STATUS_NOT_FOUND; table->status = STATUS_NOT_FOUND;
TOKUDB_DBUG_RETURN(error == DB_NOTFOUND ? HA_ERR_CRASHED : error); TOKUDB_DBUG_RETURN(error == DB_NOTFOUND ? HA_ERR_CRASHED : error);
} }
...@@ -2109,7 +2167,7 @@ int ha_tokudb::index_read_idx(uchar * buf, uint keynr, const uchar * key, uint k ...@@ -2109,7 +2167,7 @@ int ha_tokudb::index_read_idx(uchar * buf, uint keynr, const uchar * key, uint k
table->in_use->status_var.ha_read_key_count++; table->in_use->status_var.ha_read_key_count++;
current_row.flags = DB_DBT_REALLOC; current_row.flags = DB_DBT_REALLOC;
active_index = MAX_KEY; active_index = MAX_KEY;
TOKUDB_DBUG_RETURN(read_row(key_file[keynr]->get(key_file[keynr], transaction, pack_key(&last_key, keynr, key_buff, key, key_len), &current_row, 0), buf, keynr, &current_row, &last_key, 0)); TOKUDB_DBUG_RETURN(read_row(share->key_file[keynr]->get(share->key_file[keynr], transaction, pack_key(&last_key, keynr, key_buff, key, key_len), &current_row, 0), buf, keynr, &current_row, &last_key, 0));
} }
//TODO: QQQ Function to tell if a key+keylen is the entire key (loop through the schema), see comparison function for ideas. //TODO: QQQ Function to tell if a key+keylen is the entire key (loop through the schema), see comparison function for ideas.
...@@ -2485,7 +2543,7 @@ int ha_tokudb::rnd_pos(uchar * buf, uchar * pos) { ...@@ -2485,7 +2543,7 @@ int ha_tokudb::rnd_pos(uchar * buf, uchar * pos) {
statistic_increment(table->in_use->status_var.ha_read_rnd_count, &LOCK_status); statistic_increment(table->in_use->status_var.ha_read_rnd_count, &LOCK_status);
active_index = MAX_KEY; active_index = MAX_KEY;
DBT* key = get_pos(&db_pos, pos); DBT* key = get_pos(&db_pos, pos);
TOKUDB_DBUG_RETURN(read_row(file->get(file, transaction, key, &current_row, 0), buf, primary_key, &current_row, key, 0)); TOKUDB_DBUG_RETURN(read_row(share->file->get(share->file, transaction, key, &current_row, 0), buf, primary_key, &current_row, key, 0));
} }
/* /*
...@@ -2516,7 +2574,7 @@ void ha_tokudb::position(const uchar * record) { ...@@ -2516,7 +2574,7 @@ void ha_tokudb::position(const uchar * record) {
DBUG_ASSERT(ref_length == TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH); DBUG_ASSERT(ref_length == TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH);
memcpy_fixed(ref, (char *) current_ident, TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH); memcpy_fixed(ref, (char *) current_ident, TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH);
} else { } else {
create_key(&key, primary_key, ref, record); create_dbt_key_from_table(&key, primary_key, ref, record);
if (key.size < ref_length) if (key.size < ref_length)
bzero(ref + key.size, ref_length - key.size); bzero(ref + key.size, ref_length - key.size);
} }
...@@ -2836,6 +2894,10 @@ int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_in ...@@ -2836,6 +2894,10 @@ int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_in
char newname[get_name_length(name) + 32]; char newname[get_name_length(name) + 32];
uint i; uint i;
//
// tracing information about what type of table we are creating
//
if (tokudb_debug & TOKUDB_DEBUG_OPEN) { if (tokudb_debug & TOKUDB_DEBUG_OPEN) {
for (i = 0; i < form->s->fields; i++) { for (i = 0; i < form->s->fields; i++) {
Field *field = form->s->field[i]; Field *field = form->s->field[i];
...@@ -2854,8 +2916,11 @@ int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_in ...@@ -2854,8 +2916,11 @@ int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_in
} }
} }
//
// check if auto increment is properly defined
// tokudb only supports auto increment on the first field in the primary key // tokudb only supports auto increment on the first field in the primary key
// or the first field in the row // or the first field in the row
//
int pk_found = 0; int pk_found = 0;
int ai_found = 0; int ai_found = 0;
for (i = 0; i < form->s->keys; i++) { for (i = 0; i < form->s->keys; i++) {
...@@ -2902,11 +2967,10 @@ int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_in ...@@ -2902,11 +2967,10 @@ int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_in
primary_key = form->s->primary_key; primary_key = form->s->primary_key;
/* Create the keys */ /* Create the keys */
char part[16]; char part[MAX_ALIAS_NAME + 10];
uint index = 1;
for (uint i = 0; i < form->s->keys; i++) { for (uint i = 0; i < form->s->keys; i++) {
if (i != primary_key) { if (i != primary_key) {
sprintf(part, "key%d", index++); sprintf(part, "key-%s", form->s->key_info[i].name);
make_name(newname, name, part); make_name(newname, name, part);
fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME); fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME);
error = create_sub_table(name_buff, NULL, DB_BTREE, (form->key_info[i].flags & HA_NOSAME) ? 0 : DB_DUP + DB_DUPSORT); error = create_sub_table(name_buff, NULL, DB_BTREE, (form->key_info[i].flags & HA_NOSAME) ? 0 : DB_DUP + DB_DUPSORT);
...@@ -3055,7 +3119,7 @@ ha_rows ha_tokudb::records_in_range(uint keynr, key_range* start_key, key_range* ...@@ -3055,7 +3119,7 @@ ha_rows ha_tokudb::records_in_range(uint keynr, key_range* start_key, key_range*
TOKUDB_DBUG_ENTER("ha_tokudb::records_in_range"); TOKUDB_DBUG_ENTER("ha_tokudb::records_in_range");
DBT key; DBT key;
ha_rows ret_val = HA_TOKUDB_RANGE_COUNT; ha_rows ret_val = HA_TOKUDB_RANGE_COUNT;
DB *kfile = key_file[keynr]; DB *kfile = share->key_file[keynr];
u_int64_t less, equal, greater; u_int64_t less, equal, greater;
u_int64_t start_rows, end_rows, rows; u_int64_t start_rows, end_rows, rows;
int is_exact; int is_exact;
...@@ -3151,6 +3215,220 @@ void ha_tokudb::get_auto_increment(ulonglong offset, ulonglong increment, ulongl ...@@ -3151,6 +3215,220 @@ void ha_tokudb::get_auto_increment(ulonglong offset, ulonglong increment, ulongl
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
//
// Adds indexes to the table. Takes the array of KEY passed in key_info, and creates
// DB's that will go at the end of share->key_file. THE IMPLICIT ASSUMPTION HERE is
// that the table will be modified and that these added keys will be appended to the end
// of the array table->key_info
// Parameters:
// [in] table_arg - table that is being modified, seems to be identical to this->table
// [in] key_info - array of KEY's to be added
// num_of_keys - number of keys to be added, number of elements in key_info
// Returns:
// 0 on success, error otherwise
//
int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
TOKUDB_DBUG_ENTER("ha_tokudb::add_index");
char name_buff[FN_REFLEN];
int error;
char newname[share->table_name_length + 32];
uint curr_index = 0;
DBC* tmp_cursor = NULL;
int cursor_ret_val = 0;
DBT current_primary_key;
DBT row;
DB_TXN* txn = NULL;
//
// these variables are for error handling
//
uint num_files_created = 0;
uint num_DB_opened = 0;
//
// in unpack_row, MySQL passes a buffer that is this long,
// so this length should be good enough for us as well
//
uchar tmp_record[table_arg->s->rec_buff_length];
bzero((void *) &current_primary_key, sizeof(current_primary_key));
bzero((void *) &row, sizeof(row));
//
// first create all the DB's files
//
char part[MAX_ALIAS_NAME + 10];
for (uint i = 0; i < num_of_keys; i++) {
sprintf(part, "key-%s", key_info[i].name);
make_name(newname, share->table_name, part);
fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME);
error = create_sub_table(name_buff, NULL, DB_BTREE, (key_info[i].flags & HA_NOSAME) ? 0 : DB_DUP + DB_DUPSORT);
if (tokudb_debug & TOKUDB_DEBUG_OPEN) {
TOKUDB_TRACE("create:%s:flags=%ld:error=%d\n", newname, key_info[i].flags, error);
}
if (error) { goto cleanup; }
num_files_created++;
}
//
// open all the DB files and set the appropriate variables in share
// they go to the end of share->key_file
//
curr_index = table_arg->s->keys;
for (uint i = 0; i < num_of_keys; i++, curr_index++) {
error = open_secondary_table(
&share->key_file[curr_index],
&key_info[i],
share->table_name,
0,
&share->key_type[curr_index]
);
if (error) { goto cleanup; }
num_DB_opened++;
}
//
// scan primary table, create each secondary key, add to each DB
//
error = db_env->txn_begin(db_env, 0, &txn, 0);
assert(error == 0);
if ((error = share->file->cursor(share->file, txn, &tmp_cursor, 0))) {
tmp_cursor = NULL; // Safety
goto cleanup;
}
//
// for each element in the primary table, insert the proper key value pair in each secondary table
// that is created
//
cursor_ret_val = tmp_cursor->c_get(tmp_cursor, &current_primary_key, &row, DB_NEXT);
while (cursor_ret_val != DB_NOTFOUND) {
if (cursor_ret_val) {
error = cursor_ret_val;
goto cleanup;
}
unpack_row(tmp_record, &row);
for (uint i = 0; i < num_of_keys; i++) {
DBT secondary_key;
create_dbt_key_from_key(&secondary_key,&key_info[i], key_buff, tmp_record);
uint curr_index = i + table_arg->s->keys;
u_int32_t put_flags = share->key_type[curr_index];
error = share->key_file[curr_index]->put(share->key_file[curr_index], NULL, &secondary_key, &current_primary_key, put_flags);
if (error) {
//
// in the case of any error anywhere, we can just nuke all the files created, so we dont need
// to be tricky and try to roll back changes. That is why we commit the transaction,
// which should be fast. The DB is going to go away anyway, so no pt in trying to keep
// it in a good state.
//
txn->commit(txn, 0);
goto cleanup;
}
}
cursor_ret_val = tmp_cursor->c_get(tmp_cursor, &current_primary_key, &row, DB_NEXT);
}
error = txn->commit(txn, 0);
assert(error == 0);
tmp_cursor->c_close(tmp_cursor);
error = 0;
cleanup:
if (error) {
//
// We need to delete all the files that may have been created
// The DB's must be closed and removed
//
for (uint i = table_arg->s->keys; i < table_arg->s->keys + num_DB_opened; i++) {
share->key_file[i]->close(share->key_file[i], 0);
share->key_file[i] = NULL;
}
for (uint i = 0; i < num_files_created; i++) {
DB* tmp;
sprintf(part, "key-%s", key_info[i].name);
make_name(newname, share->table_name, part);
fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME);
if (!(db_create(&tmp, db_env, 0))) {
tmp->remove(tmp, name_buff, NULL, 0);
}
}
}
TOKUDB_DBUG_RETURN(error);
}
//
// Prepares to drop indexes to the table. For each value, i, in the array key_num,
// table->key_info[i] is a key that is to be dropped.
// ***********NOTE*******************
// Although prepare_drop_index is supposed to just get the DB's ready for removal,
// and not actually do the removal, we are doing it here and not in final_drop_index
// For the flags we expose in alter_table_flags, namely xxx_NO_WRITES, this is allowed
// Changes for "future-proofing" this so that it works when we have the equivalent flags
// that are not NO_WRITES are not worth it at the moments
// Parameters:
// [in] table_arg - table that is being modified, seems to be identical to this->table
// [in] key_num - array of indexes that specify which keys of the array table->key_info
// are to be dropped
// num_of_keys - size of array, key_num
// Returns:
// 0 on success, error otherwise
//
int ha_tokudb::prepare_drop_index(TABLE *table_arg, uint *key_num, uint num_of_keys) {
TOKUDB_DBUG_ENTER("ha_tokudb::prepare_drop_index");
int error;
char name_buff[FN_REFLEN];
char newname[share->table_name_length + 32];
char part[MAX_ALIAS_NAME + 10];
DB** dbs_to_remove = NULL;
//
// we allocate an array of DB's here to get ready for removal
// We do this so that all potential memory allocation errors that may occur
// will do so BEFORE we go about dropping any indexes. This way, we
// can fail gracefully without losing integrity of data in such cases. If on
// on the other hand, we started removing DB's, and in the middle,
// one failed, it is not immedietely obvious how one would rollback
//
dbs_to_remove = (DB **)my_malloc(sizeof(*dbs_to_remove)*num_of_keys, MYF(MY_ZEROFILL));
if (dbs_to_remove == NULL) {
error = ENOMEM;
goto cleanup;
}
for (uint i = 0; i < num_of_keys; i++) {
error = db_create(&dbs_to_remove[i], db_env, 0);
if (error) {
goto cleanup;
}
}
for (uint i = 0; i < num_of_keys; i++) {
uint curr_index = key_num[i];
share->key_file[curr_index]->close(share->key_file[curr_index],0);
share->key_file[curr_index] = NULL;
sprintf(part, "key-%s", table_arg->key_info[curr_index].name);
make_name(newname, share->table_name, part);
fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME);
dbs_to_remove[i]->remove(dbs_to_remove[i], name_buff, NULL, 0);
}
cleanup:
my_free(dbs_to_remove, MYF(MY_ALLOW_ZERO_PTR));
TOKUDB_DBUG_RETURN(error);
}
// ***********NOTE*******************
// Although prepare_drop_index is supposed to just get the DB's ready for removal,
// and not actually do the removal, we are doing it here and not in final_drop_index
// For the flags we expose in alter_table_flags, namely xxx_NO_WRITES, this is allowed
// Changes for "future-proofing" this so that it works when we have the equivalent flags
// that are not NO_WRITES are not worth it at the moments, therefore, we can make
// this function just return
int ha_tokudb::final_drop_index(TABLE *table_arg) {
TOKUDB_DBUG_ENTER("ha_tokudb::final_drop_index");
TOKUDB_DBUG_RETURN(0);
}
void ha_tokudb::print_error(int error, myf errflag) { void ha_tokudb::print_error(int error, myf errflag) {
if (error == DB_LOCK_DEADLOCK) if (error == DB_LOCK_DEADLOCK)
error = HA_ERR_LOCK_DEADLOCK; error = HA_ERR_LOCK_DEADLOCK;
......
...@@ -14,7 +14,16 @@ typedef struct st_tokudb_share { ...@@ -14,7 +14,16 @@ typedef struct st_tokudb_share {
ulonglong last_auto_increment; ulonglong last_auto_increment;
ha_rows rows, org_rows; ha_rows rows, org_rows;
ulong *rec_per_key; ulong *rec_per_key;
DB *status_block, *file, **key_file; DB *status_block;
//
// DB that is indexed on the primary key
//
DB *file;
//
// array of all DB's that make up table, includes DB that
// is indexed on the primary key
//
DB *key_file[MAX_KEY];
u_int32_t *key_type; u_int32_t *key_type;
uint status, version; uint status, version;
uint ref_length; uint ref_length;
...@@ -71,25 +80,11 @@ class ha_tokudb : public handler { ...@@ -71,25 +80,11 @@ class ha_tokudb : public handler {
// //
uchar *primary_key_buff; uchar *primary_key_buff;
//
// DB that is indexed on the primary key
//
DB *file;
//
// array of all DB's that make up table, includes DB that
// is indexed on the primary key
//
DB **key_file;
// //
// transaction used by ha_tokudb's cursor // transaction used by ha_tokudb's cursor
// //
DB_TXN *transaction; DB_TXN *transaction;
//
// array that holds put_flags for each database. So, when doing a
// key_file[i]->put, key_type[i] gets passed in for flags
//
u_int32_t *key_type;
// //
// instance of cursor being used for init_xxx and rnd_xxx functions // instance of cursor being used for init_xxx and rnd_xxx functions
// //
...@@ -123,7 +118,8 @@ class ha_tokudb : public handler { ...@@ -123,7 +118,8 @@ class ha_tokudb : public handler {
int pack_row(DBT * row, const uchar * record); int pack_row(DBT * row, const uchar * record);
void unpack_row(uchar * record, DBT * row); void unpack_row(uchar * record, DBT * row);
void unpack_key(uchar * record, DBT * key, uint index); void unpack_key(uchar * record, DBT * key, uint index);
DBT *create_key(DBT * key, uint keynr, uchar * buff, const uchar * record, int key_length = MAX_KEY_LENGTH); DBT* create_dbt_key_from_key(DBT * key, KEY* key_info, uchar * buff, const uchar * record, int key_length = MAX_KEY_LENGTH);
DBT *create_dbt_key_from_table(DBT * key, uint keynr, uchar * buff, const uchar * record, int key_length = MAX_KEY_LENGTH);
DBT *pack_key(DBT * key, uint keynr, uchar * buff, const uchar * key_ptr, uint key_length); DBT *pack_key(DBT * key, uint keynr, uchar * buff, const uchar * key_ptr, uint key_length);
int remove_key(DB_TXN * trans, uint keynr, const uchar * record, DBT * prim_key); int remove_key(DB_TXN * trans, uint keynr, const uchar * record, DBT * prim_key);
int remove_keys(DB_TXN * trans, const uchar * record, DBT * prim_key, key_map * keys); int remove_keys(DB_TXN * trans, const uchar * record, DBT * prim_key, key_map * keys);
...@@ -132,7 +128,9 @@ class ha_tokudb : public handler { ...@@ -132,7 +128,9 @@ class ha_tokudb : public handler {
int update_primary_key(DB_TXN * trans, bool primary_key_changed, const uchar * old_row, DBT * old_key, const uchar * new_row, DBT * prim_key, bool local_using_ignore); int update_primary_key(DB_TXN * trans, bool primary_key_changed, const uchar * old_row, DBT * old_key, const uchar * new_row, DBT * prim_key, bool local_using_ignore);
int read_row(int error, uchar * buf, uint keynr, DBT * row, DBT * key, bool); int read_row(int error, uchar * buf, uint keynr, DBT * row, DBT * key, bool);
DBT *get_pos(DBT * to, uchar * pos); DBT *get_pos(DBT * to, uchar * pos);
int open_secondary_table(DB** ptr, KEY* key_info, const char* name, int mode, u_int32_t* key_type);
public: public:
ha_tokudb(handlerton * hton, TABLE_SHARE * table_arg); ha_tokudb(handlerton * hton, TABLE_SHARE * table_arg);
~ha_tokudb() { ~ha_tokudb() {
...@@ -246,6 +244,10 @@ class ha_tokudb : public handler { ...@@ -246,6 +244,10 @@ class ha_tokudb : public handler {
int cmp_ref(const uchar * ref1, const uchar * ref2); int cmp_ref(const uchar * ref1, const uchar * ref2);
bool check_if_incompatible_data(HA_CREATE_INFO * info, uint table_changes); bool check_if_incompatible_data(HA_CREATE_INFO * info, uint table_changes);
int add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys);
int prepare_drop_index(TABLE *table_arg, uint *key_num, uint num_of_keys);
int final_drop_index(TABLE *table_arg);
private: private:
int __close(int mutex_is_locked); int __close(int mutex_is_locked);
int read_last(); int read_last();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment