霸王茶姬小程序任务脚本
小白操作----仅供学习研究参考
功能: 积分签到
解析
该脚本用于“霸王茶姬小程序”的签到和积分查询操作。通过模拟网络请求登录账号,获取个人信息,执行每日签到,并查询积分情况。支持多账号操作,并能通过微信推送签到结果和相关信息。
RUN这个类是脚本的核心,负责执行各种任务。
-
init: 初始化类,设置用户信息、会话和请求头。
-
personal_info: 获取并验证用户的个人信息,如用户名和手机号。
-
user_sign_statistics: 获取用户的签到信息,并根据签到状态执行签到操作。
-
take_part_in_sign: 执行签到操作并获取签到奖励。
-
points_info: 查询用户的积分信息,包括即将过期的积分。
-
main: 主流程控制,依次执行登录、获取用户信息、签到和查询积分等操作。
-
sendMsg: 发送微信推送通知消息。
辅助函数
-
Log: 记录日志并更新消息内容。
-
down_file: 下载文件并进行必要的文件操作。
-
import_Tools: 导入工具模块并初始化环境。
主函数负责脚本的总体运行流程,包括加载配置、检查环境、执行任务等。它会导入必要的工具模块,并根据配置执行多账号的签到操作。
定时任务
脚本通过定时任务 (cron) 在每天的1点30分运行,自动执行签到和相关操作。
该脚本的主要功能是自动化执行“霸王茶姬小程序”的签到和积分查询操作,通过模拟登录、获取用户信息、签到和积分查询等步骤,实现对签到任务的自动化处理,并通过微信推送通知用户签到结果。
import osimport requestsfrom requests.packages.urllib3.exceptions import InsecureRequestWarning# import CHERWIN_TOOLS# 禁用安全请求警告requests.packages.urllib3.disable_warnings(InsecureRequestWarning)IS_DEV = Falseif os.path.isfile('DEV_ENV.py'): import DEV_ENV IS_DEV = Trueif os.path.isfile('notify.py'): from notify import send print("加载通知服务成功!")else: print("加载通知服务失败!")send_msg = ''one_msg=''def Log(cont=''): global send_msg,one_msg print(cont) if cont: one_msg += f'{cont}/n' send_msg += f'{cont}/n'class RUN: def __init__(self,info,index): global one_msg one_msg = '' split_info = info.split('@') self.token = split_info[0] len_split_info = len(split_info) last_info = split_info[len_split_info - 1] self.send_UID = None if len_split_info > 0 and "UID_" in last_info: self.send_UID = last_info self.index = index + 1 Log(f"/n---------开始执行第{self.index}个账号>>>>>") self.s = requests.session() self.s.verify = False self.headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/7.0.20.1781(0x6700143B) NetType/WIFI MiniProgramEnv/Windows WindowsWechat/WMPF WindowsWechat(0x6309080f) XWEB/8555', 'work-wechat-userid': '', 'multi-store-id': '', 'gdt-vid': '', 'qz-gtd': '', 'scene': '1006', 'Qm-From': 'wechat', 'store-id': '49006', 'Qm-User-Token': self.token, 'channelCode': '', 'Qm-From-Type': 'catering', 'promotion-code': '', 'work-staff-name': '', 'work-staff-id': '', 'Accept': 'v=1.0', 'Accept-Encoding': 'gzip,compress,br,deflate', 'Referer': 'https://servicewechat.com/wxafec6f8422cb357b/87/page-frame.html' } self.s.headers.update(self.headers) self.appid = 'wxafec6f8422cb357b' self.activity_id='947079313798000641' def personal_info(self): personal_info_valid = False try: # 请求的参数 params = {'appid': self.appid} # 发送GET请求 response = self.s.get('https://webapi.qmai.cn/web/catering/crm/personal-info', json=params) result = response.json() # 检查请求是否成功 if result.get('code','-1') == '0': personal_info_valid = True # 提取个人信息 mobile_phone = result['data']['mobilePhone'] if 'data' in result and 'mobilePhone' in result[ 'data'] else None self.mobile_phone = mobile_phone[:3] + "*" * 4 + mobile_phone[7:] self.name = result['data']['name'] if 'data' in result and 'name' in result['data'] else None Log(f"账号[{self.index}]登陆成功!/n用户名:【{self.name}】 /n手机号:【{self.mobile_phone}】") else: # 如果请求不成功,则打印错误信息 message = result.get('message', '') Log(f'登录失败: {message}') except Exception as e: # 捕获任何异常并打印 print(e) finally: # 最终返回请求是否成功的标志 return personal_info_valid def user_sign_statistics(self): try: json_data = { 'activityId': self.activity_id, 'appid': self.appid } # Send the POST request response = self.s.post('https://webapi.qmai.cn/web/cmk-center/sign/userSignStatistics', json=json_data) result = response.json() status_code = response.status_code # Check if the request was successful if result.get('code', status_code) == 0: data = result.get('data', {}) sign_days = data.get('signDays', '') sign_status = data.get('signStatus', 0) == 1 Log(f'新版签到今天{"已" if sign_status else "未"}签到, 已连续签到{sign_days}天') if not sign_status: self.take_part_in_sign() return sign_status, sign_days else: message = result.get('message', '') Log(f'查询新版签到失败: {message}') return False, 0 except Exception as e: print(e) return False, 0 def take_part_in_sign(self): try: json_data = { 'activityId': self.activity_id, 'appid': self.appid } response = self.s.post('https://webapi.qmai.cn/web/cmk-center/sign/takePartInSign', json=json_data) result = response.json() status_code = response.status_code if result.get('code', status_code) == 0: data = result.get('data',{}) rewardDetailList = data.get('rewardDetailList',[{}]) if rewardDetailList: rewardName = rewardDetailList[0].get('rewardName','') sendNum = rewardDetailList[0].get('sendNum','') Log(f'新版签到成功,获得【{sendNum}】{rewardName}') return True else: Log(f'签到失败:【{result.get("message","")}】') return True else: message = result.get('message', '') Log(f'新版签到失败: {message}') return False except Exception as e: print(e) return False def points_info(self): try: json_data = { 'appid': self.appid } response = self.s.post('https://webapi.qmai.cn/web/catering/crm/points-info', json=json_data) result = response.json() status_code = response.status_code if result.get('code', status_code) == '0': data = result.get('data', {}) soon_expired_points = data.get('soonExpiredPoints', 0) total_points = data.get('totalPoints', 0) expired_time = data.get('expiredTime', '') if soon_expired_points: Log(f'有【{soon_expired_points}】积分将于( {expired_time})过期') Log(f'当前积分: 【{total_points}】') return total_points, soon_expired_points, expired_time else: message = result.get('message', '') Log(f'查询积分失败: {message}') return None except Exception as e: print(e) return False def main(self): if not self.personal_info() : Log("用户信息无效,请更新CK") self.sendMsg() return False self.user_sign_statistics() self.points_info() self.sendMsg() return True def sendMsg(self, help=False): if self.send_UID: push_res = CHERWIN_TOOLS.wxpusher(self.send_UID, one_msg, APP_NAME, help) print(push_res)def down_file(filename, file_url): print(f'开始下载:{filename},下载地址:{file_url}') try: response = requests.get(file_url, verify=False, timeout=10) response.raise_for_status() with open(filename + '.tmp', 'wb') as f: f.write(response.content) print(f'【{filename}】下载完成!') # 检查临时文件是否存在 temp_filename = filename + '.tmp' if os.path.exists(temp_filename): # 删除原有文件 if os.path.exists(filename): os.remove(filename) # 重命名临时文件 os.rename(temp_filename, filename) print(f'【{filename}】重命名成功!') return True else: print(f'【{filename}】临时文件不存在!') return False except Exception as e: print(f'【{filename}】下载失败:{str(e)}') return Falsedef import_Tools(): global CHERWIN_TOOLS,ENV, APP_INFO, TIPS, TIPS_HTML, AuthorCode import CHERWIN_TOOLS ENV, APP_INFO, TIPS, TIPS_HTML, AuthorCode = CHERWIN_TOOLS.main(APP_NAME, local_script_name, ENV_NAME,local_version)if __name__ == '__main__': APP_NAME = '霸王茶姬小程序' ENV_NAME = 'BWCJ' CK_NAME = 'qm-user-token' print(f'''{APP_NAME}签到
本文部分变量已做脱敏处理,仅用于测试和学习研究,禁止用于商业用途。