Commit d0ccff50 authored by monty@bitch.mysql.fi's avatar monty@bitch.mysql.fi

New improved IO_CACHE

parent d3288575
#! /bin/sh
gmake -k clean || true
/bin/rm -f */.deps/*.P config.cache
aclocal && autoheader && aclocal && automake && autoconf
(cd bdb/dist && sh s_all)
(cd innobase && aclocal && autoheader && aclocal && automake && autoconf)
if [ -d gemini ]
then
(cd gemini && aclocal && autoheader && aclocal && automake && autoconf)
fi
CFLAGS="-g -Wimplicit -Wreturn-type -Wid-clash-51 -Wswitch -Wtrigraphs -Wcomment -W -Wchar-subscripts -Wformat -Wimplicit-function-dec -Wimplicit-int -Wparentheses -Wsign-compare -Wwrite-strings -Wunused -O3 -fno-omit-frame-pointer -mcpu=v8 -Wa,-xarch=v8plusa" CXX=gcc CXXFLAGS="-Wimplicit -Wreturn-type -Wid-clash-51 -Wswitch -Wtrigraphs -Wcomment -W -Wchar-subscripts -Wformat -Wimplicit-function-dec -Wimplicit-int -Wparentheses -Wsign-compare -Wwrite-strings -Woverloaded-virtual -Wextern-inline -Wsign-promo -Wreorder -Wctor-dtor-privacy -Wnon-virtual-dtor -felide-constructors -fno-exceptions -fno-rtti -O3 -fno-omit-frame-pointer -mcpu=v8 -Wa,-xarch=v8plusa -g" ./configure --prefix=/usr/local/mysql --enable-assembler --with-extra-charsets=complex --enable-thread-safe-client --with-debug
gmake -j 4
......@@ -324,11 +324,6 @@ typedef unsigned short ushort;
#endif
#include <dbug.h>
#ifndef DBUG_OFF
#define dbug_assert(A) assert(A)
#else
#define dbug_assert(A)
#endif
#define MIN_ARRAY_SIZE 0 /* Zero or One. Gcc allows zero*/
#define ASCII_BITS_USED 8 /* Bit char used */
......
......@@ -293,33 +293,21 @@ typedef struct st_dynamic_string {
struct st_io_cache;
typedef int (*IO_CACHE_CALLBACK)(struct st_io_cache*);
#ifdef THREAD
#define lock_append_buffer(info) \
pthread_mutex_lock(&(info)->append_buffer_lock)
#define unlock_append_buffer(info) \
pthread_mutex_unlock(&(info)->append_buffer_lock)
#else
#define lock_append_buffer(info)
#define unlock_append_buffer(info)
#endif
typedef struct st_io_cache /* Used when cacheing files */
{
my_off_t pos_in_file,end_of_file;
byte *rc_pos,*rc_end,*buffer,*rc_request_pos;
my_bool alloced_buffer; /* currented READ_NET is the only one
that will use a buffer allocated somewhere
else
*/
byte *append_buffer, *append_read_pos, *write_pos, *append_end,
*write_end;
/* for append buffer used in READ_APPEND cache */
byte *read_pos,*read_end,*buffer,*request_pos;
byte *write_buffer, *append_read_pos, *write_pos, *write_end;
byte **current_pos, **current_end;
/* The lock is for append buffer used in READ_APPEND cache */
#ifdef THREAD
pthread_mutex_t append_buffer_lock;
/* need mutex copying from append buffer to read buffer */
#endif
int (*read_function)(struct st_io_cache *,byte *,uint);
int (*write_function)(struct st_io_cache *,const byte *,uint);
enum cache_type type;
/* callbacks when the actual read I/O happens */
IO_CACHE_CALLBACK pre_read;
IO_CACHE_CALLBACK post_read;
......@@ -331,7 +319,11 @@ typedef struct st_io_cache /* Used when cacheing files */
int seek_not_done,error;
uint buffer_length,read_length;
myf myflags; /* Flags used to my_read/my_write */
enum cache_type type;
/*
Currently READ_NET is the only one that will use a buffer allocated
somewhere else
*/
my_bool alloced_buffer;
#ifdef HAVE_AIOWAIT
uint inited;
my_off_t aio_read_pos;
......@@ -349,9 +341,9 @@ typedef int (*qsort2_cmp)(const void *, const void *, const void *);
#define my_b_EOF INT_MIN
#define my_b_read(info,Buffer,Count) \
((info)->rc_pos + (Count) <= (info)->rc_end ?\
(memcpy(Buffer,(info)->rc_pos,(size_t) (Count)), \
((info)->rc_pos+=(Count)),0) :\
((info)->read_pos + (Count) <= (info)->read_end ?\
(memcpy(Buffer,(info)->read_pos,(size_t) (Count)), \
((info)->read_pos+=(Count)),0) :\
(*(info)->read_function)((info),Buffer,Count))
#define my_b_write(info,Buffer,Count) \
......@@ -362,11 +354,10 @@ typedef int (*qsort2_cmp)(const void *, const void *, const void *);
#define my_b_get(info) \
((info)->rc_pos != (info)->rc_end ?\
((info)->rc_pos++, (int) (uchar) (info)->rc_pos[-1]) :\
((info)->read_pos != (info)->read_end ?\
((info)->read_pos++, (int) (uchar) (info)->read_pos[-1]) :\
_my_b_get(info))
/* my_b_write_byte dosn't have any err-check */
#define my_b_write_byte(info,chr) \
(((info)->write_pos < (info)->write_end) ?\
......@@ -374,18 +365,14 @@ typedef int (*qsort2_cmp)(const void *, const void *, const void *);
(_my_b_write(info,0,0) , ((*(info)->write_pos++)=(chr))))
#define my_b_fill_cache(info) \
(((info)->rc_end=(info)->rc_pos),(*(info)->read_function)(info,0,0))
#define my_write_cache(info) (((info)->type == WRITE_CACHE))
#define my_cache_pointer(info) (my_write_cache(info) ? \
((info)->write_pos) : ((info)->rc_pos))
(((info)->read_end=(info)->read_pos),(*(info)->read_function)(info,0,0))
#define my_b_tell(info) ((info)->pos_in_file + \
my_cache_pointer(info) - (info)->rc_request_pos)
(uint) (*(info)->current_pos - (info)->request_pos))
#define my_b_bytes_in_cache(info) (uint) (*(info)->current_end - \
*(info)->current_pos)
#define my_b_bytes_in_cache(info) (my_write_cache(info) ? \
((uint) ((info)->write_end - (info)->write_pos)): \
((uint) ((info)->rc_end - (info)->rc_pos)))
typedef struct st_changeable_var {
const char *name; /* Name of variable */
......@@ -584,7 +571,7 @@ extern int _my_b_net_read(IO_CACHE *info,byte *Buffer,uint Count);
extern int _my_b_get(IO_CACHE *info);
extern int _my_b_async_read(IO_CACHE *info,byte *Buffer,uint Count);
extern int _my_b_write(IO_CACHE *info,const byte *Buffer,uint Count);
extern int _my_b_append(IO_CACHE *info,const byte *Buffer,uint Count);
extern int my_b_append(IO_CACHE *info,const byte *Buffer,uint Count);
extern int my_block_write(IO_CACHE *info, const byte *Buffer,
uint Count, my_off_t pos);
extern int flush_io_cache(IO_CACHE *info);
......
......@@ -46,11 +46,11 @@ int _nisam_read_cache(IO_CACHE *info, byte *buff, ulong pos, uint length,
buff+=read_length;
}
if ((offset=pos - (ulong) info->pos_in_file) <
(ulong) (info->rc_end - info->rc_request_pos))
(ulong) (info->read_end - info->request_pos))
{
in_buff_pos=info->rc_request_pos+(uint) offset;
in_buff_length= min(length,(uint) (info->rc_end-in_buff_pos));
memcpy(buff,info->rc_request_pos+(uint) offset,(size_t) in_buff_length);
in_buff_pos=info->request_pos+(uint) offset;
in_buff_length= min(length,(uint) (info->read_end-in_buff_pos));
memcpy(buff,info->request_pos+(uint) offset,(size_t) in_buff_length);
if (!(length-=in_buff_length))
return 0;
pos+=in_buff_length;
......@@ -61,14 +61,14 @@ int _nisam_read_cache(IO_CACHE *info, byte *buff, ulong pos, uint length,
if (flag & READING_NEXT)
{
if (pos != ((info)->pos_in_file +
(uint) ((info)->rc_end - (info)->rc_request_pos)))
(uint) ((info)->read_end - (info)->request_pos)))
{
info->pos_in_file=pos; /* Force start here */
info->rc_pos=info->rc_end=info->rc_request_pos; /* Everything used */
info->read_pos=info->read_end=info->request_pos; /* Everything used */
info->seek_not_done=1;
}
else
info->rc_pos=info->rc_end; /* All block used */
info->read_pos=info->read_end; /* All block used */
if (!(*info->read_function)(info,buff,length))
return 0;
if (!(flag & READING_HEADER) || info->error == -1 ||
......
......@@ -187,7 +187,7 @@ int _nisam_read_rnd_static_record(N_INFO *info, byte *buf,
(skipp_deleted_blocks || !filepos))
{
cache_read=1; /* Read record using cache */
cache_length=(uint) (info->rec_cache.rc_end - info->rec_cache.rc_pos);
cache_length=(uint) (info->rec_cache.read_end - info->rec_cache.read_pos);
}
else
info->rec_cache.seek_not_done=1; /* Filepos is changed */
......
......@@ -172,7 +172,7 @@ void start_test(int id)
}
if (key_cacheing && rnd(2) == 0)
init_key_cache(65536L,(uint) IO_SIZE*4*10);
printf("Process %d, pid: %d\n",id,getpid()); fflush(stdout);
printf("Process %d, pid: %d\n",id,(int) getpid()); fflush(stdout);
for (error=i=0 ; i < tests && !error; i++)
{
......@@ -356,7 +356,7 @@ int test_write(N_INFO *file,int id,int lock_type)
nisam_extra(file,HA_EXTRA_WRITE_CACHE);
}
sprintf(record.id,"%7d",getpid());
sprintf(record.id,"%7d",(int) getpid());
strmov(record.text,"Testing...");
tries=(uint) rnd(100)+10;
......
......@@ -45,11 +45,11 @@ int _mi_read_cache(IO_CACHE *info, byte *buff, my_off_t pos, uint length,
buff+=read_length;
}
if ((offset= (my_off_t) (pos - info->pos_in_file)) <
(my_off_t) (info->rc_end - info->rc_request_pos))
(my_off_t) (info->read_end - info->request_pos))
{
in_buff_pos=info->rc_request_pos+(uint) offset;
in_buff_length= min(length,(uint) (info->rc_end-in_buff_pos));
memcpy(buff,info->rc_request_pos+(uint) offset,(size_t) in_buff_length);
in_buff_pos=info->request_pos+(uint) offset;
in_buff_length= min(length,(uint) (info->read_end-in_buff_pos));
memcpy(buff,info->request_pos+(uint) offset,(size_t) in_buff_length);
if (!(length-=in_buff_length))
DBUG_RETURN(0);
pos+=in_buff_length;
......@@ -60,14 +60,14 @@ int _mi_read_cache(IO_CACHE *info, byte *buff, my_off_t pos, uint length,
if (flag & READING_NEXT)
{
if (pos != ((info)->pos_in_file +
(uint) ((info)->rc_end - (info)->rc_request_pos)))
(uint) ((info)->read_end - (info)->request_pos)))
{
info->pos_in_file=pos; /* Force start here */
info->rc_pos=info->rc_end=info->rc_request_pos; /* Everything used */
info->read_pos=info->read_end=info->request_pos; /* Everything used */
info->seek_not_done=1;
}
else
info->rc_pos=info->rc_end; /* All block used */
info->read_pos=info->read_end; /* All block used */
if (!(*info->read_function)(info,buff,length))
DBUG_RETURN(0);
if (!(flag & READING_HEADER) || info->error == -1 ||
......
......@@ -221,7 +221,7 @@ int _mi_read_rnd_static_record(MI_INFO *info, byte *buf,
(skipp_deleted_blocks || !filepos))
{
cache_read=1; /* Read record using cache */
cache_length=(uint) (info->rec_cache.rc_end - info->rec_cache.rc_pos);
cache_length=(uint) (info->rec_cache.read_end - info->rec_cache.read_pos);
}
else
info->rec_cache.seek_not_done=1; /* Filepos is changed */
......
......@@ -19,7 +19,7 @@ TZ=GMT-3; export TZ # for UNIX_TIMESTAMP tests to work
# Program Definitions
#--
PATH=/bin:/usr/bin:/usr/local/bin:/usr/bsd:/usr/X11R6/bin
PATH=/bin:/usr/bin:/usr/local/bin:/usr/bsd:/usr/X11R6/bin:/usr/openwin/bin
# Standard functions
......@@ -49,6 +49,7 @@ BASENAME=`which basename | head -1`
DIFF=`which diff | head -1`
CAT=cat
CUT=cut
HEAD=head
TAIL=tail
ECHO=echo # use internal echo if possible
EXPR=expr # use internal if possible
......@@ -118,7 +119,7 @@ MYSQLD_SRC_DIRS="strings mysys include extra regex isam merge myisam \
#
# Set LD_LIBRARY_PATH if we are using shared libraries
#
LD_LIBRARY_PATH="$BASEDIR/lib:$LD_LIBRARY_PATH"
LD_LIBRARY_PATH="$BASEDIR/lib:$BASEDIR/libmysql/.libs:$LD_LIBRARY_PATH"
export LD_LIBRARY_PATH
MASTER_RUNNING=0
......@@ -225,6 +226,8 @@ while test $# -gt 0; do
$ECHO "Note: you will get more meaningful output on a source distribution compiled with debugging option when running tests with --gdb option"
fi
DO_GDB=1
# We must use manager, as things doesn't work on Linux without it
USE_MANAGER=1
USE_RUNNING_SERVER=""
;;
--client-gdb )
......@@ -310,6 +313,8 @@ if [ x$SOURCE_DIST = x1 ] ; then
MYSQLD="$BASEDIR/sql/mysqld"
if [ -f "$BASEDIR/client/.libs/lt-mysqltest" ] ; then
MYSQL_TEST="$BASEDIR/client/.libs/lt-mysqltest"
elif [ -f "$BASEDIR/client/.libs/mysqltest" ] ; then
MYSQL_TEST="$BASEDIR/client/.libs/mysqltest"
else
MYSQL_TEST="$BASEDIR/client/mysqltest"
fi
......@@ -428,7 +433,7 @@ do_gdb_test ()
$ECHO "set args $mysql_test_args < $2" > $GDB_CLIENT_INIT
echo "Set breakpoints ( if needed) and type 'run' in gdb window"
#this xterm should not be backgrounded
xterm -title "Client" -e gdb -x $GDB_CLIENT_INIT $MYSQL_TEST_BIN
$XTERM -title "Client" -e gdb -x $GDB_CLIENT_INIT $MYSQL_TEST_BIN
}
error () {
......@@ -437,7 +442,7 @@ error () {
}
error_is () {
$TR "\n" " " < $TIMEFILE | $SED -e 's/.* At line \(.*\)\: \(.*\)Command .*$/ \>\> Error at line \1: \2<\</'
$CAT < $TIMEFILE | $SED -e 's/.* At line \(.*\)\: \(.*\)/ \>\> Error at line \1: \2<\</' | $HEAD -1
}
prefix_to_8() {
......@@ -802,8 +807,8 @@ start_slave()
elif [ x$DO_GDB = x1 ]
then
$ECHO "set args $slave_args" > $GDB_SLAVE_INIT
manager_launch $slave_ident $XTERM -display $DISPLAY -title "Slave" -e gdb -x \
$GDB_SLAVE_INIT $SLAVE_MYSQLD
manager_launch $slave_ident $XTERM -display $DISPLAY -title "Slave" -e \
gdb -x $GDB_SLAVE_INIT $SLAVE_MYSQLD
else
manager_launch $slave_ident $SLAVE_MYSQLD $slave_args
fi
......
drop table if exists t1,t2;
create table t1 (a tinyint not null auto_increment, b blob not null, primary key (a));
check table t1;
Table Op Msg_type Msg_text
test.t1 check status OK
repair table t1;
Table Op Msg_type Msg_text
test.t1 repair status OK
create table t1 (a tinyint not null auto_increment, b blob not null, primary key (a)) type=isam;
delete from t1 where (a & 1);
check table t1;
Table Op Msg_type Msg_text
test.t1 check status OK
repair table t1;
Table Op Msg_type Msg_text
test.t1 repair status OK
check table t1;
Table Op Msg_type Msg_text
test.t1 check status OK
select sum(length(b)) from t1;
sum(length(b))
3274494
drop table t1;
create table t1 (a int not null auto_increment,b int, primary key (a)) type=isam;
insert into t1 values (1,1),(NULL,2),(3,3),(NULL,4);
......
......@@ -6,7 +6,7 @@ drop table if exists t1,t2;
# Test possible problem with rows that are about 65535 bytes long
#
create table t1 (a tinyint not null auto_increment, b blob not null, primary key (a));
create table t1 (a tinyint not null auto_increment, b blob not null, primary key (a)) type=isam;
let $1=100;
disable_query_log;
......@@ -16,12 +16,8 @@ while ($1)
dec $1;
}
enable_query_log;
check table t1;
repair table t1;
delete from t1 where (a & 1);
check table t1;
repair table t1;
check table t1;
select sum(length(b)) from t1;
drop table t1;
#
......
......@@ -23,12 +23,30 @@
Possibly use of asyncronic io.
macros for read and writes for faster io.
Used instead of FILE when reading or writing whole files.
This will make mf_rec_cache obsolete.
This code makes mf_rec_cache obsolete (currently only used by ISAM)
One can change info->pos_in_file to a higher value to skip bytes in file if
also info->rc_pos is set to info->rc_end.
also info->read_pos is set to info->read_end.
If called through open_cached_file(), then the temporary file will
only be created if a write exeeds the file buffer or if one calls
flush_io_cache().
If one uses SEQ_READ_APPEND, then two buffers are allocated, one for
reading and another for writing. Reads are first done from disk and
then done from the write buffer. This is an efficient way to read
from a log file when one is writing to it at the same time.
For this to work, the file has to be opened in append mode!
Note that when one uses SEQ_READ_APPEND, one MUST write using
my_b_append ! This is needed because we need to lock the mutex
every time we access the write buffer.
TODO:
When one SEQ_READ_APPEND and we are reading and writing at the same time,
each time the write buffer gets full and it's written to disk, we will
always do a disk read to read a part of the buffer from disk to the
read buffer.
This should be fixed so that when we do a flush_io_cache() and
we have been reading the write buffer, we should transfer the rest of the
write buffer to the read buffer before we start to reuse it.
*/
#define MAP_TO_USE_RAID
......@@ -41,18 +59,20 @@ static void my_aiowait(my_aio_result *result);
#include <assert.h>
#include <errno.h>
#ifdef MAIN
#include <my_dir.h>
#ifdef THREAD
#define lock_append_buffer(info) \
pthread_mutex_lock(&(info)->append_buffer_lock)
#define unlock_append_buffer(info) \
pthread_mutex_unlock(&(info)->append_buffer_lock)
#else
#define lock_append_buffer(info)
#define unlock_append_buffer(info)
#endif
static void init_read_function(IO_CACHE* info, enum cache_type type);
static void init_write_function(IO_CACHE* info, enum cache_type type);
static void init_read_function(IO_CACHE* info, enum cache_type type)
static void
init_functions(IO_CACHE* info, enum cache_type type)
{
switch (type)
{
#ifndef MYSQL_CLIENT
switch (type) {
case READ_NET:
/* must be initialized by the caller. The problem is that
_my_b_net_read has to be defined in sql directory because of
......@@ -61,24 +81,25 @@ static void init_read_function(IO_CACHE* info, enum cache_type type)
as myisamchk
*/
break;
#endif
case SEQ_READ_APPEND:
info->read_function = _my_b_seq_read;
info->write_function = 0; /* Force a core if used */
break;
default:
info->read_function = _my_b_read;
info->write_function = _my_b_write;
}
}
static void init_write_function(IO_CACHE* info, enum cache_type type)
{
switch (type)
/* Ensure that my_b_tell() and my_b_bytes_in_cache works */
if (type == WRITE_CACHE)
{
case SEQ_READ_APPEND:
info->write_function = _my_b_append;
break;
default:
info->write_function = _my_b_write;
info->current_pos= &info->write_pos;
info->current_end= &info->write_end;
}
else
{
info->current_pos= &info->read_pos;
info->current_end= &info->read_end;
}
}
......@@ -93,62 +114,61 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize,
pbool use_async_io, myf cache_myflags)
{
uint min_cache;
my_off_t end_of_file= ~(my_off_t) 0;
DBUG_ENTER("init_io_cache");
DBUG_PRINT("enter",("type: %d pos: %ld",(int) type, (ulong) seek_offset));
/* There is no file in net_reading */
info->file= file;
info->type=type;
info->pos_in_file= seek_offset;
info->pre_close = info->pre_read = info->post_read = 0;
info->arg = 0;
info->alloced_buffer = 0;
info->buffer=0;
info->seek_not_done= test(file >= 0);
if (!cachesize)
if (! (cachesize= my_default_record_cache_size))
DBUG_RETURN(1); /* No cache requested */
min_cache=use_async_io ? IO_SIZE*4 : IO_SIZE*2;
info->alloced_buffer = 0;
if (type == READ_CACHE || type == SEQ_READ_APPEND)
{ /* Assume file isn't growing */
if (cache_myflags & MY_DONT_CHECK_FILESIZE)
{
cache_myflags &= ~MY_DONT_CHECK_FILESIZE;
}
else
if (!(cache_myflags & MY_DONT_CHECK_FILESIZE))
{
my_off_t file_pos,end_of_file;
if ((file_pos=my_tell(file,MYF(0)) == MY_FILEPOS_ERROR))
DBUG_RETURN(1);
/* Calculate end of file to not allocate to big buffers */
end_of_file=my_seek(file,0L,MY_SEEK_END,MYF(0));
if (end_of_file < seek_offset)
end_of_file=seek_offset;
VOID(my_seek(file,file_pos,MY_SEEK_SET,MYF(0)));
/* Trim cache size if the file is very small.
However, we should not do this with SEQ_READ_APPEND cache
*/
if (type != SEQ_READ_APPEND &&
(my_off_t) cachesize > end_of_file-seek_offset+IO_SIZE*2-1)
/* Trim cache size if the file is very small */
if ((my_off_t) cachesize > end_of_file-seek_offset+IO_SIZE*2-1)
{
cachesize=(uint) (end_of_file-seek_offset)+IO_SIZE*2-1;
use_async_io=0; /* No need to use async */
}
}
}
if ((int) type < (int) READ_NET)
cache_myflags &= ~MY_DONT_CHECK_FILESIZE;
if (type != READ_NET && type != WRITE_NET)
{
uint buffer_block;
/* Retry allocating memory in smaller blocks until we get one */
for (;;)
{
buffer_block = cachesize=(uint) ((ulong) (cachesize + min_cache-1) &
uint buffer_block;
cachesize=(uint) ((ulong) (cachesize + min_cache-1) &
(ulong) ~(min_cache-1));
if (type == SEQ_READ_APPEND)
buffer_block *= 2;
if (cachesize < min_cache)
cachesize = min_cache;
buffer_block = cachesize;
if (type == SEQ_READ_APPEND)
buffer_block *= 2;
if ((info->buffer=
(byte*) my_malloc(buffer_block,
MYF((cache_myflags & ~ MY_WME) |
(cachesize == min_cache ? MY_WME : 0)))) != 0)
{
info->write_buffer=info->buffer;
if (type == SEQ_READ_APPEND)
info->append_buffer = info->buffer + cachesize;
info->write_buffer = info->buffer + cachesize;
info->alloced_buffer=1;
break; /* Enough memory found */
}
......@@ -157,45 +177,30 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize,
cachesize= (uint) ((long) cachesize*3/4); /* Try with less memory */
}
}
else
info->buffer=0;
DBUG_PRINT("info",("init_io_cache: cachesize = %u",cachesize));
info->pos_in_file= seek_offset;
info->read_length=info->buffer_length=cachesize;
info->seek_not_done= test(file >= 0 && type != READ_FIFO &&
type != READ_NET);
info->myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
info->rc_request_pos=info->rc_pos= info->write_pos = info->buffer;
info->write_pos = info->write_end = 0;
info->request_pos= info->read_pos= info->write_pos = info->buffer;
if (type == SEQ_READ_APPEND)
{
info->append_read_pos = info->write_pos = info->append_buffer;
info->write_end = info->append_end =
info->append_buffer + info->buffer_length;
info->append_read_pos = info->write_pos = info->write_buffer;
info->write_end = info->write_buffer + info->buffer_length;
#ifdef THREAD
pthread_mutex_init(&info->append_buffer_lock,MY_MUTEX_INIT_FAST);
#endif
}
if (type == READ_CACHE || type == SEQ_READ_APPEND ||
type == READ_NET || type == READ_FIFO)
{
info->rc_end=info->buffer; /* Nothing in cache */
}
else /* type == WRITE_CACHE */
{
if (type == WRITE_CACHE)
info->write_end=
info->buffer+info->buffer_length- (seek_offset & (IO_SIZE-1));
info->write_pos = info->buffer;
}
/* end_of_file may be changed by user later */
info->end_of_file= ((type == READ_NET || type == READ_FIFO ) ? 0
: ~(my_off_t) 0);
info->type=type;
else
info->read_end=info->buffer; /* Nothing in cache */
/* End_of_file may be changed by user later */
info->end_of_file= end_of_file;
info->error=0;
init_read_function(info,type);
init_write_function(info,type);
init_functions(info,type);
#ifdef HAVE_AIOWAIT
if (use_async_io && ! my_disable_async_io)
{
......@@ -236,8 +241,13 @@ static void my_aiowait(my_aio_result *result)
}
#endif
/* Use this to reset cache to start or other type */
/* Some simple optimizing is done when reinit in current buffer */
/*
Use this to reset cache to re-start reading or to change the type
between READ_CACHE <-> WRITE_CACHE
If we are doing a reinit of a cache where we have the start of the file
in the cache, we are reusing this memory without flushing it to disk.
*/
my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
my_off_t seek_offset,
......@@ -245,33 +255,37 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
pbool clear_cache)
{
DBUG_ENTER("reinit_io_cache");
DBUG_PRINT("enter",("type: %d seek_offset: %lu clear_cache: %d",
type, (ulong) seek_offset, (int) clear_cache));
info->seek_not_done= test(info->file >= 0); /* Seek not done */
/* One can't do reinit with the following types */
DBUG_ASSERT(type != READ_NET && info->type != READ_NET &&
type != WRITE_NET && info->type != WRITE_NET &&
type != SEQ_READ_APPEND && info->type != SEQ_READ_APPEND);
/* If the whole file is in memory, avoid flushing to disk */
if (! clear_cache &&
seek_offset >= info->pos_in_file &&
seek_offset <= info->pos_in_file +
(uint) (info->rc_end - info->rc_request_pos))
{ /* use current buffer */
seek_offset <= my_b_tell(info))
{
/* Reuse current buffer without flushing it to disk */
byte *pos;
if (info->type == WRITE_CACHE && type == READ_CACHE)
{
info->rc_end=info->write_pos;
info->read_end=info->write_pos;
info->end_of_file=my_b_tell(info);
}
else if (type == WRITE_CACHE)
{
if (info->type == READ_CACHE)
{
info->write_end=info->buffer+info->buffer_length;
info->write_pos=info->rc_pos;
}
info->write_end=info->write_buffer+info->buffer_length;
info->end_of_file = ~(my_off_t) 0;
}
pos=info->request_pos+(seek_offset-info->pos_in_file);
if (type == WRITE_CACHE)
info->write_pos=info->rc_request_pos+(seek_offset-info->pos_in_file);
info->write_pos=pos;
else
info->rc_pos=info->rc_request_pos+(seek_offset-info->pos_in_file);
info->read_pos= pos;
#ifdef HAVE_AIOWAIT
my_aiowait(&info->aio_result); /* Wait for outstanding req */
#endif
......@@ -284,43 +298,29 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
*/
if (info->type == WRITE_CACHE && type == READ_CACHE)
info->end_of_file=my_b_tell(info);
/* No need to flush cache if we want to reuse it */
if ((type != WRITE_CACHE || !clear_cache) && flush_io_cache(info))
/* flush cache if we want to reuse it */
if (!clear_cache && flush_io_cache(info))
DBUG_RETURN(1);
if (info->pos_in_file != seek_offset)
{
info->pos_in_file=seek_offset;
/* Better to do always do a seek */
info->seek_not_done=1;
}
info->rc_request_pos=info->rc_pos=info->buffer;
if (type == READ_CACHE || type == READ_NET || type == READ_FIFO)
info->request_pos=info->read_pos=info->write_pos=info->buffer;
if (type == READ_CACHE)
{
info->rc_end=info->buffer; /* Nothing in cache */
info->read_end=info->buffer; /* Nothing in cache */
}
else
{
info->rc_end=info->buffer+info->buffer_length-
(seek_offset & (IO_SIZE-1));
info->end_of_file= ((type == READ_NET || type == READ_FIFO) ? 0 :
~(my_off_t) 0);
}
info->write_end=(info->buffer + info->buffer_length -
(seek_offset & (IO_SIZE-1)));
info->end_of_file= ~(my_off_t) 0;
}
if (info->type == SEQ_READ_APPEND)
{
info->append_read_pos = info->write_pos = info->append_buffer;
}
if (!info->write_pos)
info->write_pos = info->buffer;
if (!info->write_end)
info->write_end = info->buffer+info->buffer_length-
(seek_offset & (IO_SIZE-1));
info->type=type;
info->error=0;
init_read_function(info,type);
init_write_function(info,type);
init_functions(info,type);
#ifdef HAVE_AIOWAIT
if (type != READ_NET)
{
if (use_async_io && ! my_disable_async_io &&
((ulong) info->buffer_length <
(ulong) (info->end_of_file - seek_offset)))
......@@ -328,7 +328,6 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
info->read_length=info->buffer_length/2;
info->read_function=_my_b_async_read;
}
}
info->inited=0;
#endif
DBUG_RETURN(0);
......@@ -336,28 +335,30 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
/*
/*
Read buffered. Returns 1 if can't read requested characters
This function is only called from the my_b_read() macro
when there isn't enough characters in the buffer to
satisfy the request.
Returns 0 we succeeded in reading all data
*/
*/
int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count)
{
uint length,diff_length,left_length;
my_off_t max_length, pos_in_file;
DBUG_ENTER("_my_b_read");
if ((left_length=(uint) (info->rc_end-info->rc_pos)))
if ((left_length=(uint) (info->read_end-info->read_pos)))
{
dbug_assert(Count >= left_length); /* User is not using my_b_read() */
memcpy(Buffer,info->rc_pos, (size_t) (left_length));
DBUG_ASSERT(Count >= left_length); /* User is not using my_b_read() */
memcpy(Buffer,info->read_pos, (size_t) (left_length));
Buffer+=left_length;
Count-=left_length;
}
/* pos_in_file always point on where info->buffer was read */
pos_in_file=info->pos_in_file+(uint) (info->rc_end - info->buffer);
pos_in_file=info->pos_in_file+(uint) (info->read_end - info->buffer);
if (info->seek_not_done)
{ /* File touched, do seek */
VOID(my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)));
......@@ -370,7 +371,7 @@ int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count)
if (info->end_of_file == pos_in_file)
{ /* End of file */
info->error=(int) left_length;
return 1;
DBUG_RETURN(1);
}
length=(Count & (uint) ~(IO_SIZE-1))-diff_length;
if ((read_length=my_read(info->file,Buffer,(uint) length,info->myflags))
......@@ -378,7 +379,7 @@ int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count)
{
info->error= read_length == (uint) -1 ? -1 :
(int) (read_length+left_length);
return 1;
DBUG_RETURN(1);
}
Count-=length;
Buffer+=length;
......@@ -386,16 +387,17 @@ int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count)
left_length+=length;
diff_length=0;
}
max_length=info->read_length-diff_length;
if (info->type != READ_FIFO &&
(info->end_of_file - pos_in_file) < max_length)
max_length > (info->end_of_file - pos_in_file))
max_length = info->end_of_file - pos_in_file;
if (!max_length)
{
if (Count)
{
info->error= left_length; /* We only got this many char */
return 1;
DBUG_RETURN(1);
}
length=0; /* Didn't read any chars */
}
......@@ -406,16 +408,18 @@ int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count)
if (length != (uint) -1)
memcpy(Buffer,info->buffer,(size_t) length);
info->error= length == (uint) -1 ? -1 : (int) (length+left_length);
return 1;
info->read_pos=info->read_end=info->buffer;
DBUG_RETURN(1);
}
info->rc_pos=info->buffer+Count;
info->rc_end=info->buffer+length;
info->read_pos=info->buffer+Count;
info->read_end=info->buffer+length;
info->pos_in_file=pos_in_file;
memcpy(Buffer,info->buffer,(size_t) Count);
return 0;
DBUG_RETURN(0);
}
/* Do sequential read from the SEQ_READ_APPEND cache
/*
Do sequential read from the SEQ_READ_APPEND cache
we do this in three stages:
- first read from info->buffer
- then if there are still data to read, try the file descriptor
......@@ -427,97 +431,127 @@ int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count)
uint length,diff_length,left_length,save_count;
my_off_t max_length, pos_in_file;
save_count=Count;
/* first, read the regular buffer */
if ((left_length=(uint) (info->rc_end-info->rc_pos)))
if ((left_length=(uint) (info->read_end-info->read_pos)))
{
dbug_assert(Count >= left_length); /* User is not using my_b_read() */
memcpy(Buffer,info->rc_pos, (size_t) (left_length));
DBUG_ASSERT(Count > left_length); /* User is not using my_b_read() */
memcpy(Buffer,info->read_pos, (size_t) (left_length));
Buffer+=left_length;
Count-=left_length;
}
lock_append_buffer(info);
/* pos_in_file always point on where info->buffer was read */
if ((pos_in_file=info->pos_in_file+(uint) (info->rc_end - info->buffer)) >=
if ((pos_in_file=info->pos_in_file+(uint) (info->read_end - info->buffer)) >=
info->end_of_file)
{
info->pos_in_file=pos_in_file;
goto read_append_buffer;
if (info->seek_not_done)
{ /* File touched, do seek */
VOID(my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)));
info->seek_not_done=0;
}
/* no need to seek since the read is guaranteed to be sequential */
diff_length=(uint) (pos_in_file & (IO_SIZE-1));
/* now the second stage begins - read from file descriptor */
if (Count >= (uint) (IO_SIZE+(IO_SIZE-diff_length)))
{ /* Fill first intern buffer */
uint read_length;
if (info->end_of_file == pos_in_file)
{ /* End of file */
goto read_append_buffer;
}
length=(Count & (uint) ~(IO_SIZE-1))-diff_length;
if ((read_length=my_read(info->file,Buffer,(uint) length,info->myflags))
!= (uint) length)
if ((read_length=my_read(info->file,Buffer,(uint) length,info->myflags)) ==
(uint)-1)
{
if (read_length != (uint)-1)
{
Count -= read_length;
Buffer += read_length;
info->error= -1;
unlock_append_buffer(info);
return 1;
}
Count-=read_length;
Buffer+=read_length;
pos_in_file+=read_length;
if (read_length != (uint) length)
{
/*
We only got part of data; Read the rest of the data from the
write buffer
*/
goto read_append_buffer;
}
Count-=length;
Buffer+=length;
pos_in_file+=length;
left_length+=length;
diff_length=0;
}
max_length=info->read_length-diff_length;
if ((info->end_of_file - pos_in_file) < max_length)
if (max_length > (info->end_of_file - pos_in_file))
max_length = info->end_of_file - pos_in_file;
if (!max_length)
{
if (Count)
{
goto read_append_buffer;
length=0; /* Didn't read any more chars */
}
length=0; /* Didn't read any chars */
}
else if ((length=my_read(info->file,info->buffer,(uint) max_length,
info->myflags)) < Count ||
length == (uint) -1)
else
{
if (length != (uint) -1)
length=my_read(info->file,info->buffer,(uint) max_length,
info->myflags);
if (length == (uint) -1)
{
info->error= -1;
unlock_append_buffer(info);
return 1;
}
if (length < Count)
{
memcpy(Buffer,info->buffer,(size_t) length);
Count -= length;
Buffer += length;
}
goto read_append_buffer;
}
info->rc_pos=info->buffer+Count;
info->rc_end=info->buffer+length;
}
unlock_append_buffer(info);
info->read_pos=info->buffer+Count;
info->read_end=info->buffer+length;
info->pos_in_file=pos_in_file;
memcpy(Buffer,info->buffer,(size_t) Count);
return 0;
read_append_buffer:
lock_append_buffer(info);
if (!Count) return 0;
{
uint copy_len = (uint)(info->append_read_pos -
info->write_pos);
dbug_assert(info->append_read_pos <= info->write_pos);
if (copy_len > Count)
copy_len = Count;
memcpy(Buffer, info->append_read_pos,
copy_len);
/*
Read data from the current write buffer.
Count should never be == 0 here (The code will work even if count is 0)
*/
{
/* First copy the data to Count */
uint len_in_buff = (uint) (info->write_pos - info->append_read_pos);
uint copy_len;
DBUG_ASSERT(info->append_read_pos <= info->write_pos);
DBUG_ASSERT(pos_in_file == info->end_of_file);
copy_len=min(Count, len_in_buff);
memcpy(Buffer, info->append_read_pos, copy_len);
info->append_read_pos += copy_len;
Count -= copy_len;
if (Count)
info->error = save_count - Count;
/* Fill read buffer with data from write buffer */
memcpy(info->buffer, info->append_read_pos,
(size_t) (len_in_buff - copy_len));
info->read_pos= info->buffer;
info->read_end= info->buffer+(len_in_buff - copy_len);
info->append_read_pos=info->write_pos;
info->pos_in_file+=len_in_buff;
}
unlock_append_buffer(info);
return Count ? 1 : 0;
}
#ifdef HAVE_AIOWAIT
int _my_b_async_read(register IO_CACHE *info, byte *Buffer, uint Count)
......@@ -527,8 +561,8 @@ int _my_b_async_read(register IO_CACHE *info, byte *Buffer, uint Count)
my_off_t next_pos_in_file;
byte *read_buffer;
memcpy(Buffer,info->rc_pos,
(size_t) (left_length=(uint) (info->rc_end-info->rc_pos)));
memcpy(Buffer,info->read_pos,
(size_t) (left_length=(uint) (info->read_end-info->read_pos)));
Buffer+=left_length;
org_Count=Count;
Count-=left_length;
......@@ -555,13 +589,13 @@ int _my_b_async_read(register IO_CACHE *info, byte *Buffer, uint Count)
(int) (read_length+left_length));
return(1);
}
info->pos_in_file+=(uint) (info->rc_end - info->rc_request_pos);
info->pos_in_file+=(uint) (info->read_end - info->request_pos);
if (info->rc_request_pos != info->buffer)
info->rc_request_pos=info->buffer;
if (info->request_pos != info->buffer)
info->request_pos=info->buffer;
else
info->rc_request_pos=info->buffer+info->read_length;
info->rc_pos=info->rc_request_pos;
info->request_pos=info->buffer+info->read_length;
info->read_pos=info->request_pos;
next_pos_in_file=info->aio_read_pos+read_length;
/* Check if pos_in_file is changed
......@@ -578,8 +612,8 @@ int _my_b_async_read(register IO_CACHE *info, byte *Buffer, uint Count)
{
my_off_t offset= (info->pos_in_file - info->aio_read_pos);
info->pos_in_file=info->aio_read_pos; /* Whe are here */
info->rc_pos=info->rc_request_pos+offset;
read_length-=offset; /* Bytes left from rc_pos */
info->read_pos=info->request_pos+offset;
read_length-=offset; /* Bytes left from read_pos */
}
}
#ifndef DBUG_OFF
......@@ -591,16 +625,16 @@ int _my_b_async_read(register IO_CACHE *info, byte *Buffer, uint Count)
#endif
/* Copy found bytes to buffer */
length=min(Count,read_length);
memcpy(Buffer,info->rc_pos,(size_t) length);
memcpy(Buffer,info->read_pos,(size_t) length);
Buffer+=length;
Count-=length;
left_length+=length;
info->rc_end=info->rc_pos+read_length;
info->rc_pos+=length;
info->read_end=info->rc_pos+read_length;
info->read_pos+=length;
}
else
next_pos_in_file=(info->pos_in_file+ (uint)
(info->rc_end - info->rc_request_pos));
(info->read_end - info->request_pos));
/* If reading large blocks, or first read or read with skipp */
if (Count)
......@@ -614,13 +648,13 @@ int _my_b_async_read(register IO_CACHE *info, byte *Buffer, uint Count)
read_length=IO_SIZE*2- (uint) (next_pos_in_file & (IO_SIZE-1));
if (Count < read_length)
{ /* Small block, read to cache */
if ((read_length=my_read(info->file,info->rc_request_pos,
if ((read_length=my_read(info->file,info->request_pos,
read_length, info->myflags)) == (uint) -1)
return info->error= -1;
use_length=min(Count,read_length);
memcpy(Buffer,info->rc_request_pos,(size_t) use_length);
info->rc_pos=info->rc_request_pos+Count;
info->rc_end=info->rc_request_pos+read_length;
memcpy(Buffer,info->request_pos,(size_t) use_length);
info->read_pos=info->request_pos+Count;
info->read_end=info->request_pos+read_length;
info->pos_in_file=next_pos_in_file; /* Start of block in cache */
next_pos_in_file+=read_length;
......@@ -641,7 +675,7 @@ int _my_b_async_read(register IO_CACHE *info, byte *Buffer, uint Count)
info->error= read_length == (uint) -1 ? -1 : read_length+left_length;
return 1;
}
info->rc_pos=info->rc_end=info->rc_request_pos;
info->read_pos=info->read_end=info->request_pos;
info->pos_in_file=(next_pos_in_file+=Count);
}
}
......@@ -652,7 +686,7 @@ int _my_b_async_read(register IO_CACHE *info, byte *Buffer, uint Count)
if (max_length > (my_off_t) info->read_length - diff_length)
max_length= (my_off_t) info->read_length - diff_length;
if (info->rc_request_pos != info->buffer)
if (info->request_pos != info->buffer)
read_buffer=info->buffer;
else
read_buffer=info->buffer+info->read_length;
......@@ -669,13 +703,13 @@ int _my_b_async_read(register IO_CACHE *info, byte *Buffer, uint Count)
my_errno=errno;
DBUG_PRINT("error",("got error: %d, aio_result: %d from aioread, async skipped",
errno, info->aio_result.result.aio_errno));
if (info->rc_request_pos != info->buffer)
if (info->request_pos != info->buffer)
{
bmove(info->buffer,info->rc_request_pos,
(uint) (info->rc_end - info->rc_pos));
info->rc_request_pos=info->buffer;
info->rc_pos-=info->read_length;
info->rc_end-=info->read_length;
bmove(info->buffer,info->request_pos,
(uint) (info->read_end - info->read_pos));
info->request_pos=info->buffer;
info->read_pos-=info->read_length;
info->read_end-=info->read_length;
}
info->read_length=info->buffer_length; /* Use hole buffer */
info->read_function=_my_b_read; /* Use normal IO_READ next */
......@@ -709,16 +743,17 @@ int _my_b_write(register IO_CACHE *info, const byte *Buffer, uint Count)
{
uint rest_length,length;
rest_length=(uint) (info->write_end - info->write_pos);
memcpy(info->write_pos,Buffer,(size_t) rest_length);
Buffer+=rest_length;
Count-=rest_length;
info->write_pos+=rest_length;
if (info->pos_in_file+info->buffer_length > info->end_of_file)
{
my_errno=errno=EFBIG;
return info->error = -1;
}
rest_length=(uint) (info->write_end - info->write_pos);
memcpy(info->write_pos,Buffer,(size_t) rest_length);
Buffer+=rest_length;
Count-=rest_length;
info->write_pos+=rest_length;
if (flush_io_cache(info))
return 1;
if (Count >= IO_SIZE)
......@@ -740,12 +775,21 @@ int _my_b_write(register IO_CACHE *info, const byte *Buffer, uint Count)
return 0;
}
int _my_b_append(register IO_CACHE *info, const byte *Buffer, uint Count)
/*
Append a block to the write buffer.
This is done with the buffer locked to ensure that we don't read from
the write buffer before we are ready with it.
*/
int my_b_append(register IO_CACHE *info, const byte *Buffer, uint Count)
{
uint rest_length,length;
rest_length=(uint) (info->append_end -
info->write_pos);
lock_append_buffer(info);
rest_length=(uint) (info->write_end - info->write_pos);
if (Count <= rest_length)
goto end;
memcpy(info->write_pos,Buffer,(size_t) rest_length);
Buffer+=rest_length;
Count-=rest_length;
......@@ -760,8 +804,11 @@ int _my_b_append(register IO_CACHE *info, const byte *Buffer, uint Count)
Count-=length;
Buffer+=length;
}
end:
memcpy(info->write_pos,Buffer,(size_t) Count);
info->write_pos+=Count;
unlock_append_buffer(info);
return 0;
}
......@@ -791,10 +838,13 @@ int my_block_write(register IO_CACHE *info, const byte *Buffer, uint Count,
Buffer+=length;
pos+= length;
Count-= length;
#ifndef HAVE_PREAD
info->seek_not_done=1;
#endif
}
/* Check if we want to write inside the used part of the buffer.*/
length= (uint) (info->rc_end - info->buffer);
length= (uint) (info->write_end - info->buffer);
if (pos < info->pos_in_file + length)
{
uint offset= (uint) (pos - info->pos_in_file);
......@@ -816,20 +866,16 @@ int my_block_write(register IO_CACHE *info, const byte *Buffer, uint Count,
return error;
}
/* avoid warning about empty if body */
#ifdef THREAD
#define IF_APPEND_CACHE if (append_cache)
#else
#define IF_APPEND_CACHE
#endif
/* Flush write cache */
int flush_io_cache(IO_CACHE *info)
{
uint length;
int append_cache;
my_bool append_cache;
my_off_t pos_in_file;
DBUG_ENTER("flush_io_cache");
append_cache = (info->type == SEQ_READ_APPEND);
if (info->type == WRITE_CACHE || append_cache)
{
......@@ -838,37 +884,38 @@ int flush_io_cache(IO_CACHE *info)
if (real_open_cached_file(info))
DBUG_RETURN((info->error= -1));
}
IF_APPEND_CACHE
lock_append_buffer(info);
if (info->write_pos != info->buffer)
if ((length=(uint) (info->write_pos - info->write_buffer)))
{
pos_in_file=info->pos_in_file;
if (append_cache)
{
length=(uint) (info->write_pos - info->buffer);
pos_in_file=info->end_of_file;
info->seek_not_done=1;
}
if (info->seek_not_done)
{ /* File touched, do seek */
if (my_seek(info->file,info->pos_in_file,MY_SEEK_SET,MYF(0)) ==
if (my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) ==
MY_FILEPOS_ERROR)
{
IF_APPEND_CACHE
unlock_append_buffer(info);
DBUG_RETURN((info->error= -1));
}
if (!append_cache)
info->seek_not_done=0;
}
info->write_pos=info->buffer;
info->write_pos= info->write_buffer;
if (!append_cache)
info->pos_in_file+=length;
info->write_end=(info->buffer+info->buffer_length-
(info->pos_in_file & (IO_SIZE-1)));
if (append_cache)
{
info->append_read_pos = info->buffer;
info->append_end = info->write_end;
}
if (my_write(info->file,info->buffer,length,info->myflags | MY_NABP))
info->write_end= (info->write_buffer+info->buffer_length-
((pos_in_file+length) & (IO_SIZE-1)));
/* Set this to be used if we are using SEQ_READ_APPEND */
info->append_read_pos = info->write_buffer;
if (my_write(info->file,info->write_buffer,length,
info->myflags | MY_NABP))
info->error= -1;
else
info->error= 0;
IF_APPEND_CACHE
unlock_append_buffer(info);
set_if_bigger(info->end_of_file,(pos_in_file+length));
DBUG_RETURN(info->error);
}
}
......@@ -888,7 +935,8 @@ int end_io_cache(IO_CACHE *info)
int error=0;
IO_CACHE_CALLBACK pre_close;
DBUG_ENTER("end_io_cache");
if((pre_close=info->pre_close))
if ((pre_close=info->pre_close))
(*pre_close)(info);
if (info->alloced_buffer)
{
......@@ -896,13 +944,28 @@ int end_io_cache(IO_CACHE *info)
if (info->file != -1) /* File doesn't exist */
error=flush_io_cache(info);
my_free((gptr) info->buffer,MYF(MY_WME));
info->buffer=info->rc_pos=(byte*) 0;
info->alloced_buffer = 0;
info->buffer=info->read_pos=(byte*) 0;
}
if (info->type == SEQ_READ_APPEND)
{
/* Destroy allocated mutex */
info->type=0;
#ifdef THREAD
pthread_mutex_destroy(&info->append_buffer_lock);
#endif
}
DBUG_RETURN(error);
} /* end_io_cache */
/**********************************************************************
Testing of MF_IOCACHE
**********************************************************************/
#ifdef MAIN
#include <my_dir.h>
void die(const char* fmt, ...)
{
va_list va_args;
......@@ -916,7 +979,7 @@ void die(const char* fmt, ...)
int open_file(const char* fname, IO_CACHE* info, int cache_size)
{
int fd;
if ((fd=my_open(fname,O_CREAT|O_APPEND|O_RDWR,MYF(MY_WME))) < 0)
if ((fd=my_open(fname,O_CREAT | O_RDWR,MYF(MY_WME))) < 0)
die("Could not open %s", fname);
if (init_io_cache(info, fd, cache_size, SEQ_READ_APPEND, 0,0,MYF(MY_WME)))
die("failed in init_io_cache()");
......@@ -960,8 +1023,8 @@ int main(int argc, char** argv)
char buf[4];
int block_size = abs(rand() % max_block);
int4store(buf, block_size);
if (my_b_write(&sra_cache,buf,4) ||
my_b_write(&sra_cache, block, block_size))
if (my_b_append(&sra_cache,buf,4) ||
my_b_append(&sra_cache, block, block_size))
die("write failed");
total_bytes += 4+block_size;
}
......@@ -985,6 +1048,3 @@ supposedly written\n");
return 0;
}
#endif
......@@ -26,22 +26,42 @@
#include <m_ctype.h>
/*
** Fix that next read will be made at certain position
** For write cache, make next write happen at a certain position
Fix that next read will be made at certain position
For write cache, make next write happen at a certain position
*/
void my_b_seek(IO_CACHE *info,my_off_t pos)
{
DBUG_ENTER("my_b_seek");
DBUG_PRINT("enter",("pos: %lu", (ulong) pos));
if (info->type == READ_CACHE)
{
info->rc_pos=info->rc_end=info->buffer;
byte* try_pos=info->read_pos + (pos - info->pos_in_file);
if (try_pos >= info->buffer &&
try_pos <= info->read_end)
{
/* The position is in the current buffer; Reuse it */
info->read_pos = try_pos;
DBUG_VOID_RETURN;
}
else
{
/* Force a new read on next my_b_read */
info->read_pos=info->read_end=info->buffer;
}
}
else if (info->type == WRITE_CACHE)
{
byte* try_write_pos;
try_write_pos = info->write_pos + (pos - info->pos_in_file);
if (try_write_pos >= info->buffer && try_write_pos <= info->write_end)
info->write_pos = try_write_pos;
byte* try_pos;
/* If write is in current buffer, reuse it */
try_pos = info->write_pos + (pos - info->pos_in_file);
if (try_pos >= info->write_buffer &&
try_pos <= info->write_end)
{
info->write_pos = try_pos;
DBUG_VOID_RETURN;
}
else
flush_io_cache(info);
}
......@@ -51,14 +71,15 @@ void my_b_seek(IO_CACHE *info,my_off_t pos)
/*
** Fill buffer. Note that this assumes that you have already used
** all characters in the CACHE, independent of the rc_pos value!
** all characters in the CACHE, independent of the read_pos value!
** return: 0 on error or EOF (info->error = -1 on error)
** number of characters
*/
uint my_b_fill(IO_CACHE *info)
{
my_off_t pos_in_file=info->pos_in_file+(uint) (info->rc_end - info->buffer);
my_off_t pos_in_file=(info->pos_in_file+
(uint) (info->read_end - info->buffer));
my_off_t max_length;
uint diff_length,length;
if (info->seek_not_done)
......@@ -86,8 +107,8 @@ uint my_b_fill(IO_CACHE *info)
info->error= -1;
return 0;
}
info->rc_pos=info->buffer;
info->rc_end=info->buffer+length;
info->read_pos=info->buffer;
info->read_end=info->buffer+length;
info->pos_in_file=pos_in_file;
return length;
}
......@@ -113,11 +134,11 @@ uint my_b_gets(IO_CACHE *info, char *to, uint max_length)
char *pos,*end;
if (length > max_length)
length=max_length;
for (pos=info->rc_pos,end=pos+length ; pos < end ;)
for (pos=info->read_pos,end=pos+length ; pos < end ;)
{
if ((*to++ = *pos++) == '\n')
{
info->rc_pos=pos;
info->read_pos=pos;
*to='\0';
return (uint) (to-start);
}
......@@ -125,7 +146,7 @@ uint my_b_gets(IO_CACHE *info, char *to, uint max_length)
if (!(max_length-=length))
{
/* Found enough charcters; Return found string */
info->rc_pos=pos;
info->read_pos=pos;
*to='\0';
return (uint) (to-start);
}
......
......@@ -20,9 +20,11 @@
#include <my_global.h>
#include <my_sys.h>
#include <m_string.h>
#undef EXTRA_DEBUG
#define EXTRA_DEBUG
void init_alloc_root(MEM_ROOT *mem_root, uint block_size, uint pre_alloc_size)
void init_alloc_root(MEM_ROOT *mem_root, uint block_size,
uint pre_alloc_size __attribute__((unused)))
{
mem_root->free=mem_root->used=0;
mem_root->min_malloc=32;
......
......@@ -32,7 +32,7 @@ my_bool bitmap_init(MY_BITMAP *map, uint bitmap_size)
if (!(map->bitmap=(uchar*) my_malloc((bitmap_size+7)/8,
MYF(MY_WME | MY_ZEROFILL))))
return 1;
dbug_assert(bitmap_size != ~(uint) 0);
DBUG_ASSERT(bitmap_size != ~(uint) 0);
#ifdef THREAD
pthread_mutex_init(&map->mutex, MY_MUTEX_INIT_FAST);
#endif
......
......@@ -424,7 +424,7 @@ struct hostent *my_gethostbyname_r(const char *name,
int buflen, int *h_errnop)
{
struct hostent *hp;
dbug_assert((size_t) buflen >= sizeof(*result));
DBUG_ASSERT((size_t) buflen >= sizeof(*result));
if (gethostbyname_r(name,result, buffer, (size_t) buflen, &hp, h_errnop))
return 0;
return hp;
......@@ -436,7 +436,7 @@ struct hostent *my_gethostbyname_r(const char *name,
struct hostent *result, char *buffer,
int buflen, int *h_errnop)
{
dbug_assert(buflen >= sizeof(struct hostent_data));
DBUG_ASSERT(buflen >= sizeof(struct hostent_data));
if (gethostbyname_r(name,result,(struct hostent_data *) buffer) == -1)
{
*h_errnop= errno;
......@@ -452,7 +452,7 @@ struct hostent *my_gethostbyname_r(const char *name,
int buflen, int *h_errnop)
{
struct hostent *hp;
dbug_assert(buflen >= sizeof(struct hostent_data));
DBUG_ASSERT(buflen >= sizeof(struct hostent_data));
hp= gethostbyname_r(name,result,(struct hostent_data *) buffer);
*h_errnop= errno;
return hp;
......
......@@ -26,7 +26,8 @@ my_off_t my_seek(File fd, my_off_t pos, int whence,
reg1 os_off_t newpos;
DBUG_ENTER("my_seek");
DBUG_PRINT("my",("Fd: %d Hpos: %lu Pos: %lu Whence: %d MyFlags: %d",
fd, ((ulonglong) pos) >> 32, (ulong) pos, whence, MyFlags));
fd, (ulong) (((ulonglong) pos) >> 32), (ulong) pos,
whence, MyFlags));
newpos=lseek(fd, pos, whence);
if (newpos == (os_off_t) -1)
{
......@@ -34,6 +35,10 @@ my_off_t my_seek(File fd, my_off_t pos, int whence,
DBUG_PRINT("error",("lseek: %lu, errno: %d",newpos,errno));
DBUG_RETURN(MY_FILEPOS_ERROR);
}
if (newpos != pos)
{
DBUG_PRINT("exit",("pos: %lu", (ulong) newpos));
}
DBUG_RETURN((my_off_t) newpos);
} /* my_seek */
......@@ -53,6 +58,6 @@ my_off_t my_tell(File fd, myf MyFlags __attribute__((unused)))
#endif
if (pos == (os_off_t) -1)
my_errno=errno;
DBUG_PRINT("exit",("pos: %lu",pos));
DBUG_PRINT("exit",("pos: %lu", (ulong) pos));
DBUG_RETURN((my_off_t) pos);
} /* my_tell */
......@@ -1045,7 +1045,7 @@ int ha_berkeley::restore_keys(DB_TXN *trans, key_map changed_keys,
}
err:
dbug_assert(error != DB_KEYEXIST);
DBUG_ASSERT(error != DB_KEYEXIST);
DBUG_RETURN(error);
}
......@@ -1187,7 +1187,7 @@ int ha_berkeley::remove_key(DB_TXN *trans, uint keynr, const byte *record,
((table->key_info[keynr].flags & (HA_NOSAME | HA_NULL_PART_KEY)) ==
HA_NOSAME))
{ // Unique key
dbug_assert(keynr == primary_key || prim_key->data != key_buff2);
DBUG_ASSERT(keynr == primary_key || prim_key->data != key_buff2);
error=key_file[keynr]->del(key_file[keynr], trans,
keynr == primary_key ?
prim_key :
......@@ -1201,7 +1201,7 @@ int ha_berkeley::remove_key(DB_TXN *trans, uint keynr, const byte *record,
row to find the key to be delete and delete it.
We will never come here with keynr = primary_key
*/
dbug_assert(keynr != primary_key && prim_key->data != key_buff2);
DBUG_ASSERT(keynr != primary_key && prim_key->data != key_buff2);
DBC *tmp_cursor;
if (!(error=key_file[keynr]->cursor(key_file[keynr], trans,
&tmp_cursor, 0)))
......
......@@ -196,7 +196,7 @@ convert_error_code_to_mysql(
return(HA_ERR_TO_BIG_ROW);
} else {
dbug_assert(0);
DBUG_ASSERT(0);
return(-1); // Unknown error
}
......@@ -259,7 +259,7 @@ check_trx_exists(
trx = (trx_t*) thd->transaction.all.innobase_tid;
if (trx == NULL) {
dbug_assert(thd != NULL);
DBUG_ASSERT(thd != NULL);
trx = trx_allocate_for_mysql();
trx->mysql_thd = thd;
......@@ -852,7 +852,7 @@ normalize_table_name(
name_ptr = ptr + 1;
dbug_assert(ptr > name);
DBUG_ASSERT(ptr > name);
ptr--;
......@@ -975,7 +975,7 @@ ha_innobase::open(
ref_length = DATA_ROW_ID_LEN + 10;
dbug_assert(key_used_on_scan == MAX_KEY);
DBUG_ASSERT(key_used_on_scan == MAX_KEY);
}
auto_inc_counter_for_this_stat = 0;
......@@ -1119,8 +1119,8 @@ innobase_mysql_cmp(
enum_field_types mysql_tp;
int ret;
dbug_assert(a_length != UNIV_SQL_NULL);
dbug_assert(b_length != UNIV_SQL_NULL);
DBUG_ASSERT(a_length != UNIV_SQL_NULL);
DBUG_ASSERT(b_length != UNIV_SQL_NULL);
mysql_tp = (enum_field_types) mysql_type;
......@@ -1158,11 +1158,11 @@ get_innobase_type_from_mysql_type(
8 bits: this is used in ibuf and also when DATA_NOT_NULL is
ORed to the type */
dbug_assert((ulint)FIELD_TYPE_STRING < 256);
dbug_assert((ulint)FIELD_TYPE_VAR_STRING < 256);
dbug_assert((ulint)FIELD_TYPE_DOUBLE < 256);
dbug_assert((ulint)FIELD_TYPE_FLOAT < 256);
dbug_assert((ulint)FIELD_TYPE_DECIMAL < 256);
DBUG_ASSERT((ulint)FIELD_TYPE_STRING < 256);
DBUG_ASSERT((ulint)FIELD_TYPE_VAR_STRING < 256);
DBUG_ASSERT((ulint)FIELD_TYPE_DOUBLE < 256);
DBUG_ASSERT((ulint)FIELD_TYPE_FLOAT < 256);
DBUG_ASSERT((ulint)FIELD_TYPE_DECIMAL < 256);
switch (field->type()) {
case FIELD_TYPE_VAR_STRING: if (field->flags & BINARY_FLAG) {
......@@ -2368,7 +2368,7 @@ ha_innobase::position(
len = store_key_val_for_row(primary_key, (char*) ref, record);
}
dbug_assert(len <= ref_length);
DBUG_ASSERT(len <= ref_length);
ref_stored_len = len;
}
......
......@@ -774,13 +774,13 @@ bool MYSQL_LOG::write(IO_CACHE *cache)
length=my_b_bytes_in_cache(cache);
do
{
if (my_b_write(&log_file, cache->rc_pos, length))
if (my_b_write(&log_file, cache->read_pos, length))
{
if (!write_error)
sql_print_error(ER(ER_ERROR_ON_WRITE), name, errno);
goto err;
}
cache->rc_pos=cache->rc_end; // Mark buffer used up
cache->read_pos=cache->read_end; // Mark buffer used up
} while ((length=my_b_fill(cache)));
if (flush_io_cache(&log_file))
{
......
......@@ -52,26 +52,32 @@ int _my_b_net_read(register IO_CACHE *info, byte *Buffer,
{
int read_length;
NET *net= &(current_thd)->net;
DBUG_ENTER("_my_b_net_read");
if (info->end_of_file)
return 1; /* because my_b_get (no _) takes 1 byte at a time */
if (!info->end_of_file)
DBUG_RETURN(1); /* because my_b_get (no _) takes 1 byte at a time */
read_length=my_net_read(net);
if (read_length == (int) packet_error)
{
info->error= -1;
return 1;
DBUG_RETURN(1);
}
if (read_length == 0)
{
/* End of file from client */
info->end_of_file = 1; return 1;
info->end_of_file= 0; /* End of file from client */
DBUG_RETURN(1);
}
/* to set up stuff for my_b_get (no _) */
info->rc_end = (info->rc_pos = (byte*) net->read_pos) + read_length;
Buffer[0] = info->rc_pos[0]; /* length is always 1 */
info->rc_pos++;
info->buffer = info->rc_pos;
return 0;
info->read_end = (info->read_pos = (byte*) net->read_pos) + read_length;
Buffer[0] = info->read_pos[0]; /* length is always 1 */
info->read_pos++;
/*
info->request_pos is used by log_loaded_block() to know the size
of the current block
*/
info->request_pos=info->read_pos;
DBUG_RETURN(0);
}
} /* extern "C" */
......@@ -2614,7 +2614,7 @@ int QUICK_SELECT_DESC::get_next()
}
else
{
dbug_assert(range->flag & NEAR_MAX || range_reads_after_key(range));
DBUG_ASSERT(range->flag & NEAR_MAX || range_reads_after_key(range));
/* Note: even if max_key is only a prefix, HA_READ_AFTER_KEY will
* do the right thing - go past all keys which match the prefix */
result=file->index_read(record, (byte*) range->max_key,
......
......@@ -882,7 +882,7 @@ TABLE *open_table(THD *thd,const char *db,const char *table_name,
table->outer_join=table->null_row=table->maybe_null=0;
table->status=STATUS_NO_RECORD;
table->keys_in_use_for_query=table->used_keys= table->keys_in_use;
dbug_assert(table->key_read == 0);
DBUG_ASSERT(table->key_read == 0);
DBUG_RETURN(table);
}
......
......@@ -126,7 +126,7 @@ int mysql_ha_read(THD *thd, TABLE_LIST *tables,
mode=RNEXT;
break;
case RLAST:
dbug_assert(keyname != 0);
DBUG_ASSERT(keyname != 0);
err=table->file->index_last(table->record[0]);
mode=RPREV;
break;
......@@ -136,12 +136,12 @@ int mysql_ha_read(THD *thd, TABLE_LIST *tables,
table->file->rnd_next(table->record[0]);
break;
case RPREV:
dbug_assert(keyname != 0);
DBUG_ASSERT(keyname != 0);
err=table->file->index_prev(table->record[0]);
break;
case RKEY:
{
dbug_assert(keyname != 0);
DBUG_ASSERT(keyname != 0);
KEY *keyinfo=table->key_info+keyno;
KEY_PART_INFO *key_part=keyinfo->key_part;
uint key_len;
......
......@@ -551,7 +551,7 @@ READ_INFO::READ_INFO(File file_par, uint tot_length, String &field_term,
need_end_io_cache = 1;
if (!opt_old_rpl_compat && mysql_bin_log.is_open())
cache.pre_read = cache.pre_close =
(IO_CACHE_CALLBACK)log_loaded_block;
(IO_CACHE_CALLBACK) log_loaded_block;
}
}
}
......
......@@ -288,7 +288,7 @@ static void decrease_user_connections(const char *user, const char *host)
uc = (struct user_conn *) hash_search(&hash_user_connections,
(byte*) temp_user, temp_len);
dbug_assert(uc != 0); // We should always find the user
DBUG_ASSERT(uc != 0); // We should always find the user
if (!uc)
goto end; // Safety; Something went wrong
if (! --uc->connections)
......
......@@ -137,7 +137,7 @@ File open_binlog(IO_CACHE *log, const char *log_file_name,
if ((file = my_open(log_file_name, O_RDONLY | O_BINARY, MYF(MY_WME))) < 0 ||
init_io_cache(log, file, IO_SIZE*2, READ_CACHE, 0, 0,
MYF(MY_WME)))
MYF(MY_WME | MY_DONT_CHECK_FILESIZE)))
{
*errmsg = "Could not open log file"; // This will not be sent
goto err;
......@@ -1024,8 +1024,10 @@ int log_loaded_block(IO_CACHE* file)
{
LOAD_FILE_INFO* lf_info;
uint block_len ;
char* buffer = (char*)file->buffer;
if (!(block_len = file->rc_end - buffer))
/* file->request_pos contains position where we started last read */
char* buffer = (char*) file->request_pos;
if (!(block_len = file->read_end - buffer))
return 0;
lf_info = (LOAD_FILE_INFO*)file->arg;
if (lf_info->last_pos_in_file != HA_POS_ERROR &&
......
......@@ -2680,7 +2680,7 @@ eq_ref_table(JOIN *join, ORDER *start_order, JOIN_TAB *tab)
if (order)
{
found++;
dbug_assert(!(order->used & map));
DBUG_ASSERT(!(order->used & map));
order->used|=map;
continue; // Used in ORDER BY
}
......
......@@ -1687,7 +1687,7 @@ static void init_pid_file()
if (!fp)
die("Could not open pid file %s", pid_file);
created_pid_file=1;
fprintf(fp, "%d\n", getpid());
fprintf(fp, "%d\n", (int) getpid());
fclose(fp);
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment