Commit 9979a41f authored by Yoni Fogel's avatar Yoni Fogel

refs #5467 merge "kill put loader, fix hot indexer freeze issue" onto main

git-svn-id: file:///svn/toku/tokudb@50137 c7de825b-a66e-492c-adef-691d508d4ae1
parent 3dde22c3
...@@ -272,7 +272,11 @@ static void print_defines (void) { ...@@ -272,7 +272,11 @@ static void print_defines (void) {
/* LOADER flags */ /* LOADER flags */
printf("/* LOADER flags */\n"); printf("/* LOADER flags */\n");
printf("#define LOADER_USE_PUTS 1\n"); // minimize space usage {
uint32_t loader_flags = 0;
dodefine_from_track(loader_flags, LOADER_DISALLOW_PUTS); // Loader is only used for side effects.
dodefine_from_track(loader_flags, LOADER_COMPRESS_INTERMEDIATES);
}
} }
static void print_db_env_struct (void) { static void print_db_env_struct (void) {
...@@ -672,7 +676,7 @@ int main (int argc, char *const argv[] __attribute__((__unused__))) { ...@@ -672,7 +676,7 @@ int main (int argc, char *const argv[] __attribute__((__unused__))) {
printf(" uint8_t stalled_on_checkpoint;\n"); printf(" uint8_t stalled_on_checkpoint;\n");
printf("} *TOKU_TXN_PROGRESS, TOKU_TXN_PROGRESS_S;\n"); printf("} *TOKU_TXN_PROGRESS, TOKU_TXN_PROGRESS_S;\n");
printf("typedef void(*TXN_PROGRESS_POLL_FUNCTION)(TOKU_TXN_PROGRESS, void*);\n"); printf("typedef void(*TXN_PROGRESS_POLL_FUNCTION)(TOKU_TXN_PROGRESS, void*);\n");
printf("struct txn_stat {\n uint64_t rollback_raw_count;\n};\n"); printf("struct txn_stat {\n uint64_t rollback_raw_count;\n uint64_t rollback_num_entries;\n};\n");
print_db_txn_struct(); print_db_txn_struct();
print_db_txn_stat_struct(); print_db_txn_stat_struct();
......
...@@ -109,7 +109,6 @@ struct cachefile { ...@@ -109,7 +109,6 @@ struct cachefile {
void *userdata; void *userdata;
void (*log_fassociate_during_checkpoint)(CACHEFILE cf, void *userdata); // When starting a checkpoint we must log all open files. void (*log_fassociate_during_checkpoint)(CACHEFILE cf, void *userdata); // When starting a checkpoint we must log all open files.
void (*log_suppress_rollback_during_checkpoint)(CACHEFILE cf, void *userdata); // When starting a checkpoint we must log which files need rollbacks suppressed
void (*close_userdata)(CACHEFILE cf, int fd, void *userdata, bool lsnvalid, LSN); // when closing the last reference to a cachefile, first call this function. void (*close_userdata)(CACHEFILE cf, int fd, void *userdata, bool lsnvalid, LSN); // when closing the last reference to a cachefile, first call this function.
void (*begin_checkpoint_userdata)(LSN lsn_of_checkpoint, void *userdata); // before checkpointing cachefiles call this function. void (*begin_checkpoint_userdata)(LSN lsn_of_checkpoint, void *userdata); // before checkpointing cachefiles call this function.
void (*checkpoint_userdata)(CACHEFILE cf, int fd, void *userdata); // when checkpointing a cachefile, call this function. void (*checkpoint_userdata)(CACHEFILE cf, int fd, void *userdata); // when checkpointing a cachefile, call this function.
......
...@@ -2815,7 +2815,6 @@ void ...@@ -2815,7 +2815,6 @@ void
toku_cachefile_set_userdata (CACHEFILE cf, toku_cachefile_set_userdata (CACHEFILE cf,
void *userdata, void *userdata,
void (*log_fassociate_during_checkpoint)(CACHEFILE, void*), void (*log_fassociate_during_checkpoint)(CACHEFILE, void*),
void (*log_suppress_rollback_during_checkpoint)(CACHEFILE, void*),
void (*close_userdata)(CACHEFILE, int, void*, bool, LSN), void (*close_userdata)(CACHEFILE, int, void*, bool, LSN),
void (*checkpoint_userdata)(CACHEFILE, int, void*), void (*checkpoint_userdata)(CACHEFILE, int, void*),
void (*begin_checkpoint_userdata)(LSN, void*), void (*begin_checkpoint_userdata)(LSN, void*),
...@@ -2824,7 +2823,6 @@ toku_cachefile_set_userdata (CACHEFILE cf, ...@@ -2824,7 +2823,6 @@ toku_cachefile_set_userdata (CACHEFILE cf,
void (*note_unpin_by_checkpoint)(CACHEFILE, void*)) { void (*note_unpin_by_checkpoint)(CACHEFILE, void*)) {
cf->userdata = userdata; cf->userdata = userdata;
cf->log_fassociate_during_checkpoint = log_fassociate_during_checkpoint; cf->log_fassociate_during_checkpoint = log_fassociate_during_checkpoint;
cf->log_suppress_rollback_during_checkpoint = log_suppress_rollback_during_checkpoint;
cf->close_userdata = close_userdata; cf->close_userdata = close_userdata;
cf->checkpoint_userdata = checkpoint_userdata; cf->checkpoint_userdata = checkpoint_userdata;
cf->begin_checkpoint_userdata = begin_checkpoint_userdata; cf->begin_checkpoint_userdata = begin_checkpoint_userdata;
...@@ -4181,34 +4179,27 @@ void checkpointer::log_begin_checkpoint() { ...@@ -4181,34 +4179,27 @@ void checkpointer::log_begin_checkpoint() {
TXNID last_xid = toku_txn_manager_get_last_xid(mgr); TXNID last_xid = toku_txn_manager_get_last_xid(mgr);
toku_log_begin_checkpoint(m_logger, &begin_lsn, 0, 0, last_xid); toku_log_begin_checkpoint(m_logger, &begin_lsn, 0, 0, last_xid);
m_lsn_of_checkpoint_in_progress = begin_lsn; m_lsn_of_checkpoint_in_progress = begin_lsn;
// Log the list of open dictionaries. // Log the list of open dictionaries.
for (CACHEFILE cf = m_cf_list->m_head; cf; cf = cf->next) { for (CACHEFILE cf = m_cf_list->m_head; cf; cf = cf->next) {
assert(cf->log_fassociate_during_checkpoint); assert(cf->log_fassociate_during_checkpoint);
cf->log_fassociate_during_checkpoint(cf, cf->userdata); cf->log_fassociate_during_checkpoint(cf, cf->userdata);
} }
// Write open transactions to the log. // Write open transactions to the log.
r = toku_txn_manager_iter_over_live_txns<checkpointer, log_open_txn> ( r = toku_txn_manager_iter_over_live_txns<checkpointer, log_open_txn> (
m_logger->txn_manager, m_logger->txn_manager,
this); this);
assert(r == 0); assert(r == 0);
// Writes list of dictionaries that have had
// rollback logs suppressed.
for (CACHEFILE cf = m_cf_list->m_head; cf; cf = cf->next) {
assert(cf->log_suppress_rollback_during_checkpoint);
cf->log_suppress_rollback_during_checkpoint(cf, cf->userdata);
}
} }
// //
// Sets the pending bits of EVERY PAIR in the cachetable, regardless of // Sets the pending bits of EVERY PAIR in the cachetable, regardless of
// whether the PAIR is clean or not. It will be the responsibility of // whether the PAIR is clean or not. It will be the responsibility of
// end_checkpoint or client threads to simply clear the pending bit // end_checkpoint or client threads to simply clear the pending bit
// if the PAIR is clean. // if the PAIR is clean.
// //
// On entry and exit , the pair list's read list lock is grabbed, and // On entry and exit , the pair list's read list lock is grabbed, and
// both pending locks are grabbed // both pending locks are grabbed
// //
void checkpointer::turn_on_pending_bits() { void checkpointer::turn_on_pending_bits() {
......
...@@ -191,7 +191,6 @@ typedef void (*CACHETABLE_REMOVE_KEY)(CACHEKEY* cachekey, bool for_checkpoint, v ...@@ -191,7 +191,6 @@ typedef void (*CACHETABLE_REMOVE_KEY)(CACHEKEY* cachekey, bool for_checkpoint, v
void toku_cachefile_set_userdata(CACHEFILE cf, void *userdata, void toku_cachefile_set_userdata(CACHEFILE cf, void *userdata,
void (*log_fassociate_during_checkpoint)(CACHEFILE, void*), void (*log_fassociate_during_checkpoint)(CACHEFILE, void*),
void (*log_suppress_rollback_during_checkpoint)(CACHEFILE, void*),
void (*close_userdata)(CACHEFILE, int, void*, bool, LSN), void (*close_userdata)(CACHEFILE, int, void*, bool, LSN),
void (*checkpoint_userdata)(CACHEFILE, int, void*), void (*checkpoint_userdata)(CACHEFILE, int, void*),
void (*begin_checkpoint_userdata)(LSN, void*), void (*begin_checkpoint_userdata)(LSN, void*),
......
...@@ -64,8 +64,8 @@ void toku_checkpoint_destroy(void); ...@@ -64,8 +64,8 @@ void toku_checkpoint_destroy(void);
typedef enum {SCHEDULED_CHECKPOINT = 0, // "normal" checkpoint taken on checkpoint thread typedef enum {SCHEDULED_CHECKPOINT = 0, // "normal" checkpoint taken on checkpoint thread
CLIENT_CHECKPOINT = 1, // induced by client, such as FLUSH LOGS or SAVEPOINT CLIENT_CHECKPOINT = 1, // induced by client, such as FLUSH LOGS or SAVEPOINT
TXN_COMMIT_CHECKPOINT = 2, INDEXER_CHECKPOINT = 2,
STARTUP_CHECKPOINT = 3, STARTUP_CHECKPOINT = 3,
UPGRADE_CHECKPOINT = 4, UPGRADE_CHECKPOINT = 4,
RECOVERY_CHECKPOINT = 5, RECOVERY_CHECKPOINT = 5,
SHUTDOWN_CHECKPOINT = 6} checkpoint_caller_t; SHUTDOWN_CHECKPOINT = 6} checkpoint_caller_t;
......
...@@ -462,18 +462,10 @@ struct ft { ...@@ -462,18 +462,10 @@ struct ft {
// the on-disk layout version is from before basement nodes) // the on-disk layout version is from before basement nodes)
int layout_version_read_from_disk; int layout_version_read_from_disk;
// If a transaction created this BRT, which one?
// If a transaction locked the BRT when it was empty, which transaction? (Only the latest one matters)
// 0 if no such transaction
// only one thread can write to these at once, this is enforced by
// the lock tree
TXNID txnid_that_created_or_locked_when_empty;
TXNID txnid_that_suppressed_recovery_logs;
// Logically the reference count is zero if live_ft_handles is empty, txns is 0, and pinned_by_checkpoint is false. // Logically the reference count is zero if live_ft_handles is empty, txns is 0, and pinned_by_checkpoint is false.
// ft_ref_lock protects modifying live_ft_handles, txns, and pinned_by_checkpoint. // ft_ref_lock protects modifying live_ft_handles, txns, and pinned_by_checkpoint.
toku_mutex_t ft_ref_lock; toku_mutex_t ft_ref_lock;
struct toku_list live_ft_handles; struct toku_list live_ft_handles;
// Number of transactions that are using this FT. you should only be able // Number of transactions that are using this FT. you should only be able
// to modify this if you have a valid handle in live_ft_handles // to modify this if you have a valid handle in live_ft_handles
......
This diff is collapsed.
...@@ -143,9 +143,9 @@ void toku_ft_load(FT_HANDLE brt, TOKUTXN txn, char const * new_iname, int do_fsy ...@@ -143,9 +143,9 @@ void toku_ft_load(FT_HANDLE brt, TOKUTXN txn, char const * new_iname, int do_fsy
void toku_ft_hot_index_recovery(TOKUTXN txn, FILENUMS filenums, int do_fsync, int do_log, LSN *hot_index_lsn); void toku_ft_hot_index_recovery(TOKUTXN txn, FILENUMS filenums, int do_fsync, int do_log, LSN *hot_index_lsn);
void toku_ft_hot_index(FT_HANDLE brt, TOKUTXN txn, FILENUMS filenums, int do_fsync, LSN *lsn); void toku_ft_hot_index(FT_HANDLE brt, TOKUTXN txn, FILENUMS filenums, int do_fsync, LSN *lsn);
void toku_ft_log_put_multiple (TOKUTXN txn, FT_HANDLE src_ft, FT_HANDLE *brts, int num_fts, const DBT *key, const DBT *val); void toku_ft_log_put_multiple (TOKUTXN txn, FT_HANDLE src_ft, FT_HANDLE *brts, uint32_t num_fts, const DBT *key, const DBT *val);
void toku_ft_log_put (TOKUTXN txn, FT_HANDLE brt, const DBT *key, const DBT *val); void toku_ft_log_put (TOKUTXN txn, FT_HANDLE brt, const DBT *key, const DBT *val);
void toku_ft_log_del_multiple (TOKUTXN txn, FT_HANDLE src_ft, FT_HANDLE *brts, int num_fts, const DBT *key, const DBT *val); void toku_ft_log_del_multiple (TOKUTXN txn, FT_HANDLE src_ft, FT_HANDLE *brts, uint32_t num_fts, const DBT *key, const DBT *val);
void toku_ft_log_del (TOKUTXN txn, FT_HANDLE brt, const DBT *key); void toku_ft_log_del (TOKUTXN txn, FT_HANDLE brt, const DBT *key);
// Effect: Delete a key from a brt // Effect: Delete a key from a brt
...@@ -233,12 +233,6 @@ void toku_maybe_preallocate_in_file (int fd, int64_t size, int64_t expected_size ...@@ -233,12 +233,6 @@ void toku_maybe_preallocate_in_file (int fd, int64_t size, int64_t expected_size
// Effect: make the file bigger by either doubling it or growing by 16MiB whichever is less, until it is at least size // Effect: make the file bigger by either doubling it or growing by 16MiB whichever is less, until it is at least size
// Return 0 on success, otherwise an error number. // Return 0 on success, otherwise an error number.
void toku_ft_suppress_recovery_logs (FT_HANDLE brt, TOKUTXN txn);
// Effect: suppresses recovery logs
// Requires: this is a (target) redirected brt
// implies: txnid_that_created_or_locked_when_empty matches txn
// implies: toku_txn_note_ft(brt, txn) has been called
int toku_ft_get_fragmentation(FT_HANDLE brt, TOKU_DB_FRAGMENTATION report) __attribute__ ((warn_unused_result)); int toku_ft_get_fragmentation(FT_HANDLE brt, TOKU_DB_FRAGMENTATION report) __attribute__ ((warn_unused_result));
bool toku_ft_is_empty_fast (FT_HANDLE brt) __attribute__ ((warn_unused_result)); bool toku_ft_is_empty_fast (FT_HANDLE brt) __attribute__ ((warn_unused_result));
......
...@@ -15,14 +15,6 @@ ...@@ -15,14 +15,6 @@
#include <toku_assert.h> #include <toku_assert.h>
#include <portability/toku_atomic.h> #include <portability/toku_atomic.h>
void
toku_ft_suppress_rollbacks(FT h, TOKUTXN txn) {
TXNID txnid = toku_txn_get_txnid(txn);
assert(h->txnid_that_created_or_locked_when_empty == TXNID_NONE ||
h->txnid_that_created_or_locked_when_empty == txnid);
h->txnid_that_created_or_locked_when_empty = txnid;
}
void void
toku_reset_root_xid_that_created(FT ft, TXNID new_root_xid_that_created) { toku_reset_root_xid_that_created(FT ft, TXNID new_root_xid_that_created) {
// Reset the root_xid_that_created field to the given value. // Reset the root_xid_that_created field to the given value.
...@@ -109,22 +101,6 @@ ft_log_fassociate_during_checkpoint (CACHEFILE cf, void *header_v) { ...@@ -109,22 +101,6 @@ ft_log_fassociate_during_checkpoint (CACHEFILE cf, void *header_v) {
toku_log_fassociate(logger, NULL, 0, filenum, ft->h->flags, bs, unlink_on_close); toku_log_fassociate(logger, NULL, 0, filenum, ft->h->flags, bs, unlink_on_close);
} }
// maps to cf->log_suppress_rollback_during_checkpoint
static void
ft_log_suppress_rollback_during_checkpoint (CACHEFILE cf, void *header_v) {
FT h = (FT) header_v;
TXNID xid = h->txnid_that_created_or_locked_when_empty;
if (xid != TXNID_NONE) {
//Only log if useful.
TOKULOGGER logger = toku_cachefile_logger(cf);
FILENUM filenum = toku_cachefile_filenum (cf);
// We don't have access to the txn here, but the txn is
// necessarily already marked as non-readonly. Use NULL.
TOKUTXN txn = NULL;
toku_log_suppress_rollback(logger, NULL, 0, txn, filenum, xid);
}
}
// Maps to cf->begin_checkpoint_userdata // Maps to cf->begin_checkpoint_userdata
// Create checkpoint-in-progress versions of header and translation (btt) (and fifo for now...). // Create checkpoint-in-progress versions of header and translation (btt) (and fifo for now...).
// Has access to fd (it is protected). // Has access to fd (it is protected).
...@@ -331,7 +307,6 @@ static void ft_init(FT ft, FT_OPTIONS options, CACHEFILE cf) { ...@@ -331,7 +307,6 @@ static void ft_init(FT ft, FT_OPTIONS options, CACHEFILE cf) {
toku_cachefile_set_userdata(ft->cf, toku_cachefile_set_userdata(ft->cf,
ft, ft,
ft_log_fassociate_during_checkpoint, ft_log_fassociate_during_checkpoint,
ft_log_suppress_rollback_during_checkpoint,
ft_close, ft_close,
ft_checkpoint, ft_checkpoint,
ft_begin_checkpoint, ft_begin_checkpoint,
...@@ -432,7 +407,6 @@ int toku_read_ft_and_store_in_cachefile (FT_HANDLE brt, CACHEFILE cf, LSN max_ac ...@@ -432,7 +407,6 @@ int toku_read_ft_and_store_in_cachefile (FT_HANDLE brt, CACHEFILE cf, LSN max_ac
toku_cachefile_set_userdata(cf, toku_cachefile_set_userdata(cf,
(void*)h, (void*)h,
ft_log_fassociate_during_checkpoint, ft_log_fassociate_during_checkpoint,
ft_log_suppress_rollback_during_checkpoint,
ft_close, ft_close,
ft_checkpoint, ft_checkpoint,
ft_begin_checkpoint, ft_begin_checkpoint,
...@@ -720,15 +694,17 @@ toku_dictionary_redirect (const char *dst_fname_in_env, FT_HANDLE old_ft_h, TOKU ...@@ -720,15 +694,17 @@ toku_dictionary_redirect (const char *dst_fname_in_env, FT_HANDLE old_ft_h, TOKU
if (txn) { if (txn) {
toku_txn_maybe_note_ft(txn, new_ft); // mark new ft as touched by this txn toku_txn_maybe_note_ft(txn, new_ft); // mark new ft as touched by this txn
// There is no recovery log entry for redirect,
// and rollback log entries are not allowed for read-only transactions.
// Normally the recovery log entry would ensure the begin was logged.
if (!txn->begin_was_logged) {
toku_maybe_log_begin_txn_for_write_operation(txn);
}
FILENUM old_filenum = toku_cachefile_filenum(old_ft->cf); FILENUM old_filenum = toku_cachefile_filenum(old_ft->cf);
FILENUM new_filenum = toku_cachefile_filenum(new_ft->cf); FILENUM new_filenum = toku_cachefile_filenum(new_ft->cf);
toku_logger_save_rollback_dictionary_redirect(txn, old_filenum, new_filenum); toku_logger_save_rollback_dictionary_redirect(txn, old_filenum, new_filenum);
TXNID xid = toku_txn_get_txnid(txn);
toku_ft_suppress_rollbacks(new_ft, txn);
toku_log_suppress_rollback(txn->logger, NULL, 0, txn, new_filenum, xid);
} }
cleanup: cleanup:
return r; return r;
} }
......
...@@ -20,9 +20,6 @@ ...@@ -20,9 +20,6 @@
void toku_ft_unlink(FT_HANDLE handle); void toku_ft_unlink(FT_HANDLE handle);
void toku_ft_unlink_on_commit(FT_HANDLE handle, TOKUTXN txn); void toku_ft_unlink_on_commit(FT_HANDLE handle, TOKUTXN txn);
//Effect: suppresses rollback logs
void toku_ft_suppress_rollbacks(FT h, TOKUTXN txn);
void toku_ft_init_reflock(FT ft); void toku_ft_init_reflock(FT ft);
void toku_ft_destroy_reflock(FT ft); void toku_ft_destroy_reflock(FT ft);
void toku_ft_grab_reflock(FT ft); void toku_ft_grab_reflock(FT ft);
...@@ -60,19 +57,19 @@ toku_ft_init( ...@@ -60,19 +57,19 @@ toku_ft_init(
int toku_dictionary_redirect_abort(FT old_h, FT new_h, TOKUTXN txn) __attribute__ ((warn_unused_result)); int toku_dictionary_redirect_abort(FT old_h, FT new_h, TOKUTXN txn) __attribute__ ((warn_unused_result));
int toku_dictionary_redirect (const char *dst_fname_in_env, FT_HANDLE old_ft, TOKUTXN txn); int toku_dictionary_redirect (const char *dst_fname_in_env, FT_HANDLE old_ft, TOKUTXN txn);
void toku_reset_root_xid_that_created(FT h, TXNID new_root_xid_that_created); void toku_reset_root_xid_that_created(FT h, TXNID new_root_xid_that_created);
// Reset the root_xid_that_created field to the given value. // Reset the root_xid_that_created field to the given value.
// This redefines which xid created the dictionary. // This redefines which xid created the dictionary.
void toku_ft_add_txn_ref(FT h); void toku_ft_add_txn_ref(FT h);
void toku_ft_remove_txn_ref(FT h); void toku_ft_remove_txn_ref(FT h);
void toku_calculate_root_offset_pointer ( FT h, CACHEKEY* root_key, uint32_t *roothash); void toku_calculate_root_offset_pointer ( FT h, CACHEKEY* root_key, uint32_t *roothash);
void toku_ft_set_new_root_blocknum(FT h, CACHEKEY new_root_key); void toku_ft_set_new_root_blocknum(FT h, CACHEKEY new_root_key);
LSN toku_ft_checkpoint_lsn(FT h) __attribute__ ((warn_unused_result)); LSN toku_ft_checkpoint_lsn(FT h) __attribute__ ((warn_unused_result));
void toku_ft_stat64 (FT h, struct ftstat64_s *s); void toku_ft_stat64 (FT h, struct ftstat64_s *s);
// unconditionally set the descriptor for an open FT. can't do this when // unconditionally set the descriptor for an open FT. can't do this when
// any operation has already occurred on the ft. // any operation has already occurred on the ft.
// see toku_ft_change_descriptor(), which is the transactional version // see toku_ft_change_descriptor(), which is the transactional version
// used by the ydb layer. it better describes the client contract. // used by the ydb layer. it better describes the client contract.
void toku_ft_update_descriptor(FT ft, DESCRIPTOR d); void toku_ft_update_descriptor(FT ft, DESCRIPTOR d);
......
...@@ -28,7 +28,8 @@ enum ft_layout_version_e { ...@@ -28,7 +28,8 @@ enum ft_layout_version_e {
FT_LAYOUT_VERSION_20 = 20, // Deadshot: Add compression method to log_fcreate, FT_LAYOUT_VERSION_20 = 20, // Deadshot: Add compression method to log_fcreate,
// mgr_last_xid after begin checkpoint, // mgr_last_xid after begin checkpoint,
// last_xid to shutdown // last_xid to shutdown
FT_LAYOUT_VERSION_21 = 21, // Ming: Add max_msn_in_ft to header FT_LAYOUT_VERSION_21 = 21, // Ming: Add max_msn_in_ft to header,
// Removed log suppression logentry
FT_NEXT_VERSION, // the version after the current version FT_NEXT_VERSION, // the version after the current version
FT_LAYOUT_VERSION = FT_NEXT_VERSION-1, // A hack so I don't have to change this line. FT_LAYOUT_VERSION = FT_NEXT_VERSION-1, // A hack so I don't have to change this line.
FT_LAYOUT_MIN_SUPPORTED_VERSION = FT_LAYOUT_VERSION_13, // Minimum version supported FT_LAYOUT_MIN_SUPPORTED_VERSION = FT_LAYOUT_VERSION_13, // Minimum version supported
......
...@@ -152,7 +152,6 @@ struct tokutxn { ...@@ -152,7 +152,6 @@ struct tokutxn {
bool begin_was_logged; bool begin_was_logged;
// These are not read until a commit, prepare, or abort starts, and // These are not read until a commit, prepare, or abort starts, and
// they're "monotonic" (only go false->true) during operation: // they're "monotonic" (only go false->true) during operation:
bool checkpoint_needed_before_commit;
bool do_fsync; bool do_fsync;
bool force_fsync_on_commit; //This transaction NEEDS an fsync once (if) it commits. (commit means root txn) bool force_fsync_on_commit; //This transaction NEEDS an fsync once (if) it commits. (commit means root txn)
......
...@@ -131,15 +131,12 @@ const struct logtype logtypes[] = { ...@@ -131,15 +131,12 @@ const struct logtype logtypes[] = {
{"uint64_t", "rollentry_raw_count", 0}, {"uint64_t", "rollentry_raw_count", 0},
{"FILENUMS", "open_filenums", 0}, {"FILENUMS", "open_filenums", 0},
{"uint8_t", "force_fsync_on_commit", 0}, {"uint8_t", "force_fsync_on_commit", 0},
{"uint64_t", "num_rollback_nodes", 0}, {"uint64_t", "num_rollback_nodes", 0},
{"uint64_t", "num_rollentries", 0}, {"uint64_t", "num_rollentries", 0},
{"BLOCKNUM", "spilled_rollback_head", 0}, {"BLOCKNUM", "spilled_rollback_head", 0},
{"BLOCKNUM", "spilled_rollback_tail", 0}, {"BLOCKNUM", "spilled_rollback_tail", 0},
{"BLOCKNUM", "current_rollback", 0}, {"BLOCKNUM", "current_rollback", 0},
NULLFIELD}, ASSERT_BEGIN_WAS_LOGGED}, // record all transactions NULLFIELD}, ASSERT_BEGIN_WAS_LOGGED}, // record all transactions
{"suppress_rollback", 'S', FA{{"FILENUM", "filenum", 0},
{"TXNID", "xid", 0},
NULLFIELD}, SHOULD_LOG_BEGIN},
// Records produced by transactions // Records produced by transactions
{"xbegin", 'b', FA{{"TXNID", "xid", 0},{"TXNID", "parentxid", 0},NULLFIELD}, IGNORE_LOG_BEGIN}, {"xbegin", 'b', FA{{"TXNID", "xid", 0},{"TXNID", "parentxid", 0},NULLFIELD}, IGNORE_LOG_BEGIN},
{"xcommit",'C', FA{{"TXNID", "xid", 0},NULLFIELD}, ASSERT_BEGIN_WAS_LOGGED}, {"xcommit",'C', FA{{"TXNID", "xid", 0},NULLFIELD}, ASSERT_BEGIN_WAS_LOGGED},
...@@ -413,7 +410,6 @@ generate_log_writer (void) { ...@@ -413,7 +410,6 @@ generate_log_writer (void) {
switch (lt->log_begin_action) { switch (lt->log_begin_action) {
case SHOULD_LOG_BEGIN: { case SHOULD_LOG_BEGIN: {
fprintf(cf, " //txn can be NULL during tests\n"); fprintf(cf, " //txn can be NULL during tests\n");
fprintf(cf, " //txn can be also be NULL for suppress_rollback during checkpoint,\n");
fprintf(cf, " //never null when not checkpoint.\n"); fprintf(cf, " //never null when not checkpoint.\n");
fprintf(cf, " if (txn && !txn->begin_was_logged) {\n"); fprintf(cf, " if (txn && !txn->begin_was_logged) {\n");
fprintf(cf, " toku_maybe_log_begin_txn_for_write_operation(txn);\n"); fprintf(cf, " toku_maybe_log_begin_txn_for_write_operation(txn);\n");
......
...@@ -629,26 +629,6 @@ static int toku_recover_backward_xstillopenprepared (struct logtype_xstillopenpr ...@@ -629,26 +629,6 @@ static int toku_recover_backward_xstillopenprepared (struct logtype_xstillopenpr
return 0; return 0;
} }
static int toku_recover_suppress_rollback (struct logtype_suppress_rollback *UU(l), RECOVER_ENV UU(renv)) {
struct file_map_tuple *tuple = NULL;
int r = file_map_find(&renv->fmap, l->filenum, &tuple);
if (r==0) {
//File is open
TOKUTXN txn = NULL;
toku_txnid2txn(renv->logger, l->xid, &txn);
assert(txn!=NULL);
FT ft = tuple->ft_handle->ft;
toku_ft_suppress_rollbacks(ft, txn);
toku_txn_maybe_note_ft(txn, ft);
}
return 0;
}
static int toku_recover_backward_suppress_rollback (struct logtype_suppress_rollback *UU(l), RECOVER_ENV UU(renv)) {
// nothing
return 0;
}
static int toku_recover_xbegin (struct logtype_xbegin *l, RECOVER_ENV renv) { static int toku_recover_xbegin (struct logtype_xbegin *l, RECOVER_ENV renv) {
int r; int r;
r = recover_transaction(NULL, l->xid, l->parentxid, renv->logger); r = recover_transaction(NULL, l->xid, l->parentxid, renv->logger);
......
...@@ -46,14 +46,6 @@ int ...@@ -46,14 +46,6 @@ int
note_ft_used_in_txns_parent(const FT &ft, uint32_t UU(index), TOKUTXN const child) { note_ft_used_in_txns_parent(const FT &ft, uint32_t UU(index), TOKUTXN const child) {
TOKUTXN parent = child->parent; TOKUTXN parent = child->parent;
toku_txn_maybe_note_ft(parent, ft); toku_txn_maybe_note_ft(parent, ft);
if (ft->txnid_that_created_or_locked_when_empty == toku_txn_get_txnid(child)) {
//Pass magic "no rollback needed" flag to parent.
ft->txnid_that_created_or_locked_when_empty = toku_txn_get_txnid(parent);
}
if (ft->txnid_that_suppressed_recovery_logs == toku_txn_get_txnid(child)) {
//Pass magic "no recovery needed" flag to parent.
ft->txnid_that_suppressed_recovery_logs = toku_txn_get_txnid(parent);
}
return 0; return 0;
} }
...@@ -217,11 +209,6 @@ int toku_rollback_commit(TOKUTXN txn, LSN lsn) { ...@@ -217,11 +209,6 @@ int toku_rollback_commit(TOKUTXN txn, LSN lsn) {
r = txn->open_fts.iterate<struct tokutxn, note_ft_used_in_txns_parent>(txn); r = txn->open_fts.iterate<struct tokutxn, note_ft_used_in_txns_parent>(txn);
assert(r==0); assert(r==0);
// Merge the list of headers that must be checkpointed before commit
if (txn->checkpoint_needed_before_commit) {
txn->parent->checkpoint_needed_before_commit = true;
}
//If this transaction needs an fsync (if it commits) //If this transaction needs an fsync (if it commits)
//save that in the parent. Since the commit really happens in the root txn. //save that in the parent. Since the commit really happens in the root txn.
txn->parent->force_fsync_on_commit |= txn->force_fsync_on_commit; txn->parent->force_fsync_on_commit |= txn->force_fsync_on_commit;
......
...@@ -210,10 +210,11 @@ exit: ...@@ -210,10 +210,11 @@ exit:
} }
// Return the number of bytes that went into the rollback data structure (the uncompressed count if there is compression) // Return the number of bytes that went into the rollback data structure (the uncompressed count if there is compression)
int toku_logger_txn_rollback_raw_count(TOKUTXN txn, uint64_t *raw_count) int toku_logger_txn_rollback_stats(TOKUTXN txn, struct txn_stat *txn_stat)
{ {
toku_txn_lock(txn); toku_txn_lock(txn);
*raw_count = txn->roll_info.rollentry_raw_count; txn_stat->rollback_raw_count = txn->roll_info.rollentry_raw_count;
txn_stat->rollback_num_entries = txn->roll_info.num_rollentries;
toku_txn_unlock(txn); toku_txn_unlock(txn);
return 0; return 0;
} }
......
...@@ -48,7 +48,7 @@ void *toku_memdup_in_rollback(ROLLBACK_LOG_NODE log, const void *v, size_t len); ...@@ -48,7 +48,7 @@ void *toku_memdup_in_rollback(ROLLBACK_LOG_NODE log, const void *v, size_t len);
void toku_maybe_spill_rollbacks(TOKUTXN txn, ROLLBACK_LOG_NODE log); void toku_maybe_spill_rollbacks(TOKUTXN txn, ROLLBACK_LOG_NODE log);
void toku_txn_maybe_note_ft (TOKUTXN txn, FT h); void toku_txn_maybe_note_ft (TOKUTXN txn, FT h);
int toku_logger_txn_rollback_raw_count(TOKUTXN txn, uint64_t *raw_count); int toku_logger_txn_rollback_stats(TOKUTXN txn, struct txn_stat *txn_stat);
int toku_find_xid_by_xid (const TXNID &xid, const TXNID &xidfind); int toku_find_xid_by_xid (const TXNID &xid, const TXNID &xidfind);
......
...@@ -330,12 +330,11 @@ cachetable_test (void) { ...@@ -330,12 +330,11 @@ cachetable_test (void) {
char fname1[] = __SRCFILE__ "test1.dat"; char fname1[] = __SRCFILE__ "test1.dat";
unlink(fname1); unlink(fname1);
r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0); r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0);
toku_cachefile_set_userdata( toku_cachefile_set_userdata(
f1, f1,
NULL, NULL,
&dummy_log_fassociate, &dummy_log_fassociate,
&dummy_log_rollback,
&dummy_close_usr, &dummy_close_usr,
&dummy_chckpnt_usr, &dummy_chckpnt_usr,
&test_begin_checkpoint, &test_begin_checkpoint,
......
...@@ -457,25 +457,24 @@ cachetable_test (void) { ...@@ -457,25 +457,24 @@ cachetable_test (void) {
time_of_test = 60; time_of_test = 60;
int r; int r;
toku_cachetable_create(&ct, test_limit, ZERO_LSN, NULL_LOGGER); toku_cachetable_create(&ct, test_limit, ZERO_LSN, NULL_LOGGER);
char fname1[] = __SRCFILE__ "test-put-checkpoint.dat"; char fname1[] = __SRCFILE__ "test-put-checkpoint.dat";
unlink(fname1); unlink(fname1);
r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0); r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0);
toku_cachefile_set_userdata( toku_cachefile_set_userdata(
f1, f1,
NULL, NULL,
&dummy_log_fassociate, &dummy_log_fassociate,
&dummy_log_rollback,
&dummy_close_usr, &dummy_close_usr,
&dummy_chckpnt_usr, &dummy_chckpnt_usr,
test_begin_checkpoint, // called in begin_checkpoint test_begin_checkpoint, // called in begin_checkpoint
&dummy_end, &dummy_end,
&dummy_note_pin, &dummy_note_pin,
&dummy_note_unpin &dummy_note_unpin
); );
toku_pthread_t time_tid; toku_pthread_t time_tid;
toku_pthread_t checkpoint_tid; toku_pthread_t checkpoint_tid;
toku_pthread_t move_tid[NUM_MOVER_THREADS]; toku_pthread_t move_tid[NUM_MOVER_THREADS];
......
...@@ -10,7 +10,6 @@ ...@@ -10,7 +10,6 @@
// Dummy callbacks for checkpointing // Dummy callbacks for checkpointing
// //
static void dummy_log_fassociate(CACHEFILE UU(cf), void* UU(p)) { } static void dummy_log_fassociate(CACHEFILE UU(cf), void* UU(p)) { }
static void dummy_log_rollback(CACHEFILE UU(cf), void* UU(p)) { }
static void dummy_close_usr(CACHEFILE UU(cf), int UU(i), void* UU(p), bool UU(b), LSN UU(lsn)) { } static void dummy_close_usr(CACHEFILE UU(cf), int UU(i), void* UU(p), bool UU(b), LSN UU(lsn)) { }
static void dummy_chckpnt_usr(CACHEFILE UU(cf), int UU(i), void* UU(p)) { } static void dummy_chckpnt_usr(CACHEFILE UU(cf), int UU(i), void* UU(p)) { }
static void dummy_begin(LSN UU(lsn), void* UU(p)) { } static void dummy_begin(LSN UU(lsn), void* UU(p)) { }
...@@ -28,7 +27,6 @@ create_dummy_functions(CACHEFILE cf) ...@@ -28,7 +27,6 @@ create_dummy_functions(CACHEFILE cf)
toku_cachefile_set_userdata(cf, toku_cachefile_set_userdata(cf,
ud, ud,
&dummy_log_fassociate, &dummy_log_fassociate,
&dummy_log_rollback,
&dummy_close_usr, &dummy_close_usr,
&dummy_chckpnt_usr, &dummy_chckpnt_usr,
&dummy_begin, &dummy_begin,
......
...@@ -152,7 +152,6 @@ void toku_txn_create_txn ( ...@@ -152,7 +152,6 @@ void toku_txn_create_txn (
.live_root_txn_list = nullptr, .live_root_txn_list = nullptr,
.xids = xids, .xids = xids,
.begin_was_logged = false, .begin_was_logged = false,
.checkpoint_needed_before_commit = false,
.do_fsync = false, .do_fsync = false,
.force_fsync_on_commit = false, .force_fsync_on_commit = false,
.do_fsync_lsn = ZERO_LSN, .do_fsync_lsn = ZERO_LSN,
...@@ -246,21 +245,11 @@ int toku_txn_commit_txn(TOKUTXN txn, int nosync, ...@@ -246,21 +245,11 @@ int toku_txn_commit_txn(TOKUTXN txn, int nosync,
poll, poll_extra); poll, poll_extra);
} }
void
toku_txn_require_checkpoint_on_commit(TOKUTXN txn) {
txn->checkpoint_needed_before_commit = true;
}
struct xcommit_info { struct xcommit_info {
int r; int r;
TOKUTXN txn; TOKUTXN txn;
}; };
bool toku_txn_requires_checkpoint(TOKUTXN txn) {
return (!txn->parent && txn->checkpoint_needed_before_commit);
}
int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, LSN oplsn, int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, LSN oplsn,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra) TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra)
{ {
...@@ -377,16 +366,10 @@ void toku_txn_close_txn(TOKUTXN txn) { ...@@ -377,16 +366,10 @@ void toku_txn_close_txn(TOKUTXN txn) {
} }
int remove_txn (const FT &h, const uint32_t UU(idx), TOKUTXN const txn); int remove_txn (const FT &h, const uint32_t UU(idx), TOKUTXN const txn);
int remove_txn (const FT &h, const uint32_t UU(idx), TOKUTXN const txn) int remove_txn (const FT &h, const uint32_t UU(idx), TOKUTXN const UU(txn))
// Effect: This function is called on every open FT that a transaction used. // Effect: This function is called on every open FT that a transaction used.
// This function removes the transaction from that FT. // This function removes the transaction from that FT.
{ {
if (txn->txnid64==h->txnid_that_created_or_locked_when_empty) {
h->txnid_that_created_or_locked_when_empty = TXNID_NONE;
}
if (txn->txnid64==h->txnid_that_suppressed_recovery_logs) {
h->txnid_that_suppressed_recovery_logs = TXNID_NONE;
}
toku_ft_remove_txn_ref(h); toku_ft_remove_txn_ref(h);
return 0; return 0;
......
...@@ -44,7 +44,6 @@ int toku_txn_load_txninfo (TOKUTXN txn, TXNINFO info); ...@@ -44,7 +44,6 @@ int toku_txn_load_txninfo (TOKUTXN txn, TXNINFO info);
int toku_txn_commit_txn (TOKUTXN txn, int nosync, int toku_txn_commit_txn (TOKUTXN txn, int nosync,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra); TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra);
bool toku_txn_requires_checkpoint(TOKUTXN txn);
int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, LSN oplsn, int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, LSN oplsn,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra); TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra);
...@@ -66,9 +65,6 @@ void toku_txn_get_fsync_info(TOKUTXN ttxn, bool* do_fsync, LSN* do_fsync_lsn); ...@@ -66,9 +65,6 @@ void toku_txn_get_fsync_info(TOKUTXN ttxn, bool* do_fsync, LSN* do_fsync_lsn);
// Complete and destroy a txn // Complete and destroy a txn
void toku_txn_close_txn(TOKUTXN txn); void toku_txn_close_txn(TOKUTXN txn);
// Require a checkpoint upon commit
void toku_txn_require_checkpoint_on_commit(TOKUTXN txn);
// Remove a txn from any live txn lists // Remove a txn from any live txn lists
void toku_txn_complete_txn(TOKUTXN txn); void toku_txn_complete_txn(TOKUTXN txn);
......
...@@ -27,6 +27,7 @@ ...@@ -27,6 +27,7 @@
#include <ft/log-internal.h> #include <ft/log-internal.h>
#include <ft/checkpoint.h> #include <ft/checkpoint.h>
#include <portability/toku_atomic.h> #include <portability/toku_atomic.h>
#include "loader.h"
/////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////
// Engine status // Engine status
...@@ -203,7 +204,7 @@ toku_indexer_create_indexer(DB_ENV *env, ...@@ -203,7 +204,7 @@ toku_indexer_create_indexer(DB_ENV *env,
// //
{ {
DB_LOADER* loader = NULL; DB_LOADER* loader = NULL;
rval = env->create_loader(env, txn, &loader, NULL, N, &dest_dbs[0], NULL, NULL, DB_PRELOCKED_WRITE | LOADER_USE_PUTS); rval = toku_loader_create_loader(env, txn, &loader, NULL, N, &dest_dbs[0], NULL, NULL, DB_PRELOCKED_WRITE | LOADER_DISALLOW_PUTS, true);
if (rval) { if (rval) {
goto create_exit; goto create_exit;
} }
...@@ -473,6 +474,11 @@ build_index(DB_INDEXER *indexer) { ...@@ -473,6 +474,11 @@ build_index(DB_INDEXER *indexer) {
// - unique checks? // - unique checks?
if ( result == 0 ) { if ( result == 0 ) {
// Perform a checkpoint so that all of the indexing makes it to disk before continuing.
// Otherwise indexing would not be crash-safe becasue none of the undo-do messages are in the recovery log.
DB_ENV *env = indexer->i->env;
CHECKPOINTER cp = toku_cachetable_get_checkpointer(env->i->cachetable);
toku_checkpoint(cp, env->i->logger, NULL, NULL, NULL, NULL, INDEXER_CHECKPOINT);
(void) toku_sync_fetch_and_add(&STATUS_VALUE(INDEXER_BUILD), 1); (void) toku_sync_fetch_and_add(&STATUS_VALUE(INDEXER_BUILD), 1);
} else { } else {
(void) toku_sync_fetch_and_add(&STATUS_VALUE(INDEXER_BUILD_FAIL), 1); (void) toku_sync_fetch_and_add(&STATUS_VALUE(INDEXER_BUILD_FAIL), 1);
...@@ -487,14 +493,6 @@ close_indexer(DB_INDEXER *indexer) { ...@@ -487,14 +493,6 @@ close_indexer(DB_INDEXER *indexer) {
int r = 0; int r = 0;
(void) toku_sync_fetch_and_sub(&STATUS_VALUE(INDEXER_CURRENT), 1); (void) toku_sync_fetch_and_sub(&STATUS_VALUE(INDEXER_CURRENT), 1);
// Mark txn as needing a checkpoint.
// (This will cause a checkpoint, which is necessary
// because these files are not necessarily on disk and all the operations
// to create them are not in the recovery log.)
DB_TXN *txn = indexer->i->txn;
TOKUTXN tokutxn = db_txn_struct_i(txn)->tokutxn;
toku_txn_require_checkpoint_on_commit(tokutxn);
// Disassociate the indexer from the hot db and free_indexer // Disassociate the indexer from the hot db and free_indexer
disassociate_indexer_from_hot_dbs(indexer); disassociate_indexer_from_hot_dbs(indexer);
free_indexer(indexer); free_indexer(indexer);
......
...@@ -89,9 +89,6 @@ struct __toku_loader_internal { ...@@ -89,9 +89,6 @@ struct __toku_loader_internal {
void *poll_extra; void *poll_extra;
char *temp_file_template; char *temp_file_template;
DBT *ekeys;
DBT *evals;
DBT err_key; /* error key */ DBT err_key; /* error key */
DBT err_val; /* error val */ DBT err_val; /* error val */
int err_i; /* error i */ int err_i; /* error i */
...@@ -109,21 +106,6 @@ struct __toku_loader_internal { ...@@ -109,21 +106,6 @@ struct __toku_loader_internal {
static void free_loader_resources(DB_LOADER *loader) static void free_loader_resources(DB_LOADER *loader)
{ {
if ( loader->i ) { if ( loader->i ) {
for (int i=0; i<loader->i->N; i++) {
if (loader->i->ekeys &&
loader->i->ekeys[i].data &&
loader->i->ekeys[i].flags == DB_DBT_REALLOC) {
toku_free(loader->i->ekeys[i].data);
}
if (loader->i->evals &&
loader->i->evals[i].data &&
loader->i->evals[i].flags == DB_DBT_REALLOC) {
toku_free(loader->i->evals[i].data);
}
}
if (loader->i->ekeys) toku_free(loader->i->ekeys);
if (loader->i->evals) toku_free(loader->i->evals);
if (loader->i->err_key.data) toku_free(loader->i->err_key.data); if (loader->i->err_key.data) toku_free(loader->i->err_key.data);
if (loader->i->err_val.data) toku_free(loader->i->err_val.data); if (loader->i->err_val.data) toku_free(loader->i->err_val.data);
...@@ -171,24 +153,27 @@ static int ft_loader_close_and_redirect(DB_LOADER *loader) { ...@@ -171,24 +153,27 @@ static int ft_loader_close_and_redirect(DB_LOADER *loader) {
} }
static int create_loader(DB_ENV *env, // loader_flags currently has the following flags:
DB_TXN *txn, // LOADER_DISALLOW_PUTS loader->put is not allowed.
DB_LOADER **blp, // Loader is only being used for its side effects
DB *src_db, // DB_PRELOCKED_WRITE Table lock is already held, no need to relock.
int N, int
DB *dbs[], toku_loader_create_loader(DB_ENV *env,
uint32_t db_flags[/*N*/], DB_TXN *txn,
uint32_t dbt_flags[/*N*/], DB_LOADER **blp,
uint32_t loader_flags, DB *src_db,
bool check_empty) int N,
{ DB *dbs[],
uint32_t db_flags[/*N*/],
uint32_t dbt_flags[/*N*/],
uint32_t loader_flags,
bool check_empty) {
int rval; int rval;
bool use_ft_loader = (loader_flags == 0);
*blp = NULL; // set later when created *blp = NULL; // set later when created
DB_LOADER *loader = NULL; DB_LOADER *loader = NULL;
bool use_puts = loader_flags&LOADER_USE_PUTS; bool puts_allowed = !(loader_flags & LOADER_DISALLOW_PUTS);
XCALLOC(loader); // init to all zeroes (thus initializing the error_callback and poll_func) XCALLOC(loader); // init to all zeroes (thus initializing the error_callback and poll_func)
XCALLOC(loader->i); // init to all zeroes (thus initializing all pointers to NULL) XCALLOC(loader->i); // init to all zeroes (thus initializing all pointers to NULL)
...@@ -248,10 +233,8 @@ static int create_loader(DB_ENV *env, ...@@ -248,10 +233,8 @@ static int create_loader(DB_ENV *env,
for (int i=0; i<N; i++) { for (int i=0; i<N; i++) {
brts[i] = dbs[i]->i->ft_handle; brts[i] = dbs[i]->i->ft_handle;
} }
loader->i->ekeys = NULL;
loader->i->evals = NULL;
LSN load_lsn; LSN load_lsn;
rval = locked_load_inames(env, txn, N, dbs, new_inames_in_env, &load_lsn, use_ft_loader); rval = locked_load_inames(env, txn, N, dbs, new_inames_in_env, &load_lsn, puts_allowed);
if ( rval!=0 ) { if ( rval!=0 ) {
toku_free(new_inames_in_env); toku_free(new_inames_in_env);
toku_free(brts); toku_free(brts);
...@@ -269,7 +252,7 @@ static int create_loader(DB_ENV *env, ...@@ -269,7 +252,7 @@ static int create_loader(DB_ENV *env,
loader->i->temp_file_template, loader->i->temp_file_template,
load_lsn, load_lsn,
ttxn, ttxn,
!use_puts); puts_allowed);
if ( rval!=0 ) { if ( rval!=0 ) {
toku_free(new_inames_in_env); toku_free(new_inames_in_env);
toku_free(brts); toku_free(brts);
...@@ -278,18 +261,9 @@ static int create_loader(DB_ENV *env, ...@@ -278,18 +261,9 @@ static int create_loader(DB_ENV *env,
loader->i->inames_in_env = new_inames_in_env; loader->i->inames_in_env = new_inames_in_env;
toku_free(brts); toku_free(brts);
if (use_puts) { if (!puts_allowed) {
XCALLOC_N(loader->i->N, loader->i->ekeys);
XCALLOC_N(loader->i->N, loader->i->evals);
// the following function grabs the ydb lock, so we
// first unlock before calling it
rval = ft_loader_close_and_redirect(loader); rval = ft_loader_close_and_redirect(loader);
assert_zero(rval); assert_zero(rval);
for (int i=0; i<N; i++) {
loader->i->ekeys[i].flags = DB_DBT_REALLOC;
loader->i->evals[i].flags = DB_DBT_REALLOC;
toku_ft_suppress_recovery_logs(dbs[i]->i->ft_handle, db_txn_struct_i(txn)->tokutxn);
}
loader->i->ft_loader = NULL; loader->i->ft_loader = NULL;
// close the ft_loader and skip to the redirection // close the ft_loader and skip to the redirection
rval = 0; rval = 0;
...@@ -312,37 +286,6 @@ static int create_loader(DB_ENV *env, ...@@ -312,37 +286,6 @@ static int create_loader(DB_ENV *env,
return rval; return rval;
} }
// loader_flags currently has three possible values:
// 0 use brt loader
// USE_PUTS do not use brt loader, use log suppression mechanism (2440)
// which results in recursive call here via toku_db_pre_acquire_table_lock()
// DB_PRELOCKED_WRITE do not use brt loader, this is the recursive (inner) call via
// toku_db_pre_acquire_table_lock()
int toku_loader_create_loader(DB_ENV *env,
DB_TXN *txn,
DB_LOADER **blp,
DB *src_db,
int N,
DB *dbs[],
uint32_t db_flags[/*N*/],
uint32_t dbt_flags[/*N*/],
uint32_t loader_flags)
{
return create_loader(
env,
txn,
blp,
src_db,
N,
dbs,
db_flags,
dbt_flags,
loader_flags,
true
);
}
int toku_loader_set_poll_function(DB_LOADER *loader, int toku_loader_set_poll_function(DB_LOADER *loader,
int (*poll_func)(void *extra, float progress), int (*poll_func)(void *extra, float progress),
void *poll_extra) void *poll_extra)
...@@ -377,16 +320,9 @@ int toku_loader_put(DB_LOADER *loader, DBT *key, DBT *val) ...@@ -377,16 +320,9 @@ int toku_loader_put(DB_LOADER *loader, DBT *key, DBT *val)
goto cleanup; goto cleanup;
} }
if (loader->i->loader_flags & LOADER_USE_PUTS) { if (loader->i->loader_flags & LOADER_DISALLOW_PUTS) {
r = loader->i->env->put_multiple(loader->i->env, r = EINVAL;
loader->i->src_db, // src_db goto cleanup;
loader->i->txn,
key, val,
loader->i->N, // num_dbs
loader->i->dbs, // (DB**)db_array
loader->i->ekeys,
loader->i->evals,
loader->i->db_flags); // flags_array
} }
else { else {
// calling toku_ft_loader_put without a lock assumes that the // calling toku_ft_loader_put without a lock assumes that the
...@@ -422,7 +358,7 @@ int toku_loader_put(DB_LOADER *loader, DBT *key, DBT *val) ...@@ -422,7 +358,7 @@ int toku_loader_put(DB_LOADER *loader, DBT *key, DBT *val)
static void redirect_loader_to_empty_dictionaries(DB_LOADER *loader) { static void redirect_loader_to_empty_dictionaries(DB_LOADER *loader) {
DB_LOADER* tmp_loader = NULL; DB_LOADER* tmp_loader = NULL;
int r = create_loader( int r = toku_loader_create_loader(
loader->i->env, loader->i->env,
loader->i->txn, loader->i->txn,
&tmp_loader, &tmp_loader,
...@@ -446,7 +382,7 @@ int toku_loader_close(DB_LOADER *loader) ...@@ -446,7 +382,7 @@ int toku_loader_close(DB_LOADER *loader)
if ( loader->i->error_callback != NULL ) { if ( loader->i->error_callback != NULL ) {
loader->i->error_callback(loader->i->dbs[loader->i->err_i], loader->i->err_i, loader->i->err_errno, &loader->i->err_key, &loader->i->err_val, loader->i->error_extra); loader->i->error_callback(loader->i->dbs[loader->i->err_i], loader->i->err_i, loader->i->err_errno, &loader->i->err_key, &loader->i->err_val, loader->i->error_extra);
} }
if (!(loader->i->loader_flags & LOADER_USE_PUTS ) ) { if (!(loader->i->loader_flags & LOADER_DISALLOW_PUTS ) ) {
r = toku_ft_loader_abort(loader->i->ft_loader, true); r = toku_ft_loader_abort(loader->i->ft_loader, true);
redirect_loader_to_empty_dictionaries(loader); redirect_loader_to_empty_dictionaries(loader);
} }
...@@ -455,7 +391,7 @@ int toku_loader_close(DB_LOADER *loader) ...@@ -455,7 +391,7 @@ int toku_loader_close(DB_LOADER *loader)
} }
} }
else { // no error outstanding else { // no error outstanding
if (!(loader->i->loader_flags & LOADER_USE_PUTS ) ) { if (!(loader->i->loader_flags & LOADER_DISALLOW_PUTS ) ) {
r = ft_loader_close_and_redirect(loader); r = ft_loader_close_and_redirect(loader);
if (r) { if (r) {
redirect_loader_to_empty_dictionaries(loader); redirect_loader_to_empty_dictionaries(loader);
...@@ -481,7 +417,7 @@ int toku_loader_abort(DB_LOADER *loader) ...@@ -481,7 +417,7 @@ int toku_loader_abort(DB_LOADER *loader)
} }
} }
if (!(loader->i->loader_flags & LOADER_USE_PUTS) ) { if (!(loader->i->loader_flags & LOADER_DISALLOW_PUTS) ) {
r = toku_ft_loader_abort(loader->i->ft_loader, true); r = toku_ft_loader_abort(loader->i->ft_loader, true);
lazy_assert_zero(r); lazy_assert_zero(r);
} }
......
...@@ -40,7 +40,7 @@ Create and set up a loader. ...@@ -40,7 +40,7 @@ Create and set up a loader.
Modifies: :: env, txn, blp, and dbs. Modifies: :: env, txn, blp, and dbs.
*/ */
int toku_loader_create_loader(DB_ENV *env, DB_TXN *txn, DB_LOADER **blp, DB *src_db, int N, DB *dbs[/*N*/], uint32_t db_flags[/*N*/], uint32_t dbt_flags[/*N*/], uint32_t loader_flags); int toku_loader_create_loader(DB_ENV *env, DB_TXN *txn, DB_LOADER **blp, DB *src_db, int N, DB *dbs[/*N*/], uint32_t db_flags[/*N*/], uint32_t dbt_flags[/*N*/], uint32_t loader_flags, bool check_empty);
/* /*
......
...@@ -541,12 +541,14 @@ if(BUILD_TESTING OR BUILD_SRC_TESTS) ...@@ -541,12 +541,14 @@ if(BUILD_TESTING OR BUILD_SRC_TESTS)
add_test(ydb/loader-stress-test2.tdb loader-stress-test.tdb -r 5000 -s -e dir.loader-stress-test2.tdb) add_test(ydb/loader-stress-test2.tdb loader-stress-test.tdb -r 5000 -s -e dir.loader-stress-test2.tdb)
add_test(ydb/loader-stress-test3.tdb loader-stress-test.tdb -u -c -e dir.loader-stress-test3.tdb) add_test(ydb/loader-stress-test3.tdb loader-stress-test.tdb -u -c -e dir.loader-stress-test3.tdb)
add_test(ydb/loader-stress-test4.tdb loader-stress-test.tdb -r 10000000 -c -e dir.loader-stress-test4.tdb) add_test(ydb/loader-stress-test4.tdb loader-stress-test.tdb -r 10000000 -c -e dir.loader-stress-test4.tdb)
add_test(ydb/loader-stress-test5.tdb loader-stress-test.tdb -c -z -e dir.loader-stress-test5.tdb)
set_property(DIRECTORY APPEND PROPERTY ADDITIONAL_MAKE_CLEAN_FILES set_property(DIRECTORY APPEND PROPERTY ADDITIONAL_MAKE_CLEAN_FILES
dir.loader-stress-test0.tdb dir.loader-stress-test0.tdb
dir.loader-stress-test1.tdb dir.loader-stress-test1.tdb
dir.loader-stress-test2.tdb dir.loader-stress-test2.tdb
dir.loader-stress-test3.tdb dir.loader-stress-test3.tdb
dir.loader-stress-test4.tdb dir.loader-stress-test4.tdb
dir.loader-stress-test5.tdb
) )
list(REMOVE_ITEM loader_tests loader-dup-test.loader) list(REMOVE_ITEM loader_tests loader-dup-test.loader)
...@@ -643,11 +645,12 @@ if(BUILD_TESTING OR BUILD_SRC_TESTS) ...@@ -643,11 +645,12 @@ if(BUILD_TESTING OR BUILD_SRC_TESTS)
get_filename_component(base ${loader_test} NAME_WE) get_filename_component(base ${loader_test} NAME_WE)
add_test(ydb/${base}.nop.loader ${base}.tdb -e "dir.${base}.nop.loader") add_test(ydb/${base}.nop.loader ${base}.tdb -e "dir.${base}.nop.loader")
add_test(ydb/${base}.p.loader ${base}.tdb -p -e "dir.${base}.p.loader") add_test(ydb/${base}.p.loader ${base}.tdb -p -e "dir.${base}.p.loader")
add_test(ydb/${base}.comp.loader ${base}.tdb -z -e "dir.${base}.comp.loader")
if("${tdb_tests_that_should_fail}" MATCHES "${base}.loader") if("${tdb_tests_that_should_fail}" MATCHES "${base}.loader")
list(REMOVE_ITEM tdb_tests_that_should_fail ${base}.loader) list(REMOVE_ITEM tdb_tests_that_should_fail ${base}.loader)
list(APPEND tdb_tests_that_should_fail ${base}.nop.loader ${base}.p.loader) list(APPEND tdb_tests_that_should_fail ${base}.nop.loader ${base}.p.loader ${base}.comp.loader)
endif() endif()
set_property(DIRECTORY APPEND PROPERTY ADDITIONAL_MAKE_CLEAN_FILES "dir.${base}.nop.loader" "dir.${base}.p.loader") set_property(DIRECTORY APPEND PROPERTY ADDITIONAL_MAKE_CLEAN_FILES "dir.${base}.nop.loader" "dir.${base}.p.loader" "dir.${base}.comp.loader")
endforeach(loader_test) endforeach(loader_test)
set(tdb_tests_that_should_fail "ydb/${tdb_tests_that_should_fail}") set(tdb_tests_that_should_fail "ydb/${tdb_tests_that_should_fail}")
......
...@@ -84,7 +84,7 @@ static void do_1381_maybe_lock (int do_loader, uint64_t *raw_count) { ...@@ -84,7 +84,7 @@ static void do_1381_maybe_lock (int do_loader, uint64_t *raw_count) {
&db, &db,
&mult_put_flags, &mult_put_flags,
&mult_dbt_flags, &mult_dbt_flags,
LOADER_USE_PUTS LOADER_COMPRESS_INTERMEDIATES
); );
CKERR(r); CKERR(r);
} }
...@@ -102,7 +102,7 @@ static void do_1381_maybe_lock (int do_loader, uint64_t *raw_count) { ...@@ -102,7 +102,7 @@ static void do_1381_maybe_lock (int do_loader, uint64_t *raw_count) {
CKERR(r); CKERR(r);
} }
else { else {
r = db->put(db, txn, &key, &val, 0); r = db->put(db, txn, &key, &val, 0);
CKERR(r); CKERR(r);
} }
} }
...@@ -116,11 +116,13 @@ static void do_1381_maybe_lock (int do_loader, uint64_t *raw_count) { ...@@ -116,11 +116,13 @@ static void do_1381_maybe_lock (int do_loader, uint64_t *raw_count) {
*raw_count = s2->rollback_raw_count - s1->rollback_raw_count; *raw_count = s2->rollback_raw_count - s1->rollback_raw_count;
if (do_loader) { if (do_loader) {
assert(s1->rollback_raw_count == s2->rollback_raw_count); assert(s1->rollback_raw_count < s2->rollback_raw_count);
assert(s1->rollback_num_entries + 1 == s2->rollback_num_entries);
} else { } else {
assert(s1->rollback_raw_count < s2->rollback_raw_count); assert(s1->rollback_raw_count < s2->rollback_raw_count);
assert(s1->rollback_num_entries < s2->rollback_num_entries);
} }
toku_free(s1); toku_free(s2); toku_free(s1); toku_free(s2);
r = txn->commit(txn, 0); CKERR(r); r = txn->commit(txn, 0); CKERR(r);
......
...@@ -36,7 +36,13 @@ static void insert(DB_LOADER *loader, int k, int val_size) { ...@@ -36,7 +36,13 @@ static void insert(DB_LOADER *loader, int k, int val_size) {
DBT key = { .data = key_buffer, .size = sizeof key_buffer }; DBT key = { .data = key_buffer, .size = sizeof key_buffer };
DBT value = { .data = val_buffer, .size = val_size }; DBT value = { .data = val_buffer, .size = val_size };
r = loader->put(loader, &key, &value); assert_zero(r); r = loader->put(loader, &key, &value);
if (DISALLOW_PUTS) {
assert(r == EINVAL);
}
else {
assert_zero(r);
}
toku_free(val_buffer); toku_free(val_buffer);
} }
...@@ -65,8 +71,12 @@ int test_main(int argc, char * const argv[]) { ...@@ -65,8 +71,12 @@ int test_main(int argc, char * const argv[]) {
if (verbose > 0) verbose--; if (verbose > 0) verbose--;
continue; continue;
} }
if (strcmp(arg, "-z") == 0) {
loader_flags |= LOADER_COMPRESS_INTERMEDIATES;
continue;
}
if (strcmp(arg, "-p") == 0) { if (strcmp(arg, "-p") == 0) {
loader_flags = LOADER_USE_PUTS; loader_flags |= LOADER_DISALLOW_PUTS;
continue; continue;
} }
if (strcmp(arg, "--txn") == 0 && i+1 < argc) { if (strcmp(arg, "--txn") == 0 && i+1 < argc) {
......
...@@ -68,7 +68,8 @@ int NUM_DBS=default_NUM_DBS; ...@@ -68,7 +68,8 @@ int NUM_DBS=default_NUM_DBS;
int NUM_ROWS=default_NUM_ROWS; int NUM_ROWS=default_NUM_ROWS;
//static int NUM_ROWS=50000000; //static int NUM_ROWS=50000000;
int CHECK_RESULTS=0; int CHECK_RESULTS=0;
int USE_PUTS=0; int DISALLOW_PUTS=0;
int COMPRESS=0;
int event_trigger_lo=0; // what event triggers to use? int event_trigger_lo=0; // what event triggers to use?
int event_trigger_hi =0; // 0 and 0 mean none. int event_trigger_hi =0; // 0 and 0 mean none.
enum {MAGIC=311}; enum {MAGIC=311};
...@@ -528,13 +529,17 @@ static void check_results(DB **dbs) ...@@ -528,13 +529,17 @@ static void check_results(DB **dbs)
CKERR(r); CKERR(r);
for(int i=0;i<NUM_ROWS;i++) { for(int i=0;i<NUM_ROWS;i++) {
r = cursor->c_get(cursor, &key, &val, DB_NEXT); r = cursor->c_get(cursor, &key, &val, DB_NEXT);
CKERR(r); if (DISALLOW_PUTS) {
k = *(unsigned int*)key.data; CKERR2(r, EINVAL);
pkey_for_db_key = (j == 0) ? k : inv_twiddle32(k, j); } else {
v = *(unsigned int*)val.data; CKERR(r);
// test that we have the expected keys and values k = *(unsigned int*)key.data;
assert((unsigned int)pkey_for_db_key == (unsigned int)pkey_for_val(v, j)); pkey_for_db_key = (j == 0) ? k : inv_twiddle32(k, j);
v = *(unsigned int*)val.data;
// test that we have the expected keys and values
assert((unsigned int)pkey_for_db_key == (unsigned int)pkey_for_val(v, j));
// printf(" DB[%d] key = %10u, val = %10u, pkey_for_db_key = %10u, pkey_for_val=%10d\n", j, v, k, pkey_for_db_key, pkey_for_val(v, j)); // printf(" DB[%d] key = %10u, val = %10u, pkey_for_db_key = %10u, pkey_for_val=%10d\n", j, v, k, pkey_for_db_key, pkey_for_val(v, j));
}
} }
{printf("."); fflush(stdout);} {printf("."); fflush(stdout);}
r = cursor->c_close(cursor); r = cursor->c_close(cursor);
...@@ -596,7 +601,7 @@ static void test_loader(enum test_type t, DB **dbs, int trigger) ...@@ -596,7 +601,7 @@ static void test_loader(enum test_type t, DB **dbs, int trigger)
db_flags[i] = DB_NOOVERWRITE; db_flags[i] = DB_NOOVERWRITE;
dbt_flags[i] = 0; dbt_flags[i] = 0;
} }
uint32_t loader_flags = USE_PUTS; // set with -p option uint32_t loader_flags = DISALLOW_PUTS | COMPRESS; // set with -p/-z option
if (verbose >= 2) if (verbose >= 2)
printf("old inames:\n"); printf("old inames:\n");
...@@ -612,8 +617,10 @@ static void test_loader(enum test_type t, DB **dbs, int trigger) ...@@ -612,8 +617,10 @@ static void test_loader(enum test_type t, DB **dbs, int trigger)
r = loader->set_poll_function(loader, poll_function, expect_poll_void); r = loader->set_poll_function(loader, poll_function, expect_poll_void);
CKERR(r); CKERR(r);
if (verbose) if (verbose) {
printf("USE_PUTS = %d\n", USE_PUTS); printf("DISALLOW_PUTS = %d\n", DISALLOW_PUTS);
printf("COMPRESS = %d\n", COMPRESS);
}
if (verbose >= 2) if (verbose >= 2)
printf("new inames:\n"); printf("new inames:\n");
get_inames(new_inames, dbs); get_inames(new_inames, dbs);
...@@ -627,7 +634,9 @@ static void test_loader(enum test_type t, DB **dbs, int trigger) ...@@ -627,7 +634,9 @@ static void test_loader(enum test_type t, DB **dbs, int trigger)
dbt_init(&key, &k, sizeof(unsigned int)); dbt_init(&key, &k, sizeof(unsigned int));
dbt_init(&val, &v, sizeof(unsigned int)); dbt_init(&val, &v, sizeof(unsigned int));
r = loader->put(loader, &key, &val); r = loader->put(loader, &key, &val);
if (r != 0) { if (DISALLOW_PUTS) {
assert(r == EINVAL);
} else if (r != 0) {
assert(error_injection && error_injected); assert(error_injection && error_injected);
failed_put = r; failed_put = r;
} }
...@@ -649,13 +658,13 @@ static void test_loader(enum test_type t, DB **dbs, int trigger) ...@@ -649,13 +658,13 @@ static void test_loader(enum test_type t, DB **dbs, int trigger)
} }
r = loader->close(loader); r = loader->close(loader);
CKERR(r); CKERR(r);
if (!USE_PUTS) { if (!DISALLOW_PUTS) {
assert(poll_count>0); assert(poll_count>0);
// You cannot count temp files here // You cannot count temp files here
} }
} }
else if (t == abort_via_poll) { else if (t == abort_via_poll) {
assert(!USE_PUTS); // test makes no sense with USE_PUTS assert(!DISALLOW_PUTS); // test makes no sense with DISALLOW_PUTS
if (verbose) if (verbose)
printf("closing, but expecting abort via poll\n"); printf("closing, but expecting abort via poll\n");
r = loader->close(loader); r = loader->close(loader);
...@@ -674,7 +683,7 @@ static void test_loader(enum test_type t, DB **dbs, int trigger) ...@@ -674,7 +683,7 @@ static void test_loader(enum test_type t, DB **dbs, int trigger)
else else
printf("closing, expecting no error because number of system calls was less than predicted (%s)\n", type); printf("closing, expecting no error because number of system calls was less than predicted (%s)\n", type);
} }
if (!USE_PUTS && error_injected) { if (!DISALLOW_PUTS && error_injected) {
if (r == 0) { if (r == 0) {
printf("loader->close() returned 0 but should have failed due to injected error from %s on call %d\n", printf("loader->close() returned 0 but should have failed due to injected error from %s on call %d\n",
err_type_str(t), trigger); err_type_str(t), trigger);
...@@ -725,7 +734,7 @@ static void test_loader(enum test_type t, DB **dbs, int trigger) ...@@ -725,7 +734,7 @@ static void test_loader(enum test_type t, DB **dbs, int trigger)
r = txn->commit(txn, 0); r = txn->commit(txn, 0);
CKERR(r); CKERR(r);
if (!USE_PUTS) { if (!DISALLOW_PUTS) {
assert_inames_missing(old_inames); assert_inames_missing(old_inames);
} }
if ( CHECK_RESULTS ) { if ( CHECK_RESULTS ) {
...@@ -736,7 +745,7 @@ static void test_loader(enum test_type t, DB **dbs, int trigger) ...@@ -736,7 +745,7 @@ static void test_loader(enum test_type t, DB **dbs, int trigger)
else { else {
r = txn->abort(txn); r = txn->abort(txn);
CKERR(r); CKERR(r);
if (!USE_PUTS) { if (!DISALLOW_PUTS) {
assert_inames_missing(new_inames); assert_inames_missing(new_inames);
} }
} }
...@@ -962,7 +971,8 @@ static void usage(const char *cmd) { ...@@ -962,7 +971,8 @@ static void usage(const char *cmd) {
fprintf(stderr, "Usage: -h -c -s -p -d <num_dbs> -r <num_rows> -t <elow> <ehi> \n%s\n", cmd); fprintf(stderr, "Usage: -h -c -s -p -d <num_dbs> -r <num_rows> -t <elow> <ehi> \n%s\n", cmd);
fprintf(stderr, " where -h print this message.\n"); fprintf(stderr, " where -h print this message.\n");
fprintf(stderr, " -c check the results.\n"); fprintf(stderr, " -c check the results.\n");
fprintf(stderr, " -p LOADER_USE_PUTS.\n"); fprintf(stderr, " -p LOADER_DISALLOW_PUTS.\n");
fprintf(stderr, " -z LOADER_COMPRESS_INTERMEDIATES.\n");
fprintf(stderr, " -k Test only normal operation and abort_via_poll (but thoroughly).\n"); fprintf(stderr, " -k Test only normal operation and abort_via_poll (but thoroughly).\n");
fprintf(stderr, " -s size_factor=1.\n"); fprintf(stderr, " -s size_factor=1.\n");
fprintf(stderr, " -d <num_dbs> Number of indexes to create (default=%d).\n", default_NUM_DBS); fprintf(stderr, " -d <num_dbs> Number of indexes to create (default=%d).\n", default_NUM_DBS);
...@@ -998,8 +1008,10 @@ static void do_args(int argc, char * const argv[]) { ...@@ -998,8 +1008,10 @@ static void do_args(int argc, char * const argv[]) {
NUM_ROWS = atoi(argv[0]); NUM_ROWS = atoi(argv[0]);
} else if (strcmp(argv[0], "-c")==0) { } else if (strcmp(argv[0], "-c")==0) {
CHECK_RESULTS = 1; CHECK_RESULTS = 1;
} else if (strcmp(argv[0], "-z")==0) {
COMPRESS = LOADER_COMPRESS_INTERMEDIATES;
} else if (strcmp(argv[0], "-p")==0) { } else if (strcmp(argv[0], "-p")==0) {
USE_PUTS = 0; DISALLOW_PUTS = LOADER_DISALLOW_PUTS;
printf("DISABLED Using puts as part of #4503\n"); printf("DISABLED Using puts as part of #4503\n");
} else if (strcmp(argv[0], "-k")==0) { } else if (strcmp(argv[0], "-k")==0) {
test_only_abort_via_poll = 1; test_only_abort_via_poll = 1;
...@@ -1010,7 +1022,7 @@ static void do_args(int argc, char * const argv[]) { ...@@ -1010,7 +1022,7 @@ static void do_args(int argc, char * const argv[]) {
argc--; argv++; argc--; argv++;
event_trigger_hi = atoi(argv[0]); event_trigger_hi = atoi(argv[0]);
} else if (strcmp(argv[0], "-s")==0) { } else if (strcmp(argv[0], "-s")==0) {
db_env_set_loader_size_factor(1); db_env_set_loader_size_factor(1);
} else if (strcmp(argv[0],"-e") == 0 && argc > 1) { } else if (strcmp(argv[0],"-e") == 0 && argc > 1) {
argc--; argv++; argc--; argv++;
envdir = argv[0]; envdir = argv[0];
......
...@@ -61,7 +61,9 @@ static void do_args(int argc, char * const argv[]) { ...@@ -61,7 +61,9 @@ static void do_args(int argc, char * const argv[]) {
verbose--; verbose--;
if (verbose<0) verbose=0; if (verbose<0) verbose=0;
} else if (strcmp(argv[0], "-p") == 0) { } else if (strcmp(argv[0], "-p") == 0) {
loader_flags = LOADER_USE_PUTS; loader_flags |= LOADER_COMPRESS_INTERMEDIATES;
} else if (strcmp(argv[0], "-z") == 0) {
loader_flags |= LOADER_DISALLOW_PUTS;
} else if (strcmp(argv[0], "-e") == 0) { } else if (strcmp(argv[0], "-e") == 0) {
argc--; argv++; argc--; argv++;
if (argc > 0) if (argc > 0)
......
...@@ -29,7 +29,7 @@ static void loader_open_abort(int ndb) { ...@@ -29,7 +29,7 @@ static void loader_open_abort(int ndb) {
r = env->set_generate_row_callback_for_put(env, put_multiple_generate); r = env->set_generate_row_callback_for_put(env, put_multiple_generate);
CKERR(r); CKERR(r);
int envflags = DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_INIT_TXN | DB_CREATE | DB_PRIVATE; int envflags = DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_INIT_TXN | DB_CREATE | DB_PRIVATE;
r = env->open(env, envdir, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r); r = env->open(env, envdir, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
env->set_errfile(env, stderr); env->set_errfile(env, stderr);
DB *dbs[ndb]; DB *dbs[ndb];
...@@ -49,7 +49,7 @@ static void loader_open_abort(int ndb) { ...@@ -49,7 +49,7 @@ static void loader_open_abort(int ndb) {
DB_LOADER *loader; DB_LOADER *loader;
r = env->create_loader(env, txn, &loader, ndb > 0 ? dbs[0] : NULL, ndb, dbs, db_flags, dbt_flags, loader_flags); CKERR(r); r = env->create_loader(env, txn, &loader, ndb > 0 ? dbs[0] : NULL, ndb, dbs, db_flags, dbt_flags, loader_flags); CKERR(r);
r = loader->close(loader); CKERR(r); r = loader->close(loader); CKERR(r);
r = txn->commit(txn, 0); CKERR(r); r = txn->commit(txn, 0); CKERR(r);
...@@ -77,7 +77,9 @@ static void do_args(int argc, char * const argv[]) { ...@@ -77,7 +77,9 @@ static void do_args(int argc, char * const argv[]) {
verbose--; verbose--;
if (verbose<0) verbose=0; if (verbose<0) verbose=0;
} else if (strcmp(argv[0], "-p") == 0) { } else if (strcmp(argv[0], "-p") == 0) {
loader_flags = LOADER_USE_PUTS; loader_flags |= LOADER_DISALLOW_PUTS;
} else if (strcmp(argv[0], "-z") == 0) {
loader_flags |= LOADER_COMPRESS_INTERMEDIATES;
} else if (strcmp(argv[0], "-e") == 0) { } else if (strcmp(argv[0], "-e") == 0) {
argc--; argv++; argc--; argv++;
if (argc > 0) if (argc > 0)
......
...@@ -14,7 +14,8 @@ enum {MAX_DBS=256}; ...@@ -14,7 +14,8 @@ enum {MAX_DBS=256};
int NUM_DBS=5; int NUM_DBS=5;
int NUM_ROWS=100000; int NUM_ROWS=100000;
int CHECK_RESULTS=0; int CHECK_RESULTS=0;
int USE_PUTS=0; int DISALLOW_PUTS=0;
int COMPRESS=0;
enum {MAGIC=311}; enum {MAGIC=311};
bool dup_row_at_end = false; // false: duplicate at the begining. true: duplicate at the end. The duplicated row is row 0. bool dup_row_at_end = false; // false: duplicate at the begining. true: duplicate at the end. The duplicated row is row 0.
...@@ -156,14 +157,18 @@ static void check_results(DB **dbs) ...@@ -156,14 +157,18 @@ static void check_results(DB **dbs)
r = dbs[j]->cursor(dbs[j], txn, &cursor, 0); r = dbs[j]->cursor(dbs[j], txn, &cursor, 0);
CKERR(r); CKERR(r);
for(int i=0;i<NUM_ROWS;i++) { for(int i=0;i<NUM_ROWS;i++) {
r = cursor->c_get(cursor, &key, &val, DB_NEXT); r = cursor->c_get(cursor, &key, &val, DB_NEXT);
CKERR(r); if (DISALLOW_PUTS) {
k = *(unsigned int*)key.data; CKERR2(r, EINVAL);
pkey_for_db_key = (j == 0) ? k : inv_twiddle32(k, j); } else {
v = *(unsigned int*)val.data; CKERR(r);
// test that we have the expected keys and values k = *(unsigned int*)key.data;
assert((unsigned int)pkey_for_db_key == (unsigned int)pkey_for_val(v, j)); pkey_for_db_key = (j == 0) ? k : inv_twiddle32(k, j);
v = *(unsigned int*)val.data;
// test that we have the expected keys and values
assert((unsigned int)pkey_for_db_key == (unsigned int)pkey_for_val(v, j));
// printf(" DB[%d] key = %10u, val = %10u, pkey_for_db_key = %10u, pkey_for_val=%10d\n", j, v, k, pkey_for_db_key, pkey_for_val(v, j)); // printf(" DB[%d] key = %10u, val = %10u, pkey_for_db_key = %10u, pkey_for_val=%10d\n", j, v, k, pkey_for_db_key, pkey_for_val(v, j));
}
} }
{printf("."); fflush(stdout);} {printf("."); fflush(stdout);}
r = cursor->c_close(cursor); r = cursor->c_close(cursor);
...@@ -200,14 +205,14 @@ static void test_loader(DB **dbs) ...@@ -200,14 +205,14 @@ static void test_loader(DB **dbs)
DB_LOADER *loader; DB_LOADER *loader;
uint32_t db_flags[MAX_DBS]; uint32_t db_flags[MAX_DBS];
uint32_t dbt_flags[MAX_DBS]; uint32_t dbt_flags[MAX_DBS];
for(int i=0;i<MAX_DBS;i++) { for(int i=0;i<MAX_DBS;i++) {
db_flags[i] = DB_NOOVERWRITE; db_flags[i] = DB_NOOVERWRITE;
dbt_flags[i] = 0; dbt_flags[i] = 0;
} }
uint32_t loader_flags = USE_PUTS; // set with -p option uint32_t loader_flags = DISALLOW_PUTS | COMPRESS; // set with -p option
// create and initialize loader // create and initialize loader
r = env->txn_begin(env, NULL, &txn, 0); r = env->txn_begin(env, NULL, &txn, 0);
CKERR(r); CKERR(r);
r = env->create_loader(env, txn, &loader, dbs[0], NUM_DBS, dbs, db_flags, dbt_flags, loader_flags); r = env->create_loader(env, txn, &loader, dbs[0], NUM_DBS, dbs, db_flags, dbt_flags, loader_flags);
CKERR(r); CKERR(r);
...@@ -238,11 +243,9 @@ static void test_loader(DB **dbs) ...@@ -238,11 +243,9 @@ static void test_loader(DB **dbs)
dbt_init(&key, &k, sizeof(unsigned int)); dbt_init(&key, &k, sizeof(unsigned int));
dbt_init(&val, &v, sizeof(unsigned int)); dbt_init(&val, &v, sizeof(unsigned int));
r = loader->put(loader, &key, &val); r = loader->put(loader, &key, &val);
if (USE_PUTS) { if (DISALLOW_PUTS) {
//PUT loader can return -1 if it finds an error during the puts. CKERR2(r, EINVAL);
CKERR2s(r, 0,-1); } else {
}
else {
CKERR(r); CKERR(r);
} }
if ( CHECK_RESULTS || verbose) { if((i%10000) == 0){printf("."); fflush(stdout);} } if ( CHECK_RESULTS || verbose) { if((i%10000) == 0){printf("."); fflush(stdout);} }
...@@ -351,7 +354,7 @@ int test_main(int argc, char * const *argv) { ...@@ -351,7 +354,7 @@ int test_main(int argc, char * const *argv) {
else { else {
int sizes[]={1,4000000,-1}; int sizes[]={1,4000000,-1};
//Make PUT loader take about the same amount of time: //Make PUT loader take about the same amount of time:
if (USE_PUTS) sizes[1] /= 25; if (DISALLOW_PUTS) sizes[1] /= 25;
for (int i=0; sizes[i]>=0; i++) { for (int i=0; sizes[i]>=0; i++) {
if (verbose) printf("Doing %d\n", sizes[i]); if (verbose) printf("Doing %d\n", sizes[i]);
NUM_ROWS = sizes[i]; NUM_ROWS = sizes[i];
...@@ -404,8 +407,10 @@ static void do_args(int argc, char * const argv[]) { ...@@ -404,8 +407,10 @@ static void do_args(int argc, char * const argv[]) {
num_rows_set = true; num_rows_set = true;
} else if (strcmp(argv[0], "-c")==0) { } else if (strcmp(argv[0], "-c")==0) {
CHECK_RESULTS = 1; CHECK_RESULTS = 1;
} else if (strcmp(argv[0], "-z")==0) {
COMPRESS = LOADER_COMPRESS_INTERMEDIATES;
} else if (strcmp(argv[0], "-p")==0) { } else if (strcmp(argv[0], "-p")==0) {
USE_PUTS = 1; DISALLOW_PUTS = LOADER_DISALLOW_PUTS;
} else if (strcmp(argv[0], "-s")==0) { } else if (strcmp(argv[0], "-s")==0) {
db_env_set_loader_size_factor(1); db_env_set_loader_size_factor(1);
} else if (strcmp(argv[0], "-E")==0) { } else if (strcmp(argv[0], "-E")==0) {
......
...@@ -11,7 +11,8 @@ ...@@ -11,7 +11,8 @@
static const char *envdir = ENVDIR; static const char *envdir = ENVDIR;
DB_ENV *env; DB_ENV *env;
int USE_PUTS=0; int DISALLOW_PUTS=0;
int COMPRESS=0;
enum {MAX_NAME=128}; enum {MAX_NAME=128};
enum {NUM_DBS=1}; enum {NUM_DBS=1};
enum {NUM_KV_PAIRS=3}; enum {NUM_KV_PAIRS=3};
...@@ -50,10 +51,10 @@ static void test_loader(DB **dbs) ...@@ -50,10 +51,10 @@ static void test_loader(DB **dbs)
db_flags[i] = DB_NOOVERWRITE; db_flags[i] = DB_NOOVERWRITE;
dbt_flags[i] = 0; dbt_flags[i] = 0;
} }
uint32_t loader_flags = USE_PUTS; // set with -p option uint32_t loader_flags = DISALLOW_PUTS | COMPRESS; // set with -p option
// create and initialize loader // create and initialize loader
r = env->txn_begin(env, NULL, &txn, 0); r = env->txn_begin(env, NULL, &txn, 0);
CKERR(r); CKERR(r);
r = env->create_loader(env, txn, &loader, dbs[0], NUM_DBS, dbs, db_flags, dbt_flags, loader_flags); r = env->create_loader(env, txn, &loader, dbs[0], NUM_DBS, dbs, db_flags, dbt_flags, loader_flags);
CKERR(r); CKERR(r);
...@@ -68,7 +69,11 @@ static void test_loader(DB **dbs) ...@@ -68,7 +69,11 @@ static void test_loader(DB **dbs)
dbt_init(&key, &kv_pairs[i].key, sizeof(kv_pairs[i].key)); dbt_init(&key, &kv_pairs[i].key, sizeof(kv_pairs[i].key));
dbt_init(&val, &kv_pairs[i].val, sizeof(kv_pairs[i].val)); dbt_init(&val, &kv_pairs[i].val, sizeof(kv_pairs[i].val));
r = loader->put(loader, &key, &val); r = loader->put(loader, &key, &val);
CKERR(r); if (DISALLOW_PUTS) {
CKERR2(r, EINVAL);
} else {
CKERR(r);
}
} }
*/ */
// close the loader // close the loader
...@@ -88,10 +93,14 @@ static void test_loader(DB **dbs) ...@@ -88,10 +93,14 @@ static void test_loader(DB **dbs)
CKERR(r); CKERR(r);
for(int i=0;i<NUM_KV_PAIRS;i++) { for(int i=0;i<NUM_KV_PAIRS;i++) {
r = cursor->c_get(cursor, &key, &val, DB_NEXT); r = cursor->c_get(cursor, &key, &val, DB_NEXT);
if (r!=0) { fprintf(stderr, "r==%d, failure\n", r); } if (DISALLOW_PUTS) {
CKERR(r); CKERR2(r, DB_NOTFOUND);
assert(*(int64_t*)key.data == kv_pairs[i].key); } else {
assert(*(int64_t*)val.data == kv_pairs[i].val); if (r!=0) { fprintf(stderr, "r==%d, failure\n", r); }
CKERR(r);
assert(*(int64_t*)key.data == kv_pairs[i].key);
assert(*(int64_t*)val.data == kv_pairs[i].val);
}
} }
cursor->c_close(cursor); cursor->c_close(cursor);
} }
...@@ -178,8 +187,10 @@ static void do_args(int argc, char * const argv[]) { ...@@ -178,8 +187,10 @@ static void do_args(int argc, char * const argv[]) {
} else if (strcmp(argv[0],"-q")==0) { } else if (strcmp(argv[0],"-q")==0) {
verbose--; verbose--;
if (verbose<0) verbose=0; if (verbose<0) verbose=0;
} else if (strcmp(argv[0], "-z")==0) {
COMPRESS = LOADER_COMPRESS_INTERMEDIATES;
} else if (strcmp(argv[0], "-p")==0) { } else if (strcmp(argv[0], "-p")==0) {
USE_PUTS = 1; DISALLOW_PUTS = LOADER_DISALLOW_PUTS;
} else if (strcmp(argv[0], "-e") == 0) { } else if (strcmp(argv[0], "-e") == 0) {
argc--; argv++; argc--; argv++;
if (argc > 0) if (argc > 0)
......
...@@ -11,7 +11,8 @@ ...@@ -11,7 +11,8 @@
static const char *envdir = ENVDIR; static const char *envdir = ENVDIR;
DB_ENV *env; DB_ENV *env;
int USE_PUTS=0; int DISALLOW_PUTS=0;
int COMPRESS=0;
enum {MAX_NAME=128}; enum {MAX_NAME=128};
enum {NUM_DBS=1}; enum {NUM_DBS=1};
enum {NUM_KV_PAIRS=3}; enum {NUM_KV_PAIRS=3};
...@@ -47,14 +48,14 @@ static void test_loader(DB **dbs) ...@@ -47,14 +48,14 @@ static void test_loader(DB **dbs)
DB_LOADER *loader; DB_LOADER *loader;
uint32_t db_flags[NUM_DBS]; uint32_t db_flags[NUM_DBS];
uint32_t dbt_flags[NUM_DBS]; uint32_t dbt_flags[NUM_DBS];
for(int i=0;i<NUM_DBS;i++) { for(int i=0;i<NUM_DBS;i++) {
db_flags[i] = DB_NOOVERWRITE; db_flags[i] = DB_NOOVERWRITE;
dbt_flags[i] = 0; dbt_flags[i] = 0;
} }
uint32_t loader_flags = USE_PUTS; // set with -p option uint32_t loader_flags = DISALLOW_PUTS | COMPRESS; // set with -p or -c option
// create and initialize loader // create and initialize loader
r = env->txn_begin(env, NULL, &txn, 0); r = env->txn_begin(env, NULL, &txn, 0);
CKERR(r); CKERR(r);
r = env->create_loader(env, txn, &loader, dbs[0], NUM_DBS, dbs, db_flags, dbt_flags, loader_flags); r = env->create_loader(env, txn, &loader, dbs[0], NUM_DBS, dbs, db_flags, dbt_flags, loader_flags);
CKERR(r); CKERR(r);
...@@ -62,7 +63,7 @@ static void test_loader(DB **dbs) ...@@ -62,7 +63,7 @@ static void test_loader(DB **dbs)
CKERR(r); CKERR(r);
r = loader->set_poll_function(loader, NULL, NULL); r = loader->set_poll_function(loader, NULL, NULL);
CKERR(r); CKERR(r);
uint64_t before_puts = toku_test_get_latest_lsn(env); uint64_t before_puts = toku_test_get_latest_lsn(env);
// using loader->put, put values into DB // using loader->put, put values into DB
DBT key, val; DBT key, val;
...@@ -70,7 +71,11 @@ static void test_loader(DB **dbs) ...@@ -70,7 +71,11 @@ static void test_loader(DB **dbs)
dbt_init(&key, &kv_pairs[i].key, sizeof(kv_pairs[i].key)); dbt_init(&key, &kv_pairs[i].key, sizeof(kv_pairs[i].key));
dbt_init(&val, &kv_pairs[i].val, sizeof(kv_pairs[i].val)); dbt_init(&val, &kv_pairs[i].val, sizeof(kv_pairs[i].val));
r = loader->put(loader, &key, &val); r = loader->put(loader, &key, &val);
CKERR(r); if (DISALLOW_PUTS) {
CKERR2(r, EINVAL);
} else {
CKERR(r);
}
} }
uint64_t after_puts = toku_test_get_latest_lsn(env); uint64_t after_puts = toku_test_get_latest_lsn(env);
assert(before_puts == after_puts); assert(before_puts == after_puts);
...@@ -90,11 +95,15 @@ static void test_loader(DB **dbs) ...@@ -90,11 +95,15 @@ static void test_loader(DB **dbs)
r = dbs[j]->cursor(dbs[j], txn, &cursor, 0); r = dbs[j]->cursor(dbs[j], txn, &cursor, 0);
CKERR(r); CKERR(r);
for(int i=0;i<NUM_KV_PAIRS;i++) { for(int i=0;i<NUM_KV_PAIRS;i++) {
r = cursor->c_get(cursor, &key, &val, DB_NEXT); r = cursor->c_get(cursor, &key, &val, DB_NEXT);
if (r!=0) { fprintf(stderr, "r==%d, failure\n", r); } if (r!=0) { fprintf(stderr, "r==%d, failure\n", r); }
CKERR(r); if (DISALLOW_PUTS) {
assert(*(int64_t*)key.data == kv_pairs[i].key); CKERR2(r, DB_NOTFOUND);
assert(*(int64_t*)val.data == kv_pairs[i].val); } else {
CKERR(r);
assert(*(int64_t*)key.data == kv_pairs[i].key);
assert(*(int64_t*)val.data == kv_pairs[i].val);
}
} }
cursor->c_close(cursor); cursor->c_close(cursor);
} }
...@@ -185,7 +194,9 @@ static void do_args(int argc, char * const argv[]) { ...@@ -185,7 +194,9 @@ static void do_args(int argc, char * const argv[]) {
fprintf(stderr, "Usage:\n%s\n", cmd); fprintf(stderr, "Usage:\n%s\n", cmd);
exit(resultcode); exit(resultcode);
} else if (strcmp(argv[0], "-p")==0) { } else if (strcmp(argv[0], "-p")==0) {
USE_PUTS = 1; DISALLOW_PUTS = LOADER_DISALLOW_PUTS;
} else if (strcmp(argv[0], "-z")==0) {
COMPRESS = LOADER_COMPRESS_INTERMEDIATES;
} else if (strcmp(argv[0], "--block_size") == 0) { } else if (strcmp(argv[0], "--block_size") == 0) {
argc--; argv++; argc--; argv++;
block_size = atoi(argv[0]); block_size = atoi(argv[0]);
......
...@@ -23,7 +23,8 @@ enum {MAX_DBS=1024}; ...@@ -23,7 +23,8 @@ enum {MAX_DBS=1024};
int NUM_DBS=1; int NUM_DBS=1;
int NUM_ROWS=1000000; int NUM_ROWS=1000000;
int CHECK_RESULTS=1; int CHECK_RESULTS=1;
int USE_PUTS=0; int DISALLOW_PUTS=0;
int COMPRESS=0;
enum { old_default_cachesize=1024 }; // MB enum { old_default_cachesize=1024 }; // MB
int CACHESIZE=old_default_cachesize; int CACHESIZE=old_default_cachesize;
int ALLOW_DUPS=0; int ALLOW_DUPS=0;
...@@ -247,13 +248,18 @@ static void check_results(DB **dbs) { ...@@ -247,13 +248,18 @@ static void check_results(DB **dbs) {
// generate the expected keys // generate the expected keys
unsigned int *expected_key = (unsigned int *) toku_malloc(NUM_ROWS * sizeof (unsigned int)); unsigned int *expected_key = (unsigned int *) toku_malloc(NUM_ROWS * sizeof (unsigned int));
for (int i = 0; i < NUM_ROWS; i++) for (int i = 0; i < NUM_ROWS; i++) {
expected_key[i] = j == 0 ? (unsigned int)(i+1) : twiddle32(i+1, j); expected_key[i] = j == 0 ? (unsigned int)(i+1) : twiddle32(i+1, j);
}
// sort the keys // sort the keys
qsort(expected_key, NUM_ROWS, sizeof (unsigned int), uint_cmp); qsort(expected_key, NUM_ROWS, sizeof (unsigned int), uint_cmp);
for (int i = 0; i < NUM_ROWS+1; i++) { for (int i = 0; i < NUM_ROWS+1; i++) {
r = cursor->c_get(cursor, &key, &val, DB_NEXT); r = cursor->c_get(cursor, &key, &val, DB_NEXT);
if (DISALLOW_PUTS) {
CKERR2(r, DB_NOTFOUND);
break;
}
if (r == DB_NOTFOUND) { if (r == DB_NOTFOUND) {
assert(i == NUM_ROWS); // check that there are exactly NUM_ROWS in the dictionary assert(i == NUM_ROWS); // check that there are exactly NUM_ROWS in the dictionary
break; break;
...@@ -393,16 +399,16 @@ static void test_loader(DB **dbs) ...@@ -393,16 +399,16 @@ static void test_loader(DB **dbs)
uint32_t db_flags[MAX_DBS]; uint32_t db_flags[MAX_DBS];
uint32_t dbt_flags[MAX_DBS]; uint32_t dbt_flags[MAX_DBS];
uint32_t flags = DB_NOOVERWRITE; uint32_t flags = DB_NOOVERWRITE;
if ( (USE_PUTS == 1) && (ALLOW_DUPS == 1) ) flags = 0; if ( (DISALLOW_PUTS != 0) && (ALLOW_DUPS == 1) ) flags = 0;
for(int i=0;i<MAX_DBS;i++) { for(int i=0;i<MAX_DBS;i++) {
db_flags[i] = flags; db_flags[i] = flags;
dbt_flags[i] = 0; dbt_flags[i] = 0;
} }
uint32_t loader_flags = USE_PUTS ? LOADER_USE_PUTS : 0; // set with -p option uint32_t loader_flags = DISALLOW_PUTS | COMPRESS; // set with -p option
// create and initialize loader // create and initialize loader
r = env->txn_begin(env, NULL, &txn, 0); r = env->txn_begin(env, NULL, &txn, 0);
CKERR(r); CKERR(r);
hiwater_start = hiwater; hiwater_start = hiwater;
if (footprint_print) printf("%s:%d Hiwater=%ld water=%ld\n", __FILE__, __LINE__, hiwater, water); if (footprint_print) printf("%s:%d Hiwater=%ld water=%ld\n", __FILE__, __LINE__, hiwater, water);
...@@ -423,7 +429,11 @@ static void test_loader(DB **dbs) ...@@ -423,7 +429,11 @@ static void test_loader(DB **dbs)
dbt_init(&key, &k, sizeof(unsigned int)); dbt_init(&key, &k, sizeof(unsigned int));
dbt_init(&val, &v, sizeof(unsigned int)); dbt_init(&val, &v, sizeof(unsigned int));
r = loader->put(loader, &key, &val); r = loader->put(loader, &key, &val);
CKERR(r); if (DISALLOW_PUTS) {
CKERR2(r, EINVAL);
} else {
CKERR(r);
}
if ( verbose) { if((i%10000) == 0){printf("."); fflush(stdout);} } if ( verbose) { if((i%10000) == 0){printf("."); fflush(stdout);} }
} }
if ( verbose ) {printf("\n"); fflush(stdout);} if ( verbose ) {printf("\n"); fflush(stdout);}
...@@ -445,7 +455,7 @@ static void test_loader(DB **dbs) ...@@ -445,7 +455,7 @@ static void test_loader(DB **dbs)
CKERR2s(r,0,TOKUDB_CANCELED); CKERR2s(r,0,TOKUDB_CANCELED);
if (r==0) { if (r==0) {
if ( USE_PUTS == 0 ) { if ( DISALLOW_PUTS == 0 ) {
if (poll_count == 0) printf("%s:%d\n", __FILE__, __LINE__); if (poll_count == 0) printf("%s:%d\n", __FILE__, __LINE__);
assert(poll_count>0); assert(poll_count>0);
} }
...@@ -650,7 +660,9 @@ static void do_args(int argc, char * const argv[]) { ...@@ -650,7 +660,9 @@ static void do_args(int argc, char * const argv[]) {
} else if (strcmp(argv[0], "-c")==0) { } else if (strcmp(argv[0], "-c")==0) {
CHECK_RESULTS = 1; CHECK_RESULTS = 1;
} else if (strcmp(argv[0], "-p")==0) { } else if (strcmp(argv[0], "-p")==0) {
USE_PUTS = 1; DISALLOW_PUTS = LOADER_DISALLOW_PUTS;
} else if (strcmp(argv[0], "-z")==0) {
COMPRESS = LOADER_COMPRESS_INTERMEDIATES;
} else if (strcmp(argv[0], "-m")==0) { } else if (strcmp(argv[0], "-m")==0) {
argc--; argv++; argc--; argv++;
CACHESIZE = atoi(argv[0]); CACHESIZE = atoi(argv[0]);
......
...@@ -32,7 +32,8 @@ enum {MAX_DBS=1024}; ...@@ -32,7 +32,8 @@ enum {MAX_DBS=1024};
int NUM_DBS=5; int NUM_DBS=5;
int NUM_ROWS=100000; int NUM_ROWS=100000;
int CHECK_RESULTS=0; int CHECK_RESULTS=0;
int USE_PUTS=0; int DISALLOW_PUTS=0;
int COMPRESS=0;
enum { old_default_cachesize=1024 }; // MB enum { old_default_cachesize=1024 }; // MB
int CACHESIZE=old_default_cachesize; int CACHESIZE=old_default_cachesize;
int ALLOW_DUPS=0; int ALLOW_DUPS=0;
...@@ -267,6 +268,10 @@ static void check_results(DB **dbs) { ...@@ -267,6 +268,10 @@ static void check_results(DB **dbs) {
for (int i = 0; i < NUM_ROWS+1; i++) { for (int i = 0; i < NUM_ROWS+1; i++) {
r = cursor->c_get(cursor, &key, &val, DB_NEXT); r = cursor->c_get(cursor, &key, &val, DB_NEXT);
if (DISALLOW_PUTS) {
CKERR2(r, DB_NOTFOUND);
break;
}
if (r == DB_NOTFOUND) { if (r == DB_NOTFOUND) {
assert(i == NUM_ROWS); // check that there are exactly NUM_ROWS in the dictionary assert(i == NUM_ROWS); // check that there are exactly NUM_ROWS in the dictionary
break; break;
...@@ -357,16 +362,16 @@ static void test_loader(DB **dbs) ...@@ -357,16 +362,16 @@ static void test_loader(DB **dbs)
uint32_t db_flags[MAX_DBS]; uint32_t db_flags[MAX_DBS];
uint32_t dbt_flags[MAX_DBS]; uint32_t dbt_flags[MAX_DBS];
uint32_t flags = DB_NOOVERWRITE; uint32_t flags = DB_NOOVERWRITE;
if ( (USE_PUTS == 1) && (ALLOW_DUPS == 1) ) flags = 0; if ( (DISALLOW_PUTS) && (ALLOW_DUPS == 1) ) flags = 0;
for(int i=0;i<MAX_DBS;i++) { for(int i=0;i<MAX_DBS;i++) {
db_flags[i] = flags; db_flags[i] = flags;
dbt_flags[i] = 0; dbt_flags[i] = 0;
} }
uint32_t loader_flags = USE_PUTS ? LOADER_USE_PUTS : 0; // set with -p option uint32_t loader_flags = DISALLOW_PUTS | COMPRESS; // set with -p option
// create and initialize loader // create and initialize loader
r = env->txn_begin(env, NULL, &txn, 0); r = env->txn_begin(env, NULL, &txn, 0);
CKERR(r); CKERR(r);
hiwater_start = hiwater; hiwater_start = hiwater;
if (footprint_print) printf("%s:%d Hiwater=%ld water=%ld\n", __FILE__, __LINE__, hiwater, water); if (footprint_print) printf("%s:%d Hiwater=%ld water=%ld\n", __FILE__, __LINE__, hiwater, water);
...@@ -387,7 +392,11 @@ static void test_loader(DB **dbs) ...@@ -387,7 +392,11 @@ static void test_loader(DB **dbs)
dbt_init(&key, &k, sizeof(unsigned int)); dbt_init(&key, &k, sizeof(unsigned int));
dbt_init(&val, &v, sizeof(unsigned int)); dbt_init(&val, &v, sizeof(unsigned int));
r = loader->put(loader, &key, &val); r = loader->put(loader, &key, &val);
CKERR(r); if (DISALLOW_PUTS) {
CKERR2(r, EINVAL);
} else {
CKERR(r);
}
if ( verbose) { if((i%10000) == 0){printf("."); fflush(stdout);} } if ( verbose) { if((i%10000) == 0){printf("."); fflush(stdout);} }
} }
if ( verbose ) {printf("\n"); fflush(stdout);} if ( verbose ) {printf("\n"); fflush(stdout);}
...@@ -409,7 +418,7 @@ static void test_loader(DB **dbs) ...@@ -409,7 +418,7 @@ static void test_loader(DB **dbs)
CKERR2s(r,0,TOKUDB_CANCELED); CKERR2s(r,0,TOKUDB_CANCELED);
if (r==0) { if (r==0) {
if ( USE_PUTS == 0 ) { if (!DISALLOW_PUTS) {
if (poll_count == 0) printf("%s:%d\n", __FILE__, __LINE__); if (poll_count == 0) printf("%s:%d\n", __FILE__, __LINE__);
assert(poll_count>0); assert(poll_count>0);
} }
...@@ -432,9 +441,15 @@ static void test_loader(DB **dbs) ...@@ -432,9 +441,15 @@ static void test_loader(DB **dbs)
if (verbose) if (verbose)
printf("NUM_ROWS=%d n_keys=%" PRIu64 " n_data=%" PRIu64 " dsize=%" PRIu64 " fsize=%" PRIu64 "\n", printf("NUM_ROWS=%d n_keys=%" PRIu64 " n_data=%" PRIu64 " dsize=%" PRIu64 " fsize=%" PRIu64 "\n",
NUM_ROWS, stats.bt_nkeys, stats.bt_ndata, stats.bt_dsize, stats.bt_fsize); NUM_ROWS, stats.bt_nkeys, stats.bt_ndata, stats.bt_dsize, stats.bt_fsize);
assert(stats.bt_nkeys <= (uint64_t)NUM_ROWS); // Fix as part of #4129. Was == if (DISALLOW_PUTS) {
assert(stats.bt_ndata <= (uint64_t)NUM_ROWS); assert(stats.bt_nkeys == 0); // Fix as part of #4129. Was ==
assert(stats.bt_dsize == ((uint64_t)NUM_ROWS) * 2 * sizeof(unsigned int)); assert(stats.bt_ndata == 0);
assert(stats.bt_dsize == 0);
} else {
assert(stats.bt_nkeys <= (uint64_t)NUM_ROWS); // Fix as part of #4129. Was ==
assert(stats.bt_ndata <= (uint64_t)NUM_ROWS);
assert(stats.bt_dsize == ((uint64_t)NUM_ROWS) * 2 * sizeof(unsigned int));
}
r = txn->commit(txn, 0); r = txn->commit(txn, 0);
CKERR(r); CKERR(r);
} }
...@@ -633,7 +648,9 @@ static void do_args(int argc, char * const argv[]) { ...@@ -633,7 +648,9 @@ static void do_args(int argc, char * const argv[]) {
} else if (strcmp(argv[0], "-c")==0) { } else if (strcmp(argv[0], "-c")==0) {
CHECK_RESULTS = 1; CHECK_RESULTS = 1;
} else if (strcmp(argv[0], "-p")==0) { } else if (strcmp(argv[0], "-p")==0) {
USE_PUTS = 1; DISALLOW_PUTS = LOADER_DISALLOW_PUTS;
} else if (strcmp(argv[0], "-z")==0) {
COMPRESS = LOADER_COMPRESS_INTERMEDIATES;
} else if (strcmp(argv[0], "-m")==0) { } else if (strcmp(argv[0], "-m")==0) {
argc--; argv++; argc--; argv++;
CACHESIZE = atoi(argv[0]); CACHESIZE = atoi(argv[0]);
......
...@@ -13,7 +13,8 @@ enum {MAX_NAME=128}; ...@@ -13,7 +13,8 @@ enum {MAX_NAME=128};
enum {MAX_DBS=16}; enum {MAX_DBS=16};
enum {MAX_ROW_LEN=1024}; enum {MAX_ROW_LEN=1024};
static int NUM_DBS=10; static int NUM_DBS=10;
static int USE_PUTS=0; static int DISALLOW_PUTS=0;
static int COMPRESS=0;
static int USE_REGION=0; static int USE_REGION=0;
static const char *envdir = ENVDIR; static const char *envdir = ENVDIR;
...@@ -291,7 +292,7 @@ static int test_loader(DB **dbs) ...@@ -291,7 +292,7 @@ static int test_loader(DB **dbs)
db_flags[i] = DB_NOOVERWRITE; db_flags[i] = DB_NOOVERWRITE;
dbt_flags[i] = 0; dbt_flags[i] = 0;
} }
uint32_t loader_flags = USE_PUTS; // set with -p option uint32_t loader_flags = DISALLOW_PUTS | COMPRESS; // set with -p option
FILE *fp; FILE *fp;
// select which table to loader // select which table to loader
...@@ -335,7 +336,11 @@ static int test_loader(DB **dbs) ...@@ -335,7 +336,11 @@ static int test_loader(DB **dbs)
dbt_init(&key, &k, sizeof(int)); dbt_init(&key, &k, sizeof(int));
dbt_init(&val, v, strlen(v)+1); dbt_init(&val, v, strlen(v)+1);
r = loader->put(loader, &key, &val); r = loader->put(loader, &key, &val);
CKERR(r); if (DISALLOW_PUTS) {
CKERR2(r, EINVAL);
} else {
CKERR(r);
}
if (verbose) { if((i++%10000) == 0){printf("."); fflush(stdout);} } if (verbose) { if((i++%10000) == 0){printf("."); fflush(stdout);} }
c = tpch_read_row(fp, &k, v); c = tpch_read_row(fp, &k, v);
} }
...@@ -350,7 +355,7 @@ static int test_loader(DB **dbs) ...@@ -350,7 +355,7 @@ static int test_loader(DB **dbs)
printf(" done\n"); printf(" done\n");
CKERR(r); CKERR(r);
if ( USE_PUTS == 0 ) assert(poll_count>0); if ( DISALLOW_PUTS == 0 ) assert(poll_count>0);
r = txn->commit(txn, 0); r = txn->commit(txn, 0);
CKERR(r); CKERR(r);
...@@ -442,7 +447,9 @@ static void do_args(int argc, char * const argv[]) { ...@@ -442,7 +447,9 @@ static void do_args(int argc, char * const argv[]) {
fprintf(stderr, "Usage: -h -p -g\n%s\n", cmd); fprintf(stderr, "Usage: -h -p -g\n%s\n", cmd);
exit(resultcode); exit(resultcode);
} else if (strcmp(argv[0], "-p")==0) { } else if (strcmp(argv[0], "-p")==0) {
USE_PUTS = 1; DISALLOW_PUTS = LOADER_DISALLOW_PUTS;
} else if (strcmp(argv[0], "-z")==0) {
COMPRESS = LOADER_COMPRESS_INTERMEDIATES;
} else if (strcmp(argv[0], "-g")==0) { } else if (strcmp(argv[0], "-g")==0) {
USE_REGION = 1; USE_REGION = 1;
} else if (strcmp(argv[0], "-e") == 0) { } else if (strcmp(argv[0], "-e") == 0) {
......
...@@ -53,7 +53,7 @@ static bool do_test=false, do_recover=false; ...@@ -53,7 +53,7 @@ static bool do_test=false, do_recover=false;
static DB_ENV *env; static DB_ENV *env;
static int NUM_ROWS=50000000; static int NUM_ROWS=50000000;
static int USE_PUTS=0; static int COMPRESS=0;
enum {MAX_NAME=128}; enum {MAX_NAME=128};
enum {MAGIC=311}; enum {MAGIC=311};
...@@ -290,7 +290,7 @@ static void test_loader(DB **dbs) ...@@ -290,7 +290,7 @@ static void test_loader(DB **dbs)
db_flags[i] = DB_NOOVERWRITE; db_flags[i] = DB_NOOVERWRITE;
dbt_flags[i] = 0; dbt_flags[i] = 0;
} }
uint32_t loader_flags = USE_PUTS; // set with -p option uint32_t loader_flags = COMPRESS; // set with -p option
int n = count_temp(env->i->real_data_dir); int n = count_temp(env->i->real_data_dir);
assert(n == 0); // Must be no temp files before loader is run assert(n == 0); // Must be no temp files before loader is run
...@@ -308,7 +308,7 @@ static void test_loader(DB **dbs) ...@@ -308,7 +308,7 @@ static void test_loader(DB **dbs)
r = loader->set_poll_function(loader, poll_function, expect_poll_void); r = loader->set_poll_function(loader, poll_function, expect_poll_void);
CKERR(r); CKERR(r);
printf("USE_PUTS = %d\n", USE_PUTS); printf("COMPRESS = %d\n", COMPRESS);
if (verbose) printf("new inames:\n"); if (verbose) printf("new inames:\n");
get_inames(new_inames, dbs); get_inames(new_inames, dbs);
...@@ -461,9 +461,9 @@ static void do_args(int argc, char * const argv[]) { ...@@ -461,9 +461,9 @@ static void do_args(int argc, char * const argv[]) {
} else if (strcmp(argv[0], "-r")==0) { } else if (strcmp(argv[0], "-r")==0) {
argc--; argv++; argc--; argv++;
NUM_ROWS = atoi(argv[0]); NUM_ROWS = atoi(argv[0]);
} else if (strcmp(argv[0], "-p")==0) { } else if (strcmp(argv[0], "-z")==0) {
USE_PUTS = LOADER_USE_PUTS; COMPRESS = LOADER_COMPRESS_INTERMEDIATES;
printf("Using puts\n"); printf("Compressing\n");
} else if (strcmp(argv[0], "--test")==0) { } else if (strcmp(argv[0], "--test")==0) {
do_test=true; do_test=true;
} else if (strcmp(argv[0], "--recover") == 0) { } else if (strcmp(argv[0], "--recover") == 0) {
......
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
// Verify that log-suppress recovery is done properly. (See ticket 2781.) // Verify that log-suppress recovery is done properly. (See ticket 2781.)
// TODO: determine if this is useful at all anymore (log suppression does not exist anymore)
#include <sys/stat.h> #include <sys/stat.h>
...@@ -97,7 +98,7 @@ load(DB **dbs) { ...@@ -97,7 +98,7 @@ load(DB **dbs) {
db_flags[i] = DB_NOOVERWRITE; db_flags[i] = DB_NOOVERWRITE;
dbt_flags[i] = 0; dbt_flags[i] = 0;
} }
uint32_t loader_flags = LOADER_USE_PUTS; uint32_t loader_flags = LOADER_COMPRESS_INTERMEDIATES;
// create and initialize loader // create and initialize loader
r = env->txn_begin(env, NULL, &txn, 0); r = env->txn_begin(env, NULL, &txn, 0);
......
...@@ -21,7 +21,7 @@ static int put_multiple_generate(DB *UU(dest_db), DB *UU(src_db), DBT *dest_key, ...@@ -21,7 +21,7 @@ static int put_multiple_generate(DB *UU(dest_db), DB *UU(src_db), DBT *dest_key,
} }
static void static void
test_loader_abort (bool use_puts, bool abort_loader, bool abort_txn) { test_loader_abort (bool do_compress, bool abort_loader, bool abort_txn) {
DB_ENV * env; DB_ENV * env;
DB *db; DB *db;
DB_TXN *txn; DB_TXN *txn;
...@@ -35,7 +35,7 @@ test_loader_abort (bool use_puts, bool abort_loader, bool abort_txn) { ...@@ -35,7 +35,7 @@ test_loader_abort (bool use_puts, bool abort_loader, bool abort_txn) {
DB_LOADER *loader; DB_LOADER *loader;
uint32_t db_flags = 0; uint32_t db_flags = 0;
uint32_t dbt_flags = 0; uint32_t dbt_flags = 0;
uint32_t loader_flags = use_puts ? LOADER_USE_PUTS : 0; uint32_t loader_flags = do_compress ? LOADER_COMPRESS_INTERMEDIATES : 0;
DBC* cursor = NULL; DBC* cursor = NULL;
/* create the dup database file */ /* create the dup database file */
......
...@@ -881,7 +881,7 @@ static int UU() loader_op(DB_TXN* txn, ARG UU(arg), void* UU(operation_extra), v ...@@ -881,7 +881,7 @@ static int UU() loader_op(DB_TXN* txn, ARG UU(arg), void* UU(operation_extra), v
r = db_load->open(db_load, txn, "loader-db", NULL, DB_BTREE, DB_CREATE, 0666); r = db_load->open(db_load, txn, "loader-db", NULL, DB_BTREE, DB_CREATE, 0666);
assert(r == 0); assert(r == 0);
DB_LOADER *loader; DB_LOADER *loader;
uint32_t loader_flags = (num == 0) ? 0 : LOADER_USE_PUTS; uint32_t loader_flags = (num == 0) ? 0 : LOADER_COMPRESS_INTERMEDIATES;
r = env->create_loader(env, txn, &loader, db_load, 1, &db_load, &db_flags, &dbt_flags, loader_flags); r = env->create_loader(env, txn, &loader, db_load, 1, &db_load, &db_flags, &dbt_flags, loader_flags);
CKERR(r); CKERR(r);
......
...@@ -1453,7 +1453,7 @@ env_create_loader(DB_ENV *env, ...@@ -1453,7 +1453,7 @@ env_create_loader(DB_ENV *env,
uint32_t db_flags[/*N*/], uint32_t db_flags[/*N*/],
uint32_t dbt_flags[/*N*/], uint32_t dbt_flags[/*N*/],
uint32_t loader_flags) { uint32_t loader_flags) {
int r = toku_loader_create_loader(env, txn, blp, src_db, N, dbs, db_flags, dbt_flags, loader_flags); int r = toku_loader_create_loader(env, txn, blp, src_db, N, dbs, db_flags, dbt_flags, loader_flags, true);
return r; return r;
} }
......
...@@ -219,26 +219,21 @@ toku_txn_prepare (DB_TXN *txn, uint8_t gid[DB_GID_SIZE]) { ...@@ -219,26 +219,21 @@ toku_txn_prepare (DB_TXN *txn, uint8_t gid[DB_GID_SIZE]) {
return toku_txn_xa_prepare(txn, &xid); return toku_txn_xa_prepare(txn, &xid);
} }
static int static int
toku_txn_txn_stat (DB_TXN *txn, struct txn_stat **txn_stat) { toku_txn_txn_stat (DB_TXN *txn, struct txn_stat **txn_stat) {
XMALLOC(*txn_stat); XMALLOC(*txn_stat);
return toku_logger_txn_rollback_raw_count(db_txn_struct_i(txn)->tokutxn, &(*txn_stat)->rollback_raw_count); return toku_logger_txn_rollback_stats(db_txn_struct_i(txn)->tokutxn, *txn_stat);
} }
static int static int
locked_txn_txn_stat (DB_TXN *txn, struct txn_stat **txn_stat) { locked_txn_txn_stat (DB_TXN *txn, struct txn_stat **txn_stat) {
int r = toku_txn_txn_stat(txn, txn_stat); int r = toku_txn_txn_stat(txn, txn_stat);
return r; return r;
} }
static int static int
locked_txn_commit_with_progress(DB_TXN *txn, uint32_t flags, locked_txn_commit_with_progress(DB_TXN *txn, uint32_t flags,
TXN_PROGRESS_POLL_FUNCTION poll, void* poll_extra) { TXN_PROGRESS_POLL_FUNCTION poll, void* poll_extra) {
TOKUTXN ttxn = db_txn_struct_i(txn)->tokutxn;
if (toku_txn_requires_checkpoint(ttxn)) {
CHECKPOINTER cp = toku_cachetable_get_checkpointer(txn->mgrp->i->cachetable);
toku_checkpoint(cp, txn->mgrp->i->logger, NULL, NULL, NULL, NULL, TXN_COMMIT_CHECKPOINT);
}
bool holds_mo_lock = false; bool holds_mo_lock = false;
if (!toku_txn_is_read_only(db_txn_struct_i(txn)->tokutxn)) { if (!toku_txn_is_read_only(db_txn_struct_i(txn)->tokutxn)) {
// A readonly transaction does no logging, and therefore does not // A readonly transaction does no logging, and therefore does not
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment