#include "datafifo.h"

#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

#include <QByteArray>
#include <QDateTime>
#include <QJsonDocument>
#include <QJsonObject>
#include <QJsonParseError>

#include "logger.h"
#include "defs.h"
#include "appcontext.h"

namespace {

/// Creates the FIFO at @p path; an already existing node is not an error.
/// @return true if the FIFO exists after the call, false on any other failure.
bool ensureFifo(const char *path)
{
    return mkfifo(path, 0666) == 0 || errno == EEXIST;
}

} // namespace

/// Constructs the reader, moves it to its own worker thread and starts it.
///
/// NOTE(review): Qt refuses to move an object that has a parent
/// (QObject::moveToThread). If a non-null @p parent is passed, the move
/// fails and doWork() would be delivered to the creating thread instead,
/// blocking it. Confirm that callers pass no parent.
DataFifo::DataFifo(QObject *parent)
    : QObject(parent)
    , m_isRunning(false)
    , m_recvTimestamp(QDateTime::currentMSecsSinceEpoch()) // initial "last data received" timestamp
{
    connect(&m_thread, &QThread::started, this, &DataFifo::doWork);
    moveToThread(&m_thread);
    start();
}

/// Sets the run flag and starts the worker thread, which triggers doWork().
void DataFifo::start()
{
    m_isRunning = true;
    m_thread.start();
}

/// Clears the run flag, wakes the reader if it is blocked on the FIFO, then
/// asks the worker thread to exit and waits for it to finish.
void DataFifo::stop()
{
    m_isRunning = false;

    // Open the write end and push a newline so that a reader blocked in
    // open()/getline() returns and notices the cleared run flag.
    const int fd = open(CALCULATE_FIFO_PATH, O_WRONLY | O_NONBLOCK);
    if (fd != -1) {
        // Best effort: if the write fails (e.g. the pipe is full) the reader
        // already has data pending and will wake up on its own.
        const ssize_t written = write(fd, "\n", 1);
        (void)written;
        close(fd);
    }

    m_thread.exit();
    m_thread.wait();
}

/// Returns the timestamp (ms since epoch) of the most recently received line.
///
/// NOTE(review): the value is written by doWork(); confirm the member's
/// declaration makes cross-thread reads safe.
const qint64 DataFifo::recvTimestamp()
{
    return m_recvTimestamp;
}

/// Worker loop: (re)creates the FIFOs, opens the calculate FIFO for reading
/// and emits updateScreen() for every line that parses as a JSON object.
///
/// When the writer closes its end, the FIFO is reopened and reading resumes
/// for as long as the run flag is set. Failure to create or open the FIFO
/// ends the worker.
///
/// NOTE(review): m_isRunning is modified by stop() from another thread;
/// confirm it is declared as an atomic type.
void DataFifo::doWork()
{
    // Kept for the currently disabled dataDict.updateData(obj) call below.
    DataDict &dataDict = AppContext::instance().dataDict();
    (void)dataDict;

    while (m_isRunning) {
        // 1. Create the FIFOs (an existing FIFO is fine).
        if (!ensureFifo(NETPARSE_FIFO_PATH) || !ensureFifo(CALCULATE_FIFO_PATH)) {
            LOG_ERROR("mkfifo error");
            return;
        }

        // 2. Open the read end; this blocks until a writer opens the FIFO.
        const int fd = open(CALCULATE_FIFO_PATH, O_RDONLY);
        if (fd == -1) {
            LOG_ERROR("open fifo error");
            return;
        }

        FILE *stream = fdopen(fd, "r");
        if (stream == nullptr) {
            // fdopen() failed, so the descriptor is still ours to close.
            LOG_ERROR("fdopen fifo error");
            close(fd);
            return;
        }

        char *line = nullptr;
        size_t capacity = 0;

        // 3. Read line by line until stopped, EOF or error.
        while (m_isRunning) {
            const ssize_t n = getline(&line, &capacity, stream);

            // stop() wakes us with a bare newline; do not treat it as data.
            if (!m_isRunning) {
                break;
            }

            if (n > 0) {
                m_recvTimestamp = QDateTime::currentMSecsSinceEpoch(); // refresh "last data received" timestamp
                // LOG_INFO("read size: {}, buffer size: {}, data: {}", n, capacity, line);

                const QByteArray payload(line, static_cast<int>(n));
                QJsonParseError parseError;
                const QJsonDocument doc = QJsonDocument::fromJson(payload, &parseError);
                if (parseError.error == QJsonParseError::NoError && doc.isObject()) {
                    const QJsonObject obj = doc.object();
                    // dataDict.updateData(obj);
                    emit updateScreen(obj);
                } else {
                    LOG_ERROR("data fifo: invalid json");
                }
                continue;
            }

            // getline() reports both EOF and errors as -1; tell them apart.
            if (feof(stream)) {
                // All writers closed their end; reopen in the outer loop.
                LOG_INFO("writer closed. exiting.");
                break;
            }
            if (errno == EINTR) {
                // Interrupted by a signal: clear the error flag and retry.
                clearerr(stream);
                continue;
            }
            LOG_ERROR("read error");
            break;
        }

        free(line);
        fclose(stream); // also closes the underlying file descriptor
    }
}