Nachdem ich vor Jahren einige (mehr oder weniger) socket
asynchrone "Low-Level" -Programmierungen durchgeführt hatte (in Form eines ereignisbasierten asynchronen Musters (EAP) ) und kürzlich zu einem TcpListener
(asynchronen Programmiermodell (APM) ) aufgestiegen war Beim Versuch, auf async/await
(Task-based Asynchronous Pattern (TAP) ) umzusteigen, hatte ich es ziemlich schwer, mich immer wieder mit all diesen „Low-Level-Klempnern“ zu beschäftigen. Also habe ich nachgedacht; warum nicht geben RX
ein Go ( Reactive Extensions ) , da es könnte mehr eng an mein Problem Domäne passen.
Eine Menge Code, den ich schreibe, muss mit vielen Clients zusammenarbeiten, die sich über TCP mit meiner Anwendung verbinden und dann eine bidirektionale (asynchrone) Kommunikation starten. Der Client oder Server kann jederzeit entscheiden, ob eine Nachricht gesendet werden muss, und dies ist also nicht Ihr klassisches request/response
Setup, sondern eher eine bidirektionale Echtzeit-Leitung, die beiden Parteien offensteht, um zu senden, was sie wollen , wann immer sie wollen. (Wenn jemand einen anständigen Namen hat, der dies beschreibt, würde ich mich freuen, ihn zu hören!).
Das "Protokoll" unterscheidet sich je nach Anwendung (und ist für meine Frage nicht wirklich relevant). Ich habe jedoch eine erste Frage:
- Unter der Voraussetzung, dass nur ein "Server" ausgeführt wird, der jedoch viele (normalerweise Tausende) Verbindungen (z. B. Clients) überwachen muss, von denen jeder (mangels besserer Beschreibung) eine eigene "Zustandsmaschine" besitzt, um die internen Verbindungen zu überwachen Staaten usw., welchen Ansatz würden Sie bevorzugen? EAP / TAP / APM? Wäre RX überhaupt eine Option? Wenn nein, warum?
Ich muss also mit Async arbeiten, da a) es kein Anforderungs- / Antwortprotokoll ist und ich keinen Thread / Client in einem "Warten auf Nachricht" -Blockierungs- oder "Senden von Nachrichten" -Blockierungsaufruf haben kann (wenn dies jedoch der Fall ist) Blockierung nur für diesen Client Ich könnte damit leben) und b) Ich muss mit vielen gleichzeitigen Verbindungen umgehen. Ich sehe keine Möglichkeit, dies (zuverlässig) mit blockierenden Anrufen zu tun.
Die meisten meiner Anwendungen beziehen sich auf VoiP. Sei es SIP-Nachrichten von SIP-Clients oder PBX-Nachrichten (in Verbindung mit Anwendungen wie FreeSwitch / OpenSIPS usw.), aber Sie können sich in der einfachsten Form einen "Chat" -Server vorstellen, der versucht, mit vielen "Chat" -Clients umzugehen . Die meisten Protokolle basieren auf Text (ASCII).
Nachdem ich viele verschiedene Permutationen der oben genannten Techniken implementiert habe, möchte ich meine Arbeit vereinfachen, indem ich ein Objekt erstelle, das ich einfach instanziieren kann, dem ich IPEndpoint
zuhören kann und das mir sagt, wann immer etwas von Interesse ist (was ich normalerweise tue) verwenden Sie Ereignisse für, so dass einige EAP normalerweise mit den anderen beiden Techniken gemischt werden). Die Klasse sollte sich nicht die Mühe machen, das Protokoll zu verstehen. Es sollte lediglich eingehende / ausgehende Zeichenfolgen verarbeiten. Und so habe ich mein Auge auf RX gerichtet und gehofft, dass dies (am Ende) die Arbeit vereinfachen würde. Ich habe eine neue "Geige" von Grund auf erstellt:
using System;
using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using System.Reactive.Linq;
using System.Text;
class Program
{
static void Main(string[] args)
{
var f = new FiddleServer(new IPEndPoint(IPAddress.Any, 8084));
f.Start();
Console.ReadKey();
f.Stop();
Console.ReadKey();
}
}
public class FiddleServer
{
private TcpListener _listener;
private ConcurrentDictionary<ulong, FiddleClient> _clients;
private ulong _currentid = 0;
public IPEndPoint LocalEP { get; private set; }
public FiddleServer(IPEndPoint localEP)
{
this.LocalEP = localEP;
_clients = new ConcurrentDictionary<ulong, FiddleClient>();
}
public void Start()
{
_listener = new TcpListener(this.LocalEP);
_listener.Start();
Observable.While(() => true, Observable.FromAsync(_listener.AcceptTcpClientAsync)).Subscribe(
//OnNext
tcpclient =>
{
//Create new FSClient with unique ID
var fsclient = new FiddleClient(_currentid++, tcpclient);
//Keep track of clients
_clients.TryAdd(fsclient.ClientId, fsclient);
//Initialize connection
fsclient.Send("connect\n\n");
Console.WriteLine("Client {0} accepted", fsclient.ClientId);
},
//OnError
ex =>
{
},
//OnComplete
() =>
{
Console.WriteLine("Client connection initialized");
//Accept new connections
_listener.AcceptTcpClientAsync();
}
);
Console.WriteLine("Started");
}
public void Stop()
{
_listener.Stop();
Console.WriteLine("Stopped");
}
public void Send(ulong clientid, string rawmessage)
{
FiddleClient fsclient;
if (_clients.TryGetValue(clientid, out fsclient))
{
fsclient.Send(rawmessage);
}
}
}
public class FiddleClient
{
private TcpClient _tcpclient;
public ulong ClientId { get; private set; }
public FiddleClient(ulong id, TcpClient tcpclient)
{
this.ClientId = id;
_tcpclient = tcpclient;
}
public void Send(string rawmessage)
{
Console.WriteLine("Sending {0}", rawmessage);
var data = Encoding.ASCII.GetBytes(rawmessage);
_tcpclient.GetStream().WriteAsync(data, 0, data.Length); //Write vs WriteAsync?
}
}
Mir ist bewusst, dass es in dieser "Geige" ein kleines implementierungsspezifisches Detail gibt; In diesem Fall arbeite ich mit FreeSwitch ESL, sodass das Problem bei der Umgestaltung"connect\n\n"
auf einen allgemeineren Ansatz beseitigt werden sollte.
Mir ist auch bewusst, dass ich die anonymen Methoden in private Instanzmethoden in der Serverklasse umgestalten muss. Ich bin mir nur nicht sicher, welche Konvention (zB " OnSomething
") für ihre Methodennamen verwendet werden soll.
Dies ist meine Basis / Ausgangspunkt / Grundlage (die etwas "Feineinstellung" benötigt). Ich habe einige Fragen dazu:
- Siehe obige Frage "1"
- Bin ich auf dem richtigen Weg? Oder sind meine "Design" -Entscheidungen ungerecht?
- Concurrency-weise: würde dies mit Tausenden von Clients fertig werden (Analysieren / Behandeln der tatsächlichen Nachrichten beiseite)
- Ausnahmen: Ich bin nicht sicher, wie Ausnahmen innerhalb von Clients "bis" zum Server ausgelöst werden ("RX-weise"). Was wäre ein guter Weg?
- Ich kann jetzt jeden verbundenen Client von meiner Serverklasse
ClientId
abrufen (indem ich ihn verwende ), vorausgesetzt, ich mache die Clients auf die eine oder andere Weise verfügbar und rufe Methoden direkt auf. Ich kann Methoden auch über die Serverklasse aufrufen (zum Beispiel dieSend(clientId, rawmessage)
Methode (wobei der letztere Ansatz eine "praktische" Methode wäre, um eine Nachricht schnell auf die andere Seite zu bringen). - Ich bin mir nicht ganz sicher, wohin (und wie) ich von hier aus gehen soll:
- a) Ich muss eingehende Nachrichten bearbeiten. Wie würde ich das einrichten? Ich kann den Datenstrom abrufen, aber wo kann ich die empfangenen Bytes abrufen ? Ich glaube, ich brauche eine Art "ObservableStream" - etwas, das ich abonnieren kann? Würde ich das in das
FiddleClient
oder setzenFiddleServer
? - b) Angenommen, ich möchte die Verwendung von event vermeiden, bis diese
FiddleClient
/FiddleServer
Klassen spezifischer implementiert sind, um ihre anwendungsspezifische Protokollbehandlung usw. unter Verwendung spezifischerFooClient
/FooServer
Klassen anzupassen: Wie würde ich von den Daten in den zugrunde liegenden 'Fiddle'-Klassen zu ihren gehen spezifischere Gegenstücke?
- a) Ich muss eingehende Nachrichten bearbeiten. Wie würde ich das einrichten? Ich kann den Datenstrom abrufen, aber wo kann ich die empfangenen Bytes abrufen ? Ich glaube, ich brauche eine Art "ObservableStream" - etwas, das ich abonnieren kann? Würde ich das in das
Artikel / Links, die ich bereits gelesen / überflogen / als Referenz verwendet habe:
- Verbrauchen Sie eine Steckdose mit Reactive Extension
- Einfache beobachtbare Sequenzen erstellen und abonnieren
- Hinzufügen oder Zuordnen von Kontext zu jeder ObservableTcpListener-Verbindung / -Sitzung
- Asynchrone Programmierung mit dem Reactive Framework und der Task Parallel Library - Teil 1 , 2 und 3 .
- Reactive Extensions (Rx) für die Socket-Programmierung verwenden?
- Ein .NET Rx-gesteuerter Webserver und ein .NET Rx-gesteuerter Webserver, Take 2
- Einige Rx-Tutorials
Antworten:
Nachdem ich diese Aussage gelesen hatte, dachte ich sofort "Schauspieler". Akteure sind Objekten sehr ähnlich, mit der Ausnahme, dass sie nur eine einzige Eingabe haben, an die Sie die Nachricht übergeben (anstatt die Methoden des Objekts direkt aufzurufen), und dass sie asynchron arbeiten. In einem sehr vereinfachten Beispiel ... würden Sie den Akteur erstellen und ihm eine Nachricht mit dem IPEndpoint und der Adresse des Akteurs senden, an den das Ergebnis gesendet werden soll. Es geht los und funktioniert im Hintergrund. Man hört nur davon, wenn "etwas von Interesse" passiert. Sie können so viele Akteure instanziieren, wie Sie für die Last benötigen.
Ich kenne keine Schauspieler-Bibliotheken in .Net, obwohl ich weiß, dass es einige gibt. Ich bin mit der TPL-Datenflussbibliothek vertraut (in meinem Buch http://DataflowBook.com wird ein Abschnitt darüber enthalten sein ), und es sollte einfach sein, mit dieser Bibliothek ein einfaches Darstellermodell zu implementieren.
quelle