#!/usr/bin/env python # -*- coding: utf-8 -*- """ TaskReporter - Task Reporting Tool This script is designed to record and track user task execution. It sends task information (including username, tool name, task name, time saved, time cost) to a specified server for statistics. Usage: #---------------------------------- usage 1: decorator ---------------------------------- from TaskReporter import task_reporter_decorator debug_mode = True @task_reporter_decorator(tool_name='MetaBox', task_name='CheckUV', debug=debug_mode, time_saved=5) def your_function(): # your code #---------------------------------- usage 2: function ---------------------------------- from TaskReporter import task_reporter debug_mode = True def your_function(): start_time = time.time() # your function code task_reporter(tool_name='MetaBox', task_name='CheckUV', debug=debug_mode, time_saved=5, time_cost=round(time.time()-start_time, 3)) """ import os, sys from functools import wraps import time, datetime, json # add lib path to import requests py_version = sys.version_info.major minor_version = sys.version_info.minor version = "py{}{}".format(py_version, minor_version) if py_version in [2,3] and minor_version<=7 else "py38+" lib_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "reporter_lib", version) if lib_path not in sys.path: sys.path.append(lib_path) import requests from requests.adapters import HTTPAdapter from requests.packages.urllib3.util.retry import Retry class TaskReporter: def __init__(self): self.session = requests.Session() self.session.mount('http://', HTTPAdapter(max_retries=3)) self.session.mount('https://', HTTPAdapter(max_retries=3)) self.server_status = {'available': True, 'last_check': 0} self.SERVER_URL = 'https://toolstrack.cgnico.com' self.cache_file = os.path.join(os.environ.get('APPDATA'), 'task_cache.json') def report_task(self, data, is_cached_task=False): # first check and send cached data if not is_cached_task: self._try_send_cached_tasks() task_has_cached = False current_time = time.time() if not self.server_status['available'] and not is_cached_task: if current_time - self.server_status['last_check'] < 10: self._cache_task(data) return False try: response = self.session.post( '{}/api.php'.format(self.SERVER_URL), json=data, timeout=1 ) self.server_status['available'] = True print(response.json()) return response.json() except (requests.exceptions.RequestException, TimeoutError) as e: self.server_status['available'] = False self.server_status['last_check'] = current_time if not task_has_cached and not is_cached_task: self._cache_task(data) task_has_cached = True print("Failed to connect to server, task has been cached\n{}".format(str(e))) # print("Failed to connect to server: {}".format(str(e))) return False def _cache_task(self, data): try: data['timestamp'] = (datetime.datetime.utcnow() + datetime.timedelta(hours=8)).isoformat() cache = self._read_cache() cache.append(data) with open(self.cache_file, 'w', encoding='utf-8') as f: json.dump(cache, f, ensure_ascii=False, indent=2) # print("Task cached: {}".format(self.cache_file)) except Exception as e: print("Cache task failed:{}".format(str(e))) def _read_cache(self): try: if os.path.exists(self.cache_file): with open(self.cache_file, 'r', encoding='utf-8') as f: return json.load(f) return [] except Exception as e: print("Read cache failed: {}".format(str(e))) return [] def _try_send_cached_tasks(self): if not os.path.exists(self.cache_file): return cache = self._read_cache() if not cache: return # print("Find {} cached tasks, try to send again...".format(len(cache))) if not self.server_status['available']: return successful_indices = [] for i, task in enumerate(cache): try: response = self.report_task(task, is_cached_task=True) if response != False: successful_indices.append(i) print("Cached task sent successfully: {}".format(task.get('task_name'))) else: # print("Cached task sent failed") break except Exception as e: print("Send cached task failed: {}".format(str(e))) break if successful_indices: new_cache = [task for i, task in enumerate(cache) if i not in successful_indices] if new_cache: with open(self.cache_file, 'w', encoding='utf-8') as f: json.dump(new_cache, f, ensure_ascii=False, indent=2) print("{} tasks left to send".format(len(new_cache))) else: os.remove(self.cache_file) print("All cached tasks sent") reporter = TaskReporter() def task_reporter_decorator(tool_name, task_name, debug=False, time_saved=0): """ task reporter decorator :param tool_name: name of the tool, eg "MetaBox" :param task_name: name of the task, eg "CheckUV" :param debug: set to True when develop, and set it to False when release the tool to artists :param time_saved: time saved for this task, usually should communicate with artists to get a reasonable value """ # reporter = TaskReporter() def run_and_report(func): @wraps(func) def execute_task(*args, **kwargs): # record start time start_time = time.time() # execute original function try: result = func(*args, **kwargs) return result except Exception as e: raise e finally: # report task - 无论函数执行成功与否都发送报告 success = result != False data= { 'username': os.environ.get('USERNAME'), 'tool_name': tool_name + "(Debug)" if debug else tool_name, 'task_name': task_name, 'time_saved': str(time_saved if success else 0), # 如果失败,节省时间为0 'time_cost': str(round(time.time() - start_time, 3)), # 确保是字符串格式 'timestamp': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # 使用文档中指定的时间格式 'status': "success" if success else "failed" # 添加状态字段 } reporter.report_task(data) return execute_task return run_and_report def task_reporter(tool_name, task_name, debug=False, time_saved=0, time_cost=0): ''' task reporter function :param tool_name: name of the tool, eg "MetaBox" :param task_name: name of the task, eg "CheckUV" :param debug: set to True when develop, and set it to False when release the tool to artists :param time_saved: time saved for this task, usually should communicate with artists to get a reasonable value :param time_cost: time cost for this task, you can get it from time_cost = round(time.time() - start_time, 3) ''' # reporter = TaskReporter() data = { 'username': os.environ.get('USERNAME'), 'tool_name': tool_name + "(Debug)" if debug else tool_name, 'task_name': task_name, 'time_saved': str(time_saved), # 确保是字符串格式 'time_cost': str(round(time_cost, 3)), # 确保是字符串格式 'timestamp': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 使用文档中指定的时间格式 } reporter.report_task(data) if __name__ == '__main__': #---------------------------------- usage 1: decorator ---------------------------------- debug_mode = True @task_reporter_decorator(tool_name='MetaBox', task_name='CheckUV', debug=debug_mode, time_saved=5) def check_uv_dec(): print("Check UV, and report by decorator") #---------------------------------- usage 2: function ---------------------------------- def check_uv_func(): debug_mode = True start_time = time.time() # function code print("Check UV, and report by function") task_reporter(tool_name='MetaBox', task_name='CheckUV', debug=debug_mode, time_saved=5, time_cost=round(time.time()-start_time, 3)) check_uv_dec() check_uv_func()