# SPDX-License-Identifier: AGPL-3.0-or-later
# Main repository logic.
# This file is part of Hydrilla
# Copyright (C) 2021 Wojtek Kosior
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
# I, Wojtek Kosior, thereby promise not to sue for violation of this
# file's license. Although I request that you do not make use of this code
# in a proprietary program, I am not going to enforce this in court.
import gettext
import json
import logging
import os
import pathlib
import re
from hashlib import sha256

from flask import Flask, Blueprint, current_app, url_for, abort, request, \
    redirect
from jinja2 import Environment, PackageLoader
# Matches one line of "JSON with '//' comments". Group 1 is the part of the
# line to keep, group 2 is what follows it: '//' (a comment starts), '/' (a
# malformed comment) or '' (end of line).
strip_comment_re = re.compile(r'''
^ # match from the beginning of each line
( # catch the part before '//' comment
  (?: # this group matches either a string or a single out-of-string character
    [^"/] |
    "
    (?: # this group matches any in-a-string character
      [^"\\] |          # match any normal character
      \\[^u] |          # match any escaped character like '\f' or '\n'
      \\u[a-fA-F0-9]{4} # match an escape
    )*
    "
  )*
)
# expect either end-of-line or a comment:
# * unterminated strings will cause matching to fail
# * bad comment (with '/' instead of '//') will be indicated by second group
#   having length 1 instead of 2 or 0
(//?|$)
''', re.VERBOSE)


def strip_json_comments(text):
    '''
    Return text with every '//' comment removed, so that the result can be
    passed to json.loads().

    Line structure is preserved (each comment is cut, its line stays), which
    keeps line numbers in later JSON error messages meaningful.

    Lines containing an unterminated string are passed through untouched; the
    json module is left to report that error. A lone '/' outside of a string
    raises json.JSONDecodeError pointing at the offending character.
    '''
    processed = 0  # number of characters of text in the lines handled so far
    stripped_text = []

    for line in text.split('\n'):
        match = strip_comment_re.match(line)

        if match is None: # unterminated string
            # ignore this error, let json module report it
            stripped = line
        elif len(match[2]) == 1:
            raise json.JSONDecodeError('bad comment', text,
                                       processed + len(match[1]))
        else:
            stripped = match[1]

        stripped_text.append(stripped)
        processed += len(line) + 1  # + 1 accounts for the '\n' separator

    return '\n'.join(stripped_text)
# Directory that contains this module. Used below to locate the default
# 'config.json' and the 'locales' directory.
here = pathlib.Path(__file__).resolve().parent
# Flask blueprint on which this module registers its URL rules.
bp = Blueprint('bp', __package__)
def load_config(config_path):
    '''
    Load the JSON (with '//' comments) config file at config_path together
    with every config file it pulls in, and return the merged dictionary.

    A config file may list further files under two keys:
    * 'use_configs' - files that must load; a failure is propagated,
    * 'try_configs' - optional files; a failure to load one is ignored.
    Values from files loaded later override values loaded earlier. Both keys
    are removed from the returned dictionary.

    Failure to load config_path itself is always propagated.
    '''
    config = {}
    # Two parallel stacks: paths still to load and, for each of them, whether
    # a failure to load it may be ignored.
    to_load = [config_path]
    failures_ok = [False]

    while to_load:
        path = to_load.pop()
        can_fail = failures_ok.pop()

        try:
            # Open the file popped from the stack, not always the top-level
            # config_path (which would reload the same file forever).
            with open(path) as config_file:
                new_config = json.loads(strip_json_comments(config_file.read()))
        except Exception as e:
            if can_fail:
                continue
            raise e from None

        config.update(new_config)

        for key, failure_ok in [('try_configs', True), ('use_configs', False)]:
            paths = new_config.get(key, [])
            # to_load is a stack, so push in reverse to load in listed order.
            to_load.extend(reversed(paths))
            failures_ok.extend([failure_ok] * len(paths))

    for key in ['try_configs', 'use_configs']:
        config.pop(key, None)

    return config
def get_content_file_path(path):
    '''
    Convert path, a string that uses '/' as separator, to a pathlib.Path that
    uses the separator of the current platform.

    Raise ValueError if the resulting path is absolute.
    '''
    if os.path.sep != '/':
        # str.replace() returns a new string; the result has to be kept.
        path = path.replace('/', os.path.sep)

    path = pathlib.Path(path)
    if path.is_absolute():
        raise ValueError(_('path_is_absolute_{}').format(path))

    return path
class MyNotImplError(NotImplementedError):
    '''Raised when a planned but not-yet-completed feature is used.'''
    def __init__(self, what, where):
        '''
        what names the unimplemented feature and where names the place (e.g. a
        file) in which its use was encountered. Both get substituted into the
        translated error message.
        '''
        # NOTE(review): the message key was not visible in the reviewed source
        # and is reconstructed from the keyword arguments - confirm it against
        # the translation catalog.
        super().__init__(_('not_implemented_{what}_{where}')
                         .format(what=what, where=where))
def normalize_version(ver):
    '''
    ver is an array of integers. Strip right-most zeroes from ver.

    Returns a *new* array. Doesn't modify its argument.
    '''
    new_len = len(ver)
    # Walk back from the end for as long as the trailing component is zero.
    while new_len > 0 and ver[new_len - 1] == 0:
        new_len -= 1

    return ver[:new_len]
def parse_version(ver_str):
    '''
    Convert ver_str into an array representation, e.g. for ver_str="4.6.13.0"
    return [4, 6, 13, 0].

    Raises ValueError if any dot-separated component is not an integer.
    '''
    return [int(num) for num in ver_str.split('.')]
def version_string(ver, rev=None):
    '''
    ver is an array of integers. rev is an optional integer. Produce string
    representation of version (optionally with revision number), like:
        '1.2.3'    for ver=[1, 2, 3]
        '1.2.3-5'  for ver=[1, 2, 3], rev=5

    No version normalization is performed.
    '''
    rev_suffix = '' if rev is None else f'-{rev}'
    return '.'.join(str(n) for n in ver) + rev_suffix
class VersionedContentItem:
    '''Stores definitions of multiple versions of website content item.'''
    def __init__(self):
        self.uuid = None
        self.identifier = None
        # Maps version_string(version) to the item definition (a dict).
        self.by_version = {}
        # Versions (arrays of integers) of all registered definitions, kept
        # sorted in ascending order.
        self.known_versions = []

    def register_item(self, item):
        '''Make item queryable by version. Perform sanity checks for uuid.'''
        if self.identifier is None:
            # The first registered definition fixes identifier and uuid.
            self.identifier = item['identifier']
            self.uuid = item['uuid']
        elif self.uuid != item['uuid']:
            raise ValueError(_('uuid_mismatch_{identifier}')
                             .format(identifier=self.identifier))

        ver = item['version']
        ver_str = version_string(ver)

        if ver_str in self.by_version:
            raise ValueError(_('version_clash_{identifier}_{version}')
                             .format(identifier=self.identifier,
                                     version=ver_str))

        self.by_version[ver_str] = item
        # Keep the list ordered so that [-1] is always the newest version.
        self.known_versions.append(ver)
        self.known_versions.sort()

    def get_by_ver(self, ver=None):
        '''
        Find and return definition of the newest version of item.

        If ver is specified, instead find and return definition of that version
        of the item (or None if absent).
        '''
        if ver is None:
            # Compare against None explicitly: a normalized all-zero version
            # is the empty array, which is falsy but still a valid request.
            if not self.known_versions:
                return None
            ver = self.known_versions[-1]

        return self.by_version.get(version_string(ver))

    def get_all(self):
        '''Return a list of all definitions of item, ordered by version.'''
        return [self.by_version[version_string(ver)]
                for ver in self.known_versions]
class PatternTreeNode:
    '''
    "Pattern Tree" is how we refer to the data structure used for querying
    Haketilo patterns. Those look like 'https://*.example.com/ab/***'. The goal
    is to make it possible for given URL to quickly retrieve all known patterns
    that match it.
    '''
    def __init__(self):
        # Items registered under a path that ends with '*', '**' or '***'
        # (in this order) directly below this node.
        self.wildcard_matches = [None, None, None]
        # Item registered under the exact path that leads to this node.
        self.literal_match = None
        # Maps a segment to the child node it leads to.
        self.children = {}

    def search(self, segments):
        '''
        Yields all matches of this segments sequence against the tree that
        starts at this node. Results are produced in order from greatest to
        lowest pattern specificity.
        '''
        # Path of nodes visited while following segments literally, root first.
        nodes = [self]

        for segment in segments:
            next_node = nodes[-1].children.get(segment)
            if next_node is None:
                break

            nodes.append(next_node)

        nsegments = len(segments)

        # The conditions are evaluated lazily, *after* the examined node has
        # been popped, so len(nodes) equals the depth of that node, i.e. the
        # number of segments consumed to reach it.
        cond_literal = lambda: len(nodes) == nsegments
        cond_wildcard = [
            # '*': exactly one more segment, which is not itself '*'
            lambda: len(nodes) + 1 == nsegments and segments[-1] != '*',
            # '**': at least two more segments
            lambda: len(nodes) + 1 <  nsegments,
            # '***': anything, except a single remaining segment '***'
            lambda: len(nodes) + 1 != nsegments or  segments[-1] != '***'
        ]

        # Deepest (most specific) nodes are reported first.
        while nodes:
            node = nodes.pop()

            for item, condition in [(node.literal_match, cond_literal),
                                    *zip(node.wildcard_matches, cond_wildcard)]:
                if item is not None and condition():
                    yield item

    def add(self, segments, item_instantiator):
        '''
        Make item queryable through (this branch of) the Pattern Tree. If there
        was not yet any item associated with the tree path designated by
        segments, create a new one using item_instantiator() function. Return
        all items matching this path (both the ones that existed and the ones
        just created).
        '''
        node = self
        segment = None

        for segment in segments:
            # Wildcard slots of the *parent* of the node we descend into.
            wildcards = node.wildcard_matches

            child = node.children.get(segment)
            if child is None:
                child = PatternTreeNode()
                node.children[segment] = child
            node = child

        if node.literal_match is None:
            node.literal_match = item_instantiator()

        if segment not in ('*', '**', '***'):
            return [node.literal_match]

        # The last segment is a wildcard: besides the literal item, also make
        # sure an item exists in the matching wildcard slot of the parent.
        slot = len(segment) - 1
        if wildcards[slot] is None:
            wildcards[slot] = item_instantiator()

        return [node.literal_match, wildcards[slot]]
# Splits a URL into its protocol and everything that follows '://'.
proto_regex = re.compile(r'^(?P<proto>\w+)://(?P<rest>.*)$')

user_re = r'[^/?#@]+@' # r'(?P<user>[^/?#@]+)@' # discarded for now
query_re = r'\??[^#]*' # r'\??(?P<query>[^#]*)' # discarded for now
domain_re = r'(?P<domain>[^/?#]+)'
path_re = r'(?P<path>[^?#]*)'
# Both regexes are matched against the part of the URL after '://'.
http_regex = re.compile(f'{domain_re}{path_re}{query_re}.*')
ftp_regex = re.compile(f'(?:{user_re})?{domain_re}{path_re}.*')


class UrlError(ValueError):
    '''Raised when a URL or URL pattern cannot be deconstructed.'''
    pass


class DeconstructedUrl:
    '''Represents a deconstructed URL or URL pattern'''
    def __init__(self, url):
        '''
        Split url into:
        * self.proto  - 'http', 'https' or 'ftp',
        * self.domain - list of domain labels in *reversed* order (top-level
          domain first), so that a leading wildcard label of a pattern such as
          '*.example.com' becomes the last segment, which is where
          PatternTreeNode.add() looks for wildcards,
        * self.path   - list of non-empty path segments.

        Raise UrlError if url is malformed or uses another protocol.
        '''
        self.url = url

        match = proto_regex.match(url)
        if not match:
            raise UrlError(_('invalid_URL_{}').format(url))

        self.proto = match.group('proto')
        if self.proto not in ('http', 'https', 'ftp'):
            raise UrlError(_('disallowed_protocol_{}').format(self.proto))

        if self.proto == 'ftp':
            match = ftp_regex.match(match.group('rest'))
        else:
            match = http_regex.match(match.group('rest'))

        if not match:
            raise UrlError(_('invalid_URL_{}').format(url))

        self.domain = match.group('domain').split('.')
        self.domain.reverse()
        # filter(None, ...) drops the empty strings produced by leading,
        # trailing and doubled slashes.
        self.path = [*filter(None, match.group('path').split('/'))]
class MappingItem:
    '''
    A mapping, together with one of its patterns, as stored in Pattern Tree.
    '''
    def __init__(self, pattern, mapping):
        self.pattern = pattern
        self.mapping = mapping

    def register(self, patterns_by_proto):
        '''
        Make self queryable through the Pattern Tree that starts with the
        protocols dictionary passed in the argument.

        Raises UrlError if self.pattern cannot be deconstructed.
        '''
        deco = DeconstructedUrl(self.pattern)

        # First level: one tree of domains per protocol.
        domain_tree = patterns_by_proto.get(deco.proto)
        if domain_tree is None:
            domain_tree = PatternTreeNode()
            patterns_by_proto[deco.proto] = domain_tree

        # Second level: every domain-tree item is itself a tree of paths,
        # whose items are lists of MappingItem objects.
        for path_tree in domain_tree.add(deco.domain, PatternTreeNode):
            for match_list in path_tree.add(deco.path, list):
                match_list.append(self)
class Content:
    '''Stores serveable website content.'''
    def __init__(self, content_dir_path):
        '''
        When an instance of Content is constructed, it searches
        content_dir_path for custom serveable site content and loads it.

        Must be called within a Flask application context (translations and
        the 'werror' setting are read from current_app).
        '''
        # identifier -> VersionedContentItem
        self.resources = {}
        self.mappings = {}
        # identifier -> license definition
        self.licenses = {}
        # source name (subdirectory name) -> loaded index.json
        self.indexes = {}
        # Dispatch table keyed by a definition's 'type' field.
        self.definition_processors = {
            'resource': self._process_resource_or_mapping,
            'mapping':  self._process_resource_or_mapping,
            'license':  self._process_license
        }
        # protocol -> root PatternTreeNode of its domain tree
        self.patterns_by_proto = {}
        # resolved file path -> hex sha256 digest (cache)
        self.file_sha256sums = {}

        self.content_dir_path = pathlib.Path(content_dir_path).resolve()

        if not self.content_dir_path.is_dir():
            raise ValueError(_('content_dir_path_not_dir'))

        for subdir_path in self.content_dir_path.iterdir():
            if not subdir_path.is_dir():
                continue
            try:
                self._load_content_from_subdir(subdir_path, subdir_path.name)
            except Exception as e:
                # With 'werror' set every problem is fatal; otherwise it is
                # logged and the remaining subdirectories still get loaded.
                if current_app._pydrilla_werror:
                    raise e from None
                logging.error(_('couldnt_load_content_from_%s'), subdir_path,
                              exc_info=True)

        self._report_missing()
        self._finalize()

    def _load_content_from_subdir(self, subdir_path, source_name):
        '''
        Helper function used to load definitions from index.json of a
        subdirectory of the content directory.
        '''
        index_path = subdir_path / 'index.json'
        with open(index_path) as index_file:
            index = json.loads(strip_json_comments(index_file.read()))

        self._process_index(index, source_name)

    @staticmethod
    def register_item(dict, item):
        '''
        Helper function used to add a versioned item definition to content
        data structures.
        '''
        identifier = item['identifier']
        versioned_item = dict.get(identifier)
        if versioned_item is None:
            versioned_item = VersionedContentItem()
            dict[identifier] = versioned_item

        versioned_item.register_item(item)

    @staticmethod
    def _process_copyright_and_license(definition):
        '''Helper function used by other _process_*() methods.'''
        for field in ['copyright', 'licenses']:
            if definition[field] == 'auto':
                # Single braces: the field name has to be interpolated.
                raise MyNotImplError(f'"{field}": "auto"',
                                     definition['source_name'])

    def _get_file_sha256sum(self, path):
        '''
        Compute sha256 of the file at path. Cache results on this Content
        object.
        '''
        path = path.resolve()
        sha256sum = self.file_sha256sums.get(path)

        if sha256sum is None:
            with open(path, mode='rb') as hashed_file:
                sha256sum = sha256(hashed_file.read()).hexdigest()
            self.file_sha256sums[path] = sha256sum

        return sha256sum

    def _add_file_sha256sum(self, source_name, file_object):
        '''
        Expect file_object to be a dict with field "file" holding a file path
        relative to content directory's subdirectory source_name. Compute or
        fetch from cache the sha256 sum of that file and put it in file_object's
        "sha256" field.
        '''
        file_path = self.content_dir_path / source_name / file_object['file']
        file_object['sha256'] = self._get_file_sha256sum(file_path)

    def _process_resource_or_mapping(self, definition, index):
        '''
        Sanitizes, autocompletes and registers serveable mapping/resource
        definition.
        '''
        definition['version'] = normalize_version(definition['version'])

        if definition['type'] == 'resource':
            # NOTE(review): the call sites of _process_copyright_and_license()
            # were not visible in the reviewed source; confirm it is meant to
            # run here.
            self._process_copyright_and_license(definition)
            definition['dependencies'] = definition.get('dependencies', [])
            self.register_item(self.resources, definition)
            source_name = definition['source_name']
            for script in definition['scripts']:
                self._add_file_sha256sum(source_name, script)
        else:
            self.register_item(self.mappings, definition)

    def _process_license(self, license, index):
        '''Sanitizes and registers serveable license definition.'''
        identifier = license['identifier']
        if identifier in self.licenses:
            raise ValueError(_('license_clash_{}').format(identifier))

        self.licenses[identifier] = license

        source_name = license['source_name']
        for legal_text in license['legal_text']:
            self._add_file_sha256sum(source_name, legal_text)

        notice = license.get('notice')
        if notice is not None:
            self._add_file_sha256sum(source_name, notice)

    def _process_index(self, index, source_name):
        '''
        Sanitizes, autocompletes and registers data from a loaded index.json
        file.

        SCHEMA_VERSION is expected to be a module-level constant (its
        definition is not part of the reviewed source).
        '''
        schema_ver = normalize_version(index['schema_version'])
        index['schema_version'] = schema_ver
        if schema_ver != SCHEMA_VERSION:
            raise ValueError(_('index_json_schema_mismatch_{found}_{required}')
                             .format(found=version_string(schema_ver),
                                     required=version_string(SCHEMA_VERSION)))

        if source_name in self.indexes:
            raise ValueError(_('source_name_clash_{}').format(source_name))

        index['source_name'] = source_name

        # NOTE(review): see the note in _process_resource_or_mapping().
        self._process_copyright_and_license(index)

        self.indexes[source_name] = index

        for definition in index['definitions']:
            try:
                definition['source_name'] = source_name
                definition['source_copyright'] = index['copyright']
                definition['source_licenses'] = index['licenses']
                processor = self.definition_processors[definition['type']]
                processor(definition, index)
            except Exception as e:
                if current_app._pydrilla_werror:
                    raise e from None
                # Only source_name is known here (there is no subdir_path in
                # this method's scope).
                logging.error(_('couldnt_load_definition_from_%s'), source_name,
                              exc_info=True)

    @staticmethod
    def all_items(versioned_items_dict):
        '''Iterator over all registered versions of all items.'''
        for versioned_item in versioned_items_dict.values():
            yield from versioned_item.by_version.values()

    def _report_missing(self):
        '''
        Use logger to print information about items that are referenced but
        were not loaded.
        '''
        # Messages use %(name)s placeholders. logging accepts a single dict
        # as the sole positional argument for this style of formatting.
        # NOTE(review): the keys 'no_index_license_...', 'no_dep_...' and
        # 'no_payload_...' were not visible in the reviewed source and are
        # reconstructed from their arguments - confirm against the catalog.
        def report_missing_license(obj, object_type, lic):
            if object_type == 'index':
                logging.error(_('no_index_license_%(source)s_%(lic)s'),
                              {'source': obj['source_name'], 'lic': lic})
                return

            ver_str = version_string(obj['version'])
            kwargs = {object_type: obj['identifier'], 'ver': ver_str,
                      'lic': lic}
            if object_type == 'resource':
                fmt = _('no_resource_license_%(resource)s_%(ver)s_%(lic)s')
            else:
                fmt = _('no_mapping_license_%(mapping)s_%(ver)s_%(lic)s')

            logging.error(fmt, kwargs)

        for object_type, iterable in [
                ('index', self.indexes.values()),
                ('resource', self.all_items(self.resources))
        ]:
            for obj in iterable:
                # 'licenses' is walked as a nested list whose string leaves
                # are license identifiers joined by 'or' / 'and'.
                to_process = [obj['licenses']]
                while to_process:
                    term = to_process.pop()

                    if type(term) is str:
                        if term not in ['or', 'and'] and \
                           term not in self.licenses:
                            report_missing_license(obj, object_type, term)
                        continue

                    to_process.extend(term)

        def report_missing_dependency(resource, dep):
            logging.error(_('no_dep_%(resource)s_%(ver)s_%(dep)s'),
                          {'dep': dep, 'resource': resource['identifier'],
                           'ver': version_string(resource['version'])})

        for resource in self.all_items(self.resources):
            for dep in resource['dependencies']:
                if dep not in self.resources:
                    report_missing_dependency(resource, dep)

        def report_missing_payload(mapping, payload):
            logging.error(_('no_payload_%(mapping)s_%(ver)s_%(payload)s'),
                          {'mapping': mapping['identifier'], 'payload': payload,
                           'ver': version_string(mapping['version'])})

        for mapping in self.all_items(self.mappings):
            for payload in mapping['payloads']:
                payload = payload['payload']
                if payload not in self.resources:
                    report_missing_payload(mapping, payload)

    def _finalize(self):
        '''
        Initialize structures needed to serve queries. Called once after all
        data gets loaded.
        '''
        for items_dict in [self.resources, self.mappings]:
            for versioned_item in items_dict.values():
                versioned_item.known_versions.sort()

        for mapping in self.all_items(self.mappings):
            for payload in mapping['payloads']:
                pattern = payload['pattern']
                try:
                    MappingItem(pattern, mapping)\
                        .register(self.patterns_by_proto)
                except Exception as e:
                    if current_app._pydrilla_werror:
                        raise e from None
                    # NOTE(review): reconstructed message key - confirm.
                    logging.error(
                        _('couldnt_register_%(mapping)s_%(ver)s_%(pattern)s'),
                        {'mapping': mapping['identifier'], 'pattern': pattern,
                         'ver': version_string(mapping['version'])}
                    )

    def query(self, url):
        '''
        Return a list of registered mappings that match url.

        If multiple versions of a mapping are applicable, only the most recent
        is included in the result.

        Raises UrlError if url cannot be deconstructed.
        '''
        deco = DeconstructedUrl(url)

        # identifier -> newest matching mapping definition found so far
        mappings = {}

        domain_tree = self.patterns_by_proto.get(deco.proto) \
            or PatternTreeNode()

        def process_item(item):
            # A pattern with a trailing slash only matches URLs that have a
            # trailing slash as well.
            if url[-1] != '/' and item.pattern[-1] == '/':
                return

            identifier = item.mapping['identifier']

            if identifier not in mappings or \
               item.mapping['version'] > mappings[identifier]['version']:
                mappings[identifier] = item.mapping

        for path_tree in domain_tree.search(deco.domain):
            for item_list in path_tree.search(deco.path):
                for item in item_list:
                    process_item(item)

        return list(mappings.values())
def create_app(config_path=(here / 'config.json'), flask_config=None):
    '''
    Create and return the Flask application.

    config_path is the path (str or pathlib.Path) of the JSON config file; it
    has to provide 'static_resource_uri', 'content_dir' and
    'hydrilla_sources_uri'. flask_config is an optional dict of Flask
    settings; its 'lang' entry (default 'en') selects the translation.

    Raises ValueError when a required config key is absent and MyNotImplError
    when the unsupported 'hydrilla_parent' option is used.
    '''
    flask_config = {} if flask_config is None else flask_config
    # Accept plain strings too; .name and .parent are used below.
    config_path = pathlib.Path(config_path)

    app = Flask(__package__)
    # NOTE(review): these two statements that apply flask_config and (at the
    # end) register the blueprint were not visible in the reviewed source.
    app.config.update(flask_config)

    language = flask_config.get('lang', 'en')
    translation = gettext.translation('pydrilla', localedir=(here / 'locales'),
                                      languages=[language])

    app._pydrilla_gettext = translation.gettext

    # https://stackoverflow.com/questions/9449101/how-to-stop-flask-from-initialising-twice-in-debug-mode
    if app.debug and os.environ.get('WERKZEUG_RUN_MAIN') != 'true':
        return app

    config = load_config(config_path)

    # _() and Content both read from current_app, so everything that may
    # produce a translated message has to run inside an application context.
    with app.app_context():
        for key in ['static_resource_uri', 'content_dir',
                    'hydrilla_sources_uri']:
            if key not in config:
                raise ValueError(_('config_key_absent_{}').format(key))

        static_resource_uri = config['static_resource_uri']
        if not static_resource_uri.endswith('/'):
            static_resource_uri += '/'
        app._pydrilla_static_resource_uri = static_resource_uri
        app._pydrilla_hydrilla_sources_uri = config['hydrilla_sources_uri']
        app._pydrilla_werror = config.get('werror', False)
        if 'hydrilla_parent' in config:
            raise MyNotImplError('hydrilla_parent', config_path.name)

        # A relative content_dir is resolved against the config file's
        # directory.
        content_dir = pathlib.Path(config['content_dir'])
        if not content_dir.is_absolute():
            content_dir = config_path.parent / content_dir

        app._pydrilla_content = Content(content_dir.resolve())

    app.register_blueprint(bp)

    return app
def _(text_key):
    '''
    Translate text_key using the gettext function stored on the current Flask
    application.
    '''
    translate = current_app._pydrilla_gettext
    return translate(text_key)
def content():
    '''Return the Content object stored on the current Flask application.'''
    loaded_content = current_app._pydrilla_content
    return loaded_content
class MyEnvironment(Environment):
    '''
    A wrapper class around jinja2.Environment that causes GNU gettext function
    (as '_' and '__'), url_for function and 'hydrilla_sources_uri' config option
    to be passed to every call of each template's render() method.
    '''

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def get_template(self, *args, **kwargs):
        '''
        Load a template like jinja2.Environment.get_template() does and
        replace its render() with a wrapper that supplies the extra variables.
        '''
        template = super().get_template(*args, **kwargs)
        old_render = template.render

        def new_render(*args, **kwargs):
            # Looked up at render time, so the values of the application
            # that is current during the request are used.
            _ = current_app._pydrilla_gettext
            sources_uri = current_app._pydrilla_hydrilla_sources_uri

            def escaping_gettext(text_key):
                from markupsafe import escape

                return str(escape(_(text_key)))

            final_kwargs = {
                '_': escaping_gettext,
                '__': escaping_gettext,
                'url_for': url_for,
                'hydrilla_sources_uri' : sources_uri
            }
            # Variables passed by the caller must not be dropped; they take
            # precedence over the defaults above.
            final_kwargs.update(kwargs)

            return old_render(*args, **final_kwargs)

        template.render = new_render

        return template
# Template environment for this package's templates. Autoescaping is off;
# the gettext helpers supplied by MyEnvironment escape translated text
# themselves.
j2env = MyEnvironment(loader=PackageLoader(__package__), autoescape=False)
# Loaded once, at import time.
indexpage = j2env.get_template('index.html')
# NOTE(review): no registration of this view was visible in the reviewed
# source; '/' is assumed - confirm.
@bp.route('/')
def index():
    '''Serve the rendered index page.'''
    return indexpage.render()
def get_resource_or_mapping(identifier, get_dict):
    '''
    Return, as JSON text, the definition of the resource or mapping called
    identifier. get_dict is a function that returns the dictionary of
    versioned items to look in.

    The 'ver' query parameter selects what gets returned:
    * absent         - the newest version,
    * 'all'          - a list of all versions (empty if item is unknown),
    * e.g. '1.2.3'   - that specific version.
    Responds with 400 if 'ver' is malformed and with 404 if no matching
    definition exists.
    '''
    ver = request.args.get('ver')
    versioned_item = get_dict().get(identifier)

    if ver == 'all':
        definition = versioned_item.get_all() if versioned_item else []
    else:
        if ver is not None:
            try:
                ver = normalize_version(parse_version(ver))
            except ValueError:
                abort(400)

        definition = versioned_item and versioned_item.get_by_ver(ver)
        if definition is None:
            abort(404)

    return json.dumps(definition)
def get_license_or_source(identifier, get_dict):
    '''
    Return, as JSON text, the definition of the license or source called
    identifier. get_dict is a function that returns the dictionary to look
    in. Responds with 404 if identifier is unknown.
    '''
    definition = get_dict().get(identifier)
    if definition is None:
        abort(404)

    return json.dumps(definition)
# Register one URL rule per item type: /resources/<id>, /mappings/<id>,
# /licenses/<id> and /sources/<id>. The dictionaries are accessed through
# lambdas because content() only works inside an application context.
for item_type, get_dict, get_item in [
        ('resource', lambda: content().resources, get_resource_or_mapping),
        ('mapping',  lambda: content().mappings,  get_resource_or_mapping),
        ('license',  lambda: content().licenses,  get_license_or_source),
        ('source',   lambda: content().indexes,   get_license_or_source)
]:
    # Default arguments bind the current loop values; a plain closure would
    # see only the values from the last iteration.
    def _get_item(identifier, get_dict=get_dict, get_item=get_item):
        return get_item(identifier, get_dict)

    # The rule has to capture 'identifier', which the view function requires.
    bp.add_url_rule(f'/{item_type}s/<string:identifier>', item_type, _get_item)
# NOTE(review): no registration of this view was visible in the reviewed
# source; '/query' is assumed - confirm.
@bp.route('/query')
def query():
    '''
    Return, as JSON text, the list of mappings that match the URL given in
    the 'url' query parameter. Responds with 400 if the URL is malformed or
    uses a disallowed protocol.
    '''
    url = request.args['url']

    try:
        matching = content().query(url)
    except UrlError:
        # Bad client input rather than a server fault.
        abort(400)

    return json.dumps(matching)
# NOTE(review): no registration of this view was visible in the reviewed
# source; the rule below is assumed - confirm.
@bp.route('/sources/<string:identifier>/<path:path>')
def get_file(identifier, path):
    '''
    Permanently redirect to the location of file path of source identifier
    under the configured static resource URI. Responds with 404 if the source
    is unknown.
    '''
    if identifier not in content().indexes:
        abort(404)

    new_uri = f'{current_app._pydrilla_static_resource_uri}{identifier}/{path}'

    return redirect(new_uri, code=301)