Commit e6862ec8 authored by Rusty Russell's avatar Rusty Russell

tdb2: check PID if we are holding a lock.

tdb1 has tdb_reopen/tdb_reopen_all, which you are supposed to call
after a fork().  But we can detect PID changes whenever we grab a lock,
which is to say, before we do anything.

We currently assume that pread and pwrite work, so the only problem
with fork() is if we are holding locks and expect them to still be
there in the child.  This is unusual, but most likely caused by a
fork() inside a transaction.  In such cases, allow the child to unlock
without complaint; they can then use the TDB as expected.

Performance before and after shows no measurable speed difference:

Total of 5 runs of tools/speed 100000:
Before:
	Adding: 18899ns
	Finding: 7040ns
	Missing: 5802ns
	Traversing: 6466ns
	Deleting: 12607ns
	Re-adding: 14185ns
	Appending: 20770ns
	Churning: 93845ns

After:
	Adding: 18073ns
	Finding: 6795ns
	Missing: 5549ns
	Traversing: 6333ns
	Deleting: 12313ns
	Re-adding: 13835ns
	Appending: 20343ns
	Churning: 92208ns
parent f513c570
Interface differences between TDB1 and TDB2.
- tdb2 uses 'struct tdb_data', tdb1 uses 'struct TDB_DATA'. Use the
TDB_DATA typedef if you want portability between the two.
- tdb2 functions return 0 on success, and a negative error on failure,
whereas tdb1 functions returned 0 on success, and -1 on failure. tdb1
then used tdb_error() to determine the error.
......@@ -27,3 +30,13 @@ Interface differences between TDB1 and TDB2.
- tdb2 provides tdb_deq() for comparing two struct tdb_data.
- tdb2's tdb_name() returns a copy of the name even for TDB_INTERNAL dbs.
- tdb2 does not need tdb_reopen() or tdb_reopen_all(). If you call
fork() after during certain operations the child should close the
tdb, or complete the operations before continuing to use the tdb:
tdb_transaction_start(): child must tdb_transaction_cancel()
tdb_lockall(): child must call tdb_unlockall()
tdb_lockall_read(): child must call tdb_unlockall_read()
tdb_chainlock(): child must call tdb_chainunlock()
tdb_parse() callback: child must return from tdb_parse()
......@@ -37,6 +37,29 @@ static enum TDB_ERROR owner_conflict(struct tdb_context *tdb, const char *call)
call);
}
/* If we fork, we no longer really own locks. */
static bool check_lock_pid(struct tdb_context *tdb,
const char *call, bool log)
{
/* No locks? No problem! */
if (tdb->file->allrecord_lock.count == 0
&& tdb->file->num_lockrecs == 0) {
return true;
}
/* No fork? No problem! */
if (tdb->file->locker == getpid()) {
return true;
}
if (log) {
tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_USE_ERROR,
"%s: fork() detected after lock acquisition!"
" (%u vs %u)", call, tdb->file->locker, getpid());
}
return false;
}
static int fcntl_lock(struct tdb_context *tdb,
int rw, off_t off, off_t len, bool waitflag)
{
......@@ -48,6 +71,11 @@ static int fcntl_lock(struct tdb_context *tdb,
fl.l_len = len;
fl.l_pid = 0;
if (tdb->file->allrecord_lock.count == 0
&& tdb->file->num_lockrecs == 0) {
tdb->file->locker = getpid();
}
add_stat(tdb, lock_lowlevel, 1);
if (waitflag)
return fcntl(tdb->file->fd, F_SETLKW, &fl);
......@@ -190,7 +218,8 @@ static enum TDB_ERROR tdb_brunlock(struct tdb_context *tdb,
ret = fcntl_unlock(tdb, rw_type, offset, len);
} while (ret == -1 && errno == EINTR);
if (ret == -1) {
/* If we fail, *then* we verify that we owned the lock. If not, ok. */
if (ret == -1 && check_lock_pid(tdb, "tdb_brunlock", false)) {
return tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_ERROR,
"tdb_brunlock failed (fd=%d) at offset %zu"
" rw_type=%d len=%zu",
......@@ -210,6 +239,9 @@ enum TDB_ERROR tdb_allrecord_upgrade(struct tdb_context *tdb)
{
int count = 1000;
if (!check_lock_pid(tdb, "tdb_transaction_prepare_commit", true))
return TDB_ERR_LOCK;
if (tdb->file->allrecord_lock.count != 1) {
return tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_ERROR,
"tdb_allrecord_upgrade failed:"
......@@ -267,6 +299,9 @@ enum TDB_ERROR tdb_lock_and_recover(struct tdb_context *tdb)
{
enum TDB_ERROR ecode;
if (!check_lock_pid(tdb, "tdb_transaction_prepare_commit", true))
return TDB_ERR_LOCK;
ecode = tdb_allrecord_lock(tdb, F_WRLCK, TDB_LOCK_WAIT|TDB_LOCK_NOCHECK,
false);
if (ecode != TDB_SUCCESS) {
......@@ -303,6 +338,10 @@ static enum TDB_ERROR tdb_nest_lock(struct tdb_context *tdb,
if (tdb->flags & TDB_NOLOCK)
return TDB_SUCCESS;
if (!check_lock_pid(tdb, "tdb_nest_lock", true)) {
return TDB_ERR_LOCK;
}
add_stat(tdb, locks, 1);
new_lck = find_nestlock(tdb, offset, NULL);
......@@ -472,6 +511,13 @@ enum TDB_ERROR tdb_allrecord_lock(struct tdb_context *tdb, int ltype,
enum TDB_ERROR ecode;
tdb_bool_err berr;
if (tdb->flags & TDB_NOLOCK)
return TDB_SUCCESS;
if (!check_lock_pid(tdb, "tdb_allrecord_lock", true)) {
return TDB_ERR_LOCK;
}
if (tdb->file->allrecord_lock.count) {
if (tdb->file->allrecord_lock.owner != tdb) {
return owner_conflict(tdb, "tdb_allrecord_lock");
......@@ -587,6 +633,9 @@ void tdb_unlock_expand(struct tdb_context *tdb, int ltype)
/* unlock entire db */
void tdb_allrecord_unlock(struct tdb_context *tdb, int ltype)
{
if (tdb->flags & TDB_NOLOCK)
return;
if (tdb->file->allrecord_lock.count == 0) {
tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_USE_ERROR,
"tdb_allrecord_unlock: not locked!");
......@@ -664,6 +713,9 @@ enum TDB_ERROR tdb_lock_hashes(struct tdb_context *tdb,
/* a allrecord lock allows us to avoid per chain locks */
if (tdb->file->allrecord_lock.count) {
if (!check_lock_pid(tdb, "tdb_lock_hashes", true))
return TDB_ERR_LOCK;
if (tdb->file->allrecord_lock.owner != tdb)
return owner_conflict(tdb, "tdb_lock_hashes");
if (ltype == tdb->file->allrecord_lock.ltype
......@@ -736,6 +788,9 @@ enum TDB_ERROR tdb_lock_free_bucket(struct tdb_context *tdb, tdb_off_t b_off,
/* a allrecord lock allows us to avoid per chain locks */
if (tdb->file->allrecord_lock.count) {
if (!check_lock_pid(tdb, "tdb_lock_free_bucket", true))
return TDB_ERR_LOCK;
if (tdb->file->allrecord_lock.ltype == F_WRLCK)
return 0;
return tdb_logerr(tdb, TDB_ERR_LOCK, TDB_LOG_ERROR,
......
......@@ -335,6 +335,7 @@ struct tdb_file {
int fd;
/* Lock information */
pid_t locker;
struct tdb_lock allrecord_lock;
size_t num_lockrecs;
struct tdb_lock *lockrecs;
......
/* Test forking while holding lock.
*
* There are only five ways to do this currently:
* (1) grab a tdb_chainlock, then fork.
* (2) grab a tdb_lockall, then fork.
* (3) grab a tdb_lockall_read, then fork.
* (4) start a transaction, then fork.
* (5) fork from inside a tdb_parse() callback.
*
* Note that we don't hold a lock across tdb_traverse callbacks, so
* that doesn't matter.
*/
#include <ccan/tdb2/tdb.c>
#include <ccan/tdb2/open.c>
#include <ccan/tdb2/free.c>
#include <ccan/tdb2/lock.c>
#include <ccan/tdb2/io.c>
#include <ccan/tdb2/hash.c>
#include <ccan/tdb2/check.c>
#include <ccan/tdb2/transaction.c>
#include <ccan/tap/tap.h>
#include <sys/types.h>
#include <sys/wait.h>
#include "logging.h"
static enum TDB_ERROR fork_in_parse(TDB_DATA key, TDB_DATA data,
struct tdb_context *tdb)
{
int status;
if (fork() == 0) {
/* We expect this to fail. */
if (tdb_store(tdb, key, data, TDB_REPLACE) != TDB_ERR_LOCK)
exit(1);
if (tdb_fetch(tdb, key, &data) != TDB_ERR_LOCK)
exit(1);
if (tap_log_messages != 2)
exit(2);
tdb_close(tdb);
if (tap_log_messages != 2)
exit(3);
exit(0);
}
wait(&status);
ok1(WIFEXITED(status) && WEXITSTATUS(status) == 0);
return TDB_SUCCESS;
}
int main(int argc, char *argv[])
{
unsigned int i;
struct tdb_context *tdb;
int flags[] = { TDB_DEFAULT, TDB_NOMMAP,
TDB_CONVERT, TDB_NOMMAP|TDB_CONVERT };
struct tdb_data key = tdb_mkdata("key", 3);
struct tdb_data data = tdb_mkdata("data", 4);
plan_tests(sizeof(flags) / sizeof(flags[0]) * 14);
for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) {
int status;
tap_log_messages = 0;
tdb = tdb_open("run-fork-test.tdb", flags[i],
O_RDWR|O_CREAT|O_TRUNC, 0600, &tap_log_attr);
if (!ok1(tdb))
continue;
/* Put a record in here. */
ok1(tdb_store(tdb, key, data, TDB_REPLACE) == TDB_SUCCESS);
ok1(tdb_chainlock(tdb, key) == TDB_SUCCESS);
if (fork() == 0) {
/* We expect this to fail. */
if (tdb_store(tdb, key, data, TDB_REPLACE) != TDB_ERR_LOCK)
return 1;
if (tdb_fetch(tdb, key, &data) != TDB_ERR_LOCK)
return 1;
if (tap_log_messages != 2)
return 2;
tdb_chainunlock(tdb, key);
if (tap_log_messages != 2)
return 3;
tdb_close(tdb);
if (tap_log_messages != 2)
return 4;
return 0;
}
wait(&status);
ok1(WIFEXITED(status) && WEXITSTATUS(status) == 0);
tdb_chainunlock(tdb, key);
ok1(tdb_lockall(tdb) == TDB_SUCCESS);
if (fork() == 0) {
/* We expect this to fail. */
if (tdb_store(tdb, key, data, TDB_REPLACE) != TDB_ERR_LOCK)
return 1;
if (tdb_fetch(tdb, key, &data) != TDB_ERR_LOCK)
return 1;
if (tap_log_messages != 2)
return 2;
tdb_unlockall(tdb);
if (tap_log_messages != 2)
return 3;
tdb_close(tdb);
if (tap_log_messages != 2)
return 4;
return 0;
}
wait(&status);
ok1(WIFEXITED(status) && WEXITSTATUS(status) == 0);
tdb_unlockall(tdb);
ok1(tdb_lockall_read(tdb) == TDB_SUCCESS);
if (fork() == 0) {
/* We expect this to fail. */
/* This would always fail anyway... */
if (tdb_store(tdb, key, data, TDB_REPLACE) != TDB_ERR_LOCK)
return 1;
if (tdb_fetch(tdb, key, &data) != TDB_ERR_LOCK)
return 1;
if (tap_log_messages != 2)
return 2;
tdb_unlockall_read(tdb);
if (tap_log_messages != 2)
return 3;
tdb_close(tdb);
if (tap_log_messages != 2)
return 4;
return 0;
}
wait(&status);
ok1(WIFEXITED(status) && WEXITSTATUS(status) == 0);
tdb_unlockall_read(tdb);
ok1(tdb_transaction_start(tdb) == TDB_SUCCESS);
/* If transactions is empty, noop "commit" succeeds. */
ok1(tdb_delete(tdb, key) == TDB_SUCCESS);
if (fork() == 0) {
/* We expect this to fail. */
if (tdb_store(tdb, key, data, TDB_REPLACE) != TDB_ERR_LOCK)
return 1;
if (tdb_fetch(tdb, key, &data) != TDB_ERR_LOCK)
return 1;
if (tap_log_messages != 2)
return 2;
if (tdb_transaction_commit(tdb) != TDB_ERR_LOCK)
return 3;
tdb_close(tdb);
if (tap_log_messages < 3)
return 4;
return 0;
}
wait(&status);
ok1(WIFEXITED(status) && WEXITSTATUS(status) == 0);
tdb_transaction_cancel(tdb);
ok1(tdb_parse_record(tdb, key, fork_in_parse, tdb)
== TDB_SUCCESS);
tdb_close(tdb);
ok1(tap_log_messages == 0);
}
return exit_status();
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment