URI: 
       tnonblock - plan9port - [fork] Plan 9 from user space
  HTML git clone git://src.adamsgaard.dk/plan9port
   DIR Log
   DIR Files
   DIR Refs
   DIR README
   DIR LICENSE
       ---
   DIR commit 2d2e5c71f73a8ac0656ad26b406945937f4b2c0e
   DIR parent b733ffba4fa1af07540e9687b2f84c4f3014063e
  HTML Author: rsc <devnull@localhost>
       Date:   Sun, 25 Jun 2006 21:04:52 +0000
       
       nonblock
       
       Diffstat:
         M src/libmux/io.c                     |      17 +++++++++++++----
         M src/libmux/mux.c                    |     158 +++++++++++++++++++++++--------
       
       2 files changed, 132 insertions(+), 43 deletions(-)
       ---
   DIR diff --git a/src/libmux/io.c b/src/libmux/io.c
       t@@ -74,7 +74,7 @@ _muxsendproc(void *v)
        }
        
        void*
       -_muxrecv(Mux *mux)
       +_muxrecv(Mux *mux, int canblock)
        {
                void *p;
        
       t@@ -88,15 +88,24 @@ _muxrecv(Mux *mux)
        */
                if(mux->readq){
                        qunlock(&mux->lk);
       -                return _muxqrecv(mux->readq);
       +                if(canblock)
       +                        return _muxqrecv(mux->readq);
       +                return _muxnbqrecv(mux->readq);
                }
        
                qlock(&mux->inlk);
                qunlock(&mux->lk);
       -        p = mux->recv(mux);
       +        if(canblock)
       +                p = mux->recv(mux);
       +        else{
       +                if(mux->nbrecv)
       +                        p = mux->nbrecv(mux);
       +                else
       +                        p = nil;
       +        }
                qunlock(&mux->inlk);
        /*
       -        if(!p)
       +        if(!p && canblock)
                        vthangup(mux);
        */
                return p;
   DIR diff --git a/src/libmux/mux.c b/src/libmux/mux.c
       t@@ -26,21 +26,31 @@ muxinit(Mux *mux)
                mux->sleep.prev = &mux->sleep;
        }
        
       -void*
       -muxrpc(Mux *mux, void *tx)
       +static Muxrpc*
       +allocmuxrpc(Mux *mux)
        {
       -        int tag;
       -        Muxrpc *r, *r2;
       -        void *p;
       -
       +        Muxrpc *r;
       +        
                /* must malloc because stack could be private */
                r = mallocz(sizeof(Muxrpc), 1);
                if(r == nil){
                        werrstr("mallocz: %r");
                        return nil;
                }
       +        r->mux = mux;
                r->r.l = &mux->lk;
       +        r->waiting = 1;
       +        
       +        return r;
       +}
        
       +static int
       +tagmuxrpc(Muxrpc *r, void *tx)
       +{
       +        int tag;
       +        Mux *mux;
       +        
       +        mux = r->mux;
                /* assign the tag, add selves to response queue */
                qlock(&mux->lk);
                tag = gettag(mux, r);
       t@@ -56,54 +66,83 @@ muxrpc(Mux *mux, void *tx)
                        dequeue(mux, r);
                        puttag(mux, r);
                        qunlock(&mux->lk);
       -                return nil;
       +                return -1;
                }
       +        return 0;
       +}
       +
       +void
       +muxmsgandqlock(Mux *mux, void *p)
       +{
       +        int tag;
       +        Muxrpc *r2;
       +
       +        tag = mux->gettag(mux, p) - mux->mintag;
       +/*print("mux tag %d\n", tag); */
       +        qlock(&mux->lk);
       +        /* hand packet to correct sleeper */
       +        if(tag < 0 || tag >= mux->mwait){
       +                fprint(2, "%s: bad rpc tag %ux\n", argv0, tag);
       +                /* must leak packet! don't know how to free it! */
       +                return;
       +        }
       +        r2 = mux->wait[tag];
       +        if(r2 == nil || r2->prev == nil){
       +                fprint(2, "%s: bad rpc tag %ux (no one waiting on that tag)\n", argv0, tag);
       +                /* must leak packet! don't know how to free it! */
       +                return;
       +        }        
       +        r2->p = p;
       +        dequeue(mux, r2);
       +        rwakeup(&r2->r);
       +}
       +
       +void
       +electmuxer(Mux *mux)
       +{
       +        /* if there is anyone else sleeping, wake them to mux */
       +        if(mux->sleep.next != &mux->sleep){
       +                mux->muxer = mux->sleep.next;
       +                rwakeup(&mux->muxer->r);
       +        }else
       +                mux->muxer = nil;
       +}
       +
       +void*
       +muxrpc(Mux *mux, void *tx)
       +{
       +        int tag;
       +        Muxrpc *r;
       +        void *p;
       +
       +        if((r = allocmuxrpc(mux)) == nil)
       +                return nil;
       +
       +        if((tag = tagmuxrpc(r, tx)) < 0)
       +                return nil;
        
                qlock(&mux->lk);
                /* wait for our packet */
       -        while(mux->muxer && !r->p){
       +        while(mux->muxer && mux->muxer != r && !r->p)
                        rsleep(&r->r);
       -        }
        
                /* if not done, there's no muxer: start muxing */
                if(!r->p){
       -                if(mux->muxer)
       +                if(mux->muxer != nil && mux->muxer != r)
                                abort();
       -                mux->muxer = 1;
       +                mux->muxer = r;
                        while(!r->p){
                                qunlock(&mux->lk);
       -                        p = _muxrecv(mux);
       -                        if(p)
       -                                tag = mux->gettag(mux, p) - mux->mintag;
       -                        else
       -                                tag = ~0;
       -/*print("mux tag %d\n", tag); */
       -                        qlock(&mux->lk);
       -                        if(p == nil){        /* eof -- just give up and pass the buck */
       +                        p = _muxrecv(mux, 1);
       +                        if(p == nil){
       +                                /* eof -- just give up and pass the buck */
       +                                qlock(&mux->lk);
                                        dequeue(mux, r);
                                        break;
                                }
       -                        /* hand packet to correct sleeper */
       -                        if(tag < 0 || tag >= mux->mwait){
       -                                fprint(2, "%s: bad rpc tag %ux\n", argv0, tag);
       -                                /* must leak packet! don't know how to free it! */
       -                                continue;
       -                        }
       -                        r2 = mux->wait[tag];
       -                        if(r2 == nil || r2->prev == nil){
       -                                fprint(2, "%s: bad rpc tag %ux (no one waiting on that tag)\n", argv0, tag);
       -                                /* must leak packet! don't know how to free it! */
       -                                continue;
       -                        }        
       -                        r2->p = p;
       -                        dequeue(mux, r2);
       -                        rwakeup(&r2->r);
       +                        muxmsgandqlock(mux, p);
                        }
       -                mux->muxer = 0;
       -
       -                /* if there is anyone else sleeping, wake them to mux */
       -                if(mux->sleep.next != &mux->sleep)
       -                        rwakeup(&mux->sleep.next->r);
       +                electmuxer(mux);
                }
        /*print("finished %p\n", r); */
                p = r->p;
       t@@ -114,6 +153,47 @@ muxrpc(Mux *mux, void *tx)
                return p;
        }
        
       +Muxrpc*
       +muxrpcstart(Mux *mux, void *tx)
       +{
       +        int tag;
       +        Muxrpc *r;
       +
       +        if((r = allocmuxrpc(mux)) == nil)
       +                return nil;
       +        if((tag = tagmuxrpc(r, tx)) < 0)
       +                return nil;
       +        return r;
       +}
       +
       +void*
       +muxrpccanfinish(Muxrpc *r)
       +{
       +        char *p;
       +        Mux *mux;
       +        
       +        mux = r->mux;
       +        qlock(&mux->lk);
       +        if(!r->p && !mux->muxer){
       +                mux->muxer = r;
       +                while(!r->p){
       +                        qunlock(&mux->lk);
       +                        p = _muxrecv(mux, 0);
       +                        if(p == nil){
       +                                qlock(&mux->lk);
       +                                break;
       +                        }
       +                        muxmsgandqlock(mux, p);
       +                }
       +                electmuxer(mux);
       +        }
       +        p = r->p;
       +        if(p)
       +                puttag(mux, r);
       +        qunlock(&mux->lk);
       +        return p;
       +}
       +
        static void
        enqueue(Mux *mux, Muxrpc *r)
        {