/* Copyright (C) 2003 MySQL AB

  This program is free software; you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation; version 2 of the License.

  This program is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with this program; if not, write to the Free Software
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#ifdef USE_PRAGMA_IMPLEMENTATION
#pragma implementation        // gcc: Class implementation
#endif

#include "mysql_priv.h"
#include <myisam.h>

#include "ha_archive.h"
#include <my_dir.h>

#include <mysql/plugin.h>

/*
  First, if you want to understand storage engines you should look at 
  ha_example.cc and ha_example.h. 
  This example was written as a test case for a customer who needed
  a storage engine without indexes that could compress data very well.
  So, welcome to a completely compressed storage engine. This storage
  engine only does inserts. No replace, deletes, or updates. All reads are
  complete table scans. Compression is done through a combination of packing
  and making use of the zlib library.
  
  We keep a file pointer open for each instance of ha_archive for each read
  but for writes we keep one open file handle just for that. We flush it
  only if we have a read occur. azip handles compressing lots of records
  at once much better than doing lots of little records between writes.
  It is possible to not lock on writes but this would then mean we couldn't
  handle bulk inserts as well (that is, if someone was trying to read at
  the same time, since we would want to flush).

  A "meta" file is kept alongside the data file. This file serves two purposes.
  The first purpose is to track the number of rows in the table. The second
  purpose is to determine if the table was closed properly or not. When the
  meta file is first opened it is marked as dirty. It is opened when the table
  itself is opened for writing. When the table is closed the new count for rows
  is written to the meta file and the file is marked as clean. If the meta file
  is opened and it is marked as dirty, it is assumed that a crash occurred. At
  this point an error occurs and the user is told to rebuild the file.
  A rebuild scans the rows and rewrites the meta file. If corruption is found
  in the data file then the meta file is not repaired.

  At some point a recovery method for such a drastic case needs to be devised.

  Locks are row level, and you will get a consistent read.

  For performance as far as table scans go it is quite fast. I don't have
  good numbers but locally it has outperformed both InnoDB and MyISAM. For
  InnoDB the question will be if the table can fit into the buffer
  pool. For MyISAM it's a question of how much the file system caches the
  MyISAM file. With enough free memory MyISAM is faster. It's only when the OS
  doesn't have enough memory to cache the entire table that Archive turns out
  to be any faster.

  Example file sizes, comparing MyISAM (packed) and Archive:

  Table with 76695844 identical rows:
  29680807 a_archive.ARZ
  920350317 a.MYD


  Table with 8991478 rows (all of Slashdot's comments):
  1922964506 comment_archive.ARZ
  2944970297 comment_text.MYD


  TODO:
   Allow users to set compression level.
   Allow adjustable block size.
   Implement versioning, should be easy.
   Allow for errors, find a way to mark bad rows.
   Add optional feature so that rows can be flushed at an interval (which will cause less
     compression but may speed up ordered searches).
   Checkpoint the meta file to allow for faster rebuilds.
   Option to allow for dirty reads, this would lower the sync calls, which would make
     inserts a lot faster, but would mean highly arbitrary reads.

    -Brian
*/
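
/*
  Orientation aid only (not an authoritative format spec): the code below
  treats the table as a single azio/zlib compressed data file (ARZ) whose
  stream header carries the metadata this engine relies on (an embedded copy
  of the .frm, the row count, the auto_increment value, the format version
  and the dirty flag), followed by the packed rows themselves. See the azio
  implementation for the exact stream format.
*/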

/* Variables for archive share methods */
pthread_mutex_t archive_mutex;
static HASH archive_open_tables;

/* The file extension */
#define ARZ ".ARZ"               // The data file
#define ARN ".ARN"               // Files used during an optimize call
#define ARM ".ARM"               // Meta file (deprecated)

/*
  uchar + uchar
*/
#define DATA_BUFFER_SIZE 2       // Size of the data used in the data file
#define ARCHIVE_CHECK_HEADER 254 // The number we use to determine corruption

/* Static declarations for handlerton */
static handler *archive_create_handler(handlerton *hton, 
                                       TABLE_SHARE *table, 
                                       MEM_ROOT *mem_root);
int archive_discover(handlerton *hton, THD* thd, const char *db,
                     const char *name,
                     uchar **frmblob, 
                     size_t *frmlen);
/*
  Number of rows that will force a bulk insert.
*/
#define ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT 2
/*
  Size of header used for row
*/
#define ARCHIVE_ROW_HEADER_SIZE 4
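
/*
  The row header holds the packed length of the data that follows it; it is
  written with int4store() in pack_row() and read back with uint4korr() in
  unpack_row().
*/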

static handler *archive_create_handler(handlerton *hton,
                                       TABLE_SHARE *table, 
                                       MEM_ROOT *mem_root)
{
  return new (mem_root) ha_archive(hton, table);
}

/*
  Used for hash table that tracks open tables.
*/
static uchar* archive_get_key(ARCHIVE_SHARE *share, size_t *length,
                             my_bool not_used __attribute__((unused)))
{
  *length=share->table_name_length;
  return (uchar*) share->table_name;
}


/*
  Initialize the archive handler.

  SYNOPSIS
    archive_db_init()
    void *

  RETURN
    FALSE       OK
    TRUE        Error
*/

int archive_db_init(void *p)
{
  DBUG_ENTER("archive_db_init");
  handlerton *archive_hton;

  archive_hton= (handlerton *)p;
  archive_hton->state= SHOW_OPTION_YES;
  archive_hton->db_type= DB_TYPE_ARCHIVE_DB;
  archive_hton->create= archive_create_handler;
  archive_hton->flags= HTON_NO_FLAGS;
  archive_hton->discover= archive_discover;
  if (pthread_mutex_init(&archive_mutex, MY_MUTEX_INIT_FAST))
    goto error;
  if (hash_init(&archive_open_tables, table_alias_charset, 32, 0, 0,
                (hash_get_key) archive_get_key, 0, 0))
  {
    VOID(pthread_mutex_destroy(&archive_mutex));
  }
  else
  {
    DBUG_RETURN(FALSE);
  }
error:
  DBUG_RETURN(TRUE);
}

/*
  Release the archive handler.

  SYNOPSIS
    archive_db_done()
    void

  RETURN
    FALSE       OK
*/

int archive_db_done(void *p)
{
  hash_free(&archive_open_tables);
  VOID(pthread_mutex_destroy(&archive_mutex));

  return 0;
}

ha_archive::ha_archive(handlerton *hton, TABLE_SHARE *table_arg)
  :handler(hton, table_arg), delayed_insert(0), bulk_insert(0)
{
  /* Set our original buffer from pre-allocated memory */
  buffer.set((char *)byte_buffer, IO_SIZE, system_charset_info);

  /* The size of the offset value we will use for position() */
  ref_length= sizeof(my_off_t);
  archive_reader_open= FALSE;
}

int archive_discover(handlerton *hton, THD* thd, const char *db,
                     const char *name,
                     uchar **frmblob, 
                     size_t *frmlen)
{
  DBUG_ENTER("archive_discover");
  DBUG_PRINT("archive_discover", ("db: %s, name: %s", db, name)); 
  azio_stream frm_stream;
  char az_file[FN_REFLEN];
  char *frm_ptr;
  MY_STAT file_stat; 

  fn_format(az_file, name, db, ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME);

  if (!(my_stat(az_file, &file_stat, MYF(0))))
    goto err;

  if (!(azopen(&frm_stream, az_file, O_RDONLY|O_BINARY)))
  {
    if (errno == EROFS || errno == EACCES)
      DBUG_RETURN(my_errno= errno);
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
  }

  if (frm_stream.frm_length == 0)
    goto err;

  frm_ptr= (char *)my_malloc(sizeof(char) * frm_stream.frm_length, MYF(0));
  azread_frm(&frm_stream, frm_ptr);
  azclose(&frm_stream);

  *frmlen= frm_stream.frm_length;
  *frmblob= (uchar*) frm_ptr;

  DBUG_RETURN(0);
err:
  my_errno= 0;
  DBUG_RETURN(1);
}

/*
  This method reads the header of the data file and returns whether or not it was successful.
*/
int ha_archive::read_data_header(azio_stream *file_to_read)
{
  int error;
  unsigned long ret;
  uchar data_buffer[DATA_BUFFER_SIZE];
  DBUG_ENTER("ha_archive::read_data_header");

  if (azrewind(file_to_read) == -1)
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  if (file_to_read->version >= 3)
    DBUG_RETURN(0);
  /* Everything below this is legacy for version 2 and earlier */

  DBUG_PRINT("ha_archive", ("Reading legacy data header"));

  ret= azread(file_to_read, data_buffer, DATA_BUFFER_SIZE, &error);

  if (ret != DATA_BUFFER_SIZE)
  {
    DBUG_PRINT("ha_archive", ("Reading, expected %d got %lu", 
                              DATA_BUFFER_SIZE, ret));
    DBUG_RETURN(1);
  }
  if (error)
  {
    DBUG_PRINT("ha_archive", ("Compression error (%d)", error));
    DBUG_RETURN(1);
  }
  DBUG_PRINT("ha_archive", ("Check %u", data_buffer[0]));
  DBUG_PRINT("ha_archive", ("Version %u", data_buffer[1]));
  if ((data_buffer[0] != (uchar)ARCHIVE_CHECK_HEADER) &&  
      (data_buffer[1] == 1 || data_buffer[1] == 2))
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  DBUG_RETURN(0);
}


/*
  We create the shared memory space that we will use for the open table. 
  No matter what, we try to get or create a share. This is so that a repair
  table operation can occur.

  See ha_example.cc for a longer description.
*/
ARCHIVE_SHARE *ha_archive::get_share(const char *table_name, int *rc)
{
  uint length;
  DBUG_ENTER("ha_archive::get_share");

  pthread_mutex_lock(&archive_mutex);
  length=(uint) strlen(table_name);

  if (!(share=(ARCHIVE_SHARE*) hash_search(&archive_open_tables,
                                           (uchar*) table_name,
                                           length)))
  {
    char *tmp_name;
    azio_stream archive_tmp;
    if (!my_multi_malloc(MYF(MY_WME | MY_ZEROFILL),
                          &share, sizeof(*share),
                          &tmp_name, length+1,
                          NullS))
    {
      pthread_mutex_unlock(&archive_mutex);
      *rc= HA_ERR_OUT_OF_MEM;
      DBUG_RETURN(NULL);
    }

    share->use_count= 0;
    share->table_name_length= length;
    share->table_name= tmp_name;
    share->crashed= FALSE;
    share->archive_write_open= FALSE;
    fn_format(share->data_file_name, table_name, "",
              ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME);
    strmov(share->table_name, table_name);
    DBUG_PRINT("ha_archive", ("Data File %s", 
                        share->data_file_name));
    /*
      We will use this lock for rows.
    */
    VOID(pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST));
    
    /*
      We read the meta information from the data file, but do not mark it
      dirty. Since we are not doing a write we won't mark it dirty (and we
      won't open it for anything but reading; open it for write and we will
      generate null compression writes).
    */
    if (!(azopen(&archive_tmp, share->data_file_name, O_RDONLY|O_BINARY)))
    {
      *rc= my_errno ? my_errno : -1;
      pthread_mutex_unlock(&archive_mutex);
      my_free(share, MYF(0));
      DBUG_RETURN(NULL);
    }
    share->version= archive_tmp.version;
    if (archive_tmp.version == ARCHIVE_VERSION)
    {
      stats.auto_increment_value= archive_tmp.auto_increment + 1;
      share->rows_recorded= (ha_rows)archive_tmp.rows;
      share->crashed= archive_tmp.dirty;
    }
    else
    {
      /* Used by repair */
      share->rows_recorded= ~(ha_rows) 0;
      stats.auto_increment_value= 0;
    }
    /*
      If the archive version is less than 3, it should be upgraded before
      use.
    */
    if (archive_tmp.version < ARCHIVE_VERSION)
      *rc= HA_ERR_TABLE_NEEDS_UPGRADE;
    azclose(&archive_tmp);

    VOID(my_hash_insert(&archive_open_tables, (uchar*) share));
    thr_lock_init(&share->lock);
  }
  share->use_count++;
  DBUG_PRINT("ha_archive", ("archive table %.*s has %d open handles now", 
                      share->table_name_length, share->table_name,
                      share->use_count));
  if (share->crashed)
    *rc= HA_ERR_CRASHED_ON_USAGE;
  pthread_mutex_unlock(&archive_mutex);

  DBUG_RETURN(share);
}


/* 
  Free the share.
  See ha_example.cc for a description.
*/
int ha_archive::free_share()
{
  int rc= 0;
  DBUG_ENTER("ha_archive::free_share");
  DBUG_PRINT("ha_archive",
             ("archive table %.*s has %d open handles on entrance", 
              share->table_name_length, share->table_name,
              share->use_count));

  pthread_mutex_lock(&archive_mutex);
  if (!--share->use_count)
  {
    hash_delete(&archive_open_tables, (uchar*) share);
    thr_lock_delete(&share->lock);
    VOID(pthread_mutex_destroy(&share->mutex));
    /* 
      We need to make sure we don't reset the crashed state.
      If we open a crashed file, we need to close it as crashed unless
      it has been repaired.
      Since we will close the data down after this, we go on and count
      the flush on close.
    */
    if (share->archive_write_open)
    {
      if (azclose(&(share->archive_write)))
        rc= 1;
    }
    my_free((uchar*) share, MYF(0));
  }
  pthread_mutex_unlock(&archive_mutex);

  DBUG_RETURN(rc);
}

int ha_archive::init_archive_writer()
{
  DBUG_ENTER("ha_archive::init_archive_writer");
  /* 
    It is expensive to open and close the data files and since you can't have
    a gzip file that can be both read and written we keep a writer open
    that is shared among all open tables.
  */
  if (!(azopen(&(share->archive_write), share->data_file_name, 
               O_RDWR|O_BINARY)))
  {
    DBUG_PRINT("ha_archive", ("Could not open archive write file"));
    share->crashed= TRUE;
    DBUG_RETURN(1);
  }
  share->archive_write_open= TRUE;

  DBUG_RETURN(0);
}

/* 
  No locks are required because it is associated with just one handler instance
*/
int ha_archive::init_archive_reader()
{
  DBUG_ENTER("ha_archive::init_archive_reader");
  /* 
    It is expensive to open and close the data files and since you can't have
    a gzip file that can be both read and written we keep a reader open
    for each handler instance, opened lazily when it is first needed.
  */
  if (!archive_reader_open)
  {
    if (!(azopen(&archive, share->data_file_name, O_RDONLY|O_BINARY)))
    {
      DBUG_PRINT("ha_archive", ("Could not open archive read file"));
      share->crashed= TRUE;
      DBUG_RETURN(1);
    }
    archive_reader_open= TRUE;
  }

  DBUG_RETURN(0);
}


/*
  We just implement one additional file extension.
*/
static const char *ha_archive_exts[] = {
  ARZ,
  NullS
};

const char **ha_archive::bas_ext() const
{
  return ha_archive_exts;
}


/* 
  When opening a file we:
  Create/get our shared structure.
  Init our lock.
  We open the file we will read from.
*/
int ha_archive::open(const char *name, int mode, uint open_options)
{
  int rc= 0;
  DBUG_ENTER("ha_archive::open");

  DBUG_PRINT("ha_archive", ("archive table was opened for crash: %s", 
                      (open_options & HA_OPEN_FOR_REPAIR) ? "yes" : "no"));
  share= get_share(name, &rc);
 /*
    Allow open on crashed table in repair mode only.
    Block open on 5.0 ARCHIVE table. Though we have almost all
    routines to access these tables, they were not well tested.
    For now we have to refuse to open such table to avoid
    potential data loss.
  */
  switch (rc)
  {
  case 0:
    break;
  case HA_ERR_CRASHED_ON_USAGE:
    DBUG_PRINT("ha_archive", ("archive table was crashed"));
    if (open_options & HA_OPEN_FOR_REPAIR)
    {
      rc= 0;
      break;
    }
    /* fall through */
  case HA_ERR_TABLE_NEEDS_UPGRADE:
    if (open_options & HA_OPEN_FOR_REPAIR)
    {
      rc= 0;
      break;
    }
    free_share();
    /* fall through */
  default:
    DBUG_RETURN(rc);
  }

  DBUG_ASSERT(share);

  record_buffer= create_record_buffer(table->s->reclength + 
                                      ARCHIVE_ROW_HEADER_SIZE);

  if (!record_buffer)
  {
    free_share();
    DBUG_RETURN(HA_ERR_OUT_OF_MEM);
  }

  thr_lock_data_init(&share->lock, &lock, NULL);
  DBUG_RETURN(rc);
}


/*
  Closes the file.

  SYNOPSIS
    close();
  
  IMPLEMENTATION:

  We first close this storage engines file handle to the archive and
  then remove our reference count to the table (and possibly free it
  as well).

  RETURN
    0  ok
    1  Error
*/

int ha_archive::close(void)
{
  int rc= 0;
  DBUG_ENTER("ha_archive::close");
  destroy_record_buffer(record_buffer);

  /* First close stream */
  if (archive_reader_open)
  {
    if (azclose(&archive))
      rc= 1;
  }
  /* then also close share */
  rc|= free_share();

  DBUG_RETURN(rc);
}


/*
  We create our data file here. The format is pretty simple. 
  You can read about the format of the data file above.
  Unlike other storage engines we do not "pack" our data. Since we 
  are about to do a general compression, packing would just be a waste of 
  CPU time. If the table has blobs they are written after the row in the order 
  of creation.
*/

int ha_archive::create(const char *name, TABLE *table_arg,
                       HA_CREATE_INFO *create_info)
{
  char name_buff[FN_REFLEN];
  char linkname[FN_REFLEN];
  int error;
  azio_stream create_stream;            /* Archive file we are working with */
  File frm_file;                   /* File handler for readers */
  MY_STAT file_stat;  // Stat information for the data file
  uchar *frm_ptr;

  DBUG_ENTER("ha_archive::create");

  stats.auto_increment_value= create_info->auto_increment_value;

  for (uint key= 0; key < table_arg->s->keys; key++)
  {
    KEY *pos= table_arg->key_info+key;
    KEY_PART_INFO *key_part=     pos->key_part;
    KEY_PART_INFO *key_part_end= key_part + pos->key_parts;

    for (; key_part != key_part_end; key_part++)
    {
      Field *field= key_part->field;

      if (!(field->flags & AUTO_INCREMENT_FLAG))
      {
        error= -1;
635
        DBUG_PRINT("ha_archive", ("Index error in creating archive table"));
636 637 638 639 640
        goto error;
      }
    }
  }

  /* 
    We reuse name_buff since it is available.
  */
  if (create_info->data_file_name && create_info->data_file_name[0] != '#')
  {
    DBUG_PRINT("ha_archive", ("archive will create stream file %s", 
                        create_info->data_file_name));
                        
    fn_format(name_buff, create_info->data_file_name, "", ARZ,
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);
    fn_format(linkname, name, "", ARZ,
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);
  }
  else
  {
    fn_format(name_buff, name, "", ARZ,
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);
    linkname[0]= 0;
  }

  /*
    There is a chance that the file was "discovered". In this case
    just use whatever file is there.
  */
  if (!(my_stat(name_buff, &file_stat, MYF(0))))
  {
    my_errno= 0;
    if (!(azopen(&create_stream, name_buff, O_CREAT|O_RDWR|O_BINARY)))
    {
      error= errno;
      goto error2;
    }

    if (linkname[0])
      my_symlink(name_buff, linkname, MYF(0));
    fn_format(name_buff, name, "", ".frm",
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);
    /*
      Here is where we open up the frm and pass it to archive to store 
    */
    if ((frm_file= my_open(name_buff, O_RDONLY, MYF(0))) > 0)
    {
      if (!my_fstat(frm_file, &file_stat, MYF(MY_WME)))
      {
        frm_ptr= (uchar *)my_malloc(sizeof(uchar) * file_stat.st_size, MYF(0));
        if (frm_ptr)
        {
          my_read(frm_file, frm_ptr, file_stat.st_size, MYF(0));
          azwrite_frm(&create_stream, (char *)frm_ptr, file_stat.st_size);
          my_free((uchar*)frm_ptr, MYF(0));
        }
      }
      my_close(frm_file, MYF(0));
    }

    if (create_info->comment.str)
      azwrite_comment(&create_stream, create_info->comment.str, 
                      create_info->comment.length);

    /* 
      Yes you need to do this, because the starting value 
      for the autoincrement may not be zero.
    */
    create_stream.auto_increment= stats.auto_increment_value ?
                                    stats.auto_increment_value - 1 : 0;
    if (azclose(&create_stream))
    {
      error= errno;
      goto error2;
    }
  }
  else
    my_errno= 0;

  DBUG_PRINT("ha_archive", ("Creating File %s", name_buff));
  DBUG_PRINT("ha_archive", ("Creating Link %s", linkname));
  DBUG_RETURN(0);
error2:
  delete_table(name);
error:
  /* Return error number, if we got one */
  DBUG_RETURN(error ? error : -1);
}

/*
  This is where the actual row is written out.
*/
int ha_archive::real_write_row(uchar *buf, azio_stream *writer)
{
  my_off_t written;
  unsigned int r_pack_length;
  DBUG_ENTER("ha_archive::real_write_row");

  /* We pack the row for writing */
  r_pack_length= pack_row(buf);

  written= azwrite(writer, record_buffer->buffer, r_pack_length);
  if (written != r_pack_length)
  {
    DBUG_PRINT("ha_archive", ("Wrote %d bytes expected %d", 
                                              (uint32) written, 
                                              (uint32)r_pack_length));
    DBUG_RETURN(-1);
  }

  if (!delayed_insert || !bulk_insert)
    share->dirty= TRUE;

  DBUG_RETURN(0);
}


/* 
  Calculate max length needed for row. This includes
  the bytes required for the length in the header.
*/
uint32 ha_archive::max_row_length(const uchar *buf)
{
  uint32 length= (uint32)(table->s->reclength + table->s->fields*2);
  length+= ARCHIVE_ROW_HEADER_SIZE;

  uint *ptr, *end;
  for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ;
       ptr != end ;
       ptr++)
  {
      length += 2 + ((Field_blob*)table->field[*ptr])->get_length();
  }

  return length;
}


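/*
  Pack a row into record_buffer. The layout produced here (and consumed by
  unpack_row()) is:
    bytes 0..3   length of the packed data that follows, set with int4store()
    then         the table's null-bit bytes copied verbatim from the record
    then         every non-null field, packed with Field::pack()
  The return value is the total number of bytes used, including the header.
*/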
unsigned int ha_archive::pack_row(uchar *record)
{
  uchar *ptr;

  DBUG_ENTER("ha_archive::pack_row");


  if (fix_rec_buff(max_row_length(record)))
    DBUG_RETURN(HA_ERR_OUT_OF_MEM); /* purecov: inspected */
  /* Copy null bits */
  memcpy(record_buffer->buffer+ARCHIVE_ROW_HEADER_SIZE, 
         record, table->s->null_bytes);
  ptr= record_buffer->buffer + table->s->null_bytes + ARCHIVE_ROW_HEADER_SIZE;

  for (Field **field=table->field ; *field ; field++)
  {
    if (!((*field)->is_null()))
      ptr= (*field)->pack(ptr, record + (*field)->offset(record));
  }

  int4store(record_buffer->buffer, (int)(ptr - record_buffer->buffer -
                                         ARCHIVE_ROW_HEADER_SIZE)); 
  DBUG_PRINT("ha_archive",("Pack row length %u", (unsigned int)
                           (ptr - record_buffer->buffer - 
                             ARCHIVE_ROW_HEADER_SIZE)));

  DBUG_RETURN((unsigned int) (ptr - record_buffer->buffer));
}


/* 
  Look at ha_archive::open() for an explanation of the row format.
  Here we just write out the row.

  Wondering about start_bulk_insert()? The archive engine already optimizes
  for lots of writes, so the only gain from implementing it is that we can
  skip setting dirty to true for each row (see start_bulk_insert() below).
*/
int ha_archive::write_row(uchar *buf)
{
  int rc;
  uchar *read_buf= NULL;
  ulonglong temp_auto;
  uchar *record=  table->record[0];
  DBUG_ENTER("ha_archive::write_row");

  if (share->crashed)
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  ha_statistic_increment(&SSV::ha_write_count);
  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
    table->timestamp_field->set_time();
  pthread_mutex_lock(&share->mutex);
  if (!share->archive_write_open)
    if (init_archive_writer())
    {
      /* Don't return while still holding the share mutex */
      pthread_mutex_unlock(&share->mutex);
      DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
    }


  if (table->next_number_field && record == table->record[0])
  {
    KEY *mkey= &table->s->key_info[0]; // We only support one key right now
    update_auto_increment();
    temp_auto= table->next_number_field->val_int();

    /*
      We don't support decrementing auto_increment. It makes the performance
      just cry.
    */
    if (temp_auto <= share->archive_write.auto_increment && 
        mkey->flags & HA_NOSAME)
    {
      rc= HA_ERR_FOUND_DUPP_KEY;
      goto error;
    }
#ifdef DEAD_CODE
    /*
      Bad news, this will cause a search for the unique value which is very 
      expensive since we will have to do a table scan which will lock up 
      all other writers during this period. This could perhaps be optimized 
      in the future.
    */
    {
      /* 
        First we create a buffer that we can use for reading rows, and can pass
        to get_row().
      */
      if (!(read_buf= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME))))
      {
        rc= HA_ERR_OUT_OF_MEM;
        goto error;
      }
       /* 
         All of the buffer must be written out or we won't see all of the
         data 
       */
      azflush(&(share->archive_write), Z_SYNC_FLUSH);
      /*
        Set the position of the local read thread to the beginning position.
      */
      if (read_data_header(&archive))
      {
        rc= HA_ERR_CRASHED_ON_USAGE;
        goto error;
      }

      Field *mfield= table->next_number_field;

      while (!(get_row(&archive, read_buf)))
      {
        if (!memcmp(read_buf + mfield->offset(record),
                    table->next_number_field->ptr,
                    mfield->max_display_length()))
        {
          rc= HA_ERR_FOUND_DUPP_KEY;
          goto error;
        }
      }
    }
#endif
    else
    {
      if (temp_auto > share->archive_write.auto_increment)
        stats.auto_increment_value=
          (share->archive_write.auto_increment= temp_auto) + 1;
    }
  }

  /*
    Notice that the global auto_increment has been increased.
    In case of a failed row write, we will never try to reuse the value.
  */
  share->rows_recorded++;
  rc= real_write_row(buf,  &(share->archive_write));
error:
  pthread_mutex_unlock(&share->mutex);
  if (read_buf)
    my_free((uchar*) read_buf, MYF(0));
  DBUG_RETURN(rc);
}

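/*
  Hand out auto_increment values. We simply report the shared writer's
  counter plus one and, since nothing else reserves ranges here, claim an
  unlimited reservation (ULONGLONG_MAX).
*/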
void ha_archive::get_auto_increment(ulonglong offset, ulonglong increment,
                                    ulonglong nb_desired_values,
                                    ulonglong *first_value,
                                    ulonglong *nb_reserved_values)
{
  *nb_reserved_values= ULONGLONG_MAX;
  *first_value= share->archive_write.auto_increment + 1;
}

/* Initialized at each key walk (called multiple times unlike rnd_init()) */
int ha_archive::index_init(uint keynr, bool sorted)
{
  DBUG_ENTER("ha_archive::index_init");
  active_index= keynr;
  DBUG_RETURN(0);
}


/*
  No indexes, so if we get a request for an index search since we tell
  the optimizer that we have unique indexes, we scan
*/
int ha_archive::index_read(uchar *buf, const uchar *key,
                             uint key_len, enum ha_rkey_function find_flag)
{
  int rc;
  DBUG_ENTER("ha_archive::index_read");
  rc= index_read_idx(buf, active_index, key, key_len, find_flag);
  DBUG_RETURN(rc);
}


int ha_archive::index_read_idx(uchar *buf, uint index, const uchar *key,
                                 uint key_len, enum ha_rkey_function find_flag)
{
  int rc;
  bool found= 0;
  KEY *mkey= &table->s->key_info[index];
  current_k_offset= mkey->key_part->offset;
  current_key= key;
  current_key_len= key_len;


  DBUG_ENTER("ha_archive::index_read_idx");

  rc= rnd_init(TRUE);

  if (rc)
    goto error;

  while (!(get_row(&archive, buf)))
  {
    if (!memcmp(current_key, buf + current_k_offset, current_key_len))
    {
      found= 1;
      break;
    }
  }

  if (found)
    DBUG_RETURN(0);

error:
  DBUG_RETURN(rc ? rc : HA_ERR_END_OF_FILE);
}


int ha_archive::index_next(uchar * buf)
{ 
  bool found= 0;

  DBUG_ENTER("ha_archive::index_next");

  while (!(get_row(&archive, buf)))
  {
    if (!memcmp(current_key, buf+current_k_offset, current_key_len))
    {
      found= 1;
      break;
    }
  }

  DBUG_RETURN(found ? 0 : HA_ERR_END_OF_FILE); 
}

/*
  All calls that need to scan the table start with this method. If we are told
  that it is a table scan we rewind the file to the beginning, otherwise
  we assume the position will be set.
*/
int ha_archive::rnd_init(bool scan)
{
  DBUG_ENTER("ha_archive::rnd_init");

  if (share->crashed)
      DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  init_archive_reader();

  /* We rewind the file so that we can read from the beginning if scan */
  if (scan)
  {
    scan_rows= stats.records;
    DBUG_PRINT("info", ("archive will retrieve %llu rows", 
                        (unsigned long long) scan_rows));
    if (read_data_header(&archive))
      DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
  }

  DBUG_RETURN(0);
}


/*
  This is the method that is used to read a row. It assumes that the row is 
  positioned where you want it.
*/
int ha_archive::get_row(azio_stream *file_to_read, uchar *buf)
{
  int rc;
  DBUG_ENTER("ha_archive::get_row");
  DBUG_PRINT("ha_archive", ("Picking version for get_row() %d -> %d", 
                            (uchar)file_to_read->version, 
                            ARCHIVE_VERSION));
  if (file_to_read->version == ARCHIVE_VERSION)
    rc= get_row_version3(file_to_read, buf);
  else
    rc= get_row_version2(file_to_read, buf);

  DBUG_PRINT("ha_archive", ("Return %d\n", rc));

  DBUG_RETURN(rc);
}

/* Reallocate buffer if needed */
bool ha_archive::fix_rec_buff(unsigned int length)
{
  DBUG_ENTER("ha_archive::fix_rec_buff");
  DBUG_PRINT("ha_archive", ("Fixing %u for %u", 
                            length, record_buffer->length));
  DBUG_ASSERT(record_buffer->buffer);

  if (length > record_buffer->length)
  {
    uchar *newptr;
    if (!(newptr=(uchar*) my_realloc((uchar*) record_buffer->buffer, 
                                    length,
                                    MYF(MY_ALLOW_ZERO_PTR))))
      DBUG_RETURN(1);
    record_buffer->buffer= newptr;
    record_buffer->length= length;
  }

  DBUG_ASSERT(length <= record_buffer->length);

  DBUG_RETURN(0);
}

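/*
  Read one packed row: first the ARCHIVE_ROW_HEADER_SIZE length header
  written by pack_row(), then the null bits and the packed fields, which
  are unpacked back into 'record'.
*/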
int ha_archive::unpack_row(azio_stream *file_to_read, uchar *record)
{
  DBUG_ENTER("ha_archive::unpack_row");

  unsigned int read;
  int error;
  uchar size_buffer[ARCHIVE_ROW_HEADER_SIZE];
  unsigned int row_len;

  /* First we grab the length stored */
  read= azread(file_to_read, size_buffer, ARCHIVE_ROW_HEADER_SIZE, &error);

  if (error == Z_STREAM_ERROR ||  (read && read < ARCHIVE_ROW_HEADER_SIZE))
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  /* If we read nothing we are at the end of the file */
  if (read == 0 || read != ARCHIVE_ROW_HEADER_SIZE)
    DBUG_RETURN(HA_ERR_END_OF_FILE);

  row_len=  uint4korr(size_buffer);
  DBUG_PRINT("ha_archive",("Unpack row length %u -> %u", row_len, 
                           (unsigned int)table->s->reclength));

  if (fix_rec_buff(row_len))
  {
    DBUG_RETURN(HA_ERR_OUT_OF_MEM);
  }
  DBUG_ASSERT(row_len <= record_buffer->length);

  read= azread(file_to_read, record_buffer->buffer, row_len, &error);

  if (read != row_len || error)
  {
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
  }

  /* Copy null bits */
  const uchar *ptr= record_buffer->buffer;
  memcpy(record, ptr, table->s->null_bytes);
  ptr+= table->s->null_bytes;
  for (Field **field=table->field ; *field ; field++)
  {
    if (!((*field)->is_null()))
    {
      ptr= (*field)->unpack(record + (*field)->offset(table->record[0]), ptr);
    }
  }
  DBUG_RETURN(0);
}


int ha_archive::get_row_version3(azio_stream *file_to_read, uchar *buf)
{
  DBUG_ENTER("ha_archive::get_row_version3");
  int returnable= unpack_row(file_to_read, buf);
  DBUG_RETURN(returnable);
}


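/*
  Legacy (version 2 and earlier) row format: a fixed-size record image of
  table->s->reclength bytes followed by the data for any blob fields.
*/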
int ha_archive::get_row_version2(azio_stream *file_to_read, uchar *buf)
{
  unsigned int read;
  int error;
  uint *ptr, *end;
  char *last;
  size_t total_blob_length= 0;
  MY_BITMAP *read_set= table->read_set;
  DBUG_ENTER("ha_archive::get_row_version2");

  read= azread(file_to_read, (voidp)buf, table->s->reclength, &error);

  /* If we read nothing we are at the end of the file */
  if (read == 0)
    DBUG_RETURN(HA_ERR_END_OF_FILE);

  if (read != table->s->reclength)
  {
    DBUG_PRINT("ha_archive::get_row_version2", ("Read %u bytes expected %u", 
                                                read, 
                                                (unsigned int)table->s->reclength));
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
  }

  if (error == Z_STREAM_ERROR || error == Z_DATA_ERROR )
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  /* 
    If the record is the wrong size, the file is probably damaged, unless 
    we are dealing with a delayed insert or a bulk insert.
  */
  if ((ulong) read != table->s->reclength)
    DBUG_RETURN(HA_ERR_END_OF_FILE);

  /* Calculate blob length, we use this for our buffer */
  for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ;
       ptr != end ;
       ptr++)
  {
    if (bitmap_is_set(read_set,
                      (((Field_blob*) table->field[*ptr])->field_index)))
        total_blob_length += ((Field_blob*) table->field[*ptr])->get_length();
  }

  /* Adjust our row buffer if we need be */
  buffer.alloc(total_blob_length);
  last= (char *)buffer.ptr();

  /* Loop through our blobs and read them */
  for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ;
       ptr != end ;
       ptr++)
  {
    size_t size= ((Field_blob*) table->field[*ptr])->get_length();
    if (size)
    {
      if (bitmap_is_set(read_set,
                        ((Field_blob*) table->field[*ptr])->field_index))
      {
        read= azread(file_to_read, last, size, &error);

        if (error)
          DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

        if ((size_t) read != size)
          DBUG_RETURN(HA_ERR_END_OF_FILE);
        ((Field_blob*) table->field[*ptr])->set_ptr(size, (uchar*) last);
        last += size;
      }
      else
      {
        (void)azseek(file_to_read, size, SEEK_CUR);
      }
    }
  }
  DBUG_RETURN(0);
}

/* 
  Called during ORDER BY. Its position is either the one reached by being
  called sequentially or one set by ha_archive::rnd_pos() before this call.
*/

int ha_archive::rnd_next(uchar *buf)
{
  int rc;
  DBUG_ENTER("ha_archive::rnd_next");

  if (share->crashed)
      DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  if (!scan_rows)
    DBUG_RETURN(HA_ERR_END_OF_FILE);
  scan_rows--;

  ha_statistic_increment(&SSV::ha_read_rnd_next_count);
  current_position= aztell(&archive);
  rc= get_row(&archive, buf);

  table->status=rc ? STATUS_NOT_FOUND: 0;

  DBUG_RETURN(rc);
}


/*
  Thanks to the table flag HA_REC_NOT_IN_SEQ this will be called after
  each call to ha_archive::rnd_next() if an ordering of the rows is
  needed.
*/

void ha_archive::position(const uchar *record)
{
  DBUG_ENTER("ha_archive::position");
  my_store_ptr(ref, ref_length, current_position);
  DBUG_VOID_RETURN;
}


/*
  This is called after a table scan for each row if the results of the
  scan need to be ordered. It will take *pos and use it to move the
  cursor in the file so that the next row that is called is the
  correctly ordered row.
*/

int ha_archive::rnd_pos(uchar * buf, uchar *pos)
{
  DBUG_ENTER("ha_archive::rnd_pos");
  ha_statistic_increment(&SSV::ha_read_rnd_next_count);
  current_position= (my_off_t)my_get_ptr(pos, ref_length);
  if (azseek(&archive, current_position, SEEK_SET) == (my_off_t)(-1L))
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
  DBUG_RETURN(get_row(&archive, buf));
}

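/*
  Report whether the on-disk format is older than ARCHIVE_VERSION so that a
  CHECK ... FOR UPGRADE can request a rebuild of the table.
*/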
int ha_archive::check_for_upgrade(HA_CHECK_OPT *check_opt)
{
  if (share->version < ARCHIVE_VERSION)
    return HA_ADMIN_NEEDS_ALTER;
  return 0;
}


/*
  This method repairs the meta file. It does this by walking the data file and
  rewriting the meta file. If EXTENDED repair is requested, we attempt to
  recover as much data as possible.
*/
int ha_archive::repair(THD* thd, HA_CHECK_OPT* check_opt)
{
  DBUG_ENTER("ha_archive::repair");
  int rc= optimize(thd, check_opt);

  if (rc)
    DBUG_RETURN(HA_ADMIN_CORRUPT);

  share->crashed= FALSE;
  DBUG_RETURN(0);
}

/*
  The table can become fragmented if data was inserted, read, and then
  inserted again. What we do is open up the file and recompress it completely. 
*/
int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
{
  DBUG_ENTER("ha_archive::optimize");
  int rc= 0;
  azio_stream writer;
  char writer_filename[FN_REFLEN];

  init_archive_reader();

  // now we close both our writer and our reader for the rename
  if (share->archive_write_open)
  {
    azclose(&(share->archive_write));
    share->archive_write_open= FALSE;
  }

  /* Let's create a file to contain the new data */
  fn_format(writer_filename, share->table_name, "", ARN, 
            MY_REPLACE_EXT | MY_UNPACK_FILENAME);

  if (!(azopen(&writer, writer_filename, O_CREAT|O_RDWR|O_BINARY)))
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); 

  /* 
    An extended rebuild is a lot more effort. We open up each row and re-record it. 
    Any dead rows are removed (aka rows that may have been partially recorded). 

    As of Archive format 3, this is the only type that is performed; before
    this version it was just done on T_EXTEND.
  */
  if (1)
  {
    DBUG_PRINT("ha_archive", ("archive extended rebuild"));

    /*
      Now we will rewind the archive file so that we are positioned at the 
      start of the file.
    */
    rc= read_data_header(&archive);

    /* 
      On success of writing out the new header, we now fetch each row and
      insert it into the new archive file. 
    */
    if (!rc)
1356 1357
    {
      share->rows_recorded= 0;
      stats.auto_increment_value= 1;
      share->archive_write.auto_increment= 0;
      my_bitmap_map *org_bitmap= dbug_tmp_use_all_columns(table, table->read_set);

      while (!(rc= get_row(&archive, table->record[0])))
      {
        real_write_row(table->record[0], &writer);
        /*
          Long term it should be possible to optimize this so that
          it is not called on each row.
        */
        if (table->found_next_number_field)
        {
          Field *field= table->found_next_number_field;
          ulonglong auto_value=
            (ulonglong) field->val_int(table->record[0] +
                                       field->offset(table->record[0]));
          if (share->archive_write.auto_increment < auto_value)
            stats.auto_increment_value=
              (share->archive_write.auto_increment= auto_value) + 1;
        }
      }

      dbug_tmp_restore_column_map(table->read_set, org_bitmap);
      share->rows_recorded= (ha_rows)writer.rows;
    }
    DBUG_PRINT("info", ("recovered %llu archive rows", 
                        (unsigned long long)share->rows_recorded));

    DBUG_PRINT("ha_archive", ("recovered %llu archive rows", 
                        (unsigned long long)share->rows_recorded));
    /*
      If REPAIR ... EXTENDED is requested, try to recover as much data
      from data file as possible. In this case if we failed to read a
      record, we assume EOF. This allows massive data loss, but we can
      hardly do more with broken zlib stream. And this is the only way
      to restore at least what is still recoverable.
    */
    if (rc && rc != HA_ERR_END_OF_FILE && !(check_opt->flags & T_EXTEND))
      goto error;
  } 

  azclose(&writer);
  share->dirty= FALSE;
  
  azclose(&archive);

  // make the file we just wrote be our data file
  rc = my_rename(writer_filename,share->data_file_name,MYF(0));


  DBUG_RETURN(rc);
error:
  DBUG_PRINT("ha_archive", ("Failed to recover, error was %d", rc));
  azclose(&writer);

  DBUG_RETURN(rc); 
}

/* 
  Below is an example of how to setup row level locking.
*/
THR_LOCK_DATA **ha_archive::store_lock(THD *thd,
                                       THR_LOCK_DATA **to,
                                       enum thr_lock_type lock_type)
{
  if (lock_type == TL_WRITE_DELAYED)
    delayed_insert= TRUE;
  else
    delayed_insert= FALSE;

  if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK) 
  {
    /* 
      Here is where we get into the guts of a row level lock.
      If TL_UNLOCK is set and we are not doing a LOCK TABLE, DELAYED LOCK
      or DISCARD/IMPORT TABLESPACE, then allow multiple writers.
    */

    if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
         lock_type <= TL_WRITE) && delayed_insert == FALSE &&
        !thd_in_lock_tables(thd)
        && !thd_tablespace_op(thd))
      lock_type = TL_WRITE_ALLOW_WRITE;

    /* 
      In queries of type INSERT INTO t1 SELECT ... FROM t2 ...
      MySQL would use the lock TL_READ_NO_INSERT on t2, and that
      would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts
      to t2. Convert the lock to a normal read lock to allow
      concurrent inserts to t2. 
    */

    if (lock_type == TL_READ_NO_INSERT && !thd_in_lock_tables(thd))
      lock_type = TL_READ;

    lock.type=lock_type;
  }

  *to++= &lock;

  return to;
}

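/*
  Feed the current auto_increment value and the resolved data file path back
  into the create info (used, for example, when the table definition is
  re-displayed).
*/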
void ha_archive::update_create_info(HA_CREATE_INFO *create_info)
{
  DBUG_ENTER("ha_archive::update_create_info");

  ha_archive::info(HA_STATUS_AUTO);
  if (!(create_info->used_fields & HA_CREATE_USED_AUTO))
  {
    create_info->auto_increment_value= stats.auto_increment_value;
  }

  if (!(my_readlink(share->real_path, share->data_file_name, MYF(0))))
    create_info->data_file_name= share->real_path;

  DBUG_VOID_RETURN;
}


/*
  Hints for optimizer, see ha_tina for more information
*/
int ha_archive::info(uint flag)
{
  DBUG_ENTER("ha_archive::info");

  /* 
    If dirty, we lock, and then reset/flush the data.
    I found that just calling azflush() doesn't always work.
  */
  pthread_mutex_lock(&share->mutex);
  if (share->dirty == TRUE)
  {
    if (share->dirty == TRUE)
    {
      DBUG_PRINT("ha_archive", ("archive flushing out rows for scan"));
      azflush(&(share->archive_write), Z_SYNC_FLUSH);
      share->dirty= FALSE;
    }
  }

  /* 
    This should be an accurate number now, though bulk and delayed inserts can
    cause the number to be inaccurate.
  */
  stats.records= share->rows_recorded;
  pthread_mutex_unlock(&share->mutex);

  stats.deleted= 0;

  DBUG_PRINT("ha_archive", ("Stats rows is %d\n", (int)stats.records));
  /* Costs quite a bit more to get all information */
  if (flag & HA_STATUS_TIME)
  {
    MY_STAT file_stat;  // Stat information for the data file

    VOID(my_stat(share->data_file_name, &file_stat, MYF(MY_WME)));

    stats.data_file_length= file_stat.st_size;
    stats.create_time= (ulong) file_stat.st_ctime;
    stats.update_time= (ulong) file_stat.st_mtime;
    stats.mean_rec_length= stats.records ?
      ulong(stats.data_file_length / stats.records) : table->s->reclength;
    stats.max_data_file_length= MAX_FILE_SIZE;
  }
  stats.delete_length= 0;
  stats.index_file_length=0;

  if (flag & HA_STATUS_AUTO)
  {
    init_archive_reader();
    pthread_mutex_lock(&share->mutex);
    azflush(&archive, Z_SYNC_FLUSH);
    pthread_mutex_unlock(&share->mutex);
    stats.auto_increment_value= archive.auto_increment + 1;
  }

  DBUG_RETURN(0);
}


/*
  This method tells us that a bulk insert operation is about to occur. We set
  a flag which will keep write_row from saying that its data is dirty. This in
  turn will keep selects from causing a sync to occur.
  Basically, yet another optimization to keep compression working well.
*/
void ha_archive::start_bulk_insert(ha_rows rows)
{
  DBUG_ENTER("ha_archive::start_bulk_insert");
  if (!rows || rows >= ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT)
    bulk_insert= TRUE;
  DBUG_VOID_RETURN;
}


/* 
  Other side of start_bulk_insert, is end_bulk_insert. Here we turn off the bulk insert
  flag, and set the share dirty so that the next select will call sync for us.
*/
int ha_archive::end_bulk_insert()
{
  DBUG_ENTER("ha_archive::end_bulk_insert");
  bulk_insert= FALSE;
  share->dirty= TRUE;
  DBUG_RETURN(0);
}

/*
  We cancel a truncate command. The only way to delete an archive table is to drop it.
  This is done for security reasons. In a later version we will enable this by 
  allowing the user to select a different row format.
*/
int ha_archive::delete_all_rows()
{
  DBUG_ENTER("ha_archive::delete_all_rows");
  DBUG_RETURN(HA_ERR_WRONG_COMMAND);
}

/*
  We just return state if asked.
*/
bool ha_archive::is_crashed() const 
{
  DBUG_ENTER("ha_archive::is_crashed");
  DBUG_RETURN(share->crashed); 
}

/*
  Simple scan of the tables to make sure everything is ok.
*/

int ha_archive::check(THD* thd, HA_CHECK_OPT* check_opt)
{
  int rc= 0;
  const char *old_proc_info;
  ha_rows count= share->rows_recorded;
  DBUG_ENTER("ha_archive::check");

  old_proc_info= thd_proc_info(thd, "Checking table");
  /* Flush any waiting data */
  pthread_mutex_lock(&share->mutex);
  azflush(&(share->archive_write), Z_SYNC_FLUSH);
  pthread_mutex_unlock(&share->mutex);

  /*
    Now we will rewind the archive file so that we are positioned at the 
    start of the file.
  */
  init_archive_reader();
  read_data_header(&archive);
  while (!(rc= get_row(&archive, table->record[0])))
    count--;

  thd_proc_info(thd, old_proc_info);

  if ((rc && rc != HA_ERR_END_OF_FILE) || count)  
  {
    share->crashed= FALSE;
    DBUG_RETURN(HA_ADMIN_CORRUPT);
  }

  DBUG_RETURN(HA_ADMIN_OK);
}

/*
  Check and repair the table if needed.
*/
bool ha_archive::check_and_repair(THD *thd) 
{
  HA_CHECK_OPT check_opt;
  DBUG_ENTER("ha_archive::check_and_repair");

  check_opt.init();

  DBUG_RETURN(repair(thd, &check_opt));
}
archive_record_buffer *ha_archive::create_record_buffer(unsigned int length) 
{
  DBUG_ENTER("ha_archive::create_record_buffer");
  archive_record_buffer *r;
  if (!(r= 
        (archive_record_buffer*) my_malloc(sizeof(archive_record_buffer),
                                           MYF(MY_WME))))
  {
    DBUG_RETURN(NULL); /* purecov: inspected */
  }
  r->length= (int)length;

  if (!(r->buffer= (uchar*) my_malloc(r->length,
                                    MYF(MY_WME))))
  {
    my_free((char*) r, MYF(MY_ALLOW_ZERO_PTR));
    DBUG_RETURN(NULL); /* purecov: inspected */
  }

  DBUG_RETURN(r);
}

void ha_archive::destroy_record_buffer(archive_record_buffer *r) 
{
  DBUG_ENTER("ha_archive::destroy_record_buffer");
  my_free((char*) r->buffer, MYF(MY_ALLOW_ZERO_PTR));
  my_free((char*) r, MYF(MY_ALLOW_ZERO_PTR));
  DBUG_VOID_RETURN;
}

struct st_mysql_storage_engine archive_storage_engine=
{ MYSQL_HANDLERTON_INTERFACE_VERSION };

mysql_declare_plugin(archive)
{
  MYSQL_STORAGE_ENGINE_PLUGIN,
  &archive_storage_engine,
  "ARCHIVE",
  "Brian Aker, MySQL AB",
  "Archive storage engine",
  PLUGIN_LICENSE_GPL,
  archive_db_init, /* Plugin Init */
  archive_db_done, /* Plugin Deinit */
  0x0300 /* 3.0 */,
  NULL,                       /* status variables                */
  NULL,                       /* system variables                */
  NULL                        /* config options                  */
}
mysql_declare_plugin_end;