Commit 6d87c225 authored by Linus Torvalds's avatar Linus Torvalds

Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client

Pull Ceph updates from Sage Weil:
 "This has a mix of bug fixes and cleanups.

  Alex's patch fixes a rare race in RBD.  Ilya's patches fix an ENOENT
  check when a second rbd image is mapped and a couple memory leaks.
  Zheng fixes several issues with fragmented directories and multiple
  MDSs.  Josh fixes a spin/sleep issue, and Josh and Guangliang's
  patches fix setting and unsetting RBD images read-only.

  Naturally there are several other cleanups mixed in for good measure"

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (23 commits)
  rbd: only set disk to read-only once
  rbd: move calls that may sleep out of spin lock range
  rbd: add ioctl for rbd
  ceph: use truncate_pagecache() instead of truncate_inode_pages()
  ceph: include time stamp in every MDS request
  rbd: fix ida/idr memory leak
  rbd: use reference counts for image requests
  rbd: fix osd_request memory leak in __rbd_dev_header_watch_sync()
  rbd: make sure we have latest osdmap on 'rbd map'
  libceph: add ceph_monc_wait_osdmap()
  libceph: mon_get_version request infrastructure
  libceph: recognize poolop requests in debugfs
  ceph: refactor readpage_nounlock() to make the logic clearer
  mds: check cap ID when handling cap export message
  ceph: remember subtree root dirfrag's auth MDS
  ceph: introduce ceph_fill_fragtree()
  ceph: handle cap import atomically
  ceph: pre-allocate ceph_cap struct for ceph_add_cap()
  ceph: update inode fields according to issued caps
  rbd: replace IS_ERR and PTR_ERR with PTR_ERR_OR_ZERO
  ...
parents 338c09a9 22001f61
...@@ -541,7 +541,6 @@ static int rbd_open(struct block_device *bdev, fmode_t mode) ...@@ -541,7 +541,6 @@ static int rbd_open(struct block_device *bdev, fmode_t mode)
return -ENOENT; return -ENOENT;
(void) get_device(&rbd_dev->dev); (void) get_device(&rbd_dev->dev);
set_device_ro(bdev, rbd_dev->mapping.read_only);
return 0; return 0;
} }
...@@ -559,10 +558,76 @@ static void rbd_release(struct gendisk *disk, fmode_t mode) ...@@ -559,10 +558,76 @@ static void rbd_release(struct gendisk *disk, fmode_t mode)
put_device(&rbd_dev->dev); put_device(&rbd_dev->dev);
} }
static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
{
int ret = 0;
int val;
bool ro;
bool ro_changed = false;
/* get_user() may sleep, so call it before taking rbd_dev->lock */
if (get_user(val, (int __user *)(arg)))
return -EFAULT;
ro = val ? true : false;
/* Snapshot doesn't allow to write*/
if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
return -EROFS;
spin_lock_irq(&rbd_dev->lock);
/* prevent others open this device */
if (rbd_dev->open_count > 1) {
ret = -EBUSY;
goto out;
}
if (rbd_dev->mapping.read_only != ro) {
rbd_dev->mapping.read_only = ro;
ro_changed = true;
}
out:
spin_unlock_irq(&rbd_dev->lock);
/* set_disk_ro() may sleep, so call it after releasing rbd_dev->lock */
if (ret == 0 && ro_changed)
set_disk_ro(rbd_dev->disk, ro ? 1 : 0);
return ret;
}
static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
unsigned int cmd, unsigned long arg)
{
struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
int ret = 0;
switch (cmd) {
case BLKROSET:
ret = rbd_ioctl_set_ro(rbd_dev, arg);
break;
default:
ret = -ENOTTY;
}
return ret;
}
#ifdef CONFIG_COMPAT
static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
unsigned int cmd, unsigned long arg)
{
return rbd_ioctl(bdev, mode, cmd, arg);
}
#endif /* CONFIG_COMPAT */
static const struct block_device_operations rbd_bd_ops = { static const struct block_device_operations rbd_bd_ops = {
.owner = THIS_MODULE, .owner = THIS_MODULE,
.open = rbd_open, .open = rbd_open,
.release = rbd_release, .release = rbd_release,
.ioctl = rbd_ioctl,
#ifdef CONFIG_COMPAT
.compat_ioctl = rbd_compat_ioctl,
#endif
}; };
/* /*
...@@ -1382,6 +1447,13 @@ static void rbd_obj_request_put(struct rbd_obj_request *obj_request) ...@@ -1382,6 +1447,13 @@ static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
kref_put(&obj_request->kref, rbd_obj_request_destroy); kref_put(&obj_request->kref, rbd_obj_request_destroy);
} }
static void rbd_img_request_get(struct rbd_img_request *img_request)
{
dout("%s: img %p (was %d)\n", __func__, img_request,
atomic_read(&img_request->kref.refcount));
kref_get(&img_request->kref);
}
static bool img_request_child_test(struct rbd_img_request *img_request); static bool img_request_child_test(struct rbd_img_request *img_request);
static void rbd_parent_request_destroy(struct kref *kref); static void rbd_parent_request_destroy(struct kref *kref);
static void rbd_img_request_destroy(struct kref *kref); static void rbd_img_request_destroy(struct kref *kref);
...@@ -2142,6 +2214,7 @@ static void rbd_img_obj_callback(struct rbd_obj_request *obj_request) ...@@ -2142,6 +2214,7 @@ static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
img_request->next_completion = which; img_request->next_completion = which;
out: out:
spin_unlock_irq(&img_request->completion_lock); spin_unlock_irq(&img_request->completion_lock);
rbd_img_request_put(img_request);
if (!more) if (!more)
rbd_img_request_complete(img_request); rbd_img_request_complete(img_request);
...@@ -2242,6 +2315,7 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request, ...@@ -2242,6 +2315,7 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
goto out_unwind; goto out_unwind;
obj_request->osd_req = osd_req; obj_request->osd_req = osd_req;
obj_request->callback = rbd_img_obj_callback; obj_request->callback = rbd_img_obj_callback;
rbd_img_request_get(img_request);
if (write_request) { if (write_request) {
osd_req_op_alloc_hint_init(osd_req, which, osd_req_op_alloc_hint_init(osd_req, which,
...@@ -2872,56 +2946,55 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data) ...@@ -2872,56 +2946,55 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
} }
/* /*
* Request sync osd watch/unwatch. The value of "start" determines * Initiate a watch request, synchronously.
* whether a watch request is being initiated or torn down.
*/ */
static int __rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start) static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
{ {
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
struct rbd_obj_request *obj_request; struct rbd_obj_request *obj_request;
int ret; int ret;
rbd_assert(start ^ !!rbd_dev->watch_event); rbd_assert(!rbd_dev->watch_event);
rbd_assert(start ^ !!rbd_dev->watch_request); rbd_assert(!rbd_dev->watch_request);
if (start) { ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev, &rbd_dev->watch_event);
&rbd_dev->watch_event); if (ret < 0)
if (ret < 0) return ret;
return ret;
rbd_assert(rbd_dev->watch_event != NULL); rbd_assert(rbd_dev->watch_event);
}
ret = -ENOMEM;
obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0, obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
OBJ_REQUEST_NODATA); OBJ_REQUEST_NODATA);
if (!obj_request) if (!obj_request) {
ret = -ENOMEM;
goto out_cancel; goto out_cancel;
}
obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, 1, obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, 1,
obj_request); obj_request);
if (!obj_request->osd_req) if (!obj_request->osd_req) {
goto out_cancel; ret = -ENOMEM;
goto out_put;
}
if (start) ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
else
ceph_osdc_unregister_linger_request(osdc,
rbd_dev->watch_request->osd_req);
osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH, osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
rbd_dev->watch_event->cookie, 0, start ? 1 : 0); rbd_dev->watch_event->cookie, 0, 1);
rbd_osd_req_format_write(obj_request); rbd_osd_req_format_write(obj_request);
ret = rbd_obj_request_submit(osdc, obj_request); ret = rbd_obj_request_submit(osdc, obj_request);
if (ret) if (ret)
goto out_cancel; goto out_linger;
ret = rbd_obj_request_wait(obj_request); ret = rbd_obj_request_wait(obj_request);
if (ret) if (ret)
goto out_cancel; goto out_linger;
ret = obj_request->result; ret = obj_request->result;
if (ret) if (ret)
goto out_cancel; goto out_linger;
/* /*
* A watch request is set to linger, so the underlying osd * A watch request is set to linger, so the underlying osd
...@@ -2931,36 +3004,84 @@ static int __rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start) ...@@ -2931,36 +3004,84 @@ static int __rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start)
* it. We'll drop that reference (below) after we've * it. We'll drop that reference (below) after we've
* unregistered it. * unregistered it.
*/ */
if (start) { rbd_dev->watch_request = obj_request;
rbd_dev->watch_request = obj_request;
return 0; return 0;
out_linger:
ceph_osdc_unregister_linger_request(osdc, obj_request->osd_req);
out_put:
rbd_obj_request_put(obj_request);
out_cancel:
ceph_osdc_cancel_event(rbd_dev->watch_event);
rbd_dev->watch_event = NULL;
return ret;
}
/*
* Tear down a watch request, synchronously.
*/
static int __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
{
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
struct rbd_obj_request *obj_request;
int ret;
rbd_assert(rbd_dev->watch_event);
rbd_assert(rbd_dev->watch_request);
obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
OBJ_REQUEST_NODATA);
if (!obj_request) {
ret = -ENOMEM;
goto out_cancel;
}
obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, 1,
obj_request);
if (!obj_request->osd_req) {
ret = -ENOMEM;
goto out_put;
} }
osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
rbd_dev->watch_event->cookie, 0, 0);
rbd_osd_req_format_write(obj_request);
ret = rbd_obj_request_submit(osdc, obj_request);
if (ret)
goto out_put;
ret = rbd_obj_request_wait(obj_request);
if (ret)
goto out_put;
ret = obj_request->result;
if (ret)
goto out_put;
/* We have successfully torn down the watch request */ /* We have successfully torn down the watch request */
ceph_osdc_unregister_linger_request(osdc,
rbd_dev->watch_request->osd_req);
rbd_obj_request_put(rbd_dev->watch_request); rbd_obj_request_put(rbd_dev->watch_request);
rbd_dev->watch_request = NULL; rbd_dev->watch_request = NULL;
out_put:
rbd_obj_request_put(obj_request);
out_cancel: out_cancel:
/* Cancel the event if we're tearing down, or on error */
ceph_osdc_cancel_event(rbd_dev->watch_event); ceph_osdc_cancel_event(rbd_dev->watch_event);
rbd_dev->watch_event = NULL; rbd_dev->watch_event = NULL;
if (obj_request)
rbd_obj_request_put(obj_request);
return ret; return ret;
} }
static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
{
return __rbd_dev_header_watch_sync(rbd_dev, true);
}
static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev) static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
{ {
int ret; int ret;
ret = __rbd_dev_header_watch_sync(rbd_dev, false); ret = __rbd_dev_header_unwatch_sync(rbd_dev);
if (ret) { if (ret) {
rbd_warn(rbd_dev, "unable to tear down watch request: %d\n", rbd_warn(rbd_dev, "unable to tear down watch request: %d\n",
ret); ret);
...@@ -3058,7 +3179,6 @@ static void rbd_request_fn(struct request_queue *q) ...@@ -3058,7 +3179,6 @@ static void rbd_request_fn(struct request_queue *q)
__releases(q->queue_lock) __acquires(q->queue_lock) __releases(q->queue_lock) __acquires(q->queue_lock)
{ {
struct rbd_device *rbd_dev = q->queuedata; struct rbd_device *rbd_dev = q->queuedata;
bool read_only = rbd_dev->mapping.read_only;
struct request *rq; struct request *rq;
int result; int result;
...@@ -3094,7 +3214,7 @@ static void rbd_request_fn(struct request_queue *q) ...@@ -3094,7 +3214,7 @@ static void rbd_request_fn(struct request_queue *q)
if (write_request) { if (write_request) {
result = -EROFS; result = -EROFS;
if (read_only) if (rbd_dev->mapping.read_only)
goto end_request; goto end_request;
rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP); rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
} }
...@@ -4682,6 +4802,38 @@ static int rbd_add_parse_args(const char *buf, ...@@ -4682,6 +4802,38 @@ static int rbd_add_parse_args(const char *buf,
return ret; return ret;
} }
/*
* Return pool id (>= 0) or a negative error code.
*/
static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name)
{
u64 newest_epoch;
unsigned long timeout = rbdc->client->options->mount_timeout * HZ;
int tries = 0;
int ret;
again:
ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name);
if (ret == -ENOENT && tries++ < 1) {
ret = ceph_monc_do_get_version(&rbdc->client->monc, "osdmap",
&newest_epoch);
if (ret < 0)
return ret;
if (rbdc->client->osdc.osdmap->epoch < newest_epoch) {
ceph_monc_request_next_osdmap(&rbdc->client->monc);
(void) ceph_monc_wait_osdmap(&rbdc->client->monc,
newest_epoch, timeout);
goto again;
} else {
/* the osdmap we have is new enough */
return -ENOENT;
}
}
return ret;
}
/* /*
* An rbd format 2 image has a unique identifier, distinct from the * An rbd format 2 image has a unique identifier, distinct from the
* name given to it by the user. Internally, that identifier is * name given to it by the user. Internally, that identifier is
...@@ -4752,7 +4904,7 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev) ...@@ -4752,7 +4904,7 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
image_id = ceph_extract_encoded_string(&p, p + ret, image_id = ceph_extract_encoded_string(&p, p + ret,
NULL, GFP_NOIO); NULL, GFP_NOIO);
ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0; ret = PTR_ERR_OR_ZERO(image_id);
if (!ret) if (!ret)
rbd_dev->image_format = 2; rbd_dev->image_format = 2;
} else { } else {
...@@ -4907,6 +5059,7 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev) ...@@ -4907,6 +5059,7 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
if (ret) if (ret)
goto err_out_disk; goto err_out_disk;
set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE); set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
ret = rbd_bus_add_dev(rbd_dev); ret = rbd_bus_add_dev(rbd_dev);
if (ret) if (ret)
...@@ -5053,7 +5206,6 @@ static ssize_t do_rbd_add(struct bus_type *bus, ...@@ -5053,7 +5206,6 @@ static ssize_t do_rbd_add(struct bus_type *bus,
struct rbd_options *rbd_opts = NULL; struct rbd_options *rbd_opts = NULL;
struct rbd_spec *spec = NULL; struct rbd_spec *spec = NULL;
struct rbd_client *rbdc; struct rbd_client *rbdc;
struct ceph_osd_client *osdc;
bool read_only; bool read_only;
int rc = -ENOMEM; int rc = -ENOMEM;
...@@ -5075,8 +5227,7 @@ static ssize_t do_rbd_add(struct bus_type *bus, ...@@ -5075,8 +5227,7 @@ static ssize_t do_rbd_add(struct bus_type *bus,
} }
/* pick the pool */ /* pick the pool */
osdc = &rbdc->client->osdc; rc = rbd_add_get_pool_id(rbdc, spec->pool_name);
rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
if (rc < 0) if (rc < 0)
goto err_out_client; goto err_out_client;
spec->pool_id = (u64)rc; spec->pool_id = (u64)rc;
...@@ -5387,6 +5538,7 @@ static int __init rbd_init(void) ...@@ -5387,6 +5538,7 @@ static int __init rbd_init(void)
static void __exit rbd_exit(void) static void __exit rbd_exit(void)
{ {
ida_destroy(&rbd_dev_id_ida);
rbd_sysfs_cleanup(); rbd_sysfs_cleanup();
if (single_major) if (single_major)
unregister_blkdev(rbd_major, RBD_DRV_NAME); unregister_blkdev(rbd_major, RBD_DRV_NAME);
......
...@@ -104,12 +104,6 @@ int ceph_set_acl(struct inode *inode, struct posix_acl *acl, int type) ...@@ -104,12 +104,6 @@ int ceph_set_acl(struct inode *inode, struct posix_acl *acl, int type)
umode_t new_mode = inode->i_mode, old_mode = inode->i_mode; umode_t new_mode = inode->i_mode, old_mode = inode->i_mode;
struct dentry *dentry; struct dentry *dentry;
if (acl) {
ret = posix_acl_valid(acl);
if (ret < 0)
goto out;
}
switch (type) { switch (type) {
case ACL_TYPE_ACCESS: case ACL_TYPE_ACCESS:
name = POSIX_ACL_XATTR_ACCESS; name = POSIX_ACL_XATTR_ACCESS;
......
...@@ -211,18 +211,15 @@ static int readpage_nounlock(struct file *filp, struct page *page) ...@@ -211,18 +211,15 @@ static int readpage_nounlock(struct file *filp, struct page *page)
SetPageError(page); SetPageError(page);
ceph_fscache_readpage_cancel(inode, page); ceph_fscache_readpage_cancel(inode, page);
goto out; goto out;
} else {
if (err < PAGE_CACHE_SIZE) {
/* zero fill remainder of page */
zero_user_segment(page, err, PAGE_CACHE_SIZE);
} else {
flush_dcache_page(page);
}
} }
SetPageUptodate(page); if (err < PAGE_CACHE_SIZE)
/* zero fill remainder of page */
zero_user_segment(page, err, PAGE_CACHE_SIZE);
else
flush_dcache_page(page);
if (err >= 0) SetPageUptodate(page);
ceph_readpage_to_fscache(inode, page); ceph_readpage_to_fscache(inode, page);
out: out:
return err < 0 ? err : 0; return err < 0 ? err : 0;
......
...@@ -221,8 +221,8 @@ int ceph_unreserve_caps(struct ceph_mds_client *mdsc, ...@@ -221,8 +221,8 @@ int ceph_unreserve_caps(struct ceph_mds_client *mdsc,
return 0; return 0;
} }
static struct ceph_cap *get_cap(struct ceph_mds_client *mdsc, struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc,
struct ceph_cap_reservation *ctx) struct ceph_cap_reservation *ctx)
{ {
struct ceph_cap *cap = NULL; struct ceph_cap *cap = NULL;
...@@ -508,15 +508,14 @@ static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap, ...@@ -508,15 +508,14 @@ static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap,
* it is < 0. (This is so we can atomically add the cap and add an * it is < 0. (This is so we can atomically add the cap and add an
* open file reference to it.) * open file reference to it.)
*/ */
int ceph_add_cap(struct inode *inode, void ceph_add_cap(struct inode *inode,
struct ceph_mds_session *session, u64 cap_id, struct ceph_mds_session *session, u64 cap_id,
int fmode, unsigned issued, unsigned wanted, int fmode, unsigned issued, unsigned wanted,
unsigned seq, unsigned mseq, u64 realmino, int flags, unsigned seq, unsigned mseq, u64 realmino, int flags,
struct ceph_cap_reservation *caps_reservation) struct ceph_cap **new_cap)
{ {
struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_cap *new_cap = NULL;
struct ceph_cap *cap; struct ceph_cap *cap;
int mds = session->s_mds; int mds = session->s_mds;
int actual_wanted; int actual_wanted;
...@@ -531,20 +530,10 @@ int ceph_add_cap(struct inode *inode, ...@@ -531,20 +530,10 @@ int ceph_add_cap(struct inode *inode,
if (fmode >= 0) if (fmode >= 0)
wanted |= ceph_caps_for_mode(fmode); wanted |= ceph_caps_for_mode(fmode);
retry:
spin_lock(&ci->i_ceph_lock);
cap = __get_cap_for_mds(ci, mds); cap = __get_cap_for_mds(ci, mds);
if (!cap) { if (!cap) {
if (new_cap) { cap = *new_cap;
cap = new_cap; *new_cap = NULL;
new_cap = NULL;
} else {
spin_unlock(&ci->i_ceph_lock);
new_cap = get_cap(mdsc, caps_reservation);
if (new_cap == NULL)
return -ENOMEM;
goto retry;
}
cap->issued = 0; cap->issued = 0;
cap->implemented = 0; cap->implemented = 0;
...@@ -562,9 +551,6 @@ int ceph_add_cap(struct inode *inode, ...@@ -562,9 +551,6 @@ int ceph_add_cap(struct inode *inode,
session->s_nr_caps++; session->s_nr_caps++;
spin_unlock(&session->s_cap_lock); spin_unlock(&session->s_cap_lock);
} else { } else {
if (new_cap)
ceph_put_cap(mdsc, new_cap);
/* /*
* auth mds of the inode changed. we received the cap export * auth mds of the inode changed. we received the cap export
* message, but still haven't received the cap import message. * message, but still haven't received the cap import message.
...@@ -626,7 +612,6 @@ int ceph_add_cap(struct inode *inode, ...@@ -626,7 +612,6 @@ int ceph_add_cap(struct inode *inode,
ci->i_auth_cap = cap; ci->i_auth_cap = cap;
cap->mds_wanted = wanted; cap->mds_wanted = wanted;
} }
ci->i_cap_exporting_issued = 0;
} else { } else {
WARN_ON(ci->i_auth_cap == cap); WARN_ON(ci->i_auth_cap == cap);
} }
...@@ -648,9 +633,6 @@ int ceph_add_cap(struct inode *inode, ...@@ -648,9 +633,6 @@ int ceph_add_cap(struct inode *inode,
if (fmode >= 0) if (fmode >= 0)
__ceph_get_fmode(ci, fmode); __ceph_get_fmode(ci, fmode);
spin_unlock(&ci->i_ceph_lock);
wake_up_all(&ci->i_cap_wq);
return 0;
} }
/* /*
...@@ -685,7 +667,7 @@ static int __cap_is_valid(struct ceph_cap *cap) ...@@ -685,7 +667,7 @@ static int __cap_is_valid(struct ceph_cap *cap)
*/ */
int __ceph_caps_issued(struct ceph_inode_info *ci, int *implemented) int __ceph_caps_issued(struct ceph_inode_info *ci, int *implemented)
{ {
int have = ci->i_snap_caps | ci->i_cap_exporting_issued; int have = ci->i_snap_caps;
struct ceph_cap *cap; struct ceph_cap *cap;
struct rb_node *p; struct rb_node *p;
...@@ -900,7 +882,7 @@ int __ceph_caps_mds_wanted(struct ceph_inode_info *ci) ...@@ -900,7 +882,7 @@ int __ceph_caps_mds_wanted(struct ceph_inode_info *ci)
*/ */
static int __ceph_is_any_caps(struct ceph_inode_info *ci) static int __ceph_is_any_caps(struct ceph_inode_info *ci)
{ {
return !RB_EMPTY_ROOT(&ci->i_caps) || ci->i_cap_exporting_issued; return !RB_EMPTY_ROOT(&ci->i_caps);
} }
int ceph_is_any_caps(struct inode *inode) int ceph_is_any_caps(struct inode *inode)
...@@ -2397,32 +2379,30 @@ static void invalidate_aliases(struct inode *inode) ...@@ -2397,32 +2379,30 @@ static void invalidate_aliases(struct inode *inode)
* actually be a revocation if it specifies a smaller cap set.) * actually be a revocation if it specifies a smaller cap set.)
* *
* caller holds s_mutex and i_ceph_lock, we drop both. * caller holds s_mutex and i_ceph_lock, we drop both.
*
* return value:
* 0 - ok
* 1 - check_caps on auth cap only (writeback)
* 2 - check_caps (ack revoke)
*/ */
static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, static void handle_cap_grant(struct ceph_mds_client *mdsc,
struct inode *inode, struct ceph_mds_caps *grant,
void *snaptrace, int snaptrace_len,
struct ceph_buffer *xattr_buf,
struct ceph_mds_session *session, struct ceph_mds_session *session,
struct ceph_cap *cap, struct ceph_cap *cap, int issued)
struct ceph_buffer *xattr_buf) __releases(ci->i_ceph_lock)
__releases(ci->i_ceph_lock)
{ {
struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_inode_info *ci = ceph_inode(inode);
int mds = session->s_mds; int mds = session->s_mds;
int seq = le32_to_cpu(grant->seq); int seq = le32_to_cpu(grant->seq);
int newcaps = le32_to_cpu(grant->caps); int newcaps = le32_to_cpu(grant->caps);
int issued, implemented, used, wanted, dirty; int used, wanted, dirty;
u64 size = le64_to_cpu(grant->size); u64 size = le64_to_cpu(grant->size);
u64 max_size = le64_to_cpu(grant->max_size); u64 max_size = le64_to_cpu(grant->max_size);
struct timespec mtime, atime, ctime; struct timespec mtime, atime, ctime;
int check_caps = 0; int check_caps = 0;
int wake = 0; bool wake = 0;
int writeback = 0; bool writeback = 0;
int queue_invalidate = 0; bool queue_trunc = 0;
int deleted_inode = 0; bool queue_invalidate = 0;
int queue_revalidate = 0; bool queue_revalidate = 0;
bool deleted_inode = 0;
dout("handle_cap_grant inode %p cap %p mds%d seq %d %s\n", dout("handle_cap_grant inode %p cap %p mds%d seq %d %s\n",
inode, cap, mds, seq, ceph_cap_string(newcaps)); inode, cap, mds, seq, ceph_cap_string(newcaps));
...@@ -2466,16 +2446,13 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, ...@@ -2466,16 +2446,13 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
} }
/* side effects now are allowed */ /* side effects now are allowed */
issued = __ceph_caps_issued(ci, &implemented);
issued |= implemented | __ceph_caps_dirty(ci);
cap->cap_gen = session->s_cap_gen; cap->cap_gen = session->s_cap_gen;
cap->seq = seq; cap->seq = seq;
__check_cap_issue(ci, cap, newcaps); __check_cap_issue(ci, cap, newcaps);
if ((issued & CEPH_CAP_AUTH_EXCL) == 0) { if ((newcaps & CEPH_CAP_AUTH_SHARED) &&
(issued & CEPH_CAP_AUTH_EXCL) == 0) {
inode->i_mode = le32_to_cpu(grant->mode); inode->i_mode = le32_to_cpu(grant->mode);
inode->i_uid = make_kuid(&init_user_ns, le32_to_cpu(grant->uid)); inode->i_uid = make_kuid(&init_user_ns, le32_to_cpu(grant->uid));
inode->i_gid = make_kgid(&init_user_ns, le32_to_cpu(grant->gid)); inode->i_gid = make_kgid(&init_user_ns, le32_to_cpu(grant->gid));
...@@ -2484,7 +2461,8 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, ...@@ -2484,7 +2461,8 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
from_kgid(&init_user_ns, inode->i_gid)); from_kgid(&init_user_ns, inode->i_gid));
} }
if ((issued & CEPH_CAP_LINK_EXCL) == 0) { if ((newcaps & CEPH_CAP_AUTH_SHARED) &&
(issued & CEPH_CAP_LINK_EXCL) == 0) {
set_nlink(inode, le32_to_cpu(grant->nlink)); set_nlink(inode, le32_to_cpu(grant->nlink));
if (inode->i_nlink == 0 && if (inode->i_nlink == 0 &&
(newcaps & (CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL))) (newcaps & (CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL)))
...@@ -2511,30 +2489,35 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, ...@@ -2511,30 +2489,35 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
if ((issued & CEPH_CAP_FILE_CACHE) && ci->i_rdcache_gen > 1) if ((issued & CEPH_CAP_FILE_CACHE) && ci->i_rdcache_gen > 1)
queue_revalidate = 1; queue_revalidate = 1;
/* size/ctime/mtime/atime? */ if (newcaps & CEPH_CAP_ANY_RD) {
ceph_fill_file_size(inode, issued, /* ctime/mtime/atime? */
le32_to_cpu(grant->truncate_seq), ceph_decode_timespec(&mtime, &grant->mtime);
le64_to_cpu(grant->truncate_size), size); ceph_decode_timespec(&atime, &grant->atime);
ceph_decode_timespec(&mtime, &grant->mtime); ceph_decode_timespec(&ctime, &grant->ctime);
ceph_decode_timespec(&atime, &grant->atime); ceph_fill_file_time(inode, issued,
ceph_decode_timespec(&ctime, &grant->ctime); le32_to_cpu(grant->time_warp_seq),
ceph_fill_file_time(inode, issued, &ctime, &mtime, &atime);
le32_to_cpu(grant->time_warp_seq), &ctime, &mtime, }
&atime);
if (newcaps & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR)) {
/* file layout may have changed */
/* file layout may have changed */ ci->i_layout = grant->layout;
ci->i_layout = grant->layout; /* size/truncate_seq? */
queue_trunc = ceph_fill_file_size(inode, issued,
/* max size increase? */ le32_to_cpu(grant->truncate_seq),
if (ci->i_auth_cap == cap && max_size != ci->i_max_size) { le64_to_cpu(grant->truncate_size),
dout("max_size %lld -> %llu\n", ci->i_max_size, max_size); size);
ci->i_max_size = max_size; /* max size increase? */
if (max_size >= ci->i_wanted_max_size) { if (ci->i_auth_cap == cap && max_size != ci->i_max_size) {
ci->i_wanted_max_size = 0; /* reset */ dout("max_size %lld -> %llu\n",
ci->i_requested_max_size = 0; ci->i_max_size, max_size);
ci->i_max_size = max_size;
if (max_size >= ci->i_wanted_max_size) {
ci->i_wanted_max_size = 0; /* reset */
ci->i_requested_max_size = 0;
}
wake = 1;
} }
wake = 1;
} }
/* check cap bits */ /* check cap bits */
...@@ -2595,6 +2578,23 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, ...@@ -2595,6 +2578,23 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) {
down_write(&mdsc->snap_rwsem);
ceph_update_snap_trace(mdsc, snaptrace,
snaptrace + snaptrace_len, false);
downgrade_write(&mdsc->snap_rwsem);
kick_flushing_inode_caps(mdsc, session, inode);
up_read(&mdsc->snap_rwsem);
if (newcaps & ~issued)
wake = 1;
}
if (queue_trunc) {
ceph_queue_vmtruncate(inode);
ceph_queue_revalidate(inode);
} else if (queue_revalidate)
ceph_queue_revalidate(inode);
if (writeback) if (writeback)
/* /*
* queue inode for writeback: we can't actually call * queue inode for writeback: we can't actually call
...@@ -2606,8 +2606,6 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, ...@@ -2606,8 +2606,6 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
ceph_queue_invalidate(inode); ceph_queue_invalidate(inode);
if (deleted_inode) if (deleted_inode)
invalidate_aliases(inode); invalidate_aliases(inode);
if (queue_revalidate)
ceph_queue_revalidate(inode);
if (wake) if (wake)
wake_up_all(&ci->i_cap_wq); wake_up_all(&ci->i_cap_wq);
...@@ -2784,7 +2782,7 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex, ...@@ -2784,7 +2782,7 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
{ {
struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
struct ceph_mds_session *tsession = NULL; struct ceph_mds_session *tsession = NULL;
struct ceph_cap *cap, *tcap; struct ceph_cap *cap, *tcap, *new_cap = NULL;
struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_inode_info *ci = ceph_inode(inode);
u64 t_cap_id; u64 t_cap_id;
unsigned mseq = le32_to_cpu(ex->migrate_seq); unsigned mseq = le32_to_cpu(ex->migrate_seq);
...@@ -2807,7 +2805,7 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex, ...@@ -2807,7 +2805,7 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
retry: retry:
spin_lock(&ci->i_ceph_lock); spin_lock(&ci->i_ceph_lock);
cap = __get_cap_for_mds(ci, mds); cap = __get_cap_for_mds(ci, mds);
if (!cap) if (!cap || cap->cap_id != le64_to_cpu(ex->cap_id))
goto out_unlock; goto out_unlock;
if (target < 0) { if (target < 0) {
...@@ -2846,15 +2844,14 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex, ...@@ -2846,15 +2844,14 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
} }
__ceph_remove_cap(cap, false); __ceph_remove_cap(cap, false);
goto out_unlock; goto out_unlock;
} } else if (tsession) {
if (tsession) {
int flag = (cap == ci->i_auth_cap) ? CEPH_CAP_FLAG_AUTH : 0;
spin_unlock(&ci->i_ceph_lock);
/* add placeholder for the export tagert */ /* add placeholder for the export tagert */
int flag = (cap == ci->i_auth_cap) ? CEPH_CAP_FLAG_AUTH : 0;
ceph_add_cap(inode, tsession, t_cap_id, -1, issued, 0, ceph_add_cap(inode, tsession, t_cap_id, -1, issued, 0,
t_seq - 1, t_mseq, (u64)-1, flag, NULL); t_seq - 1, t_mseq, (u64)-1, flag, &new_cap);
goto retry;
__ceph_remove_cap(cap, false);
goto out_unlock;
} }
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
...@@ -2873,6 +2870,7 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex, ...@@ -2873,6 +2870,7 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
SINGLE_DEPTH_NESTING); SINGLE_DEPTH_NESTING);
} }
ceph_add_cap_releases(mdsc, tsession); ceph_add_cap_releases(mdsc, tsession);
new_cap = ceph_get_cap(mdsc, NULL);
} else { } else {
WARN_ON(1); WARN_ON(1);
tsession = NULL; tsession = NULL;
...@@ -2887,24 +2885,27 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex, ...@@ -2887,24 +2885,27 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
mutex_unlock(&tsession->s_mutex); mutex_unlock(&tsession->s_mutex);
ceph_put_mds_session(tsession); ceph_put_mds_session(tsession);
} }
if (new_cap)
ceph_put_cap(mdsc, new_cap);
} }
/* /*
* Handle cap IMPORT. If there are temp bits from an older EXPORT, * Handle cap IMPORT.
* clean them up.
* *
* caller holds s_mutex. * caller holds s_mutex. acquires i_ceph_lock
*/ */
static void handle_cap_import(struct ceph_mds_client *mdsc, static void handle_cap_import(struct ceph_mds_client *mdsc,
struct inode *inode, struct ceph_mds_caps *im, struct inode *inode, struct ceph_mds_caps *im,
struct ceph_mds_cap_peer *ph, struct ceph_mds_cap_peer *ph,
struct ceph_mds_session *session, struct ceph_mds_session *session,
void *snaptrace, int snaptrace_len) struct ceph_cap **target_cap, int *old_issued)
__acquires(ci->i_ceph_lock)
{ {
struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_cap *cap; struct ceph_cap *cap, *ocap, *new_cap = NULL;
int mds = session->s_mds; int mds = session->s_mds;
unsigned issued = le32_to_cpu(im->caps); int issued;
unsigned caps = le32_to_cpu(im->caps);
unsigned wanted = le32_to_cpu(im->wanted); unsigned wanted = le32_to_cpu(im->wanted);
unsigned seq = le32_to_cpu(im->seq); unsigned seq = le32_to_cpu(im->seq);
unsigned mseq = le32_to_cpu(im->migrate_seq); unsigned mseq = le32_to_cpu(im->migrate_seq);
...@@ -2924,40 +2925,52 @@ static void handle_cap_import(struct ceph_mds_client *mdsc, ...@@ -2924,40 +2925,52 @@ static void handle_cap_import(struct ceph_mds_client *mdsc,
dout("handle_cap_import inode %p ci %p mds%d mseq %d peer %d\n", dout("handle_cap_import inode %p ci %p mds%d mseq %d peer %d\n",
inode, ci, mds, mseq, peer); inode, ci, mds, mseq, peer);
retry:
spin_lock(&ci->i_ceph_lock); spin_lock(&ci->i_ceph_lock);
cap = peer >= 0 ? __get_cap_for_mds(ci, peer) : NULL; cap = __get_cap_for_mds(ci, mds);
if (cap && cap->cap_id == p_cap_id) { if (!cap) {
if (!new_cap) {
spin_unlock(&ci->i_ceph_lock);
new_cap = ceph_get_cap(mdsc, NULL);
goto retry;
}
cap = new_cap;
} else {
if (new_cap) {
ceph_put_cap(mdsc, new_cap);
new_cap = NULL;
}
}
__ceph_caps_issued(ci, &issued);
issued |= __ceph_caps_dirty(ci);
ceph_add_cap(inode, session, cap_id, -1, caps, wanted, seq, mseq,
realmino, CEPH_CAP_FLAG_AUTH, &new_cap);
ocap = peer >= 0 ? __get_cap_for_mds(ci, peer) : NULL;
if (ocap && ocap->cap_id == p_cap_id) {
dout(" remove export cap %p mds%d flags %d\n", dout(" remove export cap %p mds%d flags %d\n",
cap, peer, ph->flags); ocap, peer, ph->flags);
if ((ph->flags & CEPH_CAP_FLAG_AUTH) && if ((ph->flags & CEPH_CAP_FLAG_AUTH) &&
(cap->seq != le32_to_cpu(ph->seq) || (ocap->seq != le32_to_cpu(ph->seq) ||
cap->mseq != le32_to_cpu(ph->mseq))) { ocap->mseq != le32_to_cpu(ph->mseq))) {
pr_err("handle_cap_import: mismatched seq/mseq: " pr_err("handle_cap_import: mismatched seq/mseq: "
"ino (%llx.%llx) mds%d seq %d mseq %d " "ino (%llx.%llx) mds%d seq %d mseq %d "
"importer mds%d has peer seq %d mseq %d\n", "importer mds%d has peer seq %d mseq %d\n",
ceph_vinop(inode), peer, cap->seq, ceph_vinop(inode), peer, ocap->seq,
cap->mseq, mds, le32_to_cpu(ph->seq), ocap->mseq, mds, le32_to_cpu(ph->seq),
le32_to_cpu(ph->mseq)); le32_to_cpu(ph->mseq));
} }
ci->i_cap_exporting_issued = cap->issued; __ceph_remove_cap(ocap, (ph->flags & CEPH_CAP_FLAG_RELEASE));
__ceph_remove_cap(cap, (ph->flags & CEPH_CAP_FLAG_RELEASE));
} }
/* make sure we re-request max_size, if necessary */ /* make sure we re-request max_size, if necessary */
ci->i_wanted_max_size = 0; ci->i_wanted_max_size = 0;
ci->i_requested_max_size = 0; ci->i_requested_max_size = 0;
spin_unlock(&ci->i_ceph_lock);
down_write(&mdsc->snap_rwsem);
ceph_update_snap_trace(mdsc, snaptrace, snaptrace+snaptrace_len,
false);
downgrade_write(&mdsc->snap_rwsem);
ceph_add_cap(inode, session, cap_id, -1,
issued, wanted, seq, mseq, realmino, CEPH_CAP_FLAG_AUTH,
NULL /* no caps context */);
kick_flushing_inode_caps(mdsc, session, inode);
up_read(&mdsc->snap_rwsem);
*old_issued = issued;
*target_cap = cap;
} }
/* /*
...@@ -2977,7 +2990,7 @@ void ceph_handle_caps(struct ceph_mds_session *session, ...@@ -2977,7 +2990,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
struct ceph_mds_caps *h; struct ceph_mds_caps *h;
struct ceph_mds_cap_peer *peer = NULL; struct ceph_mds_cap_peer *peer = NULL;
int mds = session->s_mds; int mds = session->s_mds;
int op; int op, issued;
u32 seq, mseq; u32 seq, mseq;
struct ceph_vino vino; struct ceph_vino vino;
u64 cap_id; u64 cap_id;
...@@ -3069,7 +3082,10 @@ void ceph_handle_caps(struct ceph_mds_session *session, ...@@ -3069,7 +3082,10 @@ void ceph_handle_caps(struct ceph_mds_session *session,
case CEPH_CAP_OP_IMPORT: case CEPH_CAP_OP_IMPORT:
handle_cap_import(mdsc, inode, h, peer, session, handle_cap_import(mdsc, inode, h, peer, session,
snaptrace, snaptrace_len); &cap, &issued);
handle_cap_grant(mdsc, inode, h, snaptrace, snaptrace_len,
msg->middle, session, cap, issued);
goto done_unlocked;
} }
/* the rest require a cap */ /* the rest require a cap */
...@@ -3086,8 +3102,10 @@ void ceph_handle_caps(struct ceph_mds_session *session, ...@@ -3086,8 +3102,10 @@ void ceph_handle_caps(struct ceph_mds_session *session,
switch (op) { switch (op) {
case CEPH_CAP_OP_REVOKE: case CEPH_CAP_OP_REVOKE:
case CEPH_CAP_OP_GRANT: case CEPH_CAP_OP_GRANT:
case CEPH_CAP_OP_IMPORT: __ceph_caps_issued(ci, &issued);
handle_cap_grant(inode, h, session, cap, msg->middle); issued |= __ceph_caps_dirty(ci);
handle_cap_grant(mdsc, inode, h, NULL, 0, msg->middle,
session, cap, issued);
goto done_unlocked; goto done_unlocked;
case CEPH_CAP_OP_FLUSH_ACK: case CEPH_CAP_OP_FLUSH_ACK:
......
...@@ -169,7 +169,7 @@ static struct dentry *__get_parent(struct super_block *sb, ...@@ -169,7 +169,7 @@ static struct dentry *__get_parent(struct super_block *sb,
return dentry; return dentry;
} }
struct dentry *ceph_get_parent(struct dentry *child) static struct dentry *ceph_get_parent(struct dentry *child)
{ {
/* don't re-export snaps */ /* don't re-export snaps */
if (ceph_snap(child->d_inode) != CEPH_NOSNAP) if (ceph_snap(child->d_inode) != CEPH_NOSNAP)
......
...@@ -10,6 +10,7 @@ ...@@ -10,6 +10,7 @@
#include <linux/writeback.h> #include <linux/writeback.h>
#include <linux/vmalloc.h> #include <linux/vmalloc.h>
#include <linux/posix_acl.h> #include <linux/posix_acl.h>
#include <linux/random.h>
#include "super.h" #include "super.h"
#include "mds_client.h" #include "mds_client.h"
...@@ -179,9 +180,8 @@ struct ceph_inode_frag *__ceph_find_frag(struct ceph_inode_info *ci, u32 f) ...@@ -179,9 +180,8 @@ struct ceph_inode_frag *__ceph_find_frag(struct ceph_inode_info *ci, u32 f)
* specified, copy the frag delegation info to the caller if * specified, copy the frag delegation info to the caller if
* it is present. * it is present.
*/ */
u32 ceph_choose_frag(struct ceph_inode_info *ci, u32 v, static u32 __ceph_choose_frag(struct ceph_inode_info *ci, u32 v,
struct ceph_inode_frag *pfrag, struct ceph_inode_frag *pfrag, int *found)
int *found)
{ {
u32 t = ceph_frag_make(0, 0); u32 t = ceph_frag_make(0, 0);
struct ceph_inode_frag *frag; struct ceph_inode_frag *frag;
...@@ -191,7 +191,6 @@ u32 ceph_choose_frag(struct ceph_inode_info *ci, u32 v, ...@@ -191,7 +191,6 @@ u32 ceph_choose_frag(struct ceph_inode_info *ci, u32 v,
if (found) if (found)
*found = 0; *found = 0;
mutex_lock(&ci->i_fragtree_mutex);
while (1) { while (1) {
WARN_ON(!ceph_frag_contains_value(t, v)); WARN_ON(!ceph_frag_contains_value(t, v));
frag = __ceph_find_frag(ci, t); frag = __ceph_find_frag(ci, t);
...@@ -220,10 +219,19 @@ u32 ceph_choose_frag(struct ceph_inode_info *ci, u32 v, ...@@ -220,10 +219,19 @@ u32 ceph_choose_frag(struct ceph_inode_info *ci, u32 v,
} }
dout("choose_frag(%x) = %x\n", v, t); dout("choose_frag(%x) = %x\n", v, t);
mutex_unlock(&ci->i_fragtree_mutex);
return t; return t;
} }
u32 ceph_choose_frag(struct ceph_inode_info *ci, u32 v,
struct ceph_inode_frag *pfrag, int *found)
{
u32 ret;
mutex_lock(&ci->i_fragtree_mutex);
ret = __ceph_choose_frag(ci, v, pfrag, found);
mutex_unlock(&ci->i_fragtree_mutex);
return ret;
}
/* /*
* Process dirfrag (delegation) info from the mds. Include leaf * Process dirfrag (delegation) info from the mds. Include leaf
* fragment in tree ONLY if ndist > 0. Otherwise, only * fragment in tree ONLY if ndist > 0. Otherwise, only
...@@ -237,11 +245,17 @@ static int ceph_fill_dirfrag(struct inode *inode, ...@@ -237,11 +245,17 @@ static int ceph_fill_dirfrag(struct inode *inode,
u32 id = le32_to_cpu(dirinfo->frag); u32 id = le32_to_cpu(dirinfo->frag);
int mds = le32_to_cpu(dirinfo->auth); int mds = le32_to_cpu(dirinfo->auth);
int ndist = le32_to_cpu(dirinfo->ndist); int ndist = le32_to_cpu(dirinfo->ndist);
int diri_auth = -1;
int i; int i;
int err = 0; int err = 0;
spin_lock(&ci->i_ceph_lock);
if (ci->i_auth_cap)
diri_auth = ci->i_auth_cap->mds;
spin_unlock(&ci->i_ceph_lock);
mutex_lock(&ci->i_fragtree_mutex); mutex_lock(&ci->i_fragtree_mutex);
if (ndist == 0) { if (ndist == 0 && mds == diri_auth) {
/* no delegation info needed. */ /* no delegation info needed. */
frag = __ceph_find_frag(ci, id); frag = __ceph_find_frag(ci, id);
if (!frag) if (!frag)
...@@ -286,6 +300,75 @@ static int ceph_fill_dirfrag(struct inode *inode, ...@@ -286,6 +300,75 @@ static int ceph_fill_dirfrag(struct inode *inode,
return err; return err;
} }
static int ceph_fill_fragtree(struct inode *inode,
struct ceph_frag_tree_head *fragtree,
struct ceph_mds_reply_dirfrag *dirinfo)
{
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_inode_frag *frag;
struct rb_node *rb_node;
int i;
u32 id, nsplits;
bool update = false;
mutex_lock(&ci->i_fragtree_mutex);
nsplits = le32_to_cpu(fragtree->nsplits);
if (nsplits) {
i = prandom_u32() % nsplits;
id = le32_to_cpu(fragtree->splits[i].frag);
if (!__ceph_find_frag(ci, id))
update = true;
} else if (!RB_EMPTY_ROOT(&ci->i_fragtree)) {
rb_node = rb_first(&ci->i_fragtree);
frag = rb_entry(rb_node, struct ceph_inode_frag, node);
if (frag->frag != ceph_frag_make(0, 0) || rb_next(rb_node))
update = true;
}
if (!update && dirinfo) {
id = le32_to_cpu(dirinfo->frag);
if (id != __ceph_choose_frag(ci, id, NULL, NULL))
update = true;
}
if (!update)
goto out_unlock;
dout("fill_fragtree %llx.%llx\n", ceph_vinop(inode));
rb_node = rb_first(&ci->i_fragtree);
for (i = 0; i < nsplits; i++) {
id = le32_to_cpu(fragtree->splits[i].frag);
frag = NULL;
while (rb_node) {
frag = rb_entry(rb_node, struct ceph_inode_frag, node);
if (ceph_frag_compare(frag->frag, id) >= 0) {
if (frag->frag != id)
frag = NULL;
else
rb_node = rb_next(rb_node);
break;
}
rb_node = rb_next(rb_node);
rb_erase(&frag->node, &ci->i_fragtree);
kfree(frag);
frag = NULL;
}
if (!frag) {
frag = __get_or_create_frag(ci, id);
if (IS_ERR(frag))
continue;
}
frag->split_by = le32_to_cpu(fragtree->splits[i].by);
dout(" frag %x split by %d\n", frag->frag, frag->split_by);
}
while (rb_node) {
frag = rb_entry(rb_node, struct ceph_inode_frag, node);
rb_node = rb_next(rb_node);
rb_erase(&frag->node, &ci->i_fragtree);
kfree(frag);
}
out_unlock:
mutex_unlock(&ci->i_fragtree_mutex);
return 0;
}
/* /*
* initialize a newly allocated inode. * initialize a newly allocated inode.
...@@ -341,7 +424,6 @@ struct inode *ceph_alloc_inode(struct super_block *sb) ...@@ -341,7 +424,6 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
INIT_LIST_HEAD(&ci->i_cap_snaps); INIT_LIST_HEAD(&ci->i_cap_snaps);
ci->i_head_snapc = NULL; ci->i_head_snapc = NULL;
ci->i_snap_caps = 0; ci->i_snap_caps = 0;
ci->i_cap_exporting_issued = 0;
for (i = 0; i < CEPH_FILE_MODE_NUM; i++) for (i = 0; i < CEPH_FILE_MODE_NUM; i++)
ci->i_nr_by_mode[i] = 0; ci->i_nr_by_mode[i] = 0;
...@@ -407,7 +489,7 @@ void ceph_destroy_inode(struct inode *inode) ...@@ -407,7 +489,7 @@ void ceph_destroy_inode(struct inode *inode)
/* /*
* we may still have a snap_realm reference if there are stray * we may still have a snap_realm reference if there are stray
* caps in i_cap_exporting_issued or i_snap_caps. * caps in i_snap_caps.
*/ */
if (ci->i_snap_realm) { if (ci->i_snap_realm) {
struct ceph_mds_client *mdsc = struct ceph_mds_client *mdsc =
...@@ -582,22 +664,26 @@ static int fill_inode(struct inode *inode, ...@@ -582,22 +664,26 @@ static int fill_inode(struct inode *inode,
unsigned long ttl_from, int cap_fmode, unsigned long ttl_from, int cap_fmode,
struct ceph_cap_reservation *caps_reservation) struct ceph_cap_reservation *caps_reservation)
{ {
struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
struct ceph_mds_reply_inode *info = iinfo->in; struct ceph_mds_reply_inode *info = iinfo->in;
struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_inode_info *ci = ceph_inode(inode);
int i; int issued = 0, implemented, new_issued;
int issued = 0, implemented;
struct timespec mtime, atime, ctime; struct timespec mtime, atime, ctime;
u32 nsplits;
struct ceph_inode_frag *frag;
struct rb_node *rb_node;
struct ceph_buffer *xattr_blob = NULL; struct ceph_buffer *xattr_blob = NULL;
struct ceph_cap *new_cap = NULL;
int err = 0; int err = 0;
int queue_trunc = 0; bool wake = false;
bool queue_trunc = false;
bool new_version = false;
dout("fill_inode %p ino %llx.%llx v %llu had %llu\n", dout("fill_inode %p ino %llx.%llx v %llu had %llu\n",
inode, ceph_vinop(inode), le64_to_cpu(info->version), inode, ceph_vinop(inode), le64_to_cpu(info->version),
ci->i_version); ci->i_version);
/* prealloc new cap struct */
if (info->cap.caps && ceph_snap(inode) == CEPH_NOSNAP)
new_cap = ceph_get_cap(mdsc, caps_reservation);
/* /*
* prealloc xattr data, if it looks like we'll need it. only * prealloc xattr data, if it looks like we'll need it. only
* if len > 4 (meaning there are actually xattrs; the first 4 * if len > 4 (meaning there are actually xattrs; the first 4
...@@ -623,19 +709,23 @@ static int fill_inode(struct inode *inode, ...@@ -623,19 +709,23 @@ static int fill_inode(struct inode *inode,
* 3 2 skip * 3 2 skip
* 3 3 update * 3 3 update
*/ */
if (le64_to_cpu(info->version) > 0 && if (ci->i_version == 0 ||
(ci->i_version & ~1) >= le64_to_cpu(info->version)) ((info->cap.flags & CEPH_CAP_FLAG_AUTH) &&
goto no_change; le64_to_cpu(info->version) > (ci->i_version & ~1)))
new_version = true;
issued = __ceph_caps_issued(ci, &implemented); issued = __ceph_caps_issued(ci, &implemented);
issued |= implemented | __ceph_caps_dirty(ci); issued |= implemented | __ceph_caps_dirty(ci);
new_issued = ~issued & le32_to_cpu(info->cap.caps);
/* update inode */ /* update inode */
ci->i_version = le64_to_cpu(info->version); ci->i_version = le64_to_cpu(info->version);
inode->i_version++; inode->i_version++;
inode->i_rdev = le32_to_cpu(info->rdev); inode->i_rdev = le32_to_cpu(info->rdev);
inode->i_blkbits = fls(le32_to_cpu(info->layout.fl_stripe_unit)) - 1;
if ((issued & CEPH_CAP_AUTH_EXCL) == 0) { if ((new_version || (new_issued & CEPH_CAP_AUTH_SHARED)) &&
(issued & CEPH_CAP_AUTH_EXCL) == 0) {
inode->i_mode = le32_to_cpu(info->mode); inode->i_mode = le32_to_cpu(info->mode);
inode->i_uid = make_kuid(&init_user_ns, le32_to_cpu(info->uid)); inode->i_uid = make_kuid(&init_user_ns, le32_to_cpu(info->uid));
inode->i_gid = make_kgid(&init_user_ns, le32_to_cpu(info->gid)); inode->i_gid = make_kgid(&init_user_ns, le32_to_cpu(info->gid));
...@@ -644,23 +734,35 @@ static int fill_inode(struct inode *inode, ...@@ -644,23 +734,35 @@ static int fill_inode(struct inode *inode,
from_kgid(&init_user_ns, inode->i_gid)); from_kgid(&init_user_ns, inode->i_gid));
} }
if ((issued & CEPH_CAP_LINK_EXCL) == 0) if ((new_version || (new_issued & CEPH_CAP_LINK_SHARED)) &&
(issued & CEPH_CAP_LINK_EXCL) == 0)
set_nlink(inode, le32_to_cpu(info->nlink)); set_nlink(inode, le32_to_cpu(info->nlink));
/* be careful with mtime, atime, size */ if (new_version || (new_issued & CEPH_CAP_ANY_RD)) {
ceph_decode_timespec(&atime, &info->atime); /* be careful with mtime, atime, size */
ceph_decode_timespec(&mtime, &info->mtime); ceph_decode_timespec(&atime, &info->atime);
ceph_decode_timespec(&ctime, &info->ctime); ceph_decode_timespec(&mtime, &info->mtime);
queue_trunc = ceph_fill_file_size(inode, issued, ceph_decode_timespec(&ctime, &info->ctime);
le32_to_cpu(info->truncate_seq), ceph_fill_file_time(inode, issued,
le64_to_cpu(info->truncate_size), le32_to_cpu(info->time_warp_seq),
le64_to_cpu(info->size)); &ctime, &mtime, &atime);
ceph_fill_file_time(inode, issued, }
le32_to_cpu(info->time_warp_seq),
&ctime, &mtime, &atime); if (new_version ||
(new_issued & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR))) {
ci->i_layout = info->layout; ci->i_layout = info->layout;
inode->i_blkbits = fls(le32_to_cpu(info->layout.fl_stripe_unit)) - 1; queue_trunc = ceph_fill_file_size(inode, issued,
le32_to_cpu(info->truncate_seq),
le64_to_cpu(info->truncate_size),
le64_to_cpu(info->size));
/* only update max_size on auth cap */
if ((info->cap.flags & CEPH_CAP_FLAG_AUTH) &&
ci->i_max_size != le64_to_cpu(info->max_size)) {
dout("max_size %lld -> %llu\n", ci->i_max_size,
le64_to_cpu(info->max_size));
ci->i_max_size = le64_to_cpu(info->max_size);
}
}
/* xattrs */ /* xattrs */
/* note that if i_xattrs.len <= 4, i_xattrs.data will still be NULL. */ /* note that if i_xattrs.len <= 4, i_xattrs.data will still be NULL. */
...@@ -745,58 +847,6 @@ static int fill_inode(struct inode *inode, ...@@ -745,58 +847,6 @@ static int fill_inode(struct inode *inode,
dout(" marking %p complete (empty)\n", inode); dout(" marking %p complete (empty)\n", inode);
__ceph_dir_set_complete(ci, atomic_read(&ci->i_release_count)); __ceph_dir_set_complete(ci, atomic_read(&ci->i_release_count));
} }
no_change:
/* only update max_size on auth cap */
if ((info->cap.flags & CEPH_CAP_FLAG_AUTH) &&
ci->i_max_size != le64_to_cpu(info->max_size)) {
dout("max_size %lld -> %llu\n", ci->i_max_size,
le64_to_cpu(info->max_size));
ci->i_max_size = le64_to_cpu(info->max_size);
}
spin_unlock(&ci->i_ceph_lock);
/* queue truncate if we saw i_size decrease */
if (queue_trunc)
ceph_queue_vmtruncate(inode);
/* populate frag tree */
/* FIXME: move me up, if/when version reflects fragtree changes */
nsplits = le32_to_cpu(info->fragtree.nsplits);
mutex_lock(&ci->i_fragtree_mutex);
rb_node = rb_first(&ci->i_fragtree);
for (i = 0; i < nsplits; i++) {
u32 id = le32_to_cpu(info->fragtree.splits[i].frag);
frag = NULL;
while (rb_node) {
frag = rb_entry(rb_node, struct ceph_inode_frag, node);
if (ceph_frag_compare(frag->frag, id) >= 0) {
if (frag->frag != id)
frag = NULL;
else
rb_node = rb_next(rb_node);
break;
}
rb_node = rb_next(rb_node);
rb_erase(&frag->node, &ci->i_fragtree);
kfree(frag);
frag = NULL;
}
if (!frag) {
frag = __get_or_create_frag(ci, id);
if (IS_ERR(frag))
continue;
}
frag->split_by = le32_to_cpu(info->fragtree.splits[i].by);
dout(" frag %x split by %d\n", frag->frag, frag->split_by);
}
while (rb_node) {
frag = rb_entry(rb_node, struct ceph_inode_frag, node);
rb_node = rb_next(rb_node);
rb_erase(&frag->node, &ci->i_fragtree);
kfree(frag);
}
mutex_unlock(&ci->i_fragtree_mutex);
/* were we issued a capability? */ /* were we issued a capability? */
if (info->cap.caps) { if (info->cap.caps) {
...@@ -809,30 +859,41 @@ static int fill_inode(struct inode *inode, ...@@ -809,30 +859,41 @@ static int fill_inode(struct inode *inode,
le32_to_cpu(info->cap.seq), le32_to_cpu(info->cap.seq),
le32_to_cpu(info->cap.mseq), le32_to_cpu(info->cap.mseq),
le64_to_cpu(info->cap.realm), le64_to_cpu(info->cap.realm),
info->cap.flags, info->cap.flags, &new_cap);
caps_reservation); wake = true;
} else { } else {
spin_lock(&ci->i_ceph_lock);
dout(" %p got snap_caps %s\n", inode, dout(" %p got snap_caps %s\n", inode,
ceph_cap_string(le32_to_cpu(info->cap.caps))); ceph_cap_string(le32_to_cpu(info->cap.caps)));
ci->i_snap_caps |= le32_to_cpu(info->cap.caps); ci->i_snap_caps |= le32_to_cpu(info->cap.caps);
if (cap_fmode >= 0) if (cap_fmode >= 0)
__ceph_get_fmode(ci, cap_fmode); __ceph_get_fmode(ci, cap_fmode);
spin_unlock(&ci->i_ceph_lock);
} }
} else if (cap_fmode >= 0) { } else if (cap_fmode >= 0) {
pr_warn("mds issued no caps on %llx.%llx\n", pr_warn("mds issued no caps on %llx.%llx\n",
ceph_vinop(inode)); ceph_vinop(inode));
__ceph_get_fmode(ci, cap_fmode); __ceph_get_fmode(ci, cap_fmode);
} }
spin_unlock(&ci->i_ceph_lock);
if (wake)
wake_up_all(&ci->i_cap_wq);
/* queue truncate if we saw i_size decrease */
if (queue_trunc)
ceph_queue_vmtruncate(inode);
/* populate frag tree */
if (S_ISDIR(inode->i_mode))
ceph_fill_fragtree(inode, &info->fragtree, dirinfo);
/* update delegation info? */ /* update delegation info? */
if (dirinfo) if (dirinfo)
ceph_fill_dirfrag(inode, dirinfo); ceph_fill_dirfrag(inode, dirinfo);
err = 0; err = 0;
out: out:
if (new_cap)
ceph_put_cap(mdsc, new_cap);
if (xattr_blob) if (xattr_blob)
ceph_buffer_put(xattr_blob); ceph_buffer_put(xattr_blob);
return err; return err;
...@@ -1485,7 +1546,7 @@ static void ceph_invalidate_work(struct work_struct *work) ...@@ -1485,7 +1546,7 @@ static void ceph_invalidate_work(struct work_struct *work)
orig_gen = ci->i_rdcache_gen; orig_gen = ci->i_rdcache_gen;
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
truncate_inode_pages(inode->i_mapping, 0); truncate_pagecache(inode, 0);
spin_lock(&ci->i_ceph_lock); spin_lock(&ci->i_ceph_lock);
if (orig_gen == ci->i_rdcache_gen && if (orig_gen == ci->i_rdcache_gen &&
...@@ -1588,7 +1649,7 @@ void __ceph_do_pending_vmtruncate(struct inode *inode) ...@@ -1588,7 +1649,7 @@ void __ceph_do_pending_vmtruncate(struct inode *inode)
ci->i_truncate_pending, to); ci->i_truncate_pending, to);
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
truncate_inode_pages(inode->i_mapping, to); truncate_pagecache(inode, to);
spin_lock(&ci->i_ceph_lock); spin_lock(&ci->i_ceph_lock);
if (to == ci->i_truncate_size) { if (to == ci->i_truncate_size) {
......
...@@ -1558,6 +1558,8 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode) ...@@ -1558,6 +1558,8 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
init_completion(&req->r_safe_completion); init_completion(&req->r_safe_completion);
INIT_LIST_HEAD(&req->r_unsafe_item); INIT_LIST_HEAD(&req->r_unsafe_item);
req->r_stamp = CURRENT_TIME;
req->r_op = op; req->r_op = op;
req->r_direct_mode = mode; req->r_direct_mode = mode;
return req; return req;
...@@ -1783,7 +1785,8 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, ...@@ -1783,7 +1785,8 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
} }
len = sizeof(*head) + len = sizeof(*head) +
pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)); pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) +
sizeof(struct timespec);
/* calculate (max) length for cap releases */ /* calculate (max) length for cap releases */
len += sizeof(struct ceph_mds_request_release) * len += sizeof(struct ceph_mds_request_release) *
...@@ -1800,6 +1803,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, ...@@ -1800,6 +1803,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
goto out_free2; goto out_free2;
} }
msg->hdr.version = 2;
msg->hdr.tid = cpu_to_le64(req->r_tid); msg->hdr.tid = cpu_to_le64(req->r_tid);
head = msg->front.iov_base; head = msg->front.iov_base;
...@@ -1836,6 +1840,9 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, ...@@ -1836,6 +1840,9 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
mds, req->r_old_inode_drop, req->r_old_inode_unless, 0); mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
head->num_releases = cpu_to_le16(releases); head->num_releases = cpu_to_le16(releases);
/* time stamp */
ceph_encode_copy(&p, &req->r_stamp, sizeof(req->r_stamp));
BUG_ON(p > end); BUG_ON(p > end);
msg->front.iov_len = p - msg->front.iov_base; msg->front.iov_len = p - msg->front.iov_base;
msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
......
...@@ -194,6 +194,7 @@ struct ceph_mds_request { ...@@ -194,6 +194,7 @@ struct ceph_mds_request {
int r_fmode; /* file mode, if expecting cap */ int r_fmode; /* file mode, if expecting cap */
kuid_t r_uid; kuid_t r_uid;
kgid_t r_gid; kgid_t r_gid;
struct timespec r_stamp;
/* for choosing which mds to send this request to */ /* for choosing which mds to send this request to */
int r_direct_mode; int r_direct_mode;
......
...@@ -292,7 +292,6 @@ struct ceph_inode_info { ...@@ -292,7 +292,6 @@ struct ceph_inode_info {
struct ceph_snap_context *i_head_snapc; /* set if wr_buffer_head > 0 or struct ceph_snap_context *i_head_snapc; /* set if wr_buffer_head > 0 or
dirty|flushing caps */ dirty|flushing caps */
unsigned i_snap_caps; /* cap bits for snapped files */ unsigned i_snap_caps; /* cap bits for snapped files */
unsigned i_cap_exporting_issued;
int i_nr_by_mode[CEPH_FILE_MODE_NUM]; /* open file counts */ int i_nr_by_mode[CEPH_FILE_MODE_NUM]; /* open file counts */
...@@ -775,11 +774,13 @@ static inline void ceph_forget_all_cached_acls(struct inode *inode) ...@@ -775,11 +774,13 @@ static inline void ceph_forget_all_cached_acls(struct inode *inode)
extern const char *ceph_cap_string(int c); extern const char *ceph_cap_string(int c);
extern void ceph_handle_caps(struct ceph_mds_session *session, extern void ceph_handle_caps(struct ceph_mds_session *session,
struct ceph_msg *msg); struct ceph_msg *msg);
extern int ceph_add_cap(struct inode *inode, extern struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session, u64 cap_id, struct ceph_cap_reservation *ctx);
int fmode, unsigned issued, unsigned wanted, extern void ceph_add_cap(struct inode *inode,
unsigned cap, unsigned seq, u64 realmino, int flags, struct ceph_mds_session *session, u64 cap_id,
struct ceph_cap_reservation *caps_reservation); int fmode, unsigned issued, unsigned wanted,
unsigned cap, unsigned seq, u64 realmino, int flags,
struct ceph_cap **new_cap);
extern void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release); extern void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release);
extern void ceph_put_cap(struct ceph_mds_client *mdsc, extern void ceph_put_cap(struct ceph_mds_client *mdsc,
struct ceph_cap *cap); struct ceph_cap *cap);
......
...@@ -625,6 +625,8 @@ int ceph_flags_to_mode(int flags); ...@@ -625,6 +625,8 @@ int ceph_flags_to_mode(int flags);
CEPH_CAP_LINK_EXCL | \ CEPH_CAP_LINK_EXCL | \
CEPH_CAP_XATTR_EXCL | \ CEPH_CAP_XATTR_EXCL | \
CEPH_CAP_FILE_EXCL) CEPH_CAP_FILE_EXCL)
#define CEPH_CAP_ANY_FILE_RD (CEPH_CAP_FILE_RD | CEPH_CAP_FILE_CACHE | \
CEPH_CAP_FILE_SHARED)
#define CEPH_CAP_ANY_FILE_WR (CEPH_CAP_FILE_WR | CEPH_CAP_FILE_BUFFER | \ #define CEPH_CAP_ANY_FILE_WR (CEPH_CAP_FILE_WR | CEPH_CAP_FILE_BUFFER | \
CEPH_CAP_FILE_EXCL) CEPH_CAP_FILE_EXCL)
#define CEPH_CAP_ANY_WR (CEPH_CAP_ANY_EXCL | CEPH_CAP_ANY_FILE_WR) #define CEPH_CAP_ANY_WR (CEPH_CAP_ANY_EXCL | CEPH_CAP_ANY_FILE_WR)
......
...@@ -40,9 +40,9 @@ struct ceph_mon_request { ...@@ -40,9 +40,9 @@ struct ceph_mon_request {
}; };
/* /*
* ceph_mon_generic_request is being used for the statfs and poolop requests * ceph_mon_generic_request is being used for the statfs, poolop and
* which are bening done a bit differently because we need to get data back * mon_get_version requests which are being done a bit differently
* to the caller * because we need to get data back to the caller
*/ */
struct ceph_mon_generic_request { struct ceph_mon_generic_request {
struct kref kref; struct kref kref;
...@@ -104,10 +104,15 @@ extern int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 have); ...@@ -104,10 +104,15 @@ extern int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 have);
extern int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 have); extern int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 have);
extern void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc); extern void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc);
extern int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
unsigned long timeout);
extern int ceph_monc_do_statfs(struct ceph_mon_client *monc, extern int ceph_monc_do_statfs(struct ceph_mon_client *monc,
struct ceph_statfs *buf); struct ceph_statfs *buf);
extern int ceph_monc_do_get_version(struct ceph_mon_client *monc,
const char *what, u64 *newest);
extern int ceph_monc_open_session(struct ceph_mon_client *monc); extern int ceph_monc_open_session(struct ceph_mon_client *monc);
extern int ceph_monc_validate_auth(struct ceph_mon_client *monc); extern int ceph_monc_validate_auth(struct ceph_mon_client *monc);
......
...@@ -72,6 +72,8 @@ const char *ceph_msg_type_name(int type) ...@@ -72,6 +72,8 @@ const char *ceph_msg_type_name(int type)
case CEPH_MSG_MON_SUBSCRIBE_ACK: return "mon_subscribe_ack"; case CEPH_MSG_MON_SUBSCRIBE_ACK: return "mon_subscribe_ack";
case CEPH_MSG_STATFS: return "statfs"; case CEPH_MSG_STATFS: return "statfs";
case CEPH_MSG_STATFS_REPLY: return "statfs_reply"; case CEPH_MSG_STATFS_REPLY: return "statfs_reply";
case CEPH_MSG_MON_GET_VERSION: return "mon_get_version";
case CEPH_MSG_MON_GET_VERSION_REPLY: return "mon_get_version_reply";
case CEPH_MSG_MDS_MAP: return "mds_map"; case CEPH_MSG_MDS_MAP: return "mds_map";
case CEPH_MSG_CLIENT_SESSION: return "client_session"; case CEPH_MSG_CLIENT_SESSION: return "client_session";
case CEPH_MSG_CLIENT_RECONNECT: return "client_reconnect"; case CEPH_MSG_CLIENT_RECONNECT: return "client_reconnect";
......
...@@ -126,9 +126,13 @@ static int monc_show(struct seq_file *s, void *p) ...@@ -126,9 +126,13 @@ static int monc_show(struct seq_file *s, void *p)
req = rb_entry(rp, struct ceph_mon_generic_request, node); req = rb_entry(rp, struct ceph_mon_generic_request, node);
op = le16_to_cpu(req->request->hdr.type); op = le16_to_cpu(req->request->hdr.type);
if (op == CEPH_MSG_STATFS) if (op == CEPH_MSG_STATFS)
seq_printf(s, "%lld statfs\n", req->tid); seq_printf(s, "%llu statfs\n", req->tid);
else if (op == CEPH_MSG_POOLOP)
seq_printf(s, "%llu poolop\n", req->tid);
else if (op == CEPH_MSG_MON_GET_VERSION)
seq_printf(s, "%llu mon_get_version", req->tid);
else else
seq_printf(s, "%lld unknown\n", req->tid); seq_printf(s, "%llu unknown\n", req->tid);
} }
mutex_unlock(&monc->mutex); mutex_unlock(&monc->mutex);
......
...@@ -296,6 +296,33 @@ void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc) ...@@ -296,6 +296,33 @@ void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc)
__send_subscribe(monc); __send_subscribe(monc);
mutex_unlock(&monc->mutex); mutex_unlock(&monc->mutex);
} }
EXPORT_SYMBOL(ceph_monc_request_next_osdmap);
int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
unsigned long timeout)
{
unsigned long started = jiffies;
int ret;
mutex_lock(&monc->mutex);
while (monc->have_osdmap < epoch) {
mutex_unlock(&monc->mutex);
if (timeout != 0 && time_after_eq(jiffies, started + timeout))
return -ETIMEDOUT;
ret = wait_event_interruptible_timeout(monc->client->auth_wq,
monc->have_osdmap >= epoch, timeout);
if (ret < 0)
return ret;
mutex_lock(&monc->mutex);
}
mutex_unlock(&monc->mutex);
return 0;
}
EXPORT_SYMBOL(ceph_monc_wait_osdmap);
/* /*
* *
...@@ -477,14 +504,13 @@ static struct ceph_msg *get_generic_reply(struct ceph_connection *con, ...@@ -477,14 +504,13 @@ static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
return m; return m;
} }
static int do_generic_request(struct ceph_mon_client *monc, static int __do_generic_request(struct ceph_mon_client *monc, u64 tid,
struct ceph_mon_generic_request *req) struct ceph_mon_generic_request *req)
{ {
int err; int err;
/* register request */ /* register request */
mutex_lock(&monc->mutex); req->tid = tid != 0 ? tid : ++monc->last_tid;
req->tid = ++monc->last_tid;
req->request->hdr.tid = cpu_to_le64(req->tid); req->request->hdr.tid = cpu_to_le64(req->tid);
__insert_generic_request(monc, req); __insert_generic_request(monc, req);
monc->num_generic_requests++; monc->num_generic_requests++;
...@@ -496,13 +522,24 @@ static int do_generic_request(struct ceph_mon_client *monc, ...@@ -496,13 +522,24 @@ static int do_generic_request(struct ceph_mon_client *monc,
mutex_lock(&monc->mutex); mutex_lock(&monc->mutex);
rb_erase(&req->node, &monc->generic_request_tree); rb_erase(&req->node, &monc->generic_request_tree);
monc->num_generic_requests--; monc->num_generic_requests--;
mutex_unlock(&monc->mutex);
if (!err) if (!err)
err = req->result; err = req->result;
return err; return err;
} }
static int do_generic_request(struct ceph_mon_client *monc,
struct ceph_mon_generic_request *req)
{
int err;
mutex_lock(&monc->mutex);
err = __do_generic_request(monc, 0, req);
mutex_unlock(&monc->mutex);
return err;
}
/* /*
* statfs * statfs
*/ */
...@@ -579,6 +616,96 @@ int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf) ...@@ -579,6 +616,96 @@ int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
} }
EXPORT_SYMBOL(ceph_monc_do_statfs); EXPORT_SYMBOL(ceph_monc_do_statfs);
static void handle_get_version_reply(struct ceph_mon_client *monc,
struct ceph_msg *msg)
{
struct ceph_mon_generic_request *req;
u64 tid = le64_to_cpu(msg->hdr.tid);
void *p = msg->front.iov_base;
void *end = p + msg->front_alloc_len;
u64 handle;
dout("%s %p tid %llu\n", __func__, msg, tid);
ceph_decode_need(&p, end, 2*sizeof(u64), bad);
handle = ceph_decode_64(&p);
if (tid != 0 && tid != handle)
goto bad;
mutex_lock(&monc->mutex);
req = __lookup_generic_req(monc, handle);
if (req) {
*(u64 *)req->buf = ceph_decode_64(&p);
req->result = 0;
get_generic_request(req);
}
mutex_unlock(&monc->mutex);
if (req) {
complete_all(&req->completion);
put_generic_request(req);
}
return;
bad:
pr_err("corrupt mon_get_version reply\n");
ceph_msg_dump(msg);
}
/*
* Send MMonGetVersion and wait for the reply.
*
* @what: one of "mdsmap", "osdmap" or "monmap"
*/
int ceph_monc_do_get_version(struct ceph_mon_client *monc, const char *what,
u64 *newest)
{
struct ceph_mon_generic_request *req;
void *p, *end;
u64 tid;
int err;
req = kzalloc(sizeof(*req), GFP_NOFS);
if (!req)
return -ENOMEM;
kref_init(&req->kref);
req->buf = newest;
req->buf_len = sizeof(*newest);
init_completion(&req->completion);
req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION,
sizeof(u64) + sizeof(u32) + strlen(what),
GFP_NOFS, true);
if (!req->request) {
err = -ENOMEM;
goto out;
}
req->reply = ceph_msg_new(CEPH_MSG_MON_GET_VERSION_REPLY, 1024,
GFP_NOFS, true);
if (!req->reply) {
err = -ENOMEM;
goto out;
}
p = req->request->front.iov_base;
end = p + req->request->front_alloc_len;
/* fill out request */
mutex_lock(&monc->mutex);
tid = ++monc->last_tid;
ceph_encode_64(&p, tid); /* handle */
ceph_encode_string(&p, end, what, strlen(what));
err = __do_generic_request(monc, tid, req);
mutex_unlock(&monc->mutex);
out:
kref_put(&req->kref, release_generic_request);
return err;
}
EXPORT_SYMBOL(ceph_monc_do_get_version);
/* /*
* pool ops * pool ops
*/ */
...@@ -981,6 +1108,10 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) ...@@ -981,6 +1108,10 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
handle_statfs_reply(monc, msg); handle_statfs_reply(monc, msg);
break; break;
case CEPH_MSG_MON_GET_VERSION_REPLY:
handle_get_version_reply(monc, msg);
break;
case CEPH_MSG_POOLOP_REPLY: case CEPH_MSG_POOLOP_REPLY:
handle_poolop_reply(monc, msg); handle_poolop_reply(monc, msg);
break; break;
...@@ -1029,6 +1160,15 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con, ...@@ -1029,6 +1160,15 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
case CEPH_MSG_AUTH_REPLY: case CEPH_MSG_AUTH_REPLY:
m = ceph_msg_get(monc->m_auth_reply); m = ceph_msg_get(monc->m_auth_reply);
break; break;
case CEPH_MSG_MON_GET_VERSION_REPLY:
if (le64_to_cpu(hdr->tid) != 0)
return get_generic_reply(con, hdr, skip);
/*
* Older OSDs don't set reply tid even if the orignal
* request had a non-zero tid. Workaround this weirdness
* by falling through to the allocate case.
*/
case CEPH_MSG_MON_MAP: case CEPH_MSG_MON_MAP:
case CEPH_MSG_MDS_MAP: case CEPH_MSG_MDS_MAP:
case CEPH_MSG_OSD_MAP: case CEPH_MSG_OSD_MAP:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment