Commit 23480a9a authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

addresses #1149

move partial refactoring to main line

git-svn-id: file:///svn/mysql/tokudb-engine/src@7989 c7de825b-a66e-492c-adef-691d508d4ae1
parent 56b5fda5
......@@ -9,16 +9,6 @@
#error
#endif
unsigned long my_getphyspages() {
return sysconf(_SC_PHYS_PAGES);
}
#include <syscall.h>
unsigned int my_tid() {
return syscall(__NR_gettid);
}
static inline void *thd_data_get(THD *thd, int slot) {
#if MYSQL_VERSION_ID <= 50123
return thd->ha_data[slot];
......@@ -52,250 +42,10 @@ static inline void thd_data_set(THD *thd, int slot, void *data) {
#include "hatoku_defines.h"
#include "ha_tokudb.h"
#include "hatoku_hton.h"
#include "hatoku_cmptrace.h"
#include <mysql/plugin.h>
static handler *tokudb_create_handler(handlerton * hton, TABLE_SHARE * table, MEM_ROOT * mem_root);
handlerton *tokudb_hton;
typedef struct st_tokudb_trx_data {
DB_TXN *all;
DB_TXN *stmt;
DB_TXN *sp_level;
uint tokudb_lock_count;
HA_TOKU_ISO_LEVEL iso_level;
} tokudb_trx_data;
const char *ha_tokudb_ext = ".tokudb";
//static my_bool tokudb_shared_data = FALSE;
static u_int32_t tokudb_init_flags =
DB_CREATE | DB_THREAD | DB_PRIVATE |
DB_INIT_LOCK |
DB_INIT_MPOOL |
DB_INIT_TXN |
0 | // disabled for 1.0.2 DB_INIT_LOG |
0; // disabled for 1.0.1 DB_RECOVER;
static u_int32_t tokudb_env_flags = DB_LOG_AUTOREMOVE;
//static u_int32_t tokudb_lock_type = DB_LOCK_DEFAULT;
//static ulong tokudb_log_buffer_size = 0;
//static ulong tokudb_log_file_size = 0;
static ulonglong tokudb_cache_size = 0;
static uint tokudb_cache_memory_percent = 50;
static char *tokudb_home;
//static char *tokudb_tmpdir;
static char *tokudb_data_dir;
static char *tokudb_log_dir;
//static long tokudb_lock_scan_time = 0;
//static ulong tokudb_region_size = 0;
//static ulong tokudb_cache_parts = 1;
static ulong tokudb_max_lock;
static ulong tokudb_debug;
#ifdef TOKUDB_VERSION
static char *tokudb_version = TOKUDB_VERSION;
#else
static char *tokudb_version;
#endif
static DB_ENV *db_env;
static const char tokudb_hton_name[] = "TokuDB";
static const int tokudb_hton_name_length = sizeof(tokudb_hton_name) - 1;
// thread variables
static MYSQL_THDVAR_BOOL(commit_sync, PLUGIN_VAR_THDLOCAL, "sync on txn commit",
/* check */ NULL, /* update */ NULL, /* default*/ TRUE);
static void tokudb_print_error(const DB_ENV * db_env, const char *db_errpfx, const char *buffer);
static void tokudb_cleanup_log_files(void);
static TOKUDB_SHARE *get_share(const char *table_name, TABLE * table);
static int free_share(TOKUDB_SHARE * share, TABLE * table, uint hidden_primary_key, bool mutex_is_locked);
static int tokudb_end(handlerton * hton, ha_panic_function type);
static bool tokudb_flush_logs(handlerton * hton);
static bool tokudb_show_status(handlerton * hton, THD * thd, stat_print_fn * print, enum ha_stat_type);
static int tokudb_close_connection(handlerton * hton, THD * thd);
static int tokudb_commit(handlerton * hton, THD * thd, bool all);
static int tokudb_rollback(handlerton * hton, THD * thd, bool all);
static uint tokudb_alter_table_flags(uint flags);
#if 0
static int tokudb_rollback_to_savepoint(handlerton * hton, THD * thd, void *savepoint);
static int tokudb_savepoint(handlerton * hton, THD * thd, void *savepoint);
static int tokudb_release_savepoint(handlerton * hton, THD * thd, void *savepoint);
#endif
static bool tokudb_show_logs(THD * thd, stat_print_fn * stat_print);
static HASH tokudb_open_tables;
pthread_mutex_t tokudb_mutex;
static uchar *tokudb_get_key(TOKUDB_SHARE * share, size_t * length, my_bool not_used __attribute__ ((unused))) {
*length = share->table_name_length;
return (uchar *) share->table_name;
}
static int tokudb_init_func(void *p) {
TOKUDB_DBUG_ENTER("tokudb_init_func");
tokudb_hton = (handlerton *) p;
VOID(pthread_mutex_init(&tokudb_mutex, MY_MUTEX_INIT_FAST));
(void) hash_init(&tokudb_open_tables, system_charset_info, 32, 0, 0, (hash_get_key) tokudb_get_key, 0, 0);
tokudb_hton->state = SHOW_OPTION_YES;
// tokudb_hton->flags= HTON_CAN_RECREATE; // QQQ this came from skeleton
tokudb_hton->flags = HTON_CLOSE_CURSORS_AT_COMMIT | HTON_FLUSH_AFTER_RENAME;
#ifdef DB_TYPE_TOKUDB
tokudb_hton->db_type = DB_TYPE_TOKUDB;
#else
tokudb_hton->db_type = DB_TYPE_UNKNOWN;
#endif
tokudb_hton->create = tokudb_create_handler;
tokudb_hton->close_connection = tokudb_close_connection;
#if 0
tokudb_hton->savepoint_offset = sizeof(DB_TXN *);
tokudb_hton->savepoint_set = tokudb_savepoint;
tokudb_hton->savepoint_rollback = tokudb_rollback_to_savepoint;
tokudb_hton->savepoint_release = tokudb_release_savepoint;
#endif
tokudb_hton->commit = tokudb_commit;
tokudb_hton->rollback = tokudb_rollback;
tokudb_hton->panic = tokudb_end;
tokudb_hton->flush_logs = tokudb_flush_logs;
tokudb_hton->show_status = tokudb_show_status;
tokudb_hton->alter_table_flags = tokudb_alter_table_flags;
#if 0
if (!tokudb_tmpdir)
tokudb_tmpdir = mysql_tmpdir;
DBUG_PRINT("info", ("tokudb_tmpdir: %s", tokudb_tmpdir));
#endif
if (!tokudb_home)
tokudb_home = mysql_real_data_home;
DBUG_PRINT("info", ("tokudb_home: %s", tokudb_home));
#if 0
if (!tokudb_log_buffer_size) { // QQQ
tokudb_log_buffer_size = max(table_cache_size * 512, 32 * 1024);
DBUG_PRINT("info", ("computing tokudb_log_buffer_size %ld\n", tokudb_log_buffer_size));
}
tokudb_log_file_size = tokudb_log_buffer_size * 4;
tokudb_log_file_size = MY_ALIGN(tokudb_log_file_size, 1024 * 1024L);
tokudb_log_file_size = max(tokudb_log_file_size, 10 * 1024 * 1024L);
DBUG_PRINT("info", ("computing tokudb_log_file_size: %ld\n", tokudb_log_file_size));
#endif
int r;
if ((r = db_env_create(&db_env, 0))) {
DBUG_PRINT("info", ("db_env_create %d\n", r));
goto error;
}
DBUG_PRINT("info", ("tokudb_env_flags: 0x%x\n", tokudb_env_flags));
r = db_env->set_flags(db_env, tokudb_env_flags, 1);
if (r) { // QQQ
if (tokudb_debug & TOKUDB_DEBUG_INIT)
TOKUDB_TRACE("%s:WARNING: flags=%x r=%d\n", __FUNCTION__, tokudb_env_flags, r);
// goto error;
}
// config error handling
db_env->set_errcall(db_env, tokudb_print_error);
db_env->set_errpfx(db_env, "TokuDB");
// config directories
#if 0
DBUG_PRINT("info", ("tokudb_tmpdir: %s\n", tokudb_tmpdir));
db_env->set_tmp_dir(db_env, tokudb_tmpdir);
#endif
{
char *data_dir = tokudb_data_dir;
if (data_dir == 0)
data_dir = mysql_data_home;
DBUG_PRINT("info", ("tokudb_data_dir: %s\n", data_dir));
db_env->set_data_dir(db_env, data_dir);
}
if (tokudb_log_dir) {
DBUG_PRINT("info", ("tokudb_log_dir: %s\n", tokudb_log_dir));
db_env->set_lg_dir(db_env, tokudb_log_dir);
}
// config the cache table
if (tokudb_cache_size == 0) {
unsigned long pagesize = my_getpagesize();
unsigned long long npages = my_getphyspages();
unsigned long long physmem = npages * pagesize;
tokudb_cache_size = (ulonglong) (physmem * (tokudb_cache_memory_percent / 100.0));
}
if (tokudb_cache_size) {
DBUG_PRINT("info", ("tokudb_cache_size: %lld\n", tokudb_cache_size));
r = db_env->set_cachesize(db_env, (u_int32_t)(tokudb_cache_size >> 30), (u_int32_t)(tokudb_cache_size % (1024L * 1024L * 1024L)), 1);
if (r) {
DBUG_PRINT("info", ("set_cachesize %d\n", r));
goto error;
}
}
u_int32_t gbytes, bytes; int parts;
r = db_env->get_cachesize(db_env, &gbytes, &bytes, &parts);
if (r == 0)
if (tokudb_debug & TOKUDB_DEBUG_INIT)
TOKUDB_TRACE("%s:tokudb_cache_size=%lld\n", __FUNCTION__, ((unsigned long long) gbytes << 30) + bytes);
#if 0
// QQQ config the logs
DBUG_PRINT("info", ("tokudb_log_file_size: %ld\n", tokudb_log_file_size));
db_env->set_lg_max(db_env, tokudb_log_file_size);
DBUG_PRINT("info", ("tokudb_log_buffer_size: %ld\n", tokudb_log_buffer_size));
db_env->set_lg_bsize(db_env, tokudb_log_buffer_size);
// DBUG_PRINT("info",("tokudb_region_size: %ld\n", tokudb_region_size));
// db_env->set_lg_regionmax(db_env, tokudb_region_size);
#endif
// config the locks
#if 0 // QQQ no lock types yet
DBUG_PRINT("info", ("tokudb_lock_type: 0x%lx\n", tokudb_lock_type));
db_env->set_lk_detect(db_env, tokudb_lock_type);
#endif
if (tokudb_max_lock) {
DBUG_PRINT("info",("tokudb_max_lock: %ld\n", tokudb_max_lock));
r = db_env->set_lk_max_locks(db_env, tokudb_max_lock);
if (r) {
DBUG_PRINT("info", ("tokudb_set_max_locks %d\n", r));
goto error;
}
}
if (tokudb_debug & TOKUDB_DEBUG_INIT) TOKUDB_TRACE("%s:env open:flags=%x\n", __FUNCTION__, tokudb_init_flags);
r = db_env->open(db_env, tokudb_home, tokudb_init_flags, 0666);
if (tokudb_debug & TOKUDB_DEBUG_INIT) TOKUDB_TRACE("%s:env opened:return=%d\n", __FUNCTION__, r);
if (r) {
DBUG_PRINT("info", ("env->open %d\n", r));
goto error;
}
DBUG_RETURN(FALSE);
error:
if (db_env) {
db_env->close(db_env, 0);
db_env = 0;
}
DBUG_RETURN(TRUE);
}
static int tokudb_done_func(void *p) {
TOKUDB_DBUG_ENTER("tokudb_done_func");
int error = 0;
if (tokudb_open_tables.records)
error = 1;
hash_free(&tokudb_open_tables);
pthread_mutex_destroy(&tokudb_mutex);
TOKUDB_DBUG_RETURN(0);
}
/** @brief
Simple lock controls. The "share" it creates is a structure we will
......@@ -390,228 +140,6 @@ static int free_share(TOKUDB_SHARE * share, TABLE * table, uint hidden_primary_k
return result;
}
static handler *tokudb_create_handler(handlerton * hton, TABLE_SHARE * table, MEM_ROOT * mem_root) {
return new(mem_root) ha_tokudb(hton, table);
}
int tokudb_end(handlerton * hton, ha_panic_function type) {
TOKUDB_DBUG_ENTER("tokudb_end");
int error = 0;
if (db_env) {
if (tokudb_init_flags & DB_INIT_LOG)
tokudb_cleanup_log_files();
error = db_env->close(db_env, 0); // Error is logged
db_env = NULL;
}
TOKUDB_DBUG_RETURN(error);
}
static int tokudb_close_connection(handlerton * hton, THD * thd) {
my_free(thd_data_get(thd, hton->slot), MYF(0));
return 0;
}
bool tokudb_flush_logs(handlerton * hton) {
TOKUDB_DBUG_ENTER("tokudb_flush_logs");
int error;
bool result = 0;
if (tokudb_init_flags & DB_INIT_LOG) {
if ((error = db_env->log_flush(db_env, 0))) {
my_error(ER_ERROR_DURING_FLUSH_LOGS, MYF(0), error);
result = 1;
}
if ((error = db_env->txn_checkpoint(db_env, 0, 0, 0))) {
my_error(ER_ERROR_DURING_CHECKPOINT, MYF(0), error);
result = 1;
}
}
TOKUDB_DBUG_RETURN(result);
}
static int tokudb_commit(handlerton * hton, THD * thd, bool all) {
TOKUDB_DBUG_ENTER("tokudb_commit");
DBUG_PRINT("trans", ("ending transaction %s", all ? "all" : "stmt"));
u_int32_t syncflag = THDVAR(thd, commit_sync) ? 0 : DB_TXN_NOSYNC;
tokudb_trx_data *trx = (tokudb_trx_data *) thd_data_get(thd, hton->slot);
DB_TXN **txn = all ? &trx->all : &trx->stmt;
int error = 0;
if (*txn) {
if (tokudb_debug & TOKUDB_DEBUG_TXN)
TOKUDB_TRACE("commit:%d:%p\n", all, *txn);
error = (*txn)->commit(*txn, syncflag);
if (*txn == trx->sp_level)
trx->sp_level = 0;
*txn = 0;
}
else if (tokudb_debug & TOKUDB_DEBUG_TXN) {
TOKUDB_TRACE("commit0\n");
}
if (all) {
trx->iso_level = hatoku_iso_not_set;
}
TOKUDB_DBUG_RETURN(error);
}
static int tokudb_rollback(handlerton * hton, THD * thd, bool all) {
TOKUDB_DBUG_ENTER("tokudb_rollback");
DBUG_PRINT("trans", ("aborting transaction %s", all ? "all" : "stmt"));
tokudb_trx_data *trx = (tokudb_trx_data *) thd_data_get(thd, hton->slot);
DB_TXN **txn = all ? &trx->all : &trx->stmt;
int error = 0;
if (*txn) {
if (tokudb_debug & TOKUDB_DEBUG_TXN)
TOKUDB_TRACE("rollback:%p\n", *txn);
error = (*txn)->abort(*txn);
if (*txn == trx->sp_level)
trx->sp_level = 0;
*txn = 0;
} else
if (tokudb_debug & TOKUDB_DEBUG_TXN)
TOKUDB_TRACE("abort0\n");
TOKUDB_DBUG_RETURN(error);
}
#if 0
static int tokudb_savepoint(handlerton * hton, THD * thd, void *savepoint) {
TOKUDB_DBUG_ENTER("tokudb_savepoint");
int error;
DB_TXN **save_txn = (DB_TXN **) savepoint;
tokudb_trx_data *trx = (tokudb_trx_data *) thd_data_get(thd, hton->slot);
if (!(error = db_env->txn_begin(db_env, trx->sp_level, save_txn, 0))) {
trx->sp_level = *save_txn;
}
TOKUDB_DBUG_RETURN(error);
}
static int tokudb_rollback_to_savepoint(handlerton * hton, THD * thd, void *savepoint) {
TOKUDB_DBUG_ENTER("tokudb_rollback_to_savepoint");
int error;
DB_TXN *parent, **save_txn = (DB_TXN **) savepoint;
tokudb_trx_data *trx = (tokudb_trx_data *) thd_data_get(thd, hton->slot);
parent = (*save_txn)->parent;
if (!(error = (*save_txn)->abort(*save_txn))) {
trx->sp_level = parent;
error = tokudb_savepoint(hton, thd, savepoint);
}
TOKUDB_DBUG_RETURN(error);
}
static int tokudb_release_savepoint(handlerton * hton, THD * thd, void *savepoint) {
TOKUDB_DBUG_ENTER("tokudb_release_savepoint");
int error;
DB_TXN *parent, **save_txn = (DB_TXN **) savepoint;
tokudb_trx_data *trx = (tokudb_trx_data *) thd_data_get(thd, hton->slot);
parent = (*save_txn)->parent;
if (!(error = (*save_txn)->commit(*save_txn, 0))) {
trx->sp_level = parent;
*save_txn = 0;
}
TOKUDB_DBUG_RETURN(error);
}
#endif
static bool tokudb_show_logs(THD * thd, stat_print_fn * stat_print) {
TOKUDB_DBUG_ENTER("tokudb_show_logs");
char **all_logs, **free_logs, **a, **f;
int error = 1;
MEM_ROOT **root_ptr = my_pthread_getspecific_ptr(MEM_ROOT **, THR_MALLOC);
MEM_ROOT show_logs_root, *old_mem_root = *root_ptr;
init_sql_alloc(&show_logs_root, BDB_LOG_ALLOC_BLOCK_SIZE, BDB_LOG_ALLOC_BLOCK_SIZE);
*root_ptr = &show_logs_root;
all_logs = free_logs = 0;
error = db_env->log_archive(db_env, &all_logs, 0);
if (error) {
DBUG_PRINT("error", ("log_archive failed (error %d)", error));
db_env->err(db_env, error, "log_archive");
if (error == DB_NOTFOUND)
error = 0; // No log files
goto err;
}
/* Error is 0 here */
if (all_logs) {
for (a = all_logs, f = free_logs; *a; ++a) {
if (f && *f && strcmp(*a, *f) == 0) {
f++;
if ((error = stat_print(thd, tokudb_hton_name, tokudb_hton_name_length, *a, strlen(*a), STRING_WITH_LEN(SHOW_LOG_STATUS_FREE))))
break;
} else {
if ((error = stat_print(thd, tokudb_hton_name, tokudb_hton_name_length, *a, strlen(*a), STRING_WITH_LEN(SHOW_LOG_STATUS_INUSE))))
break;
}
}
}
err:
if (all_logs)
free(all_logs);
if (free_logs)
free(free_logs);
free_root(&show_logs_root, MYF(0));
*root_ptr = old_mem_root;
TOKUDB_DBUG_RETURN(error);
}
bool tokudb_show_status(handlerton * hton, THD * thd, stat_print_fn * stat_print, enum ha_stat_type stat_type) {
switch (stat_type) {
case HA_ENGINE_LOGS:
return tokudb_show_logs(thd, stat_print);
default:
return FALSE;
}
}
static void tokudb_print_error(const DB_ENV * db_env, const char *db_errpfx, const char *buffer) {
sql_print_error("%s: %s", db_errpfx, buffer);
}
void tokudb_cleanup_log_files(void) {
TOKUDB_DBUG_ENTER("tokudb_cleanup_log_files");
char **names;
int error;
if ((error = db_env->txn_checkpoint(db_env, 0, 0, 0)))
my_error(ER_ERROR_DURING_CHECKPOINT, MYF(0), error);
if ((error = db_env->log_archive(db_env, &names, 0)) != 0) {
DBUG_PRINT("error", ("log_archive failed (error %d)", error));
db_env->err(db_env, error, "log_archive");
DBUG_VOID_RETURN;
}
if (names) {
char **np;
for (np = names; *np; ++np) {
#if 1
if (tokudb_debug)
TOKUDB_TRACE("%s:cleanup:%s\n", __FUNCTION__, *np);
#else
my_delete(*np, MYF(MY_WME));
#endif
}
free(names);
}
DBUG_VOID_RETURN;
}
//
// *******NOTE*****
// If the flags HA_ONLINE_DROP_INDEX and HA_ONLINE_DROP_UNIQUE_INDEX
// are ever added, prepare_drop_index and final_drop_index will need to be modified
// so that the actual deletion of DB's is done in final_drop_index and not prepare_drop_index
//
static uint tokudb_alter_table_flags(uint flags)
{
return (HA_ONLINE_ADD_INDEX_NO_WRITES| HA_ONLINE_DROP_INDEX_NO_WRITES |
HA_ONLINE_ADD_UNIQUE_INDEX_NO_WRITES| HA_ONLINE_DROP_UNIQUE_INDEX_NO_WRITES);
}
static int get_name_length(const char *name) {
int n = 0;
const char *newname = name;
......@@ -678,125 +206,6 @@ ulong ha_tokudb::index_flags(uint idx, uint part, bool all_parts) const {
DBUG_RETURN(flags);
}
/*
Things that are required for ALL data types:
key_part->field->null_bit
key_part->length
key_part->field->packed_col_length(...)
DEFAULT: virtual uint packed_col_length(const uchar *to, uint length)
{ return length;}
All integer types use this.
String types MIGHT use different one, espescially the varchars
key_part->field->pack_cmp(...)
DEFAULT: virtual int pack_cmp(...)
{ return cmp(a,b); }
All integer types use the obvious one.
Assume X byte bytestream, int =:
((u_int64_t)((u_int8_t)bytes[0])) << 0 |
((u_int64_t)((u_int8_t)bytes[1])) << 8 |
((u_int64_t)((u_int8_t)bytes[2])) << 16 |
((u_int64_t)((u_int8_t)bytes[3])) << 24 |
((u_int64_t)((u_int8_t)bytes[4])) << 32 |
((u_int64_t)((u_int8_t)bytes[5])) << 40 |
((u_int64_t)((u_int8_t)bytes[6])) << 48 |
((u_int64_t)((u_int8_t)bytes[7])) << 56
If the integer type is < 8 bytes, just skip the unneeded ones.
Then compare the integers in the obvious way.
Strings:
Empty space differences at end are ignored.
i.e. delete all empty space at end first, and then compare.
Possible prerequisites:
key_part->field->cmp
NO DEFAULT
*/
typedef enum {
TOKUTRACE_SIGNED_INTEGER = 0,
TOKUTRACE_UNSIGNED_INTEGER = 1,
TOKUTRACE_CHAR = 2
} tokutrace_field_type;
typedef struct {
tokutrace_field_type type;
bool null_bit;
u_int32_t length;
} tokutrace_field;
typedef struct {
u_int16_t version;
u_int32_t num_fields;
tokutrace_field fields[1];
} tokutrace_cmp_fun;
static int tokutrace_db_get_cmp_byte_stream(DB* db, DBT* byte_stream) {
int r = ENOSYS;
void* data = NULL;
KEY* key = NULL;
if (byte_stream->flags != DB_DBT_MALLOC) { return EINVAL; }
bzero((void *) byte_stream, sizeof(*byte_stream));
u_int32_t num_fields = 0;
if (!db->app_private) { num_fields = 1; }
else {
key = (KEY*)db->app_private;
num_fields = key->key_parts;
}
size_t need_size = sizeof(tokutrace_cmp_fun) +
num_fields * sizeof(tokutrace_field);
data = my_malloc(need_size, MYF(MY_FAE | MY_ZEROFILL | MY_WME));
if (!data) { return ENOMEM; }
tokutrace_cmp_fun* info = (tokutrace_cmp_fun*)data;
info->version = 1;
info->num_fields = num_fields;
if (!db->app_private) {
info->fields[0].type = TOKUTRACE_UNSIGNED_INTEGER;
info->fields[0].null_bit = false;
info->fields[0].length = 40 / 8;
goto finish;
}
assert(db->app_private);
assert(key);
u_int32_t i;
for (i = 0; i < num_fields; i++) {
info->fields[i].null_bit = key->key_part[i].null_bit;
info->fields[i].length = key->key_part[i].length;
enum_field_types type = key->key_part[i].field->type();
switch (type) {
#ifdef HAVE_LONG_LONG
case (MYSQL_TYPE_LONGLONG):
#endif
case (MYSQL_TYPE_LONG):
case (MYSQL_TYPE_INT24):
case (MYSQL_TYPE_SHORT):
case (MYSQL_TYPE_TINY): {
/* Integer */
Field_num* field = static_cast<Field_num*>(key->key_part[i].field);
if (field->unsigned_flag) {
info->fields[i].type = TOKUTRACE_UNSIGNED_INTEGER; }
else {
info->fields[i].type = TOKUTRACE_SIGNED_INTEGER; }
break;
}
default: {
fprintf(stderr, "Cannot save cmp function for type %d.\n", type);
r = ENOSYS;
goto cleanup;
}
}
}
finish:
byte_stream->data = data;
byte_stream->size = need_size;
r = 0;
cleanup:
if (r!=0) {
if (data) { my_free(data, MYF(0)); }
}
return r;
}
static int tokudb_cmp_hidden_key(DB * file, const DBT * new_key, const DBT * saved_key) {
ulonglong a = uint5korr((char *) new_key->data);
......@@ -1000,39 +409,6 @@ static int tokudb_prefix_cmp_packed_key(DB *file, const DBT *keya, const DBT *ke
return tokudb_compare_two_keys(key, keya, keyb, true);
}
#if 0
/* Compare key against row */
static bool tokudb_key_cmp(TABLE * table, KEY * key_info, const uchar * key, uint key_length) {
KEY_PART_INFO *key_part = key_info->key_part, *end = key_part + key_info->key_parts;
for (; key_part != end && (int) key_length > 0; key_part++) {
int cmp;
uint length;
if (key_part->null_bit) {
key_length--;
/*
With the current usage, the following case will always be FALSE,
because NULL keys are sorted before any other key
*/
if (*key != (table->record[0][key_part->null_offset] & key_part->null_bit) ? 0 : 1)
return 1;
if (!*key++) // Null value
continue;
}
/*
Last argument has to be 0 as we are also using this to function to see
if a key like 'a ' matched a row with 'a'
*/
if ((cmp = key_part->field->pack_cmp(key, key_part->length, 0)))
return cmp;
length = key_part->field->packed_col_length(key, key_part->length);
key += length;
key_length -= length;
}
return 0; // Identical keys
}
#endif
int primary_key_part_compare (const void* left, const void* right) {
PRIM_KEY_PART_INFO* left_part= (PRIM_KEY_PART_INFO *)left;
PRIM_KEY_PART_INFO* right_part = (PRIM_KEY_PART_INFO *)right;
......@@ -1258,6 +634,10 @@ is_null_field( TABLE* table, Field* field, const uchar* record) {
return ret_val;
}
inline ulong field_offset(Field* field, TABLE* table) {
return((ulong) (field->ptr - table->record[0]));
}
inline HA_TOKU_ISO_LEVEL tx_to_toku_iso(ulong tx_isolation) {
if (tx_isolation == ISO_READ_UNCOMMITTED) {
return hatoku_iso_read_uncommitted;
......@@ -1782,7 +1162,7 @@ ulong ha_tokudb::max_row_length(const uchar * buf) {
uint *ptr, *end;
for (ptr = table_share->blob_field, end = ptr + table_share->blob_fields; ptr != end; ptr++) {
Field_blob *blob = ((Field_blob *) table->field[*ptr]);
length += blob->get_length((uchar *) (buf + field_offset(blob))) + 2;
length += blob->get_length((uchar *) (buf + field_offset(blob, table))) + 2;
}
return length;
}
......@@ -1876,7 +1256,7 @@ int ha_tokudb::pack_row(DBT * row, const uchar * record, bool strip_pk) {
assert( (hidden_primary_key != 0) == (primary_key_offsets == NULL));
curr_skip_index = 0;
for (Field ** field = table->field; *field; field++) {
uint curr_field_offset = field_offset(*field);
uint curr_field_offset = field_offset(*field, table);
//
// if the primary key is hidden, primary_key_offsets will be NULL and
// this clause will not execute
......@@ -1978,7 +1358,7 @@ void ha_tokudb::unpack_row(uchar * record, DBT const *row, DBT const *key, bool
//
uint curr_skip_index = 0;
for (Field ** field = table->field; *field; field++) {
uint curr_field_offset = field_offset(*field);
uint curr_field_offset = field_offset(*field, table);
if (primary_key_offsets && pk_stripped) {
uint curr_skip_offset = primary_key_offsets[curr_skip_index].offset;
if (curr_skip_offset == curr_field_offset) {
......@@ -2004,7 +1384,7 @@ void ha_tokudb::unpack_row(uchar * record, DBT const *row, DBT const *key, bool
if (is_null_field(table, *field, record)) {
continue;
}
ptr = (*field)->unpack(record + field_offset(*field), ptr);
ptr = (*field)->unpack(record + field_offset(*field, table), ptr);
}
dbug_tmp_restore_column_map(table->write_set, old_map);
}
......@@ -2032,7 +1412,7 @@ u_int32_t ha_tokudb::place_key_into_mysql_buff(uchar * record, uchar* data, uint
/* tokutek change to make pack_key and unpack_key work for
decimals */
uint unpack_length = key_part->length;
pos = (uchar *) key_part->field->unpack_key(record + field_offset(key_part->field), pos,
pos = (uchar *) key_part->field->unpack_key(record + field_offset(key_part->field, table), pos,
#if MYSQL_VERSION_ID < 50123
unpack_length);
#else
......@@ -2097,7 +1477,7 @@ u_int32_t ha_tokudb::place_key_into_dbt_buff(KEY* key_info, uchar * buff, const
// because key_part->offset is SET INCORRECTLY in add_index
// filed ticket 862 to look into this
//
curr_buff = key_part->field->pack_key(curr_buff, (uchar *) (record + field_offset(key_part->field)),
curr_buff = key_part->field->pack_key(curr_buff, (uchar *) (record + field_offset(key_part->field, table)),
#if MYSQL_VERSION_ID < 50123
key_part->length);
#else
......@@ -2490,7 +1870,7 @@ int ha_tokudb::write_row(uchar * record) {
pthread_mutex_lock(&share->mutex);
ulonglong curr_auto_inc = retrieve_auto_increment(
table->field[share->ai_field_index]->key_type(),
field_offset(table->field[share->ai_field_index]),
field_offset(table->field[share->ai_field_index], table),
record
);
if (curr_auto_inc > share->last_auto_increment) {
......@@ -2695,7 +2075,7 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
pthread_mutex_lock(&share->mutex);
ulonglong curr_auto_inc = retrieve_auto_increment(
table->field[share->ai_field_index]->key_type(),
field_offset(table->field[share->ai_field_index]),
field_offset(table->field[share->ai_field_index], table),
new_row
);
if (curr_auto_inc > share->last_auto_increment) {
......@@ -4244,15 +3624,15 @@ THR_LOCK_DATA **ha_tokudb::store_lock(THD * thd, THR_LOCK_DATA ** to, enum thr_l
}
static int create_sub_table(const char *table_name, const char *sub_name, DBTYPE type, int flags) {
static int create_sub_table(const char *table_name, int flags) {
TOKUDB_DBUG_ENTER("create_sub_table");
int error;
DB *file;
DBUG_PRINT("enter", ("sub_name: %s flags: %d", sub_name, flags));
DBUG_PRINT("enter", ("flags: %d", flags));
if (!(error = db_create(&file, db_env, 0))) {
file->set_flags(file, flags);
error = (file->open(file, NULL, table_name, sub_name, type, DB_THREAD | DB_CREATE, my_umask));
error = (file->open(file, NULL, table_name, NULL, DB_BTREE, DB_THREAD | DB_CREATE, my_umask));
if (error) {
DBUG_PRINT("error", ("Got error: %d when opening table '%s'", error, table_name));
(void) file->remove(file, table_name, NULL, 0);
......@@ -4408,7 +3788,7 @@ int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_in
fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME);
/* Create the main table that will hold the real rows */
error = create_sub_table(name_buff, NULL, DB_BTREE, 0);
error = create_sub_table(name_buff, 0);
if (tokudb_debug & TOKUDB_DEBUG_OPEN) {
TOKUDB_TRACE("create:%s:error=%d\n", newname, error);
}
......@@ -4426,7 +3806,7 @@ int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_in
sprintf(part, "key-%s", form->s->key_info[i].name);
make_name(newname, name, part);
fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME);
error = create_sub_table(name_buff, NULL, DB_BTREE, flags);
error = create_sub_table(name_buff, flags);
if (tokudb_debug & TOKUDB_DEBUG_OPEN) {
TOKUDB_TRACE("create:%s:flags=%ld:error=%d\n", newname, form->key_info[i].flags, error);
}
......@@ -4956,7 +4336,7 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
sprintf(part, "key-%s", key_info[i].name);
make_name(newname, share->table_name, part);
fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME);
error = create_sub_table(name_buff, NULL, DB_BTREE, flags);
error = create_sub_table(name_buff, flags);
if (tokudb_debug & TOKUDB_DEBUG_OPEN) {
TOKUDB_TRACE("create:%s:flags=%ld:error=%d\n", newname, key_info[i].flags, error);
}
......@@ -5340,13 +4720,6 @@ int ha_tokudb::optimize(THD * thd, HA_CHECK_OPT * check_opt) {
TOKUDB_DBUG_RETURN(error);
}
inline ulong ha_tokudb::field_offset(Field *field) {
if (table->record[0] <= field->ptr && field->ptr < table->record[1])
return field->offset(table->record[0]);
assert(0);
return 0;
}
// delete all rows from a table
//
// effects: delete all of the rows in the main dictionary and all of the
......@@ -5380,102 +4753,4 @@ int ha_tokudb::delete_all_rows() {
TOKUDB_DBUG_RETURN(error);
}
struct st_mysql_storage_engine storage_engine_structure = { MYSQL_HANDLERTON_INTERFACE_VERSION };
// options flags
// PLUGIN_VAR_THDLOCAL Variable is per-connection
// PLUGIN_VAR_READONLY Server variable is read only
// PLUGIN_VAR_NOSYSVAR Not a server variable
// PLUGIN_VAR_NOCMDOPT Not a command line option
// PLUGIN_VAR_NOCMDARG No argument for cmd line
// PLUGIN_VAR_RQCMDARG Argument required for cmd line
// PLUGIN_VAR_OPCMDARG Argument optional for cmd line
// PLUGIN_VAR_MEMALLOC String needs memory allocated
// system variables
static MYSQL_SYSVAR_ULONGLONG(cache_size, tokudb_cache_size, PLUGIN_VAR_READONLY, "TokuDB cache table size", NULL, NULL, 0, 0, ~0LL, 0);
static MYSQL_SYSVAR_UINT(cache_memory_percent, tokudb_cache_memory_percent, PLUGIN_VAR_READONLY, "Default percent of physical memory in the TokuDB cache table", NULL, NULL, tokudb_cache_memory_percent, 0, 100, 0);
static MYSQL_SYSVAR_ULONG(max_lock, tokudb_max_lock, PLUGIN_VAR_READONLY, "TokuDB Max Locks", NULL, NULL, 8 * 1024, 0, ~0L, 0);
static MYSQL_SYSVAR_ULONG(debug, tokudb_debug, PLUGIN_VAR_READONLY, "TokuDB Debug", NULL, NULL, 0, 0, ~0L, 0);
static MYSQL_SYSVAR_STR(log_dir, tokudb_log_dir, PLUGIN_VAR_READONLY, "TokuDB Log Directory", NULL, NULL, NULL);
static MYSQL_SYSVAR_STR(data_dir, tokudb_data_dir, PLUGIN_VAR_READONLY, "TokuDB Data Directory", NULL, NULL, NULL);
static MYSQL_SYSVAR_STR(version, tokudb_version, PLUGIN_VAR_READONLY, "TokuDB Version", NULL, NULL, NULL);
static MYSQL_SYSVAR_UINT(init_flags, tokudb_init_flags, PLUGIN_VAR_READONLY, "Sets TokuDB DB_ENV->open flags", NULL, NULL, tokudb_init_flags, 0, ~0, 0);
#if 0
static MYSQL_SYSVAR_ULONG(cache_parts, tokudb_cache_parts, PLUGIN_VAR_READONLY, "Sets TokuDB set_cache_parts", NULL, NULL, 0, 0, ~0L, 0);
// this is really a u_int32_t
// ? use MYSQL_SYSVAR_SET
static MYSQL_SYSVAR_UINT(env_flags, tokudb_env_flags, PLUGIN_VAR_READONLY, "Sets TokuDB env_flags", NULL, NULL, DB_LOG_AUTOREMOVE, 0, ~0, 0);
static MYSQL_SYSVAR_STR(home, tokudb_home, PLUGIN_VAR_READONLY, "Sets TokuDB env->open home", NULL, NULL, NULL);
// this is really a u_int32_t
//? use MYSQL_SYSVAR_SET
// this looks to be unused
static MYSQL_SYSVAR_LONG(lock_scan_time, tokudb_lock_scan_time, PLUGIN_VAR_READONLY, "Tokudb Lock Scan Time (UNUSED)", NULL, NULL, 0, 0, ~0L, 0);
// this is really a u_int32_t
//? use MYSQL_SYSVAR_ENUM
static MYSQL_SYSVAR_UINT(lock_type, tokudb_lock_type, PLUGIN_VAR_READONLY, "Sets set_lk_detect", NULL, NULL, DB_LOCK_DEFAULT, 0, ~0, 0);
static MYSQL_SYSVAR_ULONG(log_buffer_size, tokudb_log_buffer_size, PLUGIN_VAR_READONLY, "Tokudb Log Buffer Size", NULL, NULL, 0, 0, ~0L, 0);
static MYSQL_SYSVAR_ULONG(region_size, tokudb_region_size, PLUGIN_VAR_READONLY, "Tokudb Region Size", NULL, NULL, 128 * 1024, 0, ~0L, 0);
static MYSQL_SYSVAR_BOOL(shared_data, tokudb_shared_data, PLUGIN_VAR_READONLY, "Tokudb Shared Data", NULL, NULL, FALSE);
static MYSQL_SYSVAR_STR(tmpdir, tokudb_tmpdir, PLUGIN_VAR_READONLY, "Tokudb Tmp Dir", NULL, NULL, NULL);
#endif
static struct st_mysql_sys_var *tokudb_system_variables[] = {
MYSQL_SYSVAR(cache_size),
MYSQL_SYSVAR(cache_memory_percent),
MYSQL_SYSVAR(max_lock),
MYSQL_SYSVAR(data_dir),
MYSQL_SYSVAR(log_dir),
MYSQL_SYSVAR(debug),
MYSQL_SYSVAR(commit_sync),
MYSQL_SYSVAR(version),
MYSQL_SYSVAR(init_flags),
#if 0
MYSQL_SYSVAR(cache_parts),
MYSQL_SYSVAR(env_flags),
MYSQL_SYSVAR(home),
MYSQL_SYSVAR(lock_scan_time),
MYSQL_SYSVAR(lock_type),
MYSQL_SYSVAR(log_buffer_size),
MYSQL_SYSVAR(region_size),
MYSQL_SYSVAR(shared_data),
MYSQL_SYSVAR(tmpdir),
#endif
NULL
};
mysql_declare_plugin(tokudb) {
MYSQL_STORAGE_ENGINE_PLUGIN,
&storage_engine_structure,
"TokuDB",
"Tokutek Inc",
"Fractal trees, transactions, row level locks",
PLUGIN_LICENSE_PROPRIETARY, /* QQQ license? */
tokudb_init_func, /* plugin init */
tokudb_done_func, /* plugin deinit */
0x0200, /* QQQ 2.0 */
NULL, /* status variables */
tokudb_system_variables, /* system variables */
NULL /* config options */
}
mysql_declare_plugin_end;
......@@ -57,12 +57,6 @@ typedef enum {
hatoku_ai_create_value
} HA_METADATA_KEY ;
typedef enum {
hatoku_iso_not_set = 0,
hatoku_iso_read_uncommitted,
hatoku_iso_serializable
} HA_TOKU_ISO_LEVEL;
//
// for storing NULL byte in keys
//
......@@ -354,5 +348,4 @@ class ha_tokudb : public handler {
int read_full_row(uchar * buf);
int __close(int mutex_is_locked);
int read_last();
ulong field_offset(Field *);
};
#include "mysql_priv.h"
#include "hatoku_cmptrace.h"
/*
Things that are required for ALL data types:
key_part->field->null_bit
key_part->length
key_part->field->packed_col_length(...)
DEFAULT: virtual uint packed_col_length(const uchar *to, uint length)
{ return length;}
All integer types use this.
String types MIGHT use different one, espescially the varchars
key_part->field->pack_cmp(...)
DEFAULT: virtual int pack_cmp(...)
{ return cmp(a,b); }
All integer types use the obvious one.
Assume X byte bytestream, int =:
((u_int64_t)((u_int8_t)bytes[0])) << 0 |
((u_int64_t)((u_int8_t)bytes[1])) << 8 |
((u_int64_t)((u_int8_t)bytes[2])) << 16 |
((u_int64_t)((u_int8_t)bytes[3])) << 24 |
((u_int64_t)((u_int8_t)bytes[4])) << 32 |
((u_int64_t)((u_int8_t)bytes[5])) << 40 |
((u_int64_t)((u_int8_t)bytes[6])) << 48 |
((u_int64_t)((u_int8_t)bytes[7])) << 56
If the integer type is < 8 bytes, just skip the unneeded ones.
Then compare the integers in the obvious way.
Strings:
Empty space differences at end are ignored.
i.e. delete all empty space at end first, and then compare.
Possible prerequisites:
key_part->field->cmp
NO DEFAULT
*/
typedef enum {
TOKUTRACE_SIGNED_INTEGER = 0,
TOKUTRACE_UNSIGNED_INTEGER = 1,
TOKUTRACE_CHAR = 2
} tokutrace_field_type;
typedef struct {
tokutrace_field_type type;
bool null_bit;
u_int32_t length;
} tokutrace_field;
typedef struct {
u_int16_t version;
u_int32_t num_fields;
tokutrace_field fields[1];
} tokutrace_cmp_fun;
int tokutrace_db_get_cmp_byte_stream(DB* db, DBT* byte_stream) {
int r = ENOSYS;
void* data = NULL;
KEY* key = NULL;
if (byte_stream->flags != DB_DBT_MALLOC) { return EINVAL; }
bzero((void *) byte_stream, sizeof(*byte_stream));
u_int32_t num_fields = 0;
if (!db->app_private) { num_fields = 1; }
else {
key = (KEY*)db->app_private;
num_fields = key->key_parts;
}
size_t need_size = sizeof(tokutrace_cmp_fun) +
num_fields * sizeof(tokutrace_field);
data = my_malloc(need_size, MYF(MY_FAE | MY_ZEROFILL | MY_WME));
if (!data) { return ENOMEM; }
tokutrace_cmp_fun* info = (tokutrace_cmp_fun*)data;
info->version = 1;
info->num_fields = num_fields;
if (!db->app_private) {
info->fields[0].type = TOKUTRACE_UNSIGNED_INTEGER;
info->fields[0].null_bit = false;
info->fields[0].length = 40 / 8;
goto finish;
}
assert(db->app_private);
assert(key);
u_int32_t i;
for (i = 0; i < num_fields; i++) {
info->fields[i].null_bit = key->key_part[i].null_bit;
info->fields[i].length = key->key_part[i].length;
enum_field_types type = key->key_part[i].field->type();
switch (type) {
#ifdef HAVE_LONG_LONG
case (MYSQL_TYPE_LONGLONG):
#endif
case (MYSQL_TYPE_LONG):
case (MYSQL_TYPE_INT24):
case (MYSQL_TYPE_SHORT):
case (MYSQL_TYPE_TINY): {
/* Integer */
Field_num* field = static_cast<Field_num*>(key->key_part[i].field);
if (field->unsigned_flag) {
info->fields[i].type = TOKUTRACE_UNSIGNED_INTEGER; }
else {
info->fields[i].type = TOKUTRACE_SIGNED_INTEGER; }
break;
}
default: {
fprintf(stderr, "Cannot save cmp function for type %d.\n", type);
r = ENOSYS;
goto cleanup;
}
}
}
finish:
byte_stream->data = data;
byte_stream->size = need_size;
r = 0;
cleanup:
if (r!=0) {
if (data) { my_free(data, MYF(0)); }
}
return r;
}
#ifndef _HATOKU_CMPTRC
#define _HATOKU_CMPTRC
#include "db.h"
int tokutrace_db_get_cmp_byte_stream(DB* db, DBT* byte_stream);
#endif
#ifndef _HATOKU_DEF
#define _HATOKU_DEF
#include <syscall.h>
#include "db.h"
extern ulong tokudb_debug;
// QQQ how to tune these?
#define HA_TOKUDB_RANGE_COUNT 100
/* extra rows for estimate_rows_upper_bound() */
......@@ -22,6 +32,13 @@
#define TOKUDB_TRACE(f, ...) \
printf("%d:%s:%d:" f, my_tid(), __FILE__, __LINE__, ##__VA_ARGS__);
inline unsigned int my_tid() {
return syscall(__NR_gettid);
}
#define TOKUDB_DBUG_ENTER(f, ...) \
{ \
if (tokudb_debug & TOKUDB_DEBUG_ENTER) { \
......@@ -51,4 +68,22 @@
}
typedef enum {
hatoku_iso_not_set = 0,
hatoku_iso_read_uncommitted,
hatoku_iso_serializable
} HA_TOKU_ISO_LEVEL;
typedef struct st_tokudb_trx_data {
DB_TXN *all;
DB_TXN *stmt;
DB_TXN *sp_level;
uint tokudb_lock_count;
HA_TOKU_ISO_LEVEL iso_level;
} tokudb_trx_data;
#endif
#define MYSQL_SERVER 1
#include "mysql_priv.h"
/* We define DTRACE after mysql_priv.h in case it disabled dtrace in the main server */
#ifdef HAVE_DTRACE
#define _DTRACE_VERSION 1
#else
#endif
#include <mysql/plugin.h>
#include "hatoku_hton.h"
#include "hatoku_defines.h"
#include "ha_tokudb.h"
#undef PACKAGE
#undef VERSION
#undef HAVE_DTRACE
#undef _DTRACE_VERSION
#include <syscall.h>
static inline void *thd_data_get(THD *thd, int slot) {
#if MYSQL_VERSION_ID <= 50123
return thd->ha_data[slot];
#else
return thd->ha_data[slot].ha_ptr;
#endif
}
static inline void thd_data_set(THD *thd, int slot, void *data) {
#if MYSQL_VERSION_ID <= 50123
thd->ha_data[slot] = data;
#else
thd->ha_data[slot].ha_ptr = data;
#endif
}
unsigned long my_getphyspages() {
return sysconf(_SC_PHYS_PAGES);
}
static uchar *tokudb_get_key(TOKUDB_SHARE * share, size_t * length, my_bool not_used __attribute__ ((unused))) {
*length = share->table_name_length;
return (uchar *) share->table_name;
}
static handler *tokudb_create_handler(handlerton * hton, TABLE_SHARE * table, MEM_ROOT * mem_root);
static MYSQL_THDVAR_BOOL(commit_sync, PLUGIN_VAR_THDLOCAL, "sync on txn commit",
/* check */ NULL, /* update */ NULL, /* default*/ TRUE);
static void tokudb_print_error(const DB_ENV * db_env, const char *db_errpfx, const char *buffer);
static void tokudb_cleanup_log_files(void);
static int tokudb_end(handlerton * hton, ha_panic_function type);
static bool tokudb_flush_logs(handlerton * hton);
static bool tokudb_show_status(handlerton * hton, THD * thd, stat_print_fn * print, enum ha_stat_type);
static int tokudb_close_connection(handlerton * hton, THD * thd);
static int tokudb_commit(handlerton * hton, THD * thd, bool all);
static int tokudb_rollback(handlerton * hton, THD * thd, bool all);
static uint tokudb_alter_table_flags(uint flags);
#if 0
static int tokudb_rollback_to_savepoint(handlerton * hton, THD * thd, void *savepoint);
static int tokudb_savepoint(handlerton * hton, THD * thd, void *savepoint);
static int tokudb_release_savepoint(handlerton * hton, THD * thd, void *savepoint);
#endif
static bool tokudb_show_logs(THD * thd, stat_print_fn * stat_print);
handlerton *tokudb_hton;
const char *ha_tokudb_ext = ".tokudb";
char *tokudb_data_dir;
ulong tokudb_debug;
DB_ENV *db_env;
HASH tokudb_open_tables;
pthread_mutex_t tokudb_mutex;
//my_bool tokudb_shared_data = FALSE;
static u_int32_t tokudb_init_flags =
DB_CREATE | DB_THREAD | DB_PRIVATE |
DB_INIT_LOCK |
DB_INIT_MPOOL |
DB_INIT_TXN |
0 | // disabled for 1.0.2 DB_INIT_LOG |
0; // disabled for 1.0.1 DB_RECOVER;
static u_int32_t tokudb_env_flags = DB_LOG_AUTOREMOVE;
// static u_int32_t tokudb_lock_type = DB_LOCK_DEFAULT;
// static ulong tokudb_log_buffer_size = 0;
// static ulong tokudb_log_file_size = 0;
static ulonglong tokudb_cache_size = 0;
static uint tokudb_cache_memory_percent = 50;
static char *tokudb_home;
// static char *tokudb_tmpdir;
static char *tokudb_log_dir;
// static long tokudb_lock_scan_time = 0;
// static ulong tokudb_region_size = 0;
// static ulong tokudb_cache_parts = 1;
static ulong tokudb_max_lock;
static const char tokudb_hton_name[] = "TokuDB";
static const int tokudb_hton_name_length = sizeof(tokudb_hton_name) - 1;
#ifdef TOKUDB_VERSION
char *tokudb_version = TOKUDB_VERSION;
#else
char *tokudb_version;
#endif
struct st_mysql_storage_engine storage_engine_structure = { MYSQL_HANDLERTON_INTERFACE_VERSION };
static int tokudb_init_func(void *p) {
TOKUDB_DBUG_ENTER("tokudb_init_func");
tokudb_hton = (handlerton *) p;
VOID(pthread_mutex_init(&tokudb_mutex, MY_MUTEX_INIT_FAST));
(void) hash_init(&tokudb_open_tables, system_charset_info, 32, 0, 0, (hash_get_key) tokudb_get_key, 0, 0);
tokudb_hton->state = SHOW_OPTION_YES;
// tokudb_hton->flags= HTON_CAN_RECREATE; // QQQ this came from skeleton
tokudb_hton->flags = HTON_CLOSE_CURSORS_AT_COMMIT | HTON_FLUSH_AFTER_RENAME;
#ifdef DB_TYPE_TOKUDB
tokudb_hton->db_type = DB_TYPE_TOKUDB;
#else
tokudb_hton->db_type = DB_TYPE_UNKNOWN;
#endif
tokudb_hton->create = tokudb_create_handler;
tokudb_hton->close_connection = tokudb_close_connection;
#if 0
tokudb_hton->savepoint_offset = sizeof(DB_TXN *);
tokudb_hton->savepoint_set = tokudb_savepoint;
tokudb_hton->savepoint_rollback = tokudb_rollback_to_savepoint;
tokudb_hton->savepoint_release = tokudb_release_savepoint;
#endif
tokudb_hton->commit = tokudb_commit;
tokudb_hton->rollback = tokudb_rollback;
tokudb_hton->panic = tokudb_end;
tokudb_hton->flush_logs = tokudb_flush_logs;
tokudb_hton->show_status = tokudb_show_status;
tokudb_hton->alter_table_flags = tokudb_alter_table_flags;
#if 0
if (!tokudb_tmpdir)
tokudb_tmpdir = mysql_tmpdir;
DBUG_PRINT("info", ("tokudb_tmpdir: %s", tokudb_tmpdir));
#endif
if (!tokudb_home)
tokudb_home = mysql_real_data_home;
DBUG_PRINT("info", ("tokudb_home: %s", tokudb_home));
#if 0
if (!tokudb_log_buffer_size) { // QQQ
tokudb_log_buffer_size = max(table_cache_size * 512, 32 * 1024);
DBUG_PRINT("info", ("computing tokudb_log_buffer_size %ld\n", tokudb_log_buffer_size));
}
tokudb_log_file_size = tokudb_log_buffer_size * 4;
tokudb_log_file_size = MY_ALIGN(tokudb_log_file_size, 1024 * 1024L);
tokudb_log_file_size = max(tokudb_log_file_size, 10 * 1024 * 1024L);
DBUG_PRINT("info", ("computing tokudb_log_file_size: %ld\n", tokudb_log_file_size));
#endif
int r;
if ((r = db_env_create(&db_env, 0))) {
DBUG_PRINT("info", ("db_env_create %d\n", r));
goto error;
}
DBUG_PRINT("info", ("tokudb_env_flags: 0x%x\n", tokudb_env_flags));
r = db_env->set_flags(db_env, tokudb_env_flags, 1);
if (r) { // QQQ
if (tokudb_debug & TOKUDB_DEBUG_INIT)
TOKUDB_TRACE("%s:WARNING: flags=%x r=%d\n", __FUNCTION__, tokudb_env_flags, r);
// goto error;
}
// config error handling
db_env->set_errcall(db_env, tokudb_print_error);
db_env->set_errpfx(db_env, "TokuDB");
// config directories
#if 0
DBUG_PRINT("info", ("tokudb_tmpdir: %s\n", tokudb_tmpdir));
db_env->set_tmp_dir(db_env, tokudb_tmpdir);
#endif
{
char *data_dir = tokudb_data_dir;
if (data_dir == 0)
data_dir = mysql_data_home;
DBUG_PRINT("info", ("tokudb_data_dir: %s\n", data_dir));
db_env->set_data_dir(db_env, data_dir);
}
if (tokudb_log_dir) {
DBUG_PRINT("info", ("tokudb_log_dir: %s\n", tokudb_log_dir));
db_env->set_lg_dir(db_env, tokudb_log_dir);
}
// config the cache table
if (tokudb_cache_size == 0) {
unsigned long pagesize = my_getpagesize();
unsigned long long npages = my_getphyspages();
unsigned long long physmem = npages * pagesize;
tokudb_cache_size = (ulonglong) (physmem * (tokudb_cache_memory_percent / 100.0));
}
if (tokudb_cache_size) {
DBUG_PRINT("info", ("tokudb_cache_size: %lld\n", tokudb_cache_size));
r = db_env->set_cachesize(db_env, (u_int32_t)(tokudb_cache_size >> 30), (u_int32_t)(tokudb_cache_size % (1024L * 1024L * 1024L)), 1);
if (r) {
DBUG_PRINT("info", ("set_cachesize %d\n", r));
goto error;
}
}
u_int32_t gbytes, bytes; int parts;
r = db_env->get_cachesize(db_env, &gbytes, &bytes, &parts);
if (r == 0)
if (tokudb_debug & TOKUDB_DEBUG_INIT)
TOKUDB_TRACE("%s:tokudb_cache_size=%lld\n", __FUNCTION__, ((unsigned long long) gbytes << 30) + bytes);
#if 0
// QQQ config the logs
DBUG_PRINT("info", ("tokudb_log_file_size: %ld\n", tokudb_log_file_size));
db_env->set_lg_max(db_env, tokudb_log_file_size);
DBUG_PRINT("info", ("tokudb_log_buffer_size: %ld\n", tokudb_log_buffer_size));
db_env->set_lg_bsize(db_env, tokudb_log_buffer_size);
// DBUG_PRINT("info",("tokudb_region_size: %ld\n", tokudb_region_size));
// db_env->set_lg_regionmax(db_env, tokudb_region_size);
#endif
// config the locks
#if 0 // QQQ no lock types yet
DBUG_PRINT("info", ("tokudb_lock_type: 0x%lx\n", tokudb_lock_type));
db_env->set_lk_detect(db_env, tokudb_lock_type);
#endif
if (tokudb_max_lock) {
DBUG_PRINT("info",("tokudb_max_lock: %ld\n", tokudb_max_lock));
r = db_env->set_lk_max_locks(db_env, tokudb_max_lock);
if (r) {
DBUG_PRINT("info", ("tokudb_set_max_locks %d\n", r));
goto error;
}
}
if (tokudb_debug & TOKUDB_DEBUG_INIT) TOKUDB_TRACE("%s:env open:flags=%x\n", __FUNCTION__, tokudb_init_flags);
r = db_env->open(db_env, tokudb_home, tokudb_init_flags, 0666);
if (tokudb_debug & TOKUDB_DEBUG_INIT) TOKUDB_TRACE("%s:env opened:return=%d\n", __FUNCTION__, r);
if (r) {
DBUG_PRINT("info", ("env->open %d\n", r));
goto error;
}
DBUG_RETURN(FALSE);
error:
if (db_env) {
db_env->close(db_env, 0);
db_env = 0;
}
DBUG_RETURN(TRUE);
}
static int tokudb_done_func(void *p) {
TOKUDB_DBUG_ENTER("tokudb_done_func");
int error = 0;
if (tokudb_open_tables.records)
error = 1;
hash_free(&tokudb_open_tables);
pthread_mutex_destroy(&tokudb_mutex);
TOKUDB_DBUG_RETURN(0);
}
static handler *tokudb_create_handler(handlerton * hton, TABLE_SHARE * table, MEM_ROOT * mem_root) {
return new(mem_root) ha_tokudb(hton, table);
}
int tokudb_end(handlerton * hton, ha_panic_function type) {
TOKUDB_DBUG_ENTER("tokudb_end");
int error = 0;
if (db_env) {
if (tokudb_init_flags & DB_INIT_LOG)
tokudb_cleanup_log_files();
error = db_env->close(db_env, 0); // Error is logged
db_env = NULL;
}
TOKUDB_DBUG_RETURN(error);
}
static int tokudb_close_connection(handlerton * hton, THD * thd) {
my_free(thd_data_get(thd, hton->slot), MYF(0));
return 0;
}
bool tokudb_flush_logs(handlerton * hton) {
TOKUDB_DBUG_ENTER("tokudb_flush_logs");
int error;
bool result = 0;
if (tokudb_init_flags & DB_INIT_LOG) {
if ((error = db_env->log_flush(db_env, 0))) {
my_error(ER_ERROR_DURING_FLUSH_LOGS, MYF(0), error);
result = 1;
}
if ((error = db_env->txn_checkpoint(db_env, 0, 0, 0))) {
my_error(ER_ERROR_DURING_CHECKPOINT, MYF(0), error);
result = 1;
}
}
TOKUDB_DBUG_RETURN(result);
}
static int tokudb_commit(handlerton * hton, THD * thd, bool all) {
TOKUDB_DBUG_ENTER("tokudb_commit");
DBUG_PRINT("trans", ("ending transaction %s", all ? "all" : "stmt"));
u_int32_t syncflag = THDVAR(thd, commit_sync) ? 0 : DB_TXN_NOSYNC;
tokudb_trx_data *trx = (tokudb_trx_data *) thd_data_get(thd, hton->slot);
DB_TXN **txn = all ? &trx->all : &trx->stmt;
int error = 0;
if (*txn) {
if (tokudb_debug & TOKUDB_DEBUG_TXN)
TOKUDB_TRACE("commit:%d:%p\n", all, *txn);
error = (*txn)->commit(*txn, syncflag);
if (*txn == trx->sp_level)
trx->sp_level = 0;
*txn = 0;
}
else if (tokudb_debug & TOKUDB_DEBUG_TXN) {
TOKUDB_TRACE("commit0\n");
}
if (all) {
trx->iso_level = hatoku_iso_not_set;
}
TOKUDB_DBUG_RETURN(error);
}
static int tokudb_rollback(handlerton * hton, THD * thd, bool all) {
TOKUDB_DBUG_ENTER("tokudb_rollback");
DBUG_PRINT("trans", ("aborting transaction %s", all ? "all" : "stmt"));
tokudb_trx_data *trx = (tokudb_trx_data *) thd_data_get(thd, hton->slot);
DB_TXN **txn = all ? &trx->all : &trx->stmt;
int error = 0;
if (*txn) {
if (tokudb_debug & TOKUDB_DEBUG_TXN)
TOKUDB_TRACE("rollback:%p\n", *txn);
error = (*txn)->abort(*txn);
if (*txn == trx->sp_level)
trx->sp_level = 0;
*txn = 0;
} else
if (tokudb_debug & TOKUDB_DEBUG_TXN)
TOKUDB_TRACE("abort0\n");
TOKUDB_DBUG_RETURN(error);
}
#if 0
static int tokudb_savepoint(handlerton * hton, THD * thd, void *savepoint) {
TOKUDB_DBUG_ENTER("tokudb_savepoint");
int error;
DB_TXN **save_txn = (DB_TXN **) savepoint;
tokudb_trx_data *trx = (tokudb_trx_data *) thd_data_get(thd, hton->slot);
if (!(error = db_env->txn_begin(db_env, trx->sp_level, save_txn, 0))) {
trx->sp_level = *save_txn;
}
TOKUDB_DBUG_RETURN(error);
}
static int tokudb_rollback_to_savepoint(handlerton * hton, THD * thd, void *savepoint) {
TOKUDB_DBUG_ENTER("tokudb_rollback_to_savepoint");
int error;
DB_TXN *parent, **save_txn = (DB_TXN **) savepoint;
tokudb_trx_data *trx = (tokudb_trx_data *) thd_data_get(thd, hton->slot);
parent = (*save_txn)->parent;
if (!(error = (*save_txn)->abort(*save_txn))) {
trx->sp_level = parent;
error = tokudb_savepoint(hton, thd, savepoint);
}
TOKUDB_DBUG_RETURN(error);
}
static int tokudb_release_savepoint(handlerton * hton, THD * thd, void *savepoint) {
TOKUDB_DBUG_ENTER("tokudb_release_savepoint");
int error;
DB_TXN *parent, **save_txn = (DB_TXN **) savepoint;
tokudb_trx_data *trx = (tokudb_trx_data *) thd_data_get(thd, hton->slot);
parent = (*save_txn)->parent;
if (!(error = (*save_txn)->commit(*save_txn, 0))) {
trx->sp_level = parent;
*save_txn = 0;
}
TOKUDB_DBUG_RETURN(error);
}
#endif
static bool tokudb_show_logs(THD * thd, stat_print_fn * stat_print) {
TOKUDB_DBUG_ENTER("tokudb_show_logs");
char **all_logs, **free_logs, **a, **f;
int error = 1;
MEM_ROOT **root_ptr = my_pthread_getspecific_ptr(MEM_ROOT **, THR_MALLOC);
MEM_ROOT show_logs_root, *old_mem_root = *root_ptr;
init_sql_alloc(&show_logs_root, BDB_LOG_ALLOC_BLOCK_SIZE, BDB_LOG_ALLOC_BLOCK_SIZE);
*root_ptr = &show_logs_root;
all_logs = free_logs = 0;
error = db_env->log_archive(db_env, &all_logs, 0);
if (error) {
DBUG_PRINT("error", ("log_archive failed (error %d)", error));
db_env->err(db_env, error, "log_archive");
if (error == DB_NOTFOUND)
error = 0; // No log files
goto err;
}
/* Error is 0 here */
if (all_logs) {
for (a = all_logs, f = free_logs; *a; ++a) {
if (f && *f && strcmp(*a, *f) == 0) {
f++;
if ((error = stat_print(thd, tokudb_hton_name, tokudb_hton_name_length, *a, strlen(*a), STRING_WITH_LEN(SHOW_LOG_STATUS_FREE))))
break;
} else {
if ((error = stat_print(thd, tokudb_hton_name, tokudb_hton_name_length, *a, strlen(*a), STRING_WITH_LEN(SHOW_LOG_STATUS_INUSE))))
break;
}
}
}
err:
if (all_logs)
free(all_logs);
if (free_logs)
free(free_logs);
free_root(&show_logs_root, MYF(0));
*root_ptr = old_mem_root;
TOKUDB_DBUG_RETURN(error);
}
bool tokudb_show_status(handlerton * hton, THD * thd, stat_print_fn * stat_print, enum ha_stat_type stat_type) {
switch (stat_type) {
case HA_ENGINE_LOGS:
return tokudb_show_logs(thd, stat_print);
default:
return FALSE;
}
}
static void tokudb_print_error(const DB_ENV * db_env, const char *db_errpfx, const char *buffer) {
sql_print_error("%s: %s", db_errpfx, buffer);
}
void tokudb_cleanup_log_files(void) {
TOKUDB_DBUG_ENTER("tokudb_cleanup_log_files");
char **names;
int error;
if ((error = db_env->txn_checkpoint(db_env, 0, 0, 0)))
my_error(ER_ERROR_DURING_CHECKPOINT, MYF(0), error);
if ((error = db_env->log_archive(db_env, &names, 0)) != 0) {
DBUG_PRINT("error", ("log_archive failed (error %d)", error));
db_env->err(db_env, error, "log_archive");
DBUG_VOID_RETURN;
}
if (names) {
char **np;
for (np = names; *np; ++np) {
#if 1
if (tokudb_debug)
TOKUDB_TRACE("%s:cleanup:%s\n", __FUNCTION__, *np);
#else
my_delete(*np, MYF(MY_WME));
#endif
}
free(names);
}
DBUG_VOID_RETURN;
}
//
// *******NOTE*****
// If the flags HA_ONLINE_DROP_INDEX and HA_ONLINE_DROP_UNIQUE_INDEX
// are ever added, prepare_drop_index and final_drop_index will need to be modified
// so that the actual deletion of DB's is done in final_drop_index and not prepare_drop_index
//
static uint tokudb_alter_table_flags(uint flags)
{
return (HA_ONLINE_ADD_INDEX_NO_WRITES| HA_ONLINE_DROP_INDEX_NO_WRITES |
HA_ONLINE_ADD_UNIQUE_INDEX_NO_WRITES| HA_ONLINE_DROP_UNIQUE_INDEX_NO_WRITES);
}
// options flags
// PLUGIN_VAR_THDLOCAL Variable is per-connection
// PLUGIN_VAR_READONLY Server variable is read only
// PLUGIN_VAR_NOSYSVAR Not a server variable
// PLUGIN_VAR_NOCMDOPT Not a command line option
// PLUGIN_VAR_NOCMDARG No argument for cmd line
// PLUGIN_VAR_RQCMDARG Argument required for cmd line
// PLUGIN_VAR_OPCMDARG Argument optional for cmd line
// PLUGIN_VAR_MEMALLOC String needs memory allocated
// system variables
static MYSQL_SYSVAR_ULONGLONG(cache_size, tokudb_cache_size, PLUGIN_VAR_READONLY, "TokuDB cache table size", NULL, NULL, 0, 0, ~0LL, 0);
static MYSQL_SYSVAR_UINT(cache_memory_percent, tokudb_cache_memory_percent, PLUGIN_VAR_READONLY, "Default percent of physical memory in the TokuDB cache table", NULL, NULL, tokudb_cache_memory_percent, 0, 100, 0);
static MYSQL_SYSVAR_ULONG(max_lock, tokudb_max_lock, PLUGIN_VAR_READONLY, "TokuDB Max Locks", NULL, NULL, 8 * 1024, 0, ~0L, 0);
static MYSQL_SYSVAR_ULONG(debug, tokudb_debug, PLUGIN_VAR_READONLY, "TokuDB Debug", NULL, NULL, 0, 0, ~0L, 0);
static MYSQL_SYSVAR_STR(log_dir, tokudb_log_dir, PLUGIN_VAR_READONLY, "TokuDB Log Directory", NULL, NULL, NULL);
static MYSQL_SYSVAR_STR(data_dir, tokudb_data_dir, PLUGIN_VAR_READONLY, "TokuDB Data Directory", NULL, NULL, NULL);
static MYSQL_SYSVAR_STR(version, tokudb_version, PLUGIN_VAR_READONLY, "TokuDB Version", NULL, NULL, NULL);
static MYSQL_SYSVAR_UINT(init_flags, tokudb_init_flags, PLUGIN_VAR_READONLY, "Sets TokuDB DB_ENV->open flags", NULL, NULL, tokudb_init_flags, 0, ~0, 0);
#if 0
static MYSQL_SYSVAR_ULONG(cache_parts, tokudb_cache_parts, PLUGIN_VAR_READONLY, "Sets TokuDB set_cache_parts", NULL, NULL, 0, 0, ~0L, 0);
// this is really a u_int32_t
// ? use MYSQL_SYSVAR_SET
static MYSQL_SYSVAR_UINT(env_flags, tokudb_env_flags, PLUGIN_VAR_READONLY, "Sets TokuDB env_flags", NULL, NULL, DB_LOG_AUTOREMOVE, 0, ~0, 0);
static MYSQL_SYSVAR_STR(home, tokudb_home, PLUGIN_VAR_READONLY, "Sets TokuDB env->open home", NULL, NULL, NULL);
// this is really a u_int32_t
//? use MYSQL_SYSVAR_SET
// this looks to be unused
static MYSQL_SYSVAR_LONG(lock_scan_time, tokudb_lock_scan_time, PLUGIN_VAR_READONLY, "Tokudb Lock Scan Time (UNUSED)", NULL, NULL, 0, 0, ~0L, 0);
// this is really a u_int32_t
//? use MYSQL_SYSVAR_ENUM
static MYSQL_SYSVAR_UINT(lock_type, tokudb_lock_type, PLUGIN_VAR_READONLY, "Sets set_lk_detect", NULL, NULL, DB_LOCK_DEFAULT, 0, ~0, 0);
static MYSQL_SYSVAR_ULONG(log_buffer_size, tokudb_log_buffer_size, PLUGIN_VAR_READONLY, "Tokudb Log Buffer Size", NULL, NULL, 0, 0, ~0L, 0);
static MYSQL_SYSVAR_ULONG(region_size, tokudb_region_size, PLUGIN_VAR_READONLY, "Tokudb Region Size", NULL, NULL, 128 * 1024, 0, ~0L, 0);
static MYSQL_SYSVAR_BOOL(shared_data, tokudb_shared_data, PLUGIN_VAR_READONLY, "Tokudb Shared Data", NULL, NULL, FALSE);
static MYSQL_SYSVAR_STR(tmpdir, tokudb_tmpdir, PLUGIN_VAR_READONLY, "Tokudb Tmp Dir", NULL, NULL, NULL);
#endif
static struct st_mysql_sys_var *tokudb_system_variables[] = {
MYSQL_SYSVAR(cache_size),
MYSQL_SYSVAR(cache_memory_percent),
MYSQL_SYSVAR(max_lock),
MYSQL_SYSVAR(data_dir),
MYSQL_SYSVAR(log_dir),
MYSQL_SYSVAR(debug),
MYSQL_SYSVAR(commit_sync),
MYSQL_SYSVAR(version),
MYSQL_SYSVAR(init_flags),
#if 0
MYSQL_SYSVAR(cache_parts),
MYSQL_SYSVAR(env_flags),
MYSQL_SYSVAR(home),
MYSQL_SYSVAR(lock_scan_time),
MYSQL_SYSVAR(lock_type),
MYSQL_SYSVAR(log_buffer_size),
MYSQL_SYSVAR(region_size),
MYSQL_SYSVAR(shared_data),
MYSQL_SYSVAR(tmpdir),
#endif
NULL
};
mysql_declare_plugin(tokudb) {
MYSQL_STORAGE_ENGINE_PLUGIN,
&storage_engine_structure,
"TokuDB",
"Tokutek Inc",
"Fractal trees, transactions, row level locks",
PLUGIN_LICENSE_PROPRIETARY, /* QQQ license? */
tokudb_init_func, /* plugin init */
tokudb_done_func, /* plugin deinit */
0x0200, /* QQQ 2.0 */
NULL, /* status variables */
tokudb_system_variables, /* system variables */
NULL /* config options */
}
mysql_declare_plugin_end;
#ifndef _HATOKU_HTON
#define _HATOKU_HTON
#include "db.h"
extern handlerton *tokudb_hton;
extern const char *ha_tokudb_ext;
extern char *tokudb_data_dir;
extern DB_ENV *db_env;
// thread variables
extern HASH tokudb_open_tables;
extern pthread_mutex_t tokudb_mutex;
#endif //#ifdef _HATOKU_HTON
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment