Pandas: Zick-Zack-Segmentierung von Daten basierend auf lokalen Minima-Maxima

10

Ich habe Zeitreihendaten. Daten generieren

date_rng = pd.date_range('2019-01-01', freq='s', periods=400)
df = pd.DataFrame(np.random.lognormal(.005, .5,size=(len(date_rng), 3)),
                  columns=['data1', 'data2', 'data3'],
                  index= date_rng)
s = df['data1']

Ich möchte eine Zick-Zack-Linie zwischen den lokalen Maxima und den lokalen Minima erstellen, die die Bedingung erfüllt, dass auf der y-Achse |highest - lowest value|jeder Zick-Zack-Linie einen Prozentsatz (z. B. 20%) des Abstands der vorherigen überschreiten muss Zick-Zack-Linie UND ein vorgegebener Wert k (sagen wir 1,2)

Ich kann die lokalen Extrema mit diesem Code finden:

# Find peaks(max).
peak_indexes = signal.argrelextrema(s.values, np.greater)
peak_indexes = peak_indexes[0]

# Find valleys(min).
valley_indexes = signal.argrelextrema(s.values, np.less)
valley_indexes = valley_indexes[0]
# Merge peaks and valleys data points using pandas.
df_peaks = pd.DataFrame({'date': s.index[peak_indexes], 'zigzag_y': s[peak_indexes]})
df_valleys = pd.DataFrame({'date': s.index[valley_indexes], 'zigzag_y': s[valley_indexes]})
df_peaks_valleys = pd.concat([df_peaks, df_valleys], axis=0, ignore_index=True, sort=True)

# Sort peak and valley datapoints by date.
df_peaks_valleys = df_peaks_valleys.sort_values(by=['date'])

aber ich weiß nicht, wie ich die Schwellenbedingung darauf anwenden soll. Bitte teilen Sie mir mit, wie diese Bedingung anzuwenden ist.

Da die Daten Millionen Zeitstempel enthalten können, wird eine effiziente Berechnung dringend empfohlen

Für eine klarere Beschreibung: Geben Sie hier die Bildbeschreibung ein

Beispielausgabe meiner Daten:

 # Instantiate axes.
(fig, ax) = plt.subplots()
# Plot zigzag trendline.
ax.plot(df_peaks_valleys['date'].values, df_peaks_valleys['zigzag_y'].values, 
                                                        color='red', label="Zigzag")

# Plot original line.
ax.plot(s.index, s, linestyle='dashed', color='black', label="Org. line", linewidth=1)

# Format time.
ax.xaxis_date()
ax.xaxis.set_major_formatter(mdates.DateFormatter("%Y-%m-%d"))

plt.gcf().autofmt_xdate()   # Beautify the x-labels
plt.autoscale(tight=True)

plt.legend(loc='best')
plt.grid(True, linestyle='dashed')

Geben Sie hier die Bildbeschreibung ein

Meine gewünschte Ausgabe (etwas Ähnliches, der Zickzack verbindet nur die signifikanten Segmente) Geben Sie hier die Bildbeschreibung ein

Thanh Nguyen
quelle

Antworten:

3

Ich habe mein bestes Verständnis der Frage beantwortet. Es ist jedoch nicht klar, wie die Variable K den Filter beeinflusst.

Sie möchten die Extrema basierend auf einer laufenden Bedingung filtern. Ich gehe davon aus, dass Sie alle Extrema markieren möchten, deren relativer Abstand zum zuletzt markierten Extremum größer als p% ist. Ich gehe weiter davon aus, dass Sie das erste Element der Zeitreihe immer als gültigen / relevanten Punkt betrachten.

Ich habe dies mit der folgenden Filterfunktion implementiert:

def filter(values, percentage):
    previous = values[0] 
    mask = [True]
    for value in values[1:]: 
        relative_difference = np.abs(value - previous)/previous
        if relative_difference > percentage:
            previous = value
            mask.append(True)
        else:
            mask.append(False)
    return mask

Um Ihren Code auszuführen, importiere ich zuerst Abhängigkeiten:

from scipy import signal
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.dates as mdates

Um den Code reproduzierbar zu machen, korrigiere ich den zufälligen Startwert:

np.random.seed(0)

Der Rest von hier ist Copypasta. Beachten Sie, dass ich die Probenmenge verringert habe, um das Ergebnis klar zu machen.

date_rng = pd.date_range('2019-01-01', freq='s', periods=30)
df = pd.DataFrame(np.random.lognormal(.005, .5,size=(len(date_rng), 3)),
                  columns=['data1', 'data2', 'data3'],
                  index= date_rng)
s = df['data1']
# Find peaks(max).
peak_indexes = signal.argrelextrema(s.values, np.greater)
peak_indexes = peak_indexes[0]
# Find valleys(min).
valley_indexes = signal.argrelextrema(s.values, np.less)
valley_indexes = valley_indexes[0]
# Merge peaks and valleys data points using pandas.
df_peaks = pd.DataFrame({'date': s.index[peak_indexes], 'zigzag_y': s[peak_indexes]})
df_valleys = pd.DataFrame({'date': s.index[valley_indexes], 'zigzag_y': s[valley_indexes]})
df_peaks_valleys = pd.concat([df_peaks, df_valleys], axis=0, ignore_index=True, sort=True)
# Sort peak and valley datapoints by date.
df_peaks_valleys = df_peaks_valleys.sort_values(by=['date'])

Dann verwenden wir die Filterfunktion:

p = 0.2 # 20% 
filter_mask = filter(df_peaks_valleys.zigzag_y, p)
filtered = df_peaks_valleys[filter_mask]

Und Plot wie Sie sowohl Ihren vorherigen Plot als auch die neu gefilterten Extrema:

 # Instantiate axes.
(fig, ax) = plt.subplots(figsize=(10,10))
# Plot zigzag trendline.
ax.plot(df_peaks_valleys['date'].values, df_peaks_valleys['zigzag_y'].values, 
                                                        color='red', label="Extrema")
# Plot zigzag trendline.
ax.plot(filtered['date'].values, filtered['zigzag_y'].values, 
                                                        color='blue', label="ZigZag")

# Plot original line.
ax.plot(s.index, s, linestyle='dashed', color='black', label="Org. line", linewidth=1)

# Format time.
ax.xaxis_date()
ax.xaxis.set_major_formatter(mdates.DateFormatter("%Y-%m-%d"))

plt.gcf().autofmt_xdate()   # Beautify the x-labels
plt.autoscale(tight=True)

plt.legend(loc='best')
plt.grid(True, linestyle='dashed')

Geben Sie hier die Bildbeschreibung ein

BEARBEITEN :

Wenn Sie sowohl den ersten als auch den letzten Punkt als gültig betrachten möchten, können Sie die Filterfunktion wie folgt anpassen:

def filter(values, percentage):
    # the first value is always valid
    previous = values[0] 
    mask = [True]
    # evaluate all points from the second to (n-1)th
    for value in values[1:-1]: 
        relative_difference = np.abs(value - previous)/previous
        if relative_difference > percentage:
            previous = value
            mask.append(True)
        else:
            mask.append(False)
    # the last value is always valid
    mask.append(True)
    return mask
Nikolas Rieble
quelle
Hallo, danke für die tolle Antwort. Ja, Ihre Annahme ist richtig. "Markieren Sie alle Extrema, deren relativer Abstand zum zuletzt markierten Extremum größer als p% ist." Sowohl der erste als auch der letzte Punkt sollten immer berücksichtigt werden. Ich habe Ihre Antwort überprüft, manchmal ist der letzte Punkt verfehlt. Können Sie mir dabei helfen?
Thanh Nguyen
3

Sie können die Rollfunktion von Pandas verwenden, um die lokalen Extrema zu erstellen. Das vereinfacht den Code ein wenig im Vergleich zu Ihrem Scipy-Ansatz.

Funktionen zum Auffinden der Extrema:

def islocalmax(x):
    """Both neighbors are lower,
    assumes a centered window of size 3"""
    return (x[0] < x[1]) & (x[2] < x[1])

def islocalmin(x):
    """Both neighbors are higher,
    assumes a centered window of size 3"""
    return (x[0] > x[1]) & (x[2] > x[1])

def isextrema(x):
    return islocalmax(x) or islocalmin(x)

Die Funktion zum Erstellen des Zickzacks kann sofort (über jede Spalte) auf den Datenrahmen angewendet werden. Dadurch werden jedoch NaNs eingeführt, da die zurückgegebenen Zeitstempel für jede Spalte unterschiedlich sind. Sie können diese später einfach löschen, wie im folgenden Beispiel gezeigt, oder die Funktion einfach auf eine einzelne Spalte in Ihrem Datenrahmen anwenden.

Beachten Sie, dass ich den Test anhand eines Schwellenwerts auskommentiert habe k. Ich bin mir nicht sicher, ob ich diesen Teil richtig verstanden habe. Sie können es einschließen, wenn der absolute Unterschied zwischen dem vorherigen und dem aktuellen Extrem größer sein muss als k:& (ext_val.diff().abs() > k)

Ich bin mir auch nicht sicher, ob der endgültige Zickzack immer von einem ursprünglichen Hoch zu einem Tief oder umgekehrt wechseln soll. Ich habe es angenommen, sonst können Sie die zweite Suche nach Extrem am Ende der Funktion entfernen.

def create_zigzag(col, p=0.2, k=1.2):

    # Find the local min/max
    # converting to bool converts NaN to True, which makes it include the endpoints    
    ext_loc = col.rolling(3, center=True).apply(isextrema, raw=False).astype(np.bool_)

    # extract values at local min/max
    ext_val = col[ext_loc]

    # filter locations based on threshold
    thres_ext_loc = (ext_val.diff().abs() > (ext_val.shift(-1).abs() * p)) #& (ext_val.diff().abs() > k)

    # Keep the endpoints
    thres_ext_loc.iloc[0] = True
    thres_ext_loc.iloc[-1] = True

    thres_ext_loc = thres_ext_loc[thres_ext_loc]

    # extract values at filtered locations 
    thres_ext_val = col.loc[thres_ext_loc.index]

    # again search the extrema to force the zigzag to always go from high > low or vice versa,
    # never low > low, or high > high
    ext_loc = thres_ext_val.rolling(3, center=True).apply(isextrema, raw=False).astype(np.bool_)
    thres_ext_val  =thres_ext_val[ext_loc]

    return thres_ext_val

Generieren Sie einige Beispieldaten:

date_rng = pd.date_range('2019-01-01', freq='s', periods=35)

df = pd.DataFrame(np.random.randn(len(date_rng), 3),
                  columns=['data1', 'data2', 'data3'],
                  index= date_rng)

df = df.cumsum()

Wenden Sie die Funktion an und extrahieren Sie das Ergebnis für die Spalte 'data1':

dfzigzag = df.apply(create_zigzag)
data1_zigzag = dfzigzag['data1'].dropna()

Visualisieren Sie das Ergebnis:

fig, axs = plt.subplots(figsize=(10, 3))

axs.plot(df.data1, 'ko-', ms=4, label='original')
axs.plot(data1_zigzag, 'ro-', ms=4, label='zigzag')
axs.legend()

Geben Sie hier die Bildbeschreibung ein

Rutger Kassies
quelle
Danke für deine Antwort. Ich möchte nach dieser Linie fragen. Soweit (ext_val.diff().abs() > (ext_val.shift(-1).abs() * p))ich weiß, vergleichen Sie den Abstand zwischen zwei Punkten mit p%dem letzten Punkt. Stimmt das? Weil ich jedes Zickzack-Segment mit dem vorherigen Segment vergleichen und wiederholen möchte, bis die Bedingung erfüllt ist.
Thanh Nguyen