// ha_tokudb_update.cc
#if TOKU_INCLUDE_UPSERT

// Point updates and upserts

// Restrictions:
//   No triggers
//   No binary logging
//   Primary key must be defined
//   Simple and compound primary key
//   Int, char and varchar primary key types
//   No updates on fields that are part of any key
//   No clustering keys
//   Integer and char field updates
//   Update expressions:
//       x = constant
//       x = x+constant
//       x = x-constant
//       x = if(x=0,0,x-1)
//   Session variable enables fast updates and fast upserts
//   Session variable disables slow updates and slow upserts

// Future features:
//   Support more primary key types
//   Allow statement based binary logging
//   Force statement logging for fast updates
//   Support clustering keys using broadcast updates
//   Support primary key ranges using multicast messages
//   Support more complicated update expressions
//   Replace field_offset

// Debug function to dump an Item.
// Writes a compact, recursive description of the item to stderr: the numeric
// item type first, followed by type-specific detail for function, int, string,
// field and condition items.  Other item types print only their type number.
// String pointers are checked before being handed to "%s" because passing a
// NULL pointer to "%s" is undefined; "(null)" is printed in that case.
static void dump_item(Item *item) {
    fprintf(stderr, "%u", static_cast<unsigned>(item->type()));
    switch (item->type()) {
    case Item::FUNC_ITEM: {
        Item_func *func = static_cast<Item_func*>(item);
        uint n = func->argument_count();
        Item **arguments = func->arguments();
        fprintf(stderr, ":func=%u,%s,%u(", static_cast<unsigned>(func->functype()), func->func_name(), n);
        for (uint i = 0; i < n ; i++) {
            dump_item(arguments[i]);
            if (i + 1 < n) // separator between arguments, none after the last one
                fprintf(stderr,",");
        }
        fprintf(stderr, ")");
        break;
    }
    case Item::INT_ITEM: {
        Item_int *int_item = static_cast<Item_int*>(item);
        fprintf(stderr, ":int=%lld", static_cast<long long>(int_item->val_int()));
        break;
    }
    case Item::STRING_ITEM: {
        Item_string *str_item = static_cast<Item_string*>(item);
        String *str = str_item->val_str(NULL);
        fprintf(stderr, ":str=%s", str ? str->c_ptr() : "(null)");
        break;
    }
    case Item::FIELD_ITEM: {
        Item_field *field_item = static_cast<Item_field*>(item);
        // the name pointers are not guaranteed to be set here, so guard each one
        const char *db_name = field_item->db_name ? field_item->db_name : "(null)";
        const char *table_name = field_item->table_name ? field_item->table_name : "(null)";
        const char *field_name = field_item->field_name ? field_item->field_name : "(null)";
        fprintf(stderr, ":field=%s.%s.%s", db_name, table_name, field_name);
        break;
    }
    case Item::COND_ITEM: {
        Item_cond *cond_item = static_cast<Item_cond*>(item);
        fprintf(stderr, ":cond=%s(\n", cond_item->func_name());
        List_iterator<Item> li(*cond_item->argument_list());
        Item *list_item;
        while ((list_item = li++)) {
            dump_item(list_item);
            fprintf(stderr, "\n");
        }
        fprintf(stderr, ")\n");
        break;
    }
    default:
        break;
    }
}

// Debug function to dump an Item list
static void dump_item_list(const char *h, List<Item> &l) {
    fprintf(stderr, "%s elements=%u\n", h, l.elements);
    List_iterator<Item> li(l);
    Item *item;
    while ((item = li++) != NULL) {
        dump_item(item);
        fprintf(stderr, "\n");  
    }
}

// Find the Field that an Item refers to.
// Returns NULL when the item is not a field reference (Item::FIELD_ITEM).
// Otherwise the Field pointer stored in the Item_field is returned directly;
// this is used as a shortcut instead of comparing the item's db, table and
// field names against the fields of the table, so the table parameter is
// currently not consulted.
static Field *find_field_by_name(TABLE *table, Item *item) {
    (void) table; // kept for interface compatibility with existing callers
    if (item->type() != Item::FIELD_ITEM)
        return NULL;
    Item_field *field_item = static_cast<Item_field*>(item);
    return field_item->field;
}

// Compute the byte offset, inside the value of the index selected by idx, at
// which the field numbered expand_field_num starts.
// The offset begins after the null bytes and grows by the length of every
// lower numbered field that is not filtered out as a key field of that index.
// This only works for fixed length fields.
static uint32_t update_field_offset(uint32_t null_bytes, KEY_AND_COL_INFO *kc_info, int idx, int expand_field_num) {
    uint32_t offset = null_bytes;
    for (int field = 0; field < expand_field_num; field++) {
        // key fields are not stored in the value, so they contribute nothing
        if (!bitmap_is_set(&kc_info->key_filters[idx], field))
            offset += kc_info->field_lengths[field];
    }
    return offset;
}

// Determine if an update operation can be offloaded to the storage engine.
// The update operation consists of a list of update expressions (fields[i] = values[i]), and a list
130 131 132
// of where conditions (conds).  The function returns 0 if the update is handled in the storage engine.
// Otherwise, an error is returned.
int ha_tokudb::fast_update(THD *thd, List<Item> &update_fields, List<Item> &update_values, Item *conds) {
133 134
    TOKUDB_DBUG_ENTER("ha_tokudb::fast_update");

135 136
    int error = 0;

137
    if (tokudb_debug & TOKUDB_DEBUG_UPSERT) {
138 139
        dump_item_list("fields", update_fields);
        dump_item_list("values", update_values);
140 141 142 143 144
        if (conds) {
            fprintf(stderr, "conds\n"); dump_item(conds); fprintf(stderr, "\n");
        }
    }

145 146 147 148
    if (update_fields.elements < 1 || update_fields.elements != update_values.elements) {
        error = ENOTSUP;  // something is fishy with the parameters
        goto return_error;
    }
149

150
    if (!check_fast_update(thd, update_fields, update_values, conds)) {
151
        error = ENOTSUP;
152 153
        goto check_error;
    }
154

155
    error = send_update_message(update_fields, update_values, conds, transaction);
156

157
check_error:
158
    if (error != 0) {
159 160
        if (get_disable_slow_update(thd))
            error = HA_ERR_UNSUPPORTED;
161 162
        if (error != ENOTSUP)
            print_error(error, MYF(0));
163
    }
164

165 166
return_error:
    TOKUDB_DBUG_RETURN(error);
167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235
}

// Return true if the expression is an integer constant, or a function named
// "+" or "-" that takes exactly one argument which is an integer constant
// (a signed constant such as -5).  Everything else yields false.
static bool check_int_result(Item *item) {
    switch (item->type()) {
    case Item::INT_ITEM:
        return true;
    case Item::FUNC_ITEM: {
        Item_func *fn = static_cast<Item_func*>(item);
        const bool is_sign = strcmp(fn->func_name(), "+") == 0 || strcmp(fn->func_name(), "-") == 0;
        if (!is_sign)
            return false;
        if (fn->argument_count() != 1)
            return false;
        Item **args = fn->arguments();
        return args[0]->type() == Item::INT_ITEM;
    }
    default:
        return false;
    }
}

// Return true if an expression looks like field_name op constant.
static bool check_x_op_constant(const char *field_name, Item *item, const char *op, Item **item_constant) {
    if (item->type() != Item::FUNC_ITEM)
        return false;
    Item_func *item_func = static_cast<Item_func*>(item);
    if (strcmp(item_func->func_name(), op) != 0)
        return false;
    Item **arguments = item_func->arguments();
    uint n = item_func->argument_count();
    if (n != 2)
        return false;
    if (arguments[0]->type() != Item::FIELD_ITEM)
        return false;
    Item_field *arg0 = static_cast<Item_field*>(arguments[0]);
    if (strcmp(field_name, arg0->field_name) != 0)
        return false;
    if (!check_int_result(arguments[1]))
        return false;
    *item_constant = arguments[1];
    return true;
}

// Return true if an expression looks like field_name = constant
static bool check_x_equal_0(const char *field_name, Item *item) {
    Item *item_constant;
    if (!check_x_op_constant(field_name, item, "=", &item_constant))
        return false;
    if (item_constant->val_int() != 0)
        return false;
    return true;
}

// Return true if an expression looks like fieldname - 1
static bool check_x_minus_1(const char *field_name, Item *item) {
    Item *item_constant;
    if (!check_x_op_constant(field_name, item, "-", &item_constant))
        return false;
    if (item_constant->val_int() != 1)
        return false;
    return true;
}

// Return true if an expression looks like if(fieldname=0, 0, fieldname-1) and
// the field named by fieldname is an unsigned int.
// The three arguments of the "if" function are checked in order: the condition
// must be fieldname = 0, the second argument must be the integer constant 0,
// and the third argument must be fieldname - 1.  The field must carry
// UNSIGNED_FLAG.
static bool check_decr_floor_expression(Field *lhs_field, Item *item) {
    if (item->type() != Item::FUNC_ITEM)
        return false;
    Item_func *item_func = static_cast<Item_func*>(item);
    if (strcmp(item_func->func_name(), "if") != 0)
        return false;
    Item **arguments = item_func->arguments();
    uint n = item_func->argument_count();
    if (n != 3)
        return false;
    if (!check_x_equal_0(lhs_field->field_name, arguments[0]))
        return false;
    if (arguments[1]->type() != Item::INT_ITEM || arguments[1]->val_int() != 0)
        return false;
    if (!check_x_minus_1(lhs_field->field_name, arguments[2]))
        return false;
    if (!(lhs_field->flags & UNSIGNED_FLAG))
        return false;
    return true;
}

// Check if lhs = rhs expression is simple.  Return true if it is.
// The left hand side must be a field that is not part of any key.
// For integer fields (tiny, short, int24, long, longlong) the right hand side
// may be an integer constant, field+constant, field-constant, or the
// decrement-with-floor expression if(field=0,0,field-1).
// For MYSQL_TYPE_STRING fields the right hand side must be an integer or
// string constant.  All other field types are rejected.
static bool check_simple_update_expression(Item *lhs_item, Item *rhs_item, TABLE *table) {
    Field *target = find_field_by_name(table, lhs_item);
    if (target == NULL)
        return false;
    if (!target->part_of_key.is_clear_all())
        return false; // updates of key fields are not supported
    const enum_field_types target_type = target->type();
    const Item::Type value_kind = rhs_item->type();

    if (target_type == MYSQL_TYPE_STRING)
        return value_kind == Item::INT_ITEM || value_kind == Item::STRING_ITEM;

    const bool is_integer_field =
        target_type == MYSQL_TYPE_TINY ||
        target_type == MYSQL_TYPE_SHORT ||
        target_type == MYSQL_TYPE_INT24 ||
        target_type == MYSQL_TYPE_LONG ||
        target_type == MYSQL_TYPE_LONGLONG;
    if (!is_integer_field)
        return false;

    Item *ignored_constant; // only the match result matters here
    return check_int_result(rhs_item) ||
        check_x_op_constant(target->field_name, rhs_item, "+", &ignored_constant) ||
        check_x_op_constant(target->field_name, rhs_item, "-", &ignored_constant) ||
        check_decr_floor_expression(target, rhs_item);
}

// Check that all update expressions are simple.  Return true if they are.
// The fields and values lists are walked in parallel and each pair is checked
// with check_simple_update_expression.  A values list that is shorter than the
// fields list is a caller error: it asserts in debug builds and is rejected
// (false) in builds where assert is compiled out, rather than passing a NULL
// value item on to the expression checks.
static bool check_all_update_expressions(List<Item> &fields, List<Item> &values, TABLE *table) {
    List_iterator<Item> lhs_i(fields);
    List_iterator<Item> rhs_i(values);
    Item *lhs_item;
    while ((lhs_item = lhs_i++) != NULL) {
        Item *rhs_item = rhs_i++;
        if (rhs_item == NULL) {
            assert(0); // can not happen
            return false;
        }
        if (!check_simple_update_expression(lhs_item, rhs_item, table))
            return false;
    }
    return true;
}

305 306
static bool full_field_in_key(TABLE *table, Field *field) {
    assert(table->s->primary_key < table->s->keys);
307
    KEY *key = &table->s->key_info[table->s->primary_key];
308
    for (uint i = 0; i < get_key_parts(key); i++) {
309 310 311 312 313 314
        KEY_PART_INFO *key_part = &key->key_part[i];
        if (strcmp(field->field_name, key_part->field->field_name) == 0) {
            return key_part->length == field->field_length;
        }
    }
    return false;
315 316 317 318
}

// Check that an expression looks like fieldname = constant, fieldname is part of the
// primary key, and the named field is an int, char or varchar type.  Return true if it does.
319
static bool check_pk_field_equal_constant(Item *item, TABLE *table, MY_BITMAP *pk_fields) {
320 321 322 323 324 325 326 327 328 329 330 331
    if (item->type() != Item::FUNC_ITEM)
        return false;
    Item_func *func = static_cast<Item_func*>(item);
    if (strcmp(func->func_name(), "=") != 0)
        return false;   
    uint n = func->argument_count();
    if (n != 2)
        return false;
    Item **arguments = func->arguments();
    Field *field = find_field_by_name(table, arguments[0]);
    if (field == NULL)
        return false;
332
    if (!bitmap_test_and_clear(pk_fields, field->field_index))
333 334 335 336 337 338 339
        return false;
    switch (field->type()) {
    case MYSQL_TYPE_TINY:
    case MYSQL_TYPE_SHORT:
    case MYSQL_TYPE_INT24:
    case MYSQL_TYPE_LONG:
    case MYSQL_TYPE_LONGLONG:
340
        return arguments[1]->type() == Item::INT_ITEM || arguments[1]->type() == Item::STRING_ITEM;
341 342
    case MYSQL_TYPE_STRING:
    case MYSQL_TYPE_VARCHAR:
343 344
        return full_field_in_key(table, field) && 
            (arguments[1]->type() == Item::INT_ITEM || arguments[1]->type() == Item::STRING_ITEM);
345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366
    default:
        return false;
    }
}

// Check that the where condition covers all of the primary key components with fieldname = constant
// expressions.  Return true if it does.
// The condition must be either a single fieldname = constant expression or an "and" of such
// expressions.  Returns false when there is no where condition, when the table has no primary
// key, when the tracking bitmap can not be allocated, when any term is not an accepted
// primary key equality, or when some primary key field is left uncovered.
static bool check_point_update(Item *conds, TABLE *table) {
    bool result = false;

    if (conds == NULL)
        return false; // no where condition on the update

    if (table->s->primary_key >= table->s->keys)
        return false; // no primary key defined

    // use a bitmap of the primary key fields to keep track of those fields that are covered
    // by the where conditions
    MY_BITMAP pk_fields;
    if (bitmap_init(&pk_fields, NULL, table->s->fields, FALSE)) // 1 -> failure
        return false;
    KEY *key = &table->s->key_info[table->s->primary_key];
    for (uint i = 0; i < get_key_parts(key); i++)
        bitmap_set_bit(&pk_fields, key->key_part[i].field->field_index);

    switch (conds->type()) {
    case Item::FUNC_ITEM:
        result = check_pk_field_equal_constant(conds, table, &pk_fields);
        break;
    case Item::COND_ITEM: {
        Item_cond *cond_item = static_cast<Item_cond*>(conds);
        if (strcmp(cond_item->func_name(), "and") != 0)
            break;
        List_iterator<Item> li(*cond_item->argument_list());
        Item *list_item;
        result = true;
        // stop at the first term that is not an accepted primary key equality
        while (result == true && (list_item = li++)) {
            result = check_pk_field_equal_constant(list_item, table, &pk_fields);
        }
        break;
    }
    default:
        break;
    }

    // every primary key field must have been matched (and cleared) by some term
    if (!bitmap_is_clear_all(&pk_fields))
        result = false;
    bitmap_free(&pk_fields);
    return result;
}

// Return true if there are any clustering keys (except the primary).
// Every key of the table other than the primary key is inspected for the
// HA_CLUSTERING flag.
// TODO: precompute this when the table is opened.
static bool clustering_keys_exist(TABLE *table) {
    for (uint keynr = 0; keynr < table->s->keys; keynr++) {
        if (keynr != table->s->primary_key && (table->s->key_info[keynr].flags & HA_CLUSTERING))
            return true;
    }
    return false;
}

406 407 408 409 410 411 412 413
// Return true if the session runs in strict SQL mode.
// When MYSQL_VERSION_ID is in the range 50600..50699 the answer comes from
// THD::is_strict_mode(); otherwise the session's sql_mode is tested for the
// MODE_STRICT_TRANS_TABLES and MODE_STRICT_ALL_TABLES bits.
static bool is_strict_mode(THD *thd) {
#if 50600 <= MYSQL_VERSION_ID && MYSQL_VERSION_ID <= 50699
    const bool strict = thd->is_strict_mode();
#else
    const bool strict = test(thd->variables.sql_mode & (MODE_STRICT_TRANS_TABLES | MODE_STRICT_ALL_TABLES));
#endif
    return strict;
}

414
#if 50600 <= MYSQL_VERSION_ID && MYSQL_VERSION_ID <= 50699
415
#include <binlog.h>
416 417 418
#elif 50500 <= MYSQL_VERSION_ID && MYSQL_VERSION_ID <= 50599
#include <log.h>
#endif
419 420 421

// Check if an update operation can be handled by this storage engine.  Return true if it can.
// All of the following must hold:
//   fast updates are enabled for the session
//   the handler has a transaction
//   the session is not in strict mode
//   the table has no triggers
//   the binary log is not open
//   the table has no clustering keys other than the primary key
//   every update expression is simple (see check_all_update_expressions)
//   the where condition selects a single row by primary key (see check_point_update)
bool ha_tokudb::check_fast_update(THD *thd, List<Item> &fields, List<Item> &values, Item *conds) {
    // fast updates must be enabled with the session variable
    if (!get_enable_fast_update(thd))
        return false;

    if (!transaction)
        return false;

    // avoid strict mode arithmetic overflow issues
    if (is_strict_mode(thd))
        return false;

    // no triggers
    if (table->triggers)
        return false;

    // no binlog
    if (mysql_bin_log.is_open())
        return false;

    // no clustering keys (need to broadcast an increment into the clustering keys since we are selecting with the primary key)
    if (clustering_keys_exist(table))
        return false;

    if (!check_all_update_expressions(fields, values, table))
        return false;

    if (!check_point_update(conds, table))
        return false;

    return true;
}

// Marshall a simple row descriptor to a buffer.
// The descriptor is filled in from the table's null byte count and from the
// column packing info of the key selected by key_num, and is then appended to b.
static void marshall_simple_descriptor(tokudb::buffer &b, TABLE *table, KEY_AND_COL_INFO &kc_info, uint key_num) {
    Simple_row_descriptor sd;

    // fixed length fields start right after the null bytes; the variable length
    // section starts right after the fixed length fields
    sd.m_fixed_field_offset = table->s->null_bytes;
    sd.m_var_field_offset = sd.m_fixed_field_offset + kc_info.mcp_info[key_num].fixed_field_size;

    sd.m_var_offset_bytes = kc_info.mcp_info[key_num].len_of_offsets;
    // NOTE(review): m_var_offset_bytes was just set to len_of_offsets, so the division
    // below yields 1 whenever len_of_offsets is nonzero -- confirm this is intended.
    if (sd.m_var_offset_bytes == 0)
        sd.m_num_var_fields = 0;
    else
        sd.m_num_var_fields = kc_info.mcp_info[key_num].len_of_offsets / sd.m_var_offset_bytes;

    sd.append(b);
}

static inline uint32_t get_null_bit_position(uint32_t null_bit);

// Marshall one update operation (lhs_item = rhs_item) to a buffer.
// The expression is expected to have been accepted by check_simple_update_expression.
// The following values are appended to b, in this order:
//   update operation ('=', '+' or '-')
//   field type (UPDATE_TYPE_INT, UPDATE_TYPE_UINT, UPDATE_TYPE_CHAR or UPDATE_TYPE_BINARY)
//   field number
//   field null number (0 if the field can not be null, otherwise bit 31 is set and
//     the low bits are computed from the field number and its null bit position)
//   offset of the field in the primary key's value
//   length of the operand
//   the operand bytes
static void marshall_simple_update(tokudb::buffer &b, Item *lhs_item, Item *rhs_item, TABLE *table, TOKUDB_SHARE *share) {
    // figure out the update operation type (again)
    Field *lhs_field = find_field_by_name(table, lhs_item);
    assert(lhs_field); // we found it before, so this should work

    // compute the update info
    // everything that is marshalled below is initialized here so that nothing is
    // read uninitialized if the asserts are compiled out
    uint32_t field_type = 0;
    uint32_t field_num = lhs_field->field_index;
    uint32_t field_null_num = 0;
    if (lhs_field->real_maybe_null()) // unsigned shift: 1<<31 would shift into the sign bit of an int
        field_null_num = (1U<<31) + (field_num/8)*8 + get_null_bit_position(lhs_field->null_bit);
    uint32_t offset = update_field_offset(table->s->null_bytes, &share->kc_info, table->s->primary_key, lhs_field->field_index);
    void *v_ptr = NULL;
    uint32_t v_length = 0;
    uint32_t update_operation = 0;
    longlong v_ll = 0;
    String v_str;

    switch (lhs_field->type()) {
    case MYSQL_TYPE_TINY:
    case MYSQL_TYPE_SHORT:
    case MYSQL_TYPE_INT24:
    case MYSQL_TYPE_LONG:
    case MYSQL_TYPE_LONGLONG: {
        Field_num *lhs_num = static_cast<Field_num*>(lhs_field);
        field_type = lhs_num->unsigned_flag ? UPDATE_TYPE_UINT : UPDATE_TYPE_INT;
        switch (rhs_item->type()) {
        case Item::INT_ITEM: {
            update_operation = '=';
            v_ll = rhs_item->val_int();
            v_length = lhs_field->pack_length();
            v_ptr = &v_ll;
            break;
        }
        case Item::FUNC_ITEM: {
            Item_func *rhs_func = static_cast<Item_func*>(rhs_item);
            Item **arguments = rhs_func->arguments();
            if (strcmp(rhs_func->func_name(), "if") == 0) {
                update_operation = '-'; // we only support one if function for now, and it is a decrement with floor.
                v_ll = 1;
            } else if (rhs_func->argument_count() == 1) {
                // single argument function (signed constant): plain assignment of its value
                update_operation = '=';
                v_ll = rhs_func->val_int();
            } else {
                // field op constant: the operation is the first character of the function name
                update_operation = rhs_func->func_name()[0];
                v_ll = arguments[1]->val_int();
            }
            v_length = lhs_field->pack_length();
            v_ptr = &v_ll;
            break;
        }
        default:
            assert(0);
        }
        break;
    }

    case MYSQL_TYPE_STRING: {
        update_operation = '=';
        field_type = lhs_field->binary() ? UPDATE_TYPE_BINARY : UPDATE_TYPE_CHAR;
        v_str = *rhs_item->val_str(&v_str);
        v_length = v_str.length();
        if (v_length >= lhs_field->pack_length()) {
            v_length = lhs_field->pack_length();
            v_str.length(v_length); // truncate
        } else {
            v_length = lhs_field->pack_length();
            uchar pad_char = lhs_field->binary() ? 0 : lhs_field->charset()->pad_char;
            v_str.fill(lhs_field->pack_length(), pad_char); // pad
        }
        v_ptr = v_str.c_ptr();
        break;
    }
    default:
        assert(0);
    }

    // marshall the update fields into the buffer
    b.append(&update_operation, sizeof update_operation);
    b.append(&field_type, sizeof field_type);
    b.append(&field_num, sizeof field_num);
    b.append(&field_null_num, sizeof field_null_num);
    b.append(&offset, sizeof offset);
    b.append(&v_length, sizeof v_length);
    b.append(v_ptr, v_length);
}

// Save an item's value into the appropriate field.  Return 0 if successful.
// The item must be a two argument "=" function whose first argument is a field
// item (asserted).  The second argument is stored into that field while all
// columns of the table's write set are temporarily usable; the previous column
// map is restored afterwards.  The result of the store is returned.
static int save_in_field(Item *item, TABLE *table) {
    assert(item->type() == Item::FUNC_ITEM);
    Item_func *eq = static_cast<Item_func*>(item);
    assert(strcmp(eq->func_name(), "=") == 0);
    uint nargs = eq->argument_count();
    assert(nargs == 2);
    Item **args = eq->arguments();
    Item *lhs = args[0];
    Item *rhs = args[1];
    assert(lhs->type() == Item::FIELD_ITEM);
    Field *target = static_cast<Item_field*>(lhs)->field;

    my_bitmap_map *saved_map = dbug_tmp_use_all_columns(table, table->write_set);
    int rc = rhs->save_in_field(target, 0);
    dbug_tmp_restore_column_map(table->write_set, saved_map);
    return rc;
}

// Generate an update message for an update operation and send it into the primary tree.  Return 0 if successful.
575
int ha_tokudb::send_update_message(List<Item> &update_fields, List<Item> &update_values, Item *conds, DB_TXN *txn) {
576 577 578 579 580 581 582 583 584 585
    int error;

    // Save the primary key from the where conditions
    Item::Type t = conds->type();
    if (t == Item::FUNC_ITEM) {
        error = save_in_field(conds, table);
    } else if (t == Item::COND_ITEM) {
        Item_cond *cond_item = static_cast<Item_cond*>(conds);
        List_iterator<Item> li(*cond_item->argument_list());
        Item *list_item;
586 587
        error = 0;
        while (error == 0 && (list_item = li++)) {
588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609
            error = save_in_field(list_item, table);
        }
    } else
        assert(0);
    if (error)
        return error;

    // put the primary key into key_buff and wrap it with key_dbt
    DBT key_dbt; 
    bool has_null;
    create_dbt_key_from_table(&key_dbt, primary_key, key_buff, table->record[0], &has_null);
    
    // construct the update message
    tokudb::buffer update_message;
        
    uchar operation = UPDATE_OP_SIMPLE_UPDATE;
    update_message.append(&operation, sizeof operation);
    
    // append the descriptor
    marshall_simple_descriptor(update_message, table, share->kc_info, primary_key);    

    // append the updates
610 611 612 613 614
    uint32_t num_updates = update_fields.elements;
    update_message.append(&num_updates, sizeof num_updates);

    List_iterator<Item> lhs_i(update_fields);
    List_iterator<Item> rhs_i(update_values);
615 616 617 618 619 620 621 622 623 624
    while (error == 0) {
        Item *lhs_item = lhs_i++;
        if (lhs_item == NULL)
            break;
        Item *rhs_item = rhs_i++;
        if (rhs_item == NULL)
            assert(0); // can not happen
        marshall_simple_update(update_message, lhs_item, rhs_item, table, share);
    }

625 626 627 628 629 630 631 632 633 634 635 636 637
    rw_rdlock(&share->num_DBs_lock);

    if (share->num_DBs > table->s->keys + test(hidden_primary_key)) {// hot index in progress
        error = ENOTSUP; // run on the slow path
    } else {
        // send the message    
        DBT update_dbt; memset(&update_dbt, 0, sizeof update_dbt);
        update_dbt.data = update_message.data();
        update_dbt.size = update_message.size();
        error = share->key_file[primary_key]->update(share->key_file[primary_key], txn, &key_dbt, &update_dbt, 0);
    }

    rw_unlock(&share->num_DBs_lock);
638 639 640 641 642 643
        
    return error;
}

// Determine if an upsert operation can be offloaded to the storage engine.
// An upsert consists of a row and a list of update expressions (update_fields[i] = update_values[i]).
644 645
// The function returns 0 if the upsert is handled in the storage engine.  Otherwise, an error code is returned.
int ha_tokudb::upsert(THD *thd, List<Item> &update_fields, List<Item> &update_values) {
646 647
    TOKUDB_DBUG_ENTER("ha_tokudb::upsert");

648 649
    int error = 0;

650
    if (tokudb_debug & TOKUDB_DEBUG_UPSERT) {
651 652 653 654 655
        fprintf(stderr, "upsert\n");
        dump_item_list("update_fields", update_fields);
        dump_item_list("update_values", update_values);
    }

656 657 658 659
    if (update_fields.elements < 1 || update_fields.elements != update_values.elements) {
        error = ENOTSUP;  // not an upsert or something is fishy with the parameters
        goto return_error;
    }
660

661
    if (!check_upsert(thd, update_fields, update_values)) {
662
        error = ENOTSUP;
663 664
        goto check_error;
    }
665
    
666
    error = send_upsert_message(thd, update_fields, update_values, transaction);
667

668
check_error:
669 670 671 672 673 674
    if (error != 0) {
        if (get_disable_slow_upsert(thd))
            error = HA_ERR_UNSUPPORTED;
        if (error != ENOTSUP)
            print_error(error, MYF(0));
    }
675 676 677

return_error:
    TOKUDB_DBUG_RETURN(error);
678 679 680 681
}

// Check if an upsert can be handled by this storage engine.  Return true if it can.
// All of the following must hold:
//   fast upserts are enabled for the session
//   the handler has a transaction
//   the session is not in strict mode
//   the table has no triggers
//   the binary log is not open
//   the table has a primary key and no other keys
//   every update expression is simple (see check_all_update_expressions)
bool ha_tokudb::check_upsert(THD *thd, List<Item> &update_fields, List<Item> &update_values) {
    // fast upserts disabled
    if (!get_enable_fast_upsert(thd))
        return false;

    if (!transaction)
        return false;

    // avoid strict mode arithmetic overflow issues
    if (is_strict_mode(thd))
        return false;

    // no triggers
    if (table->triggers)
        return false;

    // no binlog
    if (mysql_bin_log.is_open())
        return false;

    // primary key must exist
    if (table->s->primary_key >= table->s->keys)
        return false;

    // no secondary keys
    if (table->s->keys > 1)
        return false;

    if (!check_all_update_expressions(update_fields, update_values, table))
        return false;

    return true;
}

// Generate an upsert message and send it into the primary tree.  Return 0 if successful.
716
int ha_tokudb::send_upsert_message(THD *thd, List<Item> &update_fields, List<Item> &update_values, DB_TXN *txn) {
717 718 719 720 721
    int error = 0;

    // generate primary key
    DBT key_dbt;
    bool has_null;
722
    create_dbt_key_from_table(&key_dbt, primary_key, primary_key_buff, table->record[0], &has_null);
723 724 725

    // generate packed row
    DBT row;
726
    error = pack_row(&row, (const uchar *) table->record[0], primary_key);
727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744
    if (error)
        return error;

    tokudb::buffer update_message;

    // append the operation
    uchar operation = UPDATE_OP_SIMPLE_UPSERT;
    update_message.append(&operation, sizeof operation);

    // append the row
    uint32_t row_length = row.size;
    update_message.append(&row_length, sizeof row_length);
    update_message.append(row.data, row_length);

    // append the descriptor
    marshall_simple_descriptor(update_message, table, share->kc_info, primary_key);

    // append the update expressions
745 746 747
    uint32_t num_updates = update_fields.elements;
    update_message.append(&num_updates, sizeof num_updates);

748 749 750 751 752 753 754 755 756 757 758 759
    List_iterator<Item> lhs_i(update_fields);
    List_iterator<Item> rhs_i(update_values);
    while (1) {
        Item *lhs_item = lhs_i++;
        if (lhs_item == NULL)
            break;
        Item *rhs_item = rhs_i++;
        if (rhs_item == NULL)
            assert(0); // can not happen
        marshall_simple_update(update_message, lhs_item, rhs_item, table, share);
    }

760 761 762 763 764 765 766 767 768 769 770 771 772
    rw_rdlock(&share->num_DBs_lock);

    if (share->num_DBs > table->s->keys + test(hidden_primary_key)) { // hot index in progress
        error = ENOTSUP; // run on the slow path
    } else {
        // send the upsert message
        DBT update_dbt; memset(&update_dbt, 0, sizeof update_dbt);
        update_dbt.data = update_message.data();
        update_dbt.size = update_message.size();
        error = share->key_file[primary_key]->update(share->key_file[primary_key], txn, &key_dbt, &update_dbt, 0);
    }

    rw_unlock(&share->num_DBs_lock);
773 774 775 776 777

    return error;
}

#endif