Commit a9876849 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

merge 1032 into 1032b. addresses #1032

git-svn-id: file:///svn/toku/tokudb.1032b@7782 c7de825b-a66e-492c-adef-691d508d4ae1
parent 73510a8f
...@@ -2614,11 +2614,13 @@ static int setup_initial_brt_root_node (BRT t, BLOCKNUM blocknum, TOKULOGGER log ...@@ -2614,11 +2614,13 @@ static int setup_initial_brt_root_node (BRT t, BLOCKNUM blocknum, TOKULOGGER log
} }
// open a file for use by the brt. if the file does not exist, create it. // open a file for use by the brt. if the file does not exist, create it.
static int brt_open_file(BRT brt, const char *fname, const char *fname_in_env, int is_create, TOKUTXN txn, int *fdp) { static int brt_open_file(BRT brt, const char *fname, int is_create, int *fdp, BOOL *did_create) {
brt = brt; brt = brt;
mode_t mode = 0777; mode_t mode = S_IRWXU|S_IRWXG|S_IRWXO;
int r; int r;
int fd = open(fname, O_RDWR | O_BINARY, mode); int fd;
*did_create = FALSE;
fd = open(fname, O_RDWR | O_BINARY, mode);
if (fd==-1) { if (fd==-1) {
r = errno; r = errno;
if (errno == ENOENT) { if (errno == ENOENT) {
...@@ -2629,10 +2631,7 @@ static int brt_open_file(BRT brt, const char *fname, const char *fname_in_env, i ...@@ -2629,10 +2631,7 @@ static int brt_open_file(BRT brt, const char *fname, const char *fname_in_env, i
if (fd == -1) { if (fd == -1) {
r = errno; return r; r = errno; return r;
} }
r = toku_logger_log_fcreate(txn, fname_in_env, mode); *did_create = TRUE;
if (r != 0) {
close(fd); return r;
}
} else } else
return r; return r;
} }
...@@ -2760,16 +2759,23 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, const char ...@@ -2760,16 +2759,23 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, const char
t->txn_that_created = 0; // Uses 0 for no transaction. t->txn_that_created = 0; // Uses 0 for no transaction.
{ {
int fd = -1; int fd = -1;
r = brt_open_file(t, fname, fname_in_env, is_create, txn, &fd); BOOL did_create = FALSE;
r = brt_open_file(t, fname, is_create, &fd, &did_create);
if (r != 0) { if (r != 0) {
t->database_name = 0; goto died0a; t->database_name = 0; goto died0a;
} }
r=toku_cachetable_openfd(&t->cf, cachetable, fd, fname_in_env); r=toku_cachetable_openfd(&t->cf, cachetable, fd, fname_in_env);
if (r != 0) goto died0a; if (r != 0) goto died0a;
toku_logger_log_fopen(txn, fname_in_env, toku_cachefile_filenum(t->cf)); if (did_create) {
mode_t mode = S_IRWXU|S_IRWXG|S_IRWXO;
r = toku_logger_log_fcreate(txn, fname_in_env, toku_cachefile_filenum(t->cf), mode);
if (r != 0) goto died_after_open;
}
r = toku_logger_log_fopen(txn, fname_in_env, toku_cachefile_filenum(t->cf));
} }
if (r!=0) { if (r!=0) {
if (0) { died_after_open: toku_cachefile_close(&t->cf, toku_txn_logger(txn)); } died_after_open:
toku_cachefile_close(&t->cf, toku_txn_logger(txn));
t->database_name = 0; t->database_name = 0;
goto died0a; goto died0a;
} }
...@@ -2854,28 +2860,6 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, const char ...@@ -2854,28 +2860,6 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, const char
return 0; return 0;
} }
int toku_brt_reopen(BRT brt, const char *fname, const char *fname_in_env, TOKUTXN txn) {
int r;
// create a new file
int fd = -1;
r = brt_open_file(brt, fname, fname_in_env, TRUE, txn, &fd);
if (r != 0) return r;
// set the cachefile
r = toku_cachefile_set_fd(brt->cf, fd, fname_in_env);
assert(r == 0);
brt->h = 0; // set_fd should close the header
toku_logger_log_fopen(txn, fname_in_env, toku_cachefile_filenum(brt->cf));
// init the tree header
r = toku_read_brt_header_and_store_in_cachefile(brt->cf, &brt->h);
if (r == -1) {
r = brt_alloc_init_header(brt, NULL, txn);
}
return r;
}
int toku_brt_remove_subdb(BRT brt, const char *dbname, u_int32_t flags) { int toku_brt_remove_subdb(BRT brt, const char *dbname, u_int32_t flags) {
int i; int i;
int found = -1; int found = -1;
......
...@@ -1217,3 +1217,21 @@ void toku_cachefile_set_userdata (CACHEFILE cf, void *userdata, int (*close_user ...@@ -1217,3 +1217,21 @@ void toku_cachefile_set_userdata (CACHEFILE cf, void *userdata, int (*close_user
void *toku_cachefile_get_userdata(CACHEFILE cf) { void *toku_cachefile_get_userdata(CACHEFILE cf) {
return cf->userdata; return cf->userdata;
} }
int toku_cachefile_redirect_nullfd (CACHEFILE cf) {
int null_fd;
struct fileid fileid;
null_fd = open(DEV_NULL_FILE, O_WRONLY+O_BINARY);
assert(null_fd>=0);
os_get_unique_file_id(null_fd, &fileid);
close(cf->fd);
cf->fd = null_fd;
if (cf->fname) {
toku_free(cf->fname);
cf->fname = 0;
}
cachefile_init_filenum(cf, null_fd, NULL, fileid);
return 0;
}
...@@ -148,6 +148,10 @@ int toku_cachefile_fd (CACHEFILE); ...@@ -148,6 +148,10 @@ int toku_cachefile_fd (CACHEFILE);
// Returns: 0 if success, otherwise an error number // Returns: 0 if success, otherwise an error number
int toku_cachefile_set_fd (CACHEFILE cf, int fd, const char *fname); int toku_cachefile_set_fd (CACHEFILE cf, int fd, const char *fname);
// Equivalent to toku_cachefile_set_fd to /dev/null but without
// closing the user data.
int toku_cachefile_redirect_nullfd (CACHEFILE cf);
// Return the logger associated with the cachefile // Return the logger associated with the cachefile
TOKULOGGER toku_cachefile_logger (CACHEFILE); TOKULOGGER toku_cachefile_logger (CACHEFILE);
......
...@@ -3,8 +3,6 @@ ...@@ -3,8 +3,6 @@
#include "includes.h" #include "includes.h"
static char dev_null[] = "/dev/null";
void* toku_malloc_in_rollback(TOKUTXN txn, size_t size) { void* toku_malloc_in_rollback(TOKUTXN txn, size_t size) {
return malloc_in_memarena(txn->rollentry_arena, size); return malloc_in_memarena(txn->rollentry_arena, size);
} }
...@@ -139,9 +137,11 @@ static int open_logfile (TOKULOGGER logger) { ...@@ -139,9 +137,11 @@ static int open_logfile (TOKULOGGER logger) {
char fname[fnamelen]; char fname[fnamelen];
snprintf(fname, fnamelen, "%s/log%012lld.tokulog", logger->directory, logger->next_log_file_number); snprintf(fname, fnamelen, "%s/log%012lld.tokulog", logger->directory, logger->next_log_file_number);
if (logger->write_log_files) { if (logger->write_log_files) {
logger->fd = creat(fname, O_EXCL | 0700); if (logger->fd==-1) return errno; logger->fd = open(fname, O_CREAT+O_WRONLY+O_TRUNC+O_EXCL+O_BINARY, S_IRWXU); if (logger->fd==-1) return errno;
} else { } else {
logger->fd = open(dev_null, O_RDWR+O_BINARY); if (logger->fd==-1) return errno; logger->fd = open(DEV_NULL_FILE, O_WRONLY+O_BINARY);
// printf("%s: %s %d\n", __FUNCTION__, DEV_NULL_FILE, logger->fd); fflush(stdout);
if (logger->fd==-1) return errno;
} }
logger->next_log_file_number++; logger->next_log_file_number++;
int version_l = htonl(log_format_version); int version_l = htonl(log_format_version);
...@@ -510,13 +510,13 @@ int toku_logger_txn_begin (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER ...@@ -510,13 +510,13 @@ int toku_logger_txn_begin (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER
return 0; return 0;
} }
int toku_logger_log_fcreate (TOKUTXN txn, const char *fname, int mode) { int toku_logger_log_fcreate (TOKUTXN txn, const char *fname, FILENUM filenum, int mode) {
if (txn==0) return 0; if (txn==0) return 0;
if (txn->logger->is_panicked) return EINVAL; if (txn->logger->is_panicked) return EINVAL;
BYTESTRING bs = { .len=strlen(fname), .data = toku_strdup_in_rollback(txn, fname) }; BYTESTRING bs = { .len=strlen(fname), .data = toku_strdup_in_rollback(txn, fname) };
int r = toku_log_fcreate (txn->logger, (LSN*)0, 0, toku_txn_get_txnid(txn), bs, mode); int r = toku_log_fcreate (txn->logger, (LSN*)0, 0, toku_txn_get_txnid(txn), filenum, bs, mode);
if (r!=0) return r; if (r!=0) return r;
r = toku_logger_save_rollback_fcreate(txn, toku_txn_get_txnid(txn), bs); r = toku_logger_save_rollback_fcreate(txn, toku_txn_get_txnid(txn), filenum, bs);
return r; return r;
} }
......
...@@ -41,7 +41,7 @@ int toku_logger_commit (TOKUTXN txn, int no_sync); ...@@ -41,7 +41,7 @@ int toku_logger_commit (TOKUTXN txn, int no_sync);
int toku_logger_txn_begin (TOKUTXN /*parent*/,TOKUTXN *, TOKULOGGER /*logger*/); int toku_logger_txn_begin (TOKUTXN /*parent*/,TOKUTXN *, TOKULOGGER /*logger*/);
int toku_logger_log_fcreate (TOKUTXN, const char */*fname*/, int /*mode*/); int toku_logger_log_fcreate (TOKUTXN, const char */*fname*/, FILENUM /*filenum*/, int /*mode*/);
int toku_logger_log_fopen (TOKUTXN, const char * /*fname*/, FILENUM); int toku_logger_log_fopen (TOKUTXN, const char * /*fname*/, FILENUM);
......
...@@ -18,9 +18,7 @@ ...@@ -18,9 +18,7 @@
#include <string.h> #include <string.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <sys/types.h> #include <sys/types.h>
#if !defined(TOKU_WINDOWS)
#include <unistd.h> #include <unistd.h>
#endif
typedef struct field { typedef struct field {
char *type; char *type;
...@@ -43,6 +41,7 @@ int logformat_version_number = 0; ...@@ -43,6 +41,7 @@ int logformat_version_number = 0;
const struct logtype rollbacks[] = { const struct logtype rollbacks[] = {
{"fcreate", 'F', FA{{"TXNID", "xid", 0}, {"fcreate", 'F', FA{{"TXNID", "xid", 0},
{"FILENUM", "filenum", 0},
{"BYTESTRING", "fname", 0}, {"BYTESTRING", "fname", 0},
NULLFIELD}}, NULLFIELD}},
{"cmdinsert", 'i', FA{{"TXNID", "xid", 0}, {"cmdinsert", 'i', FA{{"TXNID", "xid", 0},
...@@ -92,6 +91,7 @@ const struct logtype logtypes[] = { ...@@ -92,6 +91,7 @@ const struct logtype logtypes[] = {
NULLFIELD}}, NULLFIELD}},
#endif #endif
{"fcreate", 'F', FA{{"TXNID", "txnid", 0}, {"fcreate", 'F', FA{{"TXNID", "txnid", 0},
{"FILENUM", "filenum", 0},
{"BYTESTRING", "fname", 0}, {"BYTESTRING", "fname", 0},
{"u_int32_t", "mode", "0%o"}, {"u_int32_t", "mode", "0%o"},
NULLFIELD}}, NULLFIELD}},
...@@ -564,6 +564,8 @@ generate_rollbacks (void) { ...@@ -564,6 +564,8 @@ generate_rollbacks (void) {
const char *codepath = "log_code.c"; const char *codepath = "log_code.c";
const char *headerpath = "log_header.h"; const char *headerpath = "log_header.h";
int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__unused__))) { int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__unused__))) {
chmod(codepath, S_IRUSR|S_IWUSR);
chmod(headerpath, S_IRUSR|S_IWUSR);
unlink(codepath); unlink(codepath);
unlink(headerpath); unlink(headerpath);
cf = fopen(codepath, "w"); assert(cf!=0); cf = fopen(codepath, "w"); assert(cf!=0);
......
...@@ -65,7 +65,7 @@ create_dir_from_file (const char *fname) { ...@@ -65,7 +65,7 @@ create_dir_from_file (const char *fname) {
if (i>0) { if (i>0) {
tmp[i]=0; tmp[i]=0;
mode_t oldu = umask(0); mode_t oldu = umask(0);
int r = mkdir(tmp, 0700); int r = os_mkdir(tmp, S_IRWXU);
if (r!=0 && errno!=EEXIST) { if (r!=0 && errno!=EEXIST) {
printf("error: %s\n", strerror(errno)); printf("error: %s\n", strerror(errno));
} }
...@@ -79,10 +79,10 @@ create_dir_from_file (const char *fname) { ...@@ -79,10 +79,10 @@ create_dir_from_file (const char *fname) {
} }
static void static void
toku_recover_fcreate (LSN UU(lsn), TXNID UU(txnid),BYTESTRING fname,u_int32_t mode) { toku_recover_fcreate (LSN UU(lsn), TXNID UU(txnid), FILENUM UU(filenum), BYTESTRING fname,u_int32_t mode) {
char *fixed_fname = fixup_fname(&fname); char *fixed_fname = fixup_fname(&fname);
create_dir_from_file(fixed_fname); create_dir_from_file(fixed_fname);
int fd = creat(fixed_fname, mode); int fd = open(fixed_fname, O_CREAT+O_TRUNC+O_WRONLY+O_BINARY, mode);
assert(fd>=0); assert(fd>=0);
toku_free(fixed_fname); toku_free(fixed_fname);
toku_free_BYTESTRING(fname); toku_free_BYTESTRING(fname);
...@@ -697,17 +697,13 @@ int tokudb_recover(const char *data_dir, const char *log_dir) { ...@@ -697,17 +697,13 @@ int tokudb_recover(const char *data_dir, const char *log_dir) {
int lockfd; int lockfd;
{ {
const char fname[] = "/__recoverylock_dont_delete_me";
int namelen=strlen(data_dir); int namelen=strlen(data_dir);
char lockfname[namelen+20]; char lockfname[namelen+sizeof(fname)];
snprintf(lockfname, sizeof(lockfname), "%s/__recoverylock_dont_delete_me", data_dir); snprintf(lockfname, sizeof(lockfname), "%s%s", data_dir, fname);
lockfd = open(lockfname, O_RDWR|O_CREAT|O_BINARY, S_IRUSR | S_IWUSR); lockfd = os_lock_file(lockfname);
if (lockfd<0) { if (lockfd<0) {
printf("Couldn't open %s\n", lockfname);
return errno;
}
r=flock(lockfd, LOCK_EX | LOCK_NB);
if (r!=0) {
printf("Couldn't run recovery because some other process holds the recovery lock %s\n", lockfname); printf("Couldn't run recovery because some other process holds the recovery lock %s\n", lockfname);
return errno; return errno;
} }
...@@ -764,7 +760,7 @@ int tokudb_recover(const char *data_dir, const char *log_dir) { ...@@ -764,7 +760,7 @@ int tokudb_recover(const char *data_dir, const char *log_dir) {
} }
toku_free(logfiles); toku_free(logfiles);
r=flock(lockfd, LOCK_UN); r=os_unlock_file(lockfd);
if (r!=0) return errno; if (r!=0) return errno;
r=chdir(org_wd); r=chdir(org_wd);
......
...@@ -11,13 +11,15 @@ ...@@ -11,13 +11,15 @@
#define TOKU_DO_COMMIT_CMD_DELETE 1 #define TOKU_DO_COMMIT_CMD_DELETE 1
#define TOKU_DO_COMMIT_CMD_DELETE_BOTH 1 #define TOKU_DO_COMMIT_CMD_DELETE_BOTH 1
int toku_commit_fcreate (TXNID xid __attribute__((__unused__)), int toku_commit_fcreate (TXNID UU(xid),
BYTESTRING bs_fname __attribute__((__unused__)), FILENUM UU(filenum),
TOKUTXN txn __attribute__((__unused__))) { BYTESTRING UU(bs_fname),
TOKUTXN UU(txn)) {
return 0; return 0;
} }
int toku_rollback_fcreate (TXNID xid __attribute__((__unused__)), int toku_rollback_fcreate (TXNID xid __attribute__((__unused__)),
FILENUM filenum,
BYTESTRING bs_fname, BYTESTRING bs_fname,
TOKUTXN txn __attribute__((__unused__))) { TOKUTXN txn __attribute__((__unused__))) {
char *fname = fixup_fname(&bs_fname); char *fname = fixup_fname(&bs_fname);
...@@ -26,7 +28,14 @@ int toku_rollback_fcreate (TXNID xid __attribute__((__unused__)), ...@@ -26,7 +28,14 @@ int toku_rollback_fcreate (TXNID xid __attribute__((__unused__)),
char full_fname[full_len]; char full_fname[full_len];
int l = snprintf(full_fname,full_len, "%s/%s", directory, fname); int l = snprintf(full_fname,full_len, "%s/%s", directory, fname);
assert(l<=full_len); assert(l<=full_len);
int r = unlink(full_fname); //Remove reference to the fd in the cachetable
CACHEFILE cf;
int r = toku_cachefile_of_filenum(txn->logger->ct, filenum, &cf);
if (r == 0) {
r = toku_cachefile_redirect_nullfd(cf);
assert(r==0);
}
r = unlink(full_fname);
assert(r==0); assert(r==0);
free(fname); free(fname);
return 0; return 0;
...@@ -157,10 +166,11 @@ int toku_commit_rollinclude (BYTESTRING bs,TOKUTXN txn) { ...@@ -157,10 +166,11 @@ int toku_commit_rollinclude (BYTESTRING bs,TOKUTXN txn) {
char *fname = fixup_fname(&bs); char *fname = fixup_fname(&bs);
int fd = open(fname, O_RDONLY+O_BINARY); int fd = open(fname, O_RDONLY+O_BINARY);
assert(fd>=0); assert(fd>=0);
struct stat statbuf;
r = fstat(fd, &statbuf); int64_t fsize = 0;
r = os_get_file_size(fd, &fsize);
assert(r==0); assert(r==0);
r = toku_commit_fileentries(fd, statbuf.st_size, txn); r = toku_commit_fileentries(fd, fsize, txn);
assert(r==0); assert(r==0);
r = close(fd); r = close(fd);
assert(r==0); assert(r==0);
...@@ -174,10 +184,10 @@ int toku_rollback_rollinclude (BYTESTRING bs,TOKUTXN txn) { ...@@ -174,10 +184,10 @@ int toku_rollback_rollinclude (BYTESTRING bs,TOKUTXN txn) {
char *fname = fixup_fname(&bs); char *fname = fixup_fname(&bs);
int fd = open(fname, O_RDONLY+O_BINARY); int fd = open(fname, O_RDONLY+O_BINARY);
assert(fd>=0); assert(fd>=0);
struct stat statbuf; int64_t fsize = 0;
r = fstat(fd, &statbuf); r = os_get_file_size(fd, &fsize);
assert(r==0); assert(r==0);
r = toku_rollback_fileentries(fd, statbuf.st_size, txn); r = toku_rollback_fileentries(fd, fsize, txn);
assert(r==0); assert(r==0);
r = close(fd); r = close(fd);
assert(r==0); assert(r==0);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment