Commit d7812705 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

refs[t:4697] Merge xa to main. {{{svn merge -r41628:41661 ../tokudb.4697 }}} Refs #4697.

git-svn-id: file:///svn/toku/tokudb@41662 c7de825b-a66e-492c-adef-691d508d4ae1
parent 89f7051b
......@@ -359,6 +359,8 @@ void print_db_env_struct (void) {
"void (*set_update) (DB_ENV *env, int (*update_function)(DB *, const DBT *key, const DBT *old_val, const DBT *extra, void (*set_val)(const DBT *new_val, void *set_extra), void *set_extra))",
"int (*set_lock_timeout) (DB_ENV *env, uint64_t lock_wait_time_msec)",
"int (*get_lock_timeout) (DB_ENV *env, uint64_t *lock_wait_time_msec)",
"int (*txn_xa_recover) (DB_ENV*, XID list[/*count*/], long count, /*out*/ long *retp, u_int32_t flags)",
"int (*get_txn_from_xid) (DB_ENV*, /*in*/ XID *, /*out*/ DB_TXN **)",
NULL};
sort_and_dump_fields("db_env", true, extra);
......@@ -463,6 +465,7 @@ static void print_db_txn_struct (void) {
"struct toku_list open_txns",
"int (*commit_with_progress)(DB_TXN*, uint32_t, TXN_PROGRESS_POLL_FUNCTION, void*)",
"int (*abort_with_progress)(DB_TXN*, TXN_PROGRESS_POLL_FUNCTION, void*)",
"int (*xa_prepare) (DB_TXN*, XID *)",
NULL};
sort_and_dump_fields("db_txn", false, extra);
}
......@@ -525,6 +528,13 @@ int main (int argc, char *const argv[] __attribute__((__unused__))) {
#endif
dodefine(DB_GID_SIZE);
printf("typedef struct xid_t { /* This struct is intended to be binary compatible with the XID in the XA architecture. See source:/import/opengroup.org/C193.pdf */\n"
" long formatID; /* format identifier */\n"
" long gtrid_length; /* value from 1 through 64 */\n"
" long bqual_length; /* value from 1 through 64 */\n"
" char data[DB_GID_SIZE];\n"
"} XID;\n");
//Typedef toku_off_t
printf("#ifndef TOKU_OFF_T_DEFINED\n"
"#define TOKU_OFF_T_DEFINED\n"
......
......@@ -17,6 +17,12 @@ extern "C" {
#define DB_VERSION_PATCH 119
#define DB_VERSION_STRING "Tokutek: TokuDB 4.6.119"
#define DB_GID_SIZE 128
typedef struct xid_t { /* This struct is intended to be binary compatible with the XID in the XA architecture. See source:/import/opengroup.org/C193.pdf */
long formatID; /* format identifier */
long gtrid_length; /* value from 1 through 64 */
long bqual_length; /* value from 1 through 64 */
char data[DB_GID_SIZE];
} XID;
#ifndef TOKU_OFF_T_DEFINED
#define TOKU_OFF_T_DEFINED
typedef int64_t toku_off_t;
......@@ -234,6 +240,8 @@ struct __toku_db_env {
void (*set_update) (DB_ENV *env, int (*update_function)(DB *, const DBT *key, const DBT *old_val, const DBT *extra, void (*set_val)(const DBT *new_val, void *set_extra), void *set_extra));
int (*set_lock_timeout) (DB_ENV *env, uint64_t lock_wait_time_msec);
int (*get_lock_timeout) (DB_ENV *env, uint64_t *lock_wait_time_msec);
int (*txn_xa_recover) (DB_ENV*, XID list[/*count*/], long count, /*out*/ long *retp, u_int32_t flags);
int (*get_txn_from_xid) (DB_ENV*, /*in*/ XID *, /*out*/ DB_TXN **);
void *app_private;
void *api1_internal;
int (*close) (DB_ENV *, u_int32_t);
......@@ -360,6 +368,7 @@ struct __toku_db_txn {
struct toku_list open_txns;
int (*commit_with_progress)(DB_TXN*, uint32_t, TXN_PROGRESS_POLL_FUNCTION, void*);
int (*abort_with_progress)(DB_TXN*, TXN_PROGRESS_POLL_FUNCTION, void*);
int (*xa_prepare) (DB_TXN*, XID *);
DB_ENV *mgrp /*In TokuDB, mgrp is a DB_ENV not a DB_TXNMGR*/;
DB_TXN *parent;
void *api_internal;
......
......@@ -49,6 +49,7 @@ typedef u_int64_t TXNID;
typedef struct blocknum_s { int64_t b; } BLOCKNUM; // make a struct so that we will notice type problems.
typedef struct gid_s { uint8_t *gid; } GID; // the gid is of size [DB_GID_SIZE]
typedef XID *XIDP; // this is the type that's passed to the logger code (so that we don't have to copy all 152 bytes when only a subset are even valid.)
#define ROLLBACK_NONE ((BLOCKNUM){0})
static inline BLOCKNUM make_blocknum(int64_t b) { BLOCKNUM result={b}; return result; }
......
......@@ -3548,11 +3548,11 @@ log_open_txn (OMTVALUE txnv, u_int32_t UU(index), void *UU(extra)) {
return 0;
}
case TOKUTXN_PREPARING: {
GID gid;
toku_txn_get_prepared_gid(txn, &gid);
XID xa_xid;
toku_txn_get_prepared_xa_xid(txn, &xa_xid);
int r = toku_log_xstillopenprepared(logger, NULL, 0,
toku_txn_get_txnid(txn),
gid,
&xa_xid,
txn->rollentry_raw_count,
open_filenums,
txn->force_fsync_on_commit,
......@@ -3562,7 +3562,6 @@ log_open_txn (OMTVALUE txnv, u_int32_t UU(index), void *UU(extra)) {
txn->spilled_rollback_tail,
txn->current_rollback);
assert(r==0);
toku_free(gid.gid);
return 0;
}
case TOKUTXN_RETIRED:
......
......@@ -181,7 +181,7 @@ struct tokutxn {
TOKUTXN_STATE state;
LSN do_fsync_lsn;
BOOL do_fsync;
GID gid; // for prepared transactions
XID xa_xid; // for prepared transactions
struct toku_list prepared_txns_link; // list of prepared transactions
};
......@@ -228,8 +228,14 @@ static inline int toku_logsizeof_TXNID (TXNID txnid __attribute__((__unused__)))
return 8;
}
static inline int toku_logsizeof_GID (GID gid __attribute__((__unused__))) {
return DB_GID_SIZE;
static inline int toku_logsizeof_XIDP (XIDP xid) {
assert(0<=xid->gtrid_length && xid->gtrid_length<=64);
assert(0<=xid->bqual_length && xid->bqual_length<=64);
return xid->gtrid_length
+ xid->bqual_length
+ 4 // formatID
+ 1 // gtrid_length
+ 1; // bqual_length
}
static inline int toku_logsizeof_FILENUMS (FILENUMS fs) {
......
......@@ -33,6 +33,14 @@ static inline int toku_copy_BYTESTRING(BYTESTRING *target, BYTESTRING val) {
if (target->data==0) return errno;
return 0;
}
static inline void toku_free_TXNID(TXNID txnid __attribute__((__unused__))) {}
static inline void toku_free_u_int64_t(u_int64_t u __attribute__((__unused__))) {}
static inline void toku_free_u_int32_t(u_int32_t u __attribute__((__unused__))) {}
static inline void toku_free_u_int8_t(u_int8_t u __attribute__((__unused__))) {}
static inline void toku_free_FILENUM(FILENUM u __attribute__((__unused__))) {}
static inline void toku_free_BLOCKNUM(BLOCKNUM u __attribute__((__unused__))) {}
static inline void toku_free_BOOL(BOOL u __attribute__((__unused__))) {}
static inline void toku_free_XIDP(XIDP xidp) { toku_free(xidp); }
static inline void toku_free_BYTESTRING(BYTESTRING val) { toku_free(val.data); }
static inline void toku_free_FILENUMS(FILENUMS val) { toku_free(val.filenums); }
......
......@@ -117,7 +117,7 @@ const struct logtype logtypes[] = {
NULLFIELD}}, // record all transactions
// prepared txns need a gid
{"xstillopenprepared", 'p', FA{{"TXNID", "xid", 0},
{"GID", "gid", 0}, // prepared transactions need a gid, and have no parentxid.
{"XIDP", "xa_xid", 0}, // prepared transactions need a gid, and have no parentxid.
{"u_int64_t", "rollentry_raw_count", 0},
{"FILENUMS", "open_filenums", 0},
{"u_int8_t", "force_fsync_on_commit", 0},
......@@ -133,7 +133,7 @@ const struct logtype logtypes[] = {
// Records produced by transactions
{"xbegin", 'b', FA{{"TXNID", "parentxid", 0},NULLFIELD}},
{"xcommit",'C', FA{{"TXNID", "xid", 0},NULLFIELD}},
{"xprepare",'P', FA{{"TXNID", "xid", 0}, {"GID", "gid", 0},NULLFIELD}},
{"xprepare",'P', FA{{"TXNID", "xid", 0}, {"XIDP", "xa_xid", 0}, NULLFIELD}},
{"xabort", 'q', FA{{"TXNID", "xid", 0},NULLFIELD}},
//TODO: #2037 Add dname
{"fcreate", 'F', FA{{"TXNID", "xid", 0},
......@@ -479,24 +479,15 @@ generate_log_reader (void) {
fprintf(cf, " return 0;\n");
fprintf(cf, "}\n\n");
int free_count=0;
DO_LOGTYPES(lt, {
free_count=0;
fprintf(cf, "static void toku_log_free_log_entry_%s_resources (struct logtype_%s *data)", lt->name, lt->name);
fprintf(cf, " {\n");
DO_FIELDS(ft, lt, {
if ( strcmp(ft->type, "BYTESTRING") == 0 ) {
fprintf(cf, " toku_free_BYTESTRING(data->%s);\n", ft->name);
free_count++;
}
else if ( strcmp(ft->type, "FILENUMS") == 0 ) {
fprintf(cf, " toku_free_FILENUMS(data->%s);\n", ft->name);
free_count++;
}
});
if ( free_count == 0 ) fprintf(cf, " struct logtype_%s *dummy __attribute__ ((unused)) = data;\n", lt->name);
fprintf(cf, "}\n\n");
});
DO_LOGTYPES(lt, ({
fprintf(cf, "static void toku_log_free_log_entry_%s_resources (struct logtype_%s *data", lt->name, lt->name);
if (!lt->fields->type) fprintf(cf, " __attribute__((__unused__))");
fprintf(cf, ") {\n");
DO_FIELDS(ft, lt,
fprintf(cf, " toku_free_%s(data->%s);\n", ft->type, ft->name);
);
fprintf(cf, "}\n\n");
}));
fprintf2(cf, hf, "void toku_log_free_log_entry_resources (struct log_entry *le)");
fprintf(hf, ";\n");
fprintf(cf, " {\n");
......
......@@ -14,6 +14,22 @@ static int delete_logfile(TOKULOGGER logger, long long index, uint32_t version);
static void grab_output(TOKULOGGER logger, LSN *fsynced_lsn);
static void release_output(TOKULOGGER logger, LSN fsynced_lsn);
static void toku_print_bytes (FILE *outf, u_int32_t len, char *data) {
fprintf(outf, "\"");
u_int32_t i;
for (i=0; i<len; i++) {
switch (data[i]) {
case '"': fprintf(outf, "\\\""); break;
case '\\': fprintf(outf, "\\\\"); break;
case '\n': fprintf(outf, "\\n"); break;
default:
if (isprint(data[i])) fprintf(outf, "%c", data[i]);
else fprintf(outf, "\\%03o", (unsigned char)(data[i]));
}
}
fprintf(outf, "\"");
}
static BOOL is_a_logfile_any_version (const char *name, uint64_t *number_result, uint32_t *version_of_log) {
BOOL rval = TRUE;
uint64_t result;
......@@ -957,12 +973,31 @@ int toku_fread_TXNID (FILE *f, TXNID *txnid, struct x1764 *checksum, u_int32_t
return toku_fread_u_int64_t (f, txnid, checksum, len);
}
int toku_fread_GID (FILE *f, GID *gid, struct x1764 *checksum, u_int32_t *len) {
gid->gid = toku_xmalloc(DB_GID_SIZE);
for (int i=0; i<DB_GID_SIZE; i++) {
int r = toku_fread_u_int8_t(f, &gid->gid[i], checksum, len);
int toku_fread_XIDP (FILE *f, XIDP *xidp, struct x1764 *checksum, u_int32_t *len) {
// These reads are verbose because XA defined the fields as "long", but we use 4 bytes, 1 byte and 1 byte respectively.
XID *XMALLOC(xid);
{
u_int32_t formatID;
toku_fread_u_int32_t(f, &formatID, checksum, len);
xid->formatID = formatID;
}
{
u_int8_t gtrid_length;
toku_fread_u_int8_t (f, &gtrid_length, checksum, len);
xid->gtrid_length = gtrid_length;
}
{
u_int8_t bqual_length;
toku_fread_u_int8_t (f, &bqual_length, checksum, len);
xid->bqual_length = bqual_length;
}
for (int i=0; i< xid->gtrid_length + xid->bqual_length; i++) {
u_int8_t byte;
int r = toku_fread_u_int8_t(f, &byte, checksum, len);
xid->data[i] = byte;
if (r!=0) return r;
}
*xidp = xid;
return 0;
}
......@@ -1016,14 +1051,14 @@ int toku_logprint_TXNID (FILE *outf, FILE *inf, const char *fieldname, struct x1
return 0;
}
int toku_logprint_GID (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, u_int32_t *len, const char *format __attribute__((__unused__))) {
GID v;
int r = toku_fread_GID(inf, &v, checksum, len);
int toku_logprint_XIDP (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, u_int32_t *len, const char *format __attribute__((__unused__))) {
XIDP vp;
int r = toku_fread_XIDP(inf, &vp, checksum, len);
if (r!=0) return r;
fprintf(outf, "%s=0x", fieldname);
for (int i=0; i<DB_GID_SIZE; i++) printf("%02x", v.gid[i]);
toku_free(v.gid);
v.gid=NULL;
fprintf(outf, "%s={formatID=0x%lx gtrid_length=%ld bqual_length=%ld data=", fieldname, vp->formatID, vp->gtrid_length, vp->bqual_length);
toku_print_bytes(outf, vp->gtrid_length + vp->bqual_length, vp->data);
fprintf(outf, "}");
toku_free(vp);
return 0;
}
......@@ -1067,19 +1102,9 @@ int toku_logprint_BOOL (FILE *outf, FILE *inf, const char *fieldname, struct x17
}
void toku_print_BYTESTRING (FILE *outf, u_int32_t len, char *data) {
fprintf(outf, "{len=%u data=\"", len);
u_int32_t i;
for (i=0; i<len; i++) {
switch (data[i]) {
case '"': fprintf(outf, "\\\""); break;
case '\\': fprintf(outf, "\\\\"); break;
case '\n': fprintf(outf, "\\n"); break;
default:
if (isprint(data[i])) fprintf(outf, "%c", data[i]);
else fprintf(outf, "\\%03o", (unsigned char)(data[i]));
}
}
fprintf(outf, "\"}");
fprintf(outf, "{len=%u data=", len);
toku_print_bytes(outf, len, data);
fprintf(outf, "}");
}
......
......@@ -71,13 +71,13 @@ int toku_fread_LSN (FILE *f, LSN *lsn, struct x1764 *checksum, u_int32_t *le
int toku_fread_BLOCKNUM (FILE *f, BLOCKNUM *lsn, struct x1764 *checksum, u_int32_t *len);
int toku_fread_FILENUM (FILE *f, FILENUM *filenum, struct x1764 *checksum, u_int32_t *len);
int toku_fread_TXNID (FILE *f, TXNID *txnid, struct x1764 *checksum, u_int32_t *len);
int toku_fread_GID (FILE *f, GID *gid, struct x1764 *checksum, u_int32_t *len);
int toku_fread_XIDP (FILE *f, XIDP *xidp, struct x1764 *checksum, u_int32_t *len);
int toku_fread_BYTESTRING (FILE *f, BYTESTRING *bs, struct x1764 *checksum, u_int32_t *len);
int toku_fread_FILENUMS (FILE *f, FILENUMS *fs, struct x1764 *checksum, u_int32_t *len);
int toku_logprint_LSN (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, u_int32_t *len, const char *format __attribute__((__unused__)));
int toku_logprint_TXNID (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, u_int32_t *len, const char *format __attribute__((__unused__)));
int toku_logprint_GID (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, u_int32_t *len, const char *format __attribute__((__unused__)));
int toku_logprint_XIDP (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, u_int32_t *len, const char *format __attribute__((__unused__)));
int toku_logprint_u_int8_t (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, u_int32_t *len, const char *format);
int toku_logprint_u_int32_t (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, u_int32_t *len, const char *format);
int toku_logprint_BLOCKNUM (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, u_int32_t *len, const char *format);
......
......@@ -660,7 +660,7 @@ static int toku_recover_xstillopenprepared (struct logtype_xstillopenprepared *l
l->len,
renv);
if (r==0)
return toku_txn_prepare_txn(txn, l->gid);
return toku_txn_prepare_txn(txn, l->xa_xid);
else
return r;
}
......@@ -743,7 +743,7 @@ static int toku_recover_xprepare (struct logtype_xprepare *l, RECOVER_ENV renv)
assert(txn!=NULL);
// Save the transaction
r = toku_txn_prepare_txn(txn, l->gid);
r = toku_txn_prepare_txn(txn, l->xa_xid);
assert(r == 0);
return 0;
......
......@@ -8,7 +8,7 @@
#include "txn.h"
#include "checkpoint.h"
#include "ule.h"
#include <valgrind/helgrind.h>
BOOL garbage_collection_debug = FALSE;
......@@ -187,6 +187,11 @@ live_list_reverse_note_txn_start(TOKUTXN txn) {
return r;
}
static void invalidate_xa_xid (XID *xid) {
ANNOTATE_NEW_MEMORY(xid, sizeof(*xid)); // consider it to be all invalid for valgrind
xid->formatID = -1; // According to the XA spec, -1 means "invalid data"
}
int
toku_txn_create_txn (
TOKUTXN *tokutxn,
......@@ -232,7 +237,7 @@ toku_txn_create_txn (
result->recovered_from_checkpoint = FALSE;
toku_list_init(&result->checkpoint_before_commit);
result->state = TOKUTXN_LIVE;
result->gid.gid = NULL;
invalidate_xa_xid(&result->xa_xid);
result->do_fsync = FALSE;
toku_txn_ignore_init(result); // 2954
......@@ -422,8 +427,7 @@ int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv
{
if (txn->state==TOKUTXN_PREPARING) {
txn->state=TOKUTXN_LIVE;
toku_free(txn->gid.gid);
txn->gid.gid=NULL;
invalidate_xa_xid(&txn->xa_xid);
toku_list_remove(&txn->prepared_txns_link);
}
txn->state = TOKUTXN_COMMITTING;
......@@ -473,8 +477,7 @@ int toku_txn_abort_with_lsn(TOKUTXN txn, YIELDF yield, void *yieldv, LSN oplsn,
{
if (txn->state==TOKUTXN_PREPARING) {
txn->state=TOKUTXN_LIVE;
toku_free(txn->gid.gid);
txn->gid.gid=NULL;
invalidate_xa_xid(&txn->xa_xid);
toku_list_remove(&txn->prepared_txns_link);
}
txn->state = TOKUTXN_ABORTING;
......@@ -500,23 +503,51 @@ int toku_txn_abort_with_lsn(TOKUTXN txn, YIELDF yield, void *yieldv, LSN oplsn,
return r;
}
int toku_txn_prepare_txn (TOKUTXN txn, GID gid) {
static void copy_xid (XID *dest, XID *source) {
ANNOTATE_NEW_MEMORY(dest, sizeof(*dest));
dest->formatID = source->formatID;
dest->gtrid_length = source->gtrid_length;
dest->bqual_length = source->bqual_length;
memcpy(dest->data, source->data, source->gtrid_length+source->bqual_length);
}
int toku_txn_prepare_txn (TOKUTXN txn, XID *xa_xid) {
assert(txn->state==TOKUTXN_LIVE);
txn->state = TOKUTXN_PREPARING; // This state transition must be protected against begin_checkpoint. Right now it uses the ydb lock.
if (txn->parent) return 0; // nothing to do if there's a parent.
// Do we need to do an fsync?
txn->do_fsync = (txn->force_fsync_on_commit || txn->num_rollentries>0);
txn->gid.gid = toku_memdup(gid.gid, DB_GID_SIZE);
copy_xid(&txn->xa_xid, xa_xid);
// This list will go away with #4683, so we wn't need the ydb lock for this anymore.
toku_list_push(&txn->logger->prepared_txns, &txn->prepared_txns_link);
return toku_log_xprepare(txn->logger, &txn->do_fsync_lsn, 0, txn->txnid64, gid);
return toku_log_xprepare(txn->logger, &txn->do_fsync_lsn, 0, txn->txnid64, xa_xid);
}
void toku_txn_get_prepared_xa_xid (TOKUTXN txn, XID *xid) {
copy_xid(xid, &txn->xa_xid);
}
void toku_txn_get_prepared_gid (TOKUTXN txn, GID *gidp) {
gidp->gid = toku_memdup(txn->gid.gid, DB_GID_SIZE);
int toku_logger_get_txn_from_xid (TOKULOGGER logger, XID *xid, DB_TXN **txnp) {
int num_live_txns = toku_omt_size(logger->live_txns);
for (int i = 0; i < num_live_txns; i++) {
OMTVALUE v;
{
int r = toku_omt_fetch(logger->live_txns, i, &v);
assert_zero(r);
}
TOKUTXN txn = v;
if (txn->xa_xid.formatID == xid->formatID
&& txn->xa_xid.gtrid_length == xid->gtrid_length
&& txn->xa_xid.bqual_length == xid->bqual_length
&& 0==memcmp(txn->xa_xid.data, xid->data, xid->gtrid_length + xid->bqual_length)) {
*txnp = txn->container_db_txn;
return 0;
}
}
return DB_NOTFOUND;
}
int toku_logger_recover_txn (TOKULOGGER logger, DB_PREPLIST preplist[/*count*/], long count, /*out*/ long *retp, u_int32_t flags) {
int toku_logger_recover_txn (TOKULOGGER logger, struct tokulogger_preplist preplist[/*count*/], long count, /*out*/ long *retp, u_int32_t flags) {
if (flags==DB_FIRST) {
// Anything in the returned list goes back on the prepared list.
while (!toku_list_empty(&logger->prepared_and_returned_txns)) {
......@@ -536,7 +567,7 @@ int toku_logger_recover_txn (TOKULOGGER logger, DB_PREPLIST preplist[/*count*/],
TOKUTXN txn = toku_list_struct(h, struct tokutxn, prepared_txns_link);
assert(txn->container_db_txn);
preplist[i].txn = txn->container_db_txn;
memcpy(preplist[i].gid, txn->gid.gid, DB_GID_SIZE);
preplist[i].xid = txn->xa_xid;
} else {
break;
}
......@@ -587,7 +618,6 @@ void toku_txn_destroy_txn(TOKUTXN txn) {
if (txn->open_brts)
toku_omt_destroy(&txn->open_brts);
xids_destroy(&txn->xids);
if (txn->gid.gid) toku_free(txn->gid.gid);
toku_txn_ignore_free(txn); // 2954
toku_free(txn);
......
......@@ -54,11 +54,11 @@ int toku_txn_abort_with_lsn(TOKUTXN txn, YIELDF yield, void *yieldv, LSN oplsn,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra,
bool release_multi_operation_client_lock);
int toku_txn_prepare_txn (TOKUTXN txn, GID gid) __attribute__((warn_unused_result));
int toku_txn_prepare_txn (TOKUTXN txn, XID *xid) __attribute__((warn_unused_result));
// Effect: Do the internal work of preparing a transaction (does not log the prepare record).
void toku_txn_get_prepared_gid (TOKUTXN, GID *);
// Effect: Return a pointer to the GID. The value is allocated, so you must free it.
void toku_txn_get_prepared_xa_xid (TOKUTXN, XID *);
// Effect: Fill in the XID information for a transaction. The caller allocates the XID and the function fills in values.
int toku_txn_maybe_fsync_log(TOKULOGGER logger, LSN do_fsync_lsn, BOOL do_fsync, YIELDF yield, void *yieldv);
......@@ -131,7 +131,12 @@ int toku_txn_ignore_contains(TOKUTXN txn, FILENUM filenum);
TOKUTXN_STATE toku_txn_get_state(TOKUTXN txn);
int toku_logger_recover_txn (TOKULOGGER logger, DB_PREPLIST preplist[/*count*/], long count, /*out*/ long *retp, u_int32_t flags);
struct tokulogger_preplist {
XID xid;
DB_TXN *txn;
};
int toku_logger_recover_txn (TOKULOGGER logger, struct tokulogger_preplist preplist[/*count*/], long count, /*out*/ long *retp, u_int32_t flags);
int toku_logger_get_txn_from_xid (TOKULOGGER logger, XID *xid, DB_TXN **txnp);
#if defined(__cplusplus) || defined(__cilkplusplus)
}
......
......@@ -7,6 +7,7 @@
#include "x1764.h"
#include "memory.h"
#include "toku_assert.h"
#include "db.h"
#include <errno.h>
#include <string.h>
......@@ -190,8 +191,11 @@ static inline void wbuf_TXNID (struct wbuf *w, TXNID tid) {
wbuf_ulonglong(w, tid);
}
static inline void wbuf_nocrc_GID (struct wbuf *w, GID gid) {
wbuf_nocrc_literal_bytes(w, gid.gid, DB_GID_SIZE);
static inline void wbuf_nocrc_XIDP (struct wbuf *w, XIDP xid) {
wbuf_nocrc_u_int32_t(w, xid->formatID);
wbuf_nocrc_u_int8_t(w, xid->gtrid_length);
wbuf_nocrc_u_int8_t(w, xid->bqual_length);
wbuf_nocrc_literal_bytes(w, xid->data, xid->gtrid_length+xid->bqual_length);
}
static inline void wbuf_nocrc_LSN (struct wbuf *w, LSN lsn) {
......
......@@ -270,6 +270,7 @@ BDB_DONTRUN_TESTS = \
shutdown-3344 \
stat64 stat64-create-modify-times stat64_flatten stat64-null-txn stat64-root-changes \
stress-gc \
test-xa-prepare \
test1324 \
test1426 \
test1572 \
......
......@@ -66,9 +66,9 @@ static void test1 (void) {
// Now we can look at env2 in the debugger to see if we managed to make it the same
DB_ENV *env;
setup_env(&env, ENVDIR);
{
DB_PREPLIST l[1];
long count=-1;
......
#include "test.h"
#include <sys/wait.h>
#define ENVDIR2 ENVDIR "2"
static void clean_env (const char *envdir) {
const int len = strlen(envdir)+100;
char cmd[len];
snprintf(cmd, len, "rm -rf %s", envdir);
system(cmd);
CKERR(toku_os_mkdir(envdir, S_IRWXU+S_IRWXG+S_IRWXO));
}
static void setup_env (DB_ENV **envp, const char *envdir) {
CHK(db_env_create(envp, 0));
(*envp)->set_errfile(*envp, stderr);
#ifdef TOKUDB
CHK((*envp)->set_redzone(*envp, 0));
#endif
CHK((*envp)->open(*envp, envdir, DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_MPOOL|DB_INIT_TXN|DB_CREATE|DB_PRIVATE|DB_RECOVER, S_IRWXU+S_IRWXG+S_IRWXO));
}
const unsigned int myformatid = 0x74736554;
static void setup_env_and_prepare (DB_ENV **envp, const char *envdir, bool commit) {
DB *db;
DB_TXN *txn;
clean_env(envdir);
setup_env(envp, envdir);
CKERR(db_create(&db, *envp, 0));
CKERR(db->open(db, NULL, "foo.db", 0, DB_BTREE, DB_CREATE | DB_AUTO_COMMIT, S_IRWXU+S_IRWXG+S_IRWXO));
CKERR((*envp)->txn_begin(*envp, 0, &txn, 0));
DBT key={.size=4, .data="foo"};
CKERR(db->put(db, txn, &key, &key, 0));
CHK(db->close(db, 0));
XID x = {.formatID = myformatid,
.gtrid_length = 8,
.bqual_length = 9};
for (int i=0; i<8+9; i++) x.data[i] = 42+i;
CKERR(txn->xa_prepare(txn, &x));
if (commit)
CKERR(txn->commit(txn, 0));
}
static void test1 (void) {
pid_t pid;
bool do_fork = true;
if (!do_fork || 0==(pid=fork())) {
DB_ENV *env;
setup_env_and_prepare(&env, ENVDIR, false);
{
XID l[1];
long count=-1;
CKERR(env->txn_xa_recover(env, l, 1, &count, DB_FIRST));
printf("%s:%d count=%ld\n", __FILE__, __LINE__, count);
assert(count==1);
assert(myformatid==l[0].formatID);
assert( 8==l[0].gtrid_length);
assert( 9==l[0].bqual_length);
for (int i=0; i<8+9; i++) {
assert(l[0].data[i]==42+i);
}
}
exit(0);
}
int status;
if (do_fork) {
pid_t pid2 = wait(&status);
assert(pid2==pid);
}
DB_ENV *env2;
setup_env_and_prepare(&env2, ENVDIR2, true);
// Now we can look at env2 in the debugger to see if we managed to make it the same
DB_ENV *env;
setup_env(&env, ENVDIR);
{
XID l[1];
long count=-1;
{
int r = env->txn_xa_recover(env, l, 1, &count, DB_FIRST);
printf("r=%d count=%ld\n", r, count);
}
assert(count==1);
assert(l[0].data[0]==42);
assert(myformatid==l[0].formatID);
assert( 8 ==l[0].gtrid_length);
assert( 9 ==l[0].bqual_length);
for (int i=0; i<8+9; i++) {
assert(l[0].data[i]==42+i);
}
{
DB_TXN *txn;
int r = env->get_txn_from_xid(env, &l[0], &txn);
assert(r==0);
CHK(txn->commit(txn, 0));
}
}
CHK(env2->close(env2, 0));
CHK(env ->close(env, 0));
}
int test_main (int argc, char *const argv[]) {
default_parse_args(argc, argv);
// first test: open an environment, a db, a txn, and do a prepare. Then do txn_prepare (without even closing the environment).
test1();
// second test: poen environment, a db, a txn, prepare, close the environment. Then reopen and do txn_prepare.
// third test: make sure there is an fsync on txn_prepare, but not on the following commit.
// Then close the environment Find out what BDB does when ask for the txn prepares.
// Other tests: read prepared txns, 1 at a time. Then close it and read them again.
return 0;
}
......@@ -1499,13 +1499,61 @@ locked_env_close(DB_ENV * env, u_int32_t flags) {
}
static int
toku_env_recover_txn (DB_ENV *env, DB_PREPLIST preplist[/*count*/], long count, /*out*/ long *retp, u_int32_t flags) {
return toku_logger_recover_txn(env->i->logger, preplist, count, retp, flags);
toku_env_txn_xa_recover (DB_ENV *env, XID xids[/*count*/], long count, /*out*/ long *retp, u_int32_t flags) {
struct tokulogger_preplist *MALLOC_N(count,preps);
int r = toku_logger_recover_txn(env->i->logger, preps, count, retp, flags);
if (r==0) {
assert(*retp<=count);
for (int i=0; i<*retp; i++) {
xids[i] = preps[i].xid;
}
}
toku_free(preps);
return r;
}
static int
toku_env_txn_recover (DB_ENV *env, DB_PREPLIST preplist[/*count*/], long count, /*out*/ long *retp, u_int32_t flags) {
struct tokulogger_preplist *MALLOC_N(count,preps);
int r = toku_logger_recover_txn(env->i->logger, preps, count, retp, flags);
if (r==0) {
assert(*retp<=count);
for (int i=0; i<*retp; i++) {
preplist[i].txn = preps[i].txn;
memcpy(preplist[i].gid, preps[i].xid.data, preps[i].xid.gtrid_length + preps[i].xid.bqual_length);
}
}
toku_free(preps);
return r;
}
static int
locked_env_txn_recover (DB_ENV *env, DB_PREPLIST preplist[/*count*/], long count, /*out*/ long *retp, u_int32_t flags) {
toku_ydb_lock(); int r = toku_env_recover_txn(env, preplist, count, retp, flags); toku_ydb_unlock(); return r;
toku_ydb_lock();
int r = toku_env_txn_recover(env, preplist, count, retp, flags);
toku_ydb_unlock();
return r;
}
static int
locked_env_txn_xa_recover (DB_ENV *env, XID xids[/*count*/], long count, /*out*/ long *retp, u_int32_t flags) {
toku_ydb_lock();
int r = toku_env_txn_xa_recover(env, xids, count, retp, flags);
toku_ydb_unlock();
return r;
}
static int
toku_env_get_txn_from_xid (DB_ENV *env, /*in*/XID *xid, /*out*/ DB_TXN **txnp) {
return toku_logger_get_txn_from_xid(env->i->logger, xid, txnp);
}
static int
locked_env_get_txn_from_xid (DB_ENV *env, /*in*/XID *xid, /*out*/ DB_TXN **txnp) {
toku_ydb_lock();
int r = toku_env_get_txn_from_xid(env, xid, txnp);
toku_ydb_unlock();
return r;
}
static int
......@@ -2372,6 +2420,8 @@ toku_env_create(DB_ENV ** envp, u_int32_t flags) {
SENV(open);
SENV(close);
SENV(txn_recover);
SENV(txn_xa_recover);
SENV(get_txn_from_xid);
SENV(log_flush);
//SENV(set_noticecall);
SENV(set_flags);
......
......@@ -8,6 +8,7 @@
#include "checkpoint.h"
#include "log_header.h"
#include "ydb_txn.h"
#include <valgrind/helgrind.h>
static int
toku_txn_release_locks(DB_TXN* txn) {
......@@ -216,7 +217,7 @@ toku_txn_abort_only(DB_TXN * txn,
}
static int
toku_txn_prepare (DB_TXN *txn, u_int8_t gid[DB_GID_SIZE]) {
toku_txn_xa_prepare (DB_TXN *txn, XID *xid) {
if (!txn) return EINVAL;
if (txn->parent) return EINVAL;
HANDLE_PANICKED_ENV(txn->mgrp);
......@@ -232,8 +233,7 @@ toku_txn_prepare (DB_TXN *txn, u_int8_t gid[DB_GID_SIZE]) {
}
assert(!db_txn_struct_i(txn)->child);
TOKUTXN ttxn = db_txn_struct_i(txn)->tokutxn;
GID gids = {gid};
int r = toku_txn_prepare_txn(ttxn, gids);
int r = toku_txn_prepare_txn(ttxn, xid);
TOKULOGGER logger = txn->mgrp->i->logger;
LSN do_fsync_lsn;
bool do_fsync;
......@@ -242,6 +242,17 @@ toku_txn_prepare (DB_TXN *txn, u_int8_t gid[DB_GID_SIZE]) {
return r;
}
static int
toku_txn_prepare (DB_TXN *txn, u_int8_t gid[DB_GID_SIZE]) {
XID xid;
ANNOTATE_NEW_MEMORY(&xid, sizeof(xid));
xid.formatID=0x756b6f54; // "Toku"
xid.gtrid_length=DB_GID_SIZE/2; // The maximum allowed gtrid length is 64. See the XA spec in source:/import/opengroup.org/C193.pdf page 20.
xid.bqual_length=DB_GID_SIZE/2; // The maximum allowed bqual length is 64.
memcpy(xid.data, gid, DB_GID_SIZE);
return toku_txn_xa_prepare(txn, &xid);
}
int
toku_txn_abort(DB_TXN * txn,
TXN_PROGRESS_POLL_FUNCTION poll, void* poll_extra,
......@@ -324,6 +335,11 @@ locked_txn_prepare (DB_TXN *txn, u_int8_t gid[DB_GID_SIZE]) {
toku_ydb_lock(); int r = toku_txn_prepare (txn, gid); toku_ydb_unlock(); return r;
}
static int
locked_txn_xa_prepare (DB_TXN *txn, XID *xid) {
toku_ydb_lock(); int r = toku_txn_xa_prepare (txn, xid); toku_ydb_unlock(); return r;
}
int
toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags, bool internal, bool holds_ydb_lock) {
HANDLE_PANICKED_ENV(env);
......@@ -420,6 +436,7 @@ toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags, bool
STXN(commit_with_progress);
STXN(id);
STXN(prepare);
STXN(xa_prepare);
STXN(txn_stat);
#undef STXN
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment