Commit 00748209 authored by David S. Miller's avatar David S. Miller
parents 1fc19aff 972a77fb
......@@ -101,7 +101,7 @@ static inline unsigned int tipc_node(__u32 addr)
* Limiting values for messages
*/
#define TIPC_MAX_USER_MSG_SIZE 66000
#define TIPC_MAX_USER_MSG_SIZE 66000U
/*
* Message importance levels
......
......@@ -37,14 +37,17 @@
#ifndef _TIPC_ADDR_H
#define _TIPC_ADDR_H
#define TIPC_ZONE_MASK 0xff000000u
#define TIPC_CLUSTER_MASK 0xfffff000u
static inline u32 tipc_zone_mask(u32 addr)
{
return addr & 0xff000000u;
return addr & TIPC_ZONE_MASK;
}
static inline u32 tipc_cluster_mask(u32 addr)
{
return addr & 0xfffff000u;
return addr & TIPC_CLUSTER_MASK;
}
static inline int in_own_cluster(u32 addr)
......
......@@ -44,13 +44,6 @@
#define BCLINK_WIN_DEFAULT 20 /* bcast link window size (default) */
/*
* Loss rate for incoming broadcast frames; used to test retransmission code.
* Set to N to cause every N'th frame to be discarded; 0 => don't discard any.
*/
#define TIPC_BCAST_LOSS_RATE 0
/**
* struct bcbearer_pair - a pair of bearers used by broadcast link
* @primary: pointer to primary bearer
......@@ -414,9 +407,7 @@ int tipc_bclink_send_msg(struct sk_buff *buf)
spin_lock_bh(&bc_lock);
res = tipc_link_send_buf(bcl, buf);
if (unlikely(res == -ELINKCONG))
buf_discard(buf);
else
if (likely(res > 0))
bclink_set_last_sent();
bcl->stats.queue_sz_counts++;
......@@ -434,9 +425,6 @@ int tipc_bclink_send_msg(struct sk_buff *buf)
void tipc_bclink_recv_pkt(struct sk_buff *buf)
{
#if (TIPC_BCAST_LOSS_RATE)
static int rx_count;
#endif
struct tipc_msg *msg = buf_msg(buf);
struct tipc_node *node = tipc_node_find(msg_prevnode(msg));
u32 next_in;
......@@ -470,14 +458,6 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf)
return;
}
#if (TIPC_BCAST_LOSS_RATE)
if (++rx_count == TIPC_BCAST_LOSS_RATE) {
rx_count = 0;
buf_discard(buf);
return;
}
#endif
tipc_node_lock(node);
receive:
deferred = node->bclink.deferred_head;
......
......@@ -46,6 +46,8 @@ static u32 media_count;
struct tipc_bearer tipc_bearers[MAX_BEARERS];
static void bearer_disable(struct tipc_bearer *b_ptr);
/**
* media_name_valid - validate media name
*
......@@ -342,15 +344,15 @@ struct sk_buff *tipc_bearer_get_names(void)
void tipc_bearer_add_dest(struct tipc_bearer *b_ptr, u32 dest)
{
tipc_nmap_add(&b_ptr->nodes, dest);
tipc_disc_update_link_req(b_ptr->link_req);
tipc_bcbearer_sort();
tipc_disc_add_dest(b_ptr->link_req);
}
void tipc_bearer_remove_dest(struct tipc_bearer *b_ptr, u32 dest)
{
tipc_nmap_remove(&b_ptr->nodes, dest);
tipc_disc_update_link_req(b_ptr->link_req);
tipc_bcbearer_sort();
tipc_disc_remove_dest(b_ptr->link_req);
}
/*
......@@ -493,8 +495,15 @@ int tipc_enable_bearer(const char *name, u32 disc_domain, u32 priority)
warn("Bearer <%s> rejected, illegal name\n", name);
return -EINVAL;
}
if (!tipc_addr_domain_valid(disc_domain) ||
!tipc_in_scope(disc_domain, tipc_own_addr)) {
if (tipc_addr_domain_valid(disc_domain) &&
(disc_domain != tipc_own_addr)) {
if (tipc_in_scope(disc_domain, tipc_own_addr)) {
disc_domain = tipc_own_addr & TIPC_CLUSTER_MASK;
res = 0; /* accept any node in own cluster */
} else if (in_own_cluster(disc_domain))
res = 0; /* accept specified node in own cluster */
}
if (res) {
warn("Bearer <%s> rejected, illegal discovery domain\n", name);
return -EINVAL;
}
......@@ -511,7 +520,7 @@ int tipc_enable_bearer(const char *name, u32 disc_domain, u32 priority)
if (!m_ptr) {
warn("Bearer <%s> rejected, media <%s> not registered\n", name,
b_name.media_name);
goto failed;
goto exit;
}
if (priority == TIPC_MEDIA_LINK_PRI)
......@@ -527,14 +536,14 @@ int tipc_enable_bearer(const char *name, u32 disc_domain, u32 priority)
}
if (!strcmp(name, tipc_bearers[i].name)) {
warn("Bearer <%s> rejected, already enabled\n", name);
goto failed;
goto exit;
}
if ((tipc_bearers[i].priority == priority) &&
(++with_this_prio > 2)) {
if (priority-- == 0) {
warn("Bearer <%s> rejected, duplicate priority\n",
name);
goto failed;
goto exit;
}
warn("Bearer <%s> priority adjustment required %u->%u\n",
name, priority + 1, priority);
......@@ -544,7 +553,7 @@ int tipc_enable_bearer(const char *name, u32 disc_domain, u32 priority)
if (bearer_id >= MAX_BEARERS) {
warn("Bearer <%s> rejected, bearer limit reached (%u)\n",
name, MAX_BEARERS);
goto failed;
goto exit;
}
b_ptr = &tipc_bearers[bearer_id];
......@@ -552,7 +561,7 @@ int tipc_enable_bearer(const char *name, u32 disc_domain, u32 priority)
res = m_ptr->enable_bearer(b_ptr);
if (res) {
warn("Bearer <%s> rejected, enable failure (%d)\n", name, -res);
goto failed;
goto exit;
}
b_ptr->identity = bearer_id;
......@@ -562,14 +571,18 @@ int tipc_enable_bearer(const char *name, u32 disc_domain, u32 priority)
b_ptr->priority = priority;
INIT_LIST_HEAD(&b_ptr->cong_links);
INIT_LIST_HEAD(&b_ptr->links);
b_ptr->link_req = tipc_disc_init_link_req(b_ptr, &m_ptr->bcast_addr,
disc_domain);
spin_lock_init(&b_ptr->lock);
write_unlock_bh(&tipc_net_lock);
res = tipc_disc_create(b_ptr, &m_ptr->bcast_addr, disc_domain);
if (res) {
bearer_disable(b_ptr);
warn("Bearer <%s> rejected, discovery object creation failed\n",
name);
goto exit;
}
info("Enabled bearer <%s>, discovery domain %s, priority %u\n",
name, tipc_addr_string_fill(addr_string, disc_domain), priority);
return 0;
failed:
exit:
write_unlock_bh(&tipc_net_lock);
return res;
}
......@@ -620,14 +633,14 @@ static void bearer_disable(struct tipc_bearer *b_ptr)
struct link *temp_l_ptr;
info("Disabling bearer <%s>\n", b_ptr->name);
tipc_disc_stop_link_req(b_ptr->link_req);
spin_lock_bh(&b_ptr->lock);
b_ptr->link_req = NULL;
b_ptr->blocked = 1;
b_ptr->media->disable_bearer(b_ptr);
list_for_each_entry_safe(l_ptr, temp_l_ptr, &b_ptr->links, link_list) {
tipc_link_delete(l_ptr);
}
if (b_ptr->link_req)
tipc_disc_delete(b_ptr->link_req);
spin_unlock_bh(&b_ptr->lock);
memset(b_ptr, 0, sizeof(struct tipc_bearer));
}
......
......@@ -179,8 +179,7 @@ static int __init tipc_init(void)
if (tipc_log_resize(CONFIG_TIPC_LOG) != 0)
warn("Unable to create log buffer\n");
info("Activated (version " TIPC_MOD_VER
" compiled " __DATE__ " " __TIME__ ")\n");
info("Activated (version " TIPC_MOD_VER ")\n");
tipc_own_addr = 0;
tipc_remote_management = 1;
......
......@@ -39,19 +39,17 @@
#include "discover.h"
#define TIPC_LINK_REQ_INIT 125 /* min delay during bearer start up */
#define TIPC_LINK_REQ_FAST 2000 /* normal delay if bearer has no links */
#define TIPC_LINK_REQ_SLOW 600000 /* normal delay if bearer has links */
/*
* TODO: Most of the inter-cluster setup stuff should be
* rewritten, and be made conformant with specification.
*/
#define TIPC_LINK_REQ_FAST 1000 /* max delay if bearer has no links */
#define TIPC_LINK_REQ_SLOW 60000 /* max delay if bearer has links */
#define TIPC_LINK_REQ_INACTIVE 0xffffffff /* indicates no timer in use */
/**
* struct link_req - information about an ongoing link setup request
* @bearer: bearer issuing requests
* @dest: destination address for request messages
* @domain: network domain to which links can be established
* @num_nodes: number of nodes currently discovered (i.e. with an active link)
* @buf: request message to be (repeatedly) sent
* @timer: timer governing period between requests
* @timer_intv: current interval between requests (in ms)
......@@ -59,6 +57,8 @@
struct link_req {
struct tipc_bearer *bearer;
struct tipc_media_addr dest;
u32 domain;
int num_nodes;
struct sk_buff *buf;
struct timer_list timer;
unsigned int timer_intv;
......@@ -147,7 +147,7 @@ void tipc_disc_recv_msg(struct sk_buff *buf, struct tipc_bearer *b_ptr)
}
if (!tipc_in_scope(dest, tipc_own_addr))
return;
if (!in_own_cluster(orig))
if (!tipc_in_scope(b_ptr->link_req->domain, orig))
return;
/* Locate structure corresponding to requesting node */
......@@ -214,44 +214,54 @@ void tipc_disc_recv_msg(struct sk_buff *buf, struct tipc_bearer *b_ptr)
}
/**
* tipc_disc_stop_link_req - stop sending periodic link setup requests
* disc_update - update frequency of periodic link setup requests
* @req: ptr to link request structure
*
* Reinitiates discovery process if discovery object has no associated nodes
* and is either not currently searching or is searching at a slow rate
*/
void tipc_disc_stop_link_req(struct link_req *req)
static void disc_update(struct link_req *req)
{
if (!req)
return;
if (!req->num_nodes) {
if ((req->timer_intv == TIPC_LINK_REQ_INACTIVE) ||
(req->timer_intv > TIPC_LINK_REQ_FAST)) {
req->timer_intv = TIPC_LINK_REQ_INIT;
k_start_timer(&req->timer, req->timer_intv);
}
}
}
k_cancel_timer(&req->timer);
k_term_timer(&req->timer);
buf_discard(req->buf);
kfree(req);
/**
* tipc_disc_add_dest - increment set of discovered nodes
* @req: ptr to link request structure
*/
void tipc_disc_add_dest(struct link_req *req)
{
req->num_nodes++;
}
/**
* tipc_disc_update_link_req - update frequency of periodic link setup requests
* tipc_disc_remove_dest - decrement set of discovered nodes
* @req: ptr to link request structure
*/
void tipc_disc_update_link_req(struct link_req *req)
void tipc_disc_remove_dest(struct link_req *req)
{
if (!req)
return;
req->num_nodes--;
disc_update(req);
}
if (req->timer_intv == TIPC_LINK_REQ_SLOW) {
if (!req->bearer->nodes.count) {
req->timer_intv = TIPC_LINK_REQ_FAST;
k_start_timer(&req->timer, req->timer_intv);
}
} else if (req->timer_intv == TIPC_LINK_REQ_FAST) {
if (req->bearer->nodes.count) {
req->timer_intv = TIPC_LINK_REQ_SLOW;
k_start_timer(&req->timer, req->timer_intv);
}
} else {
/* leave timer "as is" if haven't yet reached a "normal" rate */
}
/**
* disc_send_msg - send link setup request message
* @req: ptr to link request structure
*/
static void disc_send_msg(struct link_req *req)
{
if (!req->bearer->blocked)
tipc_bearer_send(req->bearer, req->buf, &req->dest);
}
/**
......@@ -263,56 +273,86 @@ void tipc_disc_update_link_req(struct link_req *req)
static void disc_timeout(struct link_req *req)
{
int max_delay;
spin_lock_bh(&req->bearer->lock);
req->bearer->media->send_msg(req->buf, req->bearer, &req->dest);
/* Stop searching if only desired node has been found */
if ((req->timer_intv == TIPC_LINK_REQ_SLOW) ||
(req->timer_intv == TIPC_LINK_REQ_FAST)) {
/* leave timer interval "as is" if already at a "normal" rate */
} else {
req->timer_intv *= 2;
if (req->timer_intv > TIPC_LINK_REQ_FAST)
req->timer_intv = TIPC_LINK_REQ_FAST;
if ((req->timer_intv == TIPC_LINK_REQ_FAST) &&
(req->bearer->nodes.count))
req->timer_intv = TIPC_LINK_REQ_SLOW;
if (tipc_node(req->domain) && req->num_nodes) {
req->timer_intv = TIPC_LINK_REQ_INACTIVE;
goto exit;
}
k_start_timer(&req->timer, req->timer_intv);
/*
* Send discovery message, then update discovery timer
*
* Keep doubling time between requests until limit is reached;
* hold at fast polling rate if don't have any associated nodes,
* otherwise hold at slow polling rate
*/
disc_send_msg(req);
req->timer_intv *= 2;
if (req->num_nodes)
max_delay = TIPC_LINK_REQ_SLOW;
else
max_delay = TIPC_LINK_REQ_FAST;
if (req->timer_intv > max_delay)
req->timer_intv = max_delay;
k_start_timer(&req->timer, req->timer_intv);
exit:
spin_unlock_bh(&req->bearer->lock);
}
/**
* tipc_disc_init_link_req - start sending periodic link setup requests
* tipc_disc_create - create object to send periodic link setup requests
* @b_ptr: ptr to bearer issuing requests
* @dest: destination address for request messages
* @dest_domain: network domain of node(s) which should respond to message
* @dest_domain: network domain to which links can be established
*
* Returns pointer to link request structure, or NULL if unable to create.
* Returns 0 if successful, otherwise -errno.
*/
struct link_req *tipc_disc_init_link_req(struct tipc_bearer *b_ptr,
const struct tipc_media_addr *dest,
u32 dest_domain)
int tipc_disc_create(struct tipc_bearer *b_ptr,
struct tipc_media_addr *dest, u32 dest_domain)
{
struct link_req *req;
req = kmalloc(sizeof(*req), GFP_ATOMIC);
if (!req)
return NULL;
return -ENOMEM;
req->buf = tipc_disc_init_msg(DSC_REQ_MSG, dest_domain, b_ptr);
if (!req->buf) {
kfree(req);
return NULL;
return -ENOMSG;
}
memcpy(&req->dest, dest, sizeof(*dest));
req->bearer = b_ptr;
req->domain = dest_domain;
req->num_nodes = 0;
req->timer_intv = TIPC_LINK_REQ_INIT;
k_init_timer(&req->timer, (Handler)disc_timeout, (unsigned long)req);
k_start_timer(&req->timer, req->timer_intv);
return req;
b_ptr->link_req = req;
disc_send_msg(req);
return 0;
}
/**
* tipc_disc_delete - destroy object sending periodic link setup requests
* @req: ptr to link request structure
*/
void tipc_disc_delete(struct link_req *req)
{
k_cancel_timer(&req->timer);
k_term_timer(&req->timer);
buf_discard(req->buf);
kfree(req);
}
......@@ -39,12 +39,11 @@
struct link_req;
struct link_req *tipc_disc_init_link_req(struct tipc_bearer *b_ptr,
const struct tipc_media_addr *dest,
int tipc_disc_create(struct tipc_bearer *b_ptr, struct tipc_media_addr *dest,
u32 dest_domain);
void tipc_disc_update_link_req(struct link_req *req);
void tipc_disc_stop_link_req(struct link_req *req);
void tipc_disc_delete(struct link_req *req);
void tipc_disc_add_dest(struct link_req *req);
void tipc_disc_remove_dest(struct link_req *req);
void tipc_disc_recv_msg(struct sk_buff *buf, struct tipc_bearer *b_ptr);
#endif
......@@ -92,7 +92,8 @@ static int link_recv_changeover_msg(struct link **l_ptr, struct sk_buff **buf);
static void link_set_supervision_props(struct link *l_ptr, u32 tolerance);
static int link_send_sections_long(struct tipc_port *sender,
struct iovec const *msg_sect,
u32 num_sect, u32 destnode);
u32 num_sect, unsigned int total_len,
u32 destnode);
static void link_check_defragm_bufs(struct link *l_ptr);
static void link_state_event(struct link *l_ptr, u32 event);
static void link_reset_statistics(struct link *l_ptr);
......@@ -842,6 +843,25 @@ static void link_add_to_outqueue(struct link *l_ptr,
l_ptr->stats.max_queue_sz = l_ptr->out_queue_size;
}
static void link_add_chain_to_outqueue(struct link *l_ptr,
struct sk_buff *buf_chain,
u32 long_msgno)
{
struct sk_buff *buf;
struct tipc_msg *msg;
if (!l_ptr->next_out)
l_ptr->next_out = buf_chain;
while (buf_chain) {
buf = buf_chain;
buf_chain = buf_chain->next;
msg = buf_msg(buf);
msg_set_long_msgno(msg, long_msgno);
link_add_to_outqueue(l_ptr, buf, msg);
}
}
/*
* tipc_link_send_buf() is the 'full path' for messages, called from
* inside TIPC when the 'fast path' in tipc_send_buf
......@@ -864,8 +884,9 @@ int tipc_link_send_buf(struct link *l_ptr, struct sk_buff *buf)
if (unlikely(queue_size >= queue_limit)) {
if (imp <= TIPC_CRITICAL_IMPORTANCE) {
return link_schedule_port(l_ptr, msg_origport(msg),
size);
link_schedule_port(l_ptr, msg_origport(msg), size);
buf_discard(buf);
return -ELINKCONG;
}
buf_discard(buf);
if (imp > CONN_MANAGER) {
......@@ -1042,6 +1063,7 @@ int tipc_send_buf_fast(struct sk_buff *buf, u32 destnode)
int tipc_link_send_sections_fast(struct tipc_port *sender,
struct iovec const *msg_sect,
const u32 num_sect,
unsigned int total_len,
u32 destaddr)
{
struct tipc_msg *hdr = &sender->phdr;
......@@ -1057,8 +1079,8 @@ int tipc_link_send_sections_fast(struct tipc_port *sender,
* (Must not hold any locks while building message.)
*/
res = tipc_msg_build(hdr, msg_sect, num_sect, sender->max_pkt,
!sender->user_port, &buf);
res = tipc_msg_build(hdr, msg_sect, num_sect, total_len,
sender->max_pkt, !sender->user_port, &buf);
read_lock_bh(&tipc_net_lock);
node = tipc_node_find(destaddr);
......@@ -1069,8 +1091,6 @@ int tipc_link_send_sections_fast(struct tipc_port *sender,
if (likely(buf)) {
res = link_send_buf_fast(l_ptr, buf,
&sender->max_pkt);
if (unlikely(res < 0))
buf_discard(buf);
exit:
tipc_node_unlock(node);
read_unlock_bh(&tipc_net_lock);
......@@ -1105,7 +1125,8 @@ int tipc_link_send_sections_fast(struct tipc_port *sender,
goto again;
return link_send_sections_long(sender, msg_sect,
num_sect, destaddr);
num_sect, total_len,
destaddr);
}
tipc_node_unlock(node);
}
......@@ -1117,7 +1138,7 @@ int tipc_link_send_sections_fast(struct tipc_port *sender,
return tipc_reject_msg(buf, TIPC_ERR_NO_NODE);
if (res >= 0)
return tipc_port_reject_sections(sender, hdr, msg_sect, num_sect,
TIPC_ERR_NO_NODE);
total_len, TIPC_ERR_NO_NODE);
return res;
}
......@@ -1138,12 +1159,13 @@ int tipc_link_send_sections_fast(struct tipc_port *sender,
static int link_send_sections_long(struct tipc_port *sender,
struct iovec const *msg_sect,
u32 num_sect,
unsigned int total_len,
u32 destaddr)
{
struct link *l_ptr;
struct tipc_node *node;
struct tipc_msg *hdr = &sender->phdr;
u32 dsz = msg_data_sz(hdr);
u32 dsz = total_len;
u32 max_pkt, fragm_sz, rest;
struct tipc_msg fragm_hdr;
struct sk_buff *buf, *buf_chain, *prev;
......@@ -1169,7 +1191,6 @@ static int link_send_sections_long(struct tipc_port *sender,
tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
INT_H_SIZE, msg_destnode(hdr));
msg_set_link_selector(&fragm_hdr, sender->ref);
msg_set_size(&fragm_hdr, max_pkt);
msg_set_fragm_no(&fragm_hdr, 1);
......@@ -1271,28 +1292,15 @@ static int link_send_sections_long(struct tipc_port *sender,
buf_discard(buf_chain);
}
return tipc_port_reject_sections(sender, hdr, msg_sect, num_sect,
TIPC_ERR_NO_NODE);
total_len, TIPC_ERR_NO_NODE);
}
/* Append whole chain to send queue: */
/* Append chain of fragments to send queue & send them */
buf = buf_chain;
l_ptr->long_msg_seq_no = mod(l_ptr->long_msg_seq_no + 1);
if (!l_ptr->next_out)
l_ptr->next_out = buf_chain;
l_ptr->long_msg_seq_no++;
link_add_chain_to_outqueue(l_ptr, buf_chain, l_ptr->long_msg_seq_no);
l_ptr->stats.sent_fragments += fragm_no;
l_ptr->stats.sent_fragmented++;
while (buf) {
struct sk_buff *next = buf->next;
struct tipc_msg *msg = buf_msg(buf);
l_ptr->stats.sent_fragments++;
msg_set_long_msgno(msg, l_ptr->long_msg_seq_no);
link_add_to_outqueue(l_ptr, buf, msg);
buf = next;
}
/* Send it, if possible: */
tipc_link_push_queue(l_ptr);
tipc_node_unlock(node);
return dsz;
......@@ -2407,6 +2415,8 @@ void tipc_link_recv_bundle(struct sk_buff *buf)
*/
static int link_send_long_buf(struct link *l_ptr, struct sk_buff *buf)
{
struct sk_buff *buf_chain = NULL;
struct sk_buff *buf_chain_tail = (struct sk_buff *)&buf_chain;
struct tipc_msg *inmsg = buf_msg(buf);
struct tipc_msg fragm_hdr;
u32 insize = msg_size(inmsg);
......@@ -2415,7 +2425,7 @@ static int link_send_long_buf(struct link *l_ptr, struct sk_buff *buf)
u32 rest = insize;
u32 pack_sz = l_ptr->max_pkt;
u32 fragm_sz = pack_sz - INT_H_SIZE;
u32 fragm_no = 1;
u32 fragm_no = 0;
u32 destaddr;
if (msg_short(inmsg))
......@@ -2427,10 +2437,6 @@ static int link_send_long_buf(struct link *l_ptr, struct sk_buff *buf)
tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
INT_H_SIZE, destaddr);
msg_set_link_selector(&fragm_hdr, msg_link_selector(inmsg));
msg_set_long_msgno(&fragm_hdr, mod(l_ptr->long_msg_seq_no++));
msg_set_fragm_no(&fragm_hdr, fragm_no);
l_ptr->stats.sent_fragmented++;
/* Chop up message: */
......@@ -2443,27 +2449,37 @@ static int link_send_long_buf(struct link *l_ptr, struct sk_buff *buf)
}
fragm = tipc_buf_acquire(fragm_sz + INT_H_SIZE);
if (fragm == NULL) {
warn("Link unable to fragment message\n");
dsz = -ENOMEM;
goto exit;
buf_discard(buf);
while (buf_chain) {
buf = buf_chain;
buf_chain = buf_chain->next;
buf_discard(buf);
}
return -ENOMEM;
}
msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE);
fragm_no++;
msg_set_fragm_no(&fragm_hdr, fragm_no);
skb_copy_to_linear_data(fragm, &fragm_hdr, INT_H_SIZE);
skb_copy_to_linear_data_offset(fragm, INT_H_SIZE, crs,
fragm_sz);
/* Send queued messages first, if any: */
buf_chain_tail->next = fragm;
buf_chain_tail = fragm;
l_ptr->stats.sent_fragments++;
tipc_link_send_buf(l_ptr, fragm);
if (!tipc_link_is_up(l_ptr))
return dsz;
msg_set_fragm_no(&fragm_hdr, ++fragm_no);
rest -= fragm_sz;
crs += fragm_sz;
msg_set_type(&fragm_hdr, FRAGMENT);
}
exit:
buf_discard(buf);
/* Append chain of fragments to send queue & send them */
l_ptr->long_msg_seq_no++;
link_add_chain_to_outqueue(l_ptr, buf_chain, l_ptr->long_msg_seq_no);
l_ptr->stats.sent_fragments += fragm_no;
l_ptr->stats.sent_fragmented++;
tipc_link_push_queue(l_ptr);
return dsz;
}
......
......@@ -228,6 +228,7 @@ u32 tipc_link_get_max_pkt(u32 dest, u32 selector);
int tipc_link_send_sections_fast(struct tipc_port *sender,
struct iovec const *msg_sect,
const u32 num_sect,
unsigned int total_len,
u32 destnode);
void tipc_link_recv_bundle(struct sk_buff *buf);
int tipc_link_recv_fragment(struct sk_buff **pending,
......
......@@ -67,20 +67,6 @@ void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type,
}
}
/**
* tipc_msg_calc_data_size - determine total data size for message
*/
int tipc_msg_calc_data_size(struct iovec const *msg_sect, u32 num_sect)
{
int dsz = 0;
int i;
for (i = 0; i < num_sect; i++)
dsz += msg_sect[i].iov_len;
return dsz;
}
/**
* tipc_msg_build - create message using specified header and data
*
......@@ -89,18 +75,13 @@ int tipc_msg_calc_data_size(struct iovec const *msg_sect, u32 num_sect)
* Returns message data size or errno
*/
int tipc_msg_build(struct tipc_msg *hdr,
struct iovec const *msg_sect, u32 num_sect,
int tipc_msg_build(struct tipc_msg *hdr, struct iovec const *msg_sect,
u32 num_sect, unsigned int total_len,
int max_size, int usrmem, struct sk_buff **buf)
{
int dsz, sz, hsz, pos, res, cnt;
dsz = tipc_msg_calc_data_size(msg_sect, num_sect);
if (unlikely(dsz > TIPC_MAX_USER_MSG_SIZE)) {
*buf = NULL;
return -EINVAL;
}
dsz = total_len;
pos = hsz = msg_hdr_sz(hdr);
sz = hsz + dsz;
msg_set_size(hdr, sz);
......
......@@ -39,41 +39,24 @@
#include "bearer.h"
/*
* Constants and routines used to read and write TIPC payload message headers
*
* Note: Some items are also used with TIPC internal message headers
*/
#define TIPC_VERSION 2
/*
* TIPC user data message header format, version 2:
*
*
* 1 0 9 8 7 6 5 4|3 2 1 0 9 8 7 6|5 4 3 2 1 0 9 8|7 6 5 4 3 2 1 0
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
* w0:|vers | user |hdr sz |n|d|s|-| message size |
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
* w1:|mstyp| error |rer cnt|lsc|opt p| broadcast ack no |
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
* w2:| link level ack no | broadcast/link level seq no |
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
* w3:| previous node |
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
* w4:| originating port |
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
* w5:| destination port |
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
* w6:| originating node |
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
* w7:| destination node |
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
* w8:| name type / transport sequence number |
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
* w9:| name instance/multicast lower bound |
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
* wA:| multicast upper bound |
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
* / /
* \ options \
* / /
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
*
* Payload message users are defined in TIPC's public API:
* - TIPC_LOW_IMPORTANCE
* - TIPC_MEDIUM_IMPORTANCE
* - TIPC_HIGH_IMPORTANCE
* - TIPC_CRITICAL_IMPORTANCE
*/
/*
* Payload message types
*/
#define TIPC_CONN_MSG 0
......@@ -81,6 +64,9 @@
#define TIPC_NAMED_MSG 2
#define TIPC_DIRECT_MSG 3
/*
* Message header sizes
*/
#define SHORT_H_SIZE 24 /* Connected, in-cluster messages */
#define DIR_MSG_H_SIZE 32 /* Directly addressed messages */
......@@ -473,40 +459,11 @@ static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m)
/*
TIPC internal message header format, version 2
1 0 9 8 7 6 5 4|3 2 1 0 9 8 7 6|5 4 3 2 1 0 9 8|7 6 5 4 3 2 1 0
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
w0:|vers |msg usr|hdr sz |n|resrv| packet size |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
w1:|m typ| sequence gap | broadcast ack no |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
w2:| link level ack no/bc_gap_from | seq no / bcast_gap_to |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
w3:| previous node |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
w4:| next sent broadcast/fragm no | next sent pkt/ fragm msg no |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
w5:| session no |rsv=0|r|berid|link prio|netpl|p|
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
w6:| originating node |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
w7:| destination node |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
w8:| transport sequence number |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
w9:| msg count / bcast tag | link tolerance |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
\ \
/ User Specific Data /
\ \
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
NB: CONN_MANAGER use data message format. LINK_CONFIG has own format.
*/
* Constants and routines used to read and write TIPC internal message headers
*/
/*
* Internal users
* Internal message users
*/
#define BCAST_PROTOCOL 5
......@@ -520,7 +477,7 @@ static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m)
#define LINK_CONFIG 13
/*
* Connection management protocol messages
* Connection management protocol message types
*/
#define CONN_PROBE 0
......@@ -528,12 +485,41 @@ static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m)
#define CONN_ACK 2
/*
* Name distributor messages
* Name distributor message types
*/
#define PUBLICATION 0
#define WITHDRAWAL 1
/*
* Segmentation message types
*/
#define FIRST_FRAGMENT 0
#define FRAGMENT 1
#define LAST_FRAGMENT 2
/*
* Link management protocol message types
*/
#define STATE_MSG 0
#define RESET_MSG 1
#define ACTIVATE_MSG 2
/*
* Changeover tunnel message types
*/
#define DUPLICATE_MSG 0
#define ORIGINAL_MSG 1
/*
* Config protocol message types
*/
#define DSC_REQ_MSG 0
#define DSC_RESP_MSG 1
/*
* Word 1
......@@ -761,50 +747,11 @@ static inline void msg_set_link_tolerance(struct tipc_msg *m, u32 n)
msg_set_bits(m, 9, 0, 0xffff, n);
}
/*
* Segmentation message types
*/
#define FIRST_FRAGMENT 0
#define FRAGMENT 1
#define LAST_FRAGMENT 2
/*
* Link management protocol message types
*/
#define STATE_MSG 0
#define RESET_MSG 1
#define ACTIVATE_MSG 2
/*
* Changeover tunnel message types
*/
#define DUPLICATE_MSG 0
#define ORIGINAL_MSG 1
/*
* Routing table message types
*/
#define EXT_ROUTING_TABLE 0
#define LOCAL_ROUTING_TABLE 1 /* obsoleted */
#define SLAVE_ROUTING_TABLE 2
#define ROUTE_ADDITION 3
#define ROUTE_REMOVAL 4
/*
* Config protocol message types
*/
#define DSC_REQ_MSG 0
#define DSC_RESP_MSG 1
u32 tipc_msg_tot_importance(struct tipc_msg *m);
void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type,
u32 hsize, u32 destnode);
int tipc_msg_calc_data_size(struct iovec const *msg_sect, u32 num_sect);
int tipc_msg_build(struct tipc_msg *hdr,
struct iovec const *msg_sect, u32 num_sect,
int tipc_msg_build(struct tipc_msg *hdr, struct iovec const *msg_sect,
u32 num_sect, unsigned int total_len,
int max_size, int usrmem, struct sk_buff **buf);
static inline void msg_set_media_addr(struct tipc_msg *m, struct tipc_media_addr *a)
......
......@@ -74,7 +74,8 @@ static u32 port_peerport(struct tipc_port *p_ptr)
*/
int tipc_multicast(u32 ref, struct tipc_name_seq const *seq,
u32 num_sect, struct iovec const *msg_sect)
u32 num_sect, struct iovec const *msg_sect,
unsigned int total_len)
{
struct tipc_msg *hdr;
struct sk_buff *buf;
......@@ -91,11 +92,14 @@ int tipc_multicast(u32 ref, struct tipc_name_seq const *seq,
hdr = &oport->phdr;
msg_set_type(hdr, TIPC_MCAST_MSG);
msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE);
msg_set_destport(hdr, 0);
msg_set_destnode(hdr, 0);
msg_set_nametype(hdr, seq->type);
msg_set_namelower(hdr, seq->lower);
msg_set_nameupper(hdr, seq->upper);
msg_set_hdr_sz(hdr, MCAST_H_SIZE);
res = tipc_msg_build(hdr, msg_sect, num_sect, MAX_MSG_SIZE,
res = tipc_msg_build(hdr, msg_sect, num_sect, total_len, MAX_MSG_SIZE,
!oport->user_port, &buf);
if (unlikely(!buf))
return res;
......@@ -161,6 +165,7 @@ void tipc_port_recv_mcast(struct sk_buff *buf, struct port_list *dp)
/* Deliver a copy of message to each destination port */
if (dp->count != 0) {
msg_set_destnode(msg, tipc_own_addr);
if (dp->count == 1) {
msg_set_destport(msg, dp->ports[0]);
tipc_port_recv_msg(buf);
......@@ -414,12 +419,12 @@ int tipc_reject_msg(struct sk_buff *buf, u32 err)
int tipc_port_reject_sections(struct tipc_port *p_ptr, struct tipc_msg *hdr,
struct iovec const *msg_sect, u32 num_sect,
int err)
unsigned int total_len, int err)
{
struct sk_buff *buf;
int res;
res = tipc_msg_build(hdr, msg_sect, num_sect, MAX_MSG_SIZE,
res = tipc_msg_build(hdr, msg_sect, num_sect, total_len, MAX_MSG_SIZE,
!p_ptr->user_port, &buf);
if (!buf)
return res;
......@@ -1065,6 +1070,7 @@ int tipc_connect2port(u32 ref, struct tipc_portid const *peer)
msg_set_orignode(msg, tipc_own_addr);
msg_set_origport(msg, p_ptr->ref);
msg_set_type(msg, TIPC_CONN_MSG);
msg_set_lookup_scope(msg, 0);
msg_set_hdr_sz(msg, SHORT_H_SIZE);
p_ptr->probing_interval = PROBING_INTERVAL;
......@@ -1158,12 +1164,13 @@ int tipc_shutdown(u32 ref)
*/
static int tipc_port_recv_sections(struct tipc_port *sender, unsigned int num_sect,
struct iovec const *msg_sect)
struct iovec const *msg_sect,
unsigned int total_len)
{
struct sk_buff *buf;
int res;
res = tipc_msg_build(&sender->phdr, msg_sect, num_sect,
res = tipc_msg_build(&sender->phdr, msg_sect, num_sect, total_len,
MAX_MSG_SIZE, !sender->user_port, &buf);
if (likely(buf))
tipc_port_recv_msg(buf);
......@@ -1174,7 +1181,8 @@ static int tipc_port_recv_sections(struct tipc_port *sender, unsigned int num_se
* tipc_send - send message sections on connection
*/
int tipc_send(u32 ref, unsigned int num_sect, struct iovec const *msg_sect)
int tipc_send(u32 ref, unsigned int num_sect, struct iovec const *msg_sect,
unsigned int total_len)
{
struct tipc_port *p_ptr;
u32 destnode;
......@@ -1189,9 +1197,10 @@ int tipc_send(u32 ref, unsigned int num_sect, struct iovec const *msg_sect)
destnode = port_peernode(p_ptr);
if (likely(destnode != tipc_own_addr))
res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect,
destnode);
total_len, destnode);
else
res = tipc_port_recv_sections(p_ptr, num_sect, msg_sect);
res = tipc_port_recv_sections(p_ptr, num_sect, msg_sect,
total_len);
if (likely(res != -ELINKCONG)) {
p_ptr->congested = 0;
......@@ -1202,8 +1211,7 @@ int tipc_send(u32 ref, unsigned int num_sect, struct iovec const *msg_sect)
}
if (port_unreliable(p_ptr)) {
p_ptr->congested = 0;
/* Just calculate msg length and return */
return tipc_msg_calc_data_size(msg_sect, num_sect);
return total_len;
}
return -ELINKCONG;
}
......@@ -1213,7 +1221,8 @@ int tipc_send(u32 ref, unsigned int num_sect, struct iovec const *msg_sect)
*/
int tipc_send2name(u32 ref, struct tipc_name const *name, unsigned int domain,
unsigned int num_sect, struct iovec const *msg_sect)
unsigned int num_sect, struct iovec const *msg_sect,
unsigned int total_len)
{
struct tipc_port *p_ptr;
struct tipc_msg *msg;
......@@ -1240,23 +1249,23 @@ int tipc_send2name(u32 ref, struct tipc_name const *name, unsigned int domain,
if (likely(destport)) {
if (likely(destnode == tipc_own_addr))
res = tipc_port_recv_sections(p_ptr, num_sect,
msg_sect);
msg_sect, total_len);
else
res = tipc_link_send_sections_fast(p_ptr, msg_sect,
num_sect, destnode);
num_sect, total_len,
destnode);
if (likely(res != -ELINKCONG)) {
if (res > 0)
p_ptr->sent++;
return res;
}
if (port_unreliable(p_ptr)) {
/* Just calculate msg length and return */
return tipc_msg_calc_data_size(msg_sect, num_sect);
return total_len;
}
return -ELINKCONG;
}
return tipc_port_reject_sections(p_ptr, msg, msg_sect, num_sect,
TIPC_ERR_NO_NAME);
total_len, TIPC_ERR_NO_NAME);
}
/**
......@@ -1264,7 +1273,8 @@ int tipc_send2name(u32 ref, struct tipc_name const *name, unsigned int domain,
*/
int tipc_send2port(u32 ref, struct tipc_portid const *dest,
unsigned int num_sect, struct iovec const *msg_sect)
unsigned int num_sect, struct iovec const *msg_sect,
unsigned int total_len)
{
struct tipc_port *p_ptr;
struct tipc_msg *msg;
......@@ -1276,6 +1286,7 @@ int tipc_send2port(u32 ref, struct tipc_portid const *dest,
msg = &p_ptr->phdr;
msg_set_type(msg, TIPC_DIRECT_MSG);
msg_set_lookup_scope(msg, 0);
msg_set_orignode(msg, tipc_own_addr);
msg_set_origport(msg, ref);
msg_set_destnode(msg, dest->node);
......@@ -1283,18 +1294,18 @@ int tipc_send2port(u32 ref, struct tipc_portid const *dest,
msg_set_hdr_sz(msg, DIR_MSG_H_SIZE);
if (dest->node == tipc_own_addr)
res = tipc_port_recv_sections(p_ptr, num_sect, msg_sect);
res = tipc_port_recv_sections(p_ptr, num_sect, msg_sect,
total_len);
else
res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect,
dest->node);
total_len, dest->node);
if (likely(res != -ELINKCONG)) {
if (res > 0)
p_ptr->sent++;
return res;
}
if (port_unreliable(p_ptr)) {
/* Just calculate msg length and return */
return tipc_msg_calc_data_size(msg_sect, num_sect);
return total_len;
}
return -ELINKCONG;
}
......
......@@ -205,23 +205,27 @@ int tipc_disconnect_port(struct tipc_port *tp_ptr);
/*
* TIPC messaging routines
*/
int tipc_send(u32 portref, unsigned int num_sect, struct iovec const *msg_sect);
int tipc_send(u32 portref, unsigned int num_sect, struct iovec const *msg_sect,
unsigned int total_len);
int tipc_send2name(u32 portref, struct tipc_name const *name, u32 domain,
unsigned int num_sect, struct iovec const *msg_sect);
unsigned int num_sect, struct iovec const *msg_sect,
unsigned int total_len);
int tipc_send2port(u32 portref, struct tipc_portid const *dest,
unsigned int num_sect, struct iovec const *msg_sect);
unsigned int num_sect, struct iovec const *msg_sect,
unsigned int total_len);
int tipc_send_buf2port(u32 portref, struct tipc_portid const *dest,
struct sk_buff *buf, unsigned int dsz);
int tipc_multicast(u32 portref, struct tipc_name_seq const *seq,
unsigned int section_count, struct iovec const *msg);
unsigned int section_count, struct iovec const *msg,
unsigned int total_len);
int tipc_port_reject_sections(struct tipc_port *p_ptr, struct tipc_msg *hdr,
struct iovec const *msg_sect, u32 num_sect,
int err);
unsigned int total_len, int err);
struct sk_buff *tipc_port_get_ports(void);
void tipc_port_recv_proto_msg(struct sk_buff *buf);
void tipc_port_recv_mcast(struct sk_buff *buf, struct port_list *dp);
......
......@@ -535,6 +535,9 @@ static int send_msg(struct kiocb *iocb, struct socket *sock,
if (unlikely((m->msg_namelen < sizeof(*dest)) ||
(dest->family != AF_TIPC)))
return -EINVAL;
if ((total_len > TIPC_MAX_USER_MSG_SIZE) ||
(m->msg_iovlen > (unsigned)INT_MAX))
return -EMSGSIZE;
if (iocb)
lock_sock(sk);
......@@ -573,12 +576,14 @@ static int send_msg(struct kiocb *iocb, struct socket *sock,
&dest->addr.name.name,
dest->addr.name.domain,
m->msg_iovlen,
m->msg_iov);
m->msg_iov,
total_len);
} else if (dest->addrtype == TIPC_ADDR_ID) {
res = tipc_send2port(tport->ref,
&dest->addr.id,
m->msg_iovlen,
m->msg_iov);
m->msg_iov,
total_len);
} else if (dest->addrtype == TIPC_ADDR_MCAST) {
if (needs_conn) {
res = -EOPNOTSUPP;
......@@ -590,7 +595,8 @@ static int send_msg(struct kiocb *iocb, struct socket *sock,
res = tipc_multicast(tport->ref,
&dest->addr.nameseq,
m->msg_iovlen,
m->msg_iov);
m->msg_iov,
total_len);
}
if (likely(res != -ELINKCONG)) {
if (needs_conn && (res >= 0))
......@@ -640,6 +646,10 @@ static int send_packet(struct kiocb *iocb, struct socket *sock,
if (unlikely(dest))
return send_msg(iocb, sock, m, total_len);
if ((total_len > TIPC_MAX_USER_MSG_SIZE) ||
(m->msg_iovlen > (unsigned)INT_MAX))
return -EMSGSIZE;
if (iocb)
lock_sock(sk);
......@@ -652,7 +662,8 @@ static int send_packet(struct kiocb *iocb, struct socket *sock,
break;
}
res = tipc_send(tport->ref, m->msg_iovlen, m->msg_iov);
res = tipc_send(tport->ref, m->msg_iovlen, m->msg_iov,
total_len);
if (likely(res != -ELINKCONG))
break;
if (m->msg_flags & MSG_DONTWAIT) {
......@@ -723,6 +734,12 @@ static int send_stream(struct kiocb *iocb, struct socket *sock,
goto exit;
}
if ((total_len > (unsigned)INT_MAX) ||
(m->msg_iovlen > (unsigned)INT_MAX)) {
res = -EMSGSIZE;
goto exit;
}
/*
* Send each iovec entry using one or more messages
*
......@@ -753,7 +770,7 @@ static int send_stream(struct kiocb *iocb, struct socket *sock,
bytes_to_send = curr_left;
my_iov.iov_base = curr_start;
my_iov.iov_len = bytes_to_send;
res = send_packet(NULL, sock, &my_msg, 0);
res = send_packet(NULL, sock, &my_msg, bytes_to_send);
if (res < 0) {
if (bytes_sent)
res = bytes_sent;
......
......@@ -109,7 +109,7 @@ static void subscr_send_event(struct subscription *sub,
sub->evt.found_upper = htohl(found_upper, sub->swap);
sub->evt.port.ref = htohl(port_ref, sub->swap);
sub->evt.port.node = htohl(node, sub->swap);
tipc_send(sub->server_ref, 1, &msg_sect);
tipc_send(sub->server_ref, 1, &msg_sect, msg_sect.iov_len);
}
/**
......@@ -521,7 +521,7 @@ static void subscr_named_msg_event(void *usr_handle,
/* Send an ACK- to complete connection handshaking */
tipc_send(server_port_ref, 0, NULL);
tipc_send(server_port_ref, 0, NULL, 0);
/* Handle optional subscription request */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment