测试号
微信公众号平台的说明:https://developers.weixin.qq.com/doc/offiaccount/Getting_Started/Overview.html
去注册一个测试号:测试号注册的网页
服务器上
前面的准备工作可以看:阿里云windows服务器配置
服务器可以到这里买:
参考了https://blog.csdn.net/u013205877/article/details/77602853 ,但是原文基于python2,这里已经修改了所有必要的东西。
- 另外有一个专门的框架:https://github.com/offu/WeRoBot
- 一个课上的抢票代码:https://github.com/THUCampus/WeChatTicket/tree/master/wechat
前提:已经打开80端口,已经装好Python,最好还要准备一个域名绑定到这个服务器的公网IP上。
到Windows服务器中,创建一个你想要放你自己的项目的文件夹(这里我的文件夹路径是C:\Users\Administrator\Documents\mp),打开这个文件夹,地址栏输入powershell回车进入
PS C:\Users\Administrator\Documents\mp> python -m venv ./venv
PS C:\Users\Administrator\Documents\mp> .\venv\Scripts\activate
(venv) PS C:\Users\Administrator\Documents\mp> pip install Django==3.0.5
(venv) PS C:\Users\Administrator\Documents\mp> django-admin startproject mprobot
(venv) PS C:\Users\Administrator\Documents\mp> cd mprobot
(venv) PS C:\Users\Administrator\Documents\mp\mprobot> django-admin startapp wechat
之后用VSCode打开这个创建出来的mprobot文件夹。
修改mprobot\settings.py,在INSTALLED_APPS里面加入'wechat',
,修改ALLOWED_HOSTS = ['*']
修改mprobot\urls.py,内容是:
from django.contrib import admin
from django.urls import path
from wechat.views import *
urlpatterns = [
path('admin/', admin.site.urls),
path('wechat/', weixin_main)
]
打开wechat目录下views.py文件,输入如下内容:
import hashlib
import json
from django.utils.encoding import smart_str
from django.views.decorators.csrf import csrf_exempt
from django.http import HttpResponse
#微信服务器推送消息是xml的,根据利用ElementTree来解析出的不同xml内容返回不同的回复信息,就实现了基本的自动回复功能了,也可以按照需求用其他的XML解析方法
import xml.etree.ElementTree as ET
#django默认开启csrf防护,这里使用@csrf_exempt去掉防护
@csrf_exempt
def weixin_main(request):
if request.method == "GET":
#接收微信服务器get请求发过来的参数
signature = request.GET.get('signature', None)
timestamp = request.GET.get('timestamp', None)
nonce = request.GET.get('nonce', None)
echostr = request.GET.get('echostr', None)
#服务器配置中的token
token = 'C0GtrfPTCfY7cBCn94rj'
#把参数放到list中排序后合成一个字符串,再用sha1加密得到新的字符串与微信发来的signature对比,如果相同就返回echostr给服务器,校验通过
hashlist = [token, timestamp, nonce]
hashlist.sort()
hashstr = ''.join([s for s in hashlist])
hashstr = hashlib.sha1(bytes(hashstr, encoding='utf-8')).hexdigest()
if hashstr == signature:
print("OK")
return HttpResponse(echostr)
else:
print("HHH")
return HttpResponse("field")
else:
othercontent = autoreply(request)
return HttpResponse(othercontent)
def autoreply(request):
try:
webData = request.body
xmlData = ET.fromstring(webData)
msg_type = xmlData.find('MsgType').text
ToUserName = xmlData.find('ToUserName').text
FromUserName = xmlData.find('FromUserName').text
CreateTime = xmlData.find('CreateTime').text
MsgType = xmlData.find('MsgType').text
MsgId = xmlData.find('MsgId').text
toUser = FromUserName
fromUser = ToUserName
if msg_type == 'text':
content = "文件已收到"
replyMsg = TextMsg(toUser, fromUser, content)
return replyMsg.send()
elif msg_type == 'image':
content = "图片已收到,谢谢"
replyMsg = TextMsg(toUser, fromUser, content)
return replyMsg.send()
elif msg_type == 'voice':
content = "语音已收到,谢谢"
replyMsg = TextMsg(toUser, fromUser, content)
return replyMsg.send()
elif msg_type == 'video':
content = "视频已收到,谢谢"
replyMsg = TextMsg(toUser, fromUser, content)
return replyMsg.send()
elif msg_type == 'shortvideo':
content = "小视频已收到,谢谢"
replyMsg = TextMsg(toUser, fromUser, content)
return replyMsg.send()
elif msg_type == 'location':
content = "位置已收到,谢谢"
replyMsg = TextMsg(toUser, fromUser, content)
return replyMsg.send()
else:
msg_type == 'link'
content = "链接已收到,谢谢"
replyMsg = TextMsg(toUser, fromUser, content)
return replyMsg.send()
except Exception as e:
return e
class Msg(object):
def __init__(self, xmlData):
self.ToUserName = xmlData.find('ToUserName').text
self.FromUserName = xmlData.find('FromUserName').text
self.CreateTime = xmlData.find('CreateTime').text
self.MsgType = xmlData.find('MsgType').text
self.MsgId = xmlData.find('MsgId').text
import time
class TextMsg(Msg):
def __init__(self, toUserName, fromUserName, content):
self.__dict = dict()
self.__dict['ToUserName'] = toUserName
self.__dict['FromUserName'] = fromUserName
self.__dict['CreateTime'] = int(time.time())
self.__dict['Content'] = content
def send(self):
XmlForm = """
<xml>
<ToUserName><![CDATA[{ToUserName}]]></ToUserName>
<FromUserName><![CDATA[{FromUserName}]]></FromUserName>
<CreateTime>{CreateTime}</CreateTime>
<MsgType><![CDATA[text]]></MsgType>
<Content><![CDATA[{Content}]]></Content>
</xml>
"""
再回到powershell里面,python manage.py rumserver 0.0.0.0:80
之后在测试号平台修改接口配置信息:

下面的JS接口安全域名也顺便改一下,只写域名,不用加wechat
扩展
加上了语义理解(https://open.weixin.qq.com/zh_CN/htmledition/res/assets/smart_lang_protocol.pdf),加了图片回复,并且需要装一下requests库
import hashlib
import json
from django.utils.encoding import smart_str
from django.views.decorators.csrf import csrf_exempt
from django.http import HttpResponse
#微信服务器推送消息是xml的,根据利用ElementTree来解析出的不同xml内容返回不同的回复信息,就实现了基本的自动回复功能了,也可以按照需求用其他的XML解析方法
import xml.etree.ElementTree as ET
import time
import requests
import json
access_token = ""
expire_time = 0
app_id = 'wxb523466a6324ff01'
secret = '90b355162a8bd1e25dff55eec8cd47c4'
#服务器配置中的token
token = 'C0GtrfPTCfY7cBCn94rj'
def get_access_token():
global access_token, expire_time
now = time.time()
if expire_time <= now:
ans = requests.get("https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid={}&secret={}".format(
app_id, secret)).text
ans = json.loads(ans)
access_token = ans["access_token"]
expire_time = ans["expires_in"] + now
return access_token
def send_NLP(text, user):
url = 'https://api.weixin.qq.com/semantic/semproxy/search?access_token={}'.format(get_access_token())
data = {
"query": text,
"city": "北京",
"category": "flight,hotel",
"appid": app_id,
"uid": '' # 2020.04.26晚上尝试填写user会出错
}
ans = requests.post(url, json.dumps(data, ensure_ascii=False).encode())
print(user, type(user))
return ans.text
def send_text(toUserName, fromUserName, content):
XmlForm = """
<xml>
<ToUserName><![CDATA[{ToUserName}]]></ToUserName>
<FromUserName><![CDATA[{FromUserName}]]></FromUserName>
<CreateTime>{CreateTime}</CreateTime>
<MsgType><![CDATA[text]]></MsgType>
<Content><![CDATA[{Content}]]></Content>
</xml>
""".format(ToUserName=toUserName, FromUserName=fromUserName, CreateTime=int(time.time()), Content=content)
return HttpResponse(XmlForm)
def send_img(toUserName, fromUserName, imageID):
XmlForm = """
<xml>
<ToUserName><![CDATA[{ToUserName}]]></ToUserName>
<FromUserName><![CDATA[{FromUserName}]]></FromUserName>
<CreateTime>{CreateTime}</CreateTime>
<MsgType><![CDATA[image]]></MsgType>
<Image>
<MediaId><![CDATA[{Content}]]></MediaId>
</Image>
</xml>
""".format(ToUserName=toUserName, FromUserName=fromUserName, CreateTime=int(time.time()), Content=imageID)
return HttpResponse(XmlForm)
#django默认开启csrf防护,这里使用@csrf_exempt去掉防护
@csrf_exempt
def weixin_main(request):
get_access_token()
if request.method == "GET":
#接收微信服务器get请求发过来的参数
signature = request.GET.get('signature', None)
timestamp = request.GET.get('timestamp', None)
nonce = request.GET.get('nonce', None)
echostr = request.GET.get('echostr', None)
#把参数放到list中排序后合成一个字符串,再用sha1加密得到新的字符串与微信发来的signature对比,如果相同就返回echostr给服务器,校验通过
hashlist = [token, timestamp, nonce]
hashlist.sort()
hashstr = ''.join([s for s in hashlist])
hashstr = hashlib.sha1(bytes(hashstr, encoding='utf-8')).hexdigest()
if hashstr == signature:
print("OK")
return HttpResponse(echostr)
else:
print("HHH")
return HttpResponse("field")
else:
othercontent = autoreply(request)
return HttpResponse(othercontent)
def autoreply(request):
# try:
webData = request.body
xmlData = ET.fromstring(webData)
msg_type = xmlData.find('MsgType').text
ToUserName = xmlData.find('ToUserName').text
FromUserName = xmlData.find('FromUserName').text
CreateTime = xmlData.find('CreateTime').text
MsgType = xmlData.find('MsgType').text
MsgId = xmlData.find('MsgId').text
toUser = FromUserName
fromUser = ToUserName
if msg_type == 'text':
# 价值1个亿的人工智能代码
received_content = xmlData.find('Content').text
content = received_content
if content.startswith("查一下"):
return send_text(toUser, fromUser, send_NLP(content, toUser))
content = content.replace("?", "!")
content = content.replace("?", "!")
content = content.replace("吗", "")
return send_text(toUser, fromUser, content)
elif msg_type == 'image':
# 返回对应的
mediaId = xmlData.find('MediaId').text
return send_img(toUser, fromUser, mediaId)
elif msg_type == 'voice':
content = "语音已收到,谢谢"
return send_text(toUser, fromUser, content)
elif msg_type == 'video':
content = "视频已收到,谢谢"
return send_text(toUser, fromUser, content)
elif msg_type == 'shortvideo':
content = "小视频已收到,谢谢"
return send_text(toUser, fromUser, content)
elif msg_type == 'location':
content = "位置已收到,谢谢"
return send_text(toUser, fromUser, content)
else:
msg_type == 'link'
content = "链接已收到,谢谢"
return send_text(toUser, fromUser, content)
# except Exception as e:
# print("ERROR")
# return e
网友评论