Commit 4bbfe55a authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

set_lg_max is ipmlemented, tested, and documented. Fixes #79.

git-svn-id: file:///svn/tokudb@2645 c7de825b-a66e-492c-adef-691d508d4ae1
parent 424184e8
MANPAGES = tdb_create tdb_del tdb_put tdb_open MANPAGES = tdb_create tdb_del tdb_put tdb_open tdb_log_max
MANPAGES_TEXI = $(patsubst %,%.texi,$(MANPAGES)) MANPAGES_TEXI = $(patsubst %,%.texi,$(MANPAGES))
MANPAGES_POD = $(patsubst %,%.pod,$(MANPAGES)) MANPAGES_POD = $(patsubst %,%.pod,$(MANPAGES))
MANPAGES_3 = $(patsubst %,%.3,$(MANPAGES)) MANPAGES_3 = $(patsubst %,%.3,$(MANPAGES))
......
@page
@section @code{DB_ENV->set_lg_max}
@setfilename tokudb
@settitle DB_ENV->set_lg_max
@c man title db_del tokudb
@unnumberedsubsec Synopsis
@c man begin SYNOPSIS
@code{#include <db.h>}
@noindent
@code{int DB_ENV->set_lg_max(DB_ENV *}@var{env}@code{, u_int32_t }@var{lg_max}@code{);}
@code{int DB_ENV->get_lg_max(DB_ENV *}@var{env}@code{, u_int32_t*}@var{lg_max_p}@code{);}
@c man end
@unnumberedsubsec Description
@c man begin DESCRIPTION
Set or get the maximum size, in bytes, of any given log file.
When logging is configured, the default maximum log size is 100MiB.
It is possible that a log file will be larger than @var{lg_max}: The
logs comprise log entries, and TokuDB always writes a complete log
entry into a log file. Thus if a log entry is larger than
@var{lg_max}, then the resulting log file could be larger.
You may call @code{DB_ENV->set_log_max} at any time on any environment
that has been created but hasn't yet been closed. Subsequently
written log files will be smaller than the specified size.
@c man end
@unnumberedsubsec Parameters
@c man begin PARAMETERS
@table @var
@item env
The @code{DB_ENV} handle.
@item lg_max
For @code{DB_ENV->set_log_max}, the new maximum logfile size, in bytes.
@item lg_max_p
For @code{DB_ENV->get_log_max}, the return result will be stored in @code{*}@var{lg_max_p}.
@end table
@c man end
@unnumberedsubsec Return Value
@c man begin RETURNVALUE
Returns zero on success.
@c man end
@include everyman.texi
...@@ -41,6 +41,8 @@ Copyright @copyright{} 2007, Tokutek, Inc. ...@@ -41,6 +41,8 @@ Copyright @copyright{} 2007, Tokutek, Inc.
@include tdb_put.texi @include tdb_put.texi
@include tdb_log_max.texi
@node Index @node Index
@unnumbered Index @unnumbered Index
......
...@@ -30,10 +30,9 @@ typedef struct weakstrong { char ignore; } *WS; ...@@ -30,10 +30,9 @@ typedef struct weakstrong { char ignore; } *WS;
extern long long n_items_malloced; extern long long n_items_malloced;
static int malloc_diskblock (DISKOFF *res, BRT brt, int size, TOKULOGGER);
static void verify_local_fingerprint_nonleaf (BRTNODE node); static void verify_local_fingerprint_nonleaf (BRTNODE node);
#ifdef FOO #ifdef FOO
static int malloc_diskblock (DISKOFF *res, BRT brt, int size, TOKULOGGER);
/* Frees a node, including all the stuff in the hash table. */ /* Frees a node, including all the stuff in the hash table. */
void toku_brtnode_free (BRTNODE *nodep) { void toku_brtnode_free (BRTNODE *nodep) {
...@@ -319,9 +318,119 @@ static int split_leaf_node (BRT t, TOKULOGGER logger, BRTNODE node, int *n_new_n ...@@ -319,9 +318,119 @@ static int split_leaf_node (BRT t, TOKULOGGER logger, BRTNODE node, int *n_new_n
*splitks = result_splitks; *splitks = result_splitks;
return 0; return 0;
} }
#endif
static void find_heaviest_child (BRTNODE node, int *childnum) {
int max_child = 0;
int max_weight = BNC_NBYTESINBUF(node, 0);
int i;
if (0) printf("%s:%d weights: %d", __FILE__, __LINE__, max_weight);
assert(node->u.n.n_children>0);
for (i=1; i<node->u.n.n_children; i++) {
int this_weight = BNC_NBYTESINBUF(node,i);
if (0) printf(" %d", this_weight);
if (max_weight < this_weight) {
max_child = i;
max_weight = this_weight;
}
}
*childnum = max_child;
if (0) printf("\n");
}
/* find the leftmost child that may contain the key */
static unsigned int brtnode_which_child (BRTNODE node , DBT *k, DBT *d, BRT t) {
int i;
assert(node->height>0);
for (i=0; i<node->u.n.n_children-1; i++) {
int cmp = brt_compare_pivot(t, k, d, node->u.n.childkeys[i]);
if (cmp > 0) continue;
if (cmp < 0) return i;
return i;
}
return node->u.n.n_children-1;
}
static int brtnode_put (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger, WS weak_p);
// If CHILD is too wide, split it, and create a new node with the new children. Unpin CHILD or the new children (even if something goes wrong).
// If it does split, unpin the new root node also.
static int maybe_split_root(BRT brt, BRTNODE child, CACHEKEY *rootp, TOKULOGGER logger);
// if CHILD is too wide, split it, and fix up NODE. Either way, unpin the child or resulting children (even if it fails do the unpin)
static int maybe_split_nonroot (BRT brt, BRTNODE node, int childnum, BRTNODE child, int *n_children_replacing_child, TOKULOGGER logger);
// Push stuff into a child weakly. (That is don't cause any I/O or cause the child to get too big.)
static int weak_push_to_child (BRT brt, BRTNODE node, int childnum, TOKULOGGER logger) {
void *child_v;
int r = toku_cachetable_maybe_get_and_pin(brt->cf, BNC_DISKOFF(node, childnum), &child_v);
if (r!=0) return 0;
BRTNODE child = child_v;
DBT key,val;
BRT_CMD_S cmd;
while (0 == toku_fifo_peek_cmdstruct(BNC_BUFFER(node, childnum), &cmd, &key, &val)) {
r = brtnode_put(brt, child, &cmd, logger, WEAK);
if (r==EAGAIN) break;
if (r!=0) goto died;
r=toku_fifo_deq(BNC_BUFFER(node, childnum));
if (r!=0) goto died;
}
return unpin_brtnode(brt, child);
died:
unpin_brtnode(brt, child);
return r;
}
// If the buffers are too big, push stuff down. The subchild may need to be split, in which case our fanout may get too large.
// When are done, this node is has little enough stuff in its buffers (but the fanout may be too large), and all the descendant
// nodes are properly sized (the buffer sizes and fanouts are all small enough).
static int push_down_if_buffers_too_full(BRT brt, BRTNODE node, TOKULOGGER logger) {
if (node->height==0) return 0; // can't push down for leaf nodes
while (node->u.n.n_bytes_in_buffers > 0 && toku_serialize_brtnode_size(node)>node->nodesize) {
int childnum;
find_heaviest_child(node, &childnum);
void *child_v;
int r = toku_cachetable_get_and_pin(brt->cf, BNC_DISKOFF(node, childnum), &child_v, NULL,
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
if (r!=0) return r;
BRTNODE child=child_v;
if (0) { died: unpin_brtnode(brt, child); return r; }
BRT_CMD_S cmd;
DBT key,val;
while (0==toku_fifo_peek_cmdstruct(BNC_BUFFER(node, childnum), &cmd, &key, &val)) {
r=toku_fifo_deq(BNC_BUFFER(node, childnum));
assert(r==0); // we just did a peek, so the buffer must be nonempty
r=brtnode_put(brt, child, &cmd, logger, WEAK);
if (r!=EAGAIN && r!=0) goto died;
if (r==EAGAIN) {
// Weak pushes ran out of steam. Now do a strong push if there is still something in the buffer.
if (0==toku_fifo_peek_cmdstruct(BNC_BUFFER(node, childnum), &cmd, &key, &val)) {
r=brtnode_put(brt, child, &cmd, logger, STRONG);
if (r!=0) goto died;
r=toku_fifo_deq(BNC_BUFFER(node, childnum));
if (r!=0) goto died;
// Now it's possible that the child must be split. (Or maybe the child managed to flush stuff to our grandchildren)
int n_children_replacing_child;
r=maybe_split_nonroot(brt, node, childnum, child, &n_children_replacing_child, logger);
if (r!=0) return r; // don't go to died since that unpins
int i;
for (i=0; i<n_children_replacing_child; i++) {
r=weak_push_to_child(brt, node, childnum+i, logger);
if (r!=0) return r;
}
// we basically pushed as much as we could to that child
}
}
}
}
return 0;
}
/* Side effect: sets splitk->data pointer to a malloc'd value */ /* Side effect: sets splitk->data pointer to a malloc'd value */
static int brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, TOKULOGGER logger) { static int brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodeb, DBT *splitk, TOKULOGGER logger) {
int old_n_children = node->u.n.n_children; int old_n_children = node->u.n.n_children;
int n_children_in_a = old_n_children/2; int n_children_in_a = old_n_children/2;
int n_children_in_b = old_n_children-n_children_in_a; int n_children_in_b = old_n_children-n_children_in_a;
...@@ -430,7 +539,6 @@ static int brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *node ...@@ -430,7 +539,6 @@ static int brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *node
verify_local_fingerprint_nonleaf(B); verify_local_fingerprint_nonleaf(B);
} }
*nodea = node;
*nodeb = B; *nodeb = B;
assert(toku_serialize_brtnode_size(node)<node->nodesize); assert(toku_serialize_brtnode_size(node)<node->nodesize);
...@@ -438,167 +546,67 @@ static int brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *node ...@@ -438,167 +546,67 @@ static int brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *node
return 0; return 0;
} }
#endif static int nonleaf_node_is_too_wide (BRTNODE node) {
static void find_heaviest_child (BRTNODE node, int *childnum) {
int max_child = 0;
int max_weight = BNC_NBYTESINBUF(node, 0);
int i;
if (0) printf("%s:%d weights: %d", __FILE__, __LINE__, max_weight);
assert(node->u.n.n_children>0);
for (i=1; i<node->u.n.n_children; i++) {
int this_weight = BNC_NBYTESINBUF(node,i);
if (0) printf(" %d", this_weight);
if (max_weight < this_weight) {
max_child = i;
max_weight = this_weight;
}
}
*childnum = max_child;
if (0) printf("\n");
}
/* find the leftmost child that may contain the key */
static unsigned int brtnode_which_child (BRTNODE node , DBT *k, DBT *d, BRT t) {
int i;
assert(node->height>0); assert(node->height>0);
for (i=0; i<node->u.n.n_children-1; i++) { return node->u.n.n_children > TREE_FANOUT;
int cmp = brt_compare_pivot(t, k, d, node->u.n.childkeys[i]);
if (cmp > 0) continue;
if (cmp < 0) return i;
return i;
}
return node->u.n.n_children-1;
} }
static int brtnode_put (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger, WS weak_p);
// If CHILD is too wide, split it, and create a new node with the new children. Unpin CHILD or the new children (even if something goes wrong).
// If it does split, unpin the new root node also.
static int maybe_split_root(BRT brt, BRTNODE child, CACHEKEY *rootp, TOKULOGGER logger);
// if CHILD is too wide, split it, and fix up NODE. Either way, unpin the child or resulting children (even if it fails do the unpin)
static int maybe_split_nonroot (BRT brt, BRTNODE node, int childnum, BRTNODE child, int *n_children_replacing_child, TOKULOGGER logger);
// Push stuff into a child weakly. (That is don't cause any I/O or cause the child to get too big.)
static int weak_push_to_child (BRT brt, BRTNODE node, int childnum, TOKULOGGER logger) {
void *child_v;
int r = toku_cachetable_maybe_get_and_pin(brt->cf, BNC_DISKOFF(node, childnum), &child_v);
if (r!=0) return 0;
BRTNODE child = child_v;
DBT key,val;
BRT_CMD_S cmd;
while (0 == toku_fifo_peek_cmdstruct(BNC_BUFFER(node, childnum), &cmd, &key, &val)) {
r = brtnode_put(brt, child, &cmd, logger, WEAK);
if (r==EAGAIN) break;
if (r!=0) goto died;
r=toku_fifo_deq(BNC_BUFFER(node, childnum));
if (r!=0) goto died;
}
return unpin_brtnode(brt, child);
died:
unpin_brtnode(brt, child);
return r;
}
// If the buffers are too big, push stuff down. The subchild may need to be split, in which case our fanout may get too large.
// When are done, this node is has little enough stuff in its buffers (but the fanout may be too large), and all the descendant
// nodes are properly sized (the buffer sizes and fanouts are all small enough).
static int push_down_if_buffers_too_full(BRT brt, BRTNODE node, TOKULOGGER logger) {
if (node->height==0) return 0; // can't push down for leaf nodes
while (node->u.n.n_bytes_in_buffers > 0 && toku_serialize_brtnode_size(node)>node->nodesize) {
int childnum;
find_heaviest_child(node, &childnum);
void *child_v;
int r = toku_cachetable_get_and_pin(brt->cf, BNC_DISKOFF(node, childnum), &child_v, NULL,
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
if (r!=0) return r;
BRTNODE child=child_v;
if (0) { died: unpin_brtnode(brt, child); return r; }
BRT_CMD_S cmd;
DBT key,val;
while (0==toku_fifo_peek_cmdstruct(BNC_BUFFER(node, childnum), &cmd, &key, &val)) {
r=toku_fifo_deq(BNC_BUFFER(node, childnum));
assert(r==0); // we just did a peek, so the buffer must be nonempty
r=brtnode_put(brt, child, &cmd, logger, WEAK);
if (r!=EAGAIN && r!=0) goto died;
if (r==EAGAIN) {
// Weak pushes ran out of steam. Now do a strong push if there is still something in the buffer.
if (0==toku_fifo_peek_cmdstruct(BNC_BUFFER(node, childnum), &cmd, &key, &val)) {
r=brtnode_put(brt, child, &cmd, logger, STRONG);
if (r!=0) goto died;
r=toku_fifo_deq(BNC_BUFFER(node, childnum));
if (r!=0) goto died;
// Now it's possible that the child must be split. (Or maybe the child managed to flush stuff to our grandchildren)
int n_children_replacing_child;
r=maybe_split_nonroot(brt, node, childnum, child, &n_children_replacing_child, logger);
if (r!=0) return r; // don't go to died since that unpins
int i;
for (i=0; i<n_children_replacing_child; i++) {
r=weak_push_to_child(brt, node, childnum+i, logger);
if (r!=0) return r;
}
// we basically pushed as much as we could to that child
}
}
}
}
return 0;
}
static int split_nonleaf_node(BRT brt, BRTNODE node_to_split, int *n_new_nodes, BRTNODE **new_nodes, DBT **splitks);
static int nonleaf_node_is_too_wide (BRT, BRTNODE);
static int maybe_fixup_fat_child(BRT brt, BRTNODE node, int childnum, BRTNODE child, TOKULOGGER logger) // If the node is too big then deal with it. Unpin the child (or children if it splits) NODE may be too big at the end static int maybe_fixup_fat_child(BRT brt, BRTNODE node, int childnum, BRTNODE child, TOKULOGGER logger) // If the node is too big then deal with it. Unpin the child (or children if it splits) NODE may be too big at the end
{ {
int r = push_down_if_buffers_too_full(brt, child, logger); int r = push_down_if_buffers_too_full(brt, child, logger);
if (r!=0) return r; if (r!=0) return r;
// now the child may have too much fanout. // now the child may have too much fanout.
if (child->height>0) { if (child->height>0) {
if (nonleaf_node_is_too_wide(brt, child)) { if (nonleaf_node_is_too_wide(child)) {
int n_new_nodes; BRTNODE *new_nodes; DBT *splitks; BRTNODE newchild;
if ((r=split_nonleaf_node(brt, child, &n_new_nodes, &new_nodes, &splitks))) return r; DBT splitk;
int i; if ((r=brt_nonleaf_split(brt, child, &newchild, &splitk, logger))) return r;
int old_n_children = node->u.n.n_children; int old_n_children = node->u.n.n_children;
FIFO old_fifo = BNC_BUFFER(node, childnum); FIFO old_fifo = BNC_BUFFER(node, childnum);
REALLOC_N(old_n_children+n_new_nodes-1, node->u.n.childinfos);
// slide the children over // slide the children over
for (i=old_n_children-1; i>childnum; i--) REALLOC_N(old_n_children+1, node->u.n.childinfos);
node->u.n.childinfos[i+n_new_nodes-1] = node->u.n.childinfos[i]; memmove(&node->u.n.childinfos[childnum+1], &node->u.n.childinfos[childnum+2], (old_n_children-childnum-1)*sizeof(node->u.n.childinfos[0]));
// fill in the new children // fill in the new children
for (; i<childnum+n_new_nodes-1; i++) { {
node->u.n.childinfos[i] = (struct brtnode_nonleaf_childinfo) { .subtree_fingerprint = 0, struct brtnode_nonleaf_childinfo *ci = &node->u.n.childinfos[childnum+1];
.diskoff = new_nodes[i-childnum]->thisnodename, ci->subtree_fingerprint = 0;
.n_bytes_in_buffer = 0 }; ci->diskoff = newchild->thisnodename;
r=toku_fifo_create(&BNC_BUFFER(node, i)); ci->n_bytes_in_buffer = 0;
r=toku_fifo_create(&ci->buffer);
if (r!=0) return r;
} }
// replace the fifo in the old child
r=toku_fifo_create(&BNC_BUFFER(node, childnum));
if (r!=0) return r;
// slide the keys over // slide the keys over
node->u.n.childkeys = toku_realloc(node->u.n.childkeys, (old_n_children+n_new_nodes-2 ) * sizeof(node->u.n.childkeys[0])); REALLOC_N(old_n_children, node->u.n.childkeys);
for (i=node->u.n.n_children; i>=childnum; i--) { memmove(&node->u.n.childkeys[childnum], &node->u.n.childkeys[childnum+1], (old_n_children-childnum-1)*sizeof(node->u.n.childkeys[0]));
node->u.n.childkeys[i+n_new_nodes-1] = node->u.n.childkeys[i]; {
struct kv_pair *pivot = splitk.data;
BYTESTRING bs = { .len = splitk.size,
.data = kv_pair_key(pivot) };
r = toku_log_setpivot(logger, toku_cachefile_filenum(brt->cf), node->thisnodename, childnum, bs);
if (r!=0) return r;
node->u.n.childkeys[childnum] = pivot;
node->u.n.totalchildkeylens += toku_brt_pivot_key_len(brt, pivot);
} }
node->u.n.n_children++;
// fix up fingerprints // fix up fingerprints
for (i=0; i<n_new_nodes; i++) { fixup_child_fingerprint(node, childnum, child, brt, logger);
fixup_child_fingerprint(node, childnum+i, new_nodes[i], brt, logger); fixup_child_fingerprint(node, childnum+1, newchild, brt, logger);
} // now everything in the fifo must be put again into one of the two fifos
toku_free(new_nodes);
// now everything in the fifos must be put again
BRT_CMD_S cmd; BRT_CMD_S cmd;
DBT key,val; DBT key,val;
while (0==toku_fifo_peek_deq_cmdstruct(old_fifo, &cmd, &key, &val)) { while (0==toku_fifo_peek_deq_cmdstruct(old_fifo, &cmd, &key, &val)) {
for (i=childnum; i<childnum+n_new_nodes-1; i++) { int cmp = brt_compare_pivot(brt, cmd.u.id.key, 0, node->u.n.childkeys[childnum]);
int cmp = brt_compare_pivot(brt, cmd.u.id.key, 0, node->u.n.childkeys[i]);
if (cmp<=0) { if (cmp<=0) {
r=toku_fifo_enq_cmdstruct(BNC_BUFFER(node, i), &cmd); r=toku_fifo_enq_cmdstruct(BNC_BUFFER(node, childnum), &cmd);
if (r!=0) return r; if (r!=0) return r;
if (cmd.type!=BRT_DELETE || 0==(brt->flags&TOKU_DB_DUPSORT)) goto filled; // we only need to put one in
}
} }
r=toku_fifo_enq_cmdstruct(BNC_BUFFER(node, i), &cmd); if (cmp==0 && cmd.type==BRT_DELETE && brt->flags&TOKU_DB_DUPSORT) {
r=toku_fifo_enq_cmdstruct(BNC_BUFFER(node, childnum+1), &cmd);
if (r!=0) return r; if (r!=0) return r;
filled: /*nothing*/; }
} }
toku_fifo_free(&old_fifo); toku_fifo_free(&old_fifo);
if (r!=0) return r; if (r!=0) return r;
...@@ -767,7 +775,6 @@ static int brtnode_put (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger, WS ...@@ -767,7 +775,6 @@ static int brtnode_put (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger, WS
return brt_nonleaf_put(t, node, cmd, logger, weak_p); return brt_nonleaf_put(t, node, cmd, logger, weak_p);
} }
} }
#ifdef FOO
static void verify_local_fingerprint_nonleaf (BRTNODE node) { static void verify_local_fingerprint_nonleaf (BRTNODE node) {
u_int32_t fp=0; u_int32_t fp=0;
...@@ -781,6 +788,7 @@ static void verify_local_fingerprint_nonleaf (BRTNODE node) { ...@@ -781,6 +788,7 @@ static void verify_local_fingerprint_nonleaf (BRTNODE node) {
assert(fp==node->local_fingerprint); assert(fp==node->local_fingerprint);
} }
#ifdef FOO
static int setup_initial_brt_root_node (BRT t, DISKOFF offset, TOKULOGGER logger) { static int setup_initial_brt_root_node (BRT t, DISKOFF offset, TOKULOGGER logger) {
int r; int r;
TAGMALLOC(BRTNODE, node); TAGMALLOC(BRTNODE, node);
......
...@@ -22,6 +22,7 @@ struct tokulogger { ...@@ -22,6 +22,7 @@ struct tokulogger {
int n_in_buf; int n_in_buf;
CACHETABLE ct; CACHETABLE ct;
struct list live_txns; // just a linked list. Should be a hashtable. struct list live_txns; // just a linked list. Should be a hashtable.
int lg_max; // The size of the single file in the log. Default is 100MB in TokuDB
}; };
int toku_logger_find_next_unused_log_file(const char *directory, long long *result); int toku_logger_find_next_unused_log_file(const char *directory, long long *result);
......
...@@ -69,6 +69,7 @@ int toku_logger_create (TOKULOGGER *resultp) { ...@@ -69,6 +69,7 @@ int toku_logger_create (TOKULOGGER *resultp) {
if (result==0) return errno; if (result==0) return errno;
result->is_open=0; result->is_open=0;
result->is_panicked=0; result->is_panicked=0;
result->lg_max = 100<<20; // 100MB default
list_init(&result->live_txns); list_init(&result->live_txns);
*resultp=result; *resultp=result;
return 0; return 0;
...@@ -116,14 +117,30 @@ int toku_logger_is_open(TOKULOGGER logger) { ...@@ -116,14 +117,30 @@ int toku_logger_is_open(TOKULOGGER logger) {
return logger->is_open; return logger->is_open;
} }
static int flush (TOKULOGGER logger) { int toku_logger_set_lg_max(TOKULOGGER logger, u_int32_t lg_max) {
if (logger==0) return EINVAL; // no logger
if (logger->is_panicked) return EINVAL;
if (logger->is_open) return EINVAL;
if (lg_max>(1<<30)) return EINVAL; // too big
logger->lg_max = lg_max;
return 0;
}
int toku_logger_get_lg_max(TOKULOGGER logger, u_int32_t *lg_maxp) {
if (logger==0) return EINVAL; // no logger
if (logger->is_panicked) return EINVAL;
*lg_maxp = logger->lg_max;
return 0;
}
static int flush (TOKULOGGER logger, int close_p) {
if (logger->n_in_buf>0) { if (logger->n_in_buf>0) {
int r = write(logger->fd, logger->buf, logger->n_in_buf); int r = write(logger->fd, logger->buf, logger->n_in_buf);
if (r==-1) return errno; if (r==-1) return errno;
logger->n_in_file += logger->n_in_buf; logger->n_in_file += logger->n_in_buf;
logger->n_in_buf=0; logger->n_in_buf=0;
} }
if (logger->n_in_file > 100<<20) { if (close_p || logger->n_in_file >= logger->lg_max) {
int r = close(logger->fd); int r = close(logger->fd);
if (r!=0) return errno; if (r!=0) return errno;
logger->fd=-1; logger->fd=-1;
...@@ -149,17 +166,18 @@ int toku_logger_log_bytes(TOKULOGGER logger, int nbytes, void *bytes) { ...@@ -149,17 +166,18 @@ int toku_logger_log_bytes(TOKULOGGER logger, int nbytes, void *bytes) {
int r = write(logger->fd, "tokulogg", 8); if (r!=8) return errno; int r = write(logger->fd, "tokulogg", 8); if (r!=8) return errno;
r = write(logger->fd, &version_l, 4); if (r!=4) return errno; r = write(logger->fd, &version_l, 4); if (r!=4) return errno;
} }
if (logger->n_in_buf + nbytes > LOGGER_BUF_SIZE) { if (logger->n_in_buf + nbytes > LOGGER_BUF_SIZE
printf("flushing %d %d\n", logger->n_in_buf, logger->n_in_file); || logger->n_in_file + logger->n_in_buf + nbytes > logger->lg_max) {
int r=flush(logger); //printf("flushing %d %d\n", logger->n_in_buf, logger->n_in_file);
int r=flush(logger, 1);
if (r!=0) return r; if (r!=0) return r;
if (nbytes>LOGGER_BUF_SIZE) { if (nbytes>LOGGER_BUF_SIZE) {
r = write(logger->fd, bytes, nbytes); r = write(logger->fd, bytes, nbytes);
if (r!=0) return errno; if (r!=0) return errno;
logger->n_in_file = nbytes; logger->n_in_file = nbytes;
return flush(logger); return flush(logger, 0);
} }
printf("saving %d\n", nbytes); //printf("saving %d\n", nbytes);
} }
memcpy(logger->buf+logger->n_in_buf, bytes, nbytes); memcpy(logger->buf+logger->n_in_buf, bytes, nbytes);
logger->n_in_buf += nbytes; logger->n_in_buf += nbytes;
...@@ -202,9 +220,8 @@ n ...@@ -202,9 +220,8 @@ n
int toku_logger_fsync (TOKULOGGER logger) { int toku_logger_fsync (TOKULOGGER logger) {
//return 0;/// NO TXN //return 0;/// NO TXN
//fprintf(stderr, "%s:%d syncing log\n", __FILE__, __LINE__);
if (logger->is_panicked) return EINVAL; if (logger->is_panicked) return EINVAL;
int r=flush(logger); int r=flush(logger, 0);
if (r!=0) return r; if (r!=0) return r;
if (logger->fd>=0) { if (logger->fd>=0) {
r = fsync(logger->fd); r = fsync(logger->fd);
......
...@@ -20,6 +20,9 @@ int toku_logger_panicked(TOKULOGGER /*logger*/); ...@@ -20,6 +20,9 @@ int toku_logger_panicked(TOKULOGGER /*logger*/);
int toku_logger_is_open(TOKULOGGER); int toku_logger_is_open(TOKULOGGER);
LSN toku_logger_last_lsn(TOKULOGGER); LSN toku_logger_last_lsn(TOKULOGGER);
int toku_logger_set_lg_max (TOKULOGGER logger, u_int32_t);
int toku_logger_get_lg_max (TOKULOGGER logger, u_int32_t *);
int toku_logger_log_phys_add_or_delete_in_leaf (DB *db, TOKUTXN txn, DISKOFF diskoff, int is_add, const struct kv_pair *pair); int toku_logger_log_phys_add_or_delete_in_leaf (DB *db, TOKUTXN txn, DISKOFF diskoff, int is_add, const struct kv_pair *pair);
int toku_logger_commit (TOKUTXN txn, int no_sync); int toku_logger_commit (TOKUTXN txn, int no_sync);
......
...@@ -462,8 +462,12 @@ static int toku_env_set_lg_dir(DB_ENV * env, const char *dir) { ...@@ -462,8 +462,12 @@ static int toku_env_set_lg_dir(DB_ENV * env, const char *dir) {
static int toku_env_set_lg_max(DB_ENV * env, u_int32_t lg_max) { static int toku_env_set_lg_max(DB_ENV * env, u_int32_t lg_max) {
HANDLE_PANICKED_ENV(env); HANDLE_PANICKED_ENV(env);
lg_max=lg_max; return toku_logger_set_lg_max(env->i->logger, lg_max);
return toku_ydb_do_error(env, EINVAL, "TokuDB does not (yet) support set_lg_max\n"); }
static int toku_env_get_lg_max(DB_ENV * env, u_int32_t *lg_maxp) {
HANDLE_PANICKED_ENV(env);
return toku_logger_get_lg_max(env->i->logger, lg_maxp);
} }
static int toku_env_set_lk_detect(DB_ENV * env, u_int32_t detect) { static int toku_env_set_lk_detect(DB_ENV * env, u_int32_t detect) {
...@@ -589,6 +593,10 @@ static int locked_env_set_lg_max(DB_ENV * env, u_int32_t lg_max) { ...@@ -589,6 +593,10 @@ static int locked_env_set_lg_max(DB_ENV * env, u_int32_t lg_max) {
toku_ydb_lock(); int r = toku_env_set_lg_max(env, lg_max); toku_ydb_unlock(); return r; toku_ydb_lock(); int r = toku_env_set_lg_max(env, lg_max); toku_ydb_unlock(); return r;
} }
static int locked_env_get_lg_max(DB_ENV * env, u_int32_t *lg_maxp) {
toku_ydb_lock(); int r = toku_env_get_lg_max(env, lg_maxp); toku_ydb_unlock(); return r;
}
static int locked_env_set_lk_detect(DB_ENV * env, u_int32_t detect) { static int locked_env_set_lk_detect(DB_ENV * env, u_int32_t detect) {
toku_ydb_lock(); int r = toku_env_set_lk_detect(env, detect); toku_ydb_unlock(); return r; toku_ydb_lock(); int r = toku_env_set_lk_detect(env, detect); toku_ydb_unlock(); return r;
} }
...@@ -633,6 +641,7 @@ static int toku_env_create(DB_ENV ** envp, u_int32_t flags) { ...@@ -633,6 +641,7 @@ static int toku_env_create(DB_ENV ** envp, u_int32_t flags) {
result->set_lg_bsize = locked_env_set_lg_bsize; result->set_lg_bsize = locked_env_set_lg_bsize;
result->set_lg_dir = locked_env_set_lg_dir; result->set_lg_dir = locked_env_set_lg_dir;
result->set_lg_max = locked_env_set_lg_max; result->set_lg_max = locked_env_set_lg_max;
result->get_lg_max = locked_env_get_lg_max;
result->set_lk_max_locks = locked_env_set_lk_max_locks; result->set_lk_max_locks = locked_env_set_lk_max_locks;
result->get_lk_max_locks = locked_env_get_lk_max_locks; result->get_lk_max_locks = locked_env_get_lk_max_locks;
result->set_cachesize = locked_env_set_cachesize; result->set_cachesize = locked_env_set_cachesize;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment