Commit 0caba84b authored by pekka@mysql.com's avatar pekka@mysql.com

ndb charsets: DICT

parent d0d2b820
......@@ -438,8 +438,8 @@ public:
case DictTabInfo::ExtText:
AttributeType = DictTabInfo::StringType;
AttributeSize = DictTabInfo::an8Bit;
// head + inline part [ attr precision ]
AttributeArraySize = (NDB_BLOB_HEAD_SIZE << 2) + AttributeExtPrecision;
// head + inline part [ attr precision lower half ]
AttributeArraySize = (NDB_BLOB_HEAD_SIZE << 2) + (AttributeExtPrecision & 0xFFFF);
return true;
};
return false;
......
......@@ -32,6 +32,8 @@
#include <ndb_types.h>
class Ndb;
struct charset_info_st;
typedef struct charset_info_st CHARSET_INFO;
/**
* @class NdbDictionary
......@@ -305,6 +307,14 @@ public:
*/
int getLength() const;
/**
* For Char or Varchar or Text, set or get MySQL CHARSET_INFO. This
* specifies both character set and collation. See get_charset()
* etc in MySQL. (The cs is not "const" in MySQL).
*/
void setCharset(CHARSET_INFO* cs);
CHARSET_INFO* getCharset() const;
/**
* For blob, set or get "inline size" i.e. number of initial bytes
* to store in table's blob attribute. This part is normally in
......
......@@ -90,6 +90,13 @@ public:
*/
static const Type& getType(Uint32 typeId);
/**
* Check character set.
*/
static bool usable_in_pk(Uint32 typeId, const void* cs);
static bool usable_in_hash_index(Uint32 typeId, const void* cs);
static bool usable_in_ordered_index(Uint32 typeId, const void* cs);
private:
/**
* List of all types. Must match Type::Enum.
......
......@@ -529,6 +529,83 @@ NdbSqlUtil::cmpText(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size
return CmpUnknown;
}
// check charset
bool
NdbSqlUtil::usable_in_pk(Uint32 typeId, const void* info)
{
const Type& type = getType(typeId);
switch (type.m_typeId) {
case Type::Undefined:
break;
case Type::Char:
{
const CHARSET_INFO *cs = (const CHARSET_INFO*)info;
return
cs != 0 &&
cs->cset != 0 &&
cs->coll != 0 &&
cs->coll->strnxfrm != 0 &&
cs->strxfrm_multiply == 1; // current limitation
}
break;
case Type::Varchar:
return true; // Varchar not used via MySQL
case Type::Blob:
case Type::Text:
break;
default:
return true;
}
return false;
}
bool
NdbSqlUtil::usable_in_hash_index(Uint32 typeId, const void* info)
{
return usable_in_pk(typeId, info);
}
bool
NdbSqlUtil::usable_in_ordered_index(Uint32 typeId, const void* info)
{
const Type& type = getType(typeId);
switch (type.m_typeId) {
case Type::Undefined:
break;
case Type::Char:
{
const CHARSET_INFO *cs = (const CHARSET_INFO*)info;
return
cs != 0 &&
cs->cset != 0 &&
cs->coll != 0 &&
cs->coll->strnxfrm != 0 &&
cs->coll->strnncollsp != 0 &&
cs->strxfrm_multiply == 1; // current limitation
}
break;
case Type::Varchar:
return true; // Varchar not used via MySQL
case Type::Text:
{
const CHARSET_INFO *cs = (const CHARSET_INFO*)info;
return
cs != 0 &&
cs->mbmaxlen == 1 && // extra limitation
cs->cset != 0 &&
cs->coll != 0 &&
cs->coll->strnxfrm != 0 &&
cs->coll->strnncollsp != 0 &&
cs->strxfrm_multiply == 1; // current limitation
}
break;
default:
return true;
}
return false;
}
#ifdef NDB_SQL_UTIL_TEST
#include <NdbTick.h>
......
......@@ -6317,6 +6317,8 @@ Dbdict::createIndex_toCreateTable(Signal* signal, OpCreateIndexPtr opPtr)
w.add(DictTabInfo::AttributeStoredInd, (Uint32)DictTabInfo::Stored);
// ext type overrides
w.add(DictTabInfo::AttributeExtType, aRec->extType);
w.add(DictTabInfo::AttributeExtPrecision, aRec->extPrecision);
w.add(DictTabInfo::AttributeExtScale, aRec->extScale);
w.add(DictTabInfo::AttributeExtLength, aRec->extLength);
w.add(DictTabInfo::AttributeEnd, (Uint32)true);
}
......
......@@ -109,6 +109,18 @@ NdbDictionary::Column::setInlineSize(int size)
m_impl.m_precision = size;
}
void
NdbDictionary::Column::setCharset(CHARSET_INFO* cs)
{
m_impl.m_cs = cs;
}
CHARSET_INFO*
NdbDictionary::Column::getCharset() const
{
return m_impl.m_cs;
}
int
NdbDictionary::Column::getInlineSize() const
{
......@@ -856,6 +868,8 @@ NdbDictionary::Dictionary::getNdbError() const {
NdbOut&
operator<<(NdbOut& out, const NdbDictionary::Column& col)
{
const CHARSET_INFO *cs = col.getCharset();
const char *csname = cs ? cs->name : "?";
out << col.getName() << " ";
switch (col.getType()) {
case NdbDictionary::Column::Tinyint:
......@@ -898,10 +912,10 @@ operator<<(NdbOut& out, const NdbDictionary::Column& col)
out << "Decimal(" << col.getScale() << "," << col.getPrecision() << ")";
break;
case NdbDictionary::Column::Char:
out << "Char(" << col.getLength() << ")";
out << "Char(" << col.getLength() << ";" << csname << ")";
break;
case NdbDictionary::Column::Varchar:
out << "Varchar(" << col.getLength() << ")";
out << "Varchar(" << col.getLength() << ";" << csname << ")";
break;
case NdbDictionary::Column::Binary:
out << "Binary(" << col.getLength() << ")";
......@@ -921,7 +935,7 @@ operator<<(NdbOut& out, const NdbDictionary::Column& col)
break;
case NdbDictionary::Column::Text:
out << "Text(" << col.getInlineSize() << "," << col.getPartSize()
<< ";" << col.getStripeSize() << ")";
<< ";" << col.getStripeSize() << ";" << csname << ")";
break;
case NdbDictionary::Column::Undefined:
out << "Undefined";
......
......@@ -36,6 +36,7 @@
#include "NdbEventOperationImpl.hpp"
#include "NdbBlob.hpp"
#include <AttributeHeader.hpp>
#include <my_sys.h>
#define DEBUG_PRINT 0
#define INCOMPATIBLE_VERSION -2
......@@ -64,6 +65,7 @@ NdbColumnImpl::operator=(const NdbColumnImpl& col)
m_name = col.m_name;
m_type = col.m_type;
m_precision = col.m_precision;
m_cs = col.m_cs;
m_scale = col.m_scale;
m_length = col.m_length;
m_pk = col.m_pk;
......@@ -89,6 +91,9 @@ NdbColumnImpl::operator=(const NdbColumnImpl& col)
void
NdbColumnImpl::init(Type t)
{
// do not use default_charset_info as it may not be initialized yet
// use binary collation until NDB tests can handle charsets
CHARSET_INFO* default_cs = &my_charset_latin1_bin;
m_attrId = -1;
m_type = t;
switch (m_type) {
......@@ -107,17 +112,20 @@ NdbColumnImpl::init(Type t)
m_precision = 0;
m_scale = 0;
m_length = 1;
m_cs = NULL;
break;
case Decimal:
m_precision = 10;
m_scale = 0;
m_length = 1;
m_cs = NULL;
break;
case Char:
case Varchar:
m_precision = 0;
m_scale = 0;
m_length = 1;
m_cs = default_cs;
break;
case Binary:
case Varbinary:
......@@ -126,16 +134,19 @@ NdbColumnImpl::init(Type t)
m_precision = 0;
m_scale = 0;
m_length = 1;
m_cs = NULL;
break;
case Blob:
m_precision = 256;
m_scale = 8000;
m_length = 4;
m_cs = NULL;
break;
case Text:
m_precision = 256;
m_scale = 8000;
m_length = 4;
m_cs = default_cs;
break;
}
m_pk = false;
......@@ -191,52 +202,12 @@ NdbColumnImpl::equal(const NdbColumnImpl& col) const
return false;
}
}
if(m_length != col.m_length){
return false;
}
switch(m_type){
case NdbDictionary::Column::Undefined:
break;
case NdbDictionary::Column::Tinyint:
case NdbDictionary::Column::Tinyunsigned:
case NdbDictionary::Column::Smallint:
case NdbDictionary::Column::Smallunsigned:
case NdbDictionary::Column::Mediumint:
case NdbDictionary::Column::Mediumunsigned:
case NdbDictionary::Column::Int:
case NdbDictionary::Column::Unsigned:
case NdbDictionary::Column::Float:
break;
case NdbDictionary::Column::Decimal:
if(m_scale != col.m_scale ||
m_precision != col.m_precision){
return false;
}
break;
case NdbDictionary::Column::Char:
case NdbDictionary::Column::Varchar:
case NdbDictionary::Column::Binary:
case NdbDictionary::Column::Varbinary:
if(m_length != col.m_length){
return false;
}
break;
case NdbDictionary::Column::Bigint:
case NdbDictionary::Column::Bigunsigned:
case NdbDictionary::Column::Double:
case NdbDictionary::Column::Datetime:
case NdbDictionary::Column::Timespec:
break;
case NdbDictionary::Column::Blob:
case NdbDictionary::Column::Text:
if (m_precision != col.m_precision ||
m_scale != col.m_scale ||
m_length != col.m_length) {
m_length != col.m_length ||
m_cs != col.m_cs) {
return false;
}
break;
}
if (m_autoIncrement != col.m_autoIncrement){
return false;
}
......@@ -1176,6 +1147,7 @@ indexTypeMapping[] = {
{ -1, -1 }
};
// TODO: remove, api-kernel type codes must match now
static const
ApiKernelMapping
columnTypeMapping[] = {
......@@ -1282,9 +1254,23 @@ NdbDictInterface::parseTableInfo(NdbTableImpl ** ret,
return 703;
}
col->m_extType = attrDesc.AttributeExtType;
col->m_precision = attrDesc.AttributeExtPrecision;
col->m_precision = (attrDesc.AttributeExtPrecision & 0xFFFF);
col->m_scale = attrDesc.AttributeExtScale;
col->m_length = attrDesc.AttributeExtLength;
// charset in upper half of precision
unsigned cs_number = (attrDesc.AttributeExtPrecision >> 16);
// charset is defined exactly for char types
if (col->getCharType() != (cs_number != 0)) {
delete impl;
return 703;
}
if (col->getCharType()) {
col->m_cs = get_charset(cs_number, MYF(0));
if (col->m_cs == NULL) {
delete impl;
return 743;
}
}
// translate to old kernel types and sizes
if (! attrDesc.translateExtType()) {
......@@ -1535,9 +1521,23 @@ NdbDictInterface::createOrAlterTable(Ndb & ndb,
getKernelConstant(col->m_type,
columnTypeMapping,
DictTabInfo::ExtUndefined);
tmpAttr.AttributeExtPrecision = col->m_precision;
tmpAttr.AttributeExtPrecision = ((unsigned)col->m_precision & 0xFFFF);
tmpAttr.AttributeExtScale = col->m_scale;
tmpAttr.AttributeExtLength = col->m_length;
// charset is defined exactly for char types
if (col->getCharType() != (col->m_cs != NULL)) {
m_error.code = 703;
return -1;
}
// primary key type check
if (col->m_pk && ! NdbSqlUtil::usable_in_pk(col->m_type, col->m_cs)) {
m_error.code = 743;
return -1;
}
// charset in upper half of precision
if (col->getCharType()) {
tmpAttr.AttributeExtPrecision |= (col->m_cs->number << 16);
}
// DICT will ignore and recompute this
(void)tmpAttr.translateExtType();
......@@ -1999,6 +1999,14 @@ NdbDictInterface::createIndex(Ndb & ndb,
m_error.code = 4245;
return -1;
}
// index key type check
if (it == DictTabInfo::UniqueHashIndex &&
! NdbSqlUtil::usable_in_hash_index(col->m_type, col->m_cs) ||
it == DictTabInfo::OrderedIndex &&
! NdbSqlUtil::usable_in_ordered_index(col->m_type, col->m_cs)) {
m_error.code = 743;
return -1;
}
attributeList.id[i] = col->m_attrId;
}
if (it == DictTabInfo::UniqueHashIndex) {
......
......@@ -60,6 +60,7 @@ public:
int m_precision;
int m_scale;
int m_length;
CHARSET_INFO * m_cs; // not const in MySQL
bool m_pk;
bool m_tupleKey;
......@@ -82,6 +83,7 @@ public:
Uint32 m_keyInfoPos;
Uint32 m_extType; // used by restore (kernel type in versin v2x)
bool getInterpretableType() const ;
bool getCharType() const;
bool getBlobType() const;
/**
......@@ -446,6 +448,14 @@ NdbColumnImpl::getInterpretableType() const {
m_type == NdbDictionary::Column::Bigunsigned);
}
inline
bool
NdbColumnImpl::getCharType() const {
return (m_type == NdbDictionary::Column::Char ||
m_type == NdbDictionary::Column::Varchar ||
m_type == NdbDictionary::Column::Text);
}
inline
bool
NdbColumnImpl::getBlobType() const {
......
......@@ -280,6 +280,9 @@ ErrorBundle ErrorCodes[] = {
{ 739, SE, "Unsupported primary key length" },
{ 740, SE, "Nullable primary key not supported" },
{ 741, SE, "Unsupported alter table" },
{ 742, SE, "Unsupported attribute type in index" },
{ 743, SE, "Unsupported character set in table or index" },
{ 744, SE, "Character conversion error" },
{ 241, SE, "Invalid schema object version" },
{ 283, SE, "Table is being dropped" },
{ 284, SE, "Table not defined in transaction coordinator" },
......
......@@ -2897,6 +2897,8 @@ static int create_ndb_column(NDBCOL &col,
{
// Set name
col.setName(field->field_name);
// Get char set
CHARSET_INFO *cs= field->charset();
// Set type and sizes
const enum enum_field_types mysql_type= field->real_type();
switch (mysql_type) {
......@@ -2968,15 +2970,19 @@ static int create_ndb_column(NDBCOL &col,
case MYSQL_TYPE_STRING:
if (field->flags & BINARY_FLAG)
col.setType(NDBCOL::Binary);
else
else {
col.setType(NDBCOL::Char);
col.setCharset(cs);
}
col.setLength(field->pack_length());
break;
case MYSQL_TYPE_VAR_STRING:
if (field->flags & BINARY_FLAG)
col.setType(NDBCOL::Varbinary);
else
else {
col.setType(NDBCOL::Varchar);
col.setCharset(cs);
}
col.setLength(field->pack_length());
break;
// Blob types (all come in as MYSQL_TYPE_BLOB)
......@@ -2984,8 +2990,10 @@ static int create_ndb_column(NDBCOL &col,
case MYSQL_TYPE_TINY_BLOB:
if (field->flags & BINARY_FLAG)
col.setType(NDBCOL::Blob);
else
else {
col.setType(NDBCOL::Text);
col.setCharset(cs);
}
col.setInlineSize(256);
// No parts
col.setPartSize(0);
......@@ -2995,8 +3003,10 @@ static int create_ndb_column(NDBCOL &col,
case MYSQL_TYPE_BLOB:
if (field->flags & BINARY_FLAG)
col.setType(NDBCOL::Blob);
else
else {
col.setType(NDBCOL::Text);
col.setCharset(cs);
}
// Use "<=" even if "<" is the exact condition
if (field->max_length() <= (1 << 8))
goto mysql_type_tiny_blob;
......@@ -3015,8 +3025,10 @@ static int create_ndb_column(NDBCOL &col,
case MYSQL_TYPE_MEDIUM_BLOB:
if (field->flags & BINARY_FLAG)
col.setType(NDBCOL::Blob);
else
else {
col.setType(NDBCOL::Text);
col.setCharset(cs);
}
col.setInlineSize(256);
col.setPartSize(4000);
col.setStripeSize(8);
......@@ -3025,8 +3037,10 @@ static int create_ndb_column(NDBCOL &col,
case MYSQL_TYPE_LONG_BLOB:
if (field->flags & BINARY_FLAG)
col.setType(NDBCOL::Blob);
else
else {
col.setType(NDBCOL::Text);
col.setCharset(cs);
}
col.setInlineSize(256);
col.setPartSize(8000);
col.setStripeSize(4);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment