2008-12-05 15 views
7

मैं पाइथन 2.6 में नए मल्टीप्रोसेसिंग मॉड्यूल के साथ प्रयोग कर रहा हूं। मैं अपने स्वयं के मल्टीप्रोसेसर के साथ कई प्रक्रियाएं बना रहा हूं। जॉइनबल क्यूई उदाहरण। प्रत्येक प्रक्रिया में एक या अधिक कार्यकर्ता धागे (थ्रेडिंग के सबक्लास) होते हैं जो जॉइनबल क्यूईयू उदाहरण साझा करते हैं (प्रत्येक थ्रेड के __init__ विधि के माध्यम से पारित)। ऐसा लगता है कि आम तौर पर लेकिन कभी-कभी और अप्रत्याशित रूप से काम करने के लिए निम्न त्रुटि के साथ विफल:पायथन 2.6 multiprocessing.Queue धागे के साथ संगत?

File "C:\Documents and Settings\Brian\Desktop\testscript.py", line 49, in run 
    self.queue.task_done() 
    File "C:\Python26\lib\multiprocessing\queues.py", line 293, in task_done 
    raise ValueError('task_done() called too many times') 
ValueError: task_done() called too many times 

मेरी पंक्ति मिल() और task_done() कॉल कर रहे हैं सही एक दूसरे के बाद तो वे बराबर होना चाहिए। अनजाने में यह तब होता है जब get() और task_done() के बीच किए गए काम बहुत तेज़ होते हैं। एक छोटी time.sleep(0.01) डालने से समस्या कम हो जाती है।

कोई विचार क्या चल रहा है? क्या मैं अधिक पारंपरिक (Queue.Queue) के बजाय थ्रेड के साथ एक मल्टीप्रोसेसर कतार का उपयोग कर सकता हूं?

धन्यवाद!

-brian

+0

आपके कोड का एक अंश जिसमें क्यू ऑब्जेक्ट्स शामिल हैं, मदद कर सकता है। – jfs

उत्तर

2

आपको कतार वस्तुओं को लक्ष्य के तर्क के रूप में पारित करना चाहिए। multiprocessing's documentation से

उदाहरण:

from multiprocessing import Process, Queue 

def f(q): 
    q.put([42, None, 'hello']) 

if __name__ == '__main__': 
    q = Queue() 
    p = Process(target=f, args=(q,)) 
    p.start() 
    print q.get() # prints "[42, None, 'hello']" 
    p.join() 

Queues are thread and process safe.

-1

त्वरित प्रतिक्रिया के लिए धन्यवाद। मैं multiprocessing गुजर रहा हूँ। जैसा कि आप बताते हैं प्रत्येक प्रक्रिया के लिए तर्क के रूप में queue उदाहरण। विफलता धागे में होती है। मैं थ्रेडिंग को उप-वर्गीकरण करके बना रहा हूं। प्रत्येक थ्रेड इंस्टेंस के 'init' कतार को कतारबद्ध करना और पास करना। ऐसा लगता है कि कतार में थ्रेड उप-वर्गों में जाने का स्वीकार्य तरीका है। मेरा एकमात्र विचार था कि मल्टीप्रोसेसिंग क्यूई थ्रेड के साथ संगत नहीं हो सकता है (हालांकि वे माना जाता है कि थ्रेड-सुरक्षित)।

4

मैंने अभी तक 2.6 में बहु-प्रोसेसिंग के साथ प्रयोग नहीं किया है, लेकिन मैंने पाइप्रोसेसिंग के साथ बहुत कुछ खेला (जैसा कि इसे 2.5 में कहा गया था)।

मैं देख सकता हूं कि आप क्रमशः धागे के सेट के साथ कई प्रक्रियाओं की तलाश कर रहे हैं।

जब से तुम बहु मॉड्यूल का उपयोग कर रहे हैं, मैं उपयोग बहु प्रक्रिया का सुझाव देगा और न बहु धागा दृष्टिकोण, आप गतिरोध की तरह कम समस्याओं मारा जाएगा, आदि

एक कतार वस्तु बनाएँ। http://pyprocessing.berlios.de/doc/queue-objects.html

एक बहु प्रक्रिया वातावरण बनाने के लिए पूल का उपयोग करें: http://pyprocessing.berlios.de/doc/pool-objects.html जो आपके लिए कार्यकर्ता प्रक्रियाओं का प्रबंधन करेगा। फिर आप श्रमिकों को एसिंक्रोनस/सिंक्रोनस लागू कर सकते हैं और यदि आवश्यक हो तो प्रत्येक कार्यकर्ता के लिए कॉलबैक भी जोड़ सकते हैं। लेकिन याद वापस बुलाना एक आम कोड खंड है और इसे तुरंत (के रूप में दस्तावेज में उल्लेख किया है) लौटना चाहिए

कुछ अतिरिक्त जानकारी: यदि आवश्यक कतार वस्तु का उपयोग करने का प्रबंधन करने के लिए एक प्रबंधक http://pyprocessing.berlios.de/doc/manager-objects.html पैदा करते हैं। आपको इसके लिए कतार वस्तु साझा करना होगा। लेकिन इसका फायदा यह है कि, एक बार साझा और प्रबंधित होने पर आप प्रॉक्सी ऑब्जेक्ट्स बनाकर पूरे नेटवर्क पर इस साझा कतार तक पहुंच सकते हैं। यह आपको किसी केंद्रीकृत साझा कतार वस्तु के तरीकों को (स्पष्ट रूप से) किसी भी नेटवर्क नोड पर मूल विधियों के रूप में कॉल करने में सक्षम करेगा।

यहाँ प्रलेखन

यह एक मशीन पर एक प्रबंधक सर्वर चलाने के लिए और ग्राहकों को अन्य मशीनों (यह मानते हुए कि फायरवॉल शामिल इसे अनुमति देने के) से इसका इस्तेमाल करना संभव है से एक कोड उदाहरण है।

>>> from processing.managers import BaseManager, CreatorMethod 
>>> class QueueManager(BaseManager): 
...  get_proxy = CreatorMethod(typeid='get_proxy') 
... 
>>> m = QueueManager.from_address(address=('foo.bar.org', 50000), authkey='none') 
>>> queue = m.get_proxy() 
>>> queue.put('hello') 

आप सुरक्षित थ्रेड पर जोर देते हैं: इस प्रकार

>>> from processing.managers import BaseManager, CreatorMethod 
>>> import Queue 
>>> queue = Queue.Queue() 
>>> class QueueManager(BaseManager): 
...  get_proxy = CreatorMethod(callable=lambda:queue, typeid='get_proxy') 
... 
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey='none') 
>>> m.serve_forever() 

एक ग्राहक सर्वर का उपयोग कर सकते हैं: निम्न कमांड चल रहा है एक साझा कतार के लिए एक सर्वर जो दूरस्थ क्लाइंट प्रयोग कर सकते हैं सामान, पीईपी 371 (मल्टीप्रोसेसिंग) संदर्भ http://code.google.com/p/python-safethread/