Commit 492c8098 authored by Bradley C. Kuszmaul, committed by Yoni Fogel

Contrary to r18222, the new group commit code was not on the main trunk.

{{{
svn merge -r17893:18056 https://svn.tokutek.com/tokudb/toku/tokudb.2370c
}}}

Refs #2370, #2385.  [t:2370] [t:2385].


git-svn-id: file:///svn/toku/tokudb@18259 c7de825b-a66e-492c-adef-691d508d4ae1
parent 7cbfde5c
...@@ -427,7 +427,6 @@ int main (int argc __attribute__((__unused__)), char *const argv[] __attribute__ ...@@ -427,7 +427,6 @@ int main (int argc __attribute__((__unused__)), char *const argv[] __attribute__
printf(" u_int64_t time_ydb_lock_held_unavailable; /* number of times a thread migrated and theld is unavailable */ \n"); printf(" u_int64_t time_ydb_lock_held_unavailable; /* number of times a thread migrated and theld is unavailable */ \n");
printf(" u_int64_t max_time_ydb_lock_held; /* max time a client thread held the ydb lock */ \n"); printf(" u_int64_t max_time_ydb_lock_held; /* max time a client thread held the ydb lock */ \n");
printf(" u_int64_t total_time_ydb_lock_held;/* total time client threads held the ydb lock */ \n"); printf(" u_int64_t total_time_ydb_lock_held;/* total time client threads held the ydb lock */ \n");
printf(" u_int64_t logger_lock_ctr; /* how many times has logger lock been taken/released */ \n");
printf(" u_int32_t checkpoint_period; /* delay between automatic checkpoints */ \n"); printf(" u_int32_t checkpoint_period; /* delay between automatic checkpoints */ \n");
printf(" u_int32_t checkpoint_footprint; /* state of checkpoint procedure */ \n"); printf(" u_int32_t checkpoint_footprint; /* state of checkpoint procedure */ \n");
printf(" char checkpoint_time_begin[26]; /* time of last checkpoint begin */ \n"); printf(" char checkpoint_time_begin[26]; /* time of last checkpoint begin */ \n");
......
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
#include <errno.h> #include <errno.h>
#include <toku_assert.h> #include <toku_assert.h>
#include <stdio.h> #include <stdio.h>
#include <string.h>
//Print any necessary errors //Print any necessary errors
//Return whether we should try the write again. //Return whether we should try the write again.
...@@ -130,19 +131,29 @@ toku_set_func_fsync(int (*fsync_function)(int)) { ...@@ -130,19 +131,29 @@ toku_set_func_fsync(int (*fsync_function)(int)) {
return 0; return 0;
} }
// keep trying if fsync fails because of EINTR
int int
toku_file_fsync(int fd) { toku_file_fsync_without_accounting (int fd) {
int r = -1; int r = -1;
uint64_t tstart = get_tnow();
while (r != 0) { while (r != 0) {
if (t_fsync) if (t_fsync)
r = t_fsync(fd); r = t_fsync(fd);
else else
r = fsync(fd); r = fsync(fd);
if (r) if (r) {
assert(errno==EINTR); int rr = errno;
if (rr!=EINTR) printf("rr=%d (%s)\n", rr, strerror(rr));
assert(rr==EINTR);
}
} }
return r;
}
// keep trying if fsync fails because of EINTR
int
toku_file_fsync(int fd) {
uint64_t tstart = get_tnow();
int r = toku_file_fsync_without_accounting(fd);
toku_sync_fetch_and_increment_uint64(&toku_fsync_count); toku_sync_fetch_and_increment_uint64(&toku_fsync_count);
toku_sync_fetch_and_add_uint64(&toku_fsync_time, get_tnow() - tstart); toku_sync_fetch_and_add_uint64(&toku_fsync_time, get_tnow() - tstart);
return r; return r;
......
...@@ -5459,8 +5459,6 @@ toku_brt_lock_init(void) { ...@@ -5459,8 +5459,6 @@ toku_brt_lock_init(void) {
int r = 0; int r = 0;
if (r==0) if (r==0)
r = toku_pwrite_lock_init(); r = toku_pwrite_lock_init();
if (r==0)
r = toku_logger_lock_init();
return r; return r;
} }
...@@ -5469,8 +5467,6 @@ toku_brt_lock_destroy(void) { ...@@ -5469,8 +5467,6 @@ toku_brt_lock_destroy(void) {
int r = 0; int r = 0;
if (r==0) if (r==0)
r = toku_pwrite_lock_destroy(); r = toku_pwrite_lock_destroy();
if (r==0)
r = toku_logger_lock_destroy();
return r; return r;
} }
......
...@@ -59,7 +59,12 @@ struct logbuf { ...@@ -59,7 +59,12 @@ struct logbuf {
struct tokulogger { struct tokulogger {
enum typ_tag tag; // must be first enum typ_tag tag; // must be first
struct mylock input_lock, output_lock; // acquired in that order struct mylock input_lock;
toku_pthread_mutex_t output_condition_lock; // if you need both this lock and input_lock, acquire the output_lock first, then input_lock. More typical is to get the output_is_available condition to be false, and then acquire the input_lock.
toku_pthread_cond_t output_condition; //
BOOL output_is_available; // this is part of the predicate for the output condition. It's true if no thread is modifying the output (either doing an fsync or otherwise fiddling with the output).
BOOL is_open; BOOL is_open;
BOOL is_panicked; BOOL is_panicked;
BOOL write_log_files; BOOL write_log_files;
...@@ -75,14 +80,15 @@ struct tokulogger { ...@@ -75,14 +80,15 @@ struct tokulogger {
OMT live_txns; // a sorted tree. Old comment said should be a hashtable. Do we still want that? OMT live_txns; // a sorted tree. Old comment said should be a hashtable. Do we still want that?
struct logbuf inbuf; // data being accumulated for the write struct logbuf inbuf; // data being accumulated for the write
// To access these, you must have the output lock // To access these, you must have the output condition lock.
LSN written_lsn; // the last lsn written LSN written_lsn; // the last lsn written
LSN fsynced_lsn; // What is the LSN of the highest fsynced log entry LSN fsynced_lsn; // What is the LSN of the highest fsynced log entry (accessed only while holding the output lock, and updated only when the output lock and output permission are held)
LSN checkpoint_lsn; // What is the LSN of the most recent completed checkpoint. LSN checkpoint_lsn; // What is the LSN of the most recent completed checkpoint.
long long next_log_file_number; long long next_log_file_number;
struct logbuf outbuf; // data being written to the file struct logbuf outbuf; // data being written to the file
int n_in_file; // The amount of data in the current file int n_in_file; // The amount of data in the current file
// To access the logfilemgr you must have the output condition lock.
TOKULOGFILEMGR logfilemgr; TOKULOGFILEMGR logfilemgr;
u_int32_t write_block_size; // How big should the blocks be written to various logs? u_int32_t write_block_size; // How big should the blocks be written to various logs?
......
...@@ -7,16 +7,11 @@ ...@@ -7,16 +7,11 @@
static const int log_format_version=TOKU_LOG_VERSION; static const int log_format_version=TOKU_LOG_VERSION;
static toku_pthread_mutex_t logger_mutex = TOKU_PTHREAD_MUTEX_INITIALIZER;
static u_int32_t logger_lock_ctr = 0; // useful for debug at a live installation
static int open_logfile (TOKULOGGER logger); static int open_logfile (TOKULOGGER logger);
static int toku_logger_write_buffer (TOKULOGGER logger, int do_fsync); static int toku_logger_write_buffer (TOKULOGGER logger, LSN *fsynced_lsn);
static int delete_logfile(TOKULOGGER logger, long long index); static int delete_logfile(TOKULOGGER logger, long long index);
static void grab_output(TOKULOGGER logger, LSN *fsynced_lsn);
u_int32_t toku_logger_get_lock_ctr(void) { static void release_output(TOKULOGGER logger, LSN fsynced_lsn);
return logger_lock_ctr;
}
int toku_logger_create (TOKULOGGER *resultp) { int toku_logger_create (TOKULOGGER *resultp) {
int r; int r;
...@@ -45,8 +40,10 @@ int toku_logger_create (TOKULOGGER *resultp) { ...@@ -45,8 +40,10 @@ int toku_logger_create (TOKULOGGER *resultp) {
result->oldest_living_xid = TXNID_NONE_LIVING; result->oldest_living_xid = TXNID_NONE_LIVING;
toku_logfilemgr_create(&result->logfilemgr); toku_logfilemgr_create(&result->logfilemgr);
*resultp=result; *resultp=result;
r = ml_init(&result->input_lock); if (r!=0) goto panic; r = ml_init(&result->input_lock); if (r!=0) goto panic;
r = ml_init(&result->output_lock); if (r!=0) goto panic; r = toku_pthread_mutex_init(&result->output_condition_lock, NULL); if (r!=0) goto panic;
r = toku_pthread_cond_init(&result->output_condition, NULL); if (r!=0) goto panic;
result->output_is_available = TRUE;
return 0; return 0;
panic: panic:
...@@ -94,42 +91,35 @@ int toku_logger_open (const char *directory, TOKULOGGER logger) { ...@@ -94,42 +91,35 @@ int toku_logger_open (const char *directory, TOKULOGGER logger) {
// No locks held on entry // No locks held on entry
// No locks held on exit. // No locks held on exit.
// Perhaps no locks are needed, since you cannot legally close the log concurrently with doing anything else. // No locks are needed, since you cannot legally close the log concurrently with doing anything else.
// But grab the locks just to be careful, including one to prevent access
// between unlocking and destroying.
int toku_logger_close(TOKULOGGER *loggerp) { int toku_logger_close(TOKULOGGER *loggerp) {
TOKULOGGER logger = *loggerp; TOKULOGGER logger = *loggerp;
if (logger->is_panicked) return EINVAL; if (logger->is_panicked) return EINVAL;
int r = 0; int r = 0;
int locked_logger = 0;
if (!logger->is_open) goto is_closed; if (!logger->is_open) goto is_closed;
r = ml_lock(&logger->output_lock); if (r!=0) goto panic; ml_lock(&logger->input_lock);
r = ml_lock(&logger->input_lock); if (r!=0) goto panic; LSN fsynced_lsn;
r = toku_pthread_mutex_lock(&logger_mutex); if (r!=0) goto panic; grab_output(logger, &fsynced_lsn);
logger_lock_ctr++; r = toku_logger_write_buffer(logger, &fsynced_lsn); if (r!=0) goto panic; //Releases the input lock
locked_logger = 1;
r = toku_logger_write_buffer(logger, 1); if (r!=0) goto panic; //Releases the input lock
if (logger->fd!=-1) { if (logger->fd!=-1) {
r = close(logger->fd); if (r!=0) { r=errno; goto panic; } r = close(logger->fd); if (r!=0) { r=errno; goto panic; }
} }
logger->fd=-1; logger->fd=-1;
release_output(logger, fsynced_lsn);
r = ml_unlock(&logger->output_lock); if (r!=0) goto panic;
is_closed: is_closed:
toku_free(logger->inbuf.buf); toku_free(logger->inbuf.buf);
toku_free(logger->outbuf.buf); toku_free(logger->outbuf.buf);
r = ml_destroy(&logger->output_lock); if (r!=0) goto panic; // before destroying locks they must be left in the unlocked state.
r = ml_destroy(&logger->input_lock); if (r!=0) goto panic; r = ml_destroy(&logger->input_lock); if (r!=0) goto panic;
r = toku_pthread_mutex_destroy(&logger->output_condition_lock); if (r!=0) goto panic;
r = toku_pthread_cond_destroy(&logger->output_condition); if (r!=0) goto panic;
logger->is_panicked=1; // Just in case this might help. logger->is_panicked=1; // Just in case this might help.
if (logger->directory) toku_free(logger->directory); if (logger->directory) toku_free(logger->directory);
toku_omt_destroy(&logger->live_txns); toku_omt_destroy(&logger->live_txns);
toku_logfilemgr_destroy(&logger->logfilemgr); toku_logfilemgr_destroy(&logger->logfilemgr);
toku_free(logger); toku_free(logger);
*loggerp=0; *loggerp=0;
if (locked_logger) {
logger_lock_ctr++;
r = toku_pthread_mutex_unlock(&logger_mutex); if (r!=0) goto panic;
}
return r; return r;
panic: panic:
toku_logger_panic(logger, r); toku_logger_panic(logger, r);
...@@ -147,23 +137,15 @@ int toku_logger_shutdown(TOKULOGGER logger) { ...@@ -147,23 +137,15 @@ int toku_logger_shutdown(TOKULOGGER logger) {
return r; return r;
} }
// Write data to a file. Keep trying even if partial writes occur. static int close_and_open_logfile (TOKULOGGER logger, LSN *fsynced_lsn)
// On error: Return negative with errno set. // Effect: close the current file, and open the next one.
// On success return nbytes. // Entry: This thread has permission to modify the output.
static int write_it (int fd, const void *bufv, int nbytes) { // Exit: This thread has permission to modify the output.
toku_os_full_write(fd, bufv, nbytes); {
return nbytes;
}
// close the current file, and open the next one.
// Entry: The output lock is held.
// Exit: The output lock is stlil held.
static int close_and_open_logfile (TOKULOGGER logger) {
int r; int r;
if (logger->write_log_files) { if (logger->write_log_files) {
r = toku_file_fsync(logger->fd); if (r!=0) return errno; r = toku_file_fsync_without_accounting(logger->fd); if (r!=0) return errno;
assert(logger->fsynced_lsn.lsn <= logger->written_lsn.lsn); *fsynced_lsn = logger->written_lsn;
logger->fsynced_lsn = logger->written_lsn;
toku_logfilemgr_update_last_lsn(logger->logfilemgr, logger->written_lsn); // fixes t:2294 toku_logfilemgr_update_last_lsn(logger->logfilemgr, logger->written_lsn); // fixes t:2294
} }
r = close(logger->fd); if (r!=0) return errno; r = close(logger->fd); if (r!=0) return errno;
...@@ -177,42 +159,150 @@ max_int (int a, int b) ...@@ -177,42 +159,150 @@ max_int (int a, int b)
return b; return b;
} }
// ***********************************************************
// output mutex/condition manipulation routines
// ***********************************************************
static void
wait_till_output_available (TOKULOGGER logger)
// Effect: Wait until output becomes available.
// Implementation hint: Use a pthread_cond_wait.
// Entry: Holds the output_condition_lock (but not the inlock)
// Exit: Holds the output_condition_lock and logger->output_is_available
//
{
while (!logger->output_is_available) {
int r = toku_pthread_cond_wait(&logger->output_condition, &logger->output_condition_lock);
assert(r==0);
}
}
static void
grab_output(TOKULOGGER logger, LSN *fsynced_lsn)
// Effect: Wait until output becomes available and get permission to modify output.
// Entry: Holds no lock (including not holding the input lock, since we never hold both at once).
// Exit: Hold permission to modify output (but none of the locks).
{
int r;
r = toku_pthread_mutex_lock(&logger->output_condition_lock); assert(r==0);
wait_till_output_available(logger);
logger->output_is_available = FALSE;
if (fsynced_lsn) {
*fsynced_lsn = logger->fsynced_lsn;
}
r = toku_pthread_mutex_unlock(&logger->output_condition_lock); assert(r==0);
}
static BOOL
wait_till_output_already_written_or_output_buffer_available (TOKULOGGER logger, LSN lsn, LSN *fsynced_lsn)
// Effect: Wait until either the output is available or the lsn has been written.
// Return true iff the lsn has been written.
// If returning true, then on exit we don't hold output permission.
// If returning false, then on exit we do hold output permission.
// Entry: Hold no locks.
// Exit: Hold the output permission if returns false.
{
BOOL result;
{ int r = toku_pthread_mutex_lock(&logger->output_condition_lock); assert(r==0); }
while (1) {
if (logger->fsynced_lsn.lsn >= lsn.lsn) { // we can look at the fsynced lsn since we have the lock.
result = TRUE;
break;
}
if (logger->output_is_available) {
logger->output_is_available = FALSE;
result = FALSE;
break;
}
// otherwise wait for a good time to look again.
int r = toku_pthread_cond_wait(&logger->output_condition, &logger->output_condition_lock);
assert(r==0);
}
*fsynced_lsn = logger->fsynced_lsn;
{ int r = toku_pthread_mutex_unlock(&logger->output_condition_lock); assert(r==0); }
return result;
}
static void
release_output (TOKULOGGER logger, LSN fsynced_lsn)
// Effect: Release output permission.
// Entry: Holds output permissions, but no locks.
// Exit: Holds neither locks nor output permission.
{
int r;
r = pthread_mutex_lock(&logger->output_condition_lock); assert(r==0);
logger->output_is_available = TRUE;
if (logger->fsynced_lsn.lsn < fsynced_lsn.lsn) {
logger->fsynced_lsn = fsynced_lsn;
}
r = toku_pthread_cond_broadcast(&logger->output_condition); assert(r==0);
r = toku_pthread_mutex_unlock(&logger->output_condition_lock); assert(r==0);
}
static void
swap_inbuf_outbuf (TOKULOGGER logger)
// Effect: Swap the inbuf and outbuf
// Entry and exit: Hold the input lock and permission to modify output.
{
struct logbuf tmp = logger->inbuf;
logger->inbuf = logger->outbuf;
logger->outbuf = tmp;
assert(logger->inbuf.n_in_buf == 0);
}
static void
write_outbuf_to_logfile (TOKULOGGER logger, LSN *fsynced_lsn)
// Effect: Write the contents of outbuf to logfile. Don't necessarily fsync (but it might, in which case fsynced_lsn is updated).
// If the logfile gets too big, open the next one (that's the case where an fsync might happen).
// Entry and exit: Holds permission to modify output (and doesn't let it go, so it's ok to also hold the inlock).
{
if (logger->outbuf.n_in_buf>0) {
toku_os_full_write(logger->fd, logger->outbuf.buf, logger->outbuf.n_in_buf);
assert(logger->outbuf.max_lsn_in_buf.lsn > logger->written_lsn.lsn); // since there is something in the buffer, its LSN must be bigger than what's previously written.
logger->written_lsn = logger->outbuf.max_lsn_in_buf;
logger->n_in_file += logger->outbuf.n_in_buf;
logger->outbuf.n_in_buf = 0;
}
// If the file got too big, then open a new file.
if (logger->n_in_file > logger->lg_max) {
int r = close_and_open_logfile(logger, fsynced_lsn);
assert(r==0);
}
}
int int
toku_logger_make_space_in_inbuf (TOKULOGGER logger, int n_bytes_needed) toku_logger_make_space_in_inbuf (TOKULOGGER logger, int n_bytes_needed)
// Entry: Holds the inlock // Entry: Holds the inlock
// Exit: Holds the inlock // Exit: Holds the inlock
// Effect: Upon exit, the inlock is held and there are at least n_bytes_needed in the buffer. // Effect: Upon exit, the inlock is held and there are at least n_bytes_needed in the buffer.
// May release the inlock, so this is not atomic. // May release the inlock (and then reacquire it), so this is not atomic.
// Implementation: Makes space in the inbuf, possibly by writing the inbuf to disk or increasing the size of the inbuf. There is no fsync. // May obtain the output lock and output permission (but if it does so, it will have released the inlock, since we don't hold both locks at once).
// (But may hold output permission and inlock at the same time.)
// Implementation hint: Makes space in the inbuf, possibly by writing the inbuf to disk or increasing the size of the inbuf. There might not be an fsync.
// Arguments: logger: the logger (side effects) // Arguments: logger: the logger (side effects)
// n_bytes_needed: how many bytes to make space for. // n_bytes_needed: how many bytes to make space for.
{ {
int r; int r;
if (logger->inbuf.n_in_buf + n_bytes_needed <= LOGGER_MIN_BUF_SIZE) return 0; if (logger->inbuf.n_in_buf + n_bytes_needed <= LOGGER_MIN_BUF_SIZE) return 0;
r = ml_unlock(&logger->input_lock); if (r!=0) goto panic; r = ml_unlock(&logger->input_lock); if (r!=0) goto panic;
r = ml_lock(&logger->output_lock); if (r!=0) goto panic; LSN fsynced_lsn;
r = ml_lock(&logger->input_lock); if (r!=0) goto panic; grab_output(logger, &fsynced_lsn);
// Some other thread may have written the log out while we didn't have the lock. If we can squeeze it in now, then be happy
if (logger->inbuf.n_in_buf + n_bytes_needed <= LOGGER_MIN_BUF_SIZE) return 0; r = ml_lock(&logger->input_lock); if (r!=0) goto panic;
// Some other thread may have written the log out while we didn't have the lock. If we have space now, then be happy.
if (logger->inbuf.n_in_buf + n_bytes_needed <= LOGGER_MIN_BUF_SIZE) {
release_output(logger, fsynced_lsn);
return 0;
}
if (logger->inbuf.n_in_buf > 0) { if (logger->inbuf.n_in_buf > 0) {
// There isn't enough space, and there is something in the buffer, so write the inbuf. // There isn't enough space, and there is something in the buffer, so write the inbuf.
{ swap_inbuf_outbuf(logger);
struct logbuf tmp = logger->inbuf;
logger->inbuf = logger->outbuf; // Don't release the inlock in this case, because we don't want to get starved.
logger->outbuf = tmp; write_outbuf_to_logfile(logger, &fsynced_lsn);
assert(logger->inbuf.n_in_buf == 0);
}
r = write_it(logger->fd, logger->outbuf.buf, logger->outbuf.n_in_buf);
if (r!=logger->outbuf.n_in_buf) { r = errno; goto panic; }
assert(logger->outbuf.max_lsn_in_buf.lsn > logger->written_lsn.lsn); // since there is something in the buffer, its LSN must be bigger than what's previously written.
logger->written_lsn = logger->outbuf.max_lsn_in_buf;
logger->n_in_file += logger->outbuf.n_in_buf;
logger->outbuf.n_in_buf = 0;
if (logger->n_in_file > logger->lg_max) {
r = close_and_open_logfile(logger); if (r!=0) goto panic; // set
}
} }
// the inbuf is empty. Is it big enough? // the inbuf is empty. Make it big enough (just in case it is somehow smaller than a single log entry).
if (n_bytes_needed > logger->inbuf.buf_size) { if (n_bytes_needed > logger->inbuf.buf_size) {
assert(n_bytes_needed < (1<<30)); // it seems unlikely to work if a logentry gets that big. assert(n_bytes_needed < (1<<30)); // it seems unlikely to work if a logentry gets that big.
int new_size = max_int(logger->inbuf.buf_size * 2, n_bytes_needed); // make it at least twice as big, and big enough for n_bytes int new_size = max_int(logger->inbuf.buf_size * 2, n_bytes_needed); // make it at least twice as big, and big enough for n_bytes
...@@ -220,26 +310,28 @@ toku_logger_make_space_in_inbuf (TOKULOGGER logger, int n_bytes_needed) ...@@ -220,26 +310,28 @@ toku_logger_make_space_in_inbuf (TOKULOGGER logger, int n_bytes_needed)
XREALLOC_N(new_size, logger->inbuf.buf); XREALLOC_N(new_size, logger->inbuf.buf);
logger->inbuf.buf_size = new_size; logger->inbuf.buf_size = new_size;
} }
r = ml_unlock(&logger->output_lock); if (r!=0) goto panic; release_output(logger, fsynced_lsn);
return 0; return 0;
panic: panic:
toku_logger_panic(logger, r); toku_logger_panic(logger, r);
return r; return r;
} }
int toku_logger_fsync (TOKULOGGER logger)
// Effect: This is the exported fsync used by ydb.c for env_log_flush. Group commit doesn't have to work.
// Entry: Holds no locks // Entry: Holds no locks
// Exit: Holds no locks // Exit: Holds no locks
// This is the exported fsync used by ydb.c // Implementation note: Acquire the output condition lock, then the output permission, then release the output condition lock, then get the input lock.
int toku_logger_fsync (TOKULOGGER logger) { // Then release everything.
//
{
int r; int r;
if (logger->is_panicked) return EINVAL; if (logger->is_panicked) return EINVAL;
r = ml_lock(&logger->output_lock); if (r!=0) goto panic; r = ml_lock(&logger->input_lock); assert(r==0);
r = ml_lock(&logger->input_lock); if (r!=0) goto panic; r = toku_logger_maybe_fsync(logger, logger->inbuf.max_lsn_in_buf, TRUE);
r = toku_logger_write_buffer(logger, 1); if (r!=0) goto panic; if (r!=0) {
r = ml_unlock(&logger->output_lock); if (r!=0) goto panic; toku_logger_panic(logger, r);
return 0; }
panic:
toku_logger_panic(logger, r);
return r; return r;
} }
...@@ -284,20 +376,9 @@ int toku_logger_set_lg_bsize(TOKULOGGER logger, u_int32_t bsize) { ...@@ -284,20 +376,9 @@ int toku_logger_set_lg_bsize(TOKULOGGER logger, u_int32_t bsize) {
return 0; return 0;
} }
int toku_logger_lock_init(void) { int toku_logger_find_next_unused_log_file(const char *directory, long long *result)
int r = toku_pthread_mutex_init(&logger_mutex, NULL); // This is called during logger initialization, and no locks are required.
logger_lock_ctr = 0; {
assert(r == 0);
return r;
}
int toku_logger_lock_destroy(void) {
int r = toku_pthread_mutex_destroy(&logger_mutex);
assert(r == 0);
return r;
}
int toku_logger_find_next_unused_log_file(const char *directory, long long *result) {
DIR *d=opendir(directory); DIR *d=opendir(directory);
long long maxf=-1; *result = maxf; long long maxf=-1; *result = maxf;
struct dirent *de; struct dirent *de;
...@@ -313,7 +394,6 @@ int toku_logger_find_next_unused_log_file(const char *directory, long long *resu ...@@ -313,7 +394,6 @@ int toku_logger_find_next_unused_log_file(const char *directory, long long *resu
return r; return r;
} }
static int logfilenamecompare (const void *ap, const void *bp) { static int logfilenamecompare (const void *ap, const void *bp) {
char *a=*(char**)ap; char *a=*(char**)ap;
char *b=*(char**)bp; char *b=*(char**)bp;
...@@ -322,7 +402,9 @@ static int logfilenamecompare (const void *ap, const void *bp) { ...@@ -322,7 +402,9 @@ static int logfilenamecompare (const void *ap, const void *bp) {
// Return the log files in sorted order // Return the log files in sorted order
// Return a null_terminated array of strings, and also return the number of strings in the array. // Return a null_terminated array of strings, and also return the number of strings in the array.
int toku_logger_find_logfiles (const char *directory, char ***resultp, int *n_logfiles) { // Requires: Race conditions must be dealt with by caller. Either call during initialization or grab the output permission.
int toku_logger_find_logfiles (const char *directory, char ***resultp, int *n_logfiles)
{
int result_limit=2; int result_limit=2;
int n_results=0; int n_results=0;
char **MALLOC_N(result_limit, result); char **MALLOC_N(result_limit, result);
...@@ -358,8 +440,9 @@ int toku_logger_find_logfiles (const char *directory, char ***resultp, int *n_lo ...@@ -358,8 +440,9 @@ int toku_logger_find_logfiles (const char *directory, char ***resultp, int *n_lo
return d ? closedir(d) : 0; return d ? closedir(d) : 0;
} }
static int open_logfile (TOKULOGGER logger) { static int open_logfile (TOKULOGGER logger)
int r; // Entry and Exit: This thread has permission to modify the output.
{
int fnamelen = strlen(logger->directory)+50; int fnamelen = strlen(logger->directory)+50;
char fname[fnamelen]; char fname[fnamelen];
snprintf(fname, fnamelen, "%s/log%012lld.tokulog", logger->directory, logger->next_log_file_number); snprintf(fname, fnamelen, "%s/log%012lld.tokulog", logger->directory, logger->next_log_file_number);
...@@ -373,9 +456,9 @@ static int open_logfile (TOKULOGGER logger) { ...@@ -373,9 +456,9 @@ static int open_logfile (TOKULOGGER logger) {
// printf("%s: %s %d\n", __FUNCTION__, DEV_NULL_FILE, logger->fd); fflush(stdout); // printf("%s: %s %d\n", __FUNCTION__, DEV_NULL_FILE, logger->fd); fflush(stdout);
if (logger->fd==-1) return errno; if (logger->fd==-1) return errno;
} }
r = write_it(logger->fd, "tokulogg", 8); if (r!=8) return errno; toku_os_full_write(logger->fd, "tokulogg", 8);
int version_l = toku_htonl(log_format_version); //version MUST be in network byte order regardless of disk order int version_l = toku_htonl(log_format_version); //version MUST be in network byte order regardless of disk order
r = write_it(logger->fd, &version_l, 4); if (r!=4) return errno; toku_os_full_write(logger->fd, &version_l, 4);
if ( logger->write_log_files ) { if ( logger->write_log_files ) {
TOKULOGFILEINFO lf_info = toku_malloc(sizeof(struct toku_logfile_info)); TOKULOGFILEINFO lf_info = toku_malloc(sizeof(struct toku_logfile_info));
if (lf_info == NULL) if (lf_info == NULL)
...@@ -389,7 +472,9 @@ static int open_logfile (TOKULOGGER logger) { ...@@ -389,7 +472,9 @@ static int open_logfile (TOKULOGGER logger) {
return 0; return 0;
} }
static int delete_logfile(TOKULOGGER logger, long long index) { static int delete_logfile(TOKULOGGER logger, long long index)
// Entry and Exit: This thread has permission to modify the output.
{
int fnamelen = strlen(logger->directory)+50; int fnamelen = strlen(logger->directory)+50;
char fname[fnamelen]; char fname[fnamelen];
snprintf(fname, fnamelen, "%s/log%012lld.tokulog", logger->directory, index); snprintf(fname, fnamelen, "%s/log%012lld.tokulog", logger->directory, index);
...@@ -397,8 +482,13 @@ static int delete_logfile(TOKULOGGER logger, long long index) { ...@@ -397,8 +482,13 @@ static int delete_logfile(TOKULOGGER logger, long long index) {
return r; return r;
} }
int toku_logger_maybe_trim_log(TOKULOGGER logger, LSN trim_lsn) { int toku_logger_maybe_trim_log(TOKULOGGER logger, LSN trim_lsn)
// On entry and exit: No logger locks held.
// Acquires and releases output permission.
{
int r=0; int r=0;
LSN fsynced_lsn;
grab_output(logger, &fsynced_lsn);
TOKULOGFILEMGR lfm = logger->logfilemgr; TOKULOGFILEMGR lfm = logger->logfilemgr;
int n_logfiles = toku_logfilemgr_num_logfiles(lfm); int n_logfiles = toku_logfilemgr_num_logfiles(lfm);
...@@ -417,139 +507,104 @@ int toku_logger_maybe_trim_log(TOKULOGGER logger, LSN trim_lsn) { ...@@ -417,139 +507,104 @@ int toku_logger_maybe_trim_log(TOKULOGGER logger, LSN trim_lsn) {
n_logfiles--; n_logfiles--;
r = delete_logfile(logger, index); r = delete_logfile(logger, index);
if (r!=0) { if (r!=0) {
return r; break;
} }
} }
} }
release_output(logger, fsynced_lsn);
return r; return r;
} }
void toku_logger_write_log_files (TOKULOGGER logger, BOOL write_log_files) { void toku_logger_write_log_files (TOKULOGGER logger, BOOL write_log_files)
// Called only during initialization, so no locks are needed.
{
assert(!logger->is_open); assert(!logger->is_open);
logger->write_log_files = write_log_files; logger->write_log_files = write_log_files;
} }
void toku_logger_trim_log_files (TOKULOGGER logger, BOOL trim_log_files) { void toku_logger_trim_log_files (TOKULOGGER logger, BOOL trim_log_files)
// Called only during initialization, so no locks are needed.
{
assert(logger); assert(logger);
logger->trim_log_files = trim_log_files; logger->trim_log_files = trim_log_files;
} }
int toku_logger_maybe_fsync (TOKULOGGER logger, LSN lsn, int do_fsync) int toku_logger_maybe_fsync (TOKULOGGER logger, LSN lsn, int do_fsync)
// Effect: If fsync is nonzero, then make sure that the log is flushed and synced at least up to lsn. // Effect: If fsync is nonzero, then make sure that the log is flushed and synced at least up to lsn.
// Entry: Holds input lock. // Entry: Holds input lock. The log entry has already been written to the input buffer.
// Exit: Holds no locks. // Exit: Holds no locks.
// The input lock may be released and then reacquired. Thus this function does not run atomically. // The input lock may be released and then reacquired. Thus this function does not run atomically with respect to other threads.
{ {
int r; int r;
BOOL have_input_lock = TRUE; if (do_fsync) {
if (do_fsync && logger->fsynced_lsn.lsn < lsn.lsn) { // reacquire the locks (acquire output permission first)
// need to fsync and not enough is done r = ml_unlock(&logger->input_lock); assert(r==0);
// reacquire the locks (acquire output lock first) LSN fsynced_lsn;
r = ml_unlock(&logger->input_lock); if (r!=0) goto panic; BOOL already_done = wait_till_output_already_written_or_output_buffer_available(logger, lsn, &fsynced_lsn);
have_input_lock = FALSE; if (already_done) return 0;
r = ml_lock(&logger->output_lock); if (r!=0) goto panic;
r = ml_lock(&logger->input_lock); if (r!=0) goto panic; // otherwise we now own the output permission, and our lsn isn't outputed.
have_input_lock = TRUE;
r = ml_lock(&logger->input_lock); assert(r==0);
// it's possible that the written lsn is now written enough that we are happy. If not then do the I/O swap_inbuf_outbuf(logger);
if (logger->fsynced_lsn.lsn < lsn.lsn) {
// now we actually do the I/O r = ml_unlock(&logger->input_lock); // release the input lock now, so other threads can fill the inbuf. (Thus enabling group commit.)
struct logbuf tmp = logger->inbuf; assert(r==0);
logger->inbuf = logger->outbuf;
logger->outbuf = tmp; write_outbuf_to_logfile(logger, &fsynced_lsn);
r = ml_unlock(&logger->input_lock); // release the input lock now, so group commit can operate if (fsynced_lsn.lsn < lsn.lsn) {
if (r!=0) goto panic; // it may have gotten fsynced by the write_outbuf_to_logfile.
have_input_lock = FALSE; r = toku_file_fsync_without_accounting(logger->fd);
if (logger->outbuf.n_in_buf>0) { if (r!=0) {
r = write_it(logger->fd, logger->outbuf.buf, logger->outbuf.n_in_buf); toku_logger_panic(logger, r);
if (r!=logger->outbuf.n_in_buf) { r = errno; goto panic; } return r;
assert(logger->outbuf.max_lsn_in_buf.lsn > logger->written_lsn.lsn); // since there is something in the buffer, its LSN must be bigger than what's previously written.
logger->written_lsn = logger->outbuf.max_lsn_in_buf;
logger->n_in_file += logger->outbuf.n_in_buf;
logger->outbuf.n_in_buf = 0;
}
if (logger->n_in_file > logger->lg_max) {
r = close_and_open_logfile(logger); if (r!=0) goto panic;
logger->fsynced_lsn = logger->outbuf.max_lsn_in_buf;
} else {
assert(logger->fsynced_lsn.lsn < logger->written_lsn.lsn); // the fsynced_lsn was less than lsn, but not less than the written lsn?
r = toku_file_fsync(logger->fd);
if (r!=0) { r = errno; goto panic; }
logger->fsynced_lsn = logger->written_lsn;
} }
assert(fsynced_lsn.lsn <= logger->written_lsn.lsn);
fsynced_lsn = logger->written_lsn;
} }
r = ml_unlock(&logger->output_lock); // the last lsn is only accessed while holding output permission or else when the log file is old.
if (r!=0) goto panic; if ( logger->write_log_files )
} toku_logfilemgr_update_last_lsn(logger->logfilemgr, logger->written_lsn);
if (have_input_lock) { release_output(logger, fsynced_lsn);
} else {
r = ml_unlock(&logger->input_lock); r = ml_unlock(&logger->input_lock);
if (r!=0) goto panic2; assert(r==0);
have_input_lock = FALSE;
} }
if ( logger->write_log_files )
toku_logfilemgr_update_last_lsn(logger->logfilemgr, logger->written_lsn);
return 0; return 0;
panic:
if (have_input_lock) {
ml_unlock(&logger->input_lock);
have_input_lock = FALSE;
}
panic2:
toku_logger_panic(logger, r);
return r;
} }
static int static int
toku_logger_write_buffer (TOKULOGGER logger, int do_fsync) toku_logger_write_buffer (TOKULOGGER logger, LSN *fsynced_lsn)
// Entry: Holds both locks. // Entry: Holds the input lock and permission to modify output.
// Exit: Holds only the output lock. // Exit: Holds only the permission to modify output.
// Effect: Write the buffers to the output. If DO_FSYNC is true, then fsync. // Effect: Write the buffers to the output. If DO_FSYNC is true, then fsync.
// Note: Only called during single-threaded activity from toku_logger_restart, so locks aren't really needed.
{ {
int r; swap_inbuf_outbuf(logger);
{ { int r = ml_unlock(&logger->input_lock); assert(r==0); }
struct logbuf tmp = logger->inbuf; write_outbuf_to_logfile(logger, fsynced_lsn);
logger->inbuf = logger->outbuf; if (logger->write_log_files) {
logger->outbuf = tmp; int r = toku_file_fsync_without_accounting(logger->fd);
assert(logger->inbuf.n_in_buf == 0); if (r!=0) {
} toku_logger_panic(logger, r);
r = ml_unlock(&logger->input_lock); return r;
if (r!=0)
goto panic;
if (logger->outbuf.n_in_buf > 0) {
r = write_it(logger->fd, logger->outbuf.buf, logger->outbuf.n_in_buf);
if (r != logger->outbuf.n_in_buf) {
r = errno;
goto panic;
}
assert(logger->outbuf.max_lsn_in_buf.lsn > logger->written_lsn.lsn); // since there is something in the buffer, its LSN must be bigger than what's previously written.
logger->written_lsn = logger->outbuf.max_lsn_in_buf;
logger->n_in_file += logger->outbuf.n_in_buf;
logger->outbuf.n_in_buf = 0;
if (logger->n_in_file > logger->lg_max) {
r = close_and_open_logfile(logger); if (r!=0) goto panic;
logger->fsynced_lsn = logger->outbuf.max_lsn_in_buf;
} else if (do_fsync) {
r = toku_file_fsync(logger->fd);
logger->fsynced_lsn = logger->outbuf.max_lsn_in_buf;
toku_logfilemgr_update_last_lsn(logger->logfilemgr, logger->written_lsn); // t:2294
} else {
/* nothing */
} }
toku_logfilemgr_update_last_lsn(logger->logfilemgr, logger->written_lsn); // t:2294
} }
return 0; return 0;
panic:
toku_logger_panic(logger, r);
return r;
} }
int toku_logger_restart(TOKULOGGER logger, LSN lastlsn) { int toku_logger_restart(TOKULOGGER logger, LSN lastlsn)
// Entry and exit: Holds no locks (this is called only during single-threaded activity, such as initial start).
{
int r; int r;
// flush out the log buffer // flush out the log buffer
r = ml_lock(&logger->output_lock); assert(r == 0); LSN fsynced_lsn;
grab_output(logger, &fsynced_lsn);
r = ml_lock(&logger->input_lock); assert(r == 0); r = ml_lock(&logger->input_lock); assert(r == 0);
r = toku_logger_write_buffer(logger, TRUE); assert(r == 0); r = toku_logger_write_buffer(logger, &fsynced_lsn); assert(r == 0);
r = ml_unlock(&logger->output_lock); assert(r == 0);
// close the log file // close the log file
r = close(logger->fd); assert(r == 0); r = close(logger->fd); assert(r == 0);
...@@ -561,7 +616,9 @@ int toku_logger_restart(TOKULOGGER logger, LSN lastlsn) { ...@@ -561,7 +616,9 @@ int toku_logger_restart(TOKULOGGER logger, LSN lastlsn) {
logger->trim_log_files = TRUE; logger->trim_log_files = TRUE;
// open a new log file // open a new log file
return open_logfile(logger); r = open_logfile(logger);
release_output(logger, fsynced_lsn);
return r;
} }
// fname is the iname // fname is the iname
...@@ -895,7 +952,7 @@ int toku_txnid2txn (TOKULOGGER logger, TXNID txnid, TOKUTXN *result) { ...@@ -895,7 +952,7 @@ int toku_txnid2txn (TOKULOGGER logger, TXNID txnid, TOKUTXN *result) {
return rval; return rval;
} }
// Find the earliest LSN in a log // Find the earliest LSN in a log. No locks are needed.
static int peek_at_log (TOKULOGGER logger, char* filename, LSN *first_lsn) { static int peek_at_log (TOKULOGGER logger, char* filename, LSN *first_lsn) {
logger=logger; logger=logger;
int fd = open(filename, O_RDONLY+O_BINARY); int fd = open(filename, O_RDONLY+O_BINARY);
...@@ -925,13 +982,17 @@ static int peek_at_log (TOKULOGGER logger, char* filename, LSN *first_lsn) { ...@@ -925,13 +982,17 @@ static int peek_at_log (TOKULOGGER logger, char* filename, LSN *first_lsn) {
} }
// Return a malloc'd array of malloc'd strings which are the filenames that can be archived. // Return a malloc'd array of malloc'd strings which are the filenames that can be archived.
// Output permission are obtained briefly so we can get a list of the log files without conflicting.
int toku_logger_log_archive (TOKULOGGER logger, char ***logs_p, int flags) { int toku_logger_log_archive (TOKULOGGER logger, char ***logs_p, int flags) {
if (flags!=0) return EINVAL; // don't know what to do. if (flags!=0) return EINVAL; // don't know what to do.
int all_n_logs; int all_n_logs;
int i; int i;
char **all_logs; char **all_logs;
int n_logfiles; int n_logfiles;
LSN fsynced_lsn;
grab_output(logger, &fsynced_lsn);
int r = toku_logger_find_logfiles (logger->directory, &all_logs, &n_logfiles); int r = toku_logger_find_logfiles (logger->directory, &all_logs, &n_logfiles);
release_output(logger, fsynced_lsn);
if (r!=0) return r; if (r!=0) return r;
for (i=0; all_logs[i]; i++); for (i=0; all_logs[i]; i++);
......
...@@ -12,8 +12,6 @@ int toku_logger_open (const char *directory, TOKULOGGER logger); ...@@ -12,8 +12,6 @@ int toku_logger_open (const char *directory, TOKULOGGER logger);
int toku_logger_shutdown(TOKULOGGER logger); int toku_logger_shutdown(TOKULOGGER logger);
int toku_logger_close(TOKULOGGER *loggerp); int toku_logger_close(TOKULOGGER *loggerp);
u_int32_t toku_logger_get_lock_ctr(void);
int toku_logger_fsync (TOKULOGGER logger); int toku_logger_fsync (TOKULOGGER logger);
void toku_logger_panic (TOKULOGGER logger, int err); void toku_logger_panic (TOKULOGGER logger, int err);
int toku_logger_panicked(TOKULOGGER logger); int toku_logger_panicked(TOKULOGGER logger);
......
...@@ -931,7 +931,10 @@ static int toku_recover_commit (struct logtype_commit *l, RECOVER_ENV renv) { ...@@ -931,7 +931,10 @@ static int toku_recover_commit (struct logtype_commit *l, RECOVER_ENV renv) {
} }
// commit the transaction // commit the transaction
r = toku_txn_commit_with_lsn(txn, TRUE, recover_yield, NULL, l->lsn, NULL, NULL); r = toku_txn_commit_with_lsn(txn, TRUE, recover_yield, NULL, l->lsn,
NULL, NULL,
// No need to release locks during recovery.
NULL, NULL, NULL);
assert(r == 0); assert(r == 0);
// close the transaction // close the transaction
......
...@@ -78,12 +78,16 @@ int toku_txn_begin_with_xid (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGE ...@@ -78,12 +78,16 @@ int toku_txn_begin_with_xid (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGE
// Doesn't close the txn, just performs the commit operations. // Doesn't close the txn, just performs the commit operations.
int toku_txn_commit_txn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv, int toku_txn_commit_txn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra) { TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra,
return toku_txn_commit_with_lsn(txn, nosync, yield, yieldv, ZERO_LSN, poll, poll_extra); void (*release_locks)(void*), void(*reacquire_locks)(void*), void *locks_thunk) {
return toku_txn_commit_with_lsn(txn, nosync, yield, yieldv, ZERO_LSN,
poll, poll_extra,
release_locks, reacquire_locks, locks_thunk);
} }
int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv, LSN oplsn, int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv, LSN oplsn,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra) { TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra,
void (*release_locks)(void*), void(*reacquire_locks)(void*), void *locks_thunk) {
int r; int r;
// panic handled in log_commit // panic handled in log_commit
...@@ -93,7 +97,9 @@ int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv ...@@ -93,7 +97,9 @@ int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv
txn->progress_poll_fun = poll; txn->progress_poll_fun = poll;
txn->progress_poll_fun_extra = poll_extra; txn->progress_poll_fun_extra = poll_extra;
if (release_locks) release_locks(locks_thunk);
r = toku_log_commit(txn->logger, (LSN*)0, do_fsync, txn->txnid64); // exits holding neither of the tokulogger locks. r = toku_log_commit(txn->logger, (LSN*)0, do_fsync, txn->txnid64); // exits holding neither of the tokulogger locks.
if (reacquire_locks) reacquire_locks(locks_thunk);
if (r!=0) if (r!=0)
return r; return r;
r = toku_rollback_commit(txn, yield, yieldv, oplsn); r = toku_rollback_commit(txn, yield, yieldv, oplsn);
......
...@@ -9,9 +9,11 @@ int toku_txn_begin_txn (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER log ...@@ -9,9 +9,11 @@ int toku_txn_begin_txn (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER log
int toku_txn_begin_with_xid (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER logger, TXNID xid); int toku_txn_begin_with_xid (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER logger, TXNID xid);
int toku_txn_commit_txn (TOKUTXN txn, int nosync, YIELDF yield, void *yieldv, int toku_txn_commit_txn (TOKUTXN txn, int nosync, YIELDF yield, void *yieldv,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra); TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra,
void (*release_locks)(void*), void(*reacquire_locks)(void*), void *locks_thunk);
int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv, LSN oplsn, int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv, LSN oplsn,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra); TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra,
void (*release_locks)(void*), void(*reacquire_locks)(void*), void *locks_thunk);
int toku_txn_abort_txn(TOKUTXN txn, YIELDF yield, void *yieldv, int toku_txn_abort_txn(TOKUTXN txn, YIELDF yield, void *yieldv,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra); TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra);
......
...@@ -174,7 +174,6 @@ BDB_BINS = $(patsubst %.c,%.bdb$(BINSUF),$(filter-out $(patsubst %,%.c,$(BDB_DON ...@@ -174,7 +174,6 @@ BDB_BINS = $(patsubst %.c,%.bdb$(BINSUF),$(filter-out $(patsubst %,%.c,$(BDB_DON
endif endif
TDB_TESTS_THAT_SHOULD_FAIL= \ TDB_TESTS_THAT_SHOULD_FAIL= \
test_groupcommit_count \
test944 \ test944 \
test_truncate_txn_abort \ test_truncate_txn_abort \
test_db_no_env test_db_no_env
...@@ -215,6 +214,8 @@ TLRECOVER = 2 3 4 5 6 7 8 9 10 ...@@ -215,6 +214,8 @@ TLRECOVER = 2 3 4 5 6 7 8 9 10
EXTRA_TDB_TESTS = \ EXTRA_TDB_TESTS = \
$(patsubst %,test_log%.recover,$(TLRECOVER)) \ $(patsubst %,test_log%.recover,$(TLRECOVER)) \
test_groupcommit_count_hgrind.tdbrun \
test_groupcommit_count_vgrind.tdbrun \
#\ ends prev line #\ ends prev line
ifeq ($(OS_CHOICE),windows) ifeq ($(OS_CHOICE),windows)
...@@ -602,6 +603,12 @@ helgrind2.bdbrun: BDBVGRIND=$(HGRIND) ...@@ -602,6 +603,12 @@ helgrind2.bdbrun: BDBVGRIND=$(HGRIND)
helgrind3.tdbrun: TDBVGRIND=$(HGRIND) helgrind3.tdbrun: TDBVGRIND=$(HGRIND)
helgrind3.bdbrun: BDBVGRIND=$(HGRIND) helgrind3.bdbrun: BDBVGRIND=$(HGRIND)
test_groupcommit_count_hgrind.tdbrun: HGRIND+=--suppressions=helgrind.suppressions
test_groupcommit_count_hgrind.tdbrun: test_groupcommit_count.tdb$(BINSUF)
$(HGRIND) ./test_groupcommit_count.tdb$(BINSUF) $(VERBVERBOSE) -n 1 -p hgrind $(SUMMARIZE_CMD)
test_groupcommit_count_vgrind.tdbrun: test_groupcommit_count.tdb$(BINSUF)
$(VGRIND) ./test_groupcommit_count.tdb$(BINSUF) $(VERBVERBOSE) -n 1 -p vgrind $(SUMMARIZE_CMD)
# we deliberately don't close the env, so recovery runs # we deliberately don't close the env, so recovery runs
# lets avoid all of the valgrind errors # lets avoid all of the valgrind errors
test-recover1.tdbrun: VGRIND= test-recover1.tdbrun: VGRIND=
......
{
helgrind_3.5.0_false_positive_against_pthread_create
Helgrind:Race
fun:mythread_wrapper
}
...@@ -12,6 +12,7 @@ ...@@ -12,6 +12,7 @@
DB_ENV *env; DB_ENV *env;
DB *db; DB *db;
int do_sync=1;
#define NITER 100 #define NITER 100
...@@ -28,18 +29,20 @@ static void *start_a_thread (void *i_p) { ...@@ -28,18 +29,20 @@ static void *start_a_thread (void *i_p) {
dbt_init(&key, keystr, 1+strlen(keystr)), dbt_init(&key, keystr, 1+strlen(keystr)),
dbt_init(&data, keystr, 1+strlen(keystr)), dbt_init(&data, keystr, 1+strlen(keystr)),
0); 0);
r=tid->commit(tid, 0); CKERR(r); r=tid->commit(tid, do_sync ? 0 : DB_TXN_NOSYNC); CKERR(r);
} }
return 0; return 0;
} }
char *env_path;
static void static void
test_groupcommit (int nthreads) { test_groupcommit (int nthreads) {
int r; int r;
DB_TXN *tid; DB_TXN *tid;
r=db_env_create(&env, 0); assert(r==0); r=db_env_create(&env, 0); assert(r==0);
r=env->open(env, ENVDIR, DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_MPOOL|DB_INIT_TXN|DB_CREATE|DB_PRIVATE|DB_THREAD, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r); r=env->open(env, env_path, DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_MPOOL|DB_INIT_TXN|DB_CREATE|DB_PRIVATE|DB_THREAD, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
r=db_create(&db, env, 0); CKERR(r); r=db_create(&db, env, 0); CKERR(r);
r=env->txn_begin(env, 0, &tid, 0); assert(r==0); r=env->txn_begin(env, 0, &tid, 0); assert(r==0);
r=db->open(db, tid, "foo.db", 0, DB_BTREE, DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r); r=db->open(db, tid, "foo.db", 0, DB_BTREE, DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
...@@ -59,6 +62,7 @@ test_groupcommit (int nthreads) { ...@@ -59,6 +62,7 @@ test_groupcommit (int nthreads) {
r=db->close(db, 0); assert(r==0); r=db->close(db, 0); assert(r==0);
r=env->close(env, 0); assert(r==0); r=env->close(env, 0); assert(r==0);
//if (verbose) printf(" That's a total of %d commits\n", nthreads*NITER);
} }
// helgrind doesn't understand that pthread_join removes a race condition. I'm not impressed... -Bradley // helgrind doesn't understand that pthread_join removes a race condition. I'm not impressed... -Bradley
...@@ -88,42 +92,102 @@ static struct timeval prevtime; ...@@ -88,42 +92,102 @@ static struct timeval prevtime;
static int prev_count; static int prev_count;
static void static void
printtdiff (char *str) { printtdiff (int N) {
struct timeval thistime; struct timeval thistime;
gettimeofday(&thistime, 0); gettimeofday(&thistime, 0);
double diff = toku_tdiff(&thistime, &prevtime); double diff = toku_tdiff(&thistime, &prevtime);
int fcount=get_fsync_count(); int fcount=get_fsync_count();
if (verbose) printf("%s: %10.6fs %d fsyncs for %s\n", progname, diff, fcount-prev_count, str); if (verbose) printf("%s: %10.6fs %4d fsyncs for %4d threads %s %8.1f tps, %8.1f tps/thread\n", progname, diff, fcount-prev_count,
N,
do_sync ? "with sync " : "with DB_TXN_NOSYNC",
NITER*(N/diff), NITER/diff);
prevtime=thistime; prevtime=thistime;
prev_count=fcount; prev_count=fcount;
} }
/* Run the group-commit workload with N threads twice, once for each value of
 * the global do_sync (0 first, then 1).  After each run the timing line is
 * printed; if that run issued at least N*NITER fsyncs the process exits with
 * status 1.  On return do_sync has been advanced to 2. */
static void
do_test (int N) {
    const int fsync_limit = N*NITER;
    do_sync = 0;
    while (do_sync < 2) {
	int fsyncs_at_start = get_fsync_count();
	test_groupcommit(N);
	printtdiff(N);
	/* Fewer than one fsync per commit is taken as evidence of group commit. */
	if (get_fsync_count() - fsyncs_at_start >= fsync_limit) {
	    if (verbose) printf("It looks like too many fsyncs. Group commit doesn't appear to be occuring.\n");
	    exit(1);
	}
	do_sync++;
    }
}
int log_max_n_threads_over_10 = 3;
/* Print the command-line synopsis to stderr and terminate the process with
 * status RESULTCODE (0 when help was requested, 1 for a bad command line). */
static void
my_parse_args_usage (const char *argv0, int resultcode) {
    fprintf(stderr, "Usage:\n%s [-v|-q] [-n LOG(MAX_N_THREADS/10)] [-p ENVDIR_SUFFIX] [-h]\n", argv0);
    exit(resultcode);
}

/* Parse the command line of this test.
 *   -v         increase verbosity (verbose starts at 1)
 *   -q         decrease verbosity, never below 0
 *   -n COUNT   store COUNT in log_max_n_threads_over_10
 *   -p SUFFIX  use "ENVDIR.SUFFIX" as the environment directory (env_path)
 *   -h         print usage and exit with status 0
 * Any other argument, a missing option value, or a malformed COUNT prints the
 * usage text and exits with status 1.
 * Side effects: sets the globals verbose, env_path (heap-allocated; the
 * caller frees it) and possibly log_max_n_threads_over_10. */
static void
my_parse_args (int argc, char *const argv[]) {
    verbose=1; // use -q to turn off the talking.
    env_path = toku_strdup(ENVDIR);
    const char *argv0=argv[0];
    while (argc>1) {
	if (strcmp(argv[1], "-v")==0) {
	    verbose++;
	} else if (strcmp(argv[1],"-q")==0) {
	    verbose--;
	    if (verbose<0) verbose=0;
	} else if (strcmp(argv[1],"-n")==0) {
	    // consume the option name; argv[1] is now its value
	    argc--;
	    argv++;
	    if (argc<=1) my_parse_args_usage(argv0, 1);
	    errno = 0;
	    char *end;
	    long v = strtol(argv[1], &end, 10);
	    // Reject conversion errors, an empty number, trailing junk, and
	    // values that would not survive the narrowing to int.
	    if (errno!=0 || end==argv[1] || *end!='\0' || v!=(long)(int)v) {
		my_parse_args_usage(argv0, 1);
	    }
	    log_max_n_threads_over_10 = (int)v;
	} else if (strcmp(argv[1],"-p")==0) {
	    // consume the option name; argv[1] is now the directory suffix
	    argc--;
	    argv++;
	    if (argc<=1) my_parse_args_usage(argv0, 1);
	    // room for ENVDIR, '.', the suffix and the terminating NUL (with slack)
	    size_t size = strlen(ENVDIR) + 10 + strlen(argv[1]);
	    REALLOC_N(size, env_path);
	    assert(env_path);
	    snprintf(env_path, size, "%s.%s", ENVDIR, argv[1]);
	} else if (strcmp(argv[1], "-h")==0) {
	    my_parse_args_usage(argv0, 0);
	} else {
	    my_parse_args_usage(argv0, 1);
	}
	argc--;
	argv++;
    }
}
int int
test_main (int argc, char *const argv[]) { test_main (int argc, char *const argv[]) {
progname=argv[0]; progname=argv[0];
parse_args(argc, argv); my_parse_args(argc, argv);
gettimeofday(&prevtime, 0); gettimeofday(&prevtime, 0);
prev_count=0; prev_count=0;
{ int r = db_env_set_func_fsync(do_fsync); CKERR(r); } { int r = db_env_set_func_fsync(do_fsync); CKERR(r); }
system("rm -rf " ENVDIR); {
{ int r=toku_os_mkdir(ENVDIR, S_IRWXU+S_IRWXG+S_IRWXO); assert(r==0); } int size=20+strlen(env_path);
char command[size];
test_groupcommit(1); printtdiff("1 thread"); snprintf(command, size, "rm -rf %s", env_path);
test_groupcommit(2); printtdiff("2 threads"); system(command);
int count_before_10 = get_fsync_count();
test_groupcommit(10); printtdiff("10 threads");
if (get_fsync_count()-count_before_10 >= 10*NITER) {
if (verbose) printf("It looks like too many fsyncs. Group commit doesn't appear to be occuring.\n");
exit(1);
} }
int count_before_20 = get_fsync_count(); { int r=toku_os_mkdir(env_path, S_IRWXU+S_IRWXG+S_IRWXO); assert(r==0); }
test_groupcommit(20); printtdiff("20 threads");
if (get_fsync_count()-count_before_20 >= 20*NITER) { test_groupcommit(1); printtdiff(1);
if (verbose) printf("It looks like too many fsyncs. Group commit doesn't appear to be occuring.\n"); test_groupcommit(2); printtdiff(2);
exit(1); for (int i=0; i<log_max_n_threads_over_10; i++) {
do_test(10 << i);
} }
toku_free(env_path);
return 0; return 0;
} }
...@@ -91,6 +91,7 @@ printtdiff (char *str) { ...@@ -91,6 +91,7 @@ printtdiff (char *str) {
struct timeval thistime; struct timeval thistime;
gettimeofday(&thistime, 0); gettimeofday(&thistime, 0);
if (verbose) printf("%10.6f %s\n", toku_tdiff(&thistime, &prevtime), str); if (verbose) printf("%10.6f %s\n", toku_tdiff(&thistime, &prevtime), str);
prevtime = thistime;
} }
int int
......
...@@ -1323,8 +1323,6 @@ env_get_engine_status(DB_ENV * env, ENGINE_STATUS * engstat) { ...@@ -1323,8 +1323,6 @@ env_get_engine_status(DB_ENV * env, ENGINE_STATUS * engstat) {
time_t now = time(NULL); time_t now = time(NULL);
format_time(&now, engstat->now); format_time(&now, engstat->now);
engstat->logger_lock_ctr = toku_logger_get_lock_ctr();
{ {
SCHEDULE_STATUS_S schedstat; SCHEDULE_STATUS_S schedstat;
toku_ydb_lock_get_status(&schedstat); toku_ydb_lock_get_status(&schedstat);
...@@ -1418,7 +1416,6 @@ env_get_engine_status_text(DB_ENV * env, char * buff, int bufsiz) { ...@@ -1418,7 +1416,6 @@ env_get_engine_status_text(DB_ENV * env, char * buff, int bufsiz) {
n += snprintf(buff + n, bufsiz - n, "time_ydb_lock_held_unavailable %"PRIu64"\n", engstat.time_ydb_lock_held_unavailable); n += snprintf(buff + n, bufsiz - n, "time_ydb_lock_held_unavailable %"PRIu64"\n", engstat.time_ydb_lock_held_unavailable);
n += snprintf(buff + n, bufsiz - n, "max_time_ydb_lock_held %"PRIu64"\n", engstat.max_time_ydb_lock_held); n += snprintf(buff + n, bufsiz - n, "max_time_ydb_lock_held %"PRIu64"\n", engstat.max_time_ydb_lock_held);
n += snprintf(buff + n, bufsiz - n, "total_time_ydb_lock_held %"PRIu64"\n", engstat.total_time_ydb_lock_held); n += snprintf(buff + n, bufsiz - n, "total_time_ydb_lock_held %"PRIu64"\n", engstat.total_time_ydb_lock_held);
n += snprintf(buff + n, bufsiz - n, "logger_lock_ctr %"PRIu64"\n", engstat.logger_lock_ctr);
n += snprintf(buff + n, bufsiz - n, "checkpoint_period %d \n", engstat.checkpoint_period); n += snprintf(buff + n, bufsiz - n, "checkpoint_period %d \n", engstat.checkpoint_period);
n += snprintf(buff + n, bufsiz - n, "checkpoint_footprint %d \n", engstat.checkpoint_footprint); n += snprintf(buff + n, bufsiz - n, "checkpoint_footprint %d \n", engstat.checkpoint_footprint);
n += snprintf(buff + n, bufsiz - n, "checkpoint_time_begin %s \n", engstat.checkpoint_time_begin); n += snprintf(buff + n, bufsiz - n, "checkpoint_time_begin %s \n", engstat.checkpoint_time_begin);
...@@ -1608,8 +1605,17 @@ static void ydb_yield (voidfp f, void *UU(v)) { ...@@ -1608,8 +1605,17 @@ static void ydb_yield (voidfp f, void *UU(v)) {
toku_ydb_lock(); toku_ydb_lock();
} }
// Lock-release callback handed to toku_txn_commit_txn: drops the ydb lock.
// The thunk argument is not used.
static void release_ydb_lock_callback (void *locks_thunk) {
    (void)locks_thunk;
    toku_ydb_unlock();
}
// Lock-reacquire callback handed to toku_txn_commit_txn: takes the ydb lock
// again.  The thunk argument is not used.
static void reacquire_ydb_lock_callback (void *locks_thunk) {
    (void)locks_thunk;
    toku_ydb_lock();
}
static int toku_txn_commit(DB_TXN * txn, u_int32_t flags, static int toku_txn_commit(DB_TXN * txn, u_int32_t flags,
TXN_PROGRESS_POLL_FUNCTION poll, void* poll_extra) { TXN_PROGRESS_POLL_FUNCTION poll, void* poll_extra) {
if (!txn) return EINVAL; if (!txn) return EINVAL;
HANDLE_PANICKED_ENV(txn->mgrp); HANDLE_PANICKED_ENV(txn->mgrp);
//Recursively kill off children //Recursively kill off children
...@@ -1648,7 +1654,9 @@ static int toku_txn_commit(DB_TXN * txn, u_int32_t flags, ...@@ -1648,7 +1654,9 @@ static int toku_txn_commit(DB_TXN * txn, u_int32_t flags,
// frees the tokutxn // frees the tokutxn
// Calls ydb_yield(NULL) occasionally // Calls ydb_yield(NULL) occasionally
//r = toku_logger_commit(db_txn_struct_i(txn)->tokutxn, nosync, ydb_yield, NULL); //r = toku_logger_commit(db_txn_struct_i(txn)->tokutxn, nosync, ydb_yield, NULL);
r = toku_txn_commit_txn(db_txn_struct_i(txn)->tokutxn, nosync, ydb_yield, NULL, poll, poll_extra); r = toku_txn_commit_txn(db_txn_struct_i(txn)->tokutxn, nosync, ydb_yield, NULL,
poll, poll_extra,
release_ydb_lock_callback, reacquire_ydb_lock_callback, NULL);
if (r!=0 && !toku_env_is_panicked(txn->mgrp)) { if (r!=0 && !toku_env_is_panicked(txn->mgrp)) {
txn->mgrp->i->is_panicked = r; txn->mgrp->i->is_panicked = r;
...@@ -5292,8 +5300,10 @@ include_toku_pthread_yield (void) { ...@@ -5292,8 +5300,10 @@ include_toku_pthread_yield (void) {
// For test purposes only, translate dname to iname // For test purposes only, translate dname to iname
static int static int
env_get_iname(DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) { env_get_iname(DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) {
toku_ydb_lock();
DB *directory = env->i->directory; DB *directory = env->i->directory;
int r = autotxn_db_get(directory, NULL, dname_dbt, iname_dbt, DB_PRELOCKED); // allocates memory for iname int r = autotxn_db_get(directory, NULL, dname_dbt, iname_dbt, DB_PRELOCKED); // allocates memory for iname
toku_ydb_unlock();
return r; return r;
} }
......
...@@ -139,6 +139,7 @@ void toku_os_full_write (int fd, const void *buf, size_t len) __attribute__((__v ...@@ -139,6 +139,7 @@ void toku_os_full_write (int fd, const void *buf, size_t len) __attribute__((__v
int toku_os_write (int fd, const void *buf, size_t len) __attribute__((__visibility__("default"))); int toku_os_write (int fd, const void *buf, size_t len) __attribute__((__visibility__("default")));
// wrapper around fsync // wrapper around fsync
int toku_file_fsync_without_accounting(int fd);
int toku_file_fsync(int fd); int toku_file_fsync(int fd);
// get the number of fsync calls and the fsync times // get the number of fsync calls and the fsync times
......
...@@ -249,9 +249,8 @@ static uint64_t get_tnow(void) { ...@@ -249,9 +249,8 @@ static uint64_t get_tnow(void) {
// keep trying if fsync fails because of EINTR // keep trying if fsync fails because of EINTR
int int
toku_file_fsync(int fd) { toku_file_fsync_without_accounting (int fd) {
int r = -1; int r = -1;
uint64_t tstart = get_tnow();
while (r != 0) { while (r != 0) {
if (t_fsync) if (t_fsync)
r = t_fsync(fd); r = t_fsync(fd);
...@@ -260,6 +259,13 @@ toku_file_fsync(int fd) { ...@@ -260,6 +259,13 @@ toku_file_fsync(int fd) {
if (r) if (r)
assert(errno==EINTR); assert(errno==EINTR);
} }
return r;
}
int
toku_file_fsync(int fd) {
uint64_t tstart = get_tnow();
int r = toku_file_fsync_without_accounting(fd);
#if TOKU_WINDOWS_HAS_ATOMIC_64 #if TOKU_WINDOWS_HAS_ATOMIC_64
toku_sync_fetch_and_increment_uint64(&toku_fsync_count); toku_sync_fetch_and_increment_uint64(&toku_fsync_count);
toku_sync_fetch_and_add_uint64(&toku_fsync_time, get_tnow() - tstart); toku_sync_fetch_and_add_uint64(&toku_fsync_time, get_tnow() - tstart);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment