URI: 
       tImplement write buffer. - plan9port - [fork] Plan 9 from user space
  HTML git clone git://src.adamsgaard.dk/plan9port
   DIR Log
   DIR Files
   DIR Refs
   DIR README
   DIR LICENSE
       ---
   DIR commit 24998851775d2d2a737a172dc614d9b5c91706dc
   DIR parent d49a2e4801752c8a1211c7fac8cc08055a6b6fa5
  HTML Author: rsc <devnull@localhost>
       Date:   Thu, 11 Mar 2004 19:14:09 +0000
       
       Implement write buffer.
       
       Diffstat:
         M src/cmd/venti/arena.c               |      22 +++++++++++++++++-----
         M src/cmd/venti/dat.h                 |      16 ++++++++++++++++
         M src/cmd/venti/dcache.c              |     272 ++++++++++++++++++++++++++++++-
         M src/cmd/venti/findscore.c           |      10 ----------
         M src/cmd/venti/fns.h                 |       4 +++-
         M src/cmd/venti/httpd.c               |      31 ++++++++++++++++++++++++-------
         M src/cmd/venti/index.c               |      11 +++++++++--
         M src/cmd/venti/lumpqueue.c           |       2 +-
         M src/cmd/venti/mkfile                |      15 ++++-----------
         M src/cmd/venti/part.c                |       2 +-
         M src/cmd/venti/stats.c               |       5 +++++
         M src/cmd/venti/venti.c               |      16 +++++++++++-----
       
       12 files changed, 358 insertions(+), 48 deletions(-)
       ---
   DIR diff --git a/src/cmd/venti/arena.c b/src/cmd/venti/arena.c
       t@@ -144,6 +144,7 @@ writeclumpinfo(Arena *arena, int clump, ClumpInfo *ci)
                cib = getcib(arena, clump, 1, &r);
                if(cib == nil)
                        return -1;
       +        dirtydblock(cib->data, DirtyArenaCib);
                packclumpinfo(ci, &cib->data->data[cib->offset]);
                putcib(arena, cib);
                return 0;
       t@@ -239,11 +240,13 @@ writearena(Arena *arena, u64int aa, u8int *clbuf, u32int n)
                                qunlock(&arena->lock);
                                return -1;
                        }
       +                dirtydblock(b, DirtyArena);
                        m = blocksize - off;
                        if(m > n - nn)
                                m = n - nn;
                        memmove(&b->data[off], &clbuf[nn], m);
       -                ok = writepart(arena->part, a, b->data, blocksize);
       +                // ok = writepart(arena->part, a, b->data, blocksize);
       +                ok = 0;
                        putdblock(b);
                        if(ok < 0){
                                qunlock(&arena->lock);
       t@@ -302,12 +305,13 @@ writeaclump(Arena *arena, Clump *c, u8int *clbuf)
                                qunlock(&arena->lock);
                                return TWID64;
                        }
       +                dirtydblock(b, DirtyArena);
                        m = blocksize - off;
                        if(m > n - nn)
                                m = n - nn;
                        memmove(&b->data[off], &clbuf[nn], m);
       -print("writing\n");
       -                ok = writepart(arena->part, a, b->data, blocksize);
       +        //        ok = writepart(arena->part, a, b->data, blocksize);
       +                ok = 0;
                        putdblock(b);
                        if(ok < 0){
                                qunlock(&arena->lock);
       t@@ -352,6 +356,7 @@ static void
        sealarena(Arena *arena)
        {
                flushciblocks(arena);
       +        flushdcache();
                arena->sealed = 1;
                wbarena(arena);
                backsumarena(arena);
       t@@ -439,6 +444,8 @@ ReadErr:
        
                /*
                 * check for no checksum or the same
       +         *
       +         * the writepart is okay because we flushed the dcache in sealarena
                 */
                if(scorecmp(score, &b->data[bs - VtScoreSize]) != 0){
                        if(scorecmp(zeroscore, &b->data[bs - VtScoreSize]) != 0)
       t@@ -585,6 +592,7 @@ getcib(Arena *arena, int clump, int writing, CIBlock *rock)
                block = clump / arena->clumpmax;
                off = (clump - block * arena->clumpmax) * ClumpInfoSize;
        
       +/*
                if(arena->cib.block == block
                && arena->cib.data != nil){
                        arena->cib.offset = off;
       t@@ -596,6 +604,8 @@ getcib(Arena *arena, int clump, int writing, CIBlock *rock)
                        cib = &arena->cib;
                }else
                        cib = rock;
       +*/
       +        cib = rock;
        
                qlock(&stats.lock);
                stats.cireads++;
       t@@ -620,6 +630,8 @@ putcib(Arena *arena, CIBlock *cib)
        
        /*
         * must be called with arena locked
       + * 
       + * cache turned off now that dcache does write caching too.
         */
        int
        flushciblocks(Arena *arena)
       t@@ -631,8 +643,8 @@ flushciblocks(Arena *arena)
                qlock(&stats.lock);
                stats.ciwrites++;
                qunlock(&stats.lock);
       -        ok = writepart(arena->part, arena->base + arena->size - (arena->cib.block + 1) * arena->blocksize, arena->cib.data->data, arena->blocksize);
       -
       +//        ok = writepart(arena->part, arena->base + arena->size - (arena->cib.block + 1) * arena->blocksize, arena->cib.data->data, arena->blocksize);
       +        ok = 0;
                if(ok < 0)
                        seterr(EAdmin, "failed writing arena directory block");
                putdblock(arena->cib.data);
   DIR diff --git a/src/cmd/venti/dat.h b/src/cmd/venti/dat.h
       t@@ -115,6 +115,13 @@ enum
        
                MaxClumpBlocks                =  (VtMaxLumpSize + ClumpSize + (1 << ABlockLog) - 1) >> ABlockLog,
        
       +        /*
       +         * dirty flags 
       +         */
       +        DirtyArena                = 1,
       +        DirtyIndex,
       +        DirtyArenaCib,
       +
                VentiZZZZZZZZ
        };
        
       t@@ -142,6 +149,7 @@ struct Part
                u64int                size;                        /* size of the partiton */
                u32int                blocksize;                /* block size for reads and writes */
                char                *name;
       +        Channel                *writechan;                /* chan[dcache.nblock](DBlock*) */
        };
        
        /*
       t@@ -156,6 +164,8 @@ struct DBlock
                Part        *part;                        /* partition in which cached */
                u64int        addr;                        /* base address on the partition */
                u16int        size;                        /* amount of data available, not amount allocated; should go away */
       +        u32int        dirty;
       +        u32int        dirtying;
                DBlock        *next;                        /* doubly linked hash chains */
                DBlock        *prev;
                u32int        heap;                        /* index in heap table */
       t@@ -163,6 +173,8 @@ struct DBlock
                u32int        used2;
                u32int        ref;                        /* reference count */
                QLock        lock;                        /* for access to data only */
       +        Channel        writedonechan;        
       +        void*        chanbuf[1];                /* buffer for the chan! */
        };
        
        /*
       t@@ -486,6 +498,10 @@ struct Stats
                long                iclookups;                /* index cache lookups */
                long                ichits;                        /* hits in the cache */
                long                icfills;                /* successful fills from index */
       +        long                absorbedwrites;                /* disk writes absorbed by dcache */
       +        long                dirtydblocks;                /* blocks dirtied */
       +        long                dcacheflushes;                /* times dcache has flushed */
       +        long                dcacheflushwrites;        /* blocks written by those flushes */
        };
        
        extern        Index                *mainindex;
   DIR diff --git a/src/cmd/venti/dcache.c b/src/cmd/venti/dcache.c
       t@@ -1,3 +1,7 @@
       +/*
       + * The locking here is getting a little out of hand.
       + */
       +
        #include "stdinc.h"
        #include "dat.h"
        #include "fns.h"
       t@@ -14,7 +18,12 @@ enum
        struct DCache
        {
                QLock                lock;
       +        RWLock                dirtylock;                /* must be held to inspect or set b->dirty */
       +        u32int                flushround;
       +        Rendez                anydirty;
                Rendez                full;
       +        Rendez                flush;
       +        Rendez                flushdone;
                DBlock                *free;                        /* list of available lumps */
                u32int                now;                        /* ticks for usage timestamps */
                int                size;                        /* max. size of any block; allocated to each block */
       t@@ -23,7 +32,10 @@ struct DCache
                DBlock                **heap;                        /* heap for locating victims */
                int                nblocks;                /* number of blocks allocated */
                DBlock                *blocks;                /* array of block descriptors */
       +        DBlock                **write;                /* array of block pointers to be written */
                u8int                *mem;                        /* memory for all block descriptors */
       +        int                ndirty;                        /* number of dirty blocks */
       +        int                maxdirty;                /* max. number of dirty blocks */
        };
        
        static DCache        dcache;
       t@@ -33,6 +45,10 @@ static int        upheap(int i, DBlock *b);
        static DBlock        *bumpdblock(void);
        static void        delheap(DBlock *db);
        static void        fixheap(int i, DBlock *b);
       +static void        _flushdcache(void);
       +static void        flushproc(void*);
       +static void        flushtimerproc(void*);
       +static void        writeproc(void*);
        
        void
        initdcache(u32int mem)
       t@@ -47,14 +63,20 @@ initdcache(u32int mem)
                        sysfatal("no max. block size given for disk cache");
                blocksize = maxblocksize;
                nblocks = mem / blocksize;
       -        if(0)
       -                fprint(2, "initialize disk cache with %d blocks of %d bytes\n", nblocks, blocksize);
                dcache.full.l = &dcache.lock;
       +        dcache.flush.l = &dcache.lock;
       +        dcache.anydirty.l = &dcache.lock;
       +        dcache.flushdone.l = &dcache.lock;
                dcache.nblocks = nblocks;
       +        dcache.maxdirty = (nblocks * 3) / 4;
       +        if(1)
       +                fprint(2, "initialize disk cache with %d blocks of %d bytes, maximum %d dirty blocks\n",
       +                        nblocks, blocksize, dcache.maxdirty);
                dcache.size = blocksize;
                dcache.heads = MKNZ(DBlock*, HashSize);
                dcache.heap = MKNZ(DBlock*, nblocks);
                dcache.blocks = MKNZ(DBlock, nblocks);
       +        dcache.write = MKNZ(DBlock*, nblocks);
                dcache.mem = MKNZ(u8int, nblocks * blocksize);
        
                last = nil;
       t@@ -62,11 +84,15 @@ initdcache(u32int mem)
                        b = &dcache.blocks[i];
                        b->data = &dcache.mem[i * blocksize];
                        b->heap = TWID32;
       +                chaninit(&b->writedonechan, sizeof(void*), 1);
                        b->next = last;
                        last = b;
                }
                dcache.free = last;
                dcache.nheap = 0;
       +
       +        vtproc(flushproc, nil);
       +        vtproc(flushtimerproc, nil);
        }
        
        static u32int
       t@@ -178,25 +204,70 @@ putdblock(DBlock *b)
                if(b == nil)
                        return;
        
       +        if(b->dirtying){
       +                b->dirtying = 0;
       +                runlock(&dcache.dirtylock);
       +        }
                qunlock(&b->lock);
       +
        //checkdcache();
                qlock(&dcache.lock);
       -        if(--b->ref == 0){
       +        if(b->dirty)
       +                delheap(b);
       +        else if(--b->ref == 0){
                        if(b->heap == TWID32)
                                upheap(dcache.nheap++, b);
       -                rwakeup(&dcache.full);
       +                rwakeupall(&dcache.full);
                }
        
                qunlock(&dcache.lock);
        //checkdcache();
        }
        
       +void
       +dirtydblock(DBlock *b, int dirty)
       +{
       +        int odirty;
       +        Part *p;
       +
       +fprint(2, "dirty %p\n", b);
       +        rlock(&dcache.dirtylock);
       +        assert(b->ref != 0);
       +        assert(b->dirtying == 0);
       +        b->dirtying = 1;
       +
       +        qlock(&stats.lock);
       +        if(b->dirty)
       +                stats.absorbedwrites++;
       +        stats.dirtydblocks++;
       +        qunlock(&stats.lock);
       +
       +        if(b->dirty)
       +                assert(b->dirty == dirty);
       +        odirty = b->dirty;
       +        b->dirty = dirty;
       +        p = b->part;
       +        if(p->writechan == nil){
       +fprint(2, "allocate write proc for part %s\n", p->name);
       +                /* XXX hope this doesn't fail! */
       +                p->writechan = chancreate(sizeof(DBlock*), dcache.nblocks);
       +                vtproc(writeproc, p);
       +        }
       +        qlock(&dcache.lock);
       +        if(!odirty){
       +                dcache.ndirty++;
       +                rwakeupall(&dcache.anydirty);
       +        }
       +        qunlock(&dcache.lock);
       +}
       +
        /*
         * remove some block from use and update the free list and counters
         */
        static DBlock*
        bumpdblock(void)
        {
       +        int flushed;
                DBlock *b;
                ulong h;
        
       t@@ -206,14 +277,20 @@ bumpdblock(void)
                        return b;
                }
        
       +        if(dcache.ndirty >= dcache.maxdirty)
       +                _flushdcache();
       +
                /*
                 * remove blocks until we find one that is unused
                 * referenced blocks are left in the heap even though
                 * they can't be scavenged; this is simple a speed optimization
                 */
       +        flushed = 0;
                for(;;){
       -                if(dcache.nheap == 0)
       +                if(dcache.nheap == 0){
       +                        _flushdcache();
                                return nil;
       +                }
                        b = dcache.heap[0];
                        delheap(b);
                        if(!b->ref)
       t@@ -242,6 +319,8 @@ bumpdblock(void)
        static void
        delheap(DBlock *db)
        {
       +        if(db->heap == TWID32)
       +                return;
                fixheap(db->heap, dcache.heap[--dcache.nheap]);
                db->heap = TWID32;
        }
       t@@ -370,3 +449,186 @@ checkdcache(void)
                        sysfatal("dc: missing blocks: %d %d %d", dcache.nheap, refed, dcache.nblocks);
                qunlock(&dcache.lock);
        }
       +
       +void
       +flushdcache(void)
       +{
       +        u32int flushround;
       +
       +        qlock(&dcache.lock);
       +        flushround = dcache.flushround;
       +        rwakeupall(&dcache.flush);
       +        while(flushround == dcache.flushround)
       +                rsleep(&dcache.flushdone);
       +        qunlock(&dcache.lock);
       +}
       +
       +static void
       +_flushdcache(void)
       +{
       +        rwakeupall(&dcache.flush);
       +}
       +
       +static int
       +parallelwrites(DBlock **b, DBlock **eb, int dirty)
       +{
       +        DBlock **p;
       +
       +        for(p=b; p<eb && (*p)->dirty == dirty; p++)
       +                sendp((*p)->part->writechan, *p);
       +        for(p=b; p<eb && (*p)->dirty == dirty; p++)
       +                recvp(&(*p)->writedonechan);
       +
       +        return p-b;
       +}
       +
       +/*
       + * Sort first by dirty flag, then by partition, then by address in partition.
       + */
       +static int
       +writeblockcmp(const void *va, const void *vb)
       +{
       +        DBlock *a, *b;
       +
       +        a = *(DBlock**)va;
       +        b = *(DBlock**)vb;
       +
       +        if(a->dirty != b->dirty)
       +                return a->dirty - b->dirty;
       +        if(a->part != b->part){
       +                if(a->part < b->part)
       +                        return -1;
       +                if(a->part > b->part)
       +                        return 1;
       +        }
       +        if(a->addr < b->addr)
       +                return -1;
       +        return 1;
       +}
       +
       +static void
       +flushtimerproc(void *v)
       +{
       +        u32int round;
       +
       +        for(;;){
       +                qlock(&dcache.lock);
       +                while(dcache.ndirty == 0)
       +                        rsleep(&dcache.anydirty);
       +                round = dcache.flushround;
       +                qunlock(&dcache.lock);
       +
       +                sleep(60*1000);
       +
       +                qlock(&dcache.lock);
       +                if(round == dcache.flushround){
       +                        rwakeupall(&dcache.flush);
       +                        while(round == dcache.flushround)
       +                                rsleep(&dcache.flushdone);
       +                }
       +                qunlock(&dcache.lock);
       +        }
       +}
       +
       +static void
       +flushproc(void *v)
       +{
       +        int i, n;
       +        DBlock *b, **write;
       +
       +        USED(v);
       +        for(;;){
       +                qlock(&dcache.lock);
       +                dcache.flushround++;
       +                rwakeupall(&dcache.flushdone);
       +                rsleep(&dcache.flush);
       +                qunlock(&dcache.lock);
       +
       +                fprint(2, "flushing dcache\n");
       +
       +                /*
       +                 * Because we don't record any dependencies at all, we must write out
       +                 * all blocks currently dirty.  Thus we must lock all the blocks that 
       +                 * are currently dirty.
       +                 *
       +                 * We grab dirtylock to stop the dirtying of new blocks.
       +                 * Then we wait until all the current blocks finish being dirtied.
       +                 * Now all the dirty blocks in the system are immutable (clean blocks
       +                 * might still get recycled), so we can plan our disk writes.
       +                 * 
       +                 * In a better scheme, dirtiers might lock the block for writing in getdblock,
       +                 * so that flushproc could lock all the blocks here and then unlock them as it
       +                 * finishes with them.
       +                 */ 
       +        
       +                fprint(2, "flushproc: wlock\n");
       +                wlock(&dcache.dirtylock);
       +
       +                fprint(2, "flushproc: build list\n");
       +                write = dcache.write;
       +                n = 0;
       +                for(i=0; i<dcache.nblocks; i++){
       +                        b = &dcache.blocks[i];
       +                        if(b->dirty)
       +                                write[n++] = b;
       +                }
       +
       +                qsort(write, n, sizeof(write[0]), writeblockcmp);
       +
       +                /*
       +                 * At the beginning of the array are the arena blocks.
       +                 */
       +                fprint(2, "flushproc: write arena blocks\n");
       +                i = 0;
       +                i += parallelwrites(write+i, write+n, DirtyArena);
       +
       +                /*
       +                 * Next are the index blocks.
       +                 */
       +                fprint(2, "flushproc: write index blocks\n");
       +                i += parallelwrites(write+i, write+n, DirtyIndex);
       +
       +                /*
       +                 * Finally, the arena clump info blocks.
       +                 */
       +                fprint(2, "flushproc: write cib blocks\n");
       +                i += parallelwrites(write+i, write+n, DirtyArenaCib);
       +
       +                assert(i == n);
       +
       +                fprint(2, "flushproc: update dirty bits\n");
       +                qlock(&dcache.lock);
       +                for(i=0; i<n; i++){
       +                        b = write[i];
       +                        b->dirty = 0;
       +                        --dcache.ndirty;
       +                        if(b->ref == 0 && b->heap == TWID32){
       +                                upheap(dcache.nheap++, b);
       +                                rwakeupall(&dcache.full);
       +                        }
       +                }
       +                qunlock(&dcache.lock);
       +                wunlock(&dcache.dirtylock);
       +
       +                qlock(&stats.lock);
       +                stats.dcacheflushes++;
       +                stats.dcacheflushwrites += n;
       +                qunlock(&stats.lock);
       +        }
       +}
       +
       +static void
       +writeproc(void *v)
       +{
       +        DBlock *b;
       +        Part *p;
       +
       +        p = v;
       +
       +        for(;;){
       +                b = recvp(p->writechan);
       +                if(writepart(p, b->addr, b->data, b->size) < 0)
       +                        fprint(2, "write error: %r\n"); /* XXX details! */
       +                sendp(&b->writedonechan, b);
       +        }
       +}
   DIR diff --git a/src/cmd/venti/findscore.c b/src/cmd/venti/findscore.c
       t@@ -18,16 +18,6 @@ clumpinfoeq(ClumpInfo *c, ClumpInfo *d)
                        && scorecmp(c->score, d->score)==0;
        }
        
       -/*
       - * synchronize the clump info directory with
       - * with the clumps actually stored in the arena.
       - * the directory should be at least as up to date
       - * as the arena's trailer.
       - *
       - * checks/updates at most n clumps.
       - *
       - * returns 1 if ok, -1 if an error occured, 0 if blocks were updated
       - */
        int
        findscore(Arena *arena, uchar *score)
        {
   DIR diff --git a/src/cmd/venti/fns.h b/src/cmd/venti/fns.h
       t@@ -14,6 +14,7 @@ int                clumpinfoeq(ClumpInfo *c, ClumpInfo *d);
        int                clumpinfoeq(ClumpInfo *c, ClumpInfo *d);
        u32int                clumpmagic(Arena *arena, u64int aa);
        int                delarena(Arena *arena);
       +void                dirtydblock(DBlock*, int);
        void                *emalloc(ulong);
        void                *erealloc(void *, ulong);
        char                *estrdup(char*);
       t@@ -21,6 +22,8 @@ void                *ezmalloc(ulong);
        Arena                *findarena(char *name);
        ISect                *findisect(Index *ix, u32int buck);
        int                flushciblocks(Arena *arena);
       +void                flushdcache(void);
       +void                flushqueue(void);
        void                fmtzbinit(Fmt *f, ZBlock *b);
        void                freearena(Arena *arena);
        void                freearenapart(ArenaPart *ap, int freearenas);
       t@@ -90,7 +93,6 @@ void                printindex(int fd, Index *ix);
        void                printstats(void);
        void                putdblock(DBlock *b);
        void                putlump(Lump *b);
       -void                queueflush(void);
        int                queuewrite(Lump *b, Packet *p, int creator);
        u32int                readarena(Arena *arena, u64int aa, u8int *buf, long n);
        int                readarenamap(AMapN *amn, Part *part, u64int base, u32int size);
   DIR diff --git a/src/cmd/venti/httpd.c b/src/cmd/venti/httpd.c
       t@@ -137,12 +137,14 @@ httpproc(void *v)
                c = v;
        
                for(t = 15*60*1000; ; t = 15*1000){
       -                if(hparsereq(c, t) <= 0)
       +fprint(2, "httpd: get headers\n");
       +                if(hparsereq(c, t) < 0)
                                break;
        
                        ok = -1;
                        for(i = 0; i < MaxObjs && objs[i].name[0]; i++){
                                if(strcmp(c->req.uri, objs[i].name) == 0){
       +fprint(2, "httpd: call function %p\n", objs[i].f);
                                        ok = (*objs[i].f)(c);
                                        break;
                                }
       t@@ -180,7 +182,10 @@ static int
        preq(HConnect *c)
        {
                if(hparseheaders(c, 15*60*1000) < 0)
       +{
       +fprint(2, "hparseheaders failed\n");
                        return -1;
       +}
                if(strcmp(c->req.meth, "GET") != 0
                && strcmp(c->req.meth, "HEAD") != 0)
                        return hunallowed(c, "GET, HEAD");
       t@@ -196,7 +201,7 @@ preqtext(HConnect *c)
                int r;
        
                r = preq(c);
       -        if(r <= 0)
       +        if(r < 0)
                        return r;
        
                hout = &c->hout;
       t@@ -221,7 +226,7 @@ notfound(HConnect *c)
                int r;
        
                r = preq(c);
       -        if(r <= 0)
       +        if(r < 0)
                        return r;
                return hfail(c, HNotFound, c->req.uri);
        }
       t@@ -233,9 +238,13 @@ estats(HConnect *c)
                int r;
        
                r = preqtext(c);
       -        if(r <= 0)
       +        if(r < 0)
       +{
       +fprint(2, "preqtext failed\n");
                        return r;
       +}
        
       +fprint(2, "write stats\n");
                hout = &c->hout;
                hprint(hout, "lump writes=%,ld\n", stats.lumpwrites);
                hprint(hout, "lump reads=%,ld\n", stats.lumpreads);
       t@@ -268,13 +277,21 @@ estats(HConnect *c)
                hprint(hout, "disk cache misses=%,ld\n", stats.pcmiss);
                hprint(hout, "disk cache reads=%,ld\n", stats.pcreads);
                hprint(hout, "disk cache bytes read=%,lld\n", stats.pcbreads);
       +fprint(2, "write new stats\n");
       +        hprint(hout, "disk cache writes=%,ld\n", stats.dirtydblocks);
       +        hprint(hout, "disk cache writes absorbed=%,ld %d%%\n", stats.absorbedwrites,
       +                percent(stats.absorbedwrites, stats.dirtydblocks));
       +
       +fprint(2, "back to old stats\n");
        
                hprint(hout, "disk writes=%,ld\n", stats.diskwrites);
                hprint(hout, "disk bytes written=%,lld\n", stats.diskbwrites);
                hprint(hout, "disk reads=%,ld\n", stats.diskreads);
                hprint(hout, "disk bytes read=%,lld\n", stats.diskbreads);
        
       +fprint(2, "hflush stats\n");
                hflush(hout);
       +fprint(2, "done with stats\n");
                return 0;
        }
        
       t@@ -288,7 +305,7 @@ sindex(HConnect *c)
                int i, r, active;
        
                r = preqtext(c);
       -        if(r <= 0)
       +        if(r < 0)
                        return r;
                hout = &c->hout;
        
       t@@ -348,7 +365,7 @@ dindex(HConnect *c)
                int i, r;
        
                r = preqtext(c);
       -        if(r <= 0)
       +        if(r < 0)
                        return r;
                hout = &c->hout;
        
       t@@ -376,7 +393,7 @@ xindex(HConnect *c)
                int r;
        
                r = preq(c);
       -        if(r <= 0)
       +        if(r < 0)
                        return r;
        
                hout = &c->hout;
   DIR diff --git a/src/cmd/venti/index.c b/src/cmd/venti/index.c
       t@@ -624,12 +624,14 @@ storeientry(Index *ix, IEntry *ie)
                        h = bucklook(ie->score, ie->ia.type, ib.data, ib.n);
                        if(h & 1){
                                h ^= 1;
       +                        dirtydblock(b, DirtyIndex);
                                packientry(ie, &ib.data[h]);
                                ok = writebucket(is, buck, &ib, b);
                                break;
                        }
        
                        if(ib.n < is->buckmax){
       +                        dirtydblock(b, DirtyIndex);
                                memmove(&ib.data[h + IEntrySize], &ib.data[h], ib.n * IEntrySize - h);
                                ib.n++;
        
       t@@ -648,14 +650,19 @@ storeientry(Index *ix, IEntry *ie)
        static int
        writebucket(ISect *is, u32int buck, IBucket *ib, DBlock *b)
        {
       -        if(buck >= is->blocks)
       +        assert(b->dirty == DirtyIndex);
       +
       +        if(buck >= is->blocks){
                        seterr(EAdmin, "index write out of bounds: %d >= %d\n",
                                        buck, is->blocks);
       +                return -1;
       +        }
                qlock(&stats.lock);
                stats.indexwrites++;
                qunlock(&stats.lock);
                packibucket(ib, b->data);
       -        return writepart(is->part, is->blockbase + ((u64int)buck << is->blocklog), b->data, is->blocksize);
       +        // return writepart(is->part, is->blockbase + ((u64int)buck << is->blocklog), b->data, is->blocksize);
       +        return 0;
        }
        
        /*
   DIR diff --git a/src/cmd/venti/lumpqueue.c b/src/cmd/venti/lumpqueue.c
       t@@ -97,7 +97,7 @@ queuewrite(Lump *u, Packet *p, int creator)
        }
        
        void
       -queueflush(void)
       +flushqueue(void)
        {
                int i;
                LumpQueue *q;
   DIR diff --git a/src/cmd/venti/mkfile b/src/cmd/venti/mkfile
       t@@ -32,22 +32,15 @@ LIBOFILES=\
        
        SLIB=libvs.a
        
       -LIB=$SLIB\
       -        $PLAN9/lib/libventi.a\
       -        $PLAN9/lib/libhttpd.a\
       -        $PLAN9/lib/libbin.a\
       -        $PLAN9/lib/libsec.a\
       -        $PLAN9/lib/libthread.a\
       -        $PLAN9/lib/lib9.a\
       -        $PLAN9/lib/libfmt.a\
       -        $PLAN9/lib/libutf.a\
       +LIB=$SLIB
       +SHORTLIB=venti httpd bin sec thread 9
        
        HFILES=        dat.h\
                fns.h\
                stdinc.h\
        
        TARG=\
       -#        venti\
       +        venti\
                fmtarenas\
                fmtisect\
                fmtindex\
       t@@ -64,6 +57,7 @@ TARG=\
                read\
                write\
                copy\
       +        wb\
        
        BIN=$BIN/venti
        
       t@@ -82,4 +76,3 @@ ainstall:V: ${TARG:%=%.ainstall}
        %.ainstall:V:        $O.%
                scp $prereq amsterdam:/usr/local/bin/venti/$stem
        
       -LDFLAGS=$LDFLAGS -l9
   DIR diff --git a/src/cmd/venti/part.c b/src/cmd/venti/part.c
       t@@ -12,7 +12,7 @@ initpart(char *name, int writable)
                Dir *dir;
                int how;
        
       -        part = MK(Part);
       +        part = MKZ(Part);
                part->name = estrdup(name);
                if(!writable && readonly)
                        how = OREAD;
   DIR diff --git a/src/cmd/venti/stats.c b/src/cmd/venti/stats.c
       t@@ -57,8 +57,13 @@ printstats(void)
                fprint(2, "disk cache reads=%,ld\n", stats.pcreads);
                fprint(2, "disk cache bytes read=%,lld\n", stats.pcbreads);
        
       +        fprint(2, "disk cache writes=%,ld\n", stats.dirtydblocks);
       +        fprint(2, "disk cache writes absorbed=%,ld %d%%\n", stats.absorbedwrites,
       +                percent(stats.absorbedwrites, stats.dirtydblocks));
       +
                fprint(2, "disk writes=%,ld\n", stats.diskwrites);
                fprint(2, "disk bytes written=%,lld\n", stats.diskbwrites);
                fprint(2, "disk reads=%,ld\n", stats.diskreads);
                fprint(2, "disk bytes read=%,lld\n", stats.diskbreads);
       +
        }
   DIR diff --git a/src/cmd/venti/venti.c b/src/cmd/venti/venti.c
       t@@ -139,6 +139,7 @@ ventiserver(char *addr)
                Packet *p;
                VtReq *r;
                VtSrv *s;
       +        char err[ERRMAX];
        
                s = vtlisten(addr);
                if(s == nil)
       t@@ -153,17 +154,22 @@ ventiserver(char *addr)
                                vtrerror(r, "unknown request");
                                break;
                        case VtTread:
       -                        if((r->rx.data = readlump(r->tx.score, r->tx.dtype, r->tx.count)) == nil)
       -                                vtrerror(r, gerrstr());
       +                        if((r->rx.data = readlump(r->tx.score, r->tx.dtype, r->tx.count)) == nil){
       +                                rerrstr(err, sizeof err);
       +                                vtrerror(r, err);
       +                        }
                                break;
                        case VtTwrite:
                                p = r->tx.data;
                                r->tx.data = nil;
       -                        if(writelump(p, r->rx.score, r->tx.dtype, 0) < 0)        
       -                                vtrerror(r, gerrstr());
       +                        if(writelump(p, r->rx.score, r->tx.dtype, 0) < 0){
       +                                rerrstr(err, sizeof err);
       +                                vtrerror(r, err);
       +                        }
                                break;
                        case VtTsync:
       -                        queueflush();
       +                        flushqueue();
       +                        flushdcache();
                                break;
                        }
                        vtrespond(r);