2012-07-19 32 views
97

में का इंतजार है एक मेट्रो एप्लिकेशन में, मैं WCF के एक नंबर कॉल पर अमल करने की जरूरत है। कॉल की एक बड़ी संख्या बनाई जा रही है, इसलिए मुझे उन्हें समांतर लूप में करने की आवश्यकता है। समस्या यह है कि डब्ल्यूसीएफ कॉल पूरी होने से पहले समांतर लूप निकलता है।नेस्टिंग Parallel.ForEach

कैसे आप इस अपेक्षा के अनुरूप काम करने के लिए refactor होगा?

var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" }; 
var customers = new System.Collections.Concurrent.BlockingCollection<Customer>(); 

Parallel.ForEach(ids, async i => 
{ 
    ICustomerRepo repo = new CustomerRepo(); 
    var cust = await repo.GetCustomer(i); 
    customers.Add(cust); 
}); 

foreach (var customer in customers) 
{ 
    Console.WriteLine(customer.ID); 
} 

Console.ReadKey(); 

उत्तर

101

Parallel.ForEach() के पीछे पूरे विचार आप धागे का एक सेट और संग्रह के प्रत्येक धागा प्रक्रियाओं हिस्सा होता है। जैसा कि आपने देखा है, यह async - await के साथ काम नहीं करता है, जहां आप एसिंक कॉल की अवधि के लिए थ्रेड जारी करना चाहते हैं।

ForEach() धागे को अवरुद्ध करके आप "ठीक" कर सकते हैं, लेकिन यह async - await के पूरे बिंदु को हरा देता है।

Parallel.ForEach() के बजाय TPL Dataflow का उपयोग करने के लिए आप क्या कर सकते हैं, जो एसिंक्रोनस Task एस का समर्थन करता है।

विशेष रूप से, अपने कोड एक TransformBlock कि async लैम्ब्डा का उपयोग कर एक Customer में प्रत्येक आईडी बदल देती का उपयोग कर लिखा जा सकता है। इस ब्लॉक को समानांतर में निष्पादित करने के लिए कॉन्फ़िगर किया जा सकता है। आप उस ब्लॉक को ActionBlock से लिंक करेंगे जो कंसोल पर प्रत्येक Customer लिखता है। आप ब्लॉक नेटवर्क सेट करने के बाद, आप कर सकते हैं Post()TransformBlock करने के लिए प्रत्येक आईडी।

कोड में:

var ids = new List<string> { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" }; 

var getCustomerBlock = new TransformBlock<string, Customer>(
    async i => 
    { 
     ICustomerRepo repo = new CustomerRepo(); 
     return await repo.GetCustomer(i); 
    }, new ExecutionDataflowBlockOptions 
    { 
     MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded 
    }); 
var writeCustomerBlock = new ActionBlock<Customer>(c => Console.WriteLine(c.ID)); 
getCustomerBlock.LinkTo(
    writeCustomerBlock, new DataflowLinkOptions 
    { 
     PropagateCompletion = true 
    }); 

foreach (var id in ids) 
    getCustomerBlock.Post(id); 

getCustomerBlock.Complete(); 
writeCustomerBlock.Completion.Wait(); 

हालांकि आप शायद कुछ छोटे निरंतर करने के लिए TransformBlock की समानांतरवाद सीमित करना चाहते हैं। इसके अलावा, आप TransformBlock की क्षमता को सीमित कर सकते हैं और आइटम को SendAsync() का उपयोग करके असीमित रूप से जोड़ सकते हैं, उदाहरण के लिए यदि संग्रह बहुत बड़ा है।

एक अतिरिक्त लाभ के रूप में जब अपने कोड की तुलना में (अगर यह काम किया) कि लेखन जैसे ही एक आइटम समाप्त हो गया है शुरू कर देंगे, और जब तक प्रसंस्करण के सभी समाप्त हो गया है के लिए इंतजार नहीं है।

+1

एक बहुत ही संक्षिप्त सिंहावलोकन, प्रतिक्रियाशील एक्सटेंशन, TPL और TPL dataflow - http://vantsuyoshi.wordpress.com/2012/01/05/जब-टू-यूज-टीपीएल-एसिंक-रीएक्टिव-एक्सटेंशन-टीपीएल-डेटाफ्लो/मेरे जैसे लोगों के लिए जिन्हें कुछ स्पष्टता की आवश्यकता हो सकती है। –

+1

मुझे पूरा यकीन है कि यह उत्तर प्रसंस्करण को समानांतर नहीं करता है। मेरा मानना ​​है कि आपको समानांतर करने की ज़रूरत है। आईडी पर आचरण करें और उनको पोस्ट करें कस्टमरब्लॉक पर पोस्ट करें। जब मैंने इस सुझाव का परीक्षण किया तो कम से कम यही मिला। – JasonLind

+2

@ जेसनलिंड यह वास्तव में करता है। समांतर में 'समानांतर। फ़ॉरएच() 'से' पोस्ट()' आइटम का उपयोग करना कोई वास्तविक प्रभाव नहीं होना चाहिए। – svick

79

svick's answer (हमेशा की तरह) उत्कृष्ट है।

हालांकि, मैं आप वास्तव में स्थानांतरण करने के लिए डेटा की बड़ी मात्रा है जब Dataflow और अधिक उपयोगी हो पाते हैं। या जब आपको async-संगत कतार की आवश्यकता होती है।

आपके मामले में, एक सरल समाधान सिर्फ async शैली समानांतरवाद उपयोग करने के लिए है:

var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" }; 

var customerTasks = ids.Select(i => 
    { 
    ICustomerRepo repo = new CustomerRepo(); 
    return repo.GetCustomer(i); 
    }); 
var customers = await Task.WhenAll(customerTasks); 

foreach (var customer in customers) 
{ 
    Console.WriteLine(customer.ID); 
} 

Console.ReadKey(); 
+7

में ट्रांसफॉर्मब्लॉक पर करते हैं, यदि आप समानांतरता को मैन्युअल रूप से सीमित करना चाहते हैं (जो आप इस मामले में सबसे अधिक संभावना रखते हैं), तो ऐसा करने से यह अधिक जटिल होगा। – svick

+1

अच्छा बिंदु। डेटाफ्लो के लिए अच्छे knobs है। –

+0

लेकिन आप सही हैं कि डेटाफ्लो काफी जटिल हो सकता है (उदाहरण के लिए जब 'समानांतर। फॉरएच() 'की तुलना में)। लेकिन मुझे लगता है कि वर्तमान में संग्रह के साथ लगभग किसी भी 'async' काम करने का सबसे अच्छा विकल्प है। – svick

47

overkill हो सकता है सुझाव दिया svick रूप dataflow का उपयोग करना, और स्टीफन के जवाब साधन प्रदान नहीं करता है के संगामिति नियंत्रित करने के लिए आपरेशन। एक सूची के बजाय एक सरणी का उपयोग करने और पूर्ण हो चुके कार्यों की जगह

public static async Task RunWithMaxDegreeOfConcurrency<T>(
    int maxDegreeOfConcurrency, IEnumerable<T> collection, Func<T, Task> taskFactory) 
{ 
    var activeTasks = new List<Task>(maxDegreeOfConcurrency); 
    foreach (var task in collection.Select(taskFactory)) 
    { 
     activeTasks.Add(task); 
     if (activeTasks.Count == maxDegreeOfConcurrency) 
     { 
      await Task.WhenAny(activeTasks.ToArray()); 
      //observe exceptions here 
      activeTasks.RemoveAll(t => t.IsCompleted); 
     } 
    } 
    await Task.WhenAll(activeTasks.ToArray()).ContinueWith(t => 
    { 
     //observe exceptions in a manner consistent with the above 
    }); 
} 

ToArray() कॉल अनुकूलित किया जा सकता है, लेकिन मुझे शक है यह सबसे स्थितियों में बहुत अधिक अंतर होगा: हालांकि, कि बजाय बस हासिल किया जा सकता।ओपी के सवाल प्रति नमूना उपयोग:

RunWithMaxDegreeOfConcurrency(10, ids, async i => 
{ 
    ICustomerRepo repo = new CustomerRepo(); 
    var cust = await repo.GetCustomer(i); 
    customers.Add(cust); 
}); 

संपादित बंदे अतः उपयोगकर्ता और TPL जानकार Eli Arbel एक related article from Stephen Toub करने के लिए मुझे इशारा किया। हमेशा की तरह, उसके कार्यान्वयन है दोनों सुरुचिपूर्ण और कुशल:

public static Task ForEachAsync<T>(
     this IEnumerable<T> source, int dop, Func<T, Task> body) 
{ 
    return Task.WhenAll( 
     from partition in Partitioner.Create(source).GetPartitions(dop) 
     select Task.Run(async delegate { 
      using (partition) 
       while (partition.MoveNext()) 
        await body(partition.Current).ContinueWith(t => 
          { 
           //observe exceptions 
          }); 

     })); 
} 
+0

एली अरबेल विकल्प से प्यार करें। कुछ अनुवर्ती प्रश्न हैं: मुझे प्रगति का ट्रैक रखना अच्छा लगेगा। मैंने विधि में 'रेफ इंट किया' जोड़ा, और फिर 'जारी किए गए ++' के साथ जारी रखें लेकिन "अज्ञात विधि, लैम्ब्डा अभिव्यक्ति या क्वेरी अभिव्यक्ति के अंदर रेफरी या आउट पैरामीटर का उपयोग नहीं कर सकता ... किसी भी विचार को प्रगति को ट्रैक करने के लिए कैसे? – Stefanvds

+0

कभी नहीं, मैं foreachasync कोड – Stefanvds

+0

में बस ++ कर सकता हूं मुझे लगता है कि 'ForEachAsync' कोड अपेक्षित रूप से काम नहीं करता है। कम से कम, हमेशा किसी कारण या अन्य के लिए नहीं (मैं वर्तमान में यह नहीं बता सकता कि क्या हो रहा है)।' dop = 5 'कोड को कॉल करते समय मुझे अलग-अलग परिणाम मिलेंगे (मुझे हमेशा वही मिलना चाहिए - डेटा नहीं बदला जाना चाहिए)! सावधान रहें! – Shaamaan

7

लपेटें एक Task.Run में Parallel.Foreach() और बदले का इंतजार कीवर्ड उपयोग [yourasyncmethod] की .Result

(आप की जरूरत है

var yourForeachTask = Task.Run(() => 
     { 
      Parallel.ForEach(ids, i => 
      { 
       ICustomerRepo repo = new CustomerRepo(); 
       var cust = repo.GetCustomer(i).Result; 
       customers.Add(cust); 
      }); 
     }); 
await yourForeachTask; 
+3

? मैं यह वास्तव में इस तरह किया है चाहता हूँ। चलो 'Parallel.ForEach' करना सब जब तक किया जाता है, और फिर एक पृष्ठभूमि धागा करने के लिए पूरी बात धक्का एक संवेदनशील यूआई है। उस के साथ किसी भी मुद्दों? हो सकता है कि बहुत ज्यादा एक नींद की धागा है, लेकिन यह छोटा है, पठनीय कोड। – ygoe

+0

@LonelyPixel मेरी केवल मुद्दा यह है कि है है कॉल 'टास्क.रुन' डब्ल्यू मुर्गी 'कार्यसंचार स्रोत' बेहतर है। – Gusdor

+1

@ गुस्डर उत्सुक - 'कार्य पूर्णीकरण स्रोत 'क्यों बेहतर है? – Seafish

6

यह सुंदर कुशल, और पूरे TPL Dataflow काम मिल रहा है की तुलना में आसान होना चाहिए: यूआई धागा ब्लॉक नहीं करने के लिए)

कुछ इस तरह Task.Run काम करते हैं ing:

var customers = await ids.SelectAsync(async i => 
{ 
    ICustomerRepo repo = new CustomerRepo(); 
    return await repo.GetCustomer(i); 
}); 

... 

public static async Task<IList<TResult>> SelectAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector, int maxDegreesOfParallelism = 4) 
{ 
    var results = new List<TResult>(); 

    var activeTasks = new HashSet<Task<TResult>>(); 
    foreach (var item in source) 
    { 
     activeTasks.Add(selector(item)); 
     if (activeTasks.Count >= maxDegreesOfParallelism) 
     { 
      var completed = await Task.WhenAny(activeTasks); 
      activeTasks.Remove(completed); 
      results.Add(completed.Result); 
     } 
    } 

    results.AddRange(await Task.WhenAll(activeTasks)); 
    return results; 
} 
+0

उपयोग उदाहरण को 'प्रतीक्षा' का उपयोग नहीं करना चाहिए: 'var customers = ids.SelectAsync (async i => {...});'? – Paccc

+0

@pacc: आप सही हैं। फिक्स्ड। –

1

मैं एक छोटे से देर से कर रहा हूँ पार्टी के लिए लेकिन आप GetAwaiter.GetResult() का उपयोग कर सिंक संदर्भ में अपने async कोड को चलाने के लिए विचार कर सकते हैं, लेकिन नीचे के रूप में paralled के रूप में;

Parallel.ForEach(ids, i => 
{ 
    ICustomerRepo repo = new CustomerRepo(); 
    // Run this in thread which Parallel library occupied. 
    var cust = repo.GetCustomer(i).GetAwaiter().GetResult(); 
    customers.Add(cust); 
}); 
7

आप नए AsyncEnumerator NuGet Package है, जो 4 साल पहले अस्तित्व में नहीं था जब प्रश्न मूल रूप से तैनात किया गया था के साथ प्रयास बचा सकता है। यह आप समानांतरवाद की डिग्री को नियंत्रित करने के लिए अनुमति देता है:

using System.Collections.Async; 
... 

await ids.ParallelForEachAsync(async i => 
{ 
    ICustomerRepo repo = new CustomerRepo(); 
    var cust = await repo.GetCustomer(i); 
    customers.Add(cust); 
}, 
maxDegreeOfParallelism: 10); 

अस्वीकरण: मैं AsyncEnumerator पुस्तकालय है, जो खुला स्रोत और एमआईटी के तहत लाइसेंस प्राप्त है के लेखक हूँ, और मैं सिर्फ समुदाय की मदद करने के लिए इस संदेश पोस्ट कर रहा हूँ ।

+4

सेर्गेई, आपको यह खुलासा करना चाहिए कि आप पुस्तकालय –

+2

ठीक लेखक हैं, अस्वीकरण जोड़ा गया। मैं इसे विज्ञापन से किसी भी लाभ की तलाश नहीं कर रहा हूं, बस लोगों की मदद करना चाहता हूं;) –

+0

बहुत उपयोगी लाइब्रेरी, आशा है कि जल्द ही कोर में शामिल किया जाएगा – rekiem87

1

सहायक तरीकों में से एक गुच्छा शुरू करने के बाद, तो आप इस सरल sintax साथ सक्षम रन समानांतर प्रश्नों होगा:

const int DegreeOfParallelism = 10; 
IEnumerable<double> result = await Enumerable.Range(0, 1000000) 
    .Split(DegreeOfParallelism) 
    .SelectManyAsync(async i => await CalculateAsync(i).ConfigureAwait(false)) 
    .ConfigureAwait(false); 

यहां होता हम 10 मात्रा (.Split(DegreeOfParallelism)) में स्रोत संग्रह विभाजित है, तो 10 कार्यों को चलाने प्रत्येक अपनी वस्तुओं को एक-एक करके (.SelectManyAsync(...)) संसाधित करता है और उन्हें वापस एक सूची में विलय करता है।

उल्लेख वहाँ एक सरल तरीका है वर्थ:

double[] result2 = await Enumerable.Range(0, 1000000) 
    .Select(async i => await CalculateAsync(i).ConfigureAwait(false)) 
    .WhenAll() 
    .ConfigureAwait(false); 

लेकिन यह एक एहतियात की जरूरत है: यदि आप एक स्रोत संग्रह है कि बहुत बड़ा है है, यह हर आइटम के लिए एक Task अभी chedule होगा, जो महत्वपूर्ण प्रदर्शन हिट का कारण बन सकता है।

एक्सटेंशन देखो ऊपर के उदाहरण में प्रयुक्त विधियों इस प्रकार है: async की

public static class CollectionExtensions 
{ 
    /// <summary> 
    /// Splits collection into number of collections of nearly equal size. 
    /// </summary> 
    public static IEnumerable<List<T>> Split<T>(this IEnumerable<T> src, int slicesCount) 
    { 
     if (slicesCount <= 0) throw new ArgumentOutOfRangeException(nameof(slicesCount)); 

     List<T> source = src.ToList(); 
     var sourceIndex = 0; 
     for (var targetIndex = 0; targetIndex < slicesCount; targetIndex++) 
     { 
      var list = new List<T>(); 
      int itemsLeft = source.Count - targetIndex; 
      while (slicesCount * list.Count < itemsLeft) 
      { 
       list.Add(source[sourceIndex++]); 
      } 

      yield return list; 
     } 
    } 

    /// <summary> 
    /// Takes collection of collections, projects those in parallel and merges results. 
    /// </summary> 
    public static async Task<IEnumerable<TResult>> SelectManyAsync<T, TResult>(
     this IEnumerable<IEnumerable<T>> source, 
     Func<T, Task<TResult>> func) 
    { 
     List<TResult>[] slices = await source 
      .Select(async slice => await slice.SelectListAsync(func).ConfigureAwait(false)) 
      .WhenAll() 
      .ConfigureAwait(false); 
     return slices.SelectMany(s => s); 
    } 

    /// <summary>Runs selector and awaits results.</summary> 
    public static async Task<List<TResult>> SelectListAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector) 
    { 
     List<TResult> result = new List<TResult>(); 
     foreach (TSource source1 in source) 
     { 
      TResult result1 = await selector(source1).ConfigureAwait(false); 
      result.Add(result1); 
     } 
     return result; 
    } 

    /// <summary>Wraps tasks with Task.WhenAll.</summary> 
    public static Task<TResult[]> WhenAll<TResult>(this IEnumerable<Task<TResult>> source) 
    { 
     return Task.WhenAll<TResult>(source); 
    } 
}