Проблемы параллелизма с QThreads. Потоки, получающие один и тот же сигнал, блокируют друг друга


Итак, я работаю над программой, которая обрабатывает видео в режиме реального времени, и у меня возникли некоторые проблемы с потоками, "блокирующими" друг друга.

Моя система устроена примерно так:

         DataSourceThread
             /      
            /        
           /          
     Receiver       Receiver
         /              
        /                 
       /                  
 Processor1            Processor2

(Все эти классы расширяют QThread.)

Таким образом, DataSourceThread извлекает кадры из видеопотока и посылает сигнал, содержащий кадр, в приемники. тип соединения: Qt:: DirectConnection

Приемники в основном получают кадры, отправленные DataSourceThread и если процессор закончит обработку предыдущего кадра, он будет выдавать сигнал, содержащий кадр в процессор. тип соединения: Qt:: QueuedConnection. Если процессор не закончил обработку предыдущего кадра, он просто вернется, не испуская сигнала (пропуская кадры).

Чтобы проверить, работает ли это, все, что я сделал, это заставил Processor1 просто распечатать сообщение, когда он получает кадр, а Processor2 делает QThread::sleep(3); и распечатать сообщение. сообщение.

(приемники также сделают глубокую копию кадра, прежде чем передать его процессорам.)

Ожидаемый результат:

И processor1 необходимо постоянно печатать сообщения. Processor2 должен печатать сообщение каждые 3 секунды.

Задача:

Оба процессора печатают свои сообщения одновременно (каждые 3 секунды). И processor1 ждет, пока Процессор2 делается, прежде чем печатать свое сообщение. Так что выход в значительной степени похож это:

"Message from processor1"
"Message from processor2"
"Message from processor1"
"Message from processor2"
"Message from processor1"
"Message from processor2"

И так далее.

Здесь у меня кончаются идеи., так что любая помощь будет очень признательна!

Редактировать: Вот некоторые из кодов:

Главное.cpp:

int main(int argc, char *argv[])
{
    QApplication app(argc, argv);

    DataSourceThread dataSourceThread;
    dataSourceThread.start();

    GUIThread *guiProcessor = new GUIThread();
    FrameReceiver *guiReceiver = new FrameReceiver(guiProcessor, 0);

    QObject::connect(
        &dataSourceThread, SIGNAL(frameReceived(Frame*)),
        guiReceiver, SLOT(receive(Frame*)),
        Qt::DirectConnection
    );

    DetectorThread *detectorProcessor = new DetectorThread();
    FrameReceiver *detectorReceiver = new FrameReceiver(detectorProcessor, 0);

    QObject::connect(
        &dataSourceThread, SIGNAL(frameReceived(Frame*)),
        detectorReceiver, SLOT(receive(Frame*)),
        Qt::DirectConnection
    );

    return app.exec();
}  

Из DataSourceThread.cpp:

void DataSourceThread::run()
{
    ... stuff ...

    while (true) {
        image = cvQueryFrame(capture);

        if (!image) { 
            qDebug() << QString("Could not capture frame"); 
            continue;
        }

        cvReleaseImage(&temp_image);
        temp_image = cvCreateImage(cvSize(640, 480), image->depth, 3);

        cvResize(image, temp_image, 1);

        frame->lock();
        frame->setImage(temp_image);
        frame->unlock();

        emit frameReceived(frame);

        msleep(1); 
    }
} 

Фреймерсивер.cpp:

FrameReceiver::FrameReceiver(FrameProcessor* processor, QObject *parent) : QThread(parent) {
    m_ready = true;

    m_processor = processor;
    m_processor->start();

    QObject::connect(
        (QObject*)this, SIGNAL(frameReceived(Frame*)),
        m_processor, SLOT(receive(Frame*)), 
        Qt::QueuedConnection
    );

    QObject::connect(
        m_processor, SIGNAL(ready()),
        (QObject*)this, SLOT(processCompleted()),
        Qt::DirectConnection
    ); }

void FrameReceiver::processCompleted() {
    m_ready = true; }

void FrameReceiver::receive(Frame *frame) {
    if (m_ready == true) {
        m_ready = false;
        frame->lock();
        Frame *f = new Frame(*frame);
        frame->unlock();
        emit frameReceived(f);
    } else {
        // SKIPPED THIS FRAME
    }
}

Гуитрид.cpp: (Processor1)

GUIThread::GUIThread(QObject *parent) : FrameProcessor(parent)
{
    m_frame = new Frame();
}

void GUIThread::setFrame(Frame *frame)
{ 
    qDebug() << QString("Guithread received frame");
}    

Фреймпроцессор.cpp

// (The processors extend this class)
void FrameProcessor::receive(Frame *frame)
 {
     setFrame(frame);
     delete frame;
     emit ready();
 }

DetectorThread (Processor2) делает то же самое, что guithread, но с 3 сек сна в setFrame.

1 4

1 ответ:

Я думаю, что часть проблемы заключается в том, что все ваши QObjects принадлежат главному потоку приложения. Это означает, что все они используют один цикл событий для доставки асинхронных сигналов, эффективно сериализуя всю цепочку обработки.

Я думаю, что правильный способ настроить это было бы что-то вроде:

GUIProcessor *guiProcessor = new GUIProcessor();
QThread guiProcessorThread;
guiProcessor.moveToThread(&guiProcessorThread);

FrameReceiver *guiReceiver = new FrameReceiver(guiProcessor, 0);
QThread guiReceiverThread;
guiReceiver.moveToThread(&guiReceiverThread);

guiProcessorThread.start();
guiReceiverThread.start();

Если вы делаете это таким образом, я бы предложил не использовать DirectConnection между потоками, а BlockingQueuedConnection, Если вы хотите убедиться, что текущий кадр обрабатывается до захвата следующий.

Смотрите это: http://labs.qt.nokia.com/2010/06/17/youre-doing-it-wrong/

И это: http://labs.qt.nokia.com/2006/12/04/threading-without-the-headache/

Надеюсь, это поможет!

EDIT: чтобы быть ясным, с моим предложением ваши классы будут наследовать QObject вместо QThread.