URI: 
       tbuildindex.c - plan9port - [fork] Plan 9 from user space
  HTML git clone git://src.adamsgaard.dk/plan9port
   DIR Log
   DIR Files
   DIR Refs
   DIR README
   DIR LICENSE
       ---
       tbuildindex.c (22561B)
       ---
            1 /*
            2  * Rebuild the index from scratch, in place.
            3  */
            4 #include "stdinc.h"
            5 #include "dat.h"
            6 #include "fns.h"
            7 
/*
 * Buffer-size bounds.  NOTE(review): neither constant is referenced
 * in this chunk -- presumably they bound the per-section read buffer
 * in code elsewhere in the file (e.g. isectproc); confirm before
 * relying on this.
 */
enum
{
	MinBufSize = 64*1024,
	MaxBufSize = 4*1024*1024,
};
           13 
/*
 * Batch of index entries, the unit sent over an index section's
 * writechan (see arenapartproc/threadmain).  Batching amortizes the
 * per-send channel cost.
 */
typedef struct IEntryBuf IEntryBuf;
struct IEntryBuf
{
	IEntry ie[100];	/* accumulated entries */
	int nie;	/* number of valid entries in ie[]; 0 marks end of input */
};
           20 
/*
 * Batch of scores awaiting insertion into the bloom filter, so
 * markbloomfiltern is called once per 100 scores rather than per clump.
 */
typedef struct ScoreBuf ScoreBuf;
struct ScoreBuf
{
	uchar score[100][VtScoreSize];	/* pending scores */
	int nscore;			/* number of valid rows in score[][] */
};
           27 
int		dumb;		/* -d: debugging, force all passes */
int		errors;		/* set on any I/O error noticed during the rebuild */
char		**isect;	/* -i: names of index sections to rebuild (nil'ed as matched) */
int		nisect;		/* number of entries in isect[] */
int		bloom;		/* rebuild the bloom filter too */
int		zero;		/* zero out bucket ranges that received no entries (see sortminibuffer) */

u32int		isectmem;	/* memory budget per index section (imem/nsects) */
u64int		totalbuckets;	/* sum of bucket counts over all index sections */
u64int		totalclumps;	/* sum of clump counts over all arenas */
Channel		*arenadonechan;	/* arena procs send here when finished */
Channel		*isectdonechan;	/* index-section procs send here when finished */
Index		*ix;		/* the index being rebuilt (mainindex) */

/* shared counters, updated under lock via add() */
u64int		arenaentries;	/* clumps seen in arenas */
u64int		skipentries;	/* corrupt clumps skipped */
u64int		indexentries;	/* entries actually written to the index */

static int shouldprocess(ISect*);
static void	isectproc(void*);
static void	arenapartproc(void*);
           49 
/* Print the usage message and terminate all threads. */
void
usage(void)
{
	fprint(2, "usage: buildindex [-bd] [-i isect]... [-M imem] venti.conf\n");
	threadexitsall("usage");
}
           56 
/*
 * Entry point (plan 9 thread library).  Parses options, opens the
 * configured venti index read-only, then runs one proc per arena
 * partition (arenapartproc) feeding one proc per index section
 * (isectproc) over per-section channels, and waits for both sets
 * to drain before exiting.
 */
void
threadmain(int argc, char *argv[])
{
	int fd, i, napart, nfinish, maxdisks;
	u32int bcmem, imem;
	Config conf;
	Part *p;

	maxdisks = 100000;	/* effectively unlimited unless -m is given */
	ventifmtinstall();
	imem = 256*1024*1024;	/* default total index buffer memory (-M) */
	ARGBEGIN{
	case 'b':
		bloom = 1;
		break;
	case 'd':	/* debugging - make sure to run all 3 passes */
		dumb = 1;
		break;
	case 'i':	/* restrict rebuild to named index sections; repeatable */
		isect = vtrealloc(isect, (nisect+1)*sizeof(isect[0]));
		isect[nisect++] = EARGF(usage());
		break;
	case 'M':
		imem = unittoull(EARGF(usage()));
		break;
	case 'm':	/* temporary - might go away */
		maxdisks = atoi(EARGF(usage()));
		break;
	default:
		usage();
		break;
	}ARGEND

	if(argc != 1)
		usage();

	if(initventi(argv[0], &conf) < 0)
		sysfatal("can't init venti: %r");
	ix = mainindex;
	/* a full rebuild (no -i) implies rebuilding the bloom filter, if one exists */
	if(nisect == 0 && ix->bloom)
		bloom = 1;
	if(bloom && ix->bloom && resetbloom(ix->bloom) < 0)
		sysfatal("loadbloom: %r");
	if(bloom && !ix->bloom)
		sysfatal("-b specified but no bloom filter");
	if(!bloom)
		ix->bloom = nil;
	/* split the -M memory budget evenly across index sections */
	isectmem = imem/ix->nsects;

	/*
	 * safety first - only need read access to arenas
	 */
	p = nil;
	for(i=0; i<ix->narenas; i++){
		if(ix->arenas[i]->part != p){
			p = ix->arenas[i]->part;
			if((fd = open(p->filename, OREAD)) < 0)
				sysfatal("cannot reopen %s: %r", p->filename);
			/* swap the read-only fd in place so the existing Part stays valid */
			dup(fd, p->fd);
			close(fd);
		}
	}

	/*
	 * need a block for every arena
	 */
	bcmem = maxblocksize * (mainindex->narenas + 16);
	if(0) fprint(2, "initialize %d bytes of disk block cache\n", bcmem);
	initdcache(bcmem);

	totalclumps = 0;
	for(i=0; i<ix->narenas; i++)
		totalclumps += ix->arenas[i]->diskstats.clumps;

	totalbuckets = 0;
	for(i=0; i<ix->nsects; i++)
		totalbuckets += ix->sects[i]->blocks;
	fprint(2, "%,lld clumps, %,lld buckets\n", totalclumps, totalbuckets);

	/* start index procs */
	fprint(2, "%T read index\n");
	isectdonechan = chancreate(sizeof(void*), 1);
	for(i=0; i<ix->nsects; i++){
		if(shouldprocess(ix->sects[i])){
			ix->sects[i]->writechan = chancreate(sizeof(IEntryBuf), 1);
			vtproc(isectproc, ix->sects[i]);
		}
	}

	/* any -i names not consumed by shouldprocess were never found */
	for(i=0; i<nisect; i++)
		if(isect[i])
			fprint(2, "warning: did not find index section %s\n", isect[i]);

	/* start arena procs, one per partition; -m throttles concurrent disks */
	p = nil;
	napart = 0;
	nfinish = 0;
	arenadonechan = chancreate(sizeof(void*), 0);
	for(i=0; i<ix->narenas; i++){
		if(ix->arenas[i]->part != p){
			p = ix->arenas[i]->part;
			vtproc(arenapartproc, p);
			if(++napart >= maxdisks){
				recvp(arenadonechan);
				nfinish++;
			}
		}
	}

	/* wait for arena procs to finish */
	for(; nfinish<napart; nfinish++)
		recvp(arenadonechan);

	/*
	 * tell index procs to finish: send(c, nil) delivers a zeroed
	 * IEntryBuf (nie == 0) -- presumably isectproc (not visible in
	 * this chunk) treats that as end of input; verify there.
	 */
	for(i=0; i<ix->nsects; i++)
		if(ix->sects[i]->writechan)
			send(ix->sects[i]->writechan, nil);

	/* wait for index procs to finish */
	for(i=0; i<ix->nsects; i++)
		if(ix->sects[i]->writechan)
			recvp(isectdonechan);

	if(ix->bloom && writebloom(ix->bloom) < 0)
		fprint(2, "writing bloom filter: %r\n");

	fprint(2, "%T done arenaentries=%,lld indexed=%,lld (nskip=%,lld)\n",
		arenaentries, indexentries, skipentries);
	threadexitsall(nil);
}
          187 
          188 static int
          189 shouldprocess(ISect *is)
          190 {
          191         int i;
          192 
          193         if(nisect == 0)
          194                 return 1;
          195 
          196         for(i=0; i<nisect; i++)
          197                 if(isect[i] && strcmp(isect[i], is->name) == 0){
          198                         isect[i] = nil;
          199                         return 1;
          200                 }
          201         return 0;
          202 }
          203 
/*
 * Atomically add n to the shared counter *a.  Serializes updates
 * from the concurrent arena procs (arenaentries, skipentries, ...).
 */
static void
add(u64int *a, u64int n)
{
	static Lock l;

	lock(&l);
	*a += n;
	unlock(&l);
}
          213 
/*
 * Read through an arena partition and send each of its IEntries
 * to the appropriate index section.  When finished, send on
 * arenadonechan.
 */
enum
{
	ClumpChunks = 32*1024,	/* clump infos fetched per directory read */
};
static void
arenapartproc(void *v)
{
	int i, j, n, nskip, x;
	u32int clump;
	u64int addr, tot;
	Arena *a;
	ClumpInfo *ci, *cis;
	IEntry ie;
	Part *p;
	IEntryBuf *buf, *b;
	uchar *score;
	ScoreBuf sb;

	p = v;
	threadsetname("arenaproc %s", p->name);
	/* one outgoing batch buffer per index section */
	buf = MKNZ(IEntryBuf, ix->nsects);

	nskip = 0;
	tot = 0;
	sb.nscore = 0;
	cis = MKN(ClumpInfo, ClumpChunks);
	for(i=0; i<ix->narenas; i++){
		a = ix->arenas[i];
		if(a->part != p)
			continue;	/* this proc only handles arenas on partition p */
		if(a->memstats.clumps)
			fprint(2, "%T arena %s: %d entries\n",
				a->name, a->memstats.clumps);
		/*
		 * Running the loop backwards accesses the
		 * clump info blocks forwards, since they are
		 * stored in reverse order at the end of the arena.
		 * This speeds things slightly.
		 */
		addr = ix->amap[i].start + a->memstats.used;
		for(clump=a->memstats.clumps; clump > 0; clump-=n){
			n = ClumpChunks;
			if(n > clump)
				n = clump;
			if(readclumpinfos(a, clump-n, cis, n) != n){
				fprint(2, "%T arena %s: directory read: %r\n", a->name);
				errors = 1;
				break;
			}
			for(j=n-1; j>=0; j--){
				ci = &cis[j];
				ie.ia.type = ci->type;
				ie.ia.size = ci->uncsize;
				/* walking backwards: subtract to get this clump's start address */
				addr -= ci->size + ClumpSize;
				ie.ia.addr = addr;
				ie.ia.blocks = (ci->size + ClumpSize + (1<<ABlockLog)-1) >> ABlockLog;
				scorecp(ie.score, ci->score);
				if(ci->type == VtCorruptType)
					nskip++;
				else{
					tot++;
					x = indexsect(ix, ie.score);
					assert(0 <= x && x < ix->nsects);
					if(ix->sects[x]->writechan) {
						/* batch entries; flush when the section's buffer fills */
						b = &buf[x];
						b->ie[b->nie] = ie;
						b->nie++;
						if(b->nie == nelem(b->ie)) {
							send(ix->sects[x]->writechan, b);
							b->nie = 0;
						}
					}
					if(ix->bloom) {
						/* batch bloom-filter marks the same way */
						score = sb.score[sb.nscore++];
						scorecp(score, ie.score);
						if(sb.nscore == nelem(sb.score)) {
							markbloomfiltern(ix->bloom, sb.score, sb.nscore);
							sb.nscore = 0;
						}
					}
				}
			}
		}
		/* after all clumps, addr should have walked back to the arena start */
		if(addr != ix->amap[i].start)
			fprint(2, "%T arena %s: clump miscalculation %lld != %lld\n", a->name, addr, ix->amap[i].start);
	}
	add(&arenaentries, tot);
	add(&skipentries, nskip);

	/* flush any partially filled batches */
	for(i=0; i<ix->nsects; i++)
		if(ix->sects[i]->writechan && buf[i].nie > 0)
			send(ix->sects[i]->writechan, &buf[i]);
	free(buf);
	free(cis);
	if(ix->bloom && sb.nscore > 0)
		markbloomfiltern(ix->bloom, sb.score, sb.nscore);
	sendp(arenadonechan, p);
}
          317 
          318 /*
          319  * Convert score into relative bucket number in isect.
          320  * Can pass a packed ientry instead of score - score is first.
          321  */
          322 static u32int
          323 score2bucket(ISect *is, uchar *score)
          324 {
          325         u32int b;
          326 
          327         b = hashbits(score, 32)/ix->div;
          328         if(b < is->start || b >= is->stop){
          329                 fprint(2, "score2bucket: score=%V div=%d b=%ud start=%ud stop=%ud\n",
          330                         score, ix->div, b, is->start, is->stop);
          331         }
          332         assert(is->start <= b && b < is->stop);
          333         return b - is->start;
          334 }
          335 
          336 /*
          337  * Convert offset in index section to bucket number.
          338  */
          339 static u32int
          340 offset2bucket(ISect *is, u64int offset)
          341 {
          342         u32int b;
          343 
          344         assert(is->blockbase <= offset);
          345         offset -= is->blockbase;
          346         b = offset/is->blocksize;
          347         assert(b < is->stop-is->start);
          348         return b;
          349 }
          350 
          351 /*
          352  * Convert bucket number to offset.
          353  */
          354 static u64int
          355 bucket2offset(ISect *is, u32int b)
          356 {
          357         assert(b <= is->stop-is->start);
          358         return is->blockbase + (u64int)b*is->blocksize;
          359 }
          360 
/*
 * IEntry buffers to hold initial round of spraying.
 * One Buf accumulates packed entries for a region of an index
 * section partition; bflush writes full blocks, bwrite appends
 * single entries.
 */
typedef struct Buf Buf;
struct Buf
{
	Part *part;			/* partition being written */
	uchar *bp;		/* current block */
	uchar *ep;		/* end of block */
	uchar *wp;		/* write position in block */
	u64int boffset;		/* start offset */
	u64int woffset;		/* next write offset */
	u64int eoffset;		/* end offset */
	u32int nentry;		/* number of entries written */
};
          376 
          377 static void
          378 bflush(Buf *buf)
          379 {
          380         u32int bufsize;
          381 
          382         if(buf->woffset >= buf->eoffset)
          383                 sysfatal("buf index chunk overflow - need bigger index");
          384         bufsize = buf->ep - buf->bp;
          385         if(writepart(buf->part, buf->woffset, buf->bp, bufsize) < 0){
          386                 fprint(2, "write %s: %r\n", buf->part->name);
          387                 errors = 1;
          388         }
          389         buf->woffset += bufsize;
          390         memset(buf->bp, 0, bufsize);
          391         buf->wp = buf->bp;
          392 }
          393 
          394 static void
          395 bwrite(Buf *buf, IEntry *ie)
          396 {
          397         if(buf->wp+IEntrySize > buf->ep)
          398                 bflush(buf);
          399         assert(buf->bp <= buf->wp && buf->wp < buf->ep);
          400         packientry(ie, buf->wp);
          401         buf->wp += IEntrySize;
          402         assert(buf->bp <= buf->wp && buf->wp <= buf->ep);
          403         buf->nentry++;
          404 }
          405 
/*
 * Minibuffer.  In-memory data structure holds our place
 * in the buffer but has no block data.  We are writing and
 * reading the minibuffers at the same time.  (Careful!)
 */
typedef struct Minibuf Minibuf;
struct Minibuf
{
	u64int boffset;		/* start offset */
	u64int roffset;		/* read offset */
	u64int woffset;		/* write offset */
	u64int eoffset;		/* end offset */
	u32int nentry;		/* # entries left to read */
	u32int nwentry;	/* # entries written */
};
          421 
/*
 * Index entry pool.  Used when trying to shuffle around
 * the entries in a big buffer into the corresponding M minibuffers.
 * Sized to hold M*EntriesPerBlock entries, so that there will always
 * either be room in the pool for another block worth of entries
 * or there will be an entire block worth of sorted entries to
 * write out.
 */
typedef struct IEntryLink IEntryLink;
typedef struct IPool IPool;

/* one pooled entry: raw packed bytes plus free/minibuf-list linkage */
struct IEntryLink
{
	uchar ie[IEntrySize];		/* raw IEntry */
	IEntryLink *next;		/* next in chain */
};

struct IPool
{
	ISect *isect;			/* index section being shuffled */
	u32int buck0;			/* first bucket in pool */
	u32int mbufbuckets;	/* buckets per minibuf */
	IEntryLink *entry;		/* all IEntryLinks */
	u32int nentry;			/* # of IEntryLinks */
	IEntryLink *free;		/* free list */
	u32int nfree;			/* # on free list */
	Minibuf *mbuf;			/* all minibufs */
	u32int nmbuf;			/* # of minibufs */
	IEntryLink **mlist;		/* lists for each minibuf */
	u32int *mcount;		/* # on each mlist[i] */
	u32int bufsize;			/* block buffer size */
	uchar *rbuf;			/* read buffer */
	uchar *wbuf;			/* write buffer */
	u32int epbuf;			/* entries per block buffer */
};
          457 
          458 /*
          459 static int
          460 countsokay(IPool *p)
          461 {
          462         int i;
          463         u64int n;
          464 
          465         n = 0;
          466         for(i=0; i<p->nmbuf; i++)
          467                 n += p->mcount[i];
          468         n += p->nfree;
          469         if(n != p->nentry){
          470                 print("free %ud:", p->nfree);
          471                 for(i=0; i<p->nmbuf; i++)
          472                         print(" %ud", p->mcount[i]);
          473                 print(" = %lld nentry: %ud\n", n, p->nentry);
          474         }
          475         return n == p->nentry;
          476 }
          477 */
          478 
          479 static IPool*
          480 mkipool(ISect *isect, Minibuf *mbuf, u32int nmbuf,
          481         u32int mbufbuckets, u32int bufsize)
          482 {
          483         u32int i, nentry;
          484         uchar *data;
          485         IPool *p;
          486         IEntryLink *l;
          487 
          488         nentry = (nmbuf+1)*bufsize / IEntrySize;
          489         p = ezmalloc(sizeof(IPool)
          490                 +nentry*sizeof(IEntry)
          491                 +nmbuf*sizeof(IEntryLink*)
          492                 +nmbuf*sizeof(u32int)
          493                 +3*bufsize);
          494 
          495         p->isect = isect;
          496         p->mbufbuckets = mbufbuckets;
          497         p->bufsize = bufsize;
          498         p->entry = (IEntryLink*)(p+1);
          499         p->nentry = nentry;
          500         p->mlist = (IEntryLink**)(p->entry+nentry);
          501         p->mcount = (u32int*)(p->mlist+nmbuf);
          502         p->nmbuf = nmbuf;
          503         p->mbuf = mbuf;
          504         data = (uchar*)(p->mcount+nmbuf);
          505         data += bufsize - (uintptr)data%bufsize;
          506         p->rbuf = data;
          507         p->wbuf = data+bufsize;
          508         p->epbuf = bufsize/IEntrySize;
          509 
          510         for(i=0; i<p->nentry; i++){
          511                 l = &p->entry[i];
          512                 l->next = p->free;
          513                 p->free = l;
          514                 p->nfree++;
          515         }
          516         return p;
          517 }
          518 
          519 /*
          520  * Add the index entry ie to the pool p.
          521  * Caller must know there is room.
          522  */
          523 static void
          524 ipoolinsert(IPool *p, uchar *ie)
          525 {
          526         u32int buck, x;
          527         IEntryLink *l;
          528 
          529         assert(p->free != nil);
          530 
          531         buck = score2bucket(p->isect, ie);
          532         x = (buck-p->buck0) / p->mbufbuckets;
          533         if(x >= p->nmbuf){
          534                 fprint(2, "buck=%ud mbufbucket=%ud x=%ud\n",
          535                         buck, p->mbufbuckets, x);
          536         }
          537         assert(x < p->nmbuf);
          538 
          539         l = p->free;
          540         p->free = l->next;
          541         p->nfree--;
          542         memmove(l->ie, ie, IEntrySize);
          543         l->next = p->mlist[x];
          544         p->mlist[x] = l;
          545         p->mcount[x]++;
          546 }
          547 
          548 /*
          549  * Pull out a block containing as many
          550  * entries as possible for minibuffer x.
          551  */
          552 static u32int
          553 ipoolgetbuf(IPool *p, u32int x)
          554 {
          555         uchar *bp, *ep, *wp;
          556         IEntryLink *l;
          557         u32int n;
          558 
          559         bp = p->wbuf;
          560         ep = p->wbuf + p->bufsize;
          561         n = 0;
          562         assert(x < p->nmbuf);
          563         for(wp=bp; wp+IEntrySize<=ep && p->mlist[x]; wp+=IEntrySize){
          564                 l = p->mlist[x];
          565                 p->mlist[x] = l->next;
          566                 p->mcount[x]--;
          567                 memmove(wp, l->ie, IEntrySize);
          568                 l->next = p->free;
          569                 p->free = l;
          570                 p->nfree++;
          571                 n++;
          572         }
          573         memset(wp, 0, ep-wp);
          574         return n;
          575 }
          576 
          577 /*
          578  * Read a block worth of entries from the minibuf
          579  * into the pool.  Caller must know there is room.
          580  */
          581 static void
          582 ipoolloadblock(IPool *p, Minibuf *mb)
          583 {
          584         u32int i, n;
          585 
          586         assert(mb->nentry > 0);
          587         assert(mb->roffset >= mb->woffset);
          588         assert(mb->roffset < mb->eoffset);
          589 
          590         n = p->bufsize/IEntrySize;
          591         if(n > mb->nentry)
          592                 n = mb->nentry;
          593         if(readpart(p->isect->part, mb->roffset, p->rbuf, p->bufsize) < 0)
          594                 fprint(2, "readpart %s: %r\n", p->isect->part->name);
          595         else{
          596                 for(i=0; i<n; i++)
          597                         ipoolinsert(p, p->rbuf+i*IEntrySize);
          598         }
          599         mb->nentry -= n;
          600         mb->roffset += p->bufsize;
          601 }
          602 
          603 /*
          604  * Write out a block worth of entries to minibuffer x.
          605  * If necessary, pick up the data there before overwriting it.
          606  */
          607 static void
          608 ipoolflush0(IPool *pool, u32int x)
          609 {
          610         u32int bufsize;
          611         Minibuf *mb;
          612 
          613         mb = pool->mbuf+x;
          614         bufsize = pool->bufsize;
          615         mb->nwentry += ipoolgetbuf(pool, x);
          616         if(mb->nentry > 0 && mb->roffset == mb->woffset){
          617                 assert(pool->nfree >= pool->bufsize/IEntrySize);
          618                 /*
          619                  * There will be room in the pool -- we just
          620                  * removed a block worth.
          621                  */
          622                 ipoolloadblock(pool, mb);
          623         }
          624         if(writepart(pool->isect->part, mb->woffset, pool->wbuf, bufsize) < 0)
          625                 fprint(2, "writepart %s: %r\n", pool->isect->part->name);
          626         mb->woffset += bufsize;
          627 }
          628 
          629 /*
          630  * Write out some full block of entries.
          631  * (There must be one -- the pool is almost full!)
          632  */
          633 static void
          634 ipoolflush1(IPool *pool)
          635 {
          636         u32int i;
          637 
          638         assert(pool->nfree <= pool->epbuf);
          639 
          640         for(i=0; i<pool->nmbuf; i++){
          641                 if(pool->mcount[i] >= pool->epbuf){
          642                         ipoolflush0(pool, i);
          643                         return;
          644                 }
          645         }
          646         /* can't be reached - someone must be full */
          647         sysfatal("ipoolflush1");
          648 }
          649 
          650 /*
          651  * Flush all the entries in the pool out to disk.
          652  * Nothing more to read from disk.
          653  */
          654 static void
          655 ipoolflush(IPool *pool)
          656 {
          657         u32int i;
          658 
          659         for(i=0; i<pool->nmbuf; i++)
          660                 while(pool->mlist[i])
          661                         ipoolflush0(pool, i);
          662         assert(pool->nfree == pool->nentry);
          663 }
          664 
          665 /*
          666  * Third pass.  Pick up each minibuffer from disk into
          667  * memory and then write out the buckets.
          668  */
          669 
          670 /*
          671  * Compare two packed index entries.
          672  * Usual ordering except break ties by putting higher
          673  * index addresses first (assumes have duplicates
          674  * due to corruption in the lower addresses).
          675  */
          676 static int
          677 ientrycmpaddr(const void *va, const void *vb)
          678 {
          679         int i;
          680         uchar *a, *b;
          681 
          682         a = (uchar*)va;
          683         b = (uchar*)vb;
          684         i = ientrycmp(a, b);
          685         if(i)
          686                 return i;
          687         return -memcmp(a+IEntryAddrOff, b+IEntryAddrOff, 8);
          688 }
          689 
          690 static void
          691 zerorange(Part *p, u64int o, u64int e)
          692 {
          693         static uchar zero[MaxIoSize];
          694         u32int n;
          695 
          696         for(; o<e; o+=n){
          697                 n = sizeof zero;
          698                 if(o+n > e)
          699                         n = e-o;
          700                 if(writepart(p, o, zero, n) < 0)
          701                         fprint(2, "writepart %s: %r\n", p->name);
          702         }
          703 }
          704 
          705 /*
          706  * Load a minibuffer into memory and write out the
          707  * corresponding buckets.
          708  */
          709 static void
          710 sortminibuffer(ISect *is, Minibuf *mb, uchar *buf, u32int nbuf, u32int bufsize)
          711 {
          712         uchar *buckdata, *p, *q, *ep;
          713         u32int b, lastb, memsize, n;
          714         u64int o;
          715         IBucket ib;
          716         Part *part;
          717 
          718         part = is->part;
          719         buckdata = emalloc(is->blocksize);
          720 
          721         if(mb->nwentry == 0)
          722                 return;
          723 
          724         /*
          725          * read entire buffer.
          726          */
          727         assert(mb->nwentry*IEntrySize <= mb->woffset-mb->boffset);
          728         assert(mb->woffset-mb->boffset <= nbuf);
          729         if(readpart(part, mb->boffset, buf, mb->woffset-mb->boffset) < 0){
          730                 fprint(2, "readpart %s: %r\n", part->name);
          731                 errors = 1;
          732                 return;
          733         }
          734         assert(*(uint*)buf != 0xa5a5a5a5);
          735 
          736         /*
          737          * remove fragmentation due to IEntrySize
          738          * not evenly dividing Bufsize
          739          */
          740         memsize = (bufsize/IEntrySize)*IEntrySize;
          741         for(o=mb->boffset, p=q=buf; o<mb->woffset; o+=bufsize){
          742                 memmove(p, q, memsize);
          743                 p += memsize;
          744                 q += bufsize;
          745         }
          746         ep = buf + mb->nwentry*IEntrySize;
          747         assert(ep <= buf+nbuf);
          748 
          749         /*
          750          * sort entries
          751          */
          752         qsort(buf, mb->nwentry, IEntrySize, ientrycmpaddr);
          753 
          754         /*
          755          * write buckets out
          756          */
          757         n = 0;
          758         lastb = offset2bucket(is, mb->boffset);
          759         for(p=buf; p<ep; p=q){
          760                 b = score2bucket(is, p);
          761                 for(q=p; q<ep && score2bucket(is, q)==b; q+=IEntrySize)
          762                         ;
          763                 if(lastb+1 < b && zero)
          764                         zerorange(part, bucket2offset(is, lastb+1), bucket2offset(is, b));
          765                 if(IBucketSize+(q-p) > is->blocksize)
          766                         sysfatal("bucket overflow - make index bigger");
          767                 memmove(buckdata+IBucketSize, p, q-p);
          768                 ib.n = (q-p)/IEntrySize;
          769                 n += ib.n;
          770                 packibucket(&ib, buckdata, is->bucketmagic);
          771                 if(writepart(part, bucket2offset(is, b), buckdata, is->blocksize) < 0)
          772                         fprint(2, "write %s: %r\n", part->name);
          773                 lastb = b;
          774         }
          775         if(lastb+1 < is->stop-is->start && zero)
          776                 zerorange(part, bucket2offset(is, lastb+1), bucket2offset(is, is->stop - is->start));
          777 
          778         if(n != mb->nwentry)
          779                 fprint(2, "sortminibuffer bug: n=%ud nwentry=%ud have=%ld\n", n, mb->nwentry, (ep-buf)/IEntrySize);
          780 
          781         free(buckdata);
          782 }
          783 
          784 static void
          785 isectproc(void *v)
          786 {
          787         u32int buck, bufbuckets, bufsize, epbuf, i, j;
          788         u32int mbufbuckets, n, nbucket, nn, space;
          789         u32int nbuf, nminibuf, xminiclump, prod;
          790         u64int blocksize, offset, xclump;
          791         uchar *data, *p;
          792         Buf *buf;
          793         IEntry ie;
          794         IEntryBuf ieb;
          795         IPool *ipool;
          796         ISect *is;
          797         Minibuf *mbuf, *mb;
          798 
          799         is = v;
          800         blocksize = is->blocksize;
          801         nbucket = is->stop - is->start;
          802 
          803         /*
          804          * Three passes:
          805          *        pass 1 - write index entries from arenas into
          806          *                large sequential sections on index disk.
          807          *                requires nbuf * bufsize memory.
          808          *
          809          *        pass 2 - split each section into minibufs.
          810          *                requires nminibuf * bufsize memory.
          811          *
          812          *        pass 3 - read each minibuf into memory and
          813          *                write buckets out.
          814          *                requires entries/minibuf * IEntrySize memory.
          815          *
          816          * The larger we set bufsize the less seeking hurts us.
          817          *
          818          * The fewer sections and minibufs we have, the less
          819          * seeking hurts us.
          820          *
          821          * The fewer sections and minibufs we have, the
          822          * more entries we end up with in each minibuf
          823          * at the end.
          824          *
          825          * Shoot for using half our memory to hold each
          826          * minibuf.  The chance of a random distribution
          827          * getting off by 2x is quite low.
          828          *
          829          * Once that is decided, figure out the smallest
          830          * nminibuf and nsection/biggest bufsize we can use
          831          * and still fit in the memory constraints.
          832          */
          833 
          834         /* expected number of clump index entries we'll see */
          835         xclump = nbucket * (double)totalclumps/totalbuckets;
          836 
          837         /* number of clumps we want to see in a minibuf */
          838         xminiclump = isectmem/2/IEntrySize;
          839 
          840         /* total number of minibufs we need */
          841         prod = (xclump+xminiclump-1) / xminiclump;
          842 
          843         /* if possible, skip second pass */
          844         if(!dumb && prod*MinBufSize < isectmem){
          845                 nbuf = prod;
          846                 nminibuf = 1;
          847         }else{
          848                 /* otherwise use nsection = sqrt(nmini) */
          849                 for(nbuf=1; nbuf*nbuf<prod; nbuf++)
          850                         ;
          851                 if(nbuf*MinBufSize > isectmem)
          852                         sysfatal("not enough memory");
          853                 nminibuf = nbuf;
          854         }
          855         if (nbuf == 0) {
          856                 fprint(2, "%s: brand-new index, no work to do\n", argv0);
          857                 threadexitsall(nil);
          858         }
          859 
          860         /* size buffer to use extra memory */
          861         bufsize = MinBufSize;
          862         while(bufsize*2*nbuf <= isectmem && bufsize < MaxBufSize)
          863                 bufsize *= 2;
          864         data = emalloc(nbuf*bufsize);
          865         epbuf = bufsize/IEntrySize;
          866         fprint(2, "%T %s: %,ud buckets, %,ud groups, %,ud minigroups, %,ud buffer\n",
          867                 is->part->name, nbucket, nbuf, nminibuf, bufsize);
          868         /*
          869          * Accept index entries from arena procs.
          870          */
          871         buf = MKNZ(Buf, nbuf);
          872         p = data;
          873         offset = is->blockbase;
          874         bufbuckets = (nbucket+nbuf-1)/nbuf;
          875         for(i=0; i<nbuf; i++){
          876                 buf[i].part = is->part;
          877                 buf[i].bp = p;
          878                 buf[i].wp = p;
          879                 p += bufsize;
          880                 buf[i].ep = p;
          881                 buf[i].boffset = offset;
          882                 buf[i].woffset = offset;
          883                 if(i < nbuf-1){
          884                         offset += bufbuckets*blocksize;
          885                         buf[i].eoffset = offset;
          886                 }else{
          887                         offset = is->blockbase + nbucket*blocksize;
          888                         buf[i].eoffset = offset;
          889                 }
          890         }
          891         assert(p == data+nbuf*bufsize);
          892 
          893         n = 0;
          894         while(recv(is->writechan, &ieb) == 1){
          895                 if(ieb.nie == 0)
          896                         break;
          897                 for(j=0; j<ieb.nie; j++){
          898                         ie = ieb.ie[j];
          899                         buck = score2bucket(is, ie.score);
          900                         i = buck/bufbuckets;
          901                         assert(i < nbuf);
          902                         bwrite(&buf[i], &ie);
          903                         n++;
          904                 }
          905         }
          906         add(&indexentries, n);
          907 
          908         nn = 0;
          909         for(i=0; i<nbuf; i++){
          910                 bflush(&buf[i]);
          911                 buf[i].bp = nil;
          912                 buf[i].ep = nil;
          913                 buf[i].wp = nil;
          914                 nn += buf[i].nentry;
          915         }
          916         if(n != nn)
          917                 fprint(2, "isectproc bug: n=%ud nn=%ud\n", n, nn);
          918 
          919         free(data);
          920 
          921         fprint(2, "%T %s: reordering\n", is->part->name);
          922 
          923         /*
          924          * Rearrange entries into minibuffers and then
          925          * split each minibuffer into buckets.
          926          * The minibuffer must be sized so that it is
          927          * a multiple of blocksize -- ipoolloadblock assumes
          928          * that each minibuf starts aligned on a blocksize
          929          * boundary.
          930          */
          931         mbuf = MKN(Minibuf, nminibuf);
          932         mbufbuckets = (bufbuckets+nminibuf-1)/nminibuf;
          933         while(mbufbuckets*blocksize % bufsize)
          934                 mbufbuckets++;
          935         for(i=0; i<nbuf; i++){
          936                 /*
          937                  * Set up descriptors.
          938                  */
          939                 n = buf[i].nentry;
          940                 nn = 0;
          941                 offset = buf[i].boffset;
          942                 memset(mbuf, 0, nminibuf*sizeof(mbuf[0]));
          943                 for(j=0; j<nminibuf; j++){
          944                         mb = &mbuf[j];
          945                         mb->boffset = offset;
          946                         offset += mbufbuckets*blocksize;
          947                         if(offset > buf[i].eoffset)
          948                                 offset = buf[i].eoffset;
          949                         mb->eoffset = offset;
          950                         mb->roffset = mb->boffset;
          951                         mb->woffset = mb->boffset;
          952                         mb->nentry = epbuf * (mb->eoffset - mb->boffset)/bufsize;
          953                         if(mb->nentry > buf[i].nentry)
          954                                 mb->nentry = buf[i].nentry;
          955                         buf[i].nentry -= mb->nentry;
          956                         nn += mb->nentry;
          957                 }
          958                 if(n != nn)
          959                         fprint(2, "isectproc bug2: n=%ud nn=%ud (i=%d)\n", n, nn, i);;
          960                 /*
          961                  * Rearrange.
          962                  */
          963                 if(!dumb && nminibuf == 1){
          964                         mbuf[0].nwentry = mbuf[0].nentry;
          965                         mbuf[0].woffset = buf[i].woffset;
          966                 }else{
          967                         ipool = mkipool(is, mbuf, nminibuf, mbufbuckets, bufsize);
          968                         ipool->buck0 = bufbuckets*i;
          969                         for(j=0; j<nminibuf; j++){
          970                                 mb = &mbuf[j];
          971                                 while(mb->nentry > 0){
          972                                         if(ipool->nfree < epbuf){
          973                                                 ipoolflush1(ipool);
          974                                                 /* ipoolflush1 might change mb->nentry */
          975                                                 continue;
          976                                         }
          977                                         assert(ipool->nfree >= epbuf);
          978                                         ipoolloadblock(ipool, mb);
          979                                 }
          980                         }
          981                         ipoolflush(ipool);
          982                         nn = 0;
          983                         for(j=0; j<nminibuf; j++)
          984                                 nn += mbuf[j].nwentry;
          985                         if(n != nn)
          986                                 fprint(2, "isectproc bug3: n=%ud nn=%ud (i=%d)\n", n, nn, i);
          987                         free(ipool);
          988                 }
          989 
          990                 /*
          991                  * Make buckets.
          992                  */
          993                 space = 0;
          994                 for(j=0; j<nminibuf; j++)
          995                         if(space < mbuf[j].woffset - mbuf[j].boffset)
          996                                 space = mbuf[j].woffset - mbuf[j].boffset;
          997 
          998                 data = emalloc(space);
          999                 for(j=0; j<nminibuf; j++){
         1000                         mb = &mbuf[j];
         1001                         sortminibuffer(is, mb, data, space, bufsize);
         1002                 }
         1003                 free(data);
         1004         }
         1005 
         1006         sendp(isectdonechan, is);
         1007 }