Website change detection

I have used https://www.changedetection.com/ for a while. But it has always annoyed me that they don't send the actual diff with the change notification, just a link to a status page. So here is a simple self-hosted version, written in Python, using headless Chrome (to make sure JavaScript-rendered content is included). You specify the URLs and the elements' id/class in the file - internal state is tracked in some plain text files - and voilà.

This way you can track changes to a local restaurant's weekly menu, slow-moving government cases, job listings, or privacy policies that your providers choose to change whenever they feel like it.

You can install both selenium and BeautifulSoup with pip. You will also need to download ChromeDriver.

change_detection.py

#!/usr/bin/python3

from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
import difflib
import hashlib
import os

state_directory = "/tmp/"

pages = [
    ("https://stortinget.no/no/Stottemeny/Stilling-ledig/", ".jobbnorge-joblist-table"),
    ("https://stortinget.no/no/Saker-og-publikasjoner/Sporsmal/Skriftlige-sporsmal-og-svar/Skriftlig-sporsmal/?qid=74380", "#main-content"),
    ("https://www.smalhans.no/matogvin", '.menu-block'),
    ("https://www.digitalocean.com/legal/privacy-policy/", ".www-Section")
]

chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_driver = "/path/to/chromedriver"

def get_element_from_url(url, selector):
    driver = webdriver.Chrome(
        chrome_options=chrome_options, executable_path=chrome_driver
    )
    driver.get(url)
    html = driver.execute_script("return document.body.innerHTML")
    driver.quit()  # shut the browser down so headless Chrome processes don't pile up under cron

    soup = BeautifulSoup(html, "html.parser")
    selected = soup.select(selector)
    if not selected:
        print("Selector {} not found at {}".format(selector, url))
        return None
    if len(selected) > 1:
        print("Found multiple items with selector {} at {}. Selecting first.".format(selector, url))

    return selected[0].prettify()

for url, selector in pages:
    identifier = hashlib.md5((url+selector).encode('utf-8')).hexdigest()
    file_name = "{}{}.txt".format(state_directory, identifier)

    html_new = get_element_from_url(url, selector)

    # could not get the content
    if not html_new:
        continue

    # save the new input and continue, we will check the diff next time
    if not os.path.exists(file_name):
        with open(file_name, "w") as f:
            f.write(html_new)
        continue

    with open(file_name, "r") as f:
        html_old = f.read()

    diff = difflib.unified_diff(
        html_old.splitlines(),
        html_new.splitlines()
    )

    diff_output = '\n'.join(list(diff)[3:]) # skip the ---, +++ and first @@ header lines

    if diff_output:
        print(' ***** {} ***** '.format(url))
        print("\n" + diff_output + "\n")
        # save the new version to file
        with open(file_name, "w") as f:
            f.write(html_new)

Put this in your crontab on a server near you and let cron send you an email when the script outputs a change.

0 * * * * /home/badguy90/bin/change_detection.py

As a bonus, colorize the content of the emails in mutt to make the diffs easier to read, just like git. Put this in your .muttrc.

# colorful diffs in email
color body green default "^diff \-.*"
color body green default "^index [a-f0-9].*"
color body green default "^\-\-\- .*"
color body green default "^[\+]{3} .*"
color body cyan default "^[\+][^\+]+.*"
color body red  default "^\-[^\-]+.*"
color body brightblue default "^@@ .*"

Nice.

RSS reader with RegEx matching and push notifications

Many years ago I made an awful bash script which subscribed to a couple of RSS feeds and pushed notifications to me if it found keywords of value in the title. When I needed to make some changes, I made the jump to Python. Here is the gist of it, where you can easily substitute the choice of database and mobile push service. I use Pushover (which costs a couple of dollars, but is well worth it) and sqlite3.

#!/usr/bin/python3

import feedparser # pip3 install
import http.client
import os
import re
import sqlite3
import sys
import urllib.parse

sqlite_db_file = '/home/user/.rss-reader.db'
pushover_token = 'xxx'
pushover_userkey = 'xxx'

feeds = [
    {
        'name': 'Lokalavis 1',
        'url': 'http://lokalavis.no/rss.xml',
        'keywords': ['.']
    },
    {
        'name': 'Aftenbladet',
        'url': 'http://www.aftenbladet.no/rss',
        'keywords': [
            'Ryfylke',
            'Lokalt sted 1',
            'Lokalt sted 2',
            'fly(plass|krasj|ulykke)',
            'ryfast',
            'skogbrann'
         ]
    },
    {
        'name': 'Reddit Frontpage',
        'url': 'https://www.reddit.com/.rss',
        'keywords': ['norw(ay|egian)']
    },
    {
        'name': 'Reddit Linux',
        'url': 'https://www.reddit.com/r/linux.rss',
        'keywords': ['debian', 'vim', 'lwn', 'stallman']
    },
    {
        'name': 'HackerNews',
        'url': 'https://hnrss.org/frontpage',
        'keywords': [
            'postgres',
            'norw(ay|egian)',
            'debian'
        ]
    }
]

db = sqlite3.connect(sqlite_db_file)
c = db.cursor()
create_table = '''CREATE TABLE IF NOT EXISTS entries (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    url TEXT,
                    summary TEXT
                  )'''
c.execute(create_table)
db.commit()

def tty():
    return os.isatty(sys.stdin.fileno())

def save_in_db(url, summary):
    c = db.cursor()
    insert = "INSERT INTO entries VALUES (NULL, ?, ?);"
    c.execute(insert, [url, summary])
    db.commit()

def link_in_db(link):
    c = db.cursor()
    c.execute("SELECT * FROM entries where url = ?", [link])
    if len(c.fetchall()) > 0:
        return True
    return False

def push(title, summary, link):
    conn = http.client.HTTPSConnection("api.pushover.net:443")
    conn.request("POST", "/1/messages.json",
      urllib.parse.urlencode({
        "token": pushover_token,
        "user": pushover_userkey,
        "title": title,
        "message": summary,
        "url": link
      }), { "Content-type": "application/x-www-form-urlencoded" })
    conn.getresponse()

for feed in feeds:
    if tty(): print("Fetching {}".format(feed['url']))
    p = feedparser.parse(feed['url'])
    for entry in p.entries:
        link = entry.link
        summary = entry.summary
        for keyword in feed['keywords']:
            if re.search(keyword, entry.summary) and not link_in_db(link):
                if tty(): print("Keyword hit '{}' in {}".format(keyword, entry.title))

                push_title = "{} ({})".format(feed['name'], keyword)
                title_and_summary = "{} - {}".format(entry.title, entry.summary)

                push(push_title, title_and_summary, link)
                save_in_db(link, title_and_summary)

Add to cron:

*/10 * * * * /home/user/bin/rss-reader.py

Profit!

I have added the summary field to the database for possible later use.

Here is a screenshot of Pushover telling me the local ferry is having trouble. (-:

pushover

REST stats

stats

This started with me wanting to graph data from some arbitrary sources, and I wanted to store it in my Postgres database for learning purposes. So I went down the rabbit hole and looked for a statsd-ish service with a Postgres backend, but everything was either a bit too much or a bit too little. So I ended up writing a simple app for my Django project lekvam.no, piggybacking on the REST API and the Postgres backend I already had set up.

It is nothing fancy; for gauge measurements I collect incoming values into the following models. If a gauge's slug (name) has not been seen before, it is created, so I can dynamically add new measurements from scripts or other sources. The secret is set when the gauge is created and must be present when you want to add data points.

from django.db import models

class Gauge(models.Model):
    slug = models.CharField(max_length=100, unique=True)
    title = models.CharField(max_length=100)
    created_at = models.DateTimeField(auto_now_add=True)
    secret = models.TextField(null=True, blank=True)
    desc = models.TextField(null=True, blank=True)
    unit = models.TextField(null=True, blank=True)
    deleted = models.DateTimeField(null=True, blank=True)

class GaugeValue(models.Model):
    gauge = models.ForeignKey(Gauge, on_delete=models.CASCADE)
    created_at = models.DateTimeField(auto_now_add=True)
    value = models.FloatField()
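The view behind the add endpoint boils down to something like this. A minimal sketch, with a hypothetical view name and simplified error handling (the real code is on GitHub): the gauge is created the first time a slug is seen, and after that the secret must match.

from django.http import HttpResponse, HttpResponseForbidden

def add_gauge_value(request, slug):
    # secret and value arrive as GET parameters, as in the curl example below
    secret = request.GET.get('secret', '')
    value = request.GET.get('value')

    # create the gauge the first time the slug is seen, otherwise require the stored secret
    gauge, created = Gauge.objects.get_or_create(
        slug=slug, defaults={'title': slug, 'secret': secret}
    )
    if not created and gauge.secret != secret:
        return HttpResponseForbidden("wrong secret")

    GaugeValue.objects.create(gauge=gauge, value=float(value))
    return HttpResponse("ok")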

I have made a simple dashboard which draws graphs from the latest data at https://lekvam.no/stats/, and you can browse historical data by navigating further. For now, every graph aggregates over all the values in the given timespan, and it is snappy enough. The graphs are made with the Django app graphos, which has a collection of straightforward graph types.
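For illustration, drawing one of those graphs looks roughly like this (a sketch, assuming graphos's ModelDataSource and Google Charts renderer; the field names come from the models above, the function name is made up):

from graphos.sources.model import ModelDataSource
from graphos.renderers.gchart import LineChart

def gauge_chart(gauge, since):
    # nothing fancy: feed the raw values in the timespan straight to the chart
    queryset = GaugeValue.objects.filter(
        gauge=gauge, created_at__gte=since
    ).order_by('created_at')
    data_source = ModelDataSource(queryset, fields=['created_at', 'value'])
    return LineChart(data_source)  # rendered in the template with {{ chart.as_html }}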

A gauge can be created, and enriched, with:

curl -s --get \
     --data-urlencode "secret=s3cr3t" \
     --data-urlencode "value=$VALUE" \
     https://lekvam.no/stats/gauge/$SLUG/add
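Or the same thing from a Python script, if you prefer that over curl (the slug and value here are just made-up examples):

import urllib.parse
import urllib.request

slug = "office_temperature"  # hypothetical example gauge
params = urllib.parse.urlencode({"secret": "s3cr3t", "value": 21.5})
urllib.request.urlopen("https://lekvam.no/stats/gauge/{}/add?{}".format(slug, params))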

The whole code can be found on GitHub.

Sigma 30mm f/1.4

I bought myself a used Sigma 30mm f/1.4 DC DN for my Sony a6000 this winter, but I did not get a good opportunity to realize its potential before 17 May, our National Day. I am really happy with the optics, but the focus-by-wire system (the focus ring is not mechanical) is a bit of a pain. If you mostly run autofocus anyway, this is great bang for the buck.

More from the National Day here.

Roy