diff options
Diffstat (limited to 'src/hydrilla/proxy/csp.py')
-rw-r--r-- | src/hydrilla/proxy/csp.py | 196 |
1 files changed, 196 insertions, 0 deletions
diff --git a/src/hydrilla/proxy/csp.py b/src/hydrilla/proxy/csp.py new file mode 100644 index 0000000..df2f65b --- /dev/null +++ b/src/hydrilla/proxy/csp.py @@ -0,0 +1,196 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +# Tools for working with Content Security Policy headers. +# +# This file is part of Hydrilla&Haketilo. +# +# Copyright (C) 2022 Wojtek Kosior +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. +# +# +# I, Wojtek Kosior, thereby promise not to sue for violation of this +# file's license. Although I request that you do not make use of this +# code in a proprietary program, I am not going to enforce this in +# court. + +""" +..... +""" + +import re +import typing as t +import dataclasses as dc + +from immutables import Map, MapMutation + +from . import http_messages + + +enforce_header_names = ( + 'content-security-policy', + 'x-content-security-policy', + 'x-webkit-csp' +) + +header_names = (*enforce_header_names, 'content-security-policy-report-only') + +@dc.dataclass +class ContentSecurityPolicy: + directives: Map[str, t.Sequence[str]] + header_name: str = 'Content-Security-Policy' + disposition: str = 'enforce' + + def remove(self, directives: t.Sequence[str]) -> 'ContentSecurityPolicy': + mutation = self.directives.mutate() + + for name in directives: + mutation.pop(name, None) + + return dc.replace(self, directives = mutation.finish()) + + def extend(self, directives: t.Mapping[str, t.Sequence[str]]) \ + -> 'ContentSecurityPolicy': + mutation = self.directives.mutate() + + for name, extras in directives.items(): + if name in mutation: + mutation[name] = (*mutation[name], *extras) + + return dc.replace(self, directives = mutation.finish()) + + def serialize(self) -> tuple[str, str]: + """ + Produces (name, value) pair suitable for use as an HTTP header. + + If a deserialized policy is being reserialized, the resulting value is + not guaranteed to be the same as the original one. It shall be merely + semantically equivalent. + """ + serialized_directives = [] + for name, value_seq in self.directives.items(): + if all(val == "'none'" for val in value_seq): + value_seq = ["'none'"] + else: + value_seq = [val for val in value_seq if val != "'none'"] + + serialized_directives.append(f'{name} {" ".join(value_seq)}') + + return (self.header_name, ';'.join(serialized_directives)) + + @staticmethod + def deserialize( + serialized: str, + header_name: str, + disposition: str = 'enforce' + ) -> 'ContentSecurityPolicy': + """ + Parses the policy as required by W3C Working Draft. + + Extra whitespace information, invalid/empty directives and the order of + directives are not preserved, only the semantically-relevant information + is. + """ + # For more info, see: + # https://www.w3.org/TR/CSP3/#parse-serialized-policy + empty_directives: Map[str, t.Sequence[str]] = Map() + + directives = empty_directives.mutate() + + for serialized_directive in serialized.split(';'): + if not serialized_directive.isascii(): + continue + + serialized_directive = serialized_directive.strip() + if len(serialized_directive) == 0: + continue + + tokens = serialized_directive.split() + directive_name = tokens.pop(0).lower() + directive_value = tokens + + # Specs mention giving warnings for duplicate directive names but + # from our proxy's perspective this is not important right now. + if directive_name in directives: + continue + + directives[directive_name] = directive_value + + return ContentSecurityPolicy( + directives = directives.finish(), + header_name = header_name, + disposition = disposition + ) + +# def extract(headers: http_messages.IHeaders) \ +# -> tuple[ContentSecurityPolicy, ...]: +# """....""" +# csp_policies = [] + +# for header_name, disposition in header_names_and_dispositions: +# for serialized_list in headers.get_all(header_name): +# for serialized in serialized_list.split(','): +# policy = ContentSecurityPolicy.deserialize( +# serialized, +# header_name, +# disposition +# ) + +# if policy.directives != Map(): +# csp_policies.append(policy) + +# return tuple(csp_policies) + +def modify( + headers: http_messages.IHeaders, + clear: t.Union[t.Sequence[str], t.Literal['all']] = (), + extend: t.Mapping[str, t.Sequence[str]] = Map(), + add: t.Mapping[str, t.Sequence[str]] = Map(), +) -> http_messages.IHeaders: + """ + This function modifies the CSP Headers. The following actions are performed + *in order* + 1. report-only CSP Headers are removed, + 2. directives with names in `clear` are removed, + 3. directives that could cause CSP reports to be sent are removed, + 4. directives from `add` are added in a separate Content-Security-Policy, + header. + 5. directives from `extend` are merged into the existing directives, + effectively loosening them, + + No measures are yet implemented to prevent fingerprinting when serving HTTP + responses with headers modified by this function. Please use wisely, you + have been warned. + """ + headers_list = [ + (key, val) + for key, val in headers.items() + if key.lower() not in header_names + ] + + if clear != 'all': + for name in header_names: + for serialized_list in headers.get_all(name): + for serialized in serialized_list.split(','): + policy = ContentSecurityPolicy.deserialize(serialized, name) + policy = policy.remove((*clear, 'report-to', 'report-uri')) + policy = policy.extend(extend) + if policy.directives != Map(): + headers_list.append(policy.serialize()) + + if add != Map(): + csp_to_add = ContentSecurityPolicy(Map(add)).extend(extend) + headers_list.append(csp_to_add.serialize()) + + return http_messages.make_headers(headers_list) |