Verwenden von SignalR mit Redis Messagebus-Failover mithilfe von ConnectionUtils.Connect () von BookSleeve

112

Ich versuche, ein Redis-Nachrichtenbus-Failover-Szenario mit einer SignalR-App zu erstellen.

Zuerst haben wir ein einfaches Hardware-Load-Balancer-Failover versucht, bei dem einfach zwei Redis-Server überwacht wurden. Die SignalR-Anwendung zeigte auf den singulären HLB-Endpunkt. Ich habe dann einen Server ausgefallen, konnte jedoch keine Nachrichten auf dem zweiten Redis-Server erfolgreich abrufen, ohne den SignalR-App-Pool zu recyceln. Vermutlich liegt dies daran, dass die Setup-Befehle an den neuen Redis-Nachrichtenbus ausgegeben werden müssen.

Ab SignalR RC1 Microsoft.AspNet.SignalR.Redis.RedisMessageBuswird Booksleeve verwendet RedisConnection(), um eine Verbindung zu einem einzelnen Redis für Pub / Sub herzustellen .

Ich habe eine neue Klasse erstellt, RedisMessageBusCluster()die mithilfe von Booksleeve eine ConnectionUtils.Connect()Verbindung zu einer in einem Cluster von Redis-Servern herstellt.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using BookSleeve;
using Microsoft.AspNet.SignalR.Infrastructure;

namespace Microsoft.AspNet.SignalR.Redis
{
    /// <summary>
    /// WIP:  Getting scaleout for Redis working
    /// </summary>
    public class RedisMessageBusCluster : ScaleoutMessageBus
    {
        private readonly int _db;
        private readonly string[] _keys;
        private RedisConnection _connection;
        private RedisSubscriberConnection _channel;
        private Task _connectTask;

        private readonly TaskQueue _publishQueue = new TaskQueue();

        public RedisMessageBusCluster(string serverList, int db, IEnumerable<string> keys, IDependencyResolver resolver)
            : base(resolver)
        {
            _db = db;
            _keys = keys.ToArray();

            // uses a list of connections
            _connection = ConnectionUtils.Connect(serverList);

            //_connection = new RedisConnection(host: server, port: port, password: password);

            _connection.Closed += OnConnectionClosed;
            _connection.Error += OnConnectionError;


            // Start the connection - TODO:  can remove this Open as the connection is already opened, but there's the _connectTask is used later on
            _connectTask = _connection.Open().Then(() =>
            {
                // Create a subscription channel in redis
                _channel = _connection.GetOpenSubscriberChannel();

                // Subscribe to the registered connections
                _channel.Subscribe(_keys, OnMessage);

                // Dirty hack but it seems like subscribe returns before the actual
                // subscription is properly setup in some cases
                while (_channel.SubscriptionCount == 0)
                {
                    Thread.Sleep(500);
                }
            });
        }


        protected override Task Send(Message[] messages)
        {
            return _connectTask.Then(msgs =>
            {
                var taskCompletionSource = new TaskCompletionSource<object>();

                // Group messages by source (connection id)
                var messagesBySource = msgs.GroupBy(m => m.Source);

                SendImpl(messagesBySource.GetEnumerator(), taskCompletionSource);

                return taskCompletionSource.Task;
            },
            messages);
        }

        private void SendImpl(IEnumerator<IGrouping<string, Message>> enumerator, TaskCompletionSource<object> taskCompletionSource)
        {
            if (!enumerator.MoveNext())
            {
                taskCompletionSource.TrySetResult(null);
            }
            else
            {
                IGrouping<string, Message> group = enumerator.Current;

                // Get the channel index we're going to use for this message
                int index = Math.Abs(group.Key.GetHashCode()) % _keys.Length;

                string key = _keys[index];

                // Increment the channel number
                _connection.Strings.Increment(_db, key)
                                   .Then((id, k) =>
                                   {
                                       var message = new RedisMessage(id, group.ToArray());

                                       return _connection.Publish(k, message.GetBytes());
                                   }, key)
                                   .Then((enumer, tcs) => SendImpl(enumer, tcs), enumerator, taskCompletionSource)
                                   .ContinueWithNotComplete(taskCompletionSource);
            }
        }

        private void OnConnectionClosed(object sender, EventArgs e)
        {
            // Should we auto reconnect?
            if (true)
            {
                ;
            }
        }

        private void OnConnectionError(object sender, BookSleeve.ErrorEventArgs e)
        {
            // How do we bubble errors?
            if (true)
            {
                ;
            }
        }

        private void OnMessage(string key, byte[] data)
        {
            // The key is the stream id (channel)
            var message = RedisMessage.Deserialize(data);

            _publishQueue.Enqueue(() => OnReceived(key, (ulong)message.Id, message.Messages));
        }

        protected override void Dispose(bool disposing)
        {
            if (disposing)
            {
                if (_channel != null)
                {
                    _channel.Unsubscribe(_keys);
                    _channel.Close(abort: true);
                }

                if (_connection != null)
                {
                    _connection.Close(abort: true);
                }                
            }

            base.Dispose(disposing);
        }
    }
}

Booksleeve verfügt über einen eigenen Mechanismus zum Bestimmen eines Masters und wird automatisch auf einen anderen Server übertragen. Ich teste dies jetzt mit SignalR.Chat.

In web.configstelle ich die Liste der verfügbaren Server ein:

<add key="redis.serverList" value="dbcache1.local:6379,dbcache2.local:6379"/>

Dann in Application_Start():

        // Redis cluster server list
        string redisServerlist = ConfigurationManager.AppSettings["redis.serverList"];

        List<string> eventKeys = new List<string>();
        eventKeys.Add("SignalR.Redis.FailoverTest");
        GlobalHost.DependencyResolver.UseRedisCluster(redisServerlist, eventKeys);

Ich habe zwei zusätzliche Methoden hinzugefügt Microsoft.AspNet.SignalR.Redis.DependencyResolverExtensions:

public static IDependencyResolver UseRedisCluster(this IDependencyResolver resolver, string serverList, IEnumerable<string> eventKeys)
{
    return UseRedisCluster(resolver, serverList, db: 0, eventKeys: eventKeys);
}

public static IDependencyResolver UseRedisCluster(this IDependencyResolver resolver, string serverList, int db, IEnumerable<string> eventKeys)
{
    var bus = new Lazy<RedisMessageBusCluster>(() => new RedisMessageBusCluster(serverList, db, eventKeys, resolver));
    resolver.Register(typeof(IMessageBus), () => bus.Value);

    return resolver;
}

Das Problem ist nun, dass die Anwendung wie erwartet funktioniert, wenn mehrere Haltepunkte aktiviert sind, bis nach dem Hinzufügen eines Benutzernamens alle Haltepunkte deaktiviert wurden. Da die Haltepunkte von Anfang an deaktiviert sind, scheint es jedoch einige Race-Bedingungen zu geben, die während des Verbindungsprozesses möglicherweise fehlschlagen.

So in RedisMessageCluster():

    // Start the connection
    _connectTask = _connection.Open().Then(() =>
    {
        // Create a subscription channel in redis
        _channel = _connection.GetOpenSubscriberChannel();

        // Subscribe to the registered connections
        _channel.Subscribe(_keys, OnMessage);

        // Dirty hack but it seems like subscribe returns before the actual
        // subscription is properly setup in some cases
        while (_channel.SubscriptionCount == 0)
        {
            Thread.Sleep(500);
        }
    });

Ich habe versucht, sowohl ein Task.Waitals auch ein zusätzliches Sleep()(oben nicht gezeigt) hinzuzufügen - die warteten / etc, aber immer noch Fehler erhielten.

Der wiederkehrende Fehler scheint in Booksleeve.MessageQueue.cs~ ln 71 zu sein:

A first chance exception of type 'System.InvalidOperationException' occurred in BookSleeve.dll
iisexpress.exe Error: 0 : SignalR exception thrown by Task: System.AggregateException: One or more errors occurred. ---> System.InvalidOperationException: The queue is closed
   at BookSleeve.MessageQueue.Enqueue(RedisMessage item, Boolean highPri) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\MessageQueue.cs:line 71
   at BookSleeve.RedisConnectionBase.EnqueueMessage(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 910
   at BookSleeve.RedisConnectionBase.ExecuteInt64(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 826
   at BookSleeve.RedisConnection.IncrementImpl(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 277
   at BookSleeve.RedisConnection.BookSleeve.IStringCommands.Increment(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 270
   at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.SendImpl(IEnumerator`1 enumerator, TaskCompletionSource`1 taskCompletionSource) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 90
   at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.<Send>b__2(Message[] msgs) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 67
   at Microsoft.AspNet.SignalR.TaskAsyncHelper.GenericDelegates`4.<>c__DisplayClass57.<ThenWithArgs>b__56() in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 893
   at Microsoft.AspNet.SignalR.TaskAsyncHelper.TaskRunners`2.<>c__DisplayClass42.<RunTask>b__41(Task t) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 821
   --- End of inner exception stack trace ---
---> (Inner Exception #0) System.InvalidOperationException: The queue is closed
   at BookSleeve.MessageQueue.Enqueue(RedisMessage item, Boolean highPri) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\MessageQueue.cs:line 71
   at BookSleeve.RedisConnectionBase.EnqueueMessage(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 910
   at BookSleeve.RedisConnectionBase.ExecuteInt64(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 826
   at BookSleeve.RedisConnection.IncrementImpl(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 277
   at BookSleeve.RedisConnection.BookSleeve.IStringCommands.Increment(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 270
   at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.SendImpl(IEnumerator`1 enumerator, TaskCompletionSource`1 taskCompletionSource) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 90
   at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.<Send>b__2(Message[] msgs) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 67
   at Microsoft.AspNet.SignalR.TaskAsyncHelper.GenericDelegates`4.<>c__DisplayClass57.<ThenWithArgs>b__56() in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 893
   at Microsoft.AspNet.SignalR.TaskAsyncHelper.TaskRunners`2.<>c__DisplayClass42.<RunTask>b__41(Task t) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 821<---



public void Enqueue(RedisMessage item, bool highPri)
{
    lock (stdPriority)
    {
        if (closed)
        {
            throw new InvalidOperationException("The queue is closed");
        }

Wo eine geschlossene Warteschlangenausnahme ausgelöst wird.

Ich sehe ein anderes Problem voraus: Da die Redis-Verbindung hergestellt wird, Application_Start()kann es zu Problemen beim "erneuten Verbinden " mit einem anderen Server kommen. Ich denke jedoch, dass dies gültig ist, wenn der Singular verwendet wird RedisConnection(), bei dem nur eine Verbindung zur Auswahl steht. Mit der Einführung von ConnectionUtils.Connect()würde ich jedoch gerne von @dfowleroder den anderen SignalR-Leuten hören, wie dieses Szenario in SignalR gehandhabt wird.

ElHaix
quelle
Ich werde einen Blick darauf werfen, aber: Das erste, was passiert, ist, dass Sie nicht anrufen müssen, Openda die Verbindung, die Sie haben, bereits offen sein sollte. Ich werde jedoch nicht sofort nachsehen können, da ich mich auf einen Flug
vorbereite
Ich glaube, hier gibt es zwei Probleme. 1) wie Booksleeve mit einem Failover umgeht; 2) Wie SignalR Cursor verwendet, um Clients zu verfolgen. Wenn ein neuer Nachrichtenbus initialisiert wird, werden nicht alle Cursor von mb1 auf mb2 beendet. Daher beginnt das Zurücksetzen des SignalR-App-Pools zu funktionieren - nicht vorher, was offensichtlich keine praktikable Option ist.
ElHaix
2
Link, der beschreibt, wie SignalR Cursor verwendet: stackoverflow.com/questions/13054592/…
ElHaix
Versuchen Sie es mit der neuesten Version des Redis-Nachrichtenbusses. Es unterstützt das Übergeben einer Verbindungsfactory und das erneute Versuchen, eine Verbindung herzustellen, wenn der Server ausfällt.
Davidfowl
Haben Sie einen Link zu den Versionshinweisen? Vielen Dank.
ElHaix

Antworten:

13

Das SignalR Team hat nun Unterstützung für eine benutzerdefinierte Verbindung Fabrik mit implementierter StackExchange.Redis , dem Nachfolger von BookSleeve, die redundanten Redis Verbindungen über ConnectionMultiplexer unterstützt.

Das anfängliche Problem bestand darin, dass trotz der Erstellung meiner eigenen Erweiterungsmethoden in BookSleeve zur Annahme einer Sammlung von Servern kein Failover möglich war.

Mit der Entwicklung von BookSleeve zu StackExchange.Redis können wir jetzt die Sammlung von Servern / Ports direkt bei der Initialisierung konfigurierenConnect .

Die neue Implementierung ist viel einfacher als der Weg, den ich beim Erstellen einer UseRedisClusterMethode eingeschlagen habe, und das Back-End-Pluming unterstützt jetzt echtes Failover:

var conn = ConnectionMultiplexer.Connect("redisServer1:6380,redisServer2:6380,redisServer3:6380,allowAdmin=true");

StackExchange.Redis ermöglicht auch eine zusätzliche manuelle Konfiguration, wie im Automatic and Manual ConfigurationAbschnitt der Dokumentation beschrieben:

ConfigurationOptions config = new ConfigurationOptions
{
    EndPoints =
    {
        { "redis0", 6379 },
        { "redis1", 6380 }
    },
    CommandMap = CommandMap.Create(new HashSet<string>
    { // EXCLUDE a few commands
        "INFO", "CONFIG", "CLUSTER",
        "PING", "ECHO", "CLIENT"
    }, available: false),
    KeepAlive = 180,
    DefaultVersion = new Version(2, 8, 8),
    Password = "changeme"
};

Im Wesentlichen löst die Möglichkeit, unsere SignalR-Scale-Out-Umgebung mit einer Sammlung von Servern zu initialisieren, das anfängliche Problem.

ElHaix
quelle
Soll ich Ihre Antwort mit 500 Wiederholungen belohnen? ;)
Nicael
Nun, wenn Sie glauben, dass das jetzt die Antwort ist :)
ElHaix
@ElHaix Da Sie die Frage gestellt haben, können Sie wahrscheinlich am besten sagen, ob Ihre Antwort schlüssig ist oder ob sie nur ein Teil des Puzzles ist. Ich schlage vor, einen Satz hinzuzufügen, um anzugeben, ob und möglicherweise wie Ihr Problem gelöst wurde
Lars Höppner
So? Prämienprämie? Oder ich kann warten, bis es mehr Aufmerksamkeit erregt.
Nicael
Vermisse ich etwas oder ist dies nur in einem Feature-Zweig, nicht im Haupt-Nuget-Paket (2.1)? Außerdem scheint es im Zweig bug-stackexchange ( github.com/SignalR/SignalR/tree/bug-stackexchange/src/… ) in der RedisScaleoutConfiguration-Klasse noch keine Möglichkeit zu geben, einen eigenen Multiplexer bereitzustellen.
Steve