Kombinieren Sie das Framework, um asynchrone Operationen zu serialisieren

8

Wie kann ich die asynchronen Pipelines, aus denen das Combine-Framework besteht, synchron (seriell) ausrichten?

Angenommen, ich habe 50 URLs, von denen ich die entsprechenden Ressourcen herunterladen möchte, und nehmen wir an, ich möchte dies einzeln tun. Ich weiß, wie man das mit Operation / OperationQueue macht, z. B. mit einer Operation-Unterklasse, die sich erst nach Abschluss des Downloads für beendet erklärt. Wie würde ich mit Combine dasselbe machen?

Im Moment fällt mir nur ein, eine globale Liste der verbleibenden URLs zu führen und eine zu entfernen, diese eine Pipeline für einen Download einzurichten, den Download durchzuführen und in sinkder Pipeline zu wiederholen. Das scheint nicht sehr kombinationsartig zu sein.

Ich habe versucht, ein Array der URLs zu erstellen und es einem Array von Publishern zuzuordnen. Ich weiß, dass ich einen Publisher "produzieren" und veranlassen kann, dass er in der Pipeline mit veröffentlicht wird flatMap. Aber dann lade ich immer noch alle gleichzeitig herunter. Es gibt keine kombinierte Möglichkeit, das Array kontrolliert zu durchlaufen - oder gibt es eine?

(Ich habe mir auch vorgestellt, etwas mit Future zu machen, aber ich wurde hoffnungslos verwirrt. Ich bin an diese Denkweise nicht gewöhnt.)

matt
quelle

Antworten:

2

Ich habe dies nur kurz getestet, aber auf den ersten Blick scheint es, dass jede Anfrage auf den Abschluss der vorherigen Anfrage wartet, bevor sie gestartet wird.

Ich poste diese Lösung auf der Suche nach Feedback. Bitte seien Sie kritisch, wenn dies keine gute Lösung ist.

extension Collection where Element: Publisher {

    func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
        // If the collection is empty, we can't just create an arbititary publisher
        // so we return nil to indicate that we had nothing to serialize.
        if isEmpty { return nil }

        // We know at this point that it's safe to grab the first publisher.
        let first = self.first!

        // If there was only a single publisher then we can just return it.
        if count == 1 { return first.eraseToAnyPublisher() }

        // We're going to build up the output starting with the first publisher.
        var output = first.eraseToAnyPublisher()

        // We iterate over the rest of the publishers (skipping over the first.)
        for publisher in self.dropFirst() {
            // We build up the output by appending the next publisher.
            output = output.append(publisher).eraseToAnyPublisher()
        }

        return output
    }
}


Eine präzisere Version dieser Lösung (bereitgestellt von @matt):

extension Collection where Element: Publisher {
    func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
        guard let start = self.first else { return nil }
        return self.dropFirst().reduce(start.eraseToAnyPublisher()) {
            $0.append($1).eraseToAnyPublisher()
        }
    }
}
Clay Ellis
quelle
Großartig, danke. appendist genau das, wonach ich gesucht habe. - Ihr Code kann erheblich verschärft werden. Insbesondere ist es nicht erforderlich, vorzeitig zurückzukehren count == 1, wenn dies der Fall ist , da in diesem Fall dropFirstleer ist und wir einfach keine Schleife ausführen. Und es besteht keine Notwendigkeit, die outputVariable zu pflegen , da wir reducestattdessen verwenden können for...in. Siehe meine Antwort für eine engere Darstellung.
Matt
3

Sie können einen benutzerdefinierten Abonnenten erstellen, in dem wiederkehrende Abonnenten empfangen werden.Demand.max (1). In diesem Fall fordert der Teilnehmer den nächsten Wert erst an, wenn er einen erhalten hat. Das Beispiel ist für Int.publisher, aber eine zufällige Verzögerung der Karte ahmt den Netzwerkverkehr nach :-)

import PlaygroundSupport
import SwiftUI
import Combine

class MySubscriber: Subscriber {
  typealias Input = String
  typealias Failure = Never

  func receive(subscription: Subscription) {
    print("Received subscription", Thread.current.isMainThread)
    subscription.request(.max(1))
  }

  func receive(_ input: Input) -> Subscribers.Demand {
    print("Received input: \(input)", Thread.current.isMainThread)
    return .max(1)
  }

  func receive(completion: Subscribers.Completion<Never>) {
    DispatchQueue.main.async {
        print("Received completion: \(completion)", Thread.current.isMainThread)
        PlaygroundPage.current.finishExecution()
    }
  }
}

(110...120)
    .publisher.receive(on: DispatchQueue.global())
    .map {
        print(Thread.current.isMainThread, Thread.current)
        usleep(UInt32.random(in: 10000 ... 1000000))
        return String(format: "%02x", $0)
    }
    .subscribe(on: DispatchQueue.main)
    .subscribe(MySubscriber())

print("Hello")

PlaygroundPage.current.needsIndefiniteExecution = true

Spielplatzdruck ...

Hello
Received subscription true
false <NSThread: 0x600000064780>{number = 5, name = (null)}
Received input: 6e false
false <NSThread: 0x60000007cc80>{number = 9, name = (null)}
Received input: 6f false
false <NSThread: 0x60000007cc80>{number = 9, name = (null)}
Received input: 70 false
false <NSThread: 0x60000007cc80>{number = 9, name = (null)}
Received input: 71 false
false <NSThread: 0x60000007cc80>{number = 9, name = (null)}
Received input: 72 false
false <NSThread: 0x600000064780>{number = 5, name = (null)}
Received input: 73 false
false <NSThread: 0x600000064780>{number = 5, name = (null)}
Received input: 74 false
false <NSThread: 0x60000004dc80>{number = 8, name = (null)}
Received input: 75 false
false <NSThread: 0x60000004dc80>{number = 8, name = (null)}
Received input: 76 false
false <NSThread: 0x60000004dc80>{number = 8, name = (null)}
Received input: 77 false
false <NSThread: 0x600000053400>{number = 3, name = (null)}
Received input: 78 false
Received completion: finished true

UPDATE habe ich endlich gefunden .flatMap(maxPublishers: ), was mich zwingt, dieses interessante Thema mit einem etwas anderen Ansatz zu aktualisieren. Bitte beachten Sie, dass ich die globale Warteschlange für die Planung verwende, nicht nur eine zufällige Verzögerung, nur um sicherzugehen, dass das Empfangen eines serialisierten Streams kein "zufälliges" oder "glückliches" Verhalten ist :-)

import PlaygroundSupport
import Combine
import Foundation

PlaygroundPage.current.needsIndefiniteExecution = true

let A = (1 ... 9)
    .publisher
    .flatMap(maxPublishers: .max(1)) { value in
        [value].publisher
            .flatMap { value in
                Just(value)
                    .delay(for: .milliseconds(Int.random(in: 0 ... 100)), scheduler: DispatchQueue.global())
        }
}
.sink { value in
    print(value, "A")
}

let B = (1 ... 9)
    .publisher
    .flatMap { value in
        [value].publisher
            .flatMap { value in
                Just(value)
                    .delay(for: .milliseconds(Int.random(in: 0 ... 100)), scheduler: RunLoop.main)
        }
}
.sink { value in
    print("     ",value, "B")
}

druckt

1 A
      4 B
      5 B
      7 B
      1 B
      2 B
      8 B
      6 B
2 A
      3 B
      9 B
3 A
4 A
5 A
6 A
7 A
8 A
9 A

Basierend auf hier geschrieben

.serialize ()?

Die von Clay Ellis definierte akzeptierte Antwort könnte durch ersetzt werden

.publisher.flatMap (maxPublishers: .max (1)) {$ 0}

während "unserialzed" Version muss verwendet werden

.publisher.flatMap {$ 0}

"Beispiel aus der realen Welt"

import PlaygroundSupport
import Foundation
import Combine

let path = "postman-echo.com/get"
let urls: [URL] = "... which proves the downloads are happening serially .-)".map(String.init).compactMap { (parameter) in
    var components = URLComponents()
    components.scheme = "https"
    components.path = path
    components.queryItems = [URLQueryItem(name: parameter, value: nil)]
    return components.url
}
//["https://postman-echo.com/get?]
struct Postman: Decodable {
    var args: [String: String]
}


let collection = urls.compactMap { value in
        URLSession.shared.dataTaskPublisher(for: value)
        .tryMap { data, response -> Data in
            return data
        }
        .decode(type: Postman.self, decoder: JSONDecoder())
        .catch {_ in
            Just(Postman(args: [:]))
    }
}

extension Collection where Element: Publisher {
    func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
        guard let start = self.first else { return nil }
        return self.dropFirst().reduce(start.eraseToAnyPublisher()) {
            return $0.append($1).eraseToAnyPublisher()
        }
    }
}

var streamA = ""
let A = collection
    .publisher.flatMap{$0}

    .sink(receiveCompletion: { (c) in
        print(streamA, "     ", c, "    .publisher.flatMap{$0}")
    }, receiveValue: { (postman) in
        print(postman.args.keys.joined(), terminator: "", to: &streamA)
    })


var streamC = ""
let C = collection
    .serialize()?

    .sink(receiveCompletion: { (c) in
        print(streamC, "     ", c, "    .serialize()?")
    }, receiveValue: { (postman) in
        print(postman.args.keys.joined(), terminator: "", to: &streamC)
    })

var streamD = ""
let D = collection
    .publisher.flatMap(maxPublishers: .max(1)){$0}

    .sink(receiveCompletion: { (c) in
        print(streamD, "     ", c, "    .publisher.flatMap(maxPublishers: .max(1)){$0}")
    }, receiveValue: { (postman) in
        print(postman.args.keys.joined(), terminator: "", to: &streamD)
    })

PlaygroundPage.current.needsIndefiniteExecution = true

druckt

.w.h i.c hporves ht edownloadsa erh appeninsg eriall y.-)       finished     .publisher.flatMap{$0}
... which proves the downloads are happening serially .-)       finished     .publisher.flatMap(maxPublishers: .max(1)){$0}
... which proves the downloads are happening serially .-)       finished     .serialize()?

Scheint mir auch in anderen Szenarien sehr nützlich zu sein. Versuchen Sie, den Standardwert von maxPublishers im nächsten Snippet zu verwenden und vergleichen Sie die Ergebnisse :-)

import Combine

let sequencePublisher = Publishers.Sequence<Range<Int>, Never>(sequence: 0..<Int.max)
let subject = PassthroughSubject<String, Never>()

let handle = subject
    .zip(sequencePublisher.print())
    //.publish
    .flatMap(maxPublishers: .max(1), { (pair)  in
        Just(pair)
    })
    .print()
    .sink { letters, digits in
        print(letters, digits)
    }

"Hello World!".map(String.init).forEach { (s) in
    subject.send(s)
}
subject.send(completion: .finished)
user3441734
quelle
@matt sink funktioniert nicht anders, nur bei Rückgabe Subsribers.Demand.unlimited ... Verwenden Sie möglicherweise das richtige Instrument wie die serielle Warteschlange und Data.init? (Inhalt der URL: URL) ist die beste Option in Ihrem Szenario . Wenn Sie eine Summe von zwei Int machen müssen, tun Sie dies als [lhs: Int, rhs: Int] .reduce .... ??? Ich werde Data.init? (Inhalt der URL: URL) in receive (_ input :) von MySerialDownloaderSubscriber verwenden.
user3441734
@ Matt bitte, siehe aktualisierte Antwort. Kombinieren ist aufregend, aber (zumindest für mich) sehr schwer zu verstehen ...
user3441734
Ja ich sehe! Mit dem maxPublishersParameter können wir den Gegendruck erhöhen. Dies hängt mit dem zusammen, was ich in meiner Frage gesagt habe: "Ich weiß, dass ich einen Publisher" produzieren "und veranlassen kann, dass er mit flatMap in der Pipeline veröffentlicht wird. Aber dann lade ich immer noch alle gleichzeitig herunter." Nun, mit dem maxPublishersParameter sind sie nicht gleichzeitig.
Matt
@matt Ja, Senke Anruf Publisher-eigenen Abonnenten mit Subscribers.Demand.unlimited, flatMap hat den gleichen Effekt wie Set-Publisher-eigenen Abonnenten mit unterschiedlichem Wert, in unserem Anwendungsfall .max (1). Ich füge nur ein weiteres Beispiel mit einem anderen Szenario hinzu, in dem es so brauchbar ist.
user3441734
2

In allen anderen reaktiven Frameworks ist dies wirklich einfach. Sie verwenden nur concat, um die Ergebnisse in einem Schritt zu verketten und zu reduzieren, und dann können Sie reducedie Ergebnisse in ein endgültiges Array umwandeln. Apple macht dies schwierig, da Publisher.Concatenatees keine Überlastung gibt, die eine Reihe von Publishern akzeptiert. Es gibt eine ähnliche Verrücktheit mit Publisher.Merge. Ich habe das Gefühl, dass dies damit zu tun hat, dass sie verschachtelte generische Publisher zurückgeben, anstatt nur einen einzelnen generischen Typ wie rx Observable zurückzugeben. Ich denke, Sie können einfach Verketten anrufenin einer Schleife und reduzieren Sie dann die verketteten Ergebnisse in ein einziges Array, aber ich hoffe wirklich, dass sie dieses Problem in der nächsten Version beheben. Es besteht sicherlich die Notwendigkeit, mehr als 2 Verlage zusammenzufassen und mehr als 4 Verlage zusammenzuführen (und die Überlastungen für diese beiden Betreiber sind nicht einmal konsistent, was nur seltsam ist).

BEARBEITEN:

Ich bin darauf zurückgekommen und habe festgestellt, dass Sie tatsächlich eine beliebige Anzahl von Verlagen zusammenstellen können, die nacheinander emittieren. Ich habe keine Ahnung, warum es keine Funktion gibt ConcatenateMany, die dies für Sie erledigt, aber es sieht so aus, als wäre es nicht so schwer, selbst einen zu schreiben, solange Sie bereit sind, einen vom Typ gelöschten Verlag zu verwenden. Dieses Beispiel zeigt, dass Merge in zeitlicher Reihenfolge ausgegeben wird, während Concat in der Reihenfolge der Kombination ausgegeben wird:

import PlaygroundSupport
import SwiftUI
import Combine

let p = Just<Int>(1).append(2).append(3).delay(for: .seconds(0.25), scheduler: RunLoop.main).eraseToAnyPublisher()
let q = Just<Int>(4).append(5).append(6).eraseToAnyPublisher()
let r = Just<Int>(7).append(8).append(9).delay(for: .seconds(0.5), scheduler: RunLoop.main).eraseToAnyPublisher()
let concatenated: AnyPublisher<Int, Never> = [q,r].reduce(p) { total, next in
  total.append(next).eraseToAnyPublisher()
}

var subscriptions = Set<AnyCancellable>()

concatenated
  .sink(receiveValue: { v in
    print("concatenated: \(v)")
  }).store(in: &subscriptions)

Publishers
  .MergeMany([p,q,r])
  .sink(receiveValue: { v in
    print("merge: \(v)")
  }).store(in: &subscriptions)
Josh Homann
quelle
Ja, Sie haben wahrscheinlich vermutet, dass ich absichtlich eine große Zahl wie 50 gewählt habe.
Matt
Es gibt eine MergeMany. Ich verstehe nicht, warum es kein ConcatenateMany gibt. Rx swift hat Observable.concat und Reactive Swift hat flatMap (.concat), das ist also seltsam. Vielleicht fehlt mir etwas. Ich werde weiter nach developer.apple.com/documentation/combine/publishers/mergemany
Josh Homann
Würde concatserialisieren (in den anderen reaktiven Frameworks)?
Matt
Ja. Für eine Sequenz von Sequenzen haben Sie nur eine Möglichkeit zum Abflachen, dh setzen Sie die Elemente einer inneren Sequenz wie Sequence.flatMap schnell in eine andere. Wenn Sie eine asynchrone Sequenz haben, müssen Sie beim Reduzieren die zeitliche Dimension berücksichtigen. Sie können also entweder die Elemente aus allen inneren Sequenzen in zeitlicher Reihenfolge (Zusammenführen) oder die Elemente aus jeder inneren Sequenz in der Reihenfolge der Sequenzen (concat) ausgeben. Siehe das Marmordiagramm : rxmarbles.com/#concat vs rxmarbles.com/#merge
Josh Homann
Beachten Sie, dass dies .appendein Operator ist, der a erstellt Publisher.Concatenate.
Rob Mayoff
2

Aus der ursprünglichen Frage:

Ich habe versucht, ein Array der URLs zu erstellen und es einem Array von Publishern zuzuordnen. Ich weiß, dass ich einen Publisher "produzieren" und veranlassen kann, dass er in der Pipeline mit veröffentlicht wird flatMap. Aber dann lade ich immer noch alle gleichzeitig herunter. Es gibt keine kombinierte Möglichkeit, das Array kontrolliert zu durchlaufen - oder gibt es eine?


Hier ist ein Spielzeugbeispiel für das eigentliche Problem:

let collection = (1 ... 10).map {
    Just($0).delay(
        for: .seconds(Double.random(in:1...5)),
        scheduler: DispatchQueue.main)
        .eraseToAnyPublisher()
}
collection.publisher
    .flatMap() {$0}
    .sink {print($0)}.store(in:&self.storage)

Dies gibt die ganzen Zahlen von 1 bis 10 in zufälliger Reihenfolge aus, die zu zufälligen Zeiten eintreffen. Das Ziel ist es, etwas damit zu tun, collectiondas dazu führt, dass die ganzen Zahlen der Reihe nach von 1 bis 10 ausgegeben werden.


Jetzt werden wir nur eines ändern: in der Reihe

.flatMap {$0}

Wir fügen den maxPublishersParameter hinzu:

let collection = (1 ... 10).map {
    Just($0).delay(
        for: .seconds(Double.random(in:1...5)),
        scheduler: DispatchQueue.main)
        .eraseToAnyPublisher()
}
collection.publisher
    .flatMap(maxPublishers:.max(1)) {$0}
    .sink {print($0)}.store(in:&self.storage)

Presto, wir jetzt tun emit die ganzen Zahlen von 1 bis 10, um mit zufälligen Abständen zwischen ihnen.


Wenden wir dies auf das ursprüngliche Problem an. Um dies zu demonstrieren, benötige ich eine ziemlich langsame Internetverbindung und eine ziemlich große Ressource zum Herunterladen. Zuerst mache ich es mit gewöhnlichen .flatMap:

let eph = URLSessionConfiguration.ephemeral
let session = URLSession(configuration: eph)
let url = "https://photojournal.jpl.nasa.gov/tiff/PIA23172.tif"
let collection = [url, url, url]
    .map {URL(string:$0)!}
    .map {session.dataTaskPublisher(for: $0)
        .eraseToAnyPublisher()
}
collection.publisher.setFailureType(to: URLError.self)
    .handleEvents(receiveOutput: {_ in print("start")})
    .flatMap() {$0}
    .map {$0.data}
    .sink(receiveCompletion: {comp in
        switch comp {
        case .failure(let err): print("error", err)
        case .finished: print("finished")
        }
    }, receiveValue: {_ in print("done")})
    .store(in:&self.storage)

Das Ergebnis ist

start
start
start
done
done
done
finished

Dies zeigt, dass wir die drei Downloads gleichzeitig durchführen. Okay, jetzt ändere dich

    .flatMap() {$0}

zu

    .flatMap(maxPublishers:.max(1) {$0}

Das Ergebnis ist jetzt:

start
done
start
done
start
done
finished

Wir laden jetzt seriell herunter, was das ursprünglich zu lösende Problem ist.


anhängen

In Übereinstimmung mit dem Prinzip von TIMTOWTDI können wir stattdessen die Herausgeber verketten, appendum sie zu serialisieren:

let collection = (1 ... 10).map {
    Just($0).delay(
        for: .seconds(Double.random(in:1...5)),
        scheduler: DispatchQueue.main)
        .eraseToAnyPublisher()
}
let pub = collection.dropFirst().reduce(collection.first!) {
    return $0.append($1).eraseToAnyPublisher()
}

Das Ergebnis ist ein Herausgeber, der die verzögerten Herausgeber in der ursprünglichen Sammlung serialisiert. Lassen Sie es uns beweisen, indem Sie es abonnieren:

pub.sink {print($0)}.store(in:&self.storage)

Sicher genug, die ganzen Zahlen kommen jetzt in der richtigen Reihenfolge an (mit zufälligen Intervallen dazwischen).


Wir können die Erstellung pubeiner Sammlung von Verlagen mit einer Erweiterung der Sammlung zusammenfassen, wie von Clay Ellis vorgeschlagen:

extension Collection where Element: Publisher {
    func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
        guard let start = self.first else { return nil }
        return self.dropFirst().reduce(start.eraseToAnyPublisher()) {
            return $0.append($1).eraseToAnyPublisher()
        }
    }
}
matt
quelle
1

Hier ist ein einseitiger Spielplatzcode, der den möglichen Ansatz darstellt. Die Hauptidee besteht darin, asynchrone API-Aufrufe in eine FutureHerausgeberkette umzuwandeln und so eine serielle Pipeline zu erstellen .

Eingabe: Bereich von int von 1 bis 10, der asynchron in der Hintergrundwarteschlange in Zeichenfolgen konvertiert wird

Demo des direkten Aufrufs der asynchronen API:

let group = DispatchGroup()
inputValues.map {
    group.enter()
    asyncCall(input: $0) { (output, _) in
        print(">> \(output), in \(Thread.current)")
        group.leave()
    }
}
group.wait()

Ausgabe:

>> 1, in <NSThread: 0x7fe76264fff0>{number = 4, name = (null)}
>> 3, in <NSThread: 0x7fe762446b90>{number = 3, name = (null)}
>> 5, in <NSThread: 0x7fe7624461f0>{number = 5, name = (null)}
>> 6, in <NSThread: 0x7fe762461ce0>{number = 6, name = (null)}
>> 10, in <NSThread: 0x7fe76246a7b0>{number = 7, name = (null)}
>> 4, in <NSThread: 0x7fe764c37d30>{number = 8, name = (null)}
>> 7, in <NSThread: 0x7fe764c37cb0>{number = 9, name = (null)}
>> 8, in <NSThread: 0x7fe76246b540>{number = 10, name = (null)}
>> 9, in <NSThread: 0x7fe7625164b0>{number = 11, name = (null)}
>> 2, in <NSThread: 0x7fe764c37f50>{number = 12, name = (null)}

Demo der Mähdrescher-Pipeline:

Ausgabe:

>> got 1
>> got 2
>> got 3
>> got 4
>> got 5
>> got 6
>> got 7
>> got 8
>> got 9
>> got 10
>>>> finished with true

Code:

import Cocoa
import Combine
import PlaygroundSupport

// Assuming there is some Asynchronous API with
// (eg. process Int input value during some time and generates String result)
func asyncCall(input: Int, completion: @escaping (String, Error?) -> Void) {
    DispatchQueue.global(qos: .background).async {
            sleep(.random(in: 1...5)) // wait for random Async API output
            completion("\(input)", nil)
        }
}

// There are some input values to be processed serially
let inputValues = Array(1...10)

// Prepare one pipeline item based on Future, which trasform Async -> Sync
func makeFuture(input: Int) -> AnyPublisher<Bool, Error> {
    Future<String, Error> { promise in
        asyncCall(input: input) { (value, error) in
            if let error = error {
                promise(.failure(error))
            } else {
                promise(.success(value))
            }
        }
    }
    .receive(on: DispatchQueue.main)
    .map {
        print(">> got \($0)") // << sideeffect of pipeline item
        return true
    }
    .eraseToAnyPublisher()
}

// Create pipeline trasnforming input values into chain of Future publishers
var subscribers = Set<AnyCancellable>()
let pipeline =
    inputValues
    .reduce(nil as AnyPublisher<Bool, Error>?) { (chain, value) in
        if let chain = chain {
            return chain.flatMap { _ in
                makeFuture(input: value)
            }.eraseToAnyPublisher()
        } else {
            return makeFuture(input: value)
        }
    }

// Execute pipeline
pipeline?
    .sink(receiveCompletion: { _ in
        // << do something on completion if needed
    }) { output in
        print(">>>> finished with \(output)")
    }
    .store(in: &subscribers)

PlaygroundPage.current.needsIndefiniteExecution = true
Asperi
quelle
0

Verwenden Sie flatMap(maxPublishers:transform:)mit .max(1)z

func imagesPublisher(for urls: [URL]) -> AnyPublisher<UIImage, URLError> {
    Publishers.Sequence(sequence: urls.map { self.imagePublisher(for: $0) })
        .flatMap(maxPublishers: .max(1)) { $0 }
        .eraseToAnyPublisher()
}

Wo

func imagePublisher(for url: URL) -> AnyPublisher<UIImage, URLError> {
    URLSession.shared.dataTaskPublisher(for: url)
        .compactMap { UIImage(data: $0.data) }
        .receive(on: RunLoop.main)
        .eraseToAnyPublisher()
}

und

var imageRequests: AnyCancellable?

func fetchImages() {
    imageRequests = imagesPublisher(for: urls).sink(receiveCompletion: { completion in
        switch completion {
        case .finished:
            print("done")
        case .failure(let error):
            print("failed", error)
        }
    }, receiveValue: { image in
        // do whatever you want with the images as they come in
    })
}

Das führte zu:

seriell

Aber wir sollten erkennen, dass Sie einen großen Performance-Hit erzielen, wenn Sie sie so nacheinander ausführen. Wenn ich es zum Beispiel auf 6 gleichzeitig stoße, ist es mehr als doppelt so schnell:

gleichzeitig

Persönlich würde ich empfehlen, nur nacheinander herunterzuladen, wenn Sie dies unbedingt müssen (was beim Herunterladen einer Reihe von Bildern / Dateien mit ziemlicher Sicherheit nicht der Fall ist). Ja, das gleichzeitige Ausführen von Anforderungen kann dazu führen, dass sie nicht in einer bestimmten Reihenfolge abgeschlossen werden. Wir verwenden jedoch nur eine auftragsunabhängige Struktur (z. B. ein Wörterbuch anstelle eines einfachen Arrays), aber die Leistungssteigerungen sind so bedeutend, dass es sich im Allgemeinen lohnt.

Wenn Sie sie jedoch nacheinander herunterladen möchten, kann der maxPublishersParameter dies erreichen.

rauben
quelle
Ja, das ist es, was meine Antwort bereits sagt: stackoverflow.com/a/59889993/341994 sowie die Antwort, die ich an stackoverflow.com/a/59889174/341994
matt
Und siehe auch jetzt mein Buch apeth.com/UnderstandingCombine/operators/…
matt
Übrigens, wenn ich von sequentiell spreche, habe ich Ihre sequentielle asynchrone Operation für eine andere Aufgabe sehr gut genutzt, danke, dass Sie sie geschrieben haben
matt
@matt - Lol. Ich gestehe, dass ich nicht gesehen habe, dass Sie die maxPublishersOption gefunden haben. Und ich hätte nicht über "Don't Do Serial" nachgedacht, wenn ich bemerkt hätte, dass Sie es waren (da ich weiß, dass Sie die Vor- und Nachteile von Serial vs Concurrent vollständig verstehen). Ich sah buchstäblich nur "Ich möchte jeweils eine Datei herunterladen", ich war kürzlich auf die maxPublishersOption für etwas anderes gestoßen, das ich tat (nämlich eine moderne Lösung für diese Frage bereitzustellen ), und ich dachte, ich würde die Kombinationslösung I teilen hatte sich ausgedacht. Ich wollte nicht so abgeleitet sein.
Rob
1
Ja, es war die Lösung, auf die unter stackoverflow.com/a/48104095/1271826 verwiesen wurde, über die ich zuvor gesprochen habe. Ich fand das sehr hilfreich.
Matt