summaryrefslogtreecommitdiff
path: root/scrape_groups_info.py
diff options
context:
space:
mode:
authorW. Kosior <koszko@koszko.org>2024-12-16 19:41:24 +0100
committerW. Kosior <koszko@koszko.org>2024-12-16 19:41:24 +0100
commit973d4b6fc1232ae36b865f91204e8198233d5484 (patch)
tree98ddc1e2592ce0dafb9520bf4ca7d0aed3c15cdb /scrape_groups_info.py
downloadAGH-threat-intel-course-973d4b6fc1232ae36b865f91204e8198233d5484.tar.gz
AGH-threat-intel-course-973d4b6fc1232ae36b865f91204e8198233d5484.zip
Initial commit.
Diffstat (limited to 'scrape_groups_info.py')
-rw-r--r--scrape_groups_info.py127
1 files changed, 127 insertions, 0 deletions
diff --git a/scrape_groups_info.py b/scrape_groups_info.py
new file mode 100644
index 0000000..549f872
--- /dev/null
+++ b/scrape_groups_info.py
@@ -0,0 +1,127 @@
+#!/usr/bin/env python3
+
+# SPDX-License-Identifier: CC0-1.0
+#
+# Copyright (C) 2024 Wojtek Kosior <koszko@koszko.org>
+
+from dataclasses import dataclass
+from html.parser import HTMLParser
+from pathlib import Path
+import sys
+
+import requests
+import yaml
+
+mitre_pages_path = Path(".") / "scraping" / "attack.mitre.org"
+profiles_path = Path('./profiles.yaml')
+
+def mitre_page_download(path):
+ response = requests.get('https://attack.mitre.org/' + path)
+ response.raise_for_status()
+ return response.text
+
+def mitre_page_get(path):
+ page_path = mitre_pages_path / path
+ if page_path.exists():
+ return page_path.read_text()
+ else:
+ if not page_path.parent.exists():
+ page_path.parent.mkdir(parents=True)
+ page_text = mitre_page_download(path)
+ page_path.write_text(page_text)
+ return page_text
+
+@dataclass
+class Group:
+ name: str
+ mitre_id: str
+ aliases: list[str]
+
+
+
+class GroupListPageParser(HTMLParser):
+ def __init__(self, relevant_groups):
+ super().__init__()
+ self.relevant_groups = relevant_groups
+
+ self.col_numbers = [-1]
+ self.current_tags = ["*TOP*"]
+
+ self.collected_groups = {}
+
+ self.collecting_new_group()
+
+ def collecting_new_group(self):
+ self.current_group_mitre_id = None
+ self.current_group_name = None
+ self.current_group_aliases = None
+
+ def handle_starttag(self, tag, attrs):
+ self.current_tags.append(tag)
+
+ if tag == "tr":
+ self.col_numbers.append(-1)
+ elif tag == "td":
+ self.col_numbers[-1] += 1
+
+ def handle_data(self, data):
+ if self.current_tags[-1] == "a" and self.col_numbers[-1] == 0:
+ self.current_group_mitre_id = data.strip()
+ elif self.current_tags[-1] == "a" and self.col_numbers[-1] == 1:
+ self.current_group_name = data.strip()
+ elif self.current_tags[-1] == "td" and self.col_numbers[-1] == 2:
+ data = data.strip()
+ if data:
+ self.current_group_aliases = data.split(", ")
+ else:
+ self.current_group_aliases = []
+
+ def handle_endtag(self, tag):
+ self.current_tags.pop()
+
+ if tag == "tr":
+ self.col_numbers.pop()
+
+ if self.current_group_name is None or \
+ self.current_group_mitre_id is None or \
+ self.current_group_aliases is None:
+ print("Incomplete data for group.", file=sys.stderr)
+ return
+
+ if self.current_group_name not in self.relevant_groups:
+ print(f"Ignoring group `{self.current_group_name}'",
+ file=sys.stderr)
+ return
+
+ if self.current_group_name in self.collected_groups:
+ print(f"Double definition of group `{self.current_group_name}'",
+ file=sys.stderr)
+ return
+
+ self.collected_groups[self.current_group_name] = Group(
+ self.current_group_name,
+ self.current_group_mitre_id,
+ self.current_group_aliases
+ )
+
+ self.collecting_new_group()
+
+def get_groups(names):
+ parser = GroupListPageParser(names)
+ parser.feed(mitre_page_get("groups/"))
+ return parser.collected_groups
+
+def get_group_names(profiles_path):
+ def group_names(inp):
+ return {group["name"] for group in yaml.safe_load(inp)["groups"]}
+
+ if profiles_path:
+ with open(profiles_path) as inp:
+ return group_names(inp)
+
+ return group_names(sys.stdin)
+
+if __name__ == "__main__":
+ group_names = get_group_names(None if len(sys.argv) < 2 else sys.argv[1])
+ groups = get_groups(group_names)
+ yaml.safe_dump([group.__dict__ for group in groups.values()], sys.stdout)