Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
M
mariadb
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Kirill Smelkov
mariadb
Commits
53ac59b9
Commit
53ac59b9
authored
Jun 30, 2002
by
serg@serg.mysql.com
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
"myisamchk -p" for parallel recover works (no extensive testing though)
parent
3bd8dba6
Changes
6
Hide whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
160 additions
and
87 deletions
+160
-87
include/my_sys.h
include/my_sys.h
+2
-25
include/myisam.h
include/myisam.h
+3
-1
myisam/mi_check.c
myisam/mi_check.c
+29
-18
myisam/myisamchk.c
myisam/myisamchk.c
+17
-7
myisam/sort.c
myisam/sort.c
+21
-11
mysys/mf_iocache.c
mysys/mf_iocache.c
+88
-25
No files found.
include/my_sys.h
View file @
53ac59b9
...
...
@@ -322,29 +322,6 @@ typedef struct st_io_cache_share
my_bool
alloced
;
#endif
}
IO_CACHE_SHARE
;
#define lock_io_cache(info) \
( \
(errno=pthread_mutex_lock(&((info)->share->mutex))) ? -1 : ( \
(info)->share->count ? ( \
--((info)->share->count), \
pthread_cond_wait(&((info)->share->cond), \
&((info)->share->mutex)), \
(++((info)->share->count) ? \
pthread_mutex_unlock(&((info)->share->mutex)) : 0)) \
: 1 ) \
)
#define unlock_io_cache(info) \
( \
pthread_cond_broadcast(&((info)->share->cond)), \
pthread_mutex_unlock (&((info)->share->mutex)) \
)
/* -- to catch errors
#else
#define lock_io_cache(info)
#define unlock_io_cache(info)
*/
#endif
typedef
struct
st_io_cache
/* Used when cacheing files */
...
...
@@ -686,9 +663,9 @@ extern my_bool reinit_io_cache(IO_CACHE *info,enum cache_type type,
extern
int
_my_b_read
(
IO_CACHE
*
info
,
byte
*
Buffer
,
uint
Count
);
#ifdef THREAD
extern
int
_my_b_read_r
(
IO_CACHE
*
info
,
byte
*
Buffer
,
uint
Count
);
extern
int
init_io_cache_share
(
IO_CACHE
*
info
,
extern
void
init_io_cache_share
(
IO_CACHE
*
info
,
IO_CACHE_SHARE
*
s
,
uint
num_threads
);
extern
int
remove_io_thread
(
IO_CACHE
*
info
);
extern
void
remove_io_thread
(
IO_CACHE
*
info
);
#endif
extern
int
_my_b_seq_read
(
IO_CACHE
*
info
,
byte
*
Buffer
,
uint
Count
);
extern
int
_my_b_net_read
(
IO_CACHE
*
info
,
byte
*
Buffer
,
uint
Count
);
...
...
include/myisam.h
View file @
53ac59b9
...
...
@@ -382,7 +382,7 @@ typedef struct st_mi_sort_param
IO_CACHE
tempfile
,
tempfile_for_exceptions
;
DYNAMIC_ARRAY
buffpek
;
my_off_t
pos
,
max_pos
,
filepos
,
start_recpos
;
my_bool
fix_datafile
;
my_bool
fix_datafile
,
master
;
char
*
record
;
char
*
tmpdir
;
int
(
*
key_cmp
)(
struct
st_mi_sort_param
*
,
const
void
*
,
const
void
*
);
...
...
@@ -403,6 +403,8 @@ int mi_repair(MI_CHECK *param, register MI_INFO *info,
int
mi_sort_index
(
MI_CHECK
*
param
,
register
MI_INFO
*
info
,
my_string
name
);
int
mi_repair_by_sort
(
MI_CHECK
*
param
,
register
MI_INFO
*
info
,
const
char
*
name
,
int
rep_quick
);
int
mi_repair_parallel
(
MI_CHECK
*
param
,
register
MI_INFO
*
info
,
const
char
*
name
,
int
rep_quick
);
int
change_to_newfile
(
const
char
*
filename
,
const
char
*
old_ext
,
const
char
*
new_ext
,
uint
raid_chunks
,
myf
myflags
);
...
...
myisam/mi_check.c
View file @
53ac59b9
...
...
@@ -14,7 +14,7 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
/* Descri
pt
, check and repair of MyISAM tables */
/* Descri
be
, check and repair of MyISAM tables */
#include "ftdefs.h"
#include <m_ctype.h>
...
...
@@ -1186,7 +1186,8 @@ int mi_repair(MI_CHECK *param, register MI_INFO *info,
param
->
read_cache
.
end_of_file
=
sort_info
.
filelength
=
my_seek
(
info
->
dfile
,
0L
,
MY_SEEK_END
,
MYF
(
0
));
sort_info
.
dupp
=
0
;
sort_param
.
fix_datafile
=
(
my_bool
)
(
!
rep_quick
);
sort_param
.
fix_datafile
=
(
my_bool
)
(
!
rep_quick
);
sort_param
.
master
=
1
;
sort_info
.
max_records
=
~
(
ha_rows
)
0
;
set_data_file_type
(
&
sort_info
,
share
);
...
...
@@ -1873,6 +1874,7 @@ int mi_repair_by_sort(MI_CHECK *param, register MI_INFO *info,
sort_param
.
tmpdir
=
param
->
tmpdir
;
sort_param
.
sort_info
=&
sort_info
;
sort_param
.
fix_datafile
=
(
my_bool
)
(
!
rep_quick
);
sort_param
.
master
=
1
;
del
=
info
->
state
->
del
;
param
->
glob_crc
=
0
;
...
...
@@ -2082,7 +2084,7 @@ err:
/* same as mi_repair_by_sort */
/* but do it multithreaded */
int
mi_repair_
by_sort_r
(
MI_CHECK
*
param
,
register
MI_INFO
*
info
,
int
mi_repair_
parallel
(
MI_CHECK
*
param
,
register
MI_INFO
*
info
,
const
char
*
name
,
int
rep_quick
)
{
int
got_error
;
...
...
@@ -2091,7 +2093,7 @@ int mi_repair_by_sort_r(MI_CHECK *param, register MI_INFO *info,
ha_rows
start_records
;
my_off_t
new_header_length
,
del
;
File
new_file
;
MI_SORT_PARAM
*
sort_param
=
0
,
*
sinfo
;
MI_SORT_PARAM
*
sort_param
=
0
;
MYISAM_SHARE
*
share
=
info
->
s
;
ulong
*
rec_per_key_part
;
MI_KEYSEG
*
keyseg
;
...
...
@@ -2099,7 +2101,7 @@ int mi_repair_by_sort_r(MI_CHECK *param, register MI_INFO *info,
IO_CACHE_SHARE
io_share
;
SORT_INFO
sort_info
;
ulonglong
key_map
=
share
->
state
.
key_map
;
DBUG_ENTER
(
"mi_repair_
by_sort_r
"
);
DBUG_ENTER
(
"mi_repair_
parallel
"
);
start_records
=
info
->
state
->
records
;
got_error
=
1
;
...
...
@@ -2244,6 +2246,7 @@ int mi_repair_by_sort_r(MI_CHECK *param, register MI_INFO *info,
sort_param
[
i
].
lock_in_memory
=
lock_memory
;
sort_param
[
i
].
tmpdir
=
param
->
tmpdir
;
sort_param
[
i
].
sort_info
=&
sort_info
;
sort_param
[
i
].
master
=
0
;
sort_param
[
i
].
fix_datafile
=
0
;
sort_param
[
i
].
filepos
=
new_header_length
;
...
...
@@ -2271,7 +2274,8 @@ int mi_repair_by_sort_r(MI_CHECK *param, register MI_INFO *info,
sort_param
[
i
].
key_length
+=
ft_max_word_len_for_sort
-
ft_max_word_len
;
}
sort_info
.
total_keys
=
i
;
sort_param
[
0
].
fix_datafile
=
!
rep_quick
;
sort_param
[
0
].
master
=
1
;
sort_param
[
0
].
fix_datafile
=
(
my_bool
)(
!
rep_quick
);
sort_info
.
got_error
=
0
;
pthread_mutex_init
(
&
sort_info
.
mutex
,
MY_MUTEX_INIT_FAST
);
...
...
@@ -2289,7 +2293,7 @@ int mi_repair_by_sort_r(MI_CHECK *param, register MI_INFO *info,
In the second one all the threads will fill their sort_buffers
(and call write_keys) at the same time, putting more stress on i/o.
*/
#if
1
#if
ndef USING_FIRST_APPROACH
param
->
sort_buffer_length
/
sort_info
.
total_keys
;
#else
param
->
sort_buffer_length
*
sort_param
[
i
].
key_length
/
length
;
...
...
@@ -2454,7 +2458,8 @@ static int sort_key_read(MI_SORT_PARAM *sort_param, void *key)
if
(
info
->
state
->
records
==
sort_info
->
max_records
)
{
mi_check_print_error
(
sort_info
->
param
,
"Found too many records; Can`t continue"
);
"Key %d - Found too many records; Can't continue"
,
sort_param
->
key
+
1
);
DBUG_RETURN
(
1
);
}
sort_param
->
real_key_length
=
(
info
->
s
->
rec_reflength
+
...
...
@@ -2543,7 +2548,8 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param)
if
(
!
sort_param
->
fix_datafile
)
{
sort_param
->
filepos
=
sort_param
->
pos
;
share
->
state
.
split
++
;
if
(
sort_param
->
master
)
share
->
state
.
split
++
;
}
sort_param
->
max_pos
=
(
sort_param
->
pos
+=
share
->
base
.
pack_reclength
);
if
(
*
sort_param
->
record
)
...
...
@@ -2553,7 +2559,7 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param)
mi_static_checksum
(
info
,
sort_param
->
record
));
DBUG_RETURN
(
0
);
}
if
(
!
sort_param
->
fix_datafile
)
if
(
!
sort_param
->
fix_datafile
&&
sort_param
->
master
)
{
info
->
state
->
del
++
;
info
->
state
->
empty
+=
share
->
base
.
pack_reclength
;
...
...
@@ -2699,7 +2705,8 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param)
}
if
(
b_type
&
(
BLOCK_DELETED
|
BLOCK_SYNC_ERROR
))
{
if
(
!
sort_param
->
fix_datafile
&&
(
b_type
&
BLOCK_DELETED
))
if
(
!
sort_param
->
fix_datafile
&&
sort_param
->
master
&&
(
b_type
&
BLOCK_DELETED
))
{
info
->
state
->
empty
+=
block_info
.
block_len
;
info
->
state
->
del
++
;
...
...
@@ -2718,7 +2725,7 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param)
continue
;
}
if
(
!
sort_param
->
fix_datafile
)
if
(
!
sort_param
->
fix_datafile
&&
sort_param
->
master
)
share
->
state
.
split
++
;
if
(
!
found_record
++
)
{
...
...
@@ -2860,7 +2867,8 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param)
if
(
!
sort_param
->
fix_datafile
)
{
sort_param
->
filepos
=
sort_param
->
pos
;
share
->
state
.
split
++
;
if
(
sort_param
->
master
)
share
->
state
.
split
++
;
}
sort_param
->
max_pos
=
(
sort_param
->
pos
=
block_info
.
filepos
+
block_info
.
rec_len
);
...
...
@@ -2968,12 +2976,15 @@ int sort_write_record(MI_SORT_PARAM *sort_param)
break
;
}
}
info
->
state
->
records
++
;
if
((
param
->
testflag
&
T_WRITE_LOOP
)
&&
(
info
->
state
->
records
%
WRITE_COUNT
)
==
0
)
if
(
sort_param
->
master
)
{
char
llbuff
[
22
];
printf
(
"%s
\r
"
,
llstr
(
info
->
state
->
records
,
llbuff
));
VOID
(
fflush
(
stdout
));
info
->
state
->
records
++
;
if
((
param
->
testflag
&
T_WRITE_LOOP
)
&&
(
info
->
state
->
records
%
WRITE_COUNT
)
==
0
)
{
char
llbuff
[
22
];
printf
(
"%s
\r
"
,
llstr
(
info
->
state
->
records
,
llbuff
));
VOID
(
fflush
(
stdout
));
}
}
DBUG_RETURN
(
0
);
}
/* sort_write_record */
...
...
myisam/myisamchk.c
View file @
53ac59b9
...
...
@@ -216,9 +216,15 @@ static struct my_option my_long_options[] =
{
"recover"
,
'r'
,
"Can fix almost anything except unique keys that aren't unique."
,
0
,
0
,
0
,
GET_NO_ARG
,
NO_ARG
,
0
,
0
,
0
,
0
,
0
,
0
},
{
"paraller-recover"
,
'p'
,
"Same as '-r' but creates all the keys in parallel"
,
0
,
0
,
0
,
GET_NO_ARG
,
NO_ARG
,
0
,
0
,
0
,
0
,
0
,
0
},
{
"safe-recover"
,
'o'
,
"Uses old recovery method; Slower than '-r' but can handle a couple of cases where '-r' reports that it can't fix the data file."
,
0
,
0
,
0
,
GET_NO_ARG
,
NO_ARG
,
0
,
0
,
0
,
0
,
0
,
0
},
{
"sort-recover"
,
'n'
,
"Force recovering with sorting even if the temporary file was very big."
,
0
,
0
,
0
,
GET_NO_ARG
,
NO_ARG
,
0
,
0
,
0
,
0
,
0
,
0
},
{
"start-check-pos"
,
OPT_START_CHECK_POS
,
"No help available."
,
0
,
0
,
0
,
GET_ULL
,
REQUIRED_ARG
,
0
,
0
,
0
,
0
,
0
,
0
},
...
...
@@ -244,9 +250,6 @@ static struct my_option my_long_options[] =
(
gptr
*
)
&
check_param
.
opt_sort_key
,
(
gptr
*
)
&
check_param
.
opt_sort_key
,
0
,
GET_UINT
,
REQUIRED_ARG
,
0
,
0
,
0
,
0
,
0
,
0
},
{
"sort-recover"
,
'n'
,
"Force recovering with sorting even if the temporary file was very big."
,
0
,
0
,
0
,
GET_NO_ARG
,
NO_ARG
,
0
,
0
,
0
,
0
,
0
,
0
},
{
"tmpdir"
,
't'
,
"Path for temporary files."
,
(
gptr
*
)
&
check_param
.
tmpdir
,
...
...
@@ -514,6 +517,11 @@ get_one_option(int optid,
if
(
argument
!=
disabled_my_option
)
check_param
.
testflag
|=
T_REP_BY_SORT
;
break
;
case
'p'
:
check_param
.
testflag
&=
~
T_REP_ANY
;
if
(
argument
!=
disabled_my_option
)
check_param
.
testflag
|=
T_REP_PARALLEL
;
break
;
case
'o'
:
check_param
.
testflag
&=
~
T_REP_ANY
;
check_param
.
force_sort
=
0
;
...
...
@@ -864,8 +872,7 @@ static int myisamchk(MI_CHECK *param, my_string filename)
if
(
tmp
!=
share
->
state
.
key_map
)
info
->
update
|=
HA_STATE_CHANGED
;
}
if
(
rep_quick
&&
chk_del
(
param
,
info
,
param
->
testflag
&
~
T_VERBOSE
))
if
(
rep_quick
&&
chk_del
(
param
,
info
,
param
->
testflag
&
~
T_VERBOSE
))
{
if
(
param
->
testflag
&
T_FORCE_CREATE
)
{
...
...
@@ -881,14 +888,17 @@ static int myisamchk(MI_CHECK *param, my_string filename)
}
if
(
!
error
)
{
if
((
param
->
testflag
&
T_REP_BY_SORT
)
&&
if
((
param
->
testflag
&
(
T_REP_BY_SORT
|
T_REP_PARALLEL
)
)
&&
(
share
->
state
.
key_map
||
(
rep_quick
&&
!
param
->
keys_in_use
&&
!
recreate
))
&&
mi_test_if_sort_rep
(
info
,
info
->
state
->
records
,
info
->
s
->
state
.
key_map
,
param
->
force_sort
))
{
error
=
mi_repair_by_sort
(
param
,
info
,
filename
,
rep_quick
);
if
(
param
->
testflag
&
T_REP_BY_SORT
)
error
=
mi_repair_by_sort
(
param
,
info
,
filename
,
rep_quick
);
else
error
=
mi_repair_parallel
(
param
,
info
,
filename
,
rep_quick
);
state_updated
=
1
;
}
else
if
(
param
->
testflag
&
T_REP_ANY
)
...
...
myisam/sort.c
View file @
53ac59b9
...
...
@@ -268,17 +268,23 @@ void *_thr_find_all_keys(MI_SORT_PARAM *info)
uchar
**
sort_keys
;
MI_KEYSEG
*
keyseg
;
error
=
1
;
if
(
my_thread_init
())
goto
err
;
if
(
info
->
sort_info
->
got_error
)
goto
err
;
my_b_clear
(
&
info
->
tempfile
);
my_b_clear
(
&
info
->
tempfile_for_exceptions
);
bzero
((
char
*
)
&
info
->
buffpek
,
sizeof
(
info
->
buffpek
));
bzero
((
char
*
)
&
info
->
unique
,
sizeof
(
info
->
unique
));
sort_keys
=
(
uchar
**
)
NULL
;
error
=
1
;
if
(
info
->
sort_info
->
got_error
)
goto
err
;
sort_keys
=
(
uchar
**
)
NULL
;
memavl
=
max
(
info
->
sortbuff_size
,
MIN_SORT_MEMORY
);
idx
=
info
->
sort_info
->
max_records
;
sort_length
=
info
->
key_length
;
maxbuffer
=
1
;
while
(
memavl
>=
MIN_SORT_MEMORY
)
{
...
...
@@ -327,7 +333,7 @@ void *_thr_find_all_keys(MI_SORT_PARAM *info)
idx
=
error
=
0
;
sort_keys
[
0
]
=
(
uchar
*
)
(
sort_keys
+
keys
);
while
(
!
(
error
=
info
->
sort_info
->
got_error
)
||
while
(
!
(
error
=
info
->
sort_info
->
got_error
)
&&
!
(
error
=
(
*
info
->
key_read
)(
info
,
sort_keys
[
idx
])))
{
if
(
info
->
real_key_length
>
info
->
key_length
)
...
...
@@ -342,7 +348,6 @@ void *_thr_find_all_keys(MI_SORT_PARAM *info)
if
(
write_keys
(
info
,
sort_keys
,
idx
-
1
,
(
BUFFPEK
*
)
alloc_dynamic
(
&
info
->
buffpek
),
&
info
->
tempfile
))
goto
err
;
sort_keys
[
0
]
=
(
uchar
*
)
(
sort_keys
+
keys
);
memcpy
(
sort_keys
[
0
],
sort_keys
[
idx
-
1
],(
size_t
)
info
->
key_length
);
idx
=
1
;
...
...
@@ -378,6 +383,7 @@ ok:
info
->
sort_info
->
threads_running
--
;
pthread_cond_signal
(
&
info
->
sort_info
->
cond
);
pthread_mutex_unlock
(
&
info
->
sort_info
->
mutex
);
my_thread_end
();
return
NULL
;
}
/* _thr_find_all_keys */
...
...
@@ -389,12 +395,13 @@ int _thr_write_keys(MI_SORT_PARAM *sort_param)
ulong
*
rec_per_key_part
=
param
->
rec_per_key_part
;
int
i
,
got_error
=
sort_info
->
got_error
;
MI_INFO
*
info
=
sort_info
->
info
;
MYISAM_SHARE
*
share
=
info
->
s
;
MYISAM_SHARE
*
share
=
info
->
s
;
MI_SORT_PARAM
*
sinfo
;
byte
*
mergebuf
=
0
;
for
(
i
=
0
,
sinfo
=
sort_param
;
i
<
sort_info
->
total_keys
;
i
++
,
sinfo
++
,
rec_per_key_part
+=
sinfo
->
keyinfo
->
keysegs
)
for
(
i
=
0
,
sinfo
=
sort_param
;
i
<
sort_info
->
total_keys
;
i
++
,
rec_per_key_part
+=
sinfo
->
keyinfo
->
keysegs
,
sinfo
++
)
{
if
(
!
sinfo
->
sort_keys
)
{
...
...
@@ -417,10 +424,11 @@ int _thr_write_keys(MI_SORT_PARAM *sort_param)
sinfo
->
sort_keys
=
0
;
}
for
(
i
=
0
,
sinfo
=
sort_param
;
i
<
sort_info
->
total_keys
;
i
++
,
sinfo
++
,
for
(
i
=
0
,
sinfo
=
sort_param
;
i
<
sort_info
->
total_keys
;
i
++
,
delete_dynamic
(
&
sinfo
->
buffpek
),
close_cached_file
(
&
sinfo
->
tempfile
),
close_cached_file
(
&
sinfo
->
tempfile_for_exceptions
))
close_cached_file
(
&
sinfo
->
tempfile_for_exceptions
),
sinfo
++
)
{
if
(
got_error
)
continue
;
if
(
sinfo
->
buffpek
.
elements
)
...
...
@@ -520,8 +528,10 @@ static int NEAR_F write_keys(MI_SORT_PARAM *info, register uchar **sort_keys,
buffpek
->
count
=
count
;
for
(
end
=
sort_keys
+
count
;
sort_keys
!=
end
;
sort_keys
++
)
{
if
(
my_b_write
(
tempfile
,(
byte
*
)
*
sort_keys
,(
uint
)
sort_length
))
DBUG_RETURN
(
1
);
/* purecov: inspected */
}
DBUG_RETURN
(
0
);
}
/* write_keys */
...
...
@@ -654,7 +664,7 @@ merge_buffers(MI_SORT_PARAM *info, uint keys, IO_CACHE *from_file,
if
(
init_queue
(
&
queue
,(
uint
)
(
Tb
-
Fb
)
+
1
,
offsetof
(
BUFFPEK
,
key
),
0
,
(
int
(
*
)(
void
*
,
byte
*
,
byte
*
))
info
->
key_cmp
,
(
void
*
)
info
->
sort_info
))
(
void
*
)
info
))
DBUG_RETURN
(
1
);
/* purecov: inspected */
for
(
buffpek
=
Fb
;
buffpek
<=
Tb
;
buffpek
++
)
...
...
mysys/mf_iocache.c
View file @
53ac59b9
...
...
@@ -68,6 +68,9 @@ static void my_aiowait(my_aio_result *result);
#define unlock_append_buffer(info)
#endif
#define IO_ROUND_UP(X) (((X)+IO_SIZE-1) & ~(IO_SIZE-1))
#define IO_ROUND_DN(X) ( (X) & ~(IO_SIZE-1))
static
void
init_functions
(
IO_CACHE
*
info
,
enum
cache_type
type
)
{
...
...
@@ -425,31 +428,71 @@ int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count)
}
#ifdef THREAD
int
init_io_cache_share
(
IO_CACHE
*
info
,
IO_CACHE_SHARE
*
s
,
uint
num_threads
)
/* Prepare IO_CACHE for shared use */
void
init_io_cache_share
(
IO_CACHE
*
info
,
IO_CACHE_SHARE
*
s
,
uint
num_threads
)
{
DBUG_ASSERT
(
info
->
type
==
READ_CACHE
);
pthread_mutex_init
(
&
s
->
mutex
,
MY_MUTEX_INIT_FAST
);
pthread_cond_init
(
&
s
->
cond
,
0
);
s
->
count
=
num_threads
;
pthread_mutex_init
(
&
s
->
mutex
,
MY_MUTEX_INIT_FAST
);
pthread_cond_init
(
&
s
->
cond
,
0
);
s
->
count
=
num_threads
-
1
;
s
->
active
=
0
;
/* to catch errors */
info
->
share
=
s
;
info
->
read_function
=
_my_b_read_r
;
}
int
remove_io_thread
(
IO_CACHE
*
info
)
/*
Remove a thread from shared access to IO_CACHE
Every thread should do that on exit for not
to deadlock other threads
*/
void
remove_io_thread
(
IO_CACHE
*
info
)
{
if
(
errno
=
pthread_mutex_lock
(
&
info
->
share
->
mutex
))
return
-
1
;
pthread_mutex_lock
(
&
info
->
share
->
mutex
);
if
(
!
info
->
share
->
count
--
)
pthread_cond_signal
(
&
info
->
share
->
cond
);
pthread_mutex_unlock
(
&
info
->
share
->
mutex
);
pthread_cond_signal
(
&
info
->
share
->
cond
);
pthread_mutex_unlock
(
&
info
->
share
->
mutex
);
return
0
;
}
static
int
lock_io_cache
(
IO_CACHE
*
info
)
{
pthread_mutex_lock
(
&
info
->
share
->
mutex
);
if
(
!
info
->
share
->
count
)
return
1
;
--
(
info
->
share
->
count
);
pthread_cond_wait
(
&
info
->
share
->
cond
,
&
info
->
share
->
mutex
);
/*
count can be -1 here, if one thread was removed (remove_io_cache)
while all others were locked (lock_io_cache).
If this is the case, this thread behaves as if count was 0 from the
very beginning, that is returns 1 and does not unlock the mutex.
*/
if
(
++
(
info
->
share
->
count
))
return
pthread_mutex_unlock
(
&
info
->
share
->
mutex
);
else
return
1
;
}
static
void
unlock_io_cache
(
IO_CACHE
*
info
)
{
pthread_cond_broadcast
(
&
info
->
share
->
cond
);
pthread_mutex_unlock
(
&
info
->
share
->
mutex
);
}
/*
Read from IO_CACHE when it is shared between several threads.
It works as follows: when a thread tries to read from a file
(that is, after using all the data from the (shared) buffer),
it just hangs on lock_io_cache(), wating for other threads.
When the very last thread attempts a read, lock_io_cache()
returns 1, the thread does actual IO and unlock_io_cache(),
which signals all the waiting threads that data is in the buffer.
*/
int
_my_b_read_r
(
register
IO_CACHE
*
info
,
byte
*
Buffer
,
uint
Count
)
{
my_off_t
pos_in_file
;
int
length
,
diff_length
,
read_len
;
u
int
length
,
diff_length
,
read_len
;
DBUG_ENTER
(
"_my_b_read_r"
);
if
((
read_len
=
(
uint
)
(
info
->
read_end
-
info
->
read_pos
)))
...
...
@@ -460,28 +503,41 @@ int _my_b_read_r(register IO_CACHE *info, byte *Buffer, uint Count)
Count
-=
read_len
;
}
#define IO_ROUND_UP(X) (((X)+IO_SIZE-1) & ~(IO_SIZE-1))
#define IO_ROUND_DN(X) ( (X) & ~(IO_SIZE-1))
while
(
Count
)
{
while
(
Count
)
{
int
cnt
,
len
;
pos_in_file
=
info
->
pos_in_file
+
(
uint
)(
info
->
read_end
-
info
->
buffer
);
diff_length
=
pos_in_file
&
(
IO_SIZE
-
1
);
diff_length
=
(
uint
)
pos_in_file
&
(
IO_SIZE
-
1
);
length
=
IO_ROUND_UP
(
Count
+
diff_length
)
-
diff_length
;
length
=
(
length
<=
info
->
read_length
)
?
length
+
IO_ROUND_DN
(
info
->
read_length
-
length
)
:
length
-
IO_ROUND_UP
(
length
-
info
->
read_length
)
;
if
(
info
->
type
!=
READ_FIFO
&&
(
length
>
info
->
end_of_file
-
pos_in_file
))
length
=
info
->
end_of_file
-
pos_in_file
;
if
(
length
==
0
)
{
info
->
error
=
(
int
)
read_len
;
DBUG_RETURN
(
1
);
}
if
(
lock_io_cache
(
info
))
{
#if 0 && SAFE_MUTEX
#define PRINT_LOCK(M) printf("Thread %d: mutex is %s\n", my_thread_id(), \
(((safe_mutex_t *)(M))->count ? "Locked" : "Unlocked"))
#else
#define PRINT_LOCK(M)
#endif
PRINT_LOCK
(
&
info
->
share
->
mutex
);
info
->
share
->
active
=
info
;
if
(
info
->
seek_not_done
)
/* File touched, do seek */
VOID
(
my_seek
(
info
->
file
,
pos_in_file
,
MY_SEEK_SET
,
MYF
(
0
)));
len
=
my_read
(
info
->
file
,
info
->
buffer
,
length
,
info
->
myflags
);
len
=
(
int
)
my_read
(
info
->
file
,
info
->
buffer
,
length
,
info
->
myflags
);
info
->
read_end
=
info
->
buffer
+
(
len
==
-
1
?
0
:
len
);
info
->
error
=
(
len
==
length
?
0
:
len
);
info
->
error
=
(
len
==
(
int
)
length
?
0
:
len
);
info
->
pos_in_file
=
pos_in_file
;
unlock_io_cache
(
info
);
PRINT_LOCK
(
&
info
->
share
->
mutex
);
}
else
{
...
...
@@ -489,15 +545,16 @@ int _my_b_read_r(register IO_CACHE *info, byte *Buffer, uint Count)
info
->
read_end
=
info
->
share
->
active
->
read_end
;
info
->
pos_in_file
=
info
->
share
->
active
->
pos_in_file
;
len
=
(
info
->
error
==
-
1
?
-
1
:
info
->
read_end
-
info
->
buffer
);
PRINT_LOCK
(
&
info
->
share
->
mutex
);
}
info
->
read_pos
=
info
->
buffer
;
info
->
seek_not_done
=
0
;
if
(
info
->
error
)
if
(
len
<=
0
)
{
info
->
error
=
read_len
;
info
->
error
=
(
int
)
read_len
;
DBUG_RETURN
(
1
);
}
cnt
=
(
len
>
Count
)
?
Count
:
len
;
cnt
=
(
len
>
Count
)
?
(
int
)
Count
:
len
;
memcpy
(
Buffer
,
info
->
read_pos
,
(
size_t
)
cnt
);
Count
-=
cnt
;
Buffer
+=
cnt
;
...
...
@@ -1070,15 +1127,21 @@ int end_io_cache(IO_CACHE *info)
DBUG_ENTER
(
"end_io_cache"
);
#ifdef THREAD
/* simple protection against multi-close: destroying share first */
if
(
info
->
share
)
if
(
pthread_cond_destroy
(
&
info
->
share
->
cond
)
|
pthread_mutex_destroy
(
&
info
->
share
->
mutex
))
{
#ifdef SAFE_MUTEX
/* simple protection against multi-close: destroying share first */
if
(
pthread_cond_destroy
(
&
info
->
share
->
cond
)
|
pthread_mutex_destroy
(
&
info
->
share
->
mutex
))
{
DBUG_RETURN
(
1
);
}
else
info
->
share
=
0
;
#else
pthread_cond_destroy
(
&
info
->
share
->
cond
);
pthread_mutex_destroy
(
&
info
->
share
->
mutex
);
#endif
info
->
share
=
0
;
}
#endif
if
((
pre_close
=
info
->
pre_close
))
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment