From b6418bfbfad8fb3f0d9f206163496a10b36a1877 Mon Sep 17 00:00:00 2001 From: "W. Kosior" Date: Tue, 17 Dec 2024 13:16:37 +0100 Subject: Use BeautifulSoup4. Also scrap techniques used. --- scrape_groups_info.py | 142 ++++++++++++++++++++++++++------------------------ 1 file changed, 74 insertions(+), 68 deletions(-) (limited to 'scrape_groups_info.py') diff --git a/scrape_groups_info.py b/scrape_groups_info.py index 2037b85..92219db 100644 --- a/scrape_groups_info.py +++ b/scrape_groups_info.py @@ -5,22 +5,39 @@ # Copyright (C) 2024 Wojtek Kosior from dataclasses import dataclass -from html.parser import HTMLParser from pathlib import Path import sys +import time +from bs4 import BeautifulSoup import requests import yaml mitre_pages_path = Path(".") / "scraping" / "attack.mitre.org" +mitre_request_delay_time = 5 # seconds + +last_mitre_download_time = 0 + def mitre_page_download(path): - response = requests.get('https://attack.mitre.org/' + path) + global last_mitre_download_time + + now = time.time() + time_to_wait = last_mitre_download_time + mitre_request_delay_time - now + if time_to_wait > 0: + time.sleep(time_to_wait) + + URI = 'https://attack.mitre.org/' + path + print(f"Fetching `{URI}'.", file=sys.stderr) + response = requests.get(URI) response.raise_for_status() + + last_mitre_download_time = time.time() + return response.text def mitre_page_get(path): - page_path = mitre_pages_path / path + page_path = (mitre_pages_path / path).with_suffix(".html") if page_path.exists(): return page_path.read_text() else: @@ -35,80 +52,62 @@ class Group: name: str mitre_id: str aliases: list[str] + technique_ids: list[str] - - -class GroupListPageParser(HTMLParser): - def __init__(self, relevant_groups): - super().__init__() - self.relevant_groups = relevant_groups - - self.col_numbers = [-1] - self.current_tags = ["*TOP*"] - - self.collected_groups = {} +@dataclass +class Technique: + name: str + mitre_id: str - self.collecting_new_group() +def get_group_techniques(gid): + techniques = {} - def collecting_new_group(self): - self.current_group_mitre_id = None - self.current_group_name = None - self.current_group_aliases = None + soup = BeautifulSoup(mitre_page_get(f"groups/{gid}/"), features="lxml") - def handle_starttag(self, tag, attrs): - self.current_tags.append(tag) + for row in soup.select("table.techniques-used tbody tr"): + if "sub" in row.attrs.get("class", []): + continue - if tag == "tr": - self.col_numbers.append(-1) - elif tag == "td": - self.col_numbers[-1] += 1 + tid_cell = row.select_one("td:nth-child(2)") + tid = tid_cell.select_one("a").get_text(strip=True) + name_cell_idx = 1 + int(tid_cell.attrs.get("colspan", 1)) + name = row.select_one(f"td:nth-child({name_cell_idx}) a:first-child")\ + .get_text(strip=True) - def handle_data(self, data): - if self.current_tags[-1] == "a" and self.col_numbers[-1] == 0: - self.current_group_mitre_id = data.strip() - elif self.current_tags[-1] == "a" and self.col_numbers[-1] == 1: - self.current_group_name = data.strip() - elif self.current_tags[-1] == "td" and self.col_numbers[-1] == 2: - data = data.strip() - if data: - self.current_group_aliases = data.split(", ") - else: - self.current_group_aliases = [] + if not name or not tid: + print("Incomplete data for group's technique.", file=sys.stderr) + else: + techniques[tid] = Technique(name, tid) - def handle_endtag(self, tag): - self.current_tags.pop() + return techniques - if tag == "tr": - self.col_numbers.pop() +def get_groups_and_techniques(relevant_names): + groups = {} + all_techniques = {} - if self.current_group_name is None or \ - self.current_group_mitre_id is None or \ - self.current_group_aliases is None: - print("Incomplete data for group.", file=sys.stderr) - return + soup = BeautifulSoup(mitre_page_get("groups/"), features="lxml") - if self.current_group_name not in self.relevant_groups: - print(f"Ignoring group `{self.current_group_name}'", - file=sys.stderr) - return + for row in soup.select("tbody tr"): + gid = row.select_one("td:nth-child(1) a").get_text(strip=True) + name = row.select_one("td:nth-child(2) a").get_text(strip=True) - if self.current_group_name in self.collected_groups: - print(f"Double definition of group `{self.current_group_name}'", - file=sys.stderr) - return + if not name or not gid: + print("Incomplete data for group.", file=sys.stderr) + elif name not in relevant_names: + print(f"Ignoring group `{name}'", file=sys.stderr) + elif name in groups: + print(f"Double definition of group `{name}'", file=sys.stderr) + else: + aliases = row.select_one("td:nth-child(3)").get_text().split(", ") + aliases = list(filter(None, [alias.strip() for alias in aliases])) - self.collected_groups[self.current_group_name] = Group( - self.current_group_name, - self.current_group_mitre_id, - self.current_group_aliases - ) + techniques = get_group_techniques(gid) + all_techniques.update(techniques) - self.collecting_new_group() + groups[name] = Group(name, gid, aliases, + [t.mitre_id for t in techniques.values()]) -def get_groups(names): - parser = GroupListPageParser(names) - parser.feed(mitre_page_get("groups/")) - return parser.collected_groups + return groups, all_techniques def get_group_names(profiles_path): def group_names(inp): @@ -122,9 +121,16 @@ def get_group_names(profiles_path): if __name__ == "__main__": group_names = get_group_names(None if len(sys.argv) < 2 else sys.argv[1]) - groups = get_groups(group_names) - missing_group_names = group_names.difference(groups) - if missing_group_names: - print(f"No data found for group(s): {', '.join(sorted(missing_group_names))}", + groups, techniques = get_groups_and_techniques(group_names) + missing_names = group_names.difference(groups) + + if missing_names: + print(f"No data found for group(s): {', '.join(sorted(missing_names))}", file=sys.stderr) - yaml.safe_dump([group.__dict__ for group in groups.values()], sys.stdout) + + out_obj = { + "groups": [g.__dict__ for g in groups.values()], + "techniques": [t.__dict__ for t in techniques.values()] + } + + yaml.safe_dump(out_obj, sys.stdout) -- cgit v1.2.3