event_queue.cc 27.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/* Copyright (C) 2004-2006 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#include "mysql_priv.h"
#include "event_queue.h"
#include "event_data_objects.h"
#include "event_db_repository.h"

/*
  Sizes handed to init_queue_ex() by Event_queue::init_queue(): the initial
  number of elements and the value of its last (extent) argument.
*/
#define EVENT_QUEUE_INITIAL_SIZE 30
#define EVENT_QUEUE_EXTENT       30

/*
  SCHED_FUNC expands to the name of the current function where the compiler
  is known to provide __FUNCTION__ (GCC 2 and later) and to a placeholder
  string everywhere else, so that it is always defined.
*/
#if defined(__GNUC__) && __GNUC__ >= 2
#define SCHED_FUNC __FUNCTION__
#else
#define SCHED_FUNC "<unknown>"
#endif

/* Lock/unlock the queue mutex and record the call site for diagnostics. */
#define LOCK_QUEUE_DATA()   lock_data(SCHED_FUNC, __LINE__)
#define UNLOCK_QUEUE_DATA() unlock_data(SCHED_FUNC, __LINE__)

unknown's avatar
unknown committed
37 38 39 40 41 42
struct event_queue_param
{
  THD *thd;
  Event_queue *queue;
  pthread_mutex_t LOCK_loaded;
  pthread_cond_t COND_loaded;
unknown's avatar
unknown committed
43
  bool loading_finished;
unknown's avatar
unknown committed
44 45
};

46 47

/*
48
  Compares the execute_at members of two Event_queue_element instances.
49 50 51 52
  Used as callback for the prioritized queue when shifting
  elements inside.

  SYNOPSIS
53
    event_queue_element_data_compare_q()
54 55 56
      vptr  Not used (set it to NULL)
      a     First Event_queue_element object
      b     Second Event_queue_element object
57 58

  RETURN VALUE
59 60 61
   -1   a->execute_at < b->execute_at
    0   a->execute_at == b->execute_at
    1   a->execute_at > b->execute_at
unknown's avatar
unknown committed
62

63 64 65 66 67
  NOTES
    execute_at.second_part is not considered during comparison
*/

static int 
68
event_queue_element_compare_q(void *vptr, byte* a, byte *b)
69
{
70 71
  return my_time_compare(&((Event_queue_element *)a)->execute_at,
                         &((Event_queue_element *)b)->execute_at);
72 73 74 75 76 77 78 79 80 81 82
}


/*
  Constructor of class Event_queue.

  SYNOPSIS
    Event_queue::Event_queue()

  NOTES
    Only resets the diagnostic bookkeeping (who locked/unlocked the queue
    mutex and where) and the next activation time. The queue itself becomes
    usable only after init_queue().
*/

Event_queue::Event_queue()
  :mutex_last_unlocked_at_line(0), mutex_last_locked_at_line(0),
   mutex_last_attempted_lock_at_line(0),
   mutex_queue_data_locked(FALSE), mutex_queue_data_attempting_lock(FALSE)
{
  mutex_last_unlocked_in_func= mutex_last_locked_in_func=
    mutex_last_attempted_lock_in_func= "";
  /*
    dump_internal_status() reads this flag, possibly before the first
    cond_wait() has ever assigned it, so give it a defined value here.
  */
  waiting_on_cond= FALSE;
  set_zero_time(&next_activation_at, MYSQL_TIMESTAMP_DATETIME);
}

/*
  Initializes LOCK_event_queue, the mutex taken by LOCK_QUEUE_DATA().

  SYNOPSIS
    Event_queue::init_mutexes()

  NOTES
    NOTE(review): COND_queue_state is waited on and signalled in this file
    but is not initialized here - confirm where that happens.
*/

void Event_queue::init_mutexes()
{
  pthread_mutex_init(&LOCK_event_queue, MY_MUTEX_INIT_FAST);
}


/*
  Destroys LOCK_event_queue, the counterpart of init_mutexes().

  SYNOPSIS
    Event_queue::deinit_mutexes()
*/

void Event_queue::deinit_mutexes()
{
  pthread_mutex_destroy(&LOCK_event_queue);
}


/*
122 123 124 125 126 127
  This is a queue's constructor. Until this method is called, the
  queue is unusable.  We don't use a C++ constructor instead in
  order to be able to check the return value. The queue is
  initialized once at server startup.  Initialization can fail in
  case of a failure reading events from the database or out of
  memory.
128 129 130 131 132 133 134 135 136 137

  SYNOPSIS
    Event_queue::init()

  RETURN VALUE
    FALSE  OK
    TRUE   Error
*/

bool
138
Event_queue::init_queue(THD *thd, Event_db_repository *db_repo)
139
{
unknown's avatar
unknown committed
140 141 142 143
  pthread_t th;
  bool res;
  struct event_queue_param *event_queue_param_value= NULL;

144 145 146 147 148 149
  DBUG_ENTER("Event_queue::init_queue");
  DBUG_PRINT("enter", ("this=0x%lx", this));

  LOCK_QUEUE_DATA();
  db_repository= db_repo;

150
  if (init_queue_ex(&queue, EVENT_QUEUE_INITIAL_SIZE , 0 /*offset*/,
unknown's avatar
unknown committed
151
                    0 /*max_on_top*/, event_queue_element_compare_q,
152
                    NULL, EVENT_QUEUE_EXTENT))
153 154
  {
    sql_print_error("SCHEDULER: Can't initialize the execution queue");
unknown's avatar
unknown committed
155
    goto err;
156 157 158 159 160
  }

  if (sizeof(my_time_t) != sizeof(time_t))
  {
    sql_print_error("SCHEDULER: sizeof(my_time_t) != sizeof(time_t) ."
161
                    "The scheduler may not work correctly. Stopping");
162
    DBUG_ASSERT(0);
unknown's avatar
unknown committed
163
    goto err;
164 165
  }

166
  res= load_events_from_db(thd);
167
  UNLOCK_QUEUE_DATA();
168 169 170
  if (res)
    deinit_queue();

unknown's avatar
unknown committed
171 172 173 174 175
  DBUG_RETURN(res);

err:
  UNLOCK_QUEUE_DATA();
  DBUG_RETURN(TRUE);
176 177 178 179
}


/*
  Tears the queue down: under the queue lock every element is destroyed
  via empty_queue() and then the queue storage itself is released with
  delete_queue().

  SYNOPSIS
    Event_queue::deinit_queue()
*/

void Event_queue::deinit_queue()
{
  DBUG_ENTER("Event_queue::deinit_queue");

  LOCK_QUEUE_DATA();

  /* Elements first, then the container that held them. */
  empty_queue();
  delete_queue(&queue);

  UNLOCK_QUEUE_DATA();

  DBUG_VOID_RETURN;
}


201
/*
202
  Adds an event to the queue.
203 204 205

  SYNOPSIS
    Event_queue::create_event()
206 207
      dbname  The schema of the new event
      name    The name of the new event
208 209 210 211 212 213 214

  RETURN VALUE
    OP_OK             OK or scheduler not working
    OP_LOAD_ERROR     Error during loading from disk
*/

int
215
Event_queue::create_event(THD *thd, LEX_STRING dbname, LEX_STRING name)
216 217
{
  int res;
218
  Event_queue_element *new_element;
219
  DBUG_ENTER("Event_queue::create_event");
220
  DBUG_PRINT("enter", ("thd=0x%lx et=%s.%s",thd, dbname.str, name.str));
221

222 223 224 225
  new_element= new Event_queue_element();
  res= db_repository->load_named_event(thd, dbname, name, new_element);
  if (res || new_element->status == Event_queue_element::DISABLED)
    delete new_element;
226
  else
227
  {
228
    new_element->compute_next_execution_time();
229 230

    LOCK_QUEUE_DATA();
231 232
    DBUG_PRINT("info", ("new event in the queue 0x%lx", new_element));
    queue_insert_safe(&queue, (byte *) new_element);
unknown's avatar
unknown committed
233
    dbug_dump_queue(thd->query_start());
234
    pthread_cond_broadcast(&COND_queue_state);  
235
    UNLOCK_QUEUE_DATA();
236
  }
237

238 239 240 241 242 243 244 245
  DBUG_RETURN(res);
}


/*
  Updates an event from the scheduler queue

  SYNOPSIS
246
    Event_queue::update_event()
247
      thd        Thread
248 249 250 251
      dbname     Schema of the event
      name       Name of the event
      new_schema New schema, in case of RENAME TO, otherwise NULL
      new_name   New name, in case of RENAME TO, otherwise NULL
252 253 254 255 256 257 258

  RETURN VALUE
    OP_OK             OK or scheduler not working
    OP_LOAD_ERROR     Error during loading from disk
*/

int
259
Event_queue::update_event(THD *thd, LEX_STRING dbname, LEX_STRING name,
260
                          LEX_STRING *new_schema, LEX_STRING *new_name)
261
{
262
  int res;
263
  Event_queue_element *new_element;
264 265

  DBUG_ENTER("Event_queue::update_event");
266
  DBUG_PRINT("enter", ("thd=0x%lx et=[%s.%s]", thd, dbname.str, name.str));
267

268
  new_element= new Event_queue_element();
269

270 271
  res= db_repository->load_named_event(thd, new_schema ? *new_schema:dbname,
                                       new_name ? *new_name:name, new_element);
272 273
  if (res)
  {
274
    delete new_element;
275
    goto end;
276
  }
277
  else if (new_element->status == Event_queue_element::DISABLED)
278 279 280 281 282 283
  {
    DBUG_PRINT("info", ("The event is disabled."));
    /*
      Destroy the object but don't skip to end: because we may have to remove
      object from the cache.
    */
284 285
    delete new_element;
    new_element= NULL;
286 287
  }
  else
288
    new_element->compute_next_execution_time();
289

290
  LOCK_QUEUE_DATA();
291 292
  find_n_remove_event(dbname, name);

293
  /* If not disabled event */
294
  if (new_element)
295
  {
296
    DBUG_PRINT("info", ("new event in the Q 0x%lx", new_element));
297
    queue_insert_safe(&queue, (byte *) new_element);
298
    pthread_cond_broadcast(&COND_queue_state);  
299
  }
300

unknown's avatar
unknown committed
301
  dbug_dump_queue(thd->query_start());
302 303
  UNLOCK_QUEUE_DATA();

304 305
end:
  DBUG_PRINT("info", ("res=%d", res));
306 307 308 309 310
  DBUG_RETURN(res);
}


/*
311
  Drops an event from the queue
312 313 314

  SYNOPSIS
    Event_queue::drop_event()
315 316 317
      thd     Thread
      dbname  Schema of the event to drop
      name    Name of the event to drop
318 319
*/

320 321
void
Event_queue::drop_event(THD *thd, LEX_STRING dbname, LEX_STRING name)
322 323 324
{
  int res;
  DBUG_ENTER("Event_queue::drop_event");
325
  DBUG_PRINT("enter", ("thd=0x%lx name=0x%lx", thd, name));
326 327

  LOCK_QUEUE_DATA();
328
  find_n_remove_event(dbname, name);
unknown's avatar
unknown committed
329
  dbug_dump_queue(thd->query_start());
330
  UNLOCK_QUEUE_DATA();
331
  
332 333 334 335
  /*
    We don't signal here because the scheduler will catch the change
    next time it wakes up.
  */
336

337
  DBUG_VOID_RETURN;
338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359
}


/*
  Drops all events from the in-memory queue and disk that match
  certain pattern evaluated by a comparator function

  SYNOPSIS
    Event_queue::drop_matching_events()
      thd            THD
      pattern        A pattern string
      comparator     The function to use for comparing

  RETURN VALUE
    >=0  Number of dropped events
    
  NOTE
    Expected is the caller to acquire lock on LOCK_event_queue
*/

void
Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern,
360
                           bool (*comparator)(LEX_STRING, Event_basic *))
361
{
362
  uint i= 0;
363
  DBUG_ENTER("Event_queue::drop_matching_events");
unknown's avatar
unknown committed
364
  DBUG_PRINT("enter", ("pattern=%s", pattern.str));
365 366 367

  while (i < queue.elements)
  {
368
    Event_queue_element *et= (Event_queue_element *) queue_element(&queue, i);
369
    DBUG_PRINT("info", ("[%s.%s]?", et->dbname.str, et->name.str));
370
    if (comparator(pattern, et))
371 372
    {
      /*
373 374 375 376
        The queue is ordered. If we remove an element, then all elements
        after it will shift one position to the left, if we imagine it as
        an array from left to the right. In this case we should not
        increment the counter and the (i < queue.elements) condition is ok.
377 378
      */
      queue_remove(&queue, i);
379
      delete et;
380 381 382 383 384
    }
    else
      i++;
  }
  /*
385 386
    We don't call pthread_cond_broadcast(&COND_queue_state);  
    If we remove the top event:
387 388 389 390 391 392
    1. The queue is empty. The scheduler will wake up at some time and
       realize that the queue is empty. If create_event() comes inbetween
       it will signal the scheduler
    2. The queue is not empty, but the next event after the previous top,
       won't be executed any time sooner than the element we removed. Hence,
       we may not notify the scheduler and it will realize the change when it
393
       wakes up from timedwait.
394
  */
unknown's avatar
unknown committed
395

396 397 398 399 400 401 402 403 404 405
  DBUG_VOID_RETURN;
}


/*
  Drops all events from the in-memory queue and disk that are from
  certain schema.

  SYNOPSIS
    Event_queue::drop_schema_events()
406 407
      thd        HD
      schema    The schema name
408 409
*/

410
void
411 412 413 414
Event_queue::drop_schema_events(THD *thd, LEX_STRING schema)
{
  DBUG_ENTER("Event_queue::drop_schema_events");
  LOCK_QUEUE_DATA();
415
  drop_matching_events(thd, schema, event_basic_db_equal);
416
  UNLOCK_QUEUE_DATA();
417
  DBUG_VOID_RETURN;
418 419 420
}


421 422 423 424 425 426 427
/*
  Searches for an event in the queue

  SYNOPSIS
    Event_queue::find_n_remove_event()
      db    The schema of the event to find
      name  The event to find
428

429 430 431
  NOTE
    The caller should do the locking also the caller is responsible for
    actual signalling in case an event is removed from the queue.
432 433
*/

434
void
435
Event_queue::find_n_remove_event(LEX_STRING db, LEX_STRING name)
436
{
437 438 439 440 441 442 443 444 445 446 447
  uint i;
  DBUG_ENTER("Event_queue::find_n_remove_event");

  for (i= 0; i < queue.elements; ++i)
  {
    Event_queue_element *et= (Event_queue_element *) queue_element(&queue, i);
    DBUG_PRINT("info", ("[%s.%s]==[%s.%s]?", db.str, name.str,
                        et->dbname.str, et->name.str));
    if (event_basic_identifier_equal(db, name, et))
    {
      queue_remove(&queue, i);
448 449
      delete et;
      break;
450 451 452
    }
  }

453
  DBUG_VOID_RETURN;
454 455 456 457 458 459
}


/*
  Loads all ENABLED events from mysql.event into the prioritized
  queue. Called from init_queue(). Compiles every loaded event once to
  check that its body is valid, and creates an Event_queue_element
  instance for every ENABLED event from mysql.event.

  SYNOPSIS
    Event_queue::load_events_from_db()
      thd - Thread context. Used for opening and reading the table and for
            compiling the events.

  RETURN VALUE
    0                       OK
    EVEX_OPEN_TABLE_FAILED  mysql.event could not be opened
    -1                      Any other failure (out of memory, a row that
                            can't be loaded, an event that doesn't compile).
                            In this case the queue is emptied.

  NOTES
    Reports the error to the console.
    An event whose next execution time can't be computed is skipped, it
    does not abort the load.
*/

int
Event_queue::load_events_from_db(THD *thd)
{
  TABLE *table;
  READ_RECORD read_record_info;
  int ret= -1;
  uint count= 0;
  /* Stays TRUE unless the whole table has been read without an error. */
  bool clean_the_queue= TRUE;

  DBUG_ENTER("Event_queue::load_events_from_db");
  DBUG_PRINT("enter", ("thd=0x%lx", (long) thd));

  if ((ret= db_repository->open_event_table(thd, TL_READ, &table)))
  {
    sql_print_error("SCHEDULER: Table mysql.event is damaged. Can not open");
    DBUG_RETURN(EVEX_OPEN_TABLE_FAILED);
  }

  init_read_record(&read_record_info, thd, table ,NULL,1,0);
  while (!(read_record_info.read_record(&read_record_info)))
  {
    Event_queue_element *et;
    if (!(et= new Event_queue_element))
    {
      DBUG_PRINT("info", ("Out of memory"));
      /* A failure: leave through the error path, not the success path. */
      goto end;
    }
    DBUG_PRINT("info", ("Loading event from row."));

    if ((ret= et->load_from_row(table)))
    {
      sql_print_error("SCHEDULER: Error while loading from mysql.event. "
                      "Table probably corrupted");
      delete et;
      goto end;
    }
    if (et->status != Event_queue_element::ENABLED)
    {
      DBUG_PRINT("info",("%s is disabled",et->name.str));
      delete et;
      continue;
    }

    /* let's find when to be executed */
    if (et->compute_next_execution_time())
    {
      sql_print_error("SCHEDULER: Error while computing execution time of %s.%s."
                      " Skipping", et->dbname.str, et->name.str);
      /* The element never reaches the queue, so it has to be freed here. */
      delete et;
      continue;
    }

    {
      Event_job_data temp_job_data;
      DBUG_PRINT("info", ("Event %s loaded from row. ", et->name.str));

      temp_job_data.load_from_row(table);

      /*
        We load only on scheduler root just to check whether the body
        compiles.
      */
      switch (ret= temp_job_data.compile(thd, thd->mem_root)) {
      case EVEX_MICROSECOND_UNSUP:
        sql_print_error("SCHEDULER: mysql.event is tampered. MICROSECOND is not "
                        "supported but found in mysql.event");
        break;
      case EVEX_COMPILE_ERROR:
        sql_print_error("SCHEDULER: Error while compiling %s.%s. Aborting load",
                        et->dbname.str, et->name.str);
        break;
      default:
        break;
      }
      thd->end_statement();
      thd->cleanup_after_query();
    }
    if (ret)
    {
      delete et;
      goto end;
    }

    DBUG_PRINT("load_events_from_db", ("Adding 0x%lx to the exec list.",
                                       (long) et));
    queue_insert_safe(&queue,  (byte *) et);
    count++;
  }
  clean_the_queue= FALSE;
end:
  end_read_record(&read_record_info);

  if (clean_the_queue)
  {
    empty_queue();
    ret= -1;
  }
  else
  {
    ret= 0;
    sql_print_information("SCHEDULER: Loaded %d event%s", count,
                          (count == 1)?"":"s");
  }

  close_thread_tables(thd);

  DBUG_PRINT("info", ("Status code %d. Loaded %d event(s)", ret, count));
  DBUG_RETURN(ret);
}


/*
588 589 590 591 592 593
  Recalculates activation times in the queue. There is one reason for
  that. Because the values (execute_at) by which the queue is ordered are
  changed by calls to compute_next_execution_time() on a request from the
  scheduler thread, if it is not running then the values won't be updated.
  Once the scheduler is started again the values has to be recalculated
  so they are right for the current time.
594 595

  SYNOPSIS
596 597
    Event_queue::recalculate_activation_times()
      thd  Thread
598 599 600
*/

void
601
Event_queue::recalculate_activation_times(THD *thd)
602
{
603 604
  uint i;
  DBUG_ENTER("Event_queue::recalculate_activation_times");
605 606

  LOCK_QUEUE_DATA();
607 608
  DBUG_PRINT("info", ("%u loaded events to be recalculated", queue.elements));
  for (i= 0; i < queue.elements; i++)
609
  {
610 611
    ((Event_queue_element*)queue_element(&queue, i))->compute_next_execution_time();
    ((Event_queue_element*)queue_element(&queue, i))->update_timing_fields(thd);
612
  }
613
  queue_fix(&queue);
614 615 616 617 618 619
  UNLOCK_QUEUE_DATA();

  DBUG_VOID_RETURN;
}


620 621 622
/*
  Empties the queue and destroys the Event_queue_element objects in the
  queue.
623

624 625 626 627
  SYNOPSIS
    Event_queue::empty_queue()

  NOTE
628
    Should be called with LOCK_event_queue locked
629
*/
630 631 632 633

void
Event_queue::empty_queue()
{
634
  uint i;
635 636
  DBUG_ENTER("Event_queue::empty_queue");
  DBUG_PRINT("enter", ("Purging the queue. %d element(s)", queue.elements));
unknown's avatar
unknown committed
637
  sql_print_information("SCHEDULER: Purging queue. %u events", queue.elements);
638
  /* empty the queue */
639
  for (i= 0; i < queue.elements; ++i)
640
  {
641
    Event_queue_element *et= (Event_queue_element *) queue_element(&queue, i);
642 643 644
    delete et;
  }
  resize_queue(&queue, 0);
645
  DBUG_VOID_RETURN;
646
}
647 648


649 650 651 652 653 654 655 656
/*
  Dumps the queue to the trace log.

  SYNOPSIS
    Event_queue::dbug_dump_queue()
      now  Current timestamp
*/

unknown's avatar
unknown committed
657
void
658 659 660
Event_queue::dbug_dump_queue(time_t now)
{
#ifndef DBUG_OFF
661
  Event_queue_element *et;
662
  uint i;
unknown's avatar
unknown committed
663
  DBUG_ENTER("Event_queue::dbug_dump_queue");
664 665 666
  DBUG_PRINT("info", ("Dumping queue . Elements=%u", queue.elements));
  for (i = 0; i < queue.elements; i++)
  {
667 668
    et= ((Event_queue_element*)queue_element(&queue, i));
    DBUG_PRINT("info",("et=0x%lx db=%s name=%s",et, et->dbname.str, et->name.str));
unknown's avatar
unknown committed
669
    DBUG_PRINT("info", ("exec_at=%llu starts=%llu ends=%llu execs_so_far=%u"
670 671 672 673
               " expr=%lld et.exec_at=%d now=%d (et.exec_at - now)=%d if=%d",
               TIME_to_ulonglong_datetime(&et->execute_at),
               TIME_to_ulonglong_datetime(&et->starts),
               TIME_to_ulonglong_datetime(&et->ends),
unknown's avatar
unknown committed
674
               et->execution_count,
675 676 677 678
               et->expression, sec_since_epoch_TIME(&et->execute_at), now,
               (int)(sec_since_epoch_TIME(&et->execute_at) - now),
               sec_since_epoch_TIME(&et->execute_at) <= now));
  }
unknown's avatar
unknown committed
679
  DBUG_VOID_RETURN;
680 681 682
#endif
}

683 684
static const char *queue_empty_msg= "Waiting on empty queue";
static const char *queue_wait_msg= "Waiting for next activation";
685 686 687 688 689 690 691

/*
  Checks whether the top of the queue is elligible for execution and
  returns an Event_job_data instance in case it should be executed.
  `now` is compared against `execute_at` of the top element in the queue.

  SYNOPSIS
unknown's avatar
unknown committed
692
    Event_queue::get_top_for_execution_if_time()
693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708
      thd      [in]  Thread
      now      [in]  Current timestamp
      job_data [out] The object to execute
      abstime  [out] Time to sleep

  RETURN VALUE
    FALSE  No error. If *job_data==NULL then top not elligible for execution.
           Could be that there is no top. If abstime->tv_sec is set to value
           greater than zero then use abstime with pthread_cond_timedwait().
           If abstime->tv_sec is zero then sleep with pthread_cond_wait().
           abstime->tv_nsec is always zero.
    TRUE   Error
    
*/

bool
709
Event_queue::get_top_for_execution_if_time(THD *thd, Event_job_data **job_data)
710
{
711
  bool ret= FALSE;
712
  struct timespec top_time;
713
  struct timespec *abstime;
714
  *job_data= NULL;
715
  DBUG_ENTER("Event_queue::get_top_for_execution_if_time");
716 717

  top_time.tv_nsec= 0;
718
  LOCK_QUEUE_DATA();
719 720
  for (;;)
  {
721
    int res;
722
    Event_queue_element *top= NULL;
723

724 725 726
    thd->end_time();
    time_t now= thd->query_start();
    abstime= NULL;
727

728 729 730 731
    if (queue.elements)
    {
      top= ((Event_queue_element*) queue_element(&queue, 0));
      top_time.tv_sec= sec_since_epoch_TIME(&top->execute_at);
732

733 734 735 736
      abstime= &top_time;
    }

    if (!abstime || abstime->tv_sec > now)
737
    {
738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761
      const char *msg;
      if (abstime)
      {
        next_activation_at= top->execute_at;
        msg= queue_wait_msg;
      }
      else
      {
        set_zero_time(&next_activation_at, MYSQL_TIMESTAMP_DATETIME);
        msg= queue_wait_msg;
      }

      cond_wait(thd, abstime, msg, SCHED_FUNC, __LINE__);
      if (thd->killed)
      {
        DBUG_PRINT("info", ("thd->killed=%d", thd->killed));
        goto end;
      }
      /*
        The queue could have been emptied. Therefore it's safe to start from
        the beginning. Moreover, this way we will get also the new top, if
        the element at the top has been changed.
      */
      continue;
762
    }
763 764

    DBUG_PRINT("info", ("Ready for execution"));
unknown's avatar
unknown committed
765 766 767 768 769
    if (!(*job_data= new Event_job_data()))
    {
      ret= TRUE;
      break;
    }
770 771 772
    if ((res= db_repository->load_named_event(thd, top->dbname, top->name,
                                              *job_data)))
    {
773
      DBUG_PRINT("error", ("Got %d from load_named_event", res));
774 775 776 777 778 779 780 781 782
      delete *job_data;
      *job_data= NULL;
      ret= TRUE;
      break;
    }

    top->mark_last_executed(thd);
    if (top->compute_next_execution_time())
      top->status= Event_queue_element::DISABLED;
unknown's avatar
unknown committed
783 784 785
    DBUG_PRINT("info", ("event %s status is %d", top->name.str, top->status));

    (*job_data)->execution_count= top->execution_count;
786 787 788 789 790 791

    top->update_timing_fields(thd);
    if (((top->execute_at.year && !top->expression) || top->execute_at_null) ||
        (top->status == Event_queue_element::DISABLED))
    {
      DBUG_PRINT("info", ("removing from the queue"));
unknown's avatar
unknown committed
792 793 794
      sql_print_information("SCHEDULER: Last execution of %s.%s. %s",
                            top->dbname.str, top->name.str,
                            top->dropped? "Dropping.":"");
795 796 797 798 799 800 801
      if (top->dropped)
        top->drop(thd);
      delete top;
      queue_remove(&queue, 0);
    }
    else
      queue_replaced(&queue);
unknown's avatar
unknown committed
802 803

    dbug_dump_queue(now);
804 805 806
    break;
  }
end:
807 808
  UNLOCK_QUEUE_DATA();
  
809
  DBUG_PRINT("info", ("returning %d. et_new=0x%lx abstime.tv_sec=%d ",
810
             ret, *job_data, abstime? abstime->tv_sec:0));
811 812 813 814 815 816

  if (*job_data)
    DBUG_PRINT("info", ("db=%s  name=%s definer=%s", (*job_data)->dbname.str,
               (*job_data)->name.str, (*job_data)->definer.str));

  DBUG_RETURN(ret);
817
}
818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834


/*
  Auxiliary function for locking LOCK_event_queue. Used by the
  LOCK_QUEUE_DATA macro

  SYNOPSIS
    Event_queue::lock_data()
      func  Which function is requesting mutex lock
      line  On which line mutex lock is requested
*/

void
Event_queue::lock_data(const char *func, uint line)
{
  DBUG_ENTER("Event_queue::lock_data");
  DBUG_PRINT("enter", ("func=%s line=%u", func, line));
835 836 837
  mutex_last_attempted_lock_in_func= func;
  mutex_last_attempted_lock_at_line= line;
  mutex_queue_data_attempting_lock= TRUE;
838
  pthread_mutex_lock(&LOCK_event_queue);
839 840 841 842
  mutex_last_attempted_lock_in_func= "";
  mutex_last_attempted_lock_at_line= 0;
  mutex_queue_data_attempting_lock= FALSE;

843 844 845
  mutex_last_locked_in_func= func;
  mutex_last_locked_at_line= line;
  mutex_queue_data_locked= TRUE;
846

847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873
  DBUG_VOID_RETURN;
}


/*
  Releases LOCK_event_queue on behalf of the UNLOCK_QUEUE_DATA macro and
  remembers the call site as the "last unlocked" location.

  SYNOPSIS
    Event_queue::unlock_data()
      func  Name of the function that asks for the unlock
      line  Source line on which the unlock is requested
*/

void Event_queue::unlock_data(const char *func, uint line)
{
  DBUG_ENTER("Event_queue::unlock_data");
  DBUG_PRINT("enter", ("func=%s line=%u", func, line));

  /* Update the bookkeeping while the mutex is still held. */
  mutex_last_unlocked_in_func= func;
  mutex_last_unlocked_at_line= line;
  mutex_queue_data_locked= FALSE;

  pthread_mutex_unlock(&LOCK_event_queue);

  DBUG_VOID_RETURN;
}


874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919
/*
  Wrapper for pthread_cond_wait/timedwait on COND_queue_state with
  LOCK_event_queue as the mutex. Also keeps the lock bookkeeping and the
  waiting_on_cond flag up to date around the wait.

  SYNOPSIS
    Event_queue::cond_wait()
      thd     Thread
      abstime If not null then call pthread_cond_timedwait()
      msg     Message handed to thd->enter_cond()
      func    Which function is requesting cond_wait
      line    On which line cond_wait is requested

  NOTES
    The lock is held again when the function returns.
    The result of the wait (e.g. a timeout) is not reported to the caller.
    NOTE(review): an earlier version of this header said thd could be NULL
    during shutdown, but thd is dereferenced unconditionally below - confirm
    that no caller passes NULL.
*/

void
Event_queue::cond_wait(THD *thd, struct timespec *abstime, const char* msg,
                       const char *func, uint line)
{
  DBUG_ENTER("Event_queue::cond_wait");
  /* The wait releases the mutex, so record the call site as an unlock. */
  waiting_on_cond= TRUE;
  mutex_last_unlocked_at_line= line;
  mutex_queue_data_locked= FALSE;
  mutex_last_unlocked_in_func= func;

  thd->enter_cond(&COND_queue_state, &LOCK_event_queue, msg);

  DBUG_PRINT("info", ("pthread_cond_%swait", abstime? "timed":""));
  if (!abstime)
    pthread_cond_wait(&COND_queue_state, &LOCK_event_queue);
  else
    pthread_cond_timedwait(&COND_queue_state, &LOCK_event_queue, abstime);

  /* Back from the wait: the mutex is held again. */
  mutex_last_locked_in_func= func;
  mutex_last_locked_at_line= line;
  mutex_queue_data_locked= TRUE;
  waiting_on_cond= FALSE;

  /*
    This will free the lock so we need to relock. Not the best thing to
    do but we need to obey cond_wait()
  */
  thd->exit_cond("");
  LOCK_QUEUE_DATA();

  DBUG_VOID_RETURN;
}


920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961
/*
  Dumps the internal status of the queue

  SYNOPSIS
    Event_queue::dump_internal_status()
      thd  Thread

  RETURN VALUE
    FALSE  OK
    TRUE   Error
*/

bool
Event_queue::dump_internal_status(THD *thd)
{
  DBUG_ENTER("Event_queue::dump_internal_status");
#ifndef DBUG_OFF
  CHARSET_INFO *scs= system_charset_info;
  Protocol *protocol= thd->protocol;
  List<Item> field_list;
  int ret;
  char tmp_buff[5*STRING_BUFFER_USUAL_SIZE];
  char int_buff[STRING_BUFFER_USUAL_SIZE];
  String tmp_string(tmp_buff, sizeof(tmp_buff), scs);
  String int_string(int_buff, sizeof(int_buff), scs);
  tmp_string.length(0);
  int_string.length(0);

  /* workers_count */
  protocol->prepare_for_resend();
  protocol->store(STRING_WITH_LEN("queue element count"), scs);
  int_string.set((longlong) queue.elements, scs);
  protocol->store(&int_string);
  ret= protocol->write();

  /* queue_data_locked */
  protocol->prepare_for_resend();
  protocol->store(STRING_WITH_LEN("queue data locked"), scs);
  int_string.set((longlong) mutex_queue_data_locked, scs);
  protocol->store(&int_string);
  ret= protocol->write();

962 963 964 965 966 967 968
  /* queue_data_attempting_lock */
  protocol->prepare_for_resend();
  protocol->store(STRING_WITH_LEN("queue data attempting lock"), scs);
  int_string.set((longlong) mutex_queue_data_attempting_lock, scs);
  protocol->store(&int_string);
  ret= protocol->write();

969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987
  /* last locked at*/
  protocol->prepare_for_resend();
  protocol->store(STRING_WITH_LEN("queue last locked at"), scs);
  tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(),
                                        tmp_string.alloced_length(), "%s::%d",
                                        mutex_last_locked_in_func,
                                        mutex_last_locked_at_line));
  protocol->store(&tmp_string);
  ret= protocol->write();

  /* last unlocked at*/
  protocol->prepare_for_resend();
  protocol->store(STRING_WITH_LEN("queue last unlocked at"), scs);
  tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(),
                                        tmp_string.alloced_length(), "%s::%d",
                                        mutex_last_unlocked_in_func,
                                        mutex_last_unlocked_at_line));
  protocol->store(&tmp_string);
  ret= protocol->write();
988 989 990 991 992 993 994 995 996 997 998

  /* last attempted lock  at*/
  protocol->prepare_for_resend();
  protocol->store(STRING_WITH_LEN("queue last attempted lock at"), scs);
  tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(),
                                        tmp_string.alloced_length(), "%s::%d",
                                        mutex_last_attempted_lock_in_func,
                                        mutex_last_attempted_lock_at_line));
  protocol->store(&tmp_string);
  ret= protocol->write();

999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020
  /* waiting on */
  protocol->prepare_for_resend();
  protocol->store(STRING_WITH_LEN("queue waiting on condition"), scs);
  int_string.set((longlong) waiting_on_cond, scs);
  protocol->store(&int_string);
  ret= protocol->write();

  protocol->prepare_for_resend();
  protocol->store(STRING_WITH_LEN("next activation at"), scs);
  tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(),
                                        tmp_string.alloced_length(),
                                        "%4d-%02d-%02d %02d:%02d:%02d",
                                        next_activation_at.year,
                                        next_activation_at.month,
                                        next_activation_at.day,
                                        next_activation_at.hour,
                                        next_activation_at.minute,
                                        next_activation_at.second
                                        ));
  protocol->store(&tmp_string);
  ret= protocol->write();

1021 1022 1023
#endif
  DBUG_RETURN(FALSE);
}