Commit cbabd7c5 authored by Yoni Fogel's avatar Yoni Fogel

Addresses #1866 List of live transactions (in logger) is now an omt instead of a linked list.

refs [t:1866]

git-svn-id: file:///svn/toku/tokudb@13604 c7de825b-a66e-492c-adef-691d508d4ae1
parent da83ebbe
...@@ -14,7 +14,6 @@ ...@@ -14,7 +14,6 @@
#include "list.h" #include "list.h"
#include "mempool.h" #include "mempool.h"
#include "kv-pair.h" #include "kv-pair.h"
typedef void *OMTVALUE;
#include "omt.h" #include "omt.h"
#include "leafentry.h" #include "leafentry.h"
#include "block_table.h" #include "block_table.h"
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include "log_header.h" #include "log_header.h"
#include "checkpoint.h" #include "checkpoint.h"
#include "minicron.h" #include "minicron.h"
#include "log-internal.h"
#if !defined(TOKU_CACHETABLE_DO_EVICT_FROM_WRITER) #if !defined(TOKU_CACHETABLE_DO_EVICT_FROM_WRITER)
#error #error
...@@ -1520,8 +1521,9 @@ int toku_cachetable_unpin_and_remove (CACHEFILE cachefile, CACHEKEY key) { ...@@ -1520,8 +1521,9 @@ int toku_cachetable_unpin_and_remove (CACHEFILE cachefile, CACHEKEY key) {
} }
static int static int
log_open_txn (TOKULOGGER logger, TOKUTXN txn, void *UU(v)) log_open_txn (OMTVALUE txnv, u_int32_t UU(index), void *loggerv) {
{ TOKUTXN txn = txnv;
TOKULOGGER logger = loggerv;
if (toku_logger_txn_parent(txn)==NULL) { // only have to log the open root transactions if (toku_logger_txn_parent(txn)==NULL) { // only have to log the open root transactions
int r = toku_log_xstillopen(logger, NULL, 0, int r = toku_log_xstillopen(logger, NULL, 0,
toku_txn_get_txnid(txn), toku_txn_get_txnid(txn),
...@@ -1575,7 +1577,7 @@ toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) { ...@@ -1575,7 +1577,7 @@ toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) {
} }
// Log all the open transactions // Log all the open transactions
{ {
int r = toku_logger_iterate_over_live_txns (logger, log_open_txn, NULL); int r = toku_omt_iterate(logger->live_txns, log_open_txn, logger);
assert(r==0); assert(r==0);
} }
// Log all the open files // Log all the open files
......
...@@ -65,7 +65,7 @@ struct tokulogger { ...@@ -65,7 +65,7 @@ struct tokulogger {
// To access these, you must have the input lock // To access these, you must have the input lock
struct logbytes *head,*tail; struct logbytes *head,*tail;
LSN lsn; // the next available lsn LSN lsn; // the next available lsn
struct list live_txns; // just a linked list. Should be a hashtable. OMT live_txns; // a sorted tree. Old comment said should be a hashtable. Do we still want that?
int n_in_buf; int n_in_buf;
// To access these, you must have the output lock // To access these, you must have the output lock
...@@ -95,7 +95,6 @@ struct tokutxn { ...@@ -95,7 +95,6 @@ struct tokutxn {
LSN last_lsn; /* Everytime anything is logged, update the LSN. (We need to atomically record the LSN along with writing into the log.) */ LSN last_lsn; /* Everytime anything is logged, update the LSN. (We need to atomically record the LSN along with writing into the log.) */
LSN first_lsn; /* The first lsn in the transaction. */ LSN first_lsn; /* The first lsn in the transaction. */
struct roll_entry *oldest_logentry,*newest_logentry; /* Only logentries with rollbacks are here. There is a list going from newest to oldest. */ struct roll_entry *oldest_logentry,*newest_logentry; /* Only logentries with rollbacks are here. There is a list going from newest to oldest. */
struct list live_txns_link;
MEMARENA rollentry_arena; MEMARENA rollentry_arena;
......
...@@ -24,19 +24,21 @@ int toku_logger_create (TOKULOGGER *resultp) { ...@@ -24,19 +24,21 @@ int toku_logger_create (TOKULOGGER *resultp) {
result->lg_max = 100<<20; // 100MB default result->lg_max = 100<<20; // 100MB default
result->head = result->tail = 0; result->head = result->tail = 0;
result->lsn = result->written_lsn = result->fsynced_lsn = (LSN){0}; result->lsn = result->written_lsn = result->fsynced_lsn = (LSN){0};
list_init(&result->live_txns); r = toku_omt_create(&result->live_txns); if (r!=0) goto died0;
result->n_in_buf=0; result->n_in_buf=0;
result->n_in_file=0; result->n_in_file=0;
result->directory=0; result->directory=0;
result->checkpoint_lsn=(LSN){0}; result->checkpoint_lsn=(LSN){0};
result->write_block_size = BRT_DEFAULT_NODE_SIZE; // default logging size is the same as the default brt block size result->write_block_size = BRT_DEFAULT_NODE_SIZE; // default logging size is the same as the default brt block size
*resultp=result; *resultp=result;
r = ml_init(&result->input_lock); if (r!=0) goto died0; r = ml_init(&result->input_lock); if (r!=0) goto died1;
r = ml_init(&result->output_lock); if (r!=0) goto died1; r = ml_init(&result->output_lock); if (r!=0) goto died2;
return 0; return 0;
died1: died2:
ml_destroy(&result->input_lock); ml_destroy(&result->input_lock);
died1:
toku_omt_destroy(&result->live_txns);
died0: died0:
toku_free(result); toku_free(result);
return r; return r;
...@@ -131,6 +133,7 @@ int toku_logger_close(TOKULOGGER *loggerp) { ...@@ -131,6 +133,7 @@ int toku_logger_close(TOKULOGGER *loggerp) {
r = ml_destroy(&logger->input_lock); if (r!=0) goto panic; r = ml_destroy(&logger->input_lock); if (r!=0) goto panic;
logger->is_panicked=1; // Just in case this might help. logger->is_panicked=1; // Just in case this might help.
if (logger->directory) toku_free(logger->directory); if (logger->directory) toku_free(logger->directory);
toku_omt_destroy(&logger->live_txns);
toku_free(logger); toku_free(logger);
*loggerp=0; *loggerp=0;
if (locked_logger) { if (locked_logger) {
...@@ -728,20 +731,36 @@ TOKULOGGER toku_txn_logger (TOKUTXN txn) { ...@@ -728,20 +731,36 @@ TOKULOGGER toku_txn_logger (TOKUTXN txn) {
return txn ? txn->logger : 0; return txn ? txn->logger : 0;
} }
//Heaviside function to search through an OMT by a TXNID
static int
find_by_xid (OMTVALUE v, void *txnidv) {
TOKUTXN txn = v;
TXNID txnidfind = *(TXNID*)txnidv;
if (txn->txnid64<txnidfind) return -1;
if (txn->txnid64>txnidfind) return +1;
return 0;
}
int toku_txnid2txn (TOKULOGGER logger, TXNID txnid, TOKUTXN *result) { int toku_txnid2txn (TOKULOGGER logger, TXNID txnid, TOKUTXN *result) {
if (logger==0) return -1; if (logger==NULL) return -1;
struct list *l;
for (l = list_head(&logger->live_txns); l != &logger->live_txns; l = l->next) { OMTVALUE txnfound;
TOKUTXN txn = list_struct(l, struct tokutxn, live_txns_link); int rval;
assert(txn->tag==TYP_TOKUTXN); int r = toku_omt_find_zero(logger->live_txns, find_by_xid, &txnid, &txnfound, NULL, NULL);
if (txn->txnid64==txnid) { if (r==0) {
*result = txn; TOKUTXN txn = txnfound;
return 0; assert(txn->tag==TYP_TOKUTXN);
} assert(txn->txnid64==txnid);
*result = txn;
rval = 0;
} }
// If there is no txn, then we treat it as the null txn. else {
*result = 0; assert(r==DB_NOTFOUND);
return 0; // If there is no txn, then we treat it as the null txn.
*result = NULL;
rval = 0;
}
return rval;
} }
int toku_set_func_fsync (int (*fsync_function)(int)) { int toku_set_func_fsync (int (*fsync_function)(int)) {
...@@ -793,16 +812,7 @@ int toku_logger_log_archive (TOKULOGGER logger, char ***logs_p, int flags) { ...@@ -793,16 +812,7 @@ int toku_logger_log_archive (TOKULOGGER logger, char ***logs_p, int flags) {
// get them into increasing order // get them into increasing order
qsort(all_logs, all_n_logs, sizeof(all_logs[0]), logfilenamecompare); qsort(all_logs, all_n_logs, sizeof(all_logs[0]), logfilenamecompare);
LSN oldest_live_txn_lsn={1LL<<63}; LSN oldest_live_txn_lsn={.lsn = oldest_living_xid};
{
struct list *l;
for (l=list_head(&logger->live_txns); l!=&logger->live_txns; l=l->next) {
TOKUTXN txn = list_struct(l, struct tokutxn, live_txns_link);
if (oldest_live_txn_lsn.lsn>txn->txnid64) {
oldest_live_txn_lsn.lsn=txn->txnid64;
}
}
}
//printf("%s:%d Oldest txn is %lld\n", __FILE__, __LINE__, (long long)oldest_live_txn_lsn.lsn); //printf("%s:%d Oldest txn is %lld\n", __FILE__, __LINE__, (long long)oldest_live_txn_lsn.lsn);
// Now starting at the last one, look for archivable ones. // Now starting at the last one, look for archivable ones.
...@@ -854,16 +864,6 @@ int toku_logger_log_archive (TOKULOGGER logger, char ***logs_p, int flags) { ...@@ -854,16 +864,6 @@ int toku_logger_log_archive (TOKULOGGER logger, char ***logs_p, int flags) {
} }
int toku_logger_iterate_over_live_txns (TOKULOGGER logger, int (*f)(TOKULOGGER, TOKUTXN, void*), void *v) {
struct list *l;
for (l = list_head(&logger->live_txns); l != &logger->live_txns; l = l->next) {
TOKUTXN txn = list_struct(l, struct tokutxn, live_txns_link);
int r = f(logger, txn, v);
if (r!=0) return r;
}
return 0;
}
TOKUTXN toku_logger_txn_parent (TOKUTXN txn) { TOKUTXN toku_logger_txn_parent (TOKUTXN txn) {
return txn->parent; return txn->parent;
} }
......
...@@ -65,7 +65,6 @@ int toku_txnid2txn (TOKULOGGER logger, TXNID txnid, TOKUTXN *result); ...@@ -65,7 +65,6 @@ int toku_txnid2txn (TOKULOGGER logger, TXNID txnid, TOKUTXN *result);
//int toku_set_func_fsync (int (*fsync_function)(int)); //int toku_set_func_fsync (int (*fsync_function)(int));
int toku_logger_log_archive (TOKULOGGER logger, char ***logs_p, int flags); int toku_logger_log_archive (TOKULOGGER logger, char ***logs_p, int flags);
int toku_logger_iterate_over_live_txns (TOKULOGGER logger, int (*f)(TOKULOGGER, TOKUTXN, void*), void *v);
TOKUTXN toku_logger_txn_parent (TOKUTXN txn); TOKUTXN toku_logger_txn_parent (TOKUTXN txn);
void toku_logger_note_checkpoint(TOKULOGGER logger, LSN lsn); void toku_logger_note_checkpoint(TOKULOGGER logger, LSN lsn);
......
...@@ -11,7 +11,6 @@ ...@@ -11,7 +11,6 @@
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
typedef void *OMTVALUE;
#include "toku_assert.h" #include "toku_assert.h"
#include "memory.h" #include "memory.h"
#include "omt.h" #include "omt.h"
......
...@@ -147,6 +147,7 @@ ...@@ -147,6 +147,7 @@
// The programming API: // The programming API:
//typedef struct value *OMTVALUE; // A slight improvement over using void*. //typedef struct value *OMTVALUE; // A slight improvement over using void*.
typedef void *OMTVALUE;
typedef struct omt *OMT; typedef struct omt *OMT;
typedef struct omt_cursor *OMTCURSOR; typedef struct omt_cursor *OMTCURSOR;
......
...@@ -30,22 +30,36 @@ void toku_rollback_txn_close (TOKUTXN txn) { ...@@ -30,22 +30,36 @@ void toku_rollback_txn_close (TOKUTXN txn) {
toku_free(txn->rollentry_filename); toku_free(txn->rollentry_filename);
} }
list_remove(&txn->live_txns_link); {
//Remove txn from list (omt) of live transactions
OMTVALUE txnagain;
u_int32_t idx;
int r;
r = toku_omt_find_zero(txn->logger->live_txns, find_xid, txn, &txnagain, &idx, NULL);
assert(r==0);
assert(txn==txnagain);
r = toku_omt_delete_at(txn->logger->live_txns, idx);
assert(r==0);
}
assert(oldest_living_xid <= txn->txnid64); assert(oldest_living_xid <= txn->txnid64);
assert(oldest_living_xid < MAX_TXNID); assert(oldest_living_xid < MAX_TXNID);
if (txn->txnid64 == oldest_living_xid) { if (txn->txnid64 == oldest_living_xid) {
TOKULOGGER logger = txn->logger; TOKULOGGER logger = txn->logger;
LSN oldest_live_txn_lsn = {MAX_TXNID};
{ OMTVALUE oldest_txnv;
struct list *l; int r = toku_omt_fetch(logger->live_txns, 0, &oldest_txnv, NULL);
for (l=list_head(&logger->live_txns); l!=&logger->live_txns; l=l->next) { if (r==0) {
TOKUTXN live_txn = list_struct(l, struct tokutxn, live_txns_link); TOKUTXN oldest_txn = oldest_txnv;
if (oldest_live_txn_lsn.lsn>live_txn->txnid64) { assert(oldest_txn != txn); // We just removed it
oldest_live_txn_lsn.lsn=live_txn->txnid64; assert(oldest_txn->txnid64 > oldest_living_xid); //Must be newer than the previous oldest
} oldest_living_xid = oldest_txn->txnid64;
} }
else {
//No living transactions
assert(r==EINVAL);
oldest_living_xid = MAX_TXNID;
} }
oldest_living_xid = oldest_live_txn_lsn.lsn;
} }
note_txn_closing(txn); note_txn_closing(txn);
...@@ -183,7 +197,6 @@ int toku_rollback_abort(TOKUTXN txn, YIELDF yield, void*yieldv) { ...@@ -183,7 +197,6 @@ int toku_rollback_abort(TOKUTXN txn, YIELDF yield, void*yieldv) {
count++; count++;
if (count%2 == 0) yield(NULL, yieldv); if (count%2 == 0) yield(NULL, yieldv);
} }
list_remove(&txn->live_txns_link);
// Read stuff out of the file and roll it back. // Read stuff out of the file and roll it back.
if (txn->rollentry_filename) { if (txn->rollentry_filename) {
r = toku_rollback_fileentries(txn->rollentry_fd, txn, yield, yieldv); r = toku_rollback_fileentries(txn->rollentry_fd, txn, yield, yieldv);
...@@ -288,8 +301,8 @@ int toku_read_rollback_backwards(BREAD br, struct roll_entry **item, MEMARENA ma ...@@ -288,8 +301,8 @@ int toku_read_rollback_backwards(BREAD br, struct roll_entry **item, MEMARENA ma
return 0; return 0;
} }
//Heaviside function to find a TOKUTXN by TOKUTXN (used to find the index)
static int find_xid (OMTVALUE v, void *txnv) { int find_xid (OMTVALUE v, void *txnv) {
TOKUTXN txn = v; TOKUTXN txn = v;
TOKUTXN txnfind = txnv; TOKUTXN txnfind = txnv;
if (txn->txnid64<txnfind->txnid64) return -1; if (txn->txnid64<txnfind->txnid64) return -1;
......
...@@ -5,6 +5,8 @@ ...@@ -5,6 +5,8 @@
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved." #ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it." #ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "omt.h"
// these routines in rollback.c // these routines in rollback.c
int toku_rollback_commit(TOKUTXN txn, YIELDF yield, void*yieldv); int toku_rollback_commit(TOKUTXN txn, YIELDF yield, void*yieldv);
...@@ -30,4 +32,7 @@ int toku_txn_find_by_xid (BRT brt, TXNID xid, TOKUTXN *txnptr); ...@@ -30,4 +32,7 @@ int toku_txn_find_by_xid (BRT brt, TXNID xid, TOKUTXN *txnptr);
int toku_rollback_fileentries (int fd, TOKUTXN txn, YIELDF yield, void *yieldv); int toku_rollback_fileentries (int fd, TOKUTXN txn, YIELDF yield, void *yieldv);
int toku_commit_fileentries (int fd, TOKUTXN txn, YIELDF yield,void *yieldv); int toku_commit_fileentries (int fd, TOKUTXN txn, YIELDF yield,void *yieldv);
//Heaviside function to find a TOKUTXN by TOKUTXN (used to find the index)
int find_xid (OMTVALUE v, void *txnv);
#endif #endif
...@@ -4,7 +4,6 @@ ...@@ -4,7 +4,6 @@
#include "test.h" #include "test.h"
#include <stdio.h> #include <stdio.h>
typedef void *OMTVALUE;
#include "omt.h" #include "omt.h"
#include "memory.h" #include "memory.h"
#include "toku_assert.h" #include "toku_assert.h"
......
...@@ -6,7 +6,6 @@ ...@@ -6,7 +6,6 @@
#include <errno.h> #include <errno.h>
#include <stdio.h> #include <stdio.h>
typedef void *OMTVALUE;
#include "omt.h" #include "omt.h"
#include "memory.h" #include "memory.h"
#include "toku_assert.h" #include "toku_assert.h"
......
...@@ -14,18 +14,20 @@ int toku_txn_begin_txn (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER log ...@@ -14,18 +14,20 @@ int toku_txn_begin_txn (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER log
if (result==0) if (result==0)
return errno; return errno;
int r; int r;
r = toku_log_xbegin(logger, &result->first_lsn, 0, parent_tokutxn ? parent_tokutxn->txnid64 : 0); if (0) {
if (r!=0) { died0:
toku_logger_panic(logger, r); toku_logger_panic(logger, r);
toku_free(result); toku_free(result);
return r; return r;
} }
r = toku_log_xbegin(logger, &result->first_lsn, 0, parent_tokutxn ? parent_tokutxn->txnid64 : 0);
if (r!=0) goto died0;
r = toku_omt_create(&result->open_brts); r = toku_omt_create(&result->open_brts);
if (r!=0) { if (r!=0) goto died0;
if (0) {
died1: died1:
toku_logger_panic(logger, r); toku_omt_destroy(&result->open_brts);
toku_free(result); goto died0;
return r;
} }
result->txnid64 = result->first_lsn.lsn; result->txnid64 = result->first_lsn.lsn;
XIDS parent_xids; XIDS parent_xids;
...@@ -35,19 +37,36 @@ int toku_txn_begin_txn (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER log ...@@ -35,19 +37,36 @@ int toku_txn_begin_txn (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER log
parent_xids = parent_tokutxn->xids; parent_xids = parent_tokutxn->xids;
if ((r=xids_create_child(parent_xids, &result->xids, result->txnid64))) if ((r=xids_create_child(parent_xids, &result->xids, result->txnid64)))
goto died1; goto died1;
if (0) {
died2:
xids_destroy(&result->xids);
goto died1;
}
result->logger = logger; result->logger = logger;
result->parent = parent_tokutxn; result->parent = parent_tokutxn;
result->oldest_logentry = result->newest_logentry = 0; result->oldest_logentry = result->newest_logentry = 0;
result->rollentry_arena = memarena_create(); result->rollentry_arena = memarena_create();
if (list_empty(&logger->live_txns)) { if (toku_omt_size(logger->live_txns) == 0) {
assert(oldest_living_xid == MAX_TXNID); assert(oldest_living_xid == MAX_TXNID);
oldest_living_xid = result->txnid64; oldest_living_xid = result->txnid64;
} }
assert(oldest_living_xid < MAX_TXNID); assert(oldest_living_xid < MAX_TXNID);
assert(oldest_living_xid <= result->txnid64); assert(oldest_living_xid <= result->txnid64);
list_push(&logger->live_txns, &result->live_txns_link);
{
//Add txn to list (omt) of live transactions
u_int32_t idx;
r = toku_omt_insert(logger->live_txns, result, find_xid, result, &idx);
if (r!=0) goto died2;
if (oldest_living_xid == result->txnid64)
assert(idx == 0);
else
assert(idx > 0);
}
result->rollentry_resident_bytecount=0; result->rollentry_resident_bytecount=0;
result->rollentry_raw_count = 0; result->rollentry_raw_count = 0;
result->rollentry_filename = 0; result->rollentry_filename = 0;
......
...@@ -18,7 +18,6 @@ ...@@ -18,7 +18,6 @@
#include <stdlib.h> #include <stdlib.h>
#include <toku_stdint.h> #include <toku_stdint.h>
#include <string.h> #include <string.h>
typedef toku_range *OMTVALUE;
#include "../../newbrt/omt.h" #include "../../newbrt/omt.h"
struct __toku_range_tree_local { struct __toku_range_tree_local {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment