Commit fe4f961e authored by David S. Miller's avatar David S. Miller

Merge branch 'net-smc-add-event-based-framework-for-LLC-msgs'

Karsten Graul says:

====================
net/smc: add event-based framework for LLC msgs

These patches are the next step towards SMC-R link failover support. They add
a new framework to handle Link Layer Control (LLC) messages and adapt the
existing code to use the new framework.
====================
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parents 1569a3c4 41a211d8
......@@ -382,22 +382,24 @@ static int smcr_lgr_reg_rmbs(struct smc_link_group *lgr,
static int smcr_clnt_conf_first_link(struct smc_sock *smc)
{
struct smc_link *link = smc->conn.lnk;
int rest;
struct smc_llc_qentry *qentry;
int rc;
link->lgr->type = SMC_LGR_SINGLE;
/* receive CONFIRM LINK request from server over RoCE fabric */
rest = wait_for_completion_interruptible_timeout(
&link->llc_confirm,
SMC_LLC_WAIT_FIRST_TIME);
if (rest <= 0) {
qentry = smc_llc_wait(link->lgr, NULL, SMC_LLC_WAIT_TIME,
SMC_LLC_CONFIRM_LINK);
if (!qentry) {
struct smc_clc_msg_decline dclc;
rc = smc_clc_wait_msg(smc, &dclc, sizeof(dclc),
SMC_CLC_DECLINE, CLC_WAIT_TIME_SHORT);
return rc == -EAGAIN ? SMC_CLC_DECL_TIMEOUT_CL : rc;
}
if (link->llc_confirm_rc)
rc = smc_llc_eval_conf_link(qentry, SMC_LLC_REQ);
smc_llc_flow_qentry_del(&link->lgr->llc_flow_lcl);
if (rc)
return SMC_CLC_DECL_RMBE_EC;
rc = smc_ib_modify_qp_rts(link);
......@@ -409,31 +411,30 @@ static int smcr_clnt_conf_first_link(struct smc_sock *smc)
if (smcr_link_reg_rmb(link, smc->conn.rmb_desc, false))
return SMC_CLC_DECL_ERR_REGRMB;
/* confirm_rkey is implicit on 1st contact */
smc->conn.rmb_desc->is_conf_rkey = true;
/* send CONFIRM LINK response over RoCE fabric */
rc = smc_llc_send_confirm_link(link, SMC_LLC_RESP);
if (rc < 0)
return SMC_CLC_DECL_TIMEOUT_CL;
/* receive ADD LINK request from server over RoCE fabric */
rest = wait_for_completion_interruptible_timeout(&link->llc_add,
SMC_LLC_WAIT_TIME);
if (rest <= 0) {
smc_llc_link_active(link);
/* optional 2nd link, receive ADD LINK request from server */
qentry = smc_llc_wait(link->lgr, NULL, SMC_LLC_WAIT_TIME,
SMC_LLC_ADD_LINK);
if (!qentry) {
struct smc_clc_msg_decline dclc;
rc = smc_clc_wait_msg(smc, &dclc, sizeof(dclc),
SMC_CLC_DECLINE, CLC_WAIT_TIME_SHORT);
return rc == -EAGAIN ? SMC_CLC_DECL_TIMEOUT_AL : rc;
if (rc == -EAGAIN)
rc = 0; /* no DECLINE received, go with one link */
return rc;
}
/* send add link reject message, only one link supported for now */
rc = smc_llc_send_add_link(link,
link->smcibdev->mac[link->ibport - 1],
link->gid, SMC_LLC_RESP);
if (rc < 0)
return SMC_CLC_DECL_TIMEOUT_AL;
smc_llc_link_active(link);
smc_llc_flow_qentry_clr(&link->lgr->llc_flow_lcl);
/* tbd: call smc_llc_cli_add_link(link, qentry); */
return 0;
}
......@@ -613,8 +614,8 @@ static int smc_connect_rdma(struct smc_sock *smc,
struct smc_clc_msg_accept_confirm *aclc,
struct smc_init_info *ini)
{
int i, reason_code = 0;
struct smc_link *link;
int reason_code = 0;
ini->is_smcd = false;
ini->ib_lcl = &aclc->lcl;
......@@ -627,10 +628,28 @@ static int smc_connect_rdma(struct smc_sock *smc,
mutex_unlock(&smc_client_lgr_pending);
return reason_code;
}
link = smc->conn.lnk;
smc_conn_save_peer_info(smc, aclc);
if (ini->cln_first_contact == SMC_FIRST_CONTACT) {
link = smc->conn.lnk;
} else {
/* set link that was assigned by server */
link = NULL;
for (i = 0; i < SMC_LINKS_PER_LGR_MAX; i++) {
struct smc_link *l = &smc->conn.lgr->lnk[i];
if (l->peer_qpn == ntoh24(aclc->qpn)) {
link = l;
break;
}
}
if (!link)
return smc_connect_abort(smc, SMC_CLC_DECL_NOSRVLINK,
ini->cln_first_contact);
smc->conn.lnk = link;
}
/* create send buffer and rmb */
if (smc_buf_create(smc, false))
return smc_connect_abort(smc, SMC_CLC_DECL_MEM,
......@@ -666,7 +685,9 @@ static int smc_connect_rdma(struct smc_sock *smc,
if (ini->cln_first_contact == SMC_FIRST_CONTACT) {
/* QP confirmation over RoCE fabric */
smc_llc_flow_initiate(link->lgr, SMC_LLC_FLOW_ADD_LINK);
reason_code = smcr_clnt_conf_first_link(smc);
smc_llc_flow_stop(link->lgr, &link->lgr->llc_flow_lcl);
if (reason_code)
return smc_connect_abort(smc, reason_code,
ini->cln_first_contact);
......@@ -1019,9 +1040,11 @@ void smc_close_non_accepted(struct sock *sk)
static int smcr_serv_conf_first_link(struct smc_sock *smc)
{
struct smc_link *link = smc->conn.lnk;
int rest;
struct smc_llc_qentry *qentry;
int rc;
link->lgr->type = SMC_LGR_SINGLE;
if (smcr_link_reg_rmb(link, smc->conn.rmb_desc, false))
return SMC_CLC_DECL_ERR_REGRMB;
......@@ -1031,40 +1054,27 @@ static int smcr_serv_conf_first_link(struct smc_sock *smc)
return SMC_CLC_DECL_TIMEOUT_CL;
/* receive CONFIRM LINK response from client over the RoCE fabric */
rest = wait_for_completion_interruptible_timeout(
&link->llc_confirm_resp,
SMC_LLC_WAIT_FIRST_TIME);
if (rest <= 0) {
qentry = smc_llc_wait(link->lgr, link, SMC_LLC_WAIT_TIME,
SMC_LLC_CONFIRM_LINK);
if (!qentry) {
struct smc_clc_msg_decline dclc;
rc = smc_clc_wait_msg(smc, &dclc, sizeof(dclc),
SMC_CLC_DECLINE, CLC_WAIT_TIME_SHORT);
return rc == -EAGAIN ? SMC_CLC_DECL_TIMEOUT_CL : rc;
}
if (link->llc_confirm_resp_rc)
rc = smc_llc_eval_conf_link(qentry, SMC_LLC_RESP);
smc_llc_flow_qentry_del(&link->lgr->llc_flow_lcl);
if (rc)
return SMC_CLC_DECL_RMBE_EC;
/* send ADD LINK request to client over the RoCE fabric */
rc = smc_llc_send_add_link(link,
link->smcibdev->mac[link->ibport - 1],
link->gid, SMC_LLC_REQ);
if (rc < 0)
return SMC_CLC_DECL_TIMEOUT_AL;
/* receive ADD LINK response from client over the RoCE fabric */
rest = wait_for_completion_interruptible_timeout(&link->llc_add_resp,
SMC_LLC_WAIT_TIME);
if (rest <= 0) {
struct smc_clc_msg_decline dclc;
rc = smc_clc_wait_msg(smc, &dclc, sizeof(dclc),
SMC_CLC_DECLINE, CLC_WAIT_TIME_SHORT);
return rc == -EAGAIN ? SMC_CLC_DECL_TIMEOUT_AL : rc;
}
/* confirm_rkey is implicit on 1st contact */
smc->conn.rmb_desc->is_conf_rkey = true;
smc_llc_link_active(link);
/* initial contact - try to establish second link */
/* tbd: call smc_llc_srv_add_link(link); */
return 0;
}
......@@ -1240,7 +1250,9 @@ static int smc_listen_rdma_finish(struct smc_sock *new_smc,
goto decline;
}
/* QP confirmation over RoCE fabric */
smc_llc_flow_initiate(link->lgr, SMC_LLC_FLOW_ADD_LINK);
reason_code = smcr_serv_conf_first_link(new_smc);
smc_llc_flow_stop(link->lgr, &link->lgr->llc_flow_lcl);
if (reason_code)
goto decline;
}
......
......@@ -45,6 +45,7 @@
#define SMC_CLC_DECL_GETVLANERR 0x03080000 /* err to get vlan id of ip device*/
#define SMC_CLC_DECL_ISMVLANERR 0x03090000 /* err to reg vlan id on ism dev */
#define SMC_CLC_DECL_NOACTLINK 0x030a0000 /* no active smc-r link in lgr */
#define SMC_CLC_DECL_NOSRVLINK 0x030b0000 /* SMC-R link from srv not found */
#define SMC_CLC_DECL_SYNCERR 0x04000000 /* synchronization error */
#define SMC_CLC_DECL_PEERDECL 0x05000000 /* peer declined during handshake */
#define SMC_CLC_DECL_INTERR 0x09990000 /* internal error */
......
......@@ -200,7 +200,6 @@ static int smcr_link_send_delete(struct smc_link *lnk, bool orderly)
{
if (lnk->state == SMC_LNK_ACTIVE &&
!smc_llc_send_delete_link(lnk, SMC_LLC_REQ, orderly)) {
smc_llc_link_deleting(lnk);
return 0;
}
return -ENOTCONN;
......@@ -263,6 +262,7 @@ static void smc_lgr_free_work(struct work_struct *work)
if (smc_link_usable(lnk))
lnk->state = SMC_LNK_INACTIVE;
}
wake_up_interruptible_all(&lgr->llc_waiter);
}
smc_lgr_free(lgr);
}
......@@ -445,13 +445,11 @@ static int smc_lgr_create(struct smc_sock *smc, struct smc_init_info *ini)
}
static void smcr_buf_unuse(struct smc_buf_desc *rmb_desc,
struct smc_link *lnk)
struct smc_link_group *lgr)
{
struct smc_link_group *lgr = lnk->lgr;
if (rmb_desc->is_conf_rkey && !list_empty(&lgr->list)) {
/* unregister rmb with peer */
smc_llc_do_delete_rkey(lnk, rmb_desc);
smc_llc_do_delete_rkey(lgr, rmb_desc);
rmb_desc->is_conf_rkey = false;
}
if (rmb_desc->is_reg_err) {
......@@ -474,7 +472,7 @@ static void smc_buf_unuse(struct smc_connection *conn,
if (conn->rmb_desc && lgr->is_smcd)
conn->rmb_desc->used = 0;
else if (conn->rmb_desc)
smcr_buf_unuse(conn->rmb_desc, conn->lnk);
smcr_buf_unuse(conn->rmb_desc, lgr);
}
/* remove a finished connection from its link group */
......@@ -696,6 +694,7 @@ static void smc_lgr_cleanup(struct smc_link_group *lgr)
if (smc_link_usable(lnk))
lnk->state = SMC_LNK_INACTIVE;
}
wake_up_interruptible_all(&lgr->llc_waiter);
}
}
......@@ -767,8 +766,7 @@ void smc_port_terminate(struct smc_ib_device *smcibdev, u8 ibport)
continue;
/* tbd - terminate only when no more links are active */
for (i = 0; i < SMC_LINKS_PER_LGR_MAX; i++) {
if (!smc_link_usable(&lgr->lnk[i]) ||
lgr->lnk[i].state == SMC_LNK_DELETING)
if (!smc_link_usable(&lgr->lnk[i]))
continue;
if (lgr->lnk[i].smcibdev == smcibdev &&
lgr->lnk[i].ibport == ibport) {
......@@ -1167,7 +1165,6 @@ static int smcr_buf_map_usable_links(struct smc_link_group *lgr,
if (!smc_link_usable(lnk))
continue;
if (smcr_buf_map_link(buf_desc, is_rmb, lnk)) {
smcr_buf_unuse(buf_desc, lnk);
rc = -ENOMEM;
goto out;
}
......@@ -1273,6 +1270,7 @@ static int __smc_buf_create(struct smc_sock *smc, bool is_smcd, bool is_rmb)
if (!is_smcd) {
if (smcr_buf_map_usable_links(lgr, buf_desc, is_rmb)) {
smcr_buf_unuse(buf_desc, lgr);
return -ENOMEM;
}
}
......@@ -1368,6 +1366,53 @@ static inline int smc_rmb_reserve_rtoken_idx(struct smc_link_group *lgr)
return -ENOSPC;
}
static int smc_rtoken_find_by_link(struct smc_link_group *lgr, int lnk_idx,
u32 rkey)
{
int i;
for (i = 0; i < SMC_RMBS_PER_LGR_MAX; i++) {
if (test_bit(i, lgr->rtokens_used_mask) &&
lgr->rtokens[i][lnk_idx].rkey == rkey)
return i;
}
return -ENOENT;
}
/* set rtoken for a new link to an existing rmb */
void smc_rtoken_set(struct smc_link_group *lgr, int link_idx, int link_idx_new,
__be32 nw_rkey_known, __be64 nw_vaddr, __be32 nw_rkey)
{
int rtok_idx;
rtok_idx = smc_rtoken_find_by_link(lgr, link_idx, ntohl(nw_rkey_known));
if (rtok_idx == -ENOENT)
return;
lgr->rtokens[rtok_idx][link_idx_new].rkey = ntohl(nw_rkey);
lgr->rtokens[rtok_idx][link_idx_new].dma_addr = be64_to_cpu(nw_vaddr);
}
/* set rtoken for a new link whose link_id is given */
void smc_rtoken_set2(struct smc_link_group *lgr, int rtok_idx, int link_id,
__be64 nw_vaddr, __be32 nw_rkey)
{
u64 dma_addr = be64_to_cpu(nw_vaddr);
u32 rkey = ntohl(nw_rkey);
bool found = false;
int link_idx;
for (link_idx = 0; link_idx < SMC_LINKS_PER_LGR_MAX; link_idx++) {
if (lgr->lnk[link_idx].link_id == link_id) {
found = true;
break;
}
}
if (!found)
return;
lgr->rtokens[rtok_idx][link_idx].rkey = rkey;
lgr->rtokens[rtok_idx][link_idx].dma_addr = dma_addr;
}
/* add a new rtoken from peer */
int smc_rtoken_add(struct smc_link *lnk, __be64 nw_vaddr, __be32 nw_rkey)
{
......
......@@ -36,7 +36,6 @@ enum smc_link_state { /* possible states of a link */
SMC_LNK_INACTIVE, /* link is inactive */
SMC_LNK_ACTIVATING, /* link is being activated */
SMC_LNK_ACTIVE, /* link is active */
SMC_LNK_DELETING, /* link is being deleted */
};
#define SMC_WR_BUF_SIZE 48 /* size of work request buffer */
......@@ -120,20 +119,9 @@ struct smc_link {
struct smc_link_group *lgr; /* parent link group */
enum smc_link_state state; /* state of link */
struct completion llc_confirm; /* wait for rx of conf link */
struct completion llc_confirm_resp; /* wait 4 rx of cnf lnk rsp */
int llc_confirm_rc; /* rc from confirm link msg */
int llc_confirm_resp_rc; /* rc from conf_resp msg */
struct completion llc_add; /* wait for rx of add link */
struct completion llc_add_resp; /* wait for rx of add link rsp*/
struct delayed_work llc_testlink_wrk; /* testlink worker */
struct completion llc_testlink_resp; /* wait for rx of testlink */
int llc_testlink_time; /* testlink interval */
struct completion llc_confirm_rkey_resp; /* w4 rx of cnf rkey */
int llc_confirm_rkey_resp_rc; /* rc from cnf rkey */
struct completion llc_delete_rkey_resp; /* w4 rx of del rkey */
int llc_delete_rkey_resp_rc; /* rc from del rkey */
struct mutex llc_delete_rkey_mutex; /* serialize usage */
};
/* For now we just allow one parallel link per link group. The SMC protocol
......@@ -197,6 +185,28 @@ struct smc_rtoken { /* address/key of remote RMB */
struct smcd_dev;
enum smc_lgr_type { /* redundancy state of lgr */
SMC_LGR_NONE, /* no active links, lgr to be deleted */
SMC_LGR_SINGLE, /* 1 active RNIC on each peer */
SMC_LGR_SYMMETRIC, /* 2 active RNICs on each peer */
SMC_LGR_ASYMMETRIC_PEER, /* local has 2, peer 1 active RNICs */
SMC_LGR_ASYMMETRIC_LOCAL, /* local has 1, peer 2 active RNICs */
};
enum smc_llc_flowtype {
SMC_LLC_FLOW_NONE = 0,
SMC_LLC_FLOW_ADD_LINK = 2,
SMC_LLC_FLOW_DEL_LINK = 4,
SMC_LLC_FLOW_RKEY = 6,
};
struct smc_llc_qentry;
struct smc_llc_flow {
enum smc_llc_flowtype type;
struct smc_llc_qentry *qentry;
};
struct smc_link_group {
struct list_head list;
struct rb_root conns_all; /* connection tree */
......@@ -232,12 +242,24 @@ struct smc_link_group {
DECLARE_BITMAP(rtokens_used_mask, SMC_RMBS_PER_LGR_MAX);
/* used rtoken elements */
u8 next_link_id;
enum smc_lgr_type type;
/* redundancy state */
struct list_head llc_event_q;
/* queue for llc events */
spinlock_t llc_event_q_lock;
/* protects llc_event_q */
struct work_struct llc_event_work;
/* llc event worker */
wait_queue_head_t llc_waiter;
/* w4 next llc event */
struct smc_llc_flow llc_flow_lcl;
/* llc local control field */
struct smc_llc_flow llc_flow_rmt;
/* llc remote control field */
struct smc_llc_qentry *delayed_event;
/* arrived when flow active */
spinlock_t llc_flow_lock;
/* protects llc flow */
int llc_testlink_time;
/* link keep alive time */
};
......@@ -329,6 +351,10 @@ int smc_rmb_rtoken_handling(struct smc_connection *conn, struct smc_link *link,
struct smc_clc_msg_accept_confirm *clc);
int smc_rtoken_add(struct smc_link *lnk, __be64 nw_vaddr, __be32 nw_rkey);
int smc_rtoken_delete(struct smc_link *lnk, __be32 nw_rkey);
void smc_rtoken_set(struct smc_link_group *lgr, int link_idx, int link_idx_new,
__be32 nw_rkey_known, __be64 nw_vaddr, __be32 nw_rkey);
void smc_rtoken_set2(struct smc_link_group *lgr, int rtok_idx, int link_id,
__be64 nw_vaddr, __be32 nw_rkey);
void smc_sndbuf_sync_sg_for_cpu(struct smc_connection *conn);
void smc_sndbuf_sync_sg_for_device(struct smc_connection *conn);
void smc_rmb_sync_sg_for_cpu(struct smc_connection *conn);
......
......@@ -98,13 +98,8 @@ struct smc_llc_msg_confirm_rkey { /* type 0x06 */
u8 reserved;
};
struct smc_llc_msg_confirm_rkey_cont { /* type 0x08 */
struct smc_llc_hdr hd;
u8 num_rkeys;
struct smc_rmb_rtoken rtoken[SMC_LLC_RKEYS_PER_MSG];
};
#define SMC_LLC_DEL_RKEY_MAX 8
#define SMC_LLC_FLAG_RKEY_RETRY 0x10
#define SMC_LLC_FLAG_RKEY_NEG 0x20
struct smc_llc_msg_delete_rkey { /* type 0x09 */
......@@ -122,7 +117,6 @@ union smc_llc_msg {
struct smc_llc_msg_del_link delete_link;
struct smc_llc_msg_confirm_rkey confirm_rkey;
struct smc_llc_msg_confirm_rkey_cont confirm_rkey_cont;
struct smc_llc_msg_delete_rkey delete_rkey;
struct smc_llc_msg_test_link test_link;
......@@ -140,6 +134,154 @@ struct smc_llc_qentry {
union smc_llc_msg msg;
};
struct smc_llc_qentry *smc_llc_flow_qentry_clr(struct smc_llc_flow *flow)
{
struct smc_llc_qentry *qentry = flow->qentry;
flow->qentry = NULL;
return qentry;
}
void smc_llc_flow_qentry_del(struct smc_llc_flow *flow)
{
struct smc_llc_qentry *qentry;
if (flow->qentry) {
qentry = flow->qentry;
flow->qentry = NULL;
kfree(qentry);
}
}
static inline void smc_llc_flow_qentry_set(struct smc_llc_flow *flow,
struct smc_llc_qentry *qentry)
{
flow->qentry = qentry;
}
/* try to start a new llc flow, initiated by an incoming llc msg */
static bool smc_llc_flow_start(struct smc_llc_flow *flow,
struct smc_llc_qentry *qentry)
{
struct smc_link_group *lgr = qentry->link->lgr;
spin_lock_bh(&lgr->llc_flow_lock);
if (flow->type) {
/* a flow is already active */
if ((qentry->msg.raw.hdr.common.type == SMC_LLC_ADD_LINK ||
qentry->msg.raw.hdr.common.type == SMC_LLC_DELETE_LINK) &&
!lgr->delayed_event) {
lgr->delayed_event = qentry;
} else {
/* forget this llc request */
kfree(qentry);
}
spin_unlock_bh(&lgr->llc_flow_lock);
return false;
}
switch (qentry->msg.raw.hdr.common.type) {
case SMC_LLC_ADD_LINK:
flow->type = SMC_LLC_FLOW_ADD_LINK;
break;
case SMC_LLC_DELETE_LINK:
flow->type = SMC_LLC_FLOW_DEL_LINK;
break;
case SMC_LLC_CONFIRM_RKEY:
case SMC_LLC_DELETE_RKEY:
flow->type = SMC_LLC_FLOW_RKEY;
break;
default:
flow->type = SMC_LLC_FLOW_NONE;
}
if (qentry == lgr->delayed_event)
lgr->delayed_event = NULL;
spin_unlock_bh(&lgr->llc_flow_lock);
smc_llc_flow_qentry_set(flow, qentry);
return true;
}
/* start a new local llc flow, wait till current flow finished */
int smc_llc_flow_initiate(struct smc_link_group *lgr,
enum smc_llc_flowtype type)
{
enum smc_llc_flowtype allowed_remote = SMC_LLC_FLOW_NONE;
int rc;
/* all flows except confirm_rkey and delete_rkey are exclusive,
* confirm/delete rkey flows can run concurrently (local and remote)
*/
if (type == SMC_LLC_FLOW_RKEY)
allowed_remote = SMC_LLC_FLOW_RKEY;
again:
if (list_empty(&lgr->list))
return -ENODEV;
spin_lock_bh(&lgr->llc_flow_lock);
if (lgr->llc_flow_lcl.type == SMC_LLC_FLOW_NONE &&
(lgr->llc_flow_rmt.type == SMC_LLC_FLOW_NONE ||
lgr->llc_flow_rmt.type == allowed_remote)) {
lgr->llc_flow_lcl.type = type;
spin_unlock_bh(&lgr->llc_flow_lock);
return 0;
}
spin_unlock_bh(&lgr->llc_flow_lock);
rc = wait_event_interruptible_timeout(lgr->llc_waiter,
(lgr->llc_flow_lcl.type == SMC_LLC_FLOW_NONE &&
(lgr->llc_flow_rmt.type == SMC_LLC_FLOW_NONE ||
lgr->llc_flow_rmt.type == allowed_remote)),
SMC_LLC_WAIT_TIME);
if (!rc)
return -ETIMEDOUT;
goto again;
}
/* finish the current llc flow */
void smc_llc_flow_stop(struct smc_link_group *lgr, struct smc_llc_flow *flow)
{
spin_lock_bh(&lgr->llc_flow_lock);
memset(flow, 0, sizeof(*flow));
flow->type = SMC_LLC_FLOW_NONE;
spin_unlock_bh(&lgr->llc_flow_lock);
if (!list_empty(&lgr->list) && lgr->delayed_event &&
flow == &lgr->llc_flow_lcl)
schedule_work(&lgr->llc_event_work);
else
wake_up_interruptible(&lgr->llc_waiter);
}
/* lnk is optional and used for early wakeup when link goes down, useful in
* cases where we wait for a response on the link after we sent a request
*/
struct smc_llc_qentry *smc_llc_wait(struct smc_link_group *lgr,
struct smc_link *lnk,
int time_out, u8 exp_msg)
{
struct smc_llc_flow *flow = &lgr->llc_flow_lcl;
wait_event_interruptible_timeout(lgr->llc_waiter,
(flow->qentry ||
(lnk && !smc_link_usable(lnk)) ||
list_empty(&lgr->list)),
time_out);
if (!flow->qentry ||
(lnk && !smc_link_usable(lnk)) || list_empty(&lgr->list)) {
smc_llc_flow_qentry_del(flow);
goto out;
}
if (exp_msg && flow->qentry->msg.raw.hdr.common.type != exp_msg) {
if (exp_msg == SMC_LLC_ADD_LINK &&
flow->qentry->msg.raw.hdr.common.type ==
SMC_LLC_DELETE_LINK) {
/* flow_start will delay the unexpected msg */
smc_llc_flow_start(&lgr->llc_flow_lcl,
smc_llc_flow_qentry_clr(flow));
return NULL;
}
smc_llc_flow_qentry_del(flow);
}
out:
return flow->qentry;
}
/********************************** send *************************************/
struct smc_llc_tx_pend {
......@@ -221,27 +363,44 @@ int smc_llc_send_confirm_link(struct smc_link *link,
}
/* send LLC confirm rkey request */
static int smc_llc_send_confirm_rkey(struct smc_link *link,
static int smc_llc_send_confirm_rkey(struct smc_link *send_link,
struct smc_buf_desc *rmb_desc)
{
struct smc_llc_msg_confirm_rkey *rkeyllc;
struct smc_wr_tx_pend_priv *pend;
struct smc_wr_buf *wr_buf;
int rc;
struct smc_link *link;
int i, rc, rtok_ix;
rc = smc_llc_add_pending_send(link, &wr_buf, &pend);
rc = smc_llc_add_pending_send(send_link, &wr_buf, &pend);
if (rc)
return rc;
rkeyllc = (struct smc_llc_msg_confirm_rkey *)wr_buf;
memset(rkeyllc, 0, sizeof(*rkeyllc));
rkeyllc->hd.common.type = SMC_LLC_CONFIRM_RKEY;
rkeyllc->hd.length = sizeof(struct smc_llc_msg_confirm_rkey);
rtok_ix = 1;
for (i = 0; i < SMC_LINKS_PER_LGR_MAX; i++) {
link = &send_link->lgr->lnk[i];
if (link->state == SMC_LNK_ACTIVE && link != send_link) {
rkeyllc->rtoken[rtok_ix].link_id = link->link_id;
rkeyllc->rtoken[rtok_ix].rmb_key =
htonl(rmb_desc->mr_rx[link->link_idx]->rkey);
rkeyllc->rtoken[rtok_ix].rmb_vaddr = cpu_to_be64(
(u64)sg_dma_address(
rmb_desc->sgt[link->link_idx].sgl));
rtok_ix++;
}
}
/* rkey of send_link is in rtoken[0] */
rkeyllc->rtoken[0].num_rkeys = rtok_ix - 1;
rkeyllc->rtoken[0].rmb_key =
htonl(rmb_desc->mr_rx[link->link_idx]->rkey);
htonl(rmb_desc->mr_rx[send_link->link_idx]->rkey);
rkeyllc->rtoken[0].rmb_vaddr = cpu_to_be64(
(u64)sg_dma_address(rmb_desc->sgt[link->link_idx].sgl));
(u64)sg_dma_address(rmb_desc->sgt[send_link->link_idx].sgl));
/* send llc message */
rc = smc_wr_tx_send(link, pend);
rc = smc_wr_tx_send(send_link, pend);
return rc;
}
......@@ -380,54 +539,12 @@ static int smc_llc_send_message(struct smc_link *link, void *llcbuf)
/********************************* receive ***********************************/
static void smc_llc_rx_confirm_link(struct smc_link *link,
struct smc_llc_msg_confirm_link *llc)
{
struct smc_link_group *lgr = smc_get_lgr(link);
int conf_rc = 0;
/* RMBE eyecatchers are not supported */
if (!(llc->hd.flags & SMC_LLC_FLAG_NO_RMBE_EYEC))
conf_rc = ENOTSUPP;
if (lgr->role == SMC_CLNT &&
link->state == SMC_LNK_ACTIVATING) {
link->llc_confirm_rc = conf_rc;
link->link_id = llc->link_num;
complete(&link->llc_confirm);
}
}
static void smc_llc_rx_add_link(struct smc_link *link,
struct smc_llc_msg_add_link *llc)
{
struct smc_link_group *lgr = smc_get_lgr(link);
if (link->state == SMC_LNK_ACTIVATING) {
complete(&link->llc_add);
return;
}
if (lgr->role == SMC_SERV) {
smc_llc_prep_add_link(llc, link,
link->smcibdev->mac[link->ibport - 1],
link->gid, SMC_LLC_REQ);
} else {
smc_llc_prep_add_link(llc, link,
link->smcibdev->mac[link->ibport - 1],
link->gid, SMC_LLC_RESP);
}
smc_llc_send_message(link, llc);
}
static void smc_llc_rx_delete_link(struct smc_link *link,
struct smc_llc_msg_del_link *llc)
{
struct smc_link_group *lgr = smc_get_lgr(link);
smc_lgr_forget(lgr);
smc_llc_link_deleting(link);
if (lgr->role == SMC_SERV) {
/* client asks to delete this link, send request */
smc_llc_prep_delete_link(llc, link, SMC_LLC_REQ, true);
......@@ -439,57 +556,68 @@ static void smc_llc_rx_delete_link(struct smc_link *link,
smc_lgr_terminate_sched(lgr);
}
static void smc_llc_rx_test_link(struct smc_link *link,
struct smc_llc_msg_test_link *llc)
{
llc->hd.flags |= SMC_LLC_FLAG_RESP;
smc_llc_send_message(link, llc);
}
static void smc_llc_rx_confirm_rkey(struct smc_link *link,
struct smc_llc_msg_confirm_rkey *llc)
/* process a confirm_rkey request from peer, remote flow */
static void smc_llc_rmt_conf_rkey(struct smc_link_group *lgr)
{
int rc;
rc = smc_rtoken_add(link,
llc->rtoken[0].rmb_vaddr,
llc->rtoken[0].rmb_key);
/* ignore rtokens for other links, we have only one link */
llc->hd.flags |= SMC_LLC_FLAG_RESP;
if (rc < 0)
llc->hd.flags |= SMC_LLC_FLAG_RKEY_NEG;
smc_llc_send_message(link, llc);
}
static void smc_llc_rx_confirm_rkey_cont(struct smc_link *link,
struct smc_llc_msg_confirm_rkey_cont *llc)
{
/* ignore rtokens for other links, we have only one link */
struct smc_llc_msg_confirm_rkey *llc;
struct smc_llc_qentry *qentry;
struct smc_link *link;
int num_entries;
int rk_idx;
int i;
qentry = lgr->llc_flow_rmt.qentry;
llc = &qentry->msg.confirm_rkey;
link = qentry->link;
num_entries = llc->rtoken[0].num_rkeys;
/* first rkey entry is for receiving link */
rk_idx = smc_rtoken_add(link,
llc->rtoken[0].rmb_vaddr,
llc->rtoken[0].rmb_key);
if (rk_idx < 0)
goto out_err;
for (i = 1; i <= min_t(u8, num_entries, SMC_LLC_RKEYS_PER_MSG - 1); i++)
smc_rtoken_set2(lgr, rk_idx, llc->rtoken[i].link_id,
llc->rtoken[i].rmb_vaddr,
llc->rtoken[i].rmb_key);
/* max links is 3 so there is no need to support conf_rkey_cont msgs */
goto out;
out_err:
llc->hd.flags |= SMC_LLC_FLAG_RKEY_NEG;
llc->hd.flags |= SMC_LLC_FLAG_RKEY_RETRY;
out:
llc->hd.flags |= SMC_LLC_FLAG_RESP;
smc_llc_send_message(link, llc);
smc_llc_send_message(link, &qentry->msg);
smc_llc_flow_qentry_del(&lgr->llc_flow_rmt);
}
static void smc_llc_rx_delete_rkey(struct smc_link *link,
struct smc_llc_msg_delete_rkey *llc)
/* process a delete_rkey request from peer, remote flow */
static void smc_llc_rmt_delete_rkey(struct smc_link_group *lgr)
{
struct smc_llc_msg_delete_rkey *llc;
struct smc_llc_qentry *qentry;
struct smc_link *link;
u8 err_mask = 0;
int i, max;
qentry = lgr->llc_flow_rmt.qentry;
llc = &qentry->msg.delete_rkey;
link = qentry->link;
max = min_t(u8, llc->num_rkeys, SMC_LLC_DEL_RKEY_MAX);
for (i = 0; i < max; i++) {
if (smc_rtoken_delete(link, llc->rkey[i]))
err_mask |= 1 << (SMC_LLC_DEL_RKEY_MAX - 1 - i);
}
if (err_mask) {
llc->hd.flags |= SMC_LLC_FLAG_RKEY_NEG;
llc->err_mask = err_mask;
}
llc->hd.flags |= SMC_LLC_FLAG_RESP;
smc_llc_send_message(link, llc);
smc_llc_send_message(link, &qentry->msg);
smc_llc_flow_qentry_del(&lgr->llc_flow_rmt);
}
/* flush the llc event queue */
......@@ -509,32 +637,66 @@ static void smc_llc_event_handler(struct smc_llc_qentry *qentry)
{
union smc_llc_msg *llc = &qentry->msg;
struct smc_link *link = qentry->link;
struct smc_link_group *lgr = link->lgr;
if (!smc_link_usable(link))
goto out;
switch (llc->raw.hdr.common.type) {
case SMC_LLC_TEST_LINK:
smc_llc_rx_test_link(link, &llc->test_link);
break;
case SMC_LLC_CONFIRM_LINK:
smc_llc_rx_confirm_link(link, &llc->confirm_link);
llc->test_link.hd.flags |= SMC_LLC_FLAG_RESP;
smc_llc_send_message(link, llc);
break;
case SMC_LLC_ADD_LINK:
smc_llc_rx_add_link(link, &llc->add_link);
if (list_empty(&lgr->list))
goto out; /* lgr is terminating */
if (lgr->role == SMC_CLNT) {
if (lgr->llc_flow_lcl.type == SMC_LLC_FLOW_ADD_LINK) {
/* a flow is waiting for this message */
smc_llc_flow_qentry_set(&lgr->llc_flow_lcl,
qentry);
wake_up_interruptible(&lgr->llc_waiter);
} else if (smc_llc_flow_start(&lgr->llc_flow_lcl,
qentry)) {
/* tbd: schedule_work(&lgr->llc_add_link_work); */
}
} else if (smc_llc_flow_start(&lgr->llc_flow_lcl, qentry)) {
/* as smc server, handle client suggestion */
/* tbd: schedule_work(&lgr->llc_add_link_work); */
}
return;
case SMC_LLC_CONFIRM_LINK:
if (lgr->llc_flow_lcl.type != SMC_LLC_FLOW_NONE) {
/* a flow is waiting for this message */
smc_llc_flow_qentry_set(&lgr->llc_flow_lcl, qentry);
wake_up_interruptible(&lgr->llc_waiter);
return;
}
break;
case SMC_LLC_DELETE_LINK:
smc_llc_rx_delete_link(link, &llc->delete_link);
break;
case SMC_LLC_CONFIRM_RKEY:
smc_llc_rx_confirm_rkey(link, &llc->confirm_rkey);
break;
/* new request from remote, assign to remote flow */
if (smc_llc_flow_start(&lgr->llc_flow_rmt, qentry)) {
/* process here, does not wait for more llc msgs */
smc_llc_rmt_conf_rkey(lgr);
smc_llc_flow_stop(lgr, &lgr->llc_flow_rmt);
}
return;
case SMC_LLC_CONFIRM_RKEY_CONT:
smc_llc_rx_confirm_rkey_cont(link, &llc->confirm_rkey_cont);
/* not used because max links is 3, and 3 rkeys fit into
* one CONFIRM_RKEY message
*/
break;
case SMC_LLC_DELETE_RKEY:
smc_llc_rx_delete_rkey(link, &llc->delete_rkey);
break;
/* new request from remote, assign to remote flow */
if (smc_llc_flow_start(&lgr->llc_flow_rmt, qentry)) {
/* process here, does not wait for more llc msgs */
smc_llc_rmt_delete_rkey(lgr);
smc_llc_flow_stop(lgr, &lgr->llc_flow_rmt);
}
return;
}
out:
kfree(qentry);
......@@ -547,6 +709,16 @@ static void smc_llc_event_work(struct work_struct *work)
llc_event_work);
struct smc_llc_qentry *qentry;
if (!lgr->llc_flow_lcl.type && lgr->delayed_event) {
if (smc_link_usable(lgr->delayed_event->link)) {
smc_llc_event_handler(lgr->delayed_event);
} else {
qentry = lgr->delayed_event;
lgr->delayed_event = NULL;
kfree(qentry);
}
}
again:
spin_lock_bh(&lgr->llc_event_q_lock);
if (!list_empty(&lgr->llc_event_q)) {
......@@ -561,80 +733,75 @@ static void smc_llc_event_work(struct work_struct *work)
}
/* process llc responses in tasklet context */
static void smc_llc_rx_response(struct smc_link *link, union smc_llc_msg *llc)
static void smc_llc_rx_response(struct smc_link *link,
struct smc_llc_qentry *qentry)
{
int rc = 0;
u8 llc_type = qentry->msg.raw.hdr.common.type;
switch (llc->raw.hdr.common.type) {
switch (llc_type) {
case SMC_LLC_TEST_LINK:
if (link->state == SMC_LNK_ACTIVE)
complete(&link->llc_testlink_resp);
break;
case SMC_LLC_CONFIRM_LINK:
if (!(llc->raw.hdr.flags & SMC_LLC_FLAG_NO_RMBE_EYEC))
rc = ENOTSUPP;
if (link->lgr->role == SMC_SERV &&
link->state == SMC_LNK_ACTIVATING) {
link->llc_confirm_resp_rc = rc;
complete(&link->llc_confirm_resp);
}
break;
case SMC_LLC_ADD_LINK:
if (link->state == SMC_LNK_ACTIVATING)
complete(&link->llc_add_resp);
break;
case SMC_LLC_CONFIRM_LINK:
case SMC_LLC_CONFIRM_RKEY:
case SMC_LLC_DELETE_RKEY:
/* assign responses to the local flow, we requested them */
smc_llc_flow_qentry_set(&link->lgr->llc_flow_lcl, qentry);
wake_up_interruptible(&link->lgr->llc_waiter);
return;
case SMC_LLC_DELETE_LINK:
if (link->lgr->role == SMC_SERV)
smc_lgr_schedule_free_work_fast(link->lgr);
break;
case SMC_LLC_CONFIRM_RKEY:
link->llc_confirm_rkey_resp_rc = llc->raw.hdr.flags &
SMC_LLC_FLAG_RKEY_NEG;
complete(&link->llc_confirm_rkey_resp);
break;
case SMC_LLC_CONFIRM_RKEY_CONT:
/* unused as long as we don't send this type of msg */
break;
case SMC_LLC_DELETE_RKEY:
link->llc_delete_rkey_resp_rc = llc->raw.hdr.flags &
SMC_LLC_FLAG_RKEY_NEG;
complete(&link->llc_delete_rkey_resp);
/* not used because max links is 3 */
break;
}
kfree(qentry);
}
/* copy received msg and add it to the event queue */
static void smc_llc_rx_handler(struct ib_wc *wc, void *buf)
static void smc_llc_enqueue(struct smc_link *link, union smc_llc_msg *llc)
{
struct smc_link *link = (struct smc_link *)wc->qp->qp_context;
struct smc_link_group *lgr = link->lgr;
struct smc_llc_qentry *qentry;
union smc_llc_msg *llc = buf;
unsigned long flags;
if (wc->byte_len < sizeof(*llc))
return; /* short message */
if (llc->raw.hdr.length != sizeof(*llc))
return; /* invalid message */
/* process responses immediately */
if (llc->raw.hdr.flags & SMC_LLC_FLAG_RESP) {
smc_llc_rx_response(link, llc);
return;
}
qentry = kmalloc(sizeof(*qentry), GFP_ATOMIC);
if (!qentry)
return;
qentry->link = link;
INIT_LIST_HEAD(&qentry->list);
memcpy(&qentry->msg, llc, sizeof(union smc_llc_msg));
/* process responses immediately */
if (llc->raw.hdr.flags & SMC_LLC_FLAG_RESP) {
smc_llc_rx_response(link, qentry);
return;
}
/* add requests to event queue */
spin_lock_irqsave(&lgr->llc_event_q_lock, flags);
list_add_tail(&qentry->list, &lgr->llc_event_q);
spin_unlock_irqrestore(&lgr->llc_event_q_lock, flags);
schedule_work(&link->lgr->llc_event_work);
}
/* copy received msg and add it to the event queue */
static void smc_llc_rx_handler(struct ib_wc *wc, void *buf)
{
struct smc_link *link = (struct smc_link *)wc->qp->qp_context;
union smc_llc_msg *llc = buf;
if (wc->byte_len < sizeof(*llc))
return; /* short message */
if (llc->raw.hdr.length != sizeof(*llc))
return; /* invalid message */
smc_llc_enqueue(link, llc);
}
/***************************** worker, utils *********************************/
static void smc_llc_testlink_work(struct work_struct *work)
......@@ -676,6 +843,8 @@ void smc_llc_lgr_init(struct smc_link_group *lgr, struct smc_sock *smc)
INIT_WORK(&lgr->llc_event_work, smc_llc_event_work);
INIT_LIST_HEAD(&lgr->llc_event_q);
spin_lock_init(&lgr->llc_event_q_lock);
spin_lock_init(&lgr->llc_flow_lock);
init_waitqueue_head(&lgr->llc_waiter);
lgr->llc_testlink_time = net->ipv4.sysctl_tcp_keepalive_time;
}
......@@ -683,18 +852,16 @@ void smc_llc_lgr_init(struct smc_link_group *lgr, struct smc_sock *smc)
void smc_llc_lgr_clear(struct smc_link_group *lgr)
{
smc_llc_event_flush(lgr);
wake_up_interruptible_all(&lgr->llc_waiter);
cancel_work_sync(&lgr->llc_event_work);
if (lgr->delayed_event) {
kfree(lgr->delayed_event);
lgr->delayed_event = NULL;
}
}
int smc_llc_link_init(struct smc_link *link)
{
init_completion(&link->llc_confirm);
init_completion(&link->llc_confirm_resp);
init_completion(&link->llc_add);
init_completion(&link->llc_add_resp);
init_completion(&link->llc_confirm_rkey_resp);
init_completion(&link->llc_delete_rkey_resp);
mutex_init(&link->llc_delete_rkey_mutex);
init_completion(&link->llc_testlink_resp);
INIT_DELAYED_WORK(&link->llc_testlink_wrk, smc_llc_testlink_work);
return 0;
......@@ -710,12 +877,6 @@ void smc_llc_link_active(struct smc_link *link)
}
}
void smc_llc_link_deleting(struct smc_link *link)
{
link->state = SMC_LNK_DELETING;
smc_wr_wakeup_tx_wait(link);
}
/* called in worker context */
void smc_llc_link_clear(struct smc_link *link)
{
......@@ -725,50 +886,74 @@ void smc_llc_link_clear(struct smc_link *link)
smc_wr_wakeup_tx_wait(link);
}
/* register a new rtoken at the remote peer */
int smc_llc_do_confirm_rkey(struct smc_link *link,
/* register a new rtoken at the remote peer (for all links) */
int smc_llc_do_confirm_rkey(struct smc_link *send_link,
struct smc_buf_desc *rmb_desc)
{
int rc;
struct smc_link_group *lgr = send_link->lgr;
struct smc_llc_qentry *qentry = NULL;
int rc = 0;
/* protected by mutex smc_create_lgr_pending */
reinit_completion(&link->llc_confirm_rkey_resp);
rc = smc_llc_send_confirm_rkey(link, rmb_desc);
rc = smc_llc_flow_initiate(lgr, SMC_LLC_FLOW_RKEY);
if (rc)
return rc;
rc = smc_llc_send_confirm_rkey(send_link, rmb_desc);
if (rc)
goto out;
/* receive CONFIRM RKEY response from server over RoCE fabric */
rc = wait_for_completion_interruptible_timeout(
&link->llc_confirm_rkey_resp, SMC_LLC_WAIT_TIME);
if (rc <= 0 || link->llc_confirm_rkey_resp_rc)
return -EFAULT;
return 0;
qentry = smc_llc_wait(lgr, send_link, SMC_LLC_WAIT_TIME,
SMC_LLC_CONFIRM_RKEY);
if (!qentry || (qentry->msg.raw.hdr.flags & SMC_LLC_FLAG_RKEY_NEG))
rc = -EFAULT;
out:
if (qentry)
smc_llc_flow_qentry_del(&lgr->llc_flow_lcl);
smc_llc_flow_stop(lgr, &lgr->llc_flow_lcl);
return rc;
}
/* unregister an rtoken at the remote peer */
int smc_llc_do_delete_rkey(struct smc_link *link,
int smc_llc_do_delete_rkey(struct smc_link_group *lgr,
struct smc_buf_desc *rmb_desc)
{
struct smc_llc_qentry *qentry = NULL;
struct smc_link *send_link;
int rc = 0;
mutex_lock(&link->llc_delete_rkey_mutex);
if (link->state != SMC_LNK_ACTIVE)
goto out;
reinit_completion(&link->llc_delete_rkey_resp);
rc = smc_llc_send_delete_rkey(link, rmb_desc);
send_link = smc_llc_usable_link(lgr);
if (!send_link)
return -ENOLINK;
rc = smc_llc_flow_initiate(lgr, SMC_LLC_FLOW_RKEY);
if (rc)
return rc;
/* protected by llc_flow control */
rc = smc_llc_send_delete_rkey(send_link, rmb_desc);
if (rc)
goto out;
/* receive DELETE RKEY response from server over RoCE fabric */
rc = wait_for_completion_interruptible_timeout(
&link->llc_delete_rkey_resp, SMC_LLC_WAIT_TIME);
if (rc <= 0 || link->llc_delete_rkey_resp_rc)
qentry = smc_llc_wait(lgr, send_link, SMC_LLC_WAIT_TIME,
SMC_LLC_DELETE_RKEY);
if (!qentry || (qentry->msg.raw.hdr.flags & SMC_LLC_FLAG_RKEY_NEG))
rc = -EFAULT;
else
rc = 0;
out:
mutex_unlock(&link->llc_delete_rkey_mutex);
if (qentry)
smc_llc_flow_qentry_del(&lgr->llc_flow_lcl);
smc_llc_flow_stop(lgr, &lgr->llc_flow_lcl);
return rc;
}
/* evaluate confirm link request or response */
int smc_llc_eval_conf_link(struct smc_llc_qentry *qentry,
enum smc_llc_reqresp type)
{
if (type == SMC_LLC_REQ) /* SMC server assigns link_id */
qentry->link->link_id = qentry->msg.confirm_link.link_num;
if (!(qentry->msg.raw.hdr.flags & SMC_LLC_FLAG_NO_RMBE_EYEC))
return -ENOTSUPP;
return 0;
}
/***************************** init, exit, misc ******************************/
static struct smc_wr_rx_handler smc_llc_rx_handlers[] = {
......
......@@ -57,12 +57,21 @@ void smc_llc_lgr_init(struct smc_link_group *lgr, struct smc_sock *smc);
void smc_llc_lgr_clear(struct smc_link_group *lgr);
int smc_llc_link_init(struct smc_link *link);
void smc_llc_link_active(struct smc_link *link);
void smc_llc_link_deleting(struct smc_link *link);
void smc_llc_link_clear(struct smc_link *link);
int smc_llc_do_confirm_rkey(struct smc_link *link,
int smc_llc_do_confirm_rkey(struct smc_link *send_link,
struct smc_buf_desc *rmb_desc);
int smc_llc_do_delete_rkey(struct smc_link *link,
int smc_llc_do_delete_rkey(struct smc_link_group *lgr,
struct smc_buf_desc *rmb_desc);
int smc_llc_flow_initiate(struct smc_link_group *lgr,
enum smc_llc_flowtype type);
void smc_llc_flow_stop(struct smc_link_group *lgr, struct smc_llc_flow *flow);
int smc_llc_eval_conf_link(struct smc_llc_qentry *qentry,
enum smc_llc_reqresp type);
struct smc_llc_qentry *smc_llc_wait(struct smc_link_group *lgr,
struct smc_link *lnk,
int time_out, u8 exp_msg);
struct smc_llc_qentry *smc_llc_flow_qentry_clr(struct smc_llc_flow *flow);
void smc_llc_flow_qentry_del(struct smc_llc_flow *flow);
int smc_llc_init(void) __init;
#endif /* SMC_LLC_H */
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment