Commit 891fe61d authored by Rich Prohaska's avatar Rich Prohaska

add big ydb lock. addresses #7

git-svn-id: file:///svn/tokudb@1670 c7de825b-a66e-492c-adef-691d508d4ae1
parent 5832ee01
...@@ -4,7 +4,9 @@ ...@@ -4,7 +4,9 @@
#ident "Copyright (c) 2007 Tokutek Inc. All rights reserved." #ident "Copyright (c) 2007 Tokutek Inc. All rights reserved."
#include <sys/types.h> #include <sys/types.h>
#ifndef _XOPEN_SOURCE
#define _XOPEN_SOURCE 500 #define _XOPEN_SOURCE 500
#endif
#define _FILE_OFFSET_BITS 64 #define _FILE_OFFSET_BITS 64
typedef struct brt *BRT; typedef struct brt *BRT;
......
...@@ -6,7 +6,7 @@ LIBNAME=libdb ...@@ -6,7 +6,7 @@ LIBNAME=libdb
# OPTFLAGS = -O2 # OPTFLAGS = -O2
CFLAGS = -W -Wall -Werror -g -fPIC $(OPTFLAGS) CFLAGS = -W -Wall -Werror -g -fPIC $(OPTFLAGS)
CPPFLAGS = -I../include -I../newbrt CPPFLAGS = -I../include -I../newbrt
CPPFLAGS += -D_FILE_OFFSET_BITS=64 -D_LARGEFILE64_SOURCE CPPFLAGS += -D_GNU_SOURCE -D_THREAD_SAFE -D_FILE_OFFSET_BITS=64 -D_LARGEFILE64_SOURCE
ifeq ($(OSX),OSX) ifeq ($(OSX),OSX)
......
...@@ -32,6 +32,44 @@ struct __toku_db_internal { ...@@ -32,6 +32,44 @@ struct __toku_db_internal {
int associate_is_immutable; // If this DB is a secondary then this field indicates that the index never changes due to updates. int associate_is_immutable; // If this DB is a secondary then this field indicates that the index never changes due to updates.
}; };
#if DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR == 1
typedef void (*toku_env_errcall_t)(const char *, char *);
#elif DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR >= 3
typedef void (*toku_env_errcall_t)(const DB_ENV *, const char *, const char *);
#else
#error
#endif
struct __toku_db_env_internal {
int is_panicked;
int ref_count;
u_int32_t open_flags;
int open_mode;
toku_env_errcall_t errcall;
void *errfile;
const char *errpfx;
char *dir; /* A malloc'd copy of the directory. */
char *tmp_dir;
char *lg_dir;
char **data_dirs;
u_int32_t n_data_dirs;
//void (*noticecall)(DB_ENV *, db_notices);
unsigned long cachetable_size;
CACHETABLE cachetable;
TOKULOGGER logger;
};
struct __toku_db_txn_internal {
//TXNID txnid64; /* A sixty-four bit txn id. */
TOKUTXN tokutxn;
DB_TXN *parent;
};
struct __toku_dbc_internal {
BRT_CURSOR c;
DB_TXN *txn;
};
typedef struct __toku_lock_tree { typedef struct __toku_lock_tree {
DB* db; DB* db;
//Some Red Black tree //Some Red Black tree
...@@ -868,4 +906,6 @@ DUPSORT db ...@@ -868,4 +906,6 @@ DUPSORT db
+++++ +++++
*/ */
#endif #endif
int rfp = 0;
/* -*- mode: C; c-basic-offset: 4 -*- */ /* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007 Tokutek Inc. All rights reserved." #ident "Copyright (c) 2007 Tokutek Inc. All rights reserved."
...@@ -19,6 +21,7 @@ const char *toku_copyright_string = "Copyright (c) 2007 Tokutek Inc. All rights ...@@ -19,6 +21,7 @@ const char *toku_copyright_string = "Copyright (c) 2007 Tokutek Inc. All rights
#include <ctype.h> #include <ctype.h>
#include <unistd.h> #include <unistd.h>
#include <libgen.h> #include <libgen.h>
#include <pthread.h>
#include "ydb-internal.h" #include "ydb-internal.h"
...@@ -27,41 +30,85 @@ const char *toku_copyright_string = "Copyright (c) 2007 Tokutek Inc. All rights ...@@ -27,41 +30,85 @@ const char *toku_copyright_string = "Copyright (c) 2007 Tokutek Inc. All rights
#include "log.h" #include "log.h"
#include "memory.h" #include "memory.h"
struct __toku_db_txn_internal { /* the ydb big lock serializes access to the tokudb
//TXNID txnid64; /* A sixty-four bit txn id. */ every call (including methods) into the tokudb library gets the lock
TOKUTXN tokutxn; no internal function should invoke a method through an object */
DB_TXN *parent;
}; static pthread_mutex_t ydb_big_lock = PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP;
static inline void ydb_lock() {
int r = pthread_mutex_lock(&ydb_big_lock); assert(r == 0);
}
static inline void ydb_unlock() {
int r = pthread_mutex_unlock(&ydb_big_lock); assert(r == 0);
}
/* the ydb reference is used to cleanup the library when there are no more references to it */
static int toku_ydb_refs = 0;
static inline void ydb_add_ref() {
++toku_ydb_refs;
}
static inline void ydb_unref() {
assert(toku_ydb_refs > 0);
if (--toku_ydb_refs == 0) {
/* call global destructors */
toku_malloc_cleanup();
}
}
/* env methods */
static int toku_env_close(DB_ENV *env, u_int32_t flags);
static inline void env_add_ref(DB_ENV *env) {
env->i->ref_count += 1;
}
static inline void env_unref(DB_ENV *env) {
assert(env->i->ref_count > 0);
if (--env->i->ref_count == 0)
toku_env_close(env, 0);
}
static inline int env_opened(DB_ENV *env) {
return env->i->cachetable != 0;
}
static int env_is_panicked(DB_ENV *dbenv) {
if (dbenv==0) return 0;
return dbenv->i->is_panicked || toku_logger_panicked(dbenv->i->logger);
}
#define HANDLE_PANICKED_ENV(env) ({ if (env_is_panicked(env)) return EINVAL; })
#define HANDLE_PANICKED_DB(db) HANDLE_PANICKED_ENV(db->dbenv)
/* db methods */
static inline int db_opened(DB *db) {
return db->i->full_fname != 0;
}
static int toku_db_put(DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags);
static int toku_db_get (DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags);
static int toku_db_pget (DB *db, DB_TXN *txn, DBT *key, DBT *pkey, DBT *data, u_int32_t flags);
static int toku_db_cursor(DB *db, DB_TXN * txn, DBC **c, u_int32_t flags);
/* txn methods */
/* cursor methods */
static int toku_c_get(DBC * c, DBT * key, DBT * data, u_int32_t flag);
static int toku_c_get_noassociate(DBC * c, DBT * key, DBT * data, u_int32_t flag);
static int toku_c_pget(DBC * c, DBT *key, DBT *pkey, DBT *data, u_int32_t flag);
static int toku_c_del(DBC *c, u_int32_t flags);
static int toku_c_count(DBC *cursor, db_recno_t *count, u_int32_t flags);
static int toku_c_close(DBC * c);
/* misc */
static char *construct_full_name(const char *dir, const char *fname); static char *construct_full_name(const char *dir, const char *fname);
static int do_associated_inserts (DB_TXN *txn, DBT *key, DBT *data, DB *secondary); static int do_associated_inserts (DB_TXN *txn, DBT *key, DBT *data, DB *secondary);
#if DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR == 1
typedef void (*toku_env_errcall_t)(const char *, char *);
#elif DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR >= 3
typedef void (*toku_env_errcall_t)(const DB_ENV *, const char *, const char *);
#else
#error
#endif
struct __toku_db_env_internal {
int is_panicked;
int ref_count;
u_int32_t open_flags;
int open_mode;
toku_env_errcall_t errcall;
void *errfile;
const char *errpfx;
char *dir; /* A malloc'd copy of the directory. */
char *tmp_dir;
char *lg_dir;
char **data_dirs;
u_int32_t n_data_dirs;
//void (*noticecall)(DB_ENV *, db_notices);
unsigned long cachetable_size;
CACHETABLE cachetable;
TOKULOGGER logger;
};
// If errcall is set, call it with the format string and optionally the stderrstring (if include_stderrstring). The prefix is passed as a separate argument. // If errcall is set, call it with the format string and optionally the stderrstring (if include_stderrstring). The prefix is passed as a separate argument.
// If errfile is set, print to the errfile: prefix, fmt string, maybe include the stderr string. // If errfile is set, print to the errfile: prefix, fmt string, maybe include the stderr string.
...@@ -96,14 +143,6 @@ void toku_do_error_all_cases(const DB_ENV * env, int error, int include_stderrst ...@@ -96,14 +143,6 @@ void toku_do_error_all_cases(const DB_ENV * env, int error, int include_stderrst
} }
} }
static int env_is_panicked(DB_ENV *dbenv) {
if (dbenv==0) return 0;
return dbenv->i->is_panicked || toku_logger_panicked(dbenv->i->logger);
}
#define HANDLE_PANICKED_ENV(env) ({ if (env_is_panicked(env)) return EINVAL; })
#define HANDLE_PANICKED_DB(db) HANDLE_PANICKED_ENV(db->dbenv)
// Handle all the error cases (but don't do the default thing.) // Handle all the error cases (but don't do the default thing.)
static int do_error (DB_ENV *dbenv, int error, const char *string, ...) { static int do_error (DB_ENV *dbenv, int error, const char *string, ...) {
if (toku_logger_panicked(dbenv->i->logger)) dbenv->i->is_panicked=1; if (toku_logger_panicked(dbenv->i->logger)) dbenv->i->is_panicked=1;
...@@ -114,14 +153,6 @@ static int do_error (DB_ENV *dbenv, int error, const char *string, ...) { ...@@ -114,14 +153,6 @@ static int do_error (DB_ENV *dbenv, int error, const char *string, ...) {
return error; return error;
} }
static void toku_db_env_err(const DB_ENV * env, int error, const char *fmt, ...) {
va_list ap;
va_start(ap, fmt);
toku_do_error_all_cases(env, error, 1, 1, fmt, ap);
va_end(ap);
}
#define barf() ({ fprintf(stderr, "YDB: BARF %s:%d in %s\n", __FILE__, __LINE__, __func__); }) #define barf() ({ fprintf(stderr, "YDB: BARF %s:%d in %s\n", __FILE__, __LINE__, __func__); })
#define barff(fmt,...) ({ fprintf(stderr, "YDB: BARF %s:%d in %s, ", __FILE__, __LINE__, __func__); fprintf(stderr, fmt, __VA_ARGS__); }) #define barff(fmt,...) ({ fprintf(stderr, "YDB: BARF %s:%d in %s, ", __FILE__, __LINE__, __func__); fprintf(stderr, fmt, __VA_ARGS__); })
#define note() ({ fprintf(svtderr, "YDB: Note %s:%d in %s\n", __FILE__, __LINE__, __func__); }) #define note() ({ fprintf(svtderr, "YDB: Note %s:%d in %s\n", __FILE__, __LINE__, __func__); })
...@@ -146,43 +177,7 @@ static void print_flags(u_int32_t flags) { ...@@ -146,43 +177,7 @@ static void print_flags(u_int32_t flags) {
} }
#endif #endif
/* TODO make these thread safe */ static int env_parse_config_line(DB_ENV* dbenv, char *command, char *value) {
/* a count of the open env handles */
static int toku_ydb_refs = 0;
static void ydb_add_ref() {
toku_ydb_refs += 1;
}
static void ydb_unref() {
assert(toku_ydb_refs > 0);
toku_ydb_refs -= 1;
if (toku_ydb_refs == 0) {
/* call global destructors */
toku_malloc_cleanup();
}
}
static void db_env_add_ref(DB_ENV *env) {
env->i->ref_count += 1;
}
static void db_env_unref(DB_ENV *env) {
env->i->ref_count -= 1;
if (env->i->ref_count == 0)
env->close(env, 0);
}
static inline int db_env_opened(DB_ENV *env) {
return env->i->cachetable != 0;
}
static inline int db_opened(DB *db) {
return db->i->full_fname != 0;
}
static int db_env_parse_config_line(DB_ENV* dbenv, char *command, char *value) {
int r; int r;
if (!strcmp(command, "set_data_dir")) { if (!strcmp(command, "set_data_dir")) {
...@@ -199,7 +194,7 @@ static int db_env_parse_config_line(DB_ENV* dbenv, char *command, char *value) { ...@@ -199,7 +194,7 @@ static int db_env_parse_config_line(DB_ENV* dbenv, char *command, char *value) {
return r; return r;
} }
static int db_env_read_config(DB_ENV *env) { static int env_read_config(DB_ENV *env) {
HANDLE_PANICKED_ENV(env); HANDLE_PANICKED_ENV(env);
const char* config_name = "DB_CONFIG"; const char* config_name = "DB_CONFIG";
char* full_name = NULL; char* full_name = NULL;
...@@ -289,7 +284,7 @@ static int db_env_read_config(DB_ENV *env) { ...@@ -289,7 +284,7 @@ static int db_env_read_config(DB_ENV *env) {
} }
//Parse the line. //Parse the line.
if (strlen(command) == 0 || command[0] == '#') continue; //Ignore Comments. if (strlen(command) == 0 || command[0] == '#') continue; //Ignore Comments.
r = db_env_parse_config_line(env, command, value < end ? value : ""); r = env_parse_config_line(env, command, value < end ? value : "");
if (r != 0) goto parseerror; if (r != 0) goto parseerror;
} }
if (0) { if (0) {
...@@ -307,11 +302,11 @@ static int db_env_read_config(DB_ENV *env) { ...@@ -307,11 +302,11 @@ static int db_env_read_config(DB_ENV *env) {
return r ? r : r2; return r ? r : r2;
} }
static int toku_db_env_open(DB_ENV * env, const char *home, u_int32_t flags, int mode) { static int toku_env_open(DB_ENV * env, const char *home, u_int32_t flags, int mode) {
HANDLE_PANICKED_ENV(env); HANDLE_PANICKED_ENV(env);
int r; int r;
if (db_env_opened(env)) { if (env_opened(env)) {
return do_error(env, EINVAL, "The environment is already open\n"); return do_error(env, EINVAL, "The environment is already open\n");
} }
...@@ -338,11 +333,7 @@ static int toku_db_env_open(DB_ENV * env, const char *home, u_int32_t flags, int ...@@ -338,11 +333,7 @@ static int toku_db_env_open(DB_ENV * env, const char *home, u_int32_t flags, int
} }
} }
if (!(flags & DB_PRIVATE)) { if (!(flags & DB_PRIVATE)) {
// This means that we don't have to do anything with shared memory.
// And that's good enough for mysql.
return do_error(env, EINVAL, "TokuDB requires DB_PRIVATE when opening an env\n"); return do_error(env, EINVAL, "TokuDB requires DB_PRIVATE when opening an env\n");
} }
...@@ -358,7 +349,7 @@ static int toku_db_env_open(DB_ENV * env, const char *home, u_int32_t flags, int ...@@ -358,7 +349,7 @@ static int toku_db_env_open(DB_ENV * env, const char *home, u_int32_t flags, int
env->i->dir = NULL; env->i->dir = NULL;
return r; return r;
} }
if ((r = db_env_read_config(env)) != 0) { if ((r = env_read_config(env)) != 0) {
goto died1; goto died1;
} }
...@@ -387,7 +378,7 @@ static int toku_db_env_open(DB_ENV * env, const char *home, u_int32_t flags, int ...@@ -387,7 +378,7 @@ static int toku_db_env_open(DB_ENV * env, const char *home, u_int32_t flags, int
return 0; return 0;
} }
static int toku_db_env_close(DB_ENV * env, u_int32_t flags) { static int toku_env_close(DB_ENV * env, u_int32_t flags) {
// Even if the env is panicedk, try to close as much as we can. // Even if the env is panicedk, try to close as much as we can.
int is_panicked = env_is_panicked(env); int is_panicked = env_is_panicked(env);
int r0=0,r1=0; int r0=0,r1=0;
...@@ -418,21 +409,23 @@ static int toku_db_env_close(DB_ENV * env, u_int32_t flags) { ...@@ -418,21 +409,23 @@ static int toku_db_env_close(DB_ENV * env, u_int32_t flags) {
return 0; return 0;
} }
static int toku_db_env_log_archive(DB_ENV * env, char **list[], u_int32_t flags) { static int toku_env_log_archive(DB_ENV * env, char **list[], u_int32_t flags) {
env=env; flags=flags; // Suppress compiler warnings. env=env; flags=flags; // Suppress compiler warnings.
*list = NULL; *list = NULL;
return 0; return 0;
} }
static int toku_db_env_log_flush(DB_ENV * env, const DB_LSN * lsn) { static int toku_env_log_flush(DB_ENV * env, const DB_LSN * lsn) {
HANDLE_PANICKED_ENV(env); HANDLE_PANICKED_ENV(env);
env=env; lsn=lsn; env=env; lsn=lsn;
barf(); barf();
return 1; return 1;
} }
static int toku_db_env_set_cachesize(DB_ENV * env, u_int32_t gbytes, u_int32_t bytes, int ncache __attribute__((__unused__))) { static int toku_env_set_cachesize(DB_ENV * env, u_int32_t gbytes, u_int32_t bytes, int ncache) {
HANDLE_PANICKED_ENV(env); HANDLE_PANICKED_ENV(env);
if (ncache != 1)
return EINVAL;
u_int64_t cs64 = ((u_int64_t) gbytes << 30) + bytes; u_int64_t cs64 = ((u_int64_t) gbytes << 30) + bytes;
unsigned long cs = cs64; unsigned long cs = cs64;
if (cs64 > cs) if (cs64 > cs)
...@@ -443,7 +436,7 @@ static int toku_db_env_set_cachesize(DB_ENV * env, u_int32_t gbytes, u_int32_t b ...@@ -443,7 +436,7 @@ static int toku_db_env_set_cachesize(DB_ENV * env, u_int32_t gbytes, u_int32_t b
#if DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR >= 3 #if DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR >= 3
static int toku_db_env_get_cachesize(DB_ENV * env, u_int32_t *gbytes, u_int32_t *bytes, int *ncache) { static int toku_env_get_cachesize(DB_ENV * env, u_int32_t *gbytes, u_int32_t *bytes, int *ncache) {
HANDLE_PANICKED_ENV(env); HANDLE_PANICKED_ENV(env);
*gbytes = env->i->cachetable_size >> 30; *gbytes = env->i->cachetable_size >> 30;
*bytes = env->i->cachetable_size & ((1<<30)-1); *bytes = env->i->cachetable_size & ((1<<30)-1);
...@@ -451,16 +444,20 @@ static int toku_db_env_get_cachesize(DB_ENV * env, u_int32_t *gbytes, u_int32_t ...@@ -451,16 +444,20 @@ static int toku_db_env_get_cachesize(DB_ENV * env, u_int32_t *gbytes, u_int32_t
return 0; return 0;
} }
static int locked_env_get_cachesize(DB_ENV *env, u_int32_t *gbytes, u_int32_t *bytes, int *ncache) {
ydb_lock(); int r = toku_env_get_cachesize(env, gbytes, bytes, ncache); ydb_unlock(); return r;
}
#endif #endif
static int toku_db_env_set_data_dir(DB_ENV * env, const char *dir) { static int toku_env_set_data_dir(DB_ENV * env, const char *dir) {
HANDLE_PANICKED_ENV(env); HANDLE_PANICKED_ENV(env);
u_int32_t i; u_int32_t i;
int r; int r;
char** temp; char** temp;
char* new_dir; char* new_dir;
if (db_env_opened(env) || !dir) { if (env_opened(env) || !dir) {
return do_error(env, EINVAL, "You cannot set the data dir after opening the env\n"); return do_error(env, EINVAL, "You cannot set the data dir after opening the env\n");
} }
...@@ -492,19 +489,19 @@ static int toku_db_env_set_data_dir(DB_ENV * env, const char *dir) { ...@@ -492,19 +489,19 @@ static int toku_db_env_set_data_dir(DB_ENV * env, const char *dir) {
return 0; return 0;
} }
static void toku_db_env_set_errcall(DB_ENV * env, toku_env_errcall_t errcall) { static void toku_env_set_errcall(DB_ENV * env, toku_env_errcall_t errcall) {
env->i->errcall = errcall; env->i->errcall = errcall;
} }
static void toku_db_env_set_errfile(DB_ENV*env, FILE*errfile) { static void toku_env_set_errfile(DB_ENV*env, FILE*errfile) {
env->i->errfile = errfile; env->i->errfile = errfile;
} }
static void toku_db_env_set_errpfx(DB_ENV * env, const char *errpfx) { static void toku_env_set_errpfx(DB_ENV * env, const char *errpfx) {
env->i->errpfx = errpfx; env->i->errpfx = errpfx;
} }
static int toku_db_env_set_flags(DB_ENV * env, u_int32_t flags, int onoff) { static int toku_env_set_flags(DB_ENV * env, u_int32_t flags, int onoff) {
HANDLE_PANICKED_ENV(env); HANDLE_PANICKED_ENV(env);
if (flags != 0 && onoff) { if (flags != 0 && onoff) {
return do_error(env, EINVAL, "TokuDB does not (yet) support any nonzero ENV flags\n"); return do_error(env, EINVAL, "TokuDB does not (yet) support any nonzero ENV flags\n");
...@@ -512,15 +509,15 @@ static int toku_db_env_set_flags(DB_ENV * env, u_int32_t flags, int onoff) { ...@@ -512,15 +509,15 @@ static int toku_db_env_set_flags(DB_ENV * env, u_int32_t flags, int onoff) {
return 0; return 0;
} }
static int toku_db_env_set_lg_bsize(DB_ENV * env, u_int32_t bsize) { static int toku_env_set_lg_bsize(DB_ENV * env, u_int32_t bsize) {
HANDLE_PANICKED_ENV(env); HANDLE_PANICKED_ENV(env);
bsize=bsize; bsize=bsize;
return do_error(env, EINVAL, "TokuDB does not (yet) support ENV->set_lg_bsize\n"); return do_error(env, EINVAL, "TokuDB does not (yet) support ENV->set_lg_bsize\n");
} }
static int toku_db_env_set_lg_dir(DB_ENV * env, const char *dir) { static int toku_env_set_lg_dir(DB_ENV * env, const char *dir) {
HANDLE_PANICKED_ENV(env); HANDLE_PANICKED_ENV(env);
if (db_env_opened(env)) { if (env_opened(env)) {
return do_error(env, EINVAL, "Cannot set log dir after opening the env\n"); return do_error(env, EINVAL, "Cannot set log dir after opening the env\n");
} }
...@@ -535,33 +532,37 @@ static int toku_db_env_set_lg_dir(DB_ENV * env, const char *dir) { ...@@ -535,33 +532,37 @@ static int toku_db_env_set_lg_dir(DB_ENV * env, const char *dir) {
return 0; return 0;
} }
static int toku_db_env_set_lg_max(DB_ENV * env, u_int32_t lg_max) { static int toku_env_set_lg_max(DB_ENV * env, u_int32_t lg_max) {
HANDLE_PANICKED_ENV(env); HANDLE_PANICKED_ENV(env);
lg_max=lg_max; lg_max=lg_max;
return do_error(env, EINVAL, "TokuDB does not (yet) support set_lg_max\n"); return do_error(env, EINVAL, "TokuDB does not (yet) support set_lg_max\n");
} }
static int toku_db_env_set_lk_detect(DB_ENV * env, u_int32_t detect) { static int toku_env_set_lk_detect(DB_ENV * env, u_int32_t detect) {
HANDLE_PANICKED_ENV(env); HANDLE_PANICKED_ENV(env);
detect=detect; detect=detect;
return do_error(env, EINVAL, "TokuDB does not (yet) support set_lk_detect\n"); return do_error(env, EINVAL, "TokuDB does not (yet) support set_lk_detect\n");
} }
#if DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR <= 4 #if DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR <= 4
static int toku_db_env_set_lk_max(DB_ENV * env, u_int32_t lk_max) { static int toku_env_set_lk_max(DB_ENV * env, u_int32_t lk_max) {
HANDLE_PANICKED_ENV(env); HANDLE_PANICKED_ENV(env);
lk_max=lk_max; lk_max=lk_max;
return 0; return 0;
} }
static int locked_env_set_lk_max(DB_ENV * env, u_int32_t lk_max) {
ydb_lock(); int r = toku_env_set_lk_max(env, lk_max); ydb_unlock(); return r;
}
#endif #endif
//void __toku_db_env_set_noticecall (DB_ENV *env, void (*noticecall)(DB_ENV *, db_notices)) { //void __toku_env_set_noticecall (DB_ENV *env, void (*noticecall)(DB_ENV *, db_notices)) {
// env->i->noticecall = noticecall; // env->i->noticecall = noticecall;
//} //}
static int toku_db_env_set_tmp_dir(DB_ENV * env, const char *tmp_dir) { static int toku_env_set_tmp_dir(DB_ENV * env, const char *tmp_dir) {
HANDLE_PANICKED_ENV(env); HANDLE_PANICKED_ENV(env);
if (db_env_opened(env)) { if (env_opened(env)) {
return do_error(env, EINVAL, "Cannot set the tmp dir after opening an env\n"); return do_error(env, EINVAL, "Cannot set the tmp dir after opening an env\n");
} }
if (!tmp_dir) { if (!tmp_dir) {
...@@ -573,18 +574,18 @@ static int toku_db_env_set_tmp_dir(DB_ENV * env, const char *tmp_dir) { ...@@ -573,18 +574,18 @@ static int toku_db_env_set_tmp_dir(DB_ENV * env, const char *tmp_dir) {
return env->i->tmp_dir ? 0 : ENOMEM; return env->i->tmp_dir ? 0 : ENOMEM;
} }
static int toku_db_env_set_verbose(DB_ENV * env, u_int32_t which, int onoff) { static int toku_env_set_verbose(DB_ENV * env, u_int32_t which, int onoff) {
HANDLE_PANICKED_ENV(env); HANDLE_PANICKED_ENV(env);
which=which; onoff=onoff; which=which; onoff=onoff;
return 1; return 1;
} }
static int toku_db_env_txn_checkpoint(DB_ENV * env, u_int32_t kbyte, u_int32_t min, u_int32_t flags) { static int toku_env_txn_checkpoint(DB_ENV * env, u_int32_t kbyte, u_int32_t min, u_int32_t flags) {
env=env; kbyte=kbyte; min=min; flags=flags; env=env; kbyte=kbyte; min=min; flags=flags;
return 0; return 0;
} }
static int toku_db_env_txn_stat(DB_ENV * env, DB_TXN_STAT ** statp, u_int32_t flags) { static int toku_env_txn_stat(DB_ENV * env, DB_TXN_STAT ** statp, u_int32_t flags) {
HANDLE_PANICKED_ENV(env); HANDLE_PANICKED_ENV(env);
statp=statp;flags=flags; statp=statp;flags=flags;
return 1; return 1;
...@@ -599,41 +600,114 @@ void toku_default_errcall(const DB_ENV *env, const char *errpfx, const char *msg ...@@ -599,41 +600,114 @@ void toku_default_errcall(const DB_ENV *env, const char *errpfx, const char *msg
fprintf(stderr, "YDB: %s: %s", errpfx, msg); fprintf(stderr, "YDB: %s: %s", errpfx, msg);
} }
static int toku_txn_begin(DB_ENV * env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags); #if _THREAD_SAFE
int db_env_create(DB_ENV ** envp, u_int32_t flags) { static void locked_env_err(const DB_ENV * env, int error, const char *fmt, ...) {
ydb_lock();
va_list ap;
va_start(ap, fmt);
toku_do_error_all_cases(env, error, 1, 1, fmt, ap);
va_end(ap);
ydb_unlock();
}
static int locked_env_open(DB_ENV * env, const char *home, u_int32_t flags, int mode) {
ydb_lock(); int r = toku_env_open(env, home, flags, mode); ydb_unlock(); return r;
}
static int locked_env_close(DB_ENV * env, u_int32_t flags) {
ydb_lock(); int r = toku_env_close(env, flags); ydb_unlock(); return r;
}
static int locked_env_log_archive(DB_ENV * env, char **list[], u_int32_t flags) {
ydb_lock(); int r = toku_env_log_archive(env, list, flags); ydb_unlock(); return r;
}
static int locked_env_log_flush(DB_ENV * env, const DB_LSN * lsn) {
ydb_lock(); int r = toku_env_log_flush(env, lsn); ydb_unlock(); return r;
}
static int locked_env_set_cachesize(DB_ENV *env, u_int32_t gbytes, u_int32_t bytes, int ncache) {
ydb_lock(); int r = toku_env_set_cachesize(env, gbytes, bytes, ncache); ydb_unlock(); return r;
}
static int locked_env_set_data_dir(DB_ENV * env, const char *dir) {
ydb_lock(); int r = toku_env_set_data_dir(env, dir); ydb_unlock(); return r;
}
static int locked_env_set_flags(DB_ENV * env, u_int32_t flags, int onoff) {
ydb_lock(); int r = toku_env_set_flags(env, flags, onoff); ydb_unlock(); return r;
}
static int locked_env_set_lg_bsize(DB_ENV * env, u_int32_t bsize) {
ydb_lock(); int r = toku_env_set_lg_bsize(env, bsize); ydb_unlock(); return r;
}
static int locked_env_set_lg_dir(DB_ENV * env, const char *dir) {
ydb_lock(); int r = toku_env_set_lg_dir(env, dir); ydb_unlock(); return r;
}
static int locked_env_set_lg_max(DB_ENV * env, u_int32_t lg_max) {
ydb_lock(); int r = toku_env_set_lg_max(env, lg_max); ydb_unlock(); return r;
}
static int locked_env_set_lk_detect(DB_ENV * env, u_int32_t detect) {
ydb_lock(); int r = toku_env_set_lk_detect(env, detect); ydb_unlock(); return r;
}
static int locked_env_set_tmp_dir(DB_ENV * env, const char *tmp_dir) {
ydb_lock(); int r = toku_env_set_tmp_dir(env, tmp_dir); ydb_unlock(); return r;
}
static int locked_env_set_verbose(DB_ENV * env, u_int32_t which, int onoff) {
ydb_lock(); int r = toku_env_set_verbose(env, which, onoff); ydb_unlock(); return r;
}
static int locked_env_txn_checkpoint(DB_ENV * env, u_int32_t kbyte, u_int32_t min, u_int32_t flags) {
ydb_lock(); int r = toku_env_txn_checkpoint(env, kbyte, min, flags); ydb_unlock(); return r;
}
static int locked_env_txn_stat(DB_ENV * env, DB_TXN_STAT ** statp, u_int32_t flags) {
ydb_lock(); int r = toku_env_txn_stat(env, statp, flags); ydb_unlock(); return r;
}
static int locked_txn_begin(DB_ENV * env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags);
#endif
static int toku_env_create(DB_ENV ** envp, u_int32_t flags) {
if (flags!=0) return EINVAL; if (flags!=0) return EINVAL;
DB_ENV *MALLOC(result); DB_ENV *MALLOC(result);
if (result == 0) if (result == 0)
return ENOMEM; return ENOMEM;
memset(result, 0, sizeof *result); memset(result, 0, sizeof *result);
result->err = toku_db_env_err; result->err = locked_env_err;
result->open = toku_db_env_open; result->open = locked_env_open;
result->close = toku_db_env_close; result->close = locked_env_close;
result->txn_checkpoint = toku_db_env_txn_checkpoint; result->txn_checkpoint = locked_env_txn_checkpoint;
result->log_flush = toku_db_env_log_flush; result->log_flush = locked_env_log_flush;
result->set_errcall = toku_db_env_set_errcall; result->set_errcall = toku_env_set_errcall;
result->set_errfile = toku_db_env_set_errfile; result->set_errfile = toku_env_set_errfile;
result->set_errpfx = toku_db_env_set_errpfx; result->set_errpfx = toku_env_set_errpfx;
//result->set_noticecall = toku_db_env_set_noticecall; //result->set_noticecall = locked_env_set_noticecall;
result->set_flags = toku_db_env_set_flags; result->set_flags = locked_env_set_flags;
result->set_data_dir = toku_db_env_set_data_dir; result->set_data_dir = locked_env_set_data_dir;
result->set_tmp_dir = toku_db_env_set_tmp_dir; result->set_tmp_dir = locked_env_set_tmp_dir;
result->set_verbose = toku_db_env_set_verbose; result->set_verbose = locked_env_set_verbose;
result->set_lg_bsize = toku_db_env_set_lg_bsize; result->set_lg_bsize = locked_env_set_lg_bsize;
result->set_lg_dir = toku_db_env_set_lg_dir; result->set_lg_dir = locked_env_set_lg_dir;
result->set_lg_max = toku_db_env_set_lg_max; result->set_lg_max = locked_env_set_lg_max;
result->set_cachesize = toku_db_env_set_cachesize; result->set_cachesize = locked_env_set_cachesize;
#if DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR >= 3 #if DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR >= 3
result->get_cachesize = toku_db_env_get_cachesize; result->get_cachesize = locked_env_get_cachesize;
#endif #endif
result->set_lk_detect = toku_db_env_set_lk_detect; result->set_lk_detect = locked_env_set_lk_detect;
#if DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR <= 4 #if DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR <= 4
result->set_lk_max = toku_db_env_set_lk_max; result->set_lk_max = locked_env_set_lk_max;
#endif #endif
result->log_archive = toku_db_env_log_archive; result->log_archive = locked_env_log_archive;
result->txn_stat = toku_db_env_txn_stat; result->txn_stat = locked_env_txn_stat;
result->txn_begin = toku_txn_begin; result->txn_begin = locked_txn_begin;
MALLOC(result->i); MALLOC(result->i);
if (result->i == 0) { if (result->i == 0) {
...@@ -662,7 +736,11 @@ int db_env_create(DB_ENV ** envp, u_int32_t flags) { ...@@ -662,7 +736,11 @@ int db_env_create(DB_ENV ** envp, u_int32_t flags) {
return 0; return 0;
} }
static int toku_db_txn_commit(DB_TXN * txn, u_int32_t flags) { int db_env_create(DB_ENV ** envp, u_int32_t flags) {
ydb_lock(); int r = toku_env_create(envp, flags); ydb_unlock(); return r;
}
static int toku_txn_commit(DB_TXN * txn, u_int32_t flags) {
HANDLE_PANICKED_ENV(txn->mgrp); HANDLE_PANICKED_ENV(txn->mgrp);
//notef("flags=%d\n", flags); //notef("flags=%d\n", flags);
int r; int r;
...@@ -683,10 +761,11 @@ static int toku_db_txn_commit(DB_TXN * txn, u_int32_t flags) { ...@@ -683,10 +761,11 @@ static int toku_db_txn_commit(DB_TXN * txn, u_int32_t flags) {
return r; // The txn is no good after the commit. return r; // The txn is no good after the commit.
} }
static u_int32_t toku_db_txn_id(DB_TXN * txn) { static u_int32_t toku_txn_id(DB_TXN * txn) {
HANDLE_PANICKED_ENV(txn->mgrp); HANDLE_PANICKED_ENV(txn->mgrp);
barf(); barf();
abort(); abort();
return -1;
} }
static TXNID next_txn = 0; static TXNID next_txn = 0;
...@@ -699,7 +778,29 @@ static int toku_txn_abort(DB_TXN * txn) { ...@@ -699,7 +778,29 @@ static int toku_txn_abort(DB_TXN * txn) {
return r; return r;
} }
static int toku_txn_begin(DB_ENV * env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags) { #if _THREAD_SAFE
static int toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags);
static int locked_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags) {
ydb_lock(); int r = toku_txn_begin(env, stxn, txn, flags); ydb_unlock(); return r;
}
static u_int32_t locked_txn_id(DB_TXN *txn) {
ydb_lock(); u_int32_t r = toku_txn_id(txn); ydb_unlock(); return r;
}
static int locked_txn_commit(DB_TXN *txn, u_int32_t flags) {
ydb_lock(); int r = toku_txn_commit(txn, flags); ydb_unlock(); return r;
}
static int locked_txn_abort(DB_TXN *txn) {
ydb_lock(); int r = toku_txn_abort(txn); ydb_unlock(); return r;
}
#endif
static int toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags) {
HANDLE_PANICKED_ENV(env); HANDLE_PANICKED_ENV(env);
if (!toku_logger_is_open(env->i->logger)) return do_error(env, EINVAL, "Environment does not have logging enabled\n"); if (!toku_logger_is_open(env->i->logger)) return do_error(env, EINVAL, "Environment does not have logging enabled\n");
flags=flags; flags=flags;
...@@ -709,9 +810,9 @@ static int toku_txn_begin(DB_ENV * env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t ...@@ -709,9 +810,9 @@ static int toku_txn_begin(DB_ENV * env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t
memset(result, 0, sizeof *result); memset(result, 0, sizeof *result);
//notef("parent=%p flags=0x%x\n", stxn, flags); //notef("parent=%p flags=0x%x\n", stxn, flags);
result->mgrp = env; result->mgrp = env;
result->abort = toku_txn_abort; result->abort = locked_txn_abort;
result->commit = toku_db_txn_commit; result->commit = locked_txn_commit;
result->id = toku_db_txn_id; result->id = locked_txn_id;
MALLOC(result->i); MALLOC(result->i);
assert(result->i); assert(result->i);
result->i->parent = stxn; result->i->parent = stxn;
...@@ -730,31 +831,31 @@ int txn_commit(DB_TXN * txn, u_int32_t flags) { ...@@ -730,31 +831,31 @@ int txn_commit(DB_TXN * txn, u_int32_t flags) {
#endif #endif
int log_compare(const DB_LSN * a, const DB_LSN * b) { int log_compare(const DB_LSN * a, const DB_LSN * b) {
ydb_lock();
fprintf(stderr, "%s:%d log_compare(%p,%p)\n", __FILE__, __LINE__, a, b); fprintf(stderr, "%s:%d log_compare(%p,%p)\n", __FILE__, __LINE__, a, b);
abort(); abort();
ydb_unlock();
} }
static int maybe_do_associate_create (DB_TXN*txn, DB*primary, DB*secondary) { static int maybe_do_associate_create (DB_TXN*txn, DB*primary, DB*secondary) {
DBC *dbc; DBC *dbc;
int r = secondary->cursor(secondary, txn, &dbc, 0); int r = toku_db_cursor(secondary, txn, &dbc, 0);
if (r!=0) return r; if (r!=0) return r;
DBT key,data; DBT key,data;
r = dbc->c_get(dbc, &key, &data, DB_FIRST); r = toku_c_get(dbc, &key, &data, DB_FIRST);
{ {
int r2=dbc->c_close(dbc); int r2=toku_c_close(dbc);
if (r!=DB_NOTFOUND) { if (r!=DB_NOTFOUND) {
return r2; return r2;
} }
} }
/* Now we know the secondary is empty. */ /* Now we know the secondary is empty. */
r = primary->cursor(primary, txn, &dbc, 0); r = toku_db_cursor(primary, txn, &dbc, 0);
if (r!=0) return r; if (r!=0) return r;
for (r = dbc->c_get(dbc, &key, &data, DB_FIRST); for (r = toku_c_get(dbc, &key, &data, DB_FIRST); r==0; r = toku_c_get(dbc, &key, &data, DB_NEXT)) {
r==0;
r = dbc->c_get(dbc, &key, &data, DB_NEXT)) {
r = do_associated_inserts(txn, &key, &data, secondary); r = do_associated_inserts(txn, &key, &data, secondary);
if (r!=0) { if (r!=0) {
dbc->c_close(dbc); toku_c_close(dbc);
return r; return r;
} }
} }
...@@ -816,7 +917,7 @@ static int toku_db_close(DB * db, u_int32_t flags) { ...@@ -816,7 +917,7 @@ static int toku_db_close(DB * db, u_int32_t flags) {
return r; return r;
// printf("%s:%d %d=__toku_db_close(%p)\n", __FILE__, __LINE__, r, db); // printf("%s:%d %d=__toku_db_close(%p)\n", __FILE__, __LINE__, r, db);
int is_panicked = env_is_panicked(db->dbenv); // Even if panicked, let's close as much as we can. int is_panicked = env_is_panicked(db->dbenv); // Even if panicked, let's close as much as we can.
db_env_unref(db->dbenv); env_unref(db->dbenv);
toku_free(db->i->database_name); toku_free(db->i->database_name);
toku_free(db->i->full_fname); toku_free(db->i->full_fname);
toku_free(db->i); toku_free(db->i);
...@@ -826,11 +927,6 @@ static int toku_db_close(DB * db, u_int32_t flags) { ...@@ -826,11 +927,6 @@ static int toku_db_close(DB * db, u_int32_t flags) {
return r; return r;
} }
struct __toku_dbc_internal {
BRT_CURSOR c;
DB_TXN *txn;
};
static int verify_secondary_key(DB *secondary, DBT *pkey, DBT *data, DBT *skey) { static int verify_secondary_key(DB *secondary, DBT *pkey, DBT *data, DBT *skey) {
int r = 0; int r = 0;
DBT idx; DBT idx;
...@@ -851,8 +947,6 @@ static int verify_secondary_key(DB *secondary, DBT *pkey, DBT *data, DBT *skey) ...@@ -851,8 +947,6 @@ static int verify_secondary_key(DB *secondary, DBT *pkey, DBT *data, DBT *skey)
return r; return r;
} }
static int toku_c_get_noassociate(DBC * c, DBT * key, DBT * data, u_int32_t flag);
static inline int keyeq(DBC *c, DBT *a, DBT *b) { static inline int keyeq(DBC *c, DBT *a, DBT *b) {
DB *db = c->dbp; DB *db = c->dbp;
return db->i->brt->compare_fun(db, a, b) == 0; return db->i->brt->compare_fun(db, a, b) == 0;
...@@ -983,53 +1077,9 @@ static int toku_c_get_noassociate(DBC * c, DBT * key, DBT * data, u_int32_t flag ...@@ -983,53 +1077,9 @@ static int toku_c_get_noassociate(DBC * c, DBT * key, DBT * data, u_int32_t flag
return r; return r;
} }
static int toku_c_count(DBC *cursor, db_recno_t *count, u_int32_t flags) {
int r;
DBC *count_cursor = 0;
DBT currentkey; memset(&currentkey, 0, sizeof currentkey); currentkey.flags = DB_DBT_REALLOC;
DBT currentval; memset(&currentval, 0, sizeof currentval); currentval.flags = DB_DBT_REALLOC;
DBT key; memset(&key, 0, sizeof key); key.flags = DB_DBT_REALLOC;
DBT val; memset(&val, 0, sizeof val); val.flags = DB_DBT_REALLOC;
if (flags != 0) {
r = EINVAL; goto finish;
}
r = cursor->c_get(cursor, &currentkey, &currentval, DB_CURRENT+256);
if (r != 0) goto finish;
r = cursor->dbp->cursor(cursor->dbp, 0, &count_cursor, 0);
if (r != 0) goto finish;
*count = 0;
r = count_cursor->c_get(count_cursor, &currentkey, &currentval, DB_SET);
if (r != 0) {
r = 0; goto finish; /* success, the current key must be deleted and there are no more */
}
for (;;) {
*count += 1;
r = count_cursor->c_get(count_cursor, &key, &val, DB_NEXT);
if (r != 0) break;
if (!keyeq(count_cursor, &currentkey, &key)) break;
}
r = 0; /* success, we found at least one before the end */
finish:
if (key.data) toku_free(key.data);
if (val.data) toku_free(val.data);
if (currentkey.data) toku_free(currentkey.data);
if (currentval.data) toku_free(currentval.data);
if (count_cursor) {
int rr = count_cursor->c_close(count_cursor); assert(rr == 0);
}
return r;
}
static int toku_c_del_noassociate(DBC * c, u_int32_t flags) { static int toku_c_del_noassociate(DBC * c, u_int32_t flags) {
HANDLE_PANICKED_DB(c->dbp); HANDLE_PANICKED_DB(c->dbp);
int r; int r = toku_brt_cursor_delete(c->i->c, flags);
r = toku_brt_cursor_delete(c->i->c, flags);
return r; return r;
} }
...@@ -1080,8 +1130,6 @@ static int toku_c_pget(DBC * c, DBT *key, DBT *pkey, DBT *data, u_int32_t flag) ...@@ -1080,8 +1130,6 @@ static int toku_c_pget(DBC * c, DBT *key, DBT *pkey, DBT *data, u_int32_t flag)
HANDLE_PANICKED_DB(db); HANDLE_PANICKED_DB(db);
DB *pdb = db->i->primary; DB *pdb = db->i->primary;
if (!pdb) return EINVAL; //c_pget does not work on a primary. if (!pdb) return EINVAL; //c_pget does not work on a primary.
// If data and primary_key are both zeroed, the temporary storage used to fill in data is different in the two cases because they come from different trees. // If data and primary_key are both zeroed, the temporary storage used to fill in data is different in the two cases because they come from different trees.
assert(db->i->brt!=pdb->i->brt); // Make sure they realy are different trees. assert(db->i->brt!=pdb->i->brt); // Make sure they realy are different trees.
...@@ -1135,7 +1183,7 @@ static int toku_c_pget(DBC * c, DBT *key, DBT *pkey, DBT *data, u_int32_t flag) ...@@ -1135,7 +1183,7 @@ static int toku_c_pget(DBC * c, DBT *key, DBT *pkey, DBT *data, u_int32_t flag)
r = toku_c_get_noassociate(c, key, pkey, flag); r = toku_c_get_noassociate(c, key, pkey, flag);
if (r != 0) goto died3; if (r != 0) goto died3;
r = pdb->get(pdb, c->i->txn, pkey, data, 0); r = toku_db_get(pdb, c->i->txn, pkey, data, 0);
if (r == DB_NOTFOUND) goto delete_silently_and_retry; if (r == DB_NOTFOUND) goto delete_silently_and_retry;
if (r != 0) goto died3; if (r != 0) goto died3;
r = verify_secondary_key(db, pkey, data, key); r = verify_secondary_key(db, pkey, data, key);
...@@ -1189,6 +1237,48 @@ static int toku_c_close(DBC * c) { ...@@ -1189,6 +1237,48 @@ static int toku_c_close(DBC * c) {
return r; return r;
} }
static int toku_c_count(DBC *cursor, db_recno_t *count, u_int32_t flags) {
int r;
DBC *count_cursor = 0;
DBT currentkey; memset(&currentkey, 0, sizeof currentkey); currentkey.flags = DB_DBT_REALLOC;
DBT currentval; memset(&currentval, 0, sizeof currentval); currentval.flags = DB_DBT_REALLOC;
DBT key; memset(&key, 0, sizeof key); key.flags = DB_DBT_REALLOC;
DBT val; memset(&val, 0, sizeof val); val.flags = DB_DBT_REALLOC;
if (flags != 0) {
r = EINVAL; goto finish;
}
r = toku_c_get(cursor, &currentkey, &currentval, DB_CURRENT+256);
if (r != 0) goto finish;
r = toku_db_cursor(cursor->dbp, 0, &count_cursor, 0);
if (r != 0) goto finish;
*count = 0;
r = toku_c_get(count_cursor, &currentkey, &currentval, DB_SET);
if (r != 0) {
r = 0; goto finish; /* success, the current key must be deleted and there are no more */
}
for (;;) {
*count += 1;
r = toku_c_get(count_cursor, &key, &val, DB_NEXT);
if (r != 0) break;
if (!keyeq(count_cursor, &currentkey, &key)) break;
}
r = 0; /* success, we found at least one before the end */
finish:
if (key.data) toku_free(key.data);
if (val.data) toku_free(val.data);
if (currentkey.data) toku_free(currentkey.data);
if (currentval.data) toku_free(currentval.data);
if (count_cursor) {
int rr = toku_c_close(count_cursor); assert(rr == 0);
}
return r;
}
static int toku_db_get_noassociate(DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags) { static int toku_db_get_noassociate(DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags) {
int r; int r;
unsigned int brtflags; unsigned int brtflags;
...@@ -1201,10 +1291,10 @@ static int toku_db_get_noassociate(DB * db, DB_TXN * txn, DBT * key, DBT * data, ...@@ -1201,10 +1291,10 @@ static int toku_db_get_noassociate(DB * db, DB_TXN * txn, DBT * key, DBT * data,
// We aren't ready to handle flags such as DB_READ_COMMITTED or DB_READ_UNCOMMITTED or DB_RMW // We aren't ready to handle flags such as DB_READ_COMMITTED or DB_READ_UNCOMMITTED or DB_RMW
DBC *dbc; DBC *dbc;
r = db->cursor(db, txn, &dbc, 0); r = toku_db_cursor(db, txn, &dbc, 0);
if (r!=0) return r; if (r!=0) return r;
r = toku_c_get_noassociate(dbc, key, data, flags == DB_GET_BOTH ? DB_GET_BOTH : DB_SET); r = toku_c_get_noassociate(dbc, key, data, flags == DB_GET_BOTH ? DB_GET_BOTH : DB_SET);
int r2 = dbc->c_close(dbc); int r2 = toku_c_close(dbc);
if (r!=0) return r; if (r!=0) return r;
return r2; return r2;
} else { } else {
...@@ -1246,13 +1336,13 @@ static int do_associated_deletes(DB_TXN *txn, DBT *key, DBT *data, DB *secondary ...@@ -1246,13 +1336,13 @@ static int do_associated_deletes(DB_TXN *txn, DBT *key, DBT *data, DB *secondary
if ((brtflags & TOKU_DB_DUPSORT) || (brtflags & TOKU_DB_DUP)) { if ((brtflags & TOKU_DB_DUPSORT) || (brtflags & TOKU_DB_DUP)) {
//If the secondary has duplicates we need to use cursor deletes. //If the secondary has duplicates we need to use cursor deletes.
DBC *dbc; DBC *dbc;
r = secondary->cursor(secondary, txn, &dbc, 0); r = toku_db_cursor(secondary, txn, &dbc, 0);
if (r!=0) goto cleanup; if (r!=0) goto cleanup;
r = toku_c_get_noassociate(dbc, &idx, key, DB_GET_BOTH); r = toku_c_get_noassociate(dbc, &idx, key, DB_GET_BOTH);
if (r!=0) goto cleanup; if (r!=0) goto cleanup;
r = toku_c_del_noassociate(dbc, 0); r = toku_c_del_noassociate(dbc, 0);
cleanup: cleanup:
r2 = dbc->c_close(dbc); r2 = toku_c_close(dbc);
} }
else r = toku_db_del_noassociate(secondary, txn, &idx, DB_DELETE_ANY); else r = toku_db_del_noassociate(secondary, txn, &idx, DB_DELETE_ANY);
if (idx.flags & DB_DBT_APPMALLOC) { if (idx.flags & DB_DBT_APPMALLOC) {
...@@ -1278,9 +1368,8 @@ static int toku_c_del(DBC * c, u_int32_t flags) { ...@@ -1278,9 +1368,8 @@ static int toku_c_del(DBC * c, u_int32_t flags) {
memset(&data, 0, sizeof(data)); memset(&data, 0, sizeof(data));
if (db->i->primary == 0) { if (db->i->primary == 0) {
pdb = db; pdb = db;
r = c->c_get(c, &pkey, &data, DB_CURRENT); r = toku_c_get(c, &pkey, &data, DB_CURRENT);
} } else {
else {
DBT skey; DBT skey;
pdb = db->i->primary; pdb = db->i->primary;
memset(&skey, 0, sizeof(skey)); memset(&skey, 0, sizeof(skey));
...@@ -1364,13 +1453,41 @@ static int toku_c_put(DBC *dbc, DBT *key, DBT *data, u_int32_t flags) { ...@@ -1364,13 +1453,41 @@ static int toku_c_put(DBC *dbc, DBT *key, DBT *data, u_int32_t flags) {
//Flags must NOT be 0. //Flags must NOT be 0.
else return EINVAL; else return EINVAL;
finish: finish:
//Insert new data with the key we got from c_get. //Insert new data with the key we got from c_get
r = db->put(db, dbc->i->txn, put_key, put_data, DB_YESOVERWRITE); // when doing the put, it should do an overwrite. r = toku_db_put(db, dbc->i->txn, put_key, put_data, DB_YESOVERWRITE); // when doing the put, it should do an overwrite.
if (r!=0) goto cleanup; if (r!=0) goto cleanup;
r = toku_c_get(dbc, get_key, get_data, DB_GET_BOTH); r = toku_c_get(dbc, get_key, get_data, DB_GET_BOTH);
goto cleanup; goto cleanup;
} }
#if _THREAD_SAFE
static int locked_c_pget(DBC * c, DBT *key, DBT *pkey, DBT *data, u_int32_t flag) {
ydb_lock(); int r = toku_c_pget(c, key, pkey, data, flag); ydb_unlock(); return r;
}
static int locked_c_get(DBC * c, DBT * key, DBT * data, u_int32_t flag) {
ydb_lock(); int r = toku_c_get(c, key, data, flag); ydb_unlock(); return r;
}
static int locked_c_close(DBC * c) {
ydb_lock(); int r = toku_c_close(c); ydb_unlock(); return r;
}
static int locked_c_count(DBC *cursor, db_recno_t *count, u_int32_t flags) {
ydb_lock(); int r = toku_c_count(cursor, count, flags); ydb_unlock(); return r;
}
static int locked_c_del(DBC * c, u_int32_t flags) {
ydb_lock(); int r = toku_c_del(c, flags); ydb_unlock(); return r;
}
static int locked_c_put(DBC *dbc, DBT *key, DBT *data, u_int32_t flags) {
ydb_lock(); int r = toku_c_put(dbc, key, data, flags); ydb_unlock(); return r;
}
#endif
static int toku_db_cursor(DB * db, DB_TXN * txn, DBC ** c, u_int32_t flags) { static int toku_db_cursor(DB * db, DB_TXN * txn, DBC ** c, u_int32_t flags) {
HANDLE_PANICKED_DB(db); HANDLE_PANICKED_DB(db);
if (flags != 0) if (flags != 0)
...@@ -1379,12 +1496,12 @@ static int toku_db_cursor(DB * db, DB_TXN * txn, DBC ** c, u_int32_t flags) { ...@@ -1379,12 +1496,12 @@ static int toku_db_cursor(DB * db, DB_TXN * txn, DBC ** c, u_int32_t flags) {
if (result == 0) if (result == 0)
return ENOMEM; return ENOMEM;
memset(result, 0, sizeof *result); memset(result, 0, sizeof *result);
result->c_get = toku_c_get; result->c_get = locked_c_get;
result->c_pget = toku_c_pget; result->c_pget = locked_c_pget;
result->c_put = toku_c_put; result->c_put = locked_c_put;
result->c_close = toku_c_close; result->c_close = locked_c_close;
result->c_del = toku_c_del; result->c_del = locked_c_del;
result->c_count = toku_c_count; result->c_count = locked_c_count;
MALLOC(result->i); MALLOC(result->i);
assert(result->i); assert(result->i);
result->dbp = db; result->dbp = db;
...@@ -1395,7 +1512,7 @@ static int toku_db_cursor(DB * db, DB_TXN * txn, DBC ** c, u_int32_t flags) { ...@@ -1395,7 +1512,7 @@ static int toku_db_cursor(DB * db, DB_TXN * txn, DBC ** c, u_int32_t flags) {
return 0; return 0;
} }
static int toku_db_del(DB * db, DB_TXN * txn, DBT * key, u_int32_t flags) { static int toku_db_del(DB *db, DB_TXN *txn, DBT *key, u_int32_t flags) {
HANDLE_PANICKED_DB(db); HANDLE_PANICKED_DB(db);
int r; int r;
...@@ -1426,7 +1543,7 @@ static int toku_db_del(DB * db, DB_TXN * txn, DBT * key, u_int32_t flags) { ...@@ -1426,7 +1543,7 @@ static int toku_db_del(DB * db, DB_TXN * txn, DBT * key, u_int32_t flags) {
if (r!=0) goto cleanup; if (r!=0) goto cleanup;
while (r==0) { while (r==0) {
r = dbc->c_del(dbc, 0); r = toku_c_del(dbc, 0);
if (r==0) found = TRUE; if (r==0) found = TRUE;
if (r!=0 && r!=DB_KEYEMPTY) goto cleanup; if (r!=0 && r!=DB_KEYEMPTY) goto cleanup;
r = toku_c_get_noassociate(dbc, key, &data, DB_NEXT_DUP); r = toku_c_get_noassociate(dbc, key, &data, DB_NEXT_DUP);
...@@ -1437,20 +1554,20 @@ static int toku_db_del(DB * db, DB_TXN * txn, DBT * key, u_int32_t flags) { ...@@ -1437,20 +1554,20 @@ static int toku_db_del(DB * db, DB_TXN * txn, DBT * key, u_int32_t flags) {
} }
} }
cleanup: cleanup:
r2 = dbc->c_close(dbc); r2 = toku_c_close(dbc);
if (r != 0) return r; if (r != 0) return r;
return r2; return r2;
} }
if (db->i->primary == 0) { if (db->i->primary == 0) {
pdb = db; pdb = db;
r = db->get(db, txn, key, &data, 0); r = toku_db_get(db, txn, key, &data, 0);
pdb_key = key; pdb_key = key;
} }
else { else {
memset(&pkey, 0, sizeof(pkey)); memset(&pkey, 0, sizeof(pkey));
pdb = db->i->primary; pdb = db->i->primary;
r = db->pget(db, txn, key, &pkey, &data, 0); r = toku_db_pget(db, txn, key, &pkey, &data, 0);
pdb_key = &pkey; pdb_key = &pkey;
} }
if (r != 0) return r; if (r != 0) return r;
...@@ -1500,21 +1617,23 @@ static int toku_db_pget (DB *db, DB_TXN *txn, DBT *key, DBT *pkey, DBT *data, u_ ...@@ -1500,21 +1617,23 @@ static int toku_db_pget (DB *db, DB_TXN *txn, DBT *key, DBT *pkey, DBT *data, u_
assert(db->i->brt != db->i->primary->i->brt); // Make sure they realy are different trees. assert(db->i->brt != db->i->primary->i->brt); // Make sure they realy are different trees.
assert(db!=db->i->primary); assert(db!=db->i->primary);
r = db->cursor(db, txn, &dbc, 0); r = toku_db_cursor(db, txn, &dbc, 0);
if (r!=0) return r; if (r!=0) return r;
r = dbc->c_pget(dbc, key, pkey, data, DB_SET); r = toku_c_pget(dbc, key, pkey, data, DB_SET);
if (r==DB_KEYEMPTY) r = DB_NOTFOUND; if (r==DB_KEYEMPTY) r = DB_NOTFOUND;
r2 = dbc->c_close(dbc); r2 = toku_c_close(dbc);
if (r!=0) return r; if (r!=0) return r;
return r2; return r2;
} }
#if 0
static int toku_db_key_range(DB * db, DB_TXN * txn, DBT * dbt, DB_KEY_RANGE * kr, u_int32_t flags) { static int toku_db_key_range(DB * db, DB_TXN * txn, DBT * dbt, DB_KEY_RANGE * kr, u_int32_t flags) {
HANDLE_PANICKED_DB(db); HANDLE_PANICKED_DB(db);
txn=txn; dbt=dbt; kr=kr; flags=flags; txn=txn; dbt=dbt; kr=kr; flags=flags;
barf(); barf();
abort(); abort();
} }
#endif
static int construct_full_name_in_buf(const char *dir, const char *fname, char* full, int length) { static int construct_full_name_in_buf(const char *dir, const char *fname, char* full, int length) {
int l; int l;
...@@ -1760,17 +1879,17 @@ static int toku_db_remove(DB * db, const char *fname, const char *dbname, u_int3 ...@@ -1760,17 +1879,17 @@ static int toku_db_remove(DB * db, const char *fname, const char *dbname, u_int3
//TODO: Verify the target db is not open //TODO: Verify the target db is not open
//TODO: Use master database (instead of manual edit) when implemented. //TODO: Use master database (instead of manual edit) when implemented.
if ((r = db->open(db, NULL, fname, dbname, DB_BTREE, 0, 0777)) != 0) goto cleanup; if ((r = toku_db_open(db, NULL, fname, dbname, DB_BTREE, 0, 0777)) != 0) goto cleanup;
r = toku_brt_remove_subdb(db->i->brt, dbname, flags); r = toku_brt_remove_subdb(db->i->brt, dbname, flags);
cleanup: cleanup:
r2 = db->close(db, 0); r2 = toku_db_close(db, 0);
return r ? r : r2; return r ? r : r2;
} }
//TODO: Verify db file not in use. (all dbs in the file must be unused) //TODO: Verify db file not in use. (all dbs in the file must be unused)
r = find_db_file(db->dbenv, fname, &full_name); r = find_db_file(db->dbenv, fname, &full_name);
if (r!=0) return r; if (r!=0) return r;
assert(full_name); assert(full_name);
r2 = db->close(db, 0); r2 = toku_db_close(db, 0);
if (r == 0 && r2 == 0) { if (r == 0 && r2 == 0) {
if (unlink(full_name) != 0) r = errno; if (unlink(full_name) != 0) r = errno;
} }
...@@ -1803,11 +1922,7 @@ static int toku_db_set_dup_compare(DB *db, int (*dup_compare)(DB *, const DBT *, ...@@ -1803,11 +1922,7 @@ static int toku_db_set_dup_compare(DB *db, int (*dup_compare)(DB *, const DBT *,
return r; return r;
} }
static void toku_db_set_errfile (DB*db, FILE *errfile) { static int toku_db_set_flags(DB *db, u_int32_t flags) {
db->dbenv->set_errfile(db->dbenv, errfile);
}
static int toku_db_set_flags(DB * db, u_int32_t flags) {
HANDLE_PANICKED_DB(db); HANDLE_PANICKED_DB(db);
/* the following matches BDB */ /* the following matches BDB */
...@@ -1853,12 +1968,14 @@ static int toku_db_set_pagesize(DB *db, u_int32_t pagesize) { ...@@ -1853,12 +1968,14 @@ static int toku_db_set_pagesize(DB *db, u_int32_t pagesize) {
return r; return r;
} }
#if 0
static int toku_db_stat(DB * db, void *v, u_int32_t flags) { static int toku_db_stat(DB * db, void *v, u_int32_t flags) {
HANDLE_PANICKED_DB(db); HANDLE_PANICKED_DB(db);
v=v; flags=flags; v=v; flags=flags;
barf(); barf();
abort(); abort();
} }
#endif
static int toku_db_fd(DB *db, int *fdp) { static int toku_db_fd(DB *db, int *fdp) {
HANDLE_PANICKED_DB(db); HANDLE_PANICKED_DB(db);
...@@ -1866,7 +1983,80 @@ static int toku_db_fd(DB *db, int *fdp) { ...@@ -1866,7 +1983,80 @@ static int toku_db_fd(DB *db, int *fdp) {
return toku_brt_get_fd(db->i->brt, fdp); return toku_brt_get_fd(db->i->brt, fdp);
} }
int db_create(DB ** db, DB_ENV * env, u_int32_t flags) { #if _THREAD_SAFE
static int locked_db_associate (DB *primary, DB_TXN *txn, DB *secondary,
int (*callback)(DB *secondary, const DBT *key, const DBT *data, DBT *result), u_int32_t flags) {
ydb_lock(); int r = toku_db_associate(primary, txn, secondary, callback, flags); ydb_unlock(); return r;
}
static int locked_db_close(DB * db, u_int32_t flags) {
ydb_lock(); int r = toku_db_close(db, flags); ydb_unlock(); return r;
}
static int locked_db_cursor(DB *db, DB_TXN *txn, DBC **c, u_int32_t flags) {
ydb_lock(); int r = toku_db_cursor(db, txn, c, flags); ydb_unlock(); return r;
}
static int locked_db_del(DB * db, DB_TXN * txn, DBT * key, u_int32_t flags) {
ydb_lock(); int r = toku_db_del(db, txn, key, flags); ydb_unlock(); return r;
}
static int locked_db_get (DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags) {
ydb_lock(); int r = toku_db_get(db, txn, key, data, flags); ydb_unlock(); return r;
}
static int locked_db_pget (DB *db, DB_TXN *txn, DBT *key, DBT *pkey, DBT *data, u_int32_t flags) {
ydb_lock(); int r = toku_db_pget(db, txn, key, pkey, data, flags); ydb_unlock(); return r;
}
static int locked_db_open(DB *db, DB_TXN *txn, const char *fname, const char *dbname, DBTYPE dbtype, u_int32_t flags, int mode) {
ydb_lock(); int r = toku_db_open(db, txn, fname, dbname, dbtype, flags, mode); ydb_unlock(); return r;
}
static int locked_db_put(DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags) {
ydb_lock(); int r = toku_db_put(db, txn, key, data, flags); ydb_unlock(); return r;
}
static int locked_db_remove(DB * db, const char *fname, const char *dbname, u_int32_t flags) {
ydb_lock(); int r = toku_db_remove(db, fname, dbname, flags); ydb_unlock(); return r;
}
static int locked_db_rename(DB * db, const char *namea, const char *nameb, const char *namec, u_int32_t flags) {
ydb_lock(); int r = toku_db_rename(db, namea, nameb, namec, flags); ydb_unlock(); return r;
}
static int locked_db_set_bt_compare(DB * db, int (*bt_compare) (DB *, const DBT *, const DBT *)) {
ydb_lock(); int r = toku_db_set_bt_compare(db, bt_compare); ydb_unlock(); return r;
}
static int locked_db_set_dup_compare(DB * db, int (*dup_compare) (DB *, const DBT *, const DBT *)) {
ydb_lock(); int r = toku_db_set_dup_compare(db, dup_compare); ydb_unlock(); return r;
}
static void locked_db_set_errfile (DB *db, FILE *errfile) {
db->dbenv->set_errfile(db->dbenv, errfile);
}
static int locked_db_set_flags(DB *db, u_int32_t flags) {
ydb_lock(); int r = toku_db_set_flags(db, flags); ydb_unlock(); return r;
}
static int locked_db_get_flags(DB *db, u_int32_t *flags) {
ydb_lock(); int r = toku_db_get_flags(db, flags); ydb_unlock(); return r;
}
static int locked_db_set_pagesize(DB *db, u_int32_t pagesize) {
ydb_lock(); int r = toku_db_set_pagesize(db, pagesize); ydb_unlock(); return r;
}
static int locked_db_fd(DB *db, int *fdp) {
ydb_lock(); int r = toku_db_fd(db, fdp); ydb_unlock(); return r;
}
#endif
static int toku_db_create(DB ** db, DB_ENV * env, u_int32_t flags) {
int r; int r;
if (flags) return EINVAL; if (flags) return EINVAL;
...@@ -1874,51 +2064,51 @@ int db_create(DB ** db, DB_ENV * env, u_int32_t flags) { ...@@ -1874,51 +2064,51 @@ int db_create(DB ** db, DB_ENV * env, u_int32_t flags) {
/* if the env already exists then add a ref to it /* if the env already exists then add a ref to it
otherwise create one */ otherwise create one */
if (env) { if (env) {
if (!db_env_opened(env)) if (!env_opened(env))
return EINVAL; return EINVAL;
db_env_add_ref(env); env_add_ref(env);
} else { } else {
r = db_env_create(&env, 0); r = toku_env_create(&env, 0);
if (r != 0) if (r != 0)
return r; return r;
r = env->open(env, ".", DB_PRIVATE + DB_INIT_MPOOL, 0); r = toku_env_open(env, ".", DB_PRIVATE + DB_INIT_MPOOL, 0);
if (r != 0) { if (r != 0) {
env->close(env, 0); toku_env_close(env, 0);
return r; return r;
} }
assert(db_env_opened(env)); assert(env_opened(env));
} }
DB *MALLOC(result); DB *MALLOC(result);
if (result == 0) { if (result == 0) {
db_env_unref(env); env_unref(env);
return ENOMEM; return ENOMEM;
} }
memset(result, 0, sizeof *result); memset(result, 0, sizeof *result);
result->dbenv = env; result->dbenv = env;
result->associate = toku_db_associate; result->associate = locked_db_associate;
result->close = toku_db_close; result->close = locked_db_close;
result->cursor = toku_db_cursor; result->cursor = locked_db_cursor;
result->del = toku_db_del; result->del = locked_db_del;
result->get = toku_db_get; result->get = locked_db_get;
result->key_range = toku_db_key_range; // result->key_range = locked_db_key_range;
result->open = toku_db_open; result->open = locked_db_open;
result->pget = toku_db_pget; result->pget = locked_db_pget;
result->put = toku_db_put; result->put = locked_db_put;
result->remove = toku_db_remove; result->remove = locked_db_remove;
result->rename = toku_db_rename; result->rename = locked_db_rename;
result->set_bt_compare = toku_db_set_bt_compare; result->set_bt_compare = locked_db_set_bt_compare;
result->set_dup_compare = toku_db_set_dup_compare; result->set_dup_compare = locked_db_set_dup_compare;
result->set_errfile = toku_db_set_errfile; result->set_errfile = locked_db_set_errfile;
result->set_pagesize = toku_db_set_pagesize; result->set_pagesize = locked_db_set_pagesize;
result->set_flags = toku_db_set_flags; result->set_flags = locked_db_set_flags;
result->get_flags = toku_db_get_flags; result->get_flags = locked_db_get_flags;
result->stat = toku_db_stat; // result->stat = locked_db_stat;
result->fd = toku_db_fd; result->fd = locked_db_fd;
MALLOC(result->i); MALLOC(result->i);
if (result->i == 0) { if (result->i == 0) {
toku_free(result); toku_free(result);
db_env_unref(env); env_unref(env);
return ENOMEM; return ENOMEM;
} }
memset(result->i, 0, sizeof *result->i); memset(result->i, 0, sizeof *result->i);
...@@ -1938,7 +2128,7 @@ int db_create(DB ** db, DB_ENV * env, u_int32_t flags) { ...@@ -1938,7 +2128,7 @@ int db_create(DB ** db, DB_ENV * env, u_int32_t flags) {
if (r != 0) { if (r != 0) {
toku_free(result->i); toku_free(result->i);
toku_free(result); toku_free(result);
db_env_unref(env); env_unref(env);
return ENOMEM; return ENOMEM;
} }
ydb_add_ref(); ydb_add_ref();
...@@ -1946,6 +2136,12 @@ int db_create(DB ** db, DB_ENV * env, u_int32_t flags) { ...@@ -1946,6 +2136,12 @@ int db_create(DB ** db, DB_ENV * env, u_int32_t flags) {
return 0; return 0;
} }
int db_create(DB ** db, DB_ENV * env, u_int32_t flags) {
ydb_lock(); int r = toku_db_create(db, env, flags); ydb_unlock(); return r;
}
/* need db_strerror_r for multiple threads */
char *db_strerror(int error) { char *db_strerror(int error) {
char *errorstr; char *errorstr;
if (error >= 0) { if (error >= 0) {
...@@ -1958,7 +2154,7 @@ char *db_strerror(int error) { ...@@ -1958,7 +2154,7 @@ char *db_strerror(int error) {
return "Database Bad Format (probably a corrupted database)"; return "Database Bad Format (probably a corrupted database)";
} }
static char unknown_result[100]; // Race condition if two threads call this at the same time. However even in a bad case, it should be some sort of nul-terminated string. static char unknown_result[100]; // Race condition if two threads call this at the same time. However even in a bad case, it should be some sort of null-terminated string.
errorstr = unknown_result; errorstr = unknown_result;
snprintf(errorstr, sizeof unknown_result, "Unknown error code: %d", error); snprintf(errorstr, sizeof unknown_result, "Unknown error code: %d", error);
return errorstr; return errorstr;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment