ha_cassandra.cc 69.8 KB
Newer Older
1
/*
Sergey Petrunya's avatar
Sergey Petrunya committed
2 3 4 5 6 7 8 9 10 11 12 13 14 15
   Copyright (c) 2012, Monty Program Ab

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
16 17 18 19 20 21 22 23 24

#ifdef USE_PRAGMA_IMPLEMENTATION
#pragma implementation        // gcc: Class implementation
#endif

#include <mysql/plugin.h>
#include "ha_cassandra.h"
#include "sql_class.h"

25 26 27 28 29
/*
  Sizing constants for dynamic-column handling.
  NOTE(review): presumably initial sizes (USUAL) and growth steps (DELTA) for
  the dynamic column arrays and the packed record buffer (_REC); their uses
  are outside this section - confirm there.
*/
#define DYNCOL_USUAL 20
#define DYNCOL_DELTA 100
#define DYNCOL_USUAL_REC 1024
#define DYNCOL_DELTA_REC 1024

30
/* Forward declaration: handler factory installed as handlerton::create */
static handler *cassandra_create_handler(handlerton *hton,
                                       TABLE_SHARE *table,
                                       MEM_ROOT *mem_root);

34
/* Declared here, defined elsewhere; takes a dynamic-column function result */
extern int dynamic_column_error_message(enum_dyncol_func_result rc);
35 36 37 38

/* The engine's handlerton; assigned in cassandra_init_func() */
handlerton *cassandra_hton;


39
/*
  Hash of the CASSANDRA_SHARE objects of currently open tables, keyed by
  table name (see cassandra_get_key()).
*/
static HASH cassandra_open_tables;

/*
  Mutex taken by get_share()/free_share() around every access to
  cassandra_open_tables and to a share's use_count.
*/
mysql_mutex_t cassandra_mutex;


/**
  Structure for CREATE TABLE options (table options).
  It needs to be called ha_table_option_struct.

  The option values can be specified in the CREATE TABLE at the end:
  CREATE TABLE ( ... ) *here*

  Each member is filled from the same-named entry of
  cassandra_table_option_list.
*/

struct ha_table_option_struct
{
  const char *thrift_host;   /* may be NULL: the global default is used then */
  int         thrift_port;
  const char *keyspace;      /* required, see check_table_options() */
  const char *column_family; /* required, see check_table_options() */
};


/*
  Table-level CREATE TABLE options understood by the engine. Every entry
  names the ha_table_option_struct member that receives the value.
*/
ha_create_table_option cassandra_table_option_list[]=
{
  HA_TOPTION_STRING("thrift_host", thrift_host),
  /* numeric arguments after the member name: 9160, 1, 65535, 0 */
  HA_TOPTION_NUMBER("thrift_port", thrift_port, 9160, 1, 65535, 0),
  HA_TOPTION_STRING("keyspace", keyspace),
  HA_TOPTION_STRING("column_family", column_family),
  HA_TOPTION_END
};

78 79 80 81 82 83 84 85 86 87 88 89 90 91
/**
  Structure for CREATE TABLE options (field options).
*/

struct ha_field_option_struct
{
  /* Value of the DYNAMIC_COLUMN_STORAGE field option */
  bool dyncol_field;
};

/* Field-level CREATE TABLE options; values land in ha_field_option_struct */
ha_create_table_option cassandra_field_option_list[]=
{
  /*
    Collect all other columns as dynamic here,
    the valid values are YES/NO, ON/OFF, 1/0.
    The default is 0, that is false, no, off.
  */
  HA_FOPTION_BOOL("DYNAMIC_COLUMN_STORAGE", dyncol_field, 0),
  HA_FOPTION_END
};
97

98 99 100 101
/* THD-level variable insert_batch_size: default 100, allowed 1 .. 2^30 */
static MYSQL_THDVAR_ULONG(insert_batch_size, PLUGIN_VAR_RQCMDARG,
  "Number of rows in an INSERT batch",
  NULL, NULL, /*default*/ 100, /*min*/ 1, /*max*/ 1024*1024*1024, 0);

102 103 104
/* THD-level variable multiget_batch_size: default 100, allowed 1 .. 2^30 */
static MYSQL_THDVAR_ULONG(multiget_batch_size, PLUGIN_VAR_RQCMDARG,
  "Number of rows in a multiget(MRR) batch",
  NULL, NULL, /*default*/ 100, /*min*/ 1, /*max*/ 1024*1024*1024, 0);
105

106 107 108 109
/* THD-level variable rnd_batch_size: default 10000, allowed 1 .. 2^30 */
static MYSQL_THDVAR_ULONG(rnd_batch_size, PLUGIN_VAR_RQCMDARG,
  "Number of rows in an rnd_read (full scan) batch",
  NULL, NULL, /*default*/ 10*1000, /*min*/ 1, /*max*/ 1024*1024*1024, 0);

110 111 112 113 114
/* THD-level variable failure_retries: default 0, allowed 0 .. 2^30 */
static MYSQL_THDVAR_ULONG(failure_retries, PLUGIN_VAR_RQCMDARG,
  "Number of times to retry Cassandra calls that failed due to timeouts or "
  "network communication problems. The default, 0, means not to retry.",
  NULL, NULL, /*default*/ 0, /*min*/ 0, /*max*/ 1024*1024*1024, 0);

115
/* These match values in enum_cassandra_consistency_level */
unknown's avatar
unknown committed
116
const char *cassandra_consistency_level[] =
117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
{
  "ONE",
  "QUORUM",
  "LOCAL_QUORUM",
  "EACH_QUORUM",
  "ALL",
  "ANY",
  "TWO",
  "THREE",
   NullS
};

/*
  TYPELIB over cassandra_consistency_level; the element count leaves out the
  terminating NullS entry.
*/
TYPELIB cassandra_consistency_level_typelib= {
  array_elements(cassandra_consistency_level) - 1, "",
  cassandra_consistency_level, NULL
};


/*
  THD-level enum variables selecting the consistency level for writes and
  reads. Both take their values from cassandra_consistency_level_typelib and
  default to ONE.
*/
static MYSQL_THDVAR_ENUM(write_consistency, PLUGIN_VAR_RQCMDARG,
  "Cassandra consistency level to use for write operations", NULL, NULL,
  ONE, &cassandra_consistency_level_typelib);

static MYSQL_THDVAR_ENUM(read_consistency, PLUGIN_VAR_RQCMDARG,
  "Cassandra consistency level to use for read operations", NULL, NULL,
  ONE, &cassandra_consistency_level_typelib);


Sergey Petrunya's avatar
Sergey Petrunya committed
144 145 146 147
/* Held by cassandra_default_thrift_host_update() while it changes the two
   variables below */
mysql_mutex_t cassandra_default_host_lock;
/* Either NULL or a pointer to cassandra_default_host_buf */
static char* cassandra_default_thrift_host = NULL;
/* Storage for the default host name (at most 255 characters + '\0') */
static char cassandra_default_host_buf[256]="";

unknown's avatar
unknown committed
148 149
static void
cassandra_default_thrift_host_update(THD *thd,
Sergey Petrunya's avatar
Sergey Petrunya committed
150 151 152
                                     struct st_mysql_sys_var* var,
                                     void* var_ptr, /*!< out: where the
                                                    formal string goes */
unknown's avatar
unknown committed
153
                                     const void* save) /*!< in: immediate result
Sergey Petrunya's avatar
Sergey Petrunya committed
154 155 156 157 158 159
                                                       from check function */
{
  const char *new_host= *((char**)save);
  const size_t max_len= sizeof(cassandra_default_host_buf);

  mysql_mutex_lock(&cassandra_default_host_lock);
unknown's avatar
unknown committed
160

Sergey Petrunya's avatar
Sergey Petrunya committed
161 162
  if (new_host)
  {
Sergey Petrunya's avatar
Sergey Petrunya committed
163 164
    strncpy(cassandra_default_host_buf, new_host, max_len-1);
    cassandra_default_host_buf[max_len-1]= 0;
Sergey Petrunya's avatar
Sergey Petrunya committed
165 166 167 168 169 170 171
    cassandra_default_thrift_host= cassandra_default_host_buf;
  }
  else
  {
    cassandra_default_host_buf[0]= 0;
    cassandra_default_thrift_host= NULL;
  }
unknown's avatar
unknown committed
172

Sergey Petrunya's avatar
Sergey Petrunya committed
173 174 175 176 177 178 179
  *((const char**)var_ptr)= cassandra_default_thrift_host;

  mysql_mutex_unlock(&cassandra_default_host_lock);
}


static MYSQL_SYSVAR_STR(default_thrift_host, cassandra_default_thrift_host,
unknown's avatar
unknown committed
180 181
                        PLUGIN_VAR_RQCMDARG,
                        "Default host for Cassandra thrift connections",
Sergey Petrunya's avatar
Sergey Petrunya committed
182
                        /*check*/NULL,
unknown's avatar
unknown committed
183
                        cassandra_default_thrift_host_update,
Sergey Petrunya's avatar
Sergey Petrunya committed
184 185
                        /*default*/NULL);

186 187
static struct st_mysql_sys_var* cassandra_system_variables[]= {
  MYSQL_SYSVAR(insert_batch_size),
188
  MYSQL_SYSVAR(multiget_batch_size),
189
  MYSQL_SYSVAR(rnd_batch_size),
Sergey Petrunya's avatar
Sergey Petrunya committed
190 191

  MYSQL_SYSVAR(default_thrift_host),
192 193
  MYSQL_SYSVAR(write_consistency),
  MYSQL_SYSVAR(read_consistency),
194
  MYSQL_SYSVAR(failure_retries),
195 196 197 198 199
  NULL
};

/* NOTE(review): presumably the engine's status counters; the type and its
   users are outside this section - confirm there */
Cassandra_status_vars cassandra_counters;

200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248
/**
  @brief
  Key-extraction callback for cassandra_open_tables: the key of a share is
  its table name.

  @param share        hash element
  @param length  out  length of the returned key
  @return pointer to the table name stored in the share
*/

static uchar* cassandra_get_key(CASSANDRA_SHARE *share, size_t *length,
                             my_bool not_used __attribute__((unused)))
{
  uchar *key= (uchar*) share->table_name;
  *length= share->table_name_length;
  return key;
}

#ifdef HAVE_PSI_INTERFACE
/* Instrumentation keys: one for cassandra_mutex, one for each share's mutex */
static PSI_mutex_key ex_key_mutex_example, ex_key_mutex_CASSANDRA_SHARE_mutex;

/* Table handed to PSI_server->register_mutex() in init_cassandra_psi_keys() */
static PSI_mutex_info all_cassandra_mutexes[]=
{
  { &ex_key_mutex_example, "cassandra", PSI_FLAG_GLOBAL},
  { &ex_key_mutex_CASSANDRA_SHARE_mutex, "CASSANDRA_SHARE::mutex", 0}
};

/*
  Register the engine's mutexes under the "cassandra" category.
  Does nothing when no PSI server is present.
*/
static void init_cassandra_psi_keys()
{
  if (PSI_server != NULL)
  {
    const char* category= "cassandra";
    int count= array_elements(all_cassandra_mutexes);
    PSI_server->register_mutex(category, all_cassandra_mutexes, count);
  }
}
#endif

static int cassandra_init_func(void *p)
{
  DBUG_ENTER("cassandra_init_func");

#ifdef HAVE_PSI_INTERFACE
  init_cassandra_psi_keys();
#endif

  cassandra_hton= (handlerton *)p;
  mysql_mutex_init(ex_key_mutex_example, &cassandra_mutex, MY_MUTEX_INIT_FAST);
  (void) my_hash_init(&cassandra_open_tables,system_charset_info,32,0,0,
                      (my_hash_get_key) cassandra_get_key,0,0);

  cassandra_hton->state=   SHOW_OPTION_YES;
  cassandra_hton->create=  cassandra_create_handler;
249
  /*
250 251 252 253
    Don't specify HTON_CAN_RECREATE in flags. re-create is used by TRUNCATE
    TABLE to create an *empty* table from scratch. Cassandra table won't be
    emptied if re-created.
  */
254
  cassandra_hton->flags=   0;
255
  cassandra_hton->table_options= cassandra_table_option_list;
256 257 258
  cassandra_hton->field_options= cassandra_field_option_list;

  mysql_mutex_init(0 /* no instrumentation */,
Sergey Petrunya's avatar
Sergey Petrunya committed
259
                   &cassandra_default_host_lock, MY_MUTEX_INIT_FAST);
260 261 262 263 264 265 266 267 268 269 270 271 272

  DBUG_RETURN(0);
}


static int cassandra_done_func(void *p)
{
  int error= 0;
  DBUG_ENTER("cassandra_done_func");
  if (cassandra_open_tables.records)
    error= 1;
  my_hash_free(&cassandra_open_tables);
  mysql_mutex_destroy(&cassandra_mutex);
Sergey Petrunya's avatar
Sergey Petrunya committed
273
  mysql_mutex_destroy(&cassandra_default_host_lock);
274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354
  DBUG_RETURN(error);
}


/**
  @brief
  Find the CASSANDRA_SHARE of a table, creating and registering it in
  cassandra_open_tables if the table is not open yet.

  @details
  The lookup, the creation and the use_count increment all happen under
  cassandra_mutex. The share and the copy of the table name are obtained with
  one my_multi_malloc() call, so a single my_free(share) releases both.

  @param table_name  name to look the share up by (hash key)
  @param table       not used

  @return the share with its use_count incremented, or NULL if memory
          allocation or insertion into the hash failed
*/

static CASSANDRA_SHARE *get_share(const char *table_name, TABLE *table)
{
  CASSANDRA_SHARE *share;
  uint length;
  char *tmp_name;

  mysql_mutex_lock(&cassandra_mutex);
  length=(uint) strlen(table_name);

  if (!(share=(CASSANDRA_SHARE*) my_hash_search(&cassandra_open_tables,
                                              (uchar*) table_name,
                                              length)))
  {
    if (!(share=(CASSANDRA_SHARE *)
          my_multi_malloc(MYF(MY_WME | MY_ZEROFILL),
                          &share, sizeof(*share),
                          &tmp_name, length+1,
                          NullS)))
    {
      mysql_mutex_unlock(&cassandra_mutex);
      return NULL;
    }

    share->use_count=0;
    share->table_name_length=length;
    share->table_name=tmp_name;
    strmov(share->table_name,table_name);
    if (my_hash_insert(&cassandra_open_tables, (uchar*) share))
    {
      /*
        share->lock and share->mutex have not been initialized at this point,
        so there is only memory to give back. The global mutex must be
        released as on every other exit path.
      */
      my_free(share);
      mysql_mutex_unlock(&cassandra_mutex);
      return NULL;
    }
    thr_lock_init(&share->lock);
    mysql_mutex_init(ex_key_mutex_CASSANDRA_SHARE_mutex,
                     &share->mutex, MY_MUTEX_INIT_FAST);
  }
  share->use_count++;
  mysql_mutex_unlock(&cassandra_mutex);

  return share;
}


/**
  @brief
  Drop one reference to a share. Called whenever a table is closed; when the
  last reference goes away the share is removed from cassandra_open_tables
  and its lock, mutex and memory are released.

  @return 0 always
*/

static int free_share(CASSANDRA_SHARE *share)
{
  mysql_mutex_lock(&cassandra_mutex);

  share->use_count--;
  if (share->use_count == 0)
  {
    /* Last user is gone: unregister first, then tear the share down */
    my_hash_delete(&cassandra_open_tables, (uchar*) share);
    thr_lock_delete(&share->lock);
    mysql_mutex_destroy(&share->mutex);
    my_free(share);
  }

  mysql_mutex_unlock(&cassandra_mutex);
  return 0;
}


/* Handler factory: constructs an ha_cassandra object on the given MEM_ROOT */
static handler* cassandra_create_handler(handlerton *hton,
                                       TABLE_SHARE *table,
                                       MEM_ROOT *mem_root)
{
  return new (mem_root) ha_cassandra(hton, table);
}


/*
  The handler starts without a connection (se is NULL), without any field
  converters and with no dynamic-column field selected.
*/
ha_cassandra::ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg)
  :handler(hton, table_arg),
   se(NULL), field_converters(NULL),
   special_type_field_converters(NULL),
   special_type_field_names(NULL), n_special_type_fields(0),
   rowkey_converter(NULL),
   dyncol_field(0), dyncol_set(0)
{}


/* File-extension list of the engine: empty, only the NullS terminator */
static const char *ha_cassandra_exts[] = {
  NullS
};

/* Returns the (empty) NullS-terminated extension list above */
const char **ha_cassandra::bas_ext() const
{
  return ha_cassandra_exts;
}


382
/*
  Validate the field and table options, connect to Cassandra and set up the
  field converters.

  The host is the table's thrift_host option if given, otherwise the global
  default host. Note that the connection object stored in 'se' is not
  released here when connecting or converter setup fails.

  @param table_arg  table whose options and fields are used

  @return 0 on success; the error of check_field_options() /
          check_table_options(); HA_ERR_NO_CONNECTION if connecting failed
          (an error is raised) or if setup_field_converters() failed
*/
int ha_cassandra::connect_and_check_options(TABLE *table_arg)
{
  ha_table_option_struct *options= table_arg->s->option_struct;
  int res;
  DBUG_ENTER("ha_cassandra::connect_and_check_options");

  if ((res= check_field_options(table_arg->s->field)) ||
      (res= check_table_options(options)))
    DBUG_RETURN(res);

  se= create_cassandra_se();
  se->set_column_family(options->column_family);
  const char *thrift_host= options->thrift_host? options->thrift_host:
                           cassandra_default_thrift_host;
  if (se->connect(thrift_host, options->thrift_port, options->keyspace))
  {
    my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), se->error_str());
    DBUG_RETURN(HA_ERR_NO_CONNECTION);
  }

  if (setup_field_converters(table_arg->field, table_arg->s->fields))
  {
    DBUG_RETURN(HA_ERR_NO_CONNECTION);
  }

  DBUG_RETURN(0);
}


411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436
/*
  Look for the field that carries the DYNAMIC_COLUMN_STORAGE option.

  At most one field may have it and that field must be a BLOB. When it is
  found, its index is remembered in dyncol_field, dyncol_set is raised and
  the dynamic-column work areas are zeroed.

  @param fields  NULL-terminated array of the table's fields

  @return 0 on success, HA_WRONG_CREATE_OPTION (with ER_WRONG_FIELD_SPEC
          raised for the offending field) otherwise
*/
int ha_cassandra::check_field_options(Field **fields)
{
  uint field_no= 0;
  DBUG_ENTER("ha_cassandra::check_field_options");
  for (Field **pfield= fields; *pfield; pfield++, field_no++)
  {
    ha_field_option_struct *opts= (*pfield)->option_struct;
    if (!opts || !opts->dyncol_field)
      continue;                       /* an ordinary field */

    /* A second dynamic field, or one of the wrong type, is an error */
    if (dyncol_set || (*pfield)->type() != MYSQL_TYPE_BLOB)
    {
       my_error(ER_WRONG_FIELD_SPEC, MYF(0), (*pfield)->field_name);
       DBUG_RETURN(HA_WRONG_CREATE_OPTION);
    }

    dyncol_set= 1;
    dyncol_field= field_no;
    bzero(&dynamic_values, sizeof(dynamic_values));
    bzero(&dynamic_names, sizeof(dynamic_names));
    bzero(&dynamic_rec, sizeof(dynamic_rec));
  }
  DBUG_RETURN(0);
}


437 438 439 440 441 442 443
int ha_cassandra::open(const char *name, int mode, uint test_if_locked)
{
  DBUG_ENTER("ha_cassandra::open");

  if (!(share = get_share(name, table)))
    DBUG_RETURN(1);
  thr_lock_data_init(&share->lock,&lock,NULL);
unknown's avatar
unknown committed
444

445 446 447 448 449 450 451 452 453 454 455 456 457
  DBUG_ASSERT(!se);
  /*
    Don't do the following on open: it prevents SHOW CREATE TABLE when the server
    has gone away.
  */
  /*
  int res;
  if ((res= connect_and_check_options(table)))
  {
    DBUG_RETURN(res);
  }
  */

458
  info(HA_STATUS_NO_LOCK | HA_STATUS_VARIABLE | HA_STATUS_CONST);
Sergey Petrunya's avatar
Sergey Petrunya committed
459
  insert_lineno= 0;
460

461 462 463 464 465 466 467 468 469
  DBUG_RETURN(0);
}


int ha_cassandra::close(void)
{
  DBUG_ENTER("ha_cassandra::close");
  delete se;
  se= NULL;
470
  free_field_converters();
471 472 473 474
  DBUG_RETURN(free_share(share));
}


Sergey Petrunya's avatar
Sergey Petrunya committed
475 476 477 478 479
/*
  Check that the table options are sufficient to connect.

  A host must be known, either from the thrift_host table option or from a
  non-empty global default host, and both keyspace and column_family must
  be given.

  @return 0 if the options are usable, HA_WRONG_CREATE_OPTION (with
          ER_CONNECT_TO_FOREIGN_DATA_SOURCE raised) otherwise
*/
int ha_cassandra::check_table_options(ha_table_option_struct *options)
{
  if (!options->thrift_host && (!cassandra_default_thrift_host ||
                                !cassandra_default_thrift_host[0]))
  {
    my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0),
             "thrift_host table option must be specified, or "
             "@@cassandra_default_thrift_host must be set");
    return HA_WRONG_CREATE_OPTION;
  }

  if (!options->keyspace || !options->column_family)
  {
    my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0),
             "keyspace and column_family table options must be specified");
    return HA_WRONG_CREATE_OPTION;
  }
  return 0;
}


496 497
/**
  @brief
498
  create() is called to create a table. The variable name will have the name
499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517
  of the table.

  @details
  When create() is called you do not need to worry about
  opening the table. Also, the .frm file will have already been
  created so adjusting create_info is not necessary. You can overwrite
  the .frm file at this point if you wish to change the table
  definition, but there are no methods currently provided for doing
  so.

  Called from handle.cc by ha_create_table().

  @see
  ha_create_table() in handle.cc
*/

int ha_cassandra::create(const char *name, TABLE *table_arg,
                         HA_CREATE_INFO *create_info)
{
Sergey Petrunya's avatar
Sergey Petrunya committed
518
  int res;
519
  DBUG_ENTER("ha_cassandra::create");
unknown's avatar
unknown committed
520

521 522 523 524
  if (table_arg->s->keys != 1 || table_arg->s->primary_key !=0 ||
      table_arg->key_info[0].key_parts != 1 ||
      table_arg->key_info[0].key_part[0].fieldnr != 1)
  {
unknown's avatar
unknown committed
525
    my_error(ER_WRONG_COLUMN_NAME, MYF(0),
Sergey Petrunya's avatar
Sergey Petrunya committed
526
             "Table must have PRIMARY KEY defined over the first column");
527 528 529 530
    DBUG_RETURN(HA_WRONG_CREATE_OPTION);
  }

  DBUG_ASSERT(!se);
531
  if ((res= connect_and_check_options(table_arg)))
Sergey Petrunya's avatar
Sergey Petrunya committed
532 533
    DBUG_RETURN(res);

Sergey Petrunya's avatar
Sergey Petrunya committed
534
  insert_lineno= 0;
535 536 537 538 539 540 541 542 543 544
  DBUG_RETURN(0);
}

/*
  Mapping needs to
  - copy value from MySQL record to Thrift buffer
  - copy value from Thrift buffer to MySQL record.

*/

545 546 547 548 549 550 551
/*
  Converter base: moves one column value between its Cassandra form and the
  Field object the converter is attached to.
*/
class ColumnDataConverter
{
public:
  /* The field values are stored into / read from; not set by this class */
  Field *field;

  /*
    This will save Cassandra's data in the Field.

    @return 0 on success; the implementations in this file return non-zero
            when the value cannot be stored
  */
  virtual int cassandra_to_mariadb(const char *cass_data,
                                    int cass_data_len)=0;

  /*
    This will get data from the Field pointer, store Cassandra's form
    in internal buffer, and return pointer/size.

    @return
      false - OK
      true  - Failed to convert value (completely, there is no value to insert
              at all).
  */
  virtual bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)=0;
  virtual ~ColumnDataConverter() {}
};


class DoubleDataConverter : public ColumnDataConverter
{
  double buf;
public:
Sergey Petrunya's avatar
Sergey Petrunya committed
573
  int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
574 575 576 577
  {
    DBUG_ASSERT(cass_data_len == sizeof(double));
    double *pdata= (double*) cass_data;
    field->store(*pdata);
Sergey Petrunya's avatar
Sergey Petrunya committed
578
    return 0;
579
  }
unknown's avatar
unknown committed
580

Sergey Petrunya's avatar
Sergey Petrunya committed
581
  bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
582 583 584 585
  {
    buf= field->val_real();
    *cass_data= (char*)&buf;
    *cass_data_len=sizeof(double);
Sergey Petrunya's avatar
Sergey Petrunya committed
586
    return false;
587 588 589 590 591 592 593 594 595
  }
  ~DoubleDataConverter(){}
};


class FloatDataConverter : public ColumnDataConverter
{
  float buf;
public:
Sergey Petrunya's avatar
Sergey Petrunya committed
596
  int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
597 598 599 600
  {
    DBUG_ASSERT(cass_data_len == sizeof(float));
    float *pdata= (float*) cass_data;
    field->store(*pdata);
Sergey Petrunya's avatar
Sergey Petrunya committed
601
    return 0;
602
  }
603

Sergey Petrunya's avatar
Sergey Petrunya committed
604
  bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
605 606 607 608
  {
    buf= field->val_real();
    *cass_data= (char*)&buf;
    *cass_data_len=sizeof(float);
Sergey Petrunya's avatar
Sergey Petrunya committed
609
    return false;
610 611 612 613
  }
  ~FloatDataConverter(){}
};

614 615 616 617 618 619 620 621 622 623 624
/* Copy 8 bytes from 'from' to 'to' in reversed order (byte-order swap) */
static void flip64(const char *from, char* to)
{
  for (int i= 0; i < 8; i++)
    to[i]= from[7 - i];
}
625 626 627 628

class BigintDataConverter : public ColumnDataConverter
{
  longlong buf;
629
  bool flip; /* is false when reading counter columns */
630
public:
Sergey Petrunya's avatar
Sergey Petrunya committed
631
  int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
632 633 634
  {
    longlong tmp;
    DBUG_ASSERT(cass_data_len == sizeof(longlong));
635 636 637 638
    if (flip)
      flip64(cass_data, (char*)&tmp);
    else
      memcpy(&tmp, cass_data, sizeof(longlong));
639
    field->store(tmp);
Sergey Petrunya's avatar
Sergey Petrunya committed
640
    return 0;
641
  }
unknown's avatar
unknown committed
642

Sergey Petrunya's avatar
Sergey Petrunya committed
643
  bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
644 645
  {
    longlong tmp= field->val_int();
646 647 648 649
    if (flip)
      flip64((const char*)&tmp, (char*)&buf);
    else
      memcpy(&buf, &tmp, sizeof(longlong));
650 651
    *cass_data= (char*)&buf;
    *cass_data_len=sizeof(longlong);
Sergey Petrunya's avatar
Sergey Petrunya committed
652
    return false;
653
  }
654
  BigintDataConverter(bool flip_arg) : flip(flip_arg) {}
655 656 657
  ~BigintDataConverter(){}
};

658 659 660 661 662 663 664
/* Copy 4 bytes from 'from' to 'to' in reversed order (byte-order swap) */
static void flip32(const char *from, char* to)
{
  for (int i= 0; i < 4; i++)
    to[i]= from[3 - i];
}
665

666 667 668 669 670

class TinyintDataConverter : public ColumnDataConverter
{
  char buf;
public:
Sergey Petrunya's avatar
Sergey Petrunya committed
671
  int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
672 673 674
  {
    DBUG_ASSERT(cass_data_len == 1);
    field->store(cass_data[0]);
Sergey Petrunya's avatar
Sergey Petrunya committed
675
    return 0;
676 677 678 679 680 681 682 683 684 685 686 687 688
  }

  bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
  {
    buf= field->val_int()? 1 : 0; /* TODO: error handling? */
    *cass_data= (char*)&buf;
    *cass_data_len= 1;
    return false;
  }
  ~TinyintDataConverter(){}
};


Sergey Petrunya's avatar
Sergey Petrunya committed
689 690 691 692
class Int32DataConverter : public ColumnDataConverter
{
  int32_t buf;
public:
Sergey Petrunya's avatar
Sergey Petrunya committed
693
  int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
Sergey Petrunya's avatar
Sergey Petrunya committed
694 695 696
  {
    int32_t tmp;
    DBUG_ASSERT(cass_data_len == sizeof(int32_t));
697
    flip32(cass_data, (char*)&tmp);
Sergey Petrunya's avatar
Sergey Petrunya committed
698
    field->store(tmp);
Sergey Petrunya's avatar
Sergey Petrunya committed
699
    return 0;
Sergey Petrunya's avatar
Sergey Petrunya committed
700
  }
unknown's avatar
unknown committed
701

Sergey Petrunya's avatar
Sergey Petrunya committed
702
  bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
Sergey Petrunya's avatar
Sergey Petrunya committed
703 704
  {
    int32_t tmp= field->val_int();
705
    flip32((const char*)&tmp, (char*)&buf);
Sergey Petrunya's avatar
Sergey Petrunya committed
706 707
    *cass_data= (char*)&buf;
    *cass_data_len=sizeof(int32_t);
Sergey Petrunya's avatar
Sergey Petrunya committed
708
    return false;
Sergey Petrunya's avatar
Sergey Petrunya committed
709 710 711 712 713
  }
  ~Int32DataConverter(){}
};


714 715 716
class StringCopyConverter : public ColumnDataConverter
{
  String buf;
Sergey Petrunya's avatar
Sergey Petrunya committed
717
  size_t max_length;
718
public:
Sergey Petrunya's avatar
Sergey Petrunya committed
719
  int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
720
  {
Sergey Petrunya's avatar
Sergey Petrunya committed
721 722
    if ((size_t)cass_data_len > max_length)
      return 1;
723
    field->store(cass_data, cass_data_len,field->charset());
Sergey Petrunya's avatar
Sergey Petrunya committed
724
    return 0;
725
  }
unknown's avatar
unknown committed
726

Sergey Petrunya's avatar
Sergey Petrunya committed
727
  bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
728 729
  {
    String *pstr= field->val_str(&buf);
730
    *cass_data= (char*)pstr->ptr();
731
    *cass_data_len= pstr->length();
Sergey Petrunya's avatar
Sergey Petrunya committed
732
    return false;
733
  }
Sergey Petrunya's avatar
Sergey Petrunya committed
734
  StringCopyConverter(size_t max_length_arg) : max_length(max_length_arg) {}
735 736 737 738
  ~StringCopyConverter(){}
};


739 740 741 742
class TimestampDataConverter : public ColumnDataConverter
{
  int64_t buf;
public:
Sergey Petrunya's avatar
Sergey Petrunya committed
743
  int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
744
  {
745
    /* Cassandra data is milliseconds-since-epoch in network byte order */
746 747 748
    int64_t tmp;
    DBUG_ASSERT(cass_data_len==8);
    flip64(cass_data, (char*)&tmp);
749
    /*
unknown's avatar
unknown committed
750
      store_TIME's arguments:
751 752 753 754
      - seconds since epoch
      - microsecond fraction of a second.
    */
    ((Field_timestamp*)field)->store_TIME(tmp / 1000, (tmp % 1000)*1000);
Sergey Petrunya's avatar
Sergey Petrunya committed
755
    return 0;
756
  }
757

Sergey Petrunya's avatar
Sergey Petrunya committed
758
  bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
759 760
  {
    my_time_t ts_time;
761
    ulong ts_microsec;
762
    int64_t tmp;
763
    ts_time= ((Field_timestamp*)field)->get_timestamp(&ts_microsec);
unknown's avatar
unknown committed
764

765
    /* Cassandra needs milliseconds-since-epoch */
766
    tmp= ((int64_t)ts_time) * 1000 + ts_microsec/1000;
767 768 769 770
    flip64((const char*)&tmp, (char*)&buf);

    *cass_data= (char*)&buf;
    *cass_data_len= 8;
Sergey Petrunya's avatar
Sergey Petrunya committed
771
    return false;
772 773 774 775
  }
  ~TimestampDataConverter(){}
};

Sergey Petrunya's avatar
Sergey Petrunya committed
776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794


/*
  Value of one hexadecimal digit (both letter cases are accepted).

  @return 0..15, or -1 if c is not a hexadecimal digit
*/
static int convert_hex_digit(const char c)
{
  if ('0' <= c && c <= '9')
    return c - '0';
  if ('A' <= c && c <= 'F')
    return 10 + (c - 'A');
  if ('a' <= c && c <= 'f')
    return 10 + (c - 'a');
  return -1; /* Couldn't convert */
}


/* Lower-case hexadecimal digits, indexed by the value 0..15 */
const char map2number[]="0123456789abcdef";

795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831
static void convert_uuid2string(char *str, const char *cass_data)
{
  char *ptr= str;
  /* UUID arrives as 16-byte number in network byte order */
  for (uint i=0; i < 16; i++)
  {
    *(ptr++)= map2number[(cass_data[i] >> 4) & 0xF];
    *(ptr++)= map2number[cass_data[i] & 0xF];
    if (i == 3 || i == 5 || i == 7 || i == 9)
      *(ptr++)= '-';
  }
  *ptr= 0;
}

static bool convert_string2uuid(char *buf, const char *str)
{
  int lower, upper;
  for (uint i= 0; i < 16; i++)
  {
    if ((upper= convert_hex_digit(str[0])) == -1 ||
        (lower= convert_hex_digit(str[1])) == -1)
    {
      return true;
    }
    buf[i]= lower | (upper << 4);
    str += 2;
    if (i == 3 || i == 5 || i == 7 || i == 9)
    {
      if (str[0] != '-')
        return true;
      str++;
    }
  }
  return false;
}


Sergey Petrunya's avatar
Sergey Petrunya committed
832 833 834 835 836
class UuidDataConverter : public ColumnDataConverter
{
  char buf[16]; /* Binary UUID representation */
  String str_buf;
public:
Sergey Petrunya's avatar
Sergey Petrunya committed
837
  int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
Sergey Petrunya's avatar
Sergey Petrunya committed
838 839 840
  {
    DBUG_ASSERT(cass_data_len==16);
    char str[37];
841
    convert_uuid2string(str, cass_data);
Sergey Petrunya's avatar
Sergey Petrunya committed
842
    field->store(str, 36,field->charset());
Sergey Petrunya's avatar
Sergey Petrunya committed
843
    return 0;
Sergey Petrunya's avatar
Sergey Petrunya committed
844 845 846 847 848 849
  }

  bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
  {
    String *uuid_str= field->val_str(&str_buf);

850 851 852 853
    if (uuid_str->length() != 36)
      return true;

    if (convert_string2uuid(buf, (char*)uuid_str->c_ptr()))
Sergey Petrunya's avatar
Sergey Petrunya committed
854 855 856 857 858 859 860 861
      return true;
    *cass_data= buf;
    *cass_data_len= 16;
    return false;
  }
  ~UuidDataConverter(){}
};

862 863 864
/**
  Converting dynamic column types to/from Cassandra types
*/
865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891


/**
  Make sure the string MEM_ROOT is initialized; an already initialized root
  is left as it is.
*/
static void alloc_strings_memroot(MEM_ROOT *mem_root)
{
  if (alloc_root_inited(mem_root))
    return;

  /*
    The mem_root is used to allocate UUIDs (of length 36 + \0), so both the
    block size and the pre-allocation size are chosen to fit ten of them.
  */
  init_alloc_root(mem_root,
                  ALLOC_ROOT_MIN_BLOCK_SIZE +
                  (36 + 1 + ALIGN_SIZE(sizeof(USED_MEM))) * 10,
                  ALLOC_ROOT_MIN_BLOCK_SIZE +
                  (36 + 1 + ALIGN_SIZE(sizeof(USED_MEM))) * 10);
}

/* Free the string MEM_ROOT; a root that was never initialized is skipped */
static void free_strings_memroot(MEM_ROOT *mem_root)
{
  if (!alloc_root_inited(mem_root))
    return;
  free_root(mem_root, MYF(0));
}

892 893
bool cassandra_to_dyncol_intLong(const char *cass_data,
                                 int cass_data_len __attribute__((unused)),
894 895
                                 DYNAMIC_COLUMN_VALUE *value,
                                 MEM_ROOT *mem_root __attribute__((unused)))
896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911
{
  value->type= DYN_COL_INT;
#ifdef WORDS_BIGENDIAN
  value->x.long_value= (longlong *)*cass_data;
#else
  flip64(cass_data, (char *)&value->x.long_value);
#endif
  return 0;
}

/*
  Convert a dynamic column value into an 8-byte integer in network byte
  order.

  @param value              the value to convert
  @param cass_data     out  points into buff at the converted bytes
  @param cass_data_len out  set to sizeof(longlong)
  @param buff               work area; on little-endian builds the swapped
                            copy is placed after the first longlong, so it
                            must hold at least 2 * sizeof(longlong) bytes
  @param freemem       out  set to NULL

  @return false on success, true if the value cannot be converted to an
          integer (the out parameters are left untouched then)
*/
bool dyncol_to_cassandraLong(DYNAMIC_COLUMN_VALUE *value,
                             char **cass_data, int *cass_data_len,
                             void* buff, void **freemem)
{
  longlong *tmp= (longlong *) buff;
  enum enum_dyncol_func_result rc=
    mariadb_dyncol_val_long(tmp, value);
  if (rc < 0)
    return true;
  *cass_data_len= sizeof(longlong);
#ifdef WORDS_BIGENDIAN
  *cass_data= (char *)buff;
#else
  flip64((char *)buff, (char *)buff + sizeof(longlong));
  *cass_data= (char *)buff + sizeof(longlong);
#endif
  *freemem= NULL;
  return false;
}

bool cassandra_to_dyncol_intInt32(const char *cass_data,
                                  int cass_data_len __attribute__((unused)),
928 929
                                  DYNAMIC_COLUMN_VALUE *value,
                                  MEM_ROOT *mem_root __attribute__((unused)))
930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948
{
  int32 tmp;
  value->type= DYN_COL_INT;
#ifdef WORDS_BIGENDIAN
  tmp= *((int32 *)cass_data);
#else
  flip32(cass_data, (char *)&tmp);
#endif
  value->x.long_value= tmp;
  return 0;
}


/**
  Convert a dynamic column value into a 4-byte big-endian integer.

  The 64-bit intermediate value is kept in the second 8 bytes of buff so
  that the 4 result bytes can be written at the start of buff. The value is
  narrowed to int32 with a plain cast (no range check).

  @param cass_data      OUT points at buff
  @param cass_data_len  OUT always sizeof(int32)
  @param buff           scratch buffer of at least 2 * sizeof(longlong) bytes
  @param freemem        OUT set to NULL (nothing was heap-allocated)

  @return false on success, true if the value cannot be read as an integer
*/
bool dyncol_to_cassandraInt32(DYNAMIC_COLUMN_VALUE *value,
                              char **cass_data, int *cass_data_len,
                              void* buff, void **freemem)
{
  longlong *tmp= (longlong *) ((char *)buff + sizeof(longlong));
  enum enum_dyncol_func_result rc=
    mariadb_dyncol_val_long(tmp, value);
  if (rc < 0)
    return true;
  *cass_data_len= sizeof(int32);
  *cass_data= (char *)buff;
#ifdef WORDS_BIGENDIAN
  *((int32 *) buff) = (int32) *tmp;
#else
  {
    int32 tmp2= (int32) *tmp;
    flip32((char *)&tmp2, (char *)buff);
  }
#endif
  *freemem= NULL;
  return false;
}


bool cassandra_to_dyncol_intCounter(const char *cass_data,
                                    int cass_data_len __attribute__((unused)),
969 970
                                    DYNAMIC_COLUMN_VALUE *value,
                                    MEM_ROOT *mem_root __attribute__((unused)))
971 972 973 974 975 976 977 978 979 980 981 982 983
{
  value->type= DYN_COL_INT;
  value->x.long_value= *((longlong *)cass_data);
  return 0;
}


/**
  Convert a dynamic column value into a counter value.

  The integer is stored into buff in host byte order (no swapping).

  @param cass_data      OUT points at buff
  @param cass_data_len  OUT always sizeof(longlong)
  @param freemem        OUT set to NULL (nothing was heap-allocated)

  @return false on success, true if the value cannot be read as an integer
*/
bool dyncol_to_cassandraCounter(DYNAMIC_COLUMN_VALUE *value,
                                char **cass_data, int *cass_data_len,
                                void* buff, void **freemem)
{
  longlong *tmp= (longlong *)buff;
  enum enum_dyncol_func_result rc=
    mariadb_dyncol_val_long(tmp, value);
  if (rc < 0)
    return true;
  *cass_data_len= sizeof(longlong);
  *cass_data= (char *)buff;
  *freemem= NULL;
  return false;
}

bool cassandra_to_dyncol_doubleFloat(const char *cass_data,
                                     int cass_data_len __attribute__((unused)),
995 996
                                     DYNAMIC_COLUMN_VALUE *value,
                                     MEM_ROOT *mem_root __attribute__((unused)))
997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008
{
  value->type= DYN_COL_DOUBLE;
  value->x.double_value= *((float *)cass_data);
  return 0;
}

/**
  Convert a dynamic column value into a float stored in buff.

  The value is first read as a double and then narrowed to float.

  @param cass_data      OUT points at buff
  @param cass_data_len  OUT always sizeof(float)
  @param freemem        OUT set to NULL (nothing was heap-allocated)

  @return false on success, true if the value cannot be read as a double
*/
bool dyncol_to_cassandraFloat(DYNAMIC_COLUMN_VALUE *value,
                              char **cass_data, int *cass_data_len,
                              void* buff, void **freemem)
{
  double tmp;
  enum enum_dyncol_func_result rc=
    mariadb_dyncol_val_double(&tmp, value);
  if (rc < 0)
    return true;
  *((float *)buff)= (float) tmp;
  *cass_data_len= sizeof(float);
  *cass_data= (char *)buff;
  *freemem= NULL;
  return false;
}

bool cassandra_to_dyncol_doubleDouble(const char *cass_data,
                                      int cass_data_len __attribute__((unused)),
1021 1022 1023
                                      DYNAMIC_COLUMN_VALUE *value,
                                      MEM_ROOT *mem_root
                                      __attribute__((unused)))
1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035
{
  value->type= DYN_COL_DOUBLE;
  value->x.double_value= *((double *)cass_data);
  return 0;
}

/**
  Convert a dynamic column value into a double stored in buff.

  @param cass_data      OUT points at buff
  @param cass_data_len  OUT always sizeof(double)
  @param freemem        OUT set to NULL (nothing was heap-allocated)

  @return false on success, true if the value cannot be read as a double
*/
bool dyncol_to_cassandraDouble(DYNAMIC_COLUMN_VALUE *value,
                               char **cass_data, int *cass_data_len,
                               void* buff, void **freemem)
{
  double *tmp= (double *)buff;
  enum enum_dyncol_func_result rc=
    mariadb_dyncol_val_double(tmp, value);
  if (rc < 0)
    return true;
  *cass_data_len= sizeof(double);
  *cass_data= (char *)buff;
  *freemem= NULL;
  return false;
}

/**
  Describe a Cassandra byte sequence as a dynamic column STRING value in the
  given character set.

  The data is not copied: the value points at cass_data.

  @return false (always)
*/
bool cassandra_to_dyncol_strStr(const char *cass_data,
                                int cass_data_len,
                                DYNAMIC_COLUMN_VALUE *value,
                                CHARSET_INFO *cs)
{
  value->type= DYN_COL_STRING;
  value->x.string.value.length= cass_data_len;
  value->x.string.value.str= (char *)cass_data;
  value->x.string.charset= cs;
  return false;
}

/**
  Convert a dynamic column value into a string in character set cs.

  The result lives in a freshly allocated DYNAMIC_STRING buffer, not in buff.

  @param cass_data      OUT pointer to the string bytes
  @param cass_data_len  OUT length of the string
  @param buff           not used by this conversion
  @param freemem        OUT the allocated buffer; presumably released by the
                        caller once the data has been consumed
  @param cs             target character set

  @return false on success, true on allocation or conversion failure
*/
bool dyncol_to_cassandraStr(DYNAMIC_COLUMN_VALUE *value,
                            char **cass_data, int *cass_data_len,
                            void* buff, void **freemem, CHARSET_INFO *cs)
{
  DYNAMIC_STRING tmp;
  if (init_dynamic_string(&tmp, NULL, 1024, 1024))
    return 1;
  enum enum_dyncol_func_result rc=
    mariadb_dyncol_val_str(&tmp, value, cs, '\0');
  if (rc < 0)
  {
    dynstr_free(&tmp);
    return 1;
  }
  *cass_data_len= tmp.length;
  *(cass_data)= tmp.str;
  *freemem= tmp.str;
  return 0;
}

bool cassandra_to_dyncol_strBytes(const char *cass_data,
                                  int cass_data_len,
1079 1080
                                  DYNAMIC_COLUMN_VALUE *value,
                                  MEM_ROOT *mem_root __attribute__((unused)))
1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095
{
  return cassandra_to_dyncol_strStr(cass_data, cass_data_len, value,
                                    &my_charset_bin);
}

/**
  Convert a dynamic column value to bytes using the binary character set.
  See dyncol_to_cassandraStr() for the meaning of the arguments.
*/
bool dyncol_to_cassandraBytes(DYNAMIC_COLUMN_VALUE *value,
                              char **cass_data, int *cass_data_len,
                              void* buff, void **freemem)
{
  CHARSET_INFO *cs= &my_charset_bin;
  return dyncol_to_cassandraStr(value, cass_data, cass_data_len, buff,
                                freemem, cs);
}

bool cassandra_to_dyncol_strAscii(const char *cass_data,
                                  int cass_data_len,
1096 1097
                                  DYNAMIC_COLUMN_VALUE *value,
                                  MEM_ROOT *mem_root __attribute__((unused)))
1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112
{
  return cassandra_to_dyncol_strStr(cass_data, cass_data_len, value,
                                    &my_charset_latin1_bin);
}

/**
  Convert a dynamic column value to a string in the latin1_bin character set.
  See dyncol_to_cassandraStr() for the meaning of the arguments.
*/
bool dyncol_to_cassandraAscii(DYNAMIC_COLUMN_VALUE *value,
                              char **cass_data, int *cass_data_len,
                              void* buff, void **freemem)
{
  CHARSET_INFO *cs= &my_charset_latin1_bin;
  return dyncol_to_cassandraStr(value, cass_data, cass_data_len, buff,
                                freemem, cs);
}

bool cassandra_to_dyncol_strUTF8(const char *cass_data,
                                 int cass_data_len,
1113 1114
                                 DYNAMIC_COLUMN_VALUE *value,
                                 MEM_ROOT *mem_root __attribute__((unused)))
1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129
{
  return cassandra_to_dyncol_strStr(cass_data, cass_data_len, value,
                                    &my_charset_utf8_unicode_ci);
}

/**
  Convert a dynamic column value to a string in the utf8_unicode_ci
  character set. See dyncol_to_cassandraStr() for the arguments.
*/
bool dyncol_to_cassandraUTF8(DYNAMIC_COLUMN_VALUE *value,
                             char **cass_data, int *cass_data_len,
                             void* buff, void **freemem)
{
  CHARSET_INFO *cs= &my_charset_utf8_unicode_ci;
  return dyncol_to_cassandraStr(value, cass_data, cass_data_len, buff,
                                freemem, cs);
}

bool cassandra_to_dyncol_strUUID(const char *cass_data,
                                 int cass_data_len,
1130 1131
                                 DYNAMIC_COLUMN_VALUE *value,
                                 MEM_ROOT *mem_root)
1132 1133 1134
{
  value->type= DYN_COL_STRING;
  value->x.string.charset= &my_charset_bin;
1135 1136
  alloc_strings_memroot(mem_root);
  value->x.string.value.str= (char *)alloc_root(mem_root, 37);
1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154
  if (!value->x.string.value.str)
  {
    value->x.string.value.length= 0;
    return 1;
  }
  convert_uuid2string(value->x.string.value.str, cass_data);
  value->x.string.value.length= 36;
  return 0;
}

/**
  Convert a dynamic column value holding a UUID in text form into the binary
  UUID representation.

  The value is rendered as a string, which must be exactly 36 characters and
  parse as a UUID. The binary result is written to buff.

  @param cass_data      OUT points at buff
  @param cass_data_len  OUT 16, the size of a binary UUID
  @param buff           receives the binary UUID; must hold 16 bytes
  @param freemem        OUT set to NULL (the temporary string is freed here)

  @return false on success, true on allocation failure or if the value is
          not a valid UUID string
*/
bool dyncol_to_cassandraUUID(DYNAMIC_COLUMN_VALUE *value,
                             char **cass_data, int *cass_data_len,
                             void* buff, void **freemem)
{
  DYNAMIC_STRING tmp;
  if (init_dynamic_string(&tmp, NULL, 1024, 1024))
    return true;
  enum enum_dyncol_func_result rc=
    mariadb_dyncol_val_str(&tmp, value, &my_charset_latin1_bin, '\0');
  bool failed= (rc < 0 || tmp.length != 36 ||
                convert_string2uuid((char *)buff, tmp.str));
  /* The text form is only an intermediate; the result is the data in buff */
  dynstr_free(&tmp);
  if (failed)
    return true;

  /*
    Hand back the converted binary UUID. The previous code returned the
    36-character text and left the converted bytes in buff unused.
  */
  *cass_data_len= 16;
  *(cass_data)= (char *)buff;
  *freemem= NULL;
  return false;
}

bool cassandra_to_dyncol_intBool(const char *cass_data,
                                 int cass_data_len,
1170 1171
                                 DYNAMIC_COLUMN_VALUE *value,
                                 MEM_ROOT *mem_root __attribute__((unused)))
1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183
{
  value->type= DYN_COL_INT;
  value->x.long_value= (cass_data[0] ? 1 : 0);
  return 0;
}

/**
  Convert a dynamic column value into a one-byte boolean stored in buff:
  1 if the integer value is non-zero, otherwise 0.

  @param cass_data      OUT points at buff
  @param cass_data_len  OUT always 1
  @param freemem        OUT set to 0 (nothing was heap-allocated)

  @return false on success, true if the value cannot be read as an integer
*/
bool dyncol_to_cassandraBool(DYNAMIC_COLUMN_VALUE *value,
                             char **cass_data, int *cass_data_len,
                             void* buff, void **freemem)
{
  longlong tmp;
  enum enum_dyncol_func_result rc=
    mariadb_dyncol_val_long(&tmp, value);
  if (rc < 0)
    return true;
  ((char *)buff)[0]= (tmp ? 1 : 0);
  *cass_data_len= 1;
  *(cass_data)= (char *)buff;
  *freemem= 0;
  return 0;
}

1194

1195 1196
/*
  Fully qualified class names of the Cassandra validators this engine
  recognizes. All of them share the 32-character prefix
  "org.apache.cassandra.db.marshal.".
*/
const char * const validator_bigint=  "org.apache.cassandra.db.marshal.LongType";
const char * const validator_int=     "org.apache.cassandra.db.marshal.Int32Type";
const char * const validator_counter= "org.apache.cassandra.db.marshal.CounterColumnType";

const char * const validator_float=   "org.apache.cassandra.db.marshal.FloatType";
const char * const validator_double=  "org.apache.cassandra.db.marshal.DoubleType";

const char * const validator_blob=    "org.apache.cassandra.db.marshal.BytesType";
const char * const validator_ascii=   "org.apache.cassandra.db.marshal.AsciiType";
const char * const validator_text=    "org.apache.cassandra.db.marshal.UTF8Type";

const char * const validator_timestamp="org.apache.cassandra.db.marshal.DateType";

const char * const validator_uuid= "org.apache.cassandra.db.marshal.UUIDType";

const char * const validator_boolean= "org.apache.cassandra.db.marshal.BooleanType";

/* VARINTs are stored as big-endian big numbers. */
const char * const validator_varint= "org.apache.cassandra.db.marshal.IntegerType";
const char * const validator_decimal= "org.apache.cassandra.db.marshal.DecimalType";
Sergey Petrunya's avatar
Sergey Petrunya committed
1215

1216

1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336
/*
  Validator name and the pair of conversion functions (Cassandra -> dynamic
  column, dynamic column -> Cassandra) for each supported Cassandra type.
  The array is indexed with the value returned by get_cassandra_type().
*/
static CASSANDRA_TYPE_DEF cassandra_types[]=
{
  { validator_bigint,
    cassandra_to_dyncol_intLong,      dyncol_to_cassandraLong },
  { validator_int,
    cassandra_to_dyncol_intInt32,     dyncol_to_cassandraInt32 },
  { validator_counter,
    cassandra_to_dyncol_intCounter,   dyncol_to_cassandraCounter },
  { validator_float,
    cassandra_to_dyncol_doubleFloat,  dyncol_to_cassandraFloat },
  { validator_double,
    cassandra_to_dyncol_doubleDouble, dyncol_to_cassandraDouble },
  { validator_blob,
    cassandra_to_dyncol_strBytes,     dyncol_to_cassandraBytes },
  { validator_ascii,
    cassandra_to_dyncol_strAscii,     dyncol_to_cassandraAscii },
  { validator_text,
    cassandra_to_dyncol_strUTF8,      dyncol_to_cassandraUTF8 },
  /* timestamps use the same 64-bit integer conversion as bigint */
  { validator_timestamp,
    cassandra_to_dyncol_intLong,      dyncol_to_cassandraLong },
  { validator_uuid,
    cassandra_to_dyncol_strUUID,      dyncol_to_cassandraUUID },
  { validator_boolean,
    cassandra_to_dyncol_intBool,      dyncol_to_cassandraBool },
  /* varint and decimal are passed through as raw bytes */
  { validator_varint,
    cassandra_to_dyncol_strBytes,     dyncol_to_cassandraBytes },
  { validator_decimal,
    cassandra_to_dyncol_strBytes,     dyncol_to_cassandraBytes }
};

/**
  Map a Cassandra validator class name to the CASSANDRA_TYPE code.

  The decision is made by looking only at a few characters that follow the
  32-character "org.apache.cassandra.db.marshal." prefix, so the function
  does not validate the name except through the debug assertion at the end.
  NOTE(review): the name is indexed at [32], [33] and [35] without a length
  check; this assumes callers only pass full validator class names.

  @param validator  validator class name

  @return type code; CT_BLOB for names that match no known pattern
*/
CASSANDRA_TYPE get_cassandra_type(const char *validator)
{
  CASSANDRA_TYPE rc;
  switch(validator[32])
  {
  case 'L':
    rc= CT_BIGINT;
    break;
  case 'I':
    /*
      "Int32Type" vs "IntegerType": the 4th character tells them apart.
      (A stray "rc= CT_INT" used to follow and made varint look like int.)
    */
    rc= (validator[35] == '3' ? CT_INT : CT_VARINT);
    break;
  case 'C':
    rc= CT_COUNTER;
    break;
  case 'F':
    rc= CT_FLOAT;
    break;
  case 'D':
    /* DoubleType, DateType or DecimalType */
    switch (validator[33])
    {
    case 'o':
      rc= CT_DOUBLE;
      break;
    case 'a':
      rc= CT_TIMESTAMP;
      break;
    case 'e':
      rc= CT_DECIMAL;
      break;
    default:
      rc= CT_BLOB;
      break;
    }
    break;
  case 'B':
    /* BooleanType or BytesType */
    rc= (validator[33] == 'o' ? CT_BOOLEAN : CT_BLOB);
    break;
  case 'A':
    rc= CT_ASCII;
    break;
  case 'U':
    /* UTF8Type or UUIDType */
    rc= (validator[33] == 'T' ? CT_TEXT : CT_UUID);
    break;
  default:
    rc= CT_BLOB;
  }
  DBUG_ASSERT(strcmp(cassandra_types[rc].name, validator) == 0);
  return rc;
}

1337
/**
  Create the data converter for a (SQL field type, Cassandra validator) pair.

  @param field           SQL field to be mapped
  @param validator_name  Cassandra validator class name of the column

  @return newly allocated converter, or NULL when the field type cannot be
          mapped to the given validator
*/
ColumnDataConverter *map_field_to_validator(Field *field, const char *validator_name)
{
  ColumnDataConverter *res= NULL;

  switch(field->type()) {
    case MYSQL_TYPE_TINY:
      if (!strcmp(validator_name, validator_boolean))
      {
        res= new TinyintDataConverter;
        break;
      }
      /* fall through: */
    case MYSQL_TYPE_SHORT:
    case MYSQL_TYPE_LONGLONG:
    {
      /* The converter is told whether the validator is NOT a counter */
      bool is_counter= false;
      if (!strcmp(validator_name, validator_bigint) ||
          !strcmp(validator_name, validator_timestamp) ||
          (is_counter= !strcmp(validator_name, validator_counter)))
        res= new BigintDataConverter(!is_counter);
      break;
    }
    case MYSQL_TYPE_FLOAT:
      if (!strcmp(validator_name, validator_float))
        res= new FloatDataConverter;
      break;

    case MYSQL_TYPE_DOUBLE:
      if (!strcmp(validator_name, validator_double))
        res= new DoubleDataConverter;
      break;

    case MYSQL_TYPE_TIMESTAMP:
      if (!strcmp(validator_name, validator_timestamp))
        res= new TimestampDataConverter;
      break;

    case MYSQL_TYPE_STRING: // these are space padded CHAR(n) strings.
      if (!strcmp(validator_name, validator_uuid) &&
          field->real_type() == MYSQL_TYPE_STRING &&
          field->field_length == 36)
      {
        // UUID maps to CHAR(36), its text representation
        res= new UuidDataConverter;
        break;
      }
      /* fall through: */
    case MYSQL_TYPE_VAR_STRING:
    case MYSQL_TYPE_VARCHAR:
    {
      /*
        Cassandra's "varint" type is a binary-encoded arbitrary-length
        big-endian number.
        - It can be mapped to VARBINARY(N), with sufficiently big N.
        - If the value does not fit into N bytes, it is an error. We should not
          truncate it, because that is just as good as returning garbage.
        - varint should not be mapped to BINARY(N), because BINARY(N) values
          are zero-padded, which will work as multiplying the value by
          2^k for some value of k.
      */
      if (field->type() == MYSQL_TYPE_VARCHAR &&
          field->binary() &&
          (!strcmp(validator_name, validator_varint) ||
           !strcmp(validator_name, validator_decimal)))
      {
        res= new StringCopyConverter(field->field_length);
        break;
      }

      if (!strcmp(validator_name, validator_blob) ||
          !strcmp(validator_name, validator_ascii) ||
          !strcmp(validator_name, validator_text))
      {
        res= new StringCopyConverter((size_t)-1);
      }
      break;
    }
    case MYSQL_TYPE_LONG:
      if (!strcmp(validator_name, validator_int))
        res= new Int32DataConverter;
      break;

    default:;
  }
  return res;
}


1425 1426 1427 1428 1429 1430
/**
  Set up converters between the SQL fields of the table and the columns of
  the Cassandra column family.

  - Every field after the first one (except the dynamic column field, if
    any) must match a column from Cassandra's DDL by name and must have a
    type compatible with that column's validator.
  - With a dynamic column field, DDL columns that match no SQL field and
    whose validator differs from the default one are remembered together
    with their converters (the "special type" fields).
  - The first field is the row key; its converter is set up last.

  @param field_arg  NULL-terminated array of the table's fields
  @param n_fields   number of fields in field_arg

  @return false on success, true on failure
*/
bool ha_cassandra::setup_field_converters(Field **field_arg, uint n_fields)
{
  char *col_name;
  int  col_name_len;
  char *col_type;
  int col_type_len;
  size_t ddl_fields= se->get_ddl_size();
  const char *default_type= se->get_default_validator();
  uint max_non_default_fields;
  DBUG_ENTER("ha_cassandra::setup_field_converters");
  DBUG_ASSERT(default_type);

  DBUG_ASSERT(!field_converters);
  DBUG_ASSERT(dyncol_set == 0 || dyncol_set == 1);

  /*
    We always should take into account that in case of using dynamic columns
    sql description contain one field which does not described in
    Cassandra DDL also key field is described separately. So that
    is why we use "n_fields - dyncol_set - 1" or "ddl_fields + 2".
  */
  max_non_default_fields= ddl_fields + 2 - n_fields;
  if (ddl_fields < (n_fields - dyncol_set - 1))
  {
    se->print_error("Some of SQL fields were not mapped to Cassandra's fields");
    my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
    DBUG_RETURN(true);
  }

  /* allocate memory in one chunk */
  size_t memsize= sizeof(ColumnDataConverter*) * n_fields +
    (sizeof(LEX_STRING) + sizeof(CASSANDRA_TYPE_DEF))*
    (dyncol_set ? max_non_default_fields : 0);
  if (!(field_converters= (ColumnDataConverter**)my_malloc(memsize, MYF(0))))
    DBUG_RETURN(true);
  bzero(field_converters, memsize);
  n_field_converters= n_fields;

  if (dyncol_set)
  {
    /* The special type arrays live right behind the converter pointers */
    special_type_field_converters=
      (CASSANDRA_TYPE_DEF *)(field_converters + n_fields);
    special_type_field_names=
      ((LEX_STRING*)(special_type_field_converters + max_non_default_fields));

    if (init_dynamic_array(&dynamic_values,
                           sizeof(DYNAMIC_COLUMN_VALUE),
                           DYNCOL_USUAL, DYNCOL_DELTA))
      DBUG_RETURN(true);
    else
      if (init_dynamic_array(&dynamic_names,
                             sizeof(LEX_STRING),
                             DYNCOL_USUAL, DYNCOL_DELTA))
      {
        delete_dynamic(&dynamic_values);
        DBUG_RETURN(true);
      }
      else
        if (init_dynamic_string(&dynamic_rec, NULL,
                                DYNCOL_USUAL_REC, DYNCOL_DELTA_REC))
        {
          delete_dynamic(&dynamic_values);
          delete_dynamic(&dynamic_names);
          DBUG_RETURN(true);
        }

    /* Dynamic column field has special processing */
    field_converters[dyncol_field]= NULL;

    default_type_def= cassandra_types + get_cassandra_type(default_type);
  }

  se->first_ddl_column();
  uint n_mapped= 0;
  while (!se->next_ddl_column(&col_name, &col_name_len, &col_type,
                              &col_type_len))
  {
    Field **field;
    uint i;
    /* Mapping for the 1st field is already known */
    for (field= field_arg + 1, i= 1; *field; field++, i++)
    {
      if ((!dyncol_set || dyncol_field != i) &&
          !strcmp((*field)->field_name, col_name))
      {
        n_mapped++;
        ColumnDataConverter **conv= field_converters + (*field)->field_index;
        if (!(*conv= map_field_to_validator(*field, col_type)))
        {
          se->print_error("Failed to map column %s to datatype %s",
                          (*field)->field_name, col_type);
          my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
          DBUG_RETURN(true);
        }
        (*conv)->field= *field;
        /*
          Stop at the matching field so that *field stays non-NULL. Without
          this the "not found" test below was true for every column, and
          mapped columns were also stored in the special type arrays, which
          are sized for the unmapped columns only.
        */
        break;
      }
    }
    if (dyncol_set && !(*field)) // is needed and not found
    {
      DBUG_PRINT("info",("Field not found: %s", col_name));
      if (strcmp(col_type, default_type))
      {
        DBUG_PRINT("info",("Field '%s' non-default type: '%s'",
                           col_name, col_type));
        special_type_field_names[n_special_type_fields].length= col_name_len;
        special_type_field_names[n_special_type_fields].str= col_name;
        special_type_field_converters[n_special_type_fields]=
          cassandra_types[get_cassandra_type(col_type)];
        n_special_type_fields++;
      }
    }
  }

  if (n_mapped != n_fields - 1 - dyncol_set)
  {
    Field *first_unmapped= NULL;
    /*
      Find the first field without a converter. The dynamic column field
      never has one, so it must not be reported as unmapped.
    */
    for (uint i= 1; i < n_fields;i++)
    {
      if (!field_converters[i] && (!dyncol_set || i != dyncol_field))
      {
        first_unmapped= field_arg[i];
        break;
      }
    }
    DBUG_ASSERT(first_unmapped);

    se->print_error("Field `%s` could not be mapped to any field in Cassandra",
                    first_unmapped->field_name);
    my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
    DBUG_RETURN(true);
  }

  /*
    Setup type conversion for row_key.
  */
  se->get_rowkey_type(&col_name, &col_type);
  if (col_name && strcmp(col_name, (*field_arg)->field_name))
  {
    se->print_error("PRIMARY KEY column must match Cassandra's name '%s'",
                    col_name);
    my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
    DBUG_RETURN(true);
  }
  if (!col_name && strcmp("rowkey", (*field_arg)->field_name))
  {
    se->print_error("target column family has no key_alias defined, "
                    "PRIMARY KEY column must be named 'rowkey'");
    my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
    DBUG_RETURN(true);
  }

  if (col_type != NULL)
  {
    if (!(rowkey_converter= map_field_to_validator(*field_arg, col_type)))
    {
      se->print_error("Failed to map PRIMARY KEY to datatype %s", col_type);
      my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
      DBUG_RETURN(true);
    }
    rowkey_converter->field= *field_arg;
  }
  else
  {
    se->print_error("Cassandra's rowkey has no defined datatype (todo: support this)");
    my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
    DBUG_RETURN(true);
  }

  DBUG_RETURN(false);
}


void ha_cassandra::free_field_converters()
{
1600 1601 1602
  delete rowkey_converter;
  rowkey_converter= NULL;

1603 1604 1605 1606 1607 1608
  if (dyncol_set)
  {
    delete_dynamic(&dynamic_values);
    delete_dynamic(&dynamic_names);
    dynstr_free(&dynamic_rec);
  }
1609 1610 1611
  if (field_converters)
  {
    for (uint i=0; i < n_field_converters; i++)
1612 1613 1614 1615 1616
      if (field_converters[i])
      {
        DBUG_ASSERT(!dyncol_set || i == dyncol_field);
        delete field_converters[i];
      }
1617 1618 1619 1620 1621
    my_free(field_converters);
    field_converters= NULL;
  }
}

1622

1623 1624 1625 1626 1627 1628 1629 1630
/**
  Prepare for index access: connect to Cassandra and check the table options
  unless a connection object already exists.

  @return 0 on success, otherwise the error from connect_and_check_options()
*/
int ha_cassandra::index_init(uint idx, bool sorted)
{
  if (se)
    return 0;
  return connect_and_check_options(table);
}

1631 1632 1633
void store_key_image_to_rec(Field *field, uchar *ptr, uint len);

int ha_cassandra::index_read_map(uchar *buf, const uchar *key,
1634 1635
                                 key_part_map keypart_map,
                                 enum ha_rkey_function find_flag)
1636
{
1637
  int rc= 0;
1638
  DBUG_ENTER("ha_cassandra::index_read_map");
1639

1640
  if (find_flag != HA_READ_KEY_EXACT)
1641 1642
  {
    DBUG_ASSERT(0); /* Non-equality lookups should never be done */
1643
    DBUG_RETURN(HA_ERR_WRONG_COMMAND);
1644
  }
1645 1646 1647

  uint key_len= calculate_key_len(table, active_index, key, keypart_map);
  store_key_image_to_rec(table->field[0], (uchar*)key, key_len);
1648 1649 1650

  char *cass_key;
  int cass_key_len;
1651 1652 1653 1654
  my_bitmap_map *old_map;

  old_map= dbug_tmp_use_all_columns(table, table->read_set);

Sergey Petrunya's avatar
Sergey Petrunya committed
1655 1656 1657
  if (rowkey_converter->mariadb_to_cassandra(&cass_key, &cass_key_len))
  {
    /* We get here when making lookups like uuid_column='not-an-uuid' */
1658
    dbug_tmp_restore_column_map(table->read_set, old_map);
Sergey Petrunya's avatar
Sergey Petrunya committed
1659 1660
    DBUG_RETURN(HA_ERR_KEY_NOT_FOUND);
  }
1661

1662 1663
  dbug_tmp_restore_column_map(table->read_set, old_map);

1664
  bool found;
1665 1666 1667
  if (se->get_slice(cass_key, cass_key_len, &found))
  {
    my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
1668
    rc= HA_ERR_INTERNAL_ERROR;
1669
  }
1670

1671 1672 1673
  /* TODO: what if we're not reading all columns?? */
  if (!found)
    rc= HA_ERR_KEY_NOT_FOUND;
1674
  else
Sergey Petrunya's avatar
Sergey Petrunya committed
1675 1676 1677 1678 1679 1680
    rc= read_cassandra_columns(false);

  DBUG_RETURN(rc);
}


unknown's avatar
unknown committed
1681
void ha_cassandra::print_conversion_error(const char *field_name,
Sergey Petrunya's avatar
Sergey Petrunya committed
1682 1683 1684 1685 1686 1687
                                          char *cass_value,
                                          int cass_value_len)
{
  char buf[32];
  char *p= cass_value;
  size_t i= 0;
1688
  for (; (i < sizeof(buf)-1) && (p < cass_value + cass_value_len); p++)
1689
  {
Sergey Petrunya's avatar
Sergey Petrunya committed
1690 1691
    buf[i++]= map2number[(*p >> 4) & 0xF];
    buf[i++]= map2number[*p & 0xF];
1692
  }
Sergey Petrunya's avatar
Sergey Petrunya committed
1693
  buf[i]=0;
1694

Sergey Petrunya's avatar
Sergey Petrunya committed
1695 1696
  se->print_error("Unable to convert value for field `%s` from Cassandra's data"
                  " format. Source data is %d bytes, 0x%s%s",
unknown's avatar
unknown committed
1697
                  field_name, cass_value_len, buf,
Sergey Petrunya's avatar
Sergey Petrunya committed
1698 1699
                  (i == sizeof(buf) - 1)? "..." : "");
  my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
1700
}
1701

1702

1703 1704 1705 1706 1707 1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718 1719 1720

/**
  Find the type definition to use for a dynamic column with the given name.

  @param cass_name      column name
  @param cass_name_len  its length

  @return the definition registered for this name among the special type
          fields, or the default type definition if the name is not there
*/
CASSANDRA_TYPE_DEF * ha_cassandra::get_cassandra_field_def(char *cass_name,
                                                           int cass_name_len)
{
  for (uint idx= 0; idx < n_special_type_fields; idx++)
  {
    const LEX_STRING *known= special_type_field_names + idx;
    if ((int)known->length != cass_name_len)
      continue;
    if (!memcmp(cass_name, known->str, cass_name_len))
      return special_type_field_converters + idx;
  }
  return default_type_def;
}

Sergey Petrunya's avatar
Sergey Petrunya committed
1721
int ha_cassandra::read_cassandra_columns(bool unpack_pk)
1722
{
1723
  MEM_ROOT strings_root;
1724 1725
  char *cass_name;
  char *cass_value;
1726
  int cass_value_len, cass_name_len;
1727
  Field **field;
Sergey Petrunya's avatar
Sergey Petrunya committed
1728
  int res= 0;
1729 1730
  ulong total_name_len= 0;

1731
  clear_alloc_root(&strings_root);
1732
  /*
1733 1734 1735 1736 1737 1738 1739 1740 1741 1742
    cassandra_to_mariadb() calls will use field->store(...) methods, which
    require that the column is in the table->write_set
  */
  my_bitmap_map *old_map;
  old_map= dbug_tmp_use_all_columns(table, table->write_set);

  /* Start with all fields being NULL */
  for (field= table->field + 1; *field; field++)
    (*field)->set_null();

1743 1744
  while (!se->get_next_read_column(&cass_name, &cass_name_len,
                                   &cass_value, &cass_value_len))
1745 1746
  {
    // map to our column. todo: use hash or something..
1747
    bool found= 0;
1748
    for (field= table->field + 1; *field; field++)
1749
    {
1750 1751 1752
      uint fieldnr= (*field)->field_index;
      if ((!dyncol_set || dyncol_field != fieldnr) &&
          !strcmp((*field)->field_name, cass_name))
1753
      {
1754
        found= 1;
1755
        (*field)->set_notnull();
Sergey Petrunya's avatar
Sergey Petrunya committed
1756 1757 1758
        if (field_converters[fieldnr]->cassandra_to_mariadb(cass_value,
                                                            cass_value_len))
        {
unknown's avatar
unknown committed
1759
          print_conversion_error((*field)->field_name, cass_value,
Sergey Petrunya's avatar
Sergey Petrunya committed
1760 1761 1762 1763
                                 cass_value_len);
          res=1;
          goto err;
        }
1764
        break;
1765
      }
1766
    }
1767 1768 1769 1770 1771 1772 1773 1774 1775 1776 1777 1778 1779 1780 1781 1782 1783 1784 1785 1786 1787 1788 1789 1790 1791 1792 1793 1794 1795 1796 1797 1798 1799
    if (dyncol_set && !found)
    {
      DYNAMIC_COLUMN_VALUE val;
      LEX_STRING nm;
      CASSANDRA_TYPE_DEF *type= get_cassandra_field_def(cass_name,
                                                        cass_name_len);
      nm.str= cass_name;
      nm.length= cass_name_len;
      if (nm.length > MAX_NAME_LENGTH)
      {
        se->print_error("Unable to convert value for field `%s`"
                        " from Cassandra's data format. Name"
                        " length exceed limit of %u: '%s'",
                        table->field[dyncol_field]->field_name,
                        (uint)MAX_NAME_LENGTH, cass_name);
        my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
        res=1;
        goto err;
      }
      total_name_len+= cass_name_len;
      if (nm.length > MAX_TOTAL_NAME_LENGTH)
      {
        se->print_error("Unable to convert value for field `%s`"
                        " from Cassandra's data format. Sum of all names"
                        " length exceed limit of %lu",
                        table->field[dyncol_field]->field_name,
                        cass_name, (uint)MAX_TOTAL_NAME_LENGTH);
        my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
        res=1;
        goto err;
      }

      if ((res= (*(type->cassandra_to_dynamic))(cass_value,
1800 1801
                                                cass_value_len, &val,
                                                &strings_root)) ||
1802 1803 1804 1805 1806 1807 1808
          insert_dynamic(&dynamic_names, (uchar *) &nm) ||
          insert_dynamic(&dynamic_values, (uchar *) &val))
      {
        if (res)
        {
          print_conversion_error(cass_name, cass_value, cass_value_len);
        }
1809
        free_strings_memroot(&strings_root);
1810
        // EOM shouldm be already reported if happened
1811
        res= 1;
1812 1813 1814
        goto err;
      }
    }
1815
  }
1816 1817 1818 1819

  dynamic_rec.length= 0;
  if (dyncol_set)
  {
1820 1821 1822 1823 1824 1825
    if (mariadb_dyncol_create_many_named(&dynamic_rec,
                                         dynamic_names.elements,
                                         (LEX_STRING *)dynamic_names.buffer,
                                         (DYNAMIC_COLUMN_VALUE *)
                                         dynamic_values.buffer,
                                         FALSE) < 0)
1826 1827
      dynamic_rec.length= 0;

1828
    free_strings_memroot(&strings_root);
1829
    dynamic_values.elements= dynamic_names.elements= 0;
1830

1831 1832 1833 1834 1835 1836 1837 1838 1839 1840 1841 1842
    if (dynamic_rec.length == 0)
      table->field[dyncol_field]->set_null();
    else
    {
      Field_blob *blob= (Field_blob *)table->field[dyncol_field];
      blob->set_notnull();
      blob->store_length(dynamic_rec.length);
      *((char **)(((char *)blob->ptr) + blob->pack_length_no_ptr()))=
        dynamic_rec.str;
    }
  }

1843 1844 1845 1846 1847 1848
  if (unpack_pk)
  {
    /* Unpack rowkey to primary key */
    field= table->field;
    (*field)->set_notnull();
    se->get_read_rowkey(&cass_value, &cass_value_len);
Sergey Petrunya's avatar
Sergey Petrunya committed
1849 1850 1851 1852 1853 1854
    if (rowkey_converter->cassandra_to_mariadb(cass_value, cass_value_len))
    {
      print_conversion_error((*field)->field_name, cass_value, cass_value_len);
      res=1;
      goto err;
    }
1855
  }
1856

Sergey Petrunya's avatar
Sergey Petrunya committed
1857
err:
1858
  dbug_tmp_restore_column_map(table->write_set, old_map);
Sergey Petrunya's avatar
Sergey Petrunya committed
1859
  return res;
1860 1861
}

1862 1863 1864 1865
/*
  Unpack the dynamic-columns blob of the current record.

  @param count   OUT number of dynamic columns found
  @param vals    OUT array of column values (0 if the field is NULL)
  @param names   OUT array of column names  (0 if the field is NULL)
  @param valcol  String object used by val_str() to reference the blob data

  @return 0 on success, HA_ERR_INTERNAL_ERROR if unpacking failed (the
          dynamic-column error has been reported in that case).
*/
int ha_cassandra::read_dyncol(uint *count,
                              DYNAMIC_COLUMN_VALUE **vals,
                              LEX_STRING **names,
                              String *valcol)
{
  DBUG_ENTER("ha_cassandra::read_dyncol");

  Field *dyn_field= table->field[dyncol_field];
  DBUG_ASSERT(dyn_field->type() == MYSQL_TYPE_BLOB);

  /* It is a blob, so val_str() does not use the (NULL) buffer argument */
  String *packed= dyn_field->val_str(NULL, valcol);
  if (dyn_field->is_null())
  {
    /* NULL value: hand back an empty set, there is nothing to write */
    *vals= 0;
    *names= 0;
    *count= 0;
    DBUG_RETURN(0);
  }

  /*
    The unpack call only reads the packed string, so we can cheat and let
    the DYNAMIC_COLUMN borrow the String's data instead of copying it.
  */
  DYNAMIC_COLUMN col;
  bzero(&col, sizeof(col));
  col.length= packed->length();
  col.str= (char *)packed->ptr();

  enum enum_dyncol_func_result rc=
    mariadb_dyncol_unpack(&col, count, names, vals);
  if (rc < 0)
  {
    dynamic_column_error_message(rc);
    DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
  }
  DBUG_RETURN(0);
}

1899 1900 1901
/*
  Add all dynamic columns of a row to the insert that is being prepared.

  @param count  number of elements in vals/names
  @param vals   dynamic column values
  @param names  dynamic column names (length-delimited)

  @return 0 on success, HA_ERR_GENERIC if a value could not be converted
          (ER_WARN_DATA_OUT_OF_RANGE has been raised in that case).
*/
int ha_cassandra::write_dynamic_row(uint count,
                                    DYNAMIC_COLUMN_VALUE *vals,
                                    LEX_STRING *names)
{
  uint i;
  DBUG_ENTER("ha_cassandra::write_dynamic_row");
  DBUG_ASSERT(dyncol_set);

  for (i= 0; i < count; i++)
  {
    char buff[16];
    CASSANDRA_TYPE_DEF *type;
    void *freemem= NULL;
    char *cass_data;
    int cass_data_len;

    /*
      Use a precision ("%.*s"), not a field width ("%*s"): the length is
      meant to bound how many bytes of the name are printed.
    */
    DBUG_PRINT("info", ("field %.*s", (int)names[i].length, names[i].str));
    type= get_cassandra_field_def(names[i].str, (int) names[i].length);
    if ((*type->dynamic_to_cassandra)(vals +i, &cass_data, &cass_data_len,
                                      buff, &freemem))
    {
      my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
               names[i].str, insert_lineno);
      DBUG_RETURN(HA_ERR_GENERIC);
    }
    se->add_insert_column(names[i].str, names[i].length,
                          cass_data, cass_data_len);
    /* The converter hands back memory to release through freemem, if any */
    if (freemem)
      my_free(freemem);
  }
  DBUG_RETURN(0);
}

1933 1934
/*
  Release the arrays produced by read_dyncol() and reset the caller's
  pointers to 0. Pointers that are already 0 are left untouched.
*/
void ha_cassandra::free_dynamic_row(DYNAMIC_COLUMN_VALUE **vals,
                                    LEX_STRING **names)
{
  DYNAMIC_COLUMN_VALUE *value_array= *vals;
  if (value_array != NULL)
  {
    my_free(value_array);
    *vals= 0;
  }

  LEX_STRING *name_array= *names;
  if (name_array != NULL)
  {
    my_free(name_array);
    *names= 0;
  }
}
1947 1948 1949 1950

/*
  Write the current record (table->record[0] fields) to Cassandra.

  Connects on first use. Outside of a bulk insert the row is sent
  immediately; inside a bulk insert rows are accumulated and flushed once
  the insert_batch_size session variable is reached.

  @return 0 on success, otherwise a handler error code.
*/
int ha_cassandra::write_row(uchar *buf)
{
  my_bitmap_map *old_map;
  int ires;
  DBUG_ENTER("ha_cassandra::write_row");

  if (!se && (ires= connect_and_check_options(table)))
    DBUG_RETURN(ires);

  if (!doing_insert_batch)
    se->clear_insert_buffer();

  old_map= dbug_tmp_use_all_columns(table, table->read_set);

  insert_lineno++;

  /* Convert the key */
  char *cass_key;
  int cass_key_len;
  if (rowkey_converter->mariadb_to_cassandra(&cass_key, &cass_key_len))
  {
    my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
             rowkey_converter->field->field_name, insert_lineno);
    dbug_tmp_restore_column_map(table->read_set, old_map);
    DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
  }
  se->start_row_insert(cass_key, cass_key_len);

  /* Convert other fields */
  for (uint i= 1; i < table->s->fields; i++)
  {
    if (dyncol_set && dyncol_field == i)
    {
      String valcol;
      /*
        Start from an empty set so that free_dynamic_row() below is safe
        even if read_dyncol() fails before filling the output arguments.
      */
      DYNAMIC_COLUMN_VALUE *vals= NULL;
      LEX_STRING *names= NULL;
      uint count= 0;
      int rc;
      DBUG_ASSERT(field_converters[i] == NULL);
      if (!(rc= read_dyncol(&count, &vals, &names, &valcol)))
        rc= write_dynamic_row(count, vals, names);
      free_dynamic_row(&vals, &names);
      if (rc)
      {
        dbug_tmp_restore_column_map(table->read_set, old_map);
        DBUG_RETURN(rc);
      }
    }
    else
    {
      char *cass_data;
      int cass_data_len;
      if (field_converters[i]->mariadb_to_cassandra(&cass_data,
                                                    &cass_data_len))
      {
        my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
                 field_converters[i]->field->field_name, insert_lineno);
        dbug_tmp_restore_column_map(table->read_set, old_map);
        DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
      }
      se->add_insert_column(field_converters[i]->field->field_name, 0,
                            cass_data, cass_data_len);
    }
  }

  dbug_tmp_restore_column_map(table->read_set, old_map);

  bool res;

  if (doing_insert_batch)
  {
    /* Batched: only flush when enough rows have been accumulated */
    res= 0;
    if (++insert_rows_batched >= THDVAR(table->in_use, insert_batch_size))
    {
      res= se->do_insert();
      insert_rows_batched= 0;
    }
  }
  else
    res= se->do_insert();

  if (res)
    my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());

  DBUG_RETURN(res? HA_ERR_INTERNAL_ERROR: 0);
}


Sergey Petrunya's avatar
Sergey Petrunya committed
2036
/*
  Switch the handler into batched-insert mode and start with an empty
  insert buffer. If no connection exists yet and connecting fails, the
  handler is left in its current (non-batch) mode.
*/
void ha_cassandra::start_bulk_insert(ha_rows rows, uint flags)
{
  if (!se)
  {
    if (connect_and_check_options(table))
      return;
  }

  insert_rows_batched= 0;
  doing_insert_batch= true;

  se->clear_insert_buffer();
}


int ha_cassandra::end_bulk_insert()
{
  DBUG_ENTER("ha_cassandra::end_bulk_insert");
unknown's avatar
unknown committed
2052

2053 2054 2055 2056 2057 2058 2059 2060 2061
  /* Flush out the insert buffer */
  doing_insert_batch= false;
  bool bres= se->do_insert();
  se->clear_insert_buffer();

  DBUG_RETURN(bres? HA_ERR_INTERNAL_ERROR: 0);
}


2062 2063 2064
int ha_cassandra::rnd_init(bool scan)
{
  bool bres;
2065
  int ires;
2066
  DBUG_ENTER("ha_cassandra::rnd_init");
2067 2068 2069 2070

  if (!se && (ires= connect_and_check_options(table)))
    DBUG_RETURN(ires);

2071
  if (!scan)
2072 2073 2074 2075
  {
    /* Prepare for rnd_pos() calls. We don't need to anything. */
    DBUG_RETURN(0);
  }
2076

2077 2078 2079 2080 2081 2082 2083 2084 2085 2086
  if (dyncol_set)
  {
    se->clear_read_all_columns();
  }
  else
  {
    se->clear_read_columns();
    for (uint i= 1; i < table->s->fields; i++)
      se->add_read_column(table->field[i]->field_name);
  }
2087

2088
  se->read_batch_size= THDVAR(table->in_use, rnd_batch_size);
2089
  bres= se->get_range_slices(false);
2090 2091
  if (bres)
    my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
2092 2093 2094 2095 2096 2097 2098 2099 2100 2101 2102 2103 2104 2105 2106

  DBUG_RETURN(bres? HA_ERR_INTERNAL_ERROR: 0);
}


/*
  Finish a scan: tell the Cassandra interface that range-slice reading is
  over. Always returns 0.
*/
int ha_cassandra::rnd_end()
{
  DBUG_ENTER("ha_cassandra::rnd_end");

  se->finish_reading_range_slices();
  DBUG_RETURN(0);
}


/*
  Fetch the next row of the running range-slice scan and unpack it.

  @return 0 on success, HA_ERR_END_OF_FILE when the scan is exhausted,
          HA_ERR_INTERNAL_ERROR if fetching the row failed, or the error
          returned by read_cassandra_columns().
*/
int ha_cassandra::rnd_next(uchar *buf)
{
  DBUG_ENTER("ha_cassandra::rnd_next");

  bool at_eof;
  if (se->get_next_range_slice_row(&at_eof))
    DBUG_RETURN(HA_ERR_INTERNAL_ERROR);

  if (at_eof)
    DBUG_RETURN(HA_ERR_END_OF_FILE);

  /* Unpack the row, including the rowkey into the primary key field */
  DBUG_RETURN(read_cassandra_columns(true));
}
2127

2128 2129 2130 2131

int ha_cassandra::delete_all_rows()
{
  bool bres;
2132
  int ires;
2133 2134
  DBUG_ENTER("ha_cassandra::delete_all_rows");

2135 2136 2137
  if (!se && (ires= connect_and_check_options(table)))
    DBUG_RETURN(ires);

2138
  bres= se->truncate();
unknown's avatar
unknown committed
2139

2140 2141
  if (bres)
    my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
2142 2143 2144 2145 2146

  DBUG_RETURN(bres? HA_ERR_INTERNAL_ERROR: 0);
}


2147 2148
int ha_cassandra::delete_row(const uchar *buf)
{
2149
  bool bres;
2150
  DBUG_ENTER("ha_cassandra::delete_row");
unknown's avatar
unknown committed
2151

2152
  bres= se->remove_row();
unknown's avatar
unknown committed
2153

2154 2155
  if (bres)
    my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
unknown's avatar
unknown committed
2156

2157
  DBUG_RETURN(bres? HA_ERR_INTERNAL_ERROR: 0);
2158 2159 2160
}


2161 2162 2163
/*
  Fill in handler statistics.

  HA_STATUS_VARIABLE: sets hard-coded values (1000 records, 0 deleted).
  HA_STATUS_CONST:    sets ref_length to the key length of the first field.

  @return 0 on success, 1 if the handler has no table.
*/
int ha_cassandra::info(uint flag)
{
  DBUG_ENTER("ha_cassandra::info");

  /* Every exit after DBUG_ENTER must go through DBUG_RETURN */
  if (!table)
    DBUG_RETURN(1);

  if (flag & HA_STATUS_VARIABLE)
  {
    stats.records= 1000;
    stats.deleted= 0;
  }
  if (flag & HA_STATUS_CONST)
  {
    ref_length= table->field[0]->key_length();
  }

  DBUG_RETURN(0);
}


/* Forward declaration of key_copy(), which position() below calls */
void key_copy(uchar *to_key, uchar *from_record, KEY *key_info,
              uint key_length, bool with_zerofill);


void ha_cassandra::position(const uchar *record)
{
  DBUG_ENTER("ha_cassandra::position");
unknown's avatar
unknown committed
2189

2190 2191 2192 2193 2194 2195 2196 2197 2198 2199 2200 2201
  /* Copy the primary key to rowid */
  key_copy(ref, (uchar*)record, &table->key_info[0],
           table->field[0]->key_length(), true);

  DBUG_VOID_RETURN;
}


/*
  Read the row identified by a rowid saved by position(). This is done as
  an exact-match lookup on the primary key (index 0); active_index is
  switched for the duration of the lookup and then put back.

  @return the result of index_read_map().
*/
int ha_cassandra::rnd_pos(uchar *buf, uchar *pos)
{
  DBUG_ENTER("ha_cassandra::rnd_pos");

  int saved_index= active_index;
  active_index= 0; /* The primary key */

  int lookup_rc= index_read_map(buf, pos, key_part_map(1),
                                HA_READ_KEY_EXACT);

  active_index= saved_index;

  DBUG_RETURN(lookup_rc);
}

2212

2213
int ha_cassandra::reset()
2214
{
2215
  doing_insert_batch= false;
Sergey Petrunya's avatar
Sergey Petrunya committed
2216
  insert_lineno= 0;
2217 2218 2219 2220 2221
  if (se)
  {
    se->set_consistency_levels(THDVAR(table->in_use, read_consistency),
                               THDVAR(table->in_use, write_consistency));
  }
2222 2223
  return 0;
}
2224

2225 2226 2227 2228 2229 2230 2231 2232 2233 2234 2235
/////////////////////////////////////////////////////////////////////////////
// MRR implementation
/////////////////////////////////////////////////////////////////////////////


/*
 - The key can be only primary key
  - allow equality-ranges only.
  - anything else?
*/
ha_rows ha_cassandra::multi_range_read_info_const(uint keyno,
                                                  RANGE_SEQ_IF *seq,
                                                  void *seq_init_param,
                                                  uint n_ranges,
                                                  uint *bufsz,
                                                  uint *flags,
                                                  COST_VECT *cost)
{
  /* Const ranges are not supported so far: always report HA_POS_ERROR */
  return HA_POS_ERROR;
}


/*
  Report MRR capabilities: clears HA_MRR_USE_DEFAULT_IMPL, sets
  HA_MRR_NO_ASSOCIATION in *flags, and returns a fixed value of 10.
*/
ha_rows ha_cassandra::multi_range_read_info(uint keyno, uint n_ranges,
                                            uint keys, uint key_parts,
                                            uint *bufsz, uint *flags,
                                            COST_VECT *cost)
{
  /* Can only be equality lookups on the primary key... */
  // TODO anything else?
  *flags= (*flags & ~HA_MRR_USE_DEFAULT_IMPL) | HA_MRR_NO_ASSOCIATION;

  return 10;
}


/*
  Start a multi-range read: initialize the range sequence iterator, keep a
  copy of the sequence functions, and issue the first batch of lookups.

  @return 0 on success, HA_ERR_INTERNAL_ERROR if the first read failed.
*/
int ha_cassandra::multi_range_read_init(RANGE_SEQ_IF *seq, void *seq_init_param,
                          uint n_ranges, uint mode, HANDLER_BUFFER *buf)
{
  mrr_iter= seq->init(seq_init_param, n_ranges, mode);
  mrr_funcs= *seq;

  if (mrr_start_read())
    return HA_ERR_INTERNAL_ERROR;
  return 0;
}


bool ha_cassandra::mrr_start_read()
{
  uint key_len;

  my_bitmap_map *old_map;
  old_map= dbug_tmp_use_all_columns(table, table->read_set);
unknown's avatar
unknown committed
2275

2276 2277 2278 2279 2280 2281
  se->new_lookup_keys();

  while (!(source_exhausted= mrr_funcs.next(mrr_iter, &mrr_cur_range)))
  {
    char *cass_key;
    int cass_key_len;
unknown's avatar
unknown committed
2282

2283 2284 2285 2286 2287 2288 2289 2290
    DBUG_ASSERT(mrr_cur_range.range_flag & EQ_RANGE);

    uchar *key= (uchar*)mrr_cur_range.start_key.key;
    key_len= mrr_cur_range.start_key.length;
    //key_len= calculate_key_len(table, active_index, key, keypart_map); // NEED THIS??
    store_key_image_to_rec(table->field[0], (uchar*)key, key_len);

    rowkey_converter->mariadb_to_cassandra(&cass_key, &cass_key_len);
unknown's avatar
unknown committed
2291

2292
    // Primitive buffer control
unknown's avatar
unknown committed
2293
    if (se->add_lookup_key(cass_key, cass_key_len) >
2294 2295 2296 2297 2298 2299 2300 2301 2302 2303 2304 2305 2306 2307 2308 2309 2310
        THDVAR(table->in_use, multiget_batch_size))
      break;
  }

  dbug_tmp_restore_column_map(table->read_set, old_map);

  return se->multiget_slice();
}


/*
  Return the next row of a multi-range read.

  @return the result of read_cassandra_columns() when a row is available,
          HA_ERR_END_OF_FILE when both the current batch and the range
          sequence are exhausted, HA_ERR_INTERNAL_ERROR if reading the next
          batch failed.
*/
int ha_cassandra::multi_range_read_next(range_id_t *range_info)
{
  for (;;)
  {
    /* A zero return means the current batch produced a row */
    if (!se->get_next_multiget_row())
      return read_cassandra_columns(true);

    /* Current batch is used up; stop if there are no more ranges */
    if (source_exhausted)
      return HA_ERR_END_OF_FILE;

    /* Refill the lookup buffer and do another read */
    if (mrr_start_read())
      return HA_ERR_INTERNAL_ERROR;

    /*
      We get here if we've refilled the buffer and done another read. Try
      reading from results again
    */
  }
}


/*
  Provide the text describing the MRR strategy for EXPLAIN.

  @param mrr_mode  MRR mode flags
  @param str       output buffer
  @param size      size of the output buffer in bytes

  @return number of bytes written to str (no terminating zero is added),
          or 0 if the default MRR implementation is in use.
*/
int ha_cassandra::multi_range_read_explain_info(uint mrr_mode, char *str, size_t size)
{
  const char *mrr_str= "multiget_slice";

  if (mrr_mode & HA_MRR_USE_DEFAULT_IMPL)
    return 0;

  size_t mrr_str_len= strlen(mrr_str);
  size_t copy_len= mrr_str_len < size ? mrr_str_len : size;
  /* Copy only copy_len bytes: copying 'size' would read past mrr_str */
  memcpy(str, mrr_str, copy_len);
  return (int) copy_len;
}

2353

Sergey Petrunya's avatar
Sergey Petrunya committed
2354
class Column_name_enumerator_impl : public Column_name_enumerator
2355
{
Sergey Petrunya's avatar
Sergey Petrunya committed
2356 2357 2358 2359 2360 2361 2362 2363 2364 2365 2366 2367 2368
  ha_cassandra *obj;
  uint idx;
public:
  Column_name_enumerator_impl(ha_cassandra *obj_arg) : obj(obj_arg), idx(1) {}
  const char* get_next_name()
  {
    if (idx == obj->table->s->fields)
      return NULL;
    else
      return obj->table->field[idx++]->field_name;
  }
};

2369

Sergey Petrunya's avatar
Sergey Petrunya committed
2370 2371
int ha_cassandra::update_row(const uchar *old_data, uchar *new_data)
{
2372 2373 2374
  DYNAMIC_COLUMN_VALUE *oldvals, *vals;
  LEX_STRING *oldnames, *names;
  uint oldcount, count;
2375
  String oldvalcol, valcol;
Sergey Petrunya's avatar
Sergey Petrunya committed
2376
  my_bitmap_map *old_map;
2377
  int res;
2378
  DBUG_ENTER("ha_cassandra::update_row");
Sergey Petrunya's avatar
Sergey Petrunya committed
2379
  /* Currently, it is guaranteed that new_data == table->record[0] */
2380
  DBUG_ASSERT(new_data == table->record[0]);
Sergey Petrunya's avatar
Sergey Petrunya committed
2381 2382 2383 2384 2385 2386 2387 2388 2389 2390 2391 2392 2393 2394 2395 2396 2397
  /* For now, just rewrite the full record */
  se->clear_insert_buffer();

  old_map= dbug_tmp_use_all_columns(table, table->read_set);

  char *old_key;
  int old_key_len;
  se->get_read_rowkey(&old_key, &old_key_len);

  /* Get the key we're going to write */
  char *new_key;
  int new_key_len;
  if (rowkey_converter->mariadb_to_cassandra(&new_key, &new_key_len))
  {
    my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
             rowkey_converter->field->field_name, insert_lineno);
    dbug_tmp_restore_column_map(table->read_set, old_map);
unknown's avatar
unknown committed
2398
    DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
Sergey Petrunya's avatar
Sergey Petrunya committed
2399 2400 2401
  }

  /*
unknown's avatar
unknown committed
2402
    Compare it to the key we've read. For all types that Cassandra supports,
Sergey Petrunya's avatar
Sergey Petrunya committed
2403 2404 2405 2406 2407 2408 2409 2410
    binary byte-wise comparison can be used
  */
  bool new_primary_key;
  if (new_key_len != old_key_len || memcmp(old_key, new_key, new_key_len))
    new_primary_key= true;
  else
    new_primary_key= false;

2411 2412 2413 2414 2415 2416 2417
  if (dyncol_set)
  {
    Field *field= table->field[dyncol_field];
    /* move to get old_data */
    my_ptrdiff_t diff;
    diff= (my_ptrdiff_t) (old_data - new_data);
    field->move_field_offset(diff);      // Points now at old_data
2418
    if ((res= read_dyncol(&oldcount, &oldvals, &oldnames, &oldvalcol)))
2419 2420
      DBUG_RETURN(res);
    field->move_field_offset(-diff);     // back to new_data
2421
    if ((res= read_dyncol(&count, &vals, &names, &valcol)))
2422
    {
2423
      free_dynamic_row(&oldvals, &oldnames);
2424 2425 2426
      DBUG_RETURN(res);
    }
  }
Sergey Petrunya's avatar
Sergey Petrunya committed
2427 2428 2429

  if (new_primary_key)
  {
unknown's avatar
unknown committed
2430 2431
    /*
      Primary key value changed. This is essentially a DELETE + INSERT.
Sergey Petrunya's avatar
Sergey Petrunya committed
2432 2433 2434
      Add a DELETE operation into the batch
    */
    Column_name_enumerator_impl name_enumerator(this);
2435
    se->add_row_deletion(old_key, old_key_len, &name_enumerator,
2436 2437 2438
                         oldnames,
                         (dyncol_set ? oldcount : 0));
    oldcount= 0; // they will be deleted
Sergey Petrunya's avatar
Sergey Petrunya committed
2439 2440 2441 2442 2443 2444 2445 2446 2447
  }

  se->start_row_insert(new_key, new_key_len);

  /* Convert other fields */
  for (uint i= 1; i < table->s->fields; i++)
  {
    char *cass_data;
    int cass_data_len;
2448
    if (dyncol_set && dyncol_field == i)
Sergey Petrunya's avatar
Sergey Petrunya committed
2449
    {
2450
      DBUG_ASSERT(field_converters[i] == NULL);
2451
      if ((res= write_dynamic_row(count, vals, names)))
2452 2453 2454 2455 2456 2457 2458 2459 2460
        goto err;
    }
    else
    {
      if (field_converters[i]->mariadb_to_cassandra(&cass_data, &cass_data_len))
      {
        my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
                 field_converters[i]->field->field_name, insert_lineno);
        dbug_tmp_restore_column_map(table->read_set, old_map);
unknown's avatar
unknown committed
2461
        DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
2462 2463 2464 2465 2466 2467 2468 2469 2470 2471
      }
      se->add_insert_column(field_converters[i]->field->field_name, 0,
                            cass_data, cass_data_len);
    }
  }
  if (dyncol_set)
  {
    /* find removed fields */
    uint i= 0, j= 0;
    /* both array are sorted */
2472
    for(; i < oldcount; i++)
2473 2474
    {
      int scmp= 0;
2475 2476 2477
      while (j < count &&
             (scmp = mariadb_dyncol_column_cmp_named(names + j,
                                                     oldnames + i)) < 0)
2478
        j++;
2479
      if (j < count &&
2480 2481 2482
          scmp == 0)
        j++;
      else
2483
        se->add_insert_delete_column(oldnames[i].str, oldnames[i].length);
Sergey Petrunya's avatar
Sergey Petrunya committed
2484 2485
    }
  }
2486

Sergey Petrunya's avatar
Sergey Petrunya committed
2487
  dbug_tmp_restore_column_map(table->read_set, old_map);
2488 2489

  res= se->do_insert();
Sergey Petrunya's avatar
Sergey Petrunya committed
2490 2491 2492

  if (res)
    my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
2493 2494 2495 2496

err:
  if (dyncol_set)
  {
2497 2498
    free_dynamic_row(&oldvals, &oldnames);
    free_dynamic_row(&vals, &names);
2499 2500
  }

Sergey Petrunya's avatar
Sergey Petrunya committed
2501
  DBUG_RETURN(res? HA_ERR_INTERNAL_ERROR: 0);
2502 2503 2504
}


2505 2506 2507 2508 2509 2510 2511 2512 2513 2514
/*
  We can't really have any locks for Cassandra Storage Engine. We're reading
  from Cassandra cluster, and other clients can asynchronously modify the data.
  
  We can enforce locking within this process, but this will not be useful. 
 
  Thus, store_lock() should express that:
  - Writes do not block other writes
  - Reads should not block anything either, including INSERTs.
*/
2515
THR_LOCK_DATA **ha_cassandra::store_lock(THD *thd,
                                         THR_LOCK_DATA **to,
                                         enum thr_lock_type lock_type)
{
  DBUG_ENTER("ha_cassandra::store_lock");

  /* Only pick a lock type if one is requested and none is set yet */
  const bool choose_lock= (lock_type != TL_IGNORE &&
                           lock.type == TL_UNLOCK);
  if (choose_lock)
  {
    /* Writes allow other writes */
    if (lock_type >= TL_WRITE_CONCURRENT_INSERT && lock_type <= TL_WRITE)
      lock_type= TL_WRITE_ALLOW_WRITE;

    /* Reads allow everything, including INSERTs */
    if (lock_type == TL_READ_NO_INSERT)
      lock_type= TL_READ;

    lock.type= lock_type;
  }

  /* Append our lock to the caller's array and return the next free slot */
  *to= &lock;
  DBUG_RETURN(to + 1);
}


2538 2539
/* Always returns HA_POS_ERROR: range scans are not supported */
ha_rows ha_cassandra::records_in_range(uint inx,
                                       key_range *min_key,
                                       key_range *max_key)
{
  DBUG_ENTER("ha_cassandra::records_in_range");
  DBUG_RETURN(HA_POS_ERROR);
}

2545

2546 2547 2548 2549 2550 2551 2552 2553 2554 2555 2556 2557 2558 2559 2560
/**
  check_if_incompatible_data() called if ALTER TABLE can't detect otherwise
  if new and old definition are compatible

  @details If there are no other explicit signs like changed number of
  fields this function will be called by compare_tables()
  (sql/sql_tables.cc) to decide should we rewrite whole table or only .frm
  file.

*/

bool ha_cassandra::check_if_incompatible_data(HA_CREATE_INFO *info,
                                              uint table_changes)
{
  DBUG_ENTER("ha_cassandra::check_if_incompatible_data");
  /*
    Checked: this is intentionally empty for Cassandra SE, definitions are
    always reported as compatible.
  */
  DBUG_RETURN(COMPATIBLE_DATA_YES);
}


2566 2567 2568 2569 2570 2571 2572 2573 2574
/*
  Format an error message, printf-style, into err_buffer.

  @param format  format string, followed by its arguments
*/
void Cassandra_se_interface::print_error(const char *format, ...)
{
  va_list ap;
  va_start(ap, format);
  // it's not a problem if output was truncated
  my_vsnprintf(err_buffer, sizeof(err_buffer), format, ap);
  va_end(ap);
}

2575 2576 2577 2578

/* Storage engine descriptor referenced by the plugin declaration below */
struct st_mysql_storage_engine cassandra_storage_engine=
{ MYSQL_HANDLERTON_INTERFACE_VERSION };

2579 2580 2581 2582 2583 2584 2585 2586 2587 2588 2589 2590 2591 2592 2593 2594 2595 2596
/* Status variables exposing the counters kept in cassandra_counters */
static SHOW_VAR cassandra_status_variables[]= {
  /* Inserts */
  {"Cassandra_row_inserts",
   (char*) &cassandra_counters.row_inserts, SHOW_LONG},
  {"Cassandra_row_insert_batches",
   (char*) &cassandra_counters.row_insert_batches, SHOW_LONG},

  /* Multiget reads */
  {"Cassandra_multiget_keys_scanned",
   (char*) &cassandra_counters.multiget_keys_scanned, SHOW_LONG},
  {"Cassandra_multiget_reads",
   (char*) &cassandra_counters.multiget_reads, SHOW_LONG},
  {"Cassandra_multiget_rows_read",
   (char*) &cassandra_counters.multiget_rows_read, SHOW_LONG},

  /* Exceptions */
  {"Cassandra_timeout_exceptions",
   (char*) &cassandra_counters.timeout_exceptions, SHOW_LONG},
  {"Cassandra_unavailable_exceptions",
   (char*) &cassandra_counters.unavailable_exceptions, SHOW_LONG},

  /* Last entry: null name and value */
  {NullS, NullS, SHOW_LONG}
};

2599

2600 2601


2602 2603 2604 2605 2606 2607 2608 2609 2610 2611
maria_declare_plugin(cassandra)
{
  MYSQL_STORAGE_ENGINE_PLUGIN,
  &cassandra_storage_engine,
  "CASSANDRA",
  "Monty Program Ab",
  "Cassandra storage engine",
  PLUGIN_LICENSE_GPL,
  cassandra_init_func,                   /* Plugin Init */
  cassandra_done_func,                   /* Plugin Deinit */
  0x0001,                                /* version number (0.1) */
  cassandra_status_variables,            /* status variables */
  cassandra_system_variables,            /* system variables */
  "0.1",                                 /* string version */
  MariaDB_PLUGIN_MATURITY_EXPERIMENTAL   /* maturity */
}
maria_declare_plugin_end;