import os
import sys
from pathlib import Path
import pwd
import shutil
import IPython
from IPython import display
from sql.run import ResultSet
# Directory containing this module; the files downloaded by the helpers
# below are stored relative to it.
here = Path(__loader__.path).parent
def prepare_notebook_for_sql():
    """Load the ``sql`` IPython extension and configure it for the notebook.

    Sets the ``SqlMagic.style`` and ``SqlMagic.displaycon`` options and then
    injects a JavaScript snippet that registers a highlight mode for cells
    starting with ``%%sql`` and re-runs auto-highlighting on all code cells
    once the kernel is ready.
    """
    shell = IPython.get_ipython()
    shell.run_line_magic('load_ext', 'sql')
    for setting in ('SqlMagic.style = "_DEPRECATED_DEFAULT"',
                    'SqlMagic.displaycon = False'):
        shell.run_line_magic('config', setting)

    # https://stackoverflow.com/questions/41046955/formatting-sql-query-inside-an-ipython-jupyter-notebook#answer-54782158
    highlight_js = '''
require(['notebook/js/codecell'], function(codecell) {
console.log(codecell);
const modes = codecell.CodeCell.options_default.highlight_modes;
modes['magic_sqlite'] = {reg: [/^%%sql/]};
Jupyter.notebook.events.one('kernel_ready.Kernel', function(){
Jupyter.notebook.get_cells().map(function(cell){
if (cell.cell_type == 'code'){
cell.auto_highlight();
}
});
});
});
'''
    display.display(display.Javascript(highlight_js))
# Prevent MathJaxizing of text in cells.
repr_html_orig = ResultSet._repr_html_


def repr_html_new(self):
    """Return the original HTML representation with each ``$`` escaped."""
    html = repr_html_orig(self)
    if not html:
        # Nothing to escape; hand the falsy value back untouched.
        return html
    return html.replace('$', '\\$')


ResultSet._repr_html_ = repr_html_new
def run_command(command):
    """Run *command* through the shell and raise if it does not succeed.

    Args:
        command: Shell command line, passed to the shell as-is.

    Raises:
        Exception: If the command finishes with a non-zero return code.  The
            message carries the real exit code (a negative number when the
            command was killed by a signal) rather than the raw wait status
            that ``os.system`` would report.
    """
    import subprocess

    code = subprocess.run(command, shell=True).returncode
    if code != 0:
        raise Exception(f'Got error code {code} from shell command: {command}')
def verify_file_against_sha256(path, sha256_hex):
    """Check that the file at *path* has the SHA-256 digest *sha256_hex*.

    The digest is computed in-process and compared against the whole expected
    value, so a partial or empty hash, or a hash that merely occurs in the
    file name, is never accepted.

    Args:
        path: File to verify (string or ``Path``).
        sha256_hex: Expected digest as 64 hexadecimal characters; letter case
            and surrounding whitespace are ignored.

    Raises:
        Exception: If the digests differ.
        OSError: If the file cannot be read.
    """
    import hashlib

    digest = hashlib.sha256()
    with open(Path(path), 'rb') as file:
        # Hash in chunks so that large downloads need not fit in memory.
        for chunk in iter(lambda: file.read(1 << 20), b''):
            digest.update(chunk)
    actual = digest.hexdigest()
    if actual != str(sha256_hex).strip().lower():
        raise Exception(f'SHA-256 mismatch for {path}: '
                        f'expected {sha256_hex}, got {actual}')
def download_file(url, path):
    """Download *url* to *path* with ``wget`` unless it is already there.

    An existing non-empty file is treated as already downloaded and left
    untouched.  An existing empty file is removed and fetched again.

    Args:
        url: Address to fetch.
        path: Destination file (string or ``Path``).

    Raises:
        Exception: If the ``wget`` command fails.
    """
    import shlex

    path = Path(path)
    if path.exists():
        if path.stat().st_size > 0:
            return
        path.unlink()
    # Quote both arguments so that characters such as '&' or spaces in the
    # URL or the path are not interpreted by the shell.
    run_command(f'wget {shlex.quote(str(url))} '
                f'-O {shlex.quote(str(path))} --quiet')
def download_verify_file(url, path, sha256_hex):
    """Download *url*, verify its SHA-256 digest and store it at *path*.

    The data is first fetched into a sibling file named ``<name>.tmp`` and
    only renamed to *path* after the checksum has been verified.

    Args:
        url: Address to fetch.
        path: Final location of the verified file (string or ``Path``).
        sha256_hex: Expected SHA-256 digest in hexadecimal.

    Raises:
        Exception: If the download or the verification fails.
    """
    path = Path(path)
    tmp_path = path.with_name(path.name + ".tmp")
    download_file(url, tmp_path)
    try:
        verify_file_against_sha256(tmp_path, sha256_hex)
    except Exception:
        # Discard the bad download.  Otherwise download_file() would keep
        # reusing the non-empty temporary file and every later call would
        # fail verification again without ever re-fetching.
        if tmp_path.exists():
            tmp_path.unlink()
        raise
    tmp_path.rename(path)
class WebImage:
    """An image available at a URL and cached in a local file."""

    def __init__(self, url, path):
        """Remember where the image comes from and where it is stored.

        Args:
            url: Address the image is downloaded from.
            path: Local file the image is saved to (string or ``Path``).
        """
        self.url = url
        self.path = Path(path)

    def download(self):
        """Fetch the image into ``self.path`` via ``download_file``."""
        download_file(self.url, self.path)

    def download_display(self):
        """Download the image and show it inline as PNG or JPEG.

        Raises:
            Exception: If the file name ends with none of ``.png``, ``.jpg``
                or ``.jpeg`` (compared case-insensitively).
        """
        self.download()
        name = self.path.name
        lowered = name.lower()
        if lowered.endswith('.png'):
            display_func = display.display_png
        elif lowered.endswith(('.jpg', '.jpeg')):
            display_func = display.display_jpeg
        else:
            raise Exception(f'Image format not deduced from name: {name}')
        display_func(self.path.read_bytes(), raw=True)

    def download_open(self):
        """Download the image and open it with ``xdg-open``."""
        import shlex

        self.download()
        # Quote the path so that spaces or shell metacharacters in it do not
        # break the command line.
        run_command(f'nohup xdg-open {shlex.quote(str(self.path))} '
                    '> /dev/null 2>&1')
# Diagram image (ER.png) from a pinned commit of the northwind_psql
# repository, cached next to this module as nw-schema.png.
nw_diagram = WebImage(
    'https://raw.githubusercontent.com/pthom/northwind_psql/cd0ef28d66369fbe177778e604e4be0f153c9e5c/ER.png',
    here / 'nw-schema.png'
)

# SQL dump from the same pinned commit: where to fetch it, where it is kept
# locally and the SHA-256 digest the download is checked against.
nw_dump_url = 'https://github.com/pthom/northwind_psql/raw/cd0ef28d66369fbe177778e604e4be0f153c9e5c/northwind.sql'
nw_dump_path = here / 'nw-pg.sql'
nw_dump_sha256 = '0ee30c01ba282f7194f38bf7f99cd6be0470b7ee5f67d0f7ca41fb058d735e0c'
def drop_all_nw_postgres_tables():
    """Drop every table listed in ``pg_tables`` for the ``public`` schema.

    The table names are queried through the ``%sql`` magic and all the
    ``DROP TABLE ... CASCADE`` statements are then sent in one further call.
    Tables whose name contains a double quote are skipped.
    """
    shell = IPython.get_ipython()
    rows = shell.run_line_magic('sql', '''
SELECT tablename
FROM pg_tables
WHERE schemaname = 'public';
''')
    table_names = [table for (table,) in rows]

    statements = []
    for table in table_names:
        # Such a name cannot be embedded between the double quotes below.
        if '"' in table:
            continue
        statements.append(f'DROP TABLE "{table}" CASCADE;')
    shell.run_line_magic('sql', ' '.join(statements))
def download_restore_nw_postgres_dump():
    """Fetch and verify the Northwind dump, then load it via ``%sql``.

    All tables of the ``public`` schema are dropped before the dump file is
    executed with ``%sql --file``.
    """
    download_verify_file(nw_dump_url, nw_dump_path, nw_dump_sha256)
    drop_all_nw_postgres_tables()
    magic_args = '--file ' + str(nw_dump_path)
    IPython.get_ipython().run_line_magic('sql', magic_args)
# Where the sed-converted variant of the Northwind dump is written before it
# is loaded by download_restore_nw_mariadb_dump().
nw_dump_mariadb_path = here / 'nw-m.sql'
def download_restore_nw_mariadb_dump():
    """Fetch the Northwind dump, rewrite it with ``sed`` and load the result.

    The verified dump is passed through a series of ``sed`` substitutions,
    the output is written to ``nw_dump_mariadb_path`` and that file is then
    executed with ``%sql --file``.
    """
    download_verify_file(nw_dump_url, nw_dump_path, nw_dump_sha256)

    # One sed script; every rule ends with ';' so they simply concatenate.
    sed_script = (
        r's/---/-- /;'
        r's/^SET/-- SET/;'
        r's/bytea/binary/;'
        r's/ADD CONSTRAINT \S\+/ADD/;'
        r's/\(ALTER TABLE\) ONLY/\1/;'
        r's/\(FOREIGN KEY \([(][^)]\+[)]\) .\+\);/\1 \2;/;'
        r's/\([(]ship_via[)] REFERENCES \S\+\)[^;]\+/\1 (shipper_id)/;'
        r's/\([(]reports_to[)] REFERENCES \S\+\)[^;]\+/\1 (employee_id)/;'
    )
    run_command(f"sed '{sed_script}' {nw_dump_path} > {nw_dump_mariadb_path}")

    shell = IPython.get_ipython()
    shell.run_line_magic('sql', f'--file {nw_dump_mariadb_path}')
# WordPress 6.8.3 release tarball: download address, local file and the
# SHA-256 digest the download is checked against.
wp_tarball_url = 'https://wordpress.org/wordpress-6.8.3.tar.gz'
wp_tarball_path = here / 'wp-6.8.3.tar.gz'
wp_tarball_sha256 = '92da34c9960e64d1258652c1ef73c517f7e46ac6dfd2dfc75436d3855af46b0c'

# FakerPress 0.8.0 plugin archive: download address, local file and the
# SHA-256 digest the download is checked against.
faker_zip_url = 'https://downloads.wordpress.org/plugin/fakerpress.0.8.0.zip'
faker_zip_path = here / 'fakerpress-0.8.0.zip'
faker_zip_sha256 = '6dfee3efc79bddc56f84b885c1c4983bde6d03e151b5af96bb3d5802a4af8519'
def download_install_wordpress_site():
    """Install a fresh WordPress site under ``/var/www/site``.

    Steps, in order:

    1. Remove an existing site directory and recreate the
       ``agh_it_wordpress`` MariaDB database.
    2. Download, verify and unpack the WordPress tarball.
    3. Replace ``gravatar.com`` with ``gravatar.invalid`` in selected files.
    4. Derive ``wp-config.php`` from the sample configuration.
    5. Append a ``pre_http_request`` filter to ``wp-includes/plugin.php``
       that rejects the requests passing through it.
    6. Run the web installer by POSTing to ``install.php?step=2`` on
       ``127.0.0.1:28080``.
    7. Download, verify and unpack the FakerPress plugin.

    Raises:
        Exception: If any of the shell commands or verifications fails.
    """
    import shlex

    site_dir = Path('/var/www/site')

    # Always start from scratch: old files and the old database go away.
    if site_dir.exists():
        shutil.rmtree(site_dir)
    run_command('printf "DROP DATABASE IF EXISTS agh_it_wordpress" | mariadb')
    run_command('printf "CREATE DATABASE agh_it_wordpress" | mariadb')

    download_verify_file(wp_tarball_url, wp_tarball_path, wp_tarball_sha256)
    site_dir.mkdir(exist_ok=True)
    run_command(f'tar -C {site_dir} --strip-components=1 '
                f'-xf {shlex.quote(str(wp_tarball_path))}')

    files_string = ' '.join(f'{site_dir}/{file}' for file in [
        'wp-includes/link-template.php',
        'wp-includes/js/dist/block-library.min.js',
        'wp-includes/js/dist/block-library.js',
        'wp-admin/user-edit.php',
        'wp-admin/includes/upgrade.php',
        'wp-admin/includes/credits.php',
    ])
    # Less trackers ;)
    # The expression is quoted so the shell cannot treat "[.]" as a glob.
    run_command("sed -i 's/gravatar[.]com/gravatar.invalid/g' "
                f'{files_string}')

    # NOTE(review): the last rule replaces the WP_DEBUG line with a define of
    # DB_DEBUG, so WP_DEBUG itself is no longer defined in the configuration.
    # This looks like a typo for WP_DEBUG; it is left unchanged here because
    # fixing it may change what the site outputs -- please confirm.
    substitutions = [
        [r".*define[(] 'DB_NAME', .*",
         "define( 'DB_NAME', 'agh_it_wordpress' );"],
        [r".*define[(] 'DB_USER', .*",
         f"define( 'DB_USER', '{pwd.getpwuid(os.getuid()).pw_name}' );"],
        [r".*define[(] 'DB_HOST', .*",
         "define( 'DB_HOST', 'localhost:/var/run/mysql/mysql.sock' );"],
        [r".*define[(] 'WP_DEBUG', .*",
         "define( 'DB_DEBUG', true );"]
    ]
    # '^' is the sed delimiter because the replacements contain '/'.
    sed_program = ';'.join(f's^{pattern}^{replacement}^'
                           for pattern, replacement in substitutions)
    run_command(f'sed -i "{sed_program}" {site_dir}/wp-config-sample.php')
    shutil.copy(site_dir / 'wp-config-sample.php',
                site_dir / 'wp-config.php')

    # Less noise.
    with open(site_dir / 'wp-includes/plugin.php', 'at') as plugin_php:
        plugin_php.write('''
/* https://developer.wordpress.org/reference/hooks/pre_http_request/ */
add_filter('pre_http_request', 'pass_or_reject_request', 10, 3);
function pass_or_reject_request($preempt, $parsed_args, $url) {
return new WP_Error( 'http_request_block', __( "This request is not allowed", "textdomain"));
}
''')

    # Form fields of the installer; the values are already URL-encoded.
    install_params = {
        'weblog_title': 'Demo+Site',
        'user_name': 'demo_user',
        'admin_password': 'demo_pwd',
        'admin_password2': 'demo_pwd',
        'pw_weak': 'on',
        'admin_email': 'dummy%40domain.invalid',
        'language': ''
    }
    params_string = '&'.join(f'{key}={val}'
                             for key, val in install_params.items())
    install_url = 'http://127.0.0.1:28080/site/wp-admin/install.php?step=2'
    run_command(f'wget --post-data "{params_string}" -O /dev/null --quiet '
                f'{install_url}')

    # Fakerpress is useful for generating some lorem ipsum posts.
    download_verify_file(faker_zip_url, faker_zip_path, faker_zip_sha256)
    run_command(f'cd {site_dir}/wp-content/plugins && '
                f'unzip -qq {shlex.quote(str(faker_zip_path))}')
# Diagram image (WP4.4.2-ERD.png) from codex.wordpress.org, cached next to
# this module as wp-schema.png.
wp_diagram = WebImage(
    'https://codex.wordpress.org/images/2/25/WP4.4.2-ERD.png',
    here / 'wp-schema.png'
)
def setup_django_project():
    """Recreate the ``demo_site`` Django project in the current directory.

    Leftovers of an earlier run are deleted, a new project is generated, a
    ``DATABASES`` definition is appended to its settings, the ``forge`` app is
    created and finally added to ``INSTALLED_APPS`` with ``sed``.

    Raises:
        Exception: If any of the shell commands fails.
    """
    leftovers = ('demo_site', 'forge', 'manage.py')
    for leftover in leftovers:
        run_command(f'rm -rf {leftover}')
    run_command('django-admin startproject demo_site .')

    # Appended after the generated settings, so it takes precedence over the
    # DATABASES value defined earlier in that file.
    database_settings = '''
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.postgresql',
'NAME': 'object_orientation',
'USER': 'demo_user',
'PASSWORD': 'demo_pwd',
'HOST': 'localhost',
'PORT': '25432'
}
}
'''
    with open('demo_site/settings.py', 'at') as settings_file:
        settings_file.write(database_settings)

    run_command('python3 manage.py startapp forge')

    # Insert "forge" right after the opening bracket of INSTALLED_APPS.
    insert_forge = r'/INSTALLED_APPS = [[]/\0\n "forge",/'
    run_command("sed -i s'" + insert_forge + "' demo_site/settings.py")
def prepare_notebook_for_django():
    """Set up Django inside the notebook and allow redefining models there.

    The current directory is put on ``sys.path``, the settings module and
    ``DJANGO_ALLOW_ASYNC_UNSAFE`` are set in the environment and Django is
    initialised.  ``ModelBase.__new__`` is then wrapped so that model classes
    whose module is ``__main__`` are assigned to ``forge.models`` and any
    same-named model already registered for ``forge`` is removed first.
    """
    working_dir = os.getcwd()
    if working_dir not in sys.path:
        sys.path.append(working_dir)

    os.environ.update({
        'DJANGO_SETTINGS_MODULE': 'demo_site.settings',
        'DJANGO_ALLOW_ASYNC_UNSAFE': 'whatever',
    })

    # Imported only now: the environment must be in place beforehand.
    import django
    django.setup()

    from django.apps import registry
    from django.db.models import base as models_base

    wrapped_new = models_base.ModelBase.__new__

    def new_new(cls, name, bases, attrs, **kwargs):
        'AGH replacement function for __new__.'
        # NOTE(review): the nesting below is reconstructed -- confirm that
        # the registry cleanup is meant to apply to __main__ models only.
        if attrs['__module__'] == '__main__':
            attrs['__module__'] = 'forge.models'
            all_models = registry.apps.all_models
            if 'forge' in all_models:
                # Forget the earlier definition of a same-named model.
                all_models['forge'].pop(name.lower(), None)
        return wrapped_new(cls, name, bases, attrs, **kwargs)

    # The docstring of new_new doubles as a marker: when __new__ already
    # carries it, the wrapper is installed and must not be stacked again.
    if 'AGH replacement' not in (wrapped_new.__doc__ or ''):
        models_base.ModelBase.__new__ = new_new