Hi folks,
This is my first collab. It's a modest Python script that:
The idea is to find low-hanging fruit based on high-level criteria.
This is a PoC, so it's not parametrized, commented or well-structured. Sorry about that (I did it in like 30min, lol).
These are the reqs:
This is the code:
This is my first collab. It's a modest Python script that:
- Downloads N GitHub repos based on a search term (you can filter by language, org, stars, etc.)
- Runs three SAST tools: Bandit, Pyright and Semgrep to look for critical vulnerabilities.
- If there are no vulns found, then we delete the repo, else we store the details in a .log file. Dead simple.
The idea is to find low-hanging fruit based on high-level criteria.
This is a PoC, so it's not parametrized, commented or well-structured. Sorry about that (I did it in like 30min, lol).
These are the reqs:
You must reply before you can see the hidden data contained here.
import json
import os
import shutil
import subprocess
from git import Repo
import time
import requests
from tqdm import tqdm
# GitHub personal access token sent in the Authorization header of API calls.
# Reference: https://github.com/settings/tokens
# Prefer supplying it through the GITHUB_TOKEN environment variable so the
# secret never lives in the source; the placeholder is only a fallback and
# must be replaced with a real token to authenticate.
token = os.environ.get("GITHUB_TOKEN") or "ghp_xxxxxxxxxxxxxxxxxxxxxxxxxxx"
# Repository search expression (free text plus qualifiers such as language:).
# Reference: https://docs.github.com/en/search-github/searching-on-github/searching-for-repositories
search_query = "forum language:python"
# Reference: https://semgrep.dev/explore
semgrep_command = [
def search_github_repos(query, sort='stars', order='desc', per_page=100, max_pages=1):
    """Collect repositories matching ``query`` from the GitHub search API.

    Pages are requested one at a time, starting at page 1, until a page comes
    back without items, the API answers with a non-200 status, or
    ``max_pages`` pages have been fetched.

    Args:
        query: Search expression sent as the ``q`` request parameter.
        sort: Value sent as the ``sort`` request parameter.
        order: Value sent as the ``order`` request parameter.
        per_page: Value sent as the ``per_page`` request parameter.
        max_pages: Maximum number of pages to fetch; ``None`` removes the cap.

    Returns:
        A list with the ``items`` entries of every page that was fetched.
    """
    headers = {"Authorization": f"token {token}"}
    base_url = "https://api.github.com/search/repositories"
    repos = []
    page = 1
    # The page cap is tested before the request is issued, so no API call is
    # spent on a page whose results would be thrown away.
    while max_pages is None or page <= max_pages:
        params = {
            "q": query,
            "sort": sort,
            "order": order,
            "per_page": per_page,
            "page": page,
        }
        response = requests.get(base_url, headers=headers, params=params)
        try:
            response_data = response.json()
        except ValueError:
            # Non-JSON body (e.g. a proxy error page): report the raw text.
            response_data = {"message": response.text}
        if response.status_code != 200:
            print(f"Error: {response_data.get('message')}")
            break
        page_repos = response_data.get("items", [])
        if not page_repos:
            break
        repos.extend(page_repos)
        print(f"Found {len(page_repos)} repositories on page {page}.")
        page += 1
    print(f"Total repositories found: {len(repos)}")
    return repos
def run_bandit(repo_path):
    """Run Bandit recursively over ``repo_path`` and return its findings.

    Bandit is asked for a JSON report restricted to HIGH severity issues with
    at least MEDIUM confidence.

    Args:
        repo_path: Directory to scan.

    Returns:
        The ``results`` list of Bandit's JSON report, or an empty list when
        Bandit produced no parseable report.
    """
    # -lll and -ii are Bandit's count-style flags for "HIGH severity" and
    # "MEDIUM confidence or above". The previous `-l HIGH -c MEDIUM` was not
    # valid: -l takes no value and -c names a config file.
    result = subprocess.run(
        ['bandit', '-r', repo_path, '-f', 'json', '-lll', '-ii'],
        capture_output=True,
        text=True,
    )
    # The exit status is not used to gate parsing: Bandit exits non-zero when
    # it reports findings, so the JSON report itself is the source of truth.
    try:
        report = json.loads(result.stdout)
    except json.JSONDecodeError:
        return []
    return report.get("results", [])
def run_pyright(repo_path):
    """Run Pyright over ``repo_path`` and return its warning-level diagnostics.

    Args:
        repo_path: Directory to analyze.

    Returns:
        The diagnostics from Pyright's JSON report whose ``severity`` is
        ``"warning"``, or an empty list when no parseable report was produced.
    """
    result = subprocess.run(
        ['pyright', '--outputjson', repo_path],
        capture_output=True,
        text=True,
    )
    try:
        report = json.loads(result.stdout)
    except json.JSONDecodeError:
        return []
    # Pyright's JSON report lists findings under "generalDiagnostics"; the
    # "diagnostics" key is kept as a fallback for reports that use that name.
    diagnostics = report.get("generalDiagnostics") or report.get("diagnostics", [])
    # NOTE(review): only "warning" is kept, as before; "error" diagnostics are
    # dropped. Confirm that is the intended notion of "high severity".
    return [entry for entry in diagnostics if entry.get("severity") == "warning"]
def run_semgrep(repo_path):
    """Run the module-level ``semgrep_command`` and return WARNING findings.

    Args:
        repo_path: Path of the repository being analyzed.
            NOTE(review): this argument is not passed to Semgrep; the scan
            target is whatever ``semgrep_command`` specifies. Confirm that the
            command really points at the cloned repository.

    Returns:
        The entries of the JSON report's ``results`` list whose severity is
        ``"WARNING"``, or an empty list when no parseable report was produced.
    """
    result = subprocess.run(semgrep_command, capture_output=True, text=True)
    try:
        report = json.loads(result.stdout)
    except json.JSONDecodeError:
        return []
    findings = report.get("results", [])
    # Semgrep nests the severity under each finding's "extra" object; the
    # top-level key is still honoured in case a report provides it there.
    return [
        finding
        for finding in findings
        if (finding.get("severity") or (finding.get("extra") or {}).get("severity")) == "WARNING"
    ]
def clone_and_analyze_repo(repo_name, repo_html_url, repo_clone_url):
    """Clone a repository, scan it, and keep it only if issues were found.

    The repository is cloned into ``<current working directory>/<repo_name>``
    and scanned with ``run_bandit``, ``run_pyright`` and ``run_semgrep``.
    When nothing is reported the clone is deleted; otherwise each tool's
    findings are written to ``bandit.log``, ``pyright.log`` and
    ``semgrep.log`` inside the cloned folder.

    Args:
        repo_name: Name used for the local folder and in progress messages.
        repo_html_url: URL shown in the progress message.
        repo_clone_url: URL handed to ``Repo.clone_from``.

    Returns:
        The combined list of Bandit, Pyright and Semgrep findings (empty when
        the repository was clean and has been deleted).
    """
    repo_path = os.path.join(os.getcwd(), repo_name)
    print(f"Cloning {repo_html_url} to {repo_path}...")
    Repo.clone_from(repo_clone_url, repo_path)
    print(f"Performing SAST analysis on {repo_name}...")
    bandit_issues = run_bandit(repo_path)
    pyright_issues = run_pyright(repo_path)
    semgrep_issues = run_semgrep(repo_path)
    all_issues = bandit_issues + pyright_issues + semgrep_issues
    if not all_issues:
        print("No high or critical vulnerabilities found. Deleting repo folder.")
        # Clean repositories are not kept on disk.
        shutil.rmtree(repo_path)
        return all_issues
    print("High or critical vulnerabilities found. Saving logs.")
    with open(os.path.join(repo_path, "bandit.log"), "w", encoding="utf-8") as bandit_log, \
            open(os.path.join(repo_path, "pyright.log"), "w", encoding="utf-8") as pyright_log, \
            open(os.path.join(repo_path, "semgrep.log"), "w", encoding="utf-8") as semgrep_log:
        bandit_log.write(json.dumps({"results": bandit_issues}, indent=2))
        pyright_log.write(json.dumps({"diagnostics": pyright_issues}, indent=2))
        semgrep_log.write(json.dumps({"results": semgrep_issues}, indent=2))
    return all_issues
def main():
    """Search GitHub, scan every hit, and report how many repos had findings.

    Uses the module-level ``search_query`` for the search, runs
    ``clone_and_analyze_repo`` on each result, and prints the number of
    repositories for which it returned a non-empty list.
    """
    print(f"Searching {search_query} repos...")
    # The query is passed unchanged: requests URL-encodes the parameters
    # itself, so replacing spaces with '+' beforehand would send literal plus
    # signs (%2B) instead of separators.
    repos = search_github_repos(search_query)
    print(" ")
    vuln_repos_count = 0
    for repo in tqdm(repos, desc="Processing repos"):
        repo_name = repo.get("name")
        repo_stars = repo.get("stargazers_count")
        repo_html_url = repo.get("html_url")
        repo_clone_url = repo.get("clone_url")
        print(f"Processing {repo_name} ({repo_stars} stars)...")
        # A non-empty findings list marks the repository as vulnerable.
        has_vulns = clone_and_analyze_repo(repo_name, repo_html_url, repo_clone_url)
        if has_vulns:
            vuln_repos_count += 1
        print(" ")
    print(f"{vuln_repos_count} repos with vulns")
if __name__ == "__main__":