## diffname gnot/stream.c 1990/03091
## diff -e /dev/null /n/bootesdump/1990/03091/sys/src/9/68020/stream.c
0a
#include "u.h"
#include "lib.h"
#include "mem.h"
#include "dat.h"
#include "fns.h"
#include "io.h"
#include "errno.h"
#include "devtab.h"
/* put routine for the read side of the process-end queue */
static void stputq(Queue*, Block*);
/*
 * Queue info used for the process end of every stream (streamnew hands
 * it to allocq).  NOTE(review): the initializer order looks like
 * input put, output put, open, close, name -- confirm against the
 * Qinfo declaration.
 */
Qinfo procinfo = { stputq, nullput, 0, 0, "process" } ;
/*extern Qinfo noetherinfo; */
/* nil-terminated table of pushable line disciplines, searched by qinfofind */
static Qinfo *lds[] = {
/* &noetherinfo, */
0
};
enum {
Nclass=4,	/* number of block size classes; dimension of bclass[] */
};
/*
* All stream structures are ialloc'd at boot time
*/
Stream *slist;	/* conf.nstream Stream structures, allocated in streaminit */
Queue *qlist;	/* conf.nqueue Queue structures; allocq hands them out in read/write pairs */
Block *blist;	/* conf.nblock Block headers, divided among the block classes */
static Lock garbagelock;	/* NOTE(review): not referenced by any code visible in this revision */
/*
* The block classes. There are Nclass block sizes, each with its own free list.
* All are ialloced at qinit() time.
*/
typedef struct {
int size;	/* bytes of data area carried by each block of this class */
Queue;	/* unnamed member: allocb/freeb use its first/last free list, its lock and its rendezvous r */
} Bclass;
/* one entry per class, in increasing size; allocb picks the first class whose size is big enough */
Bclass bclass[Nclass]={
{ 0 },	/* no data area */
{ 64 },
{ 512 },
{ 4096 },
};
/*
* Allocate streams, queues, and blocks. Allocate n block classes with
* 1/2(m+1) to class m < n-1
* 1/2(n-1) to class n-1
*/
void
streaminit(void)
{
int class, i, n;
Block *bp;
Bclass *bcp;
slist = (Stream *)ialloc(conf.nstream * sizeof(Stream), 0);
qlist = (Queue *)ialloc(conf.nqueue * sizeof(Queue), 0);
blist = (Block *)ialloc(conf.nblock * sizeof(Block), 0);
bp = blist;
n = conf.nblock;
for(class = 0; class < Nclass; class++){
if(class < Nclass-1)
n = n/2;
bcp = &bclass[class];
for(i = 0; i < n; i++) {
if(bcp->size)
bp->base = (uchar *)ialloc(bcp->size, 0);
bp->lim = bp->base + bcp->size;
bp->flags = class;
freeb(bp);
bp++;
}
}
}
/*
* allocate a block
*/
/*
 *  sleep condition used by allocb: true once the block class passed
 *  in arg has something on its free list
 */
static int
isblock(void *arg)
{
	return ((Bclass *)arg)->first!=0;
}
/*
 *  Return an empty M_DATA block from the smallest class whose data
 *  area is at least `size' bytes.  Requests larger than the biggest
 *  class get a block of the biggest class, so callers must check
 *  lim - wptr.  Sleeps until a block of the chosen class is freed.
 */
Block *
allocb(ulong size)
{
	Bclass *cls;
	Block *b;

	/*
	 *  map size to class
	 */
	cls = bclass;
	while(cls->size<size && cls<&bclass[Nclass-1])
		cls++;

	/*
	 *  take the head of the free list, waiting if it is empty
	 */
	lock(cls);
	for(;;){
		if(cls->first != 0)
			break;
		unlock(cls);
		print("waiting for blocks\n");
		sleep(&cls->r, isblock, (void *)cls);
		lock(cls);
	}
	b = cls->first;
	cls->first = b->next;
	if(cls->first == 0)
		cls->last = 0;
	unlock(cls);

	/*
	 *  reset it: no data, not linked, only the class bits left in flags
	 */
	b->rptr = b->wptr = b->base;
	b->next = 0;
	b->type = M_DATA;
	b->flags &= S_CLASS;
	return b;
}
/*
 *  Return a block to the tail of its class free list (the class is
 *  kept in the S_CLASS bits of flags) and wake anyone sleeping in
 *  allocb for that class.  The read and write pointers are zeroed so
 *  that a use after free faults instead of touching stale data.
 */
void
freeb(Block *bp)
{
	Bclass *cls;

	cls = &bclass[bp->flags & S_CLASS];
	bp->wptr = 0;
	bp->rptr = 0;

	lock(cls);
	if(cls->first == 0)
		cls->first = bp;
	else
		cls->last->next = bp;
	cls->last = bp;
	bp->next = 0;
	wakeup(&cls->r);
	unlock(cls);
}
/*
 *  Allocate a read/write queue pair and give both sides the put
 *  routines from qi.  Pairs occupy adjacent slots of qlist, read side
 *  first; a pair is free when the read side's flag is 0, and only the
 *  read side is marked QINUSE.  Returns the read queue.
 */
static Queue *
allocq(Qinfo *qi)
{
	Queue *rq, *wq;

	/*
	 *  look for a free pair; on success the read side is left locked
	 */
	for(rq = qlist; rq < &qlist[conf.nqueue]; rq += 2){
		if(rq->flag != 0)
			continue;
		if(!canlock(rq))
			continue;
		if(rq->flag == 0)
			break;
		unlock(rq);
	}
	if(rq == &qlist[conf.nqueue]){
		print("no more queues\n");
		error(0, Enoqueue);
	}

	/* read side */
	rq->flag = QINUSE;
	rq->r.p = 0;
	rq->info = qi;
	rq->put = qi->iput;

	/* write side lives in the next slot; tie the two together */
	wq = rq + 1;
	rq->other = wq;
	wq->r.p = 0;
	wq->info = qi;
	wq->put = qi->oput;
	wq->other = rq;

	unlock(rq);
	return rq;
}
/*
 *  Release a queue pair: free every block queued on the read side and
 *  then the write side, and clear the read side's flag so that allocq
 *  can reuse the pair.  q may be either side.
 */
static void
freeq(Queue *q)
{
	Block *b;

	q = RD(q);
	for(b = getq(q); b; b = getq(q))
		freeb(b);

	q = WR(q);
	for(b = getq(q); b; b = getq(q))
		freeb(b);

	RD(q)->flag = 0;
}
/*
 *  Push a new queue pair described by qi onto stream s, directly below
 *  the process-end queues.  The read and write chains are both
 *  relinked, then qi's open routine (if any) is called with the new
 *  read queue.  Returns the write queue that now follows the new one.
 */
Queue *
pushq(Stream* s, Qinfo *qi)
{
	Queue *prd, *pwr;	/* process end: read and write sides */
	Queue *nrd, *nwr;	/* the new pair */
	Queue *below;		/* write queue currently under the process end */
	Queue *nq;

	prd = RD(s->procq);

	/*
	 *  make the new queue
	 */
	nq = allocq(qi);
	nrd = RD(nq);
	nwr = WR(nq);

	/*
	 *  splice it in: upstream (read) links first, then downstream
	 */
	pwr = WR(prd);
	below = pwr->next;
	nrd->next = prd;
	RD(below)->next = nrd;
	nwr->next = below;
	pwr->next = nwr;

	if(qi->open)
		(*qi->open)(nrd, s);
	return nwr->next;
}
/*
 *  Pop the line discipline nearest the process end of stream s.
 *  It is an error (Ebadld) if the process end is connected directly
 *  to the device.  The module's close routine, if any, is called
 *  before the pair is unlinked and freed.
 */
static void
popq(Stream *s)
{
	Queue *top;

	if(s->procq->next == WR(s->devq))
		error(0, Ebadld);

	top = s->procq->next;
	if(top->info->close)
		(*top->info->close)(RD(top));

	/* unlink: write chain first, then the read chain */
	s->procq->next = top->next;
	RD(top->next)->next = RD(s->procq);
	freeq(top);
}
/*
 *  Append a block, or a chain of blocks linked through next, to the
 *  tail of queue q under q's lock.  The queue length grows by the data
 *  in every block and QHIWAT is set once it reaches Streamhi.
 *  Returns non-zero if any of the blocks carried S_DELIM.
 */
int
putq(Queue *q, Block *bp)
{
	int delim;
	Block *b;

	lock(q);
	if(q->first == 0)
		q->first = bp;
	else
		q->last->next = bp;

	delim = 0;
	for(b = bp; ; b = b->next){
		q->len += b->wptr - b->rptr;
		delim |= b->flags & S_DELIM;
		if(b->next == 0)
			break;
	}
	q->last = b;

	if(q->len >= Streamhi)
		q->flag |= QHIWAT;
	unlock(q);
	return delim;
}
/*
 *  Append a block or chain of blocks to a plain block list.  Unlike
 *  putq there is no locking and no high-water accounting.  Returns
 *  non-zero if any of the blocks carried S_DELIM.
 */
int
putb(Blist *q, Block *bp)
{
	int delim;
	Block *b;

	if(q->first == 0)
		q->first = bp;
	else
		q->last->next = bp;

	delim = 0;
	for(b = bp; ; b = b->next){
		q->len += b->wptr - b->rptr;
		delim |= b->flags & S_DELIM;
		if(b->next == 0)
			break;
	}
	q->last = b;
	b->next = 0;
	return delim;
}
/*
 *  Put a single block back at the head of a queue, under the queue's
 *  lock, adding its data to the queue length.  When the queue is
 *  empty bp->next is left as the caller set it.
 */
static void
putbq(Blist *q, Block *bp)
{
	lock(q);
	if(q->first == 0)
		q->last = bp;
	else
		bp->next = q->first;
	q->first = bp;
	q->len += bp->wptr - bp->rptr;
	unlock(q);
}
/*
 *  Remove and return the first block of queue q, or 0 if it is empty.
 *  If the queue was marked QHIWAT and has drained below Streamhi/2,
 *  the mark is cleared and the rendezvous reached through
 *  q->other->next->other is woken (see flowctl for the sleeper).
 */
Block *
getq(Queue *q)
{
	Block *head;

	lock(q);
	head = q->first;
	if(head == 0){
		unlock(q);
		return head;
	}

	q->first = head->next;
	if(q->first == 0)
		q->last = 0;
	q->len -= head->wptr - head->rptr;
	if((q->flag&QHIWAT) && q->len < Streamhi/2){
		wakeup(&q->other->next->other->r);
		q->flag &= ~QHIWAT;
	}
	head->next = 0;

	unlock(q);
	return head;
}
/*
 *  Remove and return the first block of a plain block list, or 0 if
 *  the list is empty.  No locking and no flow-control wakeups.
 */
Block *
getb(Blist *q)
{
	Block *head;

	head = q->first;
	if(head == 0)
		return head;

	q->first = head->next;
	if(q->first == 0)
		q->last = 0;
	q->len -= head->wptr - head->rptr;
	head->next = 0;
	return head;
}
/*
 *  Put routine for a queue that goes nowhere: the block is freed and
 *  the writer gets Ehungup.  The queue argument is not used.
 */
void
nullput(Queue *q, Block *bp)
{
	freeb(bp);
	error(0, Ehungup);
}
/*
 *  Find the info structure for the line discipline called `name' in
 *  the nil-terminated lds table.  Raises Ebadld if name is nil or no
 *  discipline of that name is registered.
 */
static Qinfo *
qinfofind(char *name)
{
	Qinfo **qip;

	if(name == 0)
		error(0, Ebadld);
	for(qip = lds; *qip; qip++)
		if(strcmp((*qip)->name, name)==0)
			return *qip;
	error(0, Ebadld);
	/*
	 *  presumably not reached (error() is used throughout this file
	 *  as if it never returns), but a function returning a pointer
	 *  must not run off its end.
	 */
	return 0;
}
/*
 *  Send a hangup up stream s: an empty M_HANGUP block is handed to
 *  the put routine of the device-end queue.
 */
static void
hangup(Stream *s)
{
	Block *msg;

	msg = allocb(0);
	msg->type = M_HANGUP;
	(*s->devq->put)(s->devq, msg);
}
/*
 *  Test whether the text in bp begins with the word `name'.
 *
 *  Returns 1 on a match, after advancing bp->rptr past the word and
 *  past one following blank if there is one, so that it points at the
 *  second element (or at the terminating null).  Returns 0, leaving
 *  bp untouched, when the block is shorter than name, the text
 *  differs, or name is followed by anything other than a blank or a
 *  null.
 *
 *  The block data is assumed to be null terminated; streamctlwrite
 *  stores a null after the text it copies in.
 */
int
streamparse(char *name, Block *bp)
{
	int len;

	len = strlen(name);
	if(bp->wptr - bp->rptr < len)
		return 0;
	if(strncmp(name, (char *)bp->rptr, len) != 0)
		return 0;

	switch(bp->rptr[len]){
	case ' ':
		bp->rptr += len+1;
		return 1;
	case 0:
		bp->rptr += len;
		return 1;
	}
	return 0;
}
/*
* the per stream directory structure
*/
/* streamgen reads the name, qid, length and perm of each entry; presumably that is also the initializer order */
Dirtab streamdir[]={
"data", Sdataqid, 0, 0600,	/* the stream's data file */
"ctl", Sctlqid, 0, 0600,	/* control file: reads return the stream id, writes go to streamctlwrite */
};
/*
 *  Directory generator for a stream device.  The directory is the
 *  device's own table `tab' (ntab entries) followed by streamdir.
 *
 *  values of s:
 *	0 to ntab-1 select an entry of tab;
 *	ntab to ntab+Shighqid-Slowqid select an entry of streamdir.
 *
 *  Fills in *dp and returns 1, or returns -1 when s is past the end.
 *  The qid handed to devdir combines the stream id from c->qid with
 *  the entry's own qid.
 */
int
streamgen(Chan *c, Dirtab *tab, int ntab, int s, Dir *dp)
{
	Dirtab *ent;

	if(s < ntab)
		ent = &tab[s];
	else if(s < ntab + Shighqid - Slowqid + 1)
		ent = &streamdir[s - ntab];
	else
		return -1;

	devdir(c, STREAMQID(STREAMID(c->qid),ent->qid), ent->name, ent->length,
		ent->perm, dp);
	return 1;
}
/*
 *  Create a new stream whose device end is described by qi and, when
 *  c is non-nil, attach it to channel c.  A nil c yields a stream of
 *  type -1 that belongs to no channel, so every use of c below is
 *  guarded.  Raises Enostream if no Stream structure is free.
 *  Returns the new stream.
 */
Stream *
streamnew(Chan *c, Qinfo *qi)
{
	Stream *s;
	Queue *q;

	/*
	 *  find a free stream struct; the one chosen is left locked
	 */
	for(s = slist; s < &slist[conf.nstream]; s++) {
		if(s->inuse == 0){
			if(canlock(s)){
				if(s->inuse == 0)
					break;
				unlock(s);
			}
		}
	}
	if(s == &slist[conf.nstream]){
		print("no more streams\n");
		error(0, Enostream);
	}

	/*
	 *  on any error below give up the lock and, if there is a
	 *  channel, let streamclose deal with it
	 */
	if(waserror()){
		unlock(s);
		if(c)
			streamclose(c);
		nexterror();
	}

	/*
	 *  marry a stream and a channel
	 */
	if(c){
		c->stream = s;
		s->type = c->type;
		s->dev = c->dev;
		s->id = STREAMID(c->qid);
	} else
		s->type = -1;

	/*
	 *  hang a device and process q off the stream
	 */
	s->inuse = 1;
	s->tag[0] = 0;
	q = allocq(&procinfo);
	s->procq = WR(q);
	q = allocq(qi);
	s->devq = RD(q);

	/* write side runs process -> device, read side device -> process */
	WR(s->procq)->next = WR(s->devq);
	RD(s->procq)->next = 0;
	RD(s->devq)->next = RD(s->procq);
	WR(s->devq)->next = 0;

	if(qi->open)
		(*qi->open)(RD(s->devq), s);

	if(c)
		c->flag |= COPEN;
	unlock(s);
	poperror();
	return s;
}
/*
 *  (Re)open the stream named by channel c.  If a stream in use already
 *  has c's type, dev and stream id, its reference count goes up and c
 *  is pointed at it; otherwise a new stream is created with streamnew.
 *  The match is tested once without the lock as a cheap filter and
 *  again with the stream locked before it is trusted.
 */
void
streamopen(Chan *c, Qinfo *qi)
{
	Stream *s;

	for(s = slist; s < &slist[conf.nstream]; s++) {
		if(!s->inuse || s->type != c->type || s->dev != c->dev
		|| s->id != STREAMID(c->qid))
			continue;

		lock(s);
		if(s->inuse && s->type == c->type
		&& s->dev == c->dev
		&& s->id == STREAMID(c->qid)){
			s->inuse++;
			c->stream = s;
			unlock(s);
			return;
		}
		unlock(s);
	}

	/*
	 *  no such stream yet, create a new one
	 */
	streamnew(c, qi);
}
/*
 *  Close channel c's reference to its stream.  Nothing happens unless
 *  c is marked COPEN.  If other references remain the count is just
 *  decremented.  On the last close every queue's close routine is
 *  called walking down from the process end, then the queue pairs are
 *  freed walking up from the device end, and the stream is released.
 */
void
streamclose(Chan *c)
{
	Stream *s;
	Queue *q, *up;

	s = c->stream;

	/*
	 *  if not open, ignore it
	 */
	if((c->flag & COPEN) == 0)
		return;

	/*
	 *  decrement the reference count
	 */
	lock(s);
	if(s->inuse != 1){
		s->inuse--;
		unlock(c->stream);
		return;
	}

	/*
	 *  descend the stream closing the queues; stop after the
	 *  write side of the device end
	 */
	q = s->procq;
	while(q){
		if(q->info->close)
			(*q->info->close)(q->other);
		if(q == s->devq->other)
			break;
		q = q->next;
	}

	/*
	 *  ascend the stream freeing the queues
	 */
	q = s->devq;
	while(q){
		up = q->next;
		freeq(q);
		q = up;
	}

	s->id = s->dev = s->type = 0;
	s->inuse--;
	unlock(s);
}
/*
 *  Put routine for the read side of the process end.  An M_HANGUP
 *  block is freed and both sides of the pair are marked QHUNGUP.
 *  Anything else is appended to the queue for a reader to collect.
 *  bp may be a chain of blocks linked through next: as in putq, the
 *  whole chain is counted in q->len and q->last is left on the final
 *  block.  Any reader sleeping on the queue is woken in either case.
 */
void
stputq(Queue *q, Block *bp)
{
	if(bp->type == M_HANGUP){
		freeb(bp);
		q->flag |= QHUNGUP;
		q->other->flag |= QHUNGUP;
	} else {
		lock(q);
		if(q->first)
			q->last->next = bp;
		else
			q->first = bp;
		q->len += bp->wptr - bp->rptr;
		/* walk to the end of the chain so last and len stay right */
		while(bp->next){
			bp = bp->next;
			q->len += bp->wptr - bp->rptr;
		}
		q->last = bp;
		if(q->len >= Streamhi)
			q->flag |= QHIWAT;
		unlock(q);
	}
	wakeup(&q->r);
}
/*
 *  Read from the string str as if it were the contents of a file:
 *  copy up to n bytes, starting c->offset bytes into str, to the
 *  start of buf.  Advances c->offset by the number of bytes copied
 *  and returns that number; returns 0 once the offset is at or past
 *  the end of the string.
 */
long
stringread(Chan *c, uchar *buf, long n, char *str)
{
	long i;

	/* i = bytes of str left beyond the current offset */
	i = strlen(str);
	i -= c->offset;
	if(i<n)
		n = i;
	if(n<0)
		return 0;

	/* the offset applies to the string being read, not to the caller's buffer */
	memcpy(buf, str + c->offset, n);
	c->offset += n;
	return n;
}
/*
* return true if there is an output buffer available
*/
static int
isinput(void *x)
{
return ((Queue *)x)->first != 0;
}
/*
* read until we fill the buffer or until a DELIM is encountered
*/
/*
 *  Read up to n bytes from the stream attached to c into vbuf.
 *  A ctl file read returns the stream id as text and a directory read
 *  goes to devdirread; only the data file reaches the queue code.
 *  Data reads are serialized by s->rdlock and return the number of
 *  bytes copied, stopping early after a block marked S_DELIM or when
 *  the queue is empty and hung up.
 */
long
streamread(Chan *c, void *vbuf, long n)
{
Block *bp;
Stream *s;
Queue *q;
long rv = 0;	/* never used below */
int left, i, x;	/* left: bytes still wanted; i: data in the current block; x is never used */
uchar *buf = vbuf;
char num[32];
s = c->stream;
switch(STREAMTYPE(c->qid)){
case Sdataqid:
break;
case Sctlqid:
/* the ctl file reads back the stream id in decimal */
sprint(num, "%d", s->id);
return stringread(c, buf, n, num);
default:
if(CHDIR & c->qid)
return devdirread(c, vbuf, n, 0, 0, streamgen);
else
panic("streamread");
}
/*
* one reader at a time
*/
qlock(&s->rdlock);
/* drop the read lock before passing on any error raised below */
if(waserror()){
qunlock(&s->rdlock);
nexterror();
}
/*
* sleep till data is available
*/
q = RD(s->procq);
left = n;
while(left){
bp = getq(q);
if(bp == 0){
/* nothing queued: give up if hung up, otherwise wait for stputq's wakeup */
if(q->flag & QHUNGUP)
break;
sleep(&q->r, &isinput, (void *)q);
continue;
}
i = bp->wptr - bp->rptr;
if(i <= left){
/* the whole block fits: copy it out and free it */
memcpy(buf, bp->rptr, i);
left -= i;
buf += i;
if(bp->flags & S_DELIM){
/* a delimiter ends the read even if the buffer is not full */
freeb(bp);
break;
} else
freeb(bp);
} else {
/* block is bigger than what is wanted: take part, put the rest back at the head */
memcpy(buf, bp->rptr, left);
bp->rptr += left;
putbq(q, bp);
left = 0;
}
};
qunlock(&s->rdlock);
poperror();
return n - left;
}
/*
 *  Handle a ctl request.  Streamwide requests are:
 *
 *	hangup		-- send an M_HANGUP up the stream
 *	push ldname	-- push the line discipline named ldname
 *	pop		-- pop a line discipline
 *
 *  Anything else is passed down the stream as a delimited M_CTL block.
 *
 *  This routine is entered with s->wrlock held; the caller
 *  (streamwrite) releases it.  Returns n.
 */
static long
streamctlwrite(Stream *s, void *a, long n)
{
	Qinfo *qi;
	Block *bp;

	/*
	 *  package the request as a null terminated string.  allocb
	 *  hands back a block of the largest class when asked for more
	 *  than any class holds, so make sure the request fits before
	 *  copying it in.
	 */
	bp = allocb(n+1);
	if(bp->lim - bp->wptr < n+1){
		freeb(bp);
		error(0, Ebadld);
	}
	memcpy(bp->wptr, a, n);
	bp->wptr[n] = 0;
	bp->wptr += n + 1;

	/*
	 *  check for standard requests
	 */
	if(streamparse("hangup", bp)){
		hangup(s);
		freeb(bp);
	} else if(streamparse("push", bp)){
		qi = qinfofind((char *)bp->rptr);
		pushq(s, qi);
		freeb(bp);
	} else if(streamparse("pop", bp)){
		popq(s);
		freeb(bp);
	} else {
		bp->type = M_CTL;
		bp->flags |= S_DELIM;
		PUTNEXT(s->procq, bp);
	}
	return n;
}
/*
 *  sleep condition used by flowctl: true while the queue passed in
 *  arg holds fewer than Streamhi bytes
 */
static int
notfull(void *arg)
{
	return ((Queue *)arg)->len < Streamhi;
}
/*
 *  Flow control for a writer on q: if the next queue downstream
 *  already holds Streamhi bytes or more, sleep on q's rendezvous
 *  until it has room again.
 */
void
flowctl(Queue *q)
{
	if(q->next->len < Streamhi)
		return;
	sleep(&q->r, notfull, q->next);
}
/*
* send the request as a single delimited block
*/
/*
 *  Write n bytes at a to the stream attached to c.  Writers are
 *  serialized by s->wrlock.  A write to the ctl file is handed to
 *  streamctlwrite.  A data write is passed down from the process end
 *  as one or more M_DATA blocks, the last marked S_DELIM.  Raises
 *  Ehungup if the stream has been hung up.  Returns n.
 */
long
streamwrite(Chan *c, void *a, long n)
{
Stream *s;
Block *bp;
Queue *q;
long rem;	/* bytes still to be sent */
int i;	/* room in the block just allocated */
/*
* one writer at a time
*/
s = c->stream;
qlock(&s->wrlock);
/* drop the write lock before passing on any error raised below */
if(waserror()){
qunlock(&s->wrlock);
nexterror();
}
/*
* decode the qid
*/
switch(STREAMTYPE(c->qid)){
case Sdataqid:
break;
case Sctlqid:
n = streamctlwrite(s, a, n);
qunlock(&s->wrlock);
poperror();
return n;
default:
panic("bad stream qid\n");
}
/*
* No writes allowed on hungup channels
*/
q = s->procq;
if(q->other->flag & QHUNGUP)
error(0, Ehungup);
if(GLOBAL(a) || n==0){
/*
* `a' is global to the whole system, just create a
* pointer to it and pass it on.
*/
flowctl(q);
/* a size 0 request; its pointers are aimed straight at the caller's data */
bp = allocb(0);
bp->rptr = bp->base = (uchar *)a;
bp->wptr = bp->lim = (uchar *)a+n;
bp->flags |= S_DELIM;
bp->type = M_DATA;
PUTNEXT(q, bp);
} else {
/*
* `a' is in the user's address space, copy it into
* system buffers and pass the buffers on.
*/
for(rem = n; ; rem -= i) {
flowctl(q);
/* allocb may return less room than asked for, hence the loop */
bp = allocb(rem);
i = bp->lim - bp->wptr;
if(i >= rem){
/* the remainder fits: this is the last, delimited block */
memcpy(bp->wptr, a, rem);
bp->flags |= S_DELIM;
bp->wptr += rem;
bp->type = M_DATA;
PUTNEXT(q, bp);
break;
} else {
/* fill this block completely and go round again for the rest */
memcpy(bp->wptr, a, i);
bp->wptr += i;
bp->type = M_DATA;
PUTNEXT(q, bp);
a = ((char*)a) + i;
}
}
}
qunlock(&s->wrlock);
poperror();
return n;
}
.
## diffname gnot/stream.c 1990/0312
## diff -e /n/bootesdump/1990/03091/sys/src/9/68020/stream.c /n/bootesdump/1990/0312/sys/src/9/68020/stream.c
888a
}
/*
* like andrew's getmfields but no hidden state
*/
int
getfields(char *lp, /* to be parsed */
char **fields, /* where to put pointers */
int n, /* number of pointers */
char sep /* separator */
)
{
int i;
for(i=0; lp && *lp && i<n; i++){
while(*lp == sep)
*lp++=0;
if(*lp == 0)
break;
fields[i]=lp;
while(*lp && *lp != sep)
lp++;
}
return i;
.
849c
if((GLOBAL(a) && !docopy) || n==0){
.
809c
streamwrite(Chan *c, void *a, long n, int docopy)
.
719c
i = BLEN(bp);
.
654,655c
memcpy(buf, str + c->offset, n);
.
632d
630a
q->len += BLEN(bp);
while(bp->next) {
bp = bp->next;
q->len += BLEN(bp);
}
.
624a
wakeup(&q->other->r);
.
418c
if(BLEN(bp) < len)
.
363a
* make sure the first block has n bytes
*/
/*
 *  Rearrange the list headed by bp so that its first block holds at
 *  least n bytes, copying data forward from the blocks behind it and
 *  freeing any block that is emptied.  Returns the head of the list,
 *  which is a freshly allocated block if the old head had no room.
 *  If the whole list holds fewer than n bytes it is freed and 0 is
 *  returned.
 *  NOTE(review): n larger than the biggest block class is not
 *  handled; allocb would return a block that is too small.
 */
Block *
pullup(Block *bp, int n)
{
	Block *nbp;
	int i;

	/*
	 *  this should almost always be true, the rest is
	 *  just to save every caller from checking.
	 */
	if(BLEN(bp) >= n)
		return bp;

	/*
	 *  if not enough room in the first block,
	 *  add another to the front of the list.
	 */
	if(bp->lim - bp->rptr < n){
		nbp = allocb(n);
		nbp->next = bp;
		bp = nbp;
	}

	/*
	 *  copy bytes from the trailing blocks into the first;
	 *  n is the number of bytes still missing
	 */
	n -= BLEN(bp);
	while(nbp = bp->next){
		i = BLEN(nbp);
		if(i > n) {
			/* this block has more than enough: take n and stop */
			memcpy(bp->wptr, nbp->rptr, n);
			bp->wptr += n;
			nbp->rptr += n;
			return bp;
		} else {
			/* swallow the whole block and unlink it */
			memcpy(bp->wptr, nbp->rptr, i);
			bp->wptr += i;
			bp->next = nbp->next;
			nbp->next = 0;
			freeb(nbp);
			n -= i;
			if(n == 0)
				return bp;
		}
	}

	/*
	 *  ran out of data before collecting n bytes
	 */
	freeb(bp);
	return 0;
}
/*
 *  Grow the front of a list of blocks by n bytes and return the new
 *  head.  If the first block has n spare bytes between base and rptr
 *  they are claimed by moving rptr back.  Otherwise a new block is
 *  put in front of the list with its last n bytes marked as data.
 *  The contents of the new bytes are left for the caller to fill in.
 */
Block *
prepend(Block *bp, int n)
{
	Block *nbp;

	if(bp->base && (bp->rptr - bp->base)>=n){
		/*
		 *  room in the first block
		 */
		bp->rptr -= n;
		return bp;
	}

	/*
	 *  make a new block big enough for n bytes and use its tail,
	 *  so that a later prepend can grow it too
	 */
	nbp = allocb(n);
	nbp->next = bp;
	nbp->wptr = nbp->lim;
	nbp->rptr = nbp->wptr - n;
	return nbp;
}
/*
.
357c
q->len -= BLEN(bp);
.
346a
/*
* remove the first block from a list of blocks
*/
.
337c
q->len -= BLEN(bp);
.
324c
* remove the first block from a queue
.
319c
q->len += BLEN(bp);
.
310c
void
.
303d
299c
q->len += BLEN(bp);
.
295c
q->len += BLEN(bp);
.
276c
q->len += BLEN(bp);
.
272c
q->len += BLEN(bp);
.
149a
wakeup(&bcp->r);
.
147,148d
145a
tries = 0;
while(bp->next){
if(++tries > 10){
dumpstack();
panic("freeb");
}
bp = bp->next;
}
.
141a
bp->rptr = bp->wptr = 0;
.
140d
137a
int tries;
.
131,132c
* Free a block (or list of blocks). Poison its pointers so that
* someone trying to access it after freeing will cause a dump.
.
111a
qunlock(bcp);
.
110a
qlock(bcp);
.
37c
Blist;
QLock; /* qlock for sleepers on r */
Rendez r; /* sleep here waiting for blocks */
.
15c
&dkmuxinfo,
&urpinfo,
.
13a
/*
* line disciplines that can be pushed
*
* WARNING: this table should be the result of configuration
*/
extern Qinfo noetherinfo;
extern Qinfo dkmuxinfo;
extern Qinfo urpinfo;
.
11,12c
Qinfo procinfo = { stputq, nullput, 0, 0, "process" };
.
9a
/*
* process end line discipline
*/
.
## diffname gnot/stream.c 1990/0321
## diff -e /n/bootesdump/1990/0312/sys/src/9/68020/stream.c /n/bootesdump/1990/0321/sys/src/9/68020/stream.c
742c
if(delim)
wakeup(&q->r);
.
739a
delim = 1;
}
.
738c
if(q->len >= Streamhi){
.
735a
delim |= bp->flags & S_DELIM;
.
732a
delim = bp->flags & S_DELIM;
.
726a
delim = 0;
.
725a
delim = 1;
.
719c
int delim;
.
172c
if(bcp->r.p)
wakeup(&bcp->r);
.
91d
54,55c
{ 68 },
{ 260 },
.
33a
for(i=0; i<Nlds && lds[i]; i++)
if(lds[i] == qi)
return;
if(i == Nlds)
panic("pushable");
lds[i] = qi;
}
.
30,32c
void
newqinfo(Qinfo *qi)
{
int i;
.
21,28c
static Qinfo *lds[Nlds+1];
.
18,19d
9a
enum {
Nclass=4, /* number of block classes */
Nlds=32, /* max number of pushable line disciplines */
};
.
## diffname gnot/stream.c 1990/0322
## diff -e /n/bootesdump/1990/0321/sys/src/9/68020/stream.c /n/bootesdump/1990/0322/sys/src/9/68020/stream.c
129c
tsleep(&bcp->r, isblock, (void *)bcp, 250);
.
127c
if(loop++ > 10)
panic("waiting for blocks\n");
.
114d
112c
int loop=0;
.
## diffname gnot/stream.c 1990/03292
## diff -e /n/bootesdump/1990/0322/sys/src/9/68020/stream.c /n/bootesdump/1990/03292/sys/src/9/68020/stream.c
684c
if(!c->stream)
.
630d
## diffname gnot/stream.c 1990/0331
## diff -e /n/bootesdump/1990/03292/sys/src/9/68020/stream.c /n/bootesdump/1990/0331/sys/src/9/68020/stream.c
828,829c
if(q->flag & QHUNGUP){
if(s->hread++ < 3)
break;
else
error(0, Ehungup);
}
.
616a
s->hread = 0;
.
127a
}
.
126c
if(loop++ > 10){
dumpqueues();
dumpstack();
.
64a
* Dump all block information of how many blocks are in which queues
*/
/*
 *  Debugging aid: for every queue pair in use print the module name,
 *  the address of the read queue, and the number of blocks and the
 *  recorded length of the read and write sides.
 */
void
dumpqueues(void)
{
	Queue *rq;
	Block *b;
	int nblk;

	for(rq = qlist; rq < qlist + conf.nqueue; rq += 2){
		if((rq->flag & QINUSE) == 0)
			continue;

		nblk = 0;
		for(b = rq->first; b; b = b->next)
			nblk++;
		print("%s %ux RD count %d len %d", rq->info->name, rq, nblk, rq->len);

		nblk = 0;
		for(b = WR(rq)->first; b; b = b->next)
			nblk++;
		print(" WR count %d len %d\n", nblk, WR(rq)->len);
	}
}
/*
.
61c
{ 1024 },
.
## diffname gnot/stream.c 1990/0403
## diff -e /n/bootesdump/1990/0331/sys/src/9/68020/stream.c /n/bootesdump/1990/0403/sys/src/9/68020/stream.c
1012c
FLOWCTL(q);
.
999c
FLOWCTL(q);
.
946,947c
sleep(&q->r, notfull, q->next);
.
941c
return !QFULL(q->next);
.
772c
if(q->len >= Streamhi || q->nb >= Streambhi){
.
768a
q->nb++;
.
764a
q->nb++;
.
390c
q->nb--;
if((q->flag&QHIWAT) && q->len < Streamhi/2 && q->nb < Streambhi){
.
333c
if(q->len >= Streamhi || q->nb >= Streambhi)
.
329a
q->nb++;
.
325a
q->nb++;
.
238a
wq->len = wq->nb = 0;
.
232a
q->len = q->nb = 0;
.
150,151c
print("waiting for blocks\n");
.
148c
if(loop++ == 10){
.
## diffname gnot/stream.c 1990/0406
## diff -e /n/bootesdump/1990/0403/sys/src/9/68020/stream.c /n/bootesdump/1990/0406/sys/src/9/68020/stream.c
944,947c
return !QFULL((Queue *)arg);
.
394c
if((q->flag&QHIWAT) && q->len<Streamhi/2 && q->nb<Streambhi/2){
.
374a
q->nb++;
.
## diffname gnot/stream.c 1990/05313
## diff -e /n/bootesdump/1990/0406/sys/src/9/68020/stream.c /n/bootesdump/1990/05313/sys/src/9/68020/stream.c
182a
if((bp->flags&S_CLASS) >= Nclass)
panic("freeb class");
.
83a
print("\n");
for(bcp=bclass; bcp<&bclass[Nclass]; bcp++){
lock(bcp);
for(count = 0, bp = bcp->first; bp; count++, bp = bp->next)
;
unlock(bcp);
print("%d blocks of size %d\n", count, bcp->size);
}
print("\n");
.
82c
print(" W c %d l %d f %ux\n", count, WR(q)->len, WR(q)->flag);
.
79c
print("%s %ux R c %d l %d f %ux", q->info->name, q, count,
q->len, q->flag);
.
73a
print("\n");
.
72a
Bclass *bcp;
.
61c
{ 4096 },
.
## diffname gnot/stream.c 1990/0620
## diff -e /n/bootesdump/1990/05313/sys/src/9/68020/stream.c /n/bootesdump/1990/0620/sys/src/9/68020/stream.c
757,758c
/*
* leave it and free it
*/
streamexit(s, 1);
.
755c
flushq(q);
.
751c
* ascend the stream flushing the queues
.
749a
.
735,736c
if(s->opens != 1){
s->opens--;
.
732c
* decrement the reference count
.
714a
* Enter a stream. Increment the reference count so it can't disappear
* under foot.
*/
/*
 *  Take a reference on stream s so that it cannot be freed while in
 *  use.  Returns 0 on success, or -1 without taking the reference if
 *  the stream has no opens.
 */
int
streamenter(Stream *s)
{
	int rv;

	lock(s);
	if(s->opens != 0){
		s->inuse++;
		rv = 0;
	} else
		rv = -1;
	unlock(s);
	return rv;
}
/*
 *  Drop a reference on stream s.  When the count reaches zero the
 *  queue pairs are freed, walking up from the device end, and the
 *  stream's identity is cleared.  If `locked' is non-zero the caller
 *  already holds the stream lock and it is left held.
 */
void
streamexit(Stream *s, int locked)
{
	Queue *q, *up;

	if(!locked)
		lock(s);

	s->inuse--;
	if(s->inuse == 0){
		/*
		 *  ascend the stream freeing the queues
		 */
		q = s->devq;
		while(q){
			up = q->next;
			freeq(q);
			q = up;
		}
		s->id = s->dev = s->type = 0;
	}

	if(!locked)
		unlock(s);
}
/*
.
699a
s->opens++;
.
662d
660a
s->opens = 1;
.
260a
* flush a queue
*/
/*
 *  Throw away every block queued on either side of a queue pair.
 *  The pair itself stays allocated.  q may be either side.
 */
static void
flushq(Queue *q)
{
	Block *b;

	q = RD(q);
	for(b = getq(q); b; b = getq(q))
		freeb(b);

	q = WR(q);
	for(b = getq(q); b; b = getq(q))
		freeb(b);
}
/*
.
## diffname gnot/stream.c 1990/0621
## diff -e /n/bootesdump/1990/0620/sys/src/9/68020/stream.c /n/bootesdump/1990/0621/sys/src/9/68020/stream.c
805,822c
s->opens--;
.
800,803c
if(s->opens == 1){
/*
* descend the stream closing the queues
*/
for(q = s->procq; q; q = q->next){
if(q->info->close)
(*q->info->close)(q->other);
/* this may be 2 streams joined device end to device end */
if(q == s->devq->other)
break;
}
/*
* ascend the stream flushing the queues
*/
for(q = s->devq; q; q = nq){
nq = q->next;
flushq(q);
}
.
766,774c
s->inuse--;
.
760,764c
if(s->inuse == 1){
/*
* ascend the stream freeing the queues
*/
for(q = s->devq; q; q = nq){
nq = q->next;
freeq(q);
}
s->id = s->dev = s->type = 0;
.
534,535c
if(bp->type == M_HANGUP)
freeb(bp);
else {
freeb(bp);
error(0, Ehungup);
}
.
## diffname gnot/stream.c 1990/0702
## diff -e /n/bootesdump/1990/0621/sys/src/9/68020/stream.c /n/bootesdump/1990/0702/sys/src/9/68020/stream.c
248a
wq->flag = QINUSE;
.
85c
print(" W c %d l %d f %ux r %ux\n", count, WR(q)->len, WR(q)->flag,
&(WR(q)->r));
.
81,82c
print("%s %ux R c %d l %d f %ux r %ux", q->info->name, q, count,
q->len, q->flag, &(q->r));
.
## diffname gnot/stream.c 1990/0703
## diff -e /n/bootesdump/1990/0702/sys/src/9/68020/stream.c /n/bootesdump/1990/0703/sys/src/9/68020/stream.c
86a
dumpblocks(q, 'R');
dumpblocks(q, 'W');
.
83,85c
print(" W n %d l %d f %ux r %ux\n", WR(q)->nb, WR(q)->len, WR(q)->flag,
.
79,81c
print("%s %ux R n %d l %d f %ux r %ux", q->info->name, q, q->nb,
.
67a
dumpblocks(Queue *q, char c)
{
Block *bp;
uchar *cp;
lock(q);
for(bp = q->first; bp; bp = bp->next){
print("%c%d%c", c, bp->wptr-bp->rptr, (bp->flags&S_DELIM));
for(cp = bp->rptr; cp<bp->wptr && cp<bp->rptr+10; cp++)
print(" %uo", *cp);
print("\n");
}
unlock(q);
}
void
.
## diffname gnot/stream.c 1990/0704
## diff -e /n/bootesdump/1990/0703/sys/src/9/68020/stream.c /n/bootesdump/1990/0704/sys/src/9/68020/stream.c
100c
dumpblocks(WR(q), 'W');
.
## diffname gnot/stream.c 1990/0801
## diff -e /n/bootesdump/1990/0704/sys/src/9/68020/stream.c /n/bootesdump/1990/0801/sys/src/9/68020/stream.c
1157a
}
/*
* stat a stream. the length is the number of bytes up to the
* first delimiter.
*/
void
streamstat(Chan *c, char *db, char *name)
{
Dir dir;
Stream *s;
Queue *q;
Block *bp;
long n;
s = c->stream;
if(s == 0)
panic("streamstat");
q = RD(s->procq);
lock(q);
for(n=0, bp=q->first; bp; bp = bp->next){
n += BLEN(bp);
if(bp->flags&S_DELIM)
break;
}
unlock(q);
devdir(c, c->qid, name, n, 0, &dir);
convD2M(&dir, db);
.
8a
#include "fcall.h"
.
## diffname gnot/stream.c 1990/08272
## diff -e /n/bootesdump/1990/0801/sys/src/9/68020/stream.c /n/bootesdump/1990/08272/sys/src/9/68020/stream.c
988a
if(left<0 || left>n){
print("streamread returns %d for a %d read\n", n-left, n);
panic("streamread");
}
.
## diffname gnot/stream.c 1990/0905
## diff -e /n/bootesdump/1990/08272/sys/src/9/68020/stream.c /n/bootesdump/1990/0905/sys/src/9/68020/stream.c
234a
* pad a block to the front with n bytes
*/
/*
 *  Make room for n bytes in front of the data in bp and return the
 *  head of the resulting list.  If the block has n spare bytes
 *  between base and rptr, rptr is simply moved back.  Otherwise a new
 *  block whose last n bytes are marked as data is linked in front of
 *  bp.
 */
Block *
padb(Block *bp, int n)
{
	Block *nbp;

	if(bp->base && bp->rptr-bp->base>=n){
		bp->rptr -= n;
		return bp;
	} else {
		nbp = allocb(n);
		nbp->wptr = nbp->lim;
		nbp->rptr = nbp->wptr - n;
		/* chain the original block behind the pad, not the pad to itself */
		nbp->next = bp;
		return nbp;
	}
}
/*
.
## diffname gnot/stream.c 1990/09051
## diff -e /n/bootesdump/1990/0905/sys/src/9/68020/stream.c /n/bootesdump/1990/09051/sys/src/9/68020/stream.c
249c
nbp->next = bp;
.
## diffname gnot/stream.c 1990/0911
## diff -e /n/bootesdump/1990/09051/sys/src/9/68020/stream.c /n/bootesdump/1990/0911/sys/src/9/68020/stream.c
1212a
}
/*
 *  Debugging aid: print one line for each block on queue q, holding
 *  the tag character c, the amount of data in the block, the block's
 *  S_DELIM bit printed with %c, and up to the first 10 data bytes in
 *  octal.  The queue is locked while it is walked.
 */
void
dumpblocks(Queue *q, char c)
{
	Block *b;
	uchar *p, *stop;

	lock(q);
	b = q->first;
	while(b){
		print("%c%d%c", c, b->wptr-b->rptr, (b->flags&S_DELIM));

		/* show at most 10 bytes, fewer if the block is shorter */
		stop = b->rptr+10;
		if(b->wptr < stop)
			stop = b->wptr;
		for(p = b->rptr; p < stop; p++)
			print(" %uo", *p);
		print("\n");

		b = b->next;
	}
	unlock(q);
}
void
dumpqueues(void)
{
Queue *q;
int count;
Block *bp;
Bclass *bcp;
print("\n");
for(q = qlist; q < qlist + conf.nqueue; q++, q++){
if(!(q->flag & QINUSE))
continue;
print("%s %ux R n %d l %d f %ux r %ux", q->info->name, q, q->nb,
q->len, q->flag, &(q->r));
print(" W n %d l %d f %ux r %ux\n", WR(q)->nb, WR(q)->len, WR(q)->flag,
&(WR(q)->r));
dumpblocks(q, 'R');
dumpblocks(WR(q), 'W');
}
print("\n");
for(bcp=bclass; bcp<&bclass[Nclass]; bcp++){
lock(bcp);
for(count = 0, bp = bcp->first; bp; count++, bp = bp->next)
;
unlock(bcp);
print("%d blocks of size %d\n", count, bcp->size);
}
print("\n");
.
1009,1012d
589,591c
for(qi = lds; qi; qi = qi->next)
if(strcmp(qi->name, name)==0)
return qi;
.
585c
Qinfo *qi;
.
176,179d
162d
146a
* make known a stream module and call its initialization routine, if
* it has one.
*/
/*
 *  Register stream module qi by linking it at the head of the lds
 *  list, then call its reset routine if it has one.
 */
void
newqinfo(Qinfo *qi)
{
	/* push onto the front of the list of known modules */
	qi->next = lds;
	lds = qi;

	if(qi->reset != 0)
		(*qi->reset)();
}
/*
.
143a
/*
* make stream modules available
*/
streaminit0();
.
125a
/*
* allocate blocks, queues, and streams
*/
.
74,113d
65,72c
#include "stream.h"
.
27,39d
25c
static Qinfo *lds;
.
13d
## diffname gnot/stream.c 1990/0914
## diff -e /n/bootesdump/1990/0911/sys/src/9/68020/stream.c /n/bootesdump/1990/0914/sys/src/9/68020/stream.c
1160d
1151,1158c
n = 0;
else {
q = RD(s->procq);
lock(q);
for(n=0, bp=q->first; bp; bp = bp->next){
n += BLEN(bp);
if(bp->flags&S_DELIM)
break;
}
unlock(q);
.
1107,1108c
out:
/* qunlock(&s->wrlock);
poperror(); /**/
.
1056,1058c
goto out;
.
1046a
*/
.
1040,1041d
1037a
s = c->stream;
.
## diffname gnot/stream.c 1990/0925
## diff -e /n/bootesdump/1990/0914/sys/src/9/68020/stream.c /n/bootesdump/1990/0925/sys/src/9/68020/stream.c
1107,1108d
1041,1049d
130c
* look for a free block
.
## diffname gnot/stream.c 1990/0928
## diff -e /n/bootesdump/1990/0925/sys/src/9/68020/stream.c /n/bootesdump/1990/0928/sys/src/9/68020/stream.c
1097d
1060c
if(!docopy && GLOBAL(a)){
.
1043,1051c
if(STREAMTYPE(c->qid) != Sdataqid)
return streamctlwrite(c, a, n);
.
981a
if(STREAMTYPE(c->qid) != Sctlqid)
panic("streamctlwrite %lux", c->qid);
s = c->stream;
.
980a
Stream *s;
.
977c
streamctlwrite(Chan *c, void *a, long n)
.
919a
s = c->stream;
.
903,915c
if(STREAMTYPE(c->qid) != Sdataqid)
return streamctlread(c, vbuf, n);
.
901d
898,899c
int left, i;
.
880a
* return the stream id
*/
/*
 *  Read from anything on a stream device other than the data file.
 *  A read of the ctl file returns the stream id as a decimal string;
 *  a read of a directory is generated with streamgen.  Any other qid
 *  is a panic.
 */
long
streamctlread(Chan *c, void *vbuf, long n)
{
	char idstr[32];
	Stream *s;

	s = c->stream;
	if(STREAMTYPE(c->qid) == Sctlqid){
		sprint(idstr, "%d", s->id);
		return stringread(c, vbuf, n, idstr);
	}
	if(CHDIR & c->qid)
		return devdirread(c, vbuf, n, 0, 0, streamgen);
	panic("streamctlread");
}
/*
.
## diffname gnot/stream.c 1990/0930
## diff -e /n/bootesdump/1990/0928/sys/src/9/68020/stream.c /n/bootesdump/1990/0930/sys/src/9/68020/stream.c
908c
Queue *q;
q = (Queue *)x;
return (q->flag&QHUNGUP) || q->first!=0;
.
## diffname gnot/stream.c 1990/1009
## diff -e /n/bootesdump/1990/0930/sys/src/9/68020/stream.c /n/bootesdump/1990/1009/sys/src/9/68020/stream.c
1177c
print("%c%d%c", c, bp->wptr-bp->rptr, (bp->flags&S_DELIM)?'D':' ');
.
819a
/*
 *  Close the stream attached to channel c, if there is one; the work
 *  is done by streamclose1.
 */
void
streamclose(Chan *c)
{
	if(c->stream)
		streamclose1(c->stream);
}
.
783,788d
780d
776c
streamclose1(Stream *s)
.
724c
c->stream = streamnew(c->type, c->dev, STREAMID(c->qid), qi, 0);
.
702c
* if the stream already exists, just increment the reference counts.
.
673c
if(noopen)
s->opens = 0;
else
s->opens = 1;
.
661,667c
s->type = type;
s->dev = dev;
s->id = id;
.
659c
* identify the stream
.
654c
streamclose1(s);
.
631c
streamnew(ushort type, ushort dev, ushort id, Qinfo *qi, int noopen)
.
628c
* create a new stream, if noopen is non-zero, don't increment the open count
.
244a
wq->ptr = 0;
.
237a
q->ptr = 0;
.
## diffname gnot/stream.c 1990/1011
## diff -e /n/bootesdump/1990/1009/sys/src/9/68020/stream.c /n/bootesdump/1990/1011/sys/src/9/68020/stream.c
787,795c
if(!waserror()){
/*
* descend the stream closing the queues
*/
for(q = s->procq; q; q = q->next){
if(q->info->close)
(*q->info->close)(q->other);
/*
* this may be 2 streams joined device end to device end
*/
if(q == s->devq->other)
break;
}
poperror();
.
## diffname gnot/stream.c 1990/1018
## diff -e /n/bootesdump/1990/1011/sys/src/9/68020/stream.c /n/bootesdump/1990/1018/sys/src/9/68020/stream.c
1050a
qunlock(&q->rlock);
.
1049a
qlock(&q->rlock);
.
## diffname gnot/stream.c 1990/1024
## diff -e /n/bootesdump/1990/1018/sys/src/9/68020/stream.c /n/bootesdump/1990/1024/sys/src/9/68020/stream.c
793a
RD(q)->put = nullput;
WR(q)->put = nullput;
.
769a
return rv;
.
767a
rv = s->inuse;
.
753a
int rv;
.
749c
int
.
## diffname gnot/stream.c 1990/1101
## diff -e /n/bootesdump/1990/1024/sys/src/9/68020/stream.c /n/bootesdump/1990/1101/sys/src/9/68020/stream.c
797d
## diffname gnot/stream.c 1990/1104
## diff -e /n/bootesdump/1990/1101/sys/src/9/68020/stream.c /n/bootesdump/1990/1104/sys/src/9/68020/stream.c
758a
if(s->opens != 0)
print("streamexit %d %s\n", s->opens, s->devq->info->name);
.
754a
char *name;
.
## diffname gnot/stream.c 1990/11062
## diff -e /n/bootesdump/1990/1104/sys/src/9/68020/stream.c /n/bootesdump/1990/11062/sys/src/9/68020/stream.c
135a
if(waserror()){
qunlock(bcp);
nexterror();
}
.
## diffname gnot/stream.c 1990/1113
## diff -e /n/bootesdump/1990/11062/sys/src/9/68020/stream.c /n/bootesdump/1990/1113/sys/src/9/68020/stream.c
1065a
poperror();
.
1063a
if(waserror()){
qunlock(&q->rlock);
nexterror();
}
.
141a
poperror();
.
## diffname gnot/stream.c 1990/11151
## diff -e /n/bootesdump/1990/1113/sys/src/9/68020/stream.c /n/bootesdump/1990/11151/sys/src/9/68020/stream.c
1009,1010d
831c
qunlock(s);
.
817,823c
/*
* ascend the stream flushing the queues
*/
for(q = s->devq; q; q = nq){
nq = q->next;
flushq(q);
.
815a
}
.
814a
/*
* this may be 2 streams joined device end to device end
*/
if(q == s->devq->other)
break;
.
807,812d
799,803c
/*
* descend the stream closing the queues
*/
for(q = s->procq; q; q = q->next){
if(!waserror()){
.
797c
qlock(s);
.
780c
qunlock(s);
.
766c
panic("streamexit %d %s\n", s->opens, s->devq->info->name);
.
763c
qlock(s);
.
746c
qunlock(s);
.
742c
qunlock(s);
.
740c
qlock(s);
.
723c
qunlock(s);
.
720c
qunlock(s);
.
713c
qlock(s);
.
693c
qunlock(s);
.
660c
qunlock(s);
.
651c
qunlock(s);
.
648c
if(canqlock(s)){
.
19c
Qinfo procinfo =
{
stputq,
nullput,
0,
0,
"process"
};
.
## diffname gnot/stream.c 1990/11161
## diff -e /n/bootesdump/1990/11151/sys/src/9/68020/stream.c /n/bootesdump/1990/11161/sys/src/9/68020/stream.c
1015a
*
* This routine is entered with s->wrlock'ed and must unlock.
.
825,830c
/*
* ascend the stream flushing the queues
*/
for(q = s->devq; q; q = nq){
nq = q->next;
flushq(q);
}
.
823d
815c
WR(q)->put = nullput;
.
813c
poperror();
.
## diffname gnot/stream.c 1990/11211
## diff -e /n/bootesdump/1990/11161/sys/src/9/68020/stream.c /n/bootesdump/1990/11211/sys/src/9/68020/stream.c
1106c
error(Ehungup);
.
1098c
if(STREAMTYPE(c->qid.path) != Sdataqid)
.
1026c
if(STREAMTYPE(c->qid.path) != Sctlqid)
.
981c
error(Ehungup);
.
956c
if(STREAMTYPE(c->qid.path) != Sdataqid)
.
925c
if(CHDIR & c->qid.path)
.
921c
if(STREAMTYPE(c->qid.path) == Sctlqid){
.
848c
return streamclose1(c->stream);
.
840c
int
.
838a
return rv;
.
832c
rv = --(s->opens);
.
799a
int rv;
.
795c
int
.
737c
c->stream = streamnew(c->type, c->dev, STREAMID(c->qid.path), qi, 0);
.
723c
&& s->id == STREAMID(c->qid.path)){
.
719c
&& s->id == STREAMID(c->qid.path)){
.
664c
error(Enostream);
.
636c
devdir(c, (Qid){STREAMQID(STREAMID(c->qid.path),tab->qid.path), 0}, tab->name, tab->length,
.
611,612c
"data", {Sdataqid}, 0, 0600,
"ctl", {Sctlqid}, 0, 0600,
.
561c
error(Ebadld);
.
557c
error(Ebadld);
.
544c
error(Ehungup);
.
338c
error(Ebadld);
.
242c
error(Enoqueue);
.
## diffname gnot/stream.c 1990/1127
## diff -e /n/bootesdump/1990/11211/sys/src/9/68020/stream.c /n/bootesdump/1990/1127/sys/src/9/68020/stream.c
140a
if(newblock(bcp) == 0)
continue;
.
113a
* upgrade a block 0 block to another class (called with bcp qlocked)
*/
/*
 *  Make more blocks for class bcp out of one freshly allocated page,
 *  unless the class has already made more than its limit.  For class
 *  0 the page is cut into bare Block headers.  For any other class
 *  the page is cut into data areas of the class size, each attached
 *  to a header obtained from class 0 with allocb(0).
 *
 *  NOTE(review): no return type is declared and both returns are
 *  bare, yet the caller in allocb compares the result with 0 --
 *  confirm the intended return values.
 */
newblock(Bclass *bcp)
{
Page *page;
int n;	/* number of blocks still to carve from the page */
Block *bp;
uchar *cp;	/* next unused data area in the page */
/* nothing is made once the class is over its limit */
if(bcp->made > bcp->lim)
return;
if(bcp == bclass){
/*
* create some level zero blocks and return
*/
page = newpage(1, 0, 0);
page->va = VA(kmap(page));
n = BY2PG/sizeof(Block);
bp = (Block *)(page->va);
while(n-- > 0){
bp->flags = 0;
bp->base = bp->lim = bp->rptr = bp->wptr = 0;
/* append to the class free list; NOTE(review): bp->next is not set here */
if(bcp->first)
bcp->last->next = bp;
else
bcp->first = bp;
bcp->last = bp;
bcp->made++;
bp++;
}
} else {
/*
* create a page worth of new blocks
*/
page = newpage(1, 0, 0);
page->va = VA(kmap(page));
n = BY2PG/bcp->size;
cp = (uchar *)(page->va);
while(n-- > 0){
/*
* upgrade a level 0 block
*/
bp = allocb(0);
/* move the header's accounting from class 0 to this class */
qlock(bclass);
bclass->made--;
bcp->made++;
bp->flags = bcp - bclass;
qunlock(bclass);
/*
* tack on the data area
*/
bp->base = bp->rptr = bp->wptr = cp;
cp += bcp->size;
bp->lim = cp;
/* append to this class's free list */
if(bcp->first)
bcp->last->next = bp;
else
bcp->first = bp;
bcp->last = bp;
}
}
return;
}
/*
.
84,91c
bcp->lim = n;
bcp->made = 0;
.
77,78c
/*
* set limits on blocks
*/
.
73c
* allocate queues, streams
.
69d
46a
int lim;
int made;
.
38d
## diffname gnot/stream.c 1990/1128
## diff -e /n/bootesdump/1990/1127/sys/src/9/68020/stream.c /n/bootesdump/1990/1128/sys/src/9/68020/stream.c
1117a
freeb(bp);
} else if(streamparse("look", bp)){
qlook(s, (char *)bp->rptr);
.
1083a
* look ldname -- look for a line discipline
.
1078a
* look for an instance of the line discipline `name' on
* the stream `s'
*/
/*
 *  Check that a module called `name' is present on stream s, looking
 *  at each write queue from the process end down to the device end.
 *  Returns normally if one is found and raises "not found" otherwise.
 */
void
qlook(Stream *s, char *name)
{
	Queue *q;

	q = s->procq;
	while(q){
		if(strcmp(q->info->name, name) == 0)
			return;
		/*
		 *  stop at our own device end: this may be 2 streams
		 *  joined device end to device end
		 */
		if(q == s->devq->other)
			break;
		q = q->next;
	}
	errors("not found");
}
/*
.
## diffname gnot/stream.c 1990/1210 # deleted
## diff -e /n/bootesdump/1990/1128/sys/src/9/68020/stream.c /n/bootesdump/1990/1210/sys/src/9/68020/stream.c
1,1344d
|