2010-01-17 6 views
45

मुझे मल्टीप्रोसेसिंग मॉड्यूल के साथ समस्याएं आ रही हैं। मैं कई फाइलों से डेटा लोड करने के लिए अपने मानचित्र विधि के साथ श्रमिकों का एक पूल उपयोग कर रहा हूं और उनमें से प्रत्येक के लिए मैं एक कस्टम फ़ंक्शन के साथ डेटा का विश्लेषण करता हूं। प्रत्येक बार जब एक फाइल संसाधित की जाती है, तो मैं एक काउंटर अपडेट करना चाहता हूं ताकि मैं ट्रैक कर सकूं कि कितनी फाइलें संसाधित की जा रही हैं।पायथन मल्टीप्रोसेसिंग और एक साझा काउंटर

def analyze_data(args): 
    # do something 
    counter += 1 
    print counter 


if __name__ == '__main__': 

    list_of_files = os.listdir(some_directory) 

    global counter 
    counter = 0 

    p = Pool() 
    p.map(analyze_data, list_of_files) 

मैं इस के लिए एक समाधान नहीं मिल सकता है: यहाँ नमूना कोड है।

उत्तर

48

समस्या यह है कि counter चर आपकी प्रक्रियाओं के बीच साझा नहीं किया गया है: प्रत्येक अलग प्रक्रिया इसकी स्थानीय घटना बना रही है और इसे बढ़ा रही है।

कुछ प्रक्रियाओं के लिए प्रलेखन के this section देखें जो आप अपनी प्रक्रियाओं के बीच राज्य साझा करने के लिए नियोजित कर सकते हैं। आपके मामले में आप अपने कर्मचारियों के बीच Value उदाहरण साझा करना चाहेंगे

यहां आपके उदाहरण का एक वर्किंग वर्जन है (कुछ डमी इनपुट डेटा के साथ)। नोट यह वैश्विक मूल्यों जो मैं वास्तव में व्यवहार में से बचने के लिए कोशिश करेगा उपयोग करता है: दौड़ हालत बग बिना

from multiprocessing import Pool, Value 
from time import sleep 

counter = None 

def init(args): 
    ''' store the counter for later use ''' 
    global counter 
    counter = args 

def analyze_data(args): 
    ''' increment the global counter, do something with the input ''' 
    global counter 
    # += operation is not atomic, so we need to get a lock: 
    with counter.get_lock(): 
     counter.value += 1 
    print counter.value 
    return args * 10 

if __name__ == '__main__': 
    #inputs = os.listdir(some_directory) 

    # 
    # initialize a cross-process counter and the input lists 
    # 
    counter = Value('i', 0) 
    inputs = [1, 2, 3, 4] 

    # 
    # create the pool of workers, ensuring each one receives the counter 
    # as it starts. 
    # 
    p = Pool(initializer = init, initargs = (counter,)) 
    i = p.map_async(analyze_data, inputs, chunksize = 1) 
    i.wait() 
    print i.get() 
+0

महान उत्तर! मुझे आयरनपीथन में एक ही समस्या थी, और मल्टीप्रोसेसिंग के दौरान। वैल्यू उपलब्ध नहीं है आप क्लियर के साथ कुछ ऐसा कर सकते हैं। संदर्भ और सिस्टम। थ्रेडिंग। इन्टरलाक्ड: http://stackoverflow.com/questions/2255461/how-to-atomically- increment-a-static-member-in-ironpython/2314858 # 2314858 –

+3

@jkp, वैश्विक चर के बिना आप इसे कैसे करेंगे? - मैं एक वर्ग का उपयोग करने की कोशिश कर रहा हूं, लेकिन ऐसा लगता है कि यह उतना आसान नहीं है। Http://stackoverflow.com/questions/1816958/cant-pickle-type-instancemethod-when-using-pythons-multiprocessing-pool-ma – Anna

+18

दुर्भाग्यवश, यह उदाहरण त्रुटिपूर्ण प्रतीत होता है, क्योंकि 'counter.value + = 1 'प्रक्रियाओं के बीच परमाणु नहीं है, इसलिए कुछ प्रक्रियाओं के साथ लंबे समय तक चलने पर मान गलत होगा –

24

काउंटर वर्ग: का उपयोग किए बिना

class Counter(object): 
    def __init__(self): 
     self.val = multiprocessing.Value('i', 0) 

    def increment(self, n=1): 
     with self.val.get_lock(): 
      self.val.value += n 

    @property 
    def value(self): 
     return self.val.value 
+0

इसी तरह के कोड के साथ काम करता है 'जॉबलिब 'समानांतर' (इस उत्तर में कोड 'जॉबलिब' के साथ काम नहीं करता है), https://github.com/davidheryanto/etc/blob/master/python-recipes/parallel-joblib-counter.py देखें –

0

तेज़ काउंटर वर्ग में निर्मित मूल्य के ताला दो बार

class Counter(object): 
    def __init__(self, initval=0): 
     self.val = multiprocessing.RawValue('i', initval) 
     self.lock = multiprocessing.Lock() 

    def increment(self): 
     with self.lock: 
      self.val.value += 1 

    @property 
    def value(self): 
     return self.val.value 

https://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing https://docs.python.org/2/library/multiprocessing.html#multiprocessing.sharedctypes.Value https://docs.python.org/2/library/multiprocessing.html#multiprocessing.sharedctypes.RawValue