sql_insert.cc 182 KB
Newer Older
1
/*
2
   Copyright (c) 2000, 2016, Oracle and/or its affiliates.
3
   Copyright (c) 2010, 2022, MariaDB Corporation
unknown's avatar
unknown committed
4

unknown's avatar
unknown committed
5 6
   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
unknown's avatar
unknown committed
7
   the Free Software Foundation; version 2 of the License.
unknown's avatar
unknown committed
8

unknown's avatar
unknown committed
9 10 11 12
   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.
unknown's avatar
unknown committed
13

unknown's avatar
unknown committed
14 15
   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
Vicențiu Ciorbaru's avatar
Vicențiu Ciorbaru committed
16
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335  USA
17
*/
unknown's avatar
unknown committed
18 19 20

/* Insert of records */

21 22 23 24 25 26 27 28 29 30 31 32 33
/*
  INSERT DELAYED

  Insert delayed is distinguished from a normal insert by lock_type ==
  TL_WRITE_DELAYED instead of TL_WRITE. It first tries to open a
  "delayed" table (delayed_get_table()), but falls back to
  open_and_lock_tables() on error and proceeds as normal insert then.

  Opening a "delayed" table means to find a delayed insert thread that
  has the table open already. If this fails, a new thread is created and
  waited for to open and lock the table.

  If accessing the thread succeeded, in
34
  Delayed_insert::get_local_table() the table of the thread is copied
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
  for local use. A copy is required because the normal insert logic
  works on a target table, but the other threads table object must not
  be used. The insert logic uses the record buffer to create a record.
  And the delayed insert thread uses the record buffer to pass the
  record to the table handler. So there must be different objects. Also
  the copied table is not included in the lock, so that the statement
  can proceed even if the real table cannot be accessed at this moment.

  Copying a table object is not a trivial operation. Besides the TABLE
  object there are the field pointer array, the field objects and the
  record buffer. After copying the field objects, their pointers into
  the record must be "moved" to point to the new record buffer.

  After this setup the normal insert logic is used. Only that for
  delayed inserts write_delayed() is called instead of write_record().
  It inserts the rows into a queue and signals the delayed insert thread
  instead of writing directly to the table.

  The delayed insert thread awakes from the signal. It locks the table,
  inserts the rows from the queue, unlocks the table, and waits for the
  next signal. It does normally live until a FLUSH TABLES or SHUTDOWN.

*/

59
#include "mariadb.h"                 /* NO_EMBEDDED_ACCESS_CHECKS */
60 61 62 63 64 65 66
#include "sql_priv.h"
#include "sql_insert.h"
#include "sql_update.h"                         // compare_record
#include "sql_base.h"                           // close_thread_tables
#include "sql_cache.h"                          // query_cache_*
#include "key.h"                                // key_copy
#include "lock.h"                               // mysql_unlock_tables
67
#include "sp_head.h"
68 69
#include "sql_view.h"         // check_key_in_view, insert_view_fields
#include "sql_table.h"        // mysql_create_table_no_lock
70
#include "sql_trigger.h"
71
#include "sql_select.h"
72
#include "sql_show.h"
73
#include "slave.h"
74
#include "sql_parse.h"                          // end_active_trans
75
#include "rpl_mi.h"
Konstantin Osipov's avatar
Konstantin Osipov committed
76
#include "transaction.h"
77
#include "sql_audit.h"
Sergei Golubchik's avatar
Sergei Golubchik committed
78
#include "sql_derived.h"                        // mysql_handle_derived
79
#include "sql_prepare.h"
80 81
#include "debug_sync.h"                         // DEBUG_SYNC
#include "debug.h"                              // debug_crash_here
82
#include <my_bit.h>
83
#include "rpl_rli.h"
unknown's avatar
unknown committed
84

Brave Galera Crew's avatar
Brave Galera Crew committed
85
#ifdef WITH_WSREP
86
#include "wsrep_mysqld.h" /* wsrep_append_table_keys() */
Brave Galera Crew's avatar
Brave Galera Crew committed
87 88 89
#include "wsrep_trans_observer.h" /* wsrep_start_transction() */
#endif /* WITH_WSREP */

unknown's avatar
unknown committed
90
#ifndef EMBEDDED_LIBRARY
91 92
static bool delayed_get_table(THD *thd, MDL_request *grl_protection_request,
                              TABLE_LIST *table_list);
unknown's avatar
unknown committed
93
static int write_delayed(THD *thd, TABLE *table, enum_duplicates duplic,
94
                         LEX_STRING query, bool ignore, bool log_on);
unknown's avatar
unknown committed
95
static void end_delayed_insert(THD *thd);
96
pthread_handler_t handle_delayed_insert(void *arg);
97
static void unlink_blobs(TABLE *table);
unknown's avatar
unknown committed
98
#endif
99 100
static bool check_view_insertability(THD *thd, TABLE_LIST *view,
                                     List<Item> &fields);
Marko Mäkelä's avatar
Marko Mäkelä committed
101 102
static int binlog_show_create_table_(THD *thd, TABLE *table,
                                     Table_specification_st *create_info);
unknown's avatar
unknown committed
103

104 105 106
/*
  Check that insert/update fields are from the same single table of a view.

107 108 109 110 111
  @param fields            The insert/update fields to be checked.
  @param values            The insert/update values to be checked, NULL if
  checking is not wanted.
  @param view              The view for insert.
  @param map     [in/out]  The insert table map.
112

113
  This function is called in 2 cases:
114 115 116 117 118 119 120 121 122
    1. to check insert fields. In this case *map will be set to 0.
       Insert fields are checked to be all from the same single underlying
       table of the given view. Otherwise the error is thrown. Found table
       map is returned in the map parameter.
    2. to check update fields of the ON DUPLICATE KEY UPDATE clause.
       In this case *map contains table_map found on the previous call of
       the function to check insert fields. Update fields are checked to be
       from the same table as the insert fields.

123
  @returns false if success.
124 125
*/

Sergei Golubchik's avatar
Sergei Golubchik committed
126 127 128
static bool check_view_single_update(List<Item> &fields, List<Item> *values,
                                     TABLE_LIST *view, table_map *map,
                                     bool insert)
129 130 131 132 133 134 135 136 137 138
{
  /* it is join view => we need to find the table for update */
  List_iterator_fast<Item> it(fields);
  Item *item;
  TABLE_LIST *tbl= 0;            // reset for call to check_single_table()
  table_map tables= 0;

  while ((item= it++))
    tables|= item->used_tables();

139 140 141 142 143 144 145 146
  /*
    Check that table is only one
    (we can not rely on check_single_table because it skips some
    types of tables)
  */
  if (my_count_bits(tables) > 1)
    goto error;

147 148 149 150
  if (values)
  {
    it.init(*values);
    while ((item= it++))
151
      tables|= item->view_used_tables(view);
152 153 154 155 156
  }

  /* Convert to real table bits */
  tables&= ~PSEUDO_TABLE_BITS;

157 158 159 160 161 162 163 164 165 166 167
  /* Check found map against provided map */
  if (*map)
  {
    if (tables != *map)
      goto error;
    return FALSE;
  }

  if (view->check_single_table(&tbl, tables, view) || tbl == 0)
    goto error;

168 169 170
  /* view->table should have been set in mysql_derived_merge_for_insert */
  DBUG_ASSERT(view->table);

171
  /*
172
    Use buffer for the insert values that was allocated for the merged view.
173
  */
Igor Babaev's avatar
Igor Babaev committed
174
  tbl->table->insert_values= view->table->insert_values;
175
  view->table= tbl->table;
176 177 178
  if (!tbl->single_table_updatable())
  {
    if (insert)
179
      my_error(ER_NON_INSERTABLE_TABLE, MYF(0), view->alias.str, "INSERT");
180
    else
181
      my_error(ER_NON_UPDATABLE_TABLE, MYF(0), view->alias.str, "UPDATE");
182 183
    return TRUE;
  }
184 185 186 187 188 189 190 191 192 193
  *map= tables;

  return FALSE;

error:
  my_error(ER_VIEW_MULTIUPDATE, MYF(0),
           view->view_db.str, view->view_name.str);
  return TRUE;
}

194

unknown's avatar
unknown committed
195
/*
196
  Check if insert fields are correct.
197

198 199 200 201 202 203 204 205 206 207
  @param thd            The current thread.
  @param table_list     The table we are inserting into (may be view)
  @param fields         The insert fields.
  @param values         The insert values.
  @param check_unique   If duplicate values should be rejected.
  @param fields_and_values_from_different_maps If 'values' are allowed to
  refer to other tables than those of 'fields'
  @param map            See check_view_single_update
  
  @returns 0 if success, -1 if error
unknown's avatar
unknown committed
208 209
*/

unknown's avatar
unknown committed
210 211
static int check_insert_fields(THD *thd, TABLE_LIST *table_list,
                               List<Item> &fields, List<Item> &values,
212 213 214
                               bool check_unique,
                               bool fields_and_values_from_different_maps,
                               table_map *map)
unknown's avatar
unknown committed
215
{
unknown's avatar
VIEW  
unknown committed
216
  TABLE *table= table_list->table;
217
  DBUG_ENTER("check_insert_fields");
218

219
  if (!table_list->single_table_updatable())
220
  {
221
    my_error(ER_NON_INSERTABLE_TABLE, MYF(0), table_list->alias.str, "INSERT");
222
    DBUG_RETURN(-1);
223 224
  }

unknown's avatar
unknown committed
225 226
  if (fields.elements == 0 && values.elements != 0)
  {
227 228 229 230
    if (!table)
    {
      my_error(ER_VIEW_NO_INSERT_FIELD_LIST, MYF(0),
               table_list->view_db.str, table_list->view_name.str);
231
      DBUG_RETURN(-1);
232
    }
233
    if (values.elements != table->s->visible_fields)
unknown's avatar
unknown committed
234
    {
235
      thd->get_stmt_da()->reset_current_row_for_warning(1);
unknown's avatar
unknown committed
236
      my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
237
      DBUG_RETURN(-1);
unknown's avatar
unknown committed
238
    }
unknown's avatar
unknown committed
239
#ifndef NO_EMBEDDED_ACCESS_CHECKS
240 241 242
    Field_iterator_table_ref field_it;
    field_it.set(table_list);
    if (check_grant_all_columns(thd, INSERT_ACL, &field_it))
243
      DBUG_RETURN(-1);
unknown's avatar
unknown committed
244
#endif
245 246 247 248
    /*
      No fields are provided so all fields must be provided in the values.
      Thus we set all bits in the write set.
    */
249
    bitmap_set_all(table->write_set);
unknown's avatar
unknown committed
250 251 252
  }
  else
  {						// Part field list
253
    SELECT_LEX *select_lex= thd->lex->first_select_lex();
unknown's avatar
unknown committed
254
    Name_resolution_context *context= &select_lex->context;
255
    Name_resolution_context_state ctx_state;
unknown's avatar
unknown committed
256
    int res;
unknown's avatar
unknown committed
257

unknown's avatar
unknown committed
258 259
    if (fields.elements != values.elements)
    {
260
      thd->get_stmt_da()->reset_current_row_for_warning(1);
unknown's avatar
unknown committed
261
      my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
262
      DBUG_RETURN(-1);
unknown's avatar
unknown committed
263 264
    }

265
    thd->dup_field= 0;
unknown's avatar
unknown committed
266 267 268
    select_lex->no_wrap_view_item= TRUE;

    /* Save the state of the current name resolution context. */
269
    ctx_state.save_state(context, table_list);
unknown's avatar
unknown committed
270 271 272 273 274

    /*
      Perform name resolution only in the first table - 'table_list',
      which is the table that is inserted into.
    */
275
    table_list->next_local= 0;
276
    context->resolve_in_table_list_only(table_list);
277 278 279 280
    /* 'Unfix' fields to allow correct marking by the setup_fields function. */
    if (table_list->is_view())
      unfix_fields(fields);

281 282
    res= setup_fields(thd, Ref_ptr_array(),
                      fields, MARK_COLUMNS_WRITE, 0, NULL, 0);
unknown's avatar
unknown committed
283 284

    /* Restore the current context. */
285
    ctx_state.restore_state(context, table_list);
286
    thd->lex->first_select_lex()->no_wrap_view_item= FALSE;
unknown's avatar
unknown committed
287

unknown's avatar
unknown committed
288
    if (res)
289
      DBUG_RETURN(-1);
unknown's avatar
unknown committed
290

291
    if (table_list->is_view() && table_list->is_merged_derived())
292
    {
293 294 295
      if (check_view_single_update(fields,
                                   fields_and_values_from_different_maps ?
                                   (List<Item>*) 0 : &values,
296
                                   table_list, map, true))
297
        DBUG_RETURN(-1);
298
      table= table_list->table;
299
    }
300

301
    if (check_unique && thd->dup_field)
unknown's avatar
unknown committed
302
    {
303 304
      my_error(ER_FIELD_SPECIFIED_TWICE, MYF(0),
               thd->dup_field->field_name.str);
305
      DBUG_RETURN(-1);
unknown's avatar
unknown committed
306 307
    }
  }
unknown's avatar
unknown committed
308
  // For the values we need select_priv
unknown's avatar
unknown committed
309
#ifndef NO_EMBEDDED_ACCESS_CHECKS
310
  table->grant.want_privilege= (SELECT_ACL & ~table->grant.privilege);
unknown's avatar
unknown committed
311
#endif
312 313 314

  if (check_key_in_view(thd, table_list) ||
      (table_list->view &&
315
       check_view_insertability(thd, table_list, fields)))
316
  {
317
    my_error(ER_NON_INSERTABLE_TABLE, MYF(0), table_list->alias.str, "INSERT");
318
    DBUG_RETURN(-1);
319 320
  }

321
  DBUG_RETURN(0);
unknown's avatar
unknown committed
322 323
}

324 325
static bool has_no_default_value(THD *thd, Field *field, TABLE_LIST *table_list)
{
326 327
  if ((field->flags & (NO_DEFAULT_VALUE_FLAG | VERS_ROW_START | VERS_ROW_END))
       == NO_DEFAULT_VALUE_FLAG && field->real_type() != MYSQL_TYPE_ENUM)
328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343
  {
    bool view= false;
    if (table_list)
    {
      table_list= table_list->top_table();
      view= table_list->view != NULL;
    }
    if (view)
    {
      push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, ER_NO_DEFAULT_FOR_VIEW_FIELD,
                          ER_THD(thd, ER_NO_DEFAULT_FOR_VIEW_FIELD),
                          table_list->view_db.str, table_list->view_name.str);
    }
    else
    {
      push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, ER_NO_DEFAULT_FOR_FIELD,
344 345
                          ER_THD(thd, ER_NO_DEFAULT_FOR_FIELD),
                          field->field_name.str);
346
    }
347
    return thd->really_abort_on_warning();
348 349 350 351
  }
  return false;
}

unknown's avatar
unknown committed
352

353 354
/**
  Check if update fields are correct.
355

356 357 358 359 360 361 362
  @param thd                  The current thread.
  @param insert_table_list    The table we are inserting into (may be view)
  @param update_fields        The update fields.
  @param update_values        The update values.
  @param fields_and_values_from_different_maps If 'update_values' are allowed to
  refer to other tables than those of 'update_fields'
  @param map                  See check_view_single_update
363

Sergei Golubchik's avatar
Sergei Golubchik committed
364 365 366
  @note
  If the update fields include an autoinc field, set the
  table->next_number_field_updated flag.
367

368
  @returns 0 if success, -1 if error
369 370
*/

unknown's avatar
unknown committed
371
static int check_update_fields(THD *thd, TABLE_LIST *insert_table_list,
372
                               List<Item> &update_fields,
373 374 375
                               List<Item> &update_values,
                               bool fields_and_values_from_different_maps,
                               table_map *map)
376
{
unknown's avatar
unknown committed
377
  TABLE *table= insert_table_list->table;
378
  my_bool UNINIT_VAR(autoinc_mark);
379
  enum_sql_command sql_command_save= thd->lex->sql_command;
380

381 382 383 384 385 386 387 388 389 390 391 392 393
  table->next_number_field_updated= FALSE;

  if (table->found_next_number_field)
  {
    /*
      Unmark the auto_increment field so that we can check if this is modified
      by update_fields
    */
    autoinc_mark= bitmap_test_and_clear(table->write_set,
                                        table->found_next_number_field->
                                        field_index);
  }

394 395
  thd->lex->sql_command= SQLCOM_UPDATE;

396
  /* Check the fields we are going to modify */
397
  if (setup_fields(thd, Ref_ptr_array(),
398
                   update_fields, MARK_COLUMNS_WRITE, 0, NULL, 0))
399 400
  {
    thd->lex->sql_command= sql_command_save;
401
    return -1;
402 403 404
  }

  thd->lex->sql_command= sql_command_save;
405

406 407
  if (insert_table_list->is_view() &&
      insert_table_list->is_merged_derived() &&
408 409 410
      check_view_single_update(update_fields,
                               fields_and_values_from_different_maps ?
                               (List<Item>*) 0 : &update_values,
411
                               insert_table_list, map, false))
412 413
    return -1;

414
  if (table->default_field)
415
    table->mark_default_fields_for_write(FALSE);
416 417 418 419 420 421 422 423 424 425 426 427

  if (table->found_next_number_field)
  {
    if (bitmap_is_set(table->write_set,
                      table->found_next_number_field->field_index))
      table->next_number_field_updated= TRUE;

    if (autoinc_mark)
      bitmap_set_bit(table->write_set,
                     table->found_next_number_field->field_index);
  }

428 429 430
  return 0;
}

431 432 433 434 435 436 437 438
/**
  Upgrade table-level lock of INSERT statement to TL_WRITE if
  a more concurrent lock is infeasible for some reason. This is
  necessary for engines without internal locking support (MyISAM).
  An engine with internal locking implementation might later
  downgrade the lock in handler::store_lock() method.
*/

unknown's avatar
unknown committed
439
static
440
void upgrade_lock_type(THD *thd, thr_lock_type *lock_type,
441
                       enum_duplicates duplic)
442 443
{
  if (duplic == DUP_UPDATE ||
444
      (duplic == DUP_REPLACE && *lock_type == TL_WRITE_CONCURRENT_INSERT))
445
  {
446
    *lock_type= TL_WRITE_DEFAULT;
447 448 449 450 451 452 453
    return;
  }

  if (*lock_type == TL_WRITE_DELAYED)
  {
    /*
      We do not use delayed threads if:
unknown's avatar
unknown committed
454 455 456 457 458
      - we're running in the safe mode or skip-new mode -- the
        feature is disabled in these modes
      - we're executing this statement on a replication slave --
        we need to ensure serial execution of queries on the
        slave
459 460
      - it is INSERT .. ON DUPLICATE KEY UPDATE - in this case the
        insert cannot be concurrent
unknown's avatar
unknown committed
461 462 463 464 465 466 467 468 469 470 471 472
      - this statement is directly or indirectly invoked from
        a stored function or trigger (under pre-locking) - to
        avoid deadlocks, since INSERT DELAYED involves a lock
        upgrade (TL_WRITE_DELAYED -> TL_WRITE) which we should not
        attempt while keeping other table level locks.
      - this statement itself may require pre-locking.
        We should upgrade the lock even though in most cases
        delayed functionality may work. Unfortunately, we can't
        easily identify whether the subject table is not used in
        the statement indirectly via a stored function or trigger:
        if it is used, that will lead to a deadlock between the
        client connection and the delayed thread.
473
      - client explicitly ask to retrieve unitary changes
474 475
    */
    if (specialflag & (SPECIAL_NO_NEW_FUNC | SPECIAL_SAFE_MODE) ||
unknown's avatar
unknown committed
476
        thd->variables.max_insert_delayed_threads == 0 ||
Konstantin Osipov's avatar
Konstantin Osipov committed
477
        thd->locked_tables_mode > LTM_LOCK_TABLES ||
478 479
        thd->lex->uses_stored_routines() /*||
        thd->lex->describe*/)
480 481 482 483
    {
      *lock_type= TL_WRITE;
      return;
    }
484 485 486 487 488 489 490 491

    /* client explicitly asked to retrieved each affected rows and insert ids */
    if (thd->need_report_unit_results())
    {
      *lock_type= TL_WRITE;
      return;
    }

492 493 494 495 496 497 498 499
    if (thd->slave_thread)
    {
      /* Try concurrent insert */
      *lock_type= (duplic == DUP_UPDATE || duplic == DUP_REPLACE) ?
                  TL_WRITE : TL_WRITE_CONCURRENT_INSERT;
      return;
    }

500
    bool log_on= (thd->variables.option_bits & OPTION_BIN_LOG);
501
    if (thd->wsrep_binlog_format(global_system_variables.binlog_format) == BINLOG_FORMAT_STMT &&
502
        log_on && mysql_bin_log.is_open())
503 504 505 506 507 508 509 510 511 512 513 514 515 516
    {
      /*
        Statement-based binary logging does not work in this case, because:
        a) two concurrent statements may have their rows intermixed in the
        queue, leading to autoincrement replication problems on slave (because
        the values generated used for one statement don't depend only on the
        value generated for the first row of this statement, so are not
        replicable)
        b) if first row of the statement has an error the full statement is
        not binlogged, while next rows of the statement may be inserted.
        c) if first row succeeds, statement is binlogged immediately with a
        zero error code (i.e. "no error"), if then second row fails, query
        will fail on slave too and slave will stop (wrongly believing that the
        master got no error).
517 518 519 520 521 522 523 524 525
        So we fallback to non-delayed INSERT.
        Note that to be fully correct, we should test the "binlog format which
        the delayed thread is going to use for this row". But in the common case
        where the global binlog format is not changed and the session binlog
        format may be changed, that is equal to the global binlog format.
        We test it without mutex for speed reasons (condition rarely true), and
        in the common case (global not changed) it is as good as without mutex;
        if global value is changed, anyway there is uncertainty as the delayed
        thread may be old and use the before-the-change value.
526 527 528 529 530 531 532
      */
      *lock_type= TL_WRITE;
    }
  }
}


unknown's avatar
unknown committed
533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558
/**
  Find or create a delayed insert thread for the first table in
  the table list, then open and lock the remaining tables.
  If a table can not be used with insert delayed, upgrade the lock
  and open and lock all tables using the standard mechanism.

  @param thd         thread context
  @param table_list  list of "descriptors" for tables referenced
                     directly in statement SQL text.
                     The first element in the list corresponds to
                     the destination table for inserts, remaining
                     tables, if any, are usually tables referenced
                     by sub-queries in the right part of the
                     INSERT.

  @return Status of the operation. In case of success 'table'
  member of every table_list element points to an instance of
  class TABLE.

  @sa open_and_lock_tables for more information about MySQL table
  level locking
*/

static
bool open_and_lock_for_insert_delayed(THD *thd, TABLE_LIST *table_list)
{
559
  MDL_request protection_request;
unknown's avatar
unknown committed
560 561 562
  DBUG_ENTER("open_and_lock_for_insert_delayed");

#ifndef EMBEDDED_LIBRARY
563 564 565 566 567 568 569
  /* INSERT DELAYED is not allowed in a read only transaction. */
  if (thd->tx_read_only)
  {
    my_error(ER_CANT_EXECUTE_IN_READ_ONLY_TRANSACTION, MYF(0));
    DBUG_RETURN(true);
  }

570 571
  /*
    In order for the deadlock detector to be able to find any deadlocks
572 573 574 575 576
    caused by the handler thread waiting for GRL or this table, we acquire
    protection against GRL (global IX metadata lock) and metadata lock on
    table to being inserted into inside the connection thread.
    If this goes ok, the tickets are cloned and added to the list of granted
    locks held by the handler thread.
577
  */
578
  if (thd->has_read_only_protection())
579 580
    DBUG_RETURN(TRUE);

581 582
  MDL_REQUEST_INIT(&protection_request, MDL_key::BACKUP, "", "",
                   MDL_BACKUP_DML, MDL_STATEMENT);
583 584 585 586 587

  if (thd->mdl_context.acquire_lock(&protection_request,
                                    thd->variables.lock_wait_timeout))
    DBUG_RETURN(TRUE);

588 589 590 591 592 593
  if (thd->mdl_context.acquire_lock(&table_list->mdl_request,
                                    thd->variables.lock_wait_timeout))
    /*
      If a lock can't be acquired, it makes no sense to try normal insert.
      Therefore we just abort the statement.
    */
unknown's avatar
unknown committed
594 595
    DBUG_RETURN(TRUE);

596
  bool error= FALSE;
597
  if (delayed_get_table(thd, &protection_request, table_list))
598 599
    error= TRUE;
  else if (table_list->table)
unknown's avatar
unknown committed
600 601 602 603 604
  {
    /*
      Open tables used for sub-selects or in stored functions, will also
      cache these functions.
    */
605
    if (table_list->next_global &&
606
        open_and_lock_tables(thd, table_list->next_global, TRUE,
607
                             MYSQL_OPEN_IGNORE_ENGINE_STATS))
unknown's avatar
unknown committed
608 609
    {
      end_delayed_insert(thd);
610 611 612 613 614 615 616 617 618 619
      error= TRUE;
    }
    else
    {
      /*
        First table was not processed by open_and_lock_tables(),
        we need to set updatability flag "by hand".
      */
      if (!table_list->derived && !table_list->view)
        table_list->updatable= 1;  // usual table
unknown's avatar
unknown committed
620 621
    }
  }
622 623

  /*
624 625 626 627 628 629
    We can't release protection against GRL and metadata lock on the table
    being inserted into here. These locks might be required, for example,
    because this INSERT DELAYED calls functions which may try to update
    this or another tables (updating the same table is of course illegal,
    but such an attempt can be discovered only later during statement
    execution).
630 631 632 633 634
  */

  /*
    Reset the ticket in case we end up having to use normal insert and
    therefore will reopen the table and reacquire the metadata lock.
635 636 637 638 639
  */
  table_list->mdl_request.ticket= NULL;

  if (error || table_list->table)
    DBUG_RETURN(error);
unknown's avatar
unknown committed
640 641 642 643 644 645 646 647 648 649 650
#endif
  /*
    * This is embedded library and we don't have auxiliary
    threads OR
    * a lock upgrade was requested inside delayed_get_table
      because
      - there are too many delayed insert threads OR
      - the table has triggers.
    Use a normal insert.
  */
  table_list->lock_type= TL_WRITE;
651
  DBUG_RETURN(open_and_lock_tables(thd, table_list, TRUE, 0));
unknown's avatar
unknown committed
652 653 654
}


655 656 657 658 659 660 661 662 663 664 665 666 667 668
/**
  Create a new query string for removing DELAYED keyword for
  multi INSERT DEALAYED statement.

  @param[in] thd                 Thread handler
  @param[in] buf                 Query string

  @return
             0           ok
             1           error
*/
static int
create_insert_stmt_from_insert_delayed(THD *thd, String *buf)
{
669
  /* Make a copy of thd->query() and then remove the "DELAYED" keyword */
Monty's avatar
Monty committed
670
  if (buf->append(thd->query(), thd->query_length()) ||
671 672
      buf->replace(thd->lex->keyword_delayed_begin_offset,
                   thd->lex->keyword_delayed_end_offset -
673
                   thd->lex->keyword_delayed_begin_offset, NULL, 0))
674 675 676 677 678
    return 1;
  return 0;
}


679 680
static void save_insert_query_plan(THD* thd, TABLE_LIST *table_list)
{
Monty's avatar
Monty committed
681
  Explain_insert* explain= new (thd->mem_root) Explain_insert(thd->mem_root);
682 683
  explain->table_name.append(table_list->table->alias);

684
  thd->lex->explain->add_insert_plan(explain);
685 686
  
  /* Save subquery children */
687
  for (SELECT_LEX_UNIT *unit= thd->lex->first_select_lex()->first_inner_unit();
688 689 690
       unit;
       unit= unit->next_unit())
  {
691
    if (unit->explainable())
692 693 694 695 696
      explain->add_child(unit->first_select()->select_number);
  }
}


697 698 699 700 701 702
Field **TABLE::field_to_fill()
{
  return triggers && triggers->nullable_fields() ? triggers->nullable_fields() : field;
}


703 704
/**
  INSERT statement implementation
705

706 707 708 709 710
  SYNOPSIS
  mysql_insert()
  result    NULL if the insert is not outputing results
            via 'RETURNING' clause.

711 712 713
  @note Like implementations of other DDL/DML in MySQL, this function
  relies on the caller to close the thread tables. This is done in the
  end of dispatch_command().
714
*/
715 716 717 718
bool mysql_insert(THD *thd, TABLE_LIST *table_list,
                  List<Item> &fields, List<List_item> &values_list,
                  List<Item> &update_fields, List<Item> &update_values,
                  enum_duplicates duplic, bool ignore, select_result *result)
unknown's avatar
unknown committed
719
{
720
  bool retval= true;
721
  int error, res;
722
  bool transactional_table, joins_freed= FALSE;
723
  bool changed;
724
  const bool was_insert_delayed= (table_list->lock_type ==  TL_WRITE_DELAYED);
725
  bool using_bulk_insert= 0;
unknown's avatar
unknown committed
726
  uint value_count;
727 728
  /* counter of iteration in bulk PS operation*/
  ulonglong iteration= 0;
729
  ulonglong last_affected_rows= 0;
unknown's avatar
unknown committed
730 731
  ulonglong id;
  COPY_INFO info;
732
  TABLE *table= 0;
unknown's avatar
unknown committed
733
  List_iterator_fast<List_item> its(values_list);
unknown's avatar
unknown committed
734
  List_item *values;
unknown's avatar
unknown committed
735
  Name_resolution_context *context;
736
  Name_resolution_context_state ctx_state;
737
  SELECT_LEX *returning= thd->lex->has_returning() ? thd->lex->returning() : 0;
738
  unsigned char *readbuff= NULL;
739

740
#ifndef EMBEDDED_LIBRARY
741
  char *query= thd->query();
unknown's avatar
unknown committed
742 743 744
  /*
    log_on is about delayed inserts only.
    By default, both logs are enabled (this won't cause problems if the server
745
    runs without --log-bin).
unknown's avatar
unknown committed
746
  */
747
  bool log_on= (thd->variables.option_bits & OPTION_BIN_LOG);
unknown's avatar
unknown committed
748
#endif
749
  thr_lock_type lock_type;
unknown's avatar
unknown committed
750
  Item *unused_conds= 0;
unknown's avatar
unknown committed
751 752
  DBUG_ENTER("mysql_insert");

753
  bzero((char*) &info,sizeof(info));
754
  create_explain_query(thd->lex, thd->mem_root);
755
  /*
756 757 758
    Upgrade lock type if the requested lock is incompatible with
    the current connection mode or table operation.
  */
759
  upgrade_lock_type(thd, &table_list->lock_type, duplic);
760 761 762 763

  /*
    We can't write-delayed into a table locked with LOCK TABLES:
    this will lead to a deadlock, since the delayed thread will
764
    never be able to get a lock on the table.
765
  */
Sergei Golubchik's avatar
Sergei Golubchik committed
766
  if (table_list->lock_type == TL_WRITE_DELAYED && thd->locked_tables_mode &&
767 768
      find_locked_table(thd->open_tables, table_list->db.str,
                        table_list->table_name.str))
769
  {
770
    my_error(ER_DELAYED_INSERT_TABLE_LOCKED, MYF(0),
771
             table_list->table_name.str);
772
    DBUG_RETURN(TRUE);
773
  }
unknown's avatar
unknown committed
774

unknown's avatar
unknown committed
775
  if (table_list->lock_type == TL_WRITE_DELAYED)
unknown's avatar
unknown committed
776
  {
unknown's avatar
unknown committed
777 778
    if (open_and_lock_for_insert_delayed(thd, table_list))
      DBUG_RETURN(TRUE);
unknown's avatar
unknown committed
779 780
  }
  else
unknown's avatar
unknown committed
781
  {
782
    if (open_and_lock_tables(thd, table_list, TRUE, 0))
unknown's avatar
unknown committed
783 784
      DBUG_RETURN(TRUE);
  }
785

786
  THD_STAGE_INFO(thd, stage_init_update);
787
  lock_type= table_list->lock_type;
788
  thd->lex->used_tables=0;
unknown's avatar
unknown committed
789
  values= its++;
790 791
  if (bulk_parameters_set(thd))
    DBUG_RETURN(TRUE);
unknown's avatar
unknown committed
792
  value_count= values->elements;
unknown's avatar
unknown committed
793

794
  if ((res= mysql_prepare_insert(thd, table_list, fields, values,
795
                                 update_fields, update_values, duplic, ignore,
796 797 798 799 800 801 802 803 804 805 806 807
                                 &unused_conds, FALSE)))
  {
    retval= thd->is_error();
    if (res < 0)
    {
      /*
        Insert should be ignored but we have to log the query in statement
        format in the binary log
      */
      if (thd->binlog_current_query_unfiltered())
        retval= 1;
    }
unknown's avatar
unknown committed
808
    goto abort;
809 810 811
  }
  /* mysql_prepare_insert sets table_list->table if it was not set */
  table= table_list->table;
812

813 814
  /* Prepares LEX::returing_list if it is not empty */
  if (returning)
815
  {
816
    result->prepare(returning->item_list, NULL);
817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833
    if (thd->is_bulk_op())
    {
      /*
        It is RETURNING which needs network buffer to write result set and
        it is array binfing which need network buffer to read parameters.
        So we allocate yet another network buffer.
        The old buffer will be freed at the end of operation.
      */
      DBUG_ASSERT(thd->protocol == &thd->protocol_binary);
      readbuff= thd->net.buff; // old buffer
      if (net_allocate_new_packet(&thd->net, thd, MYF(MY_THREAD_SPECIFIC)))
      {
        readbuff= NULL; // failure, net_allocate_new_packet keeps old buffer
        goto abort;
      }
    }
  }
834

835
  context= &thd->lex->first_select_lex()->context;
836 837 838 839 840 841 842 843 844
  /*
    These three asserts test the hypothesis that the resetting of the name
    resolution context below is not necessary at all since the list of local
    tables for INSERT always consists of one table.
  */
  DBUG_ASSERT(!table_list->next_local);
  DBUG_ASSERT(!context->table_list->next_local);
  DBUG_ASSERT(!context->first_name_resolution_table->next_name_resolution_table);

unknown's avatar
unknown committed
845
  /* Save the state of the current name resolution context. */
846
  ctx_state.save_state(context, table_list);
unknown's avatar
unknown committed
847 848 849 850 851

  /*
    Perform name resolution only in the first table - 'table_list',
    which is the table that is inserted into.
  */
852
  table_list->next_local= 0;
unknown's avatar
unknown committed
853
  context->resolve_in_table_list_only(table_list);
854
  switch_to_nullable_trigger_fields(*values, table);
unknown's avatar
unknown committed
855

856 857 858 859 860 861 862 863 864 865
  /*
    Check assignability for the leftmost () in VALUES:
      INSERT INTO t1 (a,b) VALUES (1,2), (3,4);
    This checks if the values (1,2) can be assigned to fields (a,b).
    The further values, e.g. (3,4) are not checked - they will be
    checked during the execution time (when processing actual rows).
    This is to preserve the "insert until the very first error"-style
    behaviour for non-transactional tables.
  */
  if (values->elements &&
866 867
      table_list->table->check_assignability_opt_fields(fields, *values,
                                                        ignore))
868 869
    goto abort;

870
  while ((values= its++))
unknown's avatar
unknown committed
871
  {
872
    thd->get_stmt_da()->inc_current_row_for_warning();
unknown's avatar
unknown committed
873 874
    if (values->elements != value_count)
    {
875 876
      my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0),
               thd->get_stmt_da()->current_row_for_warning());
unknown's avatar
unknown committed
877 878
      goto abort;
    }
879 880
    if (setup_fields(thd, Ref_ptr_array(),
                     *values, MARK_COLUMNS_READ, 0, NULL, 0))
unknown's avatar
unknown committed
881
      goto abort;
882
    switch_to_nullable_trigger_fields(*values, table);
unknown's avatar
unknown committed
883 884
  }
  its.rewind ();
885
  thd->get_stmt_da()->reset_current_row_for_warning(0);
unknown's avatar
unknown committed
886 887
 
  /* Restore the current context. */
888
  ctx_state.restore_state(context, table_list);
889 890 891 892 893 894 895 896
  
  if (thd->lex->unit.first_select()->optimize_unflattened_subqueries(false))
  {
    goto abort;
  }
  save_insert_query_plan(thd, table_list);
  if (thd->lex->describe)
  {
897 898
    bool extended= thd->lex->describe & DESCRIBE_EXTENDED;
    retval= thd->lex->explain->send_explain(thd, extended);
899
    goto abort;
900
  }
unknown's avatar
unknown committed
901

unknown's avatar
unknown committed
902
  /*
unknown's avatar
unknown committed
903
    Fill in the given fields and dump it to the table file
unknown's avatar
unknown committed
904
  */
905
  info.ignore= ignore;
unknown's avatar
unknown committed
906
  info.handle_duplicates=duplic;
907 908
  info.update_fields= &update_fields;
  info.update_values= &update_values;
unknown's avatar
unknown committed
909
  info.view= (table_list->view ? table_list : 0);
910
  info.table_list= table_list;
911

912 913 914
  /*
    Count warnings for all inserts.
    For single line insert, generate an error if try to set a NOT NULL field
unknown's avatar
unknown committed
915
    to NULL.
916
  */
Sergei Golubchik's avatar
Sergei Golubchik committed
917 918
  thd->count_cuted_fields= (values_list.elements == 1 && !ignore)
                           ? CHECK_FIELD_ERROR_FOR_NULL : CHECK_FIELD_WARN;
unknown's avatar
unknown committed
919 920 921
  thd->cuted_fields = 0L;
  table->next_number_field=table->found_next_number_field;

922
#ifdef HAVE_REPLICATION
923
  if (thd->rgi_slave &&
924 925
      (info.handle_duplicates == DUP_UPDATE) &&
      (table->next_number_field != NULL) &&
926
      rpl_master_has_bug(thd->rgi_slave->rli, 24432, TRUE, NULL, NULL))
927 928 929
    goto abort;
#endif

unknown's avatar
unknown committed
930
  error=0;
931 932 933
  if (duplic == DUP_REPLACE &&
      (!table->triggers || !table->triggers->has_delete_triggers()))
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
unknown's avatar
unknown committed
934 935
  if (duplic == DUP_UPDATE)
    table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
936 937 938 939 940 941 942

  thd->abort_on_warning= !ignore && thd->is_strict_mode();

  table->reset_default_fields();
  table->prepare_triggers_for_insert_stmt_or_event();
  table->mark_columns_needed_for_insert();

943 944 945 946
  /*
    let's *try* to start bulk inserts. It won't necessary
    start them as values_list.elements should be greater than
    some - handler dependent - threshold.
947 948 949 950
    We should not start bulk inserts if this statement uses
    functions or invokes triggers since they may access
    to the same table and therefore should not see its
    inconsistent state created by this optimization.
951
    So we call start_bulk_insert to perform necessary checks on
952 953 954
    values_list.elements, and - if nothing else - to initialize
    the code to make the call of end_bulk_insert() below safe.
  */
955
#ifndef EMBEDDED_LIBRARY
956
  if (lock_type != TL_WRITE_DELAYED)
957 958
#endif /* EMBEDDED_LIBRARY */
  {
959
    bool create_lookup_handler= false;
960
    if (duplic != DUP_ERROR || ignore)
961
    {
962
      create_lookup_handler= true;
963
      table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
964 965 966 967 968
      if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
      {
        if (table->file->ha_rnd_init_with_error(0))
          goto abort;
      }
969
    }
970
    if (table->file->prepare_for_modify(true, create_lookup_handler))
971
      goto abort;
Konstantin Osipov's avatar
Konstantin Osipov committed
972 973 974 975 976 977 978
    /**
      This is a simple check for the case when the table has a trigger
      that reads from it, or when the statement invokes a stored function
      that reads from the table being inserted to.
      Engines can't handle a bulk insert in parallel with a read form the
      same table in the same connection.
    */
Sergei Golubchik's avatar
Sergei Golubchik committed
979
    if (thd->locked_tables_mode <= LTM_LOCK_TABLES &&
980
        !table->s->long_unique_table && values_list.elements > 1)
981 982
    {
      using_bulk_insert= 1;
983
      table->file->ha_start_bulk_insert(values_list.elements);
984
    }
985 986
    else
      table->file->ha_reset_copy_info();
987
  }
988

989 990
  if (fields.elements || !value_count || table_list->view != 0)
  {
991 992 993 994 995 996
    if (table->triggers &&
        table->triggers->has_triggers(TRG_EVENT_INSERT, TRG_ACTION_BEFORE))
    {
      /* BEFORE INSERT triggers exist, the check will be done later, per row */
    }
    else if (check_that_all_fields_are_given_values(thd, table, table_list))
997 998 999 1000 1001
    {
      error= 1;
      goto values_loop_end;
    }
  }
1002

1003 1004 1005 1006
  if (table_list->prepare_where(thd, 0, TRUE) ||
      table_list->prepare_check_option(thd))
    error= 1;

1007 1008 1009
  switch_to_nullable_trigger_fields(fields, table);
  switch_to_nullable_trigger_fields(update_fields, table);
  switch_to_nullable_trigger_fields(update_values, table);
1010

1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023
  if (fields.elements || !value_count)
  {
    /*
      There are possibly some default values:
      INSERT INTO t1 (fields) VALUES ...
      INSERT INTO t1 VALUES ()
    */
    if (table->validate_default_values_of_unset_fields(thd))
    {
      error= 1;
      goto values_loop_end;
    }
  }
1024 1025 1026 1027 1028 1029 1030 1031
  /*
    If statement returns result set, we need to send the result set metadata
    to the client so that it knows that it has to expect an EOF or ERROR.
    At this point we have all the required information to send the result set
    metadata.
  */
  if (returning &&
      result->send_result_set_metadata(returning->item_list,
1032 1033
                                       Protocol::SEND_NUM_ROWS |
                                       Protocol::SEND_EOF))
1034
    goto values_loop_end;
1035 1036

  THD_STAGE_INFO(thd, stage_update);
1037

1038 1039 1040 1041 1042
  if  (duplic == DUP_UPDATE)
  {
    restore_record(table,s->default_values);	// Get empty record
    thd->reconsider_logging_format_for_iodup(table);
  }
1043 1044 1045 1046
  fix_rownum_pointers(thd, thd->lex->current_select, &info.accepted_rows);
  if (returning)
    fix_rownum_pointers(thd, thd->lex->returning(), &info.accepted_rows);

1047
  do
unknown's avatar
unknown committed
1048
  {
1049
    DBUG_PRINT("info", ("iteration %llu", iteration));
1050
    if (iteration && bulk_parameters_set(thd))
1051 1052 1053 1054
    {
      error= 1;
      goto values_loop_end;
    }
1055 1056

    while ((values= its++))
unknown's avatar
unknown committed
1057
    {
1058
      thd->get_stmt_da()->inc_current_row_for_warning();
1059
      if (fields.elements || !value_count)
unknown's avatar
unknown committed
1060
      {
1061 1062 1063 1064
        /*
          There are possibly some default values:
          INSERT INTO t1 (fields) VALUES ...
          INSERT INTO t1 VALUES ()
unknown's avatar
unknown committed
1065
        */
1066 1067
        restore_record(table,s->default_values);	// Get empty record
        table->reset_default_fields();
1068

1069 1070 1071
        if (unlikely(fill_record_n_invoke_before_triggers(thd, table, fields,
                                                          *values, 0,
                                                          TRG_EVENT_INSERT)))
1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085
        {
          if (values_list.elements != 1 && ! thd->is_error())
          {
            info.records++;
            continue;
          }
          /*
            TODO: set thd->abort_on_warning if values_list.elements == 1
	    and check that all items return warning in case of problem with
	    storing field.
          */
	  error=1;
	  break;
        }
unknown's avatar
unknown committed
1086
      }
1087
      else
unknown's avatar
unknown committed
1088 1089
      {
        /*
1090 1091
          No field list, all fields are set explicitly:
          INSERT INTO t1 VALUES (values)
unknown's avatar
unknown committed
1092
        */
1093
        if (thd->lex->used_tables || // Column used in values()
1094
            table->s->visible_fields != table->s->fields)
1095
	  restore_record(table,s->default_values);	// Get empty record
1096
        else
1097 1098 1099 1100 1101 1102 1103 1104
        {
          TABLE_SHARE *share= table->s;

          /*
            Fix delete marker. No need to restore rest of record since it will
            be overwritten by fill_record() anyway (and fill_record() does not
            use default values in this case).
          */
1105
          table->record[0][0]= share->default_values[0];
1106

1107 1108 1109 1110 1111 1112 1113
          /* Fix undefined null_bits. */
          if (share->null_bytes > 1 && share->last_null_bit_pos)
          {
            table->record[0][share->null_bytes - 1]=
              share->default_values[share->null_bytes - 1];
          }
        }
1114
        table->reset_default_fields();
1115 1116 1117 1118 1119 1120 1121 1122 1123

        /*
          Reset the sentinel thd->bulk_param in order not to consume the next
          values of a bound array in case one of statement executed by
          the trigger's body is INSERT statement.
        */
        void *save_bulk_param= thd->bulk_param;
        thd->bulk_param= nullptr;

1124 1125 1126 1127 1128
        if (unlikely(fill_record_n_invoke_before_triggers(thd, table,
                                                          table->
                                                          field_to_fill(),
                                                          *values, 0,
                                                          TRG_EVENT_INSERT)))
1129
        {
1130
          thd->bulk_param= save_bulk_param;
1131 1132 1133 1134 1135 1136 1137
          if (values_list.elements != 1 && ! thd->is_error())
	  {
	    info.records++;
	    continue;
	  }
	  error=1;
	  break;
1138
        }
1139
        thd->bulk_param= save_bulk_param;
unknown's avatar
unknown committed
1140
      }
1141

1142
      /*
1143 1144
        with triggers a field can get a value *conditionally*, so we have to
        repeat has_no_default_value() check for every row
1145 1146 1147
      */
      if (table->triggers &&
          table->triggers->has_triggers(TRG_EVENT_INSERT, TRG_ACTION_BEFORE))
1148
      {
1149
        for (Field **f=table->field ; *f ; f++)
1150
        {
1151 1152
          if (unlikely(!(*f)->has_explicit_value() &&
                       has_no_default_value(thd, *f, table_list)))
1153 1154 1155 1156
          {
            error= 1;
            goto values_loop_end;
          }
1157 1158
        }
      }
1159

1160
      if ((res= table_list->view_check_option(thd, ignore)) == VIEW_CHECK_SKIP)
1161 1162
        continue;
      else if (res == VIEW_CHECK_ERROR)
unknown's avatar
unknown committed
1163
      {
1164 1165
        error= 1;
        break;
unknown's avatar
unknown committed
1166
      }
1167

1168
#ifndef EMBEDDED_LIBRARY
1169 1170 1171 1172 1173 1174 1175 1176 1177
      if (lock_type == TL_WRITE_DELAYED)
      {
        LEX_STRING const st_query = { query, thd->query_length() };
        DEBUG_SYNC(thd, "before_write_delayed");
        error=write_delayed(thd, table, duplic, st_query, ignore, log_on);
        DEBUG_SYNC(thd, "after_write_delayed");
        query=0;
      }
      else
1178
#endif
1179
      error= write_record(thd, table, &info, result);
1180
      if (unlikely(error))
1181
        break;
1182
      info.accepted_rows++;
1183 1184 1185
    }
    its.rewind();
    iteration++;
1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196

    /*
      Save affected rows and insert id when collecting using results
    */
    ulonglong new_affected_rows= info.copied + info.deleted +
            ((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
            info.touched : info.updated);
    thd->collect_unit_results(
            table->file->insert_id_for_cur_row,
            new_affected_rows - last_affected_rows);
    last_affected_rows = new_affected_rows;
1197
  } while (bulk_parameters_iterations(thd));
1198

1199
values_loop_end:
1200
  free_underlaid_joins(thd, thd->lex->first_select_lex());
1201 1202
  joins_freed= TRUE;

1203
  /*
Sergei Golubchik's avatar
Sergei Golubchik committed
1204
    Now all rows are inserted. Time to update logs and sends response to
1205 1206
    user
  */
1207
#ifndef EMBEDDED_LIBRARY
1208
  if (unlikely(lock_type == TL_WRITE_DELAYED))
unknown's avatar
unknown committed
1209
  {
1210
    if (likely(!error))
1211 1212 1213 1214
    {
      info.copied=values_list.elements;
      end_delayed_insert(thd);
    }
unknown's avatar
unknown committed
1215 1216
  }
  else
1217
#endif
unknown's avatar
unknown committed
1218
  {
1219 1220 1221 1222 1223
    /*
      Do not do this release if this is a delayed insert, it would steal
      auto_inc values from the delayed_insert thread as they share TABLE.
    */
    table->file->ha_release_auto_increment();
1224 1225
    if (using_bulk_insert)
    {
1226 1227 1228 1229 1230 1231 1232
      /*
        if my_error() wasn't called yet on some specific row, end_bulk_insert()
        can still do it, but the error shouldn't be for any specific row number
      */
      if (!error)
        thd->get_stmt_da()->reset_current_row_for_warning(0);
      if (unlikely(table->file->ha_end_bulk_insert()) && !error)
1233 1234 1235 1236 1237 1238 1239
      {
        table->file->print_error(my_errno,MYF(0));
        error=1;
      }
    }
    /* Get better status from handler if handler supports it */
    if (table->file->copy_info.records)
1240
    {
1241 1242 1243 1244 1245
      DBUG_ASSERT(info.copied >= table->file->copy_info.copied);
      info.touched= table->file->copy_info.touched;
      info.copied=  table->file->copy_info.copied;
      info.deleted= table->file->copy_info.deleted;
      info.updated= table->file->copy_info.updated;
1246
    }
1247
    if (duplic != DUP_ERROR || ignore)
1248
    {
1249
      table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
1250 1251 1252
      if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
        table->file->ha_rnd_end();
    }
1253

1254
    transactional_table= table->file->has_transactions_and_rollback();
unknown's avatar
unknown committed
1255

1256
    if (likely(changed= (info.copied || info.deleted || info.updated)))
unknown's avatar
unknown committed
1257
    {
1258 1259 1260 1261 1262
      /*
        Invalidate the table in the query cache if something changed.
        For the transactional algorithm to work the invalidation must be
        before binlog writing and ha_autocommit_or_rollback
      */
1263
      query_cache_invalidate3(thd, table_list, 1);
1264
    }
1265

1266 1267 1268 1269
    if (thd->transaction->stmt.modified_non_trans_table)
      thd->transaction->all.modified_non_trans_table= TRUE;
    thd->transaction->all.m_unsafe_rollback_flags|=
      (thd->transaction->stmt.m_unsafe_rollback_flags & THD_TRANS::DID_WAIT);
1270

1271
    if (error <= 0 ||
1272
        thd->transaction->stmt.modified_non_trans_table ||
1273
        thd->log_current_statement() ||
1274
	was_insert_delayed)
1275
    {
1276
      if(WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open())
1277
      {
1278
        int errcode= 0;
1279
	if (error <= 0)
1280
        {
1281 1282 1283 1284 1285
	  /*
	    [Guilhem wrote] Temporary errors may have filled
	    thd->net.last_error/errno.  For example if there has
	    been a disk full error when writing the row, and it was
	    MyISAM, then thd->net.last_error/errno will be set to
Marc Alff's avatar
Marc Alff committed
1286
            "disk full"... and the mysql_file_pwrite() will wait until free
1287 1288 1289 1290 1291 1292
	    space appears, and so when it finishes then the
	    write_row() was entirely successful
	  */
	  /* todo: consider removing */
	  thd->clear_error();
	}
1293
        else
1294
          errcode= query_error_code(thd, thd->killed == NOT_KILLED);
1295

1296 1297
        StatementBinlog stmt_binlog(thd, table->versioned(VERS_TRX_ID) ||
                                         thd->binlog_need_stmt_format(transactional_table));
1298
       /* bug#22725:
1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309

	A query which per-row-loop can not be interrupted with
	KILLED, like INSERT, and that does not invoke stored
	routines can be binlogged with neglecting the KILLED error.
        
	If there was no error (error == zero) until after the end of
	inserting loop the KILLED flag that appeared later can be
	disregarded since previously possible invocation of stored
	routines did not result in any error due to the KILLED.  In
	such case the flag is ignored for constructing binlog event.
	*/
1310
	DBUG_ASSERT(thd->killed != KILL_BAD_DATA || error > 0);
Monty's avatar
Monty committed
1311
        if (was_insert_delayed && table_list->lock_type == TL_WRITE)
1312
        {
1313
          /* Binlog INSERT DELAYED as INSERT without DELAYED. */
1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324
          String log_query;
          if (create_insert_stmt_from_insert_delayed(thd, &log_query))
          {
            sql_print_error("Event Error: An error occurred while creating query string"
                            "for INSERT DELAYED stmt, before writing it into binary log.");

            error= 1;
          }
          else if (thd->binlog_query(THD::ROW_QUERY_TYPE,
                                     log_query.c_ptr(), log_query.length(),
                                     transactional_table, FALSE, FALSE,
1325
                                     errcode) > 0)
1326 1327 1328 1329 1330
            error= 1;
        }
        else if (thd->binlog_query(THD::ROW_QUERY_TYPE,
			           thd->query(), thd->query_length(),
			           transactional_table, FALSE, FALSE,
1331
                                   errcode) > 0)
1332
	  error= 1;
1333
      }
unknown's avatar
unknown committed
1334
    }
unknown's avatar
unknown committed
1335
    DBUG_ASSERT(transactional_table || !changed || 
1336
                thd->transaction->stmt.modified_non_trans_table);
unknown's avatar
unknown committed
1337
  }
Sergei Golubchik's avatar
Sergei Golubchik committed
1338
  THD_STAGE_INFO(thd, stage_end);
1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354
  /*
    We'll report to the client this id:
    - if the table contains an autoincrement column and we successfully
    inserted an autogenerated value, the autogenerated value.
    - if the table contains no autoincrement column and LAST_INSERT_ID(X) was
    called, X.
    - if the table contains an autoincrement column, and some rows were
    inserted, the id of the last "inserted" row (if IGNORE, that value may not
    have been really inserted but ignored).
  */
  id= (thd->first_successful_insert_id_in_cur_stmt > 0) ?
    thd->first_successful_insert_id_in_cur_stmt :
    (thd->arg_of_last_insert_id_function ?
     thd->first_successful_insert_id_in_prev_stmt :
     ((table->next_number_field && info.copied) ?
     table->next_number_field->val_int() : 0));
unknown's avatar
unknown committed
1355
  table->next_number_field=0;
1356
  thd->count_cuted_fields= CHECK_FIELD_IGNORE;
1357
  table->auto_increment_field_not_null= FALSE;
1358 1359 1360
  if (duplic == DUP_REPLACE &&
      (!table->triggers || !table->triggers->has_delete_triggers()))
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
1361

1362
  if (unlikely(error))
unknown's avatar
unknown committed
1363
    goto abort;
1364 1365
  if (thd->lex->analyze_stmt)
  {
1366
    retval= 0;
1367 1368
    goto abort;
  }
1369 1370 1371
  DBUG_PRINT("info", ("touched: %llu  copied: %llu  updated: %llu  deleted: %llu",
                      (ulonglong) info.touched, (ulonglong) info.copied,
                      (ulonglong) info.updated, (ulonglong) info.deleted));
Sergei Golubchik's avatar
Sergei Golubchik committed
1372

1373 1374
  if ((iteration * values_list.elements) == 1 &&
      (!(thd->variables.option_bits & OPTION_WARNINGS) || !thd->cuted_fields))
1375
  {
1376 1377 1378 1379
    /*
      Client expects an EOF/OK packet if result set metadata was sent. If
      LEX::has_returning and the statement returns result set
      we send EOF which is the indicator of the end of the row stream.
1380
      Otherwise we send an OK packet i.e when the statement returns only the
1381 1382 1383 1384
      status information
    */
   if (returning)
      result->send_eof();
1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396
   else if (!(thd->in_sub_stmt & SUB_STMT_TRIGGER))
     /*
       Set the status and the number of affected rows in Diagnostics_area
       only in case the INSERT statement is not processed as part of a trigger
       invoked by some other DML statement. Else we would result in incorrect
       number of affected rows for bulk DML operations, e.g. the UPDATE
       statement (called via PS protocol). It would happen since the data
       member Diagnostics_area::m_affected_rows modified twice per DML
       statement - first time at the end of handling the INSERT statement
       invoking by a trigger fired on handling the original DML statement,
       and the second time at the end of handling the original DML statement.
     */
1397
      my_ok(thd, info.copied + info.deleted +
1398
               ((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
Sergei Golubchik's avatar
Sergei Golubchik committed
1399
                info.touched : info.updated), id);
1400
  }
1401 1402
  else
  {
unknown's avatar
unknown committed
1403
    char buff[160];
1404 1405
    ha_rows updated=((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
                     info.touched : info.updated);
Sergei Golubchik's avatar
Sergei Golubchik committed
1406

1407
    if (ignore)
1408
      sprintf(buff, ER_THD(thd, ER_INSERT_INFO), (ulong) info.records,
1409 1410
              (lock_type == TL_WRITE_DELAYED) ? (ulong) 0 :
              (ulong) (info.records - info.copied),
1411
              (long) thd->get_stmt_da()->current_statement_warn_count());
unknown's avatar
unknown committed
1412
    else
1413
      sprintf(buff, ER_THD(thd, ER_INSERT_INFO), (ulong) info.records,
1414
              (ulong) (info.deleted + updated),
1415
              (long) thd->get_stmt_da()->current_statement_warn_count());
1416 1417
    if (returning)
      result->send_eof();
1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429
    else if (!(thd->in_sub_stmt & SUB_STMT_TRIGGER))
      /*
        Set the status and the number of affected rows in Diagnostics_area
        only in case the INSERT statement is not processed as part of a trigger
        invoked by some other DML statement. Else we would result in incorrect
        number of affected rows for bulk DML operations, e.g. the UPDATE
        statement (called via PS protocol). It would happen since the data
        member Diagnostics_area::m_affected_rows modified twice per DML
        statement - first time at the end of handling the INSERT statement
        invoking by a trigger fired on handling the original DML statement,
        and the second time at the end of handling the original DML statement.
      */
1430
      ::my_ok(thd, info.copied + info.deleted + updated, id, buff);
unknown's avatar
unknown committed
1431
  }
1432
  thd->abort_on_warning= 0;
1433
  if (!thd->lex->current_select->leaf_tables_saved)
1434 1435
  {
    thd->lex->current_select->save_leaf_tables(thd);
1436
    thd->lex->current_select->leaf_tables_saved= true;
1437
  }
Marko Mäkelä's avatar
Marko Mäkelä committed
1438 1439

  my_free(readbuff);
1440 1441 1442 1443
#ifndef EMBEDDED_LIBRARY
  if (lock_type == TL_WRITE_DELAYED && table->expr_arena)
    table->expr_arena->free_items();
#endif
unknown's avatar
unknown committed
1444
  DBUG_RETURN(FALSE);
unknown's avatar
unknown committed
1445 1446

abort:
1447
#ifndef EMBEDDED_LIBRARY
unknown's avatar
unknown committed
1448
  if (lock_type == TL_WRITE_DELAYED)
1449
  {
unknown's avatar
unknown committed
1450
    end_delayed_insert(thd);
1451 1452 1453 1454 1455 1456 1457 1458 1459
    /*
      In case of an error (e.g. data truncation), the data type specific data
      in fields (e.g. Field_blob::value) was not taken over
      by the delayed writer thread. All fields in table_list->table
      will be freed by free_root() soon. We need to free the specific
      data before free_root() to avoid a memory leak.
    */
    for (Field **ptr= table_list->table->field ; *ptr ; ptr++)
      (*ptr)->free();
1460 1461
    if (table_list->table->expr_arena)
      table_list->table->expr_arena->free_items();
1462
  }
1463
#endif
1464
  if (table != NULL)
1465
    table->file->ha_release_auto_increment();
1466

1467
  if (!joins_freed)
1468
    free_underlaid_joins(thd, thd->lex->first_select_lex());
unknown's avatar
unknown committed
1469
  thd->abort_on_warning= 0;
1470 1471
  if (readbuff)
    my_free(readbuff);
1472
  status_var_add(thd->status_var.rows_sent, thd->get_sent_row_count());
1473
  DBUG_RETURN(retval);
unknown's avatar
unknown committed
1474 1475 1476
}


unknown's avatar
VIEW  
unknown committed
1477 1478 1479 1480 1481
/*
  Additional check for insertability for VIEW

  SYNOPSIS
    check_view_insertability()
1482
    thd     - thread handler
unknown's avatar
VIEW  
unknown committed
1483
    view    - reference on VIEW
1484
    fields  - fields used in insert
unknown's avatar
VIEW  
unknown committed
1485

1486 1487 1488 1489 1490 1491
  IMPLEMENTATION
    A view is insertable if the folloings are true:
    - All columns in the view are columns from a table
    - All not used columns in table have a default values
    - All field in view are unique (not referring to the same column)

unknown's avatar
VIEW  
unknown committed
1492 1493
  RETURN
    FALSE - OK
1494 1495 1496
      view->contain_auto_increment is 1 if and only if the view contains an
      auto_increment field

unknown's avatar
VIEW  
unknown committed
1497 1498 1499
    TRUE  - can't be used for insert
*/

1500 1501
static bool check_view_insertability(THD *thd, TABLE_LIST *view,
                                     List<Item> &fields)
unknown's avatar
VIEW  
unknown committed
1502
{
1503
  uint num= view->view->first_select_lex()->item_list.elements;
unknown's avatar
VIEW  
unknown committed
1504
  TABLE *table= view->table;
1505 1506 1507
  Field_translator *trans_start= view->field_translation,
		   *trans_end= trans_start + num;
  Field_translator *trans;
unknown's avatar
unknown committed
1508
  uint used_fields_buff_size= bitmap_buffer_size(table->s->fields);
Monty's avatar
Monty committed
1509
  my_bitmap_map *used_fields_buff= (my_bitmap_map*)thd->alloc(used_fields_buff_size);
1510
  MY_BITMAP used_fields;
Sergei Golubchik's avatar
Sergei Golubchik committed
1511
  enum_column_usage saved_column_usage= thd->column_usage;
1512 1513
  List_iterator_fast<Item> it(fields);
  Item *ex;
1514 1515
  DBUG_ENTER("check_key_in_view");

1516 1517 1518
  if (!used_fields_buff)
    DBUG_RETURN(TRUE);  // EOM

unknown's avatar
VIEW  
unknown committed
1519 1520
  DBUG_ASSERT(view->table != 0 && view->field_translation != 0);

1521
  (void) my_bitmap_init(&used_fields, used_fields_buff, table->s->fields);
1522 1523
  bitmap_clear_all(&used_fields);

unknown's avatar
VIEW  
unknown committed
1524
  view->contain_auto_increment= 0;
1525 1526 1527 1528
  /* 
    we must not set query_id for fields as they're not 
    really used in this context
  */
1529
  thd->column_usage= COLUMNS_WRITE;
unknown's avatar
VIEW  
unknown committed
1530
  /* check simplicity and prepare unique test of view */
1531
  for (trans= trans_start; trans != trans_end; trans++)
unknown's avatar
VIEW  
unknown committed
1532
  {
1533
    if (trans->item->fix_fields_if_needed(thd, &trans->item))
1534
    {
Sergei Golubchik's avatar
Sergei Golubchik committed
1535
      thd->column_usage= saved_column_usage;
1536 1537
      DBUG_RETURN(TRUE);
    }
1538
    Item_field *field;
unknown's avatar
VIEW  
unknown committed
1539
    /* simple SELECT list entry (field without expression) */
1540
    if (!(field= trans->item->field_for_view_update()))
1541
    {
1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552
      // Do not check fields which we are not inserting into
      while((ex= it++))
      {
        // The field used in the INSERT
        if (ex->real_item()->field_for_view_update() ==
            trans->item->field_for_view_update())
          break;
      }
      it.rewind();
      if (!ex)
        continue;
Sergei Golubchik's avatar
Sergei Golubchik committed
1553
      thd->column_usage= saved_column_usage;
unknown's avatar
VIEW  
unknown committed
1554
      DBUG_RETURN(TRUE);
1555
    }
1556
    if (field->field->unireg_check == Field::NEXT_NUMBER)
unknown's avatar
VIEW  
unknown committed
1557 1558
      view->contain_auto_increment= 1;
    /* prepare unique test */
unknown's avatar
unknown committed
1559 1560 1561 1562 1563
    /*
      remove collation (or other transparent for update function) if we have
      it
    */
    trans->item= field;
unknown's avatar
VIEW  
unknown committed
1564
  }
Sergei Golubchik's avatar
Sergei Golubchik committed
1565
  thd->column_usage= saved_column_usage;
unknown's avatar
VIEW  
unknown committed
1566
  /* unique test */
1567
  while((ex= it++))
unknown's avatar
VIEW  
unknown committed
1568
  {
1569
    /* Thanks to test above, we know that all columns are of type Item_field */
1570 1571 1572
    DBUG_ASSERT(ex->real_item()->field_for_view_update()->type() ==
                Item::FIELD_ITEM);
    Item_field *field= (Item_field *)ex->real_item()->field_for_view_update();
1573 1574
    if (field->field->table == table &&
        bitmap_fast_test_and_set(&used_fields, field->field->field_index))
unknown's avatar
VIEW  
unknown committed
1575 1576 1577 1578 1579 1580 1581
      DBUG_RETURN(TRUE);
  }

  DBUG_RETURN(FALSE);
}


1582 1583 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608
/**
  TODO remove when MDEV-17395 will be closed

  Checks if REPLACE or ON DUPLICATE UPDATE was executed on table containing
  WITHOUT OVERLAPS key.

  @return
  0 if no error
  ER_NOT_SUPPORTED_YET if the above condidion was met
 */
int check_duplic_insert_without_overlaps(THD *thd, TABLE *table,
                                         enum_duplicates duplic)
{
  if (duplic == DUP_REPLACE || duplic == DUP_UPDATE)
  {
    for (uint k = 0; k < table->s->keys; k++)
    {
      if (table->key_info[k].without_overlaps)
      {
        my_error(ER_NOT_SUPPORTED_YET, MYF(0), "WITHOUT OVERLAPS");
        return ER_NOT_SUPPORTED_YET;
      }
    }
  }
  return 0;
}

unknown's avatar
unknown committed
1609
/*
1610
  Check if table can be updated
unknown's avatar
unknown committed
1611 1612

  SYNOPSIS
1613 1614
     mysql_prepare_insert_check_table()
     thd		Thread handle
unknown's avatar
unknown committed
1615
     table_list		Table list
1616 1617
     fields		List of fields to be updated
     where		Pointer to where clause
unknown's avatar
unknown committed
1618
     select_insert      Check is making for SELECT ... INSERT
1619 1620

   RETURN
unknown's avatar
unknown committed
1621 1622
     FALSE ok
     TRUE  ERROR
unknown's avatar
unknown committed
1623
*/
1624

unknown's avatar
unknown committed
1625
static bool mysql_prepare_insert_check_table(THD *thd, TABLE_LIST *table_list,
1626
                                             List<Item> &fields,
unknown's avatar
unknown committed
1627
                                             bool select_insert)
unknown's avatar
unknown committed
1628
{
unknown's avatar
VIEW  
unknown committed
1629
  bool insert_into_view= (table_list->view != 0);
1630
  DBUG_ENTER("mysql_prepare_insert_check_table");
unknown's avatar
VIEW  
unknown committed
1631

1632
  if (!table_list->single_table_updatable())
1633
  {
1634
    my_error(ER_NON_INSERTABLE_TABLE, MYF(0), table_list->alias.str, "INSERT");
1635 1636
    DBUG_RETURN(TRUE);
  }
1637 1638 1639 1640 1641 1642 1643
  /*
     first table in list is the one we'll INSERT into, requires INSERT_ACL.
     all others require SELECT_ACL only. the ACL requirement below is for
     new leaves only anyway (view-constituents), so check for SELECT rather
     than INSERT.
  */

1644 1645 1646 1647
  if (setup_tables_and_check_access(thd,
                                    &thd->lex->first_select_lex()->context,
                                    &thd->lex->first_select_lex()->
                                      top_join_list,
1648
                                    table_list,
1649
                                    thd->lex->first_select_lex()->leaf_tables,
1650 1651
                                    select_insert, INSERT_ACL, SELECT_ACL,
                                    TRUE))
unknown's avatar
unknown committed
1652
    DBUG_RETURN(TRUE);
unknown's avatar
VIEW  
unknown committed
1653 1654 1655 1656

  if (insert_into_view && !fields.elements)
  {
    thd->lex->empty_field_list_on_rset= 1;
1657
    if (!table_list->table || table_list->is_multitable())
1658 1659 1660
    {
      my_error(ER_VIEW_NO_INSERT_FIELD_LIST, MYF(0),
               table_list->view_db.str, table_list->view_name.str);
unknown's avatar
unknown committed
1661
      DBUG_RETURN(TRUE);
1662
    }
1663
    DBUG_RETURN(insert_view_fields(thd, &fields, table_list));
unknown's avatar
VIEW  
unknown committed
1664 1665
  }

unknown's avatar
unknown committed
1666
  DBUG_RETURN(FALSE);
1667 1668 1669
}


1670 1671 1672 1673 1674 1675 1676 1677 1678 1679 1680 1681 1682
/*
  Get extra info for tables we insert into

  @param table     table(TABLE object) we insert into,
                   might be NULL in case of view
  @param           table(TABLE_LIST object) or view we insert into
*/

static void prepare_for_positional_update(TABLE *table, TABLE_LIST *tables)
{
  if (table)
  {
    if(table->reginfo.lock_type != TL_WRITE_DELAYED)
1683
      table->prepare_for_position();
1684 1685 1686 1687 1688 1689 1690 1691 1692 1693 1694 1695 1696
    return;
  }

  DBUG_ASSERT(tables->view);
  List_iterator<TABLE_LIST> it(*tables->view_tables);
  TABLE_LIST *tbl;
  while ((tbl= it++))
    prepare_for_positional_update(tbl->table, tbl);

  return;
}


1697 1698 1699 1700 1701
/*
  Prepare items in INSERT statement

  SYNOPSIS
    mysql_prepare_insert()
Sergei Golubchik's avatar
Sergei Golubchik committed
1702 1703 1704 1705
    thd                 Thread handler
    table_list          Global/local table list
    where               Where clause (for insert ... select)
    select_insert       TRUE if INSERT ... SELECT statement
1706

unknown's avatar
unknown committed
1707 1708 1709 1710 1711
  TODO (in far future)
    In cases of:
    INSERT INTO t1 SELECT a, sum(a) as sum1 from t2 GROUP BY a
    ON DUPLICATE KEY ...
    we should be able to refer to sum1 in the ON DUPLICATE KEY part
unknown's avatar
unknown committed
1712

1713 1714 1715
  WARNING
    You MUST set table->insert_values to 0 after calling this function
    before releasing the table object.
Sergei Golubchik's avatar
Sergei Golubchik committed
1716

1717
  RETURN VALUE
1718 1719 1720
    0  OK
    >0 error
    <0 insert should be ignored
1721 1722
*/

1723 1724 1725
int mysql_prepare_insert(THD *thd, TABLE_LIST *table_list,
                         List<Item> &fields, List_item *values,
                         List<Item> &update_fields, List<Item> &update_values,
1726 1727
                         enum_duplicates duplic, bool ignore,
                         COND **where,
1728
                         bool select_insert)
1729
{
1730
  SELECT_LEX *select_lex= thd->lex->first_select_lex();
unknown's avatar
unknown committed
1731
  Name_resolution_context *context= &select_lex->context;
1732
  Name_resolution_context_state ctx_state;
1733
  bool insert_into_view= (table_list->view != 0);
unknown's avatar
unknown committed
1734
  bool res= 0;
1735
  table_map map= 0;
1736
  TABLE *table;
1737
  DBUG_ENTER("mysql_prepare_insert");
1738 1739
  DBUG_PRINT("enter", ("table_list: %p  view: %d",
		       table_list, (int) insert_into_view));
1740 1741
  /* INSERT should have a SELECT or VALUES clause */
  DBUG_ASSERT (!select_insert || !values);
unknown's avatar
unknown committed
1742

1743
  if (mysql_handle_derived(thd->lex, DT_INIT))
1744
    DBUG_RETURN(1);
1745
  if (table_list->handle_derived(thd->lex, DT_MERGE_FOR_INSERT))
1746
    DBUG_RETURN(1);
1747
  if (thd->lex->handle_list_of_derived(table_list, DT_PREPARE))
1748
    DBUG_RETURN(1);
1749

unknown's avatar
unknown committed
1750
  if (duplic == DUP_UPDATE)
unknown's avatar
unknown committed
1751 1752
  {
    /* it should be allocated before Item::fix_fields() */
unknown's avatar
unknown committed
1753
    if (table_list->set_insert_values(thd->mem_root))
1754
      DBUG_RETURN(1);
unknown's avatar
unknown committed
1755
  }
unknown's avatar
unknown committed
1756

1757 1758 1759 1760 1761
  table= table_list->table;

  if (table->file->check_if_updates_are_ignored("INSERT"))
    DBUG_RETURN(-1);

1762
  if (mysql_prepare_insert_check_table(thd, table_list, fields, select_insert))
1763
    DBUG_RETURN(1);
unknown's avatar
VIEW  
unknown committed
1764

unknown's avatar
unknown committed
1765
  /* Prepare the fields in the statement. */
1766
  if (values)
unknown's avatar
unknown committed
1767
  {
1768 1769 1770 1771 1772 1773
    /* if we have INSERT ... VALUES () we cannot have a GROUP BY clause */
    DBUG_ASSERT (!select_lex->group_list.elements);

    /* Save the state of the current name resolution context. */
    ctx_state.save_state(context, table_list);

1774
    /*
1775 1776 1777 1778 1779 1780
      Perform name resolution only in the first table - 'table_list',
      which is the table that is inserted into.
     */
    table_list->next_local= 0;
    context->resolve_in_table_list_only(table_list);

1781 1782 1783
    res= setup_returning_fields(thd, table_list) ||
         setup_fields(thd, Ref_ptr_array(),
                      *values, MARK_COLUMNS_READ, 0, NULL, 0) ||
1784
          check_insert_fields(thd, context->table_list, fields, *values,
1785
                              !insert_into_view, 0, &map);
unknown's avatar
unknown committed
1786

Sergei Golubchik's avatar
Sergei Golubchik committed
1787
    if (!res)
1788
      res= setup_fields(thd, Ref_ptr_array(),
1789
                        update_values, MARK_COLUMNS_READ, 0, NULL, 0);
1790

unknown's avatar
unknown committed
1791
    if (!res && duplic == DUP_UPDATE)
unknown's avatar
unknown committed
1792
    {
1793
      select_lex->no_wrap_view_item= TRUE;
1794
      res= check_update_fields(thd, context->table_list, update_fields,
1795 1796 1797 1798 1799 1800 1801
                               update_values, false, &map) ||
           /*
             Check that all col=expr pairs are compatible for assignment in
             INSERT INTO t1 VALUES (...)
               ON DUPLICATE KEY UPDATE col=expr [, col=expr];
           */
           TABLE::check_assignability_explicit_fields(update_fields,
1802 1803
                                                      update_values,
                                                      ignore);
1804

1805
      select_lex->no_wrap_view_item= FALSE;
unknown's avatar
unknown committed
1806
    }
1807 1808 1809

    /* Restore the current context. */
    ctx_state.restore_state(context, table_list);
unknown's avatar
unknown committed
1810
  }
unknown's avatar
unknown committed
1811

1812 1813
  thd->get_stmt_da()->reset_current_row_for_warning(1);

unknown's avatar
unknown committed
1814 1815
  if (res)
    DBUG_RETURN(res);
unknown's avatar
VIEW  
unknown committed
1816

1817 1818 1819
  if (check_duplic_insert_without_overlaps(thd, table, duplic) != 0)
    DBUG_RETURN(true);

1820
  if (table->versioned(VERS_TIMESTAMP))
1821 1822
  {
    // Additional memory may be required to create historical items.
1823
    if (duplic == DUP_REPLACE && table_list->set_insert_values(thd->mem_root))
1824
      DBUG_RETURN(1);
1825

1826 1827 1828 1829
    Field *row_start= table->vers_start_field();
    Field *row_end= table->vers_end_field();
    if (!fields.elements && !(row_start->invisible && row_end->invisible))
      thd->vers_insert_history(row_start); // check privileges
1830 1831
  }

1832
  if (!select_insert)
unknown's avatar
unknown committed
1833
  {
1834
    Item *fake_conds= 0;
1835
    TABLE_LIST *duplicate;
1836 1837
    if ((duplicate= unique_table(thd, table_list, table_list->next_global,
                                 CHECK_DUP_ALLOW_DIFFERENT_ALIAS)))
1838
    {
1839
      update_non_unique_table_error(table_list, "INSERT", duplicate);
1840
      DBUG_RETURN(1);
1841
    }
1842
    select_lex->fix_prepare_information(thd, &fake_conds, &fake_conds);
unknown's avatar
unknown committed
1843
  }
1844
  /*
1845
    Only call prepare_for_posistion() if we are not performing a DELAYED
1846 1847
    operation. It will instead be executed by delayed insert thread.
  */
1848 1849
  if (duplic == DUP_UPDATE || duplic == DUP_REPLACE)
    prepare_for_positional_update(table, table_list);
1850
  DBUG_RETURN(0);
unknown's avatar
unknown committed
1851 1852 1853
}


unknown's avatar
unknown committed
1854 1855
	/* Check if there is more uniq keys after field */

1856
static int last_uniq_key(TABLE *table, const KEY *key, uint keynr)
unknown's avatar
unknown committed
1857
{
1858 1859 1860 1861 1862 1863 1864 1865 1866 1867 1868 1869 1870 1871 1872 1873 1874
  /*
    When an underlying storage engine informs that the unique key
    conflicts are not reported in the ascending order by setting
    the HA_DUPLICATE_KEY_NOT_IN_ORDER flag, we cannot rely on this
    information to determine the last key conflict.
   
    The information about the last key conflict will be used to
    do a replace of the new row on the conflicting row, rather
    than doing a delete (of old row) + insert (of new row).
   
    Hence check for this flag and disable replacing the last row
    by returning 0 always. Returning 0 will result in doing
    a delete + insert always.
  */
  if (table->file->ha_table_flags() & HA_DUPLICATE_KEY_NOT_IN_ORDER)
    return 0;

1875
  while (++keynr < table->s->keys)
1876
    if (key[keynr].flags & HA_NOSAME)
unknown's avatar
unknown committed
1877 1878 1879 1880 1881
      return 0;
  return 1;
}


1882 1883 1884 1885 1886 1887 1888
/*
 Inserts one historical row to a table.

 Copies content of the row from table->record[1] to table->record[0],
 sets Sys_end to now() and calls ha_write_row() .
*/

Aleksey Midenkov's avatar
Aleksey Midenkov committed
1889
int vers_insert_history_row(TABLE *table)
1890
{
1891
  DBUG_ASSERT(table->versioned(VERS_TIMESTAMP));
1892
  DBUG_ASSERT(table->vers_write);
1893 1894 1895
  restore_record(table,record[1]);

  // Set Sys_end to now()
1896
  table->vers_update_end();
1897

1898 1899 1900 1901 1902
  Field *row_start= table->vers_start_field();
  Field *row_end= table->vers_end_field();
  if (row_start->cmp(row_start->ptr, row_end->ptr) >= 0)
    return 0;

1903 1904 1905 1906
  if (table->vfield &&
      table->update_virtual_fields(table->file, VCOL_UPDATE_FOR_READ))
    return HA_ERR_GENERIC;

Aleksey Midenkov's avatar
Aleksey Midenkov committed
1907
  return table->file->ha_write_row(table->record[0]);
1908 1909
}

unknown's avatar
unknown committed
1910
/*
unknown's avatar
unknown committed
1911 1912 1913 1914 1915 1916 1917 1918 1919 1920
  Write a record to table with optional deleting of conflicting records,
  invoke proper triggers if needed.

  SYNOPSIS
     write_record()
      thd   - thread context
      table - table to which record should be written
      info  - COPY_INFO structure describing handling of duplicates
              and which is used for counting number of records inserted
              and deleted.
1921
      sink  - result sink for the RETURNING clause
unknown's avatar
unknown committed
1922

unknown's avatar
unknown committed
1923 1924 1925 1926 1927
  NOTE
    Once this record will be written to table after insert trigger will
    be invoked. If instead of inserting new record we will update old one
    then both on update triggers will work instead. Similarly both on
    delete triggers will be invoked if we will delete conflicting records.
unknown's avatar
unknown committed
1928

Sergei Golubchik's avatar
Sergei Golubchik committed
1929 1930
    Sets thd->transaction.stmt.modified_non_trans_table to TRUE if table which
    is updated didn't have transactions.
unknown's avatar
unknown committed
1931 1932 1933 1934

  RETURN VALUE
    0     - success
    non-0 - error
unknown's avatar
unknown committed
1935 1936 1937
*/


1938
int write_record(THD *thd, TABLE *table, COPY_INFO *info, select_result *sink)
unknown's avatar
unknown committed
1939
{
unknown's avatar
unknown committed
1940
  int error, trg_error= 0;
unknown's avatar
unknown committed
1941
  char *key=0;
1942
  MY_BITMAP *save_read_set, *save_write_set;
1943
  ulonglong prev_insert_id= table->file->next_insert_id;
1944
  ulonglong insert_id_for_cur_row= 0;
1945
  ulonglong prev_insert_id_for_cur_row= 0;
1946
  DBUG_ENTER("write_record");
unknown's avatar
unknown committed
1947

unknown's avatar
unknown committed
1948
  info->records++;
Sergei Golubchik's avatar
Sergei Golubchik committed
1949
  save_read_set= table->read_set;
1950
  save_write_set= table->write_set;
1951

1952 1953 1954 1955 1956 1957
  DBUG_EXECUTE_IF("rpl_write_record_small_sleep_gtid_100_200",
    {
      if (thd->rgi_slave && (thd->rgi_slave->current_gtid.seq_no == 100 ||
                             thd->rgi_slave->current_gtid.seq_no == 200))
        my_sleep(20000);
    });
1958 1959
  if (info->handle_duplicates == DUP_REPLACE ||
      info->handle_duplicates == DUP_UPDATE)
unknown's avatar
unknown committed
1960
  {
1961
    while (unlikely(error=table->file->ha_write_row(table->record[0])))
unknown's avatar
unknown committed
1962
    {
1963
      uint key_nr;
1964 1965 1966 1967 1968 1969 1970 1971 1972 1973 1974
      /*
        If we do more than one iteration of this loop, from the second one the
        row will have an explicit value in the autoinc field, which was set at
        the first call of handler::update_auto_increment(). So we must save
        the autogenerated value to avoid thd->insert_id_for_cur_row to become
        0.
      */
      if (table->file->insert_id_for_cur_row > 0)
        insert_id_for_cur_row= table->file->insert_id_for_cur_row;
      else
        table->file->insert_id_for_cur_row= insert_id_for_cur_row;
1975
      bool is_duplicate_key_error;
1976
      if (table->file->is_fatal_error(error, HA_CHECK_ALL))
Sergei Golubchik's avatar
Sergei Golubchik committed
1977
        goto err;
1978 1979
      is_duplicate_key_error=
        table->file->is_fatal_error(error, HA_CHECK_ALL & ~HA_CHECK_DUP);
unknown's avatar
unknown committed
1980 1981
      if (!is_duplicate_key_error)
      {
1982 1983 1984 1985 1986
        /*
          We come here when we had an ignorable error which is not a duplicate
          key error. In this we ignore error if ignore flag is set, otherwise
          report error as usual. We will not do any duplicate key processing.
        */
unknown's avatar
unknown committed
1987
        if (info->ignore)
1988
        {
1989
          table->file->print_error(error, MYF(ME_WARNING));
1990
          goto after_trg_or_ignored_err; /* Ignoring a not fatal error */
1991
        }
1992
        goto err;
unknown's avatar
unknown committed
1993
      }
1994
      if (unlikely((int) (key_nr = table->file->get_dup_key(error)) < 0))
unknown's avatar
unknown committed
1995
      {
1996
	error= HA_ERR_FOUND_DUPP_KEY;         /* Database can't find key */
unknown's avatar
unknown committed
1997 1998
	goto err;
      }
1999 2000
      DEBUG_SYNC(thd, "write_row_replace");

2001 2002
      /* Read all columns for the row we are going to replace */
      table->use_all_columns();
2003 2004 2005 2006 2007
      /*
	Don't allow REPLACE to replace a row when a auto_increment column
	was used.  This ensures that we don't get a problem when the
	whole range of the key has been used.
      */
Sergei Golubchik's avatar
Sergei Golubchik committed
2008
      if (info->handle_duplicates == DUP_REPLACE &&
Sergei Golubchik's avatar
Sergei Golubchik committed
2009
          key_nr == table->s->next_number_index && insert_id_for_cur_row > 0)
2010
	goto err;
2011
      if (table->file->has_dup_ref())
unknown's avatar
unknown committed
2012
      {
2013 2014 2015 2016 2017 2018 2019 2020 2021 2022 2023 2024
        /*
          If engine doesn't support HA_DUPLICATE_POS, the handler may init to
          INDEX, but dup_ref could also be set by lookup_handled (and then,
          lookup_errkey is set, f.ex. long unique duplicate).

          In such case, handler would stay uninitialized, so do it here.
         */
        bool init_lookup_handler= table->file->lookup_errkey != (uint)-1 &&
                                  table->file->inited == handler::NONE;
        if (init_lookup_handler && table->file->ha_rnd_init_with_error(false))
          goto err;

2025
        DBUG_ASSERT(table->file->inited == handler::RND);
2026 2027 2028 2029 2030 2031 2032
	int rnd_pos_err= table->file->ha_rnd_pos(table->record[1],
                                                 table->file->dup_ref);

        if (init_lookup_handler)
          table->file->ha_rnd_end();
        if (rnd_pos_err)
          goto err;
unknown's avatar
unknown committed
2033 2034 2035
      }
      else
      {
2036
	if (table->file->extra(HA_EXTRA_FLUSH_CACHE)) /* Not needed with NISAM */
unknown's avatar
unknown committed
2037 2038 2039 2040
	{
	  error=my_errno;
	  goto err;
	}
unknown's avatar
unknown committed
2041

unknown's avatar
unknown committed
2042 2043
	if (!key)
	{
2044
	  if (!(key=(char*) my_safe_alloca(table->s->max_unique_length)))
unknown's avatar
unknown committed
2045 2046 2047 2048 2049
	  {
	    error=ENOMEM;
	    goto err;
	  }
	}
2050
	key_copy((uchar*) key,table->record[0],table->key_info+key_nr,0);
2051
        key_part_map keypart_map= (1 << table->key_info[key_nr].user_defined_key_parts) - 1;
2052 2053
	if ((error= (table->file->ha_index_read_idx_map(table->record[1],
                                                        key_nr, (uchar*) key,
Igor Babaev's avatar
Igor Babaev committed
2054
                                                        keypart_map,
2055
                                                        HA_READ_KEY_EXACT))))
unknown's avatar
unknown committed
2056 2057
	  goto err;
      }
2058 2059
      if (table->vfield)
      {
2060 2061 2062 2063
        /*
          We have not yet called update_virtual_fields(VOL_UPDATE_FOR_READ)
          in handler methods for the just read row in record[1].
        */
2064
        table->move_fields(table->field, table->record[1], table->record[0]);
2065
        int verr = table->update_virtual_fields(table->file, VCOL_UPDATE_FOR_REPLACE);
2066
        table->move_fields(table->field, table->record[0], table->record[1]);
2067 2068
        if (verr)
          goto err;
2069
      }
2070
      if (info->handle_duplicates == DUP_UPDATE)
unknown's avatar
unknown committed
2071
      {
unknown's avatar
unknown committed
2072
        int res= 0;
unknown's avatar
unknown committed
2073 2074 2075 2076
        /*
          We don't check for other UNIQUE keys - the first row
          that matches, is updated. If update causes a conflict again,
          an error is returned
2077
        */
unknown's avatar
unknown committed
2078
	DBUG_ASSERT(table->insert_values != NULL);
unknown's avatar
unknown committed
2079 2080
        store_record(table,insert_values);
        restore_record(table,record[1]);
2081
        table->reset_default_fields();
2082 2083 2084 2085 2086 2087

        /*
          in INSERT ... ON DUPLICATE KEY UPDATE the set of modified fields can
          change per row. Thus, we have to do reset_default_fields() per row.
          Twice (before insert and before update).
        */
unknown's avatar
unknown committed
2088 2089
        DBUG_ASSERT(info->update_fields->elements ==
                    info->update_values->elements);
2090 2091
        if (fill_record_n_invoke_before_triggers(thd, table,
                                                 *info->update_fields,
unknown's avatar
unknown committed
2092 2093
                                                 *info->update_values,
                                                 info->ignore,
unknown's avatar
unknown committed
2094 2095
                                                 TRG_EVENT_UPDATE))
          goto before_trg_err;
2096

2097 2098 2099 2100 2101 2102 2103
        bool different_records= (!records_are_comparable(table) ||
                                 compare_record(table));
        /*
          Default fields must be updated before checking view updateability.
          This branch of INSERT is executed only when a UNIQUE key was violated
          with the ON DUPLICATE KEY UPDATE option. In this case the INSERT
          operation is transformed to an UPDATE, and the default fields must
2104
          be updated as if this is an UPDATE.
2105 2106
        */
        if (different_records && table->default_field)
2107
          table->evaluate_update_default_function();
2108

2109
        /* CHECK OPTION for VIEW ... ON DUPLICATE KEY UPDATE ... */
2110 2111
        res= info->table_list->view_check_option(table->in_use, info->ignore);
        if (res == VIEW_CHECK_SKIP)
2112
          goto after_trg_or_ignored_err;
unknown's avatar
unknown committed
2113
        if (res == VIEW_CHECK_ERROR)
unknown's avatar
unknown committed
2114
          goto before_trg_err;
2115

2116
        table->file->restore_auto_increment(prev_insert_id);
2117
        info->touched++;
2118
        if (different_records)
2119
        {
2120 2121
          if (unlikely(error=table->file->ha_update_row(table->record[1],
                                                        table->record[0])) &&
2122
              error != HA_ERR_RECORD_IS_THE_SAME)
2123
          {
2124
            if (info->ignore &&
2125
                !table->file->is_fatal_error(error, HA_CHECK_ALL))
2126
            {
2127 2128
              if (!(thd->variables.old_behavior &
                    OLD_MODE_NO_DUP_KEY_WARNINGS_WITH_IGNORE))
2129
                table->file->print_error(error, MYF(ME_WARNING));
2130
              goto after_trg_or_ignored_err;
2131 2132
            }
            goto err;
2133
          }
unknown's avatar
unknown committed
2134

2135
          if (error != HA_ERR_RECORD_IS_THE_SAME)
2136
          {
2137
            info->updated++;
2138 2139
            if (table->versioned() &&
                table->vers_check_update(*info->update_fields))
Aleksey Midenkov's avatar
Aleksey Midenkov committed
2140
            {
2141
              if (table->versioned(VERS_TIMESTAMP))
Aleksey Midenkov's avatar
Aleksey Midenkov committed
2142 2143 2144 2145
              {
                store_record(table, record[2]);
                if ((error= vers_insert_history_row(table)))
                {
2146 2147 2148
                  info->last_errno= error;
                  table->file->print_error(error, MYF(0));
                  trg_error= 1;
Aleksey Midenkov's avatar
Aleksey Midenkov committed
2149
                  restore_record(table, record[2]);
2150
                  goto after_trg_or_ignored_err;
Aleksey Midenkov's avatar
Aleksey Midenkov committed
2151 2152 2153 2154 2155
                }
                restore_record(table, record[2]);
              }
              info->copied++;
            }
2156
          }
2157 2158
          else
            error= 0;
unknown's avatar
unknown committed
2159
          /*
Michael Widenius's avatar
Michael Widenius committed
2160 2161 2162 2163 2164 2165
            If ON DUP KEY UPDATE updates a row instead of inserting
            one, it's like a regular UPDATE statement: it should not
            affect the value of a next SELECT LAST_INSERT_ID() or
            mysql_insert_id().  Except if LAST_INSERT_ID(#) was in the
            INSERT query, which is handled separately by
            THD::arg_of_last_insert_id_function.
unknown's avatar
unknown committed
2166
          */
2167
          prev_insert_id_for_cur_row= table->file->insert_id_for_cur_row;
unknown's avatar
unknown committed
2168
          insert_id_for_cur_row= table->file->insert_id_for_cur_row= 0;
unknown's avatar
unknown committed
2169 2170
          trg_error= (table->triggers &&
                      table->triggers->process_triggers(thd, TRG_EVENT_UPDATE,
unknown's avatar
unknown committed
2171
                                                        TRG_ACTION_AFTER, TRUE));
2172 2173
          info->copied++;
        }
2174

2175 2176 2177 2178 2179 2180 2181 2182 2183 2184 2185 2186
        /*
          Only update next_insert_id if the AUTO_INCREMENT value was explicitly
          updated, so we don't update next_insert_id with the value from the
          row being updated. Otherwise reset next_insert_id to what it was
          before the duplicate key error, since that value is unused.
        */
        if (table->next_number_field_updated)
        {
          DBUG_ASSERT(table->next_number_field != NULL);

          table->file->adjust_next_insert_id_after_explicit_value(table->next_number_field->val_int());
        }
2187
        else if (prev_insert_id_for_cur_row)
2188 2189 2190
        {
          table->file->restore_auto_increment(prev_insert_id_for_cur_row);
        }
2191
        goto ok;
2192 2193 2194
      }
      else /* DUP_REPLACE */
      {
unknown's avatar
unknown committed
2195 2196 2197 2198 2199
	/*
	  The manual defines the REPLACE semantics that it is either
	  an INSERT or DELETE(s) + INSERT; FOREIGN KEY checks in
	  InnoDB do not function in the defined way if we allow MySQL
	  to convert the latter operation internally to an UPDATE.
2200 2201
          We also should not perform this conversion if we have 
          timestamp field with ON UPDATE which is different from DEFAULT.
2202 2203 2204 2205 2206 2207
          Another case when conversion should not be performed is when
          we have ON DELETE trigger on table so user may notice that
          we cheat here. Note that it is ok to do such conversion for
          tables which have ON UPDATE but have no ON DELETE triggers,
          we just should not expose this fact to users by invoking
          ON UPDATE triggers.
2208 2209 2210 2211

          Note, TABLE_SHARE and TABLE see long uniques differently:
          - TABLE_SHARE sees as HA_KEY_ALG_LONG_HASH and HA_NOSAME
          - TABLE sees as usual non-unique indexes
2212
        */
2213 2214 2215 2216 2217 2218 2219 2220 2221 2222 2223 2224 2225 2226 2227 2228
        bool is_long_unique= table->s->key_info &&
                             table->s->key_info[key_nr].algorithm ==
                             HA_KEY_ALG_LONG_HASH;
        if ((is_long_unique ?
             /*
               We have a long unique. Test that there are no in-engine
               uniques and the current long unique is the last long unique.
             */
             !(table->key_info[0].flags & HA_NOSAME) &&
             last_uniq_key(table, table->s->key_info, key_nr) :
             /*
               We have a normal key - not a long unique.
               Test is the current normal key is unique and
               it is the last normal unique.
             */
             last_uniq_key(table, table->key_info, key_nr)) &&
2229
            !table->file->referenced_by_foreign_key() &&
2230
            (!table->triggers || !table->triggers->has_delete_triggers()))
2231
        {
2232
          if (table->versioned(VERS_TRX_ID))
2233
          {
2234
            DBUG_ASSERT(table->vers_write);
2235
            bitmap_set_bit(table->write_set, table->vers_start_field()->field_index);
2236
            table->file->column_bitmaps_signal();
2237 2238
            table->vers_start_field()->store(0, false);
          }
2239 2240
          if (unlikely(error= table->file->ha_update_row(table->record[1],
                                                         table->record[0])) &&
2241
              error != HA_ERR_RECORD_IS_THE_SAME)
2242
            goto err;
2243
          if (likely(!error))
2244
          {
2245
            info->deleted++;
2246
            if (!table->file->has_transactions())
Marko Mäkelä's avatar
Marko Mäkelä committed
2247
              thd->transaction->stmt.modified_non_trans_table= TRUE;
2248
            if (table->versioned(VERS_TIMESTAMP) && table->vers_write)
2249 2250 2251 2252
            {
              store_record(table, record[2]);
              error= vers_insert_history_row(table);
              restore_record(table, record[2]);
2253
              if (unlikely(error))
2254 2255 2256
                goto err;
            }
          }
2257
          else
2258
            error= 0;   // error was HA_ERR_RECORD_IS_THE_SAME
2259 2260 2261 2262 2263
          /*
            Since we pretend that we have done insert we should call
            its after triggers.
          */
          goto after_trg_n_copied_inc;
unknown's avatar
unknown committed
2264 2265 2266 2267 2268 2269 2270
        }
        else
        {
          if (table->triggers &&
              table->triggers->process_triggers(thd, TRG_EVENT_DELETE,
                                                TRG_ACTION_BEFORE, TRUE))
            goto before_trg_err;
2271

2272
          if (!table->versioned(VERS_TIMESTAMP))
2273 2274 2275
            error= table->file->ha_delete_row(table->record[1]);
          else
          {
2276 2277
            store_record(table, record[2]);
            restore_record(table, record[1]);
2278
            table->vers_update_end();
2279 2280
            error= table->file->ha_update_row(table->record[1],
                                              table->record[0]);
2281
            restore_record(table, record[2]);
2282
          }
2283
          if (unlikely(error))
unknown's avatar
unknown committed
2284
            goto err;
2285
          if (!table->versioned(VERS_TIMESTAMP))
2286 2287 2288
            info->deleted++;
          else
            info->updated++;
2289
          if (!table->file->has_transactions_and_rollback())
2290
            thd->transaction->stmt.modified_non_trans_table= TRUE;
unknown's avatar
unknown committed
2291 2292 2293 2294 2295
          if (table->triggers &&
              table->triggers->process_triggers(thd, TRG_EVENT_DELETE,
                                                TRG_ACTION_AFTER, TRUE))
          {
            trg_error= 1;
2296
            goto after_trg_or_ignored_err;
unknown's avatar
unknown committed
2297 2298
          }
          /* Let us attempt do write_row() once more */
2299
        }
unknown's avatar
unknown committed
2300 2301
      }
    }
2302 2303
    
    /*
Michael Widenius's avatar
Michael Widenius committed
2304 2305 2306 2307 2308 2309
      If more than one iteration of the above while loop is done, from
      the second one the row being inserted will have an explicit
      value in the autoinc field, which was set at the first call of
      handler::update_auto_increment(). This value is saved to avoid
      thd->insert_id_for_cur_row becoming 0. Use this saved autoinc
      value.
2310 2311 2312 2313
     */
    if (table->file->insert_id_for_cur_row == 0)
      table->file->insert_id_for_cur_row= insert_id_for_cur_row;
      
2314 2315 2316 2317 2318 2319 2320
    /*
      Restore column maps if they where replaced during an duplicate key
      problem.
    */
    if (table->read_set != save_read_set ||
        table->write_set != save_write_set)
      table->column_bitmaps_set(save_read_set, save_write_set);
unknown's avatar
unknown committed
2321
  }
2322
  else if (unlikely((error=table->file->ha_write_row(table->record[0]))))
unknown's avatar
unknown committed
2323
  {
2324
    DEBUG_SYNC(thd, "write_row_noreplace");
2325
    if (!info->ignore ||
2326
        table->file->is_fatal_error(error, HA_CHECK_ALL))
unknown's avatar
unknown committed
2327
      goto err;
2328 2329
    if (!(thd->variables.old_behavior &
          OLD_MODE_NO_DUP_KEY_WARNINGS_WITH_IGNORE))
2330
      table->file->print_error(error, MYF(ME_WARNING));
2331
    table->file->restore_auto_increment(prev_insert_id);
2332
    goto after_trg_or_ignored_err;
unknown's avatar
unknown committed
2333
  }
2334 2335 2336

after_trg_n_copied_inc:
  info->copied++;
2337
  thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
2338 2339 2340
  trg_error= (table->triggers &&
              table->triggers->process_triggers(thd, TRG_EVENT_INSERT,
                                                TRG_ACTION_AFTER, TRUE));
unknown's avatar
unknown committed
2341

2342 2343 2344 2345 2346 2347 2348
ok:
  /*
    We send the row after writing it to the table so that the
    correct values are sent to the client. Otherwise it won't show
    autoinc values (generated inside the handler::ha_write()) and
    values updated in ON DUPLICATE KEY UPDATE.
  */
2349 2350 2351 2352 2353
  if (sink)
  {
    if (sink->send_data(thd->lex->returning()->item_list) < 0)
      trg_error= 1;
  }
2354 2355

after_trg_or_ignored_err:
unknown's avatar
unknown committed
2356
  if (key)
2357
    my_safe_afree(key,table->s->max_unique_length);
2358
  if (!table->file->has_transactions_and_rollback())
2359
    thd->transaction->stmt.modified_non_trans_table= TRUE;
unknown's avatar
unknown committed
2360
  DBUG_RETURN(trg_error);
unknown's avatar
unknown committed
2361 2362

err:
2363
  info->last_errno= error;
unknown's avatar
unknown committed
2364
  table->file->print_error(error,MYF(0));
2365
  
unknown's avatar
unknown committed
2366
before_trg_err:
2367
  table->file->restore_auto_increment(prev_insert_id);
unknown's avatar
unknown committed
2368
  if (key)
2369
    my_safe_afree(key, table->s->max_unique_length);
2370
  table->column_bitmaps_set(save_read_set, save_write_set);
2371
  DBUG_RETURN(1);
unknown's avatar
unknown committed
2372 2373 2374 2375
}


/******************************************************************************
2376
  Check that there aren't any null_fields
unknown's avatar
unknown committed
2377 2378
******************************************************************************/

2379 2380

int check_that_all_fields_are_given_values(THD *thd, TABLE *entry, TABLE_LIST *table_list)
unknown's avatar
unknown committed
2381
{
2382
  int err= 0;
2383 2384
  MY_BITMAP *write_set= entry->write_set;

unknown's avatar
unknown committed
2385 2386
  for (Field **field=entry->field ; *field ; field++)
  {
2387
    if (!bitmap_is_set(write_set, (*field)->field_index) &&
2388
        !(*field)->vcol_info &&
2389
        has_no_default_value(thd, *field, table_list))
2390
      err=1;
unknown's avatar
unknown committed
2391
  }
2392
  return thd->abort_on_warning ? err : 0;
unknown's avatar
unknown committed
2393 2394 2395
}

/*****************************************************************************
unknown's avatar
unknown committed
2396 2397
  Handling of delayed inserts
  A thread is created for each table that one uses with the DELAYED attribute.
unknown's avatar
unknown committed
2398 2399
*****************************************************************************/

2400 2401
#ifndef EMBEDDED_LIBRARY

unknown's avatar
unknown committed
2402 2403
class delayed_row :public ilink {
public:
2404
  char *record;
unknown's avatar
unknown committed
2405
  enum_duplicates dup;
2406 2407
  my_time_t start_time;
  ulong start_time_sec_part;
Monty's avatar
Monty committed
2408
  sql_mode_t sql_mode;
2409
  bool auto_increment_field_not_null;
2410 2411
  bool ignore, log_query;
  THD::used_t query_start_sec_part_used;
2412 2413
  bool stmt_depends_on_first_successful_insert_id_in_prev_stmt;
  ulonglong first_successful_insert_id_in_prev_stmt;
2414
  ulonglong forced_insert_id;
2415 2416
  ulong auto_increment_increment;
  ulong auto_increment_offset;
2417
  LEX_STRING query;
2418
  Time_zone *time_zone;
2419 2420 2421
  char *user, *host, *ip;
  query_id_t query_id;
  my_thread_id thread_id;
unknown's avatar
unknown committed
2422

2423 2424 2425
  delayed_row(LEX_STRING const query_arg, enum_duplicates dup_arg,
              bool ignore_arg, bool log_query_arg)
    : record(0), dup(dup_arg), ignore(ignore_arg), log_query(log_query_arg),
2426 2427
      forced_insert_id(0), query(query_arg), time_zone(0),
      user(0), host(0), ip(0)
2428
    {}
2429
  ~delayed_row() override
unknown's avatar
unknown committed
2430
  {
2431 2432
    my_free(query.str);
    my_free(record);
unknown's avatar
unknown committed
2433 2434 2435
  }
};

2436
/**
2437
  Delayed_insert - context of a thread responsible for delayed insert
2438 2439 2440 2441
  into one table. When processing delayed inserts, we create an own
  thread for every distinct table. Later on all delayed inserts directed
  into that table are handled by a dedicated thread.
*/
unknown's avatar
unknown committed
2442

2443
class Delayed_insert :public ilink {
unknown's avatar
unknown committed
2444
  uint locks_in_memory;
2445
  thr_lock_type delayed_lock;
unknown's avatar
unknown committed
2446 2447 2448
public:
  THD thd;
  TABLE *table;
Marc Alff's avatar
Marc Alff committed
2449 2450
  mysql_mutex_t mutex;
  mysql_cond_t cond, cond_client;
2451
  uint tables_in_use, stacked_inserts;
2452
  volatile bool status;
2453
  bool retry;
2454
  /**
2455
    When the handler thread starts, it clones a metadata lock ticket
2456 2457 2458
    which protects against GRL and ticket for the table to be inserted.
    This is done to allow the deadlock detector to detect deadlocks
    resulting from these locks.
2459 2460 2461 2462 2463 2464 2465
    Before this is done, the connection thread cannot safely exit
    without causing problems for clone_ticket().
    Once handler_thread_initialized has been set, it is safe for the
    connection thread to exit.
    Access to handler_thread_initialized is protected by di->mutex.
  */
  bool handler_thread_initialized;
unknown's avatar
unknown committed
2466 2467
  COPY_INFO info;
  I_List<delayed_row> rows;
2468
  ulong group_count;
unknown's avatar
unknown committed
2469
  TABLE_LIST table_list;			// Argument
2470 2471 2472 2473 2474
  /**
    Request for IX metadata lock protecting against GRL which is
    passed from connection thread to the handler thread.
  */
  MDL_request grl_protection;
2475
  Delayed_insert(LEX *lex)
Monty's avatar
Monty committed
2476 2477
    :locks_in_memory(0), thd(next_thread_id()),
     table(0),tables_in_use(0), stacked_inserts(0),
2478
     status(0), retry(0), handler_thread_initialized(FALSE), group_count(0)
unknown's avatar
unknown committed
2479
  {
2480
    DBUG_ENTER("Delayed_insert constructor");
2481 2482 2483 2484
    thd.security_ctx->user=(char*) delayed_user;
    thd.security_ctx->host=(char*) my_localhost;
    thd.security_ctx->ip= NULL;
    thd.query_id= 0;
2485
    strmake_buf(thd.security_ctx->priv_user, thd.security_ctx->user);
unknown's avatar
unknown committed
2486
    thd.current_tablenr=0;
Sergei Golubchik's avatar
Sergei Golubchik committed
2487
    thd.set_command(COM_DELAYED_INSERT);
2488 2489 2490
    thd.lex->current_select= lex->current_select;
    thd.lex->sql_command= lex->sql_command;        // For innodb::store_lock()
    thd.lex->duplicates= lex->duplicates;
2491 2492 2493 2494 2495 2496
    /*
      Prevent changes to global.lock_wait_timeout from affecting
      delayed insert threads as any timeouts in delayed inserts
      are not communicated to the client.
    */
    thd.variables.lock_wait_timeout= LONG_TIMEOUT;
unknown's avatar
unknown committed
2497

2498 2499
    bzero((char*) &thd.net, sizeof(thd.net));		// Safety
    bzero((char*) &table_list, sizeof(table_list));	// Safety
2500
    thd.system_thread= SYSTEM_THREAD_DELAYED_INSERT;
2501
    thd.security_ctx->host_or_ip= "";
unknown's avatar
unknown committed
2502
    bzero((char*) &info,sizeof(info));
Marc Alff's avatar
Marc Alff committed
2503 2504 2505
    mysql_mutex_init(key_delayed_insert_mutex, &mutex, MY_MUTEX_INIT_FAST);
    mysql_cond_init(key_delayed_insert_cond, &cond, NULL);
    mysql_cond_init(key_delayed_insert_cond_client, &cond_client, NULL);
2506
    mysql_mutex_lock(&LOCK_delayed_insert);
unknown's avatar
unknown committed
2507
    delayed_insert_threads++;
2508
    mysql_mutex_unlock(&LOCK_delayed_insert);
2509 2510
    delayed_lock= global_system_variables.low_priority_updates ?
                                          TL_WRITE_LOW_PRIORITY : TL_WRITE;
2511
    DBUG_VOID_RETURN;
unknown's avatar
unknown committed
2512
  }
2513
  ~Delayed_insert() override
unknown's avatar
unknown committed
2514
  {
2515
    /* The following is not really needed, but just for safety */
unknown's avatar
unknown committed
2516 2517 2518 2519
    delayed_row *row;
    while ((row=rows.get()))
      delete row;
    if (table)
2520
    {
unknown's avatar
unknown committed
2521
      close_thread_tables(&thd);
2522
      thd.mdl_context.release_transactional_locks(&thd);
2523
    }
Marc Alff's avatar
Marc Alff committed
2524 2525 2526
    mysql_mutex_destroy(&mutex);
    mysql_cond_destroy(&cond);
    mysql_cond_destroy(&cond_client);
2527

2528 2529
    server_threads.erase(&thd);
    mysql_mutex_assert_owner(&LOCK_delayed_insert);
unknown's avatar
unknown committed
2530
    delayed_insert_threads--;
2531 2532 2533 2534

    my_free(thd.query());
    thd.security_ctx->user= 0;
    thd.security_ctx->host= 0;
unknown's avatar
unknown committed
2535 2536 2537 2538 2539 2540 2541 2542 2543
  }

  /* The following is for checking when we can delete ourselves */
  inline void lock()
  {
    locks_in_memory++;				// Assume LOCK_delay_insert
  }
  void unlock()
  {
Marc Alff's avatar
Marc Alff committed
2544
    mysql_mutex_lock(&LOCK_delayed_insert);
unknown's avatar
unknown committed
2545 2546
    if (!--locks_in_memory)
    {
Marc Alff's avatar
Marc Alff committed
2547
      mysql_mutex_lock(&mutex);
unknown's avatar
unknown committed
2548 2549
      if (thd.killed && ! stacked_inserts && ! tables_in_use)
      {
Marc Alff's avatar
Marc Alff committed
2550
        mysql_cond_signal(&cond);
unknown's avatar
unknown committed
2551 2552
	status=1;
      }
Marc Alff's avatar
Marc Alff committed
2553
      mysql_mutex_unlock(&mutex);
unknown's avatar
unknown committed
2554
    }
Marc Alff's avatar
Marc Alff committed
2555
    mysql_mutex_unlock(&LOCK_delayed_insert);
unknown's avatar
unknown committed
2556 2557 2558 2559
  }
  inline uint lock_count() { return locks_in_memory; }

  TABLE* get_local_table(THD* client_thd);
Konstantin Osipov's avatar
Konstantin Osipov committed
2560
  bool open_and_lock_table();
unknown's avatar
unknown committed
2561 2562 2563 2564
  bool handle_inserts(void);
};


2565
I_List<Delayed_insert> delayed_threads;
unknown's avatar
unknown committed
2566 2567


2568 2569 2570 2571
/**
  Return an instance of delayed insert thread that can handle
  inserts into a given table, if it exists. Otherwise return NULL.
*/
unknown's avatar
unknown committed
2572

2573
static
2574
Delayed_insert *find_handler(THD *thd, TABLE_LIST *table_list)
unknown's avatar
unknown committed
2575
{
Sergei Golubchik's avatar
Sergei Golubchik committed
2576
  THD_STAGE_INFO(thd, stage_waiting_for_delay_list);
Marc Alff's avatar
Marc Alff committed
2577
  mysql_mutex_lock(&LOCK_delayed_insert);       // Protect master list
2578
  I_List_iterator<Delayed_insert> it(delayed_threads);
2579 2580
  Delayed_insert *di;
  while ((di= it++))
unknown's avatar
unknown committed
2581
  {
2582 2583
    if (!cmp(&table_list->db, &di->table_list.db) &&
	!cmp(&table_list->table_name, &di->table_list.table_name))
unknown's avatar
unknown committed
2584
    {
2585
      di->lock();
unknown's avatar
unknown committed
2586 2587 2588
      break;
    }
  }
Marc Alff's avatar
Marc Alff committed
2589
  mysql_mutex_unlock(&LOCK_delayed_insert); // For unlink from list
2590
  return di;
unknown's avatar
unknown committed
2591 2592 2593
}


2594 2595 2596 2597
/**
  Attempt to find or create a delayed insert thread to handle inserts
  into this table.

unknown's avatar
unknown committed
2598 2599 2600 2601 2602 2603 2604 2605 2606 2607 2608 2609 2610 2611 2612 2613 2614 2615
  @return In case of success, table_list->table points to a local copy
          of the delayed table or is set to NULL, which indicates a
          request for lock upgrade. In case of failure, value of
          table_list->table is undefined.
  @retval TRUE  - this thread ran out of resources OR
                - a newly created delayed insert thread ran out of
                  resources OR
                - the created thread failed to open and lock the table
                  (e.g. because it does not exist) OR
                - the table opened in the created thread turned out to
                  be a view
  @retval FALSE - table successfully opened OR
                - too many delayed insert threads OR
                - the table has triggers and we have to fall back to
                  a normal INSERT
                Two latter cases indicate a request for lock upgrade.

  XXX: why do we regard INSERT DELAYED into a view as an error and
2616 2617 2618 2619 2620 2621 2622 2623 2624 2625 2626 2627 2628 2629 2630 2631 2632 2633
  do not simply perform a lock upgrade?

  TODO: The approach with using two mutexes to work with the
  delayed thread list -- LOCK_delayed_insert and
  LOCK_delayed_create -- is redundant, and we only need one of
  them to protect the list.  The reason we have two locks is that
  we do not want to block look-ups in the list while we're waiting
  for the newly created thread to open the delayed table. However,
  this wait itself is redundant -- we always call get_local_table
  later on, and there wait again until the created thread acquires
  a table lock.

  As is redundant the concept of locks_in_memory, since we already
  have another counter with similar semantics - tables_in_use,
  both of them are devoted to counting the number of producers for
  a given consumer (delayed insert thread), only at different
  stages of producer-consumer relationship.

2634 2635
  The 'status' variable in Delayed_insert is redundant
  too, since there is already di->stacked_inserts.
2636 2637
*/

unknown's avatar
unknown committed
2638
static
2639 2640
bool delayed_get_table(THD *thd, MDL_request *grl_protection_request,
                       TABLE_LIST *table_list)
unknown's avatar
unknown committed
2641 2642
{
  int error;
2643
  Delayed_insert *di;
unknown's avatar
unknown committed
2644 2645
  DBUG_ENTER("delayed_get_table");

unknown's avatar
unknown committed
2646
  /* Must be set in the parser */
2647
  DBUG_ASSERT(table_list->db.str);
unknown's avatar
unknown committed
2648

2649
  /* Find the thread which handles this table. */
2650
  if (!(di= find_handler(thd, table_list)))
unknown's avatar
unknown committed
2651
  {
2652 2653 2654 2655
    /*
      No match. Create a new thread to handle the table, but
      no more than max_insert_delayed_threads.
    */
2656
    if (delayed_insert_threads >= thd->variables.max_insert_delayed_threads)
2657
      DBUG_RETURN(0);
Sergei Golubchik's avatar
Sergei Golubchik committed
2658
    THD_STAGE_INFO(thd, stage_creating_delayed_handler);
Marc Alff's avatar
Marc Alff committed
2659
    mysql_mutex_lock(&LOCK_delayed_create);
2660 2661 2662 2663
    /*
      The first search above was done without LOCK_delayed_create.
      Another thread might have created the handler in between. Search again.
    */
2664
    if (! (di= find_handler(thd, table_list)))
unknown's avatar
unknown committed
2665
    {
2666
      if (!(di= new Delayed_insert(thd->lex)))
unknown's avatar
unknown committed
2667
        goto end_create;
Michael Widenius's avatar
Michael Widenius committed
2668

2669 2670 2671
      /*
        Annotating delayed inserts is not supported.
      */
2672
      di->thd.variables.binlog_annotate_row_events= 0;
2673

2674
      di->thd.set_db(&table_list->db);
2675 2676
      di->thd.set_query(my_strndup(PSI_INSTRUMENT_ME,
                                   table_list->table_name.str,
2677
                                   table_list->table_name.length,
2678
                                   MYF(MY_WME | ME_FATAL)),
2679 2680
                        table_list->table_name.length, system_charset_info);
      if (di->thd.db.str == NULL || di->thd.query() == NULL)
unknown's avatar
unknown committed
2681
      {
unknown's avatar
unknown committed
2682
        /* The error is reported */
2683
	delete di;
unknown's avatar
unknown committed
2684
        goto end_create;
unknown's avatar
unknown committed
2685
      }
2686
      di->table_list= *table_list;			// Needed to open table
2687
      /* Replace volatile strings with local copies */
2688 2689
      di->table_list.alias.str=    di->table_list.table_name.str=    di->thd.query();
      di->table_list.alias.length= di->table_list.table_name.length= di->thd.query_length();
2690
      di->table_list.db= Lex_ident_db(di->thd.db);
2691 2692 2693 2694 2695
      /*
        Nulify select_lex because, if the thread that spawned the current one
        disconnects, the select_lex will point to freed memory.
      */
      di->table_list.select_lex= NULL;
2696 2697 2698 2699
      /*
        We need the tickets so that they can be cloned in
        handle_delayed_insert
      */
2700 2701
      MDL_REQUEST_INIT(&di->grl_protection, MDL_key::BACKUP, "", "",
                       MDL_BACKUP_DML, MDL_STATEMENT);
2702
      di->grl_protection.ticket= grl_protection_request->ticket;
2703 2704
      init_mdl_requests(&di->table_list);
      di->table_list.mdl_request.ticket= table_list->mdl_request.ticket;
2705

2706
      di->lock();
Marc Alff's avatar
Marc Alff committed
2707 2708 2709 2710
      mysql_mutex_lock(&di->mutex);
      if ((error= mysql_thread_create(key_thread_delayed_insert,
                                      &di->thd.real_id, &connection_attrib,
                                      handle_delayed_insert, (void*) di)))
unknown's avatar
unknown committed
2711 2712 2713 2714
      {
	DBUG_PRINT("error",
		   ("Can't create thread to handle delayed insert (error %d)",
		    error));
Marc Alff's avatar
Marc Alff committed
2715
        mysql_mutex_unlock(&di->mutex);
2716 2717
	di->unlock();
	delete di;
2718
	my_error(ER_CANT_CREATE_THREAD, MYF(ME_FATAL), error);
unknown's avatar
unknown committed
2719
        goto end_create;
unknown's avatar
unknown committed
2720 2721
      }

2722 2723 2724 2725 2726 2727
      /*
        Wait until table is open unless the handler thread or the connection
        thread has been killed. Note that we in all cases must wait until the
        handler thread has been properly initialized before exiting. Otherwise
        we risk doing clone_ticket() on a ticket that is no longer valid.
      */
Sergei Golubchik's avatar
Sergei Golubchik committed
2728
      THD_STAGE_INFO(thd, stage_waiting_for_handler_open);
2729 2730
      while (!di->handler_thread_initialized ||
             (!di->thd.killed && !di->table && !thd->killed))
unknown's avatar
unknown committed
2731
      {
Marc Alff's avatar
Marc Alff committed
2732
        mysql_cond_wait(&di->cond_client, &di->mutex);
unknown's avatar
unknown committed
2733
      }
Marc Alff's avatar
Marc Alff committed
2734
      mysql_mutex_unlock(&di->mutex);
Sergei Golubchik's avatar
Sergei Golubchik committed
2735
      THD_STAGE_INFO(thd, stage_got_old_table);
2736 2737 2738 2739 2740
      if (thd->killed)
      {
        di->unlock();
        goto end_create;
      }
2741
      if (di->thd.killed)
unknown's avatar
unknown committed
2742
      {
2743
        if (di->thd.is_error() && ! di->retry)
2744
        {
unknown's avatar
unknown committed
2745 2746 2747
          /*
            Copy the error message. Note that we don't treat fatal
            errors in the delayed thread as fatal errors in the
2748 2749 2750
            main thread. If delayed thread was killed, we don't
            want to send "Server shutdown in progress" in the
            INSERT THREAD.
unknown's avatar
unknown committed
2751
          */
2752 2753
          my_message(di->thd.get_stmt_da()->sql_errno(),
                     di->thd.get_stmt_da()->message(),
2754
                     MYF(0));
2755 2756
        }
        di->unlock();
unknown's avatar
unknown committed
2757
        goto end_create;
unknown's avatar
unknown committed
2758
      }
Marc Alff's avatar
Marc Alff committed
2759
      mysql_mutex_lock(&LOCK_delayed_insert);
2760
      delayed_threads.append(di);
Marc Alff's avatar
Marc Alff committed
2761
      mysql_mutex_unlock(&LOCK_delayed_insert);
unknown's avatar
unknown committed
2762
    }
Marc Alff's avatar
Marc Alff committed
2763
    mysql_mutex_unlock(&LOCK_delayed_create);
unknown's avatar
unknown committed
2764 2765
  }

Marc Alff's avatar
Marc Alff committed
2766
  mysql_mutex_lock(&di->mutex);
2767
  table_list->table= di->get_local_table(thd);
Marc Alff's avatar
Marc Alff committed
2768
  mysql_mutex_unlock(&di->mutex);
unknown's avatar
unknown committed
2769 2770
  if (table_list->table)
  {
2771
    DBUG_ASSERT(! thd->is_error());
2772
    thd->di= di;
unknown's avatar
unknown committed
2773
  }
2774
  /* Unlock the delayed insert object after its last access. */
2775
  di->unlock();
2776 2777
  DBUG_PRINT("exit", ("table_list->table: %p", table_list->table));
  DBUG_RETURN(thd->is_error());
2778

unknown's avatar
unknown committed
2779
end_create:
Marc Alff's avatar
Marc Alff committed
2780
  mysql_mutex_unlock(&LOCK_delayed_create);
Monty's avatar
Monty committed
2781
  DBUG_PRINT("exit", ("is_error(): %d", thd->is_error()));
2782
  DBUG_RETURN(thd->is_error());
unknown's avatar
unknown committed
2783 2784
}

2785 2786 2787 2788
#define memdup_vcol(thd, vcol)                                            \
  if (vcol)                                                               \
  {                                                                       \
    (vcol)= (Virtual_column_info*)(thd)->memdup((vcol), sizeof(*(vcol))); \
2789
    (vcol)->expr= NULL;                                                   \
2790
  }
unknown's avatar
unknown committed
2791

2792 2793 2794 2795 2796 2797 2798 2799 2800 2801 2802
/**
  As we can't let many client threads modify the same TABLE
  structure of the dedicated delayed insert thread, we create an
  own structure for each client thread. This includes a row
  buffer to save the column values and new fields that point to
  the new row buffer. The memory is allocated in the client
  thread and is freed automatically.

  @pre This function is called from the client thread.  Delayed
       insert thread mutex must be acquired before invoking this
       function.
unknown's avatar
unknown committed
2803 2804 2805

  @return Not-NULL table object on success. NULL in case of an error,
                    which is set in client_thd.
unknown's avatar
unknown committed
2806 2807
*/

2808
TABLE *Delayed_insert::get_local_table(THD* client_thd)
unknown's avatar
unknown committed
2809 2810
{
  my_ptrdiff_t adjust_ptrs;
2811
  Field **field,**org_field, *found_next_number_field;
unknown's avatar
unknown committed
2812
  TABLE *copy;
2813
  TABLE_SHARE *share;
2814
  uchar *bitmap;
2815
  char *copy_tmp;
2816
  uint bitmaps_used;
2817 2818
  Field **default_fields, **virtual_fields;
  uchar *record;
2819
  DBUG_ENTER("Delayed_insert::get_local_table");
unknown's avatar
unknown committed
2820 2821 2822 2823 2824 2825

  /* First request insert thread to get a lock */
  status=1;
  tables_in_use++;
  if (!thd.lock)				// Table is not locked
  {
Sergei Golubchik's avatar
Sergei Golubchik committed
2826
    THD_STAGE_INFO(client_thd, stage_waiting_for_handler_lock);
2827
    mysql_cond_signal(&cond);			// Tell handler to lock table
2828
    while (!thd.killed && !thd.lock && ! client_thd->killed)
unknown's avatar
unknown committed
2829
    {
Marc Alff's avatar
Marc Alff committed
2830
      mysql_cond_wait(&cond_client, &mutex);
unknown's avatar
unknown committed
2831
    }
Sergei Golubchik's avatar
Sergei Golubchik committed
2832
    THD_STAGE_INFO(client_thd, stage_got_handler_lock);
unknown's avatar
unknown committed
2833
    if (client_thd->killed)
2834
      goto error2;
2835
    if (thd.killed)
unknown's avatar
unknown committed
2836
    {
2837
      /*
2838 2839 2840 2841 2842
        Check how the insert thread was killed. If it was killed
        by FLUSH TABLES which calls kill_delayed_threads_for_table(),
        then is_error is not set.
        In this case, return without setting an error,
        which means that the insert will be converted to a normal insert.
2843
      */
2844 2845 2846 2847 2848 2849 2850 2851 2852 2853 2854 2855
      if (thd.is_error())
      {
        /*
          Copy the error message. Note that we don't treat fatal
          errors in the delayed thread as fatal errors in the
          main thread. If delayed thread was killed, we don't
          want to send "Server shutdown in progress" in the
          INSERT THREAD.

          The thread could be killed with an error message if
          di->handle_inserts() or di->open_and_lock_table() fails.
        */
2856 2857
        my_message(thd.get_stmt_da()->sql_errno(),
                   thd.get_stmt_da()->message(), MYF(0));
2858
      }
2859
      goto error2;
unknown's avatar
unknown committed
2860 2861
    }
  }
2862
  share= table->s;
unknown's avatar
unknown committed
2863

2864
  /*
2865 2866 2867 2868 2869 2870 2871 2872 2873 2874
    Allocate memory for the TABLE object, the field pointers array,
    and one record buffer of reclength size.
    Normally a table has three record buffers of rec_buff_length size,
    which includes alignment bytes. Since the table copy is used for
    creating one record only, the other record buffers and alignment
    are unnecessary.
    As the table will also need to calculate default values and
    expresions, we have to allocate own version of fields. keys and key
    parts. The key and key parts are needed as parse_vcol_defs() changes
    them in case of long hash keys.
2875
  */
Sergei Golubchik's avatar
Sergei Golubchik committed
2876
  THD_STAGE_INFO(client_thd, stage_allocating_local_table);
2877 2878 2879 2880 2881 2882 2883 2884 2885 2886 2887
  if (!multi_alloc_root(client_thd->mem_root,
                        &copy_tmp, sizeof(*table),
                        &field, (uint) (share->fields+1)*sizeof(Field**),
                        &default_fields,
                        (share->default_fields +
                         share->default_expressions + 1) * sizeof(Field*),
                        &virtual_fields,
                        (share->virtual_fields + 1) * sizeof(Field*),
                        &record, (uint) share->reclength,
                        &bitmap, (uint) share->column_bitmap_size*4,
                        NullS))
2888
    goto error2;
unknown's avatar
unknown committed
2889

2890
  /* Copy the TABLE object. */
2891
  copy= new (copy_tmp) TABLE;
unknown's avatar
unknown committed
2892
  *copy= *table;
2893
  copy->vcol_refix_list.empty();
2894 2895
  init_sql_alloc(key_memory_TABLE, &copy->mem_root, TABLE_ALLOC_BLOCK_SIZE, 0,
                 MYF(MY_THREAD_SPECIFIC));
Michael Widenius's avatar
Michael Widenius committed
2896

unknown's avatar
unknown committed
2897
  /* We don't need to change the file handler here */
2898
  /* Assign the pointers for the field pointers array and the record. */
2899 2900
  copy->field= field;
  copy->record[0]= record;
2901
  memcpy((char*) copy->record[0], (char*) table->record[0], share->reclength);
2902
  if (share->default_fields || share->default_expressions)
2903
    copy->default_field= default_fields;
2904
  if (share->virtual_fields)
2905 2906
    copy->vfield= virtual_fields;

2907
  copy->expr_arena= NULL;
Sergei Golubchik's avatar
Sergei Golubchik committed
2908

2909 2910 2911
  /* Ensure we don't use the table list of the original table */
  copy->pos_in_table_list= 0;

2912 2913 2914
  /* We don't need statistics for insert delayed */
  copy->stats_cb= 0;

2915 2916 2917 2918 2919 2920 2921 2922 2923 2924
  /*
    Make a copy of all fields.
    The copied fields need to point into the copied record. This is done
    by copying the field objects with their old pointer values and then
    "move" the pointers by the distance between the original and copied
    records. That way we preserve the relative positions in the records.
  */
  adjust_ptrs= PTR_BYTE_DIFF(copy->record[0], table->record[0]);
  found_next_number_field= table->found_next_number_field;
  for (org_field= table->field; *org_field; org_field++, field++)
unknown's avatar
unknown committed
2925
  {
2926
    if (!(*field= (*org_field)->make_new_field(client_thd->mem_root, copy, 1)))
unknown's avatar
unknown committed
2927
      goto error;
2928
    (*field)->unireg_check= (*org_field)->unireg_check;
2929
    (*field)->invisible= (*org_field)->invisible;
2930
    (*field)->orig_table= copy;			// Remove connection
unknown's avatar
unknown committed
2931
    (*field)->move_field_offset(adjust_ptrs);	// Point at copy->record[0]
2932 2933
    (*field)->flags|= ((*org_field)->flags & LONG_UNIQUE_HASH_FIELD);
    (*field)->invisible= (*org_field)->invisible;
2934 2935 2936
    memdup_vcol(client_thd, (*field)->vcol_info);
    memdup_vcol(client_thd, (*field)->default_value);
    memdup_vcol(client_thd, (*field)->check_constraint);
2937 2938
    if (*org_field == found_next_number_field)
      (*field)->table->found_next_number_field= *field;
unknown's avatar
unknown committed
2939 2940 2941
  }
  *field=0;

2942 2943
  if (copy_keys_from_share(copy, client_thd->mem_root))
    goto error;
2944

2945 2946
  if (share->virtual_fields || share->default_expressions ||
      share->default_fields)
Igor Babaev's avatar
Igor Babaev committed
2947
  {
2948
    bool error_reported= FALSE;
2949
    if (unlikely(parse_vcol_defs(client_thd, client_thd->mem_root, copy,
2950 2951
                                 &error_reported,
                                 VCOL_INIT_DEPENDENCY_FAILURE_IS_WARNING)))
2952
      goto error;
Igor Babaev's avatar
Igor Babaev committed
2953 2954
  }

Sergei Golubchik's avatar
Sergei Golubchik committed
2955
  switch_defaults_to_nullable_trigger_fields(copy);
unknown's avatar
unknown committed
2956

2957 2958
  /* Adjust in_use for pointing to client thread */
  copy->in_use= client_thd;
2959 2960 2961 2962

  /* Adjust lock_count. This table object is not part of a lock. */
  copy->lock_count= 0;

2963 2964 2965 2966
  /* Adjust bitmaps */
  copy->def_read_set.bitmap= (my_bitmap_map*) bitmap;
  copy->def_write_set.bitmap= ((my_bitmap_map*)
                               (bitmap + share->column_bitmap_size));
Monty's avatar
Monty committed
2967 2968
  create_last_bit_mask(&copy->def_read_set);
  create_last_bit_mask(&copy->def_write_set);
2969
  bitmaps_used= 2;
Monty's avatar
Monty committed
2970
  if (share->default_fields || share->default_expressions)
2971
  {
2972
    my_bitmap_init(&copy->has_value_set,
2973 2974
                   (my_bitmap_map*) (bitmap +
                                     bitmaps_used*share->column_bitmap_size),
2975
                   share->fields);
2976
  }
2977
  copy->tmp_set.bitmap= 0;                      // To catch errors
2978
  bzero((char*) bitmap, share->column_bitmap_size * bitmaps_used);
2979 2980
  copy->read_set=  &copy->def_read_set;
  copy->write_set= &copy->def_write_set;
2981 2982
  move_root(client_thd->mem_root, &copy->mem_root);
  free_root(&copy->mem_root, 0);
2983

2984
  DBUG_RETURN(copy);
unknown's avatar
unknown committed
2985 2986 2987

  /* Got fatal error */
 error:
2988
  free_root(&copy->mem_root, 0);
2989
error2:
unknown's avatar
unknown committed
2990
  tables_in_use--;
Marc Alff's avatar
Marc Alff committed
2991
  mysql_cond_signal(&cond);                     // Inform thread about abort
2992
  DBUG_RETURN(0);
unknown's avatar
unknown committed
2993 2994 2995 2996 2997
}


/* Put a question in queue */

unknown's avatar
unknown committed
2998
static
unknown's avatar
unknown committed
2999 3000
int write_delayed(THD *thd, TABLE *table, enum_duplicates duplic,
                  LEX_STRING query, bool ignore, bool log_on)
unknown's avatar
unknown committed
3001
{
3002
  delayed_row *row= 0;
3003
  Delayed_insert *di=thd->di;
3004
  const Discrete_interval *forced_auto_inc;
Etienne Guesnet's avatar
Etienne Guesnet committed
3005
  size_t user_len, host_len, ip_length;
unknown's avatar
unknown committed
3006
  DBUG_ENTER("write_delayed");
3007 3008
  DBUG_PRINT("enter", ("query = '%s' length %lu", query.str,
                       (ulong) query.length));
unknown's avatar
unknown committed
3009

Sergei Golubchik's avatar
Sergei Golubchik committed
3010
  THD_STAGE_INFO(thd, stage_waiting_for_handler_insert);
Marc Alff's avatar
Marc Alff committed
3011
  mysql_mutex_lock(&di->mutex);
unknown's avatar
unknown committed
3012
  while (di->stacked_inserts >= delayed_queue_size && !thd->killed)
Marc Alff's avatar
Marc Alff committed
3013
    mysql_cond_wait(&di->cond_client, &di->mutex);
Sergei Golubchik's avatar
Sergei Golubchik committed
3014
  THD_STAGE_INFO(thd, stage_storing_row_into_queue);
unknown's avatar
unknown committed
3015

3016
  if (thd->killed)
unknown's avatar
unknown committed
3017 3018
    goto err;

3019 3020 3021 3022 3023 3024 3025
  /*
    Take a copy of the query string, if there is any. The string will
    be free'ed when the row is destroyed. If there is no query string,
    we don't do anything special.
   */

  if (query.str)
3026 3027
  {
    char *str;
3028 3029
    if (!(str= my_strndup(PSI_INSTRUMENT_ME, query.str, query.length,
                          MYF(MY_WME))))
3030
      goto err;
3031 3032
    query.str= str;
  }
3033 3034 3035
  row= new delayed_row(query, duplic, ignore, log_on);
  if (row == NULL)
  {
3036
    my_free(query.str);
unknown's avatar
unknown committed
3037
    goto err;
3038
  }
unknown's avatar
unknown committed
3039

Etienne Guesnet's avatar
Etienne Guesnet committed
3040
  user_len= host_len= ip_length= 0;
3041 3042 3043 3044 3045 3046 3047 3048
  row->user= row->host= row->ip= NULL;
  if (thd->security_ctx)
  {
    if (thd->security_ctx->user)
      user_len= strlen(thd->security_ctx->user) + 1;
    if (thd->security_ctx->host)
      host_len= strlen(thd->security_ctx->host) + 1;
    if (thd->security_ctx->ip)
Etienne Guesnet's avatar
Etienne Guesnet committed
3049
      ip_length= strlen(thd->security_ctx->ip) + 1;
3050
  }
3051
  /* This can't be THREAD_SPECIFIC as it's freed in delayed thread */
3052 3053
  if (!(row->record= (char*) my_malloc(PSI_INSTRUMENT_ME,
                                       table->s->reclength +
Etienne Guesnet's avatar
Etienne Guesnet committed
3054
                                       user_len + host_len + ip_length,
3055
                                       MYF(MY_WME))))
unknown's avatar
unknown committed
3056
    goto err;
3057
  memcpy(row->record, table->record[0], table->s->reclength);
3058 3059 3060 3061 3062 3063 3064 3065 3066 3067 3068 3069 3070 3071 3072 3073

  if (thd->security_ctx)
  {
    if (thd->security_ctx->user)
    {
      row->user= row->record + table->s->reclength;
      memcpy(row->user, thd->security_ctx->user, user_len);
    }
    if (thd->security_ctx->host)
    {
      row->host= row->record + table->s->reclength + user_len;
      memcpy(row->host, thd->security_ctx->host, host_len);
    }
    if (thd->security_ctx->ip)
    {
      row->ip= row->record + table->s->reclength + user_len + host_len;
Etienne Guesnet's avatar
Etienne Guesnet committed
3074
      memcpy(row->ip, thd->security_ctx->ip, ip_length);
3075 3076 3077 3078 3079
    }
  }
  row->query_id= thd->query_id;
  row->thread_id= thd->thread_id;

3080 3081
  row->start_time=                thd->start_time;
  row->start_time_sec_part=       thd->start_time_sec_part;
3082
  row->query_start_sec_part_used= thd->used & THD::QUERY_START_SEC_PART_USED;
3083 3084 3085 3086 3087 3088 3089 3090 3091 3092
  /*
    those are for the binlog: LAST_INSERT_ID() has been evaluated at this
    time, so record does not need it, but statement-based binlogging of the
    INSERT will need when the row is actually inserted.
    As for SET INSERT_ID, DELAYED does not honour it (BUG#20830).
  */
  row->stmt_depends_on_first_successful_insert_id_in_prev_stmt=
    thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt;
  row->first_successful_insert_id_in_prev_stmt=
    thd->first_successful_insert_id_in_prev_stmt;
unknown's avatar
unknown committed
3093

3094 3095 3096 3097 3098
  /* Add session variable timezone
     Time_zone object will not be freed even the thread is ended.
     So we can get time_zone object from thread which handling delayed statement.
     See the comment of my_tz_find() for detail.
  */
3099
  if (thd->used & THD::TIME_ZONE_USED)
3100 3101 3102 3103 3104 3105 3106
  {
    row->time_zone = thd->variables.time_zone;
  }
  else
  {
    row->time_zone = NULL;
  }
3107
  /* Copy session variables. */
3108 3109
  row->auto_increment_increment= thd->variables.auto_increment_increment;
  row->auto_increment_offset=    thd->variables.auto_increment_offset;
3110 3111 3112
  row->sql_mode=                 thd->variables.sql_mode;
  row->auto_increment_field_not_null= table->auto_increment_field_not_null;

3113 3114 3115 3116 3117 3118 3119
  /* Copy the next forced auto increment value, if any. */
  if ((forced_auto_inc= thd->auto_inc_intervals_forced.get_next()))
  {
    row->forced_insert_id= forced_auto_inc->minimum();
    DBUG_PRINT("delayed", ("transmitting auto_inc: %lu",
                           (ulong) row->forced_insert_id));
  }
3120

unknown's avatar
unknown committed
3121 3122 3123
  di->rows.push_back(row);
  di->stacked_inserts++;
  di->status=1;
3124
  if (table->s->blob_fields)
unknown's avatar
unknown committed
3125
    unlink_blobs(table);
Marc Alff's avatar
Marc Alff committed
3126
  mysql_cond_signal(&di->cond);
unknown's avatar
unknown committed
3127 3128

  thread_safe_increment(delayed_rows_in_use,&LOCK_delayed_status);
Marc Alff's avatar
Marc Alff committed
3129
  mysql_mutex_unlock(&di->mutex);
unknown's avatar
unknown committed
3130 3131 3132 3133
  DBUG_RETURN(0);

 err:
  delete row;
Marc Alff's avatar
Marc Alff committed
3134
  mysql_mutex_unlock(&di->mutex);
unknown's avatar
unknown committed
3135 3136 3137
  DBUG_RETURN(1);
}

unknown's avatar
unknown committed
3138 3139 3140 3141
/**
  Signal the delayed insert thread that this user connection
  is finished using it for this statement.
*/
unknown's avatar
unknown committed
3142 3143 3144

static void end_delayed_insert(THD *thd)
{
3145
  DBUG_ENTER("end_delayed_insert");
3146
  Delayed_insert *di=thd->di;
Marc Alff's avatar
Marc Alff committed
3147
  mysql_mutex_lock(&di->mutex);
3148
  DBUG_PRINT("info",("tables in use: %d",di->tables_in_use));
unknown's avatar
unknown committed
3149 3150 3151
  if (!--di->tables_in_use || di->thd.killed)
  {						// Unlock table
    di->status=1;
Marc Alff's avatar
Marc Alff committed
3152
    mysql_cond_signal(&di->cond);
unknown's avatar
unknown committed
3153
  }
Marc Alff's avatar
Marc Alff committed
3154
  mysql_mutex_unlock(&di->mutex);
3155
  DBUG_VOID_RETURN;
unknown's avatar
unknown committed
3156 3157 3158 3159 3160 3161 3162
}


/* We kill all delayed threads when doing flush-tables */

void kill_delayed_threads(void)
{
3163
  DBUG_ENTER("kill_delayed_threads");
Marc Alff's avatar
Marc Alff committed
3164
  mysql_mutex_lock(&LOCK_delayed_insert); // For unlink from list
unknown's avatar
unknown committed
3165

3166
  I_List_iterator<Delayed_insert> it(delayed_threads);
3167 3168
  Delayed_insert *di;
  while ((di= it++))
unknown's avatar
unknown committed
3169
  {
3170
    mysql_mutex_lock(&di->thd.LOCK_thd_kill);
3171
    if (di->thd.killed < KILL_CONNECTION)
3172
      di->thd.set_killed_no_mutex(KILL_CONNECTION);
3173
    di->thd.abort_current_cond_wait(false);
3174
    mysql_mutex_unlock(&di->thd.LOCK_thd_kill);
unknown's avatar
unknown committed
3175
  }
Marc Alff's avatar
Marc Alff committed
3176
  mysql_mutex_unlock(&LOCK_delayed_insert); // For unlink from list
3177
  DBUG_VOID_RETURN;
unknown's avatar
unknown committed
3178 3179 3180
}


3181 3182 3183 3184 3185 3186 3187 3188 3189 3190 3191 3192 3193
/**
  A strategy for the prelocking algorithm which prevents the
  delayed insert thread from opening tables with engines which
  do not support delayed inserts.

  Particularly it allows to abort open_tables() as soon as we
  discover that we have opened a MERGE table, without acquiring
  metadata locks on underlying tables.
*/

class Delayed_prelocking_strategy : public Prelocking_strategy
{
public:
3194
  bool handle_routine(THD *thd, Query_tables_list *prelocking_ctx,
3195
                              Sroutine_hash_entry *rt, sp_head *sp,
3196 3197 3198 3199 3200
                              bool *need_prelocking) override;
  bool handle_table(THD *thd, Query_tables_list *prelocking_ctx,
                            TABLE_LIST *table_list, bool *need_prelocking) override;
  bool handle_view(THD *thd, Query_tables_list *prelocking_ctx,
                           TABLE_LIST *table_list, bool *need_prelocking) override;
3201 3202 3203
};


Sergei Golubchik's avatar
Sergei Golubchik committed
3204 3205
bool Delayed_prelocking_strategy::handle_table(THD *thd,
             Query_tables_list *prelocking_ctx,
3206 3207 3208 3209 3210 3211
             TABLE_LIST *table_list, bool *need_prelocking)
{
  DBUG_ASSERT(table_list->lock_type == TL_WRITE_DELAYED);

  if (!(table_list->table->file->ha_table_flags() & HA_CAN_INSERT_DELAYED))
  {
3212
    my_error(ER_DELAYED_NOT_SUPPORTED, MYF(0), table_list->table_name.str);
3213 3214 3215 3216 3217 3218
    return TRUE;
  }
  return FALSE;
}


Sergei Golubchik's avatar
Sergei Golubchik committed
3219 3220 3221
bool Delayed_prelocking_strategy::handle_routine(THD *thd,
               Query_tables_list *prelocking_ctx, Sroutine_hash_entry *rt,
               sp_head *sp, bool *need_prelocking)
3222 3223 3224 3225 3226 3227 3228 3229 3230 3231 3232 3233 3234 3235 3236 3237 3238
{
  /* LEX used by the delayed insert thread has no routines. */
  DBUG_ASSERT(0);
  return FALSE;
}


bool Delayed_prelocking_strategy::
handle_view(THD *thd, Query_tables_list *prelocking_ctx,
            TABLE_LIST *table_list, bool *need_prelocking)
{
  /* We don't open views in the delayed insert thread. */
  DBUG_ASSERT(0);
  return FALSE;
}


Konstantin Osipov's avatar
Konstantin Osipov committed
3239 3240 3241 3242 3243 3244 3245 3246 3247 3248
/**
   Open and lock table for use by delayed thread and check that
   this table is suitable for delayed inserts.

   @retval FALSE - Success.
   @retval TRUE  - Failure.
*/

bool Delayed_insert::open_and_lock_table()
{
3249 3250 3251 3252 3253
  Delayed_prelocking_strategy prelocking_strategy;

  /*
    Use special prelocking strategy to get ER_DELAYED_NOT_SUPPORTED
    error for tables with engines which don't support delayed inserts.
3254 3255 3256 3257

    We can't do auto-repair in insert delayed thread, as it would hang
    when trying to an exclusive MDL_LOCK on the table during repair
    as the connection thread has a SHARED_WRITE lock.
3258
  */
Konstantin Osipov's avatar
Konstantin Osipov committed
3259
  if (!(table= open_n_lock_single_table(&thd, &table_list,
3260
                                        TL_WRITE_DELAYED,
3261 3262
                                        MYSQL_OPEN_IGNORE_GLOBAL_READ_LOCK |
                                        MYSQL_OPEN_IGNORE_REPAIR,
3263
                                        &prelocking_strategy)))
Konstantin Osipov's avatar
Konstantin Osipov committed
3264
  {
3265 3266 3267
    /* If table was crashed, then upper level should retry open+repair */
    retry= table_list.crashed;
    thd.fatal_error();                      // Abort waiting inserts
Konstantin Osipov's avatar
Konstantin Osipov committed
3268 3269
    return TRUE;
  }
3270

3271
  if (table->triggers || table->check_constraints)
Konstantin Osipov's avatar
Konstantin Osipov committed
3272 3273
  {
    /*
3274 3275
      Table has triggers or check constraints. This is not an error, but we do
      not support these with delayed insert. Terminate the delayed
Konstantin Osipov's avatar
Konstantin Osipov committed
3276 3277 3278 3279 3280
      thread without an error and thus request lock upgrade.
    */
    return TRUE;
  }
  table->copy_blobs= 1;
3281 3282

  table->file->prepare_for_row_logging();
Konstantin Osipov's avatar
Konstantin Osipov committed
3283 3284 3285 3286
  return FALSE;
}


3287 3288 3289 3290 3291 3292 3293 3294 3295 3296 3297
/*
 * Create a new delayed insert thread
*/

pthread_handler_t handle_delayed_insert(void *arg)
{
  Delayed_insert *di=(Delayed_insert*) arg;
  THD *thd= &di->thd;

  pthread_detach_this_thread();
  /* Add thread to THD list so that's it's visible in 'show processlist' */
3298
  thd->set_start_time();
3299
  server_threads.insert(thd);
3300
  if (abort_loop)
3301
    thd->set_killed(KILL_CONNECTION);
3302 3303
  else
    thd->reset_killed();
3304

Marc Alff's avatar
Marc Alff committed
3305 3306
  mysql_thread_set_psi_id(thd->thread_id);

3307
  /*
Marc Alff's avatar
Marc Alff committed
3308
    Wait until the client runs into mysql_cond_wait(),
3309 3310 3311 3312 3313 3314
    where we free it after the table is opened and di linked in the list.
    If we did not wait here, the client might detect the opened table
    before it is linked to the list. It would release LOCK_delayed_create
    and allow another thread to create another handler for the same table,
    since it does not find one in the list.
  */
Marc Alff's avatar
Marc Alff committed
3315
  mysql_mutex_lock(&di->mutex);
3316 3317 3318
  if (my_thread_init())
  {
    /* Can't use my_error since store_globals has not yet been called */
3319
    thd->get_stmt_da()->set_error_status(ER_OUT_OF_RESOURCES);
3320
    di->handler_thread_initialized= TRUE;
3321
  }
3322 3323 3324 3325
  else
  {
    DBUG_ENTER("handle_delayed_insert");
    thd->thread_stack= (char*) &thd;
3326
    if (init_thr_lock())
3327
    {
3328
      thd->get_stmt_da()->set_error_status(ER_OUT_OF_RESOURCES);
3329
      di->handler_thread_initialized= TRUE;
3330 3331 3332 3333
      thd->fatal_error();
      goto err;
    }

3334 3335
    thd->store_globals();

3336
    thd->lex->sql_command= SQLCOM_INSERT;        // For innodb::store_lock()
3337

3338
    /*
3339 3340
      INSERT DELAYED has to go to row-based format because the time
      at which rows are inserted cannot be determined in mixed mode.
3341
    */
3342
    thd->set_current_stmt_binlog_format_row_if_mixed();
3343 3344
    /* Don't annotate insert delayed binlog events */
    thd->variables.binlog_annotate_row_events= 0;
3345

3346
    /*
3347 3348 3349 3350
      Clone tickets representing protection against GRL and the lock on
      the target table for the insert and add them to the list of granted
      metadata locks held by the handler thread. This is safe since the
      handler thread is not holding nor waiting on any metadata locks.
3351
    */
3352 3353
    if (thd->mdl_context.clone_ticket(&di->grl_protection) ||
        thd->mdl_context.clone_ticket(&di->table_list.mdl_request))
3354
    {
3355
      thd->release_transactional_locks();
3356 3357 3358 3359 3360 3361 3362 3363 3364 3365
      di->handler_thread_initialized= TRUE;
      goto err;
    }

    /*
      Now that the ticket has been cloned, it is safe for the connection
      thread to exit.
    */
    di->handler_thread_initialized= TRUE;
    di->table_list.mdl_request.ticket= NULL;
3366

3367 3368
    thd->set_query_id(next_query_id());

Konstantin Osipov's avatar
Konstantin Osipov committed
3369
    if (di->open_and_lock_table())
3370 3371
      goto err;

3372 3373 3374 3375 3376 3377 3378 3379 3380 3381
    /*
      INSERT DELAYED generally expects thd->lex->current_select to be NULL,
      since this is not an attribute of the current thread. This can lead to
      problems if the thread that spawned the current one disconnects.
      current_select will then point to freed memory. But current_select is
      required to resolve the partition function. So, after fulfilling that
      requirement, we set the current_select to 0.
    */
    thd->lex->current_select= NULL;

3382
    /* Tell client that the thread is initialized */
Marc Alff's avatar
Marc Alff committed
3383
    mysql_cond_signal(&di->cond_client);
3384

3385 3386 3387 3388 3389 3390
    /*
      Inform mdl that it needs to call mysql_lock_abort to abort locks
      for delayed insert.
    */
    thd->mdl_context.set_needs_thr_lock_abort(TRUE);

3391
    di->table->mark_columns_needed_for_insert();
3392 3393
    /* Mark all columns for write as we don't know which columns we get from user */
    bitmap_set_all(di->table->write_set);
3394

3395 3396 3397 3398 3399
    /* Now wait until we get an insert or lock to handle */
    /* We will not abort as long as a client thread uses this thread */

    for (;;)
    {
3400
      if (thd->killed)
3401 3402
      {
        uint lock_count;
3403
        DBUG_PRINT("delayed", ("Insert delayed killed"));
3404 3405 3406 3407
        /*
          Remove this from delay insert list so that no one can request a
          table from this
        */
Marc Alff's avatar
Marc Alff committed
3408 3409
        mysql_mutex_unlock(&di->mutex);
        mysql_mutex_lock(&LOCK_delayed_insert);
3410 3411
        di->unlink();
        lock_count=di->lock_count();
Marc Alff's avatar
Marc Alff committed
3412 3413
        mysql_mutex_unlock(&LOCK_delayed_insert);
        mysql_mutex_lock(&di->mutex);
3414 3415
        if (!lock_count && !di->tables_in_use && !di->stacked_inserts &&
            !thd->lock)
3416 3417 3418
          break;					// Time to die
      }

3419
      /* Shouldn't wait if killed or an insert is waiting. */
3420 3421 3422
      DBUG_PRINT("delayed",
                 ("thd->killed: %d  di->status: %d  di->stacked_inserts: %d",
                  thd->killed, di->status, di->stacked_inserts));
3423
      if (!thd->killed && !di->status && !di->stacked_inserts)
3424 3425 3426 3427 3428
      {
        struct timespec abstime;
        set_timespec(abstime, delayed_insert_timeout);

        /* Information for pthread_kill */
3429 3430
        mysql_mutex_unlock(&di->mutex);
        mysql_mutex_lock(&di->thd.mysys_var->mutex);
3431 3432
        di->thd.mysys_var->current_mutex= &di->mutex;
        di->thd.mysys_var->current_cond= &di->cond;
3433 3434
        mysql_mutex_unlock(&di->thd.mysys_var->mutex);
        mysql_mutex_lock(&di->mutex);
Sergei Golubchik's avatar
Sergei Golubchik committed
3435
        THD_STAGE_INFO(&(di->thd), stage_waiting_for_insert);
3436 3437

        DBUG_PRINT("info",("Waiting for someone to insert rows"));
3438
        while (!thd->killed && !di->status)
3439 3440
        {
          int error;
3441
          mysql_audit_release(thd);
Marc Alff's avatar
Marc Alff committed
3442
          error= mysql_cond_timedwait(&di->cond, &di->mutex, &abstime);
3443 3444 3445
#ifdef EXTRA_DEBUG
          if (error && error != EINTR && error != ETIMEDOUT)
          {
Marc Alff's avatar
Marc Alff committed
3446 3447 3448
            fprintf(stderr, "Got error %d from mysql_cond_timedwait\n", error);
            DBUG_PRINT("error", ("Got error %d from mysql_cond_timedwait",
                                 error));
3449 3450 3451
          }
#endif
          if (error == ETIMEDOUT || error == ETIME)
3452
            thd->set_killed(KILL_CONNECTION);
3453 3454
        }
        /* We can't lock di->mutex and mysys_var->mutex at the same time */
Marc Alff's avatar
Marc Alff committed
3455 3456
        mysql_mutex_unlock(&di->mutex);
        mysql_mutex_lock(&di->thd.mysys_var->mutex);
3457 3458
        di->thd.mysys_var->current_mutex= 0;
        di->thd.mysys_var->current_cond= 0;
Marc Alff's avatar
Marc Alff committed
3459 3460
        mysql_mutex_unlock(&di->thd.mysys_var->mutex);
        mysql_mutex_lock(&di->mutex);
3461
      }
3462 3463 3464 3465 3466 3467 3468 3469 3470

      /*
        The code depends on that the following ASSERT always hold.
        I don't want to accidently introduce and bugs in the following code
        in this commit, so I leave the small cleaning up of the code to
        a future commit
      */
      DBUG_ASSERT(thd->lock || di->stacked_inserts == 0);

3471
      DBUG_PRINT("delayed",
3472 3473
                 ("thd->killed: %d  di->status: %d  di->stacked_insert: %d  di->tables_in_use: %d  thd->lock: %d",
                  thd->killed, di->status, di->stacked_inserts, di->tables_in_use, thd->lock != 0));
3474

3475 3476 3477 3478 3479 3480 3481 3482 3483 3484 3485
      /*
        This is used to test see what happens if killed is sent before
        we have time to handle the insert requests.
      */
      DBUG_EXECUTE_IF("write_delay_wakeup",
                      if (!thd->killed && di->stacked_inserts)
                        my_sleep(500000);
                      );

      if (di->tables_in_use && ! thd->lock &&
          (!thd->killed || di->stacked_inserts))
3486
      {
3487
        thd->set_query_id(next_query_id());
3488 3489 3490 3491 3492 3493 3494 3495 3496 3497
        /*
          Request for new delayed insert.
          Lock the table, but avoid to be blocked by a global read lock.
          If we got here while a global read lock exists, then one or more
          inserts started before the lock was requested. These are allowed
          to complete their work before the server returns control to the
          client which requested the global read lock. The delayed insert
          handler will close the table and finish when the outstanding
          inserts are done.
        */
3498
        if (! (thd->lock= mysql_lock_tables(thd, &di->table, 1, 0)))
3499
        {
3500
          /* Fatal error */
3501
          thd->set_killed(KILL_CONNECTION);
3502
        }
Marc Alff's avatar
Marc Alff committed
3503
        mysql_cond_broadcast(&di->cond_client);
3504 3505 3506
      }
      if (di->stacked_inserts)
      {
3507 3508
        delayed_row *row;
        I_List_iterator<delayed_row> it(di->rows);
3509 3510
        my_thread_id cur_thd= di->thd.thread_id;

3511 3512
        while ((row= it++))
        {
3513
          if (cur_thd != row->thread_id)
3514
          {
3515 3516 3517 3518
            mysql_audit_external_lock_ex(&di->thd, row->thread_id,
                row->user, row->host, row->ip, row->query_id,
                di->table->s, F_WRLCK);
            cur_thd= row->thread_id;
3519 3520
          }
        }
3521 3522 3523
        if (di->handle_inserts())
        {
          /* Some fatal error */
3524
          thd->set_killed(KILL_CONNECTION);
3525 3526 3527 3528 3529 3530 3531 3532 3533 3534 3535
        }
      }
      di->status=0;
      if (!di->stacked_inserts && !di->tables_in_use && thd->lock)
      {
        /*
          No one is doing a insert delayed
          Unlock table so that other threads can use it
        */
        MYSQL_LOCK *lock=thd->lock;
        thd->lock=0;
Marc Alff's avatar
Marc Alff committed
3536
        mysql_mutex_unlock(&di->mutex);
3537 3538 3539 3540 3541 3542
        /*
          We need to release next_insert_id before unlocking. This is
          enforced by handler::ha_external_lock().
        */
        di->table->file->ha_release_auto_increment();
        mysql_unlock_tables(thd, lock);
Konstantin Osipov's avatar
Konstantin Osipov committed
3543
        trans_commit_stmt(thd);
3544
        di->group_count=0;
3545
        mysql_audit_release(thd);
3546 3547 3548 3549
        /*
          Reset binlog. We can't call ha_reset() for the table as this will
          reset the table maps we have calculated earlier.
        */
Marc Alff's avatar
Marc Alff committed
3550
        mysql_mutex_lock(&di->mutex);
3551
      }
3552 3553 3554 3555 3556 3557 3558

      /*
        Reset binlog. We can't call ha_reset() for the table as this will
        reset the table maps we have calculated earlier.
      */
      thd->reset_binlog_for_next_statement();

3559
      if (di->tables_in_use)
Marc Alff's avatar
Marc Alff committed
3560
        mysql_cond_broadcast(&di->cond_client); // If waiting clients
3561 3562 3563 3564 3565
    }

  err:
    DBUG_LEAVE;
  }
3566

3567 3568 3569 3570
  {
    DBUG_ENTER("handle_delayed_insert-cleanup");
    di->table=0;
    mysql_mutex_unlock(&di->mutex);
unknown's avatar
unknown committed
3571

3572 3573 3574 3575 3576 3577 3578 3579 3580
    /*
      Protect against mdl_locks trying to access open tables
      We use KILL_CONNECTION_HARD here to ensure that
      THD::notify_shared_lock() dosn't try to access open tables after
      this.
    */
    mysql_mutex_lock(&thd->LOCK_thd_data);
    thd->mdl_context.set_needs_thr_lock_abort(0);
    mysql_mutex_unlock(&thd->LOCK_thd_data);
3581
    thd->set_killed(KILL_CONNECTION_HARD);	        // If error
3582

3583
    close_thread_tables(thd);			// Free the table
3584
    thd->release_transactional_locks();
3585
    mysql_cond_broadcast(&di->cond_client);       // Safety
3586

3587 3588 3589 3590 3591 3592 3593 3594 3595
    mysql_mutex_lock(&LOCK_delayed_create);    // Because of delayed_get_table
    mysql_mutex_lock(&LOCK_delayed_insert);
    /*
      di should be unlinked from the thread handler list and have no active
      clients
    */
    delete di;
    mysql_mutex_unlock(&LOCK_delayed_insert);
    mysql_mutex_unlock(&LOCK_delayed_create);
unknown's avatar
unknown committed
3596

3597 3598
    DBUG_LEAVE;
  }
unknown's avatar
unknown committed
3599 3600
  my_thread_end();
  pthread_exit(0);
3601 3602

  return 0;
unknown's avatar
unknown committed
3603 3604 3605
}


3606
/* Remove all pointers to data for blob fields so that original table doesn't try to free them */
unknown's avatar
unknown committed
3607

3608
static void unlink_blobs(TABLE *table)
unknown's avatar
unknown committed
3609 3610 3611
{
  for (Field **ptr=table->field ; *ptr ; ptr++)
  {
3612
    if ((*ptr)->flags & BLOB_FLAG)
unknown's avatar
unknown committed
3613 3614 3615 3616 3617 3618
      ((Field_blob *) (*ptr))->clear_temporary();
  }
}

/* Free blobs stored in current row */

3619
static void free_delayed_insert_blobs(TABLE *table)
3620 3621 3622
{
  for (Field **ptr=table->field ; *ptr ; ptr++)
  {
3623
    if ((*ptr)->flags & BLOB_FLAG)
3624 3625 3626 3627 3628 3629 3630
      ((Field_blob *) *ptr)->free();
  }
}


/* set value field for blobs to point to data in record */

3631
static void set_delayed_insert_blobs(TABLE *table)
unknown's avatar
unknown committed
3632 3633 3634
{
  for (Field **ptr=table->field ; *ptr ; ptr++)
  {
3635
    if ((*ptr)->flags & BLOB_FLAG)
unknown's avatar
unknown committed
3636
    {
3637 3638 3639 3640
      Field_blob *blob= ((Field_blob *) *ptr);
      uchar *data= blob->get_ptr();
      if (data)
        blob->set_value(data);     // Set value.ptr() to point to data
unknown's avatar
unknown committed
3641 3642 3643 3644 3645
    }
  }
}


3646
bool Delayed_insert::handle_inserts(void)
unknown's avatar
unknown committed
3647 3648
{
  int error;
3649
  ulong max_rows;
3650
  bool using_ignore= 0, using_opt_replace= 0, using_bin_log;
3651
  delayed_row *row;
unknown's avatar
unknown committed
3652 3653 3654
  DBUG_ENTER("handle_inserts");

  /* Allow client to insert new rows */
Marc Alff's avatar
Marc Alff committed
3655
  mysql_mutex_unlock(&mutex);
unknown's avatar
unknown committed
3656 3657

  table->next_number_field=table->found_next_number_field;
3658
  table->use_all_columns();
unknown's avatar
unknown committed
3659

Sergei Golubchik's avatar
Sergei Golubchik committed
3660
  THD_STAGE_INFO(&thd, stage_upgrading_lock);
3661 3662
  if (thr_upgrade_write_delay_lock(*thd.lock->locks, delayed_lock,
                                   thd.variables.lock_wait_timeout))
unknown's avatar
unknown committed
3663
  {
3664 3665 3666 3667 3668
    /*
      This can happen if thread is killed either by a shutdown
      or if another thread is removing the current table definition
      from the table cache.
    */
3669
    my_error(ER_DELAYED_CANT_CHANGE_LOCK, MYF(ME_FATAL | ME_ERROR_LOG),
3670
             table->s->table_name.str);
unknown's avatar
unknown committed
3671 3672 3673
    goto err;
  }

Sergei Golubchik's avatar
Sergei Golubchik committed
3674
  THD_STAGE_INFO(&thd, stage_insert);
3675
  max_rows= delayed_insert_limit;
3676
  if (thd.killed || table->s->tdc->flushed)
unknown's avatar
unknown committed
3677
  {
3678
    thd.set_killed(KILL_SYSTEM_THREAD);
3679
    max_rows= ULONG_MAX;                     // Do as much as possible
unknown's avatar
unknown committed
3680 3681
  }

3682 3683
  if (table->file->ha_rnd_init_with_error(0))
    goto err;
3684 3685 3686 3687 3688
  /*
    We have to call prepare_for_row_logging() as the second call to
    handler_writes() will not have called decide_logging_format.
  */
  table->file->prepare_for_row_logging();
3689
  table->file->prepare_for_modify(true, true);
3690
  using_bin_log= table->file->row_logging;
3691

3692 3693 3694 3695 3696
  /*
    We can't use row caching when using the binary log because if
    we get a crash, then binary log will contain rows that are not yet
    written to disk, which will cause problems in replication.
  */
Marko Mäkelä's avatar
Marko Mäkelä committed
3697
  if (!using_bin_log && !table->s->long_unique_table)
3698
    table->file->extra(HA_EXTRA_WRITE_CACHE);
3699

Marc Alff's avatar
Marc Alff committed
3700
  mysql_mutex_lock(&mutex);
3701

unknown's avatar
unknown committed
3702 3703
  while ((row=rows.get()))
  {
3704
    int tmp_error;
unknown's avatar
unknown committed
3705
    stacked_inserts--;
Marc Alff's avatar
Marc Alff committed
3706
    mysql_mutex_unlock(&mutex);
3707
    memcpy(table->record[0],row->record,table->s->reclength);
3708 3709
    if (table->s->blob_fields)
      set_delayed_insert_blobs(table);
unknown's avatar
unknown committed
3710 3711

    thd.start_time=row->start_time;
3712
    thd.start_time_sec_part=row->start_time_sec_part;
3713
    thd.used= row->query_start_sec_part_used;
3714 3715 3716 3717 3718
    /*
      To get the exact auto_inc interval to store in the binlog we must not
      use values from the previous interval (of the previous rows).
    */
    bool log_query= (row->log_query && row->query.str != NULL);
3719 3720 3721
    DBUG_PRINT("delayed", ("query: '%s'  length: %lu", row->query.str ?
                           row->query.str : "[NULL]",
                           (ulong) row->query.length));
3722 3723
    if (log_query)
    {
3724 3725 3726 3727
      /*
        Guaranteed that the INSERT DELAYED STMT will not be here
        in SBR when mysql binlog is enabled.
      */
3728 3729
      DBUG_ASSERT(!mysql_bin_log.is_open() ||
                  thd.is_current_stmt_binlog_format_row());
3730

3731 3732 3733 3734 3735 3736 3737 3738
      /*
        This is the first value of an INSERT statement.
        It is the right place to clear a forced insert_id.
        This is usually done after the last value of an INSERT statement,
        but we won't know this in the insert delayed thread. But before
        the first value is sufficiently equivalent to after the last
        value of the previous statement.
      */
3739 3740 3741
      table->file->ha_release_auto_increment();
      thd.auto_inc_intervals_in_cur_stmt_for_binlog.empty();
    }
3742 3743 3744 3745
    thd.first_successful_insert_id_in_prev_stmt= 
      row->first_successful_insert_id_in_prev_stmt;
    thd.stmt_depends_on_first_successful_insert_id_in_prev_stmt= 
      row->stmt_depends_on_first_successful_insert_id_in_prev_stmt;
3746
    table->auto_increment_field_not_null= row->auto_increment_field_not_null;
unknown's avatar
unknown committed
3747

3748
    /* Copy the session variables. */
3749 3750
    thd.variables.auto_increment_increment= row->auto_increment_increment;
    thd.variables.auto_increment_offset=    row->auto_increment_offset;
3751 3752
    thd.variables.sql_mode=                 row->sql_mode;

3753 3754 3755 3756 3757 3758 3759
    /* Copy a forced insert_id, if any. */
    if (row->forced_insert_id)
    {
      DBUG_PRINT("delayed", ("received auto_inc: %lu",
                             (ulong) row->forced_insert_id));
      thd.force_one_auto_inc_interval(row->forced_insert_id);
    }
3760

3761
    info.ignore= row->ignore;
unknown's avatar
unknown committed
3762
    info.handle_duplicates= row->dup;
3763
    if (info.ignore ||
unknown's avatar
unknown committed
3764
	info.handle_duplicates != DUP_ERROR)
unknown's avatar
unknown committed
3765 3766 3767 3768
    {
      table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
      using_ignore=1;
    }
3769 3770 3771 3772 3773 3774 3775
    if (info.handle_duplicates == DUP_REPLACE &&
        (!table->triggers ||
         !table->triggers->has_delete_triggers()))
    {
      table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
      using_opt_replace= 1;
    }
unknown's avatar
unknown committed
3776 3777
    if (info.handle_duplicates == DUP_UPDATE)
      table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
unknown's avatar
unknown committed
3778
    thd.clear_error(); // reset error for binlog
3779

3780
    tmp_error= 0;
3781
    if (unlikely(table->vfield))
3782 3783
    {
      /*
3784 3785 3786
        Virtual fields where not calculated by caller as the temporary
        TABLE object used had vcol_set empty. Better to calculate them
        here to make the caller faster.
3787
      */
3788 3789
      tmp_error= table->update_virtual_fields(table->file,
                                              VCOL_UPDATE_FOR_WRITE);
3790 3791
    }

3792
    if (unlikely(tmp_error || write_record(&thd, table, &info, NULL)))
unknown's avatar
unknown committed
3793
    {
3794
      info.error_count++;				// Ignore errors
3795
      thread_safe_increment(delayed_insert_errors,&LOCK_delayed_status);
unknown's avatar
unknown committed
3796
      row->log_query = 0;
unknown's avatar
unknown committed
3797
    }
3798

unknown's avatar
unknown committed
3799 3800 3801 3802 3803
    if (using_ignore)
    {
      using_ignore=0;
      table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
    }
3804 3805 3806 3807 3808
    if (using_opt_replace)
    {
      using_opt_replace= 0;
      table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
    }
3809

3810
    if (table->s->blob_fields)
unknown's avatar
unknown committed
3811
      free_delayed_insert_blobs(table);
3812
    thread_safe_decrement(delayed_rows_in_use,&LOCK_delayed_status);
unknown's avatar
unknown committed
3813
    thread_safe_increment(delayed_insert_writes,&LOCK_delayed_status);
Marc Alff's avatar
Marc Alff committed
3814
    mysql_mutex_lock(&mutex);
unknown's avatar
unknown committed
3815

3816 3817 3818 3819 3820 3821
    /*
      Reset the table->auto_increment_field_not_null as it is valid for
      only one row.
    */
    table->auto_increment_field_not_null= FALSE;

unknown's avatar
unknown committed
3822
    delete row;
3823 3824 3825 3826 3827 3828 3829
    /*
      Let READ clients do something once in a while
      We should however not break in the middle of a multi-line insert
      if we have binary logging enabled as we don't want other commands
      on this table until all entries has been processed
    */
    if (group_count++ >= max_rows && (row= rows.head()) &&
3830
	(!(row->log_query & using_bin_log)))
unknown's avatar
unknown committed
3831 3832 3833 3834 3835
    {
      group_count=0;
      if (stacked_inserts || tables_in_use)	// Let these wait a while
      {
	if (tables_in_use)
Marc Alff's avatar
Marc Alff committed
3836
          mysql_cond_broadcast(&cond_client);   // If waiting clients
Sergei Golubchik's avatar
Sergei Golubchik committed
3837
	THD_STAGE_INFO(&thd, stage_reschedule);
Marc Alff's avatar
Marc Alff committed
3838
        mysql_mutex_unlock(&mutex);
3839
	if (unlikely((error=table->file->extra(HA_EXTRA_NO_CACHE))))
unknown's avatar
unknown committed
3840 3841 3842
	{
	  /* This should never happen */
	  table->file->print_error(error,MYF(0));
3843
	  sql_print_error("%s", thd.get_stmt_da()->message());
3844
          DBUG_PRINT("error", ("HA_EXTRA_NO_CACHE failed in loop"));
unknown's avatar
unknown committed
3845 3846
	  goto err;
	}
unknown's avatar
unknown committed
3847
	query_cache_invalidate3(&thd, table, 1);
3848 3849
	if (thr_reschedule_write_lock(*thd.lock->locks,
                                thd.variables.lock_wait_timeout))
unknown's avatar
unknown committed
3850
	{
3851 3852
          /* This is not known to happen. */
          my_error(ER_DELAYED_CANT_CHANGE_LOCK,
3853
                   MYF(ME_FATAL | ME_ERROR_LOG),
3854 3855
                   table->s->table_name.str);
          goto err;
unknown's avatar
unknown committed
3856
	}
Marko Mäkelä's avatar
Marko Mäkelä committed
3857
	if (!using_bin_log && !table->s->long_unique_table)
3858
	  table->file->extra(HA_EXTRA_WRITE_CACHE);
Marc Alff's avatar
Marc Alff committed
3859
        mysql_mutex_lock(&mutex);
Sergei Golubchik's avatar
Sergei Golubchik committed
3860
	THD_STAGE_INFO(&thd, stage_insert);
unknown's avatar
unknown committed
3861 3862
      }
      if (tables_in_use)
Marc Alff's avatar
Marc Alff committed
3863
        mysql_cond_broadcast(&cond_client);     // If waiting clients
unknown's avatar
unknown committed
3864 3865
    }
  }
3866

3867 3868
  table->file->ha_rnd_end();

3869
  if (WSREP((&thd)))
3870
    thd_proc_info(&thd, "Insert done");
3871
  else
3872
    thd_proc_info(&thd, 0);
Marc Alff's avatar
Marc Alff committed
3873
  mysql_mutex_unlock(&mutex);
3874

3875 3876 3877 3878 3879 3880 3881 3882 3883 3884 3885
  /*
    We need to flush the pending event when using row-based
    replication since the flushing normally done in binlog_query() is
    not done last in the statement: for delayed inserts, the insert
    statement is logged *before* all rows are inserted.

    We can flush the pending event without checking the thd->lock
    since the delayed insert *thread* is not inside a stored function
    or trigger.

    TODO: Move the logging to last in the sequence of rows.
3886
  */
3887 3888 3889
  if (table->file->row_logging &&
      thd.binlog_flush_pending_rows_event(TRUE,
                                          table->file->row_logging_has_trans))
3890
    goto err;
3891

3892
  if (unlikely((error=table->file->extra(HA_EXTRA_NO_CACHE))))
unknown's avatar
unknown committed
3893 3894
  {						// This shouldn't happen
    table->file->print_error(error,MYF(0));
3895
    sql_print_error("%s", thd.get_stmt_da()->message());
3896
    DBUG_PRINT("error", ("HA_EXTRA_NO_CACHE failed after loop"));
unknown's avatar
unknown committed
3897 3898
    goto err;
  }
unknown's avatar
unknown committed
3899
  query_cache_invalidate3(&thd, table, 1);
Marc Alff's avatar
Marc Alff committed
3900
  mysql_mutex_lock(&mutex);
unknown's avatar
unknown committed
3901 3902 3903
  DBUG_RETURN(0);

 err:
3904 3905 3906
#ifndef DBUG_OFF
  max_rows= 0;                                  // For DBUG output
#endif
3907
  /* Remove all not used rows */
3908
  mysql_mutex_lock(&mutex);
3909 3910
  while ((row=rows.get()))
  {
3911 3912 3913
    if (table->s->blob_fields)
    {
      memcpy(table->record[0],row->record,table->s->reclength);
3914
      set_delayed_insert_blobs(table);
3915 3916
      free_delayed_insert_blobs(table);
    }
3917 3918 3919
    delete row;
    thread_safe_increment(delayed_insert_errors,&LOCK_delayed_status);
    stacked_inserts--;
3920 3921 3922
#ifndef DBUG_OFF
    max_rows++;
#endif
3923
  }
3924
  DBUG_PRINT("error", ("dropped %lu rows after an error", max_rows));
unknown's avatar
unknown committed
3925 3926 3927
  thread_safe_increment(delayed_insert_errors, &LOCK_delayed_status);
  DBUG_RETURN(1);
}
3928
#endif /* EMBEDDED_LIBRARY */
unknown's avatar
unknown committed
3929 3930

/***************************************************************************
unknown's avatar
unknown committed
3931
  Store records in INSERT ... SELECT *
unknown's avatar
unknown committed
3932 3933
***************************************************************************/

unknown's avatar
VIEW  
unknown committed
3934 3935 3936 3937 3938 3939 3940 3941 3942

/*
  make insert specific preparation and checks after opening tables

  SYNOPSIS
    mysql_insert_select_prepare()
    thd         thread handler

  RETURN
3943 3944 3945
  0   OK
  > 0 Error
  < 0 Ok, ignore insert
unknown's avatar
VIEW  
unknown committed
3946 3947
*/

3948
int mysql_insert_select_prepare(THD *thd, select_result *sel_res)
unknown's avatar
VIEW  
unknown committed
3949
{
3950
  int res;
unknown's avatar
VIEW  
unknown committed
3951
  LEX *lex= thd->lex;
3952
  SELECT_LEX *select_lex= lex->first_select_lex();
unknown's avatar
VIEW  
unknown committed
3953
  DBUG_ENTER("mysql_insert_select_prepare");
unknown's avatar
unknown committed
3954

unknown's avatar
unknown committed
3955 3956
  /*
    SELECT_LEX do not belong to INSERT statement, so we can't add WHERE
unknown's avatar
unknown committed
3957
    clause if table is VIEW
unknown's avatar
unknown committed
3958
  */
Sergei Golubchik's avatar
Sergei Golubchik committed
3959

3960 3961
  if ((res= mysql_prepare_insert(thd, lex->query_tables, lex->field_list, 0,
                                 lex->update_list, lex->value_list,
3962
                                 lex->duplicates, lex->ignore,
3963 3964
                                 &select_lex->where, TRUE)))
    DBUG_RETURN(res);
unknown's avatar
unknown committed
3965

3966 3967 3968 3969 3970 3971 3972
  /*
    If sel_res is not empty, it means we have items in returing_list.
    So we prepare the list now
  */
  if (sel_res)
    sel_res->prepare(lex->returning()->item_list, NULL);

3973 3974 3975 3976 3977 3978 3979 3980 3981 3982 3983 3984 3985 3986 3987 3988
  List_iterator<TABLE_LIST> ti(select_lex->leaf_tables);
  TABLE_LIST *table;
  uint insert_tables;

  if (select_lex->first_cond_optimization)
  {
    /* Back up leaf_tables list. */
    Query_arena *arena= thd->stmt_arena, backup;
    arena= thd->activate_stmt_arena_if_needed(&backup);  // For easier test

    insert_tables= select_lex->insert_tables;
    while ((table= ti++) && insert_tables--)
    {
      select_lex->leaf_tables_exec.push_back(table);
      table->tablenr_exec= table->table->tablenr;
      table->map_exec= table->table->map;
Igor Babaev's avatar
Igor Babaev committed
3989
      table->maybe_null_exec= table->table->maybe_null;
3990 3991 3992 3993 3994
    }
    if (arena)
      thd->restore_active_arena(arena, &backup);
  }
  ti.rewind();
3995 3996 3997 3998 3999
  /*
    exclude first table from leaf tables list, because it belong to
    INSERT
  */
  /* skip all leaf tables belonged to view where we are insert */
4000 4001 4002 4003
  insert_tables= select_lex->insert_tables;
  while ((table= ti++) && insert_tables--)
    ti.remove();

4004
  DBUG_RETURN(0);
unknown's avatar
VIEW  
unknown committed
4005 4006 4007
}


4008 4009
select_insert::select_insert(THD *thd_arg, TABLE_LIST *table_list_par,
                             TABLE *table_par,
unknown's avatar
unknown committed
4010
                             List<Item> *fields_par,
unknown's avatar
unknown committed
4011 4012
                             List<Item> *update_fields,
                             List<Item> *update_values,
unknown's avatar
unknown committed
4013
                             enum_duplicates duplic,
4014 4015
                             bool ignore_check_option_errors,
                             select_result *result):
4016
  select_result_interceptor(thd_arg),
4017
  sel_result(result),
4018 4019
  table_list(table_list_par), table(table_par), fields(fields_par),
  autoinc_value_of_last_inserted_row(0),
4020
  insert_into_view(table_list_par && table_list_par->view != 0)
4021 4022
{
  bzero((char*) &info,sizeof(info));
unknown's avatar
unknown committed
4023 4024 4025 4026
  info.handle_duplicates= duplic;
  info.ignore= ignore_check_option_errors;
  info.update_fields= update_fields;
  info.update_values= update_values;
4027 4028
  info.view= (table_list_par->view ? table_list_par : 0);
  info.table_list= table_list_par;
4029 4030 4031
}


unknown's avatar
unknown committed
4032
int
4033
select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
unknown's avatar
unknown committed
4034
{
4035
  LEX *lex= thd->lex;
4036
  int res= 0;
4037
  table_map map= 0;
4038
  SELECT_LEX *lex_current_select_save= lex->current_select;
unknown's avatar
unknown committed
4039 4040
  DBUG_ENTER("select_insert::prepare");

4041
  unit= u;
4042

4043 4044 4045 4046 4047
  /*
    Since table in which we are going to insert is added to the first
    select, LEX::current_select should point to the first select while
    we are fixing fields from insert list.
  */
4048
  lex->current_select= lex->first_select_lex();
4049

Monty's avatar
Monty committed
4050 4051 4052 4053 4054
  res= (setup_returning_fields(thd, table_list) ||
        setup_fields(thd, Ref_ptr_array(), values, MARK_COLUMNS_READ, 0, 0,
                     0) ||
        check_insert_fields(thd, table_list, *fields, values,
                            !insert_into_view, 1, &map));
4055

4056 4057 4058 4059 4060 4061 4062
  if (!res)
  {
     /*
        Check that all colN=exprN pairs are compatible for assignment, e.g.:
          INSERT INTO t1 (col1, col2) VALUES (expr1, expr2);
          INSERT INTO t1 SET col1=expr1, col2=expr2;
     */
4063 4064
     res= table_list->table->check_assignability_opt_fields(*fields, values,
                                                            lex->ignore);
4065 4066
  }

unknown's avatar
unknown committed
4067 4068
  if (!res && fields->elements)
  {
Monty's avatar
Monty committed
4069 4070 4071 4072
    Abort_on_warning_instant_set aws(thd,
                                     !info.ignore && thd->is_strict_mode());
    res= check_that_all_fields_are_given_values(thd, table_list->table,
                                                table_list);
unknown's avatar
unknown committed
4073 4074 4075
  }

  if (info.handle_duplicates == DUP_UPDATE && !res)
4076
  {
4077
    Name_resolution_context *context= &lex->first_select_lex()->context;
4078 4079 4080 4081
    Name_resolution_context_state ctx_state;

    /* Save the state of the current name resolution context. */
    ctx_state.save_state(context, table_list);
4082 4083

    /* Perform name resolution only in the first table - 'table_list'. */
4084
    table_list->next_local= 0;
4085 4086
    context->resolve_in_table_list_only(table_list);

4087
    lex->first_select_lex()->no_wrap_view_item= TRUE;
4088 4089 4090 4091 4092 4093 4094 4095 4096 4097
    res= res ||
      check_update_fields(thd, context->table_list,
                          *info.update_fields, *info.update_values,
                          /*
                            In INSERT SELECT ON DUPLICATE KEY UPDATE col=x
                            'x' can legally refer to a non-inserted table.
                            'x' is not even resolved yet.
                           */
                          true,
                          &map);
4098
    lex->first_select_lex()->no_wrap_view_item= FALSE;
4099
    /*
4100 4101 4102 4103
      When we are not using GROUP BY and there are no ungrouped
      aggregate functions we can refer to other tables in the ON
      DUPLICATE KEY part.  We use next_name_resolution_table
      descructively, so check it first (views?)
4104
    */
4105
    DBUG_ASSERT (!table_list->next_name_resolution_table);
4106 4107
    if (lex->first_select_lex()->group_list.elements == 0 &&
        !lex->first_select_lex()->with_sum_func)
4108
    {
4109
      /*
4110 4111 4112 4113
        We must make a single context out of the two separate name
        resolution contexts : the INSERT table and the tables in the
        SELECT part of INSERT ... SELECT.  To do that we must
        concatenate the two lists
4114
      */  
unknown's avatar
unknown committed
4115 4116
      table_list->next_name_resolution_table= 
        ctx_state.get_first_name_resolution_table();
4117
    }
4118

4119
    res= res || setup_fields(thd, Ref_ptr_array(), *info.update_values,
4120 4121 4122 4123 4124 4125 4126
                             MARK_COLUMNS_READ, 0, NULL, 0) ||
                /*
                  Check that all col=expr pairs are compatible for assignment in
                    INSERT INTO t1 SELECT ... FROM t2
                      ON DUPLICATE KEY UPDATE col=expr [, col=expr]
                */
                TABLE::check_assignability_explicit_fields(*info.update_fields,
4127 4128
                                                           *info.update_values,
                                                           lex->ignore);
4129
    if (!res)
4130
    {
4131 4132 4133 4134 4135 4136 4137 4138
      /*
        Traverse the update values list and substitute fields from the
        select for references (Item_ref objects) to them. This is done in
        order to get correct values from those fields when the select
        employs a temporary table.
      */
      List_iterator<Item> li(*info.update_values);
      Item *item;
4139

4140 4141
      while ((item= li++))
      {
4142
        item->transform(thd, &Item::update_value_transformer,
4143
                        (uchar*)lex->current_select);
4144
      }
4145 4146 4147
    }

    /* Restore the current context. */
4148
    ctx_state.restore_state(context, table_list);
4149
  }
4150

4151 4152
  lex->current_select= lex_current_select_save;
  if (res)
unknown's avatar
unknown committed
4153
    DBUG_RETURN(1);
4154 4155 4156 4157 4158 4159 4160 4161 4162 4163
  /*
    if it is INSERT into join view then check_insert_fields already found
    real table for insert
  */
  table= table_list->table;

  /*
    Is table which we are changing used somewhere in other parts of
    query
  */
4164
  if (unique_table(thd, table_list, table_list->next_global, 0))
4165 4166
  {
    /* Using same table for INSERT and SELECT */
4167 4168
    lex->current_select->options|= OPTION_BUFFER_RESULT;
    lex->current_select->join->select_options|= OPTION_BUFFER_RESULT;
4169
  }
4170
  else if (!(lex->current_select->options & OPTION_BUFFER_RESULT) &&
4171 4172
           thd->locked_tables_mode <= LTM_LOCK_TABLES &&
           !table->s->long_unique_table)
4173 4174 4175 4176 4177 4178 4179
  {
    /*
      We must not yet prepare the result table if it is the same as one of the 
      source tables (INSERT SELECT). The preparation may disable 
      indexes on the result table, which may be used during the select, if it
      is the same table (Bug #6034). Do the preparation after the select phase
      in select_insert::prepare2().
4180 4181
      We won't start bulk inserts at all if this statement uses functions or
      should invoke triggers since they may access to the same table too.
4182
    */
4183
    table->file->ha_start_bulk_insert((ha_rows) 0);
4184
  }
4185
  restore_record(table,s->default_values);		// Get empty record
4186
  table->reset_default_fields();
unknown's avatar
unknown committed
4187
  table->next_number_field=table->found_next_number_field;
4188 4189

#ifdef HAVE_REPLICATION
4190
  if (thd->rgi_slave &&
4191 4192
      (info.handle_duplicates == DUP_UPDATE) &&
      (table->next_number_field != NULL) &&
4193
      rpl_master_has_bug(thd->rgi_slave->rli, 24432, TRUE, NULL, NULL))
4194 4195 4196
    DBUG_RETURN(1);
#endif

unknown's avatar
unknown committed
4197
  thd->cuted_fields=0;
4198
  bool create_lookup_handler= false;
unknown's avatar
unknown committed
4199
  if (info.ignore || info.handle_duplicates != DUP_ERROR)
4200
  {
4201
    create_lookup_handler= true;
unknown's avatar
unknown committed
4202
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
4203 4204 4205 4206 4207
    if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
    {
      if (table->file->ha_rnd_init_with_error(0))
        DBUG_RETURN(1);
    }
4208
  }
4209
  table->file->prepare_for_modify(true, create_lookup_handler);
4210 4211 4212
  if (info.handle_duplicates == DUP_REPLACE &&
      (!table->triggers || !table->triggers->has_delete_triggers()))
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
unknown's avatar
unknown committed
4213 4214
  if (info.handle_duplicates == DUP_UPDATE)
    table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
4215
  thd->abort_on_warning= !info.ignore && thd->is_strict_mode();
unknown's avatar
unknown committed
4216
  res= (table_list->prepare_where(thd, 0, TRUE) ||
4217
        table_list->prepare_check_option(thd));
4218 4219

  if (!res)
4220 4221 4222 4223
  {
     table->prepare_triggers_for_insert_stmt_or_event();
     table->mark_columns_needed_for_insert();
  }
4224

4225
  DBUG_RETURN(res);
unknown's avatar
unknown committed
4226 4227
}

4228

4229 4230 4231 4232 4233 4234 4235 4236
/*
  Finish the preparation of the result table.

  SYNOPSIS
    select_insert::prepare2()
    void

  DESCRIPTION
4237 4238 4239
    If the result table is the same as one of the source tables
    (INSERT SELECT), the result table is not finally prepared at the
    join prepair phase.  Do the final preparation now.
Sergei Golubchik's avatar
Sergei Golubchik committed
4240

4241 4242 4243 4244
  RETURN
    0   OK
*/

4245
int select_insert::prepare2(JOIN *)
4246 4247
{
  DBUG_ENTER("select_insert::prepare2");
4248 4249 4250 4251
  if (table->validate_default_values_of_unset_fields(thd))
    DBUG_RETURN(1);
  if (thd->lex->describe)
    DBUG_RETURN(0);
4252
  if (thd->lex->current_select->options & OPTION_BUFFER_RESULT &&
4253
      thd->locked_tables_mode <= LTM_LOCK_TABLES &&
Marko Mäkelä's avatar
Marko Mäkelä committed
4254
      !table->s->long_unique_table)
4255
    table->file->ha_start_bulk_insert((ha_rows) 0);
4256 4257 4258 4259 4260

  /* Same as the other variants of INSERT */
  if (sel_result &&
      sel_result->send_result_set_metadata(thd->lex->returning()->item_list,
                                Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
4261
    DBUG_RETURN(1);
unknown's avatar
unknown committed
4262
  DBUG_RETURN(0);
4263 4264 4265
}


4266 4267 4268 4269 4270 4271
void select_insert::cleanup()
{
  /* select_insert/select_create are never re-used in prepared statement */
  DBUG_ASSERT(0);
}

unknown's avatar
unknown committed
4272 4273
select_insert::~select_insert()
{
4274
  DBUG_ENTER("~select_insert");
4275
  sel_result= NULL;
4276
  if (table && table->is_created())
unknown's avatar
unknown committed
4277 4278
  {
    table->next_number_field=0;
4279
    table->auto_increment_field_not_null= FALSE;
4280
    table->file->ha_reset();
unknown's avatar
unknown committed
4281
  }
4282
  thd->count_cuted_fields= CHECK_FIELD_IGNORE;
unknown's avatar
unknown committed
4283
  thd->abort_on_warning= 0;
4284
  DBUG_VOID_RETURN;
unknown's avatar
unknown committed
4285 4286 4287
}


4288
int select_insert::send_data(List<Item> &values)
unknown's avatar
unknown committed
4289
{
4290
  DBUG_ENTER("select_insert::send_data");
unknown's avatar
unknown committed
4291
  bool error=0;
4292

unknown's avatar
unknown committed
4293
  thd->count_cuted_fields= CHECK_FIELD_WARN;	// Calculate cuted fields
4294
  if (store_values(values))
4295
    DBUG_RETURN(1);
4296
  thd->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
4297
  if (unlikely(thd->is_error()))
4298 4299
  {
    table->auto_increment_field_not_null= FALSE;
unknown's avatar
unknown committed
4300
    DBUG_RETURN(1);
4301
  }
4302

unknown's avatar
unknown committed
4303 4304
  if (table_list)                               // Not CREATE ... SELECT
  {
unknown's avatar
unknown committed
4305
    switch (table_list->view_check_option(thd, info.ignore)) {
unknown's avatar
unknown committed
4306 4307 4308 4309 4310
    case VIEW_CHECK_SKIP:
      DBUG_RETURN(0);
    case VIEW_CHECK_ERROR:
      DBUG_RETURN(1);
    }
unknown's avatar
unknown committed
4311
  }
4312

4313
  error= write_record(thd, table, &info, sel_result);
4314
  table->auto_increment_field_not_null= FALSE;
Sergei Golubchik's avatar
Sergei Golubchik committed
4315

4316
  if (likely(!error))
unknown's avatar
unknown committed
4317
  {
unknown's avatar
unknown committed
4318
    if (table->triggers || info.handle_duplicates == DUP_UPDATE)
unknown's avatar
unknown committed
4319 4320
    {
      /*
unknown's avatar
unknown committed
4321 4322 4323
        Restore fields of the record since it is possible that they were
        changed by ON DUPLICATE KEY UPDATE clause.
    
unknown's avatar
unknown committed
4324 4325 4326 4327 4328 4329 4330 4331
        If triggers exist then whey can modify some fields which were not
        originally touched by INSERT ... SELECT, so we have to restore
        their original values for the next row.
      */
      restore_record(table, s->default_values);
    }
    if (table->next_number_field)
    {
4332 4333 4334 4335 4336 4337 4338
      /*
        If no value has been autogenerated so far, we need to remember the
        value we just saw, we may need to send it to client in the end.
      */
      if (thd->first_successful_insert_id_in_cur_stmt == 0) // optimization
        autoinc_value_of_last_inserted_row= 
          table->next_number_field->val_int();
unknown's avatar
unknown committed
4339 4340 4341 4342 4343 4344
      /*
        Clear auto-increment field for the next record, if triggers are used
        we will clear it twice, but this should be cheap.
      */
      table->next_number_field->reset();
    }
unknown's avatar
unknown committed
4345
  }
unknown's avatar
unknown committed
4346
  DBUG_RETURN(error);
unknown's avatar
unknown committed
4347 4348 4349
}


4350
bool select_insert::store_values(List<Item> &values)
unknown's avatar
unknown committed
4351
{
4352
  DBUG_ENTER("select_insert::store_values");
4353
  bool error;
4354

4355
  table->reset_default_fields();
unknown's avatar
unknown committed
4356
  if (fields->elements)
4357
    error= fill_record_n_invoke_before_triggers(thd, table, *fields, values,
4358
                                                true, TRG_EVENT_INSERT);
unknown's avatar
unknown committed
4359
  else
4360
    error= fill_record_n_invoke_before_triggers(thd, table, table->field_to_fill(),
4361
                                                values, true, TRG_EVENT_INSERT);
4362

4363
  DBUG_RETURN(error);
unknown's avatar
unknown committed
4364 4365
}

4366
bool select_insert::prepare_eof()
unknown's avatar
unknown committed
4367
{
4368
  int error;
4369
  bool const trans_table= table->file->has_transactions_and_rollback();
unknown's avatar
unknown committed
4370
  bool changed;
4371 4372
  bool binary_logged= 0;
  killed_state killed_status= thd->killed;
4373 4374

  DBUG_ENTER("select_insert::prepare_eof");
4375
  DBUG_PRINT("enter", ("trans_table: %d, table_type: '%s'",
4376
                       trans_table, table->file->table_type()));
unknown's avatar
unknown committed
4377

Brave Galera Crew's avatar
Brave Galera Crew committed
4378 4379 4380 4381 4382 4383 4384
#ifdef WITH_WSREP
  error= (thd->wsrep_cs().current_error()) ? -1 :
    (thd->locked_tables_mode <= LTM_LOCK_TABLES) ?
#else
    error= (thd->locked_tables_mode <= LTM_LOCK_TABLES) ?
#endif /* WITH_WSREP */
    table->file->ha_end_bulk_insert() : 0;
4385

4386
  if (likely(!error) && unlikely(thd->is_error()))
4387
    error= thd->get_stmt_da()->sql_errno();
4388

4389
  if (info.ignore || info.handle_duplicates != DUP_ERROR)
4390 4391
      if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
        table->file->ha_rnd_end();
4392 4393 4394 4395 4396 4397 4398 4399 4400 4401
  if (error <= 0)
  {
    error= table->file->extra(HA_EXTRA_END_ALTER_COPY);
    if (error == HA_ERR_FOUND_DUPP_KEY)
    {
      uint key_nr= table->file->get_dup_key(error);
      if ((int)key_nr >= 0 && key_nr < table->s->keys)
        print_keydup_error(table, &table->key_info[key_nr], MYF(0));
    }
  }
unknown's avatar
unknown committed
4402
  table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
4403
  table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
4404

4405
  if (likely((changed= (info.copied || info.deleted || info.updated))))
4406 4407 4408 4409 4410
  {
    /*
      We must invalidate the table in the query cache before binlog writing
      and ha_autocommit_or_rollback.
    */
unknown's avatar
unknown committed
4411
    query_cache_invalidate3(thd, table, 1);
unknown's avatar
unknown committed
4412
  }
4413

4414 4415 4416 4417
  if (thd->transaction->stmt.modified_non_trans_table)
    thd->transaction->all.modified_non_trans_table= TRUE;
  thd->transaction->all.m_unsafe_rollback_flags|=
    (thd->transaction->stmt.m_unsafe_rollback_flags & THD_TRANS::DID_WAIT);
4418

unknown's avatar
unknown committed
4419
  DBUG_ASSERT(trans_table || !changed || 
4420
              thd->transaction->stmt.modified_non_trans_table);
unknown's avatar
unknown committed
4421

4422 4423 4424 4425 4426 4427
  /*
    Write to binlog before commiting transaction.  No statement will
    be written by the binlog_query() below in RBR mode.  All the
    events are in the transaction cache and will be written when
    ha_autocommit_or_rollback() is issued below.
  */
4428 4429 4430
  if ((WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()) &&
      (likely(!error) || thd->transaction->stmt.modified_non_trans_table ||
       thd->log_current_statement()))
4431
  {
4432
    int errcode= 0;
4433
    int res;
4434
    if (likely(!error))
unknown's avatar
unknown committed
4435
      thd->clear_error();
4436
    else
4437
      errcode= query_error_code(thd, killed_status == NOT_KILLED);
4438 4439
    StatementBinlog stmt_binlog(thd, !can_rollback_data() &&
                                thd->binlog_need_stmt_format(trans_table));
4440 4441 4442 4443
    res= thd->binlog_query(THD::ROW_QUERY_TYPE,
                           thd->query(), thd->query_length(),
                           trans_table, FALSE, FALSE, errcode);
    if (res > 0)
4444
    {
4445
      table->file->ha_release_auto_increment();
4446
      DBUG_RETURN(true);
4447
    }
4448
    binary_logged= res == 0 || !table->s->tmp_table;
4449
  }
4450 4451 4452 4453
  table->s->table_creation_was_logged|= binary_logged;
  table->file->ha_release_auto_increment();

  if (unlikely(error))
unknown's avatar
unknown committed
4454
  {
4455 4456
    table->file->print_error(error,MYF(0));
    DBUG_RETURN(true);
unknown's avatar
unknown committed
4457
  }
4458

4459 4460 4461 4462 4463
  DBUG_RETURN(false);
}

bool select_insert::send_ok_packet() {
  char  message[160];                           /* status message */
4464 4465
  ulonglong row_count;                          /* rows affected */
  ulonglong id;                                 /* last insert-id */
4466
  DBUG_ENTER("select_insert::send_ok_packet");
4467

4468
  if (info.ignore)
4469 4470 4471
    my_snprintf(message, sizeof(message), ER(ER_INSERT_INFO),
                (ulong) info.records, (ulong) (info.records - info.copied),
                (long) thd->get_stmt_da()->current_statement_warn_count());
unknown's avatar
unknown committed
4472
  else
4473 4474 4475 4476
    my_snprintf(message, sizeof(message), ER(ER_INSERT_INFO),
                (ulong) info.records, (ulong) (info.deleted + info.updated),
                (long) thd->get_stmt_da()->current_statement_warn_count());

4477
  row_count= info.copied + info.deleted +
4478 4479 4480
    ((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
     info.touched : info.updated);

4481 4482 4483 4484 4485
  id= (thd->first_successful_insert_id_in_cur_stmt > 0) ?
    thd->first_successful_insert_id_in_cur_stmt :
    (thd->arg_of_last_insert_id_function ?
     thd->first_successful_insert_id_in_prev_stmt :
     (info.copied ? autoinc_value_of_last_inserted_row : 0));
4486

4487 4488 4489 4490 4491 4492 4493 4494
  /*
    Client expects an EOF/OK packet If LEX::has_returning and if result set
    meta was sent. See explanation for other variants of INSERT.
  */
  if (sel_result)
    sel_result->send_eof();
  else
    ::my_ok(thd, row_count, id, message);
4495 4496 4497 4498 4499 4500 4501 4502

  DBUG_RETURN(false);
}

bool select_insert::send_eof()
{
  bool res;
  DBUG_ENTER("select_insert::send_eof");
4503
  res= (prepare_eof() || (!suppress_my_ok && send_ok_packet()));
4504
  DBUG_RETURN(res);
unknown's avatar
unknown committed
4505 4506
}

4507 4508 4509
void select_insert::abort_result_set()
{
  bool binary_logged= 0;
4510
  DBUG_ENTER("select_insert::abort_result_set");
4511 4512 4513 4514 4515
  /*
    If the creation of the table failed (due to a syntax error, for
    example), no table will have been opened and therefore 'table'
    will be NULL. In that case, we still need to execute the rollback
    and the end of the function.
4516 4517 4518 4519

    If it fail due to inability to insert in multi-table view for example,
    table will be assigned with view table structure, but that table will
    not be opened really (it is dummy to check fields types & Co).
4520
   */
4521
  if (table && table->file->is_open())
4522
  {
4523
    bool changed, transactional_table;
4524 4525 4526 4527
    /*
      If we are not in prelocked mode, we end the bulk insert started
      before.
    */
Konstantin Osipov's avatar
Konstantin Osipov committed
4528
    if (thd->locked_tables_mode <= LTM_LOCK_TABLES)
4529 4530
      table->file->ha_end_bulk_insert();

4531 4532 4533 4534 4535
    if (table->file->inited)
      table->file->ha_rnd_end();
    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);

4536 4537 4538 4539 4540 4541 4542 4543 4544 4545 4546 4547 4548 4549
    /*
      If at least one row has been inserted/modified and will stay in
      the table (the table doesn't have transactions) we must write to
      the binlog (and the error code will make the slave stop).

      For many errors (example: we got a duplicate key error while
      inserting into a MyISAM table), no row will be added to the table,
      so passing the error to the slave will not help since there will
      be an error code mismatch (the inserts will succeed on the slave
      with no error).

      If table creation failed, the number of rows modified will also be
      zero, so no check for that is made.
    */
4550
    changed= (info.copied || info.deleted || info.updated);
4551
    transactional_table= table->file->has_transactions_and_rollback();
4552
    if (thd->transaction->stmt.modified_non_trans_table ||
4553
        thd->log_current_statement())
4554
    {
4555
        if (!can_rollback_data())
4556
          thd->transaction->all.modified_non_trans_table= TRUE;
4557

4558
        if(WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open())
4559
        {
4560 4561
          StatementBinlog stmt_binlog(thd, !can_rollback_data() &&
                                      thd->binlog_need_stmt_format(transactional_table));
4562
          int errcode= query_error_code(thd, thd->killed == NOT_KILLED);
4563
          int res;
4564
          /* error of writing binary log is ignored */
4565 4566 4567
          res= thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query(),
                                 thd->query_length(),
                                 transactional_table, FALSE, FALSE, errcode);
4568
          binary_logged= res == 0 || !table->s->tmp_table;
4569
        }
4570 4571
	if (changed)
	  query_cache_invalidate3(thd, table, 1);
4572
    }
4573
    DBUG_ASSERT(transactional_table || !changed ||
4574
		thd->transaction->stmt.modified_non_trans_table);
4575 4576

    table->s->table_creation_was_logged|= binary_logged;
4577 4578 4579 4580 4581 4582
    table->file->ha_release_auto_increment();
  }

  DBUG_VOID_RETURN;
}

unknown's avatar
unknown committed
4583 4584

/***************************************************************************
unknown's avatar
unknown committed
4585
  CREATE TABLE (SELECT) ...
unknown's avatar
unknown committed
4586 4587
***************************************************************************/

4588
Field *Item::create_field_for_create_select(MEM_ROOT *root, TABLE *table)
4589
{
4590 4591
  static Tmp_field_param param(false, false, false, false);
  Tmp_field_src src;
4592
  return create_tmp_field_ex(root, table, &src, &param);
4593 4594 4595
}


4596
/**
unknown's avatar
unknown committed
4597 4598
  Create table from lists of fields and items (or just return TABLE
  object for pre-opened existing table).
4599

4600 4601 4602 4603 4604 4605 4606 4607 4608 4609 4610 4611 4612 4613 4614 4615 4616 4617 4618 4619 4620 4621 4622 4623 4624 4625 4626 4627
  @param thd           [in]     Thread object
  @param create_info   [in]     Create information (like MAX_ROWS, ENGINE or
                                temporary table flag)
  @param create_table  [in]     Pointer to TABLE_LIST object providing database
                                and name for table to be created or to be open
  @param alter_info    [in/out] Initial list of columns and indexes for the
                                table to be created
  @param items         [in]     List of items which should be used to produce
                                rest of fields for the table (corresponding
                                fields will be added to the end of
                                alter_info->create_list)
  @param lock          [out]    Pointer to the MYSQL_LOCK object for table
                                created will be returned in this parameter.
                                Since this table is not included in THD::lock
                                caller is responsible for explicitly unlocking
                                this table.
  @param hooks         [in]     Hooks to be invoked before and after obtaining
                                table lock on the table being created.

  @note
    This function assumes that either table exists and was pre-opened and
    locked at open_and_lock_tables() stage (and in this case we just emit
    error or warning and return pre-opened TABLE object) or an exclusive
    metadata lock was acquired on table so we can safely create, open and
    lock table in it (we don't acquire metadata lock if this create is
    for temporary table).

  @note
unknown's avatar
unknown committed
4628 4629
    Since this function contains some logic specific to CREATE TABLE ...
    SELECT it should be changed before it can be used in other contexts.
4630

4631 4632
  @retval non-zero  Pointer to TABLE object for table created or opened
  @retval 0         Error
4633 4634
*/

Sergei Golubchik's avatar
Sergei Golubchik committed
4635
TABLE *select_create::create_table_from_items(THD *thd, List<Item> *items,
4636
                                              MYSQL_LOCK **lock)
4637
{
unknown's avatar
unknown committed
4638
  TABLE tmp_table;		// Used during 'Create_field()'
unknown's avatar
unknown committed
4639
  TABLE_SHARE share;
4640
  TABLE *table= 0;
4641
  uint select_field_count= items->elements;
4642 4643 4644
  /* Add selected items to field list */
  List_iterator_fast<Item> it(*items);
  Item *item;
4645
  bool save_table_creation_was_logged;
4646
  DBUG_ENTER("select_create::create_table_from_items");
4647

unknown's avatar
unknown committed
4648
  tmp_table.s= &share;
4649
  init_tmp_table_share(thd, &share, "", 0, "", "", true);
unknown's avatar
unknown committed
4650

4651
  tmp_table.s->db_create_options=0;
4652 4653
  tmp_table.null_row= 0;
  tmp_table.maybe_null= 0;
4654
  tmp_table.in_use= thd;
4655

4656
  if (!(thd->variables.option_bits & OPTION_EXPLICIT_DEF_TIMESTAMP))
4657
    promote_first_timestamp_column(&alter_info->create_list);
4658

4659
  if (create_info->fix_create_fields(thd, alter_info, *table_list))
4660 4661
    DBUG_RETURN(NULL);

4662 4663
  while ((item=it++))
  {
4664 4665
    Field *tmp_field= item->create_field_for_create_select(thd->mem_root,
                                                           &tmp_table);
4666

4667
    if (!tmp_field)
4668 4669 4670 4671 4672 4673 4674 4675 4676 4677 4678 4679 4680 4681 4682 4683 4684 4685 4686 4687
      DBUG_RETURN(NULL);

    Field *table_field;

    switch (item->type())
    {
    /*
      We have to take into account both the real table's fields and
      pseudo-fields used in trigger's body. These fields are used
      to copy defaults values later inside constructor of
      the class Create_field.
    */
    case Item::FIELD_ITEM:
    case Item::TRIGGER_FIELD_ITEM:
      table_field= ((Item_field *) item)->field;
      break;
    default:
      table_field= NULL;
    }

4688 4689
    Create_field *cr_field= new (thd->mem_root)
                                  Create_field(thd, tmp_field, table_field);
4690 4691 4692 4693

    if (!cr_field)
      DBUG_RETURN(NULL);

4694
    if (item->maybe_null())
4695
      cr_field->flags &= ~NOT_NULL_FLAG;
4696
    alter_info->create_list.push_back(cr_field, thd->mem_root);
4697
  }
unknown's avatar
unknown committed
4698

4699 4700 4701 4702 4703 4704 4705 4706 4707 4708
  /*
    Item*::type_handler() always returns pointers to
    type_handler_{time2|datetime2|timestamp2} no matter what
    the current mysql56_temporal_format says.
    Let's convert them according to mysql56_temporal_format.
    QQ: This perhaps should eventually be fixed to have Item*::type_handler()
    respect mysql56_temporal_format, and remove the upgrade from here.
  */
  Create_field::upgrade_data_types(alter_info->create_list);

Marko Mäkelä's avatar
Marko Mäkelä committed
4709
  if (create_info->check_fields(thd, alter_info,
4710
                                table_list->table_name,
4711 4712
                                table_list->db,
                                select_field_count))
4713 4714
    DBUG_RETURN(NULL);

4715
  DEBUG_SYNC(thd,"create_table_select_before_create");
unknown's avatar
unknown committed
4716

4717
  /* Check if LOCK TABLES + CREATE OR REPLACE of existing normal table*/
4718
  if (thd->locked_tables_mode && table_list->table &&
4719 4720 4721 4722
      !create_info->tmp_table())
  {
    /* Remember information about the locked table */
    create_info->pos_in_locked_tables=
4723 4724
      table_list->table->pos_in_locked_tables;
    create_info->mdl_ticket= table_list->table->mdl_ticket;
4725 4726
  }

4727
  /*
unknown's avatar
unknown committed
4728 4729 4730 4731 4732
    Create and lock table.

    Note that we either creating (or opening existing) temporary table or
    creating base table on which name we have exclusive lock. So code below
    should not cause deadlocks or races.
4733 4734 4735 4736 4737 4738 4739 4740 4741 4742

    We don't log the statement, it will be logged later.

    If this is a HEAP table, the automatic DELETE FROM which is written to the
    binlog when a HEAP table is opened for the first time since startup, must
    not be written: 1) it would be wrong (imagine we're in CREATE SELECT: we
    don't want to delete from it) 2) it would be written before the CREATE
    TABLE, which is a wrong order. So we keep binary logging disabled when we
    open_table().
  */
4743

4744
  if (!mysql_create_table_no_lock(thd, &ddl_log_state_create, &ddl_log_state_rm,
4745
                                  create_info, alter_info, NULL,
4746
                                  select_field_count, table_list))
4747
  {
4748 4749 4750 4751 4752 4753
    DEBUG_SYNC(thd,"create_table_select_before_open");

    /*
      If we had a temporary table or a table used with LOCK TABLES,
      it was closed by mysql_create()
    */
4754
    table_list->table= 0;
4755

4756
    if (!create_info->tmp_table())
4757
    {
4758 4759
      Open_table_context ot_ctx(thd, MYSQL_OPEN_REOPEN);
      TABLE_LIST::enum_open_strategy save_open_strategy;
unknown's avatar
unknown committed
4760

4761
      /* Force the newly created table to be opened */
4762 4763
      save_open_strategy= table_list->open_strategy;
      table_list->open_strategy= TABLE_LIST::OPEN_NORMAL;
4764 4765 4766 4767
      /*
        Here we open the destination table, on which we already have
        an exclusive metadata lock.
      */
4768
      if (open_table(thd, table_list, &ot_ctx))
unknown's avatar
unknown committed
4769
      {
4770 4771
        quick_rm_table(thd, create_info->db_type, &table_list->db,
                       table_case_name(create_info, &table_list->table_name),
4772
                       0);
unknown's avatar
unknown committed
4773
      }
4774
      /* Restore */
4775
      table_list->open_strategy= save_open_strategy;
4776
    }
4777
    else
Sergei Golubchik's avatar
Sergei Golubchik committed
4778
    {
4779 4780 4781 4782
      /*
        The pointer to the newly created temporary table has been stored in
        table->create_info.
      */
4783 4784
      table_list->table= create_info->table;
      if (!table_list->table)
4785 4786 4787 4788 4789 4790 4791 4792
      {
        /*
          This shouldn't happen as creation of temporary table should make
          it preparable for open. Anyway we can't drop temporary table if
          we are unable to find it.
        */
        DBUG_ASSERT(0);
      }
Marko Mäkelä's avatar
Marko Mäkelä committed
4793
      table_list->table->pos_in_table_list= table_list;
Sergei Golubchik's avatar
Sergei Golubchik committed
4794
    }
4795
  }
4796
  else
4797
    table_list->table= 0;                     // Create failed
4798
  
4799
  if (unlikely(!(table= table_list->table)))
4800
  {
4801 4802 4803 4804
    if (likely(!thd->is_error()))             // CREATE ... IF NOT EXISTS
      my_ok(thd);                             //   succeed, but did nothing
    ddl_log_complete(&ddl_log_state_rm);
    ddl_log_complete(&ddl_log_state_create);
4805
    DBUG_RETURN(NULL);
4806
  }
4807

4808
  DEBUG_SYNC(thd,"create_table_select_before_lock");
unknown's avatar
unknown committed
4809

4810
  table->reginfo.lock_type=TL_WRITE;
4811 4812 4813 4814 4815 4816 4817 4818

  /*
    Ensure that decide_logging_format(), called by mysql_lock_tables(), works
    with temporary tables that will be logged later if needed.
  */
  save_table_creation_was_logged= table->s->table_creation_was_logged;
  table->s->table_creation_was_logged= 1;

Konstantin Osipov's avatar
Konstantin Osipov committed
4819 4820 4821
  /*
    mysql_lock_tables() below should never fail with request to reopen table
    since it won't wait for the table lock (we have exclusive metadata lock on
4822
    the table) and thus can't get aborted.
Konstantin Osipov's avatar
Konstantin Osipov committed
4823
  */
4824 4825
  if (unlikely(!((*lock)= mysql_lock_tables(thd, &table, 1, 0)) ||
               postlock(thd, &table)))
4826
  {
4827
    /* purecov: begin tested */
4828 4829
    /*
      This can happen in innodb when you get a deadlock when using same table
4830
      in insert and select or when you run out of memory.
4831 4832
      It can also happen if there was a conflict in
      THD::decide_logging_format()
4833
    */
4834 4835
    if (!thd->is_error())
      my_error(ER_CANT_LOCK, MYF(0), my_errno);
4836 4837 4838 4839 4840
    if (*lock)
    {
      mysql_unlock_tables(thd, *lock);
      *lock= 0;
    }
4841
    drop_open_table(thd, table, &table_list->db, &table_list->table_name);
4842 4843
    ddl_log_complete(&ddl_log_state_rm);
    ddl_log_complete(&ddl_log_state_create);
Monty's avatar
Monty committed
4844
    DBUG_RETURN(NULL);
4845
    /* purecov: end */
4846
  }
4847
  table->s->table_creation_was_logged= save_table_creation_was_logged;
4848
  if (!table->s->tmp_table)
4849 4850 4851 4852 4853 4854 4855 4856 4857 4858 4859
    table->file->prepare_for_row_logging();

  /*
    If slave is converting a statement event to row events, log the original
    create statement as an annotated row
  */
#ifdef HAVE_REPLICATION
  if (thd->slave_thread && opt_replicate_annotate_row_events &&
      thd->is_current_stmt_binlog_format_row())
    thd->variables.binlog_annotate_row_events= 1;
#endif
4860 4861 4862 4863
  DBUG_RETURN(table);
}


4864 4865 4866 4867 4868 4869 4870 4871 4872 4873 4874 4875 4876 4877 4878 4879 4880 4881 4882
/*
  For row-based replication, the CREATE-SELECT statement is written
  in two pieces: the first one contain the CREATE TABLE statement
  necessary to create the table and the second part contain the rows
  that should go into the table.

  For non-temporary tables, the start of the CREATE-SELECT
  implicitly commits the previous transaction, and all events
  forming the statement will be stored the transaction cache. At end
  of the statement, the entire statement is committed as a
  transaction, and all events are written to the binary log.

  On the master, the table is locked for the duration of the
  statement, but since the CREATE part is replicated as a simple
  statement, there is no way to lock the table for accesses on the
  slave.  Hence, we have to hold on to the CREATE part of the
  statement until the statement has finished.
*/
int select_create::postlock(THD *thd, TABLE **tables)
4883
{
4884
  /*
4885 4886 4887
    NOTE: for row format CREATE TABLE must be logged before row data.
  */
  int error;
4888 4889 4890 4891
  TABLE_LIST *save_next_global= table_list->next_global;
  table_list->next_global= select_tables;
  error= thd->decide_logging_format(table_list);
  table_list->next_global= save_next_global;
Konstantin Osipov's avatar
Konstantin Osipov committed
4892

4893 4894
  if (unlikely(error))
    return error;
Konstantin Osipov's avatar
Konstantin Osipov committed
4895

4896 4897 4898
  TABLE const *const table = *tables;
  if (thd->is_current_stmt_binlog_format_row() &&
      !table->s->tmp_table)
4899
    return binlog_show_create_table_(thd, *tables, create_info);
4900
  return 0;
4901
}
4902

4903

4904 4905 4906 4907 4908 4909
int
select_create::prepare(List<Item> &_values, SELECT_LEX_UNIT *u)
{
  List<Item> values(_values, thd->mem_root);
  MYSQL_LOCK *extra_lock= NULL;
  DBUG_ENTER("select_create::prepare");
4910

4911
  unit= u;
4912 4913

  /*
4914 4915 4916
    Start a statement transaction before the create if we are using
    row-based replication for the statement.  If we are creating a
    temporary table, we need to start a statement transaction.
4917
  */
4918
  if (!thd->lex->tmp_table() &&
4919
      thd->is_current_stmt_binlog_format_row() &&
4920
      mysql_bin_log.is_open())
4921 4922 4923 4924
  {
    thd->binlog_start_trans_and_stmt();
  }

4925
  if (!(table= create_table_from_items(thd, &values, &extra_lock)))
4926
  {
4927
    if (create_info->or_replace())
4928 4929
    {
      /* Original table was deleted. We have to log it */
4930
      log_drop_table(thd, &table_list->db, &table_list->table_name,
4931 4932 4933
                     &create_info->org_storage_engine_name,
                     create_info->db_type == partition_hton,
                     &create_info->org_tabledef_version,
4934 4935 4936
                     thd->lex->tmp_table());
    }

4937 4938
    /* abort() deletes table */
    DBUG_RETURN(-1);
4939
  }
unknown's avatar
unknown committed
4940

4941 4942 4943 4944 4945 4946 4947 4948 4949
  if (create_info->tmp_table())
  {
    /*
      When the temporary table was created & opened in create_table_impl(),
      the table's TABLE_SHARE (and thus TABLE) object was also linked to THD
      temporary tables lists. So, we must temporarily remove it from the
      list to keep them inaccessible from inner statements.
      e.g. CREATE TEMPORARY TABLE `t1` AS SELECT * FROM `t1`;
    */
4950
    saved_tmp_table_share= thd->save_tmp_table_share(table_list->table);
4951 4952
  }

4953 4954 4955 4956
  if (extra_lock)
  {
    DBUG_ASSERT(m_plock == NULL);

4957
    if (create_info->tmp_table())
4958 4959 4960 4961 4962 4963 4964
      m_plock= &m_lock;
    else
      m_plock= &thd->extra_lock;

    *m_plock= extra_lock;
  }

4965
  if (table->s->fields < values.elements)
unknown's avatar
unknown committed
4966
  {
4967
    my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
unknown's avatar
unknown committed
4968 4969 4970
    DBUG_RETURN(-1);
  }

4971
  /* First field to copy */
4972
  field= table->field+table->s->fields;
unknown's avatar
unknown committed
4973 4974

  /* Mark all fields that are given values */
4975 4976 4977 4978 4979 4980 4981
  for (uint n= values.elements; n; )
  {
    if ((*--field)->invisible >= INVISIBLE_SYSTEM)
      continue;
    n--;
    bitmap_set_bit(table->write_set, (*field)->field_index);
  }
unknown's avatar
unknown committed
4982 4983 4984

  table->next_number_field=table->found_next_number_field;

4985
  restore_record(table,s->default_values);      // Get empty record
unknown's avatar
unknown committed
4986
  thd->cuted_fields=0;
4987
  bool create_lookup_handler= false;
unknown's avatar
unknown committed
4988
  if (info.ignore || info.handle_duplicates != DUP_ERROR)
4989
  {
4990
    create_lookup_handler= true;
unknown's avatar
unknown committed
4991
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
4992 4993 4994 4995 4996
    if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
    {
      if (table->file->ha_rnd_init_with_error(0))
        DBUG_RETURN(1);
    }
4997
  }
4998
  table->file->prepare_for_modify(true, create_lookup_handler);
4999 5000 5001
  if (info.handle_duplicates == DUP_REPLACE &&
      (!table->triggers || !table->triggers->has_delete_triggers()))
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
unknown's avatar
unknown committed
5002 5003
  if (info.handle_duplicates == DUP_UPDATE)
    table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
5004 5005
  if (thd->locked_tables_mode <= LTM_LOCK_TABLES &&
      !table->s->long_unique_table)
5006
  {
5007
    table->file->ha_start_bulk_insert((ha_rows) 0);
5008 5009
    if (thd->lex->duplicates == DUP_ERROR && !thd->lex->ignore)
      table->file->extra(HA_EXTRA_BEGIN_ALTER_COPY);
Marko Mäkelä's avatar
Marko Mäkelä committed
5010
    table->file->extra(HA_EXTRA_WRITE_CACHE);
5011
  }
5012
  thd->abort_on_warning= !info.ignore && thd->is_strict_mode();
5013 5014 5015
  if (check_that_all_fields_are_given_values(thd, table, table_list))
    DBUG_RETURN(1);
  table->mark_columns_needed_for_insert();
5016 5017
  // Mark table as used
  table->query_id= thd->query_id;
5018
  DBUG_RETURN(0);
unknown's avatar
unknown committed
5019 5020
}

5021

Marko Mäkelä's avatar
Marko Mäkelä committed
5022 5023
static int binlog_show_create_table_(THD *thd, TABLE *table,
                                     Table_specification_st *create_info)
5024 5025 5026
{
  /*
    Note 1: In RBR mode, we generate a CREATE TABLE statement for the
5027 5028 5029 5030 5031 5032 5033 5034
    created table by calling show_create_table().  In the event of an error,
    nothing should be written to the binary log, even if the table is
    non-transactional; therefore we pretend that the generated CREATE TABLE
    statement is for a transactional table.  The event will then be put in the
    transaction cache, and any subsequent events (e.g., table-map events and
    binrow events) will also be put there.  We can then use
    ha_autocommit_or_rollback() to either throw away the entire kaboodle of
    events, or write them to the binary log.
5035 5036 5037 5038 5039

    We write the CREATE TABLE statement here and not in prepare()
    since there potentially are sub-selects or accesses to information
    schema that will do a close_thread_tables(), destroying the
    statement transaction cache.
5040
  */
5041
  DBUG_ASSERT(thd->is_current_stmt_binlog_format_row());
5042
  StringBuffer<2048> query(system_charset_info);
5043
  int result;
5044
  TABLE_LIST tmp_table_list;
5045

5046
  tmp_table_list.reset();
5047
  tmp_table_list.table = table;
5048

5049 5050
  result= show_create_table(thd, &tmp_table_list, &query,
                            create_info, WITH_DB_NAME);
5051
  DBUG_ASSERT(result == 0); /* show_create_table() always return 0 */
5052

5053
  if (WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open())
5054
  {
Sergei Golubchik's avatar
Sergei Golubchik committed
5055
    int errcode= query_error_code(thd, thd->killed == NOT_KILLED);
5056 5057 5058
    result= thd->binlog_query(THD::STMT_QUERY_TYPE,
                              query.ptr(), query.length(),
                              /* is_trans */ TRUE,
5059
                              /* direct */ FALSE,
5060
                              /* suppress_use */ FALSE,
5061
                              errcode) > 0;
5062
  }
Brave Galera Crew's avatar
Brave Galera Crew committed
5063 5064 5065 5066 5067 5068 5069 5070 5071 5072
#ifdef WITH_WSREP
  if (thd->wsrep_trx().active())
  {
    WSREP_DEBUG("transaction already started for CTAS");
  }
  else
  {
    wsrep_start_transaction(thd, thd->wsrep_next_trx_id());
  }
#endif
5073
  return result;
5074 5075
}

5076 5077 5078 5079 5080 5081 5082 5083 5084 5085 5086

/**
   Log CREATE TABLE to binary log

   @param thd   Thread handler
   @param table Log create statement for this table

   This function is called from ALTER TABLE for a shared table converted
   to a not shared table.
*/

5087
bool binlog_create_table(THD *thd, TABLE *table, bool replace)
5088
{
5089 5090 5091 5092
  Table_specification_st create_info;
  bool result;
  ulonglong save_option_bits;

5093 5094 5095 5096
  /* Don't log temporary tables in row format */
  if (thd->variables.binlog_format == BINLOG_FORMAT_ROW &&
      table->s->tmp_table)
    return 0;
5097
  if (!thd->binlog_table_should_be_logged(&table->s->db))
5098 5099 5100 5101 5102 5103 5104
    return 0;

  /*
    We have to use ROW format to ensure that future row inserts will be
    logged
  */
  thd->set_current_stmt_binlog_format_row();
5105
  table->file->prepare_for_row_logging();
5106 5107 5108 5109 5110 5111 5112 5113 5114 5115

  create_info.lex_start();
  save_option_bits= thd->variables.option_bits;
  if (replace)
    create_info.set(DDL_options_st::OPT_OR_REPLACE);
  /* Ensure we write ENGINE=xxx and CHARSET=... to binary log */
  create_info.used_fields|= (HA_CREATE_USED_ENGINE |
                             HA_CREATE_USED_DEFAULT_CHARSET);
  /* Ensure we write all engine options to binary log */
  create_info.used_fields|= HA_CREATE_PRINT_ALL_OPTIONS;
Marko Mäkelä's avatar
Marko Mäkelä committed
5116
  result= binlog_show_create_table_(thd, table, &create_info) != 0;
5117 5118
  thd->variables.option_bits= save_option_bits;
  return result;
5119 5120 5121 5122 5123 5124 5125 5126 5127 5128 5129 5130 5131 5132 5133 5134 5135 5136 5137
}


/**
   Log DROP TABLE to binary log

   @param thd   Thread handler
   @param table Log create statement for this table

   This function is called from ALTER TABLE for a shared table converted
   to a not shared table.
*/

bool binlog_drop_table(THD *thd, TABLE *table)
{
  StringBuffer<2048> query(system_charset_info);
  /* Don't log temporary tables in row format */
  if (!table->s->table_creation_was_logged)
    return 0;
5138
  if (!thd->binlog_table_should_be_logged(&table->s->db))
5139 5140
    return 0;

Monty's avatar
Monty committed
5141
  query.append(STRING_WITH_LEN("DROP "));
5142
  if (table->s->tmp_table)
Monty's avatar
Monty committed
5143 5144
    query.append(STRING_WITH_LEN("TEMPORARY "));
  query.append(STRING_WITH_LEN("TABLE IF EXISTS "));
5145
  append_identifier(thd, &query, &table->s->db);
Monty's avatar
Monty committed
5146
  query.append('.');
5147 5148 5149 5150 5151 5152 5153 5154 5155 5156 5157
  append_identifier(thd, &query, &table->s->table_name);

  return thd->binlog_query(THD::STMT_QUERY_TYPE,
                           query.ptr(), query.length(),
                           /* is_trans */ TRUE,
                           /* direct */ FALSE,
                           /* suppress_use */ TRUE,
                           0) > 0;
}


5158
bool select_create::store_values(List<Item> &values)
unknown's avatar
unknown committed
5159
{
5160
  return fill_record_n_invoke_before_triggers(thd, table, field, values,
5161
                                              true, TRG_EVENT_INSERT);
unknown's avatar
unknown committed
5162 5163
}

unknown's avatar
unknown committed
5164

unknown's avatar
unknown committed
5165 5166
bool select_create::send_eof()
{
5167
  DBUG_ENTER("select_create::send_eof");
5168 5169 5170 5171 5172 5173

  /*
    The routine that writes the statement in the binary log
    is in select_insert::prepare_eof(). For that reason, we
    mark the flag at this point.
  */
5174
  if (table->s->tmp_table)
5175
    thd->transaction->stmt.mark_created_temp_table();
5176

5177 5178 5179
  if (thd->slave_thread)
    thd->variables.binlog_annotate_row_events= 0;

5180 5181 5182 5183 5184 5185 5186 5187 5188 5189 5190 5191 5192 5193 5194 5195 5196 5197
  debug_crash_here("ddl_log_create_before_binlog");

  /*
    In case of crash, we have to add DROP TABLE to the binary log as
    the CREATE TABLE will already be logged if we are not using row based
    replication.
  */
  if (!thd->is_current_stmt_binlog_format_row())
  {
    if (ddl_log_state_create.is_active())       // Not temporary table
      ddl_log_update_phase(&ddl_log_state_create, DDL_CREATE_TABLE_PHASE_LOG);
    /*
      We can ignore if we replaced an old table as ddl_log_state_create will
      now handle the logging of the drop if needed.
    */
    ddl_log_complete(&ddl_log_state_rm);
  }

5198
  if (prepare_eof())
5199
  {
5200
    abort_result_set();
5201
    DBUG_RETURN(true);
5202
  }
Monty's avatar
Monty committed
5203
  debug_crash_here("ddl_log_create_after_prepare_eof");
5204

5205
  if (table->s->tmp_table)
5206 5207 5208 5209 5210 5211 5212 5213 5214 5215 5216 5217 5218 5219 5220 5221 5222 5223 5224 5225
  {
    /*
      Now is good time to add the new table to THD temporary tables list.
      But, before that we need to check if same table got created by the sub-
      statement.
    */
    if (thd->find_tmp_table_share(table->s->table_cache_key.str,
                                  table->s->table_cache_key.length))
    {
      my_error(ER_TABLE_EXISTS_ERROR, MYF(0), table->alias.c_ptr());
      abort_result_set();
      DBUG_RETURN(true);
    }
    else
    {
      DBUG_ASSERT(saved_tmp_table_share);
      thd->restore_tmp_table_share(saved_tmp_table_share);
    }
  }

5226 5227 5228 5229 5230
  /*
    Do an implicit commit at end of statement for non-temporary
    tables.  This can fail, but we should unlock the table
    nevertheless.
  */
5231
  if (!table->s->tmp_table)
unknown's avatar
unknown committed
5232
  {
sjaakola's avatar
sjaakola committed
5233
#ifdef WITH_WSREP
5234
    if (WSREP(thd) &&
5235
        table->file->ht->db_type == DB_TYPE_INNODB)
5236
    {
Brave Galera Crew's avatar
Brave Galera Crew committed
5237 5238 5239 5240 5241
      if (thd->wsrep_trx_id() == WSREP_UNDEFINED_TRX_ID)
      {
        wsrep_start_transaction(thd, thd->wsrep_next_trx_id());
      }
      DBUG_ASSERT(thd->wsrep_trx_id() != WSREP_UNDEFINED_TRX_ID);
5242
      WSREP_DEBUG("CTAS key append for trx: %" PRIu64 " thd %llu query %lld ",
Brave Galera Crew's avatar
Brave Galera Crew committed
5243 5244
                  thd->wsrep_trx_id(), thd->thread_id, thd->query_id);

5245
      /*
5246 5247
        For CTAS, append table level exclusive key for created table
        and table level shared key for selected table.
5248
      */
5249
      int rcode= wsrep_append_table_keys(thd, table_list, table_list,
5250 5251 5252
              WSREP_SERVICE_KEY_EXCLUSIVE);
      rcode= rcode || wsrep_append_table_keys(thd, nullptr, select_tables,
              WSREP_SERVICE_KEY_SHARED);
Brave Galera Crew's avatar
Brave Galera Crew committed
5253 5254
      if (rcode)
      {
5255 5256 5257 5258
        DBUG_PRINT("wsrep", ("row key failed: %d", rcode));
        WSREP_ERROR("Appending table key for CTAS failed: %s, %d",
                    (wsrep_thd_query(thd)) ?
                    wsrep_thd_query(thd) : "void", rcode);
5259
        abort_result_set();
Brave Galera Crew's avatar
Brave Galera Crew committed
5260
        DBUG_RETURN(true);
5261 5262
      }
      /* If commit fails, we should be able to reset the OK status. */
Brave Galera Crew's avatar
Brave Galera Crew committed
5263
      thd->get_stmt_da()->set_overwrite_status(true);
5264
    }
5265
#endif /* WITH_WSREP */
5266 5267 5268 5269
    thd->binlog_xid= thd->query_id;
    /* Remember xid's for the case of row based logging */
    ddl_log_update_xid(&ddl_log_state_create, thd->binlog_xid);
    ddl_log_update_xid(&ddl_log_state_rm, thd->binlog_xid);
5270
    trans_commit_stmt(thd);
5271
    if (!(thd->variables.option_bits & OPTION_GTID_BEGIN))
Konstantin Osipov's avatar
Konstantin Osipov committed
5272
      trans_commit_implicit(thd);
Monty's avatar
Monty committed
5273 5274
    thd->binlog_xid= 0;

5275
#ifdef WITH_WSREP
Brave Galera Crew's avatar
Brave Galera Crew committed
5276
    if (WSREP(thd))
5277
    {
5278
      thd->get_stmt_da()->set_overwrite_status(FALSE);
5279
      mysql_mutex_lock(&thd->LOCK_thd_data);
Brave Galera Crew's avatar
Brave Galera Crew committed
5280
      if (wsrep_current_error(thd))
5281
      {
Brave Galera Crew's avatar
Brave Galera Crew committed
5282 5283
        WSREP_DEBUG("select_create commit failed, thd: %llu err: %s %s",
                    thd->thread_id,
5284 5285
                    wsrep_thd_transaction_state_str(thd),
                    wsrep_thd_query(thd));
5286
        mysql_mutex_unlock(&thd->LOCK_thd_data);
5287
        abort_result_set();
5288
        DBUG_RETURN(true);
5289
      }
5290
      mysql_mutex_unlock(&thd->LOCK_thd_data);
5291 5292
    }
#endif /* WITH_WSREP */
5293 5294 5295 5296 5297 5298 5299 5300 5301 5302

    /* Log query to ddl log */
    backup_log_info ddl_log;
    bzero(&ddl_log, sizeof(ddl_log));
    ddl_log.query= { C_STRING_WITH_LEN("CREATE") };
    if ((ddl_log.org_partitioned= (create_info->db_type == partition_hton)))
      ddl_log.org_storage_engine_name= create_info->new_storage_engine_name;
    else
      lex_string_set(&ddl_log.org_storage_engine_name,
                     ha_resolve_storage_engine_name(create_info->db_type));
5303 5304
    ddl_log.org_database=   table_list->db;
    ddl_log.org_table=      table_list->table_name;
5305 5306
    ddl_log.org_table_id=   create_info->tabledef_version;
    backup_log_ddl(&ddl_log);
5307
  }
5308 5309 5310 5311 5312 5313 5314 5315 5316
  /*
    If are using statement based replication the table will be deleted here
    in case of a crash as we can't use xid to check if the query was logged
    (as the query was logged before commit!)
  */
  debug_crash_here("ddl_log_create_after_binlog");
  ddl_log_complete(&ddl_log_state_rm);
  ddl_log_complete(&ddl_log_state_create);
  debug_crash_here("ddl_log_create_log_complete");
5317

5318 5319 5320 5321 5322 5323
  /*
    exit_done must only be set after last potential call to
    abort_result_set().
  */
  exit_done= 1;                                 // Avoid double calls

5324 5325
  send_ok_packet();

5326 5327 5328 5329 5330 5331 5332
  if (m_plock)
  {
    MYSQL_LOCK *lock= *m_plock;
    *m_plock= NULL;
    m_plock= NULL;

    if (create_info->pos_in_locked_tables)
5333
    {
5334 5335 5336 5337 5338 5339 5340 5341 5342 5343 5344 5345
      /*
        If we are under lock tables, we have created a table that was
        originally locked. We should add back the lock to ensure that
        all tables in the thd->open_list are locked!
      */
      table->mdl_ticket= create_info->mdl_ticket;

      /* The following should never fail, except if out of memory */
      if (!thd->locked_tables_list.restore_lock(thd,
                                                create_info->
                                                pos_in_locked_tables,
                                                table, lock))
5346
        DBUG_RETURN(false);                     // ok
5347
      /* Fail. Continue without locking the table */
5348
    }
5349
    mysql_unlock_tables(thd, lock);
unknown's avatar
unknown committed
5350
  }
5351
  DBUG_RETURN(false);
unknown's avatar
unknown committed
5352 5353
}

unknown's avatar
unknown committed
5354

5355
void select_create::abort_result_set()
unknown's avatar
unknown committed
5356
{
5357
  ulonglong save_option_bits;
5358
  DBUG_ENTER("select_create::abort_result_set");
5359

5360 5361 5362 5363 5364
  /* Avoid double calls, could happen in case of out of memory on cleanup */
  if (exit_done)
    DBUG_VOID_RETURN;
  exit_done= 1;

5365
  /*
5366
    In select_insert::abort_result_set() we roll back the statement, including
5367 5368 5369
    truncating the transaction cache of the binary log. To do this, we
    pretend that the statement is transactional, even though it might
    be the case that it was not.
5370 5371 5372 5373 5374 5375 5376 5377 5378

    We roll back the statement prior to deleting the table and prior
    to releasing the lock on the table, since there might be potential
    for failure if the rollback is executed after the drop or after
    unlocking the table.

    We also roll back the statement regardless of whether the creation
    of the table succeeded or not, since we need to reset the binary
    log state.
5379
    
5380
    However if there was an original table that was deleted, as part of
5381
    create or replace table, then we must log the statement.
5382
  */
5383 5384

  save_option_bits= thd->variables.option_bits;
5385
  thd->variables.option_bits&= ~OPTION_BIN_LOG;
5386
  select_insert::abort_result_set();
5387
  thd->transaction->stmt.modified_non_trans_table= FALSE;
5388 5389
  thd->variables.option_bits= save_option_bits;

unknown's avatar
unknown committed
5390 5391
  if (table)
  {
5392 5393 5394
    bool tmp_table= table->s->tmp_table;
    bool table_creation_was_logged= (!tmp_table ||
                                     table->s->table_creation_was_logged);
5395 5396 5397 5398

    /* CREATE SELECT failed. Remove all row events and clear caches */
    thd->binlog_remove_rows_events();

5399 5400 5401 5402 5403 5404
    if (tmp_table)
    {
      DBUG_ASSERT(saved_tmp_table_share);
      thd->restore_tmp_table_share(saved_tmp_table_share);
    }

5405 5406 5407 5408
    if (table->file->inited &&
        (info.ignore || info.handle_duplicates != DUP_ERROR) &&
        (table->file->ha_table_flags() & HA_DUPLICATE_POS))
      table->file->ha_rnd_end();
unknown's avatar
unknown committed
5409
    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
5410
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
5411
    table->auto_increment_field_not_null= FALSE;
5412 5413 5414 5415 5416 5417 5418 5419

    if (m_plock)
    {
      mysql_unlock_tables(thd, *m_plock);
      *m_plock= NULL;
      m_plock= NULL;
    }

5420
    drop_open_table(thd, table, &table_list->db, &table_list->table_name);
unknown's avatar
unknown committed
5421
    table=0;                                    // Safety
5422
    if (thd->log_current_statement())
5423
    {
5424 5425 5426 5427
      if (mysql_bin_log.is_open())
      {
        /* Remove logging of drop, create + insert rows */
        binlog_reset_cache(thd);
5428 5429 5430 5431 5432 5433 5434 5435 5436 5437 5438 5439 5440 5441 5442
        /* Original table was deleted. We have to log it */
        if (table_creation_was_logged)
        {
          thd->binlog_xid= thd->query_id;
          ddl_log_update_xid(&ddl_log_state_create, thd->binlog_xid);
          ddl_log_update_xid(&ddl_log_state_rm, thd->binlog_xid);
          debug_crash_here("ddl_log_create_before_binlog");
          log_drop_table(thd, &table_list->db, &table_list->table_name,
                         &create_info->org_storage_engine_name,
                         create_info->db_type == partition_hton,
                         &create_info->tabledef_version,
                         tmp_table);
          debug_crash_here("ddl_log_create_after_binlog");
          thd->binlog_xid= 0;
        }
5443
      }
5444
      else if (!tmp_table)
Monty's avatar
Monty committed
5445
      {
5446 5447 5448 5449 5450
        backup_log_info ddl_log;
        bzero(&ddl_log, sizeof(ddl_log));
        ddl_log.query= { C_STRING_WITH_LEN("DROP_AFTER_CREATE") };
        ddl_log.org_partitioned= (create_info->db_type == partition_hton);
        ddl_log.org_storage_engine_name= create_info->org_storage_engine_name;
5451 5452
        ddl_log.org_database=     table_list->db;
        ddl_log.org_table=        table_list->table_name;
5453 5454
        ddl_log.org_table_id=     create_info->tabledef_version;
        backup_log_ddl(&ddl_log);
Monty's avatar
Monty committed
5455
      }
5456
    }
unknown's avatar
unknown committed
5457
  }
5458

5459 5460
  ddl_log_complete(&ddl_log_state_rm);
  ddl_log_complete(&ddl_log_state_create);
Marko Mäkelä's avatar
Marko Mäkelä committed
5461

5462 5463 5464 5465 5466 5467
  if (create_info->table_was_deleted)
  {
    /* Unlock locked table that was dropped by CREATE. */
    (void) trans_rollback_stmt(thd);
    thd->locked_tables_list.unlock_locked_table(thd, create_info->mdl_ticket);
  }
5468

5469
  DBUG_VOID_RETURN;
unknown's avatar
unknown committed
5470
}