Commit ade78868 authored by Yoni Fogel's avatar Yoni Fogel

[t:2312] All paths to write/pwrite go through portability layer.

full_os_(p)write check for partial reads, and fixable errors.


git-svn-id: file:///svn/toku/tokudb@17078 c7de825b-a66e-492c-adef-691d508d4ae1
parent 14674910
......@@ -8,36 +8,35 @@
//Print any necessary errors
//Return whether we should try the write again.
static int
static void
try_again_after_handling_write_error(int fd, size_t len, ssize_t r_write) {
int try_again = 0;
if (r_write==-1) {
int errno_write = errno;
assert(errno_write != 0);
switch (errno_write) {
case EINTR: { //The call was interrupted by a signal before any data was written; see signal(7).
char err_msg[sizeof("Write of [] bytes to fd=[] interrupted. Retrying.") + 20+10]; //64 bit is 20 chars, 32 bit is 10 chars
snprintf(err_msg, sizeof(err_msg), "Write of [%"PRIu64"] bytes to fd=[%d] interrupted. Retrying.", (uint64_t)len, fd);
perror(err_msg);
fflush(stderr);
try_again = 1;
break;
}
case ENOSPC: {
char err_msg[sizeof("Failed write of [] bytes to fd=[].") + 20+10]; //64 bit is 20 chars, 32 bit is 10 chars
snprintf(err_msg, sizeof(err_msg), "Failed write of [%"PRIu64"] bytes to fd=[%d].", (uint64_t)len, fd);
perror(err_msg);
fflush(stderr);
int out_of_disk_space = 1;
assert(!out_of_disk_space); //Give an error message that might be useful if this is the only one that survives.
}
default:
break;
assert(r_write < 0);
int errno_write = errno;
assert(errno_write != 0);
switch (errno_write) {
case EINTR: { //The call was interrupted by a signal before any data was written; see signal(7).
char err_msg[sizeof("Write of [] bytes to fd=[] interrupted. Retrying.") + 20+10]; //64 bit is 20 chars, 32 bit is 10 chars
snprintf(err_msg, sizeof(err_msg), "Write of [%"PRIu64"] bytes to fd=[%d] interrupted. Retrying.", (uint64_t)len, fd);
perror(err_msg);
fflush(stderr);
try_again = 1;
break;
}
errno = errno_write;
case ENOSPC: {
char err_msg[sizeof("Failed write of [] bytes to fd=[].") + 20+10]; //64 bit is 20 chars, 32 bit is 10 chars
snprintf(err_msg, sizeof(err_msg), "Failed write of [%"PRIu64"] bytes to fd=[%d].", (uint64_t)len, fd);
perror(err_msg);
fflush(stderr);
int out_of_disk_space = 1;
assert(!out_of_disk_space); //Give an error message that might be useful if this is the only one that survives.
}
default:
break;
}
return try_again;
assert(try_again);
errno = errno_write;
}
static ssize_t (*t_pwrite)(int, const void *, size_t, off_t) = 0;
......@@ -50,16 +49,23 @@ toku_set_func_pwrite (ssize_t (*pwrite_fun)(int, const void *, size_t, off_t)) {
void
toku_os_full_pwrite (int fd, const void *buf, size_t len, off_t off) {
ssize_t r;
again:
if (t_pwrite) {
r = t_pwrite(fd, buf, len, off);
} else {
r = pwrite(fd, buf, len, off);
while (len > 0) {
ssize_t r;
if (t_pwrite) {
r = t_pwrite(fd, buf, len, off);
} else {
r = pwrite(fd, buf, len, off);
}
if (r > 0) {
len -= r;
buf += r;
off += r;
}
else {
try_again_after_handling_write_error(fd, len, r);
}
}
if (try_again_after_handling_write_error(fd, len, r))
goto again;
assert(r==(ssize_t)len);
assert(len == 0);
}
static ssize_t (*t_write)(int, const void *, size_t) = 0;
......@@ -72,16 +78,22 @@ toku_set_func_write (ssize_t (*write_fun)(int, const void *, size_t)) {
void
toku_os_full_write (int fd, const void *buf, size_t len) {
ssize_t r;
again:
if (t_write) {
r = t_write(fd, buf, len);
} else {
r = write(fd, buf, len);
while (len > 0) {
ssize_t r;
if (t_write) {
r = t_write(fd, buf, len);
} else {
r = write(fd, buf, len);
}
if (r > 0) {
len -= r;
buf += r;
}
else {
try_again_after_handling_write_error(fd, len, r);
}
}
if (try_again_after_handling_write_error(fd, len, r))
goto again;
assert(r==(ssize_t)len);
assert(len == 0);
}
static uint64_t get_tnow(void) {
......
......@@ -151,15 +151,8 @@ int toku_logger_shutdown(TOKULOGGER logger) {
// On error: Return negative with errno set.
// On success return nbytes.
static int write_it (int fd, const void *bufv, int nbytes) {
int org_nbytes=nbytes;
const char *buf=bufv;
while (nbytes>0) {
int r = write(fd, buf, nbytes);
if (r<0 || errno!=EAGAIN) return r;
buf+=r;
nbytes-=r;
}
return org_nbytes;
toku_os_full_write(fd, bufv, nbytes);
return nbytes;
}
// close the current file, and open the next one.
......
......@@ -206,15 +206,8 @@ int toku_rollback_abort(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn) {
// NOTE : duplicated from logger.c - FIX THIS!!!
static int write_it (int fd, const void *bufv, int nbytes) {
int org_nbytes=nbytes;
const char *buf=bufv;
while (nbytes>0) {
int r = write(fd, buf, nbytes);
if (r<0 || errno!=EAGAIN) return r;
buf+=r;
nbytes-=r;
}
return org_nbytes;
toku_os_full_write(fd, bufv, nbytes);
return nbytes;
}
int toku_maybe_spill_rollbacks (TOKUTXN txn) {
......
......@@ -45,10 +45,10 @@ test (int seed) {
Bytef compressed_buf[compressed_size];
{ int r = compress2(compressed_buf, &compressed_size, (Bytef*)(buf[i]), sizes[i], 1); assert(r==Z_OK); }
u_int32_t compressed_size_n = toku_htod32(compressed_size);
{ int r = write(fd, &compressed_size_n, 4); assert(r==4); }
{ int r = write(fd, compressed_buf, compressed_size); assert(r==(int)compressed_size); }
{ int r = write(fd, &sizesn[i], 4); assert(r==4); } // the uncompressed size
{ int r = write(fd, &compressed_size_n, 4); assert(r==4); }
{ toku_os_full_write(fd, &compressed_size_n, 4); }
{ toku_os_full_write(fd, compressed_buf, compressed_size); }
{ toku_os_full_write(fd, &sizesn[i], 4); } // the uncompressed size
{ toku_os_full_write(fd, &compressed_size_n, 4); }
}
{ int r=close(fd); assert(r==0); }
}
......
......@@ -18,8 +18,7 @@ static void f_flush (CACHEFILE f,
BOOL for_checkpoint __attribute__((__unused__))) {
assert(size==BLOCKSIZE);
if (write_me) {
int r = pwrite(toku_cachefile_fd(f), value, BLOCKSIZE, key.b);
assert(r==BLOCKSIZE);
toku_os_full_pwrite(toku_cachefile_fd(f), value, BLOCKSIZE, key.b);
}
if (!keep_me) {
toku_free(value);
......
......@@ -52,22 +52,18 @@ assert(sizeof(buf) == N_BIGINTS * BIGINT_SIZE);
}
{
u_int32_t v = toku_htod32(compressed_len);
ssize_t r = write(fd, &v, sizeof(v));
assert(r==sizeof(v));
toku_os_full_write(fd, &v, sizeof(v));
}
{
ssize_t r = write(fd, compressed_buf, compressed_len);
assert(r==(ssize_t)compressed_len);
toku_os_full_write(fd, compressed_buf, compressed_len);
}
{
u_int32_t v = toku_htod32(sizeof(buf));
ssize_t r = write(fd, &v, sizeof(v));
assert(r==sizeof(v));
toku_os_full_write(fd, &v, sizeof(v));
}
{
u_int32_t v = toku_htod32(compressed_len);
ssize_t r = write(fd, &v, sizeof(v));
assert(r==sizeof(v));
toku_os_full_write(fd, &v, sizeof(v));
}
}
{ int r = close(fd); assert(r==0); }
......
......@@ -27,8 +27,7 @@ test_main (int argc __attribute__((__unused__)), const char *argv[] __attribute_
memset(buf, 0, sizeof(buf));
u_int64_t i;
for (i=0; i<(1LL<<32); i+=BUFSIZE) {
int r = write(fd, buf, BUFSIZE);
assert(r==BUFSIZE);
toku_os_full_write(fd, buf, BUFSIZE);
}
}
int64_t file_size;
......
......@@ -52,6 +52,9 @@
toku_fstat;
toku_dup2;
toku_os_full_write;
toku_os_full_pwrite;
local: *;
};
......@@ -446,6 +446,7 @@ libs:
diskfull.tdb$(BINSUF): CPPFLAGS+=-DDONT_DEPRECATE_WRITES
test_db_curs4.tdb$(BINSUF): trace.h
test_db_curs4.bdb$(BINSUF): trace.h
# a bunch of little tests designed to run in parallel
......
......@@ -14,8 +14,8 @@ static void mkfile (const char *fname) {
int fd = open(fname, O_WRONLY | O_CREAT | O_BINARY, mode);
if (fd<0) perror("opening");
assert(fd>=0);
int r = write(fd, "hello\n", 6); assert(r==6);
r = close(fd); assert(r==0);
toku_os_full_write(fd, "hello\n", 6);
int r = close(fd); assert(r==0);
}
static void
......
......@@ -91,6 +91,9 @@ typedef int64_t toku_off_t;
# pragma poison pthread_condattr_t pthread_cond_t
# pragma poison pthread_rwlockattr_t pthread_rwlock_t
# pragma poison timespec
# ifndef DONT_DEPRECATE_WRITES
# pragma poison write pwrite
# endif
# ifndef DONT_DEPRECATE_MALLOC
# pragma deprecated (malloc, free, realloc)
# endif
......@@ -114,6 +117,10 @@ int _dup2(int fd, int fd2) __attribute__((__deprecated_
char* strdup(const char *) __attribute__((__deprecated__));
#undef __strdup
char* __strdup(const char *) __attribute__((__deprecated__));
# ifndef DONT_DEPRECATE_WRITES
ssize_t write(int, const void *, size_t) __attribute__((__deprecated__));
ssize_t pwrite(int, const void *, size_t, off_t) __attribute__((__deprecated__));
#endif
# ifndef DONT_DEPRECATE_MALLOC
void *malloc(size_t) __attribute__((__deprecated__));
void free(void*) __attribute__((__deprecated__));
......@@ -126,8 +133,8 @@ void *os_malloc(size_t);
void *os_realloc(void*,size_t);
void os_free(void*);
// full_pwrite and full_write performs a pwrite, and checks errors. It doesn't return unless all the data was written. */
void toku_os_full_pwrite (int fd, const void *buf, size_t len, toku_off_t off);
void toku_os_full_write (int fd, const void *buf, size_t len);
void toku_os_full_pwrite (int fd, const void *buf, size_t len, toku_off_t off) __attribute__((__visibility__("default")));
void toku_os_full_write (int fd, const void *buf, size_t len) __attribute__((__visibility__("default")));
// wrapper around fsync
int toku_file_fsync(int fd);
......
......@@ -26,7 +26,8 @@ $(TARGET): $(OBJS)
check: $(TARGET)
cd tests && $(MAKE) check
$(OBJS): CFLAGS += -DTOKU_WINDOWS_ALLOW_DEPRECATED
$(OBJS): CPPFLAGS += -DTOKU_WINDOWS_ALLOW_DEPRECATED
file.obj: CPPFLAGS += -DDONT_DEPRECATE_WRITES
clean:
rm -rf $(TARGET) $(LIBPORTABILITY) $(PTHREAD_LIB)
......
......@@ -94,9 +94,62 @@ int toku_set_func_write (ssize_t (*write_fun)(int, const void *, size_t)) {
return 0;
}
//Print any necessary errors
//Return whether we should try the write again.
static void
try_again_after_handling_write_error(int fd, size_t len, ssize_t r_write) {
int try_again = 0;
assert(r_write < 0);
int errno_write = errno;
assert(errno_write != 0);
switch (errno_write) {
case EINTR: { //The call was interrupted by a signal before any data was written; see signal(7).
char err_msg[sizeof("Write of [] bytes to fd=[] interrupted. Retrying.") + 20+10]; //64 bit is 20 chars, 32 bit is 10 chars
snprintf(err_msg, sizeof(err_msg), "Write of [%"PRIu64"] bytes to fd=[%d] interrupted. Retrying.", (uint64_t)len, fd);
perror(err_msg);
fflush(stderr);
try_again = 1;
break;
}
case ENOSPC: {
char err_msg[sizeof("Failed write of [] bytes to fd=[].") + 20+10]; //64 bit is 20 chars, 32 bit is 10 chars
snprintf(err_msg, sizeof(err_msg), "Failed write of [%"PRIu64"] bytes to fd=[%d].", (uint64_t)len, fd);
perror(err_msg);
fflush(stderr);
int out_of_disk_space = 1;
assert(!out_of_disk_space); //Give an error message that might be useful if this is the only one that survives.
}
default:
break;
}
assert(try_again);
errno = errno_write;
}
void
toku_os_full_pwrite (int fd, const void *buf, size_t len, toku_off_t off)
toku_os_full_pwrite (int fd, const void *org_buf, size_t len, toku_off_t off)
{
const uint8_t *buf = org_buf;
while (len > 0) {
ssize_t r;
if (t_pwrite) {
r = t_pwrite(fd, buf, len, off);
} else {
r = pwrite(fd, buf, len, off);
}
if (r > 0) {
len -= r;
buf += r;
off += r;
}
else {
try_again_after_handling_write_error(fd, len, r);
}
}
assert(len == 0);
}
/*
{
ssize_t r;
if (t_pwrite) {
......@@ -114,6 +167,27 @@ toku_os_full_pwrite (int fd, const void *buf, size_t len, toku_off_t off)
}
assert(r==len);
}
*/
void
toku_os_full_write (int fd, const void *org_buf, size_t len) {
const uint8_t *buf = org_buf;
while (len > 0) {
ssize_t r;
if (t_write) {
r = t_write(fd, buf, len);
} else {
r = write(fd, buf, len);
}
if (r > 0) {
len -= r;
buf += r;
}
else {
try_again_after_handling_write_error(fd, len, r);
}
}
assert(len == 0);
}
// t_fsync exists for testing purposes only
static int (*t_fsync)(int) = 0;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment