2011-11-27 32 views
6

मैं स्ट्रीम तक निम्नलिखित अवलोकन दोहरा सकता हूं। डेटा उपलब्ध है झूठी है? वर्तमान में ऐसा लगता है कि यह कभी नहीं रुकता है।फास्ट दोहराना TakeWhile अनंत लूप का कारण बनता है

AsyncReadChunk और Observable। Defer अनुभाग के अंदर वापसी ऑननेक्स्ट कॉल को फिर पूर्ण कॉल करें। जब दोहराना ऑननेक्स्ट कॉल प्राप्त करता है तो यह इसे टेकवॉइस में पास करता है। जब टेकवॉइस संतुष्ट नहीं होता है तो यह अवलोकन योग्य हो जाता है लेकिन मुझे लगता है कि ऑनकॉक्स्ट के बाद ऑनकॉप्लेट सही हो जाता है, यह अवलोकन करने के लिए पुन: सदस्यता लेने के लिए दोहराया जाता है और अनंत लूप का कारण बनता है।

मैं इस व्यवहार को कैसे ठीक कर सकता हूं?

public static IObservable<byte[]> AsyncRead(this NetworkStream stream, int bufferSize) 
{ 
    return Observable.Defer(() => 
     { 
      try 
      { 
       return stream.DataAvailable ? AsyncReadChunk(stream, bufferSize) : Observable.Return(new byte[0]); 
      } 
      catch (Exception) 
      { 
       return Observable.Return(new byte[0]); 
      } 
     }) 
     .Repeat() 
     .TakeWhile((dataChunk, index) => dataChunk.Length > 0); 
} 
+4

अच्छी तरह से आपकी समस्या को हल करने के तरीके के बारे में जानने के लिए और आपके समाधान को साझा करने के लिए धन्यवाद। हालांकि, क्या आप अपने प्रश्न को संपादित करने के बजाय अपने प्रश्न के समाधान को उत्तर के रूप में पोस्ट कर सकते हैं? –

+0

समेट, मैंने आपके स्वयं के उत्तर को प्रश्न से बाहर कर दिया है और एक अलग उत्तर में, समुदाय विकी के रूप में चिह्नित किया है। –

उत्तर

2

स्व उत्तर: (नीचे एक जवाब Samet, प्रश्न के लेखक द्वारा पोस्ट की गई है हालांकि, उन्होंने सवाल के हिस्से के रूप उत्तर पोस्ट मैं एक अलग में आगे बढ़ रहा हूँ।। जवाब है, समुदाय विकी के रूप में चिह्नित है, क्योंकि लेखक यह खुद नहीं ले जाया गया है।)


मैं पुनर्रचना कि यह शेड्यूलर साथ एक समस्या है द्वारा की खोज की। रिटर्न तत्काल शेड्यूलर का उपयोग करता है जबकि दोहराना CurrentThread का उपयोग करता है। निश्चित कोड नीचे है।

public static IObservable<byte[]> AsyncRead(this NetworkStream stream, int bufferSize) 
    { 
     return Observable.Defer(() => 
            { 
             try 
             { 
              return stream.DataAvailable ? AsyncReadChunk(stream, bufferSize) : Observable.Return(new byte[0], Scheduler.CurrentThread); 
             } 
             catch (Exception) 
             { 
              return Observable.Return(new byte[0], Scheduler.CurrentThread); 
             } 
            }) 
      .Repeat() 
      .TakeWhile((dataChunk, index) => dataChunk.Length > 0); 
    }