Commit 524d0c68 authored by Linus Torvalds's avatar Linus Torvalds

Merge tag 'ceph-for-6.1-rc1' of https://github.com/ceph/ceph-client

Pull ceph updates from Ilya Dryomov:
 "A quiet round this time: several assorted filesystem fixes, the most
  noteworthy one being some additional wakeups in cap handling code, and
  a messenger cleanup"

* tag 'ceph-for-6.1-rc1' of https://github.com/ceph/ceph-client:
  ceph: remove Sage's git tree from documentation
  ceph: fix incorrectly showing the .snap size for stat
  ceph: fail the open_by_handle_at() if the dentry is being unlinked
  ceph: increment i_version when doing a setattr with caps
  ceph: Use kcalloc for allocating multiple elements
  ceph: no need to wait for transition RDCACHE|RD -> RD
  ceph: fail the request if the peer MDS doesn't support getvxattr op
  ceph: wake up the waiters if any new caps comes
  libceph: drop last_piece flag from ceph_msg_data_cursor
parents 66b83455 71cf0c1c
...@@ -203,7 +203,6 @@ For more information on Ceph, see the home page at ...@@ -203,7 +203,6 @@ For more information on Ceph, see the home page at
The Linux kernel client source tree is available at The Linux kernel client source tree is available at
- https://github.com/ceph/ceph-client.git - https://github.com/ceph/ceph-client.git
- git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client.git
and the source for the full system is at and the source for the full system is at
https://github.com/ceph/ceph.git https://github.com/ceph/ceph.git
...@@ -754,6 +754,7 @@ void ceph_add_cap(struct inode *inode, ...@@ -754,6 +754,7 @@ void ceph_add_cap(struct inode *inode,
cap->issue_seq = seq; cap->issue_seq = seq;
cap->mseq = mseq; cap->mseq = mseq;
cap->cap_gen = gen; cap->cap_gen = gen;
wake_up_all(&ci->i_cap_wq);
} }
/* /*
...@@ -2285,7 +2286,7 @@ static int flush_mdlog_and_wait_inode_unsafe_requests(struct inode *inode) ...@@ -2285,7 +2286,7 @@ static int flush_mdlog_and_wait_inode_unsafe_requests(struct inode *inode)
struct ceph_mds_request *req; struct ceph_mds_request *req;
int i; int i;
sessions = kzalloc(max_sessions * sizeof(s), GFP_KERNEL); sessions = kcalloc(max_sessions, sizeof(s), GFP_KERNEL);
if (!sessions) { if (!sessions) {
err = -ENOMEM; err = -ENOMEM;
goto out; goto out;
...@@ -2759,13 +2760,17 @@ static int try_get_cap_refs(struct inode *inode, int need, int want, ...@@ -2759,13 +2760,17 @@ static int try_get_cap_refs(struct inode *inode, int need, int want,
* on transition from wanted -> needed caps. This is needed * on transition from wanted -> needed caps. This is needed
* for WRBUFFER|WR -> WR to avoid a new WR sync write from * for WRBUFFER|WR -> WR to avoid a new WR sync write from
* going before a prior buffered writeback happens. * going before a prior buffered writeback happens.
*
* For RDCACHE|RD -> RD, there is not need to wait and we can
* just exclude the revoking caps and force to sync read.
*/ */
int not = want & ~(have & need); int not = want & ~(have & need);
int revoking = implemented & ~have; int revoking = implemented & ~have;
int exclude = revoking & not;
dout("get_cap_refs %p have %s but not %s (revoking %s)\n", dout("get_cap_refs %p have %s but not %s (revoking %s)\n",
inode, ceph_cap_string(have), ceph_cap_string(not), inode, ceph_cap_string(have), ceph_cap_string(not),
ceph_cap_string(revoking)); ceph_cap_string(revoking));
if ((revoking & not) == 0) { if (!exclude || !(exclude & CEPH_CAP_FILE_BUFFER)) {
if (!snap_rwsem_locked && if (!snap_rwsem_locked &&
!ci->i_head_snapc && !ci->i_head_snapc &&
(need & CEPH_CAP_FILE_WR)) { (need & CEPH_CAP_FILE_WR)) {
...@@ -2787,7 +2792,7 @@ static int try_get_cap_refs(struct inode *inode, int need, int want, ...@@ -2787,7 +2792,7 @@ static int try_get_cap_refs(struct inode *inode, int need, int want,
snap_rwsem_locked = true; snap_rwsem_locked = true;
} }
if ((have & want) == want) if ((have & want) == want)
*got = need | want; *got = need | (want & ~exclude);
else else
*got = need; *got = need;
ceph_take_cap_refs(ci, *got, true); ceph_take_cap_refs(ci, *got, true);
...@@ -3550,6 +3555,9 @@ static void handle_cap_grant(struct inode *inode, ...@@ -3550,6 +3555,9 @@ static void handle_cap_grant(struct inode *inode,
check_caps = 1; /* check auth cap only */ check_caps = 1; /* check auth cap only */
else else
check_caps = 2; /* check all caps */ check_caps = 2; /* check all caps */
/* If there is new caps, try to wake up the waiters */
if (~cap->issued & newcaps)
wake = true;
cap->issued = newcaps; cap->issued = newcaps;
cap->implemented |= newcaps; cap->implemented |= newcaps;
} else if (cap->issued == newcaps) { } else if (cap->issued == newcaps) {
......
...@@ -181,6 +181,7 @@ struct inode *ceph_lookup_inode(struct super_block *sb, u64 ino) ...@@ -181,6 +181,7 @@ struct inode *ceph_lookup_inode(struct super_block *sb, u64 ino)
static struct dentry *__fh_to_dentry(struct super_block *sb, u64 ino) static struct dentry *__fh_to_dentry(struct super_block *sb, u64 ino)
{ {
struct inode *inode = __lookup_inode(sb, ino); struct inode *inode = __lookup_inode(sb, ino);
struct ceph_inode_info *ci = ceph_inode(inode);
int err; int err;
if (IS_ERR(inode)) if (IS_ERR(inode))
...@@ -192,7 +193,7 @@ static struct dentry *__fh_to_dentry(struct super_block *sb, u64 ino) ...@@ -192,7 +193,7 @@ static struct dentry *__fh_to_dentry(struct super_block *sb, u64 ino)
return ERR_PTR(err); return ERR_PTR(err);
} }
/* -ESTALE if inode as been unlinked and no file is open */ /* -ESTALE if inode as been unlinked and no file is open */
if ((inode->i_nlink == 0) && (atomic_read(&inode->i_count) == 1)) { if ((inode->i_nlink == 0) && !__ceph_is_file_opened(ci)) {
iput(inode); iput(inode);
return ERR_PTR(-ESTALE); return ERR_PTR(-ESTALE);
} }
......
...@@ -2192,6 +2192,7 @@ int __ceph_setattr(struct inode *inode, struct iattr *attr) ...@@ -2192,6 +2192,7 @@ int __ceph_setattr(struct inode *inode, struct iattr *attr)
inode_dirty_flags = __ceph_mark_dirty_caps(ci, dirtied, inode_dirty_flags = __ceph_mark_dirty_caps(ci, dirtied,
&prealloc_cf); &prealloc_cf);
inode->i_ctime = attr->ia_ctime; inode->i_ctime = attr->ia_ctime;
inode_inc_iversion_raw(inode);
} }
release &= issued; release &= issued;
...@@ -2356,6 +2357,7 @@ int ceph_do_getvxattr(struct inode *inode, const char *name, void *value, ...@@ -2356,6 +2357,7 @@ int ceph_do_getvxattr(struct inode *inode, const char *name, void *value,
goto out; goto out;
} }
req->r_feature_needed = CEPHFS_FEATURE_OP_GETVXATTR;
req->r_path2 = kstrdup(name, GFP_NOFS); req->r_path2 = kstrdup(name, GFP_NOFS);
if (!req->r_path2) { if (!req->r_path2) {
err = -ENOMEM; err = -ENOMEM;
...@@ -2447,6 +2449,7 @@ int ceph_getattr(struct user_namespace *mnt_userns, const struct path *path, ...@@ -2447,6 +2449,7 @@ int ceph_getattr(struct user_namespace *mnt_userns, const struct path *path,
struct kstat *stat, u32 request_mask, unsigned int flags) struct kstat *stat, u32 request_mask, unsigned int flags)
{ {
struct inode *inode = d_inode(path->dentry); struct inode *inode = d_inode(path->dentry);
struct super_block *sb = inode->i_sb;
struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_inode_info *ci = ceph_inode(inode);
u32 valid_mask = STATX_BASIC_STATS; u32 valid_mask = STATX_BASIC_STATS;
int err = 0; int err = 0;
...@@ -2476,16 +2479,34 @@ int ceph_getattr(struct user_namespace *mnt_userns, const struct path *path, ...@@ -2476,16 +2479,34 @@ int ceph_getattr(struct user_namespace *mnt_userns, const struct path *path,
} }
if (ceph_snap(inode) == CEPH_NOSNAP) if (ceph_snap(inode) == CEPH_NOSNAP)
stat->dev = inode->i_sb->s_dev; stat->dev = sb->s_dev;
else else
stat->dev = ci->i_snapid_map ? ci->i_snapid_map->dev : 0; stat->dev = ci->i_snapid_map ? ci->i_snapid_map->dev : 0;
if (S_ISDIR(inode->i_mode)) { if (S_ISDIR(inode->i_mode)) {
if (ceph_test_mount_opt(ceph_sb_to_client(inode->i_sb), if (ceph_test_mount_opt(ceph_sb_to_client(sb), RBYTES)) {
RBYTES))
stat->size = ci->i_rbytes; stat->size = ci->i_rbytes;
} else if (ceph_snap(inode) == CEPH_SNAPDIR) {
struct ceph_inode_info *pci;
struct ceph_snap_realm *realm;
struct inode *parent;
parent = ceph_lookup_inode(sb, ceph_ino(inode));
if (!parent)
return PTR_ERR(parent);
pci = ceph_inode(parent);
spin_lock(&pci->i_ceph_lock);
realm = pci->i_snap_realm;
if (realm)
stat->size = realm->num_snaps;
else else
stat->size = 0;
spin_unlock(&pci->i_ceph_lock);
iput(parent);
} else {
stat->size = ci->i_files + ci->i_subdirs; stat->size = ci->i_files + ci->i_subdirs;
}
stat->blocks = 0; stat->blocks = 0;
stat->blksize = 65536; stat->blksize = 65536;
/* /*
......
...@@ -2318,6 +2318,7 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode) ...@@ -2318,6 +2318,7 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
INIT_LIST_HEAD(&req->r_unsafe_dir_item); INIT_LIST_HEAD(&req->r_unsafe_dir_item);
INIT_LIST_HEAD(&req->r_unsafe_target_item); INIT_LIST_HEAD(&req->r_unsafe_target_item);
req->r_fmode = -1; req->r_fmode = -1;
req->r_feature_needed = -1;
kref_init(&req->r_kref); kref_init(&req->r_kref);
RB_CLEAR_NODE(&req->r_node); RB_CLEAR_NODE(&req->r_node);
INIT_LIST_HEAD(&req->r_wait); INIT_LIST_HEAD(&req->r_wait);
...@@ -2916,6 +2917,16 @@ static void __do_request(struct ceph_mds_client *mdsc, ...@@ -2916,6 +2917,16 @@ static void __do_request(struct ceph_mds_client *mdsc,
dout("do_request mds%d session %p state %s\n", mds, session, dout("do_request mds%d session %p state %s\n", mds, session,
ceph_session_state_name(session->s_state)); ceph_session_state_name(session->s_state));
/*
* The old ceph will crash the MDSs when see unknown OPs
*/
if (req->r_feature_needed > 0 &&
!test_bit(req->r_feature_needed, &session->s_features)) {
err = -EOPNOTSUPP;
goto out_session;
}
if (session->s_state != CEPH_MDS_SESSION_OPEN && if (session->s_state != CEPH_MDS_SESSION_OPEN &&
session->s_state != CEPH_MDS_SESSION_HUNG) { session->s_state != CEPH_MDS_SESSION_HUNG) {
/* /*
......
...@@ -31,8 +31,9 @@ enum ceph_feature_type { ...@@ -31,8 +31,9 @@ enum ceph_feature_type {
CEPHFS_FEATURE_METRIC_COLLECT, CEPHFS_FEATURE_METRIC_COLLECT,
CEPHFS_FEATURE_ALTERNATE_NAME, CEPHFS_FEATURE_ALTERNATE_NAME,
CEPHFS_FEATURE_NOTIFY_SESSION_STATE, CEPHFS_FEATURE_NOTIFY_SESSION_STATE,
CEPHFS_FEATURE_OP_GETVXATTR,
CEPHFS_FEATURE_MAX = CEPHFS_FEATURE_NOTIFY_SESSION_STATE, CEPHFS_FEATURE_MAX = CEPHFS_FEATURE_OP_GETVXATTR,
}; };
#define CEPHFS_FEATURES_CLIENT_SUPPORTED { \ #define CEPHFS_FEATURES_CLIENT_SUPPORTED { \
...@@ -44,6 +45,7 @@ enum ceph_feature_type { ...@@ -44,6 +45,7 @@ enum ceph_feature_type {
CEPHFS_FEATURE_DELEG_INO, \ CEPHFS_FEATURE_DELEG_INO, \
CEPHFS_FEATURE_METRIC_COLLECT, \ CEPHFS_FEATURE_METRIC_COLLECT, \
CEPHFS_FEATURE_NOTIFY_SESSION_STATE, \ CEPHFS_FEATURE_NOTIFY_SESSION_STATE, \
CEPHFS_FEATURE_OP_GETVXATTR, \
} }
/* /*
...@@ -336,6 +338,8 @@ struct ceph_mds_request { ...@@ -336,6 +338,8 @@ struct ceph_mds_request {
long long r_dir_ordered_cnt; long long r_dir_ordered_cnt;
int r_readdir_cache_idx; int r_readdir_cache_idx;
int r_feature_needed;
struct ceph_cap_reservation r_caps_reservation; struct ceph_cap_reservation r_caps_reservation;
}; };
......
...@@ -207,7 +207,6 @@ struct ceph_msg_data_cursor { ...@@ -207,7 +207,6 @@ struct ceph_msg_data_cursor {
struct ceph_msg_data *data; /* current data item */ struct ceph_msg_data *data; /* current data item */
size_t resid; /* bytes not yet consumed */ size_t resid; /* bytes not yet consumed */
bool last_piece; /* current is last piece */
bool need_crc; /* crc update needed */ bool need_crc; /* crc update needed */
union { union {
#ifdef CONFIG_BLOCK #ifdef CONFIG_BLOCK
...@@ -498,8 +497,7 @@ void ceph_con_discard_requeued(struct ceph_connection *con, u64 reconnect_seq); ...@@ -498,8 +497,7 @@ void ceph_con_discard_requeued(struct ceph_connection *con, u64 reconnect_seq);
void ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor, void ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor,
struct ceph_msg *msg, size_t length); struct ceph_msg *msg, size_t length);
struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor, struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
size_t *page_offset, size_t *length, size_t *page_offset, size_t *length);
bool *last_piece);
void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor, size_t bytes); void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor, size_t bytes);
u32 ceph_crc32c_page(u32 crc, struct page *page, unsigned int page_offset, u32 ceph_crc32c_page(u32 crc, struct page *page, unsigned int page_offset,
......
...@@ -728,7 +728,6 @@ static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data_cursor *cursor, ...@@ -728,7 +728,6 @@ static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data_cursor *cursor,
it->iter.bi_size = cursor->resid; it->iter.bi_size = cursor->resid;
BUG_ON(cursor->resid < bio_iter_len(it->bio, it->iter)); BUG_ON(cursor->resid < bio_iter_len(it->bio, it->iter));
cursor->last_piece = cursor->resid == bio_iter_len(it->bio, it->iter);
} }
static struct page *ceph_msg_data_bio_next(struct ceph_msg_data_cursor *cursor, static struct page *ceph_msg_data_bio_next(struct ceph_msg_data_cursor *cursor,
...@@ -754,10 +753,8 @@ static bool ceph_msg_data_bio_advance(struct ceph_msg_data_cursor *cursor, ...@@ -754,10 +753,8 @@ static bool ceph_msg_data_bio_advance(struct ceph_msg_data_cursor *cursor,
cursor->resid -= bytes; cursor->resid -= bytes;
bio_advance_iter(it->bio, &it->iter, bytes); bio_advance_iter(it->bio, &it->iter, bytes);
if (!cursor->resid) { if (!cursor->resid)
BUG_ON(!cursor->last_piece);
return false; /* no more data */ return false; /* no more data */
}
if (!bytes || (it->iter.bi_size && it->iter.bi_bvec_done && if (!bytes || (it->iter.bi_size && it->iter.bi_bvec_done &&
page == bio_iter_page(it->bio, it->iter))) page == bio_iter_page(it->bio, it->iter)))
...@@ -770,9 +767,7 @@ static bool ceph_msg_data_bio_advance(struct ceph_msg_data_cursor *cursor, ...@@ -770,9 +767,7 @@ static bool ceph_msg_data_bio_advance(struct ceph_msg_data_cursor *cursor,
it->iter.bi_size = cursor->resid; it->iter.bi_size = cursor->resid;
} }
BUG_ON(cursor->last_piece);
BUG_ON(cursor->resid < bio_iter_len(it->bio, it->iter)); BUG_ON(cursor->resid < bio_iter_len(it->bio, it->iter));
cursor->last_piece = cursor->resid == bio_iter_len(it->bio, it->iter);
return true; return true;
} }
#endif /* CONFIG_BLOCK */ #endif /* CONFIG_BLOCK */
...@@ -788,8 +783,6 @@ static void ceph_msg_data_bvecs_cursor_init(struct ceph_msg_data_cursor *cursor, ...@@ -788,8 +783,6 @@ static void ceph_msg_data_bvecs_cursor_init(struct ceph_msg_data_cursor *cursor,
cursor->bvec_iter.bi_size = cursor->resid; cursor->bvec_iter.bi_size = cursor->resid;
BUG_ON(cursor->resid < bvec_iter_len(bvecs, cursor->bvec_iter)); BUG_ON(cursor->resid < bvec_iter_len(bvecs, cursor->bvec_iter));
cursor->last_piece =
cursor->resid == bvec_iter_len(bvecs, cursor->bvec_iter);
} }
static struct page *ceph_msg_data_bvecs_next(struct ceph_msg_data_cursor *cursor, static struct page *ceph_msg_data_bvecs_next(struct ceph_msg_data_cursor *cursor,
...@@ -815,19 +808,14 @@ static bool ceph_msg_data_bvecs_advance(struct ceph_msg_data_cursor *cursor, ...@@ -815,19 +808,14 @@ static bool ceph_msg_data_bvecs_advance(struct ceph_msg_data_cursor *cursor,
cursor->resid -= bytes; cursor->resid -= bytes;
bvec_iter_advance(bvecs, &cursor->bvec_iter, bytes); bvec_iter_advance(bvecs, &cursor->bvec_iter, bytes);
if (!cursor->resid) { if (!cursor->resid)
BUG_ON(!cursor->last_piece);
return false; /* no more data */ return false; /* no more data */
}
if (!bytes || (cursor->bvec_iter.bi_bvec_done && if (!bytes || (cursor->bvec_iter.bi_bvec_done &&
page == bvec_iter_page(bvecs, cursor->bvec_iter))) page == bvec_iter_page(bvecs, cursor->bvec_iter)))
return false; /* more bytes to process in this segment */ return false; /* more bytes to process in this segment */
BUG_ON(cursor->last_piece);
BUG_ON(cursor->resid < bvec_iter_len(bvecs, cursor->bvec_iter)); BUG_ON(cursor->resid < bvec_iter_len(bvecs, cursor->bvec_iter));
cursor->last_piece =
cursor->resid == bvec_iter_len(bvecs, cursor->bvec_iter);
return true; return true;
} }
...@@ -853,7 +841,6 @@ static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data_cursor *cursor, ...@@ -853,7 +841,6 @@ static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data_cursor *cursor,
BUG_ON(page_count > (int)USHRT_MAX); BUG_ON(page_count > (int)USHRT_MAX);
cursor->page_count = (unsigned short)page_count; cursor->page_count = (unsigned short)page_count;
BUG_ON(length > SIZE_MAX - cursor->page_offset); BUG_ON(length > SIZE_MAX - cursor->page_offset);
cursor->last_piece = cursor->page_offset + cursor->resid <= PAGE_SIZE;
} }
static struct page * static struct page *
...@@ -868,11 +855,7 @@ ceph_msg_data_pages_next(struct ceph_msg_data_cursor *cursor, ...@@ -868,11 +855,7 @@ ceph_msg_data_pages_next(struct ceph_msg_data_cursor *cursor,
BUG_ON(cursor->page_offset >= PAGE_SIZE); BUG_ON(cursor->page_offset >= PAGE_SIZE);
*page_offset = cursor->page_offset; *page_offset = cursor->page_offset;
if (cursor->last_piece) *length = min_t(size_t, cursor->resid, PAGE_SIZE - *page_offset);
*length = cursor->resid;
else
*length = PAGE_SIZE - *page_offset;
return data->pages[cursor->page_index]; return data->pages[cursor->page_index];
} }
...@@ -897,8 +880,6 @@ static bool ceph_msg_data_pages_advance(struct ceph_msg_data_cursor *cursor, ...@@ -897,8 +880,6 @@ static bool ceph_msg_data_pages_advance(struct ceph_msg_data_cursor *cursor,
BUG_ON(cursor->page_index >= cursor->page_count); BUG_ON(cursor->page_index >= cursor->page_count);
cursor->page_index++; cursor->page_index++;
cursor->last_piece = cursor->resid <= PAGE_SIZE;
return true; return true;
} }
...@@ -928,7 +909,6 @@ ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data_cursor *cursor, ...@@ -928,7 +909,6 @@ ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data_cursor *cursor,
cursor->resid = min(length, pagelist->length); cursor->resid = min(length, pagelist->length);
cursor->page = page; cursor->page = page;
cursor->offset = 0; cursor->offset = 0;
cursor->last_piece = cursor->resid <= PAGE_SIZE;
} }
static struct page * static struct page *
...@@ -948,11 +928,7 @@ ceph_msg_data_pagelist_next(struct ceph_msg_data_cursor *cursor, ...@@ -948,11 +928,7 @@ ceph_msg_data_pagelist_next(struct ceph_msg_data_cursor *cursor,
/* offset of first page in pagelist is always 0 */ /* offset of first page in pagelist is always 0 */
*page_offset = cursor->offset & ~PAGE_MASK; *page_offset = cursor->offset & ~PAGE_MASK;
if (cursor->last_piece) *length = min_t(size_t, cursor->resid, PAGE_SIZE - *page_offset);
*length = cursor->resid;
else
*length = PAGE_SIZE - *page_offset;
return cursor->page; return cursor->page;
} }
...@@ -985,8 +961,6 @@ static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data_cursor *cursor, ...@@ -985,8 +961,6 @@ static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data_cursor *cursor,
BUG_ON(list_is_last(&cursor->page->lru, &pagelist->head)); BUG_ON(list_is_last(&cursor->page->lru, &pagelist->head));
cursor->page = list_next_entry(cursor->page, lru); cursor->page = list_next_entry(cursor->page, lru);
cursor->last_piece = cursor->resid <= PAGE_SIZE;
return true; return true;
} }
...@@ -1044,8 +1018,7 @@ void ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor, ...@@ -1044,8 +1018,7 @@ void ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor,
* Indicate whether this is the last piece in this data item. * Indicate whether this is the last piece in this data item.
*/ */
struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor, struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
size_t *page_offset, size_t *length, size_t *page_offset, size_t *length)
bool *last_piece)
{ {
struct page *page; struct page *page;
...@@ -1074,8 +1047,6 @@ struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor, ...@@ -1074,8 +1047,6 @@ struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
BUG_ON(*page_offset + *length > PAGE_SIZE); BUG_ON(*page_offset + *length > PAGE_SIZE);
BUG_ON(!*length); BUG_ON(!*length);
BUG_ON(*length > cursor->resid); BUG_ON(*length > cursor->resid);
if (last_piece)
*last_piece = cursor->last_piece;
return page; return page;
} }
...@@ -1112,7 +1083,6 @@ void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor, size_t bytes) ...@@ -1112,7 +1083,6 @@ void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor, size_t bytes)
cursor->total_resid -= bytes; cursor->total_resid -= bytes;
if (!cursor->resid && cursor->total_resid) { if (!cursor->resid && cursor->total_resid) {
WARN_ON(!cursor->last_piece);
cursor->data++; cursor->data++;
__ceph_msg_data_cursor_init(cursor); __ceph_msg_data_cursor_init(cursor);
new_piece = true; new_piece = true;
......
...@@ -495,7 +495,7 @@ static int write_partial_message_data(struct ceph_connection *con) ...@@ -495,7 +495,7 @@ static int write_partial_message_data(struct ceph_connection *con)
continue; continue;
} }
page = ceph_msg_data_next(cursor, &page_offset, &length, NULL); page = ceph_msg_data_next(cursor, &page_offset, &length);
if (length == cursor->total_resid) if (length == cursor->total_resid)
more = MSG_MORE; more = MSG_MORE;
ret = ceph_tcp_sendpage(con->sock, page, page_offset, length, ret = ceph_tcp_sendpage(con->sock, page, page_offset, length,
...@@ -1008,7 +1008,7 @@ static int read_partial_msg_data(struct ceph_connection *con) ...@@ -1008,7 +1008,7 @@ static int read_partial_msg_data(struct ceph_connection *con)
continue; continue;
} }
page = ceph_msg_data_next(cursor, &page_offset, &length, NULL); page = ceph_msg_data_next(cursor, &page_offset, &length);
ret = ceph_tcp_recvpage(con->sock, page, page_offset, length); ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
if (ret <= 0) { if (ret <= 0) {
if (do_datacrc) if (do_datacrc)
...@@ -1050,7 +1050,7 @@ static int read_partial_msg_data_bounce(struct ceph_connection *con) ...@@ -1050,7 +1050,7 @@ static int read_partial_msg_data_bounce(struct ceph_connection *con)
continue; continue;
} }
page = ceph_msg_data_next(cursor, &off, &len, NULL); page = ceph_msg_data_next(cursor, &off, &len);
ret = ceph_tcp_recvpage(con->sock, con->bounce_page, 0, len); ret = ceph_tcp_recvpage(con->sock, con->bounce_page, 0, len);
if (ret <= 0) { if (ret <= 0) {
con->in_data_crc = crc; con->in_data_crc = crc;
......
...@@ -862,7 +862,7 @@ static void get_bvec_at(struct ceph_msg_data_cursor *cursor, ...@@ -862,7 +862,7 @@ static void get_bvec_at(struct ceph_msg_data_cursor *cursor,
ceph_msg_data_advance(cursor, 0); ceph_msg_data_advance(cursor, 0);
/* get a piece of data, cursor isn't advanced */ /* get a piece of data, cursor isn't advanced */
page = ceph_msg_data_next(cursor, &off, &len, NULL); page = ceph_msg_data_next(cursor, &off, &len);
bv->bv_page = page; bv->bv_page = page;
bv->bv_offset = off; bv->bv_offset = off;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment