A colleague tells me that I write blog posts in chat whenever I explain how to write a producer-consumer.

I’m at home now. I don’t do non-public unpaid work. So let’s blog the example I’m making for him.

workplace.h

#ifndef Workplace_H
#define Workplace_H

#include <QObject>
#include <QFuture>
#include <QWaitCondition>
#include <QMutex>
#include <QStack>
#include <QList>
#include <QThread>
#include <QFutureWatcher>

class Workplace;

/// Kind of job carried by a WorkplaceWork item.
enum WorkplaceWorkType {
    WT_INSERTS,  ///< insert job
    WT_QUERY     ///< query job
};

typedef struct {
    WorkplaceWorkType type;
    QList<int> values;
    QString query;
    QFutureInterface<bool> insertIface;
    QFutureInterface<QList<QStringList> > queryIface;
} WorkplaceWork;

// Consumer side of the producer-consumer pair: a thread that takes
// WorkplaceWork items queued through pushWork() and processes them in run().
//
// NOTE(review): no destructor is declared here and nothing visible in this
// file stops or joins the thread before the object is destroyed -- confirm
// that shutdown is handled elsewhere.
class WorkplaceWorker: public QThread {
    Q_OBJECT
public:
    // Creates the worker; the thread itself is not started here.
    WorkplaceWorker(QObject *parent = NULL)
        : QThread(parent), m_running(false) { }
    // Thread body: waits for queued work, processes it, reports the result
    // through the work item's future interface and deletes the item.
    void run() Q_DECL_OVERRIDE;
    // Queues a_work for processing and wakes the thread, starting it on
    // demand. The item is deleted by run() once it has been processed, so
    // the caller must not use it after this call.
    void pushWork(WorkplaceWork *a_work);
private:
    // Pending work items; accessed with m_mutex held.
    QStack<WorkplaceWork*> m_ongoing;
    // Protects m_ongoing; run() also holds it while updating m_running.
    QMutex m_mutex;
    // Signalled by pushWork() when an item has been queued.
    QWaitCondition m_waitCondition;
    // Set to true by run(); never cleared in the code visible here.
    bool m_running;
};

// Producer-side facade: wraps requests into WorkplaceWork items, hands them
// to the owned WorkplaceWorker and exposes the results either synchronously
// or as QFuture objects.
class Workplace: public QObject {
    Q_OBJECT
public:
    explicit Workplace(QObject *a_parent=0) : QObject (a_parent) {}
    // Queues an insert job and blocks until the worker reports its result.
    bool insert(QList<int> a_values);
    // Queues a query job and blocks until the worker reports its result.
    QList<QStringList> query(const QString &a_param);
    // Queues an insert job and returns immediately with a future for its result.
    QFuture<bool> insertAsync(QList<int> a_values);
    // Queues a query job and returns immediately with a future for its result.
    QFuture<QList<QStringList> > queryAsync(const QString &a_param);
private:
    // Worker thread that consumes the queued jobs.
    WorkplaceWorker m_worker;
};

// Small demo driver that exercises both the asynchronous and the blocking
// Workplace API.
class App: public QObject {
    Q_OBJECT
public slots:
    // Submits a batch of asynchronous inserts, then runs blocking
    // insert/query pairs and logs their results.
    void perform();
    // Connected to the finished() signal of the watchers created in
    // perform(); disposes of the watcher that emitted the signal.
    void onFinished();
private:
    // The work queue front-end used by perform().
    Workplace m_workplace;
};

#endif// Workplace_H

workplace.cpp

#include "workplace.h"

void App::onFinished()
{
    QFutureWatcher<bool> *watcher = static_cast<QFutureWatcher<bool>* > ( sender() );
    delete watcher;
}

/**
 * Demo run: first fires ten asynchronous inserts, each observed by its own
 * heap-allocated QFutureWatcher (released in onFinished()), then performs
 * ten blocking insert + query round-trips and logs each result.
 */
void App::perform()
{
    const int rounds = 10;

    // Every request in this demo carries the same payload.
    QList<int> vals;
    vals << 1 << 2;

    // Phase 1: asynchronous inserts.
    for (int round = 0; round < rounds; ++round) {
        QFutureWatcher<bool> *watcher = new QFutureWatcher<bool>;
        connect (watcher, &QFutureWatcher<bool>::finished, this, &App::onFinished);
        watcher->setFuture( m_workplace.insertAsync( vals ) );
    }

    // Phase 2: blocking calls; each one returns only once the worker is done.
    for (int round = 0; round < rounds; ++round) {
        qWarning() << m_workplace.insert( vals );
        qWarning() << m_workplace.query("test");
    }
}

/**
 * Queues a job for the worker thread and wakes it up.
 *
 * The thread is started on demand. The worker deletes the job once it has
 * been processed, so the caller must not touch a_work after this call.
 *
 * @param a_work  Heap-allocated job; must not be null.
 */
void WorkplaceWorker::pushWork(WorkplaceWork *a_work)
{
    // Ask QThread whether the thread is running instead of reading
    // m_running, which run() writes on the worker thread under m_mutex.
    if (!isRunning()) {
        start();
    }

    // Released automatically on every exit path.
    QMutexLocker locker(&m_mutex);

    // Queries go to the front of the container, everything else to the back.
    // NOTE(review): run() takes items with QStack::pop(), which presumably
    // removes from the back -- confirm that the resulting order is intended.
    if (a_work->type == WT_QUERY) {
        m_ongoing.push_front( a_work );
    } else {
        m_ongoing.push_back( a_work );
    }

    m_waitCondition.wakeAll();
}

void WorkplaceWorker::run()
{
    m_mutex.lock();
    m_running = true;
    while ( m_running ) {
        m_mutex.unlock();
        m_mutex.lock();
        if ( m_ongoing.isEmpty() ) {
            m_waitCondition.wait(&m_mutex);
        }
        WorkplaceWork *work = m_ongoing.pop();
        m_mutex.unlock();

        // Do work here and report progress
        sleep(1);

        switch (work->type) {
        case WT_QUERY: {
            // Report result here
            QList<QStringList> result;
            QStringList row;
            row.append("abc"); row.append("def");
            result.append(row);
            work->queryIface.reportFinished( &result );
            } break;

        case WT_INSERTS:
        default: {
            // Report result here
            bool result = true;
            work->insertIface.reportFinished( &result );
            } break;
        }

        m_mutex.lock();
        delete work;
    }
    m_mutex.unlock();
}

/**
 * Blocking insert: queues the values through insertAsync() and waits for
 * the worker thread to report the outcome.
 *
 * @param a_values  Values to insert.
 * @return The result reported by the worker for this job.
 */
bool Workplace::insert(QList<int> a_values)
{
    // Reuse the asynchronous path so the job setup lives in one place, and
    // wait on the future directly rather than through a QFutureWatcher.
    QFuture<bool> future = insertAsync( a_values );
    future.waitForFinished();
    return future.result();
}

/**
 * Blocking query: queues the query through queryAsync() and waits for the
 * worker thread to report the rows.
 *
 * @param a_param  Query text.
 * @return The rows reported by the worker for this job.
 */
QList<QStringList> Workplace::query(const QString &a_param)
{
    // Reuse the asynchronous path so the job setup lives in one place, and
    // wait on the future directly rather than through a QFutureWatcher.
    QFuture<QList<QStringList> > future = queryAsync( a_param );
    future.waitForFinished();
    return future.result();
}

/**
 * Non-blocking insert: packages the values into a job, hands it to the
 * worker thread and returns a future for the job's result.
 *
 * @param a_values  Values to insert.
 * @return Future that finishes when the worker has processed the job.
 */
QFuture<bool> Workplace::insertAsync(QList<int> a_values)
{
    WorkplaceWork *job = new WorkplaceWork;
    job->type = WT_INSERTS;
    job->values = a_values;

    QFutureInterface<bool> &iface = job->insertIface;
    iface.reportStarted();

    // Take the future before the hand-off: the worker deletes the job once
    // it is done with it.
    QFuture<bool> pending = iface.future();

    m_worker.pushWork( job );
    return pending;
}

/**
 * Non-blocking query: packages the query text into a job, hands it to the
 * worker thread and returns a future for the resulting rows.
 *
 * @param a_param  Query text.
 * @return Future that finishes when the worker has processed the job.
 */
QFuture<QList<QStringList> > Workplace::queryAsync(const QString &a_param)
{
    WorkplaceWork *job = new WorkplaceWork;
    job->type = WT_QUERY;
    job->query = a_param;

    QFutureInterface<QList<QStringList> > &iface = job->queryIface;
    iface.reportStarted();

    // Take the future before the hand-off: the worker deletes the job once
    // it is done with it.
    QFuture<QList<QStringList> > pending = iface.future();

    m_worker.pushWork( job );
    return pending;
}