Commit 06563b56 authored by Jinshan Xiong, committed by Greg Kroah-Hartman

staging/lustre/clio: cl_lock simplification

In this patch, the cl_lock cache is eliminated. cl_lock becomes a
cacheless data container that describes the lock requirements needed
to complete an I/O. A cl_lock is created before the I/O starts and
destroyed when the I/O is complete.

cl_lock depends on an LDLM lock to provide the actual lock semantics.
The LDLM lock is attached to the cl_lock at the OSC layer and remains
cacheable.
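
As an illustration, the OSC slice of a cl_lock (struct osc_lock) keeps
the attached LDLM lock in ols_dlmlock; the osc_extent_find() hunk in
this patch shows how an OSC extent then pins that DLM lock with its
own reference:

	if (olck->ols_dlmlock) {
		LASSERT(olck->ols_hold);
		/* extra reference so the extent keeps the DLM lock alive */
		cur->oe_dlmlock = LDLM_LOCK_GET(olck->ols_dlmlock);
		lu_ref_add(&olck->ols_dlmlock->l_reference, "osc_extent", cur);
	}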

Two major methods are supported for cl_lock: clo_enqueue and
clo_cancel. A cl_lock is enqueued by cl_lock_request(), which calls
the clo_enqueue() method of each layer to enqueue the lock. At the
LOV layer, if a cl_lock consists of multiple sub cl_locks, each
sub-lock is enqueued correspondingly. At the OSC layer, the enqueue
request first tries to reuse a cached LDLM lock; otherwise a new LDLM
lock has to be requested from the OST.
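
A minimal caller-side sketch, pieced together from the cl_local_size()
and cl_get_grouplock() hunks in this patch ("clob" and "io" stand for
the caller's cl_object and cl_io; the mode and enqueue flags are only
illustrative):

	struct cl_lock *lock = ccc_env_lock(env); /* zeroed, per-thread cl_lock */
	int rc;

	lock->cll_descr.cld_obj       = clob;
	lock->cll_descr.cld_start     = 0;
	lock->cll_descr.cld_end       = CL_PAGE_EOF;
	lock->cll_descr.cld_mode      = CLM_READ;
	lock->cll_descr.cld_enq_flags = CEF_MUST;

	rc = cl_lock_request(env, io, lock); /* calls clo_enqueue() on each layer */
	if (rc < 0)
		return rc;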

cl_lock_cancel() must be called to release a cl_lock after use. It
calls the clo_cancel() method of each layer to release the resources
held by this lock. At the OSC layer, the reference on the LDLM lock,
taken at clo_enqueue time, is dropped.

An LDLM lock can only be canceled once no cl_lock is using it.
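
In the hunks below, callers drop the lock with cl_lock_release() once
the I/O is finished. Continuing the sketch above (assuming
cl_lock_release() performs the per-layer clo_cancel() described here):

	/* ... do the I/O under the lock ... */

	cl_lock_release(env, lock); /* OSC drops its LDLM reference; the
				     * LDLM lock itself may stay cached */
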
Signed-off-by: Bobi Jam <bobijam.xu@intel.com>
Signed-off-by: Jinshan Xiong <jinshan.xiong@intel.com>
Reviewed-on: http://review.whamcloud.com/10858
Intel-bug-id: https://jira.hpdd.intel.com/browse/LU-3259
Reviewed-by: John L. Hammond <john.hammond@intel.com>
Signed-off-by: Oleg Drokin <green@linuxhacker.ru>
Signed-off-by: Greg Kroah-Hartman <gregkh@linuxfoundation.org>
parent e5c4e635
......@@ -98,10 +98,6 @@ struct ccc_io {
int cui_to;
} write;
} u;
/**
* True iff io is processing glimpse right now.
*/
int cui_glimpse;
/**
* Layout version when this IO is initialized
*/
......@@ -123,6 +119,7 @@ extern struct lu_context_key ccc_key;
extern struct lu_context_key ccc_session_key;
struct ccc_thread_info {
struct cl_lock cti_lock;
struct cl_lock_descr cti_descr;
struct cl_io cti_io;
struct cl_attr cti_attr;
......@@ -137,6 +134,14 @@ static inline struct ccc_thread_info *ccc_env_info(const struct lu_env *env)
return info;
}
static inline struct cl_lock *ccc_env_lock(const struct lu_env *env)
{
struct cl_lock *lock = &ccc_env_info(env)->cti_lock;
memset(lock, 0, sizeof(*lock));
return lock;
}
static inline struct cl_attr *ccc_env_thread_attr(const struct lu_env *env)
{
struct cl_attr *attr = &ccc_env_info(env)->cti_attr;
......@@ -308,18 +313,7 @@ void ccc_lock_delete(const struct lu_env *env,
void ccc_lock_fini(const struct lu_env *env, struct cl_lock_slice *slice);
int ccc_lock_enqueue(const struct lu_env *env,
const struct cl_lock_slice *slice,
struct cl_io *io, __u32 enqflags);
int ccc_lock_use(const struct lu_env *env, const struct cl_lock_slice *slice);
int ccc_lock_unuse(const struct lu_env *env, const struct cl_lock_slice *slice);
int ccc_lock_wait(const struct lu_env *env, const struct cl_lock_slice *slice);
int ccc_lock_fits_into(const struct lu_env *env,
const struct cl_lock_slice *slice,
const struct cl_lock_descr *need,
const struct cl_io *io);
void ccc_lock_state(const struct lu_env *env,
const struct cl_lock_slice *slice,
enum cl_lock_state state);
struct cl_io *io, struct cl_sync_io *anchor);
int ccc_io_one_lock_index(const struct lu_env *env, struct cl_io *io,
__u32 enqflags, enum cl_lock_mode mode,
pgoff_t start, pgoff_t end);
......
......@@ -2582,6 +2582,8 @@ struct ldlm_extent {
__u64 gid;
};
#define LDLM_GID_ANY ((__u64)-1)
static inline int ldlm_extent_overlap(struct ldlm_extent *ex1,
struct ldlm_extent *ex2)
{
......
......@@ -71,6 +71,7 @@ struct obd_device;
*/
enum ldlm_error {
ELDLM_OK = 0,
ELDLM_LOCK_MATCHED = 1,
ELDLM_LOCK_CHANGED = 300,
ELDLM_LOCK_ABORTED = 301,
......
......@@ -748,6 +748,7 @@ int ldlm_error2errno(enum ldlm_error error)
switch (error) {
case ELDLM_OK:
case ELDLM_LOCK_MATCHED:
result = 0;
break;
case ELDLM_LOCK_CHANGED:
......
......@@ -657,7 +657,7 @@ void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode)
struct ldlm_lock *lock;
lock = ldlm_handle2lock(lockh);
LASSERT(lock);
LASSERTF(lock, "Non-existing lock: %llx\n", lockh->cookie);
ldlm_lock_addref_internal(lock, mode);
LDLM_LOCK_PUT(lock);
}
......@@ -1092,6 +1092,7 @@ static struct ldlm_lock *search_queue(struct list_head *queue,
if (unlikely(match == LCK_GROUP) &&
lock->l_resource->lr_type == LDLM_EXTENT &&
policy->l_extent.gid != LDLM_GID_ANY &&
lock->l_policy_data.l_extent.gid != policy->l_extent.gid)
continue;
......
......@@ -347,7 +347,6 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
struct ldlm_lock *lock;
struct ldlm_reply *reply;
int cleanup_phase = 1;
int size = 0;
lock = ldlm_handle2lock(lockh);
/* ldlm_cli_enqueue is holding a reference on this lock. */
......@@ -375,8 +374,8 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
goto cleanup;
}
if (lvb_len != 0) {
LASSERT(lvb);
if (lvb_len > 0) {
int size = 0;
size = req_capsule_get_size(&req->rq_pill, &RMF_DLM_LVB,
RCL_SERVER);
......@@ -390,12 +389,13 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
rc = -EINVAL;
goto cleanup;
}
lvb_len = size;
}
if (rc == ELDLM_LOCK_ABORTED) {
if (lvb_len != 0)
if (lvb_len > 0 && lvb)
rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_SERVER,
lvb, size);
lvb, lvb_len);
if (rc == 0)
rc = ELDLM_LOCK_ABORTED;
goto cleanup;
......@@ -489,7 +489,7 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
/* If the lock has already been granted by a completion AST, don't
* clobber the LVB with an older one.
*/
if (lvb_len != 0) {
if (lvb_len > 0) {
/* We must lock or a racing completion might update lvb without
* letting us know and we'll clobber the correct value.
* Cannot unlock after the check either, as that still leaves
......@@ -498,7 +498,7 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
lock_res_and_lock(lock);
if (lock->l_req_mode != lock->l_granted_mode)
rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_SERVER,
lock->l_lvb_data, size);
lock->l_lvb_data, lvb_len);
unlock_res_and_lock(lock);
if (rc < 0) {
cleanup_phase = 1;
......@@ -518,7 +518,7 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
}
}
if (lvb_len && lvb) {
if (lvb_len > 0 && lvb) {
/* Copy the LVB here, and not earlier, because the completion
* AST (if any) can override what we got in the reply
*/
......
......@@ -1400,3 +1400,4 @@ void ldlm_resource_dump(int level, struct ldlm_resource *res)
LDLM_DEBUG_LIMIT(level, lock, "###");
}
}
EXPORT_SYMBOL(ldlm_resource_dump);
......@@ -86,17 +86,17 @@ blkcnt_t dirty_cnt(struct inode *inode)
int cl_glimpse_lock(const struct lu_env *env, struct cl_io *io,
struct inode *inode, struct cl_object *clob, int agl)
{
struct cl_lock_descr *descr = &ccc_env_info(env)->cti_descr;
struct ll_inode_info *lli = ll_i2info(inode);
const struct lu_fid *fid = lu_object_fid(&clob->co_lu);
struct ccc_io *cio = ccc_env_io(env);
struct cl_lock *lock;
int result;
result = 0;
if (!(lli->lli_flags & LLIF_MDS_SIZE_LOCK)) {
CDEBUG(D_DLMTRACE, "Glimpsing inode " DFID "\n", PFID(fid));
if (lli->lli_has_smd) {
struct cl_lock *lock = ccc_env_lock(env);
struct cl_lock_descr *descr = &lock->cll_descr;
/* NOTE: this looks like DLM lock request, but it may
* not be one. Due to CEF_ASYNC flag (translated
* to LDLM_FL_HAS_INTENT by osc), this is
......@@ -113,11 +113,10 @@ int cl_glimpse_lock(const struct lu_env *env, struct cl_io *io,
*/
*descr = whole_file;
descr->cld_obj = clob;
descr->cld_mode = CLM_PHANTOM;
descr->cld_mode = CLM_READ;
descr->cld_enq_flags = CEF_ASYNC | CEF_MUST;
if (agl)
descr->cld_enq_flags |= CEF_AGL;
cio->cui_glimpse = 1;
/*
* CEF_ASYNC is used because glimpse sub-locks cannot
* deadlock (because they never conflict with other
......@@ -126,19 +125,11 @@ int cl_glimpse_lock(const struct lu_env *env, struct cl_io *io,
* CEF_MUST protects glimpse lock from conversion into
* a lockless mode.
*/
lock = cl_lock_request(env, io, descr, "glimpse",
current);
cio->cui_glimpse = 0;
if (!lock)
return 0;
if (IS_ERR(lock))
return PTR_ERR(lock);
result = cl_lock_request(env, io, lock);
if (result < 0)
return result;
LASSERT(agl == 0);
result = cl_wait(env, lock);
if (result == 0) {
if (!agl) {
ll_merge_attr(env, inode);
if (i_size_read(inode) > 0 &&
inode->i_blocks == 0) {
......@@ -150,9 +141,8 @@ int cl_glimpse_lock(const struct lu_env *env, struct cl_io *io,
*/
inode->i_blocks = dirty_cnt(inode);
}
cl_unuse(env, lock);
}
cl_lock_release(env, lock, "glimpse", current);
cl_lock_release(env, lock);
} else {
CDEBUG(D_DLMTRACE, "No objects for inode\n");
ll_merge_attr(env, inode);
......@@ -233,10 +223,7 @@ int cl_local_size(struct inode *inode)
{
struct lu_env *env = NULL;
struct cl_io *io = NULL;
struct ccc_thread_info *cti;
struct cl_object *clob;
struct cl_lock_descr *descr;
struct cl_lock *lock;
int result;
int refcheck;
......@@ -252,19 +239,15 @@ int cl_local_size(struct inode *inode)
if (result > 0) {
result = io->ci_result;
} else if (result == 0) {
cti = ccc_env_info(env);
descr = &cti->cti_descr;
struct cl_lock *lock = ccc_env_lock(env);
*descr = whole_file;
descr->cld_obj = clob;
lock = cl_lock_peek(env, io, descr, "localsize", current);
if (lock) {
lock->cll_descr = whole_file;
lock->cll_descr.cld_enq_flags = CEF_PEEK;
lock->cll_descr.cld_obj = clob;
result = cl_lock_request(env, io, lock);
if (result == 0) {
ll_merge_attr(env, inode);
cl_unuse(env, lock);
cl_lock_release(env, lock, "localsize", current);
result = 0;
} else {
result = -ENODATA;
cl_lock_release(env, lock);
}
}
cl_io_fini(env, io);
......
......@@ -475,12 +475,6 @@ int ccc_transient_page_prep(const struct lu_env *env,
*
*/
void ccc_lock_delete(const struct lu_env *env,
const struct cl_lock_slice *slice)
{
CLOBINVRNT(env, slice->cls_obj, ccc_object_invariant(slice->cls_obj));
}
void ccc_lock_fini(const struct lu_env *env, struct cl_lock_slice *slice)
{
struct ccc_lock *clk = cl2ccc_lock(slice);
......@@ -490,111 +484,12 @@ void ccc_lock_fini(const struct lu_env *env, struct cl_lock_slice *slice)
int ccc_lock_enqueue(const struct lu_env *env,
const struct cl_lock_slice *slice,
struct cl_io *unused, __u32 enqflags)
{
CLOBINVRNT(env, slice->cls_obj, ccc_object_invariant(slice->cls_obj));
return 0;
}
int ccc_lock_use(const struct lu_env *env, const struct cl_lock_slice *slice)
{
CLOBINVRNT(env, slice->cls_obj, ccc_object_invariant(slice->cls_obj));
return 0;
}
int ccc_lock_unuse(const struct lu_env *env, const struct cl_lock_slice *slice)
struct cl_io *unused, struct cl_sync_io *anchor)
{
CLOBINVRNT(env, slice->cls_obj, ccc_object_invariant(slice->cls_obj));
return 0;
}
int ccc_lock_wait(const struct lu_env *env, const struct cl_lock_slice *slice)
{
CLOBINVRNT(env, slice->cls_obj, ccc_object_invariant(slice->cls_obj));
return 0;
}
/**
* Implementation of cl_lock_operations::clo_fits_into() methods for ccc
* layer. This function is executed every time io finds an existing lock in
* the lock cache while creating new lock. This function has to decide whether
* cached lock "fits" into io.
*
* \param slice lock to be checked
* \param io IO that wants a lock.
*
* \see lov_lock_fits_into().
*/
int ccc_lock_fits_into(const struct lu_env *env,
const struct cl_lock_slice *slice,
const struct cl_lock_descr *need,
const struct cl_io *io)
{
const struct cl_lock *lock = slice->cls_lock;
const struct cl_lock_descr *descr = &lock->cll_descr;
const struct ccc_io *cio = ccc_env_io(env);
int result;
/*
* Work around DLM peculiarity: it assumes that glimpse
* (LDLM_FL_HAS_INTENT) lock is always LCK_PR, and returns reads lock
* when asked for LCK_PW lock with LDLM_FL_HAS_INTENT flag set. Make
* sure that glimpse doesn't get CLM_WRITE top-lock, so that it
* doesn't enqueue CLM_WRITE sub-locks.
*/
if (cio->cui_glimpse)
result = descr->cld_mode != CLM_WRITE;
/*
* Also, don't match incomplete write locks for read, otherwise read
* would enqueue missing sub-locks in the write mode.
*/
else if (need->cld_mode != descr->cld_mode)
result = lock->cll_state >= CLS_ENQUEUED;
else
result = 1;
return result;
}
/**
* Implements cl_lock_operations::clo_state() method for ccc layer, invoked
* whenever lock state changes. Transfers object attributes, that might be
* updated as a result of lock acquiring into inode.
*/
void ccc_lock_state(const struct lu_env *env,
const struct cl_lock_slice *slice,
enum cl_lock_state state)
{
struct cl_lock *lock = slice->cls_lock;
/*
* Refresh inode attributes when the lock is moving into CLS_HELD
* state, and only when this is a result of real enqueue, rather than
* of finding lock in the cache.
*/
if (state == CLS_HELD && lock->cll_state < CLS_HELD) {
struct cl_object *obj;
struct inode *inode;
obj = slice->cls_obj;
inode = ccc_object_inode(obj);
/* vmtruncate() sets the i_size
* under both a DLM lock and the
* ll_inode_size_lock(). If we don't get the
* ll_inode_size_lock() here we can match the DLM lock and
* reset i_size. generic_file_write can then trust the
* stale i_size when doing appending writes and effectively
* cancel the result of the truncate. Getting the
* ll_inode_size_lock() after the enqueue maintains the DLM
* -> ll_inode_size_lock() acquiring order.
*/
if (lock->cll_descr.cld_start == 0 &&
lock->cll_descr.cld_end == CL_PAGE_EOF)
ll_merge_attr(env, inode);
}
}
/*****************************************************************************
*
* io operations.
......
......@@ -145,7 +145,7 @@ int cl_get_grouplock(struct cl_object *obj, unsigned long gid, int nonblock,
io->ci_ignore_layout = 1;
rc = cl_io_init(env, io, CIT_MISC, io->ci_obj);
if (rc) {
if (rc != 0) {
cl_io_fini(env, io);
cl_env_put(env, &refcheck);
/* Does not make sense to take GL for released layout */
......@@ -154,7 +154,8 @@ int cl_get_grouplock(struct cl_object *obj, unsigned long gid, int nonblock,
return rc;
}
descr = &ccc_env_info(env)->cti_descr;
lock = ccc_env_lock(env);
descr = &lock->cll_descr;
descr->cld_obj = obj;
descr->cld_start = 0;
descr->cld_end = CL_PAGE_EOF;
......@@ -164,11 +165,11 @@ int cl_get_grouplock(struct cl_object *obj, unsigned long gid, int nonblock,
enqflags = CEF_MUST | (nonblock ? CEF_NONBLOCK : 0);
descr->cld_enq_flags = enqflags;
lock = cl_lock_request(env, io, descr, GROUPLOCK_SCOPE, current);
if (IS_ERR(lock)) {
rc = cl_lock_request(env, io, lock);
if (rc < 0) {
cl_io_fini(env, io);
cl_env_put(env, &refcheck);
return PTR_ERR(lock);
return rc;
}
cg->cg_env = cl_env_get(&refcheck);
......@@ -194,8 +195,7 @@ void cl_put_grouplock(struct ccc_grouplock *cg)
cl_env_implant(env, &refcheck);
cl_env_put(env, &refcheck);
cl_unuse(env, lock);
cl_lock_release(env, lock, GROUPLOCK_SCOPE, current);
cl_lock_release(env, lock);
cl_io_fini(env, io);
cl_env_put(env, NULL);
}
......@@ -150,8 +150,7 @@ static int ll_releasepage(struct page *vmpage, gfp_t gfp_mask)
* If this page holds the last refc of cl_object, the following
* call path may cause reschedule:
* cl_page_put -> cl_page_free -> cl_object_put ->
* lu_object_put -> lu_object_free -> lov_delete_raid0 ->
* cl_locks_prune.
* lu_object_put -> lu_object_free -> lov_delete_raid0.
*
* However, the kernel can't get rid of this inode until all pages have
* been cleaned up. Now that we hold page lock here, it's pretty safe
......
......@@ -233,7 +233,7 @@ static int vvp_mmap_locks(const struct lu_env *env,
ldlm_policy_data_t policy;
unsigned long addr;
ssize_t count;
int result;
int result = 0;
struct iov_iter i;
struct iovec iov;
......@@ -265,10 +265,10 @@ static int vvp_mmap_locks(const struct lu_env *env,
if (ll_file_nolock(vma->vm_file)) {
/*
* For no lock case, a lockless lock will be
* generated.
* The no-lock case is not allowed for mmap
*/
flags = CEF_NEVER;
result = -EINVAL;
break;
}
/*
......@@ -290,10 +290,8 @@ static int vvp_mmap_locks(const struct lu_env *env,
descr->cld_mode, descr->cld_start,
descr->cld_end);
if (result < 0) {
up_read(&mm->mmap_sem);
return result;
}
if (result < 0)
break;
if (vma->vm_end - addr >= count)
break;
......@@ -302,8 +300,10 @@ static int vvp_mmap_locks(const struct lu_env *env,
addr = vma->vm_end;
}
up_read(&mm->mmap_sem);
if (result < 0)
break;
}
return 0;
return result;
}
static int vvp_io_rw_lock(const struct lu_env *env, struct cl_io *io,
......@@ -781,6 +781,7 @@ static int vvp_io_write_start(const struct lu_env *env,
* PARALLEL IO This has to be changed for parallel IO doing
* out-of-order writes.
*/
ll_merge_attr(env, inode);
pos = io->u.ci_wr.wr.crw_pos = i_size_read(inode);
cio->cui_iocb->ki_pos = pos;
} else {
......
......@@ -51,32 +51,9 @@
*
*/
/**
* Estimates lock value for the purpose of managing the lock cache during
* memory shortages.
*
* Locks for memory mapped files are almost infinitely precious, others are
* junk. "Mapped locks" are heavy, but not infinitely heavy, so that they are
* ordered within themselves by weights assigned from other layers.
*/
static unsigned long vvp_lock_weigh(const struct lu_env *env,
const struct cl_lock_slice *slice)
{
struct ccc_object *cob = cl2ccc(slice->cls_obj);
return atomic_read(&cob->cob_mmap_cnt) > 0 ? ~0UL >> 2 : 0;
}
static const struct cl_lock_operations vvp_lock_ops = {
.clo_delete = ccc_lock_delete,
.clo_fini = ccc_lock_fini,
.clo_enqueue = ccc_lock_enqueue,
.clo_wait = ccc_lock_wait,
.clo_use = ccc_lock_use,
.clo_unuse = ccc_lock_unuse,
.clo_fits_into = ccc_lock_fits_into,
.clo_state = ccc_lock_state,
.clo_weigh = vvp_lock_weigh
.clo_enqueue = ccc_lock_enqueue
};
int vvp_lock_init(const struct lu_env *env, struct cl_object *obj,
......
......@@ -170,11 +170,15 @@ static int vvp_prune(const struct lu_env *env, struct cl_object *obj)
struct inode *inode = ccc_object_inode(obj);
int rc;
rc = cl_sync_file_range(inode, 0, OBD_OBJECT_EOF, CL_FSYNC_ALL, 1);
if (rc == 0)
truncate_inode_pages(inode->i_mapping, 0);
rc = cl_sync_file_range(inode, 0, OBD_OBJECT_EOF, CL_FSYNC_LOCAL, 1);
if (rc < 0) {
CDEBUG(D_VFSTRACE, DFID ": writeback failed: %d\n",
PFID(lu_object_fid(&obj->co_lu)), rc);
return rc;
}
truncate_inode_pages(inode->i_mapping, 0);
return 0;
}
static const struct cl_object_operations vvp_ops = {
......
......@@ -280,25 +280,18 @@ struct lov_object {
struct task_struct *lo_owner;
};
/**
* Flags that top-lock can set on each of its sub-locks.
*/
enum lov_sub_flags {
/** Top-lock acquired a hold (cl_lock_hold()) on a sub-lock. */
LSF_HELD = 1 << 0
};
/**
* State lov_lock keeps for each sub-lock.
*/
struct lov_lock_sub {
/** sub-lock itself */
struct lovsub_lock *sub_lock;
/** An array of per-sub-lock flags, taken from enum lov_sub_flags */
unsigned sub_flags;
struct cl_lock sub_lock;
/** Set if the sublock has ever been enqueued, meaning it may
* hold resources of underlying layers
*/
unsigned int sub_is_enqueued:1,
sub_initialized:1;
int sub_stripe;
struct cl_lock_descr sub_descr;
struct cl_lock_descr sub_got;
};
/**
......@@ -308,59 +301,8 @@ struct lov_lock {
struct cl_lock_slice lls_cl;
/** Number of sub-locks in this lock */
int lls_nr;
/**
* Number of existing sub-locks.
*/
unsigned lls_nr_filled;
/**
* Set when sub-lock was canceled, while top-lock was being
* used, or unused.
*/
unsigned int lls_cancel_race:1;
/**
* An array of sub-locks
*
* There are two issues with managing sub-locks:
*
* - sub-locks are concurrently canceled, and
*
* - sub-locks are shared with other top-locks.
*
* To manage cancellation, top-lock acquires a hold on a sublock
* (lov_sublock_adopt()) when the latter is inserted into
* lov_lock::lls_sub[]. This hold is released (lov_sublock_release())
* when top-lock is going into CLS_CACHED state or destroyed. Hold
* prevents sub-lock from cancellation.
*
* Sub-lock sharing means, among other things, that top-lock that is
* in the process of creation (i.e., not yet inserted into lock list)
* is already accessible to other threads once at least one of its
* sub-locks is created, see lov_lock_sub_init().
*
* Sub-lock can be in one of the following states:
*
* - doesn't exist, lov_lock::lls_sub[]::sub_lock == NULL. Such
* sub-lock was either never created (top-lock is in CLS_NEW
* state), or it was created, then canceled, then destroyed
* (lov_lock_unlink() cleared sub-lock pointer in the top-lock).
*
* - sub-lock exists and is on
* hold. (lov_lock::lls_sub[]::sub_flags & LSF_HELD). This is a
* normal state of a sub-lock in CLS_HELD and CLS_CACHED states
* of a top-lock.
*
* - sub-lock exists, but is not held by the top-lock. This
* happens after top-lock released a hold on sub-locks before
* going into cache (lov_lock_unuse()).
*
* \todo To support wide-striping, array has to be replaced with a set
* of queues to avoid scanning.
*/
struct lov_lock_sub *lls_sub;
/**
* Original description with which lock was enqueued.
*/
struct cl_lock_descr lls_orig;
/** sublock array */
struct lov_lock_sub lls_sub[0];
};
struct lov_page {
......@@ -445,7 +387,6 @@ struct lov_thread_info {
struct ost_lvb lti_lvb;
struct cl_2queue lti_cl2q;
struct cl_page_list lti_plist;
struct cl_lock_closure lti_closure;
wait_queue_t lti_waiter;
struct cl_attr lti_attr;
};
......
......@@ -143,9 +143,7 @@ static void *lov_key_init(const struct lu_context *ctx,
struct lov_thread_info *info;
info = kmem_cache_zalloc(lov_thread_kmem, GFP_NOFS);
if (info)
INIT_LIST_HEAD(&info->lti_closure.clc_list);
else
if (!info)
info = ERR_PTR(-ENOMEM);
return info;
}
......@@ -155,7 +153,6 @@ static void lov_key_fini(const struct lu_context *ctx,
{
struct lov_thread_info *info = data;
LINVRNT(list_empty(&info->lti_closure.clc_list));
kmem_cache_free(lov_thread_kmem, info);
}
......
......@@ -310,8 +310,6 @@ static int lov_delete_empty(const struct lu_env *env, struct lov_object *lov,
LASSERT(lov->lo_type == LLT_EMPTY || lov->lo_type == LLT_RELEASED);
lov_layout_wait(env, lov);
cl_locks_prune(env, &lov->lo_cl, 0);
return 0;
}
......@@ -379,7 +377,7 @@ static int lov_delete_raid0(const struct lu_env *env, struct lov_object *lov,
struct lovsub_object *los = r0->lo_sub[i];
if (los) {
cl_locks_prune(env, &los->lso_cl, 1);
cl_object_prune(env, &los->lso_cl);
/*
* If top-level object is to be evicted from
* the cache, so are its sub-objects.
......@@ -388,7 +386,6 @@ static int lov_delete_raid0(const struct lu_env *env, struct lov_object *lov,
}
}
}
cl_locks_prune(env, &lov->lo_cl, 0);
return 0;
}
......@@ -714,7 +711,9 @@ static int lov_layout_change(const struct lu_env *unused,
old_ops = &lov_dispatch[lov->lo_type];
new_ops = &lov_dispatch[llt];
cl_object_prune(env, &lov->lo_cl);
result = cl_object_prune(env, &lov->lo_cl);
if (result != 0)
goto out;
result = old_ops->llo_delete(env, lov, &lov->u);
if (result == 0) {
......@@ -736,6 +735,7 @@ static int lov_layout_change(const struct lu_env *unused,
}
}
out:
cl_env_put(env, &refcheck);
cl_env_reexit(cookie);
return result;
......@@ -816,7 +816,8 @@ static int lov_conf_set(const struct lu_env *env, struct cl_object *obj,
goto out;
}
lov->lo_layout_invalid = lov_layout_change(env, lov, conf);
result = lov_layout_change(env, lov, conf);
lov->lo_layout_invalid = result != 0;
out:
lov_conf_unlock(lov);
......
......@@ -160,7 +160,6 @@ static int cl_io_init0(const struct lu_env *env, struct cl_io *io,
io->ci_type = iot;
INIT_LIST_HEAD(&io->ci_lockset.cls_todo);
INIT_LIST_HEAD(&io->ci_lockset.cls_curr);
INIT_LIST_HEAD(&io->ci_lockset.cls_done);
INIT_LIST_HEAD(&io->ci_layers);
......@@ -242,37 +241,7 @@ static int cl_lock_descr_sort(const struct cl_lock_descr *d0,
const struct cl_lock_descr *d1)
{
return lu_fid_cmp(lu_object_fid(&d0->cld_obj->co_lu),
lu_object_fid(&d1->cld_obj->co_lu)) ?:
__diff_normalize(d0->cld_start, d1->cld_start);
}
static int cl_lock_descr_cmp(const struct cl_lock_descr *d0,
const struct cl_lock_descr *d1)
{
int ret;
ret = lu_fid_cmp(lu_object_fid(&d0->cld_obj->co_lu),
lu_object_fid(&d1->cld_obj->co_lu));
if (ret)
return ret;
if (d0->cld_end < d1->cld_start)
return -1;
if (d0->cld_start > d0->cld_end)
return 1;
return 0;
}
static void cl_lock_descr_merge(struct cl_lock_descr *d0,
const struct cl_lock_descr *d1)
{
d0->cld_start = min(d0->cld_start, d1->cld_start);
d0->cld_end = max(d0->cld_end, d1->cld_end);
if (d1->cld_mode == CLM_WRITE && d0->cld_mode != CLM_WRITE)
d0->cld_mode = CLM_WRITE;
if (d1->cld_mode == CLM_GROUP && d0->cld_mode != CLM_GROUP)
d0->cld_mode = CLM_GROUP;
}
/*
......@@ -321,33 +290,35 @@ static void cl_io_locks_sort(struct cl_io *io)
} while (!done);
}
/**
* Check whether \a queue contains locks matching \a need.
*
* \retval +ve there is a matching lock in the \a queue
* \retval 0 there are no matching locks in the \a queue
*/
int cl_queue_match(const struct list_head *queue,
const struct cl_lock_descr *need)
static void cl_lock_descr_merge(struct cl_lock_descr *d0,
const struct cl_lock_descr *d1)
{
struct cl_io_lock_link *scan;
d0->cld_start = min(d0->cld_start, d1->cld_start);
d0->cld_end = max(d0->cld_end, d1->cld_end);
list_for_each_entry(scan, queue, cill_linkage) {
if (cl_lock_descr_match(&scan->cill_descr, need))
return 1;
}
return 0;
if (d1->cld_mode == CLM_WRITE && d0->cld_mode != CLM_WRITE)
d0->cld_mode = CLM_WRITE;
if (d1->cld_mode == CLM_GROUP && d0->cld_mode != CLM_GROUP)
d0->cld_mode = CLM_GROUP;
}
EXPORT_SYMBOL(cl_queue_match);
static int cl_queue_merge(const struct list_head *queue,
static int cl_lockset_merge(const struct cl_lockset *set,
const struct cl_lock_descr *need)
{
struct cl_io_lock_link *scan;
list_for_each_entry(scan, queue, cill_linkage) {
if (cl_lock_descr_cmp(&scan->cill_descr, need))
list_for_each_entry(scan, &set->cls_todo, cill_linkage) {
if (!cl_object_same(scan->cill_descr.cld_obj, need->cld_obj))
continue;
/* Merge locks for the same object because ldlm lock server
* may expand the lock extent, otherwise there is a deadlock
* case where two conflicting locks are queued for the same object
* and the lock server expands one lock to overlap the other.
* The side effect is that it can generate a multi-stripe lock
* that may cause a cascading problem
*/
cl_lock_descr_merge(&scan->cill_descr, need);
CDEBUG(D_VFSTRACE, "lock: %d: [%lu, %lu]\n",
scan->cill_descr.cld_mode, scan->cill_descr.cld_start,
......@@ -357,87 +328,20 @@ static int cl_queue_merge(const struct list_head *queue,
return 0;
}
static int cl_lockset_match(const struct cl_lockset *set,
const struct cl_lock_descr *need)
{
return cl_queue_match(&set->cls_curr, need) ||
cl_queue_match(&set->cls_done, need);
}
static int cl_lockset_merge(const struct cl_lockset *set,
const struct cl_lock_descr *need)
{
return cl_queue_merge(&set->cls_todo, need) ||
cl_lockset_match(set, need);
}
static int cl_lockset_lock_one(const struct lu_env *env,
struct cl_io *io, struct cl_lockset *set,
struct cl_io_lock_link *link)
{
struct cl_lock *lock;
int result;
lock = cl_lock_request(env, io, &link->cill_descr, "io", io);
if (!IS_ERR(lock)) {
link->cill_lock = lock;
list_move(&link->cill_linkage, &set->cls_curr);
if (!(link->cill_descr.cld_enq_flags & CEF_ASYNC)) {
result = cl_wait(env, lock);
if (result == 0)
list_move(&link->cill_linkage, &set->cls_done);
} else
result = 0;
} else
result = PTR_ERR(lock);
return result;
}
static void cl_lock_link_fini(const struct lu_env *env, struct cl_io *io,
struct cl_io_lock_link *link)
{
struct cl_lock *lock = link->cill_lock;
list_del_init(&link->cill_linkage);
if (lock) {
cl_lock_release(env, lock, "io", io);
link->cill_lock = NULL;
}
if (link->cill_fini)
link->cill_fini(env, link);
}
static int cl_lockset_lock(const struct lu_env *env, struct cl_io *io,
struct cl_lockset *set)
{
struct cl_io_lock_link *link;
struct cl_io_lock_link *temp;
struct cl_lock *lock;
int result;
result = 0;
list_for_each_entry_safe(link, temp, &set->cls_todo, cill_linkage) {
if (!cl_lockset_match(set, &link->cill_descr)) {
/* XXX some locking to guarantee that locks aren't
* expanded in between.
*/
result = cl_lockset_lock_one(env, io, set, link);
if (result != 0)
result = cl_lock_request(env, io, &link->cill_lock);
if (result < 0)
break;
} else
cl_lock_link_fini(env, io, link);
}
if (result == 0) {
list_for_each_entry_safe(link, temp,
&set->cls_curr, cill_linkage) {
lock = link->cill_lock;
result = cl_wait(env, lock);
if (result == 0)
list_move(&link->cill_linkage, &set->cls_done);
else
break;
}
}
return result;
}
......@@ -493,16 +397,19 @@ void cl_io_unlock(const struct lu_env *env, struct cl_io *io)
set = &io->ci_lockset;
list_for_each_entry_safe(link, temp, &set->cls_todo, cill_linkage)
cl_lock_link_fini(env, io, link);
list_for_each_entry_safe(link, temp, &set->cls_curr, cill_linkage)
cl_lock_link_fini(env, io, link);
list_for_each_entry_safe(link, temp, &set->cls_todo, cill_linkage) {
list_del_init(&link->cill_linkage);
if (link->cill_fini)
link->cill_fini(env, link);
}
list_for_each_entry_safe(link, temp, &set->cls_done, cill_linkage) {
cl_unuse(env, link->cill_lock);
cl_lock_link_fini(env, io, link);
list_del_init(&link->cill_linkage);
cl_lock_release(env, &link->cill_lock);
if (link->cill_fini)
link->cill_fini(env, link);
}
cl_io_for_each_reverse(scan, io) {
if (scan->cis_iop->op[io->ci_type].cio_unlock)
scan->cis_iop->op[io->ci_type].cio_unlock(env, scan);
......@@ -1435,6 +1342,7 @@ EXPORT_SYMBOL(cl_sync_io_end);
void cl_sync_io_init(struct cl_sync_io *anchor, int nr,
void (*end)(const struct lu_env *, struct cl_sync_io *))
{
memset(anchor, 0, sizeof(*anchor));
init_waitqueue_head(&anchor->csi_waitq);
atomic_set(&anchor->csi_sync_nr, nr);
atomic_set(&anchor->csi_barrier, nr > 0);
......
......@@ -44,7 +44,6 @@
*
* i_mutex
* PG_locked
* ->coh_lock_guard
* ->coh_attr_guard
* ->ls_guard
*/
......@@ -63,8 +62,6 @@
static struct kmem_cache *cl_env_kmem;
/** Lock class of cl_object_header::coh_lock_guard */
static struct lock_class_key cl_lock_guard_class;
/** Lock class of cl_object_header::coh_attr_guard */
static struct lock_class_key cl_attr_guard_class;
......@@ -79,11 +76,8 @@ int cl_object_header_init(struct cl_object_header *h)
result = lu_object_header_init(&h->coh_lu);
if (result == 0) {
spin_lock_init(&h->coh_lock_guard);
spin_lock_init(&h->coh_attr_guard);
lockdep_set_class(&h->coh_lock_guard, &cl_lock_guard_class);
lockdep_set_class(&h->coh_attr_guard, &cl_attr_guard_class);
INIT_LIST_HEAD(&h->coh_locks);
h->coh_page_bufsize = 0;
}
return result;
......@@ -310,7 +304,7 @@ EXPORT_SYMBOL(cl_conf_set);
/**
* Prunes caches of pages and locks for this object.
*/
void cl_object_prune(const struct lu_env *env, struct cl_object *obj)
int cl_object_prune(const struct lu_env *env, struct cl_object *obj)
{
struct lu_object_header *top;
struct cl_object *o;
......@@ -326,10 +320,7 @@ void cl_object_prune(const struct lu_env *env, struct cl_object *obj)
}
}
/* TODO: pruning locks will be moved into layers after cl_lock
* simplification is done
*/
cl_locks_prune(env, obj, 1);
return result;
}
EXPORT_SYMBOL(cl_object_prune);
......@@ -342,19 +333,9 @@ EXPORT_SYMBOL(cl_object_prune);
*/
void cl_object_kill(const struct lu_env *env, struct cl_object *obj)
{
struct cl_object_header *hdr;
hdr = cl_object_header(obj);
struct cl_object_header *hdr = cl_object_header(obj);
set_bit(LU_OBJECT_HEARD_BANSHEE, &hdr->coh_lu.loh_flags);
/*
* Destroy all locks. Object destruction (including cl_inode_fini())
* cannot cancel the locks, because in the case of a local client,
* where client and server share the same thread running
* prune_icache(), this can dead-lock with ldlm_cancel_handler()
* waiting on __wait_on_freeing_inode().
*/
cl_locks_prune(env, obj, 0);
}
EXPORT_SYMBOL(cl_object_kill);
......@@ -406,11 +387,8 @@ int cl_site_init(struct cl_site *s, struct cl_device *d)
result = lu_site_init(&s->cs_lu, &d->cd_lu_dev);
if (result == 0) {
cache_stats_init(&s->cs_pages, "pages");
cache_stats_init(&s->cs_locks, "locks");
for (i = 0; i < ARRAY_SIZE(s->cs_pages_state); ++i)
atomic_set(&s->cs_pages_state[0], 0);
for (i = 0; i < ARRAY_SIZE(s->cs_locks_state); ++i)
atomic_set(&s->cs_locks_state[i], 0);
cl_env_percpu_refill();
}
return result;
......@@ -445,15 +423,6 @@ int cl_site_stats_print(const struct cl_site *site, struct seq_file *m)
[CPS_PAGEIN] = "r",
[CPS_FREEING] = "f"
};
static const char *lstate[] = {
[CLS_NEW] = "n",
[CLS_QUEUING] = "q",
[CLS_ENQUEUED] = "e",
[CLS_HELD] = "h",
[CLS_INTRANSIT] = "t",
[CLS_CACHED] = "c",
[CLS_FREEING] = "f"
};
/*
lookup hit total busy create
pages: ...... ...... ...... ...... ...... [...... ...... ...... ......]
......@@ -467,12 +436,6 @@ locks: ...... ...... ...... ...... ...... [...... ...... ...... ...... ......]
seq_printf(m, "%s: %u ", pstate[i],
atomic_read(&site->cs_pages_state[i]));
seq_printf(m, "]\n");
cache_stats_print(&site->cs_locks, m, 0);
seq_printf(m, " [");
for (i = 0; i < ARRAY_SIZE(site->cs_locks_state); ++i)
seq_printf(m, "%s: %u ", lstate[i],
atomic_read(&site->cs_locks_state[i]));
seq_printf(m, "]\n");
cache_stats_print(&cl_env_stats, m, 0);
seq_printf(m, "\n");
return 0;
......@@ -1147,12 +1110,6 @@ void cl_stack_fini(const struct lu_env *env, struct cl_device *cl)
}
EXPORT_SYMBOL(cl_stack_fini);
int cl_lock_init(void);
void cl_lock_fini(void);
int cl_page_init(void);
void cl_page_fini(void);
static struct lu_context_key cl_key;
struct cl_thread_info *cl_env_info(const struct lu_env *env)
......@@ -1247,22 +1204,13 @@ int cl_global_init(void)
if (result)
goto out_kmem;
result = cl_lock_init();
if (result)
goto out_context;
result = cl_page_init();
if (result)
goto out_lock;
result = cl_env_percpu_init();
if (result)
/* no cl_env_percpu_fini on error */
goto out_lock;
goto out_context;
return 0;
out_lock:
cl_lock_fini();
out_context:
lu_context_key_degister(&cl_key);
out_kmem:
......@@ -1278,8 +1226,6 @@ int cl_global_init(void)
void cl_global_fini(void)
{
cl_env_percpu_fini();
cl_lock_fini();
cl_page_fini();
lu_context_key_degister(&cl_key);
lu_kmem_fini(cl_object_caches);
cl_env_store_fini();
......
......@@ -1075,12 +1075,3 @@ void cl_page_slice_add(struct cl_page *page, struct cl_page_slice *slice,
slice->cpl_page = page;
}
EXPORT_SYMBOL(cl_page_slice_add);
int cl_page_init(void)
{
return 0;
}
void cl_page_fini(void)
{
}
......@@ -171,7 +171,7 @@ struct echo_thread_info {
struct cl_2queue eti_queue;
struct cl_io eti_io;
struct cl_lock_descr eti_descr;
struct cl_lock eti_lock;
struct lu_fid eti_fid;
struct lu_fid eti_fid2;
};
......@@ -327,26 +327,8 @@ static void echo_lock_fini(const struct lu_env *env,
kmem_cache_free(echo_lock_kmem, ecl);
}
static void echo_lock_delete(const struct lu_env *env,
const struct cl_lock_slice *slice)
{
struct echo_lock *ecl = cl2echo_lock(slice);
LASSERT(list_empty(&ecl->el_chain));
}
static int echo_lock_fits_into(const struct lu_env *env,
const struct cl_lock_slice *slice,
const struct cl_lock_descr *need,
const struct cl_io *unused)
{
return 1;
}
static struct cl_lock_operations echo_lock_ops = {
.clo_fini = echo_lock_fini,
.clo_delete = echo_lock_delete,
.clo_fits_into = echo_lock_fits_into
};
/** @} echo_lock */
......@@ -811,16 +793,7 @@ static void echo_lock_release(const struct lu_env *env,
{
struct cl_lock *clk = echo_lock2cl(ecl);
cl_lock_get(clk);
cl_unuse(env, clk);
cl_lock_release(env, clk, "ec enqueue", ecl->el_object);
if (!still_used) {
cl_lock_mutex_get(env, clk);
cl_lock_cancel(env, clk);
cl_lock_delete(env, clk);
cl_lock_mutex_put(env, clk);
}
cl_lock_put(env, clk);
cl_lock_release(env, clk);
}
static struct lu_device *echo_device_free(const struct lu_env *env,
......@@ -1014,9 +987,11 @@ static int cl_echo_enqueue0(struct lu_env *env, struct echo_object *eco,
info = echo_env_info(env);
io = &info->eti_io;
descr = &info->eti_descr;
lck = &info->eti_lock;
obj = echo_obj2cl(eco);
memset(lck, 0, sizeof(*lck));
descr = &lck->cll_descr;
descr->cld_obj = obj;
descr->cld_start = cl_index(obj, start);
descr->cld_end = cl_index(obj, end);
......@@ -1024,13 +999,11 @@ static int cl_echo_enqueue0(struct lu_env *env, struct echo_object *eco,
descr->cld_enq_flags = enqflags;
io->ci_obj = obj;
lck = cl_lock_request(env, io, descr, "ec enqueue", eco);
if (lck) {
rc = cl_lock_request(env, io, lck);
if (rc == 0) {
struct echo_client_obd *ec = eco->eo_dev->ed_ec;
struct echo_lock *el;
rc = cl_wait(env, lck);
if (rc == 0) {
el = cl2echo_lock(cl_lock_at(lck, &echo_device_type));
spin_lock(&ec->ec_lock);
if (list_empty(&el->el_chain)) {
......@@ -1040,9 +1013,6 @@ static int cl_echo_enqueue0(struct lu_env *env, struct echo_object *eco,
atomic_inc(&el->el_refcount);
*cookie = el->el_cookie;
spin_unlock(&ec->ec_lock);
} else {
cl_lock_release(env, lck, "ec enqueue", current);
}
}
return rc;
}
......
......@@ -76,6 +76,8 @@ static inline char *ext_flags(struct osc_extent *ext, char *flags)
*buf++ = ext->oe_rw ? 'r' : 'w';
if (ext->oe_intree)
*buf++ = 'i';
if (ext->oe_sync)
*buf++ = 'S';
if (ext->oe_srvlock)
*buf++ = 's';
if (ext->oe_hp)
......@@ -121,9 +123,13 @@ static const char *oes_strings[] = {
__ext->oe_grants, __ext->oe_nr_pages, \
list_empty_marker(&__ext->oe_pages), \
waitqueue_active(&__ext->oe_waitq) ? '+' : '-', \
__ext->oe_osclock, __ext->oe_mppr, __ext->oe_owner, \
__ext->oe_dlmlock, __ext->oe_mppr, __ext->oe_owner, \
/* ----- part 4 ----- */ \
## __VA_ARGS__); \
if (lvl == D_ERROR && __ext->oe_dlmlock) \
LDLM_ERROR(__ext->oe_dlmlock, "extent: %p\n", __ext); \
else \
LDLM_DEBUG(__ext->oe_dlmlock, "extent: %p\n", __ext); \
} while (0)
#undef EASSERTF
......@@ -240,20 +246,25 @@ static int osc_extent_sanity_check0(struct osc_extent *ext,
goto out;
}
if (!ext->oe_osclock && ext->oe_grants > 0) {
if (ext->oe_sync && ext->oe_grants > 0) {
rc = 90;
goto out;
}
if (ext->oe_osclock) {
struct cl_lock_descr *descr;
if (ext->oe_dlmlock) {
struct ldlm_extent *extent;
descr = &ext->oe_osclock->cll_descr;
if (!(descr->cld_start <= ext->oe_start &&
descr->cld_end >= ext->oe_max_end)) {
extent = &ext->oe_dlmlock->l_policy_data.l_extent;
if (!(extent->start <= cl_offset(osc2cl(obj), ext->oe_start) &&
extent->end >= cl_offset(osc2cl(obj), ext->oe_max_end))) {
rc = 100;
goto out;
}
if (!(ext->oe_dlmlock->l_granted_mode & (LCK_PW | LCK_GROUP))) {
rc = 102;
goto out;
}
}
if (ext->oe_nr_pages > ext->oe_mppr) {
......@@ -359,7 +370,7 @@ static struct osc_extent *osc_extent_alloc(struct osc_object *obj)
ext->oe_state = OES_INV;
INIT_LIST_HEAD(&ext->oe_pages);
init_waitqueue_head(&ext->oe_waitq);
ext->oe_osclock = NULL;
ext->oe_dlmlock = NULL;
return ext;
}
......@@ -385,9 +396,11 @@ static void osc_extent_put(const struct lu_env *env, struct osc_extent *ext)
LASSERT(ext->oe_state == OES_INV);
LASSERT(!ext->oe_intree);
if (ext->oe_osclock) {
cl_lock_put(env, ext->oe_osclock);
ext->oe_osclock = NULL;
if (ext->oe_dlmlock) {
lu_ref_add(&ext->oe_dlmlock->l_reference,
"osc_extent", ext);
LDLM_LOCK_PUT(ext->oe_dlmlock);
ext->oe_dlmlock = NULL;
}
osc_extent_free(ext);
}
......@@ -543,7 +556,7 @@ static int osc_extent_merge(const struct lu_env *env, struct osc_extent *cur,
if (cur->oe_max_end != victim->oe_max_end)
return -ERANGE;
LASSERT(cur->oe_osclock == victim->oe_osclock);
LASSERT(cur->oe_dlmlock == victim->oe_dlmlock);
ppc_bits = osc_cli(obj)->cl_chunkbits - PAGE_CACHE_SHIFT;
chunk_start = cur->oe_start >> ppc_bits;
chunk_end = cur->oe_end >> ppc_bits;
......@@ -624,10 +637,10 @@ static inline int overlapped(struct osc_extent *ex1, struct osc_extent *ex2)
static struct osc_extent *osc_extent_find(const struct lu_env *env,
struct osc_object *obj, pgoff_t index,
int *grants)
{
struct client_obd *cli = osc_cli(obj);
struct cl_lock *lock;
struct osc_lock *olck;
struct cl_lock_descr *descr;
struct osc_extent *cur;
struct osc_extent *ext;
struct osc_extent *conflict = NULL;
......@@ -644,8 +657,12 @@ static struct osc_extent *osc_extent_find(const struct lu_env *env,
if (!cur)
return ERR_PTR(-ENOMEM);
lock = cl_lock_at_pgoff(env, osc2cl(obj), index, NULL, 1, 0);
LASSERT(lock->cll_descr.cld_mode >= CLM_WRITE);
olck = osc_env_io(env)->oi_write_osclock;
LASSERTF(olck, "page %lu is not covered by lock\n", index);
LASSERT(olck->ols_state == OLS_GRANTED);
descr = &olck->ols_cl.cls_lock->cll_descr;
LASSERT(descr->cld_mode >= CLM_WRITE);
LASSERT(cli->cl_chunkbits >= PAGE_CACHE_SHIFT);
ppc_bits = cli->cl_chunkbits - PAGE_CACHE_SHIFT;
......@@ -657,19 +674,23 @@ static struct osc_extent *osc_extent_find(const struct lu_env *env,
max_pages = cli->cl_max_pages_per_rpc;
LASSERT((max_pages & ~chunk_mask) == 0);
max_end = index - (index % max_pages) + max_pages - 1;
max_end = min_t(pgoff_t, max_end, lock->cll_descr.cld_end);
max_end = min_t(pgoff_t, max_end, descr->cld_end);
/* initialize new extent by parameters so far */
cur->oe_max_end = max_end;
cur->oe_start = index & chunk_mask;
cur->oe_end = ((index + ~chunk_mask + 1) & chunk_mask) - 1;
if (cur->oe_start < lock->cll_descr.cld_start)
cur->oe_start = lock->cll_descr.cld_start;
if (cur->oe_start < descr->cld_start)
cur->oe_start = descr->cld_start;
if (cur->oe_end > max_end)
cur->oe_end = max_end;
cur->oe_osclock = lock;
cur->oe_grants = 0;
cur->oe_mppr = max_pages;
if (olck->ols_dlmlock) {
LASSERT(olck->ols_hold);
cur->oe_dlmlock = LDLM_LOCK_GET(olck->ols_dlmlock);
lu_ref_add(&olck->ols_dlmlock->l_reference, "osc_extent", cur);
}
/* grants has been allocated by caller */
LASSERTF(*grants >= chunksize + cli->cl_extent_tax,
......@@ -691,7 +712,7 @@ static struct osc_extent *osc_extent_find(const struct lu_env *env,
break;
/* if covering by different locks, no chance to match */
if (lock != ext->oe_osclock) {
if (olck->ols_dlmlock != ext->oe_dlmlock) {
EASSERTF(!overlapped(ext, cur), ext,
EXTSTR"\n", EXTPARA(cur));
......@@ -795,7 +816,7 @@ static struct osc_extent *osc_extent_find(const struct lu_env *env,
if (found) {
LASSERT(!conflict);
if (!IS_ERR(found)) {
LASSERT(found->oe_osclock == cur->oe_osclock);
LASSERT(found->oe_dlmlock == cur->oe_dlmlock);
OSC_EXTENT_DUMP(D_CACHE, found,
"found caching ext for %lu.\n", index);
}
......@@ -810,7 +831,7 @@ static struct osc_extent *osc_extent_find(const struct lu_env *env,
found = osc_extent_hold(cur);
osc_extent_insert(obj, cur);
OSC_EXTENT_DUMP(D_CACHE, cur, "add into tree %lu/%lu.\n",
index, lock->cll_descr.cld_end);
index, descr->cld_end);
}
osc_object_unlock(obj);
......@@ -2630,6 +2651,7 @@ int osc_queue_sync_pages(const struct lu_env *env, struct osc_object *obj,
}
ext->oe_rw = !!(cmd & OBD_BRW_READ);
ext->oe_sync = 1;
ext->oe_urgent = 1;
ext->oe_start = start;
ext->oe_end = ext->oe_max_end = end;
......@@ -3087,27 +3109,27 @@ static int check_and_discard_cb(const struct lu_env *env, struct cl_io *io,
struct osc_page *ops, void *cbdata)
{
struct osc_thread_info *info = osc_env_info(env);
struct cl_lock *lock = cbdata;
struct osc_object *osc = cbdata;
pgoff_t index;
index = osc_index(ops);
if (index >= info->oti_fn_index) {
struct cl_lock *tmp;
struct ldlm_lock *tmp;
struct cl_page *page = ops->ops_cl.cpl_page;
/* refresh non-overlapped index */
tmp = cl_lock_at_pgoff(env, lock->cll_descr.cld_obj, index,
lock, 1, 0);
tmp = osc_dlmlock_at_pgoff(env, osc, index, 0, 0);
if (tmp) {
__u64 end = tmp->l_policy_data.l_extent.end;
/* Cache the first-non-overlapped index so as to skip
* all pages within [index, oti_fn_index). This
* is safe because if tmp lock is canceled, it will
* discard these pages.
* all pages within [index, oti_fn_index). This is safe
* because if tmp lock is canceled, it will discard
* these pages.
*/
info->oti_fn_index = tmp->cll_descr.cld_end + 1;
if (tmp->cll_descr.cld_end == CL_PAGE_EOF)
info->oti_fn_index = cl_index(osc2cl(osc), end + 1);
if (end == OBD_OBJECT_EOF)
info->oti_fn_index = CL_PAGE_EOF;
cl_lock_put(env, tmp);
LDLM_LOCK_PUT(tmp);
} else if (cl_page_own(env, io, page) == 0) {
/* discard the page */
cl_page_discard(env, io, page);
......@@ -3125,11 +3147,8 @@ static int discard_cb(const struct lu_env *env, struct cl_io *io,
struct osc_page *ops, void *cbdata)
{
struct osc_thread_info *info = osc_env_info(env);
struct cl_lock *lock = cbdata;
struct cl_page *page = ops->ops_cl.cpl_page;
LASSERT(lock->cll_descr.cld_mode >= CLM_WRITE);
/* page is top page. */
info->oti_next_index = osc_index(ops) + 1;
if (cl_page_own(env, io, page) == 0) {
......@@ -3154,30 +3173,27 @@ static int discard_cb(const struct lu_env *env, struct cl_io *io,
* If error happens on any step, the process continues anyway (the reasoning
* behind this being that lock cancellation cannot be delayed indefinitely).
*/
int osc_lock_discard_pages(const struct lu_env *env, struct osc_lock *ols)
int osc_lock_discard_pages(const struct lu_env *env, struct osc_object *osc,
pgoff_t start, pgoff_t end, enum cl_lock_mode mode)
{
struct osc_thread_info *info = osc_env_info(env);
struct cl_io *io = &info->oti_io;
struct cl_object *osc = ols->ols_cl.cls_obj;
struct cl_lock *lock = ols->ols_cl.cls_lock;
struct cl_lock_descr *descr = &lock->cll_descr;
osc_page_gang_cbt cb;
int res;
int result;
io->ci_obj = cl_object_top(osc);
io->ci_obj = cl_object_top(osc2cl(osc));
io->ci_ignore_layout = 1;
result = cl_io_init(env, io, CIT_MISC, io->ci_obj);
if (result != 0)
goto out;
cb = descr->cld_mode == CLM_READ ? check_and_discard_cb : discard_cb;
info->oti_fn_index = info->oti_next_index = descr->cld_start;
cb = mode == CLM_READ ? check_and_discard_cb : discard_cb;
info->oti_fn_index = info->oti_next_index = start;
do {
res = osc_page_gang_lookup(env, io, cl2osc(osc),
info->oti_next_index, descr->cld_end,
cb, (void *)lock);
if (info->oti_next_index > descr->cld_end)
res = osc_page_gang_lookup(env, io, osc,
info->oti_next_index, end, cb, osc);
if (info->oti_next_index > end)
break;
if (res == CLP_GANG_RESCHED)
......
......@@ -68,6 +68,9 @@ struct osc_io {
struct cl_io_slice oi_cl;
/** true if this io is lockless. */
int oi_lockless;
/** how many LRU pages are reserved for this IO */
int oi_lru_reserved;
/** active extents, we know how many bytes is going to be written,
* so having an active extent will prevent it from being fragmented
*/
......@@ -77,8 +80,8 @@ struct osc_io {
*/
struct osc_extent *oi_trunc;
int oi_lru_reserved;
/** write osc_lock for this IO, used by osc_extent_find(). */
struct osc_lock *oi_write_osclock;
struct obd_info oi_info;
struct obdo oi_oa;
struct osc_async_cbargs {
......@@ -117,6 +120,7 @@ struct osc_thread_info {
*/
pgoff_t oti_next_index;
pgoff_t oti_fn_index; /* first non-overlapped index */
struct cl_sync_io oti_anchor;
};
struct osc_object {
......@@ -173,6 +177,10 @@ struct osc_object {
struct radix_tree_root oo_tree;
spinlock_t oo_tree_lock;
unsigned long oo_npages;
/* Protect osc_lock this osc_object has */
spinlock_t oo_ol_spin;
struct list_head oo_ol_list;
};
static inline void osc_object_lock(struct osc_object *obj)
......@@ -212,8 +220,6 @@ enum osc_lock_state {
OLS_ENQUEUED,
OLS_UPCALL_RECEIVED,
OLS_GRANTED,
OLS_RELEASED,
OLS_BLOCKED,
OLS_CANCELLED
};
......@@ -222,10 +228,8 @@ enum osc_lock_state {
*
* Interaction with DLM.
*
* CLIO enqueues all DLM locks through ptlrpcd (that is, in "async" mode).
*
* Once receive upcall is invoked, osc_lock remembers a handle of DLM lock in
* osc_lock::ols_handle and a pointer to that lock in osc_lock::ols_lock.
* osc_lock::ols_handle and a pointer to that lock in osc_lock::ols_dlmlock.
*
* This pointer is protected through a reference, acquired by
* osc_lock_upcall0(). Also, an additional reference is acquired by
......@@ -263,16 +267,27 @@ enum osc_lock_state {
*/
struct osc_lock {
struct cl_lock_slice ols_cl;
/** Internal lock to protect states, etc. */
spinlock_t ols_lock;
/** Owner sleeps on this channel for state change */
struct cl_sync_io *ols_owner;
/** waiting list for this lock to be cancelled */
struct list_head ols_waiting_list;
/** wait entry of ols_waiting_list */
struct list_head ols_wait_entry;
/** list entry for osc_object::oo_ol_list */
struct list_head ols_nextlock_oscobj;
/** underlying DLM lock */
struct ldlm_lock *ols_lock;
/** lock value block */
struct ost_lvb ols_lvb;
struct ldlm_lock *ols_dlmlock;
/** DLM flags with which osc_lock::ols_lock was enqueued */
__u64 ols_flags;
/** osc_lock::ols_lock handle */
struct lustre_handle ols_handle;
struct ldlm_enqueue_info ols_einfo;
enum osc_lock_state ols_state;
/** lock value block */
struct ost_lvb ols_lvb;
/**
* true, if ldlm_lock_addref() was called against
......@@ -302,16 +317,6 @@ struct osc_lock {
* If true, osc_lock_enqueue is able to tolerate the -EUSERS error.
*/
ols_locklessable:1,
/**
* set by osc_lock_use() to wait until blocking AST enters into
* osc_ldlm_blocking_ast0(), so that cl_lock mutex can be used for
* further synchronization.
*/
ols_ast_wait:1,
/**
* If the data of this lock has been flushed to server side.
*/
ols_flush:1,
/**
* if set, the osc_lock is a glimpse lock. For glimpse locks, we treat
* the EVAVAIL error as tolerable, this will make upper logic happy
......@@ -325,15 +330,6 @@ struct osc_lock {
* For async glimpse lock.
*/
ols_agl:1;
/**
* IO that owns this lock. This field is used for a dead-lock
* avoidance by osc_lock_enqueue_wait().
*
* XXX: unfortunately, the owner of a osc_lock is not unique,
* the lock may have multiple users, if the lock is granted and
* then matched.
*/
struct osc_io *ols_owner;
};
/**
......@@ -627,6 +623,8 @@ struct osc_extent {
unsigned int oe_intree:1,
/** 0 is write, 1 is read */
oe_rw:1,
/** sync extent, queued by osc_queue_sync_pages() */
oe_sync:1,
oe_srvlock:1,
oe_memalloc:1,
/** an ACTIVE extent is going to be truncated, so when this extent
......@@ -675,7 +673,7 @@ struct osc_extent {
*/
wait_queue_head_t oe_waitq;
/** lock covering this extent */
struct cl_lock *oe_osclock;
struct ldlm_lock *oe_dlmlock;
/** terminator of this extent. Must be true if this extent is in IO. */
struct task_struct *oe_owner;
/** return value of writeback. If somebody is waiting for this extent,
......@@ -690,14 +688,14 @@ int osc_extent_finish(const struct lu_env *env, struct osc_extent *ext,
int sent, int rc);
void osc_extent_release(const struct lu_env *env, struct osc_extent *ext);
int osc_lock_discard_pages(const struct lu_env *env, struct osc_lock *lock);
int osc_lock_discard_pages(const struct lu_env *env, struct osc_object *osc,
pgoff_t start, pgoff_t end, enum cl_lock_mode mode);
typedef int (*osc_page_gang_cbt)(const struct lu_env *, struct cl_io *,
struct osc_page *, void *);
int osc_page_gang_lookup(const struct lu_env *env, struct cl_io *io,
struct osc_object *osc, pgoff_t start, pgoff_t end,
osc_page_gang_cbt cb, void *cbdata);
/** @} osc */
#endif /* OSC_CL_INTERNAL_H */
......@@ -108,12 +108,14 @@ void osc_update_next_shrink(struct client_obd *cli);
extern struct ptlrpc_request_set *PTLRPCD_SET;
typedef int (*osc_enqueue_upcall_f)(void *cookie, struct lustre_handle *lockh,
int rc);
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
__u64 *flags, ldlm_policy_data_t *policy,
struct ost_lvb *lvb, int kms_valid,
obd_enqueue_update_f upcall,
osc_enqueue_upcall_f upcall,
void *cookie, struct ldlm_enqueue_info *einfo,
struct lustre_handle *lockh,
struct ptlrpc_request_set *rqset, int async, int agl);
int osc_cancel_base(struct lustre_handle *lockh, __u32 mode);
......@@ -140,7 +142,6 @@ int osc_lru_shrink(const struct lu_env *env, struct client_obd *cli,
int target, bool force);
int osc_lru_reclaim(struct client_obd *cli);
extern spinlock_t osc_ast_guard;
unsigned long osc_ldlm_weigh_ast(struct ldlm_lock *dlmlock);
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg);
......@@ -199,5 +200,8 @@ int osc_quotactl(struct obd_device *unused, struct obd_export *exp,
int osc_quotacheck(struct obd_device *unused, struct obd_export *exp,
struct obd_quotactl *oqctl);
int osc_quota_poll_check(struct obd_export *exp, struct if_quotacheck *qchk);
struct ldlm_lock *osc_dlmlock_at_pgoff(const struct lu_env *env,
struct osc_object *obj, pgoff_t index,
int pending, int canceling);
#endif /* OSC_INTERNAL_H */
......@@ -354,6 +354,7 @@ static void osc_io_rw_iter_fini(const struct lu_env *env,
atomic_add(oio->oi_lru_reserved, cli->cl_lru_left);
oio->oi_lru_reserved = 0;
}
oio->oi_write_osclock = NULL;
}
static int osc_io_fault_start(const struct lu_env *env,
......@@ -751,8 +752,7 @@ static void osc_req_attr_set(const struct lu_env *env,
struct lov_oinfo *oinfo;
struct cl_req *clerq;
struct cl_page *apage; /* _some_ page in @clerq */
struct cl_lock *lock; /* _some_ lock protecting @apage */
struct osc_lock *olck;
struct ldlm_lock *lock; /* _some_ lock protecting @apage */
struct osc_page *opg;
struct obdo *oa;
struct ost_lvb *lvb;
......@@ -782,38 +782,37 @@ static void osc_req_attr_set(const struct lu_env *env,
oa->o_valid |= OBD_MD_FLID;
}
if (flags & OBD_MD_FLHANDLE) {
struct cl_object *subobj;
clerq = slice->crs_req;
LASSERT(!list_empty(&clerq->crq_pages));
apage = container_of(clerq->crq_pages.next,
struct cl_page, cp_flight);
opg = osc_cl_page_osc(apage, NULL);
subobj = opg->ops_cl.cpl_obj;
lock = cl_lock_at_pgoff(env, subobj, osc_index(opg),
NULL, 1, 1);
if (!lock) {
struct cl_object_header *head;
struct cl_lock *scan;
head = cl_object_header(subobj);
list_for_each_entry(scan, &head->coh_locks, cll_linkage)
CL_LOCK_DEBUG(D_ERROR, env, scan,
"no cover page!\n");
CL_PAGE_DEBUG(D_ERROR, env, apage,
"dump uncover page!\n");
lock = osc_dlmlock_at_pgoff(env, cl2osc(obj), osc_index(opg),
1, 1);
if (!lock && !opg->ops_srvlock) {
struct ldlm_resource *res;
struct ldlm_res_id *resname;
CL_PAGE_DEBUG(D_ERROR, env, apage, "uncovered page!\n");
resname = &osc_env_info(env)->oti_resname;
ostid_build_res_name(&oinfo->loi_oi, resname);
res = ldlm_resource_get(
osc_export(cl2osc(obj))->exp_obd->obd_namespace,
NULL, resname, LDLM_EXTENT, 0);
ldlm_resource_dump(D_ERROR, res);
dump_stack();
LBUG();
}
olck = osc_lock_at(lock);
LASSERT(ergo(opg->ops_srvlock, !olck->ols_lock));
/* check for lockless io. */
if (olck->ols_lock) {
oa->o_handle = olck->ols_lock->l_remote_handle;
if (lock) {
oa->o_handle = lock->l_remote_handle;
oa->o_valid |= OBD_MD_FLHANDLE;
LDLM_LOCK_PUT(lock);
}
cl_lock_put(env, lock);
}
}
......
......@@ -96,6 +96,8 @@ static int osc_object_init(const struct lu_env *env, struct lu_object *obj,
atomic_set(&osc->oo_nr_writes, 0);
spin_lock_init(&osc->oo_lock);
spin_lock_init(&osc->oo_tree_lock);
spin_lock_init(&osc->oo_ol_spin);
INIT_LIST_HEAD(&osc->oo_ol_list);
cl_object_page_init(lu2cl(obj), sizeof(struct osc_page));
......@@ -122,6 +124,7 @@ static void osc_object_free(const struct lu_env *env, struct lu_object *obj)
LASSERT(list_empty(&osc->oo_reading_exts));
LASSERT(atomic_read(&osc->oo_nr_reads) == 0);
LASSERT(atomic_read(&osc->oo_nr_writes) == 0);
LASSERT(list_empty(&osc->oo_ol_list));
lu_object_fini(obj);
kmem_cache_free(osc_object_kmem, osc);
......@@ -194,6 +197,32 @@ static int osc_object_glimpse(const struct lu_env *env,
return 0;
}
static int osc_object_ast_clear(struct ldlm_lock *lock, void *data)
{
LASSERT(lock->l_granted_mode == lock->l_req_mode);
if (lock->l_ast_data == data)
lock->l_ast_data = NULL;
return LDLM_ITER_CONTINUE;
}
static int osc_object_prune(const struct lu_env *env, struct cl_object *obj)
{
struct osc_object *osc = cl2osc(obj);
struct ldlm_res_id *resname = &osc_env_info(env)->oti_resname;
LASSERTF(osc->oo_npages == 0,
DFID "still have %lu pages, obj: %p, osc: %p\n",
PFID(lu_object_fid(&obj->co_lu)), osc->oo_npages, obj, osc);
/* DLM locks don't hold a reference on the osc_object, so we have to
* clear it before the object is destroyed.
*/
ostid_build_res_name(&osc->oo_oinfo->loi_oi, resname);
ldlm_resource_iterate(osc_export(osc)->exp_obd->obd_namespace, resname,
osc_object_ast_clear, osc);
return 0;
}
void osc_object_set_contended(struct osc_object *obj)
{
obj->oo_contention_time = cfs_time_current();
......@@ -238,12 +267,12 @@ static const struct cl_object_operations osc_ops = {
.coo_io_init = osc_io_init,
.coo_attr_get = osc_attr_get,
.coo_attr_set = osc_attr_set,
.coo_glimpse = osc_object_glimpse
.coo_glimpse = osc_object_glimpse,
.coo_prune = osc_object_prune
};
static const struct lu_object_operations osc_lu_obj_ops = {
.loo_object_init = osc_object_init,
.loo_object_delete = NULL,
.loo_object_release = NULL,
.loo_object_free = osc_object_free,
.loo_object_print = osc_object_print,
......
......@@ -135,15 +135,15 @@ static int osc_page_is_under_lock(const struct lu_env *env,
struct cl_io *unused, pgoff_t *max_index)
{
struct osc_page *opg = cl2osc_page(slice);
struct cl_lock *lock;
struct ldlm_lock *dlmlock;
int result = -ENODATA;
*max_index = 0;
lock = cl_lock_at_pgoff(env, slice->cpl_obj, osc_index(opg),
NULL, 1, 0);
if (lock) {
*max_index = lock->cll_descr.cld_end;
cl_lock_put(env, lock);
dlmlock = osc_dlmlock_at_pgoff(env, cl2osc(slice->cpl_obj),
osc_index(opg), 1, 0);
if (dlmlock) {
*max_index = cl_index(slice->cpl_obj,
dlmlock->l_policy_data.l_extent.end);
LDLM_LOCK_PUT(dlmlock);
result = 0;
}
return result;
......