diff --git a/mysql-test/r/ndb_database.result b/mysql-test/r/ndb_database.result
new file mode 100644
index 0000000000000000000000000000000000000000..566a3eaf3dd9df1354cde5aaf18881bfaa4cb827
--- /dev/null
+++ b/mysql-test/r/ndb_database.result
@@ -0,0 +1,27 @@
+drop table if exists t1;
+drop database if exists mysqltest;
+drop table if exists t1;
+drop database if exists mysqltest;
+create database mysqltest;
+create database mysqltest;
+create table mysqltest.t1 (a int primary key, b int) engine=ndb;
+use mysqltest;
+show tables;
+Tables_in_mysqltest
+t1
+drop database mysqltest;
+use mysqltest;
+show tables;
+Tables_in_mysqltest
+create database mysqltest;
+create table mysqltest.t1 (c int, d int primary key) engine=ndb;
+use mysqltest;
+show tables;
+Tables_in_mysqltest
+t1
+drop database mysqltest;
+use mysqltest;
+show tables;
+Tables_in_mysqltest
+drop table if exists t1;
+drop database if exists mysqltest;
diff --git a/mysql-test/t/ndb_database.test b/mysql-test/t/ndb_database.test
new file mode 100644
index 0000000000000000000000000000000000000000..1264c3fa73bc05ce6f2bd634ec315568acdfd11d
--- /dev/null
+++ b/mysql-test/t/ndb_database.test
@@ -0,0 +1,50 @@
+-- source include/have_ndb.inc
+-- source include/have_multi_ndb.inc
+-- source include/not_embedded.inc
+
+--disable_warnings
+connection server1;
+drop table if exists t1;
+drop database if exists mysqltest;
+connection server2;
+drop table if exists t1;
+drop database if exists mysqltest;
+--enable_warnings
+
+#
+# Check that all tables in a database are dropped when database is dropped
+#
+
+connection server1;
+create database mysqltest;
+
+connection server2;
+create database mysqltest;
+create table mysqltest.t1 (a int primary key, b int) engine=ndb;
+use mysqltest;
+show tables;
+
+connection server1;
+drop database mysqltest;
+
+connection server2;
+use mysqltest;
+show tables;
+
+connection server1;
+create database mysqltest;
+create table mysqltest.t1 (c int, d int primary key) engine=ndb;
+use mysqltest;
+show tables;
+
+connection server2;
+drop database mysqltest;
+
+connection server1;
+use mysqltest;
+show tables;
+
+--disable_warnings
+drop table if exists t1;
+drop database if exists mysqltest;
+--enable_warnings
diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc
index 9ca4e1de106aa7f6c602f99c0505ffa84ed9a062..ffa4ac55462e8f1bb1b79b29180c388c9bfc6b8c 100644
--- a/sql/ha_ndbcluster.cc
+++ b/sql/ha_ndbcluster.cc
@@ -4111,18 +4111,6 @@ int ha_ndbcluster::drop_table()
 }
 
 
-/*
-  Drop a database in NDB Cluster
- */
-
-int ndbcluster_drop_database(const char *path)
-{
-  DBUG_ENTER("ndbcluster_drop_database");
-  // TODO drop all tables for this database
-  DBUG_RETURN(1);
-}
-
-
 ulonglong ha_ndbcluster::get_auto_increment()
 {  
   int cache_size;
@@ -4477,6 +4465,53 @@ extern "C" byte* tables_get_key(const char *entry, uint *length,
 }
 
 
+/*
+  Drop a database in NDB Cluster
+ */
+
+int ndbcluster_drop_database(const char *path)
+{
+  DBUG_ENTER("ndbcluster_drop_database");
+  THD *thd= current_thd;
+  char dbname[FN_HEADLEN];
+  Ndb* ndb;
+  NdbDictionary::Dictionary::List list;
+  uint i;
+  char *tabname;
+  List<char> drop_list;
+  ha_ndbcluster::set_dbname(path, (char *)&dbname);
+  DBUG_PRINT("enter", ("db: %s", dbname));
+  
+  if (!(ndb= check_ndb_in_thd(thd)))
+    DBUG_RETURN(HA_ERR_NO_CONNECTION);
+  
+  // List tables in NDB
+  NDBDICT *dict= ndb->getDictionary();
+  if (dict->listObjects(list, 
+                        NdbDictionary::Object::UserTable) != 0)
+    ERR_RETURN(dict->getNdbError());
+  for (i= 0 ; i < list.count ; i++)
+  {
+    NdbDictionary::Dictionary::List::Element& t= list.elements[i];
+    DBUG_PRINT("info", ("Found %s/%s in NDB", t.database, t.name));     
+    
+    // Add only tables that belongs to db
+    if (my_strcasecmp(system_charset_info, t.database, dbname))
+      continue;
+    DBUG_PRINT("info", ("%s must be dropped", t.name));     
+    drop_list.push_back(thd->strdup(t.name));
+  }
+  // Drop any tables belonging to database
+  ndb->setDatabaseName(dbname);
+  List_iterator_fast<char> it(drop_list);
+  while ((tabname=it++))
+    if (dict->dropTable(tabname))
+      ERR_RETURN(dict->getNdbError());      
+
+  DBUG_RETURN(0);
+}
+
+
 int ndbcluster_find_files(THD *thd,const char *db,const char *path,
                           const char *wild, bool dir, List<char> *files)
 {
@@ -4797,26 +4832,31 @@ void ndbcluster_print_error(int error, const NdbOperation *error_op)
   DBUG_VOID_RETURN;
 }
 
-/*
-  Set m_tabname from full pathname to table file 
+/**
+ * Set a given location from full pathname to database name
+ *
  */
-
-void ha_ndbcluster::set_tabname(const char *path_name)
+void ha_ndbcluster::set_dbname(const char *path_name, char *dbname)
 {
   char *end, *ptr;
   
   /* Scan name from the end */
-  end= strend(path_name)-1;
-  ptr= end;
+  ptr= strend(path_name)-1;
+  while (ptr >= path_name && *ptr != '\\' && *ptr != '/') {
+    ptr--;
+  }
+  ptr--;
+  end= ptr;
   while (ptr >= path_name && *ptr != '\\' && *ptr != '/') {
     ptr--;
   }
   uint name_len= end - ptr;
-  memcpy(m_tabname, ptr + 1, end - ptr);
-  m_tabname[name_len]= '\0';
+  memcpy(dbname, ptr + 1, name_len);
+  dbname[name_len]= '\0';
 #ifdef __WIN__
   /* Put to lower case */
-  ptr= m_tabname;
+  
+  ptr= dbname;
   
   while (*ptr != '\0') {
     *ptr= tolower(*ptr);
@@ -4825,6 +4865,15 @@ void ha_ndbcluster::set_tabname(const char *path_name)
 #endif
 }
 
+/*
+  Set m_dbname from full pathname to table file
+ */
+
+void ha_ndbcluster::set_dbname(const char *path_name)
+{
+  set_dbname(path_name, m_dbname);
+}
+
 /**
  * Set a given location from full pathname to table file
  *
@@ -4854,39 +4903,13 @@ ha_ndbcluster::set_tabname(const char *path_name, char * tabname)
 #endif
 }
 
-
 /*
-  Set m_dbname from full pathname to table file
- 
+  Set m_tabname from full pathname to table file 
  */
 
-void ha_ndbcluster::set_dbname(const char *path_name)
+void ha_ndbcluster::set_tabname(const char *path_name)
 {
-  char *end, *ptr;
-  
-  /* Scan name from the end */
-  ptr= strend(path_name)-1;
-  while (ptr >= path_name && *ptr != '\\' && *ptr != '/') {
-    ptr--;
-  }
-  ptr--;
-  end= ptr;
-  while (ptr >= path_name && *ptr != '\\' && *ptr != '/') {
-    ptr--;
-  }
-  uint name_len= end - ptr;
-  memcpy(m_dbname, ptr + 1, name_len);
-  m_dbname[name_len]= '\0';
-#ifdef __WIN__
-  /* Put to lower case */
-  
-  ptr= m_dbname;
-  
-  while (*ptr != '\0') {
-    *ptr= tolower(*ptr);
-    ptr++;
-  }
-#endif
+  set_tabname(path_name, m_tabname);
 }
 
 
diff --git a/sql/ha_ndbcluster.h b/sql/ha_ndbcluster.h
index 4574ddc3562595d800f605b9da3ba713799349dc..4dbab18b82899ba30c3ef15ec745f699450d1755 100644
--- a/sql/ha_ndbcluster.h
+++ b/sql/ha_ndbcluster.h
@@ -457,6 +457,9 @@ class ha_ndbcluster: public handler
   static Thd_ndb* seize_thd_ndb();
   static void release_thd_ndb(Thd_ndb* thd_ndb);
  
+static void set_dbname(const char *pathname, char *dbname);
+static void set_tabname(const char *pathname, char *tabname);
+
   /*
     Condition pushdown
   */
@@ -537,7 +540,6 @@ class ha_ndbcluster: public handler
 
   void set_dbname(const char *pathname);
   void set_tabname(const char *pathname);
-  void set_tabname(const char *pathname, char *tabname);
 
   bool set_hidden_key(NdbOperation*,
                       uint fieldnr, const byte* field_ptr);