Commit f5087965 authored by Rusty Russell's avatar Rusty Russell

tdb2: direct access during transactions.

Currently we fall back to copying data during a transaction, but we don't
need to in many cases.  Grant direct access in those cases.

Before:
$ ./speed --transaction 1000000
Adding 1000000 records:  2409 ns (59916680 bytes)
Finding 1000000 records:  1156 ns (59916680 bytes)
Missing 1000000 records:  604 ns (59916680 bytes)
Missing 1000000 records:  604 ns (59916680 bytes)
Traversing 1000000 records:  1226 ns (59916680 bytes)
Deleting 1000000 records:  1556 ns (119361928 bytes)
Re-adding 1000000 records:  2326 ns (119361928 bytes)
Appending 1000000 records:  3269 ns (246656880 bytes)
Churning 1000000 records:  5613 ns (338235248 bytes)

After:
$ ./speed --transaction 1000000
Adding 1000000 records:  1902 ns (59916680 bytes)
Finding 1000000 records:  1032 ns (59916680 bytes)
Missing 1000000 records:  606 ns (59916680 bytes)
Missing 1000000 records:  606 ns (59916680 bytes)
Traversing 1000000 records:  741 ns (59916680 bytes)
Deleting 1000000 records:  1347 ns (119361928 bytes)
Re-adding 1000000 records:  1727 ns (119361928 bytes)
Appending 1000000 records:  2561 ns (246656880 bytes)
Churning 1000000 records:  4403 ns (338235248 bytes)
parent a56db4a5
...@@ -452,13 +452,6 @@ static int tdb_expand_file(struct tdb_context *tdb, tdb_len_t addition) ...@@ -452,13 +452,6 @@ static int tdb_expand_file(struct tdb_context *tdb, tdb_len_t addition)
return 0; return 0;
} }
/* This is only neded for tdb_access_commit, but used everywhere to simplify. */
struct tdb_access_hdr {
tdb_off_t off;
tdb_len_t len;
bool convert;
};
const void *tdb_access_read(struct tdb_context *tdb, const void *tdb_access_read(struct tdb_context *tdb,
tdb_off_t off, tdb_len_t len, bool convert) tdb_off_t off, tdb_len_t len, bool convert)
{ {
...@@ -471,6 +464,8 @@ const void *tdb_access_read(struct tdb_context *tdb, ...@@ -471,6 +464,8 @@ const void *tdb_access_read(struct tdb_context *tdb,
struct tdb_access_hdr *hdr; struct tdb_access_hdr *hdr;
hdr = _tdb_alloc_read(tdb, off, len, sizeof(*hdr)); hdr = _tdb_alloc_read(tdb, off, len, sizeof(*hdr));
if (hdr) { if (hdr) {
hdr->next = tdb->access;
tdb->access = hdr;
ret = hdr + 1; ret = hdr + 1;
if (convert) if (convert)
tdb_convert(tdb, (void *)ret, len); tdb_convert(tdb, (void *)ret, len);
...@@ -499,6 +494,8 @@ void *tdb_access_write(struct tdb_context *tdb, ...@@ -499,6 +494,8 @@ void *tdb_access_write(struct tdb_context *tdb,
struct tdb_access_hdr *hdr; struct tdb_access_hdr *hdr;
hdr = _tdb_alloc_read(tdb, off, len, sizeof(*hdr)); hdr = _tdb_alloc_read(tdb, off, len, sizeof(*hdr));
if (hdr) { if (hdr) {
hdr->next = tdb->access;
tdb->access = hdr;
hdr->off = off; hdr->off = off;
hdr->len = len; hdr->len = len;
hdr->convert = convert; hdr->convert = convert;
...@@ -512,35 +509,41 @@ void *tdb_access_write(struct tdb_context *tdb, ...@@ -512,35 +509,41 @@ void *tdb_access_write(struct tdb_context *tdb,
return ret; return ret;
} }
bool is_direct(const struct tdb_context *tdb, const void *p) static struct tdb_access_hdr **find_hdr(struct tdb_context *tdb, const void *p)
{ {
return (tdb->map_ptr struct tdb_access_hdr **hp;
&& (char *)p >= (char *)tdb->map_ptr
&& (char *)p < (char *)tdb->map_ptr + tdb->map_size); for (hp = &tdb->access; *hp; hp = &(*hp)->next) {
if (*hp + 1 == p)
return hp;
}
return NULL;
} }
void tdb_access_release(struct tdb_context *tdb, const void *p) void tdb_access_release(struct tdb_context *tdb, const void *p)
{ {
if (is_direct(tdb, p)) struct tdb_access_hdr *hdr, **hp = find_hdr(tdb, p);
if (hp) {
hdr = *hp;
*hp = hdr->next;
free(hdr);
} else
tdb->direct_access--; tdb->direct_access--;
else
free((struct tdb_access_hdr *)p - 1);
} }
int tdb_access_commit(struct tdb_context *tdb, void *p) int tdb_access_commit(struct tdb_context *tdb, void *p)
{ {
struct tdb_access_hdr *hdr, **hp = find_hdr(tdb, p);
int ret = 0; int ret = 0;
if (!tdb->map_ptr if (hp) {
|| (char *)p < (char *)tdb->map_ptr hdr = *hp;
|| (char *)p >= (char *)tdb->map_ptr + tdb->map_size) {
struct tdb_access_hdr *hdr;
hdr = (struct tdb_access_hdr *)p - 1;
if (hdr->convert) if (hdr->convert)
ret = tdb_write_convert(tdb, hdr->off, p, hdr->len); ret = tdb_write_convert(tdb, hdr->off, p, hdr->len);
else else
ret = tdb_write(tdb, hdr->off, p, hdr->len); ret = tdb_write(tdb, hdr->off, p, hdr->len);
*hp = hdr->next;
free(hdr); free(hdr);
} else } else
tdb->direct_access--; tdb->direct_access--;
......
...@@ -290,6 +290,15 @@ struct tdb_lock_type { ...@@ -290,6 +290,15 @@ struct tdb_lock_type {
uint32_t ltype; uint32_t ltype;
}; };
/* This is only needed for tdb_access_commit, but used everywhere to
* simplify. */
struct tdb_access_hdr {
struct tdb_access_hdr *next;
tdb_off_t off;
tdb_len_t len;
bool convert;
};
struct tdb_context { struct tdb_context {
/* Filename of the database. */ /* Filename of the database. */
const char *name; const char *name;
...@@ -344,6 +353,9 @@ struct tdb_context { ...@@ -344,6 +353,9 @@ struct tdb_context {
struct tdb_attribute_stats *stats; struct tdb_attribute_stats *stats;
/* Direct access information */
struct tdb_access_hdr *access;
/* Single list of all TDBs, to avoid multiple opens. */ /* Single list of all TDBs, to avoid multiple opens. */
struct tdb_context *next; struct tdb_context *next;
dev_t device; dev_t device;
...@@ -435,9 +447,6 @@ const void *tdb_access_read(struct tdb_context *tdb, ...@@ -435,9 +447,6 @@ const void *tdb_access_read(struct tdb_context *tdb,
void *tdb_access_write(struct tdb_context *tdb, void *tdb_access_write(struct tdb_context *tdb,
tdb_off_t off, tdb_len_t len, bool convert); tdb_off_t off, tdb_len_t len, bool convert);
/* Is this pointer direct? (Otherwise it's malloced) */
bool is_direct(const struct tdb_context *tdb, const void *p);
/* Release result of tdb_access_read/write. */ /* Release result of tdb_access_read/write. */
void tdb_access_release(struct tdb_context *tdb, const void *p); void tdb_access_release(struct tdb_context *tdb, const void *p);
/* Commit result of tdb_acces_write. */ /* Commit result of tdb_acces_write. */
......
...@@ -172,6 +172,7 @@ struct tdb_context *tdb_open(const char *name, int tdb_flags, ...@@ -172,6 +172,7 @@ struct tdb_context *tdb_open(const char *name, int tdb_flags,
tdb->logfn = NULL; tdb->logfn = NULL;
tdb->transaction = NULL; tdb->transaction = NULL;
tdb->stats = NULL; tdb->stats = NULL;
tdb->access = NULL;
tdb_hash_init(tdb); tdb_hash_init(tdb);
tdb_io_init(tdb); tdb_io_init(tdb);
tdb_lock_init(tdb); tdb_lock_init(tdb);
...@@ -376,7 +377,6 @@ struct tdb_context *tdb_open(const char *name, int tdb_flags, ...@@ -376,7 +377,6 @@ struct tdb_context *tdb_open(const char *name, int tdb_flags,
return NULL; return NULL;
} }
/* FIXME: modify, don't rewrite! */
static int update_rec_hdr(struct tdb_context *tdb, static int update_rec_hdr(struct tdb_context *tdb,
tdb_off_t off, tdb_off_t off,
tdb_len_t keylen, tdb_len_t keylen,
...@@ -468,7 +468,6 @@ int tdb_store(struct tdb_context *tdb, ...@@ -468,7 +468,6 @@ int tdb_store(struct tdb_context *tdb,
h.hlock_range, F_WRLCK); h.hlock_range, F_WRLCK);
return 0; return 0;
} }
/* FIXME: See if right record is free? */
} else { } else {
if (flag == TDB_MODIFY) { if (flag == TDB_MODIFY) {
/* if the record doesn't exist and we /* if the record doesn't exist and we
...@@ -525,7 +524,6 @@ int tdb_append(struct tdb_context *tdb, ...@@ -525,7 +524,6 @@ int tdb_append(struct tdb_context *tdb,
F_WRLCK); F_WRLCK);
return 0; return 0;
} }
/* FIXME: Check right record free? */
/* Slow path. */ /* Slow path. */
newdata = malloc(key.dsize + old_dlen + dbuf.dsize); newdata = malloc(key.dsize + old_dlen + dbuf.dsize);
......
...@@ -365,8 +365,37 @@ static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t addition) ...@@ -365,8 +365,37 @@ static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t addition)
static void *transaction_direct(struct tdb_context *tdb, tdb_off_t off, static void *transaction_direct(struct tdb_context *tdb, tdb_off_t off,
size_t len, bool write) size_t len, bool write)
{ {
/* FIXME */ size_t blk = off / getpagesize(), end_blk;
return NULL;
/* This is wrong for zero-length blocks, but will fail gracefully */
end_blk = (off + len - 1) / getpagesize();
/* Can only do direct if in single block and we've already copied. */
if (write) {
if (blk != end_blk)
return NULL;
if (blk >= tdb->transaction->num_blocks)
return NULL;
if (tdb->transaction->blocks[blk] == NULL)
return NULL;
return tdb->transaction->blocks[blk] + off % getpagesize();
}
/* Single which we have copied? */
if (blk == end_blk
&& blk < tdb->transaction->num_blocks
&& tdb->transaction->blocks[blk])
return tdb->transaction->blocks[blk] + off % getpagesize();
/* Otherwise must be all not copied. */
while (blk < end_blk) {
if (blk >= tdb->transaction->num_blocks)
break;
if (tdb->transaction->blocks[blk])
return NULL;
blk++;
}
return tdb->transaction->io_methods->direct(tdb, off, len, write);
} }
static const struct tdb_methods transaction_methods = { static const struct tdb_methods transaction_methods = {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment