Commit d1e7ef79 authored by unknown's avatar unknown

ha_ndb blobs 2

parent 9864327a
drop table if exists t1;
set autocommit=0;
create table t1 (
a int not null primary key,
b text not null,
c int not null,
d longblob,
key (c)
) engine=ndbcluster;
set @x0 = '01234567012345670123456701234567';
set @x0 = concat(@x0,@x0,@x0,@x0,@x0,@x0,@x0,@x0);
set @b1 = 'b1';
set @b1 = concat(@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1);
set @b1 = concat(@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1);
set @b1 = concat(@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1);
set @b1 = concat(@b1,@x0);
set @d1 = 'dd1';
set @d1 = concat(@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1);
set @d1 = concat(@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1);
set @d1 = concat(@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1);
set @b2 = 'b2';
set @b2 = concat(@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2);
set @b2 = concat(@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2);
set @b2 = concat(@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2);
set @b2 = concat(@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2);
set @d2 = 'dd2';
set @d2 = concat(@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2);
set @d2 = concat(@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2);
set @d2 = concat(@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2);
set @d2 = concat(@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2);
select length(@x0),length(@b1),length(@d1) from dual;
length(@x0) length(@b1) length(@d1)
256 2256 3000
select length(@x0),length(@b2),length(@d2) from dual;
length(@x0) length(@b2) length(@d2)
256 20000 30000
insert into t1 values(1,@b1,111,@d1);
insert into t1 values(2,@b2,222,@d2);
commit;
explain select * from t1 where a = 1;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE t1 const PRIMARY PRIMARY 4 const 1
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where a=1;
a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3)
1 2256 b1 3000 dd1
select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3)
from t1 where a=2;
a length(b) substr(b,1+2*9000,2) length(d) substr(d,1+3*9000,3)
2 20000 b2 30000 dd2
update t1 set b=@b2,d=@d2 where a=1;
update t1 set b=@b1,d=@d1 where a=2;
commit;
select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3)
from t1 where a=1;
a length(b) substr(b,1+2*9000,2) length(d) substr(d,1+3*9000,3)
1 20000 b2 30000 dd2
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where a=2;
a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3)
2 2256 b1 3000 dd1
update t1 set b=concat(b,b),d=concat(d,d) where a=1;
update t1 set b=concat(b,b),d=concat(d,d) where a=2;
commit;
select a,length(b),substr(b,1+4*9000,2),length(d),substr(d,1+6*9000,3)
from t1 where a=1;
a length(b) substr(b,1+4*9000,2) length(d) substr(d,1+6*9000,3)
1 40000 b2 60000 dd2
select a,length(b),substr(b,1+4*900,2),length(d),substr(d,1+6*900,3)
from t1 where a=2;
a length(b) substr(b,1+4*900,2) length(d) substr(d,1+6*900,3)
2 4512 b1 6000 dd1
update t1 set d=null where a=1;
commit;
select a from t1 where d is null;
a
1
delete from t1 where a=1;
delete from t1 where a=2;
commit;
select count(*) from t1;
count(*)
0
insert into t1 values(1,@b1,111,@d1);
insert into t1 values(2,@b2,222,@d2);
commit;
explain select * from t1 where c = 111;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE t1 ref c c 4 const 10 Using where
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where c=111;
a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3)
1 2256 b1 3000 dd1
select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3)
from t1 where c=222;
a length(b) substr(b,1+2*9000,2) length(d) substr(d,1+3*9000,3)
2 20000 b2 30000 dd2
update t1 set b=@b2,d=@d2 where c=111;
update t1 set b=@b1,d=@d1 where c=222;
commit;
select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3)
from t1 where c=111;
a length(b) substr(b,1+2*9000,2) length(d) substr(d,1+3*9000,3)
1 20000 b2 30000 dd2
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where c=222;
a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3)
2 2256 b1 3000 dd1
update t1 set d=null where c=111;
commit;
select a from t1 where d is null;
a
1
delete from t1 where c=111;
delete from t1 where c=222;
commit;
select count(*) from t1;
count(*)
0
insert into t1 values(1,'b1',111,'dd1');
insert into t1 values(2,'b2',222,'dd2');
insert into t1 values(3,'b3',333,'dd3');
insert into t1 values(4,'b4',444,'dd4');
insert into t1 values(5,'b5',555,'dd5');
insert into t1 values(6,'b6',666,'dd6');
insert into t1 values(7,'b7',777,'dd7');
insert into t1 values(8,'b8',888,'dd8');
insert into t1 values(9,'b9',999,'dd9');
commit;
explain select * from t1;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE t1 ALL NULL NULL NULL NULL 100
select * from t1 order by a;
a b c d
1 b1 111 dd1
2 b2 222 dd2
3 b3 333 dd3
4 b4 444 dd4
5 b5 555 dd5
6 b6 666 dd6
7 b7 777 dd7
8 b8 888 dd8
9 b9 999 dd9
update t1 set b=concat(a,'x',b),d=concat(a,'x',d);
commit;
select * from t1 order by a;
a b c d
1 1xb1 111 1xdd1
2 2xb2 222 2xdd2
3 3xb3 333 3xdd3
4 4xb4 444 4xdd4
5 5xb5 555 5xdd5
6 6xb6 666 6xdd6
7 7xb7 777 7xdd7
8 8xb8 888 8xdd8
9 9xb9 999 9xdd9
delete from t1;
commit;
select count(*) from t1;
count(*)
0
insert into t1 values(1,@b1,111,@d1);
insert into t1 values(2,@b2,222,@d2);
commit;
explain select * from t1;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE t1 ALL NULL NULL NULL NULL 100
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 order by a;
a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3)
1 2256 b1 3000 dd1
2 20000 b2 30000 dd2
update t1 set b=concat(b,b),d=concat(d,d);
commit;
select a,length(b),substr(b,1+4*9000,2),length(d),substr(d,1+6*9000,3)
from t1 order by a;
a length(b) substr(b,1+4*9000,2) length(d) substr(d,1+6*9000,3)
1 4512 6000
2 40000 b2 60000 dd2
delete from t1;
commit;
select count(*) from t1;
count(*)
0
insert into t1 values(1,'b1',111,'dd1');
insert into t1 values(2,'b2',222,'dd2');
insert into t1 values(3,'b3',333,'dd3');
insert into t1 values(4,'b4',444,'dd4');
insert into t1 values(5,'b5',555,'dd5');
insert into t1 values(6,'b6',666,'dd6');
insert into t1 values(7,'b7',777,'dd7');
insert into t1 values(8,'b8',888,'dd8');
insert into t1 values(9,'b9',999,'dd9');
commit;
explain select * from t1 where c >= 100 order by a;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE t1 range c c 4 NULL 10 Using where; Using filesort
select * from t1 where c >= 100 order by a;
a b c d
1 b1 111 dd1
2 b2 222 dd2
3 b3 333 dd3
4 b4 444 dd4
5 b5 555 dd5
6 b6 666 dd6
7 b7 777 dd7
8 b8 888 dd8
9 b9 999 dd9
update t1 set b=concat(a,'x',b),d=concat(a,'x',d)
where c >= 100;
commit;
select * from t1 where c >= 100 order by a;
a b c d
1 1xb1 111 1xdd1
2 2xb2 222 2xdd2
3 3xb3 333 3xdd3
4 4xb4 444 4xdd4
5 5xb5 555 5xdd5
6 6xb6 666 6xdd6
7 7xb7 777 7xdd7
8 8xb8 888 8xdd8
9 9xb9 999 9xdd9
delete from t1 where c >= 100;
commit;
select count(*) from t1;
count(*)
0
insert into t1 values(1,@b1,111,@d1);
insert into t1 values(2,@b2,222,@d2);
commit;
explain select * from t1 where c >= 100 order by a;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE t1 range c c 4 NULL 10 Using where; Using filesort
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where c >= 100 order by a;
a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3)
1 2256 b1 3000 dd1
2 20000 b2 30000 dd2
update t1 set b=concat(b,b),d=concat(d,d);
commit;
select a,length(b),substr(b,1+4*9000,2),length(d),substr(d,1+6*9000,3)
from t1 where c >= 100 order by a;
a length(b) substr(b,1+4*9000,2) length(d) substr(d,1+6*9000,3)
1 4512 6000
2 40000 b2 60000 dd2
delete from t1 where c >= 100;
commit;
select count(*) from t1;
count(*)
0
insert into t1 values(1,@b1,111,@d1);
insert into t1 values(2,@b2,222,@d2);
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where a = 0;
a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3)
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where a = 1;
a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3)
1 2256 b1 3000 dd1
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where a = 2;
a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3)
2 20000 b2 30000 dd2
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 order by a;
a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3)
1 2256 b1 3000 dd1
2 20000 b2 30000 dd2
rollback;
select count(*) from t1;
count(*)
0
--source include/have_ndb.inc
--disable_warnings
drop table if exists t1;
--enable_warnings
#
# Minimal NDB blobs test.
#
# On NDB API level there is an extensive test program "testBlobs".
# A prerequisite for this handler test is that "testBlobs" succeeds.
#
# make test harder with autocommit off
set autocommit=0;
create table t1 (
a int not null primary key,
b text not null,
c int not null,
d longblob,
key (c)
) engine=ndbcluster;
# -- values --
# x0 size 256 (current inline size)
set @x0 = '01234567012345670123456701234567';
set @x0 = concat(@x0,@x0,@x0,@x0,@x0,@x0,@x0,@x0);
# b1 length 2000+256 (blob part aligned)
set @b1 = 'b1';
set @b1 = concat(@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1);
set @b1 = concat(@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1);
set @b1 = concat(@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1);
set @b1 = concat(@b1,@x0);
# d1 length 3000
set @d1 = 'dd1';
set @d1 = concat(@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1);
set @d1 = concat(@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1);
set @d1 = concat(@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1);
# b2 length 20000
set @b2 = 'b2';
set @b2 = concat(@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2);
set @b2 = concat(@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2);
set @b2 = concat(@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2);
set @b2 = concat(@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2);
# d2 length 30000
set @d2 = 'dd2';
set @d2 = concat(@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2);
set @d2 = concat(@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2);
set @d2 = concat(@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2);
set @d2 = concat(@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2);
select length(@x0),length(@b1),length(@d1) from dual;
select length(@x0),length(@b2),length(@d2) from dual;
# -- pk ops --
insert into t1 values(1,@b1,111,@d1);
insert into t1 values(2,@b2,222,@d2);
commit;
explain select * from t1 where a = 1;
# pk read
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where a=1;
select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3)
from t1 where a=2;
# pk update
update t1 set b=@b2,d=@d2 where a=1;
update t1 set b=@b1,d=@d1 where a=2;
commit;
select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3)
from t1 where a=1;
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where a=2;
# pk update
update t1 set b=concat(b,b),d=concat(d,d) where a=1;
update t1 set b=concat(b,b),d=concat(d,d) where a=2;
commit;
select a,length(b),substr(b,1+4*9000,2),length(d),substr(d,1+6*9000,3)
from t1 where a=1;
select a,length(b),substr(b,1+4*900,2),length(d),substr(d,1+6*900,3)
from t1 where a=2;
# pk update to null
update t1 set d=null where a=1;
commit;
select a from t1 where d is null;
# pk delete
delete from t1 where a=1;
delete from t1 where a=2;
commit;
select count(*) from t1;
# -- hash index ops --
insert into t1 values(1,@b1,111,@d1);
insert into t1 values(2,@b2,222,@d2);
commit;
explain select * from t1 where c = 111;
# hash key read
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where c=111;
select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3)
from t1 where c=222;
# hash key update
update t1 set b=@b2,d=@d2 where c=111;
update t1 set b=@b1,d=@d1 where c=222;
commit;
select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3)
from t1 where c=111;
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where c=222;
# hash key update to null
update t1 set d=null where c=111;
commit;
select a from t1 where d is null;
# hash key delete
delete from t1 where c=111;
delete from t1 where c=222;
commit;
select count(*) from t1;
# -- table scan ops, short values --
insert into t1 values(1,'b1',111,'dd1');
insert into t1 values(2,'b2',222,'dd2');
insert into t1 values(3,'b3',333,'dd3');
insert into t1 values(4,'b4',444,'dd4');
insert into t1 values(5,'b5',555,'dd5');
insert into t1 values(6,'b6',666,'dd6');
insert into t1 values(7,'b7',777,'dd7');
insert into t1 values(8,'b8',888,'dd8');
insert into t1 values(9,'b9',999,'dd9');
commit;
explain select * from t1;
# table scan read
select * from t1 order by a;
# table scan update
update t1 set b=concat(a,'x',b),d=concat(a,'x',d);
commit;
select * from t1 order by a;
# table scan delete
delete from t1;
commit;
select count(*) from t1;
# -- table scan ops, long values --
insert into t1 values(1,@b1,111,@d1);
insert into t1 values(2,@b2,222,@d2);
commit;
explain select * from t1;
# table scan read
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 order by a;
# table scan update
update t1 set b=concat(b,b),d=concat(d,d);
commit;
select a,length(b),substr(b,1+4*9000,2),length(d),substr(d,1+6*9000,3)
from t1 order by a;
# table scan delete
delete from t1;
commit;
select count(*) from t1;
# -- range scan ops, short values --
insert into t1 values(1,'b1',111,'dd1');
insert into t1 values(2,'b2',222,'dd2');
insert into t1 values(3,'b3',333,'dd3');
insert into t1 values(4,'b4',444,'dd4');
insert into t1 values(5,'b5',555,'dd5');
insert into t1 values(6,'b6',666,'dd6');
insert into t1 values(7,'b7',777,'dd7');
insert into t1 values(8,'b8',888,'dd8');
insert into t1 values(9,'b9',999,'dd9');
commit;
explain select * from t1 where c >= 100 order by a;
# range scan read
select * from t1 where c >= 100 order by a;
# range scan update
update t1 set b=concat(a,'x',b),d=concat(a,'x',d)
where c >= 100;
commit;
select * from t1 where c >= 100 order by a;
# range scan delete
delete from t1 where c >= 100;
commit;
select count(*) from t1;
# -- range scan ops, long values --
insert into t1 values(1,@b1,111,@d1);
insert into t1 values(2,@b2,222,@d2);
commit;
explain select * from t1 where c >= 100 order by a;
# range scan read
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where c >= 100 order by a;
# range scan update
update t1 set b=concat(b,b),d=concat(d,d);
commit;
select a,length(b),substr(b,1+4*9000,2),length(d),substr(d,1+6*9000,3)
from t1 where c >= 100 order by a;
# range scan delete
delete from t1 where c >= 100;
commit;
select count(*) from t1;
# -- rollback --
insert into t1 values(1,@b1,111,@d1);
insert into t1 values(2,@b2,222,@d2);
# 626
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where a = 0;
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where a = 1;
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where a = 2;
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 order by a;
rollback;
select count(*) from t1;
--drop table t1;
...@@ -181,6 +181,21 @@ bool ha_ndbcluster::get_error_message(int error, ...@@ -181,6 +181,21 @@ bool ha_ndbcluster::get_error_message(int error,
} }
/*
Check if type is supported by NDB.
*/
static inline bool ndb_supported_type(enum_field_types type)
{
switch (type) {
case MYSQL_TYPE_NULL:
case MYSQL_TYPE_GEOMETRY:
return false;
}
return true;
}
/* /*
Instruct NDB to set the value of the hidden primary key Instruct NDB to set the value of the hidden primary key
*/ */
...@@ -208,40 +223,15 @@ int ha_ndbcluster::set_ndb_key(NdbOperation *ndb_op, Field *field, ...@@ -208,40 +223,15 @@ int ha_ndbcluster::set_ndb_key(NdbOperation *ndb_op, Field *field,
pack_len)); pack_len));
DBUG_DUMP("key", (char*)field_ptr, pack_len); DBUG_DUMP("key", (char*)field_ptr, pack_len);
switch (field->type()) { if (ndb_supported_type(field->type()))
case MYSQL_TYPE_DECIMAL: {
case MYSQL_TYPE_TINY: if (! (field->flags & BLOB_FLAG))
case MYSQL_TYPE_SHORT: // Common implementation for most field types
case MYSQL_TYPE_LONG: DBUG_RETURN(ndb_op->equal(fieldnr, (char*) field_ptr, pack_len) != 0);
case MYSQL_TYPE_FLOAT:
case MYSQL_TYPE_DOUBLE:
case MYSQL_TYPE_TIMESTAMP:
case MYSQL_TYPE_LONGLONG:
case MYSQL_TYPE_INT24:
case MYSQL_TYPE_DATE:
case MYSQL_TYPE_TIME:
case MYSQL_TYPE_DATETIME:
case MYSQL_TYPE_YEAR:
case MYSQL_TYPE_NEWDATE:
case MYSQL_TYPE_ENUM:
case MYSQL_TYPE_SET:
case MYSQL_TYPE_VAR_STRING:
case MYSQL_TYPE_STRING:
// Common implementation for most field types
DBUG_RETURN(ndb_op->equal(fieldnr, (char*) field_ptr, pack_len) != 0);
case MYSQL_TYPE_TINY_BLOB:
case MYSQL_TYPE_MEDIUM_BLOB:
case MYSQL_TYPE_LONG_BLOB:
case MYSQL_TYPE_BLOB:
case MYSQL_TYPE_NULL:
case MYSQL_TYPE_GEOMETRY:
default:
// Unhandled field types
DBUG_PRINT("error", ("Field type %d not supported", field->type()));
DBUG_RETURN(2);
} }
DBUG_RETURN(3); // Unhandled field types
DBUG_PRINT("error", ("Field type %d not supported", field->type()));
DBUG_RETURN(2);
} }
...@@ -259,63 +249,197 @@ int ha_ndbcluster::set_ndb_value(NdbOperation *ndb_op, Field *field, ...@@ -259,63 +249,197 @@ int ha_ndbcluster::set_ndb_value(NdbOperation *ndb_op, Field *field,
fieldnr, field->field_name, field->type(), fieldnr, field->field_name, field->type(),
pack_len, field->is_null()?"Y":"N")); pack_len, field->is_null()?"Y":"N"));
DBUG_DUMP("value", (char*) field_ptr, pack_len); DBUG_DUMP("value", (char*) field_ptr, pack_len);
if (field->is_null()) if (ndb_supported_type(field->type()))
{ {
// Set value to NULL if (! (field->flags & BLOB_FLAG))
DBUG_RETURN((ndb_op->setValue(fieldnr, (char*)NULL, pack_len) != 0)); {
} if (field->is_null())
// Set value to NULL
switch (field->type()) { DBUG_RETURN((ndb_op->setValue(fieldnr, (char*)NULL, pack_len) != 0));
case MYSQL_TYPE_DECIMAL: // Common implementation for most field types
case MYSQL_TYPE_TINY: DBUG_RETURN(ndb_op->setValue(fieldnr, (char*)field_ptr, pack_len) != 0);
case MYSQL_TYPE_SHORT: }
case MYSQL_TYPE_LONG:
case MYSQL_TYPE_FLOAT: // Blob type
case MYSQL_TYPE_DOUBLE: NdbBlob *ndb_blob = ndb_op->getBlobHandle(fieldnr);
case MYSQL_TYPE_TIMESTAMP: if (ndb_blob != NULL)
case MYSQL_TYPE_LONGLONG: {
case MYSQL_TYPE_INT24: if (field->is_null())
case MYSQL_TYPE_DATE: DBUG_RETURN(ndb_blob->setNull() != 0);
case MYSQL_TYPE_TIME:
case MYSQL_TYPE_DATETIME: Field_blob *field_blob= (Field_blob*)field;
case MYSQL_TYPE_YEAR:
case MYSQL_TYPE_NEWDATE: // Get length and pointer to data
case MYSQL_TYPE_ENUM: uint32 blob_len= field_blob->get_length(field_ptr);
case MYSQL_TYPE_SET: char* blob_ptr= NULL;
case MYSQL_TYPE_VAR_STRING: field_blob->get_ptr(&blob_ptr);
case MYSQL_TYPE_STRING:
// Common implementation for most field types // Looks like NULL blob can also be signaled in this way
DBUG_RETURN(ndb_op->setValue(fieldnr, (char*)field_ptr, pack_len) != 0); if (blob_ptr == NULL)
DBUG_RETURN(ndb_blob->setNull() != 0);
case MYSQL_TYPE_TINY_BLOB:
case MYSQL_TYPE_MEDIUM_BLOB: DBUG_PRINT("value", ("set blob ptr=%x len=%u",
case MYSQL_TYPE_LONG_BLOB: (unsigned)blob_ptr, blob_len));
case MYSQL_TYPE_BLOB: DBUG_DUMP("value", (char*)blob_ptr, min(blob_len, 26));
case MYSQL_TYPE_NULL:
case MYSQL_TYPE_GEOMETRY: // No callback needed to write value
default: DBUG_RETURN(ndb_blob->setValue(blob_ptr, blob_len) != 0);
// Unhandled field types }
DBUG_PRINT("error", ("Field type %d not supported", field->type())); DBUG_RETURN(1);
DBUG_RETURN(2); }
// Unhandled field types
DBUG_PRINT("error", ("Field type %d not supported", field->type()));
DBUG_RETURN(2);
}
/*
Callback to read all blob values.
- not done in unpack_record because unpack_record is valid
after execute(Commit) but reading blobs is not
- may only generate read operations; they have to be executed
somewhere before the data is available
- due to single buffer for all blobs, we let the last blob
process all blobs (last so that all are active)
- null bit is still set in unpack_record
- TODO allocate blob part aligned buffers
*/
NdbBlob::ActiveHook get_ndb_blobs_value;
int get_ndb_blobs_value(NdbBlob *ndb_blob, void *arg)
{
DBUG_ENTER("get_ndb_blobs_value [callback]");
if (ndb_blob->blobsNextBlob() != NULL)
DBUG_RETURN(0);
ha_ndbcluster *ha= (ha_ndbcluster *)arg;
DBUG_RETURN(ha->get_ndb_blobs_value(ndb_blob));
}
int ha_ndbcluster::get_ndb_blobs_value(NdbBlob *last_ndb_blob)
{
DBUG_ENTER("get_ndb_blobs_value");
// Field has no field number so cannot use TABLE blob_field
// Loop twice, first only counting total buffer size
for (int loop= 0; loop <= 1; loop++)
{
uint32 offset= 0;
for (uint i= 0; i < table->fields; i++)
{
Field *field= table->field[i];
NdbValue value= m_value[i];
if (value.ptr != NULL && (field->flags & BLOB_FLAG))
{
Field_blob *field_blob= (Field_blob *)field;
NdbBlob *ndb_blob= value.blob;
Uint64 blob_len= 0;
if (ndb_blob->getLength(blob_len) != 0)
DBUG_RETURN(-1);
// Align to Uint64
uint32 blob_size= blob_len;
if (blob_size % 8 != 0)
blob_size+= 8 - blob_size % 8;
if (loop == 1)
{
char *buf= blobs_buffer + offset;
uint32 len= 0xffffffff; // Max uint32
DBUG_PRINT("value", ("read blob ptr=%x len=%u",
(uint)buf, (uint)blob_len));
if (ndb_blob->readData(buf, len) != 0)
DBUG_RETURN(-1);
DBUG_ASSERT(len == blob_len);
field_blob->set_ptr(len, buf);
}
offset+= blob_size;
}
}
if (loop == 0 && offset > blobs_buffer_size)
{
my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
blobs_buffer_size= 0;
DBUG_PRINT("value", ("allocate blobs buffer size %u", offset));
blobs_buffer= my_malloc(offset, MYF(MY_WME));
if (blobs_buffer == NULL)
DBUG_RETURN(-1);
blobs_buffer_size= offset;
}
} }
DBUG_RETURN(3); DBUG_RETURN(0);
} }
/* /*
Instruct NDB to fetch one field Instruct NDB to fetch one field
- data is read directly into buffer provided by field_ptr - data is read directly into buffer provided by field
if it's NULL, data is read into memory provided by NDBAPI if field is NULL, data is read into memory provided by NDBAPI
*/ */
int ha_ndbcluster::get_ndb_value(NdbOperation *op, int ha_ndbcluster::get_ndb_value(NdbOperation *ndb_op, Field *field,
uint field_no, byte *field_ptr) uint fieldnr)
{ {
DBUG_ENTER("get_ndb_value"); DBUG_ENTER("get_ndb_value");
DBUG_PRINT("enter", ("field_no: %d", field_no)); DBUG_PRINT("enter", ("fieldnr: %d flags: %o", fieldnr,
m_value[field_no]= op->getValue(field_no, field_ptr); (int)(field != NULL ? field->flags : 0)));
DBUG_RETURN(m_value == NULL);
if (field != NULL)
{
if (ndb_supported_type(field->type()))
{
DBUG_ASSERT(field->ptr != NULL);
if (! (field->flags & BLOB_FLAG))
{
m_value[fieldnr].rec= ndb_op->getValue(fieldnr, field->ptr);
DBUG_RETURN(m_value[fieldnr].rec == NULL);
}
// Blob type
NdbBlob *ndb_blob= ndb_op->getBlobHandle(fieldnr);
m_value[fieldnr].blob= ndb_blob;
if (ndb_blob != NULL)
{
// Set callback
void *arg= (void *)this;
DBUG_RETURN(ndb_blob->setActiveHook(::get_ndb_blobs_value, arg) != 0);
}
DBUG_RETURN(1);
}
// Unhandled field types
DBUG_PRINT("error", ("Field type %d not supported", field->type()));
DBUG_RETURN(2);
}
// Used for hidden key only
m_value[fieldnr].rec= ndb_op->getValue(fieldnr, NULL);
DBUG_RETURN(m_value[fieldnr].rec == NULL);
}
/*
Check if any set or get of blob value in current query.
*/
bool ha_ndbcluster::uses_blob_value(bool all_fields)
{
if (table->blob_fields == 0)
return false;
if (all_fields)
return true;
{
uint no_fields= table->fields;
int i;
THD *thd= current_thd;
// They always put blobs at the end..
for (i= no_fields - 1; i >= 0; i--)
{
Field *field= table->field[i];
if (thd->query_id == field->query_id)
{
return true;
}
}
}
return false;
} }
...@@ -462,10 +586,19 @@ void ha_ndbcluster::release_metadata() ...@@ -462,10 +586,19 @@ void ha_ndbcluster::release_metadata()
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
NdbScanOperation::LockMode get_ndb_lock_type(enum thr_lock_type type) int ha_ndbcluster::get_ndb_lock_type(enum thr_lock_type type)
{ {
return (type == TL_WRITE_ALLOW_WRITE) ? int lm;
NdbScanOperation::LM_Exclusive : NdbScanOperation::LM_Read; if (type == TL_WRITE_ALLOW_WRITE)
lm = NdbScanOperation::LM_Exclusive;
else if (uses_blob_value(retrieve_all_fields))
/*
TODO use a new scan mode to read + lock + keyinfo
*/
lm = NdbScanOperation::LM_Exclusive;
else
lm = NdbScanOperation::LM_Read;
return lm;
} }
static const ulong index_type_flags[]= static const ulong index_type_flags[]=
...@@ -614,7 +747,7 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf) ...@@ -614,7 +747,7 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf)
if (set_hidden_key(op, no_fields, key)) if (set_hidden_key(op, no_fields, key))
goto err; goto err;
// Read key at the same time, for future reference // Read key at the same time, for future reference
if (get_ndb_value(op, no_fields, NULL)) if (get_ndb_value(op, NULL, no_fields))
goto err; goto err;
} }
else else
...@@ -630,13 +763,13 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf) ...@@ -630,13 +763,13 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf)
Field *field= table->field[i]; Field *field= table->field[i];
if (thd->query_id == field->query_id) if (thd->query_id == field->query_id)
{ {
if (get_ndb_value(op, i, field->ptr)) if (get_ndb_value(op, field, i))
goto err; goto err;
} }
else else
{ {
// Attribute was not to be read // Attribute was not to be read
m_value[i]= NULL; m_value[i].ptr= NULL;
} }
} }
...@@ -700,13 +833,13 @@ int ha_ndbcluster::unique_index_read(const byte *key, ...@@ -700,13 +833,13 @@ int ha_ndbcluster::unique_index_read(const byte *key,
if ((thd->query_id == field->query_id) || if ((thd->query_id == field->query_id) ||
(field->flags & PRI_KEY_FLAG)) (field->flags & PRI_KEY_FLAG))
{ {
if (get_ndb_value(op, i, field->ptr)) if (get_ndb_value(op, field, i))
ERR_RETURN(op->getNdbError()); ERR_RETURN(op->getNdbError());
} }
else else
{ {
// Attribute was not to be read // Attribute was not to be read
m_value[i]= NULL; m_value[i].ptr= NULL;
} }
} }
...@@ -749,11 +882,22 @@ inline int ha_ndbcluster::next_result(byte *buf) ...@@ -749,11 +882,22 @@ inline int ha_ndbcluster::next_result(byte *buf)
bool contact_ndb = m_lock.type != TL_WRITE_ALLOW_WRITE; bool contact_ndb = m_lock.type != TL_WRITE_ALLOW_WRITE;
do { do {
DBUG_PRINT("info", ("Call nextResult, contact_ndb: %d", contact_ndb)); DBUG_PRINT("info", ("Call nextResult, contact_ndb: %d", contact_ndb));
/*
We can only handle one tuple with blobs at a time.
*/
if (ops_pending && blobs_pending)
{
if (trans->execute(NoCommit) != 0)
DBUG_RETURN(ndb_err(trans));
ops_pending= 0;
blobs_pending= false;
}
check= cursor->nextResult(contact_ndb); check= cursor->nextResult(contact_ndb);
if (check == 0) if (check == 0)
{ {
// One more record found // One more record found
DBUG_PRINT("info", ("One more record found")); DBUG_PRINT("info", ("One more record found"));
unpack_record(buf); unpack_record(buf);
table->status= 0; table->status= 0;
DBUG_RETURN(0); DBUG_RETURN(0);
...@@ -867,8 +1011,10 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key, ...@@ -867,8 +1011,10 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
index_name= get_index_name(active_index); index_name= get_index_name(active_index);
if (!(op= trans->getNdbIndexScanOperation(index_name, m_tabname))) if (!(op= trans->getNdbIndexScanOperation(index_name, m_tabname)))
ERR_RETURN(trans->getNdbError()); ERR_RETURN(trans->getNdbError());
if (!(cursor= op->readTuples(get_ndb_lock_type(m_lock.type), 0,
parallelism, sorted))) NdbScanOperation::LockMode lm= (NdbScanOperation::LockMode)
get_ndb_lock_type(m_lock.type);
if (!(cursor= op->readTuples(lm, 0, parallelism, sorted)))
ERR_RETURN(trans->getNdbError()); ERR_RETURN(trans->getNdbError());
m_active_cursor= cursor; m_active_cursor= cursor;
...@@ -928,7 +1074,9 @@ int ha_ndbcluster::filtered_scan(const byte *key, uint key_len, ...@@ -928,7 +1074,9 @@ int ha_ndbcluster::filtered_scan(const byte *key, uint key_len,
if (!(op= trans->getNdbScanOperation(m_tabname))) if (!(op= trans->getNdbScanOperation(m_tabname)))
ERR_RETURN(trans->getNdbError()); ERR_RETURN(trans->getNdbError());
if (!(cursor= op->readTuples(get_ndb_lock_type(m_lock.type), 0,parallelism))) NdbScanOperation::LockMode lm= (NdbScanOperation::LockMode)
get_ndb_lock_type(m_lock.type);
if (!(cursor= op->readTuples(lm, 0, parallelism)))
ERR_RETURN(trans->getNdbError()); ERR_RETURN(trans->getNdbError());
m_active_cursor= cursor; m_active_cursor= cursor;
...@@ -997,7 +1145,9 @@ int ha_ndbcluster::full_table_scan(byte *buf) ...@@ -997,7 +1145,9 @@ int ha_ndbcluster::full_table_scan(byte *buf)
if (!(op=trans->getNdbScanOperation(m_tabname))) if (!(op=trans->getNdbScanOperation(m_tabname)))
ERR_RETURN(trans->getNdbError()); ERR_RETURN(trans->getNdbError());
if (!(cursor= op->readTuples(get_ndb_lock_type(m_lock.type), 0,parallelism))) NdbScanOperation::LockMode lm= (NdbScanOperation::LockMode)
get_ndb_lock_type(m_lock.type);
if (!(cursor= op->readTuples(lm, 0, parallelism)))
ERR_RETURN(trans->getNdbError()); ERR_RETURN(trans->getNdbError());
m_active_cursor= cursor; m_active_cursor= cursor;
DBUG_RETURN(define_read_attrs(buf, op)); DBUG_RETURN(define_read_attrs(buf, op));
...@@ -1021,12 +1171,12 @@ int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op) ...@@ -1021,12 +1171,12 @@ int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op)
(field->flags & PRI_KEY_FLAG) || (field->flags & PRI_KEY_FLAG) ||
retrieve_all_fields) retrieve_all_fields)
{ {
if (get_ndb_value(op, i, field->ptr)) if (get_ndb_value(op, field, i))
ERR_RETURN(op->getNdbError()); ERR_RETURN(op->getNdbError());
} }
else else
{ {
m_value[i]= NULL; m_value[i].ptr= NULL;
} }
} }
...@@ -1040,7 +1190,7 @@ int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op) ...@@ -1040,7 +1190,7 @@ int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op)
if (!tab->getColumn(hidden_no)) if (!tab->getColumn(hidden_no))
DBUG_RETURN(1); DBUG_RETURN(1);
#endif #endif
if (get_ndb_value(op, hidden_no, NULL)) if (get_ndb_value(op, NULL, hidden_no))
ERR_RETURN(op->getNdbError()); ERR_RETURN(op->getNdbError());
} }
...@@ -1108,12 +1258,13 @@ int ha_ndbcluster::write_row(byte *record) ...@@ -1108,12 +1258,13 @@ int ha_ndbcluster::write_row(byte *record)
*/ */
rows_inserted++; rows_inserted++;
if ((rows_inserted == rows_to_insert) || if ((rows_inserted == rows_to_insert) ||
((rows_inserted % bulk_insert_rows) == 0)) ((rows_inserted % bulk_insert_rows) == 0) ||
uses_blob_value(false) != 0)
{ {
// Send rows to NDB // Send rows to NDB
DBUG_PRINT("info", ("Sending inserts to NDB, "\ DBUG_PRINT("info", ("Sending inserts to NDB, "\
"rows_inserted:%d, bulk_insert_rows: %d", "rows_inserted:%d, bulk_insert_rows: %d",
rows_inserted, bulk_insert_rows)); (int)rows_inserted, (int)bulk_insert_rows));
if (trans->execute(NoCommit) != 0) if (trans->execute(NoCommit) != 0)
DBUG_RETURN(ndb_err(trans)); DBUG_RETURN(ndb_err(trans));
} }
...@@ -1190,6 +1341,8 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data) ...@@ -1190,6 +1341,8 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
if (!(op= cursor->updateTuple())) if (!(op= cursor->updateTuple()))
ERR_RETURN(trans->getNdbError()); ERR_RETURN(trans->getNdbError());
ops_pending++; ops_pending++;
if (uses_blob_value(false))
blobs_pending= true;
} }
else else
{ {
...@@ -1205,7 +1358,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data) ...@@ -1205,7 +1358,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
// Require that the PK for this record has previously been // Require that the PK for this record has previously been
// read into m_value // read into m_value
uint no_fields= table->fields; uint no_fields= table->fields;
NdbRecAttr* rec= m_value[no_fields]; NdbRecAttr* rec= m_value[no_fields].rec;
DBUG_ASSERT(rec); DBUG_ASSERT(rec);
DBUG_DUMP("key", (char*)rec->aRef(), NDB_HIDDEN_PRIMARY_KEY_LENGTH); DBUG_DUMP("key", (char*)rec->aRef(), NDB_HIDDEN_PRIMARY_KEY_LENGTH);
...@@ -1280,7 +1433,7 @@ int ha_ndbcluster::delete_row(const byte *record) ...@@ -1280,7 +1433,7 @@ int ha_ndbcluster::delete_row(const byte *record)
// This table has no primary key, use "hidden" primary key // This table has no primary key, use "hidden" primary key
DBUG_PRINT("info", ("Using hidden key")); DBUG_PRINT("info", ("Using hidden key"));
uint no_fields= table->fields; uint no_fields= table->fields;
NdbRecAttr* rec= m_value[no_fields]; NdbRecAttr* rec= m_value[no_fields].rec;
DBUG_ASSERT(rec != NULL); DBUG_ASSERT(rec != NULL);
if (set_hidden_key(op, no_fields, rec->aRef())) if (set_hidden_key(op, no_fields, rec->aRef()))
...@@ -1318,7 +1471,7 @@ void ha_ndbcluster::unpack_record(byte* buf) ...@@ -1318,7 +1471,7 @@ void ha_ndbcluster::unpack_record(byte* buf)
{ {
uint row_offset= (uint) (buf - table->record[0]); uint row_offset= (uint) (buf - table->record[0]);
Field **field, **end; Field **field, **end;
NdbRecAttr **value= m_value; NdbValue *value= m_value;
DBUG_ENTER("unpack_record"); DBUG_ENTER("unpack_record");
// Set null flag(s) // Set null flag(s)
...@@ -1327,8 +1480,23 @@ void ha_ndbcluster::unpack_record(byte* buf) ...@@ -1327,8 +1480,23 @@ void ha_ndbcluster::unpack_record(byte* buf)
field < end; field < end;
field++, value++) field++, value++)
{ {
if (*value && (*value)->isNULL()) if ((*value).ptr)
(*field)->set_null(row_offset); {
if (! ((*field)->flags & BLOB_FLAG))
{
if ((*value).rec->isNULL())
(*field)->set_null(row_offset);
}
else
{
NdbBlob* ndb_blob= (*value).blob;
bool isNull= true;
int ret= ndb_blob->getNull(isNull);
DBUG_ASSERT(ret == 0);
if (isNull)
(*field)->set_null(row_offset);
}
}
} }
#ifndef DBUG_OFF #ifndef DBUG_OFF
...@@ -1339,7 +1507,7 @@ void ha_ndbcluster::unpack_record(byte* buf) ...@@ -1339,7 +1507,7 @@ void ha_ndbcluster::unpack_record(byte* buf)
int hidden_no= table->fields; int hidden_no= table->fields;
const NDBTAB *tab= (NDBTAB *) m_table; const NDBTAB *tab= (NDBTAB *) m_table;
const NDBCOL *hidden_col= tab->getColumn(hidden_no); const NDBCOL *hidden_col= tab->getColumn(hidden_no);
NdbRecAttr* rec= m_value[hidden_no]; NdbRecAttr* rec= m_value[hidden_no].rec;
DBUG_ASSERT(rec); DBUG_ASSERT(rec);
DBUG_PRINT("hidden", ("%d: %s \"%llu\"", hidden_no, DBUG_PRINT("hidden", ("%d: %s \"%llu\"", hidden_no,
hidden_col->getName(), rec->u_64_value())); hidden_col->getName(), rec->u_64_value()));
...@@ -1367,9 +1535,9 @@ void ha_ndbcluster::print_results() ...@@ -1367,9 +1535,9 @@ void ha_ndbcluster::print_results()
{ {
Field *field; Field *field;
const NDBCOL *col; const NDBCOL *col;
NdbRecAttr *value; NdbValue value;
if (!(value= m_value[f])) if (!(value= m_value[f]).ptr)
{ {
fprintf(DBUG_FILE, "Field %d was not read\n", f); fprintf(DBUG_FILE, "Field %d was not read\n", f);
continue; continue;
...@@ -1378,19 +1546,28 @@ void ha_ndbcluster::print_results() ...@@ -1378,19 +1546,28 @@ void ha_ndbcluster::print_results()
DBUG_DUMP("field->ptr", (char*)field->ptr, field->pack_length()); DBUG_DUMP("field->ptr", (char*)field->ptr, field->pack_length());
col= tab->getColumn(f); col= tab->getColumn(f);
fprintf(DBUG_FILE, "%d: %s\t", f, col->getName()); fprintf(DBUG_FILE, "%d: %s\t", f, col->getName());
if (value->isNULL()) NdbBlob *ndb_blob= NULL;
if (! (field->flags & BLOB_FLAG))
{ {
fprintf(DBUG_FILE, "NULL\n"); if (value.rec->isNULL())
continue; {
fprintf(DBUG_FILE, "NULL\n");
continue;
}
}
else
{
ndb_blob= value.blob;
bool isNull= true;
ndb_blob->getNull(isNull);
if (isNull) {
fprintf(DBUG_FILE, "NULL\n");
continue;
}
} }
switch (col->getType()) { switch (col->getType()) {
case NdbDictionary::Column::Blob:
case NdbDictionary::Column::Clob:
case NdbDictionary::Column::Undefined:
fprintf(DBUG_FILE, "Unknown type: %d", col->getType());
break;
case NdbDictionary::Column::Tinyint: { case NdbDictionary::Column::Tinyint: {
char value= *field->ptr; char value= *field->ptr;
fprintf(DBUG_FILE, "Tinyint\t%d", value); fprintf(DBUG_FILE, "Tinyint\t%d", value);
...@@ -1482,6 +1659,21 @@ void ha_ndbcluster::print_results() ...@@ -1482,6 +1659,21 @@ void ha_ndbcluster::print_results()
fprintf(DBUG_FILE, "Timespec\t%llu", value); fprintf(DBUG_FILE, "Timespec\t%llu", value);
break; break;
} }
case NdbDictionary::Column::Blob: {
Uint64 len= 0;
ndb_blob->getLength(len);
fprintf(DBUG_FILE, "Blob\t[len=%u]", (unsigned)len);
break;
}
case NdbDictionary::Column::Text: {
Uint64 len= 0;
ndb_blob->getLength(len);
fprintf(DBUG_FILE, "Text\t[len=%u]", (unsigned)len);
break;
}
case NdbDictionary::Column::Undefined:
fprintf(DBUG_FILE, "Unknown type: %d", col->getType());
break;
} }
fprintf(DBUG_FILE, "\n"); fprintf(DBUG_FILE, "\n");
...@@ -1727,7 +1919,7 @@ void ha_ndbcluster::position(const byte *record) ...@@ -1727,7 +1919,7 @@ void ha_ndbcluster::position(const byte *record)
// No primary key, get hidden key // No primary key, get hidden key
DBUG_PRINT("info", ("Getting hidden key")); DBUG_PRINT("info", ("Getting hidden key"));
int hidden_no= table->fields; int hidden_no= table->fields;
NdbRecAttr* rec= m_value[hidden_no]; NdbRecAttr* rec= m_value[hidden_no].rec;
const NDBTAB *tab= (NDBTAB *) m_table; const NDBTAB *tab= (NDBTAB *) m_table;
const NDBCOL *hidden_col= tab->getColumn(hidden_no); const NDBCOL *hidden_col= tab->getColumn(hidden_no);
DBUG_ASSERT(hidden_col->getPrimaryKey() && DBUG_ASSERT(hidden_col->getPrimaryKey() &&
...@@ -1901,7 +2093,7 @@ void ha_ndbcluster::start_bulk_insert(ha_rows rows) ...@@ -1901,7 +2093,7 @@ void ha_ndbcluster::start_bulk_insert(ha_rows rows)
const NDBTAB *tab= (NDBTAB *) m_table; const NDBTAB *tab= (NDBTAB *) m_table;
DBUG_ENTER("start_bulk_insert"); DBUG_ENTER("start_bulk_insert");
DBUG_PRINT("enter", ("rows: %d", rows)); DBUG_PRINT("enter", ("rows: %d", (int)rows));
rows_inserted= 0; rows_inserted= 0;
rows_to_insert= rows; rows_to_insert= rows;
...@@ -1936,7 +2128,7 @@ int ha_ndbcluster::end_bulk_insert() ...@@ -1936,7 +2128,7 @@ int ha_ndbcluster::end_bulk_insert()
int ha_ndbcluster::extra_opt(enum ha_extra_function operation, ulong cache_size) int ha_ndbcluster::extra_opt(enum ha_extra_function operation, ulong cache_size)
{ {
DBUG_ENTER("extra_opt"); DBUG_ENTER("extra_opt");
DBUG_PRINT("enter", ("cache_size: %d", cache_size)); DBUG_PRINT("enter", ("cache_size: %lu", cache_size));
DBUG_RETURN(extra(operation)); DBUG_RETURN(extra(operation));
} }
...@@ -2157,7 +2349,7 @@ int ha_ndbcluster::start_stmt(THD *thd) ...@@ -2157,7 +2349,7 @@ int ha_ndbcluster::start_stmt(THD *thd)
NdbConnection *tablock_trans= NdbConnection *tablock_trans=
(NdbConnection*)thd->transaction.all.ndb_tid; (NdbConnection*)thd->transaction.all.ndb_tid;
DBUG_PRINT("info", ("tablock_trans: %x", tablock_trans)); DBUG_PRINT("info", ("tablock_trans: %x", (uint)tablock_trans));
DBUG_ASSERT(tablock_trans); trans= m_ndb->hupp(tablock_trans); DBUG_ASSERT(tablock_trans); trans= m_ndb->hupp(tablock_trans);
if (trans == NULL) if (trans == NULL)
ERR_RETURN(m_ndb->getNdbError()); ERR_RETURN(m_ndb->getNdbError());
...@@ -2234,71 +2426,184 @@ int ndbcluster_rollback(THD *thd, void *ndb_transaction) ...@@ -2234,71 +2426,184 @@ int ndbcluster_rollback(THD *thd, void *ndb_transaction)
/* /*
Map MySQL type to the corresponding NDB type Define NDB column based on Field.
Returns 0 or mysql error code.
Not member of ha_ndbcluster because NDBCOL cannot be declared.
*/ */
inline NdbDictionary::Column::Type static int create_ndb_column(NDBCOL &col,
mysql_to_ndb_type(enum enum_field_types mysql_type, bool unsigned_flg) Field *field,
HA_CREATE_INFO *info)
{ {
switch(mysql_type) { // Set name
col.setName(field->field_name);
// Set type and sizes
const enum enum_field_types mysql_type= field->real_type();
switch (mysql_type) {
// Numeric types
case MYSQL_TYPE_DECIMAL: case MYSQL_TYPE_DECIMAL:
return NdbDictionary::Column::Char; col.setType(NDBCOL::Char);
col.setLength(field->pack_length());
break;
case MYSQL_TYPE_TINY: case MYSQL_TYPE_TINY:
return (unsigned_flg) ? if (field->flags & UNSIGNED_FLAG)
NdbDictionary::Column::Tinyunsigned : col.setType(NDBCOL::Tinyunsigned);
NdbDictionary::Column::Tinyint; else
col.setType(NDBCOL::Tinyint);
col.setLength(1);
break;
case MYSQL_TYPE_SHORT: case MYSQL_TYPE_SHORT:
return (unsigned_flg) ? if (field->flags & UNSIGNED_FLAG)
NdbDictionary::Column::Smallunsigned : col.setType(NDBCOL::Smallunsigned);
NdbDictionary::Column::Smallint; else
col.setType(NDBCOL::Smallint);
col.setLength(1);
break;
case MYSQL_TYPE_LONG: case MYSQL_TYPE_LONG:
return (unsigned_flg) ? if (field->flags & UNSIGNED_FLAG)
NdbDictionary::Column::Unsigned : col.setType(NDBCOL::Unsigned);
NdbDictionary::Column::Int; else
case MYSQL_TYPE_TIMESTAMP: col.setType(NDBCOL::Int);
return NdbDictionary::Column::Unsigned; col.setLength(1);
case MYSQL_TYPE_LONGLONG: break;
return (unsigned_flg) ?
NdbDictionary::Column::Bigunsigned :
NdbDictionary::Column::Bigint;
case MYSQL_TYPE_INT24: case MYSQL_TYPE_INT24:
return (unsigned_flg) ? if (field->flags & UNSIGNED_FLAG)
NdbDictionary::Column::Mediumunsigned : col.setType(NDBCOL::Mediumunsigned);
NdbDictionary::Column::Mediumint; else
col.setType(NDBCOL::Mediumint);
col.setLength(1);
break;
case MYSQL_TYPE_LONGLONG:
if (field->flags & UNSIGNED_FLAG)
col.setType(NDBCOL::Bigunsigned);
else
col.setType(NDBCOL::Bigint);
col.setLength(1);
break; break;
case MYSQL_TYPE_FLOAT: case MYSQL_TYPE_FLOAT:
return NdbDictionary::Column::Float; col.setType(NDBCOL::Float);
col.setLength(1);
break;
case MYSQL_TYPE_DOUBLE: case MYSQL_TYPE_DOUBLE:
return NdbDictionary::Column::Double; col.setType(NDBCOL::Double);
case MYSQL_TYPE_DATETIME : col.setLength(1);
return NdbDictionary::Column::Datetime; break;
case MYSQL_TYPE_DATE : // Date types
case MYSQL_TYPE_NEWDATE : case MYSQL_TYPE_TIMESTAMP:
case MYSQL_TYPE_TIME : col.setType(NDBCOL::Unsigned);
case MYSQL_TYPE_YEAR : col.setLength(1);
// Missing NDB data types, mapped to char break;
return NdbDictionary::Column::Char; case MYSQL_TYPE_DATETIME:
case MYSQL_TYPE_ENUM : col.setType(NDBCOL::Datetime);
return NdbDictionary::Column::Char; col.setLength(1);
case MYSQL_TYPE_SET : break;
return NdbDictionary::Column::Char; case MYSQL_TYPE_DATE:
case MYSQL_TYPE_TINY_BLOB : case MYSQL_TYPE_NEWDATE:
case MYSQL_TYPE_MEDIUM_BLOB : case MYSQL_TYPE_TIME:
case MYSQL_TYPE_LONG_BLOB : case MYSQL_TYPE_YEAR:
case MYSQL_TYPE_BLOB : col.setType(NDBCOL::Char);
return NdbDictionary::Column::Blob; col.setLength(field->pack_length());
case MYSQL_TYPE_VAR_STRING : break;
return NdbDictionary::Column::Varchar; // Char types
case MYSQL_TYPE_STRING : case MYSQL_TYPE_STRING:
return NdbDictionary::Column::Char; if (field->flags & BINARY_FLAG)
case MYSQL_TYPE_NULL : col.setType(NDBCOL::Binary);
case MYSQL_TYPE_GEOMETRY : else
return NdbDictionary::Column::Undefined; col.setType(NDBCOL::Char);
} col.setLength(field->pack_length());
return NdbDictionary::Column::Undefined; break;
case MYSQL_TYPE_VAR_STRING:
if (field->flags & BINARY_FLAG)
col.setType(NDBCOL::Varbinary);
else
col.setType(NDBCOL::Varchar);
col.setLength(field->pack_length());
break;
// Blob types (all come in as MYSQL_TYPE_BLOB)
mysql_type_tiny_blob:
case MYSQL_TYPE_TINY_BLOB:
if (field->flags & BINARY_FLAG)
col.setType(NDBCOL::Blob);
else
col.setType(NDBCOL::Text);
col.setInlineSize(256);
// No parts
col.setPartSize(0);
col.setStripeSize(0);
break;
mysql_type_blob:
case MYSQL_TYPE_BLOB:
if (field->flags & BINARY_FLAG)
col.setType(NDBCOL::Blob);
else
col.setType(NDBCOL::Text);
// Use "<=" even if "<" is the exact condition
if (field->max_length() <= (1 << 8))
goto mysql_type_tiny_blob;
else if (field->max_length() <= (1 << 16))
{
col.setInlineSize(256);
col.setPartSize(2000);
col.setStripeSize(16);
}
else if (field->max_length() <= (1 << 24))
goto mysql_type_medium_blob;
else
goto mysql_type_long_blob;
break;
mysql_type_medium_blob:
case MYSQL_TYPE_MEDIUM_BLOB:
if (field->flags & BINARY_FLAG)
col.setType(NDBCOL::Blob);
else
col.setType(NDBCOL::Text);
col.setInlineSize(256);
col.setPartSize(4000);
col.setStripeSize(8);
break;
mysql_type_long_blob:
case MYSQL_TYPE_LONG_BLOB:
if (field->flags & BINARY_FLAG)
col.setType(NDBCOL::Blob);
else
col.setType(NDBCOL::Text);
col.setInlineSize(256);
col.setPartSize(8000);
col.setStripeSize(4);
break;
// Other types
case MYSQL_TYPE_ENUM:
col.setType(NDBCOL::Char);
col.setLength(field->pack_length());
break;
case MYSQL_TYPE_SET:
col.setType(NDBCOL::Char);
col.setLength(field->pack_length());
break;
case MYSQL_TYPE_NULL:
case MYSQL_TYPE_GEOMETRY:
goto mysql_type_unsupported;
mysql_type_unsupported:
default:
return HA_ERR_UNSUPPORTED;
}
// Set nullable and pk
col.setNullable(field->maybe_null());
col.setPrimaryKey(field->flags & PRI_KEY_FLAG);
// Set autoincrement
if (field->flags & AUTO_INCREMENT_FLAG)
{
col.setAutoIncrement(TRUE);
ulonglong value= info->auto_increment_value ?
info->auto_increment_value -1 : (ulonglong) 0;
DBUG_PRINT("info", ("Autoincrement key, initial: %llu", value));
col.setAutoIncrementInitialValue(value);
}
else
col.setAutoIncrement(false);
return 0;
} }
/* /*
Create a table in NDB Cluster Create a table in NDB Cluster
*/ */
...@@ -2308,7 +2613,6 @@ int ha_ndbcluster::create(const char *name, ...@@ -2308,7 +2613,6 @@ int ha_ndbcluster::create(const char *name,
HA_CREATE_INFO *info) HA_CREATE_INFO *info)
{ {
NDBTAB tab; NDBTAB tab;
NdbDictionary::Column::Type ndb_type;
NDBCOL col; NDBCOL col;
uint pack_length, length, i; uint pack_length, length, i;
const void *data, *pack_data; const void *data, *pack_data;
...@@ -2339,31 +2643,11 @@ int ha_ndbcluster::create(const char *name, ...@@ -2339,31 +2643,11 @@ int ha_ndbcluster::create(const char *name,
for (i= 0; i < form->fields; i++) for (i= 0; i < form->fields; i++)
{ {
Field *field= form->field[i]; Field *field= form->field[i];
ndb_type= mysql_to_ndb_type(field->real_type(),
field->flags & UNSIGNED_FLAG);
DBUG_PRINT("info", ("name: %s, type: %u, pack_length: %d", DBUG_PRINT("info", ("name: %s, type: %u, pack_length: %d",
field->field_name, field->real_type(), field->field_name, field->real_type(),
field->pack_length())); field->pack_length()));
col.setName(field->field_name); if (my_errno= create_ndb_column(col, field, info))
col.setType(ndb_type); DBUG_RETURN(my_errno);
if ((ndb_type == NdbDictionary::Column::Char) ||
(ndb_type == NdbDictionary::Column::Varchar))
col.setLength(field->pack_length());
else
col.setLength(1);
col.setNullable(field->maybe_null());
col.setPrimaryKey(field->flags & PRI_KEY_FLAG);
if (field->flags & AUTO_INCREMENT_FLAG)
{
col.setAutoIncrement(TRUE);
ulonglong value= info->auto_increment_value ?
info->auto_increment_value -1 : (ulonglong) 0;
DBUG_PRINT("info", ("Autoincrement key, initial: %d", value));
col.setAutoIncrementInitialValue(value);
}
else
col.setAutoIncrement(false);
tab.addColumn(col); tab.addColumn(col);
} }
...@@ -2631,14 +2915,15 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg): ...@@ -2631,14 +2915,15 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
m_table(NULL), m_table(NULL),
m_table_flags(HA_REC_NOT_IN_SEQ | m_table_flags(HA_REC_NOT_IN_SEQ |
HA_NOT_EXACT_COUNT | HA_NOT_EXACT_COUNT |
HA_NO_PREFIX_CHAR_KEYS | HA_NO_PREFIX_CHAR_KEYS),
HA_NO_BLOBS),
m_use_write(false), m_use_write(false),
retrieve_all_fields(FALSE), retrieve_all_fields(FALSE),
rows_to_insert(0), rows_to_insert(0),
rows_inserted(0), rows_inserted(0),
bulk_insert_rows(1024), bulk_insert_rows(1024),
ops_pending(0) ops_pending(0),
blobs_buffer(0),
blobs_buffer_size(0)
{ {
int i; int i;
...@@ -2671,6 +2956,8 @@ ha_ndbcluster::~ha_ndbcluster() ...@@ -2671,6 +2956,8 @@ ha_ndbcluster::~ha_ndbcluster()
DBUG_ENTER("~ha_ndbcluster"); DBUG_ENTER("~ha_ndbcluster");
release_metadata(); release_metadata();
my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
blobs_buffer= 0;
// Check for open cursor/transaction // Check for open cursor/transaction
DBUG_ASSERT(m_active_cursor == NULL); DBUG_ASSERT(m_active_cursor == NULL);
......
...@@ -35,6 +35,7 @@ class NdbRecAttr; // Forward declaration ...@@ -35,6 +35,7 @@ class NdbRecAttr; // Forward declaration
class NdbResultSet; // Forward declaration class NdbResultSet; // Forward declaration
class NdbScanOperation; class NdbScanOperation;
class NdbIndexScanOperation; class NdbIndexScanOperation;
class NdbBlob;
typedef enum ndb_index_type { typedef enum ndb_index_type {
UNDEFINED_INDEX = 0, UNDEFINED_INDEX = 0,
...@@ -171,6 +172,7 @@ class ha_ndbcluster: public handler ...@@ -171,6 +172,7 @@ class ha_ndbcluster: public handler
enum ha_rkey_function find_flag); enum ha_rkey_function find_flag);
int close_scan(); int close_scan();
void unpack_record(byte *buf); void unpack_record(byte *buf);
int get_ndb_lock_type(enum thr_lock_type type);
void set_dbname(const char *pathname); void set_dbname(const char *pathname);
void set_tabname(const char *pathname); void set_tabname(const char *pathname);
...@@ -181,7 +183,9 @@ class ha_ndbcluster: public handler ...@@ -181,7 +183,9 @@ class ha_ndbcluster: public handler
int set_ndb_key(NdbOperation*, Field *field, int set_ndb_key(NdbOperation*, Field *field,
uint fieldnr, const byte* field_ptr); uint fieldnr, const byte* field_ptr);
int set_ndb_value(NdbOperation*, Field *field, uint fieldnr); int set_ndb_value(NdbOperation*, Field *field, uint fieldnr);
int get_ndb_value(NdbOperation*, uint fieldnr, byte *field_ptr); int get_ndb_value(NdbOperation*, Field *field, uint fieldnr);
friend int ::get_ndb_blobs_value(NdbBlob *ndb_blob, void *arg);
int get_ndb_blobs_value(NdbBlob *last_ndb_blob);
int set_primary_key(NdbOperation *op, const byte *key); int set_primary_key(NdbOperation *op, const byte *key);
int set_primary_key(NdbOperation *op); int set_primary_key(NdbOperation *op);
int set_primary_key_from_old_data(NdbOperation *op, const byte *old_data); int set_primary_key_from_old_data(NdbOperation *op, const byte *old_data);
...@@ -191,8 +195,8 @@ class ha_ndbcluster: public handler ...@@ -191,8 +195,8 @@ class ha_ndbcluster: public handler
void print_results(); void print_results();
longlong get_auto_increment(); longlong get_auto_increment();
int ndb_err(NdbConnection*); int ndb_err(NdbConnection*);
bool uses_blob_value(bool all_fields);
private: private:
int check_ndb_connection(); int check_ndb_connection();
...@@ -209,13 +213,19 @@ class ha_ndbcluster: public handler ...@@ -209,13 +213,19 @@ class ha_ndbcluster: public handler
NDB_SHARE *m_share; NDB_SHARE *m_share;
NDB_INDEX_TYPE m_indextype[MAX_KEY]; NDB_INDEX_TYPE m_indextype[MAX_KEY];
const char* m_unique_index_name[MAX_KEY]; const char* m_unique_index_name[MAX_KEY];
NdbRecAttr *m_value[NDB_MAX_ATTRIBUTES_IN_TABLE]; // NdbRecAttr has no reference to blob
typedef union { NdbRecAttr *rec; NdbBlob *blob; void *ptr; } NdbValue;
NdbValue m_value[NDB_MAX_ATTRIBUTES_IN_TABLE];
bool m_use_write; bool m_use_write;
bool retrieve_all_fields; bool retrieve_all_fields;
ha_rows rows_to_insert; ha_rows rows_to_insert;
ha_rows rows_inserted; ha_rows rows_inserted;
ha_rows bulk_insert_rows; ha_rows bulk_insert_rows;
ha_rows ops_pending; ha_rows ops_pending;
bool blobs_pending;
// memory for blobs in one tuple
char *blobs_buffer;
uint32 blobs_buffer_size;
}; };
bool ndbcluster_init(void); bool ndbcluster_init(void);
...@@ -231,10 +241,3 @@ int ndbcluster_discover(const char* dbname, const char* name, ...@@ -231,10 +241,3 @@ int ndbcluster_discover(const char* dbname, const char* name,
int ndbcluster_drop_database(const char* path); int ndbcluster_drop_database(const char* path);
void ndbcluster_print_error(int error, const NdbOperation *error_op); void ndbcluster_print_error(int error, const NdbOperation *error_op);
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment