Commit 9c3dd2fb authored by pekka@mysql.com's avatar pekka@mysql.com

ha_ndb blobs 2

parent e38792a6
drop table if exists t1;
set autocommit=0;
create table t1 (
a int not null primary key,
b text not null,
c int not null,
d longblob,
key (c)
) engine=ndbcluster;
set @x0 = '01234567012345670123456701234567';
set @x0 = concat(@x0,@x0,@x0,@x0,@x0,@x0,@x0,@x0);
set @b1 = 'b1';
set @b1 = concat(@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1);
set @b1 = concat(@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1);
set @b1 = concat(@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1);
set @b1 = concat(@b1,@x0);
set @d1 = 'dd1';
set @d1 = concat(@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1);
set @d1 = concat(@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1);
set @d1 = concat(@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1);
set @b2 = 'b2';
set @b2 = concat(@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2);
set @b2 = concat(@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2);
set @b2 = concat(@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2);
set @b2 = concat(@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2);
set @d2 = 'dd2';
set @d2 = concat(@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2);
set @d2 = concat(@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2);
set @d2 = concat(@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2);
set @d2 = concat(@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2);
select length(@x0),length(@b1),length(@d1) from dual;
length(@x0) length(@b1) length(@d1)
256 2256 3000
select length(@x0),length(@b2),length(@d2) from dual;
length(@x0) length(@b2) length(@d2)
256 20000 30000
insert into t1 values(1,@b1,111,@d1);
insert into t1 values(2,@b2,222,@d2);
commit;
explain select * from t1 where a = 1;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE t1 const PRIMARY PRIMARY 4 const 1
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where a=1;
a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3)
1 2256 b1 3000 dd1
select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3)
from t1 where a=2;
a length(b) substr(b,1+2*9000,2) length(d) substr(d,1+3*9000,3)
2 20000 b2 30000 dd2
update t1 set b=@b2,d=@d2 where a=1;
update t1 set b=@b1,d=@d1 where a=2;
commit;
select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3)
from t1 where a=1;
a length(b) substr(b,1+2*9000,2) length(d) substr(d,1+3*9000,3)
1 20000 b2 30000 dd2
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where a=2;
a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3)
2 2256 b1 3000 dd1
update t1 set b=concat(b,b),d=concat(d,d) where a=1;
update t1 set b=concat(b,b),d=concat(d,d) where a=2;
commit;
select a,length(b),substr(b,1+4*9000,2),length(d),substr(d,1+6*9000,3)
from t1 where a=1;
a length(b) substr(b,1+4*9000,2) length(d) substr(d,1+6*9000,3)
1 40000 b2 60000 dd2
select a,length(b),substr(b,1+4*900,2),length(d),substr(d,1+6*900,3)
from t1 where a=2;
a length(b) substr(b,1+4*900,2) length(d) substr(d,1+6*900,3)
2 4512 b1 6000 dd1
update t1 set d=null where a=1;
commit;
select a from t1 where d is null;
a
1
delete from t1 where a=1;
delete from t1 where a=2;
commit;
select count(*) from t1;
count(*)
0
insert into t1 values(1,@b1,111,@d1);
insert into t1 values(2,@b2,222,@d2);
commit;
explain select * from t1 where c = 111;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE t1 ref c c 4 const 10 Using where
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where c=111;
a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3)
1 2256 b1 3000 dd1
select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3)
from t1 where c=222;
a length(b) substr(b,1+2*9000,2) length(d) substr(d,1+3*9000,3)
2 20000 b2 30000 dd2
update t1 set b=@b2,d=@d2 where c=111;
update t1 set b=@b1,d=@d1 where c=222;
commit;
select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3)
from t1 where c=111;
a length(b) substr(b,1+2*9000,2) length(d) substr(d,1+3*9000,3)
1 20000 b2 30000 dd2
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where c=222;
a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3)
2 2256 b1 3000 dd1
update t1 set d=null where c=111;
commit;
select a from t1 where d is null;
a
1
delete from t1 where c=111;
delete from t1 where c=222;
commit;
select count(*) from t1;
count(*)
0
insert into t1 values(1,'b1',111,'dd1');
insert into t1 values(2,'b2',222,'dd2');
insert into t1 values(3,'b3',333,'dd3');
insert into t1 values(4,'b4',444,'dd4');
insert into t1 values(5,'b5',555,'dd5');
insert into t1 values(6,'b6',666,'dd6');
insert into t1 values(7,'b7',777,'dd7');
insert into t1 values(8,'b8',888,'dd8');
insert into t1 values(9,'b9',999,'dd9');
commit;
explain select * from t1;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE t1 ALL NULL NULL NULL NULL 100
select * from t1 order by a;
a b c d
1 b1 111 dd1
2 b2 222 dd2
3 b3 333 dd3
4 b4 444 dd4
5 b5 555 dd5
6 b6 666 dd6
7 b7 777 dd7
8 b8 888 dd8
9 b9 999 dd9
update t1 set b=concat(a,'x',b),d=concat(a,'x',d);
commit;
select * from t1 order by a;
a b c d
1 1xb1 111 1xdd1
2 2xb2 222 2xdd2
3 3xb3 333 3xdd3
4 4xb4 444 4xdd4
5 5xb5 555 5xdd5
6 6xb6 666 6xdd6
7 7xb7 777 7xdd7
8 8xb8 888 8xdd8
9 9xb9 999 9xdd9
delete from t1;
commit;
select count(*) from t1;
count(*)
0
insert into t1 values(1,@b1,111,@d1);
insert into t1 values(2,@b2,222,@d2);
commit;
explain select * from t1;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE t1 ALL NULL NULL NULL NULL 100
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 order by a;
a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3)
1 2256 b1 3000 dd1
2 20000 b2 30000 dd2
update t1 set b=concat(b,b),d=concat(d,d);
commit;
select a,length(b),substr(b,1+4*9000,2),length(d),substr(d,1+6*9000,3)
from t1 order by a;
a length(b) substr(b,1+4*9000,2) length(d) substr(d,1+6*9000,3)
1 4512 6000
2 40000 b2 60000 dd2
delete from t1;
commit;
select count(*) from t1;
count(*)
0
insert into t1 values(1,'b1',111,'dd1');
insert into t1 values(2,'b2',222,'dd2');
insert into t1 values(3,'b3',333,'dd3');
insert into t1 values(4,'b4',444,'dd4');
insert into t1 values(5,'b5',555,'dd5');
insert into t1 values(6,'b6',666,'dd6');
insert into t1 values(7,'b7',777,'dd7');
insert into t1 values(8,'b8',888,'dd8');
insert into t1 values(9,'b9',999,'dd9');
commit;
explain select * from t1 where c >= 100 order by a;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE t1 range c c 4 NULL 10 Using where; Using filesort
select * from t1 where c >= 100 order by a;
a b c d
1 b1 111 dd1
2 b2 222 dd2
3 b3 333 dd3
4 b4 444 dd4
5 b5 555 dd5
6 b6 666 dd6
7 b7 777 dd7
8 b8 888 dd8
9 b9 999 dd9
update t1 set b=concat(a,'x',b),d=concat(a,'x',d)
where c >= 100;
commit;
select * from t1 where c >= 100 order by a;
a b c d
1 1xb1 111 1xdd1
2 2xb2 222 2xdd2
3 3xb3 333 3xdd3
4 4xb4 444 4xdd4
5 5xb5 555 5xdd5
6 6xb6 666 6xdd6
7 7xb7 777 7xdd7
8 8xb8 888 8xdd8
9 9xb9 999 9xdd9
delete from t1 where c >= 100;
commit;
select count(*) from t1;
count(*)
0
insert into t1 values(1,@b1,111,@d1);
insert into t1 values(2,@b2,222,@d2);
commit;
explain select * from t1 where c >= 100 order by a;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE t1 range c c 4 NULL 10 Using where; Using filesort
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where c >= 100 order by a;
a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3)
1 2256 b1 3000 dd1
2 20000 b2 30000 dd2
update t1 set b=concat(b,b),d=concat(d,d);
commit;
select a,length(b),substr(b,1+4*9000,2),length(d),substr(d,1+6*9000,3)
from t1 where c >= 100 order by a;
a length(b) substr(b,1+4*9000,2) length(d) substr(d,1+6*9000,3)
1 4512 6000
2 40000 b2 60000 dd2
delete from t1 where c >= 100;
commit;
select count(*) from t1;
count(*)
0
insert into t1 values(1,@b1,111,@d1);
insert into t1 values(2,@b2,222,@d2);
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where a = 0;
a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3)
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where a = 1;
a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3)
1 2256 b1 3000 dd1
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where a = 2;
a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3)
2 20000 b2 30000 dd2
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 order by a;
a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3)
1 2256 b1 3000 dd1
2 20000 b2 30000 dd2
rollback;
select count(*) from t1;
count(*)
0
--source include/have_ndb.inc
--disable_warnings
drop table if exists t1;
--enable_warnings
#
# Minimal NDB blobs test.
#
# On NDB API level there is an extensive test program "testBlobs".
# A prerequisite for this handler test is that "testBlobs" succeeds.
#
# make test harder with autocommit off
set autocommit=0;
create table t1 (
a int not null primary key,
b text not null,
c int not null,
d longblob,
key (c)
) engine=ndbcluster;
# -- values --
# x0 size 256 (current inline size)
set @x0 = '01234567012345670123456701234567';
set @x0 = concat(@x0,@x0,@x0,@x0,@x0,@x0,@x0,@x0);
# b1 length 2000+256 (blob part aligned)
set @b1 = 'b1';
set @b1 = concat(@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1);
set @b1 = concat(@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1);
set @b1 = concat(@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1);
set @b1 = concat(@b1,@x0);
# d1 length 3000
set @d1 = 'dd1';
set @d1 = concat(@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1);
set @d1 = concat(@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1);
set @d1 = concat(@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1);
# b2 length 20000
set @b2 = 'b2';
set @b2 = concat(@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2);
set @b2 = concat(@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2);
set @b2 = concat(@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2);
set @b2 = concat(@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2);
# d2 length 30000
set @d2 = 'dd2';
set @d2 = concat(@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2);
set @d2 = concat(@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2);
set @d2 = concat(@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2);
set @d2 = concat(@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2);
select length(@x0),length(@b1),length(@d1) from dual;
select length(@x0),length(@b2),length(@d2) from dual;
# -- pk ops --
insert into t1 values(1,@b1,111,@d1);
insert into t1 values(2,@b2,222,@d2);
commit;
explain select * from t1 where a = 1;
# pk read
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where a=1;
select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3)
from t1 where a=2;
# pk update
update t1 set b=@b2,d=@d2 where a=1;
update t1 set b=@b1,d=@d1 where a=2;
commit;
select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3)
from t1 where a=1;
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where a=2;
# pk update
update t1 set b=concat(b,b),d=concat(d,d) where a=1;
update t1 set b=concat(b,b),d=concat(d,d) where a=2;
commit;
select a,length(b),substr(b,1+4*9000,2),length(d),substr(d,1+6*9000,3)
from t1 where a=1;
select a,length(b),substr(b,1+4*900,2),length(d),substr(d,1+6*900,3)
from t1 where a=2;
# pk update to null
update t1 set d=null where a=1;
commit;
select a from t1 where d is null;
# pk delete
delete from t1 where a=1;
delete from t1 where a=2;
commit;
select count(*) from t1;
# -- hash index ops --
insert into t1 values(1,@b1,111,@d1);
insert into t1 values(2,@b2,222,@d2);
commit;
explain select * from t1 where c = 111;
# hash key read
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where c=111;
select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3)
from t1 where c=222;
# hash key update
update t1 set b=@b2,d=@d2 where c=111;
update t1 set b=@b1,d=@d1 where c=222;
commit;
select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3)
from t1 where c=111;
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where c=222;
# hash key update to null
update t1 set d=null where c=111;
commit;
select a from t1 where d is null;
# hash key delete
delete from t1 where c=111;
delete from t1 where c=222;
commit;
select count(*) from t1;
# -- table scan ops, short values --
insert into t1 values(1,'b1',111,'dd1');
insert into t1 values(2,'b2',222,'dd2');
insert into t1 values(3,'b3',333,'dd3');
insert into t1 values(4,'b4',444,'dd4');
insert into t1 values(5,'b5',555,'dd5');
insert into t1 values(6,'b6',666,'dd6');
insert into t1 values(7,'b7',777,'dd7');
insert into t1 values(8,'b8',888,'dd8');
insert into t1 values(9,'b9',999,'dd9');
commit;
explain select * from t1;
# table scan read
select * from t1 order by a;
# table scan update
update t1 set b=concat(a,'x',b),d=concat(a,'x',d);
commit;
select * from t1 order by a;
# table scan delete
delete from t1;
commit;
select count(*) from t1;
# -- table scan ops, long values --
insert into t1 values(1,@b1,111,@d1);
insert into t1 values(2,@b2,222,@d2);
commit;
explain select * from t1;
# table scan read
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 order by a;
# table scan update
update t1 set b=concat(b,b),d=concat(d,d);
commit;
select a,length(b),substr(b,1+4*9000,2),length(d),substr(d,1+6*9000,3)
from t1 order by a;
# table scan delete
delete from t1;
commit;
select count(*) from t1;
# -- range scan ops, short values --
insert into t1 values(1,'b1',111,'dd1');
insert into t1 values(2,'b2',222,'dd2');
insert into t1 values(3,'b3',333,'dd3');
insert into t1 values(4,'b4',444,'dd4');
insert into t1 values(5,'b5',555,'dd5');
insert into t1 values(6,'b6',666,'dd6');
insert into t1 values(7,'b7',777,'dd7');
insert into t1 values(8,'b8',888,'dd8');
insert into t1 values(9,'b9',999,'dd9');
commit;
explain select * from t1 where c >= 100 order by a;
# range scan read
select * from t1 where c >= 100 order by a;
# range scan update
update t1 set b=concat(a,'x',b),d=concat(a,'x',d)
where c >= 100;
commit;
select * from t1 where c >= 100 order by a;
# range scan delete
delete from t1 where c >= 100;
commit;
select count(*) from t1;
# -- range scan ops, long values --
insert into t1 values(1,@b1,111,@d1);
insert into t1 values(2,@b2,222,@d2);
commit;
explain select * from t1 where c >= 100 order by a;
# range scan read
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where c >= 100 order by a;
# range scan update
update t1 set b=concat(b,b),d=concat(d,d);
commit;
select a,length(b),substr(b,1+4*9000,2),length(d),substr(d,1+6*9000,3)
from t1 where c >= 100 order by a;
# range scan delete
delete from t1 where c >= 100;
commit;
select count(*) from t1;
# -- rollback --
insert into t1 values(1,@b1,111,@d1);
insert into t1 values(2,@b2,222,@d2);
# 626
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where a = 0;
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where a = 1;
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where a = 2;
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 order by a;
rollback;
select count(*) from t1;
--drop table t1;
......@@ -181,6 +181,21 @@ bool ha_ndbcluster::get_error_message(int error,
}
/*
Check if type is supported by NDB.
*/
static inline bool ndb_supported_type(enum_field_types type)
{
switch (type) {
case MYSQL_TYPE_NULL:
case MYSQL_TYPE_GEOMETRY:
return false;
}
return true;
}
/*
Instruct NDB to set the value of the hidden primary key
*/
......@@ -208,40 +223,15 @@ int ha_ndbcluster::set_ndb_key(NdbOperation *ndb_op, Field *field,
pack_len));
DBUG_DUMP("key", (char*)field_ptr, pack_len);
switch (field->type()) {
case MYSQL_TYPE_DECIMAL:
case MYSQL_TYPE_TINY:
case MYSQL_TYPE_SHORT:
case MYSQL_TYPE_LONG:
case MYSQL_TYPE_FLOAT:
case MYSQL_TYPE_DOUBLE:
case MYSQL_TYPE_TIMESTAMP:
case MYSQL_TYPE_LONGLONG:
case MYSQL_TYPE_INT24:
case MYSQL_TYPE_DATE:
case MYSQL_TYPE_TIME:
case MYSQL_TYPE_DATETIME:
case MYSQL_TYPE_YEAR:
case MYSQL_TYPE_NEWDATE:
case MYSQL_TYPE_ENUM:
case MYSQL_TYPE_SET:
case MYSQL_TYPE_VAR_STRING:
case MYSQL_TYPE_STRING:
if (ndb_supported_type(field->type()))
{
if (! (field->flags & BLOB_FLAG))
// Common implementation for most field types
DBUG_RETURN(ndb_op->equal(fieldnr, (char*) field_ptr, pack_len) != 0);
case MYSQL_TYPE_TINY_BLOB:
case MYSQL_TYPE_MEDIUM_BLOB:
case MYSQL_TYPE_LONG_BLOB:
case MYSQL_TYPE_BLOB:
case MYSQL_TYPE_NULL:
case MYSQL_TYPE_GEOMETRY:
default:
}
// Unhandled field types
DBUG_PRINT("error", ("Field type %d not supported", field->type()));
DBUG_RETURN(2);
}
DBUG_RETURN(3);
}
......@@ -260,62 +250,196 @@ int ha_ndbcluster::set_ndb_value(NdbOperation *ndb_op, Field *field,
pack_len, field->is_null()?"Y":"N"));
DBUG_DUMP("value", (char*) field_ptr, pack_len);
if (field->is_null())
if (ndb_supported_type(field->type()))
{
if (! (field->flags & BLOB_FLAG))
{
if (field->is_null())
// Set value to NULL
DBUG_RETURN((ndb_op->setValue(fieldnr, (char*)NULL, pack_len) != 0));
}
switch (field->type()) {
case MYSQL_TYPE_DECIMAL:
case MYSQL_TYPE_TINY:
case MYSQL_TYPE_SHORT:
case MYSQL_TYPE_LONG:
case MYSQL_TYPE_FLOAT:
case MYSQL_TYPE_DOUBLE:
case MYSQL_TYPE_TIMESTAMP:
case MYSQL_TYPE_LONGLONG:
case MYSQL_TYPE_INT24:
case MYSQL_TYPE_DATE:
case MYSQL_TYPE_TIME:
case MYSQL_TYPE_DATETIME:
case MYSQL_TYPE_YEAR:
case MYSQL_TYPE_NEWDATE:
case MYSQL_TYPE_ENUM:
case MYSQL_TYPE_SET:
case MYSQL_TYPE_VAR_STRING:
case MYSQL_TYPE_STRING:
// Common implementation for most field types
DBUG_RETURN(ndb_op->setValue(fieldnr, (char*)field_ptr, pack_len) != 0);
}
case MYSQL_TYPE_TINY_BLOB:
case MYSQL_TYPE_MEDIUM_BLOB:
case MYSQL_TYPE_LONG_BLOB:
case MYSQL_TYPE_BLOB:
case MYSQL_TYPE_NULL:
case MYSQL_TYPE_GEOMETRY:
default:
// Blob type
NdbBlob *ndb_blob = ndb_op->getBlobHandle(fieldnr);
if (ndb_blob != NULL)
{
if (field->is_null())
DBUG_RETURN(ndb_blob->setNull() != 0);
Field_blob *field_blob= (Field_blob*)field;
// Get length and pointer to data
uint32 blob_len= field_blob->get_length(field_ptr);
char* blob_ptr= NULL;
field_blob->get_ptr(&blob_ptr);
// Looks like NULL blob can also be signaled in this way
if (blob_ptr == NULL)
DBUG_RETURN(ndb_blob->setNull() != 0);
DBUG_PRINT("value", ("set blob ptr=%x len=%u",
(unsigned)blob_ptr, blob_len));
DBUG_DUMP("value", (char*)blob_ptr, min(blob_len, 26));
// No callback needed to write value
DBUG_RETURN(ndb_blob->setValue(blob_ptr, blob_len) != 0);
}
DBUG_RETURN(1);
}
// Unhandled field types
DBUG_PRINT("error", ("Field type %d not supported", field->type()));
DBUG_RETURN(2);
}
/*
Callback to read all blob values.
- not done in unpack_record because unpack_record is valid
after execute(Commit) but reading blobs is not
- may only generate read operations; they have to be executed
somewhere before the data is available
- due to single buffer for all blobs, we let the last blob
process all blobs (last so that all are active)
- null bit is still set in unpack_record
- TODO allocate blob part aligned buffers
*/
NdbBlob::ActiveHook get_ndb_blobs_value;
int get_ndb_blobs_value(NdbBlob *ndb_blob, void *arg)
{
DBUG_ENTER("get_ndb_blobs_value [callback]");
if (ndb_blob->blobsNextBlob() != NULL)
DBUG_RETURN(0);
ha_ndbcluster *ha= (ha_ndbcluster *)arg;
DBUG_RETURN(ha->get_ndb_blobs_value(ndb_blob));
}
int ha_ndbcluster::get_ndb_blobs_value(NdbBlob *last_ndb_blob)
{
DBUG_ENTER("get_ndb_blobs_value");
// Field has no field number so cannot use TABLE blob_field
// Loop twice, first only counting total buffer size
for (int loop= 0; loop <= 1; loop++)
{
uint32 offset= 0;
for (uint i= 0; i < table->fields; i++)
{
Field *field= table->field[i];
NdbValue value= m_value[i];
if (value.ptr != NULL && (field->flags & BLOB_FLAG))
{
Field_blob *field_blob= (Field_blob *)field;
NdbBlob *ndb_blob= value.blob;
Uint64 blob_len= 0;
if (ndb_blob->getLength(blob_len) != 0)
DBUG_RETURN(-1);
// Align to Uint64
uint32 blob_size= blob_len;
if (blob_size % 8 != 0)
blob_size+= 8 - blob_size % 8;
if (loop == 1)
{
char *buf= blobs_buffer + offset;
uint32 len= 0xffffffff; // Max uint32
DBUG_PRINT("value", ("read blob ptr=%x len=%u",
(uint)buf, (uint)blob_len));
if (ndb_blob->readData(buf, len) != 0)
DBUG_RETURN(-1);
DBUG_ASSERT(len == blob_len);
field_blob->set_ptr(len, buf);
}
offset+= blob_size;
}
}
if (loop == 0 && offset > blobs_buffer_size)
{
my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
blobs_buffer_size= 0;
DBUG_PRINT("value", ("allocate blobs buffer size %u", offset));
blobs_buffer= my_malloc(offset, MYF(MY_WME));
if (blobs_buffer == NULL)
DBUG_RETURN(-1);
blobs_buffer_size= offset;
}
DBUG_RETURN(3);
}
DBUG_RETURN(0);
}
/*
Instruct NDB to fetch one field
- data is read directly into buffer provided by field_ptr
if it's NULL, data is read into memory provided by NDBAPI
- data is read directly into buffer provided by field
if field is NULL, data is read into memory provided by NDBAPI
*/
int ha_ndbcluster::get_ndb_value(NdbOperation *op,
uint field_no, byte *field_ptr)
int ha_ndbcluster::get_ndb_value(NdbOperation *ndb_op, Field *field,
uint fieldnr)
{
DBUG_ENTER("get_ndb_value");
DBUG_PRINT("enter", ("field_no: %d", field_no));
m_value[field_no]= op->getValue(field_no, field_ptr);
DBUG_RETURN(m_value == NULL);
DBUG_PRINT("enter", ("fieldnr: %d flags: %o", fieldnr,
(int)(field != NULL ? field->flags : 0)));
if (field != NULL)
{
if (ndb_supported_type(field->type()))
{
DBUG_ASSERT(field->ptr != NULL);
if (! (field->flags & BLOB_FLAG))
{
m_value[fieldnr].rec= ndb_op->getValue(fieldnr, field->ptr);
DBUG_RETURN(m_value[fieldnr].rec == NULL);
}
// Blob type
NdbBlob *ndb_blob= ndb_op->getBlobHandle(fieldnr);
m_value[fieldnr].blob= ndb_blob;
if (ndb_blob != NULL)
{
// Set callback
void *arg= (void *)this;
DBUG_RETURN(ndb_blob->setActiveHook(::get_ndb_blobs_value, arg) != 0);
}
DBUG_RETURN(1);
}
// Unhandled field types
DBUG_PRINT("error", ("Field type %d not supported", field->type()));
DBUG_RETURN(2);
}
// Used for hidden key only
m_value[fieldnr].rec= ndb_op->getValue(fieldnr, NULL);
DBUG_RETURN(m_value[fieldnr].rec == NULL);
}
/*
Check if any set or get of blob value in current query.
*/
bool ha_ndbcluster::uses_blob_value(bool all_fields)
{
if (table->blob_fields == 0)
return false;
if (all_fields)
return true;
{
uint no_fields= table->fields;
int i;
THD *thd= current_thd;
// They always put blobs at the end..
for (i= no_fields - 1; i >= 0; i--)
{
Field *field= table->field[i];
if (thd->query_id == field->query_id)
{
return true;
}
}
}
return false;
}
......@@ -462,10 +586,19 @@ void ha_ndbcluster::release_metadata()
DBUG_VOID_RETURN;
}
NdbScanOperation::LockMode get_ndb_lock_type(enum thr_lock_type type)
int ha_ndbcluster::get_ndb_lock_type(enum thr_lock_type type)
{
return (type == TL_WRITE_ALLOW_WRITE) ?
NdbScanOperation::LM_Exclusive : NdbScanOperation::LM_Read;
int lm;
if (type == TL_WRITE_ALLOW_WRITE)
lm = NdbScanOperation::LM_Exclusive;
else if (uses_blob_value(retrieve_all_fields))
/*
TODO use a new scan mode to read + lock + keyinfo
*/
lm = NdbScanOperation::LM_Exclusive;
else
lm = NdbScanOperation::LM_Read;
return lm;
}
static const ulong index_type_flags[]=
......@@ -614,7 +747,7 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf)
if (set_hidden_key(op, no_fields, key))
goto err;
// Read key at the same time, for future reference
if (get_ndb_value(op, no_fields, NULL))
if (get_ndb_value(op, NULL, no_fields))
goto err;
}
else
......@@ -630,13 +763,13 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf)
Field *field= table->field[i];
if (thd->query_id == field->query_id)
{
if (get_ndb_value(op, i, field->ptr))
if (get_ndb_value(op, field, i))
goto err;
}
else
{
// Attribute was not to be read
m_value[i]= NULL;
m_value[i].ptr= NULL;
}
}
......@@ -700,13 +833,13 @@ int ha_ndbcluster::unique_index_read(const byte *key,
if ((thd->query_id == field->query_id) ||
(field->flags & PRI_KEY_FLAG))
{
if (get_ndb_value(op, i, field->ptr))
if (get_ndb_value(op, field, i))
ERR_RETURN(op->getNdbError());
}
else
{
// Attribute was not to be read
m_value[i]= NULL;
m_value[i].ptr= NULL;
}
}
......@@ -749,11 +882,22 @@ inline int ha_ndbcluster::next_result(byte *buf)
bool contact_ndb = m_lock.type != TL_WRITE_ALLOW_WRITE;
do {
DBUG_PRINT("info", ("Call nextResult, contact_ndb: %d", contact_ndb));
/*
We can only handle one tuple with blobs at a time.
*/
if (ops_pending && blobs_pending)
{
if (trans->execute(NoCommit) != 0)
DBUG_RETURN(ndb_err(trans));
ops_pending= 0;
blobs_pending= false;
}
check= cursor->nextResult(contact_ndb);
if (check == 0)
{
// One more record found
DBUG_PRINT("info", ("One more record found"));
unpack_record(buf);
table->status= 0;
DBUG_RETURN(0);
......@@ -867,8 +1011,10 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
index_name= get_index_name(active_index);
if (!(op= trans->getNdbIndexScanOperation(index_name, m_tabname)))
ERR_RETURN(trans->getNdbError());
if (!(cursor= op->readTuples(get_ndb_lock_type(m_lock.type), 0,
parallelism, sorted)))
NdbScanOperation::LockMode lm= (NdbScanOperation::LockMode)
get_ndb_lock_type(m_lock.type);
if (!(cursor= op->readTuples(lm, 0, parallelism, sorted)))
ERR_RETURN(trans->getNdbError());
m_active_cursor= cursor;
......@@ -928,7 +1074,9 @@ int ha_ndbcluster::filtered_scan(const byte *key, uint key_len,
if (!(op= trans->getNdbScanOperation(m_tabname)))
ERR_RETURN(trans->getNdbError());
if (!(cursor= op->readTuples(get_ndb_lock_type(m_lock.type), 0,parallelism)))
NdbScanOperation::LockMode lm= (NdbScanOperation::LockMode)
get_ndb_lock_type(m_lock.type);
if (!(cursor= op->readTuples(lm, 0, parallelism)))
ERR_RETURN(trans->getNdbError());
m_active_cursor= cursor;
......@@ -997,7 +1145,9 @@ int ha_ndbcluster::full_table_scan(byte *buf)
if (!(op=trans->getNdbScanOperation(m_tabname)))
ERR_RETURN(trans->getNdbError());
if (!(cursor= op->readTuples(get_ndb_lock_type(m_lock.type), 0,parallelism)))
NdbScanOperation::LockMode lm= (NdbScanOperation::LockMode)
get_ndb_lock_type(m_lock.type);
if (!(cursor= op->readTuples(lm, 0, parallelism)))
ERR_RETURN(trans->getNdbError());
m_active_cursor= cursor;
DBUG_RETURN(define_read_attrs(buf, op));
......@@ -1021,12 +1171,12 @@ int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op)
(field->flags & PRI_KEY_FLAG) ||
retrieve_all_fields)
{
if (get_ndb_value(op, i, field->ptr))
if (get_ndb_value(op, field, i))
ERR_RETURN(op->getNdbError());
}
else
{
m_value[i]= NULL;
m_value[i].ptr= NULL;
}
}
......@@ -1040,7 +1190,7 @@ int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op)
if (!tab->getColumn(hidden_no))
DBUG_RETURN(1);
#endif
if (get_ndb_value(op, hidden_no, NULL))
if (get_ndb_value(op, NULL, hidden_no))
ERR_RETURN(op->getNdbError());
}
......@@ -1108,12 +1258,13 @@ int ha_ndbcluster::write_row(byte *record)
*/
rows_inserted++;
if ((rows_inserted == rows_to_insert) ||
((rows_inserted % bulk_insert_rows) == 0))
((rows_inserted % bulk_insert_rows) == 0) ||
uses_blob_value(false) != 0)
{
// Send rows to NDB
DBUG_PRINT("info", ("Sending inserts to NDB, "\
"rows_inserted:%d, bulk_insert_rows: %d",
rows_inserted, bulk_insert_rows));
(int)rows_inserted, (int)bulk_insert_rows));
if (trans->execute(NoCommit) != 0)
DBUG_RETURN(ndb_err(trans));
}
......@@ -1190,6 +1341,8 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
if (!(op= cursor->updateTuple()))
ERR_RETURN(trans->getNdbError());
ops_pending++;
if (uses_blob_value(false))
blobs_pending= true;
}
else
{
......@@ -1205,7 +1358,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
// Require that the PK for this record has previously been
// read into m_value
uint no_fields= table->fields;
NdbRecAttr* rec= m_value[no_fields];
NdbRecAttr* rec= m_value[no_fields].rec;
DBUG_ASSERT(rec);
DBUG_DUMP("key", (char*)rec->aRef(), NDB_HIDDEN_PRIMARY_KEY_LENGTH);
......@@ -1280,7 +1433,7 @@ int ha_ndbcluster::delete_row(const byte *record)
// This table has no primary key, use "hidden" primary key
DBUG_PRINT("info", ("Using hidden key"));
uint no_fields= table->fields;
NdbRecAttr* rec= m_value[no_fields];
NdbRecAttr* rec= m_value[no_fields].rec;
DBUG_ASSERT(rec != NULL);
if (set_hidden_key(op, no_fields, rec->aRef()))
......@@ -1318,7 +1471,7 @@ void ha_ndbcluster::unpack_record(byte* buf)
{
uint row_offset= (uint) (buf - table->record[0]);
Field **field, **end;
NdbRecAttr **value= m_value;
NdbValue *value= m_value;
DBUG_ENTER("unpack_record");
// Set null flag(s)
......@@ -1327,9 +1480,24 @@ void ha_ndbcluster::unpack_record(byte* buf)
field < end;
field++, value++)
{
if (*value && (*value)->isNULL())
if ((*value).ptr)
{
if (! ((*field)->flags & BLOB_FLAG))
{
if ((*value).rec->isNULL())
(*field)->set_null(row_offset);
}
else
{
NdbBlob* ndb_blob= (*value).blob;
bool isNull= true;
int ret= ndb_blob->getNull(isNull);
DBUG_ASSERT(ret == 0);
if (isNull)
(*field)->set_null(row_offset);
}
}
}
#ifndef DBUG_OFF
// Read and print all values that was fetched
......@@ -1339,7 +1507,7 @@ void ha_ndbcluster::unpack_record(byte* buf)
int hidden_no= table->fields;
const NDBTAB *tab= (NDBTAB *) m_table;
const NDBCOL *hidden_col= tab->getColumn(hidden_no);
NdbRecAttr* rec= m_value[hidden_no];
NdbRecAttr* rec= m_value[hidden_no].rec;
DBUG_ASSERT(rec);
DBUG_PRINT("hidden", ("%d: %s \"%llu\"", hidden_no,
hidden_col->getName(), rec->u_64_value()));
......@@ -1367,9 +1535,9 @@ void ha_ndbcluster::print_results()
{
Field *field;
const NDBCOL *col;
NdbRecAttr *value;
NdbValue value;
if (!(value= m_value[f]))
if (!(value= m_value[f]).ptr)
{
fprintf(DBUG_FILE, "Field %d was not read\n", f);
continue;
......@@ -1379,18 +1547,27 @@ void ha_ndbcluster::print_results()
col= tab->getColumn(f);
fprintf(DBUG_FILE, "%d: %s\t", f, col->getName());
if (value->isNULL())
NdbBlob *ndb_blob= NULL;
if (! (field->flags & BLOB_FLAG))
{
if (value.rec->isNULL())
{
fprintf(DBUG_FILE, "NULL\n");
continue;
}
}
else
{
ndb_blob= value.blob;
bool isNull= true;
ndb_blob->getNull(isNull);
if (isNull) {
fprintf(DBUG_FILE, "NULL\n");
continue;
}
}
switch (col->getType()) {
case NdbDictionary::Column::Blob:
case NdbDictionary::Column::Clob:
case NdbDictionary::Column::Undefined:
fprintf(DBUG_FILE, "Unknown type: %d", col->getType());
break;
case NdbDictionary::Column::Tinyint: {
char value= *field->ptr;
fprintf(DBUG_FILE, "Tinyint\t%d", value);
......@@ -1482,6 +1659,21 @@ void ha_ndbcluster::print_results()
fprintf(DBUG_FILE, "Timespec\t%llu", value);
break;
}
case NdbDictionary::Column::Blob: {
Uint64 len= 0;
ndb_blob->getLength(len);
fprintf(DBUG_FILE, "Blob\t[len=%u]", (unsigned)len);
break;
}
case NdbDictionary::Column::Text: {
Uint64 len= 0;
ndb_blob->getLength(len);
fprintf(DBUG_FILE, "Text\t[len=%u]", (unsigned)len);
break;
}
case NdbDictionary::Column::Undefined:
fprintf(DBUG_FILE, "Unknown type: %d", col->getType());
break;
}
fprintf(DBUG_FILE, "\n");
......@@ -1727,7 +1919,7 @@ void ha_ndbcluster::position(const byte *record)
// No primary key, get hidden key
DBUG_PRINT("info", ("Getting hidden key"));
int hidden_no= table->fields;
NdbRecAttr* rec= m_value[hidden_no];
NdbRecAttr* rec= m_value[hidden_no].rec;
const NDBTAB *tab= (NDBTAB *) m_table;
const NDBCOL *hidden_col= tab->getColumn(hidden_no);
DBUG_ASSERT(hidden_col->getPrimaryKey() &&
......@@ -1901,7 +2093,7 @@ void ha_ndbcluster::start_bulk_insert(ha_rows rows)
const NDBTAB *tab= (NDBTAB *) m_table;
DBUG_ENTER("start_bulk_insert");
DBUG_PRINT("enter", ("rows: %d", rows));
DBUG_PRINT("enter", ("rows: %d", (int)rows));
rows_inserted= 0;
rows_to_insert= rows;
......@@ -1936,7 +2128,7 @@ int ha_ndbcluster::end_bulk_insert()
int ha_ndbcluster::extra_opt(enum ha_extra_function operation, ulong cache_size)
{
DBUG_ENTER("extra_opt");
DBUG_PRINT("enter", ("cache_size: %d", cache_size));
DBUG_PRINT("enter", ("cache_size: %lu", cache_size));
DBUG_RETURN(extra(operation));
}
......@@ -2157,7 +2349,7 @@ int ha_ndbcluster::start_stmt(THD *thd)
NdbConnection *tablock_trans=
(NdbConnection*)thd->transaction.all.ndb_tid;
DBUG_PRINT("info", ("tablock_trans: %x", tablock_trans));
DBUG_PRINT("info", ("tablock_trans: %x", (uint)tablock_trans));
DBUG_ASSERT(tablock_trans); trans= m_ndb->hupp(tablock_trans);
if (trans == NULL)
ERR_RETURN(m_ndb->getNdbError());
......@@ -2234,71 +2426,184 @@ int ndbcluster_rollback(THD *thd, void *ndb_transaction)
/*
Map MySQL type to the corresponding NDB type
Define NDB column based on Field.
Returns 0 or mysql error code.
Not member of ha_ndbcluster because NDBCOL cannot be declared.
*/
inline NdbDictionary::Column::Type
mysql_to_ndb_type(enum enum_field_types mysql_type, bool unsigned_flg)
static int create_ndb_column(NDBCOL &col,
Field *field,
HA_CREATE_INFO *info)
{
switch(mysql_type) {
// Set name
col.setName(field->field_name);
// Set type and sizes
const enum enum_field_types mysql_type= field->real_type();
switch (mysql_type) {
// Numeric types
case MYSQL_TYPE_DECIMAL:
return NdbDictionary::Column::Char;
col.setType(NDBCOL::Char);
col.setLength(field->pack_length());
break;
case MYSQL_TYPE_TINY:
return (unsigned_flg) ?
NdbDictionary::Column::Tinyunsigned :
NdbDictionary::Column::Tinyint;
if (field->flags & UNSIGNED_FLAG)
col.setType(NDBCOL::Tinyunsigned);
else
col.setType(NDBCOL::Tinyint);
col.setLength(1);
break;
case MYSQL_TYPE_SHORT:
return (unsigned_flg) ?
NdbDictionary::Column::Smallunsigned :
NdbDictionary::Column::Smallint;
if (field->flags & UNSIGNED_FLAG)
col.setType(NDBCOL::Smallunsigned);
else
col.setType(NDBCOL::Smallint);
col.setLength(1);
break;
case MYSQL_TYPE_LONG:
return (unsigned_flg) ?
NdbDictionary::Column::Unsigned :
NdbDictionary::Column::Int;
case MYSQL_TYPE_TIMESTAMP:
return NdbDictionary::Column::Unsigned;
case MYSQL_TYPE_LONGLONG:
return (unsigned_flg) ?
NdbDictionary::Column::Bigunsigned :
NdbDictionary::Column::Bigint;
if (field->flags & UNSIGNED_FLAG)
col.setType(NDBCOL::Unsigned);
else
col.setType(NDBCOL::Int);
col.setLength(1);
break;
case MYSQL_TYPE_INT24:
return (unsigned_flg) ?
NdbDictionary::Column::Mediumunsigned :
NdbDictionary::Column::Mediumint;
if (field->flags & UNSIGNED_FLAG)
col.setType(NDBCOL::Mediumunsigned);
else
col.setType(NDBCOL::Mediumint);
col.setLength(1);
break;
case MYSQL_TYPE_LONGLONG:
if (field->flags & UNSIGNED_FLAG)
col.setType(NDBCOL::Bigunsigned);
else
col.setType(NDBCOL::Bigint);
col.setLength(1);
break;
case MYSQL_TYPE_FLOAT:
return NdbDictionary::Column::Float;
col.setType(NDBCOL::Float);
col.setLength(1);
break;
case MYSQL_TYPE_DOUBLE:
return NdbDictionary::Column::Double;
case MYSQL_TYPE_DATETIME :
return NdbDictionary::Column::Datetime;
case MYSQL_TYPE_DATE :
case MYSQL_TYPE_NEWDATE :
case MYSQL_TYPE_TIME :
case MYSQL_TYPE_YEAR :
// Missing NDB data types, mapped to char
return NdbDictionary::Column::Char;
case MYSQL_TYPE_ENUM :
return NdbDictionary::Column::Char;
case MYSQL_TYPE_SET :
return NdbDictionary::Column::Char;
case MYSQL_TYPE_TINY_BLOB :
case MYSQL_TYPE_MEDIUM_BLOB :
case MYSQL_TYPE_LONG_BLOB :
case MYSQL_TYPE_BLOB :
return NdbDictionary::Column::Blob;
case MYSQL_TYPE_VAR_STRING :
return NdbDictionary::Column::Varchar;
case MYSQL_TYPE_STRING :
return NdbDictionary::Column::Char;
case MYSQL_TYPE_NULL :
case MYSQL_TYPE_GEOMETRY :
return NdbDictionary::Column::Undefined;
}
return NdbDictionary::Column::Undefined;
col.setType(NDBCOL::Double);
col.setLength(1);
break;
// Date types
case MYSQL_TYPE_TIMESTAMP:
col.setType(NDBCOL::Unsigned);
col.setLength(1);
break;
case MYSQL_TYPE_DATETIME:
col.setType(NDBCOL::Datetime);
col.setLength(1);
break;
case MYSQL_TYPE_DATE:
case MYSQL_TYPE_NEWDATE:
case MYSQL_TYPE_TIME:
case MYSQL_TYPE_YEAR:
col.setType(NDBCOL::Char);
col.setLength(field->pack_length());
break;
// Char types
case MYSQL_TYPE_STRING:
if (field->flags & BINARY_FLAG)
col.setType(NDBCOL::Binary);
else
col.setType(NDBCOL::Char);
col.setLength(field->pack_length());
break;
case MYSQL_TYPE_VAR_STRING:
if (field->flags & BINARY_FLAG)
col.setType(NDBCOL::Varbinary);
else
col.setType(NDBCOL::Varchar);
col.setLength(field->pack_length());
break;
// Blob types (all come in as MYSQL_TYPE_BLOB)
mysql_type_tiny_blob:
case MYSQL_TYPE_TINY_BLOB:
if (field->flags & BINARY_FLAG)
col.setType(NDBCOL::Blob);
else
col.setType(NDBCOL::Text);
col.setInlineSize(256);
// No parts
col.setPartSize(0);
col.setStripeSize(0);
break;
mysql_type_blob:
case MYSQL_TYPE_BLOB:
if (field->flags & BINARY_FLAG)
col.setType(NDBCOL::Blob);
else
col.setType(NDBCOL::Text);
// Use "<=" even if "<" is the exact condition
if (field->max_length() <= (1 << 8))
goto mysql_type_tiny_blob;
else if (field->max_length() <= (1 << 16))
{
col.setInlineSize(256);
col.setPartSize(2000);
col.setStripeSize(16);
}
else if (field->max_length() <= (1 << 24))
goto mysql_type_medium_blob;
else
goto mysql_type_long_blob;
break;
mysql_type_medium_blob:
case MYSQL_TYPE_MEDIUM_BLOB:
if (field->flags & BINARY_FLAG)
col.setType(NDBCOL::Blob);
else
col.setType(NDBCOL::Text);
col.setInlineSize(256);
col.setPartSize(4000);
col.setStripeSize(8);
break;
mysql_type_long_blob:
case MYSQL_TYPE_LONG_BLOB:
if (field->flags & BINARY_FLAG)
col.setType(NDBCOL::Blob);
else
col.setType(NDBCOL::Text);
col.setInlineSize(256);
col.setPartSize(8000);
col.setStripeSize(4);
break;
// Other types
case MYSQL_TYPE_ENUM:
col.setType(NDBCOL::Char);
col.setLength(field->pack_length());
break;
case MYSQL_TYPE_SET:
col.setType(NDBCOL::Char);
col.setLength(field->pack_length());
break;
case MYSQL_TYPE_NULL:
case MYSQL_TYPE_GEOMETRY:
goto mysql_type_unsupported;
mysql_type_unsupported:
default:
return HA_ERR_UNSUPPORTED;
}
// Set nullable and pk
col.setNullable(field->maybe_null());
col.setPrimaryKey(field->flags & PRI_KEY_FLAG);
// Set autoincrement
if (field->flags & AUTO_INCREMENT_FLAG)
{
col.setAutoIncrement(TRUE);
ulonglong value= info->auto_increment_value ?
info->auto_increment_value -1 : (ulonglong) 0;
DBUG_PRINT("info", ("Autoincrement key, initial: %llu", value));
col.setAutoIncrementInitialValue(value);
}
else
col.setAutoIncrement(false);
return 0;
}
/*
Create a table in NDB Cluster
*/
......@@ -2308,7 +2613,6 @@ int ha_ndbcluster::create(const char *name,
HA_CREATE_INFO *info)
{
NDBTAB tab;
NdbDictionary::Column::Type ndb_type;
NDBCOL col;
uint pack_length, length, i;
const void *data, *pack_data;
......@@ -2339,31 +2643,11 @@ int ha_ndbcluster::create(const char *name,
for (i= 0; i < form->fields; i++)
{
Field *field= form->field[i];
ndb_type= mysql_to_ndb_type(field->real_type(),
field->flags & UNSIGNED_FLAG);
DBUG_PRINT("info", ("name: %s, type: %u, pack_length: %d",
field->field_name, field->real_type(),
field->pack_length()));
col.setName(field->field_name);
col.setType(ndb_type);
if ((ndb_type == NdbDictionary::Column::Char) ||
(ndb_type == NdbDictionary::Column::Varchar))
col.setLength(field->pack_length());
else
col.setLength(1);
col.setNullable(field->maybe_null());
col.setPrimaryKey(field->flags & PRI_KEY_FLAG);
if (field->flags & AUTO_INCREMENT_FLAG)
{
col.setAutoIncrement(TRUE);
ulonglong value= info->auto_increment_value ?
info->auto_increment_value -1 : (ulonglong) 0;
DBUG_PRINT("info", ("Autoincrement key, initial: %d", value));
col.setAutoIncrementInitialValue(value);
}
else
col.setAutoIncrement(false);
if (my_errno= create_ndb_column(col, field, info))
DBUG_RETURN(my_errno);
tab.addColumn(col);
}
......@@ -2631,14 +2915,15 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
m_table(NULL),
m_table_flags(HA_REC_NOT_IN_SEQ |
HA_NOT_EXACT_COUNT |
HA_NO_PREFIX_CHAR_KEYS |
HA_NO_BLOBS),
HA_NO_PREFIX_CHAR_KEYS),
m_use_write(false),
retrieve_all_fields(FALSE),
rows_to_insert(0),
rows_inserted(0),
bulk_insert_rows(1024),
ops_pending(0)
ops_pending(0),
blobs_buffer(0),
blobs_buffer_size(0)
{
int i;
......@@ -2671,6 +2956,8 @@ ha_ndbcluster::~ha_ndbcluster()
DBUG_ENTER("~ha_ndbcluster");
release_metadata();
my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
blobs_buffer= 0;
// Check for open cursor/transaction
DBUG_ASSERT(m_active_cursor == NULL);
......
......@@ -35,6 +35,7 @@ class NdbRecAttr; // Forward declaration
class NdbResultSet; // Forward declaration
class NdbScanOperation;
class NdbIndexScanOperation;
class NdbBlob;
typedef enum ndb_index_type {
UNDEFINED_INDEX = 0,
......@@ -171,6 +172,7 @@ class ha_ndbcluster: public handler
enum ha_rkey_function find_flag);
int close_scan();
void unpack_record(byte *buf);
int get_ndb_lock_type(enum thr_lock_type type);
void set_dbname(const char *pathname);
void set_tabname(const char *pathname);
......@@ -181,7 +183,9 @@ class ha_ndbcluster: public handler
int set_ndb_key(NdbOperation*, Field *field,
uint fieldnr, const byte* field_ptr);
int set_ndb_value(NdbOperation*, Field *field, uint fieldnr);
int get_ndb_value(NdbOperation*, uint fieldnr, byte *field_ptr);
int get_ndb_value(NdbOperation*, Field *field, uint fieldnr);
friend int ::get_ndb_blobs_value(NdbBlob *ndb_blob, void *arg);
int get_ndb_blobs_value(NdbBlob *last_ndb_blob);
int set_primary_key(NdbOperation *op, const byte *key);
int set_primary_key(NdbOperation *op);
int set_primary_key_from_old_data(NdbOperation *op, const byte *old_data);
......@@ -191,8 +195,8 @@ class ha_ndbcluster: public handler
void print_results();
longlong get_auto_increment();
int ndb_err(NdbConnection*);
bool uses_blob_value(bool all_fields);
private:
int check_ndb_connection();
......@@ -209,13 +213,19 @@ class ha_ndbcluster: public handler
NDB_SHARE *m_share;
NDB_INDEX_TYPE m_indextype[MAX_KEY];
const char* m_unique_index_name[MAX_KEY];
NdbRecAttr *m_value[NDB_MAX_ATTRIBUTES_IN_TABLE];
// NdbRecAttr has no reference to blob
typedef union { NdbRecAttr *rec; NdbBlob *blob; void *ptr; } NdbValue;
NdbValue m_value[NDB_MAX_ATTRIBUTES_IN_TABLE];
bool m_use_write;
bool retrieve_all_fields;
ha_rows rows_to_insert;
ha_rows rows_inserted;
ha_rows bulk_insert_rows;
ha_rows ops_pending;
bool blobs_pending;
// memory for blobs in one tuple
char *blobs_buffer;
uint32 blobs_buffer_size;
};
bool ndbcluster_init(void);
......@@ -231,10 +241,3 @@ int ndbcluster_discover(const char* dbname, const char* name,
int ndbcluster_drop_database(const char* path);
void ndbcluster_print_error(int error, const NdbOperation *error_op);
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment