2011-01-17 4 views
9

मैं कुछ नेट के लिए प्रतिक्रियाशील एक्सटेंशन का उपयोग कर कोड को फिर से लिखने की कोशिश कर रहा हूँ, लेकिन मैं अपने लक्ष्य को प्राप्त करने पर कुछ मार्गदर्शन की जरूरत है पर ब्लॉक करने के लिए Rx का उपयोग करना।(और संभवतः समय समाप्त) एक अतुल्यकालिक आपरेशन

मैं एक वर्ग है कि एक निम्न स्तर पुस्तकालय में कुछ अतुल्यकालिक व्यवहार समाहित है। ऐसा कुछ सोचें जो नेटवर्क को पढ़ता या लिखता है। जब कक्षा शुरू होती है तो यह पर्यावरण से जुड़ने की कोशिश करेगी और जब सफल हो तो यह एक कार्यकर्ता थ्रेड से कॉल करके इसे वापस संकेत देगा।

मैं एक तुल्यकालिक कॉल में इस अतुल्यकालिक व्यवहार चालू करना चाहते हैं और मुझे लगता है कि कैसे प्राप्त किया जा सकता पर नीचे एक बहुत सरल उदाहरण बनाया है:

ManualResetEvent readyEvent = new ManualResetEvent(false); 

public void Start(TimeSpan timeout) { 
    // Simulate a background process 
    ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1))); 
    // Wait for startup to complete. 
    if (!this.readyEvent.WaitOne(timeout)) 
    throw new TimeoutException(); 
} 

void AsyncStart(TimeSpan delay) { 
    Thread.Sleep(delay); // Simulate startup delay. 
    this.readyEvent.Set(); 
} 

एक कार्यकर्ता धागे पर AsyncStart चल रहा है बस एक तरह से अनुकरण करने के लिए है लाइब्रेरी का असीमित व्यवहार और मेरे वास्तविक कोड का हिस्सा नहीं है जहां निम्न स्तर की लाइब्रेरी थ्रेड की आपूर्ति करती है और कॉलबैक पर अपना कोड कॉल करती है।

ध्यान दें कि Start विधि TimeoutException फेंक देगी यदि स्टार्ट टाइमआउट अंतराल के भीतर पूरा नहीं हुआ है।

मैं इस कोड को आरएक्स का उपयोग करने के लिए फिर से लिखना चाहता हूं। यहां मेरा पहला प्रयास है:

Subject<Unit> readySubject = new Subject<Unit>(); 

public void Start(TimeSpan timeout) { 
    ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1))); 
    // Point A - see below 
    this.readySubject.Timeout(timeout).First(); 
} 

void AsyncStart(TimeSpan delay) { 
    Thread.Sleep(delay); 
    this.readySubject.OnNext(new Unit()); 
} 

यह एक सभ्य प्रयास है लेकिन दुर्भाग्य से इसमें दौड़ की स्थिति शामिल है। यदि स्टार्टअप तेज़ पूरा करता है (उदाहरण के लिए यदि delay 0 है) और यदि बिंदु ए पर कोई अतिरिक्त विलंब होता है तो OnNextreadySubject पर First से पहले निष्पादित किया जाएगा। सार IObservable में मैं आवेदन करने कर रहा हूँ Timeout और First देखता है कभी नहीं है कि स्टार्टअप पूरा कर लिया है और एक TimeoutException बजाय फेंक दिया जाएगा।

ऐसा लगता है कि Observable.Defer इस तरह की समस्याओं को संभालने के लिए बनाया गया है।

Subject<Unit> readySubject = new Subject<Unit>(); 

void Start(TimeSpan timeout) { 
    var ready = Observable.Defer(() => { 
    ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1))); 
    // Point B - see below 
    return this.readySubject.AsObservable(); 
    }); 
    ready.Timeout(timeout).First(); 
} 

void AsyncStart(TimeSpan delay) { 
    Thread.Sleep(delay); 
    this.readySubject.OnNext(new Unit()); 
} 

अब अतुल्यकालिक आपरेशन तुरंत लेकिन केवल तभी जब IObservable किया जा रहा है शुरू नहीं कर रहा है: यहाँ आरएक्स उपयोग करने के लिए थोड़ा और अधिक जटिल प्रयास है। दुर्भाग्य से वहाँ अभी भी एक रेस स्थिति लेकिन बिंदु बी पर इस बार अतुल्यकालिक आपरेशन कॉल OnNext शुरू कर दिया इससे पहले कि Defer लैम्ब्डा रिटर्न यह अभी भी खो दिया है और एक TimeoutExceptionTimeout द्वारा फेंक दिया जाएगा तो है।

मुझे पता है कि मैं Replay जैसे ऑपरेटर का उपयोग बफर घटनाओं के लिए कर सकता हूं लेकिन आरएक्स के बिना मेरा प्रारंभिक उदाहरण किसी भी तरह के बफरिंग का उपयोग नहीं करता है। क्या रेस की स्थिति के बिना मेरी समस्या को हल करने के लिए आरएक्स का उपयोग करने का कोई तरीका है? संक्षेप में IObservable के बाद एसिंक्रोनस ऑपरेशन शुरू करने के बाद इस मामले में Timeout और First से कनेक्ट किया गया है?


पॉल बेट्स का जवाब यहाँ के आधार पर समाधान काम कर रहा है:

void Start(TimeSpan timeout) { 
    var readySubject = new AsyncSubject<Unit>(); 
    ThreadPool.QueueUserWorkItem(_ => AsyncStart(readySubject, TimeSpan.FromSeconds(1))); 
    // Point C - see below 
    readySubject.Timeout(timeout).First(); 
} 

void AsyncStart(ISubject<Unit> readySubject, TimeSpan delay) { 
    Thread.Sleep(delay); 
    readySubject.OnNext(new Unit()); 
    readySubject.OnCompleted(); 
} 

जब वहाँ बिंदु सेल्सियस पर एक देरी समय AsyncStart पूरा करने के लिए ले जाता है से अधिक लंबी है कि दिलचस्प हिस्सा है। AsyncSubject भेजे गए अंतिम अधिसूचना को बरकरार रखता है और Timeout और First अभी भी अपेक्षित प्रदर्शन करेगा।

उत्तर

12

तो, आरएक्स के बारे में जानने के लिए एक बात मुझे लगता है कि बहुत से लोग पहले (खुद को शामिल करते हैं!): यदि आप किसी पारंपरिक थ्रेडिंग फ़ंक्शन का उपयोग कर रहे हैं जैसे ResetEvents, Thread.Sleeps, या जो भी हो, आप कर रहे हैं यह गलत (टीएम) - यह LINQ में Arrays को चीजों को कास्टिंग करने जैसा है क्योंकि आप जानते हैं कि अंतर्निहित प्रकार एक सरणी होता है।

यह जानना महत्वपूर्ण बात यह है कि एक async func को उस फ़ंक्शन द्वारा दर्शाया जाता है जो IObservable<TResult> देता है - यह जादू सॉस है जो आपको कुछ पूरा होने पर संकेत देता है। तो यहाँ कैसे आप "आरएक्स-ify" चाहते हैं एक और अधिक परंपरागत async समारोह, जैसा कि एक Silverlight वेब सेवा में देखते हैं क्या है:

IObservable<byte[]> readFromNetwork() 
{ 
    var ret = new AsyncSubject(); 
    // Here's a traditional async function that you provide a callback to 
    asyncReaderFunc(theFile, buffer => { 
     ret.OnNext(buffer); 
     ret.OnCompleted(); 
    }); 

    return ret; 
} 

यह एक सभ्य प्रयास है लेकिन दुर्भाग्य से यह एक रेस स्थिति शामिल है।

यह जहां AsyncSubject आता है - यह सुनिश्चित करें कि भले ही asyncReaderFunc धड़कता पंच के लिए सदस्यता लें बनाता है, AsyncSubject अभी भी "रिप्ले" होगा कि क्या हुआ।

तो, अब है कि हम अपने समारोह मिल गया है, हम दिलचस्प बातें की बहुत सारी इसे करने के लिए कर सकते हैं:

// Make it into a sync function 
byte[] results = readFromNetwork().First(); 

// Keep reading blocks one at a time until we run out 
readFromNetwork().Repeat().TakeUntil(x => x == null || x.Length == 0).Subscribe(bytes => { 
    Console.WriteLine("Read {0} bytes in chunk", bytes.Length); 
}) 

// Read the entire stream and get notified when the whole deal is finished 
readFromNetwork() 
    .Repeat().TakeUntil(x => x == null || x.Length == 0) 
    .Aggregate(new MemoryStream(), (ms, bytes) => ms.Write(bytes)) 
    .Subscribe(ms => { 
     Console.WriteLine("Got {0} bytes in total", ms.ToArray().Length); 
    }); 

// Or just get the entire thing as a MemoryStream and wait for it 
var memoryStream = readFromNetwork() 
    .Repeat().TakeUntil(x => x == null || x.Length == 0) 
    .Aggregate(new MemoryStream(), (ms, bytes) => ms.Write(bytes)) 
    .First(); 
4

मैं आगे जोड़ने WaitHandles के पॉल की टिप्पणी को जोड़ना होगा मतलब है कि आप गलत कर रहे हैं, कि विषयों का उपयोग सीधे आमतौर पर इसका मतलब है कि आप इसे भी गलत कर रहे हैं। ;-)

अनुक्रम या पाइपलाइनों के साथ काम कर रहे अपने आरएक्स कोड पर विचार करने का प्रयास करें। विषय पढ़ने और लिखने की क्षमताओं की पेशकश करते हैं जिसका मतलब है कि अब आप पाइपलाइन या अनुक्रम के साथ काम नहीं कर रहे हैं (जब तक आपके पास पाइपलाइन नहीं है जो दोनों तरीकों या अनुक्रमों को पीछे ला सकते हैं?!?)

तो पहले पॉल का कोड बहुत अच्छा है, लेकिन चलो "आरएक्स नरक से बाहर"।

AsyncStart विधि यह इस

IObservable<Unit> AsyncStart(TimeSpan delay) 
{ 
    Observable.Timer(delay).Select(_=>Unit.Default); 
} 

तो आसान करने के लिए परिवर्तन! देखो कोई विषय और डेटा केवल एक ही रास्ता बहता है। यहां महत्वपूर्ण बात हस्ताक्षर परिवर्तन है। यह हमें सामान धक्का देगा। यह अब बहुत स्पष्ट है। मेरे विषय में गुजरना बहुत संदिग्ध है।

दूसरा। अब हमें स्टार्ट विधि में परिभाषित विषय की आवश्यकता नहीं है। हम पुरानी स्कूल ThreadPool.QueueUserWorkItem की बजाय शेड्यूलर सुविधाओं का भी लाभ उठा सकते हैं। > IsReady - -> शुरू

इसके बजाय प्रारंभ की -> AsyncStart -> शुरू

तो

void Start(TimeSpan timeout) 
{ 
    var isReady = AsyncStart(TimeSpan.FromSeconds(1)) 
        .SubscribeOn(Scheduler.ThreadPool) 
        .PublishLast(); 
    isReady.Connect(); 
    isReady.Timeout(timeout).First(); 
} 

अब हम

AsyncStart एक स्पष्ट पाइपलाइन या घटनाओं के अनुक्रम है मुझे आपकी समस्या की अधिक जगह पता था, मुझे यकीन है कि हम ऐसा करने के एक बेहतर तरीके से आ सकते हैं जिसके लिए प्रारंभ विधि की अवरुद्ध प्रकृति की आवश्यकता नहीं है। जितना अधिक आप आरएक्स का उपयोग करेंगे उतना अधिक आप पाएंगे कि जब आपको अवरुद्ध करने की आवश्यकता होती है, तो वाईथेंडल्स का उपयोग करें, आदि को खिड़की से बाहर निकाल दिया जा सकता है।

+0

उत्तर देने के लिए धन्यवाद। आरएक्स जारी होने से पहले सवाल पोस्ट किया गया था और तब से एपीआई सतह बहुत बदल गई है।मेरी विशेष समस्या के संबंध में मुझे डर है कि मैं खिड़की से अवरुद्ध नहीं कर सकता। जिस एपीआई का उपयोग कर रहा था, उसे एक बहु स्तरीय प्रारंभिकता की आवश्यकता थी, लेकिन आरएक्स का उपयोग करने से मुझे इनमें से कुछ को समानांतर करने की अनुमति मिली। –

+0

मुझे लगता है कि आरएक्स वास्तव में मल्टी स्टेज प्रारंभ करने के लिए भी उपयोगी है, हालांकि मुझे यह भी लगता है कि मुझे अभी भी ब्लॉक करने की आवश्यकता नहीं है। प्रायः आपके पास एक भी घटना हो सकती है जो उठाया जाता है जब सभी भागों तैयार होते हैं कि सिस्टम तैयार है। इसके बाद आप बुलाए जाने से पहले अन्यथा ब्लॉक करने के लिए उस विकल्प को चुनने के लिए प्रभावी रूप से एक चुनिंदा (कई) कर सकते हैं। –

+0

ओह पीएस, मैंने जो कोड प्रदान किया है वह पिछले 3 वर्षों में आरएक्स एपीआईएस पर काम करेगा (कुछ भिन्नता में)। –