Commit cf9370b3 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

[t:2335], merge handlerton bulk loader code to main

git-svn-id: file:///svn/mysql/tokudb-engine/tokudb-engine@18863 c7de825b-a66e-492c-adef-691d508d4ae1
parent 2c4c4ac2
......@@ -308,7 +308,6 @@ ulong ha_tokudb::index_flags(uint idx, uint part, bool all_parts) const {
}
//
// struct that will be used as a context for smart DBT callbacks
// contains parameters needed to complete the smart DBT cursor call
......@@ -325,44 +324,37 @@ typedef struct index_read_info {
DBT* orig_key;
} *INDEX_READ_INFO;
//
// struct that will be used as a context for smart DBT callbacks
// ONLY for the function add_index
//
typedef struct smart_dbt_ai_info {
ha_tokudb* ha; //instance to ha_tokudb needed for reading the row
DBT* prim_key; // DBT to store the primary key
uchar* buf; // buffer to unpack the row
//
// index into key_file that holds DB* that is indexed on
// the primary_key. this->key_file[primary_index] == this->file
//
uint pk_index;
} *SMART_DBT_AI_INFO;
typedef struct row_buffers {
uchar** key_buff;
uchar** rec_buff;
} *ROW_BUFFERS;
static int smart_dbt_ai_callback (DBT const *key, DBT const *row, void *context) {
int error = 0;
SMART_DBT_AI_INFO info = (SMART_DBT_AI_INFO)context;
//
// copy the key to prim_key
// This will be used as the data value for elements in the secondary index
// being created
//
info->prim_key->size = key->size;
memcpy(info->prim_key->data, key->data, key->size);
//
// For clustering keys on tables with a hidden primary key, we need to copy
// the primary key to current_ident, because that is what the function
// create_dbt_key_from_key uses to create the key in a clustering index
//
info->ha->extract_hidden_primary_key(info->pk_index,key);
error = info->ha->unpack_row(info->buf,row,key, info->ha->primary_key);
return error;
int poll_fun(void *extra, float progress) {
LOADER_CONTEXT context = (LOADER_CONTEXT)extra;
if (context->thd->killed) {
sprintf(context->write_status_msg, "The process has been killed, aborting bulk load.");
return 1;
}
sprintf(context->write_status_msg, "Loading of data about %f done", progress);
thd_proc_info(context->thd, context->write_status_msg);
return 0;
}
void loader_ai_err_fun(DB *db, int i, int err, DBT *key, DBT *val, void *error_extra) {
LOADER_CONTEXT context = (LOADER_CONTEXT)error_extra;
assert(context->ha);
context->ha->set_loader_error(err);
}
void loader_dup_fun(DB *db, int i, int err, DBT *key, DBT *val, void *error_extra) {
LOADER_CONTEXT context = (LOADER_CONTEXT)error_extra;
assert(context->ha);
context->ha->set_loader_error(err);
if (err == DB_KEYEXIST) {
context->ha->set_dup_value_for_pk(key);
}
}
//
......@@ -1144,6 +1136,9 @@ ha_tokudb::ha_tokudb(handlerton * hton, TABLE_SHARE * table_arg):handler(hton, t
bzero(mult_rec_buff, sizeof(mult_rec_buff));
bzero(mult_key_dbt, sizeof(mult_key_dbt));
bzero(mult_rec_dbt, sizeof(mult_rec_dbt));
loader = NULL;
abort_loader = false;
bzero(&lc, sizeof(lc));
}
//
......@@ -2292,16 +2287,17 @@ u_int32_t ha_tokudb::place_key_into_mysql_buff(
uchar *pos = data;
for (; key_part != end; key_part++) {
if (key_part->null_bit) {
if (key_part->field->null_bit) {
uint null_offset = get_null_offset(table, key_part->field);
if (*pos++ == NULL_COL_VAL) { // Null value
//
// We don't need to reset the record data as we will not access it
// if the null data is set
//
record[key_part->null_offset] |= key_part->null_bit;
record[null_offset] |= key_part->field->null_bit;
continue;
}
record[key_part->null_offset] &= ~key_part->null_bit;
record[null_offset] &= ~key_part->field->null_bit;
}
//
// HOPEFULLY TEMPORARY
......@@ -2814,10 +2810,44 @@ bool ha_tokudb::may_table_be_empty() {
void ha_tokudb::start_bulk_insert(ha_rows rows) {
delay_updating_ai_metadata = true;
ai_metadata_update_required = false;
abort_loader = false;
if (share->try_table_lock) {
if (tokudb_prelock_empty && may_table_be_empty()) {
if (using_ignore) {
acquire_table_lock(transaction, lock_write);
}
else {
THD* thd = ha_thd();
u_int32_t mult_put_flags[MAX_KEY + 1] = {DB_YESOVERWRITE};
u_int32_t mult_dbt_flags[MAX_KEY + 1] = {DB_DBT_REALLOC};
uint curr_num_DBs = table->s->keys + test(hidden_primary_key);
mult_dbt_flags[primary_key] = 0;
if (!thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS) && !hidden_primary_key) {
mult_put_flags[primary_key] = DB_NOOVERWRITE;
}
int error = db_env->create_loader(
db_env,
transaction,
&loader,
NULL, // no src_db needed
curr_num_DBs,
share->key_file,
mult_put_flags,
mult_dbt_flags,
0
);
if (error) { assert(loader == NULL); }
lc.thd = thd;
lc.ha = this;
error = loader->set_poll_function(loader, poll_fun, &lc);
assert(!error);
error = loader->set_error_callback(loader, loader_dup_fun, &lc);
assert(!error);
}
}
pthread_mutex_lock(&share->mutex);
share->try_table_lock = false;
pthread_mutex_unlock(&share->mutex);
......@@ -2835,13 +2865,185 @@ int ha_tokudb::end_bulk_insert() {
pthread_mutex_lock(&share->mutex);
error = update_max_auto_inc(share->status_block, share->last_auto_increment);
pthread_mutex_unlock(&share->mutex);
if (error) { goto cleanup; }
}
delay_updating_ai_metadata = false;
ai_metadata_update_required = false;
return error;
loader_error = 0;
if (loader) {
if (!abort_loader) {
error = loader->close(loader);
loader = NULL;
if (error) { goto cleanup; }
for (uint i = 0; i < table_share->keys; i++) {
if (table_share->key_info[i].flags & HA_NOSAME) {
bool is_unique;
error = is_index_unique(
&is_unique,
transaction,
share->key_file[i],
&table->key_info[i]
);
if (error) goto cleanup;
if (!is_unique) {
error = HA_ERR_FOUND_DUPP_KEY;
last_dup_key = i;
goto cleanup;
}
}
}
}
else {
loader->abort(loader);
loader = NULL;
}
}
cleanup:
if (loader) {
loader->abort(loader);
loader = NULL;
}
abort_loader = false;
bzero(&lc,sizeof(lc));
if (error || loader_error) {
my_errno = error ? error : loader_error;
}
return error ? error : loader_error;
}
int ha_tokudb::is_index_unique(bool* is_unique, DB_TXN* txn, DB* db, KEY* key_info) {
int error;
DBC* tmp_cursor1 = NULL;
DBC* tmp_cursor2 = NULL;
DBT key1, key2, val, packed_key1, packed_key2;
bzero(&key1, sizeof(key1));
bzero(&key2, sizeof(key2));
bzero(&val, sizeof(val));
bzero(&packed_key1, sizeof(packed_key1));
bzero(&packed_key2, sizeof(packed_key2));
*is_unique = true;
error = db->cursor(
db,
txn,
&tmp_cursor1,
0
);
if (error) { goto cleanup; }
error = db->cursor(
db,
txn,
&tmp_cursor2,
0
);
if (error) { goto cleanup; }
error = tmp_cursor1->c_get(
tmp_cursor1,
&key1,
&val,
DB_NEXT
);
if (error == DB_NOTFOUND) {
*is_unique = true;
error = 0;
goto cleanup;
}
else if (error) { goto cleanup; }
error = tmp_cursor2->c_get(
tmp_cursor2,
&key2,
&val,
DB_NEXT
);
if (error) { goto cleanup; }
error = tmp_cursor2->c_get(
tmp_cursor2,
&key2,
&val,
DB_NEXT
);
if (error == DB_NOTFOUND) {
*is_unique = true;
error = 0;
goto cleanup;
}
else if (error) { goto cleanup; }
while (error != DB_NOTFOUND) {
bool has_null1;
bool has_null2;
int cmp;
place_key_into_mysql_buff(
key_info,
table->record[0],
(uchar *) key1.data + 1
);
place_key_into_mysql_buff(
key_info,
table->record[1],
(uchar *) key2.data + 1
);
create_dbt_key_for_lookup(
&packed_key1,
key_info,
key_buff,
table->record[0],
&has_null1
);
create_dbt_key_for_lookup(
&packed_key2,
key_info,
key_buff2,
table->record[1],
&has_null2
);
if (!has_null1 && !has_null2) {
cmp = tokudb_prefix_cmp_dbt_key(db, &packed_key1, &packed_key2);
if (cmp == 0) {
*is_unique = false;
break;
}
}
error = tmp_cursor1->c_get(
tmp_cursor1,
&key1,
&val,
DB_NEXT
);
if (error) { goto cleanup; }
error = tmp_cursor2->c_get(
tmp_cursor2,
&key2,
&val,
DB_NEXT
);
if (error && (error != DB_NOTFOUND)) { goto cleanup; }
}
error = 0;
cleanup:
if (tmp_cursor1) {
tmp_cursor1->c_close(tmp_cursor1);
tmp_cursor1 = NULL;
}
if (tmp_cursor2) {
tmp_cursor2->c_close(tmp_cursor2);
tmp_cursor2 = NULL;
}
return error;
}
int ha_tokudb::is_val_unique(bool* is_unique, uchar* record, KEY* key_info, uint dict_index, DB_TXN* txn) {
DBT key;
int error = 0;
......@@ -3230,6 +3432,14 @@ int ha_tokudb::write_row(uchar * record) {
if (error) { goto cleanup; }
}
if (loader) {
error = loader->put(loader, &prim_key, &row);
if (error) {
abort_loader = true;
goto cleanup;
}
}
else {
if (curr_num_DBs == 1) {
error = insert_row_to_main_dictionary(record,&prim_key, &row, txn);
if (error) { goto cleanup; }
......@@ -3241,6 +3451,7 @@ int ha_tokudb::write_row(uchar * record) {
error = insert_rows_to_dictionaries_mult(&prim_key, &row, txn, thd);
if (error) { goto cleanup; }
}
}
trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
if (!error) {
......@@ -6018,12 +6229,16 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
uint curr_index = 0;
DBC* tmp_cursor = NULL;
int cursor_ret_val = 0;
DBT current_primary_key;
DBT curr_pk_key, curr_pk_val;
DB_TXN* txn = NULL;
uchar* tmp_key_buff = NULL;
uchar* tmp_prim_key_buff = NULL;
uchar* tmp_record = NULL;
THD* thd = ha_thd();
DB_LOADER* loader = NULL;
u_int32_t mult_put_flags[MAX_KEY + 1] = {DB_YESOVERWRITE};
u_int32_t mult_dbt_flags[MAX_KEY + 1] = {DB_DBT_REALLOC};
struct loader_context lc = {0};
lc.thd = thd;
lc.ha = this;
loader_error = 0;
//
// number of DB files we have open currently, before add_index is executed
//
......@@ -6037,26 +6252,15 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
read_lock_wait_time = get_read_lock_wait_time(ha_thd());
thd_proc_info(thd, "Adding indexes");
tmp_key_buff = (uchar *)my_malloc(2*table_arg->s->rec_buff_length, MYF(MY_WME));
tmp_prim_key_buff = (uchar *)my_malloc(2*table_arg->s->rec_buff_length, MYF(MY_WME));
tmp_record = table->record[0];
if (tmp_key_buff == NULL ||
tmp_prim_key_buff == NULL ) {
error = ENOMEM;
goto cleanup;
}
error = db_env->txn_begin(db_env, 0, &txn, 0);
if (error) { goto cleanup; }
//
// in unpack_row, MySQL passes a buffer that is this long,
// so this length should be good enough for us as well
//
bzero((void *) &current_primary_key, sizeof(current_primary_key));
current_primary_key.data = tmp_prim_key_buff;
bzero((void *) &curr_pk_key, sizeof(curr_pk_key));
bzero((void *) &curr_pk_val, sizeof(curr_pk_val));
//
// The files for secondary tables are derived from the name of keys
......@@ -6115,7 +6319,6 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
if (error) { goto cleanup; }
}
//
// grab some locks to make this go faster
// first a global read lock on the main DB, because
......@@ -6134,18 +6337,24 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
}
if (error) { goto cleanup; }
//
// now grab a table write lock for secondary tables we
// are creating
//
for (uint i = 0; i < num_of_keys; i++) {
uint curr_index = i + curr_num_DBs;
error = share->key_file[curr_index]->pre_acquire_table_lock(
share->key_file[curr_index],
txn
error = db_env->create_loader(
db_env,
txn,
&loader,
NULL, // no src_db needed
num_of_keys,
&share->key_file[curr_num_DBs],
mult_put_flags,
mult_dbt_flags,
0
);
if (error) { goto cleanup; }
}
error = loader->set_poll_function(loader, poll_fun, &lc);
if (error) { goto cleanup; }
error = loader->set_error_callback(loader, loader_ai_err_fun, &lc);
if (error) { goto cleanup; }
//
// scan primary table, create each secondary key, add to each DB
......@@ -6155,62 +6364,17 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
goto cleanup;
}
//
// for each element in the primary table, insert the proper key value pair in each secondary table
// that is created
//
struct smart_dbt_ai_info info;
info.ha = this;
info.prim_key = &current_primary_key;
info.buf = tmp_record;
info.pk_index = primary_key; // needed so that clustering indexes being created will have right pk info
cursor_ret_val = tmp_cursor->c_get(tmp_cursor, &curr_pk_key, &curr_pk_val, DB_NEXT | DB_PRELOCKED);
unpack_entire_row = true;
cursor_ret_val = tmp_cursor->c_getf_next(tmp_cursor, DB_PRELOCKED, smart_dbt_ai_callback, &info);
while (cursor_ret_val != DB_NOTFOUND) {
if (cursor_ret_val) {
error = cursor_ret_val;
goto cleanup;
}
for (uint i = 0; i < num_of_keys; i++) {
DBT secondary_key, row;
bool is_unique_key = key_info[i].flags & HA_NOSAME;
u_int32_t put_flags = DB_YESOVERWRITE;
bool has_null = false;
create_dbt_key_from_key(&secondary_key,&key_info[i], tmp_key_buff, tmp_record, &has_null, false);
uint curr_index = i + curr_num_DBs;
//
// if unique key, check uniqueness constraint
// but, we do not need to check it if the key has a null
// and we do not need to check it if unique_checks is off
//
if (is_unique_key && !thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS)) {
bool is_unique = false;
error = is_val_unique(&is_unique, tmp_record, &key_info[i], curr_index, txn);
if (error) { goto cleanup; }
if (!is_unique) {
error = HA_ERR_FOUND_DUPP_KEY;
last_dup_key = i;
memcpy(table_arg->record[0], tmp_record, table_arg->s->rec_buff_length);
goto cleanup;
}
}
if (key_info[i].flags & HA_CLUSTERING) {
if ((error = pack_row(&row, (const uchar *) tmp_record, curr_index))){
goto cleanup;
}
error = share->key_file[curr_index]->put(share->key_file[curr_index], txn, &secondary_key, &row, put_flags);
}
else {
bzero((void *)&row, sizeof(row));
error = share->key_file[curr_index]->put(share->key_file[curr_index], txn, &secondary_key, &row, put_flags);
}
error = loader->put(loader, &curr_pk_key, &curr_pk_val);
if (error) { goto cleanup; }
}
num_processed++;
if ((num_processed % 1000) == 0) {
......@@ -6221,51 +6385,41 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
goto cleanup;
}
}
cursor_ret_val = tmp_cursor->c_getf_next(tmp_cursor, DB_PRELOCKED, smart_dbt_ai_callback, &info);
cursor_ret_val = tmp_cursor->c_get(tmp_cursor, &curr_pk_key, &curr_pk_val, DB_NEXT | DB_PRELOCKED);
}
error = tmp_cursor->c_close(tmp_cursor);
assert(error==0);
tmp_cursor = NULL;
//
// We have an accurate row count, might as well update share->rows
//
pthread_mutex_lock(&share->mutex);
share->rows = num_processed;
pthread_mutex_unlock(&share->mutex);
error = loader->close(loader);
loader = NULL;
if (error) goto cleanup;
//
// Now flatten the new DB's created
//
for (uint i = 0; i < num_of_keys; i++) {
uint curr_index = i + curr_num_DBs;
if ((error = share->key_file[curr_index]->cursor(share->key_file[curr_index], txn, &tmp_cursor, 0))) {
tmp_cursor = NULL; // Safety
goto cleanup;
}
error = 0;
num_processed = 0;
while (error != DB_NOTFOUND) {
error = tmp_cursor->c_getf_next(tmp_cursor, DB_PRELOCKED, smart_dbt_do_nothing, NULL);
if (error && error != DB_NOTFOUND) {
goto cleanup;
}
num_processed++;
if ((num_processed % 1000) == 0) {
sprintf(status_msg, "Adding indexes: Applied %llu of %llu rows in key-%s.", num_processed, (long long unsigned) share->rows, key_info[i].name);
thd_proc_info(thd, status_msg);
if (thd->killed) {
error = ER_ABORTING_CONNECTION;
curr_index = curr_num_DBs;
for (uint i = 0; i < num_of_keys; i++, curr_index++) {
if (key_info[i].flags & HA_NOSAME) {
bool is_unique;
error = is_index_unique(
&is_unique,
txn,
share->key_file[curr_index],
&key_info[i]
);
if (error) goto cleanup;
if (!is_unique) {
error = HA_ERR_FOUND_DUPP_KEY;
last_dup_key = i;
goto cleanup;
}
}
}
error = tmp_cursor->c_close(tmp_cursor);
assert(error==0);
tmp_cursor = NULL;
}
//
// We have an accurate row count, might as well update share->rows
//
pthread_mutex_lock(&share->mutex);
share->rows = num_processed;
pthread_mutex_unlock(&share->mutex);
//
// now write stuff to status.tokudb
......@@ -6283,6 +6437,9 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
assert(r==0);
tmp_cursor = NULL;
}
if (loader != NULL) {
loader->abort(loader);
}
if (txn) {
if (error) {
curr_index = curr_num_DBs;
......@@ -6307,9 +6464,7 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
another transaction has accessed the table. \
To add indexes, make sure no transactions touch the table.", share->table_name);
}
my_free(tmp_key_buff,MYF(MY_ALLOW_ZERO_PTR));
my_free(tmp_prim_key_buff,MYF(MY_ALLOW_ZERO_PTR));
TOKUDB_DBUG_RETURN(error);
TOKUDB_DBUG_RETURN(error ? error : loader_error);
}
//
......@@ -6387,6 +6542,9 @@ void ha_tokudb::print_error(int error, myf errflag) {
if (error == ENOSPC) {
error = HA_ERR_DISK_FULL;
}
if (error == DB_KEYEXIST) {
error = HA_ERR_FOUND_DUPP_KEY;
}
handler::print_error(error, errflag);
}
......@@ -6643,4 +6801,12 @@ To truncate the table, make sure no transactions touch the table.", share->table
TOKUDB_DBUG_RETURN(error);
}
void ha_tokudb::set_loader_error(int err) {
loader_error = err;
}
void ha_tokudb::set_dup_value_for_pk(DBT* key) {
assert(!hidden_primary_key);
unpack_key(table->record[0],key,primary_key);
last_dup_key = primary_key;
}
......@@ -5,6 +5,13 @@
#include <db.h>
#include "hatoku_cmp.h"
class ha_tokudb;
typedef struct loader_context {
THD* thd;
char write_status_msg[200];
ha_tokudb* ha;
} *LOADER_CONTEXT;
//
// This object stores table information that is to be shared
......@@ -243,8 +250,12 @@ class ha_tokudb : public handler {
// so a buffer of 200 is good enough.
//
char write_status_msg[200]; //buffer of 200 should be a good upper bound.
struct loader_context lc;
ulonglong read_lock_wait_time;
DB_LOADER* loader;
bool abort_loader;
int loader_error;
bool fix_rec_buff_for_blob(ulong length);
void fix_mult_rec_buff();
......@@ -298,6 +309,7 @@ class ha_tokudb : public handler {
int create_secondary_dictionary(const char* name, TABLE* form, KEY* key_info, DB_TXN* txn, KEY_AND_COL_INFO* kc_info, u_int32_t keynr);
int create_main_dictionary(const char* name, TABLE* form, DB_TXN* txn, KEY_AND_COL_INFO* kc_info);
void trace_create_table_info(const char *name, TABLE * form);
int is_index_unique(bool* is_unique, DB_TXN* txn, DB* db, KEY* key_info);
int is_val_unique(bool* is_unique, uchar* record, KEY* key_info, uint dict_index, DB_TXN* txn);
int do_uniqueness_checks(uchar* record, DB_TXN* txn, THD* thd);
int insert_row_to_main_dictionary(uchar* record, DBT* pk_key, DBT* pk_val, DB_TXN* txn);
......@@ -463,6 +475,8 @@ class ha_tokudb : public handler {
}
void track_progress(THD* thd);
void set_loader_error(int err);
void set_dup_value_for_pk(DBT* key);
//
......@@ -476,3 +490,4 @@ class ha_tokudb : public handler {
int __close(int mutex_is_locked);
int read_last(uint keynr);
};
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment