## diffname gnot/sturp.c 1990/0312
## diff -e /dev/null /n/bootesdump/1990/0312/sys/src/9/68020/sturp.c
0a
#include "u.h"
#include "lib.h"
#include "mem.h"
#include "dat.h"
#include "fns.h"
#include "io.h"
#include "errno.h"
/*
 * sizing and timing constants
 */
enum {
Nurp= 32, /* number of entries in the urp[] table */
MSrexmit= 1000, /* added to NOW whenever the transmit timer is armed */
Nmask= 0x7, /* mask that extracts a sequence number from a control byte */
};
#define DPRINT print
typedef struct Urp Urp;
#define NOW (MACHP(0)->ticks*MS2HZ)
/*
* URP status
*/
/* event counters shared by every conversation; the one instance is `urpstat' */
struct urpstat {
ulong input; /* bytes read from urp */
ulong output; /* bytes output to urp (bumped by urpoput) */
ulong rxmit; /* retransmit rejected urp msg */
ulong rjtrs; /* reject, trailer size (trailer was not 3 bytes) */
ulong rjpks; /* reject, packet size (queued bytes != length in trailer) */
ulong rjseq; /* reject, sequence number (not the next expected) */
ulong levelb; /* unknown level b */
ulong enqsx; /* enqs sent */
ulong enqsr; /* enqs rcved */
} urpstat;
/*
 * State of one URP conversation.  urpopen claims an entry of urp[]
 * whose state is 0 and hangs it off both queues of the stream.
 */
struct Urp {
QLock; /* unnamed lock member; urpopen qlocks the Urp itself while claiming it */
short state; /* flags (REJECTING, INITING, HUNGUP, OPEN, CLOSING); 0 means unused */
Rendez r; /* process waiting for close */
/* input */
Queue *rq; /* input queue */
uchar iseq; /* last good input sequence number */
uchar lastecho; /* last echo/rej sent */
uchar trbuf[3]; /* trailer being collected */
short trx; /* # bytes in trailer being collected */
/* output */
QLock xmit; /* output lock, only one process at a time */
Queue *wq; /* output queue */
int maxout; /* maximum outstanding unacked blocks */
int maxblock; /* max block size */
int next; /* next block to send */
int unechoed; /* first unechoed block */
int unacked; /* first unacked block */
int nxb; /* next xb to use */
Block *xb[8]; /* the xmit window buffer */
QLock xl[8]; /* one lock per xb slot, held while that slot is sent or freed */
ulong timer; /* timeout for xmit */
int kstarted; /* set by urpopen when it starts urpkproc, cleared when urpkproc exits */
};
/*
 * Sequence number arithmetic.
 *
 * WINDOW(u)	(unechoed + maxout - next) % 8 for the Urp pointed to by u;
 *		output sends blocks only while this is positive
 * IN(x, f, n)	true if x lies in the circular range that starts at f and
 *		stops just before n (empty when f == n)
 * NEXT(x)	the sequence number after x
 *
 * Arguments are fully parenthesized so that expressions can be passed;
 * they are still evaluated more than once, so they must be free of
 * side effects.
 */
#define WINDOW(u) (((u)->unechoed + (u)->maxout - (u)->next)%8)
#define IN(x, f, n) ((f)<=(n) ? ((x)>=(f) && (x)<(n)) : ((x)<(n) || (x)>=(f)))
#define NEXT(x) (((x)+1)&Nmask)
/*
* Protocol control bytes
*/
#define SEQ 0010 /* sequence number, ends trailers */
#undef ECHO
#define ECHO 0020 /* echos, data given to next queue */
#define REJ 0030 /* rejections, transmission error */
#define ACK 0040 /* acknowledgments */
#define BOT 0050 /* beginning of trailer */
#define BOTM 0051 /* beginning of trailer, more data follows */
#define BOTS 0052 /* seq update algorithm on this trailer */
#define SOU 0053 /* start of unsequenced trailer */
#define EOU 0054 /* end of unsequenced trailer */
#define ENQ 0055 /* xmitter requests flow/error status */
#define CHECK 0056 /* xmitter requests error status */
#define INITREQ 0057 /* request initialization */
#define INIT0 0060 /* disable trailer processing */
#define INIT1 0061 /* enable trailer processing */
#define AINIT 0062 /* response to INIT0/INIT1 */
#undef DELAY
#define DELAY 0100 /* real-time printing delay */
#define BREAK 0110 /* Send/receive break (new style) */
/*
 * bits in Urp.state
 */
#define REJECTING 0x1 /* rcvack has started a retransmission; cleared by an ECHO or when an ENQ is sent */
#define INITING 0x2 /* initoutput has sent INIT1; cleared when AINIT is received */
#define HUNGUP 0x4 /* M_HANGUP arrived from below, or urpclose is shutting down */
#define OPEN 0x8 /* set by urpopen when the structure is claimed */
#define CLOSING 0x10 /* set by urpclose while it waits for output to drain */
Urp urp[Nurp];
/*
* predeclared
*/
static void urpciput(Queue*, Block*);
static void urpiput(Queue*, Block*);
static void urpoput(Queue*, Block*);
static void urpopen(Queue*, Stream*);
static void urpclose(Queue *);
static void output(Urp*);
static void sendblock(Urp*, int);
static void rcvack(Urp*, int);
static void flushinput(Urp*);
static void sendctl(Urp*, int);
static void initoutput(Urp*, int);
static void initinput(Urp*, int);
static void urpkproc(void *arg);
/*
 * The module's entry points and name.  Input begins with urpciput;
 * urpciput replaces the queue's put routine with urpiput when it sees INIT1,
 * and urpiput switches back on INIT0.
 * NOTE(review): Qinfo's field order is not declared in this file; presumably
 * (input put, output put, open, close, name) -- confirm against the header.
 */
Qinfo urpinfo = { urpciput, urpoput, urpopen, urpclose, "urp" };
/*
 * Open: claim an unused Urp, attach it to both queues of the stream,
 * reset the protocol state and make sure the helper process exists.
 * Raises Egreg if every Urp is in use.
 */
static void
urpopen(Queue *q, Stream *s)
{
	Urp *u, *end;
	char name[128];

	DPRINT("urpopen\n");

	/*
	 * look for a structure whose state is 0; the one we pick
	 * stays locked until it has been marked OPEN
	 */
	end = &urp[Nurp];
	u = urp;
	while(u < end){
		qlock(u);
		if(u->state == 0)
			break;
		qunlock(u);
		u++;
	}
	if(u == end)
		error(0, Egreg);

	u->rq = q;
	u->wq = q->other;
	q->ptr = q->other->ptr = u;
	u->state = OPEN;
	qunlock(u);

	initinput(u, 0);
	initoutput(u, 0);

	/*
	 * the ack/(re)xmit process is started at most once per structure
	 */
	if(u->kstarted != 0)
		return;
	u->kstarted = 1;
	sprint(name, "**urp%d**", u - urp);
	kproc(name, urpkproc, u);
}
/*
* Shut down the connection and kill off the kernel process
*/
/*
 * sleep condition: true once the line has hung up, or once every
 * block sent has been echoed and nothing is left on the output queue
 */
static int
isflushed(void *a)
{
	Urp *u = a;

	if(u->state & HUNGUP)
		return 1;
	return u->unechoed == u->next && u->wq->len == 0;
}
/*
 * sleep condition: true when no kernel process is running for this Urp
 */
static int
isdead(void *a)
{
	return ((Urp *)a)->kstarted == 0;
}
/*
 * Close: let pending output drain, then mark the conversation hung up,
 * release every block still held in the window and prod the kernel process.
 */
static void
urpclose(Queue *q)
{
	Urp *u;
	int last;

	u = (Urp *)q->ptr;

	/*
	 * announce that we are closing and wait until isflushed
	 * says the output has drained (or the line hung up)
	 */
	u->state |= CLOSING;
	sleep(&u->r, isflushed, u);

	/*
	 * pretend the block before `next' was echoed, which
	 * acks everything outstanding
	 */
	qlock(&u->xmit);
	u->state |= HUNGUP;
	last = (u->next > 0) ? u->next - 1 : 7;
	rcvack(u, ECHO+last);
	qunlock(&u->xmit);
	DPRINT("urpclose(%ux)\n", u);

	/*
	 * wake the kernel process so it notices HUNGUP and exits
	 */
	wakeup(&u->rq->r);
	DPRINT("urpclosed(%ux)\n", u);
}
/*
 * upstream control messages
 *
 * A hangup marks the conversation HUNGUP and wakes both the closer
 * and the kernel process; every message is then passed on upstream.
 */
static void
urpctliput(Urp *up, Queue *q, Block *bp)
{
	if(bp->type == M_HANGUP){
		up->state |= HUNGUP;
		wakeup(&up->r);
		wakeup(&up->rq->r);
	}
	PUTNEXT(q, bp);
}
/*
 * character mode input.
 *
 * the first byte in every message is a ctl byte (which belongs at the end).
 * Whatever follows it is passed upstream, unless the upstream queue already
 * holds Streamhi bytes, in which case the block is dropped.  The ctl byte
 * is then acted on.  Non-data blocks go to urpctliput.
 */
void
urpciput(Queue *q, Block *bp)
{
	Urp *up;
	int i;
	int ctl;

	up = (Urp *)q->ptr;
	if(bp->type != M_DATA){
		urpctliput(up, q, bp);
		return;
	}

	/*
	 * get the control character
	 */
	ctl = *bp->rptr++;
	if(ctl < 0){
		/* the block is ours to dispose of; don't drop it on the floor */
		freeb(bp);
		return;
	}

	/*
	 * take care of any data
	 */
	if(BLEN(bp)>0 && q->next->len<Streamhi)
		PUTNEXT(q, bp);
	else
		freeb(bp);

	/*
	 * handle the control character
	 */
	switch(ctl){
	case 0:
		break;

	case ENQ:
		/* report both our last echo/rej and the last good sequence number */
		urpstat.enqsr++;
		sendctl(up, up->lastecho);
		sendctl(up, ACK+up->iseq);
		break;

	case CHECK:
		sendctl(up, ACK+up->iseq);
		break;

	case AINIT:
		/* the other side answered our INIT1; output may proceed */
		up->state &= ~INITING;
		flushinput(up);
		wakeup(&up->rq->r);
		break;

	case INIT0:
	case INIT1:
		sendctl(up, AINIT);
		if(ctl == INIT1)
			q->put = urpiput;	/* trailers enabled: switch to block mode */
		initinput(up, 0);
		break;

	case INITREQ:
		initoutput(up, 0);
		break;

	case BREAK:
		break;

	case REJ+0: case REJ+1: case REJ+2: case REJ+3:
	case REJ+4: case REJ+5: case REJ+6: case REJ+7:
		rcvack(up, ctl);
		break;

	case ACK+0: case ACK+1: case ACK+2: case ACK+3:
	case ACK+4: case ACK+5: case ACK+6: case ACK+7:
	case ECHO+0: case ECHO+1: case ECHO+2: case ECHO+3:
	case ECHO+4: case ECHO+5: case ECHO+6: case ECHO+7:
		rcvack(up, ctl);
		break;

	case SEQ+0: case SEQ+1: case SEQ+2: case SEQ+3:
	case SEQ+4: case SEQ+5: case SEQ+6: case SEQ+7:
		/*
		 * echo only while upstream has room; otherwise just
		 * record the sequence number and let urpkproc echo later
		 */
		i = ctl & Nmask;
		if(q->next->len < Streamhi)
			sendctl(up, up->lastecho = ECHO+i);
		up->iseq = i;
		break;
	}
}
/*
 * block mode input.
 *
 * the first byte in every message is a ctl byte (which belongs at the end).
 *
 * Simplifying assumption: one put == one message && the control byte
 * is in the first block. If this isn't true, strange bytes will be
 * used as control bytes.
 *
 * Data is held on q until a SEQ trailer with the expected sequence
 * number and a matching byte count arrives; only then is it passed
 * upstream and echoed.  Anything that fails those checks is discarded
 * and answered with a REJ.
 */
void
urpiput(Queue *q, Block *bp)
{
Urp *up;
int i;
int ctl;
up = (Urp *)q->ptr;
if(bp->type != M_DATA){
urpctliput(up, q, bp);
return;
}
/*
 * get the control character
 */
ctl = *bp->rptr++;
/*
 * take care of any block count(trx)
 *
 * trx is the number of trailer bytes collected so far.  trbuf[0] was
 * set when the BOT/BOTS/BOTM arrived; while trx is 1 or 2 the next
 * bytes of this message fill trbuf[1] and trbuf[2].  Any other
 * non-zero value (i.e. a complete trailer followed by more bytes)
 * resets the count.
 */
while(up->trx){
if(BLEN(bp)<=0)
break;
switch (up->trx) {
case 1:
case 2:
up->trbuf[up->trx++] = *bp->rptr++;
continue;
default:
up->trx = 0;
break;
}
}
/*
 * queue the block(s)
 *
 * delimiters are stripped here; one is put back on the last block
 * when the message is accepted.  If more than 4k has piled up
 * without being accepted, throw it all away.
 */
if(BLEN(bp) > 0){
putq(q, bp);
q->last->flags &= ~S_DELIM;
if(q->len > 4*1024){
flushinput(up);
return;
}
} else
freeb(bp);
/*
 * handle the control character
 */
switch(ctl){
case 0:
break;
case ENQ:
print("rENQ %uo %uo\n", up->lastecho, ACK+up->iseq);
urpstat.enqsr++;
sendctl(up, up->lastecho);
sendctl(up, ACK+up->iseq);
flushinput(up);
break;
case CHECK:
sendctl(up, ACK+up->iseq);
break;
case AINIT:
/* the other side answered our INIT1 */
up->state &= ~INITING;
flushinput(up);
wakeup(&up->rq->r);
break;
case INIT0:
case INIT1:
sendctl(up, AINIT);
/* INIT0 disables trailers: go back to character mode */
if(ctl == INIT0)
q->put = urpciput;
initinput(up, 0);
break;
case INITREQ:
initoutput(up, 0);
break;
case BREAK:
break;
case BOT:
case BOTS:
case BOTM:
/* start collecting a trailer; the type byte is its first byte */
up->trx = 1;
up->trbuf[0] = ctl;
break;
case REJ+0: case REJ+1: case REJ+2: case REJ+3:
case REJ+4: case REJ+5: case REJ+6: case REJ+7:
print("rREJ\n");
rcvack(up, ctl);
break;
case ACK+0: case ACK+1: case ACK+2: case ACK+3:
case ACK+4: case ACK+5: case ACK+6: case ACK+7:
case ECHO+0: case ECHO+1: case ECHO+2: case ECHO+3:
case ECHO+4: case ECHO+5: case ECHO+6: case ECHO+7:
rcvack(up, ctl);
break;
/*
 * if the sequence number is the next expected
 * and the trailer length == 3
 * and the block count matches the bytes received
 * then send the bytes upstream.
 *
 * the count is carried low byte first: trbuf[1] + (trbuf[2]<<8).
 * each failed check flushes the queued input and sends a REJ that
 * names the last good sequence number.
 */
case SEQ+0: case SEQ+1: case SEQ+2: case SEQ+3:
case SEQ+4: case SEQ+5: case SEQ+6: case SEQ+7:
i = ctl & Nmask;
if(up->trx != 3){
urpstat.rjtrs++;
flushinput(up);
print("sREJ1 %d\n", up->iseq);
sendctl(up, up->lastecho = REJ+up->iseq);
break;
} else if(q->len != up->trbuf[1] + (up->trbuf[2]<<8)){
urpstat.rjpks++;
flushinput(up);
print("sREJ2 %d\n", up->iseq);
sendctl(up, up->lastecho = REJ+up->iseq);
break;
} else if(i != ((up->iseq+1)&Nmask)) {
urpstat.rjseq++;
flushinput(up);
print("sREJ3 %d %d\n", i, up->iseq);
sendctl(up, up->lastecho = REJ+up->iseq);
break;
}
/*
 * send data upstream
 *
 * a BOTM trailer means more data follows, so no delimiter is set;
 * a message with no data becomes an empty delimited block.
 */
if(q->first) {
if(up->trbuf[0] != BOTM)
q->last->flags |= S_DELIM;
while(bp = getq(q))
PUTNEXT(q, bp);
} else {
bp = allocb(0);
bp->flags |= S_DELIM;
PUTNEXT(q, bp);
}
up->trx = 0;
/*
 * acknowledge receipt
 *
 * the echo is withheld while upstream is at or above Streamhi;
 * iseq is advanced regardless, and urpkproc sends the echo once
 * lastecho and iseq disagree and upstream has room.
 */
if(q->next->len < Streamhi){
sendctl(up, ECHO+i);
up->lastecho = ECHO+i;
wakeup(&up->rq->r);
}
up->iseq = i;
break;
}
}
/*
 * downstream control
 *
 * An "init" control message restarts output, taking the output window
 * from its first field (a second field, the input window, is parsed but
 * not used).  Everything else is passed on downstream.
 */
static void
urpctloput(Urp *up, Queue *q, Block *bp)
{
	char *fields[2];
	int nf;
	int inwin, outwin;

	if(bp->type == M_CTL && streamparse("init", bp)){
		inwin = 0;
		outwin = 0;
		nf = getfields((char *)bp->rptr, fields, 2, ' ');
		if(nf == 2)
			inwin = strtoul(fields[1], 0, 0);
		if(nf == 1 || nf == 2)
			outwin = strtoul(fields[0], 0, 0);
		/* initinput(up, inwin); */
		DPRINT("initoutput %d\n", outwin);
		initoutput(up, outwin);
		freeb(bp);
		return;
	}
	PUTNEXT(q, bp);
}
/*
 * accept data from a writer
 *
 * Data is counted, queued and then pushed by output();
 * anything else is treated as downstream control.
 */
static void
urpoput(Queue *q, Block *bp)
{
	Urp *u = (Urp *)q->ptr;

	if(bp->type == M_DATA){
		urpstat.output += BLEN(bp);
		putq(q, bp);
		output(u);
		return;
	}
	urpctloput(u, q, bp);
}
/*
 * start output
 *
 * Stages queued data into free transmit window slots, sends an ENQ if
 * an echo is overdue, and then sends as many staged blocks as the
 * window allows.  Does nothing if the output lock is already held.
 */
static void
output(Urp *up)
{
Block *bp, *nbp;
ulong now;
Queue *q;
int n;
/* never wait for the output lock; if it's busy just give up */
if(!canqlock(&up->xmit))
return;
/* on an error from anything below, drop the lock before passing the error on */
if(waserror()){
print("urp output error\n");
qunlock(&up->xmit);
nexterror();
}
/*
 * if still initing and it's time to rexmit, send an INIT1
 * (no data is sent until INITING is cleared)
 */
now = NOW;
if(up->state & INITING){
if(now > up->timer){
sendctl(up, INIT1);
up->timer = now + MSrexmit;
}
qunlock(&up->xmit);
poperror();
return;
}
/*
 * fill the transmit buffers
 *
 * slots are filled in order starting at nxb, stopping at the first
 * slot still in use.  A block longer than maxblock is carved up: a
 * zero-length block is allocated and its rptr/wptr are pointed at the
 * first maxblock bytes of bp's data, and bp's rptr is moved past them.
 */
q = up->wq;
for(bp = getq(q); bp && up->xb[up->nxb]==0; up->nxb = NEXT(up->nxb)){
if(BLEN(bp) > up->maxblock){
nbp = up->xb[up->nxb] = allocb(0);
nbp->rptr = bp->rptr;
nbp->wptr = bp->rptr = bp->rptr + up->maxblock;
} else {
up->xb[up->nxb] = bp;
bp = getq(q);
}
}
/* a block that found no free slot goes back on the queue */
if(bp)
putbq(q, bp);
/* print("output w(%d) up->xb[%d](%ux) up->nxb(%d) up->state(%ux)\n",
WINDOW(up), up->next, up->xb[up->next], up->nxb, up->state);
/**/
/*
 * if a retransmit time has elapsed since a transmit, send an ENQ
 * (and send nothing else this time round)
 */
if(up->unechoed != up->next && NOW > up->timer){
print("sENQ\n");
up->timer = NOW + MSrexmit;
up->state &= ~REJECTING;
sendctl(up, ENQ);
qunlock(&up->xmit);
poperror();
return;
}
/*
 * if there's a window open, push some blocks out
 *
 * the slot lock is only tried, not waited for; rcvack and initoutput
 * hold it while freeing a slot.
 */
while(WINDOW(up)>0 && up->xb[up->next]!=0 && canqlock(&up->xl[up->next])){
if(up->xb[up->next])
sendblock(up, up->next);
qunlock(&up->xl[up->next]);
up->next = NEXT(up->next);
}
qunlock(&up->xmit);
poperror();
}
/*
 * send a control byte, put the byte at the end of the allocated
 * space in case a lower layer needs header room.
 *
 * The byte is silently not sent if the downstream queue holds more
 * than Streamhi bytes.
 */
static void
sendctl(Urp *up, int ctl)
{
	Block *b;

	if(up->wq->next->len > Streamhi)
		return;

	b = allocb(1);
	b->rptr = b->lim - 1;
	b->wptr = b->lim;
	b->rptr[0] = ctl;
	b->flags |= S_DELIM;
	PUTNEXT(up->wq, b);
}
/*
 * send a block.
 *
 * Transmits window slot bn as two delimited messages: a BOT (or BOTM,
 * if the block does not end a write) followed by the data, and then the
 * trailer SEQ+bn followed by the 16-bit data length, low byte first.
 * The block itself stays in up->xb[bn] until it is acknowledged.
 *
 * The retransmit timer is restarted even if nothing is sent because the
 * downstream queue holds more than Streamhi bytes.
 */
static void
sendblock(Urp *up, int bn)
{
	Block *bp, *m, *nbp;
	int n;

	up->timer = NOW + MSrexmit;
	if(up->wq->next->len > Streamhi)
		return;

	/*
	 * nothing staged in this slot; don't follow a nil pointer
	 */
	bp = up->xb[bn];
	if(bp == 0)
		return;

	/*
	 * message 1, the BOT and the data
	 *
	 * the data is not copied: a zero-length block is pointed
	 * at the bytes of the block held in the window
	 */
	m = allocb(1);
	m->rptr = m->lim - 1;
	m->wptr = m->lim;
	*m->rptr = (bp->flags & S_DELIM) ? BOT : BOTM;
	nbp = m->next = allocb(0);
	nbp->rptr = bp->rptr;
	nbp->wptr = bp->wptr;
	nbp->flags |= S_DELIM;
	PUTNEXT(up->wq, m);

	/*
	 * message 2, the block length and the SEQ
	 *
	 * the receiver reassembles the length as low + (high<<8),
	 * so the third byte must be the HIGH byte of n
	 */
	m = allocb(3);
	m->rptr = m->lim - 3;
	m->wptr = m->lim;
	n = BLEN(bp);
	m->rptr[0] = SEQ | bn;
	m->rptr[1] = n;
	m->rptr[2] = n>>8;
	m->flags |= S_DELIM;
	PUTNEXT(up->wq, m);
}
/*
 * receive an acknowledgement
 *
 * msg is an ECHO, REJ or ACK control byte; its low three bits are the
 * sequence number being acknowledged.  Frees acknowledged blocks,
 * advances the echo pointer and, for REJ/ACK, may arrange a
 * retransmission.  Always wakes whoever sleeps on the input queue's
 * rendezvous (urpkproc does).
 */
static void
rcvack(Urp *up, int msg)
{
int seqno;
int next;
seqno = msg&Nmask;
next = NEXT(seqno);
/*
 * release any acknowledged blocks
 *
 * only if seqno names a block that has been sent and not yet
 * acked; every slot from unacked through seqno is freed and
 * cleared under its own lock.
 */
if(IN(seqno, up->unacked, up->next)){
for(; up->unacked != next; up->unacked = NEXT(up->unacked)){
qlock(&up->xl[up->unacked]);
if(up->xb[up->unacked])
freeb(up->xb[up->unacked]);
up->xb[up->unacked] = 0;
qunlock(&up->xl[up->unacked]);
}
}
/* 0370 masks off the sequence number, leaving the message type */
switch(msg & 0370){
case ECHO:
if(IN(seqno, up->unechoed, up->next)) {
up->unechoed = next;
}
/*
 * the next reject at the start of a window starts a
 * retransmission.
 */
up->state &= ~REJECTING;
break;
case REJ:
if(IN(seqno, up->unechoed, up->next))
up->unechoed = next;
/*
 * ... FALL THROUGH ...
 */
case ACK:
/*
 * start a retransmission if we aren't retransmitting
 * and this is the start of a window.
 *
 * moving `next' back to the block after seqno makes output
 * send again from there; REJECTING stops further REJ/ACKs
 * from restarting it until an ECHO or ENQ clears the flag.
 */
if(up->unechoed==next && !(up->state & REJECTING)){
up->state |= REJECTING;
up->next = next;
}
break;
}
wakeup(&up->rq->r);
}
/*
 * throw away any partially collected input:
 * free everything held on the input queue and forget the trailer
 */
static void
flushinput(Urp *up)
{
	Block *b;

	for(b = getq(up->rq); b != 0; b = getq(up->rq))
		freeb(b);
	up->trx = 0;
}
/*
 * initialize output
 *
 * Resets the output side of a conversation: picks the maximum block
 * size from `window', restarts the sequence numbers at 1, discards
 * every block held in the transmit window and sends INIT1.  The
 * conversation stays in INITING state until the AINIT reply arrives.
 */
static void
initoutput(Urp *up, int window)
{
	int i;

	/*
	 * set output window: a quarter of the requested window,
	 * held between 64 and Streamhi/4, less 4 bytes
	 */
	up->maxblock = window/4;
	if(up->maxblock < 64)
		up->maxblock = 64;
	if(up->maxblock > Streamhi/4)
		up->maxblock = Streamhi/4;
	up->maxblock -= 4;
	up->maxout = 3;

	/*
	 * set sequence variables
	 */
	up->unechoed = 1;
	up->unacked = 1;
	up->next = 1;
	up->nxb = 1;

	/*
	 * free any outstanding blocks, and forget them: a stale
	 * pointer left in a slot would be freed a second time by
	 * rcvack or by the next initoutput, and would make output
	 * think the slot is still occupied
	 */
	for(i = 0; i < 8; i++){
		qlock(&up->xl[i]);
		if(up->xb[i]){
			freeb(up->xb[i]);
			up->xb[i] = 0;
		}
		qunlock(&up->xl[i]);
	}

	/*
	 * tell the other side we've inited
	 */
	up->state |= INITING;
	up->timer = NOW + MSrexmit;
	sendctl(up, INIT1);
}
/*
 * initialize input
 *
 * Puts the receive side back to its starting state: sequence number 0,
 * last echo ECHO+0, no trailer in progress, nothing queued.
 * `window' is accepted but not used.
 */
static void
initinput(Urp *up, int window)
{
	up->lastecho = ECHO+0;
	up->iseq = 0;
	up->trx = 0;
	flushinput(up);
}
/*
* do retransmissions etc
*/
/*
 * sleep condition for urpkproc: there is queued output, the window
 * is open, and initialization has completed
 */
static int
todo(void *arg)
{
	Urp *u = arg;

	if(u->state & INITING)
		return 0;
	if(!(u->wq->len > 0))
		return 0;
	return WINDOW(u) > 0;
}
/*
 * Per-conversation kernel process.  Until the conversation hangs up it
 * repeatedly sends any echo that was held back, runs output(), and then
 * sleeps for at most MSrexmit/2 or until todo() is true.  On exit it
 * marks the Urp unused.
 */
static void
urpkproc(void *arg)
{
	Urp *u;

	u = (Urp *)arg;
	print("urpkproc started\n");

	while(1){
		/*
		 * a closer may be waiting for output to drain
		 */
		if(u->state & (HUNGUP|CLOSING)){
			if(isflushed(u))
				wakeup(&u->r);
			if(u->state & HUNGUP)
				break;
		}

		/*
		 * echo the last good sequence number if that has not
		 * been done yet and upstream has room
		 */
		if((u->lastecho & Nmask) != u->iseq)
		if(u->rq->next->len < Streamhi){
			u->lastecho = ECHO + u->iseq;
			sendctl(u, u->lastecho);
		}

		output(u);
		tsleep(&u->rq->r, todo, u, MSrexmit/2);
	}

	DPRINT("urpkproc exiting %ux\n", u);
	u->kstarted = 0;
	u->state = 0;
}
.
## diffname gnot/sturp.c 1990/0315
## diff -e /n/bootesdump/1990/0312/sys/src/9/68020/sturp.c /n/bootesdump/1990/0315/sys/src/9/68020/sturp.c
828c
DPRINT("urpkproc started\n");
.
784a
up->xb[i] = 0;
.
15c
#define DPRINT if(0)
.
## diffname gnot/sturp.c 1990/0320
## diff -e /n/bootesdump/1990/0315/sys/src/9/68020/sturp.c /n/bootesdump/1990/0320/sys/src/9/68020/sturp.c
843d
829d
599c
DPRINT("sENQ\n");
.
513d
484,485c
sendctl(up, up->lastecho = ECHO+i);
.
460,461c
DPRINT("sREJ3 %d %d\n", i, up->iseq);
if((up->lastecho&~7)==ECHO)
sendctl(up, up->lastecho = REJ+up->iseq);/**/
.
454,455c
if((up->lastecho&~7)==ECHO)
sendctl(up, up->lastecho = REJ+up->iseq);/**/
.
452a
DPRINT("sREJ2 %d %d %d\n", up->iseq, q->len,
up->trbuf[1] + (up->trbuf[2]<<8));
.
448,449c
DPRINT("sREJ1 %d\n", up->iseq);
if((up->lastecho&~7)==ECHO)
sendctl(up, up->lastecho = REJ+up->iseq);/**/
.
425c
DPRINT("rREJ\n");
.
384c
DPRINT("rENQ %uo %uo\n", up->lastecho, ACK+up->iseq);
.
209d
203d
126,127d
## diffname gnot/sturp.c 1990/0321
## diff -e /n/bootesdump/1990/0320/sys/src/9/68020/sturp.c /n/bootesdump/1990/0321/sys/src/9/68020/sturp.c
835,836c
sendack(up);
.
803a
up->blocks = 0;
.
638a
* send a reject
*/
static void
sendrej(Urp *up)
{
	int wasecho;

	/* whatever has been collected so far is no good */
	flushinput(up);

	/* only the first failure after an ECHO produces a REJ */
	qlock(&up->ack);
	wasecho = (up->lastecho&~Nmask) == ECHO;
	if(wasecho){
		DPRINT("REJ %d\n", up->iseq);
		up->lastecho = REJ|up->iseq;
		sendctl(up, up->lastecho);
	}
	qunlock(&up->ack);
}
/*
* send an acknowledge
*/
static void
sendack(Urp *up)
{
	/*
	 * nothing to do if upstream is full or the last
	 * echo already names the current sequence number
	 */
	if(up->rq->next->len>=Streamhi || (up->lastecho&Nmask)==up->iseq)
		return;

	/*
	 * someone else is busy acking; leave it to them
	 */
	if(!canqlock(&up->ack))
		return;

	/*
	 * things may have changed while we were getting the lock,
	 * so look again before sending the echo
	 */
	if(up->rq->next->len<Streamhi && (up->lastecho&Nmask)!=up->iseq){
		up->lastecho = ECHO|up->iseq;
		sendctl(up, up->lastecho);
	}
	qunlock(&up->ack);
}
/*
.
487a
if(q->next->len < Streamhi)
sendctl(up, up->lastecho = ECHO|i);
qunlock(&up->ack);
.
483,486c
qlock(&up->ack);
.
458,461c
sendrej(up);
.
450,454c
sendrej(up);
.
443,446c
sendrej(up);
.
380c
DPRINT("rENQ %d %uo %uo\n", up->blocks, up->lastecho, ACK+up->iseq);
up->blocks = 0;
.
311a
qunlock(&up->ack);
.
307a
qlock(&up->ack);
.
112a
static void sendack(Urp*);
static void sendrej(Urp*);
.
47a
int blocks;
.
42c
QLock ack; /* ack lock */
.
## diffname gnot/sturp.c 1990/0331
## diff -e /n/bootesdump/1990/0321/sys/src/9/68020/sturp.c /n/bootesdump/1990/0331/sys/src/9/68020/sturp.c
66c
#define WINDOW(u) ((u)->unechoed>(u)->next ? (u)->unechoed+(u)->maxout-(u)->next-8 :\
(u)->unechoed+(u)->maxout-(u)->next)
.
## diffname gnot/sturp.c 1990/0403
## diff -e /n/bootesdump/1990/0331/sys/src/9/68020/sturp.c /n/bootesdump/1990/0403/sys/src/9/68020/sturp.c
807,808d
692c
if(QFULL(up->wq->next))
.
670c
if(QFULL(up->rq->next) || (up->lastecho&Nmask)==up->iseq){
.
661c
if(QFULL(up->rq->next) || (up->lastecho&Nmask)==up->iseq)
.
625c
if(QFULL(up->wq->next))
.
482c
if(!QFULL(q->next))
.
314c
if(!QFULL(q->next))
.
256c
if(BLEN(bp)>0 && !QFULL(q->next))
.
## diffname gnot/sturp.c 1990/0406
## diff -e /n/bootesdump/1990/0403/sys/src/9/68020/sturp.c /n/bootesdump/1990/0406/sys/src/9/68020/sturp.c
883a
up->kstarted = 0;
.
882d
870a
if(waserror()){
up->state = 0;
up->kstarted = 0;
wakeup(&up->r);
return;
}
.
585,586d
583a
if(bp)
putbq(q, bp);
.
575,582c
if(up->xb[up->nxb]==0) {
for(bp=getq(q); bp && up->xb[up->nxb]==0; up->nxb=NEXT(up->nxb)){
if(BLEN(bp) > up->maxblock){
nbp = up->xb[up->nxb] = allocb(0);
nbp->rptr = bp->rptr;
nbp->wptr = bp->rptr = bp->rptr + up->maxblock;
} else {
up->xb[up->nxb] = bp;
bp = getq(q);
}
.
209a
if(up->kstarted == 0)
up->state = 0;
.
## diffname gnot/sturp.c 1990/05312
## diff -e /n/bootesdump/1990/0406/sys/src/9/68020/sturp.c /n/bootesdump/1990/05312/sys/src/9/68020/sturp.c
894a
DPRINT("urpkproc %ux\n", up);
.
889c
if(up->state == 0){
DPRINT("urpkproc: %ux->state == 0\n", up);
break;
}
if(!QFULL(up->rq->next))
sendack(up);
.
876a
print("urpkproc error %ux\n", up);
.
821a
up->rexmit = 0;
.
777c
up->rexmit = 1;
.
745,749c
i = up->unacked;
qlock(&up->xl[i]);
if(up->xb[i])
freeb(up->xb[i]);
up->xb[i] = 0;
qunlock(&up->xl[i]);
.
735a
int i;
.
615a
poperror();
.
611,614c
if(up->rexmit){
up->rexmit = 0;
up->next = up->unechoed;
}
while(WINDOW(up)>0 && up->xb[up->next]!=0){
i = up->next;
qlock(&up->xl[i]);
if(waserror()){
qunlock(&up->xl[i]);
nexterror();
}
sendblock(up, i);
qunlock(&up->xl[i]);
.
598,599c
if(up->unechoed!=up->next && NOW>up->timer){
.
549a
int i;
.
475c
if(up->trbuf[0] != BOTM)
bp->flags |= S_DELIM;
.
462a
} else if(q->next->len > (3*Streamhi)/2
|| q->next->nb > (3*Streambhi)/2) {
flushinput(up);
break;
.
374d
372a
bp->flags &= ~S_DELIM;
.
261c
} else
.
259c
if(BLEN(bp)>0 && q->next->len<2*Streamhi && q->next->nb<2*Streambhi){
bp->flags |= S_DELIM;
.
212a
}
.
211c
if(up->kstarted == 0){
DPRINT("urpclose %ux\n", up);
.
209c
for(i = 0; i < 7; i++)
if(up->xb[i]){
freeb(up->xb[i]);
up->xb[i] = 0;
}
qunlock(&up->xmit);
.
207c
* free all staged but unsent messages
.
204d
199a
wakeup(&up->rq->r);
qlock(&up->xmit);
/*
* ack all outstanding messages
*/
.
198d
196c
* kill off the kernel process
.
193c
tsleep(&up->r, isflushed, up, 2*60*1000);
.
190a
*
* if 2 minutes elapse, give it up
.
171,178d
169c
return (up->state&HUNGUP) || (up->unechoed==up->nxb && up->wq->len==0);
.
62a
int rexmit;
.
51d
## diffname gnot/sturp.c 1990/0629
## diff -e /n/bootesdump/1990/05312/sys/src/9/68020/sturp.c /n/bootesdump/1990/0629/sys/src/9/68020/sturp.c
925a
}
/*
* urp got very confused, complain
*/
static void
urpvomit(char *msg, Urp* up)
{
/* who is complaining, which Urp, and the output sequence pointers */
print("urpvomit: %s %ux next %d unechoed %d unacked %d nxb %d\n",
msg, up, up->next, up->unechoed, up->unacked, up->nxb);
/* the eight transmit window slots */
print("\txb: %ux %ux %ux %ux %ux %ux %ux %ux\n",
up->xb[0], up->xb[1], up->xb[2], up->xb[3], up->xb[4],
up->xb[5], up->xb[6], up->xb[7]);
/* the input side: sequence number, last echo, trailer being collected */
print("\tiseq: %uo lastecho: %uo trx: %d trbuf: %uo %uo %uo\n",
up->iseq, up->lastecho, up->trx, up->trbuf[0], up->trbuf[1],
up->trbuf[2]);
/* the queue above us: first block, block count, byte count */
print("\tupq: %ux %d %d\n", up->rq->next->first, up->rq->next->nb,
up->rq->next->len);
}
int
urpdump(void)
{
Urp *up;
for(up = urp; up < &urp[Nurp]; up++)
if(up->rq)
urpvomit("", up);
.
770a
else
urpvomit("rcvack", up);
.
724a
if(bp == 0){
urpvomit("sendblock", up);
return;
}
.
622,626c
while(WINDOW(up)>0 && up->next!=up->nxb){
.
620a
*
* the lock is to synchronize with acknowledges that free
* blocks.
.
610c
if(up->rexmit){
/*
* if a retransmit is requested, move next back to
* the unacked blocks
*/
up->rexmit = 0;
up->next = up->unacked;
} else if(up->unacked!=up->next && NOW>up->timer){
/*
* if a retransmit time has elapsed since a transmit,
* send an ENQ
*/
.
608c
* retransmit cruft
.
604,606c
.
599a
up->nxb = i;
.
590,591c
i = NEXT(up->nxb);
if(i != up->unechoed) {
for(bp = getq(q); bp && i!=up->unechoed; i = NEXT(i)){
if(up->xb[up->nxb] != 0)
urpvomit("output", up);
.
587c
* fill the transmit buffers, `nxb' can never overtake `unechoed'
.
119a
static void urpvomit(char*, Urp*);
.
68c
#define IN(x, f, n) (f<=n ? (x>=f && x<n) : (x<n || x>=f))
.
## diffname gnot/sturp.c 1990/0702
## diff -e /n/bootesdump/1990/0629/sys/src/9/68020/sturp.c /n/bootesdump/1990/0702/sys/src/9/68020/sturp.c
961c
print("\tupq: %ux %d %d\n", &up->rq->next->r, up->rq->next->nb,
.
## diffname gnot/sturp.c 1990/0717
## diff -e /n/bootesdump/1990/0702/sys/src/9/68020/sturp.c /n/bootesdump/1990/0717/sys/src/9/68020/sturp.c
970c
for(up = urp; up < &urp[conf.nurp]; up++)
.
179a
if(up == 0)
return;
.
141a
}
.
140c
if(up == &urp[conf.nurp]){
q->ptr = 0;
WR(q)->ptr = 0;
.
134c
for(up = urp; up < &urp[conf.nurp]; up++){
.
123a
/*
 * register the urp Qinfo and allocate the table of conf.nurp Urp structures
 * NOTE(review): presumably called once at boot, before any urpopen -- confirm
 */
void
urpreset(void)
{
newqinfo(&urpinfo);
urp = (Urp *)ialloc(conf.nurp*sizeof(Urp), 0);
}
.
100c
Urp *urp;
.
10d
## diffname gnot/sturp.c 1990/0721
## diff -e /n/bootesdump/1990/0717/sys/src/9/68020/sturp.c /n/bootesdump/1990/0721/sys/src/9/68020/sturp.c
928a
q = up->wq;
.
926c
Urp *up; Queue *q;
.
690a
Queue *q = up->wq;
.
538a
if(streamparse("debug", bp)){
switch(getfields((char *)bp->rptr, fields, 2, ' ')){
case 1:
if (strcmp(fields[0], "on") == 0) {
q->flag |= QDEBUG;
q->other->flag |= QDEBUG;
}
if (strcmp(fields[0], "off") == 0) {
q->flag &= ~QDEBUG;
q->other->flag &= ~QDEBUG;
}
}
freeb(bp);
return;
}
.
485a
DPRINT("rSEQ%d accept %d\n", i, q->len);
.
461,462c
* if the sequence number is the next expected
* and the trailer length == 3
.
442a
case BOTS:
DPRINT("rBOT%d...", ctl-BOT);
.
441d
14c
#define DPRINT if(q->flag&QDEBUG)kprint
.
## diffname gnot/sturp.c 1990/0725
## diff -e /n/bootesdump/1990/0721/sys/src/9/68020/sturp.c /n/bootesdump/1990/0725/sys/src/9/68020/sturp.c
974a
}
/*
* timer to wakeup urpkproc's for retransmissions
*/
static void
urptimer(Alarm *a)
{
Urp *up;
Urp *last;
Queue *q;
urptiming = 0;
for(up = urp, last = &urp[conf.nurp]; up < last; up++){
if(up->state==0)
continue;
if(up->unacked!=up->next && NOW>up->timer){
q = up->rq;
if(q)
wakeup(&q->r);
}
}
.
973a
wakeup(&up->r);
poperror();
.
972d
970c
sleep(&up->rq->r, todo, up);
.
966d
957,964c
if(up->state & HUNGUP)
.
951d
947c
up->kstarted = 1;
.
944c
Urp *up;
.
678a
out:
.
656,658c
goto out;
.
610,612c
goto out;
.
591a
/*
* start the urptimer if it isn't already
*/
if(urptiming==0){
if(canlock(&urptlock)){
if(urptiming == 0)
urptiming = alarm(500, urptimer, 0);
unlock(&urptlock);
}
}
.
226,229c
/*
* wait for kernel process to die
*/
while(up->kstarted)
sleep(&up->r, isdead, up);
up->state = 0;
.
202c
* tell kernel process to die
.
173a
isdead(void *a)
{
Urp *up;
up = (Urp *)a;
return up->kstarted==0;
}
static int
.
163,166c
sprint(name, "urp%d", up - urp);
kproc(name, urpkproc, up);
/*
* start the urptimer if it isn't already
*/
if(urptiming==0){
if(canlock(&urptlock)){
if(urptiming == 0)
urptiming = alarm(500, urptimer, 0);
unlock(&urptlock);
}
.
118a
static void urptimer(Alarm*);
.
70a
* Alarm for urptiming
*/
Alarm *urptiming;
Lock urptlock;
/*
.
14c
#define DPRINT /*if(q->flag&QDEBUG)kprint*/
.
## diffname gnot/sturp.c 1990/0726
## diff -e /n/bootesdump/1990/0725/sys/src/9/68020/sturp.c /n/bootesdump/1990/0726/sys/src/9/68020/sturp.c
1001,1022d
998,999d
996a
up->state = 0;
.
995c
tsleep(&up->rq->r, todo, up, MSrexmit/2);
.
991a
}
.
990c
if(up->state & (HUNGUP|CLOSING)){
if(isflushed(up))
wakeup(&up->r);
if(up->state & HUNGUP)
break;
}
if(up->state == 0){
DPRINT("urpkproc: %ux->state == 0\n", up);
.
984a
up->state = 0;
.
981c
q = up->wq;
.
978c
Urp *up; Queue *q;
.
712d
691c
qunlock(&up->xmit);
poperror();
return;
.
647c
qunlock(&up->xmit);
poperror();
return;
.
618,628d
249,255c
if(up->kstarted == 0){
DPRINT("urpclose %ux\n", up);
up->state = 0;
}
.
225c
* kill off the kernel process
.
189,196d
170,181c
if(up->kstarted == 0){
up->kstarted = 1;
sprint(name, "**urp%d**", up - urp);
kproc(name, urpkproc, up);
.
125d
71,76d
14c
#define DPRINT if(q->flag&QDEBUG)kprint
.
## diffname gnot/sturp.c 1990/0728
## diff -e /n/bootesdump/1990/0726/sys/src/9/68020/sturp.c /n/bootesdump/1990/0728/sys/src/9/68020/sturp.c
974a
}
/*
* timer to wakeup urpkproc's for retransmissions
*/
static void
urptimer(Alarm *a)
{
Urp *up;
Urp *last;
Queue *q;
cancel(a);
alarm(500, urptimer, 0);
for(up = urp, last = &urp[conf.nurp]; up < last; up++){
if(up->state==0)
continue;
if(up->unacked!=up->next && NOW>up->timer){
q = up->rq;
if(q)
wakeup(&q->r);
}
}
.
973a
wakeup(&up->r);
poperror();
.
972d
970c
sleep(&up->rq->r, todo, up);
.
966d
957,964c
if(up->state & HUNGUP)
.
951d
947c
up->kstarted = 1;
.
944c
Urp *up;
.
678a
out:
.
656,658c
goto out;
.
610,612c
goto out;
.
226,229c
/*
* wait for kernel process to die
*/
while(up->kstarted)
sleep(&up->r, isdead, up);
up->state = 0;
.
202c
* tell kernel process to die
.
173a
isdead(void *a)
{
Urp *up;
up = (Urp *)a;
return up->kstarted==0;
}
static int
.
163,167c
sprint(name, "urp%d", up - urp);
kproc(name, urpkproc, up);
.
127a
alarm(500, urptimer, 0);
.
118a
static void urptimer(Alarm*);
.
14c
#define DPRINT /*if(q->flag&QDEBUG)kprint*/
.
## diffname gnot/sturp.c 1990/0731
## diff -e /n/bootesdump/1990/0728/sys/src/9/68020/sturp.c /n/bootesdump/1990/0731/sys/src/9/68020/sturp.c
991c
if((up->unacked!=up->next || (up->state&INITING)) && NOW>up->timer){
.
## diffname gnot/sturp.c 1990/0804
## diff -e /n/bootesdump/1990/0731/sys/src/9/68020/sturp.c /n/bootesdump/1990/0804/sys/src/9/68020/sturp.c
991,995c
if(up->rq && todo(up))
wakeup(&up->rq->r);
.
973d
946c
return (up->state&INITING)
? NOW>up->timer /* time to INIT1 */
: ((up->unacked!=up->next && NOW>up->timer) /* time to ENQ */
|| (!QFULL(up->rq->next) && up->iseq!=(up->lastecho&7))); /* time to ECHO */
.
## diffname gnot/sturp.c 1990/0809
## diff -e /n/bootesdump/1990/0804/sys/src/9/68020/sturp.c /n/bootesdump/1990/0809/sys/src/9/68020/sturp.c
949a
|| WINDOW(up)>0 && up->next!=up->nxb
.
14c
#define DPRINT if(q->flag&QDEBUG)kprint
.
## diffname gnot/sturp.c 1990/0814
## diff -e /n/bootesdump/1990/0809/sys/src/9/68020/sturp.c /n/bootesdump/1990/0814/sys/src/9/68020/sturp.c
777d
774,775c
if(bp == 0)
.
## diffname gnot/sturp.c 1990/0911
## diff -e /n/bootesdump/1990/0814/sys/src/9/68020/sturp.c /n/bootesdump/1990/0911/sys/src/9/68020/sturp.c
127d
124c
static void
.
122c
Qinfo urpinfo = { urpciput, urpoput, urpopen, urpclose, "urp", urpreset };
.
103a
static void urpreset(void);
.
## diffname gnot/sturp.c 1990/0925
## diff -e /n/bootesdump/1990/0911/sys/src/9/68020/sturp.c /n/bootesdump/1990/0925/sys/src/9/68020/sturp.c
969c
sendack(up, ECHO);
.
947a
|| up->tofree != up->unacked
.
915a
qunlock(&up->xmit);
.
908d
904d
893a
up->tofree = 1;
.
881a
qlock(&up->xmit);
.
819,826d
817c
for(; up->unacked != next; up->unacked = NEXT(up->unacked))
.
733,753c
qlock(&up->ack);
if(!QFULL(up->rq->next) && (up->lastecho&Nmask)!=up->iseq)
sendctl(up, up->lastecho = type|up->iseq);
.
729c
sendack(Urp *up, int type)
.
718c
if(!QFULL(up->rq->next) && (up->lastecho&~Nmask)==ECHO){
.
715a
.
683d
674,681c
sendblock(up, up->next);
.
667a
* free any blocks that can be freed
*/
while(up->tofree != up->unacked){
if(up->xb[up->tofree] == 0)
urpvomit("output2", up);
else
freeb(up->xb[up->tofree]);
up->xb[up->tofree] = 0;
up->tofree = NEXT(up->tofree);
}
/*
.
602,603c
qlock(&up->xmit);
.
520,522c
sendack(up, ECHO);
.
518d
348c
sendack(up, ECHO);
.
343,346d
158c
unlock(&urpalloc);
.
150a
unlock(&urpalloc);
.
146,147d
142,143c
lock(&urpalloc);
for(up = urp; up < &urp[conf.nurp]; up++)
.
115c
static void sendack(Urp*, int);
.
99a
Lock urpalloc;
.
59d
54a
int tofree; /* first block to be freed (unacked is first not to) */
.
36d
## diffname gnot/sturp.c 1990/0928
## diff -e /n/bootesdump/1990/0925/sys/src/9/68020/sturp.c /n/bootesdump/1990/0928/sys/src/9/68020/sturp.c
941c
sendack(up);
.
919d
886d
878a
qunlock(&up->xl[i]);
.
875a
qlock(&up->xl[i]);
.
865d
852d
796a
qlock(&up->xl[i]);
if(up->xb[i])
freeb(up->xb[i]);
else
urpvomit("rcvack", up);
up->xb[i] = 0;
qunlock(&up->xl[i]);
}
.
795c
for(; up->unacked != next; up->unacked = NEXT(up->unacked)){
.
729,731c
/*
* check the precondition for acking
*/
if(QFULL(up->rq->next) || (up->lastecho&Nmask)==up->iseq)
return;
if(!canqlock(&up->ack))
return;
/*
* check again now that we've locked
*/
if(QFULL(up->rq->next) || (up->lastecho&Nmask)==up->iseq){
qunlock(&up->ack);
return;
}
/*
* send the ack
*/
sendctl(up, up->lastecho = ECHO|up->iseq);
.
725c
sendack(Urp *up)
.
714c
if((up->lastecho&~Nmask) == ECHO){
.
711d
678a
poperror();
.
677c
i = up->next;
qlock(&up->xl[i]);
if(waserror()){
qunlock(&up->xl[i]);
nexterror();
}
sendblock(up, i);
qunlock(&up->xl[i]);
.
659,670d
594c
if(!canqlock(&up->xmit))
return;
.
514c
if(!QFULL(q->next))
sendctl(up, up->lastecho = ECHO|i);
qunlock(&up->ack);
.
512a
qlock(&up->ack);
.
343c
qunlock(&up->ack);
.
341a
qlock(&up->ack);
i = ctl & Nmask;
if(!QFULL(q->next))
sendctl(up, up->lastecho = ECHO+i);
.
157c
qunlock(up);
.
149d
145a
qunlock(up);
}
.
142,143c
for(up = urp; up < &urp[conf.nurp]; up++){
qlock(up);
.
115c
static void sendack(Urp*);
.
99d
58a
QLock xl[8];
.
54d
35a
QLock;
.
## diffname gnot/sturp.c 1990/0930
## diff -e /n/bootesdump/1990/0928/sys/src/9/68020/sturp.c /n/bootesdump/1990/0930/sys/src/9/68020/sturp.c
947c
: ((up->unechoed!=up->next && NOW>up->timer) /* time to ENQ */
.
656c
} else if(up->unechoed!=up->next && NOW>up->timer){
.
## diffname gnot/sturp.c 1990/1002
## diff -e /n/bootesdump/1990/0930/sys/src/9/68020/sturp.c /n/bootesdump/1990/1002/sys/src/9/68020/sturp.c
948,949c
|| (WINDOW(up)>0 && (up->next!=up->nxb || up->wq->first)) /* open xmit window */
|| (up->iseq!=(up->lastecho&7) && !QFULL(up->rq->next))); /* time to ECHO */
.
797c
PUTNEXT(q, m);
DPRINT("sb %d\n", bn);
.
784c
PUTNEXT(q, m);
.
767c
if(QFULL(q->next))
.
765a
q = up->wq;
.
764a
Queue *q;
.
706c
PUTNEXT(q, bp);
DPRINT("sCTL %ulx\n", ctl);
.
699c
q = up->wq;
if(QFULL(q->next))
.
697a
Queue *q;
.
467a
DPRINT("rACK %ux\n", ctl);
.
## diffname gnot/sturp.c 1990/1004
## diff -e /n/bootesdump/1990/1002/sys/src/9/68020/sturp.c /n/bootesdump/1990/1004/sys/src/9/68020/sturp.c
1000,1001c
if(up->rq && canlock(up)){
if(up->rq && NOW>up->timer
&& ((up->state&INITING) || up->unechoed!=up->next))
wakeup(&up->rq->r);
unlock(up);
}
.
993d
239a
up->rq = 0;
unlock(up);
.
238a
lock(up);
.
158c
unlock(up);
.
146c
unlock(up);
.
143c
lock(up);
.
36c
Lock;
.
14c
#define DPRINT if(q->flag&QDEBUG)print
.
## diffname gnot/sturp.c 1990/11151
## diff -e /n/bootesdump/1990/1004/sys/src/9/68020/sturp.c /n/bootesdump/1990/11151/sys/src/9/68020/sturp.c
123c
Qinfo urpinfo =
{
urpciput,
urpoput,
urpopen,
urpclose,
"urp",
urpreset
};
.
## diffname gnot/sturp.c 1990/11211
## diff -e /n/bootesdump/1990/11151/sys/src/9/68020/sturp.c /n/bootesdump/1990/11211/sys/src/9/68020/sturp.c
159c
error(Egreg);
.
## diffname gnot/sturp.c 1990/1206
## diff -e /n/bootesdump/1990/11211/sys/src/9/68020/sturp.c /n/bootesdump/1990/1206/sys/src/9/68020/sturp.c
1015a
tsleep(&urpkr, return0, 0, 1000);
.
984,1014c
for(up = urp; up < eup; up++){
if(up->state==0 || (up->state&HUNGUP))
break;
if(!canqlock(up))
continue;
if(up->state==0 || (up->state&HUNGUP))
break;
if(up->iseq!=(up->lastecho&7) && !QFULL(up->rq->next))
sendack(up);
output(up);
qunlock(up);
.
977,982c
eup = urp + conf.nurp;
.
974,975c
if(waserror())
;
.
972c
Urp *up, *eup;
.
953,968d
876c
wakeup(&urpkr);
.
443c
wakeup(&urpkr);
.
322c
wakeup(&urpkr);
.
263d
249,250c
qunlock(up);
.
241,247c
qlock(up);
.
220d
215,218d
181,188d
169,174d
166c
qunlock(up);
.
154c
qunlock(up);
.
151c
qlock(up);
.
146a
if(!urpkstarted){
qlock(&urpkl);
if(!urpkstarted){
urpkstarted = 1;
kproc("urpkproc", urpkproc, 0);
}
qunlock(&urpkl);
}
.
137d
120d
99,100d
64a
Urp *urp;
Rendez urpkr;
QLock urpkl;
int urpkstarted;
.
62,63d
38c
Rendez r; /* process waiting for output to finish */
.
36c
QLock;
.
## diffname gnot/sturp.c 1990/1210 # deleted
## diff -e /n/bootesdump/1990/1206/sys/src/9/68020/sturp.c /n/bootesdump/1990/1210/sys/src/9/68020/sturp.c
1,987d
|