datafifo.cpp 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. #include "datafifo.h"
  2. #include <stdio.h>
  3. #include <stdlib.h>
  4. #include <unistd.h>
  5. #include <QJsonObject>
  6. #include <QJsonArray>
  7. #include <QJsonDocument>
  8. #include <QDateTime>
  9. #include <QDebug>
  10. #include <fcntl.h>
  11. #include "logger.h"
  12. #include "defs.h"
  13. #include "appcontext.h"
  14. DataFifo::DataFifo(QObject *parent)
  15. : QObject(parent)
  16. , m_isRunning(false)
  17. , m_recvTimestamp(QDateTime::currentMSecsSinceEpoch()) // 初始化接收数据时间戳
  18. {
  19. connect(&m_thread, &QThread::started, this, &DataFifo::doWork);
  20. moveToThread(&m_thread);
  21. start();
  22. }
  23. void DataFifo::start()
  24. {
  25. m_isRunning = true;
  26. m_thread.start();
  27. }
  28. void DataFifo::stop()
  29. {
  30. m_isRunning = false;
  31. // 打开写端写一个换行,唤醒阻塞的 read
  32. int fd = open(CALCULATE_FIFO_PATH, O_WRONLY | O_NONBLOCK);
  33. if (fd != -1) {
  34. write(fd, "\n", 1);
  35. close(fd);
  36. }
  37. m_thread.exit();
  38. m_thread.wait();
  39. }
  40. const qint64 DataFifo::recvTimestamp()
  41. {
  42. return m_recvTimestamp;
  43. }
  44. void DataFifo::doWork()
  45. {
  46. DataDict &dataDict = AppContext::instance().dataDict();
  47. while(m_isRunning) {
  48. // 1. 创建 FIFO(如果已存在忽略错误)
  49. if (mkfifo(NETPARSE_FIFO_PATH, 0666) == -1) {
  50. if (errno != EEXIST) {
  51. LOG_ERROR("mkfifo error");
  52. return;
  53. }
  54. }
  55. if (mkfifo(CALCULATE_FIFO_PATH, 0666) == -1) {
  56. if (errno != EEXIST) {
  57. LOG_ERROR("mkfifo error");
  58. return;
  59. }
  60. }
  61. // 2. 打开 FIFO 读端
  62. int fd = open(CALCULATE_FIFO_PATH, O_RDONLY);
  63. if (fd == -1) {
  64. LOG_ERROR("open fifo error");
  65. return;
  66. }
  67. FILE *stream = fdopen(fd, "r");
  68. ssize_t n;
  69. char *line = NULL;
  70. size_t len = 0;
  71. // 3. 循环读取
  72. while (m_isRunning) {
  73. n = getline(&line, &len, stream);
  74. if(n > 0) {
  75. m_recvTimestamp = QDateTime::currentMSecsSinceEpoch(); // 更新接收数据时间戳
  76. // LOG_INFO("read size: {}, buffer size: {}, data: {}", n, len, line);
  77. QByteArray byteArray(line, n);
  78. QJsonParseError error;
  79. QJsonDocument doc = QJsonDocument::fromJson(byteArray, &error);
  80. if(error.error == QJsonParseError::NoError && doc.isObject()) {
  81. QJsonObject obj = doc.object();
  82. // dataDict.updateData(obj);
  83. emit updateScreen(obj);
  84. } else {
  85. LOG_ERROR("data fifo: invalid json");
  86. }
  87. } else if (n == 0) {
  88. // FIFO 写端关闭时 read 返回 0
  89. LOG_INFO("writer closed. exiting.");
  90. break;
  91. } else {
  92. LOG_ERROR("read error");
  93. break;
  94. }
  95. }
  96. free(line);
  97. fclose(stream); // 会同时关闭文件描述符
  98. // close(fd);
  99. }
  100. }