Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
M
MariaDB
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
nexedi
MariaDB
Commits
b15a23a7
Commit
b15a23a7
authored
Oct 09, 2006
by
istruewing@chilla.local
Browse files
Options
Browse Files
Download
Plain Diff
Merge chilla.local:/home/mydev/mysql-5.0-bug8283
into chilla.local:/home/mydev/mysql-5.1-bug8283
parents
cab855ca
014c1c88
Changes
10
Show whitespace changes
Inline
Side-by-side
Showing
10 changed files
with
1132 additions
and
233 deletions
+1132
-233
include/my_sys.h
include/my_sys.h
+14
-8
include/myisam.h
include/myisam.h
+1
-1
mysql-test/r/myisam.result
mysql-test/r/myisam.result
+126
-0
mysql-test/t/myisam.test
mysql-test/t/myisam.test
+91
-0
mysys/mf_iocache.c
mysys/mf_iocache.c
+500
-82
storage/myisam/mi_check.c
storage/myisam/mi_check.c
+257
-49
storage/myisam/mi_open.c
storage/myisam/mi_open.c
+4
-1
storage/myisam/mi_packrec.c
storage/myisam/mi_packrec.c
+44
-32
storage/myisam/myisamdef.h
storage/myisam/myisamdef.h
+21
-8
storage/myisam/sort.c
storage/myisam/sort.c
+74
-52
No files found.
include/my_sys.h
View file @
b15a23a7
...
@@ -341,12 +341,18 @@ typedef int (*IO_CACHE_CALLBACK)(struct st_io_cache*);
...
@@ -341,12 +341,18 @@ typedef int (*IO_CACHE_CALLBACK)(struct st_io_cache*);
#ifdef THREAD
#ifdef THREAD
typedef
struct
st_io_cache_share
typedef
struct
st_io_cache_share
{
{
/* to sync on reads into buffer */
pthread_mutex_t
mutex
;
/* To sync on reads into buffer. */
pthread_mutex_t
mutex
;
pthread_cond_t
cond
;
/* To wait for signals. */
pthread_cond_t
cond
;
pthread_cond_t
cond_writer
;
/* For a synchronized writer. */
int
count
,
total
;
/* Offset in file corresponding to the first byte of buffer. */
/* actual IO_CACHE that filled the buffer */
my_off_t
pos_in_file
;
struct
st_io_cache
*
active
;
/* If a synchronized write cache is the source of the data. */
struct
st_io_cache
*
source_cache
;
byte
*
buffer
;
/* The read buffer. */
byte
*
read_end
;
/* Behind last valid byte of buffer. */
int
running_threads
;
/* threads not in lock. */
int
total_threads
;
/* threads sharing the cache. */
int
error
;
/* Last error. */
#ifdef NOT_YET_IMPLEMENTED
#ifdef NOT_YET_IMPLEMENTED
/* whether the structure should be free'd */
/* whether the structure should be free'd */
my_bool
alloced
;
my_bool
alloced
;
...
@@ -719,8 +725,8 @@ extern void setup_io_cache(IO_CACHE* info);
...
@@ -719,8 +725,8 @@ extern void setup_io_cache(IO_CACHE* info);
extern
int
_my_b_read
(
IO_CACHE
*
info
,
byte
*
Buffer
,
uint
Count
);
extern
int
_my_b_read
(
IO_CACHE
*
info
,
byte
*
Buffer
,
uint
Count
);
#ifdef THREAD
#ifdef THREAD
extern
int
_my_b_read_r
(
IO_CACHE
*
info
,
byte
*
Buffer
,
uint
Count
);
extern
int
_my_b_read_r
(
IO_CACHE
*
info
,
byte
*
Buffer
,
uint
Count
);
extern
void
init_io_cache_share
(
IO_CACHE
*
info
,
extern
void
init_io_cache_share
(
IO_CACHE
*
read_cache
,
IO_CACHE_SHARE
*
cshare
,
IO_CACHE_SHARE
*
s
,
uint
num_threads
);
IO_CACHE
*
write_cache
,
uint
num_threads
);
extern
void
remove_io_thread
(
IO_CACHE
*
info
);
extern
void
remove_io_thread
(
IO_CACHE
*
info
);
#endif
#endif
extern
int
_my_b_seq_read
(
IO_CACHE
*
info
,
byte
*
Buffer
,
uint
Count
);
extern
int
_my_b_seq_read
(
IO_CACHE
*
info
,
byte
*
Buffer
,
uint
Count
);
...
...
include/myisam.h
View file @
b15a23a7
...
@@ -419,7 +419,7 @@ typedef struct st_mi_check_param
...
@@ -419,7 +419,7 @@ typedef struct st_mi_check_param
uint
testflag
,
key_cache_block_size
;
uint
testflag
,
key_cache_block_size
;
uint8
language
;
uint8
language
;
my_bool
using_global_keycache
,
opt_lock_memory
,
opt_follow_links
;
my_bool
using_global_keycache
,
opt_lock_memory
,
opt_follow_links
;
my_bool
retry_repair
,
force_sort
,
calc_checksum
;
my_bool
retry_repair
,
force_sort
;
char
temp_filename
[
FN_REFLEN
],
*
isam_file_name
;
char
temp_filename
[
FN_REFLEN
],
*
isam_file_name
;
MY_TMPDIR
*
tmpdir
;
MY_TMPDIR
*
tmpdir
;
int
tmpfile_createflag
;
int
tmpfile_createflag
;
...
...
mysql-test/r/myisam.result
View file @
b15a23a7
...
@@ -796,6 +796,132 @@ a b
...
@@ -796,6 +796,132 @@ a b
xxxxxxxxx bbbbbb
xxxxxxxxx bbbbbb
xxxxxxxxx bbbbbb
xxxxxxxxx bbbbbb
DROP TABLE t1;
DROP TABLE t1;
SET @@myisam_repair_threads=2;
SHOW VARIABLES LIKE 'myisam_repair%';
Variable_name Value
myisam_repair_threads 2
CREATE TABLE t1 (
`_id`
int(11) NOT NULL default '0',
`url`
text,
`email`
text,
`description`
text,
`loverlap`
int(11) default NULL,
`roverlap`
int(11) default NULL,
`lneighbor_id`
int(11) default NULL,
`rneighbor_id`
int(11) default NULL,
`length_`
int(11) default NULL,
`sequence`
mediumtext,
`name`
text,
`_obj_class`
text NOT NULL,
PRIMARY KEY (`_id`),
UNIQUE KEY `sequence_name_index` (`name`(50)),
KEY (`length_`)
)
ENGINE=MyISAM DEFAULT CHARSET=latin1;
INSERT INTO t1 VALUES
(1,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample1',''),
(2,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample2',''),
(3,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample3',''),
(4,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample4',''),
(5,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample5',''),
(6,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample6',''),
(7,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample7',''),
(8,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample8',''),
(9,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample9','');
SELECT _id FROM t1;
_id
1
2
3
4
5
6
7
8
9
DELETE FROM t1 WHERE _id < 8;
SHOW TABLE STATUS LIKE 't1';
Name Engine Version Row_format Rows Avg_row_length Data_length Max_data_length Index_length Data_free Auto_increment Create_time Update_time Check_time Collation Checksum Create_options Comment
t1 MyISAM 10 Dynamic 2 # # # # 140 # # # # # #
CHECK TABLE t1 EXTENDED;
Table Op Msg_type Msg_text
test.t1 check status OK
OPTIMIZE TABLE t1;
Table Op Msg_type Msg_text
test.t1 optimize status OK
CHECK TABLE t1 EXTENDED;
Table Op Msg_type Msg_text
test.t1 check status OK
SHOW TABLE STATUS LIKE 't1';
Name Engine Version Row_format Rows Avg_row_length Data_length Max_data_length Index_length Data_free Auto_increment Create_time Update_time Check_time Collation Checksum Create_options Comment
t1 MyISAM 10 Dynamic 2 # # # # 0 # # # # # #
SELECT _id FROM t1;
_id
8
9
DROP TABLE t1;
CREATE TABLE t1 (
`_id`
int(11) NOT NULL default '0',
`url`
text,
`email`
text,
`description`
text,
`loverlap`
int(11) default NULL,
`roverlap`
int(11) default NULL,
`lneighbor_id`
int(11) default NULL,
`rneighbor_id`
int(11) default NULL,
`length_`
int(11) default NULL,
`sequence`
mediumtext,
`name`
text,
`_obj_class`
text NOT NULL,
PRIMARY KEY (`_id`),
UNIQUE KEY `sequence_name_index` (`name`(50)),
KEY (`length_`)
)
ENGINE=MyISAM DEFAULT CHARSET=latin1;
INSERT INTO t1 VALUES
(1,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample1',''),
(2,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample2',''),
(3,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample3',''),
(4,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample4',''),
(5,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample5',''),
(6,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample6',''),
(7,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample7',''),
(8,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample8',''),
(9,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'sample9','');
SELECT _id FROM t1;
_id
1
2
3
4
5
6
7
8
9
DELETE FROM t1 WHERE _id < 8;
SHOW TABLE STATUS LIKE 't1';
Name Engine Version Row_format Rows Avg_row_length Data_length Max_data_length Index_length Data_free Auto_increment Create_time Update_time Check_time Collation Checksum Create_options Comment
t1 MyISAM 10 Dynamic 2 # # # # 140 # # # # # #
CHECK TABLE t1 EXTENDED;
Table Op Msg_type Msg_text
test.t1 check status OK
REPAIR TABLE t1 QUICK;
Table Op Msg_type Msg_text
test.t1 repair status OK
CHECK TABLE t1 EXTENDED;
Table Op Msg_type Msg_text
test.t1 check status OK
SHOW TABLE STATUS LIKE 't1';
Name Engine Version Row_format Rows Avg_row_length Data_length Max_data_length Index_length Data_free Auto_increment Create_time Update_time Check_time Collation Checksum Create_options Comment
t1 MyISAM 10 Dynamic 2 # # # # 140 # # # # # #
SELECT _id FROM t1;
_id
8
9
DROP TABLE t1;
SET @@myisam_repair_threads=1;
SHOW VARIABLES LIKE 'myisam_repair%';
Variable_name Value
myisam_repair_threads 1
set storage_engine=MyISAM;
set storage_engine=MyISAM;
drop table if exists t1,t2,t3;
drop table if exists t1,t2,t3;
--- Testing varchar ---
--- Testing varchar ---
...
...
mysql-test/t/myisam.test
View file @
b15a23a7
...
@@ -755,6 +755,97 @@ SELECT * FROM t1;
...
@@ -755,6 +755,97 @@ SELECT * FROM t1;
DROP
TABLE
t1
;
DROP
TABLE
t1
;
#
#
# Bug#8283 - OPTIMIZE TABLE causes data loss
#
SET
@@
myisam_repair_threads
=
2
;
SHOW
VARIABLES
LIKE
'myisam_repair%'
;
#
# Test OPTIMIZE. This creates a new data file.
CREATE
TABLE
t1
(
`_id`
int
(
11
)
NOT
NULL
default
'0'
,
`url`
text
,
`email`
text
,
`description`
text
,
`loverlap`
int
(
11
)
default
NULL
,
`roverlap`
int
(
11
)
default
NULL
,
`lneighbor_id`
int
(
11
)
default
NULL
,
`rneighbor_id`
int
(
11
)
default
NULL
,
`length_`
int
(
11
)
default
NULL
,
`sequence`
mediumtext
,
`name`
text
,
`_obj_class`
text
NOT
NULL
,
PRIMARY
KEY
(
`_id`
),
UNIQUE
KEY
`sequence_name_index`
(
`name`
(
50
)),
KEY
(
`length_`
)
)
ENGINE
=
MyISAM
DEFAULT
CHARSET
=
latin1
;
#
INSERT
INTO
t1
VALUES
(
1
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
'sample1'
,
''
),
(
2
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
'sample2'
,
''
),
(
3
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
'sample3'
,
''
),
(
4
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
'sample4'
,
''
),
(
5
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
'sample5'
,
''
),
(
6
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
'sample6'
,
''
),
(
7
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
'sample7'
,
''
),
(
8
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
'sample8'
,
''
),
(
9
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
'sample9'
,
''
);
#
SELECT
_id
FROM
t1
;
DELETE
FROM
t1
WHERE
_id
<
8
;
--
replace_column
6
# 7 # 8 # 9 # 11 # 12 # 13 # 14 # 15 # 16 #
SHOW
TABLE
STATUS
LIKE
't1'
;
CHECK
TABLE
t1
EXTENDED
;
OPTIMIZE
TABLE
t1
;
CHECK
TABLE
t1
EXTENDED
;
--
replace_column
6
# 7 # 8 # 9 # 11 # 12 # 13 # 14 # 15 # 16 #
SHOW
TABLE
STATUS
LIKE
't1'
;
SELECT
_id
FROM
t1
;
DROP
TABLE
t1
;
#
# Test REPAIR QUICK. This retains the old data file.
CREATE
TABLE
t1
(
`_id`
int
(
11
)
NOT
NULL
default
'0'
,
`url`
text
,
`email`
text
,
`description`
text
,
`loverlap`
int
(
11
)
default
NULL
,
`roverlap`
int
(
11
)
default
NULL
,
`lneighbor_id`
int
(
11
)
default
NULL
,
`rneighbor_id`
int
(
11
)
default
NULL
,
`length_`
int
(
11
)
default
NULL
,
`sequence`
mediumtext
,
`name`
text
,
`_obj_class`
text
NOT
NULL
,
PRIMARY
KEY
(
`_id`
),
UNIQUE
KEY
`sequence_name_index`
(
`name`
(
50
)),
KEY
(
`length_`
)
)
ENGINE
=
MyISAM
DEFAULT
CHARSET
=
latin1
;
#
INSERT
INTO
t1
VALUES
(
1
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
'sample1'
,
''
),
(
2
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
'sample2'
,
''
),
(
3
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
'sample3'
,
''
),
(
4
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
'sample4'
,
''
),
(
5
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
'sample5'
,
''
),
(
6
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
'sample6'
,
''
),
(
7
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
'sample7'
,
''
),
(
8
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
'sample8'
,
''
),
(
9
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
'sample9'
,
''
);
#
SELECT
_id
FROM
t1
;
DELETE
FROM
t1
WHERE
_id
<
8
;
--
replace_column
6
# 7 # 8 # 9 # 11 # 12 # 13 # 14 # 15 # 16 #
SHOW
TABLE
STATUS
LIKE
't1'
;
CHECK
TABLE
t1
EXTENDED
;
REPAIR
TABLE
t1
QUICK
;
CHECK
TABLE
t1
EXTENDED
;
--
replace_column
6
# 7 # 8 # 9 # 11 # 12 # 13 # 14 # 15 # 16 #
SHOW
TABLE
STATUS
LIKE
't1'
;
SELECT
_id
FROM
t1
;
DROP
TABLE
t1
;
#
SET
@@
myisam_repair_threads
=
1
;
SHOW
VARIABLES
LIKE
'myisam_repair%'
;
# Test varchar
# Test varchar
#
#
...
...
mysys/mf_iocache.c
View file @
b15a23a7
...
@@ -70,7 +70,6 @@ static void my_aiowait(my_aio_result *result);
...
@@ -70,7 +70,6 @@ static void my_aiowait(my_aio_result *result);
#define IO_ROUND_UP(X) (((X)+IO_SIZE-1) & ~(IO_SIZE-1))
#define IO_ROUND_UP(X) (((X)+IO_SIZE-1) & ~(IO_SIZE-1))
#define IO_ROUND_DN(X) ( (X) & ~(IO_SIZE-1))
#define IO_ROUND_DN(X) ( (X) & ~(IO_SIZE-1))
/*
/*
Setup internal pointers inside IO_CACHE
Setup internal pointers inside IO_CACHE
...
@@ -502,65 +501,366 @@ int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count)
...
@@ -502,65 +501,366 @@ int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count)
DBUG_RETURN
(
0
);
DBUG_RETURN
(
0
);
}
}
#ifdef THREAD
#ifdef THREAD
/* Prepare IO_CACHE for shared use */
/*
void
init_io_cache_share
(
IO_CACHE
*
info
,
IO_CACHE_SHARE
*
s
,
uint
num_threads
)
Prepare IO_CACHE for shared use.
SYNOPSIS
init_io_cache_share()
read_cache A read cache. This will be copied for
every thread after setup.
cshare The share.
write_cache If non-NULL a write cache that is to be
synchronized with the read caches.
num_threads Number of threads sharing the cache
including the write thread if any.
DESCRIPTION
The shared cache is used so: One IO_CACHE is initialized with
init_io_cache(). This includes the allocation of a buffer. Then a
share is allocated and init_io_cache_share() is called with the io
cache and the share. Then the io cache is copied for each thread. So
every thread has its own copy of IO_CACHE. But the allocated buffer
is shared because cache->buffer is the same for all caches.
One thread reads data from the file into the buffer. All threads
read from the buffer, but every thread maintains its own set of
pointers into the buffer. When all threads have used up the buffer
contents, one of the threads reads the next block of data into the
buffer. To accomplish this, each thread enters the cache lock before
accessing the buffer. They wait in lock_io_cache() until all threads
joined the lock. The last thread entering the lock is in charge of
reading from file to buffer. It wakes all threads when done.
Synchronizing a write cache to the read caches works so: Whenever
the write buffer needs a flush, the write thread enters the lock and
waits for all other threads to enter the lock too. They do this when
they have used up the read buffer. When all threads are in the lock,
the write thread copies the write buffer to the read buffer and
wakes all threads.
share->running_threads is the number of threads not being in the
cache lock. When entering lock_io_cache() the number is decreased.
When the thread that fills the buffer enters unlock_io_cache() the
number is reset to the number of threads. The condition
running_threads == 0 means that all threads are in the lock. Bumping
up the number to the full count is non-intuitive. But increasing the
number by one for each thread that leaves the lock could lead to a
solo run of one thread. The last thread to join a lock reads from
file to buffer, wakes the other threads, processes the data in the
cache and enters the lock again. If no other thread left the lock
meanwhile, it would think it's the last one again and read the next
block...
The share has copies of 'error', 'buffer', 'read_end', and
'pos_in_file' from the thread that filled the buffer. We may not be
able to access this information directly from its cache because the
thread may be removed from the share before the variables could be
copied by all other threads. Or, if a write buffer is synchronized,
it would change its 'pos_in_file' after waking the other threads,
possibly before they could copy its value.
However, the 'buffer' variable in the share is for a synchronized
write cache. It needs to know where to put the data. Otherwise it
would need access to the read cache of one of the threads that is
not yet removed from the share.
RETURN
void
*/
void
init_io_cache_share
(
IO_CACHE
*
read_cache
,
IO_CACHE_SHARE
*
cshare
,
IO_CACHE
*
write_cache
,
uint
num_threads
)
{
{
DBUG_ASSERT
(
info
->
type
==
READ_CACHE
);
DBUG_ENTER
(
"init_io_cache_share"
);
pthread_mutex_init
(
&
s
->
mutex
,
MY_MUTEX_INIT_FAST
);
DBUG_PRINT
(
"io_cache_share"
,
(
"read_cache: 0x%lx share: 0x%lx "
pthread_cond_init
(
&
s
->
cond
,
0
);
"write_cache: 0x%lx threads: %u"
,
s
->
total
=
s
->
count
=
num_threads
-
1
;
read_cache
,
cshare
,
write_cache
,
num_threads
));
s
->
active
=
0
;
info
->
share
=
s
;
DBUG_ASSERT
(
num_threads
>
1
);
info
->
read_function
=
_my_b_read_r
;
DBUG_ASSERT
(
read_cache
->
type
==
READ_CACHE
);
info
->
current_pos
=
info
->
current_end
=
0
;
DBUG_ASSERT
(
!
write_cache
||
(
write_cache
->
type
==
WRITE_CACHE
));
pthread_mutex_init
(
&
cshare
->
mutex
,
MY_MUTEX_INIT_FAST
);
pthread_cond_init
(
&
cshare
->
cond
,
0
);
pthread_cond_init
(
&
cshare
->
cond_writer
,
0
);
cshare
->
running_threads
=
num_threads
;
cshare
->
total_threads
=
num_threads
;
cshare
->
error
=
0
;
/* Initialize. */
cshare
->
buffer
=
read_cache
->
buffer
;
cshare
->
read_end
=
NULL
;
/* See function comment of lock_io_cache(). */
cshare
->
pos_in_file
=
0
;
/* See function comment of lock_io_cache(). */
cshare
->
source_cache
=
write_cache
;
/* Can be NULL. */
read_cache
->
share
=
cshare
;
read_cache
->
read_function
=
_my_b_read_r
;
read_cache
->
current_pos
=
NULL
;
read_cache
->
current_end
=
NULL
;
if
(
write_cache
)
write_cache
->
share
=
cshare
;
DBUG_VOID_RETURN
;
}
}
/*
/*
Remove a thread from shared access to IO_CACHE
Remove a thread from shared access to IO_CACHE.
Every thread should do that on exit for not
to deadlock other threads
SYNOPSIS
remove_io_thread()
cache The IO_CACHE to be removed from the share.
NOTE
Every thread must do that on exit for not to deadlock other threads.
The last thread destroys the pthread resources.
A writer flushes its cache first.
RETURN
void
*/
*/
void
remove_io_thread
(
IO_CACHE
*
info
)
void
remove_io_thread
(
IO_CACHE
*
cache
)
{
{
IO_CACHE_SHARE
*
s
=
info
->
share
;
IO_CACHE_SHARE
*
cshare
=
cache
->
share
;
uint
total
;
DBUG_ENTER
(
"remove_io_thread"
);
/* If the writer goes, it needs to flush the write cache. */
if
(
cache
==
cshare
->
source_cache
)
flush_io_cache
(
cache
);
pthread_mutex_lock
(
&
s
->
mutex
);
pthread_mutex_lock
(
&
cshare
->
mutex
);
s
->
total
--
;
DBUG_PRINT
(
"io_cache_share"
,
(
"%s: 0x%lx"
,
if
(
!
s
->
count
--
)
(
cache
==
cshare
->
source_cache
)
?
pthread_cond_signal
(
&
s
->
cond
);
"writer"
:
"reader"
,
cache
));
pthread_mutex_unlock
(
&
s
->
mutex
);
/* Remove from share. */
total
=
--
cshare
->
total_threads
;
DBUG_PRINT
(
"io_cache_share"
,
(
"remaining threads: %u"
,
total
));
/* Detach from share. */
cache
->
share
=
NULL
;
/* If the writer goes, let the readers know. */
if
(
cache
==
cshare
->
source_cache
)
{
DBUG_PRINT
(
"io_cache_share"
,
(
"writer leaves"
));
cshare
->
source_cache
=
NULL
;
}
/* If all threads are waiting for me to join the lock, wake them. */
if
(
!--
cshare
->
running_threads
)
{
DBUG_PRINT
(
"io_cache_share"
,
(
"the last running thread leaves, wake all"
));
pthread_cond_signal
(
&
cshare
->
cond_writer
);
pthread_cond_broadcast
(
&
cshare
->
cond
);
}
pthread_mutex_unlock
(
&
cshare
->
mutex
);
if
(
!
total
)
{
DBUG_PRINT
(
"io_cache_share"
,
(
"last thread removed, destroy share"
));
pthread_cond_destroy
(
&
cshare
->
cond_writer
);
pthread_cond_destroy
(
&
cshare
->
cond
);
pthread_mutex_destroy
(
&
cshare
->
mutex
);
}
DBUG_VOID_RETURN
;
}
}
static
int
lock_io_cache
(
IO_CACHE
*
info
,
my_off_t
pos
)
/*
Lock IO cache and wait for all other threads to join.
SYNOPSIS
lock_io_cache()
cache The cache of the thread entering the lock.
pos File position of the block to read.
Unused for the write thread.
DESCRIPTION
Wait for all threads to finish with the current buffer. We want
all threads to proceed in concert. The last thread to join
lock_io_cache() will read the block from file and all threads start
to use it. Then they will join again for reading the next block.
The waiting threads detect a fresh buffer by comparing
cshare->pos_in_file with the position they want to process next.
Since the first block may start at position 0, we take
cshare->read_end as an additional condition. This variable is
initialized to NULL and will be set after a block of data is written
to the buffer.
RETURN
1 OK, lock in place, go ahead and read.
0 OK, unlocked, another thread did the read.
*/
static
int
lock_io_cache
(
IO_CACHE
*
cache
,
my_off_t
pos
)
{
{
int
total
;
IO_CACHE_SHARE
*
cshare
=
cache
->
share
;
IO_CACHE_SHARE
*
s
=
info
->
share
;
DBUG_ENTER
(
"lock_io_cache"
)
;
pthread_mutex_lock
(
&
s
->
mutex
);
/* Enter the lock. */
if
(
!
s
->
count
)
pthread_mutex_lock
(
&
cshare
->
mutex
);
cshare
->
running_threads
--
;
DBUG_PRINT
(
"io_cache_share"
,
(
"%s: 0x%lx pos: %lu running: %u"
,
(
cache
==
cshare
->
source_cache
)
?
"writer"
:
"reader"
,
cache
,
(
ulong
)
pos
,
cshare
->
running_threads
));
if
(
cshare
->
source_cache
)
{
{
s
->
count
=
s
->
total
;
/* A write cache is synchronized to the read caches. */
return
1
;
if
(
cache
==
cshare
->
source_cache
)
{
/* The writer waits until all readers are here. */
while
(
cshare
->
running_threads
)
{
DBUG_PRINT
(
"io_cache_share"
,
(
"writer waits in lock"
));
pthread_cond_wait
(
&
cshare
->
cond_writer
,
&
cshare
->
mutex
);
}
}
DBUG_PRINT
(
"io_cache_share"
,
(
"writer awoke, going to copy"
));
total
=
s
->
total
;
/* Stay locked. Leave the lock later by unlock_io_cache(). */
s
->
count
--
;
DBUG_RETURN
(
1
);
while
(
!
s
->
active
||
s
->
active
->
pos_in_file
<
pos
)
}
pthread_cond_wait
(
&
s
->
cond
,
&
s
->
mutex
);
if
(
s
->
total
<
total
&&
/* The last thread wakes the writer. */
(
!
s
->
active
||
s
->
active
->
pos_in_file
<
pos
))
if
(
!
cshare
->
running_threads
)
return
1
;
{
DBUG_PRINT
(
"io_cache_share"
,
(
"waking writer"
));
pthread_cond_signal
(
&
cshare
->
cond_writer
);
}
pthread_mutex_unlock
(
&
s
->
mutex
);
/*
return
0
;
Readers wait until the data is copied from the writer. Another
reason to stop waiting is the removal of the write thread. If this
happens, we leave the lock with old data in the buffer.
*/
while
((
!
cshare
->
read_end
||
(
cshare
->
pos_in_file
<
pos
))
&&
cshare
->
source_cache
)
{
DBUG_PRINT
(
"io_cache_share"
,
(
"reader waits in lock"
));
pthread_cond_wait
(
&
cshare
->
cond
,
&
cshare
->
mutex
);
}
/*
If the writer was removed from the share while this thread was
asleep, we need to simulate an EOF condition. The writer cannot
reset the share variables as they might still be in use by readers
of the last block. When we awake here then because the last
joining thread signalled us. If the writer is not the last, it
will not signal. So it is safe to clear the buffer here.
*/
if
(
!
cshare
->
read_end
||
(
cshare
->
pos_in_file
<
pos
))
{
DBUG_PRINT
(
"io_cache_share"
,
(
"reader found writer removed. EOF"
));
cshare
->
read_end
=
cshare
->
buffer
;
/* Empty buffer. */
cshare
->
error
=
0
;
/* EOF is not an error. */
}
}
else
{
/*
There are read caches only. The last thread arriving in
lock_io_cache() continues with a locked cache and reads the block.
*/
if
(
!
cshare
->
running_threads
)
{
DBUG_PRINT
(
"io_cache_share"
,
(
"last thread joined, going to read"
));
/* Stay locked. Leave the lock later by unlock_io_cache(). */
DBUG_RETURN
(
1
);
}
/*
All other threads wait until the requested block is read by the
last thread arriving. Another reason to stop waiting is the
removal of a thread. If this leads to all threads being in the
lock, we have to continue also. The first of the awaken threads
will then do the read.
*/
while
((
!
cshare
->
read_end
||
(
cshare
->
pos_in_file
<
pos
))
&&
cshare
->
running_threads
)
{
DBUG_PRINT
(
"io_cache_share"
,
(
"reader waits in lock"
));
pthread_cond_wait
(
&
cshare
->
cond
,
&
cshare
->
mutex
);
}
/* If the block is not yet read, continue with a locked cache and read. */
if
(
!
cshare
->
read_end
||
(
cshare
->
pos_in_file
<
pos
))
{
DBUG_PRINT
(
"io_cache_share"
,
(
"reader awoke, going to read"
));
/* Stay locked. Leave the lock later by unlock_io_cache(). */
DBUG_RETURN
(
1
);
}
/* Another thread did read the block already. */
}
DBUG_PRINT
(
"io_cache_share"
,
(
"reader awoke, going to process %u bytes"
,
cshare
->
read_end
?
(
uint
)
(
cshare
->
read_end
-
cshare
->
buffer
)
:
0
));
/*
Leave the lock. Do not call unlock_io_cache() later. The thread that
filled the buffer did this and marked all threads as running.
*/
pthread_mutex_unlock
(
&
cshare
->
mutex
);
DBUG_RETURN
(
0
);
}
}
static
void
unlock_io_cache
(
IO_CACHE
*
info
)
/*
Unlock IO cache.
SYNOPSIS
unlock_io_cache()
cache The cache of the thread leaving the lock.
NOTE
This is called by the thread that filled the buffer. It marks all
threads as running and awakes them. This must not be done by any
other thread.
Do not signal cond_writer. Either there is no writer or the writer
is the only one who can call this function.
The reason for resetting running_threads to total_threads before
waking all other threads is that it could be possible that this
thread is so fast with processing the buffer that it enters the lock
before even one other thread has left it. If every awoken thread
would increase running_threads by one, this thread could think that
he is again the last to join and would not wait for the other
threads to process the data.
RETURN
void
*/
static
void
unlock_io_cache
(
IO_CACHE
*
cache
)
{
{
pthread_cond_broadcast
(
&
info
->
share
->
cond
);
IO_CACHE_SHARE
*
cshare
=
cache
->
share
;
pthread_mutex_unlock
(
&
info
->
share
->
mutex
);
DBUG_ENTER
(
"unlock_io_cache"
);
DBUG_PRINT
(
"io_cache_share"
,
(
"%s: 0x%lx pos: %lu running: %u"
,
(
cache
==
cshare
->
source_cache
)
?
"writer"
:
"reader"
,
cache
,
(
ulong
)
cshare
->
pos_in_file
,
cshare
->
total_threads
));
cshare
->
running_threads
=
cshare
->
total_threads
;
pthread_cond_broadcast
(
&
cshare
->
cond
);
pthread_mutex_unlock
(
&
cshare
->
mutex
);
DBUG_VOID_RETURN
;
}
}
...
@@ -569,7 +869,7 @@ static void unlock_io_cache(IO_CACHE *info)
...
@@ -569,7 +869,7 @@ static void unlock_io_cache(IO_CACHE *info)
SYNOPSIS
SYNOPSIS
_my_b_read_r()
_my_b_read_r()
info
IO_CACHE pointer
cache
IO_CACHE pointer
Buffer Buffer to retrieve count bytes from file
Buffer Buffer to retrieve count bytes from file
Count Number of bytes to read into Buffer
Count Number of bytes to read into Buffer
...
@@ -581,7 +881,7 @@ static void unlock_io_cache(IO_CACHE *info)
...
@@ -581,7 +881,7 @@ static void unlock_io_cache(IO_CACHE *info)
It works as follows: when a thread tries to read from a file (that
It works as follows: when a thread tries to read from a file (that
is, after using all the data from the (shared) buffer), it just
is, after using all the data from the (shared) buffer), it just
hangs on lock_io_cache(), wating for other threads. When the very
hangs on lock_io_cache(), wa
i
ting for other threads. When the very
last thread attempts a read, lock_io_cache() returns 1, the thread
last thread attempts a read, lock_io_cache() returns 1, the thread
does actual IO and unlock_io_cache(), which signals all the waiting
does actual IO and unlock_io_cache(), which signals all the waiting
threads that data is in the buffer.
threads that data is in the buffer.
...
@@ -601,16 +901,17 @@ static void unlock_io_cache(IO_CACHE *info)
...
@@ -601,16 +901,17 @@ static void unlock_io_cache(IO_CACHE *info)
1 Error: can't read requested characters
1 Error: can't read requested characters
*/
*/
int
_my_b_read_r
(
register
IO_CACHE
*
info
,
byte
*
Buffer
,
uint
Count
)
int
_my_b_read_r
(
register
IO_CACHE
*
cache
,
byte
*
Buffer
,
uint
Count
)
{
{
my_off_t
pos_in_file
;
my_off_t
pos_in_file
;
uint
length
,
diff_length
,
left_length
;
uint
length
,
diff_length
,
left_length
;
IO_CACHE_SHARE
*
cshare
=
cache
->
share
;
DBUG_ENTER
(
"_my_b_read_r"
);
DBUG_ENTER
(
"_my_b_read_r"
);
if
((
left_length
=
(
uint
)
(
info
->
read_end
-
info
->
read_pos
)))
if
((
left_length
=
(
uint
)
(
cache
->
read_end
-
cache
->
read_pos
)))
{
{
DBUG_ASSERT
(
Count
>=
left_length
);
/* User is not using my_b_read() */
DBUG_ASSERT
(
Count
>=
left_length
);
/* User is not using my_b_read() */
memcpy
(
Buffer
,
info
->
read_pos
,
(
size_t
)
(
left_length
));
memcpy
(
Buffer
,
cache
->
read_pos
,
(
size_t
)
(
left_length
));
Buffer
+=
left_length
;
Buffer
+=
left_length
;
Count
-=
left_length
;
Count
-=
left_length
;
}
}
...
@@ -618,55 +919,133 @@ int _my_b_read_r(register IO_CACHE *info, byte *Buffer, uint Count)
...
@@ -618,55 +919,133 @@ int _my_b_read_r(register IO_CACHE *info, byte *Buffer, uint Count)
{
{
int
cnt
,
len
;
int
cnt
,
len
;
pos_in_file
=
info
->
pos_in_file
+
(
info
->
read_end
-
info
->
buffer
);
pos_in_file
=
cache
->
pos_in_file
+
(
cache
->
read_end
-
cache
->
buffer
);
diff_length
=
(
uint
)
(
pos_in_file
&
(
IO_SIZE
-
1
));
diff_length
=
(
uint
)
(
pos_in_file
&
(
IO_SIZE
-
1
));
length
=
IO_ROUND_UP
(
Count
+
diff_length
)
-
diff_length
;
length
=
IO_ROUND_UP
(
Count
+
diff_length
)
-
diff_length
;
length
=
(
length
<=
info
->
read_length
)
?
length
=
((
length
<=
cache
->
read_length
)
?
length
+
IO_ROUND_DN
(
info
->
read_length
-
length
)
:
length
+
IO_ROUND_DN
(
cache
->
read_length
-
length
)
:
length
-
IO_ROUND_UP
(
length
-
info
->
read_length
)
;
length
-
IO_ROUND_UP
(
length
-
cache
->
read_length
))
;
if
(
info
->
type
!=
READ_FIFO
&&
if
(
cache
->
type
!=
READ_FIFO
&&
(
length
>
(
info
->
end_of_file
-
pos_in_file
)))
(
length
>
(
cache
->
end_of_file
-
pos_in_file
)))
length
=
(
uint
)
(
info
->
end_of_file
-
pos_in_file
);
length
=
(
uint
)
(
cache
->
end_of_file
-
pos_in_file
);
if
(
length
==
0
)
if
(
length
==
0
)
{
{
info
->
error
=
(
int
)
left_length
;
cache
->
error
=
(
int
)
left_length
;
DBUG_RETURN
(
1
);
DBUG_RETURN
(
1
);
}
}
if
(
lock_io_cache
(
info
,
pos_in_file
))
if
(
lock_io_cache
(
cache
,
pos_in_file
))
{
{
info
->
share
->
active
=
info
;
/* With a synchronized write/read cache we won't come here... */
if
(
info
->
seek_not_done
)
/* File touched, do seek */
DBUG_ASSERT
(
!
cshare
->
source_cache
);
VOID
(
my_seek
(
info
->
file
,
pos_in_file
,
MY_SEEK_SET
,
MYF
(
0
)));
/*
len
=
(
int
)
my_read
(
info
->
file
,
info
->
buffer
,
length
,
info
->
myflags
);
... unless the writer has gone before this thread entered the
info
->
read_end
=
info
->
buffer
+
(
len
==
-
1
?
0
:
len
);
lock. Simulate EOF in this case. It can be distinguished by
info
->
error
=
(
len
==
(
int
)
length
?
0
:
len
);
cache->file.
info
->
pos_in_file
=
pos_in_file
;
*/
unlock_io_cache
(
info
);
if
(
cache
->
file
<
0
)
len
=
0
;
else
{
if
(
cache
->
seek_not_done
)
/* File touched, do seek */
VOID
(
my_seek
(
cache
->
file
,
pos_in_file
,
MY_SEEK_SET
,
MYF
(
0
)));
len
=
(
int
)
my_read
(
cache
->
file
,
cache
->
buffer
,
length
,
cache
->
myflags
);
}
DBUG_PRINT
(
"io_cache_share"
,
(
"read %d bytes"
,
len
));
cache
->
read_end
=
cache
->
buffer
+
(
len
==
-
1
?
0
:
len
);
cache
->
error
=
(
len
==
(
int
)
length
?
0
:
len
);
cache
->
pos_in_file
=
pos_in_file
;
/* Copy important values to the share. */
cshare
->
error
=
cache
->
error
;
cshare
->
read_end
=
cache
->
read_end
;
cshare
->
pos_in_file
=
pos_in_file
;
/* Mark all threads as running and wake them. */
unlock_io_cache
(
cache
);
}
}
else
else
{
{
info
->
error
=
info
->
share
->
active
->
error
;
/*
info
->
read_end
=
info
->
share
->
active
->
read_end
;
With a synchronized write/read cache readers always come here.
info
->
pos_in_file
=
info
->
share
->
active
->
pos_in_file
;
Copy important values from the share.
len
=
(
int
)
(
info
->
error
==
-
1
?
-
1
:
info
->
read_end
-
info
->
buffer
);
*/
cache
->
error
=
cshare
->
error
;
cache
->
read_end
=
cshare
->
read_end
;
cache
->
pos_in_file
=
cshare
->
pos_in_file
;
len
=
(
int
)
((
cache
->
error
==
-
1
)
?
-
1
:
cache
->
read_end
-
cache
->
buffer
);
}
}
info
->
read_pos
=
info
->
buffer
;
cache
->
read_pos
=
cache
->
buffer
;
info
->
seek_not_done
=
0
;
cache
->
seek_not_done
=
0
;
if
(
len
<=
0
)
if
(
len
<=
0
)
{
{
info
->
error
=
(
int
)
left_length
;
DBUG_PRINT
(
"io_cache_share"
,
(
"reader error. len %d left %u"
,
len
,
left_length
));
cache
->
error
=
(
int
)
left_length
;
DBUG_RETURN
(
1
);
DBUG_RETURN
(
1
);
}
}
cnt
=
((
uint
)
len
>
Count
)
?
(
int
)
Count
:
len
;
cnt
=
((
uint
)
len
>
Count
)
?
(
int
)
Count
:
len
;
memcpy
(
Buffer
,
info
->
read_pos
,
(
size_t
)
cnt
);
memcpy
(
Buffer
,
cache
->
read_pos
,
(
size_t
)
cnt
);
Count
-=
cnt
;
Count
-=
cnt
;
Buffer
+=
cnt
;
Buffer
+=
cnt
;
left_length
+=
cnt
;
left_length
+=
cnt
;
info
->
read_pos
+=
cnt
;
cache
->
read_pos
+=
cnt
;
}
}
DBUG_RETURN
(
0
);
DBUG_RETURN
(
0
);
}
}
#endif
/*
Copy data from write cache to read cache.
SYNOPSIS
copy_to_read_buffer()
write_cache The write cache.
write_buffer The source of data, mostly the cache buffer.
write_length The number of bytes to copy.
NOTE
The write thread will wait for all read threads to join the cache
lock. Then it copies the data over and wakes the read threads.
RETURN
void
*/
static
void
copy_to_read_buffer
(
IO_CACHE
*
write_cache
,
const
byte
*
write_buffer
,
uint
write_length
)
{
IO_CACHE_SHARE
*
cshare
=
write_cache
->
share
;
DBUG_ASSERT
(
cshare
->
source_cache
==
write_cache
);
/*
write_length is usually less or equal to buffer_length.
It can be bigger if _my_b_write() is called with a big length.
*/
while
(
write_length
)
{
uint
copy_length
=
min
(
write_length
,
write_cache
->
buffer_length
);
int
__attribute__
((
unused
))
rc
;
rc
=
lock_io_cache
(
write_cache
,
write_cache
->
pos_in_file
);
/* The writing thread does always have the lock when it awakes. */
DBUG_ASSERT
(
rc
);
memcpy
(
cshare
->
buffer
,
write_buffer
,
copy_length
);
cshare
->
error
=
0
;
cshare
->
read_end
=
cshare
->
buffer
+
copy_length
;
cshare
->
pos_in_file
=
write_cache
->
pos_in_file
;
/* Mark all threads as running and wake them. */
unlock_io_cache
(
write_cache
);
write_buffer
+=
copy_length
;
write_length
-=
copy_length
;
}
}
#endif
/*THREAD*/
/*
/*
...
@@ -1018,6 +1397,7 @@ int _my_b_write(register IO_CACHE *info, const byte *Buffer, uint Count)
...
@@ -1018,6 +1397,7 @@ int _my_b_write(register IO_CACHE *info, const byte *Buffer, uint Count)
Buffer
+=
rest_length
;
Buffer
+=
rest_length
;
Count
-=
rest_length
;
Count
-=
rest_length
;
info
->
write_pos
+=
rest_length
;
info
->
write_pos
+=
rest_length
;
if
(
my_b_flush_io_cache
(
info
,
1
))
if
(
my_b_flush_io_cache
(
info
,
1
))
return
1
;
return
1
;
if
(
Count
>=
IO_SIZE
)
if
(
Count
>=
IO_SIZE
)
...
@@ -1030,6 +1410,23 @@ int _my_b_write(register IO_CACHE *info, const byte *Buffer, uint Count)
...
@@ -1030,6 +1410,23 @@ int _my_b_write(register IO_CACHE *info, const byte *Buffer, uint Count)
}
}
if
(
my_write
(
info
->
file
,
Buffer
,(
uint
)
length
,
info
->
myflags
|
MY_NABP
))
if
(
my_write
(
info
->
file
,
Buffer
,(
uint
)
length
,
info
->
myflags
|
MY_NABP
))
return
info
->
error
=
-
1
;
return
info
->
error
=
-
1
;
#ifdef THREAD
/*
In case of a shared I/O cache with a writer we normally do direct
write cache to read cache copy. Simulate this here by direct
caller buffer to read cache copy. Do it after the write so that
the cache readers actions on the flushed part can go in parallel
with the write of the extra stuff. copy_to_read_buffer()
synchronizes writer and readers so that after this call the
readers can act on the extra stuff while the writer can go ahead
and prepare the next output. copy_to_read_buffer() relies on
info->pos_in_file.
*/
if
(
info
->
share
)
copy_to_read_buffer
(
info
,
Buffer
,
length
);
#endif
Count
-=
length
;
Count
-=
length
;
Buffer
+=
length
;
Buffer
+=
length
;
info
->
pos_in_file
+=
length
;
info
->
pos_in_file
+=
length
;
...
@@ -1050,6 +1447,14 @@ int my_b_append(register IO_CACHE *info, const byte *Buffer, uint Count)
...
@@ -1050,6 +1447,14 @@ int my_b_append(register IO_CACHE *info, const byte *Buffer, uint Count)
{
{
uint
rest_length
,
length
;
uint
rest_length
,
length
;
#ifdef THREAD
/*
Assert that we cannot come here with a shared cache. If we do one
day, we might need to add a call to copy_to_read_buffer().
*/
DBUG_ASSERT
(
!
info
->
share
);
#endif
lock_append_buffer
(
info
);
lock_append_buffer
(
info
);
rest_length
=
(
uint
)
(
info
->
write_end
-
info
->
write_pos
);
rest_length
=
(
uint
)
(
info
->
write_end
-
info
->
write_pos
);
if
(
Count
<=
rest_length
)
if
(
Count
<=
rest_length
)
...
@@ -1110,6 +1515,14 @@ int my_block_write(register IO_CACHE *info, const byte *Buffer, uint Count,
...
@@ -1110,6 +1515,14 @@ int my_block_write(register IO_CACHE *info, const byte *Buffer, uint Count,
uint
length
;
uint
length
;
int
error
=
0
;
int
error
=
0
;
#ifdef THREAD
/*
Assert that we cannot come here with a shared cache. If we do one
day, we might need to add a call to copy_to_read_buffer().
*/
DBUG_ASSERT
(
!
info
->
share
);
#endif
if
(
pos
<
info
->
pos_in_file
)
if
(
pos
<
info
->
pos_in_file
)
{
{
/* Of no overlap, write everything without buffering */
/* Of no overlap, write everything without buffering */
...
@@ -1186,6 +1599,17 @@ int my_b_flush_io_cache(IO_CACHE *info, int need_append_buffer_lock)
...
@@ -1186,6 +1599,17 @@ int my_b_flush_io_cache(IO_CACHE *info, int need_append_buffer_lock)
if
((
length
=
(
uint
)
(
info
->
write_pos
-
info
->
write_buffer
)))
if
((
length
=
(
uint
)
(
info
->
write_pos
-
info
->
write_buffer
)))
{
{
#ifdef THREAD
/*
In case of a shared I/O cache with a writer we do direct write
cache to read cache copy. Do it before the write here so that
the readers can work in parallel with the write.
copy_to_read_buffer() relies on info->pos_in_file.
*/
if
(
info
->
share
)
copy_to_read_buffer
(
info
,
info
->
write_buffer
,
length
);
#endif
pos_in_file
=
info
->
pos_in_file
;
pos_in_file
=
info
->
pos_in_file
;
/*
/*
If we have append cache, we always open the file with
If we have append cache, we always open the file with
...
@@ -1265,16 +1689,10 @@ int end_io_cache(IO_CACHE *info)
...
@@ -1265,16 +1689,10 @@ int end_io_cache(IO_CACHE *info)
#ifdef THREAD
#ifdef THREAD
/*
/*
if IO_CACHE is shared between several threads, only one
Every thread must call remove_io_thread(). The last one destroys
thread needs to call end_io_cache() - just as init_io_cache()
the share elements.
should be called only once and then memcopy'ed
*/
*/
if
(
info
->
share
)
DBUG_ASSERT
(
!
info
->
share
||
!
info
->
share
->
total_threads
);
{
pthread_cond_destroy
(
&
info
->
share
->
cond
);
pthread_mutex_destroy
(
&
info
->
share
->
mutex
);
info
->
share
=
0
;
}
#endif
#endif
if
((
pre_close
=
info
->
pre_close
))
if
((
pre_close
=
info
->
pre_close
))
...
...
storage/myisam/mi_check.c
View file @
b15a23a7
...
@@ -16,6 +16,31 @@
...
@@ -16,6 +16,31 @@
/* Describe, check and repair of MyISAM tables */
/* Describe, check and repair of MyISAM tables */
/*
About checksum calculation.
There are two types of checksums. Table checksum and row checksum.
Row checksum is an additional byte at the end of dynamic length
records. It must be calculated if the table is configured for them.
Otherwise they must not be used. The variable
MYISAM_SHARE::calc_checksum determines if row checksums are used.
MI_INFO::checksum is used as temporary storage during row handling.
For parallel repair we must assure that only one thread can use this
variable. There is no problem on the write side as this is done by one
thread only. But when checking a record after read this could go
wrong. But since all threads read through a common read buffer, it is
sufficient if only one thread checks it.
Table checksum is an eight byte value in the header of the index file.
It can be calculated even if row checksums are not used. The variable
MI_CHECK::glob_crc is calculated over all records.
MI_SORT_PARAM::calc_checksum determines if this should be done. This
variable is not part of MI_CHECK because it must be set per thread for
parallel repair. The global glob_crc must be changed by one thread
only. And it is sufficient to calculate the checksum once only.
*/
#include "ftdefs.h"
#include "ftdefs.h"
#include <m_ctype.h>
#include <m_ctype.h>
#include <stdarg.h>
#include <stdarg.h>
...
@@ -41,8 +66,7 @@ static int chk_index(MI_CHECK *param, MI_INFO *info,MI_KEYDEF *keyinfo,
...
@@ -41,8 +66,7 @@ static int chk_index(MI_CHECK *param, MI_INFO *info,MI_KEYDEF *keyinfo,
ha_checksum
*
key_checksum
,
uint
level
);
ha_checksum
*
key_checksum
,
uint
level
);
static
uint
isam_key_length
(
MI_INFO
*
info
,
MI_KEYDEF
*
keyinfo
);
static
uint
isam_key_length
(
MI_INFO
*
info
,
MI_KEYDEF
*
keyinfo
);
static
ha_checksum
calc_checksum
(
ha_rows
count
);
static
ha_checksum
calc_checksum
(
ha_rows
count
);
static
int
writekeys
(
MI_CHECK
*
param
,
MI_INFO
*
info
,
byte
*
buff
,
static
int
writekeys
(
MI_SORT_PARAM
*
sort_param
);
my_off_t
filepos
);
static
int
sort_one_index
(
MI_CHECK
*
param
,
MI_INFO
*
info
,
MI_KEYDEF
*
keyinfo
,
static
int
sort_one_index
(
MI_CHECK
*
param
,
MI_INFO
*
info
,
MI_KEYDEF
*
keyinfo
,
my_off_t
pagepos
,
File
new_file
);
my_off_t
pagepos
,
File
new_file
);
static
int
sort_key_read
(
MI_SORT_PARAM
*
sort_param
,
void
*
key
);
static
int
sort_key_read
(
MI_SORT_PARAM
*
sort_param
,
void
*
key
);
...
@@ -1102,7 +1126,8 @@ int chk_data_link(MI_CHECK *param, MI_INFO *info,int extend)
...
@@ -1102,7 +1126,8 @@ int chk_data_link(MI_CHECK *param, MI_INFO *info,int extend)
goto
err
;
goto
err
;
start_recpos
=
pos
;
start_recpos
=
pos
;
splits
++
;
splits
++
;
VOID
(
_mi_pack_get_block_info
(
info
,
&
block_info
,
-
1
,
start_recpos
));
VOID
(
_mi_pack_get_block_info
(
info
,
&
info
->
bit_buff
,
&
block_info
,
&
info
->
rec_buff
,
-
1
,
start_recpos
));
pos
=
block_info
.
filepos
+
block_info
.
rec_len
;
pos
=
block_info
.
filepos
+
block_info
.
rec_len
;
if
(
block_info
.
rec_len
<
(
uint
)
info
->
s
->
min_pack_length
||
if
(
block_info
.
rec_len
<
(
uint
)
info
->
s
->
min_pack_length
||
block_info
.
rec_len
>
(
uint
)
info
->
s
->
max_pack_length
)
block_info
.
rec_len
>
(
uint
)
info
->
s
->
max_pack_length
)
...
@@ -1116,7 +1141,8 @@ int chk_data_link(MI_CHECK *param, MI_INFO *info,int extend)
...
@@ -1116,7 +1141,8 @@ int chk_data_link(MI_CHECK *param, MI_INFO *info,int extend)
if
(
_mi_read_cache
(
&
param
->
read_cache
,(
byte
*
)
info
->
rec_buff
,
if
(
_mi_read_cache
(
&
param
->
read_cache
,(
byte
*
)
info
->
rec_buff
,
block_info
.
filepos
,
block_info
.
rec_len
,
READING_NEXT
))
block_info
.
filepos
,
block_info
.
rec_len
,
READING_NEXT
))
goto
err
;
goto
err
;
if
(
_mi_pack_rec_unpack
(
info
,
record
,
info
->
rec_buff
,
block_info
.
rec_len
))
if
(
_mi_pack_rec_unpack
(
info
,
&
info
->
bit_buff
,
record
,
info
->
rec_buff
,
block_info
.
rec_len
))
{
{
mi_check_print_error
(
param
,
"Found wrong record at %s"
,
mi_check_print_error
(
param
,
"Found wrong record at %s"
,
llstr
(
start_recpos
,
llbuff
));
llstr
(
start_recpos
,
llbuff
));
...
@@ -1400,7 +1426,7 @@ int mi_repair(MI_CHECK *param, register MI_INFO *info,
...
@@ -1400,7 +1426,7 @@ int mi_repair(MI_CHECK *param, register MI_INFO *info,
info
->
state
->
empty
=
0
;
info
->
state
->
empty
=
0
;
param
->
glob_crc
=
0
;
param
->
glob_crc
=
0
;
if
(
param
->
testflag
&
T_CALC_CHECKSUM
)
if
(
param
->
testflag
&
T_CALC_CHECKSUM
)
param
->
calc_checksum
=
1
;
sort_param
.
calc_checksum
=
1
;
info
->
update
=
(
short
)
(
HA_STATE_CHANGED
|
HA_STATE_ROW_CHANGED
);
info
->
update
=
(
short
)
(
HA_STATE_CHANGED
|
HA_STATE_ROW_CHANGED
);
...
@@ -1429,7 +1455,7 @@ int mi_repair(MI_CHECK *param, register MI_INFO *info,
...
@@ -1429,7 +1455,7 @@ int mi_repair(MI_CHECK *param, register MI_INFO *info,
/* Re-create all keys, which are set in key_map. */
/* Re-create all keys, which are set in key_map. */
while
(
!
(
error
=
sort_get_next_record
(
&
sort_param
)))
while
(
!
(
error
=
sort_get_next_record
(
&
sort_param
)))
{
{
if
(
writekeys
(
param
,
info
,(
byte
*
)
sort_param
.
record
,
sort_param
.
filepos
))
if
(
writekeys
(
&
sort_param
))
{
{
if
(
my_errno
!=
HA_ERR_FOUND_DUPP_KEY
)
if
(
my_errno
!=
HA_ERR_FOUND_DUPP_KEY
)
goto
err
;
goto
err
;
...
@@ -1574,11 +1600,13 @@ int mi_repair(MI_CHECK *param, register MI_INFO *info,
...
@@ -1574,11 +1600,13 @@ int mi_repair(MI_CHECK *param, register MI_INFO *info,
/* Uppate keyfile when doing repair */
/* Uppate keyfile when doing repair */
static
int
writekeys
(
MI_CHECK
*
param
,
register
MI_INFO
*
info
,
byte
*
buff
,
static
int
writekeys
(
MI_SORT_PARAM
*
sort_param
)
my_off_t
filepos
)
{
{
register
uint
i
;
register
uint
i
;
uchar
*
key
;
uchar
*
key
;
MI_INFO
*
info
=
sort_param
->
sort_info
->
info
;
byte
*
buff
=
sort_param
->
record
;
my_off_t
filepos
=
sort_param
->
filepos
;
DBUG_ENTER
(
"writekeys"
);
DBUG_ENTER
(
"writekeys"
);
key
=
info
->
lastkey
+
info
->
s
->
base
.
max_key_length
;
key
=
info
->
lastkey
+
info
->
s
->
base
.
max_key_length
;
...
@@ -1632,8 +1660,8 @@ static int writekeys(MI_CHECK *param, register MI_INFO *info, byte *buff,
...
@@ -1632,8 +1660,8 @@ static int writekeys(MI_CHECK *param, register MI_INFO *info, byte *buff,
}
}
}
}
/* Remove checksum that was added to glob_crc in sort_get_next_record */
/* Remove checksum that was added to glob_crc in sort_get_next_record */
if
(
param
->
calc_checksum
)
if
(
sort_
param
->
calc_checksum
)
param
->
glob_crc
-=
info
->
checksum
;
sort_param
->
sort_info
->
param
->
glob_crc
-=
info
->
checksum
;
DBUG_PRINT
(
"error"
,(
"errno: %d"
,
my_errno
));
DBUG_PRINT
(
"error"
,(
"errno: %d"
,
my_errno
));
DBUG_RETURN
(
-
1
);
DBUG_RETURN
(
-
1
);
}
/* writekeys */
}
/* writekeys */
...
@@ -2140,7 +2168,7 @@ int mi_repair_by_sort(MI_CHECK *param, register MI_INFO *info,
...
@@ -2140,7 +2168,7 @@ int mi_repair_by_sort(MI_CHECK *param, register MI_INFO *info,
del
=
info
->
state
->
del
;
del
=
info
->
state
->
del
;
param
->
glob_crc
=
0
;
param
->
glob_crc
=
0
;
if
(
param
->
testflag
&
T_CALC_CHECKSUM
)
if
(
param
->
testflag
&
T_CALC_CHECKSUM
)
param
->
calc_checksum
=
1
;
sort_param
.
calc_checksum
=
1
;
rec_per_key_part
=
param
->
rec_per_key_part
;
rec_per_key_part
=
param
->
rec_per_key_part
;
for
(
sort_param
.
key
=
0
;
sort_param
.
key
<
share
->
base
.
keys
;
for
(
sort_param
.
key
=
0
;
sort_param
.
key
<
share
->
base
.
keys
;
...
@@ -2226,7 +2254,8 @@ int mi_repair_by_sort(MI_CHECK *param, register MI_INFO *info,
...
@@ -2226,7 +2254,8 @@ int mi_repair_by_sort(MI_CHECK *param, register MI_INFO *info,
param
->
retry_repair
=
1
;
param
->
retry_repair
=
1
;
goto
err
;
goto
err
;
}
}
param
->
calc_checksum
=
0
;
/* No need to calc glob_crc */
/* No need to calculate checksum again. */
sort_param
.
calc_checksum
=
0
;
free_root
(
&
sort_param
.
wordroot
,
MYF
(
0
));
free_root
(
&
sort_param
.
wordroot
,
MYF
(
0
));
/* Set for next loop */
/* Set for next loop */
...
@@ -2390,6 +2419,28 @@ int mi_repair_by_sort(MI_CHECK *param, register MI_INFO *info,
...
@@ -2390,6 +2419,28 @@ int mi_repair_by_sort(MI_CHECK *param, register MI_INFO *info,
Each key is handled by a separate thread.
Each key is handled by a separate thread.
TODO: make a number of threads a parameter
TODO: make a number of threads a parameter
In parallel repair we use one thread per index. There are two modes:
Quick
Only the indexes are rebuilt. All threads share a read buffer.
Every thread that needs fresh data in the buffer enters the shared
cache lock. The last thread joining the lock reads the buffer from
the data file and wakes all other threads.
Non-quick
The data file is rebuilt and all indexes are rebuilt to point to
the new record positions. One thread is the master thread. It
reads from the old data file and writes to the new data file. It
also creates one of the indexes. The other threads read from a
buffer which is filled by the master. If they need fresh data,
they enter the shared cache lock. If the masters write buffer is
full, it flushes it to the new data file and enters the shared
cache lock too. When all threads joined in the lock, the master
copies its write buffer to the read buffer for the other threads
and wakes them.
RESULT
RESULT
0 ok
0 ok
<>0 Error
<>0 Error
...
@@ -2412,6 +2463,7 @@ int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info,
...
@@ -2412,6 +2463,7 @@ int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info,
ulong
*
rec_per_key_part
;
ulong
*
rec_per_key_part
;
HA_KEYSEG
*
keyseg
;
HA_KEYSEG
*
keyseg
;
char
llbuff
[
22
];
char
llbuff
[
22
];
IO_CACHE
new_data_cache
;
/* For non-quick repair. */
IO_CACHE_SHARE
io_share
;
IO_CACHE_SHARE
io_share
;
SORT_INFO
sort_info
;
SORT_INFO
sort_info
;
ulonglong
key_map
=
share
->
state
.
key_map
;
ulonglong
key_map
=
share
->
state
.
key_map
;
...
@@ -2433,19 +2485,55 @@ int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info,
...
@@ -2433,19 +2485,55 @@ int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info,
if
(
info
->
s
->
options
&
(
HA_OPTION_CHECKSUM
|
HA_OPTION_COMPRESS_RECORD
))
if
(
info
->
s
->
options
&
(
HA_OPTION_CHECKSUM
|
HA_OPTION_COMPRESS_RECORD
))
param
->
testflag
|=
T_CALC_CHECKSUM
;
param
->
testflag
|=
T_CALC_CHECKSUM
;
/*
Quick repair (not touching data file, rebuilding indexes):
{
Read cache is (MI_CHECK *param)->read_cache using info->dfile.
}
Non-quick repair (rebuilding data file and indexes):
{
Master thread:
Read cache is (MI_CHECK *param)->read_cache using info->dfile.
Write cache is (MI_INFO *info)->rec_cache using new_file.
Slave threads:
Read cache is new_data_cache synced to master rec_cache.
The final assignment of the filedescriptor for rec_cache is done
after the cache creation.
Don't check file size on new_data_cache, as the resulting file size
is not known yet.
As rec_cache and new_data_cache are synced, write_buffer_length is
used for the read cache 'new_data_cache'. Both start at the same
position 'new_header_length'.
}
*/
DBUG_PRINT
(
"info"
,
(
"is quick repair: %d"
,
rep_quick
));
bzero
((
char
*
)
&
sort_info
,
sizeof
(
sort_info
));
bzero
((
char
*
)
&
sort_info
,
sizeof
(
sort_info
));
/* Initialize pthread structures before goto err. */
pthread_mutex_init
(
&
sort_info
.
mutex
,
MY_MUTEX_INIT_FAST
);
pthread_cond_init
(
&
sort_info
.
cond
,
0
);
if
(
!
(
sort_info
.
key_block
=
if
(
!
(
sort_info
.
key_block
=
alloc_key_blocks
(
param
,
alloc_key_blocks
(
param
,
(
uint
)
param
->
sort_key_blocks
,
(
uint
)
param
->
sort_key_blocks
,
share
->
base
.
max_key_block_length
))
||
share
->
base
.
max_key_block_length
))
init_io_cache
(
&
param
->
read_cache
,
info
->
dfile
,
||
init_io_cache
(
&
param
->
read_cache
,
info
->
dfile
,
(
uint
)
param
->
read_buffer_length
,
(
uint
)
param
->
read_buffer_length
,
READ_CACHE
,
share
->
pack
.
header_length
,
1
,
MYF
(
MY_WME
))
||
READ_CACHE
,
share
->
pack
.
header_length
,
1
,
MYF
(
MY_WME
))
||
(
!
rep_quick
&&
(
!
rep_quick
&&
init_io_cache
(
&
info
->
rec_cache
,
info
->
dfile
,
(
init_io_cache
(
&
info
->
rec_cache
,
info
->
dfile
,
(
uint
)
param
->
write_buffer_length
,
(
uint
)
param
->
write_buffer_length
,
WRITE_CACHE
,
new_header_length
,
1
,
WRITE_CACHE
,
new_header_length
,
1
,
MYF
(
MY_WME
|
MY_WAIT_IF_FULL
)
&
param
->
myf_rw
)))
MYF
(
MY_WME
|
MY_WAIT_IF_FULL
)
&
param
->
myf_rw
)
||
init_io_cache
(
&
new_data_cache
,
-
1
,
(
uint
)
param
->
write_buffer_length
,
READ_CACHE
,
new_header_length
,
1
,
MYF
(
MY_WME
|
MY_DONT_CHECK_FILESIZE
)))))
goto
err
;
goto
err
;
sort_info
.
key_block_end
=
sort_info
.
key_block
+
param
->
sort_key_blocks
;
sort_info
.
key_block_end
=
sort_info
.
key_block
+
param
->
sort_key_blocks
;
info
->
opt_flag
|=
WRITE_CACHE_USED
;
info
->
opt_flag
|=
WRITE_CACHE_USED
;
...
@@ -2536,8 +2624,6 @@ int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info,
...
@@ -2536,8 +2624,6 @@ int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info,
del
=
info
->
state
->
del
;
del
=
info
->
state
->
del
;
param
->
glob_crc
=
0
;
param
->
glob_crc
=
0
;
if
(
param
->
testflag
&
T_CALC_CHECKSUM
)
param
->
calc_checksum
=
1
;
if
(
!
(
sort_param
=
(
MI_SORT_PARAM
*
)
if
(
!
(
sort_param
=
(
MI_SORT_PARAM
*
)
my_malloc
((
uint
)
share
->
base
.
keys
*
my_malloc
((
uint
)
share
->
base
.
keys
*
...
@@ -2587,6 +2673,7 @@ int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info,
...
@@ -2587,6 +2673,7 @@ int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info,
sort_param
[
i
].
sort_info
=&
sort_info
;
sort_param
[
i
].
sort_info
=&
sort_info
;
sort_param
[
i
].
master
=
0
;
sort_param
[
i
].
master
=
0
;
sort_param
[
i
].
fix_datafile
=
0
;
sort_param
[
i
].
fix_datafile
=
0
;
sort_param
[
i
].
calc_checksum
=
0
;
sort_param
[
i
].
filepos
=
new_header_length
;
sort_param
[
i
].
filepos
=
new_header_length
;
sort_param
[
i
].
max_pos
=
sort_param
[
i
].
pos
=
share
->
pack
.
header_length
;
sort_param
[
i
].
max_pos
=
sort_param
[
i
].
pos
=
share
->
pack
.
header_length
;
...
@@ -2624,19 +2711,45 @@ int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info,
...
@@ -2624,19 +2711,45 @@ int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info,
sort_info
.
total_keys
=
i
;
sort_info
.
total_keys
=
i
;
sort_param
[
0
].
master
=
1
;
sort_param
[
0
].
master
=
1
;
sort_param
[
0
].
fix_datafile
=
(
my_bool
)(
!
rep_quick
);
sort_param
[
0
].
fix_datafile
=
(
my_bool
)(
!
rep_quick
);
sort_param
[
0
].
calc_checksum
=
test
(
param
->
testflag
&
T_CALC_CHECKSUM
);
sort_info
.
got_error
=
0
;
sort_info
.
got_error
=
0
;
pthread_mutex_init
(
&
sort_info
.
mutex
,
MY_MUTEX_INIT_FAST
);
pthread_cond_init
(
&
sort_info
.
cond
,
0
);
pthread_mutex_lock
(
&
sort_info
.
mutex
);
pthread_mutex_lock
(
&
sort_info
.
mutex
);
init_io_cache_share
(
&
param
->
read_cache
,
&
io_share
,
i
);
/*
Initialize the I/O cache share for use with the read caches and, in
case of non-quick repair, the write cache. When all threads join on
the cache lock, the writer copies the write cache contents to the
read caches.
*/
if
(
i
>
1
)
{
if
(
rep_quick
)
init_io_cache_share
(
&
param
->
read_cache
,
&
io_share
,
NULL
,
i
);
else
init_io_cache_share
(
&
new_data_cache
,
&
io_share
,
&
info
->
rec_cache
,
i
);
}
else
io_share
.
total_threads
=
0
;
/* share not used */
(
void
)
pthread_attr_init
(
&
thr_attr
);
(
void
)
pthread_attr_init
(
&
thr_attr
);
(
void
)
pthread_attr_setdetachstate
(
&
thr_attr
,
PTHREAD_CREATE_DETACHED
);
(
void
)
pthread_attr_setdetachstate
(
&
thr_attr
,
PTHREAD_CREATE_DETACHED
);
for
(
i
=
0
;
i
<
sort_info
.
total_keys
;
i
++
)
for
(
i
=
0
;
i
<
sort_info
.
total_keys
;
i
++
)
{
{
sort_param
[
i
].
read_cache
=
param
->
read_cache
;
/*
Copy the properly initialized IO_CACHE structure so that every
thread has its own copy. In quick mode param->read_cache is shared
for use by all threads. In non-quick mode all threads but the
first copy the shared new_data_cache, which is synchronized to the
write cache of the first thread. The first thread copies
param->read_cache, which is not shared.
*/
sort_param
[
i
].
read_cache
=
((
rep_quick
||
!
i
)
?
param
->
read_cache
:
new_data_cache
);
DBUG_PRINT
(
"io_cache_share"
,
(
"thread: %u read_cache: 0x%lx"
,
i
,
(
long
)
&
sort_param
[
i
].
read_cache
));
/*
/*
two approaches: the same amount of memory for each thread
two approaches: the same amount of memory for each thread
or the memory for the same number of keys for each thread...
or the memory for the same number of keys for each thread...
...
@@ -2654,7 +2767,10 @@ int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info,
...
@@ -2654,7 +2767,10 @@ int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info,
(
void
*
)
(
sort_param
+
i
)))
(
void
*
)
(
sort_param
+
i
)))
{
{
mi_check_print_error
(
param
,
"Cannot start a repair thread"
);
mi_check_print_error
(
param
,
"Cannot start a repair thread"
);
remove_io_thread
(
&
param
->
read_cache
);
/* Cleanup: Detach from the share. Avoid others to be blocked. */
if
(
io_share
.
total_threads
)
remove_io_thread
(
&
sort_param
[
i
].
read_cache
);
DBUG_PRINT
(
"error"
,
(
"Cannot start a repair thread"
));
sort_info
.
got_error
=
1
;
sort_info
.
got_error
=
1
;
}
}
else
else
...
@@ -2676,6 +2792,11 @@ int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info,
...
@@ -2676,6 +2792,11 @@ int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info,
if
(
sort_param
[
0
].
fix_datafile
)
if
(
sort_param
[
0
].
fix_datafile
)
{
{
/*
Append some nuls to the end of a memory mapped file. Destroy the
write cache. The master thread did already detach from the share
by remove_io_thread() in sort.c:thr_find_all_keys().
*/
if
(
write_data_suffix
(
&
sort_info
,
1
)
||
end_io_cache
(
&
info
->
rec_cache
))
if
(
write_data_suffix
(
&
sort_info
,
1
)
||
end_io_cache
(
&
info
->
rec_cache
))
goto
err
;
goto
err
;
if
(
param
->
testflag
&
T_SAFE_REPAIR
)
if
(
param
->
testflag
&
T_SAFE_REPAIR
)
...
@@ -2691,8 +2812,14 @@ int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info,
...
@@ -2691,8 +2812,14 @@ int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info,
sort_param
->
filepos
;
sort_param
->
filepos
;
/* Only whole records */
/* Only whole records */
share
->
state
.
version
=
(
ulong
)
time
((
time_t
*
)
0
);
share
->
state
.
version
=
(
ulong
)
time
((
time_t
*
)
0
);
/*
Exchange the data file descriptor of the table, so that we use the
new file from now on.
*/
my_close
(
info
->
dfile
,
MYF
(
0
));
my_close
(
info
->
dfile
,
MYF
(
0
));
info
->
dfile
=
new_file
;
info
->
dfile
=
new_file
;
share
->
data_file_type
=
sort_info
.
new_data_file_type
;
share
->
data_file_type
=
sort_info
.
new_data_file_type
;
share
->
pack
.
header_length
=
(
ulong
)
new_header_length
;
share
->
pack
.
header_length
=
(
ulong
)
new_header_length
;
}
}
...
@@ -2747,7 +2874,20 @@ int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info,
...
@@ -2747,7 +2874,20 @@ int mi_repair_parallel(MI_CHECK *param, register MI_INFO *info,
err:
err:
got_error
|=
flush_blocks
(
param
,
share
->
key_cache
,
share
->
kfile
);
got_error
|=
flush_blocks
(
param
,
share
->
key_cache
,
share
->
kfile
);
/*
Destroy the write cache. The master thread did already detach from
the share by remove_io_thread() or it was not yet started (if the
error happend before creating the thread).
*/
VOID
(
end_io_cache
(
&
info
->
rec_cache
));
VOID
(
end_io_cache
(
&
info
->
rec_cache
));
/*
Destroy the new data cache in case of non-quick repair. All slave
threads did either detach from the share by remove_io_thread()
already or they were not yet started (if the error happend before
creating the threads).
*/
if
(
!
rep_quick
)
VOID
(
end_io_cache
(
&
new_data_cache
));
if
(
!
got_error
)
if
(
!
got_error
)
{
{
/* Replace the actual file with the temporary file */
/* Replace the actual file with the temporary file */
...
@@ -2879,12 +3019,41 @@ static int sort_ft_key_read(MI_SORT_PARAM *sort_param, void *key)
...
@@ -2879,12 +3019,41 @@ static int sort_ft_key_read(MI_SORT_PARAM *sort_param, void *key)
}
/* sort_ft_key_read */
}
/* sort_ft_key_read */
/* Read next record from file using parameters in sort_info */
/*
/* Return -1 if end of file, 0 if ok and > 0 if error */
Read next record from file using parameters in sort_info.
SYNOPSIS
sort_get_next_record()
sort_param Information about and for the sort process
NOTE
Dynamic Records With Non-Quick Parallel Repair
For non-quick parallel repair we use a synchronized read/write
cache. This means that one thread is the master who fixes the data
file by reading each record from the old data file and writing it
to the new data file. By doing this the records in the new data
file are written contiguously. Whenever the write buffer is full,
it is copied to the read buffer. The slaves read from the read
buffer, which is not associated with a file. Thus read_cache.file
is -1. When using _mi_read_cache(), the slaves must always set
flag to READING_NEXT so that the function never tries to read from
file. This is safe because the records are contiguous. There is no
need to read outside the cache. This condition is evaluated in the
variable 'parallel_flag' for quick reference. read_cache.file must
be >= 0 in every other case.
RETURN
-1 end of file
0 ok
> 0 error
*/
static
int
sort_get_next_record
(
MI_SORT_PARAM
*
sort_param
)
static
int
sort_get_next_record
(
MI_SORT_PARAM
*
sort_param
)
{
{
int
searching
;
int
searching
;
int
parallel_flag
;
uint
found_record
,
b_type
,
left_length
;
uint
found_record
,
b_type
,
left_length
;
my_off_t
pos
;
my_off_t
pos
;
byte
*
to
;
byte
*
to
;
...
@@ -2922,7 +3091,7 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param)
...
@@ -2922,7 +3091,7 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param)
sort_param
->
max_pos
=
(
sort_param
->
pos
+=
share
->
base
.
pack_reclength
);
sort_param
->
max_pos
=
(
sort_param
->
pos
+=
share
->
base
.
pack_reclength
);
if
(
*
sort_param
->
record
)
if
(
*
sort_param
->
record
)
{
{
if
(
param
->
calc_checksum
)
if
(
sort_
param
->
calc_checksum
)
param
->
glob_crc
+=
(
info
->
checksum
=
param
->
glob_crc
+=
(
info
->
checksum
=
mi_static_checksum
(
info
,
sort_param
->
record
));
mi_static_checksum
(
info
,
sort_param
->
record
));
DBUG_RETURN
(
0
);
DBUG_RETURN
(
0
);
...
@@ -2937,6 +3106,7 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param)
...
@@ -2937,6 +3106,7 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param)
LINT_INIT
(
to
);
LINT_INIT
(
to
);
pos
=
sort_param
->
pos
;
pos
=
sort_param
->
pos
;
searching
=
(
sort_param
->
fix_datafile
&&
(
param
->
testflag
&
T_EXTEND
));
searching
=
(
sort_param
->
fix_datafile
&&
(
param
->
testflag
&
T_EXTEND
));
parallel_flag
=
(
sort_param
->
read_cache
.
file
<
0
)
?
READING_NEXT
:
0
;
for
(;;)
for
(;;)
{
{
found_record
=
block_info
.
second_read
=
0
;
found_record
=
block_info
.
second_read
=
0
;
...
@@ -2967,7 +3137,7 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param)
...
@@ -2967,7 +3137,7 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param)
(
byte
*
)
block_info
.
header
,
pos
,
(
byte
*
)
block_info
.
header
,
pos
,
MI_BLOCK_INFO_HEADER_LENGTH
,
MI_BLOCK_INFO_HEADER_LENGTH
,
(
!
found_record
?
READING_NEXT
:
0
)
|
(
!
found_record
?
READING_NEXT
:
0
)
|
READING_HEADER
))
parallel_flag
|
READING_HEADER
))
{
{
if
(
found_record
)
if
(
found_record
)
{
{
...
@@ -3144,9 +3314,31 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param)
...
@@ -3144,9 +3314,31 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param)
llstr
(
sort_param
->
start_recpos
,
llbuff
));
llstr
(
sort_param
->
start_recpos
,
llbuff
));
goto
try_next
;
goto
try_next
;
}
}
if
(
_mi_read_cache
(
&
sort_param
->
read_cache
,
to
,
block_info
.
filepos
,
/*
Copy information that is already read. Avoid accessing data
below the cache start. This could happen if the header
streched over the end of the previous buffer contents.
*/
{
uint
header_len
=
(
uint
)
(
block_info
.
filepos
-
pos
);
uint
prefetch_len
=
(
MI_BLOCK_INFO_HEADER_LENGTH
-
header_len
);
if
(
prefetch_len
>
block_info
.
data_len
)
prefetch_len
=
block_info
.
data_len
;
if
(
prefetch_len
)
{
memcpy
(
to
,
block_info
.
header
+
header_len
,
prefetch_len
);
block_info
.
filepos
+=
prefetch_len
;
block_info
.
data_len
-=
prefetch_len
;
left_length
-=
prefetch_len
;
to
+=
prefetch_len
;
}
}
if
(
block_info
.
data_len
&&
_mi_read_cache
(
&
sort_param
->
read_cache
,
to
,
block_info
.
filepos
,
block_info
.
data_len
,
block_info
.
data_len
,
(
found_record
==
1
?
READING_NEXT
:
0
)))
(
found_record
==
1
?
READING_NEXT
:
0
)
|
parallel_flag
))
{
{
mi_check_print_info
(
param
,
mi_check_print_info
(
param
,
"Read error for block at: %s (error: %d); Skipped"
,
"Read error for block at: %s (error: %d); Skipped"
,
...
@@ -3176,13 +3368,14 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param)
...
@@ -3176,13 +3368,14 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param)
{
{
if
(
sort_param
->
read_cache
.
error
<
0
)
if
(
sort_param
->
read_cache
.
error
<
0
)
DBUG_RETURN
(
1
);
DBUG_RETURN
(
1
);
if
(
info
->
s
->
calc_checksum
)
if
(
sort_param
->
calc_checksum
)
info
->
checksum
=
mi_checksum
(
info
,
sort_param
->
record
);
info
->
checksum
=
mi_checksum
(
info
,
sort_param
->
record
);
if
((
param
->
testflag
&
(
T_EXTEND
|
T_REP
))
||
searching
)
if
((
param
->
testflag
&
(
T_EXTEND
|
T_REP
))
||
searching
)
{
{
if
(
_mi_rec_check
(
info
,
sort_param
->
record
,
sort_param
->
rec_buff
,
if
(
_mi_rec_check
(
info
,
sort_param
->
record
,
sort_param
->
rec_buff
,
sort_param
->
find_length
,
sort_param
->
find_length
,
(
param
->
testflag
&
T_QUICK
)
&&
(
param
->
testflag
&
T_QUICK
)
&&
sort_param
->
calc_checksum
&&
test
(
info
->
s
->
calc_checksum
)))
test
(
info
->
s
->
calc_checksum
)))
{
{
mi_check_print_info
(
param
,
"Found wrong packed record at %s"
,
mi_check_print_info
(
param
,
"Found wrong packed record at %s"
,
...
@@ -3190,7 +3383,7 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param)
...
@@ -3190,7 +3383,7 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param)
goto
try_next
;
goto
try_next
;
}
}
}
}
if
(
param
->
calc_checksum
)
if
(
sort_
param
->
calc_checksum
)
param
->
glob_crc
+=
info
->
checksum
;
param
->
glob_crc
+=
info
->
checksum
;
DBUG_RETURN
(
0
);
DBUG_RETURN
(
0
);
}
}
...
@@ -3217,7 +3410,8 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param)
...
@@ -3217,7 +3410,8 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param)
DBUG_RETURN
(
1
);
/* Something wrong with data */
DBUG_RETURN
(
1
);
/* Something wrong with data */
}
}
sort_param
->
start_recpos
=
sort_param
->
pos
;
sort_param
->
start_recpos
=
sort_param
->
pos
;
if
(
_mi_pack_get_block_info
(
info
,
&
block_info
,
-
1
,
sort_param
->
pos
))
if
(
_mi_pack_get_block_info
(
info
,
&
sort_param
->
bit_buff
,
&
block_info
,
&
sort_param
->
rec_buff
,
-
1
,
sort_param
->
pos
))
DBUG_RETURN
(
-
1
);
DBUG_RETURN
(
-
1
);
if
(
!
block_info
.
rec_len
&&
if
(
!
block_info
.
rec_len
&&
sort_param
->
pos
+
MEMMAP_EXTRA_MARGIN
==
sort_param
->
pos
+
MEMMAP_EXTRA_MARGIN
==
...
@@ -3241,15 +3435,14 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param)
...
@@ -3241,15 +3435,14 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param)
llstr
(
sort_param
->
pos
,
llbuff
));
llstr
(
sort_param
->
pos
,
llbuff
));
continue
;
continue
;
}
}
if
(
_mi_pack_rec_unpack
(
info
,
sort_param
->
record
,
sort_param
->
rec_buff
,
if
(
_mi_pack_rec_unpack
(
info
,
&
sort_param
->
bit_buff
,
sort_param
->
record
,
block_info
.
rec_len
))
sort_param
->
rec_buff
,
block_info
.
rec_len
))
{
{
if
(
!
searching
)
if
(
!
searching
)
mi_check_print_info
(
param
,
"Found wrong record at %s"
,
mi_check_print_info
(
param
,
"Found wrong record at %s"
,
llstr
(
sort_param
->
pos
,
llbuff
));
llstr
(
sort_param
->
pos
,
llbuff
));
continue
;
continue
;
}
}
info
->
checksum
=
mi_checksum
(
info
,
sort_param
->
record
);
if
(
!
sort_param
->
fix_datafile
)
if
(
!
sort_param
->
fix_datafile
)
{
{
sort_param
->
filepos
=
sort_param
->
pos
;
sort_param
->
filepos
=
sort_param
->
pos
;
...
@@ -3259,8 +3452,9 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param)
...
@@ -3259,8 +3452,9 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param)
sort_param
->
max_pos
=
(
sort_param
->
pos
=
block_info
.
filepos
+
sort_param
->
max_pos
=
(
sort_param
->
pos
=
block_info
.
filepos
+
block_info
.
rec_len
);
block_info
.
rec_len
);
info
->
packed_length
=
block_info
.
rec_len
;
info
->
packed_length
=
block_info
.
rec_len
;
if
(
param
->
calc_checksum
)
if
(
sort_param
->
calc_checksum
)
param
->
glob_crc
+=
info
->
checksum
;
param
->
glob_crc
+=
(
info
->
checksum
=
mi_checksum
(
info
,
sort_param
->
record
));
DBUG_RETURN
(
0
);
DBUG_RETURN
(
0
);
}
}
}
}
...
@@ -3268,7 +3462,20 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param)
...
@@ -3268,7 +3462,20 @@ static int sort_get_next_record(MI_SORT_PARAM *sort_param)
}
}
/* Write record to new file */
/*
Write record to new file.
SYNOPSIS
sort_write_record()
sort_param Sort parameters.
NOTE
This is only called by a master thread if parallel repair is used.
RETURN
0 OK
1 Error
*/
int
sort_write_record
(
MI_SORT_PARAM
*
sort_param
)
int
sort_write_record
(
MI_SORT_PARAM
*
sort_param
)
{
{
...
@@ -3317,6 +3524,7 @@ int sort_write_record(MI_SORT_PARAM *sort_param)
...
@@ -3317,6 +3524,7 @@ int sort_write_record(MI_SORT_PARAM *sort_param)
}
}
from
=
sort_info
->
buff
+
ALIGN_SIZE
(
MI_MAX_DYN_BLOCK_HEADER
);
from
=
sort_info
->
buff
+
ALIGN_SIZE
(
MI_MAX_DYN_BLOCK_HEADER
);
}
}
/* We can use info->checksum here as only one thread calls this. */
info
->
checksum
=
mi_checksum
(
info
,
sort_param
->
record
);
info
->
checksum
=
mi_checksum
(
info
,
sort_param
->
record
);
reclength
=
_mi_rec_pack
(
info
,
from
,
sort_param
->
record
);
reclength
=
_mi_rec_pack
(
info
,
from
,
sort_param
->
record
);
flag
=
0
;
flag
=
0
;
...
@@ -3726,7 +3934,7 @@ static int sort_delete_record(MI_SORT_PARAM *sort_param)
...
@@ -3726,7 +3934,7 @@ static int sort_delete_record(MI_SORT_PARAM *sort_param)
DBUG_RETURN
(
1
);
DBUG_RETURN
(
1
);
}
}
}
}
if
(
param
->
calc_checksum
)
if
(
sort_
param
->
calc_checksum
)
param
->
glob_crc
-=
(
*
info
->
s
->
calc_checksum
)(
info
,
sort_param
->
record
);
param
->
glob_crc
-=
(
*
info
->
s
->
calc_checksum
)(
info
,
sort_param
->
record
);
}
}
error
=
flush_io_cache
(
&
info
->
rec_cache
)
||
(
*
info
->
s
->
delete_record
)(
info
);
error
=
flush_io_cache
(
&
info
->
rec_cache
)
||
(
*
info
->
s
->
delete_record
)(
info
);
...
...
storage/myisam/mi_open.c
View file @
b15a23a7
...
@@ -219,7 +219,10 @@ MI_INFO *mi_open(const char *name, int mode, uint open_flags)
...
@@ -219,7 +219,10 @@ MI_INFO *mi_open(const char *name, int mode, uint open_flags)
((
open_flags
&
HA_OPEN_ABORT_IF_CRASHED
)
&&
((
open_flags
&
HA_OPEN_ABORT_IF_CRASHED
)
&&
(
my_disable_locking
&&
share
->
state
.
open_count
))))
(
my_disable_locking
&&
share
->
state
.
open_count
))))
{
{
DBUG_PRINT
(
"error"
,(
"Table is marked as crashed"
));
DBUG_PRINT
(
"error"
,(
"Table is marked as crashed. open_flags: %u "
"changed: %u open_count: %u !locking: %d"
,
open_flags
,
share
->
state
.
changed
,
share
->
state
.
open_count
,
my_disable_locking
));
my_errno
=
((
share
->
state
.
changed
&
STATE_CRASHED_ON_REPAIR
)
?
my_errno
=
((
share
->
state
.
changed
&
STATE_CRASHED_ON_REPAIR
)
?
HA_ERR_CRASHED_ON_REPAIR
:
HA_ERR_CRASHED_ON_USAGE
);
HA_ERR_CRASHED_ON_REPAIR
:
HA_ERR_CRASHED_ON_USAGE
);
goto
err
;
goto
err
;
...
...
storage/myisam/mi_packrec.c
View file @
b15a23a7
...
@@ -103,7 +103,8 @@ static uint fill_and_get_bits(MI_BIT_BUFF *bit_buff,uint count);
...
@@ -103,7 +103,8 @@ static uint fill_and_get_bits(MI_BIT_BUFF *bit_buff,uint count);
static
void
fill_buffer
(
MI_BIT_BUFF
*
bit_buff
);
static
void
fill_buffer
(
MI_BIT_BUFF
*
bit_buff
);
static
uint
max_bit
(
uint
value
);
static
uint
max_bit
(
uint
value
);
#ifdef HAVE_MMAP
#ifdef HAVE_MMAP
static
uchar
*
_mi_mempack_get_block_info
(
MI_INFO
*
myisam
,
MI_BLOCK_INFO
*
info
,
static
uchar
*
_mi_mempack_get_block_info
(
MI_INFO
*
myisam
,
MI_BIT_BUFF
*
bit_buff
,
MI_BLOCK_INFO
*
info
,
byte
**
rec_buff_p
,
uchar
*
header
);
uchar
*
header
);
#endif
#endif
...
@@ -449,13 +450,15 @@ int _mi_read_pack_record(MI_INFO *info, my_off_t filepos, byte *buf)
...
@@ -449,13 +450,15 @@ int _mi_read_pack_record(MI_INFO *info, my_off_t filepos, byte *buf)
DBUG_RETURN
(
-
1
);
/* _search() didn't find record */
DBUG_RETURN
(
-
1
);
/* _search() didn't find record */
file
=
info
->
dfile
;
file
=
info
->
dfile
;
if
(
_mi_pack_get_block_info
(
info
,
&
block_info
,
file
,
filepos
))
if
(
_mi_pack_get_block_info
(
info
,
&
info
->
bit_buff
,
&
block_info
,
&
info
->
rec_buff
,
file
,
filepos
))
goto
err
;
goto
err
;
if
(
my_read
(
file
,(
byte
*
)
info
->
rec_buff
+
block_info
.
offset
,
if
(
my_read
(
file
,(
byte
*
)
info
->
rec_buff
+
block_info
.
offset
,
block_info
.
rec_len
-
block_info
.
offset
,
MYF
(
MY_NABP
)))
block_info
.
rec_len
-
block_info
.
offset
,
MYF
(
MY_NABP
)))
goto
panic
;
goto
panic
;
info
->
update
|=
HA_STATE_AKTIV
;
info
->
update
|=
HA_STATE_AKTIV
;
DBUG_RETURN
(
_mi_pack_rec_unpack
(
info
,
buf
,
info
->
rec_buff
,
block_info
.
rec_len
));
DBUG_RETURN
(
_mi_pack_rec_unpack
(
info
,
&
info
->
bit_buff
,
buf
,
info
->
rec_buff
,
block_info
.
rec_len
));
panic:
panic:
my_errno
=
HA_ERR_WRONG_IN_RECORD
;
my_errno
=
HA_ERR_WRONG_IN_RECORD
;
err:
err:
...
@@ -464,8 +467,8 @@ int _mi_read_pack_record(MI_INFO *info, my_off_t filepos, byte *buf)
...
@@ -464,8 +467,8 @@ int _mi_read_pack_record(MI_INFO *info, my_off_t filepos, byte *buf)
int
_mi_pack_rec_unpack
(
register
MI_INFO
*
info
,
register
byte
*
to
,
byte
*
from
,
int
_mi_pack_rec_unpack
(
register
MI_INFO
*
info
,
MI_BIT_BUFF
*
bit_buff
,
ulong
reclength
)
register
byte
*
to
,
byte
*
from
,
ulong
reclength
)
{
{
byte
*
end_field
;
byte
*
end_field
;
reg3
MI_COLUMNDEF
*
end
;
reg3
MI_COLUMNDEF
*
end
;
...
@@ -473,18 +476,18 @@ int _mi_pack_rec_unpack(register MI_INFO *info, register byte *to, byte *from,
...
@@ -473,18 +476,18 @@ int _mi_pack_rec_unpack(register MI_INFO *info, register byte *to, byte *from,
MYISAM_SHARE
*
share
=
info
->
s
;
MYISAM_SHARE
*
share
=
info
->
s
;
DBUG_ENTER
(
"_mi_pack_rec_unpack"
);
DBUG_ENTER
(
"_mi_pack_rec_unpack"
);
init_bit_buffer
(
&
info
->
bit_buff
,
(
uchar
*
)
from
,
reclength
);
init_bit_buffer
(
bit_buff
,
(
uchar
*
)
from
,
reclength
);
for
(
current_field
=
share
->
rec
,
end
=
current_field
+
share
->
base
.
fields
;
for
(
current_field
=
share
->
rec
,
end
=
current_field
+
share
->
base
.
fields
;
current_field
<
end
;
current_field
<
end
;
current_field
++
,
to
=
end_field
)
current_field
++
,
to
=
end_field
)
{
{
end_field
=
to
+
current_field
->
length
;
end_field
=
to
+
current_field
->
length
;
(
*
current_field
->
unpack
)(
current_field
,
&
info
->
bit_buff
,
(
uchar
*
)
to
,
(
*
current_field
->
unpack
)(
current_field
,
bit_buff
,
(
uchar
*
)
to
,
(
uchar
*
)
end_field
);
(
uchar
*
)
end_field
);
}
}
if
(
!
info
->
bit_buff
.
error
&&
if
(
!
bit_buff
->
error
&&
info
->
bit_buff
.
pos
-
info
->
bit_buff
.
bits
/
8
==
info
->
bit_buff
.
end
)
bit_buff
->
pos
-
bit_buff
->
bits
/
8
==
bit_buff
->
end
)
DBUG_RETURN
(
0
);
DBUG_RETURN
(
0
);
info
->
update
&=
~
HA_STATE_AKTIV
;
info
->
update
&=
~
HA_STATE_AKTIV
;
DBUG_RETURN
(
my_errno
=
HA_ERR_WRONG_IN_RECORD
);
DBUG_RETURN
(
my_errno
=
HA_ERR_WRONG_IN_RECORD
);
...
@@ -1015,13 +1018,16 @@ int _mi_read_rnd_pack_record(MI_INFO *info, byte *buf,
...
@@ -1015,13 +1018,16 @@ int _mi_read_rnd_pack_record(MI_INFO *info, byte *buf,
if
(
info
->
opt_flag
&
READ_CACHE_USED
)
if
(
info
->
opt_flag
&
READ_CACHE_USED
)
{
{
if
(
_mi_read_cache
(
&
info
->
rec_cache
,(
byte
*
)
block_info
.
header
,
filepos
,
if
(
_mi_read_cache
(
&
info
->
rec_cache
,
(
byte
*
)
block_info
.
header
,
share
->
pack
.
ref_length
,
skip_deleted_blocks
))
filepos
,
share
->
pack
.
ref_length
,
skip_deleted_blocks
?
READING_NEXT
:
0
))
goto
err
;
goto
err
;
b_type
=
_mi_pack_get_block_info
(
info
,
&
block_info
,
-
1
,
filepos
);
b_type
=
_mi_pack_get_block_info
(
info
,
&
info
->
bit_buff
,
&
block_info
,
&
info
->
rec_buff
,
-
1
,
filepos
);
}
}
else
else
b_type
=
_mi_pack_get_block_info
(
info
,
&
block_info
,
info
->
dfile
,
filepos
);
b_type
=
_mi_pack_get_block_info
(
info
,
&
info
->
bit_buff
,
&
block_info
,
&
info
->
rec_buff
,
info
->
dfile
,
filepos
);
if
(
b_type
)
if
(
b_type
)
goto
err
;
/* Error code is already set */
goto
err
;
/* Error code is already set */
#ifndef DBUG_OFF
#ifndef DBUG_OFF
...
@@ -1034,9 +1040,9 @@ int _mi_read_rnd_pack_record(MI_INFO *info, byte *buf,
...
@@ -1034,9 +1040,9 @@ int _mi_read_rnd_pack_record(MI_INFO *info, byte *buf,
if
(
info
->
opt_flag
&
READ_CACHE_USED
)
if
(
info
->
opt_flag
&
READ_CACHE_USED
)
{
{
if
(
_mi_read_cache
(
&
info
->
rec_cache
,(
byte
*
)
info
->
rec_buff
,
if
(
_mi_read_cache
(
&
info
->
rec_cache
,
(
byte
*
)
info
->
rec_buff
,
block_info
.
filepos
,
block_info
.
rec_len
,
block_info
.
filepos
,
block_info
.
rec_len
,
skip_deleted_blocks
))
skip_deleted_blocks
?
READING_NEXT
:
0
))
goto
err
;
goto
err
;
}
}
else
else
...
@@ -1051,8 +1057,8 @@ int _mi_read_rnd_pack_record(MI_INFO *info, byte *buf,
...
@@ -1051,8 +1057,8 @@ int _mi_read_rnd_pack_record(MI_INFO *info, byte *buf,
info
->
nextpos
=
block_info
.
filepos
+
block_info
.
rec_len
;
info
->
nextpos
=
block_info
.
filepos
+
block_info
.
rec_len
;
info
->
update
|=
HA_STATE_AKTIV
|
HA_STATE_KEY_CHANGED
;
info
->
update
|=
HA_STATE_AKTIV
|
HA_STATE_KEY_CHANGED
;
DBUG_RETURN
(
_mi_pack_rec_unpack
(
info
,
buf
,
info
->
rec_buf
f
,
DBUG_RETURN
(
_mi_pack_rec_unpack
(
info
,
&
info
->
bit_buff
,
bu
f
,
block_info
.
rec_len
));
info
->
rec_buff
,
block_info
.
rec_len
));
err:
err:
DBUG_RETURN
(
my_errno
);
DBUG_RETURN
(
my_errno
);
}
}
...
@@ -1060,8 +1066,9 @@ int _mi_read_rnd_pack_record(MI_INFO *info, byte *buf,
...
@@ -1060,8 +1066,9 @@ int _mi_read_rnd_pack_record(MI_INFO *info, byte *buf,
/* Read and process header from a huff-record-file */
/* Read and process header from a huff-record-file */
uint
_mi_pack_get_block_info
(
MI_INFO
*
myisam
,
MI_BLOCK_INFO
*
info
,
File
file
,
uint
_mi_pack_get_block_info
(
MI_INFO
*
myisam
,
MI_BIT_BUFF
*
bit_buff
,
my_off_t
filepos
)
MI_BLOCK_INFO
*
info
,
byte
**
rec_buff_p
,
File
file
,
my_off_t
filepos
)
{
{
uchar
*
header
=
info
->
header
;
uchar
*
header
=
info
->
header
;
uint
head_length
,
ref_length
;
uint
head_length
,
ref_length
;
...
@@ -1086,17 +1093,17 @@ uint _mi_pack_get_block_info(MI_INFO *myisam, MI_BLOCK_INFO *info, File file,
...
@@ -1086,17 +1093,17 @@ uint _mi_pack_get_block_info(MI_INFO *myisam, MI_BLOCK_INFO *info, File file,
head_length
+=
read_pack_length
((
uint
)
myisam
->
s
->
pack
.
version
,
head_length
+=
read_pack_length
((
uint
)
myisam
->
s
->
pack
.
version
,
header
+
head_length
,
&
info
->
blob_len
);
header
+
head_length
,
&
info
->
blob_len
);
if
(
!
(
mi_alloc_rec_buff
(
myisam
,
info
->
rec_len
+
info
->
blob_len
,
if
(
!
(
mi_alloc_rec_buff
(
myisam
,
info
->
rec_len
+
info
->
blob_len
,
&
myisam
->
rec_buff
)))
rec_buff_p
)))
return
BLOCK_FATAL_ERROR
;
/* not enough memory */
return
BLOCK_FATAL_ERROR
;
/* not enough memory */
myisam
->
bit_buff
.
blob_pos
=
(
uchar
*
)
myisam
->
rec_buff
+
info
->
rec_len
;
bit_buff
->
blob_pos
=
(
uchar
*
)
*
rec_buff_p
+
info
->
rec_len
;
myisam
->
bit_buff
.
blob_end
=
myisam
->
bit_buff
.
blob_pos
+
info
->
blob_len
;
bit_buff
->
blob_end
=
bit_buff
->
blob_pos
+
info
->
blob_len
;
myisam
->
blob_length
=
info
->
blob_len
;
myisam
->
blob_length
=
info
->
blob_len
;
}
}
info
->
filepos
=
filepos
+
head_length
;
info
->
filepos
=
filepos
+
head_length
;
if
(
file
>
0
)
if
(
file
>
0
)
{
{
info
->
offset
=
min
(
info
->
rec_len
,
ref_length
-
head_length
);
info
->
offset
=
min
(
info
->
rec_len
,
ref_length
-
head_length
);
memcpy
(
myisam
->
rec_buff
,
header
+
head_length
,
info
->
offset
);
memcpy
(
*
rec_buff_p
,
header
+
head_length
,
info
->
offset
);
}
}
return
0
;
return
0
;
}
}
...
@@ -1214,7 +1221,8 @@ void _mi_unmap_file(MI_INFO *info)
...
@@ -1214,7 +1221,8 @@ void _mi_unmap_file(MI_INFO *info)
}
}
static
uchar
*
_mi_mempack_get_block_info
(
MI_INFO
*
myisam
,
MI_BLOCK_INFO
*
info
,
static
uchar
*
_mi_mempack_get_block_info
(
MI_INFO
*
myisam
,
MI_BIT_BUFF
*
bit_buff
,
MI_BLOCK_INFO
*
info
,
byte
**
rec_buff_p
,
uchar
*
header
)
uchar
*
header
)
{
{
header
+=
read_pack_length
((
uint
)
myisam
->
s
->
pack
.
version
,
header
,
header
+=
read_pack_length
((
uint
)
myisam
->
s
->
pack
.
version
,
header
,
...
@@ -1225,10 +1233,10 @@ static uchar *_mi_mempack_get_block_info(MI_INFO *myisam,MI_BLOCK_INFO *info,
...
@@ -1225,10 +1233,10 @@ static uchar *_mi_mempack_get_block_info(MI_INFO *myisam,MI_BLOCK_INFO *info,
&
info
->
blob_len
);
&
info
->
blob_len
);
/* mi_alloc_rec_buff sets my_errno on error */
/* mi_alloc_rec_buff sets my_errno on error */
if
(
!
(
mi_alloc_rec_buff
(
myisam
,
info
->
blob_len
,
if
(
!
(
mi_alloc_rec_buff
(
myisam
,
info
->
blob_len
,
&
myisam
->
rec_buff
)))
rec_buff_p
)))
return
0
;
/* not enough memory */
return
0
;
/* not enough memory */
myisam
->
bit_buff
.
blob_pos
=
(
uchar
*
)
myisam
->
rec_buff
;
bit_buff
->
blob_pos
=
(
uchar
*
)
*
rec_buff_p
;
myisam
->
bit_buff
.
blob_end
=
(
uchar
*
)
myisam
->
rec_buff
+
info
->
blob_len
;
bit_buff
->
blob_end
=
(
uchar
*
)
*
rec_buff_p
+
info
->
blob_len
;
}
}
return
header
;
return
header
;
}
}
...
@@ -1244,11 +1252,13 @@ static int _mi_read_mempack_record(MI_INFO *info, my_off_t filepos, byte *buf)
...
@@ -1244,11 +1252,13 @@ static int _mi_read_mempack_record(MI_INFO *info, my_off_t filepos, byte *buf)
if
(
filepos
==
HA_OFFSET_ERROR
)
if
(
filepos
==
HA_OFFSET_ERROR
)
DBUG_RETURN
(
-
1
);
/* _search() didn't find record */
DBUG_RETURN
(
-
1
);
/* _search() didn't find record */
if
(
!
(
pos
=
(
byte
*
)
_mi_mempack_get_block_info
(
info
,
&
block_info
,
if
(
!
(
pos
=
(
byte
*
)
_mi_mempack_get_block_info
(
info
,
&
info
->
bit_buff
,
&
block_info
,
&
info
->
rec_buff
,
(
uchar
*
)
share
->
file_map
+
(
uchar
*
)
share
->
file_map
+
filepos
)))
filepos
)))
DBUG_RETURN
(
-
1
);
DBUG_RETURN
(
-
1
);
DBUG_RETURN
(
_mi_pack_rec_unpack
(
info
,
buf
,
pos
,
block_info
.
rec_len
));
DBUG_RETURN
(
_mi_pack_rec_unpack
(
info
,
&
info
->
bit_buff
,
buf
,
pos
,
block_info
.
rec_len
));
}
}
...
@@ -1268,7 +1278,8 @@ static int _mi_read_rnd_mempack_record(MI_INFO *info, byte *buf,
...
@@ -1268,7 +1278,8 @@ static int _mi_read_rnd_mempack_record(MI_INFO *info, byte *buf,
my_errno
=
HA_ERR_END_OF_FILE
;
my_errno
=
HA_ERR_END_OF_FILE
;
goto
err
;
goto
err
;
}
}
if
(
!
(
pos
=
(
byte
*
)
_mi_mempack_get_block_info
(
info
,
&
block_info
,
if
(
!
(
pos
=
(
byte
*
)
_mi_mempack_get_block_info
(
info
,
&
info
->
bit_buff
,
&
block_info
,
&
info
->
rec_buff
,
(
uchar
*
)
(
uchar
*
)
(
start
=
share
->
file_map
+
(
start
=
share
->
file_map
+
filepos
))))
filepos
))))
...
@@ -1285,7 +1296,8 @@ static int _mi_read_rnd_mempack_record(MI_INFO *info, byte *buf,
...
@@ -1285,7 +1296,8 @@ static int _mi_read_rnd_mempack_record(MI_INFO *info, byte *buf,
info
->
nextpos
=
filepos
+
(
uint
)
(
pos
-
start
)
+
block_info
.
rec_len
;
info
->
nextpos
=
filepos
+
(
uint
)
(
pos
-
start
)
+
block_info
.
rec_len
;
info
->
update
|=
HA_STATE_AKTIV
|
HA_STATE_KEY_CHANGED
;
info
->
update
|=
HA_STATE_AKTIV
|
HA_STATE_KEY_CHANGED
;
DBUG_RETURN
(
_mi_pack_rec_unpack
(
info
,
buf
,
pos
,
block_info
.
rec_len
));
DBUG_RETURN
(
_mi_pack_rec_unpack
(
info
,
&
info
->
bit_buff
,
buf
,
pos
,
block_info
.
rec_len
));
err:
err:
DBUG_RETURN
(
my_errno
);
DBUG_RETURN
(
my_errno
);
}
}
...
...
storage/myisam/myisamdef.h
View file @
b15a23a7
...
@@ -76,6 +76,7 @@ typedef struct st_mi_state_info
...
@@ -76,6 +76,7 @@ typedef struct st_mi_state_info
ulong
sec_index_changed
;
/* Updated when new sec_index */
ulong
sec_index_changed
;
/* Updated when new sec_index */
ulong
sec_index_used
;
/* which extra index are in use */
ulong
sec_index_used
;
/* which extra index are in use */
ulonglong
key_map
;
/* Which keys are in use */
ulonglong
key_map
;
/* Which keys are in use */
ha_checksum
checksum
;
/* Table checksum */
ulong
version
;
/* timestamp of create */
ulong
version
;
/* timestamp of create */
time_t
create_time
;
/* Time when created database */
time_t
create_time
;
/* Time when created database */
time_t
recover_time
;
/* Time for last recover */
time_t
recover_time
;
/* Time for last recover */
...
@@ -177,6 +178,7 @@ typedef struct st_mi_isam_share { /* Shared between opens */
...
@@ -177,6 +178,7 @@ typedef struct st_mi_isam_share { /* Shared between opens */
int
(
*
delete_record
)(
struct
st_myisam_info
*
);
int
(
*
delete_record
)(
struct
st_myisam_info
*
);
int
(
*
read_rnd
)(
struct
st_myisam_info
*
,
byte
*
,
my_off_t
,
my_bool
);
int
(
*
read_rnd
)(
struct
st_myisam_info
*
,
byte
*
,
my_off_t
,
my_bool
);
int
(
*
compare_record
)(
struct
st_myisam_info
*
,
const
byte
*
);
int
(
*
compare_record
)(
struct
st_myisam_info
*
,
const
byte
*
);
/* Function to use for a row checksum. */
ha_checksum
(
*
calc_checksum
)(
struct
st_myisam_info
*
,
const
byte
*
);
ha_checksum
(
*
calc_checksum
)(
struct
st_myisam_info
*
,
const
byte
*
);
int
(
*
compare_unique
)(
struct
st_myisam_info
*
,
MI_UNIQUEDEF
*
,
int
(
*
compare_unique
)(
struct
st_myisam_info
*
,
MI_UNIQUEDEF
*
,
const
byte
*
record
,
my_off_t
pos
);
const
byte
*
record
,
my_off_t
pos
);
...
@@ -262,7 +264,7 @@ struct st_myisam_info {
...
@@ -262,7 +264,7 @@ struct st_myisam_info {
my_off_t
last_keypage
;
/* Last key page read */
my_off_t
last_keypage
;
/* Last key page read */
my_off_t
last_search_keypage
;
/* Last keypage when searching */
my_off_t
last_search_keypage
;
/* Last keypage when searching */
my_off_t
dupp_key_pos
;
my_off_t
dupp_key_pos
;
ha_checksum
checksum
;
ha_checksum
checksum
;
/* Temp storage for row checksum */
/* QQ: the folloing two xxx_length fields should be removed,
/* QQ: the folloing two xxx_length fields should be removed,
as they are not compatible with parallel repair */
as they are not compatible with parallel repair */
ulong
packed_length
,
blob_length
;
/* Length of found, packed record */
ulong
packed_length
,
blob_length
;
/* Length of found, packed record */
...
@@ -314,6 +316,7 @@ typedef struct st_mi_sort_param
...
@@ -314,6 +316,7 @@ typedef struct st_mi_sort_param
pthread_t
thr
;
pthread_t
thr
;
IO_CACHE
read_cache
,
tempfile
,
tempfile_for_exceptions
;
IO_CACHE
read_cache
,
tempfile
,
tempfile_for_exceptions
;
DYNAMIC_ARRAY
buffpek
;
DYNAMIC_ARRAY
buffpek
;
MI_BIT_BUFF
bit_buff
;
/* For parallel repair of packrec. */
/*
/*
The next two are used to collect statistics, see update_key_parts for
The next two are used to collect statistics, see update_key_parts for
...
@@ -326,6 +329,7 @@ typedef struct st_mi_sort_param
...
@@ -326,6 +329,7 @@ typedef struct st_mi_sort_param
uint
key
,
key_length
,
real_key_length
,
sortbuff_size
;
uint
key
,
key_length
,
real_key_length
,
sortbuff_size
;
uint
maxbuffers
,
keys
,
find_length
,
sort_keys_length
;
uint
maxbuffers
,
keys
,
find_length
,
sort_keys_length
;
my_bool
fix_datafile
,
master
;
my_bool
fix_datafile
,
master
;
my_bool
calc_checksum
;
/* calculate table checksum */
MI_KEYDEF
*
keyinfo
;
MI_KEYDEF
*
keyinfo
;
HA_KEYSEG
*
seg
;
HA_KEYSEG
*
seg
;
SORT_INFO
*
sort_info
;
SORT_INFO
*
sort_info
;
...
@@ -379,8 +383,15 @@ typedef struct st_mi_sort_param
...
@@ -379,8 +383,15 @@ typedef struct st_mi_sort_param
#define mi_putint(x,y,nod) { uint16 boh=(nod ? (uint16) 32768 : 0) + (uint16) (y);\
#define mi_putint(x,y,nod) { uint16 boh=(nod ? (uint16) 32768 : 0) + (uint16) (y);\
mi_int2store(x,boh); }
mi_int2store(x,boh); }
#define mi_test_if_nod(x) (x[0] & 128 ? info->s->base.key_reflength : 0)
#define mi_test_if_nod(x) (x[0] & 128 ? info->s->base.key_reflength : 0)
#define mi_mark_crashed(x) (x)->s->state.changed|=STATE_CRASHED
#define mi_mark_crashed(x) do{(x)->s->state.changed|= STATE_CRASHED; \
#define mi_mark_crashed_on_repair(x) { (x)->s->state.changed|=STATE_CRASHED|STATE_CRASHED_ON_REPAIR ; (x)->update|= HA_STATE_CHANGED; }
DBUG_PRINT("error", ("Marked table crashed")); \
}while(0)
#define mi_mark_crashed_on_repair(x) do{(x)->s->state.changed|= \
STATE_CRASHED|STATE_CRASHED_ON_REPAIR; \
(x)->update|= HA_STATE_CHANGED; \
DBUG_PRINT("error", \
("Marked table crashed")); \
}while(0)
#define mi_is_crashed(x) ((x)->s->state.changed & STATE_CRASHED)
#define mi_is_crashed(x) ((x)->s->state.changed & STATE_CRASHED)
#define mi_is_crashed_on_repair(x) ((x)->s->state.changed & STATE_CRASHED_ON_REPAIR)
#define mi_is_crashed_on_repair(x) ((x)->s->state.changed & STATE_CRASHED_ON_REPAIR)
#define mi_print_error(SHARE, ERRNO) \
#define mi_print_error(SHARE, ERRNO) \
...
@@ -620,8 +631,8 @@ extern void _mi_print_key(FILE *stream,HA_KEYSEG *keyseg,const uchar *key,
...
@@ -620,8 +631,8 @@ extern void _mi_print_key(FILE *stream,HA_KEYSEG *keyseg,const uchar *key,
extern
my_bool
_mi_read_pack_info
(
MI_INFO
*
info
,
pbool
fix_keys
);
extern
my_bool
_mi_read_pack_info
(
MI_INFO
*
info
,
pbool
fix_keys
);
extern
int
_mi_read_pack_record
(
MI_INFO
*
info
,
my_off_t
filepos
,
byte
*
buf
);
extern
int
_mi_read_pack_record
(
MI_INFO
*
info
,
my_off_t
filepos
,
byte
*
buf
);
extern
int
_mi_read_rnd_pack_record
(
MI_INFO
*
,
byte
*
,
my_off_t
,
my_bool
);
extern
int
_mi_read_rnd_pack_record
(
MI_INFO
*
,
byte
*
,
my_off_t
,
my_bool
);
extern
int
_mi_pack_rec_unpack
(
MI_INFO
*
info
,
byte
*
to
,
byte
*
from
,
extern
int
_mi_pack_rec_unpack
(
MI_INFO
*
info
,
MI_BIT_BUFF
*
bit_buff
,
ulong
reclength
);
byte
*
to
,
byte
*
from
,
ulong
reclength
);
extern
ulonglong
mi_safe_mul
(
ulonglong
a
,
ulonglong
b
);
extern
ulonglong
mi_safe_mul
(
ulonglong
a
,
ulonglong
b
);
extern
int
_mi_ft_update
(
MI_INFO
*
info
,
uint
keynr
,
byte
*
keybuf
,
extern
int
_mi_ft_update
(
MI_INFO
*
info
,
uint
keynr
,
byte
*
keybuf
,
const
byte
*
oldrec
,
const
byte
*
newrec
,
my_off_t
pos
);
const
byte
*
oldrec
,
const
byte
*
newrec
,
my_off_t
pos
);
...
@@ -686,7 +697,9 @@ extern "C" {
...
@@ -686,7 +697,9 @@ extern "C" {
extern
uint
_mi_get_block_info
(
MI_BLOCK_INFO
*
,
File
,
my_off_t
);
extern
uint
_mi_get_block_info
(
MI_BLOCK_INFO
*
,
File
,
my_off_t
);
extern
uint
_mi_rec_pack
(
MI_INFO
*
info
,
byte
*
to
,
const
byte
*
from
);
extern
uint
_mi_rec_pack
(
MI_INFO
*
info
,
byte
*
to
,
const
byte
*
from
);
extern
uint
_mi_pack_get_block_info
(
MI_INFO
*
,
MI_BLOCK_INFO
*
,
File
,
my_off_t
);
extern
uint
_mi_pack_get_block_info
(
MI_INFO
*
myisam
,
MI_BIT_BUFF
*
bit_buff
,
MI_BLOCK_INFO
*
info
,
byte
**
rec_buff_p
,
File
file
,
my_off_t
filepos
);
extern
void
_my_store_blob_length
(
byte
*
pos
,
uint
pack_length
,
uint
length
);
extern
void
_my_store_blob_length
(
byte
*
pos
,
uint
pack_length
,
uint
length
);
extern
void
_myisam_log
(
enum
myisam_log_commands
command
,
MI_INFO
*
info
,
extern
void
_myisam_log
(
enum
myisam_log_commands
command
,
MI_INFO
*
info
,
const
byte
*
buffert
,
uint
length
);
const
byte
*
buffert
,
uint
length
);
...
...
storage/myisam/sort.c
View file @
b15a23a7
...
@@ -309,7 +309,7 @@ static ha_rows NEAR_F find_all_keys(MI_SORT_PARAM *info, uint keys,
...
@@ -309,7 +309,7 @@ static ha_rows NEAR_F find_all_keys(MI_SORT_PARAM *info, uint keys,
pthread_handler_t
thr_find_all_keys
(
void
*
arg
)
pthread_handler_t
thr_find_all_keys
(
void
*
arg
)
{
{
MI_SORT_PARAM
*
info
=
(
MI_SORT_PARAM
*
)
arg
;
MI_SORT_PARAM
*
sort_param
=
(
MI_SORT_PARAM
*
)
arg
;
int
error
;
int
error
;
uint
memavl
,
old_memavl
,
keys
,
sort_length
;
uint
memavl
,
old_memavl
,
keys
,
sort_length
;
uint
idx
,
maxbuffer
;
uint
idx
,
maxbuffer
;
...
@@ -321,31 +321,33 @@ pthread_handler_t thr_find_all_keys(void *arg)
...
@@ -321,31 +321,33 @@ pthread_handler_t thr_find_all_keys(void *arg)
if
(
my_thread_init
())
if
(
my_thread_init
())
goto
err
;
goto
err
;
if
(
info
->
sort_info
->
got_error
)
DBUG_ENTER
(
"thr_find_all_keys"
);
DBUG_PRINT
(
"enter"
,
(
"master: %d"
,
sort_param
->
master
));
if
(
sort_param
->
sort_info
->
got_error
)
goto
err
;
goto
err
;
if
(
info
->
keyinfo
->
flag
&&
HA_VAR_LENGTH_KEY
)
if
(
sort_param
->
keyinfo
->
flag
&&
HA_VAR_LENGTH_KEY
)
{
{
info
->
write_keys
=
write_keys_varlen
;
sort_param
->
write_keys
=
write_keys_varlen
;
info
->
read_to_buffer
=
read_to_buffer_varlen
;
sort_param
->
read_to_buffer
=
read_to_buffer_varlen
;
info
->
write_key
=
write_merge_key_varlen
;
sort_param
->
write_key
=
write_merge_key_varlen
;
}
}
else
else
{
{
info
->
write_keys
=
write_keys
;
sort_param
->
write_keys
=
write_keys
;
info
->
read_to_buffer
=
read_to_buffer
;
sort_param
->
read_to_buffer
=
read_to_buffer
;
info
->
write_key
=
write_merge_key
;
sort_param
->
write_key
=
write_merge_key
;
}
}
my_b_clear
(
&
info
->
tempfile
);
my_b_clear
(
&
sort_param
->
tempfile
);
my_b_clear
(
&
info
->
tempfile_for_exceptions
);
my_b_clear
(
&
sort_param
->
tempfile_for_exceptions
);
bzero
((
char
*
)
&
info
->
buffpek
,
sizeof
(
info
->
buffpek
));
bzero
((
char
*
)
&
sort_param
->
buffpek
,
sizeof
(
sort_param
->
buffpek
));
bzero
((
char
*
)
&
info
->
unique
,
sizeof
(
info
->
unique
));
bzero
((
char
*
)
&
sort_param
->
unique
,
sizeof
(
sort_param
->
unique
));
sort_keys
=
(
uchar
**
)
NULL
;
sort_keys
=
(
uchar
**
)
NULL
;
memavl
=
max
(
info
->
sortbuff_size
,
MIN_SORT_MEMORY
);
memavl
=
max
(
sort_param
->
sortbuff_size
,
MIN_SORT_MEMORY
);
idx
=
info
->
sort_info
->
max_records
;
idx
=
sort_param
->
sort_info
->
max_records
;
sort_length
=
info
->
key_length
;
sort_length
=
sort_param
->
key_length
;
maxbuffer
=
1
;
maxbuffer
=
1
;
while
(
memavl
>=
MIN_SORT_MEMORY
)
while
(
memavl
>=
MIN_SORT_MEMORY
)
...
@@ -363,18 +365,19 @@ pthread_handler_t thr_find_all_keys(void *arg)
...
@@ -363,18 +365,19 @@ pthread_handler_t thr_find_all_keys(void *arg)
(
keys
=
(
memavl
-
sizeof
(
BUFFPEK
)
*
maxbuffer
)
/
(
keys
=
(
memavl
-
sizeof
(
BUFFPEK
)
*
maxbuffer
)
/
(
sort_length
+
sizeof
(
char
*
)))
<=
1
)
(
sort_length
+
sizeof
(
char
*
)))
<=
1
)
{
{
mi_check_print_error
(
info
->
sort_info
->
param
,
mi_check_print_error
(
sort_param
->
sort_info
->
param
,
"sort_buffer_size is to small"
);
"sort_buffer_size is to small"
);
goto
err
;
goto
err
;
}
}
}
}
while
((
maxbuffer
=
(
int
)
(
idx
/
(
keys
-
1
)
+
1
))
!=
skr
);
while
((
maxbuffer
=
(
int
)
(
idx
/
(
keys
-
1
)
+
1
))
!=
skr
);
}
}
if
((
sort_keys
=
(
uchar
**
)
my_malloc
(
keys
*
(
sort_length
+
sizeof
(
char
*
))
+
if
((
sort_keys
=
(
uchar
**
)
((
info
->
keyinfo
->
flag
&
HA_FULLTEXT
)
?
my_malloc
(
keys
*
(
sort_length
+
sizeof
(
char
*
))
+
((
sort_param
->
keyinfo
->
flag
&
HA_FULLTEXT
)
?
HA_FT_MAXBYTELEN
:
0
),
MYF
(
0
))))
HA_FT_MAXBYTELEN
:
0
),
MYF
(
0
))))
{
{
if
(
my_init_dynamic_array
(
&
info
->
buffpek
,
sizeof
(
BUFFPEK
),
if
(
my_init_dynamic_array
(
&
sort_param
->
buffpek
,
sizeof
(
BUFFPEK
),
maxbuffer
,
maxbuffer
/
2
))
maxbuffer
,
maxbuffer
/
2
))
{
{
my_free
((
gptr
)
sort_keys
,
MYF
(
0
));
my_free
((
gptr
)
sort_keys
,
MYF
(
0
));
...
@@ -389,70 +392,88 @@ pthread_handler_t thr_find_all_keys(void *arg)
...
@@ -389,70 +392,88 @@ pthread_handler_t thr_find_all_keys(void *arg)
}
}
if
(
memavl
<
MIN_SORT_MEMORY
)
if
(
memavl
<
MIN_SORT_MEMORY
)
{
{
mi_check_print_error
(
info
->
sort_info
->
param
,
"Sort buffer to small"
);
/* purecov: tested */
mi_check_print_error
(
sort_param
->
sort_info
->
param
,
"Sort buffer too small"
);
goto
err
;
/* purecov: tested */
goto
err
;
/* purecov: tested */
}
}
if
(
info
->
sort_info
->
param
->
testflag
&
T_VERBOSE
)
if
(
sort_param
->
sort_info
->
param
->
testflag
&
T_VERBOSE
)
printf
(
"Key %d - Allocating buffer for %d keys
\n
"
,
info
->
key
+
1
,
keys
);
printf
(
"Key %d - Allocating buffer for %d keys
\n
"
,
info
->
sort_keys
=
sort_keys
;
sort_param
->
key
+
1
,
keys
);
sort_param
->
sort_keys
=
sort_keys
;
idx
=
error
=
0
;
idx
=
error
=
0
;
sort_keys
[
0
]
=
(
uchar
*
)
(
sort_keys
+
keys
);
sort_keys
[
0
]
=
(
uchar
*
)
(
sort_keys
+
keys
);
while
(
!
(
error
=
info
->
sort_info
->
got_error
)
&&
DBUG_PRINT
(
"info"
,
(
"reading keys"
));
!
(
error
=
(
*
info
->
key_read
)(
info
,
sort_keys
[
idx
])))
while
(
!
(
error
=
sort_param
->
sort_info
->
got_error
)
&&
!
(
error
=
(
*
sort_param
->
key_read
)(
sort_param
,
sort_keys
[
idx
])))
{
{
if
(
info
->
real_key_length
>
info
->
key_length
)
if
(
sort_param
->
real_key_length
>
sort_param
->
key_length
)
{
{
if
(
write_key
(
info
,
sort_keys
[
idx
],
&
info
->
tempfile_for_exceptions
))
if
(
write_key
(
sort_param
,
sort_keys
[
idx
],
&
sort_param
->
tempfile_for_exceptions
))
goto
err
;
goto
err
;
continue
;
continue
;
}
}
if
(
++
idx
==
keys
)
if
(
++
idx
==
keys
)
{
{
if
(
info
->
write_keys
(
info
,
sort_keys
,
idx
-
1
,
if
(
sort_param
->
write_keys
(
sort_param
,
sort_keys
,
idx
-
1
,
(
BUFFPEK
*
)
alloc_dynamic
(
&
info
->
buffpek
),
(
BUFFPEK
*
)
alloc_dynamic
(
&
sort_param
->
buffpek
),
&
info
->
tempfile
))
&
sort_param
->
tempfile
))
goto
err
;
goto
err
;
sort_keys
[
0
]
=
(
uchar
*
)
(
sort_keys
+
keys
);
sort_keys
[
0
]
=
(
uchar
*
)
(
sort_keys
+
keys
);
memcpy
(
sort_keys
[
0
],
sort_keys
[
idx
-
1
],(
size_t
)
info
->
key_length
);
memcpy
(
sort_keys
[
0
],
sort_keys
[
idx
-
1
],
(
size_t
)
sort_param
->
key_length
);
idx
=
1
;
idx
=
1
;
}
}
sort_keys
[
idx
]
=
sort_keys
[
idx
-
1
]
+
info
->
key_length
;
sort_keys
[
idx
]
=
sort_keys
[
idx
-
1
]
+
sort_param
->
key_length
;
}
}
if
(
error
>
0
)
if
(
error
>
0
)
goto
err
;
goto
err
;
if
(
info
->
buffpek
.
elements
)
if
(
sort_param
->
buffpek
.
elements
)
{
{
if
(
info
->
write_keys
(
info
,
sort_keys
,
idx
,
if
(
sort_param
->
write_keys
(
sort_param
,
sort_keys
,
idx
,
(
BUFFPEK
*
)
alloc_dynamic
(
&
info
->
buffpek
),
&
info
->
tempfile
))
(
BUFFPEK
*
)
alloc_dynamic
(
&
sort_param
->
buffpek
),
&
sort_param
->
tempfile
))
goto
err
;
goto
err
;
info
->
keys
=
(
info
->
buffpek
.
elements
-
1
)
*
(
keys
-
1
)
+
idx
;
sort_param
->
keys
=
(
sort_param
->
buffpek
.
elements
-
1
)
*
(
keys
-
1
)
+
idx
;
}
}
else
else
info
->
keys
=
idx
;
sort_param
->
keys
=
idx
;
info
->
sort_keys_length
=
keys
;
sort_param
->
sort_keys_length
=
keys
;
goto
ok
;
goto
ok
;
err:
err:
info
->
sort_info
->
got_error
=
1
;
/* no need to protect this with a mutex */
DBUG_PRINT
(
"error"
,
(
"got some error"
));
sort_param
->
sort_info
->
got_error
=
1
;
/* no need to protect with a mutex */
if
(
sort_keys
)
if
(
sort_keys
)
my_free
((
gptr
)
sort_keys
,
MYF
(
0
));
my_free
((
gptr
)
sort_keys
,
MYF
(
0
));
info
->
sort_keys
=
0
;
sort_param
->
sort_keys
=
0
;
delete_dynamic
(
&
info
->
buffpek
);
delete_dynamic
(
&
sort_param
->
buffpek
);
close_cached_file
(
&
info
->
tempfile
);
close_cached_file
(
&
sort_param
->
tempfile
);
close_cached_file
(
&
info
->
tempfile_for_exceptions
);
close_cached_file
(
&
sort_param
->
tempfile_for_exceptions
);
ok:
ok:
free_root
(
&
info
->
wordroot
,
MYF
(
0
));
free_root
(
&
info
->
wordroot
,
MYF
(
0
));
remove_io_thread
(
&
info
->
read_cache
);
/*
pthread_mutex_lock
(
&
info
->
sort_info
->
mutex
);
Detach from the share if the writer is involved. Avoid others to
info
->
sort_info
->
threads_running
--
;
be blocked. This includes a flush of the write buffer. This will
pthread_cond_signal
(
&
info
->
sort_info
->
cond
);
also indicate EOF to the readers.
pthread_mutex_unlock
(
&
info
->
sort_info
->
mutex
);
*/
if
(
sort_param
->
sort_info
->
info
->
rec_cache
.
share
)
remove_io_thread
(
&
sort_param
->
sort_info
->
info
->
rec_cache
);
/* Readers detach from the share if any. Avoid others to be blocked. */
if
(
sort_param
->
read_cache
.
share
)
remove_io_thread
(
&
sort_param
->
read_cache
);
pthread_mutex_lock
(
&
sort_param
->
sort_info
->
mutex
);
if
(
!--
sort_param
->
sort_info
->
threads_running
)
pthread_cond_signal
(
&
sort_param
->
sort_info
->
cond
);
pthread_mutex_unlock
(
&
sort_param
->
sort_info
->
mutex
);
DBUG_PRINT
(
"exit"
,
(
"======== ending thread ========"
));
my_thread_end
();
my_thread_end
();
return
NULL
;
return
NULL
;
}
}
...
@@ -470,6 +491,7 @@ int thr_write_keys(MI_SORT_PARAM *sort_param)
...
@@ -470,6 +491,7 @@ int thr_write_keys(MI_SORT_PARAM *sort_param)
MYISAM_SHARE
*
share
=
info
->
s
;
MYISAM_SHARE
*
share
=
info
->
s
;
MI_SORT_PARAM
*
sinfo
;
MI_SORT_PARAM
*
sinfo
;
byte
*
mergebuf
=
0
;
byte
*
mergebuf
=
0
;
DBUG_ENTER
(
"thr_write_keys"
);
LINT_INIT
(
length
);
LINT_INIT
(
length
);
for
(
i
=
0
,
sinfo
=
sort_param
;
for
(
i
=
0
,
sinfo
=
sort_param
;
...
@@ -606,7 +628,7 @@ int thr_write_keys(MI_SORT_PARAM *sort_param)
...
@@ -606,7 +628,7 @@ int thr_write_keys(MI_SORT_PARAM *sort_param)
}
}
}
}
my_free
((
gptr
)
mergebuf
,
MYF
(
MY_ALLOW_ZERO_PTR
));
my_free
((
gptr
)
mergebuf
,
MYF
(
MY_ALLOW_ZERO_PTR
));
return
got_error
;
DBUG_RETURN
(
got_error
)
;
}
}
#endif
/* THREAD */
#endif
/* THREAD */
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment