the f*ck rants about stuff

python

After playing with Perl for a long time, I've come to appreciate the Python approach

It's been a pleasure to solve problems with it ever since

If you want to learn Python, I really recommend the official Python 3 tutorial and the in-depth topics covered in the official HOWTOs. For intermediate/expert level I really enjoyed Fluent Python (2015 edition)

Latest posts related to Python:



  1. Automating the creation of a static website

    tl;dr:
    Python can turn tedious work into free time!
    New website about hiking in Extremadura, Spain: extremaruta.es

    extremaruta website snapshot 1

    extremaruta website snapshot 2

    PHP websites were on the rise a few years ago, mainly due to the rise of easy CMSes like Drupal and Joomla. Their main problem is that they carry a high maintenance cost compared with a static website: you have to keep them up to date, and there are new exploits every other week

    I was presented with a PHP website that had been hacked long ago and had to be taken down, because there was no way to clean it up and there was no clean copy anywhere. The only reason they were using a PHP website was that it was “easy” upfront, but they never really thought it through, and they didn't really need anything dynamic, like users

    One of the perks of static websites is that they are virtually impossible to hack, and if they ever are (probably because something else got hacked and the site was affected), you can have them up again somewhere else in a matter of minutes

    So off we go to turn the original data into a website. I chose my preferred static site generator, Pelican, and then wrote a few Python scripts that mostly spew Markdown (so they are not even Pelican-specific!)

    They scan a directory with photos, .gpx and .pdf files, generate the Markdown, and figure out where everything belongs and what is part of the website from the file names

    The major challenge was reducing processing time, because there are almost 10 GB of data to process and it would have been very tedious to debug otherwise. Thumbnails have to be generated, watermarks added, changes in the original data detected, etc… Anything you do has to churn through 10 GB of data
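
    The thumbnail and watermark pass is not part of the scripts shown below. A minimal sketch of the idea, assuming Pillow (python3-pillow) and made-up paths, could look like this; the point is that only photos whose thumbnail is missing or older than the source get re-processed, which is what keeps repeated runs over those 10 GB bearable:

    #!/usr/bin/env python3
    """
    thumbs.py (hypothetical sketch, not one of the published scripts)

        Generate missing or stale thumbnails and stamp a watermark on them
    """

    from glob import glob
    from os import makedirs
    from os.path import join, exists, getmtime, basename

    from PIL import Image, ImageDraw

    ORIG = "real.files"
    THUMBS = join("content", "auto", "photos", "thumbs")
    SIZE = (1024, 768)


    def make_thumb(src, dst):
        """Resize the photo and stamp a small text watermark on it"""
        with Image.open(src) as im:
            im.thumbnail(SIZE)
            draw = ImageDraw.Draw(im)
            draw.text((10, im.height - 20), "extremaruta.es", fill="white")
            im.save(dst, "JPEG")


    if __name__ == "__main__":
        makedirs(THUMBS, exist_ok=True)
        for src in glob(join(ORIG, "**", "*.jpg"), recursive=True):
            dst = join(THUMBS, basename(src))
            # skip anything already generated and newer than its source photo
            if not exists(dst) or getmtime(dst) < getmtime(src):
                make_thumb(src, dst)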

    """
    process.py
    
        Move files around and triggers the different proccesses
        in the order it needs to run, both for testing and for production
    """
    
    #!/usr/bin/python3
    
    import routes
    from shutil import move
    from subprocess import run
    from os.path import join, exists
    
    
    def sync_files():
        orig = join(routes.OUTPUT, "")
        dest = join(routes.FINAL_OUTPUT, "")
        linkdest = join("..", "..", orig)
        command = ("rsync", "-ah", "--delete",
                   "--link-dest={}".format(linkdest), orig, dest)
        reallinkdest = join(dest, linkdest)
        if(exists(reallinkdest)):
            #print("{} exists".format(reallinkdest))
            run(command)
        else:
            print("{} doesnt exist".format(reallinkdest))
            print("its very likely the command is wrong:\n{}".format(command))
            exit(1)
    
    
    def test_run():
        f = '.files_cache_files'
        if(exists(f)):
            #move(f, 'todelete')
            pass
        r = routes.Routes("real.files")
        # print(r)
        r.move_files()
        r.generate_markdown()
    
        sync_files()
    
    
    def final_run():
        r = routes.Routes("/media/usb/web/")
        # print(routes)
        r.move_files()
        r.generate_markdown()
    
        sync_files()
    
    
    test_run()
    # final_run()
    
    #!/usr/bin/python3
    """
    routes.py
    
        Generate the different information and intermediate cache files so it doesnt
        have to process everything every time
    """
    
    try:
        from slugify import slugify
    except ImportError as e:
        print(e)
        print("Missing module. Please install python3-slugify")
        exit()
    
    from pprint import pformat
    from shutil import copy
    from os.path import join, exists, basename, splitext
    import os
    import re
    import json
    
    # original files path
    ORIG_BASE = "/media/usb0/web/"
    ORIG_BASE = "files"
    ORIG_BASE = "real.files"
    # relative dest to write content
    OUTPUT = join("content", "auto", "")
    # relative dest pdf and gpx
    STATIC = join("static", "")
    FULL_STATIC = join("auto", "static", "")
    # relative photos dest
    PHOTOS = join("photos", "")
    # relative markdown dest
    PAGES = join("rutas", "")
    # relative banner dest
    BANNER = join(PHOTOS, "banner", "")
    # absolute dests
    BASE_PAGES = join(OUTPUT, PAGES, "")
    BASE_STATIC = join(OUTPUT, STATIC, "")
    BASE_PHOTOS = join(OUTPUT, PHOTOS, "")
    BASE_BANNER = join(OUTPUT, BANNER, "")
    
    TAGS = 'tags.txt'
    
    # Where to copy everything once its generated
    FINAL_OUTPUT = join("web", OUTPUT)
    
    def hard_link(src, dst):
        """Tries to hard link and copy it instead where it fails"""
        try:
            os.link(src, dst)
        except OSError:
            copy(src, dst)
    
    def sanitize_name(fpath):
        """ returns sane file names: '/á/b/c áD.dS' -> c-ad.ds"""
        fname = basename(fpath)
        split_fname = splitext(fname)
        name = slugify(split_fname[0])
        ext = slugify(split_fname[1]).lower()
        return ".".join((name, ext))
    
    class Routes():
        pdf_re = re.compile(r".*/R(\d{1,2}).*(?:PDF|pdf)$")
        gpx_re = re.compile(r".*/R(\d{1,2}).*(?:GPX|gpx)$")
        jpg_re = re.compile(r".*/\d{1,2}R(\d{1,2}).*(?:jpg|JPG)$")
        banner_re = re.compile(r".*BANNER/Etiquetadas/.*(?:jpg|JPG)$")
    
        path_re = re.compile(r".*PROVINCIA DE (.*)/\d* (.*)\ (?:CC|BA)/.*")
    
        def __getitem__(self, item):
            return self.__routes__[item]
    
        def __iter__(self):
            return iter(self.__routes__)
    
        def __str__(self):
            return pformat(self.__routes__)
    
        def __init__(self, path):
            self.__routes__ = {}
            self.__files__ = {}
    
            self.fcache = ".files_cache_" + slugify(path)
    
            if(exists(self.fcache)):
                print(f"Using cache to read. {self.fcache} detected:")
                self._read_files_cache()
            else:
                print(f"No cache detected. Reading from {path}")
                self._read_files_to_cache(path)
    
        def _init_dir(self, path, create_ruta_dirs=True):
            """ create dir estructure. Returns True if it had to create"""
            created = True
    
            if(exists(path)):
                print(f"{path} exist. No need to create dirs")
                created = False
            else:
                print(f"{path} doesnt exist. Creating dirs")
                os.makedirs(path)
                if(create_ruta_dirs):
                    self._create_ruta_dirs(path)
    
            return created
    
        def _create_ruta_dirs(self, path):
            """Create structure of directories in <path>"""
            for prov in self.__routes__:
                prov_path = join(path, slugify(prov))
                if(not exists(prov_path)):
                    os.makedirs(prov_path)
                for comar in self.__routes__[prov]:
                    comar_path = join(prov_path, slugify(comar))
                    if(not exists(comar_path)):
                        os.makedirs(comar_path)
                    # Special case for BASE_PAGES. Dont make last ruta folder
                    if(path != BASE_PAGES):
                        for ruta in self.__routes__[prov].get(comar):
                            ruta_path = join(comar_path, ruta)
                            if(not exists(ruta_path)):
                                os.makedirs(ruta_path)
    
        def _read_files_cache(self):
            with open(self.fcache) as f:
                temp = json.load(f)
            self.__routes__ = temp['routes']
            self.__files__ = temp['files']
    
        def _read_files_to_cache(self, path):
            """read files from path into memory. Also writes the cache file"""
            """also read tags"""
            for root, subdirs, files in os.walk(path):
                for f in files:
    
                    def append_ruta_var(match, var_name):
                        prov, comar = self._get_prov_comar(root)
                        ruta = match.group(1).zfill(2)
                        var_path = join(root, f)
                        r = self._get_ruta(prov, comar, ruta)
                        r.update({var_name: var_path})
    
                    def append_ruta_pic(match):
                        prov, comar = self._get_prov_comar(root)
                        ruta = match.group(1).zfill(2)
                        pic_path = join(root, f)
                        r = self._get_ruta(prov, comar, ruta)
                        pics = r.setdefault('pics', list())
                        pics.append(pic_path)
    
                    def pdf(m):
                        append_ruta_var(m, 'pdf_orig')
    
                    def gpx(m):
                        append_ruta_var(m, 'gpx_orig')
    
                    def append_banner(m):
                        pic_path = join(root, f)
                        banner = self.__files__.setdefault('banner', list())
                        banner.append(pic_path)
    
                    regexes = (
                        (self.banner_re, append_banner),
                        (self.pdf_re, pdf),
                        (self.gpx_re, gpx),
                        (self.jpg_re, append_ruta_pic),
                    )
    
                    for reg, func in regexes:
                        try:
                            match = reg.match(join(root, f))
                            if(match):
                                func(match)
                                break
                            # else:
                            #    print(f"no match for {root}/{f}")
                        except Exception:
                            print(f"Not sure how to parse this file: {f}")
                            print(f"r: {root}\ns: {subdirs}\nf: {files}\n\n")
    
            self._read_tags()
    
            temp = dict({'routes': self.__routes__, 'files': self.__files__})
            with open(self.fcache, "w") as f:
                json.dump(temp, f)
    
        def _read_tags(self):
            with open(TAGS) as f:
                for line in f.readlines():
                    try:
                        ruta, short_name, long_name, tags = [
                            p.strip() for p in line.split(":")]
                        prov, comar, number, _ = ruta.split("/")
                        r = self._get_ruta(prov, comar, number)
                        r.update({'short': short_name})
                        r.update({'long': long_name})
                        final_tags = list()
                        for t in tags.split(","):
                            final_tags.append(t)
                        r.update({'tags': final_tags})
                    except ValueError:
                        pass
    
        def _get_prov_comar(self, path):
            pathm = self.path_re.match(path)
            prov = pathm.group(1)
            comar = pathm.group(2)
    
            return prov, comar
    
        def _get_ruta(self, prov, comar, ruta):
            """creates the intermeidate dics if needed"""
    
            prov = slugify(prov)
            comar = slugify(comar)
    
            p = self.__routes__.get(prov)
            if(not p):
                self.__routes__.update({prov: {}})
    
            c = self.__routes__.get(prov).get(comar)
            if(not c):
                self.__routes__.get(prov).update({comar: {}})
    
            r = self.__routes__.get(prov).get(comar).get(ruta)
            if(not r):
                self.__routes__.get(prov).get(comar).update({ruta: {}})
    
            r = self.__routes__.get(prov).get(comar).get(ruta)
            return r
    
        def move_files(self):
            """move misc (banner) and ruta related files (not markdown)"""
            """from dir to OUTPUT"""
            self._move_ruta_files()
            # misc have to be moved after ruta files, because the folder
            # inside photos prevents ruta photos to be moved
            self._move_misc_files()
    
        def _move_misc_files(self):
            if (self._init_dir(BASE_BANNER, False)):
                print("moving banner...")

                for f in self.__files__['banner']:
                    hard_link(f, join(BASE_BANNER, sanitize_name(f)))
    
        def _move_ruta_files(self):
            """move everything ruta related: static and photos(not markdown)"""
            create_static = False
            create_photos = False
    
            if (self._init_dir(BASE_STATIC)):
                print("moving static...")
                create_static = True
    
            if (self._init_dir(BASE_PHOTOS)):
                print("moving photos...")
                create_photos = True
    
            for prov in self.__routes__:
                for comar in self.__routes__[prov]:
                    for ruta in self.__routes__[prov].get(comar):
                        r = self.__routes__[prov].get(comar).get(ruta)
                        fbase_static = join(
                            BASE_STATIC, prov, slugify(comar), ruta)
                        fbase_photos = join(
                            BASE_PHOTOS, prov, slugify(comar), ruta)
    
                        def move_file(orig, dest):
                            whereto = join(dest, sanitize_name(orig))
                            hard_link(orig, whereto)
    
                        if(create_static):
                            for fkey in ("pdf_orig", "gpx_orig"):
                                if(fkey in r):
                                    move_file(r[fkey], fbase_static)
    
                        if(create_photos and ("pics") in r):
                            for pic in r["pics"]:
                                move_file(pic, fbase_photos)
    
        def generate_markdown(self):
            """Create markdown in the correct directory"""
            self._init_dir(BASE_PAGES)
            for prov in self.__routes__:
                for comar in self.__routes__[prov]:
                    for ruta in self.__routes__[prov].get(comar):
                        r = self.__routes__[prov].get(comar).get(ruta)
                        pages_base = join(
                            BASE_PAGES, prov, slugify(comar))
                        fpath = join(pages_base, f"{ruta}.md")
    
                        photos_base = join(prov, slugify(comar), ruta)
                        static_base = join(
                            FULL_STATIC, prov, slugify(comar), ruta)
    
                        with open(fpath, "w") as f:
                            title = "Title: "
                            if('long' in r):
                                title += r['long']
                            else:
                                title += f"{prov} - {comar} - Ruta {ruta}"
                            f.write(title + "\n")
                            f.write(f"Path: {ruta}\n")
                            f.write("Date: 2018-01-01 00:00\n")
                            if('tags' in r):
                                f.write("Tags: {}".format(", ".join(r['tags'])))
                                f.write("\n")
                            f.write("Gallery: {photo}")
                            f.write(f"{photos_base}\n")
    
                            try:
                                fpath = join("/", static_base, sanitize_name(r['pdf_orig']))
                                f.write( f'Pdf: {fpath}\n')
                            except KeyError:
                                f.write('Esta ruta no tiene descripcion (pdf)\n\n')
    
    
                            try:
                                fpath = join("/", static_base, sanitize_name(r['gpx_orig']))
                                f.write(f"Gpx: {fpath}\n")
                            except KeyError:
                                f.write('Esta ruta no tiene coordenadas (gpx)\n\n')
    
    
                            if('pics' not in r):
                                f.write('Esta ruta no tiene fotos\n\n')
    
    
    
    if __name__ == "__main__":
        routes = Routes(ORIG_BASE)
        # print(routes)
        print("done reading")
        routes.move_files()
        routes.generate_markdown()
        print("done writing")
    
  2. No more bash

    bash logo crossed

    I recently stopped my (IMHO bad) habit of starting shell scripts in bash, no matter how small the task at hand originally feels

    I had an epiphany

    The number of bash scripts that grew out of control was just too damn high

    I've been told that

    it's a difficult balance
    

    But is it really?

    It's always the same story

    1. Well, I only have to run the same handful of commands multiple times in different directories; a shell script will do
    2. Except, sometimes it fails when…/this special case if../oh, never considered this… I will just add a couple more lines and fix it
    3. The script explodes, and gets rewritten in Python

    There were rarely exceptions for me. Almost every .sh (if it's intended to automate something) had to do sanity checks, error control/recovery and probably handle special cases… eventually

    I'm aware that if you are well versed in bash, you can do a lot. It has arrays and all kinds of (IMHO weird) string mangling for advanced use of variables, but it always felt like bash was filled with pitfalls that you have to learn to route around

    Writing the same thing in Python takes about the same time. Maybe a couple more lines to write a few imports

    I'm aware that Python comes with its own pitfalls, but at least you can actually scale it when needed. And you save the rewriting part
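
    As a rough illustration (the task and directory layout here are made up), the kind of two-command loop I used to start as a shell script translates almost one-to-one, and the error handling comes nearly for free:

    #!/usr/bin/env python3
    # roughly: for d in */; do (cd "$d" && make clean && make); done
    # but it keeps going and tells you which directory failed
    from pathlib import Path
    from subprocess import run, CalledProcessError

    for d in sorted(p for p in Path(".").iterdir() if p.is_dir()):
        try:
            run(["make", "clean"], cwd=d, check=True)
            run(["make"], cwd=d, check=True)
        except CalledProcessError as e:
            print(f"{d}: failed ({e}), skipping")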

    This is really hard for me to say

    I grew to love long one-liners of pipes that solve complex problems. Also, most of the time you only seem to want to run a couple of commands in tandem

    But I think it's time for me to say goodbye. In the same way I said goodbye to Perl (and that's a rant for another day :))

    No more shell scripts

    No matter how small

  3. Backup fixes!

    A year ago I made an automation solution for a backup. Very basic approach, but it got the job done

    It started to fail randomly, so I had to take a look. I fixed it and took the opportunity to add a few features while debugging it

    Overall, resilience is improved. Now it can recover from most errors and report properly when it cannot

    Changelog:

    • FIX: Backup file getting corrupted in email transit. It seems Google was mangling .gpg files
    • FIX: Add a clean-up section to ensure the resources are consumed. systemd.path works like a spool. It also needs to sync at the end, because systemd relaunches the service as soon as the file is done; the OS didn't even have time to write it to disk
    • FIX: Clean-up service on restart that automatically removes the mail lock, which gets created and never removed if the computer loses power in the middle of sending
    • FIX: systemd.path starts processing as soon as the path is found. I had to ensure the file was completely written before processing it
    • FIX: systemd forking instead of oneshot. I was leaving the process lingering so the pop-up windows could finish, which is what Type=forking is for

    • FEAT: Checksums included in the backup, to automatically verify integrity when recovering and to fail properly when the IN and OUT files differ

    • FEAT: Add proper systemd logging, including checksums
    • FEAT: Show pop-ups to the end users indicating start/stop of the service and notifying them of errors
    • FEAT: Add arguments to ease local debugging, including a --quiet option for debugging remotely without showing pop-ups

    No repo! But here's the code so you can take a peek or reuse it. Pop-ups are in Spanish

    code
    backup.py
    
    #!/usr/bin/env python3
    
    from datetime import datetime, timedelta
    from os import path, remove, fork, _exit, environ
    from subprocess import run, CalledProcessError
    from sys import exit, version_info
    from systemd import journal
    from hashlib import md5
    import argparse
    
    
    def display_alert(text, wtype="info"):
        journal.send("display: {}".format(text.replace("\n", " - ")))
        if(not args.quiet):
            if(not fork()):
                env = environ.copy()
                env.update({'DISPLAY': ':0.0', 'XAUTHORITY':
                            '/home/{}/.Xauthority'.format(USER)})
                zenity_cmd = [
                    'zenity', '--text={}'.format(text), '--no-markup', '--{}'.format(wtype), '--no-wrap']
                run(zenity_cmd, env=env)
                # let the main thread do the clean up
                _exit(0)
    
    
    def md5sum(fname):
        cs = md5()
        with open(fname, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                cs.update(chunk)
        return cs.hexdigest()
    
    
    # Args Parser init
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-q", "--quiet", help="dont show pop ups", action="store_true")
    parser.add_argument("-u", "--user", help="user to display the dialogs as")
    parser.add_argument("-p", "--path", help="path of the file to backup")
    parser.add_argument("-t", "--to", help="who to send the email")
    parser.add_argument(
        "-k", "--keep", help="keep output file", action="store_true")
    parser.add_argument(
        "-n", "--no-mail", help="dont try to send the mail", action="store_true")
    args = parser.parse_args()
    
    # Globals
    USER = 'company'
    if(args.user):
        USER = args.user
        journal.send("USER OVERWRITE: {}".format(USER))
    
    TO = "info@company.com"
    if(args.to):
        TO = args.to
        journal.send("EMAIL TO OVERWRITE: {}".format(TO))
    BODY = "mail.body"
    FILENAME = 'database.mdb'
    PATH = '/home/company/shared'
    if(args.path):
        PATH = args.path
        journal.send("PATH OVERWRITE: {}".format(PATH))
    
    if(args.quiet):
        journal.send("QUIET NO-POPUPS mode")
    
    FILE = path.join(PATH, FILENAME)
    FILEXZ = FILE + ".tar.xz"
    now = datetime.now()
    OUTPUT = path.join(PATH, 'backup_{:%Y%m%d_%H%M%S}.backup'.format(now))
    CHECKSUM_FILE = FILENAME + ".checksum"
    
    error_msg_tail = "Ejecuta $ journalctl -u backup.service para saber más"
    
    LSOF_CMD = ["fuser", FILE]
    XZ_CMD = ["tar", "-cJC", PATH, "-f", FILEXZ, FILENAME, CHECKSUM_FILE]
    GPG_CMD = ["gpg", "-q", "--batch", "--yes", "-e", "-r", "backup", "-o", OUTPUT, FILEXZ]
    
    error = ""
    
    
    # Main
    display_alert('Empezando la copia de seguridad: {:%Y-%m-%d %H:%M:%S}\n\n'
                  'NO apagues el ordenador todavia por favor'.format(now))
    
    # sanity file exists
    if(path.exists(FILE)):
        journal.send(
            "New file {} detected. Trying to generate {}".format(FILE, OUTPUT))
    else:
        exit("{} not found. Aborting".format(FILE))
    
    # make sure file finished being copied
    finished_copy = False
    while(not finished_copy):
        try:
            run(LSOF_CMD, check=True)
            journal.send(
                "File is still open somewhere. Waiting 1 extra second before processing")
            run("sleep 1".split())
        except CalledProcessError:
            finished_copy = True
        except Exception as e:
            display_alert(
                "ERROR\n{}\n\n{}".format(e, error_msg_tail), "error")
            exit(0)
    
    filedate = datetime.fromtimestamp(path.getmtime(FILE))
    
    # sanity date
    if(now - timedelta(hours=1) > filedate):
        error = """El fichero que estas mandando se creó hace más de una hora.
    fecha del fichero: {:%Y-%m-%d %H:%M:%S}
    fecha actual     : {:%Y-%m-%d %H:%M:%S}
    
    Comprueba que es el correcto
    """.format(filedate, now)
    
    # Generate checksum file
    csum = md5sum(FILE)
    journal.send(".mdb md5: {} {}".format(csum, FILENAME))
    
    with open(CHECKSUM_FILE, "w") as f:
        f.write(csum)
        f.write(" ")
        f.write(FILENAME)
    
    # Compress
    if(path.isfile(FILEXZ)):
        remove(FILEXZ)
    
    journal.send("running XZ_CMD: {}".format(" ".join(XZ_CMD)))
    run(XZ_CMD)
    csum = md5sum(FILEXZ)
    journal.send(".tar.xz md5: {} {}".format(csum, FILEXZ))
    
    # encrypt
    journal.send("running GPG_CMD: {}".format(" ".join(GPG_CMD)))
    run(GPG_CMD)
    csum = md5sum(OUTPUT)
    journal.send(".gpg md5: {} {}".format(csum, OUTPUT))
    
    remove(FILEXZ)
    
    # sanity size
    filesize = path.getsize(OUTPUT)
    if(filesize < 5000000):
        error += """"El fichero que estas mandando es menor de 5Mb
    tamaño del fichero en bytes: ({})
    
    Comprueba que es el correcto
    """.format(filesize)
    
    subjectstr = "Backup {}ok con fecha {:%Y-%m-%d %H:%M:%S}"
    subject = subjectstr.format("NO " if error else "", now)
    body = """Todo parece okay, pero no olvides comprobar que
    el fichero salvado funciona bien por tu cuenta!
    """
    if(error):
        body = error
    
    with open(BODY, "w") as f:
        f.write(body)
    
    journal.send("{} generated correctly".format(OUTPUT))
    try:
        if(not args.no_mail):
            journal.send("Trying to send it to {}".format(TO))
            MAIL_CMD = ["mutt", "-a", OUTPUT, "-s", subject, "--", TO]
    
            if(version_info.minor < 6):
                run(MAIL_CMD, input=body, universal_newlines=True, check=True)
            else:
                run(MAIL_CMD, input=body, encoding="utf-8", check=True)
    except Exception as e:
        display_alert(
            "ERROR al enviar el backup por correo:\n{}".format(e), "error")
    else:
        later = datetime.now()
        took = later.replace(microsecond=0) - now.replace(microsecond=0)
        display_alert('Copia finalizada: {:%Y-%m-%d %H:%M:%S}\n'
                      'Ha tardado: {}\n\n'
                      'Ya puedes apagar el ordenador'.format(later, took))
    
    finally:
        if(not args.keep and path.exists(OUTPUT)):
            journal.send("removing gpg:{}".format(OUTPUT))
            remove(OUTPUT)
    
    unbackup.py
    #!/usr/bin/env python3
    
    from os import path, remove, sync, fork, _exit, environ
    from subprocess import run, CalledProcessError
    from glob import glob
    from sys import exit
    from systemd import journal
    from hashlib import md5
    import argparse
    
    
    def display_alert(text, wtype="info"):
        if(not args.quiet):
            if(not fork()):
                env = environ.copy()
                env.update({'DISPLAY': ':0.0', 'XAUTHORITY':
                            '/home/{}/.Xauthority'.format(USER)})
                zenity_cmd = [
                    'zenity', '--text={}'.format(text), '--no-markup', '--{}'.format(wtype), '--no-wrap']
                run(zenity_cmd, env=env)
                # Let the main thread do the clean up
                _exit(0)
    
    
    def md5sum(fname):
        cs = md5()
        with open(fname, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                cs.update(chunk)
        return cs.hexdigest()
    
    
    # Args Parser init
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-q", "--quiet", help="dont show pop ups", action="store_true")
    parser.add_argument("-u", "--user", help="user to display the dialogs as")
    parser.add_argument("-p", "--path", help="path of the file to unbackup")
    parser.add_argument(
        "-k", "--keep", help="keep original file", action="store_true")
    args = parser.parse_args()
    
    # Globals
    USER = 'company'
    if(args.user):
        USER = args.user
        journal.send("USER OVERWRITE: {}".format(USER))
    
    PATH = '/home/rk/shared'
    if(args.path):
        PATH = args.path
        journal.send("PATH OVERWRITE: {}".format(PATH))
    
    if(args.quiet):
        journal.send("QUIET NO-POPUPS mode")
    
    OUTPUT_FILE = 'database.mdb'
    error_msg_tail = "Ejecuta $ journalctl -u unbackup.service para saber más"
    CHECKSUM_FILE = OUTPUT_FILE + ".checksum"
    
    
    # Main
    try:
        input_file = glob(path.join(PATH, 'backup*.backup'))[0]
    except IndexError as e:
        display_alert("ERROR\nEl fichero de backup no existe:\n{}\n\n{}".format(
            e, error_msg_tail), "error")
        exit(0)
    except Exception as e:
        display_alert(
            "ERROR\n{}\n{}".format(e, error_msg_tail), "error")
        exit(0)
    else:
        display_alert(
            "Se ha detectado {}. Empiezo a procesarlo".format(input_file))
    
        output_path = path.join(PATH, OUTPUT_FILE)
        output_pathxz = output_path + ".tar.xz"
    
        LSOF_CMD = ["fuser", input_file]
        GPG_CMD = ["gpg", "--batch", "-qdo", output_pathxz, input_file]
        XZ_CMD = ["tar", "-xf", output_pathxz]
    
    # make sure file finished being copied. Systemd triggers this script as soon as the file name shows
    try:
        finished_copy = False
        while(not finished_copy):
            try:
                run(LSOF_CMD, check=True)
                journal.send(
                    "File is still open somewhere. Waiting 1 extra second before processing")
                run("sleep 1".split())
            except CalledProcessError:
                finished_copy = True
            except Exception as e:
                display_alert(
                    "ERROR\n{}\n\n{}".format(e, error_msg_tail), "error")
                exit(0)
    
        csum = md5sum(input_file)
        journal.send(".gpg md5: {} {}".format(csum, input_file))
    
        if(path.exists(output_pathxz)):
            journal.send("{} detected. Removing".format(output_pathxz))
            remove(output_pathxz)
    
        journal.send("running GPG_CMD: {}".format(" ".join(GPG_CMD)))
        run(GPG_CMD, check=True)
    
        csum = md5sum(output_pathxz)
        journal.send("tar.xz md5: {} {}".format(csum, input_file))
    
        journal.send("running XZ_CMD: {}".format(" ".join(XZ_CMD)))
        run(XZ_CMD, check=True)
    
    # Check Checksum
        with open(CHECKSUM_FILE) as f:
            target_cs, filename = f.read().strip().split()
        actual_cs = md5sum(filename)
        journal.send(".mdb md5: {} {}".format(actual_cs, filename))
        if(target_cs == actual_cs):
            journal.send("El checksum interno final es correcto!")
        else:
            display_alert("ERROR\n"
                          "Los checksums de {} no coinciden"
                          "Que significa que el fichero esta dañado"
                          .format(filename), "error")
    
    except Exception as e:
        display_alert("ERROR\n{}\n\n{}"
                      .format(e, error_msg_tail), "error")
        exit(0)
    else:
        display_alert("{} generado con exito".format(output_path))
    finally:
        if(not args.keep and path.exists(input_file)):
            journal.send("CLEAN UP: removing gpg {}".format(input_file))
            # make sure the file is not open before trying to remove it
            sync()
            remove(input_file)
            # sync so systemd dont detect the file again after finishing the script
            sync()
    
    backup.path 
    [Unit]
    Description=Carpeta Compartida backup
    
    [Path]
    PathChanged=/home/company/shared/database.mdb
    Unit=backup.service
    
    [Install]
    WantedBy=multi-user.target
    
    backup.service
    [Unit]
    Description=backup service
    
    [Service]
    Type=forking
    ExecStart=/root/backup/backup.py
    TimeoutSec=600
    
    unbackup.path
    [Unit]
    Description=Unbackup shared folder
    
    [Path]
    PathExistsGlob=/home/company/shared/backup*.backup
    Unit=unbackup.service
    
    [Install]
    WantedBy=multi-user.target
    
    unbackup.service
    [Unit]
    Description=Unbackup service
    [Service]
    Type=forking
    Environment=DISPLAY=:0.0
    ExecStart=/root/company/unbackup.py
    
  4. You have no repos online!

    Git is life. I manage my own git server at git.alberto.tf, but it's now set private, visible only to committers

    The main reason is that, more often than not, I'm the only committer. That allows me to be carefree about using time-based commits (as opposed to feature-based commits, as God intended) when I need to, for example to move my work from one computer to another, etc…

    I'm also less careful about clumping typos, features and fixes together in the same commits

    Having many time-based commits means the repos themselves don't add much to the code I already show, so I decided not to expose the repos by default. You are still free to use any code you find on this site

    The other reason is metadata

    It avoids awkward faces when I have to explain a commit made at 4am. – “I thought you were sick!” :)

  5. Automating the extraction of duplicated zip files

    It's not that well known that a zip file does not store a directory inside. It stores a sequence of files, and nothing prevents those file names from being duplicated inside one archive
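
    A quick way to see it for yourself (the file name is made up; Python's zipfile happily writes the same entry name twice and only emits a warning):

    #!/usr/bin/env python3
    # show that a zip archive can hold the same entry name more than once
    import zipfile

    with zipfile.ZipFile("dup_demo.zip", "w") as z:
        z.writestr("notes.txt", "first version")
        z.writestr("notes.txt", "second version")   # same name, only a UserWarning

    with zipfile.ZipFile("dup_demo.zip") as z:
        print([i.filename for i in z.infolist()])   # ['notes.txt', 'notes.txt']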

    All the tools I've checked out silently overwrite the duplicates or let you rename them manually, which gets very tedious as soon as you have to do this a few times with lots of duplicated files

    I had to bake my own solution using Python. If you know of a tool that does this, please let me know. I love to deprecate my own solutions :)

    unzip_rename_dups.py
    
    #!/usr/bin/env python3
    import sys
    import zipfile
    from os.path import splitext, dirname, abspath, join
    from os import rename
    
    
    ZIP = sys.argv[1]
    DIR = dirname(abspath(ZIP))
    
    filenames = {}
    extracted = 0
    dups = 0
    
    with zipfile.ZipFile(ZIP) as z:
    
        for info in z.infolist():
            z.extract(info, DIR)
            extracted += 1
    
            fn = info.filename
    
            if fn not in filenames:
                filenames[fn] = 1
            else:
                filenames[fn] += 1
                dups += 1
    
            orig_path = join(DIR, fn)
    
            # every extracted file gets a counter suffix (name1.ext, name2.ext, ...)
            # so a later duplicate can never overwrite an earlier one
            preext, postext = splitext(fn)
            final_fn = preext + str(filenames[fn]) + postext
            final_path = join(DIR, final_fn)
    
            rename(orig_path, final_path)
    
    print("{} files extracted sucesfully. {} Duplicated files saved!".format(extracted, dups))
    
  6. Automating wildcard cert renewal

    problem definition

    I host an instance of Sandstorm. I'd like to use my own domain AND HTTPS

    Sandstorm uses a new, unguessable, throw-away host name for every session as part of its security strategy, so in order to host your own instance under your own domain, you need a wildcard DNS entry and a wildcard cert for it (a cert for *.yourdomain that is valid for all your subdomains)

    I use certbot (aka Let's Encrypt) to generate my certificates. Unfortunately, they have stated that they will not issue wildcard certificates. Not now, and very likely not in the future

    Sandstorm offers a free DNS service using sandcats.io, batteries included (free wildcard cert). But this makes the whole site look like it is not running under your control when you share a link with a third party (even though that is not true). Since this is one of the main points of running my own instance, that solution is not suitable for me

    For reasons that deserve their own rant, I will not buy a wildcard cert

    This only left me with the option of running Sandstorm on a local port, having my Apache proxy the requests and present the right certs. I will be using the sandcats.io DNS + wildcard cert for WebSockets, which are virtually invisible to the end user
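
    A minimal sketch of that reverse-proxy vhost, assuming Sandstorm listens on a made-up local port (6080) and reusing the cert paths that updatecert.py below writes to; mod_ssl, mod_proxy and mod_proxy_http need to be enabled:

    <VirtualHost *:443>
        ServerName sandstorm.example.org
        ServerAlias *.sandstorm.example.org

        SSLEngine on
        SSLCertificateFile /etc/apache2/tls/cert.crt
        SSLCertificateKeyFile /etc/apache2/tls/cert.key

        # Sandstorm itself runs on a local port; Apache just fronts it
        ProxyPreserveHost On
        ProxyPass / http://127.0.0.1:6080/
        ProxyPassReverse / http://127.0.0.1:6080/
    </VirtualHost>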

    The certbot cert renewal is easy enough to automate, but I also need to automate the renewal of the sandcats.io cert, which only lasts nine days

    solution

    A service will run weekly to renew the cert. To do that, it temporarily swaps in a configuration that pretends to use one of those free sandcats.io certs, so Sandstorm renews the cert; then it parses the new cert and tells Apache to use it

    shortcomings

    Disclaimer: This setup is not officially supported by Sandstorm

    The reason is that some apps don't work well due to some browsers' security policies. Just like the Sandstorm guys, I had to make a compromise. The stuff I use works for me, and I have to test anything new before I rely on it :)

    code
    updatecert.py
    
    #!/usr/bin/env python3
    import json
    from subprocess import call,check_call
    from glob import glob
    from shutil import copy
    from time import sleep
    from timeout import timeout
    
    TIMEOUT = 120
    
    SSPATH = '/opt/sandstorm'
    CONF = SSPATH + '/sandstorm.conf'
    GOODCONF = SSPATH + '/sandstorm.good.conf'
    CERTCONF = SSPATH + '/sandstorm.certs.conf'
    CERTSPATH = SSPATH + '/var/sandcats/https/server.sandcats.io/'
    APACHECERT = '/etc/apache2/tls/cert'
    APACHECERTPUB = APACHECERT + '.crt'
    APACHECERTKEY = APACHECERT + '.key'
    
    RESTART_APACHE_CMD = 'systemctl restart apache2'.split()
    RESTART_SS_CMD = 'systemctl restart sandstorm'.split()
    
    @timeout(TIMEOUT, "ERROR: Cert didnt renew in {} secs".format(TIMEOUT))
    def check_cert_reply(files_before):
        found = None
        print("waiting for new cert in" + CERTCONF, end="")
        while not found:
            print(".", end="", flush=True)
            sleep(5)
            files_after = set(glob(CERTSPATH + '*.response-json'))
    
            found = files_after - files_before
        else:
            print("")
        return found.pop()
    
    def renew_cert():
        files_before = set(glob(CERTSPATH + '*.response-json'))
        copy(CERTCONF, CONF)
        call(RESTART_SS_CMD)
        try:
            new_cert = check_cert_reply(files_before)
        finally:
            print("Restoring sandstorm conf and restarting it")
            copy(GOODCONF, CONF)
            call(RESTART_SS_CMD)
            print("Restoring done")
        return new_cert
    
    def parse_cert(certfile):
        with open(certfile) as f:
            certs = json.load(f)
    
        with open(APACHECERTPUB, 'w') as cert:
    
            cert.write(certs['cert'])
    
            ca = certs['ca']
            ca.reverse()
            for i in ca:
                cert.write('\n')
                cert.write(i)
    
        copy(certfile[:-len('.response-json')], APACHECERTKEY)
    
    if __name__ == '__main__':
        new_cert = renew_cert()
        parse_cert(new_cert)
        try:
            check_call(RESTART_APACHE_CMD)
        except CalledProcessError:
            # one reason for apache to fail is that the json was parsed before it was completely written
            # try once more just in case
            print("failed to restart apache with the new cert. Trying once more")
            sleep(1)
            parse_cert(new_cert)
            call(RESTART_APACHE_CMD)
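
    The timeout decorator imported at the top of updatecert.py is not a standard library module; a minimal sketch of what it could look like (assuming SIGALRM, so Unix only and main thread only) would be:

    #!/usr/bin/env python3
    # timeout.py -- hypothetical sketch of the decorator used by updatecert.py
    import signal
    from functools import wraps


    def timeout(seconds, message="timed out"):
        def decorator(func):
            def _handler(signum, frame):
                raise TimeoutError(message)

            @wraps(func)
            def wrapper(*args, **kwargs):
                old = signal.signal(signal.SIGALRM, _handler)
                signal.alarm(seconds)
                try:
                    return func(*args, **kwargs)
                finally:
                    signal.alarm(0)                    # cancel any pending alarm
                    signal.signal(signal.SIGALRM, old)
            return wrapper
        return decorator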
    
    updatecert.service
    
    [Unit]
    Description=tries to renew ss cert
    OnFailure=status-email-admin@%n.service
    
    [Service]
    Type=oneshot
    ExecStart=/root/updatecert.py
    
    updatecert.timer
    
    [Unit]
    Description=runs ss cert renewal once a week
    
    [Timer]
    Persistent=true
    OnCalendar=weekly
    Unit=updatecert.service
    
    [Install]
    WantedBy=default.target
    
  7. Small automatic backup using Python

    EDIT: Newer version available

    problem definition

    An automatic backup of a database file inside a legacy Windows virtual machine without internet access.

    The client doesn't have a dedicated online machine for backups, and the backup should “leave the building”

    solution

    A shared folder using VirtualBox's shared folder facilities. The database will be copied there once a day

    Outside the VM, systemd will monitor the copy and launch the backup script

    The backup will be compressed using xz and encrypted using gpg with an asymmetric key

    Finally, it will be sent for storage to a mail account where they can check whether the backup was made

    code
    backup.py
    
    #!/usr/bin/env python3
    
    from datetime import datetime, timedelta
    from os import path, remove
    from subprocess import run
    from sys import exit
    
    now = datetime.now()
    
    TO = "info@company.com"
    BODY = "mail.body"
    FILENAME = 'database.mdb'
    PATH = '/home/company/shared'
    FILE = path.join(PATH, FILENAME)
    FILEXZ = FILE + ".xz"
    OUTPUT = path.join(PATH, 'backup_{:%Y%m%d_%H%M%S}.gpg'.format(now))
    
    XZ_CMD = "xz -k {}"
    GPG_CMD = "gpg -q --batch --yes -e -r rk -o {} {}"
    MAIL_CMD = "mutt -a {} -s '{}' -- {} < {}"
    
    error = ""
    
    # sanity file exists
    if path.exists(FILE):
        print("New file {} detected. Trying to generate {}".format(FILE, OUTPUT))
    else:
        exit("{} not found. Aborting".format(FILE))
    
    
    filedate = datetime.fromtimestamp(path.getmtime(FILE))
    
    # sanity date
    if now - timedelta(hours=1) > filedate:
        error = """The file you are sending was created 1+ hour ago
    file date   : {:%Y-%m-%d %H:%M:%S}
    current date: {:%Y-%m-%d %H:%M:%S}
    
    Please check if its the correct one
    """.format(filedate, now)
    
    # Compress
    if path.isfile(FILEXZ):
        remove(FILEXZ)
    
    run(XZ_CMD.format(FILE).split())
    
    # encrypt
    run(GPG_CMD.format(OUTPUT, FILEXZ).split())
    remove(FILEXZ)
    
    # sanity size
    filesize = path.getsize(OUTPUT)
    if filesize < 5000000:
        error += """"The size of the file you are sending is < 5Mb
    File size in bytes: ({})
    
    Please, Check if its the correct one
    """.format(filesize)
    
    subjectstr = "Backup {}ok with date {:%Y-%m-%d %H:%M:%S}"
    subject = subjectstr.format("NOT " if error else "", now)
    body = """Everything seems okay, but dont forget to check
    manually if the saved file works okay once in a while!
    """
    if error:
        body = error
    
    with open(BODY, "w") as f:
        f.write(body)
    
    print("{} generated correctly. Trying to send it to {}".format(OUTPUT, TO))
    run(MAIL_CMD.format(OUTPUT, subject, TO, BODY), shell=True)
    remove(OUTPUT)
    

    Inside the VM, using the task scheduler

    backup.bat
    
    @echo off
    xcopy /Y C:\program\database.mdb z:\
    

    mutt conf file

    .muttrc
    
    set sendmail="/usr/bin/msmtp"
    set use_from=yes
    set realname="Backup"
    set from=backup@company.com
    set envelope_from=yes
    

    systemd files

    shared.service
    
    [Unit]
    Description=company backup service
    
    [Service]
    Type=oneshot
    ExecStart=/root/backup/backup.py
    
    shared.path
    
    [Unit]
    Description=shared file
    
    [Path]
    PathChanged=/home/company/shared/database.mdb
    Unit=shared.service
    
    [Install]
    WantedBy=multi-user.target
    
  8. How to create your own markdown syntax with python-markdown

    Rationale

    Markdown (and I use Pelican with md) is not very good at handling images. I want an easy way in md to make the image fill the column while linking to the full-size image automatically

    I want to turn:

    !![alt text](path/image.png title text)
    

    into:

    <a href="path/image.png"><img width=100% title="title text" alt="alt text" src="path/image.png"></a>
    
    Code
    aimg/__init__.py:
    
    #!/usr/bin/python
    # markdown extension. Wraps <a> tags around img and adds width=100%
    #
    # This makes it easier to link to big images by making them fit the column
    # and linking to the big image
    #
    # run the module to check that it works :)
    
    from markdown.extensions import Extension
    from markdown.inlinepatterns import Pattern
    from markdown.util import etree
    
    class Aimg(Extension):
        def extendMarkdown(self, md, md_globals):
            md.inlinePatterns.add('aimg', AimgPattern(r'^!!\[(.+)\]\((\S+) (.+)\)$'), '_begin')
    
    
    class AimgPattern(Pattern):
        def handleMatch(self, m):
            a = etree.Element('a', {'href':m.group(3)})
            img = etree.Element('img', {
                'width': '100%',
                'src': m.group(3),
                'alt': m.group(2),
                'title': m.group(4)
            })
            a.append(img)
            return a
    
    if __name__ == '__main__':
        import markdown
        print(markdown.markdown('!![alt text](/images/image.png title text)', extensions=[Aimg()]))
    

    In your pelicanconf.py:

    import aimg
    MD_EXTENSIONS = ['codehilite(css_class=highlight)', 'extra', aimg.Aimg()]
    
    Bonus
    alternative solutions without an extension

    Yes, you can insert raw HTML in a markdown file

    <a href="path/image.png"><img width=100% title="title text" alt="alt text" src="path/image.png"></a>
    

    Yes, you can have them mixed. You can't add attributes, though

    [<img width=100% title="title text" alt="alt text" src="path/image.png">](path/image.png)
    

    Yes, with the extra extension you can have classes and modify them via CSS

    ![alt text](path/image.png title text){.classnamewith100%width}
    
    My version
    !![alt text](path/image.png title text)
    