Commit 57666509 authored by Linus Torvalds's avatar Linus Torvalds

Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client

Pull ceph updates from Sage Weil:
 "The big item here is support for inline data for CephFS and for
  message signatures from Zheng.  There are also several bug fixes,
  including interrupted flock request handling, 0-length xattrs, mksnap,
  cached readdir results, and a message version compat field.  Finally
  there are several cleanups from Ilya, Dan, and Markus.

  Note that there is another series coming soon that fixes some bugs in
  the RBD 'lingering' requests, but it isn't quite ready yet"

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (27 commits)
  ceph: fix setting empty extended attribute
  ceph: fix mksnap crash
  ceph: do_sync is never initialized
  libceph: fixup includes in pagelist.h
  ceph: support inline data feature
  ceph: flush inline version
  ceph: convert inline data to normal data before data write
  ceph: sync read inline data
  ceph: fetch inline data when getting Fcr cap refs
  ceph: use getattr request to fetch inline data
  ceph: add inline data to pagecache
  ceph: parse inline data in MClientReply and MClientCaps
  libceph: specify position of extent operation
  libceph: add CREATE osd operation support
  libceph: add SETXATTR/CMPXATTR osd operations support
  rbd: don't treat CEPH_OSD_OP_DELETE as extent op
  ceph: remove unused stringification macros
  libceph: require cephx message signature by default
  ceph: introduce global empty snap context
  ceph: message versioning fixes
  ...
parents 87c31b39 0aeff37a
......@@ -2370,8 +2370,12 @@ static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request,
opcode = CEPH_OSD_OP_READ;
}
osd_req_op_extent_init(osd_request, num_ops, opcode, offset, length,
0, 0);
if (opcode == CEPH_OSD_OP_DELETE)
osd_req_op_init(osd_request, num_ops, opcode);
else
osd_req_op_extent_init(osd_request, num_ops, opcode,
offset, length, 0, 0);
if (obj_request->type == OBJ_REQUEST_BIO)
osd_req_op_extent_osd_data_bio(osd_request, num_ops,
obj_request->bio_list, length);
......@@ -3405,8 +3409,7 @@ static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq)
if (result)
rbd_warn(rbd_dev, "%s %llx at %llx result %d",
obj_op_name(op_type), length, offset, result);
if (snapc)
ceph_put_snap_context(snapc);
ceph_put_snap_context(snapc);
blk_end_request_all(rq, result);
}
......
......@@ -192,17 +192,30 @@ static int readpage_nounlock(struct file *filp, struct page *page)
struct ceph_osd_client *osdc =
&ceph_inode_to_client(inode)->client->osdc;
int err = 0;
u64 off = page_offset(page);
u64 len = PAGE_CACHE_SIZE;
err = ceph_readpage_from_fscache(inode, page);
if (off >= i_size_read(inode)) {
zero_user_segment(page, err, PAGE_CACHE_SIZE);
SetPageUptodate(page);
return 0;
}
/*
* Uptodate inline data should have been added into page cache
* while getting Fcr caps.
*/
if (ci->i_inline_version != CEPH_INLINE_NONE)
return -EINVAL;
err = ceph_readpage_from_fscache(inode, page);
if (err == 0)
goto out;
dout("readpage inode %p file %p page %p index %lu\n",
inode, filp, page, page->index);
err = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout,
(u64) page_offset(page), &len,
off, &len,
ci->i_truncate_seq, ci->i_truncate_size,
&page, 1, 0);
if (err == -ENOENT)
......@@ -319,7 +332,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
off, len);
vino = ceph_vino(inode);
req = ceph_osdc_new_request(osdc, &ci->i_layout, vino, off, &len,
1, CEPH_OSD_OP_READ,
0, 1, CEPH_OSD_OP_READ,
CEPH_OSD_FLAG_READ, NULL,
ci->i_truncate_seq, ci->i_truncate_size,
false);
......@@ -384,6 +397,9 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
int rc = 0;
int max = 0;
if (ceph_inode(inode)->i_inline_version != CEPH_INLINE_NONE)
return -EINVAL;
rc = ceph_readpages_from_fscache(mapping->host, mapping, page_list,
&nr_pages);
......@@ -673,7 +689,7 @@ static int ceph_writepages_start(struct address_space *mapping,
int rc = 0;
unsigned wsize = 1 << inode->i_blkbits;
struct ceph_osd_request *req = NULL;
int do_sync;
int do_sync = 0;
u64 truncate_size, snap_size;
u32 truncate_seq;
......@@ -750,7 +766,6 @@ static int ceph_writepages_start(struct address_space *mapping,
last_snapc = snapc;
while (!done && index <= end) {
int num_ops = do_sync ? 2 : 1;
unsigned i;
int first;
pgoff_t next;
......@@ -850,7 +865,8 @@ static int ceph_writepages_start(struct address_space *mapping,
len = wsize;
req = ceph_osdc_new_request(&fsc->client->osdc,
&ci->i_layout, vino,
offset, &len, num_ops,
offset, &len, 0,
do_sync ? 2 : 1,
CEPH_OSD_OP_WRITE,
CEPH_OSD_FLAG_WRITE |
CEPH_OSD_FLAG_ONDISK,
......@@ -862,6 +878,9 @@ static int ceph_writepages_start(struct address_space *mapping,
break;
}
if (do_sync)
osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC);
req->r_callback = writepages_finish;
req->r_inode = inode;
......@@ -1204,6 +1223,7 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf)
struct inode *inode = file_inode(vma->vm_file);
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_file_info *fi = vma->vm_file->private_data;
struct page *pinned_page = NULL;
loff_t off = vmf->pgoff << PAGE_CACHE_SHIFT;
int want, got, ret;
......@@ -1215,7 +1235,8 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf)
want = CEPH_CAP_FILE_CACHE;
while (1) {
got = 0;
ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, &got, -1);
ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want,
-1, &got, &pinned_page);
if (ret == 0)
break;
if (ret != -ERESTARTSYS) {
......@@ -1226,12 +1247,54 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf)
dout("filemap_fault %p %llu~%zd got cap refs on %s\n",
inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got));
ret = filemap_fault(vma, vmf);
if ((got & (CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO)) ||
ci->i_inline_version == CEPH_INLINE_NONE)
ret = filemap_fault(vma, vmf);
else
ret = -EAGAIN;
dout("filemap_fault %p %llu~%zd dropping cap refs on %s ret %d\n",
inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got), ret);
if (pinned_page)
page_cache_release(pinned_page);
ceph_put_cap_refs(ci, got);
if (ret != -EAGAIN)
return ret;
/* read inline data */
if (off >= PAGE_CACHE_SIZE) {
/* does not support inline data > PAGE_SIZE */
ret = VM_FAULT_SIGBUS;
} else {
int ret1;
struct address_space *mapping = inode->i_mapping;
struct page *page = find_or_create_page(mapping, 0,
mapping_gfp_mask(mapping) &
~__GFP_FS);
if (!page) {
ret = VM_FAULT_OOM;
goto out;
}
ret1 = __ceph_do_getattr(inode, page,
CEPH_STAT_CAP_INLINE_DATA, true);
if (ret1 < 0 || off >= i_size_read(inode)) {
unlock_page(page);
page_cache_release(page);
ret = VM_FAULT_SIGBUS;
goto out;
}
if (ret1 < PAGE_CACHE_SIZE)
zero_user_segment(page, ret1, PAGE_CACHE_SIZE);
else
flush_dcache_page(page);
SetPageUptodate(page);
vmf->page = page;
ret = VM_FAULT_MAJOR | VM_FAULT_LOCKED;
}
out:
dout("filemap_fault %p %llu~%zd read inline data ret %d\n",
inode, off, (size_t)PAGE_CACHE_SIZE, ret);
return ret;
}
......@@ -1250,6 +1313,19 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
size_t len;
int want, got, ret;
if (ci->i_inline_version != CEPH_INLINE_NONE) {
struct page *locked_page = NULL;
if (off == 0) {
lock_page(page);
locked_page = page;
}
ret = ceph_uninline_data(vma->vm_file, locked_page);
if (locked_page)
unlock_page(locked_page);
if (ret < 0)
return VM_FAULT_SIGBUS;
}
if (off + PAGE_CACHE_SIZE <= size)
len = PAGE_CACHE_SIZE;
else
......@@ -1263,7 +1339,8 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
want = CEPH_CAP_FILE_BUFFER;
while (1) {
got = 0;
ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, off + len);
ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, off + len,
&got, NULL);
if (ret == 0)
break;
if (ret != -ERESTARTSYS) {
......@@ -1297,11 +1374,13 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
ret = VM_FAULT_SIGBUS;
}
out:
if (ret != VM_FAULT_LOCKED) {
if (ret != VM_FAULT_LOCKED)
unlock_page(page);
} else {
if (ret == VM_FAULT_LOCKED ||
ci->i_inline_version != CEPH_INLINE_NONE) {
int dirty;
spin_lock(&ci->i_ceph_lock);
ci->i_inline_version = CEPH_INLINE_NONE;
dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
spin_unlock(&ci->i_ceph_lock);
if (dirty)
......@@ -1315,6 +1394,178 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
return ret;
}
void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
char *data, size_t len)
{
struct address_space *mapping = inode->i_mapping;
struct page *page;
if (locked_page) {
page = locked_page;
} else {
if (i_size_read(inode) == 0)
return;
page = find_or_create_page(mapping, 0,
mapping_gfp_mask(mapping) & ~__GFP_FS);
if (!page)
return;
if (PageUptodate(page)) {
unlock_page(page);
page_cache_release(page);
return;
}
}
dout("fill_inline_data %p %llx.%llx len %lu locked_page %p\n",
inode, ceph_vinop(inode), len, locked_page);
if (len > 0) {
void *kaddr = kmap_atomic(page);
memcpy(kaddr, data, len);
kunmap_atomic(kaddr);
}
if (page != locked_page) {
if (len < PAGE_CACHE_SIZE)
zero_user_segment(page, len, PAGE_CACHE_SIZE);
else
flush_dcache_page(page);
SetPageUptodate(page);
unlock_page(page);
page_cache_release(page);
}
}
int ceph_uninline_data(struct file *filp, struct page *locked_page)
{
struct inode *inode = file_inode(filp);
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
struct ceph_osd_request *req;
struct page *page = NULL;
u64 len, inline_version;
int err = 0;
bool from_pagecache = false;
spin_lock(&ci->i_ceph_lock);
inline_version = ci->i_inline_version;
spin_unlock(&ci->i_ceph_lock);
dout("uninline_data %p %llx.%llx inline_version %llu\n",
inode, ceph_vinop(inode), inline_version);
if (inline_version == 1 || /* initial version, no data */
inline_version == CEPH_INLINE_NONE)
goto out;
if (locked_page) {
page = locked_page;
WARN_ON(!PageUptodate(page));
} else if (ceph_caps_issued(ci) &
(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) {
page = find_get_page(inode->i_mapping, 0);
if (page) {
if (PageUptodate(page)) {
from_pagecache = true;
lock_page(page);
} else {
page_cache_release(page);
page = NULL;
}
}
}
if (page) {
len = i_size_read(inode);
if (len > PAGE_CACHE_SIZE)
len = PAGE_CACHE_SIZE;
} else {
page = __page_cache_alloc(GFP_NOFS);
if (!page) {
err = -ENOMEM;
goto out;
}
err = __ceph_do_getattr(inode, page,
CEPH_STAT_CAP_INLINE_DATA, true);
if (err < 0) {
/* no inline data */
if (err == -ENODATA)
err = 0;
goto out;
}
len = err;
}
req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
ceph_vino(inode), 0, &len, 0, 1,
CEPH_OSD_OP_CREATE,
CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
ci->i_snap_realm->cached_context,
0, 0, false);
if (IS_ERR(req)) {
err = PTR_ERR(req);
goto out;
}
ceph_osdc_build_request(req, 0, NULL, CEPH_NOSNAP, &inode->i_mtime);
err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
if (!err)
err = ceph_osdc_wait_request(&fsc->client->osdc, req);
ceph_osdc_put_request(req);
if (err < 0)
goto out;
req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
ceph_vino(inode), 0, &len, 1, 3,
CEPH_OSD_OP_WRITE,
CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
ci->i_snap_realm->cached_context,
ci->i_truncate_seq, ci->i_truncate_size,
false);
if (IS_ERR(req)) {
err = PTR_ERR(req);
goto out;
}
osd_req_op_extent_osd_data_pages(req, 1, &page, len, 0, false, false);
err = osd_req_op_xattr_init(req, 0, CEPH_OSD_OP_CMPXATTR,
"inline_version", &inline_version,
sizeof(inline_version),
CEPH_OSD_CMPXATTR_OP_GT,
CEPH_OSD_CMPXATTR_MODE_U64);
if (err)
goto out_put;
err = osd_req_op_xattr_init(req, 2, CEPH_OSD_OP_SETXATTR,
"inline_version", &inline_version,
sizeof(inline_version), 0, 0);
if (err)
goto out_put;
ceph_osdc_build_request(req, 0, NULL, CEPH_NOSNAP, &inode->i_mtime);
err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
if (!err)
err = ceph_osdc_wait_request(&fsc->client->osdc, req);
out_put:
ceph_osdc_put_request(req);
if (err == -ECANCELED)
err = 0;
out:
if (page && page != locked_page) {
if (from_pagecache) {
unlock_page(page);
page_cache_release(page);
} else
__free_pages(page, 0);
}
dout("uninline_data %p %llx.%llx inline_version %llu = %d\n",
inode, ceph_vinop(inode), inline_version, err);
return err;
}
static struct vm_operations_struct ceph_vmops = {
.fault = ceph_filemap_fault,
.page_mkwrite = ceph_page_mkwrite,
......
......@@ -975,10 +975,12 @@ static int send_cap_msg(struct ceph_mds_session *session,
kuid_t uid, kgid_t gid, umode_t mode,
u64 xattr_version,
struct ceph_buffer *xattrs_buf,
u64 follows)
u64 follows, bool inline_data)
{
struct ceph_mds_caps *fc;
struct ceph_msg *msg;
void *p;
size_t extra_len;
dout("send_cap_msg %s %llx %llx caps %s wanted %s dirty %s"
" seq %u/%u mseq %u follows %lld size %llu/%llu"
......@@ -988,7 +990,10 @@ static int send_cap_msg(struct ceph_mds_session *session,
seq, issue_seq, mseq, follows, size, max_size,
xattr_version, xattrs_buf ? (int)xattrs_buf->vec.iov_len : 0);
msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc), GFP_NOFS, false);
/* flock buffer size + inline version + inline data size */
extra_len = 4 + 8 + 4;
msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc) + extra_len,
GFP_NOFS, false);
if (!msg)
return -ENOMEM;
......@@ -1020,6 +1025,14 @@ static int send_cap_msg(struct ceph_mds_session *session,
fc->gid = cpu_to_le32(from_kgid(&init_user_ns, gid));
fc->mode = cpu_to_le32(mode);
p = fc + 1;
/* flock buffer size */
ceph_encode_32(&p, 0);
/* inline version */
ceph_encode_64(&p, inline_data ? 0 : CEPH_INLINE_NONE);
/* inline data size */
ceph_encode_32(&p, 0);
fc->xattr_version = cpu_to_le64(xattr_version);
if (xattrs_buf) {
msg->middle = ceph_buffer_get(xattrs_buf);
......@@ -1126,6 +1139,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
u64 flush_tid = 0;
int i;
int ret;
bool inline_data;
held = cap->issued | cap->implemented;
revoking = cap->implemented & ~cap->issued;
......@@ -1209,13 +1223,15 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
xattr_version = ci->i_xattrs.version;
}
inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
spin_unlock(&ci->i_ceph_lock);
ret = send_cap_msg(session, ceph_vino(inode).ino, cap_id,
op, keep, want, flushing, seq, flush_tid, issue_seq, mseq,
size, max_size, &mtime, &atime, time_warp_seq,
uid, gid, mode, xattr_version, xattr_blob,
follows);
follows, inline_data);
if (ret < 0) {
dout("error sending cap msg, must requeue %p\n", inode);
delayed = 1;
......@@ -1336,7 +1352,7 @@ void __ceph_flush_snaps(struct ceph_inode_info *ci,
capsnap->time_warp_seq,
capsnap->uid, capsnap->gid, capsnap->mode,
capsnap->xattr_version, capsnap->xattr_blob,
capsnap->follows);
capsnap->follows, capsnap->inline_data);
next_follows = capsnap->follows + 1;
ceph_put_cap_snap(capsnap);
......@@ -2057,15 +2073,17 @@ static void __take_cap_refs(struct ceph_inode_info *ci, int got)
* requested from the MDS.
*/
static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
int *got, loff_t endoff, int *check_max, int *err)
loff_t endoff, int *got, struct page **pinned_page,
int *check_max, int *err)
{
struct inode *inode = &ci->vfs_inode;
int ret = 0;
int have, implemented;
int have, implemented, _got = 0;
int file_wanted;
dout("get_cap_refs %p need %s want %s\n", inode,
ceph_cap_string(need), ceph_cap_string(want));
again:
spin_lock(&ci->i_ceph_lock);
/* make sure file is actually open */
......@@ -2075,7 +2093,7 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
ceph_cap_string(need), ceph_cap_string(file_wanted));
*err = -EBADF;
ret = 1;
goto out;
goto out_unlock;
}
/* finish pending truncate */
......@@ -2095,7 +2113,7 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
*check_max = 1;
ret = 1;
}
goto out;
goto out_unlock;
}
/*
* If a sync write is in progress, we must wait, so that we
......@@ -2103,7 +2121,7 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
*/
if (__ceph_have_pending_cap_snap(ci)) {
dout("get_cap_refs %p cap_snap_pending\n", inode);
goto out;
goto out_unlock;
}
}
......@@ -2120,18 +2138,50 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
inode, ceph_cap_string(have), ceph_cap_string(not),
ceph_cap_string(revoking));
if ((revoking & not) == 0) {
*got = need | (have & want);
__take_cap_refs(ci, *got);
_got = need | (have & want);
__take_cap_refs(ci, _got);
ret = 1;
}
} else {
dout("get_cap_refs %p have %s needed %s\n", inode,
ceph_cap_string(have), ceph_cap_string(need));
}
out:
out_unlock:
spin_unlock(&ci->i_ceph_lock);
if (ci->i_inline_version != CEPH_INLINE_NONE &&
(_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
i_size_read(inode) > 0) {
int ret1;
struct page *page = find_get_page(inode->i_mapping, 0);
if (page) {
if (PageUptodate(page)) {
*pinned_page = page;
goto out;
}
page_cache_release(page);
}
/*
* drop cap refs first because getattr while holding
* caps refs can cause deadlock.
*/
ceph_put_cap_refs(ci, _got);
_got = 0;
/* getattr request will bring inline data into page cache */
ret1 = __ceph_do_getattr(inode, NULL,
CEPH_STAT_CAP_INLINE_DATA, true);
if (ret1 >= 0) {
ret = 0;
goto again;
}
*err = ret1;
ret = 1;
}
out:
dout("get_cap_refs %p ret %d got %s\n", inode,
ret, ceph_cap_string(*got));
ret, ceph_cap_string(_got));
*got = _got;
return ret;
}
......@@ -2168,8 +2218,8 @@ static void check_max_size(struct inode *inode, loff_t endoff)
* due to a small max_size, make sure we check_max_size (and possibly
* ask the mds) so we don't get hung up indefinitely.
*/
int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, int *got,
loff_t endoff)
int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
loff_t endoff, int *got, struct page **pinned_page)
{
int check_max, ret, err;
......@@ -2179,8 +2229,8 @@ int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, int *got,
check_max = 0;
err = 0;
ret = wait_event_interruptible(ci->i_cap_wq,
try_get_cap_refs(ci, need, want,
got, endoff,
try_get_cap_refs(ci, need, want, endoff,
got, pinned_page,
&check_max, &err));
if (err)
ret = err;
......@@ -2383,6 +2433,8 @@ static void invalidate_aliases(struct inode *inode)
static void handle_cap_grant(struct ceph_mds_client *mdsc,
struct inode *inode, struct ceph_mds_caps *grant,
void *snaptrace, int snaptrace_len,
u64 inline_version,
void *inline_data, int inline_len,
struct ceph_buffer *xattr_buf,
struct ceph_mds_session *session,
struct ceph_cap *cap, int issued)
......@@ -2403,6 +2455,7 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
bool queue_invalidate = false;
bool queue_revalidate = false;
bool deleted_inode = false;
bool fill_inline = false;
dout("handle_cap_grant inode %p cap %p mds%d seq %d %s\n",
inode, cap, mds, seq, ceph_cap_string(newcaps));
......@@ -2576,6 +2629,13 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
}
BUG_ON(cap->issued & ~cap->implemented);
if (inline_version > 0 && inline_version >= ci->i_inline_version) {
ci->i_inline_version = inline_version;
if (ci->i_inline_version != CEPH_INLINE_NONE &&
(newcaps & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)))
fill_inline = true;
}
spin_unlock(&ci->i_ceph_lock);
if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) {
......@@ -2589,6 +2649,9 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
wake = true;
}
if (fill_inline)
ceph_fill_inline_data(inode, NULL, inline_data, inline_len);
if (queue_trunc) {
ceph_queue_vmtruncate(inode);
ceph_queue_revalidate(inode);
......@@ -2996,11 +3059,12 @@ void ceph_handle_caps(struct ceph_mds_session *session,
u64 cap_id;
u64 size, max_size;
u64 tid;
u64 inline_version = 0;
void *inline_data = NULL;
u32 inline_len = 0;
void *snaptrace;
size_t snaptrace_len;
void *flock;
void *end;
u32 flock_len;
void *p, *end;
dout("handle_caps from mds%d\n", mds);
......@@ -3021,30 +3085,37 @@ void ceph_handle_caps(struct ceph_mds_session *session,
snaptrace = h + 1;
snaptrace_len = le32_to_cpu(h->snap_trace_len);
p = snaptrace + snaptrace_len;
if (le16_to_cpu(msg->hdr.version) >= 2) {
void *p = snaptrace + snaptrace_len;
u32 flock_len;
ceph_decode_32_safe(&p, end, flock_len, bad);
if (p + flock_len > end)
goto bad;
flock = p;
} else {
flock = NULL;
flock_len = 0;
p += flock_len;
}
if (le16_to_cpu(msg->hdr.version) >= 3) {
if (op == CEPH_CAP_OP_IMPORT) {
void *p = flock + flock_len;
if (p + sizeof(*peer) > end)
goto bad;
peer = p;
p += sizeof(*peer);
} else if (op == CEPH_CAP_OP_EXPORT) {
/* recorded in unused fields */
peer = (void *)&h->size;
}
}
if (le16_to_cpu(msg->hdr.version) >= 4) {
ceph_decode_64_safe(&p, end, inline_version, bad);
ceph_decode_32_safe(&p, end, inline_len, bad);
if (p + inline_len > end)
goto bad;
inline_data = p;
p += inline_len;
}
/* lookup ino */
inode = ceph_find_inode(sb, vino);
ci = ceph_inode(inode);
......@@ -3085,6 +3156,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
handle_cap_import(mdsc, inode, h, peer, session,
&cap, &issued);
handle_cap_grant(mdsc, inode, h, snaptrace, snaptrace_len,
inline_version, inline_data, inline_len,
msg->middle, session, cap, issued);
goto done_unlocked;
}
......@@ -3105,8 +3177,9 @@ void ceph_handle_caps(struct ceph_mds_session *session,
case CEPH_CAP_OP_GRANT:
__ceph_caps_issued(ci, &issued);
issued |= __ceph_caps_dirty(ci);
handle_cap_grant(mdsc, inode, h, NULL, 0, msg->middle,
session, cap, issued);
handle_cap_grant(mdsc, inode, h, NULL, 0,
inline_version, inline_data, inline_len,
msg->middle, session, cap, issued);
goto done_unlocked;
case CEPH_CAP_OP_FLUSH_ACK:
......@@ -3137,8 +3210,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
done:
mutex_unlock(&session->s_mutex);
done_unlocked:
if (inode)
iput(inode);
iput(inode);
return;
bad:
......
......@@ -183,7 +183,7 @@ static int __dcache_readdir(struct file *file, struct dir_context *ctx,
spin_unlock(&parent->d_lock);
/* make sure a dentry wasn't dropped while we didn't have parent lock */
if (!ceph_dir_is_complete(dir)) {
if (!ceph_dir_is_complete_ordered(dir)) {
dout(" lost dir complete on %p; falling back to mds\n", dir);
dput(dentry);
err = -EAGAIN;
......@@ -261,10 +261,6 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
/* always start with . and .. */
if (ctx->pos == 0) {
/* note dir version at start of readdir so we can tell
* if any dentries get dropped */
fi->dir_release_count = atomic_read(&ci->i_release_count);
dout("readdir off 0 -> '.'\n");
if (!dir_emit(ctx, ".", 1,
ceph_translate_ino(inode->i_sb, inode->i_ino),
......@@ -289,7 +285,7 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
if ((ctx->pos == 2 || fi->dentry) &&
!ceph_test_mount_opt(fsc, NOASYNCREADDIR) &&
ceph_snap(inode) != CEPH_SNAPDIR &&
__ceph_dir_is_complete(ci) &&
__ceph_dir_is_complete_ordered(ci) &&
__ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) {
u32 shared_gen = ci->i_shared_gen;
spin_unlock(&ci->i_ceph_lock);
......@@ -312,6 +308,13 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
/* proceed with a normal readdir */
if (ctx->pos == 2) {
/* note dir version at start of readdir so we can tell
* if any dentries get dropped */
fi->dir_release_count = atomic_read(&ci->i_release_count);
fi->dir_ordered_count = ci->i_ordered_count;
}
more:
/* do we have the correct frag content buffered? */
if (fi->frag != frag || fi->last_readdir == NULL) {
......@@ -446,8 +449,12 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
*/
spin_lock(&ci->i_ceph_lock);
if (atomic_read(&ci->i_release_count) == fi->dir_release_count) {
dout(" marking %p complete\n", inode);
__ceph_dir_set_complete(ci, fi->dir_release_count);
if (ci->i_ordered_count == fi->dir_ordered_count)
dout(" marking %p complete and ordered\n", inode);
else
dout(" marking %p complete\n", inode);
__ceph_dir_set_complete(ci, fi->dir_release_count,
fi->dir_ordered_count);
}
spin_unlock(&ci->i_ceph_lock);
......@@ -805,7 +812,9 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode)
acls.pagelist = NULL;
}
err = ceph_mdsc_do_request(mdsc, dir, req);
if (!err && !req->r_reply_info.head->is_dentry)
if (!err &&
!req->r_reply_info.head->is_target &&
!req->r_reply_info.head->is_dentry)
err = ceph_handle_notrace_create(dir, dentry);
ceph_mdsc_put_request(req);
out:
......
......@@ -333,6 +333,11 @@ int ceph_release(struct inode *inode, struct file *file)
return 0;
}
enum {
CHECK_EOF = 1,
READ_INLINE = 2,
};
/*
* Read a range of bytes striped over one or more objects. Iterate over
* objects we stripe over. (That's not atomic, but good enough for now.)
......@@ -412,7 +417,7 @@ static int striped_read(struct inode *inode,
ret = read;
/* did we bounce off eof? */
if (pos + left > inode->i_size)
*checkeof = 1;
*checkeof = CHECK_EOF;
}
dout("striped_read returns %d\n", ret);
......@@ -598,7 +603,7 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
snapc = ci->i_snap_realm->cached_context;
vino = ceph_vino(inode);
req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
vino, pos, &len,
vino, pos, &len, 0,
2,/*include a 'startsync' command*/
CEPH_OSD_OP_WRITE, flags, snapc,
ci->i_truncate_seq,
......@@ -609,6 +614,8 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
break;
}
osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC);
n = iov_iter_get_pages_alloc(from, &pages, len, &start);
if (unlikely(n < 0)) {
ret = n;
......@@ -713,7 +720,7 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
snapc = ci->i_snap_realm->cached_context;
vino = ceph_vino(inode);
req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
vino, pos, &len, 1,
vino, pos, &len, 0, 1,
CEPH_OSD_OP_WRITE, flags, snapc,
ci->i_truncate_seq,
ci->i_truncate_size,
......@@ -803,9 +810,10 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
size_t len = iocb->ki_nbytes;
struct inode *inode = file_inode(filp);
struct ceph_inode_info *ci = ceph_inode(inode);
struct page *pinned_page = NULL;
ssize_t ret;
int want, got = 0;
int checkeof = 0, read = 0;
int retry_op = 0, read = 0;
again:
dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n",
......@@ -815,7 +823,7 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
else
want = CEPH_CAP_FILE_CACHE;
ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, &got, -1);
ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, -1, &got, &pinned_page);
if (ret < 0)
return ret;
......@@ -827,8 +835,12 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len,
ceph_cap_string(got));
/* hmm, this isn't really async... */
ret = ceph_sync_read(iocb, to, &checkeof);
if (ci->i_inline_version == CEPH_INLINE_NONE) {
/* hmm, this isn't really async... */
ret = ceph_sync_read(iocb, to, &retry_op);
} else {
retry_op = READ_INLINE;
}
} else {
dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n",
inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len,
......@@ -838,13 +850,55 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
}
dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n",
inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret);
if (pinned_page) {
page_cache_release(pinned_page);
pinned_page = NULL;
}
ceph_put_cap_refs(ci, got);
if (retry_op && ret >= 0) {
int statret;
struct page *page = NULL;
loff_t i_size;
if (retry_op == READ_INLINE) {
page = __page_cache_alloc(GFP_NOFS);
if (!page)
return -ENOMEM;
}
if (checkeof && ret >= 0) {
int statret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE, false);
statret = __ceph_do_getattr(inode, page,
CEPH_STAT_CAP_INLINE_DATA, !!page);
if (statret < 0) {
__free_page(page);
if (statret == -ENODATA) {
BUG_ON(retry_op != READ_INLINE);
goto again;
}
return statret;
}
i_size = i_size_read(inode);
if (retry_op == READ_INLINE) {
/* does not support inline data > PAGE_SIZE */
if (i_size > PAGE_CACHE_SIZE) {
ret = -EIO;
} else if (iocb->ki_pos < i_size) {
loff_t end = min_t(loff_t, i_size,
iocb->ki_pos + len);
if (statret < end)
zero_user_segment(page, statret, end);
ret = copy_page_to_iter(page,
iocb->ki_pos & ~PAGE_MASK,
end - iocb->ki_pos, to);
iocb->ki_pos += ret;
} else {
ret = 0;
}
__free_pages(page, 0);
return ret;
}
/* hit EOF or hole? */
if (statret == 0 && iocb->ki_pos < inode->i_size &&
if (retry_op == CHECK_EOF && iocb->ki_pos < i_size &&
ret < len) {
dout("sync_read hit hole, ppos %lld < size %lld"
", reading more\n", iocb->ki_pos,
......@@ -852,7 +906,7 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
read += ret;
len -= ret;
checkeof = 0;
retry_op = 0;
goto again;
}
}
......@@ -909,6 +963,12 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
if (err)
goto out;
if (ci->i_inline_version != CEPH_INLINE_NONE) {
err = ceph_uninline_data(file, NULL);
if (err < 0)
goto out;
}
retry_snap:
if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) {
err = -ENOSPC;
......@@ -922,7 +982,8 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
else
want = CEPH_CAP_FILE_BUFFER;
got = 0;
err = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, pos + count);
err = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, pos + count,
&got, NULL);
if (err < 0)
goto out;
......@@ -969,6 +1030,7 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
if (written >= 0) {
int dirty;
spin_lock(&ci->i_ceph_lock);
ci->i_inline_version = CEPH_INLINE_NONE;
dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
spin_unlock(&ci->i_ceph_lock);
if (dirty)
......@@ -1111,7 +1173,7 @@ static int ceph_zero_partial_object(struct inode *inode,
req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
ceph_vino(inode),
offset, length,
1, op,
0, 1, op,
CEPH_OSD_FLAG_WRITE |
CEPH_OSD_FLAG_ONDISK,
NULL, 0, 0, false);
......@@ -1214,6 +1276,12 @@ static long ceph_fallocate(struct file *file, int mode,
goto unlock;
}
if (ci->i_inline_version != CEPH_INLINE_NONE) {
ret = ceph_uninline_data(file, NULL);
if (ret < 0)
goto unlock;
}
size = i_size_read(inode);
if (!(mode & FALLOC_FL_KEEP_SIZE))
endoff = offset + length;
......@@ -1223,7 +1291,7 @@ static long ceph_fallocate(struct file *file, int mode,
else
want = CEPH_CAP_FILE_BUFFER;
ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, endoff);
ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, endoff, &got, NULL);
if (ret < 0)
goto unlock;
......@@ -1240,6 +1308,7 @@ static long ceph_fallocate(struct file *file, int mode,
if (!ret) {
spin_lock(&ci->i_ceph_lock);
ci->i_inline_version = CEPH_INLINE_NONE;
dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
spin_unlock(&ci->i_ceph_lock);
if (dirty)
......
......@@ -387,8 +387,10 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
spin_lock_init(&ci->i_ceph_lock);
ci->i_version = 0;
ci->i_inline_version = 0;
ci->i_time_warp_seq = 0;
ci->i_ceph_flags = 0;
ci->i_ordered_count = 0;
atomic_set(&ci->i_release_count, 1);
atomic_set(&ci->i_complete_count, 0);
ci->i_symlink = NULL;
......@@ -657,7 +659,7 @@ void ceph_fill_file_time(struct inode *inode, int issued,
* Populate an inode based on info from mds. May be called on new or
* existing inodes.
*/
static int fill_inode(struct inode *inode,
static int fill_inode(struct inode *inode, struct page *locked_page,
struct ceph_mds_reply_info_in *iinfo,
struct ceph_mds_reply_dirfrag *dirinfo,
struct ceph_mds_session *session,
......@@ -675,6 +677,7 @@ static int fill_inode(struct inode *inode,
bool wake = false;
bool queue_trunc = false;
bool new_version = false;
bool fill_inline = false;
dout("fill_inode %p ino %llx.%llx v %llu had %llu\n",
inode, ceph_vinop(inode), le64_to_cpu(info->version),
......@@ -845,7 +848,8 @@ static int fill_inode(struct inode *inode,
(issued & CEPH_CAP_FILE_EXCL) == 0 &&
!__ceph_dir_is_complete(ci)) {
dout(" marking %p complete (empty)\n", inode);
__ceph_dir_set_complete(ci, atomic_read(&ci->i_release_count));
__ceph_dir_set_complete(ci, atomic_read(&ci->i_release_count),
ci->i_ordered_count);
}
/* were we issued a capability? */
......@@ -873,8 +877,23 @@ static int fill_inode(struct inode *inode,
ceph_vinop(inode));
__ceph_get_fmode(ci, cap_fmode);
}
if (iinfo->inline_version > 0 &&
iinfo->inline_version >= ci->i_inline_version) {
int cache_caps = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
ci->i_inline_version = iinfo->inline_version;
if (ci->i_inline_version != CEPH_INLINE_NONE &&
(locked_page ||
(le32_to_cpu(info->cap.caps) & cache_caps)))
fill_inline = true;
}
spin_unlock(&ci->i_ceph_lock);
if (fill_inline)
ceph_fill_inline_data(inode, locked_page,
iinfo->inline_data, iinfo->inline_len);
if (wake)
wake_up_all(&ci->i_cap_wq);
......@@ -1062,7 +1081,8 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
struct inode *dir = req->r_locked_dir;
if (dir) {
err = fill_inode(dir, &rinfo->diri, rinfo->dirfrag,
err = fill_inode(dir, NULL,
&rinfo->diri, rinfo->dirfrag,
session, req->r_request_started, -1,
&req->r_caps_reservation);
if (err < 0)
......@@ -1132,7 +1152,7 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
}
req->r_target_inode = in;
err = fill_inode(in, &rinfo->targeti, NULL,
err = fill_inode(in, req->r_locked_page, &rinfo->targeti, NULL,
session, req->r_request_started,
(!req->r_aborted && rinfo->head->result == 0) ?
req->r_fmode : -1,
......@@ -1204,8 +1224,8 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
ceph_invalidate_dentry_lease(dn);
/* d_move screws up sibling dentries' offsets */
ceph_dir_clear_complete(dir);
ceph_dir_clear_complete(olddir);
ceph_dir_clear_ordered(dir);
ceph_dir_clear_ordered(olddir);
dout("dn %p gets new offset %lld\n", req->r_old_dentry,
ceph_dentry(req->r_old_dentry)->offset);
......@@ -1217,6 +1237,7 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
if (!rinfo->head->is_target) {
dout("fill_trace null dentry\n");
if (dn->d_inode) {
ceph_dir_clear_ordered(dir);
dout("d_delete %p\n", dn);
d_delete(dn);
} else {
......@@ -1233,7 +1254,7 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
/* attach proper inode */
if (!dn->d_inode) {
ceph_dir_clear_complete(dir);
ceph_dir_clear_ordered(dir);
ihold(in);
dn = splice_dentry(dn, in, &have_lease);
if (IS_ERR(dn)) {
......@@ -1263,7 +1284,7 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
BUG_ON(!dir);
BUG_ON(ceph_snap(dir) != CEPH_SNAPDIR);
dout(" linking snapped dir %p to dn %p\n", in, dn);
ceph_dir_clear_complete(dir);
ceph_dir_clear_ordered(dir);
ihold(in);
dn = splice_dentry(dn, in, NULL);
if (IS_ERR(dn)) {
......@@ -1300,7 +1321,7 @@ static int readdir_prepopulate_inodes_only(struct ceph_mds_request *req,
dout("new_inode badness got %d\n", err);
continue;
}
rc = fill_inode(in, &rinfo->dir_in[i], NULL, session,
rc = fill_inode(in, NULL, &rinfo->dir_in[i], NULL, session,
req->r_request_started, -1,
&req->r_caps_reservation);
if (rc < 0) {
......@@ -1416,7 +1437,7 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
}
}
if (fill_inode(in, &rinfo->dir_in[i], NULL, session,
if (fill_inode(in, NULL, &rinfo->dir_in[i], NULL, session,
req->r_request_started, -1,
&req->r_caps_reservation) < 0) {
pr_err("fill_inode badness on %p\n", in);
......@@ -1899,7 +1920,8 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
* Verify that we have a lease on the given mask. If not,
* do a getattr against an mds.
*/
int ceph_do_getattr(struct inode *inode, int mask, bool force)
int __ceph_do_getattr(struct inode *inode, struct page *locked_page,
int mask, bool force)
{
struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb);
struct ceph_mds_client *mdsc = fsc->mdsc;
......@@ -1911,7 +1933,8 @@ int ceph_do_getattr(struct inode *inode, int mask, bool force)
return 0;
}
dout("do_getattr inode %p mask %s mode 0%o\n", inode, ceph_cap_string(mask), inode->i_mode);
dout("do_getattr inode %p mask %s mode 0%o\n",
inode, ceph_cap_string(mask), inode->i_mode);
if (!force && ceph_caps_issued_mask(ceph_inode(inode), mask, 1))
return 0;
......@@ -1922,7 +1945,19 @@ int ceph_do_getattr(struct inode *inode, int mask, bool force)
ihold(inode);
req->r_num_caps = 1;
req->r_args.getattr.mask = cpu_to_le32(mask);
req->r_locked_page = locked_page;
err = ceph_mdsc_do_request(mdsc, NULL, req);
if (locked_page && err == 0) {
u64 inline_version = req->r_reply_info.targeti.inline_version;
if (inline_version == 0) {
/* the reply is supposed to contain inline data */
err = -EINVAL;
} else if (inline_version == CEPH_INLINE_NONE) {
err = -ENODATA;
} else {
err = req->r_reply_info.targeti.inline_len;
}
}
ceph_mdsc_put_request(req);
dout("do_getattr result=%d\n", err);
return err;
......
......@@ -9,6 +9,8 @@
#include <linux/ceph/pagelist.h>
static u64 lock_secret;
static int ceph_lock_wait_for_completion(struct ceph_mds_client *mdsc,
struct ceph_mds_request *req);
static inline u64 secure_addr(void *addr)
{
......@@ -40,6 +42,9 @@ static int ceph_lock_message(u8 lock_type, u16 operation, struct file *file,
u64 length = 0;
u64 owner;
if (operation != CEPH_MDS_OP_SETFILELOCK || cmd == CEPH_LOCK_UNLOCK)
wait = 0;
req = ceph_mdsc_create_request(mdsc, operation, USE_AUTH_MDS);
if (IS_ERR(req))
return PTR_ERR(req);
......@@ -68,6 +73,9 @@ static int ceph_lock_message(u8 lock_type, u16 operation, struct file *file,
req->r_args.filelock_change.length = cpu_to_le64(length);
req->r_args.filelock_change.wait = wait;
if (wait)
req->r_wait_for_completion = ceph_lock_wait_for_completion;
err = ceph_mdsc_do_request(mdsc, inode, req);
if (operation == CEPH_MDS_OP_GETFILELOCK) {
......@@ -96,6 +104,52 @@ static int ceph_lock_message(u8 lock_type, u16 operation, struct file *file,
return err;
}
static int ceph_lock_wait_for_completion(struct ceph_mds_client *mdsc,
struct ceph_mds_request *req)
{
struct ceph_mds_request *intr_req;
struct inode *inode = req->r_inode;
int err, lock_type;
BUG_ON(req->r_op != CEPH_MDS_OP_SETFILELOCK);
if (req->r_args.filelock_change.rule == CEPH_LOCK_FCNTL)
lock_type = CEPH_LOCK_FCNTL_INTR;
else if (req->r_args.filelock_change.rule == CEPH_LOCK_FLOCK)
lock_type = CEPH_LOCK_FLOCK_INTR;
else
BUG_ON(1);
BUG_ON(req->r_args.filelock_change.type == CEPH_LOCK_UNLOCK);
err = wait_for_completion_interruptible(&req->r_completion);
if (!err)
return 0;
dout("ceph_lock_wait_for_completion: request %llu was interrupted\n",
req->r_tid);
intr_req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SETFILELOCK,
USE_AUTH_MDS);
if (IS_ERR(intr_req))
return PTR_ERR(intr_req);
intr_req->r_inode = inode;
ihold(inode);
intr_req->r_num_caps = 1;
intr_req->r_args.filelock_change = req->r_args.filelock_change;
intr_req->r_args.filelock_change.rule = lock_type;
intr_req->r_args.filelock_change.type = CEPH_LOCK_UNLOCK;
err = ceph_mdsc_do_request(mdsc, inode, intr_req);
ceph_mdsc_put_request(intr_req);
if (err && err != -ERESTARTSYS)
return err;
wait_for_completion(&req->r_completion);
return 0;
}
/**
* Attempt to set an fcntl lock.
* For now, this just goes away to the server. Later it may be more awesome.
......@@ -143,11 +197,6 @@ int ceph_lock(struct file *file, int cmd, struct file_lock *fl)
err);
}
}
} else if (err == -ERESTARTSYS) {
dout("undoing lock\n");
ceph_lock_message(CEPH_LOCK_FCNTL, op, file,
CEPH_LOCK_UNLOCK, 0, fl);
}
return err;
}
......@@ -186,11 +235,6 @@ int ceph_flock(struct file *file, int cmd, struct file_lock *fl)
file, CEPH_LOCK_UNLOCK, 0, fl);
dout("got %d on flock_lock_file_wait, undid lock", err);
}
} else if (err == -ERESTARTSYS) {
dout("undoing lock\n");
ceph_lock_message(CEPH_LOCK_FLOCK,
CEPH_MDS_OP_SETFILELOCK,
file, CEPH_LOCK_UNLOCK, 0, fl);
}
return err;
}
......
......@@ -89,6 +89,16 @@ static int parse_reply_info_in(void **p, void *end,
ceph_decode_need(p, end, info->xattr_len, bad);
info->xattr_data = *p;
*p += info->xattr_len;
if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
ceph_decode_64_safe(p, end, info->inline_version, bad);
ceph_decode_32_safe(p, end, info->inline_len, bad);
ceph_decode_need(p, end, info->inline_len, bad);
info->inline_data = *p;
*p += info->inline_len;
} else
info->inline_version = CEPH_INLINE_NONE;
return 0;
bad:
return err;
......@@ -524,8 +534,7 @@ void ceph_mdsc_release_request(struct kref *kref)
}
if (req->r_locked_dir)
ceph_put_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
if (req->r_target_inode)
iput(req->r_target_inode);
iput(req->r_target_inode);
if (req->r_dentry)
dput(req->r_dentry);
if (req->r_old_dentry)
......@@ -861,8 +870,11 @@ static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u6
/*
* Serialize client metadata into waiting buffer space, using
* the format that userspace expects for map<string, string>
*
* ClientSession messages with metadata are v2
*/
msg->hdr.version = 2; /* ClientSession messages with metadata are v2 */
msg->hdr.version = cpu_to_le16(2);
msg->hdr.compat_version = cpu_to_le16(1);
/* The write pointer, following the session_head structure */
p = msg->front.iov_base + sizeof(*h);
......@@ -1066,8 +1078,7 @@ static int iterate_session_caps(struct ceph_mds_session *session,
session->s_cap_iterator = NULL;
spin_unlock(&session->s_cap_lock);
if (last_inode)
iput(last_inode);
iput(last_inode);
if (old_cap)
ceph_put_cap(session->s_mdsc, old_cap);
......@@ -1874,7 +1885,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
goto out_free2;
}
msg->hdr.version = 2;
msg->hdr.version = cpu_to_le16(2);
msg->hdr.tid = cpu_to_le64(req->r_tid);
head = msg->front.iov_base;
......@@ -2208,6 +2219,8 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
&req->r_completion, req->r_timeout);
if (err == 0)
err = -EIO;
} else if (req->r_wait_for_completion) {
err = req->r_wait_for_completion(mdsc, req);
} else {
err = wait_for_completion_killable(&req->r_completion);
}
......@@ -3744,6 +3757,20 @@ static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
return msg;
}
static int sign_message(struct ceph_connection *con, struct ceph_msg *msg)
{
struct ceph_mds_session *s = con->private;
struct ceph_auth_handshake *auth = &s->s_auth;
return ceph_auth_sign_message(auth, msg);
}
static int check_message_signature(struct ceph_connection *con, struct ceph_msg *msg)
{
struct ceph_mds_session *s = con->private;
struct ceph_auth_handshake *auth = &s->s_auth;
return ceph_auth_check_message_signature(auth, msg);
}
static const struct ceph_connection_operations mds_con_ops = {
.get = con_get,
.put = con_put,
......@@ -3753,6 +3780,8 @@ static const struct ceph_connection_operations mds_con_ops = {
.invalidate_authorizer = invalidate_authorizer,
.peer_reset = peer_reset,
.alloc_msg = mds_alloc_msg,
.sign_message = sign_message,
.check_message_signature = check_message_signature,
};
/* eof */
......@@ -41,6 +41,9 @@ struct ceph_mds_reply_info_in {
char *symlink;
u32 xattr_len;
char *xattr_data;
u64 inline_version;
u32 inline_len;
char *inline_data;
};
/*
......@@ -166,6 +169,11 @@ struct ceph_mds_client;
*/
typedef void (*ceph_mds_request_callback_t) (struct ceph_mds_client *mdsc,
struct ceph_mds_request *req);
/*
* wait for request completion callback
*/
typedef int (*ceph_mds_request_wait_callback_t) (struct ceph_mds_client *mdsc,
struct ceph_mds_request *req);
/*
* an in-flight mds request
......@@ -215,6 +223,7 @@ struct ceph_mds_request {
int r_request_release_offset;
struct ceph_msg *r_reply;
struct ceph_mds_reply_info_parsed r_reply_info;
struct page *r_locked_page;
int r_err;
bool r_aborted;
......@@ -239,6 +248,7 @@ struct ceph_mds_request {
struct completion r_completion;
struct completion r_safe_completion;
ceph_mds_request_callback_t r_callback;
ceph_mds_request_wait_callback_t r_wait_for_completion;
struct list_head r_unsafe_item; /* per-session unsafe list item */
bool r_got_unsafe, r_got_safe, r_got_result;
......
......@@ -288,6 +288,9 @@ static int cmpu64_rev(const void *a, const void *b)
return 0;
}
static struct ceph_snap_context *empty_snapc;
/*
* build the snap context for a given realm.
*/
......@@ -328,6 +331,12 @@ static int build_snap_context(struct ceph_snap_realm *realm)
return 0;
}
if (num == 0 && realm->seq == empty_snapc->seq) {
ceph_get_snap_context(empty_snapc);
snapc = empty_snapc;
goto done;
}
/* alloc new snap context */
err = -ENOMEM;
if (num > (SIZE_MAX - sizeof(*snapc)) / sizeof(u64))
......@@ -365,8 +374,8 @@ static int build_snap_context(struct ceph_snap_realm *realm)
realm->ino, realm, snapc, snapc->seq,
(unsigned int) snapc->num_snaps);
if (realm->cached_context)
ceph_put_snap_context(realm->cached_context);
done:
ceph_put_snap_context(realm->cached_context);
realm->cached_context = snapc;
return 0;
......@@ -466,6 +475,9 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
cap_snap. lucky us. */
dout("queue_cap_snap %p already pending\n", inode);
kfree(capsnap);
} else if (ci->i_snap_realm->cached_context == empty_snapc) {
dout("queue_cap_snap %p empty snapc\n", inode);
kfree(capsnap);
} else if (dirty & (CEPH_CAP_AUTH_EXCL|CEPH_CAP_XATTR_EXCL|
CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)) {
struct ceph_snap_context *snapc = ci->i_head_snapc;
......@@ -504,6 +516,8 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
capsnap->xattr_version = 0;
}
capsnap->inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
/* dirty page count moved from _head to this cap_snap;
all subsequent writes page dirties occur _after_ this
snapshot. */
......@@ -590,15 +604,13 @@ static void queue_realm_cap_snaps(struct ceph_snap_realm *realm)
if (!inode)
continue;
spin_unlock(&realm->inodes_with_caps_lock);
if (lastinode)
iput(lastinode);
iput(lastinode);
lastinode = inode;
ceph_queue_cap_snap(ci);
spin_lock(&realm->inodes_with_caps_lock);
}
spin_unlock(&realm->inodes_with_caps_lock);
if (lastinode)
iput(lastinode);
iput(lastinode);
list_for_each_entry(child, &realm->children, child_item) {
dout("queue_realm_cap_snaps %p %llx queue child %p %llx\n",
......@@ -928,5 +940,16 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
return;
}
int __init ceph_snap_init(void)
{
empty_snapc = ceph_create_snap_context(0, GFP_NOFS);
if (!empty_snapc)
return -ENOMEM;
empty_snapc->seq = 1;
return 0;
}
void ceph_snap_exit(void)
{
ceph_put_snap_context(empty_snapc);
}
......@@ -515,7 +515,8 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
struct ceph_fs_client *fsc;
const u64 supported_features =
CEPH_FEATURE_FLOCK |
CEPH_FEATURE_DIRLAYOUTHASH;
CEPH_FEATURE_DIRLAYOUTHASH |
CEPH_FEATURE_MDS_INLINE_DATA;
const u64 required_features = 0;
int page_count;
size_t size;
......@@ -1017,9 +1018,6 @@ static struct file_system_type ceph_fs_type = {
};
MODULE_ALIAS_FS("ceph");
#define _STRINGIFY(x) #x
#define STRINGIFY(x) _STRINGIFY(x)
static int __init init_ceph(void)
{
int ret = init_caches();
......@@ -1028,15 +1026,20 @@ static int __init init_ceph(void)
ceph_flock_init();
ceph_xattr_init();
ret = ceph_snap_init();
if (ret)
goto out_xattr;
ret = register_filesystem(&ceph_fs_type);
if (ret)
goto out_icache;
goto out_snap;
pr_info("loaded (mds proto %d)\n", CEPH_MDSC_PROTOCOL);
return 0;
out_icache:
out_snap:
ceph_snap_exit();
out_xattr:
ceph_xattr_exit();
destroy_caches();
out:
......@@ -1047,6 +1050,7 @@ static void __exit exit_ceph(void)
{
dout("exit_ceph\n");
unregister_filesystem(&ceph_fs_type);
ceph_snap_exit();
ceph_xattr_exit();
destroy_caches();
}
......
......@@ -161,6 +161,7 @@ struct ceph_cap_snap {
u64 time_warp_seq;
int writing; /* a sync write is still in progress */
int dirty_pages; /* dirty pages awaiting writeback */
bool inline_data;
};
static inline void ceph_put_cap_snap(struct ceph_cap_snap *capsnap)
......@@ -253,9 +254,11 @@ struct ceph_inode_info {
spinlock_t i_ceph_lock;
u64 i_version;
u64 i_inline_version;
u32 i_time_warp_seq;
unsigned i_ceph_flags;
int i_ordered_count;
atomic_t i_release_count;
atomic_t i_complete_count;
......@@ -434,14 +437,19 @@ static inline struct inode *ceph_find_inode(struct super_block *sb,
/*
* Ceph inode.
*/
#define CEPH_I_NODELAY 4 /* do not delay cap release */
#define CEPH_I_FLUSH 8 /* do not delay flush of dirty metadata */
#define CEPH_I_NOFLUSH 16 /* do not flush dirty caps */
#define CEPH_I_DIR_ORDERED 1 /* dentries in dir are ordered */
#define CEPH_I_NODELAY 4 /* do not delay cap release */
#define CEPH_I_FLUSH 8 /* do not delay flush of dirty metadata */
#define CEPH_I_NOFLUSH 16 /* do not flush dirty caps */
static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci,
int release_count)
int release_count, int ordered_count)
{
atomic_set(&ci->i_complete_count, release_count);
if (ci->i_ordered_count == ordered_count)
ci->i_ceph_flags |= CEPH_I_DIR_ORDERED;
else
ci->i_ceph_flags &= ~CEPH_I_DIR_ORDERED;
}
static inline void __ceph_dir_clear_complete(struct ceph_inode_info *ci)
......@@ -455,16 +463,35 @@ static inline bool __ceph_dir_is_complete(struct ceph_inode_info *ci)
atomic_read(&ci->i_release_count);
}
static inline bool __ceph_dir_is_complete_ordered(struct ceph_inode_info *ci)
{
return __ceph_dir_is_complete(ci) &&
(ci->i_ceph_flags & CEPH_I_DIR_ORDERED);
}
static inline void ceph_dir_clear_complete(struct inode *inode)
{
__ceph_dir_clear_complete(ceph_inode(inode));
}
static inline bool ceph_dir_is_complete(struct inode *inode)
static inline void ceph_dir_clear_ordered(struct inode *inode)
{
return __ceph_dir_is_complete(ceph_inode(inode));
struct ceph_inode_info *ci = ceph_inode(inode);
spin_lock(&ci->i_ceph_lock);
ci->i_ordered_count++;
ci->i_ceph_flags &= ~CEPH_I_DIR_ORDERED;
spin_unlock(&ci->i_ceph_lock);
}
static inline bool ceph_dir_is_complete_ordered(struct inode *inode)
{
struct ceph_inode_info *ci = ceph_inode(inode);
bool ret;
spin_lock(&ci->i_ceph_lock);
ret = __ceph_dir_is_complete_ordered(ci);
spin_unlock(&ci->i_ceph_lock);
return ret;
}
/* find a specific frag @f */
extern struct ceph_inode_frag *__ceph_find_frag(struct ceph_inode_info *ci,
......@@ -580,6 +607,7 @@ struct ceph_file_info {
char *last_name; /* last entry in previous chunk */
struct dentry *dentry; /* next dentry (for dcache readdir) */
int dir_release_count;
int dir_ordered_count;
/* used for -o dirstat read() on directory thing */
char *dir_info;
......@@ -673,6 +701,8 @@ extern void ceph_queue_cap_snap(struct ceph_inode_info *ci);
extern int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
struct ceph_cap_snap *capsnap);
extern void ceph_cleanup_empty_realms(struct ceph_mds_client *mdsc);
extern int ceph_snap_init(void);
extern void ceph_snap_exit(void);
/*
* a cap_snap is "pending" if it is still awaiting an in-progress
......@@ -715,7 +745,12 @@ extern void ceph_queue_vmtruncate(struct inode *inode);
extern void ceph_queue_invalidate(struct inode *inode);
extern void ceph_queue_writeback(struct inode *inode);
extern int ceph_do_getattr(struct inode *inode, int mask, bool force);
extern int __ceph_do_getattr(struct inode *inode, struct page *locked_page,
int mask, bool force);
static inline int ceph_do_getattr(struct inode *inode, int mask, bool force)
{
return __ceph_do_getattr(inode, NULL, mask, force);
}
extern int ceph_permission(struct inode *inode, int mask);
extern int ceph_setattr(struct dentry *dentry, struct iattr *attr);
extern int ceph_getattr(struct vfsmount *mnt, struct dentry *dentry,
......@@ -830,7 +865,7 @@ extern int ceph_encode_dentry_release(void **p, struct dentry *dn,
int mds, int drop, int unless);
extern int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
int *got, loff_t endoff);
loff_t endoff, int *got, struct page **pinned_page);
/* for counting open files by mode */
static inline void __ceph_get_fmode(struct ceph_inode_info *ci, int mode)
......@@ -852,7 +887,9 @@ extern int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
struct file *file, unsigned flags, umode_t mode,
int *opened);
extern int ceph_release(struct inode *inode, struct file *filp);
extern void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
char *data, size_t len);
int ceph_uninline_data(struct file *filp, struct page *locked_page);
/* dir.c */
extern const struct file_operations ceph_dir_fops;
extern const struct inode_operations ceph_dir_iops;
......
--- fs/ceph/super.h
+++ fs/ceph/super.h
@@ -254,6 +255,7 @@
spinlock_t i_ceph_lock;
u64 i_version;
+ u64 i_inline_version;
u32 i_time_warp_seq;
unsigned i_ceph_flags;
......@@ -854,7 +854,7 @@ static int ceph_sync_setxattr(struct dentry *dentry, const char *name,
struct ceph_pagelist *pagelist = NULL;
int err;
if (value) {
if (size > 0) {
/* copy value into pagelist */
pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
if (!pagelist)
......@@ -864,7 +864,7 @@ static int ceph_sync_setxattr(struct dentry *dentry, const char *name,
err = ceph_pagelist_append(pagelist, value, size);
if (err)
goto out;
} else {
} else if (!value) {
flags |= CEPH_XATTR_REMOVE;
}
......@@ -1001,6 +1001,9 @@ int ceph_setxattr(struct dentry *dentry, const char *name,
if (!strncmp(name, XATTR_SYSTEM_PREFIX, XATTR_SYSTEM_PREFIX_LEN))
return generic_setxattr(dentry, name, value, size, flags);
if (size == 0)
value = ""; /* empty EA, do not remove */
return __ceph_setxattr(dentry, name, value, size, flags);
}
......
......@@ -13,6 +13,7 @@
struct ceph_auth_client;
struct ceph_authorizer;
struct ceph_msg;
struct ceph_auth_handshake {
struct ceph_authorizer *authorizer;
......@@ -20,6 +21,10 @@ struct ceph_auth_handshake {
size_t authorizer_buf_len;
void *authorizer_reply_buf;
size_t authorizer_reply_buf_len;
int (*sign_message)(struct ceph_auth_handshake *auth,
struct ceph_msg *msg);
int (*check_message_signature)(struct ceph_auth_handshake *auth,
struct ceph_msg *msg);
};
struct ceph_auth_client_ops {
......@@ -66,6 +71,11 @@ struct ceph_auth_client_ops {
void (*reset)(struct ceph_auth_client *ac);
void (*destroy)(struct ceph_auth_client *ac);
int (*sign_message)(struct ceph_auth_handshake *auth,
struct ceph_msg *msg);
int (*check_message_signature)(struct ceph_auth_handshake *auth,
struct ceph_msg *msg);
};
struct ceph_auth_client {
......@@ -113,4 +123,20 @@ extern int ceph_auth_verify_authorizer_reply(struct ceph_auth_client *ac,
extern void ceph_auth_invalidate_authorizer(struct ceph_auth_client *ac,
int peer_type);
static inline int ceph_auth_sign_message(struct ceph_auth_handshake *auth,
struct ceph_msg *msg)
{
if (auth->sign_message)
return auth->sign_message(auth, msg);
return 0;
}
static inline
int ceph_auth_check_message_signature(struct ceph_auth_handshake *auth,
struct ceph_msg *msg)
{
if (auth->check_message_signature)
return auth->check_message_signature(auth, msg);
return 0;
}
#endif
......@@ -10,8 +10,7 @@
/*
* a simple reference counted buffer.
*
* use kmalloc for small sizes (<= one page), vmalloc for larger
* sizes.
* use kmalloc for smaller sizes, vmalloc for larger sizes.
*/
struct ceph_buffer {
struct kref kref;
......
......@@ -84,6 +84,7 @@ static inline u64 ceph_sanitize_features(u64 features)
CEPH_FEATURE_PGPOOL3 | \
CEPH_FEATURE_OSDENC | \
CEPH_FEATURE_CRUSH_TUNABLES | \
CEPH_FEATURE_MSG_AUTH | \
CEPH_FEATURE_CRUSH_TUNABLES2 | \
CEPH_FEATURE_REPLY_CREATE_INODE | \
CEPH_FEATURE_OSDHASHPSPOOL | \
......
......@@ -522,8 +522,11 @@ struct ceph_mds_reply_dirfrag {
__le32 dist[];
} __attribute__ ((packed));
#define CEPH_LOCK_FCNTL 1
#define CEPH_LOCK_FLOCK 2
#define CEPH_LOCK_FCNTL 1
#define CEPH_LOCK_FLOCK 2
#define CEPH_LOCK_FCNTL_INTR 3
#define CEPH_LOCK_FLOCK_INTR 4
#define CEPH_LOCK_SHARED 1
#define CEPH_LOCK_EXCL 2
......@@ -549,6 +552,7 @@ struct ceph_filelock {
int ceph_flags_to_mode(int flags);
#define CEPH_INLINE_NONE ((__u64)-1)
/* capability bits */
#define CEPH_CAP_PIN 1 /* no specific capabilities beyond the pin */
......@@ -613,6 +617,8 @@ int ceph_flags_to_mode(int flags);
CEPH_CAP_LINK_SHARED | \
CEPH_CAP_FILE_SHARED | \
CEPH_CAP_XATTR_SHARED)
#define CEPH_STAT_CAP_INLINE_DATA (CEPH_CAP_FILE_SHARED | \
CEPH_CAP_FILE_RD)
#define CEPH_CAP_ANY_SHARED (CEPH_CAP_AUTH_SHARED | \
CEPH_CAP_LINK_SHARED | \
......
......@@ -29,6 +29,7 @@
#define CEPH_OPT_NOSHARE (1<<1) /* don't share client with other sbs */
#define CEPH_OPT_MYIP (1<<2) /* specified my ip */
#define CEPH_OPT_NOCRC (1<<3) /* no data crc on writes */
#define CEPH_OPT_NOMSGAUTH (1<<4) /* not require cephx message signature */
#define CEPH_OPT_DEFAULT (0)
......@@ -184,7 +185,6 @@ extern bool libceph_compatible(void *data);
extern const char *ceph_msg_type_name(int type);
extern int ceph_check_fsid(struct ceph_client *client, struct ceph_fsid *fsid);
extern void *ceph_kvmalloc(size_t size, gfp_t flags);
extern void ceph_kvfree(const void *ptr);
extern struct ceph_options *ceph_parse_options(char *options,
const char *dev_name, const char *dev_name_end,
......
......@@ -42,6 +42,10 @@ struct ceph_connection_operations {
struct ceph_msg * (*alloc_msg) (struct ceph_connection *con,
struct ceph_msg_header *hdr,
int *skip);
int (*sign_message) (struct ceph_connection *con, struct ceph_msg *msg);
int (*check_message_signature) (struct ceph_connection *con,
struct ceph_msg *msg);
};
/* use format string %s%d */
......@@ -142,7 +146,10 @@ struct ceph_msg_data_cursor {
*/
struct ceph_msg {
struct ceph_msg_header hdr; /* header */
struct ceph_msg_footer footer; /* footer */
union {
struct ceph_msg_footer footer; /* footer */
struct ceph_msg_footer_old old_footer; /* old format footer */
};
struct kvec front; /* unaligned blobs of message */
struct ceph_buffer *middle;
......
......@@ -152,7 +152,8 @@ struct ceph_msg_header {
receiver: mask against ~PAGE_MASK */
struct ceph_entity_name src;
__le32 reserved;
__le16 compat_version;
__le16 reserved;
__le32 crc; /* header crc32c */
} __attribute__ ((packed));
......@@ -164,13 +165,21 @@ struct ceph_msg_header {
/*
* follows data payload
*/
struct ceph_msg_footer_old {
__le32 front_crc, middle_crc, data_crc;
__u8 flags;
} __attribute__ ((packed));
struct ceph_msg_footer {
__le32 front_crc, middle_crc, data_crc;
// sig holds the 64 bits of the digital signature for the message PLR
__le64 sig;
__u8 flags;
} __attribute__ ((packed));
#define CEPH_MSG_FOOTER_COMPLETE (1<<0) /* msg wasn't aborted */
#define CEPH_MSG_FOOTER_NOCRC (1<<1) /* no data crc */
#define CEPH_MSG_FOOTER_SIGNED (1<<2) /* msg was signed */
#endif
......@@ -86,6 +86,13 @@ struct ceph_osd_req_op {
u32 truncate_seq;
struct ceph_osd_data osd_data;
} extent;
struct {
__le32 name_len;
__le32 value_len;
__u8 cmp_op; /* CEPH_OSD_CMPXATTR_OP_* */
__u8 cmp_mode; /* CEPH_OSD_CMPXATTR_MODE_* */
struct ceph_osd_data osd_data;
} xattr;
struct {
const char *class_name;
const char *method_name;
......@@ -295,6 +302,9 @@ extern void osd_req_op_cls_response_data_pages(struct ceph_osd_request *,
extern void osd_req_op_cls_init(struct ceph_osd_request *osd_req,
unsigned int which, u16 opcode,
const char *class, const char *method);
extern int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
u16 opcode, const char *name, const void *value,
size_t size, u8 cmp_op, u8 cmp_mode);
extern void osd_req_op_watch_init(struct ceph_osd_request *osd_req,
unsigned int which, u16 opcode,
u64 cookie, u64 version, int flag);
......@@ -318,7 +328,8 @@ extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *,
struct ceph_file_layout *layout,
struct ceph_vino vino,
u64 offset, u64 *len,
int num_ops, int opcode, int flags,
unsigned int which, int num_ops,
int opcode, int flags,
struct ceph_snap_context *snapc,
u32 truncate_seq, u64 truncate_size,
bool use_mempool);
......
#ifndef __FS_CEPH_PAGELIST_H
#define __FS_CEPH_PAGELIST_H
#include <linux/list.h>
#include <asm/byteorder.h>
#include <linux/atomic.h>
#include <linux/list.h>
#include <linux/types.h>
struct ceph_pagelist {
struct list_head head;
......
......@@ -8,6 +8,7 @@
#include <linux/ceph/decode.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/messenger.h>
#include "crypto.h"
#include "auth_x.h"
......@@ -293,6 +294,11 @@ static int ceph_x_build_authorizer(struct ceph_auth_client *ac,
dout("build_authorizer for %s %p\n",
ceph_entity_type_name(th->service), au);
ceph_crypto_key_destroy(&au->session_key);
ret = ceph_crypto_key_clone(&au->session_key, &th->session_key);
if (ret)
return ret;
maxlen = sizeof(*msg_a) + sizeof(msg_b) +
ceph_x_encrypt_buflen(ticket_blob_len);
dout(" need len %d\n", maxlen);
......@@ -302,8 +308,10 @@ static int ceph_x_build_authorizer(struct ceph_auth_client *ac,
}
if (!au->buf) {
au->buf = ceph_buffer_new(maxlen, GFP_NOFS);
if (!au->buf)
if (!au->buf) {
ceph_crypto_key_destroy(&au->session_key);
return -ENOMEM;
}
}
au->service = th->service;
au->secret_id = th->secret_id;
......@@ -329,7 +337,7 @@ static int ceph_x_build_authorizer(struct ceph_auth_client *ac,
get_random_bytes(&au->nonce, sizeof(au->nonce));
msg_b.struct_v = 1;
msg_b.nonce = cpu_to_le64(au->nonce);
ret = ceph_x_encrypt(&th->session_key, &msg_b, sizeof(msg_b),
ret = ceph_x_encrypt(&au->session_key, &msg_b, sizeof(msg_b),
p, end - p);
if (ret < 0)
goto out_buf;
......@@ -560,6 +568,8 @@ static int ceph_x_create_authorizer(
auth->authorizer_buf_len = au->buf->vec.iov_len;
auth->authorizer_reply_buf = au->reply_buf;
auth->authorizer_reply_buf_len = sizeof (au->reply_buf);
auth->sign_message = ac->ops->sign_message;
auth->check_message_signature = ac->ops->check_message_signature;
return 0;
}
......@@ -588,17 +598,13 @@ static int ceph_x_verify_authorizer_reply(struct ceph_auth_client *ac,
struct ceph_authorizer *a, size_t len)
{
struct ceph_x_authorizer *au = (void *)a;
struct ceph_x_ticket_handler *th;
int ret = 0;
struct ceph_x_authorize_reply reply;
void *preply = &reply;
void *p = au->reply_buf;
void *end = p + sizeof(au->reply_buf);
th = get_ticket_handler(ac, au->service);
if (IS_ERR(th))
return PTR_ERR(th);
ret = ceph_x_decrypt(&th->session_key, &p, end, &preply, sizeof(reply));
ret = ceph_x_decrypt(&au->session_key, &p, end, &preply, sizeof(reply));
if (ret < 0)
return ret;
if (ret != sizeof(reply))
......@@ -618,6 +624,7 @@ static void ceph_x_destroy_authorizer(struct ceph_auth_client *ac,
{
struct ceph_x_authorizer *au = (void *)a;
ceph_crypto_key_destroy(&au->session_key);
ceph_buffer_put(au->buf);
kfree(au);
}
......@@ -663,6 +670,59 @@ static void ceph_x_invalidate_authorizer(struct ceph_auth_client *ac,
memset(&th->validity, 0, sizeof(th->validity));
}
static int calcu_signature(struct ceph_x_authorizer *au,
struct ceph_msg *msg, __le64 *sig)
{
int ret;
char tmp_enc[40];
__le32 tmp[5] = {
16u, msg->hdr.crc, msg->footer.front_crc,
msg->footer.middle_crc, msg->footer.data_crc,
};
ret = ceph_x_encrypt(&au->session_key, &tmp, sizeof(tmp),
tmp_enc, sizeof(tmp_enc));
if (ret < 0)
return ret;
*sig = *(__le64*)(tmp_enc + 4);
return 0;
}
static int ceph_x_sign_message(struct ceph_auth_handshake *auth,
struct ceph_msg *msg)
{
int ret;
if (!auth->authorizer)
return 0;
ret = calcu_signature((struct ceph_x_authorizer *)auth->authorizer,
msg, &msg->footer.sig);
if (ret < 0)
return ret;
msg->footer.flags |= CEPH_MSG_FOOTER_SIGNED;
return 0;
}
static int ceph_x_check_message_signature(struct ceph_auth_handshake *auth,
struct ceph_msg *msg)
{
__le64 sig_check;
int ret;
if (!auth->authorizer)
return 0;
ret = calcu_signature((struct ceph_x_authorizer *)auth->authorizer,
msg, &sig_check);
if (ret < 0)
return ret;
if (sig_check == msg->footer.sig)
return 0;
if (msg->footer.flags & CEPH_MSG_FOOTER_SIGNED)
dout("ceph_x_check_message_signature %p has signature %llx "
"expect %llx\n", msg, msg->footer.sig, sig_check);
else
dout("ceph_x_check_message_signature %p sender did not set "
"CEPH_MSG_FOOTER_SIGNED\n", msg);
return -EBADMSG;
}
static const struct ceph_auth_client_ops ceph_x_ops = {
.name = "x",
......@@ -677,6 +737,8 @@ static const struct ceph_auth_client_ops ceph_x_ops = {
.invalidate_authorizer = ceph_x_invalidate_authorizer,
.reset = ceph_x_reset,
.destroy = ceph_x_destroy,
.sign_message = ceph_x_sign_message,
.check_message_signature = ceph_x_check_message_signature,
};
......
......@@ -26,6 +26,7 @@ struct ceph_x_ticket_handler {
struct ceph_x_authorizer {
struct ceph_crypto_key session_key;
struct ceph_buffer *buf;
unsigned int service;
u64 nonce;
......
......@@ -6,7 +6,7 @@
#include <linux/ceph/buffer.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/libceph.h> /* for ceph_kv{malloc,free} */
#include <linux/ceph/libceph.h> /* for ceph_kvmalloc */
struct ceph_buffer *ceph_buffer_new(size_t len, gfp_t gfp)
{
......@@ -35,7 +35,7 @@ void ceph_buffer_release(struct kref *kref)
struct ceph_buffer *b = container_of(kref, struct ceph_buffer, kref);
dout("buffer_release %p\n", b);
ceph_kvfree(b->vec.iov_base);
kvfree(b->vec.iov_base);
kfree(b);
}
EXPORT_SYMBOL(ceph_buffer_release);
......
......@@ -184,14 +184,6 @@ void *ceph_kvmalloc(size_t size, gfp_t flags)
return __vmalloc(size, flags | __GFP_HIGHMEM, PAGE_KERNEL);
}
void ceph_kvfree(const void *ptr)
{
if (is_vmalloc_addr(ptr))
vfree(ptr);
else
kfree(ptr);
}
static int parse_fsid(const char *str, struct ceph_fsid *fsid)
{
......@@ -245,6 +237,8 @@ enum {
Opt_noshare,
Opt_crc,
Opt_nocrc,
Opt_cephx_require_signatures,
Opt_nocephx_require_signatures,
};
static match_table_t opt_tokens = {
......@@ -263,6 +257,8 @@ static match_table_t opt_tokens = {
{Opt_noshare, "noshare"},
{Opt_crc, "crc"},
{Opt_nocrc, "nocrc"},
{Opt_cephx_require_signatures, "cephx_require_signatures"},
{Opt_nocephx_require_signatures, "nocephx_require_signatures"},
{-1, NULL}
};
......@@ -461,6 +457,12 @@ ceph_parse_options(char *options, const char *dev_name,
case Opt_nocrc:
opt->flags |= CEPH_OPT_NOCRC;
break;
case Opt_cephx_require_signatures:
opt->flags &= ~CEPH_OPT_NOMSGAUTH;
break;
case Opt_nocephx_require_signatures:
opt->flags |= CEPH_OPT_NOMSGAUTH;
break;
default:
BUG_ON(token);
......@@ -504,6 +506,9 @@ struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private,
init_waitqueue_head(&client->auth_wq);
client->auth_err = 0;
if (!ceph_test_opt(client, NOMSGAUTH))
required_features |= CEPH_FEATURE_MSG_AUTH;
client->extra_mon_dispatch = NULL;
client->supported_features = CEPH_FEATURES_SUPPORTED_DEFAULT |
supported_features;
......
......@@ -1196,8 +1196,18 @@ static void prepare_write_message_footer(struct ceph_connection *con)
dout("prepare_write_message_footer %p\n", con);
con->out_kvec_is_msg = true;
con->out_kvec[v].iov_base = &m->footer;
con->out_kvec[v].iov_len = sizeof(m->footer);
con->out_kvec_bytes += sizeof(m->footer);
if (con->peer_features & CEPH_FEATURE_MSG_AUTH) {
if (con->ops->sign_message)
con->ops->sign_message(con, m);
else
m->footer.sig = 0;
con->out_kvec[v].iov_len = sizeof(m->footer);
con->out_kvec_bytes += sizeof(m->footer);
} else {
m->old_footer.flags = m->footer.flags;
con->out_kvec[v].iov_len = sizeof(m->old_footer);
con->out_kvec_bytes += sizeof(m->old_footer);
}
con->out_kvec_left++;
con->out_more = m->more_to_follow;
con->out_msg_done = true;
......@@ -2249,6 +2259,7 @@ static int read_partial_message(struct ceph_connection *con)
int ret;
unsigned int front_len, middle_len, data_len;
bool do_datacrc = !con->msgr->nocrc;
bool need_sign = (con->peer_features & CEPH_FEATURE_MSG_AUTH);
u64 seq;
u32 crc;
......@@ -2361,12 +2372,21 @@ static int read_partial_message(struct ceph_connection *con)
}
/* footer */
size = sizeof (m->footer);
if (need_sign)
size = sizeof(m->footer);
else
size = sizeof(m->old_footer);
end += size;
ret = read_partial(con, end, size, &m->footer);
if (ret <= 0)
return ret;
if (!need_sign) {
m->footer.flags = m->old_footer.flags;
m->footer.sig = 0;
}
dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n",
m, front_len, m->footer.front_crc, middle_len,
m->footer.middle_crc, data_len, m->footer.data_crc);
......@@ -2390,6 +2410,12 @@ static int read_partial_message(struct ceph_connection *con)
return -EBADMSG;
}
if (need_sign && con->ops->check_message_signature &&
con->ops->check_message_signature(con, m)) {
pr_err("read_partial_message %p signature check failed\n", m);
return -EBADMSG;
}
return 1; /* done! */
}
......@@ -3288,7 +3314,7 @@ static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip)
static void ceph_msg_free(struct ceph_msg *m)
{
dout("%s %p\n", __func__, m);
ceph_kvfree(m->front.iov_base);
kvfree(m->front.iov_base);
kmem_cache_free(ceph_msg_cache, m);
}
......
......@@ -292,6 +292,10 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
ceph_osd_data_release(&op->cls.request_data);
ceph_osd_data_release(&op->cls.response_data);
break;
case CEPH_OSD_OP_SETXATTR:
case CEPH_OSD_OP_CMPXATTR:
ceph_osd_data_release(&op->xattr.osd_data);
break;
default:
break;
}
......@@ -476,8 +480,7 @@ void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
size_t payload_len = 0;
BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
opcode != CEPH_OSD_OP_DELETE && opcode != CEPH_OSD_OP_ZERO &&
opcode != CEPH_OSD_OP_TRUNCATE);
opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE);
op->extent.offset = offset;
op->extent.length = length;
......@@ -545,6 +548,39 @@ void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
}
EXPORT_SYMBOL(osd_req_op_cls_init);
int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
u16 opcode, const char *name, const void *value,
size_t size, u8 cmp_op, u8 cmp_mode)
{
struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
struct ceph_pagelist *pagelist;
size_t payload_len;
BUG_ON(opcode != CEPH_OSD_OP_SETXATTR && opcode != CEPH_OSD_OP_CMPXATTR);
pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
if (!pagelist)
return -ENOMEM;
ceph_pagelist_init(pagelist);
payload_len = strlen(name);
op->xattr.name_len = payload_len;
ceph_pagelist_append(pagelist, name, payload_len);
op->xattr.value_len = size;
ceph_pagelist_append(pagelist, value, size);
payload_len += size;
op->xattr.cmp_op = cmp_op;
op->xattr.cmp_mode = cmp_mode;
ceph_osd_data_pagelist_init(&op->xattr.osd_data, pagelist);
op->payload_len = payload_len;
return 0;
}
EXPORT_SYMBOL(osd_req_op_xattr_init);
void osd_req_op_watch_init(struct ceph_osd_request *osd_req,
unsigned int which, u16 opcode,
u64 cookie, u64 version, int flag)
......@@ -626,7 +662,6 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
case CEPH_OSD_OP_READ:
case CEPH_OSD_OP_WRITE:
case CEPH_OSD_OP_ZERO:
case CEPH_OSD_OP_DELETE:
case CEPH_OSD_OP_TRUNCATE:
if (src->op == CEPH_OSD_OP_WRITE)
request_data_len = src->extent.length;
......@@ -676,6 +711,19 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
dst->alloc_hint.expected_write_size =
cpu_to_le64(src->alloc_hint.expected_write_size);
break;
case CEPH_OSD_OP_SETXATTR:
case CEPH_OSD_OP_CMPXATTR:
dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
dst->xattr.cmp_op = src->xattr.cmp_op;
dst->xattr.cmp_mode = src->xattr.cmp_mode;
osd_data = &src->xattr.osd_data;
ceph_osdc_msg_data_add(req->r_request, osd_data);
request_data_len = osd_data->pagelist->length;
break;
case CEPH_OSD_OP_CREATE:
case CEPH_OSD_OP_DELETE:
break;
default:
pr_err("unsupported osd opcode %s\n",
ceph_osd_op_name(src->op));
......@@ -705,7 +753,8 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
struct ceph_file_layout *layout,
struct ceph_vino vino,
u64 off, u64 *plen, int num_ops,
u64 off, u64 *plen,
unsigned int which, int num_ops,
int opcode, int flags,
struct ceph_snap_context *snapc,
u32 truncate_seq,
......@@ -716,13 +765,11 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
u64 objnum = 0;
u64 objoff = 0;
u64 objlen = 0;
u32 object_size;
u64 object_base;
int r;
BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
opcode != CEPH_OSD_OP_DELETE && opcode != CEPH_OSD_OP_ZERO &&
opcode != CEPH_OSD_OP_TRUNCATE);
opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE &&
opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE);
req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
GFP_NOFS);
......@@ -738,29 +785,24 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
return ERR_PTR(r);
}
object_size = le32_to_cpu(layout->fl_object_size);
object_base = off - objoff;
if (!(truncate_seq == 1 && truncate_size == -1ULL)) {
if (truncate_size <= object_base) {
truncate_size = 0;
} else {
truncate_size -= object_base;
if (truncate_size > object_size)
truncate_size = object_size;
if (opcode == CEPH_OSD_OP_CREATE || opcode == CEPH_OSD_OP_DELETE) {
osd_req_op_init(req, which, opcode);
} else {
u32 object_size = le32_to_cpu(layout->fl_object_size);
u32 object_base = off - objoff;
if (!(truncate_seq == 1 && truncate_size == -1ULL)) {
if (truncate_size <= object_base) {
truncate_size = 0;
} else {
truncate_size -= object_base;
if (truncate_size > object_size)
truncate_size = object_size;
}
}
osd_req_op_extent_init(req, which, opcode, objoff, objlen,
truncate_size, truncate_seq);
}
osd_req_op_extent_init(req, 0, opcode, objoff, objlen,
truncate_size, truncate_seq);
/*
* A second op in the ops array means the caller wants to
* also issue a include a 'startsync' command so that the
* osd will flush data quickly.
*/
if (num_ops > 1)
osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC);
req->r_base_oloc.pool = ceph_file_layout_pg_pool(*layout);
snprintf(req->r_base_oid.name, sizeof(req->r_base_oid.name),
......@@ -2626,7 +2668,7 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
vino.snap, off, *plen);
req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 1,
req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 0, 1,
CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
NULL, truncate_seq, truncate_size,
false);
......@@ -2669,7 +2711,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
int page_align = off & ~PAGE_MASK;
BUG_ON(vino.snap != CEPH_NOSNAP); /* snapshots aren't writeable */
req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 1,
req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 0, 1,
CEPH_OSD_OP_WRITE,
CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
snapc, truncate_seq, truncate_size,
......@@ -2920,6 +2962,20 @@ static int invalidate_authorizer(struct ceph_connection *con)
return ceph_monc_validate_auth(&osdc->client->monc);
}
static int sign_message(struct ceph_connection *con, struct ceph_msg *msg)
{
struct ceph_osd *o = con->private;
struct ceph_auth_handshake *auth = &o->o_auth;
return ceph_auth_sign_message(auth, msg);
}
static int check_message_signature(struct ceph_connection *con, struct ceph_msg *msg)
{
struct ceph_osd *o = con->private;
struct ceph_auth_handshake *auth = &o->o_auth;
return ceph_auth_check_message_signature(auth, msg);
}
static const struct ceph_connection_operations osd_con_ops = {
.get = get_osd_con,
.put = put_osd_con,
......@@ -2928,5 +2984,7 @@ static const struct ceph_connection_operations osd_con_ops = {
.verify_authorizer_reply = verify_authorizer_reply,
.invalidate_authorizer = invalidate_authorizer,
.alloc_msg = alloc_msg,
.sign_message = sign_message,
.check_message_signature = check_message_signature,
.fault = osd_reset,
};
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment