Commit 7cf66e7c authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

Buffer the reading of the rollback file. Gains 5% on large transactions. Fixes #1002.

git-svn-id: file:///svn/tokudb@5005 c7de825b-a66e-492c-adef-691d508d4ae1
parent 0fe485a6
......@@ -50,9 +50,10 @@ BINS= brtdump \
build default: bins libs tdb-recover tdb_logprint $(TEST_OFILES)
cd tests;$(MAKE) build
# Put crc first to make it work right
# Put crc first to make -combine work right
BRT_SOURCES = \
crc \
bread \
brt-serialize \
brt-verify \
brt \
......@@ -88,7 +89,7 @@ tdb_logprint: $(OFILES)
recover.o: log_header.h log-internal.h log.h brttypes.h kv-pair.h memory.h key.h cachetable.h
tdb-recover: $(OFILES)
roll.o: log_header.h log-internal.h log.h brttypes.h kv-pair.h memory.h key.h cachetable.h omt.h
roll.o: log_header.h log-internal.h log.h brttypes.h kv-pair.h memory.h key.h cachetable.h omt.h bread.h
log_code.o: log_header.h wbuf.h log-internal.h rbuf.h
log_header.h: log_code.c
......
/* Buffered read. */
#include <assert.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include "bread.h"
#include "memory.h"
struct bread {
off_t current_offset; // The current offset to be read (in the file). That offset includes anything that is unread in the buffer.
int fd;
size_t bufsize;
char *buf; // A buffer of size bufsize;
int bufoff; // The current offset buf.
};
BREAD create_bread_from_fd_initialize_at(int fd, off_t filesize, size_t bufsize) {
BREAD MALLOC(result);
result->current_offset=filesize;
result->fd=fd;
result->bufoff=0;
result->bufsize=bufsize;
MALLOC_N(bufsize, result->buf);
return result;
}
int close_bread_without_closing_fd(BREAD br) {
toku_free(br->buf);
toku_free(br);
return 0;
}
ssize_t bread_backwards(BREAD br, void *vbuf, size_t nbytes) {
char *buf=vbuf;
ssize_t result=0;
while (nbytes > 0) {
assert(br->current_offset >= (off_t)nbytes);
// read whatever we can out of the buffer.
{
size_t to_copy = ((size_t)br->bufoff >= nbytes) ? nbytes : (size_t)br->bufoff;
memcpy(buf+nbytes-to_copy, &br->buf[br->bufoff-to_copy], to_copy);
nbytes -= to_copy;
br->current_offset -= to_copy;
result += to_copy;
br->bufoff -= to_copy;
}
if (nbytes>0) {
assert(br->bufoff==0);
size_t to_read = ((size_t)br->current_offset >= br->bufsize) ? br->bufsize : (size_t)br->current_offset;
ssize_t r = pread(br->fd, br->buf, to_read, br->current_offset-to_read);
assert(r==(ssize_t)to_read);
br->bufoff = to_read;
}
}
return result;
}
int bread_has_more(BREAD br) {
return br->current_offset>0;
}
#ifndef BREAD_H
#define BREAD_H
// A BREAD reads a file backwards using buffered I/O. BREAD stands for Buffered Read or Backwards Read.
// Conceivably, we could read forward too.
// The buffered I/O is buffered using a large buffer (e.g., something like a megabyte).
// If not for the large-buffer requirement, we could have used a FILE.
#include <sys/types.h>
typedef struct bread *BREAD;
BREAD create_bread_from_fd_initialize_at(int fd, off_t filesize, size_t bufsize);
// Effect: Given a file descriptor, fd, that points at a file of size filesize, create a BREAD.
// Requires: The filesize must be correct. The fd must be an open fd.
int close_bread_without_closing_fd(BREAD);
// Effect: Close the BREAD, but don't close the underlying fd.
ssize_t bread_backwards(BREAD, void *buf, size_t nbytes);
// Read nbytes into buf, reading backwards.
int bread_has_more(BREAD);
// Is there more to read?
#endif
......@@ -148,4 +148,4 @@ static inline char *fixup_fname(BYTESTRING *f) {
return fname;
}
int toku_read_rollback_backwards(int fd, off_t at, struct roll_entry **item, off_t *new_at);
int toku_read_rollback_backwards(BREAD, struct roll_entry **item);
......@@ -985,18 +985,15 @@ int toku_maybe_spill_rollbacks (TOKUTXN txn) {
return 0;
}
int toku_read_rollback_backwards(int fd, off_t at, struct roll_entry **item, off_t *new_at) {
if (at==0) return -1;
int toku_read_rollback_backwards(BREAD br, struct roll_entry **item) {
u_int32_t nbytes_n; ssize_t sr;
if ((sr=pread(fd, &nbytes_n, 4, at-4))!=4) { assert(sr<0); return errno; }
if ((sr=bread_backwards(br, &nbytes_n, 4))!=4) { assert(sr<0); return errno; }
u_int32_t n_bytes=ntohl(nbytes_n);
assert(at>=n_bytes);
unsigned char *buf = toku_malloc(n_bytes);
if (buf==0) return errno;
if ((sr=pread(fd, buf, n_bytes, at-n_bytes))!=(ssize_t)n_bytes) { assert(sr<0); return errno; }
if ((sr=bread_backwards(br, buf, n_bytes-4))!=(ssize_t)n_bytes-4) { assert(sr<0); return errno; }
int r = toku_parse_rollback(buf, n_bytes, item);
if (r!=0) return r;
(*new_at) -= n_bytes;
toku_free(buf);
return 0;
}
......
......@@ -8,6 +8,7 @@
#include "../include/db.h"
#include "brttypes.h"
#include "memory.h"
#include "bread.h"
struct logbytes;
struct logbytes {
......
......@@ -12,6 +12,7 @@
#include "log-internal.h"
#include "cachetable.h"
#include "key.h"
#include "bread.h"
// these flags control whether or not we send commit messages for
// various operations
......@@ -124,27 +125,34 @@ int toku_rollback_cmddelete (TXNID xid, FILENUM filenum, BYTESTRING key,TOKUTXN
}
int toku_commit_fileentries (int fd, off_t filesize, TOKUTXN txn) {
while (filesize>0) {
int r;
BREAD f = create_bread_from_fd_initialize_at(fd, filesize, 1<<20);
int r=0;
while (bread_has_more(f)) {
struct roll_entry *item;
r = toku_read_rollback_backwards(fd, filesize, &item, &filesize);
if (r!=0) { return r; }
r = toku_read_rollback_backwards(f, &item);
if (r!=0) goto finish;
r = toku_commit_rollback_item(txn, item);
if (r!=0) { return r; }
if (r!=0) goto finish;
}
return 0;
finish:
{ int r2 = close_bread_without_closing_fd(f); assert(r2==0); }
return r;
}
int toku_rollback_fileentries (int fd, off_t filesize, TOKUTXN txn) {
while (filesize>0) {
int r;
BREAD f = create_bread_from_fd_initialize_at(fd, filesize, 1<<20);
assert(f);
int r=0;
while (bread_has_more(f)) {
struct roll_entry *item;
r = toku_read_rollback_backwards(fd, filesize, &item, &filesize);
if (r!=0) { return r; }
r = toku_read_rollback_backwards(f, &item);
if (r!=0) goto finish;
r = toku_abort_rollback_item(txn, item);
if (r!=0) { return r; }
if (r!=0) goto finish;
}
return 0;
finish:
{ int r2 = close_bread_without_closing_fd(f); assert(r2==0); }
return r;
}
int toku_commit_rollinclude (BYTESTRING bs,TOKUTXN txn) {
......
......@@ -42,6 +42,7 @@ CPPFLAGS += -D_FILE_OFFSET_BITS=64 -D_LARGEFILE64_SOURCE -D_XOPEN_SOURCE=500 -I.
# Put these one-per-line so that if we insert a new one the svn diff can understand it better.
# Also keep them sorted.
REGRESSION_TESTS = \
bread-test \
brt-serialize-test \
brt-test \
brt-test-cursor \
......
#include <arpa/inet.h>
#include <assert.h>
#include <fcntl.h>
#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include "../bread.h"
#define FNAME "bread-test.data"
#define RECORDS 2
void test (int seed) {
srandom(seed);
unlink(FNAME);
int i;
char buf[RECORDS][100];
int sizes[RECORDS];
int sizesn[RECORDS];
size_t off = 0;
{
int fd = creat(FNAME, 0777);
assert(fd>=0);
for (i=0; i<RECORDS; i++) {
sizes[i] = random()%100;
sizesn[i] = htonl(sizes[i]);
int j;
for (j=0; j<sizes[i]; j++) {
buf[i][j]=random();
}
int r = write(fd, buf[i], sizes[i]);
assert(r==sizes[i]);
off+=r;
r = write(fd, &sizesn[i], 4);
assert(r==4);
off+=4;
}
{ int r=close(fd); assert(r==0); }
}
int fd = open(FNAME, O_RDONLY); assert(fd>=0);
// Now read it all backward
BREAD br = create_bread_from_fd_initialize_at(fd, off, 50);
while (bread_has_more(br)) {
assert(i>0);
i--;
int sizen;
{ int r = bread_backwards(br, &sizen, 4); assert(r==4); }
int sizeh=ntohl(sizen);
assert(sizeh==sizes[i]);
assert(0<=sizeh && sizeh<100);
{
char rbuf[100];
int r = bread_backwards(br, rbuf,sizeh);
assert(r==sizeh);
assert(memcmp(rbuf, &buf[i][0], sizes[i])==0);
}
}
assert(i==0);
{ int r=close_bread_without_closing_fd(br); assert(r==0); }
{ int r=close(fd); assert(r==0); }
unlink(FNAME);
}
int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__unused__))) {
int i;
for (i=0; i<10; i++) test(i);
return 0;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment