Commit f9850e0f authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

addresses #1084

implement HA_EXTRA_IGNORE_DUP_KEY correctly

git-svn-id: file:///svn/mysql/tokudb-engine/src@5592 c7de825b-a66e-492c-adef-691d508d4ae1
parent 060d2d73
...@@ -1982,6 +1982,8 @@ int ha_tokudb::write_row(uchar * record) { ...@@ -1982,6 +1982,8 @@ int ha_tokudb::write_row(uchar * record) {
THD *thd = NULL; THD *thd = NULL;
u_int32_t put_flags; u_int32_t put_flags;
bool has_null; bool has_null;
DB_TXN* sub_trans = NULL;
DB_TXN* txn = NULL;
// //
// some crap that needs to be done because MySQL does not properly abstract // some crap that needs to be done because MySQL does not properly abstract
...@@ -2002,6 +2004,14 @@ int ha_tokudb::write_row(uchar * record) { ...@@ -2002,6 +2004,14 @@ int ha_tokudb::write_row(uchar * record) {
get_auto_primary_key(current_ident); get_auto_primary_key(current_ident);
} }
if (using_ignore) {
error = db_env->txn_begin(db_env, transaction, &sub_trans, 0);
if (error) {
goto cleanup;
}
}
txn = using_ignore ? sub_trans : transaction;
// //
// first the primary key (because it must be unique, has highest chance of failure) // first the primary key (because it must be unique, has highest chance of failure)
// //
...@@ -2012,7 +2022,7 @@ int ha_tokudb::write_row(uchar * record) { ...@@ -2012,7 +2022,7 @@ int ha_tokudb::write_row(uchar * record) {
} }
error = share->file->put( error = share->file->put(
share->file, share->file,
transaction, txn,
create_dbt_key_from_table(&prim_key, primary_key, key_buff, record, &has_null), create_dbt_key_from_table(&prim_key, primary_key, key_buff, record, &has_null),
&row, &row,
put_flags put_flags
...@@ -2036,7 +2046,7 @@ int ha_tokudb::write_row(uchar * record) { ...@@ -2036,7 +2046,7 @@ int ha_tokudb::write_row(uchar * record) {
} }
error = share->key_file[keynr]->put( error = share->key_file[keynr]->put(
share->key_file[keynr], share->key_file[keynr],
transaction, txn,
&key, &key,
&prim_key, &prim_key,
put_flags put_flags
...@@ -2046,16 +2056,10 @@ int ha_tokudb::write_row(uchar * record) { ...@@ -2046,16 +2056,10 @@ int ha_tokudb::write_row(uchar * record) {
// and MySQL told us to ignore duplicate key errors // and MySQL told us to ignore duplicate key errors
// //
if (error) { if (error) {
if (using_ignore && error == DB_KEYEXIST) {
error = 0;
continue;
}
else {
last_dup_key = keynr; last_dup_key = keynr;
goto cleanup; goto cleanup;
} }
} }
}
if (!error) { if (!error) {
added_rows++; added_rows++;
...@@ -2064,6 +2068,17 @@ int ha_tokudb::write_row(uchar * record) { ...@@ -2064,6 +2068,17 @@ int ha_tokudb::write_row(uchar * record) {
if (error == DB_KEYEXIST) { if (error == DB_KEYEXIST) {
error = HA_ERR_FOUND_DUPP_KEY; error = HA_ERR_FOUND_DUPP_KEY;
} }
if (sub_trans) {
// no point in recording error value of abort.
// nothing we can do about it anyway and it is not what
// we want to return.
if (error) {
sub_trans->abort(sub_trans);
}
else {
error = sub_trans->commit(sub_trans, DB_TXN_NOSYNC);
}
}
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(error);
} }
...@@ -2094,7 +2109,7 @@ int ha_tokudb::key_cmp(uint keynr, const uchar * old_row, const uchar * new_row) ...@@ -2094,7 +2109,7 @@ int ha_tokudb::key_cmp(uint keynr, const uchar * old_row, const uchar * new_row)
Update a row from one value to another. Update a row from one value to another.
Clobbers key_buff2 Clobbers key_buff2
*/ */
int ha_tokudb::update_primary_key(DB_TXN * trans, bool primary_key_changed, const uchar * old_row, DBT * old_key, const uchar * new_row, DBT * new_key, bool local_using_ignore) { int ha_tokudb::update_primary_key(DB_TXN * trans, bool primary_key_changed, const uchar * old_row, DBT * old_key, const uchar * new_row, DBT * new_key) {
TOKUDB_DBUG_ENTER("update_primary_key"); TOKUDB_DBUG_ENTER("update_primary_key");
DBT row; DBT row;
int error; int error;
...@@ -2107,19 +2122,16 @@ int ha_tokudb::update_primary_key(DB_TXN * trans, bool primary_key_changed, cons ...@@ -2107,19 +2122,16 @@ int ha_tokudb::update_primary_key(DB_TXN * trans, bool primary_key_changed, cons
if ((error = share->file->put(share->file, trans, new_key, &row, share->key_type[primary_key]))) { if ((error = share->file->put(share->file, trans, new_key, &row, share->key_type[primary_key]))) {
// Probably a duplicated key; restore old key and row if needed // Probably a duplicated key; restore old key and row if needed
last_dup_key = primary_key; last_dup_key = primary_key;
if (local_using_ignore) {
int new_error;
if ((new_error = pack_row(&row, old_row)) || (new_error = share->file->put(share->file, trans, old_key, &row, share->key_type[primary_key])))
error = new_error; // fatal error
} }
} }
} }
} }
} else { else {
// Primary key didn't change; just update the row data // Primary key didn't change; just update the row data
if (!(error = pack_row(&row, new_row))) if (!(error = pack_row(&row, new_row))) {
error = share->file->put(share->file, trans, new_key, &row, 0); error = share->file->put(share->file, trans, new_key, &row, 0);
} }
}
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(error);
} }
...@@ -2139,6 +2151,8 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) { ...@@ -2139,6 +2151,8 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
bool primary_key_changed; bool primary_key_changed;
bool has_null; bool has_null;
THD* thd = ha_thd(); THD* thd = ha_thd();
DB_TXN* sub_trans = NULL;
DB_TXN* txn = NULL;
LINT_INIT(error); LINT_INIT(error);
statistic_increment(table->in_use->status_var.ha_update_count, &LOCK_status); statistic_increment(table->in_use->status_var.ha_update_count, &LOCK_status);
...@@ -2163,8 +2177,16 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) { ...@@ -2163,8 +2177,16 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
} }
} }
if (using_ignore) {
error = db_env->txn_begin(db_env, transaction, &sub_trans, 0);
if (error) {
goto cleanup;
}
}
txn = using_ignore ? sub_trans : transaction;
/* Start by updating the primary key */ /* Start by updating the primary key */
error = update_primary_key(transaction, primary_key_changed, old_row, &old_prim_key, new_row, &prim_key, using_ignore); error = update_primary_key(txn, primary_key_changed, old_row, &old_prim_key, new_row, &prim_key);
if (error) { if (error) {
last_dup_key = primary_key; last_dup_key = primary_key;
goto cleanup; goto cleanup;
...@@ -2176,7 +2198,7 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) { ...@@ -2176,7 +2198,7 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
} }
if (key_cmp(keynr, old_row, new_row) || primary_key_changed) { if (key_cmp(keynr, old_row, new_row) || primary_key_changed) {
u_int32_t put_flags; u_int32_t put_flags;
if ((error = remove_key(transaction, keynr, old_row, &old_prim_key))) { if ((error = remove_key(txn, keynr, old_row, &old_prim_key))) {
goto cleanup; goto cleanup;
} }
put_flags = share->key_type[keynr]; put_flags = share->key_type[keynr];
...@@ -2185,7 +2207,7 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) { ...@@ -2185,7 +2207,7 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
} }
error = share->key_file[keynr]->put( error = share->key_file[keynr]->put(
share->key_file[keynr], share->key_file[keynr],
transaction, txn,
create_dbt_key_from_table(&key, keynr, key_buff2, new_row, &has_null), create_dbt_key_from_table(&key, keynr, key_buff2, new_row, &has_null),
&prim_key, &prim_key,
put_flags put_flags
...@@ -2195,21 +2217,26 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) { ...@@ -2195,21 +2217,26 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
// and MySQL told us to ignore duplicate key errors // and MySQL told us to ignore duplicate key errors
// //
if (error) { if (error) {
if (using_ignore && error == DB_KEYEXIST) {
error = 0;
continue;
}
else {
last_dup_key = keynr; last_dup_key = keynr;
goto cleanup; goto cleanup;
} }
} }
} }
}
cleanup: cleanup:
if (error == DB_KEYEXIST) { if (error == DB_KEYEXIST) {
error = HA_ERR_FOUND_DUPP_KEY; error = HA_ERR_FOUND_DUPP_KEY;
} }
if (sub_trans) {
// no point in recording error value of abort.
// nothing we can do about it anyway and it is not what
// we want to return.
if (error) {
sub_trans->abort(sub_trans);
}
else {
error = sub_trans->commit(sub_trans, DB_TXN_NOSYNC);
}
}
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(error);
} }
......
...@@ -167,7 +167,7 @@ class ha_tokudb : public handler { ...@@ -167,7 +167,7 @@ class ha_tokudb : public handler {
int remove_key(DB_TXN * trans, uint keynr, const uchar * record, DBT * prim_key); int remove_key(DB_TXN * trans, uint keynr, const uchar * record, DBT * prim_key);
int remove_keys(DB_TXN * trans, const uchar * record, DBT * prim_key, key_map * keys); int remove_keys(DB_TXN * trans, const uchar * record, DBT * prim_key, key_map * keys);
int key_cmp(uint keynr, const uchar * old_row, const uchar * new_row); int key_cmp(uint keynr, const uchar * old_row, const uchar * new_row);
int update_primary_key(DB_TXN * trans, bool primary_key_changed, const uchar * old_row, DBT * old_key, const uchar * new_row, DBT * prim_key, bool local_using_ignore); int update_primary_key(DB_TXN * trans, bool primary_key_changed, const uchar * old_row, DBT * old_key, const uchar * new_row, DBT * prim_key);
int handle_cursor_error(int error, int err_to_return, uint keynr); int handle_cursor_error(int error, int err_to_return, uint keynr);
DBT *get_pos(DBT * to, uchar * pos); DBT *get_pos(DBT * to, uchar * pos);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment