美文网首页
Android 实时监控CPU、MEM、FPS、NET信息

Android 实时监控CPU、MEM、FPS、NET信息

作者: 崔某 | 来源:发表于2021-08-25 11:26 被阅读0次

    1、需求

    在用户操作手机时(或模拟用户操作手机),实时检测手机CPU、FPS、MEM、NET信息,确认此数据变化是否异常

    2、环境

    Win7+Python3+Appium

    3、代码目录说明

    image

    4、运行方式

    4.1 连接手机,并通过 pip install 安装相关第三方库(如 Appium-Python-Client、selenium)

    image

    4.2 修改全局变量PACKAGE_NAME、ACTIVITY_NAME的值(即确认被测应用的包名和类名)

    image

    4.3 重写User类的user_swipe_screen()方法,为读取设备相关信息做准备

    4.4 运行main函数,并输入模拟用户操作的次数

    image

    5、结果说明

    5.1 测试完成后,会在main.py同级目录生成下面文件

    image

    5.2 可以打开csv文件并插入折线图

    image

    image

    image

    image

    源代码:

    # -*- coding: utf-8 -*-
    # @Time    : 2021/8/18 16:50
    # @Author  : Shuangqi
    # @File    : main.py
    import csv
    import os
    import re
    import threading
    import time
    from time import sleep
    from selenium.webdriver.support.wait import WebDriverWait
    from selenium.common.exceptions import NoSuchElementException, TimeoutException
    from appium import webdriver
    
    # Phone model reported by the connected device. strip() (instead of
    # strip('\n')) also removes a stray '\r' that adb may emit on Windows.
    PHONE_MODEL = os.popen("adb shell getprop ro.product.model").read().strip()
    # Android release version of the device; compared against "11" when the
    # Appium capabilities are built, so it must not carry trailing whitespace.
    ANDROID_VERSION = os.popen("adb shell getprop ro.build.version.release").read().strip()

    # Package/Activity of the application under test
    PACKAGE_NAME = "com.ss.android.article.news"
    ACTIVITY_NAME = "com.ss.android.article.news.activity.MainActivity"

    # Version name of the application under test. The first "versionName=..."
    # entry is used; "unknown" is the fallback when none is reported, so the
    # value is always a single token that is safe inside a file name.
    _VERSION_DUMP = os.popen(f'adb shell pm dump {PACKAGE_NAME} | findstr "versionName"').read()
    _VERSION_MATCH = re.search(r"versionName=(\S+)", _VERSION_DUMP)
    APP_VERSION = _VERSION_MATCH.group(1) if _VERSION_MATCH else "unknown"

    print("================================")
    print("【测试信息】")
    print(f"【机型】:{PHONE_MODEL}")
    print(f"【安卓】:Android {ANDROID_VERSION}")
    print(f"【包名】:{PACKAGE_NAME}")
    print(f"【类名】:{ACTIVITY_NAME}")
    print(f"【版本】:{APP_VERSION}")
    print("================================")

    # Number of swipes to perform. It doubles as the shared stop flag: the
    # monitor loops run while it is non-zero, so it must start out positive.
    RUNNING_TIMES = int(input("请输入滑动测试次数:"))
    if RUNNING_TIMES <= 0:
        raise ValueError("滑动测试次数必须为正整数")
    
    
    class Utils(object):
        """Host-side helpers shared by the monitor classes."""

        @staticmethod
        def parse_pid(ps_output, pkg):
            """Extract the PID of ``pkg`` from ``ps`` output.

            A row whose last column equals ``pkg`` exactly is preferred, so a
            helper process such as ``<pkg>:push`` listed earlier is not picked
            by accident. If no exact match exists, the first row mentioning
            ``pkg`` is used.

            :param ps_output: text printed by ``ps`` (PID in the second column)
            :param pkg: package name to look for
            :return: the PID as a string
            :raises IndexError: when no row mentions ``pkg``
            """
            fallback = None
            for line in ps_output.splitlines():
                columns = line.split()
                if len(columns) < 2 or pkg not in line:
                    continue
                if columns[-1] == pkg:
                    return columns[1]
                if fallback is None:
                    fallback = columns[1]
            if fallback is None:
                raise IndexError(f"no running process found for package {pkg!r}")
            return fallback

        # Look up the PID by package name
        @staticmethod
        def get_pid_by_package_name(pkg):
            """Return the PID (as a string) of the process running ``pkg``.

            :raises IndexError: when the package has no running process
            """
            ps_output = os.popen(f'adb shell ps | findstr "{pkg}"').read()
            return Utils.parse_pid(ps_output, pkg)

        # Create a CSV writer
        @staticmethod
        def get_csv_writer(dirs, file_name, field_names):
            """Create ``<dirs>/<file_name>_<timestamp>.csv`` and return a DictWriter.

            The header row is written immediately. The file is opened
            line-buffered: callers only receive the writer and cannot close the
            file, so each row is flushed as soon as it is written.

            :param dirs: output directory (created when missing)
            :param file_name: file name prefix, without extension
            :param field_names: CSV column names
            :return: ``csv.DictWriter`` bound to the new file
            """
            if not os.path.exists(dirs):
                os.makedirs(dirs)
            now_time = time.strftime('%Y%m%d-%H%M%S')
            file_path = os.path.join(dirs, file_name + "_" + now_time + ".csv")
            # buffering=1 -> line buffered; GBK is kept from the original output format.
            csv_file = open(file_path, 'w', buffering=1, newline='', encoding="GBK")
            writer = csv.DictWriter(csv_file, fieldnames=field_names)
            writer.writeheader()
            return writer
    
    
    class Phone(object):
        """Appium session wrapper with element lookup and swipe helpers."""

        # Supported locator strategies: name -> callable(driver, value) returning the element.
        _LOCATORS = {
            "text": lambda driver, value: driver.find_element_by_android_uiautomator(
                'new UiSelector().text("%s")' % value),
            "id": lambda driver, value: driver.find_element_by_id(value),
            "desc": lambda driver, value: driver.find_element_by_accessibility_id(value),
            # xpath example: //*[@text="xxx"]
            "xpath": lambda driver, value: driver.find_element_by_xpath(value),
        }

        def __init__(self):
            """Open the Appium session and cache the screen size."""
            desired_caps = {
                'platformName': 'Android',  # platform name
                'deviceName': 'test',  # device name (any value)
                'platformVersion': ANDROID_VERSION,  # Android version
                'appPackage': PACKAGE_NAME,  # package to launch
                'appActivity': ACTIVITY_NAME,  # activity to launch
                'noReset': False,  # False resets the app state
                # Appium reads this capability in seconds, so 60000 is about
                # 16.7 hours, not one minute.
                'newCommandTimeout': 60000
            }
            if ANDROID_VERSION == "11":
                # Android 11 needs this capability
                desired_caps["automationName"] = "uiautomator2"
            print("Appium 正在初始化...")
            # Create a session through the Appium server on port 4723
            self.driver = webdriver.Remote('http://localhost:4723/wd/hub', desired_caps)
            # Implicit wait of 15 s
            self.driver.implicitly_wait(15)
            # Query the window size once and keep both dimensions
            window_size = self.driver.get_window_size()
            self.window_width = window_size['width']
            self.window_height = window_size['height']
            print("Appium 初始化完成...")

        def _locator_for(self, way):
            """Return the lookup callable for ``way`` or raise TypeError."""
            try:
                return self._LOCATORS[way]
            except KeyError:
                raise TypeError(f"无此定位元素方式:{way},定位方式支持:text/id/desc/xpath") from None

        # Wait for an element
        def wait_element(self, way, value, timeout=10, poll_frequency=0.5):
            """Block until the element can be located.

            :param way:         locator strategy: text/id/desc/xpath
            :param value:       locator value
            :param timeout:     maximum wait (s)
            :param poll_frequency: polling interval (s)
            :return: the located element
            :raises TypeError: for an unsupported ``way``
            :raises TimeoutException: when the element does not appear in time
            """
            locate = self._locator_for(way)
            return WebDriverWait(self.driver, timeout=timeout, poll_frequency=poll_frequency).until(
                lambda driver: locate(driver, value), message=f'【{way}:{value}】等待超时')

        # Find an element
        def find_element(self, way, value):
            """Locate an element right away.

            :param way:     locator strategy: text/id/desc/xpath
            :param value:   locator value
            :return:    the located element
            :raises TypeError: for an unsupported ``way``
            :raises NoSuchElementException: propagated from the driver
            """
            return self._locator_for(way)(self.driver, value)

        # Swipe the screen (relative to the screen size, no coordinates needed)
        def swipe_screen(self, swipe_type):
            """Swipe across the middle of the screen.

            Coordinates are whole pixels derived from the cached window size.

            :param swipe_type: "down" (finger moves from 1/4 to 3/4 of the height),
                "up" (3/4 to 1/4 of the height), "left" (1/4 to 3/4 of the width)
                or "right" (3/4 to 1/4 of the width)
            :raises TypeError: for any other ``swipe_type``
            """
            width, height = int(self.window_width), int(self.window_height)
            gestures = {
                "down": (width // 2, height // 4, width // 2, 3 * height // 4),
                "up": (width // 2, 3 * height // 4, width // 2, height // 4),
                "left": (width // 4, height // 2, 3 * width // 4, height // 2),
                "right": (3 * width // 4, height // 2, width // 4, height // 2),
            }
            if swipe_type not in gestures:
                raise TypeError(f"参数错误:{swipe_type},传入参数:down/up/left/right")
            self.driver.swipe(*gestures[swipe_type])
    
    
    class User(Phone):
        """Simulated user: dismisses the start-up dialogs and swipes the feed."""

        def user_swipe_screen_start(self):
            """Run :meth:`user_swipe_screen` on a background thread."""
            t = threading.Thread(target=self.user_swipe_screen)
            t.start()

        def user_swipe_screen(self):
            """Prepare the app, then swipe up ``RUNNING_TIMES`` times.

            ``RUNNING_TIMES`` is decremented after every swipe; the monitor
            loops poll it and stop when it reaches 0. The driver is always
            closed at the end. Exceptions from the preparation steps propagate
            after that cleanup.
            """
            global RUNNING_TIMES
            try:
                # Preparation needed to reach the main page of the app
                self.wait_element("text", "同意")
                self.find_element("text", "同意").click()
                self.wait_element("text", "允许")
                self.find_element("text", "允许").click()
                # Low-end devices are prompted to enable the smooth mode
                self.find_element("text", "开启").click()
                self.wait_element("text", "流畅模式")
                self.driver.keyevent(4)
                self.wait_element("text", "首页")
                # Swipe the screen; "> 0" also terminates for a non-positive start value
                while RUNNING_TIMES > 0:
                    self.swipe_screen("up")
                    RUNNING_TIMES -= 1
                    print(f"用户滑动次数:{RUNNING_TIMES}")
            finally:
                # Always release the monitor loops, even when preparation failed;
                # otherwise they would wait forever for the counter to reach 0.
                RUNNING_TIMES = 0
                sleep(5)
                self.driver.quit()
                print("Driver成功退出!")
    
    
    class FPS(object):
        """Frame-rate monitor based on the ``dumpsys gfxinfo`` frame counter."""
        TIME = "time"
        FPS = "FPS"
        # Matches the cumulative frame counter line of ``dumpsys gfxinfo``.
        _TOTAL_FRAMES = re.compile(r"Total frames rendered:\s*(\d+)")

        def __init__(self):
            self.is_first = True  # True until a baseline sample has been taken
            self.last_time = 0.0  # timestamp (ns) of the previous sample
            self.last_fps = 0  # frame counter of the previous sample
            self.is_running = True

        def get_fps_info_start(self):
            """Run :meth:`get_fps_info` on a background thread."""
            t = threading.Thread(target=self.get_fps_info)
            t.start()

        @classmethod
        def parse_total_frames(cls, gfxinfo_text):
            """Return the "Total frames rendered" counter, or None when absent.

            :param gfxinfo_text: (possibly filtered) ``dumpsys gfxinfo`` output
            """
            match = cls._TOTAL_FRAMES.search(gfxinfo_text)
            return int(match.group(1)) if match else None

        def get_fps_info(self):
            """Sample the frame counter until ``RUNNING_TIMES`` reaches 0.

            Each CSV row holds the wall-clock time and the frames rendered per
            second since the previous sample. Samples for which no counter is
            reported are skipped.
            """
            fps_dirs = f"./{PHONE_MODEL}/fps/"
            fps_file_name = "fps_" + PHONE_MODEL + "_" + PACKAGE_NAME + "_" + APP_VERSION
            fps_field_names = [self.TIME, self.FPS]
            # /C: makes findstr search for the literal phrase; without it the
            # quoted words are treated as alternatives ("Total" OR "frames").
            fps_command = f'adb shell dumpsys gfxinfo {PACKAGE_NAME} | findstr /C:"Total frames"'
            writer = Utils.get_csv_writer(fps_dirs, fps_file_name, fps_field_names)
            while RUNNING_TIMES != 0:
                if self.is_first:
                    baseline_time = time.time_ns()
                    baseline_frames = self.parse_total_frames(os.popen(fps_command).read())
                    if baseline_frames is None:
                        sleep(0.5)
                        continue
                    self.last_time = baseline_time
                    self.last_fps = baseline_frames
                    self.is_first = False
                current_time = time.time_ns()
                current_fps = self.parse_total_frames(os.popen(fps_command).read())
                time_delta = (current_time - self.last_time) / 1000000000.0
                if current_fps is None or time_delta <= 0:
                    # Nothing usable this round; keep the previous baseline.
                    sleep(0.5)
                    continue
                fps_delta = current_fps - self.last_fps
                self.last_time = current_time
                self.last_fps = current_fps
                now_fps = "{:.2f}".format(fps_delta / time_delta)
                writer.writerow({self.TIME: time.strftime("%H:%M:%S"), self.FPS: now_fps})
                print(f"FPS:【{now_fps}】")
                # Sample every 0.5 s
                sleep(0.5)
    
    
    class CPU(object):
        """Monitor of the process CPU share, derived from /proc jiffy counters."""
        TIME = "time"
        CPU_RATE = "进程CPU占比(%)"

        def get_cpu_info_start(self):
            """Run :meth:`get_cpu_info` on a background thread."""
            t = threading.Thread(target=self.get_cpu_info)
            t.start()

        @staticmethod
        def parse_total_jiffies(stat_text):
            """Sum every counter on the first line of ``/proc/stat``.

            The first token is the ``cpu`` label; all following tokens are
            counters, starting with the user time. Empty input yields 0.0.

            :param stat_text: content of ``/proc/stat``
            :return: total jiffies as a float
            """
            lines = stat_text.splitlines()
            fields = lines[0].split() if lines else []
            # Skip only the label; the first counter (user time) must be included.
            return sum(float(value) for value in fields[1:])

        @staticmethod
        def parse_process_jiffies(stat_text):
            """Return utime + stime from the content of ``/proc/<pid>/stat``.

            The fields are counted after the closing parenthesis of the command
            name, so a command name containing spaces does not shift them.

            :param stat_text: content of ``/proc/<pid>/stat``
            :return: utime + stime as a float
            """
            if ")" in stat_text:
                # After "(comm)": index 0 is the state (field 3), so utime
                # (field 14) and stime (field 15) sit at indexes 11 and 12.
                fields = stat_text.rsplit(")", 1)[1].split()
                return float(fields[11]) + float(fields[12])
            fields = stat_text.split()
            return float(fields[13]) + float(fields[14])

        # Device-wide CPU time
        def get_cpu_usage(self):
            """Read ``/proc/stat`` from the device and return the total jiffies."""
            cpu_path = "/proc/stat"
            info = os.popen(f'adb shell cat {cpu_path}').read()
            return self.parse_total_jiffies(info)

        # CPU time of the monitored process, looked up by PID
        def get_process_cpu_usage(self):
            """Return utime + stime of the process running ``PACKAGE_NAME``."""
            pid = Utils.get_pid_by_package_name(PACKAGE_NAME)
            cpu_path = "/proc/" + pid + "/stat"
            info = os.popen(f'adb shell cat {cpu_path}').read()
            return self.parse_process_jiffies(info)

        # Compute the CPU share
        def get_cpu_info(self):
            """Record the process CPU share until ``RUNNING_TIMES`` reaches 0.

            Every sample compares the process and device-wide jiffy deltas over
            about one second and writes the ratio, clamped to 0-100, as a CSV row.
            """
            cpu_dirs = f"./{PHONE_MODEL}/cpu/"
            cpu_file_name = "cpu_" + PHONE_MODEL + "_" + PACKAGE_NAME + "_" + APP_VERSION
            cpu_field_names = [self.TIME, self.CPU_RATE]
            writer = Utils.get_csv_writer(cpu_dirs, cpu_file_name, cpu_field_names)
            while RUNNING_TIMES != 0:
                start_all_cpu = self.get_cpu_usage()
                start_p_cpu = self.get_process_cpu_usage()
                sleep(1)
                end_all_cpu = self.get_cpu_usage()
                end_p_cpu = self.get_process_cpu_usage()
                total_delta = end_all_cpu - start_all_cpu
                cpu_rate = 0.0
                if total_delta != 0:
                    raw_rate = (end_p_cpu - start_p_cpu) * 100.00 / total_delta
                    cpu_rate = min(max(raw_rate, 0.00), 100.00)
                writer.writerow({self.TIME: time.strftime("%H:%M:%S"), self.CPU_RATE: format(cpu_rate, ".2f")})
                print(f"CPU:【{format(cpu_rate, '.2f')}%】")
                sleep(0.5)
    
    
    class MEM(object):
        """Heap monitor based on ``dumpsys meminfo``."""
        TIME = "time"
        NATIVE_HEAP = "Native Heap(MB)"
        DALVIK_HEAP = "Dalvik Heap(MB)"

        def get_mem_info_start(self):
            """Run :meth:`get_mem_info` on a background thread."""
            t = threading.Thread(target=self.get_mem_info)
            t.start()

        @staticmethod
        def parse_heap_pss_kb(meminfo_text, heap_name):
            """Return the first number of the ``heap_name`` table row.

            Only a row where the heap name is followed by whitespace and a
            number matches, so a "Native Heap:" style line (with a colon) and
            rows of other heaps are ignored.

            :param meminfo_text: (possibly filtered) ``dumpsys meminfo`` output
            :param heap_name: row label, e.g. "Native Heap" or "Dalvik Heap"
            :return: the value as an int, or None when the row is absent
            """
            match = re.search(re.escape(heap_name) + r"\s+(\d+)", meminfo_text)
            return int(match.group(1)) if match else None

        def get_mem_info(self):
            """Record native and Dalvik heap sizes until ``RUNNING_TIMES`` reaches 0.

            Both rows come from a single dumpsys call per sample. The reported
            values are divided by 1000.0 before being written under the MB
            columns. A sample with a missing row is skipped.
            """
            mem_dirs = f"./{PHONE_MODEL}/mem/"
            mem_file_name = "mem_" + PHONE_MODEL + "_" + PACKAGE_NAME + "_" + APP_VERSION
            mem_field_names = [self.TIME, self.NATIVE_HEAP, self.DALVIK_HEAP]
            # /C: searches for the literal phrases. A plain findstr "Dalvik Heap"
            # matches any line containing "Dalvik" OR "Heap", which returned the
            # earlier Native Heap row for the Dalvik value as well.
            mem_command = (f'adb shell dumpsys meminfo {PACKAGE_NAME} '
                           f'| findstr /C:"Native Heap" /C:"Dalvik Heap"')
            writer = Utils.get_csv_writer(mem_dirs, mem_file_name, mem_field_names)
            while RUNNING_TIMES != 0:
                meminfo = os.popen(mem_command).read()
                native_kb = self.parse_heap_pss_kb(meminfo, "Native Heap")
                dalvik_kb = self.parse_heap_pss_kb(meminfo, "Dalvik Heap")
                if native_kb is not None and dalvik_kb is not None:
                    native_pss = format(native_kb / 1000.0, ".2f")
                    dalvik_pss = format(dalvik_kb / 1000.0, ".2f")
                    writer.writerow(
                        {self.TIME: time.strftime("%H:%M:%S"), self.NATIVE_HEAP: native_pss,
                         self.DALVIK_HEAP: dalvik_pss})
                    print(f"native_pss:【{native_pss} MB】")
                    print(f"dalvik_pss:【{dalvik_pss} MB】")
                # Same 1.5 s pause between samples as before (1 s + 0.5 s)
                sleep(1.5)
    
    
    class NET(object):
        """Traffic monitor based on the ``/proc/<pid>/net/dev`` byte counters."""
        TIME = "time"
        DOWN_SPEED = "下载速度(KB/s)"
        UP_SPEED = "上传速度(KB/s)"
        AVERAGE_DOWN_SPEED = "平均下载速度(KB/s)"
        AVERAGE_UP_SPEED = "平均上传速度(KB/s)"
        TOTAL_DOWN_SPEED = "下载总流量(KB)"
        TOTAL_UP_SPEED = "上传总流量(KB)"

        def __init__(self):
            self.is_first = True  # True until a baseline sample has been taken
            self.last_time = 0  # timestamp (ns) of the previous sample
            self.last_net_up = 0  # transmitted bytes at the previous sample
            self.last_net_down = 0  # received bytes at the previous sample
            self.start_time = 0  # timestamp (ns) of the baseline sample
            self.start_net_up = 0  # transmitted bytes at the baseline
            self.start_net_down = 0  # received bytes at the baseline

        def get_net_info_start(self):
            """Run :meth:`get_net_info` on a background thread.

            Matches the ``*_start`` methods of the other monitors. The two
            method bodies used to be swapped, so this call blocked the caller
            and ``get_net_info`` spawned threads of itself without end.
            """
            t = threading.Thread(target=self.get_net_info)
            t.start()

        @staticmethod
        def parse_interface_bytes(net_dev_text, keyword="wlan"):
            """Return ``(received_bytes, transmitted_bytes)`` of an interface.

            The first line whose interface name (the text before ``:``)
            contains ``keyword`` is used. The counters are split after the
            colon, so ``wlan0:123`` without a space parses as well.

            :param net_dev_text: (possibly filtered) content of ``/proc/<pid>/net/dev``
            :param keyword: substring of the interface name to look for
            :return: tuple of two ints, or None when no such line exists
            """
            for line in net_dev_text.splitlines():
                name, colon, counters = line.partition(":")
                if not colon or keyword not in name:
                    continue
                fields = counters.split()
                if len(fields) >= 9:
                    # Column 1 is the received byte count, column 9 the transmitted one.
                    return int(fields[0]), int(fields[8])
            return None

        def get_net_info(self):
            """Record traffic figures until ``RUNNING_TIMES`` reaches 0.

            Each CSV row holds the current and average up/down speeds (KB/s)
            and the totals (KB) since the first sample. Samples for which the
            interface line is missing are skipped.
            """
            net_command = "adb shell cat /proc/" + Utils.get_pid_by_package_name(PACKAGE_NAME) + '/net/dev | findstr "wlan"'
            net_dirs = f"./{PHONE_MODEL}/net/"
            net_file_name = "net_" + PHONE_MODEL + "_" + PACKAGE_NAME + "_" + APP_VERSION
            net_field_names = [self.TIME,
                               self.DOWN_SPEED,
                               self.UP_SPEED,
                               self.AVERAGE_DOWN_SPEED,
                               self.AVERAGE_UP_SPEED,
                               self.TOTAL_DOWN_SPEED,
                               self.TOTAL_UP_SPEED]
            writer = Utils.get_csv_writer(net_dirs, net_file_name, net_field_names)
            while RUNNING_TIMES != 0:
                if self.is_first:
                    baseline_time = time.time_ns()
                    baseline = self.parse_interface_bytes(os.popen(net_command).read())
                    if baseline is None:
                        time.sleep(0.5)
                        continue
                    self.start_time = self.last_time = baseline_time
                    self.start_net_down = self.last_net_down = baseline[0]
                    self.start_net_up = self.last_net_up = baseline[1]
                    self.is_first = False
                current_time = time.time_ns()
                current = self.parse_interface_bytes(os.popen(net_command).read())
                time_delta = (current_time - self.last_time) / 1000000000.0
                time_total = (current_time - self.start_time) / 1000000000.0
                if current is None or time_delta <= 0 or time_total <= 0:
                    # Nothing usable this round; keep the previous baseline.
                    time.sleep(0.5)
                    continue
                current_net_down, current_net_up = current
                net_delta_up = (current_net_up - self.last_net_up) / 1024.0
                net_delta_down = (current_net_down - self.last_net_down) / 1024.0
                net_total_up = (current_net_up - self.start_net_up) / 1024.0
                net_total_down = (current_net_down - self.start_net_down) / 1024.0
                net_speed_up = net_delta_up / time_delta
                net_speed_down = net_delta_down / time_delta
                net_average_speed_up = net_total_up / time_total
                net_average_speed_down = net_total_down / time_total
                writer.writerow({self.TIME: time.strftime("%H:%M:%S"),
                                 self.DOWN_SPEED: "{:.2f}".format(net_speed_down),
                                 self.UP_SPEED: "{:.2f}".format(net_speed_up),
                                 self.AVERAGE_DOWN_SPEED: "{:.2f}".format(net_average_speed_down),
                                 self.AVERAGE_UP_SPEED: "{:.2f}".format(net_average_speed_up),
                                 self.TOTAL_DOWN_SPEED: "{:.2f}".format(net_total_down),
                                 self.TOTAL_UP_SPEED: "{:.2f}".format(net_total_up)
                                 })
                print("下载速度:{:.2f} KB/s".format(net_speed_down))
                print("上传速度:{:.2f} KB/s".format(net_speed_up))
                print("平均下载速度:{:.2f} KB/s".format(net_average_speed_down))
                print("平均上传速度:{:.2f} KB/s".format(net_average_speed_up))
                # Totals are amounts of data, so the unit is KB (as in the CSV header).
                print("下载总流量:{:.0f} KB".format(net_total_down))
                print("上传总流量:{:.0f} KB".format(net_total_up))
                self.last_time = current_time
                self.last_net_up = current_net_up
                self.last_net_down = current_net_down
                time.sleep(0.5)
    
    
    if __name__ == '__main__':
        # Start the simulated user first so the app is being exercised while
        # the monitors below collect their samples.
        simulated_user = User()
        simulated_user.user_swipe_screen_start()

        # (banner, monitor class, name of its start method), in launch order.
        monitor_plan = (
            ("===========【获取FPS】===========", FPS, "get_fps_info_start"),
            ("===========【获取CPU】===========", CPU, "get_cpu_info_start"),
            ("===========【获取MEM】===========", MEM, "get_mem_info_start"),
            ("===========【获取NET】===========", NET, "get_net_info_start"),
        )
        for banner, monitor_class, start_method in monitor_plan:
            print(banner)
            monitor = monitor_class()
            getattr(monitor, start_method)()
    
    

    相关文章

      网友评论

          本文标题:Android 实时监控CPU、MEM、FPS、NET信息

          本文链接:https://www.haomeiwen.com/subject/xopsiltx.html