Python 使用 WeChatFerry 搭建部署微信机器人详细教程(更新中)

2024-06-04 5931阅读

下载安装 wcferry 库

通过 pip 快速安装 wcferry

pip install wcferry

免责声明:仅供学习和技术研究使用,不得用于任何商业或非法行为,否则后果自负。

基本原理

当微信收到消息时,抢在微信处理(显示到页面)前,先让工具处理,处理完之后再交还给原来的处理模块。需要发送消息时,模拟微信发送消息,组装好消息体,调用微信发送消息的模块。获取联系人,则是遍历一块特定的内存空间。通过好友验证,则是组装好验证信息,调用微信的验证模块。数据库相关功能,则是通过获取到数据库句柄,基于 sqlite3 的接口来执行。

Python 使用 WeChatFerry 搭建部署微信机器人详细教程(更新中) 第1张

博客地址:微信机器人 DIY 从 0 到 1

Wcf 函数说明

WeChatFerry:一个玩微信的工具(函数说明)

函数名称描述返回类型
cleanup关闭连接 回收资源None
keep_running阻塞进程(让 RPC 一直维持连接)
is_receiving_msg是否已启动接收消息功能bool
get_qrcode获取登录二维码(已经登录则返回空字符串)str
is_login检查登录状态bool
get_self_wxid获取登录账号的 wxidstr
get_msg_types获取所有消息类型Dict
get_contacts获取所有联系人List[Dict]
get_friends获取所有好友List[Dict]
get_dbs获取数据库List[str]
get_tables获取某数据库下的表List[Dict]
get_user_info获取登录账号个人信息Dict
get_audio_msg取语音消息并转成 MP3str
send_text发送文本消息(可 @)int
_download_file下载文件str
_process_path处理路径(如果是网络路径则下载文件)str
send_image发送图片(非线程安全)int
send_file发送文件(非线程安全)int
send_xml发送 XMLint
send_emotion发送表情int
send_rich_text发送富文本消息int
send_pat_msg拍一拍群友int
forward_msg转发消息int
get_msg从消息队列中获取消息WxMsg
enable_receiving_msg允许接收消息bool
enable_recv_msg允许接收消息(旧接口)bool
disable_recv_msg停止接收消息bool
query_sql执行 SQL 查询List[Dict]
accept_new_friend接受好友申请int
refresh_pyq刷新朋友圈int
download_attach下载附件int
get_info_by_wxid通过 wxid 查询微信号昵称等信息dict
revoke_msg撤回消息int
decrypt_image解密图片str
get_ocr_result获取 OCR 结果str
download_image下载图片str
add_chatroom_members添加群成员int
del_chatroom_members删除群成员int
invite_chatroom_members邀请群成员int
get_chatroom_members获取群成员Dict
get_alias_in_chatroom获取群名片str
receive_transfer接收转账int

电脑端检测登录微信

from wcferry import Wcf
wcf = Wcf()

检测微信登陆状态

检查当前 PC 端微信登陆状态?

from wcferry import Wcf
wcf = Wcf()
print(wcf.is_login())

获取登录账号信息

获取当前 PC 端微信账号信息?

from wcferry import Wcf
wcf = Wcf()
print(wcf.get_user_info())

运行结果

{'wxid': 'wxid_***', 'name': '字里行间', 'mobile': '195********', 'home': 'C:\Users\Administrator\Documents\WeChat Files\'}

开辟线程监听群消息

开启线程监听消息:判断是否是群消息?

from queue import Empty
from threading import Thread
from wcferry import Wcf, WxMsg
wcf = Wcf()
def processMsg(msg: WxMsg):
    if msg.from_group():
        print(msg.content)
def enableReceivingMsg():
    def innerWcFerryProcessMsg():
        while wcf.is_receiving_msg():
            try:
                msg = wcf.get_msg()
                processMsg(msg)
            except Empty:
                continue
            except Exception as e:
                print(f"ERROR: {e}")
    wcf.enable_receiving_msg()
    Thread(target=innerWcFerryProcessMsg, name="ListenMessageThread", daemon=True).start()
enableReceivingMsg()
wcf.keep_running()

微信消息属性说明

class WxMsg() 微信消息属性说明

属性说明

字段名类型描述
typeint消息类型 可通过 get_msg_types 获取
idstr消息 id
xmlstr消息 xml 部分
senderstr消息发送人
roomidstr(仅群消息有)群 id
contentstr消息内容
thumbstr视频或图片消息的缩略图路径
extrastr视频或图片消息的路径

消息类型

from wcferry import Wcf
wcf = Wcf()
print(wcf.get_msg_types())
消息类型编号消息类型描述属性
0朋友圈消息int
1文字int
3图片int
34语音int
37好友确认int
40POSSIBLEFRIEND_MSGint
42名片int
43视频int
47石头剪刀布表情图片
48位置int
49共享实时位置、文件、转账、链接int
50VOIPMSGint
51微信初始化int
52VOIPNOTIFYint
53VOIPINVITEint
62小视频int
66微信红包int
9999SYSNOTICEint
10000红包、系统消息int
10002撤回消息int
1048625搜狗表情int
16777265链接int
436207665微信红包int
536936497红包封面int
754974769视频号视频int
771751985视频号名片int
822083633引用消息int
922746929拍一拍int
973078577视频号直播int
974127153商品链接int
975175729视频号直播int
1040187441音乐链接int
1090519089文件int

根据群名称查询群 wxid

特别注意:Wcf 没有提供根据群名称查询群 wxid 功能。我们可以先获取全部联系人数据(微信好友、微信群等等),基于 wxid 进行区分,因为微信群 wxid 后缀都是 “chatroom” 结尾。

from wcferry import Wcf, WxMsg
wcf = Wcf()
wcf_rooms = []
for contact in wcf.get_contacts():
    if contact['wxid'].endswith("chatroom"):
        wcf_rooms.append(contact)
def get_chatroom_roomid(wcf_rooms: list, room_name: str):
    for room in wcf_rooms:
        if room['name'] == room_name:
            return room['wxid']
    return None
room_id = get_chatroom_roomid(wcf_rooms=wcf_rooms, room_name="测试群")

定时发送群文件

如何进行定时发送群文件?通过 aspschedule 第三方库实现定时任务,然后调用 wcf.send_file 函数执行发送文件的消息。

from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime
from wcferry import Wcf
wcf = Wcf()
wcf_rooms = []
for contact in wcf.get_contacts():
    if contact['wxid'].endswith("chatroom"):
        wcf_rooms.append(contact)
def get_chatroom_roomid(wcf_rooms: list, room_name: str):
    for room in wcf_rooms:
        if room['name'] == room_name:
            return room['wxid']
    return None
def schedule_task_job(room_id: str, wcf: Wcf):
    wcf.send_file(path="test.txt", receiver=room_id)
customize_time = "2024-05-09 09:10:10"
customize_room = "唤醒手腕测试群"
run_date = datetime.strptime(customize_time, "%Y-%m-%d %H:%M:%S")
room_id = get_chatroom_roomid(wcf_rooms, customize_room)
scheduler = BackgroundScheduler()
scheduler.add_job(schedule_task_job, args=(room_id, wcf), run_date=run_date)
scheduler.start()
wcf.keep_running()

运行结果

Python 使用 WeChatFerry 搭建部署微信机器人详细教程(更新中) 第2张

监听保存语音消息

from queue import Empty
from threading import Thread
from wcferry import Wcf, WxMsg
wcf = Wcf()
def processMsg(msg: WxMsg):
    if msg.from_group():
        response = wcf.get_audio_msg(id=msg.id, dir=f"audio")
        print("语音地址:" + response)
def enableReceivingMsg():
    def innerWcFerryProcessMsg():
        while wcf.is_receiving_msg():
            try:
                msg = wcf.get_msg()
                processMsg(msg)
            except Empty:
                continue
            except Exception as e:
                print(f"ERROR: {e}")
    wcf.enable_receiving_msg()
    Thread(target=innerWcFerryProcessMsg, name="ListenMessageThread", daemon=True).start()
enableReceivingMsg()
wcf.keep_running()

更新中······


    免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们,邮箱:ciyunidc@ciyunshuju.com。本站只作为美观性配图使用,无任何非法侵犯第三方意图,一切解释权归图片著作权方,本站不承担任何责任。如有恶意碰瓷者,必当奉陪到底严惩不贷!

    目录[+]