2012-09-27 19 views
5

के साथ आलसी, स्ट्रीम संचालित ऑब्जेक्ट सीरियलाइजेशन हम बड़ी मात्रा में डेटा स्ट्रीमिंग के लिए डब्ल्यूसीएफ सेवा विकसित कर रहे हैं, इसलिए हमने WCF Streaming कार्यक्षमता का उपयोग protobuf-net क्रमबद्धता के साथ संयुक्त करने के लिए चुना है।प्रोटोबफ-नेट

प्रसंग:

आम तौर पर एक विचार, सेवा में वस्तुओं को क्रमानुसार उन्हें एक धारा में लिख सकते हैं और भेजने के लिए है। दूसरी तरफ कॉलर स्ट्रीम ऑब्जेक्ट प्राप्त करेगा और यह सभी डेटा पढ़ सकता है।

तो वर्तमान में सेवा विधि कोड इस तरह कुछ हद तक दिखता है:

public Result TestMethod(Parameter parameter) 
{ 
    // Create response 
    var responseObject = new BusinessResponse { Value = "some very large data"}; 

    // The resposne have to be serialized in advance to intermediate MemoryStream 
    var stream = new MemoryStream(); 
    serializer.Serialize(stream, responseObject); 
    stream.Position = 0; 

    // ResultBody is a stream, Result is a MessageContract 
    return new Result {ResultBody = stream}; 
} 

BusinessResponse वस्तु एक MemoryStream करने के लिए धारावाहिक है और वह एक विधि से दिया जाता है। ग्राहक पक्ष पर बुला कोड कि तरह लग रहा है:

var parameter = new Parameter(); 

// Call the service method 
var methodResult = channel.TestMethod(parameter); 

// protobuf-net deserializer reads from a stream received from a service. 
// while reading is performed by protobuf-net, 
// on the service side WCF is actually reading from a 
// memory stream where serialized message is stored 
var result = serializer.Deserialize<BusinessResponse>(methodResult.ResultBody); 
return result; 

तो जब serializer.Deserialize() कहा जाता है यह एक धारा methodResult.ResultBody से पढ़ता सेवा पक्ष WCF पर एक ही समय पर एक MemoryStream, कि से लौटा दिया गया है पढ़ रही है एक TestMethod

समस्या:

हम क्या प्राप्त करने के लिए चाहते हैं एक ही बार में एक MemoryStream और प्रारंभिक सेवा तरफ पूरी वस्तु की क्रमबद्धता से छुटकारा पाने के लिए है। चूंकि हम स्ट्रीमिंग का उपयोग करते हैं, इसलिए हम भेजने से पहले स्मृति में एक क्रमबद्ध ऑब्जेक्ट को रखने से बचना चाहते हैं।

विचार:

आदर्श समाधान एक खाली, कस्टम निर्मित स्ट्रीम वस्तु (TestMethod() से) एक वस्तु के लिए एक संदर्भ है कि मेरे उदाहरण में श्रृंखलाबद्ध जा करने के लिए ('BusinessResponse' वस्तु के साथ वापस जाने के लिए किया जाएगा)। तो जब डब्ल्यूसीएफ मेरी स्ट्रीम के Read() विधि को कॉल करता है, तो मैं प्रोटोबफ-नेट का उपयोग करके ऑब्जेक्ट का एक टुकड़ा आंतरिक रूप से क्रमबद्ध करता हूं और इसे स्मृति में संग्रहीत किए बिना कॉलर पर वापस कर देता हूं।

और अब एक समस्या है, क्योंकि हमें वास्तव में जो चाहिए वह धारा के पढ़ने के समय उस वस्तु के टुकड़े को क्रमबद्ध करने की संभावना है। मैं समझता हूं कि यह धारावाहिकता का एक बिल्कुल अलग तरीका है - एक ऑब्जेक्ट को एक धारावाहिक को धक्का देने के बजाय, मैं टुकड़े से क्रमबद्ध सामग्री टुकड़े का अनुरोध करना चाहता हूं।

क्या प्रोटोबफ-नेट का उपयोग करके इस प्रकार का क्रमबद्धता संभव है?

+0

क्या यह एक वस्तु है? या वस्तुओं की एक श्रृंखला (एक संग्रह)? चाहे यह देखने के लायक है, वास्तव में आपके डब्ल्यूसीएफ कॉन्फ़िगरेशन पर निर्भर करता है - अधिकांश कॉन्फ़िगरेशन में यह हमेशा पूरे संदेश को मेमोरी * बफर में हमेशा बफर करेगा * - इसलिए कुछ भी नहीं बदला जा सकता है। –

+0

हाय मार्क, डब्ल्यूसीएफ को बफरिंग का उपयोग न करने के लिए कॉन्फ़िगर किया गया है - यह स्ट्रीमिंग का बिंदु है - मैं सर्वर पक्ष पर मेमोरी पदचिह्न को कम करना चाहता हूं। इसके अतिरिक्त यदि मैं ऑब्जेक्ट्स के संग्रह को क्रमबद्ध करना चाहता हूं तो क्लाइंट कॉल 'रीड()' के दौरान प्रत्येक बार 'SerializeWithLengthPrefix()' का उपयोग करेगा और मेरा अंतर्निहित बफर अनुरोधित डेटा राशि से छोटा है। यहां मुद्दा यह है कि मैं एकल ऑब्जेक्ट क्रमबद्धता को विभाजित करने में सक्षम होना चाहता हूं। –

+0

दिलचस्प सवाल। मैं * सोचता हूं * यह सामान्य रूप से एक स्पूफ स्ट्रीम के लिए सामान्यीकृत किया जा सकता है जो सह-दिनचर्या के रूप में पढ़ना और लिखना काम करता है। यदि आपको अतिरिक्त थ्रेड होने पर कोई फर्क नहीं पड़ता है, तो यह एक साधारण गेट के साथ किया जा सकता है, हालांकि आईआईआरसी जॉन के कुछ दिलचस्प विचार थे। मुझे एक नज़र रखना होगा और आपको वापस लेना होगा। हालांकि, मैं बिना किसी संदेह के कह सकता हूं कि उद्देश्य के लिए प्रोटोबफ-नेट के कोर को हैकिंग का इरादा नहीं है :) –

उत्तर

2

मैंने कुछ कोड पकाया जो शायद मार्क के गेट विचार की रेखाओं के साथ है।

public class PullStream : Stream 
{ 
    private byte[] internalBuffer; 
    private bool ended; 
    private static ManualResetEvent dataAvailable = new ManualResetEvent(false); 
    private static ManualResetEvent dataEmpty = new ManualResetEvent(true); 

    public override bool CanRead 
    { 
     get { return true; } 
    } 

    public override bool CanSeek 
    { 
     get { return false; } 
    } 

    public override bool CanWrite 
    { 
     get { return true; } 
    } 

    public override void Flush() 
    { 
     throw new NotImplementedException(); 
    } 

    public override long Length 
    { 
     get { throw new NotImplementedException(); } 
    } 

    public override long Position 
    { 
     get 
     { 
      throw new NotImplementedException(); 
     } 
     set 
     { 
      throw new NotImplementedException(); 
     } 
    } 

    public override int Read(byte[] buffer, int offset, int count) 
    { 
     dataAvailable.WaitOne(); 
     if (count >= internalBuffer.Length) 
     { 
      var retVal = internalBuffer.Length; 
      Array.Copy(internalBuffer, buffer, retVal); 
      internalBuffer = null; 
      dataAvailable.Reset(); 
      dataEmpty.Set(); 
      return retVal; 
     } 
     else 
     { 
      Array.Copy(internalBuffer, buffer, count); 
      internalBuffer = internalBuffer.Skip(count).ToArray(); // i know 
      return count; 
     } 
    } 

    public override long Seek(long offset, SeekOrigin origin) 
    { 
     throw new NotImplementedException(); 
    } 

    public override void SetLength(long value) 
    { 
     throw new NotImplementedException(); 
    } 

    public override void Write(byte[] buffer, int offset, int count) 
    { 
     dataEmpty.WaitOne(); 
     dataEmpty.Reset(); 

     internalBuffer = new byte[count]; 
     Array.Copy(buffer, internalBuffer, count); 

     Debug.WriteLine("Writing some data"); 

     dataAvailable.Set(); 
    } 

    public void End() 
    { 
     dataEmpty.WaitOne(); 
     dataEmpty.Reset(); 

     internalBuffer = new byte[0]; 

     Debug.WriteLine("Ending writes"); 

     dataAvailable.Set(); 
    } 
} 

यह एक साधारण धारा वंश वर्ग है जो केवल पढ़ने और लिखने (और अंत) को कार्यान्वित करता है। कोई डेटा उपलब्ध नहीं होने पर डेटा पढ़ें और डेटा उपलब्ध होने पर ब्लॉक लिखें। इस तरह से केवल एक बाइट बफर शामिल है। बाकी की लिनक प्रतिलिपि अनुकूलन के लिए खुली है ;-) अंत विधि जोड़ दी गई है, इसलिए कोई अवरोध नहीं होता है जहां कोई डेटा उपलब्ध नहीं होने पर पढ़ा जाता है और कोई डेटा और नहीं लिखा जाएगा।

आपको इस स्ट्रीम को एक अलग धागे से लिखना है।मैं इस के नीचे प्रदर्शित:

// create a large object 
    var obj = new List<ToSerialize>(); 
    for(int i = 0; i <= 1000; i ++) 
     obj.Add(new ToSerialize { Test = "This is my very loooong message" }); 
    // create my special stream to read from 
    var ms = new PullStream(); 
    new Thread(x => 
    { 
     ProtoBuf.Serializer.Serialize(ms, obj); 
     ms.End(); 
    }).Start(); 
    var buffer = new byte[100]; 
    // stream to write back to (just to show deserialization is working too) 
    var ws = new MemoryStream(); 
    int read; 
    while ((read = ms.Read(buffer, 0, 100)) != 0) 
    { 
     ws.Write(buffer, 0, read); 
     Debug.WriteLine("read some data"); 
    } 
    ws.Position = 0; 
    var back = ProtoBuf.Serializer.Deserialize<List<ToSerialize>>(ws); 

मुझे आशा है कि यह आपकी समस्या का हल :-) यह वैसे भी इस कोड करने के लिए मजेदार था।

सम्मान, जैको

+0

पुलस्ट्रीम को वापस करना संभव है (यह सुनिश्चित न करें कि नाम क्या करने की कोशिश करता है) आपके परिणाम के रूप में – Jacco