Commit 34ab0c05 authored by marko's avatar marko

branches/zip: Implement the reporting of duplicate key values to MySQL.

innobase_rec_to_mysql(): New function, for converting an InnoDB clustered
index record to MySQL table->record[0].  TODO: convert integer fields.
Currently, integer fields are in big-endian byte order instead of
host byte order, and signed integer fields are offset by 0x80000000.

innobase_rec_reset(): New function, for resetting table->record[0].

row_merge_build_indexes(): Add the parameter TABLE* table (the MySQL table
handle) for reporting duplicate key values.

dtuple_from_fields(): New function, to convert an array of dfield_t* to
dtuple_t.

dtuple_get_n_ext(): New function, to compute the number of externally stored
fields.

row_merge_dup_t: Structure for counting and reporting duplicate records.

row_merge_dup_report(): Function for counting and reporting duplicate records.

row_merge_tuple_cmp(), row_merge_tuple_sort(): Replace the ulint* n_dup
parameter with row_merge_dup_t* dup.

row_merge_buf_sort(): Add the parameter row_merge_dup_t* dup, which is
NULL when sorting a non-unique index.

row_merge_buf_write(), row_merge_heap_create(), row_merge_read_rec(),
row_merge_cmp(), row_merge_read_clustered_index(), row_merge_blocks(),
row_merge(), row_merge_sort(): Add const qualifiers.

row_merge_read_clustered_index(): Use a common error handling branch err_exit.
Invoke row_merge_buf_sort() differently on unique indexes.

row_merge_blocks(): note TODO: We could invoke innobase_rec_to_mysql()
to report duplicate key values when creating a clustered index.
parent f44bec19
...@@ -19,6 +19,80 @@ extern "C" { ...@@ -19,6 +19,80 @@ extern "C" {
#include "handler0alter.h" #include "handler0alter.h"
} }
/*****************************************************************
Copies an InnoDB clustered index record to table->record[0]. */
extern "C"
void
innobase_rec_to_mysql(
/*==================*/
TABLE* table, /* in/out: MySQL table */
const rec_t* rec, /* in: record */
const dict_index_t* index, /* in: clustered index */
const ulint* offsets) /* in: rec_get_offsets(
rec, index, ...) */
{
uint n_fields = table->s->fields;
uint i;
ut_ad(dict_index_is_clust(index));
ut_ad(rec_offs_validate(rec, index, offsets));
ut_ad(n_fields == dict_table_get_n_user_cols(index->table));
for (i = 0; i < n_fields; i++) {
Field* field = table->field[i];
void* ptr = field->ptr;
uint32 flen = field->pack_length();
ulint ipos;
ulint ilen;
const void* ifield;
ipos = dict_index_get_nth_col_pos(index, i);
ut_ad(ipos != ULINT_UNDEFINED);
if (UNIV_UNLIKELY(ipos == ULINT_UNDEFINED)) {
reset_field:
field->reset();
field->set_null();
continue;
}
ifield = rec_get_nth_field(rec, offsets, ipos, &ilen);
/* Assign the NULL flag */
if (ilen == UNIV_SQL_NULL) {
ut_ad(field->real_maybe_null());
goto reset_field;
} else {
field->set_notnull();
/* Copy the data. */
/* TODO: convert integer fields */
if (ilen >= flen) {
memcpy(ptr, ifield, flen);
} else {
field->reset();
memcpy(ptr, ifield, ilen);
}
}
}
}
/*****************************************************************
Resets table->record[0]. */
extern "C"
void
innobase_rec_reset(
/*===============*/
TABLE* table) /* in/out: MySQL table */
{
uint n_fields = table->s->fields;
uint i;
for (i = 0; i < n_fields; i++) {
table->field[i]->set_default();
}
}
/********************************************************************** /**********************************************************************
Removes the filename encoding of a database and table name. */ Removes the filename encoding of a database and table name. */
static static
...@@ -638,7 +712,7 @@ ha_innobase::add_index( ...@@ -638,7 +712,7 @@ ha_innobase::add_index(
/* Read the clustered index of the table and build indexes /* Read the clustered index of the table and build indexes
based on this information using temporary files and merge sort. */ based on this information using temporary files and merge sort. */
error = row_merge_build_indexes(trx, innodb_table, indexed_table, error = row_merge_build_indexes(trx, innodb_table, indexed_table,
index, num_of_idx); index, num_of_idx, table);
error_handling: error_handling:
#ifdef UNIV_DEBUG #ifdef UNIV_DEBUG
......
...@@ -213,6 +213,18 @@ dtuple_create( ...@@ -213,6 +213,18 @@ dtuple_create(
is created */ is created */
ulint n_fields); /* in: number of fields */ ulint n_fields); /* in: number of fields */
/**************************************************************
Wrap data fields in a tuple. The default value for number
of fields used in record comparisons for this tuple is n_fields. */
UNIV_INLINE
const dtuple_t*
dtuple_from_fields(
/*===============*/
/* out: data tuple */
dtuple_t* tuple, /* in: storage for data tuple */
const dfield_t* fields, /* in: fields */
ulint n_fields); /* in: number of fields */
/************************************************************************* /*************************************************************************
Creates a dtuple for use in MySQL. */ Creates a dtuple for use in MySQL. */
...@@ -247,6 +259,14 @@ dtuple_get_data_size( ...@@ -247,6 +259,14 @@ dtuple_get_data_size(
/*=================*/ /*=================*/
/* out: sum of data lens */ /* out: sum of data lens */
const dtuple_t* tuple); /* in: typed data tuple */ const dtuple_t* tuple); /* in: typed data tuple */
/*************************************************************************
Computes the number of externally stored fields in a data tuple. */
UNIV_INLINE
ulint
dtuple_get_n_ext(
/*=============*/
/* out: number of fields */
const dtuple_t* tuple); /* in: tuple */
/**************************************************************** /****************************************************************
Compare two data tuples, respecting the collation of character fields. */ Compare two data tuples, respecting the collation of character fields. */
......
...@@ -349,6 +349,26 @@ dtuple_create( ...@@ -349,6 +349,26 @@ dtuple_create(
return(tuple); return(tuple);
} }
/**************************************************************
Wrap data fields in a tuple. The default value for number
of fields used in record comparisons for this tuple is n_fields. */
UNIV_INLINE
const dtuple_t*
dtuple_from_fields(
/*===============*/
/* out: data tuple */
dtuple_t* tuple, /* in: storage for data tuple */
const dfield_t* fields, /* in: fields */
ulint n_fields) /* in: number of fields */
{
tuple->info_bits = 0;
tuple->n_fields = tuple->n_fields_cmp = n_fields;
tuple->fields = (dfield_t*) fields;
ut_d(tuple->magic_n = DATA_TUPLE_MAGIC_N);
return(tuple);
}
/************************************************************** /**************************************************************
The following function returns the sum of data lengths of a tuple. The space The following function returns the sum of data lengths of a tuple. The space
occupied by the field structs or the tuple struct is not counted. Neither occupied by the field structs or the tuple struct is not counted. Neither
...@@ -386,6 +406,30 @@ dtuple_get_data_size( ...@@ -386,6 +406,30 @@ dtuple_get_data_size(
return(sum); return(sum);
} }
/*************************************************************************
Computes the number of externally stored fields in a data tuple. */
UNIV_INLINE
ulint
dtuple_get_n_ext(
/*=============*/
/* out: number of externally stored fields */
const dtuple_t* tuple) /* in: tuple */
{
ulint n_ext = 0;
ulint n_fields = tuple->n_fields;
ulint i;
ut_ad(tuple);
ut_ad(dtuple_check_typed(tuple));
ut_ad(tuple->magic_n == DATA_TUPLE_MAGIC_N);
for (i = 0; i < n_fields; i++) {
n_ext += dtuple_get_nth_field(tuple, i)->ext;
}
return(n_ext);
}
/*********************************************************************** /***********************************************************************
Sets types of fields binary in a tuple. */ Sets types of fields binary in a tuple. */
UNIV_INLINE UNIV_INLINE
......
...@@ -3,3 +3,23 @@ Smart ALTER TABLE ...@@ -3,3 +3,23 @@ Smart ALTER TABLE
(c) 2005-2007 Innobase Oy (c) 2005-2007 Innobase Oy
*******************************************************/ *******************************************************/
/*****************************************************************
Copies an InnoDB clustered index record to table->record[0]. */
void
innobase_rec_to_mysql(
/*==================*/
TABLE* table, /* in/out: MySQL table */
const rec_t* rec, /* in: record */
const dict_index_t* index, /* in: clustered index */
const ulint* offsets); /* in: rec_get_offsets(
rec, index, ...) */
/*****************************************************************
Resets table->record[0]. */
void
innobase_rec_reset(
/*===============*/
TABLE* table); /* in/out: MySQL table */
...@@ -165,5 +165,8 @@ row_merge_build_indexes( ...@@ -165,5 +165,8 @@ row_merge_build_indexes(
created; identical to old_table created; identical to old_table
unless creating a PRIMARY KEY */ unless creating a PRIMARY KEY */
dict_index_t** indexes, /* in: indexes to be created */ dict_index_t** indexes, /* in: indexes to be created */
ulint n_indexes); /* in: size of indexes[] */ ulint n_indexes, /* in: size of indexes[] */
TABLE* table); /* in/out: MySQL table, for
reporting erroneous key value
if applicable */
#endif /* row0merge.h */ #endif /* row0merge.h */
...@@ -38,4 +38,7 @@ typedef struct row_ext_struct row_ext_t; ...@@ -38,4 +38,7 @@ typedef struct row_ext_struct row_ext_t;
typedef struct row_prebuilt_struct row_prebuilt_t; typedef struct row_prebuilt_struct row_prebuilt_t;
/* MySQL data types */
typedef struct st_table TABLE;
#endif #endif
...@@ -38,6 +38,7 @@ Completed by Sunny Bains and Marko Makela ...@@ -38,6 +38,7 @@ Completed by Sunny Bains and Marko Makela
#include "mem0mem.h" #include "mem0mem.h"
#include "log0log.h" #include "log0log.h"
#include "ut0sort.h" #include "ut0sort.h"
#include "handler0alter.h"
#ifdef UNIV_DEBUG #ifdef UNIV_DEBUG
/* Set these in order ot enable debug printout. */ /* Set these in order ot enable debug printout. */
...@@ -221,13 +222,13 @@ row_merge_buf_add( ...@@ -221,13 +222,13 @@ row_merge_buf_add(
row_ext_t* ext) /* in/out: cache of externally stored row_ext_t* ext) /* in/out: cache of externally stored
column prefixes, or NULL */ column prefixes, or NULL */
{ {
ulint i; ulint i;
ulint n_fields; ulint n_fields;
ulint data_size; ulint data_size;
ulint extra_size; ulint extra_size;
dict_index_t* index; const dict_index_t* index;
dfield_t* entry; dfield_t* entry;
dfield_t* field; dfield_t* field;
if (buf->n_tuples >= buf->max_tuples) { if (buf->n_tuples >= buf->max_tuples) {
return(FALSE); return(FALSE);
...@@ -247,7 +248,7 @@ row_merge_buf_add( ...@@ -247,7 +248,7 @@ row_merge_buf_add(
extra_size = UT_BITS_IN_BYTES(index->n_nullable); extra_size = UT_BITS_IN_BYTES(index->n_nullable);
for (i = 0; i < n_fields; i++, field++) { for (i = 0; i < n_fields; i++, field++) {
dict_field_t* ifield; const dict_field_t* ifield;
const dict_col_t* col; const dict_col_t* col;
ulint col_no; ulint col_no;
const dfield_t* row_field; const dfield_t* row_field;
...@@ -359,6 +360,137 @@ row_merge_buf_add( ...@@ -359,6 +360,137 @@ row_merge_buf_add(
return(TRUE); return(TRUE);
} }
/* Structure for reporting duplicate records. */
struct row_merge_dup_struct {
const dict_index_t* index; /* index being sorted */
const dict_table_t* old_table; /* original table */
TABLE* table; /* MySQL table object */
ulint n_dup; /* number of duplicates */
};
typedef struct row_merge_dup_struct row_merge_dup_t;
/*****************************************************************
Report a duplicate key. */
static
void
row_merge_dup_report(
/*=================*/
row_merge_dup_t* dup, /* in/out: for reporting duplicates */
const dfield_t* entry) /* in: duplicate index entry */
{
/* Buffer for converting the record */
byte buf[UNIV_PAGE_SIZE / 2];
const dtuple_t* tuple;
dtuple_t tuple_store;
const rec_t* clust_rec;
const dict_index_t* index = dup->index;
const dict_table_t* table = index->table;
ulint n_fields= dict_index_get_n_fields(index);
mem_heap_t* heap = NULL;
ulint offsets_[REC_OFFS_NORMAL_SIZE];
ulint* offsets;
if (dup->n_dup++) {
/* Only report the first duplicate record,
but count all duplicate records. */
return;
}
*offsets_ = (sizeof offsets_) / sizeof *offsets_;
if (table != dup->old_table) {
/* A new clustered index is being created. */
if (dict_index_is_clust(index)) {
/* Convert the clustered index record
to MySQL format. */
ulint n_ext;
tuple = dtuple_from_fields(&tuple_store,
entry, n_fields);
n_ext = dtuple_get_n_ext(tuple);
clust_rec = rec_convert_dtuple_to_rec(buf, index,
tuple, n_ext);
offsets = rec_get_offsets(clust_rec, index, offsets_,
ULINT_UNDEFINED, &heap);
innobase_rec_to_mysql(dup->table, clust_rec,
index, offsets);
} else {
/* We cannot fetch the MySQL record
corresponding to a secondary index record when
also creating the clustered index. Consider a
table t (a,b,c,d) that lacks a primary key.
Consider the operation ALTER TABLE t ADD
PRIMARY KEY (a,b), ADD UNIQUE KEY (c).
The original clustered index record is
(DB_ROW_ID,DB_TRX_ID,DB_ROLL_PTR,a,b,c,d),
the new clustered index record is
(a,b,DB_TRX_ID,DB_ROLL_PTR,c,d), and the
unique secondary index record is (c,a,b).
Because the new records do not contain
DB_ROW_ID and the new clustered index B-tree
has not been created when the UNIQUE KEY (c)
violation is detected, it is impossible to
fetch the clustered index record without an
expensive table scan. */
innobase_rec_reset(dup->table);
}
} else {
const dict_index_t* clust_index
= dict_table_get_first_index(dup->old_table);
btr_pcur_t pcur;
mtr_t mtr;
ulint n_uniq;
ut_ad(!dict_index_is_clust(index));
ut_ad(dict_index_is_clust(clust_index));
/* Build a search tuple for the clustered index record. */
n_uniq = dict_index_get_n_unique(clust_index);
ut_a(n_uniq < n_fields);
tuple = dtuple_from_fields(&tuple_store,
entry + n_fields - n_uniq, n_uniq);
/* Fetch the clustered index record. */
mtr_start(&mtr);
if (row_search_on_row_ref(&pcur, BTR_SEARCH_LEAF,
table, tuple, &mtr)) {
ut_ad(clust_index == btr_cur_get_index(
btr_pcur_get_btr_cur(&pcur)));
clust_rec = btr_pcur_get_rec(&pcur);
offsets = rec_get_offsets(clust_rec, clust_index,
offsets_, ULINT_UNDEFINED,
&heap);
innobase_rec_to_mysql(dup->table, clust_rec,
clust_index, offsets);
} else {
/* The clustered index record was not found.
This should never happen, but we ignore this
error unless UNIV_DEBUG is defined. */
ut_ad(0);
innobase_rec_reset(dup->table);
}
btr_pcur_close(&pcur);
mtr_commit(&mtr);
}
if (UNIV_LIKELY_NULL(heap)) {
mem_heap_free(heap);
}
}
/***************************************************************** /*****************************************************************
Compare two tuples. */ Compare two tuples. */
static static
...@@ -368,18 +500,19 @@ row_merge_tuple_cmp( ...@@ -368,18 +500,19 @@ row_merge_tuple_cmp(
/* out: 1, 0, -1 if a is greater, /* out: 1, 0, -1 if a is greater,
equal, less, respectively, than b */ equal, less, respectively, than b */
ulint n_field,/* in: number of fields */ ulint n_field,/* in: number of fields */
ulint* n_dup, /* in/out: number of duplicates */
const dfield_t* a, /* in: first tuple to be compared */ const dfield_t* a, /* in: first tuple to be compared */
const dfield_t* b) /* in: second tuple to be compared */ const dfield_t* b, /* in: second tuple to be compared */
row_merge_dup_t* dup) /* in/out: for reporting duplicates */
{ {
int cmp; int cmp;
const dfield_t* field = a;
do { do {
cmp = cmp_dfield_dfield(a++, b++); cmp = cmp_dfield_dfield(a++, b++);
} while (!cmp && --n_field); } while (!cmp && --n_field);
if (!cmp) { if (UNIV_UNLIKELY(!cmp) && UNIV_LIKELY_NULL(dup)) {
(*n_dup)++; row_merge_dup_report(dup, field);
} }
return(cmp); return(cmp);
...@@ -392,7 +525,7 @@ void ...@@ -392,7 +525,7 @@ void
row_merge_tuple_sort( row_merge_tuple_sort(
/*=================*/ /*=================*/
ulint n_field,/* in: number of fields */ ulint n_field,/* in: number of fields */
ulint* n_dup, /* in/out: number of duplicates */ row_merge_dup_t* dup, /* in/out: for reporting duplicates */
const dfield_t** tuples, /* in/out: tuples */ const dfield_t** tuples, /* in/out: tuples */
const dfield_t** aux, /* in/out: work area */ const dfield_t** aux, /* in/out: work area */
ulint low, /* in: lower bound of the ulint low, /* in: lower bound of the
...@@ -401,8 +534,8 @@ row_merge_tuple_sort( ...@@ -401,8 +534,8 @@ row_merge_tuple_sort(
sorting area, exclusive */ sorting area, exclusive */
{ {
#define row_merge_tuple_sort_ctx(a,b,c,d) \ #define row_merge_tuple_sort_ctx(a,b,c,d) \
row_merge_tuple_sort(n_field, n_dup, a, b, c, d) row_merge_tuple_sort(n_field, dup, a, b, c, d)
#define row_merge_tuple_cmp_ctx(a,b) row_merge_tuple_cmp(n_field, n_dup, a, b) #define row_merge_tuple_cmp_ctx(a,b) row_merge_tuple_cmp(n_field, a, b, dup)
UT_SORT_FUNCTION_BODY(row_merge_tuple_sort_ctx, UT_SORT_FUNCTION_BODY(row_merge_tuple_sort_ctx,
tuples, aux, low, high, row_merge_tuple_cmp_ctx); tuples, aux, low, high, row_merge_tuple_cmp_ctx);
...@@ -411,19 +544,14 @@ row_merge_tuple_sort( ...@@ -411,19 +544,14 @@ row_merge_tuple_sort(
/********************************************************** /**********************************************************
Sort a buffer. */ Sort a buffer. */
static static
ulint void
row_merge_buf_sort( row_merge_buf_sort(
/*===============*/ /*===============*/
/* out: number of duplicates row_merge_buf_t* buf, /* in/out: sort buffer */
encountered */ row_merge_dup_t* dup) /* in/out: for reporting duplicates */
row_merge_buf_t* buf) /* in/out: sort buffer */
{ {
ulint n_dup = 0; row_merge_tuple_sort(dict_index_get_n_unique(buf->index), dup,
row_merge_tuple_sort(dict_index_get_n_unique(buf->index), &n_dup,
buf->tuples, buf->tmp_tuples, 0, buf->n_tuples); buf->tuples, buf->tmp_tuples, 0, buf->n_tuples);
return(n_dup);
} }
/********************************************************** /**********************************************************
...@@ -441,9 +569,9 @@ row_merge_buf_write( ...@@ -441,9 +569,9 @@ row_merge_buf_write(
# define row_merge_buf_write(buf, of, block) row_merge_buf_write(buf, block) # define row_merge_buf_write(buf, of, block) row_merge_buf_write(buf, block)
#endif /* !UNIV_DEBUG */ #endif /* !UNIV_DEBUG */
{ {
dict_index_t* index = buf->index; const dict_index_t* index = buf->index;
ulint n_fields= dict_index_get_n_fields(index); ulint n_fields= dict_index_get_n_fields(index);
byte* b = &(*block)[0]; byte* b = &(*block)[0];
ulint i; ulint i;
...@@ -452,7 +580,7 @@ row_merge_buf_write( ...@@ -452,7 +580,7 @@ row_merge_buf_write(
ulint extra_size; ulint extra_size;
const dfield_t* entry = buf->tuples[i]; const dfield_t* entry = buf->tuples[i];
size = rec_get_converted_size_comp(buf->index, size = rec_get_converted_size_comp(index,
REC_STATUS_ORDINARY, REC_STATUS_ORDINARY,
entry, n_fields, entry, n_fields,
&extra_size); &extra_size);
...@@ -511,10 +639,10 @@ static ...@@ -511,10 +639,10 @@ static
mem_heap_t* mem_heap_t*
row_merge_heap_create( row_merge_heap_create(
/*==================*/ /*==================*/
/* out: memory heap */ /* out: memory heap */
dict_index_t* index, /* in: record descriptor */ const dict_index_t* index, /* in: record descriptor */
ulint** offsets1, /* out: offsets */ ulint** offsets1, /* out: offsets */
ulint** offsets2) /* out: offsets */ ulint** offsets2) /* out: offsets */
{ {
ulint i = 1 + REC_OFFS_HEADER_SIZE ulint i = 1 + REC_OFFS_HEADER_SIZE
+ dict_index_get_n_fields(index); + dict_index_get_n_fields(index);
...@@ -620,7 +748,7 @@ row_merge_read_rec( ...@@ -620,7 +748,7 @@ row_merge_read_rec(
row_merge_block_t* block, /* in/out: file buffer */ row_merge_block_t* block, /* in/out: file buffer */
mrec_buf_t* buf, /* in/out: secondary buffer */ mrec_buf_t* buf, /* in/out: secondary buffer */
const byte* b, /* in: pointer to record */ const byte* b, /* in: pointer to record */
dict_index_t* index, /* in: index of the record */ const dict_index_t* index, /* in: index of the record */
int fd, /* in: file descriptor */ int fd, /* in: file descriptor */
ulint* foffs, /* in/out: file offset */ ulint* foffs, /* in/out: file offset */
const mrec_t** mrec, /* out: pointer to merge record, const mrec_t** mrec, /* out: pointer to merge record,
...@@ -930,16 +1058,16 @@ static ...@@ -930,16 +1058,16 @@ static
int int
row_merge_cmp( row_merge_cmp(
/*==========*/ /*==========*/
/* out: 1, 0, -1 if mrec1 is /* out: 1, 0, -1 if
greater, equal, less, mrec1 is greater, equal, less,
respectively, than mrec2 */ respectively, than mrec2 */
const mrec_t* mrec1, /* in: first merge record to be const mrec_t* mrec1, /* in: first merge
compared */ record to be compared */
const mrec_t* mrec2, /* in: second merge record to be const mrec_t* mrec2, /* in: second merge
compared */ record to be compared */
const ulint* offsets1, /* in: first record offsets */ const ulint* offsets1, /* in: first record offsets */
const ulint* offsets2, /* in: second record offsets */ const ulint* offsets2, /* in: second record offsets */
dict_index_t* index) /* in: index */ const dict_index_t* index) /* in: index */
{ {
int cmp; int cmp;
...@@ -967,9 +1095,11 @@ row_merge_read_clustered_index( ...@@ -967,9 +1095,11 @@ row_merge_read_clustered_index(
/*===========================*/ /*===========================*/
/* out: DB_SUCCESS or error */ /* out: DB_SUCCESS or error */
trx_t* trx, /* in: transaction */ trx_t* trx, /* in: transaction */
dict_table_t* old_table,/* in: table where rows are TABLE* table, /* in/out: MySQL table object,
for reporting erroneous records */
const dict_table_t* old_table,/* in: table where rows are
read from */ read from */
dict_table_t* new_table,/* in: table where indexes are const dict_table_t* new_table,/* in: table where indexes are
created; identical to old_table created; identical to old_table
unless creating a PRIMARY KEY */ unless creating a PRIMARY KEY */
dict_index_t** index, /* in: indexes to be created */ dict_index_t** index, /* in: indexes to be created */
...@@ -1054,6 +1184,7 @@ row_merge_read_clustered_index( ...@@ -1054,6 +1184,7 @@ row_merge_read_clustered_index(
/* Scan the clustered index. */ /* Scan the clustered index. */
for (;;) { for (;;) {
const rec_t* rec; const rec_t* rec;
ulint* offsets;
dtuple_t* row = NULL; dtuple_t* row = NULL;
row_ext_t* ext; row_ext_t* ext;
ibool has_next = TRUE; ibool has_next = TRUE;
...@@ -1074,6 +1205,8 @@ row_merge_read_clustered_index( ...@@ -1074,6 +1205,8 @@ row_merge_read_clustered_index(
if (UNIV_LIKELY(has_next)) { if (UNIV_LIKELY(has_next)) {
rec = btr_pcur_get_rec(&pcur); rec = btr_pcur_get_rec(&pcur);
offsets = rec_get_offsets(rec, clust_index, NULL,
ULINT_UNDEFINED, &row_heap);
/* Skip delete marked records. */ /* Skip delete marked records. */
if (rec_get_deleted_flag( if (rec_get_deleted_flag(
...@@ -1086,7 +1219,7 @@ row_merge_read_clustered_index( ...@@ -1086,7 +1219,7 @@ row_merge_read_clustered_index(
/* Build a row based on the clustered index. */ /* Build a row based on the clustered index. */
row = row_build(ROW_COPY_POINTERS, clust_index, row = row_build(ROW_COPY_POINTERS, clust_index,
rec, NULL, &ext, row_heap); rec, offsets, &ext, row_heap);
if (UNIV_LIKELY_NULL(nonnull)) { if (UNIV_LIKELY_NULL(nonnull)) {
for (i = 0; i < n_nonnull; i++) { for (i = 0; i < n_nonnull; i++) {
...@@ -1097,9 +1230,9 @@ row_merge_read_clustered_index( ...@@ -1097,9 +1230,9 @@ row_merge_read_clustered_index(
& DATA_NOT_NULL)); & DATA_NOT_NULL));
if (dfield_is_null(field)) { if (dfield_is_null(field)) {
trx->error_key_num = 0;
err = DB_PRIMARY_KEY_IS_NULL; err = DB_PRIMARY_KEY_IS_NULL;
goto func_exit; i = 0;
goto err_exit;
} }
field->type.prtype |= DATA_NOT_NULL; field->type.prtype |= DATA_NOT_NULL;
...@@ -1113,6 +1246,7 @@ row_merge_read_clustered_index( ...@@ -1113,6 +1246,7 @@ row_merge_read_clustered_index(
for (i = 0; i < n_index; i++) { for (i = 0; i < n_index; i++) {
row_merge_buf_t* buf = merge_buf[i]; row_merge_buf_t* buf = merge_buf[i];
merge_file_t* file = &files[i]; merge_file_t* file = &files[i];
const dict_index_t* index = buf->index;
if (UNIV_LIKELY if (UNIV_LIKELY
(row && row_merge_buf_add(buf, row, ext))) { (row && row_merge_buf_add(buf, row, ext))) {
...@@ -1126,21 +1260,32 @@ row_merge_read_clustered_index( ...@@ -1126,21 +1260,32 @@ row_merge_read_clustered_index(
/* We have enough data tuples to form a block. /* We have enough data tuples to form a block.
Sort them and write to disk. */ Sort them and write to disk. */
if (buf->n_tuples if (buf->n_tuples) {
&& row_merge_buf_sort(buf) if (dict_index_is_unique(index)) {
&& dict_index_is_unique(buf->index)) { row_merge_dup_t dup = {
trx->error_key_num = i; buf->index, old_table,
err = DB_DUPLICATE_KEY; table, 0
goto func_exit; };
row_merge_buf_sort(buf, &dup);
if (dup.n_dup) {
err = DB_DUPLICATE_KEY;
err_exit:
trx->error_key_num = i;
goto func_exit;
}
} else {
row_merge_buf_sort(buf, NULL);
}
} }
row_merge_buf_write(buf, file, block); row_merge_buf_write(buf, file, block);
if (!row_merge_write(file->fd, file->offset++, if (!row_merge_write(file->fd, file->offset++,
block)) { block)) {
trx->error_key_num = i;
err = DB_OUT_OF_FILE_SPACE; err = DB_OUT_OF_FILE_SPACE;
goto func_exit; goto err_exit;
} }
UNIV_MEM_INVALID(block[0], sizeof block[0]); UNIV_MEM_INVALID(block[0], sizeof block[0]);
...@@ -1191,7 +1336,7 @@ ulint ...@@ -1191,7 +1336,7 @@ ulint
row_merge_blocks( row_merge_blocks(
/*=============*/ /*=============*/
/* out: DB_SUCCESS or error code */ /* out: DB_SUCCESS or error code */
dict_index_t* index, /* in: index being created */ const dict_index_t* index, /* in: index being created */
merge_file_t* file, /* in/out: file containing merge_file_t* file, /* in/out: file containing
index entries */ index entries */
row_merge_block_t* block, /* in/out: 3 buffers */ row_merge_block_t* block, /* in/out: 3 buffers */
...@@ -1264,6 +1409,7 @@ row_merge_blocks( ...@@ -1264,6 +1409,7 @@ row_merge_blocks(
if (UNIV_UNLIKELY if (UNIV_UNLIKELY
(dict_index_is_unique(index))) { (dict_index_is_unique(index))) {
mem_heap_free(heap); mem_heap_free(heap);
/* TODO: if clustered, convert to MySQL */
return(DB_DUPLICATE_KEY); return(DB_DUPLICATE_KEY);
} }
/* fall through */ /* fall through */
...@@ -1308,7 +1454,7 @@ row_merge( ...@@ -1308,7 +1454,7 @@ row_merge(
/*======*/ /*======*/
/* out: DB_SUCCESS /* out: DB_SUCCESS
or error code */ or error code */
dict_index_t* index, /* in: index being created */ const dict_index_t* index, /* in: index being created */
merge_file_t* file, /* in/out: file containing merge_file_t* file, /* in/out: file containing
index entries */ index entries */
ulint half, /* in: half the file */ ulint half, /* in: half the file */
...@@ -1371,7 +1517,7 @@ row_merge_sort( ...@@ -1371,7 +1517,7 @@ row_merge_sort(
/*===========*/ /*===========*/
/* out: DB_SUCCESS /* out: DB_SUCCESS
or error code */ or error code */
dict_index_t* index, /* in: index being created */ const dict_index_t* index, /* in: index being created */
merge_file_t* file, /* in/out: file containing merge_file_t* file, /* in/out: file containing
index entries */ index entries */
row_merge_block_t* block, /* in/out: 3 buffers */ row_merge_block_t* block, /* in/out: 3 buffers */
...@@ -2131,7 +2277,10 @@ row_merge_build_indexes( ...@@ -2131,7 +2277,10 @@ row_merge_build_indexes(
created; identical to old_table created; identical to old_table
unless creating a PRIMARY KEY */ unless creating a PRIMARY KEY */
dict_index_t** indexes, /* in: indexes to be created */ dict_index_t** indexes, /* in: indexes to be created */
ulint n_indexes) /* in: size of indexes[] */ ulint n_indexes, /* in: size of indexes[] */
TABLE* table) /* in/out: MySQL table, for
reporting erroneous key value
if applicable */
{ {
merge_file_t* merge_files; merge_file_t* merge_files;
row_merge_block_t* block; row_merge_block_t* block;
...@@ -2166,7 +2315,7 @@ row_merge_build_indexes( ...@@ -2166,7 +2315,7 @@ row_merge_build_indexes(
secondary index entries for merge sort */ secondary index entries for merge sort */
error = row_merge_read_clustered_index( error = row_merge_read_clustered_index(
trx, old_table, new_table, indexes, trx, table, old_table, new_table, indexes,
merge_files, n_indexes, block); merge_files, n_indexes, block);
if (error != DB_SUCCESS) { if (error != DB_SUCCESS) {
...@@ -2193,6 +2342,7 @@ row_merge_build_indexes( ...@@ -2193,6 +2342,7 @@ row_merge_build_indexes(
if (error != DB_SUCCESS) { if (error != DB_SUCCESS) {
trx->error_key_num = i; trx->error_key_num = i;
innobase_rec_reset(table);
goto func_exit; goto func_exit;
} }
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment