2012-02-12 25 views
5

कुछ दिन पहले मैं पर एक प्रश्न पूछा गया है तो मेरे कई HTTP की संरचना के लिए एक प्रतिमान डिजाइन की मदद करने के बारे में अनुरोध करता हैप्रवाह मतभेद जब coroutines का उपयोग कर बनाम सूत्रण

यहाँ परिदृश्य है। मैं एक बहु उत्पादक, बहु उपभोक्ता प्रणाली चाहते हैं। मेरे निर्माता कुछ साइटों को क्रॉल और स्क्रैप करते हैं और लिंक को जोड़ते हैं जो इसे कतार में पाता है। चूंकि मैं कई साइटों को क्रॉल कर दूंगा, इसलिए मैं कई उत्पादक/क्रॉलर रखना चाहता हूं।

उपभोक्ता/कर्मचारी इस कतार को खिलाते हैं, इन लिंक के लिए टीसीपी/यूडीपी अनुरोध करते हैं और परिणाम मेरे Django डीबी में सहेजते हैं। मैं कई श्रमिकों को भी पसंद करूंगा क्योंकि प्रत्येक कतार वस्तु एक दूसरे से पूरी तरह से स्वतंत्र है।

लोगों ने सुझाव दिया कि इस यानी गेवेन्ट या इवेंटलेट के लिए कोरआउट लाइब्रेरी का उपयोग करें। कोरआउटिन के साथ कभी काम नहीं करने के बाद, मैंने पढ़ा कि प्रोग्रामिंग प्रतिमान थ्रेडेड प्रतिमानों के समान है, फिर भी केवल एक थ्रेड सक्रिय रूप से निष्पादित होता है लेकिन कॉल अवरुद्ध होने पर - जैसे I/O कॉल - स्टैक्स को स्मृति में बदल दिया जाता है और अन्य हरा थ्रेड तब तक ले जाता है जब तक कि यह किसी अवरुद्ध I/O कॉल का सामना नहीं करता है। उम्मीद है कि मुझे यह अधिकार मिला है? यहाँ मेरी अतः पदों में से एक से कोड है:

import gevent 
from gevent.queue import * 
import time 
import random 

q = JoinableQueue() 
workers = [] 
producers = [] 


def do_work(wid, value): 
    gevent.sleep(random.randint(0,2)) 
    print 'Task', value, 'done', wid 


def worker(wid): 
    while True: 
     item = q.get() 
     try: 
      print "Got item %s" % item 
      do_work(wid, item) 
     finally: 
      print "No more items" 
      q.task_done() 


def producer(): 
    while True: 
     item = random.randint(1, 11) 
     if item == 10: 
      print "Signal Received" 
      return 
     else: 
      print "Added item %s" % item 
      q.put(item) 


for i in range(4): 
    workers.append(gevent.spawn(worker, random.randint(1, 100000))) 

# This doesn't work. 
for j in range(2): 
    producers.append(gevent.spawn(producer)) 

# Uncommenting this makes this script work. 
# producer() 

q.join() 

यह अच्छा कार्य करता है क्योंकि sleep कॉल अवरुद्ध कॉल कर रहे हैं और जब एक sleep घटना होती है, एक और हरे रंग के धागे से अधिक लेता है। अनुक्रमिक निष्पादन से यह बहुत तेज़ है। जैसा कि आप देख सकते हैं, मेरे पास मेरे प्रोग्राम में कोई कोड नहीं है जो जानबूझकर एक थ्रेड के निष्पादन को दूसरे धागे में लाता है। मैं यह देखने में असफल रहा कि यह उपरोक्त परिदृश्य में कैसे फिट बैठता है क्योंकि मैं सभी धागे एक साथ निष्पादित करना चाहता हूं।

सभी ठीक काम करता है, लेकिन मैं प्रवाह है कि मैं का उपयोग कर Gevent/Eventlets मूल क्रमिक रूप से चल रहे प्रोग्राम की तुलना में अधिक है, लेकिन तेजी से क्या असली-थ्रेडिंग का प्रयोग कर प्राप्त किया जा सकता है की तुलना में कम है हासिल कर लिया है लग रहा है।

यदि मैं थ्रेडिंग तंत्र का उपयोग करके अपने कार्यक्रम को फिर से कार्यान्वित करना चाहता था, तो मेरे प्रत्येक उत्पादक और उपभोक्ता एक साथ कोरआउट के अंदर और बाहर ढेर को स्वैप करने की आवश्यकता के बिना काम कर सकते थे।

क्या इसे थ्रेडिंग का उपयोग करके फिर से कार्यान्वित किया जाना चाहिए? क्या मेरा डिजाइन गलत है? मैं coroutines का उपयोग करने के असली लाभ देखने में विफल रहा है।

शायद मेरी अवधारणाएं थोड़ा गंदे हैं लेकिन यह वही है जो मैंने समेट लिया है। मेरे प्रतिमान और अवधारणाओं की कोई भी मदद या स्पष्टीकरण बहुत अच्छा होगा।

धन्यवाद

+0

एकाधिक प्रक्रियाओं का उपयोग क्यों नहीं करते? –

+0

मैं तो मैं इसके ठीक है या नहीं अगर नहीं जानते पेशेवरों और बहु ​​सूत्रण बनाम बहु प्रसंस्करण के विपक्ष पता नहीं है। –

+1

वहाँ "वास्तविक सूत्रण" जैसी कोई चीज नहीं है वैश्विक दुभाषिया लॉक की वजह से सी एक्सटेंशन (या भारी वजन ओएस प्रक्रियाओं) का सहारा के बिना अजगर कार्यक्रमों में (केवल एक वास्तविक ओएस धागा किसी भी समय पर निष्पादित करता है)। –

उत्तर

5

आप देख सकते हैं, मैं अपने प्रोग्राम है जो जानबूझकर एक और धागा करने के लिए एक थ्रेड के निष्पादन के पैदावार में कोई कोड नहीं है। मैं परिणाम जानने के लिए ऊपर के रूप में मैं सभी धागे एक साथ क्रियान्वित करना चाहते हैं परिदृश्य में फिट बैठता है असफल।

एक ओएस थ्रेड है लेकिन कई ग्रीनलेट्स हैं। आपके मामले में gevent.sleep() श्रमिकों को समवर्ती रूप से निष्पादित करने की अनुमति देता है। ब्लॉक कर रहा है आईओ कॉल urllib2.urlopen(url).read() भी ऐसा ही जैसे यदि आप urllib2 समझौता का उपयोग करें (gevent.monkey.patch_*() फोन करके) gevent के साथ काम करने के लिए।

भी A Curious Course on Coroutines and Concurrency को समझने के लिए एक कोड एक भी लड़ी वातावरण में समवर्ती काम कर सकते हैं देखें।

gevent, थ्रेडिंग के बीच प्रवाह क्षमता मतभेद की तुलना करने के लिए, आप कोड लिख सकता है बहु है कि सभी aproaches के साथ संगत:

#!/usr/bin/env python 
concurrency_impl = 'gevent' # single process, single thread 
##concurrency_impl = 'threading' # single process, multiple threads 
##concurrency_impl = 'multiprocessing' # multiple processes 

if concurrency_impl == 'gevent': 
    import gevent.monkey; gevent.monkey.patch_all() 

import logging 
import time 
import random 
from itertools import count, islice 

info = logging.info 

if concurrency_impl in ['gevent', 'threading']: 
    from Queue import Queue as JoinableQueue 
    from threading import Thread 
if concurrency_impl == 'multiprocessing': 
    from multiprocessing import Process as Thread, JoinableQueue 

स्क्रिप्ट के बाकी सभी संगामिति कार्यान्वयन के लिए एक ही है:

def do_work(wid, value): 
    time.sleep(random.randint(0,2)) 
    info("%d Task %s done" % (wid, value)) 

def worker(wid, q): 
    while True: 
     item = q.get() 
     try: 
      info("%d Got item %s" % (wid, item)) 
      do_work(wid, item) 
     finally: 
      q.task_done() 
      info("%d Done item %s" % (wid, item)) 

def producer(pid, q): 
    for item in iter(lambda: random.randint(1, 11), 10): 
     time.sleep(.1) # simulate a green blocking call that yields control 
     info("%d Added item %s" % (pid, item)) 
     q.put(item) 
    info("%d Signal Received" % (pid,)) 

एक मॉड्यूल स्तर पर कोड निष्पादित न करें main() में रख:

def main(): 
    logging.basicConfig(level=logging.INFO, 
         format="%(asctime)s %(process)d %(message)s") 

    q = JoinableQueue() 
    it = count(1) 
    producers = [Thread(target=producer, args=(i, q)) for i in islice(it, 2)] 
    workers = [Thread(target=worker, args=(i, q)) for i in islice(it, 4)] 
    for t in producers+workers: 
     t.daemon = True 
     t.start() 

    for t in producers: t.join() # put items in the queue 
    q.join() # wait while it is empty 
    # exit main thread (daemon workers die at this point) 

if __name__=="__main__":  
    main() 
+0

हाय सेबस्टियन, मैं अपने कोड में देखा और देखा कि मेरे उत्पादकों और उपभोक्ताओं के लिए एक साथ काम कर रहे हैं है। जब मेरे हरितलेट में से एक में अवरुद्ध ऑपरेशन होता है तो यह अन्य हरितलेट पर नियंत्रण उत्पन्न करता है। मैंने लापता 'monkey_patch' कॉल जोड़ा है ताकि सॉकेट मॉड्यूल भी अवरुद्ध न हो लेकिन मुझे अपने प्रोसेसर को पर्याप्त क्रंच नहीं मिल सकता है। एक नियमित पीसी में पर्याप्त रस और अधिक ग्रीनलेट होने के लिए पर्याप्त रस होता है लेकिन मुझे पर्याप्त गति नहीं मिलती है। मैं बहुत खो गया हूं और उलझन में हूं क्योंकि यह अधिक प्रोसेसर का उपयोग क्यों नहीं करता है और तेजी से काम करता है। क्या आप कृपया मुझे समझने में मदद कर सकते हैं? मैं बहुत खो गया हूँ धन्यवाद। –

+0

@ मिरंग अग्रवाल: मैंने आपके प्रश्न में आपके द्वारा पोस्ट किए गए कोड पर टिप्पणी की है। 'निर्माता' * इसमें समवर्ती रूप से काम नहीं करते हैं। – jfs

+1

@ मिरंगंग अग्रवाल: यदि आपकी समस्या आईओ बाध्य है (डिस्क, नेटवर्क) तो इससे कोई फर्क नहीं पड़ता कि आपका सीपीयू कितना तेज़ है, अगर आप डिस्क पर केवल 50 एमबी/एस पर लिख सकते हैं तो इससे कोई फर्क नहीं पड़ता कि आपका सीपीयू कर सकता है प्रक्रिया 1 जीबी/एस। इसके अलावा आपका प्रोग्राम अन्य सीमित संसाधनों जैसे कि खुली फाइलों की संख्या का उपभोग कर सकता है। यदि आप 'gevent' का उपयोग करते हैं तो सुनिश्चित करें कि सभी अवरुद्ध कॉल "हरे" हैं, यानी वे ब्लॉक नहीं करते हैं, इसलिए आपका डेटाबेस ड्राइवर 'gevent' के साथ संगत नहीं हो सकता है। – jfs

1

gevent महान आप बहुत सारे (हरा) धागे है जब है। मैंने इसे हजारों के साथ परीक्षण किया और यह बहुत अच्छा काम किया। आपने सुनिश्चित किया है कि आप स्क्रैपिंग के लिए और डीबी को बचाने के लिए दोनों पुस्तकालयों का उपयोग हरे रंग के होते हैं। अगर वे पाइथन की सॉकेट का उपयोग करते हैं, तो गीवा इंजेक्शन को काम करना चाहिए। सी (उदा। mysqldb) में लिखे गए एक्सटेंशन हालांकि ब्लॉक करेंगे और आपको इसके बजाय हरे समकक्षों का उपयोग करने की आवश्यकता होगी।

यदि आप भूगर्भ का उपयोग करते हैं तो आप अधिकतर कतारों से दूर हो सकते हैं, प्रत्येक कार्य के लिए नए (हरे) धागे को स्प्रे कर सकते हैं, थ्रेड के लिए कोड db.save(web.get(address)) जितना आसान हो सकता है। जब डीबी या वेब ब्लॉक में कुछ लाइब्रेरी होती है तो गीवेंट प्रीपेम्प्शन का ख्याल रखेगा। यह तब तक काम करेगा जब तक आपके कार्य स्मृति में फिट हों।

0

इस मामले में, आपकी समस्या प्रोग्राम की गति (यानी gevent या threading की पसंद) के साथ नहीं है, लेकिन नेटवर्क IO थ्रूपुट। यह बाधा है (यह होना चाहिए) यह निर्धारित करता है कि कार्यक्रम कितनी तेजी से चलता है।

गीवेंट यह सुनिश्चित करने का एक अच्छा तरीका है कि बाधा है, न कि आपके प्रोग्राम की वास्तुकला।

इस प्रक्रिया की तरह आप चाहते हैं होता है:

import gevent 
from gevent.queue import Queue, JoinableQueue 
from gevent.monkey import patch_all 


patch_all() # Patch urllib2, etc 


def worker(work_queue, output_queue): 
    for work_unit in work_queue: 
     finished = do_work(work_unit) 
     output_queue.put(finished) 
     work_queue.task_done() 


def producer(input_queue, work_queue): 
    for url in input_queue: 
     url_list = crawl(url) 
     for work in url_list: 
      work_queue.put(work) 
     input_queue.task_done() 


def do_work(work): 
    gevent.sleep(0) # Actually proces link here 
    return work 


def crawl(url): 
    gevent.sleep(0) 
    return list(url) # Actually process url here 

input = JoinableQueue() 
work = JoinableQueue() 
output = Queue() 

workers = [gevent.spawn(worker, work, output) for i in range(0, 10)] 
producers = [gevent.spawn(producer, input, work) for i in range(0, 10)] 


list_of_urls = ['foo', 'bar'] 

for url in list_of_urls: 
    input.put(url) 

# Wait for input to finish processing 
input.join() 
print 'finished producing' 
# Wait for workers to finish processing work 
work.join() 
print 'finished working' 

# We now have output! 
print 'output:' 
for message in output: 
    print message 
# Or if you'd like, you could use the output as it comes! 

आप इनपुट और काम कतारों समाप्त होने की प्रतीक्षा की जरूरत नहीं है, मैं सिर्फ यहाँ है कि प्रदर्शन किया है।

 संबंधित मुद्दे

  • कोई संबंधित समस्या नहीं^_^