diff options
author | W. Kosior <koszko@koszko.org> | 2024-12-16 19:41:24 +0100 |
---|---|---|
committer | W. Kosior <koszko@koszko.org> | 2024-12-16 19:41:24 +0100 |
commit | 973d4b6fc1232ae36b865f91204e8198233d5484 (patch) | |
tree | 98ddc1e2592ce0dafb9520bf4ca7d0aed3c15cdb /scrape_groups_info.py | |
download | AGH-threat-intel-course-973d4b6fc1232ae36b865f91204e8198233d5484.tar.gz AGH-threat-intel-course-973d4b6fc1232ae36b865f91204e8198233d5484.zip |
Initial commit.
Diffstat (limited to 'scrape_groups_info.py')
-rw-r--r-- | scrape_groups_info.py | 127 |
1 files changed, 127 insertions, 0 deletions
diff --git a/scrape_groups_info.py b/scrape_groups_info.py new file mode 100644 index 0000000..549f872 --- /dev/null +++ b/scrape_groups_info.py @@ -0,0 +1,127 @@ +#!/usr/bin/env python3 + +# SPDX-License-Identifier: CC0-1.0 +# +# Copyright (C) 2024 Wojtek Kosior <koszko@koszko.org> + +from dataclasses import dataclass +from html.parser import HTMLParser +from pathlib import Path +import sys + +import requests +import yaml + +mitre_pages_path = Path(".") / "scraping" / "attack.mitre.org" +profiles_path = Path('./profiles.yaml') + +def mitre_page_download(path): + response = requests.get('https://attack.mitre.org/' + path) + response.raise_for_status() + return response.text + +def mitre_page_get(path): + page_path = mitre_pages_path / path + if page_path.exists(): + return page_path.read_text() + else: + if not page_path.parent.exists(): + page_path.parent.mkdir(parents=True) + page_text = mitre_page_download(path) + page_path.write_text(page_text) + return page_text + +@dataclass +class Group: + name: str + mitre_id: str + aliases: list[str] + + + +class GroupListPageParser(HTMLParser): + def __init__(self, relevant_groups): + super().__init__() + self.relevant_groups = relevant_groups + + self.col_numbers = [-1] + self.current_tags = ["*TOP*"] + + self.collected_groups = {} + + self.collecting_new_group() + + def collecting_new_group(self): + self.current_group_mitre_id = None + self.current_group_name = None + self.current_group_aliases = None + + def handle_starttag(self, tag, attrs): + self.current_tags.append(tag) + + if tag == "tr": + self.col_numbers.append(-1) + elif tag == "td": + self.col_numbers[-1] += 1 + + def handle_data(self, data): + if self.current_tags[-1] == "a" and self.col_numbers[-1] == 0: + self.current_group_mitre_id = data.strip() + elif self.current_tags[-1] == "a" and self.col_numbers[-1] == 1: + self.current_group_name = data.strip() + elif self.current_tags[-1] == "td" and self.col_numbers[-1] == 2: + data = data.strip() + if data: + self.current_group_aliases = data.split(", ") + else: + self.current_group_aliases = [] + + def handle_endtag(self, tag): + self.current_tags.pop() + + if tag == "tr": + self.col_numbers.pop() + + if self.current_group_name is None or \ + self.current_group_mitre_id is None or \ + self.current_group_aliases is None: + print("Incomplete data for group.", file=sys.stderr) + return + + if self.current_group_name not in self.relevant_groups: + print(f"Ignoring group `{self.current_group_name}'", + file=sys.stderr) + return + + if self.current_group_name in self.collected_groups: + print(f"Double definition of group `{self.current_group_name}'", + file=sys.stderr) + return + + self.collected_groups[self.current_group_name] = Group( + self.current_group_name, + self.current_group_mitre_id, + self.current_group_aliases + ) + + self.collecting_new_group() + +def get_groups(names): + parser = GroupListPageParser(names) + parser.feed(mitre_page_get("groups/")) + return parser.collected_groups + +def get_group_names(profiles_path): + def group_names(inp): + return {group["name"] for group in yaml.safe_load(inp)["groups"]} + + if profiles_path: + with open(profiles_path) as inp: + return group_names(inp) + + return group_names(sys.stdin) + +if __name__ == "__main__": + group_names = get_group_names(None if len(sys.argv) < 2 else sys.argv[1]) + groups = get_groups(group_names) + yaml.safe_dump([group.__dict__ for group in groups.values()], sys.stdout) |