#ifdef USE_PRAGMA_INTERFACE
#pragma interface               /* gcc class implementation */
#endif

// Compile-time feature switch. This header only defines it;
// NOTE(review): the code that tests TOKU_INCLUDE_CHECKPOINT_LOCK is not
// visible in this file.
#define TOKU_INCLUDE_CHECKPOINT_LOCK 1


// Fallback for builds where HA_CLUSTERING has not been defined by the time
// this header is read: both HA_CLUSTERING and HA_CLUSTERED_INDEX are then
// defined as 0. Note that HA_CLUSTERED_INDEX is only defined here when
// HA_CLUSTERING itself is missing.
#if !defined(HA_CLUSTERING)
#define HA_CLUSTERING 0
#define HA_CLUSTERED_INDEX 0
#endif

#include <db.h>
#include "hatoku_cmp.h"

class ha_tokudb;

//
// Context record for the loader (ha_tokudb embeds one by value as member
// `lc`, next to its DB_LOADER pointer).
// NOTE: LOADER_CONTEXT is a typedef for a *pointer* to struct loader_context,
// not for the struct itself.
//
typedef struct loader_context {
    THD* thd;                   // MySQL thread handle associated with the load
    char write_status_msg[200]; // inline buffer for a status message string
    ha_tokudb* ha;              // handler object; presumably the one driving the load -- confirm
} *LOADER_CONTEXT;

//
// Context record for the hot-optimize operation (by name; the code that
// fills it in is not visible in this header).
// NOTE: HOT_OPTIMIZE_CONTEXT is a typedef for a *pointer* to the struct.
//
typedef struct hot_optimize_context {
    THD *thd;               // MySQL thread handle
    char* write_status_msg; // status message buffer; unlike loader_context this is a
                            // pointer, so the storage lives elsewhere
                            // (NOTE(review): owner not visible here)
    ha_tokudb *ha;          // handler object
    uint current_table;     // presumably index of the table currently being processed -- confirm
    uint num_tables;        // presumably total number of tables to process -- confirm
} *HOT_OPTIMIZE_CONTEXT;

//
// This object stores table information that is to be shared
// among all ha_tokudb objects.
// There is one instance per table, shared among threads.
// Some of the variables here are the DB* pointers to indexes,
// and auto increment information.
//
typedef struct st_tokudb_share {
    char *table_name;
    // length of table_name, and use_count (presumably the number of
    // ha_tokudb objects currently using this share -- confirm)
    uint table_name_length, use_count;
    //
    // mutex for this share; ha_tokudb::get_auto_primary_key holds it
    // while incrementing auto_ident
    //
    pthread_mutex_t mutex;
    THR_LOCK lock;

    //
    // counter incremented by ha_tokudb::get_auto_primary_key and then
    // encoded with hpk_num_to_char
    //
    ulonglong auto_ident;
    ulonglong last_auto_increment, auto_inc_create_value;
    //
    // estimate on number of rows in table
    //
    ha_rows rows;
    //
    // estimate on number of rows added in the process of a locked tables
    // this is so we can better estimate row count during a lock table
    //
    ha_rows rows_from_locked_table;
    //
    // presumably the handle for status.tokudb, the metadata dictionary
    // addressed by HA_METADATA_KEY values -- confirm
    //
    DB *status_block;
    //
    // DB that is indexed on the primary key
    //
    DB *file;
    //
    // array of all DB's that make up table, includes DB that
    // is indexed on the primary key, add 1 in case primary
    // key is hidden
    //
    DB *key_file[MAX_KEY +1];
    // reader/writer lock; presumably guards key_file -- confirm
    rw_lock_t key_file_lock;
    uint status, version, capabilities;
    uint ref_length;
    //
    // whether table has an auto increment column
    //
    bool has_auto_inc;
    //
    // index of auto increment column in table->field, if auto_inc exists
    //
    uint ai_field_index;

    KEY_AND_COL_INFO kc_info;
    
    // 
    // we want the following optimization for bulk loads, if the table is empty, 
    // attempt to grab a table lock. emptiness check can be expensive, 
    // so we try it once for a table. After that, we keep this variable around 
    // to tell us to not try it again. 
    // 
    bool try_table_lock; 

    bool has_unique_keys;
    bool replace_into_fast;
    // reader/writer lock; presumably guards num_DBs -- confirm
    rw_lock_t num_DBs_lock;
    // presumably the number of valid entries in key_file -- confirm
    u_int32_t num_DBs;
} TOKUDB_SHARE;

//
// Version numbers for the handler's on-disk format (both currently 4).
// NOTE(review): how ORIG_VERSION and VERSION are compared or stored is not
// visible in this header.
//
#define HA_TOKU_ORIG_VERSION 4
#define HA_TOKU_VERSION 4
//
// no capabilities yet
//
#define HA_TOKU_CAP 0

//
// These are keys that will be used for retrieving metadata in status.tokudb
// To get a piece of metadata, one looks up the value associated with the
// corresponding key below in status.tokudb.
// NOTE(review): this comment used to name a key "hatoku_version"; no such
// macro exists. The version keys defined here are hatoku_old_version and
// hatoku_new_version -- confirm which one readers are expected to use.
//

typedef ulonglong HA_METADATA_KEY;
#define hatoku_old_version 0
#define hatoku_capabilities 1
#define hatoku_max_ai 2 //maximum auto increment value found so far
#define hatoku_ai_create_value 3
#define hatoku_key_name 4
#define hatoku_frm_data 5
#define hatoku_new_version 6

//
// Pair of (offset, key-part index).
// NOTE(review): no user of this struct is visible in this header; the field
// meanings below are taken from the names only -- confirm against callers.
//
typedef struct st_filter_key_part_info {
    uint offset;      // presumably a byte offset -- confirm
    uint part_index;  // presumably an index into a key's parts -- confirm
} FILTER_KEY_PART_INFO;

//
// Kind of table lock requested; taken as the `lt` argument of
// ha_tokudb::acquire_table_lock.
//
typedef enum {
    lock_read = 0,  // read lock (value 0)
    lock_write      // write lock (value 1)
} TABLE_LOCK_TYPE;

//
// Free functions defined outside this header. Only the signatures are
// visible here; descriptions below are limited to what the signatures show.
//

// Hands a tokudb_trx_data object back through out_trx.
// NOTE(review): presumably allocates it and returns 0 on success -- confirm.
int create_tokudb_trx_data_instance(tokudb_trx_data** out_trx);

// Row generator for deletes: given a source key/value pair (read-only) from
// src_db, fills in dest_key for dest_db.
// NOTE(review): presumably registered with the storage layer as a callback
// and returning 0 on success -- confirm.
int generate_row_for_del(
    DB *dest_db, 
    DB *src_db,
    DBT *dest_key,
    const DBT *src_key, 
    const DBT *src_val
    );

// Row generator for puts: same as above, but also fills in dest_val.
int generate_row_for_put(
    DB *dest_db, 
    DB *src_db,
    DBT *dest_key, 
    DBT *dest_val,
    const DBT *src_key, 
    const DBT *src_val
    ); 

// Update function: receives the key, the old value and an `extra` message
// (all read-only) and a set_val callback that takes the new value together
// with the opaque set_extra pointer.
// NOTE(review): presumably the new value is reported by calling
// set_val(new_val, set_extra) -- confirm against the definition.
int tokudb_update_fun(
    DB* db,
    const DBT *key,
    const DBT *old_val, 
    const DBT *extra,
    void (*set_val)(const DBT *new_val, void *set_extra),
    void *set_extra
    );

// The number of rows bulk fetched in one callback grows exponentially
// with the bulk fetch iteration, so the max iteration is the max number
// of shifts we can perform on a 64 bit integer (ha_tokudb keeps the
// iteration count in the uint64_t member bulk_fetch_iteration).
#define HA_TOKU_BULK_FETCH_ITERATION_MAX 63

//
// TokuDB's subclass of the MySQL `handler` class.
// State that is common to every handler on the same table is reached
// through `share` (a TOKUDB_SHARE); the remaining data members are
// per-handler-object state. Member functions are only declared here
// unless a body is shown inline.
//
class ha_tokudb : public handler {
private:
    THR_LOCK_DATA lock;         ///< MySQL lock
    TOKUDB_SHARE *share;        ///< Shared lock info

    //
    // last key returned by ha_tokudb's cursor
    //
    DBT last_key;
    //
    // pointer used for multi_alloc of key_buff, key_buff2, primary_key_buff
    //
    void *alloc_ptr;
    //
    // buffer used to temporarily store a "packed row" 
    // data pointer of a DBT will end up pointing to this
    // see pack_row for usage
    //
    uchar *rec_buff;
    //
    // number of bytes allocated in rec_buff
    //
    ulong alloced_rec_buff_length;
    //
    // same as above two, but for updates
    //
    uchar *rec_update_buff;
    ulong alloced_update_rec_buff_length;
    u_int32_t max_key_length;

    //
    // state of the range query buffer used for bulk fetch
    //
    uchar* range_query_buff; // range query buffer
    u_int32_t size_range_query_buff; // size of the allocated range query buffer
    u_int32_t bytes_used_in_range_query_buff; // number of bytes used in the range query buffer
    u_int32_t curr_range_query_buff_offset; // current offset into the range query buffer for queries to read
    uint64_t bulk_fetch_iteration; // see HA_TOKU_BULK_FETCH_ITERATION_MAX
    uint64_t rows_fetched_using_bulk_fetch;
    bool doing_bulk_fetch;

    //
    // buffer used to temporarily store a "packed key" 
    // data pointer of a DBT will end up pointing to this
    //
    uchar *key_buff; 
    //
    // buffer used to temporarily store a "packed key" 
    // data pointer of a DBT will end up pointing to this
    // This is used in functions that require the packing
    // of more than one key
    //
    uchar *key_buff2; 
    uchar *key_buff3; 
    //
    // buffer used to temporarily store a "packed key" 
    // data pointer of a DBT will end up pointing to this
    // currently this is only used for a primary key in
    // the function update_row, hence the name. It 
    // does not carry any state throughout the class.
    //
    uchar *primary_key_buff;

    //
    // ranges of prelocked area, used to know how much to bulk fetch
    //
    uchar *prelocked_left_range; 
    u_int32_t prelocked_left_range_size;
    uchar *prelocked_right_range; 
    u_int32_t prelocked_right_range_size;


    //
    // individual DBTs for each index
    // (note the key array has two entries per index, the others one)
    //
    DBT mult_key_dbt[2*(MAX_KEY + 1)];
    DBT mult_rec_dbt[MAX_KEY + 1];
    u_int32_t mult_put_flags[MAX_KEY + 1];
    u_int32_t mult_del_flags[MAX_KEY + 1];
    u_int32_t mult_dbt_flags[MAX_KEY + 1];
    

    //
    // when unpacking blobs, we need to store it in a temporary
    // buffer that will persist because MySQL just gets a pointer to the 
    // blob data, a pointer we need to ensure is valid until the next
    // query
    //
    uchar* blob_buff;
    u_int32_t num_blob_bytes;

    bool unpack_entire_row;

    //
    // buffers (and their sizes) that will hold the indexes
    // of fields that need to be read for a query
    //
    u_int32_t* fixed_cols_for_query;
    u_int32_t num_fixed_cols_for_query;
    u_int32_t* var_cols_for_query;
    u_int32_t num_var_cols_for_query;
    bool read_blobs;
    bool read_key;

    //
    // transaction used by ha_tokudb's cursor
    //
    DB_TXN *transaction;
    bool is_fast_alter_running;

    // external_lock will set this true for read operations that will be closely followed by write operations.
    bool use_write_locks; // use write locks for reads

    //
    // instance of cursor being used for init_xxx and rnd_xxx functions
    //
    DBC *cursor;
    u_int32_t cursor_flags; // flags for cursor
    //
    // flags that are returned in table_flags()
    //
    ulonglong int_table_flags;
    // 
    // count on the number of rows that gets changed, such as when write_row occurs
    // this is meant to help keep estimate on number of elements in DB
    // 
    ulonglong added_rows;
    ulonglong deleted_rows;


    uint last_dup_key;
    //
    // if set to 0, then the primary key is not hidden
    // if non-zero (not necessarily 1), primary key is hidden
    //
    uint hidden_primary_key;
    bool key_read, using_ignore;

    //
    // After a cursor encounters an error, the cursor will be unusable
    // In case MySQL attempts to do a cursor operation (such as rnd_next
    // or index_prev), we will gracefully return this error instead of crashing
    //
    int last_cursor_error;

    //
    // For instances where we successfully prelock a range or a table,
    // we set this to TRUE so that successive cursor calls
    // know to limit the locking overhead in a call to the fractal tree
    //
    bool range_lock_grabbed;

    //
    // For bulk inserts, we want option of not updating auto inc
    // until all inserts are done. By default, is false
    //
    bool delay_updating_ai_metadata; // if true, don't update auto-increment metadata until bulk load completes
    bool ai_metadata_update_required; // if true, autoincrement metadata must be updated 

    //
    // buffer for updating the status of long insert, delete, and update
    // statements. Right now, the messages are 
    // "[inserted|updated|deleted] about %llu rows",
    // so a buffer of 200 is good enough.
    //
    char write_status_msg[200]; //buffer of 200 should be a good upper bound.
    //
    // loader context embedded by value (see struct loader_context: thread
    // handle, status message buffer, handler pointer)
    //
    struct loader_context lc;

    //
    // loader handle and its abort/error state
    //
    DB_LOADER* loader;
    bool abort_loader;
    int loader_error;

    bool num_DBs_locked_in_bulk;
    u_int32_t lock_count;
    
    bool fix_rec_buff_for_blob(ulong length);
    bool fix_rec_update_buff_for_blob(ulong length);
    uchar current_ident[TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH];

    //
    // row and key packing/unpacking helpers
    //
    ulong max_row_length(const uchar * buf);
    int pack_row_in_buff(
        DBT * row, 
        const uchar* record,
        uint index,
        uchar* row_buff
        );
    int pack_row(
        DBT * row, 
        const uchar* record,
        uint index
        );
    int pack_old_row_for_update(
        DBT * row, 
        const uchar* record,
        uint index
        );
    u_int32_t place_key_into_mysql_buff(KEY* key_info, uchar * record, uchar* data);
    void unpack_key(uchar * record, DBT const *key, uint index);
    u_int32_t place_key_into_dbt_buff(KEY* key_info, uchar * buff, const uchar * record, bool* has_null, int key_length);
    // the three create_dbt_key_* helpers default key_length to MAX_KEY_LENGTH
    DBT* create_dbt_key_from_key(DBT * key, KEY* key_info, uchar * buff, const uchar * record, bool* has_null, bool dont_pack_pk, int key_length = MAX_KEY_LENGTH);
    DBT *create_dbt_key_from_table(DBT * key, uint keynr, uchar * buff, const uchar * record, bool* has_null, int key_length = MAX_KEY_LENGTH);
    DBT* create_dbt_key_for_lookup(DBT * key, KEY* key_info, uchar * buff, const uchar * record, bool* has_null, int key_length = MAX_KEY_LENGTH);
    DBT *pack_key(DBT * key, uint keynr, uchar * buff, const uchar * key_ptr, uint key_length, int8_t inf_byte);
    bool key_changed(uint keynr, const uchar * old_row, const uchar * new_row);
    int handle_cursor_error(int error, int err_to_return, uint keynr);
    DBT *get_pos(DBT * to, uchar * pos);
 
    //
    // dictionary opening, locking and row-count estimation
    //
    int open_main_dictionary(const char* name, bool is_read_only, DB_TXN* txn);
    int open_secondary_dictionary(DB** ptr, KEY* key_info, const char* name, bool is_read_only, DB_TXN* txn);
    int acquire_table_lock (DB_TXN* trans, TABLE_LOCK_TYPE lt);
    int estimate_num_rows(DB* db, u_int64_t* num_rows, DB_TXN* txn);
    bool has_auto_increment_flag(uint* index);

    //
    // metadata helpers; the *_status functions take an HA_METADATA_KEY
    //
    int write_frm_data(DB* db, DB_TXN* txn, const char* frm_name);
    int verify_frm_data(const char* frm_name, DB_TXN* trans);
    int remove_frm_data(DB *db, DB_TXN *txn);

    int write_to_status(DB* db, HA_METADATA_KEY curr_key_data, void* data, uint size, DB_TXN* txn);
    int remove_from_status(DB* db, HA_METADATA_KEY curr_key_data, DB_TXN* txn);

    int write_metadata(DB* db, void* key, uint key_size, void* data, uint data_size, DB_TXN* txn);
    int remove_metadata(DB* db, void* key_data, uint key_size, DB_TXN* transaction);

    int update_max_auto_inc(DB* db, ulonglong val);
    int remove_key_name_from_status(DB* status_block, char* key_name, DB_TXN* txn);
    int write_key_name_to_status(DB* status_block, char* key_name, DB_TXN* txn);
    int write_auto_inc_create(DB* db, ulonglong val, DB_TXN* txn);
    void init_auto_increment();
    bool can_replace_into_be_fast(TABLE_SHARE* table_share, KEY_AND_COL_INFO* kc_info, uint pk);
    int initialize_share(
        const char* name,
        int mode
        );

    void set_query_columns(uint keynr);
    int prelock_range (const key_range *start_key, const key_range *end_key);
    int create_txn(THD* thd, tokudb_trx_data* trx);
    bool may_table_be_empty(DB_TXN *txn);
    //
    // dictionary creation, deletion, renaming and truncation
    //
    int delete_or_rename_table (const char* from_name, const char* to_name, bool is_delete);
    int delete_or_rename_dictionary( const char* from_name, const char* to_name, const char* index_name, bool is_key, DB_TXN* txn, bool is_delete);
    int truncate_dictionary( uint keynr, DB_TXN* txn );
    int create_secondary_dictionary(
        const char* name, 
        TABLE* form, 
        KEY* key_info, 
        DB_TXN* txn, 
        KEY_AND_COL_INFO* kc_info, 
        u_int32_t keynr, 
        bool is_hot_index
        );
    int create_main_dictionary(const char* name, TABLE* form, DB_TXN* txn, KEY_AND_COL_INFO* kc_info);
    void trace_create_table_info(const char *name, TABLE * form);
    //
    // uniqueness checks and row insertion helpers
    //
    int is_index_unique(bool* is_unique, DB_TXN* txn, DB* db, KEY* key_info);
    int is_val_unique(bool* is_unique, uchar* record, KEY* key_info, uint dict_index, DB_TXN* txn);
    int do_uniqueness_checks(uchar* record, DB_TXN* txn, THD* thd);
    // XXX 3414 remove last parameter
    void set_main_dict_put_flags(THD* thd, bool opt_eligible, u_int32_t* put_flags);
    int insert_row_to_main_dictionary(uchar* record, DBT* pk_key, DBT* pk_val, DB_TXN* txn);
    int insert_rows_to_dictionaries_mult(DBT* pk_key, DBT* pk_val, DB_TXN* txn, THD* thd);
    void test_row_packing(uchar* record, DBT* pk_key, DBT* pk_val);
    u_int32_t fill_row_mutator(
        uchar* buf, 
        u_int32_t* dropped_columns, 
        u_int32_t num_dropped_columns,
        TABLE* altered_table,
        KEY_AND_COL_INFO* altered_kc_info,
        u_int32_t keynr,
        bool is_add
        );

 
public:
    ha_tokudb(handlerton * hton, TABLE_SHARE * table_arg);
    ~ha_tokudb();

    const char *table_type() const;
    const char *index_type(uint inx);
    const char **bas_ext() const;

    //
    // Returns a bit mask of capabilities of storage engine. Capabilities 
    // defined in sql/handler.h
    //
    ulonglong table_flags(void) const;
    
    ulong index_flags(uint inx, uint part, bool all_parts) const;

    //
    // Returns limit on the number of keys imposed by tokudb.
    //
    uint max_supported_keys() const {
        return MAX_KEY;
    } 

    //
    // Returns TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH as the number of extra
    // record-buffer bytes.
    //
    uint extra_rec_buf_length() const {
        return TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH;
    } 
    ha_rows estimate_rows_upper_bound();

    //
    // Returns the limit on the key length imposed by tokudb.
    //
    uint max_supported_key_length() const {
        return UINT_MAX32;
    } 

    //
    // Returns limit on key part length imposed by tokudb.
    //
    uint max_supported_key_part_length() const {
        return UINT_MAX32;
    } 
    //
    // Returns the address of key_map_full.
    //
    const key_map *keys_to_use_for_scanning() {
        return &key_map_full;
    }

    double scan_time();
    double keyread_time(uint index, uint ranges, ha_rows rows);
    double read_time(uint index, uint ranges, ha_rows rows);

    //
    // table lifecycle and row modification
    //
    int open(const char *name, int mode, uint test_if_locked);
    int close(void);
    void update_create_info(HA_CREATE_INFO* create_info);
    int create(const char *name, TABLE * form, HA_CREATE_INFO * create_info);
    int delete_table(const char *name);
    int rename_table(const char *from, const char *to);
    int optimize(THD * thd, HA_CHECK_OPT * check_opt);
    // analyze() is currently compiled out
#if 0
    int analyze(THD * thd, HA_CHECK_OPT * check_opt);
#endif
    int write_row(uchar * buf);
    int update_row(const uchar * old_data, uchar * new_data);
    int delete_row(const uchar * buf);

    //
    // bulk insert; note the two end_bulk_insert overloads
    //
    void start_bulk_insert(ha_rows rows);
    int end_bulk_insert();
    int end_bulk_insert(bool abort);

    //
    // index scans
    //
    int prepare_index_scan();
    int prepare_index_key_scan( const uchar * key, uint key_len );
    int prepare_range_scan( const key_range *start_key, const key_range *end_key);
    void column_bitmaps_signal();
    int index_init(uint index, bool sorted);
    int index_end();
    int index_next_same(uchar * buf, const uchar * key, uint keylen); 
    int index_read(uchar * buf, const uchar * key, uint key_len, enum ha_rkey_function find_flag);
    int index_read_last(uchar * buf, const uchar * key, uint key_len);
    int index_next(uchar * buf);
    int index_prev(uchar * buf);
    int index_first(uchar * buf);
    int index_last(uchar * buf);

    //
    // table scans and positioned reads
    //
    int rnd_init(bool scan);
    int rnd_end();
    int rnd_next(uchar * buf);
    int rnd_pos(uchar * buf, uchar * pos);

    int read_range_first(const key_range *start_key,
                                 const key_range *end_key,
                                 bool eq_range, bool sorted);
    int read_range_next();


    void position(const uchar * record);
    int info(uint);
    int extra(enum ha_extra_function operation);
    int reset(void);
    int external_lock(THD * thd, int lock_type);
    int start_stmt(THD * thd, thr_lock_type lock_type);

    ha_rows records_in_range(uint inx, key_range * min_key, key_range * max_key);

    u_int32_t get_cursor_isolation_flags(enum thr_lock_type lock_type, THD* thd);
    THR_LOCK_DATA **store_lock(THD * thd, THR_LOCK_DATA ** to, enum thr_lock_type lock_type);

    int get_status(DB_TXN* trans);
    void init_hidden_prim_key_info();
    //
    // Produces the next hidden primary key value: while holding
    // share->mutex, increments share->auto_ident and encodes the new
    // value into `to` with hpk_num_to_char.
    //
    inline void get_auto_primary_key(uchar * to) {
        pthread_mutex_lock(&share->mutex);
        share->auto_ident++;
        hpk_num_to_char(to, share->auto_ident);
        pthread_mutex_unlock(&share->mutex);
    }
    virtual void get_auto_increment(ulonglong offset, ulonglong increment, ulonglong nb_desired_values, ulonglong * first_value, ulonglong * nb_reserved_values);
    bool is_optimize_blocking();
    bool is_auto_inc_singleton();
    void print_error(int error, myf errflag);
    // always reports HA_CACHE_TBL_TRANSACT
    uint8 table_cache_type() {
        return HA_CACHE_TBL_TRANSACT;
    }
    // always true
    bool primary_key_is_clustered() {
        return true;
    }
    // always true
    bool supports_clustered_keys() {
        return true;
    }
    int cmp_ref(const uchar * ref1, const uchar * ref2);
    bool check_if_incompatible_data(HA_CREATE_INFO * info, uint table_changes);

    //
    // The add_index signature depends on the server version: from
    // MYSQL_VERSION_ID 50521 on it takes a handler_add_index** and
    // final_add_index is declared as well.
    //
#if MYSQL_VERSION_ID >= 50521
    int add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys, handler_add_index **add);
    int final_add_index(handler_add_index *add, bool commit);
#else
    int add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys);
#endif

    int tokudb_add_index(
        TABLE *table_arg, 
        KEY *key_info, 
        uint num_of_keys, 
        DB_TXN* txn, 
        bool* inc_num_DBs,
        bool* modified_DB
        ); 
    void restore_add_index(TABLE* table_arg, uint num_of_keys, bool incremented_numDBs, bool modified_DBs);
    int drop_indexes(TABLE *table_arg, uint *key_num, uint num_of_keys, DB_TXN* txn);
    int prepare_drop_index(TABLE *table_arg, uint *key_num, uint num_of_keys);
    void restore_drop_indexes(TABLE *table_arg, uint *key_num, uint num_of_keys);
    int final_drop_index(TABLE *table_arg);

#if MYSQL_VERSION_ID >= 50521
    bool is_alter_table_hot();
#endif

    void prepare_for_alter();

    //
    // Online ALTER TABLE interface, only declared when HA_GENERAL_ONLINE
    // is defined. Phases 1 and 3 are inline no-ops that return 0; phase 2
    // is defined out of line.
    //
#if defined(HA_GENERAL_ONLINE)
    void print_alter_info(
        TABLE *altered_table,
        HA_CREATE_INFO *create_info,
        HA_ALTER_FLAGS *alter_flags,
        uint table_changes
        );
    int check_if_supported_alter(TABLE *altered_table,
         HA_CREATE_INFO *create_info,
         HA_ALTER_FLAGS *alter_flags,
         HA_ALTER_INFO  *alter_info,
         uint table_changes
         );
    int alter_table_phase1(THD *thd,
                                   TABLE *altered_table,
                                   HA_CREATE_INFO *create_info,
                                   HA_ALTER_INFO *alter_info,
                                   HA_ALTER_FLAGS *alter_flags)
    {
      return 0;
    }
    int alter_table_phase2(THD *thd,
                                   TABLE *altered_table,
                                   HA_CREATE_INFO *create_info,
                                   HA_ALTER_INFO *alter_info,
			           HA_ALTER_FLAGS *alter_flags);
    int alter_table_phase3(THD *thd, TABLE *table)
    {
      return 0;
    }
#endif

    int discard_or_import_tablespace(my_bool discard);
    // delete all rows from the table
    // effect: all dictionaries, including the main and indexes, should be empty
    // NOTE(review): this comment originally sat above
    // discard_or_import_tablespace; it reads as a description of
    // truncate()/delete_all_rows() -- confirm.
    int truncate();
    int delete_all_rows();
    //
    // helpers that turn a found key/row into a MySQL record
    //
    void extract_hidden_primary_key(uint keynr, DBT const *found_key);
    void read_key_only(uchar * buf, uint keynr, DBT const *found_key);
    int read_row_callback (uchar * buf, uint keynr, DBT const *row, DBT const *found_key);
    int read_primary_key(uchar * buf, uint keynr, DBT const *row, DBT const *found_key);
    int unpack_blobs(
        uchar* record,
        const uchar* from_tokudb_blob,
        u_int32_t num_blob_bytes,
        bool check_bitmap
        );
    int unpack_row(
        uchar* record, 
        DBT const *row, 
        DBT const *key,
        uint index
        );

    //
    // Compares two keys of index `keynr` by forwarding to
    // tokudb_prefix_cmp_dbt_key with share->key_file[keynr].
    // keynr is used as an array index without a range check.
    //
    int prefix_cmp_dbts( uint keynr, const DBT* first_key, const DBT* second_key) {
        return tokudb_prefix_cmp_dbt_key(share->key_file[keynr], first_key, second_key);
    }

    void track_progress(THD* thd);
    void set_loader_error(int err);
    void set_dup_value_for_pk(DBT* key);


    //
    // index into share->key_file of the DB* that is indexed on
    // the primary key, i.e. share->key_file[primary_key] == share->file
    //
    uint primary_key;

    int check(THD *thd, HA_CHECK_OPT *check_opt);

    int fill_range_query_buf(
        bool need_val, 
        DBT const *key, 
        DBT  const *row, 
        int direction,
        THD* thd
        );

    enum row_type get_row_type();

private:
    //
    // internal read/close helpers
    //
    int read_full_row(uchar * buf);
    int __close(int mutex_is_locked);
    int get_next(uchar* buf, int direction);
    int read_data_from_range_query_buff(uchar* buf, bool need_val);
    void invalidate_bulk_fetch();
};

// Opens a status dictionary and returns its handle through *ptr.
// NOTE(review): defined elsewhere; presumably `name` identifies the table
// and the int result is 0 on success -- confirm against the definition.
int open_status_dictionary(DB** ptr, const char* name, DB_TXN* txn);


#if MYSQL_VERSION_ID >= 50506

//
// Two-argument overload of my_free(): the second argument is ignored and
// the call is forwarded to the single-argument my_free() declared by the
// server headers. Relies on C++ overload resolution (the inner call takes
// one argument, so it does not recurse into this function).
// NOTE(review): presumably exists so call sites written as
// my_free(ptr, flags) still build when MYSQL_VERSION_ID >= 50506 -- confirm.
//
static inline void my_free(void *p, int arg) {
    (void) arg;     // unused, kept only for the call signature
    my_free(p);
}

//
// Plain alias for memcpy(): copies n bytes from b into a and returns
// whatever memcpy() returns, i.e. the destination pointer a.
// As with memcpy(), the two regions must not overlap.
//
static inline void *memcpy_fixed(void *a, const void *b, size_t n) {
    void *const copied_to = memcpy(a, b, n);
    return copied_to;
}

#endif