-- source include/have_ndb.inc
-- source include/have_binlog_format_row.inc
-- source include/not_embedded.inc

--disable_warnings
DROP TABLE IF EXISTS t1,t2,t3,t4,t5,t6,t7;
drop database if exists mysqltest;
--enable_warnings

# workaround for bug#16445
# remove to reproduce bug and run tests from ndb start
# and with ndb_autodiscover disabled. Fails on Linux 50 % of the times
CREATE TABLE t1 (
  pk1 INT NOT NULL PRIMARY KEY,
  attr1 INT NOT NULL,
  attr2 INT,
  attr3 VARCHAR(10)
) ENGINE=ndbcluster;
drop table t1;

#
# Basic test to show that the NDB 
# table handler is working
#

#
# Show status and variables
#
--replace_column 2 #
SHOW GLOBAL STATUS LIKE 'ndb%';
--replace_column 2 #
SHOW GLOBAL VARIABLES LIKE 'ndb%';

#
# Create a normal table with primary key
#
CREATE TABLE t1 (
  pk1 INT NOT NULL PRIMARY KEY,
  attr1 INT NOT NULL,
  attr2 INT,
  attr3 VARCHAR(10)
) ENGINE=ndbcluster;

SHOW INDEX FROM t1;  
INSERT INTO t1 VALUES (9410,9412, NULL, '9412'), (9411,9413, 17, '9413');
SHOW INDEX FROM t1;  
SELECT pk1 FROM t1 ORDER BY pk1;
SELECT * FROM t1 ORDER BY pk1;
SELECT t1.* FROM t1 ORDER BY pk1;

# Update on record by primary key
UPDATE t1 SET attr1=1 WHERE pk1=9410;
SELECT * FROM t1 ORDER BY pk1;

# Update primary key
UPDATE t1 SET pk1=2 WHERE attr1=1;
SELECT * FROM t1 ORDER BY pk1;
UPDATE t1 SET pk1=pk1 + 1;
SELECT * FROM t1 ORDER BY pk1;
UPDATE t1 SET pk1=4 WHERE pk1 = 3;
SELECT * FROM t1 ORDER BY pk1;

# Delete the record
DELETE FROM t1;
SELECT * FROM t1;

# Insert more records and update them all at once
INSERT INTO t1 VALUES (9410,9412, NULL, '9412'), (9408, 8765, NULL, '8765'),
(7,8, NULL, NULL), (8,9, NULL, NULL), (9,10, NULL, NULL), (10,11, NULL, NULL), (11,12, NULL, NULL), (12,13, NULL, NULL), (13,14, NULL, NULL);
UPDATE t1 SET attr1 = 9999;
SELECT * FROM t1 ORDER BY pk1;

UPDATE t1 SET attr1 = 9998 WHERE pk1 < 1000;
SELECT * FROM t1 ORDER BY pk1;

UPDATE t1 SET attr1 = 9997 WHERE attr1 = 9999;
SELECT * FROM t1 ORDER BY pk1;

# Delete one record by specifying pk
DELETE FROM t1 WHERE pk1 = 9410;
SELECT * FROM t1 ORDER BY pk1;

# Delete all from table
DELETE FROM t1;
SELECT * FROM t1;

# Insert three records with attr1=4 and two with attr1=5
# Delete all with attr1=4
INSERT INTO t1 values (1, 4, NULL, NULL), (2, 4, NULL, NULL), (3, 5, NULL, NULL), (4, 4, NULL, NULL), (5, 5, NULL, NULL);
DELETE FROM t1 WHERE attr1=4;
SELECT * FROM t1 order by pk1;
DELETE FROM t1;

# Insert two records and delete one
INSERT INTO t1 VALUES (9410,9412, NULL, NULL), (9411, 9413, NULL, NULL);
DELETE FROM t1 WHERE pk1 = 9410;
SELECT * FROM t1;
DROP TABLE t1;

#
# Create table without primary key
# a hidden primary key column is created by handler
#
CREATE TABLE t1 (id INT, id2 int) engine=ndbcluster;
INSERT INTO t1 values(3456, 7890);
SELECT * FROM t1;
UPDATE t1 SET id=2 WHERE id2=12;
SELECT * FROM t1;
UPDATE t1 SET id=1234 WHERE id2=7890;
SELECT * FROM t1;
DELETE FROM t1;

INSERT INTO t1 values(3456, 7890), (3456, 7890), (3456, 7890), (3454, 7890);
SELECT * FROM t1 ORDER BY id;
DELETE FROM t1 WHERE id = 3456;
SELECT * FROM t1 ORDER BY id;

DROP TABLE t1;

# test create with the keyword "engine=NDBCLUSTER"
CREATE TABLE t1 (
  pk1 INT NOT NULL PRIMARY KEY,
  attr1 INT NOT NULL
) ENGINE=NDBCLUSTER;

INSERT INTO t1 values(1, 9999);

DROP TABLE t1;

# test create with the keyword "engine=NDB"
CREATE TABLE t1 (
  pk1 INT NOT NULL PRIMARY KEY,
  attr1 INT NOT NULL
) ENGINE=NDB;

INSERT INTO t1 values(1, 9999);

DROP TABLE t1;


#
# A more extensive test with a lot more records
#

CREATE TABLE t2 (
  a bigint unsigned NOT NULL PRIMARY KEY,
  b int unsigned not null,
  c int unsigned
) engine=ndbcluster;

CREATE TABLE t3 (
  a bigint unsigned NOT NULL,
  b bigint unsigned not null,
  c bigint unsigned,
  PRIMARY KEY(a)
) engine=ndbcluster;

CREATE TABLE t4 (
  a bigint unsigned NOT NULL,
  b bigint unsigned not null,
  c bigint unsigned NOT NULL,
  d int unsigned,
  PRIMARY KEY(a, b, c)
) engine=ndbcluster;


#
# insert more records into tables
#
let $1=1000;
disable_query_log;
while ($1)
{
 eval insert into t2 values($1, $1+9, 5);
 eval insert into t3 values($1, $1+9, 5);
 eval insert into t4 values($1, $1+9, 5, $1+26000);
 dec $1;
}
enable_query_log;


#
# delete every other record in the tables
#
let $1=1000;
disable_query_log;
while ($1)
{
 eval delete from t2 where a=$1;
 eval delete from t3 where a=$1;
 eval delete from t4 where a=$1 and b=$1+9 and c=5;
 dec $1;
 dec $1;
}
enable_query_log;


select * from t2 where a = 7 order by b;
select * from t2 where a = 7 order by a;
select * from t2 where a = 7 order by 2;
select * from t2 where a = 7 order by c;

select * from t2 where a = 7 and b = 16 order by b;
select * from t2 where a = 7 and b = 16 order by a;
select * from t2 where a = 7 and b = 17 order by a;
select * from t2 where a = 7 and b != 16 order by b;

select * from t2 where a = 7 and b = 16 and c = 5 order by b;
select * from t2 where a = 7 and b = 16 and c = 5 order by a;
select * from t2 where a = 7 and b = 16 and c = 6 order by a;
select * from t2 where a = 7 and b != 16 and c = 5 order by b;

select * from t3 where a = 7 order by b;
select * from t3 where a = 7 order by a;
select * from t3 where a = 7 order by 2;
select * from t3 where a = 7 order by c;

select * from t3 where a = 7 and b = 16 order by b;
select * from t3 where a = 7 and b = 16 order by a;
select * from t3 where a = 7 and b = 17 order by a;
select * from t3 where a = 7 and b != 16 order by b;

select * from t4 where a = 7 order by b;
select * from t4 where a = 7 order by a;
select * from t4 where a = 7 order by 2;
select * from t4 where a = 7 order by c;

select * from t4 where a = 7 and b = 16 order by b;
select * from t4 where a = 7 and b = 16 order by a;
select * from t4 where a = 7 and b = 17 order by a;
select * from t4 where a = 7 and b != 16 order by b;

#
# update records
#
let $1=1000;
disable_query_log;
while ($1)
{
 eval update t2 set c=$1 where a=$1;
 eval update t3 set c=7 where a=$1 and b=$1+9 and c=5;
 eval update t4 set d=$1+21987 where a=$1 and b=$1+9 and c=5;
 dec $1;
 dec $1;
}
enable_query_log;

delete from t2 where a > 5;
select x1.a, x1.b from t2 x1, t2 x2 where x1.b = x2.b order by x1.a;
select a, b FROM t2 outer_table where
a = (select a from t2 where b = outer_table.b ) order by a;


delete from t2;
delete from t3;
delete from t4;

drop table t2;
drop table t3;
drop table t4;

#
# Test delete and update from table with 3 keys
#

CREATE TABLE t5 (
  a bigint unsigned NOT NULL,
  b bigint unsigned not null,
  c bigint unsigned NOT NULL,
  d int unsigned,
  PRIMARY KEY(a, b, c)
) engine=ndbcluster;

insert into t5 values(10, 19, 5, 26010);

delete from t5 where a=10 and b=19 and c=5;

select * from t5;

insert into t5 values(10, 19, 5, 26010);

update t5 set d=21997 where a=10 and b=19 and c=5;

select * from t5;

delete from t5;

drop table t5;

#
# Test using table with a char(255) column first in table
#

CREATE TABLE t6 (
  adress char(255),
  a int NOT NULL PRIMARY KEY,
  b int
) engine = NDB;

insert into t6 values
 ("Nice road 3456", 1, 23),
 ("Street Road 78", 3, 92),
 ("Road street 89C", 5, 71),
 (NULL, 7, NULL);
select * from t6 order by a;
select a, b from t6 order by a;

update t6 set adress="End of road 09" where a=3;
update t6 set b=181, adress="Street 76" where a=7;
select * from t6 order by a;
select * from t6 where a=1;
delete from t6 where a=1;
select * from t6 order by a;
delete from t6 where b=71;
select * from t6 order by a;

drop table t6;

#
# Test using table with a char(255) column first in table and a 
# primary key consisting of two columns
#

CREATE TABLE t7 (
  adress char(255),
  a int NOT NULL,
  b int,
  c int NOT NULL,
  PRIMARY KEY(a, c)	
) engine = NDB;

insert into t7 values
 ("Highway 3456", 1, 23, 2),
 ("Street Road 78", 3, 92, 3),
 ("Main street 89C", 5, 71, 4),
 (NULL, 8, NULL, 12);
select * from t7 order by a;
select a, b from t7 order by a;

update t7 set adress="End of road 09" where a=3;
update t7 set adress="Gatuvägen 90C" where a=5 and c=4;
update t7 set adress="No adress" where adress is NULL;
select * from t7 order by a;
select * from t7 where a=1 and c=2;
delete from t7 where a=1;
delete from t7 where a=3 and c=3;
delete from t7 where a=5 and c=4;
select * from t7;
delete from t7 where b=23;
select * from t7;

drop table t7;

#
# Test multiple databases in one statement
#

CREATE TABLE t1 (
  pk1 INT NOT NULL PRIMARY KEY,
  attr1 INT NOT NULL,
  attr2 INT,
  attr3 VARCHAR(10)
) ENGINE=ndbcluster;

INSERT INTO t1 VALUES (9410,9412, NULL, '9412'), (9411,9413, 17, '9413');

create database mysqltest;
use mysqltest;

CREATE TABLE t2 (
  a bigint unsigned NOT NULL PRIMARY KEY,
  b int unsigned not null,
  c int unsigned
) engine=ndbcluster;

insert into t2 select pk1,attr1,attr2 from test.t1;
select * from t2 order by a;
select b from test.t1, t2 where c = test.t1.attr2;
select b,test.t1.attr1 from test.t1, t2 where test.t1.pk1 < a;
 
drop table test.t1, t2;
drop database mysqltest;

#
# BUG#6031 - DROP DATABASE doesn't drop database on first try
#

--disable_warnings
drop database if exists ndbtest1;
--enable_warnings

create database ndbtest1;
use ndbtest1;
create table t1(id int) engine=ndbcluster;
drop database ndbtest1;
--error 1008
drop database ndbtest1;

#
# test support of char(0)
#

use test;
create table t1 (a int primary key, b char(0));
insert into t1 values (1,"");
insert into t1 values (2,NULL);
select * from t1 order by a;
select * from t1 order by b;
select * from t1 where b IS NULL;
select * from t1 where b IS NOT NULL;
drop table t1;

#
# test the limit of no of attributes in one table
#
# also tests bug#17179, more than 31 attributes in
# a partitioned table
#
create table t1 (
c1 int,
c2 int,
c3 int,
c4 int,
c5 int,
c6 int,
c7 int,
c8 int,
c9 int,
c10 int,
c11 int,
c12 int,
c13 int,
c14 int,
c15 int,
c16 int,
c17 int,
c18 int,
c19 int,
c20 int,
c21 int,
c22 int,
c23 int,
c24 int,
c25 int,
c26 int,
c27 int,
c28 int,
c29 int,
c30 int,
c31 int,
c32 int,
c33 int,
c34 int,
c35 int,
c36 int,
c37 int,
c38 int,
c39 int,
c40 int,
c41 int,
c42 int,
c43 int,
c44 int,
c45 int,
c46 int,
c47 int,
c48 int,
c49 int,
c50 int,
c51 int,
c52 int,
c53 int,
c54 int,
c55 int,
c56 int,
c57 int,
c58 int,
c59 int,
c60 int,
c61 int,
c62 int,
c63 int,
c64 int,
c65 int,
c66 int,
c67 int,
c68 int,
c69 int,
c70 int,
c71 int,
c72 int,
c73 int,
c74 int,
c75 int,
c76 int,
c77 int,
c78 int,
c79 int,
c80 int,
c81 int,
c82 int,
c83 int,
c84 int,
c85 int,
c86 int,
c87 int,
c88 int,
c89 int,
c90 int,
c91 int,
c92 int,
c93 int,
c94 int,
c95 int,
c96 int,
c97 int,
c98 int,
c99 int,
c100 int,
c101 int,
c102 int,
c103 int,
c104 int,
c105 int,
c106 int,
c107 int,
c108 int,
c109 int,
c110 int,
c111 int,
c112 int,
c113 int,
c114 int,
c115 int,
c116 int,
c117 int,
c118 int,
c119 int,
c120 int,
c121 int,
c122 int,
c123 int,
c124 int,
c125 int,
c126 int,
c127 int,
c128 int,
primary key using hash(c1)) engine=ndb partition by key(c1);
drop table t1;

#
# test max size of attribute name and truncation
#

create table t1 (
a1234567890123456789012345678901234567890 int primary key,
a12345678901234567890123456789a1234567890 int,
index(a12345678901234567890123456789a1234567890)
) engine=ndb;
show tables;
insert into t1 values (1,1),(2,1),(3,1),(4,1),(5,2),(6,1),(7,1);
--replace_column 9 #
explain select * from t1 where a12345678901234567890123456789a1234567890=2;
select * from t1 where a12345678901234567890123456789a1234567890=2;
drop table t1;

#
# test fragment creation
#
# first a table with _many_ fragments per node group
# then a table with just one fragment per node group
#
create table t1
  (a bigint, b bigint, c bigint, d bigint, 
   primary key (a,b,c,d)) 
  engine=ndb
  max_rows=800000000;
insert into t1 values
  (1,2,3,4),(2,3,4,5),(3,4,5,6),
  (3,2,3,4),(1,3,4,5),(2,4,5,6),
  (1,2,3,5),(2,3,4,8),(3,4,5,9),
  (3,2,3,5),(1,3,4,8),(2,4,5,9),
  (1,2,3,6),(2,3,4,6),(3,4,5,7),
  (3,2,3,6),(1,3,4,6),(2,4,5,7),
  (1,2,3,7),(2,3,4,7),(3,4,5,8),
  (3,2,3,7),(1,3,4,7),(2,4,5,8),
  (1,3,3,4),(2,4,4,5),(3,5,5,6),
  (3,3,3,4),(1,4,4,5),(2,5,5,6),
  (1,3,3,5),(2,4,4,8),(3,5,5,9),
  (3,3,3,5),(1,4,4,8),(2,5,5,9),
  (1,3,3,6),(2,4,4,6),(3,5,5,7),
  (3,3,3,6),(1,4,4,6),(2,5,5,7),
  (1,3,3,7),(2,4,4,7),(3,5,5,8),
  (3,3,3,7),(1,4,4,7),(2,5,5,8);
select count(*) from t1;
drop table t1;

create table t1
  (a bigint, b bigint, c bigint, d bigint, 
   primary key (a)) 
  engine=ndb
  max_rows=1;
drop table t1;

#
# Test auto_increment
#

connect (con1,localhost,root,,test);
connect (con2,localhost,root,,test);

create table t1
	(counter int(64) NOT NULL auto_increment,
	 datavalue char(40) default 'XXXX',
	 primary key (counter)
	) ENGINE=ndbcluster;

connection con1;
insert into t1 (datavalue) values ('newval');
insert into t1 (datavalue) values ('newval');
select * from t1 order by counter;
insert into t1 (datavalue) select datavalue from t1 where counter < 100;
insert into t1 (datavalue) select datavalue from t1 where counter < 100;
select * from t1 order by counter;
connection con2;
insert into t1 (datavalue) select datavalue from t1 where counter < 100;
insert into t1 (datavalue) select datavalue from t1 where counter < 100;
select * from t1 order by counter;

drop table t1;

#
# bug#27437
connection con1;
create table t1 (a int primary key auto_increment) engine = ndb;
insert into t1() values (),(),(),(),(),(),(),(),(),(),(),();
connection con2;
insert into t1(a) values (20),(28);
connection con1;
insert into t1() values (),(),(),(),(),(),(),(),(),(),(),();
connection con2;
insert into t1() values (21), (22);
connection con1;

drop table t1;

#
# BUG#14514 Creating table with packed key fails silently
#

CREATE TABLE t1 ( b INT ) PACK_KEYS = 0 ENGINE = ndb;
select * from t1;
drop table t1;

#
# Bug #17249 delete statement with join where clause fails 
# when table do not have pk
	#

create table t1 (a int) engine=ndb;
create table t2 (a int) engine=ndb;
insert into t1 values (1);
insert into t2 values (1);
delete t1.* from t1, t2 where t1.a = t2.a;
select * from t1;
select * from t2;
drop table t1;
drop table t2;

#
# Bug #17257 update fails for inner joins if tables 
# do not have Primary Key
#

CREATE TABLE t1 (
  i   INT,
  j   INT,
  x   INT,
  y   INT,
  z   INT
) engine=ndb;

CREATE TABLE t2 (
  i   INT,
  k   INT,
  x   INT,
  y   INT,
  z   INT
) engine=ndb;

CREATE TABLE t3 (
  j   INT,
  k   INT,
  x   INT,
  y   INT,
  z   INT
) engine=ndb;

INSERT INTO t1 VALUES ( 1, 2,13,14,15);
INSERT INTO t2 VALUES ( 1, 3,23,24,25);
INSERT INTO t3 VALUES ( 2, 3, 1,34,35), ( 2, 3, 1,34,36);

UPDATE      t1 AS a
INNER JOIN  t2 AS b
              ON a.i = b.i
INNER JOIN  t3 AS c
              ON a.j = c.j  AND  b.k = c.k
SET         a.x = b.x,
            a.y = b.y,
            a.z = (
              SELECT  sum(z)
              FROM    t3
              WHERE   y = 34
            )
WHERE       b.x = 23;
select * from t1;
drop table t1;
drop table t2;
drop table t3;

# End of 4.1 tests

#
# Test long table name
#
create table atablewithareallylongandirritatingname (a int);
insert into atablewithareallylongandirritatingname values (2);
select * from atablewithareallylongandirritatingname;
drop table atablewithareallylongandirritatingname;

#
# Bug#15682
#
create table t1 (f1 varchar(50), f2 text,f3 int, primary key(f1)) engine=NDB;
insert into t1 (f1,f2,f3)VALUES("111111","aaaaaa",1);
insert into t1 (f1,f2,f3)VALUES("222222","bbbbbb",2);
select * from t1 order by f1;
select * from t1 order by f2;
select * from t1 order by f3;
drop table t1;
# Bug#16561 Unknown ERROR msg "ERROR 1186 (HY000): Binlog closed" by perror
#

# As long there is no error code 1186 defined by NDB
# we should get a message "Illegal ndb error code: 1186"
--error 1
--exec $MY_PERROR --ndb 1186 2>&1

#
# Bug #25746 - VARCHAR UTF8 PK issue
# - prior to bugfix 4209, illegal length parameter would be
# returned in SELECT *

CREATE TABLE t1 (
a VARBINARY(40) NOT NULL,
b VARCHAR (256) CHARACTER SET UTF8 NOT NULL,
c VARCHAR(256) CHARACTER SET UTF8 NOT NULL,
PRIMARY KEY (b,c))  ENGINE=ndbcluster;
INSERT INTO t1 VALUES
("a","ab","abc"),("b","abc","abcd"),("c","abc","ab"),("d","ab","ab"),("e","abc","abc");
SELECT * FROM t1 ORDER BY a;
DROP TABLE t1;

# delete
create table t1 (a int not null primary key, b int not null) engine=ndb;
create table t2 (a int not null primary key, b int not null) engine=ndb;
insert into t1 values (1,10), (2,20), (3,30);
insert into t2 values (1,10), (2,20), (3,30);
select * from t1 order by a;
delete from t1 where a > 0 order by a desc limit 1;
select * from t1 order by a;
delete from t1,t2 using t1,t2 where t1.a = t2.a;
select * from t2 order by a;
drop table t1,t2;

# insert ignore
create table t1 (a int not null primary key, b int not null) engine=ndb;
insert into t1 values (1,10), (2,20), (3,30);
--error ER_DUP_ENTRY
insert into t1 set a=1, b=100;
insert ignore into t1 set a=1, b=100;
select * from t1 order by a;
insert into t1 set a=1, b=1000 on duplicate key update b=b+1;
select * from t1 order by a;
drop table t1;

# update
create table t1 (a int not null primary key, b int not null) engine=ndb;
create table t2 (c int not null primary key, d int not null) engine=ndb;
insert into t1 values (1,10), (2,10), (3,30), (4, 30);
insert into t2 values (1,10), (2,10), (3,30), (4, 30);
--error ER_DUP_ENTRY
update t1 set a = 1 where a = 3;
select * from t1 order by a;
update t1 set b = 1 where a > 1 order by a desc limit 1;
select * from t1 order by a;
--error ER_DUP_ENTRY
update t1,t2 set a = 1, c = 1 where a = 3 and c = 3;
select * from t1 order by a;
update ignore t1,t2 set a = 1, c = 1 where a = 3 and c = 3;
select * from t1 order by a;
drop table t1,t2;

# End of 5.0 tests
--echo End of 5.0 tests


#
# Bug #18483 Cannot create table with FK constraint
# ndb does not support foreign key constraint, it is silently ignored
# in line with other storage engines
#
CREATE TABLE t1 (a VARCHAR(255) NOT NULL,
                 CONSTRAINT pk_a PRIMARY KEY (a))engine=ndb;
CREATE TABLE t2(a VARCHAR(255) NOT NULL,
                b VARCHAR(255) NOT NULL,
                c VARCHAR(255) NOT NULL,
                CONSTRAINT pk_b_c_id PRIMARY KEY (b,c),
                CONSTRAINT fk_a FOREIGN KEY(a) REFERENCES t1(a))engine=ndb;
drop table t1, t2;

# bug#24301
create table t1 (a int not null primary key, b int) engine=ndb;
insert into t1 values(1,1),(2,2),(3,3);
create table t2 like t1;
insert into t2 select * from t1;
select * from t1 order by a;
select * from t2 order by a;
drop table t1, t2;

# create table if not exists
--disable_warnings
create table t1 (a int not null primary key, b int not null default 0, c varchar(254)) engine=ndb;
create table if not exists t1 (a int not null primary key, b int not null default 0, c varchar(254)) engine=ndb;
--enable_warnings

# create like
create table t2 like t1;

# multi rename
rename table t1 to t10, t2 to t20;
drop table t10,t20;

--echo End of 5.1 tests