2012-02-09 19 views
6

मैं concurrent.futures.ProcessPoolExecutor का उपयोग करके कुछ कोड समानांतर करने की कोशिश कर रहा हूं लेकिन ThreadPoolExecutor के साथ होने वाली अजीब डेडलॉक्स नहीं रखी है। एक न्यूनतम उदाहरण:समवर्ती.फ्यूचर कोड में डेडलॉक

from concurrent import futures 

def test(): 
    pass 

with futures.ProcessPoolExecutor(4) as executor: 
    for i in range(100): 
     print('submitting {}'.format(i)) 
     executor.submit(test) 

अजगर 3.2.2 में (64-बिट Ubuntu पर), यह सब कार्य सबमिट करने के बाद लगातार लटका लगता है - और ऐसा करने के लिए जब भी प्रस्तुत नौकरियों की संख्या से अधिक है लगता है श्रमिकों की संख्या अगर मैं ProcessPoolExecutor को ThreadPoolExecutor के साथ प्रतिस्थापित करता हूं तो यह बेकार ढंग से काम करता है।

की जांच के लिए एक प्रयास के रूप में, मैं प्रत्येक भविष्य एक कॉलबैक दिया i का मूल्य मुद्रित करने के लिए:

from concurrent import futures 

def test(): 
    pass 

with futures.ProcessPoolExecutor(4) as executor: 
    for i in range(100): 
     print('submitting {}'.format(i)) 
     future = executor.submit(test) 

     def callback(f): 
      print('callback {}'.format(i)) 
     future.add_done_callback(callback) 

यह सिर्फ मुझे और भी अधिक भ्रमित - callback द्वारा मुद्रित i के मूल्य पर मूल्य है जिस समय इसे परिभाषित किया गया था, उसके बजाए इसे कॉल किया जाता है (इसलिए मैं कभी भी callback 0 नहीं देखता लेकिन मुझे callback 99 एस बहुत मिलता है)। फिर, ThreadPoolExecutor अपेक्षित मूल्य प्रिंट करता है।

आश्चर्य है कि यह एक बग हो सकता है, मैंने पाइथन के हाल के विकास संस्करण की कोशिश की। अब, कोड कम से कम समाप्त होता प्रतीत होता है, लेकिन मुझे अभी भी i मुद्रित आउट का गलत मान मिलता है।

तो किसी को समझा सकता है:

  • क्या अजगर 3.2 और वर्तमान देव संस्करण है कि जाहिरा तौर पर इस गतिरोध

  • तय क्यों 'गलत' i का मूल्य मुद्रित किया जा रहा है के बीच में ProcessPoolExecutor हुआ

संपादित करें: जैसा कि जुकिविज़ ने नीचे बताया है, निश्चित रूप से प्रिंटिंग i प्रिंट करेगा उस कॉलबैक को कॉल करने के समय मूल्य, मुझे नहीं पता कि मैं क्या सोच रहा था ... अगर मैं i के मूल्य के साथ एक कॉल करने योग्य ऑब्जेक्ट पास करता हूं, तो इसकी अपेक्षाओं में से एक के रूप में काम करता है।

संपादित करें: थोड़ा और अधिक जानकारी: कॉलबैक के सभी, क्रियान्वित कर रहे हैं तो ऐसा लगता है कि यह executor.shutdown (executor.__exit__ द्वारा कहा जाता है) कि बताने के लिए है कि प्रक्रियाओं को पूरा कर लिया असमर्थ है। ऐसा लगता है कि वर्तमान पायथन 3.3 में पूरी तरह से तय किया गया है, लेकिन multiprocessing और concurrent.futures में बहुत सारे बदलाव हुए हैं, इसलिए मुझे नहीं पता कि यह क्या तय करता है। चूंकि मैं 3.3 का उपयोग नहीं कर सकता (यह रिलीज या डिस्प्ले के देव संस्करणों के साथ संगत प्रतीत नहीं होता है), मैंने बस अपने 3.2 इंस्टॉलेशन में अपने मल्टीप्रोसेसिंग और समवर्ती पैकेजों की प्रतिलिपि बनाने की कोशिश की, जो ठीक काम करने लगता है। फिर भी, यह थोड़ा अजीब लगता है कि - जहां तक ​​मैं देख सकता हूं - ProcessPoolExecutor नवीनतम रिलीज संस्करण में पूरी तरह से टूटा हुआ है लेकिन कोई भी प्रभावित नहीं हुआ है।

+1

दूसरे के रूप में, यह प्राकृतिक है कि प्रिंट '99' को संसाधित करता है। प्रतीक 'i' वैश्विक संदर्भ से बंधे हैं, और नई प्रक्रियाएं बनाना महंगा है, इसलिए जब तक आप कुछ भी निष्पादित करते हैं,' i == 99'। – julkiewicz

+1

इसके अलावा, मेरे पास उबंटू 64-बिट, पायथन 3.2.2 है और पहला कोड स्निपेट लटका नहीं है ... – julkiewicz

+0

@julkiewicz: यह बहुत अजीब है। मैंने अभी 64-बिट वैज्ञानिक लिनक्स और पायथन 3.2.2 चलाने वाली एक अलग मशीन पर इसे आजमाया है, और 10 में से 10 प्रयासों पर '99 सबमिट करने' प्रिंट करने के बाद इसे रोक दिया गया। मैंने कोड को 'if __name__ ==' में लपेटने का भी प्रयास किया। __main __ 'जैसा कि मैंने सुना है कि विंडोज पर मल्टीप्रोसेसिंग के लिए जरूरी है। – James

उत्तर

2

मैंने कोड को निम्नानुसार संशोधित किया, जिससे दोनों समस्याओं का हल हो गया। callback फ़ंक्शन को बंद करने के रूप में परिभाषित किया गया था, इस प्रकार हर बार i के अद्यतन मूल्य का उपयोग करेगा। डेडलॉक के रूप में, सभी कार्य पूरा होने से पहले निष्पादक को बंद करने का एक कारण होने की संभावना है। फ्यूचर्स को पूरा करने की प्रतीक्षा करना भी हल करता है।

from concurrent import futures 

def test(i): 
    return i 

def callback(f): 
    print('callback {}'.format(f.result())) 


with futures.ProcessPoolExecutor(4) as executor: 
    fs = [] 
    for i in range(100): 
     print('submitting {}'.format(i)) 
     future = executor.submit(test, i) 
     future.add_done_callback(callback) 
     fs.append(future) 

    for _ in futures.as_completed(fs): pass 

अद्यतन: ओह, क्षमा करें, मैंने आपके अपडेट नहीं पढ़े हैं, ऐसा लगता है कि यह पहले ही हल हो चुका है।