2013-01-31 30 views
11

यहां मेरा उपयोग केस है।स्प्रिंग @ शेड्यूल्ड और @ एसिंक का उपयोग करके

एक विरासत प्रणाली डेटाबेस कतार तालिका QUEUE अद्यतन करती है।

मैं चाहता हूँ एक अनुसूचित आवर्ती काम है कि - जाँच करता कतार की सामग्री - यदि तालिका में पंक्तियों हैं यह पंक्ति ताले और कुछ काम है - कतार में पंक्ति

पिछले काम तो हटा देता है अभी भी चल रहा है, फिर काम करने के लिए एक नया धागा बनाया जाएगा। मैं समवर्ती धागे की अधिकतम संख्या को कॉन्फ़िगर करना चाहता हूं।

मैं वसंत 3 उपयोग कर रहा हूँ और मेरे वर्तमान समाधान निम्नलिखित (1 millisecond के fixedRate का उपयोग कर धागे मूल रूप से लगातार चलाने के लिए प्राप्त करने के लिए)

@Scheduled(fixedRate = 1) 
@Async 
public void doSchedule() throws InterruptedException { 
    log.debug("Start schedule"); 
    publishWorker.start(); 
    log.debug("End schedule"); 
} 

<task:executor id="workerExecutor" pool-size="4" /> 

यह 4 धागे सीधे बंद बनाई करना है और धागे कतार से वर्कलोड सही ढंग से साझा किया। हालांकि मुझे लगता है कि धागे को पूरा करने में काफी समय लगता है, तो मुझे स्मृति रिसाव मिल रही है।

java.util.concurrent.ThreadPoolExecutor @ 0xe097b8f0        |    80 | 373,410,496 |  89.74% 
|- java.util.concurrent.LinkedBlockingQueue @ 0xe097b940       |   48 | 373,410,136 |  89.74% 
| |- java.util.concurrent.LinkedBlockingQueue$Node @ 0xe25c9d68 

तो

1: मैं @Async और एक साथ @Scheduled का उपयोग कर किया जाना चाहिए?

2: यदि नहीं, तो मैं अपनी आवश्यकताओं को प्राप्त करने के लिए वसंत का उपयोग कैसे कर सकता हूं?

3: जब मैं अन्य धागे व्यस्त होते हैं तो मैं केवल नए थ्रेड कैसे बना सकता हूं?

धन्यवाद सब!

संपादित करें: मुझे लगता है नौकरियों की कतार असीम लंबा हो रही थी ... अब

<task:executor id="workerExecutor" 
    pool-size="1-4" 
    queue-capacity="10" rejection-policy="DISCARD" /> 

का उपयोग कर परिणाम के साथ वापस रिपोर्ट करेंगे

+4

यह '@ Async' बिना ठीक से काम नहीं करता है? 'अनुसूचित 'के साथ एनोटेटेड विधि को वैसे भी अतुल्यकालिक रूप से निष्पादित किया जाना चाहिए। – ach

+0

यदि आप चाहते हैं कि "धागे लगातार चलें" तो आपको वास्तव में पहले स्थान पर @ शेड्यूल का उपयोग नहीं करना चाहिए। इसका उपयोग "अनुसूचित" गतिविधियों के लिए होगा, निरंतर नहीं ... – JoeG

+0

आप publWorker.start() को बनाने पर विचार कर सकते हैं; विधि Async। –

उत्तर

0
//using a fixedRate of 1 millisecond to get the threads to run basically continuously 
@Scheduled(fixedRate = 1) 

जब आप @Scheduled का उपयोग एक नया धागा बनाया जाएगा और 1 मिलीसेकंड पर निर्दिष्ट फिक्स्डरेट पर विधि doSchedule का आह्वान करेगा। जब आप अपना ऐप चलाते हैं तो आप क्यूईईई टेबल और संभवतः एक मृत लॉक के लिए प्रतिस्पर्धा करने वाले 4 थ्रेड देख सकते हैं।

जांच करें कि थ्रेड डंप ले कर डेडलॉक है या नहीं। http://helpx.adobe.com/cq/kb/TakeThreadDump.html

@Async एनोटेशन यहां किसी भी उपयोग का नहीं होगा।

इसे लागू करने का बेहतर तरीका आपको रननेबल को लागू करके थ्रेड के रूप में कक्षा बनाने और TaskExecutor पर आवश्यक संख्या के थ्रेड के साथ कक्षा बनाने के लिए है।

Using Spring threading and TaskExecutor, how do I know when a thread is finished?

इसके अलावा अपने डिजाइन की जांच यह तुल्यकालन ठीक से निपटने जा करने के लिए प्रतीत नहीं होता। यदि पिछली नौकरी चल रही है और पंक्ति पर ताला लगा रही है, तो आपके द्वारा बनाई गई अगली नौकरी अभी भी उस पंक्ति को देखेगी और उस विशेष पंक्ति पर लॉक प्राप्त करने के लिए प्रतीक्षा करेगी।

2

आप

  1. एक सेकंड की देरी है, जो & सभी कतार रिकॉर्ड अब तक बंद नहीं किया गया था लाने लॉक हो जाएगा के साथ एक अनुसूचक चलाने की कोशिश कर सकते हैं।
  2. प्रत्येक रिकॉर्ड के लिए, एक Async विधि को कॉल करें, जो उस रिकॉर्ड को संसाधित करेगा & इसे हटाएं।
  3. निष्पादक की अस्वीकृति नीति ABORT होना चाहिए, ताकि शेड्यूलर QUEUE को अनलॉक कर सके जो अभी तक प्रसंस्करण के लिए नहीं दिए गए हैं। इस तरह शेड्यूलर अगले पंक्ति में फिर से उन QUEUEs को संसाधित करने का प्रयास कर सकता है।

बेशक, आपको परिदृश्य को संभालना होगा, जहां शेड्यूलर ने QUEUE लॉक कर दिया है, लेकिन हैंडलर ने किसी भी कारण से इसे संसाधित नहीं किया है।

छद्म कोड:

public class QueueScheduler { 
    @AutoWired 
    private QueueHandler queueHandler; 

    @Scheduled(fixedDelay = 1000) 
    public void doSchedule() throws InterruptedException { 
     log.debug("Start schedule"); 
     List<Long> queueIds = lockAndFetchAllUnlockedQueues(); 
     for (long id : queueIds) 
      queueHandler.process(id); 
     log.debug("End schedule"); 
    } 
} 

public class QueueHandler { 

    @Async 
    public void process(long queueId) { 
     // process the QUEUE & delete it from DB 
    } 
} 
<task:executor id="workerExecutor" pool-size="1-4" queue-capcity="10" 
    rejection-policy="ABORT"/>