summaryrefslogtreecommitdiff
path: root/scrape_groups_info.py
diff options
context:
space:
mode:
author    W. Kosior <koszko@koszko.org>    2024-12-17 13:16:37 +0100
committer W. Kosior <koszko@koszko.org>    2024-12-17 13:16:37 +0100
commit    b6418bfbfad8fb3f0d9f206163496a10b36a1877 (patch)
tree      03822d438a7c78556f7509a6ef95492315610486 /scrape_groups_info.py
parent    b05017f6b51cac86bacabcfb905dd98f998c230a (diff)
download  AGH-threat-intel-course-b6418bfbfad8fb3f0d9f206163496a10b36a1877.tar.gz
          AGH-threat-intel-course-b6418bfbfad8fb3f0d9f206163496a10b36a1877.zip
Use BeautifulSoup4. Also scrape techniques used.
Diffstat (limited to 'scrape_groups_info.py')
-rw-r--r--  scrape_groups_info.py  142
1 file changed, 74 insertions(+), 68 deletions(-)
diff --git a/scrape_groups_info.py b/scrape_groups_info.py
index 2037b85..92219db 100644
--- a/scrape_groups_info.py
+++ b/scrape_groups_info.py
@@ -5,22 +5,39 @@
# Copyright (C) 2024 Wojtek Kosior <koszko@koszko.org>
from dataclasses import dataclass
-from html.parser import HTMLParser
from pathlib import Path
import sys
+import time
+from bs4 import BeautifulSoup
import requests
import yaml
mitre_pages_path = Path(".") / "scraping" / "attack.mitre.org"
+mitre_request_delay_time = 5 # seconds
+
+last_mitre_download_time = 0
+
def mitre_page_download(path):
- response = requests.get('https://attack.mitre.org/' + path)
+ global last_mitre_download_time
+
+ now = time.time()
+ time_to_wait = last_mitre_download_time + mitre_request_delay_time - now
+ if time_to_wait > 0:
+ time.sleep(time_to_wait)
+
+ URI = 'https://attack.mitre.org/' + path
+ print(f"Fetching `{URI}'.", file=sys.stderr)
+ response = requests.get(URI)
response.raise_for_status()
+
+ last_mitre_download_time = time.time()
+
return response.text
def mitre_page_get(path):
- page_path = mitre_pages_path / path
+ page_path = (mitre_pages_path / path).with_suffix(".html")
if page_path.exists():
return page_path.read_text()
else:
@@ -35,80 +52,62 @@ class Group:
name: str
mitre_id: str
aliases: list[str]
+ technique_ids: list[str]
-
-
-class GroupListPageParser(HTMLParser):
- def __init__(self, relevant_groups):
- super().__init__()
- self.relevant_groups = relevant_groups
-
- self.col_numbers = [-1]
- self.current_tags = ["*TOP*"]
-
- self.collected_groups = {}
+@dataclass
+class Technique:
+ name: str
+ mitre_id: str
- self.collecting_new_group()
+def get_group_techniques(gid):
+ techniques = {}
- def collecting_new_group(self):
- self.current_group_mitre_id = None
- self.current_group_name = None
- self.current_group_aliases = None
+ soup = BeautifulSoup(mitre_page_get(f"groups/{gid}/"), features="lxml")
- def handle_starttag(self, tag, attrs):
- self.current_tags.append(tag)
+ for row in soup.select("table.techniques-used tbody tr"):
+ if "sub" in row.attrs.get("class", []):
+ continue
- if tag == "tr":
- self.col_numbers.append(-1)
- elif tag == "td":
- self.col_numbers[-1] += 1
+ tid_cell = row.select_one("td:nth-child(2)")
+ tid = tid_cell.select_one("a").get_text(strip=True)
+ name_cell_idx = 1 + int(tid_cell.attrs.get("colspan", 1))
+ name = row.select_one(f"td:nth-child({name_cell_idx}) a:first-child")\
+ .get_text(strip=True)
- def handle_data(self, data):
- if self.current_tags[-1] == "a" and self.col_numbers[-1] == 0:
- self.current_group_mitre_id = data.strip()
- elif self.current_tags[-1] == "a" and self.col_numbers[-1] == 1:
- self.current_group_name = data.strip()
- elif self.current_tags[-1] == "td" and self.col_numbers[-1] == 2:
- data = data.strip()
- if data:
- self.current_group_aliases = data.split(", ")
- else:
- self.current_group_aliases = []
+ if not name or not tid:
+ print("Incomplete data for group's technique.", file=sys.stderr)
+ else:
+ techniques[tid] = Technique(name, tid)
- def handle_endtag(self, tag):
- self.current_tags.pop()
+ return techniques
- if tag == "tr":
- self.col_numbers.pop()
+def get_groups_and_techniques(relevant_names):
+ groups = {}
+ all_techniques = {}
- if self.current_group_name is None or \
- self.current_group_mitre_id is None or \
- self.current_group_aliases is None:
- print("Incomplete data for group.", file=sys.stderr)
- return
+ soup = BeautifulSoup(mitre_page_get("groups/"), features="lxml")
- if self.current_group_name not in self.relevant_groups:
- print(f"Ignoring group `{self.current_group_name}'",
- file=sys.stderr)
- return
+ for row in soup.select("tbody tr"):
+ gid = row.select_one("td:nth-child(1) a").get_text(strip=True)
+ name = row.select_one("td:nth-child(2) a").get_text(strip=True)
- if self.current_group_name in self.collected_groups:
- print(f"Double definition of group `{self.current_group_name}'",
- file=sys.stderr)
- return
+ if not name or not gid:
+ print("Incomplete data for group.", file=sys.stderr)
+ elif name not in relevant_names:
+ print(f"Ignoring group `{name}'", file=sys.stderr)
+ elif name in groups:
+ print(f"Double definition of group `{name}'", file=sys.stderr)
+ else:
+ aliases = row.select_one("td:nth-child(3)").get_text().split(", ")
+ aliases = list(filter(None, [alias.strip() for alias in aliases]))
- self.collected_groups[self.current_group_name] = Group(
- self.current_group_name,
- self.current_group_mitre_id,
- self.current_group_aliases
- )
+ techniques = get_group_techniques(gid)
+ all_techniques.update(techniques)
- self.collecting_new_group()
+ groups[name] = Group(name, gid, aliases,
+ [t.mitre_id for t in techniques.values()])
-def get_groups(names):
- parser = GroupListPageParser(names)
- parser.feed(mitre_page_get("groups/"))
- return parser.collected_groups
+ return groups, all_techniques
def get_group_names(profiles_path):
def group_names(inp):
@@ -122,9 +121,16 @@ def get_group_names(profiles_path):
if __name__ == "__main__":
group_names = get_group_names(None if len(sys.argv) < 2 else sys.argv[1])
- groups = get_groups(group_names)
- missing_group_names = group_names.difference(groups)
- if missing_group_names:
- print(f"No data found for group(s): {', '.join(sorted(missing_group_names))}",
+ groups, techniques = get_groups_and_techniques(group_names)
+ missing_names = group_names.difference(groups)
+
+ if missing_names:
+ print(f"No data found for group(s): {', '.join(sorted(missing_names))}",
file=sys.stderr)
- yaml.safe_dump([group.__dict__ for group in groups.values()], sys.stdout)
+
+ out_obj = {
+ "groups": [g.__dict__ for g in groups.values()],
+ "techniques": [t.__dict__ for t in techniques.values()]
+ }
+
+ yaml.safe_dump(out_obj, sys.stdout)