Commit 42297969 authored by monty@donna.mysql.com's avatar monty@donna.mysql.com

Automatic primary key for BDB tables

parent 9323e733
monty@tik.mysql.com
monty@donna.mysql.com
......@@ -1793,16 +1793,20 @@ Big changes made in @strong{MySQL} Version 3.22.12.
@item @strong{MyODBC} (uses ODBC SDK 2.5) --- Gamma
It seems to work well with some programs.
@item Replication -- Alpha
@item Replication -- Alpha / Beta
We are still working on replication, so don't expect this to be rock
solid yet. On the other hand, some @strong{MySQL} users are already
using this with good results.
@item BDB Tables -- Alpha
@item BDB Tables -- Alpha / Beta
The Berkeley DB code is very stable, but we are still improving the interface
between @strong{MySQL} and BDB tables, so it will take some time before this
is as tested as the other table types.
@item Automatic recovery of MyISAM tables - Alpha.
This only affects the new code that checks if the table was closed properly
on open and executes an automatic check/repair of the table if it wasn't.
@end table
MySQL AB provides e-mail support for paying customers, but the @strong{MySQL}
......@@ -7979,12 +7983,10 @@ you should also compile your code to be multi-threaded!
@node Windows and BDB tables., Windows vs Unix, Windows compiling, Windows
@subsection Windows and BDB Tables
We are working on removing the requirement that one must have a primary
key in a BDB table. As soon as this is fixed we will throughly test the
BDB interface by running the @strong{MySQL} benchmark and our internal
test suite on it. When the above is done we will start to release binary
distributions (for Windows and UNIX) of @strong{MySQL} that will include
support for BDB tables.
We will shortly do a full test on the new BDB interface on Windows.
When this is done we will start to release binary distributions (for
Windows and UNIX) of @strong{MySQL} that will include support for BDB
tables.
@cindex Windows, versus UNIX
@cindex operating systems, Windows versus UNIX
......@@ -21876,13 +21878,22 @@ Some characteristic of @code{BDB} tables:
@itemize @bullet
@item
All @code{BDB} tables must have a primary key.
@strong{MySQL} requires a @code{PRIMARY KEY} in each BDB table to be
able to refer to previously read rows; If you don't create on,
@strong{MySQL} will create an maintain a hidden @code{PRIMARY KEY} for
you. The hidden key has a length of 5 bytes and is incremented for each
insert attempt.
@item
If all columns you access in a @code{BDB} tables is part of the same index or
part of the the primary key then @strong{MySQL} can execute the query
without having to access the actual row. In a @code{MyISAM} table the
above holds only if the columns are part of the same index.
@item
The @code{PRIMARY KEY} will be faster than any other key, as the
@code{PRIMARY KEY} is stored together with the row data. As the other keys are
stored as the key data + the @code{PRIMARY KEY}, its important to keep the
@code{PRIMARY KEY} as short as possible to save disk and get better speed.
@item
@code{LOCK TABLES} works on @code{BDB} tables as with other tables. If
you don't use @code{LOCK TABLE}, @strong{MYSQL} will issue an internal
multiple write lock on the table to ensure that the table will be
......@@ -37972,6 +37983,11 @@ though, so 3.23 is not released as a stable version yet.
@appendixsubsec Changes in release 3.23.26
@itemize @bullet
@item
If one don't create a @code{PRIMARY KEY} in a BDB table, a hidden
@code{PRIMARY KEY} will be created.
@item
Added read-only-key optimization to BDB tables.
@item
@code{LEFT JOIN} did in some case prefer a full table scan when one
didn't have a @code{WHERE} clause.
@item
......@@ -44,7 +44,7 @@ class Field {
uint8 null_bit; // And position to it
struct st_table *table; // Pointer for table
ulong query_id; // For quick test of used fields
key_map key_start,part_of_key; // Which keys a field is in
key_map key_start,part_of_key; // Key is part of these keys.
const char *table_name,*field_name;
utype unireg_check;
uint32 field_length; // Length of field
......
......@@ -331,7 +331,7 @@ static ha_rows find_all_keys(SORTPARAM *param, SQL_SELECT *select,
if (! indexfile && ! quick_select)
{
file->reset(); // QQ; Shouldn't be needed
if (table->keyread) // QQ Can be removed after the reset
if (sort_form->key_read) // QQ Can be removed after the reset
file->extra(HA_EXTRA_KEYREAD); // QQ is removed
next_pos=(byte*) 0; /* Find records in sequence */
file->rnd_init();
......
......@@ -21,13 +21,12 @@
- Don't automaticly pack all string keys (To do this we need to modify
CREATE TABLE so that one can use the pack_keys argument per key).
- An argument to pack_key that we don't want compression.
- Interaction with LOCK TABLES (Monty will fix this)
- DB_DBT_USERMEN should be used for fixed length tables
We will need an updated Berkeley DB version for this.
- Killing threads that has got a 'deadlock'
- SHOW TABLE STATUS should give more information about the table.
- Get a more accurate count of the number of rows.
- Introduce hidden primary keys for tables without a primary key
We could store the found number of rows when the table is scanned.
- We will need a manager thread that calls flush_logs, removes old
logs and makes checkpoints at given intervals.
- When not using UPDATE IGNORE, don't make a sub transaction but abort
......@@ -203,6 +202,14 @@ const char **ha_berkeley::bas_ext() const
{ static const char *ext[]= { ha_berkeley_ext, NullS }; return ext; }
static int
berkeley_cmp_hidden_key(const DBT *new_key, const DBT *saved_key)
{
ulonglong a=uint5korr((char*) new_key->data);
ulonglong b=uint5korr((char*) saved_key->data);
return a < b ? -1 : (a > b ? 1 : 0);
}
static int
berkeley_cmp_packed_key(const DBT *new_key, const DBT *saved_key)
{
......@@ -232,6 +239,8 @@ berkeley_cmp_packed_key(const DBT *new_key, const DBT *saved_key)
}
/* The following is not yet used; Should be used for fixed length keys */
static int
berkeley_cmp_fix_length_key(const DBT *new_key, const DBT *saved_key)
{
......@@ -261,19 +270,30 @@ int ha_berkeley::open(const char *name, int mode, uint test_if_locked)
int error;
DBUG_ENTER("ha_berkeley::open");
/* Open primary key */
hidden_primary_key=0;
if ((primary_key=table->primary_key) >= MAX_KEY)
{ // No primary key
primary_key=table->keys;
fixed_length_primary_key=1;
ref_length=hidden_primary_key=BDB_HIDDEN_PRIMARY_KEY_LENGTH;
}
key_used_on_scan=primary_key;
/* Need some extra memory in case of packed keys */
uint max_key_length= table->max_key_length + MAX_REF_PARTS*2;
if (!(alloc_ptr=
my_multi_malloc(MYF(MY_WME),
&key_file, table->keys*sizeof(*key_file),
&key_type, table->keys*sizeof(u_int32_t),
&key_file, (table->keys+1)*sizeof(*key_file),
&key_type, (table->keys+1)*sizeof(u_int32_t),
&key_buff, max_key_length,
&key_buff2, max_key_length,
&primary_key_buff,
table->key_info[table->primary_key].key_length,
(hidden_primary_key ? 0 :
table->key_info[table->primary_key].key_length),
NullS)))
DBUG_RETURN(1);
if (!(rec_buff=my_malloc((alloced_rec_buff_length=table->reclength),
if (!(rec_buff=my_malloc((alloced_rec_buff_length=table->rec_buff_length),
MYF(MY_WME))))
{
my_free(alloc_ptr,MYF(0));
......@@ -298,8 +318,9 @@ int ha_berkeley::open(const char *name, int mode, uint test_if_locked)
DBUG_RETURN(1);
}
/* Open primary key */
file->set_bt_compare(file, berkeley_cmp_packed_key);
file->set_bt_compare(file,
(hidden_primary_key ? berkeley_cmp_hidden_key :
berkeley_cmp_packed_key));
if ((error=(file->open(file, fn_format(name_buff,name,"", ha_berkeley_ext,
2 | 4),
"main", DB_BTREE, open_mode,0))))
......@@ -314,26 +335,19 @@ int ha_berkeley::open(const char *name, int mode, uint test_if_locked)
info(HA_STATUS_NO_LOCK | HA_STATUS_VARIABLE | HA_STATUS_CONST);
transaction=0;
cursor=0;
key_read=0;
fixed_length_row=!(table->db_create_options & HA_OPTION_PACK_RECORD);
/* Open other keys */
bzero((char*) key_file,sizeof(*key_file)*table->keys);
if ((key_used_on_scan=primary_key=table->primary_key) < MAX_KEY)
key_file[primary_key]=file;
else // No primary key
{
hidden_primary_key=1;
if (!share->primary_key_inited)
update_auto_primary_key();
}
key_type[primary_key]=DB_NOOVERWRITE;
bzero((char*) &current_row,sizeof(current_row));
DB **ptr=key_file;
for (uint i=0, used_keys=0; i < table->keys ; i++, ptr++)
{
char part[7];
key_type[i]=table->key_info[i].flags & HA_NOSAME ? DB_NOOVERWRITE : 0;
if (i != primary_key)
{
if ((error=db_create(ptr, db_env, 0)))
......@@ -343,6 +357,7 @@ int ha_berkeley::open(const char *name, int mode, uint test_if_locked)
DBUG_RETURN(1);
}
sprintf(part,"key%02d",++used_keys);
key_type[i]=table->key_info[i].flags & HA_NOSAME ? DB_NOOVERWRITE : 0;
(*ptr)->set_bt_compare(*ptr, berkeley_cmp_packed_key);
if (!(table->key_info[i].flags & HA_NOSAME))
(*ptr)->set_flags(*ptr, DB_DUP);
......@@ -355,13 +370,10 @@ int ha_berkeley::open(const char *name, int mode, uint test_if_locked)
}
}
}
DBUG_RETURN(0);
}
void ha_berkeley::initialize(void)
{
/* Calculate pack_length of primary key */
if (!hidden_primary_key)
{
ref_length=0;
KEY_PART_INFO *key_part= table->key_info[primary_key].key_part;
KEY_PART_INFO *end=key_part+table->key_info[primary_key].key_parts;
......@@ -369,14 +381,23 @@ void ha_berkeley::initialize(void)
ref_length+= key_part->field->max_packed_col_length(key_part->length);
fixed_length_primary_key=
(ref_length == table->key_info[primary_key].key_length);
}
else
{
if (!share->primary_key_inited)
update_auto_primary_key();
}
DBUG_RETURN(0);
}
int ha_berkeley::close(void)
{
int error,result=0;
uint keys=table->keys + test(hidden_primary_key);
DBUG_ENTER("ha_berkeley::close");
for (uint i=0; i < table->keys; i++)
for (uint i=0; i < keys; i++)
{
if (key_file[i] && (error=key_file[i]->close(key_file[i],0)))
result=error;
......@@ -427,13 +448,20 @@ ulong ha_berkeley::max_row_length(const byte *buf)
pre-allocated.
*/
int ha_berkeley::pack_row(DBT *row, const byte *record)
int ha_berkeley::pack_row(DBT *row, const byte *record, bool new_row)
{
bzero((char*) row,sizeof(*row));
if (fixed_length_row)
{
row->data=(void*) record;
row->size=table->reclength;
row->size=table->reclength+hidden_primary_key;
if (hidden_primary_key)
{
if (new_row)
get_auto_primary_key(current_ident);
memcpy_fixed((char*) record+table->reclength, (char*) current_ident,
BDB_HIDDEN_PRIMARY_KEY_LENGTH);
}
return 0;
}
if (table->blob_fields)
......@@ -448,6 +476,15 @@ int ha_berkeley::pack_row(DBT *row, const byte *record)
for (Field **field=table->field ; *field ; field++)
ptr=(byte*) (*field)->pack((char*) ptr,record + (*field)->offset());
if (hidden_primary_key)
{
if (new_row)
get_auto_primary_key(current_ident);
memcpy_fixed((char*) ptr, (char*) current_ident,
BDB_HIDDEN_PRIMARY_KEY_LENGTH);
ptr+=BDB_HIDDEN_PRIMARY_KEY_LENGTH;
}
row->data=rec_buff;
row->size= (size_t) (ptr - rec_buff);
return 0;
......@@ -457,7 +494,7 @@ int ha_berkeley::pack_row(DBT *row, const byte *record)
void ha_berkeley::unpack_row(char *record, DBT *row)
{
if (fixed_length_row)
memcpy(record,row->data,table->reclength);
memcpy(record,(char*) row->data,table->reclength+hidden_primary_key);
else
{
/* Copy null bits */
......@@ -470,6 +507,37 @@ void ha_berkeley::unpack_row(char *record, DBT *row)
}
/* Store the key and the primary key into the row */
void ha_berkeley::unpack_key(char *record, DBT *key, uint index)
{
KEY *key_info=table->key_info+index;
KEY_PART_INFO *key_part= key_info->key_part,
*end=key_part+key_info->key_parts;
char *pos=(char*) key->data;
for ( ; key_part != end; key_part++)
{
if (key_part->null_bit)
{
if (!*pos++) // Null value
{
/*
We don't need to reset the record data as we will not access it
if the null data is set
*/
record[key_part->null_offset]|=key_part->null_bit;
continue;
}
record[key_part->null_offset]&= ~key_part->null_bit;
}
pos= (char*) key_part->field->unpack(record + key_part->field->offset(),
pos);
}
}
/*
Create a packed key from from a row
This will never fail as the key buffer is pre allocated.
......@@ -478,12 +546,20 @@ void ha_berkeley::unpack_row(char *record, DBT *row)
DBT *ha_berkeley::pack_key(DBT *key, uint keynr, char *buff,
const byte *record)
{
bzero((char*) key,sizeof(*key));
if (hidden_primary_key && keynr == primary_key)
{
key->data=current_ident;
key->size=BDB_HIDDEN_PRIMARY_KEY_LENGTH;
return key;
}
KEY *key_info=table->key_info+keynr;
KEY_PART_INFO *key_part=key_info->key_part;
KEY_PART_INFO *end=key_part+key_info->key_parts;
DBUG_ENTER("pack_key");
bzero((char*) key,sizeof(*key));
key->data=buff;
key->app_private= key_info;
......@@ -561,7 +637,7 @@ int ha_berkeley::write_row(byte * record)
update_timestamp(record+table->time_stamp-1);
if (table->next_number_field && record == table->record[0])
update_auto_increment();
if ((error=pack_row(&row, record)))
if ((error=pack_row(&row, record,1)))
DBUG_RETURN(error);
if (table->keys == 1)
......@@ -678,7 +754,7 @@ int ha_berkeley::update_primary_key(DB_TXN *trans, bool primary_key_changed,
pack_key(&old_key, primary_key, key_buff2, old_row);
if ((error=remove_key(trans, primary_key, old_row, (DBT *) 0, &old_key)))
DBUG_RETURN(error); // This should always succeed
if ((error=pack_row(&row, new_row)))
if ((error=pack_row(&row, new_row, 0)))
{
// Out of memory (this shouldn't happen!)
(void) file->put(file, trans, &old_key, &row,
......@@ -697,7 +773,7 @@ int ha_berkeley::update_primary_key(DB_TXN *trans, bool primary_key_changed,
else
{
// Primary key didn't change; just update the row data
if ((error=pack_row(&row, new_row)))
if ((error=pack_row(&row, new_row, 0)))
DBUG_RETURN(error);
error=file->put(file, trans, prim_key, &row, 0);
if (error)
......@@ -719,12 +795,24 @@ int ha_berkeley::update_row(const byte * old_row, byte * new_row)
statistic_increment(ha_update_count,&LOCK_status);
if (table->time_stamp)
update_timestamp(new_row+table->time_stamp-1);
if (hidden_primary_key)
{
primary_key_changed=0;
bzero((char*) &prim_key,sizeof(prim_key));
prim_key.data= (void*) current_ident;
prim_key.size=BDB_HIDDEN_PRIMARY_KEY_LENGTH;
old_prim_key=prim_key;
}
else
{
pack_key(&prim_key, primary_key, key_buff, new_row);
if ((primary_key_changed=key_cmp(primary_key, old_row, new_row)))
pack_key(&old_prim_key, primary_key, primary_key_buff, old_row);
else
old_prim_key=prim_key;
}
LINT_INIT(error);
for (uint retry=0 ; retry < berkeley_trans_retry ; retry++)
......@@ -869,9 +957,12 @@ int ha_berkeley::delete_row(const byte * record)
DBUG_ENTER("delete_row");
statistic_increment(ha_delete_count,&LOCK_status);
if ((error=pack_row(&row, record)))
if ((error=pack_row(&row, record, 0)))
DBUG_RETURN((error));
pack_key(&prim_key, primary_key, key_buff, record);
if (hidden_primary_key)
keys|= (key_map) 1 << primary_key;
for (uint retry=0 ; retry < berkeley_trans_retry ; retry++)
{
DB_TXN *sub_trans;
......@@ -934,7 +1025,7 @@ int ha_berkeley::index_end()
/* What to do after we have read a row based on an index */
int ha_berkeley::read_row(int error, char *buf, uint keynr, DBT *row,
bool read_next)
DBT *found_key, bool read_next)
{
DBUG_ENTER("read_row");
if (error)
......@@ -944,9 +1035,22 @@ int ha_berkeley::read_row(int error, char *buf, uint keynr, DBT *row,
table->status=STATUS_NOT_FOUND;
DBUG_RETURN(error);
}
if (hidden_primary_key)
memcpy_fixed(current_ident,
(char*) row->data+row->size-BDB_HIDDEN_PRIMARY_KEY_LENGTH,
BDB_HIDDEN_PRIMARY_KEY_LENGTH);
table->status=0;
if (keynr != primary_key)
{
/* We only found the primary key. Now we have to use this to find
the row data */
if (key_read && found_key)
{
unpack_key(buf,found_key,keynr);
if (!hidden_primary_key)
unpack_key(buf,row,primary_key);
DBUG_RETURN(0);
}
DBT key;
bzero((char*) &key,sizeof(key));
key.data=key_buff2;
......@@ -963,7 +1067,6 @@ int ha_berkeley::read_row(int error, char *buf, uint keynr, DBT *row,
row= &current_row;
}
unpack_row(buf,row);
table->status=0;
DBUG_RETURN(0);
}
......@@ -980,7 +1083,7 @@ int ha_berkeley::index_read_idx(byte * buf, uint keynr, const byte * key,
pack_key(&last_key, keynr, key_buff, key,
key_len),
&current_row,0),
buf, keynr, &current_row, 0));
buf, keynr, &current_row, &last_key, 0));
}
......@@ -999,7 +1102,7 @@ int ha_berkeley::index_read(byte * buf, const byte * key,
key_buff,
key, key_len),
&row, DB_SET),
buf, active_index, &row, 0);
buf, active_index, &row, (DBT*) 0, 0);
}
else
{
......@@ -1009,7 +1112,7 @@ int ha_berkeley::index_read(byte * buf, const byte * key,
memcpy(key_buff2, key_buff, last_key.size);
((KEY*) last_key.app_private)->handler.bdb_return_if_eq= -1;
error=read_row(cursor->c_get(cursor, &last_key, &row, DB_SET_RANGE),
buf, active_index, &row, 0);
buf, active_index, &row, (DBT*) 0, 0);
((KEY*) last_key.app_private)->handler.bdb_return_if_eq=0;
if (!error && find_flag == HA_READ_KEY_EXACT)
{
......@@ -1030,7 +1133,7 @@ int ha_berkeley::index_next(byte * buf)
statistic_increment(ha_read_next_count,&LOCK_status);
bzero((char*) &row,sizeof(row));
DBUG_RETURN(read_row(cursor->c_get(cursor, &last_key, &row, DB_NEXT),
buf, active_index, &row ,1));
buf, active_index, &row, &last_key, 1));
}
int ha_berkeley::index_next_same(byte * buf, const byte *key, uint keylen)
......@@ -1042,11 +1145,11 @@ int ha_berkeley::index_next_same(byte * buf, const byte *key, uint keylen)
bzero((char*) &row,sizeof(row));
if (keylen == table->key_info[active_index].key_length)
error=read_row(cursor->c_get(cursor, &last_key, &row, DB_NEXT_DUP),
buf, active_index, &row,1);
buf, active_index, &row, &last_key, 1);
else
{
error=read_row(cursor->c_get(cursor, &last_key, &row, DB_NEXT),
buf, active_index, &row,1);
buf, active_index, &row, &last_key, 1);
if (!error && ::key_cmp(table, key, active_index, keylen))
error=HA_ERR_END_OF_FILE;
}
......@@ -1061,7 +1164,7 @@ int ha_berkeley::index_prev(byte * buf)
statistic_increment(ha_read_prev_count,&LOCK_status);
bzero((char*) &row,sizeof(row));
DBUG_RETURN(read_row(cursor->c_get(cursor, &last_key, &row, DB_PREV),
buf, active_index, &row,1));
buf, active_index, &row, &last_key, 1));
}
......@@ -1072,7 +1175,7 @@ int ha_berkeley::index_first(byte * buf)
statistic_increment(ha_read_first_count,&LOCK_status);
bzero((char*) &row,sizeof(row));
DBUG_RETURN(read_row(cursor->c_get(cursor, &last_key, &row, DB_FIRST),
buf, active_index, &row,0));
buf, active_index, &row, &last_key, 0));
}
int ha_berkeley::index_last(byte * buf)
......@@ -1082,7 +1185,7 @@ int ha_berkeley::index_last(byte * buf)
statistic_increment(ha_read_last_count,&LOCK_status);
bzero((char*) &row,sizeof(row));
DBUG_RETURN(read_row(cursor->c_get(cursor, &last_key, &row, DB_LAST),
buf, active_index, &row,0));
buf, active_index, &row, &last_key, 0));
}
int ha_berkeley::rnd_init(bool scan)
......@@ -1103,7 +1206,7 @@ int ha_berkeley::rnd_next(byte *buf)
statistic_increment(ha_read_rnd_next_count,&LOCK_status);
bzero((char*) &row,sizeof(row));
DBUG_RETURN(read_row(cursor->c_get(cursor, &last_key, &row, DB_NEXT),
buf, active_index, &row, 1));
buf, active_index, &row, &last_key, 1));
}
......@@ -1136,12 +1239,17 @@ int ha_berkeley::rnd_pos(byte * buf, byte *pos)
return read_row(file->get(file, transaction,
get_pos(&db_pos, pos),
&current_row, 0),
buf, active_index, &current_row,0);
buf, active_index, &current_row, (DBT*) 0, 0);
}
void ha_berkeley::position(const byte *record)
{
DBT key;
if (hidden_primary_key)
{
memcpy_fixed(ref, (char*) current_ident, BDB_HIDDEN_PRIMARY_KEY_LENGTH);
}
else
pack_key(&key, primary_key, ref, record);
}
......@@ -1162,11 +1270,27 @@ void ha_berkeley::info(uint flag)
int ha_berkeley::extra(enum ha_extra_function operation)
{
switch (operation) {
case HA_EXTRA_RESET:
case HA_EXTRA_RESET_STATE:
key_read=0;
break;
case HA_EXTRA_KEYREAD:
key_read=1; // Query satisfied with key
break;
case HA_EXTRA_NO_KEYREAD:
key_read=0;
break;
default:
break;
}
return 0;
}
int ha_berkeley::reset(void)
{
key_read=0; // Reset to state after open
return 0;
}
......@@ -1280,6 +1404,7 @@ int ha_berkeley::create(const char *name, register TABLE *form,
{
char name_buff[FN_REFLEN];
char part[7];
uint index=1;
DBUG_ENTER("ha_berkeley::create");
fn_format(name_buff,name,"", ha_berkeley_ext,2 | 4);
......@@ -1288,15 +1413,19 @@ int ha_berkeley::create(const char *name, register TABLE *form,
if (create_sub_table(name_buff,"main",DB_BTREE,0))
DBUG_RETURN(1);
primary_key=table->primary_key;
/* Create the keys */
for (uint i=1; i < form->keys; i++)
for (uint i=0; i < form->keys; i++)
{
if (i != primary_key)
{
sprintf(part,"key%02d",i);
sprintf(part,"key%02d",index++);
if (create_sub_table(name_buff, part, DB_BTREE,
(table->key_info[i].flags & HA_NOSAME) ? 0 :
DB_DUP))
DBUG_RETURN(1);
}
}
/* Create the status block to save information from last status command */
/* Is DB_BTREE the best option here ? (QUEUE can't be used in sub tables) */
......@@ -1403,7 +1532,7 @@ static BDB_SHARE *get_share(const char *table_name)
return 0;
}
thr_lock_init(&share->lock);
pthread_mutex_init(&share->mutex);
pthread_mutex_init(&share->mutex,NULL);
}
}
share->use_count++;
......@@ -1427,17 +1556,17 @@ static void free_share(BDB_SHARE *share)
void ha_berkeley::update_auto_primary_key()
{
(void) extra(HA_EXTRA_KEYREAD);
pthread_mutex_lock(&share->mutex);
if (!share->primary_key_inited)
{
(void) extra(HA_EXTRA_KEYREAD);
index_init(primary_key);
if (!index_last(table->record[1]))
share->auto_ident=current_ident;
share->auto_ident=uint5korr(current_ident);
index_end();
(void) extra(HA_EXTRA_NO_KEYREAD);
}
pthread_mutex_unlock(&share->mutex);
(void) extra(HA_EXTRA_NO_KEYREAD);
}
#endif /* HAVE_BERKELEY_DB */
......@@ -23,15 +23,18 @@
#include <db.h>
#define BDB_HIDDEN_PRIMARY_KEY_LENGTH 5
typedef struct st_berkeley_share {
ulonglong auto_ident;
THR_LOCK lock;
pthread_mutex_t mutex;
char *table_name;
uint table_name_length,use_count;
my_bool inited;
bool primary_key_inited;
} BDB_SHARE;
class ha_berkeley: public handler
{
THR_LOCK_DATA lock;
......@@ -46,13 +49,15 @@ class ha_berkeley: public handler
BDB_SHARE *share;
ulong int_option_flag;
ulong alloced_rec_buff_length;
uint primary_key,last_dup_key;
bool fixed_length_row, fixed_length_primary_key, hidden_primary_key;
uint primary_key,last_dup_key, hidden_primary_key;
bool fixed_length_row, fixed_length_primary_key, key_read;
bool fix_rec_buff_for_blob(ulong length);
byte current_ident[BDB_HIDDEN_PRIMARY_KEY_LENGTH];
ulong max_row_length(const byte *buf);
int pack_row(DBT *row,const byte *record);
int pack_row(DBT *row,const byte *record, bool new_row);
void unpack_row(char *record, DBT *row);
void ha_berkeley::unpack_key(char *record, DBT *key, uint index);
DBT *pack_key(DBT *key, uint keynr, char *buff, const byte *record);
DBT *pack_key(DBT *key, uint keynr, char *buff, const byte *key_ptr,
uint key_length);
......@@ -64,7 +69,7 @@ class ha_berkeley: public handler
int update_primary_key(DB_TXN *trans, bool primary_key_changed,
const byte * old_row, const byte * new_row,
DBT *prim_key);
int read_row(int error, char *buf, uint keynr, DBT *row, bool);
int read_row(int error, char *buf, uint keynr, DBT *row, DBT *key, bool);
DBT *get_pos(DBT *to, byte *pos);
public:
......@@ -72,9 +77,8 @@ class ha_berkeley: public handler
int_option_flag(HA_READ_NEXT | HA_READ_PREV |
HA_REC_NOT_IN_SEQ |
HA_KEYPOS_TO_RNDPOS | HA_READ_ORDER | HA_LASTKEY_ORDER |
HA_LONGLONG_KEYS | HA_NULL_KEY |
HA_BLOB_KEY |
HA_REQUIRE_PRIMARY_KEY | HA_NOT_EXACT_COUNT |
HA_LONGLONG_KEYS | HA_NULL_KEY | HA_HAVE_KEY_READ_ONLY |
HA_BLOB_KEY | HA_NOT_EXACT_COUNT |
HA_PRIMARY_KEY_IN_READ_INDEX | HA_DROP_BEFORE_CREATE),
last_dup_key((uint) -1)
{
......@@ -84,14 +88,14 @@ class ha_berkeley: public handler
const char **bas_ext() const;
ulong option_flag() const { return int_option_flag; }
uint max_record_length() const { return HA_MAX_REC_LENGTH; }
uint max_keys() const { return MAX_KEY; }
uint max_keys() const { return MAX_KEY-1; }
uint max_key_parts() const { return MAX_REF_PARTS; }
uint max_key_length() const { return MAX_KEY_LENGTH; }
uint extra_rec_buf_length() { return BDB_HIDDEN_PRIMARY_KEY_LENGTH; }
bool fast_key_read() { return 1;}
bool has_transactions() { return 1;}
int open(const char *name, int mode, uint test_if_locked);
void initialize(void);
int close(void);
double scan_time();
int write_row(byte * buf);
......@@ -129,6 +133,16 @@ class ha_berkeley: public handler
int delete_table(const char *name);
THR_LOCK_DATA **store_lock(THD *thd, THR_LOCK_DATA **to,
enum thr_lock_type lock_type);
void update_auto_primary_key();
inline void get_auto_primary_key(byte *to)
{
ulonglong tmp;
pthread_mutex_lock(&share->mutex);
share->auto_ident++;
int5store(to,share->auto_ident);
pthread_mutex_unlock(&share->mutex);
}
};
extern bool berkeley_skip;
......
......@@ -202,6 +202,7 @@ class handler :public Sql_alloc
virtual double read_time(ha_rows rows) { return rows; }
virtual bool fast_key_read() { return 0;}
virtual bool has_transactions(){ return 0;}
virtual uint extra_rec_buf_length() { return 0; }
virtual int index_init(uint idx) { active_index=idx; return 0;}
virtual int index_end() {return 0; }
......
......@@ -2347,6 +2347,7 @@ bool QUICK_SELECT::unique_key_range()
QUICK_SELECT *get_quick_select_for_ref(TABLE *table, TABLE_REF *ref)
{
table->file->index_end(); // Remove old cursor
QUICK_SELECT *quick=new QUICK_SELECT(table, ref->key, 1);
KEY *key_info = &table->key_info[ref->key];
KEY_PART *key_part;
......
......@@ -1413,11 +1413,17 @@ Field *find_field_in_table(THD *thd,TABLE *table,const char *name,uint length,
if (field->query_id != thd->query_id)
{
field->query_id=thd->query_id;
field->table->used_fields++;
table->used_fields++;
if (field->part_of_key)
{
if (!(field->part_of_key & table->ref_primary_key))
table->used_keys&=field->part_of_key;
}
else
table->used_keys=0;
}
else
thd->dupp_field=field;
field->table->used_keys&=field->part_of_key;
}
if (check_grants && !thd->master_access && check_grant_column(thd,table,name,length))
return WRONG_GRANT;
......@@ -1659,19 +1665,19 @@ static bool
insert_fields(THD *thd,TABLE_LIST *tables, const char *table_name,
List_iterator<Item> *it)
{
TABLE_LIST *table;
uint found;
DBUG_ENTER("insert_fields");
found=0;
for (table=tables ; table ; table=table->next)
for (; tables ; tables=tables->next)
{
TABLE *table=tables->table;
if (grant_option && !thd->master_access &&
check_grant_all_columns(thd,SELECT_ACL,table->table) )
check_grant_all_columns(thd,SELECT_ACL,table) )
DBUG_RETURN(-1);
if (!table_name || !strcmp(table_name,table->name))
if (!table_name || !strcmp(table_name,tables->name))
{
Field **ptr=table->table->field,*field;
Field **ptr=table->field,*field;
while ((field = *ptr++))
{
Item_field *item= new Item_field(field);
......@@ -1682,10 +1688,17 @@ insert_fields(THD *thd,TABLE_LIST *tables, const char *table_name,
if (field->query_id == thd->query_id)
thd->dupp_field=field;
field->query_id=thd->query_id;
field->table->used_keys&=field->part_of_key;
if (field->part_of_key)
{
if (!(field->part_of_key & table->ref_primary_key))
table->used_keys&=field->part_of_key;
}
else
table->used_keys=0;
}
/* All fields are used */
table->table->used_fields=table->table->fields;
table->used_fields=table->fields;
}
}
if (!found)
......@@ -1750,6 +1763,7 @@ int setup_conds(THD *thd,TABLE_LIST *tables,COND **conds)
// TODO: This could be optimized to use hashed names if t2 had a hash
for (j=0 ; j < t2->fields ; j++)
{
key_map tmp_map;
if (!my_strcasecmp(t1->field[i]->field_name,
t2->field[j]->field_name))
{
......@@ -1760,8 +1774,20 @@ int setup_conds(THD *thd,TABLE_LIST *tables,COND **conds)
tmp->fix_length_and_dec(); // Update cmp_type
tmp->const_item_cache=0;
cond_and->list.push_back(tmp);
t1->used_keys&= t1->field[i]->part_of_key;
t2->used_keys&= t2->field[j]->part_of_key;
if ((tmp_map=t1->field[i]->part_of_key))
{
if (!(tmp_map & t1->ref_primary_key))
t1->used_keys&=tmp_map;
}
else
t1->used_keys=0;
if ((tmp_map=t2->field[j]->part_of_key))
{
if (!(tmp_map & t2->ref_primary_key))
t2->used_keys&=tmp_map;
}
else
t2->used_keys=0;
break;
}
}
......
......@@ -2352,6 +2352,11 @@ make_join_readinfo(JOIN *join,uint options)
break;
case JT_EQ_REF:
table->status=STATUS_NO_RECORD;
if (tab->select)
{
delete tab->select->quick;
tab->select->quick=0;
}
delete tab->quick;
tab->quick=0;
table->file->index_init(tab->ref.key);
......@@ -2365,6 +2370,11 @@ make_join_readinfo(JOIN *join,uint options)
break;
case JT_REF:
table->status=STATUS_NO_RECORD;
if (tab->select)
{
delete tab->select->quick;
tab->select->quick=0;
}
delete tab->quick;
tab->quick=0;
table->file->index_init(tab->ref.key);
......
......@@ -207,49 +207,29 @@ int openfrm(const char *name, const char *alias, uint db_stat, uint prgflag,
}
#endif
/* Allocate handler */
if (!(outparam->file= get_new_handler(outparam,outparam->db_type)))
goto err_not_open;
error=2;
if (db_stat)
{
int err;
if ((err=(outparam->file->
ha_open(index_file,
(db_stat & HA_READ_ONLY ? O_RDONLY : O_RDWR),
(db_stat & HA_OPEN_TEMPORARY ? HA_OPEN_TMP_TABLE :
((db_stat & HA_WAIT_IF_LOCKED) ||
(specialflag & SPECIAL_WAIT_IF_LOCKED)) ?
HA_OPEN_WAIT_IF_LOCKED :
(db_stat & (HA_ABORT_IF_LOCKED | HA_GET_INFO)) ?
HA_OPEN_ABORT_IF_LOCKED :
HA_OPEN_IGNORE_IF_LOCKED) | ha_open_flags))))
{
/* Set a flag if the table is crashed and it can be auto. repaired */
outparam->crashed=(err == HA_ERR_CRASHED &&
outparam->file->auto_repair() &&
!(ha_open_flags & HA_OPEN_FOR_REPAIR));
goto err_not_open; /* purecov: inspected */
}
}
outparam->db_low_byte_first=outparam->file->low_byte_first();
error=4;
outparam->reginfo.lock_type= TL_UNLOCK;
outparam->current_lock=F_UNLCK;
if (db_stat & HA_OPEN_KEYFILE || (prgflag & DELAYED_OPEN)) records=2;
if ((db_stat & HA_OPEN_KEYFILE) || (prgflag & DELAYED_OPEN)) records=2;
else records=1;
if (prgflag & (READ_ALL+EXTRA_RECORD)) records++;
rec_buff_length=ALIGN_SIZE(outparam->reclength+1);
/* QQ: TODO, remove the +1 from below */
rec_buff_length=ALIGN_SIZE(outparam->reclength+1+
outparam->file->extra_rec_buf_length());
if (!(outparam->record[0]= (byte*)
(record = (char *) alloc_root(&outparam->mem_root,
rec_buff_length * records))))
goto err; /* purecov: inspected */
goto err_not_open; /* purecov: inspected */
record[outparam->reclength]=0; // For purify and ->c_ptr()
outparam->rec_buff_length=rec_buff_length;
if (my_pread(file,(byte*) record,(uint) outparam->reclength,
(ulong) (uint2korr(head+6)+uint2korr(head+14)),
MYF(MY_NABP)))
goto err; /* purecov: inspected */
goto err_not_open; /* purecov: inspected */
for (i=0 ; i < records ; i++, record+=rec_buff_length)
{
outparam->record[i]=(byte*) record;
......@@ -265,12 +245,12 @@ int openfrm(const char *name, const char *alias, uint db_stat, uint prgflag,
}
VOID(my_seek(file,pos,MY_SEEK_SET,MYF(0)));
if (my_read(file,(byte*) head,288,MYF(MY_NABP))) goto err;
if (my_read(file,(byte*) head,288,MYF(MY_NABP))) goto err_not_open;
if (crypted)
{
crypted->decode((char*) head+256,288-256);
if (sint2korr(head+284) != 0) // Should be 0
goto err; // Wrong password
goto err_not_open; // Wrong password
}
outparam->fields= uint2korr(head+258);
......@@ -292,13 +272,13 @@ int openfrm(const char *name, const char *alias, uint db_stat, uint prgflag,
(outparam->fields+interval_parts+
keys+3)*sizeof(my_string)+
(n_length+int_length)))))
goto err; /* purecov: inspected */
goto err_not_open; /* purecov: inspected */
outparam->field=field_ptr;
read_length=((uint) (outparam->fields*11)+pos+
(uint) (n_length+int_length));
if (read_string(file,(gptr*) &disk_buff,read_length))
goto err; /* purecov: inspected */
goto err_not_open; /* purecov: inspected */
if (crypted)
{
crypted->decode((char*) disk_buff,read_length);
......@@ -321,6 +301,7 @@ int openfrm(const char *name, const char *alias, uint db_stat, uint prgflag,
if (keynames)
fix_type_pointers(&int_array,&outparam->keynames,1,&keynames);
VOID(my_close(file,MYF(MY_WME)));
file= -1;
record=(char*) outparam->record[0]-1; /* Fieldstart = 1 */
if (null_field_first)
......@@ -426,7 +407,7 @@ int openfrm(const char *name, const char *alias, uint db_stat, uint prgflag,
(uint) key_part->length);
#ifdef EXTRA_DEBUG
if (key_part->fieldnr > outparam->fields)
goto err; // sanity check
goto err_not_open; // sanity check
#endif
if (key_part->fieldnr)
{ // Should always be true !
......@@ -494,6 +475,12 @@ int openfrm(const char *name, const char *alias, uint db_stat, uint prgflag,
(outparam->keys_in_use & ((key_map) 1 << primary_key)))
{
outparam->primary_key=primary_key;
if (outparam->file->option_flag() & HA_PRIMARY_KEY_IN_READ_INDEX)
outparam->ref_primary_key= (key_map) 1 << primary_key;
/*
If we are using an integer as the primary key then allow the user to
refer to it as '_rowid'
*/
if (outparam->key_info[primary_key].key_parts == 1)
{
Field *field= outparam->key_info[primary_key].key_part[0].field;
......@@ -505,6 +492,7 @@ int openfrm(const char *name, const char *alias, uint db_stat, uint prgflag,
outparam->primary_key = MAX_KEY; // we do not have a primary key
}
x_free((gptr) disk_buff);
disk_buff=0;
if (new_field_pack_flag <= 1)
{ /* Old file format with default null */
uint null_length=(outparam->null_fields+7)/8;
......@@ -523,7 +511,7 @@ int openfrm(const char *name, const char *alias, uint db_stat, uint prgflag,
(Field_blob**) alloc_root(&outparam->mem_root,
(uint) (outparam->blob_fields+1)*
sizeof(Field_blob*))))
goto err;
goto err_not_open;
for (ptr=outparam->field ; *ptr ; ptr++)
{
if ((*ptr)->flags & BLOB_FLAG)
......@@ -535,21 +523,42 @@ int openfrm(const char *name, const char *alias, uint db_stat, uint prgflag,
outparam->blob_field=
(Field_blob**) (outparam->field+outparam->fields); // Point at null ptr
/* The table struct is now initialzed; Open the table */
error=2;
if (db_stat)
{
int err;
if ((err=(outparam->file->
ha_open(index_file,
(db_stat & HA_READ_ONLY ? O_RDONLY : O_RDWR),
(db_stat & HA_OPEN_TEMPORARY ? HA_OPEN_TMP_TABLE :
((db_stat & HA_WAIT_IF_LOCKED) ||
(specialflag & SPECIAL_WAIT_IF_LOCKED)) ?
HA_OPEN_WAIT_IF_LOCKED :
(db_stat & (HA_ABORT_IF_LOCKED | HA_GET_INFO)) ?
HA_OPEN_ABORT_IF_LOCKED :
HA_OPEN_IGNORE_IF_LOCKED) | ha_open_flags))))
{
/* Set a flag if the table is crashed and it can be auto. repaired */
outparam->crashed=(err == HA_ERR_CRASHED &&
outparam->file->auto_repair() &&
!(ha_open_flags & HA_OPEN_FOR_REPAIR));
goto err_not_open; /* purecov: inspected */
}
}
outparam->db_low_byte_first=outparam->file->low_byte_first();
my_pthread_setspecific_ptr(THR_MALLOC,old_root);
opened_tables++;
#ifndef DBUG_OFF
if (use_hash)
(void) hash_check(&outparam->name_hash);
#endif
if (db_stat)
outparam->file->initialize();
DBUG_RETURN (0);
err:
if (outparam->file && db_stat)
(void) outparam->file->close();
err_not_open:
x_free((gptr) disk_buff);
if (file > 0)
VOID(my_close(file,MYF(MY_WME)));
err_end: /* Here when no file */
......
......@@ -113,7 +113,7 @@ struct st_table {
byte *record_pointers; /* If sorted in memory */
ha_rows found_records; /* How many records in sort */
ORDER *group;
key_map quick_keys,used_keys;
key_map quick_keys, used_keys, ref_primary_key;
ha_rows quick_rows[MAX_KEY];
uint quick_key_parts[MAX_KEY];
key_part_map const_key_parts[MAX_KEY];
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment