#!/usr/bin/python
# -*- coding: UTF-8 -*-
"""
@IDE: PyCharm
@Project: automation
@Author: Alice Wu
@Email: wutong01@corp.netease.com
@File: Log.py
@Time: 2021/6/2 16:33
@Description:
"""
import os
import re
import time
import logging
from datetime import datetime
import threading
class StoppableThread(threading.Thread):
    """Thread base class with a cooperative stop request.

    Subclasses are expected to poll ``self._running`` inside ``run()`` and
    return once it becomes false. ``StopEvent`` is set when ``stop()`` has
    been processed, so other threads can block on it.
    """

    def __init__(self):
        super().__init__()
        # Flag polled by run() implementations; cleared by stop().
        self._running = True
        # Set at the end of stop(); lets other threads wait for the request.
        self.StopEvent = threading.Event()

    def stop(self):
        """Ask the thread to stop and signal ``StopEvent``.

        Any exception raised by ``on_stop()`` still propagates to the caller,
        but the event is set regardless so that waiters are never left
        blocked by a failing hook.
        """
        self._running = False
        try:
            self.on_stop()
        finally:
            self.StopEvent.set()

    def on_stop(self):
        """Hook invoked by ``stop()``; the base implementation does nothing."""
class LogTracker(StoppableThread):
    """Thread that follows a growing log file and dispatches new lines.

    Each line read from the file is decoded as ``gb18030`` (undecodable bytes
    are dropped), handed to ``custom_line()`` and then offered to every
    registered listener via ``notify()``.

    :param id_name: label used in log messages for this tracker.
    :param log_path: path of the file to follow; ``None`` makes ``run()``
        log an error and return.
    :param wait_for_path: when true, ``run()`` waits for the file to appear.
    :param wait_for_timeout: maximum number of seconds to wait for the file.
    """

    LISTENER = {}
    # Seconds to idle between passes when the file produced no new lines.
    POLL_INTERVAL = 0.1

    def __init__(self, id_name, log_path, wait_for_path=True, wait_for_timeout=1800):
        super().__init__()
        self.id_name = id_name
        self.path = log_path
        self.wait_for_path = wait_for_path
        self.wait_for_timeout = wait_for_timeout
        self.find_flag = False
        self._log_pos = 0  # Byte offset up to which the file has been read
        self.listener_list = []  # Registered listener objects, in order
        # Reset seeking position
        self.seek(0)

    def run(self):
        """Follow the file until ``stop()`` is called."""
        if self.path is None:
            logging.error('Path should not be none')
            return

        if self.wait_for_path:
            # Wait for the path to be created, up to wait_for_timeout seconds.
            start_wait = datetime.now()
            while self._running and not os.path.exists(self.path) \
                    and (datetime.now() - start_wait).total_seconds() < self.wait_for_timeout:
                time.sleep(.1)

        # Checked even when wait_for_path is false, so a missing file is
        # reported instead of killing the thread with FileNotFoundError.
        if not os.path.exists(self.path):
            logging.error('%s cannot find path %s' % (self.id_name, self.path))
            return

        # The context manager guarantees the handle is released when the
        # loop ends, including when a listener callback raises.
        with open(self.path, 'rb') as rfb:
            # True once a pass has jumped to the end of a file that was
            # still empty. Without it every pass would re-seek to the end
            # and silently skip the first data ever written to the file.
            tailed_empty_file = False
            while self._running:
                if self._log_pos != 0:
                    rfb.seek(self._log_pos)
                elif tailed_empty_file:
                    rfb.seek(0)
                else:
                    # Position 0 means "start from the current end of file".
                    rfb.seek(0, os.SEEK_END)

                blines = rfb.readlines()
                for bline in blines:
                    try:
                        line = bline.decode('gb18030', 'ignore')
                    except UnicodeError as e:
                        logging.error(e)
                        continue
                    self.custom_line(line)
                    self.notify(line)

                self._log_pos = rfb.tell()
                tailed_empty_file = self._log_pos == 0

                if not blines:
                    # Nothing new: idle instead of spinning at full CPU.
                    # Waiting on StopEvent lets stop() end the pause early.
                    self.StopEvent.wait(self.POLL_INTERVAL)

    def seek(self, pos):
        """Set the byte offset the next read pass resumes from.

        An offset of ``0`` makes the reader jump to the current end of the
        file rather than to its beginning.
        """
        self._log_pos = pos

    def custom_line(self, line):
        """Hook called with every decoded line before listeners are notified."""

    @staticmethod
    def _matches(listener, text):
        """Return whether ``text`` satisfies the listener's regex or keywords."""
        if listener.regex is not None and re.match(listener.regex, text):
            return True
        keywords = listener.keywords
        if isinstance(keywords, list):
            return all(key in text for key in keywords)
        if isinstance(keywords, str):
            return keywords in text
        return False

    def notify(self, line):
        """Trigger every listener whose regex or keywords match ``line``.

        Listeners with ``ignore_case`` set are matched against, and receive,
        the lower-cased line.
        """
        # Iterate over a snapshot: on_trigger() may uninstall the listener,
        # and removing from the list being iterated would skip its successor.
        for listener in list(self.listener_list):
            case_line = line.lower() if listener.ignore_case else line
            if self._matches(listener, case_line):
                listener.on_trigger(case_line)

    def register_listener(self, callback, **kwargs):
        """Create a ``LogListener`` for ``callback`` and start notifying it.

        :param callback: callable invoked with the matching line.
        :param kwargs: forwarded unchanged to ``LogListener``.
        :return: the newly registered listener.
        """
        listener = LogListener(self, callback, **kwargs)
        self.listener_list.append(listener)
        return listener

    def uninstall(self, listener):
        """Stop notifying ``listener``; does nothing if it is not registered."""
        try:
            self.listener_list.remove(listener)
        except ValueError:
            # Already removed, e.g. the listener uninstalled itself from the
            # tracker thread just before the caller gave up on it.
            pass
class LogListener(object):
    """Match criteria plus callback bookkeeping for one tracker subscription.

    :param tracker: object whose ``uninstall(listener)`` is called once the
        listener has used up ``call_times``; may be ``None``.
    :param callback: callable invoked with the triggering line.
    :param kwargs: optional settings:

        * ``regex`` -- pattern stored for the tracker to match against.
        * ``keywords`` -- a single string or a list of strings.
        * ``ignore_case`` -- when true, keywords are stored lower-cased.
        * ``call_after`` -- number of initial triggers to swallow.
        * ``call_times`` -- number of callbacks before the listener removes
          itself; ``None`` means unlimited.
    """

    def __init__(self, tracker, callback, **kwargs):
        self.tracker = tracker
        self.callback = callback
        self.regex = kwargs.get('regex', None)
        self.call_after = kwargs.get('call_after', 0)
        self.call_times = kwargs.get('call_times', None)
        self.ignore_case = kwargs.get('ignore_case', False)
        self.keywords = kwargs.get('keywords', None)
        if self.keywords and self.ignore_case:
            if isinstance(self.keywords, str):
                # Lower-case a lone keyword as a whole; iterating over it
                # would turn it into a list of single characters.
                self.keywords = self.keywords.lower()
            else:
                self.keywords = [key.lower() for key in self.keywords]
        # Every line that triggered this listener, including swallowed ones.
        self.lines = []

    def on_trigger(self, line):
        """Record ``line`` and invoke the callback if the counters allow it."""
        self.lines.append(line)

        # Swallow the first ``call_after`` triggers.
        if self.call_after > 0:
            self.call_after -= 1
            return

        if self.call_times is None:
            self.callback(line)
            return

        if self.call_times > 0:
            self.callback(line)
            self.call_times -= 1
        if self.call_times <= 0:
            # Budget exhausted: detach from the tracker.
            self.uninstall()

    def uninstall(self):
        """Remove this listener from its tracker, if it has one."""
        if self.tracker:
            self.tracker.uninstall(self)
def wait_for_keyword(filename, keyword, timeout=20, is_rematch=False, opfunc=None):
    """Wait until a line matching ``keyword`` is appended to ``filename``.

    A ``LogTracker`` follows the file in a background thread. If ``opfunc``
    is given it is called (after a one-second pause that gives the tracker
    time to start) to perform the action expected to produce the log line.

    :param filename: path of the log file to follow.
    :param keyword: substring to look for, or a regular expression when
        ``is_rematch`` is true.
    :param timeout: seconds to wait for a match, counted from after
        ``opfunc`` has returned.
    :param is_rematch: treat ``keyword`` as a regex instead of a substring.
    :param opfunc: optional zero-argument callable run before waiting.
    :return: the matching line, or ``False`` if the wait timed out.
    :raises Exception: whatever ``opfunc`` raises, after the tracker has
        been stopped.
    """
    f = LogTracker("log", filename)
    f.find_flag = False
    found = threading.Event()

    def callback(line):
        print(f"find line: {line}")
        f.find_flag = line
        found.set()

    # Register before starting the thread so no line can slip through
    # between start-up and registration.
    if is_rematch:
        # Pass the pattern itself: wrapping it in a list makes re.match()
        # raise TypeError inside the tracker thread.
        listener = f.register_listener(callback=callback, regex=keyword, call_times=1)
    else:
        listener = f.register_listener(callback=callback, keywords=[keyword], call_times=1)
    f.start()

    try:
        if opfunc is not None:
            time.sleep(1)
            try:
                opfunc()
            except Exception as error:
                print(error)
                raise

        # Block on the event instead of spinning in a busy loop.
        found.wait(timeout)
        ret = f.find_flag
        if ret is False:
            try:
                f.uninstall(listener)
            except ValueError:
                # The listener fired and removed itself at the last moment.
                pass
        return ret
    finally:
        # Always stop the tracker thread, whichever way this function exits.
        f.stop()