Commit 03a7cb22 authored by marko's avatar marko

branches/zip: Implement the reporting of duplicate key values to MySQL.

innobase_rec_to_mysql(): New function, for converting an InnoDB clustered
index record to MySQL table->record[0].  TODO: convert integer fields.
Currently, integer fields are in big-endian byte order instead of
host byte order, and signed integer fields are offset by 0x80000000.

innobase_rec_reset(): New function, for resetting table->record[0].

row_merge_build_indexes(): Add the parameter TABLE* table (the MySQL table
handle) for reporting duplicate key values.

dtuple_from_fields(): New function, to convert an array of dfield_t* to
dtuple_t.

dtuple_get_n_ext(): New function, to compute the number of externally stored
fields.

row_merge_dup_t: Structure for counting and reporting duplicate records.

row_merge_dup_report(): Function for counting and reporting duplicate records.

row_merge_tuple_cmp(), row_merge_tuple_sort(): Replace the ulint* n_dup
parameter with row_merge_dup_t* dup.

row_merge_buf_sort(): Add the parameter row_merge_dup_t* dup, which is
NULL when sorting a non-unique index.

row_merge_buf_write(), row_merge_heap_create(), row_merge_read_rec(),
row_merge_cmp(), row_merge_read_clustered_index(), row_merge_blocks(),
row_merge(), row_merge_sort(): Add const qualifiers.

row_merge_read_clustered_index(): Use a common error handling branch err_exit.
Invoke row_merge_buf_sort() differently on unique indexes.

row_merge_blocks(): note TODO: We could invoke innobase_rec_to_mysql()
to report duplicate key values when creating a clustered index.
parent d0631476
......@@ -19,6 +19,80 @@ extern "C" {
#include "handler0alter.h"
}
/*****************************************************************
Copies an InnoDB clustered index record to table->record[0]. */
extern "C"
void
innobase_rec_to_mysql(
/*==================*/
TABLE* table, /* in/out: MySQL table */
const rec_t* rec, /* in: record */
const dict_index_t* index, /* in: clustered index */
const ulint* offsets) /* in: rec_get_offsets(
rec, index, ...) */
{
uint n_fields = table->s->fields;
uint i;
ut_ad(dict_index_is_clust(index));
ut_ad(rec_offs_validate(rec, index, offsets));
ut_ad(n_fields == dict_table_get_n_user_cols(index->table));
for (i = 0; i < n_fields; i++) {
Field* field = table->field[i];
void* ptr = field->ptr;
uint32 flen = field->pack_length();
ulint ipos;
ulint ilen;
const void* ifield;
ipos = dict_index_get_nth_col_pos(index, i);
ut_ad(ipos != ULINT_UNDEFINED);
if (UNIV_UNLIKELY(ipos == ULINT_UNDEFINED)) {
reset_field:
field->reset();
field->set_null();
continue;
}
ifield = rec_get_nth_field(rec, offsets, ipos, &ilen);
/* Assign the NULL flag */
if (ilen == UNIV_SQL_NULL) {
ut_ad(field->real_maybe_null());
goto reset_field;
} else {
field->set_notnull();
/* Copy the data. */
/* TODO: convert integer fields */
if (ilen >= flen) {
memcpy(ptr, ifield, flen);
} else {
field->reset();
memcpy(ptr, ifield, ilen);
}
}
}
}
/*****************************************************************
Resets table->record[0]. */
extern "C"
void
innobase_rec_reset(
/*===============*/
TABLE* table) /* in/out: MySQL table */
{
uint n_fields = table->s->fields;
uint i;
for (i = 0; i < n_fields; i++) {
table->field[i]->set_default();
}
}
/**********************************************************************
Removes the filename encoding of a database and table name. */
static
......@@ -638,7 +712,7 @@ err_exit:
/* Read the clustered index of the table and build indexes
based on this information using temporary files and merge sort. */
error = row_merge_build_indexes(trx, innodb_table, indexed_table,
index, num_of_idx);
index, num_of_idx, table);
error_handling:
#ifdef UNIV_DEBUG
......
......@@ -213,6 +213,18 @@ dtuple_create(
is created */
ulint n_fields); /* in: number of fields */
/**************************************************************
Wrap data fields in a tuple. The default value for number
of fields used in record comparisons for this tuple is n_fields. */
UNIV_INLINE
const dtuple_t*
dtuple_from_fields(
/*===============*/
/* out: data tuple */
dtuple_t* tuple, /* in: storage for data tuple */
const dfield_t* fields, /* in: fields */
ulint n_fields); /* in: number of fields */
/*************************************************************************
Creates a dtuple for use in MySQL. */
......@@ -247,6 +259,14 @@ dtuple_get_data_size(
/*=================*/
/* out: sum of data lens */
const dtuple_t* tuple); /* in: typed data tuple */
/*************************************************************************
Computes the number of externally stored fields in a data tuple. */
UNIV_INLINE
ulint
dtuple_get_n_ext(
/*=============*/
/* out: number of fields */
const dtuple_t* tuple); /* in: tuple */
/****************************************************************
Compare two data tuples, respecting the collation of character fields. */
......
......@@ -349,6 +349,26 @@ dtuple_create(
return(tuple);
}
/**************************************************************
Wrap data fields in a tuple. The default value for number
of fields used in record comparisons for this tuple is n_fields. */
UNIV_INLINE
const dtuple_t*
dtuple_from_fields(
/*===============*/
/* out: data tuple */
dtuple_t* tuple, /* in: storage for data tuple */
const dfield_t* fields, /* in: fields */
ulint n_fields) /* in: number of fields */
{
tuple->info_bits = 0;
tuple->n_fields = tuple->n_fields_cmp = n_fields;
tuple->fields = (dfield_t*) fields;
ut_d(tuple->magic_n = DATA_TUPLE_MAGIC_N);
return(tuple);
}
/**************************************************************
The following function returns the sum of data lengths of a tuple. The space
occupied by the field structs or the tuple struct is not counted. Neither
......@@ -386,6 +406,30 @@ dtuple_get_data_size(
return(sum);
}
/*************************************************************************
Computes the number of externally stored fields in a data tuple. */
UNIV_INLINE
ulint
dtuple_get_n_ext(
/*=============*/
/* out: number of externally stored fields */
const dtuple_t* tuple) /* in: tuple */
{
ulint n_ext = 0;
ulint n_fields = tuple->n_fields;
ulint i;
ut_ad(tuple);
ut_ad(dtuple_check_typed(tuple));
ut_ad(tuple->magic_n == DATA_TUPLE_MAGIC_N);
for (i = 0; i < n_fields; i++) {
n_ext += dtuple_get_nth_field(tuple, i)->ext;
}
return(n_ext);
}
/***********************************************************************
Sets types of fields binary in a tuple. */
UNIV_INLINE
......
......@@ -3,3 +3,23 @@ Smart ALTER TABLE
(c) 2005-2007 Innobase Oy
*******************************************************/
/*****************************************************************
Copies an InnoDB clustered index record to table->record[0]. */
void
innobase_rec_to_mysql(
/*==================*/
TABLE* table, /* in/out: MySQL table */
const rec_t* rec, /* in: record */
const dict_index_t* index, /* in: clustered index */
const ulint* offsets); /* in: rec_get_offsets(
rec, index, ...) */
/*****************************************************************
Resets table->record[0]. */
void
innobase_rec_reset(
/*===============*/
TABLE* table); /* in/out: MySQL table */
......@@ -165,5 +165,8 @@ row_merge_build_indexes(
created; identical to old_table
unless creating a PRIMARY KEY */
dict_index_t** indexes, /* in: indexes to be created */
ulint n_indexes); /* in: size of indexes[] */
ulint n_indexes, /* in: size of indexes[] */
TABLE* table); /* in/out: MySQL table, for
reporting erroneous key value
if applicable */
#endif /* row0merge.h */
......@@ -38,4 +38,7 @@ typedef struct row_ext_struct row_ext_t;
typedef struct row_prebuilt_struct row_prebuilt_t;
/* MySQL data types */
typedef struct st_table TABLE;
#endif
......@@ -38,6 +38,7 @@ Completed by Sunny Bains and Marko Makela
#include "mem0mem.h"
#include "log0log.h"
#include "ut0sort.h"
#include "handler0alter.h"
#ifdef UNIV_DEBUG
/* Set these in order ot enable debug printout. */
......@@ -221,13 +222,13 @@ row_merge_buf_add(
row_ext_t* ext) /* in/out: cache of externally stored
column prefixes, or NULL */
{
ulint i;
ulint n_fields;
ulint data_size;
ulint extra_size;
dict_index_t* index;
dfield_t* entry;
dfield_t* field;
ulint i;
ulint n_fields;
ulint data_size;
ulint extra_size;
const dict_index_t* index;
dfield_t* entry;
dfield_t* field;
if (buf->n_tuples >= buf->max_tuples) {
return(FALSE);
......@@ -247,7 +248,7 @@ row_merge_buf_add(
extra_size = UT_BITS_IN_BYTES(index->n_nullable);
for (i = 0; i < n_fields; i++, field++) {
dict_field_t* ifield;
const dict_field_t* ifield;
const dict_col_t* col;
ulint col_no;
const dfield_t* row_field;
......@@ -359,6 +360,137 @@ row_merge_buf_add(
return(TRUE);
}
/* Structure for reporting duplicate records. */
struct row_merge_dup_struct {
const dict_index_t* index; /* index being sorted */
const dict_table_t* old_table; /* original table */
TABLE* table; /* MySQL table object */
ulint n_dup; /* number of duplicates */
};
typedef struct row_merge_dup_struct row_merge_dup_t;
/*****************************************************************
Report a duplicate key. */
static
void
row_merge_dup_report(
/*=================*/
row_merge_dup_t* dup, /* in/out: for reporting duplicates */
const dfield_t* entry) /* in: duplicate index entry */
{
/* Buffer for converting the record */
byte buf[UNIV_PAGE_SIZE / 2];
const dtuple_t* tuple;
dtuple_t tuple_store;
const rec_t* clust_rec;
const dict_index_t* index = dup->index;
const dict_table_t* table = index->table;
ulint n_fields= dict_index_get_n_fields(index);
mem_heap_t* heap = NULL;
ulint offsets_[REC_OFFS_NORMAL_SIZE];
ulint* offsets;
if (dup->n_dup++) {
/* Only report the first duplicate record,
but count all duplicate records. */
return;
}
*offsets_ = (sizeof offsets_) / sizeof *offsets_;
if (table != dup->old_table) {
/* A new clustered index is being created. */
if (dict_index_is_clust(index)) {
/* Convert the clustered index record
to MySQL format. */
ulint n_ext;
tuple = dtuple_from_fields(&tuple_store,
entry, n_fields);
n_ext = dtuple_get_n_ext(tuple);
clust_rec = rec_convert_dtuple_to_rec(buf, index,
tuple, n_ext);
offsets = rec_get_offsets(clust_rec, index, offsets_,
ULINT_UNDEFINED, &heap);
innobase_rec_to_mysql(dup->table, clust_rec,
index, offsets);
} else {
/* We cannot fetch the MySQL record
corresponding to a secondary index record when
also creating the clustered index. Consider a
table t (a,b,c,d) that lacks a primary key.
Consider the operation ALTER TABLE t ADD
PRIMARY KEY (a,b), ADD UNIQUE KEY (c).
The original clustered index record is
(DB_ROW_ID,DB_TRX_ID,DB_ROLL_PTR,a,b,c,d),
the new clustered index record is
(a,b,DB_TRX_ID,DB_ROLL_PTR,c,d), and the
unique secondary index record is (c,a,b).
Because the new records do not contain
DB_ROW_ID and the new clustered index B-tree
has not been created when the UNIQUE KEY (c)
violation is detected, it is impossible to
fetch the clustered index record without an
expensive table scan. */
innobase_rec_reset(dup->table);
}
} else {
const dict_index_t* clust_index
= dict_table_get_first_index(dup->old_table);
btr_pcur_t pcur;
mtr_t mtr;
ulint n_uniq;
ut_ad(!dict_index_is_clust(index));
ut_ad(dict_index_is_clust(clust_index));
/* Build a search tuple for the clustered index record. */
n_uniq = dict_index_get_n_unique(clust_index);
ut_a(n_uniq < n_fields);
tuple = dtuple_from_fields(&tuple_store,
entry + n_fields - n_uniq, n_uniq);
/* Fetch the clustered index record. */
mtr_start(&mtr);
if (row_search_on_row_ref(&pcur, BTR_SEARCH_LEAF,
table, tuple, &mtr)) {
ut_ad(clust_index == btr_cur_get_index(
btr_pcur_get_btr_cur(&pcur)));
clust_rec = btr_pcur_get_rec(&pcur);
offsets = rec_get_offsets(clust_rec, clust_index,
offsets_, ULINT_UNDEFINED,
&heap);
innobase_rec_to_mysql(dup->table, clust_rec,
clust_index, offsets);
} else {
/* The clustered index record was not found.
This should never happen, but we ignore this
error unless UNIV_DEBUG is defined. */
ut_ad(0);
innobase_rec_reset(dup->table);
}
btr_pcur_close(&pcur);
mtr_commit(&mtr);
}
if (UNIV_LIKELY_NULL(heap)) {
mem_heap_free(heap);
}
}
/*****************************************************************
Compare two tuples. */
static
......@@ -368,18 +500,19 @@ row_merge_tuple_cmp(
/* out: 1, 0, -1 if a is greater,
equal, less, respectively, than b */
ulint n_field,/* in: number of fields */
ulint* n_dup, /* in/out: number of duplicates */
const dfield_t* a, /* in: first tuple to be compared */
const dfield_t* b) /* in: second tuple to be compared */
const dfield_t* b, /* in: second tuple to be compared */
row_merge_dup_t* dup) /* in/out: for reporting duplicates */
{
int cmp;
int cmp;
const dfield_t* field = a;
do {
cmp = cmp_dfield_dfield(a++, b++);
} while (!cmp && --n_field);
if (!cmp) {
(*n_dup)++;
if (UNIV_UNLIKELY(!cmp) && UNIV_LIKELY_NULL(dup)) {
row_merge_dup_report(dup, field);
}
return(cmp);
......@@ -392,7 +525,7 @@ void
row_merge_tuple_sort(
/*=================*/
ulint n_field,/* in: number of fields */
ulint* n_dup, /* in/out: number of duplicates */
row_merge_dup_t* dup, /* in/out: for reporting duplicates */
const dfield_t** tuples, /* in/out: tuples */
const dfield_t** aux, /* in/out: work area */
ulint low, /* in: lower bound of the
......@@ -401,8 +534,8 @@ row_merge_tuple_sort(
sorting area, exclusive */
{
#define row_merge_tuple_sort_ctx(a,b,c,d) \
row_merge_tuple_sort(n_field, n_dup, a, b, c, d)
#define row_merge_tuple_cmp_ctx(a,b) row_merge_tuple_cmp(n_field, n_dup, a, b)
row_merge_tuple_sort(n_field, dup, a, b, c, d)
#define row_merge_tuple_cmp_ctx(a,b) row_merge_tuple_cmp(n_field, a, b, dup)
UT_SORT_FUNCTION_BODY(row_merge_tuple_sort_ctx,
tuples, aux, low, high, row_merge_tuple_cmp_ctx);
......@@ -411,19 +544,14 @@ row_merge_tuple_sort(
/**********************************************************
Sort a buffer. */
static
ulint
void
row_merge_buf_sort(
/*===============*/
/* out: number of duplicates
encountered */
row_merge_buf_t* buf) /* in/out: sort buffer */
row_merge_buf_t* buf, /* in/out: sort buffer */
row_merge_dup_t* dup) /* in/out: for reporting duplicates */
{
ulint n_dup = 0;
row_merge_tuple_sort(dict_index_get_n_unique(buf->index), &n_dup,
row_merge_tuple_sort(dict_index_get_n_unique(buf->index), dup,
buf->tuples, buf->tmp_tuples, 0, buf->n_tuples);
return(n_dup);
}
/**********************************************************
......@@ -441,9 +569,9 @@ row_merge_buf_write(
# define row_merge_buf_write(buf, of, block) row_merge_buf_write(buf, block)
#endif /* !UNIV_DEBUG */
{
dict_index_t* index = buf->index;
ulint n_fields= dict_index_get_n_fields(index);
byte* b = &(*block)[0];
const dict_index_t* index = buf->index;
ulint n_fields= dict_index_get_n_fields(index);
byte* b = &(*block)[0];
ulint i;
......@@ -452,7 +580,7 @@ row_merge_buf_write(
ulint extra_size;
const dfield_t* entry = buf->tuples[i];
size = rec_get_converted_size_comp(buf->index,
size = rec_get_converted_size_comp(index,
REC_STATUS_ORDINARY,
entry, n_fields,
&extra_size);
......@@ -511,10 +639,10 @@ static
mem_heap_t*
row_merge_heap_create(
/*==================*/
/* out: memory heap */
dict_index_t* index, /* in: record descriptor */
ulint** offsets1, /* out: offsets */
ulint** offsets2) /* out: offsets */
/* out: memory heap */
const dict_index_t* index, /* in: record descriptor */
ulint** offsets1, /* out: offsets */
ulint** offsets2) /* out: offsets */
{
ulint i = 1 + REC_OFFS_HEADER_SIZE
+ dict_index_get_n_fields(index);
......@@ -620,7 +748,7 @@ row_merge_read_rec(
row_merge_block_t* block, /* in/out: file buffer */
mrec_buf_t* buf, /* in/out: secondary buffer */
const byte* b, /* in: pointer to record */
dict_index_t* index, /* in: index of the record */
const dict_index_t* index, /* in: index of the record */
int fd, /* in: file descriptor */
ulint* foffs, /* in/out: file offset */
const mrec_t** mrec, /* out: pointer to merge record,
......@@ -930,16 +1058,16 @@ static
int
row_merge_cmp(
/*==========*/
/* out: 1, 0, -1 if mrec1 is
greater, equal, less,
respectively, than mrec2 */
const mrec_t* mrec1, /* in: first merge record to be
compared */
const mrec_t* mrec2, /* in: second merge record to be
compared */
const ulint* offsets1, /* in: first record offsets */
const ulint* offsets2, /* in: second record offsets */
dict_index_t* index) /* in: index */
/* out: 1, 0, -1 if
mrec1 is greater, equal, less,
respectively, than mrec2 */
const mrec_t* mrec1, /* in: first merge
record to be compared */
const mrec_t* mrec2, /* in: second merge
record to be compared */
const ulint* offsets1, /* in: first record offsets */
const ulint* offsets2, /* in: second record offsets */
const dict_index_t* index) /* in: index */
{
int cmp;
......@@ -967,9 +1095,11 @@ row_merge_read_clustered_index(
/*===========================*/
/* out: DB_SUCCESS or error */
trx_t* trx, /* in: transaction */
dict_table_t* old_table,/* in: table where rows are
TABLE* table, /* in/out: MySQL table object,
for reporting erroneous records */
const dict_table_t* old_table,/* in: table where rows are
read from */
dict_table_t* new_table,/* in: table where indexes are
const dict_table_t* new_table,/* in: table where indexes are
created; identical to old_table
unless creating a PRIMARY KEY */
dict_index_t** index, /* in: indexes to be created */
......@@ -1054,6 +1184,7 @@ row_merge_read_clustered_index(
/* Scan the clustered index. */
for (;;) {
const rec_t* rec;
ulint* offsets;
dtuple_t* row = NULL;
row_ext_t* ext;
ibool has_next = TRUE;
......@@ -1074,6 +1205,8 @@ row_merge_read_clustered_index(
if (UNIV_LIKELY(has_next)) {
rec = btr_pcur_get_rec(&pcur);
offsets = rec_get_offsets(rec, clust_index, NULL,
ULINT_UNDEFINED, &row_heap);
/* Skip delete marked records. */
if (rec_get_deleted_flag(
......@@ -1086,7 +1219,7 @@ row_merge_read_clustered_index(
/* Build a row based on the clustered index. */
row = row_build(ROW_COPY_POINTERS, clust_index,
rec, NULL, &ext, row_heap);
rec, offsets, &ext, row_heap);
if (UNIV_LIKELY_NULL(nonnull)) {
for (i = 0; i < n_nonnull; i++) {
......@@ -1097,9 +1230,9 @@ row_merge_read_clustered_index(
& DATA_NOT_NULL));
if (dfield_is_null(field)) {
trx->error_key_num = 0;
err = DB_PRIMARY_KEY_IS_NULL;
goto func_exit;
i = 0;
goto err_exit;
}
field->type.prtype |= DATA_NOT_NULL;
......@@ -1113,6 +1246,7 @@ row_merge_read_clustered_index(
for (i = 0; i < n_index; i++) {
row_merge_buf_t* buf = merge_buf[i];
merge_file_t* file = &files[i];
const dict_index_t* index = buf->index;
if (UNIV_LIKELY
(row && row_merge_buf_add(buf, row, ext))) {
......@@ -1126,21 +1260,32 @@ row_merge_read_clustered_index(
/* We have enough data tuples to form a block.
Sort them and write to disk. */
if (buf->n_tuples
&& row_merge_buf_sort(buf)
&& dict_index_is_unique(buf->index)) {
trx->error_key_num = i;
err = DB_DUPLICATE_KEY;
goto func_exit;
if (buf->n_tuples) {
if (dict_index_is_unique(index)) {
row_merge_dup_t dup = {
buf->index, old_table,
table, 0
};
row_merge_buf_sort(buf, &dup);
if (dup.n_dup) {
err = DB_DUPLICATE_KEY;
err_exit:
trx->error_key_num = i;
goto func_exit;
}
} else {
row_merge_buf_sort(buf, NULL);
}
}
row_merge_buf_write(buf, file, block);
if (!row_merge_write(file->fd, file->offset++,
block)) {
trx->error_key_num = i;
err = DB_OUT_OF_FILE_SPACE;
goto func_exit;
goto err_exit;
}
UNIV_MEM_INVALID(block[0], sizeof block[0]);
......@@ -1191,7 +1336,7 @@ ulint
row_merge_blocks(
/*=============*/
/* out: DB_SUCCESS or error code */
dict_index_t* index, /* in: index being created */
const dict_index_t* index, /* in: index being created */
merge_file_t* file, /* in/out: file containing
index entries */
row_merge_block_t* block, /* in/out: 3 buffers */
......@@ -1264,6 +1409,7 @@ corrupt:
if (UNIV_UNLIKELY
(dict_index_is_unique(index))) {
mem_heap_free(heap);
/* TODO: if clustered, convert to MySQL */
return(DB_DUPLICATE_KEY);
}
/* fall through */
......@@ -1308,7 +1454,7 @@ row_merge(
/*======*/
/* out: DB_SUCCESS
or error code */
dict_index_t* index, /* in: index being created */
const dict_index_t* index, /* in: index being created */
merge_file_t* file, /* in/out: file containing
index entries */
ulint half, /* in: half the file */
......@@ -1371,7 +1517,7 @@ row_merge_sort(
/*===========*/
/* out: DB_SUCCESS
or error code */
dict_index_t* index, /* in: index being created */
const dict_index_t* index, /* in: index being created */
merge_file_t* file, /* in/out: file containing
index entries */
row_merge_block_t* block, /* in/out: 3 buffers */
......@@ -2131,7 +2277,10 @@ row_merge_build_indexes(
created; identical to old_table
unless creating a PRIMARY KEY */
dict_index_t** indexes, /* in: indexes to be created */
ulint n_indexes) /* in: size of indexes[] */
ulint n_indexes, /* in: size of indexes[] */
TABLE* table) /* in/out: MySQL table, for
reporting erroneous key value
if applicable */
{
merge_file_t* merge_files;
row_merge_block_t* block;
......@@ -2166,7 +2315,7 @@ row_merge_build_indexes(
secondary index entries for merge sort */
error = row_merge_read_clustered_index(
trx, old_table, new_table, indexes,
trx, table, old_table, new_table, indexes,
merge_files, n_indexes, block);
if (error != DB_SUCCESS) {
......@@ -2193,6 +2342,7 @@ row_merge_build_indexes(
if (error != DB_SUCCESS) {
trx->error_key_num = i;
innobase_rec_reset(table);
goto func_exit;
}
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment