summaryrefslogtreecommitdiff
path: root/scrape_groups_info.py
blob: ac6956bc7ccbed5e27218387bd2dfac541b613c2 (about) (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#!/usr/bin/env python3

# SPDX-License-Identifier: CC0-1.0
#
# Copyright (C) 2024 Wojtek Kosior <koszko@koszko.org>

from dataclasses import dataclass
from pathlib import Path
import sys
import time

from bs4 import BeautifulSoup
import requests
import yaml

mitre_pages_path = Path(".") / "scraping" / "attack.mitre.org"

mitre_request_delay_time = 5 # seconds

last_mitre_download_time = 0

def mitre_page_download(path):
    global last_mitre_download_time

    now = time.time()
    time_to_wait = last_mitre_download_time + mitre_request_delay_time - now
    if time_to_wait > 0:
        time.sleep(time_to_wait)

    URI = 'https://attack.mitre.org/' + path
    print(f"Fetching `{URI}'.", file=sys.stderr)
    response = requests.get(URI)
    response.raise_for_status()

    last_mitre_download_time = time.time()

    return response.text

def mitre_page_get(path):
    page_path = (mitre_pages_path / path).with_suffix(".html")
    if page_path.exists():
        return page_path.read_text()
    else:
        if not page_path.parent.exists():
            page_path.parent.mkdir(parents=True)
        page_text = mitre_page_download(path)
        page_path.write_text(page_text)
        return page_text

@dataclass
class Group:
    name: str
    mitre_id: str
    aliases: list[str]
    technique_ids: list[str]

@dataclass
class Technique:
    name: str
    mitre_id: str

def get_group_techniques(gid):
    techniques = {}

    soup = BeautifulSoup(mitre_page_get(f"groups/{gid}/"), features="lxml")

    for row in soup.select("table.techniques-used tbody tr"):
        if "sub" in row.attrs.get("class", []):
            continue

        tid_cell = row.select_one("td:nth-child(2)")
        tid = tid_cell.select_one("a").get_text(strip=True)
        name_cell_idx = 1 + int(tid_cell.attrs.get("colspan", 1))
        name = row.select_one(f"td:nth-child({name_cell_idx}) a:first-child")\
                  .get_text(strip=True)

        if not name or not tid:
            print("Incomplete data for group's technique.", file=sys.stderr)
        else:
            techniques[tid] = Technique(name, tid)

    return techniques

def get_groups_and_techniques(relevant_names):
    groups = {}
    all_techniques = {}

    soup = BeautifulSoup(mitre_page_get("groups/"), features="lxml")

    for row in soup.select("tbody tr"):
        gid = row.select_one("td:nth-child(1) a").get_text(strip=True)
        name = row.select_one("td:nth-child(2) a").get_text(strip=True)

        if not name or not gid:
            print("Incomplete data for group.", file=sys.stderr)
        elif name not in relevant_names:
            print(f"Ignoring group `{name}'", file=sys.stderr)
        elif name in groups:
            print(f"Double definition of group `{name}'", file=sys.stderr)
        else:
            aliases = row.select_one("td:nth-child(3)").get_text().split(", ")
            aliases = list(filter(None, [alias.strip() for alias in aliases]))

            techniques = get_group_techniques(gid)
            all_techniques.update(techniques)

            groups[name] = Group(name, gid, aliases,
                                 [t.mitre_id for t in techniques.values()])

    return groups, all_techniques

def get_profiles_data(profiles_path):
    if profiles_path:
        with open(profiles_path) as inp:
            return yaml.safe_load(inp)

    return yaml.safe_load(sys.stdin)

if __name__ == "__main__":
    profiles_data = get_profiles_data(None if len(sys.argv) < 2
                                      else sys.argv[1])
    group_profiles = dict((g["name"],g) for g in profiles_data["groups"])
    groups, techniques = get_groups_and_techniques(group_profiles)
    missing_names = set(group_profiles).difference(groups)

    if missing_names:
        print(f"No data found for group(s): {', '.join(sorted(missing_names))}",
              file=sys.stderr)

    for name, group in groups.items():
        group_profiles[name].update(group.__dict__)

    profiles_data["groups"] = list(group_profiles.values())
    profiles_data["techniques"] = [t.__dict__ for t in techniques.values()]

    yaml.safe_dump(profiles_data, sys.stdout)