2011-04-04 18 views
9

मेरे पास श्रमिकों का एक छोटा सा पूल है (4) और कार्यों की एक बड़ी सूची (5000 ~) है। मैं पूल का उपयोग कर रहा हूं और map_async() के साथ कार्यों को भेज रहा हूं। क्योंकि जो काम मैं चला रहा हूं वह काफी लंबा है, मैं 1 का एक टुकड़ा मजबूर कर रहा हूं ताकि एक लंबी प्रक्रिया कुछ छोटे लोगों को पकड़ न सके।पायथन: मैं multiprocessing.Pool में लंबित कार्यों की संख्या कैसे देख सकता हूं?

मैं क्या करना चाहता हूं समय-समय पर जांचें कि कितने कार्य सबमिट किए जाने हैं। मुझे पता है कि ज्यादातर 4 सक्रिय होंगे, मुझे चिंता है कि प्रक्रिया के लिए कितने शेष हैं।

मैंने चारों ओर गुमराह किया है और मुझे यह कोई नहीं मिल रहा है।

कुछ सरल कोड मदद करने के लिए:

import multiprocessing 
import time 

def mytask(num): 
    print('Started task, sleeping %s' % num) 
    time.sleep(num) 

pool = multiprocessing.Pool(4) 
jobs = pool.map_async(mytask, [1,2,3,4,5,3,2,3,4,5,2,3,2,3,4,5,6,4], chunksize=1) 
pool.close() 

while True: 
    if not jobs.ready(): 
     print("We're not done yet, %s tasks to go!" % <somethingtogettasks>) 
     jobs.wait(2) 
    else: 
     break 
+0

मुझे ध्यान रखना चाहिए कि मैं एक आरएचईएल -6 प्रणाली पर पायथन 2.6 का उपयोग कर रहा हूं, हालांकि मैं विभिन्न संस्करणों/प्लेटफार्मों पर उदाहरणों के लिए खुला हूं। – jkeating

+0

स्थैतिक चर जो कार्य पूर्ण होने पर घट जाता है? (और जब कार्य स्पष्ट रूप से शुरू होता है तो वृद्धि हुई)। – Enders

+0

कार्य तब तक "शुरू नहीं होते" जब तक कार्यकर्ता उन्हें नहीं ले जाता। मुझे लगता है कि अगर मैंने एक वैश्विक बनाया है जो कार्यों का आकार था, तो प्रत्येक बार कार्य शुरू होने पर इसे कम किया जाता है, लेकिन यह थोड़ा अजीब है और कुछ थ्रेड सुरक्षा विचारों की आवश्यकता होती है। – jkeating

उत्तर

6

नौकरियों की तरह दिखता है ._number_left वह है जो आप चाहते हैं। _ इंगित करता है कि यह एक आंतरिक मूल्य है जो डेवलपर्स की ओर से बदल सकता है, लेकिन यह जानकारी प्राप्त करने का एकमात्र तरीका प्रतीत होता है।

+0

आह! यह एपीआई दस्तावेज़ों में नहीं था, और मैं ipython में नौकरियों पर एक dir() करना भूल गया था। जवाब के लिए धन्यवाद! – jkeating

1

नहीं वायुरुद्ध तरह से है कि मैं के बारे में पता है, लेकिन अगर आप map_async के बजाय Pool.imap_unordered() फ़ंक्शन का उपयोग करें, आप तत्वों है कि कार्रवाई की जाती है को रोक सकता।

import multiprocessing 
import time 

process_count = 4 

def mytask(num): 
    print('Started task, sleeping %s' % num) 
    time.sleep(num) 
    # Actually, you should return the job you've created here. 
    return num 

pool = multiprocess.Pool(process_count) 
jobs = [] 
items = [1,2,3,4,5,3,2,3,4,5,2,3,2,3,4,5,6,4] 
job_count = 0 
for job in pool.imap_unordered(mytask, items): 
    jobs.append(job) 
    job_count += 1 

    incomplete = len(items) - job_count 
    unsubmitted = max(0, incomplete - process_count) 

    print "Jobs incomplete: %s. Unsubmitted: %s" % incomplete, unsubmitted 

pool.close() 

मैं process_count घटाकर रहा हूँ, क्योंकि आप काफी मान सकते हैं कि सभी प्रक्रियाओं दो अपवादों में से एक के साथ प्रसंस्करण किया जाएगा: 1) अगर आप एक iterator उपयोग करते हैं, वहाँ नहीं हो सकता है आगे आइटम उपभोग करने के लिए छोड़ दिया और प्रक्रिया , और 2) आपके पास 4 से कम आइटम शेष हो सकते हैं। मैंने पहले अपवाद के लिए कोड नहीं किया था। लेकिन यदि आपको आवश्यकता हो तो ऐसा करना बहुत आसान होना चाहिए। वैसे भी, आपका उदाहरण एक सूची का उपयोग करता है ताकि आपको वह समस्या न हो।

संपादित करें: मुझे यह भी एहसास हुआ कि आप एक लूप लूप का उपयोग कर रहे हैं, जो ऐसा लगता है कि आप समय-समय पर कुछ अद्यतन करने की कोशिश कर रहे हैं, कहें, हर आधे सेकेंड या कुछ। एक उदाहरण के रूप में मैंने जो कोड दिया वह इस तरह से नहीं करेगा। मुझे यकीन नहीं है कि यह एक समस्या है।

+0

धन्यवाद। मैंने वास्तव में imap कार्यों की खोज नहीं की थी (दस्तावेज़ थोड़ा सा थे ... terse)। आप सही हैं, हालांकि, नौकरियां चल रही हैं, जबकि मैं कुछ अन्य चीजें करना चाहता हूं, और समय-समय पर रिपोर्ट करता हूं कि कितनी नौकरियां बाकी हैं। – jkeating

1

मेरे पास समान आवश्यकताएं हैं: प्रगति ट्रैक करें, परिणामों के आधार पर अंतरिम कार्य करें, सभी प्रक्रियाओं को किसी भी मनमाने ढंग से साफ करें। मैंने इसका सामना कैसे किया है apply_async के साथ एक समय में कार्यों को भेजना। मैं क्या कर का एक भारी सरलीकृत संस्करण:

maxProcesses = 4 
q = multiprocessing.Queue() 
pool = multiprocessing.Pool() 
runlist = range(100000) 
sendcounter = 0 
donecounter = 0 
while donecounter < len(runlist): 
    if stopNowBooleanFunc(): # if for whatever reason I want to stop processing early 
     if donecounter == sendcounter: # wait til already sent tasks finish running 
      break 
    else: # don't send new tasks if it's time to stop 
     while sendcounter < len(runlist) and sendcounter - donecounter < maxProcesses: 
      pool.apply_async(mytask, (runlist[sendcounter], q)) 
      sendcounter += 1 

    while not q.empty(): # process completed results as they arrive 
     aresult = q.get() 
     processResults(aresult) 
     donecounter += 1 

नोट है कि मैं एक Queue परिणाम ing return के बजाय का उपयोग करें।