2012-06-07 15 views
10

के बीच देरी जोड़ें मेरे पास एक ऐसा एप्लिकेशन है जो कुछ बिंदुओं पर लगभग एक ही समय में 1000 घटनाओं को बढ़ाता है। मैं क्या करना चाहता हूं कि घटनाओं को 50 वस्तुओं के टुकड़ों में बैच करना और उन्हें हर 10 सेकंड में प्रोसेस करना शुरू करना है। एक नई बैच प्रोसेसिंग शुरू करने से पहले बैच को पूरा करने की प्रतीक्षा करने की आवश्यकता नहीं है।प्रतिक्रियाशील एक्सटेंशन: बैचों में प्रक्रिया की घटनाएं + प्रत्येक बैच

उदाहरण के लिए:

10:00:00: 10000 new events received 
10:00:00: StartProcessing (events.Take(50)) 
10:00:10: StartProcessing (events.Skip(50).Take(50)) 
10:00:15: StartProcessing (events.Skip(100).Take(50)) 

कोई भी विचार कैसे इस लक्ष्य को हासिल करने के लिए? मुझे लगता है कि प्रतिक्रियाशील एक्सटेंशन जाने का रास्ता है लेकिन अन्य समाधान भी स्वीकार्य हैं।

मैं यहाँ से शुरू करने की कोशिश:

 var bufferedItems = eventAsObservable 
      .Buffer(15) 
      .Delay(TimeSpan.FromSeconds(5) 

लेकिन गौर किया है कि देरी के रूप में मैं के लिए आशा व्यक्त की और हालांकि 5 सेकंड देरी के बजाय सभी बैचों, एक साथ शुरू कर दिया काम नहीं किया।

मैंने विंडो-विधि का भी परीक्षण किया, लेकिन मुझे व्यवहार में कोई अंतर नहीं आया। मुझे लगता है कि विंडो में TimeSpan वास्तव में इसका मतलब है कि "हर घटना जो अगले 10 सेकंड में होता है ले:।

 var bufferedItems = eventAsObservable 
      .Window(TimeSpan.FromSeconds(10), 5) 
      .SelectMany(x => x) 
      .Subscribe(DoProcessing); 

मैं आरएक्स-मुख्य 2.0.20304 बीटा का उपयोग कर रहा

उत्तर

20

आप धागे सोने के लिए नहीं करना चाहते हैं, तो आप ऐसा कर सकते हैं:

var tick = Observable.Interval(TimeSpan.FromSeconds(5)); 

eventAsObservable 
.Buffer(50) 
.Zip(tick, (res, _) => res) 
.Subscribe(DoProcessing); 
+1

मेरी इच्छा है कि मैं इसे एक से अधिक बार बढ़ा सकता हूं! इसे समझने की कोशिश में बस तीन घंटे बिताए। –

1

इस प्रयास करें:

var bufferedItems = eventAsObservable 
     .Buffer(50) 
     .Do(_ => { Thread.Sleep(10000); });