Commit 2cfb7b0a authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

refs #6022 implement cardinality on the mainline

git-svn-id: file:///svn/mysql/tokudb-engine/tokudb-engine@53943 c7de825b-a66e-492c-adef-691d508d4ae1
parent dc9b51bd
...@@ -218,11 +218,11 @@ static int free_share(TOKUDB_SHARE * share, bool mutex_is_locked) { ...@@ -218,11 +218,11 @@ static int free_share(TOKUDB_SHARE * share, bool mutex_is_locked) {
result = error; result = error;
} }
my_hash_delete(&tokudb_open_tables, (uchar *) share); my_hash_delete(&tokudb_open_tables, (uchar *) share);
thr_lock_delete(&share->lock); thr_lock_delete(&share->lock);
pthread_mutex_destroy(&share->mutex); pthread_mutex_destroy(&share->mutex);
rwlock_destroy(&share->num_DBs_lock); rwlock_destroy(&share->num_DBs_lock);
my_free((uchar *) share, MYF(0)); my_free((uchar *) share, MYF(0));
} }
pthread_mutex_unlock(&tokudb_mutex); pthread_mutex_unlock(&tokudb_mutex);
...@@ -5918,10 +5918,14 @@ int ha_tokudb::info(uint flag) { ...@@ -5918,10 +5918,14 @@ int ha_tokudb::info(uint flag) {
} }
if ((flag & HA_STATUS_CONST)) { if ((flag & HA_STATUS_CONST)) {
stats.max_data_file_length= 9223372036854775807ULL; stats.max_data_file_length= 9223372036854775807ULL;
for (uint i = 0; i < table_share->keys; i++) { uint64_t rec_per_key[table_share->key_parts];
bool is_unique_key = (i == primary_key) || (table->key_info[i].flags & HA_NOSAME); error = share->get_card_from_status(txn, table_share->key_parts, rec_per_key);
ulong val = (is_unique_key) ? 1 : 0; if (error == 0) {
table->key_info[i].rec_per_key[get_key_parts(&table->key_info[i]) - 1] = val; share->set_card_in_key_info(table, table_share->key_parts, rec_per_key);
} else {
for (uint i = 0; i < table_share->key_parts; i++)
rec_per_key[i] = 0;
share->set_card_in_key_info(table, table_share->key_parts, rec_per_key);
} }
} }
/* Don't return key if we got an error for the internal primary key */ /* Don't return key if we got an error for the internal primary key */
...@@ -8277,12 +8281,12 @@ Item* ha_tokudb::idx_cond_push(uint keyno_arg, Item* idx_cond_arg) { ...@@ -8277,12 +8281,12 @@ Item* ha_tokudb::idx_cond_push(uint keyno_arg, Item* idx_cond_arg) {
return idx_cond_arg; return idx_cond_arg;
} }
// table admin // table admin
#include "tokudb_card.cc"
#include "ha_tokudb_admin.cc" #include "ha_tokudb_admin.cc"
// update functions // update functions
#include "ha_tokudb_update_fun.cc" #include "tokudb_update_fun.cc"
// fast updates // fast updates
#include "ha_tokudb_update.cc" #include "ha_tokudb_update.cc"
...@@ -8291,21 +8295,23 @@ Item* ha_tokudb::idx_cond_push(uint keyno_arg, Item* idx_cond_arg) { ...@@ -8291,21 +8295,23 @@ Item* ha_tokudb::idx_cond_push(uint keyno_arg, Item* idx_cond_arg) {
#include "ha_tokudb_alter_55.cc" #include "ha_tokudb_alter_55.cc"
#include "ha_tokudb_alter_56.cc" #include "ha_tokudb_alter_56.cc"
// maria mrr // mrr
#ifdef MARIADB_BASE_VERSION #ifdef MARIADB_BASE_VERSION
#include "ha_tokudb_mrr_maria.cc" #include "ha_tokudb_mrr_maria.cc"
#endif #elif 50600 <= MYSQL_VERSION_ID && MYSQL_VERSION_ID <= 50699
#if !defined(MARIADB_BASE_VERSION)
#if 50600 <= MYSQL_VERSION_ID && MYSQL_VERSION_ID <= 50699
#include "ha_tokudb_mrr_mysql.cc" #include "ha_tokudb_mrr_mysql.cc"
#endif #endif
#endif
// key comparisons // key comparisons
#include "hatoku_cmp.cc" #include "hatoku_cmp.cc"
// handlerton // handlerton
#include "hatoku_hton.cc" #include "hatoku_hton.cc"
// generate template functions
namespace tokudb {
template size_t vlq_encode_ui(uint32_t n, void *p, size_t s);
template size_t vlq_decode_ui(uint32_t *np, void *p, size_t s);
template size_t vlq_encode_ui(uint64_t n, void *p, size_t s);
template size_t vlq_decode_ui(uint64_t *np, void *p, size_t s);
};
...@@ -4,6 +4,28 @@ ...@@ -4,6 +4,28 @@
#include <db.h> #include <db.h>
#include "hatoku_cmp.h" #include "hatoku_cmp.h"
#define HA_TOKU_ORIG_VERSION 4
#define HA_TOKU_VERSION 4
//
// no capabilities yet
//
#define HA_TOKU_CAP 0
//
// These are keys that will be used for retrieving metadata in status.tokudb
// To get the version, one looks up the value associated with key hatoku_version
// in status.tokudb
//
typedef ulonglong HA_METADATA_KEY;
#define hatoku_old_version 0
#define hatoku_capabilities 1
#define hatoku_max_ai 2 //maximum auto increment value found so far
#define hatoku_ai_create_value 3
#define hatoku_key_name 4
#define hatoku_frm_data 5
#define hatoku_new_version 6
#define hatoku_cardinality 7
class ha_tokudb; class ha_tokudb;
typedef struct loader_context { typedef struct loader_context {
...@@ -28,7 +50,8 @@ typedef struct hot_optimize_context { ...@@ -28,7 +50,8 @@ typedef struct hot_optimize_context {
// Some of the variables here are the DB* pointers to indexes, // Some of the variables here are the DB* pointers to indexes,
// and auto increment information. // and auto increment information.
// //
typedef struct st_tokudb_share { class TOKUDB_SHARE {
public:
char *table_name; char *table_name;
uint table_name_length, use_count; uint table_name_length, use_count;
pthread_mutex_t mutex; pthread_mutex_t mutex;
...@@ -82,29 +105,30 @@ typedef struct st_tokudb_share { ...@@ -82,29 +105,30 @@ typedef struct st_tokudb_share {
bool replace_into_fast; bool replace_into_fast;
rw_lock_t num_DBs_lock; rw_lock_t num_DBs_lock;
uint32_t num_DBs; uint32_t num_DBs;
} TOKUDB_SHARE;
#define HA_TOKU_ORIG_VERSION 4 // Set the key_info cardinality counters for the table.
#define HA_TOKU_VERSION 4 void set_card_in_key_info(TABLE *table, uint rec_per_keys, uint64_t rec_per_key[]);
//
// no capabilities yet
//
#define HA_TOKU_CAP 0
// // Put the cardinality counters into the status dictionary.
// These are keys that will be used for retrieving metadata in status.tokudb void set_card_in_status(DB_TXN *txn, uint rec_per_keys, uint64_t rec_per_key[]);
// To get the version, one looks up the value associated with key hatoku_version
// in status.tokudb
//
typedef ulonglong HA_METADATA_KEY; // Get the cardinality counters from the status dictionary.
#define hatoku_old_version 0 int get_card_from_status(DB_TXN *txn, uint rec_per_keys, uint64_t rec_per_key[]);
#define hatoku_capabilities 1
#define hatoku_max_ai 2 //maximum auto increment value found so far // Delete the cardinality counters from the status dictionary.
#define hatoku_ai_create_value 3 void delete_card_from_status(DB_TXN *txn);
#define hatoku_key_name 4
#define hatoku_frm_data 5 // Get the val for a given key in the status dictionary.
#define hatoku_new_version 6 // Returns 0 if successful.
int get_status(DB_TXN *txn, HA_METADATA_KEY k, DBT *val);
int get_status(DB_TXN *txn, HA_METADATA_KEY k, void *p, size_t s);
// Put a val for a given key into the status dictionary.
int put_status(DB_TXN *txn, HA_METADATA_KEY k, void *p, size_t s);
// Delete a key from the status dictionary.
int delete_status(DB_TXN *txn, HA_METADATA_KEY k);
};
typedef struct st_filter_key_part_info { typedef struct st_filter_key_part_info {
uint offset; uint offset;
...@@ -472,6 +496,7 @@ public: ...@@ -472,6 +496,7 @@ public:
int optimize(THD * thd, HA_CHECK_OPT * check_opt); int optimize(THD * thd, HA_CHECK_OPT * check_opt);
#if TOKU_INCLUDE_ANALYZE #if TOKU_INCLUDE_ANALYZE
int analyze(THD * thd, HA_CHECK_OPT * check_opt); int analyze(THD * thd, HA_CHECK_OPT * check_opt);
int analyze_key(THD *thd, DB_TXN *txn, uint key_i, KEY *key_info, uint64_t num_key_parts, uint64_t *rec_per_key_part);
#endif #endif
int write_row(uchar * buf); int write_row(uchar * buf);
int update_row(const uchar * old_data, uchar * new_data); int update_row(const uchar * old_data, uchar * new_data);
......
#if TOKU_INCLUDE_ANALYZE #if TOKU_INCLUDE_ANALYZE
volatile int ha_tokudb_analyze_wait = 0; // debug
int ha_tokudb::analyze(THD *thd, HA_CHECK_OPT *check_opt) { int ha_tokudb::analyze(THD *thd, HA_CHECK_OPT *check_opt) {
TOKUDB_DBUG_ENTER("ha_tokudb::analyze"); TOKUDB_DBUG_ENTER("ha_tokudb::analyze");
TOKUDB_DBUG_RETURN(HA_ADMIN_OK); while (ha_tokudb_analyze_wait) sleep(1); // debug concurrency issues
uint64_t rec_per_key[table_share->key_parts];
int result = HA_ADMIN_OK;
DB_TXN *txn = transaction;
if (!txn)
result = HA_ADMIN_FAILED;
if (result == HA_ADMIN_OK) {
uint next_key_part = 0;
// compute cardinality for each key
for (uint i = 0; result == HA_ADMIN_OK && i < table_share->keys; i++) {
KEY *key_info = &table_share->key_info[i];
uint64_t num_key_parts = get_key_parts(key_info);
int error = analyze_key(thd, txn, i, key_info, num_key_parts, &rec_per_key[next_key_part]);
if (error) {
result = HA_ADMIN_FAILED;
} else {
// debug
if (tokudb_debug & TOKUDB_DEBUG_ANALYZE) {
fprintf(stderr, "ha_tokudb::analyze %s.%s.%s ",
table_share->db.str, table_share->table_name.str, i == primary_key ? "primary" : table_share->key_info[i].name);
for (uint j = 0; j < num_key_parts; j++)
fprintf(stderr, "%lu ", rec_per_key[next_key_part+j]);
fprintf(stderr, "\n");
}
}
next_key_part += num_key_parts;
}
}
if (result == HA_ADMIN_OK)
share->set_card_in_status(txn, table_share->key_parts, rec_per_key);
TOKUDB_DBUG_RETURN(result);
}
// Compute records per key for all key parts of the ith key of the table.
// For each key part, put records per key part in *rec_per_key_part[key_part_index].
// Returns 0 if success, otherwise an error number.
// TODO statistical dives into the FT
int ha_tokudb::analyze_key(THD *thd, DB_TXN *txn, uint key_i, KEY *key_info, uint64_t num_key_parts, uint64_t *rec_per_key_part) {
TOKUDB_DBUG_ENTER("ha_tokudb::analyze_key");
int error = 0;
DB *db = share->key_file[key_i];
DBC *cursor = NULL;
error = db->cursor(db, txn, &cursor, 0);
if (error == 0) {
uint64_t rows = 0;
uint64_t unique_rows[num_key_parts];
for (uint64_t i = 0; i < num_key_parts; i++)
unique_rows[i] = 1;
// stop looking when the entire dictionary was analyzed, or a cap on execution time was reached, or the analyze was killed.
DBT key = {}; key.flags = DB_DBT_REALLOC;
DBT prev_key = {}; prev_key.flags = DB_DBT_REALLOC;
time_t t_start = time(0);
while (1) {
error = cursor->c_get(cursor, &key, 0, DB_NEXT);
if (error != 0) {
if (error == DB_NOTFOUND)
error = 0; // eof is not an error
break;
}
rows++;
// first row is a unique row, otherwise compare with the previous key
bool copy_key = false;
if (rows == 1) {
copy_key = true;
} else {
// compare this key with the previous key. ignore appended PK for SK's.
// TODO if a prefix is different, then all larger keys that include the prefix are also different.
// TODO if we are comparing the entire primary key or the entire unique secondary key, then the cardinality must be 1,
// so we can avoid computing it.
for (uint64_t i = 0; i < num_key_parts; i++) {
int cmp = tokudb_cmp_dbt_key_parts(db, &prev_key, &key, i+1);
if (cmp != 0) {
unique_rows[i]++;
copy_key = true;
}
}
}
// prev_key = key
if (copy_key) {
prev_key.data = realloc(prev_key.data, key.size);
assert(prev_key.data);
prev_key.size = key.size;
memcpy(prev_key.data, key.data, prev_key.size);
}
// check for limit
if ((rows % 1000) == 0) {
if (thd->killed) {
error = ER_ABORTING_CONNECTION;
break;
}
time_t t_now = time(0);
time_t t_limit = get_analyze_time(thd);
if (t_limit > 0 && t_now - t_start > t_limit)
break;
float progress_rows = 0.0;
if (share->rows > 0)
progress_rows = (float) rows / (float) share->rows;
float progress_time = 0.0;
if (t_limit > 0)
progress_time = (float) (t_now - t_start) / (float) t_limit;
sprintf(write_status_msg, "%s.%s.%s %u of %u %.lf%% rows %.lf%% time",
table_share->db.str, table_share->table_name.str, key_i == primary_key ? "primary" : table_share->key_info[key_i].name,
key_i, table_share->keys, progress_rows * 100.0, progress_time * 100.0);
thd_proc_info(thd, write_status_msg);
}
}
// cleanup
free(key.data);
free(prev_key.data);
int close_error = cursor->c_close(cursor);
assert(close_error == 0);
// return cardinality
if (error == 0) {
for (uint64_t i = 0; i < num_key_parts; i++)
rec_per_key_part[i] = rows / unique_rows[i];
}
}
TOKUDB_DBUG_RETURN(error);
} }
#endif #endif
...@@ -102,7 +221,6 @@ static void ha_tokudb_check_info(THD *thd, TABLE *table, const char *msg) { ...@@ -102,7 +221,6 @@ static void ha_tokudb_check_info(THD *thd, TABLE *table, const char *msg) {
} }
} }
volatile int ha_tokudb_check_verbose = 0; // debug
volatile int ha_tokudb_check_wait = 0; // debug volatile int ha_tokudb_check_wait = 0; // debug
int ha_tokudb::check(THD *thd, HA_CHECK_OPT *check_opt) { int ha_tokudb::check(THD *thd, HA_CHECK_OPT *check_opt) {
...@@ -128,38 +246,32 @@ int ha_tokudb::check(THD *thd, HA_CHECK_OPT *check_opt) { ...@@ -128,38 +246,32 @@ int ha_tokudb::check(THD *thd, HA_CHECK_OPT *check_opt) {
result = HA_ADMIN_INTERNAL_ERROR; result = HA_ADMIN_INTERNAL_ERROR;
if (result == HA_ADMIN_OK) { if (result == HA_ADMIN_OK) {
uint32_t num_DBs = table_share->keys + test(hidden_primary_key); uint32_t num_DBs = table_share->keys + test(hidden_primary_key);
time_t now;
char timebuf[32];
snprintf(write_status_msg, sizeof write_status_msg, "%s primary=%d num=%d", share->table_name, primary_key, num_DBs); snprintf(write_status_msg, sizeof write_status_msg, "%s primary=%d num=%d", share->table_name, primary_key, num_DBs);
if (ha_tokudb_check_verbose) { if (tokudb_debug & TOKUDB_DEBUG_CHECK) {
ha_tokudb_check_info(thd, table, write_status_msg); ha_tokudb_check_info(thd, table, write_status_msg);
now = time(0); time_t now = time(0);
char timebuf[32];
fprintf(stderr, "%.24s ha_tokudb::check %s\n", ctime_r(&now, timebuf), write_status_msg); fprintf(stderr, "%.24s ha_tokudb::check %s\n", ctime_r(&now, timebuf), write_status_msg);
} }
for (uint i = 0; i < num_DBs; i++) { for (uint i = 0; i < num_DBs; i++) {
time_t now;
DB *db = share->key_file[i]; DB *db = share->key_file[i];
const char *kname = NULL; const char *kname = i == primary_key ? "primary" : table_share->key_info[i].name;
if (i == primary_key) {
kname = "primary"; // hidden primary key does not set name
}
else {
kname = table_share->key_info[i].name;
}
snprintf(write_status_msg, sizeof write_status_msg, "%s key=%s %u", share->table_name, kname, i); snprintf(write_status_msg, sizeof write_status_msg, "%s key=%s %u", share->table_name, kname, i);
thd_proc_info(thd, write_status_msg); thd_proc_info(thd, write_status_msg);
if (ha_tokudb_check_verbose) { if (tokudb_debug & TOKUDB_DEBUG_CHECK) {
ha_tokudb_check_info(thd, table, write_status_msg); ha_tokudb_check_info(thd, table, write_status_msg);
now = time(0); time_t now = time(0);
char timebuf[32];
fprintf(stderr, "%.24s ha_tokudb::check %s\n", ctime_r(&now, timebuf), write_status_msg); fprintf(stderr, "%.24s ha_tokudb::check %s\n", ctime_r(&now, timebuf), write_status_msg);
} }
struct check_context check_context = { thd }; struct check_context check_context = { thd };
r = db->verify_with_progress(db, ha_tokudb_check_progress, &check_context, ha_tokudb_check_verbose, keep_going); r = db->verify_with_progress(db, ha_tokudb_check_progress, &check_context, (tokudb_debug & TOKUDB_DEBUG_CHECK) != 0, keep_going);
snprintf(write_status_msg, sizeof write_status_msg, "%s key=%s %u result=%d", share->table_name, kname, i, r); snprintf(write_status_msg, sizeof write_status_msg, "%s key=%s %u result=%d", share->table_name, kname, i, r);
thd_proc_info(thd, write_status_msg); thd_proc_info(thd, write_status_msg);
if (ha_tokudb_check_verbose) { if (tokudb_debug & TOKUDB_DEBUG_CHECK) {
ha_tokudb_check_info(thd, table, write_status_msg); ha_tokudb_check_info(thd, table, write_status_msg);
now = time(0); time_t now = time(0);
char timebuf[32];
fprintf(stderr, "%.24s ha_tokudb::check %s\n", ctime_r(&now, timebuf), write_status_msg); fprintf(stderr, "%.24s ha_tokudb::check %s\n", ctime_r(&now, timebuf), write_status_msg);
} }
if (result == HA_ADMIN_OK && r != 0) { if (result == HA_ADMIN_OK && r != 0) {
......
...@@ -420,6 +420,9 @@ int ha_tokudb::alter_table_add_index(TABLE *altered_table, Alter_inplace_info *h ...@@ -420,6 +420,9 @@ int ha_tokudb::alter_table_add_index(TABLE *altered_table, Alter_inplace_info *h
} }
my_free(key_info); my_free(key_info);
if (error == 0)
share->delete_card_from_status(ctx->alter_txn);
return error; return error;
} }
...@@ -465,6 +468,9 @@ int ha_tokudb::alter_table_drop_index(TABLE *altered_table, Alter_inplace_info * ...@@ -465,6 +468,9 @@ int ha_tokudb::alter_table_drop_index(TABLE *altered_table, Alter_inplace_info *
int error = drop_indexes(table, index_drop_offsets, ha_alter_info->index_drop_count, key_info, ctx->alter_txn); int error = drop_indexes(table, index_drop_offsets, ha_alter_info->index_drop_count, key_info, ctx->alter_txn);
if (error == 0)
share->delete_card_from_status(ctx->alter_txn);
return error; return error;
} }
......
...@@ -1577,6 +1577,109 @@ int tokudb_prefix_cmp_dbt_key(DB *file, const DBT *keya, const DBT *keyb) { ...@@ -1577,6 +1577,109 @@ int tokudb_prefix_cmp_dbt_key(DB *file, const DBT *keya, const DBT *keyb) {
return cmp; return cmp;
} }
#if TOKU_INCLUDE_ANALYZE
static int tokudb_compare_two_key_parts(
const void* new_key_data,
const uint32_t new_key_size,
const void* saved_key_data,
const uint32_t saved_key_size,
const void* row_desc,
const uint32_t row_desc_size,
uint max_parts
)
{
int ret_val = 0;
uchar* row_desc_ptr = (uchar *)row_desc;
uchar *new_key_ptr = (uchar *)new_key_data;
uchar *saved_key_ptr = (uchar *)saved_key_data;
//
// if the keys have an infinity byte, set it
//
if (row_desc_ptr[0]) {
// new_key_inf_val = (int8_t)new_key_ptr[0];
// saved_key_inf_val = (int8_t)saved_key_ptr[0];
new_key_ptr++;
saved_key_ptr++;
}
row_desc_ptr++;
for (uint i = 0; i < max_parts; i++) {
if (!((uint32_t)(new_key_ptr - (uchar *)new_key_data) < new_key_size &&
(uint32_t)(saved_key_ptr - (uchar *)saved_key_data) < saved_key_size &&
(uint32_t)(row_desc_ptr - (uchar *)row_desc) < row_desc_size))
break;
uint32_t new_key_field_length;
uint32_t saved_key_field_length;
uint32_t row_desc_field_length;
//
// if there is a null byte at this point in the key
//
if (row_desc_ptr[0]) {
//
// compare null bytes. If different, return
//
if (new_key_ptr[0] != saved_key_ptr[0]) {
ret_val = ((int) *new_key_ptr - (int) *saved_key_ptr);
goto exit;
}
saved_key_ptr++;
//
// in case we just read the fact that new_key_ptr and saved_key_ptr
// have NULL as their next field
//
if (!*new_key_ptr++) {
//
// skip row_desc_ptr[0] read in if clause
//
row_desc_ptr++;
//
// skip data that describes rest of field
//
row_desc_ptr += skip_field_in_descriptor(row_desc_ptr);
continue;
}
}
row_desc_ptr++;
ret_val = compare_toku_field(
new_key_ptr,
saved_key_ptr,
row_desc_ptr,
&new_key_field_length,
&saved_key_field_length,
&row_desc_field_length
);
new_key_ptr += new_key_field_length;
saved_key_ptr += saved_key_field_length;
row_desc_ptr += row_desc_field_length;
if (ret_val) {
goto exit;
}
assert((uint32_t)(new_key_ptr - (uchar *)new_key_data) <= new_key_size);
assert((uint32_t)(saved_key_ptr - (uchar *)saved_key_data) <= saved_key_size);
assert((uint32_t)(row_desc_ptr - (uchar *)row_desc) <= row_desc_size);
}
ret_val = 0;
exit:
return ret_val;
}
static int tokudb_cmp_dbt_key_parts(DB *file, const DBT *keya, const DBT *keyb, uint max_parts) {
assert(file->cmp_descriptor->dbt.size);
return tokudb_compare_two_key_parts(
keya->data,
keya->size,
keyb->data,
keyb->size,
(uchar *)file->cmp_descriptor->dbt.data + 4,
(*(uint32_t *)file->cmp_descriptor->dbt.data) - 4,
max_parts);
}
#endif
uint32_t create_toku_main_key_pack_descriptor ( uint32_t create_toku_main_key_pack_descriptor (
uchar* buf uchar* buf
......
...@@ -274,12 +274,25 @@ int tokudb_compare_two_keys( ...@@ -274,12 +274,25 @@ int tokudb_compare_two_keys(
bool cmp_prefix bool cmp_prefix
); );
int tokudb_cmp_dbt_key(DB* db, const DBT *keya, const DBT *keyb); int tokudb_cmp_dbt_key(DB* db, const DBT *keya, const DBT *keyb);
//TODO: QQQ Only do one direction for prefix. //TODO: QQQ Only do one direction for prefix.
int tokudb_prefix_cmp_dbt_key(DB *file, const DBT *keya, const DBT *keyb); int tokudb_prefix_cmp_dbt_key(DB *file, const DBT *keya, const DBT *keyb);
#if TOKU_INCLUDE_ANALYZE
static int tokudb_compare_two_key_parts(
const void* new_key_data,
const uint32_t new_key_size,
const void* saved_key_data,
const uint32_t saved_key_size,
const void* row_desc,
const uint32_t row_desc_size,
uint max_parts
);
static int tokudb_cmp_dbt_key_parts(DB *file, const DBT *keya, const DBT *keyb, uint max_parts);
#endif
int create_toku_key_descriptor( int create_toku_key_descriptor(
uchar* buf, uchar* buf,
bool is_first_hpk, bool is_first_hpk,
......
...@@ -49,6 +49,7 @@ ...@@ -49,6 +49,7 @@
#if DB_TYPE_TOKUDB_DEFINED #if DB_TYPE_TOKUDB_DEFINED
#define TOKU_INCLUDE_EXTENDED_KEYS 1 #define TOKU_INCLUDE_EXTENDED_KEYS 1
#endif #endif
#define TOKU_INCLUDE_ANALYZE 1
#elif 50500 <= MYSQL_VERSION_ID && MYSQL_VERSION_ID <= 50599 #elif 50500 <= MYSQL_VERSION_ID && MYSQL_VERSION_ID <= 50599
#define TOKU_INCLUDE_ALTER_56 1 #define TOKU_INCLUDE_ALTER_56 1
...@@ -61,6 +62,7 @@ ...@@ -61,6 +62,7 @@
#if defined(MARIADB_BASE_VERSION) && DB_TYPE_TOKUDB_DEFINED #if defined(MARIADB_BASE_VERSION) && DB_TYPE_TOKUDB_DEFINED
#define TOKU_INCLUDE_EXTENDED_KEYS 1 #define TOKU_INCLUDE_EXTENDED_KEYS 1
#endif #endif
#define TOKU_INCLUDE_ANALYZE 1
#else #else
...@@ -122,6 +124,8 @@ extern ulong tokudb_debug; ...@@ -122,6 +124,8 @@ extern ulong tokudb_debug;
#define TOKUDB_DEBUG_HIDE_DDL_LOCK_ERRORS 2048 #define TOKUDB_DEBUG_HIDE_DDL_LOCK_ERRORS 2048
#define TOKUDB_DEBUG_ALTER_TABLE_INFO 4096 #define TOKUDB_DEBUG_ALTER_TABLE_INFO 4096
#define TOKUDB_DEBUG_UPSERT 8192 #define TOKUDB_DEBUG_UPSERT 8192
#define TOKUDB_DEBUG_CHECK (1<<14)
#define TOKUDB_DEBUG_ANALYZE (1<<15)
#define TOKUDB_TRACE(f, ...) \ #define TOKUDB_TRACE(f, ...) \
printf("%d:%s:%d:" f, my_tid(), __FILE__, __LINE__, ##__VA_ARGS__); printf("%d:%s:%d:" f, my_tid(), __FILE__, __LINE__, ##__VA_ARGS__);
......
...@@ -155,6 +155,18 @@ static MYSQL_THDVAR_BOOL(disable_slow_upsert, ...@@ -155,6 +155,18 @@ static MYSQL_THDVAR_BOOL(disable_slow_upsert,
false // default false // default
); );
#endif #endif
#if TOKU_INCLUDE_ANALYZE
static MYSQL_THDVAR_UINT(analyze_time,
0,
"analyze time",
NULL,
NULL,
60, // default
0, // min
~0U, // max
1 // blocksize???
);
#endif
static void tokudb_checkpoint_lock(THD * thd); static void tokudb_checkpoint_lock(THD * thd);
static void tokudb_checkpoint_unlock(THD * thd); static void tokudb_checkpoint_unlock(THD * thd);
...@@ -704,14 +716,19 @@ uint get_tokudb_read_buf_size(THD* thd) { ...@@ -704,14 +716,19 @@ uint get_tokudb_read_buf_size(THD* thd) {
} }
#if TOKU_INCLUDE_UPSERT #if TOKU_INCLUDE_UPSERT
bool get_disable_slow_update(THD* thd) { bool get_disable_slow_update(THD *thd) {
return (THDVAR(thd, disable_slow_update) != 0); return (THDVAR(thd, disable_slow_update) != 0);
} }
bool get_disable_slow_upsert(THD* thd) { bool get_disable_slow_upsert(THD *thd) {
return (THDVAR(thd, disable_slow_upsert) != 0); return (THDVAR(thd, disable_slow_upsert) != 0);
} }
#endif #endif
#if TOKU_INCLUDE_ANALYZE
uint get_analyze_time(THD *thd) {
return THDVAR(thd, analyze_time);
}
#endif
typedef struct txn_progress_info { typedef struct txn_progress_info {
char status[200]; char status[200];
...@@ -1912,6 +1929,9 @@ static struct st_mysql_sys_var *tokudb_system_variables[] = { ...@@ -1912,6 +1929,9 @@ static struct st_mysql_sys_var *tokudb_system_variables[] = {
#if TOKU_INCLUDE_UPSERT #if TOKU_INCLUDE_UPSERT
MYSQL_SYSVAR(disable_slow_update), MYSQL_SYSVAR(disable_slow_update),
MYSQL_SYSVAR(disable_slow_upsert), MYSQL_SYSVAR(disable_slow_upsert),
#endif
#if TOKU_INCLUDE_ANALYZE
MYSQL_SYSVAR(analyze_time),
#endif #endif
NULL NULL
}; };
......
...@@ -38,6 +38,9 @@ bool get_disable_slow_update(THD *thd); ...@@ -38,6 +38,9 @@ bool get_disable_slow_update(THD *thd);
bool get_enable_fast_upsert(THD *thd); bool get_enable_fast_upsert(THD *thd);
bool get_disable_slow_upsert(THD *thd); bool get_disable_slow_upsert(THD *thd);
#endif #endif
#if TOKU_INCLUDE_ANALYZE
uint get_analyze_time(THD *thd);
#endif
extern HASH tokudb_open_tables; extern HASH tokudb_open_tables;
extern pthread_mutex_t tokudb_mutex; extern pthread_mutex_t tokudb_mutex;
......
...@@ -17,5 +17,9 @@ check: $(CHECKS) ...@@ -17,5 +17,9 @@ check: $(CHECKS)
true true
%.check: % %.check: %
valgrind ./$< valgrind -q ./$<
max_test.check: max_test
valgrind -q ./$< 1 2
int TOKUDB_SHARE::get_status(DB_TXN *txn, HA_METADATA_KEY k, DBT *val) {
DBT key = { .data = &k, .size = sizeof k };
int error = status_block->get(status_block, txn, &key, val, 0);
return error;
}
int TOKUDB_SHARE::get_status(DB_TXN *txn, HA_METADATA_KEY k, void *p, size_t s) {
DBT key = { .data = &k, .size = sizeof k };
DBT val = { .data = p, .size = (uint32_t) s, }; val.flags = DB_DBT_USERMEM;
int error = status_block->get(status_block, txn, &key, &val, 0);
return error;
}
int TOKUDB_SHARE::put_status(DB_TXN *txn, HA_METADATA_KEY k, void *p, size_t s) {
DBT key = { .data = &k, .size = sizeof k };
DBT val = { .data = p, .size = (uint32_t) s };
int error = status_block->put(status_block, txn, &key, &val, 0);
return error;
}
int TOKUDB_SHARE::delete_status(DB_TXN *txn, HA_METADATA_KEY k) {
DBT key = { .data = &k, .size = sizeof k };
int error = status_block->del(status_block, txn, &key, DB_DELETE_ANY);
return error;
}
void TOKUDB_SHARE::set_card_in_key_info(TABLE *table, uint rec_per_keys, uint64_t rec_per_key[]) {
uint next_key_part = 0;
for (uint i = 0; i < table->s->keys; i++) {
bool is_unique_key = (i == table->s->primary_key) || (table->key_info[i].flags & HA_NOSAME);
uint num_key_parts = get_key_parts(&table->key_info[i]);
for (uint j = 0; j < num_key_parts; j++) {
assert(next_key_part < rec_per_keys);
ulong val = rec_per_key[next_key_part++];
if (is_unique_key && j == num_key_parts-1)
val = 1;
table->key_info[i].rec_per_key[j] = val;
}
}
}
#include "tokudb_buffer.h"
void TOKUDB_SHARE::set_card_in_status(DB_TXN *txn, uint rec_per_keys, uint64_t rec_per_key[]) {
// encode cardinality into the buffer
tokudb::buffer b;
size_t s;
s = b.append_ui<uint32_t>(rec_per_keys);
assert(s > 0);
for (uint i = 0; i < rec_per_keys; i++) {
s = b.append_ui<uint64_t>(rec_per_key[i]);
assert(s > 0);
}
// write cardinality to status
int error = put_status(txn, hatoku_cardinality, b.data(), b.size());
assert(error == 0);
}
int TOKUDB_SHARE::get_card_from_status(DB_TXN *txn, uint rec_per_keys, uint64_t rec_per_key[]) {
// read cardinality from status
DBT val = {}; val.flags = DB_DBT_REALLOC;
int error = get_status(txn, hatoku_cardinality, &val);
if (error == 0) {
// decode cardinality from the buffer
tokudb::buffer b(val.data, 0, val.size);
size_t s;
uint32_t num_parts;
s = b.consume_ui<uint32_t>(&num_parts);
if (s == 0 || num_parts != rec_per_keys)
error = EINVAL;
if (error == 0) {
for (uint i = 0; i < rec_per_keys; i++) {
s = b.consume_ui<uint64_t>(&rec_per_key[i]);
if (s == 0) {
error = EINVAL;
break;
}
}
}
}
// cleanup
free(val.data);
return error;
}
void TOKUDB_SHARE::delete_card_from_status(DB_TXN *txn) {
int error = delete_status(txn, hatoku_cardinality);
assert(error == 0);
}
...@@ -1653,8 +1653,3 @@ int tokudb_update_fun( ...@@ -1653,8 +1653,3 @@ int tokudb_update_fun(
} }
return error; return error;
} }
namespace tokudb {
template size_t vlq_encode_ui(uint32_t n, void *p, size_t s);
template size_t vlq_decode_ui(uint32_t *np, void *p, size_t s);
};
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment