408 lines
12 KiB
Python

# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) Camden Dixie O'Brien
import abc
import argparse
import enum
import socket
import struct
import sys
import time
class Weekday(enum.IntEnum):
    """Day of the week, numbered from Monday (0) to Sunday (6).

    Other code in this file constructs members by number
    (``Weekday(i)``), using the number as the bit position in an alarm's
    day bitmask and as the character index in a command-line dayspec.
    An ``IntEnum`` is used so that members are constructible that way
    while still comparing equal to their plain integer values.
    """

    MONDAY = 0
    TUESDAY = 1
    WEDNESDAY = 2
    THURSDAY = 3
    FRIDAY = 4
    SATURDAY = 5
    SUNDAY = 6
class Alarm:
    """A time of day together with the set of weekdays it applies to.

    The serialized form is three bytes: hour, minute, and a bitmask in
    which bit ``i`` is set when ``Weekday(i)`` is one of the days.
    """

    SERIALIZED_LENGTH = 3

    def __init__(self, hour, minute, days):
        self.hour = hour
        self.minute = minute
        self.days = days

    @staticmethod
    def deserialize(alarm_bytes):
        """Build an Alarm from its serialized bytes.

        Raises RuntimeError when ``alarm_bytes`` is not exactly
        ``SERIALIZED_LENGTH`` bytes long.
        """
        if len(alarm_bytes) != Alarm.SERIALIZED_LENGTH:
            raise RuntimeError("Wrong number of bytes for alarm")
        hour, minute, day_bits = alarm_bytes[0], alarm_bytes[1], alarm_bytes[2]
        return Alarm(hour, minute, Alarm._read_day_bitmask(day_bits))

    @staticmethod
    def _read_day_bitmask(bitmask):
        """Return the set of weekdays whose bit is set in ``bitmask``."""
        return {Weekday(bit) for bit in range(7) if (bitmask >> bit) & 1}

    @staticmethod
    def _make_day_bitmask(days):
        """Return the bitmask with one bit set per weekday in ``days``."""
        # The powers of two are distinct, so summing equals OR-ing them.
        return sum(1 << bit for bit in range(7) if Weekday(bit) in days)

    def serialize(self):
        """Return the three-byte serialized form of this alarm."""
        return bytes(
            [self.hour, self.minute, Alarm._make_day_bitmask(self.days)])
class Type(enum.Enum):
    """Tag identifying how a param's value is encoded."""

    U8 = 0
    U16 = 1
    U32 = 2
    U64 = 3
    STRING = 4
    ALARM = 5

    @staticmethod
    def length(type):
        """Return the serialized length in bytes of a value of ``type``.

        Returns None for ``Type.STRING``, whose length is not fixed.
        Raises RuntimeError for anything that is not a ``Type`` member.
        """
        fixed_lengths = (
            (Type.U8, 1),
            (Type.U16, 2),
            (Type.U32, 4),
            (Type.U64, 8),
            (Type.STRING, None),
        )
        for member, size in fixed_lengths:
            if type == member:
                return size
        # Looked up only when needed, as the alarm length lives on Alarm.
        if type == Type.ALARM:
            return Alarm.SERIALIZED_LENGTH
        raise RuntimeError("Invalid param type")
class Param:
    """A typed value carried by a command or a response."""

    def __init__(self, type, value):
        self.type = type
        self.value = value

    @staticmethod
    def deserialize(type, value_bytes):
        """Build a Param of ``type`` from the raw bytes of its value.

        Raises NotImplementedError for ``Type.STRING`` and RuntimeError
        for an unknown type.
        """
        return Param(type, Param._deserialize_value(type, value_bytes))

    @staticmethod
    def _deserialize_value(type, value_bytes):
        """Decode ``value_bytes`` according to ``type``."""
        if type == Type.U8:
            return value_bytes[0]
        elif type == Type.U16:
            return struct.unpack('!H', value_bytes)[0]
        elif type == Type.U32:
            return struct.unpack('!I', value_bytes)[0]
        elif type == Type.U64:
            return struct.unpack('!Q', value_bytes)[0]
        elif type == Type.STRING:
            # Strings handled separately due to variable length
            raise NotImplementedError(
                "Strings are length-prefixed and must be read by the caller")
        elif type == Type.ALARM:
            return Alarm.deserialize(value_bytes)
        else:
            raise RuntimeError("Invalid param type")

    def serialize(self):
        """Return the serialized bytes of this param's value.

        Integers are packed big-endian; a string is its UTF-8 encoding
        preceded by a one-byte length. Raises ValueError when an encoded
        string is longer than 255 bytes and RuntimeError for an unknown
        type.
        """
        if self.type == Type.U8:
            return bytes([self.value])
        elif self.type == Type.U16:
            return struct.pack('!H', self.value)
        elif self.type == Type.U32:
            return struct.pack('!I', self.value)
        elif self.type == Type.U64:
            return struct.pack('!Q', self.value)
        elif self.type == Type.STRING:
            # The prefix counts encoded bytes, not characters: the two
            # differ as soon as the string contains non-ASCII text.
            encoded = self.value.encode('utf-8')
            if len(encoded) > 255:
                raise ValueError(
                    "String param is longer than 255 bytes when encoded")
            return bytes([len(encoded)]) + encoded
        elif self.type == Type.ALARM:
            return self.value.serialize()
        else:
            raise RuntimeError("Invalid param type")
@enum.unique
class Instruction(enum.Enum):
    """Instruction code that identifies a command."""

    # Clock information and time
    GET_VERSION = 0
    SET_TIME = 1

    # Alarm management
    LIST_ALARMS = 2
    ADD_ALARM = 3
    REMOVE_ALARM = 4

    # Settings management
    LIST_SETTINGS = 5
    SET_SETTING = 6
class CommandFailed(Exception):
    """Raised when a command's response status is not OK."""
class Command(abc.ABC):
    """Base class for a command: an instruction plus its params.

    Subclasses implement ``proc_response`` to turn the params of a
    successful response into the value returned by ``run``.
    """

    def __init__(self, instruction, *params):
        self.instruction = instruction
        self.params = params

    def serialize(self):
        """Return the instruction byte followed by each serialized param."""
        param_bytes = [param.serialize() for param in self.params]
        # The instruction is an enum member; its integer value is sent.
        return bytes([self.instruction.value]) + b''.join(param_bytes)

    def run(self, connection):
        """Run this command over ``connection`` and return its result.

        Raises CommandFailed, carrying the response status, when the
        status is anything other than ``Status.OK``.
        """
        response = connection.run(self)
        if response.status != Status.OK:
            raise CommandFailed(response.status)
        return self.proc_response(response.params)

    @abc.abstractmethod
    def proc_response(self, params):
        """Convert the response params into this command's result."""
class GetVersion(Command):
    """Command requesting the clock's version."""

    def __init__(self):
        super().__init__(Instruction.GET_VERSION)

    def proc_response(self, params):
        """Return the values of the first two params as a pair."""
        major, minor = params[0], params[1]
        return (major.value, minor.value)
class SetTime(Command):
    """Command that sets the clock's time from a timestamp."""

    def __init__(self, timestamp):
        super().__init__(Instruction.SET_TIME, Param(Type.U64, timestamp))

    def proc_response(self, params):
        """Return None; any response params are ignored.

        Without this override the class would stay abstract and could
        not be instantiated.
        """
        return None
class ListAlarms(Command):
    """Command requesting the alarms stored on the clock."""

    def __init__(self):
        super().__init__(Instruction.LIST_ALARMS)

    def proc_response(self, params):
        """Return a dict built from consecutive (key, value) param pairs.

        A trailing param without a partner is dropped.
        """
        # Zipping one iterator with itself yields consecutive pairs.
        stream = iter(params)
        alarms = {}
        for key, value in zip(stream, stream):
            alarms[key.value] = value.value
        return alarms
class AddAlarm(Command):
    """Command that adds an alarm for a time on a set of weekdays."""

    def __init__(self, hour, minute, days):
        alarm_param = Param(Type.ALARM, Alarm(hour, minute, days))
        super().__init__(Instruction.ADD_ALARM, alarm_param)

    def proc_response(self, params):
        """Return the value of the first response param."""
        first = params[0]
        return first.value
class RemoveAlarm(Command):
    """Command that removes the alarm with the given ID."""

    def __init__(self, alarm_id):
        super().__init__(Instruction.REMOVE_ALARM, Param(Type.U8, alarm_id))

    def proc_response(self, params):
        """Return None; any response params are ignored.

        Without this override the class would stay abstract and could
        not be instantiated.
        """
        return None
class ListSettings(Command):
    """Command requesting the settings stored on the clock."""

    def __init__(self):
        super().__init__(Instruction.LIST_SETTINGS)

    def proc_response(self, params):
        """Return a dict built from consecutive (key, value) param pairs.

        A trailing param without a partner is dropped.
        """
        # Zipping one iterator with itself yields consecutive pairs.
        stream = iter(params)
        settings = {}
        for key, value in zip(stream, stream):
            settings[key.value] = value.value
        return settings
class SetSetting(Command):
    """Command that sets one setting; key and value are sent as strings."""

    def __init__(self, key, value):
        key_param = Param(Type.STRING, key)
        value_param = Param(Type.STRING, value)
        super().__init__(Instruction.SET_SETTING, key_param, value_param)

    def proc_response(self, params):
        """Return None; any response params are ignored.

        Without this override the class would stay abstract and could
        not be instantiated.
        """
        return None
class Status(enum.Enum):
    """Status code carried by a response."""

    OK = 0
    BUSY = 1
    INVALID_COMMAND = 2
    INTERNAL_ERROR = 3
    OUT_OF_SPACE = 4

    @staticmethod
    def description(status):
        """Return a human-readable description of ``status``.

        Raises RuntimeError when ``status`` is not a ``Status`` member.
        """
        # Members must be qualified: inside a method the bare names
        # (OK, BUSY, ...) are not in scope.
        descriptions = {
            Status.OK: "OK",
            Status.BUSY: "Busy",
            Status.INVALID_COMMAND: "Invalid command",
            Status.INTERNAL_ERROR: "Internal error",
            Status.OUT_OF_SPACE: "Device out of space",
        }
        try:
            return descriptions[status]
        except (KeyError, TypeError):
            # TypeError covers unhashable arguments.
            raise RuntimeError("Invalid status") from None
class Response:
    """The outcome of a command: a status and a list of params."""

    def __init__(self, status, params):
        self.status, self.params = status, params
class Connection:
    """Stream-socket connection over which commands are run.

    Can be used as a context manager, in which case the socket is
    closed on exit.
    """

    PORT = 3498

    def __init__(self, host):
        self.socket = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
        try:
            self.socket.connect((host, self.PORT))
        except BaseException:
            # Don't leak the socket when the connection attempt fails.
            self.socket.close()
            raise

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False

    def run(self, command):
        """Send ``command`` and return the Response that comes back."""
        self._send(command)
        return self._receive_response()

    def close(self):
        """Close the underlying socket."""
        self.socket.close()

    def _send(self, command):
        """Send every byte of the serialized command.

        Raises RuntimeError if the socket stops accepting data.
        """
        command_bytes = command.serialize()
        total_sent = 0
        while total_sent < len(command_bytes):
            sent = self.socket.send(command_bytes[total_sent:])
            if sent == 0:
                raise RuntimeError("Connection broken")
            total_sent += sent

    def _receive_response(self):
        """Read a status byte, a param count byte, then that many params.

        Raises RuntimeError when the status byte is not a known status.
        """
        status_byte, param_count = self._receive(2)
        # The wire carries a raw byte; callers compare against Status
        # members, so it has to be converted here.
        try:
            status = Status(status_byte)
        except ValueError as err:
            raise RuntimeError("Invalid status") from err
        params = [self._receive_param() for _ in range(param_count)]
        return Response(status, params)

    def _receive_param(self):
        """Read one param: a type byte followed by its value.

        A string value is preceded by a one-byte length. Raises
        RuntimeError when the type byte is not a known type.
        """
        # _receive returns bytes; index to get the integer byte values.
        type_byte = self._receive(1)[0]
        try:
            param_type = Type(type_byte)
        except ValueError as err:
            raise RuntimeError("Invalid param type") from err
        if param_type == Type.STRING:
            length = self._receive(1)[0]
            value = self._receive(length).decode('utf-8')
            return Param(param_type, value)
        value_bytes = self._receive(Type.length(param_type))
        return Param.deserialize(param_type, value_bytes)

    def _receive(self, n):
        """Read exactly ``n`` bytes from the socket.

        Raises RuntimeError if the peer closes the connection first.
        """
        chunks = []
        remaining = n
        while remaining > 0:
            chunk = self.socket.recv(remaining)
            if len(chunk) == 0:
                raise RuntimeError("Connection broken")
            remaining -= len(chunk)
            chunks.append(chunk)
        return b''.join(chunks)
def version_subcommand(connection, args):
    """Fetch the clock's version and print it as ``major.minor``."""
    version = GetVersion().run(connection)
    major, minor = version
    print(f"{major}.{minor}")
def sync_subcommand(connection, args):
    """Set the clock's time to this machine's current time."""
    # time.time() is a float; it is truncated to whole seconds.
    now = int(time.time())
    command = SetTime(now)
    command.run(connection)
def alarms_subcommand(connection, args):
    """List alarms, add an alarm, or remove an alarm.

    ``args.action`` selects the behaviour: None lists the alarms,
    "add" uses ``args.time`` and ``args.dayspec``, and "rm" uses
    ``args.id``. Raises ValueError for a malformed time or dayspec.
    """
    def read_dayspec(dayspec):
        """Parse a 7-character string of 'x'/'-' into a set of weekdays."""
        if len(dayspec) != 7 or set(dayspec) - {'x', '-'}:
            raise ValueError(
                f"Invalid dayspec {dayspec!r}: expected exactly seven "
                "characters, each 'x' or '-'")
        return {Weekday(i) for i, flag in enumerate(dayspec) if flag == 'x'}

    def format_dayspec(days):
        """Render a set of weekdays as a 7-character 'x'/'-' string."""
        return "".join('x' if Weekday(i) in days else '-' for i in range(7))

    def read_time(text):
        """Parse "HH:MM" into an (hour, minute) pair of integers."""
        try:
            hour_text, minute_text = text.split(':')
            hour, minute = int(hour_text), int(minute_text)
        except ValueError:
            raise ValueError(
                f"Invalid time {text!r}: expected HH:MM") from None
        if not (0 <= hour <= 23 and 0 <= minute <= 59):
            raise ValueError(f"Invalid time {text!r}: out of range")
        return hour, minute

    if args.action is None:
        alarms = ListAlarms().run(connection)
        for alarm_id, alarm in alarms.items():
            dayspec = format_dayspec(alarm.days)
            print(f"[{alarm_id:2}] {alarm.hour:02}:{alarm.minute:02} {dayspec}")
    elif args.action == "add":
        hour, minute = read_time(args.time)
        days = read_dayspec(args.dayspec)
        alarm_id = AddAlarm(hour, minute, days).run(connection)
        print(f"{alarm_id}")
    elif args.action == "rm":
        RemoveAlarm(int(args.id)).run(connection)
def settings_subcommand(connection, args):
    """List all settings, or set one.

    ``args.action`` selects the behaviour: None prints every setting as
    a key/value line, and "set" stores ``args.value`` under ``args.key``.
    """
    if args.action is None:
        settings = ListSettings().run(connection)
        for key, value in settings.items():
            print(f"{key:15} {value}")
    elif args.action == "set":
        SetSetting(args.key, args.value).run(connection)
if __name__ == "__main__":
    # Command-line entry point: parse arguments, connect to the clock
    # and dispatch to the chosen subcommand.
    parser = argparse.ArgumentParser(
        description="Control a bedside clock remotely")
    parser.add_argument(
        "--host", help="The hostname/IP of the clock",
        default="bedside-clock")
    subparsers = parser.add_subparsers(
        help="Action to perform", dest="subcommand", required=True)

    version_parser = subparsers.add_parser(
        "version", help="Get the version of the clock",
        description="Get the version of the clock")
    sync_parser = subparsers.add_parser(
        "sync", help="Synchronise the clock to this machine",
        description="Synchronise the clock to this machine")
    alarms_parser = subparsers.add_parser(
        "alarms", help="List and manage alarms",
        description="List and manage alarms")
    settings_parser = subparsers.add_parser(
        "settings", help="List and manage settings",
        description="List and manage settings")

    alarms_subparsers = alarms_parser.add_subparsers(dest="action")
    alarms_add_parser = alarms_subparsers.add_parser(
        "add", help="Add an alarm", description="Add an alarm")
    alarms_rm_parser = alarms_subparsers.add_parser(
        "rm", help="Remove an alarm", description="Remove an alarm")
    # The shorthand flags get their own dest. If they shared "dayspec"
    # with the positional, whichever argparse processed last would
    # silently overwrite the other.
    alarms_add_parser.add_argument(
        "--workdays", const="xxxxx--", dest="day_preset",
        action="store_const")
    alarms_add_parser.add_argument(
        "--weekends", const="-----xx", dest="day_preset",
        action="store_const")
    alarms_add_parser.add_argument(
        "time", help="The time for the alarm (HH:MM)")
    # nargs="?" makes the positional optional; without it argparse
    # requires the argument and never applies a default.
    alarms_add_parser.add_argument(
        "dayspec", nargs="?", default=None,
        help="Days for the alarm to be active on. "
             "Should be a sequence of 'x's and '-'s. For example, "
             "\"x-x----\" means Mondays and Wednesdays only.")
    alarms_rm_parser.add_argument(
        "id", help="ID of the alarm to remove")

    settings_subparsers = settings_parser.add_subparsers(dest="action")
    settings_set_parser = settings_subparsers.add_parser(
        "set", help="Set a setting", description="Set a setting")
    settings_set_parser.add_argument(
        "key", help="Key of the setting to set")
    settings_set_parser.add_argument(
        "value", help="Value of the setting to set")

    args = parser.parse_args()

    # Resolve the dayspec: an explicit one wins, then a shorthand flag,
    # then every day of the week.
    if args.subcommand == "alarms" and args.action == "add":
        if args.dayspec is None:
            args.dayspec = args.day_preset or "xxxxxxx"

    subcommand_map = {
        "version": version_subcommand,
        "sync": sync_subcommand,
        "alarms": alarms_subcommand,
        "settings": settings_subcommand,
    }
    connection = Connection(args.host)
    try:
        subcommand_map[args.subcommand](connection, args)
    except CommandFailed as err:
        detail = str(err)
        message = f"Command failed: {detail}" if detail else "Command failed"
        print(message, file=sys.stderr)
        sys.exit(1)
    finally:
        # Close the socket even when the subcommand raises or exits.
        connection.close()