Was ist der schnellste Weg, um 100.000 HTTP-Anfragen in Python zu senden?

286

Ich öffne eine Datei mit 100.000 URLs. Ich muss eine HTTP-Anfrage an jede URL senden und den Statuscode drucken. Ich verwende Python 2.6 und habe mir bisher die vielen verwirrenden Möglichkeiten angesehen, wie Python Threading / Parallelität implementiert. Ich habe mir sogar die Python- Concurrence- Bibliothek angesehen, kann aber nicht herausfinden, wie man dieses Programm richtig schreibt. Ist jemand auf ein ähnliches Problem gestoßen? Ich denke, im Allgemeinen muss ich wissen, wie man Tausende von Aufgaben in Python so schnell wie möglich ausführt - ich nehme an, das bedeutet "gleichzeitig".

IgorGanapolsky
quelle
47
Stellen Sie sicher, dass Sie nur eine HEAD-Anfrage ausführen (damit Sie nicht das gesamte Dokument herunterladen). Siehe: stackoverflow.com/questions/107405/…
Tarnay Kálmán
5
Hervorragender Punkt, Kalmi. Wenn Igor nur den Status der Anfrage wünscht, werden diese 100.000 Anfragen viel, viel, viel schneller gehen. Viel schneller.
Adam Crossland
1
Sie brauchen dafür keine Threads; Am effizientesten ist es wahrscheinlich, eine asynchrone Bibliothek wie Twisted zu verwenden.
Jemfinch
3
Hier sind gevent, verdrehte und asynchrone Codebeispiele (getestet auf 1000000 Anfragen)
jfs
4
@ TarnayKálmán Es ist möglich requests.getund requests.head(dh eine Seitenanfrage gegen eine Kopfanfrage ), verschiedene Statuscodes zurückzugeben, daher ist dies nicht der beste Rat
AlexG

Antworten:

201

Verdrehte Lösung:

from urlparse import urlparse
from threading import Thread
import httplib, sys
from Queue import Queue

concurrent = 200

def doWork():
    while True:
        url = q.get()
        status, url = getStatus(url)
        doSomethingWithResult(status, url)
        q.task_done()

def getStatus(ourl):
    try:
        url = urlparse(ourl)
        conn = httplib.HTTPConnection(url.netloc)   
        conn.request("HEAD", url.path)
        res = conn.getresponse()
        return res.status, ourl
    except:
        return "error", ourl

def doSomethingWithResult(status, url):
    print status, url

q = Queue(concurrent * 2)
for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True
    t.start()
try:
    for url in open('urllist.txt'):
        q.put(url.strip())
    q.join()
except KeyboardInterrupt:
    sys.exit(1)

Dieser ist etwas schneller als die verdrehte Lösung und verbraucht weniger CPU.

Tarnay Kálmán
quelle
10
@ Kalmi, warum stellst du Queue ein concurrent*2?
Marcel Wilson
8
Vergessen Sie nicht, die Verbindung zu schließenconn.close() . Wenn Sie zu viele http-Verbindungen öffnen, wird Ihr Skript möglicherweise irgendwann angehalten und Speicherplatz beansprucht.
Aamir Adnan
4
@hyh, das QueueModul wurde queuein Python 3 umbenannt . Dies ist Python 2-Code.
Tarnay Kálmán
3
Wie viel schneller können Sie gehen, wenn Sie jedes Mal mit dem gleichen Server sprechen möchten, indem Sie die Verbindung beibehalten? Kann dies sogar über Threads hinweg oder mit einer dauerhaften Verbindung pro Thread erfolgen?
mdurant
2
@mptevsion, wenn Sie CPython verwenden, können Sie (zum Beispiel) einfach "Druckstatus, URL" durch "my_global_list.append ((Status, URL))" ersetzen. (Die meisten Operationen an) Listen sind in CPython (und einigen anderen Python-Implementierungen) aufgrund der GIL implizit threadsicher, daher ist dies sicher.
Tarnay Kálmán
54

Eine Lösung mit asynchroner Tornado- Netzwerkbibliothek

from tornado import ioloop, httpclient

i = 0

def handle_request(response):
    print(response.code)
    global i
    i -= 1
    if i == 0:
        ioloop.IOLoop.instance().stop()

http_client = httpclient.AsyncHTTPClient()
for url in open('urls.txt'):
    i += 1
    http_client.fetch(url.strip(), handle_request, method='HEAD')
ioloop.IOLoop.instance().start()
mher
quelle
7
Dieser Code verwendet nicht blockierende Netzwerk-E / A und unterliegt keinen Einschränkungen. Es kann auf Zehntausende offener Verbindungen skaliert werden. Es wird in einem einzelnen Thread ausgeführt, ist jedoch viel schneller als jede Threading-Lösung. Kasse nicht blockierende E / A en.wikipedia.org/wiki/Asynchronous_I/O
mher
1
Können Sie erklären, was hier mit der globalen Variablen i passiert? Eine Art Fehlerprüfung?
LittleBobbyTables
4
Es ist ein Zähler, um zu bestimmen, wann der `` ioloop` verlassen werden soll - also wenn Sie fertig sind.
Michael Dorner
1
@ AndrewScottEvans es wurde angenommen, dass Sie Python 2.7 und Proxys verwenden
Dejell
5
@ Guy Avraham Viel Glück beim Erhalten von Hilfe bei Ihrem ddos-Plan.
Walter
50

Die Dinge haben sich seit 2010, als dies veröffentlicht wurde, ziemlich verändert und ich habe nicht alle anderen Antworten ausprobiert, aber ich habe einige ausprobiert, und ich fand, dass dies mit Python3.6 am besten für mich funktioniert.

Ich konnte ungefähr 150 eindeutige Domänen pro Sekunde abrufen, die unter AWS ausgeführt wurden.

import pandas as pd
import concurrent.futures
import requests
import time

out = []
CONNECTIONS = 100
TIMEOUT = 5

tlds = open('../data/sample_1k.txt').read().splitlines()
urls = ['http://{}'.format(x) for x in tlds[1:]]

def load_url(url, timeout):
    ans = requests.head(url, timeout=timeout)
    return ans.status_code

with concurrent.futures.ThreadPoolExecutor(max_workers=CONNECTIONS) as executor:
    future_to_url = (executor.submit(load_url, url, TIMEOUT) for url in urls)
    time1 = time.time()
    for future in concurrent.futures.as_completed(future_to_url):
        try:
            data = future.result()
        except Exception as exc:
            data = str(type(exc))
        finally:
            out.append(data)

            print(str(len(out)),end="\r")

    time2 = time.time()

print(f'Took {time2-time1:.2f} s')
print(pd.Series(out).value_counts())
Glen Thompson
quelle
1
Ich frage nur, weil ich es nicht weiß, aber könnte dieses Futures-Zeug durch async / await ersetzt werden?
TankorSmash
1
Es könnte, aber ich habe festgestellt, dass das oben genannte besser funktioniert. Sie könnten aiohttp verwenden, aber es ist nicht Teil der Standardbibliothek und ändert sich ziemlich stark. Es funktioniert, aber ich habe es einfach nicht so gut gefunden. Ich bekomme höhere Fehlerraten, wenn ich es benutze, und für mein Leben kann ich es nicht so gut zum Laufen bringen wie gleichzeitige Futures, obwohl es theoretisch besser zu funktionieren scheint, siehe: stackoverflow.com/questions/45800857/… Wenn es gut funktioniert, poste bitte deine Antwort, damit ich es testen kann.
Glen Thompson
1
Dies ist ein Trottel, aber ich denke, es ist viel sauberer, time1 = time.time()oben in der for-Schleife und time2 = time.time()direkt nach der for-Schleife zu platzieren.
Matt M.
Ich habe dein Snippet getestet, irgendwie wird es zweimal ausgeführt. Mache ich etwas falsch? Oder soll es zweimal laufen? Wenn es der letztere Fall ist, können Sie mir auch helfen zu verstehen, wie es zweimal ausgelöst wird?
Ronnie
1
Es sollte nicht zweimal laufen. Ich bin mir nicht sicher, warum du das siehst.
Glen Thompson
40

Themen sind hier absolut nicht die Antwort. Sie bieten sowohl Prozess- als auch Kernel-Engpässe sowie Durchsatzbeschränkungen, die nicht akzeptabel sind, wenn das Gesamtziel "der schnellste Weg" ist.

Ein bisschen twistedund sein asynchroner HTTPClient würden Ihnen viel bessere Ergebnisse liefern.

Eisenfrosch
quelle
ironfroggy: Ich neige mich zu deinen Gefühlen. Ich habe versucht, meine Lösung mit Threads und Warteschlangen (für automatische Mutexe) zu implementieren. Können Sie sich vorstellen, wie lange es dauert, eine Warteschlange mit 100.000 Dingen zu füllen? Ich spiele immer noch mit verschiedenen Optionen und Vorschlägen von allen in diesem Thread herum, und vielleicht ist Twisted eine gute Lösung.
Igor Ganapolsky
2
Sie können vermeiden, eine Warteschlange mit 100.000 Dingen zu füllen. Verarbeiten Sie einfach Elemente einzeln aus Ihrer Eingabe und starten Sie dann einen Thread, um die Anforderung zu verarbeiten, die den einzelnen Elementen entspricht. (Wie unten beschrieben, verwenden Sie einen Launcher-Thread, um die HTTP-Anforderungsthreads zu starten, wenn Ihre Threadanzahl unter einem bestimmten Schwellenwert liegt. Lassen Sie die Threads die Ergebnisse in eine Diktatzuordnungs-URL zur Antwort schreiben oder Tupel an eine Liste anhängen.)
Erik Garrison
ironfroggy: Außerdem bin ich gespannt, welche Engpässe Sie bei der Verwendung von Python-Threads festgestellt haben. Und wie interagieren Python-Threads mit dem Betriebssystemkernel?
Erik Garrison
Stellen Sie sicher, dass Sie den Epollreaktor installieren. Andernfalls verwenden Sie select / poll und es ist sehr langsam. Wenn Sie tatsächlich versuchen möchten, 100.000 Verbindungen gleichzeitig zu öffnen (vorausgesetzt, Ihr Programm ist so geschrieben und die URLs befinden sich auf verschiedenen Servern), müssen Sie Ihr Betriebssystem so einstellen, dass es Ihnen nicht ausgeht von Dateideskriptoren, kurzlebigen Ports usw. (es ist wahrscheinlich einfacher, nur sicherzustellen, dass Sie nicht mehr als beispielsweise 10.000 ausstehende Verbindungen gleichzeitig haben).
Mark Nottingham
erikg: du hast eine großartige idee empfohlen. Das beste Ergebnis, das ich mit 200 Fäden erzielen konnte, war jedoch ca. 6 Minuten. Ich bin mir sicher, dass es Möglichkeiten gibt, dies in kürzerer Zeit zu erreichen ... Mark N: Wenn Twisted der Weg ist, für den ich mich entscheide, dann ist der Epollreaktor sicherlich nützlich. Wenn mein Skript jedoch von mehreren Computern ausgeführt wird, muss dann nicht Twisted auf JEDEM Computer installiert werden? Ich weiß nicht, ob ich meinen Chef davon überzeugen kann, diesen Weg zu gehen ...
IgorGanapolsky
20

Ich weiß, dass dies eine alte Frage ist, aber in Python 3.7 können Sie dies mit asynciound tun aiohttp.

import asyncio
import aiohttp
from aiohttp import ClientSession, ClientConnectorError

async def fetch_html(url: str, session: ClientSession, **kwargs) -> tuple:
    try:
        resp = await session.request(method="GET", url=url, **kwargs)
    except ClientConnectorError:
        return (url, 404)
    return (url, resp.status)

async def make_requests(urls: set, **kwargs) -> None:
    async with ClientSession() as session:
        tasks = []
        for url in urls:
            tasks.append(
                fetch_html(url=url, session=session, **kwargs)
            )
        results = await asyncio.gather(*tasks)

    for result in results:
        print(f'{result[1]} - {str(result[0])}')

if __name__ == "__main__":
    import pathlib
    import sys

    assert sys.version_info >= (3, 7), "Script requires Python 3.7+."
    here = pathlib.Path(__file__).parent

    with open(here.joinpath("urls.txt")) as infile:
        urls = set(map(str.strip, infile))

    asyncio.run(make_requests(urls=urls))

Sie können mehr darüber lesen und hier ein Beispiel sehen .

Marius Stănescu
quelle
Ist dies ähnlich wie C # async / await und Kotlin Coroutines?
Igor Ganapolsky
@IgorGanapolsky, ja, es ist C # async / await sehr ähnlich. Ich kenne Kotlin Coroutines nicht.
Marius Stănescu
@sandyp, ich bin nicht sicher, ob es funktioniert, aber wenn Sie es versuchen möchten, müssen Sie den UnixConnector für aiohttp verwenden. Lesen Sie hier mehr: docs.aiohttp.org/en/stable/client_reference.html#connectors .
Marius Stănescu
Danke @ MariusStănescu. Genau das habe ich benutzt.
Sandyp
+1 für die Anzeige von asyncio.gather (* Aufgaben). Hier ist ein solcher Ausschnitt, den ich verwendet habe: urls= [fetch(construct_fetch_url(u),idx) for idx, u in enumerate(some_URI_list)] results = await asyncio.gather(*urls)
Ashwini Kumar
19

Verwenden Sie Grequests , es ist eine Kombination aus Anforderungen + Gevent-Modul.

Mit GRequests können Sie Requests with Gevent verwenden, um auf einfache Weise asynchrone HTTP-Requests zu erstellen.

Die Verwendung ist einfach:

import grequests

urls = [
   'http://www.heroku.com',
   'http://tablib.org',
   'http://httpbin.org',
   'http://python-requests.org',
   'http://kennethreitz.com'
]

Erstellen Sie eine Reihe nicht gesendeter Anfragen:

>>> rs = (grequests.get(u) for u in urls)

Senden Sie sie alle gleichzeitig:

>>> grequests.map(rs)
[<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>]
Akshay Pratap Singh
quelle
7
gevent unterstützt jetzt Python 3
Benjamin Toueg
14
Grequests sind nicht Teil normaler Anfragen und scheinen weitgehend unbeaufsichtigt zu sein
Thom
8

Ein guter Ansatz zur Lösung dieses Problems besteht darin, zuerst den Code zu schreiben, der erforderlich ist, um ein Ergebnis zu erhalten, und dann Threading-Code zu integrieren, um die Anwendung zu parallelisieren.

In einer perfekten Welt würde dies einfach bedeuten, gleichzeitig 100.000 Threads zu starten, die ihre Ergebnisse zur späteren Verarbeitung in ein Wörterbuch oder eine Liste ausgeben. In der Praxis ist die Anzahl der parallelen HTTP-Anforderungen, die Sie auf diese Weise ausgeben können, jedoch begrenzt. Lokal haben Sie Grenzen, wie viele Sockets Sie gleichzeitig öffnen können und wie viele Ausführungsthreads Ihr Python-Interpreter zulässt. Aus der Ferne kann die Anzahl der gleichzeitigen Verbindungen begrenzt sein, wenn alle Anforderungen an einen oder mehrere Server gerichtet sind. Diese Einschränkungen erfordern wahrscheinlich, dass Sie das Skript so schreiben, dass jeweils nur ein kleiner Teil der URLs abgefragt wird (100, wie in einem anderen Poster erwähnt, sind wahrscheinlich eine anständige Größe des Thread-Pools, obwohl Sie dies möglicherweise feststellen kann erfolgreich viele weitere bereitstellen).

Sie können diesem Entwurfsmuster folgen, um das obige Problem zu beheben:

  1. Starten Sie einen Thread, der neue Anforderungsthreads startet, bis die Anzahl der aktuell ausgeführten Threads (Sie können sie über threading.active_count () oder durch Verschieben der Threadobjekte in eine Datenstruktur verfolgen)> = Ihre maximale Anzahl gleichzeitiger Anforderungen ist (z. B. 100). , schläft dann für eine kurze Auszeit. Dieser Thread sollte beendet werden, wenn keine URLs mehr verarbeitet werden müssen. Auf diese Weise wacht der Thread weiter auf, startet neue Threads und schläft, bis Sie fertig sind.
  2. Lassen Sie die Anforderungsthreads ihre Ergebnisse in einer Datenstruktur speichern, um sie später abzurufen und auszugeben. Wenn die Struktur, in der Sie die Ergebnisse speichern, a listoder dictin CPython ist, können Sie eindeutige Elemente ohne Sperren sicher an Ihre Threads anhängen oder einfügen. Wenn Sie jedoch in eine Datei schreiben oder eine komplexere threadübergreifende Dateninteraktion benötigen , sollten Sie a verwenden gegenseitige Ausschlusssperre zum Schutz dieses Staates vor Korruption .

Ich würde vorschlagen, dass Sie das Threading verwenden Modul verwenden. Sie können es verwenden, um laufende Threads zu starten und zu verfolgen. Die Threading-Unterstützung von Python ist kahl, aber die Beschreibung Ihres Problems legt nahe, dass sie für Ihre Anforderungen völlig ausreichend ist.

Wenn Sie würde schließlich wie eine ziemlich einfache Anwendung eines parallelen Netzwerk - Anwendung in Python geschrieben sehen, die ssh.py . Es ist eine kleine Bibliothek, die Python-Threading verwendet, um viele SSH-Verbindungen zu parallelisieren. Das Design ist nah genug an Ihren Anforderungen, sodass Sie es möglicherweise als gute Ressource ansehen.

Erik Garrison
quelle
1
erikg: wäre es vernünftig, eine Warteschlange in deine Gleichung zu werfen (für das Sperren des gegenseitigen Ausschlusses)? Ich vermute, dass Pythons GIL nicht darauf ausgerichtet ist, mit Tausenden von Threads zu spielen.
IgorGanapolsky
Warum benötigen Sie eine Sperre zum gegenseitigen Ausschluss, um die Erzeugung zu vieler Threads zu verhindern? Ich vermute, ich verstehe den Begriff falsch. Sie können laufende Threads in einer Thread-Warteschlange verfolgen, sie nach Abschluss entfernen und bis zu diesem Thread-Limit weitere hinzufügen. In einem einfachen Fall wie dem fraglichen können Sie jedoch auch nur die Anzahl der aktiven Threads im aktuellen Python-Prozess beobachten, warten, bis ein Schwellenwert unterschritten wird, und weitere Threads wie beschrieben bis zum Schwellenwert starten. Ich denke, Sie könnten dies als implizite Sperre betrachten, aber afaik sind keine expliziten Sperren erforderlich.
Erik Garrison
erikg: teilen sich nicht mehrere threads den status? Auf Seite 305 in O'Reillys Buch "Python für Unix- und Linux-Systemadministration" heißt es: "... Die Verwendung von Threading ohne Warteschlangen macht es komplexer, als viele Menschen realistisch handhaben können. Es ist eine viel bessere Idee, immer die Warteschlange zu verwenden Modul, wenn Sie feststellen, dass Sie Threads verwenden müssen. Warum? Weil das Warteschlangenmodul auch die Notwendigkeit verringert, Daten explizit mit Mutexen zu schützen, da die Warteschlange selbst bereits intern durch einen Mutex geschützt ist. " Auch hier begrüße ich Ihren Standpunkt dazu.
IgorGanapolsky
Igor: Sie haben absolut Recht, dass Sie ein Schloss verwenden sollten. Ich habe den Beitrag bearbeitet, um dies widerzuspiegeln. Die praktische Erfahrung mit Python legt jedoch nahe, dass Sie keine Datenstrukturen sperren müssen, die Sie atomar aus Ihren Threads ändern, z. B. durch list.append oder durch Hinzufügen eines Hash-Schlüssels. Ich glaube, der Grund ist die GIL, die Operationen wie list.append mit einem gewissen Grad an Atomizität bereitstellt. Ich führe derzeit einen Test durch, um dies zu überprüfen (verwenden Sie 10.000 Threads, um die Nummern 0-9999 an eine Liste anzuhängen, und überprüfen Sie, ob alle Anhänge funktioniert haben). Nach fast 100 Iterationen ist der Test nicht fehlgeschlagen.
Erik Garrison
Igor: Mir wurde eine weitere Frage zu diesem Thema gestellt: stackoverflow.com/questions/2740435/…
Erik Garrison
7

Wenn Sie die bestmögliche Leistung erzielen möchten, sollten Sie die Verwendung von asynchronen E / A anstelle von Threads in Betracht ziehen. Der mit Tausenden von Betriebssystem-Threads verbundene Overhead ist nicht trivial, und die Kontextumschaltung innerhalb des Python-Interpreters fügt noch mehr hinzu. Das Threading wird sicherlich die Arbeit erledigen, aber ich vermute, dass eine asynchrone Route eine bessere Gesamtleistung bietet.

Insbesondere würde ich den asynchronen Webclient in der Twisted-Bibliothek ( http://www.twistedmatrix.com ) vorschlagen . Es hat eine zugegebenermaßen steile Lernkurve, ist aber recht einfach zu bedienen, wenn Sie Twisted's asynchronen Programmierstil gut im Griff haben.

Eine Anleitung zur asynchronen Webclient-API von Twisted finden Sie unter:

http://twistedmatrix.com/documents/current/web/howto/client.html

Rakis
quelle
Rakis: Ich beschäftige mich derzeit mit asynchronen und nicht blockierenden E / A. Ich muss es besser lernen, bevor ich es implementiere. Ein Kommentar, den ich zu Ihrem Beitrag machen möchte, ist, dass es (zumindest unter meiner Linux-Distribution) unmöglich ist, "Tausende von Betriebssystem-Threads" zu erzeugen. Es gibt eine maximale Anzahl von Threads, die Sie mit Python erzeugen können, bevor das Programm unterbrochen wird. In meinem Fall (unter CentOS 5) beträgt die maximale Anzahl von Threads 303.
IgorGanapolsky
Das ist gut zu wissen. Ich habe noch nie versucht, mehr als eine Handvoll auf einmal in Python zu spawnen, aber ich hätte erwartet, dass ich mehr als das erstellen kann, bevor es bombardiert wird.
Rakis
6

Eine Lösung:

from twisted.internet import reactor, threads
from urlparse import urlparse
import httplib
import itertools


concurrent = 200
finished=itertools.count(1)
reactor.suggestThreadPoolSize(concurrent)

def getStatus(ourl):
    url = urlparse(ourl)
    conn = httplib.HTTPConnection(url.netloc)   
    conn.request("HEAD", url.path)
    res = conn.getresponse()
    return res.status

def processResponse(response,url):
    print response, url
    processedOne()

def processError(error,url):
    print "error", url#, error
    processedOne()

def processedOne():
    if finished.next()==added:
        reactor.stop()

def addTask(url):
    req = threads.deferToThread(getStatus, url)
    req.addCallback(processResponse, url)
    req.addErrback(processError, url)   

added=0
for url in open('urllist.txt'):
    added+=1
    addTask(url.strip())

try:
    reactor.run()
except KeyboardInterrupt:
    reactor.stop()

Testzeit:

[kalmi@ubi1:~] wc -l urllist.txt
10000 urllist.txt
[kalmi@ubi1:~] time python f.py > /dev/null 

real    1m10.682s
user    0m16.020s
sys 0m10.330s
[kalmi@ubi1:~] head -n 6 urllist.txt
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
[kalmi@ubi1:~] python f.py | head -n 6
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu

Pingtime:

bix.hu is ~10 ms away from me
godaddy.com: ~170 ms
google.com: ~30 ms
Tarnay Kálmán
quelle
6
Wenn Sie Twisted als Threadpool verwenden, werden die meisten Vorteile, die Sie daraus ziehen können, ignoriert. Sie sollten stattdessen den asynchronen HTTP-Client verwenden.
Jean-Paul Calderone
1

Die Verwendung eines Thread-Pools ist eine gute Option und macht dies ziemlich einfach. Leider verfügt Python nicht über eine Standardbibliothek, die Thread-Pools extrem einfach macht. Aber hier ist eine anständige Bibliothek, die Ihnen den Einstieg erleichtern soll: http://www.chrisarndt.de/projects/threadpool/

Codebeispiel von ihrer Site:

pool = ThreadPool(poolsize)
requests = makeRequests(some_callable, list_of_args, callback)
[pool.putRequest(req) for req in requests]
pool.wait()

Hoffe das hilft.

Kevin Wiskia
quelle
Ich schlage vor, dass Sie q_size für ThreadPool wie folgt angeben: ThreadPool (poolsize, q_size = 1000) Damit Sie nicht 100000 WorkRequest-Objekte im Speicher haben. „Wenn q_size> 0 die Größe der Arbeitsanforderungswarteschlange ist begrenzt , und die Thread - Pool - Blöcke , wenn die Warteschlange voll ist und es versucht , in sie (siehe mehr Arbeitsanforderungen zu stellen putRequestMethode), es sei denn , Sie auch einen positiven verwenden timeoutWert putRequest.“
Tarnay Kálmán
Bisher versuche ich, die Threadpool-Lösung zu implementieren - wie vorgeschlagen. Ich verstehe jedoch die Parameterliste in der Funktion makeRequests nicht. Was ist ein Rückruf, eine Liste von Rückrufen? Vielleicht würde es helfen, wenn ich ein echtes Code-Snippet sehen würde. Ich bin überrascht, dass der Autor dieser Bibliothek KEINE Beispiele veröffentlicht hat.
IgorGanapolsky
some_callable ist Ihre Funktion, in der Ihre gesamte Arbeit erledigt wird (Verbindung zum http-Server herstellen). list_of_args sind Argumente, die an some_callabe übergeben werden. Rückruf ist eine Funktion, die aufgerufen wird, wenn der Arbeitsthread fertig ist. Es werden zwei Argumente benötigt, das Worker-Objekt (Sie müssen sich nicht wirklich damit befassen) und die Ergebnisse, die der Worker abgerufen hat.
Kevin Wiskia
1

Erstellen epoll Objekt,
öffnen viele Client - TCP - Sockets,
passen ihre Sendepuffer ein bisschen mehr als Request - Header zu sein,
einen Request - Header senden - es sollte sofort sein, nur in einen Puffer platzieren, registriert Buchse in dem epollObjekt,
tut .pollauf epollObect,
zuerst lesen 3 Bytes von jedem Socket aus .poll,
schreiben Sie sie sys.stdoutgefolgt von \n(nicht leeren), schließen Sie den Client-Socket.

Begrenzen Sie die Anzahl der gleichzeitig geöffneten Sockets. Behandeln Sie Fehler beim Erstellen von Sockets. Erstellen Sie einen neuen Socket nur, wenn ein anderer geschlossen ist.
Passen Sie die Betriebssystemgrenzen an.
Versuchen Sie, einige (nicht viele) Prozesse zu bearbeiten: Dies kann dazu beitragen, die CPU ein wenig effektiver zu nutzen.

George Sovetov
quelle
@IgorGanapolsky Muss sein. Ich wäre sonst überrascht. Aber es braucht sicherlich Experimente.
George Sovetov
0

In Ihrem Fall reicht das Threading wahrscheinlich aus, da Sie wahrscheinlich die meiste Zeit damit verbringen, auf eine Antwort zu warten. Es gibt hilfreiche Module wie Queue in der Standardbibliothek, die möglicherweise helfen.

Ähnliches habe ich beim parallelen Herunterladen von Dateien gemacht, und es war gut genug für mich, aber es war nicht in der Größenordnung, von der Sie sprechen.

Wenn Ihre Aufgabe stärker an die CPU gebunden war, sollten Sie sich das Multiprozessor- Modul ansehen, mit dem Sie mehr CPUs / Kerne / Threads verwenden können (mehr Prozesse, die sich nicht gegenseitig blockieren, da die Sperrung pro Prozess erfolgt).

Mattias Nilsson
quelle
Das einzige, was ich erwähnen möchte, ist, dass das Laichen mehrerer Prozesse teurer sein kann als das Laichen mehrerer Threads. Es gibt auch keinen eindeutigen Leistungsgewinn beim Senden von 100.000 HTTP-Anforderungen mit mehreren Prozessen im Vergleich zu mehreren Threads.
IgorGanapolsky
0

Erwägen Sie die Verwendung von Windmill , obwohl Windmill wahrscheinlich nicht so viele Threads ausführen kann.

Sie können dies mit einem handgerollten Python-Skript auf 5 Computern tun, von denen jeder über die Ports 40000-60000 eine ausgehende Verbindung herstellt und 100.000 Portverbindungen öffnet.

Es kann auch hilfreich sein, einen Beispieltest mit einer QA-App wie OpenSTA durchzuführen, um eine Vorstellung davon zu bekommen, wie viel jeder Server verarbeiten kann.

Versuchen Sie auch, nur einfaches Perl mit der LWP :: ConnCache-Klasse zu verwenden. Auf diese Weise erhalten Sie wahrscheinlich mehr Leistung (mehr Verbindungen).

Djangofan
quelle
0

Dieser verdrehte asynchrone Webclient geht ziemlich schnell.

#!/usr/bin/python2.7

from twisted.internet import reactor
from twisted.internet.defer import Deferred, DeferredList, DeferredLock
from twisted.internet.defer import inlineCallbacks
from twisted.web.client import Agent, HTTPConnectionPool
from twisted.web.http_headers import Headers
from pprint import pprint
from collections import defaultdict
from urlparse import urlparse
from random import randrange
import fileinput

pool = HTTPConnectionPool(reactor)
pool.maxPersistentPerHost = 16
agent = Agent(reactor, pool)
locks = defaultdict(DeferredLock)
codes = {}

def getLock(url, simultaneous = 1):
    return locks[urlparse(url).netloc, randrange(simultaneous)]

@inlineCallbacks
def getMapping(url):
    # Limit ourselves to 4 simultaneous connections per host
    # Tweak this number, but it should be no larger than pool.maxPersistentPerHost 
    lock = getLock(url,4)
    yield lock.acquire()
    try:
        resp = yield agent.request('HEAD', url)
        codes[url] = resp.code
    except Exception as e:
        codes[url] = str(e)
    finally:
        lock.release()


dl = DeferredList(getMapping(url.strip()) for url in fileinput.input())
dl.addCallback(lambda _: reactor.stop())

reactor.run()
pprint(codes)
Robᵩ
quelle
0

Ich fand, dass die Verwendung des tornadoPakets der schnellste und einfachste Weg ist, dies zu erreichen:

from tornado import ioloop, httpclient, gen


def main(urls):
    """
    Asynchronously download the HTML contents of a list of URLs.
    :param urls: A list of URLs to download.
    :return: List of response objects, one for each URL.
    """

    @gen.coroutine
    def fetch_and_handle():
        httpclient.AsyncHTTPClient.configure(None, defaults=dict(user_agent='MyUserAgent'))
        http_client = httpclient.AsyncHTTPClient()
        waiter = gen.WaitIterator(*[http_client.fetch(url, raise_error=False, method='HEAD')
                                    for url in urls])
        results = []
        # Wait for the jobs to complete
        while not waiter.done():
            try:
                response = yield waiter.next()
            except httpclient.HTTPError as e:
                print(f'Non-200 HTTP response returned: {e}')
                continue
            except Exception as e:
                print(f'An unexpected error occurred querying: {e}')
                continue
            else:
                print(f'URL \'{response.request.url}\' has status code <{response.code}>')
                results.append(response)
        return results

    loop = ioloop.IOLoop.current()
    web_pages = loop.run_sync(fetch_and_handle)

    return web_pages

my_urls = ['url1.com', 'url2.com', 'url100000.com']
responses = main(my_urls)
print(responses[0])
RDRR
quelle
-2

Am einfachsten wäre es, die in Python integrierte Threading-Bibliothek zu verwenden. Sie sind keine "echten" / Kernel-Threads. Sie haben Probleme (wie die Serialisierung), sind aber gut genug. Sie möchten einen Warteschlangen- und Thread-Pool. Eine Option ist hier , aber es ist trivial, eigene zu schreiben. Sie können nicht alle 100.000 Anrufe parallelisieren, aber Sie können 100 (oder so) gleichzeitig abfeuern.

Pest669
quelle
7
Pythons Threads sind ziemlich real, im Gegensatz zu Rubys zum Beispiel. Unter der Haube werden sie als native Betriebssystem-Threads implementiert, zumindest unter Unix / Linux und Windows. Vielleicht beziehen Sie sich auf die GIL, aber es macht die Threads nicht weniger real ...
Eli Bendersky
2
Eli hat Recht mit Pythons Threads, aber Pestilence's Argument, dass Sie einen Thread-Pool verwenden möchten, ist auch richtig. Das letzte, was Sie in diesem Fall tun möchten, ist zu versuchen, einen separaten Thread für jede der 100K-Anforderungen gleichzeitig zu starten.
Adam Crossland
1
Igor, Sie können Code-Schnipsel nicht sinnvoll in Kommentaren posten, aber Sie können Ihre Frage bearbeiten und dort hinzufügen.
Adam Crossland
Pestilenz: Wie viele Warteschlangen und Threads pro Warteschlange würden Sie für meine Lösung empfehlen?
IgorGanapolsky
Außerdem ist dies eine E / A-gebundene Aufgabe, die nicht an die CPU gebunden ist. Die GIL wirkt sich hauptsächlich auf CPU-gebundene Aufgaben aus
PirateApp