2012-12-20 48 views
6

मैं एक बहुत बड़े कार्य को विभाजित करने के लिए मल्टीप्रोसेसिंग मॉड्यूल का उपयोग कर रहा हूं। यह अधिकांश भाग के लिए काम करता है, लेकिन मुझे अपने डिजाइन के साथ कुछ स्पष्ट याद आना चाहिए, क्योंकि इस तरह से डेटा के संसाधित होने पर प्रभावी ढंग से यह बताने में मेरे लिए बहुत मुश्किल है।मल्टीप्रोसेसिंग - निर्माता/उपभोक्ता डिजाइन

मेरे पास दो अलग-अलग कार्य हैं जो चलते हैं; एक जो दूसरे को खिलाता है। मुझे लगता है कि यह एक निर्माता/उपभोक्ता समस्या है। मैं सभी प्रक्रियाओं के बीच साझा कतार का उपयोग करता हूं, जहां निर्माता कतार भरते हैं, और उपभोक्ता कतार से पढ़ते हैं और प्रसंस्करण करते हैं। समस्या यह है कि डेटा की एक सीमित मात्रा है, इसलिए किसी बिंदु पर सभी को यह जानने की जरूरत है कि सभी डेटा संसाधित हो गए हैं ताकि सिस्टम सुन्दरता से बंद हो सके।

मैप_एसिंक() फ़ंक्शन का उपयोग करना समझ में आता है, लेकिन चूंकि निर्माता कतार भर रहे हैं, इसलिए मैं सभी वस्तुओं को सामने नहीं जानता, इसलिए मुझे थोड़ी देर लूप में जाना है और apply_async() का उपयोग करें और यह पता लगाने का प्रयास करें कि कुछ समय के साथ सबकुछ कब किया जाता है ... बदसूरत।

मुझे लगता है कि मुझे कुछ स्पष्ट याद आ रहा है। यह बेहतर ढंग से कैसे डिजाइन किया जा सकता है?

prodcuer

class ProducerProcess(multiprocessing.Process): 
    def __init__(self, item, consumer_queue): 
     self.item = item 
     self.consumer_queue = consumer_queue 
     multiprocessing.Process.__init__(self) 

    def run(self): 
     for record in get_records_for_item(self.item): # this takes time 
      self.consumer_queue.put(record) 

def start_producer_processes(producer_queue, consumer_queue, max_running): 
    running = [] 

    while not producer_queue.empty(): 
     running = [r for r in running if r.is_alive()] 
     if len(running) < max_running: 
      producer_item = producer_queue.get() 
      p = ProducerProcess(producer_item, consumer_queue) 
      p.start() 
      running.append(p) 
     time.sleep(1) 

उपभोक्ता

def process_consumer_chunk(queue, chunksize=10000): 
    for i in xrange(0, chunksize): 
     try: 
      # don't wait too long for an item 
      # if new records don't arrive in 10 seconds, process what you have 
      # and let the next process pick up more items. 

      record = queue.get(True, 10) 
     except Queue.Empty:     
      break 

     do_stuff_with_record(record) 

मुख्य

if __name__ == "__main__": 
    manager = multiprocessing.Manager() 
    consumer_queue = manager.Queue(1024*1024) 
    producer_queue = manager.Queue() 

    producer_items = xrange(0,10) 

    for item in producer_items: 
     producer_queue.put(item) 

    p = multiprocessing.Process(target=start_producer_processes, args=(producer_queue, consumer_queue, 8)) 
    p.start() 

    consumer_pool = multiprocessing.Pool(processes=16, maxtasksperchild=1) 

यहाँ जहां यह घटिया हो जाता है। मैं मानचित्र का उपयोग नहीं कर सकता, क्योंकि उपभोग करने की सूची एक ही समय में भर दी जा रही है। तो मुझे थोड़ी देर लूप में जाना है और टाइमआउट का पता लगाने की कोशिश करनी है। उपभोक्ता_क्यू खाली हो सकता है जबकि निर्माता अभी भी इसे भरने की कोशिश कर रहे हैं, इसलिए मैं उस पर एक खाली कतार का पता नहीं लगा सकता।

timed_out = False 
    timeout= 1800 
    while 1: 
     try: 
      result = consumer_pool.apply_async(process_consumer_chunk, (consumer_queue,), dict(chunksize=chunksize,)) 
      if timed_out: 
       timed_out = False 

     except Queue.Empty: 
      if timed_out: 
       break 

      timed_out = True 
      time.sleep(timeout) 
     time.sleep(1) 

    consumer_queue.join() 
    consumer_pool.close() 
    consumer_pool.join() 

मैंने सोचा कि शायद मैं मिल सकता है() मुख्य थ्रेड में रिकॉर्ड और में कतार में उत्तीर्ण होने की जगह उपभोक्ता में उन गुजरती हैं, लेकिन मुझे लगता है मैं एक ही समस्या के साथ इस तरह अंत। मुझे अभी भी थोड़ी देर लूप चलाना है और apply_async का उपयोग करना है() किसी भी सलाह के लिए अग्रिम धन्यवाद!

उत्तर

2

आप काम के अंत को सिग्नल करने के लिए manager.Event का उपयोग कर सकते हैं। यह ईवेंट आपकी सभी प्रक्रियाओं के बीच साझा किया जा सकता है और फिर जब आप इसे अपनी मुख्य प्रक्रिया से संकेत देते हैं तो अन्य कर्मचारी फिर से बंद कर सकते हैं।

while not event.is_set(): 
...rest of code... 

तो, आपके उपभोक्ता इस कार्यक्रम को सेट करने के बाद इंतजार करेंगे और सफाई के बाद सफाई को संभालेंगे।

यह ध्वज निर्धारित करने के लिए कि आप निर्माता थ्रेड पर join कर सकते हैं और जब वे सभी पूर्ण हो जाते हैं तो आप उपभोक्ता धागे पर शामिल हो सकते हैं।

+0

मुझे लगता है कि यह काम करेगा। धन्यवाद! मुझे यकीन नहीं है कि कैसे शामिल() आपके विवरण से काम करता है, लेकिन मुझे लगता है कि मुझे एक रास्ता मिला। मैं event_producer_process() प्रक्रिया में ईवेंट को पास करता हूं, और सेट() इसे सभी उत्पादकों ने उपभोक्ता_क्यू में जोड़ना समाप्त कर दिया है। उस बिंदु पर (मुख्य धागे में वापस) यदि उपभोक्ता_क्यू खाली हो जाता है, तो इसका मतलब है कि सब कुछ संसाधित हो गया है, इसलिए मैं सुरक्षित रूप से थोड़ी देर से बाहर निकल सकता हूं। – user1914881

+0

उलझन में भाग लेने के लिए खेद है, जुड़ने मुख्य धागे में होगा ताकि आप अपने उत्पादकों के समाप्त होने के बाद कार्यक्रम से बाहर नहीं निकले और उपभोक्ताओं ने अभी अपना काम शुरू कर दिया था। – sean

0

मैं मल्टीप्रोसेस/थ्रेडिंग के बजाय असतत ईवेंट सिमुलेशन के बजाय SimPy की दृढ़ता से अनुशंसा करना चाहता हूं।

 संबंधित मुद्दे

  • कोई संबंधित समस्या नहीं^_^