Commit 66cc83e9 authored by Mikhail Pershin's avatar Mikhail Pershin Committed by Greg Kroah-Hartman

staging/lustre/obdclass: use common way to store lastid

Local files last id are stored in root in files named seq-xxx-lastid
while lastid for OST objects is stored in O/seq/LAST_ID special
object with zero OID and handled by OSD.
Patch reworks local files lastid to be stored in O/seq/LAST_ID too
and using the same format.

Intel-bug-id: https://jira.hpdd.intel.com/browse/LU-2886
Lustre-change: http://review.whamcloud.com/6199Signed-off-by: default avatarMikhail Pershin <mike.pershin@intel.com>
Signed-off-by: default avatarJames Nunez <james.a.nunez@intel.com>
Reviewed-by: default avatarAndreas Dilger <andreas.dilger@intel.com>
Reviewed-by: default avatarAlex Zhuravlev <alexey.zhuravlev@intel.com>
Reviewed-by: default avatarFan Yong <fan.yong@intel.com>
Signed-off-by: default avatarPeng Tao <tao.peng@emc.com>
Signed-off-by: default avatarAndreas Dilger <andreas.dilger@intel.com>
Signed-off-by: default avatarGreg Kroah-Hartman <gregkh@linuxfoundation.org>
parent 53b78538
...@@ -383,6 +383,8 @@ extern void lustre_hsm_swab(struct hsm_attrs *attrs); ...@@ -383,6 +383,8 @@ extern void lustre_hsm_swab(struct hsm_attrs *attrs);
* fid constants * fid constants
*/ */
enum { enum {
/** LASTID file has zero OID */
LUSTRE_FID_LASTID_OID = 0UL,
/** initial fid id value */ /** initial fid id value */
LUSTRE_FID_INIT_OID = 1UL LUSTRE_FID_INIT_OID = 1UL
}; };
...@@ -501,8 +503,8 @@ static inline int fid_seq_is_llog(obd_seq seq) ...@@ -501,8 +503,8 @@ static inline int fid_seq_is_llog(obd_seq seq)
static inline int fid_is_llog(const struct lu_fid *fid) static inline int fid_is_llog(const struct lu_fid *fid)
{ {
/* file with OID == 1 is not llog but contains last oid */ /* file with OID == 0 is not llog but contains last oid */
return fid_seq_is_llog(fid_seq(fid)) && fid_oid(fid) > 1; return fid_seq_is_llog(fid_seq(fid)) && fid_oid(fid) > 0;
} }
static inline int fid_seq_is_rsvd(const __u64 seq) static inline int fid_seq_is_rsvd(const __u64 seq)
...@@ -785,8 +787,7 @@ static inline int fid_to_ostid(const struct lu_fid *fid, struct ost_id *ostid) ...@@ -785,8 +787,7 @@ static inline int fid_to_ostid(const struct lu_fid *fid, struct ost_id *ostid)
/* Check whether the fid is for LAST_ID */ /* Check whether the fid is for LAST_ID */
static inline int fid_is_last_id(const struct lu_fid *fid) static inline int fid_is_last_id(const struct lu_fid *fid)
{ {
return (fid_is_idif(fid) || fid_is_norm(fid) || fid_is_echo(fid)) && return (fid_oid(fid) == 0);
fid_oid(fid) == 0;
} }
/** /**
......
...@@ -518,16 +518,6 @@ struct lustre_mount_info { ...@@ -518,16 +518,6 @@ struct lustre_mount_info {
struct list_head lmi_list_chain; struct list_head lmi_list_chain;
}; };
/* on-disk structure describing local object OIDs storage
* the structure to be used with any sequence managed by
* local object library */
struct los_ondisk {
__u32 lso_magic;
__u32 lso_next_oid;
};
#define LOS_MAGIC 0xdecafbee
/****************** prototypes *********************/ /****************** prototypes *********************/
/* obd_mount.c */ /* obd_mount.c */
......
...@@ -208,7 +208,7 @@ int local_object_fid_generate(const struct lu_env *env, ...@@ -208,7 +208,7 @@ int local_object_fid_generate(const struct lu_env *env,
mutex_lock(&los->los_id_lock); mutex_lock(&los->los_id_lock);
fid->f_seq = los->los_seq; fid->f_seq = los->los_seq;
fid->f_oid = los->los_last_oid++; fid->f_oid = ++los->los_last_oid;
fid->f_ver = 0; fid->f_ver = 0;
mutex_unlock(&los->los_id_lock); mutex_unlock(&los->los_id_lock);
...@@ -252,7 +252,7 @@ int local_object_create(const struct lu_env *env, ...@@ -252,7 +252,7 @@ int local_object_create(const struct lu_env *env,
struct dt_object_format *dof, struct thandle *th) struct dt_object_format *dof, struct thandle *th)
{ {
struct dt_thread_info *dti = dt_info(env); struct dt_thread_info *dti = dt_info(env);
struct los_ondisk losd; obd_id lastid;
int rc; int rc;
ENTRY; ENTRY;
...@@ -274,12 +274,11 @@ int local_object_create(const struct lu_env *env, ...@@ -274,12 +274,11 @@ int local_object_create(const struct lu_env *env,
/* update local oid number on disk so that /* update local oid number on disk so that
* we know the last one used after reboot */ * we know the last one used after reboot */
losd.lso_magic = cpu_to_le32(LOS_MAGIC); lastid = cpu_to_le64(los->los_last_oid);
losd.lso_next_oid = cpu_to_le32(los->los_last_oid);
dti->dti_off = 0; dti->dti_off = 0;
dti->dti_lb.lb_buf = &losd; dti->dti_lb.lb_buf = &lastid;
dti->dti_lb.lb_len = sizeof(losd); dti->dti_lb.lb_len = sizeof(lastid);
rc = dt_record_write(env, los->los_obj, &dti->dti_lb, &dti->dti_off, rc = dt_record_write(env, los->los_obj, &dti->dti_lb, &dti->dti_off,
th); th);
mutex_unlock(&los->los_id_lock); mutex_unlock(&los->los_id_lock);
...@@ -660,6 +659,79 @@ void dt_los_put(struct local_oid_storage *los) ...@@ -660,6 +659,79 @@ void dt_los_put(struct local_oid_storage *los)
return; return;
} }
/* after Lustre 2.3 release there may be old file to store last generated FID
* If such file exists then we have to read its content
*/
int lastid_compat_check(const struct lu_env *env, struct dt_device *dev,
__u64 lastid_seq, __u32 *first_oid, struct ls_device *ls)
{
struct dt_thread_info *dti = dt_info(env);
struct dt_object *root = NULL;
struct los_ondisk losd;
struct dt_object *o = NULL;
int rc = 0;
rc = dt_root_get(env, dev, &dti->dti_fid);
if (rc)
return rc;
root = ls_locate(env, ls, &dti->dti_fid);
if (IS_ERR(root))
return PTR_ERR(root);
/* find old last_id file */
snprintf(dti->dti_buf, sizeof(dti->dti_buf), "seq-"LPX64"-lastid",
lastid_seq);
rc = dt_lookup_dir(env, root, dti->dti_buf, &dti->dti_fid);
lu_object_put_nocache(env, &root->do_lu);
if (rc == -ENOENT) {
/* old llog lastid accessed by FID only */
if (lastid_seq != FID_SEQ_LLOG)
return 0;
dti->dti_fid.f_seq = FID_SEQ_LLOG;
dti->dti_fid.f_oid = 1;
dti->dti_fid.f_ver = 0;
o = ls_locate(env, ls, &dti->dti_fid);
if (IS_ERR(o))
return PTR_ERR(o);
if (!dt_object_exists(o)) {
lu_object_put_nocache(env, &o->do_lu);
return 0;
}
CDEBUG(D_INFO, "Found old llog lastid file\n");
} else if (rc < 0) {
return rc;
} else {
CDEBUG(D_INFO, "Found old lastid file for sequence "LPX64"\n",
lastid_seq);
o = ls_locate(env, ls, &dti->dti_fid);
if (IS_ERR(o))
return PTR_ERR(o);
}
/* let's read seq-NNNNNN-lastid file value */
LASSERT(dt_object_exists(o));
dti->dti_off = 0;
dti->dti_lb.lb_buf = &losd;
dti->dti_lb.lb_len = sizeof(losd);
dt_read_lock(env, o, 0);
rc = dt_record_read(env, o, &dti->dti_lb, &dti->dti_off);
dt_read_unlock(env, o);
lu_object_put_nocache(env, &o->do_lu);
if (rc == 0 && le32_to_cpu(losd.lso_magic) != LOS_MAGIC) {
CERROR("%s: wrong content of seq-"LPX64"-lastid file, magic %x\n",
o->do_lu.lo_dev->ld_obd->obd_name, lastid_seq,
le32_to_cpu(losd.lso_magic));
return -EINVAL;
} else if (rc < 0) {
CERROR("%s: failed to read seq-"LPX64"-lastid: rc = %d\n",
o->do_lu.lo_dev->ld_obd->obd_name, lastid_seq, rc);
return rc;
}
*first_oid = le32_to_cpu(losd.lso_next_oid);
return rc;
}
/** /**
* Initialize local OID storage for required sequence. * Initialize local OID storage for required sequence.
* That may be needed for services that uses local files and requires * That may be needed for services that uses local files and requires
...@@ -683,11 +755,11 @@ int local_oid_storage_init(const struct lu_env *env, struct dt_device *dev, ...@@ -683,11 +755,11 @@ int local_oid_storage_init(const struct lu_env *env, struct dt_device *dev,
{ {
struct dt_thread_info *dti = dt_info(env); struct dt_thread_info *dti = dt_info(env);
struct ls_device *ls; struct ls_device *ls;
struct los_ondisk losd; obd_id lastid;
struct dt_object *root = NULL;
struct dt_object *o = NULL; struct dt_object *o = NULL;
struct thandle *th; struct thandle *th;
int rc; __u32 first_oid = fid_oid(first_fid);
int rc = 0;
ENTRY; ENTRY;
...@@ -711,30 +783,21 @@ int local_oid_storage_init(const struct lu_env *env, struct dt_device *dev, ...@@ -711,30 +783,21 @@ int local_oid_storage_init(const struct lu_env *env, struct dt_device *dev,
atomic_inc(&ls->ls_refcount); atomic_inc(&ls->ls_refcount);
list_add(&(*los)->los_list, &ls->ls_los_list); list_add(&(*los)->los_list, &ls->ls_los_list);
rc = dt_root_get(env, dev, &dti->dti_fid); /* Use {seq, 0, 0} to create the LAST_ID file for every
if (rc) * sequence. OIDs start at LUSTRE_FID_INIT_OID.
GOTO(out_los, rc); */
dti->dti_fid.f_seq = fid_seq(first_fid);
root = ls_locate(env, ls, &dti->dti_fid); dti->dti_fid.f_oid = LUSTRE_FID_LASTID_OID;
if (IS_ERR(root)) dti->dti_fid.f_ver = 0;
GOTO(out_los, rc = PTR_ERR(root));
/* initialize data allowing to generate new fids,
* literally we need a sequence */
snprintf(dti->dti_buf, sizeof(dti->dti_buf), "seq-%Lx-lastid",
fid_seq(first_fid));
rc = dt_lookup_dir(env, root, dti->dti_buf, &dti->dti_fid);
if (rc == -ENOENT)
dti->dti_fid = *first_fid;
else if (rc < 0)
GOTO(out_los, rc);
o = ls_locate(env, ls, &dti->dti_fid); o = ls_locate(env, ls, &dti->dti_fid);
if (IS_ERR(o)) if (IS_ERR(o))
GOTO(out_los, rc = PTR_ERR(o)); GOTO(out_los, rc = PTR_ERR(o));
LASSERT(fid_seq(&dti->dti_fid) == fid_seq(first_fid));
if (!dt_object_exists(o)) { if (!dt_object_exists(o)) {
LASSERT(rc == -ENOENT); rc = lastid_compat_check(env, dev, fid_seq(first_fid),
&first_oid, ls);
if (rc < 0)
GOTO(out_los, rc);
th = dt_trans_create(env, dev); th = dt_trans_create(env, dev);
if (IS_ERR(th)) if (IS_ERR(th))
...@@ -749,14 +812,7 @@ int local_oid_storage_init(const struct lu_env *env, struct dt_device *dev, ...@@ -749,14 +812,7 @@ int local_oid_storage_init(const struct lu_env *env, struct dt_device *dev,
if (rc) if (rc)
GOTO(out_trans, rc); GOTO(out_trans, rc);
rc = dt_declare_insert(env, root, rc = dt_declare_record_write(env, o, sizeof(lastid), 0, th);
(const struct dt_rec *)&dti->dti_fid,
(const struct dt_key *)dti->dti_buf,
th);
if (rc)
GOTO(out_trans, rc);
rc = dt_declare_record_write(env, o, sizeof(losd), 0, th);
if (rc) if (rc)
GOTO(out_trans, rc); GOTO(out_trans, rc);
...@@ -764,7 +820,6 @@ int local_oid_storage_init(const struct lu_env *env, struct dt_device *dev, ...@@ -764,7 +820,6 @@ int local_oid_storage_init(const struct lu_env *env, struct dt_device *dev,
if (rc) if (rc)
GOTO(out_trans, rc); GOTO(out_trans, rc);
dt_write_lock(env, root, 0);
dt_write_lock(env, o, 0); dt_write_lock(env, o, 0);
if (dt_object_exists(o)) if (dt_object_exists(o))
GOTO(out_lock, rc = 0); GOTO(out_lock, rc = 0);
...@@ -774,43 +829,33 @@ int local_oid_storage_init(const struct lu_env *env, struct dt_device *dev, ...@@ -774,43 +829,33 @@ int local_oid_storage_init(const struct lu_env *env, struct dt_device *dev,
if (rc) if (rc)
GOTO(out_lock, rc); GOTO(out_lock, rc);
losd.lso_magic = cpu_to_le32(LOS_MAGIC); lastid = cpu_to_le64(first_oid);
losd.lso_next_oid = cpu_to_le32(fid_oid(first_fid) + 1);
dti->dti_off = 0; dti->dti_off = 0;
dti->dti_lb.lb_buf = &losd; dti->dti_lb.lb_buf = &lastid;
dti->dti_lb.lb_len = sizeof(losd); dti->dti_lb.lb_len = sizeof(lastid);
rc = dt_record_write(env, o, &dti->dti_lb, &dti->dti_off, th); rc = dt_record_write(env, o, &dti->dti_lb, &dti->dti_off, th);
if (rc) if (rc)
GOTO(out_lock, rc); GOTO(out_lock, rc);
rc = dt_insert(env, root,
(const struct dt_rec *)&dti->dti_fid,
(const struct dt_key *)dti->dti_buf,
th, BYPASS_CAPA, 1);
if (rc)
GOTO(out_lock, rc);
out_lock: out_lock:
dt_write_unlock(env, o); dt_write_unlock(env, o);
dt_write_unlock(env, root);
out_trans: out_trans:
dt_trans_stop(env, dev, th); dt_trans_stop(env, dev, th);
} else { } else {
dti->dti_off = 0; dti->dti_off = 0;
dti->dti_lb.lb_buf = &losd; dti->dti_lb.lb_buf = &lastid;
dti->dti_lb.lb_len = sizeof(losd); dti->dti_lb.lb_len = sizeof(lastid);
dt_read_lock(env, o, 0); dt_read_lock(env, o, 0);
rc = dt_record_read(env, o, &dti->dti_lb, &dti->dti_off); rc = dt_record_read(env, o, &dti->dti_lb, &dti->dti_off);
dt_read_unlock(env, o); dt_read_unlock(env, o);
if (rc == 0 && le32_to_cpu(losd.lso_magic) != LOS_MAGIC) { if (rc == 0 && le64_to_cpu(lastid) > OBIF_MAX_OID) {
CERROR("local storage file "DFID" is corrupted\n", CERROR("%s: bad oid "LPU64" is read from LAST_ID\n",
PFID(first_fid)); o->do_lu.lo_dev->ld_obd->obd_name,
le64_to_cpu(lastid));
rc = -EINVAL; rc = -EINVAL;
} }
} }
out_los: out_los:
if (root != NULL && !IS_ERR(root))
lu_object_put_nocache(env, &root->do_lu);
if (rc != 0) { if (rc != 0) {
list_del(&(*los)->los_list); list_del(&(*los)->los_list);
atomic_dec(&ls->ls_refcount); atomic_dec(&ls->ls_refcount);
...@@ -820,8 +865,11 @@ int local_oid_storage_init(const struct lu_env *env, struct dt_device *dev, ...@@ -820,8 +865,11 @@ int local_oid_storage_init(const struct lu_env *env, struct dt_device *dev,
lu_object_put_nocache(env, &o->do_lu); lu_object_put_nocache(env, &o->do_lu);
} else { } else {
(*los)->los_seq = fid_seq(first_fid); (*los)->los_seq = fid_seq(first_fid);
(*los)->los_last_oid = le32_to_cpu(losd.lso_next_oid); (*los)->los_last_oid = le64_to_cpu(lastid);
(*los)->los_obj = o; (*los)->los_obj = o;
/* read value should not be less than initial one */
LASSERTF((*los)->los_last_oid >= first_oid, "%u < %u\n",
(*los)->los_last_oid, first_oid);
} }
out: out:
mutex_unlock(&ls->ls_los_mutex); mutex_unlock(&ls->ls_los_mutex);
......
...@@ -74,3 +74,15 @@ struct ls_device *ls_device_get(struct dt_device *dev); ...@@ -74,3 +74,15 @@ struct ls_device *ls_device_get(struct dt_device *dev);
void ls_device_put(const struct lu_env *env, struct ls_device *ls); void ls_device_put(const struct lu_env *env, struct ls_device *ls);
struct local_oid_storage *dt_los_find(struct ls_device *ls, __u64 seq); struct local_oid_storage *dt_los_find(struct ls_device *ls, __u64 seq);
void dt_los_put(struct local_oid_storage *los); void dt_los_put(struct local_oid_storage *los);
/* Lustre 2.3 on-disk structure describing local object OIDs storage
* the structure to be used with any sequence managed by
* local object library.
* Obsoleted since 2.4 but is kept for compatibility reasons,
* see lastid_compat_check() in obdclass/local_storage.c */
struct los_ondisk {
__u32 lso_magic;
__u32 lso_next_oid;
};
#define LOS_MAGIC 0xdecafbee
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment