स्टैक ओवरफ्लो मेरे लिए जबरदस्त मदद कर रहा है और मैं समुदाय को कुछ देना चाहता हूं। मैं Stackoverflow से जो सीखा है, उसका उपयोग करके, TinyThread ++ website C++ पोर्टेबल थ्रेड लाइब्रेरी का उपयोग करके एक साधारण थ्रेडपूल को कार्यान्वित कर रहा हूं।सरल सी ++ थ्रेडपूल कार्यान्वयन पर प्रश्न
// ThreadPool.h
class ThreadPool
{
public:
ThreadPool();
~ThreadPool();
// Creates a pool of threads and gets them ready to be used
void CreateThreads(int numOfThreads);
// Assigns a job to a thread in the pool, but doesn't start the job
// Each SubmitJob call will use up one thread of the pool.
// This operation can only be undone by calling StartJobs and
// then waiting for the jobs to complete. On completion,
// new jobs may be submitted.
void SubmitJob(void (*workFunc)(void *), void *workData);
// Begins execution of all the jobs in the pool.
void StartJobs();
// Waits until all jobs have completed.
// The wait will block the caller.
// On completion, new jobs may be submitted.
void WaitForJobsToComplete();
private:
enum typeOfWorkEnum { e_work, e_quit };
class ThreadData
{
public:
bool ready; // thread has been created and is ready for work
bool haveWorkToDo;
typeOfWorkEnum typeOfWork;
// Pointer to the work function each thread has to call.
void (*workFunc)(void *);
// Pointer to work data
void *workData;
ThreadData() : ready(false), haveWorkToDo(false) { };
};
struct ThreadArgStruct
{
ThreadPool *threadPoolInstance;
int threadId;
};
// Data for each thread
ThreadData *m_ThreadData;
ThreadPool(ThreadPool const&); // copy ctor hidden
ThreadPool& operator=(ThreadPool const&); // assign op. hidden
// Static function that provides the function pointer that a thread can call
// By including the ThreadPool instance in the void * parameter,
// we can use it to access other data and methods in the ThreadPool instance.
static void ThreadFuncWrapper(void *arg)
{
ThreadArgStruct *threadArg = static_cast<ThreadArgStruct *>(arg);
threadArg->threadPoolInstance->ThreadFunc(threadArg->threadId);
}
// The function each thread calls
void ThreadFunc(int threadId);
// Called by the thread pool destructor
void DestroyThreadPool();
// Total number of threads available
// (fixed on creation of thread pool)
int m_numOfThreads;
int m_NumOfThreadsDoingWork;
int m_NumOfThreadsGivenJobs;
// List of threads
std::vector<tthread::thread *> m_ThreadList;
// Condition variable to signal each thread has been created and executing
tthread::mutex m_ThreadReady_mutex;
tthread::condition_variable m_ThreadReady_condvar;
// Condition variable to signal each thread to start work
tthread::mutex m_WorkToDo_mutex;
tthread::condition_variable m_WorkToDo_condvar;
// Condition variable to signal the main thread that
// all threads in the pool have completed their work
tthread::mutex m_WorkCompleted_mutex;
tthread::condition_variable m_WorkCompleted_condvar;
};
cpp फ़ाइल:
//
// ThreadPool.cpp
//
#include "ThreadPool.h"
// This is the thread function for each thread.
// All threads remain in this function until
// they are asked to quit, which only happens
// when terminating the thread pool.
void ThreadPool::ThreadFunc(int threadId)
{
ThreadData *myThreadData = &m_ThreadData[threadId];
std::cout << "Hello world: Thread " << threadId << std::endl;
// Signal that this thread is ready
m_ThreadReady_mutex.lock();
myThreadData->ready = true;
m_ThreadReady_condvar.notify_one(); // notify the main thread
m_ThreadReady_mutex.unlock();
while(true)
{
//tthread::lock_guard<tthread::mutex> guard(m);
m_WorkToDo_mutex.lock();
while(!myThreadData->haveWorkToDo) // check for work to do
m_WorkToDo_condvar.wait(m_WorkToDo_mutex); // if no work, wait here
myThreadData->haveWorkToDo = false; // need to do this before unlocking the mutex
m_WorkToDo_mutex.unlock();
// Do the work
switch(myThreadData->typeOfWork)
{
case e_work:
std::cout << "Thread " << threadId << ": Woken with work to do\n";
// Do work
myThreadData->workFunc(myThreadData->workData);
std::cout << "#Thread " << threadId << ": Work is completed\n";
break;
case e_quit:
std::cout << "Thread " << threadId << ": Asked to quit\n";
return; // ends the thread
}
// Now to signal the main thread that my work is completed
m_WorkCompleted_mutex.lock();
m_NumOfThreadsDoingWork--;
// Unsure if this 'if' would make the program more efficient
// if(m_NumOfThreadsDoingWork == 0)
m_WorkCompleted_condvar.notify_one(); // notify the main thread
m_WorkCompleted_mutex.unlock();
}
}
ThreadPool::ThreadPool()
{
m_numOfThreads = 0; m_NumOfThreadsDoingWork = 0; m_NumOfThreadsGivenJobs = 0;
}
ThreadPool::~ThreadPool()
{
if(m_numOfThreads)
{
DestroyThreadPool();
delete [] m_ThreadData;
}
}
void ThreadPool::CreateThreads(int numOfThreads)
{
// Check if a thread pool has already been created
if(m_numOfThreads > 0)
return;
m_NumOfThreadsGivenJobs = 0;
m_NumOfThreadsDoingWork = 0;
m_numOfThreads = numOfThreads;
m_ThreadData = new ThreadData[m_numOfThreads];
ThreadArgStruct threadArg;
for(int i=0; i<m_numOfThreads; ++i)
{
threadArg.threadId = i;
threadArg.threadPoolInstance = this;
// Creates the thread and saves it in a list so we can destroy it later
m_ThreadList.push_back(new tthread::thread(ThreadFuncWrapper, (void *)&threadArg ));
// It takes a little time for a thread to get established.
// Best wait until it gets established before creating the next thread.
m_ThreadReady_mutex.lock();
while(!m_ThreadData[i].ready) // Check if thread is ready
m_ThreadReady_condvar.wait(m_ThreadReady_mutex); // If not, wait here
m_ThreadReady_mutex.unlock();
}
}
// Assigns a job to a thread, but doesn't start the job
void ThreadPool::SubmitJob(void (*workFunc)(void *), void *workData)
{
// Check if the thread pool has been created
if(!m_numOfThreads)
return;
if(m_NumOfThreadsGivenJobs >= m_numOfThreads)
return;
m_ThreadData[m_NumOfThreadsGivenJobs].workFunc = workFunc;
m_ThreadData[m_NumOfThreadsGivenJobs].workData = workData;
std::cout << "Submitted job " << m_NumOfThreadsGivenJobs << std::endl;
m_NumOfThreadsGivenJobs++;
}
void ThreadPool::StartJobs()
{
// Check that the thread pool has been created
// and some jobs have been assigned
if(!m_numOfThreads || !m_NumOfThreadsGivenJobs)
return;
// Set 'haveworkToDo' flag for all threads
m_WorkToDo_mutex.lock();
for(int i=0; i<m_NumOfThreadsGivenJobs; ++i)
{
m_ThreadData[i].typeOfWork = e_work; // forgot to do this !
m_ThreadData[i].haveWorkToDo = true;
}
m_NumOfThreadsDoingWork = m_NumOfThreadsGivenJobs;
// Reset this counter so we can resubmit jobs later
m_NumOfThreadsGivenJobs = 0;
// Notify all threads they have work to do
m_WorkToDo_condvar.notify_all();
m_WorkToDo_mutex.unlock();
}
void ThreadPool::WaitForJobsToComplete()
{
// Check that a thread pool has been created
if(!m_numOfThreads)
return;
m_WorkCompleted_mutex.lock();
while(m_NumOfThreadsDoingWork > 0) // Check if all threads have completed their work
m_WorkCompleted_condvar.wait(m_WorkCompleted_mutex); // If not, wait here
m_WorkCompleted_mutex.unlock();
}
void ThreadPool::DestroyThreadPool()
{
std::cout << "Ask threads to quit\n";
m_WorkToDo_mutex.lock();
for(int i=0; i<m_numOfThreads; ++i)
{
m_ThreadData[i].haveWorkToDo = true;
m_ThreadData[i].typeOfWork = e_quit;
}
m_WorkToDo_condvar.notify_all();
m_WorkToDo_mutex.unlock();
// As each thread terminates, catch them here
for(int i=0; i<m_numOfThreads; ++i)
{
tthread::thread *t = m_ThreadList[i];
// Wait for thread to complete
t->join();
}
m_numOfThreads = 0;
}
मैं आदि प्रोग्रामिंग थ्रेड के लिए नए हैं, इसलिए नहीं कि mutexes के साथ सहज, मैं एक सवाल सबसे अच्छा कोड (जो लिनक्स के तहत काफी अच्छी तरह से चलाता है) पेश करने के बाद पूछा है कर रहा हूँ
उपयोग का उदाहरण: (यह वर्गों के पारस्परिक परिणामों को जोड़कर पीआई-वर्ग/6 की गणना करता है) दरअसल, यह उपयोग उदाहरण समानांतर में समान गणना 10 गुणा चलाता है। प्रत्येक थ्रेड के लिए सारांशित शब्दों के एक अलग सेट की गणना करने के लिए एक और व्यावहारिक उपयोग होगा। एक बार पूल नौकरी पूरी होने के बाद अंतिम परिणाम सभी धागे परिणामों को जोड़कर प्राप्त किया जाता है।
struct CalculationDataStruct
{
int inputVal;
double outputVal;
};
void LongCalculation(void *theSums)
{
CalculationDataStruct *sums = (CalculationDataStruct *)theSums;
int terms = sums->inputVal;
double sum;
for(int i=1; i<terms; i++)
sum += 1.0/(double(i)*double(i));
sums->outputVal = sum;
}
int main(int argc, char** argv)
{
int numThreads = 10;
// Create pool
ThreadPool threadPool;
threadPool.CreateThreads(numThreads);
// Create thread workspace
CalculationDataStruct sums[numThreads];
// Set up jobs
for(int i=0; i<numThreads; i++)
{
sums[i].inputVal = 3000*(i+1);
threadPool.SubmitJob(LongCalculation, &sums[i]);
}
// Run the jobs
threadPool.StartJobs();
threadPool.WaitForJobsToComplete();
// Print results
for(int i=0; i<numThreads; i++)
std::cout << "Sum of " << sums[i].inputVal << " terms is " << sums[i].outputVal << std::endl;
return 0;
}
प्रश्न: ThreadPool :: ThreadFunc विधि में, बेहतर प्रदर्शन प्राप्त किया जाएगा यदि निम्न अगर बयान
if(NumOfThreadsDoingWork == 0)
शामिल किया गया था? इसके अलावा, मैं कोड सुधारने के लिए आलोचनाओं और तरीकों का आभारी हूं। साथ ही, मुझे उम्मीद है कि कोड दूसरों के लिए उपयोग की जाएगी।
जब तक एक ही काम पूरा होने के बाद मुख्य धागे के लिए कुछ नहीं होता है, तब तक सभी नौकरियों को पूरा होने तक सिग्नलिंग में कोई बात नहीं होती है। मुख्य धागे को जागने का मुद्दा है यदि यह सब करने जा रहा है तो तुरंत सो जाओ। उस ने कहा, मुझे संदेह है कि ओवरहेड पर्याप्त है कि आप अंतर को मापने में सक्षम होंगे (जब तक आपके थ्रेड पूल में बड़ी संख्या में धागे नहीं हैं) –
धन्यवाद जेएफ, यह मेरी सोच भी थी। मैंने 'if' कथन के साथ और उसके बिना कोड की कोशिश की और किसी भी प्रदर्शन अंतर का पता नहीं लगा सका, लेकिन फिर, मैं केवल 10 धागे का उपयोग कर रहा था, जो अधिकतम है कि मैं अपने ऐप्स में वैसे भी उपयोग कर रहा हूं। – ticketman
एक बग फिक्स किया गया है जो विंडोज डीबग बिल्ड को ठीक से काम करता है: जोड़ा गया: m_ThreadData [i] .typeOfWork = e_work; फंक्शन StartJobs() में फॉर-लूप के लिए। – ticketman