Berechnen Sie die Durchschnittsgeschwindigkeit von Straßen [geschlossen]

20

Ich ging zu einem Vorstellungsgespräch als Dateningenieur. Der Interviewer hat mir eine Frage gestellt. Er gab mir eine Situation und bat mich, den Datenfluss für dieses System zu entwerfen. Ich habe das gelöst, aber er mochte meine Lösung nicht und ich habe versagt. Ich würde gerne wissen, ob Sie bessere Ideen haben, wie Sie diese Herausforderung lösen können.

Die Frage war:

Unser System empfängt vier Datenströme. Die Daten enthalten eine Fahrzeug-ID, Geschwindigkeit und Geolokalisierungskoordinationen. Jedes Fahrzeug sendet seine Daten einmal pro Minute. Es gibt keine Verbindung zwischen einem bestimmten Strom und einer bestimmten Straße, einem bestimmten Fahrzeug oder etwas anderem. Es gibt eine Funktion, die Koordinationen akzeptiert und einen Straßenabschnittnamen zurückgibt. Wir müssen die durchschnittliche Geschwindigkeit pro Straßenabschnitt pro 5 Minuten kennen. Zum Schluss wollen wir die Ergebnisse an Kafka schreiben.

Geben Sie hier die Bildbeschreibung ein

Meine Lösung war also:

Schreiben Sie zuerst alle Daten in einen Kafka-Cluster, in ein Thema, unterteilt durch die 5-6 ersten Ziffern des Breitengrads, verkettet mit den 5-6 ersten Ziffern des Längengrads. Lesen Sie dann die Daten durch strukturiertes Streaming, fügen Sie für jede Zeile den Namen des Straßenabschnitts durch die Koordinationen hinzu (dafür gibt es ein vordefiniertes udf) und fassen Sie die Daten dann nach dem Namen des Straßenabschnitts zusammen.

Da ich die Daten in Kafka nach den 5-6 ersten Ziffern der Koordinationen partitioniere, müssen nach der Übersetzung der Koordinationen in den Abschnittsnamen nicht viele Daten auf die richtige Partition übertragen werden, und daher kann ich die Operation colesce () nutzen das löst kein vollständiges Mischen aus.

Berechnen Sie dann die Durchschnittsgeschwindigkeit pro Executor.

Der gesamte Vorgang wird alle 5 Minuten ausgeführt und die Daten werden im Append-Modus in die endgültige Kafka-Senke geschrieben.

Geben Sie hier die Bildbeschreibung ein

Also wieder, der Interviewer mochte meine Lösung nicht. Könnte jemand vorschlagen, wie man es verbessern kann oder eine ganz andere und bessere Idee?

Alon
quelle
Wäre es nicht besser, die Person zu fragen, was sie nicht genau mochte?
Gino Pane
Ich denke, es ist eine schlechte Idee, nach dem verketteten Lat-Long zu partitionieren. Wird der Datenpunkt nicht für jede Spur als etwas andere Koordinate angegeben?
Webber
@webber daher nehme ich nur ein paar Ziffern, damit die Position nicht eindeutig ist, sondern relativ groß wie ein Straßenabschnitt.
Alon

Antworten:

6

Ich fand diese Frage sehr interessant und dachte darüber nach, es zu versuchen.

Wie ich weiter ausgewertet habe, ist Ihr Versuch selbst gut, mit Ausnahme der folgenden:

aufgeteilt durch die 5-6 ersten Ziffern des Breitengrads, verkettet mit den 5-6 ersten Ziffern des Längengrads

Wenn Sie bereits eine Methode haben, um die ID / den Namen des Straßenabschnitts basierend auf Breite und Länge zu ermitteln, rufen Sie diese Methode zuerst auf und verwenden Sie die ID / den Namen des Straßenabschnitts, um die Daten überhaupt zu partitionieren.

Und danach ist alles ganz einfach, also wird die Topologie sein

Merge all four streams ->
Select key as the road section id/name ->
Group the stream by Key -> 
Use time windowed aggregation for the given time ->
Materialize it to a store. 

(Eine ausführlichere Erklärung finden Sie in den Kommentaren im Code unten. Bitte fragen Sie, ob etwas unklar ist.)

Ich habe den Code am Ende dieser Antwort hinzugefügt. Bitte beachten Sie, dass ich anstelle des Durchschnitts die Summe verwendet habe, da dies einfacher zu demonstrieren ist. Es ist möglich, einen Durchschnitt zu erstellen, indem einige zusätzliche Daten gespeichert werden.

Ich habe die Antwort in Kommentaren detailliert beschrieben. Es folgt ein Topologiediagramm, das aus dem Code generiert wurde (dank https://zz85.github.io/kafka-streams-viz/ ).

Topologie:

Topologiediagramm

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.apache.kafka.streams.state.Stores;
    import org.apache.kafka.streams.state.WindowBytesStoreSupplier;

    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;
    import java.util.concurrent.CountDownLatch;

    public class VehicleStream {
        // 5 minutes aggregation window
        private static final long AGGREGATION_WINDOW = 5 * 50 * 1000L;

        public static void main(String[] args) throws Exception {
            Properties properties = new Properties();

            // Setting configs, change accordingly
            properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "vehicle.stream.app");
            properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,kafka2:19092");
            properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

            // initializing  a streambuilder for building topology.
            final StreamsBuilder builder = new StreamsBuilder();

            // Our initial 4 streams.
            List<String> streamInputTopics = Arrays.asList(
                    "vehicle.stream1", "vehicle.stream2",
                    "vehicle.stream3", "vehicle.stream4"
            );
            /*
             * Since there is no connection between a specific stream
             * to a specific road or vehicle or anything else,
             * we can take all four streams as a single stream
             */
            KStream<String, String> source = builder.stream(streamInputTopics);

            /*
             * The initial key is unimportant (which can be ignored),
             * Instead, we will be using the section name/id as key.
             * Data will contain comma separated values in following format.
             * VehicleId,Speed,Latitude,Longitude
             */
            WindowBytesStoreSupplier windowSpeedStore = Stores.persistentWindowStore(
                    "windowSpeedStore",
                    AGGREGATION_WINDOW,
                    2, 10, true
            );
            source
                    .peek((k, v) -> printValues("Initial", k, v))
                    // First, we rekey the stream based on the road section.
                    .selectKey(VehicleStream::selectKeyAsRoadSection)
                    .peek((k, v) -> printValues("After rekey", k, v))
                    .groupByKey()
                    .windowedBy(TimeWindows.of(AGGREGATION_WINDOW))
                    .aggregate(
                            () -> "0.0", // Initialize
                            /*
                             * I'm using summing here for the aggregation as that's easier.
                             * It can be converted to average by storing extra details on number of records, etc..
                             */
                            (k, v, previousSpeed) ->  // Aggregator (summing speed)
                                    String.valueOf(
                                            Double.parseDouble(previousSpeed) +
                                                    VehicleSpeed.getVehicleSpeed(v).speed
                                    ),
                            Materialized.as(windowSpeedStore)
                    );
            // generating the topology
            final Topology topology = builder.build();
            System.out.print(topology.describe());

            // constructing a streams client with the properties and topology
            final KafkaStreams streams = new KafkaStreams(topology, properties);
            final CountDownLatch latch = new CountDownLatch(1);

            // attaching shutdown handler
            Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
                @Override
                public void run() {
                    streams.close();
                    latch.countDown();
                }
            });
            try {
                streams.start();
                latch.await();
            } catch (Throwable e) {
                System.exit(1);
            }
            System.exit(0);
        }


        private static void printValues(String message, String key, Object value) {
            System.out.printf("===%s=== key: %s value: %s%n", message, key, value.toString());
        }

        private static String selectKeyAsRoadSection(String key, String speedValue) {
            // Would make more sense when it's the section id, rather than a name.
            return coordinateToRoadSection(
                    VehicleSpeed.getVehicleSpeed(speedValue).latitude,
                    VehicleSpeed.getVehicleSpeed(speedValue).longitude
            );
        }

        private static String coordinateToRoadSection(String latitude, String longitude) {
            // Dummy function
            return "Area 51";
        }

        public static class VehicleSpeed {
            public String vehicleId;
            public double speed;
            public String latitude;
            public String longitude;

            public static VehicleSpeed getVehicleSpeed(String data) {
                return new VehicleSpeed(data);
            }

            public VehicleSpeed(String data) {
                String[] dataArray = data.split(",");
                this.vehicleId = dataArray[0];
                this.speed = Double.parseDouble(dataArray[1]);
                this.latitude = dataArray[2];
                this.longitude = dataArray[3];
            }

            @Override
            public String toString() {
                return String.format("veh: %s, speed: %f, latlong : %s,%s", vehicleId, speed, latitude, longitude);
            }
        }
    }
Irshad PI
quelle
Ist das Zusammenführen aller Streams nicht eine schlechte Idee? Dies kann zu einem Engpass für Ihren Datenfluss werden. Was passiert, wenn Sie mit dem Wachstum Ihres Systems immer mehr Eingabestreams empfangen? Wird dies skalierbar sein?
Wypul
@wypul> Ist das Zusammenführen aller Streams nicht eine schlechte Idee? -> Ich denke nein. Parallelität in Kafka wird nicht durch Streams erreicht, sondern durch Partitionen (und Aufgaben), Threading usw. Streams sind eine Möglichkeit, die Daten zu gruppieren. > Wird dies skalierbar sein? -> ja. Da wir nach Straßenabschnitten tasten und davon ausgehen, dass die Straßenabschnitte fair verteilt sind, können wir die Anzahl der Partitionen für diese Themen erhöhen, um den Stream in verschiedenen Containern parallel zu verarbeiten. Wir können einen guten Partitionierungsalgorithmus verwenden, der auf dem Straßenabschnitt basiert, um die Last auf Replikate zu verteilen.
Irshad PI
1

Das Problem als solches scheint einfach zu sein und die angebotenen Lösungen sind bereits sehr sinnvoll. Ich frage mich, ob der Interviewer besorgt über das Design und die Leistung der Lösung war, auf die Sie sich konzentriert haben, oder über die Genauigkeit des Ergebnisses. Da sich andere auf Code, Design und Leistung konzentriert haben, werde ich die Genauigkeit abwägen.

Streaming-Lösung

Während die Daten einfließen, können wir eine grobe Schätzung der Durchschnittsgeschwindigkeit einer Straße liefern. Diese Schätzung ist hilfreich bei der Erkennung von Überlastungen, bei der Bestimmung der Geschwindigkeitsbegrenzung jedoch nicht.

  1. Kombinieren Sie alle 4 Datenströme miteinander.
  2. Erstellen Sie ein Fenster von 5 Minuten, um Daten aus allen 4 Streams in 5 Minuten zu erfassen.
  3. Wenden Sie UDF auf Koordinaten an, um den Straßennamen und den Städtenamen zu erhalten. Die Straßennamen sind oft städteübergreifend doppelt vorhanden, daher verwenden wir den Stadtnamen + den Straßennamen als Schlüssel.
  4. Berechnen Sie die Durchschnittsgeschwindigkeit mit einer Syntax wie -

    vehicle_street_speed
      .groupBy($"city_name_street_name")
      .agg(
        avg($"speed").as("avg_speed")
      )

5. write the result to the Kafka Topic

Chargenlösung

Diese Schätzung ist deaktiviert, da die Stichprobengröße klein ist. Wir benötigen eine Stapelverarbeitung für ganze Monat / Quartal / Jahr-Daten, um das Tempolimit genauer zu bestimmen.

  1. Lesen Sie die Daten eines Jahres vom Datensee (oder Kafka-Thema).

  2. Wenden Sie UDF auf Koordinaten an, um den Straßennamen und den Städtenamen zu erhalten.

  3. Berechnen Sie die Durchschnittsgeschwindigkeit mit einer Syntax wie -


    vehicle_street_speed
      .groupBy($"city_name_street_name")
      .agg(
        avg($"speed").as("avg_speed")
      )

  1. Schreiben Sie das Ergebnis in Data Lake.

Basierend auf dieser genaueren Geschwindigkeitsbegrenzung können wir langsamen Verkehr in der Streaming-Anwendung vorhersagen.

Salim
quelle
1

Ich sehe einige Probleme mit Ihrer Partitionierungsstrategie:

  • Wenn Sie sagen, dass Sie Ihre Daten basierend auf den ersten 5-6 Ziffern Lat-Länge partitionieren möchten, können Sie die Anzahl der Kafka-Partitionen nicht im Voraus bestimmen. Sie haben verzerrte Daten, da Sie bei einigen Straßenabschnitten ein höheres Volumen als bei anderen beobachten.

  • Und Ihre Tastenkombination garantiert ohnehin nicht dieselben Straßenabschnittsdaten in derselben Partition, und daher können Sie nicht sicher sein, dass kein Mischen stattfindet.

IMO-Informationen reichen nicht aus, um die gesamte Datenpipeline zu entwerfen. Denn beim Entwerfen der Pipeline spielt die Partitionierung Ihrer Daten eine wichtige Rolle. Sie sollten sich mehr über die Daten erkundigen, die Sie erhalten, z. B. Anzahl der Fahrzeuge, Größe der Eingabedatenströme, Ist die Anzahl der Datenströme festgelegt oder kann sie in Zukunft zunehmen? Sind die Eingangsdatenströme, die Sie empfangen, Kafka-Ströme? Wie viele Daten erhalten Sie in 5 Minuten?

  • Nehmen wir nun an, Sie haben 4 Streams, die zu 4 Themen in Kafka oder 4 Partitionen geschrieben wurden, und Sie haben keinen bestimmten Schlüssel, aber Ihre Daten werden basierend auf einem Rechenzentrumsschlüssel partitioniert oder es ist eine Hash-Partitionierung. Wenn nicht, sollte dies auf der Datenseite erfolgen, anstatt die Daten in einem anderen Kafka-Stream zu de-duplizieren und zu partitionieren.
  • Wenn Sie die Daten in verschiedenen Rechenzentren empfangen, müssen Sie die Daten in einem Cluster speichern. Zu diesem Zweck können Sie Kafka Mirror Maker oder ähnliches verwenden.
  • Nachdem Sie alle Daten in einem Cluster gespeichert haben, können Sie dort einen strukturierten Streaming-Job mit einem Triggerintervall von 5 Minuten und einem Wasserzeichen ausführen, das Ihren Anforderungen entspricht.
  • Um den Durchschnitt zu berechnen und viel Mischen zu vermeiden, können Sie eine Kombination von mapValuesund reduceByKeyanstelle von groupBy verwenden. Verweisen Sie darauf .
  • Sie können die Daten nach der Verarbeitung in die Kafka-Senke schreiben.
wypul
quelle
mapValues ​​und reductByKey gehören zum Low-Level-RDD. Ist Catalyst nicht intelligent genug, um die effizienteste RDD zu generieren, wenn ich den Durchschnitt gruppiere und berechne?
Alon
@Alon Catalyst wird sicherlich in der Lage sein, den besten Plan zum Ausführen Ihrer Abfrage zu finden. Wenn Sie jedoch groupBy verwenden, werden Daten mit demselben Schlüssel zuerst auf dieselbe Partition gemischt und dann eine aggregierte Operation darauf angewendet. mapValuesund reduceBygehört zwar zu Low-Level-RDD, wird aber in dieser Situation immer noch eine bessere Leistung erbringen, da zuerst das Aggregat pro Partition berechnet und dann gemischt wird.
Wypul
0

Die Hauptprobleme, die ich bei dieser Lösung sehe, sind:

  • Straßenabschnitte, die sich am Rand der 6-stelligen Quadrate der Karte befinden, enthalten Daten in mehreren Themenpartitionen und mehrere Durchschnittsgeschwindigkeiten.
  • Die Größe der Aufnahmedaten für Ihre Kafka-Partitionen ist möglicherweise unausgewogen (Stadt vs. Wüste). Die Partitionierung nach den ersten Ziffern der Auto-ID könnte eine gute Idee sein, IMO.
  • Ich bin mir nicht sicher, ob ich dem Koaleszenz-Teil gefolgt bin, aber es klingt problematisch.

Ich würde sagen, die Lösung muss Folgendes tun: Lesen aus dem Kafka-Stream -> UDF -> Gruppieren nach Straßenabschnitten -> Durchschnitt -> Schreiben in den Kafka-Stream.

David Taub
quelle
0

Mein Design würde davon abhängen

  1. Anzahl der Straßen
  2. Zahl der Fahrzeuge
  3. Berechnungskosten der Straße aus Koordinaten

Wenn ich für eine beliebige Anzahl von Zählungen skalieren möchte, würde das Design so aussehen Geben Sie hier die Bildbeschreibung ein

Cross Bedenken zu diesem Design -

  1. Behalten Sie den dauerhaften Zustand der Eingabestreams bei (wenn die Eingabe kafka ist, können wir Offsets mit Kafka oder extern speichern)
  2. Regelmäßiger Checkpoint-Status für externes System (ich bevorzuge die Verwendung asynchroner Checkpoint-Barrieren in Flink )

Einige praktische Verbesserungen an diesem Design möglich -

  1. Caching-Funktion zur Zuordnung von Straßenabschnitten, wenn möglich, basierend auf Straßen
  2. Umgang mit fehlenden Pings (in der Praxis ist nicht jeder Ping verfügbar)
  3. Berücksichtigung der Straßenkrümmung (Peilung und Höhe)
Yugandhar
quelle