#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
import os
import time
import requests
import subprocess
import traceback
import tempfile
import shutil
import hashlib
import json

# Suppress the InsecureRequestWarning triggered by verify=False below.
requests.packages.urllib3.disable_warnings()


class GithubClient:
    def __init__(self, token):
        self.url = 'https://api.github.com'
        self.headers = {
            'Authorization': f'Bearer {token}',
            'Connection': 'close',
            'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 '
                          '(KHTML, like Gecko) Chrome/64.0.3282.119 Safari/537.36'
        }
        self.limit = 0
        self.users_octocat()

    def connect(self, method, resource, data=None):
        '''Send a request to the GitHub API.'''
        time.sleep(0.1)
        if method == 'GET':
            r = requests.get('{0}{1}'.format(self.url, resource), params=data,
                             headers=self.headers, verify=False, allow_redirects=False)
        elif method == 'POST':
            r = requests.post('{0}{1}'.format(self.url, resource), data=data,
                              headers=self.headers, verify=False, allow_redirects=False)
        else:
            raise ValueError(f'unsupported method: {method}')
        r.encoding = r.apparent_encoding
        # Track the remaining rate-limit quota reported by the API.
        if 'X-RateLimit-Remaining' in r.headers:
            self.limit = int(r.headers['X-RateLimit-Remaining'])
        try:
            return r.status_code, r.headers, r.json()
        except Exception:
            return r.status_code, r.headers, r.content

    def search_code(self, keyword, page=1, per_page=10):
        '''Search code.'''
        try:
            time.sleep(2)
            data = {'q': keyword, 'sort': 'indexed', 'order': 'desc',
                    'page': page, 'per_page': per_page}
            _, _, rs = self.connect('GET', '/search/code', data=data)
            return rs
        except Exception:
            return {}

    def search_repositories(self, keyword, page=1, per_page=10):
        '''Search repositories.'''
        try:
            time.sleep(2)
            data = {'q': keyword, 'sort': 'updated', 'order': 'desc',
                    'page': page, 'per_page': per_page}
            _, _, rs = self.connect('GET', '/search/repositories', data=data)
            return rs
        except Exception:
            return {}

    def repos(self, author, repo):
        '''Get repository metadata.'''
        try:
            _, _, rs = self.connect('GET', f'/repos/{author}/{repo}')
            return rs
        except Exception:
            return {}

    def repos_commits(self, author, repo):
        '''Get repository commits.'''
        try:
            _, _, rs = self.connect('GET', f'/repos/{author}/{repo}/commits')
            if isinstance(rs, dict):
                # A renamed repository answers with 'Moved Permanently' plus the
                # new API URL; follow it once. Strip the API origin by length
                # rather than a hard-coded slice (the original cut 18 chars,
                # which left '.com' of the hostname in the resource path).
                if rs.get('message', '') == 'Moved Permanently' and 'url' in rs:
                    _, _, rs1 = self.connect('GET', rs['url'][len(self.url):])
                    if isinstance(rs1, list):
                        return rs1
            elif isinstance(rs, list):
                return rs
        except Exception:
            pass
        return []

    def repos_releases_latest(self, author, repo):
        '''Get the latest release of a repository.'''
        try:
            _, _, rs = self.connect('GET', f'/repos/{author}/{repo}/releases/latest')
            return rs
        except Exception:
            return {}

    def users_octocat(self):
        '''Probe the API to refresh the rate-limit counter.'''
        try:
            _, _, _ = self.connect('GET', '/users/octocat')
        except Exception:
            pass


def clone_repo(url):
    '''Clone a repository into a fresh temp dir and return the checkout path.'''
    # mkdtemp() creates a directory that persists until explicitly removed; the
    # original used tempfile.TemporaryDirectory().name, whose directory can be
    # deleted as soon as the unreferenced object is garbage-collected.
    temp_dir = tempfile.mkdtemp()
    os.chdir(temp_dir)
    # Pass the URL as an argument list to avoid shell injection from
    # search-derived URLs (the original shelled out via os.system).
    subprocess.run(['git', 'clone', url], check=False)
    # Strip the 19-character 'https://github.com/' prefix and the owner,
    # leaving the repository name.
    return os.path.join(temp_dir, url[19:].split('/', 1)[1])
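
# Minimal standalone usage sketch for GithubClient (illustrative only; it
# mirrors the calls made in the main block below):
#
#   gc = GithubClient(os.getenv('GH_TOKEN'))
#   rs = gc.search_repositories('goby', page=1, per_page=10)
#   for item in rs.get('items', []):
#       print(item['html_url'], gc.limit)
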
if __name__ == '__main__':
    # Load the update history; (re)create the file if it is missing or corrupt.
    data = {}
    data_file = 'data.json'
    if os.path.exists(data_file):
        try:
            with open(data_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except Exception:
            with open(data_file, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=4)
    else:
        with open(data_file, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=4)

    # Repository home pages to harvest.
    html_urls = []
    gc = GithubClient(os.getenv('GH_TOKEN'))

    # Search repositories.
    try:
        rs = gc.search_repositories('goby', page=1, per_page=100)
        html_urls += [item['html_url'] for item in rs.get('items', [])
                      if item.get('html_url')]
    except Exception:
        traceback.print_exc()

    # Local path of this script.
    root_path = os.path.dirname(os.path.abspath(__file__))

    # Search code and collect the owning repositories' home pages. Search
    # qualifiers in q are space-separated; the original embedded a literal '+'
    # (a space only in already-URL-encoded form), which requests would
    # re-encode as %2B and GitHub would treat as part of the search term.
    try:
        rs = gc.search_code('GobyQuery language:Go', page=1, per_page=100)
        html_urls += [item['repository']['html_url'] for item in rs.get('items', [])
                      if item.get('repository', {}).get('html_url')]
    except Exception:
        traceback.print_exc()
    try:
        rs = gc.search_code('GobyQuery language:Json', page=1, per_page=100)
        html_urls += [item['repository']['html_url'] for item in rs.get('items', [])
                      if item.get('repository', {}).get('html_url')]
    except Exception:
        traceback.print_exc()

    html_urls = set(html_urls)
    print(f'[+] html_urls: {len(html_urls)}')

    # Clone each repository and copy any PoC files it contains. A file counts
    # as a PoC when it mentions both 'GobyQuery' and 'ScanSteps'; its MD5 keys
    # the history so the same file is never recorded twice.
    os.makedirs(os.path.join(root_path, 'poc'), exist_ok=True)
    for url in html_urls:
        print(url)
        try:
            repo_path = clone_repo(url)
            if not os.path.exists(repo_path):
                continue
            for root, _, files in os.walk(repo_path):
                for file in files:
                    if not file.endswith('.go') and not file.endswith('.json'):
                        continue
                    file_path = os.path.join(root, file)
                    try:
                        # Read once as bytes, then decode for the keyword check
                        # and hash the same bytes (the original opened the file
                        # twice and left both handles unclosed).
                        with open(file_path, 'rb') as f:
                            raw = f.read()
                        content = raw.decode('utf-8', errors='ignore')
                        if 'GobyQuery' in content and 'ScanSteps' in content:
                            md5 = hashlib.md5(raw).hexdigest()
                            if md5 not in data:
                                shutil.copyfile(file_path,
                                                os.path.join(root_path, 'poc', file))
                                data[md5] = {'name': file, 'from': url,
                                             'up_time': time.strftime('%Y-%m-%d %H:%M:%S')}
                    except Exception:
                        traceback.print_exc()
        except Exception:
            traceback.print_exc()
    os.chdir(root_path)

    # Prune history entries whose files no longer exist under poc/.
    md5s = []
    for file in os.listdir(os.path.join(root_path, 'poc')):
        if not file.endswith('.go') and not file.endswith('.json'):
            continue
        with open(os.path.join(root_path, 'poc', file), 'rb') as f:
            md5s.append(hashlib.md5(f.read()).hexdigest())
    for md5 in [md5 for md5 in data.keys() if md5 not in md5s]:
        del data[md5]

    # Write README.md, newest entries first.
    readme_md = '## goby poc ({} total), last checked at {}\n'.format(
        len(data), time.strftime('%Y-%m-%d %H:%M:%S'))
    readme_md += '### Collection log\n| File | Added |\n| :---- | :---- |\n'
    _data = sorted(data.values(), key=lambda x: x['up_time'], reverse=True)
    for item in _data:
        readme_md += '| [{}]({}) | {} |\n'.format(item['name'], item['from'],
                                                  item['up_time'])
    with open('README.md', 'w', encoding='utf-8') as f:
        f.write(readme_md)

    # Persist the update history.
    with open(data_file, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=4)
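
# To run (the filename update_poc.py is hypothetical; GH_TOKEN must hold a
# GitHub API token, as read via os.getenv above):
#
#   GH_TOKEN=<github token> python3 update_poc.py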