Commit bbb03029 authored by Tom Herbert's avatar Tom Herbert Committed by David S. Miller

strparser: Generalize strparser

Generalize strparser from more than just being used in conjunction
with read_sock. strparser will also be used in the send path with
zero proxy. The primary change is to create strp_process function
that performs the critical processing on skbs. The documentation
is also updated to reflect the new uses.
Signed-off-by: default avatarTom Herbert <tom@quantonium.net>
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parent 20bf50de
Stream Parser Stream Parser (strparser)
-------------
Introduction
============
The stream parser (strparser) is a utility that parses messages of an The stream parser (strparser) is a utility that parses messages of an
application layer protocol running over a TCP connection. The stream application layer protocol running over a data stream. The stream
parser works in conjunction with an upper layer in the kernel to provide parser works in conjunction with an upper layer in the kernel to provide
kernel support for application layer messages. For instance, Kernel kernel support for application layer messages. For instance, Kernel
Connection Multiplexor (KCM) uses the Stream Parser to parse messages Connection Multiplexor (KCM) uses the Stream Parser to parse messages
using a BPF program. using a BPF program.
The strparser works in one of two modes: receive callback or general
mode.
In receive callback mode, the strparser is called from the data_ready
callback of a TCP socket. Messages are parsed and delivered as they are
received on the socket.
In general mode, a sequence of skbs are fed to strparser from an
outside source. Message are parsed and delivered as the sequence is
processed. This modes allows strparser to be applied to arbitrary
streams of data.
Interface Interface
--------- =========
The API includes a context structure, a set of callbacks, utility The API includes a context structure, a set of callbacks, utility
functions, and a data_ready function. The callbacks include functions, and a data_ready function for receive callback mode. The
a parse_msg function that is called to perform parsing (e.g. callbacks include a parse_msg function that is called to perform
BPF parsing in case of KCM), and a rcv_msg function that is called parsing (e.g. BPF parsing in case of KCM), and a rcv_msg function
when a full message has been completed. that is called when a full message has been completed.
A stream parser can be instantiated for a TCP connection. This is done Functions
by: =========
strp_init(struct strparser *strp, struct sock *csk, strp_init(struct strparser *strp, struct sock *sk,
struct strp_callbacks *cb) struct strp_callbacks *cb)
strp is a struct of type strparser that is allocated by the upper layer. Called to initialize a stream parser. strp is a struct of type
csk is the TCP socket associated with the stream parser. Callbacks are strparser that is allocated by the upper layer. sk is the TCP
called by the stream parser. socket associated with the stream parser for use with receive
callback mode; in general mode this is set to NULL. Callbacks
are called by the stream parser (the callbacks are listed below).
void strp_pause(struct strparser *strp)
Temporarily pause a stream parser. Message parsing is suspended
and no new messages are delivered to the upper layer.
void strp_pause(struct strparser *strp)
Unpause a paused stream parser.
void strp_stop(struct strparser *strp);
strp_stop is called to completely stop stream parser operations.
This is called internally when the stream parser encounters an
error, and it is called from the upper layer to stop parsing
operations.
void strp_done(struct strparser *strp);
strp_done is called to release any resources held by the stream
parser instance. This must be called after the stream processor
has been stopped.
int strp_process(struct strparser *strp, struct sk_buff *orig_skb,
unsigned int orig_offset, size_t orig_len,
size_t max_msg_size, long timeo)
strp_process is called in general mode for a stream parser to
parse an sk_buff. The number of bytes processed or a negative
error number is returned. Note that strp_process does not
consume the sk_buff. max_msg_size is maximum size the stream
parser will parse. timeo is timeout for completing a message.
void strp_data_ready(struct strparser *strp);
The upper layer calls strp_tcp_data_ready when data is ready on
the lower socket for strparser to process. This should be called
from a data_ready callback that is set on the socket. Note that
maximum messages size is the limit of the receive socket
buffer and message timeout is the receive timeout for the socket.
void strp_check_rcv(struct strparser *strp);
strp_check_rcv is called to check for new messages on the socket.
This is normally called at initialization of a stream parser
instance or after strp_unpause.
Callbacks Callbacks
--------- =========
There are four callbacks: There are six callbacks:
int (*parse_msg)(struct strparser *strp, struct sk_buff *skb); int (*parse_msg)(struct strparser *strp, struct sk_buff *skb);
parse_msg is called to determine the length of the next message parse_msg is called to determine the length of the next message
in the stream. The upper layer must implement this function. It in the stream. The upper layer must implement this function. It
should parse the sk_buff as containing the headers for the should parse the sk_buff as containing the headers for the
next application layer messages in the stream. next application layer message in the stream.
The skb->cb in the input skb is a struct strp_rx_msg. Only The skb->cb in the input skb is a struct strp_msg. Only
the offset field is relevant in parse_msg and gives the offset the offset field is relevant in parse_msg and gives the offset
where the message starts in the skb. where the message starts in the skb.
...@@ -50,26 +112,41 @@ int (*parse_msg)(struct strparser *strp, struct sk_buff *skb); ...@@ -50,26 +112,41 @@ int (*parse_msg)(struct strparser *strp, struct sk_buff *skb);
-ESTRPIPE : current message should not be processed by the -ESTRPIPE : current message should not be processed by the
kernel, return control of the socket to userspace which kernel, return control of the socket to userspace which
can proceed to read the messages itself can proceed to read the messages itself
other < 0 : Error is parsing, give control back to userspace other < 0 : Error in parsing, give control back to userspace
assuming that synchronization is lost and the stream assuming that synchronization is lost and the stream
is unrecoverable (application expected to close TCP socket) is unrecoverable (application expected to close TCP socket)
In the case that an error is returned (return value is less than In the case that an error is returned (return value is less than
zero) the stream parser will set the error on TCP socket and wake zero) and the parser is in receive callback mode, then it will set
it up. If parse_msg returned -ESTRPIPE and the stream parser had the error on TCP socket and wake it up. If parse_msg returned
previously read some bytes for the current message, then the error -ESTRPIPE and the stream parser had previously read some bytes for
set on the attached socket is ENODATA since the stream is the current message, then the error set on the attached socket is
unrecoverable in that case. ENODATA since the stream is unrecoverable in that case.
void (*lock)(struct strparser *strp)
The lock callback is called to lock the strp structure when
the strparser is performing an asynchronous operation (such as
processing a timeout). In receive callback mode the default
function is to lock_sock for the associated socket. In general
mode the callback must be set appropriately.
void (*unlock)(struct strparser *strp)
The unlock callback is called to release the lock obtained
by the lock callback. In receive callback mode the default
function is release_sock for the associated socket. In general
mode the callback must be set appropriately.
void (*rcv_msg)(struct strparser *strp, struct sk_buff *skb); void (*rcv_msg)(struct strparser *strp, struct sk_buff *skb);
rcv_msg is called when a full message has been received and rcv_msg is called when a full message has been received and
is queued. The callee must consume the sk_buff; it can is queued. The callee must consume the sk_buff; it can
call strp_pause to prevent any further messages from being call strp_pause to prevent any further messages from being
received in rcv_msg (see strp_pause below). This callback received in rcv_msg (see strp_pause above). This callback
must be set. must be set.
The skb->cb in the input skb is a struct strp_rx_msg. This The skb->cb in the input skb is a struct strp_msg. This
struct contains two fields: offset and full_len. Offset is struct contains two fields: offset and full_len. Offset is
where the message starts in the skb, and full_len is the where the message starts in the skb, and full_len is the
the length of the message. skb->len - offset may be greater the length of the message. skb->len - offset may be greater
...@@ -78,59 +155,53 @@ void (*rcv_msg)(struct strparser *strp, struct sk_buff *skb); ...@@ -78,59 +155,53 @@ void (*rcv_msg)(struct strparser *strp, struct sk_buff *skb);
int (*read_sock_done)(struct strparser *strp, int err); int (*read_sock_done)(struct strparser *strp, int err);
read_sock_done is called when the stream parser is done reading read_sock_done is called when the stream parser is done reading
the TCP socket. The stream parser may read multiple messages the TCP socket in receive callback mode. The stream parser may
in a loop and this function allows cleanup to occur when existing read multiple messages in a loop and this function allows cleanup
the loop. If the callback is not set (NULL in strp_init) a to occur when exiting the loop. If the callback is not set (NULL
default function is used. in strp_init) a default function is used.
void (*abort_parser)(struct strparser *strp, int err); void (*abort_parser)(struct strparser *strp, int err);
This function is called when stream parser encounters an error This function is called when stream parser encounters an error
in parsing. The default function stops the stream parser for the in parsing. The default function stops the stream parser and
TCP socket and sets the error in the socket. The default function sets the error in the socket if the parser is in receive callback
can be changed by setting the callback to non-NULL in strp_init. mode. The default function can be changed by setting the callback
to non-NULL in strp_init.
Functions Statistics
--------- ==========
The upper layer calls strp_tcp_data_ready when data is ready on the lower Various counters are kept for each stream parser instance. These are in
socket for strparser to process. This should be called from a data_ready the strp_stats structure. strp_aggr_stats is a convenience structure for
callback that is set on the socket. accumulating statistics for multiple stream parser instances.
save_strp_stats and aggregate_strp_stats are helper functions to save
and aggregate statistics.
strp_stop is called to completely stop stream parser operations. This Message assembly limits
is called internally when the stream parser encounters an error, and =======================
it is called from the upper layer when unattaching a TCP socket.
strp_done is called to unattach the stream parser from the TCP socket. The stream parser provide mechanisms to limit the resources consumed by
This must be called after the stream processor has be stopped. message assembly.
strp_check_rcv is called to check for new messages on the socket. This A timer is set when assembly starts for a new message. In receive
is normally called at initialization of the a stream parser instance callback mode the message timeout is taken from rcvtime for the
of after strp_unpause. associated TCP socket. In general mode, the timeout is passed as an
argument in strp_process. If the timer fires before assembly completes
the stream parser is aborted and the ETIMEDOUT error is set on the TCP
socket if in receive callback mode.
Statistics In receive callback mode, message length is limited to the receive
---------- buffer size of the associated TCP socket. If the length returned by
parse_msg is greater than the socket buffer size then the stream parser
is aborted with EMSGSIZE error set on the TCP socket. Note that this
makes the maximum size of receive skbuffs for a socket with a stream
parser to be 2*sk_rcvbuf of the TCP socket.
Various counters are kept for each stream parser for a TCP socket. In general mode the message length limit is passed in as an argument
These are in the strp_stats structure. strp_aggr_stats is a convenience to strp_process.
structure for accumulating statistics for multiple stream parser
instances. save_strp_stats and aggregate_strp_stats are helper functions
to save and aggregate statistics.
Message assembly limits Author
----------------------- ======
The stream parser provide mechanisms to limit the resources consumed by Tom Herbert (tom@quantonium.net)
message assembly.
A timer is set when assembly starts for a new message. The message
timeout is taken from rcvtime for the associated TCP socket. If the
timer fires before assembly completes the stream parser is aborted
and the ETIMEDOUT error is set on the TCP socket.
Message length is limited to the receive buffer size of the associated
TCP socket. If the length returned by parse_msg is greater than
the socket buffer size then the stream parser is aborted with
EMSGSIZE error set on the TCP socket. Note that this makes the
maximum size of receive skbuffs for a socket with a stream parser
to be 2*sk_rcvbuf of the TCP socket.
...@@ -18,26 +18,26 @@ ...@@ -18,26 +18,26 @@
#define STRP_STATS_INCR(stat) ((stat)++) #define STRP_STATS_INCR(stat) ((stat)++)
struct strp_stats { struct strp_stats {
unsigned long long rx_msgs; unsigned long long msgs;
unsigned long long rx_bytes; unsigned long long bytes;
unsigned int rx_mem_fail; unsigned int mem_fail;
unsigned int rx_need_more_hdr; unsigned int need_more_hdr;
unsigned int rx_msg_too_big; unsigned int msg_too_big;
unsigned int rx_msg_timeouts; unsigned int msg_timeouts;
unsigned int rx_bad_hdr_len; unsigned int bad_hdr_len;
}; };
struct strp_aggr_stats { struct strp_aggr_stats {
unsigned long long rx_msgs; unsigned long long msgs;
unsigned long long rx_bytes; unsigned long long bytes;
unsigned int rx_mem_fail; unsigned int mem_fail;
unsigned int rx_need_more_hdr; unsigned int need_more_hdr;
unsigned int rx_msg_too_big; unsigned int msg_too_big;
unsigned int rx_msg_timeouts; unsigned int msg_timeouts;
unsigned int rx_bad_hdr_len; unsigned int bad_hdr_len;
unsigned int rx_aborts; unsigned int aborts;
unsigned int rx_interrupted; unsigned int interrupted;
unsigned int rx_unrecov_intr; unsigned int unrecov_intr;
}; };
struct strparser; struct strparser;
...@@ -48,16 +48,18 @@ struct strp_callbacks { ...@@ -48,16 +48,18 @@ struct strp_callbacks {
void (*rcv_msg)(struct strparser *strp, struct sk_buff *skb); void (*rcv_msg)(struct strparser *strp, struct sk_buff *skb);
int (*read_sock_done)(struct strparser *strp, int err); int (*read_sock_done)(struct strparser *strp, int err);
void (*abort_parser)(struct strparser *strp, int err); void (*abort_parser)(struct strparser *strp, int err);
void (*lock)(struct strparser *strp);
void (*unlock)(struct strparser *strp);
}; };
struct strp_rx_msg { struct strp_msg {
int full_len; int full_len;
int offset; int offset;
}; };
static inline struct strp_rx_msg *strp_rx_msg(struct sk_buff *skb) static inline struct strp_msg *strp_msg(struct sk_buff *skb)
{ {
return (struct strp_rx_msg *)((void *)skb->cb + return (struct strp_msg *)((void *)skb->cb +
offsetof(struct qdisc_skb_cb, data)); offsetof(struct qdisc_skb_cb, data));
} }
...@@ -65,18 +67,18 @@ static inline struct strp_rx_msg *strp_rx_msg(struct sk_buff *skb) ...@@ -65,18 +67,18 @@ static inline struct strp_rx_msg *strp_rx_msg(struct sk_buff *skb)
struct strparser { struct strparser {
struct sock *sk; struct sock *sk;
u32 rx_stopped : 1; u32 stopped : 1;
u32 rx_paused : 1; u32 paused : 1;
u32 rx_aborted : 1; u32 aborted : 1;
u32 rx_interrupted : 1; u32 interrupted : 1;
u32 rx_unrecov_intr : 1; u32 unrecov_intr : 1;
struct sk_buff **rx_skb_nextp; struct sk_buff **skb_nextp;
struct timer_list rx_msg_timer; struct timer_list msg_timer;
struct sk_buff *rx_skb_head; struct sk_buff *skb_head;
unsigned int rx_need_bytes; unsigned int need_bytes;
struct delayed_work rx_delayed_work; struct delayed_work delayed_work;
struct work_struct rx_work; struct work_struct work;
struct strp_stats stats; struct strp_stats stats;
struct strp_callbacks cb; struct strp_callbacks cb;
}; };
...@@ -84,7 +86,7 @@ struct strparser { ...@@ -84,7 +86,7 @@ struct strparser {
/* Must be called with lock held for attached socket */ /* Must be called with lock held for attached socket */
static inline void strp_pause(struct strparser *strp) static inline void strp_pause(struct strparser *strp)
{ {
strp->rx_paused = 1; strp->paused = 1;
} }
/* May be called without holding lock for attached socket */ /* May be called without holding lock for attached socket */
...@@ -97,37 +99,37 @@ static inline void save_strp_stats(struct strparser *strp, ...@@ -97,37 +99,37 @@ static inline void save_strp_stats(struct strparser *strp,
#define SAVE_PSOCK_STATS(_stat) (agg_stats->_stat += \ #define SAVE_PSOCK_STATS(_stat) (agg_stats->_stat += \
strp->stats._stat) strp->stats._stat)
SAVE_PSOCK_STATS(rx_msgs); SAVE_PSOCK_STATS(msgs);
SAVE_PSOCK_STATS(rx_bytes); SAVE_PSOCK_STATS(bytes);
SAVE_PSOCK_STATS(rx_mem_fail); SAVE_PSOCK_STATS(mem_fail);
SAVE_PSOCK_STATS(rx_need_more_hdr); SAVE_PSOCK_STATS(need_more_hdr);
SAVE_PSOCK_STATS(rx_msg_too_big); SAVE_PSOCK_STATS(msg_too_big);
SAVE_PSOCK_STATS(rx_msg_timeouts); SAVE_PSOCK_STATS(msg_timeouts);
SAVE_PSOCK_STATS(rx_bad_hdr_len); SAVE_PSOCK_STATS(bad_hdr_len);
#undef SAVE_PSOCK_STATS #undef SAVE_PSOCK_STATS
if (strp->rx_aborted) if (strp->aborted)
agg_stats->rx_aborts++; agg_stats->aborts++;
if (strp->rx_interrupted) if (strp->interrupted)
agg_stats->rx_interrupted++; agg_stats->interrupted++;
if (strp->rx_unrecov_intr) if (strp->unrecov_intr)
agg_stats->rx_unrecov_intr++; agg_stats->unrecov_intr++;
} }
static inline void aggregate_strp_stats(struct strp_aggr_stats *stats, static inline void aggregate_strp_stats(struct strp_aggr_stats *stats,
struct strp_aggr_stats *agg_stats) struct strp_aggr_stats *agg_stats)
{ {
#define SAVE_PSOCK_STATS(_stat) (agg_stats->_stat += stats->_stat) #define SAVE_PSOCK_STATS(_stat) (agg_stats->_stat += stats->_stat)
SAVE_PSOCK_STATS(rx_msgs); SAVE_PSOCK_STATS(msgs);
SAVE_PSOCK_STATS(rx_bytes); SAVE_PSOCK_STATS(bytes);
SAVE_PSOCK_STATS(rx_mem_fail); SAVE_PSOCK_STATS(mem_fail);
SAVE_PSOCK_STATS(rx_need_more_hdr); SAVE_PSOCK_STATS(need_more_hdr);
SAVE_PSOCK_STATS(rx_msg_too_big); SAVE_PSOCK_STATS(msg_too_big);
SAVE_PSOCK_STATS(rx_msg_timeouts); SAVE_PSOCK_STATS(msg_timeouts);
SAVE_PSOCK_STATS(rx_bad_hdr_len); SAVE_PSOCK_STATS(bad_hdr_len);
SAVE_PSOCK_STATS(rx_aborts); SAVE_PSOCK_STATS(aborts);
SAVE_PSOCK_STATS(rx_interrupted); SAVE_PSOCK_STATS(interrupted);
SAVE_PSOCK_STATS(rx_unrecov_intr); SAVE_PSOCK_STATS(unrecov_intr);
#undef SAVE_PSOCK_STATS #undef SAVE_PSOCK_STATS
} }
...@@ -135,8 +137,11 @@ static inline void aggregate_strp_stats(struct strp_aggr_stats *stats, ...@@ -135,8 +137,11 @@ static inline void aggregate_strp_stats(struct strp_aggr_stats *stats,
void strp_done(struct strparser *strp); void strp_done(struct strparser *strp);
void strp_stop(struct strparser *strp); void strp_stop(struct strparser *strp);
void strp_check_rcv(struct strparser *strp); void strp_check_rcv(struct strparser *strp);
int strp_init(struct strparser *strp, struct sock *csk, int strp_init(struct strparser *strp, struct sock *sk,
struct strp_callbacks *cb); struct strp_callbacks *cb);
void strp_data_ready(struct strparser *strp); void strp_data_ready(struct strparser *strp);
int strp_process(struct strparser *strp, struct sk_buff *orig_skb,
unsigned int orig_offset, size_t orig_len,
size_t max_msg_size, long timeo);
#endif /* __NET_STRPARSER_H_ */ #endif /* __NET_STRPARSER_H_ */
...@@ -155,8 +155,8 @@ static void kcm_format_psock(struct kcm_psock *psock, struct seq_file *seq, ...@@ -155,8 +155,8 @@ static void kcm_format_psock(struct kcm_psock *psock, struct seq_file *seq,
seq_printf(seq, seq_printf(seq,
" psock-%-5u %-10llu %-16llu %-10llu %-16llu %-8d %-8d %-8d %-8d ", " psock-%-5u %-10llu %-16llu %-10llu %-16llu %-8d %-8d %-8d %-8d ",
psock->index, psock->index,
psock->strp.stats.rx_msgs, psock->strp.stats.msgs,
psock->strp.stats.rx_bytes, psock->strp.stats.bytes,
psock->stats.tx_msgs, psock->stats.tx_msgs,
psock->stats.tx_bytes, psock->stats.tx_bytes,
psock->sk->sk_receive_queue.qlen, psock->sk->sk_receive_queue.qlen,
...@@ -170,22 +170,22 @@ static void kcm_format_psock(struct kcm_psock *psock, struct seq_file *seq, ...@@ -170,22 +170,22 @@ static void kcm_format_psock(struct kcm_psock *psock, struct seq_file *seq,
if (psock->tx_stopped) if (psock->tx_stopped)
seq_puts(seq, "TxStop "); seq_puts(seq, "TxStop ");
if (psock->strp.rx_stopped) if (psock->strp.stopped)
seq_puts(seq, "RxStop "); seq_puts(seq, "RxStop ");
if (psock->tx_kcm) if (psock->tx_kcm)
seq_printf(seq, "Rsvd-%d ", psock->tx_kcm->index); seq_printf(seq, "Rsvd-%d ", psock->tx_kcm->index);
if (!psock->strp.rx_paused && !psock->ready_rx_msg) { if (!psock->strp.paused && !psock->ready_rx_msg) {
if (psock->sk->sk_receive_queue.qlen) { if (psock->sk->sk_receive_queue.qlen) {
if (psock->strp.rx_need_bytes) if (psock->strp.need_bytes)
seq_printf(seq, "RxWait=%u ", seq_printf(seq, "RxWait=%u ",
psock->strp.rx_need_bytes); psock->strp.need_bytes);
else else
seq_printf(seq, "RxWait "); seq_printf(seq, "RxWait ");
} }
} else { } else {
if (psock->strp.rx_paused) if (psock->strp.paused)
seq_puts(seq, "RxPause "); seq_puts(seq, "RxPause ");
if (psock->ready_rx_msg) if (psock->ready_rx_msg)
...@@ -371,20 +371,20 @@ static int kcm_stats_seq_show(struct seq_file *seq, void *v) ...@@ -371,20 +371,20 @@ static int kcm_stats_seq_show(struct seq_file *seq, void *v)
seq_printf(seq, seq_printf(seq,
"%-8s %-10llu %-16llu %-10llu %-16llu %-10llu %-10llu %-10u %-10u %-10u %-10u %-10u %-10u %-10u %-10u %-10u\n", "%-8s %-10llu %-16llu %-10llu %-16llu %-10llu %-10llu %-10u %-10u %-10u %-10u %-10u %-10u %-10u %-10u %-10u\n",
"", "",
strp_stats.rx_msgs, strp_stats.msgs,
strp_stats.rx_bytes, strp_stats.bytes,
psock_stats.tx_msgs, psock_stats.tx_msgs,
psock_stats.tx_bytes, psock_stats.tx_bytes,
psock_stats.reserved, psock_stats.reserved,
psock_stats.unreserved, psock_stats.unreserved,
strp_stats.rx_aborts, strp_stats.aborts,
strp_stats.rx_interrupted, strp_stats.interrupted,
strp_stats.rx_unrecov_intr, strp_stats.unrecov_intr,
strp_stats.rx_mem_fail, strp_stats.mem_fail,
strp_stats.rx_need_more_hdr, strp_stats.need_more_hdr,
strp_stats.rx_bad_hdr_len, strp_stats.bad_hdr_len,
strp_stats.rx_msg_too_big, strp_stats.msg_too_big,
strp_stats.rx_msg_timeouts, strp_stats.msg_timeouts,
psock_stats.tx_aborts); psock_stats.tx_aborts);
return 0; return 0;
......
...@@ -96,12 +96,12 @@ static void kcm_update_rx_mux_stats(struct kcm_mux *mux, ...@@ -96,12 +96,12 @@ static void kcm_update_rx_mux_stats(struct kcm_mux *mux,
struct kcm_psock *psock) struct kcm_psock *psock)
{ {
STRP_STATS_ADD(mux->stats.rx_bytes, STRP_STATS_ADD(mux->stats.rx_bytes,
psock->strp.stats.rx_bytes - psock->strp.stats.bytes -
psock->saved_rx_bytes); psock->saved_rx_bytes);
mux->stats.rx_msgs += mux->stats.rx_msgs +=
psock->strp.stats.rx_msgs - psock->saved_rx_msgs; psock->strp.stats.msgs - psock->saved_rx_msgs;
psock->saved_rx_msgs = psock->strp.stats.rx_msgs; psock->saved_rx_msgs = psock->strp.stats.msgs;
psock->saved_rx_bytes = psock->strp.stats.rx_bytes; psock->saved_rx_bytes = psock->strp.stats.bytes;
} }
static void kcm_update_tx_mux_stats(struct kcm_mux *mux, static void kcm_update_tx_mux_stats(struct kcm_mux *mux,
...@@ -1118,7 +1118,7 @@ static int kcm_recvmsg(struct socket *sock, struct msghdr *msg, ...@@ -1118,7 +1118,7 @@ static int kcm_recvmsg(struct socket *sock, struct msghdr *msg,
struct kcm_sock *kcm = kcm_sk(sk); struct kcm_sock *kcm = kcm_sk(sk);
int err = 0; int err = 0;
long timeo; long timeo;
struct strp_rx_msg *rxm; struct strp_msg *stm;
int copied = 0; int copied = 0;
struct sk_buff *skb; struct sk_buff *skb;
...@@ -1132,26 +1132,26 @@ static int kcm_recvmsg(struct socket *sock, struct msghdr *msg, ...@@ -1132,26 +1132,26 @@ static int kcm_recvmsg(struct socket *sock, struct msghdr *msg,
/* Okay, have a message on the receive queue */ /* Okay, have a message on the receive queue */
rxm = strp_rx_msg(skb); stm = strp_msg(skb);
if (len > rxm->full_len) if (len > stm->full_len)
len = rxm->full_len; len = stm->full_len;
err = skb_copy_datagram_msg(skb, rxm->offset, msg, len); err = skb_copy_datagram_msg(skb, stm->offset, msg, len);
if (err < 0) if (err < 0)
goto out; goto out;
copied = len; copied = len;
if (likely(!(flags & MSG_PEEK))) { if (likely(!(flags & MSG_PEEK))) {
KCM_STATS_ADD(kcm->stats.rx_bytes, copied); KCM_STATS_ADD(kcm->stats.rx_bytes, copied);
if (copied < rxm->full_len) { if (copied < stm->full_len) {
if (sock->type == SOCK_DGRAM) { if (sock->type == SOCK_DGRAM) {
/* Truncated message */ /* Truncated message */
msg->msg_flags |= MSG_TRUNC; msg->msg_flags |= MSG_TRUNC;
goto msg_finished; goto msg_finished;
} }
rxm->offset += copied; stm->offset += copied;
rxm->full_len -= copied; stm->full_len -= copied;
} else { } else {
msg_finished: msg_finished:
/* Finished with message */ /* Finished with message */
...@@ -1175,7 +1175,7 @@ static ssize_t kcm_splice_read(struct socket *sock, loff_t *ppos, ...@@ -1175,7 +1175,7 @@ static ssize_t kcm_splice_read(struct socket *sock, loff_t *ppos,
struct sock *sk = sock->sk; struct sock *sk = sock->sk;
struct kcm_sock *kcm = kcm_sk(sk); struct kcm_sock *kcm = kcm_sk(sk);
long timeo; long timeo;
struct strp_rx_msg *rxm; struct strp_msg *stm;
int err = 0; int err = 0;
ssize_t copied; ssize_t copied;
struct sk_buff *skb; struct sk_buff *skb;
...@@ -1192,12 +1192,12 @@ static ssize_t kcm_splice_read(struct socket *sock, loff_t *ppos, ...@@ -1192,12 +1192,12 @@ static ssize_t kcm_splice_read(struct socket *sock, loff_t *ppos,
/* Okay, have a message on the receive queue */ /* Okay, have a message on the receive queue */
rxm = strp_rx_msg(skb); stm = strp_msg(skb);
if (len > rxm->full_len) if (len > stm->full_len)
len = rxm->full_len; len = stm->full_len;
copied = skb_splice_bits(skb, sk, rxm->offset, pipe, len, flags); copied = skb_splice_bits(skb, sk, stm->offset, pipe, len, flags);
if (copied < 0) { if (copied < 0) {
err = copied; err = copied;
goto err_out; goto err_out;
...@@ -1205,8 +1205,8 @@ static ssize_t kcm_splice_read(struct socket *sock, loff_t *ppos, ...@@ -1205,8 +1205,8 @@ static ssize_t kcm_splice_read(struct socket *sock, loff_t *ppos,
KCM_STATS_ADD(kcm->stats.rx_bytes, copied); KCM_STATS_ADD(kcm->stats.rx_bytes, copied);
rxm->offset += copied; stm->offset += copied;
rxm->full_len -= copied; stm->full_len -= copied;
/* We have no way to return MSG_EOR. If all the bytes have been /* We have no way to return MSG_EOR. If all the bytes have been
* read we still leave the message in the receive socket buffer. * read we still leave the message in the receive socket buffer.
......
...@@ -29,44 +29,46 @@ ...@@ -29,44 +29,46 @@
static struct workqueue_struct *strp_wq; static struct workqueue_struct *strp_wq;
struct _strp_rx_msg { struct _strp_msg {
/* Internal cb structure. struct strp_rx_msg must be first for passing /* Internal cb structure. struct strp_msg must be first for passing
* to upper layer. * to upper layer.
*/ */
struct strp_rx_msg strp; struct strp_msg strp;
int accum_len; int accum_len;
int early_eaten; int early_eaten;
}; };
static inline struct _strp_rx_msg *_strp_rx_msg(struct sk_buff *skb) static inline struct _strp_msg *_strp_msg(struct sk_buff *skb)
{ {
return (struct _strp_rx_msg *)((void *)skb->cb + return (struct _strp_msg *)((void *)skb->cb +
offsetof(struct qdisc_skb_cb, data)); offsetof(struct qdisc_skb_cb, data));
} }
/* Lower lock held */ /* Lower lock held */
static void strp_abort_rx_strp(struct strparser *strp, int err) static void strp_abort_strp(struct strparser *strp, int err)
{ {
struct sock *csk = strp->sk;
/* Unrecoverable error in receive */ /* Unrecoverable error in receive */
del_timer(&strp->rx_msg_timer); del_timer(&strp->msg_timer);
if (strp->rx_stopped) if (strp->stopped)
return; return;
strp->rx_stopped = 1; strp->stopped = 1;
if (strp->sk) {
struct sock *sk = strp->sk;
/* Report an error on the lower socket */ /* Report an error on the lower socket */
csk->sk_err = err; sk->sk_err = err;
csk->sk_error_report(csk); sk->sk_error_report(sk);
}
} }
static void strp_start_rx_timer(struct strparser *strp) static void strp_start_timer(struct strparser *strp, long timeo)
{ {
if (strp->sk->sk_rcvtimeo) if (timeo)
mod_timer(&strp->rx_msg_timer, strp->sk->sk_rcvtimeo); mod_timer(&strp->msg_timer, timeo);
} }
/* Lower lock held */ /* Lower lock held */
...@@ -74,46 +76,55 @@ static void strp_parser_err(struct strparser *strp, int err, ...@@ -74,46 +76,55 @@ static void strp_parser_err(struct strparser *strp, int err,
read_descriptor_t *desc) read_descriptor_t *desc)
{ {
desc->error = err; desc->error = err;
kfree_skb(strp->rx_skb_head); kfree_skb(strp->skb_head);
strp->rx_skb_head = NULL; strp->skb_head = NULL;
strp->cb.abort_parser(strp, err); strp->cb.abort_parser(strp, err);
} }
static inline int strp_peek_len(struct strparser *strp) static inline int strp_peek_len(struct strparser *strp)
{ {
if (strp->sk) {
struct socket *sock = strp->sk->sk_socket; struct socket *sock = strp->sk->sk_socket;
return sock->ops->peek_len(sock); return sock->ops->peek_len(sock);
}
/* If we don't have an associated socket there's nothing to peek.
* Return int max to avoid stopping the strparser.
*/
return INT_MAX;
} }
/* Lower socket lock held */ /* Lower socket lock held */
static int strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb, static int __strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb,
unsigned int orig_offset, size_t orig_len) unsigned int orig_offset, size_t orig_len,
size_t max_msg_size, long timeo)
{ {
struct strparser *strp = (struct strparser *)desc->arg.data; struct strparser *strp = (struct strparser *)desc->arg.data;
struct _strp_rx_msg *rxm; struct _strp_msg *stm;
struct sk_buff *head, *skb; struct sk_buff *head, *skb;
size_t eaten = 0, cand_len; size_t eaten = 0, cand_len;
ssize_t extra; ssize_t extra;
int err; int err;
bool cloned_orig = false; bool cloned_orig = false;
if (strp->rx_paused) if (strp->paused)
return 0; return 0;
head = strp->rx_skb_head; head = strp->skb_head;
if (head) { if (head) {
/* Message already in progress */ /* Message already in progress */
rxm = _strp_rx_msg(head); stm = _strp_msg(head);
if (unlikely(rxm->early_eaten)) { if (unlikely(stm->early_eaten)) {
/* Already some number of bytes on the receive sock /* Already some number of bytes on the receive sock
* data saved in rx_skb_head, just indicate they * data saved in skb_head, just indicate they
* are consumed. * are consumed.
*/ */
eaten = orig_len <= rxm->early_eaten ? eaten = orig_len <= stm->early_eaten ?
orig_len : rxm->early_eaten; orig_len : stm->early_eaten;
rxm->early_eaten -= eaten; stm->early_eaten -= eaten;
return eaten; return eaten;
} }
...@@ -126,12 +137,12 @@ static int strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb, ...@@ -126,12 +137,12 @@ static int strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb,
*/ */
orig_skb = skb_clone(orig_skb, GFP_ATOMIC); orig_skb = skb_clone(orig_skb, GFP_ATOMIC);
if (!orig_skb) { if (!orig_skb) {
STRP_STATS_INCR(strp->stats.rx_mem_fail); STRP_STATS_INCR(strp->stats.mem_fail);
desc->error = -ENOMEM; desc->error = -ENOMEM;
return 0; return 0;
} }
if (!pskb_pull(orig_skb, orig_offset)) { if (!pskb_pull(orig_skb, orig_offset)) {
STRP_STATS_INCR(strp->stats.rx_mem_fail); STRP_STATS_INCR(strp->stats.mem_fail);
kfree_skb(orig_skb); kfree_skb(orig_skb);
desc->error = -ENOMEM; desc->error = -ENOMEM;
return 0; return 0;
...@@ -140,13 +151,13 @@ static int strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb, ...@@ -140,13 +151,13 @@ static int strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb,
orig_offset = 0; orig_offset = 0;
} }
if (!strp->rx_skb_nextp) { if (!strp->skb_nextp) {
/* We are going to append to the frags_list of head. /* We are going to append to the frags_list of head.
* Need to unshare the frag_list. * Need to unshare the frag_list.
*/ */
err = skb_unclone(head, GFP_ATOMIC); err = skb_unclone(head, GFP_ATOMIC);
if (err) { if (err) {
STRP_STATS_INCR(strp->stats.rx_mem_fail); STRP_STATS_INCR(strp->stats.mem_fail);
desc->error = err; desc->error = err;
return 0; return 0;
} }
...@@ -165,20 +176,20 @@ static int strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb, ...@@ -165,20 +176,20 @@ static int strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb,
skb = alloc_skb(0, GFP_ATOMIC); skb = alloc_skb(0, GFP_ATOMIC);
if (!skb) { if (!skb) {
STRP_STATS_INCR(strp->stats.rx_mem_fail); STRP_STATS_INCR(strp->stats.mem_fail);
desc->error = -ENOMEM; desc->error = -ENOMEM;
return 0; return 0;
} }
skb->len = head->len; skb->len = head->len;
skb->data_len = head->len; skb->data_len = head->len;
skb->truesize = head->truesize; skb->truesize = head->truesize;
*_strp_rx_msg(skb) = *_strp_rx_msg(head); *_strp_msg(skb) = *_strp_msg(head);
strp->rx_skb_nextp = &head->next; strp->skb_nextp = &head->next;
skb_shinfo(skb)->frag_list = head; skb_shinfo(skb)->frag_list = head;
strp->rx_skb_head = skb; strp->skb_head = skb;
head = skb; head = skb;
} else { } else {
strp->rx_skb_nextp = strp->skb_nextp =
&skb_shinfo(head)->frag_list; &skb_shinfo(head)->frag_list;
} }
} }
...@@ -188,112 +199,112 @@ static int strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb, ...@@ -188,112 +199,112 @@ static int strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb,
/* Always clone since we will consume something */ /* Always clone since we will consume something */
skb = skb_clone(orig_skb, GFP_ATOMIC); skb = skb_clone(orig_skb, GFP_ATOMIC);
if (!skb) { if (!skb) {
STRP_STATS_INCR(strp->stats.rx_mem_fail); STRP_STATS_INCR(strp->stats.mem_fail);
desc->error = -ENOMEM; desc->error = -ENOMEM;
break; break;
} }
cand_len = orig_len - eaten; cand_len = orig_len - eaten;
head = strp->rx_skb_head; head = strp->skb_head;
if (!head) { if (!head) {
head = skb; head = skb;
strp->rx_skb_head = head; strp->skb_head = head;
/* Will set rx_skb_nextp on next packet if needed */ /* Will set skb_nextp on next packet if needed */
strp->rx_skb_nextp = NULL; strp->skb_nextp = NULL;
rxm = _strp_rx_msg(head); stm = _strp_msg(head);
memset(rxm, 0, sizeof(*rxm)); memset(stm, 0, sizeof(*stm));
rxm->strp.offset = orig_offset + eaten; stm->strp.offset = orig_offset + eaten;
} else { } else {
/* Unclone since we may be appending to an skb that we /* Unclone since we may be appending to an skb that we
* already share a frag_list with. * already share a frag_list with.
*/ */
err = skb_unclone(skb, GFP_ATOMIC); err = skb_unclone(skb, GFP_ATOMIC);
if (err) { if (err) {
STRP_STATS_INCR(strp->stats.rx_mem_fail); STRP_STATS_INCR(strp->stats.mem_fail);
desc->error = err; desc->error = err;
break; break;
} }
rxm = _strp_rx_msg(head); stm = _strp_msg(head);
*strp->rx_skb_nextp = skb; *strp->skb_nextp = skb;
strp->rx_skb_nextp = &skb->next; strp->skb_nextp = &skb->next;
head->data_len += skb->len; head->data_len += skb->len;
head->len += skb->len; head->len += skb->len;
head->truesize += skb->truesize; head->truesize += skb->truesize;
} }
if (!rxm->strp.full_len) { if (!stm->strp.full_len) {
ssize_t len; ssize_t len;
len = (*strp->cb.parse_msg)(strp, head); len = (*strp->cb.parse_msg)(strp, head);
if (!len) { if (!len) {
/* Need more header to determine length */ /* Need more header to determine length */
if (!rxm->accum_len) { if (!stm->accum_len) {
/* Start RX timer for new message */ /* Start RX timer for new message */
strp_start_rx_timer(strp); strp_start_timer(strp, timeo);
} }
rxm->accum_len += cand_len; stm->accum_len += cand_len;
eaten += cand_len; eaten += cand_len;
STRP_STATS_INCR(strp->stats.rx_need_more_hdr); STRP_STATS_INCR(strp->stats.need_more_hdr);
WARN_ON(eaten != orig_len); WARN_ON(eaten != orig_len);
break; break;
} else if (len < 0) { } else if (len < 0) {
if (len == -ESTRPIPE && rxm->accum_len) { if (len == -ESTRPIPE && stm->accum_len) {
len = -ENODATA; len = -ENODATA;
strp->rx_unrecov_intr = 1; strp->unrecov_intr = 1;
} else { } else {
strp->rx_interrupted = 1; strp->interrupted = 1;
} }
strp_parser_err(strp, len, desc); strp_parser_err(strp, len, desc);
break; break;
} else if (len > strp->sk->sk_rcvbuf) { } else if (len > max_msg_size) {
/* Message length exceeds maximum allowed */ /* Message length exceeds maximum allowed */
STRP_STATS_INCR(strp->stats.rx_msg_too_big); STRP_STATS_INCR(strp->stats.msg_too_big);
strp_parser_err(strp, -EMSGSIZE, desc); strp_parser_err(strp, -EMSGSIZE, desc);
break; break;
} else if (len <= (ssize_t)head->len - } else if (len <= (ssize_t)head->len -
skb->len - rxm->strp.offset) { skb->len - stm->strp.offset) {
/* Length must be into new skb (and also /* Length must be into new skb (and also
* greater than zero) * greater than zero)
*/ */
STRP_STATS_INCR(strp->stats.rx_bad_hdr_len); STRP_STATS_INCR(strp->stats.bad_hdr_len);
strp_parser_err(strp, -EPROTO, desc); strp_parser_err(strp, -EPROTO, desc);
break; break;
} }
rxm->strp.full_len = len; stm->strp.full_len = len;
} }
extra = (ssize_t)(rxm->accum_len + cand_len) - extra = (ssize_t)(stm->accum_len + cand_len) -
rxm->strp.full_len; stm->strp.full_len;
if (extra < 0) { if (extra < 0) {
/* Message not complete yet. */ /* Message not complete yet. */
if (rxm->strp.full_len - rxm->accum_len > if (stm->strp.full_len - stm->accum_len >
strp_peek_len(strp)) { strp_peek_len(strp)) {
/* Don't have the whole messages in the socket /* Don't have the whole message in the socket
* buffer. Set strp->rx_need_bytes to wait for * buffer. Set strp->need_bytes to wait for
* the rest of the message. Also, set "early * the rest of the message. Also, set "early
* eaten" since we've already buffered the skb * eaten" since we've already buffered the skb
* but don't consume yet per strp_read_sock. * but don't consume yet per strp_read_sock.
*/ */
if (!rxm->accum_len) { if (!stm->accum_len) {
/* Start RX timer for new message */ /* Start RX timer for new message */
strp_start_rx_timer(strp); strp_start_timer(strp, timeo);
} }
strp->rx_need_bytes = rxm->strp.full_len - strp->need_bytes = stm->strp.full_len -
rxm->accum_len; stm->accum_len;
rxm->accum_len += cand_len; stm->accum_len += cand_len;
rxm->early_eaten = cand_len; stm->early_eaten = cand_len;
STRP_STATS_ADD(strp->stats.rx_bytes, cand_len); STRP_STATS_ADD(strp->stats.bytes, cand_len);
desc->count = 0; /* Stop reading socket */ desc->count = 0; /* Stop reading socket */
break; break;
} }
rxm->accum_len += cand_len; stm->accum_len += cand_len;
eaten += cand_len; eaten += cand_len;
WARN_ON(eaten != orig_len); WARN_ON(eaten != orig_len);
break; break;
...@@ -308,14 +319,14 @@ static int strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb, ...@@ -308,14 +319,14 @@ static int strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb,
eaten += (cand_len - extra); eaten += (cand_len - extra);
/* Hurray, we have a new message! */ /* Hurray, we have a new message! */
del_timer(&strp->rx_msg_timer); del_timer(&strp->msg_timer);
strp->rx_skb_head = NULL; strp->skb_head = NULL;
STRP_STATS_INCR(strp->stats.rx_msgs); STRP_STATS_INCR(strp->stats.msgs);
/* Give skb to upper layer */ /* Give skb to upper layer */
strp->cb.rcv_msg(strp, head); strp->cb.rcv_msg(strp, head);
if (unlikely(strp->rx_paused)) { if (unlikely(strp->paused)) {
/* Upper layer paused strp */ /* Upper layer paused strp */
break; break;
} }
...@@ -324,11 +335,33 @@ static int strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb, ...@@ -324,11 +335,33 @@ static int strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb,
if (cloned_orig) if (cloned_orig)
kfree_skb(orig_skb); kfree_skb(orig_skb);
STRP_STATS_ADD(strp->stats.rx_bytes, eaten); STRP_STATS_ADD(strp->stats.bytes, eaten);
return eaten; return eaten;
} }
int strp_process(struct strparser *strp, struct sk_buff *orig_skb,
unsigned int orig_offset, size_t orig_len,
size_t max_msg_size, long timeo)
{
read_descriptor_t desc; /* Dummy arg to strp_recv */
desc.arg.data = strp;
return __strp_recv(&desc, orig_skb, orig_offset, orig_len,
max_msg_size, timeo);
}
EXPORT_SYMBOL_GPL(strp_process);
static int strp_recv(read_descriptor_t *desc, struct sk_buff *orig_skb,
unsigned int orig_offset, size_t orig_len)
{
struct strparser *strp = (struct strparser *)desc->arg.data;
return __strp_recv(desc, orig_skb, orig_offset, orig_len,
strp->sk->sk_rcvbuf, strp->sk->sk_rcvtimeo);
}
static int default_read_sock_done(struct strparser *strp, int err) static int default_read_sock_done(struct strparser *strp, int err)
{ {
return err; return err;
...@@ -355,101 +388,129 @@ static int strp_read_sock(struct strparser *strp) ...@@ -355,101 +388,129 @@ static int strp_read_sock(struct strparser *strp)
/* Lower sock lock held */ /* Lower sock lock held */
void strp_data_ready(struct strparser *strp) void strp_data_ready(struct strparser *strp)
{ {
if (unlikely(strp->rx_stopped)) if (unlikely(strp->stopped))
return; return;
/* This check is needed to synchronize with do_strp_rx_work. /* This check is needed to synchronize with do_strp_work.
* do_strp_rx_work acquires a process lock (lock_sock) whereas * do_strp_work acquires a process lock (lock_sock) whereas
* the lock held here is bh_lock_sock. The two locks can be * the lock held here is bh_lock_sock. The two locks can be
* held by different threads at the same time, but bh_lock_sock * held by different threads at the same time, but bh_lock_sock
* allows a thread in BH context to safely check if the process * allows a thread in BH context to safely check if the process
* lock is held. In this case, if the lock is held, queue work. * lock is held. In this case, if the lock is held, queue work.
*/ */
if (sock_owned_by_user(strp->sk)) { if (sock_owned_by_user(strp->sk)) {
queue_work(strp_wq, &strp->rx_work); queue_work(strp_wq, &strp->work);
return; return;
} }
if (strp->rx_paused) if (strp->paused)
return; return;
if (strp->rx_need_bytes) { if (strp->need_bytes) {
if (strp_peek_len(strp) >= strp->rx_need_bytes) if (strp_peek_len(strp) >= strp->need_bytes)
strp->rx_need_bytes = 0; strp->need_bytes = 0;
else else
return; return;
} }
if (strp_read_sock(strp) == -ENOMEM) if (strp_read_sock(strp) == -ENOMEM)
queue_work(strp_wq, &strp->rx_work); queue_work(strp_wq, &strp->work);
} }
EXPORT_SYMBOL_GPL(strp_data_ready); EXPORT_SYMBOL_GPL(strp_data_ready);
static void do_strp_rx_work(struct strparser *strp) static void do_strp_work(struct strparser *strp)
{ {
read_descriptor_t rd_desc; read_descriptor_t rd_desc;
struct sock *csk = strp->sk;
/* We need the read lock to synchronize with strp_data_ready. We /* We need the read lock to synchronize with strp_data_ready. We
* need the socket lock for calling strp_read_sock. * need the socket lock for calling strp_read_sock.
*/ */
lock_sock(csk); strp->cb.lock(strp);
if (unlikely(strp->rx_stopped)) if (unlikely(strp->stopped))
goto out; goto out;
if (strp->rx_paused) if (strp->paused)
goto out; goto out;
rd_desc.arg.data = strp; rd_desc.arg.data = strp;
if (strp_read_sock(strp) == -ENOMEM) if (strp_read_sock(strp) == -ENOMEM)
queue_work(strp_wq, &strp->rx_work); queue_work(strp_wq, &strp->work);
out: out:
release_sock(csk); strp->cb.unlock(strp);
} }
static void strp_rx_work(struct work_struct *w) static void strp_work(struct work_struct *w)
{ {
do_strp_rx_work(container_of(w, struct strparser, rx_work)); do_strp_work(container_of(w, struct strparser, work));
} }
static void strp_rx_msg_timeout(unsigned long arg) static void strp_msg_timeout(unsigned long arg)
{ {
struct strparser *strp = (struct strparser *)arg; struct strparser *strp = (struct strparser *)arg;
/* Message assembly timed out */ /* Message assembly timed out */
STRP_STATS_INCR(strp->stats.rx_msg_timeouts); STRP_STATS_INCR(strp->stats.msg_timeouts);
lock_sock(strp->sk); strp->cb.lock(strp);
strp->cb.abort_parser(strp, ETIMEDOUT); strp->cb.abort_parser(strp, ETIMEDOUT);
strp->cb.unlock(strp);
}
static void strp_sock_lock(struct strparser *strp)
{
lock_sock(strp->sk);
}
static void strp_sock_unlock(struct strparser *strp)
{
release_sock(strp->sk); release_sock(strp->sk);
} }
int strp_init(struct strparser *strp, struct sock *csk, int strp_init(struct strparser *strp, struct sock *sk,
struct strp_callbacks *cb) struct strp_callbacks *cb)
{ {
struct socket *sock = csk->sk_socket;
if (!cb || !cb->rcv_msg || !cb->parse_msg) if (!cb || !cb->rcv_msg || !cb->parse_msg)
return -EINVAL; return -EINVAL;
/* The sk (sock) arg determines the mode of the stream parser.
*
* If the sock is set then the strparser is in receive callback mode.
* The upper layer calls strp_data_ready to kick receive processing
* and strparser calls the read_sock function on the socket to
* get packets.
*
* If the sock is not set then the strparser is in general mode.
* The upper layer calls strp_process for each skb to be parsed.
*/
if (sk) {
struct socket *sock = sk->sk_socket;
if (!sock->ops->read_sock || !sock->ops->peek_len) if (!sock->ops->read_sock || !sock->ops->peek_len)
return -EAFNOSUPPORT; return -EAFNOSUPPORT;
} else {
if (!cb->lock || !cb->unlock)
return -EINVAL;
}
memset(strp, 0, sizeof(*strp)); memset(strp, 0, sizeof(*strp));
strp->sk = csk; strp->sk = sk;
setup_timer(&strp->rx_msg_timer, strp_rx_msg_timeout,
(unsigned long)strp);
INIT_WORK(&strp->rx_work, strp_rx_work);
strp->cb.lock = cb->lock ? : strp_sock_lock;
strp->cb.unlock = cb->unlock ? : strp_sock_unlock;
strp->cb.rcv_msg = cb->rcv_msg; strp->cb.rcv_msg = cb->rcv_msg;
strp->cb.parse_msg = cb->parse_msg; strp->cb.parse_msg = cb->parse_msg;
strp->cb.read_sock_done = cb->read_sock_done ? : default_read_sock_done; strp->cb.read_sock_done = cb->read_sock_done ? : default_read_sock_done;
strp->cb.abort_parser = cb->abort_parser ? : strp_abort_rx_strp; strp->cb.abort_parser = cb->abort_parser ? : strp_abort_strp;
setup_timer(&strp->msg_timer, strp_msg_timeout,
(unsigned long)strp);
INIT_WORK(&strp->work, strp_work);
return 0; return 0;
} }
...@@ -457,12 +518,12 @@ EXPORT_SYMBOL_GPL(strp_init); ...@@ -457,12 +518,12 @@ EXPORT_SYMBOL_GPL(strp_init);
void strp_unpause(struct strparser *strp) void strp_unpause(struct strparser *strp)
{ {
strp->rx_paused = 0; strp->paused = 0;
/* Sync setting rx_paused with RX work */ /* Sync setting paused with RX work */
smp_mb(); smp_mb();
queue_work(strp_wq, &strp->rx_work); queue_work(strp_wq, &strp->work);
} }
EXPORT_SYMBOL_GPL(strp_unpause); EXPORT_SYMBOL_GPL(strp_unpause);
...@@ -471,27 +532,27 @@ EXPORT_SYMBOL_GPL(strp_unpause); ...@@ -471,27 +532,27 @@ EXPORT_SYMBOL_GPL(strp_unpause);
*/ */
void strp_done(struct strparser *strp) void strp_done(struct strparser *strp)
{ {
WARN_ON(!strp->rx_stopped); WARN_ON(!strp->stopped);
del_timer_sync(&strp->rx_msg_timer); del_timer_sync(&strp->msg_timer);
cancel_work_sync(&strp->rx_work); cancel_work_sync(&strp->work);
if (strp->rx_skb_head) { if (strp->skb_head) {
kfree_skb(strp->rx_skb_head); kfree_skb(strp->skb_head);
strp->rx_skb_head = NULL; strp->skb_head = NULL;
} }
} }
EXPORT_SYMBOL_GPL(strp_done); EXPORT_SYMBOL_GPL(strp_done);
void strp_stop(struct strparser *strp) void strp_stop(struct strparser *strp)
{ {
strp->rx_stopped = 1; strp->stopped = 1;
} }
EXPORT_SYMBOL_GPL(strp_stop); EXPORT_SYMBOL_GPL(strp_stop);
void strp_check_rcv(struct strparser *strp) void strp_check_rcv(struct strparser *strp)
{ {
queue_work(strp_wq, &strp->rx_work); queue_work(strp_wq, &strp->work);
} }
EXPORT_SYMBOL_GPL(strp_check_rcv); EXPORT_SYMBOL_GPL(strp_check_rcv);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment