2011-11-09 5 views
5

मैं IObservable<bool> बनाने की कोशिश कर रहा हूं जो true लौटाता है यदि पिछले 5 सेकंड में यूडीपी संदेश प्राप्त हुआ है और यदि कोई टाइमआउट होता है, तो झूठा वापस आ जाता है।प्रतिक्रियाशील एक्सटेंशन टाइमआउट जो अनुक्रम को रोकता नहीं है?

अब तक मैं इस है:

public IObservable<Boolean> GettingUDPMessages(IPEndPoint localEP) 
{ 
    var udp = BaseComms.UDPBaseStringListener(localEP) 
     .Where(msg => msg.Data.Contains("running")) 
     .Select(s => true); 

    return Observable 
     .Timeout(udp, TimeSpan.FromSeconds(5)) 
     .Catch(Observable.Return(false)); 
} 

इस के साथ मुद्दे हैं: -

  • एक बार एक झूठी दिया जाता है, अनुक्रम बंद हो जाता है
  • मैं केवल वास्तव में true या false पर की जरूरत है राज्य परिवर्तन

मैं एक Subject<T> इस्तेमाल कर सकते हैं, लेकिन मैं UDPBaseStringListener नमूदार के निपटान के लिए जब वहाँ कोई और अधिक ग्राहक हैं सावधान रहने की जरूरत है।

अद्यतन

हर बार जब मैं एक यूडीपी संदेश मिलता है कि मैं यह एक true वापसी करना चाहते हैं। अगर मुझे पिछले 5 सेकंड में यूडीपी संदेश नहीं मिला है, तो मैं इसे false वापस करना चाहता हूं।

+0

FYI करें, 'Timeout' एक अधिभार है कि जब समय के लिए एक वैकल्पिक नमूदार लेता है टाइमआउट "फेंकने" की बजाय होता है और 'कैच' की आवश्यकता होती है। –

+0

पाठकों को भी रुचि हो सकती है [1] (http://stackoverflow.com/q/23394441/1267663), [2] (http://stackoverflow.com/q/12786901/1267663), और [3] (http://stackoverflow.com/q/35873244/1267663)। – Whymarrh

उत्तर

3

जैसा कि Bj0 द्वारा इंगित किया गया है, BufferWithTime के साथ समाधान डेटा पॉइंट वापस नहीं लौटाएगा और डेटा पॉइंट प्राप्त करने के बाद बफर टाइमआउट रीसेट नहीं किया जाएगा।

आरएक्स एक्सटेंशन 2.0 के साथ, अपने एक नया बफर के साथ दोनों की समस्याओं को हल कर सकते हैं दोनों एक टाइमआउट और एक आकार को स्वीकार अधिभार:

static IObservable<Boolean> GettingUDPMessages(IPEndPoint localEP) 
{ 
    return BaseComms 
     .UDPBaseStringListener(localEP) 
     .Where(msg => msg.Data.Contains("running")) 
     .Buffer(TimeSpan.FromSeconds(5), 1) 
     .Select(s => s.Count > 0) 
     .DistinctUntilChanged(); 
} 
1

मैं Timeout के उपयोग से बचने का सुझाव दूंगा - यह अपवादों के साथ अपवाद और कोडिंग खराब है।

इसके अलावा, ऐसा लगता है कि आपका अवलोकन एक मूल्य के बाद बंद हो जाता है। आपको यह समझाने की आवश्यकता हो सकती है कि आप क्या व्यवहार करना चाहते हैं।

आपकी समस्या का मेरे वर्तमान समाधान है:

public IObservable<Boolean> GettingUDPMessages(IPEndPoint localEP) 
{ 
    return Observable.Create<bool>(o => 
    { 
     var subject = new AsyncSubject<bool>(); 
     return new CompositeDisposable(
      Observable.Amb(
       BaseComms 
        .UDPBaseStringListener(localEP) 
        .Where(msg => msg.Data.Contains("running")) 
        .Select(s => true), 
       Observable 
        .Timer(TimeSpan.FromMilliseconds(10.0)) 
        .Select(_ => false) 
      ).Take(1).Subscribe(subject), subject.Subscribe(o)); 
    }); 
} 

है कि मदद करता है?

+0

यह संकलित नहीं करता है। Observable.Create एक कार्रवाई वापस करने की उम्मीद कर रहा है ... – Tim

+0

'टाइमआउट' केवल डिफ़ॉल्ट रूप से अपवाद का कारण बनता है। इसमें एक अधिभार होता है जो टाइमआउट पहुंचने पर उपयोग करने के लिए एक वैकल्पिक अवलोकन लेता है, जो डिफ़ॉल्ट रूप से 'पर्यवेक्षण योग्य' के साथ डिफ़ॉल्ट ओवरलोड कॉल करता है। –

+0

@ जिम - आपको आरएक्स के एक अलग संस्करण का उपयोग करना होगा - यह v1.1.10621 के साथ ठीक है। – Enigmativity

1

आप को रोकने के लिए अनुक्रम नहीं करना चाहते हैं, बस इसे आस्थगित करें में लपेट + दोहराएँ:

Observable.Defer(() => GettingUDPMessages(endpoint) 
    .Repeat(); 
2

बफर के साथ समस्या यह है कि "समय समाप्ति" अंतराल नहीं करता हो जब आप एक मिल रीसेट है नया मूल्य, बफर विंडोज़ समय के स्लाइस (इस मामले में 5 एस) हैं जो एक-दूसरे का अनुसरण करते हैं। इसका मतलब यह है कि, जब आप अपना अंतिम मूल्य प्राप्त करते हैं, तो आपको टाइमआउट मान लगभग दोगुनी प्रतीक्षा करनी पड़ सकती है।

   should timeout here 
         v 
0s   5s   10s  15s 
|x - x - x | x - - - - | - - - x -| ... 
      true  true  true 

IObservable.Throttle, लेकिन, खुद के लिए हर बार एक नया मान में आता है और केवल एक मूल्य के बाद समयावधि बीत जाने पर (पिछले भेजे मूल्य) का उत्पादन रीसेट करता है: यह भी समय समाप्ति को याद कर सकते हैं। इस धारा में "समय समाप्ति" मूल्यों डालने के लिए एक समय समाप्ति के रूप में इस्तेमाल किया जा सकता और IObservable साथ विलय कर दिया:

var obs = BaseComms.UDPBaseStringListener(localEP) 
      .Where(msg => msg.Data.Contains("running")); 

return obs.Merge(obs.Throttle(TimeSpan.FromSeconds(5)) 
         .Select(x => false)) 
      .DistinctUntilChanged(); 

एक काम कर LINQPad उदाहरण:

var sub = new Subject<int>(); 

var script = sub.Timestamp() 
    .Merge(sub.Throttle(TimeSpan.FromSeconds(2)).Select(i => -1).Timestamp()) 
    .Subscribe(x => 
{ 
    x.Dump("val"); 
}); 


Thread.Sleep(1000); 

sub.OnNext(1); 
sub.OnNext(2); 

Thread.Sleep(10000); 

sub.OnNext(5); 

ए -1 के बाद धारा में डाला जाता है एक 2 एस टाइमआउट।