Commit 4533f6e2 authored by Linus Torvalds's avatar Linus Torvalds

Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client

Pull Ceph changes from Sage Weil:
 "On the RBD side, there is a conversion to blk-mq from Christoph,
  several long-standing bug fixes from Ilya, and some cleanup from
  Rickard Strandqvist.

  On the CephFS side there is a long list of fixes from Zheng, including
  improved session handling, a few IO path fixes, some dcache management
  correctness fixes, and several blocking while !TASK_RUNNING fixes.

  The core code gets a few cleanups and Chaitanya has added support for
  TCP_NODELAY (which has been used on the server side for ages but we
  somehow missed on the kernel client).

  There is also an update to MAINTAINERS to fix up some email addresses
  and reflect that Ilya and Zheng are doing most of the maintenance for
  RBD and CephFS these days.  Do not be surprised to see a pull request
  come from one of them in the future if I am unavailable for some
  reason"

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (27 commits)
  MAINTAINERS: update Ceph and RBD maintainers
  libceph: kfree() in put_osd() shouldn't depend on authorizer
  libceph: fix double __remove_osd() problem
  rbd: convert to blk-mq
  ceph: return error for traceless reply race
  ceph: fix dentry leaks
  ceph: re-send requests when MDS enters reconnecting stage
  ceph: show nocephx_require_signatures and notcp_nodelay options
  libceph: tcp_nodelay support
  rbd: do not treat standalone as flatten
  ceph: fix atomic_open snapdir
  ceph: properly mark empty directory as complete
  client: include kernel version in client metadata
  ceph: provide seperate {inode,file}_operations for snapdir
  ceph: fix request time stamp encoding
  ceph: fix reading inline data when i_size > PAGE_SIZE
  ceph: avoid block operation when !TASK_RUNNING (ceph_mdsc_close_sessions)
  ceph: avoid block operation when !TASK_RUNNING (ceph_get_caps)
  ceph: avoid block operation when !TASK_RUNNING (ceph_mdsc_sync)
  rbd: fix error paths in rbd_dev_refresh()
  ...
parents 89d3fa45 0f5417ce
...@@ -2433,7 +2433,8 @@ F: arch/powerpc/oprofile/*cell* ...@@ -2433,7 +2433,8 @@ F: arch/powerpc/oprofile/*cell*
F: arch/powerpc/platforms/cell/ F: arch/powerpc/platforms/cell/
CEPH DISTRIBUTED FILE SYSTEM CLIENT CEPH DISTRIBUTED FILE SYSTEM CLIENT
M: Sage Weil <sage@inktank.com> M: Yan, Zheng <zyan@redhat.com>
M: Sage Weil <sage@redhat.com>
L: ceph-devel@vger.kernel.org L: ceph-devel@vger.kernel.org
W: http://ceph.com/ W: http://ceph.com/
T: git git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client.git T: git git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client.git
...@@ -7998,8 +7999,8 @@ S: Supported ...@@ -7998,8 +7999,8 @@ S: Supported
F: drivers/net/wireless/ath/wcn36xx/ F: drivers/net/wireless/ath/wcn36xx/
RADOS BLOCK DEVICE (RBD) RADOS BLOCK DEVICE (RBD)
M: Yehuda Sadeh <yehuda@inktank.com> M: Ilya Dryomov <idryomov@gmail.com>
M: Sage Weil <sage@inktank.com> M: Sage Weil <sage@redhat.com>
M: Alex Elder <elder@kernel.org> M: Alex Elder <elder@kernel.org>
M: ceph-devel@vger.kernel.org M: ceph-devel@vger.kernel.org
W: http://ceph.com/ W: http://ceph.com/
......
...@@ -38,6 +38,7 @@ ...@@ -38,6 +38,7 @@
#include <linux/kernel.h> #include <linux/kernel.h>
#include <linux/device.h> #include <linux/device.h>
#include <linux/module.h> #include <linux/module.h>
#include <linux/blk-mq.h>
#include <linux/fs.h> #include <linux/fs.h>
#include <linux/blkdev.h> #include <linux/blkdev.h>
#include <linux/slab.h> #include <linux/slab.h>
...@@ -340,9 +341,7 @@ struct rbd_device { ...@@ -340,9 +341,7 @@ struct rbd_device {
char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */ char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
struct list_head rq_queue; /* incoming rq queue */
spinlock_t lock; /* queue, flags, open_count */ spinlock_t lock; /* queue, flags, open_count */
struct work_struct rq_work;
struct rbd_image_header header; struct rbd_image_header header;
unsigned long flags; /* possibly lock protected */ unsigned long flags; /* possibly lock protected */
...@@ -360,6 +359,9 @@ struct rbd_device { ...@@ -360,6 +359,9 @@ struct rbd_device {
atomic_t parent_ref; atomic_t parent_ref;
struct rbd_device *parent; struct rbd_device *parent;
/* Block layer tags. */
struct blk_mq_tag_set tag_set;
/* protects updating the header */ /* protects updating the header */
struct rw_semaphore header_rwsem; struct rw_semaphore header_rwsem;
...@@ -1817,7 +1819,8 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req, ...@@ -1817,7 +1819,8 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
/* /*
* We support a 64-bit length, but ultimately it has to be * We support a 64-bit length, but ultimately it has to be
* passed to blk_end_request(), which takes an unsigned int. * passed to the block layer, which just supports a 32-bit
* length field.
*/ */
obj_request->xferred = osd_req->r_reply_op_len[0]; obj_request->xferred = osd_req->r_reply_op_len[0];
rbd_assert(obj_request->xferred < (u64)UINT_MAX); rbd_assert(obj_request->xferred < (u64)UINT_MAX);
...@@ -2275,7 +2278,10 @@ static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request) ...@@ -2275,7 +2278,10 @@ static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
more = obj_request->which < img_request->obj_request_count - 1; more = obj_request->which < img_request->obj_request_count - 1;
} else { } else {
rbd_assert(img_request->rq != NULL); rbd_assert(img_request->rq != NULL);
more = blk_end_request(img_request->rq, result, xferred);
more = blk_update_request(img_request->rq, result, xferred);
if (!more)
__blk_mq_end_request(img_request->rq, result);
} }
return more; return more;
...@@ -3304,8 +3310,10 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev, ...@@ -3304,8 +3310,10 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
return ret; return ret;
} }
static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq) static void rbd_queue_workfn(struct work_struct *work)
{ {
struct request *rq = blk_mq_rq_from_pdu(work);
struct rbd_device *rbd_dev = rq->q->queuedata;
struct rbd_img_request *img_request; struct rbd_img_request *img_request;
struct ceph_snap_context *snapc = NULL; struct ceph_snap_context *snapc = NULL;
u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT; u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
...@@ -3314,6 +3322,13 @@ static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq) ...@@ -3314,6 +3322,13 @@ static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq)
u64 mapping_size; u64 mapping_size;
int result; int result;
if (rq->cmd_type != REQ_TYPE_FS) {
dout("%s: non-fs request type %d\n", __func__,
(int) rq->cmd_type);
result = -EIO;
goto err;
}
if (rq->cmd_flags & REQ_DISCARD) if (rq->cmd_flags & REQ_DISCARD)
op_type = OBJ_OP_DISCARD; op_type = OBJ_OP_DISCARD;
else if (rq->cmd_flags & REQ_WRITE) else if (rq->cmd_flags & REQ_WRITE)
...@@ -3359,6 +3374,8 @@ static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq) ...@@ -3359,6 +3374,8 @@ static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq)
goto err_rq; /* Shouldn't happen */ goto err_rq; /* Shouldn't happen */
} }
blk_mq_start_request(rq);
down_read(&rbd_dev->header_rwsem); down_read(&rbd_dev->header_rwsem);
mapping_size = rbd_dev->mapping.size; mapping_size = rbd_dev->mapping.size;
if (op_type != OBJ_OP_READ) { if (op_type != OBJ_OP_READ) {
...@@ -3404,53 +3421,18 @@ static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq) ...@@ -3404,53 +3421,18 @@ static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq)
rbd_warn(rbd_dev, "%s %llx at %llx result %d", rbd_warn(rbd_dev, "%s %llx at %llx result %d",
obj_op_name(op_type), length, offset, result); obj_op_name(op_type), length, offset, result);
ceph_put_snap_context(snapc); ceph_put_snap_context(snapc);
blk_end_request_all(rq, result); err:
blk_mq_end_request(rq, result);
} }
static void rbd_request_workfn(struct work_struct *work) static int rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
const struct blk_mq_queue_data *bd)
{ {
struct rbd_device *rbd_dev = struct request *rq = bd->rq;
container_of(work, struct rbd_device, rq_work); struct work_struct *work = blk_mq_rq_to_pdu(rq);
struct request *rq, *next;
LIST_HEAD(requests);
spin_lock_irq(&rbd_dev->lock); /* rq->q->queue_lock */ queue_work(rbd_wq, work);
list_splice_init(&rbd_dev->rq_queue, &requests); return BLK_MQ_RQ_QUEUE_OK;
spin_unlock_irq(&rbd_dev->lock);
list_for_each_entry_safe(rq, next, &requests, queuelist) {
list_del_init(&rq->queuelist);
rbd_handle_request(rbd_dev, rq);
}
}
/*
* Called with q->queue_lock held and interrupts disabled, possibly on
* the way to schedule(). Do not sleep here!
*/
static void rbd_request_fn(struct request_queue *q)
{
struct rbd_device *rbd_dev = q->queuedata;
struct request *rq;
int queued = 0;
rbd_assert(rbd_dev);
while ((rq = blk_fetch_request(q))) {
/* Ignore any non-FS requests that filter through. */
if (rq->cmd_type != REQ_TYPE_FS) {
dout("%s: non-fs request type %d\n", __func__,
(int) rq->cmd_type);
__blk_end_request_all(rq, 0);
continue;
}
list_add_tail(&rq->queuelist, &rbd_dev->rq_queue);
queued++;
}
if (queued)
queue_work(rbd_wq, &rbd_dev->rq_work);
} }
/* /*
...@@ -3511,6 +3493,7 @@ static void rbd_free_disk(struct rbd_device *rbd_dev) ...@@ -3511,6 +3493,7 @@ static void rbd_free_disk(struct rbd_device *rbd_dev)
del_gendisk(disk); del_gendisk(disk);
if (disk->queue) if (disk->queue)
blk_cleanup_queue(disk->queue); blk_cleanup_queue(disk->queue);
blk_mq_free_tag_set(&rbd_dev->tag_set);
} }
put_disk(disk); put_disk(disk);
} }
...@@ -3694,7 +3677,7 @@ static int rbd_dev_refresh(struct rbd_device *rbd_dev) ...@@ -3694,7 +3677,7 @@ static int rbd_dev_refresh(struct rbd_device *rbd_dev)
ret = rbd_dev_header_info(rbd_dev); ret = rbd_dev_header_info(rbd_dev);
if (ret) if (ret)
return ret; goto out;
/* /*
* If there is a parent, see if it has disappeared due to the * If there is a parent, see if it has disappeared due to the
...@@ -3703,30 +3686,46 @@ static int rbd_dev_refresh(struct rbd_device *rbd_dev) ...@@ -3703,30 +3686,46 @@ static int rbd_dev_refresh(struct rbd_device *rbd_dev)
if (rbd_dev->parent) { if (rbd_dev->parent) {
ret = rbd_dev_v2_parent_info(rbd_dev); ret = rbd_dev_v2_parent_info(rbd_dev);
if (ret) if (ret)
return ret; goto out;
} }
if (rbd_dev->spec->snap_id == CEPH_NOSNAP) { if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
if (rbd_dev->mapping.size != rbd_dev->header.image_size)
rbd_dev->mapping.size = rbd_dev->header.image_size; rbd_dev->mapping.size = rbd_dev->header.image_size;
} else { } else {
/* validate mapped snapshot's EXISTS flag */ /* validate mapped snapshot's EXISTS flag */
rbd_exists_validate(rbd_dev); rbd_exists_validate(rbd_dev);
} }
out:
up_write(&rbd_dev->header_rwsem); up_write(&rbd_dev->header_rwsem);
if (!ret && mapping_size != rbd_dev->mapping.size)
if (mapping_size != rbd_dev->mapping.size)
rbd_dev_update_size(rbd_dev); rbd_dev_update_size(rbd_dev);
return ret;
}
static int rbd_init_request(void *data, struct request *rq,
unsigned int hctx_idx, unsigned int request_idx,
unsigned int numa_node)
{
struct work_struct *work = blk_mq_rq_to_pdu(rq);
INIT_WORK(work, rbd_queue_workfn);
return 0; return 0;
} }
static struct blk_mq_ops rbd_mq_ops = {
.queue_rq = rbd_queue_rq,
.map_queue = blk_mq_map_queue,
.init_request = rbd_init_request,
};
static int rbd_init_disk(struct rbd_device *rbd_dev) static int rbd_init_disk(struct rbd_device *rbd_dev)
{ {
struct gendisk *disk; struct gendisk *disk;
struct request_queue *q; struct request_queue *q;
u64 segment_size; u64 segment_size;
int err;
/* create gendisk info */ /* create gendisk info */
disk = alloc_disk(single_major ? disk = alloc_disk(single_major ?
...@@ -3744,10 +3743,25 @@ static int rbd_init_disk(struct rbd_device *rbd_dev) ...@@ -3744,10 +3743,25 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
disk->fops = &rbd_bd_ops; disk->fops = &rbd_bd_ops;
disk->private_data = rbd_dev; disk->private_data = rbd_dev;
q = blk_init_queue(rbd_request_fn, &rbd_dev->lock); memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
if (!q) rbd_dev->tag_set.ops = &rbd_mq_ops;
rbd_dev->tag_set.queue_depth = BLKDEV_MAX_RQ;
rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
rbd_dev->tag_set.flags =
BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE;
rbd_dev->tag_set.nr_hw_queues = 1;
rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);
err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
if (err)
goto out_disk; goto out_disk;
q = blk_mq_init_queue(&rbd_dev->tag_set);
if (IS_ERR(q)) {
err = PTR_ERR(q);
goto out_tag_set;
}
/* We use the default size, but let's be explicit about it. */ /* We use the default size, but let's be explicit about it. */
blk_queue_physical_block_size(q, SECTOR_SIZE); blk_queue_physical_block_size(q, SECTOR_SIZE);
...@@ -3773,10 +3787,11 @@ static int rbd_init_disk(struct rbd_device *rbd_dev) ...@@ -3773,10 +3787,11 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
rbd_dev->disk = disk; rbd_dev->disk = disk;
return 0; return 0;
out_tag_set:
blk_mq_free_tag_set(&rbd_dev->tag_set);
out_disk: out_disk:
put_disk(disk); put_disk(disk);
return err;
return -ENOMEM;
} }
/* /*
...@@ -4033,8 +4048,6 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, ...@@ -4033,8 +4048,6 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
return NULL; return NULL;
spin_lock_init(&rbd_dev->lock); spin_lock_init(&rbd_dev->lock);
INIT_LIST_HEAD(&rbd_dev->rq_queue);
INIT_WORK(&rbd_dev->rq_work, rbd_request_workfn);
rbd_dev->flags = 0; rbd_dev->flags = 0;
atomic_set(&rbd_dev->parent_ref, 0); atomic_set(&rbd_dev->parent_ref, 0);
INIT_LIST_HEAD(&rbd_dev->node); INIT_LIST_HEAD(&rbd_dev->node);
...@@ -4274,32 +4287,22 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev) ...@@ -4274,32 +4287,22 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
} }
/* /*
* We always update the parent overlap. If it's zero we * We always update the parent overlap. If it's zero we issue
* treat it specially. * a warning, as we will proceed as if there was no parent.
*/ */
rbd_dev->parent_overlap = overlap;
if (!overlap) { if (!overlap) {
/* A null parent_spec indicates it's the initial probe */
if (parent_spec) { if (parent_spec) {
/* /* refresh, careful to warn just once */
* The overlap has become zero, so the clone if (rbd_dev->parent_overlap)
* must have been resized down to 0 at some rbd_warn(rbd_dev,
* point. Treat this the same as a flatten. "clone now standalone (overlap became 0)");
*/
rbd_dev_parent_put(rbd_dev);
pr_info("%s: clone image now standalone\n",
rbd_dev->disk->disk_name);
} else { } else {
/* /* initial probe */
* For the initial probe, if we find the rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
* overlap is zero we just pretend there was
* no parent image.
*/
rbd_warn(rbd_dev, "ignoring parent with overlap 0");
} }
} }
rbd_dev->parent_overlap = overlap;
out: out:
ret = 0; ret = 0;
out_err: out_err:
...@@ -4770,36 +4773,6 @@ static inline size_t next_token(const char **buf) ...@@ -4770,36 +4773,6 @@ static inline size_t next_token(const char **buf)
return strcspn(*buf, spaces); /* Return token length */ return strcspn(*buf, spaces); /* Return token length */
} }
/*
* Finds the next token in *buf, and if the provided token buffer is
* big enough, copies the found token into it. The result, if
* copied, is guaranteed to be terminated with '\0'. Note that *buf
* must be terminated with '\0' on entry.
*
* Returns the length of the token found (not including the '\0').
* Return value will be 0 if no token is found, and it will be >=
* token_size if the token would not fit.
*
* The *buf pointer will be updated to point beyond the end of the
* found token. Note that this occurs even if the token buffer is
* too small to hold it.
*/
static inline size_t copy_token(const char **buf,
char *token,
size_t token_size)
{
size_t len;
len = next_token(buf);
if (len < token_size) {
memcpy(token, *buf, len);
*(token + len) = '\0';
}
*buf += len;
return len;
}
/* /*
* Finds the next token in *buf, dynamically allocates a buffer big * Finds the next token in *buf, dynamically allocates a buffer big
* enough to hold a copy of it, and copies the token into the new * enough to hold a copy of it, and copies the token into the new
......
...@@ -40,20 +40,6 @@ static inline void ceph_set_cached_acl(struct inode *inode, ...@@ -40,20 +40,6 @@ static inline void ceph_set_cached_acl(struct inode *inode,
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
} }
static inline struct posix_acl *ceph_get_cached_acl(struct inode *inode,
int type)
{
struct ceph_inode_info *ci = ceph_inode(inode);
struct posix_acl *acl = ACL_NOT_CACHED;
spin_lock(&ci->i_ceph_lock);
if (__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 0))
acl = get_cached_acl(inode, type);
spin_unlock(&ci->i_ceph_lock);
return acl;
}
struct posix_acl *ceph_get_acl(struct inode *inode, int type) struct posix_acl *ceph_get_acl(struct inode *inode, int type)
{ {
int size; int size;
......
...@@ -196,17 +196,22 @@ static int readpage_nounlock(struct file *filp, struct page *page) ...@@ -196,17 +196,22 @@ static int readpage_nounlock(struct file *filp, struct page *page)
u64 len = PAGE_CACHE_SIZE; u64 len = PAGE_CACHE_SIZE;
if (off >= i_size_read(inode)) { if (off >= i_size_read(inode)) {
zero_user_segment(page, err, PAGE_CACHE_SIZE); zero_user_segment(page, 0, PAGE_CACHE_SIZE);
SetPageUptodate(page); SetPageUptodate(page);
return 0; return 0;
} }
if (ci->i_inline_version != CEPH_INLINE_NONE) {
/* /*
* Uptodate inline data should have been added into page cache * Uptodate inline data should have been added
* while getting Fcr caps. * into page cache while getting Fcr caps.
*/ */
if (ci->i_inline_version != CEPH_INLINE_NONE) if (off == 0)
return -EINVAL; return -EINVAL;
zero_user_segment(page, 0, PAGE_CACHE_SIZE);
SetPageUptodate(page);
return 0;
}
err = ceph_readpage_from_fscache(inode, page); err = ceph_readpage_from_fscache(inode, page);
if (err == 0) if (err == 0)
......
...@@ -577,7 +577,6 @@ void ceph_add_cap(struct inode *inode, ...@@ -577,7 +577,6 @@ void ceph_add_cap(struct inode *inode,
struct ceph_snap_realm *realm = ceph_lookup_snap_realm(mdsc, struct ceph_snap_realm *realm = ceph_lookup_snap_realm(mdsc,
realmino); realmino);
if (realm) { if (realm) {
ceph_get_snap_realm(mdsc, realm);
spin_lock(&realm->inodes_with_caps_lock); spin_lock(&realm->inodes_with_caps_lock);
ci->i_snap_realm = realm; ci->i_snap_realm = realm;
list_add(&ci->i_snap_realm_item, list_add(&ci->i_snap_realm_item,
...@@ -1451,8 +1450,8 @@ static int __mark_caps_flushing(struct inode *inode, ...@@ -1451,8 +1450,8 @@ static int __mark_caps_flushing(struct inode *inode,
spin_lock(&mdsc->cap_dirty_lock); spin_lock(&mdsc->cap_dirty_lock);
list_del_init(&ci->i_dirty_item); list_del_init(&ci->i_dirty_item);
ci->i_cap_flush_seq = ++mdsc->cap_flush_seq;
if (list_empty(&ci->i_flushing_item)) { if (list_empty(&ci->i_flushing_item)) {
ci->i_cap_flush_seq = ++mdsc->cap_flush_seq;
list_add_tail(&ci->i_flushing_item, &session->s_cap_flushing); list_add_tail(&ci->i_flushing_item, &session->s_cap_flushing);
mdsc->num_cap_flushing++; mdsc->num_cap_flushing++;
dout(" inode %p now flushing seq %lld\n", inode, dout(" inode %p now flushing seq %lld\n", inode,
...@@ -2073,17 +2072,16 @@ static void __take_cap_refs(struct ceph_inode_info *ci, int got) ...@@ -2073,17 +2072,16 @@ static void __take_cap_refs(struct ceph_inode_info *ci, int got)
* requested from the MDS. * requested from the MDS.
*/ */
static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
loff_t endoff, int *got, struct page **pinned_page, loff_t endoff, int *got, int *check_max, int *err)
int *check_max, int *err)
{ {
struct inode *inode = &ci->vfs_inode; struct inode *inode = &ci->vfs_inode;
int ret = 0; int ret = 0;
int have, implemented, _got = 0; int have, implemented;
int file_wanted; int file_wanted;
dout("get_cap_refs %p need %s want %s\n", inode, dout("get_cap_refs %p need %s want %s\n", inode,
ceph_cap_string(need), ceph_cap_string(want)); ceph_cap_string(need), ceph_cap_string(want));
again:
spin_lock(&ci->i_ceph_lock); spin_lock(&ci->i_ceph_lock);
/* make sure file is actually open */ /* make sure file is actually open */
...@@ -2138,50 +2136,34 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, ...@@ -2138,50 +2136,34 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
inode, ceph_cap_string(have), ceph_cap_string(not), inode, ceph_cap_string(have), ceph_cap_string(not),
ceph_cap_string(revoking)); ceph_cap_string(revoking));
if ((revoking & not) == 0) { if ((revoking & not) == 0) {
_got = need | (have & want); *got = need | (have & want);
__take_cap_refs(ci, _got); __take_cap_refs(ci, *got);
ret = 1; ret = 1;
} }
} else { } else {
int session_readonly = false;
if ((need & CEPH_CAP_FILE_WR) && ci->i_auth_cap) {
struct ceph_mds_session *s = ci->i_auth_cap->session;
spin_lock(&s->s_cap_lock);
session_readonly = s->s_readonly;
spin_unlock(&s->s_cap_lock);
}
if (session_readonly) {
dout("get_cap_refs %p needed %s but mds%d readonly\n",
inode, ceph_cap_string(need), ci->i_auth_cap->mds);
*err = -EROFS;
ret = 1;
goto out_unlock;
}
dout("get_cap_refs %p have %s needed %s\n", inode, dout("get_cap_refs %p have %s needed %s\n", inode,
ceph_cap_string(have), ceph_cap_string(need)); ceph_cap_string(have), ceph_cap_string(need));
} }
out_unlock: out_unlock:
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
if (ci->i_inline_version != CEPH_INLINE_NONE &&
(_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
i_size_read(inode) > 0) {
int ret1;
struct page *page = find_get_page(inode->i_mapping, 0);
if (page) {
if (PageUptodate(page)) {
*pinned_page = page;
goto out;
}
page_cache_release(page);
}
/*
* drop cap refs first because getattr while holding
* caps refs can cause deadlock.
*/
ceph_put_cap_refs(ci, _got);
_got = 0;
/* getattr request will bring inline data into page cache */
ret1 = __ceph_do_getattr(inode, NULL,
CEPH_STAT_CAP_INLINE_DATA, true);
if (ret1 >= 0) {
ret = 0;
goto again;
}
*err = ret1;
ret = 1;
}
out:
dout("get_cap_refs %p ret %d got %s\n", inode, dout("get_cap_refs %p ret %d got %s\n", inode,
ret, ceph_cap_string(_got)); ret, ceph_cap_string(*got));
*got = _got;
return ret; return ret;
} }
...@@ -2221,22 +2203,52 @@ static void check_max_size(struct inode *inode, loff_t endoff) ...@@ -2221,22 +2203,52 @@ static void check_max_size(struct inode *inode, loff_t endoff)
int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
loff_t endoff, int *got, struct page **pinned_page) loff_t endoff, int *got, struct page **pinned_page)
{ {
int check_max, ret, err; int _got, check_max, ret, err = 0;
retry: retry:
if (endoff > 0) if (endoff > 0)
check_max_size(&ci->vfs_inode, endoff); check_max_size(&ci->vfs_inode, endoff);
_got = 0;
check_max = 0; check_max = 0;
err = 0;
ret = wait_event_interruptible(ci->i_cap_wq, ret = wait_event_interruptible(ci->i_cap_wq,
try_get_cap_refs(ci, need, want, endoff, try_get_cap_refs(ci, need, want, endoff,
got, pinned_page, &_got, &check_max, &err));
&check_max, &err));
if (err) if (err)
ret = err; ret = err;
if (ret < 0)
return ret;
if (check_max) if (check_max)
goto retry; goto retry;
if (ci->i_inline_version != CEPH_INLINE_NONE &&
(_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
i_size_read(&ci->vfs_inode) > 0) {
struct page *page = find_get_page(ci->vfs_inode.i_mapping, 0);
if (page) {
if (PageUptodate(page)) {
*pinned_page = page;
goto out;
}
page_cache_release(page);
}
/*
* drop cap refs first because getattr while holding
* caps refs can cause deadlock.
*/
ceph_put_cap_refs(ci, _got);
_got = 0;
/* getattr request will bring inline data into page cache */
ret = __ceph_do_getattr(&ci->vfs_inode, NULL,
CEPH_STAT_CAP_INLINE_DATA, true);
if (ret < 0)
return ret; return ret;
goto retry;
}
out:
*got = _got;
return 0;
} }
/* /*
...@@ -2432,13 +2444,13 @@ static void invalidate_aliases(struct inode *inode) ...@@ -2432,13 +2444,13 @@ static void invalidate_aliases(struct inode *inode)
*/ */
static void handle_cap_grant(struct ceph_mds_client *mdsc, static void handle_cap_grant(struct ceph_mds_client *mdsc,
struct inode *inode, struct ceph_mds_caps *grant, struct inode *inode, struct ceph_mds_caps *grant,
void *snaptrace, int snaptrace_len,
u64 inline_version, u64 inline_version,
void *inline_data, int inline_len, void *inline_data, int inline_len,
struct ceph_buffer *xattr_buf, struct ceph_buffer *xattr_buf,
struct ceph_mds_session *session, struct ceph_mds_session *session,
struct ceph_cap *cap, int issued) struct ceph_cap *cap, int issued)
__releases(ci->i_ceph_lock) __releases(ci->i_ceph_lock)
__releases(mdsc->snap_rwsem)
{ {
struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_inode_info *ci = ceph_inode(inode);
int mds = session->s_mds; int mds = session->s_mds;
...@@ -2639,10 +2651,6 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc, ...@@ -2639,10 +2651,6 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) { if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) {
down_write(&mdsc->snap_rwsem);
ceph_update_snap_trace(mdsc, snaptrace,
snaptrace + snaptrace_len, false);
downgrade_write(&mdsc->snap_rwsem);
kick_flushing_inode_caps(mdsc, session, inode); kick_flushing_inode_caps(mdsc, session, inode);
up_read(&mdsc->snap_rwsem); up_read(&mdsc->snap_rwsem);
if (newcaps & ~issued) if (newcaps & ~issued)
...@@ -3052,6 +3060,7 @@ void ceph_handle_caps(struct ceph_mds_session *session, ...@@ -3052,6 +3060,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
struct ceph_cap *cap; struct ceph_cap *cap;
struct ceph_mds_caps *h; struct ceph_mds_caps *h;
struct ceph_mds_cap_peer *peer = NULL; struct ceph_mds_cap_peer *peer = NULL;
struct ceph_snap_realm *realm;
int mds = session->s_mds; int mds = session->s_mds;
int op, issued; int op, issued;
u32 seq, mseq; u32 seq, mseq;
...@@ -3153,11 +3162,23 @@ void ceph_handle_caps(struct ceph_mds_session *session, ...@@ -3153,11 +3162,23 @@ void ceph_handle_caps(struct ceph_mds_session *session,
goto done_unlocked; goto done_unlocked;
case CEPH_CAP_OP_IMPORT: case CEPH_CAP_OP_IMPORT:
realm = NULL;
if (snaptrace_len) {
down_write(&mdsc->snap_rwsem);
ceph_update_snap_trace(mdsc, snaptrace,
snaptrace + snaptrace_len,
false, &realm);
downgrade_write(&mdsc->snap_rwsem);
} else {
down_read(&mdsc->snap_rwsem);
}
handle_cap_import(mdsc, inode, h, peer, session, handle_cap_import(mdsc, inode, h, peer, session,
&cap, &issued); &cap, &issued);
handle_cap_grant(mdsc, inode, h, snaptrace, snaptrace_len, handle_cap_grant(mdsc, inode, h,
inline_version, inline_data, inline_len, inline_version, inline_data, inline_len,
msg->middle, session, cap, issued); msg->middle, session, cap, issued);
if (realm)
ceph_put_snap_realm(mdsc, realm);
goto done_unlocked; goto done_unlocked;
} }
...@@ -3177,7 +3198,7 @@ void ceph_handle_caps(struct ceph_mds_session *session, ...@@ -3177,7 +3198,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
case CEPH_CAP_OP_GRANT: case CEPH_CAP_OP_GRANT:
__ceph_caps_issued(ci, &issued); __ceph_caps_issued(ci, &issued);
issued |= __ceph_caps_dirty(ci); issued |= __ceph_caps_dirty(ci);
handle_cap_grant(mdsc, inode, h, NULL, 0, handle_cap_grant(mdsc, inode, h,
inline_version, inline_data, inline_len, inline_version, inline_data, inline_len,
msg->middle, session, cap, issued); msg->middle, session, cap, issued);
goto done_unlocked; goto done_unlocked;
......
...@@ -26,8 +26,6 @@ ...@@ -26,8 +26,6 @@
* point by name. * point by name.
*/ */
const struct inode_operations ceph_dir_iops;
const struct file_operations ceph_dir_fops;
const struct dentry_operations ceph_dentry_ops; const struct dentry_operations ceph_dentry_ops;
/* /*
...@@ -672,13 +670,17 @@ int ceph_handle_notrace_create(struct inode *dir, struct dentry *dentry) ...@@ -672,13 +670,17 @@ int ceph_handle_notrace_create(struct inode *dir, struct dentry *dentry)
/* /*
* We created the item, then did a lookup, and found * We created the item, then did a lookup, and found
* it was already linked to another inode we already * it was already linked to another inode we already
* had in our cache (and thus got spliced). Link our * had in our cache (and thus got spliced). To not
* dentry to that inode, but don't hash it, just in * confuse VFS (especially when inode is a directory),
* case the VFS wants to dereference it. * we don't link our dentry to that inode, return an
* error instead.
*
* This event should be rare and it happens only when
* we talk to old MDS. Recent MDS does not send traceless
* reply for request that creates new inode.
*/ */
BUG_ON(!result->d_inode); d_drop(result);
d_instantiate(dentry, result->d_inode); return -ESTALE;
return 0;
} }
return PTR_ERR(result); return PTR_ERR(result);
} }
...@@ -1335,6 +1337,13 @@ const struct file_operations ceph_dir_fops = { ...@@ -1335,6 +1337,13 @@ const struct file_operations ceph_dir_fops = {
.fsync = ceph_dir_fsync, .fsync = ceph_dir_fsync,
}; };
const struct file_operations ceph_snapdir_fops = {
.iterate = ceph_readdir,
.llseek = ceph_dir_llseek,
.open = ceph_open,
.release = ceph_release,
};
const struct inode_operations ceph_dir_iops = { const struct inode_operations ceph_dir_iops = {
.lookup = ceph_lookup, .lookup = ceph_lookup,
.permission = ceph_permission, .permission = ceph_permission,
...@@ -1357,6 +1366,14 @@ const struct inode_operations ceph_dir_iops = { ...@@ -1357,6 +1366,14 @@ const struct inode_operations ceph_dir_iops = {
.atomic_open = ceph_atomic_open, .atomic_open = ceph_atomic_open,
}; };
const struct inode_operations ceph_snapdir_iops = {
.lookup = ceph_lookup,
.permission = ceph_permission,
.getattr = ceph_getattr,
.mkdir = ceph_mkdir,
.rmdir = ceph_unlink,
};
const struct dentry_operations ceph_dentry_ops = { const struct dentry_operations ceph_dentry_ops = {
.d_revalidate = ceph_d_revalidate, .d_revalidate = ceph_d_revalidate,
.d_release = ceph_d_release, .d_release = ceph_d_release,
......
...@@ -275,10 +275,10 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry, ...@@ -275,10 +275,10 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
err = ceph_mdsc_do_request(mdsc, err = ceph_mdsc_do_request(mdsc,
(flags & (O_CREAT|O_TRUNC)) ? dir : NULL, (flags & (O_CREAT|O_TRUNC)) ? dir : NULL,
req); req);
err = ceph_handle_snapdir(req, dentry, err);
if (err) if (err)
goto out_req; goto out_req;
err = ceph_handle_snapdir(req, dentry, err);
if (err == 0 && (flags & O_CREAT) && !req->r_reply_info.head->is_dentry) if (err == 0 && (flags & O_CREAT) && !req->r_reply_info.head->is_dentry)
err = ceph_handle_notrace_create(dir, dentry); err = ceph_handle_notrace_create(dir, dentry);
...@@ -392,13 +392,14 @@ static int striped_read(struct inode *inode, ...@@ -392,13 +392,14 @@ static int striped_read(struct inode *inode,
if (ret >= 0) { if (ret >= 0) {
int didpages; int didpages;
if (was_short && (pos + ret < inode->i_size)) { if (was_short && (pos + ret < inode->i_size)) {
u64 tmp = min(this_len - ret, int zlen = min(this_len - ret,
inode->i_size - pos - ret); inode->i_size - pos - ret);
int zoff = (o_direct ? buf_align : io_align) +
read + ret;
dout(" zero gap %llu to %llu\n", dout(" zero gap %llu to %llu\n",
pos + ret, pos + ret + tmp); pos + ret, pos + ret + zlen);
ceph_zero_page_vector_range(page_align + read + ret, ceph_zero_page_vector_range(zoff, zlen, pages);
tmp, pages); ret += zlen;
ret += tmp;
} }
didpages = (page_align + ret) >> PAGE_CACHE_SHIFT; didpages = (page_align + ret) >> PAGE_CACHE_SHIFT;
...@@ -878,23 +879,29 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to) ...@@ -878,23 +879,29 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
i_size = i_size_read(inode); i_size = i_size_read(inode);
if (retry_op == READ_INLINE) { if (retry_op == READ_INLINE) {
/* does not support inline data > PAGE_SIZE */ BUG_ON(ret > 0 || read > 0);
if (i_size > PAGE_CACHE_SIZE) { if (iocb->ki_pos < i_size &&
ret = -EIO; iocb->ki_pos < PAGE_CACHE_SIZE) {
} else if (iocb->ki_pos < i_size) {
loff_t end = min_t(loff_t, i_size, loff_t end = min_t(loff_t, i_size,
iocb->ki_pos + len); iocb->ki_pos + len);
end = min_t(loff_t, end, PAGE_CACHE_SIZE);
if (statret < end) if (statret < end)
zero_user_segment(page, statret, end); zero_user_segment(page, statret, end);
ret = copy_page_to_iter(page, ret = copy_page_to_iter(page,
iocb->ki_pos & ~PAGE_MASK, iocb->ki_pos & ~PAGE_MASK,
end - iocb->ki_pos, to); end - iocb->ki_pos, to);
iocb->ki_pos += ret; iocb->ki_pos += ret;
} else { read += ret;
ret = 0; }
if (iocb->ki_pos < i_size && read < len) {
size_t zlen = min_t(size_t, len - read,
i_size - iocb->ki_pos);
ret = iov_iter_zero(zlen, to);
iocb->ki_pos += ret;
read += ret;
} }
__free_pages(page, 0); __free_pages(page, 0);
return ret; return read;
} }
/* hit EOF or hole? */ /* hit EOF or hole? */
......
...@@ -82,8 +82,8 @@ struct inode *ceph_get_snapdir(struct inode *parent) ...@@ -82,8 +82,8 @@ struct inode *ceph_get_snapdir(struct inode *parent)
inode->i_mode = parent->i_mode; inode->i_mode = parent->i_mode;
inode->i_uid = parent->i_uid; inode->i_uid = parent->i_uid;
inode->i_gid = parent->i_gid; inode->i_gid = parent->i_gid;
inode->i_op = &ceph_dir_iops; inode->i_op = &ceph_snapdir_iops;
inode->i_fop = &ceph_dir_fops; inode->i_fop = &ceph_snapdir_fops;
ci->i_snap_caps = CEPH_CAP_PIN; /* so we can open */ ci->i_snap_caps = CEPH_CAP_PIN; /* so we can open */
ci->i_rbytes = 0; ci->i_rbytes = 0;
return inode; return inode;
...@@ -838,30 +838,31 @@ static int fill_inode(struct inode *inode, struct page *locked_page, ...@@ -838,30 +838,31 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
ceph_vinop(inode), inode->i_mode); ceph_vinop(inode), inode->i_mode);
} }
/* set dir completion flag? */
if (S_ISDIR(inode->i_mode) &&
ci->i_files == 0 && ci->i_subdirs == 0 &&
ceph_snap(inode) == CEPH_NOSNAP &&
(le32_to_cpu(info->cap.caps) & CEPH_CAP_FILE_SHARED) &&
(issued & CEPH_CAP_FILE_EXCL) == 0 &&
!__ceph_dir_is_complete(ci)) {
dout(" marking %p complete (empty)\n", inode);
__ceph_dir_set_complete(ci, atomic_read(&ci->i_release_count),
ci->i_ordered_count);
}
/* were we issued a capability? */ /* were we issued a capability? */
if (info->cap.caps) { if (info->cap.caps) {
if (ceph_snap(inode) == CEPH_NOSNAP) { if (ceph_snap(inode) == CEPH_NOSNAP) {
unsigned caps = le32_to_cpu(info->cap.caps);
ceph_add_cap(inode, session, ceph_add_cap(inode, session,
le64_to_cpu(info->cap.cap_id), le64_to_cpu(info->cap.cap_id),
cap_fmode, cap_fmode, caps,
le32_to_cpu(info->cap.caps),
le32_to_cpu(info->cap.wanted), le32_to_cpu(info->cap.wanted),
le32_to_cpu(info->cap.seq), le32_to_cpu(info->cap.seq),
le32_to_cpu(info->cap.mseq), le32_to_cpu(info->cap.mseq),
le64_to_cpu(info->cap.realm), le64_to_cpu(info->cap.realm),
info->cap.flags, &new_cap); info->cap.flags, &new_cap);
/* set dir completion flag? */
if (S_ISDIR(inode->i_mode) &&
ci->i_files == 0 && ci->i_subdirs == 0 &&
(caps & CEPH_CAP_FILE_SHARED) &&
(issued & CEPH_CAP_FILE_EXCL) == 0 &&
!__ceph_dir_is_complete(ci)) {
dout(" marking %p complete (empty)\n", inode);
__ceph_dir_set_complete(ci,
atomic_read(&ci->i_release_count),
ci->i_ordered_count);
}
wake = true; wake = true;
} else { } else {
dout(" %p got snap_caps %s\n", inode, dout(" %p got snap_caps %s\n", inode,
...@@ -1446,12 +1447,14 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req, ...@@ -1446,12 +1447,14 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
} }
if (!dn->d_inode) { if (!dn->d_inode) {
dn = splice_dentry(dn, in, NULL); struct dentry *realdn = splice_dentry(dn, in, NULL);
if (IS_ERR(dn)) { if (IS_ERR(realdn)) {
err = PTR_ERR(dn); err = PTR_ERR(realdn);
d_drop(dn);
dn = NULL; dn = NULL;
goto next_item; goto next_item;
} }
dn = realdn;
} }
di = dn->d_fsdata; di = dn->d_fsdata;
......
...@@ -480,6 +480,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, ...@@ -480,6 +480,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
mdsc->max_sessions = newmax; mdsc->max_sessions = newmax;
} }
mdsc->sessions[mds] = s; mdsc->sessions[mds] = s;
atomic_inc(&mdsc->num_sessions);
atomic_inc(&s->s_ref); /* one ref to sessions[], one to caller */ atomic_inc(&s->s_ref); /* one ref to sessions[], one to caller */
ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds, ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
...@@ -503,6 +504,7 @@ static void __unregister_session(struct ceph_mds_client *mdsc, ...@@ -503,6 +504,7 @@ static void __unregister_session(struct ceph_mds_client *mdsc,
mdsc->sessions[s->s_mds] = NULL; mdsc->sessions[s->s_mds] = NULL;
ceph_con_close(&s->s_con); ceph_con_close(&s->s_con);
ceph_put_mds_session(s); ceph_put_mds_session(s);
atomic_dec(&mdsc->num_sessions);
} }
/* /*
...@@ -842,8 +844,9 @@ static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u6 ...@@ -842,8 +844,9 @@ static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u6
struct ceph_options *opt = mdsc->fsc->client->options; struct ceph_options *opt = mdsc->fsc->client->options;
void *p; void *p;
const char* metadata[3][2] = { const char* metadata[][2] = {
{"hostname", utsname()->nodename}, {"hostname", utsname()->nodename},
{"kernel_version", utsname()->release},
{"entity_id", opt->name ? opt->name : ""}, {"entity_id", opt->name ? opt->name : ""},
{NULL, NULL} {NULL, NULL}
}; };
...@@ -1464,19 +1467,33 @@ int ceph_add_cap_releases(struct ceph_mds_client *mdsc, ...@@ -1464,19 +1467,33 @@ int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
return err; return err;
} }
static int check_cap_flush(struct inode *inode, u64 want_flush_seq)
{
struct ceph_inode_info *ci = ceph_inode(inode);
int ret;
spin_lock(&ci->i_ceph_lock);
if (ci->i_flushing_caps)
ret = ci->i_cap_flush_seq >= want_flush_seq;
else
ret = 1;
spin_unlock(&ci->i_ceph_lock);
return ret;
}
/* /*
* flush all dirty inode data to disk. * flush all dirty inode data to disk.
* *
* returns true if we've flushed through want_flush_seq * returns true if we've flushed through want_flush_seq
*/ */
static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq) static void wait_caps_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
{ {
int mds, ret = 1; int mds;
dout("check_cap_flush want %lld\n", want_flush_seq); dout("check_cap_flush want %lld\n", want_flush_seq);
mutex_lock(&mdsc->mutex); mutex_lock(&mdsc->mutex);
for (mds = 0; ret && mds < mdsc->max_sessions; mds++) { for (mds = 0; mds < mdsc->max_sessions; mds++) {
struct ceph_mds_session *session = mdsc->sessions[mds]; struct ceph_mds_session *session = mdsc->sessions[mds];
struct inode *inode = NULL;
if (!session) if (!session)
continue; continue;
...@@ -1489,29 +1506,29 @@ static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq) ...@@ -1489,29 +1506,29 @@ static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
list_entry(session->s_cap_flushing.next, list_entry(session->s_cap_flushing.next,
struct ceph_inode_info, struct ceph_inode_info,
i_flushing_item); i_flushing_item);
struct inode *inode = &ci->vfs_inode;
spin_lock(&ci->i_ceph_lock); if (!check_cap_flush(&ci->vfs_inode, want_flush_seq)) {
if (ci->i_cap_flush_seq <= want_flush_seq) {
dout("check_cap_flush still flushing %p " dout("check_cap_flush still flushing %p "
"seq %lld <= %lld to mds%d\n", inode, "seq %lld <= %lld to mds%d\n",
ci->i_cap_flush_seq, want_flush_seq, &ci->vfs_inode, ci->i_cap_flush_seq,
session->s_mds); want_flush_seq, session->s_mds);
ret = 0; inode = igrab(&ci->vfs_inode);
} }
spin_unlock(&ci->i_ceph_lock);
} }
mutex_unlock(&session->s_mutex); mutex_unlock(&session->s_mutex);
ceph_put_mds_session(session); ceph_put_mds_session(session);
if (!ret) if (inode) {
return ret; wait_event(mdsc->cap_flushing_wq,
check_cap_flush(inode, want_flush_seq));
iput(inode);
}
mutex_lock(&mdsc->mutex); mutex_lock(&mdsc->mutex);
} }
mutex_unlock(&mdsc->mutex); mutex_unlock(&mdsc->mutex);
dout("check_cap_flush ok, flushed thru %lld\n", want_flush_seq); dout("check_cap_flush ok, flushed thru %lld\n", want_flush_seq);
return ret;
} }
/* /*
...@@ -1923,7 +1940,11 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, ...@@ -1923,7 +1940,11 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
head->num_releases = cpu_to_le16(releases); head->num_releases = cpu_to_le16(releases);
/* time stamp */ /* time stamp */
ceph_encode_copy(&p, &req->r_stamp, sizeof(req->r_stamp)); {
struct ceph_timespec ts;
ceph_encode_timespec(&ts, &req->r_stamp);
ceph_encode_copy(&p, &ts, sizeof(ts));
}
BUG_ON(p > end); BUG_ON(p > end);
msg->front.iov_len = p - msg->front.iov_base; msg->front.iov_len = p - msg->front.iov_base;
...@@ -2012,7 +2033,11 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc, ...@@ -2012,7 +2033,11 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
/* time stamp */ /* time stamp */
p = msg->front.iov_base + req->r_request_release_offset; p = msg->front.iov_base + req->r_request_release_offset;
ceph_encode_copy(&p, &req->r_stamp, sizeof(req->r_stamp)); {
struct ceph_timespec ts;
ceph_encode_timespec(&ts, &req->r_stamp);
ceph_encode_copy(&p, &ts, sizeof(ts));
}
msg->front.iov_len = p - msg->front.iov_base; msg->front.iov_len = p - msg->front.iov_base;
msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
...@@ -2159,6 +2184,8 @@ static void kick_requests(struct ceph_mds_client *mdsc, int mds) ...@@ -2159,6 +2184,8 @@ static void kick_requests(struct ceph_mds_client *mdsc, int mds)
p = rb_next(p); p = rb_next(p);
if (req->r_got_unsafe) if (req->r_got_unsafe)
continue; continue;
if (req->r_attempts > 0)
continue; /* only new requests */
if (req->r_session && if (req->r_session &&
req->r_session->s_mds == mds) { req->r_session->s_mds == mds) {
dout(" kicking tid %llu\n", req->r_tid); dout(" kicking tid %llu\n", req->r_tid);
...@@ -2286,6 +2313,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) ...@@ -2286,6 +2313,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
struct ceph_mds_request *req; struct ceph_mds_request *req;
struct ceph_mds_reply_head *head = msg->front.iov_base; struct ceph_mds_reply_head *head = msg->front.iov_base;
struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */ struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */
struct ceph_snap_realm *realm;
u64 tid; u64 tid;
int err, result; int err, result;
int mds = session->s_mds; int mds = session->s_mds;
...@@ -2401,11 +2429,13 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) ...@@ -2401,11 +2429,13 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
} }
/* snap trace */ /* snap trace */
realm = NULL;
if (rinfo->snapblob_len) { if (rinfo->snapblob_len) {
down_write(&mdsc->snap_rwsem); down_write(&mdsc->snap_rwsem);
ceph_update_snap_trace(mdsc, rinfo->snapblob, ceph_update_snap_trace(mdsc, rinfo->snapblob,
rinfo->snapblob + rinfo->snapblob_len, rinfo->snapblob + rinfo->snapblob_len,
le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP); le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP,
&realm);
downgrade_write(&mdsc->snap_rwsem); downgrade_write(&mdsc->snap_rwsem);
} else { } else {
down_read(&mdsc->snap_rwsem); down_read(&mdsc->snap_rwsem);
...@@ -2423,6 +2453,8 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) ...@@ -2423,6 +2453,8 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
mutex_unlock(&req->r_fill_mutex); mutex_unlock(&req->r_fill_mutex);
up_read(&mdsc->snap_rwsem); up_read(&mdsc->snap_rwsem);
if (realm)
ceph_put_snap_realm(mdsc, realm);
out_err: out_err:
mutex_lock(&mdsc->mutex); mutex_lock(&mdsc->mutex);
if (!req->r_aborted) { if (!req->r_aborted) {
...@@ -2487,6 +2519,7 @@ static void handle_forward(struct ceph_mds_client *mdsc, ...@@ -2487,6 +2519,7 @@ static void handle_forward(struct ceph_mds_client *mdsc,
dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds); dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
BUG_ON(req->r_err); BUG_ON(req->r_err);
BUG_ON(req->r_got_result); BUG_ON(req->r_got_result);
req->r_attempts = 0;
req->r_num_fwd = fwd_seq; req->r_num_fwd = fwd_seq;
req->r_resend_mds = next_mds; req->r_resend_mds = next_mds;
put_request_session(req); put_request_session(req);
...@@ -2580,6 +2613,14 @@ static void handle_session(struct ceph_mds_session *session, ...@@ -2580,6 +2613,14 @@ static void handle_session(struct ceph_mds_session *session,
send_flushmsg_ack(mdsc, session, seq); send_flushmsg_ack(mdsc, session, seq);
break; break;
case CEPH_SESSION_FORCE_RO:
dout("force_session_readonly %p\n", session);
spin_lock(&session->s_cap_lock);
session->s_readonly = true;
spin_unlock(&session->s_cap_lock);
wake_up_session_caps(session, 0);
break;
default: default:
pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds); pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
WARN_ON(1); WARN_ON(1);
...@@ -2610,6 +2651,7 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc, ...@@ -2610,6 +2651,7 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session) struct ceph_mds_session *session)
{ {
struct ceph_mds_request *req, *nreq; struct ceph_mds_request *req, *nreq;
struct rb_node *p;
int err; int err;
dout("replay_unsafe_requests mds%d\n", session->s_mds); dout("replay_unsafe_requests mds%d\n", session->s_mds);
...@@ -2622,6 +2664,28 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc, ...@@ -2622,6 +2664,28 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
ceph_con_send(&session->s_con, req->r_request); ceph_con_send(&session->s_con, req->r_request);
} }
} }
/*
* also re-send old requests when MDS enters reconnect stage. So that MDS
* can process completed request in clientreplay stage.
*/
p = rb_first(&mdsc->request_tree);
while (p) {
req = rb_entry(p, struct ceph_mds_request, r_node);
p = rb_next(p);
if (req->r_got_unsafe)
continue;
if (req->r_attempts == 0)
continue; /* only old requests */
if (req->r_session &&
req->r_session->s_mds == session->s_mds) {
err = __prepare_send_request(mdsc, req, session->s_mds);
if (!err) {
ceph_msg_get(req->r_request);
ceph_con_send(&session->s_con, req->r_request);
}
}
}
mutex_unlock(&mdsc->mutex); mutex_unlock(&mdsc->mutex);
} }
...@@ -2787,6 +2851,8 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, ...@@ -2787,6 +2851,8 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
spin_unlock(&session->s_gen_ttl_lock); spin_unlock(&session->s_gen_ttl_lock);
spin_lock(&session->s_cap_lock); spin_lock(&session->s_cap_lock);
/* don't know if session is readonly */
session->s_readonly = 0;
/* /*
* notify __ceph_remove_cap() that we are composing cap reconnect. * notify __ceph_remove_cap() that we are composing cap reconnect.
* If a cap get released before being added to the cap reconnect, * If a cap get released before being added to the cap reconnect,
...@@ -2933,9 +2999,6 @@ static void check_new_map(struct ceph_mds_client *mdsc, ...@@ -2933,9 +2999,6 @@ static void check_new_map(struct ceph_mds_client *mdsc,
mutex_unlock(&s->s_mutex); mutex_unlock(&s->s_mutex);
s->s_state = CEPH_MDS_SESSION_RESTARTING; s->s_state = CEPH_MDS_SESSION_RESTARTING;
} }
/* kick any requests waiting on the recovering mds */
kick_requests(mdsc, i);
} else if (oldstate == newstate) { } else if (oldstate == newstate) {
continue; /* nothing new with this mds */ continue; /* nothing new with this mds */
} }
...@@ -3295,6 +3358,7 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc) ...@@ -3295,6 +3358,7 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
init_waitqueue_head(&mdsc->session_close_wq); init_waitqueue_head(&mdsc->session_close_wq);
INIT_LIST_HEAD(&mdsc->waiting_for_map); INIT_LIST_HEAD(&mdsc->waiting_for_map);
mdsc->sessions = NULL; mdsc->sessions = NULL;
atomic_set(&mdsc->num_sessions, 0);
mdsc->max_sessions = 0; mdsc->max_sessions = 0;
mdsc->stopping = 0; mdsc->stopping = 0;
init_rwsem(&mdsc->snap_rwsem); init_rwsem(&mdsc->snap_rwsem);
...@@ -3428,14 +3492,17 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc) ...@@ -3428,14 +3492,17 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
dout("sync\n"); dout("sync\n");
mutex_lock(&mdsc->mutex); mutex_lock(&mdsc->mutex);
want_tid = mdsc->last_tid; want_tid = mdsc->last_tid;
want_flush = mdsc->cap_flush_seq;
mutex_unlock(&mdsc->mutex); mutex_unlock(&mdsc->mutex);
dout("sync want tid %lld flush_seq %lld\n", want_tid, want_flush);
ceph_flush_dirty_caps(mdsc); ceph_flush_dirty_caps(mdsc);
spin_lock(&mdsc->cap_dirty_lock);
want_flush = mdsc->cap_flush_seq;
spin_unlock(&mdsc->cap_dirty_lock);
dout("sync want tid %lld flush_seq %lld\n", want_tid, want_flush);
wait_unsafe_requests(mdsc, want_tid); wait_unsafe_requests(mdsc, want_tid);
wait_event(mdsc->cap_flushing_wq, check_cap_flush(mdsc, want_flush)); wait_caps_flush(mdsc, want_flush);
} }
/* /*
...@@ -3443,17 +3510,9 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc) ...@@ -3443,17 +3510,9 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
*/ */
static bool done_closing_sessions(struct ceph_mds_client *mdsc) static bool done_closing_sessions(struct ceph_mds_client *mdsc)
{ {
int i, n = 0;
if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN) if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN)
return true; return true;
return atomic_read(&mdsc->num_sessions) == 0;
mutex_lock(&mdsc->mutex);
for (i = 0; i < mdsc->max_sessions; i++)
if (mdsc->sessions[i])
n++;
mutex_unlock(&mdsc->mutex);
return n == 0;
} }
/* /*
......
...@@ -137,6 +137,7 @@ struct ceph_mds_session { ...@@ -137,6 +137,7 @@ struct ceph_mds_session {
int s_nr_caps, s_trim_caps; int s_nr_caps, s_trim_caps;
int s_num_cap_releases; int s_num_cap_releases;
int s_cap_reconnect; int s_cap_reconnect;
int s_readonly;
struct list_head s_cap_releases; /* waiting cap_release messages */ struct list_head s_cap_releases; /* waiting cap_release messages */
struct list_head s_cap_releases_done; /* ready to send */ struct list_head s_cap_releases_done; /* ready to send */
struct ceph_cap *s_cap_iterator; struct ceph_cap *s_cap_iterator;
...@@ -272,6 +273,7 @@ struct ceph_mds_client { ...@@ -272,6 +273,7 @@ struct ceph_mds_client {
struct list_head waiting_for_map; struct list_head waiting_for_map;
struct ceph_mds_session **sessions; /* NULL for mds if no session */ struct ceph_mds_session **sessions; /* NULL for mds if no session */
atomic_t num_sessions;
int max_sessions; /* len of s_mds_sessions */ int max_sessions; /* len of s_mds_sessions */
int stopping; /* true if shutting down */ int stopping; /* true if shutting down */
......
...@@ -70,13 +70,11 @@ void ceph_get_snap_realm(struct ceph_mds_client *mdsc, ...@@ -70,13 +70,11 @@ void ceph_get_snap_realm(struct ceph_mds_client *mdsc,
* safe. we do need to protect against concurrent empty list * safe. we do need to protect against concurrent empty list
* additions, however. * additions, however.
*/ */
if (atomic_read(&realm->nref) == 0) { if (atomic_inc_return(&realm->nref) == 1) {
spin_lock(&mdsc->snap_empty_lock); spin_lock(&mdsc->snap_empty_lock);
list_del_init(&realm->empty_item); list_del_init(&realm->empty_item);
spin_unlock(&mdsc->snap_empty_lock); spin_unlock(&mdsc->snap_empty_lock);
} }
atomic_inc(&realm->nref);
} }
static void __insert_snap_realm(struct rb_root *root, static void __insert_snap_realm(struct rb_root *root,
...@@ -116,7 +114,7 @@ static struct ceph_snap_realm *ceph_create_snap_realm( ...@@ -116,7 +114,7 @@ static struct ceph_snap_realm *ceph_create_snap_realm(
if (!realm) if (!realm)
return ERR_PTR(-ENOMEM); return ERR_PTR(-ENOMEM);
atomic_set(&realm->nref, 0); /* tree does not take a ref */ atomic_set(&realm->nref, 1); /* for caller */
realm->ino = ino; realm->ino = ino;
INIT_LIST_HEAD(&realm->children); INIT_LIST_HEAD(&realm->children);
INIT_LIST_HEAD(&realm->child_item); INIT_LIST_HEAD(&realm->child_item);
...@@ -134,7 +132,7 @@ static struct ceph_snap_realm *ceph_create_snap_realm( ...@@ -134,7 +132,7 @@ static struct ceph_snap_realm *ceph_create_snap_realm(
* *
* caller must hold snap_rwsem for write. * caller must hold snap_rwsem for write.
*/ */
struct ceph_snap_realm *ceph_lookup_snap_realm(struct ceph_mds_client *mdsc, static struct ceph_snap_realm *__lookup_snap_realm(struct ceph_mds_client *mdsc,
u64 ino) u64 ino)
{ {
struct rb_node *n = mdsc->snap_realms.rb_node; struct rb_node *n = mdsc->snap_realms.rb_node;
...@@ -154,6 +152,16 @@ struct ceph_snap_realm *ceph_lookup_snap_realm(struct ceph_mds_client *mdsc, ...@@ -154,6 +152,16 @@ struct ceph_snap_realm *ceph_lookup_snap_realm(struct ceph_mds_client *mdsc,
return NULL; return NULL;
} }
struct ceph_snap_realm *ceph_lookup_snap_realm(struct ceph_mds_client *mdsc,
u64 ino)
{
struct ceph_snap_realm *r;
r = __lookup_snap_realm(mdsc, ino);
if (r)
ceph_get_snap_realm(mdsc, r);
return r;
}
static void __put_snap_realm(struct ceph_mds_client *mdsc, static void __put_snap_realm(struct ceph_mds_client *mdsc,
struct ceph_snap_realm *realm); struct ceph_snap_realm *realm);
...@@ -273,7 +281,6 @@ static int adjust_snap_realm_parent(struct ceph_mds_client *mdsc, ...@@ -273,7 +281,6 @@ static int adjust_snap_realm_parent(struct ceph_mds_client *mdsc,
} }
realm->parent_ino = parentino; realm->parent_ino = parentino;
realm->parent = parent; realm->parent = parent;
ceph_get_snap_realm(mdsc, parent);
list_add(&realm->child_item, &parent->children); list_add(&realm->child_item, &parent->children);
return 1; return 1;
} }
...@@ -631,12 +638,14 @@ static void queue_realm_cap_snaps(struct ceph_snap_realm *realm) ...@@ -631,12 +638,14 @@ static void queue_realm_cap_snaps(struct ceph_snap_realm *realm)
* Caller must hold snap_rwsem for write. * Caller must hold snap_rwsem for write.
*/ */
int ceph_update_snap_trace(struct ceph_mds_client *mdsc, int ceph_update_snap_trace(struct ceph_mds_client *mdsc,
void *p, void *e, bool deletion) void *p, void *e, bool deletion,
struct ceph_snap_realm **realm_ret)
{ {
struct ceph_mds_snap_realm *ri; /* encoded */ struct ceph_mds_snap_realm *ri; /* encoded */
__le64 *snaps; /* encoded */ __le64 *snaps; /* encoded */
__le64 *prior_parent_snaps; /* encoded */ __le64 *prior_parent_snaps; /* encoded */
struct ceph_snap_realm *realm; struct ceph_snap_realm *realm = NULL;
struct ceph_snap_realm *first_realm = NULL;
int invalidate = 0; int invalidate = 0;
int err = -ENOMEM; int err = -ENOMEM;
LIST_HEAD(dirty_realms); LIST_HEAD(dirty_realms);
...@@ -704,13 +713,18 @@ int ceph_update_snap_trace(struct ceph_mds_client *mdsc, ...@@ -704,13 +713,18 @@ int ceph_update_snap_trace(struct ceph_mds_client *mdsc,
dout("done with %llx %p, invalidated=%d, %p %p\n", realm->ino, dout("done with %llx %p, invalidated=%d, %p %p\n", realm->ino,
realm, invalidate, p, e); realm, invalidate, p, e);
if (p < e)
goto more;
/* invalidate when we reach the _end_ (root) of the trace */ /* invalidate when we reach the _end_ (root) of the trace */
if (invalidate) if (invalidate && p >= e)
rebuild_snap_realms(realm); rebuild_snap_realms(realm);
if (!first_realm)
first_realm = realm;
else
ceph_put_snap_realm(mdsc, realm);
if (p < e)
goto more;
/* /*
* queue cap snaps _after_ we've built the new snap contexts, * queue cap snaps _after_ we've built the new snap contexts,
* so that i_head_snapc can be set appropriately. * so that i_head_snapc can be set appropriately.
...@@ -721,12 +735,21 @@ int ceph_update_snap_trace(struct ceph_mds_client *mdsc, ...@@ -721,12 +735,21 @@ int ceph_update_snap_trace(struct ceph_mds_client *mdsc,
queue_realm_cap_snaps(realm); queue_realm_cap_snaps(realm);
} }
if (realm_ret)
*realm_ret = first_realm;
else
ceph_put_snap_realm(mdsc, first_realm);
__cleanup_empty_realms(mdsc); __cleanup_empty_realms(mdsc);
return 0; return 0;
bad: bad:
err = -EINVAL; err = -EINVAL;
fail: fail:
if (realm && !IS_ERR(realm))
ceph_put_snap_realm(mdsc, realm);
if (first_realm)
ceph_put_snap_realm(mdsc, first_realm);
pr_err("update_snap_trace error %d\n", err); pr_err("update_snap_trace error %d\n", err);
return err; return err;
} }
...@@ -844,7 +867,6 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc, ...@@ -844,7 +867,6 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
if (IS_ERR(realm)) if (IS_ERR(realm))
goto out; goto out;
} }
ceph_get_snap_realm(mdsc, realm);
dout("splitting snap_realm %llx %p\n", realm->ino, realm); dout("splitting snap_realm %llx %p\n", realm->ino, realm);
for (i = 0; i < num_split_inos; i++) { for (i = 0; i < num_split_inos; i++) {
...@@ -905,7 +927,7 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc, ...@@ -905,7 +927,7 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
/* we may have taken some of the old realm's children. */ /* we may have taken some of the old realm's children. */
for (i = 0; i < num_split_realms; i++) { for (i = 0; i < num_split_realms; i++) {
struct ceph_snap_realm *child = struct ceph_snap_realm *child =
ceph_lookup_snap_realm(mdsc, __lookup_snap_realm(mdsc,
le64_to_cpu(split_realms[i])); le64_to_cpu(split_realms[i]));
if (!child) if (!child)
continue; continue;
...@@ -918,7 +940,7 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc, ...@@ -918,7 +940,7 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
* snap, we can avoid queueing cap_snaps. * snap, we can avoid queueing cap_snaps.
*/ */
ceph_update_snap_trace(mdsc, p, e, ceph_update_snap_trace(mdsc, p, e,
op == CEPH_SNAP_OP_DESTROY); op == CEPH_SNAP_OP_DESTROY, NULL);
if (op == CEPH_SNAP_OP_SPLIT) if (op == CEPH_SNAP_OP_SPLIT)
/* we took a reference when we created the realm, above */ /* we took a reference when we created the realm, above */
......
...@@ -414,6 +414,10 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root) ...@@ -414,6 +414,10 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
seq_puts(m, ",noshare"); seq_puts(m, ",noshare");
if (opt->flags & CEPH_OPT_NOCRC) if (opt->flags & CEPH_OPT_NOCRC)
seq_puts(m, ",nocrc"); seq_puts(m, ",nocrc");
if (opt->flags & CEPH_OPT_NOMSGAUTH)
seq_puts(m, ",nocephx_require_signatures");
if ((opt->flags & CEPH_OPT_TCP_NODELAY) == 0)
seq_puts(m, ",notcp_nodelay");
if (opt->name) if (opt->name)
seq_printf(m, ",name=%s", opt->name); seq_printf(m, ",name=%s", opt->name);
......
...@@ -693,7 +693,8 @@ extern void ceph_get_snap_realm(struct ceph_mds_client *mdsc, ...@@ -693,7 +693,8 @@ extern void ceph_get_snap_realm(struct ceph_mds_client *mdsc,
extern void ceph_put_snap_realm(struct ceph_mds_client *mdsc, extern void ceph_put_snap_realm(struct ceph_mds_client *mdsc,
struct ceph_snap_realm *realm); struct ceph_snap_realm *realm);
extern int ceph_update_snap_trace(struct ceph_mds_client *m, extern int ceph_update_snap_trace(struct ceph_mds_client *m,
void *p, void *e, bool deletion); void *p, void *e, bool deletion,
struct ceph_snap_realm **realm_ret);
extern void ceph_handle_snap(struct ceph_mds_client *mdsc, extern void ceph_handle_snap(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session, struct ceph_mds_session *session,
struct ceph_msg *msg); struct ceph_msg *msg);
...@@ -892,7 +893,9 @@ extern void ceph_fill_inline_data(struct inode *inode, struct page *locked_page, ...@@ -892,7 +893,9 @@ extern void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
int ceph_uninline_data(struct file *filp, struct page *locked_page); int ceph_uninline_data(struct file *filp, struct page *locked_page);
/* dir.c */ /* dir.c */
extern const struct file_operations ceph_dir_fops; extern const struct file_operations ceph_dir_fops;
extern const struct file_operations ceph_snapdir_fops;
extern const struct inode_operations ceph_dir_iops; extern const struct inode_operations ceph_dir_iops;
extern const struct inode_operations ceph_snapdir_iops;
extern const struct dentry_operations ceph_dentry_ops, ceph_snap_dentry_ops, extern const struct dentry_operations ceph_dentry_ops, ceph_snap_dentry_ops,
ceph_snapdir_dentry_ops; ceph_snapdir_dentry_ops;
......
...@@ -158,17 +158,6 @@ enum { ...@@ -158,17 +158,6 @@ enum {
}; };
/* pool operations */
enum {
POOL_OP_CREATE = 0x01,
POOL_OP_DELETE = 0x02,
POOL_OP_AUID_CHANGE = 0x03,
POOL_OP_CREATE_SNAP = 0x11,
POOL_OP_DELETE_SNAP = 0x12,
POOL_OP_CREATE_UNMANAGED_SNAP = 0x21,
POOL_OP_DELETE_UNMANAGED_SNAP = 0x22,
};
struct ceph_mon_request_header { struct ceph_mon_request_header {
__le64 have_version; __le64 have_version;
__le16 session_mon; __le16 session_mon;
...@@ -191,31 +180,6 @@ struct ceph_mon_statfs_reply { ...@@ -191,31 +180,6 @@ struct ceph_mon_statfs_reply {
struct ceph_statfs st; struct ceph_statfs st;
} __attribute__ ((packed)); } __attribute__ ((packed));
const char *ceph_pool_op_name(int op);
struct ceph_mon_poolop {
struct ceph_mon_request_header monhdr;
struct ceph_fsid fsid;
__le32 pool;
__le32 op;
__le64 auid;
__le64 snapid;
__le32 name_len;
} __attribute__ ((packed));
struct ceph_mon_poolop_reply {
struct ceph_mon_request_header monhdr;
struct ceph_fsid fsid;
__le32 reply_code;
__le32 epoch;
char has_data;
char data[0];
} __attribute__ ((packed));
struct ceph_mon_unmanaged_snap {
__le64 snapid;
} __attribute__ ((packed));
struct ceph_osd_getmap { struct ceph_osd_getmap {
struct ceph_mon_request_header monhdr; struct ceph_mon_request_header monhdr;
struct ceph_fsid fsid; struct ceph_fsid fsid;
...@@ -307,6 +271,7 @@ enum { ...@@ -307,6 +271,7 @@ enum {
CEPH_SESSION_RECALL_STATE, CEPH_SESSION_RECALL_STATE,
CEPH_SESSION_FLUSHMSG, CEPH_SESSION_FLUSHMSG,
CEPH_SESSION_FLUSHMSG_ACK, CEPH_SESSION_FLUSHMSG_ACK,
CEPH_SESSION_FORCE_RO,
}; };
extern const char *ceph_session_op_name(int op); extern const char *ceph_session_op_name(int op);
......
...@@ -30,8 +30,9 @@ ...@@ -30,8 +30,9 @@
#define CEPH_OPT_MYIP (1<<2) /* specified my ip */ #define CEPH_OPT_MYIP (1<<2) /* specified my ip */
#define CEPH_OPT_NOCRC (1<<3) /* no data crc on writes */ #define CEPH_OPT_NOCRC (1<<3) /* no data crc on writes */
#define CEPH_OPT_NOMSGAUTH (1<<4) /* not require cephx message signature */ #define CEPH_OPT_NOMSGAUTH (1<<4) /* not require cephx message signature */
#define CEPH_OPT_TCP_NODELAY (1<<5) /* TCP_NODELAY on TCP sockets */
#define CEPH_OPT_DEFAULT (0) #define CEPH_OPT_DEFAULT (CEPH_OPT_TCP_NODELAY)
#define ceph_set_opt(client, opt) \ #define ceph_set_opt(client, opt) \
(client)->options->flags |= CEPH_OPT_##opt; (client)->options->flags |= CEPH_OPT_##opt;
......
...@@ -57,6 +57,7 @@ struct ceph_messenger { ...@@ -57,6 +57,7 @@ struct ceph_messenger {
atomic_t stopping; atomic_t stopping;
bool nocrc; bool nocrc;
bool tcp_nodelay;
/* /*
* the global_seq counts connections i (attempt to) initiate * the global_seq counts connections i (attempt to) initiate
...@@ -264,7 +265,8 @@ extern void ceph_messenger_init(struct ceph_messenger *msgr, ...@@ -264,7 +265,8 @@ extern void ceph_messenger_init(struct ceph_messenger *msgr,
struct ceph_entity_addr *myaddr, struct ceph_entity_addr *myaddr,
u64 supported_features, u64 supported_features,
u64 required_features, u64 required_features,
bool nocrc); bool nocrc,
bool tcp_nodelay);
extern void ceph_con_init(struct ceph_connection *con, void *private, extern void ceph_con_init(struct ceph_connection *con, void *private,
const struct ceph_connection_operations *ops, const struct ceph_connection_operations *ops,
......
...@@ -40,7 +40,7 @@ struct ceph_mon_request { ...@@ -40,7 +40,7 @@ struct ceph_mon_request {
}; };
/* /*
* ceph_mon_generic_request is being used for the statfs, poolop and * ceph_mon_generic_request is being used for the statfs and
* mon_get_version requests which are being done a bit differently * mon_get_version requests which are being done a bit differently
* because we need to get data back to the caller * because we need to get data back to the caller
*/ */
...@@ -50,7 +50,6 @@ struct ceph_mon_generic_request { ...@@ -50,7 +50,6 @@ struct ceph_mon_generic_request {
struct rb_node node; struct rb_node node;
int result; int result;
void *buf; void *buf;
int buf_len;
struct completion completion; struct completion completion;
struct ceph_msg *request; /* original request */ struct ceph_msg *request; /* original request */
struct ceph_msg *reply; /* and reply */ struct ceph_msg *reply; /* and reply */
...@@ -117,10 +116,4 @@ extern int ceph_monc_open_session(struct ceph_mon_client *monc); ...@@ -117,10 +116,4 @@ extern int ceph_monc_open_session(struct ceph_mon_client *monc);
extern int ceph_monc_validate_auth(struct ceph_mon_client *monc); extern int ceph_monc_validate_auth(struct ceph_mon_client *monc);
extern int ceph_monc_create_snapid(struct ceph_mon_client *monc,
u32 pool, u64 *snapid);
extern int ceph_monc_delete_snapid(struct ceph_mon_client *monc,
u32 pool, u64 snapid);
#endif #endif
...@@ -239,6 +239,8 @@ enum { ...@@ -239,6 +239,8 @@ enum {
Opt_nocrc, Opt_nocrc,
Opt_cephx_require_signatures, Opt_cephx_require_signatures,
Opt_nocephx_require_signatures, Opt_nocephx_require_signatures,
Opt_tcp_nodelay,
Opt_notcp_nodelay,
}; };
static match_table_t opt_tokens = { static match_table_t opt_tokens = {
...@@ -259,6 +261,8 @@ static match_table_t opt_tokens = { ...@@ -259,6 +261,8 @@ static match_table_t opt_tokens = {
{Opt_nocrc, "nocrc"}, {Opt_nocrc, "nocrc"},
{Opt_cephx_require_signatures, "cephx_require_signatures"}, {Opt_cephx_require_signatures, "cephx_require_signatures"},
{Opt_nocephx_require_signatures, "nocephx_require_signatures"}, {Opt_nocephx_require_signatures, "nocephx_require_signatures"},
{Opt_tcp_nodelay, "tcp_nodelay"},
{Opt_notcp_nodelay, "notcp_nodelay"},
{-1, NULL} {-1, NULL}
}; };
...@@ -457,6 +461,7 @@ ceph_parse_options(char *options, const char *dev_name, ...@@ -457,6 +461,7 @@ ceph_parse_options(char *options, const char *dev_name,
case Opt_nocrc: case Opt_nocrc:
opt->flags |= CEPH_OPT_NOCRC; opt->flags |= CEPH_OPT_NOCRC;
break; break;
case Opt_cephx_require_signatures: case Opt_cephx_require_signatures:
opt->flags &= ~CEPH_OPT_NOMSGAUTH; opt->flags &= ~CEPH_OPT_NOMSGAUTH;
break; break;
...@@ -464,6 +469,13 @@ ceph_parse_options(char *options, const char *dev_name, ...@@ -464,6 +469,13 @@ ceph_parse_options(char *options, const char *dev_name,
opt->flags |= CEPH_OPT_NOMSGAUTH; opt->flags |= CEPH_OPT_NOMSGAUTH;
break; break;
case Opt_tcp_nodelay:
opt->flags |= CEPH_OPT_TCP_NODELAY;
break;
case Opt_notcp_nodelay:
opt->flags &= ~CEPH_OPT_TCP_NODELAY;
break;
default: default:
BUG_ON(token); BUG_ON(token);
} }
...@@ -518,10 +530,12 @@ struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private, ...@@ -518,10 +530,12 @@ struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private,
/* msgr */ /* msgr */
if (ceph_test_opt(client, MYIP)) if (ceph_test_opt(client, MYIP))
myaddr = &client->options->my_addr; myaddr = &client->options->my_addr;
ceph_messenger_init(&client->msgr, myaddr, ceph_messenger_init(&client->msgr, myaddr,
client->supported_features, client->supported_features,
client->required_features, client->required_features,
ceph_test_opt(client, NOCRC)); ceph_test_opt(client, NOCRC),
ceph_test_opt(client, TCP_NODELAY));
/* subsystems */ /* subsystems */
err = ceph_monc_init(&client->monc, client); err = ceph_monc_init(&client->monc, client);
......
...@@ -42,17 +42,3 @@ const char *ceph_osd_state_name(int s) ...@@ -42,17 +42,3 @@ const char *ceph_osd_state_name(int s)
return "???"; return "???";
} }
} }
const char *ceph_pool_op_name(int op)
{
switch (op) {
case POOL_OP_CREATE: return "create";
case POOL_OP_DELETE: return "delete";
case POOL_OP_AUID_CHANGE: return "auid change";
case POOL_OP_CREATE_SNAP: return "create snap";
case POOL_OP_DELETE_SNAP: return "delete snap";
case POOL_OP_CREATE_UNMANAGED_SNAP: return "create unmanaged snap";
case POOL_OP_DELETE_UNMANAGED_SNAP: return "delete unmanaged snap";
}
return "???";
}
...@@ -127,8 +127,6 @@ static int monc_show(struct seq_file *s, void *p) ...@@ -127,8 +127,6 @@ static int monc_show(struct seq_file *s, void *p)
op = le16_to_cpu(req->request->hdr.type); op = le16_to_cpu(req->request->hdr.type);
if (op == CEPH_MSG_STATFS) if (op == CEPH_MSG_STATFS)
seq_printf(s, "%llu statfs\n", req->tid); seq_printf(s, "%llu statfs\n", req->tid);
else if (op == CEPH_MSG_POOLOP)
seq_printf(s, "%llu poolop\n", req->tid);
else if (op == CEPH_MSG_MON_GET_VERSION) else if (op == CEPH_MSG_MON_GET_VERSION)
seq_printf(s, "%llu mon_get_version", req->tid); seq_printf(s, "%llu mon_get_version", req->tid);
else else
......
...@@ -510,6 +510,16 @@ static int ceph_tcp_connect(struct ceph_connection *con) ...@@ -510,6 +510,16 @@ static int ceph_tcp_connect(struct ceph_connection *con)
return ret; return ret;
} }
if (con->msgr->tcp_nodelay) {
int optval = 1;
ret = kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY,
(char *)&optval, sizeof(optval));
if (ret)
pr_err("kernel_setsockopt(TCP_NODELAY) failed: %d",
ret);
}
sk_set_memalloc(sock->sk); sk_set_memalloc(sock->sk);
con->sock = sock; con->sock = sock;
...@@ -2922,7 +2932,8 @@ void ceph_messenger_init(struct ceph_messenger *msgr, ...@@ -2922,7 +2932,8 @@ void ceph_messenger_init(struct ceph_messenger *msgr,
struct ceph_entity_addr *myaddr, struct ceph_entity_addr *myaddr,
u64 supported_features, u64 supported_features,
u64 required_features, u64 required_features,
bool nocrc) bool nocrc,
bool tcp_nodelay)
{ {
msgr->supported_features = supported_features; msgr->supported_features = supported_features;
msgr->required_features = required_features; msgr->required_features = required_features;
...@@ -2937,6 +2948,7 @@ void ceph_messenger_init(struct ceph_messenger *msgr, ...@@ -2937,6 +2948,7 @@ void ceph_messenger_init(struct ceph_messenger *msgr,
get_random_bytes(&msgr->inst.addr.nonce, sizeof(msgr->inst.addr.nonce)); get_random_bytes(&msgr->inst.addr.nonce, sizeof(msgr->inst.addr.nonce));
encode_my_addr(msgr); encode_my_addr(msgr);
msgr->nocrc = nocrc; msgr->nocrc = nocrc;
msgr->tcp_nodelay = tcp_nodelay;
atomic_set(&msgr->stopping, 0); atomic_set(&msgr->stopping, 0);
......
...@@ -410,7 +410,7 @@ static void ceph_monc_handle_map(struct ceph_mon_client *monc, ...@@ -410,7 +410,7 @@ static void ceph_monc_handle_map(struct ceph_mon_client *monc,
} }
/* /*
* generic requests (e.g., statfs, poolop) * generic requests (currently statfs, mon_get_version)
*/ */
static struct ceph_mon_generic_request *__lookup_generic_req( static struct ceph_mon_generic_request *__lookup_generic_req(
struct ceph_mon_client *monc, u64 tid) struct ceph_mon_client *monc, u64 tid)
...@@ -569,7 +569,7 @@ static void handle_statfs_reply(struct ceph_mon_client *monc, ...@@ -569,7 +569,7 @@ static void handle_statfs_reply(struct ceph_mon_client *monc,
return; return;
bad: bad:
pr_err("corrupt generic reply, tid %llu\n", tid); pr_err("corrupt statfs reply, tid %llu\n", tid);
ceph_msg_dump(msg); ceph_msg_dump(msg);
} }
...@@ -588,7 +588,6 @@ int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf) ...@@ -588,7 +588,6 @@ int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
kref_init(&req->kref); kref_init(&req->kref);
req->buf = buf; req->buf = buf;
req->buf_len = sizeof(*buf);
init_completion(&req->completion); init_completion(&req->completion);
err = -ENOMEM; err = -ENOMEM;
...@@ -611,7 +610,7 @@ int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf) ...@@ -611,7 +610,7 @@ int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
err = do_generic_request(monc, req); err = do_generic_request(monc, req);
out: out:
kref_put(&req->kref, release_generic_request); put_generic_request(req);
return err; return err;
} }
EXPORT_SYMBOL(ceph_monc_do_statfs); EXPORT_SYMBOL(ceph_monc_do_statfs);
...@@ -647,7 +646,7 @@ static void handle_get_version_reply(struct ceph_mon_client *monc, ...@@ -647,7 +646,7 @@ static void handle_get_version_reply(struct ceph_mon_client *monc,
return; return;
bad: bad:
pr_err("corrupt mon_get_version reply\n"); pr_err("corrupt mon_get_version reply, tid %llu\n", tid);
ceph_msg_dump(msg); ceph_msg_dump(msg);
} }
...@@ -670,7 +669,6 @@ int ceph_monc_do_get_version(struct ceph_mon_client *monc, const char *what, ...@@ -670,7 +669,6 @@ int ceph_monc_do_get_version(struct ceph_mon_client *monc, const char *what,
kref_init(&req->kref); kref_init(&req->kref);
req->buf = newest; req->buf = newest;
req->buf_len = sizeof(*newest);
init_completion(&req->completion); init_completion(&req->completion);
req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION, req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION,
...@@ -701,132 +699,10 @@ int ceph_monc_do_get_version(struct ceph_mon_client *monc, const char *what, ...@@ -701,132 +699,10 @@ int ceph_monc_do_get_version(struct ceph_mon_client *monc, const char *what,
mutex_unlock(&monc->mutex); mutex_unlock(&monc->mutex);
out: out:
kref_put(&req->kref, release_generic_request);
return err;
}
EXPORT_SYMBOL(ceph_monc_do_get_version);
/*
* pool ops
*/
static int get_poolop_reply_buf(const char *src, size_t src_len,
char *dst, size_t dst_len)
{
u32 buf_len;
if (src_len != sizeof(u32) + dst_len)
return -EINVAL;
buf_len = le32_to_cpu(*(__le32 *)src);
if (buf_len != dst_len)
return -EINVAL;
memcpy(dst, src + sizeof(u32), dst_len);
return 0;
}
static void handle_poolop_reply(struct ceph_mon_client *monc,
struct ceph_msg *msg)
{
struct ceph_mon_generic_request *req;
struct ceph_mon_poolop_reply *reply = msg->front.iov_base;
u64 tid = le64_to_cpu(msg->hdr.tid);
if (msg->front.iov_len < sizeof(*reply))
goto bad;
dout("handle_poolop_reply %p tid %llu\n", msg, tid);
mutex_lock(&monc->mutex);
req = __lookup_generic_req(monc, tid);
if (req) {
if (req->buf_len &&
get_poolop_reply_buf(msg->front.iov_base + sizeof(*reply),
msg->front.iov_len - sizeof(*reply),
req->buf, req->buf_len) < 0) {
mutex_unlock(&monc->mutex);
goto bad;
}
req->result = le32_to_cpu(reply->reply_code);
get_generic_request(req);
}
mutex_unlock(&monc->mutex);
if (req) {
complete(&req->completion);
put_generic_request(req); put_generic_request(req);
}
return;
bad:
pr_err("corrupt generic reply, tid %llu\n", tid);
ceph_msg_dump(msg);
}
/*
* Do a synchronous pool op.
*/
static int do_poolop(struct ceph_mon_client *monc, u32 op,
u32 pool, u64 snapid,
char *buf, int len)
{
struct ceph_mon_generic_request *req;
struct ceph_mon_poolop *h;
int err;
req = kzalloc(sizeof(*req), GFP_NOFS);
if (!req)
return -ENOMEM;
kref_init(&req->kref);
req->buf = buf;
req->buf_len = len;
init_completion(&req->completion);
err = -ENOMEM;
req->request = ceph_msg_new(CEPH_MSG_POOLOP, sizeof(*h), GFP_NOFS,
true);
if (!req->request)
goto out;
req->reply = ceph_msg_new(CEPH_MSG_POOLOP_REPLY, 1024, GFP_NOFS,
true);
if (!req->reply)
goto out;
/* fill out request */
req->request->hdr.version = cpu_to_le16(2);
h = req->request->front.iov_base;
h->monhdr.have_version = 0;
h->monhdr.session_mon = cpu_to_le16(-1);
h->monhdr.session_mon_tid = 0;
h->fsid = monc->monmap->fsid;
h->pool = cpu_to_le32(pool);
h->op = cpu_to_le32(op);
h->auid = 0;
h->snapid = cpu_to_le64(snapid);
h->name_len = 0;
err = do_generic_request(monc, req);
out:
kref_put(&req->kref, release_generic_request);
return err; return err;
} }
EXPORT_SYMBOL(ceph_monc_do_get_version);
int ceph_monc_create_snapid(struct ceph_mon_client *monc,
u32 pool, u64 *snapid)
{
return do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP,
pool, 0, (char *)snapid, sizeof(*snapid));
}
EXPORT_SYMBOL(ceph_monc_create_snapid);
int ceph_monc_delete_snapid(struct ceph_mon_client *monc,
u32 pool, u64 snapid)
{
return do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP,
pool, snapid, NULL, 0);
}
/* /*
* Resend pending generic requests. * Resend pending generic requests.
...@@ -1112,10 +988,6 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) ...@@ -1112,10 +988,6 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
handle_get_version_reply(monc, msg); handle_get_version_reply(monc, msg);
break; break;
case CEPH_MSG_POOLOP_REPLY:
handle_poolop_reply(monc, msg);
break;
case CEPH_MSG_MON_MAP: case CEPH_MSG_MON_MAP:
ceph_monc_handle_map(monc, msg); ceph_monc_handle_map(monc, msg);
break; break;
...@@ -1154,7 +1026,6 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con, ...@@ -1154,7 +1026,6 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
case CEPH_MSG_MON_SUBSCRIBE_ACK: case CEPH_MSG_MON_SUBSCRIBE_ACK:
m = ceph_msg_get(monc->m_subscribe_ack); m = ceph_msg_get(monc->m_subscribe_ack);
break; break;
case CEPH_MSG_POOLOP_REPLY:
case CEPH_MSG_STATFS_REPLY: case CEPH_MSG_STATFS_REPLY:
return get_generic_reply(con, hdr, skip); return get_generic_reply(con, hdr, skip);
case CEPH_MSG_AUTH_REPLY: case CEPH_MSG_AUTH_REPLY:
......
...@@ -1035,9 +1035,10 @@ static void put_osd(struct ceph_osd *osd) ...@@ -1035,9 +1035,10 @@ static void put_osd(struct ceph_osd *osd)
{ {
dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref), dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
atomic_read(&osd->o_ref) - 1); atomic_read(&osd->o_ref) - 1);
if (atomic_dec_and_test(&osd->o_ref) && osd->o_auth.authorizer) { if (atomic_dec_and_test(&osd->o_ref)) {
struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth; struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth;
if (osd->o_auth.authorizer)
ceph_auth_destroy_authorizer(ac, osd->o_auth.authorizer); ceph_auth_destroy_authorizer(ac, osd->o_auth.authorizer);
kfree(osd); kfree(osd);
} }
...@@ -1048,14 +1049,24 @@ static void put_osd(struct ceph_osd *osd) ...@@ -1048,14 +1049,24 @@ static void put_osd(struct ceph_osd *osd)
*/ */
static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{ {
dout("__remove_osd %p\n", osd); dout("%s %p osd%d\n", __func__, osd, osd->o_osd);
WARN_ON(!list_empty(&osd->o_requests)); WARN_ON(!list_empty(&osd->o_requests));
WARN_ON(!list_empty(&osd->o_linger_requests)); WARN_ON(!list_empty(&osd->o_linger_requests));
rb_erase(&osd->o_node, &osdc->osds);
list_del_init(&osd->o_osd_lru); list_del_init(&osd->o_osd_lru);
rb_erase(&osd->o_node, &osdc->osds);
RB_CLEAR_NODE(&osd->o_node);
}
static void remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
dout("%s %p osd%d\n", __func__, osd, osd->o_osd);
if (!RB_EMPTY_NODE(&osd->o_node)) {
ceph_con_close(&osd->o_con); ceph_con_close(&osd->o_con);
__remove_osd(osdc, osd);
put_osd(osd); put_osd(osd);
}
} }
static void remove_all_osds(struct ceph_osd_client *osdc) static void remove_all_osds(struct ceph_osd_client *osdc)
...@@ -1065,7 +1076,7 @@ static void remove_all_osds(struct ceph_osd_client *osdc) ...@@ -1065,7 +1076,7 @@ static void remove_all_osds(struct ceph_osd_client *osdc)
while (!RB_EMPTY_ROOT(&osdc->osds)) { while (!RB_EMPTY_ROOT(&osdc->osds)) {
struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds), struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
struct ceph_osd, o_node); struct ceph_osd, o_node);
__remove_osd(osdc, osd); remove_osd(osdc, osd);
} }
mutex_unlock(&osdc->request_mutex); mutex_unlock(&osdc->request_mutex);
} }
...@@ -1106,7 +1117,7 @@ static void remove_old_osds(struct ceph_osd_client *osdc) ...@@ -1106,7 +1117,7 @@ static void remove_old_osds(struct ceph_osd_client *osdc)
list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) { list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
if (time_before(jiffies, osd->lru_ttl)) if (time_before(jiffies, osd->lru_ttl))
break; break;
__remove_osd(osdc, osd); remove_osd(osdc, osd);
} }
mutex_unlock(&osdc->request_mutex); mutex_unlock(&osdc->request_mutex);
} }
...@@ -1121,8 +1132,7 @@ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) ...@@ -1121,8 +1132,7 @@ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
dout("__reset_osd %p osd%d\n", osd, osd->o_osd); dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
if (list_empty(&osd->o_requests) && if (list_empty(&osd->o_requests) &&
list_empty(&osd->o_linger_requests)) { list_empty(&osd->o_linger_requests)) {
__remove_osd(osdc, osd); remove_osd(osdc, osd);
return -ENODEV; return -ENODEV;
} }
...@@ -1926,6 +1936,7 @@ static void reset_changed_osds(struct ceph_osd_client *osdc) ...@@ -1926,6 +1936,7 @@ static void reset_changed_osds(struct ceph_osd_client *osdc)
{ {
struct rb_node *p, *n; struct rb_node *p, *n;
dout("%s %p\n", __func__, osdc);
for (p = rb_first(&osdc->osds); p; p = n) { for (p = rb_first(&osdc->osds); p; p = n) {
struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node); struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment