aboutsummaryrefslogtreecommitdiff
path: root/src/hydrilla/proxy/csp.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/hydrilla/proxy/csp.py')
-rw-r--r--src/hydrilla/proxy/csp.py196
1 files changed, 196 insertions, 0 deletions
diff --git a/src/hydrilla/proxy/csp.py b/src/hydrilla/proxy/csp.py
new file mode 100644
index 0000000..df2f65b
--- /dev/null
+++ b/src/hydrilla/proxy/csp.py
@@ -0,0 +1,196 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+# Tools for working with Content Security Policy headers.
+#
+# This file is part of Hydrilla&Haketilo.
+#
+# Copyright (C) 2022 Wojtek Kosior
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+#
+#
+# I, Wojtek Kosior, thereby promise not to sue for violation of this
+# file's license. Although I request that you do not make use of this
+# code in a proprietary program, I am not going to enforce this in
+# court.
+
+"""
+.....
+"""
+
+import re
+import typing as t
+import dataclasses as dc
+
+from immutables import Map, MapMutation
+
+from . import http_messages
+
+
+enforce_header_names = (
+ 'content-security-policy',
+ 'x-content-security-policy',
+ 'x-webkit-csp'
+)
+
+header_names = (*enforce_header_names, 'content-security-policy-report-only')
+
+@dc.dataclass
+class ContentSecurityPolicy:
+ directives: Map[str, t.Sequence[str]]
+ header_name: str = 'Content-Security-Policy'
+ disposition: str = 'enforce'
+
+ def remove(self, directives: t.Sequence[str]) -> 'ContentSecurityPolicy':
+ mutation = self.directives.mutate()
+
+ for name in directives:
+ mutation.pop(name, None)
+
+ return dc.replace(self, directives = mutation.finish())
+
+ def extend(self, directives: t.Mapping[str, t.Sequence[str]]) \
+ -> 'ContentSecurityPolicy':
+ mutation = self.directives.mutate()
+
+ for name, extras in directives.items():
+ if name in mutation:
+ mutation[name] = (*mutation[name], *extras)
+
+ return dc.replace(self, directives = mutation.finish())
+
+ def serialize(self) -> tuple[str, str]:
+ """
+ Produces (name, value) pair suitable for use as an HTTP header.
+
+ If a deserialized policy is being reserialized, the resulting value is
+ not guaranteed to be the same as the original one. It shall be merely
+ semantically equivalent.
+ """
+ serialized_directives = []
+ for name, value_seq in self.directives.items():
+ if all(val == "'none'" for val in value_seq):
+ value_seq = ["'none'"]
+ else:
+ value_seq = [val for val in value_seq if val != "'none'"]
+
+ serialized_directives.append(f'{name} {" ".join(value_seq)}')
+
+ return (self.header_name, ';'.join(serialized_directives))
+
+ @staticmethod
+ def deserialize(
+ serialized: str,
+ header_name: str,
+ disposition: str = 'enforce'
+ ) -> 'ContentSecurityPolicy':
+ """
+ Parses the policy as required by W3C Working Draft.
+
+ Extra whitespace information, invalid/empty directives and the order of
+ directives are not preserved, only the semantically-relevant information
+ is.
+ """
+ # For more info, see:
+ # https://www.w3.org/TR/CSP3/#parse-serialized-policy
+ empty_directives: Map[str, t.Sequence[str]] = Map()
+
+ directives = empty_directives.mutate()
+
+ for serialized_directive in serialized.split(';'):
+ if not serialized_directive.isascii():
+ continue
+
+ serialized_directive = serialized_directive.strip()
+ if len(serialized_directive) == 0:
+ continue
+
+ tokens = serialized_directive.split()
+ directive_name = tokens.pop(0).lower()
+ directive_value = tokens
+
+ # Specs mention giving warnings for duplicate directive names but
+ # from our proxy's perspective this is not important right now.
+ if directive_name in directives:
+ continue
+
+ directives[directive_name] = directive_value
+
+ return ContentSecurityPolicy(
+ directives = directives.finish(),
+ header_name = header_name,
+ disposition = disposition
+ )
+
+# def extract(headers: http_messages.IHeaders) \
+# -> tuple[ContentSecurityPolicy, ...]:
+# """...."""
+# csp_policies = []
+
+# for header_name, disposition in header_names_and_dispositions:
+# for serialized_list in headers.get_all(header_name):
+# for serialized in serialized_list.split(','):
+# policy = ContentSecurityPolicy.deserialize(
+# serialized,
+# header_name,
+# disposition
+# )
+
+# if policy.directives != Map():
+# csp_policies.append(policy)
+
+# return tuple(csp_policies)
+
+def modify(
+ headers: http_messages.IHeaders,
+ clear: t.Union[t.Sequence[str], t.Literal['all']] = (),
+ extend: t.Mapping[str, t.Sequence[str]] = Map(),
+ add: t.Mapping[str, t.Sequence[str]] = Map(),
+) -> http_messages.IHeaders:
+ """
+ This function modifies the CSP Headers. The following actions are performed
+ *in order*
+ 1. report-only CSP Headers are removed,
+ 2. directives with names in `clear` are removed,
+ 3. directives that could cause CSP reports to be sent are removed,
+ 4. directives from `add` are added in a separate Content-Security-Policy,
+ header.
+ 5. directives from `extend` are merged into the existing directives,
+ effectively loosening them,
+
+ No measures are yet implemented to prevent fingerprinting when serving HTTP
+ responses with headers modified by this function. Please use wisely, you
+ have been warned.
+ """
+ headers_list = [
+ (key, val)
+ for key, val in headers.items()
+ if key.lower() not in header_names
+ ]
+
+ if clear != 'all':
+ for name in header_names:
+ for serialized_list in headers.get_all(name):
+ for serialized in serialized_list.split(','):
+ policy = ContentSecurityPolicy.deserialize(serialized, name)
+ policy = policy.remove((*clear, 'report-to', 'report-uri'))
+ policy = policy.extend(extend)
+ if policy.directives != Map():
+ headers_list.append(policy.serialize())
+
+ if add != Map():
+ csp_to_add = ContentSecurityPolicy(Map(add)).extend(extend)
+ headers_list.append(csp_to_add.serialize())
+
+ return http_messages.make_headers(headers_list)