霸王茶姬小程序任务脚本

小程序 0

霸王茶姬小程序任务脚本

小白操作----仅供学习研究参考

在这里插入图片描述

功能: 积分签到

解析

该脚本用于“霸王茶姬小程序”的签到和积分查询操作。通过模拟网络请求登录账号,获取个人信息,执行每日签到,并查询积分情况。支持多账号操作,并能通过微信推送签到结果和相关信息。

RUN这个类是脚本的核心,负责执行各种任务。

  • init: 初始化类,设置用户信息、会话和请求头。

  • personal_info: 获取并验证用户的个人信息,如用户名和手机号。

  • user_sign_statistics: 获取用户的签到信息,并根据签到状态执行签到操作。

  • take_part_in_sign: 执行签到操作并获取签到奖励。

  • points_info: 查询用户的积分信息,包括即将过期的积分。

  • main: 主流程控制,依次执行登录、获取用户信息、签到和查询积分等操作。

  • sendMsg: 发送微信推送通知消息。

辅助函数

  • Log: 记录日志并更新消息内容。

  • down_file: 下载文件并进行必要的文件操作。

  • import_Tools: 导入工具模块并初始化环境。

主函数负责脚本的总体运行流程,包括加载配置、检查环境、执行任务等。它会导入必要的工具模块,并根据配置执行多账号的签到操作。

定时任务

脚本通过定时任务 (cron) 在每天的1点30分运行,自动执行签到和相关操作。

该脚本的主要功能是自动化执行“霸王茶姬小程序”的签到和积分查询操作,通过模拟登录、获取用户信息、签到和积分查询等步骤,实现对签到任务的自动化处理,并通过微信推送通知用户签到结果。

import osimport requestsfrom requests.packages.urllib3.exceptions import InsecureRequestWarning# import CHERWIN_TOOLS# 禁用安全请求警告requests.packages.urllib3.disable_warnings(InsecureRequestWarning)IS_DEV = Falseif os.path.isfile('DEV_ENV.py'):    import DEV_ENV    IS_DEV = Trueif os.path.isfile('notify.py'):    from notify import send    print("加载通知服务成功!")else:    print("加载通知服务失败!")send_msg = ''one_msg=''def Log(cont=''):    global send_msg,one_msg    print(cont)    if cont:        one_msg += f'{cont}/n'        send_msg += f'{cont}/n'class RUN:    def __init__(self,info,index):        global one_msg        one_msg = ''        split_info = info.split('@')        self.token = split_info[0]        len_split_info = len(split_info)        last_info = split_info[len_split_info - 1]        self.send_UID = None        if len_split_info > 0 and "UID_" in last_info:            self.send_UID = last_info        self.index = index + 1        Log(f"/n---------开始执行第{self.index}个账号>>>>>")        self.s = requests.session()        self.s.verify = False        self.headers = {            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/7.0.20.1781(0x6700143B) NetType/WIFI MiniProgramEnv/Windows WindowsWechat/WMPF WindowsWechat(0x6309080f) XWEB/8555',            'work-wechat-userid': '',            'multi-store-id': '',            'gdt-vid': '',            'qz-gtd': '',            'scene': '1006',            'Qm-From': 'wechat',            'store-id': '49006',            'Qm-User-Token': self.token,            'channelCode': '',            'Qm-From-Type': 'catering',            'promotion-code': '',            'work-staff-name': '',            'work-staff-id': '',            'Accept': 'v=1.0',            'Accept-Encoding': 'gzip,compress,br,deflate',            'Referer': 'https://servicewechat.com/wxafec6f8422cb357b/87/page-frame.html'        }        self.s.headers.update(self.headers)        self.appid = 'wxafec6f8422cb357b'        self.activity_id='947079313798000641'    def personal_info(self):        personal_info_valid = False        try:            # 请求的参数            params = {'appid': self.appid}            # 发送GET请求            response = self.s.get('https://webapi.qmai.cn/web/catering/crm/personal-info', json=params)            result = response.json()            # 检查请求是否成功            if result.get('code','-1') == '0':                personal_info_valid = True                # 提取个人信息                mobile_phone = result['data']['mobilePhone'] if 'data' in result and 'mobilePhone' in result[                    'data'] else None                self.mobile_phone = mobile_phone[:3] + "*" * 4 + mobile_phone[7:]                self.name = result['data']['name'] if 'data' in result and 'name' in result['data'] else None                Log(f"账号[{self.index}]登陆成功!/n用户名:【{self.name}】 /n手机号:【{self.mobile_phone}】")            else:                # 如果请求不成功,则打印错误信息                message = result.get('message', '')                Log(f'登录失败: {message}')        except Exception as e:            # 捕获任何异常并打印            print(e)        finally:            # 最终返回请求是否成功的标志            return personal_info_valid    def user_sign_statistics(self):        try:            json_data = {                'activityId': self.activity_id,                'appid': self.appid            }            # Send the POST request            response = self.s.post('https://webapi.qmai.cn/web/cmk-center/sign/userSignStatistics', json=json_data)            result = response.json()            status_code = response.status_code            # Check if the request was successful            if result.get('code', status_code) == 0:                data = result.get('data', {})                sign_days = data.get('signDays', '')                sign_status = data.get('signStatus', 0) == 1                Log(f'新版签到今天{"已" if sign_status else "未"}签到, 已连续签到{sign_days}天')                if not sign_status:                    self.take_part_in_sign()                return sign_status, sign_days            else:                message = result.get('message', '')                Log(f'查询新版签到失败: {message}')                return False, 0        except Exception as e:            print(e)            return False, 0    def take_part_in_sign(self):        try:            json_data = {                'activityId': self.activity_id,                'appid': self.appid            }            response = self.s.post('https://webapi.qmai.cn/web/cmk-center/sign/takePartInSign', json=json_data)            result = response.json()            status_code = response.status_code            if result.get('code', status_code) == 0:                data = result.get('data',{})                rewardDetailList = data.get('rewardDetailList',[{}])                if rewardDetailList:                    rewardName = rewardDetailList[0].get('rewardName','')                    sendNum = rewardDetailList[0].get('sendNum','')                    Log(f'新版签到成功,获得【{sendNum}】{rewardName}')                    return True                else:                    Log(f'签到失败:【{result.get("message","")}】')                    return True            else:                message = result.get('message', '')                Log(f'新版签到失败: {message}')                return False        except Exception as e:            print(e)            return False    def points_info(self):        try:            json_data = {                'appid': self.appid            }            response = self.s.post('https://webapi.qmai.cn/web/catering/crm/points-info', json=json_data)            result = response.json()            status_code = response.status_code            if result.get('code', status_code) == '0':                data = result.get('data', {})                soon_expired_points = data.get('soonExpiredPoints', 0)                total_points = data.get('totalPoints', 0)                expired_time = data.get('expiredTime', '')                if soon_expired_points:                    Log(f'有【{soon_expired_points}】积分将于( {expired_time})过期')                Log(f'当前积分: 【{total_points}】')                return total_points, soon_expired_points, expired_time            else:                message = result.get('message', '')                Log(f'查询积分失败: {message}')                return None        except Exception as e:            print(e)            return False    def main(self):        if not self.personal_info() :            Log("用户信息无效,请更新CK")            self.sendMsg()            return False        self.user_sign_statistics()        self.points_info()        self.sendMsg()        return True    def sendMsg(self, help=False):        if self.send_UID:            push_res = CHERWIN_TOOLS.wxpusher(self.send_UID, one_msg, APP_NAME, help)            print(push_res)def down_file(filename, file_url):    print(f'开始下载:{filename},下载地址:{file_url}')    try:        response = requests.get(file_url, verify=False, timeout=10)        response.raise_for_status()        with open(filename + '.tmp', 'wb') as f:            f.write(response.content)        print(f'【{filename}】下载完成!')        # 检查临时文件是否存在        temp_filename = filename + '.tmp'        if os.path.exists(temp_filename):            # 删除原有文件            if os.path.exists(filename):                os.remove(filename)            # 重命名临时文件            os.rename(temp_filename, filename)            print(f'【{filename}】重命名成功!')            return True        else:            print(f'【{filename}】临时文件不存在!')            return False    except Exception as e:        print(f'【{filename}】下载失败:{str(e)}')        return Falsedef import_Tools():    global CHERWIN_TOOLS,ENV, APP_INFO, TIPS, TIPS_HTML, AuthorCode    import CHERWIN_TOOLS    ENV, APP_INFO, TIPS, TIPS_HTML, AuthorCode = CHERWIN_TOOLS.main(APP_NAME, local_script_name, ENV_NAME,local_version)if __name__ == '__main__':    APP_NAME = '霸王茶姬小程序'    ENV_NAME = 'BWCJ'    CK_NAME = 'qm-user-token'    print(f'''{APP_NAME}签到

在这里插入图片描述

本文部分变量已做脱敏处理,仅用于测试和学习研究,禁止用于商业用途。

也许您对下面的内容还感兴趣: