पहले पर एक से अधिक एसिंक्रोनस अनुरोध भेजें, मुझे संदर्भ की व्याख्या करते हैं:एक Netty ग्राहक
मैं एक ग्राहक छवियों डाउनलोड करने के लिए कई HTTP अनुरोध भेज देंगे जो बनाने के लिए मिल गया है। इन अनुरोधों को असीमित होना चाहिए क्योंकि जैसे ही कोई छवि पूरी हो जाती है, इसे कतार में जोड़ा जाएगा और फिर स्क्रीन पर प्रिंट किया जाएगा। चूंकि छवियां बड़ी हो सकती हैं और प्रतिक्रियाएं टूट जाती हैं, इसलिए मेरे हैंडलर को इसे एक बफर में जोड़ना होता है।
तो मैं नेटटी उदाहरण कोड (HTTP spoon example) का पालन करता हूं।
वर्तमान में, मेरे पास चैनल आईडी और बफर/चंक बूलियन/मेरी अंतिम वस्तु के प्रत्येक चैनल के लिए स्टोर करने के लिए तीन स्थिर मानचित्र हैं।
private static final ConcurrentHashMap<Integer, ChannelBuffer> BUFFER_MAP = new ConcurrentHashMap<Integer, ChannelBuffer>();
private static final ConcurrentHashMap<Integer, ImagePack> PACK_MAP = new ConcurrentHashMap<Integer, ImagePack>();
private static final ConcurrentHashMap<Integer, Boolean> CHUNKS_MAP = new ConcurrentHashMap<Integer, Boolean>();
उसके बाद, मैं अपना बूटस्ट्रैप क्लाइंट बना देता हूं और गिनने के लिए काउंटर लंबित अनुरोधों की संख्या डाउन करता हूं। जब प्रतिक्रिया छवि पूरी हो जाती है तो अंतिम कतार और काउंटर मेरे हैंडलर को पास कर दिया जाता है।
final ClientBootstrap bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
bootstrap.setOption("keepAlive", true);
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("reuseAddress", true);
bootstrap.setOption("connectTimeoutMillis", 30000);
final CountDownLatch latch = new CountDownLatch(downloadList.size()) {
@Override
public void countDown() {
super.countDown();
if (getCount() <= 0) {
try {
queue.put(END_OF_QUEUE);
bootstrap.releaseExternalResources();
} catch (InterruptedException ex) {
LOGGER.log(Level.WARNING, ex.getMessage(), ex);
}
}
}
};
bootstrap.getPipeline().addLast("codec", new HttpClientCodec());
bootstrap.getPipeline().addLast("handler", new TileClientHandler(queue, latch));
इसके बाद मैं प्रत्येक छवि को डाउनलोड करने के लिए एक चैनल बनाता हूं और जब चैनल कनेक्ट होता है, तो अनुरोध बनाया और भेज दिया जाएगा। मेजबान और बंदरगाह पहले ही निकाला जा चुका है।
for (final ImagePack pack : downloadList) {
final ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture cf) throws Exception {
final Channel channel = future.getChannel();
PACK_MAP.put(channel.getId(), pack);
final HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, pack.url);
request.setHeader(HttpHeaders.Names.HOST, host);
request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.BYTES);
if (channel.isWritable()) {
channel.write(request);
}
}
});
}
अब, यह मेरी ChannelHandler जो एक आंतरिक वर्ग कि विस्तार SimpleChannelUpstreamHandler
है। जब चैनल कनेक्ट होता है, BUFFER_MAP
में और CHUNKS_MAP
में एक नई प्रविष्टि बनाई गई है। BUFFER_MAP
में हैंडलर द्वारा चैनलों से छवियों को एकत्र करने और CHUNKS_MAP
में सभी छवियों को बफर का उपयोग किया जाता है जिसमें प्रतिक्रिया चंकित बूलियन होता है। जब प्रतिक्रिया पूरी हो जाती है, तो छवि InputSteam
कतार में जोड़ दी जाती है, लच गिनती है और चैनल बंद हो जाता है।
private class TileClientHandler extends SimpleChannelUpstreamHandler {
private CancellableQueue<Object> queue;
private CountDownLatch latch;
public TileClientHandler(final CancellableQueue<Object> queue, final CountDownLatch latch) {
this.queue = queue;
this.latch = latch;
}
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
if(!BUFFER_MAP.contains(ctx.getChannel().getId())){
BUFFER_MAP.put(ctx.getChannel().getId(), new DynamicChannelBuffer(50000));
}
if(!CHUNKS_MAP.contains(ctx.getChannel().getId())){
CHUNKS_MAP.put(ctx.getChannel().getId(), false);
}
}
@Override
public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e) throws Exception {
super.writeComplete(ctx, e);
if(!BUFFER_MAP.contains(ctx.getChannel().getId())){
BUFFER_MAP.put(ctx.getChannel().getId(), new DynamicChannelBuffer(50000));
}
if(!CHUNKS_MAP.contains(ctx.getChannel().getId())){
CHUNKS_MAP.put(ctx.getChannel().getId(), false);
}
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
final Integer channelID = ctx.getChannel().getId();
if (!CHUNKS_MAP.get(channelID)) {
final HttpResponse response = (HttpResponse) e.getMessage();
if (response.isChunked()) {
CHUNKS_MAP.put(channelID, true);
} else {
final ChannelBuffer content = response.getContent();
if (content.readable()) {
final ChannelBuffer buf = BUFFER_MAP.get(channelID);
buf.writeBytes(content);
BUFFER_MAP.put(channelID, buf);
messageCompleted(e);
}
}
} else {
final HttpChunk chunk = (HttpChunk) e.getMessage();
if (chunk.isLast()) {
CHUNKS_MAP.put(channelID, false);
messageCompleted(e);
} else {
final ChannelBuffer buf = BUFFER_MAP.get(channelID);
buf.writeBytes(chunk.getContent());
BUFFER_MAP.put(channelID, buf);
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
e.getCause().printStackTrace();
latch.countDown();
e.getChannel().close();
}
private void messageCompleted(MessageEvent e) {
final Integer channelID = e.getChannel().getId();
if (queue.isCancelled()) {
return;
}
try {
final ImagePack p = PACK_MAP.get(channelID);
final ChannelBuffer b = BUFFER_MAP.get(channelID);
p.setBuffer(new ByteArrayInputStream(b.array()));
queue.put(p.getTile());
} catch (Exception ex) {
LOGGER.log(Level.WARNING, ex.getMessage(), ex);
}
latch.countDown();
e.getChannel().close();
}
}
मेरे समस्या यह है, जब मैं इस कोड निष्पादित, मैं इन अपवादों मिल गया है:
java.lang.IllegalArgumentException: invalid version format: 3!}@
at org.jboss.netty.handler.codec.http.HttpVersion.<init>(HttpVersion.java:108)
at org.jboss.netty.handler.codec.http.HttpVersion.valueOf(HttpVersion.java:68)
at org.jboss.netty.handler.codec.http.HttpResponseDecoder.createMessage(HttpResponseDecoder.java:110)
at org.jboss.netty.handler.codec.http.HttpMessageDecoder.decode(HttpMessageDecoder.java:198)
at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:113)
at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:101)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:470)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.messageReceived(ReplayingDecoder.java:443)
at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351)
at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
java.lang.IllegalArgumentException: invalid version format:
at org.jboss.netty.handler.codec.http.HttpVersion.<init>(HttpVersion.java:108)
at org.jboss.netty.handler.codec.http.HttpVersion.valueOf(HttpVersion.java:68)
at org.jboss.netty.handler.codec.http.HttpResponseDecoder.createMessage(HttpResponseDecoder.java:110)
at org.jboss.netty.handler.codec.http.HttpMessageDecoder.decode(HttpMessageDecoder.java:198)
at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:113)
at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:101)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:470)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.cleanup(ReplayingDecoder.java:546)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.channelDisconnected(ReplayingDecoder.java:449)
at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77)
at org.jboss.netty.channel.Channels.fireChannelDisconnected(Channels.java:360)
at org.jboss.netty.channel.socket.nio.NioWorker.close(NioWorker.java:595)
at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.eventSunk(NioClientSocketPipelineSink.java:101)
at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:60)
at org.jboss.netty.handler.codec.http.HttpClientCodec.handleDownstream(HttpClientCodec.java:82)
at org.jboss.netty.channel.Channels.close(Channels.java:720)
at org.jboss.netty.channel.AbstractChannel.close(AbstractChannel.java:200)
at org.geotoolkit.client.map.CachedPyramidSet$TileClientHandler.exceptionCaught(CachedPyramidSet.java:515)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.exceptionCaught(ReplayingDecoder.java:461)
at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77)
at org.jboss.netty.channel.Channels.fireExceptionCaught(Channels.java:432)
at org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:52)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351)
at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
22 mars 2012 15:27:31 org.jboss.netty.channel.DefaultChannelPipeline
ATTENTION: An exception was thrown by a user handler while handling an exception event ([id: 0x3cd16610, /172.16.30.91:34315 :> tile.openstreetmap.org/193.63.75.98:80] EXCEPTION: java.lang.IllegalArgumentException: invalid version format:
java.lang.IllegalStateException: An Executor cannot be shut down from the thread acquired from itself. Please make sure you are not calling releaseExternalResources() from an I/O worker thread.
at org.jboss.netty.util.internal.ExecutorUtil.terminate(ExecutorUtil.java:71)
at org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory.releaseExternalResources(NioClientSocketChannelFactory.java:171)
at org.jboss.netty.bootstrap.Bootstrap.releaseExternalResources(Bootstrap.java:324)
at org.geotoolkit.client.map.CachedPyramidSet$1.countDown(CachedPyramidSet.java:314)
at org.geotoolkit.client.map.CachedPyramidSet$TileClientHandler.exceptionCaught(CachedPyramidSet.java:514)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.exceptionCaught(ReplayingDecoder.java:461)
at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77)
at org.jboss.netty.channel.Channels.fireExceptionCaught(Channels.java:432)
at org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:52)
at org.jboss.netty.channel.Channels.fireChannelDisconnected(Channels.java:360)
at org.jboss.netty.channel.socket.nio.NioWorker.close(NioWorker.java:595)
at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.eventSunk(NioClientSocketPipelineSink.java:101)
at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:60)
at org.jboss.netty.handler.codec.http.HttpClientCodec.handleDownstream(HttpClientCodec.java:82)
at org.jboss.netty.channel.Channels.close(Channels.java:720)
at org.jboss.netty.channel.AbstractChannel.close(AbstractChannel.java:200)
at org.geotoolkit.client.map.CachedPyramidSet$TileClientHandler.exceptionCaught(CachedPyramidSet.java:515)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.exceptionCaught(ReplayingDecoder.java:461)
at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77)
at org.jboss.netty.channel.Channels.fireExceptionCaught(Channels.java:432)
at org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:52)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351)
at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
और भी कुछ एनपीई कुछ समय दिखाई देता है।
java.lang.NullPointerException
at org.jboss.netty.handler.codec.http.HttpMessageDecoder.skipControlCharacters(HttpMessageDecoder.java:409)
at org.jboss.netty.handler.codec.http.HttpMessageDecoder.decode(HttpMessageDecoder.java:184)
at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:113)
at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:101)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:470)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.messageReceived(ReplayingDecoder.java:443)
at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351)
at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
इन सभी कोड एक अनुरोध के लिए ठीक काम करता है, लेकिन कुछ अजीब सामान बफ़र्स पर संलग्न जब कई अनुरोध जहां भेजें।
कोई विचार जो मैं यहां याद कर रहा हूं? धन्यवाद।
मेरे पहले संस्करण में, मैं प्रत्येक अनुरोधित छवियों के लिए बूटस्ट्रैप/हैंडलर डुप्लिकेट करता हूं, यह ठीक काम करता है लेकिन बहुत अनुकूल नहीं है।
हाय जॉनस्टलर, इस त्वरित उपयोगी उत्तर के लिए धन्यवाद, अब मैं HTTPCodec dand को अपने टाइल हैंडलर को तुरंत चालू करने के लिए एक चैनलपिपलाइन फैक्टरी का उपयोग करता हूं। यह ठीक काम करता है, लेकिन मुझे अभी भी 'java.lang.IleglegalStateException मिला है: एक निष्पादक को अपने आप से प्राप्त धागे से बंद नहीं किया जा सकता है। कृपया सुनिश्चित करें कि आप I/O कार्यकर्ता थ्रेड से रिलीज एक्सेलरेटर्स स्रोत() को कॉल नहीं कर रहे हैं। अपवाद। क्या आपके पास इसके लिए कोई विचार था? और जानकारी के लिए, मैंने एक HttpChunkAggregator का उपयोग नहीं किया है, इसलिए आपको HttpChunkAggregator कन्स्ट्रक्टर में बफर आकार सेट करना होगा। – qboileau
आप bootstrap.releaseExternalResources को CountDownLatch.countDown के भीतर से कॉल कर रहे हैं, जिसे आपके हैंडलर विधियों में आईओ थ्रेड से बुलाया जा रहा है। दुर्भाग्य से आप यह नहीं कर सकते हैं। आपको थ्रेड से रिलीज एक्सेलरियल स्रोतों को कॉल करने की आवश्यकता है जो नेटटी द्वारा उपयोग किए जाने वाले थ्रेड पूल में नहीं है। एक विकल्प आपके थ्रेड में रिलीज एक्सेलरियल स्रोतों को कॉल करने के लिए हो सकता है जो कतार को संसाधित करने के बाद आपकी आंतरिक कतार से पढ़ रहा है। इसके अलावा, आप HttpChunkAggregator के बारे में पूरी तरह से सही हैं। माफ़ कीजिये! – johnstlr