sql_insert.cc 110 KB
Newer Older
1
/* Copyright (C) 2000-2006 MySQL AB
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
2

bk@work.mysql.com's avatar
bk@work.mysql.com committed
3 4
   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
5
   the Free Software Foundation; version 2 of the License.
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
6

bk@work.mysql.com's avatar
bk@work.mysql.com committed
7 8 9 10
   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
11

bk@work.mysql.com's avatar
bk@work.mysql.com committed
12 13 14 15 16 17 18
   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */


/* Insert of records */

19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
/*
  INSERT DELAYED

  Insert delayed is distinguished from a normal insert by lock_type ==
  TL_WRITE_DELAYED instead of TL_WRITE. It first tries to open a
  "delayed" table (delayed_get_table()), but falls back to
  open_and_lock_tables() on error and proceeds as normal insert then.

  Opening a "delayed" table means to find a delayed insert thread that
  has the table open already. If this fails, a new thread is created and
  waited for to open and lock the table.

  If accessing the thread succeeded, in
  delayed_insert::get_local_table() the table of the thread is copied
  for local use. A copy is required because the normal insert logic
  works on a target table, but the other threads table object must not
  be used. The insert logic uses the record buffer to create a record.
  And the delayed insert thread uses the record buffer to pass the
  record to the table handler. So there must be different objects. Also
  the copied table is not included in the lock, so that the statement
  can proceed even if the real table cannot be accessed at this moment.

  Copying a table object is not a trivial operation. Besides the TABLE
  object there are the field pointer array, the field objects and the
  record buffer. After copying the field objects, their pointers into
  the record must be "moved" to point to the new record buffer.

  After this setup the normal insert logic is used. Only that for
  delayed inserts write_delayed() is called instead of write_record().
  It inserts the rows into a queue and signals the delayed insert thread
  instead of writing directly to the table.

  The delayed insert thread awakes from the signal. It locks the table,
  inserts the rows from the queue, unlocks the table, and waits for the
  next signal. It does normally live until a FLUSH TABLES or SHUTDOWN.

*/

bk@work.mysql.com's avatar
bk@work.mysql.com committed
57
#include "mysql_priv.h"
58 59
#include "sp_head.h"
#include "sql_trigger.h"
60
#include "sql_select.h"
61
#include "sql_show.h"
62
#include "slave.h"
63
#include "rpl_mi.h"
bk@work.mysql.com's avatar
bk@work.mysql.com committed
64

65
#ifndef EMBEDDED_LIBRARY
bk@work.mysql.com's avatar
bk@work.mysql.com committed
66
static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list);
67 68
static int write_delayed(THD *thd, TABLE *table, enum_duplicates dup,
                         LEX_STRING query, bool ignore, bool log_on);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
69
static void end_delayed_insert(THD *thd);
70
pthread_handler_t handle_delayed_insert(void *arg);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
71
static void unlink_blobs(register TABLE *table);
72
#endif
73
static bool check_view_insertability(THD *thd, TABLE_LIST *view);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
74 75 76 77 78 79 80 81 82 83 84

/* Define to force use of my_malloc() if the allocated memory block is big */

#ifndef HAVE_ALLOCA
#define my_safe_alloca(size, min_length) my_alloca(size)
#define my_safe_afree(ptr, size, min_length) my_afree(ptr)
#else
#define my_safe_alloca(size, min_length) ((size <= min_length) ? my_alloca(size) : my_malloc(size,MYF(0)))
#define my_safe_afree(ptr, size, min_length) if (size > min_length) my_free(ptr,MYF(0))
#endif

85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
/*
  Check that insert/update fields are from the same single table of a view.

  SYNOPSIS
    check_view_single_update()
    fields            The insert/update fields to be checked.
    view              The view for insert.
    map     [in/out]  The insert table map.

  DESCRIPTION
    This function is called in 2 cases:
    1. to check insert fields. In this case *map will be set to 0.
       Insert fields are checked to be all from the same single underlying
       table of the given view. Otherwise the error is thrown. Found table
       map is returned in the map parameter.
    2. to check update fields of the ON DUPLICATE KEY UPDATE clause.
       In this case *map contains table_map found on the previous call of
       the function to check insert fields. Update fields are checked to be
       from the same table as the insert fields.

  RETURN
    0   OK
    1   Error
*/

bool check_view_single_update(List<Item> &fields, TABLE_LIST *view,
                              table_map *map)
{
  /* it is join view => we need to find the table for update */
  List_iterator_fast<Item> it(fields);
  Item *item;
  TABLE_LIST *tbl= 0;            // reset for call to check_single_table()
  table_map tables= 0;

  while ((item= it++))
    tables|= item->used_tables();

  /* Check found map against provided map */
  if (*map)
  {
    if (tables != *map)
      goto error;
    return FALSE;
  }

  if (view->check_single_table(&tbl, tables, view) || tbl == 0)
    goto error;

  view->table= tbl->table;
  *map= tables;

  return FALSE;

error:
  my_error(ER_VIEW_MULTIUPDATE, MYF(0),
           view->view_db.str, view->view_name.str);
  return TRUE;
}

144

bk@work.mysql.com's avatar
bk@work.mysql.com committed
145
/*
146
  Check if insert fields are correct.
147 148 149 150 151 152 153

  SYNOPSIS
    check_insert_fields()
    thd                         The current thread.
    table                       The table for insert.
    fields                      The insert fields.
    values                      The insert values.
ingo@mysql.com's avatar
ingo@mysql.com committed
154
    check_unique                If duplicate values should be rejected.
155 156 157 158 159 160 161 162 163

  NOTE
    Clears TIMESTAMP_AUTO_SET_ON_INSERT from table->timestamp_field_type
    or leaves it as is, depending on if timestamp should be updated or
    not.

  RETURN
    0           OK
    -1          Error
bk@work.mysql.com's avatar
bk@work.mysql.com committed
164 165
*/

ingo@mysql.com's avatar
ingo@mysql.com committed
166 167
static int check_insert_fields(THD *thd, TABLE_LIST *table_list,
                               List<Item> &fields, List<Item> &values,
168
                               bool check_unique, table_map *map)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
169
{
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
170
  TABLE *table= table_list->table;
171

172 173
  if (!table_list->updatable)
  {
174
    my_error(ER_NON_INSERTABLE_TABLE, MYF(0), table_list->alias, "INSERT");
175 176 177
    return -1;
  }

bk@work.mysql.com's avatar
bk@work.mysql.com committed
178 179
  if (fields.elements == 0 && values.elements != 0)
  {
180 181 182 183 184 185
    if (!table)
    {
      my_error(ER_VIEW_NO_INSERT_FIELD_LIST, MYF(0),
               table_list->view_db.str, table_list->view_name.str);
      return -1;
    }
186
    if (values.elements != table->s->fields)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
187
    {
monty@mysql.com's avatar
monty@mysql.com committed
188
      my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
189 190
      return -1;
    }
hf@deer.(none)'s avatar
hf@deer.(none) committed
191
#ifndef NO_EMBEDDED_ACCESS_CHECKS
192
    if (grant_option)
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
193
    {
194 195
      Field_iterator_table field_it;
      field_it.set_table(table);
196
      if (check_grant_all_columns(thd, INSERT_ACL, &table->grant,
197
                                  table->s->db.str, table->s->table_name.str,
198
                                  &field_it))
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
199 200
        return -1;
    }
hf@deer.(none)'s avatar
hf@deer.(none) committed
201
#endif
202 203
    clear_timestamp_auto_bits(table->timestamp_field_type,
                              TIMESTAMP_AUTO_SET_ON_INSERT);
204 205 206 207
    /*
      No fields are provided so all fields must be provided in the values.
      Thus we set all bits in the write set.
    */
208
    bitmap_set_all(table->write_set);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
209 210 211
  }
  else
  {						// Part field list
212 213
    SELECT_LEX *select_lex= &thd->lex->select_lex;
    Name_resolution_context *context= &select_lex->context;
214
    Name_resolution_context_state ctx_state;
215
    int res;
216

bk@work.mysql.com's avatar
bk@work.mysql.com committed
217 218
    if (fields.elements != values.elements)
    {
monty@mysql.com's avatar
monty@mysql.com committed
219
      my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
220 221 222
      return -1;
    }

223
    thd->dup_field= 0;
224 225 226
    select_lex->no_wrap_view_item= TRUE;

    /* Save the state of the current name resolution context. */
227
    ctx_state.save_state(context, table_list);
228 229 230 231 232

    /*
      Perform name resolution only in the first table - 'table_list',
      which is the table that is inserted into.
    */
233
    table_list->next_local= 0;
234
    context->resolve_in_table_list_only(table_list);
235
    res= setup_fields(thd, 0, fields, MARK_COLUMNS_WRITE, 0, 0);
236 237

    /* Restore the current context. */
238
    ctx_state.restore_state(context, table_list);
239
    thd->lex->select_lex.no_wrap_view_item= FALSE;
240

241
    if (res)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
242
      return -1;
243

igor@rurik.mysql.com's avatar
igor@rurik.mysql.com committed
244
    if (table_list->effective_algorithm == VIEW_ALGORITHM_MERGE)
245
    {
246
      if (check_view_single_update(fields, table_list, map))
247
        return -1;
248
      table= table_list->table;
249
    }
250

251
    if (check_unique && thd->dup_field)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
252
    {
253
      my_error(ER_FIELD_SPECIFIED_TWICE, MYF(0), thd->dup_field->field_name);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
254 255
      return -1;
    }
256 257 258 259 260 261 262 263 264 265 266 267
    if (table->timestamp_field)	// Don't automaticly set timestamp if used
    {
      if (bitmap_is_set(table->write_set,
                        table->timestamp_field->field_index))
        clear_timestamp_auto_bits(table->timestamp_field_type,
                                  TIMESTAMP_AUTO_SET_ON_INSERT);
      else
      {
        bitmap_set_bit(table->write_set,
                       table->timestamp_field->field_index);
      }
    }
bk@work.mysql.com's avatar
bk@work.mysql.com committed
268
  }
269
  // For the values we need select_priv
hf@deer.(none)'s avatar
hf@deer.(none) committed
270
#ifndef NO_EMBEDDED_ACCESS_CHECKS
271
  table->grant.want_privilege= (SELECT_ACL & ~table->grant.privilege);
hf@deer.(none)'s avatar
hf@deer.(none) committed
272
#endif
273 274 275

  if (check_key_in_view(thd, table_list) ||
      (table_list->view &&
276
       check_view_insertability(thd, table_list)))
277
  {
278
    my_error(ER_NON_INSERTABLE_TABLE, MYF(0), table_list->alias, "INSERT");
279 280 281
    return -1;
  }

bk@work.mysql.com's avatar
bk@work.mysql.com committed
282 283 284 285
  return 0;
}


286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304
/*
  Check update fields for the timestamp field.

  SYNOPSIS
    check_update_fields()
    thd                         The current thread.
    insert_table_list           The insert table list.
    table                       The table for update.
    update_fields               The update fields.

  NOTE
    If the update fields include the timestamp field,
    remove TIMESTAMP_AUTO_SET_ON_UPDATE from table->timestamp_field_type.

  RETURN
    0           OK
    -1          Error
*/

ingo@mysql.com's avatar
ingo@mysql.com committed
305
static int check_update_fields(THD *thd, TABLE_LIST *insert_table_list,
306
                               List<Item> &update_fields, table_map *map)
307
{
ingo@mysql.com's avatar
ingo@mysql.com committed
308
  TABLE *table= insert_table_list->table;
309
  my_bool timestamp_mark;
310

311 312
  LINT_INIT(timestamp_mark);

313 314
  if (table->timestamp_field)
  {
315 316 317 318 319 320
    /*
      Unmark the timestamp field so that we can check if this is modified
      by update_fields
    */
    timestamp_mark= bitmap_test_and_clear(table->write_set,
                                          table->timestamp_field->field_index);
321 322
  }

323 324
  /* Check the fields we are going to modify */
  if (setup_fields(thd, 0, update_fields, MARK_COLUMNS_WRITE, 0, 0))
325 326
    return -1;

327 328 329 330
  if (insert_table_list->effective_algorithm == VIEW_ALGORITHM_MERGE &&
      check_view_single_update(update_fields, insert_table_list, map))
    return -1;

331 332 333
  if (table->timestamp_field)
  {
    /* Don't set timestamp column if this is modified. */
334 335
    if (bitmap_is_set(table->write_set,
                      table->timestamp_field->field_index))
336 337
      clear_timestamp_auto_bits(table->timestamp_field_type,
                                TIMESTAMP_AUTO_SET_ON_UPDATE);
338 339 340
    if (timestamp_mark)
      bitmap_set_bit(table->write_set,
                     table->timestamp_field->field_index);
341 342 343 344
  }
  return 0;
}

345
/*
346 347 348 349 350 351 352 353 354 355 356 357
  Prepare triggers  for INSERT-like statement.

  SYNOPSIS
    prepare_triggers_for_insert_stmt()
      table   Table to which insert will happen

  NOTE
    Prepare triggers for INSERT-like statement by marking fields
    used by triggers and inform handlers that batching of UPDATE/DELETE 
    cannot be done if there are BEFORE UPDATE/DELETE triggers.
*/

358
void prepare_triggers_for_insert_stmt(TABLE *table)
359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382
{
  if (table->triggers)
  {
    if (table->triggers->has_triggers(TRG_EVENT_DELETE,
                                      TRG_ACTION_AFTER))
    {
      /*
        The table has AFTER DELETE triggers that might access to 
        subject table and therefore might need delete to be done 
        immediately. So we turn-off the batching.
      */ 
      (void) table->file->extra(HA_EXTRA_DELETE_CANNOT_BATCH);
    }
    if (table->triggers->has_triggers(TRG_EVENT_UPDATE,
                                      TRG_ACTION_AFTER))
    {
      /*
        The table has AFTER UPDATE triggers that might access to subject 
        table and therefore might need update to be done immediately. 
        So we turn-off the batching.
      */ 
      (void) table->file->extra(HA_EXTRA_UPDATE_CANNOT_BATCH);
    }
  }
383
  table->mark_columns_needed_for_insert();
384 385
}

386

387 388 389 390 391
bool mysql_insert(THD *thd,TABLE_LIST *table_list,
                  List<Item> &fields,
                  List<List_item> &values_list,
                  List<Item> &update_fields,
                  List<Item> &update_values,
392 393
                  enum_duplicates duplic,
		  bool ignore)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
394
{
395
  int error, res;
396
  bool transactional_table, joins_freed= FALSE;
397
  bool changed;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
398 399 400 401
  uint value_count;
  ulong counter = 1;
  ulonglong id;
  COPY_INFO info;
402
  TABLE *table= 0;
monty@tik.mysql.fi's avatar
monty@tik.mysql.fi committed
403
  List_iterator_fast<List_item> its(values_list);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
404
  List_item *values;
405
  Name_resolution_context *context;
406
  Name_resolution_context_state ctx_state;
407
#ifndef EMBEDDED_LIBRARY
408
  char *query= thd->query;
409
#endif
410 411 412 413 414 415
  /*
    log_on is about delayed inserts only.
    By default, both logs are enabled (this won't cause problems if the server
    runs without --log-update or --log-bin).
  */
  bool log_on= ((thd->options & OPTION_BIN_LOG) ||
416
                (!(thd->security_ctx->master_access & SUPER_ACL)));
417
  thr_lock_type lock_type = table_list->lock_type;
monty@mysql.com's avatar
monty@mysql.com committed
418
  Item *unused_conds= 0;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
419 420
  DBUG_ENTER("mysql_insert");

421 422 423 424 425
  /*
    in safe mode or with skip-new change delayed insert to be regular
    if we are told to replace duplicates, the insert cannot be concurrent
    delayed insert changed to regular in slave thread
   */
426 427 428 429
#ifdef EMBEDDED_LIBRARY
  if (lock_type == TL_WRITE_DELAYED)
    lock_type=TL_WRITE;
#else
430 431
  if ((lock_type == TL_WRITE_DELAYED &&
       ((specialflag & (SPECIAL_NO_NEW_FUNC | SPECIAL_SAFE_MODE)) ||
432
	thd->slave_thread || !thd->variables.max_insert_delayed_threads)) ||
433 434
      (lock_type == TL_WRITE_CONCURRENT_INSERT && duplic == DUP_REPLACE) ||
      (duplic == DUP_UPDATE))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
435
    lock_type=TL_WRITE;
436
#endif
437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466
  if ((lock_type == TL_WRITE_DELAYED) &&
      (global_system_variables.binlog_format == BINLOG_FORMAT_STMT) &&
      log_on && mysql_bin_log.is_open() &&
      (values_list.elements > 1))
  {
    /*
      Statement-based binary logging does not work in this case, because:
      a) two concurrent statements may have their rows intermixed in the
      queue, leading to autoincrement replication problems on slave (because
      the values generated used for one statement don't depend only on the
      value generated for the first row of this statement, so are not
      replicable)
      b) if first row of the statement has an error the full statement is
      not binlogged, while next rows of the statement may be inserted.
      c) if first row succeeds, statement is binlogged immediately with a
      zero error code (i.e. "no error"), if then second row fails, query
      will fail on slave too and slave will stop (wrongly believing that the
      master got no error).
      So we fallback to non-delayed INSERT.
      Note that to be fully correct, we should test the "binlog format which
      the delayed thread is going to use for this row". But in the common case
      where the global binlog format is not changed and the session binlog
      format may be changed, that is equal to the global binlog format.
      We test it without mutex for speed reasons (condition rarely true), and
      in the common case (global not changed) it is as good as without mutex;
      if global value is changed, anyway there is uncertainty as the delayed
      thread may be old and use the before-the-change value.
    */
    lock_type= TL_WRITE;
  }
467
  table_list->lock_type= lock_type;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
468

469
#ifndef EMBEDDED_LIBRARY
bk@work.mysql.com's avatar
bk@work.mysql.com committed
470 471
  if (lock_type == TL_WRITE_DELAYED)
  {
472
    res= 1;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
473 474
    if (thd->locked_tables)
    {
475 476
      DBUG_ASSERT(table_list->db); /* Must be set in the parser */
      if (find_locked_table(thd, table_list->db, table_list->table_name))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
477
      {
478
	my_error(ER_DELAYED_INSERT_TABLE_LOCKED, MYF(0),
479
                 table_list->table_name);
480
	DBUG_RETURN(TRUE);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
481 482
      }
    }
483
    if ((table= delayed_get_table(thd,table_list)) && !thd->is_fatal_error)
484
    {
485 486 487 488 489
      /*
        Open tables used for sub-selects or in stored functions, will also
        cache these functions.
      */
      res= open_and_lock_tables(thd, table_list->next_global);
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
490 491 492 493 494 495
      /*
	First is not processed by open_and_lock_tables() => we need set
	updateability flags "by hands".
      */
      if (!table_list->derived && !table_list->view)
        table_list->updatable= 1;  // usual table
496
    }
497
    else
498
    {
499 500
      /* Too many delayed insert threads;  Use a normal insert */
      table_list->lock_type= lock_type= TL_WRITE;
501
      res= open_and_lock_tables(thd, table_list);
502
    }
bk@work.mysql.com's avatar
bk@work.mysql.com committed
503 504
  }
  else
505
#endif /* EMBEDDED_LIBRARY */
506
    res= open_and_lock_tables(thd, table_list);
507
  if (res || thd->is_fatal_error)
508
    DBUG_RETURN(TRUE);
509

bk@work.mysql.com's avatar
bk@work.mysql.com committed
510
  thd->proc_info="init";
511
  thd->used_tables=0;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
512
  values= its++;
513
  value_count= values->elements;
514

bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
515
  if (mysql_prepare_insert(thd, table_list, table, fields, values,
monty@mysql.com's avatar
monty@mysql.com committed
516
			   update_fields, update_values, duplic, &unused_conds,
517 518 519 520 521
                           FALSE,
                           (fields.elements || !value_count),
                           !ignore && (thd->variables.sql_mode &
                                       (MODE_STRICT_TRANS_TABLES |
                                        MODE_STRICT_ALL_TABLES))))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
522
    goto abort;
523

524 525 526
  /* mysql_prepare_insert set table_list->table if it was not set */
  table= table_list->table;

527
  context= &thd->lex->select_lex.context;
528 529 530 531 532 533 534 535 536
  /*
    These three asserts test the hypothesis that the resetting of the name
    resolution context below is not necessary at all since the list of local
    tables for INSERT always consists of one table.
  */
  DBUG_ASSERT(!table_list->next_local);
  DBUG_ASSERT(!context->table_list->next_local);
  DBUG_ASSERT(!context->first_name_resolution_table->next_name_resolution_table);

537
  /* Save the state of the current name resolution context. */
538
  ctx_state.save_state(context, table_list);
539 540 541 542 543

  /*
    Perform name resolution only in the first table - 'table_list',
    which is the table that is inserted into.
  */
544
  table_list->next_local= 0;
545 546
  context->resolve_in_table_list_only(table_list);

547
  while ((values= its++))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
548 549 550 551
  {
    counter++;
    if (values->elements != value_count)
    {
552
      my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), counter);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
553 554
      goto abort;
    }
555
    if (setup_fields(thd, 0, *values, MARK_COLUMNS_READ, 0, 0))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
556 557 558
      goto abort;
  }
  its.rewind ();
559 560
 
  /* Restore the current context. */
561
  ctx_state.restore_state(context, table_list);
562

bk@work.mysql.com's avatar
bk@work.mysql.com committed
563
  /*
564
    Fill in the given fields and dump it to the table file
bk@work.mysql.com's avatar
bk@work.mysql.com committed
565
  */
vva@eagle.mysql.r18.ru's avatar
vva@eagle.mysql.r18.ru committed
566
  info.records= info.deleted= info.copied= info.updated= 0;
567
  info.ignore= ignore;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
568
  info.handle_duplicates=duplic;
569 570
  info.update_fields= &update_fields;
  info.update_values= &update_values;
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
571
  info.view= (table_list->view ? table_list : 0);
572

573 574 575
  /*
    Count warnings for all inserts.
    For single line insert, generate an error if try to set a NOT NULL field
576
    to NULL.
577
  */
578
  thd->count_cuted_fields= ((values_list.elements == 1 &&
monty@mysql.com's avatar
monty@mysql.com committed
579
                             !ignore) ?
580 581
			    CHECK_FIELD_ERROR_FOR_NULL :
			    CHECK_FIELD_WARN);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
582 583 584
  thd->cuted_fields = 0L;
  table->next_number_field=table->found_next_number_field;

585 586 587 588 589 590 591 592
#ifdef HAVE_REPLICATION
  if (thd->slave_thread &&
      (info.handle_duplicates == DUP_UPDATE) &&
      (table->next_number_field != NULL) &&
      rpl_master_has_bug(&active_mi->rli, 24432))
    goto abort;
#endif

bk@work.mysql.com's avatar
bk@work.mysql.com committed
593 594
  error=0;
  thd->proc_info="update";
595
  if (duplic != DUP_ERROR || ignore)
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
596
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
597 598 599
  if (duplic == DUP_REPLACE &&
      (!table->triggers || !table->triggers->has_delete_triggers()))
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
600 601 602 603
  /*
    let's *try* to start bulk inserts. It won't necessary
    start them as values_list.elements should be greater than
    some - handler dependent - threshold.
604 605 606 607
    We should not start bulk inserts if this statement uses
    functions or invokes triggers since they may access
    to the same table and therefore should not see its
    inconsistent state created by this optimization.
608 609 610 611
    So we call start_bulk_insert to perform nesessary checks on
    values_list.elements, and - if nothing else - to initialize
    the code to make the call of end_bulk_insert() below safe.
  */
612
  if (lock_type != TL_WRITE_DELAYED && !thd->prelocked_mode)
613
    table->file->ha_start_bulk_insert(values_list.elements);
614

615
  thd->no_trans_update= 0;
616 617 618
  thd->abort_on_warning= (!ignore && (thd->variables.sql_mode &
                                       (MODE_STRICT_TRANS_TABLES |
                                        MODE_STRICT_ALL_TABLES)));
619

620 621
  prepare_triggers_for_insert_stmt(table);

622

623 624 625 626
  if (table_list->prepare_where(thd, 0, TRUE) ||
      table_list->prepare_check_option(thd))
    error= 1;

627
  while ((values= its++))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
628 629 630
  {
    if (fields.elements || !value_count)
    {
631
      restore_record(table,s->default_values);	// Get empty record
632 633 634
      if (fill_record_n_invoke_before_triggers(thd, fields, *values, 0,
                                               table->triggers,
                                               TRG_EVENT_INSERT))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
635
      {
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
636
	if (values_list.elements != 1 && !thd->net.report_error)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
637 638 639 640
	{
	  info.records++;
	  continue;
	}
641 642 643 644 645
	/*
	  TODO: set thd->abort_on_warning if values_list.elements == 1
	  and check that all items return warning in case of problem with
	  storing field.
        */
bk@work.mysql.com's avatar
bk@work.mysql.com committed
646 647 648 649 650 651
	error=1;
	break;
      }
    }
    else
    {
652
      if (thd->used_tables)			// Column used in values()
653
	restore_record(table,s->default_values);	// Get empty record
654
      else
655 656 657 658 659 660 661 662 663 664 665
      {
        /*
          Fix delete marker. No need to restore rest of record since it will
          be overwritten by fill_record() anyway (and fill_record() does not
          use default values in this case).
        */
	table->record[0][0]= table->s->default_values[0];
      }
      if (fill_record_n_invoke_before_triggers(thd, table->field, *values, 0,
                                               table->triggers,
                                               TRG_EVENT_INSERT))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
666
      {
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
667
	if (values_list.elements != 1 && ! thd->net.report_error)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
668 669 670 671 672 673 674 675
	{
	  info.records++;
	  continue;
	}
	error=1;
	break;
      }
    }
676

677 678 679
    if ((res= table_list->view_check_option(thd,
					    (values_list.elements == 1 ?
					     0 :
680
					     ignore))) ==
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
681 682 683
        VIEW_CHECK_SKIP)
      continue;
    else if (res == VIEW_CHECK_ERROR)
684
    {
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
685 686
      error= 1;
      break;
687
    }
688
#ifndef EMBEDDED_LIBRARY
689
    if (lock_type == TL_WRITE_DELAYED)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
690
    {
691 692
      LEX_STRING const st_query = { query, thd->query_length };
      error=write_delayed(thd, table, duplic, st_query, ignore, log_on);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
693 694 695
      query=0;
    }
    else
696
#endif
697
      error=write_record(thd, table ,&info);
698 699
    if (error)
      break;
700
    thd->row_count++;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
701
  }
702

703 704 705
  free_underlaid_joins(thd, &thd->lex->select_lex);
  joins_freed= TRUE;

706 707 708 709
  /*
    Now all rows are inserted.  Time to update logs and sends response to
    user
  */
710
#ifndef EMBEDDED_LIBRARY
bk@work.mysql.com's avatar
bk@work.mysql.com committed
711 712
  if (lock_type == TL_WRITE_DELAYED)
  {
713 714 715 716 717
    if (!error)
    {
      info.copied=values_list.elements;
      end_delayed_insert(thd);
    }
718
    query_cache_invalidate3(thd, table_list, 1);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
719 720
  }
  else
721
#endif
bk@work.mysql.com's avatar
bk@work.mysql.com committed
722
  {
723 724 725 726 727
    /*
      Do not do this release if this is a delayed insert, it would steal
      auto_inc values from the delayed_insert thread as they share TABLE.
    */
    table->file->ha_release_auto_increment();
728
    if (!thd->prelocked_mode && table->file->ha_end_bulk_insert() && !error)
729
    {
serg@serg.mylan's avatar
serg@serg.mylan committed
730 731
      table->file->print_error(my_errno,MYF(0));
      error=1;
732
    }
733
    transactional_table= table->file->has_transactions();
heikki@hundin.mysql.fi's avatar
heikki@hundin.mysql.fi committed
734

735
    if ((changed= (info.copied || info.deleted || info.updated)))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
736
    {
737 738 739 740 741 742 743
      /*
        Invalidate the table in the query cache if something changed.
        For the transactional algorithm to work the invalidation must be
        before binlog writing and ha_autocommit_or_rollback
      */
      query_cache_invalidate3(thd, table_list, 1);
      if (error <= 0 || !transactional_table)
744
      {
745
        if (mysql_bin_log.is_open())
746
        {
747 748
          if (error <= 0)
            thd->clear_error();
gluh@mysql.com's avatar
gluh@mysql.com committed
749 750 751 752 753
          if (thd->binlog_query(THD::ROW_QUERY_TYPE,
                                thd->query, thd->query_length,
                                transactional_table, FALSE) &&
              transactional_table)
          {
754
            error=1;
gluh@mysql.com's avatar
gluh@mysql.com committed
755
          }
756
        }
757 758
        if (!transactional_table)
          thd->options|=OPTION_STATUS_NO_TRANS_UPDATE;
759
      }
bk@work.mysql.com's avatar
bk@work.mysql.com committed
760
    }
761
    if (transactional_table)
762
      error=ha_autocommit_or_rollback(thd,error);
763

bk@work.mysql.com's avatar
bk@work.mysql.com committed
764 765 766
    if (thd->lock)
    {
      mysql_unlock_tables(thd, thd->lock);
767 768 769 770 771 772 773 774 775 776
      /*
        Invalidate the table in the query cache if something changed
        after unlocking when changes become fisible.
        TODO: this is workaround. right way will be move invalidating in
        the unlock procedure.
      */
      if (lock_type ==  TL_WRITE_CONCURRENT_INSERT && changed)
      {
        query_cache_invalidate3(thd, table_list, 1);
      }
bk@work.mysql.com's avatar
bk@work.mysql.com committed
777 778 779 780
      thd->lock=0;
    }
  }
  thd->proc_info="end";
781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796
  /*
    We'll report to the client this id:
    - if the table contains an autoincrement column and we successfully
    inserted an autogenerated value, the autogenerated value.
    - if the table contains no autoincrement column and LAST_INSERT_ID(X) was
    called, X.
    - if the table contains an autoincrement column, and some rows were
    inserted, the id of the last "inserted" row (if IGNORE, that value may not
    have been really inserted but ignored).
  */
  id= (thd->first_successful_insert_id_in_cur_stmt > 0) ?
    thd->first_successful_insert_id_in_cur_stmt :
    (thd->arg_of_last_insert_id_function ?
     thd->first_successful_insert_id_in_prev_stmt :
     ((table->next_number_field && info.copied) ?
     table->next_number_field->val_int() : 0));
bk@work.mysql.com's avatar
bk@work.mysql.com committed
797
  table->next_number_field=0;
798
  thd->count_cuted_fields= CHECK_FIELD_IGNORE;
799
  table->auto_increment_field_not_null= FALSE;
800
  if (duplic != DUP_ERROR || ignore)
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
801
    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
802 803 804
  if (duplic == DUP_REPLACE &&
      (!table->triggers || !table->triggers->has_delete_triggers()))
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
805

bk@work.mysql.com's avatar
bk@work.mysql.com committed
806 807 808 809
  if (error)
    goto abort;
  if (values_list.elements == 1 && (!(thd->options & OPTION_WARNINGS) ||
				    !thd->cuted_fields))
810 811
  {
    thd->row_count_func= info.copied+info.deleted+info.updated;
812
    send_ok(thd, (ulong) thd->row_count_func, id);
813
  }
814 815
  else
  {
bk@work.mysql.com's avatar
bk@work.mysql.com committed
816
    char buff[160];
817
    if (ignore)
818 819 820
      sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
	      (lock_type == TL_WRITE_DELAYED) ? (ulong) 0 :
	      (ulong) (info.records - info.copied), (ulong) thd->cuted_fields);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
821
    else
822
      sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
monty@mysql.com's avatar
monty@mysql.com committed
823
	      (ulong) (info.deleted+info.updated), (ulong) thd->cuted_fields);
824
    thd->row_count_func= info.copied+info.deleted+info.updated;
825
    ::send_ok(thd, (ulong) thd->row_count_func, id, buff);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
826
  }
827
  thd->abort_on_warning= 0;
828
  DBUG_RETURN(FALSE);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
829 830

abort:
831
#ifndef EMBEDDED_LIBRARY
bk@work.mysql.com's avatar
bk@work.mysql.com committed
832 833
  if (lock_type == TL_WRITE_DELAYED)
    end_delayed_insert(thd);
834
#endif
835
  if (table != NULL)
836
    table->file->ha_release_auto_increment();
837 838
  if (!joins_freed)
    free_underlaid_joins(thd, &thd->lex->select_lex);
839
  thd->abort_on_warning= 0;
840
  DBUG_RETURN(TRUE);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
841 842 843
}


bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
844 845 846 847 848
/*
  Additional check for insertability for VIEW

  SYNOPSIS
    check_view_insertability()
849
    thd     - thread handler
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
850 851
    view    - reference on VIEW

852 853 854 855 856 857
  IMPLEMENTATION
    A view is insertable if the folloings are true:
    - All columns in the view are columns from a table
    - All not used columns in table have a default values
    - All field in view are unique (not referring to the same column)

bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
858 859
  RETURN
    FALSE - OK
860 861 862
      view->contain_auto_increment is 1 if and only if the view contains an
      auto_increment field

bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
863 864 865
    TRUE  - can't be used for insert
*/

866
static bool check_view_insertability(THD * thd, TABLE_LIST *view)
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
867
{
868
  uint num= view->view->select_lex.item_list.elements;
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
869
  TABLE *table= view->table;
870 871 872
  Field_translator *trans_start= view->field_translation,
		   *trans_end= trans_start + num;
  Field_translator *trans;
monty@mysql.com's avatar
monty@mysql.com committed
873
  uint used_fields_buff_size= bitmap_buffer_size(table->s->fields);
874
  uint32 *used_fields_buff= (uint32*)thd->alloc(used_fields_buff_size);
875
  MY_BITMAP used_fields;
evgen@sunlight.local's avatar
evgen@sunlight.local committed
876
  enum_mark_columns save_mark_used_columns= thd->mark_used_columns;
877 878
  DBUG_ENTER("check_key_in_view");

879 880 881
  if (!used_fields_buff)
    DBUG_RETURN(TRUE);  // EOM

bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
882 883
  DBUG_ASSERT(view->table != 0 && view->field_translation != 0);

884
  VOID(bitmap_init(&used_fields, used_fields_buff, table->s->fields, 0));
885 886
  bitmap_clear_all(&used_fields);

bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
887
  view->contain_auto_increment= 0;
888 889 890 891
  /* 
    we must not set query_id for fields as they're not 
    really used in this context
  */
evgen@sunlight.local's avatar
evgen@sunlight.local committed
892
  thd->mark_used_columns= MARK_COLUMNS_NONE;
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
893
  /* check simplicity and prepare unique test of view */
894
  for (trans= trans_start; trans != trans_end; trans++)
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
895
  {
896
    if (!trans->item->fixed && trans->item->fix_fields(thd, &trans->item))
897
    {
evgen@sunlight.local's avatar
evgen@sunlight.local committed
898
      thd->mark_used_columns= save_mark_used_columns;
899 900
      DBUG_RETURN(TRUE);
    }
901
    Item_field *field;
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
902
    /* simple SELECT list entry (field without expression) */
bell@sanja.is.com.ua's avatar
merge  
bell@sanja.is.com.ua committed
903
    if (!(field= trans->item->filed_for_view_update()))
904
    {
evgen@sunlight.local's avatar
evgen@sunlight.local committed
905
      thd->mark_used_columns= save_mark_used_columns;
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
906
      DBUG_RETURN(TRUE);
907
    }
908
    if (field->field->unireg_check == Field::NEXT_NUMBER)
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
909 910
      view->contain_auto_increment= 1;
    /* prepare unique test */
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
911 912 913 914 915
    /*
      remove collation (or other transparent for update function) if we have
      it
    */
    trans->item= field;
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
916
  }
evgen@sunlight.local's avatar
evgen@sunlight.local committed
917
  thd->mark_used_columns= save_mark_used_columns;
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
918
  /* unique test */
919
  for (trans= trans_start; trans != trans_end; trans++)
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
920
  {
921
    /* Thanks to test above, we know that all columns are of type Item_field */
922
    Item_field *field= (Item_field *)trans->item;
923 924 925
    /* check fields belong to table in which we are inserting */
    if (field->field->table == table &&
        bitmap_fast_test_and_set(&used_fields, field->field->field_index))
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
926 927 928 929 930 931 932
      DBUG_RETURN(TRUE);
  }

  DBUG_RETURN(FALSE);
}


bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
933
/*
934
  Check if table can be updated
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
935 936

  SYNOPSIS
937 938
     mysql_prepare_insert_check_table()
     thd		Thread handle
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
939
     table_list		Table list
940 941
     fields		List of fields to be updated
     where		Pointer to where clause
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
942
     select_insert      Check is making for SELECT ... INSERT
943 944

   RETURN
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
945 946
     FALSE ok
     TRUE  ERROR
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
947
*/
948

bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
949
static bool mysql_prepare_insert_check_table(THD *thd, TABLE_LIST *table_list,
950
                                             List<Item> &fields,
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
951
                                             bool select_insert)
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
952
{
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
953
  bool insert_into_view= (table_list->view != 0);
954
  DBUG_ENTER("mysql_prepare_insert_check_table");
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
955

956 957 958 959 960 961 962
  /*
     first table in list is the one we'll INSERT into, requires INSERT_ACL.
     all others require SELECT_ACL only. the ACL requirement below is for
     new leaves only anyway (view-constituents), so check for SELECT rather
     than INSERT.
  */

963 964
  if (setup_tables_and_check_access(thd, &thd->lex->select_lex.context,
                                    &thd->lex->select_lex.top_join_list,
965
                                    table_list,
966
                                    &thd->lex->select_lex.leaf_tables,
967
                                    select_insert, INSERT_ACL, SELECT_ACL))
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
968
    DBUG_RETURN(TRUE);
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
969 970 971 972

  if (insert_into_view && !fields.elements)
  {
    thd->lex->empty_field_list_on_rset= 1;
973 974 975 976
    if (!table_list->table)
    {
      my_error(ER_VIEW_NO_INSERT_FIELD_LIST, MYF(0),
               table_list->view_db.str, table_list->view_name.str);
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
977
      DBUG_RETURN(TRUE);
978
    }
979
    DBUG_RETURN(insert_view_fields(thd, &fields, table_list));
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
980 981
  }

bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
982
  DBUG_RETURN(FALSE);
983 984 985 986 987 988 989 990 991 992
}


/*
  Prepare items in INSERT statement

  SYNOPSIS
    mysql_prepare_insert()
    thd			Thread handler
    table_list	        Global/local table list
monty@mysql.com's avatar
monty@mysql.com committed
993 994
    table		Table to insert into (can be NULL if table should
			be taken from table_list->table)    
monty@mysql.com's avatar
monty@mysql.com committed
995 996
    where		Where clause (for insert ... select)
    select_insert	TRUE if INSERT ... SELECT statement
997 998 999 1000
    check_fields        TRUE if need to check that all INSERT fields are 
                        given values.
    abort_on_warning    whether to report if some INSERT field is not 
                        assigned as an error (TRUE) or as a warning (FALSE).
1001

monty@mishka.local's avatar
monty@mishka.local committed
1002 1003 1004 1005 1006
  TODO (in far future)
    In cases of:
    INSERT INTO t1 SELECT a, sum(a) as sum1 from t2 GROUP BY a
    ON DUPLICATE KEY ...
    we should be able to refer to sum1 in the ON DUPLICATE KEY part
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1007

1008 1009 1010
  WARNING
    You MUST set table->insert_values to 0 after calling this function
    before releasing the table object.
monty@mysql.com's avatar
monty@mysql.com committed
1011
  
1012
  RETURN VALUE
1013 1014
    FALSE OK
    TRUE  error
1015 1016
*/

monty@mysql.com's avatar
monty@mysql.com committed
1017
bool mysql_prepare_insert(THD *thd, TABLE_LIST *table_list,
monty@mysql.com's avatar
monty@mysql.com committed
1018
                          TABLE *table, List<Item> &fields, List_item *values,
monty@mishka.local's avatar
monty@mishka.local committed
1019
                          List<Item> &update_fields, List<Item> &update_values,
monty@mysql.com's avatar
monty@mysql.com committed
1020
                          enum_duplicates duplic,
1021 1022
                          COND **where, bool select_insert,
                          bool check_fields, bool abort_on_warning)
1023
{
monty@mysql.com's avatar
monty@mysql.com committed
1024
  SELECT_LEX *select_lex= &thd->lex->select_lex;
1025
  Name_resolution_context *context= &select_lex->context;
1026
  Name_resolution_context_state ctx_state;
1027
  bool insert_into_view= (table_list->view != 0);
monty@mysql.com's avatar
monty@mysql.com committed
1028
  bool res= 0;
1029
  table_map map= 0;
1030
  DBUG_ENTER("mysql_prepare_insert");
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1031 1032 1033
  DBUG_PRINT("enter", ("table_list 0x%lx, table 0x%lx, view %d",
		       (ulong)table_list, (ulong)table,
		       (int)insert_into_view));
1034 1035
  /* INSERT should have a SELECT or VALUES clause */
  DBUG_ASSERT (!select_insert || !values);
monty@mishka.local's avatar
monty@mishka.local committed
1036

1037 1038 1039 1040 1041 1042 1043
  /*
    For subqueries in VALUES() we should not see the table in which we are
    inserting (for INSERT ... SELECT this is done by changing table_list,
    because INSERT ... SELECT share SELECT_LEX it with SELECT.
  */
  if (!select_insert)
  {
monty@mysql.com's avatar
monty@mysql.com committed
1044
    for (SELECT_LEX_UNIT *un= select_lex->first_inner_unit();
1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056
         un;
         un= un->next_unit())
    {
      for (SELECT_LEX *sl= un->first_select();
           sl;
           sl= sl->next_select())
      {
        sl->context.outer_context= 0;
      }
    }
  }

monty@mishka.local's avatar
monty@mishka.local committed
1057
  if (duplic == DUP_UPDATE)
1058 1059
  {
    /* it should be allocated before Item::fix_fields() */
monty@mishka.local's avatar
monty@mishka.local committed
1060
    if (table_list->set_insert_values(thd->mem_root))
monty@mysql.com's avatar
monty@mysql.com committed
1061
      DBUG_RETURN(TRUE);
1062
  }
monty@mishka.local's avatar
monty@mishka.local committed
1063

1064
  if (mysql_prepare_insert_check_table(thd, table_list, fields, select_insert))
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1065
    DBUG_RETURN(TRUE);
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
1066

1067 1068

  /* Prepare the fields in the statement. */
1069
  if (values)
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1070
  {
1071 1072 1073 1074 1075 1076
    /* if we have INSERT ... VALUES () we cannot have a GROUP BY clause */
    DBUG_ASSERT (!select_lex->group_list.elements);

    /* Save the state of the current name resolution context. */
    ctx_state.save_state(context, table_list);

1077
    /*
1078 1079 1080 1081 1082 1083
      Perform name resolution only in the first table - 'table_list',
      which is the table that is inserted into.
     */
    table_list->next_local= 0;
    context->resolve_in_table_list_only(table_list);

1084 1085
    res= check_insert_fields(thd, context->table_list, fields, *values,
                             !insert_into_view, &map) ||
1086
      setup_fields(thd, 0, *values, MARK_COLUMNS_READ, 0, 0);
1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099

    if (!res && check_fields)
    {
      bool saved_abort_on_warning= thd->abort_on_warning;
      thd->abort_on_warning= abort_on_warning;
      res= check_that_all_fields_are_given_values(thd, 
                                                  table ? table : 
                                                  context->table_list->table,
                                                  context->table_list);
      thd->abort_on_warning= saved_abort_on_warning;
    }

    if (!res && duplic == DUP_UPDATE)
monty@mysql.com's avatar
monty@mysql.com committed
1100
    {
1101 1102 1103
      select_lex->no_wrap_view_item= TRUE;
      res= check_update_fields(thd, context->table_list, update_fields, &map);
      select_lex->no_wrap_view_item= FALSE;
monty@mysql.com's avatar
monty@mysql.com committed
1104
    }
1105 1106 1107 1108

    /* Restore the current context. */
    ctx_state.restore_state(context, table_list);

monty@mysql.com's avatar
monty@mysql.com committed
1109
    if (!res)
1110
      res= setup_fields(thd, 0, update_values, MARK_COLUMNS_READ, 0, 0);
monty@mysql.com's avatar
monty@mysql.com committed
1111
  }
1112

monty@mysql.com's avatar
monty@mysql.com committed
1113 1114
  if (res)
    DBUG_RETURN(res);
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
1115

bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1116 1117 1118
  if (!table)
    table= table_list->table;

1119
  if (!select_insert)
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1120
  {
1121
    Item *fake_conds= 0;
1122
    TABLE_LIST *duplicate;
1123
    if ((duplicate= unique_table(thd, table_list, table_list->next_global, 1)))
1124
    {
1125
      update_non_unique_table_error(table_list, "INSERT", duplicate);
1126 1127
      DBUG_RETURN(TRUE);
    }
1128
    select_lex->fix_prepare_information(thd, &fake_conds, &fake_conds);
monty@mysql.com's avatar
monty@mysql.com committed
1129
    select_lex->first_execution= 0;
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1130
  }
1131
  if (duplic == DUP_UPDATE || duplic == DUP_REPLACE)
1132
    table->prepare_for_position();
1133
  DBUG_RETURN(FALSE);
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1134 1135 1136
}


bk@work.mysql.com's avatar
bk@work.mysql.com committed
1137 1138 1139 1140
	/* Check if there is more uniq keys after field */

static int last_uniq_key(TABLE *table,uint keynr)
{
1141
  while (++keynr < table->s->keys)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1142 1143 1144 1145 1146 1147 1148
    if (table->key_info[keynr].flags & HA_NOSAME)
      return 0;
  return 1;
}


/*
1149 1150 1151 1152 1153 1154 1155 1156 1157 1158
  Write a record to table with optional deleting of conflicting records,
  invoke proper triggers if needed.

  SYNOPSIS
     write_record()
      thd   - thread context
      table - table to which record should be written
      info  - COPY_INFO structure describing handling of duplicates
              and which is used for counting number of records inserted
              and deleted.
1159

1160 1161 1162 1163 1164
  NOTE
    Once this record will be written to table after insert trigger will
    be invoked. If instead of inserting new record we will update old one
    then both on update triggers will work instead. Similarly both on
    delete triggers will be invoked if we will delete conflicting records.
1165

1166 1167 1168 1169 1170 1171
    Sets thd->no_trans_update if table which is updated didn't have
    transactions.

  RETURN VALUE
    0     - success
    non-0 - error
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1172 1173 1174
*/


1175
int write_record(THD *thd, TABLE *table,COPY_INFO *info)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1176
{
1177
  int error, trg_error= 0;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1178
  char *key=0;
1179
  MY_BITMAP *save_read_set, *save_write_set;
1180 1181
  ulonglong prev_insert_id= table->file->next_insert_id;
  ulonglong insert_id_for_cur_row= 0;
1182
  DBUG_ENTER("write_record");
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
1183

bk@work.mysql.com's avatar
bk@work.mysql.com committed
1184
  info->records++;
1185 1186
  save_read_set=  table->read_set;
  save_write_set= table->write_set;
1187

1188 1189
  if (info->handle_duplicates == DUP_REPLACE ||
      info->handle_duplicates == DUP_UPDATE)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1190
  {
1191
    while ((error=table->file->ha_write_row(table->record[0])))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1192
    {
1193
      uint key_nr;
1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204
      /*
        If we do more than one iteration of this loop, from the second one the
        row will have an explicit value in the autoinc field, which was set at
        the first call of handler::update_auto_increment(). So we must save
        the autogenerated value to avoid thd->insert_id_for_cur_row to become
        0.
      */
      if (table->file->insert_id_for_cur_row > 0)
        insert_id_for_cur_row= table->file->insert_id_for_cur_row;
      else
        table->file->insert_id_for_cur_row= insert_id_for_cur_row;
1205
      bool is_duplicate_key_error;
1206
      if (table->file->is_fatal_error(error, HA_CHECK_DUP))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1207
	goto err;
1208
      is_duplicate_key_error= table->file->is_fatal_error(error, 0);
1209 1210
      if (!is_duplicate_key_error)
      {
1211 1212 1213 1214 1215
        /*
          We come here when we had an ignorable error which is not a duplicate
          key error. In this we ignore error if ignore flag is set, otherwise
          report error as usual. We will not do any duplicate key processing.
        */
1216
        if (info->ignore)
1217 1218
          goto ok_or_after_trg_err; /* Ignoring a not fatal error, return 0 */
        goto err;
1219
      }
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1220 1221
      if ((int) (key_nr = table->file->get_dup_key(error)) < 0)
      {
1222
	error= HA_ERR_FOUND_DUPP_KEY;         /* Database can't find key */
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1223 1224
	goto err;
      }
1225 1226
      /* Read all columns for the row we are going to replace */
      table->use_all_columns();
1227 1228 1229 1230 1231
      /*
	Don't allow REPLACE to replace a row when a auto_increment column
	was used.  This ensures that we don't get a problem when the
	whole range of the key has been used.
      */
1232 1233
      if (info->handle_duplicates == DUP_REPLACE &&
          table->next_number_field &&
1234
          key_nr == table->s->next_number_index &&
1235
	  (insert_id_for_cur_row > 0))
1236
	goto err;
1237
      if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1238
      {
1239
	if (table->file->rnd_pos(table->record[1],table->file->dup_ref))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1240 1241 1242 1243
	  goto err;
      }
      else
      {
1244
	if (table->file->extra(HA_EXTRA_FLUSH_CACHE)) /* Not needed with NISAM */
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1245 1246 1247 1248
	{
	  error=my_errno;
	  goto err;
	}
monty@hundin.mysql.fi's avatar
monty@hundin.mysql.fi committed
1249

bk@work.mysql.com's avatar
bk@work.mysql.com committed
1250 1251
	if (!key)
	{
1252
	  if (!(key=(char*) my_safe_alloca(table->s->max_unique_length,
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1253 1254 1255 1256 1257 1258
					   MAX_KEY_LENGTH)))
	  {
	    error=ENOMEM;
	    goto err;
	  }
	}
1259
	key_copy((byte*) key,table->record[0],table->key_info+key_nr,0);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1260
	if ((error=(table->file->index_read_idx(table->record[1],key_nr,
1261
                                                (byte*) key, HA_WHOLE_KEY,
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1262 1263 1264
						HA_READ_KEY_EXACT))))
	  goto err;
      }
1265
      if (info->handle_duplicates == DUP_UPDATE)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1266
      {
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1267
        int res= 0;
monty@mysql.com's avatar
monty@mysql.com committed
1268 1269 1270 1271
        /*
          We don't check for other UNIQUE keys - the first row
          that matches, is updated. If update causes a conflict again,
          an error is returned
1272
        */
1273
	DBUG_ASSERT(table->insert_values != NULL);
1274 1275
        store_record(table,insert_values);
        restore_record(table,record[1]);
monty@mysql.com's avatar
monty@mysql.com committed
1276 1277
        DBUG_ASSERT(info->update_fields->elements ==
                    info->update_values->elements);
1278 1279 1280 1281 1282
        if (fill_record_n_invoke_before_triggers(thd, *info->update_fields,
                                                 *info->update_values, 0,
                                                 table->triggers,
                                                 TRG_EVENT_UPDATE))
          goto before_trg_err;
1283 1284

        /* CHECK OPTION for VIEW ... ON DUPLICATE KEY UPDATE ... */
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1285 1286 1287
        if (info->view &&
            (res= info->view->view_check_option(current_thd, info->ignore)) ==
            VIEW_CHECK_SKIP)
1288
          goto ok_or_after_trg_err;
monty@mysql.com's avatar
monty@mysql.com committed
1289
        if (res == VIEW_CHECK_ERROR)
1290
          goto before_trg_err;
1291

1292
        table->file->restore_auto_increment(prev_insert_id);
1293 1294
        if ((error=table->file->ha_update_row(table->record[1],
                                              table->record[0])))
1295
	{
1296
          if (info->ignore &&
1297
              !table->file->is_fatal_error(error, HA_CHECK_DUP_KEY))
1298
          {
1299
            goto ok_or_after_trg_err;
1300
          }
1301
          goto err;
1302
        }
igor@olga.mysql.com's avatar
igor@olga.mysql.com committed
1303 1304
        if ((table->file->ha_table_flags() & HA_PARTIAL_COLUMN_READ) ||
             compare_record(table))
1305 1306
        {
          info->updated++;
evgen@moonbone.local's avatar
evgen@moonbone.local committed
1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317
          /*
            If ON DUP KEY UPDATE updates a row instead of inserting one, it's
            like a regular UPDATE statement: it should not affect the value of a
            next SELECT LAST_INSERT_ID() or mysql_insert_id().
            Except if LAST_INSERT_ID(#) was in the INSERT query, which is
            handled separately by THD::arg_of_last_insert_id_function.
          */
          insert_id_for_cur_row= table->file->insert_id_for_cur_row= 0;
          if (table->next_number_field)
            table->file->adjust_next_insert_id_after_explicit_value(
              table->next_number_field->val_int());
evgen@moonbone.local's avatar
evgen@moonbone.local committed
1318 1319
          trg_error= (table->triggers &&
                      table->triggers->process_triggers(thd, TRG_EVENT_UPDATE,
evgen@moonbone.local's avatar
evgen@moonbone.local committed
1320
                                                        TRG_ACTION_AFTER, TRUE));
1321 1322
          info->copied++;
        }
1323

1324
        goto ok_or_after_trg_err;
1325 1326 1327
      }
      else /* DUP_REPLACE */
      {
monty@mysql.com's avatar
monty@mysql.com committed
1328 1329 1330 1331 1332
	/*
	  The manual defines the REPLACE semantics that it is either
	  an INSERT or DELETE(s) + INSERT; FOREIGN KEY checks in
	  InnoDB do not function in the defined way if we allow MySQL
	  to convert the latter operation internally to an UPDATE.
1333 1334
          We also should not perform this conversion if we have 
          timestamp field with ON UPDATE which is different from DEFAULT.
1335 1336 1337 1338 1339 1340
          Another case when conversion should not be performed is when
          we have ON DELETE trigger on table so user may notice that
          we cheat here. Note that it is ok to do such conversion for
          tables which have ON UPDATE but have no ON DELETE triggers,
          we just should not expose this fact to users by invoking
          ON UPDATE triggers.
monty@mysql.com's avatar
monty@mysql.com committed
1341 1342
	*/
	if (last_uniq_key(table,key_nr) &&
1343
	    !table->file->referenced_by_foreign_key() &&
1344
            (table->timestamp_field_type == TIMESTAMP_NO_AUTO_SET ||
1345 1346
             table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_BOTH) &&
            (!table->triggers || !table->triggers->has_delete_triggers()))
1347
        {
1348 1349
          if ((error=table->file->ha_update_row(table->record[1],
					        table->record[0])))
1350 1351
            goto err;
          info->deleted++;
1352
          thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
1353 1354 1355 1356 1357
          /*
            Since we pretend that we have done insert we should call
            its after triggers.
          */
          goto after_trg_n_copied_inc;
1358 1359 1360 1361 1362 1363 1364
        }
        else
        {
          if (table->triggers &&
              table->triggers->process_triggers(thd, TRG_EVENT_DELETE,
                                                TRG_ACTION_BEFORE, TRUE))
            goto before_trg_err;
1365
          if ((error=table->file->ha_delete_row(table->record[1])))
1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377
            goto err;
          info->deleted++;
          if (!table->file->has_transactions())
            thd->no_trans_update= 1;
          if (table->triggers &&
              table->triggers->process_triggers(thd, TRG_EVENT_DELETE,
                                                TRG_ACTION_AFTER, TRUE))
          {
            trg_error= 1;
            goto ok_or_after_trg_err;
          }
          /* Let us attempt do write_row() once more */
1378
        }
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1379 1380
      }
    }
1381
    thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
1382 1383 1384 1385 1386 1387 1388
    /*
      Restore column maps if they where replaced during an duplicate key
      problem.
    */
    if (table->read_set != save_read_set ||
        table->write_set != save_write_set)
      table->column_bitmaps_set(save_read_set, save_write_set);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1389
  }
1390
  else if ((error=table->file->ha_write_row(table->record[0])))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1391
  {
1392
    if (!info->ignore ||
1393
        table->file->is_fatal_error(error, HA_CHECK_DUP))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1394
      goto err;
1395
    table->file->restore_auto_increment(prev_insert_id);
1396
    goto ok_or_after_trg_err;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1397
  }
1398 1399 1400

after_trg_n_copied_inc:
  info->copied++;
1401
  thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
1402 1403 1404
  trg_error= (table->triggers &&
              table->triggers->process_triggers(thd, TRG_EVENT_INSERT,
                                                TRG_ACTION_AFTER, TRUE));
1405 1406

ok_or_after_trg_err:
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1407
  if (key)
1408
    my_safe_afree(key,table->s->max_unique_length,MAX_KEY_LENGTH);
1409 1410
  if (!table->file->has_transactions())
    thd->no_trans_update= 1;
1411
  DBUG_RETURN(trg_error);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1412 1413

err:
1414
  info->last_errno= error;
1415 1416 1417
  /* current_select is NULL if this is a delayed insert */
  if (thd->lex->current_select)
    thd->lex->current_select->no_error= 0;        // Give error
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1418
  table->file->print_error(error,MYF(0));
1419
  
1420
before_trg_err:
1421
  table->file->restore_auto_increment(prev_insert_id);
1422 1423
  if (key)
    my_safe_afree(key, table->s->max_unique_length, MAX_KEY_LENGTH);
1424
  table->column_bitmaps_set(save_read_set, save_write_set);
1425
  DBUG_RETURN(1);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1426 1427 1428 1429
}


/******************************************************************************
1430
  Check that all fields with arn't null_fields are used
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1431 1432
******************************************************************************/

1433 1434
int check_that_all_fields_are_given_values(THD *thd, TABLE *entry,
                                           TABLE_LIST *table_list)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1435
{
1436
  int err= 0;
1437 1438
  MY_BITMAP *write_set= entry->write_set;

bk@work.mysql.com's avatar
bk@work.mysql.com committed
1439 1440
  for (Field **field=entry->field ; *field ; field++)
  {
1441
    if (!bitmap_is_set(write_set, (*field)->field_index) &&
1442
        ((*field)->flags & NO_DEFAULT_VALUE_FLAG) &&
1443
        ((*field)->real_type() != MYSQL_TYPE_ENUM))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1444
    {
1445 1446 1447
      bool view= FALSE;
      if (table_list)
      {
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1448
        table_list= table_list->top_table();
kent@mysql.com's avatar
kent@mysql.com committed
1449
        view= test(table_list->view);
1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465
      }
      if (view)
      {
        push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
                            ER_NO_DEFAULT_FOR_VIEW_FIELD,
                            ER(ER_NO_DEFAULT_FOR_VIEW_FIELD),
                            table_list->view_db.str,
                            table_list->view_name.str);
      }
      else
      {
        push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
                            ER_NO_DEFAULT_FOR_FIELD,
                            ER(ER_NO_DEFAULT_FOR_FIELD),
                            (*field)->field_name);
      }
1466
      err= 1;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1467 1468
    }
  }
1469
  return thd->abort_on_warning ? err : 0;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1470 1471 1472
}

/*****************************************************************************
1473 1474
  Handling of delayed inserts
  A thread is created for each table that one uses with the DELAYED attribute.
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1475 1476
*****************************************************************************/

1477 1478
#ifndef EMBEDDED_LIBRARY

bk@work.mysql.com's avatar
bk@work.mysql.com committed
1479 1480
class delayed_row :public ilink {
public:
1481
  char *record;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1482 1483
  enum_duplicates dup;
  time_t start_time;
1484 1485 1486
  bool query_start_used, ignore, log_query;
  bool stmt_depends_on_first_successful_insert_id_in_prev_stmt;
  ulonglong first_successful_insert_id_in_prev_stmt;
1487
  ulonglong forced_insert_id;
1488 1489
  ulong auto_increment_increment;
  ulong auto_increment_offset;
1490
  timestamp_auto_set_type timestamp_field_type;
1491
  LEX_STRING query;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1492

1493 1494 1495
  delayed_row(LEX_STRING const query_arg, enum_duplicates dup_arg,
              bool ignore_arg, bool log_query_arg)
    : record(0), dup(dup_arg), ignore(ignore_arg), log_query(log_query_arg),
1496
      forced_insert_id(0), query(query_arg)
1497
    {}
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1498 1499
  ~delayed_row()
  {
1500
    x_free(query.str);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515 1516
    x_free(record);
  }
};


class delayed_insert :public ilink {
  uint locks_in_memory;
public:
  THD thd;
  TABLE *table;
  pthread_mutex_t mutex;
  pthread_cond_t cond,cond_client;
  volatile uint tables_in_use,stacked_inserts;
  volatile bool status,dead;
  COPY_INFO info;
  I_List<delayed_row> rows;
1517
  ulong group_count;
1518
  TABLE_LIST table_list;			// Argument
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1519 1520

  delayed_insert()
1521
    :locks_in_memory(0),
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1522 1523 1524
     table(0),tables_in_use(0),stacked_inserts(0), status(0), dead(0),
     group_count(0)
  {
1525 1526
    thd.security_ctx->user=thd.security_ctx->priv_user=(char*) delayed_user;
    thd.security_ctx->host=(char*) my_localhost;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1527 1528 1529
    thd.current_tablenr=0;
    thd.version=refresh_version;
    thd.command=COM_DELAYED_INSERT;
1530 1531
    thd.lex->current_select= 0; 		// for my_message_sql
    thd.lex->sql_command= SQLCOM_INSERT;        // For innodb::store_lock()
1532 1533 1534 1535
    /*
      Statement-based replication of INSERT DELAYED has problems with RAND()
      and user vars, so in mixed mode we go to row-based.
    */
1536
    thd.set_current_stmt_binlog_row_based_if_mixed();
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1537

1538 1539
    bzero((char*) &thd.net, sizeof(thd.net));		// Safety
    bzero((char*) &table_list, sizeof(table_list));	// Safety
1540
    thd.system_thread= SYSTEM_THREAD_DELAYED_INSERT;
1541
    thd.security_ctx->host_or_ip= "";
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1542
    bzero((char*) &info,sizeof(info));
1543
    pthread_mutex_init(&mutex,MY_MUTEX_INIT_FAST);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1544 1545 1546 1547 1548 1549 1550 1551
    pthread_cond_init(&cond,NULL);
    pthread_cond_init(&cond_client,NULL);
    VOID(pthread_mutex_lock(&LOCK_thread_count));
    delayed_insert_threads++;
    VOID(pthread_mutex_unlock(&LOCK_thread_count));
  }
  ~delayed_insert()
  {
1552
    /* The following is not really needed, but just for safety */
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1553 1554 1555 1556 1557 1558
    delayed_row *row;
    while ((row=rows.get()))
      delete row;
    if (table)
      close_thread_tables(&thd);
    VOID(pthread_mutex_lock(&LOCK_thread_count));
1559 1560 1561
    pthread_mutex_destroy(&mutex);
    pthread_cond_destroy(&cond);
    pthread_cond_destroy(&cond_client);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1562 1563
    thd.unlink();				// Must be unlinked under lock
    x_free(thd.query);
1564
    thd.security_ctx->user= thd.security_ctx->host=0;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1565 1566 1567
    thread_count--;
    delayed_insert_threads--;
    VOID(pthread_mutex_unlock(&LOCK_thread_count));
1568
    VOID(pthread_cond_broadcast(&COND_thread_count)); /* Tell main we are ready */
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608
  }

  /* The following is for checking when we can delete ourselves */
  inline void lock()
  {
    locks_in_memory++;				// Assume LOCK_delay_insert
  }
  void unlock()
  {
    pthread_mutex_lock(&LOCK_delayed_insert);
    if (!--locks_in_memory)
    {
      pthread_mutex_lock(&mutex);
      if (thd.killed && ! stacked_inserts && ! tables_in_use)
      {
	pthread_cond_signal(&cond);
	status=1;
      }
      pthread_mutex_unlock(&mutex);
    }
    pthread_mutex_unlock(&LOCK_delayed_insert);
  }
  inline uint lock_count() { return locks_in_memory; }

  TABLE* get_local_table(THD* client_thd);
  bool handle_inserts(void);
};


I_List<delayed_insert> delayed_threads;


delayed_insert *find_handler(THD *thd, TABLE_LIST *table_list)
{
  thd->proc_info="waiting for delay_list";
  pthread_mutex_lock(&LOCK_delayed_insert);	// Protect master list
  I_List_iterator<delayed_insert> it(delayed_threads);
  delayed_insert *tmp;
  while ((tmp=it++))
  {
1609 1610
    if (!strcmp(tmp->thd.db, table_list->db) &&
	!strcmp(table_list->table_name, tmp->table->s->table_name.str))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624
    {
      tmp->lock();
      break;
    }
  }
  pthread_mutex_unlock(&LOCK_delayed_insert); // For unlink from list
  return tmp;
}


static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list)
{
  int error;
  delayed_insert *tmp;
1625
  TABLE *table;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1626 1627
  DBUG_ENTER("delayed_get_table");

1628 1629
  /* Must be set in the parser */
  DBUG_ASSERT(table_list->db);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1630

1631
  /* Find the thread which handles this table. */
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1632 1633
  if (!(tmp=find_handler(thd,table_list)))
  {
1634 1635 1636 1637
    /*
      No match. Create a new thread to handle the table, but
      no more than max_insert_delayed_threads.
    */
1638
    if (delayed_insert_threads >= thd->variables.max_insert_delayed_threads)
1639
      DBUG_RETURN(0);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1640 1641
    thd->proc_info="Creating delayed handler";
    pthread_mutex_lock(&LOCK_delayed_create);
1642 1643 1644 1645 1646
    /*
      The first search above was done without LOCK_delayed_create.
      Another thread might have created the handler in between. Search again.
    */
    if (! (tmp= find_handler(thd, table_list)))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1647 1648 1649 1650
    {
      if (!(tmp=new delayed_insert()))
      {
	my_error(ER_OUTOFMEMORY,MYF(0),sizeof(delayed_insert));
1651
	goto err1;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1652
      }
1653 1654 1655
      pthread_mutex_lock(&LOCK_thread_count);
      thread_count++;
      pthread_mutex_unlock(&LOCK_thread_count);
1656 1657 1658
      tmp->thd.set_db(table_list->db, strlen(table_list->db));
      tmp->thd.query= my_strdup(table_list->table_name,MYF(MY_WME));
      if (tmp->thd.db == NULL || tmp->thd.query == NULL)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1659 1660
      {
	delete tmp;
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
1661
	my_message(ER_OUT_OF_RESOURCES, ER(ER_OUT_OF_RESOURCES), MYF(0));
1662
	goto err1;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1663
      }
1664
      tmp->table_list= *table_list;			// Needed to open table
1665
      tmp->table_list.alias= tmp->table_list.table_name= tmp->thd.query;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1666 1667 1668 1669 1670 1671 1672 1673 1674 1675 1676
      tmp->lock();
      pthread_mutex_lock(&tmp->mutex);
      if ((error=pthread_create(&tmp->thd.real_id,&connection_attrib,
				handle_delayed_insert,(void*) tmp)))
      {
	DBUG_PRINT("error",
		   ("Can't create thread to handle delayed insert (error %d)",
		    error));
	pthread_mutex_unlock(&tmp->mutex);
	tmp->unlock();
	delete tmp;
1677
	my_error(ER_CANT_CREATE_THREAD, MYF(0), error);
1678
	goto err1;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1679 1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690
      }

      /* Wait until table is open */
      thd->proc_info="waiting for handler open";
      while (!tmp->thd.killed && !tmp->table && !thd->killed)
      {
	pthread_cond_wait(&tmp->cond_client,&tmp->mutex);
      }
      pthread_mutex_unlock(&tmp->mutex);
      thd->proc_info="got old table";
      if (tmp->thd.killed)
      {
1691
	if (tmp->thd.is_fatal_error)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1692 1693
	{
	  /* Copy error message and abort */
1694
	  thd->fatal_error();
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1695
	  strmov(thd->net.last_error,tmp->thd.net.last_error);
1696
	  thd->net.last_errno=tmp->thd.net.last_errno;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1697 1698
	}
	tmp->unlock();
1699
	goto err;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1700 1701 1702 1703
      }
      if (thd->killed)
      {
	tmp->unlock();
1704
	goto err;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1705 1706 1707 1708 1709 1710
      }
    }
    pthread_mutex_unlock(&LOCK_delayed_create);
  }

  pthread_mutex_lock(&tmp->mutex);
1711
  table= tmp->get_local_table(thd);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1712 1713 1714
  pthread_mutex_unlock(&tmp->mutex);
  if (table)
    thd->di=tmp;
1715 1716
  else if (tmp->thd.is_fatal_error)
    thd->fatal_error();
1717 1718
  /* Unlock the delayed insert object after its last access. */
  tmp->unlock();
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1719
  DBUG_RETURN((table_list->table=table));
1720 1721 1722 1723 1724 1725

 err1:
  thd->fatal_error();
 err:
  pthread_mutex_unlock(&LOCK_delayed_create);
  DBUG_RETURN(0); // Continue with normal insert
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1726 1727 1728 1729 1730 1731 1732 1733 1734 1735 1736 1737 1738
}


/*
  As we can't let many threads modify the same TABLE structure, we create
  an own structure for each tread.  This includes a row buffer to save the
  column values and new fields that points to the new row buffer.
  The memory is allocated in the client thread and is freed automaticly.
*/

TABLE *delayed_insert::get_local_table(THD* client_thd)
{
  my_ptrdiff_t adjust_ptrs;
1739
  Field **field,**org_field, *found_next_number_field;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1740
  TABLE *copy;
1741
  TABLE_SHARE *share= table->s;
1742
  byte *bitmap;
1743
  DBUG_ENTER("delayed_insert::get_local_table");
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1744 1745 1746 1747 1748 1749 1750 1751 1752 1753 1754 1755 1756 1757 1758 1759 1760 1761 1762 1763 1764 1765 1766

  /* First request insert thread to get a lock */
  status=1;
  tables_in_use++;
  if (!thd.lock)				// Table is not locked
  {
    client_thd->proc_info="waiting for handler lock";
    pthread_cond_signal(&cond);			// Tell handler to lock table
    while (!dead && !thd.lock && ! client_thd->killed)
    {
      pthread_cond_wait(&cond_client,&mutex);
    }
    client_thd->proc_info="got handler lock";
    if (client_thd->killed)
      goto error;
    if (dead)
    {
      strmov(client_thd->net.last_error,thd.net.last_error);
      client_thd->net.last_errno=thd.net.last_errno;
      goto error;
    }
  }

1767 1768 1769 1770 1771 1772 1773
  /*
    Allocate memory for the TABLE object, the field pointers array, and
    one record buffer of reclength size. Normally a table has three
    record buffers of rec_buff_length size, which includes alignment
    bytes. Since the table copy is used for creating one record only,
    the other record buffers and alignment are unnecessary.
  */
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1774
  client_thd->proc_info="allocating local table";
1775
  copy= (TABLE*) client_thd->alloc(sizeof(*copy)+
1776
				   (share->fields+1)*sizeof(Field**)+
1777 1778
				   share->reclength +
                                   share->column_bitmap_size*2);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1779 1780 1781
  if (!copy)
    goto error;

1782
  /* Copy the TABLE object. */
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1783
  *copy= *table;
1784
  /* We don't need to change the file handler here */
1785 1786
  /* Assign the pointers for the field pointers array and the record. */
  field= copy->field= (Field**) (copy + 1);
1787 1788 1789
  bitmap= (byte*) (field + share->fields + 1);
  copy->record[0]= (bitmap + share->column_bitmap_size * 2);
  memcpy((char*) copy->record[0], (char*) table->record[0], share->reclength);
1790 1791 1792 1793 1794 1795 1796 1797 1798 1799
  /*
    Make a copy of all fields.
    The copied fields need to point into the copied record. This is done
    by copying the field objects with their old pointer values and then
    "move" the pointers by the distance between the original and copied
    records. That way we preserve the relative positions in the records.
  */
  adjust_ptrs= PTR_BYTE_DIFF(copy->record[0], table->record[0]);
  found_next_number_field= table->found_next_number_field;
  for (org_field= table->field; *org_field; org_field++, field++)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1800
  {
1801 1802
    if (!(*field= (*org_field)->new_field(client_thd->mem_root, copy, 1)))
      DBUG_RETURN(0);
1803
    (*field)->orig_table= copy;			// Remove connection
1804
    (*field)->move_field_offset(adjust_ptrs);	// Point at copy->record[0]
1805 1806
    if (*org_field == found_next_number_field)
      (*field)->table->found_next_number_field= *field;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1807 1808 1809 1810 1811 1812 1813 1814
  }
  *field=0;

  /* Adjust timestamp */
  if (table->timestamp_field)
  {
    /* Restore offset as this may have been reset in handle_inserts */
    copy->timestamp_field=
1815
      (Field_timestamp*) copy->field[share->timestamp_field_offset];
1816
    copy->timestamp_field->unireg_check= table->timestamp_field->unireg_check;
1817
    copy->timestamp_field_type= copy->timestamp_field->get_auto_set_type();
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1818 1819
  }

1820 1821
  /* Adjust in_use for pointing to client thread */
  copy->in_use= client_thd;
1822 1823 1824 1825

  /* Adjust lock_count. This table object is not part of a lock. */
  copy->lock_count= 0;

1826 1827 1828 1829 1830 1831 1832 1833 1834
  /* Adjust bitmaps */
  copy->def_read_set.bitmap= (my_bitmap_map*) bitmap;
  copy->def_write_set.bitmap= ((my_bitmap_map*)
                               (bitmap + share->column_bitmap_size));
  copy->tmp_set.bitmap= 0;                      // To catch errors
  bzero((char*) bitmap, share->column_bitmap_size*2);
  copy->read_set=  &copy->def_read_set;
  copy->write_set= &copy->def_write_set;

1835
  DBUG_RETURN(copy);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1836 1837 1838 1839 1840 1841

  /* Got fatal error */
 error:
  tables_in_use--;
  status=1;
  pthread_cond_signal(&cond);			// Inform thread about abort
1842
  DBUG_RETURN(0);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1843 1844 1845 1846 1847
}


/* Put a question in queue */

1848 1849 1850
static int
write_delayed(THD *thd,TABLE *table, enum_duplicates duplic,
              LEX_STRING query, bool ignore, bool log_on)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1851
{
1852
  delayed_row *row= 0;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1853
  delayed_insert *di=thd->di;
1854
  const Discrete_interval *forced_auto_inc;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1855
  DBUG_ENTER("write_delayed");
1856
  DBUG_PRINT("enter", ("query = '%s' length %u", query.str, query.length));
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1857 1858 1859 1860 1861 1862 1863

  thd->proc_info="waiting for handler insert";
  pthread_mutex_lock(&di->mutex);
  while (di->stacked_inserts >= delayed_queue_size && !thd->killed)
    pthread_cond_wait(&di->cond_client,&di->mutex);
  thd->proc_info="storing row into queue";

1864
  if (thd->killed)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1865 1866
    goto err;

1867 1868 1869 1870 1871 1872 1873
  /*
    Take a copy of the query string, if there is any. The string will
    be free'ed when the row is destroyed. If there is no query string,
    we don't do anything special.
   */

  if (query.str)
1874 1875 1876
  {
    char *str;
    if (!(str= my_strndup(query.str, query.length, MYF(MY_WME))))
1877
      goto err;
1878 1879
    query.str= str;
  }
1880 1881 1882 1883
  row= new delayed_row(query, duplic, ignore, log_on);
  if (row == NULL)
  {
    my_free(query.str, MYF(MY_WME));
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1884
    goto err;
1885
  }
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1886

1887
  if (!(row->record= (char*) my_malloc(table->s->reclength, MYF(MY_WME))))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1888
    goto err;
1889
  memcpy(row->record, table->record[0], table->s->reclength);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1890 1891
  row->start_time=		thd->start_time;
  row->query_start_used=	thd->query_start_used;
1892 1893 1894 1895 1896 1897 1898 1899 1900 1901
  /*
    those are for the binlog: LAST_INSERT_ID() has been evaluated at this
    time, so record does not need it, but statement-based binlogging of the
    INSERT will need when the row is actually inserted.
    As for SET INSERT_ID, DELAYED does not honour it (BUG#20830).
  */
  row->stmt_depends_on_first_successful_insert_id_in_prev_stmt=
    thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt;
  row->first_successful_insert_id_in_prev_stmt=
    thd->first_successful_insert_id_in_prev_stmt;
1902
  row->timestamp_field_type=    table->timestamp_field_type;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1903

1904
  /* Copy session variables. */
1905 1906
  row->auto_increment_increment= thd->variables.auto_increment_increment;
  row->auto_increment_offset=    thd->variables.auto_increment_offset;
1907 1908 1909 1910 1911 1912 1913
  /* Copy the next forced auto increment value, if any. */
  if ((forced_auto_inc= thd->auto_inc_intervals_forced.get_next()))
  {
    row->forced_insert_id= forced_auto_inc->minimum();
    DBUG_PRINT("delayed", ("transmitting auto_inc: %lu",
                           (ulong) row->forced_insert_id));
  }
1914

bk@work.mysql.com's avatar
bk@work.mysql.com committed
1915 1916 1917
  di->rows.push_back(row);
  di->stacked_inserts++;
  di->status=1;
1918
  if (table->s->blob_fields)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1919 1920 1921 1922 1923 1924 1925 1926 1927 1928 1929 1930 1931 1932 1933 1934
    unlink_blobs(table);
  pthread_cond_signal(&di->cond);

  thread_safe_increment(delayed_rows_in_use,&LOCK_delayed_status);
  pthread_mutex_unlock(&di->mutex);
  DBUG_RETURN(0);

 err:
  delete row;
  pthread_mutex_unlock(&di->mutex);
  DBUG_RETURN(1);
}


static void end_delayed_insert(THD *thd)
{
1935
  DBUG_ENTER("end_delayed_insert");
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1936 1937
  delayed_insert *di=thd->di;
  pthread_mutex_lock(&di->mutex);
1938
  DBUG_PRINT("info",("tables in use: %d",di->tables_in_use));
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1939 1940 1941 1942 1943 1944
  if (!--di->tables_in_use || di->thd.killed)
  {						// Unlock table
    di->status=1;
    pthread_cond_signal(&di->cond);
  }
  pthread_mutex_unlock(&di->mutex);
1945
  DBUG_VOID_RETURN;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1946 1947 1948 1949 1950 1951 1952 1953 1954 1955 1956 1957 1958
}


/* We kill all delayed threads when doing flush-tables */

void kill_delayed_threads(void)
{
  VOID(pthread_mutex_lock(&LOCK_delayed_insert)); // For unlink from list

  I_List_iterator<delayed_insert> it(delayed_threads);
  delayed_insert *tmp;
  while ((tmp=it++))
  {
hf@genie.(none)'s avatar
SCRUM  
hf@genie.(none) committed
1959
    tmp->thd.killed= THD::KILL_CONNECTION;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1960 1961 1962
    if (tmp->thd.mysys_var)
    {
      pthread_mutex_lock(&tmp->thd.mysys_var->mutex);
1963
      if (tmp->thd.mysys_var->current_cond)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1964
      {
1965 1966 1967 1968 1969 1970
	/*
	  We need the following test because the main mutex may be locked
	  in handle_delayed_insert()
	*/
	if (&tmp->mutex != tmp->thd.mysys_var->current_mutex)
	  pthread_mutex_lock(tmp->thd.mysys_var->current_mutex);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1971
	pthread_cond_broadcast(tmp->thd.mysys_var->current_cond);
1972 1973
	if (&tmp->mutex != tmp->thd.mysys_var->current_mutex)
	  pthread_mutex_unlock(tmp->thd.mysys_var->current_mutex);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1974 1975 1976 1977 1978 1979 1980 1981 1982 1983 1984 1985
      }
      pthread_mutex_unlock(&tmp->thd.mysys_var->mutex);
    }
  }
  VOID(pthread_mutex_unlock(&LOCK_delayed_insert)); // For unlink from list
}


/*
 * Create a new delayed insert thread
*/

1986
pthread_handler_t handle_delayed_insert(void *arg)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1987 1988 1989 1990 1991 1992 1993
{
  delayed_insert *di=(delayed_insert*) arg;
  THD *thd= &di->thd;

  pthread_detach_this_thread();
  /* Add thread to THD list so that's it's visible in 'show processlist' */
  pthread_mutex_lock(&LOCK_thread_count);
1994
  thd->thread_id= thd->variables.pseudo_thread_id= thread_id++;
1995
  thd->end_time();
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1996
  threads.append(thd);
hf@genie.(none)'s avatar
SCRUM  
hf@genie.(none) committed
1997
  thd->killed=abort_loop ? THD::KILL_CONNECTION : THD::NOT_KILLED;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
1998 1999
  pthread_mutex_unlock(&LOCK_thread_count);

2000 2001 2002 2003 2004 2005 2006 2007
  /*
    Wait until the client runs into pthread_cond_wait(),
    where we free it after the table is opened and di linked in the list.
    If we did not wait here, the client might detect the opened table
    before it is linked to the list. It would release LOCK_delayed_create
    and allow another thread to create another handler for the same table,
    since it does not find one in the list.
  */
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2008
  pthread_mutex_lock(&di->mutex);
2009
#if !defined( __WIN__) /* Win32 calls this in pthread_create */
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2010 2011 2012 2013 2014 2015 2016 2017
  if (my_thread_init())
  {
    strmov(thd->net.last_error,ER(thd->net.last_errno=ER_OUT_OF_RESOURCES));
    goto end;
  }
#endif

  DBUG_ENTER("handle_delayed_insert");
2018
  thd->thread_stack= (char*) &thd;
2019
  if (init_thr_lock() || thd->store_globals())
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2020
  {
2021
    thd->fatal_error();
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2022
    strmov(thd->net.last_error,ER(thd->net.last_errno=ER_OUT_OF_RESOURCES));
2023
    goto err;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2024 2025 2026
  }

  /* open table */
2027
  if (!(di->table=open_ltable(thd,&di->table_list,TL_WRITE_DELAYED)))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2028
  {
2029
    thd->fatal_error();				// Abort waiting inserts
2030
    goto err;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2031
  }
2032
  if (!(di->table->file->ha_table_flags() & HA_CAN_INSERT_DELAYED))
2033
  {
2034
    thd->fatal_error();
2035
    my_error(ER_ILLEGAL_HA, MYF(0), di->table_list.table_name);
2036
    goto err;
2037
  }
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2038 2039 2040 2041 2042 2043 2044 2045 2046 2047 2048 2049 2050 2051 2052
  di->table->copy_blobs=1;

  /* One can now use this */
  pthread_mutex_lock(&LOCK_delayed_insert);
  delayed_threads.append(di);
  pthread_mutex_unlock(&LOCK_delayed_insert);

  /* Tell client that the thread is initialized */
  pthread_cond_signal(&di->cond_client);

  /* Now wait until we get an insert or lock to handle */
  /* We will not abort as long as a client thread uses this thread */

  for (;;)
  {
hf@genie.(none)'s avatar
SCRUM  
hf@genie.(none) committed
2053
    if (thd->killed == THD::KILL_CONNECTION)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2054 2055 2056 2057 2058 2059 2060 2061 2062 2063 2064 2065 2066 2067 2068 2069 2070 2071 2072
    {
      uint lock_count;
      /*
	Remove this from delay insert list so that no one can request a
	table from this
      */
      pthread_mutex_unlock(&di->mutex);
      pthread_mutex_lock(&LOCK_delayed_insert);
      di->unlink();
      lock_count=di->lock_count();
      pthread_mutex_unlock(&LOCK_delayed_insert);
      pthread_mutex_lock(&di->mutex);
      if (!lock_count && !di->tables_in_use && !di->stacked_inserts)
	break;					// Time to die
    }

    if (!di->status && !di->stacked_inserts)
    {
      struct timespec abstime;
2073
      set_timespec(abstime, delayed_insert_timeout);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2074 2075 2076 2077

      /* Information for pthread_kill */
      di->thd.mysys_var->current_mutex= &di->mutex;
      di->thd.mysys_var->current_cond= &di->cond;
2078
      di->thd.proc_info="Waiting for INSERT";
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2079

2080
      DBUG_PRINT("info",("Waiting for someone to insert rows"));
2081
      while (!thd->killed)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2082 2083
      {
	int error;
2084
#if defined(HAVE_BROKEN_COND_TIMEDWAIT)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2085 2086 2087 2088
	error=pthread_cond_wait(&di->cond,&di->mutex);
#else
	error=pthread_cond_timedwait(&di->cond,&di->mutex,&abstime);
#ifdef EXTRA_DEBUG
2089
	if (error && error != EINTR && error != ETIMEDOUT)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2090 2091 2092 2093 2094 2095 2096 2097 2098
	{
	  fprintf(stderr, "Got error %d from pthread_cond_timedwait\n",error);
	  DBUG_PRINT("error",("Got error %d from pthread_cond_timedwait",
			      error));
	}
#endif
#endif
	if (thd->killed || di->status)
	  break;
monty@mysql.com's avatar
monty@mysql.com committed
2099
	if (error == ETIMEDOUT || error == ETIME)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2100
	{
hf@genie.(none)'s avatar
SCRUM  
hf@genie.(none) committed
2101
	  thd->killed= THD::KILL_CONNECTION;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2102 2103 2104
	  break;
	}
      }
2105 2106
      /* We can't lock di->mutex and mysys_var->mutex at the same time */
      pthread_mutex_unlock(&di->mutex);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2107 2108 2109 2110
      pthread_mutex_lock(&di->thd.mysys_var->mutex);
      di->thd.mysys_var->current_mutex= 0;
      di->thd.mysys_var->current_cond= 0;
      pthread_mutex_unlock(&di->thd.mysys_var->mutex);
2111
      pthread_mutex_lock(&di->mutex);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2112
    }
2113
    di->thd.proc_info=0;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2114 2115 2116

    if (di->tables_in_use && ! thd->lock)
    {
2117
      bool not_used;
2118 2119 2120 2121 2122 2123 2124 2125 2126 2127
      /*
        Request for new delayed insert.
        Lock the table, but avoid to be blocked by a global read lock.
        If we got here while a global read lock exists, then one or more
        inserts started before the lock was requested. These are allowed
        to complete their work before the server returns control to the
        client which requested the global read lock. The delayed insert
        handler will close the table and finish when the outstanding
        inserts are done.
      */
2128
      if (! (thd->lock= mysql_lock_tables(thd, &di->table, 1,
2129 2130
                                          MYSQL_LOCK_IGNORE_GLOBAL_READ_LOCK,
                                          &not_used)))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2131
      {
2132 2133 2134
	/* Fatal error */
	di->dead= 1;
	thd->killed= THD::KILL_CONNECTION;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2135 2136 2137 2138 2139 2140 2141
      }
      pthread_cond_broadcast(&di->cond_client);
    }
    if (di->stacked_inserts)
    {
      if (di->handle_inserts())
      {
2142 2143 2144
	/* Some fatal error */
	di->dead= 1;
	thd->killed= THD::KILL_CONNECTION;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2145 2146 2147 2148 2149
      }
    }
    di->status=0;
    if (!di->stacked_inserts && !di->tables_in_use && thd->lock)
    {
monty@mysql.com's avatar
monty@mysql.com committed
2150 2151 2152 2153
      /*
        No one is doing a insert delayed
        Unlock table so that other threads can use it
      */
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2154 2155 2156
      MYSQL_LOCK *lock=thd->lock;
      thd->lock=0;
      pthread_mutex_unlock(&di->mutex);
2157 2158 2159 2160
      /*
        We need to release next_insert_id before unlocking. This is
        enforced by handler::ha_external_lock().
      */
2161
      di->table->file->ha_release_auto_increment();
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2162 2163 2164 2165 2166 2167 2168 2169
      mysql_unlock_tables(thd, lock);
      di->group_count=0;
      pthread_mutex_lock(&di->mutex);
    }
    if (di->tables_in_use)
      pthread_cond_broadcast(&di->cond_client); // If waiting clients
  }

2170 2171 2172 2173 2174 2175 2176
err:
  /*
    mysql_lock_tables() can potentially start a transaction and write
    a table map. In the event of an error, that transaction has to be
    rolled back.  We only need to roll back a potential statement
    transaction, since real transactions are rolled back in
    close_thread_tables().
2177 2178 2179 2180

    TODO: This is not true any more, table maps are generated on the
    first call to ha_*_row() instead. Remove code that are used to
    cover for the case outlined above.
2181 2182 2183
   */
  ha_rollback_stmt(thd);

2184
#ifndef __WIN__
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2185
end:
2186
#endif
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2187 2188 2189 2190 2191 2192 2193
  /*
    di should be unlinked from the thread handler list and have no active
    clients
  */

  close_thread_tables(thd);			// Free the table
  di->table=0;
2194 2195
  di->dead= 1;                                  // If error
  thd->killed= THD::KILL_CONNECTION;	        // If error
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2196 2197 2198 2199 2200 2201 2202 2203 2204 2205 2206 2207 2208 2209 2210 2211 2212 2213 2214 2215 2216 2217 2218 2219 2220 2221 2222 2223 2224 2225 2226 2227 2228 2229 2230 2231 2232 2233 2234 2235 2236 2237 2238 2239 2240 2241
  pthread_cond_broadcast(&di->cond_client);	// Safety
  pthread_mutex_unlock(&di->mutex);

  pthread_mutex_lock(&LOCK_delayed_create);	// Because of delayed_get_table
  pthread_mutex_lock(&LOCK_delayed_insert);	
  delete di;
  pthread_mutex_unlock(&LOCK_delayed_insert);
  pthread_mutex_unlock(&LOCK_delayed_create);  

  my_thread_end();
  pthread_exit(0);
  DBUG_RETURN(0);
}


/* Remove pointers from temporary fields to allocated values */

static void unlink_blobs(register TABLE *table)
{
  for (Field **ptr=table->field ; *ptr ; ptr++)
  {
    if ((*ptr)->flags & BLOB_FLAG)
      ((Field_blob *) (*ptr))->clear_temporary();
  }
}

/* Free blobs stored in current row */

static void free_delayed_insert_blobs(register TABLE *table)
{
  for (Field **ptr=table->field ; *ptr ; ptr++)
  {
    if ((*ptr)->flags & BLOB_FLAG)
    {
      char *str;
      ((Field_blob *) (*ptr))->get_ptr(&str);
      my_free(str,MYF(MY_ALLOW_ZERO_PTR));
      ((Field_blob *) (*ptr))->reset();
    }
  }
}


bool delayed_insert::handle_inserts(void)
{
  int error;
2242
  ulong max_rows;
2243 2244
  bool using_ignore= 0, using_opt_replace= 0,
       using_bin_log= mysql_bin_log.is_open();
2245
  delayed_row *row;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2246 2247 2248 2249 2250 2251
  DBUG_ENTER("handle_inserts");

  /* Allow client to insert new rows */
  pthread_mutex_unlock(&mutex);

  table->next_number_field=table->found_next_number_field;
2252
  table->use_all_columns();
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2253 2254 2255 2256 2257

  thd.proc_info="upgrading lock";
  if (thr_upgrade_write_delay_lock(*thd.lock->locks))
  {
    /* This can only happen if thread is killed by shutdown */
2258
    sql_print_error(ER(ER_DELAYED_CANT_CHANGE_LOCK),table->s->table_name.str);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2259 2260 2261 2262
    goto err;
  }

  thd.proc_info="insert";
2263
  max_rows= delayed_insert_limit;
2264
  if (thd.killed || table->s->version != refresh_version)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2265
  {
hf@genie.(none)'s avatar
SCRUM  
hf@genie.(none) committed
2266
    thd.killed= THD::KILL_CONNECTION;
2267
    max_rows= ULONG_MAX;                     // Do as much as possible
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2268 2269
  }

2270 2271 2272 2273 2274 2275 2276
  /*
    We can't use row caching when using the binary log because if
    we get a crash, then binary log will contain rows that are not yet
    written to disk, which will cause problems in replication.
  */
  if (!using_bin_log)
    table->file->extra(HA_EXTRA_WRITE_CACHE);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2277
  pthread_mutex_lock(&mutex);
2278

bk@work.mysql.com's avatar
bk@work.mysql.com committed
2279 2280 2281 2282
  while ((row=rows.get()))
  {
    stacked_inserts--;
    pthread_mutex_unlock(&mutex);
2283
    memcpy(table->record[0],row->record,table->s->reclength);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2284 2285 2286

    thd.start_time=row->start_time;
    thd.query_start_used=row->query_start_used;
2287 2288 2289 2290 2291
    /*
      To get the exact auto_inc interval to store in the binlog we must not
      use values from the previous interval (of the previous rows).
    */
    bool log_query= (row->log_query && row->query.str != NULL);
2292 2293
    DBUG_PRINT("delayed", ("query: '%s'  length: %u", row->query.str ?
                           row->query.str : "[NULL]", row->query.length));
2294 2295
    if (log_query)
    {
2296 2297 2298 2299 2300 2301 2302 2303
      /*
        This is the first value of an INSERT statement.
        It is the right place to clear a forced insert_id.
        This is usually done after the last value of an INSERT statement,
        but we won't know this in the insert delayed thread. But before
        the first value is sufficiently equivalent to after the last
        value of the previous statement.
      */
2304 2305 2306
      table->file->ha_release_auto_increment();
      thd.auto_inc_intervals_in_cur_stmt_for_binlog.empty();
    }
2307 2308 2309 2310
    thd.first_successful_insert_id_in_prev_stmt= 
      row->first_successful_insert_id_in_prev_stmt;
    thd.stmt_depends_on_first_successful_insert_id_in_prev_stmt= 
      row->stmt_depends_on_first_successful_insert_id_in_prev_stmt;
2311
    table->timestamp_field_type= row->timestamp_field_type;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2312

2313
    /* Copy the session variables. */
2314 2315
    thd.variables.auto_increment_increment= row->auto_increment_increment;
    thd.variables.auto_increment_offset=    row->auto_increment_offset;
2316 2317 2318 2319 2320 2321 2322
    /* Copy a forced insert_id, if any. */
    if (row->forced_insert_id)
    {
      DBUG_PRINT("delayed", ("received auto_inc: %lu",
                             (ulong) row->forced_insert_id));
      thd.force_one_auto_inc_interval(row->forced_insert_id);
    }
2323

2324
    info.ignore= row->ignore;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2325
    info.handle_duplicates= row->dup;
2326
    if (info.ignore ||
2327
	info.handle_duplicates != DUP_ERROR)
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
2328 2329 2330 2331
    {
      table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
      using_ignore=1;
    }
2332 2333 2334 2335 2336 2337 2338
    if (info.handle_duplicates == DUP_REPLACE &&
        (!table->triggers ||
         !table->triggers->has_delete_triggers()))
    {
      table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
      using_opt_replace= 1;
    }
2339
    thd.clear_error(); // reset error for binlog
2340
    if (write_record(&thd, table, &info))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2341
    {
2342
      info.error_count++;				// Ignore errors
2343
      thread_safe_increment(delayed_insert_errors,&LOCK_delayed_status);
sasha@laptop.slkc.uswest.net's avatar
sasha@laptop.slkc.uswest.net committed
2344
      row->log_query = 0;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2345
    }
2346

monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
2347 2348 2349 2350 2351
    if (using_ignore)
    {
      using_ignore=0;
      table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
    }
2352 2353 2354 2355 2356
    if (using_opt_replace)
    {
      using_opt_replace= 0;
      table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
    }
2357

2358
    if (log_query && mysql_bin_log.is_open())
guilhem@gbichot3.local's avatar
guilhem@gbichot3.local committed
2359 2360 2361 2362 2363 2364 2365 2366 2367
    {
      /*
        If the query has several rows to insert, only the first row will come
        here. In row-based binlogging, this means that the first row will be
        written to binlog as one Table_map event and one Rows event (due to an
        event flush done in binlog_query()), then all other rows of this query
        will be binlogged together as one single Table_map event and one
        single Rows event.
      */
2368 2369 2370
      thd.binlog_query(THD::ROW_QUERY_TYPE,
                       row->query.str, row->query.length,
                       FALSE, FALSE);
guilhem@gbichot3.local's avatar
guilhem@gbichot3.local committed
2371
    }
2372

2373
    if (table->s->blob_fields)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2374 2375 2376 2377 2378 2379
      free_delayed_insert_blobs(table);
    thread_safe_sub(delayed_rows_in_use,1,&LOCK_delayed_status);
    thread_safe_increment(delayed_insert_writes,&LOCK_delayed_status);
    pthread_mutex_lock(&mutex);

    delete row;
2380 2381 2382 2383 2384 2385 2386
    /*
      Let READ clients do something once in a while
      We should however not break in the middle of a multi-line insert
      if we have binary logging enabled as we don't want other commands
      on this table until all entries has been processed
    */
    if (group_count++ >= max_rows && (row= rows.head()) &&
2387
	(!(row->log_query & using_bin_log)))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2388 2389 2390 2391 2392 2393 2394 2395 2396 2397 2398 2399 2400
    {
      group_count=0;
      if (stacked_inserts || tables_in_use)	// Let these wait a while
      {
	if (tables_in_use)
	  pthread_cond_broadcast(&cond_client); // If waiting clients
	thd.proc_info="reschedule";
	pthread_mutex_unlock(&mutex);
	if ((error=table->file->extra(HA_EXTRA_NO_CACHE)))
	{
	  /* This should never happen */
	  table->file->print_error(error,MYF(0));
	  sql_print_error("%s",thd.net.last_error);
2401
          DBUG_PRINT("error", ("HA_EXTRA_NO_CACHE failed in loop"));
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2402 2403
	  goto err;
	}
2404
	query_cache_invalidate3(&thd, table, 1);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2405 2406 2407
	if (thr_reschedule_write_lock(*thd.lock->locks))
	{
	  /* This should never happen */
2408 2409
	  sql_print_error(ER(ER_DELAYED_CANT_CHANGE_LOCK),
                          table->s->table_name.str);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2410
	}
2411 2412
	if (!using_bin_log)
	  table->file->extra(HA_EXTRA_WRITE_CACHE);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2413 2414 2415 2416 2417 2418 2419 2420 2421
	pthread_mutex_lock(&mutex);
	thd.proc_info="insert";
      }
      if (tables_in_use)
	pthread_cond_broadcast(&cond_client);	// If waiting clients
    }
  }
  thd.proc_info=0;
  pthread_mutex_unlock(&mutex);
2422

2423 2424 2425 2426 2427 2428 2429 2430 2431 2432 2433 2434 2435 2436
  /*
    We need to flush the pending event when using row-based
    replication since the flushing normally done in binlog_query() is
    not done last in the statement: for delayed inserts, the insert
    statement is logged *before* all rows are inserted.

    We can flush the pending event without checking the thd->lock
    since the delayed insert *thread* is not inside a stored function
    or trigger.

    TODO: Move the logging to last in the sequence of rows.
   */
  if (thd.current_stmt_binlog_row_based)
    thd.binlog_flush_pending_rows_event(TRUE);
2437

bk@work.mysql.com's avatar
bk@work.mysql.com committed
2438 2439 2440 2441
  if ((error=table->file->extra(HA_EXTRA_NO_CACHE)))
  {						// This shouldn't happen
    table->file->print_error(error,MYF(0));
    sql_print_error("%s",thd.net.last_error);
2442
    DBUG_PRINT("error", ("HA_EXTRA_NO_CACHE failed after loop"));
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2443 2444
    goto err;
  }
2445
  query_cache_invalidate3(&thd, table, 1);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2446 2447 2448 2449
  pthread_mutex_lock(&mutex);
  DBUG_RETURN(0);

 err:
2450 2451 2452
#ifndef DBUG_OFF
  max_rows= 0;                                  // For DBUG output
#endif
2453 2454 2455 2456 2457 2458
  /* Remove all not used rows */
  while ((row=rows.get()))
  {
    delete row;
    thread_safe_increment(delayed_insert_errors,&LOCK_delayed_status);
    stacked_inserts--;
2459 2460 2461
#ifndef DBUG_OFF
    max_rows++;
#endif
2462
  }
2463
  DBUG_PRINT("error", ("dropped %lu rows after an error", max_rows));
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2464 2465 2466 2467
  thread_safe_increment(delayed_insert_errors, &LOCK_delayed_status);
  pthread_mutex_lock(&mutex);
  DBUG_RETURN(1);
}
2468
#endif /* EMBEDDED_LIBRARY */
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2469 2470

/***************************************************************************
2471
  Store records in INSERT ... SELECT *
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2472 2473
***************************************************************************/

bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
2474 2475 2476 2477 2478 2479 2480 2481 2482

/*
  make insert specific preparation and checks after opening tables

  SYNOPSIS
    mysql_insert_select_prepare()
    thd         thread handler

  RETURN
2483 2484
    FALSE OK
    TRUE  Error
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
2485 2486
*/

2487
bool mysql_insert_select_prepare(THD *thd)
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
2488 2489
{
  LEX *lex= thd->lex;
monty@mysql.com's avatar
monty@mysql.com committed
2490
  SELECT_LEX *select_lex= &lex->select_lex;
monty@mysql.com's avatar
monty@mysql.com committed
2491
  TABLE_LIST *first_select_leaf_table;
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
2492
  DBUG_ENTER("mysql_insert_select_prepare");
monty@mysql.com's avatar
monty@mysql.com committed
2493

2494 2495
  /*
    SELECT_LEX do not belong to INSERT statement, so we can't add WHERE
monty@mysql.com's avatar
monty@mysql.com committed
2496
    clause if table is VIEW
2497
  */
monty@mysql.com's avatar
monty@mysql.com committed
2498
  
monty@mysql.com's avatar
monty@mysql.com committed
2499 2500 2501 2502
  if (mysql_prepare_insert(thd, lex->query_tables,
                           lex->query_tables->table, lex->field_list, 0,
                           lex->update_list, lex->value_list,
                           lex->duplicates,
2503
                           &select_lex->where, TRUE, FALSE, FALSE))
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
2504
    DBUG_RETURN(TRUE);
monty@mysql.com's avatar
monty@mysql.com committed
2505

2506 2507 2508 2509
  /*
    exclude first table from leaf tables list, because it belong to
    INSERT
  */
monty@mysql.com's avatar
monty@mysql.com committed
2510 2511
  DBUG_ASSERT(select_lex->leaf_tables != 0);
  lex->leaf_tables_insert= select_lex->leaf_tables;
2512
  /* skip all leaf tables belonged to view where we are insert */
2513
  for (first_select_leaf_table= select_lex->leaf_tables->next_leaf;
2514 2515 2516 2517
       first_select_leaf_table &&
       first_select_leaf_table->belong_to_view &&
       first_select_leaf_table->belong_to_view ==
       lex->leaf_tables_insert->belong_to_view;
2518
       first_select_leaf_table= first_select_leaf_table->next_leaf)
2519
  {}
monty@mysql.com's avatar
monty@mysql.com committed
2520
  select_lex->leaf_tables= first_select_leaf_table;
2521
  DBUG_RETURN(FALSE);
bell@sanja.is.com.ua's avatar
VIEW  
bell@sanja.is.com.ua committed
2522 2523 2524
}


2525
select_insert::select_insert(TABLE_LIST *table_list_par, TABLE *table_par,
monty@mishka.local's avatar
monty@mishka.local committed
2526
                             List<Item> *fields_par,
monty@mysql.com's avatar
monty@mysql.com committed
2527 2528
                             List<Item> *update_fields,
                             List<Item> *update_values,
monty@mishka.local's avatar
monty@mishka.local committed
2529
                             enum_duplicates duplic,
2530 2531
                             bool ignore_check_option_errors)
  :table_list(table_list_par), table(table_par), fields(fields_par),
2532
   autoinc_value_of_last_inserted_row(0),
2533 2534 2535
   insert_into_view(table_list_par && table_list_par->view != 0)
{
  bzero((char*) &info,sizeof(info));
monty@mishka.local's avatar
monty@mishka.local committed
2536 2537 2538 2539
  info.handle_duplicates= duplic;
  info.ignore= ignore_check_option_errors;
  info.update_fields= update_fields;
  info.update_values= update_values;
2540
  if (table_list_par)
bell@sanja.is.com.ua's avatar
bell@sanja.is.com.ua committed
2541
    info.view= (table_list_par->view ? table_list_par : 0);
2542 2543 2544
}


bk@work.mysql.com's avatar
bk@work.mysql.com committed
2545
int
2546
select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2547
{
2548
  LEX *lex= thd->lex;
2549
  int res;
2550
  table_map map= 0;
2551
  SELECT_LEX *lex_current_select_save= lex->current_select;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2552 2553
  DBUG_ENTER("select_insert::prepare");

2554
  unit= u;
2555

2556 2557 2558 2559 2560 2561
  /*
    Since table in which we are going to insert is added to the first
    select, LEX::current_select should point to the first select while
    we are fixing fields from insert list.
  */
  lex->current_select= &lex->select_lex;
2562
  res= check_insert_fields(thd, table_list, *fields, values,
2563
                           !insert_into_view, &map) ||
2564
       setup_fields(thd, 0, values, MARK_COLUMNS_READ, 0, 0);
2565

2566 2567 2568 2569 2570 2571 2572 2573 2574 2575 2576 2577
  if (!res && fields->elements)
  {
    bool saved_abort_on_warning= thd->abort_on_warning;
    thd->abort_on_warning= !info.ignore && (thd->variables.sql_mode &
                                            (MODE_STRICT_TRANS_TABLES |
                                             MODE_STRICT_ALL_TABLES));
    res= check_that_all_fields_are_given_values(thd, table_list->table, 
                                                table_list);
    thd->abort_on_warning= saved_abort_on_warning;
  }

  if (info.handle_duplicates == DUP_UPDATE && !res)
2578
  {
2579
    Name_resolution_context *context= &lex->select_lex.context;
2580 2581 2582 2583
    Name_resolution_context_state ctx_state;

    /* Save the state of the current name resolution context. */
    ctx_state.save_state(context, table_list);
2584 2585

    /* Perform name resolution only in the first table - 'table_list'. */
2586
    table_list->next_local= 0;
2587 2588
    context->resolve_in_table_list_only(table_list);

2589
    lex->select_lex.no_wrap_view_item= TRUE;
2590
    res= res || check_update_fields(thd, context->table_list,
2591
                                    *info.update_fields, &map);
2592 2593
    lex->select_lex.no_wrap_view_item= FALSE;
    /*
2594 2595 2596
      When we are not using GROUP BY and there are no ungrouped aggregate functions 
      we can refer to other tables in the ON DUPLICATE KEY part.
      We use next_name_resolution_table descructively, so check it first (views?)
2597
    */
2598 2599 2600 2601 2602 2603 2604 2605
    DBUG_ASSERT (!table_list->next_name_resolution_table);
    if (lex->select_lex.group_list.elements == 0 &&
        !lex->select_lex.with_sum_func)
      /*
        We must make a single context out of the two separate name resolution contexts :
        the INSERT table and the tables in the SELECT part of INSERT ... SELECT.
        To do that we must concatenate the two lists
      */  
2606 2607
      table_list->next_name_resolution_table= 
        ctx_state.get_first_name_resolution_table();
2608

2609 2610
    res= res || setup_fields(thd, 0, *info.update_values,
                             MARK_COLUMNS_READ, 0, 0);
2611
    if (!res)
2612
    {
2613 2614 2615 2616 2617 2618 2619 2620
      /*
        Traverse the update values list and substitute fields from the
        select for references (Item_ref objects) to them. This is done in
        order to get correct values from those fields when the select
        employs a temporary table.
      */
      List_iterator<Item> li(*info.update_values);
      Item *item;
2621

2622 2623 2624 2625 2626
      while ((item= li++))
      {
        item->transform(&Item::update_value_transformer,
                        (byte*)lex->current_select);
      }
2627 2628 2629
    }

    /* Restore the current context. */
2630
    ctx_state.restore_state(context, table_list);
2631
  }
2632

2633 2634
  lex->current_select= lex_current_select_save;
  if (res)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2635
    DBUG_RETURN(1);
2636 2637 2638 2639 2640 2641 2642 2643 2644 2645
  /*
    if it is INSERT into join view then check_insert_fields already found
    real table for insert
  */
  table= table_list->table;

  /*
    Is table which we are changing used somewhere in other parts of
    query
  */
2646
  if (!(lex->current_select->options & OPTION_BUFFER_RESULT) &&
2647
      unique_table(thd, table_list, table_list->next_global, 0))
2648 2649
  {
    /* Using same table for INSERT and SELECT */
2650 2651
    lex->current_select->options|= OPTION_BUFFER_RESULT;
    lex->current_select->join->select_options|= OPTION_BUFFER_RESULT;
2652
  }
2653
  else if (!thd->prelocked_mode)
2654 2655 2656 2657 2658 2659 2660
  {
    /*
      We must not yet prepare the result table if it is the same as one of the 
      source tables (INSERT SELECT). The preparation may disable 
      indexes on the result table, which may be used during the select, if it
      is the same table (Bug #6034). Do the preparation after the select phase
      in select_insert::prepare2().
2661 2662
      We won't start bulk inserts at all if this statement uses functions or
      should invoke triggers since they may access to the same table too.
2663
    */
2664
    table->file->ha_start_bulk_insert((ha_rows) 0);
2665
  }
2666
  restore_record(table,s->default_values);		// Get empty record
2667
  table->next_number_field=table->found_next_number_field;
2668 2669 2670 2671 2672 2673 2674 2675 2676

#ifdef HAVE_REPLICATION
  if (thd->slave_thread &&
      (info.handle_duplicates == DUP_UPDATE) &&
      (table->next_number_field != NULL) &&
      rpl_master_has_bug(&active_mi->rli, 24432))
    DBUG_RETURN(1);
#endif

bk@work.mysql.com's avatar
bk@work.mysql.com committed
2677
  thd->cuted_fields=0;
monty@mysql.com's avatar
monty@mysql.com committed
2678 2679
  if (info.ignore || info.handle_duplicates != DUP_ERROR)
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
2680 2681 2682
  if (info.handle_duplicates == DUP_REPLACE &&
      (!table->triggers || !table->triggers->has_delete_triggers()))
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
2683
  thd->no_trans_update= 0;
monty@mysql.com's avatar
monty@mysql.com committed
2684
  thd->abort_on_warning= (!info.ignore &&
2685 2686 2687
                          (thd->variables.sql_mode &
                           (MODE_STRICT_TRANS_TABLES |
                            MODE_STRICT_ALL_TABLES)));
2688
  res= (table_list->prepare_where(thd, 0, TRUE) ||
2689
        table_list->prepare_check_option(thd));
2690 2691

  if (!res)
2692
     prepare_triggers_for_insert_stmt(table);
2693

2694
  DBUG_RETURN(res);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2695 2696
}

2697

2698 2699 2700 2701 2702 2703 2704 2705 2706 2707 2708 2709 2710 2711 2712 2713 2714 2715 2716
/*
  Finish the preparation of the result table.

  SYNOPSIS
    select_insert::prepare2()
    void

  DESCRIPTION
    If the result table is the same as one of the source tables (INSERT SELECT),
    the result table is not finally prepared at the join prepair phase.
    Do the final preparation now.
		       
  RETURN
    0   OK
*/

int select_insert::prepare2(void)
{
  DBUG_ENTER("select_insert::prepare2");
2717 2718
  if (thd->lex->current_select->options & OPTION_BUFFER_RESULT &&
      !thd->prelocked_mode)
2719
    table->file->ha_start_bulk_insert((ha_rows) 0);
monty@mysql.com's avatar
monty@mysql.com committed
2720
  DBUG_RETURN(0);
2721 2722 2723
}


2724 2725 2726 2727 2728 2729
void select_insert::cleanup()
{
  /* select_insert/select_create are never re-used in prepared statement */
  DBUG_ASSERT(0);
}

bk@work.mysql.com's avatar
bk@work.mysql.com committed
2730 2731
select_insert::~select_insert()
{
2732
  DBUG_ENTER("~select_insert");
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2733 2734 2735
  if (table)
  {
    table->next_number_field=0;
2736
    table->auto_increment_field_not_null= FALSE;
2737
    table->file->ha_reset();
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2738
  }
2739
  thd->count_cuted_fields= CHECK_FIELD_IGNORE;
2740
  thd->abort_on_warning= 0;
2741
  DBUG_VOID_RETURN;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2742 2743 2744 2745 2746
}


bool select_insert::send_data(List<Item> &values)
{
2747
  DBUG_ENTER("select_insert::send_data");
serg@serg.mylan's avatar
serg@serg.mylan committed
2748
  bool error=0;
2749

2750
  if (unit->offset_limit_cnt)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2751
  {						// using limit offset,count
2752
    unit->offset_limit_cnt--;
2753
    DBUG_RETURN(0);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2754
  }
monty@mysql.com's avatar
monty@mysql.com committed
2755

monty@mysql.com's avatar
monty@mysql.com committed
2756
  thd->count_cuted_fields= CHECK_FIELD_WARN;	// Calculate cuted fields
serg@serg.mylan's avatar
serg@serg.mylan committed
2757 2758
  store_values(values);
  thd->count_cuted_fields= CHECK_FIELD_IGNORE;
monty@mysql.com's avatar
monty@mysql.com committed
2759 2760
  if (thd->net.report_error)
    DBUG_RETURN(1);
monty@mysql.com's avatar
monty@mysql.com committed
2761 2762
  if (table_list)                               // Not CREATE ... SELECT
  {
monty@mysql.com's avatar
monty@mysql.com committed
2763
    switch (table_list->view_check_option(thd, info.ignore)) {
monty@mysql.com's avatar
monty@mysql.com committed
2764 2765 2766 2767 2768
    case VIEW_CHECK_SKIP:
      DBUG_RETURN(0);
    case VIEW_CHECK_ERROR:
      DBUG_RETURN(1);
    }
2769
  }
2770

2771
  error= write_record(thd, table, &info);
2772 2773
    
  if (!error)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2774
  {
2775
    if (table->triggers || info.handle_duplicates == DUP_UPDATE)
2776 2777
    {
      /*
2778 2779 2780
        Restore fields of the record since it is possible that they were
        changed by ON DUPLICATE KEY UPDATE clause.
    
2781 2782 2783 2784 2785 2786 2787 2788
        If triggers exist then whey can modify some fields which were not
        originally touched by INSERT ... SELECT, so we have to restore
        their original values for the next row.
      */
      restore_record(table, s->default_values);
    }
    if (table->next_number_field)
    {
2789 2790 2791 2792 2793 2794 2795
      /*
        If no value has been autogenerated so far, we need to remember the
        value we just saw, we may need to send it to client in the end.
      */
      if (thd->first_successful_insert_id_in_cur_stmt == 0) // optimization
        autoinc_value_of_last_inserted_row= 
          table->next_number_field->val_int();
2796 2797 2798 2799 2800 2801
      /*
        Clear auto-increment field for the next record, if triggers are used
        we will clear it twice, but this should be cheap.
      */
      table->next_number_field->reset();
    }
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2802
  }
serg@serg.mylan's avatar
serg@serg.mylan committed
2803
  DBUG_RETURN(error);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2804 2805 2806
}


serg@serg.mylan's avatar
serg@serg.mylan committed
2807 2808 2809
void select_insert::store_values(List<Item> &values)
{
  if (fields->elements)
2810 2811
    fill_record_n_invoke_before_triggers(thd, *fields, values, 1,
                                         table->triggers, TRG_EVENT_INSERT);
serg@serg.mylan's avatar
serg@serg.mylan committed
2812
  else
2813 2814
    fill_record_n_invoke_before_triggers(thd, table->field, values, 1,
                                         table->triggers, TRG_EVENT_INSERT);
serg@serg.mylan's avatar
serg@serg.mylan committed
2815 2816
}

bk@work.mysql.com's avatar
bk@work.mysql.com committed
2817 2818
void select_insert::send_error(uint errcode,const char *err)
{
monty@narttu.mysql.fi's avatar
monty@narttu.mysql.fi committed
2819 2820
  DBUG_ENTER("select_insert::send_error");

2821 2822 2823
  /* Avoid an extra 'unknown error' message if we already reported an error */
  if (errcode != ER_UNKNOWN_ERROR && !thd->net.report_error)
    my_message(errcode, err, MYF(0));
2824

2825 2826 2827 2828
  /*
    If the creation of the table failed (due to a syntax error, for
    example), no table will have been opened and therefore 'table'
    will be NULL. In that case, we still need to execute the rollback
2829
    and the end of the function.
2830 2831
   */
  if (table)
2832 2833
  {
    /*
2834 2835
      If we are not in prelocked mode, we end the bulk insert started
      before.
2836
    */
2837 2838 2839 2840 2841 2842 2843 2844 2845 2846 2847 2848 2849 2850 2851 2852 2853 2854
    if (!thd->prelocked_mode)
      table->file->ha_end_bulk_insert();

    /*
      If at least one row has been inserted/modified and will stay in
      the table (the table doesn't have transactions) we must write to
      the binlog (and the error code will make the slave stop).

      For many errors (example: we got a duplicate key error while
      inserting into a MyISAM table), no row will be added to the table,
      so passing the error to the slave will not help since there will
      be an error code mismatch (the inserts will succeed on the slave
      with no error).

      If table creation failed, the number of rows modified will also be
      zero, so no check for that is made.
    */
    if (info.copied || info.deleted || info.updated)
2855
    {
2856 2857
      DBUG_ASSERT(table != NULL);
      if (!table->file->has_transactions())
2858
      {
2859 2860 2861 2862 2863 2864 2865
        if (mysql_bin_log.is_open())
          thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query, thd->query_length,
                            table->file->has_transactions(), FALSE);
        if (!thd->current_stmt_binlog_row_based && !table->s->tmp_table &&
            !can_rollback_data())
          thd->options|= OPTION_STATUS_NO_TRANS_UPDATE;
        query_cache_invalidate3(thd, table, 1);
2866
      }
2867
    }
mats@romeo.(none)'s avatar
mats@romeo.(none) committed
2868
    table->file->ha_release_auto_increment();
monty@mysql.com's avatar
monty@mysql.com committed
2869
  }
2870

heikki@hundin.mysql.fi's avatar
heikki@hundin.mysql.fi committed
2871
  ha_rollback_stmt(thd);
monty@narttu.mysql.fi's avatar
monty@narttu.mysql.fi committed
2872
  DBUG_VOID_RETURN;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2873 2874 2875 2876 2877
}


bool select_insert::send_eof()
{
2878
  int error;
2879
  bool const trans_table= table->file->has_transactions();
2880
  ulonglong id;
2881
  DBUG_ENTER("select_insert::send_eof");
2882 2883
  DBUG_PRINT("enter", ("trans_table=%d, table_type='%s'",
                       trans_table, table->file->table_type()));
2884

2885
  error= (!thd->prelocked_mode) ? table->file->ha_end_bulk_insert():0;
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
2886
  table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
2887
  table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
2888

vva@eagle.mysql.r18.ru's avatar
vva@eagle.mysql.r18.ru committed
2889
  if (info.copied || info.deleted || info.updated)
2890 2891 2892 2893 2894
  {
    /*
      We must invalidate the table in the query cache before binlog writing
      and ha_autocommit_or_rollback.
    */
heikki@hundin.mysql.fi's avatar
heikki@hundin.mysql.fi committed
2895
    query_cache_invalidate3(thd, table, 1);
2896 2897 2898 2899 2900 2901 2902
    /*
      Mark that we have done permanent changes if all of the below is true
      - Table doesn't support transactions
      - It's a normal (not temporary) table. (Changes to temporary tables
        are not logged in RBR)
      - We are using statement based replication
    */
2903 2904
    if (!trans_table &&
        (!table->s->tmp_table || !thd->current_stmt_binlog_row_based))
2905 2906
      thd->options|= OPTION_STATUS_NO_TRANS_UPDATE;
   }
heikki@hundin.mysql.fi's avatar
heikki@hundin.mysql.fi committed
2907

2908 2909 2910 2911 2912 2913
  /*
    Write to binlog before commiting transaction.  No statement will
    be written by the binlog_query() below in RBR mode.  All the
    events are in the transaction cache and will be written when
    ha_autocommit_or_rollback() is issued below.
  */
2914 2915
  if (mysql_bin_log.is_open())
  {
guilhem@mysql.com's avatar
guilhem@mysql.com committed
2916 2917
    if (!error)
      thd->clear_error();
2918 2919
    thd->binlog_query(THD::ROW_QUERY_TYPE,
                      thd->query, thd->query_length,
2920
                      trans_table, FALSE);
2921
  }
2922 2923 2924 2925 2926 2927 2928 2929
  /*
    We will call ha_autocommit_or_rollback() also for
    non-transactional tables under row-based replication: there might
    be events in the binary logs transaction, and we need to write
    them to the binary log.
   */
  if (trans_table || thd->current_stmt_binlog_row_based)
  {
2930
    int error2= ha_autocommit_or_rollback(thd, error);
2931
    if (error2 && !error)
2932
      error= error2;
2933
  }
2934
  table->file->ha_release_auto_increment();
2935

2936
  if (error)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2937 2938
  {
    table->file->print_error(error,MYF(0));
2939
    DBUG_RETURN(1);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2940
  }
2941
  char buff[160];
2942
  if (info.ignore)
2943 2944
    sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
	    (ulong) (info.records - info.copied), (ulong) thd->cuted_fields);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2945
  else
2946
    sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
monty@mysql.com's avatar
monty@mysql.com committed
2947
	    (ulong) (info.deleted+info.updated), (ulong) thd->cuted_fields);
2948
  thd->row_count_func= info.copied+info.deleted+info.updated;
2949 2950 2951 2952 2953 2954 2955

  id= (thd->first_successful_insert_id_in_cur_stmt > 0) ?
    thd->first_successful_insert_id_in_cur_stmt :
    (thd->arg_of_last_insert_id_function ?
     thd->first_successful_insert_id_in_prev_stmt :
     (info.copied ? autoinc_value_of_last_inserted_row : 0));
  ::send_ok(thd, (ulong) thd->row_count_func, id, buff);
2956
  DBUG_RETURN(0);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2957 2958 2959 2960
}


/***************************************************************************
2961
  CREATE TABLE (SELECT) ...
bk@work.mysql.com's avatar
bk@work.mysql.com committed
2962 2963
***************************************************************************/

2964 2965 2966 2967 2968 2969 2970 2971 2972 2973 2974 2975 2976 2977 2978 2979 2980 2981 2982 2983
/*
  Create table from lists of fields and items (or open existing table
  with same name).

  SYNOPSIS
    create_table_from_items()
      thd          in     Thread object
      create_info  in     Create information (like MAX_ROWS, ENGINE or
                          temporary table flag)
      create_table in     Pointer to TABLE_LIST object providing database
                          and name for table to be created or to be open
      extra_fields in/out Initial list of fields for table to be created
      keys         in     List of keys for table to be created
      items        in     List of items which should be used to produce rest
                          of fields for the table (corresponding fields will
                          be added to the end of 'extra_fields' list)
      lock         out    Pointer to the MYSQL_LOCK object for table created
                          (open) will be returned in this parameter. Since
                          this table is not included in THD::lock caller is
                          responsible for explicitly unlocking this table.
knielsen@mysql.com's avatar
knielsen@mysql.com committed
2984
      hooks
2985 2986 2987 2988 2989 2990 2991 2992 2993 2994 2995 2996 2997 2998 2999 3000 3001 3002

  NOTES
    If 'create_info->options' bitmask has HA_LEX_CREATE_IF_NOT_EXISTS
    flag and table with name provided already exists then this function will
    simply open existing table.
    Also note that create, open and lock sequence in this function is not
    atomic and thus contains gap for deadlock and can cause other troubles.
    Since this function contains some logic specific to CREATE TABLE ... SELECT
    it should be changed before it can be used in other contexts.

  RETURN VALUES
    non-zero  Pointer to TABLE object for table created or opened
    0         Error
*/

static TABLE *create_table_from_items(THD *thd, HA_CREATE_INFO *create_info,
                                      TABLE_LIST *create_table,
                                      List<create_field> *extra_fields,
knielsen@mysql.com's avatar
knielsen@mysql.com committed
3003 3004 3005 3006
                                      List<Key> *keys,
                                      List<Item> *items,
                                      MYSQL_LOCK **lock,
                                      TABLEOP_HOOKS *hooks)
3007 3008
{
  TABLE tmp_table;		// Used during 'create_field()'
knielsen@mysql.com's avatar
knielsen@mysql.com committed
3009
  TABLE_SHARE share;
3010 3011 3012 3013 3014 3015 3016 3017 3018 3019 3020
  TABLE *table= 0;
  uint select_field_count= items->elements;
  /* Add selected items to field list */
  List_iterator_fast<Item> it(*items);
  Item *item;
  Field *tmp_field;
  bool not_used;
  DBUG_ENTER("create_table_from_items");

  tmp_table.alias= 0;
  tmp_table.timestamp_field= 0;
knielsen@mysql.com's avatar
knielsen@mysql.com committed
3021 3022 3023
  tmp_table.s= &share;
  init_tmp_table_share(&share, "", 0, "", "");

3024 3025
  tmp_table.s->db_create_options=0;
  tmp_table.s->blob_ptr_size= portable_sizeof_char_ptr;
knielsen@mysql.com's avatar
knielsen@mysql.com committed
3026
  tmp_table.s->db_low_byte_first= 
3027 3028
        test(create_info->db_type == myisam_hton ||
             create_info->db_type == heap_hton);
3029 3030 3031 3032 3033
  tmp_table.null_row=tmp_table.maybe_null=0;

  while ((item=it++))
  {
    create_field *cr_field;
3034
    Field *field, *def_field;
3035
    if (item->type() == Item::FUNC_ITEM)
3036
      field= item->tmp_table_field(&tmp_table);
3037
    else
3038 3039 3040
      field= create_tmp_field(thd, &tmp_table, item, item->type(),
                              (Item ***) 0, &tmp_field, &def_field, 0, 0, 0, 0,
                              0);
3041 3042 3043 3044 3045 3046 3047 3048 3049 3050 3051 3052 3053 3054 3055 3056 3057 3058 3059 3060
    if (!field ||
	!(cr_field=new create_field(field,(item->type() == Item::FIELD_ITEM ?
					   ((Item_field *)item)->field :
					   (Field*) 0))))
      DBUG_RETURN(0);
    if (item->maybe_null)
      cr_field->flags &= ~NOT_NULL_FLAG;
    extra_fields->push_back(cr_field);
  }
  /*
    create and lock table

    We don't log the statement, it will be logged later.

    If this is a HEAP table, the automatic DELETE FROM which is written to the
    binlog when a HEAP table is opened for the first time since startup, must
    not be written: 1) it would be wrong (imagine we're in CREATE SELECT: we
    don't want to delete from it) 2) it would be written before the CREATE
    TABLE, which is a wrong order. So we keep binary logging disabled when we
    open_table().
3061 3062 3063
    NOTE: By locking table which we just have created (or for which we just
    have have found that it already exists) separately from other tables used
    by the statement we create potential window for deadlock.
3064 3065 3066 3067 3068 3069
    TODO: create and open should be done atomic !
  */
  {
    tmp_disable_binlog(thd);
    if (!mysql_create_table(thd, create_table->db, create_table->table_name,
                            create_info, *extra_fields, *keys, 0,
3070
                            select_field_count, 0))
3071 3072 3073 3074 3075 3076 3077 3078 3079 3080 3081 3082 3083 3084
    {
      /*
        If we are here in prelocked mode we either create temporary table
        or prelocked mode is caused by the SELECT part of this statement.
      */
      DBUG_ASSERT(!thd->prelocked_mode ||
                  create_info->options & HA_LEX_CREATE_TMP_TABLE ||
                  thd->lex->requires_prelocking());

      /*
        NOTE: We don't want to ignore set of locked tables here if we are
              under explicit LOCK TABLES since it will open gap for deadlock
              too wide (and also is not backward compatible).
      */
knielsen@mysql.com's avatar
knielsen@mysql.com committed
3085

3086 3087 3088 3089 3090
      if (! (table= open_table(thd, create_table, thd->mem_root, (bool*) 0,
                               (MYSQL_LOCK_IGNORE_FLUSH |
                                ((thd->prelocked_mode == PRELOCKED) ?
                                 MYSQL_OPEN_IGNORE_LOCKED_TABLES:0)))))
        quick_rm_table(create_info->db_type, create_table->db,
3091 3092
                       table_case_name(create_info, create_table->table_name),
                       0);
3093 3094 3095 3096 3097 3098 3099 3100 3101 3102 3103 3104 3105
    }
    reenable_binlog(thd);
    if (!table)                                   // open failed
      DBUG_RETURN(0);
  }

  /*
    FIXME: What happens if trigger manages to be created while we are
           obtaining this lock ? May be it is sensible just to disable
           trigger execution in this case ? Or will MYSQL_LOCK_IGNORE_FLUSH
           save us from that ?
  */
  table->reginfo.lock_type=TL_WRITE;
knielsen@mysql.com's avatar
knielsen@mysql.com committed
3106
  hooks->prelock(&table, 1);                    // Call prelock hooks
3107 3108 3109 3110 3111 3112 3113
  if (! ((*lock)= mysql_lock_tables(thd, &table, 1,
                                    MYSQL_LOCK_IGNORE_FLUSH, &not_used)))
  {
    VOID(pthread_mutex_lock(&LOCK_open));
    hash_delete(&open_cache,(byte*) table);
    VOID(pthread_mutex_unlock(&LOCK_open));
    quick_rm_table(create_info->db_type, create_table->db,
3114
		   table_case_name(create_info, create_table->table_name), 0);
3115 3116 3117 3118 3119 3120 3121
    DBUG_RETURN(0);
  }
  table->file->extra(HA_EXTRA_WRITE_CACHE);
  DBUG_RETURN(table);
}


bk@work.mysql.com's avatar
bk@work.mysql.com committed
3122
int
3123
select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
3124 3125
{
  DBUG_ENTER("select_create::prepare");
3126

3127
  TABLEOP_HOOKS *hook_ptr= NULL;
3128 3129 3130 3131 3132 3133 3134 3135 3136 3137 3138 3139 3140 3141 3142 3143 3144 3145
  /*
    For row-based replication, the CREATE-SELECT statement is written
    in two pieces: the first one contain the CREATE TABLE statement
    necessary to create the table and the second part contain the rows
    that should go into the table.

    For non-temporary tables, the start of the CREATE-SELECT
    implicitly commits the previous transaction, and all events
    forming the statement will be stored the transaction cache. At end
    of the statement, the entire statement is committed as a
    transaction, and all events are written to the binary log.

    On the master, the table is locked for the duration of the
    statement, but since the CREATE part is replicated as a simple
    statement, there is no way to lock the table for accesses on the
    slave.  Hence, we have to hold on to the CREATE part of the
    statement until the statement has finished.
   */
3146 3147 3148
  class MY_HOOKS : public TABLEOP_HOOKS {
  public:
    MY_HOOKS(select_create *x) : ptr(x) { }
3149 3150

  private:
3151 3152
    virtual void do_prelock(TABLE **tables, uint count)
    {
3153 3154
      TABLE const *const table = *tables;
      if (ptr->get_thd()->current_stmt_binlog_row_based  &&
3155
          !table->s->tmp_table &&
3156 3157 3158 3159
          !ptr->get_create_info()->table_existed)
      {
        ptr->binlog_show_create_table(tables, count);
      }
3160 3161 3162 3163 3164 3165
    }

    select_create *ptr;
  };

  MY_HOOKS hooks(this);
3166
  hook_ptr= &hooks;
3167

3168
  unit= u;
3169 3170

  /*
3171 3172 3173
    Start a statement transaction before the create if we are using
    row-based replication for the statement.  If we are creating a
    temporary table, we need to start a statement transaction.
3174 3175 3176 3177 3178 3179 3180
  */
  if ((thd->lex->create_info.options & HA_LEX_CREATE_TMP_TABLE) == 0 &&
      thd->current_stmt_binlog_row_based)
  {
    thd->binlog_start_trans_and_stmt();
  }

3181
  if (!(table= create_table_from_items(thd, create_info, create_table,
3182 3183
                                       extra_fields, keys, &values,
                                       &thd->extra_lock, hook_ptr)))
bk@work.mysql.com's avatar
bk@work.mysql.com committed
3184 3185
    DBUG_RETURN(-1);				// abort() deletes table

3186
  if (table->s->fields < values.elements)
3187
  {
3188
    my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1);
3189 3190 3191
    DBUG_RETURN(-1);
  }

3192
 /* First field to copy */
3193 3194 3195 3196
  field= table->field+table->s->fields - values.elements;

  /* Mark all fields that are given values */
  for (Field **f= field ; *f ; f++)
3197
    bitmap_set_bit(table->write_set, (*f)->field_index);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
3198

3199
  /* Don't set timestamp if used */
3200
  table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
3201 3202
  table->next_number_field=table->found_next_number_field;

3203
  restore_record(table,s->default_values);      // Get empty record
bk@work.mysql.com's avatar
bk@work.mysql.com committed
3204
  thd->cuted_fields=0;
3205
  if (info.ignore || info.handle_duplicates != DUP_ERROR)
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
3206
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
3207 3208 3209
  if (info.handle_duplicates == DUP_REPLACE &&
      (!table->triggers || !table->triggers->has_delete_triggers()))
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
3210
  if (!thd->prelocked_mode)
3211
    table->file->ha_start_bulk_insert((ha_rows) 0);
3212
  thd->no_trans_update= 0;
monty@mysql.com's avatar
monty@mysql.com committed
3213
  thd->abort_on_warning= (!info.ignore &&
3214 3215 3216
                          (thd->variables.sql_mode &
                           (MODE_STRICT_TRANS_TABLES |
                            MODE_STRICT_ALL_TABLES)));
3217 3218 3219 3220
  if (check_that_all_fields_are_given_values(thd, table, table_list))
    DBUG_RETURN(1);
  table->mark_columns_needed_for_insert();
  DBUG_RETURN(0);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
3221 3222
}

3223
void
3224
select_create::binlog_show_create_table(TABLE **tables, uint count)
3225 3226 3227 3228 3229 3230 3231 3232 3233 3234 3235 3236 3237 3238 3239 3240 3241
{
  /*
    Note 1: In RBR mode, we generate a CREATE TABLE statement for the
    created table by calling store_create_info() (behaves as SHOW
    CREATE TABLE).  In the event of an error, nothing should be
    written to the binary log, even if the table is non-transactional;
    therefore we pretend that the generated CREATE TABLE statement is
    for a transactional table.  The event will then be put in the
    transaction cache, and any subsequent events (e.g., table-map
    events and binrow events) will also be put there.  We can then use
    ha_autocommit_or_rollback() to either throw away the entire
    kaboodle of events, or write them to the binary log.

    We write the CREATE TABLE statement here and not in prepare()
    since there potentially are sub-selects or accesses to information
    schema that will do a close_thread_tables(), destroying the
    statement transaction cache.
3242
  */
3243
  DBUG_ASSERT(thd->current_stmt_binlog_row_based);
3244
  DBUG_ASSERT(tables && *tables && count > 0);
3245 3246 3247

  char buf[2048];
  String query(buf, sizeof(buf), system_charset_info);
3248
  int result;
3249
  TABLE_LIST table_list;
3250

3251 3252
  memset(&table_list, 0, sizeof(table_list));
  table_list.table = *tables;
3253
  query.length(0);      // Have to zero it since constructor doesn't
3254

3255
  result= store_create_info(thd, &table_list, &query, create_info);
3256
  DBUG_ASSERT(result == 0); /* store_create_info() always return 0 */
3257

3258 3259 3260 3261 3262 3263
  thd->binlog_query(THD::STMT_QUERY_TYPE,
                    query.ptr(), query.length(),
                    /* is_trans */ TRUE,
                    /* suppress_use */ FALSE);
}

serg@serg.mylan's avatar
serg@serg.mylan committed
3264
void select_create::store_values(List<Item> &values)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
3265
{
3266 3267
  fill_record_n_invoke_before_triggers(thd, field, values, 1,
                                       table->triggers, TRG_EVENT_INSERT);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
3268 3269
}

monty@tik.mysql.fi's avatar
monty@tik.mysql.fi committed
3270

3271 3272
void select_create::send_error(uint errcode,const char *err)
{
3273 3274 3275 3276 3277 3278 3279
  DBUG_ENTER("select_create::send_error");

  DBUG_PRINT("info",
             ("Current statement %s row-based",
              thd->current_stmt_binlog_row_based ? "is" : "is NOT"));
  DBUG_PRINT("info",
             ("Current table (at 0x%lu) %s a temporary (or non-existant) table",
3280
              (ulong) table,
3281 3282 3283 3284 3285
              table && !table->s->tmp_table ? "is NOT" : "is"));
  DBUG_PRINT("info",
             ("Table %s prior to executing this statement",
              get_create_info()->table_existed ? "existed" : "did not exist"));

3286
  /*
3287 3288 3289 3290 3291 3292 3293 3294 3295
    This will execute any rollbacks that are necessary before writing
    the transcation cache.

    We disable the binary log since nothing should be written to the
    binary log.  This disabling is important, since we potentially do
    a "roll back" of non-transactional tables by removing the table,
    and the actual rollback might generate events that should not be
    written to the binary log.

3296 3297 3298 3299
  */
  tmp_disable_binlog(thd);
  select_insert::send_error(errcode, err);
  reenable_binlog(thd);
3300 3301

  DBUG_VOID_RETURN;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
3302 3303
}

monty@tik.mysql.fi's avatar
monty@tik.mysql.fi committed
3304

bk@work.mysql.com's avatar
bk@work.mysql.com committed
3305 3306 3307 3308 3309 3310 3311
bool select_create::send_eof()
{
  bool tmp=select_insert::send_eof();
  if (tmp)
    abort();
  else
  {
3312 3313 3314 3315 3316 3317 3318 3319
    /*
      Do an implicit commit at end of statement for non-temporary
      tables.  This can fail, but we should unlock the table
      nevertheless.
    */
    if (!table->s->tmp_table)
      ha_commit(thd);               // Can fail, but we proceed anyway

monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
3320
    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
3321
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
bk@work.mysql.com's avatar
bk@work.mysql.com committed
3322
    VOID(pthread_mutex_lock(&LOCK_open));
3323
    mysql_unlock_tables(thd, thd->extra_lock);
3324
    if (!table->s->tmp_table)
3325
    {
3326
      if (close_thread_table(thd, &table))
3327
        broadcast_refresh();
3328
    }
3329
    thd->extra_lock=0;
monty@tik.mysql.fi's avatar
monty@tik.mysql.fi committed
3330
    table=0;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
3331 3332 3333 3334 3335 3336 3337
    VOID(pthread_mutex_unlock(&LOCK_open));
  }
  return tmp;
}

void select_create::abort()
{
3338
  DBUG_ENTER("select_create::abort");
bk@work.mysql.com's avatar
bk@work.mysql.com committed
3339
  VOID(pthread_mutex_lock(&LOCK_open));
3340 3341 3342 3343 3344 3345 3346 3347 3348 3349 3350 3351 3352 3353 3354 3355 3356

  /*
    We roll back the statement, including truncating the transaction
    cache of the binary log, if the statement failed.

    We roll back the statement prior to deleting the table and prior
    to releasing the lock on the table, since there might be potential
    for failure if the rollback is executed after the drop or after
    unlocking the table.

    We also roll back the statement regardless of whether the creation
    of the table succeeded or not, since we need to reset the binary
    log state.
  */
  if (thd->current_stmt_binlog_row_based)
    ha_rollback_stmt(thd);

3357
  if (thd->extra_lock)
bk@work.mysql.com's avatar
bk@work.mysql.com committed
3358
  {
3359 3360
    mysql_unlock_tables(thd, thd->extra_lock);
    thd->extra_lock=0;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
3361
  }
3362

bk@work.mysql.com's avatar
bk@work.mysql.com committed
3363 3364
  if (table)
  {
monty@donna.mysql.com's avatar
monty@donna.mysql.com committed
3365
    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
3366
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
3367
    handlerton *table_type=table->s->db_type;
3368
    if (!table->s->tmp_table)
3369
    {
3370
      ulong version= table->s->version;
3371
      table->s->version= 0;
3372
      hash_delete(&open_cache,(byte*) table);
3373
      if (!create_info->table_existed)
3374 3375
        quick_rm_table(table_type, create_table->db,
                       create_table->table_name, 0);
3376
      /* Tell threads waiting for refresh that something has happened */
3377
      if (version != refresh_version)
3378
        broadcast_refresh();
3379 3380
    }
    else if (!create_info->table_existed)
3381 3382
      close_temporary_table(thd, table, 1, 1);
    table=0;                                    // Safety
bk@work.mysql.com's avatar
bk@work.mysql.com committed
3383 3384
  }
  VOID(pthread_mutex_unlock(&LOCK_open));
3385
  DBUG_VOID_RETURN;
bk@work.mysql.com's avatar
bk@work.mysql.com committed
3386 3387 3388 3389
}


/*****************************************************************************
3390
  Instansiate templates
bk@work.mysql.com's avatar
bk@work.mysql.com committed
3391 3392
*****************************************************************************/

3393
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
monty@tik.mysql.fi's avatar
monty@tik.mysql.fi committed
3394
template class List_iterator_fast<List_item>;
3395
#ifndef EMBEDDED_LIBRARY
bk@work.mysql.com's avatar
bk@work.mysql.com committed
3396 3397 3398
template class I_List<delayed_insert>;
template class I_List_iterator<delayed_insert>;
template class I_List<delayed_row>;
3399
#endif /* EMBEDDED_LIBRARY */
3400
#endif /* HAVE_EXPLICIT_TEMPLATE_INSTANTIATION */