URI: 
       trunq.c - plan9port - [fork] Plan 9 from user space
  HTML git clone git://src.adamsgaard.dk/plan9port
   DIR Log
   DIR Files
   DIR Refs
   DIR README
   DIR LICENSE
       ---
       trunq.c (13234B)
       ---
            1 #include "common.h"
            2 #include <ctype.h>
            3 
            4 void        doalldirs(void);
            5 void        dodir(char*);
            6 void        dofile(Dir*);
            7 void        rundir(char*);
            8 char*        file(char*, char);
            9 void        warning(char*, void*);
           10 void        error(char*, void*);
           11 int        returnmail(char**, char*, char*);
           12 void        logit(char*, char*, char**);
           13 void        doload(int);
           14 
           15 #define HUNK 32        /* not referenced by any code visible in this file */
           16 char        *cmd;        /* program exec'd for each message; argv[1] of runq */
           17 char        *root;        /* queue root directory; argv[0] of runq */
           18 int        debug;        /* -d: print progress on fd 2 */
           19 int        giveup = 2*24*60*60;        /* seconds before a message is returned; -t gives hours */
           20 int        load;        /* -l: load ceiling used by doload; <= 0 turns load balancing off */
           21 int        limit;        /* -r: stored by main, not read by any code visible in this file */
           22 
           23 /* the current directory */
           24 Dir        *dirbuf;        /* entries read by rundir */
           25 long        ndirbuf = 0;        /* not referenced by any code visible in this file */
           26 int        nfiles;        /* count returned by sysdirreadall for dirbuf */
           27 char        *curdir;        /* name handed to dodir; used in log messages */
           28 
           29 char *runqlog = "runq";        /* log name passed to syslog */
           30 
           31 int        *pidlist;        /* child pids tracked by forkltd; entries <= 0 are free */
           32 char        **badsys;                /* array of recalcitrant systems */
           33 int        nbad;        /* number of entries in badsys */
           34 int        npid = 50;        /* size of pidlist; -n overrides */
           35 int        sflag;                        /* single thread per directory */
           36 int        aflag;                        /* all directories */
           37 int        Eflag;                        /* ignore E.xxxxxx dates */
           38 int        Rflag;                        /* no giving up, ever */
           39 
           40 void
           41 usage(void)
           42 {
           43         fprint(2, "usage: runq [-adsE] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] q-root cmd\n");
           44         exits("");
           45 }
           46 
           47 void
           48 main(int argc, char **argv)
           49 {
           50         char *qdir, *x;
           51 
           52         qdir = 0;
           53 
           54         ARGBEGIN{
           55         case 'l':
           56                 x = ARGF();
           57                 if(x == 0)
           58                         usage();
           59                 load = atoi(x);
           60                 if(load < 0)
           61                         load = 0;
           62                 break;
           63         case 'E':
           64                 Eflag++;
           65                 break;
           66         case 'R':        /* no giving up -- just leave stuff in the queue */
           67                 Rflag++;
           68                 break;
           69         case 'a':
           70                 aflag++;
           71                 break;
           72         case 'd':
           73                 debug++;
           74                 break;
           75         case 'r':
           76                 limit = atoi(ARGF());
           77                 break;
           78         case 's':
           79                 sflag++;
           80                 break;
           81         case 't':
           82                 giveup = 60*60*atoi(ARGF());
           83                 break;
           84         case 'q':
           85                 qdir = ARGF();
           86                 if(qdir == 0)
           87                         usage();
           88                 break;
           89         case 'n':
           90                 npid = atoi(ARGF());
           91                 if(npid == 0)
           92                         usage();
           93                 break;
           94         }ARGEND;
           95 
           96         if(argc != 2)
           97                 usage();
           98 
           99         pidlist = malloc(npid*sizeof(*pidlist));
          100         if(pidlist == 0)
          101                 error("can't malloc", 0);
          102 
          103         if(aflag == 0 && qdir == 0) {
          104                 qdir = getuser();
          105                 if(qdir == 0)
          106                         error("unknown user", 0);
          107         }
          108         root = argv[0];
          109         cmd = argv[1];
          110 
          111         if(chdir(root) < 0)
          112                 error("can't cd to %s", root);
          113 
          114         doload(1);
          115         if(aflag)
          116                 doalldirs();
          117         else
          118                 dodir(qdir);
          119         doload(0);
          120         exits(0);
          121 }
          122 
          123 int
          124 emptydir(char *name)
          125 {
          126         int fd;
          127         long n;
          128         char buf[2048];
          129 
          130         fd = open(name, OREAD);
          131         if(fd < 0)
          132                 return 1;
          133         n = read(fd, buf, sizeof(buf));
          134         close(fd);
          135         if(n <= 0) {
          136                 if(debug)
          137                         fprint(2, "removing directory %s\n", name);
          138                 syslog(0, runqlog, "rmdir %s", name);
          139                 sysremove(name);
          140                 return 1;
          141         }
          142         return 0;
          143 }
          144 
          145 int
          146 forkltd(void)
          147 {
          148         int i;
          149         int pid;
          150 
          151         for(i = 0; i < npid; i++){
          152                 if(pidlist[i] <= 0)
          153                         break;
          154         }
          155 
          156         while(i >= npid){
          157                 pid = waitpid();
          158                 if(pid < 0){
          159                         syslog(0, runqlog, "forkltd confused");
          160                         exits(0);
          161                 }
          162 
          163                 for(i = 0; i < npid; i++)
          164                         if(pidlist[i] == pid)
          165                                 break;
          166         }
          167         pidlist[i] = fork();
          168         return pidlist[i];
          169 }
          170 
          171 /*
          172  *  run all user directories, must be bootes (or root on unix) to do this
          173  */
          174 void
          175 doalldirs(void)
          176 {
          177         Dir *db;
          178         int fd;
          179         long i, n;
          180 
          181 
          182         fd = open(".", OREAD);
          183         if(fd == -1){
          184                 warning("reading %s", root);
          185                 return;
          186         }
          187         n = sysdirreadall(fd, &db);
          188         if(n > 0){
          189                 for(i=0; i<n; i++){
          190                         if(db[i].qid.type & QTDIR){
          191                                 if(emptydir(db[i].name))
          192                                         continue;
          193                                 switch(forkltd()){
          194                                 case -1:
          195                                         syslog(0, runqlog, "out of procs");
          196                                         doload(0);
          197                                         exits(0);
          198                                 case 0:
          199                                         if(sysdetach() < 0)
          200                                                 error("%r", 0);
          201                                         dodir(db[i].name);
          202                                         exits(0);
          203                                 default:
          204                                         break;
          205                                 }
          206                         }
          207                 }
          208                 free(db);
          209         }
          210         close(fd);
          211 }
          212 
          213 /*
          214  *  cd to a user directory and run it
          215  */
          216 void
          217 dodir(char *name)
          218 {
          219         curdir = name;
          220 
          221         if(chdir(name) < 0){
          222                 warning("cd to %s", name);
          223                 return;
          224         }
          225         if(debug)
          226                 fprint(2, "running %s\n", name);
          227         rundir(name);
          228         chdir("..");
          229 }
          230 
          231 /*
          232  *  run the current directory
          233  */
          234 void
          235 rundir(char *name)
          236 {
          237         int fd;
          238         long i;
          239 
          240         if(aflag && sflag)
          241                 fd = sysopenlocked(".", OREAD);
          242         else
          243                 fd = open(".", OREAD);
          244         if(fd == -1){
          245                 warning("reading %s", name);
          246                 return;
          247         }
          248         nfiles = sysdirreadall(fd, &dirbuf);
          249         if(nfiles > 0){
          250                 for(i=0; i<nfiles; i++){
          251                         if(dirbuf[i].name[0]!='C' || dirbuf[i].name[1]!='.')
          252                                 continue;
          253                         dofile(&dirbuf[i]);
          254                 }
          255                 free(dirbuf);
          256         }
          257         if(aflag && sflag)
          258                 sysunlockfile(fd);
          259         else
          260                 close(fd);
          261 }
          262 
          263 /*
          264  *  free files matching name in the current directory
          265  */
          266 void
          267 remmatch(char *name)
          268 {
          269         long i;
          270 
          271         syslog(0, runqlog, "removing %s/%s", curdir, name);
          272 
          273         for(i=0; i<nfiles; i++){
          274                 if(strcmp(&dirbuf[i].name[1], &name[1]) == 0)
          275                         sysremove(dirbuf[i].name);
          276         }
          277 
          278         /* error file (may have) appeared after we read the directory */
          279         /* stomp on data file in case of phase error */
          280         sysremove(file(name, 'D'));
          281         sysremove(file(name, 'E'));
          282 }
          283 
          284 /*
          285  *  like trylock, but we've already got the lock on fd,
          286  *  and don't want an L. lock file.
          287  */
          288 static Mlock *
          289 keeplockalive(char *path, int fd)
          290 {
          291         char buf[1];
          292         Mlock *l;
          293 
          294         l = malloc(sizeof(Mlock));
          295         if(l == 0)
          296                 return 0;
          297         l->fd = fd;
          298         l->name = s_new();
          299         s_append(l->name, path);
          300 
          301         /* fork process to keep lock alive until sysunlock(l) */
          302         switch(l->pid = rfork(RFPROC)){
          303         default:
          304                 break;
          305         case 0:
          306                 fd = l->fd;
          307                 for(;;){
          308                         sleep(1000*60);
          309                         if(pread(fd, buf, 1, 0) < 0)
          310                                 break;
          311                 }
          312                 _exits(0);
          313         }
          314         return l;
          315 }
          316 
          317 /*
          318  *  try a message
          319  */
          320 void
          321 dofile(Dir *dp)
          322 {
          323         Dir *d;
          324         int dfd, ac, dtime, efd, pid, i, etime;
          325         char *buf, *cp, **av;
          326         Waitmsg *wm;
          327         Biobuf *b;
          328         Mlock *l = nil;
          329 
          330         if(debug)
          331                 fprint(2, "dofile %s\n", dp->name);
          332         /*
          333          *  if no data file or empty control or data file, just clean up
          334          *  the empty control file must be 15 minutes old, to minimize the
          335          *  chance of a race.
          336          */
          337         d = dirstat(file(dp->name, 'D'));
          338         if(d == nil){
          339                 syslog(0, runqlog, "no data file for %s", dp->name);
          340                 remmatch(dp->name);
          341                 return;
          342         }
          343         if(dp->length == 0){
          344                 if(time(0)-dp->mtime > 15*60){
          345                         syslog(0, runqlog, "empty ctl file for %s", dp->name);
          346                         remmatch(dp->name);
          347                 }
          348                 return;
          349         }
          350         dtime = d->mtime;
          351         free(d);
          352 
          353         /*
          354          *  retry times depend on the age of the errors file
          355          */
          356         if(!Eflag && (d = dirstat(file(dp->name, 'E'))) != nil){
          357                 etime = d->mtime;
          358                 free(d);
          359                 if(etime - dtime < 60*60){
          360                         /* up to the first hour, try every 15 minutes */
          361                         if(time(0) - etime < 15*60)
          362                                 return;
          363                 } else {
          364                         /* after the first hour, try once an hour */
          365                         if(time(0) - etime < 60*60)
          366                                 return;
          367                 }
          368 
          369         }
          370 
          371         /*
          372          *  open control and data
          373          */
          374         b = sysopen(file(dp->name, 'C'), "rl", 0660);
          375         if(b == 0) {
          376                 if(debug)
          377                         fprint(2, "can't open %s: %r\n", file(dp->name, 'C'));
          378                 return;
          379         }
          380         dfd = open(file(dp->name, 'D'), OREAD);
          381         if(dfd < 0){
          382                 if(debug)
          383                         fprint(2, "can't open %s: %r\n", file(dp->name, 'D'));
          384                 Bterm(b);
          385                 sysunlockfile(Bfildes(b));
          386                 return;
          387         }
          388 
          389         /*
          390          *  make arg list
          391          *        - read args into (malloc'd) buffer
          392          *        - malloc a vector and copy pointers to args into it
          393          */
          394         buf = malloc(dp->length+1);
          395         if(buf == 0){
          396                 warning("buffer allocation", 0);
          397                 Bterm(b);
          398                 sysunlockfile(Bfildes(b));
          399                 close(dfd);
          400                 return;
          401         }
          402         if(Bread(b, buf, dp->length) != dp->length){
          403                 warning("reading control file %s\n", dp->name);
          404                 Bterm(b);
          405                 sysunlockfile(Bfildes(b));
          406                 close(dfd);
          407                 free(buf);
          408                 return;
          409         }
          410         buf[dp->length] = 0;
          411         av = malloc(2*sizeof(char*));
          412         if(av == 0){
          413                 warning("argv allocation", 0);
          414                 close(dfd);
          415                 free(buf);
          416                 Bterm(b);
          417                 sysunlockfile(Bfildes(b));
          418                 return;
          419         }
          420         for(ac = 1, cp = buf; *cp; ac++){
          421                 while(isspace(*cp))
          422                         *cp++ = 0;
          423                 if(*cp == 0)
          424                         break;
          425 
          426                 av = realloc(av, (ac+2)*sizeof(char*));
          427                 if(av == 0){
          428                         warning("argv allocation", 0);
          429                         close(dfd);
          430                         free(buf);
          431                         Bterm(b);
          432                         sysunlockfile(Bfildes(b));
          433                         return;
          434                 }
          435                 av[ac] = cp;
          436                 while(*cp && !isspace(*cp)){
          437                         if(*cp++ == '"'){
          438                                 while(*cp && *cp != '"')
          439                                         cp++;
          440                                 if(*cp)
          441                                         cp++;
          442                         }
          443                 }
          444         }
          445         av[0] = cmd;
          446         av[ac] = 0;
          447 
          448         if(!Eflag &&time(0) - dtime > giveup){
          449                 if(returnmail(av, dp->name, "Giveup") != 0)
          450                         logit("returnmail failed", dp->name, av);
          451                 remmatch(dp->name);
          452                 goto done;
          453         }
          454 
          455         for(i = 0; i < nbad; i++){
          456                 if(strcmp(av[3], badsys[i]) == 0)
          457                         goto done;
          458         }
          459 
          460         /*
          461          * Ken's fs, for example, gives us 5 minutes of inactivity before
          462          * the lock goes stale, so we have to keep reading it.
          463           */
          464         l = keeplockalive(file(dp->name, 'C'), Bfildes(b));
          465 
          466         /*
          467          *  transfer
          468          */
          469         pid = fork();
          470         switch(pid){
          471         case -1:
          472                 sysunlock(l);
          473                 sysunlockfile(Bfildes(b));
          474                 syslog(0, runqlog, "out of procs");
          475                 exits(0);
          476         case 0:
          477                 if(debug) {
          478                         fprint(2, "Starting %s", cmd);
          479                         for(ac = 0; av[ac]; ac++)
          480                                 fprint(2, " %s", av[ac]);
          481                         fprint(2, "\n");
          482                 }
          483                 logit("execing", dp->name, av);
          484                 close(0);
          485                 dup(dfd, 0);
          486                 close(dfd);
          487                 close(2);
          488                 efd = open(file(dp->name, 'E'), OWRITE);
          489                 if(efd < 0){
          490                         if(debug) syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser());
          491                         efd = create(file(dp->name, 'E'), OWRITE, 0666);
          492                         if(efd < 0){
          493                                 if(debug) syslog(0, "runq", "create %s as %s: %r", file(dp->name, 'E'), getuser());
          494                                 exits("could not open error file - Retry");
          495                         }
          496                 }
          497                 seek(efd, 0, 2);
          498                 exec(cmd, av);
          499                 error("can't exec %s", cmd);
          500                 break;
          501         default:
          502                 for(;;){
          503                         wm = wait();
          504                         if(wm == nil)
          505                                 error("wait failed: %r", "");
          506                         if(wm->pid == pid)
          507                                 break;
          508                         free(wm);
          509                 }
          510                 if(debug)
          511                         fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg);
          512 
          513                 if(wm->msg[0]){
          514                         if(debug)
          515                                 fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg);
          516                         if(!Rflag && atoi(wm->msg) != RetryCode){
          517                                 /* return the message and remove it */
          518                                 if(returnmail(av, dp->name, wm->msg) != 0)
          519                                         logit("returnmail failed", dp->name, av);
          520                                 remmatch(dp->name);
          521                         } else {
          522                                 /* add sys to bad list and try again later */
          523                                 nbad++;
          524                                 badsys = realloc(badsys, nbad*sizeof(char*));
          525                                 badsys[nbad-1] = strdup(av[3]);
          526                         }
          527                 } else {
          528                         /* it worked remove the message */
          529                         remmatch(dp->name);
          530                 }
          531                 free(wm);
          532 
          533         }
          534 done:
          535         if (l)
          536                 sysunlock(l);
          537         Bterm(b);
          538         sysunlockfile(Bfildes(b));
          539         free(buf);
          540         free(av);
          541         close(dfd);
          542 }
          543 
          544 
          545 /*
          546  *  return a name starting with the given character
          547  */
          548 char*
          549 file(char *name, char type)
          550 {
          551         static char nname[Elemlen+1];
          552 
          553         strncpy(nname, name, Elemlen);
          554         nname[Elemlen] = 0;
          555         nname[0] = type;
          556         return nname;
          557 }
          558 
          559 /*
          560  *  send back the mail with an error message
          561  *
          562  *  return 0 if successful
          563  */
          564 int
          565 returnmail(char **av, char *name, char *msg)
          566 {
          567         int pfd[2];
          568         Waitmsg *wm;
          569         int fd;
          570         char buf[256];
          571         char attachment[256];
          572         int i;
          573         long n;
          574         String *s;
          575         char *sender;
          576 
          577         if(av[1] == 0 || av[2] == 0){
          578                 logit("runq - dumping bad file", name, av);
          579                 return 0;
          580         }
          581 
          582         s = unescapespecial(s_copy(av[2]));
          583         sender = s_to_c(s);
          584 
          585         if(!returnable(sender) || strcmp(sender, "postmaster") == 0) {
          586                 logit("runq - dumping p to p mail", name, av);
          587                 return 0;
          588         }
          589 
          590         if(pipe(pfd) < 0){
          591                 logit("runq - pipe failed", name, av);
          592                 return -1;
          593         }
          594 
          595         switch(rfork(RFFDG|RFPROC|RFENVG)){
          596         case -1:
          597                 logit("runq - fork failed", name, av);
          598                 return -1;
          599         case 0:
          600                 logit("returning", name, av);
          601                 close(pfd[1]);
          602                 close(0);
          603                 dup(pfd[0], 0);
          604                 close(pfd[0]);
          605                 putenv("upasname", "/dev/null");
          606                 snprint(buf, sizeof(buf), "%s/marshal", UPASBIN);
          607                 snprint(attachment, sizeof(attachment), "%s", file(name, 'D'));
          608                 execl(buf, "send", "-A", attachment, "-s", "permanent failure", sender, nil);
          609                 error("can't exec", 0);
          610                 break;
          611         default:
          612                 break;
          613         }
          614 
          615         close(pfd[0]);
          616         fprint(pfd[1], "\n");        /* get out of headers */
          617         if(av[1]){
          618                 fprint(pfd[1], "Your request ``%.20s ", av[1]);
          619                 for(n = 3; av[n]; n++)
          620                         fprint(pfd[1], "%s ", av[n]);
          621         }
          622         fprint(pfd[1], "'' failed (code %s).\nThe symptom was:\n\n", msg);
          623         fd = open(file(name, 'E'), OREAD);
          624         if(fd >= 0){
          625                 for(;;){
          626                         n = read(fd, buf, sizeof(buf));
          627                         if(n <= 0)
          628                                 break;
          629                         if(write(pfd[1], buf, n) != n){
          630                                 close(fd);
          631                                 goto out;
          632                         }
          633                 }
          634                 close(fd);
          635         }
          636         close(pfd[1]);
          637 out:
          638         wm = wait();
          639         if(wm == nil){
          640                 syslog(0, "runq", "wait: %r");
          641                 logit("wait failed", name, av);
          642                 return -1;
          643         }
          644         i = 0;
          645         if(wm->msg[0]){
          646                 i = -1;
          647                 syslog(0, "runq", "returnmail child: %s", wm->msg);
          648                 logit("returnmail child failed", name, av);
          649         }
          650         free(wm);
          651         return i;
          652 }
          653 
          654 /*
          655  *  print a warning and continue
          656  */
          657 void
          658 warning(char *f, void *a)
          659 {
          660         char err[65];
          661         char buf[256];
          662 
          663         rerrstr(err, sizeof(err));
          664         snprint(buf, sizeof(buf), f, a);
          665         fprint(2, "runq: %s: %s\n", buf, err);
          666 }
          667 
          668 /*
          669  *  print an error and die
          670  */
          671 void
          672 error(char *f, void *a)
          673 {
          674         char err[Errlen];
          675         char buf[256];
          676 
          677         rerrstr(err, sizeof(err));
          678         snprint(buf, sizeof(buf), f, a);
          679         fprint(2, "runq: %s: %s\n", buf, err);
          680         exits(buf);
          681 }
          682 
          683 void
          684 logit(char *msg, char *file, char **av)
          685 {
          686         int n, m;
          687         char buf[256];
          688 
          689         n = snprint(buf, sizeof(buf), "%s/%s: %s", curdir, file, msg);
          690         for(; *av; av++){
          691                 m = strlen(*av);
          692                 if(n + m + 4 > sizeof(buf))
          693                         break;
          694                 sprint(buf + n, " '%s'", *av);
          695                 n += m + 3;
          696         }
          697         syslog(0, runqlog, "%s", buf);
          698 }
          699 
          700 char *loadfile = ".runqload";
          701 
          702 /*
          703  *  load balancing
          704  */
          705 void
          706 doload(int start)
          707 {
          708         int fd;
          709         char buf[32];
          710         int i, n;
          711         Mlock *l;
          712         Dir *d;
          713 
          714         if(load <= 0)
          715                 return;
          716 
          717         if(chdir(root) < 0){
          718                 load = 0;
          719                 return;
          720         }
          721 
          722         l = syslock(loadfile);
          723         fd = open(loadfile, ORDWR);
          724         if(fd < 0){
          725                 fd = create(loadfile, 0666, ORDWR);
          726                 if(fd < 0){
          727                         load = 0;
          728                         sysunlock(l);
          729                         return;
          730                 }
          731         }
          732 
          733         /* get current load */
          734         i = 0;
          735         n = read(fd, buf, sizeof(buf)-1);
          736         if(n >= 0){
          737                 buf[n] = 0;
          738                 i = atoi(buf);
          739         }
          740         if(i < 0)
          741                 i = 0;
          742 
          743         /* ignore load if file hasn't been changed in 30 minutes */
          744         d = dirfstat(fd);
          745         if(d != nil){
          746                 if(d->mtime + 30*60 < time(0))
          747                         i = 0;
          748                 free(d);
          749         }
          750 
          751         /* if load already too high, give up */
          752         if(start && i >= load){
          753                 sysunlock(l);
          754                 exits(0);
          755         }
          756 
          757         /* increment/decrement load */
          758         if(start)
          759                 i++;
          760         else
          761                 i--;
          762         seek(fd, 0, 0);
          763         fprint(fd, "%d\n", i);
          764         sysunlock(l);
          765         close(fd);
          766 }