]> err.no Git - linux-2.6/blob - net/tipc/port.c
[TIPC] Initial merge
[linux-2.6] / net / tipc / port.c
1 /*
2  * net/tipc/port.c: TIPC port code
3  * 
4  * Copyright (c) 2003-2005, Ericsson Research Canada
5  * Copyright (c) 2004-2005, Wind River Systems
6  * Copyright (c) 2005-2006, Ericsson AB
7  * All rights reserved.
8  *
9  * Redistribution and use in source and binary forms, with or without 
10  * modification, are permitted provided that the following conditions are met:
11  *
12  * Redistributions of source code must retain the above copyright notice, this 
13  * list of conditions and the following disclaimer.
14  * Redistributions in binary form must reproduce the above copyright notice, 
15  * this list of conditions and the following disclaimer in the documentation 
16  * and/or other materials provided with the distribution.
17  * Neither the names of the copyright holders nor the names of its 
18  * contributors may be used to endorse or promote products derived from this 
19  * software without specific prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 
22  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
23  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
24  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
25  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
26  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
27  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
28  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
29  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 
30  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
31  * POSSIBILITY OF SUCH DAMAGE.
32  */
33
34 #include "core.h"
35 #include "config.h"
36 #include "dbg.h"
37 #include "port.h"
38 #include "addr.h"
39 #include "link.h"
40 #include "node.h"
41 #include "port.h"
42 #include "name_table.h"
43 #include "user_reg.h"
44 #include "msg.h"
45 #include "bcast.h"
46
47 /* Connection management: */
48 #define PROBING_INTERVAL 3600000        /* [ms] => 1 h */
49 #define CONFIRMED 0
50 #define PROBING 1
51
52 #define MAX_REJECT_SIZE 1024
53
54 static struct sk_buff *msg_queue_head = 0;
55 static struct sk_buff *msg_queue_tail = 0;
56
57 spinlock_t port_list_lock = SPIN_LOCK_UNLOCKED;
58 static spinlock_t queue_lock = SPIN_LOCK_UNLOCKED;
59
60 LIST_HEAD(ports);
61 static void port_handle_node_down(unsigned long ref);
62 static struct sk_buff* port_build_self_abort_msg(struct port *,u32 err);
63 static struct sk_buff* port_build_peer_abort_msg(struct port *,u32 err);
64 static void port_timeout(unsigned long ref);
65
66
67 static inline u32 port_peernode(struct port *p_ptr)
68 {
69         return msg_destnode(&p_ptr->publ.phdr);
70 }
71
72 static inline u32 port_peerport(struct port *p_ptr)
73 {
74         return msg_destport(&p_ptr->publ.phdr);
75 }
76
77 static inline u32 port_out_seqno(struct port *p_ptr)
78 {
79         return msg_transp_seqno(&p_ptr->publ.phdr);
80 }
81
82 static inline void port_set_out_seqno(struct port *p_ptr, u32 seqno) 
83 {
84         msg_set_transp_seqno(&p_ptr->publ.phdr,seqno);
85 }
86
87 static inline void port_incr_out_seqno(struct port *p_ptr)
88 {
89         struct tipc_msg *m = &p_ptr->publ.phdr;
90
91         if (likely(!msg_routed(m)))
92                 return;
93         msg_set_transp_seqno(m, (msg_transp_seqno(m) + 1));
94 }
95
96 /**
97  * tipc_multicast - send a multicast message to local and remote destinations
98  */
99
100 int tipc_multicast(u32 ref, struct tipc_name_seq const *seq, u32 domain,
101                    u32 num_sect, struct iovec const *msg_sect)
102 {
103         struct tipc_msg *hdr;
104         struct sk_buff *buf;
105         struct sk_buff *ibuf = NULL;
106         struct port_list dports = {0, NULL, };
107         struct port *oport = port_deref(ref);
108         int ext_targets;
109         int res;
110
111         if (unlikely(!oport))
112                 return -EINVAL;
113
114         /* Create multicast message */
115
116         hdr = &oport->publ.phdr;
117         msg_set_type(hdr, TIPC_MCAST_MSG);
118         msg_set_nametype(hdr, seq->type);
119         msg_set_namelower(hdr, seq->lower);
120         msg_set_nameupper(hdr, seq->upper);
121         msg_set_hdr_sz(hdr, MCAST_H_SIZE);
122         res = msg_build(hdr, msg_sect, num_sect, MAX_MSG_SIZE,
123                         !oport->user_port, &buf);
124         if (unlikely(!buf))
125                 return res;
126
127         /* Figure out where to send multicast message */
128
129         ext_targets = nametbl_mc_translate(seq->type, seq->lower, seq->upper,
130                                            TIPC_NODE_SCOPE, &dports);
131         
132         /* Send message to destinations (duplicate it only if necessary) */ 
133
134         if (ext_targets) {
135                 if (dports.count != 0) {
136                         ibuf = skb_copy(buf, GFP_ATOMIC);
137                         if (ibuf == NULL) {
138                                 port_list_free(&dports);
139                                 buf_discard(buf);
140                                 return -ENOMEM;
141                         }
142                 }
143                 res = bclink_send_msg(buf);
144                 if ((res < 0) && (dports.count != 0)) {
145                         buf_discard(ibuf);
146                 }
147         } else {
148                 ibuf = buf;
149         }
150
151         if (res >= 0) {
152                 if (ibuf)
153                         port_recv_mcast(ibuf, &dports);
154         } else {
155                 port_list_free(&dports);
156         }
157         return res;
158 }
159
160 /**
161  * port_recv_mcast - deliver multicast message to all destination ports
162  * 
163  * If there is no port list, perform a lookup to create one
164  */
165
166 void port_recv_mcast(struct sk_buff *buf, struct port_list *dp)
167 {
168         struct tipc_msg* msg;
169         struct port_list dports = {0, NULL, };
170         struct port_list *item = dp;
171         int cnt = 0;
172
173         assert(buf);
174         msg = buf_msg(buf);
175
176         /* Create destination port list, if one wasn't supplied */
177
178         if (dp == NULL) {
179                 nametbl_mc_translate(msg_nametype(msg),
180                                      msg_namelower(msg),
181                                      msg_nameupper(msg),
182                                      TIPC_CLUSTER_SCOPE,
183                                      &dports);
184                 item = dp = &dports;
185         }
186
187         /* Deliver a copy of message to each destination port */
188
189         if (dp->count != 0) {
190                 if (dp->count == 1) {
191                         msg_set_destport(msg, dp->ports[0]);
192                         port_recv_msg(buf);
193                         port_list_free(dp);
194                         return;
195                 }
196                 for (; cnt < dp->count; cnt++) {
197                         int index = cnt % PLSIZE;
198                         struct sk_buff *b = skb_clone(buf, GFP_ATOMIC);
199
200                         if (b == NULL) {
201                                 warn("Buffer allocation failure\n");
202                                 msg_dbg(msg, "LOST:");
203                                 goto exit;
204                         }
205                         if ((index == 0) && (cnt != 0)) {
206                                 item = item->next;
207                         }
208                         msg_set_destport(buf_msg(b),item->ports[index]);
209                         port_recv_msg(b);
210                 }
211         }
212 exit:
213         buf_discard(buf);
214         port_list_free(dp);
215 }
216
217 /**
218  * tipc_createport_raw - create a native TIPC port
219  * 
220  * Returns local port reference
221  */
222
223 u32 tipc_createport_raw(void *usr_handle,
224                         u32 (*dispatcher)(struct tipc_port *, struct sk_buff *),
225                         void (*wakeup)(struct tipc_port *),
226                         const u32 importance)
227 {
228         struct port *p_ptr;
229         struct tipc_msg *msg;
230         u32 ref;
231
232         p_ptr = kmalloc(sizeof(*p_ptr), GFP_ATOMIC);
233         if (p_ptr == NULL) {
234                 warn("Memory squeeze; failed to create port\n");
235                 return 0;
236         }
237         memset(p_ptr, 0, sizeof(*p_ptr));
238         ref = ref_acquire(p_ptr, &p_ptr->publ.lock);
239         if (!ref) {
240                 warn("Reference Table Exhausted\n");
241                 kfree(p_ptr);
242                 return 0;
243         }
244
245         port_lock(ref);
246         p_ptr->publ.ref = ref;
247         msg = &p_ptr->publ.phdr;
248         msg_init(msg, DATA_LOW, TIPC_NAMED_MSG, TIPC_OK, LONG_H_SIZE, 0);
249         msg_set_orignode(msg, tipc_own_addr);
250         msg_set_prevnode(msg, tipc_own_addr);
251         msg_set_origport(msg, ref);
252         msg_set_importance(msg,importance);
253         p_ptr->last_in_seqno = 41;
254         p_ptr->sent = 1;
255         p_ptr->publ.usr_handle = usr_handle;
256         INIT_LIST_HEAD(&p_ptr->wait_list);
257         INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list);
258         p_ptr->congested_link = 0;
259         p_ptr->max_pkt = MAX_PKT_DEFAULT;
260         p_ptr->dispatcher = dispatcher;
261         p_ptr->wakeup = wakeup;
262         p_ptr->user_port = 0;
263         k_init_timer(&p_ptr->timer, (Handler)port_timeout, ref);
264         spin_lock_bh(&port_list_lock);
265         INIT_LIST_HEAD(&p_ptr->publications);
266         INIT_LIST_HEAD(&p_ptr->port_list);
267         list_add_tail(&p_ptr->port_list, &ports);
268         spin_unlock_bh(&port_list_lock);
269         port_unlock(p_ptr);
270         return ref;
271 }
272
273 int tipc_deleteport(u32 ref)
274 {
275         struct port *p_ptr;
276         struct sk_buff *buf = 0;
277
278         tipc_withdraw(ref, 0, 0);
279         p_ptr = port_lock(ref);
280         if (!p_ptr) 
281                 return -EINVAL;
282
283         ref_discard(ref);
284         port_unlock(p_ptr);
285
286         k_cancel_timer(&p_ptr->timer);
287         if (p_ptr->publ.connected) {
288                 buf = port_build_peer_abort_msg(p_ptr, TIPC_ERR_NO_PORT);
289                 nodesub_unsubscribe(&p_ptr->subscription);
290         }
291         if (p_ptr->user_port) {
292                 reg_remove_port(p_ptr->user_port);
293                 kfree(p_ptr->user_port);
294         }
295
296         spin_lock_bh(&port_list_lock);
297         list_del(&p_ptr->port_list);
298         list_del(&p_ptr->wait_list);
299         spin_unlock_bh(&port_list_lock);
300         k_term_timer(&p_ptr->timer);
301         kfree(p_ptr);
302         dbg("Deleted port %u\n", ref);
303         net_route_msg(buf);
304         return TIPC_OK;
305 }
306
307 /**
308  * tipc_get_port() - return port associated with 'ref'
309  * 
310  * Note: Port is not locked.
311  */
312
313 struct tipc_port *tipc_get_port(const u32 ref)
314 {
315         return (struct tipc_port *)ref_deref(ref);
316 }
317
318 /**
319  * tipc_get_handle - return user handle associated to port 'ref'
320  */
321
322 void *tipc_get_handle(const u32 ref)
323 {
324         struct port *p_ptr;
325         void * handle;
326
327         p_ptr = port_lock(ref);
328         if (!p_ptr)
329                 return 0;
330         handle = p_ptr->publ.usr_handle;
331         port_unlock(p_ptr);
332         return handle;
333 }
334
335 static inline int port_unreliable(struct port *p_ptr)
336 {
337         return msg_src_droppable(&p_ptr->publ.phdr);
338 }
339
340 int tipc_portunreliable(u32 ref, unsigned int *isunreliable)
341 {
342         struct port *p_ptr;
343         
344         p_ptr = port_lock(ref);
345         if (!p_ptr)
346                 return -EINVAL;
347         *isunreliable = port_unreliable(p_ptr);
348         spin_unlock_bh(p_ptr->publ.lock);
349         return TIPC_OK;
350 }
351
352 int tipc_set_portunreliable(u32 ref, unsigned int isunreliable)
353 {
354         struct port *p_ptr;
355         
356         p_ptr = port_lock(ref);
357         if (!p_ptr)
358                 return -EINVAL;
359         msg_set_src_droppable(&p_ptr->publ.phdr, (isunreliable != 0));
360         port_unlock(p_ptr);
361         return TIPC_OK;
362 }
363
364 static inline int port_unreturnable(struct port *p_ptr)
365 {
366         return msg_dest_droppable(&p_ptr->publ.phdr);
367 }
368
369 int tipc_portunreturnable(u32 ref, unsigned int *isunrejectable)
370 {
371         struct port *p_ptr;
372         
373         p_ptr = port_lock(ref);
374         if (!p_ptr)
375                 return -EINVAL;
376         *isunrejectable = port_unreturnable(p_ptr);
377         spin_unlock_bh(p_ptr->publ.lock);
378         return TIPC_OK;
379 }
380
381 int tipc_set_portunreturnable(u32 ref, unsigned int isunrejectable)
382 {
383         struct port *p_ptr;
384         
385         p_ptr = port_lock(ref);
386         if (!p_ptr)
387                 return -EINVAL;
388         msg_set_dest_droppable(&p_ptr->publ.phdr, (isunrejectable != 0));
389         port_unlock(p_ptr);
390         return TIPC_OK;
391 }
392
393 /* 
394  * port_build_proto_msg(): build a port level protocol 
395  * or a connection abortion message. Called with 
396  * tipc_port lock on.
397  */
398 static struct sk_buff *port_build_proto_msg(u32 destport, u32 destnode,
399                                             u32 origport, u32 orignode,
400                                             u32 usr, u32 type, u32 err, 
401                                             u32 seqno, u32 ack)
402 {
403         struct sk_buff *buf;
404         struct tipc_msg *msg;
405         
406         buf = buf_acquire(LONG_H_SIZE);
407         if (buf) {
408                 msg = buf_msg(buf);
409                 msg_init(msg, usr, type, err, LONG_H_SIZE, destnode);
410                 msg_set_destport(msg, destport);
411                 msg_set_origport(msg, origport);
412                 msg_set_destnode(msg, destnode);
413                 msg_set_orignode(msg, orignode);
414                 msg_set_transp_seqno(msg, seqno);
415                 msg_set_msgcnt(msg, ack);
416                 msg_dbg(msg, "PORT>SEND>:");
417         }
418         return buf;
419 }
420
421 int tipc_set_msg_option(struct tipc_port *tp_ptr, const char *opt, const u32 sz)
422 {
423         msg_expand(&tp_ptr->phdr, msg_destnode(&tp_ptr->phdr));
424         msg_set_options(&tp_ptr->phdr, opt, sz);
425         return TIPC_OK;
426 }
427
428 int tipc_reject_msg(struct sk_buff *buf, u32 err)
429 {
430         struct tipc_msg *msg = buf_msg(buf);
431         struct sk_buff *rbuf;
432         struct tipc_msg *rmsg;
433         int hdr_sz;
434         u32 imp = msg_importance(msg);
435         u32 data_sz = msg_data_sz(msg);
436
437         if (data_sz > MAX_REJECT_SIZE)
438                 data_sz = MAX_REJECT_SIZE;
439         if (msg_connected(msg) && (imp < TIPC_CRITICAL_IMPORTANCE))
440                 imp++;
441         msg_dbg(msg, "port->rej: ");
442
443         /* discard rejected message if it shouldn't be returned to sender */
444         if (msg_errcode(msg) || msg_dest_droppable(msg)) {
445                 buf_discard(buf);
446                 return data_sz;
447         }
448
449         /* construct rejected message */
450         if (msg_mcast(msg))
451                 hdr_sz = MCAST_H_SIZE;
452         else
453                 hdr_sz = LONG_H_SIZE;
454         rbuf = buf_acquire(data_sz + hdr_sz);
455         if (rbuf == NULL) {
456                 buf_discard(buf);
457                 return data_sz;
458         }
459         rmsg = buf_msg(rbuf);
460         msg_init(rmsg, imp, msg_type(msg), err, hdr_sz, msg_orignode(msg));
461         msg_set_destport(rmsg, msg_origport(msg));
462         msg_set_prevnode(rmsg, tipc_own_addr);
463         msg_set_origport(rmsg, msg_destport(msg));
464         if (msg_short(msg))
465                 msg_set_orignode(rmsg, tipc_own_addr);
466         else
467                 msg_set_orignode(rmsg, msg_destnode(msg));
468         msg_set_size(rmsg, data_sz + hdr_sz); 
469         msg_set_nametype(rmsg, msg_nametype(msg));
470         msg_set_nameinst(rmsg, msg_nameinst(msg));
471         memcpy(rbuf->data + hdr_sz, msg_data(msg), data_sz);
472
473         /* send self-abort message when rejecting on a connected port */
474         if (msg_connected(msg)) {
475                 struct sk_buff *abuf = 0;
476                 struct port *p_ptr = port_lock(msg_destport(msg));
477
478                 if (p_ptr) {
479                         if (p_ptr->publ.connected)
480                                 abuf = port_build_self_abort_msg(p_ptr, err);
481                         port_unlock(p_ptr);
482                 }
483                 net_route_msg(abuf);
484         }
485
486         /* send rejected message */
487         buf_discard(buf);
488         net_route_msg(rbuf);
489         return data_sz;
490 }
491
492 int port_reject_sections(struct port *p_ptr, struct tipc_msg *hdr,
493                          struct iovec const *msg_sect, u32 num_sect,
494                          int err)
495 {
496         struct sk_buff *buf;
497         int res;
498
499         res = msg_build(hdr, msg_sect, num_sect, MAX_MSG_SIZE, 
500                         !p_ptr->user_port, &buf);
501         if (!buf)
502                 return res;
503
504         return tipc_reject_msg(buf, err);
505 }
506
507 static void port_timeout(unsigned long ref)
508 {
509         struct port *p_ptr = port_lock(ref);
510         struct sk_buff *buf = 0;
511
512         if (!p_ptr || !p_ptr->publ.connected)
513                 return;
514
515         /* Last probe answered ? */
516         if (p_ptr->probing_state == PROBING) {
517                 buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_PORT);
518         } else {
519                 buf = port_build_proto_msg(port_peerport(p_ptr),
520                                            port_peernode(p_ptr),
521                                            p_ptr->publ.ref,
522                                            tipc_own_addr,
523                                            CONN_MANAGER,
524                                            CONN_PROBE,
525                                            TIPC_OK, 
526                                            port_out_seqno(p_ptr),
527                                            0);
528                 port_incr_out_seqno(p_ptr);
529                 p_ptr->probing_state = PROBING;
530                 k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
531         }
532         port_unlock(p_ptr);
533         net_route_msg(buf);
534 }
535
536
537 static void port_handle_node_down(unsigned long ref)
538 {
539         struct port *p_ptr = port_lock(ref);
540         struct sk_buff* buf = 0;
541
542         if (!p_ptr)
543                 return;
544         buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_NODE);
545         port_unlock(p_ptr);
546         net_route_msg(buf);
547 }
548
549
550 static struct sk_buff *port_build_self_abort_msg(struct port *p_ptr, u32 err)
551 {
552         u32 imp = msg_importance(&p_ptr->publ.phdr);
553
554         if (!p_ptr->publ.connected)
555                 return 0;
556         if (imp < TIPC_CRITICAL_IMPORTANCE)
557                 imp++;
558         return port_build_proto_msg(p_ptr->publ.ref,
559                                     tipc_own_addr,
560                                     port_peerport(p_ptr),
561                                     port_peernode(p_ptr),
562                                     imp,
563                                     TIPC_CONN_MSG,
564                                     err, 
565                                     p_ptr->last_in_seqno + 1,
566                                     0);
567 }
568
569
570 static struct sk_buff *port_build_peer_abort_msg(struct port *p_ptr, u32 err)
571 {
572         u32 imp = msg_importance(&p_ptr->publ.phdr);
573
574         if (!p_ptr->publ.connected)
575                 return 0;
576         if (imp < TIPC_CRITICAL_IMPORTANCE)
577                 imp++;
578         return port_build_proto_msg(port_peerport(p_ptr),
579                                     port_peernode(p_ptr),
580                                     p_ptr->publ.ref,
581                                     tipc_own_addr,
582                                     imp,
583                                     TIPC_CONN_MSG,
584                                     err, 
585                                     port_out_seqno(p_ptr),
586                                     0);
587 }
588
589 void port_recv_proto_msg(struct sk_buff *buf)
590 {
591         struct tipc_msg *msg = buf_msg(buf);
592         struct port *p_ptr = port_lock(msg_destport(msg));
593         u32 err = TIPC_OK;
594         struct sk_buff *r_buf = 0;
595         struct sk_buff *abort_buf = 0;
596
597         msg_dbg(msg, "PORT<RECV<:");
598
599         if (!p_ptr) {
600                 err = TIPC_ERR_NO_PORT;
601         } else if (p_ptr->publ.connected) {
602                 if (port_peernode(p_ptr) != msg_orignode(msg))
603                         err = TIPC_ERR_NO_PORT;
604                 if (port_peerport(p_ptr) != msg_origport(msg))
605                         err = TIPC_ERR_NO_PORT;
606                 if (!err && msg_routed(msg)) {
607                         u32 seqno = msg_transp_seqno(msg);
608                         u32 myno =  ++p_ptr->last_in_seqno;
609                         if (seqno != myno) {
610                                 err = TIPC_ERR_NO_PORT;
611                                 abort_buf = port_build_self_abort_msg(p_ptr, err);
612                         }
613                 }
614                 if (msg_type(msg) == CONN_ACK) {
615                         int wakeup = port_congested(p_ptr) && 
616                                      p_ptr->publ.congested &&
617                                      p_ptr->wakeup;
618                         p_ptr->acked += msg_msgcnt(msg);
619                         if (port_congested(p_ptr))
620                                 goto exit;
621                         p_ptr->publ.congested = 0;
622                         if (!wakeup)
623                                 goto exit;
624                         p_ptr->wakeup(&p_ptr->publ);
625                         goto exit;
626                 }
627         } else if (p_ptr->publ.published) {
628                 err = TIPC_ERR_NO_PORT;
629         }
630         if (err) {
631                 r_buf = port_build_proto_msg(msg_origport(msg),
632                                              msg_orignode(msg), 
633                                              msg_destport(msg), 
634                                              tipc_own_addr,
635                                              DATA_HIGH,
636                                              TIPC_CONN_MSG,
637                                              err,
638                                              0,
639                                              0);
640                 goto exit;
641         }
642
643         /* All is fine */
644         if (msg_type(msg) == CONN_PROBE) {
645                 r_buf = port_build_proto_msg(msg_origport(msg), 
646                                              msg_orignode(msg), 
647                                              msg_destport(msg), 
648                                              tipc_own_addr, 
649                                              CONN_MANAGER,
650                                              CONN_PROBE_REPLY,
651                                              TIPC_OK,
652                                              port_out_seqno(p_ptr),
653                                              0);
654         }
655         p_ptr->probing_state = CONFIRMED;
656         port_incr_out_seqno(p_ptr);
657 exit:
658         if (p_ptr)
659                 port_unlock(p_ptr);
660         net_route_msg(r_buf);
661         net_route_msg(abort_buf);
662         buf_discard(buf);
663 }
664
665 static void port_print(struct port *p_ptr, struct print_buf *buf, int full_id)
666 {
667         struct publication *publ;
668
669         if (full_id)
670                 tipc_printf(buf, "<%u.%u.%u:%u>:", 
671                             tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr),
672                             tipc_node(tipc_own_addr), p_ptr->publ.ref);
673         else
674                 tipc_printf(buf, "%-10u:", p_ptr->publ.ref);
675
676         if (p_ptr->publ.connected) {
677                 u32 dport = port_peerport(p_ptr);
678                 u32 destnode = port_peernode(p_ptr);
679
680                 tipc_printf(buf, " connected to <%u.%u.%u:%u>",
681                             tipc_zone(destnode), tipc_cluster(destnode),
682                             tipc_node(destnode), dport);
683                 if (p_ptr->publ.conn_type != 0)
684                         tipc_printf(buf, " via {%u,%u}",
685                                     p_ptr->publ.conn_type,
686                                     p_ptr->publ.conn_instance);
687         }
688         else if (p_ptr->publ.published) {
689                 tipc_printf(buf, " bound to");
690                 list_for_each_entry(publ, &p_ptr->publications, pport_list) {
691                         if (publ->lower == publ->upper)
692                                 tipc_printf(buf, " {%u,%u}", publ->type,
693                                             publ->lower);
694                         else
695                                 tipc_printf(buf, " {%u,%u,%u}", publ->type, 
696                                             publ->lower, publ->upper);
697                 }
698         }
699         tipc_printf(buf, "\n");
700 }
701
702 #define MAX_PORT_QUERY 32768
703
704 struct sk_buff *port_get_ports(void)
705 {
706         struct sk_buff *buf;
707         struct tlv_desc *rep_tlv;
708         struct print_buf pb;
709         struct port *p_ptr;
710         int str_len;
711
712         buf = cfg_reply_alloc(TLV_SPACE(MAX_PORT_QUERY));
713         if (!buf)
714                 return NULL;
715         rep_tlv = (struct tlv_desc *)buf->data;
716
717         printbuf_init(&pb, TLV_DATA(rep_tlv), MAX_PORT_QUERY);
718         spin_lock_bh(&port_list_lock);
719         list_for_each_entry(p_ptr, &ports, port_list) {
720                 spin_lock_bh(p_ptr->publ.lock);
721                 port_print(p_ptr, &pb, 0);
722                 spin_unlock_bh(p_ptr->publ.lock);
723         }
724         spin_unlock_bh(&port_list_lock);
725         str_len = printbuf_validate(&pb);
726
727         skb_put(buf, TLV_SPACE(str_len));
728         TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
729
730         return buf;
731 }
732
#if 0

/* Size of the print area reserved for a single port's statistics */
#define MAX_PORT_STATS 2000

/**
 * port_show_stats - build a configuration reply describing one port
 * @req_tlv_area: request TLV, expected to be a TIPC_TLV_PORT_REF
 * @req_tlv_space: size of the request TLV area
 *
 * Currently compiled out.  Returns an error-string reply for a malformed
 * request or an unknown port, NULL if the reply cannot be allocated, and
 * otherwise a TIPC_TLV_ULTRA_STRING reply with the port_print() output.
 */
struct sk_buff *port_show_stats(const void *req_tlv_area, int req_tlv_space)
{
	struct print_buf pb;
	struct tlv_desc *rep_tlv;
	struct sk_buff *reply;
	struct port *p_ptr;
	int str_len;
	u32 ref;

	if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_PORT_REF))
		return cfg_reply_error_string(TIPC_CFG_TLV_ERROR);

	/* Port reference arrives in network byte order */
	ref = ntohl(*(u32 *)TLV_DATA(req_tlv_area));

	p_ptr = port_lock(ref);
	if (p_ptr == NULL)
		return cfg_reply_error_string("port not found");

	reply = cfg_reply_alloc(TLV_SPACE(MAX_PORT_STATS));
	if (reply == NULL) {
		port_unlock(p_ptr);
		return NULL;
	}
	rep_tlv = (struct tlv_desc *)reply->data;

	printbuf_init(&pb, TLV_DATA(rep_tlv), MAX_PORT_STATS);
	port_print(p_ptr, &pb, 1);
	/* NEED TO FILL IN ADDITIONAL PORT STATISTICS HERE */
	port_unlock(p_ptr);

	str_len = printbuf_validate(&pb);
	skb_put(reply, TLV_SPACE(str_len));
	TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);

	return reply;
}

#endif
776
777 void port_reinit(void)
778 {
779         struct port *p_ptr;
780         struct tipc_msg *msg;
781
782         spin_lock_bh(&port_list_lock);
783         list_for_each_entry(p_ptr, &ports, port_list) {
784                 msg = &p_ptr->publ.phdr;
785                 if (msg_orignode(msg) == tipc_own_addr)
786                         break;
787                 msg_set_orignode(msg, tipc_own_addr);
788         }
789         spin_unlock_bh(&port_list_lock);
790 }
791
792
793 /*
794  *  port_dispatcher_sigh(): Signal handler for messages destined
795  *                          for the tipc_port interface.
796  */
797
798 static void port_dispatcher_sigh(void *dummy)
799 {
800         struct sk_buff *buf;
801
802         spin_lock_bh(&queue_lock);
803         buf = msg_queue_head;
804         msg_queue_head = 0;
805         spin_unlock_bh(&queue_lock);
806
807         while (buf) {
808                 struct port *p_ptr;
809                 struct user_port *up_ptr;
810                 struct tipc_portid orig;
811                 struct tipc_name_seq dseq;
812                 void *usr_handle;
813                 int connected;
814                 int published;
815
816                 struct sk_buff *next = buf->next;
817                 struct tipc_msg *msg = buf_msg(buf);
818                 u32 dref = msg_destport(msg);
819                 
820                 p_ptr = port_lock(dref);
821                 if (!p_ptr) {
822                         /* Port deleted while msg in queue */
823                         tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
824                         buf = next;
825                         continue;
826                 }
827                 orig.ref = msg_origport(msg);
828                 orig.node = msg_orignode(msg);
829                 up_ptr = p_ptr->user_port;
830                 usr_handle = up_ptr->usr_handle;
831                 connected = p_ptr->publ.connected;
832                 published = p_ptr->publ.published;
833
834                 if (unlikely(msg_errcode(msg)))
835                         goto err;
836
837                 switch (msg_type(msg)) {
838                 
839                 case TIPC_CONN_MSG:{
840                                 tipc_conn_msg_event cb = up_ptr->conn_msg_cb;
841                                 u32 peer_port = port_peerport(p_ptr);
842                                 u32 peer_node = port_peernode(p_ptr);
843
844                                 spin_unlock_bh(p_ptr->publ.lock);
845                                 if (unlikely(!connected)) {
846                                         if (unlikely(published))
847                                                 goto reject;
848                                         tipc_connect2port(dref,&orig);
849                                 }
850                                 if (unlikely(msg_origport(msg) != peer_port))
851                                         goto reject;
852                                 if (unlikely(msg_orignode(msg) != peer_node))
853                                         goto reject;
854                                 if (unlikely(!cb))
855                                         goto reject;
856                                 if (unlikely(++p_ptr->publ.conn_unacked >= 
857                                              TIPC_FLOW_CONTROL_WIN))
858                                         tipc_acknowledge(dref, 
859                                                          p_ptr->publ.conn_unacked);
860                                 skb_pull(buf, msg_hdr_sz(msg));
861                                 cb(usr_handle, dref, &buf, msg_data(msg),
862                                    msg_data_sz(msg));
863                                 break;
864                         }
865                 case TIPC_DIRECT_MSG:{
866                                 tipc_msg_event cb = up_ptr->msg_cb;
867
868                                 spin_unlock_bh(p_ptr->publ.lock);
869                                 if (unlikely(connected))
870                                         goto reject;
871                                 if (unlikely(!cb))
872                                         goto reject;
873                                 skb_pull(buf, msg_hdr_sz(msg));
874                                 cb(usr_handle, dref, &buf, msg_data(msg), 
875                                    msg_data_sz(msg), msg_importance(msg),
876                                    &orig);
877                                 break;
878                         }
879                 case TIPC_NAMED_MSG:{
880                                 tipc_named_msg_event cb = up_ptr->named_msg_cb;
881
882                                 spin_unlock_bh(p_ptr->publ.lock);
883                                 if (unlikely(connected))
884                                         goto reject;
885                                 if (unlikely(!cb))
886                                         goto reject;
887                                 if (unlikely(!published))
888                                         goto reject;
889                                 dseq.type =  msg_nametype(msg);
890                                 dseq.lower = msg_nameinst(msg);
891                                 dseq.upper = dseq.lower;
892                                 skb_pull(buf, msg_hdr_sz(msg));
893                                 cb(usr_handle, dref, &buf, msg_data(msg), 
894                                    msg_data_sz(msg), msg_importance(msg),
895                                    &orig, &dseq);
896                                 break;
897                         }
898                 }
899                 if (buf)
900                         buf_discard(buf);
901                 buf = next;
902                 continue;
903 err:
904                 switch (msg_type(msg)) {
905                 
906                 case TIPC_CONN_MSG:{
907                                 tipc_conn_shutdown_event cb = 
908                                         up_ptr->conn_err_cb;
909                                 u32 peer_port = port_peerport(p_ptr);
910                                 u32 peer_node = port_peernode(p_ptr);
911
912                                 spin_unlock_bh(p_ptr->publ.lock);
913                                 if (!connected || !cb)
914                                         break;
915                                 if (msg_origport(msg) != peer_port)
916                                         break;
917                                 if (msg_orignode(msg) != peer_node)
918                                         break;
919                                 tipc_disconnect(dref);
920                                 skb_pull(buf, msg_hdr_sz(msg));
921                                 cb(usr_handle, dref, &buf, msg_data(msg),
922                                    msg_data_sz(msg), msg_errcode(msg));
923                                 break;
924                         }
925                 case TIPC_DIRECT_MSG:{
926                                 tipc_msg_err_event cb = up_ptr->err_cb;
927
928                                 spin_unlock_bh(p_ptr->publ.lock);
929                                 if (connected || !cb)
930                                         break;
931                                 skb_pull(buf, msg_hdr_sz(msg));
932                                 cb(usr_handle, dref, &buf, msg_data(msg),
933                                    msg_data_sz(msg), msg_errcode(msg), &orig);
934                                 break;
935                         }
936                 case TIPC_NAMED_MSG:{
937                                 tipc_named_msg_err_event cb = 
938                                         up_ptr->named_err_cb;
939
940                                 spin_unlock_bh(p_ptr->publ.lock);
941                                 if (connected || !cb)
942                                         break;
943                                 dseq.type =  msg_nametype(msg);
944                                 dseq.lower = msg_nameinst(msg);
945                                 dseq.upper = dseq.lower;
946                                 skb_pull(buf, msg_hdr_sz(msg));
947                                 cb(usr_handle, dref, &buf, msg_data(msg), 
948                                    msg_data_sz(msg), msg_errcode(msg), &dseq);
949                                 break;
950                         }
951                 }
952                 if (buf)
953                         buf_discard(buf);
954                 buf = next;
955                 continue;
956 reject:
957                 tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
958                 buf = next;
959         }
960 }
961
962 /*
963  *  port_dispatcher(): Dispatcher for messages destinated
964  *  to the tipc_port interface. Called with port locked.
965  */
966
967 static u32 port_dispatcher(struct tipc_port *dummy, struct sk_buff *buf)
968 {
969         buf->next = NULL;
970         spin_lock_bh(&queue_lock);
971         if (msg_queue_head) {
972                 msg_queue_tail->next = buf;
973                 msg_queue_tail = buf;
974         } else {
975                 msg_queue_tail = msg_queue_head = buf;
976                 k_signal((Handler)port_dispatcher_sigh, 0);
977         }
978         spin_unlock_bh(&queue_lock);
979         return TIPC_OK;
980 }
981
982 /* 
983  * Wake up port after congestion: Called with port locked,
984  *                                
985  */
986
987 static void port_wakeup_sh(unsigned long ref)
988 {
989         struct port *p_ptr;
990         struct user_port *up_ptr;
991         tipc_continue_event cb = 0;
992         void *uh = 0;
993
994         p_ptr = port_lock(ref);
995         if (p_ptr) {
996                 up_ptr = p_ptr->user_port;
997                 if (up_ptr) {
998                         cb = up_ptr->continue_event_cb;
999                         uh = up_ptr->usr_handle;
1000                 }
1001                 port_unlock(p_ptr);
1002         }
1003         if (cb)
1004                 cb(uh, ref);
1005 }
1006
1007
1008 static void port_wakeup(struct tipc_port *p_ptr)
1009 {
1010         k_signal((Handler)port_wakeup_sh, p_ptr->ref);
1011 }
1012
1013 void tipc_acknowledge(u32 ref, u32 ack)
1014 {
1015         struct port *p_ptr;
1016         struct sk_buff *buf = 0;
1017
1018         p_ptr = port_lock(ref);
1019         if (!p_ptr)
1020                 return;
1021         if (p_ptr->publ.connected) {
1022                 p_ptr->publ.conn_unacked -= ack;
1023                 buf = port_build_proto_msg(port_peerport(p_ptr),
1024                                            port_peernode(p_ptr),
1025                                            ref,
1026                                            tipc_own_addr,
1027                                            CONN_MANAGER,
1028                                            CONN_ACK,
1029                                            TIPC_OK, 
1030                                            port_out_seqno(p_ptr),
1031                                            ack);
1032         }
1033         port_unlock(p_ptr);
1034         net_route_msg(buf);
1035 }
1036
1037 /*
1038  * tipc_createport(): user level call. Will add port to
1039  *                    registry if non-zero user_ref.
1040  */
1041
1042 int tipc_createport(u32 user_ref, 
1043                     void *usr_handle, 
1044                     unsigned int importance, 
1045                     tipc_msg_err_event error_cb, 
1046                     tipc_named_msg_err_event named_error_cb, 
1047                     tipc_conn_shutdown_event conn_error_cb, 
1048                     tipc_msg_event msg_cb, 
1049                     tipc_named_msg_event named_msg_cb, 
1050                     tipc_conn_msg_event conn_msg_cb, 
1051                     tipc_continue_event continue_event_cb,/* May be zero */
1052                     u32 *portref)
1053 {
1054         struct user_port *up_ptr;
1055         struct port *p_ptr; 
1056         u32 ref;
1057
1058         up_ptr = (struct user_port *)kmalloc(sizeof(*up_ptr), GFP_ATOMIC);
1059         if (up_ptr == NULL) {
1060                 return -ENOMEM;
1061         }
1062         ref = tipc_createport_raw(0, port_dispatcher, port_wakeup, importance);
1063         p_ptr = port_lock(ref);
1064         if (!p_ptr) {
1065                 kfree(up_ptr);
1066                 return -ENOMEM;
1067         }
1068
1069         p_ptr->user_port = up_ptr;
1070         up_ptr->user_ref = user_ref;
1071         up_ptr->usr_handle = usr_handle;
1072         up_ptr->ref = p_ptr->publ.ref;
1073         up_ptr->err_cb = error_cb;
1074         up_ptr->named_err_cb = named_error_cb;
1075         up_ptr->conn_err_cb = conn_error_cb;
1076         up_ptr->msg_cb = msg_cb;
1077         up_ptr->named_msg_cb = named_msg_cb;
1078         up_ptr->conn_msg_cb = conn_msg_cb;
1079         up_ptr->continue_event_cb = continue_event_cb;
1080         INIT_LIST_HEAD(&up_ptr->uport_list);
1081         reg_add_port(up_ptr);
1082         *portref = p_ptr->publ.ref;
1083         dbg(" tipc_createport: %x with ref %u\n", p_ptr, p_ptr->publ.ref);        
1084         port_unlock(p_ptr);
1085         return TIPC_OK;
1086 }
1087
1088 int tipc_ownidentity(u32 ref, struct tipc_portid *id)
1089 {
1090         id->ref = ref;
1091         id->node = tipc_own_addr;
1092         return TIPC_OK;
1093 }
1094
1095 int tipc_portimportance(u32 ref, unsigned int *importance)
1096 {
1097         struct port *p_ptr;
1098         
1099         p_ptr = port_lock(ref);
1100         if (!p_ptr)
1101                 return -EINVAL;
1102         *importance = (unsigned int)msg_importance(&p_ptr->publ.phdr);
1103         spin_unlock_bh(p_ptr->publ.lock);
1104         return TIPC_OK;
1105 }
1106
1107 int tipc_set_portimportance(u32 ref, unsigned int imp)
1108 {
1109         struct port *p_ptr;
1110
1111         if (imp > TIPC_CRITICAL_IMPORTANCE)
1112                 return -EINVAL;
1113
1114         p_ptr = port_lock(ref);
1115         if (!p_ptr)
1116                 return -EINVAL;
1117         msg_set_importance(&p_ptr->publ.phdr, (u32)imp);
1118         spin_unlock_bh(p_ptr->publ.lock);
1119         return TIPC_OK;
1120 }
1121
1122
1123 int tipc_publish(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
1124 {
1125         struct port *p_ptr;
1126         struct publication *publ;
1127         u32 key;
1128         int res = -EINVAL;
1129
1130         p_ptr = port_lock(ref);
1131         dbg("tipc_publ %u, p_ptr = %x, conn = %x, scope = %x, "
1132             "lower = %u, upper = %u\n",
1133             ref, p_ptr, p_ptr->publ.connected, scope, seq->lower, seq->upper);
1134         if (!p_ptr)
1135                 return -EINVAL;
1136         if (p_ptr->publ.connected)
1137                 goto exit;
1138         if (seq->lower > seq->upper)
1139                 goto exit;
1140         if ((scope < TIPC_ZONE_SCOPE) || (scope > TIPC_NODE_SCOPE))
1141                 goto exit;
1142         key = ref + p_ptr->pub_count + 1;
1143         if (key == ref) {
1144                 res = -EADDRINUSE;
1145                 goto exit;
1146         }
1147         publ = nametbl_publish(seq->type, seq->lower, seq->upper,
1148                                scope, p_ptr->publ.ref, key);
1149         if (publ) {
1150                 list_add(&publ->pport_list, &p_ptr->publications);
1151                 p_ptr->pub_count++;
1152                 p_ptr->publ.published = 1;
1153                 res = TIPC_OK;
1154         }
1155 exit:
1156         port_unlock(p_ptr);
1157         return res;
1158 }
1159
1160 int tipc_withdraw(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
1161 {
1162         struct port *p_ptr;
1163         struct publication *publ;
1164         struct publication *tpubl;
1165         int res = -EINVAL;
1166         
1167         p_ptr = port_lock(ref);
1168         if (!p_ptr)
1169                 return -EINVAL;
1170         if (!p_ptr->publ.published)
1171                 goto exit;
1172         if (!seq) {
1173                 list_for_each_entry_safe(publ, tpubl, 
1174                                          &p_ptr->publications, pport_list) {
1175                         nametbl_withdraw(publ->type, publ->lower, 
1176                                          publ->ref, publ->key);
1177                 }
1178                 res = TIPC_OK;
1179         } else {
1180                 list_for_each_entry_safe(publ, tpubl, 
1181                                          &p_ptr->publications, pport_list) {
1182                         if (publ->scope != scope)
1183                                 continue;
1184                         if (publ->type != seq->type)
1185                                 continue;
1186                         if (publ->lower != seq->lower)
1187                                 continue;
1188                         if (publ->upper != seq->upper)
1189                                 break;
1190                         nametbl_withdraw(publ->type, publ->lower, 
1191                                          publ->ref, publ->key);
1192                         res = TIPC_OK;
1193                         break;
1194                 }
1195         }
1196         if (list_empty(&p_ptr->publications))
1197                 p_ptr->publ.published = 0;
1198 exit:
1199         port_unlock(p_ptr);
1200         return res;
1201 }
1202
1203 int tipc_connect2port(u32 ref, struct tipc_portid const *peer)
1204 {
1205         struct port *p_ptr;
1206         struct tipc_msg *msg;
1207         int res = -EINVAL;
1208
1209         p_ptr = port_lock(ref);
1210         if (!p_ptr)
1211                 return -EINVAL;
1212         if (p_ptr->publ.published || p_ptr->publ.connected)
1213                 goto exit;
1214         if (!peer->ref)
1215                 goto exit;
1216
1217         msg = &p_ptr->publ.phdr;
1218         msg_set_destnode(msg, peer->node);
1219         msg_set_destport(msg, peer->ref);
1220         msg_set_orignode(msg, tipc_own_addr);
1221         msg_set_origport(msg, p_ptr->publ.ref);
1222         msg_set_transp_seqno(msg, 42);
1223         msg_set_type(msg, TIPC_CONN_MSG);
1224         if (!may_route(peer->node))
1225                 msg_set_hdr_sz(msg, SHORT_H_SIZE);
1226         else
1227                 msg_set_hdr_sz(msg, LONG_H_SIZE);
1228
1229         p_ptr->probing_interval = PROBING_INTERVAL;
1230         p_ptr->probing_state = CONFIRMED;
1231         p_ptr->publ.connected = 1;
1232         k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
1233
1234         nodesub_subscribe(&p_ptr->subscription,peer->node, (void *)ref,
1235                           (net_ev_handler)port_handle_node_down);
1236         res = TIPC_OK;
1237 exit:
1238         port_unlock(p_ptr);
1239         p_ptr->max_pkt = link_get_max_pkt(peer->node, ref);
1240         return res;
1241 }
1242
1243 /*
1244  * tipc_disconnect(): Disconnect port form peer.
1245  *                    This is a node local operation.
1246  */
1247
1248 int tipc_disconnect(u32 ref)
1249 {
1250         struct port *p_ptr;
1251         int res = -ENOTCONN;
1252
1253         p_ptr = port_lock(ref);
1254         if (!p_ptr)
1255                 return -EINVAL;
1256         if (p_ptr->publ.connected) {
1257                 p_ptr->publ.connected = 0;
1258                 /* let timer expire on it's own to avoid deadlock! */
1259                 nodesub_unsubscribe(&p_ptr->subscription);
1260                 res = TIPC_OK;
1261         }
1262         port_unlock(p_ptr);
1263         return res;
1264 }
1265
1266 /*
1267  * tipc_shutdown(): Send a SHUTDOWN msg to peer and disconnect
1268  */
1269 int tipc_shutdown(u32 ref)
1270 {
1271         struct port *p_ptr;
1272         struct sk_buff *buf = 0;
1273
1274         p_ptr = port_lock(ref);
1275         if (!p_ptr)
1276                 return -EINVAL;
1277
1278         if (p_ptr->publ.connected) {
1279                 u32 imp = msg_importance(&p_ptr->publ.phdr);
1280                 if (imp < TIPC_CRITICAL_IMPORTANCE)
1281                         imp++;
1282                 buf = port_build_proto_msg(port_peerport(p_ptr),
1283                                            port_peernode(p_ptr),
1284                                            ref,
1285                                            tipc_own_addr,
1286                                            imp,
1287                                            TIPC_CONN_MSG,
1288                                            TIPC_CONN_SHUTDOWN, 
1289                                            port_out_seqno(p_ptr),
1290                                            0);
1291         }
1292         port_unlock(p_ptr);
1293         net_route_msg(buf);
1294         return tipc_disconnect(ref);
1295 }
1296
1297 int tipc_isconnected(u32 ref, int *isconnected)
1298 {
1299         struct port *p_ptr;
1300         
1301         p_ptr = port_lock(ref);
1302         if (!p_ptr)
1303                 return -EINVAL;
1304         *isconnected = p_ptr->publ.connected;
1305         port_unlock(p_ptr);
1306         return TIPC_OK;
1307 }
1308
1309 int tipc_peer(u32 ref, struct tipc_portid *peer)
1310 {
1311         struct port *p_ptr;
1312         int res;
1313          
1314         p_ptr = port_lock(ref);
1315         if (!p_ptr)
1316                 return -EINVAL;
1317         if (p_ptr->publ.connected) {
1318                 peer->ref = port_peerport(p_ptr);
1319                 peer->node = port_peernode(p_ptr);
1320                 res = TIPC_OK;
1321         } else
1322                 res = -ENOTCONN;
1323         port_unlock(p_ptr);
1324         return res;
1325 }
1326
1327 int tipc_ref_valid(u32 ref)
1328 {
1329         /* Works irrespective of type */
1330         return !!ref_deref(ref);
1331 }
1332
1333
1334 /*
1335  *  port_recv_sections(): Concatenate and deliver sectioned
1336  *                        message for this node.
1337  */
1338
1339 int port_recv_sections(struct port *sender, unsigned int num_sect,
1340                        struct iovec const *msg_sect)
1341 {
1342         struct sk_buff *buf;
1343         int res;
1344          
1345         res = msg_build(&sender->publ.phdr, msg_sect, num_sect,
1346                         MAX_MSG_SIZE, !sender->user_port, &buf);
1347         if (likely(buf))
1348                 port_recv_msg(buf);
1349         return res;
1350 }
1351
1352 /**
1353  * tipc_send - send message sections on connection
1354  */
1355
1356 int tipc_send(u32 ref, unsigned int num_sect, struct iovec const *msg_sect)
1357 {
1358         struct port *p_ptr;
1359         u32 destnode;
1360         int res;
1361
1362         p_ptr = port_deref(ref);
1363         if (!p_ptr || !p_ptr->publ.connected)
1364                 return -EINVAL;
1365
1366         p_ptr->publ.congested = 1;
1367         if (!port_congested(p_ptr)) {
1368                 destnode = port_peernode(p_ptr);
1369                 if (likely(destnode != tipc_own_addr))
1370                         res = link_send_sections_fast(p_ptr, msg_sect, num_sect,
1371                                                       destnode);
1372                 else
1373                         res = port_recv_sections(p_ptr, num_sect, msg_sect);
1374
1375                 if (likely(res != -ELINKCONG)) {
1376                         port_incr_out_seqno(p_ptr);
1377                         p_ptr->publ.congested = 0;
1378                         p_ptr->sent++;
1379                         return res;
1380                 }
1381         }
1382         if (port_unreliable(p_ptr)) {
1383                 p_ptr->publ.congested = 0;
1384                 /* Just calculate msg length and return */
1385                 return msg_calc_data_size(msg_sect, num_sect);
1386         }
1387         return -ELINKCONG;
1388 }
1389
1390 /** 
1391  * tipc_send_buf - send message buffer on connection
1392  */
1393
1394 int tipc_send_buf(u32 ref, struct sk_buff *buf, unsigned int dsz)
1395 {
1396         struct port *p_ptr;
1397         struct tipc_msg *msg;
1398         u32 destnode;
1399         u32 hsz;
1400         u32 sz;
1401         u32 res;
1402          
1403         p_ptr = port_deref(ref);
1404         if (!p_ptr || !p_ptr->publ.connected)
1405                 return -EINVAL;
1406
1407         msg = &p_ptr->publ.phdr;
1408         hsz = msg_hdr_sz(msg);
1409         sz = hsz + dsz;
1410         msg_set_size(msg, sz);
1411         if (skb_cow(buf, hsz))
1412                 return -ENOMEM;
1413
1414         skb_push(buf, hsz);
1415         memcpy(buf->data, (unchar *)msg, hsz);
1416         destnode = msg_destnode(msg);
1417         p_ptr->publ.congested = 1;
1418         if (!port_congested(p_ptr)) {
1419                 if (likely(destnode != tipc_own_addr))
1420                         res = tipc_send_buf_fast(buf, destnode);
1421                 else {
1422                         port_recv_msg(buf);
1423                         res = sz;
1424                 }
1425                 if (likely(res != -ELINKCONG)) {
1426                         port_incr_out_seqno(p_ptr);
1427                         p_ptr->sent++;
1428                         p_ptr->publ.congested = 0;
1429                         return res;
1430                 }
1431         }
1432         if (port_unreliable(p_ptr)) {
1433                 p_ptr->publ.congested = 0;
1434                 return dsz;
1435         }
1436         return -ELINKCONG;
1437 }
1438
1439 /**
1440  * tipc_forward2name - forward message sections to port name
1441  */
1442
1443 int tipc_forward2name(u32 ref, 
1444                       struct tipc_name const *name, 
1445                       u32 domain,
1446                       u32 num_sect, 
1447                       struct iovec const *msg_sect,
1448                       struct tipc_portid const *orig, 
1449                       unsigned int importance)
1450 {
1451         struct port *p_ptr;
1452         struct tipc_msg *msg;
1453         u32 destnode = domain;
1454         u32 destport = 0;
1455         int res;
1456
1457         p_ptr = port_deref(ref);
1458         if (!p_ptr || p_ptr->publ.connected)
1459                 return -EINVAL;
1460
1461         msg = &p_ptr->publ.phdr;
1462         msg_set_type(msg, TIPC_NAMED_MSG);
1463         msg_set_orignode(msg, orig->node);
1464         msg_set_origport(msg, orig->ref);
1465         msg_set_hdr_sz(msg, LONG_H_SIZE);
1466         msg_set_nametype(msg, name->type);
1467         msg_set_nameinst(msg, name->instance);
1468         msg_set_lookup_scope(msg, addr_scope(domain));
1469         if (importance <= TIPC_CRITICAL_IMPORTANCE)
1470                 msg_set_importance(msg,importance);
1471         destport = nametbl_translate(name->type, name->instance, &destnode);
1472         msg_set_destnode(msg, destnode);
1473         msg_set_destport(msg, destport);
1474
1475         if (likely(destport || destnode)) {
1476                 p_ptr->sent++;
1477                 if (likely(destnode == tipc_own_addr))
1478                         return port_recv_sections(p_ptr, num_sect, msg_sect);
1479                 res = link_send_sections_fast(p_ptr, msg_sect, num_sect, 
1480                                               destnode);
1481                 if (likely(res != -ELINKCONG))
1482                         return res;
1483                 if (port_unreliable(p_ptr)) {
1484                         /* Just calculate msg length and return */
1485                         return msg_calc_data_size(msg_sect, num_sect);
1486                 }
1487                 return -ELINKCONG;
1488         }
1489         return port_reject_sections(p_ptr, msg, msg_sect, num_sect, 
1490                                     TIPC_ERR_NO_NAME);
1491 }
1492
1493 /**
1494  * tipc_send2name - send message sections to port name
1495  */
1496
1497 int tipc_send2name(u32 ref, 
1498                    struct tipc_name const *name,
1499                    unsigned int domain, 
1500                    unsigned int num_sect, 
1501                    struct iovec const *msg_sect)
1502 {
1503         struct tipc_portid orig;
1504
1505         orig.ref = ref;
1506         orig.node = tipc_own_addr;
1507         return tipc_forward2name(ref, name, domain, num_sect, msg_sect, &orig,
1508                                  TIPC_PORT_IMPORTANCE);
1509 }
1510
1511 /** 
1512  * tipc_forward_buf2name - forward message buffer to port name
1513  */
1514
1515 int tipc_forward_buf2name(u32 ref,
1516                           struct tipc_name const *name,
1517                           u32 domain,
1518                           struct sk_buff *buf,
1519                           unsigned int dsz,
1520                           struct tipc_portid const *orig,
1521                           unsigned int importance)
1522 {
1523         struct port *p_ptr;
1524         struct tipc_msg *msg;
1525         u32 destnode = domain;
1526         u32 destport = 0;
1527         int res;
1528
1529         p_ptr = (struct port *)ref_deref(ref);
1530         if (!p_ptr || p_ptr->publ.connected)
1531                 return -EINVAL;
1532
1533         msg = &p_ptr->publ.phdr;
1534         if (importance <= TIPC_CRITICAL_IMPORTANCE)
1535                 msg_set_importance(msg, importance);
1536         msg_set_type(msg, TIPC_NAMED_MSG);
1537         msg_set_orignode(msg, orig->node);
1538         msg_set_origport(msg, orig->ref);
1539         msg_set_nametype(msg, name->type);
1540         msg_set_nameinst(msg, name->instance);
1541         msg_set_lookup_scope(msg, addr_scope(domain));
1542         msg_set_hdr_sz(msg, LONG_H_SIZE);
1543         msg_set_size(msg, LONG_H_SIZE + dsz);
1544         destport = nametbl_translate(name->type, name->instance, &destnode);
1545         msg_set_destnode(msg, destnode);
1546         msg_set_destport(msg, destport);
1547         msg_dbg(msg, "forw2name ==> ");
1548         if (skb_cow(buf, LONG_H_SIZE))
1549                 return -ENOMEM;
1550         skb_push(buf, LONG_H_SIZE);
1551         memcpy(buf->data, (unchar *)msg, LONG_H_SIZE);
1552         msg_dbg(buf_msg(buf),"PREP:");
1553         if (likely(destport || destnode)) {
1554                 p_ptr->sent++;
1555                 if (destnode == tipc_own_addr)
1556                         return port_recv_msg(buf);
1557                 res = tipc_send_buf_fast(buf, destnode);
1558                 if (likely(res != -ELINKCONG))
1559                         return res;
1560                 if (port_unreliable(p_ptr))
1561                         return dsz;
1562                 return -ELINKCONG;
1563         }
1564         return tipc_reject_msg(buf, TIPC_ERR_NO_NAME);
1565 }
1566
1567 /** 
1568  * tipc_send_buf2name - send message buffer to port name
1569  */
1570
1571 int tipc_send_buf2name(u32 ref, 
1572                        struct tipc_name const *dest, 
1573                        u32 domain,
1574                        struct sk_buff *buf, 
1575                        unsigned int dsz)
1576 {
1577         struct tipc_portid orig;
1578
1579         orig.ref = ref;
1580         orig.node = tipc_own_addr;
1581         return tipc_forward_buf2name(ref, dest, domain, buf, dsz, &orig,
1582                                      TIPC_PORT_IMPORTANCE);
1583 }
1584
1585 /** 
1586  * tipc_forward2port - forward message sections to port identity
1587  */
1588
1589 int tipc_forward2port(u32 ref,
1590                       struct tipc_portid const *dest,
1591                       unsigned int num_sect, 
1592                       struct iovec const *msg_sect,
1593                       struct tipc_portid const *orig, 
1594                       unsigned int importance)
1595 {
1596         struct port *p_ptr;
1597         struct tipc_msg *msg;
1598         int res;
1599
1600         p_ptr = port_deref(ref);
1601         if (!p_ptr || p_ptr->publ.connected)
1602                 return -EINVAL;
1603
1604         msg = &p_ptr->publ.phdr;
1605         msg_set_type(msg, TIPC_DIRECT_MSG);
1606         msg_set_orignode(msg, orig->node);
1607         msg_set_origport(msg, orig->ref);
1608         msg_set_destnode(msg, dest->node);
1609         msg_set_destport(msg, dest->ref);
1610         msg_set_hdr_sz(msg, DIR_MSG_H_SIZE);
1611         if (importance <= TIPC_CRITICAL_IMPORTANCE)
1612                 msg_set_importance(msg, importance);
1613         p_ptr->sent++;
1614         if (dest->node == tipc_own_addr)
1615                 return port_recv_sections(p_ptr, num_sect, msg_sect);
1616         res = link_send_sections_fast(p_ptr, msg_sect, num_sect, dest->node);
1617         if (likely(res != -ELINKCONG))
1618                 return res;
1619         if (port_unreliable(p_ptr)) {
1620                 /* Just calculate msg length and return */
1621                 return msg_calc_data_size(msg_sect, num_sect);
1622         }
1623         return -ELINKCONG;
1624 }
1625
1626 /** 
1627  * tipc_send2port - send message sections to port identity 
1628  */
1629
1630 int tipc_send2port(u32 ref, 
1631                    struct tipc_portid const *dest,
1632                    unsigned int num_sect, 
1633                    struct iovec const *msg_sect)
1634 {
1635         struct tipc_portid orig;
1636
1637         orig.ref = ref;
1638         orig.node = tipc_own_addr;
1639         return tipc_forward2port(ref, dest, num_sect, msg_sect, &orig, 
1640                                  TIPC_PORT_IMPORTANCE);
1641 }
1642
1643 /** 
1644  * tipc_forward_buf2port - forward message buffer to port identity
1645  */
1646 int tipc_forward_buf2port(u32 ref,
1647                           struct tipc_portid const *dest,
1648                           struct sk_buff *buf,
1649                           unsigned int dsz,
1650                           struct tipc_portid const *orig,
1651                           unsigned int importance)
1652 {
1653         struct port *p_ptr;
1654         struct tipc_msg *msg;
1655         int res;
1656
1657         p_ptr = (struct port *)ref_deref(ref);
1658         if (!p_ptr || p_ptr->publ.connected)
1659                 return -EINVAL;
1660
1661         msg = &p_ptr->publ.phdr;
1662         msg_set_type(msg, TIPC_DIRECT_MSG);
1663         msg_set_orignode(msg, orig->node);
1664         msg_set_origport(msg, orig->ref);
1665         msg_set_destnode(msg, dest->node);
1666         msg_set_destport(msg, dest->ref);
1667         msg_set_hdr_sz(msg, DIR_MSG_H_SIZE);
1668         if (importance <= TIPC_CRITICAL_IMPORTANCE)
1669                 msg_set_importance(msg, importance);
1670         msg_set_size(msg, DIR_MSG_H_SIZE + dsz);
1671         if (skb_cow(buf, DIR_MSG_H_SIZE))
1672                 return -ENOMEM;
1673
1674         skb_push(buf, DIR_MSG_H_SIZE);
1675         memcpy(buf->data, (unchar *)msg, DIR_MSG_H_SIZE);
1676         msg_dbg(msg, "buf2port: ");
1677         p_ptr->sent++;
1678         if (dest->node == tipc_own_addr)
1679                 return port_recv_msg(buf);
1680         res = tipc_send_buf_fast(buf, dest->node);
1681         if (likely(res != -ELINKCONG))
1682                 return res;
1683         if (port_unreliable(p_ptr))
1684                 return dsz;
1685         return -ELINKCONG;
1686 }
1687
1688 /** 
1689  * tipc_send_buf2port - send message buffer to port identity
1690  */
1691
1692 int tipc_send_buf2port(u32 ref, 
1693                        struct tipc_portid const *dest,
1694                        struct sk_buff *buf, 
1695                        unsigned int dsz)
1696 {
1697         struct tipc_portid orig;
1698
1699         orig.ref = ref;
1700         orig.node = tipc_own_addr;
1701         return tipc_forward_buf2port(ref, dest, buf, dsz, &orig, 
1702                                      TIPC_PORT_IMPORTANCE);
1703 }
1704