diff --git a/mysql-test/r/ndb_cache2.result b/mysql-test/r/ndb_cache2.result
new file mode 100644
index 0000000000000000000000000000000000000000..ce10e9dab00329759f2125b24a2f70e6e03029f0
--- /dev/null
+++ b/mysql-test/r/ndb_cache2.result
@@ -0,0 +1,193 @@
+drop table if exists t1;
+set GLOBAL query_cache_type=on;
+set GLOBAL query_cache_size=1355776;
+set GLOBAL ndb_cache_check_time=5;
+reset query cache;
+flush status;
+CREATE TABLE t1 ( pk int not null primary key,
+a int, b int not null, c varchar(20)) ENGINE=ndbcluster;
+insert into t1 value (1, 2, 3, 'First row');
+select * from t1;
+pk	a	b	c
+1	2	3	First row
+show status like "Qcache_queries_in_cache";
+Variable_name	Value
+Qcache_queries_in_cache	1
+show status like "Qcache_inserts";
+Variable_name	Value
+Qcache_inserts	1
+show status like "Qcache_hits";
+Variable_name	Value
+Qcache_hits	0
+select * from t1;
+pk	a	b	c
+1	2	3	First row
+show status like "Qcache_hits";
+Variable_name	Value
+Qcache_hits	1
+update t1 set a=3 where pk=1;
+select * from t1;
+pk	a	b	c
+1	3	3	First row
+show status like "Qcache_inserts";
+Variable_name	Value
+Qcache_inserts	2
+show status like "Qcache_hits";
+Variable_name	Value
+Qcache_hits	1
+insert into t1 value (2, 7, 8, 'Second row');
+insert into t1 value (4, 5, 6, 'Fourth row');
+select * from t1;
+pk	a	b	c
+2	7	8	Second row
+4	5	6	Fourth row
+1	3	3	First row
+show status like "Qcache_inserts";
+Variable_name	Value
+Qcache_inserts	3
+show status like "Qcache_hits";
+Variable_name	Value
+Qcache_hits	1
+select * from t1;
+pk	a	b	c
+2	7	8	Second row
+4	5	6	Fourth row
+1	3	3	First row
+show status like "Qcache_hits";
+Variable_name	Value
+Qcache_hits	2
+select * from t1 where b=3;
+pk	a	b	c
+1	3	3	First row
+show status like "Qcache_queries_in_cache";
+Variable_name	Value
+Qcache_queries_in_cache	2
+show status like "Qcache_hits";
+Variable_name	Value
+Qcache_hits	2
+select * from t1 where b=3;
+pk	a	b	c
+1	3	3	First row
+show status like "Qcache_hits";
+Variable_name	Value
+Qcache_hits	3
+delete from t1 where c='Fourth row';
+show status like "Qcache_queries_in_cache";
+Variable_name	Value
+Qcache_queries_in_cache	0
+select * from t1 where b=3;
+pk	a	b	c
+1	3	3	First row
+show status like "Qcache_hits";
+Variable_name	Value
+Qcache_hits	3
+use test;
+select * from t1;
+pk	a	b	c
+2	7	8	Second row
+1	3	3	First row
+select * from t1 where b=3;
+pk	a	b	c
+1	3	3	First row
+show status like "Qcache_hits";
+Variable_name	Value
+Qcache_hits	4
+update t1 set a=4 where b=3;
+use test;
+show status like "Qcache_queries_in_cache";
+Variable_name	Value
+Qcache_queries_in_cache	0
+select * from t1;
+pk	a	b	c
+2	7	8	Second row
+1	4	3	First row
+select * from t1;
+pk	a	b	c
+2	7	8	Second row
+1	4	3	First row
+show status like "Qcache_inserts";
+Variable_name	Value
+Qcache_inserts	7
+show status like "Qcache_hits";
+Variable_name	Value
+Qcache_hits	5
+select * from t1;
+pk	a	b	c
+2	7	8	Second row
+1	4	3	First row
+select * from t1;
+pk	a	b	c
+2	7	8	Second row
+1	4	3	First row
+show status like "Qcache_queries_in_cache";
+Variable_name	Value
+Qcache_queries_in_cache	1
+show status like "Qcache_inserts";
+Variable_name	Value
+Qcache_inserts	7
+show status like "Qcache_hits";
+Variable_name	Value
+Qcache_hits	7
+begin;
+update t1 set a=5 where pk=1;
+show status like "Qcache_queries_in_cache";
+Variable_name	Value
+Qcache_queries_in_cache	0
+show status like "Qcache_inserts";
+Variable_name	Value
+Qcache_inserts	7
+show status like "Qcache_hits";
+Variable_name	Value
+Qcache_hits	7
+select * from t1;
+pk	a	b	c
+2	7	8	Second row
+1	4	3	First row
+show status like "Qcache_queries_in_cache";
+Variable_name	Value
+Qcache_queries_in_cache	1
+show status like "Qcache_inserts";
+Variable_name	Value
+Qcache_inserts	8
+show status like "Qcache_hits";
+Variable_name	Value
+Qcache_hits	7
+commit;
+show status like "Qcache_queries_in_cache";
+Variable_name	Value
+Qcache_queries_in_cache	1
+show status like "Qcache_inserts";
+Variable_name	Value
+Qcache_inserts	8
+show status like "Qcache_hits";
+Variable_name	Value
+Qcache_hits	7
+select * from t1;
+pk	a	b	c
+2	7	8	Second row
+1	5	3	First row
+show status like "Qcache_inserts";
+Variable_name	Value
+Qcache_inserts	9
+show status like "Qcache_hits";
+Variable_name	Value
+Qcache_hits	7
+select * from t1;
+pk	a	b	c
+2	7	8	Second row
+1	5	3	First row
+show status like "Qcache_queries_in_cache";
+Variable_name	Value
+Qcache_queries_in_cache	1
+show status like "Qcache_inserts";
+Variable_name	Value
+Qcache_inserts	9
+show status like "Qcache_hits";
+Variable_name	Value
+Qcache_hits	8
+drop table t1;
+show status like "Qcache_queries_in_cache";
+Variable_name	Value
+Qcache_queries_in_cache	0
+SET GLOBAL query_cache_size=0;
+SET GLOBAL ndb_cache_check_time=0;
diff --git a/mysql-test/r/ndb_cache_multi2.result b/mysql-test/r/ndb_cache_multi2.result
new file mode 100644
index 0000000000000000000000000000000000000000..6e435c071b57dd1deac3c916c2351b1be9e1b5e3
--- /dev/null
+++ b/mysql-test/r/ndb_cache_multi2.result
@@ -0,0 +1,74 @@
+drop table if exists t1, t2;
+set GLOBAL query_cache_type=on;
+set GLOBAL query_cache_size=1355776;
+set GLOBAL ndb_cache_check_time=1;
+reset query cache;
+flush status;
+set GLOBAL query_cache_type=on;
+set GLOBAL query_cache_size=1355776;
+set GLOBAL ndb_cache_check_time=1;
+reset query cache;
+flush status;
+create table t1 (a int) engine=ndbcluster;
+create table t2 (a int) engine=ndbcluster;
+insert into t1 value (2);
+insert into t2 value (3);
+select * from t1;
+a
+2
+select * from t2;
+a
+3
+show status like "Qcache_queries_in_cache";
+Variable_name	Value
+Qcache_queries_in_cache	2
+show status like "Qcache_inserts";
+Variable_name	Value
+Qcache_inserts	2
+show status like "Qcache_hits";
+Variable_name	Value
+Qcache_hits	0
+show status like "Qcache_queries_in_cache";
+Variable_name	Value
+Qcache_queries_in_cache	0
+show status like "Qcache_inserts";
+Variable_name	Value
+Qcache_inserts	0
+show status like "Qcache_hits";
+Variable_name	Value
+Qcache_hits	0
+select * from t1;
+a
+2
+show status like "Qcache_queries_in_cache";
+Variable_name	Value
+Qcache_queries_in_cache	1
+show status like "Qcache_inserts";
+Variable_name	Value
+Qcache_inserts	1
+show status like "Qcache_hits";
+Variable_name	Value
+Qcache_hits	0
+update t1 set a=3 where a=2;
+show status like "Qcache_queries_in_cache";
+Variable_name	Value
+Qcache_queries_in_cache	2
+show status like "Qcache_inserts";
+Variable_name	Value
+Qcache_inserts	2
+show status like "Qcache_hits";
+Variable_name	Value
+Qcache_hits	0
+select * from t1;
+a
+3
+show status like "Qcache_queries_in_cache";
+Variable_name	Value
+Qcache_queries_in_cache	2
+show status like "Qcache_inserts";
+Variable_name	Value
+Qcache_inserts	3
+show status like "Qcache_hits";
+Variable_name	Value
+Qcache_hits	0
+drop table t1, t2;
diff --git a/mysql-test/t/ndb_cache2.test b/mysql-test/t/ndb_cache2.test
new file mode 100644
index 0000000000000000000000000000000000000000..5c1674a702129a31bf9484bb9611752b27be5bdf
--- /dev/null
+++ b/mysql-test/t/ndb_cache2.test
@@ -0,0 +1,126 @@
+-- source include/have_query_cache.inc
+-- source include/have_ndb.inc
+
+--disable_warnings
+drop table if exists t1;
+--enable_warnings
+
+
+# Turn on and reset query cache
+set GLOBAL query_cache_type=on;
+set GLOBAL query_cache_size=1355776;
+# Turn on thread that will fetch commit count for open tables
+set GLOBAL ndb_cache_check_time=5;
+reset query cache;
+flush status;
+
+# Wait for thread to wake up and start "working"
+sleep 20; 
+
+# Create test table in NDB
+CREATE TABLE t1 ( pk int not null primary key,
+ a int, b int not null, c varchar(20)) ENGINE=ndbcluster;
+insert into t1 value (1, 2, 3, 'First row');
+
+# Perform one query which should be inerted in query cache
+select * from t1;
+show status like "Qcache_queries_in_cache";
+show status like "Qcache_inserts";
+show status like "Qcache_hits";
+
+# Perform the same query and make sure the query cache is hit
+select * from t1;
+show status like "Qcache_hits";
+
+# Update the table and make sure the correct data is returned
+update t1 set a=3 where pk=1;
+select * from t1;
+show status like "Qcache_inserts";
+show status like "Qcache_hits";
+
+# Insert a new record and make sure the correct data is returned
+insert into t1 value (2, 7, 8, 'Second row');
+insert into t1 value (4, 5, 6, 'Fourth row');
+select * from t1;
+show status like "Qcache_inserts";
+show status like "Qcache_hits";
+select * from t1;
+show status like "Qcache_hits";
+
+# Perform a "new" query and make sure the query cache is not hit
+select * from t1 where b=3;
+show status like "Qcache_queries_in_cache";
+show status like "Qcache_hits";
+
+# Same query again...
+select * from t1 where b=3;
+show status like "Qcache_hits";
+
+# Delete from the table
+delete from t1 where c='Fourth row';
+show status like "Qcache_queries_in_cache";
+select * from t1 where b=3;
+show status like "Qcache_hits";
+
+# Start another connection and check that the query cache is hit
+connect (con1,localhost,root,,);
+connection con1;
+use test;
+select * from t1;
+select * from t1 where b=3;
+show status like "Qcache_hits";
+
+# Update the table and switch to other connection 
+update t1 set a=4 where b=3;
+connect (con2,localhost,root,,);
+connection con2;
+use test;
+show status like "Qcache_queries_in_cache";
+select * from t1;
+select * from t1;
+show status like "Qcache_inserts";
+show status like "Qcache_hits";
+connection con1;
+select * from t1;
+select * from t1;
+show status like "Qcache_queries_in_cache";
+show status like "Qcache_inserts";
+show status like "Qcache_hits";
+
+# Use transactions and make sure the query cache is not updated until
+# transaction is commited
+begin;
+update t1 set a=5 where pk=1;
+show status like "Qcache_queries_in_cache";
+show status like "Qcache_inserts";
+show status like "Qcache_hits";
+connection con2;
+select * from t1;
+show status like "Qcache_queries_in_cache";
+show status like "Qcache_inserts";
+show status like "Qcache_hits";
+connection con1;
+commit;
+# Sleep to let the query cache thread update commit count
+sleep 10;
+show status like "Qcache_queries_in_cache";
+show status like "Qcache_inserts";
+show status like "Qcache_hits";
+connection con2;
+select * from t1;
+show status like "Qcache_inserts";
+show status like "Qcache_hits";
+connection con1;
+select * from t1;
+show status like "Qcache_queries_in_cache";
+show status like "Qcache_inserts";
+show status like "Qcache_hits";
+
+drop table t1;
+
+show status like "Qcache_queries_in_cache";
+
+SET GLOBAL query_cache_size=0;
+SET GLOBAL ndb_cache_check_time=0;
+
+
diff --git a/mysql-test/t/ndb_cache_multi2.test b/mysql-test/t/ndb_cache_multi2.test
new file mode 100644
index 0000000000000000000000000000000000000000..a9d008dba7c640b9ccfdc24d845281421a9dcb6f
--- /dev/null
+++ b/mysql-test/t/ndb_cache_multi2.test
@@ -0,0 +1,71 @@
+-- source include/have_query_cache.inc
+-- source include/have_ndb.inc
+-- source include/have_multi_ndb.inc
+
+--disable_warnings
+drop table if exists t1, t2;
+--enable_warnings
+
+
+# Turn on and reset query cache on server1
+connection server1;
+set GLOBAL query_cache_type=on;
+set GLOBAL query_cache_size=1355776;
+set GLOBAL ndb_cache_check_time=1;
+reset query cache;
+flush status;
+
+# Turn on and reset query cache on server2
+connection server2;
+set GLOBAL query_cache_type=on;
+set GLOBAL query_cache_size=1355776;
+set GLOBAL ndb_cache_check_time=1;
+reset query cache;
+flush status;
+
+# Sleep so that the query cache check thread has time to start
+sleep 15;
+
+
+# Create test tables in NDB and load them into cache
+# on server1
+connection server1;
+create table t1 (a int) engine=ndbcluster;
+create table t2 (a int) engine=ndbcluster;
+insert into t1 value (2);
+insert into t2 value (3);
+select * from t1;
+select * from t2;
+show status like "Qcache_queries_in_cache";
+show status like "Qcache_inserts";
+show status like "Qcache_hits";
+
+
+# Connect server2, load table in to cache, then update the table
+connection server2;
+show status like "Qcache_queries_in_cache";
+show status like "Qcache_inserts";
+show status like "Qcache_hits";
+select * from t1;
+show status like "Qcache_queries_in_cache";
+show status like "Qcache_inserts";
+show status like "Qcache_hits";
+update t1 set a=3 where a=2;
+
+# Sleep so that the query cache check thread has time to run
+sleep 5;
+
+# Connect to server1 and check that cache is invalidated 
+# and correct data is returned
+connection server1;
+show status like "Qcache_queries_in_cache";
+show status like "Qcache_inserts";
+show status like "Qcache_hits";
+select * from t1;
+show status like "Qcache_queries_in_cache";
+show status like "Qcache_inserts";
+show status like "Qcache_hits";
+
+drop table t1, t2;
+
+
diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc
index 0d83955a335016740d4c99e554f76415105f5e35..4f6e243db93cbabeabd925452e54df180a2a8e2e 100644
--- a/sql/ha_ndbcluster.cc
+++ b/sql/ha_ndbcluster.cc
@@ -86,6 +86,12 @@ static int unpackfrm(const void **data, uint *len,
 static int ndb_get_table_statistics(Ndb*, const char *, 
 				    Uint64* rows, Uint64* commits);
 
+// Util thread variables
+static pthread_t ndb_util_thread;
+pthread_mutex_t LOCK_ndb_util_thread;
+pthread_cond_t COND_ndb_util_thread;
+extern "C" pthread_handler_decl(ndb_util_thread_func, arg);
+ulong ndb_cache_check_time;
 
 /*
   Dummy buffer to read zero pack_length fields
@@ -3865,6 +3871,7 @@ ha_ndbcluster::~ha_ndbcluster()
 }
 
 
+
 /*
   Open a table for further use
   - fetch metadata for this table from NDB
@@ -3963,16 +3970,14 @@ void ha_ndbcluster::release_thd_ndb(Thd_ndb* thd_ndb)
 
 Ndb* check_ndb_in_thd(THD* thd)
 {
-  DBUG_ENTER("check_ndb_in_thd");
-  Thd_ndb *thd_ndb= (Thd_ndb*)thd->transaction.thd_ndb;
-  
+  Thd_ndb *thd_ndb= (Thd_ndb*)thd->transaction.thd_ndb;  
   if (!thd_ndb)
   {
     if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb()))
-      DBUG_RETURN(NULL);
+      return NULL;
     thd->transaction.thd_ndb= thd_ndb;
   }
-  DBUG_RETURN(thd_ndb->ndb);
+  return thd_ndb->ndb;
 }
 
 
@@ -4310,13 +4315,21 @@ bool ndbcluster_init()
   (void) hash_init(&ndbcluster_open_tables,system_charset_info,32,0,0,
                    (hash_get_key) ndbcluster_get_key,0,0);
   pthread_mutex_init(&ndbcluster_mutex,MY_MUTEX_INIT_FAST);
+  pthread_mutex_init(&LOCK_ndb_util_thread,MY_MUTEX_INIT_FAST);
+  pthread_cond_init(&COND_ndb_util_thread,NULL);
+
 
+  // Create utility thread
+  pthread_t tmp;
+  if (pthread_create(&tmp,&connection_attrib,ndb_util_thread_func,0))
+  {
+    DBUG_PRINT("error", ("Could not create ndb utility thread"));
+    goto ndbcluster_init_error;
+  }
+  
   ndbcluster_inited= 1;
-#ifdef USE_DISCOVER_ON_STARTUP
-  if (ndb_discover_tables() != 0)
-    goto ndbcluster_init_error;    
-#endif
   DBUG_RETURN(FALSE);
+
  ndbcluster_init_error:
   ndbcluster_end();
   DBUG_RETURN(TRUE);
@@ -4326,12 +4339,19 @@ bool ndbcluster_init()
 /*
   End use of the NDB Cluster table handler
   - free all global variables allocated by 
-    ndcluster_init()
+    ndbcluster_init()
 */
 
 bool ndbcluster_end()
 {
   DBUG_ENTER("ndbcluster_end");
+
+  // Kill ndb utility thread
+  (void) pthread_mutex_lock(&LOCK_ndb_util_thread);  
+  DBUG_PRINT("exit",("killing ndb util thread: %lx",ndb_util_thread));
+  (void) pthread_cond_signal(&COND_ndb_util_thread);
+  (void) pthread_mutex_unlock(&LOCK_ndb_util_thread);
+
   if(g_ndb)
     delete g_ndb;
   g_ndb= NULL;
@@ -4342,6 +4362,8 @@ bool ndbcluster_end()
     DBUG_RETURN(0);
   hash_free(&ndbcluster_open_tables);
   pthread_mutex_destroy(&ndbcluster_mutex);
+  pthread_mutex_destroy(&LOCK_ndb_util_thread);
+  pthread_cond_destroy(&COND_ndb_util_thread);
   ndbcluster_inited= 0;
   DBUG_RETURN(0);
 }
@@ -4534,12 +4556,53 @@ const char* ha_ndbcluster::index_type(uint key_number)
     return "HASH";
   }
 }
+
 uint8 ha_ndbcluster::table_cache_type()
 {
   DBUG_ENTER("ha_ndbcluster::table_cache_type=HA_CACHE_TBL_ASKTRANSACT");
   DBUG_RETURN(HA_CACHE_TBL_ASKTRANSACT);
 }
 
+
+uint ndb_get_commitcount(THD* thd, char* dbname, char* tabname, 
+			 Uint64* commit_count)
+{
+  DBUG_ENTER("ndb_get_commitcount");
+ 
+  if (ndb_cache_check_time > 0)
+  {
+    // Use cached commit_count from share
+    char name[FN_REFLEN];
+    NDB_SHARE* share;
+    (void)strxnmov(name, FN_REFLEN, 
+		   "./",dbname,"/",tabname,NullS);
+    DBUG_PRINT("info", ("name: %s", name));
+    pthread_mutex_lock(&ndbcluster_mutex);
+    if (!(share=(NDB_SHARE*) hash_search(&ndbcluster_open_tables,
+				   (byte*) name,
+				   strlen(name))))
+    {
+      pthread_mutex_unlock(&ndbcluster_mutex);
+      DBUG_RETURN(1);
+    }
+    *commit_count= share->commit_count;    
+    DBUG_PRINT("info", ("commit_count: %d", *commit_count));
+    pthread_mutex_unlock(&ndbcluster_mutex);
+    DBUG_RETURN(0);
+  }
+  
+  // Get commit_count from NDB
+  Ndb *ndb;
+  if (!(ndb= check_ndb_in_thd(thd)))
+    DBUG_RETURN(1);
+  ndb->setDatabaseName(dbname);
+  
+  if (ndb_get_table_statistics(ndb, tabname, 0, commit_count))
+    DBUG_RETURN(1);
+  DBUG_RETURN(0);
+}
+
+
 static
 my_bool
 ndbcluster_cache_retrieval_allowed(
@@ -4561,51 +4624,33 @@ ndbcluster_cache_retrieval_allowed(
 				all cached queries with this table*/
 {
   DBUG_ENTER("ndbcluster_cache_retrieval_allowed");
-  char tabname[128];
-  char *dbname= full_name;
-  my_bool is_autocommit;
-  {
-    int dbname_len= strlen(full_name);
-    int tabname_len= full_name_len-dbname_len-1;
-    memcpy(tabname, full_name+dbname_len+1, tabname_len);
-    tabname[tabname_len]= '\0';
-  }
-  if (thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
-    is_autocommit = FALSE;
-  else
-    is_autocommit = TRUE;
+
+  Uint64 commit_count;
+  bool is_autocommit= !(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN));
+  char* dbname= full_name;
+  char* tabname= dbname+strlen(dbname)+1;
+
   DBUG_PRINT("enter",("dbname=%s, tabname=%s, autocommit=%d",
-		      dbname,tabname,is_autocommit));
+		      dbname, tabname, is_autocommit));
+
   if (!is_autocommit)
+    DBUG_RETURN(FALSE);
+
+  if (ndb_get_commitcount(thd, dbname, tabname, &commit_count))
   {
-    DBUG_PRINT("info",("OPTION_NOT_AUTOCOMMIT=%d OPTION_BEGIN=%d",
-		       thd->options & OPTION_NOT_AUTOCOMMIT,
-		       thd->options & OPTION_BEGIN));
-    // ToDo enable cache inside a transaction
-    // no need to invalidate though so leave *engine_data
+    *engine_data= *engine_data+1; // invalidate
     DBUG_RETURN(FALSE);
   }
+  DBUG_PRINT("info", ("*engine_data=%llu, commit_count=%llu", 
+		      *engine_data, commit_count));
+  if (*engine_data != commit_count)
   {
-    Ndb *ndb;
-    Uint64 commit_count;
-    if (!(ndb= check_ndb_in_thd(thd)))
-    {
-      *engine_data= *engine_data+1; // invalidate
-      DBUG_RETURN(FALSE);
-    }
-    ndb->setDatabaseName(dbname);
-    if (ndb_get_table_statistics(ndb, tabname, 0, &commit_count))
-    {
-      *engine_data= *engine_data+1; // invalidate
-      DBUG_RETURN(FALSE);
-    }
-    if (*engine_data != commit_count)
-    {
-      *engine_data= commit_count; // invalidate
-      DBUG_RETURN(FALSE);
-    }
+    *engine_data= commit_count; // invalidate
+    DBUG_PRINT("exit",("Do not use cache, commit_count has changed"));
+    DBUG_RETURN(FALSE);
   }
-  DBUG_PRINT("exit",("*engine_data=%d ok, use cache",*engine_data));
+
+  DBUG_PRINT("exit",("OK to use cache, *engine_data=%llu",*engine_data));
   DBUG_RETURN(TRUE);
 }
 
@@ -4630,35 +4675,24 @@ ha_ndbcluster::cached_table_registration(
 				   invalidate all cached queries with this table*/
 {
   DBUG_ENTER("ha_ndbcluster::cached_table_registration");
-  my_bool is_autocommit;
-  if (thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
-    is_autocommit = FALSE;
-  else
-    is_autocommit = TRUE;
+
+  bool is_autocommit= !(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN));
   DBUG_PRINT("enter",("dbname=%s, tabname=%s, is_autocommit=%d",
 		      m_dbname,m_tabname,is_autocommit));
   if (!is_autocommit)
-  {
-    DBUG_PRINT("info",("OPTION_NOT_AUTOCOMMIT=%d OPTION_BEGIN=%d",
-		       thd->options & OPTION_NOT_AUTOCOMMIT,
-		       thd->options & OPTION_BEGIN));
-    // ToDo enable cache inside a transaction
-    // no need to invalidate though so leave *engine_data
     DBUG_RETURN(FALSE);
-  }
+
+  
+  Uint64 commit_count;
+  if (ndb_get_commitcount(thd, m_dbname, m_tabname, &commit_count))
   {
-    Uint64 commit_count;
-    Ndb *ndb= get_ndb();
-    ndb->setDatabaseName(m_dbname);
-    if (ndb_get_table_statistics(ndb, m_tabname, 0, &commit_count))
-    {
-      *engine_data= 0;
-      DBUG_RETURN(FALSE);
-    }
-    *engine_data= commit_count;
+    *engine_data= 0;
+    DBUG_PRINT("error", ("Could not get commitcount"))
+    DBUG_RETURN(FALSE);
   }
+  *engine_data= commit_count;
   *engine_callback= ndbcluster_cache_retrieval_allowed;
-  DBUG_PRINT("exit",("*engine_data=%d", *engine_data));
+  DBUG_PRINT("exit",("*engine_data=%llu", *engine_data));
   DBUG_RETURN(TRUE);
 }
 
@@ -4700,8 +4734,14 @@ static NDB_SHARE* get_share(const char *table_name)
       }
       thr_lock_init(&share->lock);
       pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST);
+      share->commit_count= 0;
     }
   }
+  DBUG_PRINT("share", 
+	     ("table_name: %s, length: %d, use_count: %d, commit_count: %d", 
+	      share->table_name, share->table_name_length, share->use_count, 
+	      share->commit_count));
+
   share->use_count++;
   pthread_mutex_unlock(&ndbcluster_mutex);
   return share;
@@ -4868,10 +4908,10 @@ ndb_get_table_statistics(Ndb* ndb, const char * table,
 
     ndb->closeTransaction(pTrans);
     if(row_count)
-      * row_count= sum_rows;
+      *row_count= sum_rows;
     if(commit_count)
-      * commit_count= sum_commits;
-    DBUG_PRINT("exit", ("records: %u commits: %u", sum_rows, sum_commits));
+      *commit_count= sum_commits;
+    DBUG_PRINT("exit", ("records: %llu commits: %llu", sum_rows, sum_commits));
     DBUG_RETURN(0);
   } while(0);
 
@@ -4906,4 +4946,124 @@ int ha_ndbcluster::write_ndb_file()
   DBUG_RETURN(error);
 }
 
+
+// Utility thread main loop
+extern "C" pthread_handler_decl(ndb_util_thread_func,arg __attribute__((unused)))
+{
+  THD *thd; // needs to be first for thread_stack
+  int error = 0;
+  struct timespec abstime;
+
+  my_thread_init();
+  DBUG_ENTER("ndb_util_thread");
+  DBUG_PRINT("enter", ("ndb_cache_check_time: %d", ndb_cache_check_time));
+
+  thd= new THD; // note that contructor of THD uses DBUG_ !
+  THD_CHECK_SENTRY(thd);
+
+  pthread_detach_this_thread();
+  ndb_util_thread = pthread_self();
+
+  thd->thread_stack = (char*)&thd; // remember where our stack is
+  if (thd->store_globals())
+  {
+    thd->cleanup();
+    delete thd;
+    DBUG_RETURN(NULL);
+  }
+
+  List<NDB_SHARE> util_open_tables;
+  set_timespec(abstime, ndb_cache_check_time);
+  for (;;)
+  {
+
+    pthread_mutex_lock(&LOCK_ndb_util_thread);
+    error= pthread_cond_timedwait(&COND_ndb_util_thread, 
+			   &LOCK_ndb_util_thread, 
+			   &abstime);
+    pthread_mutex_unlock(&LOCK_ndb_util_thread);
+
+    DBUG_PRINT("ndb_util_thread", ("Started, ndb_cache_check_time: %d", 
+				   ndb_cache_check_time));
+    
+    if (abort_loop)
+      break; // Shutting down server
+    
+    if (ndb_cache_check_time == 0)
+    {
+      set_timespec(abstime, 10);
+      continue;
+    }
+
+    // Set new time to wake up 
+    set_timespec(abstime, ndb_cache_check_time);
+
+    // Lock mutex and fill list with pointers to all open tables
+    NDB_SHARE *share;
+    pthread_mutex_lock(&ndbcluster_mutex);
+    for (uint i= 0; i < ndbcluster_open_tables.records; i++)
+    {
+      share= (NDB_SHARE *)hash_element(&ndbcluster_open_tables, i);
+      share->use_count++; // Make sure the table can't be closed
+      
+      DBUG_PRINT("ndb_util_thread", 
+		 ("Found open table[%d]: %s, use_count: %d", 
+		  i, share->table_name, share->use_count));
+      
+      // Store pointer to table
+      util_open_tables.push_back(share);
+    }
+    pthread_mutex_unlock(&ndbcluster_mutex);
+    
+    
+    // Iterate through the  open files list 
+    List_iterator_fast<NDB_SHARE> it(util_open_tables);
+    while (share=it++)
+    {  
+      // Split tab- and dbname
+      char buf[FN_REFLEN];
+      char *tabname, *db;
+      uint length= dirname_length(share->table_name);
+      tabname= share->table_name+length;
+      memcpy(buf, share->table_name, length-1);
+      buf[length-1]= 0;
+      db= buf+dirname_length(buf);
+      DBUG_PRINT("ndb_util_thread", 
+		 ("Fetching commit count for: %s, db: %s, tab: %s", 
+		  share->table_name, db, tabname));
+      
+      // Contact NDB to get commit count for table
+      g_ndb->setDatabaseName(db);
+      Uint64 rows, commit_count;
+      if(ndb_get_table_statistics(g_ndb, tabname, 
+				  &rows, &commit_count) == 0){
+	DBUG_PRINT("ndb_util_thread", 
+		   ("Table: %s, rows: %llu, commit_count: %llu", 
+		    share->table_name, rows, commit_count));
+	share->commit_count= commit_count;
+      }
+      else
+      {
+	DBUG_PRINT("ndb_util_thread", 
+		   ("Error: Could not get commit count for table %s",
+		    share->table_name));
+	share->commit_count++; // Invalidate
+      }
+      // Decrease the use count and possibly free share
+      free_share(share);
+    }
+    
+    // Clear the list of open tables
+    util_open_tables.empty();  
+    
+  }
+  
+  thd->cleanup();
+  delete thd;
+  DBUG_PRINT("exit", ("ndb_util_thread"));
+  my_thread_end();
+  DBUG_RETURN(NULL);
+}
+
+
 #endif /* HAVE_NDBCLUSTER_DB */
diff --git a/sql/ha_ndbcluster.h b/sql/ha_ndbcluster.h
index b5cf727ead75727ddf30b69bf0bc7955fdac2d34..df88afa678ab3cba5cf74f35941b013ae0b42970 100644
--- a/sql/ha_ndbcluster.h
+++ b/sql/ha_ndbcluster.h
@@ -38,6 +38,7 @@ class NdbBlob;
 
 // connectstring to cluster if given by mysqld
 extern const char *ndbcluster_connectstring;
+extern ulong ndb_cache_check_time;
 
 typedef enum ndb_index_type {
   UNDEFINED_INDEX = 0,
@@ -59,6 +60,7 @@ typedef struct st_ndbcluster_share {
   pthread_mutex_t mutex;
   char *table_name;
   uint table_name_length,use_count;
+  uint commit_count;
 } NDB_SHARE;
 
 /*
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index d1fef3519bff6a3e1ac4c1c6f37e0ea0e73dfcea..671f38898c1f65b0df7a2c43e3c8cbce6afc6e27 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -284,6 +284,7 @@ my_bool	opt_console= 0, opt_bdb, opt_innodb, opt_isam, opt_ndbcluster;
 #ifdef HAVE_NDBCLUSTER_DB
 const char *opt_ndbcluster_connectstring= 0;
 my_bool	opt_ndb_shm, opt_ndb_optimized_node_selection;
+ulong opt_ndb_cache_check_time= 0;
 #endif
 my_bool opt_readonly, use_temp_pool, relay_log_purge;
 my_bool opt_sync_bdb_logs, opt_sync_frm;
@@ -4016,7 +4017,7 @@ enum options_mysqld
   OPT_INNODB, OPT_ISAM,
   OPT_NDBCLUSTER, OPT_NDB_CONNECTSTRING, OPT_NDB_USE_EXACT_COUNT,
   OPT_NDB_FORCE_SEND, OPT_NDB_AUTOINCREMENT_PREFETCH_SZ,
-  OPT_NDB_SHM, OPT_NDB_OPTIMIZED_NODE_SELECTION,
+  OPT_NDB_SHM, OPT_NDB_OPTIMIZED_NODE_SELECTION, OPT_NDB_CACHE_CHECK_TIME,
   OPT_SKIP_SAFEMALLOC,
   OPT_TEMP_POOL, OPT_TX_ISOLATION,
   OPT_SKIP_STACK_TRACE, OPT_SKIP_SYMLINKS,
@@ -4498,6 +4499,10 @@ Disable with --skip-ndbcluster (will save memory).",
    (gptr*) &opt_ndb_optimized_node_selection,
    (gptr*) &opt_ndb_optimized_node_selection,
    0, GET_BOOL, OPT_ARG, 1, 0, 0, 0, 0, 0},
+  { "ndb_cache_check_time", OPT_NDB_CACHE_CHECK_TIME,
+    "A dedicated thread is created to update cached commit count value at the given interval.",
+    (gptr*) &opt_ndb_cache_check_time, (gptr*) &opt_ndb_cache_check_time, 0, GET_ULONG, REQUIRED_ARG,
+    0, 0, LONG_TIMEOUT, 0, 1, 0},
 #endif
   {"new", 'n', "Use very new possible 'unsafe' functions.",
    (gptr*) &global_system_variables.new_mode,
diff --git a/sql/set_var.cc b/sql/set_var.cc
index 082c55db1884b4cf80240e624a005664b51a3b77..58c30c8e9bc71138017357f4bdf67ce703d83c9b 100644
--- a/sql/set_var.cc
+++ b/sql/set_var.cc
@@ -370,6 +370,7 @@ sys_var_thd_bool
 sys_ndb_use_exact_count("ndb_use_exact_count", &SV::ndb_use_exact_count);
 sys_var_thd_bool
 sys_ndb_use_transactions("ndb_use_transactions", &SV::ndb_use_transactions);
+sys_var_long_ptr sys_ndb_cache_check_time("ndb_cache_check_time", &ndb_cache_check_time);
 #endif
 
 /* Time/date/datetime formats */
@@ -630,6 +631,7 @@ sys_var *sys_variables[]=
   &sys_ndb_force_send,
   &sys_ndb_use_exact_count,
   &sys_ndb_use_transactions,
+  &sys_ndb_cache_check_time,
 #endif
   &sys_unique_checks,
   &sys_warning_count
@@ -797,6 +799,7 @@ struct show_var_st init_vars[]= {
   {sys_ndb_force_send.name,   (char*) &sys_ndb_force_send,          SHOW_SYS},
   {sys_ndb_use_exact_count.name,(char*) &sys_ndb_use_exact_count,   SHOW_SYS},
   {sys_ndb_use_transactions.name,(char*) &sys_ndb_use_transactions, SHOW_SYS},
+  {sys_ndb_cache_check_time.name,(char*) &sys_ndb_cache_check_time, SHOW_SYS},
 #endif
   {sys_net_buffer_length.name,(char*) &sys_net_buffer_length,       SHOW_SYS},
   {sys_net_read_timeout.name, (char*) &sys_net_read_timeout,        SHOW_SYS},