Commit fdcda414 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

refs #5886 merge blob updates to mainline

git-svn-id: file:///svn/mysql/tokudb-engine/tokudb-engine@52092 c7de825b-a66e-492c-adef-691d508d4ae1
parent ba8070f8
......@@ -140,6 +140,17 @@ static uint32_t var_field_index(TABLE *table, KEY_AND_COL_INFO *kc_info, uint id
return v_index;
}
static uint32_t blob_field_index(TABLE *table, KEY_AND_COL_INFO *kc_info, uint idx, uint field_num) {
assert(field_num < table->s->fields);
uint b_index;
for (b_index = 0; b_index < kc_info->num_blobs; b_index++) {
if (kc_info->blob_fields[b_index] == field_num)
break;
}
assert(b_index < kc_info->num_blobs);
return b_index;
}
// Determine if an update operation can be offloaded to the storage engine.
// The update operation consists of a list of update expressions (fields[i] = values[i]), and a list
// of where conditions (conds). The function returns 0 if the update is handled in the storage engine.
......@@ -163,7 +174,7 @@ int ha_tokudb::fast_update(THD *thd, List<Item> &update_fields, List<Item> &upda
line = __LINE__;
goto return_error;
}
if (!check_fast_update(thd, update_fields, update_values, conds)) {
error = ENOTSUP;
line = __LINE__;
......@@ -303,6 +314,7 @@ static bool check_simple_update_expression(Item *lhs_item, Item *rhs_item, TABLE
return true;
break;
case MYSQL_TYPE_VARCHAR:
case MYSQL_TYPE_BLOB:
if (rhs_type == Item::STRING_ITEM)
return true;
break;
......@@ -478,14 +490,24 @@ bool ha_tokudb::check_fast_update(THD *thd, List<Item> &fields, List<Item> &valu
return true;
}
// Marshall a simple row descriptor to a buffer.
static void marshall_simple_descriptor(tokudb::buffer &b, TABLE *table, KEY_AND_COL_INFO &kc_info, uint key_num) {
tokudb::simple_row_descriptor sd;
sd.m_fixed_field_offset = table->s->null_bytes;
sd.m_var_field_offset = sd.m_fixed_field_offset + kc_info.mcp_info[key_num].fixed_field_size;
sd.m_var_offset_bytes = kc_info.mcp_info[key_num].len_of_offsets; // total length of the var offsets
sd.m_bytes_per_offset = sd.m_var_offset_bytes == 0 ? 0 : kc_info.num_offset_bytes; // bytes per var offset
sd.append(b);
static void marshall_varchar_descriptor(tokudb::buffer &b, TABLE *table, KEY_AND_COL_INFO *kc_info, uint key_num) {
b.append_uint32('v');
b.append_uint32(table->s->null_bytes + kc_info->mcp_info[key_num].fixed_field_size);
uint32_t var_offset_bytes = kc_info->mcp_info[key_num].len_of_offsets;
b.append_uint32(var_offset_bytes);
b.append_uint32(var_offset_bytes == 0 ? 0 : kc_info->num_offset_bytes);
}
static void marshall_blobs_descriptor(tokudb::buffer &b, TABLE *table, KEY_AND_COL_INFO *kc_info) {
b.append_uint32('b');
uint32_t n = kc_info->num_blobs;
b.append_uint32(n);
for (uint i = 0; i < n; i++) {
uint blob_field_index = kc_info->blob_fields[i];
assert(blob_field_index < table->s->fields);
uint8_t blob_field_length = table->s->field[blob_field_index]->row_pack_length();
b.append(&blob_field_length, sizeof blob_field_length);
}
}
static inline uint32_t get_null_bit_position(uint32_t null_bit);
......@@ -501,7 +523,7 @@ static void marshall_simple_update(tokudb::buffer &b, Item *lhs_item, Item *rhs_
uint32_t field_null_num = 0;
if (lhs_field->real_maybe_null()) {
uint32_t field_num = lhs_field->field_index;
field_null_num = (1<<31) + (field_num/8)*8 + get_null_bit_position(lhs_field->null_bit);
field_null_num = ((field_num/8)*8 + get_null_bit_position(lhs_field->null_bit)) + 1;
}
uint32_t offset;
void *v_ptr = NULL;
......@@ -549,7 +571,6 @@ static void marshall_simple_update(tokudb::buffer &b, Item *lhs_item, Item *rhs_
}
break;
}
case MYSQL_TYPE_STRING: {
update_operation = '=';
field_type = lhs_field->binary() ? UPDATE_TYPE_BINARY : UPDATE_TYPE_CHAR;
......@@ -567,7 +588,6 @@ static void marshall_simple_update(tokudb::buffer &b, Item *lhs_item, Item *rhs_
v_ptr = v_str.c_ptr();
break;
}
case MYSQL_TYPE_VARCHAR: {
update_operation = '=';
field_type = lhs_field->binary() ? UPDATE_TYPE_VARBINARY : UPDATE_TYPE_VARCHAR;
......@@ -581,18 +601,29 @@ static void marshall_simple_update(tokudb::buffer &b, Item *lhs_item, Item *rhs_
v_ptr = v_str.c_ptr();
break;
}
case MYSQL_TYPE_BLOB: {
update_operation = '=';
field_type = lhs_field->binary() ? UPDATE_TYPE_BLOB : UPDATE_TYPE_TEXT;
offset = blob_field_index(table, &share->kc_info, table->s->primary_key, lhs_field->field_index);
v_str = *rhs_item->val_str(&v_str);
v_length = v_str.length();
if (v_length >= lhs_field->max_data_length()) {
v_length = lhs_field->max_data_length();
v_str.length(v_length); // truncate
}
v_ptr = v_str.c_ptr();
break;
}
default:
assert(0);
}
// marshall the update fields into the buffer
b.append(&update_operation, sizeof update_operation);
b.append(&field_type, sizeof field_type);
uint32_t unused = 0;
b.append(&unused, sizeof unused);
b.append(&field_null_num, sizeof field_null_num);
b.append(&offset, sizeof offset);
b.append(&v_length, sizeof v_length);
b.append_uint32(update_operation);
b.append_uint32(field_type);
b.append_uint32(field_null_num);
b.append_uint32(offset);
b.append_uint32(v_length);
b.append(v_ptr, v_length);
}
......@@ -612,6 +643,19 @@ static int save_in_field(Item *item, TABLE *table) {
return error;
}
static void count_update_types(Field *lhs_field, uint *num_varchars, uint *num_blobs) {
switch (lhs_field->type()) {
case MYSQL_TYPE_VARCHAR:
*num_varchars += 1;
break;
case MYSQL_TYPE_BLOB:
*num_blobs += 1;
break;
default:
break;
}
}
// Generate an update message for an update operation and send it into the primary tree. Return 0 if successful.
int ha_tokudb::send_update_message(List<Item> &update_fields, List<Item> &update_values, Item *conds, DB_TXN *txn) {
int error;
......@@ -640,16 +684,36 @@ int ha_tokudb::send_update_message(List<Item> &update_fields, List<Item> &update
// construct the update message
tokudb::buffer update_message;
uchar operation = UPDATE_OP_SIMPLE_UPDATE;
// update_message.append_uint32(UPDATE_OP_UPDATE_2);
uint8_t operation = UPDATE_OP_UPDATE_2;
update_message.append(&operation, sizeof operation);
// append the descriptor
marshall_simple_descriptor(update_message, table, share->kc_info, primary_key);
// append the updates
uint32_t num_updates = update_fields.elements;
update_message.append(&num_updates, sizeof num_updates);
uint num_varchars = 0, num_blobs = 0;
if (1) {
List_iterator<Item> lhs_i(update_fields);
Item *lhs_item;
while ((lhs_item = lhs_i++)) {
if (lhs_item == NULL)
break;
Field *lhs_field = find_field_by_name(table, lhs_item);
assert(lhs_field); // we found it before, so this should work
count_update_types(lhs_field, &num_varchars, &num_blobs);
}
if (num_varchars > 0 || num_blobs > 0)
num_updates++;
if (num_blobs > 0)
num_updates++;
}
// append the updates
update_message.append_uint32(num_updates);
if (num_varchars > 0 || num_blobs > 0)
marshall_varchar_descriptor(update_message, table, &share->kc_info, table->s->primary_key);
if (num_blobs > 0)
marshall_blobs_descriptor(update_message, table, &share->kc_info);
List_iterator<Item> lhs_i(update_fields);
List_iterator<Item> rhs_i(update_values);
......@@ -779,20 +843,39 @@ int ha_tokudb::send_upsert_message(THD *thd, List<Item> &update_fields, List<Ite
tokudb::buffer update_message;
// append the operation
uchar operation = UPDATE_OP_SIMPLE_UPSERT;
// update_message.append_uint32(UPDATE_OP_UPSERT_2);
uint8_t operation = UPDATE_OP_UPSERT_2;
update_message.append(&operation, sizeof operation);
// append the row
uint32_t row_length = row.size;
update_message.append(&row_length, sizeof row_length);
update_message.append(row.data, row_length);
// append the descriptor
marshall_simple_descriptor(update_message, table, share->kc_info, primary_key);
update_message.append_uint32(row.size);
update_message.append(row.data, row.size);
// append the update expressions
uint32_t num_updates = update_fields.elements;
update_message.append(&num_updates, sizeof num_updates);
uint num_varchars = 0, num_blobs = 0;
if (1) {
List_iterator<Item> lhs_i(update_fields);
Item *lhs_item;
while ((lhs_item = lhs_i++)) {
if (lhs_item == NULL)
break;
Field *lhs_field = find_field_by_name(table, lhs_item);
assert(lhs_field); // we found it before, so this should work
count_update_types(lhs_field, &num_varchars, &num_blobs);
}
if (num_varchars > 0 || num_blobs > 0)
num_updates++;
if (num_blobs > 0)
num_updates++;
}
// append the updates
update_message.append_uint32(num_updates);
if (num_varchars > 0 || num_blobs > 0)
marshall_varchar_descriptor(update_message, table, &share->kc_info, table->s->primary_key);
if (num_blobs > 0)
marshall_blobs_descriptor(update_message, table, &share->kc_info);
List_iterator<Item> lhs_i(update_fields);
List_iterator<Item> rhs_i(update_values);
......
......@@ -11,8 +11,10 @@ enum {
UPDATE_OP_EXPAND_CHAR = 4,
UPDATE_OP_EXPAND_BINARY = 5,
UPDATE_OP_SIMPLE_UPDATE = 10,
UPDATE_OP_SIMPLE_UPSERT = 11,
UPDATE_OP_UPDATE_1 = 10,
UPDATE_OP_UPSERT_1 = 11,
UPDATE_OP_UPDATE_2 = 12,
UPDATE_OP_UPSERT_2 = 13,
};
// Field types used in the update messages
......@@ -95,44 +97,90 @@ enum {
// new length 4 the new length of the field's value
// pad char 1
// Simple row descriptor:
// fixed field offset 4 offset of the beginning of the fixed fields
// var field offset 4 offset of the variable length offsets
// var_offset_bytes 1 size of each variable length offset
// bytes_per_offset 4 number of bytes per offset
// Update and Upsert version 1
//
// Field descriptor:
// field type 4 see field types above
// unused 4 unused
// field null num 4 bit 31 is 1 if the field is nullible and the remaining bits contain the null bit number
// field offset 4 for fixed fields, this is the offset from begining of the row of the field
// for variable length fields, this is the index of the variable length field in the dictionary
// Simple update operation:
// Operations:
// update operation 4 == { '=', '+', '-' }
// x = k
// x = x + k
// x = x - k
// field descriptor
// field type 4 see field types above
// unused 4 unused
// field null num 4 bit 31 is 1 if the field is nullible and the remaining bits contain the null bit number
// field offset 4 for fixed fields, this is the offset from begining of the row of the field
// value:
// value length 4 == N, length of the value
// value N value to add or subtract
// Simple update message:
// Operation 1 == UPDATE_OP_UPDATE_FIELD
// Simple row descriptor
//
// Update_1 message:
// Operation 1 == UPDATE_OP_UPDATE_1
// fixed field offset 4 offset of the beginning of the fixed fields
// var field offset 4 offset of the variable length offsets
// var_offset_bytes 1 length of offsets (Note: not big enough)
// bytes_per_offset 4 number of bytes per offset
// Number of update ops 4 == N
// Uupdate ops [N]
// Simple upsert message:
// Operation 1 == UPDATE_OP_UPSERT
// Update ops [N]
//
// Upsert_1 message:
// Operation 1 == UPDATE_OP_UPSERT_1
// Insert row:
// length 4 == N
// data N
// Simple row descriptor
// fixed field offset 4 offset of the beginning of the fixed fields
// var field offset 4 offset of the variable length offsets
// var_offset_bytes 1 length of offsets (Note: not big enough)
// bytes_per_offset 4 number of bytes per offset
// Number of update ops 4 == N
// Update ops [N]
// Update version 2
// Operation uint32 == UPDATE_OP_UPDATE_2
// Number of update ops uint32 == N
// Update ops uint8 [ N ]
//
// Upsert version 2
// Operation uint32 == UPDATE_OP_UPSERT_2
// Insert length uint32 == N
// Insert data uint8 [ N ]
// Number of update ops uint32 == N
// Update ops uint8 [ N ]
//
// Variable fields info
// Update operation uint32 == 'v'
// Start offset uint32
// Num varchars uint32
// Bytes per offset uint32
//
// Blobs info
// Update operation uint32 == 'b'
// Num blobs uint32 == N
// Blob lengths uint8 [ N ]
//
// Update operation on fixed length fields
// Update operation uint32 == '=', '+', '-'
// Field type uint32
// Null num uint32 0 => not nullable, otherwise encoded as field_null_num + 1
// Offset uint32
// Value length uint32 == N
// Value uint8 [ N ]
//
// Update operation on varchar fields
// Update operation uint32 == '='
// Field type uint32
// Null num uint32
// Var index uint32
// Value length uint32 == N
// Value uint8 [ N ]
//
// Update operation on blob fields
// Update operation uint32 == '='
// Field type uint32
// Null num uint32
// Blob index uint32
// Value length 4 == N
// Value [ N ]
#include "tokudb_buffer.h"
#include "tokudb_math.h"
......@@ -919,158 +967,255 @@ static int tokudb_expand_char_field(
return error;
}
// Update a fixed field: new_val@offset = extra_val
static void set_fixed_field(uint32_t the_offset, uint32_t length, uint32_t field_null_num,
tokudb::buffer &new_val, void *extra_val) {
assert(the_offset + length <= new_val.size());
new_val.replace(the_offset, length, extra_val, length);
if (field_null_num)
set_overall_null_position((uchar *) new_val.data(), field_null_num & ~(1<<31), false);
}
namespace tokudb {
class simple_row_descriptor {
public:
simple_row_descriptor() : m_fixed_field_offset(0), m_var_field_offset(0), m_var_offset_bytes(0), m_bytes_per_offset(0) {
}
~simple_row_descriptor() {
}
void consume(tokudb::buffer &b) {
b.consume(&m_fixed_field_offset, sizeof m_fixed_field_offset);
b.consume(&m_var_field_offset, sizeof m_var_field_offset);
b.consume(&m_var_offset_bytes, sizeof m_var_offset_bytes);
b.consume(&m_bytes_per_offset, sizeof m_bytes_per_offset);
}
void append(tokudb::buffer &b) {
b.append(&m_fixed_field_offset, sizeof m_fixed_field_offset);
b.append(&m_var_field_offset, sizeof m_var_field_offset);
b.append(&m_var_offset_bytes, sizeof m_var_offset_bytes);
b.append(&m_bytes_per_offset, sizeof m_bytes_per_offset);
}
public:
uint32_t m_fixed_field_offset;
uint32_t m_var_field_offset;
uint8_t m_var_offset_bytes;
uint32_t m_bytes_per_offset;
};
class var_fields {
public:
var_fields(uint32_t var_offset, uint32_t offset_bytes, uint32_t bytes_per_offset) {
assert(bytes_per_offset == 1 || bytes_per_offset == 2);
var_fields() : m_initialized(false) {
}
void init_var_fields(uint32_t var_offset, uint32_t offset_bytes, uint32_t bytes_per_offset, tokudb::buffer *val_buffer) {
if (m_initialized)
return;
assert(bytes_per_offset == 0 || bytes_per_offset == 1 || bytes_per_offset == 2);
m_var_offset = var_offset;
m_val_offset = m_var_offset + offset_bytes;
m_bytes_per_offset = bytes_per_offset;
m_max_fields = offset_bytes / bytes_per_offset;
if (bytes_per_offset > 0) {
m_num_fields = offset_bytes / bytes_per_offset;
} else {
assert(offset_bytes == 0);
m_num_fields = 0;
}
m_val_buffer = val_buffer;
m_initialized = true;
}
bool is_initialized() {
return m_initialized;
}
uint32_t value_offset(uint32_t var_index, void *base);
uint32_t value_length(uint32_t var_index, void *base);
void update_offsets(uint32_t var_index, uint32_t old_s, uint32_t new_s, void *base);
uint32_t value_offset(uint32_t var_index);
uint32_t value_length(uint32_t var_index);
void update_offsets(uint32_t var_index, uint32_t old_s, uint32_t new_s);
uint32_t end_offset();
void replace(uint32_t var_index, void *new_val_ptr, uint32_t new_val_length);
private:
uint32_t read_offset(uint32_t var_index, void *base);
void write_offset(uint32_t var_index, uint32_t v, void *base);
uint32_t read_offset(uint32_t var_index);
void write_offset(uint32_t var_index, uint32_t v);
private:
uint32_t m_var_offset;
uint32_t m_val_offset;
uint32_t m_bytes_per_offset;
uint32_t m_max_fields;
uint32_t m_num_fields;
tokudb::buffer *m_val_buffer;
bool m_initialized;
};
// Return the ith variable length offset
uint32_t var_fields::read_offset(uint32_t var_index, void *base) {
if (m_bytes_per_offset == 1) {
uint8_t offset;
memcpy(&offset, (char *)base + m_var_offset + var_index * m_bytes_per_offset, sizeof offset);
return offset;
} else {
uint16_t offset;
memcpy(&offset, (char *)base + m_var_offset + var_index * m_bytes_per_offset, sizeof offset);
return offset;
}
uint32_t var_fields::read_offset(uint32_t var_index) {
uint32_t offset = 0;
m_val_buffer->read(&offset, m_bytes_per_offset, m_var_offset + var_index * m_bytes_per_offset);
return offset;
}
// Write the ith variable length offset with a new offset.
void var_fields::write_offset(uint32_t var_index, uint32_t new_offset, void *base) {
if (m_bytes_per_offset == 1) {
assert(new_offset < (1<<8));
uint8_t offset = new_offset;
memcpy((char *)base + m_var_offset + var_index * m_bytes_per_offset, &offset, sizeof offset);
} else {
assert(new_offset < (1<<16));
uint16_t offset = new_offset;
memcpy((char *)base + m_var_offset + var_index * m_bytes_per_offset, &offset, sizeof offset);
}
void var_fields::write_offset(uint32_t var_index, uint32_t new_offset) {
m_val_buffer->write(&new_offset, m_bytes_per_offset, m_var_offset + var_index * m_bytes_per_offset);
}
// Return the offset of the ith variable length field
uint32_t var_fields::value_offset(uint32_t var_index, void *base) {
assert(var_index < m_max_fields);
uint32_t var_fields::value_offset(uint32_t var_index) {
assert(var_index < m_num_fields);
if (var_index == 0)
return m_val_offset;
else
return m_val_offset + read_offset(var_index-1, base);
return m_val_offset + read_offset(var_index-1);
}
// Return the length of the ith variable length field
uint32_t var_fields::value_length(uint32_t var_index, void *base) {
assert(var_index < m_max_fields);
uint32_t var_fields::value_length(uint32_t var_index) {
assert(var_index < m_num_fields);
if (var_index == 0)
return read_offset(0, base);
return read_offset(0);
else
return read_offset(var_index, base) - read_offset(var_index-1, base);
return read_offset(var_index) - read_offset(var_index-1);
}
// The length of the ith variable length fields changed. Update all of the subsequent offsets.
void var_fields::update_offsets(uint32_t var_index, uint32_t old_s, uint32_t new_s, void *base) {
assert(var_index < m_max_fields);
void var_fields::update_offsets(uint32_t var_index, uint32_t old_s, uint32_t new_s) {
assert(var_index < m_num_fields);
if (old_s == new_s)
return;
for (uint i = var_index; i < m_max_fields; i++) {
uint32_t v = read_offset(i, base);
for (uint i = var_index; i < m_num_fields; i++) {
uint32_t v = read_offset(i);
if (new_s > old_s)
write_offset(i, v + (new_s - old_s), base);
write_offset(i, v + (new_s - old_s));
else
write_offset(i, v - (old_s - new_s), base);
write_offset(i, v - (old_s - new_s));
}
}
uint32_t var_fields::end_offset() {
if (m_num_fields == 0)
return m_val_offset;
else
return m_val_offset + read_offset(m_num_fields-1);
}
// Update a variable length field: new_val[i] = extra_val, where i is the ith variable length field.
// Compute the offset from the var index
// Replace the var value with the extra val
// Update the var offsets
// Reset the null bit
static void set_var_field(uint32_t var_index, uint32_t length, uint32_t field_null_num,
tokudb::buffer &new_val, void *extra_val, const tokudb::simple_row_descriptor &sd) {
tokudb::var_fields var_fields(sd.m_var_field_offset, sd.m_var_offset_bytes, sd.m_bytes_per_offset);
void var_fields::replace(uint32_t var_index, void *new_val_ptr, uint32_t new_val_length) {
assert(m_initialized);
// replace the new val with the extra val
uint32_t the_offset = var_fields.value_offset(var_index, new_val.data());
uint32_t old_s = var_fields.value_length(var_index, new_val.data());
uint32_t new_s = length;
new_val.replace(the_offset, old_s, extra_val, new_s);
uint32_t the_offset = value_offset(var_index);
uint32_t old_s = value_length(var_index);
uint32_t new_s = new_val_length;
m_val_buffer->replace(the_offset, old_s, new_val_ptr, new_s);
// update the var offsets
var_fields.update_offsets(var_index, old_s, new_s, new_val.data());
update_offsets(var_index, old_s, new_s);
}
class blob_fields {
public:
blob_fields() : m_initialized(false) {
}
bool is_initialized() {
return m_initialized;
}
void init_blob_fields(uint32_t num_blobs, uint8_t *blob_lengths, tokudb::buffer *val_buffer) {
if (m_initialized)
return;
m_num_blobs = num_blobs; m_blob_lengths = blob_lengths; m_val_buffer = val_buffer;
m_initialized = true;
}
void start_blobs(uint32_t offset) {
m_blob_offset = offset;
}
void replace(uint32_t blob_index, uint32_t length, void *p);
private:
uint32_t read_length(uint32_t offset, size_t size);
void write_length(uint32_t offset, size_t size, uint32_t new_length);
uint32_t blob_offset(uint32_t blob_index);
private:
uint32_t m_blob_offset;
uint32_t m_num_blobs;
uint8_t *m_blob_lengths;
tokudb::buffer *m_val_buffer;
bool m_initialized;
};
uint32_t blob_fields::read_length(uint32_t offset, size_t blob_length) {
uint32_t length = 0;
m_val_buffer->read(&length, blob_length, offset);
return length;
}
void blob_fields::write_length(uint32_t offset, size_t size, uint32_t new_length) {
m_val_buffer->write(&new_length, size, offset);
}
uint32_t blob_fields::blob_offset(uint32_t blob_index) {
assert(blob_index < m_num_blobs);
uint32_t offset = m_blob_offset;
for (uint i = 0; i < blob_index; i++) {
uint32_t blob_length = m_blob_lengths[i];
uint32_t length = read_length(offset, blob_length);
offset += blob_length + length;
}
return offset;
}
void blob_fields::replace(uint32_t blob_index, uint32_t new_length, void *new_value) {
assert(m_initialized);
assert(blob_index < m_num_blobs);
// compute the ith blob offset
uint32_t offset = blob_offset(blob_index);
uint8_t blob_length = m_blob_lengths[blob_index];
// read the old length
uint32_t old_length = read_length(offset, blob_length);
// reset null bit
if (field_null_num)
set_overall_null_position((uchar *) new_val.data(), field_null_num & ~(1<<31), false);
// replace the data
m_val_buffer->replace(offset + blob_length, old_length, new_value, new_length);
// write the new length
write_length(offset, blob_length, new_length);
}
class value_map {
public:
value_map(tokudb::buffer *val_buffer) : m_val_buffer(val_buffer) {
}
void init_var_fields(uint32_t var_offset, uint32_t offset_bytes, uint32_t bytes_per_offset) {
m_var_fields.init_var_fields(var_offset, offset_bytes, bytes_per_offset, m_val_buffer);
}
void init_blob_fields(uint32_t num_blobs, uint8_t *blob_lengths) {
m_blob_fields.init_blob_fields(num_blobs, blob_lengths, m_val_buffer);
}
// Replace the value of a fixed length field
void replace_fixed(uint32_t the_offset, uint32_t field_null_num, void *new_val_ptr, uint32_t new_val_length) {
m_val_buffer->replace(the_offset, new_val_length, new_val_ptr, new_val_length);
maybe_clear_null(field_null_num);
}
// Replace the value of a variable length field
void replace_varchar(uint32_t var_index, uint32_t field_null_num, void *new_val_ptr, uint32_t new_val_length) {
m_var_fields.replace(var_index, new_val_ptr, new_val_length);
maybe_clear_null(field_null_num);
}
// Replace the value of a blob field
void replace_blob(uint32_t blob_index, uint32_t field_null_num, void *new_val_ptr, uint32_t new_val_length) {
m_blob_fields.start_blobs(m_var_fields.end_offset());
m_blob_fields.replace(blob_index, new_val_length, new_val_ptr);
maybe_clear_null(field_null_num);
}
void int_op(uint32_t operation, uint32_t the_offset, uint32_t length, uint32_t field_null_num,
tokudb::buffer &old_val, void *extra_val);
void uint_op(uint32_t operation, uint32_t the_offset, uint32_t length, uint32_t field_null_num,
tokudb::buffer &old_val, void *extra_val);
private:
bool is_null(uint32_t null_num, uchar *null_bytes) {
bool field_is_null = false;
if (null_num) {
if (null_num & (1<<31))
null_num &= ~(1<<31);
else
null_num -= 1;
field_is_null = is_overall_null_position_set(null_bytes, null_num);
}
return field_is_null;
}
void maybe_clear_null(uint32_t null_num) {
if (null_num) {
if (null_num & (1<<31))
null_num &= ~(1<<31);
else
null_num -= 1;
set_overall_null_position((uchar *) m_val_buffer->data(), null_num, false);
}
}
private:
var_fields m_var_fields;
blob_fields m_blob_fields;
tokudb::buffer *m_val_buffer;
};
// Update an int field: signed newval@offset = old_val@offset OP extra_val
static void int_op(uint32_t operation, uint32_t the_offset, uint32_t length, uint32_t field_null_num,
tokudb::buffer &new_val, tokudb::buffer &old_val, void *extra_val) {
assert(the_offset + length <= new_val.size());
void value_map::int_op(uint32_t operation, uint32_t the_offset, uint32_t length, uint32_t field_null_num,
tokudb::buffer &old_val, void *extra_val) {
assert(the_offset + length <= m_val_buffer->size());
assert(the_offset + length <= old_val.size());
assert(length == 1 || length == 2 || length == 3 || length == 4 || length == 8);
uchar *old_val_ptr = (uchar *) old_val.data();
bool field_is_null = false;
if (field_null_num)
field_is_null = is_overall_null_position_set(old_val_ptr, field_null_num & ~(1<<31));
bool field_is_null = is_null(field_null_num, old_val_ptr);
int64_t v = 0;
memcpy(&v, old_val_ptr + the_offset, length);
v = tokudb::int_sign_extend(v, 8*length);
......@@ -1088,7 +1233,7 @@ static void int_op(uint32_t operation, uint32_t the_offset, uint32_t length, uin
else
v = tokudb::int_low_endpoint(8*length);
}
new_val.replace(the_offset, length, &v, length);
m_val_buffer->replace(the_offset, length, &v, length);
}
break;
case '-':
......@@ -1101,7 +1246,7 @@ static void int_op(uint32_t operation, uint32_t the_offset, uint32_t length, uin
else
v = tokudb::int_high_endpoint(8*length);
}
new_val.replace(the_offset, length, &v, length);
m_val_buffer->replace(the_offset, length, &v, length);
}
break;
default:
......@@ -1110,16 +1255,14 @@ static void int_op(uint32_t operation, uint32_t the_offset, uint32_t length, uin
}
// Update an unsigned field: unsigned newval@offset = old_val@offset OP extra_val
static void uint_op(uint32_t operation, uint32_t the_offset, uint32_t length, uint32_t field_null_num,
tokudb::buffer &new_val, tokudb::buffer &old_val, void *extra_val) {
assert(the_offset + length <= new_val.size());
void value_map::uint_op(uint32_t operation, uint32_t the_offset, uint32_t length, uint32_t field_null_num,
tokudb::buffer &old_val, void *extra_val) {
assert(the_offset + length <= m_val_buffer->size());
assert(the_offset + length <= old_val.size());
assert(length == 1 || length == 2 || length == 3 || length == 4 || length == 8);
uchar *old_val_ptr = (uchar *) old_val.data();
bool field_is_null = false;
if (field_null_num)
field_is_null = is_overall_null_position_set(old_val_ptr, field_null_num & ~(1<<31));
bool field_is_null = is_null(field_null_num, old_val_ptr);
uint64_t v = 0;
memcpy(&v, old_val_ptr + the_offset, length);
uint64_t extra_v = 0;
......@@ -1132,7 +1275,7 @@ static void uint_op(uint32_t operation, uint32_t the_offset, uint32_t length, ui
if (over) {
v = tokudb::uint_high_endpoint(8*length);
}
new_val.replace(the_offset, length, &v, length);
m_val_buffer->replace(the_offset, length, &v, length);
}
break;
case '-':
......@@ -1142,7 +1285,7 @@ static void uint_op(uint32_t operation, uint32_t the_offset, uint32_t length, ui
if (over) {
v = tokudb::uint_low_endpoint(8*length);
}
new_val.replace(the_offset, length, &v, length);
m_val_buffer->replace(the_offset, length, &v, length);
}
break;
default:
......@@ -1150,9 +1293,10 @@ static void uint_op(uint32_t operation, uint32_t the_offset, uint32_t length, ui
}
}
// Decode and apply a sequence of update operations defined in the extra to the old value and put the result
// in the new value.
static void apply_updates(tokudb::buffer &new_val, tokudb::buffer &old_val, tokudb::buffer &extra_val, const tokudb::simple_row_descriptor &sd) {
}
// Decode and apply a sequence of update operations defined in the extra to the old value and put the result in the new value.
static void apply_1_updates(tokudb::value_map &vd, tokudb::buffer &new_val, tokudb::buffer &old_val, tokudb::buffer &extra_val) {
uint32_t num_updates;
extra_val.consume(&num_updates, sizeof num_updates);
for ( ; num_updates > 0; num_updates--) {
......@@ -1167,38 +1311,31 @@ static void apply_updates(tokudb::buffer &new_val, tokudb::buffer &old_val, toku
extra_val.consume(&field_null_num, sizeof field_null_num);
uint32_t the_offset;
extra_val.consume(&the_offset, sizeof the_offset);
uint32_t length;
extra_val.consume(&length, sizeof length);
void *extra_val_ptr = extra_val.consume_ptr(length);
uint32_t extra_val_length;
extra_val.consume(&extra_val_length, sizeof extra_val_length);
void *extra_val_ptr = extra_val.consume_ptr(extra_val_length);
// apply the update
switch (field_type) {
case UPDATE_TYPE_INT:
if (update_operation == '=')
set_fixed_field(the_offset, length, field_null_num, new_val, extra_val_ptr);
vd.replace_fixed(the_offset, field_null_num, extra_val_ptr, extra_val_length);
else
int_op(update_operation, the_offset, length, field_null_num, new_val, old_val, extra_val_ptr);
vd.int_op(update_operation, the_offset, extra_val_length, field_null_num, old_val, extra_val_ptr);
break;
case UPDATE_TYPE_UINT:
if (update_operation == '=')
set_fixed_field(the_offset, length, field_null_num, new_val, extra_val_ptr);
vd.replace_fixed(the_offset, field_null_num, extra_val_ptr, extra_val_length);
else
uint_op(update_operation, the_offset, length, field_null_num, new_val, old_val, extra_val_ptr);
vd.uint_op(update_operation, the_offset, extra_val_length, field_null_num, old_val, extra_val_ptr);
break;
case UPDATE_TYPE_CHAR:
case UPDATE_TYPE_BINARY:
if (update_operation == '=')
set_fixed_field(the_offset, length, field_null_num, new_val, extra_val_ptr);
vd.replace_fixed(the_offset, field_null_num, extra_val_ptr, extra_val_length);
else
assert(0);
break;
case UPDATE_TYPE_VARBINARY:
case UPDATE_TYPE_VARCHAR:
if (update_operation == '=')
set_var_field(the_offset, length, field_null_num, new_val, extra_val_ptr, sd);
else
assert(0);
break;
default:
assert(0);
break;
......@@ -1207,9 +1344,8 @@ static void apply_updates(tokudb::buffer &new_val, tokudb::buffer &old_val, toku
assert(extra_val.size() == extra_val.limit());
}
// Simple update handler. Decode the update message, apply the update operations to the old value, and set
// the new value.
static int tokudb_simple_update_fun(
// Simple update handler. Decode the update message, apply the update operations to the old value, and set the new value.
static int tokudb_update_1_fun(
DB* db,
const DBT *key_dbt,
const DBT *old_val_dbt,
......@@ -1222,12 +1358,18 @@ static int tokudb_simple_update_fun(
uchar operation;
extra_val.consume(&operation, sizeof operation);
assert(operation == UPDATE_OP_SIMPLE_UPDATE);
assert(operation == UPDATE_OP_UPDATE_1);
if (old_val_dbt != NULL) {
// get the simple descriptor
tokudb::simple_row_descriptor sd;
sd.consume(extra_val);
uint32_t m_fixed_field_offset;
extra_val.consume(&m_fixed_field_offset, sizeof m_fixed_field_offset);
uint32_t m_var_field_offset;
extra_val.consume(&m_var_field_offset, sizeof m_var_field_offset);
uint32_t m_var_offset_bytes;
extra_val.consume(&m_var_offset_bytes, sizeof m_var_offset_bytes);
uint32_t m_bytes_per_offset;
extra_val.consume(&m_bytes_per_offset, sizeof m_bytes_per_offset);
tokudb::buffer old_val(old_val_dbt->data, old_val_dbt->size, old_val_dbt->size);
......@@ -1235,8 +1377,11 @@ static int tokudb_simple_update_fun(
tokudb::buffer new_val;
new_val.append(old_val_dbt->data, old_val_dbt->size);
tokudb::value_map vd(&new_val);
vd.init_var_fields(m_var_field_offset, m_var_offset_bytes, m_bytes_per_offset);
// apply updates to new val
apply_updates(new_val, old_val, extra_val, sd);
apply_1_updates(vd, new_val, old_val, extra_val);
// set the new val
DBT new_val_dbt; memset(&new_val_dbt, 0, sizeof new_val_dbt);
......@@ -1250,7 +1395,7 @@ static int tokudb_simple_update_fun(
// Simple upsert handler. Decode the upsert message. If the key does not exist, then insert a new value from the extra.
// Otherwise, apply the update operations to the old value, and then set the new value.
static int tokudb_simple_upsert_fun(
static int tokudb_upsert_1_fun(
DB* db,
const DBT *key_dbt,
const DBT *old_val_dbt,
......@@ -1263,7 +1408,7 @@ static int tokudb_simple_upsert_fun(
uchar operation;
extra_val.consume(&operation, sizeof operation);
assert(operation == UPDATE_OP_SIMPLE_UPSERT);
assert(operation == UPDATE_OP_UPSERT_1);
uint32_t insert_length;
extra_val.consume(&insert_length, sizeof insert_length);
......@@ -1277,17 +1422,175 @@ static int tokudb_simple_upsert_fun(
set_val(&new_val_dbt, set_extra);
} else {
// decode the simple descriptor
tokudb::simple_row_descriptor sd;
sd.consume(extra_val);
uint32_t m_fixed_field_offset;
extra_val.consume(&m_fixed_field_offset, sizeof m_fixed_field_offset);
uint32_t m_var_field_offset;
extra_val.consume(&m_var_field_offset, sizeof m_var_field_offset);
uint32_t m_var_offset_bytes;
extra_val.consume(&m_var_offset_bytes, sizeof m_var_offset_bytes);
uint32_t m_bytes_per_offset;
extra_val.consume(&m_bytes_per_offset, sizeof m_bytes_per_offset);
tokudb::buffer old_val(old_val_dbt->data, old_val_dbt->size, old_val_dbt->size);
// new val = old val
tokudb::buffer new_val;
new_val.append(old_val_dbt->data, old_val_dbt->size);
tokudb::value_map vd(&new_val);
vd.init_var_fields(m_var_field_offset, m_var_offset_bytes, m_bytes_per_offset);
// apply updates to new val
apply_1_updates(vd, new_val, old_val, extra_val);
// set the new val
DBT new_val_dbt; memset(&new_val_dbt, 0, sizeof new_val_dbt);
new_val_dbt.data = new_val.data();
new_val_dbt.size = new_val.size();
set_val(&new_val_dbt, set_extra);
}
return 0;
}
// Decode and apply a sequence of update operations defined in the extra to the old value and put the result in the new value.
static void apply_2_updates(tokudb::value_map &vd, tokudb::buffer &new_val, tokudb::buffer &old_val, tokudb::buffer &extra_val) {
uint32_t num_updates = extra_val.consume_uint32();
for (uint32_t i = 0; i < num_updates; i++) {
uint32_t update_operation = extra_val.consume_uint32();
if (update_operation == 'v') {
uint32_t var_field_offset = extra_val.consume_uint32();
uint32_t var_offset_bytes = extra_val.consume_uint32();
uint32_t bytes_per_offset = extra_val.consume_uint32();
vd.init_var_fields(var_field_offset, var_offset_bytes, bytes_per_offset);
} else if (update_operation == 'b') {
uint32_t num_blobs = extra_val.consume_uint32();
uint8_t *blob_lengths = (uint8_t *)extra_val.consume_ptr(num_blobs);
vd.init_blob_fields(num_blobs, blob_lengths);
} else {
uint32_t field_type = extra_val.consume_uint32();
uint32_t field_null_num = extra_val.consume_uint32();
uint32_t the_offset = extra_val.consume_uint32();
uint32_t extra_val_length = extra_val.consume_uint32();
void *extra_val_ptr = extra_val.consume_ptr(extra_val_length);
switch (field_type) {
case UPDATE_TYPE_INT:
if (update_operation == '=')
vd.replace_fixed(the_offset, field_null_num, extra_val_ptr, extra_val_length);
else
vd.int_op(update_operation, the_offset, extra_val_length, field_null_num, old_val, extra_val_ptr);
break;
case UPDATE_TYPE_UINT:
if (update_operation == '=')
vd.replace_fixed(the_offset, field_null_num, extra_val_ptr, extra_val_length);
else
vd.uint_op(update_operation, the_offset, extra_val_length, field_null_num, old_val, extra_val_ptr);
break;
case UPDATE_TYPE_CHAR:
case UPDATE_TYPE_BINARY:
if (update_operation == '=')
vd.replace_fixed(the_offset, field_null_num, extra_val_ptr, extra_val_length);
else
assert(0);
break;
case UPDATE_TYPE_VARBINARY:
case UPDATE_TYPE_VARCHAR:
if (update_operation == '=')
vd.replace_varchar(the_offset, field_null_num, extra_val_ptr, extra_val_length);
else
assert(0);
break;
case UPDATE_TYPE_TEXT:
case UPDATE_TYPE_BLOB:
if (update_operation == '=')
vd.replace_blob(the_offset, field_null_num, extra_val_ptr, extra_val_length);
else
assert(0);
break;
default:
assert(0);
break;
}
}
}
assert(extra_val.size() == extra_val.limit());
}
// Simple update handler. Decode the update message, apply the update operations to the old value, and set the new value.
static int tokudb_update_2_fun(
DB* db,
const DBT *key_dbt,
const DBT *old_val_dbt,
const DBT *extra,
void (*set_val)(const DBT *new_val_dbt, void *set_extra),
void *set_extra
)
{
tokudb::buffer extra_val(extra->data, 0, extra->size);
uint32_t operation = extra_val.consume_uint32();
assert(operation == UPDATE_OP_UPDATE_2);
if (old_val_dbt != NULL) {
tokudb::buffer old_val(old_val_dbt->data, old_val_dbt->size, old_val_dbt->size);
// new val = old val
tokudb::buffer new_val;
new_val.append(old_val_dbt->data, old_val_dbt->size);
tokudb::value_map vd(&new_val);
// apply updates to new val
apply_2_updates(vd, new_val, old_val, extra_val);
// set the new val
DBT new_val_dbt; memset(&new_val_dbt, 0, sizeof new_val_dbt);
new_val_dbt.data = new_val.data();
new_val_dbt.size = new_val.size();
set_val(&new_val_dbt, set_extra);
}
return 0;
}
// Simple upsert handler. Decode the upsert message. If the key does not exist, then insert a new value from the extra.
// Otherwise, apply the update operations to the old value, and then set the new value.
static int tokudb_upsert_2_fun(
DB* db,
const DBT *key_dbt,
const DBT *old_val_dbt,
const DBT *extra,
void (*set_val)(const DBT *new_val_dbt, void *set_extra),
void *set_extra
)
{
tokudb::buffer extra_val(extra->data, 0, extra->size);
uint32_t operation = extra_val.consume_uint32();
assert(operation == UPDATE_OP_UPSERT_2);
uint32_t insert_length = extra_val.consume_uint32();
assert(insert_length < extra_val.limit());
void *insert_row = extra_val.consume_ptr(insert_length);
if (old_val_dbt == NULL) {
// insert a new row
DBT new_val_dbt; memset(&new_val_dbt, 0, sizeof new_val_dbt);
new_val_dbt.size = insert_length;
new_val_dbt.data = insert_row;
set_val(&new_val_dbt, set_extra);
} else {
tokudb::buffer old_val(old_val_dbt->data, old_val_dbt->size, old_val_dbt->size);
// new val = old val
tokudb::buffer new_val;
new_val.append(old_val_dbt->data, old_val_dbt->size);
tokudb::value_map vd(&new_val);
// apply updates to new val
apply_updates(new_val, old_val, extra_val, sd);
apply_2_updates(vd, new_val, old_val, extra_val);
// set the new val
DBT new_val_dbt; memset(&new_val_dbt, 0, sizeof new_val_dbt);
......@@ -1329,11 +1632,17 @@ int tokudb_update_fun(
case UPDATE_OP_EXPAND_BINARY:
error = tokudb_expand_char_field(db, key, old_val, extra, set_val, set_extra);
break;
case UPDATE_OP_SIMPLE_UPDATE:
error = tokudb_simple_update_fun(db, key, old_val, extra, set_val, set_extra);
case UPDATE_OP_UPDATE_1:
error = tokudb_update_1_fun(db, key, old_val, extra, set_val, set_extra);
break;
case UPDATE_OP_UPSERT_1:
error = tokudb_upsert_1_fun(db, key, old_val, extra, set_val, set_extra);
break;
case UPDATE_OP_UPDATE_2:
error = tokudb_update_2_fun(db, key, old_val, extra, set_val, set_extra);
break;
case UPDATE_OP_SIMPLE_UPSERT:
error = tokudb_simple_upsert_fun(db, key, old_val, extra, set_val, set_extra);
case UPDATE_OP_UPSERT_2:
error = tokudb_upsert_2_fun(db, key, old_val, extra, set_val, set_extra);
break;
default:
error = EINVAL;
......
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <assert.h>
#include <tokudb_base128.h>
int main(void) {
uint32_t n;
unsigned char b[5];
size_t out_s, in_s;
printf("%u\n", 0);
for (uint32_t v = 0; v < (1<<7); v++) {
out_s = tokudb::base128_encode_uint32(v, b, sizeof b);
assert(out_s == 1);
in_s = tokudb::base128_decode_uint32(&n, b, out_s);
assert(in_s == 1 && n == v);
}
printf("%u\n", 1<<7);
for (uint32_t v = (1<<7); v < (1<<14); v++) {
out_s = tokudb::base128_encode_uint32(v, b, sizeof b);
assert(out_s == 2);
in_s = tokudb::base128_decode_uint32(&n, b, out_s);
assert(in_s == 2 && n == v);
}
printf("%u\n", 1<<14);
for (uint32_t v = (1<<14); v < (1<<21); v++) {
out_s = tokudb::base128_encode_uint32(v, b, sizeof b);
assert(out_s == 3);
in_s = tokudb::base128_decode_uint32(&n, b, out_s);
assert(in_s == 3 && n == v);
}
printf("%u\n", 1<<21);
for (uint32_t v = (1<<21); v < (1<<28); v++) {
out_s = tokudb::base128_encode_uint32(v, b, sizeof b);
assert(out_s == 4);
in_s = tokudb::base128_decode_uint32(&n, b, out_s);
assert(in_s == 4 && n == v);
}
printf("%u\n", 1<<28);
for (uint32_t v = (1<<28); v != 0; v++) {
out_s = tokudb::base128_encode_uint32(v, b, sizeof b);
assert(out_s == 5);
in_s = tokudb::base128_decode_uint32(&n, b, out_s);
assert(in_s == 5 && n == v);
}
return 0;
}
......@@ -2,6 +2,7 @@
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <stdint.h>
#include <tokudb_buffer.h>
static void test_null() {
......
#ifndef _TOKUDB_BASE128_H
#define _TOKUDB_BASE128_H
namespace tokudb {
static size_t base128_encode_uint32(uint32_t n, void *p, size_t s) {
unsigned char *pp = (unsigned char *)p;
uint i = 0;
while (i < s) {
uint32_t m = n & 127;
n >>= 7;
if (n != 0)
m |= 128;
pp[i++] = m;
if (n == 0)
break;
}
return i;
}
static size_t base128_decode_uint32(uint32_t *np, void *p, size_t s) {
unsigned char *pp = (unsigned char *)p;
uint32_t n = 0;
uint i = 0;
while (i < s) {
uint m = pp[i];
n |= (m & 127) << 7*i;
i++;
if ((m & 128) == 0)
break;
}
*np = n;
return i;
}
}
#endif
#if !defined(_TOKUDB_BUFFER_H)
#define _TOKUDB_BUFFER_H
#include "tokudb_base128.h"
namespace tokudb {
// A Buffer manages a contiguous chunk of memory and supports appending new data to the end of the buffer, and
......@@ -31,6 +33,12 @@ class buffer {
memcpy(append_ptr(s), p, s);
}
void append_uint32(uint32_t n) {
maybe_realloc(5);
size_t s = tokudb::base128_encode_uint32(n, (char *) m_data + m_size, 5);
m_size += s;
}
// Return a pointer to the next location in the buffer where bytes are consumed from.
void *consume_ptr(size_t s) {
if (m_size + s > m_limit)
......@@ -45,6 +53,25 @@ class buffer {
memcpy(p, consume_ptr(s), s);
}
uint32_t consume_uint32() {
uint32_t n;
size_t s = tokudb::base128_decode_uint32(&n, (char *) m_data + m_size, m_limit - m_size);
m_size += s;
return n;
}
// Write p_length bytes at an offset in the buffer
void write(void *p, size_t p_length, size_t offset) {
assert(offset + p_length <= m_size);
memcpy((char *)m_data + offset, p, p_length);
}
// Read p_length bytes at an offset in the buffer
void read(void *p, size_t p_length, size_t offset) {
assert(offset + p_length <= m_size);
memcpy(p, (char *)m_data + offset, p_length);
}
// Replace a field in the buffer with new data. If the new data size is different, then readjust the
// size of the buffer and move things around.
void replace(size_t offset, size_t old_s, void *new_p, size_t new_s) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment