Commit 69d0ba12 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

If fsync fails on a log commit, panic the DB. (We don't know if the...

If fsync fails on a log commit, panic the DB.  (We don't know if the transaction committed or failed without doing recovery.)
Start work on saving the undo records in main memory for rollback.
Addresses #27 (recovery) 
Addresses #253 (rollback)


git-svn-id: file:///svn/tokudb@1561 c7de825b-a66e-492c-adef-691d508d4ae1
parent 3465f1bf
......@@ -9,6 +9,9 @@
#define LOGGER_BUF_SIZE (1<<24)
struct tokulogger {
int is_open;
int is_panicked;
int panic_errno;
enum typ_tag tag;
char *directory;
int fd;
......@@ -40,6 +43,7 @@ struct tokutxn {
TOKULOGGER logger;
TOKUTXN parent;
LSN last_lsn; /* Everytime anything is logged, update the LSN. (We need to atomically record the LSN along with writing into the log.) */
struct log_entry *oldest_logentry,*newest_logentry;
};
int toku_logger_finish (TOKULOGGER logger, struct wbuf *wbuf);
......
......@@ -64,33 +64,50 @@ int toku_logger_find_logfiles (const char *directory, int *n_resultsp, char ***r
return closedir(d);
}
int toku_logger_create_and_open_logger (const char *directory, TOKULOGGER *resultp) {
int toku_logger_create (TOKULOGGER *resultp) {
TAGMALLOC(TOKULOGGER, result);
if (result==0) return -1;
if (result==0) return errno;
result->is_open=0;
result->is_panicked=0;
*resultp=result;
return 0;
}
int toku_logger_open (const char *directory, TOKULOGGER logger) {
if (logger->is_open) return EINVAL;
if (logger->is_panicked) return EINVAL;
int r;
long long nexti;
r = toku_logger_find_next_unused_log_file(directory, &nexti);
if (r!=0) {
died0:
toku_free(result);
return r;
}
result->directory = toku_strdup(directory);
if (result->directory==0) goto died0;
result->fd = -1;
result->next_log_file_number = nexti;
result->n_in_buf = 0;
logger->directory = toku_strdup(directory);
if (logger->directory==0) goto died0;
logger->fd = -1;
logger->next_log_file_number = nexti;
logger->n_in_buf = 0;
result->lsn.lsn = 0; // WRONG!!! This should actually be calculated by looking at the log file.
logger->lsn.lsn = 0; // WRONG!!! This should actually be calculated by looking at the log file.
*resultp=result;
return toku_logger_log_bytes(result, 0, "");
return toku_logger_log_bytes(logger, 0, "");
}
void toku_logger_panic (TOKULOGGER logger, int err) {
logger->panic_errno=err;
logger->is_panicked=1;
}
int toku_logger_panicked(TOKULOGGER logger) {
if (logger==0) return 0;
return logger->is_panicked;
}
static int log_format_version=0;
int toku_logger_log_bytes(TOKULOGGER logger, int nbytes, void *bytes) {
int r;
if (logger->is_panicked) return EINVAL;
//fprintf(stderr, "%s:%d logging %d bytes\n", __FILE__, __LINE__, nbytes);
if (logger->fd==-1) {
int fnamelen = strlen(logger->directory)+50;
......@@ -129,9 +146,11 @@ int toku_logger_log_bytes(TOKULOGGER logger, int nbytes, void *bytes) {
return 0;
}
int toku_logger_log_close(TOKULOGGER *loggerp) {
int toku_logger_close(TOKULOGGER *loggerp) {
TOKULOGGER logger = *loggerp;
if (logger->is_panicked) return EINVAL;
int r = 0;
if (!logger->is_open) goto is_closed;
if (logger->fd!=-1) {
//printf("%s:%d closing log: n_in_buf=%d\n", __FILE__, __LINE__, logger->n_in_buf);
if (logger->n_in_buf>0) {
......@@ -141,6 +160,8 @@ int toku_logger_log_close(TOKULOGGER *loggerp) {
r = close(logger->fd);
}
toku_free(logger->directory);
is_closed:
logger->is_panicked=1; // Just in case this might help.
toku_free(logger);
*loggerp=0;
return r;
......@@ -160,6 +181,7 @@ n
int toku_logger_fsync (TOKULOGGER logger) {
//return 0;/// NO TXN
//fprintf(stderr, "%s:%d syncing log\n", __FILE__, __LINE__);
if (logger->is_panicked) return EINVAL;
if (logger->n_in_buf>0) {
int r = write(logger->fd, logger->buf, logger->n_in_buf);
if (r==-1) return errno;
......@@ -173,6 +195,7 @@ int toku_logger_fsync (TOKULOGGER logger) {
}
int toku_logger_finish (TOKULOGGER logger, struct wbuf *wbuf) {
if (logger->is_panicked) return EINVAL;
wbuf_int(wbuf, toku_crc32(0, wbuf->buf, wbuf->ndone));
wbuf_int(wbuf, 4+wbuf->ndone);
return toku_logger_log_bytes(logger, wbuf->ndone, wbuf->buf);
......@@ -187,6 +210,7 @@ int toku_logger_log_brt_insert_with_no_overwrite (TOKULOGGER logger,
int keylen,
unsigned char *val,
int vallen) {
if (logger->is_panicked) return EINVAL;
printf("%s:%d\n", __FILE__, __LINE__);
return 0;
int buflen=(keylen+vallen+4+4 // key and value
......@@ -213,6 +237,7 @@ int toku_logger_log_brt_insert_with_no_overwrite (TOKULOGGER logger,
int toku_logger_log_phys_add_or_delete_in_leaf (DB *db, TOKUTXN txn, DISKOFF diskoff, int is_add, const struct kv_pair *pair) {
assert(is_add==0);
if (txn==0) return 0;
if (txn->logger->is_panicked) return EINVAL;
assert(db);
int keylen = pair->keylen;
int vallen = pair->vallen;
......@@ -239,12 +264,14 @@ int toku_logger_log_phys_add_or_delete_in_leaf (DB *db, TOKUTXN txn, DISKOFF dis
}
int toku_logger_commit (TOKUTXN txn, int nosync) {
// panic handled in log_commit
int r = toku_log_commit(txn, txn->txnid64, nosync);
toku_free(txn);
return r;
}
int toku_logger_log_checkpoint (TOKULOGGER logger, LSN *lsn) {
if (logger->is_panicked) return EINVAL;
struct wbuf wbuf;
const int buflen =10;
unsigned char buf[buflen];
......@@ -258,6 +285,7 @@ int toku_logger_log_checkpoint (TOKULOGGER logger, LSN *lsn) {
}
int toku_logger_txn_begin (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TXNID txnid64, TOKULOGGER logger) {
if (logger->is_panicked) return EINVAL;
TAGMALLOC(TOKUTXN, result);
if (result==0) return errno;
result->txnid64 = txnid64;
......@@ -268,6 +296,7 @@ int toku_logger_txn_begin (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TXNID txnid
}
int toku_logger_log_block_rename (TOKULOGGER logger, FILENUM fileid, DISKOFF olddiskoff, DISKOFF newdiskoff, DISKOFF parentdiskoff, int childnum) {
if (logger->is_panicked) return EINVAL;
const int buflen=(+1 // log command
+8 // lsn
+8 // fileid
......@@ -292,6 +321,8 @@ int toku_logger_log_block_rename (TOKULOGGER logger, FILENUM fileid, DISKOFF old
}
int toku_logger_log_fcreate (TOKUTXN txn, const char *fname, int mode) {
if (txn==0) return 0;
if (txn->logger->is_panicked) return EINVAL;
BYTESTRING bs;
bs.len = strlen(fname);
bs.data = (char*)fname;
......@@ -300,6 +331,8 @@ int toku_logger_log_fcreate (TOKUTXN txn, const char *fname, int mode) {
/* fopen isn't really an action. It's just for bookkeeping. We need to know the filename that goes with a filenum. */
int toku_logger_log_fopen (TOKUTXN txn, const char * fname, FILENUM filenum) {
if (txn==0) return 0;
if (txn->logger->is_panicked) return EINVAL;
BYTESTRING bs;
bs.len = strlen(fname);
bs.data = (char*)fname;
......@@ -310,6 +343,7 @@ int toku_logger_log_fopen (TOKUTXN txn, const char * fname, FILENUM filenum) {
int toku_logger_log_unlink (TOKUTXN txn, const char *fname) {
if (txn==0) return 0;
if (txn->logger->is_panicked) return EINVAL;
const int fnamelen = strlen(fname);
const int buflen = (+1 // log command
+4 // length of fname
......@@ -325,6 +359,8 @@ int toku_logger_log_unlink (TOKUTXN txn, const char *fname) {
};
int toku_logger_log_header (TOKUTXN txn, FILENUM filenum, struct brt_header *h) {
if (txn==0) return 0;
if (txn->logger->is_panicked) return EINVAL;
#if 0
LOGGEDBRTHEADER lh;
lh.size = toku_serialize_brt_header_size(h);
......@@ -348,7 +384,6 @@ int toku_logger_log_header (TOKUTXN txn, FILENUM filenum, struct brt_header *h)
toku_free(all_that_stuff);
return r;
#else
if (txn==0) return 0;
int subsize=toku_serialize_brt_header_size(h);
int buflen = (4 // firstlen
+ 1 //cmd
......
......@@ -6,10 +6,13 @@
#include "../include/db.h"
#include "brttypes.h"
#include "kv-pair.h"
int toku_logger_create_and_open_logger (const char *directory, TOKULOGGER *resultp);
int toku_logger_create(TOKULOGGER */*resultp*/);
int toku_logger_open(const char */*directory*/, TOKULOGGER);
int toku_logger_log_bytes(TOKULOGGER logger, int nbytes, void *bytes);
int toku_logger_log_close(TOKULOGGER *logger);
int toku_logger_close(TOKULOGGER *logger);
int toku_logger_log_checkpoint (TOKULOGGER, LSN*);
void toku_logger_panic(TOKULOGGER, int/*err*/);
int toku_logger_panicked(TOKULOGGER /*logger*/);
int toku_logger_log_phys_add_or_delete_in_leaf (DB *db, TOKUTXN txn, DISKOFF diskoff, int is_add, const struct kv_pair *pair);
......
......@@ -138,6 +138,7 @@ void generate_log_struct (void) {
fprintf(hf, " union {\n");
DO_LOGTYPES(lt, fprintf(hf," struct logtype_%s %s;\n", lt->name, lt->name));
fprintf(hf, " } u;\n");
fprintf(hf, " struct log_entry *next; /* for in-memory list of log entries */\n");
fprintf(hf, "};\n");
}
......@@ -183,8 +184,18 @@ void generate_log_writer (void) {
if (lt->command=='C') {
fprintf(cf, " if (r!=0) return r;\n");
fprintf(cf, " // commit has some extra work to do.\n");
fprintf(cf, " if (txn->parent || nosync) return 0; // don't fsync if there is a parent.\n");
fprintf(cf, " else return toku_logger_fsync(txn->logger);\n");
fprintf(cf, " if (nosync) return 0;\n");
fprintf(cf, " if (txn->parent) { // do not fsync if there is a parent. Instead append the log entries onto the parent.\n");
fprintf(cf, " if (txn->parent->oldest_logentry) txn->parent->newest_logentry->next = txn->oldest_logentry;\n");
fprintf(cf, " else txn->parent->oldest_logentry = txn->oldest_logentry;\n");
fprintf(cf, " if (txn->newest_logentry) txn->parent->newest_logentry = txn->newest_logentry;\n");
fprintf(cf, " txn->newest_logentry = txn->oldest_logentry = 0;\n");
fprintf(cf, " } else {\n");
fprintf(cf, " while (txn->newest_logentry) { struct log_entry *next=txn->newest_logentry->next; toku_free(txn->newest_logentry); txn->newest_logentry=next; }\n");
fprintf(cf, " r = toku_logger_fsync(txn->logger);\n");
fprintf(cf, " if (r!=0) toku_logger_panic(txn->logger, r);\n");
fprintf(cf, " }\n");
fprintf(cf, " return 0;\n");
} else {
fprintf(cf, " return r;\n");
}
......
......@@ -45,6 +45,7 @@ typedef void (*toku_env_errcall_t)(const DB_ENV *, const char *, const char *);
#endif
struct __toku_db_env_internal {
int is_panicked;
int ref_count;
u_int32_t open_flags;
int open_mode;
......@@ -96,8 +97,16 @@ void toku_do_error_all_cases(const DB_ENV * env, int error, int include_stderrst
}
static int env_is_panicked(DB_ENV *dbenv) {
if (dbenv==0) return 0;
return dbenv->i->is_panicked || toku_logger_panicked(dbenv->i->logger);
}
#define HANDLE_PANICKED_ENV(env) ({ if (env_is_panicked(env)) return EINVAL; })
#define HANDLE_PANICKED_DB(db) HANDLE_PANICKED_ENV(db->dbenv)
// Handle all the error cases (but don't do the default thing.)
static int do_error (DB_ENV *dbenv, int error, const char *string, ...) {
if (toku_logger_panicked(dbenv->i->logger)) dbenv->i->is_panicked=1;
va_list ap;
va_start(ap, string);
toku_do_error_all_cases(dbenv, error, 1, 0, string, ap);
......@@ -115,7 +124,7 @@ static void toku_db_env_err(const DB_ENV * env, int error, const char *fmt, ...)
#define barf() ({ fprintf(stderr, "YDB: BARF %s:%d in %s\n", __FILE__, __LINE__, __func__); })
#define barff(fmt,...) ({ fprintf(stderr, "YDB: BARF %s:%d in %s, ", __FILE__, __LINE__, __func__); fprintf(stderr, fmt, __VA_ARGS__); })
#define note() ({ fprintf(stderr, "YDB: Note %s:%d in %s\n", __FILE__, __LINE__, __func__); })
#define note() ({ fprintf(svtderr, "YDB: Note %s:%d in %s\n", __FILE__, __LINE__, __func__); })
#define notef(fmt,...) ({ fprintf(stderr, "YDB: Note %s:%d in %s, ", __FILE__, __LINE__, __func__); fprintf(stderr, fmt, __VA_ARGS__); })
#if 0
......@@ -191,6 +200,7 @@ static int db_env_parse_config_line(DB_ENV* dbenv, char *command, char *value) {
}
static int db_env_read_config(DB_ENV *env) {
HANDLE_PANICKED_ENV(env);
const char* config_name = "DB_CONFIG";
char* full_name = NULL;
char* linebuffer = NULL;
......@@ -298,6 +308,7 @@ static int db_env_read_config(DB_ENV *env) {
}
static int toku_db_env_open(DB_ENV * env, const char *home, u_int32_t flags, int mode) {
HANDLE_PANICKED_ENV(env);
int r;
if (db_env_opened(env)) {
......@@ -357,16 +368,17 @@ static int toku_db_env_open(DB_ENV * env, const char *home, u_int32_t flags, int
if (flags & (DB_INIT_TXN | DB_INIT_LOG)) {
char* full_dir = NULL;
if (env->i->lg_dir) full_dir = construct_full_name(env->i->dir, env->i->lg_dir);
r = toku_logger_create_and_open_logger(
full_dir ? full_dir : env->i->dir, &env->i->logger);
if (full_dir) toku_free(full_dir);
assert(env->i->logger);
if (r!=0) {
do_error(env, r, "Could not open logger\n");
do_error(env, r, "Could not create logger\n");
goto died1;
}
if (0) {
r = toku_logger_open(full_dir ? full_dir : env->i->dir, env->i->logger);
if (full_dir) toku_free(full_dir);
if (r!=0) {
do_error(env, r, "Could not open logger\n");
died2:
toku_logger_log_close(&env->i->logger);
toku_logger_close(&env->i->logger);
goto died1;
}
}
......@@ -377,11 +389,13 @@ static int toku_db_env_open(DB_ENV * env, const char *home, u_int32_t flags, int
}
static int toku_db_env_close(DB_ENV * env, u_int32_t flags) {
// Even if the env is panicedk, try to close as much as we can.
int is_panicked = env_is_panicked(env);
int r0=0,r1=0;
if (env->i->cachetable)
r0=toku_cachetable_close(&env->i->cachetable);
if (env->i->logger)
r1=toku_logger_log_close(&env->i->logger);
r1=toku_logger_close(&env->i->logger);
if (env->i->data_dirs) {
u_int32_t i;
assert(env->i->n_data_dirs > 0);
......@@ -401,6 +415,7 @@ static int toku_db_env_close(DB_ENV * env, u_int32_t flags) {
if (flags!=0) return EINVAL;
if (r0) return r0;
if (r1) return r1;
if (is_panicked) return EINVAL;
return 0;
}
......@@ -411,12 +426,14 @@ static int toku_db_env_log_archive(DB_ENV * env, char **list[], u_int32_t flags)
}
static int toku_db_env_log_flush(DB_ENV * env, const DB_LSN * lsn) {
HANDLE_PANICKED_ENV(env);
env=env; lsn=lsn;
barf();
return 1;
}
static int toku_db_env_set_cachesize(DB_ENV * env, u_int32_t gbytes, u_int32_t bytes, int ncache __attribute__((__unused__))) {
HANDLE_PANICKED_ENV(env);
u_int64_t cs64 = ((u_int64_t) gbytes << 30) + bytes;
unsigned long cs = cs64;
if (cs64 > cs)
......@@ -428,6 +445,7 @@ static int toku_db_env_set_cachesize(DB_ENV * env, u_int32_t gbytes, u_int32_t b
#if DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR >= 3
static int toku_db_env_get_cachesize(DB_ENV * env, u_int32_t *gbytes, u_int32_t *bytes, int *ncache) {
HANDLE_PANICKED_ENV(env);
*gbytes = env->i->cachetable_size >> 30;
*bytes = env->i->cachetable_size & ((1<<30)-1);
*ncache = 1;
......@@ -437,6 +455,7 @@ static int toku_db_env_get_cachesize(DB_ENV * env, u_int32_t *gbytes, u_int32_t
#endif
static int toku_db_env_set_data_dir(DB_ENV * env, const char *dir) {
HANDLE_PANICKED_ENV(env);
u_int32_t i;
int r;
char** temp;
......@@ -487,6 +506,7 @@ static void toku_db_env_set_errpfx(DB_ENV * env, const char *errpfx) {
}
static int toku_db_env_set_flags(DB_ENV * env, u_int32_t flags, int onoff) {
HANDLE_PANICKED_ENV(env);
if (flags != 0 && onoff) {
return do_error(env, EINVAL, "TokuDB does not (yet) support any nonzero ENV flags\n");
}
......@@ -494,11 +514,13 @@ static int toku_db_env_set_flags(DB_ENV * env, u_int32_t flags, int onoff) {
}
static int toku_db_env_set_lg_bsize(DB_ENV * env, u_int32_t bsize) {
HANDLE_PANICKED_ENV(env);
bsize=bsize;
return do_error(env, EINVAL, "TokuDB does not (yet) support ENV->set_lg_bsize\n");
}
static int toku_db_env_set_lg_dir(DB_ENV * env, const char *dir) {
HANDLE_PANICKED_ENV(env);
if (db_env_opened(env)) {
return do_error(env, EINVAL, "Cannot set log dir after opening the env\n");
}
......@@ -515,18 +537,21 @@ static int toku_db_env_set_lg_dir(DB_ENV * env, const char *dir) {
}
static int toku_db_env_set_lg_max(DB_ENV * env, u_int32_t lg_max) {
HANDLE_PANICKED_ENV(env);
lg_max=lg_max;
return do_error(env, EINVAL, "TokuDB does not (yet) support set_lg_max\n");
}
static int toku_db_env_set_lk_detect(DB_ENV * env, u_int32_t detect) {
HANDLE_PANICKED_ENV(env);
detect=detect;
return do_error(env, EINVAL, "TokuDB does not (yet) support set_lk_detect\n");
}
#if DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR <= 4
static int toku_db_env_set_lk_max(DB_ENV * env, u_int32_t lk_max) {
env=env;lk_max=lk_max;
HANDLE_PANICKED_ENV(env);
lk_max=lk_max;
return 0;
}
#endif
......@@ -536,6 +561,7 @@ static int toku_db_env_set_lk_max(DB_ENV * env, u_int32_t lk_max) {
//}
static int toku_db_env_set_tmp_dir(DB_ENV * env, const char *tmp_dir) {
HANDLE_PANICKED_ENV(env);
if (db_env_opened(env)) {
return do_error(env, EINVAL, "Cannot set the tmp dir after opening an env\n");
}
......@@ -549,7 +575,8 @@ static int toku_db_env_set_tmp_dir(DB_ENV * env, const char *tmp_dir) {
}
static int toku_db_env_set_verbose(DB_ENV * env, u_int32_t which, int onoff) {
env=env; which=which; onoff=onoff;
HANDLE_PANICKED_ENV(env);
which=which; onoff=onoff;
return 1;
}
......@@ -559,7 +586,8 @@ static int toku_db_env_txn_checkpoint(DB_ENV * env, u_int32_t kbyte, u_int32_t m
}
static int toku_db_env_txn_stat(DB_ENV * env, DB_TXN_STAT ** statp, u_int32_t flags) {
env=env;statp=statp;flags=flags;
HANDLE_PANICKED_ENV(env);
statp=statp;flags=flags;
return 1;
}
......@@ -614,17 +642,29 @@ int db_env_create(DB_ENV ** envp, u_int32_t flags) {
return ENOMEM;
}
memset(result->i, 0, sizeof *result->i);
result->i->is_panicked=0;
result->i->ref_count = 1;
result->i->errcall = 0;
result->i->errpfx = 0;
result->i->errfile = 0;
{
int r = toku_logger_create(&result->i->logger);
if (r!=0) {
toku_free(result->i);
toku_free(result);
return r;
}
assert(result->i->logger);
}
ydb_add_ref();
*envp = result;
return 0;
}
static int toku_db_txn_commit(DB_TXN * txn, u_int32_t flags) {
HANDLE_PANICKED_ENV(txn->mgrp);
//notef("flags=%d\n", flags);
int r;
int nosync = (flags & DB_TXN_NOSYNC)!=0;
......@@ -645,7 +685,7 @@ static int toku_db_txn_commit(DB_TXN * txn, u_int32_t flags) {
}
static u_int32_t toku_db_txn_id(DB_TXN * txn) {
txn=txn;
HANDLE_PANICKED_ENV(txn->mgrp);
barf();
abort();
}
......@@ -653,11 +693,13 @@ static u_int32_t toku_db_txn_id(DB_TXN * txn) {
static TXNID next_txn = 0;
static int toku_txn_abort(DB_TXN * txn) {
HANDLE_PANICKED_ENV(txn->mgrp);
fprintf(stderr, "toku_txn_abort(%p)\n", txn);
abort();
}
static int toku_txn_begin(DB_ENV * env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags) {
HANDLE_PANICKED_ENV(env);
if (!env->i->logger) return EINVAL;
flags=flags;
DB_TXN *MALLOC(result);
......@@ -721,6 +763,8 @@ static int maybe_do_associate_create (DB_TXN*txn, DB*primary, DB*secondary) {
static int toku_db_associate (DB *primary, DB_TXN *txn, DB *secondary,
int (*callback)(DB *secondary, const DBT *key, const DBT *data, DBT *result),
u_int32_t flags) {
HANDLE_PANICKED_DB(primary);
HANDLE_PANICKED_DB(secondary);
unsigned int brtflags;
if (secondary->i->primary) return EINVAL; // The secondary already has a primary
......@@ -770,12 +814,14 @@ static int toku_db_close(DB * db, u_int32_t flags) {
if (r != 0)
return r;
// printf("%s:%d %d=__toku_db_close(%p)\n", __FILE__, __LINE__, r, db);
int is_panicked = env_is_panicked(db->dbenv); // Even if panicked, let's close as much as we can.
db_env_unref(db->dbenv);
toku_free(db->i->database_name);
toku_free(db->i->full_fname);
toku_free(db->i);
toku_free(db);
ydb_unref();
if (r==0 && is_panicked) return EINVAL;
return r;
}
......@@ -805,11 +851,13 @@ static int verify_secondary_key(DB *secondary, DBT *pkey, DBT *data, DBT *skey)
}
static int toku_c_get_noassociate(DBC * c, DBT * key, DBT * data, u_int32_t flag) {
HANDLE_PANICKED_DB(c->dbp);
int r = toku_brt_cursor_get(c->i->c, key, data, flag, c->i->txn ? c->i->txn->i->tokutxn : 0);
return r;
}
static int toku_c_del_noassociate(DBC * c, u_int32_t flags) {
HANDLE_PANICKED_DB(c->dbp);
int r;
r = toku_brt_cursor_delete(c->i->c, flags);
......@@ -860,6 +908,7 @@ static int toku_c_pget(DBC * c, DBT *key, DBT *pkey, DBT *data, u_int32_t flag)
int r2;
int r3;
DB *db = c->dbp;
HANDLE_PANICKED_DB(db);
DB *pdb = db->i->primary;
......@@ -941,6 +990,7 @@ static int toku_c_pget(DBC * c, DBT *key, DBT *pkey, DBT *data, u_int32_t flag)
static int toku_c_get(DBC * c, DBT * key, DBT * data, u_int32_t flag) {
DB *db = c->dbp;
HANDLE_PANICKED_DB(db);
int r;
if (db->i->primary==0) r = toku_c_get_noassociate(c, key, data, flag);
......@@ -1046,6 +1096,7 @@ static int do_associated_deletes(DB_TXN *txn, DBT *key, DBT *data, DB *secondary
static int toku_c_del(DBC * c, u_int32_t flags) {
int r;
DB* db = c->dbp;
HANDLE_PANICKED_DB(db);
//It is a primary with secondaries, or is a secondary.
if (db->i->primary != 0 || !list_empty(&db->i->associated)) {
......@@ -1087,6 +1138,7 @@ static int toku_c_del(DBC * c, u_int32_t flags) {
static int toku_c_put(DBC *dbc, DBT *key, DBT *data, u_int32_t flags) {
DB* db = dbc->dbp;
HANDLE_PANICKED_DB(db);
unsigned int brtflags;
int r;
DBT* put_key = key;
......@@ -1151,6 +1203,7 @@ static int toku_c_put(DBC *dbc, DBT *key, DBT *data, u_int32_t flags) {
}
static int toku_db_cursor(DB * db, DB_TXN * txn, DBC ** c, u_int32_t flags) {
HANDLE_PANICKED_DB(db);
if (flags != 0)
return EINVAL;
DBC *MALLOC(result);
......@@ -1173,6 +1226,7 @@ static int toku_db_cursor(DB * db, DB_TXN * txn, DBC ** c, u_int32_t flags) {
}
static int toku_db_del(DB * db, DB_TXN * txn, DBT * key, u_int32_t flags) {
HANDLE_PANICKED_DB(db);
int r;
//It is a primary with secondaries, or is a secondary.
......@@ -1251,6 +1305,7 @@ static int toku_db_del(DB * db, DB_TXN * txn, DBT * key, u_int32_t flags) {
}
static int toku_db_get (DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags) {
HANDLE_PANICKED_DB(db);
int r;
if (db->i->primary==0) r = toku_db_get_noassociate(db, txn, key, data, flags);
......@@ -1266,6 +1321,7 @@ static int toku_db_get (DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t
}
static int toku_db_pget (DB *db, DB_TXN *txn, DBT *key, DBT *pkey, DBT *data, u_int32_t flags) {
HANDLE_PANICKED_DB(db);
int r;
int r2;
DBC *dbc;
......@@ -1284,7 +1340,8 @@ static int toku_db_pget (DB *db, DB_TXN *txn, DBT *key, DBT *pkey, DBT *data, u_
}
static int toku_db_key_range(DB * db, DB_TXN * txn, DBT * dbt, DB_KEY_RANGE * kr, u_int32_t flags) {
db=db; txn=txn; dbt=dbt; kr=kr; flags=flags;
HANDLE_PANICKED_DB(db);
txn=txn; dbt=dbt; kr=kr; flags=flags;
barf();
abort();
}
......@@ -1359,12 +1416,8 @@ static int find_db_file(DB_ENV* dbenv, const char *fname, char** full_name_out)
return 0;
}
// The decision to embedded subdatabases in files is a little bit painful.
// My original design was to simply create another file, but it turns out that we
// have to inherit mode bits and so forth from the first file that was created.
// Other problems may ensue (who is responsible for deleting the file? That's not so bad actually.)
// This suggests that we really need to put the multiple databases into one file.
static int toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *dbname, DBTYPE dbtype, u_int32_t flags, int mode) {
HANDLE_PANICKED_DB(db);
// Warning. Should check arguments. Should check return codes on malloc and open and so forth.
int openflags = 0;
......@@ -1506,6 +1559,7 @@ static int do_associated_inserts (DB_TXN *txn, DBT *key, DBT *data, DB *secondar
}
static int toku_db_put(DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags) {
HANDLE_PANICKED_DB(db);
int r;
//Cannot put directly into a secondary.
......@@ -1526,6 +1580,7 @@ static int toku_db_put(DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t f
}
static int toku_db_remove(DB * db, const char *fname, const char *dbname, u_int32_t flags) {
HANDLE_PANICKED_DB(db);
int r;
int r2;
char *full_name;
......@@ -1554,6 +1609,7 @@ static int toku_db_remove(DB * db, const char *fname, const char *dbname, u_int3
}
static int toku_db_rename(DB * db, const char *namea, const char *nameb, const char *namec, u_int32_t flags) {
HANDLE_PANICKED_DB(db);
if (flags!=0) return EINVAL;
char afull[PATH_MAX], cfull[PATH_MAX];
int r;
......@@ -1566,11 +1622,13 @@ static int toku_db_rename(DB * db, const char *namea, const char *nameb, const c
}
static int toku_db_set_bt_compare(DB * db, int (*bt_compare) (DB *, const DBT *, const DBT *)) {
HANDLE_PANICKED_DB(db);
int r = toku_brt_set_bt_compare(db->i->brt, bt_compare);
return r;
}
static int toku_db_set_dup_compare(DB *db, int (*dup_compare)(DB *, const DBT *, const DBT *)) {
HANDLE_PANICKED_DB(db);
int r = toku_brt_set_dup_compare(db->i->brt, dup_compare);
return r;
}
......@@ -1580,6 +1638,7 @@ static void toku_db_set_errfile (DB*db, FILE *errfile) {
}
static int toku_db_set_flags(DB * db, u_int32_t flags) {
HANDLE_PANICKED_DB(db);
/* the following matches BDB */
if (db_opened(db) && flags != 0) return EINVAL;
......@@ -1599,6 +1658,7 @@ static int toku_db_set_flags(DB * db, u_int32_t flags) {
}
static int toku_db_get_flags(DB *db, u_int32_t *pflags) {
HANDLE_PANICKED_DB(db);
if (!pflags) return EINVAL;
u_int32_t tflags;
u_int32_t flags = 0;
......@@ -1618,12 +1678,14 @@ static int toku_db_get_flags(DB *db, u_int32_t *pflags) {
}
static int toku_db_set_pagesize(DB *db, u_int32_t pagesize) {
HANDLE_PANICKED_DB(db);
int r = toku_brt_set_nodesize(db->i->brt, pagesize);
return r;
}
static int toku_db_stat(DB * db, void *v, u_int32_t flags) {
db=db; v=v; flags=flags;
HANDLE_PANICKED_DB(db);
v=v; flags=flags;
barf();
abort();
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment