sql_insert.cc 97.1 KB
Newer Older
unknown's avatar
unknown committed
1
/* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */


/* Insert of records */

unknown's avatar
unknown committed
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
/*
  INSERT DELAYED

  Insert delayed is distinguished from a normal insert by lock_type ==
  TL_WRITE_DELAYED instead of TL_WRITE. It first tries to open a
  "delayed" table (delayed_get_table()), but falls back to
  open_and_lock_tables() on error and proceeds as normal insert then.

  Opening a "delayed" table means to find a delayed insert thread that
  has the table open already. If this fails, a new thread is created and
  waited for to open and lock the table.

  If accessing the thread succeeded, in
  delayed_insert::get_local_table() the table of the thread is copied
  for local use. A copy is required because the normal insert logic
  works on a target table, but the other thread's table object must not
  be used. The insert logic uses the record buffer to create a record.
  And the delayed insert thread uses the record buffer to pass the
  record to the table handler. So there must be different objects. Also
  the copied table is not included in the lock, so that the statement
  can proceed even if the real table cannot be accessed at this moment.

  Copying a table object is not a trivial operation. Besides the TABLE
  object there are the field pointer array, the field objects and the
  record buffer. After copying the field objects, their pointers into
  the record must be "moved" to point to the new record buffer.

  After this setup the normal insert logic is used. Only that for
  delayed inserts write_delayed() is called instead of write_record().
  It inserts the rows into a queue and signals the delayed insert thread
  instead of writing directly to the table.

  The delayed insert thread awakes from the signal. It locks the table,
  inserts the rows from the queue, unlocks the table, and waits for the
  next signal. It does normally live until a FLUSH TABLES or SHUTDOWN.

*/

unknown's avatar
unknown committed
58
#include "mysql_priv.h"
59 60
#include "sp_head.h"
#include "sql_trigger.h"
61
#include "sql_select.h"
62
#include "sql_show.h"
unknown's avatar
unknown committed
63 64

static int check_null_fields(THD *thd,TABLE *entry);
unknown's avatar
unknown committed
65
#ifndef EMBEDDED_LIBRARY
unknown's avatar
unknown committed
66
static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list);
67 68
static int write_delayed(THD *thd, TABLE *table, enum_duplicates dup,
                         LEX_STRING query, bool ignore, bool log_on);
unknown's avatar
unknown committed
69
static void end_delayed_insert(THD *thd);
70
pthread_handler_t handle_delayed_insert(void *arg);
unknown's avatar
unknown committed
71
static void unlink_blobs(register TABLE *table);
unknown's avatar
unknown committed
72
#endif
73
static bool check_view_insertability(THD *thd, TABLE_LIST *view);
unknown's avatar
unknown committed
74 75 76 77 78 79 80 81 82 83 84

/*
  my_safe_alloca() / my_safe_afree()

  Define to force use of my_malloc() if the allocated memory block is big.

  When HAVE_ALLOCA is defined, blocks of at most 'min_length' bytes are
  taken with my_alloca() and bigger ones with my_malloc(); my_safe_afree()
  then releases only the blocks that came from my_malloc(). Callers must
  pass the same 'size' and 'min_length' to both macros.

  Every macro argument is parenthesized, and my_safe_afree() is wrapped in
  do { } while (0) so that it behaves as a single statement (it is safe
  inside an unbraced if/else).
*/

#ifndef HAVE_ALLOCA
#define my_safe_alloca(size, min_length) my_alloca(size)
#define my_safe_afree(ptr, size, min_length) my_afree(ptr)
#else
#define my_safe_alloca(size, min_length) \
  (((size) <= (min_length)) ? my_alloca(size) : my_malloc((size), MYF(0)))
#define my_safe_afree(ptr, size, min_length) \
  do { if ((size) > (min_length)) my_free((ptr), MYF(0)); } while (0)
#endif

85

unknown's avatar
unknown committed
86
/*
87
  Check if insert fields are correct.
88 89 90 91 92 93 94

  SYNOPSIS
    check_insert_fields()
    thd                         The current thread.
    table                       The table for insert.
    fields                      The insert fields.
    values                      The insert values.
unknown's avatar
unknown committed
95
    check_unique                If duplicate values should be rejected.
96 97 98 99 100 101 102 103 104

  NOTE
    Clears TIMESTAMP_AUTO_SET_ON_INSERT from table->timestamp_field_type
    or leaves it as is, depending on if timestamp should be updated or
    not.

  RETURN
    0           OK
    -1          Error
unknown's avatar
unknown committed
105 106
*/

unknown's avatar
unknown committed
107 108 109
static int check_insert_fields(THD *thd, TABLE_LIST *table_list,
                               List<Item> &fields, List<Item> &values,
                               bool check_unique)
unknown's avatar
unknown committed
110
{
unknown's avatar
VIEW  
unknown committed
111
  TABLE *table= table_list->table;
112

113 114 115 116 117 118
  if (!table_list->updatable)
  {
    my_error(ER_NON_UPDATABLE_TABLE, MYF(0), table_list->alias, "INSERT");
    return -1;
  }

unknown's avatar
unknown committed
119 120
  if (fields.elements == 0 && values.elements != 0)
  {
121 122 123 124 125 126
    if (!table)
    {
      my_error(ER_VIEW_NO_INSERT_FIELD_LIST, MYF(0),
               table_list->view_db.str, table_list->view_name.str);
      return -1;
    }
127
    if (values.elements != table->s->fields)
unknown's avatar
unknown committed
128
    {
unknown's avatar
unknown committed
129
      my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
unknown's avatar
unknown committed
130 131
      return -1;
    }
unknown's avatar
SCRUM:  
unknown committed
132
#ifndef NO_EMBEDDED_ACCESS_CHECKS
133
    if (grant_option)
unknown's avatar
VIEW  
unknown committed
134 135 136
    {
      Field_iterator_table fields;
      fields.set_table(table);
137
      if (check_grant_all_columns(thd, INSERT_ACL, &table->grant,
unknown's avatar
unknown committed
138
                                  table->s->db.str, table->s->table_name.str,
unknown's avatar
VIEW  
unknown committed
139 140 141
                                  &fields))
        return -1;
    }
unknown's avatar
SCRUM:  
unknown committed
142
#endif
143 144
    clear_timestamp_auto_bits(table->timestamp_field_type,
                              TIMESTAMP_AUTO_SET_ON_INSERT);
145 146 147 148
    /*
      No fields are provided so all fields must be provided in the values.
      Thus we set all bits in the write set.
    */
149
    bitmap_set_all(table->write_set);
unknown's avatar
unknown committed
150 151 152
  }
  else
  {						// Part field list
unknown's avatar
unknown committed
153 154
    SELECT_LEX *select_lex= &thd->lex->select_lex;
    Name_resolution_context *context= &select_lex->context;
155
    Name_resolution_context_state ctx_state;
unknown's avatar
unknown committed
156
    int res;
unknown's avatar
unknown committed
157

unknown's avatar
unknown committed
158 159
    if (fields.elements != values.elements)
    {
unknown's avatar
unknown committed
160
      my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
unknown's avatar
unknown committed
161 162 163
      return -1;
    }

164
    thd->dup_field= 0;
unknown's avatar
unknown committed
165 166 167
    select_lex->no_wrap_view_item= TRUE;

    /* Save the state of the current name resolution context. */
168
    ctx_state.save_state(context, table_list);
unknown's avatar
unknown committed
169 170 171 172 173

    /*
      Perform name resolution only in the first table - 'table_list',
      which is the table that is inserted into.
    */
174
    table_list->next_local= 0;
175
    context->resolve_in_table_list_only(table_list);
176
    res= setup_fields(thd, 0, fields, MARK_COLUMNS_WRITE, 0, 0);
unknown's avatar
unknown committed
177 178

    /* Restore the current context. */
179
    ctx_state.restore_state(context, table_list);
180
    thd->lex->select_lex.no_wrap_view_item= FALSE;
unknown's avatar
unknown committed
181

unknown's avatar
unknown committed
182
    if (res)
unknown's avatar
unknown committed
183
      return -1;
unknown's avatar
unknown committed
184

unknown's avatar
unknown committed
185
    if (table_list->effective_algorithm == VIEW_ALGORITHM_MERGE)
186 187 188 189
    {
      /* it is join view => we need to find table for update */
      List_iterator_fast<Item> it(fields);
      Item *item;
unknown's avatar
unknown committed
190
      TABLE_LIST *tbl= 0;            // reset for call to check_single_table()
191
      table_map map= 0;
unknown's avatar
unknown committed
192 193

      while ((item= it++))
194
        map|= item->used_tables();
unknown's avatar
unknown committed
195
      if (table_list->check_single_table(&tbl, map, table_list) || tbl == 0)
196 197 198 199 200 201 202
      {
        my_error(ER_VIEW_MULTIUPDATE, MYF(0),
                 table_list->view_db.str, table_list->view_name.str);
        return -1;
      }
      table_list->table= table= tbl->table;
    }
203

204
    if (check_unique && thd->dup_field)
unknown's avatar
unknown committed
205
    {
206
      my_error(ER_FIELD_SPECIFIED_TWICE, MYF(0), thd->dup_field->field_name);
unknown's avatar
unknown committed
207 208
      return -1;
    }
209 210 211 212 213 214 215 216 217 218 219 220
    if (table->timestamp_field)	// Don't automaticly set timestamp if used
    {
      if (bitmap_is_set(table->write_set,
                        table->timestamp_field->field_index))
        clear_timestamp_auto_bits(table->timestamp_field_type,
                                  TIMESTAMP_AUTO_SET_ON_INSERT);
      else
      {
        bitmap_set_bit(table->write_set,
                       table->timestamp_field->field_index);
      }
    }
unknown's avatar
unknown committed
221
  }
unknown's avatar
unknown committed
222
  // For the values we need select_priv
unknown's avatar
SCRUM:  
unknown committed
223
#ifndef NO_EMBEDDED_ACCESS_CHECKS
224
  table->grant.want_privilege= (SELECT_ACL & ~table->grant.privilege);
unknown's avatar
SCRUM:  
unknown committed
225
#endif
226 227 228

  if (check_key_in_view(thd, table_list) ||
      (table_list->view &&
229
       check_view_insertability(thd, table_list)))
230 231 232 233 234
  {
    my_error(ER_NON_UPDATABLE_TABLE, MYF(0), table_list->alias, "INSERT");
    return -1;
  }

unknown's avatar
unknown committed
235 236 237 238
  return 0;
}


239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257
/*
  Check update fields for the timestamp field.

  SYNOPSIS
    check_update_fields()
    thd                         The current thread.
    insert_table_list           The insert table list.
    table                       The table for update.
    update_fields               The update fields.

  NOTE
    If the update fields include the timestamp field,
    remove TIMESTAMP_AUTO_SET_ON_UPDATE from table->timestamp_field_type.

  RETURN
    0           OK
    -1          Error
*/

unknown's avatar
unknown committed
258
static int check_update_fields(THD *thd, TABLE_LIST *insert_table_list,
259 260
                               List<Item> &update_fields)
{
unknown's avatar
unknown committed
261
  TABLE *table= insert_table_list->table;
262
  my_bool timestamp_mark;
263 264 265

  if (table->timestamp_field)
  {
266 267 268 269 270 271
    /*
      Unmark the timestamp field so that we can check if this is modified
      by update_fields
    */
    timestamp_mark= bitmap_test_and_clear(table->write_set,
                                          table->timestamp_field->field_index);
272 273
  }

274 275
  /* Check the fields we are going to modify */
  if (setup_fields(thd, 0, update_fields, MARK_COLUMNS_WRITE, 0, 0))
276 277 278 279 280
    return -1;

  if (table->timestamp_field)
  {
    /* Don't set timestamp column if this is modified. */
281 282
    if (bitmap_is_set(table->write_set,
                      table->timestamp_field->field_index))
283 284
      clear_timestamp_auto_bits(table->timestamp_field_type,
                                TIMESTAMP_AUTO_SET_ON_UPDATE);
285 286 287
    if (timestamp_mark)
      bitmap_set_bit(table->write_set,
                     table->timestamp_field->field_index);
288 289 290 291 292
  }
  return 0;
}


unknown's avatar
unknown committed
293 294 295 296 297
bool mysql_insert(THD *thd,TABLE_LIST *table_list,
                  List<Item> &fields,
                  List<List_item> &values_list,
                  List<Item> &update_fields,
                  List<Item> &update_values,
298 299
                  enum_duplicates duplic,
		  bool ignore)
unknown's avatar
unknown committed
300
{
301
  int error, res;
302 303 304 305 306
  /*
    log_on is about delayed inserts only.
    By default, both logs are enabled (this won't cause problems if the server
    runs without --log-update or --log-bin).
  */
307 308
  bool log_on= ((thd->options & OPTION_BIN_LOG) ||
                (!(thd->security_ctx->master_access & SUPER_ACL)));
309
  bool transactional_table, joins_freed= FALSE;
310
  bool changed;
unknown's avatar
unknown committed
311 312 313 314
  uint value_count;
  ulong counter = 1;
  ulonglong id;
  COPY_INFO info;
315
  TABLE *table= 0;
unknown's avatar
unknown committed
316
  List_iterator_fast<List_item> its(values_list);
unknown's avatar
unknown committed
317
  List_item *values;
unknown's avatar
unknown committed
318
  Name_resolution_context *context;
319
  Name_resolution_context_state ctx_state;
unknown's avatar
unknown committed
320 321 322
#ifndef EMBEDDED_LIBRARY
  char *query= thd->query;
#endif
unknown's avatar
unknown committed
323
  thr_lock_type lock_type = table_list->lock_type;
unknown's avatar
unknown committed
324
  Item *unused_conds= 0;
unknown's avatar
unknown committed
325 326
  DBUG_ENTER("mysql_insert");

327 328 329 330 331
  /*
    in safe mode or with skip-new change delayed insert to be regular
    if we are told to replace duplicates, the insert cannot be concurrent
    delayed insert changed to regular in slave thread
   */
332 333 334 335
#ifdef EMBEDDED_LIBRARY
  if (lock_type == TL_WRITE_DELAYED)
    lock_type=TL_WRITE;
#else
unknown's avatar
unknown committed
336 337
  if ((lock_type == TL_WRITE_DELAYED &&
       ((specialflag & (SPECIAL_NO_NEW_FUNC | SPECIAL_SAFE_MODE)) ||
338
	thd->slave_thread || !thd->variables.max_insert_delayed_threads)) ||
339 340
      (lock_type == TL_WRITE_CONCURRENT_INSERT && duplic == DUP_REPLACE) ||
      (duplic == DUP_UPDATE))
unknown's avatar
unknown committed
341
    lock_type=TL_WRITE;
342
#endif
unknown's avatar
unknown committed
343
  table_list->lock_type= lock_type;
unknown's avatar
unknown committed
344

345
#ifndef EMBEDDED_LIBRARY
unknown's avatar
unknown committed
346 347 348 349
  if (lock_type == TL_WRITE_DELAYED)
  {
    if (thd->locked_tables)
    {
unknown's avatar
unknown committed
350 351
      DBUG_ASSERT(table_list->db); /* Must be set in the parser */
      if (find_locked_table(thd, table_list->db, table_list->table_name))
unknown's avatar
unknown committed
352
      {
353
	my_error(ER_DELAYED_INSERT_TABLE_LOCKED, MYF(0),
354
                 table_list->table_name);
unknown's avatar
unknown committed
355
	DBUG_RETURN(TRUE);
unknown's avatar
unknown committed
356 357
      }
    }
358
    if ((table= delayed_get_table(thd,table_list)) && !thd->is_fatal_error)
359
    {
360 361 362 363 364
      /*
        Open tables used for sub-selects or in stored functions, will also
        cache these functions.
      */
      res= open_and_lock_tables(thd, table_list->next_global);
unknown's avatar
VIEW  
unknown committed
365 366 367 368 369 370
      /*
	First is not processed by open_and_lock_tables() => we need set
	updateability flags "by hands".
      */
      if (!table_list->derived && !table_list->view)
        table_list->updatable= 1;  // usual table
371
    }
unknown's avatar
unknown committed
372
    else
373
    {
374 375
      /* Too many delayed insert threads;  Use a normal insert */
      table_list->lock_type= lock_type= TL_WRITE;
unknown's avatar
unknown committed
376
      res= open_and_lock_tables(thd, table_list);
377
    }
unknown's avatar
unknown committed
378 379
  }
  else
380
#endif /* EMBEDDED_LIBRARY */
unknown's avatar
unknown committed
381
    res= open_and_lock_tables(thd, table_list);
382
  if (res || thd->is_fatal_error)
unknown's avatar
unknown committed
383
    DBUG_RETURN(TRUE);
unknown's avatar
unknown committed
384

unknown's avatar
unknown committed
385
  thd->proc_info="init";
386
  thd->used_tables=0;
unknown's avatar
unknown committed
387
  values= its++;
unknown's avatar
unknown committed
388

unknown's avatar
VIEW  
unknown committed
389
  if (mysql_prepare_insert(thd, table_list, table, fields, values,
unknown's avatar
unknown committed
390 391
			   update_fields, update_values, duplic, &unused_conds,
                           FALSE))
unknown's avatar
unknown committed
392
    goto abort;
393

394 395 396
  /* mysql_prepare_insert set table_list->table if it was not set */
  table= table_list->table;

unknown's avatar
unknown committed
397 398
  context= &thd->lex->select_lex.context;
  /* Save the state of the current name resolution context. */
399
  ctx_state.save_state(context, table_list);
unknown's avatar
unknown committed
400 401 402 403 404

  /*
    Perform name resolution only in the first table - 'table_list',
    which is the table that is inserted into.
  */
405
  table_list->next_local= 0;
unknown's avatar
unknown committed
406 407
  context->resolve_in_table_list_only(table_list);

unknown's avatar
unknown committed
408
  value_count= values->elements;
unknown's avatar
unknown committed
409
  while ((values= its++))
unknown's avatar
unknown committed
410 411 412 413
  {
    counter++;
    if (values->elements != value_count)
    {
414
      my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), counter);
unknown's avatar
unknown committed
415 416
      goto abort;
    }
417
    if (setup_fields(thd, 0, *values, MARK_COLUMNS_READ, 0, 0))
unknown's avatar
unknown committed
418 419 420
      goto abort;
  }
  its.rewind ();
unknown's avatar
unknown committed
421 422
 
  /* Restore the current context. */
423
  ctx_state.restore_state(context, table_list);
unknown's avatar
unknown committed
424

unknown's avatar
unknown committed
425
  /*
unknown's avatar
unknown committed
426
    Fill in the given fields and dump it to the table file
unknown's avatar
unknown committed
427
  */
unknown's avatar
unknown committed
428
  info.records= info.deleted= info.copied= info.updated= 0;
429
  info.ignore= ignore;
unknown's avatar
unknown committed
430
  info.handle_duplicates=duplic;
431 432
  info.update_fields= &update_fields;
  info.update_values= &update_values;
unknown's avatar
unknown committed
433
  info.view= (table_list->view ? table_list : 0);
434

435 436 437
  /*
    Count warnings for all inserts.
    For single line insert, generate an error if try to set a NOT NULL field
unknown's avatar
unknown committed
438
    to NULL.
439
  */
unknown's avatar
unknown committed
440
  thd->count_cuted_fields= ((values_list.elements == 1 &&
unknown's avatar
unknown committed
441
                             !ignore) ?
442 443
			    CHECK_FIELD_ERROR_FOR_NULL :
			    CHECK_FIELD_WARN);
unknown's avatar
unknown committed
444 445 446 447 448
  thd->cuted_fields = 0L;
  table->next_number_field=table->found_next_number_field;

  error=0;
  thd->proc_info="update";
unknown's avatar
unknown committed
449
  if (duplic != DUP_ERROR || ignore)
unknown's avatar
unknown committed
450
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
451 452 453
  if (duplic == DUP_REPLACE &&
      (!table->triggers || !table->triggers->has_delete_triggers()))
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
454 455 456 457
  /*
    let's *try* to start bulk inserts. It won't necessary
    start them as values_list.elements should be greater than
    some - handler dependent - threshold.
458 459 460 461
    We should not start bulk inserts if this statement uses
    functions or invokes triggers since they may access
    to the same table and therefore should not see its
    inconsistent state created by this optimization.
462 463 464 465
    So we call start_bulk_insert to perform nesessary checks on
    values_list.elements, and - if nothing else - to initialize
    the code to make the call of end_bulk_insert() below safe.
  */
466
  if (lock_type != TL_WRITE_DELAYED && !thd->prelocked_mode)
467
    table->file->ha_start_bulk_insert(values_list.elements);
468

unknown's avatar
unknown committed
469
  thd->no_trans_update= 0;
unknown's avatar
unknown committed
470
  thd->abort_on_warning= (!ignore &&
unknown's avatar
unknown committed
471 472 473 474
                          (thd->variables.sql_mode &
                           (MODE_STRICT_TRANS_TABLES |
                            MODE_STRICT_ALL_TABLES)));

475
  if ((fields.elements || !value_count) &&
476
      check_that_all_fields_are_given_values(thd, table, table_list))
unknown's avatar
unknown committed
477 478 479 480 481
  {
    /* thd->net.report_error is now set, which will abort the next loop */
    error= 1;
  }

482
  table->mark_columns_needed_for_insert();
483

484 485 486 487
  if (table_list->prepare_where(thd, 0, TRUE) ||
      table_list->prepare_check_option(thd))
    error= 1;

unknown's avatar
unknown committed
488
  while ((values= its++))
unknown's avatar
unknown committed
489 490 491
  {
    if (fields.elements || !value_count)
    {
492
      restore_record(table,s->default_values);	// Get empty record
unknown's avatar
unknown committed
493 494 495
      if (fill_record_n_invoke_before_triggers(thd, fields, *values, 0,
                                               table->triggers,
                                               TRG_EVENT_INSERT))
unknown's avatar
unknown committed
496
      {
unknown's avatar
unknown committed
497
	if (values_list.elements != 1 && !thd->net.report_error)
unknown's avatar
unknown committed
498 499 500 501
	{
	  info.records++;
	  continue;
	}
unknown's avatar
unknown committed
502 503 504 505 506
	/*
	  TODO: set thd->abort_on_warning if values_list.elements == 1
	  and check that all items return warning in case of problem with
	  storing field.
        */
unknown's avatar
unknown committed
507 508 509 510 511 512
	error=1;
	break;
      }
    }
    else
    {
513
      if (thd->used_tables)			// Column used in values()
514
	restore_record(table,s->default_values);	// Get empty record
515
      else
unknown's avatar
unknown committed
516 517 518 519 520 521 522 523 524 525 526
      {
        /*
          Fix delete marker. No need to restore rest of record since it will
          be overwritten by fill_record() anyway (and fill_record() does not
          use default values in this case).
        */
	table->record[0][0]= table->s->default_values[0];
      }
      if (fill_record_n_invoke_before_triggers(thd, table->field, *values, 0,
                                               table->triggers,
                                               TRG_EVENT_INSERT))
unknown's avatar
unknown committed
527
      {
unknown's avatar
unknown committed
528
	if (values_list.elements != 1 && ! thd->net.report_error)
unknown's avatar
unknown committed
529 530 531 532 533 534 535 536
	{
	  info.records++;
	  continue;
	}
	error=1;
	break;
      }
    }
537

538 539 540
    if ((res= table_list->view_check_option(thd,
					    (values_list.elements == 1 ?
					     0 :
541
					     ignore))) ==
unknown's avatar
unknown committed
542 543 544
        VIEW_CHECK_SKIP)
      continue;
    else if (res == VIEW_CHECK_ERROR)
unknown's avatar
unknown committed
545
    {
unknown's avatar
unknown committed
546 547
      error= 1;
      break;
unknown's avatar
unknown committed
548
    }
549
#ifndef EMBEDDED_LIBRARY
unknown's avatar
unknown committed
550
    if (lock_type == TL_WRITE_DELAYED)
unknown's avatar
unknown committed
551
    {
552 553
      LEX_STRING const st_query = { query, thd->query_length };
      error=write_delayed(thd, table, duplic, st_query, ignore, log_on);
unknown's avatar
unknown committed
554 555 556
      query=0;
    }
    else
557
#endif
unknown's avatar
unknown committed
558
      error=write_record(thd, table ,&info);
559 560
    if (error)
      break;
561
    thd->row_count++;
unknown's avatar
unknown committed
562
  }
563

564 565
  free_underlaid_joins(thd, &thd->lex->select_lex);
  joins_freed= TRUE;
566
  table->file->ha_release_auto_increment();
567

568 569 570 571
  /*
    Now all rows are inserted.  Time to update logs and sends response to
    user
  */
572
#ifndef EMBEDDED_LIBRARY
unknown's avatar
unknown committed
573 574
  if (lock_type == TL_WRITE_DELAYED)
  {
575 576 577 578 579
    if (!error)
    {
      info.copied=values_list.elements;
      end_delayed_insert(thd);
    }
unknown's avatar
unknown committed
580
    query_cache_invalidate3(thd, table_list, 1);
unknown's avatar
unknown committed
581 582
  }
  else
583
#endif
unknown's avatar
unknown committed
584
  {
585
    if (!thd->prelocked_mode && table->file->ha_end_bulk_insert() && !error)
586
    {
unknown's avatar
unknown committed
587 588
      table->file->print_error(my_errno,MYF(0));
      error=1;
589
    }
590
    transactional_table= table->file->has_transactions();
unknown's avatar
unknown committed
591

592
    if ((changed= (info.copied || info.deleted || info.updated)))
unknown's avatar
unknown committed
593
    {
594 595 596 597 598 599 600
      /*
        Invalidate the table in the query cache if something changed.
        For the transactional algorithm to work the invalidation must be
        before binlog writing and ha_autocommit_or_rollback
      */
      query_cache_invalidate3(thd, table_list, 1);
      if (error <= 0 || !transactional_table)
601
      {
602
        if (mysql_bin_log.is_open())
603
        {
604 605
          if (error <= 0)
            thd->clear_error();
unknown's avatar
unknown committed
606 607 608 609 610
          if (thd->binlog_query(THD::ROW_QUERY_TYPE,
                                thd->query, thd->query_length,
                                transactional_table, FALSE) &&
              transactional_table)
          {
611
            error=1;
unknown's avatar
unknown committed
612
          }
613
        }
614 615
        if (!transactional_table)
          thd->options|=OPTION_STATUS_NO_TRANS_UPDATE;
616
      }
unknown's avatar
unknown committed
617
    }
618
    if (transactional_table)
619
      error=ha_autocommit_or_rollback(thd,error);
unknown's avatar
unknown committed
620

unknown's avatar
unknown committed
621 622 623
    if (thd->lock)
    {
      mysql_unlock_tables(thd, thd->lock);
624 625 626 627 628 629 630 631 632 633
      /*
        Invalidate the table in the query cache if something changed
        after unlocking when changes become fisible.
        TODO: this is workaround. right way will be move invalidating in
        the unlock procedure.
      */
      if (lock_type ==  TL_WRITE_CONCURRENT_INSERT && changed)
      {
        query_cache_invalidate3(thd, table_list, 1);
      }
unknown's avatar
unknown committed
634 635 636 637
      thd->lock=0;
    }
  }
  thd->proc_info="end";
638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653
  /*
    We'll report to the client this id:
    - if the table contains an autoincrement column and we successfully
    inserted an autogenerated value, the autogenerated value.
    - if the table contains no autoincrement column and LAST_INSERT_ID(X) was
    called, X.
    - if the table contains an autoincrement column, and some rows were
    inserted, the id of the last "inserted" row (if IGNORE, that value may not
    have been really inserted but ignored).
  */
  id= (thd->first_successful_insert_id_in_cur_stmt > 0) ?
    thd->first_successful_insert_id_in_cur_stmt :
    (thd->arg_of_last_insert_id_function ?
     thd->first_successful_insert_id_in_prev_stmt :
     ((table->next_number_field && info.copied) ?
     table->next_number_field->val_int() : 0));
unknown's avatar
unknown committed
654
  table->next_number_field=0;
655
  thd->count_cuted_fields= CHECK_FIELD_IGNORE;
unknown's avatar
unknown committed
656
  if (duplic != DUP_ERROR || ignore)
unknown's avatar
unknown committed
657
    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
658 659 660
  if (duplic == DUP_REPLACE &&
      (!table->triggers || !table->triggers->has_delete_triggers()))
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
661

unknown's avatar
unknown committed
662 663 664 665
  if (error)
    goto abort;
  if (values_list.elements == 1 && (!(thd->options & OPTION_WARNINGS) ||
				    !thd->cuted_fields))
666 667
  {
    thd->row_count_func= info.copied+info.deleted+info.updated;
668
    send_ok(thd, (ulong) thd->row_count_func, id);
669
  }
670 671
  else
  {
unknown's avatar
unknown committed
672
    char buff[160];
673
    if (ignore)
674 675 676
      sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
	      (lock_type == TL_WRITE_DELAYED) ? (ulong) 0 :
	      (ulong) (info.records - info.copied), (ulong) thd->cuted_fields);
unknown's avatar
unknown committed
677
    else
678
      sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
unknown's avatar
unknown committed
679
	      (ulong) (info.deleted+info.updated), (ulong) thd->cuted_fields);
680
    thd->row_count_func= info.copied+info.deleted+info.updated;
681
    ::send_ok(thd, (ulong) thd->row_count_func, id, buff);
unknown's avatar
unknown committed
682
  }
683
  thd->abort_on_warning= 0;
unknown's avatar
unknown committed
684
  DBUG_RETURN(FALSE);
unknown's avatar
unknown committed
685 686

abort:
687
#ifndef EMBEDDED_LIBRARY
unknown's avatar
unknown committed
688 689
  if (lock_type == TL_WRITE_DELAYED)
    end_delayed_insert(thd);
690
#endif
691
  if (table != NULL)
692
    table->file->ha_release_auto_increment();
693 694
  if (!joins_freed)
    free_underlaid_joins(thd, &thd->lex->select_lex);
unknown's avatar
unknown committed
695
  thd->abort_on_warning= 0;
unknown's avatar
unknown committed
696
  DBUG_RETURN(TRUE);
unknown's avatar
unknown committed
697 698 699
}


unknown's avatar
VIEW  
unknown committed
700 701 702 703 704
/*
  Additional check for insertability for VIEW

  SYNOPSIS
    check_view_insertability()
705
    thd     - thread handler
unknown's avatar
VIEW  
unknown committed
706 707
    view    - reference on VIEW

708 709 710 711 712 713
  IMPLEMENTATION
    A view is insertable if all of the following are true:
    - All columns in the view are columns from a table
    - All columns of the table that are not used in the view have default values
    - All fields in the view are unique (no two refer to the same column)

unknown's avatar
VIEW  
unknown committed
714 715
  RETURN
    FALSE - OK
716 717 718
      view->contain_auto_increment is 1 if and only if the view contains an
      auto_increment field

unknown's avatar
VIEW  
unknown committed
719 720 721
    TRUE  - can't be used for insert
*/

722
static bool check_view_insertability(THD * thd, TABLE_LIST *view)
unknown's avatar
VIEW  
unknown committed
723
{
724
  uint num= view->view->select_lex.item_list.elements;
unknown's avatar
VIEW  
unknown committed
725
  TABLE *table= view->table;
726 727 728
  Field_translator *trans_start= view->field_translation,
		   *trans_end= trans_start + num;
  Field_translator *trans;
unknown's avatar
VIEW  
unknown committed
729
  Field **field_ptr= table->field;
unknown's avatar
unknown committed
730
  uint used_fields_buff_size= bitmap_buffer_size(table->s->fields);
unknown's avatar
unknown committed
731
  uint32 *used_fields_buff= (uint32*)thd->alloc(used_fields_buff_size);
732
  MY_BITMAP used_fields;
unknown's avatar
unknown committed
733
  enum_mark_columns save_mark_used_columns= thd->mark_used_columns;
734 735
  DBUG_ENTER("check_key_in_view");

736 737 738
  if (!used_fields_buff)
    DBUG_RETURN(TRUE);  // EOM

unknown's avatar
VIEW  
unknown committed
739 740
  DBUG_ASSERT(view->table != 0 && view->field_translation != 0);

741
  VOID(bitmap_init(&used_fields, used_fields_buff, table->s->fields, 0));
742 743
  bitmap_clear_all(&used_fields);

unknown's avatar
VIEW  
unknown committed
744
  view->contain_auto_increment= 0;
745 746 747 748
  /* 
    we must not set query_id for fields as they're not 
    really used in this context
  */
unknown's avatar
unknown committed
749
  thd->mark_used_columns= MARK_COLUMNS_NONE;
unknown's avatar
VIEW  
unknown committed
750
  /* check simplicity and prepare unique test of view */
751
  for (trans= trans_start; trans != trans_end; trans++)
unknown's avatar
VIEW  
unknown committed
752
  {
753
    if (!trans->item->fixed && trans->item->fix_fields(thd, &trans->item))
754
    {
unknown's avatar
unknown committed
755
      thd->mark_used_columns= save_mark_used_columns;
756 757
      DBUG_RETURN(TRUE);
    }
758
    Item_field *field;
unknown's avatar
VIEW  
unknown committed
759
    /* simple SELECT list entry (field without expression) */
unknown's avatar
merge  
unknown committed
760
    if (!(field= trans->item->filed_for_view_update()))
761
    {
unknown's avatar
unknown committed
762
      thd->mark_used_columns= save_mark_used_columns;
unknown's avatar
VIEW  
unknown committed
763
      DBUG_RETURN(TRUE);
764
    }
765
    if (field->field->unireg_check == Field::NEXT_NUMBER)
unknown's avatar
VIEW  
unknown committed
766 767
      view->contain_auto_increment= 1;
    /* prepare unique test */
unknown's avatar
unknown committed
768 769 770 771 772
    /*
      remove collation (or other transparent for update function) if we have
      it
    */
    trans->item= field;
unknown's avatar
VIEW  
unknown committed
773
  }
unknown's avatar
unknown committed
774
  thd->mark_used_columns= save_mark_used_columns;
unknown's avatar
VIEW  
unknown committed
775
  /* unique test */
776
  for (trans= trans_start; trans != trans_end; trans++)
unknown's avatar
VIEW  
unknown committed
777
  {
778
    /* Thanks to test above, we know that all columns are of type Item_field */
779
    Item_field *field= (Item_field *)trans->item;
780 781 782
    /* check fields belong to table in which we are inserting */
    if (field->field->table == table &&
        bitmap_fast_test_and_set(&used_fields, field->field->field_index))
unknown's avatar
VIEW  
unknown committed
783 784 785 786 787 788 789
      DBUG_RETURN(TRUE);
  }

  DBUG_RETURN(FALSE);
}


unknown's avatar
unknown committed
790
/*
791
  Check if table can be updated
unknown's avatar
unknown committed
792 793

  SYNOPSIS
794 795
     mysql_prepare_insert_check_table()
     thd		Thread handle
unknown's avatar
unknown committed
796
     table_list		Table list
797 798
     fields		List of fields to be updated
     where		Pointer to where clause
unknown's avatar
unknown committed
799
     select_insert      Check is making for SELECT ... INSERT
800 801

   RETURN
unknown's avatar
unknown committed
802 803
     FALSE ok
     TRUE  ERROR
unknown's avatar
unknown committed
804
*/
805

unknown's avatar
unknown committed
806
static bool mysql_prepare_insert_check_table(THD *thd, TABLE_LIST *table_list,
807
                                             List<Item> &fields,
unknown's avatar
unknown committed
808
                                             bool select_insert)
unknown's avatar
unknown committed
809
{
unknown's avatar
VIEW  
unknown committed
810
  bool insert_into_view= (table_list->view != 0);
811
  DBUG_ENTER("mysql_prepare_insert_check_table");
unknown's avatar
VIEW  
unknown committed
812

813 814 815 816 817 818 819
  /*
     first table in list is the one we'll INSERT into, requires INSERT_ACL.
     all others require SELECT_ACL only. the ACL requirement below is for
     new leaves only anyway (view-constituents), so check for SELECT rather
     than INSERT.
  */

820 821
  if (setup_tables_and_check_access(thd, &thd->lex->select_lex.context,
                                    &thd->lex->select_lex.top_join_list,
822
                                    table_list,
823
                                    &thd->lex->select_lex.leaf_tables,
824
                                    select_insert, INSERT_ACL, SELECT_ACL))
unknown's avatar
unknown committed
825
    DBUG_RETURN(TRUE);
unknown's avatar
VIEW  
unknown committed
826 827 828 829

  if (insert_into_view && !fields.elements)
  {
    thd->lex->empty_field_list_on_rset= 1;
830 831 832 833
    if (!table_list->table)
    {
      my_error(ER_VIEW_NO_INSERT_FIELD_LIST, MYF(0),
               table_list->view_db.str, table_list->view_name.str);
unknown's avatar
unknown committed
834
      DBUG_RETURN(TRUE);
835
    }
836
    DBUG_RETURN(insert_view_fields(thd, &fields, table_list));
unknown's avatar
VIEW  
unknown committed
837 838
  }

unknown's avatar
unknown committed
839
  DBUG_RETURN(FALSE);
840 841 842 843 844 845 846 847 848 849
}


/*
  Prepare items in INSERT statement

  SYNOPSIS
    mysql_prepare_insert()
    thd			Thread handler
    table_list	        Global/local table list
unknown's avatar
unknown committed
850 851
    table		Table to insert into (can be NULL if table should
			be taken from table_list->table)    
unknown's avatar
unknown committed
852 853
    where		Where clause (for insert ... select)
    select_insert	TRUE if INSERT ... SELECT statement
854

unknown's avatar
unknown committed
855 856 857 858 859
  TODO (in far future)
    In cases of:
    INSERT INTO t1 SELECT a, sum(a) as sum1 from t2 GROUP BY a
    ON DUPLICATE KEY ...
    we should be able to refer to sum1 in the ON DUPLICATE KEY part
unknown's avatar
unknown committed
860

861 862 863
  WARNING
    You MUST set table->insert_values to 0 after calling this function
    before releasing the table object.
unknown's avatar
unknown committed
864
  
865
  RETURN VALUE
unknown's avatar
unknown committed
866 867
    FALSE OK
    TRUE  error
868 869
*/

unknown's avatar
unknown committed
870
bool mysql_prepare_insert(THD *thd, TABLE_LIST *table_list,
unknown's avatar
unknown committed
871
                          TABLE *table, List<Item> &fields, List_item *values,
unknown's avatar
unknown committed
872
                          List<Item> &update_fields, List<Item> &update_values,
unknown's avatar
unknown committed
873 874
                          enum_duplicates duplic,
                          COND **where, bool select_insert)
875
{
unknown's avatar
unknown committed
876
  SELECT_LEX *select_lex= &thd->lex->select_lex;
unknown's avatar
unknown committed
877
  Name_resolution_context *context= &select_lex->context;
878
  Name_resolution_context_state ctx_state;
879
  bool insert_into_view= (table_list->view != 0);
unknown's avatar
unknown committed
880
  bool res= 0;
881
  DBUG_ENTER("mysql_prepare_insert");
unknown's avatar
unknown committed
882 883 884
  DBUG_PRINT("enter", ("table_list 0x%lx, table 0x%lx, view %d",
		       (ulong)table_list, (ulong)table,
		       (int)insert_into_view));
unknown's avatar
unknown committed
885

886 887 888 889 890 891 892
  /*
    For subqueries in VALUES() we should not see the table in which we are
    inserting (for INSERT ... SELECT this is done by changing table_list,
    because INSERT ... SELECT share SELECT_LEX it with SELECT.
  */
  if (!select_insert)
  {
unknown's avatar
unknown committed
893
    for (SELECT_LEX_UNIT *un= select_lex->first_inner_unit();
894 895 896 897 898 899 900 901 902 903 904 905
         un;
         un= un->next_unit())
    {
      for (SELECT_LEX *sl= un->first_select();
           sl;
           sl= sl->next_select())
      {
        sl->context.outer_context= 0;
      }
    }
  }

unknown's avatar
unknown committed
906
  if (duplic == DUP_UPDATE)
unknown's avatar
unknown committed
907 908
  {
    /* it should be allocated before Item::fix_fields() */
unknown's avatar
unknown committed
909
    if (table_list->set_insert_values(thd->mem_root))
unknown's avatar
unknown committed
910
      DBUG_RETURN(TRUE);
unknown's avatar
unknown committed
911
  }
unknown's avatar
unknown committed
912

913
  if (mysql_prepare_insert_check_table(thd, table_list, fields, select_insert))
unknown's avatar
unknown committed
914
    DBUG_RETURN(TRUE);
unknown's avatar
VIEW  
unknown committed
915

unknown's avatar
unknown committed
916
  /* Save the state of the current name resolution context. */
917
  ctx_state.save_state(context, table_list);
unknown's avatar
unknown committed
918

unknown's avatar
unknown committed
919 920 921 922
  /*
    Perform name resolution only in the first table - 'table_list',
    which is the table that is inserted into.
  */
unknown's avatar
unknown committed
923
  table_list->next_local= 0;
unknown's avatar
unknown committed
924 925 926
  context->resolve_in_table_list_only(table_list);

  /* Prepare the fields in the statement. */
927
  if (values &&
928 929
      !(res= check_insert_fields(thd, context->table_list, fields, *values,
                                 !insert_into_view) ||
930
        setup_fields(thd, 0, *values, MARK_COLUMNS_READ, 0, 0)) &&
931
      duplic == DUP_UPDATE)
unknown's avatar
unknown committed
932
  {
unknown's avatar
unknown committed
933
    select_lex->no_wrap_view_item= TRUE;
unknown's avatar
unknown committed
934
    res= check_update_fields(thd, context->table_list, update_fields);
unknown's avatar
unknown committed
935
    select_lex->no_wrap_view_item= FALSE;
936 937 938 939
    /*
      When we are not using GROUP BY we can refer to other tables in the
      ON DUPLICATE KEY part.
    */       
unknown's avatar
unknown committed
940 941
    if (select_lex->group_list.elements == 0)
    {
942
      context->table_list->next_local=       ctx_state.save_next_local;
943
      /* first_name_resolution_table was set by resolve_in_table_list_only() */
unknown's avatar
unknown committed
944
      context->first_name_resolution_table->
945
        next_name_resolution_table=          ctx_state.save_next_local;
unknown's avatar
unknown committed
946 947
    }
    if (!res)
948
      res= setup_fields(thd, 0, update_values, MARK_COLUMNS_READ, 0, 0);
unknown's avatar
unknown committed
949
  }
unknown's avatar
unknown committed
950 951

  /* Restore the current context. */
952
  ctx_state.restore_state(context, table_list);
unknown's avatar
unknown committed
953

unknown's avatar
unknown committed
954 955
  if (res)
    DBUG_RETURN(res);
unknown's avatar
VIEW  
unknown committed
956

unknown's avatar
unknown committed
957 958 959
  if (!table)
    table= table_list->table;

960
  if (!select_insert)
unknown's avatar
unknown committed
961
  {
962
    Item *fake_conds= 0;
963
    TABLE_LIST *duplicate;
964
    if ((duplicate= unique_table(thd, table_list, table_list->next_global)))
965
    {
966
      update_non_unique_table_error(table_list, "INSERT", duplicate);
967 968
      DBUG_RETURN(TRUE);
    }
unknown's avatar
unknown committed
969 970
    select_lex->fix_prepare_information(thd, &fake_conds);
    select_lex->first_execution= 0;
unknown's avatar
unknown committed
971
  }
972
  if (duplic == DUP_UPDATE || duplic == DUP_REPLACE)
973
    table->prepare_for_position();
unknown's avatar
unknown committed
974
  DBUG_RETURN(FALSE);
unknown's avatar
unknown committed
975 976 977
}


unknown's avatar
unknown committed
978 979 980 981
	/* Return 1 if no key after 'keynr' has HA_NOSAME set, else 0 */

static int last_uniq_key(TABLE *table,uint keynr)
{
  /* Scan the keys that follow keynr; keynr itself is not examined */
  for (uint idx= keynr + 1; idx < table->s->keys; idx++)
  {
    if (table->key_info[idx].flags & HA_NOSAME)
      return 0;
  }
  return 1;
}


/*
unknown's avatar
unknown committed
990 991 992 993 994 995 996 997 998 999
  Write a record to table with optional deleting of conflicting records,
  invoke proper triggers if needed.

  SYNOPSIS
     write_record()
      thd   - thread context
      table - table to which record should be written
      info  - COPY_INFO structure describing handling of duplicates
              and which is used for counting number of records inserted
              and deleted.
unknown's avatar
unknown committed
1000

unknown's avatar
unknown committed
1001 1002 1003 1004 1005
  NOTE
    Once this record will be written to table after insert trigger will
    be invoked. If instead of inserting new record we will update old one
    then both on update triggers will work instead. Similarly both on
    delete triggers will be invoked if we will delete conflicting records.
unknown's avatar
unknown committed
1006

unknown's avatar
unknown committed
1007 1008 1009 1010 1011 1012
    Sets thd->no_trans_update if table which is updated didn't have
    transactions.

  RETURN VALUE
    0     - success
    non-0 - error
unknown's avatar
unknown committed
1013 1014 1015
*/


unknown's avatar
unknown committed
1016
int write_record(THD *thd, TABLE *table,COPY_INFO *info)
unknown's avatar
unknown committed
1017
{
unknown's avatar
unknown committed
1018
  int error, trg_error= 0;
unknown's avatar
unknown committed
1019
  char *key=0;
1020
  MY_BITMAP *save_read_set, *save_write_set;
1021 1022
  ulonglong prev_insert_id= table->file->next_insert_id;
  ulonglong insert_id_for_cur_row= 0;
1023
  DBUG_ENTER("write_record");
unknown's avatar
unknown committed
1024

unknown's avatar
unknown committed
1025
  info->records++;
1026 1027
  save_read_set=  table->read_set;
  save_write_set= table->write_set;
1028

1029 1030
  if (info->handle_duplicates == DUP_REPLACE ||
      info->handle_duplicates == DUP_UPDATE)
unknown's avatar
unknown committed
1031
  {
1032
    while ((error=table->file->ha_write_row(table->record[0])))
unknown's avatar
unknown committed
1033
    {
1034
      uint key_nr;
1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045
      /*
        If we do more than one iteration of this loop, from the second one the
        row will have an explicit value in the autoinc field, which was set at
        the first call of handler::update_auto_increment(). So we must save
        the autogenerated value to avoid thd->insert_id_for_cur_row to become
        0.
      */
      if (table->file->insert_id_for_cur_row > 0)
        insert_id_for_cur_row= table->file->insert_id_for_cur_row;
      else
        table->file->insert_id_for_cur_row= insert_id_for_cur_row;
unknown's avatar
unknown committed
1046
      bool is_duplicate_key_error;
1047
      if (table->file->is_fatal_error(error, HA_CHECK_DUP))
unknown's avatar
unknown committed
1048
	goto err;
unknown's avatar
unknown committed
1049
      is_duplicate_key_error= table->file->is_fatal_error(error, 0);
unknown's avatar
unknown committed
1050 1051
      if (!is_duplicate_key_error)
      {
1052 1053 1054 1055 1056
        /*
          We come here when we had an ignorable error which is not a duplicate
          key error. In this we ignore error if ignore flag is set, otherwise
          report error as usual. We will not do any duplicate key processing.
        */
unknown's avatar
unknown committed
1057
        if (info->ignore)
1058 1059
          goto ok_or_after_trg_err; /* Ignoring a not fatal error, return 0 */
        goto err;
unknown's avatar
unknown committed
1060
      }
unknown's avatar
unknown committed
1061 1062
      if ((int) (key_nr = table->file->get_dup_key(error)) < 0)
      {
1063
	error= HA_ERR_FOUND_DUPP_KEY;         /* Database can't find key */
unknown's avatar
unknown committed
1064 1065
	goto err;
      }
1066 1067
      /* Read all columns for the row we are going to replace */
      table->use_all_columns();
1068 1069 1070 1071 1072
      /*
	Don't allow REPLACE to replace a row when a auto_increment column
	was used.  This ensures that we don't get a problem when the
	whole range of the key has been used.
      */
1073 1074
      if (info->handle_duplicates == DUP_REPLACE &&
          table->next_number_field &&
1075
          key_nr == table->s->next_number_index &&
1076
	  (insert_id_for_cur_row > 0))
1077
	goto err;
1078
      if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
unknown's avatar
unknown committed
1079
      {
1080
	if (table->file->rnd_pos(table->record[1],table->file->dup_ref))
unknown's avatar
unknown committed
1081 1082 1083 1084
	  goto err;
      }
      else
      {
1085
	if (table->file->extra(HA_EXTRA_FLUSH_CACHE)) /* Not needed with NISAM */
unknown's avatar
unknown committed
1086 1087 1088 1089
	{
	  error=my_errno;
	  goto err;
	}
unknown's avatar
unknown committed
1090

unknown's avatar
unknown committed
1091 1092
	if (!key)
	{
1093
	  if (!(key=(char*) my_safe_alloca(table->s->max_unique_length,
unknown's avatar
unknown committed
1094 1095 1096 1097 1098 1099
					   MAX_KEY_LENGTH)))
	  {
	    error=ENOMEM;
	    goto err;
	  }
	}
1100
	key_copy((byte*) key,table->record[0],table->key_info+key_nr,0);
unknown's avatar
unknown committed
1101
	if ((error=(table->file->index_read_idx(table->record[1],key_nr,
1102
						(byte*) key,
unknown's avatar
unknown committed
1103 1104
						table->key_info[key_nr].
						key_length,
unknown's avatar
unknown committed
1105 1106 1107
						HA_READ_KEY_EXACT))))
	  goto err;
      }
1108
      if (info->handle_duplicates == DUP_UPDATE)
unknown's avatar
unknown committed
1109
      {
unknown's avatar
unknown committed
1110
        int res= 0;
unknown's avatar
unknown committed
1111 1112 1113 1114
        /*
          We don't check for other UNIQUE keys - the first row
          that matches, is updated. If update causes a conflict again,
          an error is returned
1115
        */
unknown's avatar
unknown committed
1116
	DBUG_ASSERT(table->insert_values != NULL);
unknown's avatar
unknown committed
1117 1118
        store_record(table,insert_values);
        restore_record(table,record[1]);
unknown's avatar
unknown committed
1119 1120
        DBUG_ASSERT(info->update_fields->elements ==
                    info->update_values->elements);
unknown's avatar
unknown committed
1121 1122 1123 1124 1125
        if (fill_record_n_invoke_before_triggers(thd, *info->update_fields,
                                                 *info->update_values, 0,
                                                 table->triggers,
                                                 TRG_EVENT_UPDATE))
          goto before_trg_err;
1126 1127

        /* CHECK OPTION for VIEW ... ON DUPLICATE KEY UPDATE ... */
unknown's avatar
unknown committed
1128 1129 1130
        if (info->view &&
            (res= info->view->view_check_option(current_thd, info->ignore)) ==
            VIEW_CHECK_SKIP)
unknown's avatar
unknown committed
1131
          goto ok_or_after_trg_err;
unknown's avatar
unknown committed
1132
        if (res == VIEW_CHECK_ERROR)
unknown's avatar
unknown committed
1133
          goto before_trg_err;
1134

1135 1136
        if ((error=table->file->ha_update_row(table->record[1],
                                              table->record[0])))
1137
	{
1138
          if (info->ignore &&
1139
              !table->file->is_fatal_error(error, HA_CHECK_DUP_KEY))
1140
          {
1141
            table->file->restore_auto_increment(prev_insert_id);
unknown's avatar
unknown committed
1142
            goto ok_or_after_trg_err;
1143
          }
1144
          goto err;
1145
	}
unknown's avatar
unknown committed
1146
        info->updated++;
1147
        /*
1148 1149 1150 1151 1152
          If ON DUP KEY UPDATE updates a row instead of inserting one, it's
          like a regular UPDATE statement: it should not affect the value of a
          next SELECT LAST_INSERT_ID() or mysql_insert_id().
          Except if LAST_INSERT_ID(#) was in the INSERT query, which is
          handled separately by THD::arg_of_last_insert_id_function.
1153
        */
1154
        insert_id_for_cur_row= table->file->insert_id_for_cur_row= 0;
1155
        if (table->next_number_field)
1156
          table->file->adjust_next_insert_id_after_explicit_value(table->next_number_field->val_int());
unknown's avatar
unknown committed
1157 1158 1159 1160 1161
        trg_error= (table->triggers &&
                    table->triggers->process_triggers(thd, TRG_EVENT_UPDATE,
                                                      TRG_ACTION_AFTER, TRUE));
        info->copied++;
        goto ok_or_after_trg_err;
1162 1163 1164
      }
      else /* DUP_REPLACE */
      {
unknown's avatar
unknown committed
1165 1166 1167 1168 1169
	/*
	  The manual defines the REPLACE semantics that it is either
	  an INSERT or DELETE(s) + INSERT; FOREIGN KEY checks in
	  InnoDB do not function in the defined way if we allow MySQL
	  to convert the latter operation internally to an UPDATE.
1170 1171
          We also should not perform this conversion if we have 
          timestamp field with ON UPDATE which is different from DEFAULT.
1172 1173 1174 1175 1176 1177
          Another case when conversion should not be performed is when
          we have ON DELETE trigger on table so user may notice that
          we cheat here. Note that it is ok to do such conversion for
          tables which have ON UPDATE but have no ON DELETE triggers,
          we just should not expose this fact to users by invoking
          ON UPDATE triggers.
unknown's avatar
unknown committed
1178 1179
	*/
	if (last_uniq_key(table,key_nr) &&
1180
	    !table->file->referenced_by_foreign_key() &&
1181
            (table->timestamp_field_type == TIMESTAMP_NO_AUTO_SET ||
1182 1183
             table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_BOTH) &&
            (!table->triggers || !table->triggers->has_delete_triggers()))
1184
        {
1185 1186
          if ((error=table->file->ha_update_row(table->record[1],
					        table->record[0])))
1187 1188
            goto err;
          info->deleted++;
1189
          thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
1190 1191 1192 1193 1194
          /*
            Since we pretend that we have done insert we should call
            its after triggers.
          */
          goto after_trg_n_copied_inc;
unknown's avatar
unknown committed
1195 1196 1197 1198 1199 1200 1201
        }
        else
        {
          if (table->triggers &&
              table->triggers->process_triggers(thd, TRG_EVENT_DELETE,
                                                TRG_ACTION_BEFORE, TRUE))
            goto before_trg_err;
1202
          if ((error=table->file->ha_delete_row(table->record[1])))
unknown's avatar
unknown committed
1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214
            goto err;
          info->deleted++;
          if (!table->file->has_transactions())
            thd->no_trans_update= 1;
          if (table->triggers &&
              table->triggers->process_triggers(thd, TRG_EVENT_DELETE,
                                                TRG_ACTION_AFTER, TRUE))
          {
            trg_error= 1;
            goto ok_or_after_trg_err;
          }
          /* Let us attempt do write_row() once more */
1215
        }
unknown's avatar
unknown committed
1216 1217
      }
    }
1218
    thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
1219 1220 1221 1222 1223 1224 1225
    /*
      Restore column maps if they where replaced during an duplicate key
      problem.
    */
    if (table->read_set != save_read_set ||
        table->write_set != save_write_set)
      table->column_bitmaps_set(save_read_set, save_write_set);
unknown's avatar
unknown committed
1226
  }
1227
  else if ((error=table->file->ha_write_row(table->record[0])))
unknown's avatar
unknown committed
1228
  {
1229
    if (!info->ignore ||
1230
        table->file->is_fatal_error(error, HA_CHECK_DUP))
unknown's avatar
unknown committed
1231
      goto err;
1232
    table->file->restore_auto_increment(prev_insert_id);
1233
    goto ok_or_after_trg_err;
unknown's avatar
unknown committed
1234
  }
1235 1236 1237

after_trg_n_copied_inc:
  info->copied++;
1238
  thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
1239 1240 1241
  trg_error= (table->triggers &&
              table->triggers->process_triggers(thd, TRG_EVENT_INSERT,
                                                TRG_ACTION_AFTER, TRUE));
unknown's avatar
unknown committed
1242 1243

ok_or_after_trg_err:
unknown's avatar
unknown committed
1244
  if (key)
1245
    my_safe_afree(key,table->s->max_unique_length,MAX_KEY_LENGTH);
unknown's avatar
unknown committed
1246 1247
  if (!table->file->has_transactions())
    thd->no_trans_update= 1;
unknown's avatar
unknown committed
1248
  DBUG_RETURN(trg_error);
unknown's avatar
unknown committed
1249 1250

err:
1251
  info->last_errno= error;
1252 1253 1254
  /* current_select is NULL if this is a delayed insert */
  if (thd->lex->current_select)
    thd->lex->current_select->no_error= 0;        // Give error
unknown's avatar
unknown committed
1255
  table->file->print_error(error,MYF(0));
unknown's avatar
unknown committed
1256 1257

before_trg_err:
1258
  table->file->restore_auto_increment(prev_insert_id);
unknown's avatar
unknown committed
1259 1260
  if (key)
    my_safe_afree(key, table->s->max_unique_length, MAX_KEY_LENGTH);
1261
  table->column_bitmaps_set(save_read_set, save_write_set);
1262
  DBUG_RETURN(1);
unknown's avatar
unknown committed
1263 1264 1265 1266
}


/******************************************************************************
unknown's avatar
unknown committed
1267
  Check that all fields with arn't null_fields are used
unknown's avatar
unknown committed
1268 1269
******************************************************************************/

1270 1271
int check_that_all_fields_are_given_values(THD *thd, TABLE *entry,
                                           TABLE_LIST *table_list)
unknown's avatar
unknown committed
1272
{
1273
  int err= 0;
1274 1275
  MY_BITMAP *write_set= entry->write_set;

unknown's avatar
unknown committed
1276 1277
  for (Field **field=entry->field ; *field ; field++)
  {
1278
    if (!bitmap_is_set(write_set, (*field)->field_index) &&
1279 1280
        ((*field)->flags & NO_DEFAULT_VALUE_FLAG) &&
        ((*field)->real_type() != FIELD_TYPE_ENUM))
unknown's avatar
unknown committed
1281
    {
1282 1283 1284
      bool view= FALSE;
      if (table_list)
      {
unknown's avatar
unknown committed
1285
        table_list= table_list->top_table();
unknown's avatar
unknown committed
1286
        view= test(table_list->view);
1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302
      }
      if (view)
      {
        push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
                            ER_NO_DEFAULT_FOR_VIEW_FIELD,
                            ER(ER_NO_DEFAULT_FOR_VIEW_FIELD),
                            table_list->view_db.str,
                            table_list->view_name.str);
      }
      else
      {
        push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
                            ER_NO_DEFAULT_FOR_FIELD,
                            ER(ER_NO_DEFAULT_FOR_FIELD),
                            (*field)->field_name);
      }
1303
      err= 1;
unknown's avatar
unknown committed
1304 1305
    }
  }
1306
  return thd->abort_on_warning ? err : 0;
unknown's avatar
unknown committed
1307 1308 1309
}

/*****************************************************************************
unknown's avatar
unknown committed
1310 1311
  Handling of delayed inserts
  A thread is created for each table that one uses with the DELAYED attribute.
unknown's avatar
unknown committed
1312 1313
*****************************************************************************/

1314 1315
#ifndef EMBEDDED_LIBRARY

unknown's avatar
unknown committed
1316 1317
class delayed_row :public ilink {
public:
1318
  char *record;
unknown's avatar
unknown committed
1319 1320
  enum_duplicates dup;
  time_t start_time;
1321 1322 1323
  bool query_start_used, ignore, log_query;
  bool stmt_depends_on_first_successful_insert_id_in_prev_stmt;
  ulonglong first_successful_insert_id_in_prev_stmt;
1324
  timestamp_auto_set_type timestamp_field_type;
1325
  LEX_STRING query;
unknown's avatar
unknown committed
1326

1327 1328 1329 1330 1331
  delayed_row(LEX_STRING const query_arg, enum_duplicates dup_arg,
              bool ignore_arg, bool log_query_arg)
    : record(0), dup(dup_arg), ignore(ignore_arg), log_query(log_query_arg),
      query(query_arg)
    {}
unknown's avatar
unknown committed
1332 1333
  ~delayed_row()
  {
1334
    x_free(query.str);
unknown's avatar
unknown committed
1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350
    x_free(record);
  }
};


class delayed_insert :public ilink {
  uint locks_in_memory;
public:
  THD thd;
  TABLE *table;
  pthread_mutex_t mutex;
  pthread_cond_t cond,cond_client;
  volatile uint tables_in_use,stacked_inserts;
  volatile bool status,dead;
  COPY_INFO info;
  I_List<delayed_row> rows;
1351
  ulong group_count;
unknown's avatar
unknown committed
1352
  TABLE_LIST table_list;			// Argument
unknown's avatar
unknown committed
1353 1354

  delayed_insert()
1355
    :locks_in_memory(0),
unknown's avatar
unknown committed
1356 1357 1358
     table(0),tables_in_use(0),stacked_inserts(0), status(0), dead(0),
     group_count(0)
  {
1359 1360
    thd.security_ctx->user=thd.security_ctx->priv_user=(char*) delayed_user;
    thd.security_ctx->host=(char*) my_localhost;
unknown's avatar
unknown committed
1361 1362 1363
    thd.current_tablenr=0;
    thd.version=refresh_version;
    thd.command=COM_DELAYED_INSERT;
1364 1365
    thd.lex->current_select= 0; 		// for my_message_sql
    thd.lex->sql_command= SQLCOM_INSERT;        // For innodb::store_lock()
1366 1367 1368 1369
    /*
      Statement-based replication of INSERT DELAYED has problems with RAND()
      and user vars, so in mixed mode we go to row-based.
    */
1370
    thd.set_current_stmt_binlog_row_based_if_mixed();
unknown's avatar
unknown committed
1371

1372 1373
    bzero((char*) &thd.net, sizeof(thd.net));		// Safety
    bzero((char*) &table_list, sizeof(table_list));	// Safety
1374
    thd.system_thread= SYSTEM_THREAD_DELAYED_INSERT;
1375
    thd.security_ctx->host_or_ip= "";
unknown's avatar
unknown committed
1376
    bzero((char*) &info,sizeof(info));
1377
    pthread_mutex_init(&mutex,MY_MUTEX_INIT_FAST);
unknown's avatar
unknown committed
1378 1379 1380 1381 1382 1383 1384 1385
    pthread_cond_init(&cond,NULL);
    pthread_cond_init(&cond_client,NULL);
    VOID(pthread_mutex_lock(&LOCK_thread_count));
    delayed_insert_threads++;
    VOID(pthread_mutex_unlock(&LOCK_thread_count));
  }
  ~delayed_insert()
  {
1386
    /* The following is not really needed, but just for safety */
unknown's avatar
unknown committed
1387 1388 1389 1390 1391 1392
    delayed_row *row;
    while ((row=rows.get()))
      delete row;
    if (table)
      close_thread_tables(&thd);
    VOID(pthread_mutex_lock(&LOCK_thread_count));
unknown's avatar
unknown committed
1393 1394 1395
    pthread_mutex_destroy(&mutex);
    pthread_cond_destroy(&cond);
    pthread_cond_destroy(&cond_client);
unknown's avatar
unknown committed
1396 1397
    thd.unlink();				// Must be unlinked under lock
    x_free(thd.query);
1398
    thd.security_ctx->user= thd.security_ctx->host=0;
unknown's avatar
unknown committed
1399 1400 1401
    thread_count--;
    delayed_insert_threads--;
    VOID(pthread_mutex_unlock(&LOCK_thread_count));
1402
    VOID(pthread_cond_broadcast(&COND_thread_count)); /* Tell main we are ready */
unknown's avatar
unknown committed
1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442
  }

  /* The following is for checking when we can delete ourselves */
  inline void lock()
  {
    locks_in_memory++;				// Assume LOCK_delay_insert
  }
  void unlock()
  {
    pthread_mutex_lock(&LOCK_delayed_insert);
    if (!--locks_in_memory)
    {
      pthread_mutex_lock(&mutex);
      if (thd.killed && ! stacked_inserts && ! tables_in_use)
      {
	pthread_cond_signal(&cond);
	status=1;
      }
      pthread_mutex_unlock(&mutex);
    }
    pthread_mutex_unlock(&LOCK_delayed_insert);
  }
  inline uint lock_count() { return locks_in_memory; }

  TABLE* get_local_table(THD* client_thd);
  bool handle_inserts(void);
};


I_List<delayed_insert> delayed_threads;


/*
  Look up the delayed insert handler for a table

  SYNOPSIS
    find_handler()
    thd			Thread handle of the caller; proc_info is updated
    table_list		Names the database and table to search for

  RETURN
    The handler from delayed_threads whose database and table name both
    match, with lock() already called on it, or 0 if there is none.
*/

delayed_insert *find_handler(THD *thd, TABLE_LIST *table_list)
{
  delayed_insert *handler;

  thd->proc_info="waiting for delay_list";
  pthread_mutex_lock(&LOCK_delayed_insert);	// Protect master list
  I_List_iterator<delayed_insert> list_it(delayed_threads);
  for (handler= list_it++; handler; handler= list_it++)
  {
    if (strcmp(handler->thd.db, table_list->db) != 0 ||
        strcmp(table_list->table_name,
               handler->table->s->table_name.str) != 0)
      continue;
    /* Match: take a reference while LOCK_delayed_insert is still held */
    handler->lock();
    break;
  }
  pthread_mutex_unlock(&LOCK_delayed_insert); // For unlink from list
  return handler;
}
}


/*
  Get a client-local TABLE copy from the delayed insert handler of a table.

  SYNOPSIS
    delayed_get_table()
    thd          Client thread.
    table_list   Table to insert into; table_list->db must be set.

  DESCRIPTION
    Finds an existing handler for the table or, if there is none and the
    max_insert_delayed_threads limit is not reached, creates a handler
    object and starts handle_delayed_insert() in a new thread, then waits
    until that thread has opened the table (or either side was killed).
    Finally asks the handler for a local table copy via get_local_table().

    Handler creation is serialized with LOCK_delayed_create; both error
    labels release that mutex.

  RETURN
    Local TABLE copy (also stored in table_list->table, with thd->di set),
    or 0, in which case the caller continues with a normal insert.
*/

static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list)
{
  int error;
  delayed_insert *tmp;
  TABLE *table;
  DBUG_ENTER("delayed_get_table");

  /* Must be set in the parser */
  DBUG_ASSERT(table_list->db);

  /* Find the thread which handles this table. */
  if (!(tmp=find_handler(thd,table_list)))
  {
    /*
      No match. Create a new thread to handle the table, but
      no more than max_insert_delayed_threads.
    */
    if (delayed_insert_threads >= thd->variables.max_insert_delayed_threads)
      DBUG_RETURN(0);
    thd->proc_info="Creating delayed handler";
    pthread_mutex_lock(&LOCK_delayed_create);
    /*
      The first search above was done without LOCK_delayed_create.
      Another thread might have created the handler in between. Search again.
    */
    if (! (tmp= find_handler(thd, table_list)))
    {
      if (!(tmp=new delayed_insert()))
      {
        my_error(ER_OUTOFMEMORY,MYF(0),sizeof(delayed_insert));
        goto err1;
      }
      pthread_mutex_lock(&LOCK_thread_count);
      thread_count++;
      pthread_mutex_unlock(&LOCK_thread_count);
      tmp->thd.set_db(table_list->db, strlen(table_list->db));
      tmp->thd.query= my_strdup(table_list->table_name,MYF(MY_WME));
      if (tmp->thd.db == NULL || tmp->thd.query == NULL)
      {
        delete tmp;
        my_message(ER_OUT_OF_RESOURCES, ER(ER_OUT_OF_RESOURCES), MYF(0));
        goto err1;
      }
      tmp->table_list= *table_list;			// Needed to open table
      /* The handler's table name points at its own copy (thd.query) */
      tmp->table_list.alias= tmp->table_list.table_name= tmp->thd.query;
      tmp->lock();
      /* Hold the handler mutex across thread start; released in cond_wait */
      pthread_mutex_lock(&tmp->mutex);
      if ((error=pthread_create(&tmp->thd.real_id,&connection_attrib,
                                handle_delayed_insert,(void*) tmp)))
      {
        DBUG_PRINT("error",
                   ("Can't create thread to handle delayed insert (error %d)",
                    error));
        pthread_mutex_unlock(&tmp->mutex);
        tmp->unlock();
        delete tmp;
        my_error(ER_CANT_CREATE_THREAD, MYF(0), error);
        goto err1;
      }

      /* Wait until table is open */
      thd->proc_info="waiting for handler open";
      while (!tmp->thd.killed && !tmp->table && !thd->killed)
      {
        pthread_cond_wait(&tmp->cond_client,&tmp->mutex);
      }
      pthread_mutex_unlock(&tmp->mutex);
      thd->proc_info="got old table";
      if (tmp->thd.killed)
      {
        if (tmp->thd.is_fatal_error)
        {
          /* Copy error message and abort */
          thd->fatal_error();
          strmov(thd->net.last_error,tmp->thd.net.last_error);
          thd->net.last_errno=tmp->thd.net.last_errno;
        }
        tmp->unlock();
        goto err;
      }
      if (thd->killed)
      {
        tmp->unlock();
        goto err;
      }
    }
    pthread_mutex_unlock(&LOCK_delayed_create);
  }

  pthread_mutex_lock(&tmp->mutex);
  table= tmp->get_local_table(thd);
  pthread_mutex_unlock(&tmp->mutex);
  if (table)
    thd->di=tmp;
  else if (tmp->thd.is_fatal_error)
    thd->fatal_error();
  /* Unlock the delayed insert object after its last access. */
  tmp->unlock();
  DBUG_RETURN((table_list->table=table));

 err1:
  thd->fatal_error();
 err:
  pthread_mutex_unlock(&LOCK_delayed_create);
  DBUG_RETURN(0); // Continue with normal insert
}


/*
  As we can't let many threads modify the same TABLE structure, we create
  a separate structure for each thread.  This includes a row buffer to hold
  the column values and new fields that point into the new row buffer.
  The memory is allocated in the client thread and is freed automatically.
*/

/*
  Build a client-private copy of the handler's TABLE object.

  SYNOPSIS
    delayed_insert::get_local_table()
    client_thd   Client thread; the copy is allocated with
                 client_thd->alloc() and on client_thd->mem_root.

  DESCRIPTION
    Called with the handler's mutex held (cond_client is waited on with
    it).  Registers the client in tables_in_use, asks the handler thread to
    lock the table if it is not locked yet, and then copies the TABLE, its
    field array, one record buffer and the two column bitmaps.

  RETURN
    Pointer to the copy, or 0 on error.  On every error path tables_in_use
    is decremented again and the handler thread is signalled.
*/

TABLE *delayed_insert::get_local_table(THD* client_thd)
{
  my_ptrdiff_t adjust_ptrs;
  Field **field,**org_field, *found_next_number_field;
  TABLE *copy;
  TABLE_SHARE *share= table->s;
  byte *bitmap;
  DBUG_ENTER("delayed_insert::get_local_table");

  /* First request insert thread to get a lock */
  status=1;
  tables_in_use++;
  if (!thd.lock)				// Table is not locked
  {
    client_thd->proc_info="waiting for handler lock";
    pthread_cond_signal(&cond);			// Tell handler to lock table
    while (!dead && !thd.lock && ! client_thd->killed)
    {
      pthread_cond_wait(&cond_client,&mutex);
    }
    client_thd->proc_info="got handler lock";
    if (client_thd->killed)
      goto error;
    if (dead)
    {
      strmov(client_thd->net.last_error,thd.net.last_error);
      client_thd->net.last_errno=thd.net.last_errno;
      goto error;
    }
  }

  /*
    Allocate memory for the TABLE object, the field pointers array, and
    one record buffer of reclength size. Normally a table has three
    record buffers of rec_buff_length size, which includes alignment
    bytes. Since the table copy is used for creating one record only,
    the other record buffers and alignment are unnecessary.
  */
  client_thd->proc_info="allocating local table";
  copy= (TABLE*) client_thd->alloc(sizeof(*copy)+
                                   (share->fields+1)*sizeof(Field**)+
                                   share->reclength +
                                   share->column_bitmap_size*2);
  if (!copy)
    goto error;

  /* Copy the TABLE object. */
  *copy= *table;
  /* We don't need to change the file handler here */
  /*
    Assign the pointers for the field pointers array and the record.
    Layout of the allocation: TABLE, field array, two bitmaps, record.
  */
  field= copy->field= (Field**) (copy + 1);
  bitmap= (byte*) (field + share->fields + 1);
  copy->record[0]= (bitmap + share->column_bitmap_size * 2);
  memcpy((char*) copy->record[0], (char*) table->record[0], share->reclength);
  /*
    Make a copy of all fields.
    The copied fields need to point into the copied record. This is done
    by copying the field objects with their old pointer values and then
    "move" the pointers by the distance between the original and copied
    records. That way we preserve the relative positions in the records.
  */
  adjust_ptrs= PTR_BYTE_DIFF(copy->record[0], table->record[0]);
  found_next_number_field= table->found_next_number_field;
  for (org_field= table->field; *org_field; org_field++, field++)
  {
    /*
      Leave through the common error exit so that tables_in_use is
      decremented and the handler thread is woken up; returning directly
      would leave this client counted as a user of the handler.
    */
    if (!(*field= (*org_field)->new_field(client_thd->mem_root, copy, 1)))
      goto error;
    (*field)->orig_table= copy;			// Remove connection
    (*field)->move_field_offset(adjust_ptrs);	// Point at copy->record[0]
    if (*org_field == found_next_number_field)
      (*field)->table->found_next_number_field= *field;
  }
  *field=0;

  /* Adjust timestamp */
  if (table->timestamp_field)
  {
    /* Restore offset as this may have been reset in handle_inserts */
    copy->timestamp_field=
      (Field_timestamp*) copy->field[share->timestamp_field_offset];
    copy->timestamp_field->unireg_check= table->timestamp_field->unireg_check;
    copy->timestamp_field_type= copy->timestamp_field->get_auto_set_type();
  }

  /* Adjust in_use for pointing to client thread */
  copy->in_use= client_thd;

  /* Adjust lock_count. This table object is not part of a lock. */
  copy->lock_count= 0;

  /* Adjust bitmaps */
  copy->def_read_set.bitmap= (my_bitmap_map*) bitmap;
  copy->def_write_set.bitmap= ((my_bitmap_map*)
                               (bitmap + share->column_bitmap_size));
  copy->tmp_set.bitmap= 0;                      // To catch errors
  bzero((char*) bitmap, share->column_bitmap_size*2);
  copy->read_set=  &copy->def_read_set;
  copy->write_set= &copy->def_write_set;

  DBUG_RETURN(copy);

  /* Got fatal error */
 error:
  tables_in_use--;
  status=1;
  pthread_cond_signal(&cond);			// Inform thread about abort
  DBUG_RETURN(0);
}


/* Put a question in queue */

1682 1683 1684
/*
  Queue one row for the delayed insert handler thread.

  SYNOPSIS
    write_delayed()
    thd      Client thread; thd->di is the handler to queue the row on.
    table    Client-local table copy; record[0] holds the row to queue.
    duplic   Duplicate handling stored with the row.
    query    Statement text; copied if query.str is non-NULL.
    ignore   IGNORE flag stored with the row.
    log_on   Logging flag stored with the row.

  DESCRIPTION
    Waits on cond_client while the handler's queue holds
    delayed_queue_size rows or more, then copies the record and the
    statement context into a new delayed_row, appends it to di->rows and
    signals the handler thread.

  RETURN
    0  Row queued.
    1  Thread was killed or memory allocation failed.
*/

static int
write_delayed(THD *thd,TABLE *table, enum_duplicates duplic,
              LEX_STRING query, bool ignore, bool log_on)
{
  /*
    Must start out as 0: the err label deletes 'row', and it is reached
    from paths that run before the row has been allocated.
  */
  delayed_row *row= 0;
  delayed_insert *di=thd->di;
  DBUG_ENTER("write_delayed");
  DBUG_PRINT("enter", ("query = '%s' length %u", query.str, query.length));

  thd->proc_info="waiting for handler insert";
  pthread_mutex_lock(&di->mutex);
  while (di->stacked_inserts >= delayed_queue_size && !thd->killed)
    pthread_cond_wait(&di->cond_client,&di->mutex);
  thd->proc_info="storing row into queue";

  if (thd->killed)
    goto err;

  /*
    Take a copy of the query string, if there is any. The string will
    be free'ed when the row is destroyed. If there is no query string,
    we don't do anything special.
   */

  if (query.str)
  {
    char *str;
    if (!(str= my_strndup(query.str, query.length, MYF(MY_WME))))
      goto err;
    query.str= str;
  }
  row= new delayed_row(query, duplic, ignore, log_on);
  if (row == NULL)
  {
    /* query.str is 0 when the caller passed no statement text */
    my_free(query.str, MYF(MY_WME | MY_ALLOW_ZERO_PTR));
    goto err;
  }

  if (!(row->record= (char*) my_malloc(table->s->reclength, MYF(MY_WME))))
    goto err;
  memcpy(row->record, table->record[0], table->s->reclength);
  row->start_time=		thd->start_time;
  row->query_start_used=	thd->query_start_used;
  /*
    those are for the binlog: LAST_INSERT_ID() has been evaluated at this
    time, so record does not need it, but statement-based binlogging of the
    INSERT will need when the row is actually inserted.
    As for SET INSERT_ID, DELAYED does not honour it (BUG#20830).
  */
  row->stmt_depends_on_first_successful_insert_id_in_prev_stmt=
    thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt;
  row->first_successful_insert_id_in_prev_stmt=
    thd->first_successful_insert_id_in_prev_stmt;
  row->timestamp_field_type=    table->timestamp_field_type;

  di->rows.push_back(row);
  di->stacked_inserts++;
  di->status=1;
  if (table->s->blob_fields)
    unlink_blobs(table);
  pthread_cond_signal(&di->cond);

  thread_safe_increment(delayed_rows_in_use,&LOCK_delayed_status);
  pthread_mutex_unlock(&di->mutex);
  DBUG_RETURN(0);

 err:
  delete row;
  pthread_mutex_unlock(&di->mutex);
  DBUG_RETURN(1);
}


/*
  Tell the delayed insert handler that this client is done with its table.

  SYNOPSIS
    end_delayed_insert()
    thd   Client thread; thd->di is the handler that was used.

  DESCRIPTION
    Decrements the handler's tables_in_use under its mutex.  If that was
    the last user, or the handler thread has been killed, the handler
    thread is woken up with status set.
*/

static void end_delayed_insert(THD *thd)
{
  DBUG_ENTER("end_delayed_insert");
  delayed_insert *di=thd->di;
  pthread_mutex_lock(&di->mutex);
  DBUG_PRINT("info",("tables in use: %d",di->tables_in_use));
  if (!--di->tables_in_use || di->thd.killed)
  {						// Unlock table
    di->status=1;
    pthread_cond_signal(&di->cond);
  }
  pthread_mutex_unlock(&di->mutex);
  DBUG_VOID_RETURN;
}


/* We kill all delayed threads when doing flush-tables */

void kill_delayed_threads(void)
{
  VOID(pthread_mutex_lock(&LOCK_delayed_insert)); // For unlink from list

  I_List_iterator<delayed_insert> it(delayed_threads);
  delayed_insert *tmp;
  while ((tmp=it++))
  {
unknown's avatar
unknown committed
1781
    /* Ensure that the thread doesn't kill itself while we are looking at it */
unknown's avatar
unknown committed
1782
    pthread_mutex_lock(&tmp->mutex);
unknown's avatar
SCRUM  
unknown committed
1783
    tmp->thd.killed= THD::KILL_CONNECTION;
unknown's avatar
unknown committed
1784 1785 1786
    if (tmp->thd.mysys_var)
    {
      pthread_mutex_lock(&tmp->thd.mysys_var->mutex);
unknown's avatar
unknown committed
1787
      if (tmp->thd.mysys_var->current_cond)
unknown's avatar
unknown committed
1788
      {
unknown's avatar
unknown committed
1789 1790 1791 1792 1793 1794
	/*
	  We need the following test because the main mutex may be locked
	  in handle_delayed_insert()
	*/
	if (&tmp->mutex != tmp->thd.mysys_var->current_mutex)
	  pthread_mutex_lock(tmp->thd.mysys_var->current_mutex);
unknown's avatar
unknown committed
1795
	pthread_cond_broadcast(tmp->thd.mysys_var->current_cond);
unknown's avatar
unknown committed
1796 1797
	if (&tmp->mutex != tmp->thd.mysys_var->current_mutex)
	  pthread_mutex_unlock(tmp->thd.mysys_var->current_mutex);
unknown's avatar
unknown committed
1798 1799 1800 1801 1802 1803 1804 1805 1806 1807 1808 1809 1810
      }
      pthread_mutex_unlock(&tmp->thd.mysys_var->mutex);
    }
    pthread_mutex_unlock(&tmp->mutex);
  }
  VOID(pthread_mutex_unlock(&LOCK_delayed_insert)); // For unlink from list
}


/*
 * Create a new delayed insert thread
*/

1811
pthread_handler_t handle_delayed_insert(void *arg)
unknown's avatar
unknown committed
1812 1813 1814 1815 1816 1817 1818 1819
{
  delayed_insert *di=(delayed_insert*) arg;
  THD *thd= &di->thd;

  pthread_detach_this_thread();
  /* Add thread to THD list so that's it's visible in 'show processlist' */
  pthread_mutex_lock(&LOCK_thread_count);
  thd->thread_id=thread_id++;
1820
  thd->end_time();
unknown's avatar
unknown committed
1821
  threads.append(thd);
unknown's avatar
SCRUM  
unknown committed
1822
  thd->killed=abort_loop ? THD::KILL_CONNECTION : THD::NOT_KILLED;
unknown's avatar
unknown committed
1823 1824
  pthread_mutex_unlock(&LOCK_thread_count);

1825 1826 1827 1828 1829 1830 1831 1832
  /*
    Wait until the client runs into pthread_cond_wait(),
    where we free it after the table is opened and di linked in the list.
    If we did not wait here, the client might detect the opened table
    before it is linked to the list. It would release LOCK_delayed_create
    and allow another thread to create another handler for the same table,
    since it does not find one in the list.
  */
unknown's avatar
unknown committed
1833
  pthread_mutex_lock(&di->mutex);
1834
#if !defined( __WIN__) /* Win32 calls this in pthread_create */
unknown's avatar
unknown committed
1835 1836 1837 1838 1839 1840 1841 1842
  if (my_thread_init())
  {
    strmov(thd->net.last_error,ER(thd->net.last_errno=ER_OUT_OF_RESOURCES));
    goto end;
  }
#endif

  DBUG_ENTER("handle_delayed_insert");
1843
  thd->thread_stack= (char*) &thd;
unknown's avatar
unknown committed
1844
  if (init_thr_lock() || thd->store_globals())
unknown's avatar
unknown committed
1845
  {
1846
    thd->fatal_error();
unknown's avatar
unknown committed
1847
    strmov(thd->net.last_error,ER(thd->net.last_errno=ER_OUT_OF_RESOURCES));
1848
    goto err;
unknown's avatar
unknown committed
1849
  }
1850
#if !defined(__WIN__) && !defined(__NETWARE__)
unknown's avatar
unknown committed
1851 1852 1853 1854 1855 1856 1857
  sigset_t set;
  VOID(sigemptyset(&set));			// Get mask in use
  VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals));
#endif

  /* open table */

unknown's avatar
unknown committed
1858
  if (!(di->table=open_ltable(thd,&di->table_list,TL_WRITE_DELAYED)))
unknown's avatar
unknown committed
1859
  {
1860
    thd->fatal_error();				// Abort waiting inserts
1861
    goto err;
unknown's avatar
unknown committed
1862
  }
1863
  if (!(di->table->file->ha_table_flags() & HA_CAN_INSERT_DELAYED))
unknown's avatar
unknown committed
1864
  {
1865
    thd->fatal_error();
1866
    my_error(ER_ILLEGAL_HA, MYF(0), di->table_list.table_name);
1867
    goto err;
unknown's avatar
unknown committed
1868
  }
unknown's avatar
unknown committed
1869 1870 1871 1872 1873 1874 1875 1876 1877 1878 1879 1880 1881 1882 1883
  di->table->copy_blobs=1;

  /* One can now use this */
  pthread_mutex_lock(&LOCK_delayed_insert);
  delayed_threads.append(di);
  pthread_mutex_unlock(&LOCK_delayed_insert);

  /* Tell client that the thread is initialized */
  pthread_cond_signal(&di->cond_client);

  /* Now wait until we get an insert or lock to handle */
  /* We will not abort as long as a client thread uses this thread */

  for (;;)
  {
unknown's avatar
SCRUM  
unknown committed
1884
    if (thd->killed == THD::KILL_CONNECTION)
unknown's avatar
unknown committed
1885 1886 1887 1888 1889 1890 1891 1892 1893 1894 1895 1896 1897 1898 1899 1900 1901 1902 1903
    {
      uint lock_count;
      /*
	Remove this from delay insert list so that no one can request a
	table from this
      */
      pthread_mutex_unlock(&di->mutex);
      pthread_mutex_lock(&LOCK_delayed_insert);
      di->unlink();
      lock_count=di->lock_count();
      pthread_mutex_unlock(&LOCK_delayed_insert);
      pthread_mutex_lock(&di->mutex);
      if (!lock_count && !di->tables_in_use && !di->stacked_inserts)
	break;					// Time to die
    }

    if (!di->status && !di->stacked_inserts)
    {
      struct timespec abstime;
1904
      set_timespec(abstime, delayed_insert_timeout);
unknown's avatar
unknown committed
1905 1906 1907 1908

      /* Information for pthread_kill */
      di->thd.mysys_var->current_mutex= &di->mutex;
      di->thd.mysys_var->current_cond= &di->cond;
1909
      di->thd.proc_info="Waiting for INSERT";
unknown's avatar
unknown committed
1910

1911
      DBUG_PRINT("info",("Waiting for someone to insert rows"));
unknown's avatar
unknown committed
1912
      while (!thd->killed)
unknown's avatar
unknown committed
1913 1914
      {
	int error;
1915
#if defined(HAVE_BROKEN_COND_TIMEDWAIT)
unknown's avatar
unknown committed
1916 1917 1918 1919
	error=pthread_cond_wait(&di->cond,&di->mutex);
#else
	error=pthread_cond_timedwait(&di->cond,&di->mutex,&abstime);
#ifdef EXTRA_DEBUG
1920
	if (error && error != EINTR && error != ETIMEDOUT)
unknown's avatar
unknown committed
1921 1922 1923 1924 1925 1926 1927 1928 1929
	{
	  fprintf(stderr, "Got error %d from pthread_cond_timedwait\n",error);
	  DBUG_PRINT("error",("Got error %d from pthread_cond_timedwait",
			      error));
	}
#endif
#endif
	if (thd->killed || di->status)
	  break;
unknown's avatar
unknown committed
1930
	if (error == ETIMEDOUT || error == ETIME)
unknown's avatar
unknown committed
1931
	{
unknown's avatar
SCRUM  
unknown committed
1932
	  thd->killed= THD::KILL_CONNECTION;
unknown's avatar
unknown committed
1933 1934 1935
	  break;
	}
      }
unknown's avatar
unknown committed
1936 1937
      /* We can't lock di->mutex and mysys_var->mutex at the same time */
      pthread_mutex_unlock(&di->mutex);
unknown's avatar
unknown committed
1938 1939 1940 1941
      pthread_mutex_lock(&di->thd.mysys_var->mutex);
      di->thd.mysys_var->current_mutex= 0;
      di->thd.mysys_var->current_cond= 0;
      pthread_mutex_unlock(&di->thd.mysys_var->mutex);
unknown's avatar
unknown committed
1942
      pthread_mutex_lock(&di->mutex);
unknown's avatar
unknown committed
1943
    }
1944
    di->thd.proc_info=0;
unknown's avatar
unknown committed
1945 1946 1947

    if (di->tables_in_use && ! thd->lock)
    {
1948
      bool not_used;
1949 1950 1951 1952 1953 1954 1955 1956 1957 1958
      /*
        Request for new delayed insert.
        Lock the table, but avoid to be blocked by a global read lock.
        If we got here while a global read lock exists, then one or more
        inserts started before the lock was requested. These are allowed
        to complete their work before the server returns control to the
        client which requested the global read lock. The delayed insert
        handler will close the table and finish when the outstanding
        inserts are done.
      */
1959
      if (! (thd->lock= mysql_lock_tables(thd, &di->table, 1,
1960 1961
                                          MYSQL_LOCK_IGNORE_GLOBAL_READ_LOCK,
                                          &not_used)))
unknown's avatar
unknown committed
1962
      {
1963 1964 1965
	/* Fatal error */
	di->dead= 1;
	thd->killed= THD::KILL_CONNECTION;
unknown's avatar
unknown committed
1966 1967 1968 1969 1970 1971 1972
      }
      pthread_cond_broadcast(&di->cond_client);
    }
    if (di->stacked_inserts)
    {
      if (di->handle_inserts())
      {
1973 1974 1975
	/* Some fatal error */
	di->dead= 1;
	thd->killed= THD::KILL_CONNECTION;
unknown's avatar
unknown committed
1976 1977 1978 1979 1980
      }
    }
    di->status=0;
    if (!di->stacked_inserts && !di->tables_in_use && thd->lock)
    {
unknown's avatar
unknown committed
1981 1982 1983 1984
      /*
        No one is doing a insert delayed
        Unlock table so that other threads can use it
      */
unknown's avatar
unknown committed
1985 1986 1987
      MYSQL_LOCK *lock=thd->lock;
      thd->lock=0;
      pthread_mutex_unlock(&di->mutex);
1988
      di->table->file->ha_release_auto_increment();
unknown's avatar
unknown committed
1989 1990 1991 1992 1993 1994 1995 1996
      mysql_unlock_tables(thd, lock);
      di->group_count=0;
      pthread_mutex_lock(&di->mutex);
    }
    if (di->tables_in_use)
      pthread_cond_broadcast(&di->cond_client); // If waiting clients
  }

1997 1998 1999 2000 2001 2002 2003 2004 2005 2006
err:
  /*
    mysql_lock_tables() can potentially start a transaction and write
    a table map. In the event of an error, that transaction has to be
    rolled back.  We only need to roll back a potential statement
    transaction, since real transactions are rolled back in
    close_thread_tables().
   */
  ha_rollback_stmt(thd);

unknown's avatar
unknown committed
2007 2008 2009 2010 2011 2012 2013 2014
end:
  /*
    di should be unlinked from the thread handler list and have no active
    clients
  */

  close_thread_tables(thd);			// Free the table
  di->table=0;
2015 2016
  di->dead= 1;                                  // If error
  thd->killed= THD::KILL_CONNECTION;	        // If error
unknown's avatar
unknown committed
2017 2018 2019 2020 2021 2022 2023 2024 2025 2026 2027 2028 2029 2030 2031 2032 2033 2034 2035 2036 2037 2038 2039 2040 2041 2042 2043 2044 2045 2046 2047 2048 2049 2050 2051 2052 2053 2054 2055 2056 2057 2058 2059 2060 2061 2062
  pthread_cond_broadcast(&di->cond_client);	// Safety
  pthread_mutex_unlock(&di->mutex);

  pthread_mutex_lock(&LOCK_delayed_create);	// Because of delayed_get_table
  pthread_mutex_lock(&LOCK_delayed_insert);	
  delete di;
  pthread_mutex_unlock(&LOCK_delayed_insert);
  pthread_mutex_unlock(&LOCK_delayed_create);  

  my_thread_end();
  pthread_exit(0);
  DBUG_RETURN(0);
}


/* Remove pointers from temporary fields to allocated values */

static void unlink_blobs(register TABLE *table)
{
  for (Field **ptr=table->field ; *ptr ; ptr++)
  {
    if ((*ptr)->flags & BLOB_FLAG)
      ((Field_blob *) (*ptr))->clear_temporary();
  }
}

/* Free blobs stored in current row */

static void free_delayed_insert_blobs(register TABLE *table)
{
  for (Field **ptr=table->field ; *ptr ; ptr++)
  {
    if ((*ptr)->flags & BLOB_FLAG)
    {
      char *str;
      ((Field_blob *) (*ptr))->get_ptr(&str);
      my_free(str,MYF(MY_ALLOW_ZERO_PTR));
      ((Field_blob *) (*ptr))->reset();
    }
  }
}


bool delayed_insert::handle_inserts(void)
{
  int error;
2063
  ulong max_rows;
2064 2065
  bool using_ignore= 0, using_opt_replace= 0,
       using_bin_log= mysql_bin_log.is_open();
2066
  delayed_row *row;
unknown's avatar
unknown committed
2067 2068 2069 2070 2071 2072
  DBUG_ENTER("handle_inserts");

  /* Allow client to insert new rows */
  pthread_mutex_unlock(&mutex);

  table->next_number_field=table->found_next_number_field;
2073
  table->use_all_columns();
unknown's avatar
unknown committed
2074 2075 2076 2077 2078

  thd.proc_info="upgrading lock";
  if (thr_upgrade_write_delay_lock(*thd.lock->locks))
  {
    /* This can only happen if thread is killed by shutdown */
unknown's avatar
unknown committed
2079
    sql_print_error(ER(ER_DELAYED_CANT_CHANGE_LOCK),table->s->table_name.str);
unknown's avatar
unknown committed
2080 2081 2082 2083
    goto err;
  }

  thd.proc_info="insert";
2084
  max_rows= delayed_insert_limit;
2085
  if (thd.killed || table->s->version != refresh_version)
unknown's avatar
unknown committed
2086
  {
unknown's avatar
SCRUM  
unknown committed
2087
    thd.killed= THD::KILL_CONNECTION;
2088
    max_rows= ULONG_MAX;                     // Do as much as possible
unknown's avatar
unknown committed
2089 2090
  }

unknown's avatar
unknown committed
2091 2092 2093 2094 2095 2096 2097
  /*
    We can't use row caching when using the binary log because if
    we get a crash, then binary log will contain rows that are not yet
    written to disk, which will cause problems in replication.
  */
  if (!using_bin_log)
    table->file->extra(HA_EXTRA_WRITE_CACHE);
unknown's avatar
unknown committed
2098
  pthread_mutex_lock(&mutex);
2099

unknown's avatar
unknown committed
2100 2101 2102 2103
  while ((row=rows.get()))
  {
    stacked_inserts--;
    pthread_mutex_unlock(&mutex);
2104
    memcpy(table->record[0],row->record,table->s->reclength);
unknown's avatar
unknown committed
2105 2106 2107

    thd.start_time=row->start_time;
    thd.query_start_used=row->query_start_used;
2108
    /* for the binlog, forget auto_increment ids generated by previous rows */
2109
//    thd.auto_inc_intervals_in_cur_stmt_for_binlog.empty();
2110 2111 2112 2113
    thd.first_successful_insert_id_in_prev_stmt= 
      row->first_successful_insert_id_in_prev_stmt;
    thd.stmt_depends_on_first_successful_insert_id_in_prev_stmt= 
      row->stmt_depends_on_first_successful_insert_id_in_prev_stmt;
2114
    table->timestamp_field_type= row->timestamp_field_type;
unknown's avatar
unknown committed
2115

2116
    info.ignore= row->ignore;
unknown's avatar
unknown committed
2117
    info.handle_duplicates= row->dup;
2118
    if (info.ignore ||
unknown's avatar
unknown committed
2119
	info.handle_duplicates != DUP_ERROR)
unknown's avatar
unknown committed
2120 2121 2122 2123
    {
      table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
      using_ignore=1;
    }
2124 2125 2126 2127 2128 2129 2130
    if (info.handle_duplicates == DUP_REPLACE &&
        (!table->triggers ||
         !table->triggers->has_delete_triggers()))
    {
      table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
      using_opt_replace= 1;
    }
unknown's avatar
unknown committed
2131
    thd.clear_error(); // reset error for binlog
unknown's avatar
unknown committed
2132
    if (write_record(&thd, table, &info))
unknown's avatar
unknown committed
2133
    {
2134
      info.error_count++;				// Ignore errors
2135
      thread_safe_increment(delayed_insert_errors,&LOCK_delayed_status);
unknown's avatar
unknown committed
2136
      row->log_query = 0;
unknown's avatar
unknown committed
2137
    }
2138

unknown's avatar
unknown committed
2139 2140 2141 2142 2143
    if (using_ignore)
    {
      using_ignore=0;
      table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
    }
2144 2145 2146 2147 2148
    if (using_opt_replace)
    {
      using_opt_replace= 0;
      table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
    }
2149 2150

    if (row->log_query && row->query.str != NULL && mysql_bin_log.is_open())
unknown's avatar
unknown committed
2151 2152 2153 2154 2155 2156 2157 2158 2159
    {
      /*
        If the query has several rows to insert, only the first row will come
        here. In row-based binlogging, this means that the first row will be
        written to binlog as one Table_map event and one Rows event (due to an
        event flush done in binlog_query()), then all other rows of this query
        will be binlogged together as one single Table_map event and one
        single Rows event.
      */
2160 2161 2162
      thd.binlog_query(THD::ROW_QUERY_TYPE,
                       row->query.str, row->query.length,
                       FALSE, FALSE);
unknown's avatar
unknown committed
2163
    }
2164

2165
    if (table->s->blob_fields)
unknown's avatar
unknown committed
2166 2167 2168 2169 2170 2171
      free_delayed_insert_blobs(table);
    thread_safe_sub(delayed_rows_in_use,1,&LOCK_delayed_status);
    thread_safe_increment(delayed_insert_writes,&LOCK_delayed_status);
    pthread_mutex_lock(&mutex);

    delete row;
2172 2173 2174 2175 2176 2177 2178
    /*
      Let READ clients do something once in a while
      We should however not break in the middle of a multi-line insert
      if we have binary logging enabled as we don't want other commands
      on this table until all entries has been processed
    */
    if (group_count++ >= max_rows && (row= rows.head()) &&
2179
	(!(row->log_query & using_bin_log)))
unknown's avatar
unknown committed
2180 2181 2182 2183 2184 2185 2186 2187 2188 2189 2190 2191 2192 2193 2194
    {
      group_count=0;
      if (stacked_inserts || tables_in_use)	// Let these wait a while
      {
	if (tables_in_use)
	  pthread_cond_broadcast(&cond_client); // If waiting clients
	thd.proc_info="reschedule";
	pthread_mutex_unlock(&mutex);
	if ((error=table->file->extra(HA_EXTRA_NO_CACHE)))
	{
	  /* This should never happen */
	  table->file->print_error(error,MYF(0));
	  sql_print_error("%s",thd.net.last_error);
	  goto err;
	}
unknown's avatar
unknown committed
2195
	query_cache_invalidate3(&thd, table, 1);
unknown's avatar
unknown committed
2196 2197 2198
	if (thr_reschedule_write_lock(*thd.lock->locks))
	{
	  /* This should never happen */
unknown's avatar
unknown committed
2199 2200
	  sql_print_error(ER(ER_DELAYED_CANT_CHANGE_LOCK),
                          table->s->table_name.str);
unknown's avatar
unknown committed
2201
	}
unknown's avatar
unknown committed
2202 2203
	if (!using_bin_log)
	  table->file->extra(HA_EXTRA_WRITE_CACHE);
unknown's avatar
unknown committed
2204 2205 2206 2207 2208 2209 2210 2211 2212
	pthread_mutex_lock(&mutex);
	thd.proc_info="insert";
      }
      if (tables_in_use)
	pthread_cond_broadcast(&cond_client);	// If waiting clients
    }
  }
  thd.proc_info=0;
  pthread_mutex_unlock(&mutex);
2213

2214 2215 2216 2217 2218 2219 2220 2221 2222 2223 2224 2225 2226 2227 2228 2229
#ifdef HAVE_ROW_BASED_REPLICATION
  /*
    We need to flush the pending event when using row-based
    replication since the flushing normally done in binlog_query() is
    not done last in the statement: for delayed inserts, the insert
    statement is logged *before* all rows are inserted.

    We can flush the pending event without checking the thd->lock
    since the delayed insert *thread* is not inside a stored function
    or trigger.

    TODO: Move the logging to last in the sequence of rows.
   */
  if (thd.current_stmt_binlog_row_based)
    thd.binlog_flush_pending_rows_event(TRUE);
#endif /* HAVE_ROW_BASED_REPLICATION */
2230

unknown's avatar
unknown committed
2231 2232 2233 2234 2235 2236
  if ((error=table->file->extra(HA_EXTRA_NO_CACHE)))
  {						// This shouldn't happen
    table->file->print_error(error,MYF(0));
    sql_print_error("%s",thd.net.last_error);
    goto err;
  }
unknown's avatar
unknown committed
2237
  query_cache_invalidate3(&thd, table, 1);
unknown's avatar
unknown committed
2238 2239 2240 2241
  pthread_mutex_lock(&mutex);
  DBUG_RETURN(0);

 err:
2242 2243 2244 2245 2246 2247 2248
  /* Remove all not used rows */
  while ((row=rows.get()))
  {
    delete row;
    thread_safe_increment(delayed_insert_errors,&LOCK_delayed_status);
    stacked_inserts--;
  }
unknown's avatar
unknown committed
2249 2250 2251 2252
  thread_safe_increment(delayed_insert_errors, &LOCK_delayed_status);
  pthread_mutex_lock(&mutex);
  DBUG_RETURN(1);
}
2253
#endif /* EMBEDDED_LIBRARY */
unknown's avatar
unknown committed
2254 2255

/***************************************************************************
  Store records in INSERT ... SELECT *
***************************************************************************/

unknown's avatar
VIEW  
unknown committed
2259 2260 2261 2262 2263 2264 2265 2266 2267

/*
  make insert specific preparation and checks after opening tables

  SYNOPSIS
    mysql_insert_select_prepare()
    thd         thread handler

  RETURN
unknown's avatar
unknown committed
2268 2269
    FALSE OK
    TRUE  Error
unknown's avatar
VIEW  
unknown committed
2270 2271
*/

unknown's avatar
unknown committed
2272
bool mysql_insert_select_prepare(THD *thd)
unknown's avatar
VIEW  
unknown committed
2273 2274
{
  LEX *lex= thd->lex;
unknown's avatar
unknown committed
2275
  SELECT_LEX *select_lex= &lex->select_lex;
unknown's avatar
unknown committed
2276
  TABLE_LIST *first_select_leaf_table;
unknown's avatar
VIEW  
unknown committed
2277
  DBUG_ENTER("mysql_insert_select_prepare");
unknown's avatar
unknown committed
2278

unknown's avatar
unknown committed
2279 2280
  /*
    SELECT_LEX do not belong to INSERT statement, so we can't add WHERE
unknown's avatar
unknown committed
2281
    clause if table is VIEW
unknown's avatar
unknown committed
2282
  */
unknown's avatar
unknown committed
2283
  
unknown's avatar
unknown committed
2284 2285 2286 2287
  if (mysql_prepare_insert(thd, lex->query_tables,
                           lex->query_tables->table, lex->field_list, 0,
                           lex->update_list, lex->value_list,
                           lex->duplicates,
unknown's avatar
unknown committed
2288
                           &select_lex->where, TRUE))
unknown's avatar
unknown committed
2289
    DBUG_RETURN(TRUE);
unknown's avatar
unknown committed
2290

2291 2292 2293 2294
  /*
    exclude first table from leaf tables list, because it belong to
    INSERT
  */
unknown's avatar
unknown committed
2295 2296
  DBUG_ASSERT(select_lex->leaf_tables != 0);
  lex->leaf_tables_insert= select_lex->leaf_tables;
2297
  /* skip all leaf tables belonged to view where we are insert */
unknown's avatar
unknown committed
2298
  for (first_select_leaf_table= select_lex->leaf_tables->next_leaf;
2299 2300 2301 2302 2303 2304
       first_select_leaf_table &&
       first_select_leaf_table->belong_to_view &&
       first_select_leaf_table->belong_to_view ==
       lex->leaf_tables_insert->belong_to_view;
       first_select_leaf_table= first_select_leaf_table->next_leaf)
  {}
unknown's avatar
unknown committed
2305
  select_lex->leaf_tables= first_select_leaf_table;
unknown's avatar
unknown committed
2306
  DBUG_RETURN(FALSE);
unknown's avatar
VIEW  
unknown committed
2307 2308 2309
}


2310
/*
  Construct the result sink used by INSERT ... SELECT

  SYNOPSIS
    select_insert::select_insert()
    table_list_par              table reference of the insert target; may
                                be 0, in which case insert_into_view is
                                false and info.view stays zero
    table_par                   table object to insert into
    fields_par                  list of fields named in the INSERT
    update_fields               fields of the ON DUPLICATE KEY UPDATE part
    update_values               values of the ON DUPLICATE KEY UPDATE part
    duplic                      how duplicates should be handled
    ignore_check_option_errors  stored in info.ignore
*/

select_insert::select_insert(TABLE_LIST *table_list_par, TABLE *table_par,
                             List<Item> *fields_par,
                             List<Item> *update_fields,
                             List<Item> *update_values,
                             enum_duplicates duplic,
                             bool ignore_check_option_errors)
  :table_list(table_list_par), table(table_par), fields(fields_par),
   autoinc_value_of_last_inserted_row(0),
   insert_into_view(table_list_par && table_list_par->view != 0)
{
  /* Start from an all-zero COPY_INFO; only the fields below are set. */
  bzero((char*) &info,sizeof(info));
  info.handle_duplicates= duplic;
  info.ignore= ignore_check_option_errors;
  info.update_fields= update_fields;
  info.update_values= update_values;
  if (table_list_par)
    info.view= (table_list_par->view ? table_list_par : 0);
}


unknown's avatar
unknown committed
2330
int
2331
select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
unknown's avatar
unknown committed
2332
{
2333
  LEX *lex= thd->lex;
2334
  int res;
2335
  SELECT_LEX *lex_current_select_save= lex->current_select;
unknown's avatar
unknown committed
2336 2337
  DBUG_ENTER("select_insert::prepare");

2338
  unit= u;
2339 2340 2341 2342 2343 2344
  /*
    Since table in which we are going to insert is added to the first
    select, LEX::current_select should point to the first select while
    we are fixing fields from insert list.
  */
  lex->current_select= &lex->select_lex;
2345
  res= check_insert_fields(thd, table_list, *fields, values,
2346
                           !insert_into_view) ||
2347
       setup_fields(thd, 0, values, MARK_COLUMNS_READ, 0, 0);
2348

2349 2350
  if (info.handle_duplicates == DUP_UPDATE)
  {
2351 2352
    /* Save the state of the current name resolution context. */
    Name_resolution_context *context= &lex->select_lex.context;
2353 2354 2355 2356
    Name_resolution_context_state ctx_state;

    /* Save the state of the current name resolution context. */
    ctx_state.save_state(context, table_list);
2357 2358

    /* Perform name resolution only in the first table - 'table_list'. */
2359
    table_list->next_local= 0;
2360 2361
    context->resolve_in_table_list_only(table_list);

2362
    lex->select_lex.no_wrap_view_item= TRUE;
2363 2364
    res= res || check_update_fields(thd, context->table_list,
                                    *info.update_fields);
2365 2366 2367 2368 2369
    lex->select_lex.no_wrap_view_item= FALSE;
    /*
      When we are not using GROUP BY we can refer to other tables in the
      ON DUPLICATE KEY part
    */       
2370 2371
    if (lex->select_lex.group_list.elements == 0)
    {
2372
      context->table_list->next_local=       ctx_state.save_next_local;
2373
      /* first_name_resolution_table was set by resolve_in_table_list_only() */
2374
      context->first_name_resolution_table->
2375
        next_name_resolution_table=          ctx_state.save_next_local;
2376
    }
2377 2378
    res= res || setup_fields(thd, 0, *info.update_values, MARK_COLUMNS_READ,
                             0, 0);
2379 2380

    /* Restore the current context. */
2381
    ctx_state.restore_state(context, table_list);
2382
  }
2383

2384 2385
  lex->current_select= lex_current_select_save;
  if (res)
unknown's avatar
unknown committed
2386
    DBUG_RETURN(1);
2387 2388 2389 2390 2391 2392 2393 2394 2395 2396
  /*
    if it is INSERT into join view then check_insert_fields already found
    real table for insert
  */
  table= table_list->table;

  /*
    Is table which we are changing used somewhere in other parts of
    query
  */
2397
  if (!(lex->current_select->options & OPTION_BUFFER_RESULT) &&
2398
      unique_table(thd, table_list, table_list->next_global))
2399 2400
  {
    /* Using same table for INSERT and SELECT */
2401 2402
    lex->current_select->options|= OPTION_BUFFER_RESULT;
    lex->current_select->join->select_options|= OPTION_BUFFER_RESULT;
2403
  }
2404
  else if (!thd->prelocked_mode)
2405 2406 2407 2408 2409 2410 2411
  {
    /*
      We must not yet prepare the result table if it is the same as one of the 
      source tables (INSERT SELECT). The preparation may disable 
      indexes on the result table, which may be used during the select, if it
      is the same table (Bug #6034). Do the preparation after the select phase
      in select_insert::prepare2().
2412 2413
      We won't start bulk inserts at all if this statement uses functions or
      should invoke triggers since they may access to the same table too.
2414
    */
2415
    table->file->ha_start_bulk_insert((ha_rows) 0);
2416
  }
2417
  restore_record(table,s->default_values);		// Get empty record
unknown's avatar
unknown committed
2418
  table->next_number_field=table->found_next_number_field;
unknown's avatar
unknown committed
2419
  thd->cuted_fields=0;
unknown's avatar
unknown committed
2420 2421
  if (info.ignore || info.handle_duplicates != DUP_ERROR)
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
2422 2423 2424
  if (info.handle_duplicates == DUP_REPLACE &&
      (!table->triggers || !table->triggers->has_delete_triggers()))
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
unknown's avatar
unknown committed
2425
  thd->no_trans_update= 0;
unknown's avatar
unknown committed
2426
  thd->abort_on_warning= (!info.ignore &&
unknown's avatar
unknown committed
2427 2428 2429
                          (thd->variables.sql_mode &
                           (MODE_STRICT_TRANS_TABLES |
                            MODE_STRICT_ALL_TABLES)));
2430 2431 2432 2433
  res= ((fields->elements &&
         check_that_all_fields_are_given_values(thd, table, table_list)) ||
        table_list->prepare_where(thd, 0, TRUE) ||
        table_list->prepare_check_option(thd));
2434 2435

  if (!res)
2436 2437
    table->mark_columns_needed_for_insert();

2438
  DBUG_RETURN(res);
unknown's avatar
unknown committed
2439 2440
}

2441

2442 2443 2444 2445 2446 2447 2448 2449 2450 2451 2452 2453 2454 2455 2456 2457 2458 2459 2460
/*
  Finish the preparation of the result table.

  SYNOPSIS
    select_insert::prepare2()
    void

  DESCRIPTION
    If the result table is the same as one of the source tables (INSERT SELECT),
    the result table is not finally prepared at the join prepair phase.
    Do the final preparation now.
		       
  RETURN
    0   OK
*/

int select_insert::prepare2(void)
{
  DBUG_ENTER("select_insert::prepare2");
2461 2462
  if (thd->lex->current_select->options & OPTION_BUFFER_RESULT &&
      !thd->prelocked_mode)
2463
    table->file->ha_start_bulk_insert((ha_rows) 0);
unknown's avatar
unknown committed
2464
  DBUG_RETURN(0);
2465 2466 2467
}


2468 2469 2470 2471 2472 2473
void select_insert::cleanup()
{
  /* select_insert/select_create are never re-used in prepared statement */
  DBUG_ASSERT(0);
}

unknown's avatar
unknown committed
2474 2475
/*
  Undo the per-statement state set up while inserting.

  If a target table is attached, its auto-increment field pointer is
  cleared and the handler is reset. The THD is always put back to
  "ignore truncated fields" and "do not abort on warning".
*/

select_insert::~select_insert()
{
  DBUG_ENTER("~select_insert");
  if (table)
  {
    table->next_number_field=0;
    table->file->ha_reset();
  }
  thd->count_cuted_fields= CHECK_FIELD_IGNORE;
  thd->abort_on_warning= 0;
  DBUG_VOID_RETURN;
}


/*
  Store one row produced by the SELECT into the target table

  SYNOPSIS
    select_insert::send_data()
    values      items holding the values of the row

  RETURN
    0   OK (row written, or skipped because of LIMIT offset or a view
        CHECK OPTION skip)
    1   Error
*/

bool select_insert::send_data(List<Item> &values)
{
  DBUG_ENTER("select_insert::send_data");
  bool error=0;

  if (unit->offset_limit_cnt)
  {						// using limit offset,count
    unit->offset_limit_cnt--;
    DBUG_RETURN(0);
  }

  thd->count_cuted_fields= CHECK_FIELD_WARN;	// Calculate cuted fields
  store_values(values);
  thd->count_cuted_fields= CHECK_FIELD_IGNORE;
  if (thd->net.report_error)
    DBUG_RETURN(1);
  if (table_list)                               // Not CREATE ... SELECT
  {
    switch (table_list->view_check_option(thd, info.ignore)) {
    case VIEW_CHECK_SKIP:
      DBUG_RETURN(0);
    case VIEW_CHECK_ERROR:
      DBUG_RETURN(1);
    default:                                    // Any other result: go on
      break;
    }
  }

  error= write_record(thd, table, &info);

  if (!error)
  {
    if (table->triggers || info.handle_duplicates == DUP_UPDATE)
    {
      /*
        Restore fields of the record since it is possible that they were
        changed by ON DUPLICATE KEY UPDATE clause.

        If triggers exist then they can modify some fields which were not
        originally touched by INSERT ... SELECT, so we have to restore
        their original values for the next row.
      */
      restore_record(table, s->default_values);
    }
    if (table->next_number_field)
    {
      /*
        If no value has been autogenerated so far, we need to remember the
        value we just saw, we may need to send it to client in the end.
      */
      if (thd->first_successful_insert_id_in_cur_stmt == 0) // optimization
        autoinc_value_of_last_inserted_row=
          table->next_number_field->val_int();
      /*
        Clear auto-increment field for the next record, if triggers are used
        we will clear it twice, but this should be cheap.
      */
      table->next_number_field->reset();
    }
  }
  /* Done for every row that reached write_record(), failed or not */
  table->file->ha_release_auto_increment();
  DBUG_RETURN(error);
}


unknown's avatar
unknown committed
2551 2552 2553
/*
  Copy the values of one SELECT row into the record buffer of the target
  table and invoke the BEFORE INSERT triggers.

  If the INSERT named an explicit field list, values are stored into
  those fields; otherwise they are stored into the fields of the table
  (table->field).
*/

void select_insert::store_values(List<Item> &values)
{
  if (fields->elements)
    fill_record_n_invoke_before_triggers(thd, *fields, values, 1,
                                         table->triggers, TRG_EVENT_INSERT);
  else
    fill_record_n_invoke_before_triggers(thd, table->field, values, 1,
                                         table->triggers, TRG_EVENT_INSERT);
}

unknown's avatar
unknown committed
2561 2562
/*
  Report an error for the INSERT ... SELECT and roll the statement back

  SYNOPSIS
    select_insert::send_error()
    errcode     error code to report
    err         error message text

  DESCRIPTION
    Reports the error (unless it is ER_UNKNOWN_ERROR or an error has been
    reported already), ends bulk insert, writes the statement to the binary
    log and invalidates the query cache if rows were changed in a
    non-transactional table, and finally rolls back the statement.
*/

void select_insert::send_error(uint errcode,const char *err)
{
  DBUG_ENTER("select_insert::send_error");

  /* Avoid an extra 'unknown error' message if we already reported an error */
  if (errcode != ER_UNKNOWN_ERROR && !thd->net.report_error)
    my_message(errcode, err, MYF(0));

  if (!table)
  {
    /*
      This can only happen when using CREATE ... SELECT and the table was not
      created because of a syntax error
    */
    DBUG_VOID_RETURN;
  }
  if (!thd->prelocked_mode)
    table->file->ha_end_bulk_insert();
  /*
    If at least one row has been inserted/modified and will stay in the table
    (the table doesn't have transactions) we must write to the binlog (and
    the error code will make the slave stop).

    For many errors (example: we got a duplicate key error while
    inserting into a MyISAM table), no row will be added to the table,
    so passing the error to the slave will not help since there will
    be an error code mismatch (the inserts will succeed on the slave
    with no error).

    If we are using row-based replication we have two cases where this
    code is executed: replication of CREATE-SELECT and replication of
    INSERT-SELECT.

    When replicating a CREATE-SELECT statement, we shall not write the
    events to the binary log and should thus not set
    OPTION_STATUS_NO_TRANS_UPDATE.

    When replicating INSERT-SELECT, we shall not write the events to
    the binary log for transactional table, but shall write all events
    if there is one or more writes to non-transactional tables. In
    this case, the OPTION_STATUS_NO_TRANS_UPDATE is set if there is a
    write to a non-transactional table, otherwise it is cleared.
  */
  if (info.copied || info.deleted || info.updated)
  {
    if (!table->file->has_transactions())
    {
      if (mysql_bin_log.is_open())
      {
        /* has_transactions() is known to be false in this branch */
        thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query, thd->query_length,
                          table->file->has_transactions(), FALSE);
      }
      if (!thd->current_stmt_binlog_row_based && !table->s->tmp_table &&
          !can_rollback_data())
        thd->options|= OPTION_STATUS_NO_TRANS_UPDATE;
      query_cache_invalidate3(thd, table, 1);
    }
  }
  ha_rollback_stmt(thd);
  DBUG_VOID_RETURN;
}


bool select_insert::send_eof()
{
2626
  int error,error2;
2627
  ulonglong id;
unknown's avatar
unknown committed
2628 2629
  DBUG_ENTER("select_insert::send_eof");

2630
  error= (!thd->prelocked_mode) ? table->file->ha_end_bulk_insert():0;
unknown's avatar
unknown committed
2631
  table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
2632
  table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
2633

unknown's avatar
unknown committed
2634
  if (info.copied || info.deleted || info.updated)
2635 2636 2637 2638 2639
  {
    /*
      We must invalidate the table in the query cache before binlog writing
      and ha_autocommit_or_rollback.
    */
unknown's avatar
unknown committed
2640
    query_cache_invalidate3(thd, table, 1);
2641 2642 2643 2644 2645 2646 2647 2648 2649 2650 2651 2652
    /*
      Mark that we have done permanent changes if all of the below is true
      - Table doesn't support transactions
      - It's a normal (not temporary) table. (Changes to temporary tables
        are not logged in RBR)
      - We are using statement based replication
    */
    if (!table->file->has_transactions() &&
        (!table->s->tmp_table ||
         !thd->current_stmt_binlog_row_based))
      thd->options|= OPTION_STATUS_NO_TRANS_UPDATE;
   }
unknown's avatar
unknown committed
2653

2654 2655 2656 2657 2658 2659
  /*
    Write to binlog before commiting transaction.  No statement will
    be written by the binlog_query() below in RBR mode.  All the
    events are in the transaction cache and will be written when
    ha_autocommit_or_rollback() is issued below.
  */
2660 2661
  if (mysql_bin_log.is_open())
  {
unknown's avatar
unknown committed
2662 2663
    if (!error)
      thd->clear_error();
2664 2665 2666
    thd->binlog_query(THD::ROW_QUERY_TYPE,
                      thd->query, thd->query_length,
                      table->file->has_transactions(), FALSE);
2667
  }
2668 2669 2670
  if ((error2=ha_autocommit_or_rollback(thd,error)) && ! error)
    error=error2;
  if (error)
unknown's avatar
unknown committed
2671 2672
  {
    table->file->print_error(error,MYF(0));
unknown's avatar
unknown committed
2673
    DBUG_RETURN(1);
unknown's avatar
unknown committed
2674
  }
unknown's avatar
unknown committed
2675
  char buff[160];
2676
  if (info.ignore)
unknown's avatar
unknown committed
2677 2678
    sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
	    (ulong) (info.records - info.copied), (ulong) thd->cuted_fields);
unknown's avatar
unknown committed
2679
  else
unknown's avatar
unknown committed
2680
    sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
unknown's avatar
unknown committed
2681
	    (ulong) (info.deleted+info.updated), (ulong) thd->cuted_fields);
2682
  thd->row_count_func= info.copied+info.deleted+info.updated;
2683 2684 2685 2686 2687 2688 2689

  id= (thd->first_successful_insert_id_in_cur_stmt > 0) ?
    thd->first_successful_insert_id_in_cur_stmt :
    (thd->arg_of_last_insert_id_function ?
     thd->first_successful_insert_id_in_prev_stmt :
     (info.copied ? autoinc_value_of_last_inserted_row : 0));
  ::send_ok(thd, (ulong) thd->row_count_func, id, buff);
unknown's avatar
unknown committed
2690
  DBUG_RETURN(0);
unknown's avatar
unknown committed
2691 2692 2693 2694
}


/***************************************************************************
  CREATE TABLE (SELECT) ...
***************************************************************************/

2698 2699 2700 2701 2702 2703 2704 2705 2706 2707 2708 2709 2710 2711 2712 2713 2714 2715 2716 2717
/*
  Create table from lists of fields and items (or open existing table
  with same name).

  SYNOPSIS
    create_table_from_items()
      thd          in     Thread object
      create_info  in     Create information (like MAX_ROWS, ENGINE or
                          temporary table flag)
      create_table in     Pointer to TABLE_LIST object providing database
                          and name for table to be created or to be open
      extra_fields in/out Initial list of fields for table to be created
      keys         in     List of keys for table to be created
      items        in     List of items which should be used to produce rest
                          of fields for the table (corresponding fields will
                          be added to the end of 'extra_fields' list)
      lock         out    Pointer to the MYSQL_LOCK object for table created
                          (open) will be returned in this parameter. Since
                          this table is not included in THD::lock caller is
                          responsible for explicitly unlocking this table.
      hooks        in     Hook object whose prelock() is called with the
                          opened table just before the table is locked.
                          May be 0, in which case no hook is called.

  NOTES
    If 'create_info->options' bitmask has HA_LEX_CREATE_IF_NOT_EXISTS
    flag and table with name provided already exists then this function will
    simply open existing table.
    Also note that create, open and lock sequence in this function is not
    atomic and thus contains gap for deadlock and can cause other troubles.
    Since this function contains some logic specific to CREATE TABLE ... SELECT
    it should be changed before it can be used in other contexts.

  RETURN VALUES
    non-zero  Pointer to TABLE object for table created or opened
    0         Error
*/

static TABLE *create_table_from_items(THD *thd, HA_CREATE_INFO *create_info,
                                      TABLE_LIST *create_table,
                                      List<create_field> *extra_fields,
                                      List<Key> *keys,
                                      List<Item> *items,
                                      MYSQL_LOCK **lock,
                                      TABLEOP_HOOKS *hooks)
{
  TABLE tmp_table;		// Used during 'create_field()'
  TABLE_SHARE share;
  TABLE *table= 0;
  uint select_field_count= items->elements;
  /* Add selected items to field list */
  List_iterator_fast<Item> it(*items);
  Item *item;
  Field *tmp_field;
  bool not_used;
  DBUG_ENTER("create_table_from_items");

  tmp_table.alias= 0;
  tmp_table.timestamp_field= 0;
  tmp_table.s= &share;
  init_tmp_table_share(&share, "", 0, "", "");

  tmp_table.s->db_create_options=0;
  tmp_table.s->blob_ptr_size= portable_sizeof_char_ptr;
  tmp_table.s->db_low_byte_first=
        test(create_info->db_type == &myisam_hton ||
             create_info->db_type == &heap_hton);
  tmp_table.null_row=tmp_table.maybe_null=0;

  while ((item=it++))
  {
    create_field *cr_field;
    Field *field, *def_field;
    if (item->type() == Item::FUNC_ITEM)
      field= item->tmp_table_field(&tmp_table);
    else
      field= create_tmp_field(thd, &tmp_table, item, item->type(),
                              (Item ***) 0, &tmp_field, &def_field, 0, 0, 0, 0,
                              0);
    if (!field ||
        !(cr_field=new create_field(field,(item->type() == Item::FIELD_ITEM ?
                                           ((Item_field *)item)->field :
                                           (Field*) 0))))
      DBUG_RETURN(0);
    if (item->maybe_null)
      cr_field->flags &= ~NOT_NULL_FLAG;
    extra_fields->push_back(cr_field);
  }
  /*
    create and lock table

    We don't log the statement, it will be logged later.

    If this is a HEAP table, the automatic DELETE FROM which is written to the
    binlog when a HEAP table is opened for the first time since startup, must
    not be written: 1) it would be wrong (imagine we're in CREATE SELECT: we
    don't want to delete from it) 2) it would be written before the CREATE
    TABLE, which is a wrong order. So we keep binary logging disabled when we
    open_table().
    NOTE: By locking table which we just have created (or for which we just
    have found that it already exists) separately from other tables used
    by the statement we create potential window for deadlock.
    TODO: create and open should be done atomic !
  */
  {
    tmp_disable_binlog(thd);
    if (!mysql_create_table(thd, create_table->db, create_table->table_name,
                            create_info, *extra_fields, *keys, 0,
                            select_field_count, 0))
    {
      /*
        If we are here in prelocked mode we either create temporary table
        or prelocked mode is caused by the SELECT part of this statement.
      */
      DBUG_ASSERT(!thd->prelocked_mode ||
                  create_info->options & HA_LEX_CREATE_TMP_TABLE ||
                  thd->lex->requires_prelocking());

      /*
        NOTE: We don't want to ignore set of locked tables here if we are
              under explicit LOCK TABLES since it will open gap for deadlock
              too wide (and also is not backward compatible).
      */

      /* If the freshly created table can't be opened, remove it again */
      if (! (table= open_table(thd, create_table, thd->mem_root, (bool*) 0,
                               (MYSQL_LOCK_IGNORE_FLUSH |
                                ((thd->prelocked_mode == PRELOCKED) ?
                                 MYSQL_OPEN_IGNORE_LOCKED_TABLES:0)))))
        quick_rm_table(create_info->db_type, create_table->db,
                       table_case_name(create_info, create_table->table_name),
                       0);
    }
    reenable_binlog(thd);
    if (!table)                                   // open failed
      DBUG_RETURN(0);
  }

  /*
    FIXME: What happens if trigger manages to be created while we are
           obtaining this lock ? May be it is sensible just to disable
           trigger execution in this case ? Or will MYSQL_LOCK_IGNORE_FLUSH
           save us from that ?
  */
  table->reginfo.lock_type=TL_WRITE;
  /*
    The caller passes a null hook pointer when it has no hooks to run
    (e.g. when built without row-based replication), so guard the call.
  */
  if (hooks)
    hooks->prelock(&table, 1);                  // Call prelock hooks
  if (! ((*lock)= mysql_lock_tables(thd, &table, 1,
                                    MYSQL_LOCK_IGNORE_FLUSH, &not_used)))
  {
    /* Locking failed: drop the table from the open cache and remove it */
    VOID(pthread_mutex_lock(&LOCK_open));
    hash_delete(&open_cache,(byte*) table);
    VOID(pthread_mutex_unlock(&LOCK_open));
    quick_rm_table(create_info->db_type, create_table->db,
                   table_case_name(create_info, create_table->table_name), 0);
    DBUG_RETURN(0);
  }
  table->file->extra(HA_EXTRA_WRITE_CACHE);
  DBUG_RETURN(table);
}


unknown's avatar
unknown committed
2856
int
2857
select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
2858 2859
{
  DBUG_ENTER("select_create::prepare");
2860

2861 2862
  TABLEOP_HOOKS *hook_ptr= NULL;
#ifdef HAVE_ROW_BASED_REPLICATION
2863 2864 2865 2866 2867
  class MY_HOOKS : public TABLEOP_HOOKS {
  public:
    MY_HOOKS(select_create *x) : ptr(x) { }
    virtual void do_prelock(TABLE **tables, uint count)
    {
unknown's avatar
unknown committed
2868 2869 2870
    if (ptr->get_thd()->current_stmt_binlog_row_based  &&
        !(ptr->get_create_info()->options & HA_LEX_CREATE_TMP_TABLE))
      ptr->binlog_show_create_table(tables, count);
2871 2872 2873 2874 2875 2876 2877
    }

  private:
    select_create *ptr;
  };

  MY_HOOKS hooks(this);
2878 2879
  hook_ptr= &hooks;
#endif
2880

2881
  unit= u;
2882
  if (!(table= create_table_from_items(thd, create_info, create_table,
2883 2884
                                       extra_fields, keys, &values,
                                       &thd->extra_lock, hook_ptr)))
unknown's avatar
unknown committed
2885 2886
    DBUG_RETURN(-1);				// abort() deletes table

2887
  if (table->s->fields < values.elements)
unknown's avatar
unknown committed
2888
  {
2889
    my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1);
unknown's avatar
unknown committed
2890 2891 2892
    DBUG_RETURN(-1);
  }

2893
 /* First field to copy */
unknown's avatar
unknown committed
2894 2895 2896 2897
  field= table->field+table->s->fields - values.elements;

  /* Mark all fields that are given values */
  for (Field **f= field ; *f ; f++)
2898
    bitmap_set_bit(table->write_set, (*f)->field_index);
unknown's avatar
unknown committed
2899

2900
  /* Don't set timestamp if used */
2901
  table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
unknown's avatar
unknown committed
2902 2903
  table->next_number_field=table->found_next_number_field;

2904
  restore_record(table,s->default_values);      // Get empty record
unknown's avatar
unknown committed
2905
  thd->cuted_fields=0;
unknown's avatar
unknown committed
2906
  if (info.ignore || info.handle_duplicates != DUP_ERROR)
unknown's avatar
unknown committed
2907
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
2908 2909 2910
  if (info.handle_duplicates == DUP_REPLACE &&
      (!table->triggers || !table->triggers->has_delete_triggers()))
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
2911
  if (!thd->prelocked_mode)
2912
    table->file->ha_start_bulk_insert((ha_rows) 0);
unknown's avatar
unknown committed
2913
  thd->no_trans_update= 0;
unknown's avatar
unknown committed
2914
  thd->abort_on_warning= (!info.ignore &&
unknown's avatar
unknown committed
2915 2916 2917
                          (thd->variables.sql_mode &
                           (MODE_STRICT_TRANS_TABLES |
                            MODE_STRICT_ALL_TABLES)));
2918 2919 2920 2921
  if (check_that_all_fields_are_given_values(thd, table, table_list))
    DBUG_RETURN(1);
  table->mark_columns_needed_for_insert();
  DBUG_RETURN(0);
unknown's avatar
unknown committed
2922 2923 2924
}


2925
#ifdef HAVE_ROW_BASED_REPLICATION
/*
  Write a CREATE TABLE statement for the created table to the binary log.

  SYNOPSIS
    select_create::binlog_show_create_table()
      tables   Array of tables; only the first element (*tables) is used
      count    Number of elements in 'tables'; asserted to be > 0

  DESCRIPTION
    The statement text is produced by store_create_info() and handed to
    THD::binlog_query() as a transactional statement query.
*/
void
select_create::binlog_show_create_table(TABLE **tables, uint count)
{
  /*
    Note 1: In RBR mode the CREATE TABLE statement for the created
    table is generated with store_create_info(), which behaves as SHOW
    CREATE TABLE.  If an error occurs nothing should reach the binary
    log, even when the table is non-transactional; for that reason the
    generated statement is logged as if it were for a transactional
    table.  The event then ends up in the transaction cache, together
    with any subsequent events (e.g., table-map events and binrow
    events), and ha_autocommit_or_rollback() can either discard all of
    them or write all of them to the binary log.

    The statement is written here and not in prepare(), since there
    potentially are sub-selects or accesses to information schema that
    will do a close_thread_tables(), destroying the statement
    transaction cache.
  */
  DBUG_ASSERT(thd->current_stmt_binlog_row_based);
  DBUG_ASSERT(tables && *tables && count > 0);

  /* Buffer and string object that receive the statement text */
  char stmt_buf[2048];
  String create_stmt(stmt_buf, sizeof(stmt_buf), system_charset_info);
  create_stmt.length(0);        // Have to zero it since constructor doesn't

  /* Zero-filled TABLE_LIST that refers only to the created table */
  TABLE_LIST created_table;
  memset(&created_table, 0, sizeof(created_table));
  created_table.table= *tables;

  int result= store_create_info(thd, &created_table, &create_stmt,
                                create_info);
  DBUG_ASSERT(result == 0); /* store_create_info() always return 0 */

  thd->binlog_query(THD::STMT_QUERY_TYPE,
                    create_stmt.ptr(), create_stmt.length(),
                    /* is_trans */ TRUE,
                    /* suppress_use */ FALSE);
}
#endif // HAVE_ROW_BASED_REPLICATION
2967

unknown's avatar
unknown committed
2968
/*
  Store one row of values into the table.

  SYNOPSIS
    select_create::store_values()
      values   List of items to store

  DESCRIPTION
    Passes the values, starting at 'field', to
    fill_record_n_invoke_before_triggers() together with the table's
    triggers and the TRG_EVENT_INSERT event.
*/
void select_create::store_values(List<Item> &values)
{
  fill_record_n_invoke_before_triggers(thd, field, values, 1, table->triggers,
                                       TRG_EVENT_INSERT);
}

unknown's avatar
unknown committed
2974

2975 2976 2977 2978 2979 2980 2981 2982 2983
/*
  Report an error through select_insert::send_error() with binary
  logging temporarily switched off.

  SYNOPSIS
    select_create::send_error()
      errcode  Error code, passed on unchanged
      err      Error message, passed on unchanged

  DESCRIPTION
    The binlog is disabled around the call because partial inserts are
    "rolled back" in ::abort by removing the table, even for
    non-transactional tables.
*/
void select_create::send_error(uint errcode, const char *err)
{
  tmp_disable_binlog(thd);
  select_insert::send_error(errcode, err);
  reenable_binlog(thd);
}

unknown's avatar
unknown committed
2986

unknown's avatar
unknown committed
2987 2988 2989 2990 2991 2992 2993
bool select_create::send_eof()
{
  bool tmp=select_insert::send_eof();
  if (tmp)
    abort();
  else
  {
unknown's avatar
unknown committed
2994
    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
2995
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
unknown's avatar
unknown committed
2996
    VOID(pthread_mutex_lock(&LOCK_open));
2997
    mysql_unlock_tables(thd, thd->extra_lock);
2998
    if (!table->s->tmp_table)
2999
    {
3000
      if (close_thread_table(thd, &table))
3001
        broadcast_refresh();
3002
    }
3003
    thd->extra_lock=0;
unknown's avatar
unknown committed
3004
    table=0;
unknown's avatar
unknown committed
3005 3006 3007 3008 3009 3010 3011 3012
    VOID(pthread_mutex_unlock(&LOCK_open));
  }
  return tmp;
}

void select_create::abort()
{
  VOID(pthread_mutex_lock(&LOCK_open));
3013
  if (thd->extra_lock)
unknown's avatar
unknown committed
3014
  {
3015 3016
    mysql_unlock_tables(thd, thd->extra_lock);
    thd->extra_lock=0;
unknown's avatar
unknown committed
3017 3018 3019
  }
  if (table)
  {
unknown's avatar
unknown committed
3020
    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
3021
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
unknown's avatar
unknown committed
3022
    handlerton *table_type=table->s->db_type;
3023
    if (!table->s->tmp_table)
3024
    {
3025
      ulong version= table->s->version;
unknown's avatar
unknown committed
3026
      table->s->version= 0;
3027
      hash_delete(&open_cache,(byte*) table);
3028
      if (!create_info->table_existed)
3029 3030
        quick_rm_table(table_type, create_table->db,
                       create_table->table_name, 0);
3031
      /* Tell threads waiting for refresh that something has happened */
unknown's avatar
unknown committed
3032
      if (version != refresh_version)
3033
        broadcast_refresh();
3034 3035
    }
    else if (!create_info->table_existed)
unknown's avatar
unknown committed
3036 3037
      close_temporary_table(thd, table, 1, 1);
    table=0;                                    // Safety
unknown's avatar
unknown committed
3038 3039 3040 3041 3042 3043
  }
  VOID(pthread_mutex_unlock(&LOCK_open));
}


/*****************************************************************************
unknown's avatar
unknown committed
3044
  Instantiate templates
unknown's avatar
unknown committed
3045 3046
*****************************************************************************/

3047
/*
  Explicit template instantiations; compiled only when
  HAVE_EXPLICIT_TEMPLATE_INSTANTIATION is defined.
*/
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
template class List_iterator_fast<List_item>;
#ifndef EMBEDDED_LIBRARY
/* The delayed-insert lists are left out when EMBEDDED_LIBRARY is defined */
template class I_List<delayed_insert>;
template class I_List_iterator<delayed_insert>;
template class I_List<delayed_row>;
#endif /* EMBEDDED_LIBRARY */
#endif /* HAVE_EXPLICIT_TEMPLATE_INSTANTIATION */