Commit 053e78b8 authored by unknown's avatar unknown

Merge PBXT 1.0.11 into MariaDB.

parents 0ae75abf 3d6686ce
[MYSQL]
tree_location = lp:maria
post_commit_to = maria-developers@lists.launchpad.net
post_commit_to = commits@mariadb.org
post_commit_url = lp:maria
tree_name = maria
project_name = "MariaDB 5.1, with Maria 1.5"
......@@ -1362,7 +1362,7 @@ SELECT * FROM t1 INNER JOIN t2 ON code=id
WHERE id='a12' AND (LENGTH(code)=5 OR code < 'a00');
id select_type table type possible_keys key key_len ref rows filtered Extra
1 SIMPLE t2 const PRIMARY PRIMARY 12 const 1 100.00 Using index
1 SIMPLE t1 ref code code 13 const 1 100.00 Using where; Using index
1 SIMPLE t1 ref code code 13 const 3 100.00 Using where; Using index
Warnings:
Note 1003 select `test`.`t1`.`code` AS `code`,'a12' AS `id` from `test`.`t1` join `test`.`t2` where ((`test`.`t1`.`code` = 'a12') and (length(`test`.`t1`.`code`) = 5))
DROP TABLE t1,t2;
......
......@@ -1055,7 +1055,7 @@ t0.b=t1.b AND
(t8.b=t9.b OR t8.c IS NULL) AND
(t9.a=1);
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE t0 ref idx_a idx_a 5 const 1
1 SIMPLE t0 ref idx_a idx_a 5 const 2
1 SIMPLE t1 ref idx_b idx_b 5 test.t0.b 1
1 SIMPLE t2 ALL NULL NULL NULL NULL 3
1 SIMPLE t3 ALL NULL NULL NULL NULL 2
......
......@@ -388,7 +388,7 @@ Table Op Msg_type Msg_text
test.t1 analyze status OK
explain extended select a, not(not(a)), not(a <= 2 and not(a)), not(a not like "1"), not (a not in (1,2)), not(a != 2) from t1 where not(not(a)) having not(not(a));
id select_type table type possible_keys key key_len ref rows filtered Extra
1 SIMPLE t1 index NULL a 5 NULL 5 100.00 Using where; Using index
1 SIMPLE t1 index NULL a 5 NULL 21 100.00 Using where; Using index
Warnings:
Note 1003 select `test`.`t1`.`a` AS `a`,(`test`.`t1`.`a` <> 0) AS `not(not(a))`,((`test`.`t1`.`a` > 2) or `test`.`t1`.`a`) AS `not(a <= 2 and not(a))`,(`test`.`t1`.`a` like '1') AS `not(a not like "1")`,(`test`.`t1`.`a` in (1,2)) AS `not (a not in (1,2))`,(`test`.`t1`.`a` = 2) AS `not(a != 2)` from `test`.`t1` where `test`.`t1`.`a` having `test`.`t1`.`a`
drop table t1;
......@@ -179,7 +179,7 @@ Warnings:
Warning 1265 Data truncated for column 'i' at row 513
explain select * from t1 where i=2 or i is null;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE t1 ref i i 4 const 1 Using index
1 SIMPLE t1 ref i i 4 const 25 Using index
select count(*) from t1 where i=2 or i is null;
count(*)
9
......
......@@ -401,14 +401,14 @@ a b c
1 0
explain select * from t1 where (a = 1 and b = 1 and c = 'b') or (a > 2) order by a desc;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE t1 index a a 20 NULL 11 Using where; Using index
1 SIMPLE t1 range a a 20 NULL 7 Using where; Using index
select * from t1 where (a = 1 and b = 1 and c = 'b') or (a > 2) order by a desc;
a b c
1 1 b
1 1 b
explain select * from t1 where a < 2 and b <= 1 order by a desc, b desc;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE t1 range a a 4 NULL 1 Using where; Using index
1 SIMPLE t1 range a a 4 NULL 6 Using where; Using index
select * from t1 where a < 2 and b <= 1 order by a desc, b desc;
a b c
1 1 b
......@@ -432,7 +432,7 @@ a b c
1 1
explain select * from t1 where a between 1 and 3 and b <= 1 order by a desc, b desc;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE t1 range a a 8 NULL 1 Using where; Using index
1 SIMPLE t1 range a a 8 NULL 6 Using where; Using index
select * from t1 where a between 1 and 3 and b <= 1 order by a desc, b desc;
a b c
2 1 b
......@@ -444,7 +444,7 @@ a b c
1 0
explain select * from t1 where a between 0 and 1 order by a desc, b desc;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE t1 range a a 4 NULL 1 Using where; Using index
1 SIMPLE t1 range a a 4 NULL 6 Using where; Using index
select * from t1 where a between 0 and 1 order by a desc, b desc;
a b c
1 3 b
......
......@@ -14,7 +14,7 @@ set foreign_key_checks = 0;
drop table t1;
set foreign_key_checks = 1;
INSERT INTO t2 VALUES(2);
ERROR 23000: Cannot add or update a child row: a foreign key constraint fails (Referenced table `t1` not found)
ERROR 23000: Cannot add or update a child row: a foreign key constraint fails (Referenced table `test.t1` not found)
drop table if exists parent, child, child_child;
CREATE TABLE parent (
id INT NOT NULL,
......@@ -370,7 +370,7 @@ set foreign_key_checks = 0;
CREATE TABLE t2 (s1 INT DEFAULT NULL, FOREIGN KEY (s1) REFERENCES t1 (s1));
set foreign_key_checks = 1;
insert into t2 values (1);
ERROR 23000: Cannot add or update a child row: a foreign key constraint fails (Referenced table `t1` not found)
ERROR 23000: Cannot add or update a child row: a foreign key constraint fails (Referenced table `test.t1` not found)
set foreign_key_checks = 0;
insert into t2 values (1);
CREATE TABLE t1 (s1 INT PRIMARY KEY);
......
......@@ -964,5 +964,5 @@ Z
In following EXPLAIN the access method should be ref, #rows~=500 (and not 2)
explain select * from t2 where a=1000 and b<11;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE t2 range a a 10 NULL 1 Using where
1 SIMPLE t2 range a a 10 NULL 2 Using where
drop table t1, t2;
......@@ -1384,52 +1384,52 @@ Table Op Msg_type Msg_text
test.t2 analyze status OK
explain select t2.companynr,companyname from t4 left join t2 using (companynr) where t2.companynr > 0;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE t2 ALL NULL NULL NULL NULL 1199 Using where
1 SIMPLE t2 ALL NULL NULL NULL NULL 1200 Using where
1 SIMPLE t4 eq_ref PRIMARY PRIMARY 1 test.t2.companynr 1
explain select t2.companynr,companyname from t4 left join t2 using (companynr) where t2.companynr > 0 or t2.companynr < 0;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE t2 ALL NULL NULL NULL NULL 1199 Using where
1 SIMPLE t2 ALL NULL NULL NULL NULL 1200 Using where
1 SIMPLE t4 eq_ref PRIMARY PRIMARY 1 test.t2.companynr 1
explain select t2.companynr,companyname from t4 left join t2 using (companynr) where t2.companynr > 0 and t4.companynr > 0;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE t2 ALL NULL NULL NULL NULL 1199 Using where
1 SIMPLE t2 ALL NULL NULL NULL NULL 1200 Using where
1 SIMPLE t4 eq_ref PRIMARY PRIMARY 1 test.t2.companynr 1
explain select companynr,companyname from t4 left join t2 using (companynr) where companynr > 0;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE t4 ALL PRIMARY NULL NULL NULL 12 Using where
1 SIMPLE t2 ALL NULL NULL NULL NULL 1199
1 SIMPLE t2 ALL NULL NULL NULL NULL 1200
explain select companynr,companyname from t4 left join t2 using (companynr) where companynr > 0 or companynr < 0;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE t4 ALL PRIMARY NULL NULL NULL 12 Using where
1 SIMPLE t2 ALL NULL NULL NULL NULL 1199
1 SIMPLE t2 ALL NULL NULL NULL NULL 1200
explain select companynr,companyname from t4 left join t2 using (companynr) where companynr > 0 and companynr > 0;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE t4 ALL PRIMARY NULL NULL NULL 12 Using where
1 SIMPLE t2 ALL NULL NULL NULL NULL 1199
1 SIMPLE t2 ALL NULL NULL NULL NULL 1200
explain select t2.companynr,companyname from t4 left join t2 using (companynr) where t2.companynr > 0 or t2.companynr is null;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE t4 ALL NULL NULL NULL NULL 12
1 SIMPLE t2 ALL NULL NULL NULL NULL 1199 Using where
1 SIMPLE t2 ALL NULL NULL NULL NULL 1200 Using where
explain select t2.companynr,companyname from t4 left join t2 using (companynr) where t2.companynr > 0 or t2.companynr < 0 or t4.companynr > 0;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE t4 ALL PRIMARY NULL NULL NULL 12
1 SIMPLE t2 ALL NULL NULL NULL NULL 1199 Using where
1 SIMPLE t2 ALL NULL NULL NULL NULL 1200 Using where
explain select t2.companynr,companyname from t4 left join t2 using (companynr) where ifnull(t2.companynr,1)>0;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE t4 ALL NULL NULL NULL NULL 12
1 SIMPLE t2 ALL NULL NULL NULL NULL 1199 Using where
1 SIMPLE t2 ALL NULL NULL NULL NULL 1200 Using where
explain select companynr,companyname from t4 left join t2 using (companynr) where companynr > 0 or companynr is null;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE t4 ALL PRIMARY NULL NULL NULL 12 Using where
1 SIMPLE t2 ALL NULL NULL NULL NULL 1199
1 SIMPLE t2 ALL NULL NULL NULL NULL 1200
explain select companynr,companyname from t4 left join t2 using (companynr) where companynr > 0 or companynr < 0 or companynr > 0;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE t4 ALL PRIMARY NULL NULL NULL 12 Using where
1 SIMPLE t2 ALL NULL NULL NULL NULL 1199
1 SIMPLE t2 ALL NULL NULL NULL NULL 1200
explain select companynr,companyname from t4 left join t2 using (companynr) where ifnull(companynr,1)>0;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE t4 ALL NULL NULL NULL NULL 12 Using where
1 SIMPLE t2 ALL NULL NULL NULL NULL 1199
1 SIMPLE t2 ALL NULL NULL NULL NULL 1200
select distinct t2.companynr,t4.companynr from t2,t4 where t2.companynr=t4.companynr+1;
companynr companynr
37 36
......@@ -1437,7 +1437,7 @@ companynr companynr
explain select distinct t2.companynr,t4.companynr from t2,t4 where t2.companynr=t4.companynr+1;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE t4 index NULL PRIMARY 1 NULL 12 Using index; Using temporary
1 SIMPLE t2 ALL NULL NULL NULL NULL 1199 Using where; Using join buffer
1 SIMPLE t2 ALL NULL NULL NULL NULL 1200 Using where; Using join buffer
select t2.fld1,t2.companynr,fld3,period from t3,t2 where t2.fld1 = 38208 and t2.fld1=t3.t2nr and period = 1008 or t2.fld1 = 38008 and t2.fld1 =t3.t2nr and period = 1008;
fld1 companynr fld3 period
038008 37 reporters 1008
......@@ -1511,7 +1511,7 @@ count(*) min(fld4) max(fld4) sum(fld1) avg(fld1) std(fld1) variance(fld1)
70 absentee vest 17788966 254128.0857 3272.5940 10709871.3069
explain extended select count(*),min(fld4),max(fld4),sum(fld1),avg(fld1),std(fld1),variance(fld1) from t2 where companynr = 34 and fld4<>"";
id select_type table type possible_keys key key_len ref rows filtered Extra
1 SIMPLE t2 ALL NULL NULL NULL NULL 1199 100.00 Using where
1 SIMPLE t2 ALL NULL NULL NULL NULL 1200 100.00 Using where
Warnings:
Note 1003 select count(0) AS `count(*)`,min(`test`.`t2`.`fld4`) AS `min(fld4)`,max(`test`.`t2`.`fld4`) AS `max(fld4)`,sum(`test`.`t2`.`fld1`) AS `sum(fld1)`,avg(`test`.`t2`.`fld1`) AS `avg(fld1)`,std(`test`.`t2`.`fld1`) AS `std(fld1)`,variance(`test`.`t2`.`fld1`) AS `variance(fld1)` from `test`.`t2` where ((`test`.`t2`.`companynr` = 34) and (`test`.`t2`.`fld4` <> ''))
select companynr,count(*),min(fld4),max(fld4),sum(fld1),avg(fld1),std(fld1),variance(fld1) from t2 group by companynr limit 3;
......@@ -1955,7 +1955,7 @@ id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE NULL NULL NULL NULL NULL NULL NULL Impossible WHERE
explain select fld3 from t2 where fld1=fld1;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE t2 ALL NULL NULL NULL NULL 1199
1 SIMPLE t2 ALL NULL NULL NULL NULL 1200
select companynr,fld1 from t2 HAVING fld1=250501 or fld1=250502;
companynr fld1
34 250501
......@@ -2007,7 +2007,7 @@ count(*)
4181
explain select min(fld1),max(fld1),count(*) from t2;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE t2 index NULL fld1 4 NULL 1199 Using index
1 SIMPLE t2 index NULL fld1 4 NULL 1200 Using index
select min(fld1),max(fld1),count(*) from t2;
min(fld1) max(fld1) count(*)
0 1232609 1199
......@@ -2093,9 +2093,9 @@ show full columns from t2 from test like 's%';
Field Type Collation Null Key Default Extra Privileges Comment
show keys from t2;
Table Non_unique Key_name Seq_in_index Column_name Collation Cardinality Sub_part Packed Null Index_type Comment
t2 0 PRIMARY 1 auto A 1199 NULL NULL BTREE
t2 0 fld1 1 fld1 A 1199 NULL NULL BTREE
t2 1 fld3 1 fld3 A 1199 NULL NULL BTREE
t2 0 PRIMARY 1 auto A 1200 NULL NULL BTREE
t2 0 fld1 1 fld1 A 1200 NULL NULL BTREE
t2 1 fld3 1 fld3 A 1200 NULL NULL BTREE
drop table t4, t3, t2, t1;
DO 1;
DO benchmark(100,1+1),1,1;
......
PBXT Release Notes
==================
+------- 1.0.11 Pre-GA - 2010-05-11
RN322: Creating a table that references a non-existing table can now only be done if you set: foreign_key_checks = 0. Also fixed a failure when creating tables with recursive foreign key declarations.
RN321: Added "Extended record count" to the CHECK TABLE output. This indicates the number of records that have a data log component.
RN320: All tests now run with MySQL 5.1.46.
------- 1.0.10n RC4 - 2010-04-28
RN319: Fix RN1/3 and RN1/4 back-ported from 1.1: Fixed a deadlock that could occur during low index cache situations and added some checks for index corruption, and added the try lock variation for R/W locks.
RN318: Fixed a bug in the atomic R/W lock. This bug occurred on multi-core Linux when under extreme load. The effect was that an index lookup could fail. The index was not corrupted.
------- 1.0.10m RC4 - 2010-03-29
RN317: This change prevents an unscheduled checkpoint from occurring when the sweeper has work to do. Checkpoints required because the checkpoint threshold was reached are done as usual.
------- 1.0.10k RC4 - 2010-03-29
RN316: Set the maximum delay, while waiting for previous transactions to commit to 1/100s. This situation occurs when cleanup begins of a long running transaction.
RN315: Fixed a bug that could lead to a data log error, for example: Data log not found: '.../dlog-129602.xt'. This error occurred after a duplicate key error, depending on the table structure, because the row buffer was not restored after writing an extended record.
RN314: Server startup time could be very long when data logs become large because the log size was not saved in the header when a data log was full.
------- 1.0.10j RC4 - 2010-03-24
RN313: Fixed an error in the calculation of the handle data record (.xtd files) size when AVG_ROW_LENGTH is set explicitly to a value less than 12. For example:
CREATE TABLE objs (
id int(10) unsigned NOT NULL,
objdata mediumblob NOT NULL,
PRIMARY KEY (id)
) ENGINE=PBXT AVG_ROW_LENGTH=10
This table definition previously led to corruption of the table because the handle data record was set to 24 (14+10), which is less than the minimum (for variable length records) handle data record size of 26.
This minimum consists of 14 byte record header and 12 bytes reference to the extended record data (the part of the record in the data log).
Tip: when setting AVG_ROW_LENGTH you should normally add 12 to the average row length estimate to ensure that the average length part of the record is always in the handle data file. This is important, for example if you wish to make sure that the rows used to build indexes are in the handle data file. CHECK TABLE tells you how many rows are in the "fixed length part" of the record (output in MySQL error log). In the example above, this would be AVG_ROW_LENGTH=17.
The maximum size of a field can be calculated adding the maximum byte size as described here: http://dev.mysql.com/doc/refman/5.1/en/storage-requirements.html, and then add the following values, depending on the byte size:
byte size <= 240, add 1
byte size < 2^16 (65536), add 3
byte size < 2^24 (16777216), add 4
byte size > 2^24, add 5
------- 1.0.10i RC4 - 2010-03-17
RN312: Fixed bug #534361: Valgrind error: write of uninitialised bytes in xt_flush_indices()
RN311: Fixed ilog corruption when running out of disk space during an index flush operation, which led to corruption of the index.
------- 1.0.10h RC4 - 2010-02-25
RN310: Fixed Windows atomic INC/DEC operations, which led to the atomic R/W lock not working correctly. The result was that some index entries were not found.
RN309: Fixed a bug that caused a crash when the index was corrupted. The crash occurs if the index page is not completely written, and an item in the index has a bad length.
RN308: Fixed bug #509803: can't run tpcc (cannot compare FKs that rely on indexes of different length).
------- 1.0.10g RC4 - 2010-02-11
RN307: 2010-02-15: Set the internal version number 1.0.10g.
RN306: All tests now run with MySQL 5.1.42.
RN305: Fixed a bug that could cause a crash in filesort. The problem was that the return row estimate was incorrect, which caused the result of estimate_rows_upper_bound() to overflow to zero. Row estimate has been changed, and no longer takes into account deleted rows (so the row estimate is now a maximum).
RN304: Fixed bug #513012: On a table with a trigger the same record is updated more than once in one statement
------- 1.0.10f RC4 - 2010-01-29
RN303: Fix RN1/10 back-ported from 1.1: Fixed a bug in the record cache that caused PBXT to think it had run out of cache memory. The effect was that PBXT used less and less cache over time. The bug occurs during heavy concurrent access on the record cache. The effect is that PBXT gets slower and slower.
RN302: Fix RN1/11 back-ported from 1.1: Corrected a problem that sometimes caused a pause in activity when the record cache was full.
------- 1.0.10e RC4 - 2010-01-25
RN301: Fixed index statistics calculation. This bug lead to the wrong indices being selected by the optimizer because all indices returned the same cost.
RN300: Fixed bug #509968: START TRANSACTION WITH CONSISTENT SNAPSHOT breaks transactional flow.
RN299: Fixed bug #509218: Server asserts with Assertion `mutex->__data.__owner == 0' failed on high concurrency OLTP test.
------- 1.0.10d RC4 - 2010-01-11
RN298: Fixed a bug that caused huge amounts of transaction log to be written when pbxt_flush_log_at_trx_commit = 2.
------- 1.0.10c RC4 - 2009-12-29
RN297: Updated "LOCK TABLES ... READ LOCAL" behavior to be more restrictive and compatible with InnoDB
RN296: Fixed bug #499026: START TRANSACTION WITH CONSISTENT SNAPSHOT does not work for PBXT
------- 1.0.10 RC4 - 2009-12-18
RN295: PBXT tests now all run with MySQL 5.1.41.
RN294: Fixed bug #483714: a broken table can prevent other tables from opening
RN293: Added system variable pbxt_flush_log_at_trx_commit. The value of this variable determines whether the transaction log is written and/or flushed when a transaction is ended. A value of 0 means don't write or flush the transaction log, 1 means write and flush and 2 means write, but do not flush. No matter which setting is chosen, the transaction log is written and flushed at least once per second.
------- 1.0.09g RC3 - 2009-12-16
RN292: Fixed a bug that resulted in 2-phase commit not being used between PBXT and the binlog. This bug was a result of a hack which was added to solve a problem in a pre-release version of MySQL 5.1. The hack was removed.
......
......@@ -287,7 +287,7 @@ result_t PBXTBackupDriver::get_data(Buffer &buf)
bd_table_no++;
try_(a) {
xt_ha_open_database_of_table(self, (XTPathStrPtr) path);
tab = xt_use_table(self, (XTPathStrPtr) path, FALSE, FALSE, NULL);
tab = xt_use_table(self, (XTPathStrPtr) path, FALSE, FALSE);
pushr_(xt_heap_release, tab);
if (!(bd_ot = xt_db_open_table_using_tab(tab, bd_thread)))
xt_throw(self);
......@@ -403,7 +403,7 @@ result_t PBXTBackupDriver::lock()
bd_thread->st_abort_trans = FALSE;
bd_thread->st_stat_ended = FALSE;
bd_thread->st_stat_trans = FALSE;
bd_thread->st_is_update = FALSE;
bd_thread->st_is_update = NULL;
if (!xt_xn_begin(bd_thread))
return backup::ERROR;
bd_state = BUP_STATE_AFTER_LOCK;
......@@ -562,7 +562,7 @@ result_t PBXTRestoreDriver::send_data(Buffer &buf)
m_tables[rd_table_no-1].internal_name(path, sizeof(path));
try_(a) {
xt_ha_open_database_of_table(self, (XTPathStrPtr) path);
tab = xt_use_table(self, (XTPathStrPtr) path, FALSE, FALSE, NULL);
tab = xt_use_table(self, (XTPathStrPtr) path, FALSE, FALSE);
pushr_(xt_heap_release, tab);
if (!(rd_ot = xt_db_open_table_using_tab(tab, rd_thread)))
xt_throw(self);
......
......@@ -90,7 +90,7 @@
#define IDX_CAC_INIT_LOCK(s, i) xt_spinxslock_init_with_autoname(s, &(i)->cs_lock)
#define IDX_CAC_FREE_LOCK(s, i) xt_spinxslock_free(s, &(i)->cs_lock)
#define IDX_CAC_READ_LOCK(i, s) xt_spinxslock_slock(&(i)->cs_lock, (s)->t_id)
#define IDX_CAC_WRITE_LOCK(i, s) xt_spinxslock_xlock(&(i)->cs_lock, (s)->t_id)
#define IDX_CAC_WRITE_LOCK(i, s) xt_spinxslock_xlock(&(i)->cs_lock, FALSE, (s)->t_id)
#define IDX_CAC_UNLOCK(i, s) xt_spinxslock_unlock(&(i)->cs_lock, (s)->t_id)
#endif
......@@ -178,6 +178,7 @@ static DcGlobalsRec ind_cac_globals;
KEY_CACHE my_cache;
#undef pthread_rwlock_rdlock
#undef pthread_rwlock_wrlock
#undef pthread_rwlock_try_wrlock
#undef pthread_rwlock_unlock
#undef pthread_mutex_lock
#undef pthread_mutex_unlock
......@@ -410,7 +411,7 @@ xtPublic void xt_ind_release_handle(XTIndHandlePtr handle, xtBool have_lock, XTT
/* Because of the lock order, I have to release the
* handle before I get a lock on the cache block.
*
* But, by doing this, thie cache block may be gone!
* But, by doing this, this cache block may be gone!
*/
if (block) {
IDX_CAC_READ_LOCK(seg, thread);
......@@ -420,6 +421,11 @@ xtPublic void xt_ind_release_handle(XTIndHandlePtr handle, xtBool have_lock, XTT
/* Found the block...
* {HANDLE-COUNT-SLOCK}
* 04.05.2009, changed to slock.
* The xlock causes too much contention
* on the cache block for read only loads.
*
* Is it safe?
* See below...
*/
XT_IPAGE_READ_LOCK(&block->cb_lock);
goto block_found;
......@@ -691,6 +697,9 @@ xtPublic void xt_ind_exit(XTThreadPtr self)
}
}
/* Must be done before freeing the blocks! */
ind_handle_exit(self);
if (ind_cac_globals.cg_blocks) {
xt_free(self, ind_cac_globals.cg_blocks);
ind_cac_globals.cg_blocks = NULL;
......@@ -702,7 +711,6 @@ xtPublic void xt_ind_exit(XTThreadPtr self)
ind_cac_globals.cg_buffer = NULL;
}
#endif
ind_handle_exit(self);
memset(&ind_cac_globals, 0, sizeof(ind_cac_globals));
}
......@@ -882,7 +890,58 @@ static xtBool ind_free_block(XTOpenTablePtr ot, XTIndBlockPtr block)
while (xblock) {
if (block == xblock) {
/* Found the block... */
XT_IPAGE_WRITE_LOCK(&block->cb_lock, ot->ot_thread->t_id);
/* It is possible that a thread enters this code holding a
* lock on a page. This can cause a deadlock:
*
* #0 0x91faa2ce in semaphore_wait_signal_trap
* #1 0x91fb1da5 in pthread_mutex_lock
* #2 0x00e2ec13 in xt_p_mutex_lock at pthread_xt.cc:544
* #3 0x00e6c30a in xt_xsmutex_xlock at lock_xt.cc:1547
* #4 0x00dee402 in ind_free_block at cache_xt.cc:879
* #5 0x00dee76a in ind_cac_free_lru_blocks at cache_xt.cc:1033
* #6 0x00def8d1 in xt_ind_reserve at cache_xt.cc:1513
* #7 0x00e22118 in xt_idx_insert at index_xt.cc:2047
* #8 0x00e4d7ee in xt_tab_new_record at table_xt.cc:4702
* #9 0x00e0ff0b in ha_pbxt::write_row at ha_pbxt.cc:2340
* #10 0x0023a00f in handler::ha_write_row at handler.cc:4570
* #11 0x001a32c8 in write_record at sql_insert.cc:1568
* #12 0x001ab635 in mysql_insert at sql_insert.cc:812
* #13 0x0010e068 in mysql_execute_command at sql_parse.cc:3066
* #14 0x0011480d in mysql_parse at sql_parse.cc:5787
* #15 0x00115afb in dispatch_command at sql_parse.cc:1200
* #16 0x00116de2 in do_command at sql_parse.cc:857
* #17 0x00101ee4 in handle_one_connection at sql_connect.cc:1115
* #18 0x91fdb155 in _pthread_start
* #19 0x91fdb012 in thread_start
*
* #0 0x91fb146e in __semwait_signal
* #1 0x91fb12ef in nanosleep$UNIX2003
* #2 0x91fb1236 in usleep$UNIX2003
* #3 0x00e52112 in xt_yield at thread_xt.cc:1274
* #4 0x00e6c0eb in xt_spinxslock_xlock at lock_xt.cc:1456
* #5 0x00dee444 in ind_free_block at cache_xt.cc:886
* #6 0x00dee76a in ind_cac_free_lru_blocks at cache_xt.cc:1033
* #7 0x00deeaf0 in ind_cac_fetch at cache_xt.cc:1130
* #8 0x00def604 in xt_ind_fetch at cache_xt.cc:1386
* #9 0x00e2159a in xt_idx_update_row_id at index_xt.cc:2489
* #10 0x00e603c8 in xn_sw_clean_indices at xaction_xt.cc:1932
* #11 0x00e606d4 in xn_sw_cleanup_variation at xaction_xt.cc:2056
* #12 0x00e60e29 in xn_sw_cleanup_xact at xaction_xt.cc:2276
* #13 0x00e615ed in xn_sw_main at xaction_xt.cc:2433
* #14 0x00e61919 in xn_sw_run_thread at xaction_xt.cc:2564
* #15 0x00e53f80 in thr_main at thread_xt.cc:1017
* #16 0x91fdb155 in _pthread_start
* #17 0x91fdb012 in thread_start
*
* So we back off if a lock is held!
*/
if (!XT_IPAGE_WRITE_TRY_LOCK(&block->cb_lock, ot->ot_thread->t_id)) {
IDX_CAC_UNLOCK(seg, ot->ot_thread);
#ifdef DEBUG_CHECK_IND_CACHE
xt_ind_check_cache(NULL);
#endif
return FALSE;
}
if (block->cb_state != IDX_CAC_BLOCK_CLEAN) {
/* This block cannot be freeed: */
XT_IPAGE_UNLOCK(&block->cb_lock, TRUE);
......@@ -1376,6 +1435,7 @@ xtPublic xtBool xt_ind_fetch(XTOpenTablePtr ot, XTIndexPtr ind, xtIndexNodeID ad
register XTIndBlockPtr block;
DcSegmentPtr seg;
xtWord2 branch_size;
u_int rec_size;
xtBool xlock = FALSE;
#ifdef DEBUG
......@@ -1386,10 +1446,24 @@ xtPublic xtBool xt_ind_fetch(XTOpenTablePtr ot, XTIndexPtr ind, xtIndexNodeID ad
return FAILED;
branch_size = XT_GET_DISK_2(((XTIdxBranchDPtr) block->cb_data)->tb_size_2);
if (XT_GET_INDEX_BLOCK_LEN(branch_size) < 2 || XT_GET_INDEX_BLOCK_LEN(branch_size) > XT_INDEX_PAGE_SIZE) {
IDX_CAC_UNLOCK(seg, ot->ot_thread);
xt_register_taberr(XT_REG_CONTEXT, XT_ERR_INDEX_CORRUPTED, ot->ot_table->tab_name);
return FAILED;
rec_size = XT_GET_INDEX_BLOCK_LEN(branch_size);
if (rec_size < 2 || rec_size > XT_INDEX_PAGE_SIZE)
goto failed_corrupt;
if (ind->mi_fix_key) {
rec_size -= 2;
if (XT_IS_NODE(branch_size)) {
if (rec_size != 0) {
if (rec_size < XT_NODE_REF_SIZE)
goto failed_corrupt;
rec_size -= XT_NODE_REF_SIZE;
if ((rec_size % (ind->mi_key_size + XT_RECORD_REF_SIZE + XT_NODE_REF_SIZE)) != 0)
goto failed_corrupt;
}
}
else {
if ((rec_size % (ind->mi_key_size + XT_RECORD_REF_SIZE)) != 0)
goto failed_corrupt;
}
}
switch (ltype) {
......@@ -1450,6 +1524,11 @@ xtPublic xtBool xt_ind_fetch(XTOpenTablePtr ot, XTIndexPtr ind, xtIndexNodeID ad
iref->ir_block = block;
iref->ir_branch = (XTIdxBranchDPtr) block->cb_data;
return OK;
failed_corrupt:
IDX_CAC_UNLOCK(seg, ot->ot_thread);
xt_register_taberr(XT_REG_CONTEXT, XT_ERR_INDEX_CORRUPTED, ot->ot_table->tab_name);
return FAILED;
}
xtPublic xtBool xt_ind_release(XTOpenTablePtr ot, XTIndexPtr ind, XTPageUnlockType XT_NDEBUG_UNUSED(utype), XTIndReferencePtr iref)
......
......@@ -58,7 +58,8 @@ struct XTIdxReadBuffer;
#define XT_IPAGE_INIT_LOCK(s, i) xt_atomicrwlock_init_with_autoname(s, i)
#define XT_IPAGE_FREE_LOCK(s, i) xt_atomicrwlock_free(s, i)
#define XT_IPAGE_READ_LOCK(i) xt_atomicrwlock_slock(i)
#define XT_IPAGE_WRITE_LOCK(i, o) xt_atomicrwlock_xlock(i, o)
#define XT_IPAGE_WRITE_LOCK(i, o) xt_atomicrwlock_xlock(i, FALSE, o)
#define XT_IPAGE_WRITE_TRY_LOCK(i, o) xt_atomicrwlock_xlock(i, TRUE, o)
#define XT_IPAGE_UNLOCK(i, x) xt_atomicrwlock_unlock(i, x)
#elif defined(XT_IPAGE_USE_PTHREAD_RW)
#define XT_IPAGE_LOCK_TYPE xt_rwlock_type
......@@ -66,20 +67,23 @@ struct XTIdxReadBuffer;
#define XT_IPAGE_FREE_LOCK(s, i) xt_free_rwlock(i)
#define XT_IPAGE_READ_LOCK(i) xt_slock_rwlock_ns(i)
#define XT_IPAGE_WRITE_LOCK(i, s) xt_xlock_rwlock_ns(i)
#define XT_IPAGE_WRITE_TRY_LOCK(i, s) xt_xlock_try_rwlock_ns(i)
#define XT_IPAGE_UNLOCK(i, x) xt_unlock_rwlock_ns(i)
#elif defined(XT_IPAGE_USE_SPINXSLOCK)
#define XT_IPAGE_LOCK_TYPE XTSpinXSLockRec
#define XT_IPAGE_INIT_LOCK(s, i) xt_spinxslock_init_with_autoname(s, i)
#define XT_IPAGE_FREE_LOCK(s, i) xt_spinxslock_free(s, i)
#define XT_IPAGE_READ_LOCK(i) xt_spinxslock_slock(i)
#define XT_IPAGE_WRITE_LOCK(i, o) xt_spinxslock_xlock(i, o)
#define XT_IPAGE_WRITE_LOCK(i, o) xt_spinxslock_xlock(i, FALSE, o)
#define XT_IPAGE_WRITE_TRY_LOCK(i, o) xt_spinxslock_xlock(i, TRUE, o)
#define XT_IPAGE_UNLOCK(i, x) xt_spinxslock_unlock(i, x)
#else // XT_IPAGE_USE_SKEW_RW
#define XT_IPAGE_LOCK_TYPE XTSkewRWLockRec
#define XT_IPAGE_INIT_LOCK(s, i) xt_skewrwlock_init_with_autoname(s, i)
#define XT_IPAGE_FREE_LOCK(s, i) xt_skewrwlock_free(s, i)
#define XT_IPAGE_READ_LOCK(i) xt_skewrwlock_slock(i)
#define XT_IPAGE_WRITE_LOCK(i, o) xt_skewrwlock_xlock(i, o)
#define XT_IPAGE_WRITE_LOCK(i, o) xt_skewrwlock_xlock(i, FALSE, o)
#define XT_IPAGE_WRITE_TRY_LOCK(i, o) xt_skewrwlock_xlock(i, TRUE, o)
#define XT_IPAGE_UNLOCK(i, x) xt_skewrwlock_unlock(i, x)
#endif
......@@ -103,10 +107,10 @@ typedef struct XTIndBlock {
struct XTIndBlock *cb_lr_used; /* Less recently used blocks. */
/* Protected by cb_lock: */
XT_IPAGE_LOCK_TYPE cb_lock;
xtWord1 cb_state; /* Block status. */
xtWord4 cp_flush_seq;
xtWord2 cb_handle_count; /* TRUE if this page is referenced by a handle. */
xtWord2 cp_flush_seq;
xtWord2 cp_del_count; /* Number of deleted entries. */
xtWord1 cb_state; /* Block status. */
#ifdef XT_USE_DIRECT_IO_ON_INDEX
xtWord1 *cb_data;
#else
......
......@@ -68,6 +68,7 @@ xtPublic int xt_db_log_file_count;
xtPublic int xt_db_auto_increment_mode; /* 0 = MySQL compatible, 1 = PrimeBase Compatible. */
xtPublic int xt_db_offline_log_function; /* 0 = recycle logs, 1 = delete logs, 2 = keep logs */
xtPublic int xt_db_sweeper_priority; /* 0 = low (default), 1 = normal, 2 = high */
xtPublic int xt_db_flush_log_at_trx_commit; /* 0 = no-write/no-flush, 1 = yes, 2 = write/no-flush */
xtPublic XTSortedListPtr xt_db_open_db_by_id = NULL;
xtPublic XTHashTabPtr xt_db_open_databases = NULL;
......@@ -288,6 +289,7 @@ xtPublic void xt_stop_database_threads(XTThreadPtr self, xtBool sync)
/* Wait for the checkpointer: */
xt_wait_for_checkpointer(self, db);
}
xt_stop_flusher(self, db);
xt_stop_checkpointer(self, db);
xt_stop_writer(self, db);
xt_stop_sweeper(self, db);
......@@ -317,6 +319,7 @@ static void db_finalize(XTThreadPtr self, void *x)
{
XTDatabaseHPtr db = (XTDatabaseHPtr) x;
xt_stop_flusher(self, db);
xt_stop_checkpointer(self, db);
xt_stop_compactor(self, db);
xt_stop_sweeper(self, db);
......@@ -475,6 +478,8 @@ xtPublic XTDatabaseHPtr xt_get_database(XTThreadPtr self, char *path, xtBool mul
xt_start_compactor(self, db);
xt_start_writer(self, db);
xt_start_checkpointer(self, db);
if (xt_db_flush_log_at_trx_commit == 0 || xt_db_flush_log_at_trx_commit == 2)
xt_start_flusher(self, db);
popr_();
xt_ht_put(self, xt_db_open_databases, db);
......@@ -574,6 +579,7 @@ xtPublic void xt_drop_database(XTThreadPtr self, XTDatabaseHPtr db)
pushr_(xt_ht_unlock, xt_db_open_databases);
/* Shutdown the database daemons: */
xt_stop_flusher(self, db);
xt_stop_checkpointer(self, db);
xt_stop_sweeper(self, db);
xt_stop_compactor(self, db);
......@@ -902,7 +908,7 @@ xtPublic XTOpenTablePoolPtr xt_db_lock_table_pool_by_name(XTThreadPtr self, XTDa
XTTableHPtr tab;
xtTableID tab_id;
pushsr_(tab, xt_heap_release, xt_use_table(self, tab_name, no_load, missing_ok, NULL));
pushsr_(tab, xt_heap_release, xt_use_table(self, tab_name, no_load, missing_ok));
if (!tab) {
freer_(); // xt_heap_release(tab)
return NULL;
......
......@@ -60,6 +60,7 @@ extern int xt_db_log_file_count;
extern int xt_db_auto_increment_mode;
extern int xt_db_offline_log_function;
extern int xt_db_sweeper_priority;
extern int xt_db_flush_log_at_trx_commit;
extern XTSortedListPtr xt_db_open_db_by_id;
extern XTHashTabPtr xt_db_open_databases;
......@@ -187,6 +188,10 @@ typedef struct XTDatabase : public XTHeap {
xt_mutex_type db_cp_lock;
xt_cond_type db_cp_cond; /* Writer condition when idle (must bw woken by log flush! */
XTCheckPointStateRec db_cp_state; /* The checkpoint state. */
/* The "flusher" thread (used when pbxt_flush_log_at_trx_commit = 0 or 2) */
struct XTThread *db_fl_thread; /* The flusher thread (flushes the transation log). */
xt_mutex_type db_fl_lock;
} XTDatabaseRec, *XTDatabaseHPtr; /* Heap pointer */
#define XT_FOR_USER 0
......
This diff is collapsed.
......@@ -45,7 +45,6 @@ struct XTIndex;
#define XT_DD_KEY_PRIMARY 2
#define XT_DD_KEY_FOREIGN 3
#define XT_KEY_ACTION_DEFAULT 0
#define XT_KEY_ACTION_RESTRICT 1
#define XT_KEY_ACTION_CASCADE 2
#define XT_KEY_ACTION_SET_NULL 3
......@@ -259,7 +258,7 @@ class XTDDTable : public XTObject {
XTList<XTDDColumn> dt_cols;
XTList<XTDDIndex> dt_indexes;
xt_rwlock_type dt_ref_lock; /* The lock for adding and using references. */
XTRecurRWLockRec dt_ref_lock; /* The lock for adding and using references. */
XTList<XTDDForeignKey> dt_fkeys; /* The foreign keys on this table. */
XTDDTableRef *dt_trefs; /* A list of tables that reference this table. */
......
......@@ -1148,6 +1148,11 @@ void XTDataLogBuffer::dlb_exit(XTThreadPtr self)
xtBool XTDataLogBuffer::dlb_close_log(XTThreadPtr thread)
{
if (dlb_data_log) {
if (dlb_data_log->dlf_log_file) {
if (!dl_write_log_header(dlb_data_log, dlb_data_log->dlf_log_file, 0, thread))
return FAILED;
}
/* Flush and commit the data in the old log: */
if (!dlb_flush_log(TRUE, thread))
return FAILED;
......@@ -1952,7 +1957,7 @@ static xtBool dl_collect_garbage(XTThreadPtr self, XTDatabaseHPtr db, XTDataLogF
log_rec.xl_status_1 = XT_LOG_ENT_DEL_LOG;
log_rec.xl_checksum_1 = XT_CHECKSUM_1(data_log->dlf_log_id);
XT_SET_DISK_4(log_rec.xl_log_id_4, data_log->dlf_log_id);
if (!xt_xlog_log_data(self, sizeof(XTXactNewLogEntryDRec), (XTXactLogBufferDPtr) &log_rec, TRUE)) {
if (!xt_xlog_log_data(self, sizeof(XTXactNewLogEntryDRec), (XTXactLogBufferDPtr) &log_rec, XT_XLOG_WRITE_AND_FLUSH)) {
db->db_datalogs.dls_set_log_state(data_log, XT_DL_TO_COMPACT);
xt_throw(self);
}
......
......@@ -119,7 +119,7 @@ xtBool xt_fs_rename(struct XTThread *self, char *from_path, char *to_path);
#define FILE_MAP_INIT_LOCK(s, i) xt_spinxslock_init_with_autoname(s, i)
#define FILE_MAP_FREE_LOCK(s, i) xt_spinxslock_free(s, i)
#define FILE_MAP_READ_LOCK(i, o) xt_spinxslock_slock(i, o)
#define FILE_MAP_WRITE_LOCK(i, o) xt_spinxslock_xlock(i, o)
#define FILE_MAP_WRITE_LOCK(i, o) xt_spinxslock_xlock(i, FALSE, o)
#define FILE_MAP_UNLOCK(i, o) xt_spinxslock_unlock(i, o)
#endif
......
This diff is collapsed.
......@@ -1328,10 +1328,12 @@ static void idx_insert_node_item(XTTableHPtr XT_UNUSED(tab), XTIndexPtr XT_UNUSE
XT_SET_DISK_2(leaf->tb_size_2, XT_MAKE_NODE_SIZE(result->sr_item.i_total_size));
}
static void idx_get_middle_branch_item(XTIndexPtr ind, XTIdxBranchDPtr branch, XTIdxKeyValuePtr value, XTIdxResultPtr result)
static xtBool idx_get_middle_branch_item(XTOpenTablePtr ot, XTIndexPtr ind, XTIdxBranchDPtr branch, XTIdxKeyValuePtr value, XTIdxResultPtr result)
{
xtWord1 *bitem;
ASSERT_NS(result->sr_item.i_node_ref_size == 0 || result->sr_item.i_node_ref_size == XT_NODE_REF_SIZE);
ASSERT_NS((int) result->sr_item.i_total_size >= 0 && result->sr_item.i_total_size <= XT_INDEX_PAGE_SIZE*2);
if (ind->mi_fix_key) {
u_int full_item_size = result->sr_item.i_item_size + result->sr_item.i_node_ref_size;
......@@ -1346,18 +1348,25 @@ static void idx_get_middle_branch_item(XTIndexPtr ind, XTIdxBranchDPtr branch, X
}
else {
u_int node_ref_size;
u_int ilen;
u_int ilen, tlen;
xtWord1 *bend;
node_ref_size = result->sr_item.i_node_ref_size;
bitem = branch->tb_data + node_ref_size;;
bitem = branch->tb_data + node_ref_size;
bend = &branch->tb_data[(result->sr_item.i_total_size - node_ref_size) / 2 + node_ref_size];
ilen = 0;
if (bitem < bend) {
tlen = 0;
for (;;) {
ilen = myxt_get_key_length(ind, bitem);
if (bitem + ilen + XT_RECORD_REF_SIZE + node_ref_size >= bend)
tlen += ilen + XT_RECORD_REF_SIZE + node_ref_size;
if (bitem + ilen + XT_RECORD_REF_SIZE + node_ref_size >= bend) {
if (ilen > XT_INDEX_PAGE_SIZE || tlen > result->sr_item.i_total_size) {
xt_register_taberr(XT_REG_CONTEXT, XT_ERR_INDEX_CORRUPTED, ot->ot_table->tab_name);
return FAILED;
}
break;
}
bitem += ilen + XT_RECORD_REF_SIZE + node_ref_size;
}
}
......@@ -1370,6 +1379,7 @@ static void idx_get_middle_branch_item(XTIndexPtr ind, XTIdxBranchDPtr branch, X
xt_get_record_ref(bitem + ilen, &value->sv_rec_id, &value->sv_row_id);
memcpy(value->sv_key, bitem, value->sv_length);
}
return OK;
}
static size_t idx_write_branch_item(XTIndexPtr XT_UNUSED(ind), xtWord1 *item, XTIdxKeyValuePtr value)
......@@ -1438,7 +1448,8 @@ static xtBool idx_replace_node_key(XTOpenTablePtr ot, XTIndexPtr ind, IdxStackIt
/* We assume that value can be overwritten (which is the case) */
key_value.sv_flags = XT_SEARCH_WHOLE_KEY;
key_value.sv_key = key_buf;
idx_get_middle_branch_item(ind, iref.ir_branch, &key_value, &result);
if (!idx_get_middle_branch_item(ot, ind, iref.ir_branch, &key_value, &result))
goto failed_1;
if (!idx_new_branch(ot, ind, &new_branch))
goto failed_1;
......@@ -1567,7 +1578,8 @@ static xtBool idx_insert_node(XTOpenTablePtr ot, XTIndexPtr ind, IdxBranchStackP
ASSERT_NS(result.sr_item.i_total_size > XT_INDEX_PAGE_DATA_SIZE);
/* We assume that value can be overwritten (which is the case) */
idx_get_middle_branch_item(ind, &ot->ot_ind_wbuf, key_value, &result);
if (!idx_get_middle_branch_item(ot, ind, &ot->ot_ind_wbuf, key_value, &result))
goto failed_1;
if (!idx_new_branch(ot, ind, &new_branch))
goto failed_1;
......@@ -2041,7 +2053,7 @@ xtPublic xtBool xt_idx_insert(XTOpenTablePtr ot, XTIndexPtr ind, xtRowID row_id,
memcpy(&ot->ot_ind_wbuf, iref.ir_branch, offsetof(XTIdxBranchDRec, tb_data) + result.sr_item.i_total_size);
idx_insert_leaf_item(ind, &ot->ot_ind_wbuf, &key_value, &result);
IDX_TRACE("%d-> %x\n", (int) XT_NODE_ID(current), (int) XT_GET_DISK_2(ot->ot_ind_wbuf.tb_size_2));
ASSERT_NS(result.sr_item.i_total_size > XT_INDEX_PAGE_DATA_SIZE);
ASSERT_NS(result.sr_item.i_total_size > XT_INDEX_PAGE_DATA_SIZE && result.sr_item.i_total_size <= XT_INDEX_PAGE_DATA_SIZE*2);
/* This is the number of potential writes. In other words, the total number
* of blocks that may be accessed.
......@@ -2053,7 +2065,8 @@ xtPublic xtBool xt_idx_insert(XTOpenTablePtr ot, XTIndexPtr ind, xtRowID row_id,
goto failed_1;
/* Key does not fit, must split... */
idx_get_middle_branch_item(ind, &ot->ot_ind_wbuf, &key_value, &result);
if (!idx_get_middle_branch_item(ot, ind, &ot->ot_ind_wbuf, &key_value, &result))
goto failed_1;
if (!idx_new_branch(ot, ind, &new_branch))
goto failed_1;
......@@ -3317,7 +3330,7 @@ xtPublic xtBool xt_idx_match_search(register XTOpenTablePtr XT_UNUSED(ot), regis
return FALSE;
}
static void idx_set_index_selectivity(XTThreadPtr self, XTOpenTablePtr ot, XTIndexPtr ind)
static void idx_set_index_selectivity(XTOpenTablePtr ot, XTIndexPtr ind, XTThreadPtr thread)
{
static const xtRecordID MAX_RECORDS = 100;
......@@ -3368,7 +3381,7 @@ static void idx_set_index_selectivity(XTThreadPtr self, XTOpenTablePtr ot, XTInd
last_rec = ot->ot_curr_rec_id;
key_len = ot->ot_ind_state.i_item_size - XT_RECORD_REF_SIZE;
xt_ind_unlock_handle(ot->ot_ind_rhandle);
xt_ind_lock_handle(ot->ot_ind_rhandle);
memcpy(key_buf, ot->ot_ind_rhandle->ih_branch->tb_data + ot->ot_ind_state.i_item_offset, key_len);
xt_ind_unlock_handle(ot->ot_ind_rhandle);
}
......@@ -3410,7 +3423,7 @@ static void idx_set_index_selectivity(XTThreadPtr self, XTOpenTablePtr ot, XTInd
last_iter_rec = last_rec;
if (ot->ot_ind_rhandle) {
xt_ind_release_handle(ot->ot_ind_rhandle, FALSE, self);
xt_ind_release_handle(ot->ot_ind_rhandle, FALSE, thread);
ot->ot_ind_rhandle = NULL;
}
}
......@@ -3431,8 +3444,10 @@ static void idx_set_index_selectivity(XTThreadPtr self, XTOpenTablePtr ot, XTInd
return;
failed_1:
xt_ind_release_handle(ot->ot_ind_rhandle, FALSE, self);
ot->ot_ind_rhandle = NULL;
if (ot->ot_ind_rhandle) {
xt_ind_release_handle(ot->ot_ind_rhandle, FALSE, thread);
ot->ot_ind_rhandle = NULL;
}
failed:
xt_tab_disable_index(ot->ot_table, XT_INDEX_CORRUPTED);
......@@ -3440,16 +3455,23 @@ static void idx_set_index_selectivity(XTThreadPtr self, XTOpenTablePtr ot, XTInd
return;
}
xtPublic void xt_ind_set_index_selectivity(XTThreadPtr self, XTOpenTablePtr ot)
xtPublic void xt_ind_set_index_selectivity(XTOpenTablePtr ot, XTThreadPtr thread)
{
XTTableHPtr tab = ot->ot_table;
XTIndexPtr *ind;
u_int i;
if (!tab->tab_dic.dic_disable_index) {
for (i=0, ind=tab->tab_dic.dic_keys; i<tab->tab_dic.dic_key_count; i++, ind++)
idx_set_index_selectivity(self, ot, *ind);
time_t now;
now = time(NULL);
xt_lock_mutex_ns(&tab->tab_ind_stat_lock);
if (tab->tab_ind_stat_calc_time < now) {
if (!tab->tab_dic.dic_disable_index) {
for (i=0, ind=tab->tab_dic.dic_keys; i<tab->tab_dic.dic_key_count; i++, ind++)
idx_set_index_selectivity(ot, *ind, thread);
}
tab->tab_ind_stat_calc_time = time(NULL);
}
xt_unlock_mutex_ns(&tab->tab_ind_stat_lock);
}
/*
......@@ -3740,7 +3762,8 @@ xtPublic void xt_ind_count_deleted_items(XTTableHPtr tab, XTIndexPtr ind, XTIndB
static xtBool idx_flush_dirty_list(XTIndexLogPtr il, XTOpenTablePtr ot, u_int *flush_count, XTIndBlockPtr *flush_list)
{
for (u_int i=0; i<*flush_count; i++)
il->il_write_block(ot, flush_list[i]);
if (!il->il_write_block(ot, flush_list[i]))
return FAILED;
*flush_count = 0;
return OK;
}
......@@ -3793,7 +3816,7 @@ xtPublic xtBool xt_flush_indices(XTOpenTablePtr ot, off_t *bytes_flushed, xtBool
xtIndexNodeID ind_free;
xtBool something_to_free = FALSE;
xtIndexNodeID last_address, next_address;
xtWord2 curr_flush_seq;
xtWord4 curr_flush_seq;
XTIndFreeListPtr list_ptr;
u_int dirty_blocks;
XTCheckPointTablePtr cp_tab;
......@@ -3810,8 +3833,9 @@ xtPublic xtBool xt_flush_indices(XTOpenTablePtr ot, off_t *bytes_flushed, xtBool
if (!tab->tab_db->db_indlogs.ilp_get_log(&il, ot->ot_thread))
goto failed_3;
il->il_reset(tab->tab_id);
if (!il->il_write_byte(ot, XT_DT_FREE_LIST))
if (!il->il_reset(ot))
goto failed_2;
if (!il->il_write_byte(ot, XT_DT_LOG_HEAD))
goto failed_2;
if (!il->il_write_word4(ot, tab->tab_id))
goto failed_2;
......@@ -3849,7 +3873,7 @@ xtPublic xtBool xt_flush_indices(XTOpenTablePtr ot, off_t *bytes_flushed, xtBool
wrote_something = TRUE;
while (block) {
ASSERT_NS(block->cb_state == IDX_CAC_BLOCK_DIRTY);
ASSERT_NS(block->cp_flush_seq == curr_flush_seq);
ASSERT_NS((block->cp_flush_seq == curr_flush_seq) || xt_xn_is_before(block->cp_flush_seq, curr_flush_seq));
if (!ind_add_to_dirty_list(il, ot, &flush_count, flush_list, block))
goto failed;
block = block->cb_dirty_next;
......@@ -4023,7 +4047,7 @@ xtPublic xtBool xt_flush_indices(XTOpenTablePtr ot, off_t *bytes_flushed, xtBool
fblock = block;
block = block->cb_dirty_next;
ASSERT_NS(fblock->cb_state == IDX_CAC_BLOCK_DIRTY);
if (fblock->cp_flush_seq == curr_flush_seq) {
if (fblock->cp_flush_seq == curr_flush_seq || xt_xn_is_before(fblock->cp_flush_seq, curr_flush_seq)) {
/* Take the block off the dirty list: */
if (fblock->cb_dirty_next)
fblock->cb_dirty_next->cb_dirty_prev = fblock->cb_dirty_prev;
......@@ -4254,12 +4278,32 @@ void XTIndexLogPool::ilp_release_log(XTIndexLogPtr il)
xt_unlock_mutex_ns(&ilp_lock);
}
void XTIndexLog::il_reset(xtTableID tab_id)
xtBool XTIndexLog::il_reset(XTOpenTable *ot)
{
XTIndLogHeadDRec log_head;
xtTableID tab_id = ot->ot_table->tab_id;
il_tab_id = tab_id;
il_log_eof = 0;
il_buffer_len = 0;
il_buffer_offset = 0;
/* We must write the header and flush here or the "previous" status (from the
* last flush run) could remain. Failure to write the file completely leave the
* old header in place, and other parts of the file changed.
* This would lead to index corruption.
*/
log_head.ilh_data_type = XT_DT_LOG_HEAD;
XT_SET_DISK_4(log_head.ilh_tab_id_4, tab_id);
XT_SET_DISK_4(log_head.ilh_log_eof_4, 0);
if (!xt_pwrite_file(il_of, 0, sizeof(XTIndLogHeadDRec), (xtWord1 *) &log_head, &ot->ot_thread->st_statistics.st_ilog, ot->ot_thread))
return FAILED;
if (!xt_flush_file(il_of, &ot->ot_thread->st_statistics.st_ilog, ot->ot_thread))
return FAILED;
return OK;
}
void XTIndexLog::il_close(xtBool delete_it)
......
......@@ -30,6 +30,7 @@
#include <mysql_version.h>
#include <my_bitmap.h>
#endif
#include <time.h>
#include "thread_xt.h"
#include "linklist_xt.h"
......@@ -293,7 +294,6 @@ typedef struct XTIndFreeList {
*/
typedef struct XTIndex {
u_int mi_index_no; /* The index number (used by MySQL). */
xt_mutex_type mi_flush_lock; /* Lock the index during flushing. */
/* Protected by the mi_rwlock lock: */
XT_INDEX_LOCK_TYPE mi_rwlock; /* This lock protects the structure of the index.
......@@ -407,7 +407,7 @@ typedef struct XTIndexLog {
off_t il_buffer_offset;
void il_reset(xtTableID tab_id);
xtBool il_reset(XTOpenTable *ot);
void il_close(xtBool delete_it);
void il_release();
......@@ -478,7 +478,7 @@ xtBool xt_idx_search_prev(struct XTOpenTable *ot, struct XTIndex *ind, register
xtBool xt_idx_next(register struct XTOpenTable *ot, register struct XTIndex *ind, register XTIdxSearchKeyPtr search_key);
xtBool xt_idx_prev(register struct XTOpenTable *ot, register struct XTIndex *ind, register XTIdxSearchKeyPtr search_key);
xtBool xt_idx_read(struct XTOpenTable *ot, struct XTIndex *ind, xtWord1 *rec_buf);
void xt_ind_set_index_selectivity(XTThreadPtr self, struct XTOpenTable *ot);
void xt_ind_set_index_selectivity(struct XTOpenTable *ot, XTThreadPtr thread);
void xt_check_indices(struct XTOpenTable *ot);
void xt_load_indices(XTThreadPtr self, struct XTOpenTable *ot);
void xt_ind_count_deleted_items(struct XTTable *ot, struct XTIndex *ind, struct XTIndBlock *block);
......
......@@ -1444,7 +1444,7 @@ xtPublic void xt_spinxslock_free(struct XTThread *XT_UNUSED(self), XTSpinXSLockP
#endif
}
xtPublic xtBool xt_spinxslock_xlock(XTSpinXSLockPtr sxs, xtThreadID XT_NDEBUG_UNUSED(thd_id))
xtPublic xtBool xt_spinxslock_xlock(XTSpinXSLockPtr sxs, xtBool try_lock, xtThreadID XT_NDEBUG_UNUSED(thd_id))
{
register xtWord2 set;
......@@ -1453,6 +1453,8 @@ xtPublic xtBool xt_spinxslock_xlock(XTSpinXSLockPtr sxs, xtThreadID XT_NDEBUG_UN
set = xt_atomic_tas2(&sxs->sxs_xlocked, 1);
if (!set)
break;
if (try_lock)
return FALSE;
xt_yield();
}
......@@ -1460,9 +1462,25 @@ xtPublic xtBool xt_spinxslock_xlock(XTSpinXSLockPtr sxs, xtThreadID XT_NDEBUG_UN
sxs->sxs_locker = thd_id;
#endif
/* Wait for all the reader to wait! */
while (sxs->sxs_wait_count < sxs->sxs_rlock_count)
xt_yield();
/* Wait for all the readers to wait! */
while (sxs->sxs_wait_count < sxs->sxs_rlock_count) {
sxs->sxs_xwaiter = 1;
xt_yield(); //*
/* This should not be required, because there is only one thread
* accessing this value. However, the lock fails if this
* is not done with an atomic op.
*
* This is because threads on other processors have the
* value in processor cache. So they do not
* notice that the value has been set to zero.
* They think it is still 1 and march through
* the barrier (sxs->sxs_xwaiter < sxs->sxs_xlocked) below.
*
* In the meantime, this X locker has gone on thinking
* all is OK.
*/
xt_atomic_tas2(&sxs->sxs_xwaiter, 0);
}
#ifdef XT_THREAD_LOCK_INFO
xt_thread_lock_info_add_owner(&sxs->sxs_lock_info);
......@@ -1474,12 +1492,12 @@ xtPublic xtBool xt_spinxslock_slock(XTSpinXSLockPtr sxs)
{
xt_atomic_inc2(&sxs->sxs_rlock_count);
/* Check if there could be an X locker: */
if (sxs->sxs_xlocked) {
/* I am waiting... */
/* Wait as long as the locker is not waiting: */
while (sxs->sxs_xwaiter < sxs->sxs_xlocked) {
xt_atomic_inc2(&sxs->sxs_wait_count);
while (sxs->sxs_xlocked)
while (sxs->sxs_xwaiter < sxs->sxs_xlocked) {
xt_yield();
}
xt_atomic_dec2(&sxs->sxs_wait_count);
}
......@@ -1493,12 +1511,17 @@ xtPublic xtBool xt_spinxslock_unlock(XTSpinXSLockPtr sxs, xtBool xlocked)
{
if (xlocked) {
#ifdef DEBUG
ASSERT_NS(sxs->sxs_locker && sxs->sxs_xlocked);
sxs->sxs_locker = 0;
#endif
sxs->sxs_xlocked = 0;
}
else
else {
#ifdef DEBUG
ASSERT_NS(sxs->sxs_rlock_count > 0);
#endif
xt_atomic_dec2(&sxs->sxs_rlock_count);
}
#ifdef XT_THREAD_LOCK_INFO
xt_thread_lock_info_release_owner(&sxs->sxs_lock_info);
......@@ -1698,7 +1721,7 @@ xtPublic void xt_atomicrwlock_free(struct XTThread *, XTAtomicRWLockPtr XT_UNUSE
#endif
}
xtPublic xtBool xt_atomicrwlock_xlock(XTAtomicRWLockPtr arw, xtThreadID XT_NDEBUG_UNUSED(thr_id))
xtPublic xtBool xt_atomicrwlock_xlock(XTAtomicRWLockPtr arw, xtBool try_lock, xtThreadID XT_NDEBUG_UNUSED(thr_id))
{
register xtWord2 set;
......@@ -1707,6 +1730,8 @@ xtPublic xtBool xt_atomicrwlock_xlock(XTAtomicRWLockPtr arw, xtThreadID XT_NDEBU
set = xt_atomic_tas2(&arw->arw_xlock_set, 1);
if (!set)
break;
if (try_lock)
return FALSE;
xt_yield();
}
......@@ -1721,7 +1746,7 @@ xtPublic xtBool xt_atomicrwlock_xlock(XTAtomicRWLockPtr arw, xtThreadID XT_NDEBU
#ifdef XT_THREAD_LOCK_INFO
xt_thread_lock_info_add_owner(&arw->arw_lock_info);
#endif
return OK;
return TRUE;
}
xtPublic xtBool xt_atomicrwlock_slock(XTAtomicRWLockPtr arw)
......@@ -1799,7 +1824,7 @@ xtPublic void xt_skewrwlock_free(struct XTThread *, XTSkewRWLockPtr XT_UNUSED(sr
#endif
}
xtPublic xtBool xt_skewrwlock_xlock(XTSkewRWLockPtr srw, xtThreadID XT_NDEBUG_UNUSED(thr_id))
xtPublic xtBool xt_skewrwlock_xlock(XTSkewRWLockPtr srw, xtBool try_lock, xtThreadID XT_NDEBUG_UNUSED(thr_id))
{
register xtWord2 set;
......@@ -1808,6 +1833,8 @@ xtPublic xtBool xt_skewrwlock_xlock(XTSkewRWLockPtr srw, xtThreadID XT_NDEBUG_UN
set = xt_atomic_tas2(&srw->srw_xlock_set, 1);
if (!set)
break;
if (try_lock)
return FALSE;
xt_yield();
}
......@@ -1822,7 +1849,7 @@ xtPublic xtBool xt_skewrwlock_xlock(XTSkewRWLockPtr srw, xtThreadID XT_NDEBUG_UN
#ifdef XT_THREAD_LOCK_INFO
xt_thread_lock_info_add_owner(&srw->srw_lock_info);
#endif
return OK;
return TRUE;
}
xtPublic xtBool xt_skewrwlock_slock(XTSkewRWLockPtr srw)
......@@ -1867,6 +1894,124 @@ xtPublic xtBool xt_skewrwlock_unlock(XTSkewRWLockPtr srw, xtBool xlocked)
return OK;
}
/*
* -----------------------------------------------------------------------
* RECURSIVE R/W LOCK (allows X lockers to lock again)
*/
#ifdef XT_THREAD_LOCK_INFO
void xt_recursivemutex_init(XTThreadPtr self, XTRecursiveMutexPtr rm, const char *name)
{
rm->rm_locker = NULL;
rm->rm_lock_count = 0;
xt_init_mutex(self, &rm->rm_mutex, name);
}
#else
xtPublic void xt_recursivemutex_init(XTThreadPtr self, XTRecursiveMutexPtr rm)
{
rm->rm_locker = NULL;
rm->rm_lock_count = 0;
xt_init_mutex(self, &rm->rm_mutex);
}
#endif
xtPublic void xt_recursivemutex_free(XTRecursiveMutexPtr rm)
{
xt_free_mutex(&rm->rm_mutex);
#ifdef XT_THREAD_LOCK_INFO
xt_thread_lock_info_free(&rm->rm_lock_info);
#endif
}
xtPublic void xt_recursivemutex_lock(XTThreadPtr self, XTRecursiveMutexPtr rm)
{
if (self != rm->rm_locker) {
xt_lock_mutex(self, &rm->rm_mutex);
rm->rm_locker = self;
}
rm->rm_lock_count++;
}
xtPublic void xt_recursivemutex_unlock(XTThreadPtr self, XTRecursiveMutexPtr rm)
{
ASSERT(self == rm->rm_locker);
ASSERT(rm->rm_lock_count > 0);
rm->rm_lock_count--;
if (!rm->rm_lock_count) {
rm->rm_locker = NULL;
xt_unlock_mutex(self, &rm->rm_mutex);
}
}
/*
* -----------------------------------------------------------------------
* RECURSIVE MUTEX (allows lockers to lock again)
*/
#ifdef XT_THREAD_LOCK_INFO
void xt_recurrwlock_init(struct XTThread *self, XTRecurRWLockPtr rrw, const char *name)
{
rrw->rrw_locker = NULL;
rrw->rrw_lock_count = 0;
xt_init_rwlock(self, &rrw->rrw_lock, name);
}
#else
void xt_recurrwlock_init(struct XTThread *self, XTRecurRWLockPtr rrw)
{
rrw->rrw_locker = NULL;
rrw->rrw_lock_count = 0;
xt_init_rwlock(self, &rrw->rrw_lock);
}
#endif
void xt_recurrwlock_free(XTRecurRWLockPtr rrw)
{
xt_free_rwlock(&rrw->rrw_lock);
#ifdef XT_THREAD_LOCK_INFO
xt_thread_lock_info_free(&rrw->rrw_lock_info);
#endif
}
void xt_recurrwlock_xlock(struct XTThread *self, XTRecurRWLockPtr rrw)
{
if (self != rrw->rrw_locker) {
xt_xlock_rwlock(self, &rrw->rrw_lock);
rrw->rrw_locker = self;
}
rrw->rrw_lock_count++;
}
void xt_recurrwlock_slock(struct XTThread *self, XTRecurRWLockPtr rrw)
{
xt_slock_rwlock(self, &rrw->rrw_lock);
}
void xt_recurrwlock_slock_ns(XTRecurRWLockPtr rrw)
{
xt_slock_rwlock_ns(&rrw->rrw_lock);
}
void xt_recurrwlock_unxlock(struct XTThread *self, XTRecurRWLockPtr rrw)
{
ASSERT(self == rrw->rrw_locker);
ASSERT(rrw->rrw_lock_count > 0);
rrw->rrw_lock_count--;
if (!rrw->rrw_lock_count) {
rrw->rrw_locker = NULL;
xt_unlock_rwlock(self, &rrw->rrw_lock);
}
}
void xt_recurrwlock_unslock(struct XTThread *self, XTRecurRWLockPtr rrw)
{
xt_unlock_rwlock(self, &rrw->rrw_lock);
}
void xt_recurrwlock_unslock_ns(XTRecurRWLockPtr rrw)
{
xt_unlock_rwlock_ns(&rrw->rrw_lock);
}
/*
* -----------------------------------------------------------------------
* UNIT TESTS
......@@ -2031,7 +2176,7 @@ static void *lck_run_writer(XTThreadPtr self)
xt_rwmutex_unlock(&data->xs_lock, self->t_id);
}
else if (data->xs_which_lock == LOCK_SPINXSLOCK) {
xt_spinxslock_xlock(&data->xs_spinrwlock, self->t_id);
xt_spinxslock_xlock(&data->xs_spinrwlock, FALSE, self->t_id);
lck_do_job(self, data->xs_which_job, data, FALSE);
xt_spinxslock_unlock(&data->xs_spinrwlock, TRUE);
}
......@@ -2041,12 +2186,12 @@ static void *lck_run_writer(XTThreadPtr self)
xt_xsmutex_unlock(&data->xs_fastrwlock, self->t_id);
}
else if (data->xs_which_lock == LOCK_ATOMICRWLOCK) {
xt_atomicrwlock_xlock(&data->xs_atomicrwlock, self->t_id);
xt_atomicrwlock_xlock(&data->xs_atomicrwlock, FALSE, self->t_id);
lck_do_job(self, data->xs_which_job, data, FALSE);
xt_atomicrwlock_unlock(&data->xs_atomicrwlock, TRUE);
}
else if (data->xs_which_lock == LOCK_SKEWRWLOCK) {
xt_skewrwlock_xlock(&data->xs_skewrwlock, self->t_id);
xt_skewrwlock_xlock(&data->xs_skewrwlock, FALSE, self->t_id);
lck_do_job(self, data->xs_which_job, data, FALSE);
xt_skewrwlock_unlock(&data->xs_skewrwlock, TRUE);
}
......
......@@ -109,7 +109,8 @@ inline xtWord1 xt_atomic_dec1(volatile xtWord1 *mptr)
inline void xt_atomic_inc2(volatile xtWord2 *mptr)
{
#ifdef XT_ATOMIC_WIN32_X86
__asm LOCK INC WORD PTR mptr
__asm MOV ECX, mptr
__asm LOCK INC WORD PTR [ECX]
#elif defined(XT_ATOMIC_GNUC_X86)
asm volatile ("lock; incw %0" : : "m" (*mptr) : "memory");
#elif defined(XT_ATOMIC_GCC_OPS)
......@@ -125,7 +126,8 @@ inline void xt_atomic_inc2(volatile xtWord2 *mptr)
inline void xt_atomic_dec2(volatile xtWord2 *mptr)
{
#ifdef XT_ATOMIC_WIN32_X86
__asm LOCK DEC WORD PTR mptr
__asm MOV ECX, mptr
__asm LOCK DEC WORD PTR [ECX]
#elif defined(XT_ATOMIC_GNUC_X86)
asm volatile ("lock; decw %0" : : "m" (*mptr) : "memory");
#elif defined(XT_ATOMIC_GCC_OPS)
......@@ -427,6 +429,7 @@ inline void xt_fastlock_unlock(XTFastLockPtr fal, struct XTThread *XT_UNUSED(thr
typedef struct XTSpinXSLock {
volatile xtWord2 sxs_xlocked;
volatile xtWord2 sxs_xwaiter;
volatile xtWord2 sxs_rlock_count;
volatile xtWord2 sxs_wait_count; /* The number of readers waiting for the xlocker. */
#ifdef DEBUG
......@@ -446,7 +449,7 @@ void xt_spinxslock_init(struct XTThread *self, XTSpinXSLockPtr sxs, const char *
void xt_spinxslock_init(struct XTThread *self, XTSpinXSLockPtr sxs);
#endif
void xt_spinxslock_free(struct XTThread *self, XTSpinXSLockPtr sxs);
xtBool xt_spinxslock_xlock(XTSpinXSLockPtr sxs, xtThreadID thd_id);
xtBool xt_spinxslock_xlock(XTSpinXSLockPtr sxs, xtBool try_lock, xtThreadID thd_id);
xtBool xt_spinxslock_slock(XTSpinXSLockPtr sxs);
xtBool xt_spinxslock_unlock(XTSpinXSLockPtr sxs, xtBool xlocked);
......@@ -500,7 +503,7 @@ void xt_atomicrwlock_init(struct XTThread *self, XTAtomicRWLockPtr xsl, const ch
void xt_atomicrwlock_init(struct XTThread *self, XTAtomicRWLockPtr xsl);
#endif
void xt_atomicrwlock_free(struct XTThread *self, XTAtomicRWLockPtr xsl);
xtBool xt_atomicrwlock_xlock(XTAtomicRWLockPtr xsl, xtThreadID thr_id);
xtBool xt_atomicrwlock_xlock(XTAtomicRWLockPtr xsl, xtBool try_lock, xtThreadID thr_id);
xtBool xt_atomicrwlock_slock(XTAtomicRWLockPtr xsl);
xtBool xt_atomicrwlock_unlock(XTAtomicRWLockPtr xsl, xtBool xlocked);
......@@ -525,7 +528,7 @@ void xt_skewrwlock_init(struct XTThread *self, XTSkewRWLockPtr xsl, const char *
void xt_skewrwlock_init(struct XTThread *self, XTSkewRWLockPtr xsl);
#endif
void xt_skewrwlock_free(struct XTThread *self, XTSkewRWLockPtr xsl);
xtBool xt_skewrwlock_xlock(XTSkewRWLockPtr xsl, xtThreadID thr_id);
xtBool xt_skewrwlock_xlock(XTSkewRWLockPtr xsl, xtBool try_lock, xtThreadID thr_id);
xtBool xt_skewrwlock_slock(XTSkewRWLockPtr xsl);
xtBool xt_skewrwlock_unlock(XTSkewRWLockPtr xsl, xtBool xlocked);
......@@ -713,4 +716,57 @@ void xt_exit_row_lock_list(XTRowLockListPtr rl);
#define XT_HAVE_LOCK 2
#define XT_WAITING 3
/*
* -----------------------------------------------------------------------
* RECURSIVE MUTEX (allows lockers to lock again)
*/
typedef struct XTRecursiveMutex {
struct XTThread *rm_locker;
u_int rm_lock_count;
xt_mutex_type rm_mutex;
#ifdef XT_THREAD_LOCK_INFO
XTThreadLockInfoRec rm_lock_info;
const char *rm_name;
#endif
} XTRecursiveMutexRec, *XTRecursiveMutexPtr;
#ifdef XT_THREAD_LOCK_INFO
#define xt_recursivemutex_init_with_autoname(a,b) xt_recursivemutex_init(a,b,LOCKLIST_ARG_SUFFIX(b))
void xt_recursivemutex_init(struct XTThread *self, XTRecursiveMutexPtr rm, const char *name);
#else
#define xt_recursivemutex_init_with_autoname(a,b) xt_recursivemutex_init(a,b)
void xt_recursivemutex_init(struct XTThread *self, XTRecursiveMutexPtr rm);
#endif
void xt_recursivemutex_free(XTRecursiveMutexPtr rm);
void xt_recursivemutex_lock(struct XTThread *self, XTRecursiveMutexPtr rm);
void xt_recursivemutex_unlock(struct XTThread *self, XTRecursiveMutexPtr rm);
typedef struct XTRecurRWLock {
struct XTThread *rrw_locker;
u_int rrw_lock_count;
xt_rwlock_type rrw_lock;
#ifdef XT_THREAD_LOCK_INFO
XTThreadLockInfoRec rrw_lock_info;
const char *rrw_name;
#endif
} XTRecurRWLockRec, *XTRecurRWLockPtr;
#ifdef XT_THREAD_LOCK_INFO
#define xt_recurrwlock_init_with_autoname(a,b) xt_recurrwlock_init(a,b,LOCKLIST_ARG_SUFFIX(b))
void xt_recurrwlock_init(struct XTThread *self, XTRecurRWLockPtr rrw, const char *name);
#else
#define xt_recurrwlock_init_with_autoname(a,b) xt_recurrwlock_init(a,b)
void xt_recurrwlock_init(struct XTThread *self, XTRecurRWLockPtr rrw);
#endif
void xt_recurrwlock_free(XTRecurRWLockPtr rrw);
void xt_recurrwlock_xlock(struct XTThread *self, XTRecurRWLockPtr rrw);
void xt_recurrwlock_slock(struct XTThread *self, XTRecurRWLockPtr rrw);
void xt_recurrwlock_slock_ns(XTRecurRWLockPtr rrw);
void xt_recurrwlock_unxlock(struct XTThread *self, XTRecurRWLockPtr rrw);
void xt_recurrwlock_unslock(struct XTThread *self, XTRecurRWLockPtr rrw);
void xt_recurrwlock_unslock_ns(XTRecurRWLockPtr rrw);
#endif
......@@ -180,6 +180,10 @@ void xt_trace_thread_locks(XTThread *self)
lock_type = "XTAtomicRWLock";
lock_name = li->li_atomic_rwlock->arw_name;
break;
case XTThreadLockInfo::SKEW_RW_LOCK:
lock_type = "XTSkewRWLock";
lock_name = li->li_skew_rwlock->srw_name;
break;
}
xt_ttracef(self, " #lock#%d: type: %s name: %s \n", count, lock_type, lock_name);
......
......@@ -255,6 +255,11 @@ xtPublic u_int myxt_create_key_from_row(XTIndexPtr ind, xtWord1 *key, xtWord1 *r
xtWord1 *end;
xtWord1 *start;
#ifdef HAVE_valgrind
if (ind->mi_fix_key)
memset((byte*) key, 0,(size_t) (ind->mi_key_size) );
#endif
start = key;
for (u_int i=0; i<ind->mi_seg_count; i++, keyseg++)
{
......@@ -531,7 +536,7 @@ xtPublic u_int myxt_create_foreign_key_from_row(XTIndexPtr ind, xtWord1 *key, xt
key += length;
}
return fkey_ind->mi_fix_key ? fkey_ind->mi_key_size : (u_int) (key - start); /* Return keylength */
return (u_int) (key - start);
}
/* I may be overcautious here, but can I assume that
......@@ -2132,10 +2137,10 @@ static void my_deref_index_data(struct XTThread *self, XTIndexPtr mi)
{
enter_();
/* The dirty list of cache pages should be empty here! */
ASSERT(!mi->mi_dirty_list);
/* This is not the case if we were not able to flush data. E.g. when running out of disk space */
//ASSERT(!mi->mi_dirty_list);
ASSERT(!mi->mi_free_list);
xt_free_mutex(&mi->mi_flush_lock);
xt_spinlock_free(self, &mi->mi_dirty_lock);
XT_INDEX_FREE_LOCK(self, mi);
myxt_bitmap_free(self, &mi->mi_col_map);
......@@ -2174,7 +2179,6 @@ static XTIndexPtr my_create_index(XTThreadPtr self, TABLE *table_arg, u_int idx,
pushsr_(ind, my_deref_index_data, (XTIndexPtr) xt_calloc(self, MX_OFFSETOF(XTIndexRec, mi_seg) + sizeof(XTIndexSegRec) * index->key_parts));
XT_INDEX_INIT_LOCK(self, ind);
xt_init_mutex_with_autoname(self, &ind->mi_flush_lock);
xt_spinlock_init_with_autoname(self, &ind->mi_dirty_lock);
ind->mi_index_no = idx;
ind->mi_flags = (index->flags & (HA_NOSAME | HA_NULL_ARE_EQUAL | HA_UNIQUE_CHECK));
......@@ -2556,8 +2560,12 @@ xtPublic void myxt_setup_dictionary(XTThreadPtr self, XTDictionaryPtr dic)
ave_row_size += 3 + ave_data_size;
/* This is the length of the record required for all indexes: */
if (field_count + 1 == dic->dic_ind_cols_req)
dic->dic_ind_rec_len = max_data_size;
/* This was calculated incorrectly. Not a serius bug because it
* is only used in the case of fixed length row, and in this
* case the dic_ind_rec_len is set correctly below.
*/
if (field_count == dic->dic_ind_cols_req)
dic->dic_ind_rec_len = max_row_size;
}
dic->dic_min_row_size = min_row_size;
......@@ -2624,6 +2632,20 @@ xtPublic void myxt_setup_dictionary(XTThreadPtr self, XTDictionaryPtr dic)
}
}
/* Ensure that handle data record size is big enough to
* include the extended record reference, in the case of
* variable length rows
*/
if (!dic_rec_fixed) {
if (dic_rec_size < offsetof(XTTabRecExtDRec, re_data))
dic_rec_size = offsetof(XTTabRecExtDRec, re_data);
}
#ifdef DEBUG
else {
ASSERT_NS(dic_rec_size > offsetof(XTTabRecFix, rf_data));
}
#endif
if (!dic->dic_rec_size) {
dic->dic_rec_size = dic_rec_size;
dic->dic_rec_fixed = dic_rec_fixed;
......@@ -2861,6 +2883,7 @@ static void ha_create_dd_index(XTThreadPtr self, XTDDIndex *ind, KEY *key)
for (key_part = key->key_part; key_part != key_part_end; key_part++) {
if (!(cref = new XTDDColumnRef()))
xt_throw_errno(XT_CONTEXT, XT_ENOMEM);
cref->init(self);
ind->co_cols.append(self, cref);
cref->cr_col_name = xt_dup_string(self, (char *) key_part->field->field_name);
}
......
......@@ -39,25 +39,25 @@
#ifdef XT_WIN
void xt_p_init_threading(void)
xtPublic void xt_p_init_threading(void)
{
}
int xt_p_set_normal_priority(pthread_t thr)
xtPublic int xt_p_set_normal_priority(pthread_t thr)
{
if (!SetThreadPriority (thr, THREAD_PRIORITY_NORMAL))
return GetLastError();
return 0;
}
int xt_p_set_low_priority(pthread_t thr)
xtPublic int xt_p_set_low_priority(pthread_t thr)
{
if (!SetThreadPriority (thr, THREAD_PRIORITY_LOWEST))
return GetLastError();
return 0;
}
int xt_p_set_high_priority(pthread_t thr)
xtPublic int xt_p_set_high_priority(pthread_t thr)
{
if (!SetThreadPriority (thr, THREAD_PRIORITY_HIGHEST))
return GetLastError();
......@@ -67,9 +67,9 @@ int xt_p_set_high_priority(pthread_t thr)
#define XT_RWLOCK_MAGIC 0x78AC390E
#ifdef XT_THREAD_LOCK_INFO
int xt_p_mutex_init(xt_mutex_type *mutex, const pthread_mutexattr_t *attr, const char *n)
xtPublic int xt_p_mutex_init(xt_mutex_type *mutex, const pthread_mutexattr_t *attr, const char *n)
#else
int xt_p_mutex_init(xt_mutex_type *mutex, const pthread_mutexattr_t *attr)
xtPublic int xt_p_mutex_init(xt_mutex_type *mutex, const pthread_mutexattr_t *attr)
#endif
{
InitializeCriticalSection(&mutex->mt_cs);
......@@ -80,7 +80,7 @@ int xt_p_mutex_init(xt_mutex_type *mutex, const pthread_mutexattr_t *attr)
return 0;
}
int xt_p_mutex_destroy(xt_mutex_type *mutex)
xtPublic int xt_p_mutex_destroy(xt_mutex_type *mutex)
{
DeleteCriticalSection(&mutex->mt_cs);
#ifdef XT_THREAD_LOCK_INFO
......@@ -89,7 +89,7 @@ int xt_p_mutex_destroy(xt_mutex_type *mutex)
return 0;
}
int xt_p_mutex_lock(xt_mutex_type *mx)
xtPublic int xt_p_mutex_lock(xt_mutex_type *mx)
{
EnterCriticalSection(&mx->mt_cs);
#ifdef XT_THREAD_LOCK_INFO
......@@ -98,7 +98,7 @@ int xt_p_mutex_lock(xt_mutex_type *mx)
return 0;
}
int xt_p_mutex_unlock(xt_mutex_type *mx)
xtPublic int xt_p_mutex_unlock(xt_mutex_type *mx)
{
LeaveCriticalSection(&mx->mt_cs);
#ifdef XT_THREAD_LOCK_INFO
......@@ -107,7 +107,7 @@ int xt_p_mutex_unlock(xt_mutex_type *mx)
return 0;
}
int xt_p_mutex_trylock(xt_mutex_type *mutex)
xtPublic int xt_p_mutex_trylock(xt_mutex_type *mutex)
{
#if(_WIN32_WINNT >= 0x0400)
/* NOTE: MySQL bug! was using?!
......@@ -130,9 +130,9 @@ int xt_p_mutex_trylock(xt_mutex_type *mutex)
}
#ifdef XT_THREAD_LOCK_INFO
int xt_p_rwlock_init(xt_rwlock_type *rwl, const pthread_condattr_t *attr, const char *n)
xtPublic int xt_p_rwlock_init(xt_rwlock_type *rwl, const pthread_condattr_t *attr, const char *n)
#else
int xt_p_rwlock_init(xt_rwlock_type *rwl, const pthread_condattr_t *attr)
xtPublic int xt_p_rwlock_init(xt_rwlock_type *rwl, const pthread_condattr_t *attr)
#endif
{
int result;
......@@ -173,7 +173,7 @@ int xt_p_rwlock_init(xt_rwlock_type *rwl, const pthread_condattr_t *attr)
return result;
}
int xt_p_rwlock_destroy(xt_rwlock_type *rwl)
xtPublic int xt_p_rwlock_destroy(xt_rwlock_type *rwl)
{
int result = 0, result1 = 0, result2 = 0;
......@@ -225,7 +225,7 @@ int xt_p_rwlock_destroy(xt_rwlock_type *rwl)
}
int xt_p_rwlock_rdlock(xt_rwlock_type *rwl)
xtPublic int xt_p_rwlock_rdlock(xt_rwlock_type *rwl)
{
int result;
......@@ -262,7 +262,7 @@ int xt_p_rwlock_rdlock(xt_rwlock_type *rwl)
return (xt_p_mutex_unlock (&(rwl->rw_ex_lock)));
}
int xt_p_rwlock_wrlock(xt_rwlock_type *rwl)
xtPublic int xt_p_rwlock_wrlock(xt_rwlock_type *rwl)
{
int result;
......@@ -309,7 +309,54 @@ int xt_p_rwlock_wrlock(xt_rwlock_type *rwl)
return result;
}
int xt_p_rwlock_unlock(xt_rwlock_type *rwl)
xtPublic xtBool xt_p_rwlock_try_wrlock(xt_rwlock_type *rwl)
{
int result;
if (rwl == NULL)
return FALSE;
if (rwl->rw_magic != XT_RWLOCK_MAGIC)
return FALSE;
if ((result = xt_p_mutex_trylock(&rwl->rw_ex_lock)) != 0)
return FALSE;
if ((result = xt_p_mutex_lock(&rwl->rw_sh_lock)) != 0) {
(void) xt_p_mutex_unlock(&rwl->rw_ex_lock);
return FALSE;
}
if (rwl->rw_ex_count == 0) {
if (rwl->rw_sh_complete_count > 0) {
rwl->rw_sh_count -= rwl->rw_sh_complete_count;
rwl->rw_sh_complete_count = 0;
}
if (rwl->rw_sh_count > 0) {
rwl->rw_sh_complete_count = -rwl->rw_sh_count;
do {
result = pthread_cond_wait (&rwl->rw_sh_cond, &rwl->rw_sh_lock.mt_cs);
}
while (result == 0 && rwl->rw_sh_complete_count < 0);
if (result == 0)
rwl->rw_sh_count = 0;
}
}
if (result == 0)
rwl->rw_ex_count++;
#ifdef XT_THREAD_LOCK_INFO
xt_thread_lock_info_add_owner(&rwl->rw_lock_info);
#endif
return TRUE;
}
xtPublic int xt_p_rwlock_unlock(xt_rwlock_type *rwl)
{
int result, result1;
......@@ -342,12 +389,12 @@ int xt_p_rwlock_unlock(xt_rwlock_type *rwl)
return ((result != 0) ? result : result1);
}
int xt_p_cond_wait(xt_cond_type *cond, xt_mutex_type *mutex)
xtPublic int xt_p_cond_wait(xt_cond_type *cond, xt_mutex_type *mutex)
{
return xt_p_cond_timedwait(cond, mutex, NULL);
}
int xt_p_cond_timedwait(xt_cond_type *cond, xt_mutex_type *mt, struct timespec *abstime)
xtPublic int xt_p_cond_timedwait(xt_cond_type *cond, xt_mutex_type *mt, struct timespec *abstime)
{
pthread_mutex_t *mutex = &mt->mt_cs;
int result;
......@@ -393,7 +440,7 @@ int xt_p_cond_timedwait(xt_cond_type *cond, xt_mutex_type *mt, struct timespec *
return result == WAIT_TIMEOUT ? ETIMEDOUT : 0;
}
int xt_p_join(pthread_t thread, void **value)
xtPublic int xt_p_join(pthread_t thread, void **value)
{
DWORD exitcode;
......@@ -676,6 +723,23 @@ xtPublic int xt_p_rwlock_wrlock(xt_rwlock_type *rwlock)
return r;
}
xtPublic xtBool xt_p_rwlock_try_wrlock(xt_rwlock_type *rwlock)
{
XTThreadPtr self = xt_get_self();
int r;
ASSERT_NS(rwlock->rw_init == 67890);
r = pthread_rwlock_trywrlock(&rwlock->rw_plock);
if (r == 0) {
ASSERT_NS(!rwlock->rw_locker);
rwlock->rw_locker = self;
#ifdef XT_THREAD_LOCK_INFO
xt_thread_lock_info_add_owner(&rwlock->rw_lock_info);
#endif
}
return r == 0;
}
xtPublic int xt_p_rwlock_unlock(xt_rwlock_type *rwlock)
{
XTThreadPtr self = xt_get_self();
......
......@@ -101,13 +101,14 @@ int xt_p_rwlock_init(xt_rwlock_type *rwlock, const pthread_condattr_t *attr, con
#else
int xt_p_rwlock_init(xt_rwlock_type *rwlock, const pthread_condattr_t *attr);
#endif
int xt_p_rwlock_destroy(xt_rwlock_type *rwlock);
int xt_p_rwlock_rdlock(xt_rwlock_type *mx);
int xt_p_rwlock_wrlock(xt_rwlock_type *mx);
int xt_p_rwlock_unlock(xt_rwlock_type *mx);
int xt_p_rwlock_destroy(xt_rwlock_type *rwlock);
int xt_p_rwlock_rdlock(xt_rwlock_type *mx);
int xt_p_rwlock_wrlock(xt_rwlock_type *mx);
xtBool xt_p_rwlock_try_wrlock(xt_rwlock_type *rwl);
int xt_p_rwlock_unlock(xt_rwlock_type *mx);
int xt_p_cond_wait(xt_cond_type *cond, xt_mutex_type *mutex);
int xt_p_cond_timedwait(xt_cond_type *cond, xt_mutex_type *mutex, struct timespec *abstime);
int xt_p_cond_wait(xt_cond_type *cond, xt_mutex_type *mutex);
int xt_p_cond_timedwait(xt_cond_type *cond, xt_mutex_type *mutex, struct timespec *abstime);
int xt_p_join(pthread_t thread, void **value);
......@@ -125,6 +126,7 @@ int xt_p_join(pthread_t thread, void **value);
#define xt_slock_rwlock_ns xt_p_rwlock_rdlock
#define xt_xlock_rwlock_ns xt_p_rwlock_wrlock
#define xt_xlock_try_rwlock_ns xt_p_rwlock_try_wrlock
#define xt_unlock_rwlock_ns xt_p_rwlock_unlock
#ifdef XT_THREAD_LOCK_INFO
......@@ -225,9 +227,10 @@ typedef struct xt_rwlock_struct {
#endif
} xt_rwlock_type;
int xt_p_rwlock_rdlock(xt_rwlock_type *mx);
int xt_p_rwlock_wrlock(xt_rwlock_type *mx);
int xt_p_rwlock_unlock(xt_rwlock_type *mx);
int xt_p_rwlock_rdlock(xt_rwlock_type *mx);
int xt_p_rwlock_wrlock(xt_rwlock_type *mx);
xtBool xt_p_rwlock_try_wrlock(xt_rwlock_type *mx);
int xt_p_rwlock_unlock(xt_rwlock_type *mx);
int xt_p_mutex_lock(xt_mutex_type *mx, u_int line, const char *file);
int xt_p_mutex_unlock(xt_mutex_type *mx);
......@@ -251,37 +254,39 @@ int xt_p_cond_timedwait(xt_cond_type *cond, xt_mutex_type *mutex, const struct t
}
#endif
#define xt_slock_rwlock_ns xt_p_rwlock_rdlock
#define xt_xlock_rwlock_ns xt_p_rwlock_wrlock
#define xt_unlock_rwlock_ns xt_p_rwlock_unlock
#define xt_slock_rwlock_ns xt_p_rwlock_rdlock
#define xt_xlock_rwlock_ns xt_p_rwlock_wrlock
#define xt_xlock_try_rwlock_ns xt_p_rwlock_try_wrlock
#define xt_unlock_rwlock_ns xt_p_rwlock_unlock
#define xt_lock_mutex_ns(x) xt_p_mutex_lock(x, __LINE__, __FILE__)
#define xt_unlock_mutex_ns xt_p_mutex_unlock
#define xt_mutex_trylock xt_p_mutex_trylock
#define xt_lock_mutex_ns(x) xt_p_mutex_lock(x, __LINE__, __FILE__)
#define xt_unlock_mutex_ns xt_p_mutex_unlock
#define xt_mutex_trylock xt_p_mutex_trylock
#else // DEBUG_LOCKING
#define xt_rwlock_struct _opaque_pthread_rwlock_t
#define xt_mutex_struct _opaque_pthread_mutex_t
#define xt_rwlock_struct _opaque_pthread_rwlock_t
#define xt_mutex_struct _opaque_pthread_mutex_t
#define xt_rwlock_type pthread_rwlock_t
#define xt_mutex_type pthread_mutex_t
#define xt_rwlock_type pthread_rwlock_t
#define xt_mutex_type pthread_mutex_t
#define xt_slock_rwlock_ns pthread_rwlock_rdlock
#define xt_xlock_rwlock_ns pthread_rwlock_wrlock
#define xt_unlock_rwlock_ns pthread_rwlock_unlock
#define xt_slock_rwlock_ns pthread_rwlock_rdlock
#define xt_xlock_rwlock_ns pthread_rwlock_wrlock
#define xt_xlock_try_rwlock_ns(x) (pthread_rwlock_trywrlock(x) == 0)
#define xt_unlock_rwlock_ns pthread_rwlock_unlock
#define xt_lock_mutex_ns pthread_mutex_lock
#define xt_unlock_mutex_ns pthread_mutex_unlock
#define xt_mutex_trylock pthread_mutex_trylock
#define xt_lock_mutex_ns pthread_mutex_lock
#define xt_unlock_mutex_ns pthread_mutex_unlock
#define xt_mutex_trylock pthread_mutex_trylock
#define xt_p_mutex_trylock pthread_mutex_trylock
#define xt_p_mutex_destroy pthread_mutex_destroy
#define xt_p_mutex_init pthread_mutex_init
#define xt_p_rwlock_destroy pthread_rwlock_destroy
#define xt_p_rwlock_init pthread_rwlock_init
#define xt_p_cond_wait pthread_cond_wait
#define xt_p_cond_timedwait pthread_cond_timedwait
#define xt_p_mutex_trylock pthread_mutex_trylock
#define xt_p_mutex_destroy pthread_mutex_destroy
#define xt_p_mutex_init pthread_mutex_init
#define xt_p_rwlock_destroy pthread_rwlock_destroy
#define xt_p_rwlock_init pthread_rwlock_init
#define xt_p_cond_wait pthread_cond_wait
#define xt_p_cond_timedwait pthread_cond_timedwait
#endif // DEBUG_LOCKING
......
......@@ -1315,7 +1315,7 @@ static void xres_apply_operations(XTThreadPtr self, XTWriterStatePtr ws, xtBool
tab->tab_head_op_seq = op->or_op_seq;
if (tab->tab_wr_wake_freeer) {
if (!XTTableSeq::xt_op_is_before(tab->tab_head_op_seq, tab->tab_wake_freeer_op))
xt_wr_wake_freeer(self);
xt_wr_wake_freeer(self, ws->ws_db);
}
i++;
}
......@@ -1498,7 +1498,7 @@ xtPublic void xt_xres_apply_in_order(XTThreadPtr self, XTWriterStatePtr ws, xtLo
tab->tab_head_op_seq = op_seq;
if (tab->tab_wr_wake_freeer) {
if (!XTTableSeq::xt_op_is_before(tab->tab_head_op_seq, tab->tab_wake_freeer_op))
xt_wr_wake_freeer(self);
xt_wr_wake_freeer(self, ws->ws_db);
}
/* Apply any operations in the list that now follow on...
......@@ -1575,10 +1575,12 @@ static int xres_comp_flush_tabs(XTThreadPtr XT_UNUSED(self), register const void
static void xres_init_checkpoint_state(XTThreadPtr self, XTCheckPointStatePtr cp)
{
xt_init_mutex_with_autoname(self, &cp->cp_state_lock);
cp->cp_inited = TRUE;
}
static void xres_free_checkpoint_state(XTThreadPtr self, XTCheckPointStatePtr cp)
{
cp->cp_inited = FALSE;
xt_free_mutex(&cp->cp_state_lock);
if (cp->cp_table_ids) {
xt_free_sortedlist(self, cp->cp_table_ids);
......@@ -1616,6 +1618,7 @@ xtPublic void xt_xres_init(XTThreadPtr self, XTDatabaseHPtr db)
xt_init_mutex_with_autoname(self, &db->db_cp_lock);
xt_init_cond(self, &db->db_cp_cond);
xt_init_mutex_with_autoname(self, &db->db_fl_lock);
xres_init_checkpoint_state(self, &db->db_cp_state);
db->db_restart.xres_init(self, db, &db->db_wr_log_id, &db->db_wr_log_offset, &max_log_id);
......@@ -1633,6 +1636,7 @@ xtPublic void xt_xres_exit(XTThreadPtr self, XTDatabaseHPtr db)
xres_free_checkpoint_state(self, &db->db_cp_state);
xt_free_mutex(&db->db_cp_lock);
xt_free_cond(&db->db_cp_cond);
xt_free_mutex(&db->db_fl_lock);
}
/* ----------------------------------------------------------------------
......@@ -2182,7 +2186,7 @@ xtBool XTXactRestart::xres_restart(XTThreadPtr self, xtLogID *log_id, xtLogOffse
xtBool XTXactRestart::xres_is_checkpoint_pending(xtLogID curr_log_id, xtLogOffset curr_log_offset)
{
return xt_bytes_since_last_checkpoint(xres_db, curr_log_id, curr_log_offset) >= xt_db_checkpoint_frequency / 2;
return xt_bytes_since_last_checkpoint(xres_db, curr_log_id, curr_log_offset) >= xt_db_checkpoint_frequency;
}
/*
......@@ -2531,10 +2535,10 @@ static void xres_cp_main(XTThreadPtr self)
XTDatabaseHPtr db = self->st_database;
u_int curr_writer_total;
time_t now;
xtXactID sweep_count;
xt_set_low_priority(self);
while (!self->t_quit) {
/* Wait 2 seconds: */
curr_writer_total = db->db_xn_total_writer_count;
......@@ -2549,9 +2553,13 @@ static void xres_cp_main(XTThreadPtr self)
if (self->t_quit)
break;
if (curr_writer_total == db->db_xn_total_writer_count)
sweep_count = db->db_xn_curr_id + 1 - db->db_xn_to_clean_id;
if (curr_writer_total == db->db_xn_total_writer_count &&
!sweep_count &&
db->db_wr_idle == XT_THREAD_IDLE) {
/* No activity in 2 seconds: */
xres_cp_checkpoint(self, db, curr_writer_total, FALSE);
}
else {
/* There server is busy, check if we need to
* write a checkpoint anyway...
......@@ -2672,6 +2680,10 @@ xtPublic xtBool xt_begin_checkpoint(XTDatabaseHPtr db, xtBool have_table_lock, X
XTOperationPtr op;
XTCheckPointTableRec cpt;
XTSortedListPtr tables = NULL;
/* during startup we can get an error before the checkpointer is inited */
if (!cp->cp_inited)
return FAILED;
/* First check if a checkpoint is already running: */
xt_lock_mutex_ns(&cp->cp_state_lock);
......@@ -3314,7 +3326,7 @@ static void *xn_xres_run_recovery_thread(XTThreadPtr self)
* #7 0x000c0db2 in THD::~THD at sql_class.cc:934
* #8 0x003b025b in myxt_destroy_thread at myxt_xt.cc:2999
* #9 0x003b66b5 in xn_xres_run_recovery_thread at restart_xt.cc:3196
* #10 0x003cbfbb in thr_main_pbxt at thread_xt.cc:1020
* #10 0x003cbfbb in xt_thread_main at thread_xt.cc:1020
*
myxt_destroy_thread(mysql_thread, TRUE);
*/
......@@ -3350,3 +3362,123 @@ xtPublic void xt_xres_terminate_recovery(XTThreadPtr self)
xt_wait_for_thread(tid, TRUE);
}
}
/* ----------------------------------------------------------------------
* L O G F L U S H P R O C E S S
*/
static void *xres_fl_run_thread(XTThreadPtr self)
{
XTDatabaseHPtr db = (XTDatabaseHPtr) self->t_data;
int count;
void *mysql_thread;
xtWord8 to_flush;
if (!(mysql_thread = myxt_create_thread()))
xt_throw(self);
while (!self->t_quit) {
try_(a) {
/*
* The garbage collector requires that the database
* is in use because.
*/
xt_use_database(self, db, XT_FOR_CHECKPOINTER);
/* This action is both safe and required (see details elsewhere) */
xt_heap_release(self, self->st_database);
xt_set_low_priority(self);
to_flush = xt_trace_clock() + XT_XLOG_FLUSH_FREQ * 1000;
for (;;) {
/* Wait 1 second: */
while (!self->t_quit && xt_trace_clock() < to_flush)
xt_sleep_milli_second(10);
if (self->t_quit)
break;
if (!db->db_xlog.xlog_flush(self))
xt_throw(self);
to_flush += XT_XLOG_FLUSH_FREQ * 1000;
}
}
catch_(a) {
/* This error is "normal"! */
if (self->t_exception.e_xt_err != XT_ERR_NO_DICTIONARY &&
!(self->t_exception.e_xt_err == XT_SIGNAL_CAUGHT &&
self->t_exception.e_sys_err == SIGTERM))
xt_log_and_clear_exception(self);
}
cont_(a);
/* Avoid releasing the database (done above) */
self->st_database = NULL;
xt_unuse_database(self, self);
/* After an exception, pause before trying again... */
/* Number of seconds */
count = 60;
while (!self->t_quit && count > 0) {
sleep(1);
count--;
}
}
/*
* {MYSQL-THREAD-KILL}
myxt_destroy_thread(mysql_thread, TRUE);
*/
return NULL;
}
static void xres_fl_free_thread(XTThreadPtr self, void *data)
{
XTDatabaseHPtr db = (XTDatabaseHPtr) data;
if (db->db_fl_thread) {
xt_lock_mutex(self, &db->db_fl_lock);
pushr_(xt_unlock_mutex, &db->db_fl_lock);
db->db_fl_thread = NULL;
freer_(); // xt_unlock_mutex(&db->db_fl_lock)
}
}
xtPublic void xt_start_flusher(XTThreadPtr self, XTDatabaseHPtr db)
{
char name[PATH_MAX];
sprintf(name, "FL-%s", xt_last_directory_of_path(db->db_main_path));
xt_remove_dir_char(name);
db->db_fl_thread = xt_create_daemon(self, name);
xt_set_thread_data(db->db_fl_thread, db, xres_fl_free_thread);
xt_run_thread(self, db->db_fl_thread, xres_fl_run_thread);
}
xtPublic void xt_stop_flusher(XTThreadPtr self, XTDatabaseHPtr db)
{
XTThreadPtr thr_fl;
if (db->db_fl_thread) {
xt_lock_mutex(self, &db->db_fl_lock);
pushr_(xt_unlock_mutex, &db->db_fl_lock);
/* This pointer is safe as long as you have the transaction lock. */
if ((thr_fl = db->db_fl_thread)) {
xtThreadID tid = thr_fl->t_id;
/* Make sure the thread quits when woken up. */
xt_terminate_thread(self, thr_fl);
freer_(); // xt_unlock_mutex(&db->db_cp_lock)
xt_wait_for_thread(tid, FALSE);
db->db_fl_thread = NULL;
}
else
freer_(); // xt_unlock_mutex(&db->db_cp_lock)
}
}
......@@ -92,6 +92,7 @@ typedef struct XTXactRestart {
} XTXactRestartRec, *XTXactRestartPtr;
typedef struct XTCheckPointState {
xtBool cp_inited; /* TRUE if structure was inited */
xt_mutex_type cp_state_lock; /* Lock and the entire checkpoint state. */
xtBool cp_running; /* TRUE if a checkpoint is running. */
xtLogID cp_log_id;
......@@ -136,6 +137,9 @@ void xt_dump_xlogs(struct XTDatabase *db, xtLogID start_log);
void xt_xres_start_database_recovery(XTThreadPtr self);
void xt_xres_terminate_recovery(XTThreadPtr self);
void xt_start_flusher(struct XTThread *self, struct XTDatabase *db);
void xt_stop_flusher(struct XTThread *self, struct XTDatabase *db);
#define XT_RECOVER_PENDING 0
#define XT_RECOVER_DONE 1
#define XT_RECOVER_SWEPT 2
......
......@@ -380,7 +380,7 @@ xtPublic void xt_int8_to_byte_size(xtInt8 value, char *string)
/* Version number must also be set in configure.in! */
xtPublic c_char *xt_get_version(void)
{
return "1.0.09g RC";
return "1.0.11 Pre-GA";
}
/* Copy and URL decode! */
......
......@@ -46,8 +46,10 @@ static void tabc_fr_wait_for_cache(XTThreadPtr self, u_int msecs);
xtPublic void xt_tc_set_cache_size(size_t cache_size)
{
xt_tab_cache.tcm_cache_size = cache_size;
xt_tab_cache.tcm_low_level = cache_size / 4 * 3; // Current 75%
xt_tab_cache.tcm_high_level = cache_size / 100 * 95; // Current 95%
/* Multiplying by this number can overflow a 4 byte value! */
xt_tab_cache.tcm_low_level = (size_t) ((xtWord8) cache_size * (xtWord8) 70 / (xtWord8) 100); // Current 70%
xt_tab_cache.tcm_high_level = (size_t) ((xtWord8) cache_size * 95 / (xtWord8) 100); // Current 95%
xt_tab_cache.tcm_mid_level = (size_t) ((xtWord8) cache_size * 85 / (xtWord8) 100); // Current 85%
}
/*
......@@ -84,25 +86,30 @@ xtPublic void xt_tc_init(XTThreadPtr self, size_t cache_size)
xtPublic void xt_tc_exit(XTThreadPtr self)
{
XTTabCacheSegPtr seg;
for (u_int i=0; i<XT_TC_SEGMENT_COUNT; i++) {
if (xt_tab_cache.tcm_segment[i].tcs_hash_table) {
if (xt_tab_cache.tcm_segment[i].tcs_cache_in_use) {
XTTabCachePagePtr page, tmp_page;
for (size_t j=0; j<xt_tab_cache.tcm_hash_size; j++) {
page = xt_tab_cache.tcm_segment[i].tcs_hash_table[j];
while (page) {
tmp_page = page;
page = page->tcp_next;
xt_free(self, tmp_page);
}
seg = &xt_tab_cache.tcm_segment[i];
if (seg->tcs_hash_table) {
XTTabCachePagePtr page, tmp_page;
for (size_t j=0; j<xt_tab_cache.tcm_hash_size; j++) {
page = seg->tcs_hash_table[j];
while (page) {
tmp_page = page;
page = page->tcp_next;
ASSERT_NS(seg->tcs_cache_in_use >= offsetof(XTTabCachePageRec, tcp_data) + tmp_page->tcp_data_size);
seg->tcs_cache_in_use -= (offsetof(XTTabCachePageRec, tcp_data) + tmp_page->tcp_data_size);
ASSERT_NS(seg->tcs_cache_in_use == 0 || seg->tcs_cache_in_use >= 25000);
xt_free(self, tmp_page);
}
}
xt_free(self, xt_tab_cache.tcm_segment[i].tcs_hash_table);
xt_tab_cache.tcm_segment[i].tcs_hash_table = NULL;
TAB_CAC_FREE_LOCK(self, &xt_tab_cache.tcm_segment[i].tcs_lock);
xt_free(self, seg->tcs_hash_table);
seg->tcs_hash_table = NULL;
TAB_CAC_FREE_LOCK(self, &seg->tcs_lock);
}
ASSERT_NS(seg->tcs_cache_in_use == 0);
}
xt_free_mutex(&xt_tab_cache.tcm_lock);
......@@ -554,24 +561,24 @@ xtBool XTTabCache::tc_fetch(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, XTTabCache
}
page = page->tcp_next;
}
size_t page_size = offsetof(XTTabCachePageRec, tcp_data) + this->tci_page_size;
TAB_CAC_UNLOCK(&seg->tcs_lock, thread->t_id);
/* Page not found, allocate a new page: */
size_t page_size = offsetof(XTTabCachePageRec, tcp_data) + this->tci_page_size;
if (!(new_page = (XTTabCachePagePtr) xt_malloc_ns(page_size)))
return FAILED;
/* Increment cache used. */
seg->tcs_cache_in_use += page_size;
/* Check the level of the cache: */
size_t cache_used = 0;
for (int i=0; i<XT_TC_SEGMENT_COUNT; i++)
cache_used += dcg->tcm_segment[i].tcs_cache_in_use;
if (cache_used > dcg->tcm_cache_high)
if (cache_used + page_size > dcg->tcm_cache_high)
dcg->tcm_cache_high = cache_used;
if (cache_used > dcg->tcm_cache_size) {
if (cache_used + page_size > dcg->tcm_cache_size) {
XTThreadPtr self;
time_t now;
......@@ -638,7 +645,7 @@ xtBool XTTabCache::tc_fetch(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, XTTabCache
for (int i=0; i<XT_TC_SEGMENT_COUNT; i++)
cache_used += dcg->tcm_segment[i].tcs_cache_in_use;
if (cache_used <= dcg->tcm_high_level)
if (cache_used + page_size <= dcg->tcm_high_level)
break;
/*
* If there is too little cache we can get stuck here.
......@@ -663,7 +670,7 @@ xtBool XTTabCache::tc_fetch(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, XTTabCache
while (time(NULL) < now + 5);
xt_unlock_mutex_ns(&dcg->tcm_freeer_lock);
}
else if (cache_used > dcg->tcm_high_level) {
else if (cache_used + page_size > dcg->tcm_high_level) {
/* Wake up the freeer because the cache level,
* is higher than the high level.
*/
......@@ -693,6 +700,9 @@ xtBool XTTabCache::tc_fetch(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, XTTabCache
}
#ifdef XT_MEMSET_UNUSED_SPACE
else
red_size = 0;
/* Removing this is an optimization. It should not be required
* to clear the unused space in the page.
*/
......@@ -727,6 +737,15 @@ xtBool XTTabCache::tc_fetch(XT_ROW_REC_FILE_PTR file, xtRefID ref_id, XTTabCache
page->tcp_next = seg->tcs_hash_table[hash_idx];
seg->tcs_hash_table[hash_idx] = page;
/* GOTCHA! This increment was done just after the malloc!
* So it was not protected by the segment lock!
* The result was that this count was no longer reliable,
* This resulted in the amount of cache being used becoming less, and\
* less, because increments were lost over time!
*/
/* Increment cache used. */
seg->tcs_cache_in_use += page_size;
done_ok:
*ret_seg = seg;
*ret_page = page;
......@@ -761,7 +780,7 @@ xtBool XTTableSeq::ts_log_no_op(XTThreadPtr thread, xtTableID tab_id, xtOpSeqNo
* some will be missing, so the writer will not
* be able to contniue.
*/
return xt_xlog_log_data(thread, sizeof(XTactNoOpEntryDRec), (XTXactLogBufferDPtr) &ent_rec, FALSE);
return xt_xlog_log_data(thread, sizeof(XTactNoOpEntryDRec), (XTXactLogBufferDPtr) &ent_rec, XT_XLOG_NO_WRITE_NO_FLUSH);
}
#ifdef XT_NOT_INLINE
......@@ -828,13 +847,23 @@ xtBool XTTableSeq::xt_op_is_before(register xtOpSeqNo now, register xtOpSeqNo th
/*
* Used by the writer to wake the freeer.
*/
xtPublic void xt_wr_wake_freeer(XTThreadPtr self)
xtPublic void xt_wr_wake_freeer(XTThreadPtr self, XTDatabaseHPtr db)
{
/* BUG FIX: Was using tcm_freeer_cond.
* This is incorrect. When the freeer waits for the
* writter, it uses the writer's condition!
*/
xt_lock_mutex_ns(&db->db_wr_lock);
if (!xt_broadcast_cond_ns(&db->db_wr_cond))
xt_log_and_clear_exception_ns();
xt_unlock_mutex_ns(&db->db_wr_lock);
/*
xt_lock_mutex(self, &xt_tab_cache.tcm_freeer_lock);
pushr_(xt_unlock_mutex, &xt_tab_cache.tcm_freeer_lock);
if (!xt_broadcast_cond_ns(&xt_tab_cache.tcm_freeer_cond))
xt_log_and_clear_exception_ns();
freer_(); // xt_unlock_mutex(&xt_tab_cache.tcm_freeer_lock)
*/
}
/* Wait for a transaction to quit: */
......@@ -1070,7 +1099,9 @@ static size_t tabc_free_page(XTThreadPtr self, TCResourcePtr tc)
/* Free the page: */
size_t freed_space = offsetof(XTTabCachePageRec, tcp_data) + page->tcp_data_size;
ASSERT_NS(seg->tcs_cache_in_use >= freed_space);
seg->tcs_cache_in_use -= freed_space;
ASSERT_NS(seg->tcs_cache_in_use == 0 || seg->tcs_cache_in_use >= 25000);
xt_free_ns(page);
TAB_CAC_UNLOCK(&seg->tcs_lock, self->t_id);
......@@ -1083,6 +1114,7 @@ static void tabc_fr_main(XTThreadPtr self)
{
register XTTabCacheMemPtr dcg = &xt_tab_cache;
TCResourceRec tc = { 0 };
int i;
xt_set_low_priority(self);
dcg->tcm_freeer_busy = TRUE;
......@@ -1095,14 +1127,20 @@ static void tabc_fr_main(XTThreadPtr self)
while (!self->t_quit) {
/* Total up the cache memory used: */
cache_used = 0;
for (int i=0; i<XT_TC_SEGMENT_COUNT; i++)
for (i=0; i<XT_TC_SEGMENT_COUNT; i++)
cache_used += dcg->tcm_segment[i].tcs_cache_in_use;
if (cache_used > dcg->tcm_cache_high) {
if (cache_used > dcg->tcm_cache_high)
dcg->tcm_cache_high = cache_used;
}
/* Check if the cache usage is over 95%: */
if (self->t_quit || cache_used < dcg->tcm_high_level)
if (self->t_quit)
break;
/* If threads are waiting then we are more aggressive about freeing
* cache.
*/
if (cache_used < (dcg->tcm_threads_waiting ? dcg->tcm_mid_level : dcg->tcm_high_level))
break;
/* Reduce cache to the 75% level: */
......@@ -1137,7 +1175,23 @@ static void tabc_fr_main(XTThreadPtr self)
*/
xt_db_approximate_time = time(NULL);
dcg->tcm_freeer_busy = FALSE;
tabc_fr_wait_for_cache(self, 500);
/* No idea, why, but I am getting an uneccesarry pause here.
* I run DBT2 with low record cache.
*
* Every now and then there is a pause where the freeer is here,
* and all user threads are waiting for the freeer.
*
* So adding the tcm_threads_waiting condition.
*/
if (dcg->tcm_threads_waiting) {
cache_used = 0;
for (i=0; i<XT_TC_SEGMENT_COUNT; i++)
cache_used += dcg->tcm_segment[i].tcs_cache_in_use;
if (cache_used < dcg->tcm_mid_level)
tabc_fr_wait_for_cache(self, 500);
}
else
tabc_fr_wait_for_cache(self, 500);
//tabc_fr_wait_for_cache(self, 30*1000);
dcg->tcm_freeer_busy = TRUE;
xt_db_approximate_time = time(NULL);
......@@ -1174,7 +1228,7 @@ static void *tabc_fr_run_thread(XTThreadPtr self)
count = 2*60;
#endif
while (!self->t_quit && count > 0) {
xt_db_approximate_time = xt_trace_clock();
xt_db_approximate_time = time(NULL);
sleep(1);
count--;
}
......
......@@ -29,6 +29,7 @@
struct XTTable;
struct XTOpenTable;
struct XTTabCache;
struct XTDatabase;
#include "thread_xt.h"
#include "filesys_xt.h"
......@@ -226,6 +227,7 @@ typedef struct XTTabCacheMem {
size_t tcm_cache_high; /* The high water level of cache allocation. */
size_t tcm_low_level; /* This is the level to which the freeer will free, once it starts working. */
size_t tcm_high_level; /* This is the level at which the freeer will start to work (to avoid waiting)! */
size_t tcm_mid_level; /* At this level the freeer will not sleep if there are threads waiting. */
/* The free'er thread: */
struct XTThread *tcm_freeer_thread; /* The freeer thread . */
......@@ -283,6 +285,6 @@ void xt_check_table_cache(struct XTTable *tab);
void xt_quit_freeer(XTThreadPtr self);
void xt_stop_freeer(XTThreadPtr self);
void xt_start_freeer(XTThreadPtr self);
void xt_wr_wake_freeer(XTThreadPtr self);
void xt_wr_wake_freeer(XTThreadPtr self, struct XTDatabase *db);
#endif
This diff is collapsed.
......@@ -364,6 +364,9 @@ typedef struct XTTable : public XTHeap {
xtWord4 tab_rec_fnum; /* The count of the number of free rows on the free list. */
xt_mutex_type tab_rec_lock; /* Lock for the free list. */
xt_mutex_type tab_ind_stat_lock; /* Aquired when calculating index statistics. */
time_t tab_ind_stat_calc_time; /* Zero means the index stats have not be calculated, otherwize this is a time. */
xt_mutex_type tab_ind_flush_lock; /* Required while the index file is being flushed. */
xtLogID tab_ind_rec_log_id; /* The point before which index entries have been written. */
xtLogOffset tab_ind_rec_log_offset; /* The log offset of the write point. */
......@@ -372,7 +375,7 @@ typedef struct XTTable : public XTHeap {
xtIndexNodeID tab_ind_free; /* The start of the free page list of the index. */
XTIndFreeListPtr tab_ind_free_list; /* A cache of the free list (if exists, don't go to disk!) */
xt_mutex_type tab_ind_lock; /* Lock for reading and writing the index free list. */
xtWord2 tab_ind_flush_seq;
xtWord4 tab_ind_flush_seq;
} XTTableHRec, *XTTableHPtr; /* Heap pointer */
/* Used for an in-memory list of the tables, ordered by ID. */
......@@ -403,6 +406,8 @@ typedef struct XTOpenTable {
size_t ot_rec_size; /* Cached from table for quick access. */
char ot_error_key[XT_IDENTIFIER_NAME_SIZE];
struct XTOpenTable *ot_prev_update; /* The UPDATE statement stack! {UPDATE-STACK} */
u_int ot_update_id; /* The update statement ID. */
xtBool ot_for_update; /* True if reading FOR UPDATE. */
xtBool ot_is_modify; /* True if UPDATE or DELETE. */
xtRowID ot_temp_row_lock; /* The temporary row lock set on this table. */
......@@ -507,11 +512,11 @@ void xt_check_tables(struct XTThread *self);
char *xt_tab_file_to_name(size_t size, char *tab_name, char *file_name);
void xt_create_table(struct XTThread *self, XTPathStrPtr name, XTDictionaryPtr dic);
XTTableHPtr xt_use_table(struct XTThread *self, XTPathStrPtr name, xtBool no_load, xtBool missing_ok, xtBool *opened);
XTTableHPtr xt_use_table(struct XTThread *self, XTPathStrPtr name, xtBool no_load, xtBool missing_ok);
void xt_sync_flush_table(struct XTThread *self, XTOpenTablePtr ot);
xtBool xt_flush_record_row(XTOpenTablePtr ot, off_t *bytes_flushed, xtBool have_table_loc);
void xt_flush_table(struct XTThread *self, XTOpenTablePtr ot);
XTTableHPtr xt_use_table_no_lock(XTThreadPtr self, struct XTDatabase *db, XTPathStrPtr name, xtBool no_load, xtBool missing_ok, XTDictionaryPtr dic, xtBool *opened);
XTTableHPtr xt_use_table_no_lock(XTThreadPtr self, struct XTDatabase *db, XTPathStrPtr name, xtBool no_load, xtBool missing_ok, XTDictionaryPtr dic);
int xt_use_table_by_id(struct XTThread *self, XTTableHPtr *tab, struct XTDatabase *db, xtTableID tab_id);
XTOpenTablePtr xt_open_table(XTTableHPtr tab);
void xt_close_table(XTOpenTablePtr ot, xtBool flush, xtBool have_table_lock);
......
......@@ -96,7 +96,7 @@ xtPublic xtBool xt_init_logging(void)
{
int err;
log_file = stderr;
log_file = stdout;
log_level = XT_LOG_TRACE;
err = xt_p_mutex_init_with_autoname(&log_mutex, NULL);
if (err) {
......@@ -413,7 +413,8 @@ static void thr_save_error_va(XTExceptionPtr e, XTThreadPtr self, xtBool throw_i
vsnprintf(e->e_err_msg, XT_ERR_MSG_SIZE, fmt, ap);
/* Make the first character of the message upper case: */
if (isalpha(e->e_err_msg[0]) && islower(e->e_err_msg[0]))
/* This did not work for foreign languages! */
if (e->e_err_msg[0] >= 'a' && e->e_err_msg[0] <= 'z')
e->e_err_msg[0] = (char) toupper(e->e_err_msg[0]);
if (func && *func && *func != '-')
......@@ -793,11 +794,9 @@ xtPublic void xt_register_tabcolerr(c_char *func, c_char *file, u_int line, int
xt_2nd_last_name_of_path(sizeof(buffer), buffer, tab_item->ps_path);
xt_strcat(sizeof(buffer), buffer, ".");
xt_strcpy(sizeof(buffer), buffer, xt_last_name_of_path(tab_item->ps_path));
xt_strcat(sizeof(buffer), buffer, ".");
xt_strcat(sizeof(buffer), buffer, item2);
xt_strcat(sizeof(buffer), buffer, xt_last_name_of_path(tab_item->ps_path));
xt_register_ixterr(func, file, line, xt_err, buffer);
xt_register_i2xterr(func, file, line, xt_err, buffer, item2);
}
xtPublic void xt_register_taberr(c_char *func, c_char *file, u_int line, int xt_err, XTPathStrPtr tab_item)
......@@ -806,7 +805,7 @@ xtPublic void xt_register_taberr(c_char *func, c_char *file, u_int line, int xt_
xt_2nd_last_name_of_path(sizeof(buffer), buffer, tab_item->ps_path);
xt_strcat(sizeof(buffer), buffer, ".");
xt_strcpy(sizeof(buffer), buffer, xt_last_name_of_path(tab_item->ps_path));
xt_strcat(sizeof(buffer), buffer, xt_last_name_of_path(tab_item->ps_path));
xt_register_ixterr(func, file, line, xt_err, buffer);
}
......@@ -1013,7 +1012,7 @@ static xtBool thr_setup_signals(void)
typedef void *(*ThreadMainFunc)(XTThreadPtr self);
extern "C" void *thr_main_pbxt(void *data)
extern "C" void *xt_thread_main(void *data)
{
ThreadDataPtr td = (ThreadDataPtr) data;
XTThreadPtr self = td->td_thr;
......@@ -1168,6 +1167,14 @@ xtPublic XTThreadPtr xt_init_threading(u_int max_threads)
/* Align the number of threads: */
xt_thr_maximum_threads = xt_align_size(max_threads, XT_XS_LOCK_ALIGN);
#ifdef XT_TRACK_CONNECTIONS
if (xt_thr_maximum_threads > XT_TRACK_MAX_CONNS) {
xt_log_error(XT_NS_CONTEXT, XT_LOG_FATAL, XT_ERR_TOO_MANY_THREADS, 0,
"XT_TRACK_CONNECTIONS is enabled and xt_thr_maximum_threads > XT_TRACK_MAX_CONNS");
goto failed;
}
#endif
#ifdef HANDLE_SIGNALS
if (!thr_setup_signals())
return NULL;
......@@ -1503,10 +1510,10 @@ xtPublic pthread_t xt_run_thread(XTThreadPtr self, XTThreadPtr child, void *(*st
pthread_attr_t attr = { 0, 0, 0 };
attr.priority = THREAD_PRIORITY_NORMAL;
err = pthread_create(&child_thread, &attr, thr_main_pbxt, &data);
err = pthread_create(&child_thread, &attr, xt_thread_main, &data);
}
#else
err = pthread_create(&child_thread, NULL, thr_main_pbxt, &data);
err = pthread_create(&child_thread, NULL, xt_thread_main, &data);
#endif
if (err) {
xt_free_thread(child);
......
......@@ -132,6 +132,7 @@ struct XTSortedList;
struct XTXactLog;
struct XTXactData;
struct XTDatabase;
struct XTOpenTable;
typedef void (*XTThreadFreeFunc)(struct XTThread *self, void *data);
......@@ -307,8 +308,7 @@ typedef struct XTThread {
xtThreadID *st_thread_list;
/* Used to prevent a record from being updated twice in one statement. */
xtBool st_is_update; /* TRUE if this is an UPDATE statement. */
u_int st_update_id; /* The update statement ID. */
struct XTOpenTable *st_is_update; /* TRUE if this is an UPDATE statement. {UPDATE-STACK} */
XTRowLockListRec st_lock_list; /* The thread row lock list (drop locks on transaction end). */
XTStatisticsRec st_statistics; /* Accumulated statistics for this thread. */
......@@ -536,7 +536,10 @@ extern struct XTThread **xt_thr_array;
* Function prototypes
*/
extern "C" void *thr_main_pbxt(void *data);
/* OpenSolaris has thr_main in /usr/include/thread.h (name conflict)
* Thanks for the tip Monty!
*/
extern "C" void *xt_thread_main(void *data);
void xt_get_now(char *buffer, size_t len);
xtBool xt_init_logging(void);
......
......@@ -36,6 +36,7 @@
#ifdef DEBUG
//#define PRINT_TRACE
//#define RESET_AFTER_DUMP
//#define DUMP_TO_STDOUT
#endif
static xtBool trace_initialized = FALSE;
......@@ -109,10 +110,10 @@ xtPublic void xt_print_trace(void)
xt_lock_mutex_ns(&trace_mutex);
if (trace_log_end > trace_log_offset+1) {
trace_log_buffer[trace_log_end] = 0;
fprintf(stderr, "%s", trace_log_buffer + trace_log_offset + 1);
printf("%s", trace_log_buffer + trace_log_offset + 1);
}
trace_log_buffer[trace_log_offset] = 0;
fprintf(stderr, "%s", trace_log_buffer);
printf("%s", trace_log_buffer);
trace_log_offset = 0;
trace_log_end = 0;
xt_unlock_mutex_ns(&trace_mutex);
......@@ -121,9 +122,18 @@ xtPublic void xt_print_trace(void)
xtPublic void xt_dump_trace(void)
{
FILE *fp;
if (trace_log_offset) {
#ifdef DUMP_TO_STDOUT
if (trace_log_end > trace_log_offset+1) {
trace_log_buffer[trace_log_end] = 0;
printf("%s", trace_log_buffer + trace_log_offset + 1);
}
trace_log_buffer[trace_log_offset] = 0;
printf("%s", trace_log_buffer);
printf("\n");
#else
FILE *fp;
fp = fopen("pbxt.log", "w");
xt_lock_mutex_ns(&trace_mutex);
......@@ -136,6 +146,7 @@ xtPublic void xt_dump_trace(void)
fprintf(fp, "%s", trace_log_buffer);
fclose(fp);
}
#endif
#ifdef RESET_AFTER_DUMP
trace_log_offset = 0;
......@@ -379,9 +390,9 @@ xtPublic void xt_dump_conn_tracking(void)
ptr = conn_info;
for (int i=0; i<XT_TRACK_MAX_CONNS; i++) {
if (ptr->ci_curr_xact_id || ptr->ci_prev_xact_id) {
fprintf(stderr, "%3d curr=%d prev=%d prev-time=%ld\n", (int) ptr->cu_t_id, (int) ptr->ci_curr_xact_id, (int) ptr->ci_prev_xact_id, (long) ptr->ci_prev_xact_time);
printf("%3d curr=%d prev=%d prev-time=%ld\n", (int) ptr->cu_t_id, (int) ptr->ci_curr_xact_id, (int) ptr->ci_prev_xact_id, (long) ptr->ci_prev_xact_time);
if (i+1<XT_TRACK_MAX_CONNS) {
fprintf(stderr, " diff=%d\n", (int) (ptr+1)->ci_curr_xact_id - (int) ptr->ci_curr_xact_id);
printf(" diff=%d\n", (int) (ptr+1)->ci_curr_xact_id - (int) ptr->ci_curr_xact_id);
}
}
ptr++;
......
......@@ -51,7 +51,9 @@ void xt_ftracef(char *fmt, ...);
* CONNECTION TRACKING
*/
#ifdef DEBUG
#define XT_TRACK_CONNECTIONS
#endif
#ifdef XT_TRACK_CONNECTIONS
#define XT_TRACK_MAX_CONNS 500
......
......@@ -1326,7 +1326,7 @@ static xtBool xn_end_xact(XTThreadPtr thread, u_int status)
}
/* Write and flush the transaction log: */
if (!xt_xlog_log_data(thread, sizeof(XTXactEndEntryDRec), (XTXactLogBufferDPtr) &entry, TRUE)) {
if (!xt_xlog_log_data(thread, sizeof(XTXactEndEntryDRec), (XTXactLogBufferDPtr) &entry, xt_db_flush_log_at_trx_commit)) {
ok = FALSE;
status = XT_LOG_ENT_ABORT;
/* Make sure this is done, if we failed to log
......@@ -1440,16 +1440,23 @@ static xtBool xn_end_xact(XTThreadPtr thread, u_int status)
/* Don't get too far ahead of the sweeper! */
if (writer) {
#ifdef XT_WAIT_FOR_CLEANUP
xtXactID wait_xn_id;
/* This is the transaction that was committed 3 transactions ago: */
wait_xn_id = thread->st_prev_xact[thread->st_last_xact];
thread->st_prev_xact[thread->st_last_xact] = xn_id;
/* This works because XT_MAX_XACT_BEHIND == 2! */
ASSERT_NS((thread->st_last_xact + 1) % XT_MAX_XACT_BEHIND == (thread->st_last_xact ^ 1));
thread->st_last_xact ^= 1;
while (xt_xn_is_before(db->db_xn_to_clean_id, wait_xn_id) && (db->db_sw_faster & XT_SW_TOO_FAR_BEHIND)) {
xt_critical_wait();
if (db->db_sw_faster & XT_SW_TOO_FAR_BEHIND) {
/* Set a maximum wait time (1/100s) */
xtWord8 then = xt_trace_clock() + (xtWord8) 100000;
xtXactID wait_xn_id;
/* This is the transaction that was committed 3 transactions ago: */
wait_xn_id = thread->st_prev_xact[thread->st_last_xact];
thread->st_prev_xact[thread->st_last_xact] = xn_id;
/* This works because XT_MAX_XACT_BEHIND == 2! */
ASSERT_NS((thread->st_last_xact + 1) % XT_MAX_XACT_BEHIND == (thread->st_last_xact ^ 1));
thread->st_last_xact ^= 1;
while (xt_xn_is_before(db->db_xn_to_clean_id, wait_xn_id) && (db->db_sw_faster & XT_SW_TOO_FAR_BEHIND)) {
if (xt_trace_clock() >= then)
break;
xt_critical_wait();
}
}
#else
if ((db->db_sw_faster & XT_SW_TOO_FAR_BEHIND) != 0) {
......@@ -1486,7 +1493,7 @@ xtPublic xtBool xt_xn_log_tab_id(XTThreadPtr self, xtTableID tab_id)
entry.xt_status_1 = XT_LOG_ENT_NEW_TAB;
entry.xt_checksum_1 = XT_CHECKSUM_1(tab_id);
XT_SET_DISK_4(entry.xt_tab_id_4, tab_id);
return xt_xlog_log_data(self, sizeof(XTXactNewTabEntryDRec), (XTXactLogBufferDPtr) &entry, TRUE);
return xt_xlog_log_data(self, sizeof(XTXactNewTabEntryDRec), (XTXactLogBufferDPtr) &entry, XT_XLOG_WRITE_AND_FLUSH);
}
xtPublic int xt_xn_status(XTOpenTablePtr ot, xtXactID xn_id, xtRecordID XT_UNUSED(rec_id))
......@@ -2266,6 +2273,7 @@ static xtBool xn_sw_cleanup_variation(XTThreadPtr self, XNSweeperStatePtr ss, XT
xt_log_and_clear_exception(self);
break;
}
prev_rec_id = next_rec_id;
next_rec_id = XT_GET_DISK_4(prev_rec_head.tr_prev_rec_id_4);
}
......@@ -2461,7 +2469,7 @@ static xtBool xn_sw_cleanup_xact(XTThreadPtr self, XNSweeperStatePtr ss, XTXactD
cu.xc_checksum_1 = XT_CHECKSUM_1(XT_CHECKSUM4_XACT(xact->xd_start_xn_id));
XT_SET_DISK_4(cu.xc_xact_id_4, xact->xd_start_xn_id);
if (!xt_xlog_log_data(self, sizeof(XTXactCleanupEntryDRec), (XTXactLogBufferDPtr) &cu, FALSE))
if (!xt_xlog_log_data(self, sizeof(XTXactCleanupEntryDRec), (XTXactLogBufferDPtr) &cu, XT_XLOG_NO_WRITE_NO_FLUSH))
return FAILED;
ss->ss_flush_pending = TRUE;
......@@ -2656,7 +2664,9 @@ static void xn_sw_main(XTThreadPtr self)
* we flush the log.
*/
if (now >= idle_start + 2) {
if (!xt_xlog_flush_log(db, self))
/* Don't do this if flusher is active! */
if (!db->db_fl_thread &&
!xt_xlog_flush_log(db, self))
xt_throw(self);
ss->ss_flush_pending = FALSE;
}
......
......@@ -153,14 +153,14 @@ typedef struct XTXactData {
#define XT_XACT_INIT_LOCK(s, i) xt_spinxslock_init_with_autoname(s, i)
#define XT_XACT_FREE_LOCK(s, i) xt_spinxslock_free(s, i)
#define XT_XACT_READ_LOCK(i, s) xt_spinxslock_slock(i)
#define XT_XACT_WRITE_LOCK(i, s) xt_spinxslock_xlock(i, (s)->t_id)
#define XT_XACT_WRITE_LOCK(i, s) xt_spinxslock_xlock(i, FALSE, (s)->t_id)
#define XT_XACT_UNLOCK(i, s, b) xt_spinxslock_unlock(i, b)
#else
#define XT_XACT_LOCK_TYPE XTSkewRWLockRec
#define XT_XACT_INIT_LOCK(s, i) xt_skewrwlock_init_with_autoname(s, i)
#define XT_XACT_FREE_LOCK(s, i) xt_skewrwlock_free(s, i)
#define XT_XACT_READ_LOCK(i, s) xt_skewrwlock_slock(i)
#define XT_XACT_WRITE_LOCK(i, s) xt_skewrwlock_xlock(i, (s)->t_id)
#define XT_XACT_WRITE_LOCK(i, s) xt_skewrwlock_xlock(i, FALSE, (s)->t_id)
#define XT_XACT_UNLOCK(i, s, b) xt_skewrwlock_unlock(i, b)
#endif
......
......@@ -719,14 +719,14 @@ void XTDatabaseLog::xlog_exit(XTThreadPtr self)
}
}
#define WR_NO_SPACE 1
#define WR_FLUSH 2
#define WR_NO_SPACE 1 /* Write because there is no space, or some other reason */
#define WR_FLUSH 2 /* Normal commit, write and flush */
xtBool XTDatabaseLog::xlog_flush(XTThreadPtr thread)
{
if (!xlog_flush_pending())
return OK;
return xlog_append(thread, 0, NULL, 0, NULL, TRUE, NULL, NULL);
return xlog_append(thread, 0, NULL, 0, NULL, XT_XLOG_WRITE_AND_FLUSH, NULL, NULL);
}
xtBool XTDatabaseLog::xlog_flush_pending()
......@@ -754,7 +754,7 @@ xtBool XTDatabaseLog::xlog_flush_pending()
* This function returns the log ID and offset of
* the data write position.
*/
xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *data1, size_t size2, xtWord1 *data2, xtBool commit, xtLogID *log_id, xtLogOffset *log_offset)
xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *data1, size_t size2, xtWord1 *data2, int flush_log_at_trx_commit, xtLogID *log_id, xtLogOffset *log_offset)
{
int write_reason = 0;
xtLogID req_flush_log_id;
......@@ -763,19 +763,20 @@ xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *dat
xtWord8 flush_time;
xtWord2 sum;
/* The first size value must be set, of the second is set! */
ASSERT_NS(size1 || !size2);
if (!size1) {
/* Just flush the buffer... */
xt_lck_slock(&xl_buffer_lock);
write_reason = WR_FLUSH;
write_reason = flush_log_at_trx_commit == XT_XLOG_WRITE_AND_FLUSH ? WR_FLUSH : WR_NO_SPACE;
req_flush_log_id = xl_append_log_id;
req_flush_log_offset = xl_append_log_offset + xl_append_buf_pos;
xt_spinlock_unlock(&xl_buffer_lock);
goto write_log_to_file;
}
else {
req_flush_log_id = 0;
req_flush_log_offset = 0;
}
req_flush_log_id = 0;
req_flush_log_offset = 0;
/*
* This is a dirty read, which will send us to the
......@@ -866,11 +867,17 @@ xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *dat
return OK;
}
}
else {
else if (size1) {
/* It may be that there is now space in the append buffer: */
if (xl_append_buf_pos + size1 + size2 <= xl_size_of_buffers)
goto copy_to_log_buffer;
}
else {
/* We are just writing the buffer! */
ASSERT_NS(write_reason == WR_NO_SPACE);
if (xt_comp_log_pos(req_flush_log_id, req_flush_log_offset, xl_write_log_id, xl_write_log_offset + (xl_write_done ? xl_write_buf_pos : xl_write_buf_pos_start)) <= 0)
return OK;
}
if (xt_trace_clock() >= then) {
xt_lock_mutex_ns(&xl_write_lock);
......@@ -922,12 +929,18 @@ xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *dat
ASSERT_NS(xt_comp_log_pos(xl_write_log_id, xl_write_log_offset, xl_append_log_id, xl_append_log_offset) <= 0);
return OK;
}
goto write_log_to_file;
}
/* It may be that there is now space in the append buffer: */
if (xl_append_buf_pos + size1 + size2 <= xl_size_of_buffers)
goto copy_to_log_buffer;
else if (size1) {
/* It may be that there is now space in the append buffer: */
if (xl_append_buf_pos + size1 + size2 <= xl_size_of_buffers)
goto copy_to_log_buffer;
}
else {
/* We are just writing the buffer! */
ASSERT_NS(write_reason == WR_NO_SPACE);
if (xt_comp_log_pos(req_flush_log_id, req_flush_log_offset, xl_write_log_id, xl_write_log_offset + (xl_write_done ? xl_write_buf_pos : xl_write_buf_pos_start)) <= 0)
return OK;
}
goto write_log_to_file;
}
......@@ -952,7 +965,7 @@ xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *dat
return OK;
}
/* Not flushed, but what about written? */
if (xt_comp_log_pos(req_flush_log_id, req_flush_log_offset, xl_write_log_id, xl_write_log_offset + (xl_write_done ? xl_write_buf_pos : 0)) <= 0) {
if (xt_comp_log_pos(req_flush_log_id, req_flush_log_offset, xl_write_log_id, xl_write_log_offset + (xl_write_done ? xl_write_buf_pos : xl_write_buf_pos_start)) <= 0) {
/* The write position is after or equal to the required flush
* position. This means that all we have to do is flush
* to satisfy the writers condition.
......@@ -960,7 +973,7 @@ xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *dat
xtBool ok = TRUE;
if (xl_log_id != xl_write_log_id)
ok = xlog_open_log(xl_write_log_id, xl_write_log_offset + (xl_write_done ? xl_write_buf_pos : 0), thread);
ok = xlog_open_log(xl_write_log_id, xl_write_log_offset + (xl_write_done ? xl_write_buf_pos : xl_write_buf_pos_start), thread);
if (ok) {
if (xl_db->db_co_busy) {
......@@ -979,7 +992,7 @@ xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *dat
xt_lock_mutex_ns(&xl_db->db_wr_lock);
xl_flush_log_id = xl_write_log_id;
xl_flush_log_offset = xl_write_log_offset + (xl_write_done ? xl_write_buf_pos : 0);
xl_flush_log_offset = xl_write_log_offset + (xl_write_done ? xl_write_buf_pos : xl_write_buf_pos_start);
/*
* We have written data to the log, wake the writer to commit
* the data to the database.
......@@ -1000,8 +1013,11 @@ xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *dat
return ok;
}
}
else {
/* If there is space in the buffer, then we can go on
else if (size1) {
/* If the amounf of data to be written is 0, then we are just required
* to write the transaction buffer.
*
* If there is space in the buffer, then we can go on
* to copy our data into the buffer:
*/
if (xl_append_buf_pos + size1 + size2 <= xl_size_of_buffers) {
......@@ -1016,6 +1032,21 @@ xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *dat
goto copy_to_log_buffer;
}
}
else {
/* We are just writing the buffer! */
ASSERT_NS(write_reason == WR_NO_SPACE);
if (xt_comp_log_pos(req_flush_log_id, req_flush_log_offset, xl_write_log_id, xl_write_log_offset + (xl_write_done ? xl_write_buf_pos : xl_write_buf_pos_start)) <= 0) {
#ifdef XT_XLOG_WAIT_SPINS
xt_writing = 0;
if (xt_waiting)
xt_cond_wakeall(&xl_write_cond);
#else
xt_writing = FALSE;
xt_cond_wakeall(&xl_write_cond);
#endif
return OK;
}
}
rewrite:
/* If the current write buffer has been written, then
......@@ -1109,7 +1140,8 @@ xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *dat
part_size = 512 - part_size;
xl_write_buffer[xl_write_buf_pos] = XT_LOG_ENT_END_OF_LOG;
#ifdef HAVE_valgrind
memset(xl_write_buffer + xl_write_buf_pos + 1, 0x66, part_size);
if (part_size > 1)
memset(xl_write_buffer + xl_write_buf_pos + 1, 0x66, part_size - 1);
#endif
if (!xt_pwrite_file(xl_log_file, xl_write_log_offset, xl_write_buf_pos+part_size, xl_write_buffer, &thread->st_statistics.st_xlog, thread))
goto write_failed;
......@@ -1197,9 +1229,13 @@ xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *dat
xt_writing = FALSE;
xt_cond_wakeall(&xl_write_cond);
#endif
if (size1 == 0)
return OK;
}
copy_to_log_buffer:
ASSERT_NS(size1);
xt_spinlock_lock(&xl_buffer_lock);
/* Now we have to check again. The check above was a dirty read!
*/
......@@ -1291,11 +1327,14 @@ xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *dat
if (log_offset)
*log_offset = xl_append_log_offset + xl_append_buf_pos;
xl_append_buf_pos += size1 + size2;
if (commit) {
write_reason = WR_FLUSH;
if (flush_log_at_trx_commit != XT_XLOG_NO_WRITE_NO_FLUSH) {
write_reason = flush_log_at_trx_commit == XT_XLOG_WRITE_AND_FLUSH ? WR_FLUSH : WR_NO_SPACE;
req_flush_log_id = xl_append_log_id;
req_flush_log_offset = xl_append_log_offset + xl_append_buf_pos;
xt_spinlock_unlock(&xl_buffer_lock);
/* We have written the data already! */
size1 = 0;
size2 = 0;
goto write_log_to_file;
}
......@@ -1485,9 +1524,9 @@ xtPublic xtBool xt_xlog_flush_log(struct XTDatabase *db, XTThreadPtr thread)
return db->db_xlog.xlog_flush(thread);
}
xtPublic xtBool xt_xlog_log_data(XTThreadPtr thread, size_t size, XTXactLogBufferDPtr log_entry, xtBool commit)
xtPublic xtBool xt_xlog_log_data(XTThreadPtr thread, size_t size, XTXactLogBufferDPtr log_entry, int flush_log_at_trx_commit)
{
return thread->st_database->db_xlog.xlog_append(thread, size, (xtWord1 *) log_entry, 0, NULL, commit, NULL, NULL);
return thread->st_database->db_xlog.xlog_append(thread, size, (xtWord1 *) log_entry, 0, NULL, flush_log_at_trx_commit, NULL, NULL);
}
/* Allocate a record from the free list. */
......@@ -1498,7 +1537,7 @@ xtPublic xtBool xt_xlog_modify_table(xtTableID tab_id, u_int status, xtOpSeqNo o
xtWord4 sum = 0;
int check_size = 1;
XTXactDataPtr xact = NULL;
xtBool commit = FALSE;
int flush_log_at_trx_commit = XT_XLOG_NO_WRITE_NO_FLUSH;
switch (status) {
case XT_LOG_ENT_REC_MODIFIED:
......@@ -1613,7 +1652,7 @@ xtPublic xtBool xt_xlog_modify_table(xtTableID tab_id, u_int status, xtOpSeqNo o
XT_SET_DISK_4(log_entry.xp.xp_xact_id_4, op_seq);
log_entry.xp.xp_xa_len_1 = (xtWord1) size;
len = offsetof(XTXactPrepareEntryDRec, xp_xa_data);
commit = TRUE;
flush_log_at_trx_commit = xt_db_flush_log_at_trx_commit;
break;
default:
ASSERT_NS(FALSE);
......@@ -1652,9 +1691,9 @@ xtPublic xtBool xt_xlog_modify_table(xtTableID tab_id, u_int status, xtOpSeqNo o
xt_print_log_record(0, 0, &log_entry);
#endif
if (xact)
return thread->st_database->db_xlog.xlog_append(thread, len, (xtWord1 *) &log_entry, size, data, commit, &xact->xd_begin_log, &xact->xd_begin_offset);
return thread->st_database->db_xlog.xlog_append(thread, len, (xtWord1 *) &log_entry, size, data, flush_log_at_trx_commit, &xact->xd_begin_log, &xact->xd_begin_offset);
return thread->st_database->db_xlog.xlog_append(thread, len, (xtWord1 *) &log_entry, size, data, commit, NULL, NULL);
return thread->st_database->db_xlog.xlog_append(thread, len, (xtWord1 *) &log_entry, size, data, flush_log_at_trx_commit, NULL, NULL);
}
/*
......
......@@ -75,6 +75,10 @@ struct XTDatabase;
#define XT_DELETE_LOGS 1
#define XT_KEEP_LOGS 2
#define XT_XLOG_NO_WRITE_NO_FLUSH 0
#define XT_XLOG_WRITE_AND_FLUSH 1
#define XT_XLOG_WRITE_AND_NO_FLUSH 2
/* LOG CACHE ---------------------------------------------------- */
typedef struct XTXLogBlock {
......@@ -443,7 +447,7 @@ typedef struct XTDatabaseLog {
void xlog_name(size_t size, char *path, xtLogID log_id);
int xlog_delete_log(xtLogID del_log_id, struct XTThread *thread);
xtBool xlog_append(struct XTThread *thread, size_t size1, xtWord1 *data1, size_t size2, xtWord1 *data2, xtBool commit, xtLogID *log_id, xtLogOffset *log_offset);
xtBool xlog_append(struct XTThread *thread, size_t size1, xtWord1 *data1, size_t size2, xtWord1 *data2, int flush_log_at_trx_commit, xtLogID *log_id, xtLogOffset *log_offset);
xtBool xlog_flush(struct XTThread *thread);
xtBool xlog_flush_pending();
......@@ -464,7 +468,7 @@ typedef struct XTDatabaseLog {
} XTDatabaseLogRec, *XTDatabaseLogPtr;
xtBool xt_xlog_flush_log(struct XTDatabase *db, struct XTThread *thread);
xtBool xt_xlog_log_data(struct XTThread *thread, size_t len, XTXactLogBufferDPtr log_entry, xtBool commit);
xtBool xt_xlog_log_data(struct XTThread *thread, size_t len, XTXactLogBufferDPtr log_entry, int flush_log_at_trx_commit);
xtBool xt_xlog_modify_table(xtTableID tab_id, u_int status, xtOpSeqNo op_seq, xtRecordID free_list, xtRecordID address, size_t size, xtWord1 *data, struct XTThread *thread);
void xt_xlog_init(struct XTThread *self, size_t cache_size);
......
......@@ -221,7 +221,7 @@ typedef struct XTPathStr {
*/
#ifdef DEBUG
//#define XT_USE_GLOBAL_DEBUG_SIZES
#define XT_USE_GLOBAL_DEBUG_SIZES
#endif
/*
......@@ -392,6 +392,11 @@ typedef struct XTPathStr {
//#define XT_NO_ATOMICS
#endif
/* When pbxt_flush_log_at_trx_commit != 1, the transaction log is flushed
* at regular intervals. Set the interval here.
*/
#define XT_XLOG_FLUSH_FREQ 1000
/* ----------------------------------------------------------------------
* GLOBAL CONSTANTS
*/
......@@ -457,21 +462,24 @@ typedef struct XTPathStr {
#ifdef XT_USE_GLOBAL_DEBUG_SIZES
//#undef XT_ROW_RWLOCKS
//#define XT_ROW_RWLOCKS 2
//#define XT_ROW_RWLOCKS 2
//#undef XT_TAB_MIN_VAR_REC_LENGTH
//#define XT_TAB_MIN_VAR_REC_LENGTH 20
//#define XT_TAB_MIN_VAR_REC_LENGTH 20
//#undef XT_ROW_LOCK_COUNT
//#define XT_ROW_LOCK_COUNT (XT_ROW_RWLOCKS * 2)
//#define XT_ROW_LOCK_COUNT (XT_ROW_RWLOCKS * 2)
//#undef XT_INDEX_PAGE_SHIFTS
//#define XT_INDEX_PAGE_SHIFTS 8 // 256
//#define XT_INDEX_PAGE_SHIFTS 8 // 256
//#undef XT_BLOCK_SIZE_FOR_DIRECT_IO
//#define XT_BLOCK_SIZE_FOR_DIRECT_IO 256
//#define XT_BLOCK_SIZE_FOR_DIRECT_IO 256
//#undef XT_INDEX_WRITE_BUFFER_SIZE
//#define XT_INDEX_WRITE_BUFFER_SIZE (40 * 1024)
//#define XT_INDEX_WRITE_BUFFER_SIZE (40 * 1024)
//#undef XT_XLOG_FLUSH_FREQ
//#define XT_XLOG_FLUSH_FREQ (30 * 1000)
#endif
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment