Commit 635ac358 authored by Zardosht Kasheff, committed by Yoni Fogel

refs #6058, merge to main!

git-svn-id: file:///svn/toku/tokudb@54234 c7de825b-a66e-492c-adef-691d508d4ae1
parent 90585528
......@@ -249,6 +249,7 @@ static void print_defines (void) {
#endif
dodefine_from_track(txn_flags, DB_INHERIT_ISOLATION);
dodefine_from_track(txn_flags, DB_SERIALIZABLE);
dodefine_from_track(txn_flags, DB_TXN_READ_ONLY);
}
/* TOKUDB specific error codes*/
......
......@@ -3935,7 +3935,10 @@ static int
does_txn_read_entry(TXNID id, TOKUTXN context) {
int rval;
TXNID oldest_live_in_snapshot = toku_get_oldest_in_live_root_txn_list(context);
if (id < oldest_live_in_snapshot || id == context->txnid.parent_id64) {
if (oldest_live_in_snapshot == TXNID_NONE && id < context->snapshot_txnid64) {
rval = TOKUDB_ACCEPT;
}
else if (id < oldest_live_in_snapshot || id == context->txnid.parent_id64) {
rval = TOKUDB_ACCEPT;
}
else if (id > context->snapshot_txnid64 || toku_is_txn_in_live_root_txn_list(*context->live_root_txn_list, id)) {
......
......@@ -47,6 +47,7 @@ typedef struct txnid_pair_s {
#define TXNID_NONE_LIVING ((TXNID)0)
#define TXNID_NONE ((TXNID)0)
#define TXNID_MAX ((TXNID)-1)
static const TXNID_PAIR TXNID_PAIR_NONE = { .parent_id64 = TXNID_NONE, .child_id64 = TXNID_NONE };
......
......@@ -170,6 +170,7 @@ struct tokutxn {
TXNID oldest_referenced_xid;
bool begin_was_logged;
bool declared_read_only; // true if the txn was declared read only when began
// These are not read until a commit, prepare, or abort starts, and
// they're "monotonic" (only go false->true) during operation:
bool do_fsync;
......
......@@ -412,6 +412,7 @@ generate_log_writer (void) {
fprintf(cf, " //txn can be NULL during tests\n");
fprintf(cf, " //never null when not checkpoint.\n");
fprintf(cf, " if (txn && !txn->begin_was_logged) {\n");
fprintf(cf, " invariant(!txn_declared_read_only(txn));\n");
fprintf(cf, " toku_maybe_log_begin_txn_for_write_operation(txn);\n");
fprintf(cf, " }\n");
break;
......@@ -419,6 +420,7 @@ generate_log_writer (void) {
case ASSERT_BEGIN_WAS_LOGGED: {
fprintf(cf, " //txn can be NULL during tests\n");
fprintf(cf, " invariant(!txn || txn->begin_was_logged);\n");
fprintf(cf, " invariant(!txn || !txn_declared_read_only(txn));\n");
break;
}
case IGNORE_LOG_BEGIN: break;
......
......@@ -480,7 +480,16 @@ recover_transaction(TOKUTXN *txnp, TXNID_PAIR xid, TXNID_PAIR parentxid, TOKULOG
toku_txnid2txn(logger, xid, &txn);
assert(txn==NULL);
}
r = toku_txn_begin_with_xid(parent, &txn, logger, xid, TXN_SNAPSHOT_NONE, NULL, true);
r = toku_txn_begin_with_xid(
parent,
&txn,
logger,
xid,
TXN_SNAPSHOT_NONE,
NULL,
true, // for_recovery
false // read_only
);
assert(r == 0);
// We only know about it because it was logged. Restore the log bit.
// Logging is 'off' but it will still set the bit.
......
......@@ -97,6 +97,7 @@ test_writer_priority_thread (void *arg) {
static void
test_writer_priority (void) {
struct rw_event rw_event, *rwe = &rw_event;
ZERO_STRUCT(rw_event);
int r;
rw_event_init(rwe);
......@@ -152,6 +153,7 @@ test_single_writer_thread (void *arg) {
static void
test_single_writer (void) {
struct rw_event rw_event, *rwe = &rw_event;
ZERO_STRUCT(rw_event);
int r;
rw_event_init(rwe);
......
......@@ -32,7 +32,7 @@ static void test_it (int N) {
r = toku_logger_open_rollback(logger, ct, true); CKERR(r);
TOKUTXN txn;
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT); CKERR(r);
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT, false); CKERR(r);
r = toku_open_ft_handle(FILENAME, 1, &brt, 1024, 256, TOKU_DEFAULT_COMPRESSION_METHOD, ct, txn, toku_builtin_compare_fun); CKERR(r);
......@@ -44,12 +44,12 @@ static void test_it (int N) {
unsigned int rands[N];
for (int i=0; i<N; i++) {
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT); CKERR(r);
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT, false); CKERR(r);
r = toku_open_ft_handle(FILENAME, 0, &brt, 1024, 256, TOKU_DEFAULT_COMPRESSION_METHOD, ct, txn, toku_builtin_compare_fun); CKERR(r);
r = toku_txn_commit_txn(txn, false, NULL, NULL); CKERR(r);
toku_txn_close_txn(txn);
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT); CKERR(r);
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT, false); CKERR(r);
char key[100],val[300];
DBT k, v;
rands[i] = random();
......@@ -67,12 +67,12 @@ static void test_it (int N) {
if (verbose) printf("i=%d\n", i);
}
for (int i=0; i<N; i++) {
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT); CKERR(r);
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT, false); CKERR(r);
r = toku_open_ft_handle(FILENAME, 0, &brt, 1024, 256, TOKU_DEFAULT_COMPRESSION_METHOD, ct, txn, toku_builtin_compare_fun); CKERR(r);
r = toku_txn_commit_txn(txn, false, NULL, NULL); CKERR(r);
toku_txn_close_txn(txn);
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT); CKERR(r);
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT, false); CKERR(r);
char key[100];
DBT k;
snprintf(key, sizeof(key), "key%x.%x", rands[i], i);
......@@ -92,7 +92,7 @@ static void test_it (int N) {
if (verbose) printf("d=%d\n", i);
}
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT); CKERR(r);
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_ROOT, false); CKERR(r);
r = toku_open_ft_handle(FILENAME, 0, &brt, 1024, 256, TOKU_DEFAULT_COMPRESSION_METHOD, ct, txn, toku_builtin_compare_fun); CKERR(r);
r = toku_txn_commit_txn(txn, false, NULL, NULL); CKERR(r);
toku_txn_close_txn(txn);
......
......@@ -50,7 +50,7 @@ create_populate_tree(const char *logdir, const char *fname, int n) {
assert(error == 0);
TOKUTXN txn = NULL;
error = toku_txn_begin_txn(NULL, NULL, &txn, logger, TXN_SNAPSHOT_NONE);
error = toku_txn_begin_txn(NULL, NULL, &txn, logger, TXN_SNAPSHOT_NONE, false);
assert(error == 0);
FT_HANDLE brt = NULL;
......@@ -62,7 +62,7 @@ create_populate_tree(const char *logdir, const char *fname, int n) {
toku_txn_close_txn(txn);
txn = NULL;
error = toku_txn_begin_txn(NULL, NULL, &txn, logger, TXN_SNAPSHOT_NONE);
error = toku_txn_begin_txn(NULL, NULL, &txn, logger, TXN_SNAPSHOT_NONE, false);
assert(error == 0);
// insert keys 0, 1, 2, .. (n-1)
......@@ -120,7 +120,7 @@ test_provdel(const char *logdir, const char *fname, int n) {
assert(error == 0);
TOKUTXN txn = NULL;
error = toku_txn_begin_txn(NULL, NULL, &txn, logger, TXN_SNAPSHOT_NONE);
error = toku_txn_begin_txn(NULL, NULL, &txn, logger, TXN_SNAPSHOT_NONE, false);
assert(error == 0);
FT_HANDLE brt = NULL;
......@@ -132,7 +132,7 @@ test_provdel(const char *logdir, const char *fname, int n) {
toku_txn_close_txn(txn);
txn = NULL;
error = toku_txn_begin_txn(NULL, NULL, &txn, logger, TXN_SNAPSHOT_NONE);
error = toku_txn_begin_txn(NULL, NULL, &txn, logger, TXN_SNAPSHOT_NONE, false);
assert(error == 0);
// del keys 0, 2, 4, ...
......@@ -145,7 +145,7 @@ test_provdel(const char *logdir, const char *fname, int n) {
}
TOKUTXN cursortxn = NULL;
error = toku_txn_begin_txn(NULL, NULL, &cursortxn, logger, TXN_SNAPSHOT_NONE);
error = toku_txn_begin_txn(NULL, NULL, &cursortxn, logger, TXN_SNAPSHOT_NONE, false);
assert(error == 0);
LE_CURSOR cursor = NULL;
......
......@@ -54,7 +54,7 @@ create_populate_tree(const char *logdir, const char *fname, int n) {
assert(error == 0);
TOKUTXN txn = NULL;
error = toku_txn_begin_txn(NULL, NULL, &txn, logger, TXN_SNAPSHOT_NONE);
error = toku_txn_begin_txn(NULL, NULL, &txn, logger, TXN_SNAPSHOT_NONE, false);
assert(error == 0);
FT_HANDLE brt = NULL;
......@@ -66,7 +66,7 @@ create_populate_tree(const char *logdir, const char *fname, int n) {
toku_txn_close_txn(txn);
txn = NULL;
error = toku_txn_begin_txn(NULL, NULL, &txn, logger, TXN_SNAPSHOT_NONE);
error = toku_txn_begin_txn(NULL, NULL, &txn, logger, TXN_SNAPSHOT_NONE, false);
assert(error == 0);
// insert keys 0, 1, 2, .. (n-1)
......
......@@ -51,7 +51,7 @@ create_populate_tree(const char *logdir, const char *fname, int n) {
assert(error == 0);
TOKUTXN txn = NULL;
error = toku_txn_begin_txn(NULL, NULL, &txn, logger, TXN_SNAPSHOT_NONE);
error = toku_txn_begin_txn(NULL, NULL, &txn, logger, TXN_SNAPSHOT_NONE, false);
assert(error == 0);
FT_HANDLE brt = NULL;
......@@ -63,7 +63,7 @@ create_populate_tree(const char *logdir, const char *fname, int n) {
toku_txn_close_txn(txn);
txn = NULL;
error = toku_txn_begin_txn(NULL, NULL, &txn, logger, TXN_SNAPSHOT_NONE);
error = toku_txn_begin_txn(NULL, NULL, &txn, logger, TXN_SNAPSHOT_NONE, false);
assert(error == 0);
// insert keys 0, 1, 2, .. (n-1)
......
......@@ -47,7 +47,8 @@ void txn_child_manager_unit_test::run_child_txn_test() {
NULL,
&root_txn,
logger,
TXN_SNAPSHOT_CHILD
TXN_SNAPSHOT_CHILD,
false
);
CKERR(r);
// test starting a child txn
......@@ -57,7 +58,8 @@ void txn_child_manager_unit_test::run_child_txn_test() {
root_txn,
&child_txn,
logger,
TXN_SNAPSHOT_CHILD
TXN_SNAPSHOT_CHILD,
false
);
CKERR(r);
......@@ -89,7 +91,8 @@ void txn_child_manager_unit_test::run_test() {
NULL,
&root_txn,
logger,
TXN_SNAPSHOT_ROOT
TXN_SNAPSHOT_ROOT,
false
);
CKERR(r);
txn_child_manager* cm = root_txn->child_manager;
......@@ -108,7 +111,8 @@ void txn_child_manager_unit_test::run_test() {
root_txn,
&child_txn,
logger,
TXN_SNAPSHOT_ROOT
TXN_SNAPSHOT_ROOT,
false
);
CKERR(r);
assert(child_txn->child_manager == cm);
......@@ -128,7 +132,8 @@ void txn_child_manager_unit_test::run_test() {
child_txn,
&grandchild_txn,
logger,
TXN_SNAPSHOT_ROOT
TXN_SNAPSHOT_ROOT,
false
);
CKERR(r);
assert(grandchild_txn->child_manager == cm);
......@@ -153,7 +158,8 @@ void txn_child_manager_unit_test::run_test() {
child_txn,
&grandchild_txn,
logger,
TXN_SNAPSHOT_ROOT
TXN_SNAPSHOT_ROOT,
false
);
CKERR(r);
assert(grandchild_txn->child_manager == cm);
......@@ -177,7 +183,8 @@ void txn_child_manager_unit_test::run_test() {
xid,
TXN_SNAPSHOT_NONE,
NULL,
true // for recovery
true, // for recovery
false // read_only
);
assert(recovery_txn->child_manager == cm);
......
......@@ -15,7 +15,7 @@
static void do_txn(TOKULOGGER logger, bool readonly) {
int r;
TOKUTXN txn;
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_NONE);
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_NONE, false);
CKERR(r);
if (!readonly) {
......@@ -37,7 +37,7 @@ static void test_xid_lsn_independent(int N) {
FT_HANDLE brt;
TOKUTXN txn;
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_NONE);
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_NONE, false);
CKERR(r);
r = toku_open_ft_handle("ftfile", 1, &brt, 1024, 256, TOKU_DEFAULT_COMPRESSION_METHOD, ct, txn, toku_builtin_compare_fun);
......@@ -47,7 +47,7 @@ static void test_xid_lsn_independent(int N) {
CKERR(r);
toku_txn_close_txn(txn);
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_NONE);
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn, logger, TXN_SNAPSHOT_NONE, false);
CKERR(r);
TXNID xid_first = txn->txnid.parent_id64;
unsigned int rands[N];
......@@ -62,7 +62,7 @@ static void test_xid_lsn_independent(int N) {
}
{
TOKUTXN txn2;
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn2, logger, TXN_SNAPSHOT_NONE);
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn2, logger, TXN_SNAPSHOT_NONE, false);
CKERR(r);
// Verify the txnid has gone up only by one (even though many log entries were done)
invariant(txn2->txnid.parent_id64 == xid_first + 1);
......@@ -77,7 +77,7 @@ static void test_xid_lsn_independent(int N) {
//TODO(yoni) #5067 will break this portion of the test. (End ids are also assigned, so it would increase by 4 instead of 2.)
// Verify the txnid has gone up only by two (even though many log entries were done)
TOKUTXN txn3;
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn3, logger, TXN_SNAPSHOT_NONE);
r = toku_txn_begin_txn((DB_TXN*)NULL, (TOKUTXN)0, &txn3, logger, TXN_SNAPSHOT_NONE, false);
CKERR(r);
invariant(txn3->txnid.parent_id64 == xid_first + 2);
r = toku_txn_commit_txn(txn3, false, NULL, NULL);
......@@ -173,7 +173,7 @@ static void test_xid_lsn_independent_parents(int N) {
ZERO_ARRAY(txns_hack);
for (int i = 0; i < N; i++) {
r = toku_txn_begin_txn((DB_TXN*)NULL, txns[i-1], &txns[i], logger, TXN_SNAPSHOT_NONE);
r = toku_txn_begin_txn((DB_TXN*)NULL, txns[i-1], &txns[i], logger, TXN_SNAPSHOT_NONE, false);
CKERR(r);
if (i < num_non_cascade) {
......
......@@ -37,6 +37,7 @@ txn_status_init(void) {
// Note, this function initializes the keyname, type, and legend fields.
// Value fields are initialized to zero by compiler.
STATUS_INIT(TXN_BEGIN, PARCOUNT, "begin");
STATUS_INIT(TXN_READ_BEGIN, PARCOUNT, "begin read only");
STATUS_INIT(TXN_COMMIT, PARCOUNT, "successful commits");
STATUS_INIT(TXN_ABORT, PARCOUNT, "aborts");
txn_status.initialized = true;
......@@ -77,19 +78,52 @@ toku_txn_get_root_id(TOKUTXN txn)
return txn->txnid.parent_id64;
}
// Accessor for the transaction's declared_read_only flag.
bool txn_declared_read_only(TOKUTXN txn) {
    const bool declared = txn->declared_read_only;
    return declared;
}
int
toku_txn_begin_txn (
DB_TXN *container_db_txn,
TOKUTXN parent_tokutxn,
TOKUTXN *tokutxn,
TOKULOGGER logger,
TXN_SNAPSHOT_TYPE snapshot_type
TXN_SNAPSHOT_TYPE snapshot_type,
bool read_only
)
{
int r = toku_txn_begin_with_xid(parent_tokutxn, tokutxn, logger, TXNID_PAIR_NONE, snapshot_type, container_db_txn, false);
int r = toku_txn_begin_with_xid(
parent_tokutxn,
tokutxn,
logger,
TXNID_PAIR_NONE,
snapshot_type,
container_db_txn,
false, // for_recovery
read_only
);
return r;
}
// Builds the XIDS list for txn and stores it in txn->xids.
// The list extends the parent's xids (or the root xids when parent is NULL)
// with one more entry: txn->txnid.parent_id64 for a root transaction,
// txn->txnid.child_id64 for a nested one.
static void
txn_create_xids(TOKUTXN txn, TOKUTXN parent) {
    const bool is_root = (parent == NULL);
    XIDS ancestor_xids = is_root ? xids_get_root_xids() : parent->xids;

    XIDS fresh_xids;
    xids_create_unknown_child(ancestor_xids, &fresh_xids);

    TXNID innermost_xid;
    if (is_root) {
        innermost_xid = txn->txnid.parent_id64;
    } else {
        innermost_xid = txn->txnid.child_id64;
    }
    xids_finalize_with_child(fresh_xids, innermost_xid);

    txn->xids = fresh_xids;
}
// Allocate and initialize a txn.
// Forward declaration; parameter names match the definition later in this file
// (the second-to-last flag is for_recovery, the last is read_only).
static void toku_txn_create_txn(TOKUTXN *txn_ptr, TOKUTXN parent, TOKULOGGER logger, TXN_SNAPSHOT_TYPE snapshot_type, DB_TXN *container_db_txn, bool for_recovery, bool read_only);
int
toku_txn_begin_with_xid (
TOKUTXN parent,
......@@ -98,24 +132,22 @@ toku_txn_begin_with_xid (
TXNID_PAIR xid,
TXN_SNAPSHOT_TYPE snapshot_type,
DB_TXN *container_db_txn,
bool for_recovery
bool for_recovery,
bool read_only
)
{
int r = 0;
TOKUTXN txn;
XIDS xids;
// Do as much (safe) work as possible before serializing on the txn_manager lock.
XIDS parent_xids;
if (parent == NULL) {
parent_xids = xids_get_root_xids();
} else {
parent_xids = parent->xids;
// check for case where we are trying to
// create too many nested transactions
if (!read_only && parent && !xids_can_create_child(parent->xids)) {
r = EINVAL;
goto exit;
}
r = xids_create_unknown_child(parent_xids, &xids);
if (r != 0) {
return r;
if (read_only && parent) {
invariant(txn_declared_read_only(parent));
}
toku_txn_create_txn(&txn, parent, logger, snapshot_type, container_db_txn, xids, for_recovery);
toku_txn_create_txn(&txn, parent, logger, snapshot_type, container_db_txn, for_recovery, read_only);
// txnid64, snapshot_txnid64
// will be set in here.
if (for_recovery) {
......@@ -139,7 +171,8 @@ toku_txn_begin_with_xid (
toku_txn_manager_start_txn(
txn,
logger->txn_manager,
snapshot_type
snapshot_type,
read_only
);
}
else {
......@@ -152,10 +185,12 @@ toku_txn_begin_with_xid (
);
}
}
TXNID finalized_xid = (parent == NULL) ? txn->txnid.parent_id64 : txn->txnid.child_id64;
xids_finalize_with_child(txn->xids, finalized_xid);
if (!read_only) {
// this call will set txn->xids
txn_create_xids(txn, parent);
}
*txnp = txn;
exit:
return r;
}
......@@ -174,14 +209,14 @@ static void invalidate_xa_xid (TOKU_XA_XID *xid) {
xid->formatID = -1; // According to the XA spec, -1 means "invalid data"
}
void toku_txn_create_txn (
static void toku_txn_create_txn (
TOKUTXN *tokutxn,
TOKUTXN parent_tokutxn,
TOKULOGGER logger,
TXN_SNAPSHOT_TYPE snapshot_type,
DB_TXN *container_db_txn,
XIDS xids,
bool for_recovery
bool for_recovery,
bool read_only
)
{
assert(logger->rollback_cachefile);
......@@ -216,9 +251,10 @@ static txn_child_manager tcm;
.child_manager = NULL,
.container_db_txn = container_db_txn,
.live_root_txn_list = nullptr,
.xids = xids,
.xids = NULL,
.oldest_referenced_xid = TXNID_NONE,
.begin_was_logged = false,
.declared_read_only = read_only,
.do_fsync = false,
.force_fsync_on_commit = false,
.do_fsync_lsn = ZERO_LSN,
......@@ -257,7 +293,12 @@ static txn_child_manager tcm;
*tokutxn = result;
if (read_only) {
STATUS_INC(TXN_READ_BEGIN, 1);
}
else {
STATUS_INC(TXN_BEGIN, 1);
}
}
void
......@@ -540,7 +581,9 @@ void toku_txn_complete_txn(TOKUTXN txn) {
void toku_txn_destroy_txn(TOKUTXN txn) {
txn->open_fts.destroy();
if (txn->xids) {
xids_destroy(&txn->xids);
}
toku_mutex_destroy(&txn->txn_lock);
toku_mutex_destroy(&txn->state_lock);
toku_cond_destroy(&txn->state_cond);
......@@ -557,10 +600,14 @@ void toku_txn_force_fsync_on_commit(TOKUTXN txn) {
}
// Returns the entry at index 0 of txn->live_root_txn_list, or TXNID_NONE
// when that list is empty.
TXNID toku_get_oldest_in_live_root_txn_list(TOKUTXN txn) {
    // An empty list is a legal state and is reported as TXNID_NONE, so it
    // must not be asserted against: doing so would abort before the
    // empty-list result could ever be returned.
    if (txn->live_root_txn_list->size() == 0) {
        return TXNID_NONE;
    }
    TXNID xid;
    int r = txn->live_root_txn_list->fetch(0, &xid);
    assert_zero(r);
    return xid;
}
......
......@@ -29,13 +29,15 @@ void toku_txn_lock(TOKUTXN txn);
void toku_txn_unlock(TOKUTXN txn);
uint64_t toku_txn_get_root_id(TOKUTXN txn);
bool txn_declared_read_only(TOKUTXN txn);
int toku_txn_begin_txn (
DB_TXN *container_db_txn,
TOKUTXN parent_tokutxn,
TOKUTXN *tokutxn,
TOKULOGGER logger,
TXN_SNAPSHOT_TYPE snapshot_type
TXN_SNAPSHOT_TYPE snapshot_type,
bool read_only
);
DB_TXN * toku_txn_get_container_db_txn (TOKUTXN tokutxn);
......@@ -49,11 +51,10 @@ int toku_txn_begin_with_xid (
TXNID_PAIR xid,
TXN_SNAPSHOT_TYPE snapshot_type,
DB_TXN *container_db_txn,
bool for_recovery
bool for_recovery,
bool read_only
);
// Allocate and initialize a txn
void toku_txn_create_txn(TOKUTXN *txn_ptr, TOKUTXN parent, TOKULOGGER logger, TXN_SNAPSHOT_TYPE snapshot_type, DB_TXN *container_db_txn, XIDS xids, bool for_checkpoint);
void toku_txn_update_xids_in_txn(TOKUTXN txn, TXNID xid);
int toku_txn_load_txninfo (TOKUTXN txn, TXNINFO info);
......@@ -94,6 +95,7 @@ void toku_txn_force_fsync_on_commit(TOKUTXN txn);
typedef enum {
TXN_BEGIN, // total number of transactions begun (does not include recovered txns)
TXN_READ_BEGIN, // total number of read only transactions begun (does not include recovered txns)
TXN_COMMIT, // successful commits
TXN_ABORT,
TXN_STATUS_NUM_ROWS
......
......@@ -192,9 +192,13 @@ void toku_txn_manager_init(TXN_MANAGER* txn_managerp) {
// Tears down a transaction manager: destroys its lock, checks that each
// tracking container is empty before destroying it, then frees the manager.
void toku_txn_manager_destroy(TXN_MANAGER txn_manager) {
toku_mutex_destroy(&txn_manager->txn_manager_lock);
// Each container is required to be empty (size() == 0) before it is destroyed.
invariant(txn_manager->live_root_txns.size() == 0);
txn_manager->live_root_txns.destroy();
invariant(txn_manager->live_root_ids.size() == 0);
txn_manager->live_root_ids.destroy();
invariant(txn_manager->snapshot_txnids.size() == 0);
txn_manager->snapshot_txnids.destroy();
invariant(txn_manager->referenced_xids.size() == 0);
txn_manager->referenced_xids.destroy();
// Release the manager structure itself last.
toku_free(txn_manager);
}
......@@ -264,19 +268,33 @@ max_xid(TXNID a, TXNID b) {
}
static TXNID get_oldest_referenced_xid_unlocked(TXN_MANAGER txn_manager) {
TXNID oldest_referenced_xid = TXNID_NONE_LIVING;
int r = txn_manager->live_root_ids.fetch(0, &oldest_referenced_xid);
TXNID oldest_referenced_xid = TXNID_MAX;
int r;
if (txn_manager->live_root_ids.size() > 0) {
r = txn_manager->live_root_ids.fetch(0, &oldest_referenced_xid);
// this function should only be called when we know there is at least
// one live transaction
invariant_zero(r);
}
struct referenced_xid_tuple* tuple;
if (txn_manager->referenced_xids.size() > 0) {
struct referenced_xid_tuple* tuple;
r = txn_manager->referenced_xids.fetch(0, &tuple);
if (r == 0 && tuple->begin_id < oldest_referenced_xid) {
oldest_referenced_xid = tuple->begin_id;
}
}
if (txn_manager->snapshot_txnids.size() > 0) {
TXNID id;
r = txn_manager->snapshot_txnids.fetch(0, &id);
if (r == 0 && id < oldest_referenced_xid) {
oldest_referenced_xid = id;
}
}
if (txn_manager->last_xid < oldest_referenced_xid) {
oldest_referenced_xid = txn_manager->last_xid;
}
paranoid_invariant(oldest_referenced_xid != TXNID_MAX);
return oldest_referenced_xid;
}
......@@ -492,7 +510,8 @@ void toku_txn_manager_start_txn_for_recovery(
void toku_txn_manager_start_txn(
TOKUTXN txn,
TXN_MANAGER txn_manager,
TXN_SNAPSHOT_TYPE snapshot_type
TXN_SNAPSHOT_TYPE snapshot_type,
bool read_only
)
{
int r;
......@@ -528,13 +547,15 @@ void toku_txn_manager_start_txn(
// is taken into account when the transaction is closed.
// add ancestor information, and maintain global live root txn list
xid = ++txn_manager->last_xid;
xid = ++txn_manager->last_xid; // we always need an ID, needed for lock tree
toku_txn_update_xids_in_txn(txn, xid);
if (!read_only) {
uint32_t idx = txn_manager->live_root_txns.size();
r = txn_manager->live_root_txns.insert_at(txn, idx);
invariant_zero(r);
r = txn_manager->live_root_ids.insert_at(txn->txnid.parent_id64, idx);
invariant_zero(r);
}
txn->oldest_referenced_xid = get_oldest_referenced_xid_unlocked(txn_manager);
if (needs_snapshot) {
......@@ -548,6 +569,7 @@ void toku_txn_manager_start_txn(
verify_snapshot_system(txn_manager);
}
txn_manager_unlock(txn_manager);
return;
}
TXNID
......@@ -593,6 +615,7 @@ void toku_txn_manager_finish_txn(TXN_MANAGER txn_manager, TOKUTXN txn) {
);
}
if (!txn_declared_read_only(txn)) {
uint32_t idx;
//Remove txn from list of live root txns
TOKUTXN txnagain;
......@@ -626,6 +649,7 @@ void toku_txn_manager_finish_txn(TXN_MANAGER txn_manager, TOKUTXN txn) {
lazy_assert_zero(r);
}
}
}
if (garbage_collection_debug) {
verify_snapshot_system(txn_manager);
......@@ -638,6 +662,7 @@ void toku_txn_manager_finish_txn(TXN_MANAGER txn_manager, TOKUTXN txn) {
txn->live_root_txn_list->destroy();
toku_free(txn->live_root_txn_list);
}
return;
}
void toku_txn_manager_clone_state_for_gc(
......
......@@ -58,7 +58,8 @@ void toku_txn_manager_handle_snapshot_destroy_for_child_txn(
void toku_txn_manager_start_txn(
TOKUTXN txn,
TXN_MANAGER txn_manager,
TXN_SNAPSHOT_TYPE snapshot_type
TXN_SNAPSHOT_TYPE snapshot_type,
bool read_only
);
void toku_txn_manager_start_txn_for_recovery(
......
......@@ -62,6 +62,12 @@ xids_get_root_xids(void) {
return rval;
}
// Reports whether one more xid can be appended to xids: false exactly when
// num_xids + 1 would equal MAX_TRANSACTION_RECORDS.
// Asserts that xids itself is below MAX_TRANSACTION_RECORDS.
bool
xids_can_create_child(XIDS xids) {
    invariant(xids->num_xids < MAX_TRANSACTION_RECORDS);
    if ((xids->num_xids + 1) == MAX_TRANSACTION_RECORDS) {
        return false;
    }
    return true;
}
int
xids_create_unknown_child(XIDS parent_xids, XIDS *xids_p) {
......@@ -70,17 +76,15 @@ xids_create_unknown_child(XIDS parent_xids, XIDS *xids_p) {
int rval;
invariant(parent_xids);
uint32_t num_child_xids = parent_xids->num_xids + 1;
invariant(num_child_xids > 0);
invariant(num_child_xids <= MAX_TRANSACTION_RECORDS);
if (num_child_xids == MAX_TRANSACTION_RECORDS) rval = EINVAL;
else {
// assumes that the caller has verified that num_child_xids will
// be less than MAX_TRANSACTION_RECORDS
invariant(num_child_xids < MAX_TRANSACTION_RECORDS);
size_t new_size = sizeof(*parent_xids) + num_child_xids*sizeof(parent_xids->ids[0]);
XIDS CAST_FROM_VOIDP(xids, toku_xmalloc(new_size));
// Clone everything (parent does not have the newest xid).
memcpy(xids, parent_xids, new_size - sizeof(xids->ids[0]));
*xids_p = xids;
rval = 0;
}
return rval;
}
......@@ -99,11 +103,13 @@ int
xids_create_child(XIDS parent_xids, // xids list for parent transaction
XIDS * xids_p, // xids list created
TXNID this_xid) { // xid of this transaction (new innermost)
int rval = xids_create_unknown_child(parent_xids, xids_p);
if (rval == 0) {
xids_finalize_with_child(*xids_p, this_xid);
bool can_create_child = xids_can_create_child(parent_xids);
if (!can_create_child) {
return EINVAL;
}
return rval;
xids_create_unknown_child(parent_xids, xids_p);
xids_finalize_with_child(*xids_p, this_xid);
return 0;
}
void
......
......@@ -28,6 +28,8 @@
//Retrieve an XIDS representing the root transaction.
XIDS xids_get_root_xids(void);
bool xids_can_create_child(XIDS xids);
void xids_cpy(XIDS target, XIDS source);
//Creates an XIDS representing this transaction.
......
......@@ -161,6 +161,7 @@ toku_indexer_create_indexer(DB_ENV *env,
{
int rval;
DB_INDEXER *indexer = 0; // set later when created
HANDLE_READ_ONLY_TXN(txn);
*indexerp = NULL;
......
......@@ -169,6 +169,7 @@ toku_loader_create_loader(DB_ENV *env,
uint32_t loader_flags,
bool check_empty) {
int rval;
HANDLE_READ_ONLY_TXN(txn);
*blp = NULL; // set later when created
......
......@@ -121,6 +121,8 @@ if(BUILD_TESTING OR BUILD_SRC_TESTS)
perf_ptquery
perf_ptquery2
perf_rangequery
perf_read_txn
perf_read_txn_single_thread
perf_read_write
perf_txn_single_thread
perf_xmalloc_free
......@@ -248,6 +250,7 @@ if(BUILD_TESTING OR BUILD_SRC_TESTS)
test_bulk_fetch
test_compression_methods
test_cmp_descriptor
test_cursor_with_read_txn
test_db_change_pagesize
test_db_change_xxx
test_cursor_delete_2119
......@@ -260,8 +263,10 @@ if(BUILD_TESTING OR BUILD_SRC_TESTS)
test_logflush
test_multiple_checkpoints_block_commit
test_query
test_read_txn_invalid_ops
test_redirect_func
test_row_size_supported
test_simple_read_txn
test_stress0
test_stress1
test_stress2
......
......@@ -24,7 +24,7 @@
static int create_child_txn(DB_TXN* txn, ARG arg, void* UU(operation_extra), void *UU(stats_extra)) {
DB_TXN* child_txn = NULL;
DB_ENV* env = arg->env;
int r = env->txn_begin(env, txn, &child_txn, arg->txn_type);
int r = env->txn_begin(env, txn, &child_txn, arg->txn_flags);
CKERR(r);
r = child_txn->commit(child_txn, 0);
CKERR(r);
......
......@@ -376,6 +376,7 @@ stress_table(DB_ENV* env, DB **dbs, struct cli_args *cli_args) {
} else {
myargs[i].operation = iibench_rangequery_op;
myargs[i].operation_extra = &put_extra;
myargs[i].txn_flags |= DB_TXN_READ_ONLY;
myargs[i].sleep_ms = 1000; // 1 second between range queries
}
}
......
......@@ -54,6 +54,7 @@ stress_table(DB_ENV* env, DB** dbp, struct cli_args *cli_args) {
for (int i = 0; i < num_threads; i++) {
arg_init(&myargs[i], dbp, env, cli_args);
myargs[i].operation = ptquery_op;
myargs[i].txn_flags |= DB_TXN_READ_ONLY;
}
run_workers(myargs, num_threads, cli_args->num_seconds, false, cli_args);
}
......
......@@ -67,6 +67,7 @@ stress_table(DB_ENV* env, DB** dbp, struct cli_args *cli_args) {
thread_ids[i] = i % cli_args->num_DBs;
myargs[i].operation = ptquery_op2;
myargs[i].operation_extra = &thread_ids[i];
myargs[i].txn_flags |= DB_TXN_READ_ONLY;
}
run_workers(myargs, num_threads, cli_args->num_seconds, false, cli_args);
}
......
......@@ -23,6 +23,7 @@ stress_table(DB_ENV* env, DB** dbp, struct cli_args *cli_args) {
for (int i = 0; i < num_threads; i++) {
arg_init(&myargs[i], dbp, env, cli_args);
myargs[i].operation = rangequery_op;
myargs[i].txn_flags |= DB_TXN_READ_ONLY;
}
run_workers(myargs, num_threads, cli_args->num_seconds, false, cli_args);
}
......
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "Copyright (c) 2007 Tokutek Inc. All rights reserved."
#ident "$Id: perf_nop.cc 45903 2012-07-19 13:06:39Z leifwalsh $"
#include "test.h"
#include <stdio.h>
#include <stdlib.h>
#include <toku_pthread.h>
#include <unistd.h>
#include <memory.h>
#include <sys/stat.h>
#include <db.h>
#include "threaded_stress_test_helpers.h"
// The intent of this test is to measure the throughput of creating and destroying
// root read-only transactions that create snapshots
// No-op worker operation: ignores every argument and returns 0.
static int UU() nop(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra), void *UU(stats_extra)) {
return 0;
}
// Prepares one worker argument per ptquery thread and hands them to run_workers.
// Every worker has DB_TXN_READ_ONLY or'ed into its txn_flags and runs the
// nop operation.
static void
stress_table(DB_ENV* env, DB** dbp, struct cli_args *cli_args) {
    if (verbose) {
        printf("starting creation of pthreads\n");
    }

    const int worker_count = cli_args->num_ptquery_threads;
    struct arg worker_args[worker_count];

    int idx = 0;
    while (idx < worker_count) {
        struct arg *worker = &worker_args[idx];
        arg_init(worker, dbp, env, cli_args);
        worker->operation = nop;
        worker->txn_flags |= DB_TXN_READ_ONLY;
        idx++;
    }

    run_workers(worker_args, worker_count, cli_args->num_seconds, false, cli_args);
}
// Entry point: starts from the default perf arguments, applies the command
// line, then forces the settings this test needs before calling
// stress_test_main.
int
test_main(int argc, char *const argv[]) {
    struct cli_args args = get_default_args_for_perf();
    parse_stress_test_args(argc, argv, &args);

    // These overrides are applied after parsing, so they replace whatever
    // the command line set for the same fields.
    args.num_update_threads = 0;
    args.num_put_threads = 0;
    args.num_DBs = 0;
    args.num_elements = 0;
    args.single_txn = false;

    stress_test_main(&args);
    return 0;
}
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "Copyright (c) 2007 Tokutek Inc. All rights reserved."
#ident "$Id: perf_txn_single_thread.cc 51911 2013-01-10 18:21:29Z zardosht $"
#include "test.h"
#include <stdio.h>
#include <stdlib.h>
#include <toku_pthread.h>
#include <unistd.h>
#include <memory.h>
#include <sys/stat.h>
#include <db.h>
#include "threaded_stress_test_helpers.h"
// The intent of this test is to measure how fast a single thread can
// commit and re-create read-only transactions while N transactions are open.
// Array of open transaction handles; allocated, filled and freed in stress_table.
DB_TXN** txns;
// Number of entries in txns; set from cli_args->txn_size in stress_table.
int num_txns;
// Worker operation: picks a random slot in the global txns array, commits the
// transaction stored there, and begins a replacement in the same slot using
// arg->txn_flags with DB_TXN_READ_ONLY added. Always returns 0; failures are
// caught by CKERR.
static int commit_and_create_txn(
    DB_TXN* UU(txn),
    ARG arg,
    void* UU(operation_extra),
    void* UU(stats_extra)
    )
{
    DB_TXN **slot = &txns[random() % num_txns];

    int r = (*slot)->commit(*slot, 0);
    CKERR(r);

    DB_ENV *env = arg->env;
    r = env->txn_begin(env, 0, slot, arg->txn_flags | DB_TXN_READ_ONLY);
    CKERR(r);

    return 0;
}
// Opens cli_args->txn_size snapshot transactions into the global txns array,
// runs a single commit_and_create_txn worker via run_workers, then commits
// every transaction left in the array and releases it.
static void
stress_table(DB_ENV* env, DB** dbp, struct cli_args *cli_args) {
    if (verbose) {
        printf("starting running of stress\n");
    }

    // Populate the pool of open transactions.
    num_txns = cli_args->txn_size;
    XCALLOC_N(num_txns, txns);
    for (int slot = 0; slot < num_txns; slot++) {
        int begin_r = env->txn_begin(env, 0, &txns[slot], DB_TXN_SNAPSHOT);
        CKERR(begin_r);
    }

    // One worker thread does all the commit/begin churn.
    struct arg worker;
    arg_init(&worker, dbp, env, cli_args);
    worker.operation = commit_and_create_txn;
    run_workers(&worker, 1, cli_args->num_seconds, false, cli_args);

    // Drain the pool.
    for (int slot = 0; slot < num_txns; slot++) {
        DB_TXN *open_txn = txns[slot];
        int commit_r = open_txn->commit(open_txn, 0);
        CKERR(commit_r);
    }
    toku_free(txns);
    num_txns = 0;
}
// Entry point: resets the global transaction pool, builds the perf arguments
// from defaults plus the command line, forces a single-txn run on a
// one-element, one-DB table, and calls perf_test_main.
int
test_main(int argc, char *const argv[]) {
    txns = NULL;
    num_txns = 0;

    struct cli_args args = get_default_args_for_perf();
    parse_stress_test_args(argc, argv, &args);

    // this test is all about transactions, make the DB small
    args.num_DBs = 1;
    args.num_elements = 1;
    args.single_txn = true;

    perf_test_main(&args);
    return 0;
}
......@@ -31,7 +31,7 @@ static int commit_and_create_txn(
int rand_txn_id = random() % num_txns;
int r = txns[rand_txn_id]->commit(txns[rand_txn_id], 0);
CKERR(r);
r = arg->env->txn_begin(arg->env, 0, &txns[rand_txn_id], arg->txn_type);
r = arg->env->txn_begin(arg->env, 0, &txns[rand_txn_id], arg->txn_flags);
CKERR(r);
return 0;
}
......
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id: test_get_max_row_size.cc 45903 2012-07-19 13:06:39Z leifwalsh $"
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "test.h"
int test_main(int argc, char * const argv[])
{
int r;
DB * db;
DB_ENV * env;
(void) argc;
(void) argv;
toku_os_recursive_delete(TOKU_TEST_FILENAME);
r = toku_os_mkdir(TOKU_TEST_FILENAME, 0755); { int chk_r = r; CKERR(chk_r); }
// set things up
r = db_env_create(&env, 0);
CKERR(r);
r = env->open(env, TOKU_TEST_FILENAME, DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN|DB_PRIVATE, 0755);
CKERR(r);
r = db_create(&db, env, 0);
CKERR(r);
r = db->open(db, NULL, "foo.db", NULL, DB_BTREE, DB_CREATE, 0644);
CKERR(r);
DB_TXN* txn = NULL;
r = env->txn_begin(env, 0, &txn, DB_TXN_SNAPSHOT);
CKERR(r);
int k = 1;
int v = 10;
DBT key, val;
r = db->put(
db,
txn,
dbt_init(&key, &k, sizeof k),
dbt_init(&val, &v, sizeof v),
0
);
CKERR(r);
k = 2;
v = 20;
r = db->put(
db,
txn,
dbt_init(&key, &k, sizeof k),
dbt_init(&val, &v, sizeof v),
0
);
CKERR(r);
r = txn->commit(txn, 0);
CKERR(r);
r = env->txn_begin(env, 0, &txn, DB_TXN_SNAPSHOT | DB_TXN_READ_ONLY);
CKERR(r);
DBC* cursor = NULL;
r = db->cursor(db, txn, &cursor, 0);
CKERR(r);
DBT key1, val1;
memset(&key1, 0, sizeof key1);
memset(&val1, 0, sizeof val1);
r = cursor->c_get(cursor, &key1, &val1, DB_FIRST);
CKERR(r);
invariant(key1.size == sizeof(int));
invariant(*(int *)key1.data == 1);
invariant(val1.size == sizeof(int));
invariant(*(int *)val1.data == 10);
r = cursor->c_get(cursor, &key1, &val1, DB_NEXT);
CKERR(r);
invariant(key1.size == sizeof(int));
invariant(*(int *)key1.data == 2);
invariant(val1.size == sizeof(int));
invariant(*(int *)val1.data == 20);
r = cursor->c_close(cursor);
CKERR(r);
r = txn->commit(txn, 0);
CKERR(r);
// clean things up
r = db->close(db, 0);
CKERR(r);
r = env->close(env, 0);
CKERR(r);
return 0;
}
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id: test_get_max_row_size.cc 45903 2012-07-19 13:06:39Z leifwalsh $"
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "test.h"
int test_main(int argc, char * const argv[])
{
int r;
DB * db;
DB_ENV * env;
(void) argc;
(void) argv;
const char *db_env_dir = TOKU_TEST_FILENAME;
char rm_cmd[strlen(db_env_dir) + strlen("rm -rf ") + 1];
snprintf(rm_cmd, sizeof(rm_cmd), "rm -rf %s", db_env_dir);
r = system(rm_cmd); { int chk_r = r; CKERR(chk_r); }
r = toku_os_mkdir(db_env_dir, 0755); { int chk_r = r; CKERR(chk_r); }
// set things up
r = db_env_create(&env, 0);
CKERR(r);
r = env->open(env, db_env_dir, DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN|DB_PRIVATE, 0755);
CKERR(r);
r = db_create(&db, env, 0);
CKERR(r);
r = db->open(db, NULL, "foo.db", NULL, DB_BTREE, DB_CREATE, 0644);
CKERR(r);
DB_TXN* txn1 = NULL;
DB_TXN* txn2 = NULL;
r = env->txn_begin(env, 0, &txn1, DB_TXN_READ_ONLY);
CKERR(r);
r = env->txn_begin(env, 0, &txn2, DB_TXN_READ_ONLY);
CKERR(r);
r=db->pre_acquire_table_lock(db, txn1); CKERR(r);
r=db->pre_acquire_table_lock(db, txn2); CKERR2(r, DB_LOCK_NOTGRANTED);
r = txn1->commit(txn1, 0);
CKERR(r);
r = txn2->commit(txn2, 0);
CKERR(r);
// clean things up
r = db->close(db, 0);
CKERR(r);
r = env->close(env, 0);
CKERR(r);
return 0;
}
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id: test_get_max_row_size.cc 45903 2012-07-19 13:06:39Z leifwalsh $"
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "test.h"
// Update callback handed to env->set_update() by this test. It aborts
// unconditionally, so the test dies if the callback is ever invoked.
static int update_fun(
    DB *UU(db),
    const DBT *UU(key),
    const DBT *UU(old_val),
    const DBT *UU(extra),
    void (*set_val)(const DBT *new_val, void *set_extra),
    void *UU(set_extra)
    )
{
    // set_val is the one parameter not wrapped in UU(); reference it here
    (void) set_val;
    abort();
    return 0;
}
// Row-generation callback for puts, installed via
// env->set_generate_row_callback_for_put(). Aborts unconditionally, so the
// test dies if the callback is ever invoked.
static int generate_row_for_put(DB *UU(dest_db), DB *UU(src_db),
                                DBT *UU(dest_key), DBT *UU(dest_val),
                                const DBT *UU(src_key), const DBT *UU(src_val))
{
    abort();
    return 0;
}
// Row-generation callback for deletes, installed via
// env->set_generate_row_callback_for_del(). Aborts unconditionally, so the
// test dies if the callback is ever invoked.
static int generate_row_for_del(DB *UU(dest_db), DB *UU(src_db),
                                DBT *UU(dest_key),
                                const DBT *UU(src_key), const DBT *UU(src_val))
{
    abort();
    return 0;
}
// Checks that operations which would modify data or metadata are refused
// when issued through a read-only transaction.
//
// A fresh environment is created, a txn is begun with
// (iso_flags | DB_TXN_READ_ONLY), and each operation below is attempted with
// that txn; every one of them is checked against EINVAL. The callbacks
// installed on the environment abort() if they are ever reached.
//
// iso_flags: isolation flags OR-ed with DB_TXN_READ_ONLY for the txn.
static void test_invalid_ops(uint32_t iso_flags) {
    int r;
    DB * db;
    DB_ENV * env;
    toku_os_recursive_delete(TOKU_TEST_FILENAME);
    r = toku_os_mkdir(TOKU_TEST_FILENAME, 0755); { int chk_r = r; CKERR(chk_r); }

    // set things up
    r = db_env_create(&env, 0);
    CKERR(r);
    r = env->set_generate_row_callback_for_put(env,generate_row_for_put);
    CKERR(r);
    r = env->set_generate_row_callback_for_del(env,generate_row_for_del);
    CKERR(r);
    env->set_update(env, update_fun);
    r = env->open(env, TOKU_TEST_FILENAME, DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN|DB_PRIVATE, 0755);
    CKERR(r);
    r = db_create(&db, env, 0);
    CKERR(r);

    DB_TXN* txn = NULL;
    r = env->txn_begin(env, 0, &txn, iso_flags | DB_TXN_READ_ONLY);
    CKERR(r);

    // opening (and creating) the dictionary with the read-only txn must fail;
    // open it without a txn so the remaining operations have a handle to use
    r = db->open(db, txn, "foo.db", NULL, DB_BTREE, DB_CREATE, 0644);
    CKERR2(r, EINVAL);
    r = db->open(db, NULL, "foo.db", NULL, DB_BTREE, DB_CREATE, 0644);
    CKERR(r);

    int k = 1;
    int v = 10;
    DBT key, val;
    dbt_init(&key, &k, sizeof k);
    dbt_init(&val, &v, sizeof v);

    uint32_t db_flags = 0;
    uint32_t indexer_flags = 0;
    // Out-handle is initialized so that nothing indeterminate is left behind
    // when the call is rejected.
    DB_INDEXER* indexer = NULL;
    r = env->create_indexer(
        env,
        txn,
        &indexer,
        db,
        1,
        &db,
        &db_flags,
        indexer_flags
        );
    CKERR2(r, EINVAL);

    // test invalid operations of ydb_db.cc,
    // db->open tested above
    DB_LOADER* loader = NULL;
    uint32_t put_flags = 0;
    uint32_t dbt_flags = 0;
    r = env->create_loader(env, txn, &loader, NULL, 1, &db, &put_flags, &dbt_flags, 0);
    CKERR2(r, EINVAL);
    r = db->change_descriptor(db, txn, &key, 0);
    CKERR2(r, EINVAL);

    //
    // test invalid operations return EINVAL from ydb_write.cc
    //
    r = db->put(db, txn, &key, &val,0);
    CKERR2(r, EINVAL);
    r = db->del(db, txn, &key, DB_DELETE_ANY);
    CKERR2(r, EINVAL);
    r = db->update(db, txn, &key, &val, 0);
    CKERR2(r, EINVAL);
    r = db->update_broadcast(db, txn, &val, 0);
    CKERR2(r, EINVAL);
    r = env->put_multiple(env, NULL, txn, &key, &val, 1, &db, &key, &val, 0);
    CKERR2(r, EINVAL);
    r = env->del_multiple(env, NULL, txn, &key, &val, 1, &db, &key, 0);
    CKERR2(r, EINVAL);
    // Initialized: the flags array is handed to update_multiple by address,
    // so it must never hold an indeterminate value even though the call is
    // expected to be rejected.
    uint32_t flags = 0;
    r = env->update_multiple(
        env, NULL, txn,
        &key, &val,
        &key, &val,
        1, &db, &flags,
        1, &key,
        1, &val
        );
    CKERR2(r, EINVAL);

    r = db->close(db, 0);
    CKERR(r);

    // test invalid operations of ydb.cc: dbremove
    r = env->dbremove(env, txn, "foo.db", NULL, 0);
    CKERR2(r, EINVAL);
    // test invalid operations of ydb.cc: dbrename
    r = env->dbrename(env, txn, "foo.db", NULL, "bar.db", 0);
    CKERR2(r, EINVAL);

    r = txn->commit(txn, 0);
    CKERR(r);

    // clean things up
    r = env->close(env, 0);
    CKERR(r);
}
// Runs test_invalid_ops() once per isolation setting, in a fixed order.
int test_main(int argc, char * const argv[]) {
    (void) argc;
    (void) argv;

    // isolation flags handed to test_invalid_ops(), in run order
    const uint32_t iso_levels[] = {
        0,
        DB_TXN_SNAPSHOT,
        DB_READ_COMMITTED,
        DB_READ_UNCOMMITTED
    };
    const int num_levels = sizeof iso_levels / sizeof iso_levels[0];

    for (int i = 0; i < num_levels; i++) {
        test_invalid_ops(iso_levels[i]);
    }
    return 0;
}
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id: test_get_max_row_size.cc 45903 2012-07-19 13:06:39Z leifwalsh $"
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "test.h"
// Exercises parent/child combinations of DB_TXN_READ_ONLY for one isolation
// setting:
//   - regular parent:   a read-only child is checked against EINVAL,
//                       a regular child is checked for success;
//   - read-only parent: both a read-only child and a child begun without
//                       the flag are checked for success.
static void test_read_txn_creation(DB_ENV* env, uint32_t iso_flags) {
    const uint32_t read_only_flags = iso_flags | DB_TXN_READ_ONLY;
    DB_TXN* parent = NULL;
    DB_TXN* child = NULL;
    int r;

    // --- parent begun without DB_TXN_READ_ONLY ---
    r = env->txn_begin(env, 0, &parent, iso_flags);
    CKERR(r);

    r = env->txn_begin(env, parent, &child, read_only_flags);
    CKERR2(r, EINVAL);

    r = env->txn_begin(env, parent, &child, iso_flags);
    CKERR(r);
    r = child->commit(child, 0);
    CKERR(r);

    r = parent->commit(parent, 0);
    CKERR(r);

    // --- parent begun with DB_TXN_READ_ONLY ---
    r = env->txn_begin(env, 0, &parent, read_only_flags);
    CKERR(r);

    r = env->txn_begin(env, parent, &child, read_only_flags);
    CKERR(r);
    r = child->commit(child, 0);
    CKERR(r);

    r = env->txn_begin(env, parent, &child, iso_flags);
    CKERR(r);
    r = child->commit(child, 0);
    CKERR(r);

    r = parent->commit(parent, 0);
    CKERR(r);
}
// Opens one environment in a fresh test directory and runs
// test_read_txn_creation() against it once per isolation setting.
int test_main(int argc, char * const argv[])
{
    (void) argc;
    (void) argv;

    int r;
    DB_ENV * env;

    // start from an empty test directory
    toku_os_recursive_delete(TOKU_TEST_FILENAME);
    r = toku_os_mkdir(TOKU_TEST_FILENAME, 0755);
    CKERR(r);

    // set things up
    r = db_env_create(&env, 0);
    CKERR(r);
    r = env->open(env, TOKU_TEST_FILENAME, DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN|DB_PRIVATE, 0755);
    CKERR(r);

    // isolation flags handed to test_read_txn_creation(), in run order
    const uint32_t iso_levels[] = {
        0,
        DB_TXN_SNAPSHOT,
        DB_READ_COMMITTED,
        DB_READ_UNCOMMITTED
    };
    const int num_levels = sizeof iso_levels / sizeof iso_levels[0];
    for (int i = 0; i < num_levels; i++) {
        test_read_txn_creation(env, iso_levels[i]);
    }

    r = env->close(env, 0);
    CKERR(r);
    return 0;
}
......@@ -69,6 +69,7 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
soe[1].prefetch = false;
myargs[1].operation_extra = &soe[1];
myargs[1].operation = scan_op;
myargs[1].txn_flags = DB_TXN_SNAPSHOT | DB_TXN_READ_ONLY;
// make the backward fast scanner
soe[2].fast = true;
......@@ -76,6 +77,7 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
soe[2].prefetch = false;
myargs[2].operation_extra = &soe[2];
myargs[2].operation = scan_op;
myargs[2].txn_flags = DB_TXN_SNAPSHOT | DB_TXN_READ_ONLY;
// make the backward slow scanner
soe[3].fast = false;
......
......@@ -63,6 +63,7 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
soe[1].prefetch = false;
myargs[1].operation_extra = &soe[1];
myargs[1].operation = scan_op;
myargs[1].txn_flags = DB_TXN_SNAPSHOT | DB_TXN_READ_ONLY;
// make the backward fast scanner
soe[2].fast = true;
......@@ -70,6 +71,7 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
soe[2].prefetch = false;
myargs[2].operation_extra = &soe[2];
myargs[2].operation = scan_op;
myargs[2].txn_flags = DB_TXN_SNAPSHOT | DB_TXN_READ_ONLY;
// make the backward slow scanner
soe[3].fast = false;
......
......@@ -62,6 +62,7 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
soe[1].prefetch = false;
myargs[1].operation_extra = &soe[1];
myargs[1].operation = scan_op;
myargs[1].txn_flags = DB_TXN_SNAPSHOT | DB_TXN_READ_ONLY;
// make the backward fast scanner
soe[2].fast = true;
......@@ -69,6 +70,7 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
soe[2].prefetch = false;
myargs[2].operation_extra = &soe[2];
myargs[2].operation = scan_op;
myargs[2].txn_flags = DB_TXN_SNAPSHOT | DB_TXN_READ_ONLY;
// make the backward slow scanner
soe[3].fast = false;
......
......@@ -36,6 +36,7 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
soe[0].prefetch = false;
myargs[0].operation_extra = &soe[0];
myargs[0].operation = scan_op;
myargs[0].txn_flags = DB_TXN_SNAPSHOT | DB_TXN_READ_ONLY;
// make the forward slow scanner
soe[1].fast = false;
......
......@@ -135,7 +135,7 @@ struct arg {
// DB are in [0, num_elements)
// false otherwise
int sleep_ms; // number of milliseconds to sleep between operations
uint32_t txn_type; // isolation level for txn running operation
uint32_t txn_flags; // isolation level for txn running operation
operation_t operation; // function that is the operation to be run
void* operation_extra; // extra parameter passed to operation
enum stress_lock_type lock_type; // states if operation must be exclusive, shared, or does not require locking
......@@ -155,7 +155,7 @@ static void arg_init(struct arg *arg, DB **dbp, DB_ENV *env, struct cli_args *cl
arg->bounded_element_range = true;
arg->sleep_ms = 0;
arg->lock_type = STRESS_LOCK_NONE;
arg->txn_type = DB_TXN_SNAPSHOT;
arg->txn_flags = DB_TXN_SNAPSHOT;
arg->operation_extra = nullptr;
arg->do_prepare = false;
arg->prelock_updates = false;
......@@ -488,12 +488,12 @@ static void *worker(void *arg_v) {
printf("%lu starting %p\n", (unsigned long) intself, arg->operation);
}
if (arg->cli->single_txn) {
r = env->txn_begin(env, 0, &txn, arg->txn_type); CKERR(r);
r = env->txn_begin(env, 0, &txn, arg->txn_flags); CKERR(r);
}
while (run_test) {
lock_worker_op(we);
if (!arg->cli->single_txn) {
r = env->txn_begin(env, 0, &txn, arg->txn_type); CKERR(r);
r = env->txn_begin(env, 0, &txn, arg->txn_flags); CKERR(r);
}
r = arg->operation(txn, arg, arg->operation_extra, we->counters);
if (r==0 && !arg->cli->single_txn && arg->do_prepare) {
......@@ -2654,7 +2654,7 @@ UU() stress_recover(struct cli_args *args) {
DB_TXN* txn = nullptr;
struct arg recover_args;
arg_init(&recover_args, dbs, env, args);
int r = env->txn_begin(env, 0, &txn, recover_args.txn_type);
int r = env->txn_begin(env, 0, &txn, recover_args.txn_flags);
CKERR(r);
struct scan_op_extra soe = {
.fast = true,
......
......@@ -209,6 +209,16 @@ env_opened(DB_ENV *env) {
return env->i->cachetable != 0;
}
// Returns true when txn is non-null and the flags stored in its internal
// struct include DB_TXN_READ_ONLY; returns false otherwise (including when
// txn is null).
static inline bool
txn_is_read_only(DB_TXN* txn) {
    return txn && (db_txn_struct_i(txn)->flags & DB_TXN_READ_ONLY) != 0;
}
// Early-exit guard for operations that must not run in a read-only txn:
// makes the enclosing function return EINVAL when txn_is_read_only(txn) is
// true. Only usable inside functions whose return type accepts EINVAL.
// NOTE(review): this expands to a bare `if` statement that already ends in
// ';', so placing it directly before an `else` or as the unbraced body of
// another `if` would change how the surrounding code binds -- use it only as
// a standalone statement.
#define HANDLE_READ_ONLY_TXN(txn) if(txn_is_read_only(txn)) return EINVAL;
void env_panic(DB_ENV * env, int cause, const char * msg);
void env_note_db_opened(DB_ENV *env, DB *db);
void env_note_db_closed(DB_ENV *env, DB *db);
......
......@@ -1205,6 +1205,7 @@ static int
locked_env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *dbname, uint32_t flags) {
int ret, r;
HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, txn);
HANDLE_READ_ONLY_TXN(txn);
DB_TXN *child_txn = NULL;
int using_txns = env->i->open_flags & DB_INIT_TXN;
......@@ -1235,6 +1236,7 @@ static int env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char
static int
locked_env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbname, const char *newname, uint32_t flags) {
int ret, r;
HANDLE_READ_ONLY_TXN(txn);
HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, txn);
DB_TXN *child_txn = NULL;
......@@ -2413,6 +2415,7 @@ env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *dbname, u
if (!env_opened(env) || flags != 0) {
return EINVAL;
}
HANDLE_READ_ONLY_TXN(txn);
if (dbname != NULL) {
// env_dbremove_subdb() converts (fname, dbname) to dname
return env_dbremove_subdb(env, txn, fname, dbname, flags);
......@@ -2519,6 +2522,7 @@ env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbname, co
if (!env_opened(env) || flags != 0) {
return EINVAL;
}
HANDLE_READ_ONLY_TXN(txn);
if (dbname != NULL) {
// env_dbrename_subdb() converts (fname, dbname) to dname and (fname, newname) to newdname
return env_dbrename_subdb(env, txn, fname, dbname, newname, flags);
......
......@@ -210,6 +210,7 @@ static uint64_t nontransactional_open_id = 0;
static int
toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *dbname, DBTYPE dbtype, uint32_t flags, int mode) {
HANDLE_PANICKED_DB(db);
HANDLE_READ_ONLY_TXN(txn);
if (dbname != NULL) {
return db_open_subdb(db, txn, fname, dbname, dbtype, flags, mode);
}
......@@ -347,6 +348,7 @@ void toku_db_lt_on_destroy_callback(toku::locktree *lt) {
int
toku_db_open_iname(DB * db, DB_TXN * txn, const char *iname_in_env, uint32_t flags, int mode) {
//Set comparison functions if not yet set.
HANDLE_READ_ONLY_TXN(txn);
if (!db->i->key_compare_was_set && db->dbenv->i->bt_compare) {
toku_ft_set_bt_compare(db->i->ft_handle, db->dbenv->i->bt_compare);
db->i->key_compare_was_set = true;
......@@ -469,6 +471,7 @@ int toku_db_pre_acquire_fileops_lock(DB *db, DB_TXN *txn) {
static int
toku_db_change_descriptor(DB *db, DB_TXN* txn, const DBT* descriptor, uint32_t flags) {
HANDLE_PANICKED_DB(db);
HANDLE_READ_ONLY_TXN(txn);
HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
int r = 0;
TOKUTXN ttxn = txn ? db_txn_struct_i(txn)->tokutxn : NULL;
......@@ -695,6 +698,7 @@ autotxn_db_getf_set (DB *db, DB_TXN *txn, uint32_t flags, DBT *key, YDB_CALLBACK
static int
locked_db_open(DB *db, DB_TXN *txn, const char *fname, const char *dbname, DBTYPE dbtype, uint32_t flags, int mode) {
int ret, r;
HANDLE_READ_ONLY_TXN(txn);
HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
//
......@@ -1024,6 +1028,7 @@ load_inames(DB_ENV * env, DB_TXN * txn, int N, DB * dbs[/*N*/], const char * new
int
locked_load_inames(DB_ENV * env, DB_TXN * txn, int N, DB * dbs[/*N*/], char * new_inames_in_env[/*N*/], LSN *load_lsn, bool mark_as_loader) {
int ret, r;
HANDLE_READ_ONLY_TXN(txn);
DB_TXN *child_txn = NULL;
int using_txns = env->i->open_flags & DB_INIT_TXN;
......
......@@ -329,6 +329,36 @@ toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, uint32_t flags) {
uint32_t txn_flags = 0;
txn_flags |= DB_TXN_NOWAIT; //We do not support blocking locks. RFP remove this?
// handle whether txn is declared as read only
bool parent_txn_declared_read_only =
stxn &&
(db_txn_struct_i(stxn)->flags & DB_TXN_READ_ONLY);
bool txn_declared_read_only = false;
if (flags & DB_TXN_READ_ONLY) {
txn_declared_read_only = true;
txn_flags |= DB_TXN_READ_ONLY;
flags &= ~(DB_TXN_READ_ONLY);
}
if (txn_declared_read_only && stxn &&
!parent_txn_declared_read_only
)
{
return toku_ydb_do_error(
env,
EINVAL,
"Current transaction set as read only, but parent transaction is not\n"
);
}
if (parent_txn_declared_read_only)
{
// don't require child transaction to also set transaction as read only
// if parent has already done so
txn_flags |= DB_TXN_READ_ONLY;
txn_declared_read_only = true;
}
TOKU_ISOLATION child_isolation = TOKU_ISO_SERIALIZABLE;
uint32_t iso_flags = flags & DB_ISOLATION_FLAGS;
if (!(iso_flags == 0 ||
......@@ -434,7 +464,8 @@ toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, uint32_t flags) {
TXNID_PAIR_NONE,
snapshot_type,
result,
false
false, // for_recovery
txn_declared_read_only // read_only
);
if (r != 0) {
toku_free(result);
......
......@@ -132,6 +132,7 @@ int
toku_db_del(DB *db, DB_TXN *txn, DBT *key, uint32_t flags, bool holds_mo_lock) {
HANDLE_PANICKED_DB(db);
HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
HANDLE_READ_ONLY_TXN(txn);
uint32_t unchecked_flags = flags;
//DB_DELETE_ANY means delete regardless of whether it exists in the db.
......@@ -175,6 +176,7 @@ int
toku_db_put(DB *db, DB_TXN *txn, DBT *key, DBT *val, uint32_t flags, bool holds_mo_lock) {
HANDLE_PANICKED_DB(db);
HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
HANDLE_READ_ONLY_TXN(txn);
int r = 0;
uint32_t lock_flags = get_prelocked_flags(flags);
......@@ -222,6 +224,7 @@ toku_db_update(DB *db, DB_TXN *txn,
uint32_t flags) {
HANDLE_PANICKED_DB(db);
HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
HANDLE_READ_ONLY_TXN(txn);
int r = 0;
uint32_t lock_flags = get_prelocked_flags(flags);
......@@ -263,6 +266,7 @@ toku_db_update_broadcast(DB *db, DB_TXN *txn,
uint32_t flags) {
HANDLE_PANICKED_DB(db);
HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
HANDLE_READ_ONLY_TXN(txn);
int r = 0;
uint32_t lock_flags = get_prelocked_flags(flags);
......@@ -428,6 +432,7 @@ env_del_multiple(
DB_INDEXER* indexer = NULL;
HANDLE_PANICKED_ENV(env);
HANDLE_READ_ONLY_TXN(txn);
uint32_t lock_flags[num_dbs];
uint32_t remaining_flags[num_dbs];
......@@ -574,6 +579,7 @@ env_put_multiple_internal(
DB_INDEXER* indexer = NULL;
HANDLE_PANICKED_ENV(env);
HANDLE_READ_ONLY_TXN(txn);
uint32_t lock_flags[num_dbs];
uint32_t remaining_flags[num_dbs];
......@@ -674,6 +680,7 @@ env_update_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn,
HANDLE_PANICKED_ENV(env);
DB_INDEXER* indexer = NULL;
HANDLE_READ_ONLY_TXN(txn);
if (!txn) {
r = EINVAL;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment