bool force, bool terminal)
{
struct rxrpc_skb_priv *sp;
+ struct rxrpc_sock *rx = call->socket;
struct sock *sk;
int skb_len, ret;
return 0;
}
- sk = &call->socket->sk;
+ sk = &rx->sk;
if (!force) {
/* cast skb->rcvbuf to unsigned... It's pointless, but
skb->sk = sk;
atomic_add(skb->truesize, &sk->sk_rmem_alloc);
- /* Cache the SKB length before we tack it onto the receive
- * queue. Once it is added it no longer belongs to us and
- * may be freed by other threads of control pulling packets
- * from the queue.
- */
- skb_len = skb->len;
-
- _net("post skb %p", skb);
- __skb_queue_tail(&sk->sk_receive_queue, skb);
- spin_unlock_bh(&sk->sk_receive_queue.lock);
-
- if (!sock_flag(sk, SOCK_DEAD))
- sk->sk_data_ready(sk, skb_len);
-
if (terminal) {
_debug("<<<< TERMINAL MESSAGE >>>>");
set_bit(RXRPC_CALL_TERMINAL_MSG, &call->flags);
}
+ /* allow interception by a kernel service */
+ if (rx->interceptor) {
+ rx->interceptor(sk, call->user_call_ID, skb);
+ spin_unlock_bh(&sk->sk_receive_queue.lock);
+ } else {
+
+ /* Cache the SKB length before we tack it onto the
+ * receive queue. Once it is added it no longer
+ * belongs to us and may be freed by other threads of
+ * control pulling packets from the queue */
+ skb_len = skb->len;
+
+ _net("post skb %p", skb);
+ __skb_queue_tail(&sk->sk_receive_queue, skb);
+ spin_unlock_bh(&sk->sk_receive_queue.lock);
+
+ if (!sock_flag(sk, SOCK_DEAD))
+ sk->sk_data_ready(sk, skb_len);
+ }
skb = NULL;
} else {
spin_unlock_bh(&sk->sk_receive_queue.lock);
/* we may already have the packet in the out of sequence queue */
ackbit = seq - (call->rx_data_eaten + 1);
ASSERTCMP(ackbit, >=, 0);
- if (__test_and_set_bit(ackbit, &call->ackr_window)) {
+ if (__test_and_set_bit(ackbit, call->ackr_window)) {
_debug("dup oos #%u [%u,%u]",
seq, call->rx_data_eaten, call->rx_data_post);
ack = RXRPC_ACK_DUPLICATE;
if (seq >= call->ackr_win_top) {
_debug("exceed #%u [%u]", seq, call->ackr_win_top);
- __clear_bit(ackbit, &call->ackr_window);
+ __clear_bit(ackbit, call->ackr_window);
ack = RXRPC_ACK_EXCEEDS_WINDOW;
goto discard_and_ack;
}
ret = rxrpc_queue_rcv_skb(call, skb, false, terminal);
if (ret < 0) {
if (ret == -ENOMEM || ret == -ENOBUFS) {
- __clear_bit(ackbit, &call->ackr_window);
+ __clear_bit(ackbit, call->ackr_window);
ack = RXRPC_ACK_NOSPACE;
goto discard_and_ack;
}
read_lock(&call->state_lock);
if (call->state < RXRPC_CALL_COMPLETE &&
!test_and_set_bit(RXRPC_CALL_DRAIN_RX_OOS, &call->events))
- schedule_work(&call->processor);
+ rxrpc_queue_call(call);
read_unlock(&call->state_lock);
}
atomic_inc(&call->ackr_not_idle);
read_lock(&call->state_lock);
if (call->state < RXRPC_CALL_DEAD)
- schedule_work(&call->processor);
+ rxrpc_queue_call(call);
read_unlock(&call->state_lock);
_leave(" = 0 [queued]");
return 0;
call->state = RXRPC_CALL_REMOTELY_ABORTED;
call->abort_code = abort_code;
set_bit(RXRPC_CALL_RCVD_ABORT, &call->events);
- schedule_work(&call->processor);
+ rxrpc_queue_call(call);
}
goto free_packet_unlock;
case RXRPC_CALL_CLIENT_SEND_REQUEST:
call->state = RXRPC_CALL_SERVER_BUSY;
set_bit(RXRPC_CALL_RCVD_BUSY, &call->events);
- schedule_work(&call->processor);
+ rxrpc_queue_call(call);
case RXRPC_CALL_SERVER_BUSY:
goto free_packet_unlock;
default:
read_lock_bh(&call->state_lock);
if (call->state < RXRPC_CALL_DEAD) {
skb_queue_tail(&call->rx_queue, skb);
- schedule_work(&call->processor);
+ rxrpc_queue_call(call);
skb = NULL;
}
read_unlock_bh(&call->state_lock);
call->state = RXRPC_CALL_LOCALLY_ABORTED;
call->abort_code = RX_PROTOCOL_ERROR;
set_bit(RXRPC_CALL_ABORT, &call->events);
- schedule_work(&call->processor);
+ rxrpc_queue_call(call);
}
free_packet_unlock:
write_unlock_bh(&call->state_lock);
call->state = RXRPC_CALL_LOCALLY_ABORTED;
call->abort_code = RX_PROTOCOL_ERROR;
set_bit(RXRPC_CALL_ABORT, &call->events);
- schedule_work(&call->processor);
+ rxrpc_queue_call(call);
}
write_unlock_bh(&call->state_lock);
_leave("");
switch (call->state) {
case RXRPC_CALL_LOCALLY_ABORTED:
if (!test_and_set_bit(RXRPC_CALL_ABORT, &call->events))
- schedule_work(&call->processor);
+ rxrpc_queue_call(call);
case RXRPC_CALL_REMOTELY_ABORTED:
case RXRPC_CALL_NETWORK_ERROR:
case RXRPC_CALL_DEAD:
sp->hdr.seq == __constant_cpu_to_be32(1)) {
_debug("incoming call");
skb_queue_tail(&conn->trans->local->accept_queue, skb);
- schedule_work(&conn->trans->local->acceptor);
+ rxrpc_queue_work(&conn->trans->local->acceptor);
goto done;
}
_debug("final ack again");
rxrpc_get_call(call);
set_bit(RXRPC_CALL_ACK_FINAL, &call->events);
- schedule_work(&call->processor);
+ rxrpc_queue_call(call);
free_unlock:
read_unlock(&call->state_lock);
atomic_inc(&conn->usage);
skb_queue_tail(&conn->rx_queue, skb);
- schedule_work(&conn->processor);
+ rxrpc_queue_conn(conn);
}
/*
if (sp->hdr.seq == __constant_cpu_to_be32(1)) {
_debug("first packet");
skb_queue_tail(&local->accept_queue, skb);
- schedule_work(&local->acceptor);
+ rxrpc_queue_work(&local->acceptor);
rxrpc_put_local(local);
_leave(" [incoming]");
return;