# Source code for autoease.utils.log

#!/usr/bin/python
# -*- coding: UTF-8 -*-

"""
@IDE: PyCharm
@Project: automation
@Author: Alice Wu
@Email: wutong01@corp.netease.com
@File: Log.py
@Time: 2021/6/2 16:33
@Description:
"""

import os
import re
import time
import logging
from datetime import datetime
import threading


class StoppableThread(threading.Thread):
    """Thread base class with a cooperative stop flag and a stop event.

    Subclasses are expected to poll ``self._running`` inside ``run()`` and
    may override :meth:`on_stop` to perform extra work when a stop is
    requested.

    Attributes:
        _running: ``True`` until :meth:`stop` is called.
        StopEvent: ``threading.Event`` that is set once :meth:`stop` has run.
    """

    def __init__(self):
        super().__init__()
        self._running = True
        self.StopEvent = threading.Event()

    def stop(self):
        """Request the thread to stop.

        Clears the running flag, invokes the :meth:`on_stop` hook and sets
        ``StopEvent``. The event is set even when the hook raises, so code
        waiting on ``StopEvent`` is never left blocked by a failing hook;
        the hook's exception still propagates to the caller.
        """
        self._running = False
        try:
            self.on_stop()
        finally:
            self.StopEvent.set()

    def on_stop(self):
        """Hook called by :meth:`stop`; the default implementation does nothing."""
        pass
class LogTracker(StoppableThread):
    """Background thread that follows a log file and dispatches new lines.

    Each newly read line is decoded as GB18030, handed to
    :meth:`custom_line`, and then offered to every registered
    :class:`LogListener` through :meth:`notify`.

    Args:
        id_name: Name used in this tracker's log messages.
        log_path: Path of the file to follow.
        wait_for_path: When true, ``run()`` waits for the file to appear.
        wait_for_timeout: Maximum number of seconds to wait for the file.
    """

    # Class-level dict; not referenced by the code in this class.
    LISTENER = {}

    # Seconds to sleep between polls when the file produced no new data.
    # Without a pause the read loop would spin and saturate a CPU core.
    POLL_INTERVAL = 0.05

    def __init__(self, id_name, log_path, wait_for_path=True, wait_for_timeout=1800):
        super().__init__()
        self.id_name = id_name
        self.path = log_path
        self.wait_for_path = wait_for_path
        self.wait_for_timeout = wait_for_timeout
        self.find_flag = False
        self._log_pos = 0  # Absolute offset up to which the file has been read
        # True while the next read must start at the current end of file.
        # Kept separate from _log_pos so that a legitimate offset of 0
        # (empty file) is not mistaken for a "jump to end" request.
        self._tail_from_end = True
        self.listener_list = []  # Registered LogListener objects
        # Reset Seeking Position
        self.seek(0)

    def run(self):
        """Follow the file until :meth:`stop` is called.

        Logs an error and returns when the path is ``None`` or the file
        cannot be found (after the optional wait).
        """
        if self.path is None:
            logging.error('Path should not be none')
            return

        if self.wait_for_path:
            # Wait for the path to be created, up to wait_for_timeout seconds.
            start_wait = datetime.now()
            while (self._running
                   and not os.path.exists(self.path)
                   and (datetime.now() - start_wait).total_seconds() < self.wait_for_timeout):
                time.sleep(.1)

        # Checked in both modes so a missing file is reported instead of
        # raising from open() inside the thread.
        if not os.path.exists(self.path):
            logging.error('%s cannot find path %s', self.id_name, self.path)
            return

        # The context manager closes the handle when the loop ends or raises.
        with open(self.path, 'rb') as rfb:
            while self._running:
                if self._tail_from_end:
                    # seek(offset, whence): whence=2 is relative to end of file.
                    self._tail_from_end = False
                    rfb.seek(0, 2)
                else:
                    rfb.seek(self._log_pos)

                blines = rfb.readlines()
                for bline in blines:
                    try:
                        line = bline.decode('gb18030', 'ignore')
                    except Exception as e:
                        # Best effort: skip a line that cannot be decoded.
                        print(e)
                        continue
                    self.custom_line(line)
                    self.notify(line)
                self._log_pos = rfb.tell()

                if not blines:
                    time.sleep(self.POLL_INTERVAL)

    def seek(self, pos):
        """Set the offset of the next read; ``0`` means the current end of file."""
        self._log_pos = pos
        self._tail_from_end = pos == 0

    def custom_line(self, line):
        """Hook called with every decoded line; the default does nothing."""
        pass

    def notify(self, line):
        """Trigger every registered listener whose filter matches ``line``.

        A listener matches when its ``regex`` matches at the start of the
        line (``re.match``), or, failing that, when all of its list
        ``keywords`` (or its single ``str`` keyword) occur in the line.
        Listeners with ``ignore_case`` are tested against, and triggered
        with, the lower-cased line.
        """
        # Iterate over a snapshot: on_trigger() may uninstall listeners,
        # and mutating the list being iterated would skip entries.
        for listener in list(self.listener_list):
            if listener not in self.listener_list:
                # Removed by an earlier callback during this pass.
                continue
            case_line = line.lower() if listener.ignore_case else line
            if self._matches(listener, case_line):
                listener.on_trigger(case_line)

    @staticmethod
    def _matches(listener, text):
        """Return True when ``text`` satisfies the listener's regex or keywords."""
        regex = listener.regex
        if regex is not None:
            # Accept a single pattern or a list/tuple of patterns; passing a
            # list straight to re.match raises TypeError.
            patterns = regex if isinstance(regex, (list, tuple)) else [regex]
            if any(re.match(pattern, text) for pattern in patterns):
                return True

        keywords = listener.keywords
        if isinstance(keywords, list):
            return all(key in text for key in keywords)
        if isinstance(keywords, str):
            return keywords in text
        return False

    def register_listener(self, callback, **kwargs):
        """Create a :class:`LogListener` for ``callback``, register and return it.

        ``kwargs`` are forwarded unchanged to the ``LogListener`` constructor.
        """
        listener = LogListener(self, callback, **kwargs)
        self.listener_list.append(listener)
        return listener

    def uninstall(self, listener):
        """Remove ``listener``; a listener that is not registered is ignored."""
        try:
            self.listener_list.remove(listener)
        except ValueError:
            # Already removed, e.g. it uninstalled itself after its last call.
            pass
class LogListener(object):
    """Filter plus callback attached to a tracker.

    Args:
        tracker: Object whose ``uninstall(listener)`` is called when this
            listener removes itself; may be falsy to disable that.
        callback: Callable invoked with the matching line.
        **kwargs: Optional settings:
            regex: Pattern stored for the tracker to match against lines.
            call_after: Number of initial triggers to swallow before the
                callback is invoked (default 0).
            call_times: Maximum number of callback invocations; ``None``
                (default) means unlimited.
            ignore_case: When true, keywords are lower-cased (default False).
            keywords: A ``str`` or a list of ``str`` stored for the tracker
                to look for in lines.

    Attributes:
        lines: Every line passed to :meth:`on_trigger`, in order.
    """

    def __init__(self, tracker, callback, **kwargs):
        self.tracker = tracker
        self.callback = callback
        self.regex = kwargs.get('regex', None)
        self.call_after = kwargs.get('call_after', 0)
        self.call_times = kwargs.get('call_times', None)
        self.ignore_case = kwargs.get('ignore_case', False)
        self.keywords = kwargs.get('keywords', None)
        if self.keywords and self.ignore_case:
            if isinstance(self.keywords, str):
                # A plain string must stay a string; iterating it would
                # split the keyword into single characters.
                self.keywords = self.keywords.lower()
            else:
                self.keywords = [key.lower() for key in self.keywords]
        self.lines = []

    def on_trigger(self, line):
        """Record ``line`` and invoke the callback according to the limits.

        The first ``call_after`` triggers are recorded but not forwarded.
        When ``call_times`` is set, the callback runs at most that many
        times and the listener uninstalls itself once the budget is used up.
        """
        self.lines.append(line)

        if self.call_after > 0:
            self.call_after -= 1
            return

        if self.call_times is None:
            self.callback(line)
            return

        if self.call_times > 0:
            self.callback(line)
            self.call_times -= 1
        if self.call_times <= 0:
            self.uninstall()

    def uninstall(self):
        """Ask the owning tracker, if any, to remove this listener."""
        if self.tracker:
            self.tracker.uninstall(self)
def wait_for_keyword(filename, keyword, timeout=20, is_rematch=False, opfunc=None):
    """Wait until a line containing ``keyword`` is appended to ``filename``.

    A :class:`LogTracker` is started on the file, ``opfunc`` (if given) is
    run after a one second pause, and the function then polls until a
    matching line has been seen or ``timeout`` seconds have elapsed.

    Args:
        filename: Path of the log file to follow.
        keyword: Substring to look for, or a regular expression when
            ``is_rematch`` is true.
        timeout: Maximum number of seconds to wait after ``opfunc`` returns.
        is_rematch: When true, ``keyword`` is a pattern applied with
            ``re.match``, i.e. anchored at the start of the line.
        opfunc: Optional zero-argument callable run after the tracker starts.

    Returns:
        The matching line, or ``False`` if none was seen before the timeout.

    Raises:
        Exception: Whatever ``opfunc`` raises is printed and re-raised after
            the tracker has been stopped.
    """
    tracker = LogTracker("log", filename)
    tracker.start()
    tracker.find_flag = False

    def callback(line):
        print(f"find line: {line}")
        tracker.find_flag = line

    if is_rematch:
        # Pass the pattern itself: re.match() rejects a list with TypeError.
        listener = tracker.register_listener(callback=callback, regex=keyword, call_times=1)
    else:
        listener = tracker.register_listener(callback=callback, keywords=[keyword], call_times=1)

    try:
        if opfunc is not None:
            time.sleep(1)
            try:
                opfunc()
            except Exception as error:
                print(error)
                raise

        # The monotonic clock keeps the timeout immune to wall-clock changes.
        deadline = time.monotonic() + timeout
        while tracker.find_flag is False and time.monotonic() < deadline:
            # Short pause so the wait does not spin a CPU core.
            time.sleep(0.01)

        ret = tracker.find_flag
        if ret is False:
            try:
                tracker.uninstall(listener)
            except ValueError:
                # The listener fired and removed itself just after the check.
                pass
        return ret
    finally:
        # Always stop the tracker thread, whatever happened above.
        tracker.stop()