Commit 9c9c5f98 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

addresses #1047 #1056

use smart DBT's for add_index and ha_tokudb cursor operations

git-svn-id: file:///svn/mysql/tokudb-engine/src@5433 c7de825b-a66e-492c-adef-691d508d4ae1
parent 52abaa22
...@@ -687,7 +687,7 @@ static void make_name(char *newname, const char *tablename, const char *dictname ...@@ -687,7 +687,7 @@ static void make_name(char *newname, const char *tablename, const char *dictname
} }
#define CHECK_VALID_CURSOR() \ #define HANDLE_INVALID_CURSOR() \
if (cursor == NULL) { \ if (cursor == NULL) { \
error = last_cursor_error; \ error = last_cursor_error; \
goto cleanup; \ goto cleanup; \
...@@ -946,14 +946,80 @@ int primary_key_part_compare (const void* left, const void* right) { ...@@ -946,14 +946,80 @@ int primary_key_part_compare (const void* left, const void* right) {
return left_part->offset - right_part->offset; return left_part->offset - right_part->offset;
} }
//
// struct that will be used as a context for smart DBT callbacks
// contains parameters needed to complete the smart DBT cursor call
//
typedef struct smart_dbt_info {
ha_tokudb* ha; //instance to ha_tokudb needed for reading the row
uchar* buf; // output buffer where row will be written
uint keynr; // index into share->key_file that represents DB we are currently operating on
} *SMART_DBT_INFO;
//
// struct that will be used as a context for smart DBT callbacks
// ONLY for the function add_index
//
typedef struct smart_dbt_ai_info {
ha_tokudb* ha; //instance to ha_tokudb needed for reading the row
DBT* prim_key; // DBT to store the primary key
uchar* buf; // buffer to unpack the row
} *SMART_DBT_AI_INFO;
static void smart_dbt_ai_callback (DBT const *key, DBT const *row, void *context) {
SMART_DBT_AI_INFO info = (SMART_DBT_AI_INFO)context;
info->ha->unpack_row(info->buf,row,key);
//
// copy the key to prim_key
//
info->prim_key->size = key->size;
memcpy(info->prim_key->data, key->data, key->size);
}
//
// Smart DBT callback function in case where we have a covering index
//
static void smart_dbt_callback_keyread(DBT const *key, DBT const *row, void *context) {
SMART_DBT_INFO info = (SMART_DBT_INFO)context;
info->ha->extract_hidden_primary_key(info->keynr, row, key);
info->ha->read_key_only(info->buf,info->keynr,row,key);
}
//
// Smart DBT callback function in case where we do NOT have a covering index
//
static void smart_dbt_callback_rowread(DBT const *key, DBT const *row, void *context) {
SMART_DBT_INFO info = (SMART_DBT_INFO)context;
info->ha->extract_hidden_primary_key(info->keynr, row, key);
info->ha->read_primary_key(info->buf,info->keynr,row,key);
}
//
// Smart DBT callback function in c_getf_heavi, in case where we have a covering index,
//
static void smart_dbt_callback_keyread_heavi(DBT const *key, DBT const *row, void *context, int r_h) {
smart_dbt_callback_keyread(key,row,context);
}
//
// Smart DBT callback function in c_getf_heavi, in case where we do NOT have a covering index
//
static void smart_dbt_callback_rowread_heavi(DBT const *key, DBT const *row, void *context, int r_h) {
smart_dbt_callback_rowread(key,row,context);
}
//
// macro for Smart DBT callback function,
// so we do not need to put this long line of code in multiple places
//
#define SMART_DBT_CALLBACK ( this->key_read ? smart_dbt_callback_keyread : smart_dbt_callback_rowread )
// //
// macro that modifies read flag for cursor operations depending on whether // macro that modifies read flag for cursor operations depending on whether
// we have preacquired lock or not // we have preacquired lock or not
// //
#define SET_READ_FLAG(flg) ((range_lock_grabbed || current_thd->options & OPTION_TABLE_LOCK) ? (flg | DB_PRELOCKED) : flg) #define SET_READ_FLAG(flg) ((range_lock_grabbed || current_thd->options & OPTION_TABLE_LOCK) ? ((flg) | DB_PRELOCKED) : (flg))
// //
// Open a secondary table, the key will be a secondary index, the data will be a primary key // Open a secondary table, the key will be a secondary index, the data will be a primary key
...@@ -1458,7 +1524,7 @@ cleanup: ...@@ -1458,7 +1524,7 @@ cleanup:
// [out] record - row in MySQL format // [out] record - row in MySQL format
// [in] row - row stored in DBT to be converted // [in] row - row stored in DBT to be converted
// //
void ha_tokudb::unpack_row(uchar * record, DBT* row, DBT* key) { void ha_tokudb::unpack_row(uchar * record, DBT const *row, DBT const *key) {
// //
// two cases, fixed length row, and variable length row // two cases, fixed length row, and variable length row
// fixed length row is first below // fixed length row is first below
...@@ -1548,7 +1614,7 @@ void ha_tokudb::unpack_row(uchar * record, DBT* row, DBT* key) { ...@@ -1548,7 +1614,7 @@ void ha_tokudb::unpack_row(uchar * record, DBT* row, DBT* key) {
// index -index into key_file that represents the DB // index -index into key_file that represents the DB
// unpacking a key of // unpacking a key of
// //
void ha_tokudb::unpack_key(uchar * record, DBT * key, uint index) { void ha_tokudb::unpack_key(uchar * record, DBT const *key, uint index) {
KEY *key_info = table->key_info + index; KEY *key_info = table->key_info + index;
KEY_PART_INFO *key_part = key_info->key_part, *end = key_part + key_info->key_parts; KEY_PART_INFO *key_part = key_info->key_part, *end = key_part + key_info->key_parts;
uchar *pos = (uchar *) key->data; uchar *pos = (uchar *) key->data;
...@@ -2331,7 +2397,7 @@ int ha_tokudb::index_init(uint keynr, bool sorted) { ...@@ -2331,7 +2397,7 @@ int ha_tokudb::index_init(uint keynr, bool sorted) {
range_lock_grabbed = false; range_lock_grabbed = false;
DBUG_ASSERT(keynr <= table->s->keys); DBUG_ASSERT(keynr <= table->s->keys);
DBUG_ASSERT(share->key_file[keynr]); DBUG_ASSERT(share->key_file[keynr]);
if ((error = share->key_file[keynr]->cursor(share->key_file[keynr], transaction, &cursor, table->reginfo.lock_type > TL_WRITE_ALLOW_READ ? 0 : 0))) { if ((error = share->key_file[keynr]->cursor(share->key_file[keynr], transaction, &cursor, 0))) {
last_cursor_error = error; last_cursor_error = error;
cursor = NULL; // Safety cursor = NULL; // Safety
} }
...@@ -2356,40 +2422,32 @@ int ha_tokudb::index_end() { ...@@ -2356,40 +2422,32 @@ int ha_tokudb::index_end() {
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(error);
} }
//
// The funtion read_row checks whether the row was obtained from the primary table or int ha_tokudb::handle_cursor_error(int error, int err_to_return, uint keynr) {
// from an index table. If it was obtained from an index table, it further dereferences on TOKUDB_DBUG_ENTER("ha_tokudb::handle_cursor_error");
// the main table. In the end, the read_row function will manage to return the actual row
// of interest in the buf parameter.
//
// Parameters:
// error - result of preceding DB call
// [out] buf - buffer for the row, in MySQL format
// keynr - index into key_file that represents DB we are currently operating on.
// [in] row - the row that has been read from the preceding DB call
// [in] found_key - key used to retrieve the row
// read_next - if true, DB_NOTFOUND and DB_KEYEMPTY map to HA_ERR_END_OF_FILE,
// else HA_ERR_KEY_NOT_FOUND, this is a bad parameter to have and this funcitonality
// should not be here
//
int ha_tokudb::read_row(int error, uchar * buf, uint keynr, DBT * row, DBT * found_key, bool read_next) {
TOKUDB_DBUG_ENTER("ha_tokudb::read_row");
//
// Disreputable error translation: this makes us all puke
//
if (error) { if (error) {
last_cursor_error = error; last_cursor_error = error;
table->status = STATUS_NOT_FOUND; table->status = STATUS_NOT_FOUND;
cursor->c_close(cursor); cursor->c_close(cursor);
cursor = NULL; cursor = NULL;
if (error == DB_NOTFOUND || error == DB_KEYEMPTY) { if (error == DB_NOTFOUND || error == DB_KEYEMPTY) {
error = read_next ? HA_ERR_END_OF_FILE : HA_ERR_KEY_NOT_FOUND; error = err_to_return;
if ((share->key_file[keynr]->cursor(share->key_file[keynr], transaction, &cursor, table->reginfo.lock_type > TL_WRITE_ALLOW_READ ? 0 : 0))) { if ((share->key_file[keynr]->cursor(share->key_file[keynr], transaction, &cursor, 0))) {
cursor = NULL; // Safety cursor = NULL; // Safety
} }
} }
TOKUDB_DBUG_RETURN(error);
} }
TOKUDB_DBUG_RETURN(error);
}
//
// Helper function for read_row and smart_dbt_callback_xxx functions
// When using a hidden primary key, upon reading a row,
// we set the current_ident field to whatever the primary key we retrieved
// was
//
void ha_tokudb::extract_hidden_primary_key(uint keynr, DBT const *row, DBT const *found_key) {
// //
// extract hidden primary key to current_ident // extract hidden primary key to current_ident
// //
...@@ -2401,6 +2459,100 @@ int ha_tokudb::read_row(int error, uchar * buf, uint keynr, DBT * row, DBT * fou ...@@ -2401,6 +2459,100 @@ int ha_tokudb::read_row(int error, uchar * buf, uint keynr, DBT * row, DBT * fou
memcpy_fixed(current_ident, (char *) row->data, TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH); memcpy_fixed(current_ident, (char *) row->data, TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH);
} }
} }
}
//
// Reads the contents of row and found_key, DBT's retrieved from the DB associated to keynr, into buf
// This function assumes that we are using a covering index, as a result, if keynr is the primary key,
// we do not read row into buf
// Parameters:
// [out] buf - buffer for the row, in MySQL format
// keynr - index into key_file that represents DB we are currently operating on.
// [in] row - the row that has been read from the preceding DB call
// [in] found_key - key used to retrieve the row
//
void ha_tokudb::read_key_only(uchar * buf, uint keynr, DBT const *row, DBT const *found_key) {
TOKUDB_DBUG_ENTER("ha_tokudb::read_key_only");
table->status = 0;
unpack_key(buf, found_key, keynr);
if (!hidden_primary_key && (keynr != primary_key)) {
unpack_key(buf, row, primary_key);
}
DBUG_VOID_RETURN;
}
//
// Helper function used to try to retrieve the entire row
// If keynr is associated with the main table, reads contents of found_key and row into buf, otherwise,
// makes copy of primary key and saves it to last_key. This can later be used to retrieve the entire row
// Parameters:
// [out] buf - buffer for the row, in MySQL format
// keynr - index into key_file that represents DB we are currently operating on.
// [in] row - the row that has been read from the preceding DB call
// [in] found_key - key used to retrieve the row
//
void ha_tokudb::read_primary_key(uchar * buf, uint keynr, DBT const *row, DBT const *found_key) {
TOKUDB_DBUG_ENTER("ha_tokudb::read_primary_key");
table->status = 0;
if (keynr != primary_key) {
//
// create a DBT that has the same data as row,
//
bzero((void *) &last_key, sizeof(last_key));
last_key.data = key_buff;
last_key.size = row->size;
memcpy(key_buff, row->data, row->size);
}
else {
unpack_row(buf, row, found_key);
}
if (found_key) { DBUG_DUMP("read row key", (uchar *) found_key->data, found_key->size); }
DBUG_VOID_RETURN;
}
//
// This function reads an entire row into buf. This function also assumes that
// the key needed to retrieve the row is stored in the member variable last_key
// Parameters:
// [out] buf - buffer for the row, in MySQL format
// Returns:
// 0 on success, error otherwise
//
int ha_tokudb::read_full_row(uchar * buf) {
TOKUDB_DBUG_ENTER("ha_tokudb::read_full_row");
int error;
//
// Read the data into current_row, assumes key is stored in this->last_key
//
current_row.flags = DB_DBT_REALLOC;
if ((error = share->file->get(share->file, transaction, &last_key, &current_row, 0))) {
table->status = STATUS_NOT_FOUND;
TOKUDB_DBUG_RETURN(error == DB_NOTFOUND ? HA_ERR_CRASHED : error);
}
unpack_row(buf, &current_row, &last_key);
TOKUDB_DBUG_RETURN(0);
}
//
// The funtion read_row checks whether the row was obtained from the primary table or
// from an index table. If it was obtained from an index table, it further dereferences on
// the main table. In the end, the read_row function will manage to return the actual row
// of interest in the buf parameter.
//
// Parameters:
// [out] buf - buffer for the row, in MySQL format
// keynr - index into key_file that represents DB we are currently operating on.
// [in] row - the row that has been read from the preceding DB call
// [in] found_key - key used to retrieve the row
//
int ha_tokudb::read_row(uchar * buf, uint keynr, DBT const *row, DBT const *found_key) {
TOKUDB_DBUG_ENTER("ha_tokudb::read_row");
int error;
extract_hidden_primary_key(keynr, row, found_key);
table->status = 0; table->status = 0;
// //
// if the index shows that the table we read the row from was indexed on the primary key, // if the index shows that the table we read the row from was indexed on the primary key,
...@@ -2436,8 +2588,6 @@ int ha_tokudb::read_row(int error, uchar * buf, uint keynr, DBT * row, DBT * fou ...@@ -2436,8 +2588,6 @@ int ha_tokudb::read_row(int error, uchar * buf, uint keynr, DBT * row, DBT * fou
table->status = STATUS_NOT_FOUND; table->status = STATUS_NOT_FOUND;
TOKUDB_DBUG_RETURN(error == DB_NOTFOUND ? HA_ERR_CRASHED : error); TOKUDB_DBUG_RETURN(error == DB_NOTFOUND ? HA_ERR_CRASHED : error);
} }
// TOKUDB_DBUG_DUMP("key=", key.data, key.size);
// TOKUDB_DBUG_DUMP("row=", row->data, row->size);
unpack_row(buf, &current_row, &key); unpack_row(buf, &current_row, &key);
} }
else { else {
...@@ -2471,118 +2621,24 @@ int ha_tokudb::read_row(int error, uchar * buf, uint keynr, DBT * row, DBT * fou ...@@ -2471,118 +2621,24 @@ int ha_tokudb::read_row(int error, uchar * buf, uint keynr, DBT * row, DBT * fou
// //
int ha_tokudb::index_read_idx(uchar * buf, uint keynr, const uchar * key, uint key_len, enum ha_rkey_function find_flag) { int ha_tokudb::index_read_idx(uchar * buf, uint keynr, const uchar * key, uint key_len, enum ha_rkey_function find_flag) {
TOKUDB_DBUG_ENTER("ha_tokudb::index_read_idx"); TOKUDB_DBUG_ENTER("ha_tokudb::index_read_idx");
int error;
table->in_use->status_var.ha_read_key_count++; table->in_use->status_var.ha_read_key_count++;
current_row.flags = DB_DBT_REALLOC; current_row.flags = DB_DBT_REALLOC;
active_index = MAX_KEY; active_index = MAX_KEY;
TOKUDB_DBUG_RETURN(read_row(share->key_file[keynr]->get(share->key_file[keynr], transaction, pack_key(&last_key, keynr, key_buff, key, key_len), &current_row, 0), buf, keynr, &current_row, &last_key, 0));
}
//TODO: QQQ Function to tell if a key+keylen is the entire key (loop through the schema), see comparison function for ideas. error = share->key_file[keynr]->get(share->key_file[keynr], transaction, pack_key(&last_key, keynr, key_buff, key, key_len), &current_row, 0);
/* if (error == DB_NOTFOUND || error == DB_KEYEMPTY) {
if (full_key) { error = HA_ERR_KEY_NOT_FOUND;
switch (find_flag) { goto cleanup;
case (HA_READ_PREFIX): //Synonym for HA_READ_KEY_EXACT }
case (HA_READ_KEY_EXACT): if (!error) {
Just c_get DB_SET, return. error = read_row(buf, keynr, &current_row, &last_key);
case (HA_READ_AFTER_KEY):
c_get DB_SET_RANGE. If EQUAL to query, then do DB_NEXT (or is it DB_NEXT_NODUP?)
case (HA_READ_KEY_OR_NEXT):
c_get DB_SET_RANGE
case (HA_READ_BEFORE_KEY):
c_get DB_SET_RANGE, then do DB_PREV (or is it DB_PREV_NODUP)?
case (HA_READ_KEY_OR_PREV):
c_get DB_SET_RANGE. If NOT EQUAL to query, then do DB_PREV (or is it DB_PREV_NODUP?)
case (HA_READ_PREFIX_LAST_OR_PREV):
c_get DB_SET_RANGE. If NOT EQUAL to query, then do DB_PREV (or is it DB_PREV_NODUP?)
if if WAS equal to the query, then
if (NO_DUP db, just return it) else:
do DB_NEXT_NODUP
if found, do DB_PREV and return
else do DB_LAST and return
case (HA_READ_PREFIX_LAST):
c_get DB_SET. if !found, return NOT FOUND
if (NO_DUP db, just return it) else:
do c_get DB_NEXT_NODUP
if found, do DB_PREV and return.
else do DB_LAST and return.
default: Crash a lot.
}
else {
// Not full key
switch (find_flag) {
case (HA_READ_PREFIX): //Synonym for HA_READ_KEY_EXACT
case (HA_READ_KEY_EXACT):
c_get DB_SET_RANGE, then check a prefix
case (HA_READ_AFTER_KEY):
c_get DB_SET_RANGE, then:
while (found && query is prefix of 'dbtfound') do:
c_get DB_NEXT_NODUP (Definitely NEXT_NODUP since we care about key only).
case (HA_READ_KEY_OR_NEXT):
c_get SET_RANGE
case (HA_READ_BEFORE_KEY):
c_get DB_SET_RANGE, then do DB_PREV (or is it DB_PREV_NODUP)?
case (HA_READ_KEY_OR_PREV):
c_get DB_SET_RANGE. If query not a prefix of found, then DB_PREV (or is it DB_PREV_NODUP?)
case (HA_READ_PREFIX_LAST_OR_PREV):
c_get DB_SET_RANGE, then:
if (found && query is prefix of whatever found) do:
c_get DB_NEXT till not prefix (and return the one that was)
if (found originally but was not prefix of whatever found) do:
c_get DB_PREV
case (HA_READ_PREFIX_LAST):
c_get DB_SET_RANGE. if !found, or query not prefix of what found, return NOT FOUND
whlie query is prefix of whatfound, do c_get DB_NEXT till not.. then return the last one that was.
default: Crash a lot.
} }
}
Note that sometimes if not found, will need things like DB_FIRST or DB_LAST
TODO: QQQ maybe need to pass true/1 as last parameter of read_row (this would make it
return END_OF_FILE instead of just NOT_FOUND
*/
//
// This struct is used to copy results from the DB search into local memory.
//
typedef struct dbt_copy_info {
DBT *key;
DBT *val;
int error;
} *DBT_COPY_INFO;
//
// Verify which are supposed to be prefix and not prefix.
//
// Copies the contents of key and val, the returned values from the search in
// the DB, into a DBT_COPY_INFO.
// Parameters:
// [in] key - returned key object from the search in the DB
// [in] val - returned value object from the search in the DB
// [out] extra_f - context information that was passed into the search
// in the appropriate cursor call
// r_h - value returned by heaviside function
//
//
//
//
static void dbt_copy_heavi(DBT const *key, DBT const *val, void *extra_f, int r_h) {
DBT_COPY_INFO info = (DBT_COPY_INFO)extra_f;
int r;
info->key->size = key->size;
info->key->data = malloc(key->size);
if (!info->key->data) { r = errno; goto cleanup; }
info->key->flags = DB_DBT_REALLOC;
if (key->size) memcpy(info->key->data, key->data, key->size);
info->val->size = val->size;
info->val->data = malloc(val->size);
if (!info->val->data) { r = errno; goto cleanup; }
info->val->flags = DB_DBT_REALLOC;
if (val->size) memcpy(info->val->data, val->data, val->size);
r = 0;
cleanup: cleanup:
info->error = r; TOKUDB_DBUG_RETURN(error);
} }
// //
// context information for the heaviside functions. // context information for the heaviside functions.
// Context information includes data necessary // Context information includes data necessary
...@@ -2703,17 +2759,20 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_ ...@@ -2703,17 +2759,20 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_
// TOKUDB_DBUG_DUMP("key=", key, key_len); // TOKUDB_DBUG_DUMP("key=", key, key_len);
DBT row; DBT row;
int error; int error;
struct smart_dbt_info info;
struct heavi_info heavi_info;
bool do_read_row = true;
CHECK_VALID_CURSOR(); HANDLE_INVALID_CURSOR();
table->in_use->status_var.ha_read_key_count++; table->in_use->status_var.ha_read_key_count++;
bzero((void *) &row, sizeof(row)); bzero((void *) &row, sizeof(row));
pack_key(&last_key, active_index, key_buff, key, key_len); pack_key(&last_key, active_index, key_buff, key, key_len);
struct dbt_copy_info copy_info; //Needed as part of the smart dbt. info.ha = this;
struct heavi_info heavi_info; //Needed for the heaviside function. info.buf = buf;
copy_info.key = &last_key; info.keynr = active_index;
copy_info.val = &row;
heavi_info.db = share->key_file[active_index]; heavi_info.db = share->key_file[active_index];
heavi_info.key = &last_key; heavi_info.key = &last_key;
switch (find_flag) { switch (find_flag) {
...@@ -2727,14 +2786,22 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_ ...@@ -2727,14 +2786,22 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_
} }
break; break;
case HA_READ_AFTER_KEY: /* Find next rec. after key-record */ case HA_READ_AFTER_KEY: /* Find next rec. after key-record */
error = cursor->c_getf_heavi(cursor, 0, dbt_copy_heavi, &copy_info, error = cursor->c_getf_heavi(
after_key_heavi, &heavi_info, 1); cursor, 0,
if (error==0 && copy_info.error!=0) error = copy_info.error; key_read ? smart_dbt_callback_keyread_heavi : smart_dbt_callback_rowread_heavi, &info,
after_key_heavi, &heavi_info,
1
);
do_read_row = false;
break; break;
case HA_READ_BEFORE_KEY: /* Find next rec. before key-record */ case HA_READ_BEFORE_KEY: /* Find next rec. before key-record */
error = cursor->c_getf_heavi(cursor, 0, dbt_copy_heavi, &copy_info, error = cursor->c_getf_heavi(
before_key_heavi, &heavi_info, -1); cursor, 0,
if (error==0 && copy_info.error!=0) error = copy_info.error; key_read ? smart_dbt_callback_keyread_heavi : smart_dbt_callback_rowread_heavi, &info,
before_key_heavi, &heavi_info,
-1
);
do_read_row = false;
break; break;
case HA_READ_KEY_OR_NEXT: /* Record or next record */ case HA_READ_KEY_OR_NEXT: /* Record or next record */
error = cursor->c_get(cursor, &last_key, &row, DB_SET_RANGE); error = cursor->c_get(cursor, &last_key, &row, DB_SET_RANGE);
...@@ -2751,65 +2818,35 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_ ...@@ -2751,65 +2818,35 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_
error = cursor->c_get(cursor, &last_key, &row, DB_LAST); error = cursor->c_get(cursor, &last_key, &row, DB_LAST);
break; break;
case HA_READ_PREFIX_LAST_OR_PREV: /* Last or prev key with the same prefix */ case HA_READ_PREFIX_LAST_OR_PREV: /* Last or prev key with the same prefix */
error = cursor->c_getf_heavi(cursor, 0, dbt_copy_heavi, &copy_info, error = cursor->c_getf_heavi(
prefix_last_or_prev_heavi, &heavi_info, -1); cursor, 0,
if (error==0 && copy_info.error!=0) error = copy_info.error; key_read ? smart_dbt_callback_keyread_heavi : smart_dbt_callback_rowread_heavi, &info,
prefix_last_or_prev_heavi, &heavi_info,
-1
);
do_read_row = false;
break; break;
default: default:
TOKUDB_TRACE("unsupported:%d\n", find_flag); TOKUDB_TRACE("unsupported:%d\n", find_flag);
error = HA_ERR_UNSUPPORTED; error = HA_ERR_UNSUPPORTED;
break; break;
} }
error = read_row(error, buf, active_index, &row, &last_key, 0); error = handle_cursor_error(error,HA_ERR_KEY_NOT_FOUND,active_index);
if (error && (tokudb_debug & TOKUDB_DEBUG_ERROR)) if (!error && do_read_row) {
TOKUDB_TRACE("error:%d:%d\n", error, find_flag); error = read_row(buf, active_index, &row, &last_key);
cleanup:
//
// Using dbt_copy_heavi (used with c_getf_heavi) will set
// flags==DB_DBT_REALLOC.
// Otherwise, flags will be 0 (which means the DB does memory cleanup).
// We need to clean up our own memory with heaviside functions, since they
// use smart dbts.
//
if (last_key.flags==DB_DBT_REALLOC && last_key.data) {
free(last_key.data);
bzero((void *) &last_key, sizeof(last_key));
} }
if (row.flags==DB_DBT_REALLOC && row.data) { else if (!error && !key_read && active_index != primary_key) {
free(row.data); error = read_full_row(buf);
bzero((void *) &row, sizeof(row));
} }
if (error && (tokudb_debug & TOKUDB_DEBUG_ERROR)) {
TOKUDB_TRACE("error:%d:%d\n", error, find_flag);
}
cleanup:
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(error);
} }
#if 0
/*
Read last key is solved by reading the next key and then reading
the previous key
*/
int ha_tokudb::index_read_last(uchar * buf, const uchar * key, uint key_len) {
TOKUDB_DBUG_ENTER("ha_tokudb::index_read_last");
DBT row;
int error;
KEY *key_info = &table->key_info[active_index];
statistic_increment(table->in_use->status_var.ha_read_key_count, &LOCK_status);
bzero((void *) &row, sizeof(row));
/* read of partial key */
pack_key(&last_key, active_index, key_buff, key, key_len);
/* Store for compare */
memcpy(key_buff2, key_buff, (key_len = last_key.size));
assert(0);
key_info->handler.bdb_return_if_eq = 1;
error = read_row(cursor->c_get(cursor, &last_key, &row, DB_SET_RANGE), buf, active_index, &row, (DBT *) 0, 0);
key_info->handler.bdb_return_if_eq = 0;
bzero((void *) &row, sizeof(row));
if (read_row(cursor->c_get(cursor, &last_key, &row, DB_PREV), buf, active_index, &row, &last_key, 1) || tokudb_key_cmp(table, key_info, key_buff2, key_len))
error = HA_ERR_KEY_NOT_FOUND;
TOKUDB_DBUG_RETURN(error);
}
#endif
// //
// Reads the next row from the active index (cursor) into buf, and advances cursor // Reads the next row from the active index (cursor) into buf, and advances cursor
...@@ -2823,13 +2860,23 @@ int ha_tokudb::index_read_last(uchar * buf, const uchar * key, uint key_len) { ...@@ -2823,13 +2860,23 @@ int ha_tokudb::index_read_last(uchar * buf, const uchar * key, uint key_len) {
int ha_tokudb::index_next(uchar * buf) { int ha_tokudb::index_next(uchar * buf) {
TOKUDB_DBUG_ENTER("ha_tokudb::index_next"); TOKUDB_DBUG_ENTER("ha_tokudb::index_next");
int error; int error;
DBT row; struct smart_dbt_info info;
u_int32_t flags = SET_READ_FLAG(DB_NEXT); u_int32_t flags = SET_READ_FLAG(0);
CHECK_VALID_CURSOR(); HANDLE_INVALID_CURSOR();
statistic_increment(table->in_use->status_var.ha_read_next_count, &LOCK_status); statistic_increment(table->in_use->status_var.ha_read_next_count, &LOCK_status);
bzero((void *) &row, sizeof(row));
error = read_row(cursor->c_get(cursor, &last_key, &row, flags), buf, active_index, &row, &last_key, 1); info.ha = this;
info.buf = buf;
info.keynr = active_index;
error = handle_cursor_error(cursor->c_getf_next(cursor, flags, SMART_DBT_CALLBACK, &info), HA_ERR_END_OF_FILE,active_index);
//
// still need to get entire contents of the row if operation done on
// secondary DB and it was NOT a covering index
//
if (!error && !key_read && (active_index != primary_key) ) {
error = read_full_row(buf);
}
cleanup: cleanup:
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(error);
} }
...@@ -2847,23 +2894,30 @@ cleanup: ...@@ -2847,23 +2894,30 @@ cleanup:
// //
int ha_tokudb::index_next_same(uchar * buf, const uchar * key, uint keylen) { int ha_tokudb::index_next_same(uchar * buf, const uchar * key, uint keylen) {
TOKUDB_DBUG_ENTER("ha_tokudb::index_next_same %p", this); TOKUDB_DBUG_ENTER("ha_tokudb::index_next_same %p", this);
DBT row;
int error; int error;
CHECK_VALID_CURSOR(); struct smart_dbt_info info;
HANDLE_INVALID_CURSOR();
statistic_increment(table->in_use->status_var.ha_read_next_count, &LOCK_status); statistic_increment(table->in_use->status_var.ha_read_next_count, &LOCK_status);
bzero((void *) &row, sizeof(row)); info.ha = this;
info.buf = buf;
info.keynr = active_index;
/* QQQ NEXT_DUP on nodup returns EINVAL for tokudb */ /* QQQ NEXT_DUP on nodup returns EINVAL for tokudb */
if (keylen == table->key_info[active_index].key_length && if (keylen == table->key_info[active_index].key_length &&
!(table->key_info[active_index].flags & HA_NOSAME) && !(table->key_info[active_index].flags & HA_NOSAME) &&
!(table->key_info[active_index].flags & HA_END_SPACE_KEY)) { !(table->key_info[active_index].flags & HA_END_SPACE_KEY)) {
u_int32_t flags = SET_READ_FLAG(DB_NEXT_DUP); u_int32_t flags = SET_READ_FLAG(0);
error = cursor->c_get(cursor, &last_key, &row, flags); error = handle_cursor_error(cursor->c_getf_next_dup(cursor, flags, SMART_DBT_CALLBACK, &info),HA_ERR_END_OF_FILE,active_index);
error = read_row(error, buf, active_index, &row, &last_key, 1); if (!error && !key_read && active_index != primary_key) {
error = read_full_row(buf);
}
} else { } else {
u_int32_t flags = SET_READ_FLAG(DB_NEXT); u_int32_t flags = SET_READ_FLAG(0);
error = read_row(cursor->c_get(cursor, &last_key, &row, flags), buf, active_index, &row, &last_key, 1); error = handle_cursor_error(cursor->c_getf_next(cursor, flags, SMART_DBT_CALLBACK, &info),HA_ERR_END_OF_FILE,active_index);
if (!error && !key_read && active_index != primary_key) {
error = read_full_row(buf);
}
if (!error &&::key_cmp_if_same(table, key, active_index, keylen)) if (!error &&::key_cmp_if_same(table, key, active_index, keylen))
error = HA_ERR_END_OF_FILE; error = HA_ERR_END_OF_FILE;
} }
...@@ -2881,14 +2935,26 @@ cleanup: ...@@ -2881,14 +2935,26 @@ cleanup:
// error otherwise // error otherwise
// //
int ha_tokudb::index_prev(uchar * buf) { int ha_tokudb::index_prev(uchar * buf) {
TOKUDB_DBUG_ENTER("ha_tokudb::index_prev"); TOKUDB_DBUG_ENTER("ha_tokudb::index_next");
int error; int error;
u_int32_t flags = SET_READ_FLAG(DB_PREV); struct smart_dbt_info info;
CHECK_VALID_CURSOR(); u_int32_t flags = SET_READ_FLAG(0);
DBT row; HANDLE_INVALID_CURSOR();
statistic_increment(table->in_use->status_var.ha_read_prev_count, &LOCK_status);
bzero((void *) &row, sizeof(row)); statistic_increment(table->in_use->status_var.ha_read_next_count, &LOCK_status);
error = read_row(cursor->c_get(cursor, &last_key, &row, flags), buf, active_index, &row, &last_key, 1);
info.ha = this;
info.buf = buf;
info.keynr = active_index;
error = handle_cursor_error(cursor->c_getf_prev(cursor, flags, SMART_DBT_CALLBACK, &info),HA_ERR_END_OF_FILE,active_index);
//
// still need to get entire contents of the row if operation done on
// secondary DB and it was NOT a covering index
//
if (!error && !key_read && (active_index != primary_key) ) {
error = read_full_row(buf);
}
cleanup: cleanup:
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(error);
} }
...@@ -2906,10 +2972,15 @@ int ha_tokudb::index_first(uchar * buf) { ...@@ -2906,10 +2972,15 @@ int ha_tokudb::index_first(uchar * buf) {
TOKUDB_DBUG_ENTER("ha_tokudb::index_first"); TOKUDB_DBUG_ENTER("ha_tokudb::index_first");
int error; int error;
DBT row; DBT row;
CHECK_VALID_CURSOR(); HANDLE_INVALID_CURSOR();
statistic_increment(table->in_use->status_var.ha_read_first_count, &LOCK_status); statistic_increment(table->in_use->status_var.ha_read_first_count, &LOCK_status);
bzero((void *) &row, sizeof(row)); bzero((void *) &row, sizeof(row));
error = read_row(cursor->c_get(cursor, &last_key, &row, DB_FIRST), buf, active_index, &row, &last_key, 1);
error = handle_cursor_error(cursor->c_get(cursor, &last_key, &row, DB_FIRST),HA_ERR_END_OF_FILE,active_index);
if (!error) {
error = read_row(buf, active_index, &row, &last_key);
}
cleanup: cleanup:
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(error);
} }
...@@ -2927,10 +2998,14 @@ int ha_tokudb::index_last(uchar * buf) { ...@@ -2927,10 +2998,14 @@ int ha_tokudb::index_last(uchar * buf) {
TOKUDB_DBUG_ENTER("ha_tokudb::index_last"); TOKUDB_DBUG_ENTER("ha_tokudb::index_last");
int error; int error;
DBT row; DBT row;
CHECK_VALID_CURSOR(); HANDLE_INVALID_CURSOR();
statistic_increment(table->in_use->status_var.ha_read_last_count, &LOCK_status); statistic_increment(table->in_use->status_var.ha_read_last_count, &LOCK_status);
bzero((void *) &row, sizeof(row)); bzero((void *) &row, sizeof(row));
error = read_row(cursor->c_get(cursor, &last_key, &row, DB_LAST), buf, active_index, &row, &last_key, 1);
error = handle_cursor_error(cursor->c_get(cursor, &last_key, &row, DB_LAST),HA_ERR_END_OF_FILE,active_index);
if (!error) {
error = read_row(buf, active_index, &row, &last_key);
}
cleanup: cleanup:
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(error);
} }
...@@ -2977,6 +3052,7 @@ int ha_tokudb::rnd_end() { ...@@ -2977,6 +3052,7 @@ int ha_tokudb::rnd_end() {
TOKUDB_DBUG_RETURN(index_end()); TOKUDB_DBUG_RETURN(index_end());
} }
// //
// Read the next row in a table scan // Read the next row in a table scan
// Parameters: // Parameters:
...@@ -2989,18 +3065,21 @@ int ha_tokudb::rnd_end() { ...@@ -2989,18 +3065,21 @@ int ha_tokudb::rnd_end() {
int ha_tokudb::rnd_next(uchar * buf) { int ha_tokudb::rnd_next(uchar * buf) {
TOKUDB_DBUG_ENTER("ha_tokudb::ha_tokudb::rnd_next"); TOKUDB_DBUG_ENTER("ha_tokudb::ha_tokudb::rnd_next");
int error; int error;
DBT row; u_int32_t flags = SET_READ_FLAG(0);
u_int32_t flags = SET_READ_FLAG(DB_NEXT); struct smart_dbt_info info;
CHECK_VALID_CURSOR() HANDLE_INVALID_CURSOR();
// //
// The reason we do not just call index_next is that index_next // The reason we do not just call index_next is that index_next
// increments a different variable than we do here // increments a different variable than we do here
// //
statistic_increment(table->in_use->status_var.ha_read_rnd_next_count, &LOCK_status); statistic_increment(table->in_use->status_var.ha_read_rnd_next_count, &LOCK_status);
bzero((void *) &row, sizeof(row));
DBUG_DUMP("last_key", (uchar *) last_key.data, last_key.size); info.ha = this;
error = read_row(cursor->c_get(cursor, &last_key, &row, flags), buf, primary_key, &row, &last_key, 1); info.buf = buf;
info.keynr = primary_key;
error = handle_cursor_error(cursor->c_getf_next(cursor, flags, SMART_DBT_CALLBACK, &info),HA_ERR_END_OF_FILE,primary_key);
cleanup: cleanup:
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(error);
} }
...@@ -3036,10 +3115,21 @@ DBT *ha_tokudb::get_pos(DBT * to, uchar * pos) { ...@@ -3036,10 +3115,21 @@ DBT *ha_tokudb::get_pos(DBT * to, uchar * pos) {
int ha_tokudb::rnd_pos(uchar * buf, uchar * pos) { int ha_tokudb::rnd_pos(uchar * buf, uchar * pos) {
TOKUDB_DBUG_ENTER("ha_tokudb::rnd_pos"); TOKUDB_DBUG_ENTER("ha_tokudb::rnd_pos");
DBT db_pos; DBT db_pos;
int error;
statistic_increment(table->in_use->status_var.ha_read_rnd_count, &LOCK_status); statistic_increment(table->in_use->status_var.ha_read_rnd_count, &LOCK_status);
active_index = MAX_KEY; active_index = MAX_KEY;
DBT* key = get_pos(&db_pos, pos); DBT* key = get_pos(&db_pos, pos);
TOKUDB_DBUG_RETURN(read_row(share->file->get(share->file, transaction, key, &current_row, 0), buf, primary_key, &current_row, key, 0)); error = share->file->get(share->file, transaction, key, &current_row, 0);
if (error == DB_NOTFOUND || error == DB_KEYEMPTY) {
error = HA_ERR_KEY_NOT_FOUND;
goto cleanup;
}
if (!error) {
error = read_row(buf, primary_key, &current_row, key);
}
cleanup:
TOKUDB_DBUG_RETURN(error);
} }
...@@ -4035,9 +4125,10 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) { ...@@ -4035,9 +4125,10 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
DBC* tmp_cursor = NULL; DBC* tmp_cursor = NULL;
int cursor_ret_val = 0; int cursor_ret_val = 0;
DBT current_primary_key; DBT current_primary_key;
DBT row;
DB_TXN* txn = NULL; DB_TXN* txn = NULL;
uchar tmp_key_buff[2*table_arg->s->rec_buff_length]; uchar tmp_key_buff[2*table_arg->s->rec_buff_length];
uchar tmp_prim_key_buff[2*table_arg->s->rec_buff_length];
// //
// number of DB files we have open currently, before add_index is executed // number of DB files we have open currently, before add_index is executed
// //
...@@ -4054,8 +4145,7 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) { ...@@ -4054,8 +4145,7 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
// //
uchar tmp_record[table_arg->s->rec_buff_length]; uchar tmp_record[table_arg->s->rec_buff_length];
bzero((void *) &current_primary_key, sizeof(current_primary_key)); bzero((void *) &current_primary_key, sizeof(current_primary_key));
bzero((void *) &row, sizeof(row)); current_primary_key.data = tmp_prim_key_buff;
// //
// The files for secondary tables are derived from the name of keys // The files for secondary tables are derived from the name of keys
...@@ -4150,13 +4240,19 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) { ...@@ -4150,13 +4240,19 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
// for each element in the primary table, insert the proper key value pair in each secondary table // for each element in the primary table, insert the proper key value pair in each secondary table
// that is created // that is created
// //
cursor_ret_val = tmp_cursor->c_get(tmp_cursor, &current_primary_key, &row, DB_NEXT | DB_PRELOCKED); struct smart_dbt_ai_info info;
info.ha = this;
info.prim_key = &current_primary_key;
info.buf = tmp_record;
cursor_ret_val = tmp_cursor->c_getf_next(tmp_cursor, DB_PRELOCKED, smart_dbt_ai_callback, &info);
while (cursor_ret_val != DB_NOTFOUND) { while (cursor_ret_val != DB_NOTFOUND) {
if (cursor_ret_val) { if (cursor_ret_val) {
error = cursor_ret_val; error = cursor_ret_val;
goto cleanup; goto cleanup;
} }
unpack_row(tmp_record, &row, &current_primary_key);
for (uint i = 0; i < num_of_keys; i++) { for (uint i = 0; i < num_of_keys; i++) {
DBT secondary_key; DBT secondary_key;
create_dbt_key_from_key(&secondary_key,&key_info[i], tmp_key_buff, tmp_record); create_dbt_key_from_key(&secondary_key,&key_info[i], tmp_key_buff, tmp_record);
...@@ -4183,7 +4279,11 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) { ...@@ -4183,7 +4279,11 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
goto cleanup; goto cleanup;
} }
} }
<<<<<<< .mine
cursor_ret_val = tmp_cursor->c_getf_next(tmp_cursor, DB_PRELOCKED, smart_dbt_ai_callback, &info);
=======
cursor_ret_val = tmp_cursor->c_get(tmp_cursor, &current_primary_key, &row, DB_NEXT | DB_PRELOCKED); cursor_ret_val = tmp_cursor->c_get(tmp_cursor, &current_primary_key, &row, DB_NEXT | DB_PRELOCKED);
>>>>>>> .r5430
} }
error = txn->commit(txn, 0); error = txn->commit(txn, 0);
assert(error == 0); assert(error == 0);
......
...@@ -160,8 +160,7 @@ private: ...@@ -160,8 +160,7 @@ private:
ulong max_row_length(const uchar * buf); ulong max_row_length(const uchar * buf);
int pack_row(DBT * row, const uchar * record); int pack_row(DBT * row, const uchar * record);
void unpack_row(uchar * record, DBT * row, DBT* key); void unpack_key(uchar * record, DBT const *key, uint index);
void unpack_key(uchar * record, DBT * key, uint index);
DBT* create_dbt_key_from_key(DBT * key, KEY* key_info, uchar * buff, const uchar * record, int key_length = MAX_KEY_LENGTH); DBT* create_dbt_key_from_key(DBT * key, KEY* key_info, uchar * buff, const uchar * record, int key_length = MAX_KEY_LENGTH);
DBT *create_dbt_key_from_table(DBT * key, uint keynr, uchar * buff, const uchar * record, int key_length = MAX_KEY_LENGTH); DBT *create_dbt_key_from_table(DBT * key, uint keynr, uchar * buff, const uchar * record, int key_length = MAX_KEY_LENGTH);
DBT *pack_key(DBT * key, uint keynr, uchar * buff, const uchar * key_ptr, uint key_length); DBT *pack_key(DBT * key, uint keynr, uchar * buff, const uchar * key_ptr, uint key_length);
...@@ -170,7 +169,7 @@ private: ...@@ -170,7 +169,7 @@ private:
int restore_keys(DB_TXN * trans, key_map * changed_keys, uint primary_key, const uchar * old_row, DBT * old_key, const uchar * new_row, DBT * new_key); int restore_keys(DB_TXN * trans, key_map * changed_keys, uint primary_key, const uchar * old_row, DBT * old_key, const uchar * new_row, DBT * new_key);
int key_cmp(uint keynr, const uchar * old_row, const uchar * new_row); int key_cmp(uint keynr, const uchar * old_row, const uchar * new_row);
int update_primary_key(DB_TXN * trans, bool primary_key_changed, const uchar * old_row, DBT * old_key, const uchar * new_row, DBT * prim_key, bool local_using_ignore); int update_primary_key(DB_TXN * trans, bool primary_key_changed, const uchar * old_row, DBT * old_key, const uchar * new_row, DBT * prim_key, bool local_using_ignore);
int read_row(int error, uchar * buf, uint keynr, DBT * row, DBT * key, bool); int handle_cursor_error(int error, int err_to_return, uint keynr);
DBT *get_pos(DBT * to, uchar * pos); DBT *get_pos(DBT * to, uchar * pos);
int open_secondary_table(DB** ptr, KEY* key_info, const char* name, int mode, u_int32_t* key_type); int open_secondary_table(DB** ptr, KEY* key_info, const char* name, int mode, u_int32_t* key_type);
...@@ -306,8 +305,14 @@ public: ...@@ -306,8 +305,14 @@ public:
// delete all rows from the table // delete all rows from the table
// effect: all dictionaries, including the main and indexes, should be empty // effect: all dictionaries, including the main and indexes, should be empty
int delete_all_rows(); int delete_all_rows();
void extract_hidden_primary_key(uint keynr, DBT const *row, DBT const *found_key);
void read_key_only(uchar * buf, uint keynr, DBT const *row, DBT const *found_key);
void read_primary_key(uchar * buf, uint keynr, DBT const *row, DBT const *found_key);
int read_row(uchar * buf, uint keynr, DBT const *row, DBT const *found_key);
void unpack_row(uchar * record, DBT const *row, DBT const *key);
private: private:
int read_full_row(uchar * buf);
int __close(int mutex_is_locked); int __close(int mutex_is_locked);
int read_last(); int read_last();
ulong field_offset(Field *); ulong field_offset(Field *);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment