/* Copyright (C) 2003 MySQL AB

  This program is free software; you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation; either version 2 of the License, or
  (at your option) any later version.

  This program is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with this program; if not, write to the Free Software
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#ifdef USE_PRAGMA_IMPLEMENTATION
#pragma implementation        // gcc: Class implementation
#endif

#include "mysql_priv.h"

#include "ha_archive.h"
#include <my_dir.h>

/*
  First, if you want to understand storage engines you should look at 
  ha_example.cc and ha_example.h. 
  This example was written as a test case for a customer who needed
  a storage engine without indexes that could compress data very well.
  So, welcome to a completely compressed storage engine. This storage
  engine only does inserts. No replace, deletes, or updates. All reads are 
  complete table scans. Compression is done through azip (bzip compresses
  better, but only marginally; if someone asks I could add support for
  it too, but be aware that it costs a lot more in CPU time than azip).

  We keep a file pointer open for each instance of ha_archive for each read
  but for writes we keep one open file handle just for that. We flush it
  only if we have a read occur. azip handles compressing lots of records
  at once much better than doing lots of little records between writes.
  It is possible to not lock on writes, but this would then mean we couldn't
  handle bulk inserts as well (that is, if someone was trying to read at
  the same time, since we would want to flush).

  A "meta" file is kept alongside the data file. This file serves two purposes.
  The first purpose is to track the number of rows in the table. The second 
  purpose is to determine if the table was closed properly or not. When the 
  meta file is first opened it is marked as dirty. It is opened when the table 
  itself is opened for writing. When the table is closed the new count for rows 
  is written to the meta file and the file is marked as clean. If the meta file 
  is opened and it is marked as dirty, it is assumed that a crash occurred. At 
  this point an error occurs and the user is told to rebuild the file.
  A rebuild scans the rows and rewrites the meta file. If corruption is found
  in the data file then the meta file is not repaired.

  At some point a recovery method for such a drastic case needs to be devised.

  Locks are row level, and you will get a consistent read. 

  For performance, as far as table scans go it is quite fast. I don't have
  good numbers, but locally it has outperformed both InnoDB and MyISAM. For
  InnoDB the question will be whether the table can fit into the buffer
  pool. For MyISAM it is a question of how much of the MyISAM file the file
  system caches. With enough free memory MyISAM is faster. It is only when the
  OS doesn't have enough memory to cache the entire table that archive turns
  out to be any faster. For writes it is always a bit slower than MyISAM. It
  has no internal limits on row length, though.

  Examples between MyISAM (packed) and Archive.

  Table with 76695844 identical rows:
  29680807 a_archive.ARZ
  920350317 a.MYD


  Table with 8991478 rows (all of Slashdot's comments):
  1922964506 comment_archive.ARZ
  2944970297 comment_text.MYD


  TODO:
   Add optional bzip support.
   Allow users to set the compression level.
   Add a truncate table command.
   Implement versioning, should be easy.
   Allow for errors, find a way to mark bad rows.
   Talk to the azip guys, come up with a writable format so that updates are
     doable without switching to a block method.
   Add an optional feature so that rows can be flushed at an interval (which
     will cause less compression but may speed up ordered searches).
   Checkpoint the meta file to allow for faster rebuilds.
   Dirty open (right now the meta file is repaired if a crash occurred).
   Option to allow for dirty reads; this would lower the sync calls, which
     would make inserts a lot faster, but would mean highly arbitrary reads.

    -Brian
*/
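/*
  For scale, the numbers above work out to roughly a 31:1 size ratio for the
  table of identical rows (920350317 / 29680807) and roughly 1.5:1 for the
  Slashdot comment table (2944970297 / 1922964506); the win depends heavily
  on how repetitive the data is.
*/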
/*
  Notes on file formats.
  The Meta file is laid out as:
  check - Just an int of 254 to make sure that the file we are opening was
          never corrupted.
  version - The current version of the file format.
  rows - This is an unsigned long long which is the number of rows in the data
         file.
  check point - Reserved for future use
  auto increment - MAX value for autoincrement
  dirty - Status of the file, whether or not its values are the latest. This
          flag is what causes a repair to occur

  The data file:
  check - Just an int of 254 to make sure that the file we are opening was
          never corrupted.
  version - The current version of the file format.
  data - The data is stored in a "row + blobs" format.
*/
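/*
  As an illustration only (this struct is not used anywhere in the code), the
  meta file can be pictured as the following packed layout, matching the
  description above and the code in read_meta_file() / write_meta_file():

    struct archive_meta_layout            // hypothetical, for documentation only
    {
      uchar     check;                    // always ARCHIVE_CHECK_HEADER (254)
      uchar     version;                  // file format version
      ulonglong rows;                     // number of rows in the data file
      ulonglong check_point;              // reserved for future use
      ulonglong auto_increment;           // MAX auto_increment value seen so far
      uchar     dirty;                    // TRUE until the table is closed cleanly
    };

  The data file starts with the same two check/version bytes, followed by the
  compressed "row + blobs" stream written by real_write_row().
*/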

/* If the archive storage engine has been inited */
static bool archive_inited= FALSE;
/* Variables for archive share methods */
pthread_mutex_t archive_mutex;
static HASH archive_open_tables;

/* The file extension */
#define ARZ ".ARZ"               // The data file
#define ARN ".ARN"               // Files used during an optimize call
#define ARM ".ARM"               // Meta file
/*
  uchar + uchar + ulonglong + ulonglong + ulonglong + uchar
*/
#define META_BUFFER_SIZE sizeof(uchar) + sizeof(uchar) + sizeof(ulonglong) \
  + sizeof(ulonglong) + sizeof(ulonglong) + sizeof(uchar)
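/*
  Assuming one-byte uchars and eight-byte ulonglongs, this works out to
  1 + 1 + 8 + 8 + 8 + 1 = 27 bytes.
*/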

/*
  uchar + uchar
*/
#define DATA_BUFFER_SIZE 2       // Size of the data used in the data file
#define ARCHIVE_CHECK_HEADER 254 // The number we use to determine corruption

/* Static declarations for handlerton */
static handler *archive_create_handler(TABLE_SHARE *table);
/*
  Number of rows that will force a bulk insert.
*/
#define ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT 2
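/*
  With the threshold set to 2, any insert of more than one row enables bulk
  mode; see start_bulk_insert() below, which also enables it when the row
  count is unknown (rows == 0).
*/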


/* dummy handlerton - only to have something to return from archive_db_init */
handlerton archive_hton = {
  MYSQL_HANDLERTON_INTERFACE_VERSION,
  "ARCHIVE",
  SHOW_OPTION_YES,
  "Archive storage engine", 
  DB_TYPE_ARCHIVE_DB,
  archive_db_init,
  0,       /* slot */
  0,       /* savepoint size. */
  NULL,    /* close_connection */
  NULL,    /* savepoint */
  NULL,    /* rollback to savepoint */
  NULL,    /* release savepoint */
  NULL,    /* commit */
  NULL,    /* rollback */
  NULL,    /* prepare */
  NULL,    /* recover */
  NULL,    /* commit_by_xid */
  NULL,    /* rollback_by_xid */
  NULL,    /* create_cursor_read_view */
  NULL,    /* set_cursor_read_view */
  NULL,    /* close_cursor_read_view */
  archive_create_handler,    /* Create a new handler */
  NULL,    /* Drop a database */
  archive_db_end,    /* Panic call */
  NULL,    /* Start Consistent Snapshot */
  NULL,    /* Flush logs */
  NULL,    /* Show status */
  NULL,    /* Partition flags */
  NULL,    /* Alter table flags */
  NULL,    /* Alter interface */
  HTON_NO_FLAGS,
  NULL, /* binlog_func */
  NULL /* binlog_log_query */
};

static handler *archive_create_handler(TABLE_SHARE *table)
{
  return new ha_archive(table);
}

/*
  Used for hash table that tracks open tables.
*/
static byte* archive_get_key(ARCHIVE_SHARE *share,uint *length,
                             my_bool not_used __attribute__((unused)))
{
  *length=share->table_name_length;
  return (byte*) share->table_name;
}


/*
  Initialize the archive handler.

  SYNOPSIS
    archive_db_init()
    void

  RETURN
    FALSE       OK
    TRUE        Error
*/

bool archive_db_init()
{
  DBUG_ENTER("archive_db_init");
  if (pthread_mutex_init(&archive_mutex, MY_MUTEX_INIT_FAST))
    goto error;
  if (hash_init(&archive_open_tables, system_charset_info, 32, 0, 0,
                (hash_get_key) archive_get_key, 0, 0))
  {
    VOID(pthread_mutex_destroy(&archive_mutex));
  }
  else
  {
    archive_inited= TRUE;
    DBUG_RETURN(FALSE);
  }
error:
  have_archive_db= SHOW_OPTION_DISABLED;	// If we couldn't use handler
  DBUG_RETURN(TRUE);
}

/*
  Release the archive handler.

  SYNOPSIS
    archive_db_end()
    void

  RETURN
    FALSE       OK
*/

int archive_db_end(ha_panic_function type)
{
  if (archive_inited)
  {
    hash_free(&archive_open_tables);
    VOID(pthread_mutex_destroy(&archive_mutex));
  }
  archive_inited= 0;
  return 0;
}

ha_archive::ha_archive(TABLE_SHARE *table_arg)
  :handler(&archive_hton, table_arg), delayed_insert(0), bulk_insert(0)
{
  /* Set our original buffer from pre-allocated memory */
  buffer.set((char *)byte_buffer, IO_SIZE, system_charset_info);

  /* The size of the offset value we will use for position() */
  ref_length = sizeof(my_off_t);
}

/*
  This method reads the header of a datafile and returns whether or not it was successful.
*/
int ha_archive::read_data_header(azio_stream *file_to_read)
{
  uchar data_buffer[DATA_BUFFER_SIZE];
  DBUG_ENTER("ha_archive::read_data_header");

  if (azrewind(file_to_read) == -1)
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  if (azread(file_to_read, data_buffer, DATA_BUFFER_SIZE) != DATA_BUFFER_SIZE)
    DBUG_RETURN(errno ? errno : -1);
  
  DBUG_PRINT("ha_archive::read_data_header", ("Check %u", data_buffer[0]));
  DBUG_PRINT("ha_archive::read_data_header", ("Version %u", data_buffer[1]));
  
  if ((data_buffer[0] != (uchar)ARCHIVE_CHECK_HEADER) &&  
      (data_buffer[1] != (uchar)ARCHIVE_VERSION))
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  DBUG_RETURN(0);
}

/*
  This method writes out the header of a datafile and returns whether or not it was successful.
*/
int ha_archive::write_data_header(azio_stream *file_to_write)
{
  uchar data_buffer[DATA_BUFFER_SIZE];
  DBUG_ENTER("ha_archive::write_data_header");

  data_buffer[0]= (uchar)ARCHIVE_CHECK_HEADER;
  data_buffer[1]= (uchar)ARCHIVE_VERSION;

  if (azwrite(file_to_write, &data_buffer, DATA_BUFFER_SIZE) != 
      DATA_BUFFER_SIZE)
    goto error;
  DBUG_PRINT("ha_archive::write_data_header", ("Check %u", (uint)data_buffer[0]));
  DBUG_PRINT("ha_archive::write_data_header", ("Version %u", (uint)data_buffer[1]));

  DBUG_RETURN(0);
error:
  DBUG_RETURN(errno);
}

/*
  This method reads the header of a meta file and returns whether or not it was successful.
  *rows will contain the current number of rows in the data file upon success.
*/
int ha_archive::read_meta_file(File meta_file, ha_rows *rows, 
                               ulonglong *auto_increment)
{
  uchar meta_buffer[META_BUFFER_SIZE];
  uchar *ptr= meta_buffer;
  ulonglong check_point;

  DBUG_ENTER("ha_archive::read_meta_file");

  VOID(my_seek(meta_file, 0, MY_SEEK_SET, MYF(0)));
  if (my_read(meta_file, (byte*)meta_buffer, META_BUFFER_SIZE, 0) != META_BUFFER_SIZE)
    DBUG_RETURN(-1);
  
  /*
    Parse out the meta data, we ignore version at the moment
  */

  ptr+= sizeof(uchar)*2; // Move past header
  *rows= (ha_rows)uint8korr(ptr);
  ptr+= sizeof(ulonglong); // Move past rows
  check_point= uint8korr(ptr);
  ptr+= sizeof(ulonglong); // Move past check_point
  *auto_increment= uint8korr(ptr);
  ptr+= sizeof(ulonglong); // Move past auto_increment

  DBUG_PRINT("ha_archive::read_meta_file", ("Check %d", (uint)meta_buffer[0]));
  DBUG_PRINT("ha_archive::read_meta_file", ("Version %d", (uint)meta_buffer[1]));
  DBUG_PRINT("ha_archive::read_meta_file", ("Rows %llu", *rows));
  DBUG_PRINT("ha_archive::read_meta_file", ("Checkpoint %llu", check_point));
  DBUG_PRINT("ha_archive::read_meta_file", ("Auto-Increment %llu", *auto_increment));
  DBUG_PRINT("ha_archive::read_meta_file", ("Dirty %d", (int)(*ptr)));

  if ((meta_buffer[0] != (uchar)ARCHIVE_CHECK_HEADER) || 
      ((bool)(*ptr)== TRUE))
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  my_sync(meta_file, MYF(MY_WME));

  DBUG_RETURN(0);
}

/*
  This method writes out the header of a meta file and returns whether or not it was successful.
  By setting dirty you say whether or not the file represents the actual state of the data file.
  Upon ::open() we set to dirty, and upon ::close() we set to clean.
*/
int ha_archive::write_meta_file(File meta_file, ha_rows rows, 
                                ulonglong auto_increment, bool dirty)
{
  uchar meta_buffer[META_BUFFER_SIZE];
  uchar *ptr= meta_buffer;
  ulonglong check_point= 0; //Reserved for the future

  DBUG_ENTER("ha_archive::write_meta_file");

  *ptr= (uchar)ARCHIVE_CHECK_HEADER;
  ptr += sizeof(uchar);
  *ptr= (uchar)ARCHIVE_VERSION;
  ptr += sizeof(uchar);
  int8store(ptr, (ulonglong)rows); 
  ptr += sizeof(ulonglong);
  int8store(ptr, check_point); 
  ptr += sizeof(ulonglong);
  int8store(ptr, auto_increment); 
  ptr += sizeof(ulonglong);
  *ptr= (uchar)dirty;
  DBUG_PRINT("ha_archive::write_meta_file", ("Check %d", 
                                             (uint)ARCHIVE_CHECK_HEADER));
  DBUG_PRINT("ha_archive::write_meta_file", ("Version %d", 
                                             (uint)ARCHIVE_VERSION));
  DBUG_PRINT("ha_archive::write_meta_file", ("Rows %llu", (ulonglong)rows));
  DBUG_PRINT("ha_archive::write_meta_file", ("Checkpoint %llu", check_point));
  DBUG_PRINT("ha_archive::write_meta_file", ("Auto Increment %llu",
                                             auto_increment));
  DBUG_PRINT("ha_archive::write_meta_file", ("Dirty %d", (uint)dirty));

  VOID(my_seek(meta_file, 0, MY_SEEK_SET, MYF(0)));
  if (my_write(meta_file, (byte *)meta_buffer, META_BUFFER_SIZE, 0) != META_BUFFER_SIZE)
    DBUG_RETURN(-1);
  
  my_sync(meta_file, MYF(MY_WME));

  DBUG_RETURN(0);
}


/*
  We create the shared memory space that we will use for the open table. 
  No matter what, we try to get or create a share. This is so that a repair
  table operation can occur. 

  See ha_example.cc for a longer description.
*/
ARCHIVE_SHARE *ha_archive::get_share(const char *table_name, TABLE *table)
{
  ARCHIVE_SHARE *share;
  char meta_file_name[FN_REFLEN];
  uint length;
  char *tmp_name;

  pthread_mutex_lock(&archive_mutex);
  length=(uint) strlen(table_name);

  if (!(share=(ARCHIVE_SHARE*) hash_search(&archive_open_tables,
                                           (byte*) table_name,
                                           length)))
  {
    if (!my_multi_malloc(MYF(MY_WME | MY_ZEROFILL),
                          &share, sizeof(*share),
                          &tmp_name, length+1,
                          NullS)) 
    {
      pthread_mutex_unlock(&archive_mutex);
      return NULL;
    }

    share->use_count= 0;
    share->table_name_length= length;
    share->table_name= tmp_name;
    share->crashed= FALSE;
    fn_format(share->data_file_name,table_name,"",ARZ,MY_REPLACE_EXT|MY_UNPACK_FILENAME);
    fn_format(meta_file_name,table_name,"",ARM,MY_REPLACE_EXT|MY_UNPACK_FILENAME);
    strmov(share->table_name,table_name);
    /*
      We will use this lock for rows.
    */
    VOID(pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST));
    if ((share->meta_file= my_open(meta_file_name, O_RDWR, MYF(0))) == -1)
      share->crashed= TRUE;
    
    /*
      After we read, we set the file to dirty. When we close, we will do the 
      opposite. If the meta file will not open we assume it is crashed and
      leave it up to the user to fix.
    */
    if (read_meta_file(share->meta_file, &share->rows_recorded, 
                       &share->auto_increment_value))
      share->crashed= TRUE;
    else
      (void)write_meta_file(share->meta_file, share->rows_recorded,
                            share->auto_increment_value, TRUE);
    /* 
      It is expensive to open and close the data files and since you can't have
      a gzip file that can be both read and written we keep a writer open
      that is shared among all open tables.
    */
    if (!(azopen(&(share->archive_write), share->data_file_name, 
                 O_WRONLY|O_APPEND|O_BINARY)))
    {
      DBUG_PRINT("info", ("Could not open archive write file"));
      share->crashed= TRUE;
    }
    VOID(my_hash_insert(&archive_open_tables, (byte*) share));
    thr_lock_init(&share->lock);
  }
  share->use_count++;
  pthread_mutex_unlock(&archive_mutex);

  return share;
}


/* 
  Free the share.
  See ha_example.cc for a description.
*/
int ha_archive::free_share(ARCHIVE_SHARE *share)
{
  int rc= 0;
  pthread_mutex_lock(&archive_mutex);
  if (!--share->use_count)
  {
    hash_delete(&archive_open_tables, (byte*) share);
    thr_lock_delete(&share->lock);
    VOID(pthread_mutex_destroy(&share->mutex));
    (void)write_meta_file(share->meta_file, share->rows_recorded, 
                          share->auto_increment_value, FALSE);
    if (azclose(&(share->archive_write)))
      rc= 1;
    if (my_close(share->meta_file, MYF(0)))
      rc= 1;
    my_free((gptr) share, MYF(0));
  }
  pthread_mutex_unlock(&archive_mutex);

  return rc;
}


/*
  We just implement one additional file extension.
*/
static const char *ha_archive_exts[] = {
  ARZ,
  ARM,
  NullS
};

const char **ha_archive::bas_ext() const
{
  return ha_archive_exts;
}


/* 
  When opening a file we:
  Create/get our shared structure.
  Init our lock.
  We open the file we will read from.
*/
int ha_archive::open(const char *name, int mode, uint open_options)
{
  DBUG_ENTER("ha_archive::open");

  if (!(share= get_share(name, table)))
    DBUG_RETURN(HA_ERR_OUT_OF_MEM); // Not handled well by calling code!
  thr_lock_data_init(&share->lock,&lock,NULL);

  if (!(azopen(&archive, share->data_file_name, O_RDONLY|O_BINARY)))
  {
    if (errno == EROFS || errno == EACCES)
      DBUG_RETURN(my_errno= errno);
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
  }

  if (open_options & HA_OPEN_FOR_REPAIR)
    DBUG_RETURN(0);

  DBUG_RETURN(share->crashed ? HA_ERR_CRASHED_ON_USAGE : 0);
}


/*
  Closes the file.

  SYNOPSIS
    close();
  
  IMPLEMENTATION:

  We first close this storage engines file handle to the archive and
  then remove our reference count to the table (and possibly free it
  as well).

  RETURN
    0  ok
    1  Error
*/

int ha_archive::close(void)
{
  int rc= 0;
  DBUG_ENTER("ha_archive::close");

  /* First close stream */
  if (azclose(&archive))
    rc= 1;
  /* then also close share */
  rc|= free_share(share);

  DBUG_RETURN(rc);
}


/*
  We create our data file here. The format is pretty simple. 
  You can read about the format of the data file above.
  Unlike other storage engines we do not "pack" our data. Since we 
  are about to do a general compression, packing would just be a waste of 
  CPU time. If the table has blobs they are written after the row in the order 
  of creation.
*/

int ha_archive::create(const char *name, TABLE *table_arg,
                       HA_CREATE_INFO *create_info)
{
  File create_file;  // Used to create the datafile and the metafile
  char name_buff[FN_REFLEN];
  int error;
  DBUG_ENTER("ha_archive::create");

  auto_increment_value= (create_info->auto_increment_value ?
                   create_info->auto_increment_value -1 :
                   (ulonglong) 0);

  if ((create_file= my_create(fn_format(name_buff,name,"",ARM,
                                        MY_REPLACE_EXT|MY_UNPACK_FILENAME),0,
                              O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
  {
    error= my_errno;
    goto error;
  }

  for (uint key= 0; key < table_arg->s->keys; key++)
  {
    KEY *pos= table_arg->key_info+key;
    KEY_PART_INFO *key_part=     pos->key_part;
    KEY_PART_INFO *key_part_end= key_part + pos->key_parts;

    for (; key_part != key_part_end; key_part++)
    {
      Field *field= key_part->field;

      if (!(field->flags & AUTO_INCREMENT_FLAG))
      {
        error= -1;
        goto error;
      }
    }
  }

  write_meta_file(create_file, 0, auto_increment_value, FALSE);
  my_close(create_file,MYF(0));

  /* 
    We reuse name_buff since it is available.
  */
  if ((create_file= my_create(fn_format(name_buff,name,"",ARZ,
                                        MY_REPLACE_EXT|MY_UNPACK_FILENAME),0,
                              O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
  {
    error= my_errno;
    goto error;
  }
  if (!azdopen(&archive, create_file, O_WRONLY|O_BINARY))
  {
    error= errno;
    goto error2;
  }
  if (write_data_header(&archive))
  {
    error= errno;
    goto error3;
  }

  if (azclose(&archive))
  {
    error= errno;
    goto error2;
  }

  DBUG_RETURN(0);

error3:
  /* We already have an error, so ignore results of azclose. */
  (void)azclose(&archive);
error2:
  my_close(create_file, MYF(0));
  delete_table(name);
error:
  /* Return error number, if we got one */
  DBUG_RETURN(error ? error : -1);
}

/*
  This is where the actual row is written out.
671
*/
unknown's avatar
unknown committed
672
int ha_archive::real_write_row(byte *buf, azio_stream *writer)
673
{
674
  my_off_t written;
675
  uint *ptr, *end;
676
  DBUG_ENTER("ha_archive::real_write_row");
677

unknown's avatar
unknown committed
678
  written= azwrite(writer, buf, table->s->reclength);
679 680
  DBUG_PRINT("ha_archive::real_write_row", ("Wrote %d bytes expected %d", 
                                            written, table->s->reclength));
681
  if (!delayed_insert || !bulk_insert)
682 683
    share->dirty= TRUE;

684
  if (written != (my_off_t)table->s->reclength)
685
    DBUG_RETURN(errno ? errno : -1);
686 687 688 689
  /*
    We should probably mark the table as damagaged if the record is written
    but the blob fails.
  */
  for (ptr= table->s->blob_field, end= ptr + table->s->blob_fields ;
       ptr != end ;
       ptr++)
  {
    char *data_ptr;
    uint32 size= ((Field_blob*) table->field[*ptr])->get_length();

    if (size)
    {
      ((Field_blob*) table->field[*ptr])->get_ptr(&data_ptr);
      written= azwrite(writer, data_ptr, (unsigned)size);
      if (written != (my_off_t)size)
        DBUG_RETURN(errno ? errno : -1);
    }
  }
  DBUG_RETURN(0);
}


/* 
  Look at ha_archive::open() for an explanation of the row format.
  Here we just write out the row.

  Wondering about start_bulk_insert()? We don't implement it for
  archive since it optimizes for lots of writes. The only gain
  from implementing start_bulk_insert() is that we could skip 
  setting dirty to true each time.
*/
int ha_archive::write_row(byte *buf)
{
  int rc;
  byte *read_buf= NULL;
  ulonglong temp_auto;
  DBUG_ENTER("ha_archive::write_row");

  if (share->crashed)
      DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  statistic_increment(table->in_use->status_var.ha_write_count, &LOCK_status);
  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
    table->timestamp_field->set_time();
  pthread_mutex_lock(&share->mutex);

  if (table->next_number_field)
  {
    KEY *mkey= &table->s->key_info[0]; // We only support one key right now
    update_auto_increment();
    temp_auto= table->next_number_field->val_int();

    /*
      Bad news, this will cause a search for the unique value which is very 
      expensive since we will have to do a table scan which will lock up 
      all other writers during this period. This could perhaps be optimized 
      in the future.
    */
    if (temp_auto == share->auto_increment_value && 
        mkey->flags & HA_NOSAME)
    {
      rc= HA_ERR_FOUND_DUPP_KEY;
      goto error;
    }

    if (temp_auto < share->auto_increment_value && 
        mkey->flags & HA_NOSAME)
    {
      /* 
        First we create a buffer that we can use for reading rows, and can pass
        to get_row().
      */
      if (!(read_buf= (byte*) my_malloc(table->s->reclength, MYF(MY_WME))))
      {
        rc= HA_ERR_OUT_OF_MEM;
        goto error;
      }
       /* 
         All of the buffer must be written out or we won't see all of the
         data 
       */
      azflush(&(share->archive_write), Z_SYNC_FLUSH);
      /*
        Set the position of the local read thread to the beginning position.
      */
      if (read_data_header(&archive))
      {
        rc= HA_ERR_CRASHED_ON_USAGE;
        goto error;
      }

      /*
        Now we read and check all of the rows.
        if (!memcmp(table->next_number_field->ptr, mfield->ptr, mfield->max_length()))
        if ((longlong)temp_auto == 
            mfield->val_int((char*)(read_buf + mfield->offset())))
      */
      Field *mfield= table->next_number_field;

      while (!(get_row(&archive, read_buf)))
      {
        if (!memcmp(read_buf + mfield->offset(), table->next_number_field->ptr,
                    mfield->max_length()))
        {
          rc= HA_ERR_FOUND_DUPP_KEY;
          goto error;
        }
      }
    }
    else
    {
      if (temp_auto > share->auto_increment_value)
        auto_increment_value= share->auto_increment_value= temp_auto;
    }
  }

  /*
    Notice that the global auto_increment has been increased.
    In case of a failed row write, we will never try to reuse the value.
  */

  share->rows_recorded++;
  rc= real_write_row(buf, &(share->archive_write));
error:
  pthread_mutex_unlock(&share->mutex);
  if (read_buf)
    my_free((gptr) read_buf, MYF(0));

  DBUG_RETURN(rc);
}


ulonglong ha_archive::get_auto_increment()
{
  return share->auto_increment_value + 1;
}

/* Initialized at each key walk (called multiple times unlike rnd_init()) */
int ha_archive::index_init(uint keynr, bool sorted)
{
  DBUG_ENTER("ha_archive::index_init");
  active_index= keynr;
  DBUG_RETURN(0);
}


/*
  No indexes, so if we get a request for an index search since we tell
  the optimizer that we have unique indexes, we scan
*/
int ha_archive::index_read(byte *buf, const byte *key,
                             uint key_len, enum ha_rkey_function find_flag)
{
  int rc;
  DBUG_ENTER("ha_archive::index_read");
  rc= index_read_idx(buf, active_index, key, key_len, find_flag);
  DBUG_RETURN(rc);
}


int ha_archive::index_read_idx(byte *buf, uint index, const byte *key,
                                 uint key_len, enum ha_rkey_function find_flag)
{
  int rc= 0;
  bool found= 0;
  KEY *mkey= &table->s->key_info[index];
  current_k_offset= mkey->key_part->offset;
  current_key= key;
  current_key_len= key_len;


  DBUG_ENTER("ha_archive::index_read_idx");

  /* 
    All of the buffer must be written out or we won't see all of the
    data 
  */
  pthread_mutex_lock(&share->mutex);
  azflush(&(share->archive_write), Z_SYNC_FLUSH);
  pthread_mutex_unlock(&share->mutex);

  /*
    Set the position of the local read thread to the beginning position.
  */
  if (read_data_header(&archive))
  {
    rc= HA_ERR_CRASHED_ON_USAGE;
    goto error;
  }

  while (!(get_row(&archive, buf)))
  {
    if (!memcmp(current_key, buf + current_k_offset, current_key_len))
    {
      found= 1;
      break;
    }
  }

  if (found)
    DBUG_RETURN(0);

error:
  DBUG_RETURN(rc ? rc : HA_ERR_END_OF_FILE);
}


int ha_archive::index_next(byte * buf) 
{ 
  bool found= 0;

  DBUG_ENTER("ha_archive::index_next");

  while (!(get_row(&archive, buf)))
  {
    if (!memcmp(current_key, buf+current_k_offset, current_key_len))
    {
      found= 1;
      break;
    }
  }

  DBUG_RETURN(found ? 0 : HA_ERR_END_OF_FILE); 
}

/*
  All calls that need to scan the table start with this method. If we are told
  that it is a table scan we rewind the file to the beginning, otherwise
  we assume the position will be set.
*/

int ha_archive::rnd_init(bool scan)
{
  DBUG_ENTER("ha_archive::rnd_init");
  
  if (share->crashed)
      DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  /* We rewind the file so that we can read from the beginning if scan */
  if (scan)
  {
    scan_rows= share->rows_recorded;
    records= 0;

    /* 
      If dirty, we lock, and then reset/flush the data.
      I found that just calling azflush() doesn't always work.
    */
    if (share->dirty == TRUE)
    {
      pthread_mutex_lock(&share->mutex);
      if (share->dirty == TRUE)
      {
        azflush(&(share->archive_write), Z_SYNC_FLUSH);
        share->dirty= FALSE;
      }
      pthread_mutex_unlock(&share->mutex);
    }

    if (read_data_header(&archive))
      DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
  }

950 951 952 953 954 955 956 957
  DBUG_RETURN(0);
}


/*
  This is the method that is used to read a row. It assumes that the row is 
  positioned where you want it.
*/
int ha_archive::get_row(azio_stream *file_to_read, byte *buf)
{
  int read; // Bytes read, azread() returns int
  uint *ptr, *end;
  char *last;
  size_t total_blob_length= 0;
  DBUG_ENTER("ha_archive::get_row");

  read= azread(file_to_read, buf, table->s->reclength);
  DBUG_PRINT("ha_archive::get_row", ("Read %d bytes expected %d", read, 
                                     table->s->reclength));

  if (read == Z_STREAM_ERROR)
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  /* If we read nothing we are at the end of the file */
  if (read == 0)
    DBUG_RETURN(HA_ERR_END_OF_FILE);

  /* 
    If the record is the wrong size, the file is probably damaged, unless 
    we are dealing with a delayed insert or a bulk insert.
  */
  if ((ulong) read != table->s->reclength)
    DBUG_RETURN(HA_ERR_END_OF_FILE);

  /* Calculate blob length, we use this for our buffer */
  for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ;
       ptr != end ;
       ptr++)
  {
    if (ha_get_bit_in_read_set(((Field_blob*) table->field[*ptr])->fieldnr))
      total_blob_length += ((Field_blob*) table->field[*ptr])->get_length();
  }

  /* Adjust our row buffer if we need be */
  buffer.alloc(total_blob_length);
  last= (char *)buffer.ptr();

  /* Loop through our blobs and read them */
  for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ;
       ptr != end ;
       ptr++)
  {
    size_t size= ((Field_blob*) table->field[*ptr])->get_length();
    if (size)
    {
      if (ha_get_bit_in_read_set(((Field_blob*) table->field[*ptr])->fieldnr))
      {
        read= azread(file_to_read, last, size);
        if ((size_t) read != size)
          DBUG_RETURN(HA_ERR_END_OF_FILE);
        ((Field_blob*) table->field[*ptr])->set_ptr(size, last);
        last += size;
      }
      else
      {
        (void)azseek(file_to_read, size, SEEK_CUR);
      }
    }
  }
  DBUG_RETURN(0);
}


/* 
  Called during ORDER BY. Its position is either from being called sequentially
  or by having had ha_archive::rnd_pos() called before it is called.
*/

int ha_archive::rnd_next(byte *buf)
{
  int rc;
  DBUG_ENTER("ha_archive::rnd_next");

  if (share->crashed)
      DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  if (!scan_rows)
    DBUG_RETURN(HA_ERR_END_OF_FILE);
  scan_rows--;

  statistic_increment(table->in_use->status_var.ha_read_rnd_next_count,
		      &LOCK_status);
  current_position= aztell(&archive);
  rc= get_row(&archive, buf);


  if (rc != HA_ERR_END_OF_FILE)
    records++;

  DBUG_RETURN(rc);
}


/*
  Thanks to the table flag HA_REC_NOT_IN_SEQ this will be called after
  each call to ha_archive::rnd_next() if an ordering of the rows is
  needed.
*/

void ha_archive::position(const byte *record)
{
  DBUG_ENTER("ha_archive::position");
  my_store_ptr(ref, ref_length, current_position);
  DBUG_VOID_RETURN;
}


/*
  This is called after a table scan for each row if the results of the
  scan need to be ordered. It will take *pos and use it to move the
  cursor in the file so that the next row that is called is the
  correctly ordered row.
*/

int ha_archive::rnd_pos(byte * buf, byte *pos)
{
  DBUG_ENTER("ha_archive::rnd_pos");
  statistic_increment(table->in_use->status_var.ha_read_rnd_next_count,
		      &LOCK_status);
  current_position= (my_off_t)my_get_ptr(pos, ref_length);
  (void)azseek(&archive, current_position, SEEK_SET);

  DBUG_RETURN(get_row(&archive, buf));
}

/*
  This method repairs the meta file. It does this by walking the datafile and 
  rewriting the meta file. Currently it does this by calling optimize with
  the extended flag.
*/
int ha_archive::repair(THD* thd, HA_CHECK_OPT* check_opt)
{
  DBUG_ENTER("ha_archive::repair");
  check_opt->flags= T_EXTEND;
  int rc= optimize(thd, check_opt);

  if (rc)
    DBUG_RETURN(HA_ERR_CRASHED_ON_REPAIR);

  share->crashed= FALSE;
  DBUG_RETURN(0);
}

/*
  The table can become fragmented if data was inserted, read, and then
  inserted again. What we do is open up the file and recompress it completely. 
*/
int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
{
  DBUG_ENTER("ha_archive::optimize");
  int rc;
  azio_stream writer;
  char writer_filename[FN_REFLEN];

  /* Flush any waiting data */
  azflush(&(share->archive_write), Z_SYNC_FLUSH);

  /* Lets create a file to contain the new data */
  fn_format(writer_filename, share->table_name, "", ARN, 
            MY_REPLACE_EXT|MY_UNPACK_FILENAME);

  if (!(azopen(&writer, writer_filename, O_CREAT|O_WRONLY|O_TRUNC|O_BINARY)))
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); 

  /* 
    An extended rebuild is a lot more effort. We open up each row and re-record it. 
    Any dead rows are removed (aka rows that may have been partially recorded). 
  */

  if (check_opt->flags == T_EXTEND)
  {
    DBUG_PRINT("info", ("archive extended rebuild"));
    byte *buf; 

    /* 
      First we create a buffer that we can use for reading rows, and can pass
      to get_row().
    */
    if (!(buf= (byte*) my_malloc(table->s->reclength, MYF(MY_WME))))
    {
      rc= HA_ERR_OUT_OF_MEM;
      goto error;
    }

    /*
      Now we will rewind the archive file so that we are positioned at the 
      start of the file.
    */
    rc= read_data_header(&archive);
    
    /*
      Assuming no error from rewinding the archive file, we now write out the 
      new header for our data file.
    */
    if (!rc)
      rc= write_data_header(&writer);

    /* 
      On success of writing out the new header, we now fetch each row and
      insert it into the new archive file. 
    */
    if (!rc)
    {
      share->rows_recorded= 0;
      auto_increment_value= share->auto_increment_value= 0;
      while (!(rc= get_row(&archive, buf)))
      {
        real_write_row(buf, &writer);
        if (table->found_next_number_field)
        {
          Field *field= table->found_next_number_field;
          if (share->auto_increment_value < 
              field->val_int((char*)(buf + field->offset())))
            auto_increment_value= share->auto_increment_value=
              field->val_int((char*)(buf + field->offset()));
        }
        share->rows_recorded++;
      }
    }

    my_free((char*)buf, MYF(0));
    if (rc && rc != HA_ERR_END_OF_FILE)
      goto error;
  } 
  else
  {
    DBUG_PRINT("info", ("archive quick rebuild"));
    /* 
      The quick method is to just read the data raw, and then compress it directly.
    */
    int read; // Bytes read, azread() returns int
    char block[IO_SIZE];
    if (azrewind(&archive) == -1)
    {
      rc= HA_ERR_CRASHED_ON_USAGE;
      DBUG_PRINT("info", ("archive HA_ERR_CRASHED_ON_USAGE"));
      goto error;
    }

    while ((read= azread(&archive, block, IO_SIZE)))
      azwrite(&writer, block, read);
  }

  azclose(&writer);

  my_rename(writer_filename,share->data_file_name,MYF(0));

  DBUG_RETURN(0); 

error:
  azclose(&writer);

  DBUG_RETURN(rc); 
}

/* 
  Below is an example of how to set up row level locking.
*/
THR_LOCK_DATA **ha_archive::store_lock(THD *thd,
                                       THR_LOCK_DATA **to,
                                       enum thr_lock_type lock_type)
{
  if (lock_type == TL_WRITE_DELAYED)
    delayed_insert= TRUE;
  else
    delayed_insert= FALSE;

  if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK) 
  {
    /* 
      Here is where we get into the guts of a row level lock.
      If TL_UNLOCK is set, and
      if we are not doing a LOCK TABLE or DISCARD/IMPORT
      TABLESPACE, then allow multiple writers 
    */

    if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
         lock_type <= TL_WRITE) && !thd->in_lock_tables
1238
        && !thd->tablespace_op)
1239 1240 1241 1242 1243 1244 1245 1246 1247 1248
      lock_type = TL_WRITE_ALLOW_WRITE;

    /* 
      In queries of type INSERT INTO t1 SELECT ... FROM t2 ...
      MySQL would use the lock TL_READ_NO_INSERT on t2, and that
      would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts
      to t2. Convert the lock to a normal read lock to allow
      concurrent inserts to t2. 
    */

1249
    if (lock_type == TL_READ_NO_INSERT && !thd->in_lock_tables) 
1250 1251 1252 1253 1254 1255 1256 1257 1258 1259
      lock_type = TL_READ;

    lock.type=lock_type;
  }

  *to++= &lock;

  return to;
}

void ha_archive::update_create_info(HA_CREATE_INFO *create_info)
{
  ha_archive::info(HA_STATUS_AUTO | HA_STATUS_CONST);
  if (!(create_info->used_fields & HA_CREATE_USED_AUTO))
  {
    create_info->auto_increment_value=auto_increment_value;
  }
}


/*
  Hints for optimizer, see ha_tina for more information
*/
void ha_archive::info(uint flag)
{
  DBUG_ENTER("ha_archive::info");
  /* 
    This should be an accurate number now, though bulk and delayed inserts can
    cause the number to be inaccurate.
  */
  records= share->rows_recorded;
  deleted= 0;
  /* Costs quite a bit more to get all information */
  if (flag & HA_STATUS_TIME)
  {
    MY_STAT file_stat;  // Stat information for the data file

    VOID(my_stat(share->data_file_name, &file_stat, MYF(MY_WME)));

1289
    mean_rec_length= table->s->reclength + buffer.alloced_length();
unknown's avatar
unknown committed
1290
    data_file_length= file_stat.st_size;
1291 1292
    create_time= file_stat.st_ctime;
    update_time= file_stat.st_mtime;
unknown's avatar
unknown committed
1293
    max_data_file_length= share->rows_recorded * mean_rec_length;
1294 1295 1296
  }
  delete_length= 0;
  index_file_length=0;
1297

1298 1299 1300
  if (flag & HA_STATUS_AUTO)
    auto_increment_value= share->auto_increment_value;

1301 1302
  DBUG_VOID_RETURN;
}
1303 1304 1305 1306 1307 1308 1309 1310 1311 1312


/*
  This method tells us that a bulk insert operation is about to occur. We set
  a flag which will keep write_row from saying that its data is dirty. This in
  turn will keep selects from causing a sync to occur.
  Basically, yet another optimizations to keep compression working well.
*/
void ha_archive::start_bulk_insert(ha_rows rows)
{
1313
  DBUG_ENTER("ha_archive::start_bulk_insert");
1314 1315
  if (!rows || rows >= ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT)
    bulk_insert= TRUE;
1316 1317 1318 1319 1320 1321 1322 1323 1324 1325
  DBUG_VOID_RETURN;
}


/* 
  Other side of start_bulk_insert, is end_bulk_insert. Here we turn off the bulk insert
  flag, and set the share dirty so that the next select will call sync for us.
*/
int ha_archive::end_bulk_insert()
{
1326
  DBUG_ENTER("ha_archive::end_bulk_insert");
1327 1328 1329 1330
  bulk_insert= FALSE;
  share->dirty= TRUE;
  DBUG_RETURN(0);
}
1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341

/*
  We cancel a truncate command. The only way to delete an archive table is to drop it.
  This is done for security reasons. In a later version we will enable this by 
  allowing the user to select a different row format.
*/
int ha_archive::delete_all_rows()
{
  DBUG_ENTER("ha_archive::delete_all_rows");
  DBUG_RETURN(0);
}
1342 1343 1344 1345 1346 1347

/*
  We just return state if asked.
*/
bool ha_archive::is_crashed() const 
{
1348 1349
  DBUG_ENTER("ha_archive::is_crashed");
  DBUG_RETURN(share->crashed); 
1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365
}

/*
  Simple scan of the tables to make sure everything is ok.
*/

int ha_archive::check(THD* thd, HA_CHECK_OPT* check_opt)
{
  int rc= 0;
  byte *buf; 
  const char *old_proc_info=thd->proc_info;
  ha_rows count= share->rows_recorded;
  DBUG_ENTER("ha_archive::check");

  thd->proc_info= "Checking table";
  /* Flush any waiting data */
  azflush(&(share->archive_write), Z_SYNC_FLUSH);

  /* 
    First we create a buffer that we can use for reading rows, and can pass
    to get_row().
  */
  if (!(buf= (byte*) my_malloc(table->s->reclength, MYF(MY_WME))))
    rc= HA_ERR_OUT_OF_MEM;

  /*
    Now we will rewind the archive file so that we are positioned at the 
    start of the file.
  */
  if (!rc)
    read_data_header(&archive);

  if (!rc)
unknown's avatar
unknown committed
1383
    while (!(rc= get_row(&archive, buf)))
1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410
      count--;

  my_free((char*)buf, MYF(0));

  thd->proc_info= old_proc_info;

  if ((rc && rc != HA_ERR_END_OF_FILE) || count)  
  {
    share->crashed= FALSE;
    DBUG_RETURN(HA_ADMIN_CORRUPT);
  }
  else
  {
    DBUG_RETURN(HA_ADMIN_OK);
  }
}

/*
  Check and repair the table if needed.
*/
bool ha_archive::check_and_repair(THD *thd) 
{
  HA_CHECK_OPT check_opt;
  DBUG_ENTER("ha_archive::check_and_repair");

  check_opt.init();

  DBUG_RETURN(repair(thd, &check_opt));
}