Plan 9 from Bell Labs’s /usr/web/sources/contrib/rminnich/9.fastpath/port/devpipe.c

Copyright © 2021 Plan 9 Foundation.
Distributed under the MIT License.
Download the Plan 9 distribution.


#include	"u.h"
#include	"../port/lib.h"
#include	"mem.h"
#include	"dat.h"
#include	"fns.h"
#include	"../port/error.h"

typedef struct Pipe	Pipe;
struct Pipe
{
	QLock;
	Pipe	*next;
	int	ref;
	ulong	path;
	Queue	*q[2];
	int	qref[2];
};

struct
{
	Lock;
	ulong	path;
} pipealloc;

enum
{
	Qdir,
	Qdata0,
	Qdata1,
	Qctl,
};

Dirtab pipedir[] =
{
	".",		{Qdir,0,QTDIR},	0,		DMDIR|0500,
	"data",		{Qdata0},	0,		0600,
	"data1",	{Qdata1},	0,		0600,
	"ctl",		{Qctl},		0,		0600,
};

#define NPIPEDIR 4

#define PIPETYPE(x)	(((unsigned)x)&0x1f)
#define PIPEID(x)	((((unsigned)x))>>5)
#define PIPEQID(i, t)	((((unsigned)i)<<5)|(t))

static void
pipeinit(void)
{
	if(conf.pipeqsize == 0){
		if(conf.nmach > 1)
			conf.pipeqsize = 256*1024;
		else
			conf.pipeqsize = 32*1024;
	}
}

/*
 *  create a pipe, no streams are created until an open
 */
static Chan*
pipeattach(char *spec)
{
	Pipe *p;
	Chan *c;

	c = devattach('|', spec);
	p = malloc(sizeof(Pipe));
	if(p == 0)
		exhausted("memory");
	p->ref = 1;

	p->q[0] = qopen(conf.pipeqsize, 0, 0, 0);
	if(p->q[0] == 0){
		free(p);
		exhausted("memory");
	}
	p->q[1] = qopen(conf.pipeqsize, 0, 0, 0);
	if(p->q[1] == 0){
		free(p->q[0]);
		free(p);
		exhausted("memory");
	}

	lock(&pipealloc);
	p->path = ++pipealloc.path;
	unlock(&pipealloc);

	mkqid(&c->qid, PIPEQID(3*p->path, Qdir), 0, QTDIR);
	c->aux = p;
	return c;
}

static int
pipegen(Chan *c, char*, Dirtab *tab, int ntab, int i, Dir *dp)
{
	Qid q;
	int len;
	Pipe *p;

	if(i == DEVDOTDOT){
		devdir(c, c->qid, "#|", 0, eve, DMDIR|0555, dp);
		return 1;
	}
	i++;	/* skip . */
	if(tab==0 || i>=ntab)
		return -1;

	tab += i;
	p = c->aux;
	switch((ulong)tab->qid.path){
	case Qdata0:
		len = qlen(p->q[0]);
		break;
	case Qdata1:
		len = qlen(p->q[1]);
		break;
	default:
		len = tab->length;
		break;
	}
	mkqid(&q, PIPEQID(PIPEID(c->qid.path), tab->qid.path), 0, QTFILE);
	devdir(c, q, tab->name, len, eve, tab->perm, dp);
	return 1;
}


static Walkqid*
pipewalk(Chan *c, Chan *nc, char **name, int nname)
{
	Walkqid *wq;
	Pipe *p;

	wq = devwalk(c, nc, name, nname, pipedir, NPIPEDIR, pipegen);
	if(wq != nil && wq->clone != nil && wq->clone != c){
		p = c->aux;
		qlock(p);
		p->ref++;
		if(c->flag & COPEN){
			print("channel open in pipewalk\n");
			switch(PIPETYPE(c->qid.path)){
			case Qdata0:
				p->qref[0]++;
				break;
			case Qdata1:
				p->qref[1]++;
				break;
			}
		}
		qunlock(p);
	}
	return wq;
}

static int
pipestat(Chan *c, uchar *db, int n)
{
	Pipe *p;
	Dir dir;

	p = c->aux;

	switch(PIPETYPE(c->qid.path)){
	case Qdir:
		devdir(c, c->qid, ".", 0, eve, DMDIR|0555, &dir);
		break;
	case Qdata0:
		devdir(c, c->qid, "data", qlen(p->q[0]), eve, 0600, &dir);
		break;
	case Qdata1:
		devdir(c, c->qid, "data1", qlen(p->q[1]), eve, 0600, &dir);
		break;
	case Qctl:
		devdir(c, c->qid, "ctl", 0, eve, 0600, &dir);
		break;
	default:
		panic("pipestat");
	}
	n = convD2M(&dir, db, n);
	if(n < BIT16SZ)
		error(Eshortstat);
	return n;
}

/*
 *  if the stream doesn't exist, create it
 */
static Chan*
pipeopen(Chan *c, int omode)
{
	Pipe *p;

	if(c->qid.type & QTDIR){
		if(omode != OREAD)
			error(Ebadarg);
		c->mode = omode;
		c->flag |= COPEN;
		c->offset = 0;
		return c;
	}

	p = c->aux;
	qlock(p);
	switch(PIPETYPE(c->qid.path)){
	case Qdata0:
		p->qref[0]++;
		break;
	case Qdata1:
		p->qref[1]++;
		break;
	}
	qunlock(p);

	c->mode = openmode(omode);
	c->flag |= COPEN;
	c->offset = 0;
	c->iounit = qiomaxatomic;
	return c;
}

static void
pipeclose(Chan *c)
{
	Pipe *p;

	p = c->aux;
	qlock(p);

	if(c->flag & COPEN){
		/*
		 *  closing either side hangs up the stream
		 */
		switch(PIPETYPE(c->qid.path)){
		case Qdata0:
			p->qref[0]--;
			if(p->qref[0] == 0){
				qhangup(p->q[1], 0);
				qclose(p->q[0]);
			}
			break;
		case Qdata1:
			p->qref[1]--;
			if(p->qref[1] == 0){
				qhangup(p->q[0], 0);
				qclose(p->q[1]);
			}
			break;
		}
	}


	/*
	 *  if both sides are closed, they are reusable
	 */
	if(p->qref[0] == 0 && p->qref[1] == 0){
		qreopen(p->q[0]);
		qreopen(p->q[1]);
	}

	/*
	 *  free the structure on last close
	 */
	p->ref--;
	if(p->ref == 0){
		qunlock(p);
		free(p->q[0]);
		free(p->q[1]);
		free(p);
	} else
		qunlock(p);
}

static long
piperead(Chan *c, void *va, long n, vlong)
{
	Pipe *p;

	p = c->aux;

	switch(PIPETYPE(c->qid.path)){
	case Qdir:
		return devdirread(c, va, n, pipedir, NPIPEDIR, pipegen);
	case Qdata0:
		return qread(p->q[0], va, n);
	case Qdata1:
		return qread(p->q[1], va, n);
	default:
		panic("piperead");
	}
	return -1;	/* not reached */
}

static Block*
pipebread(Chan *c, long n, ulong offset)
{
	Pipe *p;

	p = c->aux;

	switch(PIPETYPE(c->qid.path)){
	case Qdata0:
		return qbread(p->q[0], n);
	case Qdata1:
		return qbread(p->q[1], n);
	}

	return devbread(c, n, offset);
}

static long
pipefastwrite(Chan *c, void *, long n) {
	Pipe *p;
	void *b;
	int r = 0;

	if (n < 0) {
		panic("pipefastwrite len < 0");
	}

	//xchgw(&c->offset, c->offset + n);
	c->offset += n;
	/*
	lock(c);
	c->offset += n;
	unlock(c);
	*/

	return n;
	if (n > 1024)
		n = 1024;
	b = mallocz(1024, 1);
	if(waserror()) {
		/* avoid notes when pipe is a mounted queue */
		if((c->flag & CMSG) == 0)
			postnote(up, 1, "sys: write on closed pipe", NUser);
		nexterror();
	}

	p = c->aux;
//	print("pipe is %p, TYPE is %d\n", p,PIPETYPE(c->qid.path));
	switch(PIPETYPE(c->qid.path)){
	case Qdata0:
//		print("writing to data0\n");
		r = qwrite(p->q[1], b, n);
		break;

	case Qdata1:
//		print("writing to data1\n");
		r = qwrite(p->q[0], b, n);
		break;
	}

	poperror();
	return r;
}

/*
 *  a write to a closed pipe causes a note to be sent to
 *  the process.
 */
static long
pipewrite(Chan *c, void *va, long n, vlong)
{
	Pipe *p;
	char *s = nil;
	/* cmd syscall fd pointer len */
	char *tok[5];
	Fastcall f;
	int ntok;
	Fastcall *nfc;

	if(!islo())
		print("pipewrite hi %#p\n", getcallerpc(&c));
	if(waserror()) {
		/* avoid notes when pipe is a mounted queue */
		if((c->flag & CMSG) == 0)
			postnote(up, 1, "sys: write on closed pipe", NUser);
		nexterror();
	}

	p = c->aux;

	switch(PIPETYPE(c->qid.path)){
	case Qdata0:
		n = qwrite(p->q[1], va, n);
		break;

	case Qdata1:
		n = qwrite(p->q[0], va, n);
		break;

	case Qctl:
		print("hey Mr. Ctl file\n");
		s = malloc(n + 1);

		if (waserror()) {
			free(s);
			nexterror();
		}
		print("malloc'd...");
		memmove(s, va, n);
		print("memmove'd...");
		s[n] = 0;
		ntok = tokenize(s, tok, nelem(tok));
		print("tokenized!\n");
		if (ntok == 5 && !strcmp(tok[0], "fastwrite")) {
			Chan *fc;
			char *cmd = tok[0];
			int syscall = atoi(tok[1]);
			int fd = atoi(tok[2]);
			void *va = (void *)strtoul(tok[3], 0, 0);
			int len = strtoul(tok[4], 0, 0);
			print("command = %s, tokenized to (ntok = %d) cmd: %s, syscall: %d, fd: %p, va: %p, len: %d\n", s, ntok, cmd, syscall, fd, va, len);
			print("gimme a fastwrite on pipe %p\n", p);
			if (syscall < nsyscall)
				error("syscall must be > nsyscall");
			/* validate the address before the fd -- simpler error */
			if (len < 0)
				error("len < 0");
			validaddr((unsigned long)va, len, 0);
			print("validaddr\n");
			fc = fdtochan(fd, OWRITE, 1, 1);
			print("fc %p\n", fc);
			if (waserror()) {
				cclose(fc);
				nexterror();
			}

			if (fc->aux != c->aux) {
				error("fd is not for same pipe as ctl fd");
			}

			/* figure out which q it is, and inc the ref. */
			f.scnum = syscall;
			f.c = fc;

			qlock(p);
			switch(PIPETYPE(fc->qid.path)){
			case 0:
				p->qref[0]++;
				break;
			case 1:
				p->qref[1]++;
				break;
			}
			qunlock(p);

			f.fun = pipefastwrite;
			f.buf =va;
			f.n = len;
			nfc = malloc(sizeof(Fastcall)*(up->fcount + 1));
			memmove(nfc, up->fc, sizeof(Fastcall)*up->fcount);
			print("fcount = %d, giving scnum = %d, chan = %p, function = %p\n", up->fcount, f.scnum, f.c, f.fun);
			nfc[up->fcount] = f;
			up->fc = nfc;
			up->fcount++;
			print("fcount now %d\n", up->fcount);
			poperror();
			poperror();
			poperror();
			return n;
		} else {
			error("devpipe: bad command");
		}
		break;

	default:
		panic("pipewrite");
	}

	poperror();
	return n;
}

static long
pipebwrite(Chan *c, Block *bp, ulong)
{
	long n;
	Pipe *p;

	if(waserror()) {
		/* avoid notes when pipe is a mounted queue */
		if((c->flag & CMSG) == 0)
			postnote(up, 1, "sys: write on closed pipe", NUser);
		nexterror();
	}

	p = c->aux;
	switch(PIPETYPE(c->qid.path)){
	case Qdata0:
		n = qbwrite(p->q[1], bp);
		break;

	case Qdata1:
		n = qbwrite(p->q[0], bp);
		break;

	default:
		n = 0;
		panic("pipebwrite");
	}

	poperror();
	return n;
}

Dev pipedevtab = {
	'|',
	"pipe",

	devreset,
	pipeinit,
	devshutdown,
	pipeattach,
	pipewalk,
	pipestat,
	pipeopen,
	devcreate,
	pipeclose,
	piperead,
	pipebread,
	pipewrite,
	pipebwrite,
	devremove,
	devwstat,
};

Bell Labs OSI certified Powered by Plan 9

(Return to Plan 9 Home Page)

Copyright © 2021 Plan 9 Foundation. All Rights Reserved.
Comments to webmaster@9p.io.