Commit 38afbaa4 authored by tomas@poseidon.ndb.mysql.com's avatar tomas@poseidon.ndb.mysql.com

Merge tulin@bk-internal.mysql.com:/home/bk/mysql-4.1-ndb

into poseidon.ndb.mysql.com:/home/tomas/mysql-4.1-ndb-merge
parents cede8318 b05d1a3a
......@@ -1811,7 +1811,7 @@ sub fix_image
{
my($text) = @_;
my($arg1, $ext);
$text =~ /^([^,]*)$/;
$text =~ /^([^,]*)/;
die "error in image: '$text'" unless defined($1);
$arg1 = $1;
$arg1 =~ s/@@/@/g;
......
......@@ -365,6 +365,11 @@ uint my_instr_mb(struct charset_info_st *,
const char *s, uint s_length,
my_match_t *match, uint nmatch);
int my_wildcmp_unicode(CHARSET_INFO *cs,
const char *str, const char *str_end,
const char *wildstr, const char *wildend,
int escape, int w_one, int w_many,
MY_UNICASE_INFO **weights);
extern my_bool my_parse_charset_xml(const char *bug, uint len,
int (*add)(CHARSET_INFO *cs));
......
......@@ -25,7 +25,6 @@ extern "C" {
MYSQL_FIELD *unpack_fields(MYSQL_DATA *data,MEM_ROOT *alloc,uint fields,
my_bool default_value, uint server_capabilities);
void free_rows(MYSQL_DATA *cur);
my_bool mysql_autenticate(MYSQL *mysql, const char *passwd);
void free_old_query(MYSQL *mysql);
void end_server(MYSQL *mysql);
my_bool mysql_reconnect(MYSQL *mysql);
......
......@@ -2244,7 +2244,7 @@ dict_foreign_add_to_cache(
Scans from pointer onwards. Stops if is at the start of a copy of
'string' where characters are compared without case sensitivity. Stops
also at '\0'. */
static
const char*
dict_scan_to(
/*=========*/
......
......@@ -891,6 +891,18 @@ dict_tables_have_same_db(
const char* name2); /* in: table name in the form
dbname '/' tablename */
/*************************************************************************
Scans from pointer onwards. Stops if is at the start of a copy of
'string' where characters are compared without case sensitivity. Stops
also at '\0'. */
const char*
dict_scan_to(
/*=========*/
/* out: scanned up to this */
const char* ptr, /* in: scan from */
const char* string);/* in: look for this */
/* Buffers for storing detailed information about the latest foreign key
and unique key errors */
extern FILE* dict_foreign_err_file;
......
......@@ -50,6 +50,15 @@ innobase_invalidate_query_cache(
ulint full_name_len); /* in: full name length where also the null
chars count */
/**********************************************************************
This function returns true if SQL-query in the current thread
is either REPLACE or LOAD DATA INFILE REPLACE.
NOTE that /mysql/innobase/row/row0ins.c must contain the
prototype for this function ! */
ibool
innobase_query_is_replace(void);
/*===========================*/
/*************************************************************************
Creates an insert node struct. */
......@@ -1482,8 +1491,8 @@ row_ins_scan_sec_index_for_duplicate(
ulint err = DB_SUCCESS;
ibool moved;
mtr_t mtr;
trx_t *trx;
ibool success;
trx_t* trx;
const char* ptr;
n_unique = dict_index_get_n_unique(index);
......@@ -1523,9 +1532,8 @@ row_ins_scan_sec_index_for_duplicate(
trx = thr_get_trx(thr);
ut_ad(trx);
dict_accept(*trx->mysql_query_str, "REPLACE", &success);
if (success) {
if (innobase_query_is_replace()) {
/* The manual defines the REPLACE semantics that it
is either an INSERT or DELETE(s) for duplicate key
......@@ -1605,7 +1613,7 @@ row_ins_duplicate_error_in_clust(
page_t* page;
ulint n_unique;
trx_t* trx = thr_get_trx(thr);
ibool success;
const char* ptr;
UT_NOT_USED(mtr);
......@@ -1639,10 +1647,7 @@ row_ins_duplicate_error_in_clust(
sure that in roll-forward we get the same duplicate
errors as in original execution */
dict_accept(*trx->mysql_query_str, "REPLACE",
&success);
if (success) {
if (innobase_query_is_replace()) {
/* The manual defines the REPLACE semantics
that it is either an INSERT or DELETE(s)
......@@ -1683,15 +1688,9 @@ row_ins_duplicate_error_in_clust(
/* The manual defines the REPLACE semantics that it
is either an INSERT or DELETE(s) for duplicate key
+ INSERT. Therefore, we should take X-lock for
duplicates.
*/
/* Is the first word in MySQL query REPLACE ? */
dict_accept(*trx->mysql_query_str, "REPLACE",
&success);
duplicates. */
if (success) {
if (innobase_query_is_replace()) {
err = row_ins_set_exclusive_rec_lock(
LOCK_REC_NOT_GAP,
......
......@@ -2794,7 +2794,7 @@ row_search_for_mysql(
rec_t* index_rec;
rec_t* clust_rec;
rec_t* old_vers;
ulint err;
ulint err = DB_SUCCESS;
ibool moved;
ibool cons_read_requires_clust_rec;
ibool was_lock_wait;
......@@ -3206,16 +3206,10 @@ row_search_for_mysql(
/* Try to place a lock on the index record */
/* If innodb_locks_unsafe_for_binlog option is used,
we lock only the record, i.e. next-key locking is
not used.
*/
if ( srv_locks_unsafe_for_binlog )
{
err = sel_set_rec_lock(rec, index,
prebuilt->select_lock_type,
LOCK_REC_NOT_GAP, thr);
}
else
we do not lock gaps. Supremum record is really
a gap and therefore we do not set locks there. */
if ( srv_locks_unsafe_for_binlog == FALSE )
{
err = sel_set_rec_lock(rec, index,
prebuilt->select_lock_type,
......
......@@ -237,7 +237,7 @@ int STDCALL mysql_manager_fetch_line(MYSQL_MANAGER* con, char* res_buf,
char* res_buf_end=res_buf+res_buf_size;
char* net_buf=(char*) con->net.read_pos, *net_buf_end;
int res_buf_shift=RES_BUF_SHIFT;
uint num_bytes;
ulong num_bytes;
if (res_buf_size<RES_BUF_SHIFT)
{
......
......@@ -63,6 +63,15 @@ select 'A' like 'a' collate utf8_bin;
select _utf8 0xD0B0D0B1D0B2 like concat(_utf8'%',_utf8 0xD0B1,_utf8 '%');
_utf8 0xD0B0D0B1D0B2 like concat(_utf8'%',_utf8 0xD0B1,_utf8 '%')
1
select convert(_latin1'Gnter Andr' using utf8) like CONVERT(_latin1'GNTER%' USING utf8);
convert(_latin1'Gnter Andr' using utf8) like CONVERT(_latin1'GNTER%' USING utf8)
1
select CONVERT(_koi8r'' USING utf8) LIKE CONVERT(_koi8r'' USING utf8);
CONVERT(_koi8r'' USING utf8) LIKE CONVERT(_koi8r'' USING utf8)
1
select CONVERT(_koi8r'' USING utf8) LIKE CONVERT(_koi8r'' USING utf8);
CONVERT(_koi8r'' USING utf8) LIKE CONVERT(_koi8r'' USING utf8)
1
SELECT 'a' = 'a ';
'a' = 'a '
1
......
use test;
drop table if exists test_select;
Warnings:
Note 1051 Unknown table 'test_select'
CREATE TABLE test_select(session_id char(9) NOT NULL);
INSERT INTO test_select VALUES ("abc");
SELECT * FROM test_select;
drop table if exists t1, t2;
CREATE TABLE t1(session_id char(9) NOT NULL);
INSERT INTO t1 VALUES ("abc");
SELECT * FROM t1;
session_id
abc
prepare st_1180 from 'SELECT * FROM test_select WHERE ?="1111" and session_id = "abc"';
prepare st_1180 from 'SELECT * FROM t1 WHERE ?="1111" and session_id = "abc"';
set @arg1= 'abc';
execute st_1180 using @arg1;
session_id
......@@ -18,4 +15,104 @@ abc
set @arg1= 'abc';
execute st_1180 using @arg1;
session_id
drop table test_select;
drop table t1;
create table t1 (
c_01 char(6), c_02 integer, c_03 real, c_04 int(3), c_05 varchar(20),
c_06 date, c_07 char(1), c_08 real, c_09 int(11), c_10 time,
c_11 char(6), c_12 integer, c_13 real, c_14 int(3), c_15 varchar(20),
c_16 date, c_17 char(1), c_18 real, c_19 int(11), c_20 text);
prepare st_1644 from 'insert into t1 values ( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)';
set @arg01= 'row_1';
set @arg02= 1;
set @arg03= 1.1;
set @arg04= 111;
set @arg05= 'row_one';
set @arg06= '2004-10-12';
set @arg07= '1';
set @arg08= 1.1;
set @arg09= '100100100';
set @arg10= '12:34:56';
set @arg11= 'row_1';
set @arg12= 1;
set @arg13= 1.1;
set @arg14= 111;
set @arg15= 'row_one';
set @arg16= '2004-10-12';
set @arg17= '1';
set @arg18= 1.1;
set @arg19= '100100100';
set @arg20= '12:34:56';
execute st_1644 using @arg01, @arg02, @arg03, @arg04, @arg05, @arg06, @arg07, @arg08, @arg09, @arg10,
@arg11, @arg12, @arg13, @arg14, @arg15, @arg16, @arg17, @arg18, @arg19, @arg20;
set @arg01= NULL;
set @arg02= NULL;
set @arg03= NULL;
set @arg04= NULL;
set @arg05= NULL;
set @arg06= NULL;
set @arg07= NULL;
set @arg08= NULL;
set @arg09= NULL;
set @arg10= NULL;
set @arg11= NULL;
set @arg12= NULL;
set @arg13= NULL;
set @arg14= NULL;
set @arg15= NULL;
set @arg16= NULL;
set @arg17= NULL;
set @arg18= NULL;
set @arg19= NULL;
set @arg20= NULL;
execute st_1644 using @arg01, @arg02, @arg03, @arg04, @arg05, @arg06, @arg07, @arg08, @arg09, @arg10,
@arg11, @arg12, @arg13, @arg14, @arg15, @arg16, @arg17, @arg18, @arg19, @arg20;
set @arg01= 'row_3';
set @arg02= 3;
set @arg03= 3.3;
set @arg04= 333;
set @arg05= 'row_three';
set @arg06= '2004-10-12';
set @arg07= '3';
set @arg08= 3.3;
set @arg09= '300300300';
set @arg10= '12:34:56';
set @arg11= 'row_3';
set @arg12= 3;
set @arg13= 3.3;
set @arg14= 333;
set @arg15= 'row_three';
set @arg16= '2004-10-12';
set @arg17= '3';
set @arg18= 3.3;
set @arg19= '300300300';
set @arg20= '12:34:56';
execute st_1644 using @arg01, @arg02, @arg03, @arg04, @arg05, @arg06, @arg07, @arg08, @arg09, @arg10,
@arg11, @arg12, @arg13, @arg14, @arg15, @arg16, @arg17, @arg18, @arg19, @arg20;
select * from t1;
c_01 c_02 c_03 c_04 c_05 c_06 c_07 c_08 c_09 c_10 c_11 c_12 c_13 c_14 c_15 c_16 c_17 c_18 c_19 c_20
row_1 1 1.1 111 row_one 2004-10-12 1 1.1 100100100 12:34:56 row_1 1 1.1 111 row_one 2004-10-12 1 1.1 100100100 12:34:56
NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL
row_3 3 3.3 333 row_three 2004-10-12 3 3.3 300300300 12:34:56 row_3 3 3.3 333 row_three 2004-10-12 3 3.3 300300300 12:34:56
drop table t1;
create table t1(
cola varchar(50) not null,
colb varchar(8) not null,
colc varchar(12) not null,
cold varchar(2) not null,
primary key (cola, colb, cold));
create table t2(
cola varchar(50) not null,
colb varchar(8) not null,
colc varchar(2) not null,
cold float,
primary key (cold));
insert into t1 values ('aaaa', 'yyyy', 'yyyy-dd-mm', 'R');
insert into t2 values ('aaaa', 'yyyy', 'R', 203), ('bbbb', 'zzzz', 'C', 201);
prepare st_1676 from 'select a.cola, a.colb, a.cold from t1 a, t2 b where a.cola = ? and a.colb = ? and a.cold = ? and b.cola = a.cola and b.colb = a.colb and b.colc = a.cold';
set @arg0= "aaaa";
set @arg1= "yyyy";
set @arg2= "R";
execute st_1676 using @arg0, @arg1, @arg2;
cola colb cold
aaaa yyyy R
drop table t1, t2;
......@@ -33,6 +33,14 @@ select 'A' like 'a';
select 'A' like 'a' collate utf8_bin;
select _utf8 0xD0B0D0B1D0B2 like concat(_utf8'%',_utf8 0xD0B1,_utf8 '%');
# Bug #6040: can't retrieve records with umlaut
# characters in case insensitive manner.
# Case insensitive search LIKE comparison
# was broken for multibyte characters:
select convert(_latin1'Gnter Andr' using utf8) like CONVERT(_latin1'GNTER%' USING utf8);
select CONVERT(_koi8r'' USING utf8) LIKE CONVERT(_koi8r'' USING utf8);
select CONVERT(_koi8r'' USING utf8) LIKE CONVERT(_koi8r'' USING utf8);
#
# Check the following:
# "a" == "a "
......@@ -568,6 +576,7 @@ DROP TABLE t1;
#
# Bug #5723: length(<varchar utf8 field>) returns varying results
#
--disable_warnings
SET NAMES utf8;
--disable_warnings
CREATE TABLE t1 (
......
......@@ -3,19 +3,25 @@
# Prepared Statements #
# re-testing bug DB entries #
# #
# The bugs are reported as "closed". #
# Command sequences taken from bug report. #
# No other test contains the bug# as comment. #
# #
# Tests drop/create tables 't1', 't2', ... #
# #
###############################################
use test;
--disable_warnings
drop table if exists t1, t2;
--enable_warnings
# bug#1180: optimized away part of WHERE clause cause incorect prepared satatement results
drop table if exists test_select;
CREATE TABLE t1(session_id char(9) NOT NULL);
INSERT INTO t1 VALUES ("abc");
SELECT * FROM t1;
CREATE TABLE test_select(session_id char(9) NOT NULL);
INSERT INTO test_select VALUES ("abc");
SELECT * FROM test_select;
prepare st_1180 from 'SELECT * FROM test_select WHERE ?="1111" and session_id = "abc"';
prepare st_1180 from 'SELECT * FROM t1 WHERE ?="1111" and session_id = "abc"';
# Must not find a row
set @arg1= 'abc';
......@@ -29,4 +35,97 @@ execute st_1180 using @arg1;
set @arg1= 'abc';
execute st_1180 using @arg1;
drop table test_select;
drop table t1;
# end of bug#1180
# bug#1644: Insertion of more than 3 NULL columns with parameter binding fails
# Using prepared statements, insertion of more than three columns with NULL
# values fails to insert additional NULLS. After the third column NULLS will
# be inserted into the database as zeros.
# First insert four columns of a value (i.e. 22) to verify binding is working
# correctly. Then Bind to each columns bind parameter an is_null value of 1.
# Then insert four more columns of integers, just for sanity.
# A subsequent select on the server will result in this:
# mysql> select * from foo_dfr;
# +------+------+------+------+
# | col1 | col2 | col3 | col4 |
# +------+------+------+------+
# | 22 | 22 | 22 | 22 |
# | NULL | NULL | NULL | 0 |
# | 88 | 88 | 88 | 88 |
# +------+------+------+------+
# Test is extended to more columns - code stores bit vector in bytes.
create table t1 (
c_01 char(6), c_02 integer, c_03 real, c_04 int(3), c_05 varchar(20),
c_06 date, c_07 char(1), c_08 real, c_09 int(11), c_10 time,
c_11 char(6), c_12 integer, c_13 real, c_14 int(3), c_15 varchar(20),
c_16 date, c_17 char(1), c_18 real, c_19 int(11), c_20 text);
# Do not use "timestamp" type, because it has a non-NULL default as of 4.1.2
prepare st_1644 from 'insert into t1 values ( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)';
set @arg01= 'row_1'; set @arg02= 1; set @arg03= 1.1; set @arg04= 111; set @arg05= 'row_one';
set @arg06= '2004-10-12'; set @arg07= '1'; set @arg08= 1.1; set @arg09= '100100100'; set @arg10= '12:34:56';
set @arg11= 'row_1'; set @arg12= 1; set @arg13= 1.1; set @arg14= 111; set @arg15= 'row_one';
set @arg16= '2004-10-12'; set @arg17= '1'; set @arg18= 1.1; set @arg19= '100100100'; set @arg20= '12:34:56';
execute st_1644 using @arg01, @arg02, @arg03, @arg04, @arg05, @arg06, @arg07, @arg08, @arg09, @arg10,
@arg11, @arg12, @arg13, @arg14, @arg15, @arg16, @arg17, @arg18, @arg19, @arg20;
set @arg01= NULL; set @arg02= NULL; set @arg03= NULL; set @arg04= NULL; set @arg05= NULL;
set @arg06= NULL; set @arg07= NULL; set @arg08= NULL; set @arg09= NULL; set @arg10= NULL;
set @arg11= NULL; set @arg12= NULL; set @arg13= NULL; set @arg14= NULL; set @arg15= NULL;
set @arg16= NULL; set @arg17= NULL; set @arg18= NULL; set @arg19= NULL; set @arg20= NULL;
execute st_1644 using @arg01, @arg02, @arg03, @arg04, @arg05, @arg06, @arg07, @arg08, @arg09, @arg10,
@arg11, @arg12, @arg13, @arg14, @arg15, @arg16, @arg17, @arg18, @arg19, @arg20;
set @arg01= 'row_3'; set @arg02= 3; set @arg03= 3.3; set @arg04= 333; set @arg05= 'row_three';
set @arg06= '2004-10-12'; set @arg07= '3'; set @arg08= 3.3; set @arg09= '300300300'; set @arg10= '12:34:56';
set @arg11= 'row_3'; set @arg12= 3; set @arg13= 3.3; set @arg14= 333; set @arg15= 'row_three';
set @arg16= '2004-10-12'; set @arg17= '3'; set @arg18= 3.3; set @arg19= '300300300'; set @arg20= '12:34:56';
execute st_1644 using @arg01, @arg02, @arg03, @arg04, @arg05, @arg06, @arg07, @arg08, @arg09, @arg10,
@arg11, @arg12, @arg13, @arg14, @arg15, @arg16, @arg17, @arg18, @arg19, @arg20;
select * from t1;
drop table t1;
# end of bug#1644
# bug#1677: Prepared statement two-table join returns no rows when one is expected
create table t1(
cola varchar(50) not null,
colb varchar(8) not null,
colc varchar(12) not null,
cold varchar(2) not null,
primary key (cola, colb, cold));
create table t2(
cola varchar(50) not null,
colb varchar(8) not null,
colc varchar(2) not null,
cold float,
primary key (cold));
insert into t1 values ('aaaa', 'yyyy', 'yyyy-dd-mm', 'R');
insert into t2 values ('aaaa', 'yyyy', 'R', 203), ('bbbb', 'zzzz', 'C', 201);
prepare st_1676 from 'select a.cola, a.colb, a.cold from t1 a, t2 b where a.cola = ? and a.colb = ? and a.cold = ? and b.cola = a.cola and b.colb = a.colb and b.colc = a.cold';
set @arg0= "aaaa";
set @arg1= "yyyy";
set @arg2= "R";
execute st_1676 using @arg0, @arg1, @arg2;
drop table t1, t2;
# end of bug#1676
......@@ -110,7 +110,7 @@
*/
#define MAX_TTREE_NODE_SIZE 64 // total words in node
#define MAX_TTREE_PREF_SIZE 4 // words in min prefix
#define MAX_TTREE_NODE_SLACK 3 // diff between max and min occupancy
#define MAX_TTREE_NODE_SLACK 2 // diff between max and min occupancy
/*
* Blobs.
......
......@@ -32,7 +32,6 @@
// signal classes
#include <signaldata/DictTabInfo.hpp>
#include <signaldata/TuxContinueB.hpp>
#include <signaldata/BuildIndx.hpp>
#include <signaldata/TupFrag.hpp>
#include <signaldata/AlterIndx.hpp>
#include <signaldata/DropTab.hpp>
......@@ -478,7 +477,7 @@ private:
Uint16 m_numAttrs;
bool m_storeNullKey;
TreeHead m_tree;
TupLoc m_freeLoc; // one node pre-allocated for insert
TupLoc m_freeLoc; // list of free index nodes
DLList<ScanOp> m_scanList; // current scans on this fragment
Uint32 m_tupIndexFragPtrI;
Uint32 m_tupTableFragPtrI[2];
......@@ -582,17 +581,24 @@ private:
* DbtuxNode.cpp
*/
int allocNode(Signal* signal, NodeHandle& node);
void selectNode(Signal* signal, NodeHandle& node, TupLoc loc);
void insertNode(Signal* signal, NodeHandle& node);
void deleteNode(Signal* signal, NodeHandle& node);
void setNodePref(Signal* signal, NodeHandle& node);
void selectNode(NodeHandle& node, TupLoc loc);
void insertNode(NodeHandle& node);
void deleteNode(NodeHandle& node);
void setNodePref(NodeHandle& node);
// node operations
void nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ent);
void nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent);
void nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent);
void nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent);
void nodeSlide(Signal* signal, NodeHandle& dstNode, NodeHandle& srcNode, unsigned i);
void nodePushUp(NodeHandle& node, unsigned pos, const TreeEnt& ent, Uint32 scanList);
void nodePushUpScans(NodeHandle& node, unsigned pos);
void nodePopDown(NodeHandle& node, unsigned pos, TreeEnt& en, Uint32* scanList);
void nodePopDownScans(NodeHandle& node, unsigned pos);
void nodePushDown(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32& scanList);
void nodePushDownScans(NodeHandle& node, unsigned pos);
void nodePopUp(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32 scanList);
void nodePopUpScans(NodeHandle& node, unsigned pos);
void nodeSlide(NodeHandle& dstNode, NodeHandle& srcNode, unsigned cnt, unsigned i);
// scans linked to node
void addScanList(NodeHandle& node, unsigned pos, Uint32 scanList);
void removeScanList(NodeHandle& node, unsigned pos, Uint32& scanList);
void moveScanList(NodeHandle& node, unsigned pos);
void linkScan(NodeHandle& node, ScanOpPtr scanPtr);
void unlinkScan(NodeHandle& node, ScanOpPtr scanPtr);
bool islinkScan(NodeHandle& node, ScanOpPtr scanPtr);
......@@ -600,10 +606,21 @@ private:
/*
* DbtuxTree.cpp
*/
void treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent);
void treeRemove(Signal* signal, Frag& frag, TreePos treePos);
void treeRotateSingle(Signal* signal, Frag& frag, NodeHandle& node, unsigned i);
void treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i);
// add entry
void treeAdd(Frag& frag, TreePos treePos, TreeEnt ent);
void treeAddFull(Frag& frag, NodeHandle lubNode, unsigned pos, TreeEnt ent);
void treeAddNode(Frag& frag, NodeHandle lubNode, unsigned pos, TreeEnt ent, NodeHandle parentNode, unsigned i);
void treeAddRebalance(Frag& frag, NodeHandle node, unsigned i);
// remove entry
void treeRemove(Frag& frag, TreePos treePos);
void treeRemoveInner(Frag& frag, NodeHandle lubNode, unsigned pos);
void treeRemoveSemi(Frag& frag, NodeHandle node, unsigned i);
void treeRemoveLeaf(Frag& frag, NodeHandle node);
void treeRemoveNode(Frag& frag, NodeHandle node);
void treeRemoveRebalance(Frag& frag, NodeHandle node, unsigned i);
// rotate
void treeRotateSingle(Frag& frag, NodeHandle& node, unsigned i);
void treeRotateDouble(Frag& frag, NodeHandle& node, unsigned i);
/*
* DbtuxScan.cpp
......@@ -615,9 +632,9 @@ private:
void execACCKEYCONF(Signal* signal);
void execACCKEYREF(Signal* signal);
void execACC_ABORTCONF(Signal* signal);
void scanFirst(Signal* signal, ScanOpPtr scanPtr);
void scanNext(Signal* signal, ScanOpPtr scanPtr);
bool scanVisible(Signal* signal, ScanOpPtr scanPtr, TreeEnt ent);
void scanFirst(ScanOpPtr scanPtr);
void scanNext(ScanOpPtr scanPtr);
bool scanVisible(ScanOpPtr scanPtr, TreeEnt ent);
void scanClose(Signal* signal, ScanOpPtr scanPtr);
void addAccLockOp(ScanOp& scan, Uint32 accLockOp);
void removeAccLockOp(ScanOp& scan, Uint32 accLockOp);
......@@ -626,9 +643,9 @@ private:
/*
* DbtuxSearch.cpp
*/
void searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
void searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
void searchToScan(Signal* signal, Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos);
void searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
void searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
void searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos);
/*
* DbtuxCmp.cpp
......@@ -652,7 +669,7 @@ private:
PrintPar();
};
void printTree(Signal* signal, Frag& frag, NdbOut& out);
void printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar& par);
void printNode(Frag& frag, NdbOut& out, TupLoc loc, PrintPar& par);
friend class NdbOut& operator<<(NdbOut&, const TupLoc&);
friend class NdbOut& operator<<(NdbOut&, const TreeEnt&);
friend class NdbOut& operator<<(NdbOut&, const TreeNode&);
......
......@@ -98,7 +98,7 @@ Dbtux::printTree(Signal* signal, Frag& frag, NdbOut& out)
strcpy(par.m_path, ".");
par.m_side = 2;
par.m_parent = NullTupLoc;
printNode(signal, frag, out, tree.m_root, par);
printNode(frag, out, tree.m_root, par);
out.m_out->flush();
if (! par.m_ok) {
if (debugFile == 0) {
......@@ -114,7 +114,7 @@ Dbtux::printTree(Signal* signal, Frag& frag, NdbOut& out)
}
void
Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar& par)
Dbtux::printNode(Frag& frag, NdbOut& out, TupLoc loc, PrintPar& par)
{
if (loc == NullTupLoc) {
par.m_depth = 0;
......@@ -122,7 +122,7 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar&
}
TreeHead& tree = frag.m_tree;
NodeHandle node(frag);
selectNode(signal, node, loc);
selectNode(node, loc);
out << par.m_path << " " << node << endl;
// check children
PrintPar cpar[2];
......@@ -132,7 +132,7 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar&
cpar[i].m_side = i;
cpar[i].m_depth = 0;
cpar[i].m_parent = loc;
printNode(signal, frag, out, node.getLink(i), cpar[i]);
printNode(frag, out, node.getLink(i), cpar[i]);
if (! cpar[i].m_ok) {
par.m_ok = false;
}
......@@ -178,16 +178,19 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar&
out << "occupancy " << node.getOccup() << " of interior node";
out << " less than min " << tree.m_minOccup << endl;
}
// check missed half-leaf/leaf merge
#ifdef dbtux_totally_groks_t_trees
// check missed semi-leaf/leaf merge
for (unsigned i = 0; i <= 1; i++) {
if (node.getLink(i) != NullTupLoc &&
node.getLink(1 - i) == NullTupLoc &&
node.getOccup() + cpar[i].m_occup <= tree.m_maxOccup) {
// our semi-leaf seems to satify interior minOccup condition
node.getOccup() < tree.m_minOccup) {
par.m_ok = false;
out << par.m_path << sep;
out << "missed merge with child " << i << endl;
}
}
#endif
// check inline prefix
{ ConstData data1 = node.getPref();
Uint32 data2[MaxPrefSize];
......
......@@ -117,7 +117,7 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
switch (opCode) {
case TuxMaintReq::OpAdd:
jam();
searchToAdd(signal, frag, c_searchKey, ent, treePos);
searchToAdd(frag, c_searchKey, ent, treePos);
#ifdef VM_TRACE
if (debugFlags & DebugMaint) {
debugOut << treePos << (treePos.m_match ? " - error" : "") << endl;
......@@ -133,8 +133,8 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
break;
}
/*
* At most one new node is inserted in the operation. We keep one
* free node pre-allocated so the operation cannot fail.
* At most one new node is inserted in the operation. Pre-allocate
* it so that the operation cannot fail.
*/
if (frag.m_freeLoc == NullTupLoc) {
jam();
......@@ -144,14 +144,16 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
jam();
break;
}
// link to freelist
node.setLink(0, frag.m_freeLoc);
frag.m_freeLoc = node.m_loc;
ndbrequire(frag.m_freeLoc != NullTupLoc);
}
treeAdd(signal, frag, treePos, ent);
treeAdd(frag, treePos, ent);
break;
case TuxMaintReq::OpRemove:
jam();
searchToRemove(signal, frag, c_searchKey, ent, treePos);
searchToRemove(frag, c_searchKey, ent, treePos);
#ifdef VM_TRACE
if (debugFlags & DebugMaint) {
debugOut << treePos << (! treePos.m_match ? " - error" : "") << endl;
......@@ -166,7 +168,7 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
}
break;
}
treeRemove(signal, frag, treePos);
treeRemove(frag, treePos);
break;
default:
ndbrequire(false);
......
......@@ -211,11 +211,7 @@ Dbtux::execTUX_ADD_ATTRREQ(Signal* signal)
// make these configurable later
tree.m_nodeSize = MAX_TTREE_NODE_SIZE;
tree.m_prefSize = MAX_TTREE_PREF_SIZE;
#ifdef dbtux_min_occup_less_max_occup
const unsigned maxSlack = MAX_TTREE_NODE_SLACK;
#else
const unsigned maxSlack = 0;
#endif
// size up to and including first 2 entries
const unsigned pref = tree.getSize(AccPref);
if (! (pref <= tree.m_nodeSize)) {
......
......@@ -42,7 +42,7 @@ Dbtux::allocNode(Signal* signal, NodeHandle& node)
* Set handle to point to existing node.
*/
void
Dbtux::selectNode(Signal* signal, NodeHandle& node, TupLoc loc)
Dbtux::selectNode(NodeHandle& node, TupLoc loc)
{
Frag& frag = node.m_frag;
ndbrequire(loc != NullTupLoc);
......@@ -57,15 +57,15 @@ Dbtux::selectNode(Signal* signal, NodeHandle& node, TupLoc loc)
}
/*
* Set handle to point to new node. Uses the pre-allocated node.
* Set handle to point to new node. Uses a pre-allocated node.
*/
void
Dbtux::insertNode(Signal* signal, NodeHandle& node)
Dbtux::insertNode(NodeHandle& node)
{
Frag& frag = node.m_frag;
TupLoc loc = frag.m_freeLoc;
frag.m_freeLoc = NullTupLoc;
selectNode(signal, node, loc);
// unlink from freelist
selectNode(node, frag.m_freeLoc);
frag.m_freeLoc = node.getLink(0);
new (node.m_node) TreeNode();
#ifdef VM_TRACE
TreeHead& tree = frag.m_tree;
......@@ -76,20 +76,17 @@ Dbtux::insertNode(Signal* signal, NodeHandle& node)
}
/*
* Delete existing node.
* Delete existing node. Simply put it on the freelist.
*/
void
Dbtux::deleteNode(Signal* signal, NodeHandle& node)
Dbtux::deleteNode(NodeHandle& node)
{
Frag& frag = node.m_frag;
ndbrequire(node.getOccup() == 0);
TupLoc loc = node.m_loc;
Uint32 pageId = loc.getPageId();
Uint32 pageOffset = loc.getPageOffset();
Uint32* node32 = reinterpret_cast<Uint32*>(node.m_node);
c_tup->tuxFreeNode(signal, frag.m_tupIndexFragPtrI, pageId, pageOffset, node32);
jamEntry();
// invalidate handle and storage
// link to freelist
node.setLink(0, frag.m_freeLoc);
frag.m_freeLoc = node.m_loc;
// invalidate the handle
node.m_loc = NullTupLoc;
node.m_node = 0;
}
......@@ -99,7 +96,7 @@ Dbtux::deleteNode(Signal* signal, NodeHandle& node)
* attribute headers for now. XXX use null mask instead
*/
void
Dbtux::setNodePref(Signal* signal, NodeHandle& node)
Dbtux::setNodePref(NodeHandle& node)
{
const Frag& frag = node.m_frag;
const TreeHead& tree = frag.m_tree;
......@@ -117,18 +114,45 @@ Dbtux::setNodePref(Signal* signal, NodeHandle& node)
* v
* A B C D E _ _ => A B C X D E _
* 0 1 2 3 4 5 6 0 1 2 3 4 5 6
*
* Add list of scans at the new entry.
*/
void
Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ent)
Dbtux::nodePushUp(NodeHandle& node, unsigned pos, const TreeEnt& ent, Uint32 scanList)
{
Frag& frag = node.m_frag;
TreeHead& tree = frag.m_tree;
const unsigned occup = node.getOccup();
ndbrequire(occup < tree.m_maxOccup && pos <= occup);
// fix scans
// fix old scans
if (node.getNodeScan() != RNIL)
nodePushUpScans(node, pos);
// fix node
TreeEnt* const entList = tree.getEntList(node.m_node);
entList[occup] = entList[0];
TreeEnt* const tmpList = entList + 1;
for (unsigned i = occup; i > pos; i--) {
jam();
tmpList[i] = tmpList[i - 1];
}
tmpList[pos] = ent;
entList[0] = entList[occup + 1];
node.setOccup(occup + 1);
// add new scans
if (scanList != RNIL)
addScanList(node, pos, scanList);
// fix prefix
if (occup == 0 || pos == 0)
setNodePref(node);
}
void
Dbtux::nodePushUpScans(NodeHandle& node, unsigned pos)
{
const unsigned occup = node.getOccup();
ScanOpPtr scanPtr;
scanPtr.i = node.getNodeScan();
while (scanPtr.i != RNIL) {
do {
jam();
c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos;
......@@ -144,21 +168,7 @@ Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt&
scanPos.m_pos++;
}
scanPtr.i = scanPtr.p->m_nodeScan;
}
// fix node
TreeEnt* const entList = tree.getEntList(node.m_node);
entList[occup] = entList[0];
TreeEnt* const tmpList = entList + 1;
for (unsigned i = occup; i > pos; i--) {
jam();
tmpList[i] = tmpList[i - 1];
}
tmpList[pos] = ent;
entList[0] = entList[occup + 1];
node.setOccup(occup + 1);
// fix prefix
if (occup == 0 || pos == 0)
setNodePref(signal, node);
} while (scanPtr.i != RNIL);
}
/*
......@@ -169,42 +179,55 @@ Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt&
* ^ ^
* A B C D E F _ => A B C E F _ _
* 0 1 2 3 4 5 6 0 1 2 3 4 5 6
*
* Scans at removed entry are returned if non-zero location is passed or
* else moved forward.
*/
void
Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
Dbtux::nodePopDown(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32* scanList)
{
Frag& frag = node.m_frag;
TreeHead& tree = frag.m_tree;
const unsigned occup = node.getOccup();
ndbrequire(occup <= tree.m_maxOccup && pos < occup);
ScanOpPtr scanPtr;
// move scans whose entry disappears
scanPtr.i = node.getNodeScan();
while (scanPtr.i != RNIL) {
jam();
c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos;
ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
if (scanPos.m_pos == pos) {
jam();
#ifdef VM_TRACE
if (debugFlags & DebugScan) {
debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl;
debugOut << "At popDown pos=" << pos << " " << node << endl;
}
#endif
scanNext(signal, scanPtr);
if (node.getNodeScan() != RNIL) {
// remove or move scans at this position
if (scanList == 0)
moveScanList(node, pos);
else
removeScanList(node, pos, *scanList);
// fix other scans
if (node.getNodeScan() != RNIL)
nodePopDownScans(node, pos);
}
scanPtr.i = nextPtrI;
// fix node
TreeEnt* const entList = tree.getEntList(node.m_node);
entList[occup] = entList[0];
TreeEnt* const tmpList = entList + 1;
ent = tmpList[pos];
for (unsigned i = pos; i < occup - 1; i++) {
jam();
tmpList[i] = tmpList[i + 1];
}
// fix other scans
entList[0] = entList[occup - 1];
node.setOccup(occup - 1);
// fix prefix
if (occup != 1 && pos == 0)
setNodePref(node);
}
void
Dbtux::nodePopDownScans(NodeHandle& node, unsigned pos)
{
const unsigned occup = node.getOccup();
ScanOpPtr scanPtr;
scanPtr.i = node.getNodeScan();
while (scanPtr.i != RNIL) {
do {
jam();
c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos;
ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
// handled before
ndbrequire(scanPos.m_pos != pos);
if (scanPos.m_pos > pos) {
jam();
......@@ -217,21 +240,7 @@ Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
scanPos.m_pos--;
}
scanPtr.i = scanPtr.p->m_nodeScan;
}
// fix node
TreeEnt* const entList = tree.getEntList(node.m_node);
entList[occup] = entList[0];
TreeEnt* const tmpList = entList + 1;
ent = tmpList[pos];
for (unsigned i = pos; i < occup - 1; i++) {
jam();
tmpList[i] = tmpList[i + 1];
}
entList[0] = entList[occup - 1];
node.setOccup(occup - 1);
// fix prefix
if (occup != 1 && pos == 0)
setNodePref(signal, node);
} while (scanPtr.i != RNIL);
}
/*
......@@ -242,43 +251,52 @@ Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
* ^ v ^
* A B C D E _ _ => B C D X E _ _
* 0 1 2 3 4 5 6 0 1 2 3 4 5 6
*
* Return list of scans at the removed position 0.
*/
void
Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
Dbtux::nodePushDown(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32& scanList)
{
Frag& frag = node.m_frag;
TreeHead& tree = frag.m_tree;
const unsigned occup = node.getOccup();
ndbrequire(occup <= tree.m_maxOccup && pos < occup);
ScanOpPtr scanPtr;
// move scans whose entry disappears
scanPtr.i = node.getNodeScan();
while (scanPtr.i != RNIL) {
jam();
c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos;
ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
if (scanPos.m_pos == 0) {
jam();
#ifdef VM_TRACE
if (debugFlags & DebugScan) {
debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl;
debugOut << "At pushDown pos=" << pos << " " << node << endl;
}
#endif
// here we may miss a valid entry "X" XXX known bug
scanNext(signal, scanPtr);
if (node.getNodeScan() != RNIL) {
// remove scans at 0
removeScanList(node, 0, scanList);
// fix other scans
if (node.getNodeScan() != RNIL)
nodePushDownScans(node, pos);
}
scanPtr.i = nextPtrI;
// fix node
TreeEnt* const entList = tree.getEntList(node.m_node);
entList[occup] = entList[0];
TreeEnt* const tmpList = entList + 1;
TreeEnt oldMin = tmpList[0];
for (unsigned i = 0; i < pos; i++) {
jam();
tmpList[i] = tmpList[i + 1];
}
// fix other scans
tmpList[pos] = ent;
ent = oldMin;
entList[0] = entList[occup];
// fix prefix
if (true)
setNodePref(node);
}
void
Dbtux::nodePushDownScans(NodeHandle& node, unsigned pos)
{
const unsigned occup = node.getOccup();
ScanOpPtr scanPtr;
scanPtr.i = node.getNodeScan();
while (scanPtr.i != RNIL) {
do {
jam();
c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos;
ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
// handled before
ndbrequire(scanPos.m_pos != 0);
if (scanPos.m_pos <= pos) {
jam();
......@@ -291,22 +309,7 @@ Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent
scanPos.m_pos--;
}
scanPtr.i = scanPtr.p->m_nodeScan;
}
// fix node
TreeEnt* const entList = tree.getEntList(node.m_node);
entList[occup] = entList[0];
TreeEnt* const tmpList = entList + 1;
TreeEnt oldMin = tmpList[0];
for (unsigned i = 0; i < pos; i++) {
jam();
tmpList[i] = tmpList[i + 1];
}
tmpList[pos] = ent;
ent = oldMin;
entList[0] = entList[occup];
// fix prefix
if (true)
setNodePref(signal, node);
} while (scanPtr.i != RNIL);
}
/*
......@@ -318,39 +321,50 @@ Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent
* v ^ ^
* A B C D E _ _ => X A B C E _ _
* 0 1 2 3 4 5 6 0 1 2 3 4 5 6
*
* Move scans at removed entry and add scans at the new entry.
*/
void
Dbtux::nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
Dbtux::nodePopUp(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32 scanList)
{
Frag& frag = node.m_frag;
TreeHead& tree = frag.m_tree;
const unsigned occup = node.getOccup();
ndbrequire(occup <= tree.m_maxOccup && pos < occup);
ScanOpPtr scanPtr;
if (node.getNodeScan() != RNIL) {
// move scans whose entry disappears
scanPtr.i = node.getNodeScan();
while (scanPtr.i != RNIL) {
jam();
c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos;
ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
if (scanPos.m_pos == pos) {
jam();
#ifdef VM_TRACE
if (debugFlags & DebugScan) {
debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl;
debugOut << "At popUp pos=" << pos << " " << node << endl;
}
#endif
// here we may miss a valid entry "X" XXX known bug
scanNext(signal, scanPtr);
moveScanList(node, pos);
// fix other scans
if (node.getNodeScan() != RNIL)
nodePopUpScans(node, pos);
}
scanPtr.i = nextPtrI;
// fix node
TreeEnt* const entList = tree.getEntList(node.m_node);
entList[occup] = entList[0];
TreeEnt* const tmpList = entList + 1;
TreeEnt newMin = ent;
ent = tmpList[pos];
for (unsigned i = pos; i > 0; i--) {
jam();
tmpList[i] = tmpList[i - 1];
}
// fix other scans
tmpList[0] = newMin;
entList[0] = entList[occup];
// add scans
if (scanList != RNIL)
addScanList(node, 0, scanList);
// fix prefix
if (true)
setNodePref(node);
}
void
Dbtux::nodePopUpScans(NodeHandle& node, unsigned pos)
{
const unsigned occup = node.getOccup();
ScanOpPtr scanPtr;
scanPtr.i = node.getNodeScan();
while (scanPtr.i != RNIL) {
do {
jam();
c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos;
......@@ -367,39 +381,121 @@ Dbtux::nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
scanPos.m_pos++;
}
scanPtr.i = scanPtr.p->m_nodeScan;
}
// fix node
TreeEnt* const entList = tree.getEntList(node.m_node);
entList[occup] = entList[0];
TreeEnt* const tmpList = entList + 1;
TreeEnt newMin = ent;
ent = tmpList[pos];
for (unsigned i = pos; i > 0; i--) {
jam();
tmpList[i] = tmpList[i - 1];
}
tmpList[0] = newMin;
entList[0] = entList[occup];
// fix prefix
if (true)
setNodePref(signal, node);
} while (scanPtr.i != RNIL);
}
/*
* Move all possible entries from another node before the min (i=0) or
* after the max (i=1). XXX can be optimized
* Move number of entries from another node to this node before the min
* (i=0) or after the max (i=1). Expensive but not often used.
*/
void
Dbtux::nodeSlide(Signal* signal, NodeHandle& dstNode, NodeHandle& srcNode, unsigned i)
Dbtux::nodeSlide(NodeHandle& dstNode, NodeHandle& srcNode, unsigned cnt, unsigned i)
{
Frag& frag = dstNode.m_frag;
TreeHead& tree = frag.m_tree;
ndbrequire(i <= 1);
while (dstNode.getOccup() < tree.m_maxOccup && srcNode.getOccup() != 0) {
while (cnt != 0) {
TreeEnt ent;
nodePopDown(signal, srcNode, i == 0 ? srcNode.getOccup() - 1 : 0, ent);
nodePushUp(signal, dstNode, i == 0 ? 0 : dstNode.getOccup(), ent);
Uint32 scanList = RNIL;
nodePopDown(srcNode, i == 0 ? srcNode.getOccup() - 1 : 0, ent, &scanList);
nodePushUp(dstNode, i == 0 ? 0 : dstNode.getOccup(), ent, scanList);
cnt--;
}
}
// scans linked to node
/*
* Add list of scans to node at given position.
*/
void
Dbtux::addScanList(NodeHandle& node, unsigned pos, Uint32 scanList)
{
ScanOpPtr scanPtr;
scanPtr.i = scanList;
do {
jam();
c_scanOpPool.getPtr(scanPtr);
#ifdef VM_TRACE
if (debugFlags & DebugScan) {
debugOut << "Add scan " << scanPtr.i << " " << *scanPtr.p << endl;
debugOut << "To pos=" << pos << " " << node << endl;
}
#endif
const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
scanPtr.p->m_nodeScan = RNIL;
linkScan(node, scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos;
// set position but leave direction alone
scanPos.m_loc = node.m_loc;
scanPos.m_pos = pos;
scanPtr.i = nextPtrI;
} while (scanPtr.i != RNIL);
}
/*
* Remove list of scans from node at given position. The return
* location must point to existing list (in fact RNIL always).
*/
void
Dbtux::removeScanList(NodeHandle& node, unsigned pos, Uint32& scanList)
{
ScanOpPtr scanPtr;
scanPtr.i = node.getNodeScan();
do {
jam();
c_scanOpPool.getPtr(scanPtr);
const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
TreePos& scanPos = scanPtr.p->m_scanPos;
ndbrequire(scanPos.m_loc == node.m_loc);
if (scanPos.m_pos == pos) {
jam();
#ifdef VM_TRACE
if (debugFlags & DebugScan) {
debugOut << "Remove scan " << scanPtr.i << " " << *scanPtr.p << endl;
debugOut << "Fron pos=" << pos << " " << node << endl;
}
#endif
unlinkScan(node, scanPtr);
scanPtr.p->m_nodeScan = scanList;
scanList = scanPtr.i;
// unset position but leave direction alone
scanPos.m_loc = NullTupLoc;
scanPos.m_pos = ZNIL;
}
scanPtr.i = nextPtrI;
} while (scanPtr.i != RNIL);
}
/*
* Move list of scans away from entry about to be removed. Uses scan
* method scanNext().
*/
void
Dbtux::moveScanList(NodeHandle& node, unsigned pos)
{
ScanOpPtr scanPtr;
scanPtr.i = node.getNodeScan();
do {
jam();
c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos;
const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
ndbrequire(scanPos.m_loc == node.m_loc);
if (scanPos.m_pos == pos) {
jam();
#ifdef VM_TRACE
if (debugFlags & DebugScan) {
debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl;
debugOut << "At pos=" << pos << " " << node << endl;
}
#endif
scanNext(scanPtr);
ndbrequire(! (scanPos.m_loc == node.m_loc && scanPos.m_pos == pos));
}
scanPtr.i = nextPtrI;
} while (scanPtr.i != RNIL);
}
/*
......
......@@ -275,7 +275,7 @@ Dbtux::execNEXT_SCANREQ(Signal* signal)
jam();
const TupLoc loc = scan.m_scanPos.m_loc;
NodeHandle node(frag);
selectNode(signal, node, loc);
selectNode(node, loc);
unlinkScan(node, scanPtr);
scan.m_scanPos.m_loc = NullTupLoc;
}
......@@ -364,7 +364,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
if (scan.m_state == ScanOp::First) {
jam();
// search is done only once in single range scan
scanFirst(signal, scanPtr);
scanFirst(scanPtr);
#ifdef VM_TRACE
if (debugFlags & DebugScan) {
debugOut << "First scan " << scanPtr.i << " " << scan << endl;
......@@ -374,7 +374,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
if (scan.m_state == ScanOp::Next) {
jam();
// look for next
scanNext(signal, scanPtr);
scanNext(scanPtr);
}
// for reading tuple key in Current or Locked state
Data pkData = c_dataBuffer;
......@@ -680,7 +680,7 @@ Dbtux::execACC_ABORTCONF(Signal* signal)
* by scanNext.
*/
void
Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
Dbtux::scanFirst(ScanOpPtr scanPtr)
{
ScanOp& scan = *scanPtr.p;
Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
......@@ -698,7 +698,7 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
}
// search for scan start position
TreePos treePos;
searchToScan(signal, frag, c_dataBuffer, scan.m_boundCnt[0], treePos);
searchToScan(frag, c_dataBuffer, scan.m_boundCnt[0], treePos);
if (treePos.m_loc == NullTupLoc) {
// empty tree
jam();
......@@ -710,13 +710,13 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
scan.m_state = ScanOp::Next;
// link the scan to node found
NodeHandle node(frag);
selectNode(signal, node, treePos.m_loc);
selectNode(node, treePos.m_loc);
linkScan(node, scanPtr);
}
/*
* Move to next entry. The scan is already linked to some node. When
* we leave, if any entry was found, it will be linked to a possibly
* we leave, if an entry was found, it will be linked to a possibly
* different node. The scan has a position, and a direction which tells
* from where we came to this position. This is one of:
*
......@@ -725,9 +725,12 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
* 2 - up from root (the scan ends)
* 3 - left to right within node (at end proceed to right child)
* 4 - down from parent (proceed to left child)
*
* If an entry was found, scan direction is 3. Therefore tree
* re-organizations need not worry about scan direction.
*/
void
Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
Dbtux::scanNext(ScanOpPtr scanPtr)
{
ScanOp& scan = *scanPtr.p;
Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
......@@ -736,22 +739,8 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
debugOut << "Next in scan " << scanPtr.i << " " << scan << endl;
}
#endif
if (scan.m_state == ScanOp::Locked) {
jam();
// version of a tuple locked by us cannot disappear (assert only)
#ifdef dbtux_wl_1942_is_done
ndbassert(false);
#endif
AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
lockReq->returnCode = RNIL;
lockReq->requestInfo = AccLockReq::Unlock;
lockReq->accOpPtr = scan.m_accLockOp;
EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
jamEntry();
ndbrequire(lockReq->returnCode == AccLockReq::Success);
scan.m_accLockOp = RNIL;
scan.m_state = ScanOp::Current;
}
// cannot be moved away from tuple we have locked
ndbrequire(scan.m_state != ScanOp::Locked);
// set up index keys for this operation
setKeyAttrs(frag);
// unpack upper bound into c_dataBuffer
......@@ -767,7 +756,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
TreePos pos = scan.m_scanPos;
// get and remember original node
NodeHandle origNode(frag);
selectNode(signal, origNode, pos.m_loc);
selectNode(origNode, pos.m_loc);
ndbrequire(islinkScan(origNode, scanPtr));
// current node in loop
NodeHandle node = origNode;
......@@ -784,7 +773,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
}
if (node.m_loc != pos.m_loc) {
jam();
selectNode(signal, node, pos.m_loc);
selectNode(node, pos.m_loc);
}
if (pos.m_dir == 4) {
// coming down from parent proceed to left child
......@@ -832,7 +821,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
break;
}
// can we see it
if (! scanVisible(signal, scanPtr, ent)) {
if (! scanVisible(scanPtr, ent)) {
jam();
continue;
}
......@@ -864,6 +853,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
scan.m_scanPos = pos;
// relink
if (scan.m_state == ScanOp::Current) {
ndbrequire(pos.m_match == true && pos.m_dir == 3);
ndbrequire(pos.m_loc == node.m_loc);
if (origNode.m_loc != node.m_loc) {
jam();
......@@ -894,7 +884,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
* which are not analyzed or handled yet.
*/
bool
Dbtux::scanVisible(Signal* signal, ScanOpPtr scanPtr, TreeEnt ent)
Dbtux::scanVisible(ScanOpPtr scanPtr, TreeEnt ent)
{
const ScanOp& scan = *scanPtr.p;
const Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
......
......@@ -25,7 +25,7 @@
* TODO optimize for initial equal attrs in node min/max
*/
void
Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos)
Dbtux::searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos)
{
const TreeHead& tree = frag.m_tree;
const unsigned numAttrs = frag.m_numAttrs;
......@@ -46,7 +46,7 @@ Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt sear
NodeHandle bottomNode(frag);
while (true) {
jam();
selectNode(signal, currNode, currNode.m_loc);
selectNode(currNode, currNode.m_loc);
int ret;
// compare prefix
unsigned start = 0;
......@@ -159,12 +159,12 @@ Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt sear
*
* Compares search key to each node min. A move to right subtree can
* overshoot target node. The last such node is saved. The final node
* is a half-leaf or leaf. If search key is less than final node min
* is a semi-leaf or leaf. If search key is less than final node min
* then the saved node is the g.l.b of the final node and we move back
* to it.
*/
void
Dbtux::searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos)
Dbtux::searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos)
{
const TreeHead& tree = frag.m_tree;
const unsigned numAttrs = frag.m_numAttrs;
......@@ -182,7 +182,7 @@ Dbtux::searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt s
NodeHandle glbNode(frag); // potential g.l.b of final node
while (true) {
jam();
selectNode(signal, currNode, currNode.m_loc);
selectNode(currNode, currNode.m_loc);
int ret;
// compare prefix
unsigned start = 0;
......@@ -256,7 +256,7 @@ Dbtux::searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt s
* Similar to searchToAdd.
*/
void
Dbtux::searchToScan(Signal* signal, Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos)
Dbtux::searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos)
{
const TreeHead& tree = frag.m_tree;
NodeHandle currNode(frag);
......@@ -271,7 +271,7 @@ Dbtux::searchToScan(Signal* signal, Frag& frag, ConstData boundInfo, unsigned bo
NodeHandle bottomNode(frag);
while (true) {
jam();
selectNode(signal, currNode, currNode.m_loc);
selectNode(currNode, currNode.m_loc);
int ret;
// compare prefix
ret = cmpScanBound(frag, 0, boundInfo, boundCount, currNode.getPref(), tree.m_prefSize);
......
......@@ -18,71 +18,105 @@
#include "Dbtux.hpp"
/*
* Add entry.
* Add entry. Handle the case when there is room for one more. This
* is the common case given slack in nodes.
*/
void
Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent)
Dbtux::treeAdd(Frag& frag, TreePos treePos, TreeEnt ent)
{
TreeHead& tree = frag.m_tree;
unsigned pos = treePos.m_pos;
NodeHandle node(frag);
// check for empty tree
if (treePos.m_loc == NullTupLoc) {
jam();
insertNode(signal, node);
nodePushUp(signal, node, 0, ent);
node.setSide(2);
tree.m_root = node.m_loc;
return;
}
selectNode(signal, node, treePos.m_loc);
// check if it is bounding node
if (pos != 0 && pos != node.getOccup()) {
if (treePos.m_loc != NullTupLoc) {
// non-empty tree
jam();
// check if room for one more
selectNode(node, treePos.m_loc);
unsigned pos = treePos.m_pos;
if (node.getOccup() < tree.m_maxOccup) {
// node has room
jam();
nodePushUp(signal, node, pos, ent);
nodePushUp(node, pos, ent, RNIL);
return;
}
// returns min entry
nodePushDown(signal, node, pos - 1, ent);
// find position to add the removed min entry
TupLoc childLoc = node.getLink(0);
if (childLoc == NullTupLoc) {
jam();
// left child will be added
pos = 0;
} else {
jam();
// find glb node
while (childLoc != NullTupLoc) {
jam();
selectNode(signal, node, childLoc);
childLoc = node.getLink(1);
treeAddFull(frag, node, pos, ent);
return;
}
pos = node.getOccup();
jam();
insertNode(node);
nodePushUp(node, 0, ent, RNIL);
node.setSide(2);
tree.m_root = node.m_loc;
}
/*
* Add entry when node is full. Handle the case when there is g.l.b
* node in left subtree with room for one more. It will receive the min
* entry of this node. The min entry could be the entry to add.
*/
void
Dbtux::treeAddFull(Frag& frag, NodeHandle lubNode, unsigned pos, TreeEnt ent)
{
TreeHead& tree = frag.m_tree;
TupLoc loc = lubNode.getLink(0);
if (loc != NullTupLoc) {
// find g.l.b node
NodeHandle glbNode(frag);
do {
jam();
selectNode(glbNode, loc);
loc = glbNode.getLink(1);
} while (loc != NullTupLoc);
if (glbNode.getOccup() < tree.m_maxOccup) {
// g.l.b node has room
jam();
Uint32 scanList = RNIL;
if (pos != 0) {
jam();
// add the new entry and return min entry
nodePushDown(lubNode, pos - 1, ent, scanList);
}
// fall thru to next case
// g.l.b node receives min entry from l.u.b node
nodePushUp(glbNode, glbNode.getOccup(), ent, scanList);
return;
}
// adding new min or max
unsigned i = (pos == 0 ? 0 : 1);
ndbrequire(node.getLink(i) == NullTupLoc);
// check if the half-leaf/leaf has room for one more
if (node.getOccup() < tree.m_maxOccup) {
jam();
nodePushUp(signal, node, pos, ent);
treeAddNode(frag, lubNode, pos, ent, glbNode, 1);
return;
}
// add a new node
NodeHandle childNode(frag);
insertNode(signal, childNode);
nodePushUp(signal, childNode, 0, ent);
treeAddNode(frag, lubNode, pos, ent, lubNode, 0);
}
/*
* Add entry when there is no g.l.b node in left subtree or the g.l.b
* node is full. We must add a new left or right child node which
* becomes the new g.l.b node.
*/
void
Dbtux::treeAddNode(Frag& frag, NodeHandle lubNode, unsigned pos, TreeEnt ent, NodeHandle parentNode, unsigned i)
{
NodeHandle glbNode(frag);
insertNode(glbNode);
// connect parent and child
node.setLink(i, childNode.m_loc);
childNode.setLink(2, node.m_loc);
childNode.setSide(i);
// re-balance tree at each node
parentNode.setLink(i, glbNode.m_loc);
glbNode.setLink(2, parentNode.m_loc);
glbNode.setSide(i);
Uint32 scanList = RNIL;
if (pos != 0) {
jam();
// add the new entry and return min entry
nodePushDown(lubNode, pos - 1, ent, scanList);
}
// g.l.b node receives min entry from l.u.b node
nodePushUp(glbNode, 0, ent, scanList);
// re-balance the tree
treeAddRebalance(frag, parentNode, i);
}
/*
* Re-balance tree after adding a node. The process starts with the
* parent of the added node.
*/
void
Dbtux::treeAddRebalance(Frag& frag, NodeHandle node, unsigned i)
{
while (true) {
// height of subtree i has increased by 1
int j = (i == 0 ? -1 : +1);
......@@ -102,14 +136,14 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent)
// height of longer subtree increased
jam();
NodeHandle childNode(frag);
selectNode(signal, childNode, node.getLink(i));
selectNode(childNode, node.getLink(i));
int b2 = childNode.getBalance();
if (b2 == b) {
jam();
treeRotateSingle(signal, frag, node, i);
treeRotateSingle(frag, node, i);
} else if (b2 == -b) {
jam();
treeRotateDouble(signal, frag, node, i);
treeRotateDouble(frag, node, i);
} else {
// height of subtree increased so it cannot be perfectly balanced
ndbrequire(false);
......@@ -126,115 +160,169 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent)
break;
}
i = node.getSide();
selectNode(signal, node, parentLoc);
selectNode(node, parentLoc);
}
}
/*
* Remove entry.
* Remove entry. Optimize for nodes with slack. Handle the case when
* there is no underflow i.e. occupancy remains at least minOccup. For
* interior nodes this is a requirement. For others it means that we do
* not need to consider merge of semi-leaf and leaf.
*/
void
Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
Dbtux::treeRemove(Frag& frag, TreePos treePos)
{
TreeHead& tree = frag.m_tree;
unsigned pos = treePos.m_pos;
NodeHandle node(frag);
selectNode(signal, node, treePos.m_loc);
selectNode(node, treePos.m_loc);
TreeEnt ent;
// check interior node first
if (node.getChilds() == 2) {
jam();
ndbrequire(node.getOccup() >= tree.m_minOccup);
// check if no underflow
if (node.getOccup() > tree.m_minOccup) {
// no underflow in any node type
jam();
nodePopDown(signal, node, pos, ent);
nodePopDown(node, pos, ent, 0);
return;
}
// save current handle
NodeHandle parentNode = node;
// find glb node
TupLoc childLoc = node.getLink(0);
while (childLoc != NullTupLoc) {
jam();
selectNode(signal, node, childLoc);
childLoc = node.getLink(1);
}
// use glb max as new parent min
ent = node.getEnt(node.getOccup() - 1);
nodePopUp(signal, parentNode, pos, ent);
// set up to remove glb max
pos = node.getOccup() - 1;
// fall thru to next case
}
// remove the element
nodePopDown(signal, node, pos, ent);
ndbrequire(node.getChilds() <= 1);
// handle half-leaf
unsigned i;
for (i = 0; i <= 1; i++) {
jam();
TupLoc childLoc = node.getLink(i);
if (childLoc != NullTupLoc) {
// move to child
selectNode(signal, node, childLoc);
// balance of half-leaf parent requires child to be leaf
break;
}
if (node.getChilds() == 2) {
// underflow in interior node
jam();
treeRemoveInner(frag, node, pos);
return;
}
ndbrequire(node.getChilds() == 0);
// get parent if any
TupLoc parentLoc = node.getLink(2);
NodeHandle parentNode(frag);
i = node.getSide();
// move all that fits into parent
if (parentLoc != NullTupLoc) {
// remove entry in semi/leaf
nodePopDown(node, pos, ent, 0);
if (node.getLink(0) != NullTupLoc) {
jam();
selectNode(signal, parentNode, node.getLink(2));
nodeSlide(signal, parentNode, node, i);
// fall thru to next case
treeRemoveSemi(frag, node, 0);
return;
}
// non-empty leaf
if (node.getOccup() >= 1) {
if (node.getLink(1) != NullTupLoc) {
jam();
treeRemoveSemi(frag, node, 1);
return;
}
treeRemoveLeaf(frag, node);
}
/*
* Remove entry when interior node underflows. There is g.l.b node in
* left subtree to borrow an entry from. The max entry of the g.l.b
* node becomes the min entry of this node.
*/
void
Dbtux::treeRemoveInner(Frag& frag, NodeHandle lubNode, unsigned pos)
{
TreeHead& tree = frag.m_tree;
TreeEnt ent;
// find g.l.b node
NodeHandle glbNode(frag);
TupLoc loc = lubNode.getLink(0);
do {
jam();
selectNode(glbNode, loc);
loc = glbNode.getLink(1);
} while (loc != NullTupLoc);
// borrow max entry from semi/leaf
Uint32 scanList = RNIL;
nodePopDown(glbNode, glbNode.getOccup() - 1, ent, &scanList);
nodePopUp(lubNode, pos, ent, scanList);
if (glbNode.getLink(0) != NullTupLoc) {
jam();
treeRemoveSemi(frag, glbNode, 0);
return;
}
treeRemoveLeaf(frag, glbNode);
}
/*
* Handle semi-leaf after removing an entry. Move entries from leaf to
* semi-leaf to bring semi-leaf occupancy above minOccup, if possible.
* The leaf may become empty.
*/
void
Dbtux::treeRemoveSemi(Frag& frag, NodeHandle semiNode, unsigned i)
{
TreeHead& tree = frag.m_tree;
ndbrequire(semiNode.getChilds() < 2);
TupLoc leafLoc = semiNode.getLink(i);
NodeHandle leafNode(frag);
selectNode(leafNode, leafLoc);
if (semiNode.getOccup() < tree.m_minOccup) {
jam();
unsigned cnt = min(leafNode.getOccup(), tree.m_minOccup - semiNode.getOccup());
nodeSlide(semiNode, leafNode, cnt, i);
if (leafNode.getOccup() == 0) {
// remove empty leaf
deleteNode(signal, node);
if (parentLoc == NullTupLoc) {
jam();
// tree is now empty
tree.m_root = NullTupLoc;
return;
treeRemoveNode(frag, leafNode);
}
}
node = parentNode;
node.setLink(i, NullTupLoc);
#ifdef dbtux_min_occup_less_max_occup
// check if we created a half-leaf
if (node.getBalance() == 0) {
}
/*
* Handle leaf after removing an entry. If parent is semi-leaf, move
* entries to it as in the semi-leaf case. If parent is interior node,
* do nothing.
*/
void
Dbtux::treeRemoveLeaf(Frag& frag, NodeHandle leafNode)
{
TreeHead& tree = frag.m_tree;
TupLoc parentLoc = leafNode.getLink(2);
if (parentLoc != NullTupLoc) {
jam();
// move entries from the other child
TupLoc childLoc = node.getLink(1 - i);
NodeHandle childNode(frag);
selectNode(signal, childNode, childLoc);
nodeSlide(signal, node, childNode, 1 - i);
if (childNode.getOccup() == 0) {
jam();
deleteNode(signal, childNode);
node.setLink(1 - i, NullTupLoc);
// we are balanced again but our parent balance changes by -1
parentLoc = node.getLink(2);
if (parentLoc == NullTupLoc) {
NodeHandle parentNode(frag);
selectNode(parentNode, parentLoc);
unsigned i = leafNode.getSide();
if (parentNode.getLink(1 - i) == NullTupLoc) {
// parent is semi-leaf
jam();
return;
if (parentNode.getOccup() < tree.m_minOccup) {
jam();
unsigned cnt = min(leafNode.getOccup(), tree.m_minOccup - parentNode.getOccup());
nodeSlide(parentNode, leafNode, cnt, i);
}
// fix side and become parent
i = node.getSide();
selectNode(signal, node, parentLoc);
}
}
#endif
// re-balance tree at each node
if (leafNode.getOccup() == 0) {
jam();
// remove empty leaf
treeRemoveNode(frag, leafNode);
}
}
/*
* Remove empty leaf.
*/
void
Dbtux::treeRemoveNode(Frag& frag, NodeHandle leafNode)
{
TreeHead& tree = frag.m_tree;
ndbrequire(leafNode.getChilds() == 0);
TupLoc parentLoc = leafNode.getLink(2);
unsigned i = leafNode.getSide();
deleteNode(leafNode);
if (parentLoc != NullTupLoc) {
jam();
NodeHandle parentNode(frag);
selectNode(parentNode, parentLoc);
parentNode.setLink(i, NullTupLoc);
// re-balance the tree
treeRemoveRebalance(frag, parentNode, i);
return;
}
// tree is now empty
tree.m_root = NullTupLoc;
}
/*
* Re-balance tree after removing a node. The process starts with the
* parent of the removed node.
*/
void
Dbtux::treeRemoveRebalance(Frag& frag, NodeHandle node, unsigned i)
{
while (true) {
// height of subtree i has decreased by 1
int j = (i == 0 ? -1 : +1);
......@@ -255,19 +343,19 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
jam();
// child on the other side
NodeHandle childNode(frag);
selectNode(signal, childNode, node.getLink(1 - i));
selectNode(childNode, node.getLink(1 - i));
int b2 = childNode.getBalance();
if (b2 == b) {
jam();
treeRotateSingle(signal, frag, node, 1 - i);
treeRotateSingle(frag, node, 1 - i);
// height of tree decreased and propagates up
} else if (b2 == -b) {
jam();
treeRotateDouble(signal, frag, node, 1 - i);
treeRotateDouble(frag, node, 1 - i);
// height of tree decreased and propagates up
} else {
jam();
treeRotateSingle(signal, frag, node, 1 - i);
treeRotateSingle(frag, node, 1 - i);
// height of tree did not change - done
return;
}
......@@ -281,7 +369,7 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
return;
}
i = node.getSide();
selectNode(signal, node, parentLoc);
selectNode(node, parentLoc);
}
}
......@@ -302,10 +390,7 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
* all optional. If 4 are there it changes side.
*/
void
Dbtux::treeRotateSingle(Signal* signal,
Frag& frag,
NodeHandle& node,
unsigned i)
Dbtux::treeRotateSingle(Frag& frag, NodeHandle& node, unsigned i)
{
ndbrequire(i <= 1);
/*
......@@ -325,7 +410,7 @@ Dbtux::treeRotateSingle(Signal* signal,
*/
TupLoc loc3 = node5.getLink(i);
NodeHandle node3(frag);
selectNode(signal, node3, loc3);
selectNode(node3, loc3);
const int bal3 = node3.getBalance();
/*
2 must always be there but is not changed. Thus we mereley check that it
......@@ -342,7 +427,7 @@ Dbtux::treeRotateSingle(Signal* signal,
NodeHandle node4(frag);
if (loc4 != NullTupLoc) {
jam();
selectNode(signal, node4, loc4);
selectNode(node4, loc4);
ndbrequire(node4.getSide() == (1 - i) &&
node4.getLink(2) == loc3);
node4.setSide(i);
......@@ -377,7 +462,7 @@ Dbtux::treeRotateSingle(Signal* signal,
if (loc0 != NullTupLoc) {
jam();
NodeHandle node0(frag);
selectNode(signal, node0, loc0);
selectNode(node0, loc0);
node0.setLink(side5, loc3);
} else {
jam();
......@@ -514,8 +599,10 @@ Dbtux::treeRotateSingle(Signal* signal,
*
*/
void
Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i)
Dbtux::treeRotateDouble(Frag& frag, NodeHandle& node, unsigned i)
{
TreeHead& tree = frag.m_tree;
// old top node
NodeHandle node6 = node;
const TupLoc loc6 = node6.m_loc;
......@@ -526,13 +613,13 @@ Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i
// level 1
TupLoc loc2 = node6.getLink(i);
NodeHandle node2(frag);
selectNode(signal, node2, loc2);
selectNode(node2, loc2);
const int bal2 = node2.getBalance();
// level 2
TupLoc loc4 = node2.getLink(1 - i);
NodeHandle node4(frag);
selectNode(signal, node4, loc4);
selectNode(node4, loc4);
const int bal4 = node4.getBalance();
ndbrequire(i <= 1);
......@@ -549,23 +636,26 @@ Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i
// fill up leaf before it becomes internal
if (loc3 == NullTupLoc && loc5 == NullTupLoc) {
jam();
TreeHead& tree = frag.m_tree;
nodeSlide(signal, node4, node2, i);
// implied by rule of merging half-leaves with leaves
if (node4.getOccup() < tree.m_minOccup) {
jam();
unsigned cnt = tree.m_minOccup - node4.getOccup();
ndbrequire(cnt < node2.getOccup());
nodeSlide(node4, node2, cnt, i);
ndbrequire(node4.getOccup() >= tree.m_minOccup);
ndbrequire(node2.getOccup() != 0);
}
} else {
if (loc3 != NullTupLoc) {
jam();
NodeHandle node3(frag);
selectNode(signal, node3, loc3);
selectNode(node3, loc3);
node3.setLink(2, loc2);
node3.setSide(1 - i);
}
if (loc5 != NullTupLoc) {
jam();
NodeHandle node5(frag);
selectNode(signal, node5, loc5);
selectNode(node5, loc5);
node5.setLink(2, node6.m_loc);
node5.setSide(i);
}
......@@ -588,7 +678,7 @@ Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i
if (loc0 != NullTupLoc) {
jam();
selectNode(signal, node0, loc0);
selectNode(node0, loc0);
node0.setLink(side6, loc4);
} else {
jam();
......
......@@ -29,6 +29,7 @@ shows ms / 1000 rows for each and index time overhead
samples 10% of all PKs (100,000 pk reads, 100,000 scans)
the "pct" values are from more accurate total times (not shown)
comments [ ... ] are after the case
040616 mc02/a 40 ms 87 ms 114 pct
mc02/b 51 ms 128 ms 148 pct
......@@ -76,13 +77,12 @@ optim 13 mc02/a 40 ms 57 ms 42 pct
mc02/c 9 ms 13 ms 50 pct
mc02/d 170 ms 256 ms 50 pct
after wl-1884 store all-NULL keys (the tests have pctnull=10 per column)
optim 13 mc02/a 39 ms 59 ms 50 pct
mc02/b 47 ms 77 ms 61 pct
mc02/c 9 ms 12 ms 44 pct
mc02/d 246 ms 289 ms 17 pct
[ after wl-1884 store all-NULL keys (the tests have pctnull=10 per column) ]
[ case d: bug in testOIBasic killed PK read performance ]
optim 14 mc02/a 41 ms 60 ms 44 pct
......@@ -98,8 +98,7 @@ none mc02/a 35 ms 60 ms 71 pct
mc02/c 5 ms 12 ms 106 pct
mc02/d 165 ms 238 ms 44 pct
[ johan re-installed mc02 as fedora gcc-3.3.2 ]
[ case c: table scan has improved... ]
[ johan re-installed mc02 as fedora gcc-3.3.2, tux uses more C++ stuff than tup]
charsets mc02/a 35 ms 60 ms 71 pct
mc02/b 42 ms 84 ms 97 pct
......@@ -118,6 +117,19 @@ optim 15 mc02/a 34 ms 60 ms 72 pct
optim 16 mc02/a 34 ms 53 ms 53 pct
mc02/b 42 ms 75 ms 75 pct
[ case a, b: binary search of bounding node when adding entry ]
[ binary search of bounding node when adding entry ]
none mc02/a 35 ms 53 ms 51 pct
mc02/b 42 ms 75 ms 76 pct
[ rewrote treeAdd / treeRemove ]
optim 17 mc02/a 35 ms 52 ms 49 pct
mc02/b 43 ms 75 ms 75 pct
[ allow slack (2) in interior nodes - almost no effect?? ]
wl-1942 mc02/a 35 ms 52 ms 49 pct
mc02/b 42 ms 75 ms 76 pct
vim: set et:
......@@ -42,7 +42,7 @@ struct Opt {
CHARSET_INFO* m_cs;
bool m_dups;
NdbDictionary::Object::FragmentType m_fragtype;
unsigned m_idxloop;
unsigned m_subsubloop;
const char* m_index;
unsigned m_loop;
bool m_nologging;
......@@ -66,7 +66,7 @@ struct Opt {
m_cs(0),
m_dups(false),
m_fragtype(NdbDictionary::Object::FragUndefined),
m_idxloop(4),
m_subsubloop(4),
m_index(0),
m_loop(1),
m_nologging(false),
......@@ -79,7 +79,7 @@ struct Opt {
m_seed(0),
m_subloop(4),
m_table(0),
m_threads(4),
m_threads(6), // table + 5 indexes
m_v(1) {
}
};
......@@ -208,6 +208,8 @@ struct Par : public Opt {
Set& set() const { assert(m_set != 0); return *m_set; }
Tmr* m_tmr;
Tmr& tmr() const { assert(m_tmr != 0); return *m_tmr; }
unsigned m_lno;
unsigned m_slno;
unsigned m_totrows;
// value calculation
unsigned m_range;
......@@ -226,6 +228,8 @@ struct Par : public Opt {
m_tab(0),
m_set(0),
m_tmr(0),
m_lno(0),
m_slno(0),
m_totrows(m_threads * m_rows),
m_range(m_rows),
m_pctrange(0),
......@@ -2069,7 +2073,8 @@ pkinsert(Par par)
CHK(con.startTransaction() == 0);
Lst lst;
for (unsigned j = 0; j < par.m_rows; j++) {
unsigned i = thrrow(par, j);
unsigned j2 = ! par.m_randomkey ? j : urandom(par.m_rows);
unsigned i = thrrow(par, j2);
set.lock();
if (set.exist(i) || set.pending(i)) {
set.unlock();
......@@ -2174,7 +2179,8 @@ pkdelete(Par par)
Lst lst;
bool deadlock = false;
for (unsigned j = 0; j < par.m_rows; j++) {
unsigned i = thrrow(par, j);
unsigned j2 = ! par.m_randomkey ? j : urandom(par.m_rows);
unsigned i = thrrow(par, j2);
set.lock();
if (! set.exist(i) || set.pending(i)) {
set.unlock();
......@@ -2398,7 +2404,7 @@ static int
scanreadindex(Par par, const ITab& itab)
{
const Tab& tab = par.tab();
for (unsigned i = 0; i < par.m_idxloop; i++) {
for (unsigned i = 0; i < par.m_subsubloop; i++) {
BSet bset(tab, itab, par.m_rows);
bset.calc(par);
CHK(scanreadindex(par, itab, bset) == 0);
......@@ -2645,7 +2651,7 @@ static int
scanupdateindex(Par par, const ITab& itab)
{
const Tab& tab = par.tab();
for (unsigned i = 0; i < par.m_idxloop; i++) {
for (unsigned i = 0; i < par.m_subsubloop; i++) {
BSet bset(tab, itab, par.m_rows);
bset.calc(par);
CHK(scanupdateindex(par, itab, bset) == 0);
......@@ -2685,6 +2691,53 @@ readverify(Par par)
return 0;
}
static int
readverifyfull(Par par)
{
par.m_verify = true;
if (par.m_no == 0)
CHK(scanreadtable(par) == 0);
else {
const Tab& tab = par.tab();
unsigned i = par.m_no;
if (i <= tab.m_itabs && useindex(i)) {
const ITab& itab = tab.m_itab[i - 1];
BSet bset(tab, itab, par.m_rows);
CHK(scanreadindex(par, itab, bset) == 0);
}
}
return 0;
}
static int
pkops(Par par)
{
par.m_randomkey = true;
for (unsigned i = 0; i < par.m_subsubloop; i++) {
unsigned sel = urandom(10);
if (par.m_slno % 2 == 0) {
// favor insert
if (sel < 8) {
CHK(pkinsert(par) == 0);
} else if (sel < 9) {
CHK(pkupdate(par) == 0);
} else {
CHK(pkdelete(par) == 0);
}
} else {
// favor delete
if (sel < 1) {
CHK(pkinsert(par) == 0);
} else if (sel < 2) {
CHK(pkupdate(par) == 0);
} else {
CHK(pkdelete(par) == 0);
}
}
}
return 0;
}
static int
pkupdatescanread(Par par)
{
......@@ -2930,6 +2983,8 @@ runstep(Par par, const char* fname, TFunc func, unsigned mode)
thr.m_par.m_tab = par.m_tab;
thr.m_par.m_set = par.m_set;
thr.m_par.m_tmr = par.m_tmr;
thr.m_par.m_lno = par.m_lno;
thr.m_par.m_slno = par.m_slno;
thr.m_func = func;
thr.start();
}
......@@ -2953,8 +3008,8 @@ tbuild(Par par)
RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
for (unsigned i = 0; i < par.m_subloop; i++) {
if (i % 2 == 0) {
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
if (par.m_slno % 2 == 0) {
RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, pkinsert, MT);
......@@ -2963,9 +3018,10 @@ tbuild(Par par)
RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT);
}
RUNSTEP(par, readverify, MT);
RUNSTEP(par, pkupdate, MT);
RUNSTEP(par, readverifyfull, MT);
RUNSTEP(par, pkdelete, MT);
RUNSTEP(par, readverify, MT);
RUNSTEP(par, readverifyfull, MT);
RUNSTEP(par, dropindex, ST);
}
return 0;
......@@ -2973,6 +3029,22 @@ tbuild(Par par)
static int
tpkops(Par par)
{
RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT);
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkops, MT);
LL2("rows=" << par.set().count());
RUNSTEP(par, readverifyfull, MT);
}
return 0;
}
static int
tpkopsread(Par par)
{
RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST);
......@@ -2981,7 +3053,7 @@ tpkops(Par par)
RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, readverify, ST);
for (unsigned i = 0; i < par.m_subloop; i++) {
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkupdatescanread, MT);
RUNSTEP(par, readverify, ST);
}
......@@ -3000,7 +3072,7 @@ tmixedops(Par par)
RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, readverify, ST);
for (unsigned i = 0; i < par.m_subloop; i++) {
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, mixedoperations, MT);
RUNSTEP(par, readverify, ST);
}
......@@ -3014,7 +3086,7 @@ tbusybuild(Par par)
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
RUNSTEP(par, pkinsert, MT);
for (unsigned i = 0; i < par.m_subloop; i++) {
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkupdateindexbuild, MT);
RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, readverify, ST);
......@@ -3030,7 +3102,7 @@ ttimebuild(Par par)
RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
for (unsigned i = 0; i < par.m_subloop; i++) {
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkinsert, MT);
t1.on();
RUNSTEP(par, createindex, ST);
......@@ -3049,7 +3121,7 @@ ttimemaint(Par par)
RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
for (unsigned i = 0; i < par.m_subloop; i++) {
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkinsert, MT);
t1.on();
RUNSTEP(par, pkupdate, MT);
......@@ -3074,7 +3146,7 @@ ttimescan(Par par)
RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
for (unsigned i = 0; i < par.m_subloop; i++) {
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkinsert, MT);
RUNSTEP(par, createindex, ST);
par.m_tmr = &t1;
......@@ -3096,7 +3168,7 @@ ttimepkread(Par par)
RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
for (unsigned i = 0; i < par.m_subloop; i++) {
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkinsert, MT);
RUNSTEP(par, createindex, ST);
par.m_tmr = &t1;
......@@ -3132,9 +3204,10 @@ struct TCase {
static const TCase
tcaselist[] = {
TCase("a", tbuild, "index build"),
TCase("b", tpkops, "pk operations and scan reads"),
TCase("c", tmixedops, "pk operations and scan operations"),
TCase("d", tbusybuild, "pk operations and index build"),
TCase("b", tpkops, "pk operations"),
TCase("c", tpkopsread, "pk operations and scan reads"),
TCase("d", tmixedops, "pk operations and scan operations"),
TCase("e", tbusybuild, "pk operations and index build"),
TCase("t", ttimebuild, "time index build"),
TCase("u", ttimemaint, "time index maintenance"),
TCase("v", ttimescan, "time full scan table vs index on pk"),
......@@ -3192,10 +3265,10 @@ runtest(Par par)
Thr& thr = *g_thrlist[n];
assert(thr.m_thread != 0);
}
for (unsigned l = 0; par.m_loop == 0 || l < par.m_loop; l++) {
LL1("loop " << l);
for (par.m_lno = 0; par.m_loop == 0 || par.m_lno < par.m_loop; par.m_lno++) {
LL1("loop " << par.m_lno);
if (par.m_seed == 0)
srandom(l);
srandom(par.m_lno);
for (unsigned i = 0; i < tcasecount; i++) {
const TCase& tcase = tcaselist[i];
if (par.m_case != 0 && strchr(par.m_case, tcase.m_name[0]) == 0)
......
......@@ -263,8 +263,8 @@ int ha_example::write_row(byte * buf)
clause was used. Consecutive ordering is not guarenteed.
Currently new_data will not have an updated auto_increament record, or
and updated timestamp field. You can do these for example by doing these:
if (table->timestamp_on_update_now)
update_timestamp(new_row+table->timestamp_on_update_now-1);
if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE)
table->timestamp_field->set_time();
if (table->next_number_field && record == table->record[0])
update_auto_increment();
......
......@@ -428,8 +428,8 @@ int ha_tina::write_row(byte * buf)
statistic_increment(ha_write_count,&LOCK_status);
if (table->timestamp_default_now)
update_timestamp(buf+table->timestamp_default_now-1);
if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
table->timestamp_field->set_time();
size= encode_quote(buf);
......@@ -464,8 +464,8 @@ int ha_tina::update_row(const byte * old_data, byte * new_data)
statistic_increment(ha_update_count,&LOCK_status);
if (table->timestamp_default_now)
update_timestamp(new_data+table->timestamp_default_now-1);
if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE)
table->timestamp_field->set_time();
size= encode_quote(new_data);
......
......@@ -5468,4 +5468,29 @@ innobase_get_at_most_n_mbchars(
}
}
extern "C" {
/**********************************************************************
This function returns true if SQL-query in the current thread
is either REPLACE or LOAD DATA INFILE REPLACE.
NOTE that /mysql/innobase/row/row0ins.c must contain the
prototype for this function ! */
ibool
innobase_query_is_replace(void)
/*===========================*/
{
THD* thd;
thd = (THD *)innobase_current_thd();
if ( thd->lex->sql_command == SQLCOM_REPLACE ||
( thd->lex->sql_command == SQLCOM_LOAD &&
thd->lex->duplicates == DUP_REPLACE )) {
return true;
} else {
return false;
}
}
}
#endif /* HAVE_INNOBASE_DB */
......@@ -35,7 +35,7 @@
update user set password=PASSWORD("hello") where user="test"
This saves a hashed number as a string in the password field.
The new autentication is performed in following manner:
The new authentication is performed in following manner:
SERVER: public_seed=create_random_string()
send(public_seed)
......
CHARSET_INFO
============
A structure containing data for charset+collation pair implementation.
Virtual functions which use this data are collected
into separate structures MY_CHARSET_HANDLER and
MY_COLLATION_HANDLER.
typedef struct charset_info_st
{
uint number;
uint primary_number;
uint binary_number;
uint state;
const char *csname;
const char *name;
const char *comment;
uchar *ctype;
uchar *to_lower;
uchar *to_upper;
uchar *sort_order;
uint16 *tab_to_uni;
MY_UNI_IDX *tab_from_uni;
uchar state_map[256];
uchar ident_map[256];
uint strxfrm_multiply;
uint mbminlen;
uint mbmaxlen;
char max_sort_char; /* For LIKE optimization */
MY_CHARSET_HANDLER *cset;
MY_COLLATION_HANDLER *coll;
} CHARSET_INFO;
CHARSET_INFO fields description:
===============================
Numbers (identifiers)
---------------------
number - an ID uniquely identifying this charset+collation pair.
primary_number - ID of a charset+collation pair, which consists
of the same character set and the default collation of this
character set. Not really used now. Intended to optimize some
parts of the code where we need to find the default collation
using its non-default counterpart for the given character set.
binary_numner - ID of a charset+collation pair, which consists
of the same character set and the binary collation of this
character set. Not really used now. Intended to optimize
"SELECT BINARY x" in the future.
Names
-----
csname - name of the character set for this charset+collation pair.
name - name of the collation for this charset+collation pair.
comment - a text comment, dysplayed in "Description" column of
SHOW CHARACTER SET output.
Conversion tables
-----------------
ctype - pointer to array[257] of "type of characters"
bit mask for each chatacter, e.g. if a
character is a digit or a letter or a separator, etc.
to_lower - pointer to arrat[256] used in LCASE()
to_upper - pointer to array[256] used in UCASE()
sort_order - pointer to array[256] used for strings comparison
Unicode conversion data
-----------------------
For 8bit character sets:
tab_to_uni : array[256] of charset->Unicode translation
tab_from_uni: a structure for Unicode->charset translation
Non-8 bit charsets have their own structures per charset
hidden in correspondent ctype-xxx.c file and don't use
tab_to_uni and tab_from_uni tables.
Parser maps
-----------
state_map[]
ident_map[]
These maps are to quickly identify if a character is
an identificator part, a digit, a special character,
or a part of other SQL language lexical item.
Probably can be combined with ctype array in the future.
But for some reasons these two arrays are used in the parser,
while a separate ctype[] array is used in the other part of the
code, like fulltext, etc.
Misc fields
-----------
strxfrm_multiply - how many times a sort key (i.e. a string
which can be passed into memcmp() for comparison)
can be longer than the original string.
Usually it is 1. For some complex
collations it can be bigger. For example
in latin1_german2_ci, a sort key is up to
twice longer than the original string.
e.g. Letter 'A' with two dots above is
substituted with 'AE'.
mbminlen - mininum multibyte sequence length.
Now always 1 accept ucs2. For ucs2
it is 2.
mbmaxlen - maximum multibyte sequence length.
1 for 8bit charsets. Can be also 2 or 3.
MY_CHARSET_HANDLER
==================
MY_CHARSET_HANDLER is a collection of character-set
related routines. Defined in m_ctype.h. Have the
following set of functions:
Multibyte routines
------------------
ismbchar() - detects if the given string is a multibyte sequence
mbcharlen() - retuturns length of multibyte sequence starting with
the given character
numchars() - returns number of characters in the given string, e.g.
in SQL function CHAR_LENGTH().
charpos() - calculates the offset of the given position in the string.
Used in SQL functions LEFT(), RIGHT(), SUBSTRING(),
INSERT()
well_formed_length()
- finds the length of correctly formed multybyte beginning.
Used in INSERTs to cut a beginning of the given string
which is
a) "well formed" according to the given character set.
b) can fit into the given data type
Terminates the string in the good position, taking in account
multibyte character boundaries.
lengthsp() - returns the length of the given string without traling spaces.
Unicode conversion routines
---------------------------
mb_wc - converts the left multibyte sequence into it Unicode code.
mc_mb - converts the given Unicode code into multibyte sequence.
Case and sort convertion
------------------------
caseup_str - converts the given 0-terminated string into the upper case
casedn_str - converts the given 0-terminated string into the lower case
caseup - converts the given string into the lower case using length
casedn - converts the given string into the lower case using length
Number-to-string conversion routines
------------------------------------
snprintf()
long10_to_str()
longlong10_to_str()
The names are pretty self-descripting.
String padding routines
-----------------------
fill() - writes the given Unicode value into the given string
with the given length. Used to pad the string, usually
with space character, according to the given charset.
String-to-numner conversion routines
------------------------------------
strntol()
strntoul()
strntoll()
strntoull()
strntod()
These functions are almost for the same thing with their
STDLIB counterparts, but also:
- accept length instead of 0-terminator
- and are character set dependant
Simple scanner routines
-----------------------
scan() - to skip leading spaces in the given string.
Used when a string value is inserted into a numeric field.
MY_COLLATION_HANDLER
====================
strnncoll() - compares two strings according to the given collation
strnncollsp() - like the above but ignores trailing spaces
strnxfrm() - makes a sort key suitable for memcmp() corresponding
to the given string
like_range() - creates a LIKE range, for optimizer
wildcmp() - wildcard comparison, for LIKE
strcasecmp() - 0-terminated string comparison
instr() - finds the first substring appearence in the string
hash_sort() - calculates hash value taking in account
the collation rules, e.g. case-insensitivity,
accent sensitivity, etc.
\ No newline at end of file
......@@ -123,8 +123,7 @@ int my_strcasecmp_mb(CHARSET_INFO * cs,const char *s, const char *t)
** 1 if matched with wildcard
*/
#define INC_PTR(cs,A,B) A+=((use_mb_flag && \
my_ismbchar(cs,A,B)) ? my_ismbchar(cs,A,B) : 1)
#define INC_PTR(cs,A,B) A+=(my_ismbchar(cs,A,B) ? my_ismbchar(cs,A,B) : 1)
#define likeconv(s,A) (uchar) (s)->sort_order[(uchar) (A)]
......@@ -135,8 +134,6 @@ int my_wildcmp_mb(CHARSET_INFO *cs,
{
int result= -1; /* Not found, using wildcards */
bool use_mb_flag=use_mb(cs);
while (wildstr != wildend)
{
while (*wildstr != w_many && *wildstr != w_one)
......@@ -144,8 +141,7 @@ int my_wildcmp_mb(CHARSET_INFO *cs,
int l;
if (*wildstr == escape && wildstr+1 != wildend)
wildstr++;
if (use_mb_flag &&
(l = my_ismbchar(cs, wildstr, wildend)))
if ((l = my_ismbchar(cs, wildstr, wildend)))
{
if (str+l > str_end || memcmp(str, wildstr, l) != 0)
return 1;
......@@ -200,14 +196,10 @@ int my_wildcmp_mb(CHARSET_INFO *cs,
cmp= *++wildstr;
mb=wildstr;
LINT_INIT(mblen);
if (use_mb_flag)
mblen = my_ismbchar(cs, wildstr, wildend);
mblen= my_ismbchar(cs, wildstr, wildend);
INC_PTR(cs,wildstr,wildend); /* This is compared trough cmp */
cmp=likeconv(cs,cmp);
do
{
if (use_mb_flag)
{
for (;;)
{
......@@ -229,13 +221,6 @@ int my_wildcmp_mb(CHARSET_INFO *cs,
}
INC_PTR(cs,str, str_end);
}
}
else
{
while (str != str_end && likeconv(cs,*str) != cmp)
str++;
if (str++ == str_end) return (-1);
}
{
int tmp=my_wildcmp_mb(cs,str,str_end,wildstr,wildend,escape,w_one,
w_many);
......@@ -555,8 +540,6 @@ static int my_wildcmp_mb_bin(CHARSET_INFO *cs,
{
int result= -1; /* Not found, using wildcards */
bool use_mb_flag=use_mb(cs);
while (wildstr != wildend)
{
while (*wildstr != w_many && *wildstr != w_one)
......@@ -564,8 +547,7 @@ static int my_wildcmp_mb_bin(CHARSET_INFO *cs,
int l;
if (*wildstr == escape && wildstr+1 != wildend)
wildstr++;
if (use_mb_flag &&
(l = my_ismbchar(cs, wildstr, wildend)))
if ((l = my_ismbchar(cs, wildstr, wildend)))
{
if (str+l > str_end || memcmp(str, wildstr, l) != 0)
return 1;
......@@ -620,13 +602,9 @@ static int my_wildcmp_mb_bin(CHARSET_INFO *cs,
cmp= *++wildstr;
mb=wildstr;
LINT_INIT(mblen);
if (use_mb_flag)
mblen = my_ismbchar(cs, wildstr, wildend);
mblen= my_ismbchar(cs, wildstr, wildend);
INC_PTR(cs,wildstr,wildend); /* This is compared trough cmp */
do
{
if (use_mb_flag)
{
for (;;)
{
......@@ -647,13 +625,6 @@ static int my_wildcmp_mb_bin(CHARSET_INFO *cs,
}
INC_PTR(cs,str, str_end);
}
}
else
{
while (str != str_end && *str != cmp)
str++;
if (str++ == str_end) return (-1);
}
{
int tmp=my_wildcmp_mb_bin(cs,str,str_end,wildstr,wildend,escape,w_one,w_many);
if (tmp <= 0)
......
......@@ -1231,171 +1231,13 @@ uint my_lengthsp_ucs2(CHARSET_INFO *cs __attribute__((unused)),
}
/*
** Compare string against string with wildcard
** 0 if matched
** -1 if not matched with wildcard
** 1 if matched with wildcard
*/
static
int my_wildcmp_ucs2(CHARSET_INFO *cs,
const char *str,const char *str_end,
const char *wildstr,const char *wildend,
int escape, int w_one, int w_many,
MY_UNICASE_INFO **weights)
{
int result= -1; /* Not found, using wildcards */
my_wc_t s_wc, w_wc;
int scan, plane;
while (wildstr != wildend)
{
while (1)
{
scan= my_ucs2_uni(cs,&w_wc, (const uchar*)wildstr,
(const uchar*)wildend);
if (scan <= 0)
return 1;
if (w_wc == (my_wc_t)escape)
{
wildstr+= scan;
scan= my_ucs2_uni(cs,&w_wc, (const uchar*)wildstr,
(const uchar*)wildend);
if (scan <= 0)
return 1;
}
if (w_wc == (my_wc_t)w_many)
{
result= 1; /* Found an anchor char */
break;
}
wildstr+= scan;
scan= my_ucs2_uni(cs, &s_wc, (const uchar*)str, (const uchar*)str_end);
if (scan <=0)
return 1;
str+= scan;
if (w_wc == (my_wc_t)w_one)
{
result= 1; /* Found an anchor char */
}
else
{
if (weights)
{
plane=(s_wc>>8) & 0xFF;
s_wc = weights[plane] ? weights[plane][s_wc & 0xFF].sort : s_wc;
plane=(w_wc>>8) & 0xFF;
w_wc = weights[plane] ? weights[plane][w_wc & 0xFF].sort : w_wc;
}
if (s_wc != w_wc)
return 1; /* No match */
}
if (wildstr == wildend)
return (str != str_end); /* Match if both are at end */
}
if (w_wc == (my_wc_t)w_many)
{ /* Found w_many */
/* Remove any '%' and '_' from the wild search string */
for ( ; wildstr != wildend ; )
{
scan= my_ucs2_uni(cs,&w_wc, (const uchar*)wildstr,
(const uchar*)wildend);
if (scan <= 0)
return 1;
if (w_wc == (my_wc_t)w_many)
{
wildstr+= scan;
continue;
}
if (w_wc == (my_wc_t)w_one)
{
wildstr+= scan;
scan= my_ucs2_uni(cs, &s_wc, (const uchar*)str,
(const uchar*)str_end);
if (scan <=0)
return 1;
str+= scan;
continue;
}
break; /* Not a wild character */
}
if (wildstr == wildend)
return 0; /* Ok if w_many is last */
if (str == str_end)
return -1;
scan= my_ucs2_uni(cs,&w_wc, (const uchar*)wildstr,
(const uchar*)wildend);
if (scan <= 0)
return 1;
if (w_wc == (my_wc_t)escape)
{
wildstr+= scan;
scan= my_ucs2_uni(cs,&w_wc, (const uchar*)wildstr,
(const uchar*)wildend);
if (scan <= 0)
return 1;
}
while (1)
{
/* Skip until the first character from wildstr is found */
while (str != str_end)
{
scan= my_ucs2_uni(cs,&s_wc, (const uchar*)str,
(const uchar*)str_end);
if (scan <= 0)
return 1;
if (weights)
{
plane=(s_wc>>8) & 0xFF;
s_wc = weights[plane] ? weights[plane][s_wc & 0xFF].sort : s_wc;
plane=(w_wc>>8) & 0xFF;
w_wc = weights[plane] ? weights[plane][w_wc & 0xFF].sort : w_wc;
}
if (s_wc == w_wc)
break;
str+= scan;
}
if (str == str_end)
return -1;
result= my_wildcmp_ucs2(cs,str,str_end,wildstr,wildend,escape,
w_one,w_many,weights);
if (result <= 0)
return result;
str+= scan;
}
}
}
return (str != str_end ? 1 : 0);
}
static
int my_wildcmp_ucs2_ci(CHARSET_INFO *cs,
const char *str,const char *str_end,
const char *wildstr,const char *wildend,
int escape, int w_one, int w_many)
{
return my_wildcmp_ucs2(cs,str,str_end,wildstr,wildend,
return my_wildcmp_unicode(cs,str,str_end,wildstr,wildend,
escape,w_one,w_many,uni_plane);
}
......@@ -1406,7 +1248,7 @@ int my_wildcmp_ucs2_bin(CHARSET_INFO *cs,
const char *wildstr,const char *wildend,
int escape, int w_one, int w_many)
{
return my_wildcmp_ucs2(cs,str,str_end,wildstr,wildend,
return my_wildcmp_unicode(cs,str,str_end,wildstr,wildend,
escape,w_one,w_many,NULL);
}
......
......@@ -1518,6 +1518,161 @@ MY_UNICASE_INFO *uni_plane[256]={
};
/*
** Compare string against string with wildcard
** This function is used in UTF8 and UCS2
**
** 0 if matched
** -1 if not matched with wildcard
** 1 if matched with wildcard
*/
int my_wildcmp_unicode(CHARSET_INFO *cs,
const char *str,const char *str_end,
const char *wildstr,const char *wildend,
int escape, int w_one, int w_many,
MY_UNICASE_INFO **weights)
{
int result= -1; /* Not found, using wildcards */
my_wc_t s_wc, w_wc;
int scan, plane;
int (*mb_wc)(struct charset_info_st *cs, my_wc_t *wc,
const unsigned char *s,const unsigned char *e);
mb_wc= cs->cset->mb_wc;
while (wildstr != wildend)
{
while (1)
{
if ((scan= mb_wc(cs, &w_wc, (const uchar*)wildstr,
(const uchar*)wildend)) <= 0)
return 1;
if (w_wc == (my_wc_t)escape)
{
wildstr+= scan;
if ((scan= mb_wc(cs,&w_wc, (const uchar*)wildstr,
(const uchar*)wildend)) <= 0)
return 1;
}
if (w_wc == (my_wc_t)w_many)
{
result= 1; /* Found an anchor char */
break;
}
wildstr+= scan;
if ((scan= mb_wc(cs, &s_wc, (const uchar*)str,
(const uchar*)str_end)) <=0)
return 1;
str+= scan;
if (w_wc == (my_wc_t)w_one)
{
result= 1; /* Found an anchor char */
}
else
{
if (weights)
{
plane=(s_wc>>8) & 0xFF;
s_wc = weights[plane] ? weights[plane][s_wc & 0xFF].sort : s_wc;
plane=(w_wc>>8) & 0xFF;
w_wc = weights[plane] ? weights[plane][w_wc & 0xFF].sort : w_wc;
}
if (s_wc != w_wc)
return 1; /* No match */
}
if (wildstr == wildend)
return (str != str_end); /* Match if both are at end */
}
if (w_wc == (my_wc_t)w_many)
{ /* Found w_many */
/* Remove any '%' and '_' from the wild search string */
for ( ; wildstr != wildend ; )
{
if ((scan= mb_wc(cs, &w_wc, (const uchar*)wildstr,
(const uchar*)wildend)) <= 0)
return 1;
if (w_wc == (my_wc_t)w_many)
{
wildstr+= scan;
continue;
}
if (w_wc == (my_wc_t)w_one)
{
wildstr+= scan;
if ((scan= mb_wc(cs, &s_wc, (const uchar*)str,
(const uchar*)str_end)) <=0)
return 1;
str+= scan;
continue;
}
break; /* Not a wild character */
}
if (wildstr == wildend)
return 0; /* Ok if w_many is last */
if (str == str_end)
return -1;
if ((scan= mb_wc(cs, &w_wc, (const uchar*)wildstr,
(const uchar*)wildend)) <=0)
return 1;
if (w_wc == (my_wc_t)escape)
{
wildstr+= scan;
if ((scan= mb_wc(cs, &w_wc, (const uchar*)wildstr,
(const uchar*)wildend)) <=0)
return 1;
}
while (1)
{
/* Skip until the first character from wildstr is found */
while (str != str_end)
{
if ((scan= mb_wc(cs, &s_wc, (const uchar*)str,
(const uchar*)str_end)) <=0)
return 1;
if (weights)
{
plane=(s_wc>>8) & 0xFF;
s_wc = weights[plane] ? weights[plane][s_wc & 0xFF].sort : s_wc;
plane=(w_wc>>8) & 0xFF;
w_wc = weights[plane] ? weights[plane][w_wc & 0xFF].sort : w_wc;
}
if (s_wc == w_wc)
break;
str+= scan;
}
if (str == str_end)
return -1;
result= my_wildcmp_unicode(cs, str, str_end, wildstr, wildend,
escape, w_one, w_many,
weights);
if (result <= 0)
return result;
str+= scan;
}
}
}
return (str != str_end ? 1 : 0);
}
#endif
......@@ -1992,6 +2147,17 @@ static int my_strcasecmp_utf8(CHARSET_INFO *cs, const char *s, const char *t)
return my_strncasecmp_utf8(cs, s, t, len);
}
static
int my_wildcmp_utf8(CHARSET_INFO *cs,
const char *str,const char *str_end,
const char *wildstr,const char *wildend,
int escape, int w_one, int w_many)
{
return my_wildcmp_unicode(cs,str,str_end,wildstr,wildend,
escape,w_one,w_many,uni_plane);
}
static int my_strnxfrm_utf8(CHARSET_INFO *cs,
uchar *dst, uint dstlen,
const uchar *src, uint srclen)
......@@ -2060,7 +2226,7 @@ static MY_COLLATION_HANDLER my_collation_ci_handler =
my_strnncollsp_utf8,
my_strnxfrm_utf8,
my_like_range_mb,
my_wildcmp_mb,
my_wildcmp_utf8,
my_strcasecmp_utf8,
my_instr_mb,
my_hash_sort_utf8
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment