मैं एक बहुत बड़े कार्य को विभाजित करने के लिए मल्टीप्रोसेसिंग मॉड्यूल का उपयोग कर रहा हूं। यह अधिकांश भाग के लिए काम करता है, लेकिन मुझे अपने डिजाइन के साथ कुछ स्पष्ट याद आना चाहिए, क्योंकि इस तरह से डेटा के संसाधित होने पर प्रभावी ढंग से यह बताने में मेरे लिए बहुत मुश्किल है।मल्टीप्रोसेसिंग - निर्माता/उपभोक्ता डिजाइन
मेरे पास दो अलग-अलग कार्य हैं जो चलते हैं; एक जो दूसरे को खिलाता है। मुझे लगता है कि यह एक निर्माता/उपभोक्ता समस्या है। मैं सभी प्रक्रियाओं के बीच साझा कतार का उपयोग करता हूं, जहां निर्माता कतार भरते हैं, और उपभोक्ता कतार से पढ़ते हैं और प्रसंस्करण करते हैं। समस्या यह है कि डेटा की एक सीमित मात्रा है, इसलिए किसी बिंदु पर सभी को यह जानने की जरूरत है कि सभी डेटा संसाधित हो गए हैं ताकि सिस्टम सुन्दरता से बंद हो सके।
मैप_एसिंक() फ़ंक्शन का उपयोग करना समझ में आता है, लेकिन चूंकि निर्माता कतार भर रहे हैं, इसलिए मैं सभी वस्तुओं को सामने नहीं जानता, इसलिए मुझे थोड़ी देर लूप में जाना है और apply_async() का उपयोग करें और यह पता लगाने का प्रयास करें कि कुछ समय के साथ सबकुछ कब किया जाता है ... बदसूरत।
मुझे लगता है कि मुझे कुछ स्पष्ट याद आ रहा है। यह बेहतर ढंग से कैसे डिजाइन किया जा सकता है?
prodcuer
class ProducerProcess(multiprocessing.Process):
def __init__(self, item, consumer_queue):
self.item = item
self.consumer_queue = consumer_queue
multiprocessing.Process.__init__(self)
def run(self):
for record in get_records_for_item(self.item): # this takes time
self.consumer_queue.put(record)
def start_producer_processes(producer_queue, consumer_queue, max_running):
running = []
while not producer_queue.empty():
running = [r for r in running if r.is_alive()]
if len(running) < max_running:
producer_item = producer_queue.get()
p = ProducerProcess(producer_item, consumer_queue)
p.start()
running.append(p)
time.sleep(1)
उपभोक्ता
def process_consumer_chunk(queue, chunksize=10000):
for i in xrange(0, chunksize):
try:
# don't wait too long for an item
# if new records don't arrive in 10 seconds, process what you have
# and let the next process pick up more items.
record = queue.get(True, 10)
except Queue.Empty:
break
do_stuff_with_record(record)
मुख्य
if __name__ == "__main__":
manager = multiprocessing.Manager()
consumer_queue = manager.Queue(1024*1024)
producer_queue = manager.Queue()
producer_items = xrange(0,10)
for item in producer_items:
producer_queue.put(item)
p = multiprocessing.Process(target=start_producer_processes, args=(producer_queue, consumer_queue, 8))
p.start()
consumer_pool = multiprocessing.Pool(processes=16, maxtasksperchild=1)
यहाँ जहां यह घटिया हो जाता है। मैं मानचित्र का उपयोग नहीं कर सकता, क्योंकि उपभोग करने की सूची एक ही समय में भर दी जा रही है। तो मुझे थोड़ी देर लूप में जाना है और टाइमआउट का पता लगाने की कोशिश करनी है। उपभोक्ता_क्यू खाली हो सकता है जबकि निर्माता अभी भी इसे भरने की कोशिश कर रहे हैं, इसलिए मैं उस पर एक खाली कतार का पता नहीं लगा सकता।
timed_out = False
timeout= 1800
while 1:
try:
result = consumer_pool.apply_async(process_consumer_chunk, (consumer_queue,), dict(chunksize=chunksize,))
if timed_out:
timed_out = False
except Queue.Empty:
if timed_out:
break
timed_out = True
time.sleep(timeout)
time.sleep(1)
consumer_queue.join()
consumer_pool.close()
consumer_pool.join()
मैंने सोचा कि शायद मैं मिल सकता है() मुख्य थ्रेड में रिकॉर्ड और में कतार में उत्तीर्ण होने की जगह उपभोक्ता में उन गुजरती हैं, लेकिन मुझे लगता है मैं एक ही समस्या के साथ इस तरह अंत। मुझे अभी भी थोड़ी देर लूप चलाना है और apply_async का उपयोग करना है() किसी भी सलाह के लिए अग्रिम धन्यवाद!
मुझे लगता है कि यह काम करेगा। धन्यवाद! मुझे यकीन नहीं है कि कैसे शामिल() आपके विवरण से काम करता है, लेकिन मुझे लगता है कि मुझे एक रास्ता मिला। मैं event_producer_process() प्रक्रिया में ईवेंट को पास करता हूं, और सेट() इसे सभी उत्पादकों ने उपभोक्ता_क्यू में जोड़ना समाप्त कर दिया है। उस बिंदु पर (मुख्य धागे में वापस) यदि उपभोक्ता_क्यू खाली हो जाता है, तो इसका मतलब है कि सब कुछ संसाधित हो गया है, इसलिए मैं सुरक्षित रूप से थोड़ी देर से बाहर निकल सकता हूं। – user1914881
उलझन में भाग लेने के लिए खेद है, जुड़ने मुख्य धागे में होगा ताकि आप अपने उत्पादकों के समाप्त होने के बाद कार्यक्रम से बाहर नहीं निकले और उपभोक्ताओं ने अभी अपना काम शुरू कर दिया था। – sean