2012-02-27 31 views
14

मुझे जावा ब्लॉकिंगक्यूयू के समान डेटा संरचना में रूचि है, अपवाद के साथ कि यह कतार में ऑब्जेक्ट्स को बैच करने में सक्षम होना चाहिए। दूसरे शब्दों में, मैं निर्माता को कतार में वस्तुओं को रखने में सक्षम होना चाहता हूं, लेकिन उपभोक्ता ब्लॉक take() पर है जब तक कि कतार एक निश्चित आकार (बैच आकार) तक पहुंच न जाए।बैचिंग के साथ जावा ब्लॉकिंगक्यूयू?

फिर, एक बार कतार बैच आकार तक पहुँच गया है, निर्माता put() पर ब्लॉक चाहिए तक उपभोक्ता कतार में तत्वों (इस स्थिति में निर्माता फिर से उत्पादन शुरू कर देंगे और उपभोक्ता ब्लॉक बैच तक के सभी उपयोग किए गए फिर से पहुंचा है)।

क्या समान डेटा संरचना मौजूद है? या मुझे इसे लिखना चाहिए (जो मुझे कोई फर्क नहीं पड़ता), अगर मैं वहां कुछ भी कर रहा हूं तो मैं अपना समय बर्बाद नहीं करना चाहता हूं।


अद्यतन

शायद चीज़ें थोड़ी स्पष्ट करने के लिए:

स्थिति हमेशा इस प्रकार होगी। कतार में वस्तुओं को जोड़ने वाले कई उत्पादक हो सकते हैं, लेकिन कतार से सामान लेने वाले एक से अधिक उपभोक्ता कभी नहीं होंगे।

अब, समस्या यह है कि समानांतर और धारावाहिक में इन सेटअप में से कई हैं। दूसरे शब्दों में, निर्माता कई कतारों के लिए वस्तुओं का उत्पादन करते हैं, जबकि उपभोक्ता अपने अधिकार में उत्पादक भी हो सकते हैं। इसे उत्पादकों, उपभोक्ता-उत्पादकों और आखिरकार उपभोक्ताओं के निर्देशित ग्राफ के रूप में अधिक आसानी से सोचा जा सकता है।

कारण है कि उत्पादकों को कतार खाली होने तक ब्लॉक करना चाहिए (@ पीटर लॉरी) क्योंकि इनमें से प्रत्येक धागे में चल रहा है। यदि आप उन्हें बस अंतरिक्ष के रूप में उपलब्ध कराने के लिए छोड़ देते हैं, तो आप एक ऐसी स्थिति के साथ समाप्त हो जाएंगे जहां आपके पास बहुत सारी चीजें एक साथ कई प्रक्रियाओं को संसाधित करने की कोशिश कर रही हैं।

शायद इसे निष्पादन सेवा के साथ जोड़ना समस्या को हल कर सकता है?

उत्तर

9

मैं सुझाव दूंगा कि आप BlockingQueue.drainTo(Collection, int) का उपयोग करें। यह सुनिश्चित करने के लिए कि आप कम से कम तत्व प्राप्त करते हैं, आप इसका उपयोग() के साथ कर सकते हैं।

इस दृष्टिकोण का उपयोग करने का लाभ यह है कि आपका बैच आकार वर्कलोड के साथ गतिशील रूप से बढ़ता है और उपभोक्ता व्यस्त होने पर उत्पादक को अवरुद्ध नहीं करना पड़ता है। यानी यह विलंबता और थ्रूपुट के लिए स्वयं अनुकूलित करता है। के रूप में पूछा


वास्तव में लागू करने के लिए (जो मुझे लगता है कि एक बुरा विचार है) आप एक व्यस्त उपभोक्ता धागे से एक SynchronousQueue उपयोग कर सकते हैं।

अर्थात लेने वाली धागा जब कभी उपभोक्ता व्यस्त है एक

list.clear(); 
while(list.size() < required) list.add(queue.take()); 
// process list. 

निर्माता को ब्लॉक करेगा करता है।

+0

मैं चाहता हूं कि निर्माता व्यस्त हो, जबकि उपभोक्ता व्यस्त हो। –

+1

दिलचस्प, अधिकांश सिस्टम इससे बचने के लिए बहुत अधिक समय तक जाते हैं। ;) दूसरा सुझाव ठीक वही करेगा। यदि आप निर्माता को ब्लॉक करने के लिए चाहते हैं, तो आप एकाधिक धागे का उपयोग क्यों कर रहे हैं? "निर्माता" के लिए यह प्रोसेसर/उपभोक्ता होने के साथ आसान नहीं होगा, साथ ही आप उन्हें एक ही समय में चलाना नहीं चाहते हैं। –

+0

कृपया मेरा अपडेट देखें। डिजाइन के लिए उत्पादकों को भी ब्लॉक की आवश्यकता होती है ताकि निष्पादन धागे की संख्या कम रखा जा सके। इसके अलावा, यह उत्पादकों और उपभोक्ताओं के बीच एक निर्भरता समस्या हल करता है। –

1

ऐसा नहीं है कि मुझे पता है। अगर मैं सही ढंग से समझता हूं कि आप निर्माता को काम करना चाहते हैं (जब उपभोक्ता अवरुद्ध हो जाता है) जब तक कि यह कतार या उपभोक्ता को काम करने के लिए (उत्पादक ब्लॉक) तक भर देता है जब तक कि यह कतार को साफ़ नहीं करता है। यदि ऐसा है तो मैं सुझाव दे सकता हूं कि आपको डेटा संरचना की आवश्यकता नहीं है बल्कि एक पार्टी को अवरुद्ध करने के लिए एक तंत्र है जबकि दूसरा म्यूटेक्स फेशन में काम कर रहा है। आप इसके लिए किसी ऑब्जेक्ट को लॉक कर सकते हैं और आंतरिक रूप से लॉक को रिलीज़ करने के लिए पूर्ण या खाली का तर्क है और इसे दूसरी पार्टी में पास कर सकते हैं। तो संक्षेप में, आपको इसे स्वयं लिखना चाहिए :)

1

ऐसा लगता है कि रिंगबफर एलएमएक्स विघटन पैटर्न में कैसे काम करता है। अधिक के लिए http://code.google.com/p/disruptor/ देखें।

एक बहुत ही नाराज स्पष्टीकरण आपकी मुख्य डेटा संरचना रिंगबफर है। उत्पादक अनुक्रम में रिंग बफर में डेटा डालते हैं और उपभोक्ता उतने डेटा को खींच सकते हैं क्योंकि निर्माता ने बफर में डाल दिया है (इसलिए अनिवार्य रूप से बैचिंग)। यदि बफर भरा हुआ है, तो निर्माता तब तक अवरुद्ध हो जाता है जब तक उपभोक्ता समाप्त नहीं हो जाता है और बफर में स्लॉट को मुक्त कर देता है।

2

यहां एक त्वरित (= सरल लेकिन पूरी तरह से परीक्षण नहीं किया गया) कार्यान्वयन है जो मुझे लगता है कि आपके अनुरोधों के लिए उपयुक्त हो सकता है - यदि आपको आवश्यकता हो तो आपको पूर्ण कतार इंटरफ़ेस का समर्थन करने के लिए इसे विस्तारित करने में सक्षम होना चाहिए।

प्रदर्शन को बढ़ाने के लिए आप का उपयोग कर "सिंक्रनाइज़" कीवर्ड की जगह ReentrantLock स्विच कर सकते हैं ..

public class BatchBlockingQueue<T> { 

    private ArrayList<T> queue; 
    private Semaphore readerLock; 
    private Semaphore writerLock; 
    private int batchSize; 

    public BatchBlockingQueue(int batchSize) { 
     this.queue = new ArrayList<>(batchSize); 
     this.readerLock = new Semaphore(0); 
     this.writerLock = new Semaphore(batchSize); 
     this.batchSize = batchSize; 
    } 

    public synchronized void put(T e) throws InterruptedException { 
     writerLock.acquire(); 
     queue.add(e); 
     if (queue.size() == batchSize) { 
      readerLock.release(batchSize); 
     } 
    } 

    public synchronized T poll() throws InterruptedException { 
     readerLock.acquire(); 
     T ret = queue.remove(0); 
     if (queue.isEmpty()) { 
      writerLock.release(batchSize); 
     } 
     return ret; 
    } 

} 

आशा है कि आप इसे उपयोगी पाते।