2012-06-20 10 views
5

मैं एकल io_service (HTTP Server 3 example) का उपयोग करके थ्रेड पूल के साथ boost :: asio सर्वर को लागू करने जा रहा हूं। io_service डोमेन सॉकेट को यूनिक्स करने के लिए बाध्य होगा और इस सॉकेट पर विभिन्न धागे से कनेक्शन से जाने वाले अनुरोध पास होंगे। संसाधन खपत को कम करने के लिए मैं धागा पूल गतिशील बनाना चाहता हूं।बूस्ट में गतिशील थ्रेड पूल का उदाहरण :: asio

यहां एक अवधारणा है। सबसे पहले एक धागा बनाया जाता है। जब कोई अनुरोध आता है और सर्वर देखता है कि पूल में कोई निष्क्रिय धागा नहीं है तो यह एक नया धागा बनाता है और अनुरोध को पास करता है। सर्वर कुछ अधिकतम धागे तक बना सकता है। आदर्श रूप में इसमें कुछ समय के लिए निष्क्रिय होने वाले धागे को निलंबित करने की functinality होना चाहिए।

क्या कोई ऐसा कुछ करता है? या शायद किसी के पास एक प्रासंगिक उदाहरण है?

मेरे लिए, मुझे लगता है कि मुझे इसे प्राप्त करने के लिए किसी भी तरह io_service.dispatch ओवरराइड करना चाहिए।

उत्तर

5

प्रारंभिक दृष्टिकोण के साथ कुछ चुनौतियों हो सकता है:

  • boost::asio::io_service से ली गई या reimplemented जा करने के लिए इरादा नहीं है। आभासी कार्यों की कमी पर ध्यान दें।
  • यदि आपकी थ्रेड लाइब्रेरी थ्रेड के राज्य से पूछताछ करने की क्षमता प्रदान नहीं करती है, तो राज्य की जानकारी को अलग से प्रबंधित करने की आवश्यकता है।

एक वैकल्पिक समाधान io_service में नौकरी पोस्ट करना है, तो जांचें कि यह io_service में कितनी देर तक बैठा था। यदि समय-समय पर चलने के दौरान डेल्टा समय था और जब यह वास्तव में दौड़ता था तो निश्चित सीमा से ऊपर होता है, तो यह इंगित करता है कि कतार में सेवा करने वाले धागे की तुलना में कतार में और अधिक नौकरियां हैं। इसका एक बड़ा लाभ यह है कि गतिशील धागा पूल वृद्धि तर्क अन्य तर्क से decoupled हो जाता है।

यहां एक उदाहरण है जो deadline_timer का उपयोग कर इसे पूरा करता है।

  • deadline_timer सेट करें 3 सेकंड से समाप्त होने के लिए।
  • असीमित रूप से deadline_timer पर प्रतीक्षा करें। deadline_timer सेट होने पर हैंडलर सेकेंड तैयार होगा।
  • एसिंक्रोनस हैंडलर में, टाइमर को समाप्त होने के समय के संबंध में वर्तमान समय की जांच करें। यदि यह 2 सेकंड से अधिक है, तो io_service कतार का बैक अप ले रहा है, इसलिए थ्रेड पूल में थ्रेड जोड़ें।

उदाहरण:

#include <boost/asio.hpp> 
#include <boost/bind.hpp> 
#include <boost/thread.hpp> 
#include <iostream> 

class thread_pool_checker 
    : private boost::noncopyable 
{ 
public: 

    thread_pool_checker(boost::asio::io_service& io_service, 
         boost::thread_group& threads, 
         unsigned int max_threads, 
         long threshold_seconds, 
         long periodic_seconds) 
    : io_service_(io_service), 
     timer_(io_service), 
     threads_(threads), 
     max_threads_(max_threads), 
     threshold_seconds_(threshold_seconds), 
     periodic_seconds_(periodic_seconds) 
    { 
     schedule_check(); 
    } 

private: 

    void schedule_check(); 
    void on_check(const boost::system::error_code& error); 

private: 

    boost::asio::io_service& io_service_; 
    boost::asio::deadline_timer timer_; 
    boost::thread_group&  threads_; 
    unsigned int    max_threads_; 
    long      threshold_seconds_; 
    long      periodic_seconds_; 
}; 

void thread_pool_checker::schedule_check() 
{ 
    // Thread pool is already at max size. 
    if (max_threads_ <= threads_.size()) 
    { 
    std::cout << "Thread pool has reached its max. Example will shutdown." 
       << std::endl; 
    io_service_.stop(); 
    return; 
    } 

    // Schedule check to see if pool needs to increase. 
    std::cout << "Will check if pool needs to increase in " 
      << periodic_seconds_ << " seconds." << std::endl; 
    timer_.expires_from_now(boost::posix_time::seconds(periodic_seconds_)); 
    timer_.async_wait( 
    boost::bind(&thread_pool_checker::on_check, this, 
       boost::asio::placeholders::error)); 
} 

void thread_pool_checker::on_check(const boost::system::error_code& error) 
{ 
    // On error, return early. 
    if (error) return; 

    // Check how long this job was waiting in the service queue. This 
    // returns the expiration time relative to now. Thus, if it expired 
    // 7 seconds ago, then the delta time is -7 seconds. 
    boost::posix_time::time_duration delta = timer_.expires_from_now(); 
    long wait_in_seconds = -delta.seconds(); 

    // If the time delta is greater than the threshold, then the job 
    // remained in the service queue for too long, so increase the 
    // thread pool. 
    std::cout << "Job job sat in queue for " 
      << wait_in_seconds << " seconds." << std::endl; 
    if (threshold_seconds_ < wait_in_seconds) 
    { 
    std::cout << "Increasing thread pool." << std::endl; 
    threads_.create_thread(
     boost::bind(&boost::asio::io_service::run, 
        &io_service_)); 
    } 

    // Otherwise, schedule another pool check. 
    run(); 
} 

// Busy work functions. 
void busy_work(boost::asio::io_service&, 
       unsigned int); 

void add_busy_work(boost::asio::io_service& io_service, 
        unsigned int count) 
{ 
    io_service.post(
    boost::bind(busy_work, 
       boost::ref(io_service), 
       count)); 
} 

void busy_work(boost::asio::io_service& io_service, 
       unsigned int count) 
{ 
    boost::this_thread::sleep(boost::posix_time::seconds(5)); 

    count += 1; 

    // When the count is 3, spawn additional busy work. 
    if (3 == count) 
    { 
    add_busy_work(io_service, 0); 
    } 
    add_busy_work(io_service, count); 
} 

int main() 
{ 
    using boost::asio::ip::tcp; 

    // Create io service. 
    boost::asio::io_service io_service; 

    // Add some busy work to the service. 
    add_busy_work(io_service, 0); 

    // Create thread group and thread_pool_checker. 
    boost::thread_group threads; 
    thread_pool_checker checker(io_service, threads, 
           3, // Max pool size. 
           2, // Create thread if job waits for 2 sec. 
           3); // Check if pool needs to grow every 3 sec. 

    // Start running the io service. 
    io_service.run(); 

    threads.join_all(); 

    return 0; 
} 

आउटपुट:

Will check if pool needs to increase in 3 seconds. 
Job job sat in queue for 7 seconds. 
Increasing thread pool. 
Will check if pool needs to increase in 3 seconds. 
Job job sat in queue for 0 seconds. 
Will check if pool needs to increase in 3 seconds. 
Job job sat in queue for 4 seconds. 
Increasing thread pool. 
Will check if pool needs to increase in 3 seconds. 
Job job sat in queue for 0 seconds. 
Will check if pool needs to increase in 3 seconds. 
Job job sat in queue for 0 seconds. 
Will check if pool needs to increase in 3 seconds. 
Job job sat in queue for 0 seconds. 
Will check if pool needs to increase in 3 seconds. 
Job job sat in queue for 3 seconds. 
Increasing thread pool. 
Thread pool has reached its max. Example will shutdown.
+1

अगर मैं सही ढंग से समझ, busy_work कार्यों सेकंड के लिए एक कतार के साथ-साथ पूल चेकर भले ही अधिकतम धागा गिनती नहीं था में प्रतीक्षा कर सकते हैं अभी तक पहुंचे क्योंकि नए धागे समय से पहले नहीं बनाए गए हैं। इससे यह सिद्धांत शायद ही उपयोग योग्य हो जाता है क्योंकि गतिशील होने की सुविधा को प्रदर्शन को कम नहीं करना चाहिए। यह स्थिर पूल के साथ आवश्यक समय की तुलना में एक नए थ्रेड निर्माण के लिए आवश्यक समय के साथ कार्य निष्पादन को लंबे समय तक करना चाहिए। वैसे भी धन्यवाद। – boqapt

+0

@ उपयोगकर्ता 484936: आपकी समझ सही है।पूल वृद्धि होती है _after_ गिरावट का पता चला है; यह पूलिंग के लिए आसान दृष्टिकोणों में से एक है और इसे 'प्रदर्शन को कम नहीं करना चाहिए'। यदि आप धागे आवंटित करना चाहते हैं _ जब आप जानते हैं कि उनकी आवश्यकता है, तो थ्रेड स्थिति को प्रबंधित करने की आवश्यकता है, सभी धागे के लिए ओवरहेड पेश करना है, और राज्य के तर्क को पूरे कोड में बिखरे जाने की आवश्यकता हो सकती है। यदि आप थ्रेड आवंटित करना चाहते हैं, तो आप भविष्यवाणी करते हैं कि उनकी आवश्यकता होगी, फिर सेवा में नौकरी पोस्ट करने के लिए एक समर्पित धागा है, फिर प्रतिक्रिया के लिए समय प्रतीक्षा करें। –

+0

मुझे आश्चर्य है कि एक परिदृश्य में क्या होता है जहां केवल एक लंबा चलने वाला कार्य निष्पादित होता है और जब हम टाइमर आग लगते हैं तो हम पूल में थ्रेड जोड़ते हैं। यदि उस समय प्रक्रिया करने के लिए वास्तव में कोई और घटना नहीं है तो यह दृष्टिकोण मेरे लिए अक्षम है। अगर मैं गलत हूं कृपया मुझे सही। – russoue