Commit a9428621 authored by Rusty Russell's avatar Rusty Russell

tdb2: try to fit transactions in existing space before we expand.

Currently we use the worst-case-possible size for the recovery area.
Instead, prepare the recovery data, then see whether it's too large.

Note that this currently works out to make the database *larger* on
our speed benchmark, since we happen to need to enlarge the recovery
area at the wrong time now, rather than the old case where its already
hugely oversized.

Before:
$ time ./growtdb-bench 250000 10 > /dev/null && ls -l /tmp/growtdb.tdb && time ./tdbtorture -s 0 && ls -l torture.tdb && ./speed --transaction 2000000
real	0m50.366s
user	0m17.109s
sys	0m2.468s
-rw------- 1 rusty rusty 564215952 2011-04-27 21:31 /tmp/growtdb.tdb
testing with 3 processes, 5000 loops, seed=0
OK

real	1m23.818s
user	0m0.304s
sys	0m0.508s
-rw------- 1 rusty rusty 669856 2011-04-27 21:32 torture.tdb
Adding 2000000 records:  887 ns (110556088 bytes)
Finding 2000000 records:  556 ns (110556088 bytes)
Missing 2000000 records:  385 ns (110556088 bytes)
Traversing 2000000 records:  401 ns (110556088 bytes)
Deleting 2000000 records:  710 ns (244003768 bytes)
Re-adding 2000000 records:  825 ns (244003768 bytes)
Appending 2000000 records:  1255 ns (268404160 bytes)
Churning 2000000 records:  2299 ns (268404160 bytes)

After:
$ time ./growtdb-bench 250000 10 > /dev/null && ls -l /tmp/growtdb.tdb && time ./tdbtorture -s 0 && ls -l torture.tdb && ./speed --transaction 2000000
real	0m47.127s
user	0m17.125s
sys	0m2.456s
-rw------- 1 rusty rusty 366680288 2011-04-27 21:34 /tmp/growtdb.tdb
testing with 3 processes, 5000 loops, seed=0
OK

real	1m16.049s
user	0m0.300s
sys	0m0.492s
-rw------- 1 rusty rusty 244472 2011-04-27 21:35 torture.tdb
Adding 2000000 records:  894 ns (110551992 bytes)
Finding 2000000 records:  564 ns (110551992 bytes)
Missing 2000000 records:  398 ns (110551992 bytes)
Traversing 2000000 records:  399 ns (110551992 bytes)
Deleting 2000000 records:  711 ns (225633208 bytes)
Re-adding 2000000 records:  819 ns (225633208 bytes)
Appending 2000000 records:  1252 ns (248196544 bytes)
Churning 2000000 records:  2319 ns (424005056 bytes)
parent cfc7d301
...@@ -222,7 +222,7 @@ static inline unsigned frec_ftable(const struct tdb_free_record *f) ...@@ -222,7 +222,7 @@ static inline unsigned frec_ftable(const struct tdb_free_record *f)
struct tdb_recovery_record { struct tdb_recovery_record {
uint64_t magic; uint64_t magic;
/* Length of record. */ /* Length of record (add this header to get total length). */
uint64_t max_len; uint64_t max_len;
/* Length used. */ /* Length used. */
uint64_t len; uint64_t len;
......
...@@ -88,7 +88,6 @@ ...@@ -88,7 +88,6 @@
fsync/msync calls are made. fsync/msync calls are made.
*/ */
/* /*
hold the context of any current transaction hold the context of any current transaction
*/ */
...@@ -608,7 +607,7 @@ void tdb_transaction_cancel(struct tdb_context *tdb) ...@@ -608,7 +607,7 @@ void tdb_transaction_cancel(struct tdb_context *tdb)
} }
/* /*
work out how much space the linearised recovery data will consume work out how much space the linearised recovery data will consume (worst case)
*/ */
static tdb_len_t tdb_recovery_size(struct tdb_context *tdb) static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
{ {
...@@ -634,129 +633,38 @@ static tdb_len_t tdb_recovery_size(struct tdb_context *tdb) ...@@ -634,129 +633,38 @@ static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
return recovery_size; return recovery_size;
} }
/* static enum TDB_ERROR tdb_recovery_area(struct tdb_context *tdb,
allocate the recovery area, or use an existing recovery area if it is const struct tdb_methods *methods,
large enough
*/
static enum TDB_ERROR tdb_recovery_allocate(struct tdb_context *tdb,
tdb_len_t *recovery_size,
tdb_off_t *recovery_offset, tdb_off_t *recovery_offset,
tdb_len_t *recovery_max_size) struct tdb_recovery_record *rec)
{ {
struct tdb_recovery_record rec;
const struct tdb_methods *methods = tdb->transaction->io_methods;
tdb_off_t recovery_head;
size_t addition;
enum TDB_ERROR ecode; enum TDB_ERROR ecode;
recovery_head = tdb_read_off(tdb, offsetof(struct tdb_header,recovery)); *recovery_offset = tdb_read_off(tdb,
if (TDB_OFF_IS_ERR(recovery_head)) { offsetof(struct tdb_header, recovery));
return tdb_logerr(tdb, recovery_head, TDB_LOG_ERROR, if (TDB_OFF_IS_ERR(*recovery_offset)) {
"tdb_recovery_allocate:" return *recovery_offset;
" failed to read recovery head");
} }
if (recovery_head != 0) { if (*recovery_offset == 0) {
ecode = methods->tread(tdb, recovery_head, &rec, sizeof(rec)); rec->max_len = 0;
if (ecode != TDB_SUCCESS) {
return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
"tdb_recovery_allocate:"
" failed to read recovery record");
}
tdb_convert(tdb, &rec, sizeof(rec));
/* ignore invalid recovery regions: can happen in crash */
if (rec.magic != TDB_RECOVERY_MAGIC &&
rec.magic != TDB_RECOVERY_INVALID_MAGIC) {
recovery_head = 0;
}
}
*recovery_size = tdb_recovery_size(tdb);
if (recovery_head != 0 && *recovery_size <= rec.max_len) {
/* it fits in the existing area */
*recovery_max_size = rec.max_len;
*recovery_offset = recovery_head;
return TDB_SUCCESS; return TDB_SUCCESS;
} }
/* we need to free up the old recovery area, then allocate a ecode = methods->tread(tdb, *recovery_offset, rec, sizeof(*rec));
new one at the end of the file. Note that we cannot use if (ecode != TDB_SUCCESS)
normal allocation to allocate the new one as that might return return ecode;
us an area that is being currently used (as of the start of
the transaction) */
if (recovery_head != 0) {
tdb->stats.frees++;
ecode = add_free_record(tdb, recovery_head,
sizeof(rec) + rec.max_len,
TDB_LOCK_WAIT, true);
if (ecode != TDB_SUCCESS) {
return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
"tdb_recovery_allocate:"
" failed to free previous"
" recovery area");
}
}
/* the tdb_free() call might have increased the recovery size */
*recovery_size = tdb_recovery_size(tdb);
/* round up to a multiple of page size. Overallocate, since each
* such allocation forces us to expand the file. */
*recovery_max_size
= (((sizeof(rec) + *recovery_size + *recovery_size / 2)
+ PAGESIZE-1) & ~(PAGESIZE-1))
- sizeof(rec);
*recovery_offset = tdb->file->map_size;
recovery_head = *recovery_offset;
/* Restore ->map_size before calling underlying expand_file.
Also so that we don't try to expand the file again in the
transaction commit, which would destroy the recovery
area */
addition = (tdb->file->map_size - tdb->transaction->old_map_size) +
sizeof(rec) + *recovery_max_size;
tdb->file->map_size = tdb->transaction->old_map_size;
ecode = methods->expand_file(tdb, addition);
if (ecode != TDB_SUCCESS) {
return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
"tdb_recovery_allocate:"
" failed to create recovery area");
}
/* we have to reset the old map size so that we don't try to
expand the file again in the transaction commit, which
would destroy the recovery area */
tdb->transaction->old_map_size = tdb->file->map_size;
/* write the recovery header offset and sync - we can sync without a race here tdb_convert(tdb, rec, sizeof(*rec));
as the magic ptr in the recovery record has not been set */ /* ignore invalid recovery regions: can happen in crash */
tdb_convert(tdb, &recovery_head, sizeof(recovery_head)); if (rec->magic != TDB_RECOVERY_MAGIC &&
ecode = methods->twrite(tdb, offsetof(struct tdb_header, recovery), rec->magic != TDB_RECOVERY_INVALID_MAGIC) {
&recovery_head, sizeof(tdb_off_t)); *recovery_offset = 0;
if (ecode != TDB_SUCCESS) { rec->max_len = 0;
return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
"tdb_recovery_allocate:"
" failed to write recovery head");
} }
transaction_write_existing(tdb, offsetof(struct tdb_header, recovery),
&recovery_head,
sizeof(tdb_off_t));
return TDB_SUCCESS; return TDB_SUCCESS;
} }
/* Set up header for the recovery record. */
static void set_recovery_header(struct tdb_recovery_record *rec,
uint64_t magic,
uint64_t datalen, uint64_t actuallen,
uint64_t oldsize)
{
rec->magic = magic;
rec->max_len = actuallen;
rec->len = datalen;
rec->eof = oldsize;
}
static unsigned int same(const unsigned char *new, static unsigned int same(const unsigned char *new,
const unsigned char *old, const unsigned char *old,
unsigned int length) unsigned int length)
...@@ -795,44 +703,27 @@ static unsigned int different(const unsigned char *new, ...@@ -795,44 +703,27 @@ static unsigned int different(const unsigned char *new,
return length - *samelen; return length - *samelen;
} }
/* /* Allocates recovery blob, without tdb_recovery_record at head set up. */
setup the recovery data that will be used on a crash during commit static struct tdb_recovery_record *alloc_recovery(struct tdb_context *tdb,
*/ tdb_len_t *len)
static enum TDB_ERROR transaction_setup_recovery(struct tdb_context *tdb,
tdb_off_t *magic_offset)
{ {
/* Initialized for GCC's 4.4.5 overzealous uninitialized warnings. */
tdb_len_t recovery_size = 0;
tdb_off_t recovery_offset = 0, recovery_max_size = 0;
unsigned char *data, *p;
const struct tdb_methods *methods = tdb->transaction->io_methods;
struct tdb_recovery_record *rec; struct tdb_recovery_record *rec;
tdb_off_t old_map_size = tdb->transaction->old_map_size; size_t i;
uint64_t magic;
int i;
enum TDB_ERROR ecode; enum TDB_ERROR ecode;
unsigned char *p;
const struct tdb_methods *methods = tdb->transaction->io_methods;
/* rec = malloc(sizeof(*rec) + tdb_recovery_size(tdb));
check that the recovery area has enough space if (!rec) {
*/ tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
ecode = tdb_recovery_allocate(tdb, &recovery_size,
&recovery_offset, &recovery_max_size);
if (ecode != TDB_SUCCESS) {
return ecode;
}
data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
if (data == NULL) {
return tdb_logerr(tdb, TDB_ERR_OOM, TDB_LOG_ERROR,
"transaction_setup_recovery:" "transaction_setup_recovery:"
" cannot allocate"); " cannot allocate");
return TDB_ERR_PTR(TDB_ERR_OOM);
} }
rec = (struct tdb_recovery_record *)data;
/* build the recovery data into a single blob to allow us to do a single /* build the recovery data into a single blob to allow us to do a single
large write, which should be more efficient */ large write, which should be more efficient */
p = data + sizeof(*rec); p = (unsigned char *)(rec + 1);
for (i=0;i<tdb->transaction->num_blocks;i++) { for (i=0;i<tdb->transaction->num_blocks;i++) {
tdb_off_t offset; tdb_off_t offset;
tdb_len_t length; tdb_len_t length;
...@@ -849,25 +740,26 @@ static enum TDB_ERROR transaction_setup_recovery(struct tdb_context *tdb, ...@@ -849,25 +740,26 @@ static enum TDB_ERROR transaction_setup_recovery(struct tdb_context *tdb,
length = tdb->transaction->last_block_size; length = tdb->transaction->last_block_size;
} }
if (offset >= old_map_size) { if (offset >= tdb->transaction->old_map_size) {
continue; continue;
} }
if (offset + length > tdb->file->map_size) { if (offset + length > tdb->file->map_size) {
free(data); free(rec);
return tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_LOG_ERROR, tdb_logerr(tdb, TDB_ERR_CORRUPT, TDB_LOG_ERROR,
"tdb_transaction_setup_recovery:" "tdb_transaction_setup_recovery:"
" transaction data over new region" " transaction data over new region"
" boundary"); " boundary");
return TDB_ERR_PTR(TDB_ERR_CORRUPT);
} }
if (offset + length > old_map_size) { if (offset + length > tdb->transaction->old_map_size) {
/* Short read at EOF. */ /* Short read at EOF. */
length = old_map_size - offset; length = tdb->transaction->old_map_size - offset;
} }
ecode = methods->tread(tdb, offset, buffer, length); ecode = methods->tread(tdb, offset, buffer, length);
if (ecode != TDB_SUCCESS) { if (ecode != TDB_SUCCESS) {
free(data); free(rec);
return ecode; return TDB_ERR_PTR(ecode);
} }
/* Skip over anything the same at the start. */ /* Skip over anything the same at the start. */
...@@ -894,49 +786,160 @@ static enum TDB_ERROR transaction_setup_recovery(struct tdb_context *tdb, ...@@ -894,49 +786,160 @@ static enum TDB_ERROR transaction_setup_recovery(struct tdb_context *tdb,
} }
} }
/* Now we know size, set up rec header. */ *len = p - (unsigned char *)(rec + 1);
set_recovery_header(rec, TDB_RECOVERY_INVALID_MAGIC, return rec;
p - data - sizeof(*rec), }
recovery_max_size, old_map_size);
tdb_convert(tdb, rec, sizeof(*rec)); static tdb_off_t create_recovery_area(struct tdb_context *tdb,
tdb_len_t rec_length,
struct tdb_recovery_record *rec)
{
tdb_off_t off, recovery_off;
tdb_len_t addition;
enum TDB_ERROR ecode;
const struct tdb_methods *methods = tdb->transaction->io_methods;
/* round up to a multiple of page size. Overallocate, since each
* such allocation forces us to expand the file. */
rec->max_len
= (((sizeof(*rec) + rec_length + rec_length / 2)
+ PAGESIZE-1) & ~(PAGESIZE-1))
- sizeof(*rec);
off = tdb->file->map_size;
/* Restore ->map_size before calling underlying expand_file.
Also so that we don't try to expand the file again in the
transaction commit, which would destroy the recovery
area */
addition = (tdb->file->map_size - tdb->transaction->old_map_size) +
sizeof(*rec) + rec->max_len;
tdb->file->map_size = tdb->transaction->old_map_size;
ecode = methods->expand_file(tdb, addition);
if (ecode != TDB_SUCCESS) {
return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
"tdb_recovery_allocate:"
" failed to create recovery area");
}
/* we have to reset the old map size so that we don't try to
expand the file again in the transaction commit, which
would destroy the recovery area */
tdb->transaction->old_map_size = tdb->file->map_size;
/* write the recovery header offset and sync - we can sync without a race here
as the magic ptr in the recovery record has not been set */
recovery_off = off;
tdb_convert(tdb, &recovery_off, sizeof(recovery_off));
ecode = methods->twrite(tdb, offsetof(struct tdb_header, recovery),
&recovery_off, sizeof(tdb_off_t));
if (ecode != TDB_SUCCESS) {
return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
"tdb_recovery_allocate:"
" failed to write recovery head");
}
transaction_write_existing(tdb, offsetof(struct tdb_header, recovery),
&recovery_off,
sizeof(tdb_off_t));
return off;
}
/*
setup the recovery data that will be used on a crash during commit
*/
static enum TDB_ERROR transaction_setup_recovery(struct tdb_context *tdb)
{
tdb_len_t recovery_size = 0;
tdb_off_t recovery_off = 0;
tdb_off_t old_map_size = tdb->transaction->old_map_size;
struct tdb_recovery_record *recovery;
const struct tdb_methods *methods = tdb->transaction->io_methods;
uint64_t magic;
enum TDB_ERROR ecode;
recovery = alloc_recovery(tdb, &recovery_size);
if (TDB_PTR_IS_ERR(recovery))
return TDB_PTR_ERR(recovery);
ecode = tdb_recovery_area(tdb, methods, &recovery_off, recovery);
if (ecode) {
free(recovery);
return ecode;
}
if (recovery->max_len < recovery_size) {
/* Not large enough. Free up old recovery area. */
if (recovery_off) {
tdb->stats.frees++;
ecode = add_free_record(tdb, recovery_off,
sizeof(*recovery)
+ recovery->max_len,
TDB_LOCK_WAIT, true);
free(recovery);
if (ecode != TDB_SUCCESS) {
return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
"tdb_recovery_allocate:"
" failed to free previous"
" recovery area");
}
/* Refresh recovery after add_free_record above. */
recovery = alloc_recovery(tdb, &recovery_size);
if (TDB_PTR_IS_ERR(recovery))
return TDB_PTR_ERR(recovery);
}
recovery_off = create_recovery_area(tdb, recovery_size,
recovery);
if (TDB_OFF_IS_ERR(recovery_off)) {
free(recovery);
return recovery_off;
}
}
/* Now we know size, convert rec header. */
recovery->magic = TDB_RECOVERY_INVALID_MAGIC;
recovery->len = recovery_size;
recovery->eof = old_map_size;
tdb_convert(tdb, recovery, sizeof(*recovery));
/* write the recovery data to the recovery area */ /* write the recovery data to the recovery area */
ecode = methods->twrite(tdb, recovery_offset, data, p - data); ecode = methods->twrite(tdb, recovery_off, recovery, recovery_size);
if (ecode != TDB_SUCCESS) { if (ecode != TDB_SUCCESS) {
free(data); free(recovery);
return tdb_logerr(tdb, ecode, TDB_LOG_ERROR, return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
"tdb_transaction_setup_recovery:" "tdb_transaction_setup_recovery:"
" failed to write recovery data"); " failed to write recovery data");
} }
transaction_write_existing(tdb, recovery_offset, data, p - data); transaction_write_existing(tdb, recovery_off, recovery, recovery_size);
free(recovery);
/* as we don't have ordered writes, we have to sync the recovery /* as we don't have ordered writes, we have to sync the recovery
data before we update the magic to indicate that the recovery data before we update the magic to indicate that the recovery
data is present */ data is present */
ecode = transaction_sync(tdb, recovery_offset, p - data); ecode = transaction_sync(tdb, recovery_off, recovery_size);
if (ecode != TDB_SUCCESS) { if (ecode != TDB_SUCCESS)
free(data);
return ecode; return ecode;
}
free(data);
magic = TDB_RECOVERY_MAGIC; magic = TDB_RECOVERY_MAGIC;
tdb_convert(tdb, &magic, sizeof(magic)); tdb_convert(tdb, &magic, sizeof(magic));
*magic_offset = recovery_offset + offsetof(struct tdb_recovery_record, tdb->transaction->magic_offset
magic); = recovery_off + offsetof(struct tdb_recovery_record, magic);
ecode = methods->twrite(tdb, *magic_offset, &magic, sizeof(magic)); ecode = methods->twrite(tdb, tdb->transaction->magic_offset,
&magic, sizeof(magic));
if (ecode != TDB_SUCCESS) { if (ecode != TDB_SUCCESS) {
return tdb_logerr(tdb, ecode, TDB_LOG_ERROR, return tdb_logerr(tdb, ecode, TDB_LOG_ERROR,
"tdb_transaction_setup_recovery:" "tdb_transaction_setup_recovery:"
" failed to write recovery magic"); " failed to write recovery magic");
} }
transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic)); transaction_write_existing(tdb, tdb->transaction->magic_offset,
&magic, sizeof(magic));
/* ensure the recovery magic marker is on disk */ /* ensure the recovery magic marker is on disk */
return transaction_sync(tdb, *magic_offset, sizeof(magic)); return transaction_sync(tdb, tdb->transaction->magic_offset,
sizeof(magic));
} }
static enum TDB_ERROR _tdb_transaction_prepare_commit(struct tdb_context *tdb) static enum TDB_ERROR _tdb_transaction_prepare_commit(struct tdb_context *tdb)
...@@ -991,10 +994,9 @@ static enum TDB_ERROR _tdb_transaction_prepare_commit(struct tdb_context *tdb) ...@@ -991,10 +994,9 @@ static enum TDB_ERROR _tdb_transaction_prepare_commit(struct tdb_context *tdb)
/* Since we have whole db locked, we don't need the expansion lock. */ /* Since we have whole db locked, we don't need the expansion lock. */
if (!(tdb->flags & TDB_NOSYNC)) { if (!(tdb->flags & TDB_NOSYNC)) {
/* write the recovery data to the end of the file */ /* Sets up tdb->transaction->recovery and
ecode = transaction_setup_recovery(tdb, * tdb->transaction->magic_offset. */
&tdb->transaction ecode = transaction_setup_recovery(tdb);
->magic_offset);
if (ecode != TDB_SUCCESS) { if (ecode != TDB_SUCCESS) {
return ecode; return ecode;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment