Wie höre ich N Kanäle? (dynamische select-Anweisung)

116

Um eine Endlosschleife für die Ausführung von zwei Goroutinen zu starten, kann ich den folgenden Code verwenden:

Nach Erhalt der Nachricht wird eine neue Goroutine gestartet und für immer fortgesetzt.

c1 := make(chan string)
c2 := make(chan string)

go DoStuff(c1, 5)
go DoStuff(c2, 2)

for ; true;  {
    select {
    case msg1 := <-c1:
        fmt.Println("received ", msg1)
        go DoStuff(c1, 1)
    case msg2 := <-c2:
        fmt.Println("received ", msg2)
        go DoStuff(c2, 9)
    }
}

Ich hätte jetzt gerne das gleiche Verhalten für N Goroutinen, aber wie wird die select-Anweisung in diesem Fall aussehen?

Dies ist das Codebit, mit dem ich begonnen habe, aber ich bin verwirrt, wie die select-Anweisung codiert wird

numChans := 2

//I keep the channels in this slice, and want to "loop" over them in the select statemnt
var chans = [] chan string{}

for i:=0;i<numChans;i++{
    tmp := make(chan string);
    chans = append(chans, tmp);
    go DoStuff(tmp, i + 1)

//How shall the select statment be coded for this case?  
for ; true;  {
    select {
    case msg1 := <-c1:
        fmt.Println("received ", msg1)
        go DoStuff(c1, 1)
    case msg2 := <-c2:
        fmt.Println("received ", msg2)
        go DoStuff(c2, 9)
    }
}
John Smith
quelle
4
Ich denke, was Sie wollen, ist Channel Multiplexing. golang.org/doc/effective_go.html#chan_of_chan Grundsätzlich haben Sie einen einzelnen Kanal, den Sie hören, und dann mehrere untergeordnete Kanäle, die in den Hauptkanal geleitet werden. Verwandte SO Frage: stackoverflow.com/questions/10979608/…
Brenden

Antworten:

152

Sie können dies mit der SelectFunktion aus dem Reflect- Paket tun :

func Select(cases []SelectCase) (chosen int, recv Value, recvOK bool)

Select führt eine Auswahloperation aus, die in der Liste der Fälle beschrieben wird. Wie die Go select-Anweisung blockiert sie, bis mindestens einer der Fälle fortfahren kann, trifft eine einheitliche pseudozufällige Auswahl und führt diesen Fall dann aus. Es gibt den Index des ausgewählten Falls zurück und, wenn dieser Fall eine Empfangsoperation war, den empfangenen Wert und einen Booleschen Wert, der angibt, ob der Wert einem Senden auf dem Kanal entspricht (im Gegensatz zu einem Nullwert, der empfangen wird, weil der Kanal geschlossen ist).

Sie übergeben ein Array von SelectCaseStrukturen, die den Kanal identifizieren, auf dem ausgewählt werden soll, die Richtung der Operation und einen Wert, der im Fall einer Sendeoperation gesendet werden soll.

Sie könnten also so etwas tun:

cases := make([]reflect.SelectCase, len(chans))
for i, ch := range chans {
    cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
}
chosen, value, ok := reflect.Select(cases)
// ok will be true if the channel has not been closed.
ch := chans[chosen]
msg := value.String()

Sie können hier mit einem ausführlicheren Beispiel experimentieren: http://play.golang.org/p/8zwvSk4kjx

James Henstridge
quelle
4
Gibt es eine praktische Grenze für die Anzahl der Fälle in einer solchen Auswahl? Wenn Sie darüber hinausgehen, wird die Leistung stark beeinträchtigt?
Maxim Vladimirsky
4
Vielleicht ist es meine Inkompetenz, aber ich fand es sehr schwierig, mit diesem Muster zu arbeiten, wenn Sie komplexe Strukturen über den Kanal senden und empfangen. In meinem Fall war es viel einfacher, einen gemeinsamen "Aggregat" -Kanal zu übergeben, wie Tim Allclair sagte.
Bora M. Alper
89

Sie können dies erreichen, indem Sie jeden Kanal in eine Goroutine einschließen, die Nachrichten an einen gemeinsam genutzten "aggregierten" Kanal "weiterleitet". Beispielsweise:

agg := make(chan string)
for _, ch := range chans {
  go func(c chan string) {
    for msg := range c {
      agg <- msg
    }
  }(ch)
}

select {
case msg <- agg:
    fmt.Println("received ", msg)
}

Wenn Sie wissen möchten, von welchem ​​Kanal die Nachricht stammt, können Sie sie in eine Struktur mit zusätzlichen Informationen einschließen, bevor Sie sie an den Gesamtkanal weiterleiten.

In meinen (eingeschränkten) Tests ist diese Methode mit dem Reflect-Paket sehr leistungsfähig:

$ go test dynamic_select_test.go -test.bench=.
...
BenchmarkReflectSelect         1    5265109013 ns/op
BenchmarkGoSelect             20      81911344 ns/op
ok      command-line-arguments  9.463s

Benchmark-Code hier

Tim Allclair
quelle
2
Ihr Benchmark-Code ist falsch. Sie müssen eine Schleifeb.N innerhalb eines Benchmarks durchführen. Andernfalls sind die Ergebnisse (die b.Nin Ihrer Ausgabe durch 1 und 2000000000 geteilt werden ) völlig bedeutungslos.
Dave C
2
@ DaveC Danke! Die Schlussfolgerung ändert sich nicht, aber die Ergebnisse sind viel vernünftiger.
Tim Allclair
1
In der Tat habe ich Ihren Benchmark-Code schnell gehackt, um einige tatsächliche Zahlen zu erhalten . Möglicherweise fehlt in diesem Benchmark noch etwas / stimmt nicht, aber das einzige, was der kompliziertere Reflect-Code bewirkt, ist, dass das Setup schneller ist (mit GOMAXPROCS = 1), da keine Goroutinen erforderlich sind. In jedem anderen Fall bläst ein einfacher Goroutine-Zusammenführungskanal die reflektierende Lösung weg (um ~ 2 Größenordnungen).
Dave C
2
Ein wichtiger Nachteil (im Vergleich zum reflect.SelectAnsatz) ist, dass die Goroutinen, die den Zusammenführungspuffer ausführen, auf jedem Kanal, der zusammengeführt wird, mindestens einen einzelnen Wert haben. Normalerweise ist das kein Problem, aber in einigen spezifischen Anwendungen kann dies ein Deal Breaker sein :(.
Dave C
1
Ein gepufferter Zusammenführungskanal verschlimmert das Problem. Das Problem ist, dass nur die Reflect-Lösung eine vollständig ungepufferte Semantik aufweisen kann. Ich habe den Testcode, mit dem ich experimentiert habe, als separate Antwort veröffentlicht, um (hoffentlich) zu klären, was ich sagen wollte.
Dave C
22

Um einige Kommentare zu früheren Antworten zu erweitern und hier einen klareren Vergleich zu ermöglichen, ist ein Beispiel für beide bisher vorgestellten Ansätze bei gleicher Eingabe, eine Reihe von Kanälen zum Lesen und eine Funktion zum Aufrufen jedes Werts, der auch wissen muss, welcher Kanal, von dem der Wert kam.

Es gibt drei Hauptunterschiede zwischen den Ansätzen:

  • Komplexität. Obwohl es teilweise eine Leserpräferenz sein mag, finde ich den Kanalansatz idiomatischer, unkomplizierter und lesbarer.

  • Performance. Auf meinem Xeon amd64-System führen die Goroutinen + Kanäle aus die Reflexionslösung um etwa zwei Größenordnungen aus (im Allgemeinen ist die Reflexion in Go oft langsamer und sollte nur verwendet werden, wenn dies unbedingt erforderlich ist). Natürlich kann dieser Leistungsunterschied leicht unbedeutend werden, wenn entweder die Funktion, die die Ergebnisse verarbeitet, oder das Schreiben von Werten in die Eingangskanäle erheblich verzögert wird.

  • Semantik blockieren / puffern. Die Bedeutung davon hängt vom Anwendungsfall ab. Meistens spielt es entweder keine Rolle oder die geringfügige zusätzliche Pufferung in der Goroutine-Zusammenführungslösung kann für den Durchsatz hilfreich sein. Wenn es jedoch wünschenswert ist, die Semantik zu haben, dass nur ein einzelner Writer entsperrt wird und sein Wert vollständig verarbeitet wird, bevor ein anderer Writer entsperrt wird, kann dies nur mit der Reflect-Lösung erreicht werden.

Beachten Sie, dass beide Ansätze vereinfacht werden können, wenn entweder die "ID" des sendenden Kanals nicht erforderlich ist oder wenn die Quellkanäle niemals geschlossen werden.

Goroutine-Zusammenführungskanal:

// Process1 calls `fn` for each value received from any of the `chans`
// channels. The arguments to `fn` are the index of the channel the
// value came from and the string value. Process1 returns once all the
// channels are closed.
func Process1(chans []<-chan string, fn func(int, string)) {
    // Setup
    type item struct {
        int    // index of which channel this came from
        string // the actual string item
    }
    merged := make(chan item)
    var wg sync.WaitGroup
    wg.Add(len(chans))
    for i, c := range chans {
        go func(i int, c <-chan string) {
            // Reads and buffers a single item from `c` before
            // we even know if we can write to `merged`.
            //
            // Go doesn't provide a way to do something like:
            //     merged <- (<-c)
            // atomically, where we delay the read from `c`
            // until we can write to `merged`. The read from
            // `c` will always happen first (blocking as
            // required) and then we block on `merged` (with
            // either the above or the below syntax making
            // no difference).
            for s := range c {
                merged <- item{i, s}
            }
            // If/when this input channel is closed we just stop
            // writing to the merged channel and via the WaitGroup
            // let it be known there is one fewer channel active.
            wg.Done()
        }(i, c)
    }
    // One extra goroutine to watch for all the merging goroutines to
    // be finished and then close the merged channel.
    go func() {
        wg.Wait()
        close(merged)
    }()

    // "select-like" loop
    for i := range merged {
        // Process each value
        fn(i.int, i.string)
    }
}

Reflexionsauswahl:

// Process2 is identical to Process1 except that it uses the reflect
// package to select and read from the input channels which guarantees
// there is only one value "in-flight" (i.e. when `fn` is called only
// a single send on a single channel will have succeeded, the rest will
// be blocked). It is approximately two orders of magnitude slower than
// Process1 (which is still insignificant if their is a significant
// delay between incoming values or if `fn` runs for a significant
// time).
func Process2(chans []<-chan string, fn func(int, string)) {
    // Setup
    cases := make([]reflect.SelectCase, len(chans))
    // `ids` maps the index within cases to the original `chans` index.
    ids := make([]int, len(chans))
    for i, c := range chans {
        cases[i] = reflect.SelectCase{
            Dir:  reflect.SelectRecv,
            Chan: reflect.ValueOf(c),
        }
        ids[i] = i
    }

    // Select loop
    for len(cases) > 0 {
        // A difference here from the merging goroutines is
        // that `v` is the only value "in-flight" that any of
        // the workers have sent. All other workers are blocked
        // trying to send the single value they have calculated
        // where-as the goroutine version reads/buffers a single
        // extra value from each worker.
        i, v, ok := reflect.Select(cases)
        if !ok {
            // Channel cases[i] has been closed, remove it
            // from our slice of cases and update our ids
            // mapping as well.
            cases = append(cases[:i], cases[i+1:]...)
            ids = append(ids[:i], ids[i+1:]...)
            continue
        }

        // Process each value
        fn(ids[i], v.String())
    }
}

[Vollständiger Code auf dem Go-Spielplatz .]

Dave C.
quelle
1
Es ist auch erwähnenswert, dass die Goroutines + Channels-Lösung nicht alles kann selectoder kann reflect.Select. Die Goroutinen drehen sich so lange, bis sie alles aus den Kanälen verbrauchen. Es gibt also keinen klaren Weg, den Sie machen könntenProcess1 vorzeitig . Es besteht auch die Möglichkeit von Problemen, wenn Sie mehrere Lesegeräte haben, da die Goroutinen ein Element aus jedem der Kanäle puffern, was bei nicht der Fall ist select.
James Henstridge
@ JamesHenstridge, deine erste Anmerkung zum Stoppen ist nicht wahr. Sie würden veranlassen, Process1 genauso zu stoppen, wie Sie Process2 stoppen würden. zB durch Hinzufügen eines "Stopp" -Kanals, der geschlossen wird, wenn die Goroutinen anhalten sollen. Process1 würde anstelle der derzeit verwendeten einfacheren Schleife zwei Fälle selectinnerhalb einer forSchleife benötigen for range. Process2 müsste einen anderen Fall einsteckencases diesen Wert von und ihn speziell behandeln i.
Dave C
Das löst immer noch nicht das Problem, dass Sie Werte aus den Kanälen lesen, die im Fall des frühen Stopps nicht verwendet werden.
James Henstridge
0

Warum würde dieser Ansatz nicht funktionieren, wenn jemand Ereignisse sendet?

func main() {
    numChans := 2
    var chans = []chan string{}

    for i := 0; i < numChans; i++ {
        tmp := make(chan string)
        chans = append(chans, tmp)
    }

    for true {
        for i, c := range chans {
            select {
            case x = <-c:
                fmt.Printf("received %d \n", i)
                go DoShit(x, i)
            default: continue
            }
        }
    }
}
noonex
quelle
8
Dies ist eine Spin-Schleife. Während Sie darauf warten, dass ein Eingangskanal einen Wert hat, verbraucht dies die gesamte verfügbare CPU. Der springende Punkt selectbei mehreren Kanälen (ohne defaultKlausel) ist, dass effizient gewartet wird, bis mindestens einer bereit ist, ohne sich zu drehen.
Dave C
0

Möglicherweise einfachere Option:

Anstatt ein Array von Kanälen zu haben, sollten Sie nur einen Kanal als Parameter an die Funktionen übergeben, die auf separaten Goroutinen ausgeführt werden, und dann den Kanal in einer Consumer-Goroutine anhören.

Auf diese Weise können Sie nur einen Kanal in Ihrem Listener auswählen, um eine einfache Auswahl zu ermöglichen und die Erstellung neuer Goroutinen zu vermeiden, um Nachrichten aus mehreren Kanälen zusammenzufassen.

Fernando Sanchez
quelle