Commit 402dbc88 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

test large malloc failures in the brtloader writer. merge from...

test large malloc failures in the brtloader writer. merge from tokudb.2603.extractor.tests. refs[t:2603]

git-svn-id: file:///svn/toku/tokudb@20276 c7de825b-a66e-492c-adef-691d508d4ae1
parent 99eaa861
...@@ -88,12 +88,27 @@ toku_brtloader_set_size_factor(uint32_t factor) { ...@@ -88,12 +88,27 @@ toku_brtloader_set_size_factor(uint32_t factor) {
} }
static void add_big_buffer(struct file_info *file) { static int add_big_buffer(struct file_info *file) {
if (file->buffer == NULL) int result = 0;
BOOL newbuffer = FALSE;
if (file->buffer == NULL) {
file->buffer = toku_malloc(file->buffer_size); file->buffer = toku_malloc(file->buffer_size);
if (file->buffer) { if (file->buffer == NULL)
int r = setvbuf(file->file, (char *) file->buffer, _IOFBF, file->buffer_size); assert(r == 0); result = errno;
else
newbuffer = TRUE;
} }
if (result == 0) {
int r = setvbuf(file->file, (char *) file->buffer, _IOFBF, file->buffer_size);
if (r != 0) {
result = errno;
if (newbuffer) {
toku_free(file->buffer);
file->buffer = NULL;
}
}
}
return result;
} }
static void cleanup_big_buffer(struct file_info *file) { static void cleanup_big_buffer(struct file_info *file) {
...@@ -143,11 +158,12 @@ void brtloader_fi_destroy (struct file_infos *fi, BOOL is_error) ...@@ -143,11 +158,12 @@ void brtloader_fi_destroy (struct file_infos *fi, BOOL is_error)
fi->file_infos = NULL; fi->file_infos = NULL;
} }
static void open_file_add (struct file_infos *fi, static int open_file_add (struct file_infos *fi,
FILE *file, FILE *file,
char *fname, char *fname,
/* out */ FIDX *idx) /* out */ FIDX *idx)
{ {
int result = 0;
int r = toku_pthread_mutex_lock(&fi->lock); assert(r==0); int r = toku_pthread_mutex_lock(&fi->lock); assert(r==0);
if (fi->n_files >= fi->n_files_limit) { if (fi->n_files >= fi->n_files_limit) {
fi->n_files_limit *=2; fi->n_files_limit *=2;
...@@ -161,12 +177,15 @@ static void open_file_add (struct file_infos *fi, ...@@ -161,12 +177,15 @@ static void open_file_add (struct file_infos *fi,
fi->file_infos[fi->n_files].n_rows = 0; fi->file_infos[fi->n_files].n_rows = 0;
fi->file_infos[fi->n_files].buffer_size = 1<<20; fi->file_infos[fi->n_files].buffer_size = 1<<20;
fi->file_infos[fi->n_files].buffer = NULL; fi->file_infos[fi->n_files].buffer = NULL;
add_big_buffer(&fi->file_infos[fi->n_files]); result = add_big_buffer(&fi->file_infos[fi->n_files]);
idx->idx = fi->n_files; if (result == 0) {
fi->n_files++; idx->idx = fi->n_files;
fi->n_files_extant++; fi->n_files++;
fi->n_files_open++; fi->n_files_extant++;
fi->n_files_open++;
}
r = toku_pthread_mutex_unlock(&fi->lock); assert(r==0); r = toku_pthread_mutex_unlock(&fi->lock); assert(r==0);
return result;
} }
int brtloader_fi_reopen (struct file_infos *fi, FIDX idx, const char *mode) { int brtloader_fi_reopen (struct file_infos *fi, FIDX idx, const char *mode) {
...@@ -224,17 +243,25 @@ int brtloader_open_temp_file (BRTLOADER bl, FIDX *file_idx) ...@@ -224,17 +243,25 @@ int brtloader_open_temp_file (BRTLOADER bl, FIDX *file_idx)
* The open file will be saved in bl->file_infos so that even if errors happen we can free them all. * The open file will be saved in bl->file_infos so that even if errors happen we can free them all.
*/ */
{ {
int result = 0;
char *fname = toku_strdup(bl->temp_file_template); char *fname = toku_strdup(bl->temp_file_template);
int fd = mkstemp(fname); int fd = mkstemp(fname);
if (fd<0) { int r = errno; toku_free(fname); return r; } if (fd < 0) {
FILE *f = fdopen(fd, "r+"); result = errno;
if (f==NULL) { int r = errno; toku_free(fname); close(fd); return r; } } else {
open_file_add(&bl->file_infos, f, fname, file_idx); FILE *f = fdopen(fd, "r+");
if (f == NULL)
static int counter=0; result = errno;
//fprintf(stderr, "%s:%d %d: %s\n", __FILE__, __LINE__, counter, fname); else
counter++; result = open_file_add(&bl->file_infos, f, fname, file_idx);
return 0; }
if (result != 0) {
if (fd >= 0)
close(fd);
if (fname != NULL)
toku_free(fname);
}
return result;
} }
static void brtloader_destroy (BRTLOADER bl, BOOL is_error) { static void brtloader_destroy (BRTLOADER bl, BOOL is_error) {
...@@ -2289,6 +2316,8 @@ static int write_literal(struct dbout *out, void*data, size_t len) { ...@@ -2289,6 +2316,8 @@ static int write_literal(struct dbout *out, void*data, size_t len) {
CILK_BEGIN CILK_BEGIN
static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progress_allocation, BRTLOADER bl) { static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progress_allocation, BRTLOADER bl) {
int result = 0;
//printf(" finishing leaf node progress=%d fin at %d\n", bl->progress, bl->progress+progress_allocation); //printf(" finishing leaf node progress=%d fin at %d\n", bl->progress, bl->progress+progress_allocation);
//printf("local_fingerprint=%8x\n", lbuf->local_fingerprint); //printf("local_fingerprint=%8x\n", lbuf->local_fingerprint);
putbuf_int32_at(&lbuf->dbuf, lbuf->local_fingerprint_p, lbuf->local_fingerprint); putbuf_int32_at(&lbuf->dbuf, lbuf->local_fingerprint_p, lbuf->local_fingerprint);
...@@ -2332,51 +2361,55 @@ static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progr ...@@ -2332,51 +2361,55 @@ static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progr
// allocate space for the compressed bufer // allocate space for the compressed bufer
int bound = get_sum_compressed_size_bound(n_sub_blocks, sub_block); int bound = get_sum_compressed_size_bound(n_sub_blocks, sub_block);
unsigned char *MALLOC_N(header_len + bound, compressed_buf); unsigned char *MALLOC_N(header_len + bound, compressed_buf);
assert(compressed_buf); // LAZY if (compressed_buf == NULL) {
result = errno;
// compress and checksum the sub blocks } else {
int compressed_len = compress_all_sub_blocks(n_sub_blocks, sub_block,
(char *) (lbuf->dbuf.buf + n_uncompressed_bytes_at_beginning),
(char *) (compressed_buf + header_len), 1);
// cppy the uncompressed header to the compressed buffer
memcpy(compressed_buf, lbuf->dbuf.buf, n_uncompressed_bytes_at_beginning);
// serialize the sub block header // compress and checksum the sub blocks
memcpy(compressed_buf+16, &n_sub_blocks, 4); int compressed_len = compress_all_sub_blocks(n_sub_blocks, sub_block,
for (int i = 0; i < n_sub_blocks; i++) { (char *) (lbuf->dbuf.buf + n_uncompressed_bytes_at_beginning),
memcpy(compressed_buf+20+12*i+0, &sub_block[i].compressed_size, 4); (char *) (compressed_buf + header_len), 1);
memcpy(compressed_buf+20+12*i+4, &sub_block[i].uncompressed_size, 4);
memcpy(compressed_buf+20+12*i+8, &sub_block[i].xsum, 4);
}
// compute the header checksum and serialize it // cppy the uncompressed header to the compressed buffer
u_int32_t header_xsum = x1764_memory(compressed_buf, header_len - sizeof (u_int32_t)); memcpy(compressed_buf, lbuf->dbuf.buf, n_uncompressed_bytes_at_beginning);
memcpy(compressed_buf + header_len - sizeof (u_int32_t), &header_xsum, 4);
// serialize the sub block header
memcpy(compressed_buf+16, &n_sub_blocks, 4);
for (int i = 0; i < n_sub_blocks; i++) {
memcpy(compressed_buf+20+12*i+0, &sub_block[i].compressed_size, 4);
memcpy(compressed_buf+20+12*i+4, &sub_block[i].uncompressed_size, 4);
memcpy(compressed_buf+20+12*i+8, &sub_block[i].xsum, 4);
}
// compute the header checksum and serialize it
u_int32_t header_xsum = x1764_memory(compressed_buf, header_len - sizeof (u_int32_t));
memcpy(compressed_buf + header_len - sizeof (u_int32_t), &header_xsum, 4);
dbout_lock(out); dbout_lock(out);
long long off_of_leaf = out->current_off; long long off_of_leaf = out->current_off;
int size = header_len + compressed_len; int size = header_len + compressed_len;
if (0) { if (0) {
fprintf(stderr, "uncompressed buf size=%d (amount of data compressed)\n", uncompressed_len); fprintf(stderr, "uncompressed buf size=%d (amount of data compressed)\n", uncompressed_len);
fprintf(stderr, "compressed buf size=%d, off=%lld\n", compressed_len, off_of_leaf); fprintf(stderr, "compressed buf size=%d, off=%lld\n", compressed_len, off_of_leaf);
fprintf(stderr, "compressed bytes are:"); fprintf(stderr, "compressed bytes are:");
//for (int i=0; i<compressed_len; i++) { //for (int i=0; i<compressed_len; i++) {
// unsigned char c = compressed_buf[28+i]; // unsigned char c = compressed_buf[28+i];
// if (isprint(c)) fprintf(stderr, "%c", c); // if (isprint(c)) fprintf(stderr, "%c", c);
// else fprintf(stderr, "\\%03o", compressed_buf[28+i]); // else fprintf(stderr, "\\%03o", compressed_buf[28+i]);
//} //}
fprintf(stderr, "\ntotal bytes written = %d, last byte is \\%o\n", size, compressed_buf[size-1]); fprintf(stderr, "\ntotal bytes written = %d, last byte is \\%o\n", size, compressed_buf[size-1]);
} }
int result = write_literal(out, compressed_buf, size); result = write_literal(out, compressed_buf, size);
if (result == 0) { if (result == 0) {
//printf("translation[%lld].off = %lld\n", lbuf->blocknum, off_of_leaf); //printf("translation[%lld].off = %lld\n", lbuf->blocknum, off_of_leaf);
out->translation[lbuf->blocknum].off = off_of_leaf; out->translation[lbuf->blocknum].off = off_of_leaf;
out->translation[lbuf->blocknum].size = size; out->translation[lbuf->blocknum].size = size;
seek_align_locked(out); seek_align_locked(out);
}
dbout_unlock(out);
} }
dbout_unlock(out);
toku_free(sub_block); // RFP cilk++ bug toku_free(sub_block); // RFP cilk++ bug
...@@ -2653,7 +2686,10 @@ static int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, s ...@@ -2653,7 +2686,10 @@ static int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, s
} }
FIDX next_pivots_file; FIDX next_pivots_file;
brtloader_open_temp_file (bl, &next_pivots_file); {
int r = brtloader_open_temp_file (bl, &next_pivots_file);
if (r != 0) { result = r; break; }
}
struct subtrees_info next_sts; struct subtrees_info next_sts;
subtrees_info_init(&next_sts); subtrees_info_init(&next_sts);
......
...@@ -88,20 +88,24 @@ static void reset_my_malloc_counts(void) { ...@@ -88,20 +88,24 @@ static void reset_my_malloc_counts(void) {
static void *my_malloc(size_t n) { static void *my_malloc(size_t n) {
void *caller = __builtin_return_address(0); void *caller = __builtin_return_address(0);
if ((void*)toku_malloc <= caller && caller <= (void*)toku_xcalloc) { if (!((void*)toku_malloc <= caller && caller <= (void*)toku_free))
my_malloc_count++; goto skip;
if (n >= 64*1024) { my_malloc_count++;
my_big_malloc_count++; if (n >= 64*1024) {
if (my_malloc_event) { my_big_malloc_count++;
event_count++; if (my_malloc_event) {
if (event_count == event_count_trigger) { caller = __builtin_return_address(1);
event_hit(); if ((void*)toku_xmalloc <= caller && caller <= (void*)toku_malloc_report)
errno = ENOMEM; goto skip;
return NULL; event_count++;
} if (event_count == event_count_trigger) {
event_hit();
errno = ENOMEM;
return NULL;
} }
} }
} }
skip:
return malloc(n); return malloc(n);
} }
...@@ -206,6 +210,7 @@ static void write_dbfile (char *template, int n, char *output_name, BOOL expect_ ...@@ -206,6 +210,7 @@ static void write_dbfile (char *template, int n, char *output_name, BOOL expect_
brt_loader_set_poll_function(&bl.poll_callback, loader_poll_callback, NULL); brt_loader_set_poll_function(&bl.poll_callback, loader_poll_callback, NULL);
r = toku_loader_write_brt_from_q_in_C(&bl, &desc, fd, 1000, q2); r = toku_loader_write_brt_from_q_in_C(&bl, &desc, fd, 1000, q2);
// if (!(expect_error ? r != 0 : r == 0)) printf("WARNING%%d expect_error=%d r=%d\n", __LINE__, expect_error, r);
assert(expect_error ? r != 0 : r == 0); assert(expect_error ? r != 0 : r == 0);
toku_set_func_malloc(NULL); toku_set_func_malloc(NULL);
...@@ -217,14 +222,20 @@ static void write_dbfile (char *template, int n, char *output_name, BOOL expect_ ...@@ -217,14 +222,20 @@ static void write_dbfile (char *template, int n, char *output_name, BOOL expect_
brt_loader_destroy_poll_callback(&bl.poll_callback); brt_loader_destroy_poll_callback(&bl.poll_callback);
r = queue_destroy(q2); r = queue_destroy(q2);
assert(r==0); if (r != 0) printf("WARNING%d r=%d\n", __LINE__, r);
//assert(r==0);
destroy_merge_fileset(&fs); destroy_merge_fileset(&fs);
brtloader_fi_destroy(&bl.file_infos, expect_error); brtloader_fi_destroy(&bl.file_infos, expect_error);
} }
static int usage(const char *progname, int n) { static int usage(const char *progname, int n) {
fprintf(stderr, "Usage:\n %s [-v] [-q] [-r %d] [-s] directory\n", progname, n); fprintf(stderr, "Usage:\n %s [-v] [-q] [-r %d] [-s] [-m] directory\n", progname, n);
fprintf(stderr, "[-v] turn on verbose\n");
fprintf(stderr, "[-q] turn off verbose\n");
fprintf(stderr, "[-r %d] set the number of rows\n", n);
fprintf(stderr, "[-s] set the small loader size factor\n");
fprintf(stderr, "[-m] inject big malloc failures\n");
return 1; return 1;
} }
......
...@@ -26,15 +26,6 @@ void *toku_malloc(size_t size) { ...@@ -26,15 +26,6 @@ void *toku_malloc(size_t size) {
return p; return p;
} }
void *
toku_xcalloc(size_t nmemb, size_t size)
{
size_t newsize = nmemb * size;
void *vp = toku_xmalloc(newsize);
if (vp) memset(vp, 0, newsize);
return vp;
}
void * void *
toku_calloc(size_t nmemb, size_t size) toku_calloc(size_t nmemb, size_t size)
{ {
...@@ -44,21 +35,6 @@ toku_calloc(size_t nmemb, size_t size) ...@@ -44,21 +35,6 @@ toku_calloc(size_t nmemb, size_t size)
return vp; return vp;
} }
void *
toku_xmalloc(size_t size) {
void *r = toku_malloc(size);
if (r==0) abort();
return r;
}
void *
toku_xrealloc(void *v, size_t size)
{
void *r = toku_realloc(v, size);
if (r==0) abort();
return r;
}
void * void *
toku_tagmalloc(size_t size, enum typ_tag typtag) toku_tagmalloc(size_t size, enum typ_tag typtag)
{ {
...@@ -80,6 +56,20 @@ toku_realloc(void *p, size_t size) ...@@ -80,6 +56,20 @@ toku_realloc(void *p, size_t size)
return q; return q;
} }
void *
toku_memdup (const void *v, size_t len)
{
void *r=toku_malloc(len);
if (r) memcpy(r,v,len);
return r;
}
char *
toku_strdup (const char *s)
{
return toku_memdup(s, strlen(s)+1);
}
void void
toku_free(void *p) toku_free(void *p)
{ {
...@@ -96,31 +86,41 @@ toku_free_n(void* p, size_t size __attribute__((unused))) ...@@ -96,31 +86,41 @@ toku_free_n(void* p, size_t size __attribute__((unused)))
} }
void * void *
toku_xmemdup (const void *v, size_t len) toku_xmalloc(size_t size) {
{ void *r = toku_malloc(size);
void *r=toku_xmalloc(len); if (r==0) abort();
memcpy(r,v,len);
return r; return r;
} }
void * void *
toku_memdup (const void *v, size_t len) toku_xcalloc(size_t nmemb, size_t size)
{ {
void *r=toku_malloc(len); size_t newsize = nmemb * size;
if (r) memcpy(r,v,len); void *vp = toku_xmalloc(newsize);
if (vp) memset(vp, 0, newsize);
return vp;
}
void *
toku_xrealloc(void *v, size_t size)
{
void *r = toku_realloc(v, size);
if (r==0) abort();
return r; return r;
} }
char * void *
toku_xstrdup (const char *s) toku_xmemdup (const void *v, size_t len)
{ {
return toku_xmemdup(s, strlen(s)+1); void *r=toku_xmalloc(len);
memcpy(r,v,len);
return r;
} }
char * char *
toku_strdup (const char *s) toku_xstrdup (const char *s)
{ {
return toku_memdup(s, strlen(s)+1); return toku_xmemdup(s, strlen(s)+1);
} }
void void
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment