bot/main.py

157 lines
4.6 KiB
Python
Executable file

#!/usr/bin/env python3
import asyncio
import os
import sys
from time import sleep
import aiomqtt
import pytz
from discord import Intents
from discord.ext import commands
from discord_webhook import DiscordEmbed, DiscordWebhook
def _require_env(name):
    """Return the value of environment variable *name*, or exit if it is missing.

    An empty value is treated the same as an unset one: "<name> unset" is
    printed and the process exits with status 1.
    """
    value = os.getenv(name)
    if value:
        return value
    print(f"{name} unset")
    sys.exit(1)


# Required settings, read from the environment in this order.
mqtt_host = _require_env("MQTT_HOST")
token = _require_env("DISCORD_TOKEN")
webhook_url = _require_env("DISCORD_WEBHOOK_URL")

# pytz timezone object for Europe/Amsterdam.
timezone = pytz.timezone("Europe/Amsterdam")
# Discord bot setup: start from the default intents and additionally enable
# the members and message-content intents.
intents = Intents.default()
intents.members = True
intents.message_content = True

# Commands are invoked with the "!" prefix.
HobbyBot = commands.Bot(
    intents=intents,
    description="Bitlair Bot",
    command_prefix="!",
)
async def mqtt_get_one(topic, timeout=20):
    """Subscribe to *topic* and return the first message that arrives.

    A new client connection to ``mqtt_host`` is opened for the call and left
    again when the call finishes.

    Args:
        topic: MQTT topic to subscribe to.
        timeout: Seconds allowed for connecting, subscribing and receiving
            combined. Defaults to 20.

    Returns:
        The first item produced by the client's ``messages`` iterator.

    Raises:
        TimeoutError: If the whole operation takes longer than *timeout*.
    """
    async with asyncio.timeout(timeout), aiomqtt.Client(mqtt_host) as client:
        await client.subscribe(topic)
        first_message = await anext(client.messages)
        return first_message
# Define bot commands
@HobbyBot.event
async def on_ready():
    """Print the bot's user and user ID when the ``on_ready`` event fires."""
    me = HobbyBot.user
    print(f"Logged in as {me} (ID: {me.id})")
# !state
@HobbyBot.command(description="Bitlair Space State")
async def state(ctx):
    # Replies with the space state read from the "bitlair/state" topic.
    # Documented with comments rather than a docstring because discord.py
    # uses a command callback's docstring as its help text.
    replies = {
        "open": "Bitlair is OPEN! :sunglasses:",
        "closed": "Bitlair is closed :pensive:",
    }
    async with ctx.typing():
        try:
            message = await mqtt_get_one("bitlair/state")
            reply = replies.get(message.payload.decode("ascii"))
            # Any payload other than "open" or "closed" gets no reply.
            if reply is not None:
                await ctx.send(reply)
        except Exception as exc:
            # Tell the channel something broke, then let the error propagate.
            await ctx.send("Meh, stuk")
            raise exc
# !co2
@HobbyBot.command(description="co2 levels")
async def co2(ctx):
    # Replies with the payload of the hoofdruimte co2_ppm topic.
    # Documented with comments rather than a docstring because discord.py
    # uses a command callback's docstring as its help text.
    topic = "bitlair/climate/hoofdruimte/co2_ppm"
    async with ctx.typing():
        try:
            message = await mqtt_get_one(topic)
            reading = message.payload.decode("ascii")
            await ctx.send(f"Hoofdruimte: {reading} ppm\n")
        except Exception as exc:
            # Tell the channel something broke, then let the error propagate.
            await ctx.send("Meh, stuk")
            raise exc
# !temp
@HobbyBot.command(description="Temperature")
async def temp(ctx):
    # Replies with the payload of the hoofdruimte temperature_c topic.
    # Documented with comments rather than a docstring because discord.py
    # uses a command callback's docstring as its help text.
    topic = "bitlair/climate/hoofdruimte/temperature_c"
    async with ctx.typing():
        try:
            message = await mqtt_get_one(topic)
            reading = message.payload.decode("ascii")
            await ctx.send(f"Hoofdruimte: {reading} °C\n")
        except Exception as exc:
            # Tell the channel something broke, then let the error propagate.
            await ctx.send("Meh, stuk")
            raise exc
# !humid
@HobbyBot.command(description="Humidity")
async def humid(ctx):
    # Replies with the payload of the hoofdruimte humidity_pct topic.
    # Documented with comments rather than a docstring because discord.py
    # uses a command callback's docstring as its help text.
    topic = "bitlair/climate/hoofdruimte/humidity_pct"
    async with ctx.typing():
        try:
            message = await mqtt_get_one(topic)
            reading = message.payload.decode("ascii")
            await ctx.send(f"Hoofdruimte: {reading} pct\n")
        except Exception as exc:
            # Tell the channel something broke, then let the error propagate.
            await ctx.send("Meh, stuk")
            raise exc
# !np
@HobbyBot.command(description="Now Playing")
async def np(ctx):
    # Always answers with the same fixed text, sent inside ctx.typing().
    # Documented with comments rather than a docstring because discord.py
    # uses a command callback's docstring as its help text.
    now_playing = "Now playing: Darude - Sandstorm"
    async with ctx.typing():
        await ctx.send(now_playing)
def webhook_message(msg):
    """Post *msg* as message content to the Discord webhook at ``webhook_url``.

    The webhook is built with ``rate_limit_retry=True`` and executed right
    away; nothing is returned.
    """
    DiscordWebhook(content=msg, url=webhook_url, rate_limit_retry=True).execute()
async def event_task():
    """Relay MQTT events to the Discord webhook for as long as messages arrive.

    Subscribes to the alarm, photos and state topics on ``mqtt_host`` and
    forwards each message to Discord:

    * ``bitlair/alarm``     -> "Alarm: <payload>"
    * ``bitlair/state``     -> "Bitlair is now <PAYLOAD>"
    * ``bitlair/state/djo`` -> "DJO is now <PAYLOAD>"
    * ``bitlair/photos``    -> a "WIP Cam" embed linking to the photo

    The first message seen on each subscribed topic is dropped, see the
    comment in the loop. Payloads are decoded as ASCII; a payload that is not
    valid ASCII raises ``UnicodeDecodeError`` and ends the task.

    The webhook calls are synchronous, so they run in a worker thread via
    ``asyncio.to_thread``, and the pause between messages uses
    ``asyncio.sleep``. Neither blocks the event loop that also runs the
    Discord bot.
    """
    retained = {
        "bitlair/alarm",
        "bitlair/photos",
        "bitlair/state",
        "bitlair/state/djo",
    }
    async with aiomqtt.Client(mqtt_host) as mq:
        await asyncio.gather(*[mq.subscribe(topic) for topic in retained])
        async for msg in mq.messages:
            # Retained messages trigger an initial message on connecting.
            # Prevent relaying them to Discord on startup.
            topic = str(msg.topic)
            if topic in retained:
                retained.remove(topic)
                continue

            payload = msg.payload.decode("ascii")
            if msg.topic.matches("bitlair/alarm"):
                await asyncio.to_thread(webhook_message, f"Alarm: {payload}")
            elif msg.topic.matches("bitlair/state"):
                await asyncio.to_thread(
                    webhook_message, f"Bitlair is now {payload.upper()}"
                )
            elif msg.topic.matches("bitlair/state/djo"):
                await asyncio.to_thread(
                    webhook_message, f"DJO is now {payload.upper()}"
                )
            elif msg.topic.matches("bitlair/photos"):
                webhook = DiscordWebhook(url=webhook_url, rate_limit_retry=True)
                embed = DiscordEmbed(title="WIP Cam", color="fc5d1d")
                embed.set_url(f"https://bitlair.nl/fotos/view/{payload}")
                embed.set_image(f"https://bitlair.nl/fotos/photos/{payload}")
                webhook.add_embed(embed)
                # execute() performs blocking HTTP; keep it off the event loop.
                await asyncio.to_thread(webhook.execute)
            else:
                continue

            # Prevent triggering rate limits. Must be the asyncio sleep:
            # time.sleep() here would freeze the bot for the whole second.
            await asyncio.sleep(1)
async def main():
    """Run the Discord bot and the MQTT event relay concurrently.

    Both are started as tasks and awaited together with ``asyncio.gather``.
    """
    workers = [
        asyncio.create_task(HobbyBot.start(token)),
        asyncio.create_task(event_task()),
    ]
    await asyncio.gather(*workers)


asyncio.run(main())