Commit 99880af8 authored by unknown's avatar unknown

wl#2126 - ndb - Fix handling of null values wrt read multi range


mysql-test/r/ndb_read_multi_range.result:
  Add tests of null handling to read_multi
mysql-test/t/ndb_read_multi_range.test:
  Add tests of null handling to read_multi
ndb/include/ndbapi/NdbOperation.hpp:
  Get recattr
ndb/include/ndbapi/NdbRecAttr.hpp:
  Get recattr
sql/ha_ndbcluster.cc:
  Fix handling of null values wrt read multi range
sql/ha_ndbcluster.h:
  Fix handling of null values wrt read multi range
parent 34944e95
...@@ -212,3 +212,54 @@ delete from t1 where d in (12,6,7); ...@@ -212,3 +212,54 @@ delete from t1 where d in (12,6,7);
select * from t1 where d in (12,6,7); select * from t1 where d in (12,6,7);
a b c d e a b c d e
drop table t1; drop table t1;
create table t1 (
a int not null primary key,
b int,
c int,
d int,
unique index (b),
index(c)
) engine = ndb;
insert into t1 values
(1,null,1,1),
(2,2,2,2),
(3,null,null,3),
(4,4,null,4),
(5,null,5,null),
(6,6,6,null),
(7,null,null,null),
(8,8,null,null),
(9,null,9,9),
(10,10,10,10),
(11,null,null,11),
(12,12,null,12),
(13,null,13,null),
(14,14,14,null),
(15,null,null,null),
(16,16,null,null);
create table t2 as select * from t1 where a in (5,6,7,8,9,10);
select * from t2 order by a;
a b c d
5 NULL 5 NULL
6 6 6 NULL
7 NULL NULL NULL
8 8 NULL NULL
9 NULL 9 9
10 10 10 10
drop table t2;
create table t2 as select * from t1 where b in (5,6,7,8,9,10);
select * from t2 order by a;
a b c d
6 6 6 NULL
8 8 NULL NULL
10 10 10 10
drop table t2;
create table t2 as select * from t1 where c in (5,6,7,8,9,10);
select * from t2 order by a;
a b c d
5 NULL 5 NULL
6 6 6 NULL
9 NULL 9 9
10 10 10 10
drop table t2;
drop table t1;
...@@ -157,3 +157,45 @@ delete from t1 where d in (12,6,7); ...@@ -157,3 +157,45 @@ delete from t1 where d in (12,6,7);
select * from t1 where d in (12,6,7); select * from t1 where d in (12,6,7);
drop table t1; drop table t1;
# null handling
create table t1 (
a int not null primary key,
b int,
c int,
d int,
unique index (b),
index(c)
) engine = ndb;
insert into t1 values
(1,null,1,1),
(2,2,2,2),
(3,null,null,3),
(4,4,null,4),
(5,null,5,null),
(6,6,6,null),
(7,null,null,null),
(8,8,null,null),
(9,null,9,9),
(10,10,10,10),
(11,null,null,11),
(12,12,null,12),
(13,null,13,null),
(14,14,14,null),
(15,null,null,null),
(16,16,null,null);
create table t2 as select * from t1 where a in (5,6,7,8,9,10);
select * from t2 order by a;
drop table t2;
create table t2 as select * from t1 where b in (5,6,7,8,9,10);
select * from t2 order by a;
drop table t2;
create table t2 as select * from t1 where c in (5,6,7,8,9,10);
select * from t2 order by a;
drop table t2;
drop table t1;
...@@ -750,6 +750,7 @@ protected: ...@@ -750,6 +750,7 @@ protected:
NdbOperation* next(); // Get next pointer NdbOperation* next(); // Get next pointer
public: public:
const NdbOperation* next() const; const NdbOperation* next() const;
const NdbRecAttr* getFirstRecAttr() const;
protected: protected:
enum OperationStatus enum OperationStatus
...@@ -1005,6 +1006,14 @@ NdbOperation::next() const ...@@ -1005,6 +1006,14 @@ NdbOperation::next() const
{ {
return theNext; return theNext;
} }
inline
const NdbRecAttr*
NdbOperation::getFirstRecAttr() const
{
return theReceiver.theFirstRecAttr;
}
/****************************************************************************** /******************************************************************************
OperationStatus Status(); OperationStatus Status();
......
...@@ -241,6 +241,9 @@ public: ...@@ -241,6 +241,9 @@ public:
* i.e. objects that has been cloned. * i.e. objects that has been cloned.
*/ */
~NdbRecAttr(); ~NdbRecAttr();
public:
const NdbRecAttr* next() const;
private: private:
NdbRecAttr(); NdbRecAttr();
...@@ -252,7 +255,7 @@ private: ...@@ -252,7 +255,7 @@ private:
void init(); /* Initialise object when allocated */ void init(); /* Initialise object when allocated */
void next(NdbRecAttr* aRecAttr); void next(NdbRecAttr* aRecAttr);
NdbRecAttr* next() const; NdbRecAttr* next();
int setup(const class NdbDictionary::Column* col, char* aValue); int setup(const class NdbDictionary::Column* col, char* aValue);
int setup(const class NdbColumnImpl* anAttrInfo, char* aValue); int setup(const class NdbColumnImpl* anAttrInfo, char* aValue);
...@@ -401,6 +404,13 @@ NdbRecAttr::next(NdbRecAttr* aRecAttr) ...@@ -401,6 +404,13 @@ NdbRecAttr::next(NdbRecAttr* aRecAttr)
inline inline
NdbRecAttr* NdbRecAttr*
NdbRecAttr::next()
{
return theNext;
}
inline
const NdbRecAttr*
NdbRecAttr::next() const NdbRecAttr::next() const
{ {
return theNext; return theNext;
......
...@@ -1118,8 +1118,8 @@ int ha_ndbcluster::complemented_pk_read(const byte *old_data, byte *new_data) ...@@ -1118,8 +1118,8 @@ int ha_ndbcluster::complemented_pk_read(const byte *old_data, byte *new_data)
for (i= 0; i < no_fields; i++) for (i= 0; i < no_fields; i++)
{ {
Field *field= table->field[i]; Field *field= table->field[i];
if (!(field->flags & PRI_KEY_FLAG) && if (!((field->flags & PRI_KEY_FLAG) ||
(thd->query_id != field->query_id)) (thd->query_id == field->query_id)))
{ {
if (get_ndb_value(op, field, i, new_data)) if (get_ndb_value(op, field, i, new_data))
ERR_RETURN(trans->getNdbError()); ERR_RETURN(trans->getNdbError());
...@@ -1135,6 +1135,20 @@ int ha_ndbcluster::complemented_pk_read(const byte *old_data, byte *new_data) ...@@ -1135,6 +1135,20 @@ int ha_ndbcluster::complemented_pk_read(const byte *old_data, byte *new_data)
// The value have now been fetched from NDB // The value have now been fetched from NDB
unpack_record(new_data); unpack_record(new_data);
table->status= 0; table->status= 0;
/**
* restore m_value
*/
for (i= 0; i < no_fields; i++)
{
Field *field= table->field[i];
if (!((field->flags & PRI_KEY_FLAG) ||
(thd->query_id == field->query_id)))
{
m_value[i].ptr= NULL;
}
}
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -1931,7 +1945,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data) ...@@ -1931,7 +1945,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
// Require that the PK for this record has previously been // Require that the PK for this record has previously been
// read into m_value // read into m_value
uint no_fields= table->fields; uint no_fields= table->fields;
NdbRecAttr* rec= m_value[no_fields].rec; const NdbRecAttr* rec= m_value[no_fields].rec;
DBUG_ASSERT(rec); DBUG_ASSERT(rec);
DBUG_DUMP("key", (char*)rec->aRef(), NDB_HIDDEN_PRIMARY_KEY_LENGTH); DBUG_DUMP("key", (char*)rec->aRef(), NDB_HIDDEN_PRIMARY_KEY_LENGTH);
...@@ -2013,7 +2027,7 @@ int ha_ndbcluster::delete_row(const byte *record) ...@@ -2013,7 +2027,7 @@ int ha_ndbcluster::delete_row(const byte *record)
// This table has no primary key, use "hidden" primary key // This table has no primary key, use "hidden" primary key
DBUG_PRINT("info", ("Using hidden key")); DBUG_PRINT("info", ("Using hidden key"));
uint no_fields= table->fields; uint no_fields= table->fields;
NdbRecAttr* rec= m_value[no_fields].rec; const NdbRecAttr* rec= m_value[no_fields].rec;
DBUG_ASSERT(rec != NULL); DBUG_ASSERT(rec != NULL);
if (set_hidden_key(op, no_fields, rec->aRef())) if (set_hidden_key(op, no_fields, rec->aRef()))
...@@ -2058,6 +2072,8 @@ void ha_ndbcluster::unpack_record(byte* buf) ...@@ -2058,6 +2072,8 @@ void ha_ndbcluster::unpack_record(byte* buf)
NdbValue *value= m_value; NdbValue *value= m_value;
DBUG_ENTER("unpack_record"); DBUG_ENTER("unpack_record");
end = table->field + table->fields;
// Set null flag(s) // Set null flag(s)
bzero(buf, table->null_bytes); bzero(buf, table->null_bytes);
for (field= table->field, end= field+table->fields; for (field= table->field, end= field+table->fields;
...@@ -2091,7 +2107,7 @@ void ha_ndbcluster::unpack_record(byte* buf) ...@@ -2091,7 +2107,7 @@ void ha_ndbcluster::unpack_record(byte* buf)
int hidden_no= table->fields; int hidden_no= table->fields;
const NDBTAB *tab= (const NDBTAB *) m_table; const NDBTAB *tab= (const NDBTAB *) m_table;
const NDBCOL *hidden_col= tab->getColumn(hidden_no); const NDBCOL *hidden_col= tab->getColumn(hidden_no);
NdbRecAttr* rec= m_value[hidden_no].rec; const NdbRecAttr* rec= m_value[hidden_no].rec;
DBUG_ASSERT(rec); DBUG_ASSERT(rec);
DBUG_PRINT("hidden", ("%d: %s \"%llu\"", hidden_no, DBUG_PRINT("hidden", ("%d: %s \"%llu\"", hidden_no,
hidden_col->getName(), rec->u_64_value())); hidden_col->getName(), rec->u_64_value()));
...@@ -2613,7 +2629,7 @@ void ha_ndbcluster::position(const byte *record) ...@@ -2613,7 +2629,7 @@ void ha_ndbcluster::position(const byte *record)
// No primary key, get hidden key // No primary key, get hidden key
DBUG_PRINT("info", ("Getting hidden key")); DBUG_PRINT("info", ("Getting hidden key"));
int hidden_no= table->fields; int hidden_no= table->fields;
NdbRecAttr* rec= m_value[hidden_no].rec; const NdbRecAttr* rec= m_value[hidden_no].rec;
const NDBTAB *tab= (const NDBTAB *) m_table; const NDBTAB *tab= (const NDBTAB *) m_table;
const NDBCOL *hidden_col= tab->getColumn(hidden_no); const NDBCOL *hidden_col= tab->getColumn(hidden_no);
DBUG_ASSERT(hidden_col->getPrimaryKey() && DBUG_ASSERT(hidden_col->getPrimaryKey() &&
...@@ -4998,6 +5014,7 @@ ha_ndbcluster::read_multi_range_next(key_multi_range ** multi_range_found_p) ...@@ -4998,6 +5014,7 @@ ha_ndbcluster::read_multi_range_next(key_multi_range ** multi_range_found_p)
*/ */
* multi_range_found_p= multi_ranges + multi_range_curr; * multi_range_found_p= multi_ranges + multi_range_curr;
memcpy(table->record[0], m_multi_range_result_ptr, reclength); memcpy(table->record[0], m_multi_range_result_ptr, reclength);
setup_recattr(m_active_cursor->getOperation()->getFirstRecAttr());
unpack_record(table->record[0]); unpack_record(table->record[0]);
table->status= 0; table->status= 0;
DBUG_RETURN(0); DBUG_RETURN(0);
...@@ -5009,13 +5026,37 @@ ha_ndbcluster::read_multi_range_next(key_multi_range ** multi_range_found_p) ...@@ -5009,13 +5026,37 @@ ha_ndbcluster::read_multi_range_next(key_multi_range ** multi_range_found_p)
*/ */
* multi_range_found_p= multi_ranges + multi_range_curr; * multi_range_found_p= multi_ranges + multi_range_curr;
memcpy(table->record[0], m_multi_range_result_ptr, reclength); memcpy(table->record[0], m_multi_range_result_ptr, reclength);
setup_recattr(op->getFirstRecAttr());
unpack_record(table->record[0]); unpack_record(table->record[0]);
table->status= 0; table->status= 0;
multi_range_curr++; multi_range_curr++;
op= m_active_trans->getNextCompletedOperation(op); m_current_multi_operation= m_active_trans->getNextCompletedOperation(op);
m_multi_range_result_ptr += reclength; m_multi_range_result_ptr += reclength;
DBUG_RETURN(0); DBUG_RETURN(0);
} }
int
ha_ndbcluster::setup_recattr(const NdbRecAttr* curr)
{
DBUG_ENTER("setup_recattr");
Field **field, **end;
NdbValue *value= m_value;
end = table->field + table->fields;
for (field= table->field; field < end; field++, value++)
{
if ((* value).ptr)
{
DBUG_ASSERT(curr != 0);
(* value).rec = curr;
curr = curr->next();
}
}
return 0;
}
#endif /* HAVE_NDBCLUSTER_DB */ #endif /* HAVE_NDBCLUSTER_DB */
...@@ -230,7 +230,7 @@ class ha_ndbcluster: public handler ...@@ -230,7 +230,7 @@ class ha_ndbcluster: public handler
NDB_SHARE *m_share; NDB_SHARE *m_share;
NDB_INDEX_DATA m_index[MAX_KEY]; NDB_INDEX_DATA m_index[MAX_KEY];
// NdbRecAttr has no reference to blob // NdbRecAttr has no reference to blob
typedef union { NdbRecAttr *rec; NdbBlob *blob; void *ptr; } NdbValue; typedef union { const NdbRecAttr *rec; NdbBlob *blob; void *ptr; } NdbValue;
NdbValue m_value[NDB_MAX_ATTRIBUTES_IN_TABLE]; NdbValue m_value[NDB_MAX_ATTRIBUTES_IN_TABLE];
bool m_use_write; bool m_use_write;
bool m_ignore_dup_key; bool m_ignore_dup_key;
...@@ -259,6 +259,7 @@ class ha_ndbcluster: public handler ...@@ -259,6 +259,7 @@ class ha_ndbcluster: public handler
byte* m_multi_range_result_ptr; byte* m_multi_range_result_ptr;
uint m_multi_range_defined_count; uint m_multi_range_defined_count;
const NdbOperation* m_current_multi_operation; const NdbOperation* m_current_multi_operation;
int setup_recattr(const NdbRecAttr*);
void set_rec_per_key(); void set_rec_per_key();
void records_update(); void records_update();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment