मैंने हाल ही में 0MQ सीखना शुरू किया। इससे पहले, मैं एक ब्लॉग, Python Multiprocessing with ZeroMQ में भाग गया। मैंने 0 एमक्यू गाइड में the ventilator pattern के बारे में बात की जो मैंने पढ़ा, इसलिए मैंने इसे आज़माने का फैसला किया।वितरित कंप्यूटिंग के लिए यह पायथन 0 एमक्यू स्क्रिप्ट एक निश्चित इनपुट आकार पर क्यों लटका है?
मूल कोड के रूप में श्रमिकों द्वारा संख्याओं के उत्पादों की गणना करने के बजाय, मैंने वेंटिलेटर को 0 एमक संदेशों के माध्यम से श्रमिकों को बड़े सरणी भेजने की कोशिश करने का निर्णय लिया। निम्नलिखित कोड है जिसका उपयोग मैं अपने "प्रयोगों" के लिए कर रहा हूं।
जैसा कि नीचे दी गई टिप्पणी में उल्लेख किया गया है, किसी भी समय मैंने परिवर्तनीय string_length को 3 एमबी से बड़े नंबर पर बढ़ाने का प्रयास किया, तो कोड लटक गया।
विशिष्ट लक्षण: कहते हैं कि हम 4MB (अर्थात 4,194,304) के लिए STRING_LENGTH निर्धारित करते हैं, तो शायद परिणाम प्रबंधक एक कार्यकर्ता से परिणाम हो जाता है, और फिर कोड सिर्फ रुक जाता है की सुविधा देता है। htop दिखाता है कि 2 कोर ज्यादा नहीं कर रहे हैं। एथेरेप नेटवर्क यातायात मॉनीटर या तो इंटरफ़ेस पर कोई ट्रैफ़िक दिखाता है।
अब तक, घंटों के चारों ओर देखने के बाद, मैं यह समझने में सक्षम नहीं हूं कि इसका क्या कारण है, और इस मुद्दे के बारे में किसी भी संकल्प के बारे में संकेत या दो की सराहना करेंगे। धन्यवाद!
मैं इंटेल कोर देय सीपीयू, 8 जीबी रैम, 80 जीबी इंटेल एक्स 25 एमजी 2 एसएसडी, पायथन 2.7.1+, libzmq1 2.1.10-1chl1 ~ natty1, पायथन-पायज़म 2.1.10- के साथ डेल नोटबुक पर उबंटू 11.04 64 बिट चला रहा हूं। 1chl1 ~ natty1
import time
import zmq
from multiprocessing import Process, cpu_count
np = cpu_count()
pool_size = np
number_of_elements = 128
# Odd, why once the slen is bumped to 3MB or above, the code hangs?
string_length = 1024 * 1024 * 3
def create_inputs(nelem, slen, pb=True):
'''
Generates an array that contains nelem fix-sized (of slen bytes)
random strings and an accompanying array of hexdigests of the
former's elements. Both are returned in a tuple.
:type nelem: int
:param nelem: The desired number of elements in the to be generated
array.
:type slen: int
:param slen: The desired number of bytes of each array element.
:type pb: bool
:param pb: If True, displays a text progress bar during input array
generation.
'''
from os import urandom
import sys
import hashlib
if pb:
if nelem <= 64:
toolbar_width = nelem
chunk_size = 1
else:
toolbar_width = 64
chunk_size = nelem // toolbar_width
description = '%d random strings of %d bytes. ' % (nelem, slen)
s = ''.join(('Generating an array of ', description, '...\n'))
sys.stdout.write(s)
# create an ASCII progress bar
sys.stdout.write("[%s]" % (" " * toolbar_width))
sys.stdout.flush()
sys.stdout.write("\b" * (toolbar_width+1))
array = list()
hash4a = list()
try:
for i in range(nelem):
e = urandom(int(slen))
array.append(e)
h = hashlib.md5()
h.update(e)
he = h.hexdigest()
hash4a.append(he)
i += 1
if pb and i and i % chunk_size == 0:
sys.stdout.write("-")
sys.stdout.flush()
if pb:
sys.stdout.write("\n")
except MemoryError:
print('Memory Error: discarding existing arrays')
array = list()
hash4a = list()
finally:
return array, hash4a
# The "ventilator" function generates an array of nelem fix-sized (of slen
# bytes long) random strings, and sends the array down a zeromq "PUSH"
# connection to be processed by listening workers, in a round robin load
# balanced fashion.
def ventilator():
# Initialize a zeromq context
context = zmq.Context()
# Set up a channel to send work
ventilator_send = context.socket(zmq.PUSH)
ventilator_send.bind("tcp://127.0.0.1:5557")
# Give everything a second to spin up and connect
time.sleep(1)
# Create the input array
nelem = number_of_elements
slen = string_length
payloads = create_inputs(nelem, slen)
# Send an array to each worker
for num in range(np):
work_message = { 'num' : payloads }
ventilator_send.send_pyobj(work_message)
time.sleep(1)
# The "worker" functions listen on a zeromq PULL connection for "work"
# (array to be processed) from the ventilator, get the length of the array
# and send the results down another zeromq PUSH connection to the results
# manager.
def worker(wrk_num):
# Initialize a zeromq context
context = zmq.Context()
# Set up a channel to receive work from the ventilator
work_receiver = context.socket(zmq.PULL)
work_receiver.connect("tcp://127.0.0.1:5557")
# Set up a channel to send result of work to the results reporter
results_sender = context.socket(zmq.PUSH)
results_sender.connect("tcp://127.0.0.1:5558")
# Set up a channel to receive control messages over
control_receiver = context.socket(zmq.SUB)
control_receiver.connect("tcp://127.0.0.1:5559")
control_receiver.setsockopt(zmq.SUBSCRIBE, "")
# Set up a poller to multiplex the work receiver and control receiver channels
poller = zmq.Poller()
poller.register(work_receiver, zmq.POLLIN)
poller.register(control_receiver, zmq.POLLIN)
# Loop and accept messages from both channels, acting accordingly
while True:
socks = dict(poller.poll())
# If the message came from work_receiver channel, get the length
# of the array and send the answer to the results reporter
if socks.get(work_receiver) == zmq.POLLIN:
#work_message = work_receiver.recv_json()
work_message = work_receiver.recv_pyobj()
length = len(work_message['num'][0])
answer_message = { 'worker' : wrk_num, 'result' : length }
results_sender.send_json(answer_message)
# If the message came over the control channel, shut down the worker.
if socks.get(control_receiver) == zmq.POLLIN:
control_message = control_receiver.recv()
if control_message == "FINISHED":
print("Worker %i received FINSHED, quitting!" % wrk_num)
break
# The "results_manager" function receives each result from multiple workers,
# and prints those results. When all results have been received, it signals
# the worker processes to shut down.
def result_manager():
# Initialize a zeromq context
context = zmq.Context()
# Set up a channel to receive results
results_receiver = context.socket(zmq.PULL)
results_receiver.bind("tcp://127.0.0.1:5558")
# Set up a channel to send control commands
control_sender = context.socket(zmq.PUB)
control_sender.bind("tcp://127.0.0.1:5559")
for task_nbr in range(np):
result_message = results_receiver.recv_json()
print "Worker %i answered: %i" % (result_message['worker'], result_message['result'])
# Signal to all workers that we are finsihed
control_sender.send("FINISHED")
time.sleep(5)
if __name__ == "__main__":
# Create a pool of workers to distribute work to
for wrk_num in range(pool_size):
Process(target=worker, args=(wrk_num,)).start()
# Fire up our result manager...
result_manager = Process(target=result_manager, args=())
result_manager.start()
# Start the ventilator!
ventilator = Process(target=ventilator, args=())
ventilator.start()
मैं और अधिक प्रयोगों किया: 64 number_of_elements उतारा और 6 को STRING_LENGTH वृद्धि हुई कोड अभी भी ठीक भाग गया। इसके ऊपर, वही लक्षण दिखाई दिया। इससे मुझे विश्वास हुआ कि पिज्जाक बाध्यकारी में कहीं भी एक समग्र संदेश आकार सीमा हो सकती है। 0 एमक्यू सी एपीआई में यह [लिंक] है (http://api.zeromq.org/2-1:zmq-msg-init-size) zmq_msg_init_size (3) फ़ंक्शन जो मुझे pyzmq के दस्तावेज़ में नहीं मिल रहा है। क्या यह कारण हो सकता है? – user183394
क्या आप एक ट्रेसबैक प्राप्त कर सकते हैं जहां यह लटक रहा है? यह आपको एक संकेत दे सकता है। –
मैंने अपने मैक लैपटॉप पर string_length = 1024 * 1024 * 4 के साथ अपना कोड आजमाया और यह ठीक काम किया, इसलिए मुझे लगता है कि इसमें किसी प्रकार की मेमोरी विवाद के साथ कुछ करना होगा। –