Skip to content

Commit

Permalink
ENH: Improve Visual DICOM browser
Browse files Browse the repository at this point in the history
ENH: Clean servers API calls
PERF: Improve queue jobs performances by cleaning mutex use
ENH: Improve inserter jobs log report

Co-authored-by: Andras Lasso <lasso@queensu.ca>
  • Loading branch information
Punzo and lassoan committed Sep 9, 2024
1 parent f3b6cb6 commit e4e1891
Show file tree
Hide file tree
Showing 18 changed files with 301 additions and 191 deletions.
151 changes: 94 additions & 57 deletions Libs/Core/ctkJobScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
// Qt includes
#include <QCoreApplication>
#include <QDebug>
#include <QMutexLocker>
#include <QReadLocker>
#include <QWriteLocker>
#include <QSharedPointer>
#include <QThreadPool>
#include <QUuid>
Expand Down Expand Up @@ -68,20 +69,18 @@ void ctkJobSchedulerPrivate::init()
void ctkJobSchedulerPrivate::queueJobsInThreadPool()
{
Q_Q(ctkJobScheduler);
// NOTE: No need to queue jobs with a signal/slot mechanism, since the mutex makes
// sure that concurrent threads append/clean/delete the jobs map.

if (this->FreezeJobsScheduling)
{
return;
}

// No need to queue jobs with a signal/slot mechanism, since the mutex makes
// sure that concurrent threads append/clean/delete the jobs map.

{
// The QMutexLocker is enclosed within brackets to restrict its scope and
// prevent conflicts with other QMutexLockers within the scheduler's methods.
QMutexLocker locker(&this->QueueMutex);

// The QWriteLocker is enclosed within brackets to restrict its scope and
// prevent conflicts with other QWriteLockers within the scheduler's methods.
QWriteLocker locker(&this->QueueLock);
foreach (QThread::Priority priority, (QList<QThread::Priority>()
<< QThread::Priority::HighestPriority
<< QThread::Priority::HighPriority
Expand Down Expand Up @@ -109,21 +108,24 @@ void ctkJobSchedulerPrivate::queueJobsInThreadPool()
int numberOfRunningJobsWithSameType = this->getSameTypeJobsInThreadPoolQueueOrRunning(job);
if (numberOfRunningJobsWithSameType >= job->maximumConcurrentJobsPerType())
{
continue;
// When the maximum number of concurrent jobs of the same type is reached,
// return early instead of adding more jobs to an already crowded queue.
// This allows the scheduler time to finish the currently running jobs,
// preventing a jobs traffic jam.
return;
}

logger.debug(QString("ctkDICOMScheduler: creating worker for job %1 in thread %2.\n")
.arg(job->jobUID())
.arg(QString::number(reinterpret_cast<quint64>(QThread::currentThreadId())), 16));
.arg(job->jobUID())
.arg(QString::number(reinterpret_cast<quint64>(QThread::currentThreadId())), 16));

QSharedPointer<ctkAbstractWorker> worker = QSharedPointer<ctkAbstractWorker>(job->createWorker());
worker->setScheduler(*q);
this->Workers.insert(job->jobUID(), worker);

job->setStatus(ctkAbstractJob::JobStatus::Queued);
emit q->jobQueued(job->toVariant());

QSharedPointer<ctkAbstractWorker> worker =
QSharedPointer<ctkAbstractWorker>(job->createWorker());
worker->setScheduler(*q);

this->Workers.insert(job->jobUID(), worker);
this->ThreadPool->start(worker.data(), job->priority());
}
}
Expand Down Expand Up @@ -184,16 +186,40 @@ bool ctkJobSchedulerPrivate::insertJob(QSharedPointer<ctkAbstractJob> job)
{"progress", progressConnection},
};

emit q->jobInitialized(job->toVariant());

logger.debug(QString("ctkDICOMScheduler: creating worker for job %1 in thread %2.\n")
.arg(job->jobUID())
.arg(QString::number(reinterpret_cast<quint64>(QThread::currentThreadId())), 16));

QSharedPointer<ctkAbstractWorker> worker;
{
// The QMutexLocker is enclosed within brackets to restrict its scope and
// prevent conflicts with other QMutexLockers within the scheduler's methods.
QMutexLocker locker(&this->QueueMutex);
// The QWriteLocker is enclosed within brackets to restrict its scope and
// prevent conflicts with other QWriteLockers within the scheduler's methods.
QWriteLocker locker(&this->QueueLock);
this->JobsQueue.insert(job->jobUID(), job);
this->JobsConnections.insert(job->jobUID(), connections);

int numberOfRunningJobsWithSameType = this->getSameTypeJobsInThreadPoolQueueOrRunning(job);
if (numberOfRunningJobsWithSameType >= job->maximumConcurrentJobsPerType())
{
return false;
}

logger.debug(QString("ctkDICOMScheduler: creating worker for job %1 in thread %2.\n")
.arg(job->jobUID())
.arg(QString::number(reinterpret_cast<quint64>(QThread::currentThreadId())), 16));

QSharedPointer<ctkAbstractWorker> worker = QSharedPointer<ctkAbstractWorker>(job->createWorker());
worker->setScheduler(*q);
this->Workers.insert(job->jobUID(), worker);

job->setStatus(ctkAbstractJob::JobStatus::Queued);
emit q->jobQueued(job->toVariant());

this->ThreadPool->start(worker.data(), job->priority());
}

emit q->jobInitialized(job->toVariant());
this->queueJobsInThreadPool();
return true;
}

Expand All @@ -205,11 +231,11 @@ bool ctkJobSchedulerPrivate::cleanJob(const QString &jobUID)
.arg(QString::number(reinterpret_cast<quint64>(QThread::currentThreadId()), 16)));

{
// The QMutexLocker is enclosed within brackets to restrict its scope and
// prevent conflicts with other QMutexLockers within the scheduler's methods.
QMutexLocker locker(&this->QueueMutex);
// The QWriteLocker is enclosed within brackets to restrict its scope and
// prevent conflicts with other QWriteLockers within the scheduler's methods.
QWriteLocker locker(&this->QueueLock);
QSharedPointer<ctkAbstractJob> job = this->JobsQueue.value(jobUID);
if (!job || !this->JobsConnections.contains(jobUID))
if (!job)
{
return false;
}
Expand All @@ -228,9 +254,9 @@ void ctkJobSchedulerPrivate::cleanJobs(const QStringList &jobUIDs)

QList<QVariant> dataObjects;
{
// The QMutexLocker is enclosed within brackets to restrict its scope and
// prevent conflicts with other QMutexLockers within the scheduler's methods.
QMutexLocker locker(&this->QueueMutex);
// The QReadLocker is enclosed within brackets to restrict its scope and
// prevent conflicts with other QReadLockers within the scheduler's methods.
QReadLocker locker(&this->QueueLock);

foreach (QString jobUID, jobUIDs)
{
Expand All @@ -256,9 +282,9 @@ bool ctkJobSchedulerPrivate::removeJob(const QString& jobUID)
.arg(QString::number(reinterpret_cast<quint64>(QThread::currentThreadId()), 16)));

{
// The QMutexLocker is enclosed within brackets to restrict its scope and
// prevent conflicts with other QMutexLockers within the scheduler's methods.
QMutexLocker locker(&this->QueueMutex);
// The QWriteLocker is enclosed within brackets to restrict its scope and
// prevent conflicts with other QWriteLockers within the scheduler's methods.
QWriteLocker locker(&this->QueueLock);
QSharedPointer<ctkAbstractJob> job = this->JobsQueue.value(jobUID);
if (!job || !this->JobsConnections.contains(jobUID))
{
Expand All @@ -284,11 +310,10 @@ bool ctkJobSchedulerPrivate::removeJob(const QString& jobUID)
//------------------------------------------------------------------------------
void ctkJobSchedulerPrivate::removeJobs(const QStringList &jobUIDs)
{
QList<QVariant> dataObjects;
{
// The QMutexLocker is enclosed within brackets to restrict its scope and
// prevent conflicts with other QMutexLockers within the scheduler's methods.
QMutexLocker locker(&this->QueueMutex);
// The QWriteLocker is enclosed within brackets to restrict its scope and
// prevent conflicts with other QWriteLockers within the scheduler's methods.
QWriteLocker locker(&this->QueueLock);

foreach (QString jobUID, jobUIDs)
{
Expand All @@ -298,8 +323,6 @@ void ctkJobSchedulerPrivate::removeJobs(const QStringList &jobUIDs)
continue;
}

dataObjects.append(job->toVariant());

QMap<QString, QMetaObject::Connection> connections = this->JobsConnections.value(jobUID);
QObject::disconnect(connections.value("started"));
QObject::disconnect(connections.value("userStopped"));
Expand Down Expand Up @@ -377,7 +400,6 @@ ctkJobScheduler::ctkJobScheduler(ctkJobSchedulerPrivate* pimpl, QObject* parent)
// --------------------------------------------------------------------------
ctkJobScheduler::~ctkJobScheduler()
{
this->setFreezeJobsScheduling(true);
this->stopAllJobs(true);
// stopAllJobs is not main thread blocking. Therefore we need actually
// to wait the jobs to end (either finished or stopped) before closing the application.
Expand All @@ -401,9 +423,9 @@ int ctkJobScheduler::numberOfJobs()
Q_D(ctkJobScheduler);
int numberOfJobs = 0;
{
// The QMutexLocker is enclosed within brackets to restrict its scope and
// prevent conflicts with other QMutexLockers within the scheduler's methods.
QMutexLocker locker(&d->QueueMutex);
// The QReadLocker is enclosed within brackets to restrict its scope and
// prevent conflicts with other QReadLockers within the scheduler's methods.
QReadLocker locker(&d->QueueLock);
numberOfJobs = d->JobsQueue.count();
}
return numberOfJobs;
Expand All @@ -415,9 +437,9 @@ int ctkJobScheduler::numberOfPersistentJobs()
Q_D(ctkJobScheduler);
int numberOfPersistentJobs = 0;
{
// The QMutexLocker is enclosed within brackets to restrict its scope and
// prevent conflicts with other QMutexLockers within the scheduler's methods.
QMutexLocker locker(&d->QueueMutex);
// The QReadLocker is enclosed within brackets to restrict its scope and
// prevent conflicts with other QReadLockers within the scheduler's methods.
QReadLocker locker(&d->QueueLock);
foreach (QSharedPointer<ctkAbstractJob> job, d->JobsQueue)
{
if (job->isPersistent())
Expand All @@ -437,9 +459,9 @@ int ctkJobScheduler::numberOfRunningJobs()

int numberOfRunningJobs = 0;
{
// The QMutexLocker is enclosed within brackets to restrict its scope and
// prevent conflicts with other QMutexLockers within the scheduler's methods.
QMutexLocker locker(&d->QueueMutex);
// The QReadLocker is enclosed within brackets to restrict its scope and
// prevent conflicts with other QReadLockers within the scheduler's methods.
QReadLocker locker(&d->QueueLock);
foreach (QSharedPointer<ctkAbstractJob> job, d->JobsQueue)
{
if (job->status() <= ctkAbstractJob::JobStatus::Running)
Expand Down Expand Up @@ -503,9 +525,9 @@ QSharedPointer<ctkAbstractJob> ctkJobScheduler::getJobSharedByUID(const QString&

QSharedPointer<ctkAbstractJob> job = nullptr;
{
// The QMutexLocker is enclosed within brackets to restrict its scope and
// prevent conflicts with other QMutexLockers within the scheduler's methods.
QMutexLocker locker(&d->QueueMutex);
// The QReadLocker is enclosed within brackets to restrict its scope and
// prevent conflicts with other QReadLockers within the scheduler's methods.
QReadLocker locker(&d->QueueLock);
QMap<QString, QSharedPointer<ctkAbstractJob>>::iterator it = d->JobsQueue.find(jobUID);
if (it == d->JobsQueue.end())
{
Expand Down Expand Up @@ -566,11 +588,12 @@ QStringList ctkJobScheduler::stopAllJobs(bool stopPersistentJobs, bool removeJob
{
Q_D(ctkJobScheduler);

d->FreezeJobsScheduling = true;
QStringList stoppedJobsUIDs;
{
// The QMutexLocker is enclosed within brackets to restrict its scope and
// prevent conflicts with other QMutexLockers within the scheduler's methods.
QMutexLocker locker(&d->QueueMutex);
// The QReadLocker is enclosed within brackets to restrict its scope and
// prevent conflicts with other QReadLockers within the scheduler's methods.
QReadLocker locker(&d->QueueLock);

// Stops jobs without a worker (in waiting, still in main thread).
foreach (QSharedPointer<ctkAbstractJob> job, d->JobsQueue)
Expand All @@ -586,7 +609,7 @@ QStringList ctkJobScheduler::stopAllJobs(bool stopPersistentJobs, bool removeJob
}

QString jobUID = job->jobUID();
if (!d->JobsConnections.contains(jobUID))
if (jobUID.isEmpty() || !d->JobsConnections.contains(jobUID))
{
continue;
}
Expand All @@ -600,6 +623,7 @@ QStringList ctkJobScheduler::stopAllJobs(bool stopPersistentJobs, bool removeJob
QMap<QString, QMetaObject::Connection> connections = d->JobsConnections.value(jobUID);
QObject::disconnect(connections.value("userStopped"));
job->setStatus(ctkAbstractJob::JobStatus::UserStopped);
d->BatchedJobsUserStopped.append(job->toVariant());
stoppedJobsUIDs.append(jobUID);
}
}
Expand All @@ -626,6 +650,13 @@ QStringList ctkJobScheduler::stopAllJobs(bool stopPersistentJobs, bool removeJob
worker->requestCancel();
}

d->FreezeJobsScheduling = false;

if (!d->ThrottleTimer->isActive())
{
d->ThrottleTimer->start(d->ThrottleTimeInterval);
}

return stoppedJobsUIDs;
}

Expand All @@ -641,9 +672,9 @@ void ctkJobScheduler::stopJobsByJobUIDs(const QStringList &jobUIDs, bool removeJ

QStringList initializedStoppedJobsUIDs;
{
// The QMutexLocker is enclosed within brackets to restrict its scope and
// prevent conflicts with other QMutexLockers within the scheduler's methods.
QMutexLocker locker(&d->QueueMutex);
// The QWriteLocker is enclosed within brackets to restrict its scope and
// prevent conflicts with other QWriteLockers within the scheduler's methods.
QWriteLocker locker(&d->QueueLock);

// Stops jobs without a worker (in waiting, still in main thread)
foreach (QSharedPointer<ctkAbstractJob> job, d->JobsQueue)
Expand Down Expand Up @@ -674,6 +705,7 @@ void ctkJobScheduler::stopJobsByJobUIDs(const QStringList &jobUIDs, bool removeJ
QMap<QString, QMetaObject::Connection> connections = d->JobsConnections.value(jobUID);
QObject::disconnect(connections.value("userStopped"));
job->setStatus(ctkAbstractJob::JobStatus::UserStopped);
d->BatchedJobsUserStopped.append(job->toVariant());
initializedStoppedJobsUIDs.append(job->jobUID());
}
}
Expand Down Expand Up @@ -702,6 +734,11 @@ void ctkJobScheduler::stopJobsByJobUIDs(const QStringList &jobUIDs, bool removeJ
worker->requestCancel();
}
}

if (!d->ThrottleTimer->isActive())
{
d->ThrottleTimer->start(d->ThrottleTimeInterval);
}
}

//----------------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion Libs/Core/ctkJobScheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class CTK_CORE_EXPORT ctkJobScheduler : public QObject
int numberOfJobs();
int numberOfPersistentJobs();
int numberOfRunningJobs();
Q_INVOKABLE void addJob(ctkAbstractJob* job);
Q_INVOKABLE virtual void addJob(ctkAbstractJob* job);
Q_INVOKABLE virtual void resetJob(const QString& jobUID);
Q_INVOKABLE virtual void deleteJob(const QString& jobUID);
Q_INVOKABLE virtual void deleteJobs(const QStringList& jobUIDs);
Expand Down
5 changes: 3 additions & 2 deletions Libs/Core/ctkJobScheduler_p.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
#define __ctkJobSchedulerPrivate_h

// Qt includes
#include <QMutex>
#include <QReadWriteLock>
#include <QSharedPointer>
#include <QTimer>
class QThreadPool;
Expand Down Expand Up @@ -61,7 +61,7 @@ class CTK_CORE_EXPORT ctkJobSchedulerPrivate : public QObject
virtual void queueJobsInThreadPool();
virtual void clearBactchedJobsLists();

QMutex QueueMutex;
QReadWriteLock QueueLock;

int RetryDelay{100};
int MaximumNumberOfRetry{3};
Expand All @@ -71,6 +71,7 @@ class CTK_CORE_EXPORT ctkJobSchedulerPrivate : public QObject
QMap<QString, QSharedPointer<ctkAbstractJob>> JobsQueue;
QMap<QString, QMap<QString, QMetaObject::Connection>> JobsConnections;
QMap<QString, QSharedPointer<ctkAbstractWorker>> Workers;
QMap<QString, int> RunningJobsByJobClass;
QList<QVariant> BatchedJobsStarted;
QList<QVariant> BatchedJobsUserStopped;
QList<QVariant> BatchedJobsFinished;
Expand Down
Loading

0 comments on commit e4e1891

Please sign in to comment.