revbank-inflatinator/inflatinator/revbank.py
polyfloyd 9d16212e2e
All checks were successful
Test / pytest (push) Successful in 19s
sligro: Keep track of announced substitute products
2025-04-21 22:52:31 +02:00

158 lines
5.3 KiB
Python

from dataclasses import dataclass
from decimal import Decimal, ROUND_UP
from typing import Dict, Optional, List
import logging
import scrapers
import shlex
def resale_price(prod: scrapers.Product) -> Decimal:
    """Return the per-unit selling price for a scraped product.

    The purchase price of the packaging is multiplied by a fixed margin
    factor of 1.3, divided by the number of units in the packaging, and the
    result is rounded up to the next multiple of 0.05.
    """
    margin_factor = Decimal('1.3')
    per_unit = prod.price * margin_factor / prod.units
    # Count in steps of 1/20 (5ct): scale up, round up to a whole number of
    # steps, then scale back down.
    whole_steps = (per_unit * 20).quantize(Decimal('1'), rounding=ROUND_UP)
    return whole_steps / 20
@dataclass
class Product:
    """One product entry, convertible from and to a single text line.

    A line consists of whitespace-separated (shell-style quoted) fields:
    a comma-separated alias list, a price, a description, and optional
    trailing ``#key`` / ``#key=value`` tags.
    """

    # Identifiers of the product; the rest of this module keeps the GTIN first.
    aliases: List[str]
    # Price as parsed from the line, or the resale price for scraped products.
    price: Decimal
    # Human-readable product name.
    description: str
    # Tags from the line; a bare ``#key`` is stored with the value None.
    metadata: Dict[str, Optional[str]]

    @staticmethod
    def from_line(line: str) -> "Product":
        """Parse a single product line.

        Raises:
            Exception: if the line is blank or starts with ``#`` (a comment).
            IndexError: if the line has fewer than three fields.
            decimal.InvalidOperation: if the price field is not a number.
            ValueError: if shlex cannot tokenize the line (e.g. an unclosed
                quote).
        """
        if not line.strip():
            raise Exception('line is empty')
        if line.startswith('#'):
            raise Exception('line is a comment')

        fields = shlex.split(line)
        aliases = fields[0].split(',')
        price = Decimal(fields[1])
        description = fields[2]

        # TODO: support addons
        metadata: Dict[str, Optional[str]] = {}
        # Only the fields after the description can be tags. Scanning every
        # field would misread a quoted description such as "#1 Cola" as a tag.
        for field in fields[3:]:
            if not field.startswith('#'):
                continue
            # partition() splits on the first '=' only, so a value may itself
            # contain '=' without breaking the parse.
            key, sep, value = field.lstrip('#').partition('=')
            metadata[key] = value if sep else None

        return Product(
            aliases=aliases,
            price=price,
            description=description,
            metadata=metadata,
        )

    @staticmethod
    def from_scraper(prod: "scrapers.Product") -> "Product":
        """Build a product from scraper data.

        The GTIN becomes the first alias, followed by the scraped aliases in
        sorted order. The price is the computed resale price and the metadata
        starts out empty.
        """
        return Product(
            aliases=[prod.gtin, *sorted(prod.aliases)],
            price=resale_price(prod),
            description=prod.name,
            metadata={},
        )

    def format_line(self) -> str:
        """Render the product as a column-aligned line.

        Tags are emitted in sorted order. Trailing whitespace is stripped.
        """
        aliases = ','.join(self.aliases)
        price = f'{self.price:.2f}'
        # NOTE(review): double quotes inside the description are not escaped,
        # so such a description would not survive a round trip through
        # from_line(). Confirm descriptions never contain them.
        description = f'"{self.description}"'
        metadata = ' '.join(sorted(
            f'#{k}' if v is None else f'#{k}={v}'
            for k, v in self.metadata.items()
        ))

        accum = ''
        for col, txt in [
            (30, aliases),
            (37, price),
            (98, description),
            (0, metadata),
        ]:
            # Always put one space after a column, then pad so that the next
            # column starts at index col + 1. A column that is too long
            # simply pushes the following ones to the right.
            accum = (accum + txt + ' ').ljust(col + 1)
        return accum.rstrip()
class NoAutoUpdate(Exception):
    """Signals that a product carries no tag requesting an automatic update."""

    def __init__(self) -> None:
        message = 'no auto update directive'
        super().__init__(message)
def find_product_details(product: Product):
    """Look up current supplier data for a product by its first alias.

    The supplier is selected by the product's metadata tags: ``ah`` takes
    precedence over ``sligro``. The first alias is passed to the scraper as
    the GTIN.

    Raises:
        NoAutoUpdate: if the product has neither an ``ah`` nor a ``sligro``
            tag.

    Any exception raised by the scraper lookup propagates to the caller.
    """
    tags = product.metadata
    if 'ah' not in tags and 'sligro' not in tags:
        raise NoAutoUpdate()

    gtin = product.aliases[0]
    if 'ah' in tags:
        return scrapers.ah_get_by_gtin(gtin)
    return scrapers.sligro_get_by_gtin(gtin)
def update_product_pricings(src: str) -> str:
    """Return the product list `src` with auto-updatable entries refreshed.

    `src` is processed line by line (split on ``'\\n'``):

    * Lines that do not parse as a product (blank lines, comments, malformed
      entries) are copied through unchanged.
    * Products without an ``ah``/``sligro`` tag are copied through unchanged.
    * Products that the scraper cannot find are rewritten with an
      ``err=not_found`` tag. If such a product already references a
      substitute (``sub``), its ``sligro`` tag is removed so that it is no
      longer looked up.
    * If the lookup fails for any other reason, the error is logged and the
      line is copied through unchanged.
    * All other products get their description, aliases and price replaced by
      the scraped data. When the scraper announces a replacement product and
      no ``sub`` tag is present yet, a ``sub`` tag is added and a new line
      for the replacement is inserted directly after the product.

    Returns the resulting lines joined with ``'\\n'``.
    """
    lines_out = []
    for line in src.split('\n'):
        # Anything that is not a parseable product line is kept verbatim.
        try:
            product = Product.from_line(line)
        except Exception:
            lines_out.append(line)
            continue

        try:
            prod_info = find_product_details(product)
        except NoAutoUpdate:
            logging.debug('no auto update: "%s"', product.description)
            lines_out.append(line)
            continue
        except scrapers.ProductNotFoundError:
            product.metadata['err'] = 'not_found'
            # If this product was not found and has a substitute reference in
            # its metadata, clear the scraping trigger.
            if 'sub' in product.metadata and 'sligro' in product.metadata:
                del product.metadata['sligro']
                logging.info('"%s" is EOL', product.description)
            else:
                # logging.warn is a deprecated alias of logging.warning.
                logging.warning('not found "%s"', product.description)
            lines_out.append(product.format_line())
            continue
        except Exception as err:
            logging.error('did not update "%s": %s', product.description, err)
            lines_out.append(line)
            continue

        new_product = Product.from_scraper(prod_info)
        product.description = new_product.description
        logging.debug(f'Found "{new_product.description}", buy €{prod_info.price/prod_info.units:.2f}, sell €{new_product.price:.2f}')

        # Merge any new aliases, keeping the gtin first.
        product.aliases = [
            prod_info.gtin,
            *sorted((set(product.aliases) | set(new_product.aliases)) - {prod_info.gtin})
        ]

        # Adjust the price.
        previous_price = product.price
        product.price = new_product.price
        if product.price != previous_price:
            logging.info(f'Adjusted "{product.description}", €{previous_price:.2f} -> €{product.price:.2f}')

        # If this product had an error set for any reason, clear it.
        product.metadata.pop('err', None)

        # If this product is being substituted by another product, leave a
        # reference. The 'sub' tag also records that the substitute line has
        # already been written, so it is only emitted once.
        substitute_product = None
        if prod_info.replacement is not None and 'sub' not in product.metadata:
            substitute_product = Product.from_scraper(prod_info.replacement)
            substitute_product.metadata['sligro'] = None
            product.metadata['sub'] = prod_info.replacement.gtin

        lines_out.append(product.format_line())
        if substitute_product is not None:
            lines_out.append(substitute_product.format_line())
            logging.info(f'Found replacement of "{product.description}": {substitute_product.aliases[0]}')

    return '\n'.join(lines_out)