26 struct ThreadPool::ThreadPoolThread :
public Thread
28 ThreadPoolThread (
ThreadPool& p,
size_t stackSize)
29 :
Thread (
"Pool", stackSize), pool (p)
36 if (! pool.runNextJob (*
this))
40 std::atomic<ThreadPoolJob*> currentJob {
nullptr };
43 JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ThreadPoolThread)
55 jassert (pool ==
nullptr || ! pool->
contains (
this));
76 listeners.add (listener);
81 listeners.remove (listener);
87 return t->currentJob.load();
95 jassert (numThreads > 0);
97 createThreads (numThreads, threadStackSize);
107 removeAllJobs (
true, 5000);
111 void ThreadPool::createThreads (
int numThreads,
size_t threadStackSize)
113 for (
int i = jmax (1, numThreads); --i >= 0;)
114 threads.add (
new ThreadPoolThread (*
this, threadStackSize));
116 for (
auto* t : threads)
120 void ThreadPool::stopThreads()
122 for (
auto* t : threads)
123 t->signalThreadShouldExit();
125 for (
auto* t : threads)
131 jassert (job !=
nullptr);
132 jassert (job->pool ==
nullptr);
134 if (job->pool ==
nullptr)
137 job->shouldStop =
false;
138 job->isActive =
false;
139 job->shouldBeDeleted = deleteJobWhenFinished;
146 for (
auto* t : threads)
158 std::function<ThreadPoolJob::JobStatus()> job;
161 addJob (
new LambdaJobWrapper (jobToRun),
true);
168 LambdaJobWrapper (std::function<
void()> j) :
ThreadPoolJob (
"lambda"), job (j) {}
171 std::function<void()> job;
174 addJob (
new LambdaJobWrapper (jobToRun),
true);
185 return threads.size();
197 return jobs.contains (const_cast<ThreadPoolJob*> (job));
203 return jobs.contains (const_cast<ThreadPoolJob*> (job)) && job->isActive;
210 auto index = jobs.indexOf (const_cast<ThreadPoolJob*> (job));
212 if (index > 0 && ! job->isActive)
213 jobs.move (index, 0);
222 while (contains (job))
227 jobFinishedSignal.wait (2);
236 bool dontWait =
true;
243 if (jobs.contains (job))
247 if (interruptIfRunning)
254 jobs.removeFirstMatchingValue (job);
255 addToDeleteList (deletionList, job);
260 return dontWait || waitForJobToFinish (job, timeOutMs);
274 for (
int i = jobs.size(); --i >= 0;)
276 auto* job = jobs.getUnchecked(i);
278 if (selectedJobsToRemove ==
nullptr || selectedJobsToRemove->
isJobSuitable (job))
282 jobsToWaitFor.
add (job);
284 if (interruptRunningJobs)
285 job->signalJobShouldExit();
290 addToDeleteList (deletionList, job);
301 for (
int i = jobsToWaitFor.
size(); --i >= 0;)
305 if (! isJobRunning (job))
309 if (jobsToWaitFor.
size() == 0)
315 jobFinishedSignal.wait (20);
326 for (
auto* job : jobs)
327 if (job->isActive || ! onlyReturnActiveJobs)
328 s.
add (job->getJobName());
337 for (
auto* t : threads)
338 if (! t->setPriority (newPriority))
351 for (
int i = 0; i < jobs.size(); ++i)
353 if (
auto* job = jobs[i])
360 addToDeleteList (deletionList, job);
365 job->isActive =
true;
375 bool ThreadPool::runNextJob (ThreadPoolThread& thread)
377 if (
auto* job = pickNextJobToRun())
380 thread.currentJob = job;
384 result = job->runJob();
391 thread.currentJob =
nullptr;
398 if (jobs.contains (job))
400 job->isActive =
false;
404 jobs.removeFirstMatchingValue (job);
405 addToDeleteList (deletionList, job);
407 jobFinishedSignal.signal();
412 jobs.move (jobs.indexOf (job), -1);
425 job->shouldStop =
true;
428 if (job->shouldBeDeleted)
429 deletionList.
add (job);
int getNumJobs() const noexcept
bool waitForJobToFinish(const ThreadPoolJob *job, int timeOutMilliseconds) const
void removeListener(Thread::Listener *)
int getNumThreads() const noexcept
Thread(const String &threadName, size_t threadStackSize=0)
void add(const ElementType &newElement)
virtual bool isJobSuitable(ThreadPoolJob *job)=0
ElementType getUnchecked(int index) const
virtual JobStatus runJob()=0
static ThreadPoolJob * getCurrentThreadPoolJob()
bool isJobRunning(const ThreadPoolJob *job) const noexcept
bool removeAllJobs(bool interruptRunningJobs, int timeOutMilliseconds, JobSelector *selectedJobsToRemove=nullptr)
bool threadShouldExit() const
ThreadPoolJob(const String &name)
void addListener(Thread::Listener *)
int size() const noexcept
virtual void exitSignalSent()=0
void moveJobToFront(const ThreadPoolJob *jobToMove) noexcept
ObjectClass * add(ObjectClass *newObject)
StringArray getNamesOfAllJobs(bool onlyReturnActiveJobs) const
bool setThreadPriorities(int newPriority)
String getJobName() const
void signalJobShouldExit()
static Thread *JUCE_CALLTYPE getCurrentThread()
void setJobName(const String &newName)
void addJob(ThreadPoolJob *job, bool deleteJobWhenFinished)
ThreadPoolJob * getJob(int index) const noexcept
void remove(int indexToRemove)
bool removeJob(ThreadPoolJob *job, bool interruptIfRunning, int timeOutMilliseconds)
static int getNumCpus() noexcept
void add(String stringToAdd)
bool contains(const ThreadPoolJob *job) const noexcept
static uint32 getMillisecondCounter() noexcept