Commit 54e0641f authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

addresses #1084

implement HA_EXTRA_IGNORE_DUP_KEY correctly

git-svn-id: file:///svn/mysql/tokudb-engine/src@5592 c7de825b-a66e-492c-adef-691d508d4ae1
parent 45f58be0
......@@ -1982,6 +1982,8 @@ int ha_tokudb::write_row(uchar * record) {
THD *thd = NULL;
u_int32_t put_flags;
bool has_null;
DB_TXN* sub_trans = NULL;
DB_TXN* txn = NULL;
//
// some crap that needs to be done because MySQL does not properly abstract
......@@ -2002,6 +2004,14 @@ int ha_tokudb::write_row(uchar * record) {
get_auto_primary_key(current_ident);
}
if (using_ignore) {
error = db_env->txn_begin(db_env, transaction, &sub_trans, 0);
if (error) {
goto cleanup;
}
}
txn = using_ignore ? sub_trans : transaction;
//
// first the primary key (because it must be unique, has highest chance of failure)
//
......@@ -2012,7 +2022,7 @@ int ha_tokudb::write_row(uchar * record) {
}
error = share->file->put(
share->file,
transaction,
txn,
create_dbt_key_from_table(&prim_key, primary_key, key_buff, record, &has_null),
&row,
put_flags
......@@ -2036,7 +2046,7 @@ int ha_tokudb::write_row(uchar * record) {
}
error = share->key_file[keynr]->put(
share->key_file[keynr],
transaction,
txn,
&key,
&prim_key,
put_flags
......@@ -2046,16 +2056,10 @@ int ha_tokudb::write_row(uchar * record) {
// and MySQL told us to ignore duplicate key errors
//
if (error) {
if (using_ignore && error == DB_KEYEXIST) {
error = 0;
continue;
}
else {
last_dup_key = keynr;
goto cleanup;
}
}
}
if (!error) {
added_rows++;
......@@ -2064,6 +2068,17 @@ int ha_tokudb::write_row(uchar * record) {
if (error == DB_KEYEXIST) {
error = HA_ERR_FOUND_DUPP_KEY;
}
if (sub_trans) {
// no point in recording error value of abort.
// nothing we can do about it anyway and it is not what
// we want to return.
if (error) {
sub_trans->abort(sub_trans);
}
else {
error = sub_trans->commit(sub_trans, DB_TXN_NOSYNC);
}
}
TOKUDB_DBUG_RETURN(error);
}
......@@ -2094,7 +2109,7 @@ int ha_tokudb::key_cmp(uint keynr, const uchar * old_row, const uchar * new_row)
Update a row from one value to another.
Clobbers key_buff2
*/
int ha_tokudb::update_primary_key(DB_TXN * trans, bool primary_key_changed, const uchar * old_row, DBT * old_key, const uchar * new_row, DBT * new_key, bool local_using_ignore) {
int ha_tokudb::update_primary_key(DB_TXN * trans, bool primary_key_changed, const uchar * old_row, DBT * old_key, const uchar * new_row, DBT * new_key) {
TOKUDB_DBUG_ENTER("update_primary_key");
DBT row;
int error;
......@@ -2107,19 +2122,16 @@ int ha_tokudb::update_primary_key(DB_TXN * trans, bool primary_key_changed, cons
if ((error = share->file->put(share->file, trans, new_key, &row, share->key_type[primary_key]))) {
// Probably a duplicated key; restore old key and row if needed
last_dup_key = primary_key;
if (local_using_ignore) {
int new_error;
if ((new_error = pack_row(&row, old_row)) || (new_error = share->file->put(share->file, trans, old_key, &row, share->key_type[primary_key])))
error = new_error; // fatal error
}
}
}
}
} else {
else {
// Primary key didn't change; just update the row data
if (!(error = pack_row(&row, new_row)))
if (!(error = pack_row(&row, new_row))) {
error = share->file->put(share->file, trans, new_key, &row, 0);
}
}
TOKUDB_DBUG_RETURN(error);
}
......@@ -2139,6 +2151,8 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
bool primary_key_changed;
bool has_null;
THD* thd = ha_thd();
DB_TXN* sub_trans = NULL;
DB_TXN* txn = NULL;
LINT_INIT(error);
statistic_increment(table->in_use->status_var.ha_update_count, &LOCK_status);
......@@ -2163,8 +2177,16 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
}
}
if (using_ignore) {
error = db_env->txn_begin(db_env, transaction, &sub_trans, 0);
if (error) {
goto cleanup;
}
}
txn = using_ignore ? sub_trans : transaction;
/* Start by updating the primary key */
error = update_primary_key(transaction, primary_key_changed, old_row, &old_prim_key, new_row, &prim_key, using_ignore);
error = update_primary_key(txn, primary_key_changed, old_row, &old_prim_key, new_row, &prim_key);
if (error) {
last_dup_key = primary_key;
goto cleanup;
......@@ -2176,7 +2198,7 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
}
if (key_cmp(keynr, old_row, new_row) || primary_key_changed) {
u_int32_t put_flags;
if ((error = remove_key(transaction, keynr, old_row, &old_prim_key))) {
if ((error = remove_key(txn, keynr, old_row, &old_prim_key))) {
goto cleanup;
}
put_flags = share->key_type[keynr];
......@@ -2185,7 +2207,7 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
}
error = share->key_file[keynr]->put(
share->key_file[keynr],
transaction,
txn,
create_dbt_key_from_table(&key, keynr, key_buff2, new_row, &has_null),
&prim_key,
put_flags
......@@ -2195,21 +2217,26 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
// and MySQL told us to ignore duplicate key errors
//
if (error) {
if (using_ignore && error == DB_KEYEXIST) {
error = 0;
continue;
}
else {
last_dup_key = keynr;
goto cleanup;
}
}
}
}
cleanup:
if (error == DB_KEYEXIST) {
error = HA_ERR_FOUND_DUPP_KEY;
}
if (sub_trans) {
// no point in recording error value of abort.
// nothing we can do about it anyway and it is not what
// we want to return.
if (error) {
sub_trans->abort(sub_trans);
}
else {
error = sub_trans->commit(sub_trans, DB_TXN_NOSYNC);
}
}
TOKUDB_DBUG_RETURN(error);
}
......
......@@ -167,7 +167,7 @@ class ha_tokudb : public handler {
int remove_key(DB_TXN * trans, uint keynr, const uchar * record, DBT * prim_key);
int remove_keys(DB_TXN * trans, const uchar * record, DBT * prim_key, key_map * keys);
int key_cmp(uint keynr, const uchar * old_row, const uchar * new_row);
int update_primary_key(DB_TXN * trans, bool primary_key_changed, const uchar * old_row, DBT * old_key, const uchar * new_row, DBT * prim_key, bool local_using_ignore);
int update_primary_key(DB_TXN * trans, bool primary_key_changed, const uchar * old_row, DBT * old_key, const uchar * new_row, DBT * prim_key);
int handle_cursor_error(int error, int err_to_return, uint keynr);
DBT *get_pos(DBT * to, uchar * pos);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment