from gmssl import sm2 import requests import time import json import hashlib import pprint import datetime pay_url = "http://192.144.212.211:13657/pay" search_url = "http://192.144.212.211:13657/search" balance_url = "http://192.144.212.211:13657/balance" callback_url = "http://192.144.212.211:13657/callback" # 测试用 秘钥组1 脚本加密用 pk1 = "04865e672fa71ab6e37429fd731800792c1015cf4e936bd01ec0092b7da571fb63cb99cca8104b35243175d3f535784f6127af451bed43b5b929db6b864de3b207" sk1 = "b73190a4fa4f6786fa36a35c1c16e533943fb5951a40c6b9e2d06b5a19d5c19a" # 测试用 秘钥组2 脚本解密用 pk2 = "04f87981e38e98c5d400daed3e3dbf0a77e622860923b37ab94622c435401604a2e23f7c2ddb656f0b7735ca1f36cc203e6ac8fe47d6bc15f3739608e81b424c04" sk2 = "6907ee4516db5e2f04eef3976551f2c8ead793648edf37b82035013527b3ee42" sinKey = "ZctLTt2tugU2ntZdVgMXssrGuENPH6V7b7ROYN88Msw=" pub_head = { "appId": "CESHI", "method": "sd.tovc.recharge", "signType": "md5", "sign": "", "timestamp": "", "version": "v1.0", "requestId": str(int(time.time()*1000)), "bizContent": "" } def encrypt(data: str): en = sm2.CryptSM2(private_key="", public_key=pk1) r = en.encrypt(data.encode("utf-8")) # result = bytearray([0x4]) result = bytearray() result.extend(r) # type:ignore return result.hex() # type:ignore def decrypt(data: str): en = sm2.CryptSM2(private_key=sk2, public_key="") r = en.decrypt(data.encode("utf-8")) return r.hex() # type:ignore def sign_head(head: dict): keys = list(head.keys()) keys.sort() ks = [] for k in keys: v = head[k].strip() if k != "sign" and v: ks.append(f"{k}={v}") ks.append(f"key={sinKey}") s = "&".join(ks) m5 = hashlib.md5(s.encode("utf-8")) sign = m5.hexdigest() head["sign"] = sign def do_post(url, data): data = json.dumps(data, ensure_ascii=False) head = pub_head.copy() now = datetime.datetime.now() head["timestamp"] = f"{now:%Y-%m-%d %H:%M:%S}" head["requestId"] = str(int(time.time()*1000)) head["bizContent"] = encrypt(data) sign_head(head) pprint.pprint(data) r = requests.post(url, json=head) result = r.json() return result def test_pay(): pay_body = { "gameUserId": "13856023633", "serverType": "tel", "productNo": "YDHF10", "spOrderId": str(int(time.time()*1000)), "source": "SD" } result = do_post(pay_url, pay_body) print(result) def test_search(): search_body = { "spOrderId": "1679680499000" } result = do_post(search_url, search_body) print(result) def test_balance(): data = { "source": "SD" } result = do_post(balance_url, data) print(result) test_pay() test_search() test_balance() s = '''{\"HEADER\":{\"VERSION\":\"V1.1\",\"TIMESTAMP\":\"20230325132234425\",\"SEQNO\":\"1679721754404\",\"APPID\":\"jtceshi\",\"SECERTKEY\":\"3FF6CDABE28AFE275197768302AACBB4\"},\"MSGBODY\":{\"CONTENT\":{\"SIGN\":\"1FE2F722E6B4C4CD2CF458067A3A0EF9\",\"TIMESTAMP\":\"1679721754000\"}}}''' # pprint.pprint(json.loads(s)) appId = "jtceshi" appSecret = "5f1b8349873841e3afe9c6ec91c9847e" def test_account(): now = datetime.datetime.now() h_time = now.strftime("%Y%m%d%H%M%S%f") seqno = '1679720124150' version = 'V1.1' h_hash = hashlib.md5( f"{h_time}{seqno}{appId}{appSecret}".encode("utf-8")).hexdigest() b_time = str(int(now.timestamp()*1000)) b_hash = hashlib.md5(f"{appSecret}{b_time}".encode("utf-8")).hexdigest() data = { 'HEADER': { 'APPID': appId, 'SECERTKEY': h_hash, 'SEQNO': seqno, 'TIMESTAMP': h_time, 'VERSION': version }, 'MSGBODY': { 'CONTENT': { 'SIGN': f"{b_hash}a", 'TIMESTAMP': b_time } } } data = {'HEADER': {'APPID': 'jtceshi', 'SECERTKEY': '3FF6CDABE28AFE275197768302AACBB4', 'SEQNO': '1679721754404', 'TIMESTAMP': '20230325132234425', 'VERSION': 'V1.1'}, 'MSGBODY': {'CONTENT': {'SIGN': '1FE2F722E6B4C4CD2CF458067A3A0EF9', 'TIMESTAMP': '1679721754000'}}} pprint.pprint(data) url = "http://192.144.212.211:36000/gateway/flowservice/queryAccount.ws" r = requests.post(url, json=data) pprint.pprint(r.json()) # sign="86D85A1C092D38AAFBB86784138A5117" # timestamp="1679721020000" # b_hash = hashlib.md5(f"{appSecret}{timestamp}".encode("utf-8")).hexdigest() # print(b_hash.upper() == sign) # test_account() def aaa(): data = "{\"HEADER\":{\"VERSION\":\"V1.1\",\"TIMESTAMP\":\"20230325134408538\",\"SEQNO\":\"1679723048517\",\"APPID\":\"jtceshi\",\"SECERTKEY\":\"D1219C9DD84CEA29686EC17435234A6D\"},\"MSGBODY\":{\"CONTENT\":{\"SIGN\":\"88E0B2DEDBF62439FADD205D7A7F7E2B\",\"TIMESTAMP\":\"1679723048000\"}}}" url = "http://192.144.212.211:36000/gateway/flowservice/queryAccount.ws" r = requests.post(url, data=data, headers={"Content-Type": "application/json"}) pprint.pprint(r.json()) # aaa()