Was ist der beste Weg, um eine Funktion alle x Sekunden wiederholt auszuführen?

281

Ich möchte eine Funktion in Python alle 60 Sekunden für immer wiederholt ausführen (genau wie ein NSTimer in Ziel C). Dieser Code wird als Daemon ausgeführt und entspricht praktisch dem Aufrufen des Python-Skripts jede Minute mit einem Cron, ohne dass dies vom Benutzer eingerichtet werden muss.

In dieser Frage zu einem in Python implementierten Cron scheint die Lösung effektiv nur x Sekunden lang zu schlafen () . Ich brauche keine so erweiterte Funktionalität, also würde so etwas vielleicht funktionieren

while True:
    # Code executed here
    time.sleep(60)

Gibt es vorhersehbare Probleme mit diesem Code?

DavidM
quelle
83
Ein pedantischer Punkt, der jedoch kritisch sein kann: Ihr Code über dem Code wird nicht alle 60 Sekunden ausgeführt, sondern es besteht eine Lücke von 60 Sekunden zwischen den Ausführungen. Dies geschieht nur alle 60 Sekunden, wenn Ihr ausgeführter Code überhaupt keine Zeit benötigt.
Simon
4
auch time.sleep(60)zurückgeben kann sowohl früher und später
JFS
5
Ich frage mich immer noch: Gibt es vorhersehbare Probleme mit diesem Code?
Banane
1
Das "vorhersehbare Problem" ist, dass Sie nicht 60 Iterationen pro Stunde erwarten können, wenn Sie nur time.sleep (60) verwenden. Wenn Sie also ein Element pro Iteration anhängen und eine Liste mit festgelegter Länge führen, repräsentiert der Durchschnitt dieser Liste keinen konsistenten "Zeitraum". Funktionen wie "gleitender Durchschnitt" können also auf zu alte Datenpunkte verweisen, was Ihre Anzeige verzerrt.
Litepresence
2
@Banana Ja, Sie können mit Problemen rechnen, die dadurch verursacht werden, dass Ihr Skript nicht genau alle 60 Sekunden ausgeführt wird. Zum Beispiel. Ich habe so etwas gemacht, um Videostreams zu teilen und sie hochzuladen, und am Ende bekam ich Strems 5-10 ~ Sekunden länger, weil die Medienwarteschlange gepuffert wird, während ich Daten innerhalb der Schleife verarbeite. Das hängt von Ihren Daten ab. Wenn es sich bei der Funktion um eine Art einfachen Watchdog handelt, der Sie beispielsweise warnt, wenn Ihre Festplatte voll ist, sollten Sie damit überhaupt keine Probleme haben. Wenn Sie Warnmeldungen zu Kernkraftwerken überprüfen, erhalten Sie möglicherweise eine Stadt komplett in die Luft gesprengt x
DGoiko

Antworten:

227

Wenn Ihr Programm noch keine Ereignisschleife hat, verwenden Sie das Schedul- Modul, das einen Allzweck-Event-Scheduler implementiert.

import sched, time
s = sched.scheduler(time.time, time.sleep)
def do_something(sc): 
    print("Doing stuff...")
    # do your stuff
    s.enter(60, 1, do_something, (sc,))

s.enter(60, 1, do_something, (s,))
s.run()

Wenn Sie bereits eine Event Loop - Bibliothek wie mit asyncio, trio, tkinter, PyQt5, gobject, kivy, und viele andere - planen nur die Aufgabe , Ihre vorhandenen Ereignisschleife Bibliothek Methoden verwenden, statt.

nosklo
quelle
16
Das Schedule-Modul dient zum Ausführen von Scheduling-Funktionen nach einiger Zeit. Wie können Sie damit einen Funktionsaufruf alle x Sekunden wiederholen, ohne time.sleep () zu verwenden?
Baishampayan Ghose
2
@ Baishampayan: Planen Sie einfach einen neuen Lauf.
Nosklo
3
Dann sollte auch apscheduler unter packages.python.org/APScheduler an dieser Stelle erwähnt werden.
Daniel F
6
Hinweis: Diese Version kann driften. Sie könnten es verwenden enterabs(), um es zu vermeiden. Hier ist eine nicht driftende Version zum Vergleich .
JFS
8
@JavaSa: weil "mach deine Sachen" nicht sofort ist und time.sleepsich hier Fehler von ansammeln können. "Alle X Sekunden ausführen" und "Immer wieder mit einer Verzögerung von ~ X Sekunden ausführen" sind nicht dasselbe. Siehe auch diesen Kommentar
jfs
180

Sperren Sie einfach Ihre Zeitschleife auf die Systemuhr. Einfach.

import time
starttime=time.time()
while True:
  print "tick"
  time.sleep(60.0 - ((time.time() - starttime) % 60.0))
Dave Rove
quelle
22
+1. Ihre und die twistedAntwort sind die einzigen Antworten, die alle xSekunden eine Funktion ausführen . Der Rest führt die Funktion mit einer Verzögerung von xSekunden nach jedem Aufruf aus.
JFS
13
Wenn Sie einen Code hinzufügen möchten, der länger als eine Sekunde dauert ... würde dies das Timing verkürzen und ins Hintertreffen geraten. Die akzeptierte Antwort ist in diesem Fall richtig ... Jeder kann einen einfachen Druckbefehl wiederholen und lassen Sie es jede Sekunde ohne Verzögerung laufen ...
Angry 84
5
Ich bevorzuge from time import time, sleepwegen der existenziellen Implikationen;)
Will
14
Funktioniert fantastisch. Es ist nicht erforderlich, Ihre zu subtrahieren, starttimewenn Sie mit der Synchronisierung zu einer bestimmten Zeit beginnen: time.sleep(60 - time.time() % 60)hat für mich gut funktioniert. Ich habe es als verwendet time.sleep(1200 - time.time() % 1200)und es gibt mir Protokolle auf dem :00 :20 :40, genau wie ich wollte.
TemporalWolf
2
@AntonSchigur, um Drift nach mehreren Iterationen zu vermeiden. Eine individuelle Iteration kann etwas früher oder später beginnen , je nach sleep(), timer()Präzision und wie lange es dauert , die Schleife zu überführen , aber im Durchschnitt Iterationen immer dann auftreten , auf den Intervallgrenzen (auch wenn einige übersprungen): while keep_doing_it(): sleep(interval - timer() % interval). Vergleichen Sie es damit, while keep_doing_it(): sleep(interval)wo sich nach mehreren Iterationen Fehler ansammeln können.
JFS
71

Vielleicht möchten Sie Twisted in Betracht ziehen , eine Python-Netzwerkbibliothek, die das Reaktormuster implementiert .

from twisted.internet import task, reactor

timeout = 60.0 # Sixty seconds

def doWork():
    #do work here
    pass

l = task.LoopingCall(doWork)
l.start(timeout) # call every sixty seconds

reactor.run()

Während "while True: sleep (60)" wahrscheinlich funktioniert, implementiert Twisted wahrscheinlich bereits viele der Funktionen, die Sie möglicherweise benötigen (Dämonisierung, Protokollierung oder Ausnahmebehandlung, wie von Bobince hervorgehoben), und ist wahrscheinlich eine robustere Lösung

Aaron Maenpaa
quelle
Tolle Antwort, sehr genau ohne Drift. Ich frage mich, ob dadurch die CPU auch in den Ruhezustand versetzt wird, während auf die Ausführung der Aufgabe gewartet wird (auch bekannt als nicht beschäftigt).
Smoothware
1
Dies driftet auf der Millisekunden-Ebene
Derek Eden
Was bedeutet "Abweichungen im Millisekundenbereich"?
Jean-Paul Calderone
67

Wenn Sie möchten, dass Ihre Funktion nicht blockierend ausgeführt wird, würde ich anstelle einer blockierenden Endlosschleife einen Thread-Timer verwenden. Auf diese Weise kann Ihr Code weiter ausgeführt werden und andere Aufgaben ausführen, und Ihre Funktion wird weiterhin alle n Sekunden aufgerufen. Ich verwende diese Technik häufig zum Drucken von Fortschrittsinformationen bei langen, CPU / Disk / Netzwerk-intensiven Aufgaben.

Hier ist der Code, den ich in einer ähnlichen Frage mit der Steuerung start () und stop () gepostet habe:

from threading import Timer

class RepeatedTimer(object):
    def __init__(self, interval, function, *args, **kwargs):
        self._timer     = None
        self.interval   = interval
        self.function   = function
        self.args       = args
        self.kwargs     = kwargs
        self.is_running = False
        self.start()

    def _run(self):
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        if not self.is_running:
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
            self.is_running = True

    def stop(self):
        self._timer.cancel()
        self.is_running = False

Verwendungszweck:

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!

Eigenschaften:

  • Nur Standardbibliothek, keine externen Abhängigkeiten
  • start()und können stop()sicher mehrmals angerufen werden, auch wenn der Timer bereits gestartet / gestoppt wurde
  • Die aufzurufende Funktion kann positionelle und benannte Argumente haben
  • Sie können intervaljederzeit ändern , es wird nach dem nächsten Lauf wirksam. Das Gleiche gilt für args, kwargsund sogar function!
MestreLion
quelle
Diese Lösung scheint im Laufe der Zeit zu driften; Ich brauchte eine Version, die die Funktion alle n Sekunden ohne Drift aufrufen soll. Ich werde ein Update in einer separaten Frage veröffentlichen.
Ära
In def _run(self)Ich versuche , meinen Kopf herum zu wickeln , warum Sie rufen self.start()vor self.function(). Können Sie das näher erläutern? Ich würde denken, wenn wir start()zuerst anrufen, self.is_runningwäre das immer Falseso, dann würden wir immer einen neuen Thread aufbauen.
Rich Episcopo
1
Ich glaube, ich bin dem auf den Grund gegangen. Die Lösung von @ MestreLion führt alle xSekunden eine Funktion aus (dh t = 0, t = 1x, t = 2x, t = 3x, ...), wobei auf den Originalplakaten im Beispielcode eine Funktion mit einem Intervall von x Sekunden dazwischen ausgeführt wird. Außerdem hat diese Lösung meines Erachtens einen Fehler, wenn sie intervalkürzer als die functionAusführungszeit ist. In diesem Fall self._timerwird in der startFunktion überschrieben .
Rich Episcopo
Ja, @RichieEpiscopo, der Aufruf von .function()after .start()besteht darin, die Funktion bei t = 0 auszuführen. Und ich denke nicht, dass es ein Problem sein wird, wenn es functionlänger dauert als interval, aber ja, es könnte einige Rennbedingungen im Code geben.
MestreLion
Dies ist der einzige nicht blockierende Weg, den ich bekommen könnte. Vielen Dank.
BackslashN
35

Der einfachere Weg, den ich glaube:

import time

def executeSomething():
    #code here
    time.sleep(60)

while True:
    executeSomething()

Auf diese Weise wird Ihr Code ausgeführt, dann wartet er 60 Sekunden, dann wird er erneut ausgeführt, wartet, wird ausgeführt usw. Keine Notwendigkeit, Dinge zu komplizieren: D.

Itxaka
quelle
Das Schlüsselwort True sollte in Großbuchstaben geschrieben werden
Sean Cain
38
Eigentlich ist das nicht die Antwort: time sleep () kann nur verwendet werden, um nach jeder Ausführung X Sekunden zu warten. Wenn die Ausführung Ihrer Funktion beispielsweise 0,5 Sekunden dauert und Sie time.sleep (1) verwenden, bedeutet dies, dass Ihre Funktion alle 1,5 Sekunden und nicht 1 ausgeführt wird. Sie sollten andere Module und / oder Threads verwenden, um sicherzustellen, dass etwas Y-mal funktioniert in jeder X Sekunde.
kommradHomer
1
@kommradHomer: Dave Rove Antwort zeigt , dass Sie kann verwendet werden time.sleep()alle X Sekunden laufen etwas
JFS
2
Meiner Meinung nach sollte der Code time.sleep()in einer while TrueSchleife aufgerufen werden wie:def executeSomething(): print('10 sec left') ; while True: executeSomething(); time.sleep(10)
Leonard Lepadatu
22
import time, traceback

def every(delay, task):
  next_time = time.time() + delay
  while True:
    time.sleep(max(0, next_time - time.time()))
    try:
      task()
    except Exception:
      traceback.print_exc()
      # in production code you might want to have this instead of course:
      # logger.exception("Problem while executing repetitive task.")
    # skip tasks if we are behind schedule:
    next_time += (time.time() - next_time) // delay * delay + delay

def foo():
  print("foo", time.time())

every(5, foo)

Wenn Sie dies tun möchten, ohne den verbleibenden Code zu blockieren, können Sie ihn verwenden, um ihn in einem eigenen Thread ausführen zu lassen:

import threading
threading.Thread(target=lambda: every(5, foo)).start()

Diese Lösung kombiniert mehrere Funktionen, die in den anderen Lösungen selten zu finden sind:

  • Ausnahmebehandlung: Soweit auf dieser Ebene möglich, werden Ausnahmen ordnungsgemäß behandelt, dh zu Debugging-Zwecken protokolliert, ohne unser Programm abzubrechen.
  • Keine Verkettung: Die übliche kettenartige Implementierung (zum Planen des nächsten Ereignisses), die Sie in vielen Antworten finden, ist insofern spröde threading.Timer, als die Kette beendet wird , wenn innerhalb des Planungsmechanismus ( oder was auch immer) etwas schief geht . Es werden dann keine weiteren Ausführungen stattfinden, selbst wenn der Grund des Problems bereits behoben ist. Eine einfache Schleife und Warten mit einer einfachensleep() ist im Vergleich viel robuster.
  • Keine Abweichung: Meine Lösung verfolgt genau die Zeiten, zu denen sie ausgeführt werden soll. Abhängig von der Ausführungszeit gibt es keine Drift (wie bei vielen anderen Lösungen).
  • Überspringen : Meine Lösung überspringt Aufgaben, wenn eine Ausführung zu lange gedauert hat (z. B. X alle fünf Sekunden ausführen, X jedoch 6 Sekunden). Dies ist das Standard-Cron-Verhalten (und das aus gutem Grund). Viele andere Lösungen führen die Aufgabe dann einfach mehrere Male hintereinander ohne Verzögerung aus. In den meisten Fällen (z. B. Bereinigungsaufgaben) ist dies nicht erwünscht. Wenn es wird gewünscht, verwenden Sie einfach next_time += delaystatt.
Alfe
quelle
2
beste Antwort für nicht treiben.
Sebastian Stark
1
@PirateApp Ich würde dies in einem anderen Thread tun. Sie könnten es im selben Thread tun, aber dann programmieren Sie Ihr eigenes Planungssystem, das für einen Kommentar viel zu komplex ist.
Alfe
1
In Python ist der Zugriff auf Variablen in zwei Threads dank der GIL absolut sicher. Und das bloße Lesen in zwei Threads sollte niemals ein Problem sein (auch nicht in anderen Thread-Umgebungen). Nur das Schreiben von zwei verschiedenen Threads in einem System ohne GIL (z. B. in Java, C ++ usw.) erfordert eine explizite Synchronisierung.
Alfe
1
@ user50473 Ohne weitere Informationen würde ich mich der Aufgabe zunächst von der Thread-Seite nähern. Ein Thread liest die Daten ab und zu und schläft dann, bis es wieder Zeit ist, dies zu tun. Die obige Lösung könnte natürlich dazu verwendet werden. Aber ich könnte mir eine Reihe von Gründen vorstellen, einen anderen Weg zu gehen. Also viel Glück :)
Alfe
1
Der Ruhezustand kann durch Threading ersetzt werden. Warten Sie mit Timeout, um beim Beenden der Anwendung schneller zu reagieren. stackoverflow.com/questions/29082268/…
themadmax
20

Hier ist ein Update des Codes von MestreLion, das ein Drifitieren im Laufe der Zeit vermeidet.

Die RepeatedTimer-Klasse ruft hier die angegebene Funktion alle "Intervall" Sekunden auf, wie vom OP angefordert; Der Zeitplan hängt nicht davon ab, wie lange die Ausführung der Funktion dauert. Ich mag diese Lösung, da sie keine externen Bibliotheksabhängigkeiten hat. Das ist nur reine Python.

import threading 
import time

class RepeatedTimer(object):
  def __init__(self, interval, function, *args, **kwargs):
    self._timer = None
    self.interval = interval
    self.function = function
    self.args = args
    self.kwargs = kwargs
    self.is_running = False
    self.next_call = time.time()
    self.start()

  def _run(self):
    self.is_running = False
    self.start()
    self.function(*self.args, **self.kwargs)

  def start(self):
    if not self.is_running:
      self.next_call += self.interval
      self._timer = threading.Timer(self.next_call - time.time(), self._run)
      self._timer.start()
      self.is_running = True

  def stop(self):
    self._timer.cancel()
    self.is_running = False

Beispielnutzung (kopiert aus der Antwort von MestreLion):

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!
Äraoul
quelle
5

Ich hatte vor einiger Zeit ein ähnliches Problem. Kann sein http://cronus.readthedocs.org helfen könnten?

Für v0.2 funktioniert das folgende Snippet

import cronus.beat as beat

beat.set_rate(2) # 2 Hz
while beat.true():
    # do some time consuming work here
    beat.sleep() # total loop duration would be 0.5 sec
Anay
quelle
4

Der Hauptunterschied zwischen diesem und cron besteht darin, dass eine Ausnahme den Dämon endgültig tötet. Möglicherweise möchten Sie mit einem Ausnahmefänger und einem Logger umbrechen.

Bobince
quelle
4

Eine mögliche Antwort:

import time
t=time.time()

while True:
    if time.time()-t>10:
        #run your task here
        t=time.time()
sks
quelle
1
Dies ist beschäftigt zu warten und daher sehr schlecht.
Alfe
Gute Lösung für jemanden, der einen nicht blockierenden Timer sucht.
Noel
3

Am Ende habe ich das Zeitplanmodul verwendet . Die API ist nett.

import schedule
import time

def job():
    print("I'm working...")

schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every(5).to(10).minutes.do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)
schedule.every().minute.at(":17").do(job)

while True:
    schedule.run_pending()
    time.sleep(1)
Statistiken anhand eines Beispiels lernen
quelle
Es fällt mir schwer, dieses Modul zu verwenden. Ich muss den Hauptthread entsperren. Ich habe die FAQ auf der Dokumentationswebsite des Zeitplans überprüft, aber die bereitgestellte Problemumgehung nicht wirklich verstanden. Weiß jemand, wo ich ein funktionierendes Beispiel finden kann, das den Hauptthread nicht blockiert?
5Tagträume
1

Ich verwende die Tkinter after () -Methode, die das Spiel nicht "stiehlt" (wie das zuvor vorgestellte Sched- Modul), dh andere Dinge können parallel ausgeführt werden:

import Tkinter

def do_something1():
  global n1
  n1 += 1
  if n1 == 6: # (Optional condition)
    print "* do_something1() is done *"; return
  # Do your stuff here
  # ...
  print "do_something1() "+str(n1)
  tk.after(1000, do_something1)

def do_something2(): 
  global n2
  n2 += 1
  if n2 == 6: # (Optional condition)
    print "* do_something2() is done *"; return
  # Do your stuff here
  # ...
  print "do_something2() "+str(n2)
  tk.after(500, do_something2)

tk = Tkinter.Tk(); 
n1 = 0; n2 = 0
do_something1()
do_something2()
tk.mainloop()

do_something1()und do_something2()kann parallel und in jeder Intervallgeschwindigkeit laufen. Hier wird der zweite doppelt so schnell ausgeführt. Beachten Sie auch, dass ich einen einfachen Zähler als Bedingung verwendet habe, um eine der beiden Funktionen zu beenden. Sie können jede andere oder keine beliebige Bedingung verwenden, wenn Sie eine Funktion ausführen möchten, bis das Programm endet (z. B. eine Uhr).

Apostolos
quelle
Seien Sie vorsichtig mit Ihrem Wortlaut: afterErlaubt nicht, dass Dinge parallel laufen. Tkinter ist Single-Threaded und kann jeweils nur eine Aufgabe ausführen. Wenn etwas ausgeführt wird, das von geplant afterist, wird es nicht parallel zum Rest des Codes ausgeführt. Wenn beide do_something1und do_something2gleichzeitig ausgeführt werden sollen, werden sie nacheinander und nicht parallel ausgeführt.
Bryan Oakley
@Apostolos alle Ihre Lösung tut , ist das verwenden tkinter mainloop statt sched mainloop, so dass es genau in der gleichen Art und Weise funktioniert , aber erlaubt tkinter Schnittstellen reagiert fortzusetzen. Wenn Sie tkinter nicht für andere Zwecke verwenden, ändert dies nichts an der Sched-Lösung. Sie können zwei oder mehr geplante Funktionen mit unterschiedlichen Intervallen in der schedLösung verwenden und sie funktioniert genauso wie Ihre.
Nosklo
Nein, es funktioniert nicht auf die gleiche Weise. Ich habe das erklärt. Der eine "sperrt" das Programm (dh er stoppt den Fluss, Sie können nichts anderes tun - nicht einmal eine andere geplante Arbeit starten, wie Sie vorschlagen), bis es beendet ist, und der andere lässt Ihre Hände frei (dh Sie können es tun) andere Dinge, nachdem es begonnen hat. Sie müssen nicht warten, bis es fertig ist. Dies ist ein großer Unterschied. Wenn Sie die von mir vorgestellte Methode ausprobiert hätten, hätten Sie es selbst gesehen. Ich habe Ihre ausprobiert. Warum nicht? versuchen Sie es auch mit mir?
Apostolos
1

Hier ist eine angepasste Version des Codes von MestreLion. Zusätzlich zur ursprünglichen Funktion enthält dieser Code:

1) Fügen Sie das erste Intervall hinzu, mit dem der Timer zu einem bestimmten Zeitpunkt ausgelöst wird (der Anrufer muss das erste Intervall berechnen und übergeben).

2) Löse eine Rennbedingung im Originalcode. Wenn der ursprüngliche Thread im ursprünglichen Code den laufenden Timer nicht abbrechen konnte ("Stoppen Sie den Timer und brechen Sie die Ausführung der Timer-Aktion ab. Dies funktioniert nur, wenn sich der Timer noch in der Wartephase befindet.", Zitiert aus https: // docs.python.org/2/library/threading.html ) läuft der Timer endlos.

class RepeatedTimer(object):
def __init__(self, first_interval, interval, func, *args, **kwargs):
    self.timer      = None
    self.first_interval = first_interval
    self.interval   = interval
    self.func   = func
    self.args       = args
    self.kwargs     = kwargs
    self.running = False
    self.is_started = False

def first_start(self):
    try:
        # no race-condition here because only control thread will call this method
        # if already started will not start again
        if not self.is_started:
            self.is_started = True
            self.timer = Timer(self.first_interval, self.run)
            self.running = True
            self.timer.start()
    except Exception as e:
        log_print(syslog.LOG_ERR, "timer first_start failed %s %s"%(e.message, traceback.format_exc()))
        raise

def run(self):
    # if not stopped start again
    if self.running:
        self.timer = Timer(self.interval, self.run)
        self.timer.start()
    self.func(*self.args, **self.kwargs)

def stop(self):
    # cancel current timer in case failed it's still OK
    # if already stopped doesn't matter to stop again
    if self.timer:
        self.timer.cancel()
    self.running = False
dproc
quelle
1

Dies scheint viel einfacher zu sein als eine akzeptierte Lösung - hat sie Mängel, die ich nicht in Betracht ziehe? Kam hierher auf der Suche nach einer absolut einfachen Kopie Pasta und war enttäuscht.

import threading, time

def print_every_n_seconds(n=2):
    while True:
        print(time.ctime())
        time.sleep(n)

thread = threading.Thread(target=print_every_n_seconds, daemon=True)
thread.start()

Welche asynchron ausgegeben.

#Tue Oct 16 17:29:40 2018
#Tue Oct 16 17:29:42 2018
#Tue Oct 16 17:29:44 2018

Es hat eine Drift in dem Sinne, dass, wenn die ausgeführte Aufgabe eine beträchtliche Zeit in Anspruch nimmt, das Intervall 2 Sekunden + Aufgabenzeit beträgt. Wenn Sie also eine genaue Planung benötigen, ist dies nichts für Sie.

Beachten Sie, dass das daemon=TrueFlag bedeutet, dass dieser Thread das Herunterfahren der App nicht blockiert. Hatte zum Beispiel ein Problem, wo pytestes unbegrenzt hängen würde, nachdem Tests ausgeführt wurden, die darauf warteten, dass dieser Kopf aufhörte.

Adam Hughes
quelle
Nein, es druckt nur die erste Datums- und Uhrzeitangabe und stoppt dann ...
Alex Poca
Sind Sie sicher - ich habe nur kopiert und in Terminal eingefügt. Es kehrt sofort zurück, aber der Ausdruck wird für mich im Hintergrund fortgesetzt.
Adam Hughes
Es sieht so aus, als würde mir hier etwas fehlen. Ich habe den Code in test.py kopiert / eingefügt und mit python test.py ausgeführt . Mit Python2.7 muss ich daemon = True entfernen , das nicht erkannt wird, und ich lese mehrere Ausdrucke. Mit Python3.8 stoppt es nach dem ersten Druck und nach seinem Ende ist kein Prozess mehr aktiv. Daemon entfernen = True Ich habe mehrere Ausdrucke gelesen ...
Alex Poca vor
hmm seltsam - ich bin auf Python 3.6.10, weiß aber nicht warum das wichtig ist
Adam Hughes vor
Nochmals: Python3.4.2 (Debian GNU / Linux 8 (jessie)) musste daemon = True entfernen, damit es mehrfach gedruckt werden kann. Mit Daemon erhalte ich einen Syntaxfehler. Die vorherigen Tests mit Python2.7 und 3.8 waren unter Ubuntu 19.10. Könnte es sein, dass der Daemon je nach Betriebssystem unterschiedlich behandelt wird?
Alex Poca
0

Ich benutze dies, um 60 Ereignisse pro Stunde zu verursachen, wobei die meisten Ereignisse nach der ganzen Minute in der gleichen Anzahl von Sekunden auftreten:

import math
import time
import random

TICK = 60 # one minute tick size
TICK_TIMING = 59 # execute on 59th second of the tick
TICK_MINIMUM = 30 # minimum catch up tick size when lagging

def set_timing():

    now = time.time()
    elapsed = now - info['begin']
    minutes = math.floor(elapsed/TICK)
    tick_elapsed = now - info['completion_time']
    if (info['tick']+1) > minutes:
        wait = max(0,(TICK_TIMING-(time.time() % TICK)))
        print ('standard wait: %.2f' % wait)
        time.sleep(wait)
    elif tick_elapsed < TICK_MINIMUM:
        wait = TICK_MINIMUM-tick_elapsed
        print ('minimum wait: %.2f' % wait)
        time.sleep(wait)
    else:
        print ('skip set_timing(); no wait')
    drift = ((time.time() - info['begin']) - info['tick']*TICK -
        TICK_TIMING + info['begin']%TICK)
    print ('drift: %.6f' % drift)

info['tick'] = 0
info['begin'] = time.time()
info['completion_time'] = info['begin'] - TICK

while 1:

    set_timing()

    print('hello world')

    #random real world event
    time.sleep(random.random()*TICK_MINIMUM)

    info['tick'] += 1
    info['completion_time'] = time.time()

Abhängig von den tatsächlichen Bedingungen können Zecken mit einer Länge von:

60,60,62,58,60,60,120,30,30,60,60,60,60,60...etc.

Aber nach 60 Minuten haben Sie 60 Zecken. und die meisten von ihnen treten mit dem richtigen Versatz zu der Minute auf, die Sie bevorzugen.

Auf meinem System erhalte ich eine typische Drift von <1/20 Sekunde, bis Korrekturbedarf besteht.

Der Vorteil dieser Methode ist die Auflösung der Taktdrift; Dies kann zu Problemen führen, wenn Sie beispielsweise einen Artikel pro Tick anhängen und 60 Artikel pro Stunde anhängen. Wenn die Drift nicht berücksichtigt wird, können sekundäre Anzeigen wie gleitende Durchschnitte dazu führen, dass Daten zu tief in der Vergangenheit liegen, was zu einer fehlerhaften Ausgabe führt.

Präsenz
quelle
0

Beispiel: Aktuelle Ortszeit anzeigen

import datetime
import glib
import logger

def get_local_time():
    current_time = datetime.datetime.now().strftime("%H:%M")
    logger.info("get_local_time(): %s",current_time)
    return str(current_time)

def display_local_time():
    logger.info("Current time is: %s", get_local_time())
    return True

# call every minute
glib.timeout_add(60*1000, display_local_time)
Rise.riyo
quelle
0
    ''' tracking number of times it prints'''
import threading

global timeInterval
count=0
def printit():
  threading.Timer(timeInterval, printit).start()
  print( "Hello, World!")
  global count
  count=count+1
  print(count)
printit

if __name__ == "__main__":
    timeInterval= int(input('Enter Time in Seconds:'))
    printit()
RaviGupta
quelle
Auf der Grundlage von Benutzereingaben wird diese Methode in jedem Zeitintervall wiederholt.
RaviGupta
0

Hier ist eine andere Lösung ohne zusätzliche Bibliotheken.

def delay_until(condition_fn, interval_in_sec, timeout_in_sec):
    """Delay using a boolean callable function.

    `condition_fn` is invoked every `interval_in_sec` until `timeout_in_sec`.
    It can break early if condition is met.

    Args:
        condition_fn     - a callable boolean function
        interval_in_sec  - wait time between calling `condition_fn`
        timeout_in_sec   - maximum time to run

    Returns: None
    """
    start = last_call = time.time()
    while time.time() - start < timeout_in_sec:
        if (time.time() - last_call) > interval_in_sec:
            if condition_fn() is True:
                break
            last_call = time.time()
Dat Nguyen
quelle