diff --git a/buildheader/make_tdb.c b/buildheader/make_tdb.c index bb26ad747a06c765d3bd526681d63218c46d5d26..0b6b7d97ea3ecfadd413987f65b9ea583a855293 100644 --- a/buildheader/make_tdb.c +++ b/buildheader/make_tdb.c @@ -359,6 +359,8 @@ void print_db_env_struct (void) { "void (*set_update) (DB_ENV *env, int (*update_function)(DB *, const DBT *key, const DBT *old_val, const DBT *extra, void (*set_val)(const DBT *new_val, void *set_extra), void *set_extra))", "int (*set_lock_timeout) (DB_ENV *env, uint64_t lock_wait_time_msec)", "int (*get_lock_timeout) (DB_ENV *env, uint64_t *lock_wait_time_msec)", + "int (*txn_xa_recover) (DB_ENV*, XID list[/*count*/], long count, /*out*/ long *retp, u_int32_t flags)", + "int (*get_txn_from_xid) (DB_ENV*, /*in*/ XID *, /*out*/ DB_TXN **)", NULL}; sort_and_dump_fields("db_env", true, extra); @@ -463,6 +465,7 @@ static void print_db_txn_struct (void) { "struct toku_list open_txns", "int (*commit_with_progress)(DB_TXN*, uint32_t, TXN_PROGRESS_POLL_FUNCTION, void*)", "int (*abort_with_progress)(DB_TXN*, TXN_PROGRESS_POLL_FUNCTION, void*)", + "int (*xa_prepare) (DB_TXN*, XID *)", NULL}; sort_and_dump_fields("db_txn", false, extra); } @@ -525,6 +528,13 @@ int main (int argc, char *const argv[] __attribute__((__unused__))) { #endif dodefine(DB_GID_SIZE); + printf("typedef struct xid_t { /* This struct is intended to be binary compatible with the XID in the XA architecture. 
See source:/import/opengroup.org/C193.pdf */\n" + " long formatID; /* format identifier */\n" + " long gtrid_length; /* value from 1 through 64 */\n" + " long bqual_length; /* value from 1 through 64 */\n" + " char data[DB_GID_SIZE];\n" + "} XID;\n"); + //Typedef toku_off_t printf("#ifndef TOKU_OFF_T_DEFINED\n" "#define TOKU_OFF_T_DEFINED\n" diff --git a/include/db.h b/include/db.h index 6fcec6e13f01ad9e424c413161a9684a6db49afc..942e009cff6ffefefbe4743906d775f4eacd0cd6 100644 --- a/include/db.h +++ b/include/db.h @@ -17,6 +17,12 @@ extern "C" { #define DB_VERSION_PATCH 119 #define DB_VERSION_STRING "Tokutek: TokuDB 4.6.119" #define DB_GID_SIZE 128 +typedef struct xid_t { /* This struct is intended to be binary compatible with the XID in the XA architecture. See source:/import/opengroup.org/C193.pdf */ + long formatID; /* format identifier */ + long gtrid_length; /* value from 1 through 64 */ + long bqual_length; /* value from 1 through 64 */ + char data[DB_GID_SIZE]; +} XID; #ifndef TOKU_OFF_T_DEFINED #define TOKU_OFF_T_DEFINED typedef int64_t toku_off_t; @@ -234,6 +240,8 @@ struct __toku_db_env { void (*set_update) (DB_ENV *env, int (*update_function)(DB *, const DBT *key, const DBT *old_val, const DBT *extra, void (*set_val)(const DBT *new_val, void *set_extra), void *set_extra)); int (*set_lock_timeout) (DB_ENV *env, uint64_t lock_wait_time_msec); int (*get_lock_timeout) (DB_ENV *env, uint64_t *lock_wait_time_msec); + int (*txn_xa_recover) (DB_ENV*, XID list[/*count*/], long count, /*out*/ long *retp, u_int32_t flags); + int (*get_txn_from_xid) (DB_ENV*, /*in*/ XID *, /*out*/ DB_TXN **); void *app_private; void *api1_internal; int (*close) (DB_ENV *, u_int32_t); @@ -360,6 +368,7 @@ struct __toku_db_txn { struct toku_list open_txns; int (*commit_with_progress)(DB_TXN*, uint32_t, TXN_PROGRESS_POLL_FUNCTION, void*); int (*abort_with_progress)(DB_TXN*, TXN_PROGRESS_POLL_FUNCTION, void*); + int (*xa_prepare) (DB_TXN*, XID *); DB_ENV *mgrp /*In TokuDB, mgrp is a 
DB_ENV not a DB_TXNMGR*/; DB_TXN *parent; void *api_internal; diff --git a/newbrt/brttypes.h b/newbrt/brttypes.h index 4b03e3e57dc42dd954d2cb037fd69c9c9593a51c..2d042db6cfe9e3a5571477fed5e70939be25537e 100644 --- a/newbrt/brttypes.h +++ b/newbrt/brttypes.h @@ -49,6 +49,7 @@ typedef u_int64_t TXNID; typedef struct blocknum_s { int64_t b; } BLOCKNUM; // make a struct so that we will notice type problems. typedef struct gid_s { uint8_t *gid; } GID; // the gid is of size [DB_GID_SIZE] +typedef XID *XIDP; // this is the type that's passed to the logger code (so that we don't have to copy all 152 bytes when only a subset are even valid.) #define ROLLBACK_NONE ((BLOCKNUM){0}) static inline BLOCKNUM make_blocknum(int64_t b) { BLOCKNUM result={b}; return result; } diff --git a/newbrt/cachetable.c b/newbrt/cachetable.c index 8699c8357c0a3a8ba73a86a7a14c052186daa6d3..c98923eb2f05bb39473f2f37a8c25eebde88b1e3 100644 --- a/newbrt/cachetable.c +++ b/newbrt/cachetable.c @@ -3548,11 +3548,11 @@ log_open_txn (OMTVALUE txnv, u_int32_t UU(index), void *UU(extra)) { return 0; } case TOKUTXN_PREPARING: { - GID gid; - toku_txn_get_prepared_gid(txn, &gid); + XID xa_xid; + toku_txn_get_prepared_xa_xid(txn, &xa_xid); int r = toku_log_xstillopenprepared(logger, NULL, 0, toku_txn_get_txnid(txn), - gid, + &xa_xid, txn->rollentry_raw_count, open_filenums, txn->force_fsync_on_commit, @@ -3562,7 +3562,6 @@ log_open_txn (OMTVALUE txnv, u_int32_t UU(index), void *UU(extra)) { txn->spilled_rollback_tail, txn->current_rollback); assert(r==0); - toku_free(gid.gid); return 0; } case TOKUTXN_RETIRED: diff --git a/newbrt/log-internal.h b/newbrt/log-internal.h index 082903c6b46eb3ca25e6cb536a7be4e16c2da9ae..71c61c99f4dbc91f6f640ccdf32341a2809bbaea 100644 --- a/newbrt/log-internal.h +++ b/newbrt/log-internal.h @@ -181,7 +181,7 @@ struct tokutxn { TOKUTXN_STATE state; LSN do_fsync_lsn; BOOL do_fsync; - GID gid; // for prepared transactions + XID xa_xid; // for prepared transactions struct toku_list 
prepared_txns_link; // list of prepared transactions }; @@ -228,8 +228,14 @@ static inline int toku_logsizeof_TXNID (TXNID txnid __attribute__((__unused__))) return 8; } -static inline int toku_logsizeof_GID (GID gid __attribute__((__unused__))) { - return DB_GID_SIZE; +static inline int toku_logsizeof_XIDP (XIDP xid) { + assert(0<=xid->gtrid_length && xid->gtrid_length<=64); + assert(0<=xid->bqual_length && xid->bqual_length<=64); + return xid->gtrid_length + + xid->bqual_length + + 4 // formatID + + 1 // gtrid_length + + 1; // bqual_length } static inline int toku_logsizeof_FILENUMS (FILENUMS fs) { diff --git a/newbrt/log.h b/newbrt/log.h index 04979c97bba7f9004e69dbe8e63b1c8711bd4750..3754a7641d45c8d4ba6ea3183a9269dbb29261ab 100644 --- a/newbrt/log.h +++ b/newbrt/log.h @@ -33,6 +33,14 @@ static inline int toku_copy_BYTESTRING(BYTESTRING *target, BYTESTRING val) { if (target->data==0) return errno; return 0; } +static inline void toku_free_TXNID(TXNID txnid __attribute__((__unused__))) {} +static inline void toku_free_u_int64_t(u_int64_t u __attribute__((__unused__))) {} +static inline void toku_free_u_int32_t(u_int32_t u __attribute__((__unused__))) {} +static inline void toku_free_u_int8_t(u_int8_t u __attribute__((__unused__))) {} +static inline void toku_free_FILENUM(FILENUM u __attribute__((__unused__))) {} +static inline void toku_free_BLOCKNUM(BLOCKNUM u __attribute__((__unused__))) {} +static inline void toku_free_BOOL(BOOL u __attribute__((__unused__))) {} +static inline void toku_free_XIDP(XIDP xidp) { toku_free(xidp); } static inline void toku_free_BYTESTRING(BYTESTRING val) { toku_free(val.data); } static inline void toku_free_FILENUMS(FILENUMS val) { toku_free(val.filenums); } diff --git a/newbrt/logformat.c b/newbrt/logformat.c index df598c7fb094f37b2eabbcff19e09918c9ef5242..a2e2e0cb5d066ff472bb0a85561508197f24d1a6 100644 --- a/newbrt/logformat.c +++ b/newbrt/logformat.c @@ -117,7 +117,7 @@ const struct logtype logtypes[] = { NULLFIELD}}, // record 
all transactions // prepared txns need a gid {"xstillopenprepared", 'p', FA{{"TXNID", "xid", 0}, - {"GID", "gid", 0}, // prepared transactions need a gid, and have no parentxid. + {"XIDP", "xa_xid", 0}, // prepared transactions need a gid, and have no parentxid. {"u_int64_t", "rollentry_raw_count", 0}, {"FILENUMS", "open_filenums", 0}, {"u_int8_t", "force_fsync_on_commit", 0}, @@ -133,7 +133,7 @@ const struct logtype logtypes[] = { // Records produced by transactions {"xbegin", 'b', FA{{"TXNID", "parentxid", 0},NULLFIELD}}, {"xcommit",'C', FA{{"TXNID", "xid", 0},NULLFIELD}}, - {"xprepare",'P', FA{{"TXNID", "xid", 0}, {"GID", "gid", 0},NULLFIELD}}, + {"xprepare",'P', FA{{"TXNID", "xid", 0}, {"XIDP", "xa_xid", 0}, NULLFIELD}}, {"xabort", 'q', FA{{"TXNID", "xid", 0},NULLFIELD}}, //TODO: #2037 Add dname {"fcreate", 'F', FA{{"TXNID", "xid", 0}, @@ -479,24 +479,15 @@ generate_log_reader (void) { fprintf(cf, " return 0;\n"); fprintf(cf, "}\n\n"); - int free_count=0; - DO_LOGTYPES(lt, { - free_count=0; - fprintf(cf, "static void toku_log_free_log_entry_%s_resources (struct logtype_%s *data)", lt->name, lt->name); - fprintf(cf, " {\n"); - DO_FIELDS(ft, lt, { - if ( strcmp(ft->type, "BYTESTRING") == 0 ) { - fprintf(cf, " toku_free_BYTESTRING(data->%s);\n", ft->name); - free_count++; - } - else if ( strcmp(ft->type, "FILENUMS") == 0 ) { - fprintf(cf, " toku_free_FILENUMS(data->%s);\n", ft->name); - free_count++; - } - }); - if ( free_count == 0 ) fprintf(cf, " struct logtype_%s *dummy __attribute__ ((unused)) = data;\n", lt->name); - fprintf(cf, "}\n\n"); - }); + DO_LOGTYPES(lt, ({ + fprintf(cf, "static void toku_log_free_log_entry_%s_resources (struct logtype_%s *data", lt->name, lt->name); + if (!lt->fields->type) fprintf(cf, " __attribute__((__unused__))"); + fprintf(cf, ") {\n"); + DO_FIELDS(ft, lt, + fprintf(cf, " toku_free_%s(data->%s);\n", ft->type, ft->name); + ); + fprintf(cf, "}\n\n"); + })); fprintf2(cf, hf, "void toku_log_free_log_entry_resources (struct log_entry 
*le)"); fprintf(hf, ";\n"); fprintf(cf, " {\n"); diff --git a/newbrt/logger.c b/newbrt/logger.c index 24ad3ff0c41f28dc7464a174701c73d38704c9e5..c4e1890bb24d790f221d5f242ff8e6858a826644 100644 --- a/newbrt/logger.c +++ b/newbrt/logger.c @@ -14,6 +14,22 @@ static int delete_logfile(TOKULOGGER logger, long long index, uint32_t version); static void grab_output(TOKULOGGER logger, LSN *fsynced_lsn); static void release_output(TOKULOGGER logger, LSN fsynced_lsn); +static void toku_print_bytes (FILE *outf, u_int32_t len, char *data) { + fprintf(outf, "\""); + u_int32_t i; + for (i=0; i<len; i++) { + switch (data[i]) { + case '"': fprintf(outf, "\\\""); break; + case '\\': fprintf(outf, "\\\\"); break; + case '\n': fprintf(outf, "\\n"); break; + default: + if (isprint(data[i])) fprintf(outf, "%c", data[i]); + else fprintf(outf, "\\%03o", (unsigned char)(data[i])); + } + } + fprintf(outf, "\""); +} + static BOOL is_a_logfile_any_version (const char *name, uint64_t *number_result, uint32_t *version_of_log) { BOOL rval = TRUE; uint64_t result; @@ -957,12 +973,31 @@ int toku_fread_TXNID (FILE *f, TXNID *txnid, struct x1764 *checksum, u_int32_t return toku_fread_u_int64_t (f, txnid, checksum, len); } -int toku_fread_GID (FILE *f, GID *gid, struct x1764 *checksum, u_int32_t *len) { - gid->gid = toku_xmalloc(DB_GID_SIZE); - for (int i=0; i<DB_GID_SIZE; i++) { - int r = toku_fread_u_int8_t(f, &gid->gid[i], checksum, len); +int toku_fread_XIDP (FILE *f, XIDP *xidp, struct x1764 *checksum, u_int32_t *len) { + // These reads are verbose because XA defined the fields as "long", but we use 4 bytes, 1 byte and 1 byte respectively. 
+ XID *XMALLOC(xid); + { + u_int32_t formatID; + toku_fread_u_int32_t(f, &formatID, checksum, len); + xid->formatID = formatID; + } + { + u_int8_t gtrid_length; + toku_fread_u_int8_t (f, &gtrid_length, checksum, len); + xid->gtrid_length = gtrid_length; + } + { + u_int8_t bqual_length; + toku_fread_u_int8_t (f, &bqual_length, checksum, len); + xid->bqual_length = bqual_length; + } + for (int i=0; i< xid->gtrid_length + xid->bqual_length; i++) { + u_int8_t byte; + int r = toku_fread_u_int8_t(f, &byte, checksum, len); + xid->data[i] = byte; if (r!=0) return r; } + *xidp = xid; return 0; } @@ -1016,14 +1051,14 @@ int toku_logprint_TXNID (FILE *outf, FILE *inf, const char *fieldname, struct x1 return 0; } -int toku_logprint_GID (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, u_int32_t *len, const char *format __attribute__((__unused__))) { - GID v; - int r = toku_fread_GID(inf, &v, checksum, len); +int toku_logprint_XIDP (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, u_int32_t *len, const char *format __attribute__((__unused__))) { + XIDP vp; + int r = toku_fread_XIDP(inf, &vp, checksum, len); if (r!=0) return r; - fprintf(outf, "%s=0x", fieldname); - for (int i=0; i<DB_GID_SIZE; i++) printf("%02x", v.gid[i]); - toku_free(v.gid); - v.gid=NULL; + fprintf(outf, "%s={formatID=0x%lx gtrid_length=%ld bqual_length=%ld data=", fieldname, vp->formatID, vp->gtrid_length, vp->bqual_length); + toku_print_bytes(outf, vp->gtrid_length + vp->bqual_length, vp->data); + fprintf(outf, "}"); + toku_free(vp); return 0; } @@ -1067,19 +1102,9 @@ int toku_logprint_BOOL (FILE *outf, FILE *inf, const char *fieldname, struct x17 } void toku_print_BYTESTRING (FILE *outf, u_int32_t len, char *data) { - fprintf(outf, "{len=%u data=\"", len); - u_int32_t i; - for (i=0; i<len; i++) { - switch (data[i]) { - case '"': fprintf(outf, "\\\""); break; - case '\\': fprintf(outf, "\\\\"); break; - case '\n': fprintf(outf, "\\n"); break; - default: - if 
(isprint(data[i])) fprintf(outf, "%c", data[i]); - else fprintf(outf, "\\%03o", (unsigned char)(data[i])); - } - } - fprintf(outf, "\"}"); + fprintf(outf, "{len=%u data=", len); + toku_print_bytes(outf, len, data); + fprintf(outf, "}"); } diff --git a/newbrt/logger.h b/newbrt/logger.h index 1870244635f5f03b0010f223c0c85800c3ce6806..abe3c60718528a931ba385520662ec9919daf249 100644 --- a/newbrt/logger.h +++ b/newbrt/logger.h @@ -71,13 +71,13 @@ int toku_fread_LSN (FILE *f, LSN *lsn, struct x1764 *checksum, u_int32_t *le int toku_fread_BLOCKNUM (FILE *f, BLOCKNUM *lsn, struct x1764 *checksum, u_int32_t *len); int toku_fread_FILENUM (FILE *f, FILENUM *filenum, struct x1764 *checksum, u_int32_t *len); int toku_fread_TXNID (FILE *f, TXNID *txnid, struct x1764 *checksum, u_int32_t *len); -int toku_fread_GID (FILE *f, GID *gid, struct x1764 *checksum, u_int32_t *len); +int toku_fread_XIDP (FILE *f, XIDP *xidp, struct x1764 *checksum, u_int32_t *len); int toku_fread_BYTESTRING (FILE *f, BYTESTRING *bs, struct x1764 *checksum, u_int32_t *len); int toku_fread_FILENUMS (FILE *f, FILENUMS *fs, struct x1764 *checksum, u_int32_t *len); int toku_logprint_LSN (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, u_int32_t *len, const char *format __attribute__((__unused__))); int toku_logprint_TXNID (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, u_int32_t *len, const char *format __attribute__((__unused__))); -int toku_logprint_GID (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, u_int32_t *len, const char *format __attribute__((__unused__))); +int toku_logprint_XIDP (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, u_int32_t *len, const char *format __attribute__((__unused__))); int toku_logprint_u_int8_t (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, u_int32_t *len, const char *format); int toku_logprint_u_int32_t (FILE *outf, FILE *inf, const char *fieldname, struct x1764 
*checksum, u_int32_t *len, const char *format); int toku_logprint_BLOCKNUM (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, u_int32_t *len, const char *format); diff --git a/newbrt/recover.c b/newbrt/recover.c index 37c9d44545cf4a201bdf826a2a7021091bf7fa82..b7231d1dd54479a1368795853f1e1b58cd148d57 100644 --- a/newbrt/recover.c +++ b/newbrt/recover.c @@ -660,7 +660,7 @@ static int toku_recover_xstillopenprepared (struct logtype_xstillopenprepared *l l->len, renv); if (r==0) - return toku_txn_prepare_txn(txn, l->gid); + return toku_txn_prepare_txn(txn, l->xa_xid); else return r; } @@ -743,7 +743,7 @@ static int toku_recover_xprepare (struct logtype_xprepare *l, RECOVER_ENV renv) assert(txn!=NULL); // Save the transaction - r = toku_txn_prepare_txn(txn, l->gid); + r = toku_txn_prepare_txn(txn, l->xa_xid); assert(r == 0); return 0; diff --git a/newbrt/txn.c b/newbrt/txn.c index c1e83b8c71750eea603867e049e90e27b94a9dd9..df67a4e161a69f083609bcf8693912826bc59a2d 100644 --- a/newbrt/txn.c +++ b/newbrt/txn.c @@ -8,7 +8,7 @@ #include "txn.h" #include "checkpoint.h" #include "ule.h" - +#include <valgrind/helgrind.h> BOOL garbage_collection_debug = FALSE; @@ -187,6 +187,11 @@ live_list_reverse_note_txn_start(TOKUTXN txn) { return r; } +static void invalidate_xa_xid (XID *xid) { + ANNOTATE_NEW_MEMORY(xid, sizeof(*xid)); // consider it to be all invalid for valgrind + xid->formatID = -1; // According to the XA spec, -1 means "invalid data" +} + int toku_txn_create_txn ( TOKUTXN *tokutxn, @@ -232,7 +237,7 @@ toku_txn_create_txn ( result->recovered_from_checkpoint = FALSE; toku_list_init(&result->checkpoint_before_commit); result->state = TOKUTXN_LIVE; - result->gid.gid = NULL; + invalidate_xa_xid(&result->xa_xid); result->do_fsync = FALSE; toku_txn_ignore_init(result); // 2954 @@ -422,8 +427,7 @@ int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv { if (txn->state==TOKUTXN_PREPARING) { txn->state=TOKUTXN_LIVE; - 
toku_free(txn->gid.gid); - txn->gid.gid=NULL; + invalidate_xa_xid(&txn->xa_xid); toku_list_remove(&txn->prepared_txns_link); } txn->state = TOKUTXN_COMMITTING; @@ -473,8 +477,7 @@ int toku_txn_abort_with_lsn(TOKUTXN txn, YIELDF yield, void *yieldv, LSN oplsn, { if (txn->state==TOKUTXN_PREPARING) { txn->state=TOKUTXN_LIVE; - toku_free(txn->gid.gid); - txn->gid.gid=NULL; + invalidate_xa_xid(&txn->xa_xid); toku_list_remove(&txn->prepared_txns_link); } txn->state = TOKUTXN_ABORTING; @@ -500,23 +503,51 @@ int toku_txn_abort_with_lsn(TOKUTXN txn, YIELDF yield, void *yieldv, LSN oplsn, return r; } -int toku_txn_prepare_txn (TOKUTXN txn, GID gid) { +static void copy_xid (XID *dest, XID *source) { + ANNOTATE_NEW_MEMORY(dest, sizeof(*dest)); + dest->formatID = source->formatID; + dest->gtrid_length = source->gtrid_length; + dest->bqual_length = source->bqual_length; + memcpy(dest->data, source->data, source->gtrid_length+source->bqual_length); +} + +int toku_txn_prepare_txn (TOKUTXN txn, XID *xa_xid) { assert(txn->state==TOKUTXN_LIVE); txn->state = TOKUTXN_PREPARING; // This state transition must be protected against begin_checkpoint. Right now it uses the ydb lock. if (txn->parent) return 0; // nothing to do if there's a parent. // Do we need to do an fsync? txn->do_fsync = (txn->force_fsync_on_commit || txn->num_rollentries>0); - txn->gid.gid = toku_memdup(gid.gid, DB_GID_SIZE); + copy_xid(&txn->xa_xid, xa_xid); // This list will go away with #4683, so we wn't need the ydb lock for this anymore. 
toku_list_push(&txn->logger->prepared_txns, &txn->prepared_txns_link); - return toku_log_xprepare(txn->logger, &txn->do_fsync_lsn, 0, txn->txnid64, gid); + return toku_log_xprepare(txn->logger, &txn->do_fsync_lsn, 0, txn->txnid64, xa_xid); +} + +void toku_txn_get_prepared_xa_xid (TOKUTXN txn, XID *xid) { + copy_xid(xid, &txn->xa_xid); } -void toku_txn_get_prepared_gid (TOKUTXN txn, GID *gidp) { - gidp->gid = toku_memdup(txn->gid.gid, DB_GID_SIZE); +int toku_logger_get_txn_from_xid (TOKULOGGER logger, XID *xid, DB_TXN **txnp) { + int num_live_txns = toku_omt_size(logger->live_txns); + for (int i = 0; i < num_live_txns; i++) { + OMTVALUE v; + { + int r = toku_omt_fetch(logger->live_txns, i, &v); + assert_zero(r); + } + TOKUTXN txn = v; + if (txn->xa_xid.formatID == xid->formatID + && txn->xa_xid.gtrid_length == xid->gtrid_length + && txn->xa_xid.bqual_length == xid->bqual_length + && 0==memcmp(txn->xa_xid.data, xid->data, xid->gtrid_length + xid->bqual_length)) { + *txnp = txn->container_db_txn; + return 0; + } + } + return DB_NOTFOUND; } -int toku_logger_recover_txn (TOKULOGGER logger, DB_PREPLIST preplist[/*count*/], long count, /*out*/ long *retp, u_int32_t flags) { +int toku_logger_recover_txn (TOKULOGGER logger, struct tokulogger_preplist preplist[/*count*/], long count, /*out*/ long *retp, u_int32_t flags) { if (flags==DB_FIRST) { // Anything in the returned list goes back on the prepared list. 
while (!toku_list_empty(&logger->prepared_and_returned_txns)) { @@ -536,7 +567,7 @@ int toku_logger_recover_txn (TOKULOGGER logger, DB_PREPLIST preplist[/*count*/], TOKUTXN txn = toku_list_struct(h, struct tokutxn, prepared_txns_link); assert(txn->container_db_txn); preplist[i].txn = txn->container_db_txn; - memcpy(preplist[i].gid, txn->gid.gid, DB_GID_SIZE); + preplist[i].xid = txn->xa_xid; } else { break; } @@ -587,7 +618,6 @@ void toku_txn_destroy_txn(TOKUTXN txn) { if (txn->open_brts) toku_omt_destroy(&txn->open_brts); xids_destroy(&txn->xids); - if (txn->gid.gid) toku_free(txn->gid.gid); toku_txn_ignore_free(txn); // 2954 toku_free(txn); diff --git a/newbrt/txn.h b/newbrt/txn.h index 6e7d3ad3da729223efbb92ed66071addaaee4011..7d5d552e5db35199d7da71bd567d40b6220850f5 100644 --- a/newbrt/txn.h +++ b/newbrt/txn.h @@ -54,11 +54,11 @@ int toku_txn_abort_with_lsn(TOKUTXN txn, YIELDF yield, void *yieldv, LSN oplsn, TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra, bool release_multi_operation_client_lock); -int toku_txn_prepare_txn (TOKUTXN txn, GID gid) __attribute__((warn_unused_result)); +int toku_txn_prepare_txn (TOKUTXN txn, XID *xid) __attribute__((warn_unused_result)); // Effect: Do the internal work of preparing a transaction (does not log the prepare record). -void toku_txn_get_prepared_gid (TOKUTXN, GID *); -// Effect: Return a pointer to the GID. The value is allocated, so you must free it. +void toku_txn_get_prepared_xa_xid (TOKUTXN, XID *); +// Effect: Fill in the XID information for a transaction. The caller allocates the XID and the function fills in values. 
int toku_txn_maybe_fsync_log(TOKULOGGER logger, LSN do_fsync_lsn, BOOL do_fsync, YIELDF yield, void *yieldv); @@ -131,7 +131,12 @@ int toku_txn_ignore_contains(TOKUTXN txn, FILENUM filenum); TOKUTXN_STATE toku_txn_get_state(TOKUTXN txn); -int toku_logger_recover_txn (TOKULOGGER logger, DB_PREPLIST preplist[/*count*/], long count, /*out*/ long *retp, u_int32_t flags); +struct tokulogger_preplist { + XID xid; + DB_TXN *txn; +}; +int toku_logger_recover_txn (TOKULOGGER logger, struct tokulogger_preplist preplist[/*count*/], long count, /*out*/ long *retp, u_int32_t flags); +int toku_logger_get_txn_from_xid (TOKULOGGER logger, XID *xid, DB_TXN **txnp); #if defined(__cplusplus) || defined(__cilkplusplus) } diff --git a/newbrt/wbuf.h b/newbrt/wbuf.h index 53ac3ea38177f8bfe8fffc8b774237878036088c..eeb71e1ef2f8e1e1953e0ddacf3594a32d874fbe 100644 --- a/newbrt/wbuf.h +++ b/newbrt/wbuf.h @@ -7,6 +7,7 @@ #include "x1764.h" #include "memory.h" #include "toku_assert.h" +#include "db.h" #include <errno.h> #include <string.h> @@ -190,8 +191,11 @@ static inline void wbuf_TXNID (struct wbuf *w, TXNID tid) { wbuf_ulonglong(w, tid); } -static inline void wbuf_nocrc_GID (struct wbuf *w, GID gid) { - wbuf_nocrc_literal_bytes(w, gid.gid, DB_GID_SIZE); +static inline void wbuf_nocrc_XIDP (struct wbuf *w, XIDP xid) { + wbuf_nocrc_u_int32_t(w, xid->formatID); + wbuf_nocrc_u_int8_t(w, xid->gtrid_length); + wbuf_nocrc_u_int8_t(w, xid->bqual_length); + wbuf_nocrc_literal_bytes(w, xid->data, xid->gtrid_length+xid->bqual_length); } static inline void wbuf_nocrc_LSN (struct wbuf *w, LSN lsn) { diff --git a/src/tests/Makefile b/src/tests/Makefile index 989d0445a5be89acba449aeefd8eed3438befa53..eb0e3a086f31b1070d179a59b5e54848fc50fbd0 100644 --- a/src/tests/Makefile +++ b/src/tests/Makefile @@ -270,6 +270,7 @@ BDB_DONTRUN_TESTS = \ shutdown-3344 \ stat64 stat64-create-modify-times stat64_flatten stat64-null-txn stat64-root-changes \ stress-gc \ + test-xa-prepare \ test1324 \ test1426 \ test1572 \ 
diff --git a/src/tests/test-prepare.c b/src/tests/test-prepare.c index 9bcedce613b284de9ad83278756b49770e21bcee..940d053c658030de8ad0ebf584b04fe29b31f4fa 100644 --- a/src/tests/test-prepare.c +++ b/src/tests/test-prepare.c @@ -66,9 +66,9 @@ static void test1 (void) { // Now we can look at env2 in the debugger to see if we managed to make it the same - DB_ENV *env; setup_env(&env, ENVDIR); + { DB_PREPLIST l[1]; long count=-1; diff --git a/src/tests/test-xa-prepare.c b/src/tests/test-xa-prepare.c new file mode 100644 index 0000000000000000000000000000000000000000..6f1daf96c4d1894306d441b041759fb60e56c8ab --- /dev/null +++ b/src/tests/test-xa-prepare.c @@ -0,0 +1,120 @@ +#include "test.h" +#include <sys/wait.h> + +#define ENVDIR2 ENVDIR "2" + +static void clean_env (const char *envdir) { + const int len = strlen(envdir)+100; + char cmd[len]; + snprintf(cmd, len, "rm -rf %s", envdir); + system(cmd); + CKERR(toku_os_mkdir(envdir, S_IRWXU+S_IRWXG+S_IRWXO)); +} + +static void setup_env (DB_ENV **envp, const char *envdir) { + CHK(db_env_create(envp, 0)); + (*envp)->set_errfile(*envp, stderr); +#ifdef TOKUDB + CHK((*envp)->set_redzone(*envp, 0)); +#endif + CHK((*envp)->open(*envp, envdir, DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_MPOOL|DB_INIT_TXN|DB_CREATE|DB_PRIVATE|DB_RECOVER, S_IRWXU+S_IRWXG+S_IRWXO)); +} + +const unsigned int myformatid = 0x74736554; + +static void setup_env_and_prepare (DB_ENV **envp, const char *envdir, bool commit) { + DB *db; + DB_TXN *txn; + clean_env(envdir); + setup_env(envp, envdir); + CKERR(db_create(&db, *envp, 0)); + CKERR(db->open(db, NULL, "foo.db", 0, DB_BTREE, DB_CREATE | DB_AUTO_COMMIT, S_IRWXU+S_IRWXG+S_IRWXO)); + CKERR((*envp)->txn_begin(*envp, 0, &txn, 0)); + DBT key={.size=4, .data="foo"}; + CKERR(db->put(db, txn, &key, &key, 0)); + CHK(db->close(db, 0)); + XID x = {.formatID = myformatid, + .gtrid_length = 8, + .bqual_length = 9}; + for (int i=0; i<8+9; i++) x.data[i] = 42+i; + CKERR(txn->xa_prepare(txn, &x)); + if (commit) + 
CKERR(txn->commit(txn, 0)); +} + +static void test1 (void) { + pid_t pid; + bool do_fork = true; + if (!do_fork || 0==(pid=fork())) { + DB_ENV *env; + setup_env_and_prepare(&env, ENVDIR, false); + { + XID l[1]; + long count=-1; + CKERR(env->txn_xa_recover(env, l, 1, &count, DB_FIRST)); + printf("%s:%d count=%ld\n", __FILE__, __LINE__, count); + assert(count==1); + assert(myformatid==l[0].formatID); + assert( 8==l[0].gtrid_length); + assert( 9==l[0].bqual_length); + for (int i=0; i<8+9; i++) { + assert(l[0].data[i]==42+i); + } + } + exit(0); + } + int status; + if (do_fork) { + pid_t pid2 = wait(&status); + assert(pid2==pid); + } + + DB_ENV *env2; + setup_env_and_prepare(&env2, ENVDIR2, true); + + // Now we can look at env2 in the debugger to see if we managed to make it the same + + DB_ENV *env; + setup_env(&env, ENVDIR); + + { + XID l[1]; + long count=-1; + { + int r = env->txn_xa_recover(env, l, 1, &count, DB_FIRST); + printf("r=%d count=%ld\n", r, count); + } + assert(count==1); + assert(l[0].data[0]==42); + assert(myformatid==l[0].formatID); + assert( 8 ==l[0].gtrid_length); + assert( 9 ==l[0].bqual_length); + for (int i=0; i<8+9; i++) { + assert(l[0].data[i]==42+i); + } + { + DB_TXN *txn; + int r = env->get_txn_from_xid(env, &l[0], &txn); + assert(r==0); + CHK(txn->commit(txn, 0)); + } + } + CHK(env2->close(env2, 0)); + CHK(env ->close(env, 0)); +} + +int test_main (int argc, char *const argv[]) { + default_parse_args(argc, argv); + // first test: open an environment, a db, a txn, and do a prepare. Then do txn_prepare (without even closing the environment). + test1(); + + + // second test: open environment, a db, a txn, prepare, close the environment. Then reopen and do txn_prepare. + + // third test: make sure there is an fsync on txn_prepare, but not on the following commit. + + + // Then close the environment. Find out what BDB does when asked for the txn prepares. + // Other tests: read prepared txns, 1 at a time. Then close it and read them again. 
+ return 0; +} diff --git a/src/ydb.c b/src/ydb.c index cee5f0c0a3e0e85facb783c899e8ec9541f2d23d..48afb81c4f0c96e774f81de9135d0c6c29c1b138 100644 --- a/src/ydb.c +++ b/src/ydb.c @@ -1499,13 +1499,61 @@ locked_env_close(DB_ENV * env, u_int32_t flags) { } static int -toku_env_recover_txn (DB_ENV *env, DB_PREPLIST preplist[/*count*/], long count, /*out*/ long *retp, u_int32_t flags) { - return toku_logger_recover_txn(env->i->logger, preplist, count, retp, flags); +toku_env_txn_xa_recover (DB_ENV *env, XID xids[/*count*/], long count, /*out*/ long *retp, u_int32_t flags) { + struct tokulogger_preplist *MALLOC_N(count,preps); + int r = toku_logger_recover_txn(env->i->logger, preps, count, retp, flags); + if (r==0) { + assert(*retp<=count); + for (int i=0; i<*retp; i++) { + xids[i] = preps[i].xid; + } + } + toku_free(preps); + return r; } +static int +toku_env_txn_recover (DB_ENV *env, DB_PREPLIST preplist[/*count*/], long count, /*out*/ long *retp, u_int32_t flags) { + struct tokulogger_preplist *MALLOC_N(count,preps); + int r = toku_logger_recover_txn(env->i->logger, preps, count, retp, flags); + if (r==0) { + assert(*retp<=count); + for (int i=0; i<*retp; i++) { + preplist[i].txn = preps[i].txn; + memcpy(preplist[i].gid, preps[i].xid.data, preps[i].xid.gtrid_length + preps[i].xid.bqual_length); + } + } + toku_free(preps); + return r; +} + + static int locked_env_txn_recover (DB_ENV *env, DB_PREPLIST preplist[/*count*/], long count, /*out*/ long *retp, u_int32_t flags) { - toku_ydb_lock(); int r = toku_env_recover_txn(env, preplist, count, retp, flags); toku_ydb_unlock(); return r; + toku_ydb_lock(); + int r = toku_env_txn_recover(env, preplist, count, retp, flags); + toku_ydb_unlock(); + return r; +} +static int +locked_env_txn_xa_recover (DB_ENV *env, XID xids[/*count*/], long count, /*out*/ long *retp, u_int32_t flags) { + toku_ydb_lock(); + int r = toku_env_txn_xa_recover(env, xids, count, retp, flags); + toku_ydb_unlock(); + return r; +} + +static int 
+toku_env_get_txn_from_xid (DB_ENV *env, /*in*/XID *xid, /*out*/ DB_TXN **txnp) { + return toku_logger_get_txn_from_xid(env->i->logger, xid, txnp); +} + +static int +locked_env_get_txn_from_xid (DB_ENV *env, /*in*/XID *xid, /*out*/ DB_TXN **txnp) { + toku_ydb_lock(); + int r = toku_env_get_txn_from_xid(env, xid, txnp); + toku_ydb_unlock(); + return r; } static int @@ -2372,6 +2420,8 @@ toku_env_create(DB_ENV ** envp, u_int32_t flags) { SENV(open); SENV(close); SENV(txn_recover); + SENV(txn_xa_recover); + SENV(get_txn_from_xid); SENV(log_flush); //SENV(set_noticecall); SENV(set_flags); diff --git a/src/ydb_txn.c b/src/ydb_txn.c index 9ea9722d53489ab56f852424f8351d9e29ccb4c1..76ae6a5b6e67e7f8bc2a7aec05f0bb0bc3c9b9a9 100644 --- a/src/ydb_txn.c +++ b/src/ydb_txn.c @@ -8,6 +8,7 @@ #include "checkpoint.h" #include "log_header.h" #include "ydb_txn.h" +#include <valgrind/helgrind.h> static int toku_txn_release_locks(DB_TXN* txn) { @@ -216,7 +217,7 @@ toku_txn_abort_only(DB_TXN * txn, } static int -toku_txn_prepare (DB_TXN *txn, u_int8_t gid[DB_GID_SIZE]) { +toku_txn_xa_prepare (DB_TXN *txn, XID *xid) { if (!txn) return EINVAL; if (txn->parent) return EINVAL; HANDLE_PANICKED_ENV(txn->mgrp); @@ -232,8 +233,7 @@ toku_txn_prepare (DB_TXN *txn, u_int8_t gid[DB_GID_SIZE]) { } assert(!db_txn_struct_i(txn)->child); TOKUTXN ttxn = db_txn_struct_i(txn)->tokutxn; - GID gids = {gid}; - int r = toku_txn_prepare_txn(ttxn, gids); + int r = toku_txn_prepare_txn(ttxn, xid); TOKULOGGER logger = txn->mgrp->i->logger; LSN do_fsync_lsn; bool do_fsync; @@ -242,6 +242,17 @@ toku_txn_prepare (DB_TXN *txn, u_int8_t gid[DB_GID_SIZE]) { return r; } +static int +toku_txn_prepare (DB_TXN *txn, u_int8_t gid[DB_GID_SIZE]) { + XID xid; + ANNOTATE_NEW_MEMORY(&xid, sizeof(xid)); + xid.formatID=0x756b6f54; // "Toku" + xid.gtrid_length=DB_GID_SIZE/2; // The maximum allowed gtrid length is 64. See the XA spec in source:/import/opengroup.org/C193.pdf page 20. 
+ xid.bqual_length=DB_GID_SIZE/2; // The maximum allowed bqual length is 64. + memcpy(xid.data, gid, DB_GID_SIZE); + return toku_txn_xa_prepare(txn, &xid); +} + int toku_txn_abort(DB_TXN * txn, TXN_PROGRESS_POLL_FUNCTION poll, void* poll_extra, @@ -324,6 +335,11 @@ locked_txn_prepare (DB_TXN *txn, u_int8_t gid[DB_GID_SIZE]) { toku_ydb_lock(); int r = toku_txn_prepare (txn, gid); toku_ydb_unlock(); return r; } +static int +locked_txn_xa_prepare (DB_TXN *txn, XID *xid) { + toku_ydb_lock(); int r = toku_txn_xa_prepare (txn, xid); toku_ydb_unlock(); return r; +} + int toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags, bool internal, bool holds_ydb_lock) { HANDLE_PANICKED_ENV(env); @@ -420,6 +436,7 @@ toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags, bool STXN(commit_with_progress); STXN(id); STXN(prepare); + STXN(xa_prepare); STXN(txn_stat); #undef STXN