Last active
April 11, 2022 00:51
-
-
Save mkmark/91e2a8a3b848a064773d75df597e7071 to your computer and use it in GitHub Desktop.
苏康码自动健康申报
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# %% import | |
import json | |
import datetime | |
import dateutil.tz | |
import logging | |
import io | |
import requests | |
import urllib.parse | |
import pickle | |
import os | |
import atexit | |
# %% logger | |
# Root logger: everything from DEBUG up is passed to the handlers, which then
# apply their own (INFO) thresholds.
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

# Console handler: timestamped, fully qualified records on the default stream.
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler_console = logging.StreamHandler()
handler_console.setFormatter(formatter)
handler_console.setLevel(logging.INFO)
logger.addHandler(handler_console)

# In-memory handler: bare messages accumulated in a StringIO buffer so the
# whole run can be read back later with log_stringIO.getvalue().
log_stringIO = io.StringIO()
formatter = logging.Formatter('%(message)s')
handler_string = logging.StreamHandler(log_stringIO)
handler_string.setFormatter(formatter)
handler_string.setLevel(logging.INFO)
logger.addHandler(handler_string)
# %% jsstm | |
class Jsstm:
    """Small client for the jsstm.jszwfw.gov.cn health-declaration endpoints.

    Wraps a ``requests`` session. Cookies saved by a previous run are restored
    from ``COOKIE_FILE`` when that file exists.
    """

    # Pickle file (relative to the working directory) holding saved cookies.
    COOKIE_FILE = 'jsstm_cookie'
    # Seconds to wait for the server before giving up; without a timeout
    # requests can block forever on a stalled connection.
    REQUEST_TIMEOUT = 30

    def __init__(self):
        """Create the HTTP session and restore cookies from a previous run."""
        self.session = requests.session()
        try:
            # NOTE(security): pickle.load can execute arbitrary code. This is
            # only acceptable because the file is written by this script
            # itself; never point COOKIE_FILE at untrusted data.
            with open(self.COOKIE_FILE, 'rb') as f:
                saved_cookies = pickle.load(f)
        except FileNotFoundError:
            # First run: nothing to restore.
            return
        self.session.cookies.update(saved_cookies)

    def get_headers(self):
        """Return the HTTP headers sent with every request.

        The ``{{...}}`` markers are placeholders to be filled in by the user.
        """
        headers = {
            'Origin': 'https://jsstm.jszwfw.gov.cn',
            'Accept': 'application/json, text/javascript, */*; q=0.01',
            'X-Requested-With': 'XMLHttpRequest',
            # Quoted like every other placeholder in this file; the bare form
            # is evaluated as an expression and fails with NameError.
            'User-Agent': '{{User-Agent}}',
            'Referer': 'https://jsstm.jszwfw.gov.cn/jkmIndex.html?token={{token}}&jszwuserid={{token2}}&uuid={{token3}}',
            'x-mass-tappid': '2018062060350751',
            'Accept-Language': 'en-US',
            'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
            'Accept-Encoding': 'gzip',
            'Host': 'jsstm.jszwfw.gov.cn',
            'Connection': 'Keep-Alive',
        }
        return headers

    def http_post(self, job, url, data):
        """POST ``data`` to ``url`` and return the response.

        Args:
            job: Label used in log messages only.
            url: Target URL.
            data: Request body (already form-encoded).

        Returns:
            The ``requests`` response when the status code is 200.

        Raises:
            SystemExit: With exit status 1 when the status code is not 200.
        """
        logger.debug('http_post start: %s', job)
        response = self.session.post(
            url,
            data,
            headers=self.get_headers(),
            timeout=self.REQUEST_TIMEOUT,
        )
        logger.debug('http_post end: %s', job)
        if response.status_code != 200:
            logger.info('http_post fail: %s', job)
            # Raise SystemExit directly: the exit() builtin is injected by the
            # site module and is not guaranteed to exist, and a failed run
            # should not report exit status 0.
            raise SystemExit(1)
        logger.info('http_post success: %s', job)
        return response

    def userAuth(self, data):
        """POST ``data`` to the ``/jkm/userAuth`` endpoint; return the response."""
        job = 'userAuth'
        url = 'https://jsstm.jszwfw.gov.cn/jkm/userAuth'
        return self.http_post(job, url, data)

    def get_urlabc(self, response):
        """Extract ``res.userdetail.abc`` from a JSON response, URL-quoted.

        Raises:
            KeyError: If the response JSON lacks the expected keys.
        """
        response_dict = json.loads(response.text)
        abc = response_dict['res']['userdetail']['abc']
        return urllib.parse.quote(abc)

    def queryAttendanceList(self, data):
        """POST ``data`` to ``/day/queryAttendanceList``; return the response."""
        job = 'queryAttendanceList'
        url = 'https://jsstm.jszwfw.gov.cn/day/queryAttendanceList'
        return self.http_post(job, url, data)

    def get_last_attendance(self, response):
        """Log and return the ``createtime`` of the first listed attendance.

        The time is rendered in UTC+8 as ``YYYY-MM-DD HH:MM:SS``.

        Raises:
            KeyError, IndexError: If the response JSON lacks the expected
                keys or the attendance list is empty.
        """
        response_dict = json.loads(response.text)
        # Treated as a POSIX timestamp in seconds, as the original code did
        # -- TODO confirm against the API.
        timestamp = response_dict['res']['list'][0]['createtime']
        # Fixed UTC+8 offset. The former 'China/Shang_Hai' lookup is not a
        # valid zone name, so the conversion silently fell back to whatever
        # the host's local time zone was.
        utc_plus_8 = datetime.timezone(datetime.timedelta(hours=8))
        utcp8 = datetime.datetime.fromtimestamp(timestamp, tz=utc_plus_8)
        utcp8_str = utcp8.strftime('%Y-%m-%d %H:%M:%S')
        logger.info('last attendance: %s', utcp8_str)
        return utcp8_str

    def saveDailyAttendence(self, data):
        """POST ``data`` to ``/day/saveDailyAttendance``; return the response."""
        job = 'saveDailyAttendence'
        url = 'https://jsstm.jszwfw.gov.cn/day/saveDailyAttendance'
        # Returned for consistency with userAuth / queryAttendanceList.
        return self.http_post(job, url, data)
# %% telegram_bot | |
class Telegram_bot:
    """Sends text messages through the Telegram Bot API ``sendMessage`` call."""

    # requests selects the proxy by the scheme of the target URL. The Bot API
    # is reached over https, so an 'https' entry is required; with only an
    # 'http' entry the proxy was never used for these requests.
    proxies = {
        'http': '{{proxy}}',
        'https': '{{proxy}}',
    }
    # Seconds to wait for Telegram; this runs from an exit hook, so it must
    # not be able to hang the interpreter shutdown forever.
    timeout = 30

    def ping(self, token, chat_id, text):
        """Send ``text`` to ``chat_id`` using the bot identified by ``token``.

        Args:
            token: Bot token, inserted into the API URL path.
            chat_id: Target chat identifier.
            text: Message body; may contain any characters.
        """
        url = 'https://api.telegram.org/bot' + token + '/sendMessage'
        # Let requests build the query string so that newlines, '&', '#' and
        # '+' inside the message are percent-encoded instead of corrupting
        # the URL.
        requests.get(
            url,
            params={'chat_id': chat_id, 'text': text},
            proxies=self.proxies,
            timeout=self.timeout,
        )
# %% | |
def exit_handler():
    """Forward the log text captured during this run to a Telegram chat.

    Reads the accumulated contents of ``log_stringIO`` and sends them with
    ``Telegram_bot.ping``. The token and chat id are ``{{...}}`` placeholders
    to be filled in by the user.
    """
    notifier = Telegram_bot()
    notifier.ping(
        '{{telegram_token}}',
        '{{telegram_chat_id}}',
        log_stringIO.getvalue(),
    )


# Run the notification when the interpreter shuts down.
atexit.register(exit_handler)
if __name__ == '__main__':
    # The {{...}} markers below are placeholders to be filled in by the user:
    # {{身份证}} = national ID number, {{url姓名}} = URL-encoded name,
    # {{手机号}} = mobile phone number.
    jsstm = Jsstm()

    # Log in and obtain the URL-quoted 'abc' value that the other endpoints
    # expect to be echoed back.
    data_userAuth = 'token={{token}}&uuid={{token2}}&source=alipay'.encode('utf-8')
    response = jsstm.userAuth(data_userAuth)
    urlabc = jsstm.get_urlabc(response)

    # Submit today's declaration. The body is an already form-encoded string.
    # '&degree=' and '&degree_flag=' are spelled out explicitly: the '&deg'
    # prefix had been turned into a literal degree sign, which merged those
    # fields into the preceding values.
    data_saveDailyAttendence = (
        'member_id={{身份证}}&name={{url姓名}}&mobile={{手机号}}&idType=1'
        '&degree=37.3&realtimeLocation=&source=alipay&degree_flag=0'
        '&r1data=0&r2data=0&r3data=0&travel=0&r5data='
        '&travel_destination=&travel_time=&travel_duration='
        '&travel_destination_code=&other=&abc=' + urlabc
    ).encode('utf-8')
    jsstm.saveDailyAttendence(data_saveDailyAttendence)

    # Read the attendance list back and log the most recent entry's time.
    data_queryAttendanceList = ('userId={{身份证}}&abc=' + urlabc).encode('utf-8')
    response = jsstm.queryAttendanceList(data_queryAttendanceList)
    utcp8_str = jsstm.get_last_attendance(response)

    # Save cookies across sessions so the next run can reuse them.
    with open('jsstm_cookie', 'wb') as f:
        pickle.dump(jsstm.session.cookies, f)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment