2011-05-28 18 views
11

मैं एफ # संगणना अभिव्यक्ति वाक्य रचना के भीतर रिएक्टिव एक्सटेंशन का उपयोग करने के आरएक्स बिल्डर साथ आने के लिए कोशिश कर रहा हूँ। मैं इसे कैसे ठीक कर सकता हूं ताकि यह ढेर को उड़ाए? नीचे सेक उदाहरण की तरह। और प्रतिक्रियाशील एक्सटेंशन का हिस्सा है या .NET फ्रेमवर्क के भविष्य के संस्करणों के हिस्से के रूप RxBuilder के एक कार्यान्वयन प्रदान करने के लिए कोई योजना है?स्टैक ओवरफ़्लो अपवाद को ठीक करने के लिए मैं आरएक्स बिल्डर कार्यान्वयन को कैसे बदलूं?

open System 
open System.Linq 
open System.Reactive.Linq 

type rxBuilder() =  
    member this.Delay f = Observable.Defer f 
    member this.Combine (xs: IObservable<_>, ys : IObservable<_>) = 
     Observable.merge xs ys  
    member this.Yield x = Observable.Return x 
    member this.YieldFrom (xs:IObservable<_>) = xs 

let rx = rxBuilder() 

let rec f x = seq { yield x 
        yield! f (x + 1) } 

let rec g x = rx { yield x 
        yield! g (x + 1) } 


//do f 5 |> Seq.iter (printfn "%A") 

do g 5 |> Observable.subscribe (printfn "%A") |> ignore 

do System.Console.ReadLine() |> ignore 
+0

यह आपकी रूचि रख सकता है: http://feedproxy.google.com/~r/FCode/~3/rDQbQHOROOw/reactive-extensions-v1010425.html –

+0

क्या आपने नवीनतम आरएक्स बिल्ड में 'विस्तार' ऑपरेटर देखा है ? यह कुछ रिकर्सन परिदृश्य को संभालता है। –

+0

आप पर्यवेक्षक कैसे लागू करेंगे। इस समस्या का विस्तार करें? क्या आपके पास विस्तार के अर्थशास्त्र को समझाते हुए कोई लिंक है? – Holoed

उत्तर

9

एक संक्षिप्त उत्तर है कि आरएक्स फ्रेमवर्क इस तरह एक पुनरावर्ती पद्धति का उपयोग कर पैदा observables का समर्थन नहीं करता है, इसलिए यह आसानी से नहीं किया जा सकता। Combine ऑपरेशन जिसे F # अनुक्रमों के लिए उपयोग किया जाता है, कुछ विशेष हैंडलिंग की आवश्यकता होती है जो अवलोकन योग्य नहीं प्रदान करते हैं। आरएक्स फ्रेमवर्क शायद उम्मीद है कि आप Observable.Generate का उपयोग कर observables उत्पन्न करेंगे और फिर उन्हें कार्रवाई करने के लिए LINQ प्रश्नों के लिए/एफ # गणना बिल्डर का उपयोग करें।

वैसे भी, यहाँ कुछ विचार कर रहे हैं -

सबसे पहले, आप Observable.Concat साथ Observable.merge को बदलने के लिए की जरूरत है। पहले एक, समानांतर में दोनों observables चलाता है, जबकि दूसरा पहले पैदावार पहले नमूदार से सभी मूल्यों और फिर दूसरी नमूदार से मूल्यों को पैदा करता है। इस परिवर्तन के बाद, स्निपेट कम से कम ~ 800 संख्याओं को स्टैक ओवरफ़्लो से पहले प्रिंट करेगा।

स्टैक ओवरफ़्लो का कारण यह है कि Concat एक अवलोकन योग्य बनाता है जो Concat पर कॉल करता है ताकि Concat आदि को अन्य अवलोकन योग्य बनाया जा सके। इसे हल करने का एक तरीका कुछ सिंक्रनाइज़ेशन जोड़ना है। आप Windows फ़ॉर्म का उपयोग करके कर रहे हैं, तो आप Delay संशोधित कर सकते हैं इतना है कि यह कार्यक्रम जीयूआई धागा (जो वर्तमान ढेर को छोड़ देता है) पर नमूदार।

type RxBuilder() = 
    member this.Delay f = 
     let sync = System.Threading.SynchronizationContext.Current 
     let res = Observable.Defer f 
     { new IObservable<_> with 
      member x.Subscribe(a) = 
      sync.Post((fun _ -> res.Subscribe(a) |> ignore), null) 
      // Note: This is wrong, but we cannot easily get the IDisposable here 
      null } 
    member this.Combine (xs, ys) = Observable.Concat(xs, ys) 
    member this.Yield x = Observable.Return x 
    member this.YieldFrom (xs:IObservable<_>) = xs 

ठीक से इसे लागू करने के, आप अपने खुद Concat विधि है, जो काफी जटिल है लिखने के लिए होता है: यहाँ एक स्केच है। विचार यह होगा कि:

  • Concat कुछ विशेष प्रकार उदा। IConcatenatedObservable
  • जब विधि रिकर्सिवली कहा जाता है आप IConcatenatedObservable संदर्भ एक दूसरे को
  • Concat विधि इस श्रृंखला के लिए दिखेगा और वहाँ उदाहरण हैं जब की एक श्रृंखला पैदा हो जाएगी तीन वस्तुओं, यह मध्य एक को छोड़ देगा (हमेशा 2 की लंबाई की श्रृंखला रखने के लिए)।

यह स्टैक ओवरफ्लो उत्तर के लिए थोड़ा जटिल है, लेकिन यह आरएक्स टीम के लिए उपयोगी प्रतिक्रिया हो सकती है।

+2

धन्यवाद। बहुत बढ़िया जवाब। मुझे उम्मीद है कि आरएक्स टीम सुन रही है :) – Holoed

+0

क्या आप पुरानी अवलोकनों को छोड़ने और निपटाने के लिए 'TakeUntil' का उपयोग कर सकते हैं? बेशक, आपको पिछले अवलोकन के पकड़ को पकड़ने के लिए कुछ रास्ता चाहिए। –

2

अगर हम इस गणना अभिव्यक्ति (उर्फ इकाई) से वाक्यात्मक चीनी को दूर हम होगा:

let rec g x = Observable.Defer (fun() -> Observable.merge(Observable.Return x, g (x + 1)) 

या सी # में:

public static IObservable<int> g(int x) 
{ 
    return Observable.Defer<int>(() => 
    { 
     return Observable.Merge(Observable.Return(x), g(x + 1));      
    }); 
} 

कौन सा निश्चित रूप से पूंछ पुनरावर्ती नहीं है। मुझे लगता है कि अगर आप इसे पूंछ पुनरावर्ती तो यह शायद आपकी समस्या का समाधान होगा

3

क्या कुछ इस तरह के बारे में कर सकते हैं?

type rxBuilder() =  
    member this.Delay (f : unit -> 'a IObservable) = 
       { new IObservable<_> with 
        member this.Subscribe obv = (f()).Subscribe obv } 
    member this.Combine (xs:'a IObservable, ys: 'a IObservable) = 
       { new IObservable<_> with 
        member this.Subscribe obv = xs.Subscribe obv ; 
               ys.Subscribe obv } 
    member this.Yield x = Observable.Return x 
    member this.YieldFrom xs = xs 

let rx = rxBuilder() 

let rec f x = rx { yield x 
        yield! f (x + 1) } 

do f 5 |> Observable.subscribe (fun x -> Console.WriteLine x) |> ignore 

do System.Console.ReadLine() |> ignore 

http://rxbuilder.codeplex.com/

XS डिस्पोजेबल ऊपर तार नहीं है (RxBuilder के साथ प्रयोग करने के उद्देश्य के लिए बनाई गई)। जैसे ही मैं डिस्पोजेबल को तार करने की कोशिश करता हूं, यह स्टैक को उड़ाने के लिए वापस चला जाता है।

4

यह Rx 2.0 Beta में तय किया गया है। और यहां एक test है।

9

ध्यान दें कि यह आरएक्स v2.0 (जैसा कि पहले से उल्लेख किया गया है) में तय किया गया है, आमतौर पर सभी अनुक्रमक ऑपरेटरों (कंसैट, कैच, ऑनरर रेस्यूमनेक्स्ट) के साथ-साथ अनिवार्य ऑपरेटरों (यदि, इत्यादि) के लिए भी।)।

असल में, आप ऑपरेटर के इस वर्ग के बारे में सोच सकते हैं कि एक टर्मिनल पर्यवेक्षक संदेश में एक और अनुक्रम की सदस्यता लेना (उदाहरण के लिए कॉन्सैट वर्तमान अनुक्रमित संदेश प्राप्त करने के बाद अगले अनुक्रम में सदस्यता लेता है), जहां पूंछ रिकर्सन समानता है अंदर आता है

आरएक्स v2.0 में, सभी पूंछ-पुनरावर्ती सदस्यता एक समय में एक प्रोसेसिंग के लिए एक कतार जैसी डेटा संरचना में चपेट में आती है, जो डाउनस्ट्रीम पर्यवेक्षक से बात करती है। यह लगातार अनुक्रम सदस्यता के लिए एक-दूसरे से बात करने वाले पर्यवेक्षकों की असीमित वृद्धि से बचाता है।