Update to new RevBank v6 metadata format

This commit is contained in:
polyfloyd 2024-01-20 20:32:48 +01:00
parent 8a9c7113f8
commit 1abc17d8e9
2 changed files with 84 additions and 71 deletions

View file

@ -1,91 +1,109 @@
from dataclasses import dataclass
from decimal import Decimal, ROUND_UP
from typing import Dict, Optional, List
import logging
import re
import scrapers
import shlex
profit_margin = Decimal('1.3')
class AutoUpdate:
_ah_meta_re = re.compile(r'#\s*ah:(?P<sku>\S+)\s+(?P<units>\d+)x$')
_sligro_meta_re = re.compile(r'^(?P<gtin13>\d{13})[^#]+#\s*sligro$')
def __init__(self, vendor, sku, units):
self.vendor = vendor
self.sku = sku
self.units = units
def __str__(self):
if self.vendor == 'sligro':
return f'{self.vendor}'
if self.units:
return f'{self.vendor}:{self.sku} {self.units}x'
return f'{self.vendor}:{self.sku}'
@dataclass
class Product:
aliases: List[str]
price: Decimal
description: str
metadata: Dict[str, Optional[str]]
@staticmethod
def from_product_line(line):
ah = AutoUpdate._ah_meta_re.search(line)
if ah:
return AutoUpdate('ah', ah['sku'], int(ah['units']))
def from_line(line: str) -> "Product":
if not line.strip():
raise Exception('line is empty')
if line.startswith('#'):
raise Exception('line is a comment')
sligro = AutoUpdate._sligro_meta_re.search(line)
if sligro:
return AutoUpdate('sligro', sligro['gtin13'], None)
fields = shlex.split(line)
aliases = fields[0].split(',')
price = Decimal(fields[1])
description = fields[2]
# TODO: support addons
raise Exception('no auto update directive found')
metadata = {}
for f in fields:
if f.startswith('#'):
s = f.lstrip('#').split('=')
(k, v) = (s[0], None) if len(s) == 1 else s
metadata[k] = v
assert AutoUpdate.from_product_line('# ah:wi162664 8x')
assert AutoUpdate.from_product_line('8711327538481,liuk 0.80 Ola Liuk # ah:wi162664 8x')
assert AutoUpdate.from_product_line('5000112659184 # sligro')
assert AutoUpdate.from_product_line('5000112659184 1.00 Cola Zero # sligro')
assert AutoUpdate.from_product_line('5000112659184,colazero 1.00 Cola Zero # sligro')
return Product(
aliases=aliases,
price=price,
description=description,
metadata=metadata,
)
def format_line(self):
aliases = ','.join(self.aliases)
price = f'{self.price:.2f}'
description = f'"{self.description}"'
metadata = ' '.join(sorted(f'#{k}' if v is None else f'#{k}={v}' for (k, v) in self.metadata.items()))
return f'{aliases:<30} {price:<6} {description:<60} {metadata}'
def find_product_details(auto_update):
if auto_update.vendor == 'ah':
return scrapers.ah_get_by_sku(auto_update.sku, auto_update.units)
if auto_update.vendor == 'sligro':
return scrapers.sligro_get_by_gtin(auto_update.sku)
raise Exception(f'unknown vendor: {auto_update.vendor}')
assert Product.from_line('8711327538481,liuk 0.80 "Ola Liuk" #ah=wi162664 #qty=8') == \
Product(['8711327538481','liuk'], Decimal('0.8'), 'Ola Liuk', {'ah': 'wi162664', 'qty': '8'})
assert Product.from_line('5000112659184,colazero 1.00 "Cola Zero" #sligro') == \
Product(['5000112659184','colazero'], Decimal(1), 'Cola Zero', {'sligro': None})
assert Product.from_line('8711327538481,liuk 0.80 "Ola Liuk" #ah=wi162664 #qty=8').format_line() == \
'8711327538481,liuk 0.80 "Ola Liuk" #ah=wi162664 #qty=8'
assert Product(['5000112659184','colazero'], Decimal(1), 'Cola Zero', {'sligro': None}).format_line() == \
'5000112659184,colazero 1.00 "Cola Zero" #sligro'
class NoAutoUpdate(Exception):
def __init__(self):
super().__init__('no auto update directive')
def find_product_details(product: Product):
if (ah_sku := product.metadata.get('ah', None)):
return scrapers.ah_get_by_sku(ah_sku, int(product.metadata['qty']))
if 'sligro' in product.metadata:
return scrapers.sligro_get_by_gtin(product.aliases[0])
raise NoAutoUpdate()
def update_product_pricings(src):
find_aliases = re.compile(r'^(?P<aliases>\S+)')
lines = src.split('\n')
lines_out = []
for line in lines:
for line in src.split('\n'):
try:
auto_update = AutoUpdate.from_product_line(line)
logging.debug('Found updatable product: %s', auto_update)
product = Product.from_line(line)
except Exception as err:
lines_out.append(line)
continue
try:
prod_info = find_product_details(auto_update)
prod_info = find_product_details(product)
except NoAutoUpdate:
logging.debug('no auto update: %s', product)
lines_out.append(line)
continue
except Exception as err:
logging.error('could not update %s: %s', auto_update, err)
logging.error('did not update %s: %s', product, err)
lines_out.append(line)
continue
product_aliases = set()
if not line.startswith('#'):
human_aliases = set(find_aliases.search(line)['aliases'].split(','))
human_aliases -= set([prod_info.gtin])
human_aliases -= set(prod_info.aliases)
human_aliases = sorted(human_aliases)
scannables = ','.join([prod_info.gtin, *prod_info.aliases, *human_aliases])
human_aliases = sorted(set(product.aliases) - set([prod_info.gtin]) - set(prod_info.aliases))
product.aliases = [prod_info.gtin, *prod_info.aliases, *human_aliases]
# Apply profit margin and divide by the number of units per sold packaging.
unit_price = prod_info.price * profit_margin / prod_info.units
# Round up to 5ct.
unit_price = (unit_price * 20).quantize(Decimal('1'), rounding=ROUND_UP) / 20
product.price = (unit_price * 20).quantize(Decimal('1'), rounding=ROUND_UP) / 20
fmt_price = f'{unit_price:.2f}'
lines_out.append(f'{scannables:<30} {fmt_price:<6} {prod_info.name:<60} # {auto_update}')
lines_out.append(product.format_line())
logging.debug(f'Found "{prod_info.name}", buy €{prod_info.price/prod_info.units:.2f}, sell €{fmt_price}')
logging.debug(f'Found "{prod_info.name}", buy €{prod_info.price/prod_info.units:.2f}, sell €{product.price:.2f}')
return '\n'.join(lines_out)