Commit bd026f29 authored by Rik Prohaska's avatar Rik Prohaska

DB-813 add an interrupt callback to the analyze cursor to allow early...

DB-813 add an interrupt callback to the analyze cursor to allow early termination, plus detect garbage rows during analyze scan
parent e28727b5
......@@ -156,18 +156,47 @@ int ha_tokudb::analyze(THD *thd, HA_CHECK_OPT *check_opt) {
bool is_unique = false;
if (i == primary_key || (key_info->flags & HA_NOSAME))
is_unique = true;
uint64_t rows = 0;
uint64_t deleted_rows = 0;
int error = tokudb::analyze_card(share->key_file[i], txn, is_unique, num_key_parts, &rec_per_key[total_key_parts],
tokudb_cmp_dbt_key_parts, analyze_progress, &analyze_progress_extra);
tokudb_cmp_dbt_key_parts, analyze_progress, &analyze_progress_extra,
&rows, &deleted_rows);
sql_print_information("tokudb analyze %d %" PRIu64 " %" PRIu64, error, rows, deleted_rows);
if (error != 0 && error != ETIME) {
result = HA_ADMIN_FAILED;
} else {
// debug
if (tokudb_debug & TOKUDB_DEBUG_ANALYZE) {
TOKUDB_HANDLER_TRACE("%s.%s.%s",
table_share->db.str, table_share->table_name.str, i == primary_key ? "primary" : table_share->key_info[i].name);
for (uint j = 0; j < num_key_parts; j++)
TOKUDB_HANDLER_TRACE("%lu", rec_per_key[total_key_parts+j]);
}
}
if (error != 0 && rows == 0 && deleted_rows > 0) {
result = HA_ADMIN_FAILED;
}
double f = THDVAR(thd, analyze_delete_fraction);
if (result == HA_ADMIN_FAILED || (double) deleted_rows > f * (rows + deleted_rows)) {
char name[256]; int namelen;
namelen = snprintf(name, sizeof name, "%.*s.%.*s.%s",
(int) table_share->db.length, table_share->db.str,
(int) table_share->table_name.length, table_share->table_name.str,
key_name);
thd->protocol->prepare_for_resend();
thd->protocol->store(name, namelen, system_charset_info);
thd->protocol->store("analyze", 7, system_charset_info);
thd->protocol->store("info", 4, system_charset_info);
char rowmsg[256]; int rowmsglen;
rowmsglen = snprintf(rowmsg, sizeof rowmsg, "rows processed %" PRIu64 " rows deleted %" PRIu64, rows, deleted_rows);
thd->protocol->store(rowmsg, rowmsglen, system_charset_info);
thd->protocol->write();
sql_print_information("tokudb analyze on %.*s %.*s",
namelen, name, rowmsglen, rowmsg);
}
if (tokudb_debug & TOKUDB_DEBUG_ANALYZE) {
char name[256]; int namelen;
namelen = snprintf(name, sizeof name, "%.*s.%.*s.%s",
(int) table_share->db.length, table_share->db.str,
(int) table_share->table_name.length, table_share->table_name.str,
key_name);
TOKUDB_HANDLER_TRACE("%.*s rows %" PRIu64 " deleted %" PRIu64,
namelen, name, rows, deleted_rows);
for (uint j = 0; j < num_key_parts; j++)
TOKUDB_HANDLER_TRACE("%lu", rec_per_key[total_key_parts+j]);
}
total_key_parts += num_key_parts;
}
......
......@@ -1454,6 +1454,7 @@ static struct st_mysql_sys_var *tokudb_system_variables[] = {
MYSQL_SYSVAR(disable_slow_upsert),
#endif
MYSQL_SYSVAR(analyze_time),
MYSQL_SYSVAR(analyze_delete_fraction),
MYSQL_SYSVAR(fsync_log_period),
#if TOKU_INCLUDE_HANDLERTON_HANDLE_FATAL_SIGNAL
MYSQL_SYSVAR(gdb_path),
......
......@@ -370,16 +370,9 @@ static MYSQL_THDVAR_BOOL(disable_slow_upsert,
);
#endif
static MYSQL_THDVAR_UINT(analyze_time,
0,
"analyze time",
NULL,
NULL,
5, // default
0, // min
~0U, // max
1 // blocksize
);
static MYSQL_THDVAR_UINT(analyze_time, 0, "analyze time (seconds)", NULL /*check*/, NULL /*update*/, 5 /*default*/, 0 /*min*/, ~0U /*max*/, 1 /*blocksize*/);
static MYSQL_THDVAR_DOUBLE(analyze_delete_fraction, 0, "fraction of rows allowed to be deleted", NULL /*check*/, NULL /*update*/, 1.0 /*def*/, 0 /*min*/, 1.0 /*max*/, 1);
static void tokudb_checkpoint_lock(THD * thd);
static void tokudb_checkpoint_unlock(THD * thd);
......@@ -484,7 +477,7 @@ static int tokudb_killed_callback(void) {
return thd_killed(thd);
}
static bool tokudb_killed_thd_callback(void *extra) {
static bool tokudb_killed_thd_callback(void *extra, uint64_t deleted_rows) {
THD *thd = static_cast<THD *>(extra);
return thd_killed(thd) != 0;
}
......
......@@ -218,15 +218,32 @@ namespace tokudb {
return error;
}
struct analyze_card_cursor_callback_extra {
int (*analyze_progress)(void *extra, uint64_t rows);
void *analyze_extra;
uint64_t *rows;
uint64_t *deleted_rows;
};
bool analyze_card_cursor_callback(void *extra, uint64_t deleted_rows) {
analyze_card_cursor_callback_extra *a_extra = static_cast<analyze_card_cursor_callback_extra *>(extra);
*a_extra->deleted_rows += deleted_rows;
int r = a_extra->analyze_progress(a_extra->analyze_extra, *a_extra->rows);
sql_print_information("tokudb analyze_card_cursor_callback %u %" PRIu64 " %" PRIu64, r, *a_extra->deleted_rows, deleted_rows);
return r != 0;
}
// Compute records per key for all key parts of the ith key of the table.
// For each key part, put records per key part in *rec_per_key_part[key_part_index].
// Returns 0 if success, otherwise an error number.
// TODO statistical dives into the FT
int analyze_card(DB *db, DB_TXN *txn, bool is_unique, uint64_t num_key_parts, uint64_t *rec_per_key_part,
int (*key_compare)(DB *, const DBT *, const DBT *, uint),
int (*analyze_progress)(void *extra, uint64_t rows), void *progress_extra) {
int (*analyze_progress)(void *extra, uint64_t rows), void *progress_extra,
uint64_t *return_rows, uint64_t *return_deleted_rows) {
int error = 0;
uint64_t rows = 0;
uint64_t deleted_rows = 0;
uint64_t unique_rows[num_key_parts];
if (is_unique && num_key_parts == 1) {
// dont compute for unique keys with a single part. we already know the answer.
......@@ -235,6 +252,8 @@ namespace tokudb {
DBC *cursor = NULL;
error = db->cursor(db, txn, &cursor, 0);
if (error == 0) {
analyze_card_cursor_callback_extra e = { analyze_progress, progress_extra, &rows, &deleted_rows };
cursor->c_set_check_interrupt_callback(cursor, analyze_card_cursor_callback, &e);
for (uint64_t i = 0; i < num_key_parts; i++)
unique_rows[i] = 1;
// stop looking when the entire dictionary was analyzed, or a cap on execution time was reached, or the analyze was killed.
......@@ -243,8 +262,8 @@ namespace tokudb {
while (1) {
error = cursor->c_get(cursor, &key, 0, DB_NEXT);
if (error != 0) {
if (error == DB_NOTFOUND)
error = 0; // eof is not an error
if (error == DB_NOTFOUND || error == TOKUDB_INTERRUPTED)
error = 0; // not an error
break;
}
rows++;
......@@ -287,10 +306,12 @@ namespace tokudb {
}
}
// return cardinality
if (error == 0 || error == ETIME) {
for (uint64_t i = 0; i < num_key_parts; i++)
rec_per_key_part[i] = rows / unique_rows[i];
}
if (return_rows)
*return_rows = rows;
if (return_deleted_rows)
*return_deleted_rows = deleted_rows;
for (uint64_t i = 0; i < num_key_parts; i++)
rec_per_key_part[i] = rows / unique_rows[i];
return error;
}
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment