Commit b2a99540 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

refs #5886 merge blob updates to mainline

git-svn-id: file:///svn/mysql/tokudb-engine/tokudb-engine@52092 c7de825b-a66e-492c-adef-691d508d4ae1
parent 6b8d3611
......@@ -140,6 +140,17 @@ static uint32_t var_field_index(TABLE *table, KEY_AND_COL_INFO *kc_info, uint id
return v_index;
}
static uint32_t blob_field_index(TABLE *table, KEY_AND_COL_INFO *kc_info, uint idx, uint field_num) {
assert(field_num < table->s->fields);
uint b_index;
for (b_index = 0; b_index < kc_info->num_blobs; b_index++) {
if (kc_info->blob_fields[b_index] == field_num)
break;
}
assert(b_index < kc_info->num_blobs);
return b_index;
}
// Determine if an update operation can be offloaded to the storage engine.
// The update operation consists of a list of update expressions (fields[i] = values[i]), and a list
// of where conditions (conds). The function returns 0 if the update is handled in the storage engine.
......@@ -163,7 +174,7 @@ int ha_tokudb::fast_update(THD *thd, List<Item> &update_fields, List<Item> &upda
line = __LINE__;
goto return_error;
}
if (!check_fast_update(thd, update_fields, update_values, conds)) {
error = ENOTSUP;
line = __LINE__;
......@@ -303,6 +314,7 @@ static bool check_simple_update_expression(Item *lhs_item, Item *rhs_item, TABLE
return true;
break;
case MYSQL_TYPE_VARCHAR:
case MYSQL_TYPE_BLOB:
if (rhs_type == Item::STRING_ITEM)
return true;
break;
......@@ -478,14 +490,24 @@ bool ha_tokudb::check_fast_update(THD *thd, List<Item> &fields, List<Item> &valu
return true;
}
// Marshall a simple row descriptor to a buffer.
static void marshall_simple_descriptor(tokudb::buffer &b, TABLE *table, KEY_AND_COL_INFO &kc_info, uint key_num) {
tokudb::simple_row_descriptor sd;
sd.m_fixed_field_offset = table->s->null_bytes;
sd.m_var_field_offset = sd.m_fixed_field_offset + kc_info.mcp_info[key_num].fixed_field_size;
sd.m_var_offset_bytes = kc_info.mcp_info[key_num].len_of_offsets; // total length of the var offsets
sd.m_bytes_per_offset = sd.m_var_offset_bytes == 0 ? 0 : kc_info.num_offset_bytes; // bytes per var offset
sd.append(b);
static void marshall_varchar_descriptor(tokudb::buffer &b, TABLE *table, KEY_AND_COL_INFO *kc_info, uint key_num) {
b.append_uint32('v');
b.append_uint32(table->s->null_bytes + kc_info->mcp_info[key_num].fixed_field_size);
uint32_t var_offset_bytes = kc_info->mcp_info[key_num].len_of_offsets;
b.append_uint32(var_offset_bytes);
b.append_uint32(var_offset_bytes == 0 ? 0 : kc_info->num_offset_bytes);
}
static void marshall_blobs_descriptor(tokudb::buffer &b, TABLE *table, KEY_AND_COL_INFO *kc_info) {
b.append_uint32('b');
uint32_t n = kc_info->num_blobs;
b.append_uint32(n);
for (uint i = 0; i < n; i++) {
uint blob_field_index = kc_info->blob_fields[i];
assert(blob_field_index < table->s->fields);
uint8_t blob_field_length = table->s->field[blob_field_index]->row_pack_length();
b.append(&blob_field_length, sizeof blob_field_length);
}
}
static inline uint32_t get_null_bit_position(uint32_t null_bit);
......@@ -501,7 +523,7 @@ static void marshall_simple_update(tokudb::buffer &b, Item *lhs_item, Item *rhs_
uint32_t field_null_num = 0;
if (lhs_field->real_maybe_null()) {
uint32_t field_num = lhs_field->field_index;
field_null_num = (1<<31) + (field_num/8)*8 + get_null_bit_position(lhs_field->null_bit);
field_null_num = ((field_num/8)*8 + get_null_bit_position(lhs_field->null_bit)) + 1;
}
uint32_t offset;
void *v_ptr = NULL;
......@@ -549,7 +571,6 @@ static void marshall_simple_update(tokudb::buffer &b, Item *lhs_item, Item *rhs_
}
break;
}
case MYSQL_TYPE_STRING: {
update_operation = '=';
field_type = lhs_field->binary() ? UPDATE_TYPE_BINARY : UPDATE_TYPE_CHAR;
......@@ -567,7 +588,6 @@ static void marshall_simple_update(tokudb::buffer &b, Item *lhs_item, Item *rhs_
v_ptr = v_str.c_ptr();
break;
}
case MYSQL_TYPE_VARCHAR: {
update_operation = '=';
field_type = lhs_field->binary() ? UPDATE_TYPE_VARBINARY : UPDATE_TYPE_VARCHAR;
......@@ -581,18 +601,29 @@ static void marshall_simple_update(tokudb::buffer &b, Item *lhs_item, Item *rhs_
v_ptr = v_str.c_ptr();
break;
}
case MYSQL_TYPE_BLOB: {
update_operation = '=';
field_type = lhs_field->binary() ? UPDATE_TYPE_BLOB : UPDATE_TYPE_TEXT;
offset = blob_field_index(table, &share->kc_info, table->s->primary_key, lhs_field->field_index);
v_str = *rhs_item->val_str(&v_str);
v_length = v_str.length();
if (v_length >= lhs_field->max_data_length()) {
v_length = lhs_field->max_data_length();
v_str.length(v_length); // truncate
}
v_ptr = v_str.c_ptr();
break;
}
default:
assert(0);
}
// marshall the update fields into the buffer
b.append(&update_operation, sizeof update_operation);
b.append(&field_type, sizeof field_type);
uint32_t unused = 0;
b.append(&unused, sizeof unused);
b.append(&field_null_num, sizeof field_null_num);
b.append(&offset, sizeof offset);
b.append(&v_length, sizeof v_length);
b.append_uint32(update_operation);
b.append_uint32(field_type);
b.append_uint32(field_null_num);
b.append_uint32(offset);
b.append_uint32(v_length);
b.append(v_ptr, v_length);
}
......@@ -612,6 +643,19 @@ static int save_in_field(Item *item, TABLE *table) {
return error;
}
static void count_update_types(Field *lhs_field, uint *num_varchars, uint *num_blobs) {
switch (lhs_field->type()) {
case MYSQL_TYPE_VARCHAR:
*num_varchars += 1;
break;
case MYSQL_TYPE_BLOB:
*num_blobs += 1;
break;
default:
break;
}
}
// Generate an update message for an update operation and send it into the primary tree. Return 0 if successful.
int ha_tokudb::send_update_message(List<Item> &update_fields, List<Item> &update_values, Item *conds, DB_TXN *txn) {
int error;
......@@ -640,16 +684,36 @@ int ha_tokudb::send_update_message(List<Item> &update_fields, List<Item> &update
// construct the update message
tokudb::buffer update_message;
uchar operation = UPDATE_OP_SIMPLE_UPDATE;
// update_message.append_uint32(UPDATE_OP_UPDATE_2);
uint8_t operation = UPDATE_OP_UPDATE_2;
update_message.append(&operation, sizeof operation);
// append the descriptor
marshall_simple_descriptor(update_message, table, share->kc_info, primary_key);
// append the updates
uint32_t num_updates = update_fields.elements;
update_message.append(&num_updates, sizeof num_updates);
uint num_varchars = 0, num_blobs = 0;
if (1) {
List_iterator<Item> lhs_i(update_fields);
Item *lhs_item;
while ((lhs_item = lhs_i++)) {
if (lhs_item == NULL)
break;
Field *lhs_field = find_field_by_name(table, lhs_item);
assert(lhs_field); // we found it before, so this should work
count_update_types(lhs_field, &num_varchars, &num_blobs);
}
if (num_varchars > 0 || num_blobs > 0)
num_updates++;
if (num_blobs > 0)
num_updates++;
}
// append the updates
update_message.append_uint32(num_updates);
if (num_varchars > 0 || num_blobs > 0)
marshall_varchar_descriptor(update_message, table, &share->kc_info, table->s->primary_key);
if (num_blobs > 0)
marshall_blobs_descriptor(update_message, table, &share->kc_info);
List_iterator<Item> lhs_i(update_fields);
List_iterator<Item> rhs_i(update_values);
......@@ -779,20 +843,39 @@ int ha_tokudb::send_upsert_message(THD *thd, List<Item> &update_fields, List<Ite
tokudb::buffer update_message;
// append the operation
uchar operation = UPDATE_OP_SIMPLE_UPSERT;
// update_message.append_uint32(UPDATE_OP_UPSERT_2);
uint8_t operation = UPDATE_OP_UPSERT_2;
update_message.append(&operation, sizeof operation);
// append the row
uint32_t row_length = row.size;
update_message.append(&row_length, sizeof row_length);
update_message.append(row.data, row_length);
// append the descriptor
marshall_simple_descriptor(update_message, table, share->kc_info, primary_key);
update_message.append_uint32(row.size);
update_message.append(row.data, row.size);
// append the update expressions
uint32_t num_updates = update_fields.elements;
update_message.append(&num_updates, sizeof num_updates);
uint num_varchars = 0, num_blobs = 0;
if (1) {
List_iterator<Item> lhs_i(update_fields);
Item *lhs_item;
while ((lhs_item = lhs_i++)) {
if (lhs_item == NULL)
break;
Field *lhs_field = find_field_by_name(table, lhs_item);
assert(lhs_field); // we found it before, so this should work
count_update_types(lhs_field, &num_varchars, &num_blobs);
}
if (num_varchars > 0 || num_blobs > 0)
num_updates++;
if (num_blobs > 0)
num_updates++;
}
// append the updates
update_message.append_uint32(num_updates);
if (num_varchars > 0 || num_blobs > 0)
marshall_varchar_descriptor(update_message, table, &share->kc_info, table->s->primary_key);
if (num_blobs > 0)
marshall_blobs_descriptor(update_message, table, &share->kc_info);
List_iterator<Item> lhs_i(update_fields);
List_iterator<Item> rhs_i(update_values);
......
This diff is collapsed.
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <assert.h>
#include <tokudb_base128.h>
int main(void) {
uint32_t n;
unsigned char b[5];
size_t out_s, in_s;
printf("%u\n", 0);
for (uint32_t v = 0; v < (1<<7); v++) {
out_s = tokudb::base128_encode_uint32(v, b, sizeof b);
assert(out_s == 1);
in_s = tokudb::base128_decode_uint32(&n, b, out_s);
assert(in_s == 1 && n == v);
}
printf("%u\n", 1<<7);
for (uint32_t v = (1<<7); v < (1<<14); v++) {
out_s = tokudb::base128_encode_uint32(v, b, sizeof b);
assert(out_s == 2);
in_s = tokudb::base128_decode_uint32(&n, b, out_s);
assert(in_s == 2 && n == v);
}
printf("%u\n", 1<<14);
for (uint32_t v = (1<<14); v < (1<<21); v++) {
out_s = tokudb::base128_encode_uint32(v, b, sizeof b);
assert(out_s == 3);
in_s = tokudb::base128_decode_uint32(&n, b, out_s);
assert(in_s == 3 && n == v);
}
printf("%u\n", 1<<21);
for (uint32_t v = (1<<21); v < (1<<28); v++) {
out_s = tokudb::base128_encode_uint32(v, b, sizeof b);
assert(out_s == 4);
in_s = tokudb::base128_decode_uint32(&n, b, out_s);
assert(in_s == 4 && n == v);
}
printf("%u\n", 1<<28);
for (uint32_t v = (1<<28); v != 0; v++) {
out_s = tokudb::base128_encode_uint32(v, b, sizeof b);
assert(out_s == 5);
in_s = tokudb::base128_decode_uint32(&n, b, out_s);
assert(in_s == 5 && n == v);
}
return 0;
}
......@@ -2,6 +2,7 @@
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <stdint.h>
#include <tokudb_buffer.h>
static void test_null() {
......
#ifndef _TOKUDB_BASE128_H
#define _TOKUDB_BASE128_H
namespace tokudb {
static size_t base128_encode_uint32(uint32_t n, void *p, size_t s) {
unsigned char *pp = (unsigned char *)p;
uint i = 0;
while (i < s) {
uint32_t m = n & 127;
n >>= 7;
if (n != 0)
m |= 128;
pp[i++] = m;
if (n == 0)
break;
}
return i;
}
static size_t base128_decode_uint32(uint32_t *np, void *p, size_t s) {
unsigned char *pp = (unsigned char *)p;
uint32_t n = 0;
uint i = 0;
while (i < s) {
uint m = pp[i];
n |= (m & 127) << 7*i;
i++;
if ((m & 128) == 0)
break;
}
*np = n;
return i;
}
}
#endif
#if !defined(_TOKUDB_BUFFER_H)
#define _TOKUDB_BUFFER_H
#include "tokudb_base128.h"
namespace tokudb {
// A Buffer manages a contiguous chunk of memory and supports appending new data to the end of the buffer, and
......@@ -31,6 +33,12 @@ class buffer {
memcpy(append_ptr(s), p, s);
}
void append_uint32(uint32_t n) {
maybe_realloc(5);
size_t s = tokudb::base128_encode_uint32(n, (char *) m_data + m_size, 5);
m_size += s;
}
// Return a pointer to the next location in the buffer where bytes are consumed from.
void *consume_ptr(size_t s) {
if (m_size + s > m_limit)
......@@ -45,6 +53,25 @@ class buffer {
memcpy(p, consume_ptr(s), s);
}
uint32_t consume_uint32() {
uint32_t n;
size_t s = tokudb::base128_decode_uint32(&n, (char *) m_data + m_size, m_limit - m_size);
m_size += s;
return n;
}
// Write p_length bytes at an offset in the buffer
void write(void *p, size_t p_length, size_t offset) {
assert(offset + p_length <= m_size);
memcpy((char *)m_data + offset, p, p_length);
}
// Read p_length bytes at an offset in the buffer
void read(void *p, size_t p_length, size_t offset) {
assert(offset + p_length <= m_size);
memcpy(p, (char *)m_data + offset, p_length);
}
// Replace a field in the buffer with new data. If the new data size is different, then readjust the
// size of the buffer and move things around.
void replace(size_t offset, size_t old_s, void *new_p, size_t new_s) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment