From 544f9027934123301637b41f5bb9b1b896d2afa2 Mon Sep 17 00:00:00 2001 From: polyfloyd Date: Sun, 14 Jan 2024 21:23:19 +0100 Subject: [PATCH] Add Sligro support --- inflatinator/revbank.py | 40 +++++++++---- inflatinator/scrapers.py | 125 ++++++++++++++++++++++++++++++++++++++- requirements.txt | 1 + 3 files changed, 151 insertions(+), 15 deletions(-) diff --git a/inflatinator/revbank.py b/inflatinator/revbank.py index 034463e..2c8a621 100644 --- a/inflatinator/revbank.py +++ b/inflatinator/revbank.py @@ -7,7 +7,8 @@ profit_margin = Decimal('1.3') class AutoUpdate: - _meta_re = re.compile(r'#\s*(?P<vendor>ah):(?P<sku>\S+)\s+(?P<units>\d+)x$') + _ah_meta_re = re.compile(r'#\s*ah:(?P<sku>\S+)\s+(?P<units>\d+)x$') + _sligro_meta_re = re.compile(r'^(?P<gtin13>\d{13})[^#]+#\s*sligro$') def __init__(self, vendor, sku, units): self.vendor = vendor @@ -15,22 +16,36 @@ class AutoUpdate: self.units = units def __str__(self): - return f'{self.vendor}:{self.sku} {self.units}x' + if self.vendor == 'sligro': + return f'{self.vendor}' + if self.units: + return f'{self.vendor}:{self.sku} {self.units}x' + return f'{self.vendor}:{self.sku}' @staticmethod def from_product_line(line): - m = AutoUpdate._meta_re.search(line) - if not m: - raise Exception('no auto update directive found') - return AutoUpdate(m['vendor'], m['sku'], int(m['units'])) + ah = AutoUpdate._ah_meta_re.search(line) + if ah: + return AutoUpdate('ah', ah['sku'], int(ah['units'])) + + sligro = AutoUpdate._sligro_meta_re.search(line) + if sligro: + return AutoUpdate('sligro', sligro['gtin13'], None) + + raise Exception('no auto update directive found') assert AutoUpdate.from_product_line('# ah:wi162664 8x') assert AutoUpdate.from_product_line('8711327538481,liuk 0.80 Ola Liuk # ah:wi162664 8x') +assert AutoUpdate.from_product_line('5000112659184 # sligro') +assert AutoUpdate.from_product_line('5000112659184 1.00 Cola Zero # sligro') +assert AutoUpdate.from_product_line('5000112659184,colazero 1.00 Cola Zero # sligro') def find_product_details(auto_update): 
if auto_update.vendor == 'ah': return scrapers.ah_get_by_sku(auto_update.sku, auto_update.units) + if auto_update.vendor == 'sligro': + return scrapers.sligro_get_by_gtin(auto_update.sku) raise Exception(f'unknown vendor: {auto_update.vendor}') @@ -51,16 +66,17 @@ def update_product_pricings(src): try: prod_info = find_product_details(auto_update) except Exception as err: - logging.error('could not update %s %s: %s', auto_update, err) + logging.error('could not update %s: %s', auto_update, err) lines_out.append(line) continue product_aliases = set() if not line.startswith('#'): - product_aliases = set(find_aliases.search(line)['aliases'].split(',')) - product_aliases.add(prod_info.gtin) - - aliases = ','.join(sorted(product_aliases)) + human_aliases = set(find_aliases.search(line)['aliases'].split(',')) + human_aliases -= set([prod_info.gtin]) + human_aliases -= set(prod_info.aliases) + human_aliases = sorted(human_aliases) + scannables = ','.join([prod_info.gtin, *prod_info.aliases, *human_aliases]) # Apply profit margin and divide by the number of units per sold packaging. 
unit_price = prod_info.price * profit_margin / prod_info.units @@ -68,7 +84,7 @@ def update_product_pricings(src): unit_price = (unit_price * 20).quantize(Decimal('1'), rounding=ROUND_UP) / 20 fmt_price = f'{unit_price:.2f}' - lines_out.append(f'{aliases:<15} {fmt_price:<6} {prod_info.name:<32} # {auto_update}') + lines_out.append(f'{scannables:<30} {fmt_price:<6} {prod_info.name:<60} # {auto_update}') logging.debug(f'Found "{prod_info.name}", buy €{prod_info.price/prod_info.units:.2f}, sell €{fmt_price}') diff --git a/inflatinator/scrapers.py b/inflatinator/scrapers.py index 45c8469..8972534 100644 --- a/inflatinator/scrapers.py +++ b/inflatinator/scrapers.py @@ -1,22 +1,27 @@ from decimal import Decimal +from functools import reduce from pyquery import PyQuery as pq import json import re +import os +import requests import subprocess +import logging class Product: - def __init__(self, name, price, gtin, units): + def __init__(self, *, name, price, gtin, units, aliases=[]): self.name = name self.price = price self.gtin = gtin self.units = units + self.aliases = aliases def __str__(self): return self.name -def get(url): +def links_get(url): compl = subprocess.run(['links', '-source', url], capture_output=True) return compl.stdout @@ -24,7 +29,7 @@ def get(url): def ah_get_by_sku(ah_sku, units): assert re.match('^wi\d+$', ah_sku) - html_src = get(f'https://www.ah.nl/producten/product/{ah_sku}') + html_src = links_get(f'https://www.ah.nl/producten/product/{ah_sku}') doc = pq(html_src) ld_jsons = doc('script[type="application/ld+json"]') @@ -41,3 +46,117 @@ def ah_get_by_sku(ah_sku, units): gtin=schema['gtin13'], units=units, ) + + +_sess = requests.Session() + +def sligro_client(): + global _sess + + if _sess.cookies: + return _sess + + username = os.getenv('SLIGRO_USERNAME') + password = os.getenv('SLIGRO_PASSWORD') + if not username: + raise Exception('missing SLIGRO_USERNAME') + if not password: + raise Exception('missing SLIGRO_PASSWORD') + + resp = 
_sess.post('https://www.sligro.nl/api/user/sligro-nl/nl/login', + json={'username': username, 'password': password, 'rememberMe': False}) + resp.raise_for_status() + logging.info('Sligro login ok!') + + return _sess + + +def sligro_get_by_gtin(gtin13): + assert re.match('^\d{13}$', gtin13) + gtin14 = f'{gtin13:0>14}' + + # The search feature of the website returns results in JSON and handles GTIN formats. Neat! + # However, it can be a bit picky about leading zeros, so we try to query with GTIN14 as that is + # what works in the most cases. Sometimes GTIN13 is still required though + for gtin_whatever in [gtin14, gtin13]: + response = requests.get(f'https://www.sligro.nl/api/product-overview/sligro-nl/nl/query/3?term={gtin_whatever}') + response.raise_for_status() + body = response.json() + if 'products' in body: + break + else: + raise Exception(f'sligro: {gtin13} not found') + + product = body['products'][0] + sku = product["code"] + + # Query the product page itself, there is more info that we need on there. The 'url' field in + # the product object gives a 404, but the actual product page URL can be created from the search + # results. + url_slug = '-'.join([product['brandName'], product['name'], product['contentDescription']])\ + .replace(' ', '-')\ + .replace('\'', '-')\ + .replace('&', '-')\ + .replace(',', '')\ + .replace('%', '')\ + .lower() + prod_resp = requests.get(f'https://www.sligro.nl/p.{sku}.html/{url_slug}.html') + prod_resp.raise_for_status() + + product_page = pq(prod_resp.text) + prod_ext_data_script = product_page('script[data-hypernova-key="ProductDetail"]') + prod_ext_data = json.loads(prod_ext_data_script[0].text.replace('<!--', '').replace('-->', '')) + + # Most products contain products which have distinct barcodes. + sub_gtin = prod_ext_data['propsData']['data'].get('gtinUnderlyingUnit', None) + if sub_gtin: + sub_gtin = sub_gtin.lstrip('0') + + # The contentDescription field holds the number of individual packages per box sold. 
+ units, volume = parse_content_description(product['contentDescription']) + + # Pricing requires logging in and is on a separate endpoint... + pricing_resp = sligro_client().get(f'https://www.sligro.nl/api/cart/sligro-nl/customerorganizationdatas?productCodes={sku}') + pricing = pricing_resp.json()['data']['products'][0] + + # If fromPrice is present, this product has a temporary discount. We prefer the regular price as + # we do not want to make a loss on stock that was purchased earlier. + if (from_price := pricing.get('fromPrice')): + price_obj = from_price + else: + price_obj = pricing['price'] + + return Product( + name=f'{product["brandName"]} {product["name"]} ({volume})', + price=Decimal(price_obj['value']), + gtin=gtin13, + units=units, + aliases=[sub_gtin] if sub_gtin else [], + ) + + +# The contentDescription seems to have a formatting consistent enough for regex matching. Some +# products have multiple levels of packaging, but the last or only component is always the +# volume or weight. +def parse_content_description(cd): + # These ones are weird. 
+ if cd.endswith(' rollen'): + return int(cd.split(' ')[0]), 'rol' + if (m := re.search('^Pak (\d+) stuks$', cd)): + return int(m[1]), '' + + groups = re.split('\s+x\s+', cd) + volume = groups[-1] + unit_groups = groups[:-1] + + sub_units = (int(re.search('(\d+)', g)[0]) for g in unit_groups) + units = reduce(lambda a, b: a * b, sub_units, 1) + + return units, volume + +assert parse_content_description('40 stuks x 22,5 gram') == (40, '22,5 gram') +assert parse_content_description('4 multipacks x 6 blikjes x 33 cl') == (24, '33 cl') +assert parse_content_description('24 2-packs x 70 gram') == (24, '70 gram') +assert parse_content_description('Tray 12 x 40 gram') == (12, '40 gram') +assert parse_content_description('36 rollen') == (36, 'rol') +assert parse_content_description('Pak 10 stuks') == (10, '') diff --git a/requirements.txt b/requirements.txt index 91de07b..ff3c59d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,2 @@ pyquery +requests