Commit ab2db27e authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

addresses #1705

create separate buffer for blobs to be unpacked in, because MySQL only
stores a pointer to the data in its buffer

git-svn-id: file:///svn/mysql/tokudb-engine/src@11445 c7de825b-a66e-492c-adef-691d508d4ae1
parent 77911527
...@@ -311,8 +311,8 @@ typedef struct smart_dbt_ai_info { ...@@ -311,8 +311,8 @@ typedef struct smart_dbt_ai_info {
} *SMART_DBT_AI_INFO; } *SMART_DBT_AI_INFO;
static int smart_dbt_ai_callback (DBT const *key, DBT const *row, void *context) { static int smart_dbt_ai_callback (DBT const *key, DBT const *row, void *context) {
int error = 0;
SMART_DBT_AI_INFO info = (SMART_DBT_AI_INFO)context; SMART_DBT_AI_INFO info = (SMART_DBT_AI_INFO)context;
info->ha->unpack_row(info->buf,row,key, info->ha->primary_key);
// //
// copy the key to prim_key // copy the key to prim_key
// This will be used as the data value for elements in the secondary index // This will be used as the data value for elements in the secondary index
...@@ -326,7 +326,8 @@ static int smart_dbt_ai_callback (DBT const *key, DBT const *row, void *context) ...@@ -326,7 +326,8 @@ static int smart_dbt_ai_callback (DBT const *key, DBT const *row, void *context)
// create_dbt_key_from_key uses to create the key in a clustering index // create_dbt_key_from_key uses to create the key in a clustering index
// //
info->ha->extract_hidden_primary_key(info->pk_index,row,key); info->ha->extract_hidden_primary_key(info->pk_index,row,key);
return 0; error = info->ha->unpack_row(info->buf,row,key, info->ha->primary_key);
return error;
} }
// //
...@@ -357,10 +358,11 @@ smart_dbt_callback_keyread(DBT const *key, DBT const *row, void *context) { ...@@ -357,10 +358,11 @@ smart_dbt_callback_keyread(DBT const *key, DBT const *row, void *context) {
// //
static int static int
smart_dbt_callback_rowread(DBT const *key, DBT const *row, void *context) { smart_dbt_callback_rowread(DBT const *key, DBT const *row, void *context) {
int error = 0;
SMART_DBT_INFO info = (SMART_DBT_INFO)context; SMART_DBT_INFO info = (SMART_DBT_INFO)context;
info->ha->extract_hidden_primary_key(info->keynr, row, key); info->ha->extract_hidden_primary_key(info->keynr, row, key);
info->ha->read_primary_key(info->buf,info->keynr,row,key); error = info->ha->read_primary_key(info->buf,info->keynr,row,key);
return 0; return error;
} }
// //
...@@ -1507,24 +1509,42 @@ int ha_tokudb::pack_row( ...@@ -1507,24 +1509,42 @@ int ha_tokudb::pack_row(
} }
void ha_tokudb::unpack_blobs( int ha_tokudb::unpack_blobs(
uchar* record, uchar* record,
const uchar* from_tokudb_blob, const uchar* from_tokudb_blob,
u_int32_t num_blob_bytes u_int32_t num_bytes
) )
{ {
uint error = 0;
uchar* ptr = NULL;
const uchar* buff = NULL;
// //
// now read blobs // assert that num_bytes > 0 iff share->num_blobs > 0
// //
const uchar* ptr = from_tokudb_blob; assert( !((share->num_blobs == 0) && (num_bytes > 0)) );
if (num_bytes > num_blob_bytes) {
ptr = (uchar *)my_realloc((void *)blob_buff, num_bytes, MYF(MY_ALLOW_ZERO_PTR));
if (ptr == NULL) {
error = ENOMEM;
goto exit;
}
blob_buff = ptr;
num_blob_bytes = num_bytes;
}
memcpy(blob_buff, from_tokudb_blob, num_bytes);
buff= blob_buff;
for (uint i = 0; i < share->num_blobs; i++) { for (uint i = 0; i < share->num_blobs; i++) {
Field* field = table->field[share->blob_fields[i]]; Field* field = table->field[share->blob_fields[i]];
ptr = field->unpack( buff = field->unpack(
record + field_offset(field, table), record + field_offset(field, table),
ptr buff
); );
} }
error = 0;
exit:
return error;
} }
...@@ -1534,7 +1554,7 @@ void ha_tokudb::unpack_blobs( ...@@ -1534,7 +1554,7 @@ void ha_tokudb::unpack_blobs(
// [out] record - row in MySQL format // [out] record - row in MySQL format
// [in] row - row stored in DBT to be converted // [in] row - row stored in DBT to be converted
// //
void ha_tokudb::unpack_row( int ha_tokudb::unpack_row(
uchar* record, uchar* record,
DBT const *row, DBT const *row,
DBT const *key, DBT const *key,
...@@ -1546,6 +1566,7 @@ void ha_tokudb::unpack_row( ...@@ -1546,6 +1566,7 @@ void ha_tokudb::unpack_row(
// fixed length row is first below // fixed length row is first below
// //
/* Copy null bits */ /* Copy null bits */
int error = 0;
my_bitmap_map *old_map = dbug_tmp_use_all_columns(table, table->write_set); my_bitmap_map *old_map = dbug_tmp_use_all_columns(table, table->write_set);
const uchar* fixed_field_ptr = (const uchar *) row->data; const uchar* fixed_field_ptr = (const uchar *) row->data;
const uchar* var_field_offset_ptr = NULL; const uchar* var_field_offset_ptr = NULL;
...@@ -1614,11 +1635,14 @@ void ha_tokudb::unpack_row( ...@@ -1614,11 +1635,14 @@ void ha_tokudb::unpack_row(
last_offset = data_end_offset; last_offset = data_end_offset;
} }
} }
unpack_blobs( error = unpack_blobs(
record, record,
var_field_data_ptr, var_field_data_ptr,
row->size - (u_int32_t)(var_field_data_ptr - (const uchar *)row->data) row->size - (u_int32_t)(var_field_data_ptr - (const uchar *)row->data)
); );
if (error) {
goto exit;
}
} }
// //
// in this case, we unpack only what is specified // in this case, we unpack only what is specified
...@@ -1710,15 +1734,20 @@ void ha_tokudb::unpack_row( ...@@ -1710,15 +1734,20 @@ void ha_tokudb::unpack_row(
var_field_data_ptr += data_end_offset; var_field_data_ptr += data_end_offset;
} }
unpack_blobs( error = unpack_blobs(
record, record,
var_field_data_ptr, var_field_data_ptr,
row->size - (u_int32_t)(var_field_data_ptr - (const uchar *)row->data) row->size - (u_int32_t)(var_field_data_ptr - (const uchar *)row->data)
); );
if (error) {
goto exit;
} }
} }
}
error = 0;
exit:
dbug_tmp_restore_column_map(table->write_set, old_map); dbug_tmp_restore_column_map(table->write_set, old_map);
return error;
} }
u_int32_t ha_tokudb::place_key_into_mysql_buff( u_int32_t ha_tokudb::place_key_into_mysql_buff(
...@@ -2950,8 +2979,9 @@ void ha_tokudb::read_key_only(uchar * buf, uint keynr, DBT const *row, DBT const ...@@ -2950,8 +2979,9 @@ void ha_tokudb::read_key_only(uchar * buf, uint keynr, DBT const *row, DBT const
// [in] row - the row that has been read from the preceding DB call // [in] row - the row that has been read from the preceding DB call
// [in] found_key - key used to retrieve the row // [in] found_key - key used to retrieve the row
// //
void ha_tokudb::read_primary_key(uchar * buf, uint keynr, DBT const *row, DBT const *found_key) { int ha_tokudb::read_primary_key(uchar * buf, uint keynr, DBT const *row, DBT const *found_key) {
TOKUDB_DBUG_ENTER("ha_tokudb::read_primary_key"); TOKUDB_DBUG_ENTER("ha_tokudb::read_primary_key");
int error = 0;
table->status = 0; table->status = 0;
// //
// case where we read from secondary table that is not clustered // case where we read from secondary table that is not clustered
...@@ -2969,10 +2999,13 @@ void ha_tokudb::read_primary_key(uchar * buf, uint keynr, DBT const *row, DBT co ...@@ -2969,10 +2999,13 @@ void ha_tokudb::read_primary_key(uchar * buf, uint keynr, DBT const *row, DBT co
// else read from clustered/primary key // else read from clustered/primary key
// //
else { else {
unpack_row(buf, row, found_key, keynr); error = unpack_row(buf, row, found_key, keynr);
if (error) { goto exit; }
} }
if (found_key) { DBUG_DUMP("read row key", (uchar *) found_key->data, found_key->size); } if (found_key) { DBUG_DUMP("read row key", (uchar *) found_key->data, found_key->size); }
DBUG_VOID_RETURN; error = 0;
exit:
TOKUDB_DBUG_RETURN(error);
} }
// //
...@@ -2994,9 +3027,9 @@ int ha_tokudb::read_full_row(uchar * buf) { ...@@ -2994,9 +3027,9 @@ int ha_tokudb::read_full_row(uchar * buf) {
table->status = STATUS_NOT_FOUND; table->status = STATUS_NOT_FOUND;
TOKUDB_DBUG_RETURN(error == DB_NOTFOUND ? HA_ERR_CRASHED : error); TOKUDB_DBUG_RETURN(error == DB_NOTFOUND ? HA_ERR_CRASHED : error);
} }
unpack_row(buf, &current_row, &last_key, primary_key); error = unpack_row(buf, &current_row, &last_key, primary_key);
TOKUDB_DBUG_RETURN(0); TOKUDB_DBUG_RETURN(error);
} }
...@@ -3035,14 +3068,15 @@ int ha_tokudb::read_row(uchar * buf, uint keynr, DBT const *row, DBT const *foun ...@@ -3035,14 +3068,15 @@ int ha_tokudb::read_row(uchar * buf, uint keynr, DBT const *row, DBT const *foun
// TOKUDB_DBUG_DUMP("row=", row->data, row->size); // TOKUDB_DBUG_DUMP("row=", row->data, row->size);
unpack_key(buf, row, primary_key); unpack_key(buf, row, primary_key);
} }
TOKUDB_DBUG_RETURN(0); error = 0;
goto exit;
} }
// //
// in this case we have a clustered key, so no need to do pt query // in this case we have a clustered key, so no need to do pt query
// //
if (table->key_info[keynr].flags & HA_CLUSTERING) { if (table->key_info[keynr].flags & HA_CLUSTERING) {
unpack_row(buf, row, found_key, keynr); error = unpack_row(buf, row, found_key, keynr);
TOKUDB_DBUG_RETURN(0); goto exit;
} }
// //
// create a DBT that has the same data as row, // create a DBT that has the same data as row,
...@@ -3058,9 +3092,11 @@ int ha_tokudb::read_row(uchar * buf, uint keynr, DBT const *row, DBT const *foun ...@@ -3058,9 +3092,11 @@ int ha_tokudb::read_row(uchar * buf, uint keynr, DBT const *row, DBT const *foun
current_row.flags = DB_DBT_REALLOC; current_row.flags = DB_DBT_REALLOC;
if ((error = share->file->get(share->file, transaction, &key, &current_row, 0))) { if ((error = share->file->get(share->file, transaction, &key, &current_row, 0))) {
table->status = STATUS_NOT_FOUND; table->status = STATUS_NOT_FOUND;
TOKUDB_DBUG_RETURN(error == DB_NOTFOUND ? HA_ERR_CRASHED : error); error = (error == DB_NOTFOUND ? HA_ERR_CRASHED : error);
goto exit;
} }
unpack_row(buf, &current_row, &key, primary_key); error = unpack_row(buf, &current_row, &key, primary_key);
if (error) { goto exit; }
} }
else { else {
// //
...@@ -3070,11 +3106,15 @@ int ha_tokudb::read_row(uchar * buf, uint keynr, DBT const *row, DBT const *foun ...@@ -3070,11 +3106,15 @@ int ha_tokudb::read_row(uchar * buf, uint keynr, DBT const *row, DBT const *foun
unpack_key(buf, found_key, keynr); unpack_key(buf, found_key, keynr);
} }
else { else {
unpack_row(buf, row, found_key, primary_key); error = unpack_row(buf, row, found_key, primary_key);
if (error) { goto exit; }
} }
} }
if (found_key) { DBUG_DUMP("read row key", (uchar *) found_key->data, found_key->size); } if (found_key) { DBUG_DUMP("read row key", (uchar *) found_key->data, found_key->size); }
TOKUDB_DBUG_RETURN(0); error = 0;
exit:
TOKUDB_DBUG_RETURN(error);
} }
// //
......
...@@ -399,14 +399,14 @@ class ha_tokudb : public handler { ...@@ -399,14 +399,14 @@ class ha_tokudb : public handler {
int delete_all_rows(); int delete_all_rows();
void extract_hidden_primary_key(uint keynr, DBT const *row, DBT const *found_key); void extract_hidden_primary_key(uint keynr, DBT const *row, DBT const *found_key);
void read_key_only(uchar * buf, uint keynr, DBT const *row, DBT const *found_key); void read_key_only(uchar * buf, uint keynr, DBT const *row, DBT const *found_key);
void read_primary_key(uchar * buf, uint keynr, DBT const *row, DBT const *found_key); int read_primary_key(uchar * buf, uint keynr, DBT const *row, DBT const *found_key);
int read_row(uchar * buf, uint keynr, DBT const *row, DBT const *found_key); int read_row(uchar * buf, uint keynr, DBT const *row, DBT const *found_key);
void unpack_blobs( int unpack_blobs(
uchar* record, uchar* record,
const uchar* from_tokudb_blob, const uchar* from_tokudb_blob,
u_int32_t num_blob_bytes u_int32_t num_blob_bytes
); );
void unpack_row( int unpack_row(
uchar* record, uchar* record,
DBT const *row, DBT const *row,
DBT const *key, DBT const *key,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment