Commit 870a5554 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

[t:2335], merge handlerton bulk loader code to main

git-svn-id: file:///svn/mysql/tokudb-engine/tokudb-engine@18863 c7de825b-a66e-492c-adef-691d508d4ae1
parent 0d5a8bce
...@@ -308,7 +308,6 @@ ulong ha_tokudb::index_flags(uint idx, uint part, bool all_parts) const { ...@@ -308,7 +308,6 @@ ulong ha_tokudb::index_flags(uint idx, uint part, bool all_parts) const {
} }
// //
// struct that will be used as a context for smart DBT callbacks // struct that will be used as a context for smart DBT callbacks
// contains parameters needed to complete the smart DBT cursor call // contains parameters needed to complete the smart DBT cursor call
...@@ -325,44 +324,37 @@ typedef struct index_read_info { ...@@ -325,44 +324,37 @@ typedef struct index_read_info {
DBT* orig_key; DBT* orig_key;
} *INDEX_READ_INFO; } *INDEX_READ_INFO;
//
// struct that will be used as a context for smart DBT callbacks
// ONLY for the function add_index
//
typedef struct smart_dbt_ai_info {
ha_tokudb* ha; //instance to ha_tokudb needed for reading the row
DBT* prim_key; // DBT to store the primary key
uchar* buf; // buffer to unpack the row
//
// index into key_file that holds DB* that is indexed on
// the primary_key. this->key_file[primary_index] == this->file
//
uint pk_index;
} *SMART_DBT_AI_INFO;
typedef struct row_buffers { typedef struct row_buffers {
uchar** key_buff; uchar** key_buff;
uchar** rec_buff; uchar** rec_buff;
} *ROW_BUFFERS; } *ROW_BUFFERS;
static int smart_dbt_ai_callback (DBT const *key, DBT const *row, void *context) {
int error = 0; int poll_fun(void *extra, float progress) {
SMART_DBT_AI_INFO info = (SMART_DBT_AI_INFO)context; LOADER_CONTEXT context = (LOADER_CONTEXT)extra;
// if (context->thd->killed) {
// copy the key to prim_key sprintf(context->write_status_msg, "The process has been killed, aborting bulk load.");
// This will be used as the data value for elements in the secondary index return 1;
// being created }
// sprintf(context->write_status_msg, "Loading of data about %f done", progress);
info->prim_key->size = key->size; thd_proc_info(context->thd, context->write_status_msg);
memcpy(info->prim_key->data, key->data, key->size); return 0;
// }
// For clustering keys on tables with a hidden primary key, we need to copy
// the primary key to current_ident, because that is what the function
// create_dbt_key_from_key uses to create the key in a clustering index void loader_ai_err_fun(DB *db, int i, int err, DBT *key, DBT *val, void *error_extra) {
// LOADER_CONTEXT context = (LOADER_CONTEXT)error_extra;
info->ha->extract_hidden_primary_key(info->pk_index,key); assert(context->ha);
error = info->ha->unpack_row(info->buf,row,key, info->ha->primary_key); context->ha->set_loader_error(err);
return error; }
void loader_dup_fun(DB *db, int i, int err, DBT *key, DBT *val, void *error_extra) {
LOADER_CONTEXT context = (LOADER_CONTEXT)error_extra;
assert(context->ha);
context->ha->set_loader_error(err);
if (err == DB_KEYEXIST) {
context->ha->set_dup_value_for_pk(key);
}
} }
// //
...@@ -1144,6 +1136,9 @@ ha_tokudb::ha_tokudb(handlerton * hton, TABLE_SHARE * table_arg):handler(hton, t ...@@ -1144,6 +1136,9 @@ ha_tokudb::ha_tokudb(handlerton * hton, TABLE_SHARE * table_arg):handler(hton, t
bzero(mult_rec_buff, sizeof(mult_rec_buff)); bzero(mult_rec_buff, sizeof(mult_rec_buff));
bzero(mult_key_dbt, sizeof(mult_key_dbt)); bzero(mult_key_dbt, sizeof(mult_key_dbt));
bzero(mult_rec_dbt, sizeof(mult_rec_dbt)); bzero(mult_rec_dbt, sizeof(mult_rec_dbt));
loader = NULL;
abort_loader = false;
bzero(&lc, sizeof(lc));
} }
// //
...@@ -2292,16 +2287,17 @@ u_int32_t ha_tokudb::place_key_into_mysql_buff( ...@@ -2292,16 +2287,17 @@ u_int32_t ha_tokudb::place_key_into_mysql_buff(
uchar *pos = data; uchar *pos = data;
for (; key_part != end; key_part++) { for (; key_part != end; key_part++) {
if (key_part->null_bit) { if (key_part->field->null_bit) {
uint null_offset = get_null_offset(table, key_part->field);
if (*pos++ == NULL_COL_VAL) { // Null value if (*pos++ == NULL_COL_VAL) { // Null value
// //
// We don't need to reset the record data as we will not access it // We don't need to reset the record data as we will not access it
// if the null data is set // if the null data is set
// //
record[key_part->null_offset] |= key_part->null_bit; record[null_offset] |= key_part->field->null_bit;
continue; continue;
} }
record[key_part->null_offset] &= ~key_part->null_bit; record[null_offset] &= ~key_part->field->null_bit;
} }
// //
// HOPEFULLY TEMPORARY // HOPEFULLY TEMPORARY
...@@ -2814,9 +2810,43 @@ bool ha_tokudb::may_table_be_empty() { ...@@ -2814,9 +2810,43 @@ bool ha_tokudb::may_table_be_empty() {
void ha_tokudb::start_bulk_insert(ha_rows rows) { void ha_tokudb::start_bulk_insert(ha_rows rows) {
delay_updating_ai_metadata = true; delay_updating_ai_metadata = true;
ai_metadata_update_required = false; ai_metadata_update_required = false;
abort_loader = false;
if (share->try_table_lock) { if (share->try_table_lock) {
if (tokudb_prelock_empty && may_table_be_empty()) { if (tokudb_prelock_empty && may_table_be_empty()) {
acquire_table_lock(transaction, lock_write); if (using_ignore) {
acquire_table_lock(transaction, lock_write);
}
else {
THD* thd = ha_thd();
u_int32_t mult_put_flags[MAX_KEY + 1] = {DB_YESOVERWRITE};
u_int32_t mult_dbt_flags[MAX_KEY + 1] = {DB_DBT_REALLOC};
uint curr_num_DBs = table->s->keys + test(hidden_primary_key);
mult_dbt_flags[primary_key] = 0;
if (!thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS) && !hidden_primary_key) {
mult_put_flags[primary_key] = DB_NOOVERWRITE;
}
int error = db_env->create_loader(
db_env,
transaction,
&loader,
NULL, // no src_db needed
curr_num_DBs,
share->key_file,
mult_put_flags,
mult_dbt_flags,
0
);
if (error) { assert(loader == NULL); }
lc.thd = thd;
lc.ha = this;
error = loader->set_poll_function(loader, poll_fun, &lc);
assert(!error);
error = loader->set_error_callback(loader, loader_dup_fun, &lc);
assert(!error);
}
} }
pthread_mutex_lock(&share->mutex); pthread_mutex_lock(&share->mutex);
share->try_table_lock = false; share->try_table_lock = false;
...@@ -2835,13 +2865,185 @@ int ha_tokudb::end_bulk_insert() { ...@@ -2835,13 +2865,185 @@ int ha_tokudb::end_bulk_insert() {
pthread_mutex_lock(&share->mutex); pthread_mutex_lock(&share->mutex);
error = update_max_auto_inc(share->status_block, share->last_auto_increment); error = update_max_auto_inc(share->status_block, share->last_auto_increment);
pthread_mutex_unlock(&share->mutex); pthread_mutex_unlock(&share->mutex);
if (error) { goto cleanup; }
} }
delay_updating_ai_metadata = false; delay_updating_ai_metadata = false;
ai_metadata_update_required = false; ai_metadata_update_required = false;
return error; loader_error = 0;
if (loader) {
if (!abort_loader) {
error = loader->close(loader);
loader = NULL;
if (error) { goto cleanup; }
for (uint i = 0; i < table_share->keys; i++) {
if (table_share->key_info[i].flags & HA_NOSAME) {
bool is_unique;
error = is_index_unique(
&is_unique,
transaction,
share->key_file[i],
&table->key_info[i]
);
if (error) goto cleanup;
if (!is_unique) {
error = HA_ERR_FOUND_DUPP_KEY;
last_dup_key = i;
goto cleanup;
}
}
}
}
else {
loader->abort(loader);
loader = NULL;
}
}
cleanup:
if (loader) {
loader->abort(loader);
loader = NULL;
}
abort_loader = false;
bzero(&lc,sizeof(lc));
if (error || loader_error) {
my_errno = error ? error : loader_error;
}
return error ? error : loader_error;
} }
int ha_tokudb::is_index_unique(bool* is_unique, DB_TXN* txn, DB* db, KEY* key_info) {
int error;
DBC* tmp_cursor1 = NULL;
DBC* tmp_cursor2 = NULL;
DBT key1, key2, val, packed_key1, packed_key2;
bzero(&key1, sizeof(key1));
bzero(&key2, sizeof(key2));
bzero(&val, sizeof(val));
bzero(&packed_key1, sizeof(packed_key1));
bzero(&packed_key2, sizeof(packed_key2));
*is_unique = true;
error = db->cursor(
db,
txn,
&tmp_cursor1,
0
);
if (error) { goto cleanup; }
error = db->cursor(
db,
txn,
&tmp_cursor2,
0
);
if (error) { goto cleanup; }
error = tmp_cursor1->c_get(
tmp_cursor1,
&key1,
&val,
DB_NEXT
);
if (error == DB_NOTFOUND) {
*is_unique = true;
error = 0;
goto cleanup;
}
else if (error) { goto cleanup; }
error = tmp_cursor2->c_get(
tmp_cursor2,
&key2,
&val,
DB_NEXT
);
if (error) { goto cleanup; }
error = tmp_cursor2->c_get(
tmp_cursor2,
&key2,
&val,
DB_NEXT
);
if (error == DB_NOTFOUND) {
*is_unique = true;
error = 0;
goto cleanup;
}
else if (error) { goto cleanup; }
while (error != DB_NOTFOUND) {
bool has_null1;
bool has_null2;
int cmp;
place_key_into_mysql_buff(
key_info,
table->record[0],
(uchar *) key1.data + 1
);
place_key_into_mysql_buff(
key_info,
table->record[1],
(uchar *) key2.data + 1
);
create_dbt_key_for_lookup(
&packed_key1,
key_info,
key_buff,
table->record[0],
&has_null1
);
create_dbt_key_for_lookup(
&packed_key2,
key_info,
key_buff2,
table->record[1],
&has_null2
);
if (!has_null1 && !has_null2) {
cmp = tokudb_prefix_cmp_dbt_key(db, &packed_key1, &packed_key2);
if (cmp == 0) {
*is_unique = false;
break;
}
}
error = tmp_cursor1->c_get(
tmp_cursor1,
&key1,
&val,
DB_NEXT
);
if (error) { goto cleanup; }
error = tmp_cursor2->c_get(
tmp_cursor2,
&key2,
&val,
DB_NEXT
);
if (error && (error != DB_NOTFOUND)) { goto cleanup; }
}
error = 0;
cleanup:
if (tmp_cursor1) {
tmp_cursor1->c_close(tmp_cursor1);
tmp_cursor1 = NULL;
}
if (tmp_cursor2) {
tmp_cursor2->c_close(tmp_cursor2);
tmp_cursor2 = NULL;
}
return error;
}
int ha_tokudb::is_val_unique(bool* is_unique, uchar* record, KEY* key_info, uint dict_index, DB_TXN* txn) { int ha_tokudb::is_val_unique(bool* is_unique, uchar* record, KEY* key_info, uint dict_index, DB_TXN* txn) {
DBT key; DBT key;
int error = 0; int error = 0;
...@@ -3230,16 +3432,25 @@ int ha_tokudb::write_row(uchar * record) { ...@@ -3230,16 +3432,25 @@ int ha_tokudb::write_row(uchar * record) {
if (error) { goto cleanup; } if (error) { goto cleanup; }
} }
if (curr_num_DBs == 1) { if (loader) {
error = insert_row_to_main_dictionary(record,&prim_key, &row, txn); error = loader->put(loader, &prim_key, &row);
if (error) { goto cleanup; } if (error) {
abort_loader = true;
goto cleanup;
}
} }
else { else {
error = do_uniqueness_checks(record, txn, thd); if (curr_num_DBs == 1) {
if (error) { goto cleanup; } error = insert_row_to_main_dictionary(record,&prim_key, &row, txn);
if (error) { goto cleanup; }
}
else {
error = do_uniqueness_checks(record, txn, thd);
if (error) { goto cleanup; }
error = insert_rows_to_dictionaries_mult(&prim_key, &row, txn, thd); error = insert_rows_to_dictionaries_mult(&prim_key, &row, txn, thd);
if (error) { goto cleanup; } if (error) { goto cleanup; }
}
} }
trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot); trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
...@@ -6018,12 +6229,16 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) { ...@@ -6018,12 +6229,16 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
uint curr_index = 0; uint curr_index = 0;
DBC* tmp_cursor = NULL; DBC* tmp_cursor = NULL;
int cursor_ret_val = 0; int cursor_ret_val = 0;
DBT current_primary_key; DBT curr_pk_key, curr_pk_val;
DB_TXN* txn = NULL; DB_TXN* txn = NULL;
uchar* tmp_key_buff = NULL;
uchar* tmp_prim_key_buff = NULL;
uchar* tmp_record = NULL;
THD* thd = ha_thd(); THD* thd = ha_thd();
DB_LOADER* loader = NULL;
u_int32_t mult_put_flags[MAX_KEY + 1] = {DB_YESOVERWRITE};
u_int32_t mult_dbt_flags[MAX_KEY + 1] = {DB_DBT_REALLOC};
struct loader_context lc = {0};
lc.thd = thd;
lc.ha = this;
loader_error = 0;
// //
// number of DB files we have open currently, before add_index is executed // number of DB files we have open currently, before add_index is executed
// //
...@@ -6037,26 +6252,15 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) { ...@@ -6037,26 +6252,15 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
read_lock_wait_time = get_read_lock_wait_time(ha_thd()); read_lock_wait_time = get_read_lock_wait_time(ha_thd());
thd_proc_info(thd, "Adding indexes"); thd_proc_info(thd, "Adding indexes");
tmp_key_buff = (uchar *)my_malloc(2*table_arg->s->rec_buff_length, MYF(MY_WME));
tmp_prim_key_buff = (uchar *)my_malloc(2*table_arg->s->rec_buff_length, MYF(MY_WME));
tmp_record = table->record[0];
if (tmp_key_buff == NULL ||
tmp_prim_key_buff == NULL ) {
error = ENOMEM;
goto cleanup;
}
error = db_env->txn_begin(db_env, 0, &txn, 0); error = db_env->txn_begin(db_env, 0, &txn, 0);
if (error) { goto cleanup; } if (error) { goto cleanup; }
// //
// in unpack_row, MySQL passes a buffer that is this long, // in unpack_row, MySQL passes a buffer that is this long,
// so this length should be good enough for us as well // so this length should be good enough for us as well
// //
bzero((void *) &current_primary_key, sizeof(current_primary_key)); bzero((void *) &curr_pk_key, sizeof(curr_pk_key));
current_primary_key.data = tmp_prim_key_buff; bzero((void *) &curr_pk_val, sizeof(curr_pk_val));
// //
// The files for secondary tables are derived from the name of keys // The files for secondary tables are derived from the name of keys
...@@ -6115,7 +6319,6 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) { ...@@ -6115,7 +6319,6 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
if (error) { goto cleanup; } if (error) { goto cleanup; }
} }
// //
// grab some locks to make this go faster // grab some locks to make this go faster
// first a global read lock on the main DB, because // first a global read lock on the main DB, because
...@@ -6123,29 +6326,35 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) { ...@@ -6123,29 +6326,35 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
// //
lockretryN(read_lock_wait_time){ lockretryN(read_lock_wait_time){
error = share->file->pre_acquire_read_lock( error = share->file->pre_acquire_read_lock(
share->file, share->file,
txn, txn,
share->file->dbt_neg_infty(), share->file->dbt_neg_infty(),
NULL, NULL,
share->file->dbt_pos_infty(), share->file->dbt_pos_infty(),
NULL NULL
); );
lockretry_wait; lockretry_wait;
} }
if (error) { goto cleanup; } if (error) { goto cleanup; }
// error = db_env->create_loader(
// now grab a table write lock for secondary tables we db_env,
// are creating txn,
// &loader,
for (uint i = 0; i < num_of_keys; i++) { NULL, // no src_db needed
uint curr_index = i + curr_num_DBs; num_of_keys,
error = share->key_file[curr_index]->pre_acquire_table_lock( &share->key_file[curr_num_DBs],
share->key_file[curr_index], mult_put_flags,
txn mult_dbt_flags,
); 0
if (error) { goto cleanup; } );
} if (error) { goto cleanup; }
error = loader->set_poll_function(loader, poll_fun, &lc);
if (error) { goto cleanup; }
error = loader->set_error_callback(loader, loader_ai_err_fun, &lc);
if (error) { goto cleanup; }
// //
// scan primary table, create each secondary key, add to each DB // scan primary table, create each secondary key, add to each DB
...@@ -6155,62 +6364,17 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) { ...@@ -6155,62 +6364,17 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
goto cleanup; goto cleanup;
} }
// cursor_ret_val = tmp_cursor->c_get(tmp_cursor, &curr_pk_key, &curr_pk_val, DB_NEXT | DB_PRELOCKED);
// for each element in the primary table, insert the proper key value pair in each secondary table
// that is created
//
struct smart_dbt_ai_info info;
info.ha = this;
info.prim_key = &current_primary_key;
info.buf = tmp_record;
info.pk_index = primary_key; // needed so that clustering indexes being created will have right pk info
unpack_entire_row = true;
cursor_ret_val = tmp_cursor->c_getf_next(tmp_cursor, DB_PRELOCKED, smart_dbt_ai_callback, &info);
while (cursor_ret_val != DB_NOTFOUND) { while (cursor_ret_val != DB_NOTFOUND) {
if (cursor_ret_val) { if (cursor_ret_val) {
error = cursor_ret_val; error = cursor_ret_val;
goto cleanup; goto cleanup;
} }
for (uint i = 0; i < num_of_keys; i++) { error = loader->put(loader, &curr_pk_key, &curr_pk_val);
DBT secondary_key, row; if (error) { goto cleanup; }
bool is_unique_key = key_info[i].flags & HA_NOSAME;
u_int32_t put_flags = DB_YESOVERWRITE;
bool has_null = false;
create_dbt_key_from_key(&secondary_key,&key_info[i], tmp_key_buff, tmp_record, &has_null, false);
uint curr_index = i + curr_num_DBs;
//
// if unique key, check uniqueness constraint
// but, we do not need to check it if the key has a null
// and we do not need to check it if unique_checks is off
//
if (is_unique_key && !thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS)) {
bool is_unique = false;
error = is_val_unique(&is_unique, tmp_record, &key_info[i], curr_index, txn);
if (error) { goto cleanup; }
if (!is_unique) {
error = HA_ERR_FOUND_DUPP_KEY;
last_dup_key = i;
memcpy(table_arg->record[0], tmp_record, table_arg->s->rec_buff_length);
goto cleanup;
}
}
if (key_info[i].flags & HA_CLUSTERING) {
if ((error = pack_row(&row, (const uchar *) tmp_record, curr_index))){
goto cleanup;
}
error = share->key_file[curr_index]->put(share->key_file[curr_index], txn, &secondary_key, &row, put_flags);
}
else {
bzero((void *)&row, sizeof(row));
error = share->key_file[curr_index]->put(share->key_file[curr_index], txn, &secondary_key, &row, put_flags);
}
if (error) { goto cleanup; }
}
num_processed++; num_processed++;
if ((num_processed % 1000) == 0) { if ((num_processed % 1000) == 0) {
...@@ -6221,51 +6385,41 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) { ...@@ -6221,51 +6385,41 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
goto cleanup; goto cleanup;
} }
} }
cursor_ret_val = tmp_cursor->c_getf_next(tmp_cursor, DB_PRELOCKED, smart_dbt_ai_callback, &info); cursor_ret_val = tmp_cursor->c_get(tmp_cursor, &curr_pk_key, &curr_pk_val, DB_NEXT | DB_PRELOCKED);
} }
error = tmp_cursor->c_close(tmp_cursor); error = tmp_cursor->c_close(tmp_cursor);
assert(error==0); assert(error==0);
tmp_cursor = NULL; tmp_cursor = NULL;
// error = loader->close(loader);
// We have an accurate row count, might as well update share->rows loader = NULL;
// if (error) goto cleanup;
pthread_mutex_lock(&share->mutex);
share->rows = num_processed;
pthread_mutex_unlock(&share->mutex);
// curr_index = curr_num_DBs;
// Now flatten the new DB's created for (uint i = 0; i < num_of_keys; i++, curr_index++) {
// if (key_info[i].flags & HA_NOSAME) {
for (uint i = 0; i < num_of_keys; i++) { bool is_unique;
uint curr_index = i + curr_num_DBs; error = is_index_unique(
if ((error = share->key_file[curr_index]->cursor(share->key_file[curr_index], txn, &tmp_cursor, 0))) { &is_unique,
tmp_cursor = NULL; // Safety txn,
goto cleanup; share->key_file[curr_index],
} &key_info[i]
error = 0; );
num_processed = 0; if (error) goto cleanup;
while (error != DB_NOTFOUND) { if (!is_unique) {
error = tmp_cursor->c_getf_next(tmp_cursor, DB_PRELOCKED, smart_dbt_do_nothing, NULL); error = HA_ERR_FOUND_DUPP_KEY;
if (error && error != DB_NOTFOUND) { last_dup_key = i;
goto cleanup; goto cleanup;
} }
num_processed++;
if ((num_processed % 1000) == 0) {
sprintf(status_msg, "Adding indexes: Applied %llu of %llu rows in key-%s.", num_processed, (long long unsigned) share->rows, key_info[i].name);
thd_proc_info(thd, status_msg);
if (thd->killed) {
error = ER_ABORTING_CONNECTION;
goto cleanup;
}
}
} }
error = tmp_cursor->c_close(tmp_cursor);
assert(error==0);
tmp_cursor = NULL;
} }
//
// We have an accurate row count, might as well update share->rows
//
pthread_mutex_lock(&share->mutex);
share->rows = num_processed;
pthread_mutex_unlock(&share->mutex);
// //
// now write stuff to status.tokudb // now write stuff to status.tokudb
...@@ -6283,6 +6437,9 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) { ...@@ -6283,6 +6437,9 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
assert(r==0); assert(r==0);
tmp_cursor = NULL; tmp_cursor = NULL;
} }
if (loader != NULL) {
loader->abort(loader);
}
if (txn) { if (txn) {
if (error) { if (error) {
curr_index = curr_num_DBs; curr_index = curr_num_DBs;
...@@ -6307,9 +6464,7 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) { ...@@ -6307,9 +6464,7 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
another transaction has accessed the table. \ another transaction has accessed the table. \
To add indexes, make sure no transactions touch the table.", share->table_name); To add indexes, make sure no transactions touch the table.", share->table_name);
} }
my_free(tmp_key_buff,MYF(MY_ALLOW_ZERO_PTR)); TOKUDB_DBUG_RETURN(error ? error : loader_error);
my_free(tmp_prim_key_buff,MYF(MY_ALLOW_ZERO_PTR));
TOKUDB_DBUG_RETURN(error);
} }
// //
...@@ -6387,6 +6542,9 @@ void ha_tokudb::print_error(int error, myf errflag) { ...@@ -6387,6 +6542,9 @@ void ha_tokudb::print_error(int error, myf errflag) {
if (error == ENOSPC) { if (error == ENOSPC) {
error = HA_ERR_DISK_FULL; error = HA_ERR_DISK_FULL;
} }
if (error == DB_KEYEXIST) {
error = HA_ERR_FOUND_DUPP_KEY;
}
handler::print_error(error, errflag); handler::print_error(error, errflag);
} }
...@@ -6643,4 +6801,12 @@ To truncate the table, make sure no transactions touch the table.", share->table ...@@ -6643,4 +6801,12 @@ To truncate the table, make sure no transactions touch the table.", share->table
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(error);
} }
void ha_tokudb::set_loader_error(int err) {
loader_error = err;
}
void ha_tokudb::set_dup_value_for_pk(DBT* key) {
assert(!hidden_primary_key);
unpack_key(table->record[0],key,primary_key);
last_dup_key = primary_key;
}
...@@ -5,6 +5,13 @@ ...@@ -5,6 +5,13 @@
#include <db.h> #include <db.h>
#include "hatoku_cmp.h" #include "hatoku_cmp.h"
class ha_tokudb;
typedef struct loader_context {
THD* thd;
char write_status_msg[200];
ha_tokudb* ha;
} *LOADER_CONTEXT;
// //
// This object stores table information that is to be shared // This object stores table information that is to be shared
...@@ -243,9 +250,13 @@ class ha_tokudb : public handler { ...@@ -243,9 +250,13 @@ class ha_tokudb : public handler {
// so a buffer of 200 is good enough. // so a buffer of 200 is good enough.
// //
char write_status_msg[200]; //buffer of 200 should be a good upper bound. char write_status_msg[200]; //buffer of 200 should be a good upper bound.
struct loader_context lc;
ulonglong read_lock_wait_time; ulonglong read_lock_wait_time;
DB_LOADER* loader;
bool abort_loader;
int loader_error;
bool fix_rec_buff_for_blob(ulong length); bool fix_rec_buff_for_blob(ulong length);
void fix_mult_rec_buff(); void fix_mult_rec_buff();
uchar current_ident[TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH]; uchar current_ident[TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH];
...@@ -298,6 +309,7 @@ class ha_tokudb : public handler { ...@@ -298,6 +309,7 @@ class ha_tokudb : public handler {
int create_secondary_dictionary(const char* name, TABLE* form, KEY* key_info, DB_TXN* txn, KEY_AND_COL_INFO* kc_info, u_int32_t keynr); int create_secondary_dictionary(const char* name, TABLE* form, KEY* key_info, DB_TXN* txn, KEY_AND_COL_INFO* kc_info, u_int32_t keynr);
int create_main_dictionary(const char* name, TABLE* form, DB_TXN* txn, KEY_AND_COL_INFO* kc_info); int create_main_dictionary(const char* name, TABLE* form, DB_TXN* txn, KEY_AND_COL_INFO* kc_info);
void trace_create_table_info(const char *name, TABLE * form); void trace_create_table_info(const char *name, TABLE * form);
int is_index_unique(bool* is_unique, DB_TXN* txn, DB* db, KEY* key_info);
int is_val_unique(bool* is_unique, uchar* record, KEY* key_info, uint dict_index, DB_TXN* txn); int is_val_unique(bool* is_unique, uchar* record, KEY* key_info, uint dict_index, DB_TXN* txn);
int do_uniqueness_checks(uchar* record, DB_TXN* txn, THD* thd); int do_uniqueness_checks(uchar* record, DB_TXN* txn, THD* thd);
int insert_row_to_main_dictionary(uchar* record, DBT* pk_key, DBT* pk_val, DB_TXN* txn); int insert_row_to_main_dictionary(uchar* record, DBT* pk_key, DBT* pk_val, DB_TXN* txn);
...@@ -463,6 +475,8 @@ class ha_tokudb : public handler { ...@@ -463,6 +475,8 @@ class ha_tokudb : public handler {
} }
void track_progress(THD* thd); void track_progress(THD* thd);
void set_loader_error(int err);
void set_dup_value_for_pk(DBT* key);
// //
...@@ -476,3 +490,4 @@ class ha_tokudb : public handler { ...@@ -476,3 +490,4 @@ class ha_tokudb : public handler {
int __close(int mutex_is_locked); int __close(int mutex_is_locked);
int read_last(uint keynr); int read_last(uint keynr);
}; };
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment