Hi folks,
This is my first collab. It's a modest Python script that:
The idea is to find low-hanging fruit based on high-level criteria.
This is a PoC, so it's not parametrized, commented or well-structured. Sorry about that (I did it in like 30min, lol).
These are the reqs:
This is the code:
This is my first collab. It's a modest Python script that :
- Downloads N GitHub repos based on a search term (you can filter by language, org, stars, etc.)
- Runs three SAST tools: Bandit, Pyright and Semgrep to look for critical vulnerabilities.
- If there are no vulns found, then we delete the repo, else we store the details in a .log file. Dead simple.
The idea is to find low hanging fruits based on a high-level criteria.
This is a PoC, so it's not parametrized, commented or well-structured. Sorry about that (I did it in like 30min, lol).
These are the reqs:
You must reply before you can see the hidden data contained here.
Code:
import json
import os
import shutil
import subprocess
from git import Repo
import time
import requests
from tqdm import tqdm
# Reference: https://github.com/settings/tokens
# Placeholder GitHub personal access token -- replace with a real token before running.
# (Never commit a live token; load it from an environment variable instead.)
token = "ghp_xxxxxxxxxxxxxxxxxxxxxxxxxxx"
# Repository search query, using GitHub's search syntax.
# Reference: https://docs.github.com/en/search-github/searching-on-github/searching-for-repositories
search_query = "forum language:python"
# Reference: https://semgrep.dev/explore
# Base Semgrep invocation: a broad collection of curated registry rulesets
# (security audits, OWASP Top 10, CWE Top 25, secrets, injection, framework
# rules, ...) with machine-readable JSON output for downstream parsing.
# Reference: https://semgrep.dev/explore
semgrep_command = [
    'semgrep',
    '--config',"p/gitleaks",
    '--config',"p/security-code-scan",
    '--config',"p/docker-compose",
    '--config',"p/insecure-transport",
    '--config',"p/r2c-security-audit",
    '--config',"p/flask",
    '--config',"p/django",
    '--config',"p/security-audit",
    '--config',"p/python",
    '--config',"p/default",
    '--config',"p/owasp-top-ten",
    '--config',"p/cwe-top-25",
    '--config',"p/jwt",
    '--config',"p/sql-injection",
    '--config',"p/secrets",
    '--config',"p/bandit",
    '--config',"p/brakeman",
    '--config',"p/findsecbugs",
    '--config',"p/flawfinder",
    '--config',"p/command-injection",
    '--json'
]
def search_github_repos(query, sort='stars', order='desc', per_page=100, max_pages=1):
    """Search GitHub repositories via the REST search API.

    Args:
        query: Search string in GitHub's repository-search syntax.
        sort: Field to sort by (e.g. 'stars').
        order: 'asc' or 'desc'.
        per_page: Results per page (GitHub caps this at 100).
        max_pages: Maximum number of pages to fetch; None means "until empty".

    Returns:
        A list of repository dicts as returned by the API (possibly empty).
    """
    headers = {"Authorization": f"token {token}"}
    base_url = "https://api.github.com/search/repositories"
    repos = []
    page = 1
    # Stop *before* requesting a page past the limit -- the original checked
    # `page > max_pages` after the request, wasting one extra API call.
    while max_pages is None or page <= max_pages:
        params = {
            "q": query,
            "sort": sort,
            "order": order,
            "per_page": per_page,
            "page": page,
        }
        response = requests.get(base_url, headers=headers, params=params, timeout=30)
        response_data = response.json()
        if response.status_code != 200:
            print(f"Error: {response_data.get('message')}")
            break
        page_repos = response_data.get("items", [])
        if not page_repos:
            break
        print(f"Found {len(page_repos)} repositories on page {page}.")
        repos.extend(page_repos)
        page += 1
        time.sleep(0.5)  # stay well under the search-API rate limit
    print(f"Total repositories found: {len(repos)}")
    return repos
def run_bandit(repo_path):
    """Run Bandit recursively over *repo_path* and return its findings.

    Returns the list of result dicts from Bandit's JSON report, or [] when
    nothing is found or the output cannot be parsed.
    """
    # NOTE(review): the original passed '-l HIGH -c MEDIUM', but Bandit's
    # '-l' is a repeatable flag (no argument) and '-c' names a config *file*,
    # so the command errored out and -- combined with the `returncode == 3`
    # guard -- every scan silently reported zero issues.  Use the long
    # options that actually accept level values.
    result = subprocess.run(
        ['bandit', '-r', repo_path, '-f', 'json',
         '--severity-level', 'high', '--confidence-level', 'medium'],
        capture_output=True, text=True,
    )
    try:
        # Bandit writes a JSON report to stdout on both exit 0 (clean)
        # and exit 1 (issues found).
        report = json.loads(result.stdout)
    except ValueError:
        return []
    return report.get("results", [])
def run_pyright(repo_path):
    """Run Pyright on *repo_path* and return its higher-severity diagnostics."""
    result = subprocess.run(['pyright', '--outputjson', repo_path],
                            capture_output=True, text=True)
    try:
        # Pyright's JSON report keys its findings as "generalDiagnostics";
        # the original read a non-existent "diagnostics" key and therefore
        # always returned [].
        diagnostics = json.loads(result.stdout).get("generalDiagnostics", [])
    except ValueError:
        return []
    # Severities are "error", "warning", "information"; keep the two
    # strongest (the original kept only "warning" and dropped "error").
    return [d for d in diagnostics if d.get("severity") in ("error", "warning")]
def run_semgrep(repo_path):
    """Run Semgrep on *repo_path* and return WARNING/ERROR findings."""
    # Build a fresh argument list: the original appended the target to the
    # module-level `semgrep_command`, so every subsequent call re-scanned
    # all previously processed repos as well.
    command = semgrep_command + [repo_path]
    result = subprocess.run(command, capture_output=True, text=True)
    try:
        findings = json.loads(result.stdout).get("results", [])
    except ValueError:
        return []
    # Semgrep reports severity under result["extra"]["severity"]
    # (INFO / WARNING / ERROR); the original read a top-level "severity"
    # key that does not exist, so it always returned [].
    return [f for f in findings
            if f.get("extra", {}).get("severity") in ("WARNING", "ERROR")]
def clone_and_analyze_repo(repo_name, repo_html_url, repo_clone_url):
    """Clone a repository, run all three SAST tools, and keep or delete it.

    Repos with no findings are deleted from disk; otherwise the per-tool
    findings are written as JSON logs inside the cloned repo directory.

    Args:
        repo_name: Directory name to clone into (under the CWD).
        repo_html_url: Human-facing URL, used only for logging.
        repo_clone_url: Git URL passed to `git clone`.

    Returns:
        The combined list of findings (empty when the repo was clean).
    """
    repo_path = os.path.join(os.getcwd(), repo_name)
    # A leftover directory from an earlier (possibly interrupted) run makes
    # `git clone` fail; start from a clean slate.
    if os.path.isdir(repo_path):
        shutil.rmtree(repo_path)
    print(f"Cloning {repo_html_url} to {repo_path}...")
    Repo.clone_from(repo_clone_url, repo_path)
    print(f"Performing SAST analysis on {repo_name}...")
    bandit_issues = run_bandit(repo_path)
    pyright_issues = run_pyright(repo_path)
    semgrep_issues = run_semgrep(repo_path)
    all_issues = bandit_issues + pyright_issues + semgrep_issues
    if not all_issues:
        print("No high or critical vulnerabilities found. Deleting repo folder.")
        shutil.rmtree(repo_path)
    else:
        print("High or critical vulnerabilities found. Saving logs.")
        with open(os.path.join(repo_path, "bandit.log"), "w") as bandit_log, \
                open(os.path.join(repo_path, "pyright.log"), "w") as pyright_log, \
                open(os.path.join(repo_path, "semgrep.log"), "w") as semgrep_log:
            bandit_log.write(json.dumps({"results": bandit_issues}, indent=2))
            pyright_log.write(json.dumps({"diagnostics": pyright_issues}, indent=2))
            semgrep_log.write(json.dumps({"results": semgrep_issues}, indent=2))
    return all_issues
def main():
    """Search GitHub for matching repos, then clone and scan each one."""
    print(f"Searching {search_query} repos...")
    # Pass the raw query: `requests` URL-encodes params correctly.  The
    # original replaced spaces with '+', which requests then escaped to
    # '%2B', sending literal plus signs to the API instead of separators.
    repos = search_github_repos(search_query)
    print(" ")
    vuln_repos_count = 0
    for repo in tqdm(repos, desc="Processing repos"):
        repo_name = repo.get("name")
        repo_stars = repo.get("stargazers_count")
        repo_html_url = repo.get("html_url")
        repo_clone_url = repo.get("clone_url")
        print(f"Processing {repo_name} ({repo_stars} stars)...")
        try:
            has_vulns = clone_and_analyze_repo(repo_name, repo_html_url, repo_clone_url)
        except Exception as exc:
            # One broken clone or tool crash must not abort the whole batch.
            print(f"Failed to process {repo_name}: {exc}")
            continue
        if has_vulns:
            vuln_repos_count += 1
    print(" ")
    print(f"{vuln_repos_count} repos with vulns")
    print("Bye")
# Run the pipeline only when executed as a script, not when imported.
if __name__ == "__main__":
    main()