fix: fix some error

Description:

Log:
This commit is contained in:
mikigo 2024-05-23 17:46:47 +08:00
parent 3ae6533a6d
commit a22eafb743
9 changed files with 61 additions and 202 deletions

View File

@ -25,6 +25,7 @@ from os import remove
from os import makedirs from os import makedirs
from os import walk from os import walk
from os.path import exists from os.path import exists
from os.path import join
from os.path import splitext from os.path import splitext
from enum import Enum from enum import Enum
from time import sleep from time import sleep
@ -38,16 +39,22 @@ from concurrent.futures import wait
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ALL_COMPLETED from concurrent.futures import ALL_COMPLETED
import letmego
import allure import allure
import pytest import pytest
from _pytest.mark import Mark from _pytest.mark import Mark
from _pytest.terminal import TerminalReporter from _pytest.terminal import TerminalReporter
from funnylog.conf import setting as log_setting from funnylog.conf import setting as log_setting
letmego.conf.setting.PASSWORD = GlobalConfig.PASSWORD try:
letmego.conf.setting.RUNNING_MAN_FILE = f"{GlobalConfig.REPORT_PATH}/_running_man.log" import letmego
letmego.conf.setting.DEBUG = GlobalConfig.LETMEGO_DEBUG
HAS_LETMEGO = True
letmego.conf.setting.PASSWORD = GlobalConfig.PASSWORD
letmego.conf.setting.RUNNING_MAN_FILE = f"{GlobalConfig.REPORT_PATH}/_running_man.log"
letmego.conf.setting.DEBUG = GlobalConfig.LETMEGO_DEBUG
except ModuleNotFoundError:
HAS_LETMEGO = False
log_setting.LOG_FILE_PATH = GlobalConfig.REPORT_PATH log_setting.LOG_FILE_PATH = GlobalConfig.REPORT_PATH
log_setting.CLASS_NAME_STARTSWITH = GlobalConfig.CLASS_NAME_STARTSWITH log_setting.CLASS_NAME_STARTSWITH = GlobalConfig.CLASS_NAME_STARTSWITH
@ -434,8 +441,9 @@ def pytest_collection_modifyitems(session):
_reruns = None _reruns = None
if hasattr(session.config.option, "reruns"): if hasattr(session.config.option, "reruns"):
_reruns = session.config.option.reruns _reruns = session.config.option.reruns
if letmego.read_testcase_running_status(item, reruns=_reruns): if HAS_LETMEGO:
session.items.remove(item) if letmego.read_testcase_running_status(item, reruns=_reruns):
session.items.remove(item)
if (suite_id or task_id) and session.items: if (suite_id or task_id) and session.items:
print("\n即将执行的用例:") print("\n即将执行的用例:")
@ -538,7 +546,7 @@ def pytest_collection_finish(session):
def pytest_runtest_setup(item): def pytest_runtest_setup(item):
if hasattr(item, "execution_count"): if HAS_LETMEGO and hasattr(item, "execution_count"):
letmego.conf.setting.EXECUTION_COUNT = item.execution_count letmego.conf.setting.EXECUTION_COUNT = item.execution_count
print() # 处理首行日志换行的问题 print() # 处理首行日志换行的问题
@ -630,7 +638,7 @@ def pytest_runtest_makereport(item, call):
# 只要是需要数据回填(无论是自动还是手动),都需要写json结果. # 只要是需要数据回填(无论是自动还是手动),都需要写json结果.
write_case_result(item, report) write_case_result(item, report)
if item.config.option.autostart: if HAS_LETMEGO and item.config.option.autostart:
letmego.write_testcase_running_status(item, report) letmego.write_testcase_running_status(item, report)
try: try:
if item.execution_count >= (int(item.config.option.record_failed_case) + 1): if item.execution_count >= (int(item.config.option.record_failed_case) + 1):
@ -760,7 +768,7 @@ def pytest_sessionfinish(session):
): ):
execute[item_name] = default_result execute[item_name] = default_result
json_report_path = f"{GlobalConfig.JSON_REPORT_PATH}/json" json_report_path = join(GlobalConfig.JSON_REPORT_PATH, "json")
if not exists(json_report_path): if not exists(json_report_path):
makedirs(json_report_path) makedirs(json_report_path)
with open(f"{json_report_path}/detail_report.json", "w", encoding="utf-8") as _f: with open(f"{json_report_path}/detail_report.json", "w", encoding="utf-8") as _f:

View File

@ -35,6 +35,7 @@ class Manage:
def __init__(self): def __init__(self):
from src.plugins.mng import trim from src.plugins.mng import trim
from src.plugins.mng import help_tip from src.plugins.mng import help_tip
from src.plugins.mng import SubCmd
trim() trim()
logger(GlobalConfig.LOG_LEVEL) logger(GlobalConfig.LOG_LEVEL)
@ -45,7 +46,6 @@ class Manage:
sys.exit(1) sys.exit(1)
parser = ArgumentParser(epilog=self.__author__) parser = ArgumentParser(epilog=self.__author__)
subparsers = parser.add_subparsers(help="子命令") subparsers = parser.add_subparsers(help="子命令")
from setting.subcmd import SubCmd
sub_parser_remote = subparsers.add_parser(SubCmd.remote.value) sub_parser_remote = subparsers.add_parser(SubCmd.remote.value)
sub_parser_run = subparsers.add_parser(SubCmd.run.value) sub_parser_run = subparsers.add_parser(SubCmd.run.value)

View File

@ -1,12 +0,0 @@
from enum import Enum
from enum import unique
@unique
class SubCmd(Enum):
run = "run"
remote = "remote"
pmsctl = "pmsctl"
csvctl = "csvctl"
startapp = "startapp"
git = "git"

View File

@ -1,13 +1,15 @@
from setting.subcmd import SubCmd from enum import Enum
from enum import unique
from setting.globalconfig import GlobalConfig from setting.globalconfig import GlobalConfig
def help_tip(): def help_tip():
color = "34" color = "32"
return ( return (
f"\033[1;{color}mmanage.py\033[0m 支持 \033[1;{color}m{[i.value for i in SubCmd]}\033[0m 命令, " f'''\033[1;{color}mYouQu\033[0m 支持的子命令:\n '''
f"\n您需要传入一个命令,可以使用 \033[1;{color}m-h\033[0m 或 \033[1;{color}m--help\033[0m 查看每个命令参数的详细使用说明," f'''\033[1;{color}m{str([i.value for i in SubCmd]).replace("'", "").strip("[").strip("]")}\033[0m'''
f"\n比如: \033[1;{color}myouqu manage.py run -h\033[0m \n" f"\n您需要传入其中一个子命令,可以使用 \033[1;{color}m-h\033[0m 或 \033[1;{color}m--help\033[0m 查看每个命令参数的详细使用说明."
f"\n如: \033[1;{color}myouqu manage.py run -h\033[0m \n"
) )
@ -25,8 +27,19 @@ def trim():
say(GlobalConfig.PROJECT_NAME) say(GlobalConfig.PROJECT_NAME)
version_font = "slick" version_font = "slick"
color = "32"
say(GlobalConfig.current_tag, font=version_font, space=False) say(GlobalConfig.current_tag, font=version_font, space=False)
say(f"Code: \033[0;32m{GlobalConfig.GITHUB_URL}\033[0m", font="console", space=False) say(f"Code: \033[1;{color}m{GlobalConfig.GITHUB_URL}\033[0m", font="console", space=False)
say(f"Docs: \033[0;32m{GlobalConfig.DOCS_URL}\033[0m", font="console", space=False) say(f"Docs: \033[1;{color}m{GlobalConfig.DOCS_URL}\033[0m", font="console", space=False)
say(f"PyPI: \033[0;32m{GlobalConfig.PyPI_URL}\033[0m", font="console", space=False) say(f"PyPI: \033[1;{color}m{GlobalConfig.PyPI_URL}\033[0m", font="console", space=False)
say("=" * 60, font="console", space=False) say(f'\033[1;{color}m{"=" * 60}\033[0m', font="console", space=False)
@unique
class SubCmd(Enum):
run = "run"
remote = "remote"
pmsctl = "pmsctl"
csvctl = "csvctl"
startapp = "startapp"
git = "git"

View File

@ -109,14 +109,17 @@ def local_runner(parser, sub_parser_run, cmd_args=None):
Args.slaves.value: args.slaves, Args.slaves.value: args.slaves,
} }
if local_kwargs.get(Args.autostart.value) or GlobalConfig.AUTOSTART: if local_kwargs.get(Args.autostart.value) or GlobalConfig.AUTOSTART:
import letmego try:
import letmego
letmego.conf.setting.PASSWORD = GlobalConfig.PASSWORD letmego.conf.setting.PASSWORD = GlobalConfig.PASSWORD
letmego.register_autostart_service( letmego.register_autostart_service(
user=GlobalConfig.USERNAME, user=GlobalConfig.USERNAME,
working_directory=GlobalConfig.ROOT_DIR, working_directory=GlobalConfig.ROOT_DIR,
cmd=f"pipenv run python manage.py {' '.join(cmd_args)}", cmd=f"pipenv run python manage.py {' '.join(cmd_args)}",
) )
except ModuleNotFoundError:
...
return local_kwargs, args return local_kwargs, args

View File

@ -32,8 +32,6 @@ al_setting.html_title = GlobalConfig.REPORT_TITLE
al_setting.report_name = GlobalConfig.REPORT_NAME al_setting.report_name = GlobalConfig.REPORT_NAME
al_setting.report_language = GlobalConfig.REPORT_LANGUAGE al_setting.report_language = GlobalConfig.REPORT_LANGUAGE
import letmego
from src import logger from src import logger
from src.rtk._base import Args, write_json from src.rtk._base import Args, write_json
from src.requestx import RequestX from src.requestx import RequestX
@ -360,11 +358,11 @@ class LocalRunner:
line=self.line, line=self.line,
) )
json_report_path = f"{GlobalConfig.JSON_REPORT_PATH}/json" json_report_path = join(GlobalConfig.JSON_REPORT_PATH, "json")
with open(f"{json_report_path}/detail_report.json", "r", encoding="utf-8") as _f: with open(f"{json_report_path}/detail_report.json", "r", encoding="utf-8") as _f:
detail_report = json.load(_f) detail_report = json.load(_f)
res = Counter([detail_report.get(i).get("result") for i in detail_report]) res = Counter([detail_report.get(i).get("result") for i in detail_report])
with open(f"{json_report_path}/summarize.json", "r", encoding="utf-8") as _f: with open(f"{json_report_path}/summarize.json", "w", encoding="utf-8") as _f:
_f.write(json.dumps( _f.write(json.dumps(
{ {
"total": sum(res.values()), "total": sum(res.values()),
@ -400,12 +398,15 @@ class LocalRunner:
f"{json_report_path}/" f"{json_report_path}/"
f"result_{self.default.get(Args.app_name.value)}_{GlobalConfig.TIME_STRING}_{GlobalConfig.HOST_IP.replace('.', '')}.json" f"result_{self.default.get(Args.app_name.value)}_{GlobalConfig.TIME_STRING}_{GlobalConfig.HOST_IP.replace('.', '')}.json"
) )
try:
if exists(letmego.conf.setting.RUNNING_MAN_FILE): import letmego
letmego.unregister_autostart_service() if exists(letmego.conf.setting.RUNNING_MAN_FILE):
letmego.clean_running_man( letmego.unregister_autostart_service()
copy_to=f"{GlobalConfig.REPORT_PATH}/_running_man_{GlobalConfig.TIME_STRING}.log" letmego.clean_running_man(
) copy_to=f"{GlobalConfig.REPORT_PATH}/_running_man_{GlobalConfig.TIME_STRING}.log"
)
except ModuleNotFoundError:
...
def install_deb(self): def install_deb(self):
logger.info("安装deb包") logger.info("安装deb包")

View File

@ -11,7 +11,6 @@ env(){
deb_array=( deb_array=(
python3-pip python3-pip
sshpass
scrot scrot
python3-tk python3-tk
python3-pyatspi python3-pyatspi
@ -72,7 +71,6 @@ pip_array=(
allure-custom allure-custom
funnylog funnylog
image-center image-center
letmego
) )
# 裁剪基础环境 # 裁剪基础环境
if [ "${ENV_CUT_FLAG}" = "cut" ]; then if [ "${ENV_CUT_FLAG}" = "cut" ]; then

View File

@ -22,7 +22,6 @@ env(){
deb_array=( deb_array=(
python3-pip python3-pip
python3-tk python3-tk
sshpass
scrot scrot
openjdk-11-jdk-headless openjdk-11-jdk-headless
gir1.2-atspi-2.0 gir1.2-atspi-2.0
@ -36,7 +35,6 @@ env(){
ENV_CUT_FLAG="cut" ENV_CUT_FLAG="cut"
deb_array=( deb_array=(
python3-pip python3-pip
sshpass
openjdk-11-jdk-headless openjdk-11-jdk-headless
) )
fi fi
@ -126,7 +124,6 @@ pip_array=(
pdocr-rpc pdocr-rpc
image-center image-center
allure-custom allure-custom
letmego
) )
if [ "${ENV_CUT_FLAG}" = "cut" ]; then if [ "${ENV_CUT_FLAG}" = "cut" ]; then
@ -192,8 +189,4 @@ sudo chmod +x /usr/bin/youqu
sudo chmod +x /usr/bin/youqu-shell sudo chmod +x /usr/bin/youqu-shell
sudo chmod +x /usr/bin/youqu-rm sudo chmod +x /usr/bin/youqu-rm
#cp --force ${ROOT_DIR}/src/utils/command_complete.sh ${HOME}/.config/
#echo "source ${HOME}/.config/command_complete.sh" >> $HOME/.bashrc
#source $HOME/.bashrc
cd ${ROOT_DIR};youqu manage.py -h cd ${ROOT_DIR};youqu manage.py -h

View File

@ -1,145 +0,0 @@
#!/usr/bin/env python3
# _*_ coding:utf-8 _*_
# SPDX-FileCopyrightText: 2023 UnionTech Software Technology Co., Ltd.
# SPDX-License-Identifier: GPL-2.0-only
# pylint: disable=C0114
# pylint: disable=too-many-locals
import math
from os.path import join
from os.path import dirname
from os.path import abspath
from os.path import exists
from os import makedirs
from time import time
from socketserver import ThreadingMixIn
from xmlrpc.server import SimpleXMLRPCServer
# pylint: disable=import-error
import pypinyin
import cv2 as cv
import numpy as np
class ThreadXMLRPCServer(ThreadingMixIn, SimpleXMLRPCServer):
"""ThreadXMLRPCServer"""
CURRENT_DIR = dirname(abspath(__file__))
def image_put(data):
"""上传图片"""
pic_dir = join(CURRENT_DIR, "pic")
if not exists(pic_dir):
makedirs(pic_dir)
pic_path = join(pic_dir, f"pic_{time()}.png")
# pylint: disable=consider-using-with
handle = open(pic_path, "wb")
handle.write(data.data)
handle.close()
return pic_path
def pinyin(word) -> str:
"""
汉字转化为拼音
:param word: 待转化的汉语字符串
:return: 拼音字符串
"""
_s = ""
for key in pypinyin.pinyin(word, style=pypinyin.NORMAL):
_s += "".join(key)
return _s
def match_image_by_opencv(template_path, source_path, rate=None, multiple=False):
"""
图像识别匹配小图在屏幕中的坐标 x, y
:param image_path: 图像识别目标文件的存放路径
:param rate: 匹配度
:param multiple: 是否返回匹配到的多个目标
:return: 根据匹配度返回坐标
"""
# pylint: disable=I1101,E1101
template = cv.imread(template_path)
# pylint: disable=I1101,E1101
source = cv.imread(source_path)
# pylint: disable=I1101,E1101
result = cv.matchTemplate(source, template, cv.TM_CCOEFF_NORMED)
if not multiple:
# pylint: disable=I1101,E1101
pos_start = cv.minMaxLoc(result)[3]
_x = int(pos_start[0]) + int(template.shape[1] / 2)
_y = int(pos_start[1]) + int(template.shape[0] / 2)
# pylint: disable=I1101,E1101
similarity = cv.minMaxLoc(result)[1]
if similarity < rate:
return False
return _x, _y
# else:
loc = np.where(result >= rate)
tmp_list_out = []
tmp_list_in = []
loc_list = list(zip(*loc))
for i in range(0, len(loc_list) - 1):
tmp_list_in.append(loc_list[i])
if loc_list[i + 1][0] != loc_list[i][0] or (loc_list[i + 1][1] - loc_list[i][1]) > 1:
tmp_list_out.append(tmp_list_in)
tmp_list_in = []
continue
if i == len(loc_list) - 2:
tmp_list_in.append(loc_list[-1])
tmp_list_out.append(tmp_list_in)
result_list = []
x_list, y_list = [], []
if tmp_list_out:
for i in tmp_list_out:
for j in i:
x_list.append(j[1])
y_list.append(j[0])
_x = np.mean(x_list) + int(template.shape[1] / 2)
_y = np.mean(y_list) + int(template.shape[0] / 2)
result_list.append((_x, _y))
x_list, y_list = [], []
result_list.sort(key=lambda x: x[0])
return result_list
return False
def coordinate_distance(start: tuple, end: tuple) -> float:
"""
计算两个坐标之间的直线距离
:param start: 起始坐标
:param end: 终止坐标
:return: 两点之间距离
"""
position_start = np.array(start)
position_end = np.array(end)
position_res = position_end - position_start
return math.hypot(position_res[0], position_res[1])
def translational_coordinates(start: tuple, relative: tuple) -> np.ndarray:
"""
计算坐标平移
:param start: 起始坐标
:param relative: 平移的相对坐标
:return: 平移后的坐标
"""
position_start = np.array(start)
position_end = np.array(relative)
return np.sum([position_start, position_end], axis=0)
if __name__ == "__main__":
server = ThreadXMLRPCServer(("10.8.13.78", 8889), allow_none=True)
server.register_function(image_put, "image_put")
server.register_function(pinyin, "pinyin")
server.register_function(match_image_by_opencv, "match_image_by_opencv")
server.register_function(coordinate_distance, "coordinate_distance")
server.register_function(translational_coordinates, "translational_coordinates")
print("监听客户端请求。。")
server.serve_forever()