Commit 3d60427e authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

[t:2325], move to main

git-svn-id: file:///svn/mysql/tokudb-engine/src@17179 c7de825b-a66e-492c-adef-691d508d4ae1
parent 0e54f841
......@@ -962,24 +962,23 @@ cleanup:
return error;
}
int generate_keys_vals_for_put(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra) {
int generate_row_for_put(
DB *dest_db,
DB *src_db,
DBT *dest_key,
DBT *dest_val,
const DBT *src_key,
const DBT *src_val,
void *extra
)
{
int error;
DBT pk_key, pk_val;
ROW_BUFFERS row_buffs = NULL;
bzero(&pk_key, sizeof(pk_key));
bzero(&pk_val, sizeof(pk_val));
pk_key.size = *(u_int32_t *)row->data;
pk_key.data = (uchar *)row->data + sizeof(u_int32_t);
pk_val.size = row->size - pk_key.size - sizeof(u_int32_t);
pk_val.data = (uchar *)pk_key.data + pk_key.size;
row_buffs = (ROW_BUFFERS)extra;
for ( u_int32_t i = 0; i < num_dbs; i++) {
DB* curr_db = dbs[i];
DB* curr_db = dest_db;
uchar* row_desc = NULL;
u_int32_t desc_size;
uchar* buff = NULL;
u_int32_t max_key_len = 0;
row_desc = (uchar *)curr_db->descriptor->data;
row_desc += (*(u_int32_t *)row_desc);
......@@ -987,97 +986,102 @@ int generate_keys_vals_for_put(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys,
row_desc += 4;
if (is_key_pk(row_desc, desc_size)) {
keys[i].data = pk_key.data;
keys[i].size = pk_key.size;
vals[i].data = pk_val.data;
vals[i].size = pk_val.size;
continue;
assert(dest_key->flags != DB_DBT_USERMEM);
assert(dest_val->flags != DB_DBT_USERMEM);
if (dest_key->flags == DB_DBT_REALLOC && dest_key->data != NULL) {
free(dest_key->data);
}
if (dest_val->flags == DB_DBT_REALLOC && dest_val->data != NULL) {
free(dest_val->data);
}
dest_key->data = src_key->data;
dest_key->size = src_key->size;
dest_key->flags = DB_DBT_TEMPMEMORY;
dest_val->data = src_val->data;
dest_val->size = src_val->size;
dest_val->flags = DB_DBT_TEMPMEMORY;
error = 0;
goto cleanup;
}
else {
uchar* buff = NULL;
u_int32_t max_key_len = 0;
if (row_buffs != NULL) {
buff = row_buffs->key_buff[i];
if (dest_key->flags == DB_DBT_USERMEM) {
buff = (uchar *)dest_key->data;
}
else {
else if (dest_key->flags == DB_DBT_REALLOC) {
max_key_len = max_key_size_from_desc(row_desc, desc_size);
max_key_len += pk_key.size;
buff = (uchar *)my_malloc(max_key_len, MYF(MY_WME));
max_key_len += src_key->size;
if (max_key_len > dest_key->ulen) {
void* old_ptr = dest_key->data;
void* new_ptr = NULL;
new_ptr = realloc(old_ptr, max_key_len);
assert(new_ptr);
dest_key->data = new_ptr;
dest_key->ulen = max_key_len;
}
buff = (uchar *)dest_key->data;
assert(buff != NULL && max_key_len > 0);
}
keys[i].size = pack_key_from_desc(
else {
assert(false);
}
dest_key->size = pack_key_from_desc(
buff,
row_desc,
desc_size,
&pk_key,
&pk_val
src_key,
src_val
);
assert(dest_key->ulen >= dest_key->size);
if (tokudb_debug & TOKUDB_DEBUG_CHECK_KEY && !max_key_len) {
max_key_len = max_key_size_from_desc(row_desc, desc_size);
max_key_len += pk_key.size;
max_key_len += src_key->size;
}
if (max_key_len) {
assert(max_key_len >= keys[i].size);
}
keys[i].data = buff;
assert(max_key_len >= dest_key->size);
}
row_desc += desc_size;
desc_size = (*(u_int32_t *)row_desc) - 4;
row_desc += 4;
if (!is_key_clustering(row_desc, desc_size)) {
bzero(&vals[i], sizeof(DBT));
dest_val->size = 0;
}
else {
uchar* buff = NULL;
if (row_buffs != NULL) {
buff = row_buffs->rec_buff[i];
if (dest_val->flags == DB_DBT_USERMEM) {
buff = (uchar *)dest_val->data;
}
else if (dest_val->flags == DB_DBT_REALLOC){
if (dest_val->ulen < src_val->size) {
void* old_ptr = dest_val->data;
void* new_ptr = NULL;
new_ptr = realloc(old_ptr, src_val->size);
assert(new_ptr);
dest_val->data = new_ptr;
dest_val->ulen = src_val->size;
}
buff = (uchar *)dest_val->data;
assert(buff != NULL);
}
else {
buff = (uchar *)my_malloc(pk_val.size, MYF(MY_WME));
assert(buff != NULL);
assert(false);
}
vals[i].size = pack_clustering_val_from_desc(
dest_val->size = pack_clustering_val_from_desc(
buff,
row_desc,
desc_size,
&pk_val
src_val
);
vals[i].data = buff;
}
assert(dest_val->ulen >= dest_val->size);
}
error = 0;
cleanup:
return error;
}
int cleanup_keys_vals_for_put(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra) {
if (extra == NULL) {
//
// handle allocation of buffers in recovery case later
//
for (u_int32_t i = 0; i < num_dbs; i++) {
DB* curr_db = dbs[i];
uchar* row_desc = NULL;
u_int32_t desc_size;
row_desc = (uchar *)curr_db->descriptor->data;
row_desc += (*(u_int32_t *)row_desc);
desc_size = (*(u_int32_t *)row_desc) - 4;
row_desc += 4;
if (is_key_pk(row_desc, desc_size)) {
continue;
}
else {
my_free(keys[i].data, MYF(MY_ALLOW_ZERO_PTR));
my_free(vals[i].data, MYF(MY_ALLOW_ZERO_PTR));
}
}
}
return 0;
}
ha_tokudb::ha_tokudb(handlerton * hton, TABLE_SHARE * table_arg):handler(hton, table_arg)
// flags defined in sql\handler.h
......@@ -1107,6 +1111,8 @@ ha_tokudb::ha_tokudb(handlerton * hton, TABLE_SHARE * table_arg):handler(hton, t
ai_metadata_update_required = false;
bzero(mult_key_buff, sizeof(mult_key_buff));
bzero(mult_rec_buff, sizeof(mult_rec_buff));
bzero(mult_key_dbt, sizeof(mult_key_dbt));
bzero(mult_rec_dbt, sizeof(mult_rec_dbt));
}
//
......@@ -1586,7 +1592,7 @@ int ha_tokudb::open(const char *name, int mode, uint test_if_locked) {
}
alloced_rec_buff_length = table_share->rec_buff_length + table_share->fields;
rec_buff = (uchar *) my_malloc(alloced_rec_buff_length + max_key_length + sizeof(u_int32_t), MYF(MY_WME));
rec_buff = (uchar *) my_malloc(alloced_rec_buff_length, MYF(MY_WME));
if (rec_buff == NULL) {
ret_val = 1;
......@@ -1599,9 +1605,15 @@ int ha_tokudb::open(const char *name, int mode, uint test_if_locked) {
}
mult_key_buff[i] = (uchar *)my_malloc(max_key_length, MYF(MY_WME));
assert(mult_key_buff[i] != NULL);
mult_key_dbt[i].ulen = max_key_length;
mult_key_dbt[i].flags = DB_DBT_USERMEM;
mult_key_dbt[i].data = mult_key_buff[i];
if (table_share->key_info[i].flags & HA_CLUSTERING) {
mult_rec_buff[i] = (uchar *) my_malloc(alloced_rec_buff_length, MYF(MY_WME));
assert(mult_rec_buff[i]);
mult_rec_dbt[i].ulen = alloced_rec_buff_length;
mult_rec_dbt[i].flags = DB_DBT_USERMEM;
mult_rec_dbt[i].data = mult_rec_buff[i];
}
}
alloced_mult_rec_buff_length = alloced_rec_buff_length;
......@@ -1889,7 +1901,7 @@ int ha_tokudb::__close(int mutex_is_locked) {
bool ha_tokudb::fix_rec_buff_for_blob(ulong length) {
if (!rec_buff || (length > alloced_rec_buff_length)) {
uchar *newptr;
if (!(newptr = (uchar *) my_realloc((void *) rec_buff, length+max_key_length+sizeof(u_int32_t), MYF(MY_ALLOW_ZERO_PTR))))
if (!(newptr = (uchar *) my_realloc((void *) rec_buff, length, MYF(MY_ALLOW_ZERO_PTR))))
return 1;
rec_buff = newptr;
alloced_rec_buff_length = length;
......@@ -1906,6 +1918,9 @@ void ha_tokudb::fix_mult_rec_buff() {
assert(false);
}
mult_rec_buff[i] = newptr;
mult_rec_dbt[i].ulen = alloced_rec_buff_length;
mult_rec_dbt[i].flags = DB_DBT_USERMEM;
mult_rec_dbt[i].data = mult_rec_buff[i];
}
}
alloced_mult_rec_buff_length = alloced_rec_buff_length;
......@@ -1942,12 +1957,10 @@ ulong ha_tokudb::max_row_length(const uchar * buf) {
int ha_tokudb::pack_row(
DBT * row,
uchar* buf,
const uchar* record,
uint index
)
{
uchar* dest_buf = NULL;
uchar* fixed_field_ptr = NULL;
uchar* var_field_offset_ptr = NULL;
uchar* start_field_data_ptr = NULL;
......@@ -1957,18 +1970,17 @@ int ha_tokudb::pack_row(
my_bitmap_map *old_map = dbug_tmp_use_all_columns(table, table->write_set);
if ((buf == NULL) && table_share->blob_fields) {
if (table_share->blob_fields) {
if (fix_rec_buff_for_blob(max_row_length(record))) {
r = HA_ERR_OUT_OF_MEM;
goto cleanup;
}
}
dest_buf = (buf == NULL) ? rec_buff : buf;
/* Copy null bits */
memcpy(dest_buf, record, table_share->null_bytes);
fixed_field_ptr = dest_buf + table_share->null_bytes;
memcpy(rec_buff, record, table_share->null_bytes);
fixed_field_ptr = rec_buff + table_share->null_bytes;
var_field_offset_ptr = fixed_field_ptr + share->kc_info.mcp_info[index].var_len_offset;
start_field_data_ptr = var_field_offset_ptr + share->kc_info.mcp_info[index].len_of_offsets;
var_field_data_ptr = var_field_offset_ptr + share->kc_info.mcp_info[index].len_of_offsets;
......@@ -2011,8 +2023,8 @@ int ha_tokudb::pack_row(
);
}
row->data = dest_buf;
row->size = (size_t) (var_field_data_ptr - dest_buf);
row->data = rec_buff;
row->size = (size_t) (var_field_data_ptr - rec_buff);
r = 0;
cleanup:
......@@ -2960,7 +2972,7 @@ int ha_tokudb::test_row_packing(uchar* record, DBT* pk_key, DBT* pk_val) {
// test key packing of clustering keys
//
if (table->key_info[keynr].flags & HA_CLUSTERING) {
error = pack_row(&row, mult_rec_buff[keynr], (const uchar *) record, keynr);
error = pack_row(&row, (const uchar *) record, keynr);
if (error) { goto cleanup; }
uchar* tmp_buff = NULL;
tmp_buff = (uchar *)my_malloc(alloced_rec_buff_length,MYF(MY_WME));
......@@ -2977,7 +2989,7 @@ int ha_tokudb::test_row_packing(uchar* record, DBT* pk_key, DBT* pk_val) {
&tmp_pk_val
);
assert(tmp_num_bytes == row.size);
cmp = memcmp(tmp_buff,mult_rec_buff[keynr],tmp_num_bytes);
cmp = memcmp(tmp_buff,rec_buff,tmp_num_bytes);
assert(cmp == 0);
my_free(tmp_buff,MYF(MY_ALLOW_ZERO_PTR));
}
......@@ -3054,7 +3066,7 @@ int ha_tokudb::insert_rows_to_dictionaries(uchar* record, DBT* pk_key, DBT* pk_v
put_flags = DB_YESOVERWRITE;
if (table->key_info[keynr].flags & HA_CLUSTERING) {
error = pack_row(&row, NULL, (const uchar *) record, keynr);
error = pack_row(&row, (const uchar *) record, keynr);
if (error) { goto cleanup; }
}
else {
......@@ -3085,17 +3097,10 @@ cleanup:
return error;
}
int ha_tokudb::insert_rows_to_dictionaries_mult(uchar* row_buff, u_int32_t row_buff_size, DB_TXN* txn, THD* thd) {
int ha_tokudb::insert_rows_to_dictionaries_mult(DBT* pk_key, DBT* pk_val, DB_TXN* txn, THD* thd) {
int error;
DBT row;
struct row_buffers row_buff_struct;
bool is_replace_into;
uint curr_num_DBs = table->s->keys + test(hidden_primary_key);
bzero(&row, sizeof(row));
row.data = row_buff;
row.size = row_buff_size;
row_buff_struct.key_buff = mult_key_buff;
row_buff_struct.rec_buff = mult_rec_buff;
is_replace_into = (thd_sql_command(thd) == SQLCOM_REPLACE) ||
(thd_sql_command(thd) == SQLCOM_REPLACE_SELECT);
......@@ -3105,7 +3110,21 @@ int ha_tokudb::insert_rows_to_dictionaries_mult(uchar* row_buff, u_int32_t row_b
else {
share->mult_put_flags[primary_key] = DB_NOOVERWRITE;
}
error = db_env->put_multiple(db_env, txn, &row, curr_num_DBs, share->key_file, share->mult_put_flags, &row_buff_struct);
error = db_env->put_multiple(
db_env,
NULL,
txn,
pk_key,
pk_val,
curr_num_DBs,
share->key_file,
mult_key_dbt,
mult_rec_dbt,
share->mult_put_flags,
NULL
);
//
// We break if we hit an error, unless it is a dup key error
// and MySQL told us to ignore duplicate key errors
......@@ -3184,12 +3203,8 @@ int ha_tokudb::write_row(uchar * record) {
}
}
create_dbt_key_from_table(&prim_key, primary_key, rec_buff + sizeof(u_int32_t), record, &has_null);
//
// copy len of pk at beginning of rec_buff
//
memcpy(rec_buff, &prim_key.size, sizeof(u_int32_t));
if ((error = pack_row(&row, rec_buff + prim_key.size+sizeof(u_int32_t), (const uchar *) record, primary_key))){
create_dbt_key_from_table(&prim_key, primary_key, primary_key_buff, record, &has_null);
if ((error = pack_row(&row, (const uchar *) record, primary_key))){
goto cleanup;
}
......@@ -3220,7 +3235,7 @@ int ha_tokudb::write_row(uchar * record) {
if (error) { goto cleanup; }
}
else {
error = insert_rows_to_dictionaries_mult(rec_buff, sizeof(u_int32_t) + prim_key.size + row.size, txn, thd);
error = insert_rows_to_dictionaries_mult(&prim_key, &row, txn, thd);
if (error) { goto cleanup; }
}
......@@ -3287,7 +3302,7 @@ int ha_tokudb::update_primary_key(DB_TXN * trans, bool primary_key_changed, cons
error = remove_key(trans, primary_key, old_row, old_key);
if (error) { goto cleanup; }
error = pack_row(&row, NULL, new_row, primary_key);
error = pack_row(&row, new_row, primary_key);
if (error) { goto cleanup; }
error = share->file->put(share->file, trans, new_key, &row, put_flags);
......@@ -3298,7 +3313,7 @@ int ha_tokudb::update_primary_key(DB_TXN * trans, bool primary_key_changed, cons
}
else {
// Primary key didn't change; just update the row data
error = pack_row(&row, NULL, new_row, primary_key);
error = pack_row(&row, new_row, primary_key);
if (error) { goto cleanup; }
//
......@@ -3437,7 +3452,7 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
put_flags = DB_YESOVERWRITE;
if (table->key_info[keynr].flags & HA_CLUSTERING) {
error = pack_row(&row, NULL, (const uchar *) new_row, keynr);
error = pack_row(&row, (const uchar *) new_row, keynr);
if (error){ goto cleanup; }
}
else {
......@@ -6140,7 +6155,7 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
}
if (key_info[i].flags & HA_CLUSTERING) {
if ((error = pack_row(&row, NULL, (const uchar *) tmp_record, curr_index))){
if ((error = pack_row(&row, (const uchar *) tmp_record, curr_index))){
goto cleanup;
}
error = share->key_file[curr_index]->put(share->key_file[curr_index], txn, &secondary_key, &row, put_flags);
......
......@@ -97,8 +97,15 @@ typedef enum {
} TABLE_LOCK_TYPE;
int create_tokudb_trx_data_instance(tokudb_trx_data** out_trx);
int generate_keys_vals_for_put(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra);
int cleanup_keys_vals_for_put(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra);
int generate_row_for_put(
DB *dest_db,
DB *src_db,
DBT *dest_key,
DBT *dest_val,
const DBT *src_key,
const DBT *src_val,
void *extra
);
class ha_tokudb : public handler {
......@@ -152,6 +159,9 @@ private:
//
uchar* mult_key_buff[MAX_KEY];
uchar* mult_rec_buff[MAX_KEY];
DBT mult_key_dbt[MAX_KEY + 1];
DBT mult_rec_dbt[MAX_KEY + 1];
ulong alloced_mult_rec_buff_length;
//
......@@ -241,7 +251,6 @@ private:
ulong max_row_length(const uchar * buf);
int pack_row(
DBT * row,
uchar* buf,
const uchar* record,
uint index
);
......@@ -291,7 +300,7 @@ private:
int is_val_unique(bool* is_unique, uchar* record, KEY* key_info, uint dict_index, DB_TXN* txn);
int do_uniqueness_checks(uchar* record, DB_TXN* txn, THD* thd);
int insert_rows_to_dictionaries(uchar* record, DBT* pk_key, DBT* pk_val, DB_TXN* txn);
int insert_rows_to_dictionaries_mult(uchar* row, u_int32_t row_size, DB_TXN* txn, THD* thd);
int insert_rows_to_dictionaries_mult(DBT* pk_key, DBT* pk_val, DB_TXN* txn, THD* thd);
int test_row_packing(uchar* record, DBT* pk_key, DBT* pk_val);
......
......@@ -2044,7 +2044,7 @@ u_int32_t pack_clustering_val_from_desc(
uchar* buf,
void* row_desc,
u_int32_t row_desc_size,
DBT* pk_val
const DBT* pk_val
)
{
uchar* null_bytes_src_ptr = NULL;
......@@ -2521,8 +2521,8 @@ u_int32_t pack_key_from_desc(
uchar* buf,
void* row_desc,
u_int32_t row_desc_size,
DBT* pk_key,
DBT* pk_val
const DBT* pk_key,
const DBT* pk_val
)
{
MULTI_COL_PACK_INFO mcp_info;
......
......@@ -238,7 +238,7 @@ u_int32_t pack_clustering_val_from_desc(
uchar* buf,
void* row_desc,
u_int32_t row_desc_size,
DBT* pk_val
const DBT* pk_val
);
u_int32_t get_max_secondary_key_pack_desc_size(
......@@ -275,8 +275,8 @@ u_int32_t pack_key_from_desc(
uchar* buf,
void* row_desc,
u_int32_t row_desc_size,
DBT* pk_key,
DBT* pk_val
const DBT* pk_key,
const DBT* pk_val
);
......
......@@ -273,7 +273,7 @@ static int tokudb_init_func(void *p) {
if (tokudb_debug & TOKUDB_DEBUG_INIT) TOKUDB_TRACE("%s:env open:flags=%x\n", __FUNCTION__, tokudb_init_flags);
r = db_env->set_multiple_callbacks(db_env, generate_keys_vals_for_put, cleanup_keys_vals_for_put, NULL, NULL);
r = db_env->set_generate_row_callback_for_put(db_env,generate_row_for_put);
assert(!r);
r = db_env->open(db_env, tokudb_home, tokudb_init_flags, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment