Commit 0c595bde authored by Nikita Malyavin's avatar Nikita Malyavin

MDEV-22434 UPDATE on RocksDB table with WITHOUT OVERLAPS fails

Insert worked incorrect as well. RocksDB used table->record[0] internally to store some
intermediate results for key conversion, during index searching among other operations.
So table->record[0] is spoiled during ha_rnd_index_map in ha_check_overlaps, so in turn
the broken record data was inserted.

The fix is to store RocksDB intermediate result in its own buffer instead of table->record[0].

`rocksdb` MTR suite is is checked and runs fine.
No need for additional tests. The existing overlaps.test covers the case completely.
However, I am not going to add anything related to rocksdb to suite, to keep it away
from additional dependencies.

To run tests with RocksDB engine, one can add following to engines.combinations:
[rocksdb]
plugin-load=$HA_ROCKSDB_SO
default-storage-engine=rocksdb
rocksdb
parent c3e09a2d
......@@ -175,7 +175,7 @@ insert into t values (3, NULL, '2003-03-01', '2003-05-01');
insert into t values (1, 1, '2003-03-01', '2003-05-01');
insert into t values (1, 2, '2003-05-01', '2003-07-01');
insert into t values (4, NULL, '2003-03-01', '2003-05-01');
create sequence seq start=5;
create sequence seq start=5 engine=myisam;
update t set id= nextval(seq), u= nextval(seq), s='2003-05-01', e='2003-07-01'
where u is NULL;
select * from t;
......
......@@ -157,7 +157,7 @@ insert into t values (1, 1, '2003-03-01', '2003-05-01');
insert into t values (1, 2, '2003-05-01', '2003-07-01');
insert into t values (4, NULL, '2003-03-01', '2003-05-01');
create sequence seq start=5;
create sequence seq start=5 engine=myisam;
update t set id= nextval(seq), u= nextval(seq), s='2003-05-01', e='2003-07-01'
where u is NULL;
......
......@@ -4279,7 +4279,9 @@ mysql_prepare_create_table(THD *thd, HA_CREATE_INFO *create_info,
key_info->without_overlaps= key->without_overlaps;
if (key_info->without_overlaps)
{
if (key_info->algorithm == HA_KEY_ALG_LONG_HASH)
if (key_info->algorithm == HA_KEY_ALG_HASH ||
key_info->algorithm == HA_KEY_ALG_LONG_HASH)
{
my_error(ER_KEY_CANT_HAVE_WITHOUT_OVERLAPS, MYF(0), key_info->name.str);
DBUG_RETURN(true);
......
......@@ -6328,6 +6328,7 @@ ha_rocksdb::ha_rocksdb(my_core::handlerton *const hton,
m_dup_sk_packed_tuple(nullptr),
m_dup_sk_packed_tuple_old(nullptr),
m_pack_buffer(nullptr),
m_record_buffer(nullptr),
m_lock_rows(RDB_LOCK_NONE),
m_keyread_only(false),
m_insert_with_update(false),
......@@ -6542,6 +6543,7 @@ int ha_rocksdb::alloc_key_buffers(const TABLE *const table_arg,
uint key_len = 0;
uint max_packed_sk_len = 0;
uint pack_key_len = 0;
uint record_len = table->s->reclength + table->s->null_bytes;
m_pk_descr = kd_arr[pk_index(table_arg, tbl_def_arg)];
if (has_hidden_pk(table_arg)) {
......@@ -6586,6 +6588,8 @@ int ha_rocksdb::alloc_key_buffers(const TABLE *const table_arg,
reinterpret_cast<uchar *>(my_malloc(PSI_INSTRUMENT_ME, max_packed_sk_len, MYF(0)));
m_pack_buffer =
reinterpret_cast<uchar *>(my_malloc(PSI_INSTRUMENT_ME, max_packed_sk_len, MYF(0)));
m_record_buffer =
reinterpret_cast<uchar *>(my_malloc(PSI_INSTRUMENT_ME, record_len, MYF(0)));
m_scan_it_lower_bound =
reinterpret_cast<uchar *>(my_malloc(PSI_INSTRUMENT_ME, max_packed_sk_len, MYF(0)));
......@@ -6607,6 +6611,7 @@ int ha_rocksdb::alloc_key_buffers(const TABLE *const table_arg,
m_sk_packed_tuple == nullptr || m_sk_packed_tuple_old == nullptr ||
m_end_key_packed_tuple == nullptr || m_pack_buffer == nullptr ||
m_scan_it_upper_bound == nullptr || m_scan_it_lower_bound == nullptr ||
m_record_buffer == nullptr ||
(alloc_alter_buffers && (m_dup_sk_packed_tuple == nullptr ||
m_dup_sk_packed_tuple_old == nullptr))) {
// One or more of the above allocations failed. Clean up and exit
......@@ -6640,6 +6645,9 @@ void ha_rocksdb::free_key_buffers() {
my_free(m_pack_buffer);
m_pack_buffer = nullptr;
my_free(m_record_buffer);
m_record_buffer = nullptr;
my_free(m_dup_sk_packed_tuple);
m_dup_sk_packed_tuple = nullptr;
......@@ -8051,7 +8059,8 @@ int ha_rocksdb::position_to_correct_key(
rc = HA_ERR_KEY_NOT_FOUND;
} else if (find_flag == HA_READ_PREFIX_LAST) {
uint size = kd.pack_index_tuple(table, m_pack_buffer,
m_sk_packed_tuple, key, keypart_map);
m_sk_packed_tuple, m_record_buffer,
key, keypart_map);
rocksdb::Slice lookup_tuple(
reinterpret_cast<char *>(m_sk_packed_tuple), size);
......@@ -8091,7 +8100,7 @@ int ha_rocksdb::calc_eq_cond_len(const Rdb_key_def &kd,
if (end_key) {
*end_key_packed_size =
kd.pack_index_tuple(table, m_pack_buffer, m_end_key_packed_tuple,
end_key->key, end_key->keypart_map);
m_record_buffer, end_key->key, end_key->keypart_map);
/*
Calculating length of the equal conditions here. 4 byte index id is
......@@ -8462,7 +8471,8 @@ int ha_rocksdb::index_read_map_impl(uchar *const buf, const uchar *const key,
This is a special case, use DB::Get.
*/
const uint size = kd.pack_index_tuple(table, m_pack_buffer,
m_pk_packed_tuple, key, keypart_map);
m_pk_packed_tuple, m_record_buffer,
key, keypart_map);
bool skip_lookup = is_blind_delete_enabled();
rc = get_row_by_rowid(buf, m_pk_packed_tuple, size, skip_lookup, false);
......@@ -8488,14 +8498,14 @@ int ha_rocksdb::index_read_map_impl(uchar *const buf, const uchar *const key,
.user_defined_key_parts) -
1;
packed_size = kd.pack_index_tuple(table, m_pack_buffer, m_sk_packed_tuple,
key, tmp_map);
m_record_buffer, key, tmp_map);
if (table->key_info[active_index].user_defined_key_parts !=
kd.get_key_parts()) {
using_full_key = false;
}
} else {
packed_size = kd.pack_index_tuple(table, m_pack_buffer, m_sk_packed_tuple,
key, keypart_map);
m_record_buffer, key, keypart_map);
}
if ((pushed_idx_cond && pushed_idx_cond_keyno == active_index) &&
......@@ -11924,6 +11934,7 @@ ha_rows ha_rocksdb::records_in_range(uint inx, const key_range *const min_key,
uint size1 = 0;
if (min_key) {
size1 = kd.pack_index_tuple(table, m_pack_buffer, m_sk_packed_tuple,
m_record_buffer,
min_key->key, min_key->keypart_map);
if (min_key->flag == HA_READ_PREFIX_LAST_OR_PREV ||
min_key->flag == HA_READ_PREFIX_LAST ||
......@@ -11937,6 +11948,7 @@ ha_rows ha_rocksdb::records_in_range(uint inx, const key_range *const min_key,
uint size2 = 0;
if (max_key) {
size2 = kd.pack_index_tuple(table, m_pack_buffer, m_sk_packed_tuple_old,
m_record_buffer,
max_key->key, max_key->keypart_map);
if (max_key->flag == HA_READ_PREFIX_LAST_OR_PREV ||
max_key->flag == HA_READ_PREFIX_LAST ||
......
......@@ -235,6 +235,11 @@ class ha_rocksdb : public my_core::handler {
*/
uchar *m_pack_buffer;
/*
A buffer long enough to store table record
*/
uchar *m_record_buffer;
/* class to convert between Mysql format and RocksDB format*/
std::shared_ptr<Rdb_converter> m_converter;
......
......@@ -987,7 +987,7 @@ uint Rdb_key_def::get_memcmp_sk_parts(const TABLE *table,
Convert index tuple into storage (i.e. mem-comparable) format
@detail
Currently this is done by unpacking into table->record[0] and then
Currently this is done by unpacking into record_buffer and then
packing index columns into storage format.
@param pack_buffer Temporary area for packing varchar columns. Its
......@@ -996,6 +996,7 @@ uint Rdb_key_def::get_memcmp_sk_parts(const TABLE *table,
uint Rdb_key_def::pack_index_tuple(TABLE *const tbl, uchar *const pack_buffer,
uchar *const packed_tuple,
uchar *const record_buffer,
const uchar *const key_tuple,
const key_part_map &keypart_map) const {
DBUG_ASSERT(tbl != nullptr);
......@@ -1005,13 +1006,13 @@ uint Rdb_key_def::pack_index_tuple(TABLE *const tbl, uchar *const pack_buffer,
/* We were given a record in KeyTupleFormat. First, save it to record */
const uint key_len = calculate_key_len(tbl, m_keyno, key_tuple, keypart_map);
key_restore(tbl->record[0], key_tuple, &tbl->key_info[m_keyno], key_len);
key_restore(record_buffer, key_tuple, &tbl->key_info[m_keyno], key_len);
uint n_used_parts = my_count_bits(keypart_map);
if (keypart_map == HA_WHOLE_KEY) n_used_parts = 0; // Full key is used
/* Then, convert the record into a mem-comparable form */
return pack_record(tbl, pack_buffer, tbl->record[0], packed_tuple, nullptr,
return pack_record(tbl, pack_buffer, record_buffer, packed_tuple, nullptr,
false, 0, n_used_parts);
}
......
......@@ -252,7 +252,8 @@ class Rdb_key_def {
public:
/* Convert a key from KeyTupleFormat to mem-comparable form */
uint pack_index_tuple(TABLE *const tbl, uchar *const pack_buffer,
uchar *const packed_tuple, const uchar *const key_tuple,
uchar *const packed_tuple, uchar *const record_buffer,
const uchar *const key_tuple,
const key_part_map &keypart_map) const;
uchar *pack_field(Field *const field, Rdb_field_packing *pack_info,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment