2013-02-15 30 views
111

का उपयोग कर रेडिस संदेशबस विफलता के साथ सिग्नलआर का उपयोग करना मैं सिग्नलर ऐप के साथ रेडिस संदेश बस विफलता परिदृश्य बनाने की कोशिश कर रहा हूं।BookSleeve's ConnectionUtils.Connect()

सबसे पहले, हमने एक साधारण हार्डवेयर लोड-बैलेंसर विफलता की कोशिश की, जिसने बस दो रेडिस सर्वरों की निगरानी की। सिग्नलआर एप्लिकेशन ने एकवचन एचएलबी एंडपॉइंट की ओर इशारा किया। मैं फिर एक सर्वर विफल रहा, लेकिन सिग्नलर ऐप पूल रीसाइक्लिंग किए बिना दूसरे रेडिस सर्वर पर सफलतापूर्वक कोई संदेश प्राप्त करने में असमर्थ था। संभवतः ऐसा इसलिए है क्योंकि इसे नए रेडिस संदेश बस में सेटअप कमांड जारी करने की आवश्यकता है।

सिग्नल आरसी 1, Microsoft.AspNet.SignalR.Redis.RedisMessageBus पबलीव के RedisConnection() का उपयोग पब/उप के लिए एक रेडिस से कनेक्ट करने के लिए करता है।

मैंने एक नई कक्षा, RedisMessageBusCluster() बनाई जो Redle सर्वर के क्लस्टर में से किसी एक से कनेक्ट करने के लिए Booksleeve's ConnectionUtils.Connect() का उपयोग करता है।

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Threading; 
using System.Threading.Tasks; 
using BookSleeve; 
using Microsoft.AspNet.SignalR.Infrastructure; 

namespace Microsoft.AspNet.SignalR.Redis 
{ 
    /// <summary> 
    /// WIP: Getting scaleout for Redis working 
    /// </summary> 
    public class RedisMessageBusCluster : ScaleoutMessageBus 
    { 
     private readonly int _db; 
     private readonly string[] _keys; 
     private RedisConnection _connection; 
     private RedisSubscriberConnection _channel; 
     private Task _connectTask; 

     private readonly TaskQueue _publishQueue = new TaskQueue(); 

     public RedisMessageBusCluster(string serverList, int db, IEnumerable<string> keys, IDependencyResolver resolver) 
      : base(resolver) 
     { 
      _db = db; 
      _keys = keys.ToArray(); 

      // uses a list of connections 
      _connection = ConnectionUtils.Connect(serverList); 

      //_connection = new RedisConnection(host: server, port: port, password: password); 

      _connection.Closed += OnConnectionClosed; 
      _connection.Error += OnConnectionError; 


      // Start the connection - TODO: can remove this Open as the connection is already opened, but there's the _connectTask is used later on 
      _connectTask = _connection.Open().Then(() => 
      { 
       // Create a subscription channel in redis 
       _channel = _connection.GetOpenSubscriberChannel(); 

       // Subscribe to the registered connections 
       _channel.Subscribe(_keys, OnMessage); 

       // Dirty hack but it seems like subscribe returns before the actual 
       // subscription is properly setup in some cases 
       while (_channel.SubscriptionCount == 0) 
       { 
        Thread.Sleep(500); 
       } 
      }); 
     } 


     protected override Task Send(Message[] messages) 
     { 
      return _connectTask.Then(msgs => 
      { 
       var taskCompletionSource = new TaskCompletionSource<object>(); 

       // Group messages by source (connection id) 
       var messagesBySource = msgs.GroupBy(m => m.Source); 

       SendImpl(messagesBySource.GetEnumerator(), taskCompletionSource); 

       return taskCompletionSource.Task; 
      }, 
      messages); 
     } 

     private void SendImpl(IEnumerator<IGrouping<string, Message>> enumerator, TaskCompletionSource<object> taskCompletionSource) 
     { 
      if (!enumerator.MoveNext()) 
      { 
       taskCompletionSource.TrySetResult(null); 
      } 
      else 
      { 
       IGrouping<string, Message> group = enumerator.Current; 

       // Get the channel index we're going to use for this message 
       int index = Math.Abs(group.Key.GetHashCode()) % _keys.Length; 

       string key = _keys[index]; 

       // Increment the channel number 
       _connection.Strings.Increment(_db, key) 
            .Then((id, k) => 
            { 
             var message = new RedisMessage(id, group.ToArray()); 

             return _connection.Publish(k, message.GetBytes()); 
            }, key) 
            .Then((enumer, tcs) => SendImpl(enumer, tcs), enumerator, taskCompletionSource) 
            .ContinueWithNotComplete(taskCompletionSource); 
      } 
     } 

     private void OnConnectionClosed(object sender, EventArgs e) 
     { 
      // Should we auto reconnect? 
      if (true) 
      { 
       ; 
      } 
     } 

     private void OnConnectionError(object sender, BookSleeve.ErrorEventArgs e) 
     { 
      // How do we bubble errors? 
      if (true) 
      { 
       ; 
      } 
     } 

     private void OnMessage(string key, byte[] data) 
     { 
      // The key is the stream id (channel) 
      var message = RedisMessage.Deserialize(data); 

      _publishQueue.Enqueue(() => OnReceived(key, (ulong)message.Id, message.Messages)); 
     } 

     protected override void Dispose(bool disposing) 
     { 
      if (disposing) 
      { 
       if (_channel != null) 
       { 
        _channel.Unsubscribe(_keys); 
        _channel.Close(abort: true); 
       } 

       if (_connection != null) 
       { 
        _connection.Close(abort: true); 
       }     
      } 

      base.Dispose(disposing); 
     } 
    } 
} 

Booksleeve एक मास्टर निर्धारित करने के लिए अपने स्वयं के तंत्र है, और स्वचालित रूप से अन्य सर्वर पर अधिक असफल हो जायेगी, और अब SignalR.Chat के साथ इस परीक्षण कर रहा हूँ। अब

public static IDependencyResolver UseRedisCluster(this IDependencyResolver resolver, string serverList, IEnumerable<string> eventKeys) 
{ 
    return UseRedisCluster(resolver, serverList, db: 0, eventKeys: eventKeys); 
} 

public static IDependencyResolver UseRedisCluster(this IDependencyResolver resolver, string serverList, int db, IEnumerable<string> eventKeys) 
{ 
    var bus = new Lazy<RedisMessageBusCluster>(() => new RedisMessageBusCluster(serverList, db, eventKeys, resolver)); 
    resolver.Register(typeof(IMessageBus),() => bus.Value); 

    return resolver; 
} 

:

<add key="redis.serverList" value="dbcache1.local:6379,dbcache2.local:6379"/> 
फिर Application_Start() में

:

 // Redis cluster server list 
     string redisServerlist = ConfigurationManager.AppSettings["redis.serverList"]; 

     List<string> eventKeys = new List<string>(); 
     eventKeys.Add("SignalR.Redis.FailoverTest"); 
     GlobalHost.DependencyResolver.UseRedisCluster(redisServerlist, eventKeys); 

मैं Microsoft.AspNet.SignalR.Redis.DependencyResolverExtensions के लिए दो अतिरिक्त विधियां जोड़

web.config में, मैं उपलब्ध सर्वर की सूची सेट समस्या यह है कि जब मेरे पास कई ब्रेकपॉइंट सक्षम हैं, तो आप जब तक उपयोगकर्ता नाम जोड़ा नहीं गया है, तो सभी ब्रेकपॉइंट्स को अक्षम करें, एप्लिकेशन अपेक्षा के अनुसार काम करता है। हालांकि, शुरुआत से अक्षम ब्रेकपॉइंट्स के साथ, कुछ दौड़ की स्थिति प्रतीत होती है जो कनेक्शन प्रक्रिया के दौरान विफल हो सकती है।

इस प्रकार, RedisMessageCluster() में:

// Start the connection 
    _connectTask = _connection.Open().Then(() => 
    { 
     // Create a subscription channel in redis 
     _channel = _connection.GetOpenSubscriberChannel(); 

     // Subscribe to the registered connections 
     _channel.Subscribe(_keys, OnMessage); 

     // Dirty hack but it seems like subscribe returns before the actual 
     // subscription is properly setup in some cases 
     while (_channel.SubscriptionCount == 0) 
     { 
      Thread.Sleep(500); 
     } 
    }); 

मैं दोनों एक Task.Wait जोड़ने की कोशिश की, और यहां तक ​​कि एक अतिरिक्त Sleep() (ऊपर नहीं दिखाया गया है) - जो इंतज़ार कर रहे थे/आदि, लेकिन अभी भी त्रुटियों हो रही।

आवर्ती त्रुटि Booksleeve.MessageQueue.cs ~ ln 71 में हो रहा है:

A first chance exception of type 'System.InvalidOperationException' occurred in BookSleeve.dll 
iisexpress.exe Error: 0 : SignalR exception thrown by Task: System.AggregateException: One or more errors occurred. ---> System.InvalidOperationException: The queue is closed 
    at BookSleeve.MessageQueue.Enqueue(RedisMessage item, Boolean highPri) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\MessageQueue.cs:line 71 
    at BookSleeve.RedisConnectionBase.EnqueueMessage(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 910 
    at BookSleeve.RedisConnectionBase.ExecuteInt64(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 826 
    at BookSleeve.RedisConnection.IncrementImpl(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 277 
    at BookSleeve.RedisConnection.BookSleeve.IStringCommands.Increment(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 270 
    at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.SendImpl(IEnumerator`1 enumerator, TaskCompletionSource`1 taskCompletionSource) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 90 
    at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.<Send>b__2(Message[] msgs) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 67 
    at Microsoft.AspNet.SignalR.TaskAsyncHelper.GenericDelegates`4.<>c__DisplayClass57.<ThenWithArgs>b__56() in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 893 
    at Microsoft.AspNet.SignalR.TaskAsyncHelper.TaskRunners`2.<>c__DisplayClass42.<RunTask>b__41(Task t) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 821 
    --- End of inner exception stack trace --- 
---> (Inner Exception #0) System.InvalidOperationException: The queue is closed 
    at BookSleeve.MessageQueue.Enqueue(RedisMessage item, Boolean highPri) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\MessageQueue.cs:line 71 
    at BookSleeve.RedisConnectionBase.EnqueueMessage(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 910 
    at BookSleeve.RedisConnectionBase.ExecuteInt64(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 826 
    at BookSleeve.RedisConnection.IncrementImpl(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 277 
    at BookSleeve.RedisConnection.BookSleeve.IStringCommands.Increment(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 270 
    at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.SendImpl(IEnumerator`1 enumerator, TaskCompletionSource`1 taskCompletionSource) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 90 
    at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.<Send>b__2(Message[] msgs) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 67 
    at Microsoft.AspNet.SignalR.TaskAsyncHelper.GenericDelegates`4.<>c__DisplayClass57.<ThenWithArgs>b__56() in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 893 
    at Microsoft.AspNet.SignalR.TaskAsyncHelper.TaskRunners`2.<>c__DisplayClass42.<RunTask>b__41(Task t) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 821<--- 



public void Enqueue(RedisMessage item, bool highPri) 
{ 
    lock (stdPriority) 
    { 
     if (closed) 
     { 
      throw new InvalidOperationException("The queue is closed"); 
     } 

कहाँ एक बंद कतार अपवाद उत्पन्न की जा रही है।

मुझे एक और समस्या है: चूंकि रेडिस कनेक्शन Application_Start() में बनाया गया है, तो किसी अन्य सर्वर से "रीकनेक्ट" में कुछ समस्याएं हो सकती हैं। हालांकि, मुझे लगता है कि एकवचन RedisConnection() का उपयोग करते समय यह मान्य है, जहां से चुनने के लिए केवल एक कनेक्शन है। हालांकि, ConnectionUtils.Connect() की अंतर्निहितता के साथ मैं @dfowler या सिग्नलआर में इस परिदृश्य को कैसे प्रबंधित किया जाता है, इस बारे में अन्य सिग्नलआर लोगों से सुनना चाहता हूं।

+0

मैं एक नज़र डालेगा, लेकिन: पहली चीज जो होती है वह यह है कि आपको 'ओपन' को कॉल करने की आवश्यकता नहीं है क्योंकि आपके पास पहले से ही * कनेक्शन होना चाहिए। मैं तत्काल देखने में सक्षम नहीं हूं, हालांकि, क्योंकि मैं एक उड़ान के लिए तैयार हो रहा हूं –

+0

मेरा मानना ​​है कि यहां दो मुद्दे हैं। 1) कैसे बुकलीव एक फेलओवर से निपट रहा है; 2) ग्राहकों का ट्रैक रखने के लिए सिग्नलआर कर्सर का उपयोग कैसे करता है। जब कोई नई संदेश बस शुरू की जाती है, तो mb1 के सभी कर्सर mb2 पर exsit नहीं करते हैं। इसलिए सिग्नलर ऐप पूल को रीसेट करते समय, यह काम करना शुरू कर देगा - पहले नहीं, जो स्पष्ट रूप से व्यवहार्य विकल्प नहीं है। – ElHaix

+2

लिंक का वर्णन करता है कि सिग्नलआर कर्सर का उपयोग कैसे करता है: http://stackoverflow.com/questions/13054592/how-does-signalr-redis-work-under-the-hood/13063449#13063449 – ElHaix

उत्तर

14

सिग्नलआर टीम ने अब बुकस्लीव के उत्तराधिकारी StackExchange.Redis के साथ कस्टम कनेक्शन फैक्ट्री के लिए समर्थन लागू किया है, जो कनेक्शन मल्टीप्लेक्सर के माध्यम से अनावश्यक रेडिस कनेक्शन का समर्थन करता है।

प्रारंभिक समस्या का सामना करना पड़ा था कि सर्वर के संग्रह को स्वीकार करने के लिए बुकस्लीव में अपने स्वयं के विस्तार विधियों को बनाने के बावजूद, असफलता संभव नहीं थी।

अब बुकस्लीव के विकास के साथ StackExchange.Redis के साथ, हम अब configureConnect प्रारंभ में सर्वर/बंदरगाहों का संग्रह कर सकते हैं।

नई कार्यान्वयन सड़क मैं नीचे जा रहा था एक UseRedisCluster विधि बनाने में, तुलना में बहुत सरल है, और पीछे के अंत अब pluming सच का समर्थन करता है असफल ओवर:

var conn = ConnectionMultiplexer.Connect("redisServer1:6380,redisServer2:6380,redisServer3:6380,allowAdmin=true"); 

StackExchange.Redis भी लिए अनुमति देता है अतिरिक्त मैन्युअल कॉन्फ़िगरेशन दस्तावेज की Automatic and Manual Configuration अनुभाग में बताया गया:

ConfigurationOptions config = new ConfigurationOptions 
{ 
    EndPoints = 
    { 
     { "redis0", 6379 }, 
     { "redis1", 6380 } 
    }, 
    CommandMap = CommandMap.Create(new HashSet<string> 
    { // EXCLUDE a few commands 
     "INFO", "CONFIG", "CLUSTER", 
     "PING", "ECHO", "CLIENT" 
    }, available: false), 
    KeepAlive = 180, 
    DefaultVersion = new Version(2, 8, 8), 
    Password = "changeme" 
}; 

संक्षेप में, सर्वर का एक संग्रह के साथ हमारे SignalR पैमाने आउट वातावरण को प्रारंभ करने की क्षमता अब प्रारंभिक समस्याओं को हल करती है मीटर।

+0

क्या मुझे 500 उत्तर बक्षीस के साथ अपना जवाब इनाम देना चाहिए? ;) – nicael

+0

ठीक है, अगर आपको लगता है कि अब * उत्तर * :) – ElHaix

+0

@ElHaix है क्योंकि आपने सवाल पूछा है, तो शायद आप यह कहने के लिए सबसे योग्य हैं कि आपका उत्तर निर्णायक है या नहीं, यह पहेली में सिर्फ एक टुकड़ा है - I यह इंगित करने के लिए एक वाक्य जोड़ने का सुझाव दें कि क्या आपकी समस्या को हल किया गया है और संभवतः –