Commit 1c819f40 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

Merge the changes from 2499d onto the main line. Fixes #2499. close[t:2499].

{{{
svn merge -r 19523:19895 https://svn.tokutek.com/tokudb/toku/tokudb.2499d
}}}
.


git-svn-id: file:///svn/toku/tokudb@19902 c7de825b-a66e-492c-adef-691d508d4ae1
parent 7309ee35
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#include <toku_portability.h>
#include <toku_atomic.h>
#include <unistd.h>
......
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007, 2008 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#include <toku_portability.h>
#include <malloc.h>
......
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ifndef _TOKU_HTONL_H
#define _TOKU_HTONL_H
......@@ -5,7 +7,7 @@
#error
#endif
#if defined(__cplusplus)
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
......@@ -20,7 +22,7 @@ static inline uint32_t toku_ntohl(uint32_t i) {
return ntohl(i);
}
#if defined(__cplusplus)
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
......
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#if !defined(TOKU_OS_TYPES_H)
#define TOKU_OS_TYPES_H
#if defined(__cplusplus)
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
......@@ -17,11 +19,12 @@ struct fileid {
typedef struct stat toku_struct_stat;
// windows compat
#if !defined(O_BINARY)
#define O_BINARY 0
#endif
#if defined(__cplusplus)
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
......
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#define _GNU_SOURCE 1
#include <toku_pthread.h>
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id$"
#ident "Copyright (c) 2007-2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ifndef _TOKU_PTHREAD_H
#define _TOKU_PTHREAD_H
#if defined(__cplusplus)
extern "C" {
#endif
#include <pthread.h>
#include <time.h>
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
typedef pthread_attr_t toku_pthread_attr_t;
typedef pthread_t toku_pthread_t;
typedef pthread_mutexattr_t toku_pthread_mutexattr_t;
......@@ -166,7 +168,7 @@ toku_pthread_setspecific(toku_pthread_key_t key, void *data) {
return pthread_setspecific(key, data);
}
#if defined(__cplusplus)
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
......
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ifndef TOKU_STDINT_H
#define TOKU_STDINT_H
......
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#include <stdlib.h>
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ifndef TOKU_TIME_H
#define TOKU_TIME_H
#include <time.h>
#include <sys/time.h>
#ifdef __cplusplus
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
static inline float toku_tdiff (struct timeval *a, struct timeval *b) {
return (a->tv_sec - b->tv_sec) +1e-6*(a->tv_usec - b->tv_usec);
}
#ifdef __cplusplus
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
......
......@@ -66,6 +66,8 @@ BRT_SOURCES = \
merger \
minicron \
omt \
pqueue \
queue \
recover \
roll \
rollback \
......@@ -100,6 +102,15 @@ endif
NEWBRT_O_FILES += brtloader.$(OEXT)
ifeq ($(BRTLOADER),cilk)
brtloader.$(OEXT): brtloader.c
$(CILKPP) -DTOKU_ALLOW_DEPRECATED $(CILKFLAGS) -c $<
endif
ifeq ($(BRTLOADER),cxx)
brtloader.$(OEXT): brtloader.c
$(CXX) -DTOKU_ALLOW_DEPRECATED $(CXXFLAGS) -c $<
endif
$(NEWBRT): $(NEWBRT_O_FILES)
$(NEWBRT_BUNDLE): log_code.c log_header.h
......@@ -158,3 +169,9 @@ clean-local:
# After doing (cd ../src/tests;make test_log5.recover), run these. The files should have no differences.
testdump: brtdump$(BINSUF)
./brtdump ../src/tests/dir.test_log5.c.tdb.recover/foo.db > dump.r && ./brtdump ../src/tests/dir.test_log5.c.tdb/foo.db > dump.$(OEXT) && diff dump.$(OEXT) dump.r
foo:
@echo CILKROOT $(CILKROOT)
@echo CILKPP $(CILKPP)
@echo BRTLOADER $(BRTLOADER)
@echo BDBDIR $(BDBDIR)
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "ule.h"
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#ifndef BACKWARD_10_H
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2009-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#ident "$Id$"
......
......@@ -3,11 +3,15 @@
#define BLOCK_ALLOCATOR_H
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "brttypes.h"
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
#define BLOCK_ALLOCATOR_ALIGNMENT 4096
// How much must be reserved at the beginning for the block?
// The actual header is 8+4+4+8+8_4+8+ the length of the db names + 1 pointer for each root.
......@@ -122,4 +126,8 @@ block_allocator_get_unused_statistics(BLOCK_ALLOCATOR ba, TOKU_DB_FRAGMENTATION
//Requires: report->data_bytes is filled in
//Requires: report->checkpoint_bytes_additional is filled in
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
#endif
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include <toku_portability.h>
......
......@@ -2,9 +2,13 @@
#ifndef BLOCKTABLE_H
#define BLOCKTABLE_H
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
typedef struct block_table *BLOCK_TABLE;
//Needed by tests, brtdump
......@@ -76,5 +80,9 @@ enum {RESERVED_BLOCKNUM_NULL =0,
RESERVED_BLOCKNUM_DESCRIPTOR =2,
RESERVED_BLOCKNUMS};
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
#endif
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
static int brt_root_put_cmd_XY (BRT brt, BRT_MSG *md, TOKUTXN txn) {
......
......@@ -3,7 +3,7 @@
#define BRT_INTERNAL_H
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "toku_assert.h"
......@@ -19,6 +19,10 @@
#include "block_table.h"
#include "leaflock.h"
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
#ifndef BRT_FANOUT
#define BRT_FANOUT 16
#endif
......@@ -361,4 +365,9 @@ int toku_db_badformat(void);
int toku_brt_remove_on_commit(TOKUTXN child, DBT* iname_dbt_p);
int toku_brt_remove_now(CACHETABLE ct, DBT* iname_dbt_p);
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
#endif
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#ifndef BRT_SEARCH_H
#define BRT_SEARCH_H
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
enum brt_search_direction_e {
BRT_SEARCH_LEFT = 1, /* search left -> right, finds min xy as defined by the compare function */
BRT_SEARCH_RIGHT = 2, /* search right -> left, finds max xy as defined by the compare function */
......@@ -24,15 +28,19 @@ typedef int (*brt_search_compare_func_t)(struct brt_search */*so*/, DBT */*x*/,
typedef struct brt_search {
brt_search_compare_func_t compare;
enum brt_search_direction_e direction;
DBT *k;
DBT *v;
const DBT *k;
const DBT *v;
void *context;
} brt_search_t;
/* initialize the search compare object */
static inline brt_search_t *brt_search_init(brt_search_t *so, brt_search_compare_func_t compare, enum brt_search_direction_e direction, DBT *k, DBT *v, void *context) {
static inline brt_search_t *brt_search_init(brt_search_t *so, brt_search_compare_func_t compare, enum brt_search_direction_e direction, const DBT *k, const DBT *v, void *context) {
so->compare = compare; so->direction = direction; so->k = k; so->v = v; so->context = context;
return so;
}
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
#endif
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "includes.h"
......
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "includes.h"
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
......@@ -187,13 +187,16 @@ enum reactivity { RE_STABLE, RE_FUSIBLE, RE_FISSIBLE };
static enum reactivity
get_leaf_reactivity (BRTNODE node) {
enum reactivity re = RE_STABLE;
assert(node->height==0);
if (node->dirty) {
unsigned int size = toku_serialize_brtnode_size(node);
if (size > node->nodesize && toku_omt_size(node->u.l.buffer) > 1)
return RE_FISSIBLE;
if ((size*4) < node->nodesize && !node->u.l.seqinsert)
return RE_FUSIBLE;
return RE_STABLE;
re = RE_FISSIBLE;
else if ((size*4) < node->nodesize && !node->u.l.seqinsert)
re = RE_FUSIBLE;
}
return re;
}
static enum reactivity
......@@ -4632,18 +4635,18 @@ brt_cursor_search(BRT_CURSOR cursor, brt_search_t *search, BRT_GET_CALLBACK_FUNC
return r;
}
static inline int compare_kv_xy(BRT brt, DBT *k, DBT *v, DBT *x, DBT *y) {
static inline int compare_kv_xy(BRT brt, const DBT *k, const DBT *v, const DBT *x, const DBT *y) {
int cmp = brt->compare_fun(brt->db, k, x);
if (cmp == 0 && v && y)
cmp = brt->dup_compare(brt->db, v, y);
return cmp;
}
static inline int compare_k_x(BRT brt, DBT *k, DBT *x) {
static inline int compare_k_x(BRT brt, const DBT *k, const DBT *x) {
return brt->compare_fun(brt->db, k, x);
}
static inline int compare_v_y(BRT brt, DBT *v, DBT *y) {
static inline int compare_v_y(BRT brt, const DBT *v, const DBT *y) {
return brt->dup_compare(brt->db, v, y);
}
......@@ -5272,7 +5275,7 @@ toku_brt_cursor_heaviside(BRT_CURSOR cursor, BRT_GET_STRADDLE_CALLBACK_FUNCTION
{
brt_search_t search; brt_search_init(&search, brt_cursor_compare_heaviside,
wrapper->direction < 0 ? BRT_SEARCH_RIGHT : BRT_SEARCH_LEFT,
(DBT*)toku_dbt_fake,
toku_dbt_fake,
cursor->brt->flags & TOKU_DB_DUPSORT ? (DBT*)toku_dbt_fake : NULL,
wrapper);
......
......@@ -2,7 +2,7 @@
#ifndef BRT_H
#define BRT_H
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
// This must be first to make the 64-bit file mode work right in Linux
......@@ -14,6 +14,10 @@
#include "log.h"
#include "brt-search.h"
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
// A callback function is invoked with the key, and the data.
// The pointers (to the bytevecs) must not be modified. The data must be copied out before the callback function returns.
// Note: In the thread-safe version, the brt node remains locked while the callback function runs. So return soon, and don't call the BRT code from the callback function.
......@@ -226,4 +230,8 @@ BOOL toku_brt_is_recovery_logging_suppressed (BRT);
#define TOKU_MULTIPLE_MAIN_THREADS 0
#endif
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
#endif
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007, 2008 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#include <toku_portability.h>
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007, 2008 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
/* The purpose of this file is to provide access to the brt_msg,
* which is the ephemeral version of the fifo_msg.
*/
#ifndef TOKU_BRT_MSG_H
#define TOKU_BRT_MSG_H
#ifndef BRT_MSG_H
#define BRT_MSG_H
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
u_int32_t brt_msg_get_keylen(BRT_MSG brt_msg);
......@@ -33,5 +34,10 @@ void brt_msg_from_dbts(BRT_MSG brt_msg, DBT *key, DBT *val, XIDS xids, enum brt_
#endif
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
#endif
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
/* Tell me the diff between two brt files. */
......@@ -8,7 +8,6 @@
static int dump_data = 1;
static CACHETABLE ct;
static void
......@@ -204,23 +203,17 @@ get_unaligned_uint32(unsigned char *p) {
return *(u_int32_t *)p;
}
#define SUB_BLOCK_XSUM 1
struct sub_block {
u_int32_t compressed_size;
u_int32_t uncompressed_size;
#if SUB_BLOCK_XSUM
u_int32_t xsum;
#endif
};
static void
sub_block_deserialize(struct sub_block *sb, unsigned char *sub_block_header) {
sb->compressed_size = toku_dtoh32(get_unaligned_uint32(sub_block_header+0));
sb->uncompressed_size = toku_dtoh32(get_unaligned_uint32(sub_block_header+4));
#if SUB_BLOCK_XSUM
sb->xsum = toku_dtoh32(get_unaligned_uint32(sub_block_header+8));
#endif
}
static void
......@@ -230,21 +223,17 @@ verify_block(unsigned char *cp, u_int64_t size) {
unsigned char *sub_block_header = &cp[node_header];
u_int32_t n_sub_blocks = toku_dtoh32(get_unaligned_uint32(&sub_block_header[0]));
u_int32_t header_length = node_header + n_sub_blocks * sizeof (struct sub_block);
#if SUB_BLOCK_XSUM
header_length += sizeof (u_int32_t); // CRC
#endif
if (header_length > size) {
printf("header length too big: %u\n", header_length);
return;
}
#if SUB_BLOCK_XSUM
u_int32_t header_xsum = x1764_memory(cp, header_length);
u_int32_t expected_xsum = toku_dtoh32(get_unaligned_uint32(&cp[header_length]));
if (header_xsum != expected_xsum) {
printf("header checksum failed: %u %u\n", header_xsum, expected_xsum);
return;
}
#endif
// deserialize the sub block header
struct sub_block sub_block[n_sub_blocks];
......@@ -257,14 +246,10 @@ verify_block(unsigned char *cp, u_int64_t size) {
// verify the sub block header
u_int32_t offset = header_length + 4;
for (u_int32_t i = 0 ; i < n_sub_blocks; i++) {
#if SUB_BLOCK_XSUM
u_int32_t xsum = x1764_memory(cp + offset, sub_block[i].compressed_size);
printf("%u: %u %u %u", i, sub_block[i].compressed_size, sub_block[i].uncompressed_size, sub_block[i].xsum);
if (xsum != sub_block[i].xsum)
printf(" fail %u", xsum);
#else
printf("%u: %u %u", i, sub_block[i].compressed_size, sub_block[i].uncompressed_size);
#endif
printf("\n");
offset += sub_block[i].compressed_size;
}
......@@ -287,23 +272,6 @@ dump_block(int f, BLOCKNUM blocknum, struct brt_header *h) {
toku_free(vp);
}
#if 0
static void
hex_dump(unsigned char *vp, u_int64_t offset, u_int64_t size) {
u_int64_t i;
for (i=0; i<size; i++) {
if ((i % 32) == 0)
printf("%"PRIu64": ", offset+i);
printf("%2.2X", vp[i]);
if (((i+1) % 4) == 0)
printf(" ");
if (((i+1) % 32) == 0)
printf("\n");
}
printf("\n");
}
#endif
static void
hex_dump(unsigned char *vp, u_int64_t offset, u_int64_t size) {
u_int64_t n = size / 32;
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ifndef _TOKU_BRTLOADER_INTERNAL_H
#define _TOKU_BRTLOADER_INTERNAL_H
#ident "$Id: pqueue.c$"
#ident "Copyright (c) 2010 Tokutek Inc. All rights reserved."
#include <db.h>
#include "brttypes.h"
#include "brtloader.h"
#include "queue.h"
#include "toku_pthread.h"
#if defined(__cplusplus)
extern "C" {
#endif
/* These functions are exported to allow the tests to compile. */
......@@ -11,8 +23,11 @@ struct file_info {
char *fname;
FILE *file;
u_int64_t n_rows; // how many rows were written into that file
size_t buffer_size;
void *buffer;
};
struct file_infos {
toku_pthread_mutex_t lock; // must protect this data structure because current activity performs a REALLOC(fi->file_infos).
int n_files;
int n_files_limit;
struct file_info *file_infos;
......@@ -20,9 +35,79 @@ struct file_infos {
};
typedef struct fidx { int idx; } FIDX;
static const FIDX FIDX_NULL __attribute__((__unused__)) = {-1};
static int fidx_is_null (const FIDX f) __attribute__((__unused__));
static int fidx_is_null (const FIDX f) { return f.idx==-1; }
int brtloader_open_temp_file (BRTLOADER bl, FIDX*file_idx);
/* These data structures are used for manipulating a collection of rows in main memory. */
struct row {
size_t off; // the offset in the data array.
int klen,vlen;
};
struct rowset {
size_t n_rows, n_rows_limit;
struct row *rows;
size_t n_bytes, n_bytes_limit;
char *data;
};
int init_rowset (struct rowset *rows);
void destroy_rowset (struct rowset *rows);
void add_row (struct rowset *rows, DBT *key, DBT *val);
int loader_write_row(DBT *key, DBT *val, FIDX data, u_int64_t *dataoff, BRTLOADER bl);
int loader_read_row (FIDX f, DBT *key, DBT *val, BRTLOADER bl);
struct merge_fileset {
int n_temp_files, n_temp_files_limit;
FIDX *data_fidxs;
};
void init_merge_fileset (struct merge_fileset *fs);
void destroy_merge_fileset (struct merge_fileset *fs);
struct poll_callback_s {
brt_loader_poll_func poll_function;
void *poll_extra;
};
int brt_loader_init_poll_callback(BRTLOADER);
void brt_loader_destroy_poll_callback(BRTLOADER);
void brt_loader_set_poll_function(BRTLOADER, brt_loader_poll_func poll_function, void *poll_extra);
int brt_loader_call_poll_function(BRTLOADER, float progress);
struct error_callback_s {
brt_loader_error_func error_callback;
void *extra;
int did_callback;
int error;
DB *db;
int which_db;
DBT key;
DBT val;
toku_pthread_mutex_t mutex;
int (*set_error_and_callback)(BRTLOADER, int error, DB *db, int which_db, DBT *key, DBT *val);
BRTLOADER bl;
};
int brt_loader_init_error_callback(BRTLOADER);
void brt_loader_destroy_error_callback(BRTLOADER);
int brt_loader_get_error(BRTLOADER);
void brt_loader_set_error_function(BRTLOADER, brt_loader_error_func error_function, void *extra);
int brt_loader_set_error(BRTLOADER, int error, DB *db, int which_db, DBT *key, DBT *val);
int brt_loader_call_error_function(BRTLOADER);
int brt_loader_set_error_and_callback(BRTLOADER, int error, DB *db, int which_db, DBT *key, DBT *val);
struct brtloader_s {
int panic;
......@@ -37,11 +122,18 @@ struct brtloader_s {
const struct descriptor **descriptors; // N of these
const char **new_fnames_in_env; // the file names that the final data will be written to (relative to env).
struct rowset primary_rowset; // the primary rows that have been put, but the secondary rows haven't been generated.
struct rowset primary_rowset_temp; // the primary rows that are being worked on by the extractor_thread.
QUEUE primary_rowset_queue; // main thread enqueues rowsets in this queue (in maybe 64MB chunks). The extractor thread removes them, sorts them, adn writes to file.
toku_pthread_t extractor_thread; // the thread that takes primary rowset and does extraction and the first level sort and write to file.
struct rowset *rows; // secondary rows that have been put, but haven't been sorted and written to a file.
u_int64_t n_rows; // how many rows have been put?
struct merge_fileset *fs;
const char *temp_file_template;
FIDX fprimary_rows; // the file index (in the file_infos) for the data
u_int64_t fprimary_offset;
CACHETABLE cachetable;
/* To make it easier to recover from errors, we don't use FILE*, instead we use an index into the file_infos. */
struct file_infos file_infos;
......@@ -51,60 +143,77 @@ struct brtloader_s {
// We use an integer so that we can add to the progress using a fetch-and-add instruction.
// These two are set in the close function, and used while running close
int (*poll_function)(void *extra, float progress);
void *poll_extra;
struct error_callback_s error_callback;
struct poll_callback_s poll_callback;
int user_said_stop; // 0 if the poll_function always returned zero. If it ever returns nonzero, then store that value here.
LSN load_lsn; //LSN of the fsynced 'load' log entry. Write this LSN (as checkpoint_lsn) in brt headers made by this loader.
};
/* These data structures are used for manipulating a collection of rows in main memory. */
struct row {
size_t off; // the offset in the data array.
int klen,vlen;
};
struct rowset {
size_t n_rows, n_rows_limit;
struct row *rows;
size_t n_bytes, n_bytes_limit;
char *data;
QUEUE *fractal_queues; // an array of work queues, one for each secondary index.
pthread_t *fractal_threads;
BOOL *fractal_threads_live; // an array of bools indicating that fractal_threads[i] is a live thread. (There is no NULL for a pthread_t, so we have to maintain this separately).
};
int init_rowset (struct rowset *rows);
void destroy_rowset (struct rowset *rows);
void add_row (struct rowset *rows, DBT *key, DBT *val);
// Set the number of rows in the loader. Used for test.
void toku_brt_loader_set_n_rows(BRTLOADER bl, u_int64_t n_rows);
int loader_write_row(DBT *key, DBT *val, FIDX data, u_int64_t *dataoff, BRTLOADER bl);
int loader_read_row (FIDX f, DBT *key, DBT *val, BRTLOADER bl);
// Get the number of rows in the loader. Used for test.
u_int64_t toku_brt_loader_get_n_rows(BRTLOADER bl);
struct error_callback_s {
void (*error_callback)(DB *, int which_db, int err, DBT *key, DBT *val, void *extra);
DB *db;
int which_db;
void *extra;
// The data passed into a fractal_thread via pthread_create.
struct fractal_thread_args {
BRTLOADER bl;
const struct descriptor *descriptor;
int fd; // write the brt into tfd.
int progress_allocation;
QUEUE q;
int errno_result; // the final result.
};
int merge (struct row dest[/*an+bn*/], struct row a[/*an*/], int an, struct row b[/*bn*/], int bn,
DB *dest_db, brt_compare_func,
struct error_callback_s *,
void toku_brt_loader_set_n_rows(BRTLOADER bl, u_int64_t n_rows);
u_int64_t toku_brt_loader_get_n_rows(BRTLOADER bl);
int merge_row_arrays_base (struct row dest[/*an+bn*/], struct row a[/*an*/], int an, struct row b[/*bn*/], int bn,
int which_db, DB *dest_db, brt_compare_func,
BRTLOADER,
struct rowset *);
int mergesort_row_array (struct row rows[/*n*/], int n, DB *dest_db, brt_compare_func, struct error_callback_s *, struct rowset *);
struct merge_fileset {
int n_temp_files, n_temp_files_limit;
FIDX *data_fidxs;
int merge_files (struct merge_fileset *fs, BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func, int progress_allocation, QUEUE);
#if defined(__cilkplusplus)
extern "Cilk++" {
#endif
int sort_and_write_rows (struct rowset *rows, struct merge_fileset *fs, BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func,
int progress_allocation);
int mergesort_row_array (struct row rows[/*n*/], int n, int which_db, DB *dest_db, brt_compare_func, BRTLOADER, struct rowset *);
//int write_file_to_dbfile (int outfile, FIDX infile, BRTLOADER bl, const struct descriptor *descriptor, int progress_allocation);
#if defined(__cilkplusplus)
};
#endif
int brt_loader_sort_and_write_rows (struct rowset *rows, struct merge_fileset *fs, BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func,
int progress_allocation);
void init_merge_fileset (struct merge_fileset *fs);
void destroy_merge_fileset (struct merge_fileset *fs);
int toku_loader_write_brt_from_q_in_C (BRTLOADER bl,
const struct descriptor *descriptor,
int fd, // write to here
int progress_allocation,
QUEUE q);
int sort_and_write_rows (struct rowset *rows, struct merge_fileset *fs, BRTLOADER bl, DB *dest_db, brt_compare_func,
struct error_callback_s *error_callback, int progress_allocation);
int merge_files (struct merge_fileset *fs, BRTLOADER bl, DB *dest_db, brt_compare_func, struct error_callback_s *, int progress_allocation);
int write_file_to_dbfile (int outfile, FIDX infile, BRTLOADER bl, const struct descriptor *descriptor, int progress_allocation);
int brt_loader_mergesort_row_array (struct row rows[/*n*/], int n, int which_db, DB *dest_db, brt_compare_func, BRTLOADER, struct rowset *);
int brt_loader_write_file_to_dbfile (int outfile, FIDX infile, BRTLOADER bl, const struct descriptor *descriptor, int progress_allocation);
int brtloader_init_file_infos (struct file_infos *fi);
void brtloader_fi_destroy (struct file_infos *fi, BOOL is_error);
int brtloader_fi_close (struct file_infos *fi, FIDX idx);
int brtloader_fi_reopen (struct file_infos *fi, FIDX idx, const char *mode);
int brtloader_fi_unlink (struct file_infos *fi, FIDX idx);
#if defined(__cplusplus)
};
#endif
#endif
This diff is collapsed.
/* -*- mode: C; c-basic-offset: 4 -*- */
#ifndef BRTLOADER_H
#define BRTLOADER_H
#ifndef TOKU_BRT_LOADER_H
#define TOKU_BRT_LOADER_H
#ident "Copyright (c) 2007 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
// The loader callbacks are C functions and need to be defined as such
typedef void (*brt_loader_error_func)(DB *, int which_db, int err, DBT *key, DBT *val, void *extra);
typedef int (*brt_loader_poll_func)(void *extra, float progress);
typedef struct brtloader_s *BRTLOADER;
int toku_brt_loader_open (BRTLOADER *bl,
CACHETABLE cachetable,
generate_row_for_put_func g,
......@@ -16,13 +27,20 @@ int toku_brt_loader_open (BRTLOADER *bl,
brt_compare_func bt_compare_functions[/*N*/],
const char *temp_file_template,
LSN load_lsn);
int toku_brt_loader_put (BRTLOADER bl, DBT *key, DBT *val);
int toku_brt_loader_close (BRTLOADER bl,
void (*error_callback)(DB *, int which_db, int err, DBT *key, DBT *val, void *extra), void *error_callback_extra,
int (*poll_callback)(void *extra, float progress), void *poll_callback_extra);
brt_loader_error_func error_callback, void *error_callback_extra,
brt_loader_poll_func poll_callback, void *poll_callback_extra);
int toku_brt_loader_abort(BRTLOADER bl,
BOOL is_error);
void brtloader_set_os_fwrite (size_t (*fwrite_fun)(const void*,size_t,size_t,FILE*));
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
#endif // BRTLOADER_H
......@@ -2,7 +2,7 @@
#define BRTTYPES_H
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include <sys/types.h>
......@@ -14,6 +14,10 @@
#include <db.h>
#include <inttypes.h>
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
typedef struct brt *BRT;
struct brt_header;
struct wbuf;
......@@ -126,5 +130,9 @@ typedef int (*generate_row_for_del_func)(DB *dest_db, DB *src_db, DBT *dest_val,
typedef struct memarena *MEMARENA;
typedef struct rollback_log_node *ROLLBACK_LOG_NODE;
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
#endif
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
......
......@@ -3,7 +3,7 @@
#define CACHETABLE_H
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include <fcntl.h>
......@@ -11,6 +11,10 @@
#include "workqueue.h"
#include "leaflock.h"
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
// TODO: #1398 Get rid of this entire straddle_callback hack
// Man is this ugly.
#ifdef BRT_LEVEL_STRADDLE_CALLBACK_LOGIC_NOT_READY
......@@ -322,5 +326,10 @@ void toku_cachetable_set_env_dir(CACHETABLE ct, char *env_dir);
char * toku_construct_full_name(int count, ...);
char * toku_cachetable_get_fname_in_cwd(CACHETABLE ct, const char * fname_in_env);
int toku_cachetable_local_checkpoint_for_commit(CACHETABLE ct, TOKUTXN txn, uint32_t n, CACHEFILE cachefiles[n]);
int toku_cachetable_local_checkpoint_for_commit(CACHETABLE ct, TOKUTXN txn, uint32_t n, CACHEFILE cachefiles[]);
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
#endif
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2009-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#ident "$Id$"
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2009 Tokutek Inc. All rights reserved."
#ifndef TOKU_CHECKPOINT_H
#define TOKU_CHECKPOINT_H
#ident "Copyright (c) 2009-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#ident "$Id$"
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
int toku_set_checkpoint_period(CACHETABLE ct, u_int32_t new_period);
u_int32_t toku_get_checkpoint_period(CACHETABLE ct);
//Effect: Change [end checkpoint (n) - begin checkpoint (n+1)] delay to
......@@ -75,3 +82,8 @@ typedef struct {
void toku_checkpoint_get_status(CHECKPOINT_STATUS stat);
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
#endif
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "includes.h"
......
#ifndef FIFO_H
#define FIFO_H
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "brttypes.h"
#include "xids-internal.h"
#include "xids.h"
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
// If the fifo_entry is unpacked, the compiler aligns the xids array and we waste a lot of space
#if TOKU_WINDOWS
......@@ -74,5 +77,8 @@ int toku_fifo_iterate_internal_has_more(FIFO fifo, int off);
int toku_fifo_iterate_internal_next(FIFO fifo, int off);
struct fifo_entry * toku_fifo_iterate_internal_get_entry(FIFO fifo, int off);
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
#endif
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007, 2008 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
/* Purpose of this file is to define and handle the fifo_msg, which
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007, 2008 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
/* The purpose of this file is to provide access to the fifo_msg,
......@@ -9,11 +9,12 @@
* NOTE: Accessor functions return all values in host byte order.
*/
#ifndef TOKU_FIFO_MSG_H
#define TOKU_FIFO_MSG_H
#ifndef FIFO_MSG_H
#define FIFO_MSG_H
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
u_int32_t fifo_msg_get_keylen(FIFO_MSG fifo_msg);
......@@ -33,5 +34,9 @@ u_int32_t fifo_msg_get_size(FIFO_MSG fifo_msg);
// the given brt_msg
u_int32_t fifo_msg_get_size_required(BRT_MSG brt_msg);
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
#endif
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
......
#ifndef TOKU_HASHFUN_H
#define TOKU_HASHFUN_H
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
// FNV Hash: From an idea sent by Glenn Fowler and Phong Vo to the IEEE POSIX 1003.2 committee. Landon Curt Noll improved it.
// See: http://isthe.com/chongo/tech/comp/fnv/
static inline u_int32_t hash_key_extend(u_int32_t initial_hash,
......@@ -37,3 +45,9 @@ static unsigned int hash_key (const char *key, ITEMLEN keylen) {
return hash;
}
#endif
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
#endif
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
// Clip horizontally (100 chars by default)
......
......@@ -2,7 +2,7 @@
#define SYSINCLUDES_H
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
// Portability first!
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "includes.h"
......
#ifndef TOKU_KEY_H
#define TOKU_KEY_H
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "ybt.h"
#include "brttypes.h"
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
int toku_keycompare (bytevec key1, ITEMLEN key1len, bytevec key2, ITEMLEN key2len);
void toku_test_keycompare (void) ;
int toku_builtin_compare_fun (DB *, const DBT *, const DBT*) __attribute__((__visibility__("default")));
int toku_dont_call_this_compare_fun (DB *, const DBT *, const DBT*);
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
#endif
......@@ -2,12 +2,16 @@
#define KV_PAIR_H
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "memory.h"
#include <string.h>
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
/*
* the key value pair contains a key and a value in a contiguous space. the
* key is right after the length fields and the value is right after the key.
......@@ -31,7 +35,7 @@ static inline void kv_pair_init(struct kv_pair *pair, const void *key, unsigned
}
static inline struct kv_pair *kv_pair_malloc(const void *key, unsigned int keylen, const void *val, unsigned int vallen) {
struct kv_pair *pair = toku_malloc(sizeof (struct kv_pair) + keylen + vallen);
struct kv_pair *pair = (struct kv_pair *) toku_malloc(sizeof (struct kv_pair) + keylen + vallen);
if (pair)
kv_pair_init(pair, key, keylen, val, vallen);
return pair;
......@@ -39,7 +43,7 @@ static inline struct kv_pair *kv_pair_malloc(const void *key, unsigned int keyle
/* replace the val, keep the same key */
static inline struct kv_pair *kv_pair_realloc_same_key(struct kv_pair *p, void *newval, unsigned int newvallen) {
struct kv_pair *pair = toku_realloc(p, sizeof (struct kv_pair) + p->keylen + newvallen);
struct kv_pair *pair = (struct kv_pair *) toku_realloc(p, sizeof (struct kv_pair) + p->keylen + newvallen);
if (pair) {
pair->vallen = newvallen;
memcpy(pair->key + pair->keylen, newval, (size_t)newvallen);
......@@ -75,4 +79,8 @@ static inline unsigned int kv_pair_vallen(const struct kv_pair *pair) {
return pair->vallen;
}
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
#endif
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "includes.h"
......
#ifndef LEAFENTRY_H
#define LEAFENTRY_H
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
......@@ -34,6 +34,10 @@
#include "rbuf.h"
#include "x1764.h"
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
#if 0
Memory format of packed nodup leaf entry
CONSTANTS:
......@@ -142,5 +146,9 @@ void le_full_promotion(LEAFENTRY le, size_t *new_leafentry_memorysize, size_t *n
//Requires: le is not marked committed
//Requires: The outermost uncommitted xid in le has actually committed (le was not yet updated to reflect that)
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
#endif
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include <toku_portability.h>
......
......@@ -2,9 +2,13 @@
#ifndef TOKU_LEAFLOCK_H
#define TOKU_LEAFLOCK_H
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
typedef struct leaflock *LEAFLOCK;
typedef struct leaflock_pool *LEAFLOCK_POOL;
......@@ -19,5 +23,10 @@ void toku_leaflock_unlock_by_leaf(LEAFLOCK leaflock);
void toku_leaflock_lock_by_cursor(LEAFLOCK leaflock);
void toku_leaflock_unlock_by_cursor(LEAFLOCK leaflock);
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
#endif
#ifndef TOKU_LOCK_H
#define TOKU_LOCK_H
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
#if defined(__x86_64) || defined(__i386)
static inline void mfence (void) {
......@@ -54,5 +61,12 @@ static inline void spin_unlock (SPINLOCK v) {
}
#else
#error Need to define architectur-specific stuff for other machines.
#error Need to define architecture-specific stuff for other machines.
#endif
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
#endif
......@@ -2,7 +2,7 @@
#define LOG_INTERNAL_H
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "brt-internal.h"
......@@ -17,6 +17,10 @@
#include <string.h>
#include <dirent.h>
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
// Locking for the logger
// For most purposes we use the big ydb lock.
// To log: grab the buf lock
......@@ -194,4 +198,9 @@ static inline char *fixup_fname(BYTESTRING *f) {
return fname;
}
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
#endif
#ifndef TOKULOGGGER_H
#define TOKULOGGGER_H
#ifndef TOKU_LOGGGER_H
#define TOKU_LOGGGER_H
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include <toku_portability.h>
......@@ -13,8 +13,13 @@
#include "memory.h"
#include "x1764.h"
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
typedef void(*voidfp)(void *thunk);
typedef void(*YIELDF)(voidfp, void *fpthunk, void *yieldthunk);
struct roll_entry;
#include "logger.h"
......@@ -24,7 +29,7 @@ struct roll_entry;
static inline int toku_copy_BYTESTRING(BYTESTRING *target, BYTESTRING val) {
target->len = val.len;
target->data = toku_memdup(val.data, (size_t)val.len);
target->data = (char *) toku_memdup(val.data, (size_t)val.len);
if (target->data==0) return errno;
return 0;
}
......@@ -33,4 +38,8 @@ static inline void toku_free_FILENUMS(FILENUMS val) { toku_free(val.filenums); }
void toku_set_lsn_increment (uint64_t incr) __attribute__((__visibility__("default")));
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
#endif
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "includes.h"
......
......@@ -2,11 +2,15 @@
#define TOKULOGCURSOR_H
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "log_header.h"
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
struct toku_logcursor;
typedef struct toku_logcursor *TOKULOGCURSOR;
......@@ -38,4 +42,8 @@ int toku_logcursor_last(const TOKULOGCURSOR lc, struct log_entry **le);
// return 0 if log exists, ENOENT if no log
int toku_logcursor_log_exists(const TOKULOGCURSOR lc);
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
#endif // TOKULOGCURSOR_H
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "includes.h"
......
......@@ -2,11 +2,15 @@
#define TOKULOGFILEMGR_H
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "log_header.h"
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
// this is the basic information we need to keep per logfile
struct toku_logfile_info {
int64_t index;
......@@ -29,4 +33,9 @@ LSN toku_logfilemgr_get_last_lsn(TOKULOGFILEMGR lfm);
void toku_logfilemgr_update_last_lsn(TOKULOGFILEMGR lfm, LSN lsn);
void toku_logfilemgr_print(TOKULOGFILEMGR lfm);
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
#endif //TOKULOGFILEMGR_H
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
/* This file defines the logformat in an executable fashion.
......@@ -682,11 +682,12 @@ int main (int argc, const char *const argv[]) {
pf = fopen(printpath, "w"); assert(pf!=0);
fprintf(hf, "#ifndef LOG_HEADER_H\n");
fprintf(hf, "#define LOG_HEADER_H\n");
fprintf2(cf, hf, "/* Do not edit this file. This code generated by logformat.c. Copyright 2007, 2008 Tokutek. */\n");
fprintf2(cf, hf, "#ident \"Copyright (c) 2007, 2008 Tokutek Inc. All rights reserved.\"\n");
fprintf2(cf, hf, "/* Do not edit this file. This code generated by logformat.c. Copyright 2007-2010 Tokutek. */\n");
fprintf2(cf, hf, "#ident \"Copyright (c) 2007-2010 Tokutek Inc. All rights reserved.\"\n");
fprintf2(cf, pf, "#include \"includes.h\"\n");
fprintf(hf, "#include \"brt-internal.h\"\n");
fprintf(hf, "#include \"memarena.h\"\n");
fprintf(hf, "#if defined(__cplusplus)\nextern \"C\" {\n#endif\n");
generate_enum();
generate_log_struct();
generate_dispatch();
......@@ -695,6 +696,7 @@ int main (int argc, const char *const argv[]) {
generate_rollbacks();
generate_log_entry_functions();
generate_logprint();
fprintf(hf, "#if defined(__cplusplus)\n};\n#endif\n");
fprintf(hf, "#endif\n");
{
int r=fclose(hf); assert(r==0);
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "includes.h"
......
#ifndef TOKULOGGER_H
#define TOKULOGGER_H
#ifndef TOKU_LOGGER_H
#define TOKU_LOGGER_H
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
enum {
TOKU_LOG_VERSION_1 = 1,
TOKU_LOG_VERSION_2 = 2,
......@@ -162,6 +166,9 @@ typedef struct logger_status {
void toku_logger_get_status(TOKULOGGER logger, LOGGER_STATUS s);
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
#endif
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "includes.h"
......
#ifndef MEMARENA_H
#define MEMARENA_H
#ifndef TOKU_MEMARENA_H
#define TOKU_MEMARENA_H
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
/* We have too many memory management tricks:
......@@ -19,6 +20,10 @@
#include <sys/types.h>
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
MEMARENA memarena_create_presized (size_t initial_size);
// Effect: Create a memarena with initial size. In case of ENOMEM, aborts.
......@@ -44,4 +49,8 @@ size_t memarena_total_memory_size (MEMARENA);
size_t memarena_total_size_in_use (MEMARENA);
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
#endif
/* An implementation of memory that can be made to return NULL and ENOMEM on certain mallocs. */
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "memory.h"
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "includes.h"
......
#ifndef _TOKU_MEMPOOL_H
#define _TOKU_MEMPOOL_H
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
/* a memory pool is a contiguous region of memory that supports single
......@@ -11,6 +11,10 @@
#include <sys/types.h>
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
struct mempool;
struct mempool {
......@@ -49,4 +53,9 @@ static inline int toku_mempool_inrange(struct mempool *mp, void *vp, size_t size
return (mp->base <= vp) && ((char *)vp + size <= (char *)mp->base + mp->size);
}
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
#endif
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id$"
#ident "Copyright (c) 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2009-2010 Tokutek Inc. All rights reserved."
/* See merger.h for a description of this module. */
......
#ifndef TOKU_MERGER_H
#define TOKU_MERGER_H
/* This is a C header (no Cilk or C++ inside here) */
/* The merger abstraction:
......@@ -28,6 +31,11 @@
* This could be an issue if the data was already sorted, so that file[0] is always emptying first, then file[1], and so forth.
*/
#include "db.h"
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
typedef struct merger *MERGER;
typedef void (*MEMORY_ALLOCATION_UPDATER) (/*in */ size_t currently_using,
/*in */ size_t currently_requested,
......@@ -56,5 +64,9 @@ int merger_pop (MERGER m,
// It is fairly straightforward to keep the key and val "live": In most cases, the buffer is still valid. In the case where the key and val are the last
// item, then we must take care not to reuse the buffer until the next merger_pop.
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
#endif
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007, 2008 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "$Id$"
#include <toku_portability.h>
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007, 2008 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "$Id$"
#ifndef TOKU_MINICRON_H
#define TOKU_MINICRON_H
#include <toku_pthread.h>
#include <toku_time.h>
#include "brttypes.h"
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
// Specification:
// A minicron is a miniature cron job for executing a job periodically inside a pthread.
......@@ -37,3 +43,9 @@ int toku_minicron_change_period(struct minicron *p, u_int32_t new_period);
u_int32_t toku_minicron_get_period(struct minicron *p);
int toku_minicron_shutdown(struct minicron *p);
BOOL toku_minicron_has_been_shutdown(struct minicron *p);
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
#endif
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include <toku_portability.h>
......
#if !defined(OMT_H)
#define OMT_H
#if !defined(TOKU_OMT_H)
#define TOKU_OMT_H
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
// Order Maintenance Tree (OMT)
//
// Maintains a collection of totally ordered values, where each value has an integer weight.
......@@ -504,5 +508,9 @@ void toku_omt_cursor_associate(OMT omt, OMTCURSOR c);
// We do not want to grab two locks (one for omt, and one for the old
// associated omt).
#endif /* #ifndef OMT_H */
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
#endif /* #ifndef TOKU_OMT_H */
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id: pqueue.c$"
#ident "Copyright (c) 2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "brt-internal.h"
#include "brtloader-internal.h"
#include "pqueue.h"
#define pqueue_left(i) ((i) << 1)
#define pqueue_right(i) (((i) << 1) + 1)
#define pqueue_parent(i) ((i) >> 1)
int pqueue_init(pqueue_t **result, size_t n, int which_db, DB *db, brt_compare_func compare, struct error_callback_s *err_callback)
{
pqueue_t *q;
if (!(q = toku_malloc(sizeof(pqueue_t))))
return errno;
/* Need to allocate n+1 elements since element 0 isn't used. */
if (!(q->d = toku_malloc((n + 1) * sizeof(pqueue_node_t *)))) {
int r = errno;
toku_free(q);
return r;
}
q->size = 1;
q->avail = q->step = (n+1); /* see comment above about n+1 */
q->which_db = which_db;
q->db = db;
q->compare = compare;
q->dup_error = 0;
q->error_callback = err_callback;
*result = q;
return 0;
}
void pqueue_free(pqueue_t *q)
{
toku_free(q->d);
toku_free(q);
}
size_t pqueue_size(pqueue_t *q)
{
/* queue element 0 exists but doesn't count since it isn't used. */
return (q->size - 1);
}
static int pqueue_compare(pqueue_t *q, DBT *next_key, DBT *next_val, DBT *curr_key)
{
int r = q->compare(q->db, next_key, curr_key);
if ( r == 0 ) { // duplicate key : next_key == curr_key
q->dup_error = 1;
if (q->error_callback->set_error_and_callback)
q->error_callback->set_error_and_callback(q->error_callback->bl, DB_KEYEXIST, q->db, q->which_db, next_key, next_val);
}
return ( r > -1 );
}
static void pqueue_bubble_up(pqueue_t *q, size_t i)
{
size_t parent_node;
pqueue_node_t *moving_node = q->d[i];
DBT *moving_key = moving_node->key;
for (parent_node = pqueue_parent(i);
((i > 1) && pqueue_compare(q, q->d[parent_node]->key, q->d[parent_node]->val, moving_key));
i = parent_node, parent_node = pqueue_parent(i))
{
q->d[i] = q->d[parent_node];
}
q->d[i] = moving_node;
}
static size_t pqueue_maxchild(pqueue_t *q, size_t i)
{
size_t child_node = pqueue_left(i);
if (child_node >= q->size)
return 0;
if ((child_node+1) < q->size &&
pqueue_compare(q, q->d[child_node]->key, q->d[child_node]->val, q->d[child_node+1]->key))
child_node++; /* use right child instead of left */
return child_node;
}
static void pqueue_percolate_down(pqueue_t *q, size_t i)
{
size_t child_node;
pqueue_node_t *moving_node = q->d[i];
DBT *moving_key = moving_node->key;
DBT *moving_val = moving_node->val;
while ((child_node = pqueue_maxchild(q, i)) &&
pqueue_compare(q, moving_key, moving_val, q->d[child_node]->key))
{
q->d[i] = q->d[child_node];
i = child_node;
}
q->d[i] = moving_node;
}
int pqueue_insert(pqueue_t *q, pqueue_node_t *d)
{
size_t i;
if (!q) return 1;
if (q->size >= q->avail) return 1;
/* insert item */
i = q->size++;
q->d[i] = d;
pqueue_bubble_up(q, i);
if ( q->dup_error ) return DB_KEYEXIST;
return 0;
}
int pqueue_pop(pqueue_t *q, pqueue_node_t **d)
{
if (!q || q->size == 1) {
*d = NULL;
return 0;
}
*d = q->d[1];
q->d[1] = q->d[--q->size];
pqueue_percolate_down(q, 1);
if ( q->dup_error ) return DB_KEYEXIST;
return 0;
}
#ifndef TOKU_PQUEUE_H
#define TOKU_PQUEUE_H
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
typedef struct brt_pqueue_node_t
{
DBT *key;
DBT *val;
int i;
} pqueue_node_t;
typedef struct brt_pqueue_t
{
size_t size;
size_t avail;
size_t step;
int which_db;
DB *db; // needed for compare function
brt_compare_func compare;
pqueue_node_t **d;
int dup_error;
struct error_callback_s *error_callback;
} pqueue_t;
int pqueue_init(pqueue_t **result, size_t n, int which_db, DB *db, brt_compare_func compare, struct error_callback_s *err_callback);
void pqueue_free(pqueue_t *q);
size_t pqueue_size(pqueue_t *q);
int pqueue_insert(pqueue_t *q, pqueue_node_t *d);
int pqueue_pop(pqueue_t *q, pqueue_node_t **d);
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
#endif //TOKU_PQUEUE_H
#include <errno.h>
#include <toku_assert.h>
#include "queue.h"
#include "memory.h"
#include "toku_pthread.h"
struct qitem;
struct qitem {
void *item;
struct qitem *next;
u_int64_t weight;
};
struct queue {
u_int64_t contents_weight; // how much stuff is in there?
u_int64_t weight_limit; // Block enqueueing when the contents gets to be bigger than the weight.
struct qitem *head, *tail;
BOOL eof;
pthread_mutex_t mutex;
pthread_cond_t cond;
};
// Representation invariant:
// q->contents_weight is the sum of the weights of everything in the queue.
// q->weight_limit is the limit on the weight before we block.
// q->head is the oldest thing in the queue. q->tail is the newest. (If nothing is in the queue then both are NULL)
// If q->head is not null:
// q->head->item is the oldest item.
// q->head->weight is the weight of that item.
// q->head->next is the next youngest thing.
// q->eof indicates that the producer has said "that's all".
// q->mutex and q->cond are used as condition variables.
int queue_create (QUEUE *q, u_int64_t weight_limit)
{
QUEUE MALLOC(result);
if (result==NULL) return errno;
result->contents_weight = 0;
result->weight_limit = weight_limit;
result->head = NULL;
result->tail = NULL;
result->eof = FALSE;
int r;
r = toku_pthread_mutex_init(&result->mutex, NULL);
if (r!=0) {
toku_free(result);
return r;
}
r = toku_pthread_cond_init(&result->cond, NULL);
if (r!=0) {
toku_pthread_mutex_destroy(&result->mutex);
toku_free(result);
return r;
}
*q = result;
return 0;
}
int queue_destroy (QUEUE q)
{
if (q->head) return EINVAL;
assert(q->contents_weight==0);
{
int r = toku_pthread_mutex_destroy(&q->mutex);
if (r) return r;
}
{
int r = toku_pthread_cond_destroy(&q->cond);
if (r) return r;
}
toku_free(q);
return 0;
}
int queue_enq (QUEUE q, void *item, u_int64_t weight, u_int64_t *total_weight_after_enq)
{
{
int r = toku_pthread_mutex_lock(&q->mutex);
if (r) return r;
}
assert(!q->eof);
// Go ahead and put it in, even if it's too much.
q->contents_weight += weight;
struct qitem *MALLOC(qi);
if (qi==NULL) return errno;
qi->item = item;
qi->weight = weight;
qi->next = NULL;
if (q->tail) {
q->tail->next = qi;
} else {
assert(q->head==NULL);
q->head = qi;
}
q->tail = qi;
// Wake up the consumer.
{
int r = toku_pthread_cond_signal(&q->cond);
if (r) return r;
}
// Now block if there's too much stuff in there.
while (q->weight_limit < q->contents_weight) {
int r = toku_pthread_cond_wait(&q->cond, &q->mutex);
if (r) return r;
}
// we are allowed to return.
if (total_weight_after_enq) {
*total_weight_after_enq = q->contents_weight;
}
{
int r = toku_pthread_mutex_unlock(&q->mutex);
if (r) return r;
}
return 0;
}
int queue_eof (QUEUE q)
{
{
int r = toku_pthread_mutex_lock(&q->mutex);
if (r) return r;
}
assert(!q->eof);
q->eof = TRUE;
{
int r = toku_pthread_cond_signal(&q->cond);
if (r) return r;
}
{
int r = toku_pthread_mutex_unlock(&q->mutex);
if (r) return r;
}
return 0;
}
int queue_deq (QUEUE q, void **item, u_int64_t *weight, u_int64_t *total_weight_after_deq)
{
{
int r = toku_pthread_mutex_lock(&q->mutex);
if (r) return r;
}
int result;
while (q->head==NULL && !q->eof) {
int r = toku_pthread_cond_wait(&q->cond, &q->mutex);
if (r) return r;
}
if (q->head==NULL) {
assert(q->eof);
result = EOF;
} else {
struct qitem *head = q->head;
q->contents_weight -= head->weight;
*item = head->item;
if (weight)
*weight = head->weight;
if (total_weight_after_deq)
*total_weight_after_deq = q->contents_weight;
q->head = head->next;
toku_free(head);
if (q->head==NULL) {
q->tail = NULL;
}
// wake up the producer, since we decreased the contents_weight.
int r = toku_pthread_cond_signal(&q->cond);
if (r!=0) return r;
// Successful result.
result = 0;
}
{
int r = toku_pthread_mutex_unlock(&q->mutex);
if (r) return r;
}
return result;
}
#ifndef TOKU_QUEUE_H
#define TOKU_QUEUE_H
#include "brttypes.h"
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
// The abstraction:
//
// queue.h implements a queue suitable for a producer-consumer relationship between two pthreads.
// The enqueue/dequeue operation is fairly heavyweight (involving pthread condition variables) so it may be useful
// to enqueue large chunks rather than small chunks.
// It probably won't work right to have two consumer threads.
//
// Every item inserted into the queue has a weight. If the weight
// gets too big, then the queue blocks on trying to insert more items.
// The weight can be used to limit the total number of items in the
// queue (weight of each item=1) or the total memory consumed by queue
// items (weight of each item is its size). Or the weight's could all be
// zero for an unlimited queue.
typedef struct queue *QUEUE;
int queue_create (QUEUE *q, u_int64_t weight_limit);
// Effect: Create a queue with a given weight limit. The queue is initially empty.
int queue_enq (QUEUE q, void *item, u_int64_t weight, u_int64_t *total_weight_after_enq);
// Effect: Insert ITEM of weight WEIGHT into queue. If the resulting contents weight too much then block (don't return) until the total weight is low enough.
// If total_weight_after_enq!=NULL then return the current weight of the items in the queue (after finishing blocking on overweight, and after enqueueing the item).
// If successful return 0.
// If an error occurs, return the error number, and the state of the queue is undefined. The item may have been enqueued or not, and in fact the queue may be badly corrupted if the condition variables go awry. If it's just a matter of out-of-memory, then the queue is probably OK.
// Requires: There is only a single consumer. (We wake up the consumer using a pthread_cond_signal (which is suitable only for single consumers.)
int queue_eof (QUEUE q);
// Effect: Inform the queue that no more values will be inserted. After all the values that have been inserted are dequeued, further dequeue operations will return EOF.
// Returns 0 on success. On failure, things are pretty bad (likely to be some sort of mutex failure).
int queue_deq (QUEUE q, void **item, u_int64_t *weight, u_int64_t *total_weight_after_deq);
// Effect: Wait until the queue becomes nonempty. Then dequeue and return the oldest item. The item and its weight are returned in *ITEM.
// If weight!=NULL then return the item's weight in *weight.
// If total_weight_after_deq!=NULL then return the current weight of the items in the queue (after dequeuing the item).
// Return 0 if an item is returned.
// Return EOF is we no more items will be returned.
// Usage note: The queue should be destroyed only after any consumers will no longer look at it (for example, they saw EOF).
int queue_destroy (QUEUE q);
// Effect: Destroy the queue.
// Requires: The queue must be empty and no consumer should try to dequeue after this (one way to do this is to make sure the consumer saw EOF).
// Returns 0 on success. If the queue is not empty, returns EINVAL. Other errors are likely to be bad (some sort of mutex or condvar failure).
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
#endif
......@@ -2,7 +2,7 @@
#define RBUF_H
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include <toku_portability.h>
......@@ -12,6 +12,10 @@
#include "memory.h"
#include "toku_htonl.h"
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
struct rbuf {
unsigned char *buf;
unsigned int size;
......@@ -132,7 +136,7 @@ static inline void rbuf_BYTESTRING (struct rbuf *r, BYTESTRING *bs) {
bs->len = rbuf_int(r);
u_int32_t newndone = r->ndone + bs->len;
assert(newndone <= r->size);
bs->data = toku_memdup(&r->buf[r->ndone], (size_t)bs->len);
bs->data = (char *) toku_memdup(&r->buf[r->ndone], (size_t)bs->len);
assert(bs->data);
r->ndone = newndone;
}
......@@ -141,9 +145,13 @@ static inline void rbuf_ma_BYTESTRING (struct rbuf *r, MEMARENA ma, BYTESTRING
bs->len = rbuf_int(r);
u_int32_t newndone = r->ndone + bs->len;
assert(newndone <= r->size);
bs->data = memarena_memdup(ma, &r->buf[r->ndone], (size_t)bs->len);
bs->data = (char *) memarena_memdup(ma, &r->buf[r->ndone], (size_t)bs->len);
assert(bs->data);
r->ndone = newndone;
}
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
#endif
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "includes.h"
......
......@@ -2,7 +2,7 @@
#define TOKURECOVER_H
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include <toku_portability.h>
......@@ -13,6 +13,10 @@
#include "memory.h"
#include "x1764.h"
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
// Run tokudb recovery from the log
// Returns 0 if success
int tokudb_recover (const char *env_dir, const char *log_dir,
......@@ -37,4 +41,8 @@ void toku_recover_set_callback2 (void (*)(void*), void*);
extern int tokudb_recovery_trace;
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
#endif // TOKURECOVER_H
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
/* rollback and rollforward routines. */
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#ifndef TOKUDB_ROLL_H
#define TOKUDB_ROLL_H
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
// these flags control whether or not we send commit messages for
// various operations
......@@ -20,5 +25,9 @@
// for each BRT_DELETE_BOTH message sent earlier by the transaction?
#define TOKU_DO_COMMIT_CMD_DELETE_BOTH 1
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
#endif
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "includes.h"
......
#ifndef TOKUROLLBACK_H
#define TOKUROLLBACK_H
#ifndef TOKU_ROLLBACK_H
#define TOKU_ROLLBACK_H
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "omt.h"
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
// these routines in rollback.c
void toku_poll_txn_progress_function(TOKUTXN txn, uint8_t is_commit, uint8_t stall_for_checkpoint);
......@@ -60,5 +64,8 @@ struct rollback_log_node {
size_t rollentry_resident_bytecount; // How many bytes for the rollentries that are stored in main memory.
};
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
#endif // TOKU_ROLLBACK_H
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
/* Readers/writers locks implementation
......
......@@ -2,11 +2,15 @@
#ifndef TOKU_RWLOCK_H
#define TOKU_RWLOCK_H
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include <toku_assert.h>
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
//Use case:
// A read lock is acquired by threads that get and pin an entry in the
// cachetable. A write lock is acquired by the writer thread when an entry
......@@ -172,5 +176,9 @@ static inline int rwlock_users(RWLOCK rwlock) {
return rwlock->reader + rwlock->want_read + rwlock->writer + rwlock->want_write;
}
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
#endif
#ifndef TOKU_SUB_BLOCK_H
#define TOKU_SUB_BLOCK_H
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
static const int max_sub_blocks = 8;
static const int target_sub_block_size = 512*1024;
......@@ -98,5 +102,8 @@ decompress_worker(void *arg);
int
decompress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], unsigned char *compressed_data, unsigned char *uncompressed_data, int num_cores);
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
#endif
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
/* Recover an env. The logs are in argv[1]. The new database is created in the cwd. */
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id$"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
/* Dump the log from stdin to stdout. */
......
......@@ -94,12 +94,31 @@ check_test-assert$(BINSUF): test-assert$(BINSUF) $(PTHREAD_LOCAL)
check_brtloader-test$(BINSUF): EXTRA_ARGS=dir.brtloader-test
check_brtloader-test-write-dbfile$(BINSUF): EXTRA_ARGS=-n 1000000 dir.brtloader-test-write-dbfile
brtloader-test$(BINSUF): brtloader-test.$(OEXT)
ifeq ($(BRTLOADER),cilk)
$(CILKPP) $(CILKFLAGS) $< -o $@ $(LDFLAGS)
endif
ifeq ($(BRTLOADER),cxx)
$(CXX) $(CXXFLAGS) $< -o $@ $(LDFLAGS)
endif
brtloader-test-write-dbfile$(BINSUF): brtloader-test-write-dbfile.$(OEXT)
ifeq ($(BRTLOADER),cilk)
$(CILKPP) $(CILKFLAGS) $< -o $@ $(LDFLAGS)
endif
ifeq ($(BRTLOADER),cxx)
$(CXX) $(CXXFLAGS) $< -o $@ $(LDFLAGS)
endif
check_%: % $(PTHREAD_LOCAL)
$(VGRIND) ./$< $(VERBVERBOSE) $(EXTRA_ARGS) $(SUMMARIZE_CMD)
benchmark-test.$(OEXT): ../brt.h ../brt-search.h ../../include/db.h
brtloader_test$(BINSUF): ../brtloader-internal.h ../brtloader.o
../brtloader.$(OEXT): ../brtloader.c ../brtloader-internal.h
cd $(@D) && $(MAKE) $(@F)
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id: pqueue.c$"
#ident "Copyright (c) 2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
// test the loader write dbfile function
#include "includes.h"
#include "test.h"
#include "brtloader-internal.h"
#if defined(__cplusplus)
extern "C" {
#endif
static void traceit(const char *s) {
time_t t = time(NULL);
printf("%.24s %s\n", ctime(&t), s);
fflush(stdout);
}
static int qsort_compare_ints (const void *a, const void *b) {
int avalue = *(int*)a;
int bvalue = *(int*)b;
if (avalue<bvalue) return -1;
if (avalue>bvalue) return +1;
return 0;
}
static int compare_ints (DB *dest_db, const DBT *akey, const DBT *bkey) {
assert(dest_db==NULL);
assert(akey->size==sizeof(int));
assert(bkey->size==sizeof(int));
return qsort_compare_ints(akey->data, bkey->data);
}
static void err_cb(DB *db UU(), int dbn UU(), int err UU(), DBT *key UU(), DBT *val UU(), void *extra UU()) {
fprintf(stderr, "error in test");
abort();
}
static void verify_dbfile(int n, const char *name) {
if (verbose) traceit("verify");
int r;
CACHETABLE ct;
r = toku_brt_create_cachetable(&ct, 0, ZERO_LSN, NULL_LOGGER); assert(r==0);
TOKUTXN const null_txn = NULL;
BRT t = NULL;
r = toku_brt_create(&t); assert(r == 0);
r = toku_brt_set_bt_compare(t, compare_ints); assert(r == 0);
r = toku_brt_open(t, name, 0, 0, ct, null_txn, 0); assert(r==0);
BRT_CURSOR cursor = NULL;
r = toku_brt_cursor(t, &cursor, NULL, TXNID_NONE, FALSE); assert(r == 0);
int i;
for (i=0; ; i++) {
int kk = i;
int vv = i;
struct check_pair pair = {sizeof kk, &kk, sizeof vv, &vv, 0};
r = toku_brt_cursor_get(cursor, NULL, NULL, lookup_checkf, &pair, DB_NEXT);
if (r != 0) {
assert(pair.call_count ==0);
break;
}
assert(pair.call_count==1);
}
assert(i == n);
r = toku_brt_cursor_close(cursor); assert(r == 0);
r = toku_close_brt(t, 0); assert(r==0);
r = toku_cachetable_close(&ct);assert(r==0);
if (verbose) traceit("verify done");
}
static void test_write_dbfile (char *template, int n, char *output_name) {
if (verbose) traceit("test start");
DB *dest_db = NULL;
struct brtloader_s bl = {.panic = 0,
.temp_file_template = template};
int r = brtloader_init_file_infos(&bl.file_infos);
CKERR(r);
struct merge_fileset fs;
init_merge_fileset(&fs);
// put rows in the row set
struct rowset aset;
init_rowset(&aset);
for (int i=0; i<n; i++) {
DBT key = {.size=sizeof i,
.data=&i};
DBT val = {.size=sizeof i,
.data=&i};
add_row(&aset, &key, &val);
}
toku_brt_loader_set_n_rows(&bl, n);
brt_loader_init_error_callback(&bl);
brt_loader_set_error_function(&bl, err_cb, NULL);
r = brt_loader_sort_and_write_rows(&aset, &fs, &bl, 0, dest_db, compare_ints, 0); CKERR(r);
destroy_rowset(&aset);
QUEUE q;
r = queue_create(&q, 0xFFFFFFFF); // infinite queue.
assert(r==0);
r = merge_files(&fs, &bl, 0, dest_db, compare_ints, 0, q); CKERR(r);
assert(fs.n_temp_files==0);
QUEUE q2;
r = queue_create(&q2, 0xFFFFFFFF); // infinite queue.
assert(r==0);
size_t num_found = 0;
while (1) {
void *v;
r = queue_deq(q, &v, NULL, NULL);
if (r==EOF) break;
struct rowset *rs = (struct rowset *)v;
printf("v=%p\n", v);
for (size_t i=num_found; i<rs->n_rows; i++) {
struct row *row = &rs->rows[i];
assert(row->klen==sizeof(int));
assert(row->vlen==sizeof(int));
assert((int)i==*(int*)(rs->data+row->off));
}
num_found += rs->n_rows;
r = queue_enq(q2, v, 0, NULL);
assert(r==0);
}
assert((int)num_found == n);
r = queue_eof(q2);
assert(r==0);
r = queue_destroy(q);
assert(r==0);
struct descriptor desc = {.version = 1, .dbt = (DBT){.size = 4, .data="abcd"}};
int fd = open(output_name, O_RDWR | O_CREAT | O_BINARY, S_IRWXU|S_IRWXG|S_IRWXO);
assert(fd>=0);
if (verbose) traceit("write to file");
r = toku_loader_write_brt_from_q_in_C(&bl, &desc, fd, 1000, q2);
assert(r==0);
r = queue_destroy(q2);
assert(r==0);
destroy_merge_fileset(&fs);
brtloader_fi_destroy(&bl.file_infos, FALSE);
// walk a cursor through the dbfile and verify the rows
verify_dbfile(n, output_name);
brt_loader_destroy_error_callback(&bl);
}
/* Test to see if we can open temporary files. */
int test_main (int argc, const char *argv[]) {
const char *progname=argv[0];
int n = 1;
argc--; argv++;
while (argc>0) {
if (strcmp(argv[0],"-v")==0) {
verbose=1;
} else if (strcmp(argv[0],"-q")==0) {
verbose=0;
} else if (strcmp(argv[0],"-n") == 0) {
argc--; argv++;
n = atoi(argv[0]);
} else if (argc!=1) {
fprintf(stderr, "Usage:\n %s [-v] [-q] directory\n", progname);
exit(1);
}
else {
break;
}
argc--; argv++;
}
assert(argc==1); // argv[1] is the directory in which to do the test.
const char* directory = argv[0];
char unlink_all[strlen(directory)+20];
snprintf(unlink_all, strlen(directory)+20, "rm -rf %s", directory);
system(unlink_all);
int r = toku_os_mkdir(directory, 0755);
assert(r==0);
int templen = strlen(directory)+15;
char template[templen];
int tlen = snprintf(template, templen, "%s/tempXXXXXX", directory);
assert (tlen>0 && tlen<templen);
char output_name[templen];
int olen = snprintf(output_name, templen, "%s/test.tokudb", directory);
assert (olen>0 && olen<templen);
test_write_dbfile(template, n, output_name);
return 0;
}
#if defined(__cplusplus)
};
#endif
This diff is collapsed.
......@@ -29,6 +29,7 @@ static void test_flat (void) {
permute[ra]=i;
}
for (i=0; i<limit; i++) {
if (verbose) printf("%s:%d %"PRIu64"\n", __FUNCTION__, __LINE__, i);
char key[100],val[100];
u_int64_t ri = permute[i];
snprintf(key, 100, "%08llu", (unsigned long long)2*ri+1);
......@@ -43,6 +44,7 @@ static void test_flat (void) {
u_int64_t prevless=0;
u_int64_t prevgreater=limit;
for (i=0; i<2*limit+1; i++) {
if (verbose) printf("%s:%d %"PRIu64"\n", __FUNCTION__, __LINE__, i);
char key[100];
snprintf(key, 100, "%08llu", (unsigned long long)i);
DBT k;
......
......@@ -62,6 +62,7 @@ NEWBRT_TESTS_RAW = \
minicron-test \
omt-cursor-test \
omt-test \
pqueue-test \
recovery-cbegin \
recovery-cbegin-cend \
recovery-cbegin-cend-hello \
......
This diff is collapsed.
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <toku_assert.h>
#include <toku_pthread.h>
#include "queue.h"
static int verbose=1;
static int count_0 = 0;
static u_int64_t e_max_weight=0, d_max_weight = 0; // max weight seen by enqueue thread and dequeue thread respectively.
static void *start_0 (void *arg) {
QUEUE q = (QUEUE)arg;
void *item;
u_int64_t weight;
long count = 0;
while (1) {
u_int64_t this_max_weight;
int r=queue_deq(q, &item, &weight, &this_max_weight);
if (r==EOF) break;
assert(r==0);
if (this_max_weight>d_max_weight) d_max_weight=this_max_weight;
long v = (long)item;
//printf("D(%ld)=%ld %ld\n", v, this_max_weight, d_max_weight);
assert(v==count);
count_0++;
count++;
}
return NULL;
}
static void enq (QUEUE q, long v, u_int64_t weight) {
u_int64_t this_max_weight;
int r = queue_enq(q, (void*)v, (weight==0)?0:1, &this_max_weight);
assert(r==0);
if (this_max_weight>e_max_weight) e_max_weight=this_max_weight;
//printf("E(%ld)=%ld %ld\n", v, this_max_weight, e_max_weight);
}
static void queue_test_0 (u_int64_t weight)
// Test a queue that can hold WEIGHT items.
{
//printf("\n");
count_0 = 0;
e_max_weight = 0;
d_max_weight = 0;
QUEUE q;
int r;
r = queue_create(&q, weight); assert(r==0);
toku_pthread_t thread;
r = toku_pthread_create(&thread, NULL, start_0, q); assert(r==0);
enq(q, 0L, weight);
enq(q, 1L, weight);
enq(q, 2L, weight);
enq(q, 3L, weight);
sleep(1);
enq(q, 4L, weight);
enq(q, 5L, weight);
r = queue_eof(q); assert(r==0);
void *result;
r = toku_pthread_join(thread, &result); assert(r==0);
assert(result==NULL);
assert(count_0==6);
r = queue_destroy(q);
assert(d_max_weight <= weight);
assert(e_max_weight <= weight);
}
static void parse_args (int argc, const char *argv[]) {
const char *progname=argv[0];
argc--; argv++;
while (argc>0) {
if (strcmp(argv[0],"-v")==0) {
verbose++;
} else if (strcmp(argv[0],"-q")==0) {
verbose--;
} else {
fprintf(stderr, "Usage:\n %s [-v] [-q]\n", progname);
exit(1);
}
argc--; argv++;
}
if (verbose<0) verbose=0;
}
int main (int argc, const char *argv[]) {
parse_args(argc, argv);
queue_test_0(0LL);
queue_test_0(1LL);
queue_test_0(2LL);
return 0;
}
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment