Commit d9d47901 authored by Jinshan Xiong's avatar Jinshan Xiong Committed by Greg Kroah-Hartman

staging/lustre/clio: collapse layer of cl_page

Move radix tree to osc layer to for performance improvement.
Signed-off-by: default avatarJinshan Xiong <jinshan.xiong@intel.com>
Reviewed-on: http://review.whamcloud.com/7892
Intel-bug-id: https://jira.hpdd.intel.com/browse/LU-3321Reviewed-by: default avatarLai Siyao <lai.siyao@intel.com>
Reviewed-by: default avatarBobi Jam <bobijam@gmail.com>
Signed-off-by: default avatarOleg Drokin <green@linuxhacker.ru>
Signed-off-by: default avatarGreg Kroah-Hartman <gregkh@linuxfoundation.org>
parent 2579d8d0
......@@ -388,6 +388,12 @@ struct cl_object_operations {
*/
int (*coo_glimpse)(const struct lu_env *env,
const struct cl_object *obj, struct ost_lvb *lvb);
/**
* Object prune method. Called when the layout is going to change on
* this object, therefore each layer has to clean up their cache,
* mainly pages and locks.
*/
int (*coo_prune)(const struct lu_env *env, struct cl_object *obj);
};
/**
......@@ -403,15 +409,9 @@ struct cl_object_header {
* mostly useless otherwise.
*/
/** @{ */
/** Lock protecting page tree. */
spinlock_t coh_page_guard;
/** Lock protecting lock list. */
spinlock_t coh_lock_guard;
/** @} locks */
/** Radix tree of cl_page's, cached for this object. */
struct radix_tree_root coh_tree;
/** # of pages in radix tree. */
unsigned long coh_pages;
/** List of cl_lock's granted for this object. */
struct list_head coh_locks;
......@@ -896,14 +896,6 @@ struct cl_page_operations {
*/
void (*cpo_export)(const struct lu_env *env,
const struct cl_page_slice *slice, int uptodate);
/**
* Unmaps page from the user space (if it is mapped).
*
* \see cl_page_unmap()
* \see vvp_page_unmap()
*/
int (*cpo_unmap)(const struct lu_env *env,
const struct cl_page_slice *slice, struct cl_io *io);
/**
* Checks whether underlying VM page is locked (in the suitable
* sense). Used for assertions.
......@@ -2794,19 +2786,13 @@ enum {
};
/* callback of cl_page_gang_lookup() */
typedef int (*cl_page_gang_cb_t) (const struct lu_env *, struct cl_io *,
struct cl_page *, void *);
int cl_page_gang_lookup(const struct lu_env *env, struct cl_object *obj,
struct cl_io *io, pgoff_t start, pgoff_t end,
cl_page_gang_cb_t cb, void *cbdata);
struct cl_page *cl_page_lookup(struct cl_object_header *hdr, pgoff_t index);
struct cl_page *cl_page_find(const struct lu_env *env, struct cl_object *obj,
pgoff_t idx, struct page *vmpage,
enum cl_page_type type);
struct cl_page *cl_page_find_sub(const struct lu_env *env,
struct cl_object *obj,
pgoff_t idx, struct page *vmpage,
struct cl_page *parent);
struct cl_page *cl_page_alloc(const struct lu_env *env,
struct cl_object *o, pgoff_t ind,
struct page *vmpage,
enum cl_page_type type);
void cl_page_get(struct cl_page *page);
void cl_page_put(const struct lu_env *env, struct cl_page *page);
void cl_page_print(const struct lu_env *env, void *cookie, lu_printer_t printer,
......@@ -2872,8 +2858,6 @@ int cl_page_flush(const struct lu_env *env, struct cl_io *io,
void cl_page_discard(const struct lu_env *env, struct cl_io *io,
struct cl_page *pg);
void cl_page_delete(const struct lu_env *env, struct cl_page *pg);
int cl_page_unmap(const struct lu_env *env, struct cl_io *io,
struct cl_page *pg);
int cl_page_is_vmlocked(const struct lu_env *env, const struct cl_page *pg);
void cl_page_export(const struct lu_env *env, struct cl_page *pg, int uptodate);
int cl_page_is_under_lock(const struct lu_env *env, struct cl_io *io,
......
......@@ -442,7 +442,7 @@ static int cl_read_ahead_page(const struct lu_env *env, struct cl_io *io,
cl_page_list_add(queue, page);
rc = 1;
} else {
cl_page_delete(env, page);
cl_page_discard(env, io, page);
rc = -ENOLCK;
}
} else {
......
......@@ -95,11 +95,7 @@ static void ll_invalidatepage(struct page *vmpage, unsigned int offset,
if (obj) {
page = cl_vmpage_page(vmpage, obj);
if (page) {
lu_ref_add(&page->cp_reference,
"delete", vmpage);
cl_page_delete(env, page);
lu_ref_del(&page->cp_reference,
"delete", vmpage);
cl_page_put(env, page);
}
} else
......
......@@ -36,6 +36,7 @@
* cl_device and cl_device_type implementation for VVP layer.
*
* Author: Nikita Danilov <nikita.danilov@sun.com>
* Author: Jinshan Xiong <jinshan.xiong@intel.com>
*/
#define DEBUG_SUBSYSTEM S_LLITE
......@@ -356,23 +357,18 @@ static loff_t vvp_pgcache_find(const struct lu_env *env,
return ~0ULL;
clob = vvp_pgcache_obj(env, dev, &id);
if (clob) {
struct cl_object_header *hdr;
int nr;
struct cl_page *pg;
struct inode *inode = ccc_object_inode(clob);
struct page *vmpage;
int nr;
/* got an object. Find next page. */
hdr = cl_object_header(clob);
spin_lock(&hdr->coh_page_guard);
nr = radix_tree_gang_lookup(&hdr->coh_tree,
(void **)&pg,
id.vpi_index, 1);
nr = find_get_pages_contig(inode->i_mapping,
id.vpi_index, 1, &vmpage);
if (nr > 0) {
id.vpi_index = pg->cp_index;
id.vpi_index = vmpage->index;
/* Cant support over 16T file */
nr = !(pg->cp_index > 0xffffffff);
nr = !(vmpage->index > 0xffffffff);
page_cache_release(vmpage);
}
spin_unlock(&hdr->coh_page_guard);
lu_object_ref_del(&clob->co_lu, "dump", current);
cl_object_put(env, clob);
......@@ -431,8 +427,6 @@ static int vvp_pgcache_show(struct seq_file *f, void *v)
struct ll_sb_info *sbi;
struct cl_object *clob;
struct lu_env *env;
struct cl_page *page;
struct cl_object_header *hdr;
struct vvp_pgcache_id id;
int refcheck;
int result;
......@@ -444,14 +438,23 @@ static int vvp_pgcache_show(struct seq_file *f, void *v)
sbi = f->private;
clob = vvp_pgcache_obj(env, &sbi->ll_cl->cd_lu_dev, &id);
if (clob) {
hdr = cl_object_header(clob);
spin_lock(&hdr->coh_page_guard);
page = cl_page_lookup(hdr, id.vpi_index);
spin_unlock(&hdr->coh_page_guard);
struct inode *inode = ccc_object_inode(clob);
struct cl_page *page = NULL;
struct page *vmpage;
result = find_get_pages_contig(inode->i_mapping,
id.vpi_index, 1,
&vmpage);
if (result > 0) {
lock_page(vmpage);
page = cl_vmpage_page(vmpage, clob);
unlock_page(vmpage);
page_cache_release(vmpage);
}
seq_printf(f, "%8x@"DFID": ",
id.vpi_index, PFID(&hdr->coh_lu.loh_fid));
seq_printf(f, "%8x@" DFID ": ", id.vpi_index,
PFID(lu_object_fid(&clob->co_lu)));
if (page) {
vvp_pgcache_page_show(env, f, page);
cl_page_put(env, page);
......
......@@ -763,7 +763,6 @@ static int vvp_io_fault_start(const struct lu_env *env,
vmpage = NULL;
if (result < 0) {
cl_page_unmap(env, io, page);
cl_page_discard(env, io, page);
cl_page_disown(env, io, page);
......
......@@ -165,6 +165,18 @@ static int vvp_conf_set(const struct lu_env *env, struct cl_object *obj,
return 0;
}
static int vvp_prune(const struct lu_env *env, struct cl_object *obj)
{
struct inode *inode = ccc_object_inode(obj);
int rc;
rc = cl_sync_file_range(inode, 0, OBD_OBJECT_EOF, CL_FSYNC_ALL, 1);
if (rc == 0)
truncate_inode_pages(inode->i_mapping, 0);
return rc;
}
static const struct cl_object_operations vvp_ops = {
.coo_page_init = vvp_page_init,
.coo_lock_init = vvp_lock_init,
......@@ -172,6 +184,7 @@ static const struct cl_object_operations vvp_ops = {
.coo_attr_get = vvp_attr_get,
.coo_attr_set = vvp_attr_set,
.coo_conf_set = vvp_conf_set,
.coo_prune = vvp_prune,
.coo_glimpse = ccc_object_glimpse
};
......
......@@ -138,6 +138,7 @@ static void vvp_page_discard(const struct lu_env *env,
struct page *vmpage = cl2vm_page(slice);
struct address_space *mapping;
struct ccc_page *cpg = cl2ccc_page(slice);
__u64 offset;
LASSERT(vmpage);
LASSERT(PageLocked(vmpage));
......@@ -147,6 +148,9 @@ static void vvp_page_discard(const struct lu_env *env,
if (cpg->cpg_defer_uptodate && !cpg->cpg_ra_used)
ll_ra_stats_inc(mapping, RA_STAT_DISCARDED);
offset = vmpage->index << PAGE_SHIFT;
ll_teardown_mmaps(vmpage->mapping, offset, offset + PAGE_SIZE);
/*
* truncate_complete_page() calls
* a_ops->invalidatepage()->cl_page_delete()->vvp_page_delete().
......@@ -154,37 +158,26 @@ static void vvp_page_discard(const struct lu_env *env,
truncate_complete_page(mapping, vmpage);
}
static int vvp_page_unmap(const struct lu_env *env,
const struct cl_page_slice *slice,
struct cl_io *unused)
{
struct page *vmpage = cl2vm_page(slice);
__u64 offset;
LASSERT(vmpage);
LASSERT(PageLocked(vmpage));
offset = vmpage->index << PAGE_CACHE_SHIFT;
/*
* XXX is it safe to call this with the page lock held?
*/
ll_teardown_mmaps(vmpage->mapping, offset, offset + PAGE_CACHE_SIZE);
return 0;
}
static void vvp_page_delete(const struct lu_env *env,
const struct cl_page_slice *slice)
{
struct page *vmpage = cl2vm_page(slice);
struct inode *inode = vmpage->mapping->host;
struct cl_object *obj = slice->cpl_obj;
struct cl_page *page = slice->cpl_page;
int refc;
LASSERT(PageLocked(vmpage));
LASSERT((struct cl_page *)vmpage->private == slice->cpl_page);
LASSERT((struct cl_page *)vmpage->private == page);
LASSERT(inode == ccc_object_inode(obj));
vvp_write_complete(cl2ccc(obj), cl2ccc_page(slice));
/* Drop the reference count held in vvp_page_init */
refc = atomic_dec_return(&page->cp_ref);
LASSERTF(refc >= 1, "page = %p, refc = %d\n", page, refc);
ClearPageUptodate(vmpage);
ClearPagePrivate(vmpage);
vmpage->private = 0;
/*
......@@ -404,7 +397,6 @@ static const struct cl_page_operations vvp_page_ops = {
.cpo_vmpage = ccc_page_vmpage,
.cpo_discard = vvp_page_discard,
.cpo_delete = vvp_page_delete,
.cpo_unmap = vvp_page_unmap,
.cpo_export = vvp_page_export,
.cpo_is_vmlocked = vvp_page_is_vmlocked,
.cpo_fini = vvp_page_fini,
......@@ -541,6 +533,8 @@ int vvp_page_init(const struct lu_env *env, struct cl_object *obj,
INIT_LIST_HEAD(&cpg->cpg_pending_linkage);
if (page->cp_type == CPT_CACHEABLE) {
/* in cache, decref in vvp_page_delete */
atomic_inc(&page->cp_ref);
SetPagePrivate(vmpage);
vmpage->private = (unsigned long)page;
cl_page_slice_add(page, &cpg->cpg_cl, obj, &vvp_page_ops);
......
......@@ -287,7 +287,7 @@ static int lov_delete_empty(const struct lu_env *env, struct lov_object *lov,
lov_layout_wait(env, lov);
cl_object_prune(env, &lov->lo_cl);
cl_locks_prune(env, &lov->lo_cl, 0);
return 0;
}
......@@ -364,7 +364,7 @@ static int lov_delete_raid0(const struct lu_env *env, struct lov_object *lov,
}
}
}
cl_object_prune(env, &lov->lo_cl);
cl_locks_prune(env, &lov->lo_cl, 0);
return 0;
}
......@@ -666,7 +666,6 @@ static int lov_layout_change(const struct lu_env *unused,
const struct lov_layout_operations *old_ops;
const struct lov_layout_operations *new_ops;
struct cl_object_header *hdr = cl_object_header(&lov->lo_cl);
void *cookie;
struct lu_env *env;
int refcheck;
......@@ -691,13 +690,13 @@ static int lov_layout_change(const struct lu_env *unused,
old_ops = &lov_dispatch[lov->lo_type];
new_ops = &lov_dispatch[llt];
cl_object_prune(env, &lov->lo_cl);
result = old_ops->llo_delete(env, lov, &lov->u);
if (result == 0) {
old_ops->llo_fini(env, lov, &lov->u);
LASSERT(atomic_read(&lov->lo_active_ios) == 0);
LASSERT(!hdr->coh_tree.rnode);
LASSERT(hdr->coh_pages == 0);
lov->lo_type = LLT_EMPTY;
result = new_ops->llo_init(env,
......
......@@ -36,6 +36,7 @@
* Implementation of cl_page for LOV layer.
*
* Author: Nikita Danilov <nikita.danilov@sun.com>
* Author: Jinshan Xiong <jinshan.xiong@intel.com>
*/
#define DEBUG_SUBSYSTEM S_LOV
......@@ -179,31 +180,21 @@ int lov_page_init_raid0(const struct lu_env *env, struct cl_object *obj,
cl_page_slice_add(page, &lpg->lps_cl, obj, &lov_page_ops);
sub = lov_sub_get(env, lio, stripe);
if (IS_ERR(sub)) {
rc = PTR_ERR(sub);
goto out;
}
if (IS_ERR(sub))
return PTR_ERR(sub);
subobj = lovsub2cl(r0->lo_sub[stripe]);
subpage = cl_page_find_sub(sub->sub_env, subobj,
cl_index(subobj, suboff), vmpage, page);
lov_sub_put(sub);
if (IS_ERR(subpage)) {
rc = PTR_ERR(subpage);
goto out;
}
if (likely(subpage->cp_parent == page)) {
lu_ref_add(&subpage->cp_reference, "lov", page);
subpage = cl_page_alloc(sub->sub_env, subobj, cl_index(subobj, suboff),
vmpage, page->cp_type);
if (!IS_ERR(subpage)) {
subpage->cp_parent = page;
page->cp_child = subpage;
lpg->lps_invalid = 0;
rc = 0;
} else {
CL_PAGE_DEBUG(D_ERROR, env, page, "parent page\n");
CL_PAGE_DEBUG(D_ERROR, env, subpage, "child page\n");
LASSERT(0);
rc = PTR_ERR(subpage);
}
lov_sub_put(sub);
out:
return rc;
}
......
......@@ -36,6 +36,7 @@
* Client IO.
*
* Author: Nikita Danilov <nikita.danilov@sun.com>
* Author: Jinshan Xiong <jinshan.xiong@intel.com>
*/
#define DEBUG_SUBSYSTEM S_CLASS
......
......@@ -36,6 +36,7 @@
* Client Extent Lock.
*
* Author: Nikita Danilov <nikita.danilov@sun.com>
* Author: Jinshan Xiong <jinshan.xiong@intel.com>
*/
#define DEBUG_SUBSYSTEM S_CLASS
......@@ -1815,128 +1816,6 @@ struct cl_lock *cl_lock_at_pgoff(const struct lu_env *env,
}
EXPORT_SYMBOL(cl_lock_at_pgoff);
/**
* Calculate the page offset at the layer of @lock.
* At the time of this writing, @page is top page and @lock is sub lock.
*/
static pgoff_t pgoff_at_lock(struct cl_page *page, struct cl_lock *lock)
{
struct lu_device_type *dtype;
const struct cl_page_slice *slice;
dtype = lock->cll_descr.cld_obj->co_lu.lo_dev->ld_type;
slice = cl_page_at(page, dtype);
return slice->cpl_page->cp_index;
}
/**
* Check if page @page is covered by an extra lock or discard it.
*/
static int check_and_discard_cb(const struct lu_env *env, struct cl_io *io,
struct cl_page *page, void *cbdata)
{
struct cl_thread_info *info = cl_env_info(env);
struct cl_lock *lock = cbdata;
pgoff_t index = pgoff_at_lock(page, lock);
if (index >= info->clt_fn_index) {
struct cl_lock *tmp;
/* refresh non-overlapped index */
tmp = cl_lock_at_pgoff(env, lock->cll_descr.cld_obj, index,
lock, 1, 0);
if (tmp) {
/* Cache the first-non-overlapped index so as to skip
* all pages within [index, clt_fn_index). This
* is safe because if tmp lock is canceled, it will
* discard these pages.
*/
info->clt_fn_index = tmp->cll_descr.cld_end + 1;
if (tmp->cll_descr.cld_end == CL_PAGE_EOF)
info->clt_fn_index = CL_PAGE_EOF;
cl_lock_put(env, tmp);
} else if (cl_page_own(env, io, page) == 0) {
/* discard the page */
cl_page_unmap(env, io, page);
cl_page_discard(env, io, page);
cl_page_disown(env, io, page);
} else {
LASSERT(page->cp_state == CPS_FREEING);
}
}
info->clt_next_index = index + 1;
return CLP_GANG_OKAY;
}
static int discard_cb(const struct lu_env *env, struct cl_io *io,
struct cl_page *page, void *cbdata)
{
struct cl_thread_info *info = cl_env_info(env);
struct cl_lock *lock = cbdata;
LASSERT(lock->cll_descr.cld_mode >= CLM_WRITE);
KLASSERT(ergo(page->cp_type == CPT_CACHEABLE,
!PageWriteback(cl_page_vmpage(env, page))));
KLASSERT(ergo(page->cp_type == CPT_CACHEABLE,
!PageDirty(cl_page_vmpage(env, page))));
info->clt_next_index = pgoff_at_lock(page, lock) + 1;
if (cl_page_own(env, io, page) == 0) {
/* discard the page */
cl_page_unmap(env, io, page);
cl_page_discard(env, io, page);
cl_page_disown(env, io, page);
} else {
LASSERT(page->cp_state == CPS_FREEING);
}
return CLP_GANG_OKAY;
}
/**
* Discard pages protected by the given lock. This function traverses radix
* tree to find all covering pages and discard them. If a page is being covered
* by other locks, it should remain in cache.
*
* If error happens on any step, the process continues anyway (the reasoning
* behind this being that lock cancellation cannot be delayed indefinitely).
*/
int cl_lock_discard_pages(const struct lu_env *env, struct cl_lock *lock)
{
struct cl_thread_info *info = cl_env_info(env);
struct cl_io *io = &info->clt_io;
struct cl_lock_descr *descr = &lock->cll_descr;
cl_page_gang_cb_t cb;
int res;
int result;
LINVRNT(cl_lock_invariant(env, lock));
io->ci_obj = cl_object_top(descr->cld_obj);
io->ci_ignore_layout = 1;
result = cl_io_init(env, io, CIT_MISC, io->ci_obj);
if (result != 0)
goto out;
cb = descr->cld_mode == CLM_READ ? check_and_discard_cb : discard_cb;
info->clt_fn_index = info->clt_next_index = descr->cld_start;
do {
res = cl_page_gang_lookup(env, descr->cld_obj, io,
info->clt_next_index, descr->cld_end,
cb, (void *)lock);
if (info->clt_next_index > descr->cld_end)
break;
if (res == CLP_GANG_RESCHED)
cond_resched();
} while (res != CLP_GANG_OKAY);
out:
cl_io_fini(env, io);
return result;
}
EXPORT_SYMBOL(cl_lock_discard_pages);
/**
* Eliminate all locks for a given object.
*
......@@ -1951,12 +1830,6 @@ void cl_locks_prune(const struct lu_env *env, struct cl_object *obj, int cancel)
struct cl_lock *lock;
head = cl_object_header(obj);
/*
* If locks are destroyed without cancellation, all pages must be
* already destroyed (as otherwise they will be left unprotected).
*/
LASSERT(ergo(!cancel,
!head->coh_tree.rnode && head->coh_pages == 0));
spin_lock(&head->coh_lock_guard);
while (!list_empty(&head->coh_locks)) {
......@@ -2095,8 +1968,8 @@ void cl_lock_hold_add(const struct lu_env *env, struct cl_lock *lock,
LINVRNT(cl_lock_invariant(env, lock));
LASSERT(lock->cll_state != CLS_FREEING);
cl_lock_hold_mod(env, lock, 1);
cl_lock_get(lock);
cl_lock_hold_mod(env, lock, 1);
lu_ref_add(&lock->cll_holders, scope, source);
lu_ref_add(&lock->cll_reference, scope, source);
}
......
......@@ -36,6 +36,7 @@
* Client Lustre Object.
*
* Author: Nikita Danilov <nikita.danilov@sun.com>
* Author: Jinshan Xiong <jinshan.xiong@intel.com>
*/
/*
......@@ -43,7 +44,6 @@
*
* i_mutex
* PG_locked
* ->coh_page_guard
* ->coh_lock_guard
* ->coh_attr_guard
* ->ls_guard
......@@ -63,8 +63,6 @@
static struct kmem_cache *cl_env_kmem;
/** Lock class of cl_object_header::coh_page_guard */
static struct lock_class_key cl_page_guard_class;
/** Lock class of cl_object_header::coh_lock_guard */
static struct lock_class_key cl_lock_guard_class;
/** Lock class of cl_object_header::coh_attr_guard */
......@@ -81,15 +79,10 @@ int cl_object_header_init(struct cl_object_header *h)
result = lu_object_header_init(&h->coh_lu);
if (result == 0) {
spin_lock_init(&h->coh_page_guard);
spin_lock_init(&h->coh_lock_guard);
spin_lock_init(&h->coh_attr_guard);
lockdep_set_class(&h->coh_page_guard, &cl_page_guard_class);
lockdep_set_class(&h->coh_lock_guard, &cl_lock_guard_class);
lockdep_set_class(&h->coh_attr_guard, &cl_attr_guard_class);
h->coh_pages = 0;
/* XXX hard coded GFP_* mask. */
INIT_RADIX_TREE(&h->coh_tree, GFP_ATOMIC);
INIT_LIST_HEAD(&h->coh_locks);
h->coh_page_bufsize = ALIGN(sizeof(struct cl_page), 8);
}
......@@ -314,6 +307,32 @@ int cl_conf_set(const struct lu_env *env, struct cl_object *obj,
}
EXPORT_SYMBOL(cl_conf_set);
/**
* Prunes caches of pages and locks for this object.
*/
void cl_object_prune(const struct lu_env *env, struct cl_object *obj)
{
struct lu_object_header *top;
struct cl_object *o;
int result;
top = obj->co_lu.lo_header;
result = 0;
list_for_each_entry(o, &top->loh_layers, co_lu.lo_linkage) {
if (o->co_ops->coo_prune) {
result = o->co_ops->coo_prune(env, o);
if (result != 0)
break;
}
}
/* TODO: pruning locks will be moved into layers after cl_lock
* simplification is done
*/
cl_locks_prune(env, obj, 1);
}
EXPORT_SYMBOL(cl_object_prune);
/**
* Helper function removing all object locks, and marking object for
* deletion. All object pages must have been deleted at this point.
......@@ -326,8 +345,6 @@ void cl_object_kill(const struct lu_env *env, struct cl_object *obj)
struct cl_object_header *hdr;
hdr = cl_object_header(obj);
LASSERT(!hdr->coh_tree.rnode);
LASSERT(hdr->coh_pages == 0);
set_bit(LU_OBJECT_HEARD_BANSHEE, &hdr->coh_lu.loh_flags);
/*
......@@ -341,16 +358,6 @@ void cl_object_kill(const struct lu_env *env, struct cl_object *obj)
}
EXPORT_SYMBOL(cl_object_kill);
/**
* Prunes caches of pages and locks for this object.
*/
void cl_object_prune(const struct lu_env *env, struct cl_object *obj)
{
cl_pages_prune(env, obj);
cl_locks_prune(env, obj, 1);
}
EXPORT_SYMBOL(cl_object_prune);
void cache_stats_init(struct cache_stats *cs, const char *name)
{
int i;
......
......@@ -1015,7 +1015,6 @@ static int osc_extent_truncate(struct osc_extent *ext, pgoff_t trunc_index,
lu_ref_add(&page->cp_reference, "truncate", current);
if (cl_page_own(env, io, page) == 0) {
cl_page_unmap(env, io, page);
cl_page_discard(env, io, page);
cl_page_disown(env, io, page);
} else {
......@@ -2136,8 +2135,7 @@ static void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
cl_object_get(obj);
client_obd_list_unlock(&cli->cl_loi_list_lock);
lu_object_ref_add_at(&obj->co_lu, &link, "check",
current);
lu_object_ref_add_at(&obj->co_lu, &link, "check", current);
/* attempt some read/write balancing by alternating between
* reads and writes in an object. The makes_rpc checks here
......@@ -2180,8 +2178,7 @@ static void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
osc_object_unlock(osc);
osc_list_maint(cli, osc);
lu_object_ref_del_at(&obj->co_lu, &link, "check",
current);
lu_object_ref_del_at(&obj->co_lu, &link, "check", current);
cl_object_put(env, obj);
client_obd_list_lock(&cli->cl_loi_list_lock);
......@@ -2994,4 +2991,204 @@ int osc_cache_writeback_range(const struct lu_env *env, struct osc_object *obj,
return result;
}
/**
* Returns a list of pages by a given [start, end] of \a obj.
*
* \param resched If not NULL, then we give up before hogging CPU for too
* long and set *resched = 1, in that case caller should implement a retry
* logic.
*
* Gang tree lookup (radix_tree_gang_lookup()) optimization is absolutely
* crucial in the face of [offset, EOF] locks.
*
* Return at least one page in @queue unless there is no covered page.
*/
int osc_page_gang_lookup(const struct lu_env *env, struct cl_io *io,
struct osc_object *osc, pgoff_t start, pgoff_t end,
osc_page_gang_cbt cb, void *cbdata)
{
struct osc_page *ops;
void **pvec;
pgoff_t idx;
unsigned int nr;
unsigned int i;
unsigned int j;
int res = CLP_GANG_OKAY;
bool tree_lock = true;
idx = start;
pvec = osc_env_info(env)->oti_pvec;
spin_lock(&osc->oo_tree_lock);
while ((nr = radix_tree_gang_lookup(&osc->oo_tree, pvec,
idx, OTI_PVEC_SIZE)) > 0) {
struct cl_page *page;
bool end_of_region = false;
for (i = 0, j = 0; i < nr; ++i) {
ops = pvec[i];
pvec[i] = NULL;
idx = osc_index(ops);
if (idx > end) {
end_of_region = true;
break;
}
page = cl_page_top(ops->ops_cl.cpl_page);
LASSERT(page->cp_type == CPT_CACHEABLE);
if (page->cp_state == CPS_FREEING)
continue;
cl_page_get(page);
lu_ref_add_atomic(&page->cp_reference,
"gang_lookup", current);
pvec[j++] = ops;
}
++idx;
/*
* Here a delicate locking dance is performed. Current thread
* holds a reference to a page, but has to own it before it
* can be placed into queue. Owning implies waiting, so
* radix-tree lock is to be released. After a wait one has to
* check that pages weren't truncated (cl_page_own() returns
* error in the latter case).
*/
spin_unlock(&osc->oo_tree_lock);
tree_lock = false;
for (i = 0; i < j; ++i) {
ops = pvec[i];
if (res == CLP_GANG_OKAY)
res = (*cb)(env, io, ops, cbdata);
page = cl_page_top(ops->ops_cl.cpl_page);
lu_ref_del(&page->cp_reference, "gang_lookup", current);
cl_page_put(env, page);
}
if (nr < OTI_PVEC_SIZE || end_of_region)
break;
if (res == CLP_GANG_OKAY && need_resched())
res = CLP_GANG_RESCHED;
if (res != CLP_GANG_OKAY)
break;
spin_lock(&osc->oo_tree_lock);
tree_lock = true;
}
if (tree_lock)
spin_unlock(&osc->oo_tree_lock);
return res;
}
/**
* Check if page @page is covered by an extra lock or discard it.
*/
static int check_and_discard_cb(const struct lu_env *env, struct cl_io *io,
struct osc_page *ops, void *cbdata)
{
struct osc_thread_info *info = osc_env_info(env);
struct cl_lock *lock = cbdata;
pgoff_t index;
index = osc_index(ops);
if (index >= info->oti_fn_index) {
struct cl_lock *tmp;
struct cl_page *page = cl_page_top(ops->ops_cl.cpl_page);
/* refresh non-overlapped index */
tmp = cl_lock_at_pgoff(env, lock->cll_descr.cld_obj, index,
lock, 1, 0);
if (tmp) {
/* Cache the first-non-overlapped index so as to skip
* all pages within [index, oti_fn_index). This
* is safe because if tmp lock is canceled, it will
* discard these pages.
*/
info->oti_fn_index = tmp->cll_descr.cld_end + 1;
if (tmp->cll_descr.cld_end == CL_PAGE_EOF)
info->oti_fn_index = CL_PAGE_EOF;
cl_lock_put(env, tmp);
} else if (cl_page_own(env, io, page) == 0) {
/* discard the page */
cl_page_discard(env, io, page);
cl_page_disown(env, io, page);
} else {
LASSERT(page->cp_state == CPS_FREEING);
}
}
info->oti_next_index = index + 1;
return CLP_GANG_OKAY;
}
static int discard_cb(const struct lu_env *env, struct cl_io *io,
struct osc_page *ops, void *cbdata)
{
struct osc_thread_info *info = osc_env_info(env);
struct cl_lock *lock = cbdata;
struct cl_page *page = cl_page_top(ops->ops_cl.cpl_page);
LASSERT(lock->cll_descr.cld_mode >= CLM_WRITE);
KLASSERT(ergo(page->cp_type == CPT_CACHEABLE,
!PageWriteback(cl_page_vmpage(env, page))));
KLASSERT(ergo(page->cp_type == CPT_CACHEABLE,
!PageDirty(cl_page_vmpage(env, page))));
/* page is top page. */
info->oti_next_index = osc_index(ops) + 1;
if (cl_page_own(env, io, page) == 0) {
/* discard the page */
cl_page_discard(env, io, page);
cl_page_disown(env, io, page);
} else {
LASSERT(page->cp_state == CPS_FREEING);
}
return CLP_GANG_OKAY;
}
/**
* Discard pages protected by the given lock. This function traverses radix
* tree to find all covering pages and discard them. If a page is being covered
* by other locks, it should remain in cache.
*
* If error happens on any step, the process continues anyway (the reasoning
* behind this being that lock cancellation cannot be delayed indefinitely).
*/
int osc_lock_discard_pages(const struct lu_env *env, struct osc_lock *ols)
{
struct osc_thread_info *info = osc_env_info(env);
struct cl_io *io = &info->oti_io;
struct cl_object *osc = ols->ols_cl.cls_obj;
struct cl_lock *lock = ols->ols_cl.cls_lock;
struct cl_lock_descr *descr = &lock->cll_descr;
osc_page_gang_cbt cb;
int res;
int result;
io->ci_obj = cl_object_top(osc);
io->ci_ignore_layout = 1;
result = cl_io_init(env, io, CIT_MISC, io->ci_obj);
if (result != 0)
goto out;
cb = descr->cld_mode == CLM_READ ? check_and_discard_cb : discard_cb;
info->oti_fn_index = info->oti_next_index = descr->cld_start;
do {
res = osc_page_gang_lookup(env, io, cl2osc(osc),
info->oti_next_index, descr->cld_end,
cb, (void *)lock);
if (info->oti_next_index > descr->cld_end)
break;
if (res == CLP_GANG_RESCHED)
cond_resched();
} while (res != CLP_GANG_OKAY);
out:
cl_io_fini(env, io);
return result;
}
/** @} osc */
......@@ -111,7 +111,12 @@ struct osc_thread_info {
struct lustre_handle oti_handle;
struct cl_page_list oti_plist;
struct cl_io oti_io;
struct cl_page *oti_pvec[OTI_PVEC_SIZE];
void *oti_pvec[OTI_PVEC_SIZE];
/**
* Fields used by cl_lock_discard_pages().
*/
pgoff_t oti_next_index;
pgoff_t oti_fn_index; /* first non-overlapped index */
};
struct osc_object {
......@@ -161,6 +166,13 @@ struct osc_object {
* oo_{read|write}_pages soon.
*/
spinlock_t oo_lock;
/**
* Radix tree for caching pages
*/
struct radix_tree_root oo_tree;
spinlock_t oo_tree_lock;
unsigned long oo_npages;
};
static inline void osc_object_lock(struct osc_object *obj)
......@@ -569,6 +581,11 @@ static inline struct osc_page *oap2osc_page(struct osc_async_page *oap)
return (struct osc_page *)container_of(oap, struct osc_page, ops_oap);
}
static inline pgoff_t osc_index(struct osc_page *opg)
{
return opg->ops_cl.cpl_page->cp_index;
}
static inline struct osc_lock *cl2osc_lock(const struct cl_lock_slice *slice)
{
LINVRNT(osc_is_object(&slice->cls_obj->co_lu));
......@@ -691,6 +708,14 @@ int osc_extent_finish(const struct lu_env *env, struct osc_extent *ext,
int sent, int rc);
void osc_extent_release(const struct lu_env *env, struct osc_extent *ext);
int osc_lock_discard_pages(const struct lu_env *env, struct osc_lock *lock);
typedef int (*osc_page_gang_cbt)(const struct lu_env *, struct cl_io *,
struct osc_page *, void *);
int osc_page_gang_lookup(const struct lu_env *env, struct cl_io *io,
struct osc_object *osc, pgoff_t start, pgoff_t end,
osc_page_gang_cbt cb, void *cbdata);
/** @} osc */
#endif /* OSC_CL_INTERNAL_H */
......@@ -391,18 +391,13 @@ static int osc_async_upcall(void *a, int rc)
* Checks that there are no pages being written in the extent being truncated.
*/
static int trunc_check_cb(const struct lu_env *env, struct cl_io *io,
struct cl_page *page, void *cbdata)
struct osc_page *ops, void *cbdata)
{
const struct cl_page_slice *slice;
struct osc_page *ops;
struct cl_page *page = ops->ops_cl.cpl_page;
struct osc_async_page *oap;
__u64 start = *(__u64 *)cbdata;
slice = cl_page_at(page, &osc_device_type);
LASSERT(slice);
ops = cl2osc_page(slice);
oap = &ops->ops_oap;
if (oap->oap_cmd & OBD_BRW_WRITE &&
!list_empty(&oap->oap_pending_item))
CL_PAGE_DEBUG(D_ERROR, env, page, "exists %llu/%s.\n",
......@@ -434,8 +429,9 @@ static void osc_trunc_check(const struct lu_env *env, struct cl_io *io,
/*
* Complain if there are pages in the truncated region.
*/
cl_page_gang_lookup(env, clob, io, start + partial, CL_PAGE_EOF,
trunc_check_cb, (void *)&size);
osc_page_gang_lookup(env, io, cl2osc(clob),
start + partial, CL_PAGE_EOF,
trunc_check_cb, (void *)&size);
}
static int osc_io_setattr_start(const struct lu_env *env,
......
......@@ -36,6 +36,7 @@
* Implementation of cl_lock for OSC layer.
*
* Author: Nikita Danilov <nikita.danilov@sun.com>
* Author: Jinshan Xiong <jinshan.xiong@intel.com>
*/
#define DEBUG_SUBSYSTEM S_OSC
......@@ -897,11 +898,8 @@ static int osc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data)
static unsigned long osc_lock_weigh(const struct lu_env *env,
const struct cl_lock_slice *slice)
{
/*
* don't need to grab coh_page_guard since we don't care the exact #
* of pages..
*/
return cl_object_header(slice->cls_obj)->coh_pages;
/* TODO: check how many pages are covered by this lock */
return cl2osc(slice->cls_obj)->oo_npages;
}
static void osc_lock_build_einfo(const struct lu_env *env,
......@@ -1276,7 +1274,7 @@ static int osc_lock_flush(struct osc_lock *ols, int discard)
result = 0;
}
rc = cl_lock_discard_pages(env, lock);
rc = osc_lock_discard_pages(env, ols);
if (result == 0 && rc < 0)
result = rc;
......
......@@ -36,6 +36,7 @@
* Implementation of cl_object for OSC layer.
*
* Author: Nikita Danilov <nikita.danilov@sun.com>
* Author: Jinshan Xiong <jinshan.xiong@intel.com>
*/
#define DEBUG_SUBSYSTEM S_OSC
......@@ -94,6 +95,7 @@ static int osc_object_init(const struct lu_env *env, struct lu_object *obj,
atomic_set(&osc->oo_nr_reads, 0);
atomic_set(&osc->oo_nr_writes, 0);
spin_lock_init(&osc->oo_lock);
spin_lock_init(&osc->oo_tree_lock);
cl_object_page_init(lu2cl(obj), sizeof(struct osc_page));
......
......@@ -36,6 +36,7 @@
* Implementation of cl_page for OSC layer.
*
* Author: Nikita Danilov <nikita.danilov@sun.com>
* Author: Jinshan Xiong <jinshan.xiong@intel.com>
*/
#define DEBUG_SUBSYSTEM S_OSC
......@@ -326,6 +327,18 @@ static void osc_page_delete(const struct lu_env *env,
spin_unlock(&obj->oo_seatbelt);
osc_lru_del(osc_cli(obj), opg);
if (slice->cpl_page->cp_type == CPT_CACHEABLE) {
void *value;
spin_lock(&obj->oo_tree_lock);
value = radix_tree_delete(&obj->oo_tree, osc_index(opg));
if (value)
--obj->oo_npages;
spin_unlock(&obj->oo_tree_lock);
LASSERT(ergo(value, value == opg));
}
}
static void osc_page_clip(const struct lu_env *env,
......@@ -422,8 +435,18 @@ int osc_page_init(const struct lu_env *env, struct cl_object *obj,
INIT_LIST_HEAD(&opg->ops_lru);
/* reserve an LRU space for this page */
if (page->cp_type == CPT_CACHEABLE && result == 0)
if (page->cp_type == CPT_CACHEABLE && result == 0) {
result = osc_lru_reserve(env, osc, opg);
if (result == 0) {
spin_lock(&osc->oo_tree_lock);
result = radix_tree_insert(&osc->oo_tree,
page->cp_index, opg);
if (result == 0)
++osc->oo_npages;
spin_unlock(&osc->oo_tree_lock);
LASSERT(result == 0);
}
}
return result;
}
......@@ -611,7 +634,6 @@ static void discard_pagevec(const struct lu_env *env, struct cl_io *io,
struct cl_page *page = pvec[i];
LASSERT(cl_page_is_owned(page, io));
cl_page_unmap(env, io, page);
cl_page_discard(env, io, page);
cl_page_disown(env, io, page);
cl_page_put(env, page);
......@@ -652,7 +674,7 @@ int osc_lru_shrink(const struct lu_env *env, struct client_obd *cli,
atomic_inc(&cli->cl_lru_shrinkers);
}
pvec = osc_env_info(env)->oti_pvec;
pvec = (struct cl_page **)osc_env_info(env)->oti_pvec;
io = &osc_env_info(env)->oti_io;
client_obd_list_lock(&cli->cl_lru_list_lock);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment