Eingaben aus mehreren Dateien / Pipes kombinieren, ohne Leitungen zu blockieren oder zu blockieren?

9

Gibt es ein Tool, das Eingaben aus mehreren Dateien oder Pipes in stdout schreibt, ohne Lesevorgänge zu blockieren, sodass einzelne Eingabezeilen intakt bleiben? Grundsätzlich möchte ich eine Reihe von Eingängen auf einen Ausgang multiplexen, ohne die Leitungen zu beschädigen.

$ combine file1 <(prog2) ... > nice-output.txt
  1. Die Reihenfolge der Ausgabe ist mir egal
  2. Es sollte nicht blockieren, solange einige Eingaben Daten enthalten
  3. Es sollte effizient sein (dh ich kann Ihren Perl-Einzeiler ablehnen;)
Jay Hacker
quelle

Antworten:

4

Sie sollten dies multitailziemlich einfach tun können.

Caleb
quelle
1
Können Sie vorschlagen, welche Argumente ich mit Multitail verwenden würde? Es scheint keinen nicht interaktiven Modus zu haben, hängt beim Versuch, in stdout zu schreiben, und stürzt beim Lesen aus einer Pipe ab.
Jay Hacker
Beginnen Sie mit -L, um einen Befehl auszuführen, die Ausgabe mit dem aktuellen Stream zusammenzuführen und -adie Ausgabe in eine Datei zu schreiben. Ich werde morgen mehr suchen. Wenn Sie ein detaillierteres Beispiel geben, werde ich versuchen, es zu bearbeiten.
Caleb
4

Wenn die Prozesse die Zeilen in einem einzigen writeAufruf schreiben , für den die Prozesse die Zeilenpufferung verwenden müssen (normalerweise deaktiviert, wenn ihre Standardausgabe kein Terminal ist), können Sie sie alle auf eine Pipe verweisen.

{ { sleep .1; echo one; sleep .1; echo two; } &
  { echo hello; sleep .15; echo world; };
  wait; } | cat

Wenn die Prozesse beim Schreiben in ein Terminal nur eine Zeilenpufferung durchführen, ist die Verwendung einfach script. Es ist etwas ungeschickt: Es kann nur in eine Datei schreiben.

script -q -c '
    { { sleep .1; echo one; sleep .1; echo two; } &
      { echo hello; sleep .15; echo world; };
      wait; }'
tail -n +2 typescript

Wenn die Programme lange Zeilen schreiben oder einfach keine Zeilenpufferung verwenden, funktioniert dieser Ansatz nicht. Sie benötigen ein Kollektorprogramm, das Zeilen von jedem Eingang separat liest und puffert und die Synchronisation an Zeilenenden durchführt. Es gibt kein Standarddienstprogramm mit dieser Funktionalität. Ich stimme Calebs Vorschlag von zumultitail .

Hier ist ein Python-Skript, das Zeilen liest, die von mehreren Befehlen erzeugt wurden, und sie in der Standardausgabe ausspuckt, ohne eine Zeile aufzubrechen. Ich habe es nicht viel getestet, also Vorbehalt Benutzer. Ich habe es überhaupt nicht bewertet.

#!/usr/bin/env python
import Queue, itertools, os, subprocess, sys, threading
# Queue of (producer_id, line). line==None indicates the end of a producer.
lq = Queue.Queue()

# Line producer
def run_task(i, cmd):
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
    line = p.stdout.readline()
    while line <> "":
        lq.put((i, line))
        line = p.stdout.readline()
    lq.put((i, None))

# Start a producer for each command passed as an argument
for i in range(1,len(sys.argv)):
    threading.Thread(target=run_task, args=(i, sys.argv[i])).start()
sources = len(sys.argv) - 1
# Consumer: print lines as they come in, until no producer is left.
while sources > 0:
    (k, line) = lq.get()
    if line == None: sources -= 1
    else: sys.stdout.write(str(k) + ":" + line)

Beispielnutzung:

./collect.py 'sleep 1; ls /; sleep 1; ls /' \
             '/bin/echo -n foo; sleep 1; /bin/echo -n bar; sleep 1; /bin/echo qux'
Gilles 'SO - hör auf böse zu sein'
quelle
1

Ja, Multitail scheint an die Vorstellung eines "Fensters" als Teilmenge eines Terminals gebunden zu sein. Ich konnte es nicht dazu bringen, als Pipeline-Komponente gut zu spielen.

So sieht aus wie wir hafta diesem selbst tun Risse Knöchel

/* Copyright © 2015 [email protected]
** Use/modify as you see fit but leave this attribution.
** If you change the interface and want to distribute the
** result please change the binary name too! */
#include <err.h>
#include <errno.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>

#include <sys/types.h>
#include <sys/stat.h>
#include <sys/select.h>

/* typedefs are for pussies */
struct {
    char *filename; /* for clarity of errors */
    char *data;
    long len;
    long cap;
} saved[FD_SETSIZE] = {0};

void
ewriten(int fd, char *buf, int n)
{
    int done = 0, c;
    while (done < n) {
        if ((c=write(fd, buf + done, n - done)) <= 0 && errno != EINTR) {
            err(1, "write");
        }
        done += c;
    }
}

int
empty(fd_set *fdset, int maxfd)
{
    int i;
    for (i=0; i <= maxfd; i++) {
        if (FD_ISSET(i, fdset)) return 0;
    }
    return 1;
}

void
combine(fd_set *fdset, int maxfd)
{
    char buf[4096], *cp;
    fd_set ready;
    int n, i, fd, left;
    while (!empty(fdset, maxfd)) {
        ready = *fdset;
        /* timeouts are for pussies */
        if (select(maxfd + 1, &ready, NULL, NULL, NULL) == -1) err(1, "select");
        for (fd=0; fd <= maxfd; fd++) {
            if (!FD_ISSET(fd, &ready)) continue;

            switch (n=read(fd, &buf, sizeof(buf))) {
            case -1:
                if (errno == EINTR)
                    break; /* ignore interrupts; we'll re-read next iteration */
                if (saved[fd].filename) err(1, "read: %s", saved[fd].filename);
                err(1, "read: %d", fd);
            case 0:
                if (saved[fd].len > 0) {
                    /* someone forgot their newline at EOF... */
                    ewriten(1, saved[fd].data, saved[fd].len);
                    saved[fd].data[0] = '\n'; /* put it back for them */
                    ewriten(1, saved[fd].data, 1);
                }
                free(saved[fd].data);
                FD_CLR(fd, fdset);
                break;
            default:
                for (cp=buf + n - 1; cp >= buf && *cp != '\n'; cp--); /* find last newline */
                left = n - (cp - buf + 1);
                if (cp >= buf) {
                    /* we found one! first dump any saved data from the last read */
                    if (saved[fd].len > 0) {
                        ewriten(1, saved[fd].data, saved[fd].len);
                        saved[fd].len = 0;
                    }
                    ewriten(1, buf, cp - buf + 1);
                }
                if (left > 0) {
                    /* now save any leftover data for later */
                    int need = saved[fd].len + left;
                    if (saved[fd].cap < need &&
                       (saved[fd].data=realloc(saved[fd].data, need)) == NULL) {
                        errx(1, "realloc: failed on %d bytes", need);
                        /* it was good enough for quake... */
                    }
                    saved[fd].cap = need;
                    memcpy(saved[fd].data + saved[fd].len, buf + n - 1 - left, left);
                    saved[fd].len += left;
                }
            }
        }
    }
}

void
addfd(int fd, fd_set *fdset, int *maxfd)
{
    FD_SET(fd, fdset);
    if (*maxfd < fd) {
        *maxfd = fd;
    }
}

int
main(int argc, char **argv)
{
    fd_set fdset;
    char **arg = argv + 1;
    char *cp;
    struct stat st;
    int fd, maxfd = -1;
    FD_ZERO(&fdset);
    while (*arg != NULL) {
        /* getopt is for pussies */
        if (strncmp("-u", *arg, 2) == 0) {
            *arg += 2;
            if (**arg == '\0' && *++arg == NULL ) errx(1, "-u requires argument (comma separated FD list)");
            /* reentrancy is for pussies */
            for (cp=strtok(*arg, ","); cp != NULL; cp=strtok(NULL, ",")) {
                fd = atoi(cp);
                if (fstat(fd, &st) != 0) err(1, "%d", fd);
                addfd(fd, &fdset, &maxfd);
            }
            arg++;
        } else if (strcmp("-", *arg) == 0) {
            if (fstat(0, &st) != 0) err(1, "stdin", fd);
            addfd(0, &fdset, &maxfd);
            saved[0].filename = "stdin";
            arg++;
        } else if (strcmp("--", *arg) == 0) {
            arg++;
            break;
        } else if (**arg == '-') {
            errx(1, "unrecognized argument %s", *arg);
        } else {
            break; /* treat as filename */
        }
    }
    /* remaining args are filenames */
    for (; *arg != NULL; arg++) {
        /* stdio is for pussies */
        if ((fd=open(*arg, O_RDONLY)) == -1) err(1, "open: %s", *arg);
        addfd(fd, &fdset, &maxfd);
        saved[fd].filename = *arg;
    }
    combine(&fdset, maxfd);
    return 0;
}

Ahhh das fühlte sich gut an.

(Hinweis: Es wurde an ungefähr zwei Eingabesätzen getestet. Fehler können vorhanden sein oder nicht.)

Woche
quelle