Commit 0ecca62b authored by Linus Torvalds

Merge tag 'ceph-for-5.16-rc1' of git://github.com/ceph/ceph-client

Pull ceph updates from Ilya Dryomov:
 "One notable change here is that async creates and unlinks introduced
  in 5.7 are now enabled by default. This should greatly speed up things
  like rm, tar and rsync. To opt out, wsync mount option can be used.

  Other than that we have a pile of bug fixes all across the filesystem
  from Jeff, Xiubo and Kotresh and a metrics infrastructure rework from
  Luis"

* tag 'ceph-for-5.16-rc1' of git://github.com/ceph/ceph-client:
  ceph: add a new metric to keep track of remote object copies
  libceph, ceph: move ceph_osdc_copy_from() into cephfs code
  ceph: clean-up metrics data structures to reduce code duplication
  ceph: split 'metric' debugfs file into several files
  ceph: return the real size read when it hits EOF
  ceph: properly handle statfs on multifs setups
  ceph: shut down mount on bad mdsmap or fsmap decode
  ceph: fix mdsmap decode when there are MDS's beyond max_mds
  ceph: ignore the truncate when size won't change with Fx caps issued
  ceph: don't rely on error_string to validate blocklisted session.
  ceph: just use ci->i_version for fscache aux info
  ceph: shut down access to inode when async create fails
  ceph: refactor remove_session_caps_cb
  ceph: fix auth cap handling logic in remove_session_caps_cb
  ceph: drop private list from remove_session_caps_cb
  ceph: don't use -ESTALE as special return code in try_get_cap_refs
  ceph: print inode numbers instead of pointer values
  ceph: enable async dirops by default
  libceph: drop ->monmap and err initialization
  ceph: convert to noop_direct_IO
parents a27c0858 c02cb7bd
......@@ -725,7 +725,7 @@ static int ceph_writepages_start(struct address_space *mapping,
wbc->sync_mode == WB_SYNC_NONE ? "NONE" :
(wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD"));
if (READ_ONCE(fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) {
if (ceph_inode_is_shutdown(inode)) {
if (ci->i_wrbuffer_ref > 0) {
pr_warn_ratelimited(
"writepage_start %p %lld forced umount\n",
......@@ -1146,12 +1146,12 @@ static struct ceph_snap_context *
ceph_find_incompatible(struct page *page)
{
struct inode *inode = page->mapping->host;
struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
struct ceph_inode_info *ci = ceph_inode(inode);
if (READ_ONCE(fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) {
dout(" page %p forced umount\n", page);
return ERR_PTR(-EIO);
if (ceph_inode_is_shutdown(inode)) {
dout(" page %p %llx:%llx is shutdown\n", page,
ceph_vinop(inode));
return ERR_PTR(-ESTALE);
}
for (;;) {
......@@ -1312,17 +1312,6 @@ static int ceph_write_end(struct file *file, struct address_space *mapping,
return copied;
}
/*
 * Placeholder for the .direct_IO address space operation.
 *
 * The hook is populated only to signal that direct I/O is supported.
 * O_DIRECT reads and writes are intercepted early, so this function is
 * never expected to run: if it does, warn and fail the request.
 */
static ssize_t ceph_direct_io(struct kiocb *iocb, struct iov_iter *iter)
{
	WARN_ON(1);

	return -EINVAL;
}
const struct address_space_operations ceph_aops = {
.readpage = ceph_readpage,
.readahead = ceph_readahead,
......@@ -1333,7 +1322,7 @@ const struct address_space_operations ceph_aops = {
.set_page_dirty = ceph_set_page_dirty,
.invalidatepage = ceph_invalidatepage,
.releasepage = ceph_releasepage,
.direct_IO = ceph_direct_io,
.direct_IO = noop_direct_IO,
};
static void ceph_block_sigs(sigset_t *oldset)
......@@ -1362,6 +1351,9 @@ static vm_fault_t ceph_filemap_fault(struct vm_fault *vmf)
sigset_t oldset;
vm_fault_t ret = VM_FAULT_SIGBUS;
if (ceph_inode_is_shutdown(inode))
return ret;
ceph_block_sigs(&oldset);
dout("filemap_fault %p %llx.%llx %llu trying to get caps\n",
......@@ -1453,6 +1445,9 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
sigset_t oldset;
vm_fault_t ret = VM_FAULT_SIGBUS;
if (ceph_inode_is_shutdown(inode))
return ret;
prealloc_cf = ceph_alloc_cap_flush();
if (!prealloc_cf)
return VM_FAULT_OOM;
......
......@@ -12,12 +12,6 @@
#include "super.h"
#include "cache.h"
/*
 * Auxiliary data attached to an inode's fscache cookie.
 *
 * The cache code in this file fills it from the inode (i_version and
 * i_mtime) when acquiring the cookie, and rebuilds it in
 * ceph_fscache_inode_check_aux() to memcmp() against the stored copy; any
 * difference marks the cached object obsolete.
 */
struct ceph_aux_inode {
/* copied from ci->i_version */
u64 version;
/* copied from inode->i_mtime.tv_sec */
u64 mtime_sec;
/* copied from inode->i_mtime.tv_nsec */
u64 mtime_nsec;
};
struct fscache_netfs ceph_cache_netfs = {
.name = "ceph",
.version = 0,
......@@ -109,20 +103,14 @@ static enum fscache_checkaux ceph_fscache_inode_check_aux(
void *cookie_netfs_data, const void *data, uint16_t dlen,
loff_t object_size)
{
struct ceph_aux_inode aux;
struct ceph_inode_info* ci = cookie_netfs_data;
struct inode* inode = &ci->vfs_inode;
if (dlen != sizeof(aux) ||
if (dlen != sizeof(ci->i_version) ||
i_size_read(inode) != object_size)
return FSCACHE_CHECKAUX_OBSOLETE;
memset(&aux, 0, sizeof(aux));
aux.version = ci->i_version;
aux.mtime_sec = inode->i_mtime.tv_sec;
aux.mtime_nsec = inode->i_mtime.tv_nsec;
if (memcmp(data, &aux, sizeof(aux)) != 0)
if (*(u64 *)data != ci->i_version)
return FSCACHE_CHECKAUX_OBSOLETE;
dout("ceph inode 0x%p cached okay\n", ci);
......@@ -139,7 +127,6 @@ void ceph_fscache_register_inode_cookie(struct inode *inode)
{
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
struct ceph_aux_inode aux;
/* No caching for filesystem */
if (!fsc->fscache)
......@@ -151,14 +138,10 @@ void ceph_fscache_register_inode_cookie(struct inode *inode)
inode_lock_nested(inode, I_MUTEX_CHILD);
if (!ci->fscache) {
memset(&aux, 0, sizeof(aux));
aux.version = ci->i_version;
aux.mtime_sec = inode->i_mtime.tv_sec;
aux.mtime_nsec = inode->i_mtime.tv_nsec;
ci->fscache = fscache_acquire_cookie(fsc->fscache,
&ceph_fscache_inode_object_def,
&ci->i_vino, sizeof(ci->i_vino),
&aux, sizeof(aux),
&ci->i_version, sizeof(ci->i_version),
ci, i_size_read(inode), false);
}
inode_unlock(inode);
......
......@@ -1188,11 +1188,11 @@ void ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
lockdep_assert_held(&ci->i_ceph_lock);
fsc = ceph_sb_to_client(ci->vfs_inode.i_sb);
fsc = ceph_inode_to_client(&ci->vfs_inode);
WARN_ON_ONCE(ci->i_auth_cap == cap &&
!list_empty(&ci->i_dirty_item) &&
!fsc->blocklisted &&
READ_ONCE(fsc->mount_state) != CEPH_MOUNT_SHUTDOWN);
!ceph_inode_is_shutdown(&ci->vfs_inode));
__ceph_remove_cap(cap, queue_release);
}
......@@ -1968,8 +1968,8 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
}
}
dout("check_caps %p file_want %s used %s dirty %s flushing %s"
" issued %s revoking %s retain %s %s%s\n", inode,
dout("check_caps %llx.%llx file_want %s used %s dirty %s flushing %s"
" issued %s revoking %s retain %s %s%s\n", ceph_vinop(inode),
ceph_cap_string(file_wanted),
ceph_cap_string(used), ceph_cap_string(ci->i_dirty_caps),
ceph_cap_string(ci->i_flushing_caps),
......@@ -1990,7 +1990,8 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
(revoking & (CEPH_CAP_FILE_CACHE|
CEPH_CAP_FILE_LAZYIO)) && /* or revoking cache */
!tried_invalidate) {
dout("check_caps trying to invalidate on %p\n", inode);
dout("check_caps trying to invalidate on %llx.%llx\n",
ceph_vinop(inode));
if (try_nonblocking_invalidate(inode) < 0) {
dout("check_caps queuing invalidate\n");
queue_invalidate = true;
......@@ -2629,9 +2630,9 @@ void ceph_take_cap_refs(struct ceph_inode_info *ci, int got,
*
* Returns 0 if caps were not able to be acquired (yet), 1 if succeed,
* or a negative error code. There are 3 special error codes:
* -EAGAIN: need to sleep but non-blocking is specified
* -EFBIG: ask caller to call check_max_size() and try again.
* -ESTALE: ask caller to call ceph_renew_caps() and try again.
* -EAGAIN: need to sleep but non-blocking is specified
* -EFBIG: ask caller to call check_max_size() and try again.
* -EUCLEAN: ask caller to call ceph_renew_caps() and try again.
*/
enum {
/* first 8 bits are reserved for CEPH_FILE_MODE_FOO */
......@@ -2679,7 +2680,7 @@ static int try_get_cap_refs(struct inode *inode, int need, int want,
dout("get_cap_refs %p endoff %llu > maxsize %llu\n",
inode, endoff, ci->i_max_size);
if (endoff > ci->i_requested_max_size)
ret = ci->i_auth_cap ? -EFBIG : -ESTALE;
ret = ci->i_auth_cap ? -EFBIG : -EUCLEAN;
goto out_unlock;
}
/*
......@@ -2749,9 +2750,9 @@ static int try_get_cap_refs(struct inode *inode, int need, int want,
goto out_unlock;
}
if (READ_ONCE(mdsc->fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) {
dout("get_cap_refs %p forced umount\n", inode);
ret = -EIO;
if (ceph_inode_is_shutdown(inode)) {
dout("get_cap_refs %p inode is shutdown\n", inode);
ret = -ESTALE;
goto out_unlock;
}
mds_wanted = __ceph_caps_mds_wanted(ci, false);
......@@ -2759,7 +2760,7 @@ static int try_get_cap_refs(struct inode *inode, int need, int want,
dout("get_cap_refs %p need %s > mds_wanted %s\n",
inode, ceph_cap_string(need),
ceph_cap_string(mds_wanted));
ret = -ESTALE;
ret = -EUCLEAN;
goto out_unlock;
}
......@@ -2843,7 +2844,7 @@ int ceph_try_get_caps(struct inode *inode, int need, int want,
ret = try_get_cap_refs(inode, need, want, 0, flags, got);
/* three special error codes */
if (ret == -EAGAIN || ret == -EFBIG || ret == -ESTALE)
if (ret == -EAGAIN || ret == -EFBIG || ret == -EUCLEAN)
ret = 0;
return ret;
}
......@@ -2926,7 +2927,7 @@ int ceph_get_caps(struct file *filp, int need, int want, loff_t endoff, int *got
}
if (ret < 0) {
if (ret == -EFBIG || ret == -ESTALE) {
if (ret == -EFBIG || ret == -EUCLEAN) {
int ret2 = ceph_wait_on_async_create(inode);
if (ret2 < 0)
return ret2;
......@@ -2935,7 +2936,7 @@ int ceph_get_caps(struct file *filp, int need, int want, loff_t endoff, int *got
check_max_size(inode, endoff);
continue;
}
if (ret == -ESTALE) {
if (ret == -EUCLEAN) {
/* session was killed, try renew caps */
ret = ceph_renew_caps(inode, flags);
if (ret == 0)
......@@ -4315,7 +4316,7 @@ static void flush_dirty_session_caps(struct ceph_mds_session *s)
i_dirty_item);
inode = &ci->vfs_inode;
ihold(inode);
dout("flush_dirty_caps %p\n", inode);
dout("flush_dirty_caps %llx.%llx\n", ceph_vinop(inode));
spin_unlock(&mdsc->cap_dirty_lock);
ceph_check_caps(ci, CHECK_CAPS_FLUSH, NULL);
iput(inode);
......@@ -4560,3 +4561,119 @@ int ceph_encode_dentry_release(void **p, struct dentry *dentry,
spin_unlock(&dentry->d_lock);
return ret;
}
/*
 * Tear down every cap snap still queued on @inode.
 *
 * Must be called with ci->i_ceph_lock held.  Each entry on ci->i_cap_snaps
 * is removed, its snap context reference is put and the cap snap itself is
 * put.  Waiters on the inode's cap waitqueue and on the MDS client's
 * cap-flushing waitqueue are woken afterwards.
 *
 * Returns the number of cap snaps that were removed.
 */
static int remove_capsnaps(struct ceph_mds_client *mdsc, struct inode *inode)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	int released;

	lockdep_assert_held(&ci->i_ceph_lock);

	dout("removing capsnaps, ci is %p, inode is %p\n", ci, inode);

	for (released = 0; !list_empty(&ci->i_cap_snaps); released++) {
		struct ceph_cap_snap *capsnap =
			list_first_entry(&ci->i_cap_snaps,
					 struct ceph_cap_snap, ci_item);

		__ceph_remove_capsnap(inode, capsnap, NULL, NULL);
		ceph_put_snap_context(capsnap->context);
		ceph_put_cap_snap(capsnap);
	}

	wake_up_all(&ci->i_cap_wq);
	wake_up_all(&mdsc->cap_flushing_wq);

	return released;
}
/*
 * Remove @cap from @inode and, when it was the inode's auth cap, throw away
 * the cap state that depended on it: queued cap flushes, dirty and flushing
 * cap bits, the preallocated cap flush and any remaining cap snaps.
 *
 * The caller must hold ci->i_ceph_lock (asserted below).
 *
 * @invalidate is only ever set to true here (when the inode is shut down
 * and its mapping still has pages); the caller must initialise it.
 *
 * Returns a count of inode references for the caller to drop: the number of
 * cap snaps removed, plus one if dirty or flushing state was discarded.
 * The callers visible alongside this function call iput() that many times
 * after releasing i_ceph_lock.
 */
int ceph_purge_inode_cap(struct inode *inode, struct ceph_cap *cap, bool *invalidate)
{
struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
struct ceph_mds_client *mdsc = fsc->mdsc;
struct ceph_inode_info *ci = ceph_inode(inode);
bool is_auth;
bool dirty_dropped = false;
int iputs = 0;
lockdep_assert_held(&ci->i_ceph_lock);
dout("removing cap %p, ci is %p, inode is %p\n",
cap, ci, &ci->vfs_inode);
/*
 * Sample auth-ness before the cap is removed.
 * NOTE(review): presumably __ceph_remove_cap() clears ci->i_auth_cap,
 * which is why this cannot be checked afterwards -- confirm.
 */
is_auth = (cap == ci->i_auth_cap);
__ceph_remove_cap(cap, false);
if (is_auth) {
struct ceph_cap_flush *cf;
/*
 * On a shut-down inode, ask the caller to invalidate any cached pages
 * and record -EIO on the mapping if write buffers are still referenced.
 */
if (ceph_inode_is_shutdown(inode)) {
if (inode->i_data.nrpages > 0)
*invalidate = true;
if (ci->i_wrbuffer_ref > 0)
mapping_set_error(&inode->i_data, -EIO);
}
/* cap_dirty_lock nests inside i_ceph_lock here */
spin_lock(&mdsc->cap_dirty_lock);
/* trash all of the cap flushes for this inode */
while (!list_empty(&ci->i_cap_flush_list)) {
cf = list_first_entry(&ci->i_cap_flush_list,
struct ceph_cap_flush, i_list);
list_del_init(&cf->g_list);
list_del_init(&cf->i_list);
/*
 * Entries flagged is_capsnap are unlinked but not freed here.
 * NOTE(review): presumably they are owned by their cap snap -- confirm.
 */
if (!cf->is_capsnap)
ceph_free_cap_flush(cf);
}
/* discard dirty caps that were never handed to a flush */
if (!list_empty(&ci->i_dirty_item)) {
pr_warn_ratelimited(
" dropping dirty %s state for %p %lld\n",
ceph_cap_string(ci->i_dirty_caps),
inode, ceph_ino(inode));
ci->i_dirty_caps = 0;
list_del_init(&ci->i_dirty_item);
dirty_dropped = true;
}
/* discard caps whose flush was still in flight */
if (!list_empty(&ci->i_flushing_item)) {
pr_warn_ratelimited(
" dropping dirty+flushing %s state for %p %lld\n",
ceph_cap_string(ci->i_flushing_caps),
inode, ceph_ino(inode));
ci->i_flushing_caps = 0;
list_del_init(&ci->i_flushing_item);
mdsc->num_cap_flushing--;
dirty_dropped = true;
}
spin_unlock(&mdsc->cap_dirty_lock);
if (dirty_dropped) {
/* dropped state is lost for good: report -EIO on the mapping */
mapping_set_error(inode->i_mapping, -EIO);
/*
 * Release the head snap context once no write buffers, write
 * references, dirty caps or flushing caps remain.
 */
if (ci->i_wrbuffer_ref_head == 0 &&
ci->i_wr_ref == 0 &&
ci->i_dirty_caps == 0 &&
ci->i_flushing_caps == 0) {
ceph_put_snap_context(ci->i_head_snapc);
ci->i_head_snapc = NULL;
}
}
if (atomic_read(&ci->i_filelock_ref) > 0) {
/* make further file lock syscall return -EIO */
ci->i_ceph_flags |= CEPH_I_ERROR_FILELOCK;
pr_warn_ratelimited(" dropping file locks for %p %lld\n",
inode, ceph_ino(inode));
}
/* with no dirty caps left, the preallocated cap flush is unused */
if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) {
cf = ci->i_prealloc_cap_flush;
ci->i_prealloc_cap_flush = NULL;
if (!cf->is_capsnap)
ceph_free_cap_flush(cf);
}
/* each removed cap snap adds one reference for the caller to drop */
if (!list_empty(&ci->i_cap_snaps))
iputs = remove_capsnaps(mdsc, inode);
}
/*
 * One more reference to drop when dirty/flushing state was discarded.
 * NOTE(review): presumably the reference taken when the inode was first
 * dirtied -- confirm.
 */
if (dirty_dropped)
++iputs;
return iputs;
}
......@@ -146,82 +146,93 @@ static int mdsc_show(struct seq_file *s, void *p)
name, total, avg, _min, max, sum); \
}
static int metric_show(struct seq_file *s, void *p)
static int metrics_file_show(struct seq_file *s, void *p)
{
struct ceph_fs_client *fsc = s->private;
struct ceph_mds_client *mdsc = fsc->mdsc;
struct ceph_client_metric *m = &mdsc->metric;
int nr_caps = 0;
s64 total, sum, avg, min, max, sq;
u64 sum_sz, avg_sz, min_sz, max_sz;
struct ceph_client_metric *m = &fsc->mdsc->metric;
sum = percpu_counter_sum(&m->total_inodes);
seq_printf(s, "item total\n");
seq_printf(s, "------------------------------------------\n");
seq_printf(s, "%-35s%lld / %lld\n", "opened files / total inodes",
atomic64_read(&m->opened_files), sum);
seq_printf(s, "%-35s%lld / %lld\n", "pinned i_caps / total inodes",
atomic64_read(&m->total_caps), sum);
seq_printf(s, "%-35s%lld / %lld\n", "opened inodes / total inodes",
percpu_counter_sum(&m->opened_inodes), sum);
seq_printf(s, "\n");
seq_printf(s, "%-35s%lld\n", "total inodes",
percpu_counter_sum(&m->total_inodes));
seq_printf(s, "%-35s%lld\n", "opened files",
atomic64_read(&m->opened_files));
seq_printf(s, "%-35s%lld\n", "pinned i_caps",
atomic64_read(&m->total_caps));
seq_printf(s, "%-35s%lld\n", "opened inodes",
percpu_counter_sum(&m->opened_inodes));
return 0;
}
/*
 * Row labels for the per-metric debugfs tables.  The show functions index
 * this array with the same loop counter (0 .. METRIC_MAX - 1) they use for
 * the client's metric[] array.
 * NOTE(review): the order must therefore match the metric index enum
 * (e.g. METRIC_METADATA) -- confirm against its definition.
 */
static const char * const metric_str[] = {
"read",
"write",
"metadata",
"copyfrom"
};
static int metrics_latency_show(struct seq_file *s, void *p)
{
struct ceph_fs_client *fsc = s->private;
struct ceph_client_metric *cm = &fsc->mdsc->metric;
struct ceph_metric *m;
s64 total, sum, avg, min, max, sq;
int i;
seq_printf(s, "item total avg_lat(us) min_lat(us) max_lat(us) stdev(us)\n");
seq_printf(s, "-----------------------------------------------------------------------------------\n");
spin_lock(&m->read_metric_lock);
total = m->total_reads;
sum = m->read_latency_sum;
avg = total > 0 ? DIV64_U64_ROUND_CLOSEST(sum, total) : 0;
min = m->read_latency_min;
max = m->read_latency_max;
sq = m->read_latency_sq_sum;
spin_unlock(&m->read_metric_lock);
CEPH_LAT_METRIC_SHOW("read", total, avg, min, max, sq);
spin_lock(&m->write_metric_lock);
total = m->total_writes;
sum = m->write_latency_sum;
avg = total > 0 ? DIV64_U64_ROUND_CLOSEST(sum, total) : 0;
min = m->write_latency_min;
max = m->write_latency_max;
sq = m->write_latency_sq_sum;
spin_unlock(&m->write_metric_lock);
CEPH_LAT_METRIC_SHOW("write", total, avg, min, max, sq);
spin_lock(&m->metadata_metric_lock);
total = m->total_metadatas;
sum = m->metadata_latency_sum;
avg = total > 0 ? DIV64_U64_ROUND_CLOSEST(sum, total) : 0;
min = m->metadata_latency_min;
max = m->metadata_latency_max;
sq = m->metadata_latency_sq_sum;
spin_unlock(&m->metadata_metric_lock);
CEPH_LAT_METRIC_SHOW("metadata", total, avg, min, max, sq);
seq_printf(s, "\n");
for (i = 0; i < METRIC_MAX; i++) {
m = &cm->metric[i];
spin_lock(&m->lock);
total = m->total;
sum = m->latency_sum;
avg = total > 0 ? DIV64_U64_ROUND_CLOSEST(sum, total) : 0;
min = m->latency_min;
max = m->latency_max;
sq = m->latency_sq_sum;
spin_unlock(&m->lock);
CEPH_LAT_METRIC_SHOW(metric_str[i], total, avg, min, max, sq);
}
return 0;
}
static int metrics_size_show(struct seq_file *s, void *p)
{
struct ceph_fs_client *fsc = s->private;
struct ceph_client_metric *cm = &fsc->mdsc->metric;
struct ceph_metric *m;
s64 total;
u64 sum, avg, min, max;
int i;
seq_printf(s, "item total avg_sz(bytes) min_sz(bytes) max_sz(bytes) total_sz(bytes)\n");
seq_printf(s, "----------------------------------------------------------------------------------------\n");
spin_lock(&m->read_metric_lock);
total = m->total_reads;
sum_sz = m->read_size_sum;
avg_sz = total > 0 ? DIV64_U64_ROUND_CLOSEST(sum_sz, total) : 0;
min_sz = m->read_size_min;
max_sz = m->read_size_max;
spin_unlock(&m->read_metric_lock);
CEPH_SZ_METRIC_SHOW("read", total, avg_sz, min_sz, max_sz, sum_sz);
spin_lock(&m->write_metric_lock);
total = m->total_writes;
sum_sz = m->write_size_sum;
avg_sz = total > 0 ? DIV64_U64_ROUND_CLOSEST(sum_sz, total) : 0;
min_sz = m->write_size_min;
max_sz = m->write_size_max;
spin_unlock(&m->write_metric_lock);
CEPH_SZ_METRIC_SHOW("write", total, avg_sz, min_sz, max_sz, sum_sz);
seq_printf(s, "\n");
for (i = 0; i < METRIC_MAX; i++) {
/* skip 'metadata' as it doesn't use the size metric */
if (i == METRIC_METADATA)
continue;
m = &cm->metric[i];
spin_lock(&m->lock);
total = m->total;
sum = m->size_sum;
avg = total > 0 ? DIV64_U64_ROUND_CLOSEST(sum, total) : 0;
min = m->size_min;
max = m->size_max;
spin_unlock(&m->lock);
CEPH_SZ_METRIC_SHOW(metric_str[i], total, avg, min, max, sum);
}
return 0;
}
static int metrics_caps_show(struct seq_file *s, void *p)
{
struct ceph_fs_client *fsc = s->private;
struct ceph_client_metric *m = &fsc->mdsc->metric;
int nr_caps = 0;
seq_printf(s, "item total miss hit\n");
seq_printf(s, "-------------------------------------------------\n");
......@@ -350,8 +361,11 @@ DEFINE_SHOW_ATTRIBUTE(mdsmap);
DEFINE_SHOW_ATTRIBUTE(mdsc);
DEFINE_SHOW_ATTRIBUTE(caps);
DEFINE_SHOW_ATTRIBUTE(mds_sessions);
DEFINE_SHOW_ATTRIBUTE(metric);
DEFINE_SHOW_ATTRIBUTE(status);
DEFINE_SHOW_ATTRIBUTE(metrics_file);
DEFINE_SHOW_ATTRIBUTE(metrics_latency);
DEFINE_SHOW_ATTRIBUTE(metrics_size);
DEFINE_SHOW_ATTRIBUTE(metrics_caps);
/*
......@@ -385,8 +399,9 @@ void ceph_fs_debugfs_cleanup(struct ceph_fs_client *fsc)
debugfs_remove(fsc->debugfs_mdsmap);
debugfs_remove(fsc->debugfs_mds_sessions);
debugfs_remove(fsc->debugfs_caps);
debugfs_remove(fsc->debugfs_metric);
debugfs_remove(fsc->debugfs_status);
debugfs_remove(fsc->debugfs_mdsc);
debugfs_remove_recursive(fsc->debugfs_metrics_dir);
}
void ceph_fs_debugfs_init(struct ceph_fs_client *fsc)
......@@ -426,12 +441,6 @@ void ceph_fs_debugfs_init(struct ceph_fs_client *fsc)
fsc,
&mdsc_fops);
fsc->debugfs_metric = debugfs_create_file("metrics",
0400,
fsc->client->debugfs_dir,
fsc,
&metric_fops);
fsc->debugfs_caps = debugfs_create_file("caps",
0400,
fsc->client->debugfs_dir,
......@@ -443,6 +452,18 @@ void ceph_fs_debugfs_init(struct ceph_fs_client *fsc)
fsc->client->debugfs_dir,
fsc,
&status_fops);
fsc->debugfs_metrics_dir = debugfs_create_dir("metrics",
fsc->client->debugfs_dir);
debugfs_create_file("file", 0400, fsc->debugfs_metrics_dir, fsc,
&metrics_file_fops);
debugfs_create_file("latency", 0400, fsc->debugfs_metrics_dir, fsc,
&metrics_latency_fops);
debugfs_create_file("size", 0400, fsc->debugfs_metrics_dir, fsc,
&metrics_size_fops);
debugfs_create_file("caps", 0400, fsc->debugfs_metrics_dir, fsc,
&metrics_caps_fops);
}
......
......@@ -157,6 +157,11 @@ static struct inode *__lookup_inode(struct super_block *sb, u64 ino)
ceph_mdsc_put_request(req);
if (!inode)
return err < 0 ? ERR_PTR(err) : ERR_PTR(-ESTALE);
} else {
if (ceph_inode_is_shutdown(inode)) {
iput(inode);
return ERR_PTR(-ESTALE);
}
}
return inode;
}
......@@ -223,8 +228,13 @@ static struct dentry *__snapfh_to_dentry(struct super_block *sb,
return ERR_PTR(-ESTALE);
inode = ceph_find_inode(sb, vino);
if (inode)
if (inode) {
if (ceph_inode_is_shutdown(inode)) {
iput(inode);
return ERR_PTR(-ESTALE);
}
return d_obtain_alias(inode);
}
req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUPINO,
USE_ANY_MDS);
......
......@@ -525,6 +525,7 @@ static void ceph_async_create_cb(struct ceph_mds_client *mdsc,
if (result) {
struct dentry *dentry = req->r_dentry;
struct inode *inode = d_inode(dentry);
int pathlen = 0;
u64 base = 0;
char *path = ceph_mdsc_build_path(req->r_dentry, &pathlen,
......@@ -534,7 +535,8 @@ static void ceph_async_create_cb(struct ceph_mds_client *mdsc,
if (!d_unhashed(dentry))
d_drop(dentry);
/* FIXME: start returning I/O errors on all accesses? */
ceph_inode_shutdown(inode);
pr_warn("ceph: async create failure path=(%llx)%s result=%d!\n",
base, IS_ERR(path) ? "<<bad>>" : path, result);
ceph_mdsc_free_path(path, pathlen);
......@@ -556,7 +558,7 @@ static void ceph_async_create_cb(struct ceph_mds_client *mdsc,
}
ceph_kick_flushing_inode_caps(req->r_session, ci);
spin_unlock(&ci->i_ceph_lock);
} else {
} else if (!result) {
pr_warn("%s: no req->r_target_inode for 0x%llx\n", __func__,
req->r_deleg_ino);
}
......@@ -845,6 +847,7 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
ssize_t ret;
u64 off = iocb->ki_pos;
u64 len = iov_iter_count(to);
u64 i_size;
dout("sync_read on file %p %llu~%u %s\n", file, off, (unsigned)len,
(file->f_flags & O_DIRECT) ? "O_DIRECT" : "");
......@@ -868,7 +871,6 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
struct page **pages;
int num_pages;
size_t page_off;
u64 i_size;
bool more;
int idx;
size_t left;
......@@ -951,11 +953,14 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
}
if (off > iocb->ki_pos) {
if (ret >= 0 &&
iov_iter_count(to) > 0 && off >= i_size_read(inode))
if (off >= i_size) {
*retry_op = CHECK_EOF;
ret = off - iocb->ki_pos;
iocb->ki_pos = off;
ret = i_size - iocb->ki_pos;
iocb->ki_pos = i_size;
} else {
ret = off - iocb->ki_pos;
iocb->ki_pos = off;
}
}
dout("sync_read result %zd retry_op %d\n", ret, *retry_op);
......@@ -1526,6 +1531,9 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n",
inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, inode);
if (ceph_inode_is_shutdown(inode))
return -ESTALE;
if (direct_lock)
ceph_start_io_direct(inode);
else
......@@ -1678,6 +1686,9 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
loff_t pos;
loff_t limit = max(i_size_read(inode), fsc->max_file_size);
if (ceph_inode_is_shutdown(inode))
return -ESTALE;
if (ceph_snap(inode) != CEPH_NOSNAP)
return -EROFS;
......@@ -2200,6 +2211,54 @@ static int is_file_size_ok(struct inode *src_inode, struct inode *dst_inode,
return 0;
}
/*
 * Allocate and prepare an OSD write request carrying a single copy-from
 * operation that copies the object @src_oid/@src_oloc at @src_snapid into
 * @dst_oid/@dst_oloc.
 *
 * The request is returned ready to be submitted; it is not started here.
 *
 * Returns the request on success, or an ERR_PTR(): -ENOMEM when the request
 * itself cannot be allocated, otherwise the error reported by
 * osd_req_op_copy_from_init() or ceph_osdc_alloc_messages().  On those later
 * failures the request reference is put before returning.
 */
static struct ceph_osd_request *
ceph_alloc_copyfrom_request(struct ceph_osd_client *osdc,
			    u64 src_snapid,
			    struct ceph_object_id *src_oid,
			    struct ceph_object_locator *src_oloc,
			    struct ceph_object_id *dst_oid,
			    struct ceph_object_locator *dst_oloc,
			    u32 truncate_seq, u64 truncate_size)
{
	const u32 src_hints = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
			      CEPH_OSD_OP_FLAG_FADVISE_NOCACHE;
	const u32 dst_hints = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
			      CEPH_OSD_OP_FLAG_FADVISE_DONTNEED;
	struct ceph_osd_request *req;
	int ret;

	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
	if (!req)
		return ERR_PTR(-ENOMEM);

	/* the request targets the destination object */
	req->r_flags = CEPH_OSD_FLAG_WRITE;
	ceph_oloc_copy(&req->r_t.base_oloc, dst_oloc);
	ceph_oid_copy(&req->r_t.base_oid, dst_oid);

	ret = osd_req_op_copy_from_init(req, src_snapid, 0,
					src_oid, src_oloc,
					src_hints, dst_hints,
					truncate_seq, truncate_size,
					CEPH_OSD_COPY_FROM_FLAG_TRUNCATE_SEQ);
	if (!ret)
		ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);

	if (ret) {
		ceph_osdc_put_request(req);
		return ERR_PTR(ret);
	}

	return req;
}
static ssize_t ceph_do_objects_copy(struct ceph_inode_info *src_ci, u64 *src_off,
struct ceph_inode_info *dst_ci, u64 *dst_off,
struct ceph_fs_client *fsc,
......@@ -2207,6 +2266,8 @@ static ssize_t ceph_do_objects_copy(struct ceph_inode_info *src_ci, u64 *src_off
{
struct ceph_object_locator src_oloc, dst_oloc;
struct ceph_object_id src_oid, dst_oid;
struct ceph_osd_client *osdc;
struct ceph_osd_request *req;
size_t bytes = 0;
u64 src_objnum, src_objoff, dst_objnum, dst_objoff;
u32 src_objlen, dst_objlen;
......@@ -2217,6 +2278,7 @@ static ssize_t ceph_do_objects_copy(struct ceph_inode_info *src_ci, u64 *src_off
src_oloc.pool_ns = ceph_try_get_string(src_ci->i_layout.pool_ns);
dst_oloc.pool = dst_ci->i_layout.pool_id;
dst_oloc.pool_ns = ceph_try_get_string(dst_ci->i_layout.pool_ns);
osdc = &fsc->client->osdc;
while (len >= object_size) {
ceph_calc_file_object_mapping(&src_ci->i_layout, *src_off,
......@@ -2232,17 +2294,22 @@ static ssize_t ceph_do_objects_copy(struct ceph_inode_info *src_ci, u64 *src_off
ceph_oid_printf(&dst_oid, "%llx.%08llx",
dst_ci->i_vino.ino, dst_objnum);
/* Do an object remote copy */
ret = ceph_osdc_copy_from(&fsc->client->osdc,
src_ci->i_vino.snap, 0,
&src_oid, &src_oloc,
CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
CEPH_OSD_OP_FLAG_FADVISE_NOCACHE,
&dst_oid, &dst_oloc,
CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
CEPH_OSD_OP_FLAG_FADVISE_DONTNEED,
dst_ci->i_truncate_seq,
dst_ci->i_truncate_size,
CEPH_OSD_COPY_FROM_FLAG_TRUNCATE_SEQ);
req = ceph_alloc_copyfrom_request(osdc, src_ci->i_vino.snap,
&src_oid, &src_oloc,
&dst_oid, &dst_oloc,
dst_ci->i_truncate_seq,
dst_ci->i_truncate_size);
if (IS_ERR(req))
ret = PTR_ERR(req);
else {
ceph_osdc_start_request(osdc, req, false);
ret = ceph_osdc_wait_request(osdc, req);
ceph_update_copyfrom_metrics(&fsc->mdsc->metric,
req->r_start_latency,
req->r_end_latency,
object_size, ret);
ceph_osdc_put_request(req);
}
if (ret) {
if (ret == -EOPNOTSUPP) {
fsc->have_copy_from2 = false;
......
......@@ -1841,15 +1841,14 @@ void ceph_queue_inode_work(struct inode *inode, int work_bit)
static void ceph_do_invalidate_pages(struct inode *inode)
{
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
u32 orig_gen;
int check = 0;
mutex_lock(&ci->i_truncate_mutex);
if (READ_ONCE(fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) {
pr_warn_ratelimited("invalidate_pages %p %lld forced umount\n",
inode, ceph_ino(inode));
if (ceph_inode_is_shutdown(inode)) {
pr_warn_ratelimited("%s: inode %llx.%llx is shut down\n",
__func__, ceph_vinop(inode));
mapping_set_error(inode->i_mapping, -EIO);
truncate_pagecache(inode, 0);
mutex_unlock(&ci->i_truncate_mutex);
......@@ -1871,7 +1870,8 @@ static void ceph_do_invalidate_pages(struct inode *inode)
ceph_fscache_invalidate(inode);
if (invalidate_inode_pages2(inode->i_mapping) < 0) {
pr_err("invalidate_pages %p fails\n", inode);
pr_err("invalidate_inode_pages2 %llx.%llx failed\n",
ceph_vinop(inode));
}
spin_lock(&ci->i_ceph_lock);
......@@ -2103,12 +2103,14 @@ int __ceph_setattr(struct inode *inode, struct iattr *attr)
loff_t isize = i_size_read(inode);
dout("setattr %p size %lld -> %lld\n", inode, isize, attr->ia_size);
if ((issued & CEPH_CAP_FILE_EXCL) && attr->ia_size > isize) {
i_size_write(inode, attr->ia_size);
inode->i_blocks = calc_inode_blocks(attr->ia_size);
ci->i_reported_size = attr->ia_size;
dirtied |= CEPH_CAP_FILE_EXCL;
ia_valid |= ATTR_MTIME;
if ((issued & CEPH_CAP_FILE_EXCL) && attr->ia_size >= isize) {
if (attr->ia_size > isize) {
i_size_write(inode, attr->ia_size);
inode->i_blocks = calc_inode_blocks(attr->ia_size);
ci->i_reported_size = attr->ia_size;
dirtied |= CEPH_CAP_FILE_EXCL;
ia_valid |= ATTR_MTIME;
}
} else if ((issued & CEPH_CAP_FILE_SHARED) == 0 ||
attr->ia_size != isize) {
req->r_args.setattr.size = cpu_to_le64(attr->ia_size);
......@@ -2217,6 +2219,9 @@ int ceph_setattr(struct user_namespace *mnt_userns, struct dentry *dentry,
if (ceph_snap(inode) != CEPH_NOSNAP)
return -EROFS;
if (ceph_inode_is_shutdown(inode))
return -ESTALE;
err = setattr_prepare(&init_user_ns, dentry, attr);
if (err != 0)
return err;
......@@ -2347,6 +2352,9 @@ int ceph_getattr(struct user_namespace *mnt_userns, const struct path *path,
u32 valid_mask = STATX_BASIC_STATS;
int err = 0;
if (ceph_inode_is_shutdown(inode))
return -ESTALE;
/* Skip the getattr altogether if we're asked not to sync */
if (!(flags & AT_STATX_DONT_SYNC)) {
err = ceph_do_getattr(inode,
......@@ -2394,3 +2402,27 @@ int ceph_getattr(struct user_namespace *mnt_userns, const struct path *path,
stat->result_mask = request_mask & valid_mask;
return err;
}
/*
 * Shut down @inode: flag it CEPH_I_SHUTDOWN and purge every cap it holds.
 *
 * The flag is set under i_ceph_lock before the caps are walked, and each
 * cap in ci->i_caps is passed to ceph_purge_inode_cap().  After the lock is
 * released, a page invalidation is queued if the purge asked for one, and
 * the inode references reported by the purge are dropped.
 */
void ceph_inode_shutdown(struct inode *inode)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct rb_node *node, *next;
	bool invalidate = false;
	int iputs = 0;

	spin_lock(&ci->i_ceph_lock);
	ci->i_ceph_flags |= CEPH_I_SHUTDOWN;
	for (node = rb_first(&ci->i_caps); node; node = next) {
		struct ceph_cap *cap = rb_entry(node, struct ceph_cap, ci_node);

		/* look up the successor before this cap is purged */
		next = rb_next(node);
		iputs += ceph_purge_inode_cap(inode, cap, &invalidate);
	}
	spin_unlock(&ci->i_ceph_lock);

	if (invalidate)
		ceph_queue_invalidate(inode);

	while (iputs--)
		iput(inode);
}
......@@ -241,6 +241,9 @@ int ceph_lock(struct file *file, int cmd, struct file_lock *fl)
if (!(fl->fl_flags & FL_POSIX))
return -ENOLCK;
if (ceph_inode_is_shutdown(inode))
return -ESTALE;
dout("ceph_lock, fl_owner: %p\n", fl->fl_owner);
/* set wait bit as appropriate, then make command as Ceph expects it*/
......@@ -303,6 +306,9 @@ int ceph_flock(struct file *file, int cmd, struct file_lock *fl)
if (!(fl->fl_flags & FL_FLOCK))
return -ENOLCK;
if (ceph_inode_is_shutdown(inode))
return -ESTALE;
dout("ceph_flock, fl_file: %p\n", fl->fl_file);
spin_lock(&ci->i_ceph_lock);
......
......@@ -1590,129 +1590,23 @@ int ceph_iterate_session_caps(struct ceph_mds_session *session,
return ret;
}
/*
 * Tear down every cap snap still queued on @inode.
 *
 * Must be called with ci->i_ceph_lock held.  Each entry on ci->i_cap_snaps
 * is removed, its snap context reference is put and the cap snap itself is
 * put.  Waiters on the inode's cap waitqueue and on the MDS client's
 * cap-flushing waitqueue are woken afterwards.
 *
 * Returns the number of cap snaps that were removed.
 */
static int remove_capsnaps(struct ceph_mds_client *mdsc, struct inode *inode)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	int released;

	lockdep_assert_held(&ci->i_ceph_lock);

	dout("removing capsnaps, ci is %p, inode is %p\n", ci, inode);

	for (released = 0; !list_empty(&ci->i_cap_snaps); released++) {
		struct ceph_cap_snap *capsnap =
			list_first_entry(&ci->i_cap_snaps,
					 struct ceph_cap_snap, ci_item);

		__ceph_remove_capsnap(inode, capsnap, NULL, NULL);
		ceph_put_snap_context(capsnap->context);
		ceph_put_cap_snap(capsnap);
	}

	wake_up_all(&ci->i_cap_wq);
	wake_up_all(&mdsc->cap_flushing_wq);

	return released;
}
static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
void *arg)
{
struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg;
struct ceph_mds_client *mdsc = fsc->mdsc;
struct ceph_inode_info *ci = ceph_inode(inode);
LIST_HEAD(to_remove);
bool dirty_dropped = false;
bool invalidate = false;
int capsnap_release = 0;
int iputs;
dout("removing cap %p, ci is %p, inode is %p\n",
cap, ci, &ci->vfs_inode);
spin_lock(&ci->i_ceph_lock);
__ceph_remove_cap(cap, false);
if (!ci->i_auth_cap) {
struct ceph_cap_flush *cf;
if (READ_ONCE(fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) {
if (inode->i_data.nrpages > 0)
invalidate = true;
if (ci->i_wrbuffer_ref > 0)
mapping_set_error(&inode->i_data, -EIO);
}
while (!list_empty(&ci->i_cap_flush_list)) {
cf = list_first_entry(&ci->i_cap_flush_list,
struct ceph_cap_flush, i_list);
list_move(&cf->i_list, &to_remove);
}
spin_lock(&mdsc->cap_dirty_lock);
list_for_each_entry(cf, &to_remove, i_list)
list_del_init(&cf->g_list);
if (!list_empty(&ci->i_dirty_item)) {
pr_warn_ratelimited(
" dropping dirty %s state for %p %lld\n",
ceph_cap_string(ci->i_dirty_caps),
inode, ceph_ino(inode));
ci->i_dirty_caps = 0;
list_del_init(&ci->i_dirty_item);
dirty_dropped = true;
}
if (!list_empty(&ci->i_flushing_item)) {
pr_warn_ratelimited(
" dropping dirty+flushing %s state for %p %lld\n",
ceph_cap_string(ci->i_flushing_caps),
inode, ceph_ino(inode));
ci->i_flushing_caps = 0;
list_del_init(&ci->i_flushing_item);
mdsc->num_cap_flushing--;
dirty_dropped = true;
}
spin_unlock(&mdsc->cap_dirty_lock);
if (dirty_dropped) {
mapping_set_error(inode->i_mapping, -EIO);
if (ci->i_wrbuffer_ref_head == 0 &&
ci->i_wr_ref == 0 &&
ci->i_dirty_caps == 0 &&
ci->i_flushing_caps == 0) {
ceph_put_snap_context(ci->i_head_snapc);
ci->i_head_snapc = NULL;
}
}
if (atomic_read(&ci->i_filelock_ref) > 0) {
/* make further file lock syscall return -EIO */
ci->i_ceph_flags |= CEPH_I_ERROR_FILELOCK;
pr_warn_ratelimited(" dropping file locks for %p %lld\n",
inode, ceph_ino(inode));
}
if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) {
list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove);
ci->i_prealloc_cap_flush = NULL;
}
if (!list_empty(&ci->i_cap_snaps))
capsnap_release = remove_capsnaps(mdsc, inode);
}
iputs = ceph_purge_inode_cap(inode, cap, &invalidate);
spin_unlock(&ci->i_ceph_lock);
while (!list_empty(&to_remove)) {
struct ceph_cap_flush *cf;
cf = list_first_entry(&to_remove,
struct ceph_cap_flush, i_list);
list_del_init(&cf->i_list);
if (!cf->is_capsnap)
ceph_free_cap_flush(cf);
}
wake_up_all(&ci->i_cap_wq);
if (invalidate)
ceph_queue_invalidate(inode);
if (dirty_dropped)
iput(inode);
while (capsnap_release--)
while (iputs--)
iput(inode);
return 0;
}
......@@ -3467,9 +3361,14 @@ static void handle_session(struct ceph_mds_session *session,
if (msg_version >= 3) {
u32 len;
/* version >= 2, metadata */
if (__decode_session_metadata(&p, end, &blocklisted) < 0)
/* version >= 2 and < 5, decode metadata, skip otherwise
* as it's handled via flags.
*/
if (msg_version >= 5)
ceph_decode_skip_map(&p, end, string, string, bad);
else if (__decode_session_metadata(&p, end, &blocklisted) < 0)
goto bad;
/* version >= 3, feature bits */
ceph_decode_32_safe(&p, end, len, bad);
if (len) {
......@@ -3478,6 +3377,18 @@ static void handle_session(struct ceph_mds_session *session,
}
}
if (msg_version >= 5) {
u32 flags;
/* version >= 4, struct_v, struct_cv, len, metric_spec */
ceph_decode_skip_n(&p, end, 2 + sizeof(u32) * 2, bad);
/* version >= 5, flags */
ceph_decode_32_safe(&p, end, flags, bad);
if (flags & CEPH_SESSION_BLOCKLISTED) {
pr_warn("mds%d session blocklisted\n", session->s_mds);
blocklisted = true;
}
}
mutex_lock(&mdsc->mutex);
if (op == CEPH_SESSION_CLOSE) {
ceph_get_mds_session(session);
......@@ -5072,7 +4983,8 @@ void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
return;
bad:
pr_err("error decoding fsmap\n");
pr_err("error decoding fsmap %d. Shutting down mount.\n", err);
ceph_umount_begin(mdsc->fsc->sb);
err_out:
mutex_lock(&mdsc->mutex);
mdsc->mdsmap_err = err;
......@@ -5139,7 +5051,8 @@ void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
bad_unlock:
mutex_unlock(&mdsc->mutex);
bad:
pr_err("error decoding mdsmap %d\n", err);
pr_err("error decoding mdsmap %d. Shutting down mount.\n", err);
ceph_umount_begin(mdsc->fsc->sb);
return;
}
......
......@@ -263,10 +263,6 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end, bool msgr2)
goto nomem;
for (j = 0; j < num_export_targets; j++) {
target = ceph_decode_32(&pexport_targets);
if (target >= m->possible_max_rank) {
err = -EIO;
goto corrupt;
}
info->export_targets[j] = target;
}
} else {
......
......@@ -62,7 +62,7 @@ static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc,
read->header.ver = 1;
read->header.compat = 1;
read->header.data_len = cpu_to_le32(sizeof(*read) - header_len);
sum = m->read_latency_sum;
sum = m->metric[METRIC_READ].latency_sum;
jiffies_to_timespec64(sum, &ts);
read->sec = cpu_to_le32(ts.tv_sec);
read->nsec = cpu_to_le32(ts.tv_nsec);
......@@ -74,7 +74,7 @@ static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc,
write->header.ver = 1;
write->header.compat = 1;
write->header.data_len = cpu_to_le32(sizeof(*write) - header_len);
sum = m->write_latency_sum;
sum = m->metric[METRIC_WRITE].latency_sum;
jiffies_to_timespec64(sum, &ts);
write->sec = cpu_to_le32(ts.tv_sec);
write->nsec = cpu_to_le32(ts.tv_nsec);
......@@ -86,7 +86,7 @@ static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc,
meta->header.ver = 1;
meta->header.compat = 1;
meta->header.data_len = cpu_to_le32(sizeof(*meta) - header_len);
sum = m->metadata_latency_sum;
sum = m->metric[METRIC_METADATA].latency_sum;
jiffies_to_timespec64(sum, &ts);
meta->sec = cpu_to_le32(ts.tv_sec);
meta->nsec = cpu_to_le32(ts.tv_nsec);
......@@ -141,8 +141,8 @@ static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc,
rsize->header.ver = 1;
rsize->header.compat = 1;
rsize->header.data_len = cpu_to_le32(sizeof(*rsize) - header_len);
rsize->total_ops = cpu_to_le64(m->total_reads);
rsize->total_size = cpu_to_le64(m->read_size_sum);
rsize->total_ops = cpu_to_le64(m->metric[METRIC_READ].total);
rsize->total_size = cpu_to_le64(m->metric[METRIC_READ].size_sum);
items++;
/* encode the write io size metric */
......@@ -151,8 +151,8 @@ static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc,
wsize->header.ver = 1;
wsize->header.compat = 1;
wsize->header.data_len = cpu_to_le32(sizeof(*wsize) - header_len);
wsize->total_ops = cpu_to_le64(m->total_writes);
wsize->total_size = cpu_to_le64(m->write_size_sum);
wsize->total_ops = cpu_to_le64(m->metric[METRIC_WRITE].total);
wsize->total_size = cpu_to_le64(m->metric[METRIC_WRITE].size_sum);
items++;
put_unaligned_le32(items, &head->num);
......@@ -220,7 +220,8 @@ static void metric_delayed_work(struct work_struct *work)
int ceph_metric_init(struct ceph_client_metric *m)
{
int ret;
struct ceph_metric *metric;
int ret, i;
if (!m)
return -EINVAL;
......@@ -243,32 +244,18 @@ int ceph_metric_init(struct ceph_client_metric *m)
if (ret)
goto err_i_caps_mis;
spin_lock_init(&m->read_metric_lock);
m->read_latency_sq_sum = 0;
m->read_latency_min = KTIME_MAX;
m->read_latency_max = 0;
m->total_reads = 0;
m->read_latency_sum = 0;
m->read_size_min = U64_MAX;
m->read_size_max = 0;
m->read_size_sum = 0;
spin_lock_init(&m->write_metric_lock);
m->write_latency_sq_sum = 0;
m->write_latency_min = KTIME_MAX;
m->write_latency_max = 0;
m->total_writes = 0;
m->write_latency_sum = 0;
m->write_size_min = U64_MAX;
m->write_size_max = 0;
m->write_size_sum = 0;
spin_lock_init(&m->metadata_metric_lock);
m->metadata_latency_sq_sum = 0;
m->metadata_latency_min = KTIME_MAX;
m->metadata_latency_max = 0;
m->total_metadatas = 0;
m->metadata_latency_sum = 0;
for (i = 0; i < METRIC_MAX; i++) {
metric = &m->metric[i];
spin_lock_init(&metric->lock);
metric->size_sum = 0;
metric->size_min = U64_MAX;
metric->size_max = 0;
metric->total = 0;
metric->latency_sum = 0;
metric->latency_sq_sum = 0;
metric->latency_min = KTIME_MAX;
metric->latency_max = 0;
}
atomic64_set(&m->opened_files, 0);
ret = percpu_counter_init(&m->opened_inodes, 0, GFP_KERNEL);
......@@ -338,9 +325,9 @@ static inline void __update_stdev(ktime_t total, ktime_t lsum,
*sq_sump += sq;
}
void ceph_update_read_metrics(struct ceph_client_metric *m,
ktime_t r_start, ktime_t r_end,
unsigned int size, int rc)
void ceph_update_metrics(struct ceph_metric *m,
ktime_t r_start, ktime_t r_end,
unsigned int size, int rc)
{
ktime_t lat = ktime_sub(r_end, r_start);
ktime_t total;
......@@ -348,63 +335,12 @@ void ceph_update_read_metrics(struct ceph_client_metric *m,
if (unlikely(rc < 0 && rc != -ENOENT && rc != -ETIMEDOUT))
return;
spin_lock(&m->read_metric_lock);
total = ++m->total_reads;
m->read_size_sum += size;
m->read_latency_sum += lat;
METRIC_UPDATE_MIN_MAX(m->read_size_min,
m->read_size_max,
size);
METRIC_UPDATE_MIN_MAX(m->read_latency_min,
m->read_latency_max,
lat);
__update_stdev(total, m->read_latency_sum,
&m->read_latency_sq_sum, lat);
spin_unlock(&m->read_metric_lock);
}
void ceph_update_write_metrics(struct ceph_client_metric *m,
ktime_t r_start, ktime_t r_end,
unsigned int size, int rc)
{
ktime_t lat = ktime_sub(r_end, r_start);
ktime_t total;
if (unlikely(rc && rc != -ETIMEDOUT))
return;
spin_lock(&m->write_metric_lock);
total = ++m->total_writes;
m->write_size_sum += size;
m->write_latency_sum += lat;
METRIC_UPDATE_MIN_MAX(m->write_size_min,
m->write_size_max,
size);
METRIC_UPDATE_MIN_MAX(m->write_latency_min,
m->write_latency_max,
lat);
__update_stdev(total, m->write_latency_sum,
&m->write_latency_sq_sum, lat);
spin_unlock(&m->write_metric_lock);
}
void ceph_update_metadata_metrics(struct ceph_client_metric *m,
ktime_t r_start, ktime_t r_end,
int rc)
{
ktime_t lat = ktime_sub(r_end, r_start);
ktime_t total;
if (unlikely(rc && rc != -ENOENT))
return;
spin_lock(&m->metadata_metric_lock);
total = ++m->total_metadatas;
m->metadata_latency_sum += lat;
METRIC_UPDATE_MIN_MAX(m->metadata_latency_min,
m->metadata_latency_max,
lat);
__update_stdev(total, m->metadata_latency_sum,
&m->metadata_latency_sq_sum, lat);
spin_unlock(&m->metadata_metric_lock);
spin_lock(&m->lock);
total = ++m->total;
m->size_sum += size;
METRIC_UPDATE_MIN_MAX(m->size_min, m->size_max, size);
m->latency_sum += lat;
METRIC_UPDATE_MIN_MAX(m->latency_min, m->latency_max, lat);
__update_stdev(total, m->latency_sum, &m->latency_sq_sum, lat);
spin_unlock(&m->lock);
}
......@@ -125,6 +125,26 @@ struct ceph_metric_head {
__le32 num; /* the number of metrics that will be sent */
} __packed;
/*
 * Selects one slot of the metric[] array embedded in
 * struct ceph_client_metric. The ceph_update_*_metrics() inline helpers
 * each pass the matching slot to ceph_update_metrics().
 */
enum metric_type {
METRIC_READ, /* slot used by ceph_update_read_metrics() */
METRIC_WRITE, /* slot used by ceph_update_write_metrics() */
METRIC_METADATA, /* slot used by ceph_update_metadata_metrics() */
METRIC_COPYFROM, /* slot used by ceph_update_copyfrom_metrics() */
METRIC_MAX /* not a metric: slot count, used to size and walk metric[] */
};
/*
 * Running statistics for one kind of operation (one enum metric_type slot).
 * ceph_metric_init() sets the initial values and ceph_update_metrics()
 * modifies all of the counters below while holding @lock.
 */
struct ceph_metric {
spinlock_t lock; /* taken by ceph_update_metrics() around every update */
u64 total; /* incremented once per recorded operation */
u64 size_sum; /* accumulated 'size' argument of every recorded operation */
u64 size_min; /* maintained via METRIC_UPDATE_MIN_MAX(); starts at U64_MAX */
u64 size_max; /* maintained via METRIC_UPDATE_MIN_MAX(); starts at 0 */
ktime_t latency_sum; /* accumulated (r_end - r_start) of every recorded operation */
ktime_t latency_sq_sum; /* maintained by __update_stdev(); starts at 0 */
ktime_t latency_min; /* maintained via METRIC_UPDATE_MIN_MAX(); starts at KTIME_MAX */
ktime_t latency_max; /* maintained via METRIC_UPDATE_MIN_MAX(); starts at 0 */
};
/* This is the global metrics */
struct ceph_client_metric {
atomic64_t total_dentries;
......@@ -135,32 +155,7 @@ struct ceph_client_metric {
struct percpu_counter i_caps_hit;
struct percpu_counter i_caps_mis;
spinlock_t read_metric_lock;
u64 total_reads;
u64 read_size_sum;
u64 read_size_min;
u64 read_size_max;
ktime_t read_latency_sum;
ktime_t read_latency_sq_sum;
ktime_t read_latency_min;
ktime_t read_latency_max;
spinlock_t write_metric_lock;
u64 total_writes;
u64 write_size_sum;
u64 write_size_min;
u64 write_size_max;
ktime_t write_latency_sum;
ktime_t write_latency_sq_sum;
ktime_t write_latency_min;
ktime_t write_latency_max;
spinlock_t metadata_metric_lock;
u64 total_metadatas;
ktime_t metadata_latency_sum;
ktime_t metadata_latency_sq_sum;
ktime_t metadata_latency_min;
ktime_t metadata_latency_max;
struct ceph_metric metric[METRIC_MAX];
/* The total number of directories and files that are opened */
atomic64_t opened_files;
......@@ -195,13 +190,36 @@ static inline void ceph_update_cap_mis(struct ceph_client_metric *m)
percpu_counter_inc(&m->i_caps_mis);
}
extern void ceph_update_read_metrics(struct ceph_client_metric *m,
ktime_t r_start, ktime_t r_end,
unsigned int size, int rc);
extern void ceph_update_write_metrics(struct ceph_client_metric *m,
ktime_t r_start, ktime_t r_end,
unsigned int size, int rc);
extern void ceph_update_metadata_metrics(struct ceph_client_metric *m,
ktime_t r_start, ktime_t r_end,
int rc);
extern void ceph_update_metrics(struct ceph_metric *m,
ktime_t r_start, ktime_t r_end,
unsigned int size, int rc);
/*
 * Account one read operation.
 *
 * Picks the METRIC_READ slot of @m and forwards it, together with the
 * start/end timestamps, the I/O size and the result code, to
 * ceph_update_metrics().
 */
static inline void ceph_update_read_metrics(struct ceph_client_metric *m,
					    ktime_t r_start, ktime_t r_end,
					    unsigned int size, int rc)
{
	struct ceph_metric *read_metric = &m->metric[METRIC_READ];

	ceph_update_metrics(read_metric, r_start, r_end, size, rc);
}
/*
 * Account one write operation.
 *
 * Picks the METRIC_WRITE slot of @m and forwards it, together with the
 * start/end timestamps, the I/O size and the result code, to
 * ceph_update_metrics().
 */
static inline void ceph_update_write_metrics(struct ceph_client_metric *m,
					     ktime_t r_start, ktime_t r_end,
					     unsigned int size, int rc)
{
	struct ceph_metric *write_metric = &m->metric[METRIC_WRITE];

	ceph_update_metrics(write_metric, r_start, r_end, size, rc);
}
/*
 * Account one metadata operation.
 *
 * Picks the METRIC_METADATA slot of @m and forwards it to
 * ceph_update_metrics(). This helper takes no size argument, so a size
 * of 0 is always passed on.
 */
static inline void ceph_update_metadata_metrics(struct ceph_client_metric *m,
						ktime_t r_start, ktime_t r_end,
						int rc)
{
	struct ceph_metric *metadata_metric = &m->metric[METRIC_METADATA];

	ceph_update_metrics(metadata_metric, r_start, r_end, 0, rc);
}
/*
 * Account one copy-from operation.
 *
 * Picks the METRIC_COPYFROM slot of @m and forwards it, together with the
 * start/end timestamps, the size and the result code, to
 * ceph_update_metrics().
 */
static inline void ceph_update_copyfrom_metrics(struct ceph_client_metric *m,
						ktime_t r_start, ktime_t r_end,
						unsigned int size, int rc)
{
	struct ceph_metric *copyfrom_metric = &m->metric[METRIC_COPYFROM];

	ceph_update_metrics(copyfrom_metric, r_start, r_end, size, rc);
}
#endif /* _FS_CEPH_MDS_METRIC_H */
......@@ -52,8 +52,7 @@ static int ceph_statfs(struct dentry *dentry, struct kstatfs *buf)
struct ceph_fs_client *fsc = ceph_inode_to_client(d_inode(dentry));
struct ceph_mon_client *monc = &fsc->client->monc;
struct ceph_statfs st;
u64 fsid;
int err;
int i, err;
u64 data_pool;
if (fsc->mdsc->mdsmap->m_num_data_pg_pools == 1) {
......@@ -99,12 +98,14 @@ static int ceph_statfs(struct dentry *dentry, struct kstatfs *buf)
buf->f_namelen = NAME_MAX;
/* Must convert the fsid, for consistent values across arches */
buf->f_fsid.val[0] = 0;
mutex_lock(&monc->mutex);
fsid = le64_to_cpu(*(__le64 *)(&monc->monmap->fsid)) ^
le64_to_cpu(*((__le64 *)&monc->monmap->fsid + 1));
for (i = 0 ; i < sizeof(monc->monmap->fsid) / sizeof(__le32) ; ++i)
buf->f_fsid.val[0] ^= le32_to_cpu(((__le32 *)&monc->monmap->fsid)[i]);
mutex_unlock(&monc->mutex);
buf->f_fsid = u64_to_fsid(fsid);
/* fold the fs_cluster_id into the upper bits */
buf->f_fsid.val[1] = monc->fs_cluster_id;
return 0;
}
......@@ -577,8 +578,8 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
if (fsopt->flags & CEPH_MOUNT_OPT_CLEANRECOVER)
seq_show_option(m, "recover_session", "clean");
if (fsopt->flags & CEPH_MOUNT_OPT_ASYNC_DIROPS)
seq_puts(m, ",nowsync");
if (!(fsopt->flags & CEPH_MOUNT_OPT_ASYNC_DIROPS))
seq_puts(m, ",wsync");
if (fsopt->wsize != CEPH_MAX_WRITE_SIZE)
seq_printf(m, ",wsize=%u", fsopt->wsize);
......@@ -842,7 +843,7 @@ static void __ceph_umount_begin(struct ceph_fs_client *fsc)
* ceph_umount_begin - initiate forced umount. Tear down the
* mount, skipping steps that may hang while waiting for server(s).
*/
static void ceph_umount_begin(struct super_block *sb)
void ceph_umount_begin(struct super_block *sb)
{
struct ceph_fs_client *fsc = ceph_sb_to_client(sb);
......
......@@ -48,7 +48,8 @@
#define CEPH_MOUNT_OPT_DEFAULT \
(CEPH_MOUNT_OPT_DCACHE | \
CEPH_MOUNT_OPT_NOCOPYFROM)
CEPH_MOUNT_OPT_NOCOPYFROM | \
CEPH_MOUNT_OPT_ASYNC_DIROPS)
#define ceph_set_mount_opt(fsc, opt) \
(fsc)->mount_options->flags |= CEPH_MOUNT_OPT_##opt
......@@ -128,9 +129,9 @@ struct ceph_fs_client {
struct dentry *debugfs_congestion_kb;
struct dentry *debugfs_bdi;
struct dentry *debugfs_mdsc, *debugfs_mdsmap;
struct dentry *debugfs_metric;
struct dentry *debugfs_status;
struct dentry *debugfs_mds_sessions;
struct dentry *debugfs_metrics_dir;
#endif
#ifdef CONFIG_CEPH_FSCACHE
......@@ -580,6 +581,7 @@ static inline struct inode *ceph_find_inode(struct super_block *sb,
#define CEPH_I_ODIRECT (1 << 11) /* inode in direct I/O mode */
#define CEPH_ASYNC_CREATE_BIT (12) /* async create in flight for this */
#define CEPH_I_ASYNC_CREATE (1 << CEPH_ASYNC_CREATE_BIT)
#define CEPH_I_SHUTDOWN (1 << 13) /* inode is no longer usable */
/*
* Masks of ceph inode work.
......@@ -939,6 +941,7 @@ extern void ceph_put_snapid_map(struct ceph_mds_client* mdsc,
struct ceph_snapid_map *sm);
extern void ceph_trim_snapid_map(struct ceph_mds_client *mdsc);
extern void ceph_cleanup_snapid_map(struct ceph_mds_client *mdsc);
void ceph_umount_begin(struct super_block *sb);
/*
......@@ -1027,6 +1030,16 @@ extern int ceph_setattr(struct user_namespace *mnt_userns,
extern int ceph_getattr(struct user_namespace *mnt_userns,
const struct path *path, struct kstat *stat,
u32 request_mask, unsigned int flags);
void ceph_inode_shutdown(struct inode *inode);
static inline bool ceph_inode_is_shutdown(struct inode *inode)
{
unsigned long flags = READ_ONCE(ceph_inode(inode)->i_ceph_flags);
struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
int state = READ_ONCE(fsc->mount_state);
return (flags & CEPH_I_SHUTDOWN) || state >= CEPH_MOUNT_SHUTDOWN;
}
/* xattr.c */
int __ceph_setxattr(struct inode *, const char *, const void *, size_t, int);
......@@ -1198,6 +1211,7 @@ extern int ceph_mmap(struct file *file, struct vm_area_struct *vma);
extern int ceph_uninline_data(struct file *filp, struct page *locked_page);
extern int ceph_pool_perm_check(struct inode *inode, int need);
extern void ceph_pool_perm_destroy(struct ceph_mds_client* mdsc);
int ceph_purge_inode_cap(struct inode *inode, struct ceph_cap *cap, bool *invalidate);
/* file.c */
extern const struct file_operations ceph_file_fops;
......
......@@ -302,6 +302,8 @@ enum {
CEPH_SESSION_REQUEST_FLUSH_MDLOG,
};
#define CEPH_SESSION_BLOCKLISTED (1 << 0) /* session blocklisted */
extern const char *ceph_session_op_name(int op);
struct ceph_mds_session_head {
......
......@@ -475,6 +475,14 @@ extern void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req,
u64 expected_object_size,
u64 expected_write_size,
u32 flags);
extern int osd_req_op_copy_from_init(struct ceph_osd_request *req,
u64 src_snapid, u64 src_version,
struct ceph_object_id *src_oid,
struct ceph_object_locator *src_oloc,
u32 src_fadvise_flags,
u32 dst_fadvise_flags,
u32 truncate_seq, u64 truncate_size,
u8 copy_from_flags);
extern struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
struct ceph_snap_context *snapc,
......@@ -515,17 +523,6 @@ int ceph_osdc_call(struct ceph_osd_client *osdc,
struct page *req_page, size_t req_len,
struct page **resp_pages, size_t *resp_len);
int ceph_osdc_copy_from(struct ceph_osd_client *osdc,
u64 src_snapid, u64 src_version,
struct ceph_object_id *src_oid,
struct ceph_object_locator *src_oloc,
u32 src_fadvise_flags,
struct ceph_object_id *dst_oid,
struct ceph_object_locator *dst_oloc,
u32 dst_fadvise_flags,
u32 truncate_seq, u64 truncate_size,
u8 copy_from_flags);
/* watch/notify */
struct ceph_osd_linger_request *
ceph_osdc_watch(struct ceph_osd_client *osdc,
......
......@@ -1153,12 +1153,11 @@ static int build_initial_monmap(struct ceph_mon_client *monc)
int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
{
int err = 0;
int err;
dout("init\n");
memset(monc, 0, sizeof(*monc));
monc->client = cl;
monc->monmap = NULL;
mutex_init(&monc->mutex);
err = build_initial_monmap(monc);
......
......@@ -5310,14 +5310,14 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc)
ceph_msgpool_destroy(&osdc->msgpool_op_reply);
}
static int osd_req_op_copy_from_init(struct ceph_osd_request *req,
u64 src_snapid, u64 src_version,
struct ceph_object_id *src_oid,
struct ceph_object_locator *src_oloc,
u32 src_fadvise_flags,
u32 dst_fadvise_flags,
u32 truncate_seq, u64 truncate_size,
u8 copy_from_flags)
int osd_req_op_copy_from_init(struct ceph_osd_request *req,
u64 src_snapid, u64 src_version,
struct ceph_object_id *src_oid,
struct ceph_object_locator *src_oloc,
u32 src_fadvise_flags,
u32 dst_fadvise_flags,
u32 truncate_seq, u64 truncate_size,
u8 copy_from_flags)
{
struct ceph_osd_req_op *op;
struct page **pages;
......@@ -5346,49 +5346,7 @@ static int osd_req_op_copy_from_init(struct ceph_osd_request *req,
op->indata_len, 0, false, true);
return 0;
}
int ceph_osdc_copy_from(struct ceph_osd_client *osdc,
u64 src_snapid, u64 src_version,
struct ceph_object_id *src_oid,
struct ceph_object_locator *src_oloc,
u32 src_fadvise_flags,
struct ceph_object_id *dst_oid,
struct ceph_object_locator *dst_oloc,
u32 dst_fadvise_flags,
u32 truncate_seq, u64 truncate_size,
u8 copy_from_flags)
{
struct ceph_osd_request *req;
int ret;
req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
if (!req)
return -ENOMEM;
req->r_flags = CEPH_OSD_FLAG_WRITE;
ceph_oloc_copy(&req->r_t.base_oloc, dst_oloc);
ceph_oid_copy(&req->r_t.base_oid, dst_oid);
ret = osd_req_op_copy_from_init(req, src_snapid, src_version, src_oid,
src_oloc, src_fadvise_flags,
dst_fadvise_flags, truncate_seq,
truncate_size, copy_from_flags);
if (ret)
goto out;
ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
if (ret)
goto out;
ceph_osdc_start_request(osdc, req, false);
ret = ceph_osdc_wait_request(osdc, req);
out:
ceph_osdc_put_request(req);
return ret;
}
EXPORT_SYMBOL(ceph_osdc_copy_from);
EXPORT_SYMBOL(osd_req_op_copy_from_init);
int __init ceph_osdc_setup(void)
{
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment