Skip to content

Instantly share code, notes, and snippets.

@jerrylususu
Created July 28, 2024 13:20
Show Gist options
  • Save jerrylususu/7711da4f6e03c08adb2aa89278ca6be8 to your computer and use it in GitHub Desktop.
Save jerrylususu/7711da4f6e03c08adb2aa89278ca6be8 to your computer and use it in GitHub Desktop.
import os
import logging
import requests
import subprocess
from pathlib import Path
from datetime import datetime
import time
import json
import time
# Custom logging handler
class CustomHTTPHandler(logging.Handler):
    """Logging handler that POSTs every record to an HTTP endpoint as JSON.

    Each record is sent as its own request with the body
    ``{"logs": [{"contents": {...}, "time": <unix seconds>}]}``.

    Args:
        url: Endpoint that receives the JSON payload.
        timeout: Seconds to wait for the endpoint before giving up, so a
            stalled log service cannot block the calling code forever.
    """

    def __init__(self, url, timeout=10):
        super().__init__()
        self.url = url
        self.timeout = timeout

    def _build_payload(self, record):
        """Return the JSON-serialisable payload for a single log record."""
        # Use the current timestamp (time of sending), not record.created.
        current_time = int(time.time())
        return {
            "logs": [{
                "contents": {
                    "source": "myvm",
                    "time": str(current_time),
                    "task": "github-bak",
                    "message": str(self.format(record)),
                    "level": str(record.levelname),
                    "line_no": str(record.lineno),
                    "file_name": str(record.filename),
                    "function_name": str(record.funcName),
                },
                "time": current_time,
            }]
        }

    def emit(self, record):
        """Send ``record`` to ``self.url``.

        Any failure (formatting, network error, timeout, HTTP error status)
        is routed to ``handleError`` instead of being raised, so logging
        never crashes the caller.
        """
        try:
            json_data = json.dumps(self._build_payload(record))
            headers = {'Content-Type': 'application/json'}
            response = requests.post(
                self.url,
                headers=headers,
                data=json_data,
                timeout=self.timeout,
            )
            # Surface 4xx/5xx answers instead of silently dropping the log line.
            response.raise_for_status()
        except Exception:
            self.handleError(record)
# Configure console logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Attach the custom HTTP log handler to the root logger
# TODO: fill in region and topic id
region = "TODO"
topic_id = "TODO"
# Python f-strings interpolate with plain {name}; a "$" in front of the brace
# would be sent literally as part of the topic id ("topic_id=$<id>").
url = f"https://ap-{region}.cls.tencentcs.com/tracklog?topic_id={topic_id}"
custom_handler = CustomHTTPHandler(url)
logging.getLogger().addHandler(custom_handler)
logging.info("开始处理")
# Read the GitHub personal access token from the environment
GITHUB_PAT = os.getenv('GITHUB_PAT')
if not GITHUB_PAT:
    logging.error("GITHUB_PAT环境变量未设置")
    # SystemExit is what exit() raises, without depending on the `site` helper.
    raise SystemExit(1)
# GitHub API URL
GITHUB_API_URL = "https://api.github.com/user"
# Look up the authenticated user; the timeout keeps an unattended run from
# hanging forever when the API is unreachable.
response = requests.get(
    GITHUB_API_URL,
    headers={"Authorization": f"token {GITHUB_PAT}"},
    timeout=30,
)
if response.status_code != 200:
    logging.error(f"无法获取用户信息,状态码: {response.status_code}")
    raise SystemExit(1)
user_info = response.json()
username = user_info['login']
logging.info(f"登录成功,用户名 {username}")
# Fetch every repository the token can see, 100 per page, until an empty
# page signals the end of the listing.
GITHUB_REPOS_URL = "https://api.github.com/user/repos"
page = 1
repos = []
while True:
    logging.info(f"获取 repo 页码:{page}")
    response = requests.get(
        GITHUB_REPOS_URL,
        headers={"Authorization": f"token {GITHUB_PAT}"},
        params={"per_page": 100, "page": page},
        timeout=30,  # do not let one stalled page block the whole backup
    )
    if response.status_code != 200:
        logging.error(f"无法获取仓库列表,状态码: {response.status_code}")
        # SystemExit is what exit() raises, without depending on the `site` helper.
        raise SystemExit(1)
    page_repos = response.json()
    if not page_repos:
        break
    repos.extend(page_repos)
    page += 1
# Keep only the repositories owned by the authenticated user
user_repos = [repo for repo in repos if repo['owner']['login'] == username]
total_repos = len(user_repos)
logging.info(f"repo 总数:{total_repos}")
# Mirror every repository: reset + pull when a local copy exists, clone otherwise.
def _run_git(repo_name, label, args):
    """Run ``git <args>``, log duration and exit code, and return True on success.

    ``label`` is the human-readable command name used in the log messages.
    On failure git's stderr is logged with the access token masked, because
    git may echo the credential-bearing remote URL in its error output.
    """
    start_time = time.time()
    result = subprocess.run(['git', *args], capture_output=True, text=True)
    end_time = time.time()
    logging.info(f"仓库 {repo_name}: {label} 耗时: {end_time - start_time:.2f} 秒, 返回码: {result.returncode}")
    if result.returncode != 0:
        safe_stderr = result.stderr.replace(GITHUB_PAT, "***")
        logging.error(f"仓库 {repo_name}: {label} 失败: {safe_stderr}")
        return False
    return True


for i, repo in enumerate(user_repos, 1):
    repo_name = repo['name']
    # NOTE(review): git stores this URL, token included, in the clone's
    # .git/config; consider a credential helper if that is a concern.
    repo_url = f"https://{username}:{GITHUB_PAT}@github.com/{username}/{repo_name}.git"
    local_path = Path(repo_name)
    logging.info(f"正在处理仓库 {repo_name} ({i}/{total_repos})")
    try:
        if local_path.exists():
            logging.info(f"仓库 {repo_name} 已存在,准备执行 git reset --hard")
            if not _run_git(repo_name, "git reset --hard", ['-C', str(local_path), 'reset', '--hard']):
                continue
            logging.info(f"仓库 {repo_name}: 准备执行 git pull")
            if not _run_git(repo_name, "git pull", ['-C', str(local_path), 'pull']):
                continue
        else:
            logging.info(f"仓库 {repo_name} 不存在,准备执行 git clone")
            if not _run_git(repo_name, "git clone", ['clone', repo_url, str(local_path)]):
                continue
        logging.info(f"成功处理仓库 {repo_name}")
    except (OSError, subprocess.SubprocessError) as e:
        # subprocess.run without check=True never raises CalledProcessError;
        # what can actually happen is a failure to launch git (e.g. not installed).
        logging.error(f"处理仓库 {repo_name} 时出错: {e}")
        raise SystemExit(1)
logging.info("所有仓库处理完毕")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment