Commit a05c36c2 authored by marko's avatar marko

branches/zip: fast index creation: Check the status of os_file_read()

and os_file_write(), and return DB_CORRUPTION when they fail.  Minor cleanup.

row_mege_insert_index_tuples(): Use a shared heap for dtuple and rec.
Use fewer gotos.

row_merge_sort_linked_list_in_disk(): Free the merge blocks at error exit.
parent 4e14254e
......@@ -1198,7 +1198,11 @@ row_merge_sort_linked_list_in_disk(
block1 = backup1;
row_merge_block_read(file, list_head, block1);
if (!row_merge_block_read(file, list_head, block1)) {
file_error:
*error = DB_CORRUPTION;
goto err_exit;
}
ut_ad(row_merge_block_validate(block1, index));
for (;;) {
......@@ -1217,8 +1221,10 @@ row_merge_sort_linked_list_in_disk(
/* Here read only the header to iterate the
list in the disk. */
row_merge_block_header_read(file, offset,
&header);
if (!row_merge_block_header_read(file, offset,
&header)) {
goto file_error;
}
offset = header.next;
......@@ -1237,7 +1243,10 @@ row_merge_sort_linked_list_in_disk(
block2 = NULL;
} else {
block2 = backup2;
row_merge_block_read(file, offset, block2);
if (!row_merge_block_read(
file, offset, block2)) {
goto file_error;
}
ut_ad(row_merge_block_validate(block2, index));
}
......@@ -1276,7 +1285,8 @@ row_merge_sort_linked_list_in_disk(
block1, &block2, index);
if (tmp == NULL) {
goto error_handling;
*error = DB_DUPLICATE_KEY;
goto err_exit;
}
block1 = backup1 = tmp;
......@@ -1297,17 +1307,19 @@ row_merge_sort_linked_list_in_disk(
ut_ad(row_merge_block_validate(tmp, index));
row_merge_block_write(
file, tmp->header.offset, tmp);
if (!row_merge_block_write(
file, tmp->header.offset, tmp)) {
goto file_error;
}
/* Now we can read the next record from the
selected list if it contains more records */
if (tmp->header.next) {
row_merge_block_read(file,
if (tmp->header.next
&& !row_merge_block_read(file,
tmp->header.next,
tmp);
tmp)) {
goto file_error;
}
}
......@@ -1335,12 +1347,9 @@ row_merge_sort_linked_list_in_disk(
}
}
error_handling:
/* In the sort phase we can have duplicate key error, inform this to
upper layer */
*error = DB_DUPLICATE_KEY;
err_exit:
mem_free(backup1);
mem_free(backup2);
return(ULINT_UNDEFINED);
}
......@@ -1348,10 +1357,10 @@ error_handling:
Merge sort linked list in the memory and store part of the linked
list into a block and write this block to the disk. */
static
ulint
ibool
row_merge_sort_and_store(
/*=====================*/
/* out: 1 or 0 in case of error */
/* out: FALSE on error */
dict_index_t* index, /* in: Index */
merge_file_t* file, /* in: File where to write index
entries */
......@@ -1363,7 +1372,7 @@ row_merge_sort_and_store(
/* Firstly, merge sort linked list in the memory */
if (!row_merge_sort_linked_list(index, *list)) {
return(0);
return(FALSE);
}
/* Secondly, write part of the linked list to the block */
......@@ -1378,9 +1387,7 @@ row_merge_sort_and_store(
block->header.next = ++file->offset;
/* Thirdly, write block to the disk */
row_merge_block_write(file->file, block->header.offset, block);
return(1);
return(row_merge_block_write(file->file, block->header.offset, block));
}
#ifdef UNIV_DEBUG_INDEX_CREATE
......@@ -1634,8 +1641,11 @@ next_record:
/* Write the last block. */
block->header.next = 0; /* end-of-list marker */
row_merge_block_header_write(
files[idx_num].file, &block->header);
if (!row_merge_block_header_write(
files[idx_num].file, &block->header)) {
err = DB_CORRUPTION;
goto error_handling;
}
}
#ifdef UNIV_DEBUG_INDEX_CREATE
......@@ -1680,11 +1690,9 @@ row_merge_insert_index_tuples(
merge_block_t* block;
que_thr_t* thr;
ins_node_t* node;
mem_heap_t* rec_heap;
mem_heap_t* dtuple_heap;
mem_heap_t* heap;
mem_heap_t* graph_heap;
ulint error = DB_SUCCESS;
ibool was_lock_wait = FALSE;
ut_ad(trx && index && table);
......@@ -1693,7 +1701,7 @@ row_merge_insert_index_tuples(
trx->op_info = "inserting index entries";
graph_heap = mem_heap_create(512);
graph_heap = mem_heap_create(500);
node = ins_node_create(INS_DIRECT, table, graph_heap);
thr = pars_complete_graph_for_exec(node, trx, graph_heap);
......@@ -1701,30 +1709,28 @@ row_merge_insert_index_tuples(
que_thr_move_to_run_state_for_mysql(thr, trx);
block = row_merge_block_create();
rec_heap = mem_heap_create(128);
dtuple_heap = mem_heap_create(256);
heap = mem_heap_create(1000);
do {
ulint n_rec;
ulint tuple_offset;
ulint tuple_offset = 0;
row_merge_block_read(file, offset, block);
if (!row_merge_block_read(file, offset, block)) {
error = DB_CORRUPTION;
break;
}
ut_ad(row_merge_block_validate(block, index));
tuple_offset = 0;
for (n_rec = 0; n_rec < block->header.n_records; n_rec++) {
merge_rec_t* mrec;
dtuple_t* dtuple;
mrec = row_merge_read_rec_from_block(
block, &tuple_offset, rec_heap,index);
merge_rec_t* mrec = row_merge_read_rec_from_block(
block, &tuple_offset, heap, index);
if (!rec_get_deleted_flag(mrec->rec, 0)) {
dtuple = row_rec_to_index_entry(
dtuple_t* dtuple = row_rec_to_index_entry(
ROW_COPY_POINTERS,
index, mrec->rec, dtuple_heap);
index, mrec->rec, heap);
node->row = dtuple;
node->table = table;
......@@ -1736,20 +1742,29 @@ row_merge_insert_index_tuples(
row_merge_dtuple_print(stderr, dtuple);
#endif
run_again:
do {
thr->run_node = thr;
thr->prev_node = thr->common.parent;
error = row_ins_index_entry(
index, dtuple, NULL, 0, thr);
mem_heap_empty(rec_heap);
mem_heap_empty(dtuple_heap);
if (error != DB_SUCCESS) {
goto error_handling;
if (error == DB_SUCCESS) {
goto next_rec;
}
thr->lock_state = QUE_THR_LOCK_ROW;
trx->error_state = error;
que_thr_stop_for_mysql(thr);
thr->lock_state = QUE_THR_LOCK_NOLOCK;
} while (row_mysql_handle_errors(&error, trx,
thr, NULL));
goto err_exit;
}
next_rec:
mem_heap_empty(heap);
}
offset = block->header.next;
......@@ -1759,39 +1774,13 @@ run_again:
} while (offset);
que_thr_stop_for_mysql_no_error(thr, trx);
que_graph_free(thr->graph);
trx->op_info="";
mem_free(block);
mem_heap_free(rec_heap);
mem_heap_free(dtuple_heap);
return(error);
error_handling:
thr->lock_state = QUE_THR_LOCK_ROW;
trx->error_state = error;
que_thr_stop_for_mysql(thr);
thr->lock_state = QUE_THR_LOCK_NOLOCK;
was_lock_wait = row_mysql_handle_errors(&error, trx, thr, NULL);
if (was_lock_wait) {
goto run_again;
}
err_exit:
que_graph_free(thr->graph);
trx->op_info = "";
if (block) {
mem_free(block);
}
mem_heap_free(rec_heap);
mem_heap_free(dtuple_heap);
mem_heap_free(heap);
return(error);
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment