2012-06-01 12 views
8

पृष्ठभूमि: मैं एक python मॉड्यूल एक स्ट्रीमिंग एपीआई से JSON ऑब्जेक्ट आकर्षित करने और उन्हें pymongo का उपयोग कर MongoDB में (एक समय में 25 के थोक डालने) स्टोर करने के लिए की स्थापना की है। तुलना के लिए, मेरे पास एक ही स्ट्रीमिंग एपीआई और pipe से mongoimport तक curl पर बैश कमांड भी है। इन दोनों दृष्टिकोण अलग संग्रह में डेटा स्टोर करते हैं।अनुकूलन: मोंगो करने के लिए एक स्ट्रीमिंग एपीआई से JSON डम्पिंग

समय-समय पर, मैं संग्रह के count() की निगरानी करता हूं ताकि यह जांच सके कि वे कैसे किराया करते हैं।

अब तक, python मॉड्यूल curl | mongoimport दृष्टिकोण के पीछे लगभग 1000 JSON ऑब्जेक्ट्स द्वारा मॉडलिंग को देख रहा है।

समस्या: मैं कैसे curl | mongoimport के साथ सिंक में ~ होने के लिए मेरी python मॉड्यूल अनुकूलन कर सकते हैं?

मैं tweetstream का उपयोग नहीं कर सकता क्योंकि मैं ट्विटर एपीआई का उपयोग नहीं कर रहा हूं लेकिन एक तृतीय पक्ष स्ट्रीमिंग सेवा का उपयोग नहीं कर रहा हूं।

क्या कोई मेरी मदद कर सकता है?

Python मॉड्यूल:


class StreamReader: 
    def __init__(self): 
     try: 
      self.buff = "" 
      self.tweet = "" 
      self.chunk_count = 0 
      self.tweet_list = [] 
      self.string_buffer = cStringIO.StringIO() 
      self.mongo = pymongo.Connection(DB_HOST) 
      self.db = self.mongo[DB_NAME] 
      self.raw_tweets = self.db["raw_tweets_gnip"] 
      self.conn = pycurl.Curl() 
      self.conn.setopt(pycurl.ENCODING, 'gzip') 
      self.conn.setopt(pycurl.URL, STREAM_URL) 
      self.conn.setopt(pycurl.USERPWD, AUTH) 
      self.conn.setopt(pycurl.WRITEFUNCTION, self.handle_data) 
      self.conn.perform() 
     except Exception as ex: 
      print "error ocurred : %s" % str(ex) 

    def handle_data(self, data): 
     try: 
      self.string_buffer = cStringIO.StringIO(data) 
      for line in self.string_buffer: 
       try: 
        self.tweet = json.loads(line) 
       except Exception as json_ex: 
        print "JSON Exception occurred: %s" % str(json_ex) 
        continue 

       if self.tweet: 
        try: 
         self.tweet_list.append(self.tweet) 
         self.chunk_count += 1 
         if self.chunk_count % 1000 == 0 
          self.raw_tweets.insert(self.tweet_list) 
          self.chunk_count = 0 
          self.tweet_list = [] 

        except Exception as insert_ex: 
         print "Error inserting tweet: %s" % str(insert_ex) 
         continue 
     except Exception as ex: 
      print "Exception occurred: %s" % str(ex) 
      print repr(self.buff) 

    def __del__(self): 
     self.string_buffer.close() 

पढ़ने के लिए धन्यवाद।

+0

क्या आपके द्वारा डालने वाले दस्तावेज़ों में "_id" फ़ील्ड है? –

+0

@AyayaKamsky हाँ, वे करते हैं। –

+0

मोंगो का कौन सा संस्करण और पिमोंगो का कौन सा संस्करण आप उपयोग कर रहे हैं? –

उत्तर

1

गया StringIO पुस्तकालय से छुटकारा।WRITEFUNCTION कॉलबैक handle_data के रूप में, इस मामले में, प्रत्येक पंक्ति के लिए बुलाया जाता है, बस JSON सीधे लोड करें। कभी-कभी, डेटा में निहित दो JSON ऑब्जेक्ट्स हो सकते हैं। मुझे खेद है, मैं curl कमांड पोस्ट नहीं कर सकता जो मैं उपयोग करता हूं क्योंकि इसमें हमारे प्रमाण-पत्र शामिल हैं। लेकिन, जैसा कि मैंने कहा, यह किसी भी स्ट्रीमिंग एपीआई पर लागू एक सामान्य मुद्दा है।


def handle_data(self, buf): 
    try: 
     self.tweet = json.loads(buf) 
    except Exception as json_ex: 
     self.data_list = buf.split('\r\n') 
     for data in self.data_list: 
      self.tweet_list.append(json.loads(data))  
3

मूल रूप से आपके कोड में एक बग था।

   if self.chunk_count % 50 == 0 
        self.raw_tweets.insert(self.tweet_list) 
        self.chunk_count = 0 

आप chunk_count को रीसेट करते हैं लेकिन आप tweet_list को रीसेट नहीं करते हैं। तो आप के माध्यम से दूसरी बार 100 आइटम डालने का प्रयास करें (50 नए प्लस 50 जो पहले से ही पहले डीबी को भेजे गए थे)। आपने इसे ठीक कर दिया है, लेकिन फिर भी प्रदर्शन में एक अंतर दिखाई देता है।

पूरे बैच आकार की चीज़ लाल हेरिंग बन जाती है। मैंने जेसन की एक बड़ी फाइल का उपयोग करने की कोशिश की और इसे पायथन बनाम लोड करके इसे लोड किया। इसे मोंगोइम्पोर्ट और पायथन के माध्यम से लोड करना हमेशा तेज था (सुरक्षित मोड में भी - नीचे देखें)।

अपने कोड पर नज़र डालने पर, मुझे एहसास हुआ कि समस्या इस तथ्य के साथ है कि स्ट्रीमिंग एपीआई वास्तव में आपको भागों में डेटा सौंप रही है। आपसे उन हिस्सों को लेने और उन्हें डेटाबेस में रखने की उम्मीद है (यही वह है जो मोंगोइम्पोर्ट कर रहा है)। स्ट्रीम को विभाजित करने के लिए आपका अजगर क्या काम कर रहा है, इसे एक सूची में जोड़ें और फिर समय-समय पर मोंगो को बैच भेजें, जो मैं देखता हूं और जो आप देखते हैं उसके बीच अंतर है।

अपने handle_data() के लिए इस स्निपेट का प्रयास करें नोट करने के लिए

def handle_data(self, data): 
    try: 
     string_buffer = StringIO(data) 
     tweets = json.load(string_buffer) 
    except Exception as ex: 
     print "Exception occurred: %s" % str(ex) 
    try: 
     self.raw_tweets.insert(tweets) 
    except Exception as ex: 
     print "Exception occurred: %s" % str(ex) 

एक बात यह है कि अपने python inserts are not running in "safe mode" - आप बदलना चाहिए कि आपके डालने बयान के एक तर्क safe=True जोड़कर। इसके बाद आपको विफल होने वाले किसी भी सम्मिलन पर अपवाद मिलेगा और आपकी कोशिश/पकड़ समस्या को उजागर करने में त्रुटि को प्रिंट करेगा।

यह प्रदर्शन में ज्यादा खर्च नहीं करना पड़ता या तो - मैं वर्तमान में एक परीक्षण चल रहा हूँ और लगभग पांच मिनट के बाद, दो संग्रह के आकार हैं 14120 14113.

+0

बीटीडब्ल्यू, मैंने आपके कोड की कोशिश की - फिक्स के साथ, पाइथन मोन्गिमपोर्ट के रूप में डेटा डालने के बारे में दोगुनी है। ऐसा इसलिए है क्योंकि डिफ़ॉल्ट रूप से "सुरक्षित" आवेषण बंद हैं। सुरक्षित मोड़कर लिखना (सुरक्षित पास करना = डालने के लिए सही) पाइथन आवेषण अभी भी लगभग 75% मोंगोइम्पोर्ट समय थे। –

+0

उनको इंगित करने के लिए धन्यवाद! मैंने आवश्यक परिवर्तन किए हैं (ऊपर कोड भी अपडेट किया गया है): self.chunk_count = 0 के बाद "self.tweet_list = []" जोड़ा गया और बैच आकार को 1000 तक बढ़ा दिया। यह अभी भी लग रहा है - पायथन मॉड्यूल गिनती 5000 पर है जबकि कर्ल mongoimport कॉम्बो 5718 पर है। (यह 4000: 5662 था)। कोई अंतर्दृष्टि? महान टिप्पणियों के लिए –

+0

+1! –