Commit 8d2d441a authored by Linus Torvalds's avatar Linus Torvalds

Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client

Pull Ceph updates from Sage Weil:
 "There is a lot of refactoring and hardening of the libceph and rbd
  code here from Ilya that fix various smaller bugs, and a few more
  important fixes with clone overlap.  The main fix is a critical change
  to the request_fn handling to not sleep that was exposed by the recent
  mutex changes (which will also go to the 3.16 stable series).

  Yan Zheng has several fixes in here for CephFS fixing ACL handling,
  time stamps, and request resends when the MDS restarts.

  Finally, there are a few cleanups from Himangi Saraogi based on
  Coccinelle"

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (39 commits)
  libceph: set last_piece in ceph_msg_data_pages_cursor_init() correctly
  rbd: remove extra newlines from rbd_warn() messages
  rbd: allocate img_request with GFP_NOIO instead GFP_ATOMIC
  rbd: rework rbd_request_fn()
  ceph: fix kick_requests()
  ceph: fix append mode write
  ceph: fix sizeof(struct tYpO *) typo
  ceph: remove redundant memset(0)
  rbd: take snap_id into account when reading in parent info
  rbd: do not read in parent info before snap context
  rbd: update mapping size only on refresh
  rbd: harden rbd_dev_refresh() and callers a bit
  rbd: split rbd_dev_spec_update() into two functions
  rbd: remove unnecessary asserts in rbd_dev_image_probe()
  rbd: introduce rbd_dev_header_info()
  rbd: show the entire chain of parent images
  ceph: replace comma with a semicolon
  rbd: use rbd_segment_name_free() instead of kfree()
  ceph: check zero length in ceph_sync_read()
  ceph: reset r_resend_mds after receiving -ESTALE
  ...
parents 89838b80 5f740d7e
......@@ -94,5 +94,5 @@ current_snap
parent
Information identifying the pool, image, and snapshot id for
the parent image in a layered rbd image (format 2 only).
Information identifying the chain of parent images in a layered rbd
image. Entries are separated by empty lines.
......@@ -42,6 +42,7 @@
#include <linux/blkdev.h>
#include <linux/slab.h>
#include <linux/idr.h>
#include <linux/workqueue.h>
#include "rbd_types.h"
......@@ -332,7 +333,10 @@ struct rbd_device {
char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
struct list_head rq_queue; /* incoming rq queue */
spinlock_t lock; /* queue, flags, open_count */
struct workqueue_struct *rq_wq;
struct work_struct rq_work;
struct rbd_image_header header;
unsigned long flags; /* possibly lock protected */
......@@ -514,7 +518,8 @@ static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev);
static int rbd_dev_header_info(struct rbd_device *rbd_dev);
static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
......@@ -971,12 +976,6 @@ static int rbd_header_from_disk(struct rbd_device *rbd_dev,
header->snap_names = snap_names;
header->snap_sizes = snap_sizes;
/* Make sure mapping size is consistent with header info */
if (rbd_dev->spec->snap_id == CEPH_NOSNAP || first_time)
if (rbd_dev->mapping.size != header->image_size)
rbd_dev->mapping.size = header->image_size;
return 0;
out_2big:
ret = -EIO;
......@@ -1139,6 +1138,13 @@ static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
rbd_dev->mapping.features = 0;
}
static void rbd_segment_name_free(const char *name)
{
/* The explicit cast here is needed to drop the const qualifier */
kmem_cache_free(rbd_segment_name_cache, (void *)name);
}
static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
char *name;
......@@ -1158,20 +1164,13 @@ static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
if (ret < 0 || ret > CEPH_MAX_OID_NAME_LEN) {
pr_err("error formatting segment name for #%llu (%d)\n",
segment, ret);
kfree(name);
rbd_segment_name_free(name);
name = NULL;
}
return name;
}
static void rbd_segment_name_free(const char *name)
{
/* The explicit cast here is needed to drop the const qualifier */
kmem_cache_free(rbd_segment_name_cache, (void *)name);
}
static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
......@@ -1371,7 +1370,7 @@ static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
struct rbd_device *rbd_dev;
rbd_dev = obj_request->img_request->rbd_dev;
rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
rbd_warn(rbd_dev, "obj_request %p already marked img_data",
obj_request);
}
}
......@@ -1389,7 +1388,7 @@ static void obj_request_done_set(struct rbd_obj_request *obj_request)
if (obj_request_img_data_test(obj_request))
rbd_dev = obj_request->img_request->rbd_dev;
rbd_warn(rbd_dev, "obj_request %p already marked done\n",
rbd_warn(rbd_dev, "obj_request %p already marked done",
obj_request);
}
}
......@@ -1527,11 +1526,37 @@ static bool obj_request_type_valid(enum obj_request_type type)
static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
struct rbd_obj_request *obj_request)
{
dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
dout("%s %p\n", __func__, obj_request);
return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}
static void rbd_obj_request_end(struct rbd_obj_request *obj_request)
{
dout("%s %p\n", __func__, obj_request);
ceph_osdc_cancel_request(obj_request->osd_req);
}
/*
* Wait for an object request to complete. If interrupted, cancel the
* underlying osd request.
*/
static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
int ret;
dout("%s %p\n", __func__, obj_request);
ret = wait_for_completion_interruptible(&obj_request->completion);
if (ret < 0) {
dout("%s %p interrupted\n", __func__, obj_request);
rbd_obj_request_end(obj_request);
return ret;
}
dout("%s %p done\n", __func__, obj_request);
return 0;
}
static void rbd_img_request_complete(struct rbd_img_request *img_request)
{
......@@ -1558,15 +1583,6 @@ static void rbd_img_request_complete(struct rbd_img_request *img_request)
rbd_img_request_put(img_request);
}
/* Caller is responsible for rbd_obj_request_destroy(obj_request) */
static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
dout("%s: obj %p\n", __func__, obj_request);
return wait_for_completion_interruptible(&obj_request->completion);
}
/*
* The default/initial value for all image request flags is 0. Each
* is conditionally set to 1 at image request initialization time
......@@ -1763,7 +1779,7 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
rbd_osd_trivial_callback(obj_request);
break;
default:
rbd_warn(NULL, "%s: unsupported op %hu\n",
rbd_warn(NULL, "%s: unsupported op %hu",
obj_request->object_name, (unsigned short) opcode);
break;
}
......@@ -1998,7 +2014,7 @@ static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
if (!counter)
rbd_dev_unparent(rbd_dev);
else
rbd_warn(rbd_dev, "parent reference underflow\n");
rbd_warn(rbd_dev, "parent reference underflow");
}
/*
......@@ -2028,7 +2044,7 @@ static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
/* Image was flattened, but parent is not yet torn down */
if (counter < 0)
rbd_warn(rbd_dev, "parent reference overflow\n");
rbd_warn(rbd_dev, "parent reference overflow");
return false;
}
......@@ -2045,7 +2061,7 @@ static struct rbd_img_request *rbd_img_request_create(
{
struct rbd_img_request *img_request;
img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
if (!img_request)
return NULL;
......@@ -2161,11 +2177,11 @@ static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
if (result) {
struct rbd_device *rbd_dev = img_request->rbd_dev;
rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
rbd_warn(rbd_dev, "%s %llx at %llx (%llx)",
img_request_write_test(img_request) ? "write" : "read",
obj_request->length, obj_request->img_offset,
obj_request->offset);
rbd_warn(rbd_dev, " result %d xferred %x\n",
rbd_warn(rbd_dev, " result %d xferred %x",
result, xferred);
if (!img_request->result)
img_request->result = result;
......@@ -2946,154 +2962,135 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
rbd_dev->header_name, (unsigned long long)notify_id,
(unsigned int)opcode);
/*
* Until adequate refresh error handling is in place, there is
* not much we can do here, except warn.
*
* See http://tracker.ceph.com/issues/5040
*/
ret = rbd_dev_refresh(rbd_dev);
if (ret)
rbd_warn(rbd_dev, "header refresh error (%d)\n", ret);
rbd_warn(rbd_dev, "refresh failed: %d", ret);
rbd_obj_notify_ack_sync(rbd_dev, notify_id);
ret = rbd_obj_notify_ack_sync(rbd_dev, notify_id);
if (ret)
rbd_warn(rbd_dev, "notify_ack ret %d", ret);
}
/*
* Initiate a watch request, synchronously.
* Send a (un)watch request and wait for the ack. Return a request
* with a ref held on success or error.
*/
static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
static struct rbd_obj_request *rbd_obj_watch_request_helper(
struct rbd_device *rbd_dev,
bool watch)
{
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
struct rbd_obj_request *obj_request;
int ret;
rbd_assert(!rbd_dev->watch_event);
rbd_assert(!rbd_dev->watch_request);
ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
&rbd_dev->watch_event);
if (ret < 0)
return ret;
rbd_assert(rbd_dev->watch_event);
obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
OBJ_REQUEST_NODATA);
if (!obj_request) {
ret = -ENOMEM;
goto out_cancel;
}
if (!obj_request)
return ERR_PTR(-ENOMEM);
obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, 1,
obj_request);
if (!obj_request->osd_req) {
ret = -ENOMEM;
goto out_put;
goto out;
}
ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
rbd_dev->watch_event->cookie, 0, 1);
rbd_dev->watch_event->cookie, 0, watch);
rbd_osd_req_format_write(obj_request);
if (watch)
ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
ret = rbd_obj_request_submit(osdc, obj_request);
if (ret)
goto out_linger;
goto out;
ret = rbd_obj_request_wait(obj_request);
if (ret)
goto out_linger;
goto out;
ret = obj_request->result;
if (ret)
goto out_linger;
/*
* A watch request is set to linger, so the underlying osd
* request won't go away until we unregister it. We retain
* a pointer to the object request during that time (in
* rbd_dev->watch_request), so we'll keep a reference to
* it. We'll drop that reference (below) after we've
* unregistered it.
*/
rbd_dev->watch_request = obj_request;
if (ret) {
if (watch)
rbd_obj_request_end(obj_request);
goto out;
}
return 0;
return obj_request;
out_linger:
ceph_osdc_unregister_linger_request(osdc, obj_request->osd_req);
out_put:
out:
rbd_obj_request_put(obj_request);
out_cancel:
ceph_osdc_cancel_event(rbd_dev->watch_event);
rbd_dev->watch_event = NULL;
return ret;
return ERR_PTR(ret);
}
/*
* Tear down a watch request, synchronously.
* Initiate a watch request, synchronously.
*/
static int __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
{
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
struct rbd_obj_request *obj_request;
int ret;
rbd_assert(rbd_dev->watch_event);
rbd_assert(rbd_dev->watch_request);
rbd_assert(!rbd_dev->watch_event);
rbd_assert(!rbd_dev->watch_request);
obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
OBJ_REQUEST_NODATA);
if (!obj_request) {
ret = -ENOMEM;
goto out_cancel;
}
ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
&rbd_dev->watch_event);
if (ret < 0)
return ret;
obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, 1,
obj_request);
if (!obj_request->osd_req) {
ret = -ENOMEM;
goto out_put;
obj_request = rbd_obj_watch_request_helper(rbd_dev, true);
if (IS_ERR(obj_request)) {
ceph_osdc_cancel_event(rbd_dev->watch_event);
rbd_dev->watch_event = NULL;
return PTR_ERR(obj_request);
}
osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
rbd_dev->watch_event->cookie, 0, 0);
rbd_osd_req_format_write(obj_request);
ret = rbd_obj_request_submit(osdc, obj_request);
if (ret)
goto out_put;
/*
* A watch request is set to linger, so the underlying osd
* request won't go away until we unregister it. We retain
* a pointer to the object request during that time (in
* rbd_dev->watch_request), so we'll keep a reference to it.
* We'll drop that reference after we've unregistered it in
* rbd_dev_header_unwatch_sync().
*/
rbd_dev->watch_request = obj_request;
ret = rbd_obj_request_wait(obj_request);
if (ret)
goto out_put;
return 0;
}
ret = obj_request->result;
if (ret)
goto out_put;
/*
* Tear down a watch request, synchronously.
*/
static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
{
struct rbd_obj_request *obj_request;
/* We have successfully torn down the watch request */
rbd_assert(rbd_dev->watch_event);
rbd_assert(rbd_dev->watch_request);
ceph_osdc_unregister_linger_request(osdc,
rbd_dev->watch_request->osd_req);
rbd_obj_request_end(rbd_dev->watch_request);
rbd_obj_request_put(rbd_dev->watch_request);
rbd_dev->watch_request = NULL;
out_put:
rbd_obj_request_put(obj_request);
out_cancel:
obj_request = rbd_obj_watch_request_helper(rbd_dev, false);
if (!IS_ERR(obj_request))
rbd_obj_request_put(obj_request);
else
rbd_warn(rbd_dev, "unable to tear down watch request (%ld)",
PTR_ERR(obj_request));
ceph_osdc_cancel_event(rbd_dev->watch_event);
rbd_dev->watch_event = NULL;
return ret;
}
static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
{
int ret;
ret = __rbd_dev_header_unwatch_sync(rbd_dev);
if (ret) {
rbd_warn(rbd_dev, "unable to tear down watch request: %d\n",
ret);
}
}
/*
......@@ -3183,102 +3180,129 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
return ret;
}
static void rbd_request_fn(struct request_queue *q)
__releases(q->queue_lock) __acquires(q->queue_lock)
static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq)
{
struct rbd_device *rbd_dev = q->queuedata;
struct request *rq;
struct rbd_img_request *img_request;
u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
u64 length = blk_rq_bytes(rq);
bool wr = rq_data_dir(rq) == WRITE;
int result;
while ((rq = blk_fetch_request(q))) {
bool write_request = rq_data_dir(rq) == WRITE;
struct rbd_img_request *img_request;
u64 offset;
u64 length;
/* Ignore/skip any zero-length requests */
/* Ignore any non-FS requests that filter through. */
if (!length) {
dout("%s: zero-length request\n", __func__);
result = 0;
goto err_rq;
}
if (rq->cmd_type != REQ_TYPE_FS) {
dout("%s: non-fs request type %d\n", __func__,
(int) rq->cmd_type);
__blk_end_request_all(rq, 0);
continue;
/* Disallow writes to a read-only device */
if (wr) {
if (rbd_dev->mapping.read_only) {
result = -EROFS;
goto err_rq;
}
rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
}
/* Ignore/skip any zero-length requests */
/*
* Quit early if the mapped snapshot no longer exists. It's
* still possible the snapshot will have disappeared by the
* time our request arrives at the osd, but there's no sense in
* sending it if we already know.
*/
if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
dout("request for non-existent snapshot");
rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
result = -ENXIO;
goto err_rq;
}
offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
length = (u64) blk_rq_bytes(rq);
if (offset && length > U64_MAX - offset + 1) {
rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
length);
result = -EINVAL;
goto err_rq; /* Shouldn't happen */
}
if (!length) {
dout("%s: zero-length request\n", __func__);
__blk_end_request_all(rq, 0);
continue;
}
if (offset + length > rbd_dev->mapping.size) {
rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
length, rbd_dev->mapping.size);
result = -EIO;
goto err_rq;
}
spin_unlock_irq(q->queue_lock);
img_request = rbd_img_request_create(rbd_dev, offset, length, wr);
if (!img_request) {
result = -ENOMEM;
goto err_rq;
}
img_request->rq = rq;
/* Disallow writes to a read-only device */
result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, rq->bio);
if (result)
goto err_img_request;
if (write_request) {
result = -EROFS;
if (rbd_dev->mapping.read_only)
goto end_request;
rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
}
result = rbd_img_request_submit(img_request);
if (result)
goto err_img_request;
/*
* Quit early if the mapped snapshot no longer
* exists. It's still possible the snapshot will
* have disappeared by the time our request arrives
* at the osd, but there's no sense in sending it if
* we already know.
*/
if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
dout("request for non-existent snapshot");
rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
result = -ENXIO;
goto end_request;
}
return;
result = -EINVAL;
if (offset && length > U64_MAX - offset + 1) {
rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
offset, length);
goto end_request; /* Shouldn't happen */
}
err_img_request:
rbd_img_request_put(img_request);
err_rq:
if (result)
rbd_warn(rbd_dev, "%s %llx at %llx result %d",
wr ? "write" : "read", length, offset, result);
blk_end_request_all(rq, result);
}
result = -EIO;
if (offset + length > rbd_dev->mapping.size) {
rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
offset, length, rbd_dev->mapping.size);
goto end_request;
}
static void rbd_request_workfn(struct work_struct *work)
{
struct rbd_device *rbd_dev =
container_of(work, struct rbd_device, rq_work);
struct request *rq, *next;
LIST_HEAD(requests);
result = -ENOMEM;
img_request = rbd_img_request_create(rbd_dev, offset, length,
write_request);
if (!img_request)
goto end_request;
spin_lock_irq(&rbd_dev->lock); /* rq->q->queue_lock */
list_splice_init(&rbd_dev->rq_queue, &requests);
spin_unlock_irq(&rbd_dev->lock);
img_request->rq = rq;
list_for_each_entry_safe(rq, next, &requests, queuelist) {
list_del_init(&rq->queuelist);
rbd_handle_request(rbd_dev, rq);
}
}
result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
rq->bio);
if (!result)
result = rbd_img_request_submit(img_request);
if (result)
rbd_img_request_put(img_request);
end_request:
spin_lock_irq(q->queue_lock);
if (result < 0) {
rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
write_request ? "write" : "read",
length, offset, result);
__blk_end_request_all(rq, result);
/*
* Called with q->queue_lock held and interrupts disabled, possibly on
* the way to schedule(). Do not sleep here!
*/
static void rbd_request_fn(struct request_queue *q)
{
struct rbd_device *rbd_dev = q->queuedata;
struct request *rq;
int queued = 0;
rbd_assert(rbd_dev);
while ((rq = blk_fetch_request(q))) {
/* Ignore any non-FS requests that filter through. */
if (rq->cmd_type != REQ_TYPE_FS) {
dout("%s: non-fs request type %d\n", __func__,
(int) rq->cmd_type);
__blk_end_request_all(rq, 0);
continue;
}
list_add_tail(&rq->queuelist, &rbd_dev->rq_queue);
queued++;
}
if (queued)
queue_work(rbd_dev->rq_wq, &rbd_dev->rq_work);
}
/*
......@@ -3517,24 +3541,37 @@ static int rbd_dev_refresh(struct rbd_device *rbd_dev)
u64 mapping_size;
int ret;
rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
down_write(&rbd_dev->header_rwsem);
mapping_size = rbd_dev->mapping.size;
if (rbd_dev->image_format == 1)
ret = rbd_dev_v1_header_info(rbd_dev);
else
ret = rbd_dev_v2_header_info(rbd_dev);
/* If it's a mapped snapshot, validate its EXISTS flag */
ret = rbd_dev_header_info(rbd_dev);
if (ret)
return ret;
/*
* If there is a parent, see if it has disappeared due to the
* mapped image getting flattened.
*/
if (rbd_dev->parent) {
ret = rbd_dev_v2_parent_info(rbd_dev);
if (ret)
return ret;
}
if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
if (rbd_dev->mapping.size != rbd_dev->header.image_size)
rbd_dev->mapping.size = rbd_dev->header.image_size;
} else {
/* validate mapped snapshot's EXISTS flag */
rbd_exists_validate(rbd_dev);
}
rbd_exists_validate(rbd_dev);
up_write(&rbd_dev->header_rwsem);
if (mapping_size != rbd_dev->mapping.size) {
if (mapping_size != rbd_dev->mapping.size)
rbd_dev_update_size(rbd_dev);
}
return ret;
return 0;
}
static int rbd_init_disk(struct rbd_device *rbd_dev)
......@@ -3696,46 +3733,36 @@ static ssize_t rbd_snap_show(struct device *dev,
}
/*
* For an rbd v2 image, shows the pool id, image id, and snapshot id
* for the parent image. If there is no parent, simply shows
* "(no parent image)".
* For a v2 image, shows the chain of parent images, separated by empty
* lines. For v1 images or if there is no parent, shows "(no parent
* image)".
*/
static ssize_t rbd_parent_show(struct device *dev,
struct device_attribute *attr,
char *buf)
struct device_attribute *attr,
char *buf)
{
struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
struct rbd_spec *spec = rbd_dev->parent_spec;
int count;
char *bufp = buf;
ssize_t count = 0;
if (!spec)
if (!rbd_dev->parent)
return sprintf(buf, "(no parent image)\n");
count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
(unsigned long long) spec->pool_id, spec->pool_name);
if (count < 0)
return count;
bufp += count;
count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
spec->image_name ? spec->image_name : "(unknown)");
if (count < 0)
return count;
bufp += count;
count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
(unsigned long long) spec->snap_id, spec->snap_name);
if (count < 0)
return count;
bufp += count;
count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
if (count < 0)
return count;
bufp += count;
for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
struct rbd_spec *spec = rbd_dev->parent_spec;
count += sprintf(&buf[count], "%s"
"pool_id %llu\npool_name %s\n"
"image_id %s\nimage_name %s\n"
"snap_id %llu\nsnap_name %s\n"
"overlap %llu\n",
!count ? "" : "\n", /* first? */
spec->pool_id, spec->pool_name,
spec->image_id, spec->image_name ?: "(unknown)",
spec->snap_id, spec->snap_name,
rbd_dev->parent_overlap);
}
return (ssize_t) (bufp - buf);
return count;
}
static ssize_t rbd_image_refresh(struct device *dev,
......@@ -3748,9 +3775,9 @@ static ssize_t rbd_image_refresh(struct device *dev,
ret = rbd_dev_refresh(rbd_dev);
if (ret)
rbd_warn(rbd_dev, ": manual header refresh error (%d)\n", ret);
return ret;
return ret < 0 ? ret : size;
return size;
}
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
......@@ -3822,6 +3849,9 @@ static struct rbd_spec *rbd_spec_alloc(void)
spec = kzalloc(sizeof (*spec), GFP_KERNEL);
if (!spec)
return NULL;
spec->pool_id = CEPH_NOPOOL;
spec->snap_id = CEPH_NOSNAP;
kref_init(&spec->kref);
return spec;
......@@ -3848,6 +3878,8 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
return NULL;
spin_lock_init(&rbd_dev->lock);
INIT_LIST_HEAD(&rbd_dev->rq_queue);
INIT_WORK(&rbd_dev->rq_work, rbd_request_workfn);
rbd_dev->flags = 0;
atomic_set(&rbd_dev->parent_ref, 0);
INIT_LIST_HEAD(&rbd_dev->node);
......@@ -4021,7 +4053,7 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
goto out_err;
}
snapid = cpu_to_le64(CEPH_NOSNAP);
snapid = cpu_to_le64(rbd_dev->spec->snap_id);
ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
"rbd", "get_parent",
&snapid, sizeof (snapid),
......@@ -4059,7 +4091,7 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
ret = -EIO;
if (pool_id > (u64)U32_MAX) {
rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
rbd_warn(NULL, "parent pool id too large (%llu > %u)",
(unsigned long long)pool_id, U32_MAX);
goto out_err;
}
......@@ -4083,6 +4115,8 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
parent_spec->snap_id = snap_id;
rbd_dev->parent_spec = parent_spec;
parent_spec = NULL; /* rbd_dev now owns this */
} else {
kfree(image_id);
}
/*
......@@ -4110,8 +4144,7 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
* overlap is zero we just pretend there was
* no parent image.
*/
rbd_warn(rbd_dev, "ignoring parent of "
"clone with overlap 0\n");
rbd_warn(rbd_dev, "ignoring parent with overlap 0");
}
}
out:
......@@ -4279,18 +4312,38 @@ static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
}
/*
* When an rbd image has a parent image, it is identified by the
* pool, image, and snapshot ids (not names). This function fills
* in the names for those ids. (It's OK if we can't figure out the
* name for an image id, but the pool and snapshot ids should always
* exist and have names.) All names in an rbd spec are dynamically
* allocated.
* An image being mapped will have everything but the snap id.
*/
static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
{
struct rbd_spec *spec = rbd_dev->spec;
rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
rbd_assert(spec->image_id && spec->image_name);
rbd_assert(spec->snap_name);
if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
u64 snap_id;
snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
if (snap_id == CEPH_NOSNAP)
return -ENOENT;
spec->snap_id = snap_id;
} else {
spec->snap_id = CEPH_NOSNAP;
}
return 0;
}
/*
* A parent image will have all ids but none of the names.
*
* When an image being mapped (not a parent) is probed, we have the
* pool name and pool id, image name and image id, and the snapshot
* name. The only thing we're missing is the snapshot id.
* All names in an rbd spec are dynamically allocated. It's OK if we
* can't figure out the name for an image id.
*/
static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
{
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
struct rbd_spec *spec = rbd_dev->spec;
......@@ -4299,24 +4352,9 @@ static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
const char *snap_name;
int ret;
/*
* An image being mapped will have the pool name (etc.), but
* we need to look up the snapshot id.
*/
if (spec->pool_name) {
if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
u64 snap_id;
snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
if (snap_id == CEPH_NOSNAP)
return -ENOENT;
spec->snap_id = snap_id;
} else {
spec->snap_id = CEPH_NOSNAP;
}
return 0;
}
rbd_assert(spec->pool_id != CEPH_NOPOOL);
rbd_assert(spec->image_id);
rbd_assert(spec->snap_id != CEPH_NOSNAP);
/* Get the pool name; we have to make our own copy of this */
......@@ -4335,7 +4373,7 @@ static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
if (!image_name)
rbd_warn(rbd_dev, "unable to get image name");
/* Look up the snapshot name, and make a copy */
/* Fetch the snapshot name */
snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
if (IS_ERR(snap_name)) {
......@@ -4348,10 +4386,10 @@ static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
spec->snap_name = snap_name;
return 0;
out_err:
kfree(image_name);
kfree(pool_name);
return ret;
}
......@@ -4483,43 +4521,22 @@ static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
return ret;
}
/*
* If the image supports layering, get the parent info. We
* need to probe the first time regardless. Thereafter we
* only need to if there's a parent, to see if it has
* disappeared due to the mapped image getting flattened.
*/
if (rbd_dev->header.features & RBD_FEATURE_LAYERING &&
(first_time || rbd_dev->parent_spec)) {
bool warn;
ret = rbd_dev_v2_parent_info(rbd_dev);
if (ret)
return ret;
/*
* Print a warning if this is the initial probe and
* the image has a parent. Don't print it if the
* image now being probed is itself a parent. We
* can tell at this point because we won't know its
* pool name yet (just its pool id).
*/
warn = rbd_dev->parent_spec && rbd_dev->spec->pool_name;
if (first_time && warn)
rbd_warn(rbd_dev, "WARNING: kernel layering "
"is EXPERIMENTAL!");
}
if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
if (rbd_dev->mapping.size != rbd_dev->header.image_size)
rbd_dev->mapping.size = rbd_dev->header.image_size;
ret = rbd_dev_v2_snap_context(rbd_dev);
dout("rbd_dev_v2_snap_context returned %d\n", ret);
return ret;
}
static int rbd_dev_header_info(struct rbd_device *rbd_dev)
{
rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
if (rbd_dev->image_format == 1)
return rbd_dev_v1_header_info(rbd_dev);
return rbd_dev_v2_header_info(rbd_dev);
}
static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
struct device *dev;
......@@ -5066,12 +5083,17 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
ret = rbd_dev_mapping_set(rbd_dev);
if (ret)
goto err_out_disk;
set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
rbd_dev->rq_wq = alloc_workqueue(rbd_dev->disk->disk_name, 0, 0);
if (!rbd_dev->rq_wq)
goto err_out_mapping;
ret = rbd_bus_add_dev(rbd_dev);
if (ret)
goto err_out_mapping;
goto err_out_workqueue;
/* Everything's ready. Announce the disk to the world. */
......@@ -5083,6 +5105,9 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
return ret;
err_out_workqueue:
destroy_workqueue(rbd_dev->rq_wq);
rbd_dev->rq_wq = NULL;
err_out_mapping:
rbd_dev_mapping_clear(rbd_dev);
err_out_disk:
......@@ -5155,8 +5180,6 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
ret = rbd_dev_image_id(rbd_dev);
if (ret)
return ret;
rbd_assert(rbd_dev->spec->image_id);
rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
ret = rbd_dev_header_name(rbd_dev);
if (ret)
......@@ -5168,25 +5191,45 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
goto out_header_name;
}
if (rbd_dev->image_format == 1)
ret = rbd_dev_v1_header_info(rbd_dev);
else
ret = rbd_dev_v2_header_info(rbd_dev);
ret = rbd_dev_header_info(rbd_dev);
if (ret)
goto err_out_watch;
ret = rbd_dev_spec_update(rbd_dev);
/*
* If this image is the one being mapped, we have pool name and
* id, image name and id, and snap name - need to fill snap id.
* Otherwise this is a parent image, identified by pool, image
* and snap ids - need to fill in names for those ids.
*/
if (mapping)
ret = rbd_spec_fill_snap_id(rbd_dev);
else
ret = rbd_spec_fill_names(rbd_dev);
if (ret)
goto err_out_probe;
if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
ret = rbd_dev_v2_parent_info(rbd_dev);
if (ret)
goto err_out_probe;
/*
* Need to warn users if this image is the one being
* mapped and has a parent.
*/
if (mapping && rbd_dev->parent_spec)
rbd_warn(rbd_dev,
"WARNING: kernel layering is EXPERIMENTAL!");
}
ret = rbd_dev_probe_parent(rbd_dev);
if (ret)
goto err_out_probe;
dout("discovered format %u image, header name is %s\n",
rbd_dev->image_format, rbd_dev->header_name);
return 0;
err_out_probe:
rbd_dev_unprobe(rbd_dev);
err_out_watch:
......@@ -5199,9 +5242,6 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
rbd_dev->image_format = 0;
kfree(rbd_dev->spec->image_id);
rbd_dev->spec->image_id = NULL;
dout("probe failed, returning %d\n", ret);
return ret;
}
......@@ -5243,7 +5283,7 @@ static ssize_t do_rbd_add(struct bus_type *bus,
/* The ceph file layout needs to fit pool id in 32 bits */
if (spec->pool_id > (u64)U32_MAX) {
rbd_warn(NULL, "pool id too large (%llu > %u)\n",
rbd_warn(NULL, "pool id too large (%llu > %u)",
(unsigned long long)spec->pool_id, U32_MAX);
rc = -EIO;
goto err_out_client;
......@@ -5314,6 +5354,7 @@ static void rbd_dev_device_release(struct device *dev)
{
struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
destroy_workqueue(rbd_dev->rq_wq);
rbd_free_disk(rbd_dev);
clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
rbd_dev_mapping_clear(rbd_dev);
......
......@@ -172,14 +172,24 @@ int ceph_set_acl(struct inode *inode, struct posix_acl *acl, int type)
int ceph_init_acl(struct dentry *dentry, struct inode *inode, struct inode *dir)
{
struct posix_acl *default_acl, *acl;
umode_t new_mode = inode->i_mode;
int error;
error = posix_acl_create(dir, &inode->i_mode, &default_acl, &acl);
error = posix_acl_create(dir, &new_mode, &default_acl, &acl);
if (error)
return error;
if (!default_acl && !acl)
if (!default_acl && !acl) {
cache_no_acl(inode);
if (new_mode != inode->i_mode) {
struct iattr newattrs = {
.ia_mode = new_mode,
.ia_valid = ATTR_MODE,
};
error = ceph_setattr(dentry, &newattrs);
}
return error;
}
if (default_acl) {
error = ceph_set_acl(inode, default_acl, ACL_TYPE_DEFAULT);
......
......@@ -3277,7 +3277,7 @@ int ceph_encode_inode_release(void **p, struct inode *inode,
rel->ino = cpu_to_le64(ceph_ino(inode));
rel->cap_id = cpu_to_le64(cap->cap_id);
rel->seq = cpu_to_le32(cap->seq);
rel->issue_seq = cpu_to_le32(cap->issue_seq),
rel->issue_seq = cpu_to_le32(cap->issue_seq);
rel->mseq = cpu_to_le32(cap->mseq);
rel->caps = cpu_to_le32(cap->implemented);
rel->wanted = cpu_to_le32(cap->mds_wanted);
......
......@@ -423,6 +423,9 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i,
dout("sync_read on file %p %llu~%u %s\n", file, off,
(unsigned)len,
(file->f_flags & O_DIRECT) ? "O_DIRECT" : "");
if (!len)
return 0;
/*
* flush any page cache pages in this range. this
* will make concurrent normal and sync io slow,
......@@ -470,8 +473,11 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i,
size_t left = ret;
while (left) {
int copy = min_t(size_t, PAGE_SIZE, left);
l = copy_page_to_iter(pages[k++], 0, copy, i);
size_t page_off = off & ~PAGE_MASK;
size_t copy = min_t(size_t,
PAGE_SIZE - page_off, left);
l = copy_page_to_iter(pages[k++], page_off,
copy, i);
off += l;
left -= l;
if (l < copy)
......@@ -531,7 +537,7 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe)
* objects, rollback on failure, etc.)
*/
static ssize_t
ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from)
ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
{
struct file *file = iocb->ki_filp;
struct inode *inode = file_inode(file);
......@@ -547,7 +553,6 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from)
int check_caps = 0;
int ret;
struct timespec mtime = CURRENT_TIME;
loff_t pos = iocb->ki_pos;
size_t count = iov_iter_count(from);
if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
......@@ -646,7 +651,8 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from)
* correct atomic write, we should e.g. take write locks on all
* objects, rollback on failure, etc.)
*/
static ssize_t ceph_sync_write(struct kiocb *iocb, struct iov_iter *from)
static ssize_t
ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
{
struct file *file = iocb->ki_filp;
struct inode *inode = file_inode(file);
......@@ -663,7 +669,6 @@ static ssize_t ceph_sync_write(struct kiocb *iocb, struct iov_iter *from)
int check_caps = 0;
int ret;
struct timespec mtime = CURRENT_TIME;
loff_t pos = iocb->ki_pos;
size_t count = iov_iter_count(from);
if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
......@@ -918,9 +923,9 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
/* we might need to revert back to that point */
data = *from;
if (file->f_flags & O_DIRECT)
written = ceph_sync_direct_write(iocb, &data);
written = ceph_sync_direct_write(iocb, &data, pos);
else
written = ceph_sync_write(iocb, &data);
written = ceph_sync_write(iocb, &data, pos);
if (written == -EOLDSNAPC) {
dout("aio_write %p %llx.%llx %llu~%u"
"got EOLDSNAPC, retrying\n",
......@@ -1177,6 +1182,9 @@ static long ceph_fallocate(struct file *file, int mode,
loff_t endoff = 0;
loff_t size;
if (mode & ~(FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE))
return -EOPNOTSUPP;
if (!S_ISREG(inode->i_mode))
return -EOPNOTSUPP;
......
......@@ -1904,6 +1904,7 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
if (req->r_got_unsafe) {
void *p;
/*
* Replay. Do not regenerate message (and rebuild
* paths, etc.); just use the original message.
......@@ -1924,8 +1925,13 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
/* remove cap/dentry releases from message */
rhead->num_releases = 0;
msg->hdr.front_len = cpu_to_le32(req->r_request_release_offset);
msg->front.iov_len = req->r_request_release_offset;
/* time stamp */
p = msg->front.iov_base + req->r_request_release_offset;
ceph_encode_copy(&p, &req->r_stamp, sizeof(req->r_stamp));
msg->front.iov_len = p - msg->front.iov_base;
msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
return 0;
}
......@@ -2061,11 +2067,12 @@ static void __wake_requests(struct ceph_mds_client *mdsc,
static void kick_requests(struct ceph_mds_client *mdsc, int mds)
{
struct ceph_mds_request *req;
struct rb_node *p;
struct rb_node *p = rb_first(&mdsc->request_tree);
dout("kick_requests mds%d\n", mds);
for (p = rb_first(&mdsc->request_tree); p; p = rb_next(p)) {
while (p) {
req = rb_entry(p, struct ceph_mds_request, r_node);
p = rb_next(p);
if (req->r_got_unsafe)
continue;
if (req->r_session &&
......@@ -2248,6 +2255,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
*/
if (result == -ESTALE) {
dout("got ESTALE on request %llu", req->r_tid);
req->r_resend_mds = -1;
if (req->r_direct_mode != USE_AUTH_MDS) {
dout("not using auth, setting for that now");
req->r_direct_mode = USE_AUTH_MDS;
......
......@@ -592,12 +592,12 @@ static int __build_xattrs(struct inode *inode)
xattr_version = ci->i_xattrs.version;
spin_unlock(&ci->i_ceph_lock);
xattrs = kcalloc(numattr, sizeof(struct ceph_xattr *),
xattrs = kcalloc(numattr, sizeof(struct ceph_inode_xattr *),
GFP_NOFS);
err = -ENOMEM;
if (!xattrs)
goto bad_lock;
memset(xattrs, 0, numattr*sizeof(struct ceph_xattr *));
for (i = 0; i < numattr; i++) {
xattrs[i] = kmalloc(sizeof(struct ceph_inode_xattr),
GFP_NOFS);
......
......@@ -285,19 +285,9 @@ extern void ceph_msg_data_add_bio(struct ceph_msg *msg, struct bio *bio,
extern struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
bool can_fail);
extern void ceph_msg_kfree(struct ceph_msg *m);
static inline struct ceph_msg *ceph_msg_get(struct ceph_msg *msg)
{
kref_get(&msg->kref);
return msg;
}
extern void ceph_msg_last_put(struct kref *kref);
static inline void ceph_msg_put(struct ceph_msg *msg)
{
kref_put(&msg->kref, ceph_msg_last_put);
}
extern struct ceph_msg *ceph_msg_get(struct ceph_msg *msg);
extern void ceph_msg_put(struct ceph_msg *msg);
extern void ceph_msg_dump(struct ceph_msg *msg);
......
......@@ -117,7 +117,7 @@ struct ceph_osd_request {
struct list_head r_req_lru_item;
struct list_head r_osd_item;
struct list_head r_linger_item;
struct list_head r_linger_osd;
struct list_head r_linger_osd_item;
struct ceph_osd *r_osd;
struct ceph_pg r_pgid;
int r_pg_osds[CEPH_PG_MAX_SIZE];
......@@ -325,22 +325,14 @@ extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *,
extern void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
struct ceph_osd_request *req);
extern void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req);
static inline void ceph_osdc_get_request(struct ceph_osd_request *req)
{
kref_get(&req->r_kref);
}
extern void ceph_osdc_release_request(struct kref *kref);
static inline void ceph_osdc_put_request(struct ceph_osd_request *req)
{
kref_put(&req->r_kref, ceph_osdc_release_request);
}
extern void ceph_osdc_get_request(struct ceph_osd_request *req);
extern void ceph_osdc_put_request(struct ceph_osd_request *req);
extern int ceph_osdc_start_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req,
bool nofail);
extern void ceph_osdc_cancel_request(struct ceph_osd_request *req);
extern int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req);
extern void ceph_osdc_sync(struct ceph_osd_client *osdc);
......
......@@ -174,6 +174,7 @@ static struct lock_class_key socket_class;
#define SKIP_BUF_SIZE 1024
static void queue_con(struct ceph_connection *con);
static void cancel_con(struct ceph_connection *con);
static void con_work(struct work_struct *);
static void con_fault(struct ceph_connection *con);
......@@ -680,7 +681,7 @@ void ceph_con_close(struct ceph_connection *con)
reset_connection(con);
con->peer_global_seq = 0;
cancel_delayed_work(&con->work);
cancel_con(con);
con_close_socket(con);
mutex_unlock(&con->mutex);
}
......@@ -900,7 +901,7 @@ static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data_cursor *cursor,
BUG_ON(page_count > (int)USHRT_MAX);
cursor->page_count = (unsigned short)page_count;
BUG_ON(length > SIZE_MAX - cursor->page_offset);
cursor->last_piece = (size_t)cursor->page_offset + length <= PAGE_SIZE;
cursor->last_piece = cursor->page_offset + cursor->resid <= PAGE_SIZE;
}
static struct page *
......@@ -2667,19 +2668,16 @@ static int queue_con_delay(struct ceph_connection *con, unsigned long delay)
{
if (!con->ops->get(con)) {
dout("%s %p ref count 0\n", __func__, con);
return -ENOENT;
}
if (!queue_delayed_work(ceph_msgr_wq, &con->work, delay)) {
dout("%s %p - already queued\n", __func__, con);
con->ops->put(con);
return -EBUSY;
}
dout("%s %p %lu\n", __func__, con, delay);
return 0;
}
......@@ -2688,6 +2686,14 @@ static void queue_con(struct ceph_connection *con)
(void) queue_con_delay(con, 0);
}
static void cancel_con(struct ceph_connection *con)
{
if (cancel_delayed_work(&con->work)) {
dout("%s %p\n", __func__, con);
con->ops->put(con);
}
}
static bool con_sock_closed(struct ceph_connection *con)
{
if (!con_flag_test_and_clear(con, CON_FLAG_SOCK_CLOSED))
......@@ -3269,24 +3275,21 @@ static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip)
/*
* Free a generically kmalloc'd message.
*/
void ceph_msg_kfree(struct ceph_msg *m)
static void ceph_msg_free(struct ceph_msg *m)
{
dout("msg_kfree %p\n", m);
dout("%s %p\n", __func__, m);
ceph_kvfree(m->front.iov_base);
kmem_cache_free(ceph_msg_cache, m);
}
/*
* Drop a msg ref. Destroy as needed.
*/
void ceph_msg_last_put(struct kref *kref)
static void ceph_msg_release(struct kref *kref)
{
struct ceph_msg *m = container_of(kref, struct ceph_msg, kref);
LIST_HEAD(data);
struct list_head *links;
struct list_head *next;
dout("ceph_msg_put last one on %p\n", m);
dout("%s %p\n", __func__, m);
WARN_ON(!list_empty(&m->list_head));
/* drop middle, data, if any */
......@@ -3308,9 +3311,25 @@ void ceph_msg_last_put(struct kref *kref)
if (m->pool)
ceph_msgpool_put(m->pool, m);
else
ceph_msg_kfree(m);
ceph_msg_free(m);
}
struct ceph_msg *ceph_msg_get(struct ceph_msg *msg)
{
dout("%s %p (was %d)\n", __func__, msg,
atomic_read(&msg->kref.refcount));
kref_get(&msg->kref);
return msg;
}
EXPORT_SYMBOL(ceph_msg_get);
void ceph_msg_put(struct ceph_msg *msg)
{
dout("%s %p (was %d)\n", __func__, msg,
atomic_read(&msg->kref.refcount));
kref_put(&msg->kref, ceph_msg_release);
}
EXPORT_SYMBOL(ceph_msg_last_put);
EXPORT_SYMBOL(ceph_msg_put);
void ceph_msg_dump(struct ceph_msg *msg)
{
......
......@@ -297,12 +297,21 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
/*
* requests
*/
void ceph_osdc_release_request(struct kref *kref)
static void ceph_osdc_release_request(struct kref *kref)
{
struct ceph_osd_request *req;
struct ceph_osd_request *req = container_of(kref,
struct ceph_osd_request, r_kref);
unsigned int which;
req = container_of(kref, struct ceph_osd_request, r_kref);
dout("%s %p (r_request %p r_reply %p)\n", __func__, req,
req->r_request, req->r_reply);
WARN_ON(!RB_EMPTY_NODE(&req->r_node));
WARN_ON(!list_empty(&req->r_req_lru_item));
WARN_ON(!list_empty(&req->r_osd_item));
WARN_ON(!list_empty(&req->r_linger_item));
WARN_ON(!list_empty(&req->r_linger_osd_item));
WARN_ON(req->r_osd);
if (req->r_request)
ceph_msg_put(req->r_request);
if (req->r_reply) {
......@@ -320,7 +329,22 @@ void ceph_osdc_release_request(struct kref *kref)
kmem_cache_free(ceph_osd_request_cache, req);
}
EXPORT_SYMBOL(ceph_osdc_release_request);
void ceph_osdc_get_request(struct ceph_osd_request *req)
{
dout("%s %p (was %d)\n", __func__, req,
atomic_read(&req->r_kref.refcount));
kref_get(&req->r_kref);
}
EXPORT_SYMBOL(ceph_osdc_get_request);
void ceph_osdc_put_request(struct ceph_osd_request *req)
{
dout("%s %p (was %d)\n", __func__, req,
atomic_read(&req->r_kref.refcount));
kref_put(&req->r_kref, ceph_osdc_release_request);
}
EXPORT_SYMBOL(ceph_osdc_put_request);
struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
struct ceph_snap_context *snapc,
......@@ -364,7 +388,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
RB_CLEAR_NODE(&req->r_node);
INIT_LIST_HEAD(&req->r_unsafe_item);
INIT_LIST_HEAD(&req->r_linger_item);
INIT_LIST_HEAD(&req->r_linger_osd);
INIT_LIST_HEAD(&req->r_linger_osd_item);
INIT_LIST_HEAD(&req->r_req_lru_item);
INIT_LIST_HEAD(&req->r_osd_item);
......@@ -916,7 +940,7 @@ static void __kick_osd_requests(struct ceph_osd_client *osdc,
* list at the end to keep things in tid order.
*/
list_for_each_entry_safe(req, nreq, &osd->o_linger_requests,
r_linger_osd) {
r_linger_osd_item) {
/*
* reregister request prior to unregistering linger so
* that r_osd is preserved.
......@@ -1008,6 +1032,8 @@ static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
dout("__remove_osd %p\n", osd);
BUG_ON(!list_empty(&osd->o_requests));
BUG_ON(!list_empty(&osd->o_linger_requests));
rb_erase(&osd->o_node, &osdc->osds);
list_del_init(&osd->o_osd_lru);
ceph_con_close(&osd->o_con);
......@@ -1029,12 +1055,23 @@ static void remove_all_osds(struct ceph_osd_client *osdc)
static void __move_osd_to_lru(struct ceph_osd_client *osdc,
struct ceph_osd *osd)
{
dout("__move_osd_to_lru %p\n", osd);
dout("%s %p\n", __func__, osd);
BUG_ON(!list_empty(&osd->o_osd_lru));
list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl * HZ;
}
static void maybe_move_osd_to_lru(struct ceph_osd_client *osdc,
struct ceph_osd *osd)
{
dout("%s %p\n", __func__, osd);
if (list_empty(&osd->o_requests) &&
list_empty(&osd->o_linger_requests))
__move_osd_to_lru(osdc, osd);
}
static void __remove_osd_from_lru(struct ceph_osd *osd)
{
dout("__remove_osd_from_lru %p\n", osd);
......@@ -1175,6 +1212,7 @@ static void __unregister_request(struct ceph_osd_client *osdc,
dout("__unregister_request %p tid %lld\n", req, req->r_tid);
rb_erase(&req->r_node, &osdc->requests);
RB_CLEAR_NODE(&req->r_node);
osdc->num_requests--;
if (req->r_osd) {
......@@ -1182,12 +1220,8 @@ static void __unregister_request(struct ceph_osd_client *osdc,
ceph_msg_revoke(req->r_request);
list_del_init(&req->r_osd_item);
if (list_empty(&req->r_osd->o_requests) &&
list_empty(&req->r_osd->o_linger_requests)) {
dout("moving osd to %p lru\n", req->r_osd);
__move_osd_to_lru(osdc, req->r_osd);
}
if (list_empty(&req->r_linger_item))
maybe_move_osd_to_lru(osdc, req->r_osd);
if (list_empty(&req->r_linger_osd_item))
req->r_osd = NULL;
}
......@@ -1214,45 +1248,39 @@ static void __cancel_request(struct ceph_osd_request *req)
static void __register_linger_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req)
{
dout("__register_linger_request %p\n", req);
dout("%s %p tid %llu\n", __func__, req, req->r_tid);
WARN_ON(!req->r_linger);
ceph_osdc_get_request(req);
list_add_tail(&req->r_linger_item, &osdc->req_linger);
if (req->r_osd)
list_add_tail(&req->r_linger_osd,
list_add_tail(&req->r_linger_osd_item,
&req->r_osd->o_linger_requests);
}
static void __unregister_linger_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req)
{
dout("__unregister_linger_request %p\n", req);
WARN_ON(!req->r_linger);
if (list_empty(&req->r_linger_item)) {
dout("%s %p tid %llu not registered\n", __func__, req,
req->r_tid);
return;
}
dout("%s %p tid %llu\n", __func__, req, req->r_tid);
list_del_init(&req->r_linger_item);
if (req->r_osd) {
list_del_init(&req->r_linger_osd);
if (list_empty(&req->r_osd->o_requests) &&
list_empty(&req->r_osd->o_linger_requests)) {
dout("moving osd to %p lru\n", req->r_osd);
__move_osd_to_lru(osdc, req->r_osd);
}
if (req->r_osd) {
list_del_init(&req->r_linger_osd_item);
maybe_move_osd_to_lru(osdc, req->r_osd);
if (list_empty(&req->r_osd_item))
req->r_osd = NULL;
}
ceph_osdc_put_request(req);
}
void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req)
{
mutex_lock(&osdc->request_mutex);
if (req->r_linger) {
req->r_linger = 0;
__unregister_linger_request(osdc, req);
}
mutex_unlock(&osdc->request_mutex);
}
EXPORT_SYMBOL(ceph_osdc_unregister_linger_request);
void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
struct ceph_osd_request *req)
{
......@@ -2429,6 +2457,25 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
}
EXPORT_SYMBOL(ceph_osdc_start_request);
/*
* Unregister a registered request. The request is not completed (i.e.
* no callbacks or wakeups) - higher layers are supposed to know what
* they are canceling.
*/
void ceph_osdc_cancel_request(struct ceph_osd_request *req)
{
struct ceph_osd_client *osdc = req->r_osdc;
mutex_lock(&osdc->request_mutex);
if (req->r_linger)
__unregister_linger_request(osdc, req);
__unregister_request(osdc, req);
mutex_unlock(&osdc->request_mutex);
dout("%s %p tid %llu canceled\n", __func__, req, req->r_tid);
}
EXPORT_SYMBOL(ceph_osdc_cancel_request);
/*
* wait for a request to complete
*/
......@@ -2437,18 +2484,18 @@ int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
{
int rc;
dout("%s %p tid %llu\n", __func__, req, req->r_tid);
rc = wait_for_completion_interruptible(&req->r_completion);
if (rc < 0) {
mutex_lock(&osdc->request_mutex);
__cancel_request(req);
__unregister_request(osdc, req);
mutex_unlock(&osdc->request_mutex);
dout("%s %p tid %llu interrupted\n", __func__, req, req->r_tid);
ceph_osdc_cancel_request(req);
complete_request(req);
dout("wait_request tid %llu canceled/timed out\n", req->r_tid);
return rc;
}
dout("wait_request tid %llu result %d\n", req->r_tid, req->r_result);
dout("%s %p tid %llu result %d\n", __func__, req, req->r_tid,
req->r_result);
return req->r_result;
}
EXPORT_SYMBOL(ceph_osdc_wait_request);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment