2013-02-03 73 views
22

के साथ मल्टीप्रोसेसिंग मैं कुछ भारी गणनाओं के समानांतर करने के लिए multiprocessing.Pool() का उपयोग कर रहा हूं।बड़े डेटा

लक्ष्य फ़ंक्शन बहुत सारे डेटा (एक विशाल सूची) देता है। मैं राम से बाहर चला रहा हूँ।

multiprocessing के बिना, मैं केवल yield द्वारा परिणामस्वरूप तत्वों को एक जनरेटर में बदलता हूं, जैसा कि उनकी गणना की जाती है।

मुझे लगता है कि मल्टीप्रोसेसिंग जेनरेटर का समर्थन नहीं करता है - यह पूरे आउटपुट की प्रतीक्षा करता है और इसे एक बार में देता है, है ना? कोई उपज नहीं। क्या Pool श्रमिकों को राम में संपूर्ण परिणाम सरणी के निर्माण के बिना, जैसे ही वे उपलब्ध हो जाते हैं, डेटा उत्पन्न करने का कोई तरीका है?

सरल उदाहरण:

def target_fnc(arg): 
    result = [] 
    for i in xrange(1000000): 
     result.append('dvsdbdfbngd') # <== would like to just use yield! 
    return result 

def process_args(some_args): 
    pool = Pool(16) 
    for result in pool.imap_unordered(target_fnc, some_args): 
     for element in result: 
      yield element 

यह अजगर 2.7 है।

उत्तर

15

यह एक पंक्ति के लिए एक आदर्श उपयोग के मामले की तरह लगता है: http://docs.python.org/2/library/multiprocessing.html#exchanging-objects-between-processes

सीधे शब्दों में जमा कामगारों से कतार में अपने परिणामों को खिलाने और उन्हें मास्टर में निगलना।

ध्यान दें कि आप अभी भी मेमोरी प्रेशर के मुद्दों में भाग ले सकते हैं जब तक कि आप कतार को लगभग उतनी तेज़ी से नहीं निकालते जितना कि मजदूर इसे पॉप्युलेट कर रहे हैं। आप कतार आकार (कतार में फिट होने वाली वस्तुओं की अधिकतम संख्या) को सीमित कर सकते हैं, जिस स्थिति में पूल किए गए कर्मचारी कतार में स्थान उपलब्ध होने तक queue.put कथन पर अवरुद्ध करेंगे। यह स्मृति उपयोग पर एक छत लगाएगा। लेकिन यदि आप ऐसा कर रहे हैं, तो यह पुनर्विचार करने का समय हो सकता है कि आपको पूलिंग की आवश्यकता है या/या यदि कम श्रमिकों का उपयोग करना समझ में आता है।

+1

कतार मसालेदार डेटा पास करेगी। तो डेटा -> अचार-> अनपिक-> डेटा की नई प्रति। यह कार्यक्रम को धीमा कर देगा और अधिक अतिरिक्त रैम का उपयोग करेगा। इसके बजाय साझा स्मृति का उपयोग करने पर विचार करना चाहिए। – Wang

3

यदि आपके कार्य डेटा में डेटा वापस कर सकते हैं ... क्या वे छोटे कार्यों में विभाजित हो सकते हैं, जिनमें से प्रत्येक एक सिंगल खंड देता है? जाहिर है, यह हमेशा संभव नहीं है। जब ऐसा नहीं होता है, तो आपको कुछ अन्य तंत्र का उपयोग करना होगा (जैसे Queue, जैसा कि लॉरेन अब्राम सुझाता है)। लेकिन जब यह है, तो यह संभवतः अन्य कारणों के लिए बेहतर समाधान है, साथ ही इस समस्या को हल करने के लिए भी।

आपके उदाहरण के साथ, यह निश्चित रूप से करने योग्य है। उदाहरण के लिए:

def target_fnc(arg, low, high): 
    result = [] 
    for i in xrange(low, high): 
     result.append('dvsdbdfbngd') # <== would like to just use yield! 
    return result 

def process_args(some_args): 
    pool = Pool(16) 
    pool_args = [] 
    for low in in range(0, 1000000, 10000): 
     pool_args.extend(args + [low, low+10000] for args in some_args) 
    for result in pool.imap_unordered(target_fnc, pool_args): 
     for element in result: 
      yield element 

(आप निश्चित रूप से एक नेस्टेड समझ के साथ पाश, या एक zip और flatten, अगर आप चाहें बदल सकते थे।)

तो, अगर some_args[1, 2, 3] है, तो आप 300 मिलेगा कार्य- [[1, 0, 10000], [2, 0, 10000], [3, 0, 10000], [1, 10000, 20000], …], जिनमें से प्रत्येक 1000000 के बजाय केवल 10000 तत्व लौटाता है।

3

आपके विवरण से, ऐसा लगता है कि आप डेटा को संसाधित करने में इतनी दिलचस्पी नहीं रखते हैं, जैसे कि वे लाखों तत्वों को पार करने से बचते हैं list वापस।

ऐसा करने का एक आसान तरीका है: बस डेटा को एक फ़ाइल में रखें। उदाहरण के लिए:

def target_fnc(arg): 
    fd, path = tempfile.mkstemp(text=True) 
    with os.fdopen(fd) as f: 
     for i in xrange(1000000): 
      f.write('dvsdbdfbngd\n') 
    return path 

def process_args(some_args): 
    pool = Pool(16) 
    for result in pool.imap_unordered(target_fnc, some_args): 
     with open(result) as f: 
      for element in f: 
       yield element 

जाहिर है अपने परिणामों को नई-पंक्तियों को शामिल कर सकते हैं या नहीं तार, आदि कर रहे हैं, आप एक csv फ़ाइल, एक numpy, आदि एक साधारण पाठ फ़ाइल के बजाय का उपयोग करना चाहेंगे लेकिन विचार वही है।

कहा जा रहा है, भले ही यह आसान हो, आमतौर पर डेटा को एक समय में एक संक्रम को संसाधित करने के लाभ होते हैं, इसलिए अपने कार्यों को तोड़ना या Queue (जैसा कि अन्य दो उत्तरों सुझाव) का उपयोग करना बेहतर हो सकता है, अगर डाउनसाइड्स (क्रमशः, कार्यों को तोड़ने के तरीके की आवश्यकता होती है, या जितनी जल्दी हो सके उतनी डेटा उपभोग करने में सक्षम होने के लिए) डील ब्रेकर नहीं होते हैं।