美文网首页物联网IOT创意开发
自己动手编写一个熊孩子监视器(U型遥控器,Python 3,多线

自己动手编写一个熊孩子监视器(U型遥控器,Python 3,多线

作者: TypeY | 来源:发表于2020-04-22 11:23 被阅读0次

    本篇所展示的脚本均无加密处理,请在安全的网络环境下使用。本篇所展示内容均开源开放,但其使用务必遵循当地法律法规!

    熊孩子在家上网课偷偷玩电脑?是时候处理一下了!

    借此机会展示一下图传和多线程同时收发的脚本,脚本不是很完善但是希望能做到抛砖引玉。

    目标

    1.捕捉熊孩子的电脑桌面并且发送到USwitch;

    2.通过摄像头观察熊孩子是否在认真听课;

    3.点点手机就可以弹出窗口警告熊孩子认真学习;

    4.合理的资源占用,随时停止发送图像,随时启用/停用摄像头;

    5.多线程,多客户端,简单配对,可以做到父母亲友同时监督 :)

    下面是一些效果图:

    同时启用摄像头和屏幕捕捉 弹出窗口警告 弹出窗口警告

    准备工作

    要有网,有网很重要 (手机和熊孩子的电脑连接同一个路由器,或者你有自己的服务器...)

    0.Windows平台的脚本已经打包好可以直接下载使用,因此你可以直接跳过后边所有的步骤,直接跳到“USwitch端的配置”学习如何使用。(备用下载地址,提取码:5yam)

    1.安装必备的库(windows 在 cmd 执行,linux 在终端执行):

    pip install opencv-python
    pip install pillow
    pip install numpy 
    

    2.在手机上安装U型遥控器

    在手机上安装U型遥控器

    脚本和注释

    脚本的后缀由.py修改为.pyw可以在windows平台上实现无弹窗运行,可以手动设置为开机启动项。(运行后仍可在任务管理器中找到)

    from socket import *
    from threading import Thread
    from PIL import ImageGrab
    from io import BytesIO
    from numpy import array
    from sys import platform
    from math import ceil
    import  cv2
    
    #弹窗警告功能暂时只针对win平台
    if platform == "win32" or platform == "win64":
        ISWIN = True
        import ctypes
    
    Use_WebCam=False
    WebCam_starting=False
    
    
    #接收端ip列表,当接收端地址从其中移除将不再向其发送
    #支持多端接收
    Running=[]
    #发送图像的分辨率
    Res_x=690
    Res_y=360
    
    #配置UDP
    r_host = ''
    r_port = 127  #电脑的接收端口
    s_port = 9921 #手机的接收端,使用4位数的端口(受部分安卓系统限制)
    bufsize = 1024 
    r_addr = (r_host,r_port)
    r_udpServer = socket(AF_INET,SOCK_DGRAM) 
    r_udpServer.bind(r_addr)
    
    #接收线程
    class Rec_Thread(Thread):
        def __init__(self, server: socket):
            super().__init__()
            self.server = server
        def run(self):
            global Running
            global Use_WebCam
            while 1:
                data,addr = self.server.recvfrom(bufsize)
                data=data.decode()
                if data=="exit":
                    Running=[]
                    if Use_WebCam:
                        Use_WebCam=False
                        try:
                            cap.release()
                        except:
                            pass
                    exit(0)
                if data=="start" and not addr[0] in Running:
                    Running.append(addr[0]) #将发来start命令的ip地址添加到列表
                    stream_thread = Send_Thread(addr[0]) #创建发送线程
                    stream_thread.start()
                elif data=="stop" and addr[0] in Running:
                    Running.remove(addr[0])
                elif data=="warning":
                    Msg_Box_thread = Msg_Box(0)
                    Msg_Box_thread.start()
                elif data=="camera" and not WebCam_starting: #摄像头启动有延迟请耐心
                    if Use_WebCam:
                        Use_WebCam=False
                        try:
                            cap.release()
                        except:
                            pass
                    else:
                        Use_WebCam=True
                else:
                    Msg_Box_thread = Msg_Box(1,addr[0],data)
                    Msg_Box_thread.start()
    
                    
    #发送线程
    class  Send_Thread(Thread):
        def __init__(self,host):
            super().__init__()
            self.host = host
            self.addr = (host,s_port)
            self.udpClient = socket(AF_INET,SOCK_DGRAM)
        def run(self):
            while self.host in Running: 
                captureImage = ImageGrab.grab() #捕获屏幕截图
                captureImage = captureImage.resize((Res_x,Res_y))
                #转变为cv格式以便编码发送
                captureImage = cv2.cvtColor(array(captureImage), cv2.COLOR_RGB2BGR)
                #如果启用摄像头将进行图片叠加
                if Use_WebCam:
                    try:
                        get, image_np = cap.read()
                        if get :
                            rows, cols = image_np.shape[:2]
                            captureImage[:rows, :cols]=image_np
                        elif not WebCam_starting:
                            Cam_thread = Set_Webcam()
                            Cam_thread.start()
                    except:
                        if not WebCam_starting:
                            Cam_thread = Set_Webcam()
                            Cam_thread.start()
    
                #captureImage.resize((690,360))
                data=cv2.imencode(".jpg",captureImage,[cv2.IMWRITE_JPEG_QUALITY, 30])[1].tobytes()
                cut=int(ceil(len(data)/(bufsize)))#计算切片数
                strr="size;"+str(cut)# 通知手机开始接收切片
                self.udpClient.sendto(strr.encode(),self.addr)
                for i in range(cut):
                  self.udpClient.sendto(data[i*int(bufsize):(i+1)*int(bufsize)],self.addr)#切片并且发送
                self.udpClient.sendto(("end").encode(),self.addr)#通知手机显示图片
            self.udpClient.sendto(("clear").encode(),self.addr)#结束,清理手机上显示的图片,V1.2.7以后版本可用
    
    
    #弹窗警告功能
    class  Msg_Box(Thread):
        def __init__(self,Type,addr="",Text=""):
            super().__init__()
            self.Type=Type
            self.addr=addr
            self.Text=Text
        def run(self):
            if ISWIN:
                if self.Type == 0: #警告
                    ctypes.windll.user32.MessageBoxW(0,  "专心学习!", "不专心警告:",16+4096)
                if self.Type == 1: #其他消息
                    ctypes.windll.user32.MessageBoxW(0,  self.Text, "来自:"+self.addr,1+4096)
            
    #异步摄像头启动
    class  Set_Webcam(Thread):
        def run(self):
            global WebCam_starting
            global cap
            WebCam_starting=True
            cap = cv2.VideoCapture(0)
            cap.set(3,int(Res_x/3.0))
            cap.set(4,int(Res_y/3.0))
            WebCam_starting=False
            
    # 获取本机ip
    def get_host_ip():
        ip='127.0.0.1'
        try:
            s=socket(AF_INET,SOCK_DGRAM)
            s.connect(('8.8.8.8',80))
            ip=s.getsockname()[0]
        finally:
            s.close()
        return ip
    
            
        
    
    
    server_thread = Rec_Thread(r_udpServer)
    server_thread.start()
    #提示本机ip便于接收
    Msg_Box_thread = Msg_Box(1,"熊孩子监视器","本机IP:"+get_host_ip()+"\n接收端口:"+str(r_port))
    Msg_Box_thread.start()
    
    
    
    

    理论上,上面的脚本可运行在任何平台,但是弹窗提醒只支持Windows平台,截屏函数只支持Windows和MacOS,其他平台有需要的,可以自行替换对应内容。

    USwitch端的配置

    1.和熊孩子电脑连接同一个路由器

    2.进入编辑模式,点击左上角的“地球”,设置一个全局IP(电脑端脚本运行时会显示那个IP)

    设置全局IP

    3.创建按钮,脚本可以接受"start","stop","warning","camera"等命令,你可以选择性的将它们创建成按钮,由于我们已经设置了全局IP,所以这里每个按钮的的IP将不再重要,相比之下我们更关心端口(127),和发送的消息("start","stop","warning","camera"等)。

    创建按钮g

    4.设置图传,现版本图传在FPSView中,同样这里最重要的是开启图传,把接收端口设置为9921(如脚本),注意接收端口可以更改,
    但是不可以小于四位数,不可以是已经被占用的端口(检查你的会话界面)

    图传设置

    5.去试一下吧!

    同样你可以设置一个会话界面,生成各种自定义的弹窗,这里就不一一介绍了。

    本教程到此就结束:)

    APP下载

    在手机上安装U型遥控器

    相关文章

      网友评论

        本文标题:自己动手编写一个熊孩子监视器(U型遥控器,Python 3,多线

        本文链接:https://www.haomeiwen.com/subject/ewwbihtx.html