app.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332
import html
import io
import json
import os
import subprocess
import sys
import threading
import time
import uuid
import zipfile
from datetime import datetime
from queue import Queue, Empty

import gevent
from flask import Flask, render_template, Response, jsonify, request, send_file
from gevent import pywsgi
app = Flask(__name__)

# Resolve the base directory that the uploads/ and scripts/ folders live under.
if getattr(sys, 'frozen', False):
    # Running as a PyInstaller-packaged executable.
    # NOTE(review): sys._MEIPASS is PyInstaller's bundle directory; for
    # one-file builds this is presumably a temporary extraction directory
    # rather than the directory containing the exe — confirm that uploads
    # are meant to be stored there.
    BASE_DIR = sys._MEIPASS
else:
    # Running as a plain Python script: use the directory of this file.
    BASE_DIR = os.path.dirname(os.path.abspath(__file__))

# Configuration
UPLOAD_FOLDER = os.path.join(BASE_DIR, 'uploads')
SCRIPT_FOLDER = os.path.join(BASE_DIR, 'scripts')
# UPLOAD_FOLDER = 'uploads'
# SCRIPT_FOLDER = 'scripts'

# Archive suffixes (without the leading dot) accepted for upload.
ALLOWED_EXTENSIONS = {'tar.gz', 'tgz'}

# Log directory for each software type accepted by /download_logs.
LOG_FOLDERS = {
    'network': '/home/forlinx/Desktop/workspace/TPMFM-A/logs',
    'compute': '/home/forlinx/Desktop/workspace/dataParsing/logs',
    'monitor': '/home/forlinx/monitor/storage/logs'
}

# Create the required directories at import time.
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(SCRIPT_FOLDER, exist_ok=True)

# Global state shared by the request handlers and the worker threads:
#   is_running   - True while an upload is being processed
#   output       - HTML-formatted output lines collected so far
#   process      - the running subprocess.Popen object, or None
#   current_file - name of the most recently accepted upload
execution_data = {
    'is_running': False,
    'output': [],
    'process': None,
    'current_file': None
}
  42. def allowed_file(filename):
  43. """检查文件扩展名是否允许"""
  44. return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS or \
  45. filename.endswith('.tar.gz') or filename.endswith('.tgz')
  46. def run_fixed_shell_script(software_type, uploaded_file_path, output_queue):
  47. """运行固定的shell脚本处理上传的文件"""
  48. try:
  49. # 固定的shell脚本路径
  50. fixed_script_path = os.path.join(SCRIPT_FOLDER, 'run.sh')
  51. # 检查固定脚本是否存在,如果不存在则创建
  52. if not os.path.exists(fixed_script_path):
  53. create_default_script(fixed_script_path)
  54. # 确保脚本有执行权限
  55. os.chmod(fixed_script_path, 0o755)
  56. # 使用Popen执行固定脚本,传入上传的文件作为参数
  57. process = subprocess.Popen(
  58. ['bash', fixed_script_path, software_type, uploaded_file_path],
  59. stdout=subprocess.PIPE,
  60. stderr=subprocess.STDOUT,
  61. text=True,
  62. bufsize=1,
  63. universal_newlines=True
  64. )
  65. execution_data['process'] = process
  66. # 实时读取输出
  67. for line in iter(process.stdout.readline, ''):
  68. if line:
  69. # 清理行尾的换行符,添加HTML换行
  70. clean_line = line.rstrip('\n')
  71. output_queue.put(f"{clean_line}<br>")
  72. process.wait()
  73. return_code = process.returncode
  74. if return_code == 0:
  75. output_queue.put(f"<span style='color: green'>文件处理完成!退出码: {return_code}</span><br>")
  76. else:
  77. output_queue.put(f"<span style='color: red'>文件处理失败!退出码: {return_code}</span><br>")
  78. except Exception as e:
  79. output_queue.put(f"<span style='color: red'>执行错误: {str(e)}</span><br>")
  80. finally:
  81. execution_data['is_running'] = False
  82. execution_data['process'] = None
@app.route('/')
def index():
    """Serve the main page by rendering the ``index.html`` template."""
    return render_template('index.html')
  87. @app.route('/upload', methods=['POST'])
  88. def upload_file():
  89. """上传文件并自动执行处理"""
  90. if execution_data['is_running']:
  91. return jsonify({
  92. 'success': False,
  93. 'message': '当前有任务正在执行,请等待完成'
  94. })
  95. # 检查是否有文件
  96. if 'file' not in request.files:
  97. return jsonify({'success': False, 'message': '没有选择文件'})
  98. file = request.files['file']
  99. # 获取 software_type
  100. software_type = request.form.get('software_type')
  101. if file.filename == '':
  102. return jsonify({'success': False, 'message': '没有选择文件'})
  103. if file and allowed_file(file.filename):
  104. # 生成唯一文件名
  105. # filename = f"{uuid.uuid4().hex}_{file.filename}"
  106. filename = file.filename
  107. filepath = os.path.join(UPLOAD_FOLDER, filename)
  108. # 保存文件
  109. file.save(filepath)
  110. # 重置输出
  111. execution_data['output'] = []
  112. execution_data['is_running'] = True
  113. execution_data['current_file'] = filename
  114. # 创建输出队列
  115. output_queue = Queue()
  116. # 在新线程中执行固定的shell脚本
  117. thread = threading.Thread(
  118. target=run_fixed_shell_script,
  119. args=(software_type, filepath, output_queue),
  120. daemon=True
  121. )
  122. thread.start()
  123. # 启动一个线程来定期从队列中获取输出
  124. def collect_output():
  125. while execution_data['is_running'] or not output_queue.empty():
  126. try:
  127. # 从队列获取输出,最多等待1秒
  128. line = output_queue.get(timeout=1)
  129. execution_data['output'].append(line)
  130. except Empty:
  131. continue
  132. except Exception as e:
  133. print(f"收集输出时出错: {e}")
  134. break
  135. collection_thread = threading.Thread(target=collect_output, daemon=True)
  136. collection_thread.start()
  137. return jsonify({
  138. 'success': True,
  139. 'message': '文件上传成功,开始处理',
  140. 'filename': filename
  141. })
  142. return jsonify({'success': False, 'message': '不支持的文件类型'})
  143. @app.route('/stream')
  144. def stream():
  145. """SSE流式输出"""
  146. def generate():
  147. last_index = 0
  148. while execution_data['is_running']:
  149. # 检查是否有新输出
  150. current_output = execution_data['output']
  151. if len(current_output) > last_index:
  152. # 发送新内容
  153. for i in range(last_index, len(current_output)):
  154. yield f"data: {json.dumps({'output': current_output[i]})}\n\n"
  155. last_index = len(current_output)
  156. time.sleep(0.1) # 短暂休眠减少CPU使用
  157. # 发送最后可能剩余的输出
  158. current_output = execution_data['output']
  159. if len(current_output) > last_index:
  160. for i in range(last_index, len(current_output)):
  161. yield f"data: {json.dumps({'output': current_output[i]})}\n\n"
  162. # 发送结束标志
  163. yield f"data: {json.dumps({'output': '<br><strong>处理完成</strong>'})}\n\n"
  164. return Response(
  165. generate(),
  166. mimetype='text/event-stream',
  167. headers={
  168. 'Cache-Control': 'no-cache',
  169. 'X-Accel-Buffering': 'no'
  170. }
  171. )
  172. @app.route('/status')
  173. def get_status():
  174. """获取执行状态"""
  175. return jsonify({
  176. 'is_running': execution_data['is_running'],
  177. 'current_file': execution_data['current_file'],
  178. 'output_length': len(execution_data['output'])
  179. })
  180. @app.route('/stop')
  181. def stop_execution():
  182. """停止当前执行"""
  183. if execution_data['is_running'] and execution_data['process']:
  184. try:
  185. execution_data['process'].terminate()
  186. execution_data['is_running'] = False
  187. return jsonify({
  188. 'success': True,
  189. 'message': '已停止处理'
  190. })
  191. except Exception as e:
  192. return jsonify({
  193. 'success': False,
  194. 'message': f'停止失败: {str(e)}'
  195. })
  196. return jsonify({
  197. 'success': False,
  198. 'message': '没有正在执行的任务'
  199. })
@app.route('/clear_output', methods=['POST'])
def clear_output():
    """Discard the collected output by replacing the shared list with an empty one."""
    execution_data['output'] = []
    return jsonify({'success': True, 'message': '输出已清空'})
  205. @app.route('/download_logs', methods=['GET'])
  206. def download_logs():
  207. """下载指定软件的日志文件"""
  208. software_type = request.args.get('software', 'network')
  209. if software_type not in LOG_FOLDERS:
  210. return jsonify({
  211. 'success': False,
  212. 'message': '不支持的软件类型'
  213. }), 400
  214. log_folder = LOG_FOLDERS[software_type]
  215. # 检查日志文件夹是否存在
  216. if not os.path.exists(log_folder):
  217. return jsonify({
  218. 'success': False,
  219. 'message': f'日志文件夹不存在: {log_folder}'
  220. }), 404
  221. # 获取所有.log文件
  222. log_files = []
  223. for root, dirs, files in os.walk(log_folder):
  224. for file in files:
  225. if file.endswith('.log'):
  226. log_files.append(os.path.join(root, file))
  227. if not log_files:
  228. return jsonify({
  229. 'success': False,
  230. 'message': '未找到日志文件'
  231. }), 404
  232. # 创建临时ZIP文件
  233. temp_zip = io.BytesIO()
  234. try:
  235. with zipfile.ZipFile(temp_zip, 'w', zipfile.ZIP_DEFLATED) as zipf:
  236. for log_file in log_files:
  237. # 获取相对路径
  238. rel_path = os.path.relpath(log_file, log_folder)
  239. zipf.write(log_file, arcname=rel_path)
  240. temp_zip.seek(0)
  241. # 设置下载文件名
  242. timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
  243. filename = f'{software_type}_logs_{timestamp}.zip'
  244. # 记录下载日志
  245. # download_log_path = os.path.join(log_folder, f'{software_type}_download.log')
  246. # with open(download_log_path, 'a') as f:
  247. # f.write(f'[{datetime.now()}] Logs downloaded: {filename}\n')
  248. # 发送ZIP文件
  249. return send_file(
  250. temp_zip,
  251. mimetype='application/zip',
  252. as_attachment=True,
  253. download_name=filename
  254. )
  255. except Exception as e:
  256. return jsonify({
  257. 'success': False,
  258. 'message': f'创建ZIP文件失败: {str(e)}'
  259. }), 500
if __name__ == '__main__':
    # Print the expected script location and the upload directory at startup.
    # Note: this only prints the path; it does not check that the script exists.
    fixed_script_path = os.path.join(SCRIPT_FOLDER, 'run.sh')
    print(f"固定脚本位置: {fixed_script_path}")
    print(f"上传目录: {UPLOAD_FOLDER}")

    # Flask's built-in development server, kept for reference:
    # app.run(
    # debug=True,
    # host='0.0.0.0',
    # port=5000,
    # threaded=True
    # )

    # Serve the app with gevent's WSGI server on all interfaces, port 5000.
    server = pywsgi.WSGIServer(('0.0.0.0', 5000), app)
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        # Ctrl+C: report and shut the server down.
        print("OTA server stopped by user")
        server.stop()