aboutsummaryrefslogtreecommitdiff
import os
import sys
from pathlib import Path
import pwd
import shutil

import IPython
from IPython import display
from sql.run import ResultSet

here = Path(__loader__.path).parent

def prepare_notebook_for_sql():
    ipy = IPython.get_ipython()
    ipy.run_line_magic('load_ext', 'sql')
    ipy.run_line_magic('config', 'SqlMagic.style = "_DEPRECATED_DEFAULT"')
    ipy.run_line_magic('config', 'SqlMagic.displaycon = False')

    # https://stackoverflow.com/questions/41046955/formatting-sql-query-inside-an-ipython-jupyter-notebook#answer-54782158
    display.display(display.Javascript('''
        require(['notebook/js/codecell'], function(codecell) {
            console.log(codecell);
            const modes = codecell.CodeCell.options_default.highlight_modes;
            modes['magic_sqlite'] = {reg: [/^%%sql/]};
            Jupyter.notebook.events.one('kernel_ready.Kernel', function(){
                Jupyter.notebook.get_cells().map(function(cell){
                    if (cell.cell_type == 'code'){
                        cell.auto_highlight();
                    }
                });
            });
        });
    '''))

    # Prevent MathJaxizing of text in cells.
    repr_html_orig = ResultSet._repr_html_
    def repr_html_new(self):
        maybe_html = repr_html_orig(self)
        return maybe_html and maybe_html.replace('$', '\\$')
    ResultSet._repr_html_ = repr_html_new

def run_command(command):
    code = os.system(command)
    if code != 0:
        raise Exception(f'Got error code {code} from shell command: {command}')

def verify_file_against_sha256(path, sha256_hex):
    run_command(f'sha256sum {Path(path)} | grep -q {sha256_hex}' +
                ' # Verify contents.')

def download_file(url, path):
    path = Path(path)
    if path.exists():
        if path.stat().st_size > 0:
            return
        path.unlink()
    run_command(f'wget {url} -O {path} --quiet')

def download_verify_file(url, path, sha256_hex):
    path = Path(path)
    tmp_path = path.with_name(path.name + ".tmp")
    download_file(url, tmp_path)
    verify_file_against_sha256(tmp_path, sha256_hex)
    tmp_path.rename(path)

class WebImage:
    def __init__(self, url, path):
        self.url = url
        self.path = Path(path)

    def download(self):
        download_file(self.url, self.path)

    def download_display(self):
        self.download()
        name = self.path.name
        if name.lower().endswith('.png'):
            display_func = display.display_png
        elif any(name.lower().endswith(ext) for ext in ('.jpg', '.jpeg')):
            display_func = display.display_jpeg
        else:
            raise Exception(f'Image format not deduced from name: {name}')
        display_func(self.path.read_bytes(), raw=True)

    def download_open(self):
        self.download()
        run_command(f'nohup xdg-open {self.path} > /dev/null 2>&1')

nw_diagram = WebImage(
    'https://raw.githubusercontent.com/pthom/northwind_psql/cd0ef28d66369fbe177778e604e4be0f153c9e5c/ER.png',
    here / 'nw-schema.png'
)

nw_dump_url = 'https://github.com/pthom/northwind_psql/raw/cd0ef28d66369fbe177778e604e4be0f153c9e5c/northwind.sql'
nw_dump_path = here / 'nw-pg.sql'
nw_dump_sha256 = '0ee30c01ba282f7194f38bf7f99cd6be0470b7ee5f67d0f7ca41fb058d735e0c'

def drop_all_nw_postgres_tables():
    ipy = IPython.get_ipython()
    table_names = [name for name, in ipy.run_line_magic('sql', '''
        SELECT tablename
        FROM pg_tables
        WHERE schemaname = 'public';
    ''')]
    ipy.run_line_magic('sql', ' '.join(f'DROP TABLE "{name}" CASCADE;'
                                       for name in table_names
                                       if '"' not in name))

def download_restore_nw_postgres_dump():
    download_verify_file(nw_dump_url, nw_dump_path, nw_dump_sha256)

    drop_all_nw_postgres_tables()
    ipy = IPython.get_ipython()
    ipy.run_line_magic('sql', f'--file {nw_dump_path}')

nw_dump_mariadb_path = here / 'nw-m.sql'

def download_restore_nw_mariadb_dump():
    download_verify_file(nw_dump_url, nw_dump_path, nw_dump_sha256)

    subs = ''.join([
        r's/---/-- /;',
        r's/^SET/-- SET/;',
        r's/bytea/binary/;',
        r's/ADD CONSTRAINT \S\+/ADD/;',
        r's/\(ALTER TABLE\) ONLY/\1/;',
        r's/\(FOREIGN KEY \([(][^)]\+[)]\) .\+\);/\1 \2;/;',
        r's/\([(]ship_via[)] REFERENCES \S\+\)[^;]\+/\1 (shipper_id)/;',
        r's/\([(]reports_to[)] REFERENCES \S\+\)[^;]\+/\1 (employee_id)/;'
    ])
    run_command(f"sed '{subs}' {nw_dump_path} > {nw_dump_mariadb_path}")

    ipy = IPython.get_ipython()
    ipy.run_line_magic('sql', f'--file {nw_dump_mariadb_path}')

wp_tarball_url = 'https://wordpress.org/wordpress-6.8.3.tar.gz'
wp_tarball_path = here / 'wp-6.8.3.tar.gz'
wp_tarball_sha256 = '92da34c9960e64d1258652c1ef73c517f7e46ac6dfd2dfc75436d3855af46b0c'

faker_zip_url = 'https://downloads.wordpress.org/plugin/fakerpress.0.8.0.zip'
faker_zip_path = here / 'fakerpress-0.8.0.zip'
faker_zip_sha256 = '6dfee3efc79bddc56f84b885c1c4983bde6d03e151b5af96bb3d5802a4af8519'

def download_install_wordpress_site():
    if Path('/var/www/site').exists():
        shutil.rmtree('/var/www/site')

    run_command('printf "DROP DATABASE IF EXISTS agh_it_wordpress" | mariadb')
    run_command('printf "CREATE DATABASE agh_it_wordpress" | mariadb')

    download_verify_file(wp_tarball_url, wp_tarball_path, wp_tarball_sha256)
    Path('/var/www/site').mkdir(exist_ok=True)
    run_command(f'tar -C /var/www/site --strip-components=1 -xf {wp_tarball_path}')

    files_string = ' '.join(f'/var/www/site/{file}' for file in [
        'wp-includes/link-template.php',
        'wp-includes/js/dist/block-library.min.js',
        'wp-includes/js/dist/block-library.js',
        'wp-admin/user-edit.php',
        'wp-admin/includes/upgrade.php',
        'wp-admin/includes/credits.php',
        'wp-admin/includes/credits.php'
    ])
    # Less trackers ;)
    run_command(f'sed -i s/gravatar[.]com/gravatar.invalid/g {files_string}')

    substitutions = [
        [r".*define[(] 'DB_NAME', .*",
         "define( 'DB_NAME', 'agh_it_wordpress' );"],
        [r".*define[(] 'DB_USER', .*",
         f"define( 'DB_USER', '{pwd.getpwuid(os.getuid()).pw_name}' );"],
        [r".*define[(] 'DB_HOST', .*",
         "define( 'DB_HOST', 'localhost:/var/run/mysql/mysql.sock' );"],
        [r".*define[(] 'WP_DEBUG', .*",
         "define( 'DB_DEBUG', true );"]
    ]
    sed_program = ';'.join(f's^{pattern}^{replacement}^'
                           for pattern, replacement in substitutions)
    run_command(f'sed -i "{sed_program}" /var/www/site/wp-config-sample.php')
    shutil.copy('/var/www/site/wp-config-sample.php',
                '/var/www/site/wp-config.php')

    # Less noise.
    with open('/var/www/site/wp-includes/plugin.php', 'at') as plugin_php:
        plugin_php.write('''
/* https://developer.wordpress.org/reference/hooks/pre_http_request/ */

add_filter('pre_http_request', 'pass_or_reject_request', 10, 3);

function pass_or_reject_request($preempt, $parsed_args, $url) {
    return new WP_Error( 'http_request_block', __( "This request is not allowed", "textdomain"));
}
''')

    params_string = '&'.join(f'{key}={val}' for key, val in dict.items({
        'weblog_title': 'Demo+Site',
        'user_name': 'demo_user',
        'admin_password': 'demo_pwd',
        'admin_password2': 'demo_pwd',
        'pw_weak': 'on',
        'admin_email': 'dummy%40domain.invalid',
        'language': ''
    }))
    install_url = 'http://127.0.0.1:28080/site/wp-admin/install.php?step=2'
    run_command(f'wget --post-data "{params_string}" -O /dev/null --quiet \
                  {install_url}')

    # Fakerpress is useful for generating some lorem ipsum posts.
    download_verify_file(faker_zip_url, faker_zip_path, faker_zip_sha256)
    run_command(f'cd /var/www/site/wp-content/plugins && \
                  unzip -qq {faker_zip_path}')

wp_diagram = WebImage(
    'https://codex.wordpress.org/images/2/25/WP4.4.2-ERD.png',
    here / 'wp-schema.png'
)

def setup_django_project():
    for path in ('demo_site', 'forge', 'manage.py'):
        run_command(f'rm -rf {path}')

    run_command('django-admin startproject demo_site .')
    with open('demo_site/settings.py', 'at') as settings_file:
        settings_file.write('''
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.postgresql',
        'NAME': 'object_orientation',
        'USER': 'demo_user',
        'PASSWORD': 'demo_pwd',
        'HOST': 'localhost',
        'PORT': '25432'
    }
}
''')
    run_command(f'python3 manage.py startapp forge')
    sed_program = r'/INSTALLED_APPS = [[]/\0\n    "forge",/'
    run_command(f'''sed -i s'{sed_program}' demo_site/settings.py''')

def prepare_notebook_for_django():
    if os.getcwd() not in sys.path:
        sys.path.append(os.getcwd())

    os.environ['DJANGO_SETTINGS_MODULE'] = 'demo_site.settings'
    os.environ['DJANGO_ALLOW_ASYNC_UNSAFE'] = 'whatever'

    import django
    django.setup()

    from django.apps import registry
    from django.db.models import base as models_base

    old_new = models_base.ModelBase.__new__

    def new_new(cls, name, bases, attrs, **kwargs):
        'AGH replacement function for __new__.'
        if attrs['__module__'] == '__main__':
            attrs['__module__'] = 'forge.models'
            if 'forge' in registry.apps.all_models:
                forge_models = registry.apps.all_models['forge']
                if name.lower() in forge_models:
                    del forge_models[name.lower()]

        return old_new(cls, name, bases, attrs, **kwargs)

    if (old_new.__doc__ or '').find('AGH replacement') < 0:
        models_base.ModelBase.__new__ = new_new