From a0e9082675415cb004352df47495583c19320155 Mon Sep 17 00:00:00 2001
From: Zardosht Kasheff <zardosht@tokutek.com>
Date: Wed, 17 Apr 2013 00:01:42 -0400
Subject: [PATCH] addresses 922 when locking table, grab table locks

git-svn-id: file:///svn/mysql/tokudb-engine/src@4764 c7de825b-a66e-492c-adef-691d508d4ae1
---
 storage/tokudb/ha_tokudb.cc | 98 ++++++++++++++++++++++++++++++++++---
 storage/tokudb/ha_tokudb.h  |  6 +++
 2 files changed, 98 insertions(+), 6 deletions(-)

diff --git a/storage/tokudb/ha_tokudb.cc b/storage/tokudb/ha_tokudb.cc
index 7dae6e855e..46459ba166 100644
--- a/storage/tokudb/ha_tokudb.cc
+++ b/storage/tokudb/ha_tokudb.cc
@@ -942,6 +942,12 @@ int primary_key_part_compare (const void* left, const void* right) {
     return left_part->offset - right_part->offset;
 }
 
+//
+// macro that modifies read flag for cursor operations depending on whether
+// we have preacquired lock or not
+//
+#define SET_READ_FLAG(flg) ((range_lock_grabbed || current_thd->options & OPTION_TABLE_LOCK) ? (flg | DB_PRELOCKED) : flg)
+
 //
 // Open a secondary table, the key will be a secondary index, the data will be a primary key
 //
@@ -2567,7 +2573,7 @@ int ha_tokudb::index_next(uchar * buf) {
     TOKUDB_DBUG_ENTER("ha_tokudb::index_next");
     int error; 
     DBT row;
-    u_int32_t flags = range_lock_grabbed ? (DB_NEXT | DB_PRELOCKED) : DB_NEXT;
+    u_int32_t flags = SET_READ_FLAG(DB_NEXT);
     CHECK_VALID_CURSOR();
     
     statistic_increment(table->in_use->status_var.ha_read_next_count, &LOCK_status);
@@ -2601,11 +2607,11 @@ int ha_tokudb::index_next_same(uchar * buf, const uchar * key, uint keylen) {
         !(table->key_info[active_index].flags & HA_NOSAME) && 
         !(table->key_info[active_index].flags & HA_END_SPACE_KEY)) {
 
-        u_int32_t flags = range_lock_grabbed ? (DB_NEXT_DUP | DB_PRELOCKED) : DB_NEXT_DUP;
+        u_int32_t flags = SET_READ_FLAG(DB_NEXT_DUP);
         error = cursor->c_get(cursor, &last_key, &row, flags);
         error = read_row(error, buf, active_index, &row, &last_key, 1);
     } else {
-        u_int32_t flags = range_lock_grabbed ? (DB_NEXT | DB_PRELOCKED) : DB_NEXT;
+        u_int32_t flags = SET_READ_FLAG(DB_NEXT);
         error = read_row(cursor->c_get(cursor, &last_key, &row, flags), buf, active_index, &row, &last_key, 1);
         if (!error &&::key_cmp_if_same(table, key, active_index, keylen))
             error = HA_ERR_END_OF_FILE;
@@ -2626,7 +2632,7 @@ cleanup:
 int ha_tokudb::index_prev(uchar * buf) {
     TOKUDB_DBUG_ENTER("ha_tokudb::index_prev");
     int error;
-    u_int32_t flags = range_lock_grabbed ? (DB_PREV | DB_PRELOCKED) : DB_PREV;
+    u_int32_t flags = SET_READ_FLAG(DB_PREV);
     CHECK_VALID_CURSOR();
     DBT row;
     statistic_increment(table->in_use->status_var.ha_read_prev_count, &LOCK_status);
@@ -2733,7 +2739,7 @@ int ha_tokudb::rnd_next(uchar * buf) {
     TOKUDB_DBUG_ENTER("ha_tokudb::ha_tokudb::rnd_next");
     int error;
     DBT row;    
-    u_int32_t flags = range_lock_grabbed ? (DB_NEXT | DB_PRELOCKED) : DB_NEXT;
+    u_int32_t flags = SET_READ_FLAG(DB_NEXT);
 
     CHECK_VALID_CURSOR()
     //
@@ -2977,6 +2983,51 @@ int ha_tokudb::reset(void) {
     TOKUDB_DBUG_RETURN(0);
 }
 
+
+//
+// helper function that iterates through all DB's 
+// and grabs a lock (either read or write, but not both)
+// Parameters:
+//      [in]    trans - transaction to be used to pre acquire the lock
+//              lt - type of lock to get, either lock_read or lock_write
+//  Returns:
+//      0 on success
+//      error otherwise
+//
+int ha_tokudb::acquire_table_lock (DB_TXN* trans, TABLE_LOCK_TYPE lt) {
+    int error = ENOSYS;
+    uint curr_num_DBs = table->s->keys + test(hidden_primary_key);
+    if (lt == lock_read) {
+        for (uint i = 0; i < curr_num_DBs; i++) {
+            DB* db = share->key_file[i];
+            error = db->pre_acquire_read_lock(
+                db, 
+                trans, 
+                db->dbt_neg_infty(), db->dbt_neg_infty(), 
+                db->dbt_pos_infty(), db->dbt_pos_infty()
+                );
+            if (error) { goto cleanup; }
+        }
+    }
+    else if (lt == lock_write) {
+        for (uint i = 0; i < curr_num_DBs; i++) {
+            DB* db = share->key_file[i];
+            error = db->pre_acquire_table_lock(db, trans);
+            if (error) { goto cleanup; }
+        }
+    }
+    else {
+        error = ENOSYS;
+        goto cleanup;
+    }
+
+    error = 0;
+cleanup:
+    return error;
+}
+
+
+
 /*
   As MySQL will execute an external lock for every new table it uses
   we can use this to start the transactions.
@@ -3033,7 +3084,22 @@ int ha_tokudb::external_lock(THD * thd, int lock_type) {
                 trx->sp_level = trx->all;
                 trans_register_ha(thd, TRUE, tokudb_hton);
                 if (thd->in_lock_tables) {
-                    error = 0;          // Don't create stmt trans
+                    //
+                    // grab table locks
+                    // For the command "Lock tables foo read, bar read"
+                    // This statement is grabbing the locks for the table
+                    // foo. The locks for bar will be grabbed when 
+                    // trx->tokudb_lock_count has been initialized
+                    //
+                    assert(lock.type == TL_WRITE || lock.type == TL_READ_NO_INSERT);
+                    if (lock.type == TL_READ_NO_INSERT) {
+                        error = acquire_table_lock(trx->all,lock_read);
+                    }
+                    else if (lock.type == TL_WRITE) {
+                        error = acquire_table_lock(trx->all,lock_write);
+                    }
+                    // Don't create stmt trans
+                    if (error) {trx->tokudb_lock_count--;}
                     goto cleanup;
                 }
             }
@@ -3053,6 +3119,26 @@ int ha_tokudb::external_lock(THD * thd, int lock_type) {
             }
             trans_register_ha(thd, FALSE, tokudb_hton);
         }
+        else {
+            if (thd->in_lock_tables) {
+                assert(trx->all != NULL);
+                assert(lock.type == TL_WRITE || lock.type == TL_READ_NO_INSERT);
+                //
+                // For the command "Lock tables foo read, bar read"
+                // This statement is grabbing the locks for the table
+                // bar. The locks for foo will be grabbed when 
+                // trx->tokudb_lock_count is 0 and we are initializing
+                // trx->all above
+                //
+                if (lock.type == TL_READ_NO_INSERT) {
+                    error = acquire_table_lock(trx->all,lock_read);
+                }
+                else if (lock.type == TL_WRITE) {
+                    error = acquire_table_lock(trx->all,lock_write);
+                }
+                if (error) {trx->tokudb_lock_count--; goto cleanup;}
+            }
+        }
         transaction = trx->stmt;
     }
     else {
diff --git a/storage/tokudb/ha_tokudb.h b/storage/tokudb/ha_tokudb.h
index 4ce27e9f4f..3d9ce3b4e5 100644
--- a/storage/tokudb/ha_tokudb.h
+++ b/storage/tokudb/ha_tokudb.h
@@ -37,6 +37,11 @@ typedef struct st_prim_key_part_info {
     uint part_index;
 } PRIM_KEY_PART_INFO;
 
+typedef enum {
+    lock_read = 0,
+    lock_write
+} TABLE_LOCK_TYPE;
+
 class ha_tokudb : public handler {
 private:
     THR_LOCK_DATA lock;         ///< MySQL lock
@@ -153,6 +158,7 @@ private:
     DBT *get_pos(DBT * to, uchar * pos);
  
     int open_secondary_table(DB** ptr, KEY* key_info, const char* name, int mode, u_int32_t* key_type);
+    int acquire_table_lock (DB_TXN* trans, TABLE_LOCK_TYPE lt);
  
 public:
     ha_tokudb(handlerton * hton, TABLE_SHARE * table_arg);
-- 
2.30.9