2013-02-27 177 views
8

का उपयोग करते हुए उपप्रोसेसेस चलाने के दौरान सिस्टम त्रुटि मुझे मल्टीप्रोसेसिंग पैकेज (पाइथन 2.73 के साथ अमेज़ॅन ईसी 2 पर उबंटू 12.04 पर समानांतर में कुछ साधारण numpy- आधारित मैट्रिक्स बीजगणित गणना करने के दौरान एक सिस्टम त्रुटि (नीचे दिखाया गया) प्राप्त कर रहा है। । मेरा कोड छोटे मैट्रिक्स आकारों के लिए ठीक काम करता है लेकिन बड़ी संख्या में उपलब्ध है (उपलब्ध स्मृति के साथ)मल्टीप्रोसेसिंग

मेरे द्वारा उपयोग किए जाने वाले मैट्रिस का आकार पर्याप्त है (मेरा कोड 1000000x10 फ्लोट घने मैट्रिस के लिए ठीक है लेकिन 1000000x500 वाले के लिए क्रैश - I रास्ते में subprocesses से/इन matrices गुजर रहा हूँ)। 10 बनाम 500 रन-टाइम पैरामीटर है, बाकी सब कुछ वही रहता है (इनपुट डेटा, अन्य रन-टाइम पैरामीटर इत्यादि)

मैंने बड़े मैट्रिस के लिए python3 का उपयोग करके एक ही (पोर्ट किया गया) कोड चलाने की भी कोशिश की है subprocesses नींद/निष्क्रिय मोड (पाइथन 2.7 के रूप में दुर्घटनाग्रस्त होने के बजाय) पर जाते हैं और कार्यक्रम/subprocesses बस कुछ भी नहीं कर लटका। छोटे matrices के लिए कोड python3 के साथ ठीक चलाता है।

कोई सुझाव अत्यधिक सराहना की है (मैं विचारों से बाहर यहाँ चला रहा हूँ)

त्रुटि संदेश:

Exception in thread Thread-5: Traceback (most recent call last): 
File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner 
    self.run() File "/usr/lib/python2.7/threading.py", line 504, in run 
    self.__target(*self.__args, **self.__kwargs) File "/usr/lib/python2.7/multiprocessing/pool.py", line 319, in _handle_tasks 
    put(task) SystemError: NULL result without error in PyObject_Call 

Multiprocessing कोड मैं का उपयोग करें:

def runProcessesInParallelAndReturn(proc, listOfInputs, nParallelProcesses): 
    if len(listOfInputs) == 0: 
     return 
    # Add result queue to the list of argument tuples. 
    resultQueue = mp.Manager().Queue() 
    listOfInputsNew = [(argumentTuple, resultQueue) for argumentTuple in listOfInputs] 
    # Create and initialize the pool of workers. 
    pool = mp.Pool(processes = nParallelProcesses) 
    pool.map(proc, listOfInputsNew) 
    # Run the processes. 
    pool.close() 
    pool.join() 
    # Return the results. 
    return [resultQueue.get() for i in range(len(listOfInputs))] 

नीचे है " proc "जो प्रत्येक subprocess के लिए निष्पादित हो जाता है। असल में, यह numpy का उपयोग करके रैखिक समीकरणों की कई प्रणालियों को हल करता है (यह subprocess के अंदर आवश्यक matrices बनाता है) और परिणाम एक और मैट्रिक्स के रूप में देता है। एक बार फिर, यह एक रन-टाइम पैरामीटर के छोटे मानों के लिए ठीक काम करता है लेकिन बड़े लोगों के लिए क्रैश (या पायथन 3 में लटकता है)।

def solveForLFV(param): 
    startTime = time.time() 
    (chunkI, LFVin, XY, sumLFVinOuterProductLFVallPlusPenaltyTerm, indexByIndexPurch, outerProductChunkSize, confWeight), queue = param 
    LFoutChunkSize = XY.shape[0] 
    nLFdim = LFVin.shape[1] 
    sumLFVinOuterProductLFVpurch = np.zeros((nLFdim, nLFdim)) 
    LFVoutChunk = np.zeros((LFoutChunkSize, nLFdim)) 
    for LFVoutIndex in xrange(LFoutChunkSize): 
     LFVInIndexListPurch = indexByIndexPurch[LFVoutIndex] 
     sumLFVinOuterProductLFVpurch[:, :] = 0. 
     LFVInIndexChunkLow, LFVInIndexChunkHigh = getChunkBoundaries(len(LFVInIndexListPurch), outerProductChunkSize) 
     for LFVInIndexChunkI in xrange(len(LFVInIndexChunkLow)): 
      LFVinSlice = LFVin[LFVInIndexListPurch[LFVInIndexChunkLow[LFVInIndexChunkI] : LFVInIndexChunkHigh[LFVInIndexChunkI]], :] 
      sumLFVinOuterProductLFVpurch += sum(LFVinSlice[:, :, np.newaxis] * LFVinSlice[:, np.newaxis, :]) 
     LFVoutChunk[LFVoutIndex, :] = np.linalg.solve(confWeight * sumLFVinOuterProductLFVpurch + sumLFVinOuterProductLFVallPlusPenaltyTerm, XY[LFVoutIndex, :]) 
    queue.put((chunkI, LFVoutChunk)) 
    print 'solveForLFV: ', time.time() - startTime, 'sec' 
    sys.stdout.flush() 
+0

क्या आप प्रो फ़ंक्शन के लिए कोड साझा कर सकते हैं? – barracel

+0

बस किया। मैंने proc के तर्कों का वर्णन नहीं किया है - उनमें से कुछ matrices हैं, कुछ सूचियों की सूची हैं, और कुछ केवल फ्लोट/पूर्णांक हैं। प्रत्येक क्यूप्रोसेस से परिणामों को वापस करने के लिए 'कतार' का उपयोग किया जाता है। – Yevgeny

उत्तर

5

500,000,000 बहुत बड़ा है: यदि आप फ्लोट 64 का उपयोग कर रहे हैं, तो यह 4 बिलियन बाइट्स या लगभग 4 जीबी है। (10,000,000 फ्लोट सरणी 80 मिलियन बाइट्स, या लगभग 80 एमबी - बहुत छोटी होगी।) मुझे उम्मीद है कि समस्या को मल्टीप्रोसेसिंग के साथ कुछ पाइप पर उपप्रोसेस को भेजने के लिए सरणी को चुनने की कोशिश करनी होगी।

चूंकि आप यूनिक्स प्लेटफॉर्म पर हैं, इसलिए आप fork() (मल्टीप्रोसेसिंग के श्रमिकों को बनाने के लिए उपयोग किए गए) के स्मृति विरासत व्यवहार का शोषण करके इस व्यवहार से बच सकते हैं। टिप्पणियों द्वारा वर्णित, इस हैक के साथ मुझे बड़ी सफलता मिली है (this project से बाहर फिसल गया)।

### A helper for letting the forked processes use data without pickling. 
_data_name_cands = (
    '_data_' + ''.join(random.sample(string.ascii_lowercase, 10)) 
    for _ in itertools.count()) 
class ForkedData(object): 
    ''' 
    Class used to pass data to child processes in multiprocessing without 
    really pickling/unpickling it. Only works on POSIX. 

    Intended use: 
     - The master process makes the data somehow, and does e.g. 
      data = ForkedData(the_value) 
     - The master makes sure to keep a reference to the ForkedData object 
      until the children are all done with it, since the global reference 
      is deleted to avoid memory leaks when the ForkedData object dies. 
     - Master process constructs a multiprocessing.Pool *after* 
      the ForkedData construction, so that the forked processes 
      inherit the new global. 
     - Master calls e.g. pool.map with data as an argument. 
     - Child gets the real value through data.value, and uses it read-only. 
    ''' 
    # TODO: does data really need to be used read-only? don't think so... 
    # TODO: more flexible garbage collection options 
    def __init__(self, val): 
        g = globals() 
        self.name = next(n for n in _data_name_cands if n not in g) 
        g[self.name] = val 
        self.master_pid = os.getpid() 

    @property 
    def value(self): 
        return globals()[self.name] 

    def __del__(self): 
        if os.getpid() == self.master_pid: 
            del globals()[self.name]