2012-04-17 17 views
21

क्या प्रत्येक कार्यकर्ता को एक पाइथन मल्टीप्रोसेसिंग पूल में एक अद्वितीय आईडी असाइन करने का कोई तरीका है जिससे कि पूल में किसी विशेष कार्यकर्ता द्वारा संचालित नौकरी पता चल सके कि कौन सा कार्यकर्ता चल रहा है यह? डॉक्स के अनुसार, एक Process एक name लेकिनपायथन मल्टीप्रोसेसिंग पूल में श्रमिक के लिए एक अद्वितीय आईडी प्राप्त करें

नाम पहचान प्रयोजनों के लिए ही इस्तेमाल किया एक स्ट्रिंग है। इसमें अर्थशास्त्र नहीं है। एकाधिक प्रक्रियाओं को एक ही नाम दिया जा सकता है।

मेरे विशेष उपयोग-मामले के लिए, मैं चार जीपीयू के समूह पर नौकरियों का एक समूह चलाने के लिए चाहता हूं, और जीपीयू के लिए डिवाइस नंबर सेट करने की आवश्यकता है कि नौकरी चलनी चाहिए। चूंकि नौकरियां गैर-वर्दी लंबाई की हैं, इसलिए मैं यह सुनिश्चित करना चाहता हूं कि पिछले एक पूर्ण होने से पहले मुझे चलाने की कोशिश कर रहे नौकरी के जीपीयू पर टक्कर न हो (इसलिए इससे आईडी को प्री-असाइन करना बंद हो जाता है समय से पहले काम की इकाई)।

+1

क्यों कुछ UUID तरह यादृच्छिक का उपयोग नहीं? –

+0

@ लूपररच - क्या आप इसका मतलब उस पर विस्तार कर सकते हैं? – JoshAdel

+1

उदाहरण के लिए '' प्रक्रिया = प्रक्रिया (लक्ष्य = foo, name = uuid.uuid4() हेक्स) '' आपकी प्रक्रियाओं के लिए अद्वितीय नाम देगा। –

उत्तर

38

ऐसा लगता है कि आप क्या चाहते हैं: multiprocessing.current_process()। उदाहरण के लिए:

import multiprocessing 

def f(x): 
    print multiprocessing.current_process() 
    return x * x 

p = multiprocessing.Pool() 
print p.map(f, range(6)) 

आउटपुट:

$ python foo.py 
<Process(PoolWorker-1, started daemon)> 
<Process(PoolWorker-2, started daemon)> 
<Process(PoolWorker-3, started daemon)> 
<Process(PoolWorker-1, started daemon)> 
<Process(PoolWorker-2, started daemon)> 
<Process(PoolWorker-4, started daemon)> 
[0, 1, 4, 9, 16, 25] 

इस प्रक्रिया वस्तु ही देता है, तो प्रक्रिया उसकी अपनी पहचान का हो सकता है। आप एक अद्वितीय संख्यात्मक आईडी के लिए id पर भी कॉल कर सकते हैं - cpython में, यह प्रक्रिया ऑब्जेक्ट का स्मृति पता है, इसलिए नहीं लगता है ओवरलैप की कोई संभावना है। अंत में, आप ident या pid प्रक्रिया की संपत्ति का उपयोग कर सकते हैं - लेकिन यह प्रक्रिया शुरू होने के बाद ही सेट हो जाती है।

इसके अलावा, स्रोत को देखते हुए, मुझे लगता है कि ऑटोगनेरेटेड नाम (जैसा कि Process ऊपर के पहले स्ट्रिंग्स में पहले मान द्वारा उदाहरण दिया गया है) अद्वितीय हैं। multiprocessing प्रत्येक प्रक्रिया के लिए itertools.counter ऑब्जेक्ट रखता है, जिसका उपयोग किसी भी बच्चे की प्रक्रियाओं के लिए _identity ट्यूपल उत्पन्न करने के लिए किया जाता है। तो शीर्ष-स्तरीय प्रक्रिया एकल प्रक्रिया आईडी के साथ बाल प्रक्रिया का उत्पादन करती है, और वे दो-मूल्य आईडी के साथ प्रक्रिया को जन्म देती हैं, और इसी तरह। फिर, यदि Process कन्स्ट्रक्टर को कोई नाम पास नहीं किया गया है, तो यह ':'.join(...) का उपयोग करके, _identity के आधार पर बस autogenerates the name है। Poolalters the namereplace का उपयोग कर प्रक्रिया के, स्वत: उत्पन्न आईडी को छोड़कर।

इस सब का नतीजा है कि हालांकि दो Process तों एक ही नाम हो सकता है, क्योंकि आप जब आपने उन्हें बनाया उन्हें एक ही नाम असाइन कर सकते हैं, वे अद्वितीय हैं यदि आप नाम को स्पर्श नहीं करते है पैरामीटर। इसके अलावा, आप सैद्धांतिक रूप से एक अद्वितीय पहचानकर्ता के रूप में _identity का उपयोग कर सकते हैं; लेकिन मैं इकट्ठा करता हूं कि उन्होंने एक कारण के लिए उस परिवर्तनीय निजी बनाया!

कार्रवाई में ऊपर का एक उदाहरण:

import multiprocessing 

def f(x): 
    created = multiprocessing.Process() 
    current = multiprocessing.current_process() 
    print 'running:', current.name, current._identity 
    print 'created:', created.name, created._identity 
    return x * x 

p = multiprocessing.Pool() 
print p.map(f, range(6)) 

आउटपुट:

$ python foo.py 
running: PoolWorker-1 (1,) 
created: Process-1:1 (1, 1) 
running: PoolWorker-2 (2,) 
created: Process-2:1 (2, 1) 
running: PoolWorker-3 (3,) 
created: Process-3:1 (3, 1) 
running: PoolWorker-1 (1,) 
created: Process-1:2 (1, 2) 
running: PoolWorker-2 (2,) 
created: Process-2:2 (2, 2) 
running: PoolWorker-4 (4,) 
created: Process-4:1 (4, 1) 
[0, 1, 4, 9, 16, 25] 
1

आप multiprocessing.Queue का उपयोग आईडी स्टोर करने के लिए और फिर पूल प्रक्रिया के प्रारंभ में आईडी प्राप्त कर सकते हैं।

लाभ:

  • आप internals पर भरोसा करने की जरूरत नहीं है।
  • यदि आपका उपयोग केस संसाधन/उपकरणों का प्रबंधन करना है तो आप सीधे डिवाइस नंबर डाल सकते हैं। इससे यह भी सुनिश्चित होगा कि किसी डिवाइस का दो बार उपयोग नहीं किया जाता है: यदि आपके डिवाइस में डिवाइस की तुलना में आपके पास अधिक प्रक्रियाएं हैं, तो अतिरिक्त प्रक्रिया queue.get() पर अवरुद्ध हो जाएगी और कोई काम नहीं करेगा (यह आपके पोर्ग्राम को अवरुद्ध नहीं करेगा, या कम से कम यह नहीं था जब मैंने परीक्षण किया)।

नुकसान:

  • आप अतिरिक्त संचार भूमि के ऊपर और पूल को उत्पन्न करने प्रक्रियाओं एक छोटा सा समय लेता है: बिना उदाहरण में sleep(1) सब काम दूसरों के रूप में, पहली प्रक्रिया द्वारा किया जा सकता है अभी तक प्रारंभ नहीं किया जा रहा है।
  • आप एक वैश्विक (इसके चारों ओर एक तरह से या कम से कम मैं नहीं जानता कि ) की जरूरत है

उदाहरण:

import multiprocessing 
from time import sleep 

def init(queue): 
    global idx 
    idx = queue.get() 

def f(x): 
    global idx 
    process = multiprocessing.current_process() 
    sleep(1) 
    return (idx, process.pid, x * x) 

ids = [0, 1, 2, 3] 
manager = multiprocessing.Manager() 
idQueue = manager.Queue() 

for i in ids: 
    idQueue.put(i) 

p = multiprocessing.Pool(8, init, (idQueue,)) 
print(p.map(f, range(8))) 

आउटपुट:

[(0, 8289, 0), (1, 8290, 1), (2, 8294, 4), (3, 8291, 9), (0, 8289, 16), (1, 8290, 25), (2, 8294, 36), (3, 8291, 49)] 

ध्यान दें, कि केवल 4 अलग-अलग पिड हैं, हालांकि पूल में 8 प्रक्रियाएं हैं और एक आईडीएक्स केवल एक प्रक्रिया द्वारा उपयोग किया जाता है।

0

मैंने इसे थ्रेडिंग के साथ किया और नौकरी प्रबंधन को संभालने के लिए a queue का उपयोग करके समाप्त कर दिया। आधार रेखा यहाँ है। मेरे पूर्ण संस्करण में try-catches का एक समूह है (विशेष रूप से कार्यकर्ता में, यह सुनिश्चित करने के लिए कि q.task_done() विफलता पर भी बुलाया जाता है)।

from threading import Thread 
from queue import Queue 
import time 
import random 


def run(idx, *args): 
    time.sleep(random.random() * 1) 
    print idx, ':', args 


def run_jobs(jobs, workers=1): 
    q = Queue() 
    def worker(idx): 
     while True: 
      args = q.get() 
      run(idx, *args) 
      q.task_done() 

    for job in jobs: 
     q.put(job) 

    for i in range(0, workers): 
     t = Thread(target=worker, args=[i]) 
     t.daemon = True 
     t.start() 

    q.join() 


if __name__ == "__main__": 
    run_jobs([('job', i) for i in range(0,10)], workers=5) 

मैं (मेरे कार्यकर्ता हैं सिर्फ एक बाहरी प्रक्रिया फोन करने के लिए) बहु उपयोग करने की आवश्यकता नहीं किया था, लेकिन इस बढ़ाया जा सकता है। बहु के लिए एपीआई यह एक स्पर्श बदलता है, यहाँ आप कैसे अनुकूलित कर सकते हैं:

from multiprocessing import Process, Queue 
from Queue import Empty 
import time 
import random 

def run(idx, *args): 
    time.sleep(random.random() * i) 
    print idx, ':', args 


def run_jobs(jobs, workers=1): 
    q = Queue() 
    def worker(idx): 
     try: 
      while True: 
       args = q.get(timeout=1) 
       run(idx, *args) 
     except Empty: 
      return 

    for job in jobs: 
     q.put(job) 

    processes = [] 
    for i in range(0, workers): 
     p = Process(target=worker, args=[i]) 
     p.daemon = True 
     p.start() 
     processes.append(p) 

    for p in processes: 
     p.join() 


if __name__ == "__main__": 
    run_jobs([('job', i) for i in range(0,10)], workers=5) 

दोनों संस्करणों उत्पादन कुछ की तरह होगा:

0 : ('job', 0) 
1 : ('job', 2) 
1 : ('job', 6) 
3 : ('job', 3) 
0 : ('job', 5) 
1 : ('job', 7) 
2 : ('job', 1) 
4 : ('job', 4) 
3 : ('job', 8) 
0 : ('job', 9)