Commit 7b16d5c1 authored by Zardosht Kasheff, committed by Yoni Fogel

[t:3400], merge storing of frm file into main line

git-svn-id: file:///svn/mysql/tokudb-engine/tokudb-engine@30228 c7de825b-a66e-492c-adef-691d508d4ae1
parent 656714c9
@@ -1265,7 +1265,7 @@ bool ha_tokudb::has_auto_increment_flag(uint* index) {
     return ai_found;
 }
 
-int ha_tokudb::open_status_dictionary(DB** ptr, const char* name, DB_TXN* txn) {
+int open_status_dictionary(DB** ptr, const char* name, DB_TXN* txn) {
     int error;
     char* newname = NULL;
     uint open_mode = DB_THREAD;
@@ -1609,6 +1609,25 @@ int ha_tokudb::initialize_share(
         goto exit;
     }
 
+    error = get_status();
+    if (error) {
+        goto exit;
+    }
+    if (share->version < HA_TOKU_VERSION) {
+        error = ENOSYS;
+        goto exit;
+    }
+
+    //
+    // verify frm file is what we expect it to be
+    // only for tables that are not partitioned
+    //
+    if (table->part_info == NULL) {
+        error = verify_frm_data(table->s->path.str);
+        if (error) {
+            goto exit;
+        }
+    }
     error = initialize_key_and_col_info(
         table_share,
         table,
@@ -1661,15 +1680,6 @@ int ha_tokudb::initialize_share(
     }
     share->ref_length = ref_length;
 
-    error = get_status();
-    if (error) {
-        goto exit;
-    }
-    if (share->version < HA_TOKU_VERSION) {
-        error = ENOSYS;
-        goto exit;
-    }
-
     error = estimate_num_rows(share->file,&num_rows, NULL);
     //
     // estimate_num_rows should not fail under normal conditions
@@ -2016,7 +2026,86 @@ int ha_tokudb::write_metadata(DB* db, void* key_data, uint key_size, void* val_d
     return error;
 }
 
+int ha_tokudb::write_frm_data(DB* db, DB_TXN* txn, const char* frm_name) {
+    uchar* frm_data = NULL;
+    size_t frm_len = 0;
+    int error = 0;
+    TOKUDB_DBUG_ENTER("ha_tokudb::write_frm_data, %s", frm_name);
+
+    error = readfrm(frm_name,&frm_data,&frm_len);
+    if (error) { goto cleanup; }
+
+    error = write_to_status(db,hatoku_frm_data,frm_data,(uint)frm_len, txn);
+    if (error) { goto cleanup; }
+
+    error = 0;
+cleanup:
+    my_free(frm_data, MYF(MY_ALLOW_ZERO_PTR));
+    TOKUDB_DBUG_RETURN(error);
+}
+
+int ha_tokudb::verify_frm_data(const char* frm_name) {
+    uchar* mysql_frm_data = NULL;
+    size_t mysql_frm_len = 0;
+    uchar* stored_frm_data = NULL;
+    size_t stored_frm_len = 0;
+    DBT key, value;
+    int error = 0;
+    DB_TXN* txn = NULL;
+    HA_METADATA_KEY curr_key = hatoku_frm_data;
+    TOKUDB_DBUG_ENTER("ha_tokudb::verify_frm_data %s", frm_name);
+
+    error = db_env->txn_begin(db_env, 0, &txn, 0);
+    if (error) { goto cleanup; }
+
+    bzero(&key, sizeof(key));
+    bzero(&value, sizeof(value));
+
+    // get the frm data from MySQL
+    error = readfrm(frm_name,&mysql_frm_data,&mysql_frm_len);
+    if (error) { goto cleanup; }
+
+    // TODO: get the frm data that we have stored
+    key.data = &curr_key;
+    key.size = sizeof(curr_key);
+    value.flags = DB_DBT_MALLOC;
+    error = share->status_block->get(
+        share->status_block,
+        txn,
+        &key,
+        &value,
+        0
+        );
+    if (error == DB_NOTFOUND) {
+        // if not found, write it
+        error = write_frm_data(
+            share->status_block,
+            txn,
+            frm_name
+            );
+        goto cleanup;
+    }
+    else if (error) {
+        goto cleanup;
+    }
+
+    stored_frm_len = value.size;
+    stored_frm_data = (uchar *)value.data;
+
+    if (stored_frm_len != mysql_frm_len ||
+        memcmp(stored_frm_data, mysql_frm_data, stored_frm_len))
+    {
+        error = HA_ERR_TABLE_DEF_CHANGED;
+        goto cleanup;
+    }
+
+    error = 0;
+cleanup:
+    if (txn) {
+        commit_txn(txn, 0);
+    }
+    my_free(mysql_frm_data, MYF(MY_ALLOW_ZERO_PTR));
+    TOKUDB_DBUG_RETURN(error);
+}
+
 //
 // Updates status.tokudb with a new max value used for the auto increment column
@@ -6005,10 +6094,18 @@ int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_in
     DB_TXN* txn = NULL;
     char* newname = NULL;
     KEY_AND_COL_INFO kc_info;
+    bool create_from_engine= (create_info->table_options & HA_OPTION_CREATE_FROM_ENGINE);
+
     bzero(&kc_info, sizeof(kc_info));
 
     pthread_mutex_lock(&tokudb_meta_mutex);
 
+    if (create_from_engine) {
+        // table already exists, nothing to do
+        error = 0;
+        goto cleanup;
+    }
+
     newname = (char *)my_malloc(get_max_dict_name_path_length(name),MYF(MY_WME));
     if (newname == NULL){ error = ENOMEM; goto cleanup;}
@@ -6045,6 +6142,11 @@ int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_in
     error = write_auto_inc_create(status_block, create_info->auto_increment_value, txn);
     if (error) { goto cleanup; }
 
+    // only for tables that are not partitioned
+    if (form->part_info == NULL) {
+        error = write_frm_data(status_block, txn, form->s->path.str);
+        if (error) { goto cleanup; }
+    }
     error = allocate_key_and_col_info(form->s, &kc_info);
     if (error) { goto cleanup; }
@@ -8581,6 +8683,12 @@ int ha_tokudb::alter_table_phase2(
         }
     }
 
+    // update frm file
+    // only for tables that are not partitioned
+    if (altered_table->part_info == NULL) {
+        error = write_frm_data(share->status_block, txn, altered_table->s->path.str);
+        if (error) { goto cleanup; }
+    }
     if (thd->killed) {
         error = ER_ABORTING_CONNECTION;
         goto cleanup;
@@ -103,6 +103,7 @@ typedef ulonglong HA_METADATA_KEY;
 #define hatoku_max_ai 2 //maximum auto increment value found so far
 #define hatoku_ai_create_value 3
 #define hatoku_key_name 4
+#define hatoku_frm_data 5
 
 typedef struct st_filter_key_part_info {
     uint offset;
@@ -329,10 +330,11 @@ class ha_tokudb : public handler {
     int open_main_dictionary(const char* name, bool is_read_only, DB_TXN* txn);
     int open_secondary_dictionary(DB** ptr, KEY* key_info, const char* name, bool is_read_only, DB_TXN* txn);
-    int open_status_dictionary(DB** ptr, const char* name, DB_TXN* txn);
     int acquire_table_lock (DB_TXN* trans, TABLE_LOCK_TYPE lt);
     int estimate_num_rows(DB* db, u_int64_t* num_rows, DB_TXN* txn);
     bool has_auto_increment_flag(uint* index);
+    int write_frm_data(DB* db, DB_TXN* txn, const char* frm_name);
+    int verify_frm_data(const char* frm_name);
     int write_to_status(DB* db, HA_METADATA_KEY curr_key_data, void* data, uint size, DB_TXN* txn );
     int write_metadata(DB* db, void* key, uint key_size, void* data, uint data_size, DB_TXN* txn );
     int remove_metadata(DB* db, void* key_data, uint key_size, DB_TXN* transaction);
@@ -605,6 +607,9 @@ class ha_tokudb : public handler {
     int read_last(uint keynr);
 };
 
+int open_status_dictionary(DB** ptr, const char* name, DB_TXN* txn);
+
 #if MYSQL_VERSION_ID >= 50506
 static inline void my_free(void *p, int arg) {
@@ -173,6 +173,10 @@ static uint tokudb_alter_table_flags(uint flags);
 static int tokudb_rollback_to_savepoint(handlerton * hton, THD * thd, void *savepoint);
 static int tokudb_savepoint(handlerton * hton, THD * thd, void *savepoint);
 static int tokudb_release_savepoint(handlerton * hton, THD * thd, void *savepoint);
+static int tokudb_discover(handlerton *hton, THD* thd, const char *db,
+                           const char *name,
+                           uchar **frmblob,
+                           size_t *frmlen);
 
 handlerton *tokudb_hton;
 
 const char *ha_tokudb_ext = ".tokudb";
@@ -259,6 +263,8 @@ static int tokudb_init_func(void *p) {
     tokudb_hton->savepoint_rollback = tokudb_rollback_to_savepoint;
     tokudb_hton->savepoint_release = tokudb_release_savepoint;
 
+    tokudb_hton->discover = tokudb_discover;
+
     tokudb_hton->commit = tokudb_commit;
     tokudb_hton->rollback = tokudb_rollback;
     tokudb_hton->panic = tokudb_end;
@@ -785,6 +791,57 @@ static int tokudb_release_savepoint(handlerton * hton, THD * thd, void *savepoin
     TOKUDB_DBUG_RETURN(error);
 }
 
+static int tokudb_discover(handlerton *hton, THD* thd, const char *db,
+                           const char *name,
+                           uchar **frmblob,
+                           size_t *frmlen)
+{
+    TOKUDB_DBUG_ENTER("tokudb_discover");
+    int error;
+    DB* status_db = NULL;
+    DB_TXN* txn = NULL;
+    // TODO: open status dictionary, read frmdata, and pass it back, and figure out how to free it
+    char path[FN_REFLEN + 1];
+    uchar* saved_frm_data = NULL;
+    HA_METADATA_KEY curr_key = hatoku_frm_data;
+    DBT key, value;
+
+    error = db_env->txn_begin(db_env, 0, &txn, 0);
+    if (error) { goto cleanup; }
+
+    build_table_filename(path, sizeof(path) - 1, db, name, "", 0);
+    error = open_status_dictionary(&status_db, path, txn);
+    if (error) { goto cleanup; }
+
+    key.data = &curr_key;
+    key.size = sizeof(curr_key);
+    value.flags = DB_DBT_MALLOC;
+    error = status_db->get(
+        status_db,
+        txn,
+        &key,
+        &value,
+        0
+        );
+    if (error) {
+        goto cleanup;
+    }
+
+    saved_frm_data = (uchar *)my_malloc(value.size, MYF(MY_WME));
+    memcpy(saved_frm_data, value.data, value.size);
+    *frmblob = saved_frm_data;
+    *frmlen = value.size;
+
+    error = 0;
+cleanup:
+    if (status_db) {
+        status_db->close(status_db,0);
+    }
+    if (txn) {
+        commit_txn(txn, 0);
+    }
+    TOKUDB_DBUG_RETURN(error);
+}
+
 static int smart_dbt_do_nothing (DBT const *key, DBT const *row, void *context) {
     return 0;