From 8fdc409a397333f1e003dc72259af3974c0ae1c6 Mon Sep 17 00:00:00 2001 From: Camden Dixie O'Brien Date: Sun, 21 May 2023 20:53:57 +0100 Subject: [PATCH] Create Python remote CLI --- remote-cli/bclk.py | 407 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 407 insertions(+) create mode 100644 remote-cli/bclk.py diff --git a/remote-cli/bclk.py b/remote-cli/bclk.py new file mode 100644 index 0000000..5bcc262 --- /dev/null +++ b/remote-cli/bclk.py @@ -0,0 +1,407 @@ +# SPDX-License-Identifier: AGPL-3.0-only +# Copyright (c) Camden Dixie O'Brien + +import abc +import argparse +import enum +import socket +import struct +import sys +import time + + +class Weekday: + MONDAY = 0 + TUESDAY = 1 + WEDNESDAY = 2 + THURSDAY = 3 + FRIDAY = 4 + SATURDAY = 5 + SUNDAY = 6 + + +class Alarm: + SERIALIZED_LENGTH = 3 + + @staticmethod + def deserialize(alarm_bytes): + if len(alarm_bytes) != Alarm.SERIALIZED_LENGTH: + raise RuntimeError("Wrong number of bytes for alarm") + hour = alarm_bytes[0] + minute = alarm_bytes[1] + days = Alarm._read_day_bitmask(alarm_bytes[2]) + return Alarm(hour, minute, days) + + @staticmethod + def _read_day_bitmask(bitmask): + days = set() + for i in range(7): + if (bitmask >> i & 1) != 0: + days.add(Weekday(i)) + return days + + @staticmethod + def _make_day_bitmask(days): + bitmask = 0 + for i in range(7): + if Weekday(i) in days: + bitmask |= 1 << i + return bitmask + + def __init__(self, hour, minute, days): + self.hour = hour + self.minute = minute + self.days = days + + def serialize(self): + day_bitmask = Alarm._make_day_bitmask(self.days) + return bytes([self.hour, self.minute, day_bitmask]) + + +class Type(enum.Enum): + U8 = 0 + U16 = 1 + U32 = 2 + U64 = 3 + STRING = 4 + ALARM = 5 + + @staticmethod + def length(type): + if type == Type.U8: + return 1 + elif type == Type.U16: + return 2 + elif type == Type.U32: + return 4 + elif type == Type.U64: + return 8 + elif type == Type.STRING: + return None + elif type == Type.ALARM: + return Alarm.SERIALIZED_LENGTH + else: + raise RuntimeError("Invalid param type") + + +class Param: + @staticmethod + def deserialize(self, type, value_bytes): + return Param(type, Param._deserialize_value(type, value_bytes)) + + @staticmethod + def _deserialize_value(self, type, value_bytes): + if type == Type.U8: + return value_bytes[0] + elif type == Type.U16: + return struct.unpack('!H', value_bytes)[0] + elif type == Type.U32: + return struct.pack('!I', value_bytes)[0] + elif type == Type.U64: + return struct.pack('!Q', value_bytes)[0] + elif type == Type.STRING: + # Strings handled separately due to variable length + raise NotImplemented + elif type == Type.ALARM: + return Alarm.deserialize(value_bytes) + else: + raise RuntimeError("Invalid param type") + + def __init__(self, type, value): + self.type = type + self.value = value + + def serialize(self): + if self.type == Type.U8: + return bytes([self.value]) + elif self.type == Type.U16: + return struct.pack('!H', self.value) + elif self.type == Type.U32: + return struct.pack('!I', self.value) + elif self.type == Type.U64: + return struct.pack('!Q', self.value) + elif self.type == Type.STRING: + return bytes([len(self.value)]) + self.value.encode('utf-8') + elif self.type == Type.ALARM: + return self.value.serialize() + else: + raise RuntimeError("Invalid param type") + +class Instruction(enum.Enum): + GET_VERSION = 0 + SET_TIME = 1 + LIST_ALARMS = 2 + ADD_ALARM = 3 + REMOVE_ALARM = 4 + LIST_SETTINGS = 5 + SET_SETTING = 6 + + +class CommandFailed(Exception): + pass + + +class Command(abc.ABC): + def __init__(self, instruction, *params): + self.instruction = instruction + self.params = params + + def serialize(self): + param_bytes = [param.serialize() for param in self.params] + return bytes([instruction]) + b''.join(param_bytes) + + def run(self, connection): + response = connection.run(self) + if response.status != Status.OK: + raise CommandFailed() + return self.proc_response(response.params) + + @abc.abstractmethod + def proc_response(self, params): + pass + + +class GetVersion(Command): + def __init__(self): + super().__init__(Instruction.GET_VERSION) + + def proc_response(self, params): + return (params[0].value, params[1].value) + + +class SetTime(Command): + def __init__(self, timestamp): + super().__init__(Instruction.SET_TIME, Param(Type.U64, timestamp)) + + +class ListAlarms(Command): + def __init__(self): + super().__init__(Instruction.LIST_ALARMS) + + def proc_response(self, params): + pairs = zip(params[::2], params[1::2]) + return {key.value: value.value for key, value in pairs} + + +class AddAlarm(Command): + def __init__(self, hour, minute, days): + alarm = Alarm(hour, minute, days) + super().__init__(Instruction.ADD_ALARM, Param(Type.ALARM, alarm)) + + def proc_response(self, params): + return params[0].value + + +class RemoveAlarm(Command): + def __init__(self, alarm_id): + super().__init__(Instruction.REMOVE_ALARM, Param(Type.U8, alarm_id)) + + +class ListSettings(Command): + def __init__(self): + super().__init__(Instruction.LIST_SETTINGS) + + def proc_response(self, params): + pairs = zip(params[::2], params[1::2]) + return {key.value: value.value for key, value in pairs} + + +class SetSetting(Command): + def __init__(self, key, value): + key_param = Param(Type.STRING, key) + value_param = Param(Type.STRING, value) + super().__init__(Instruction.SET_SETTING, key_param, value_param) + + +class Status(enum.Enum): + OK = 0 + BUSY = 1 + INVALID_COMMAND = 2 + INTERNAL_ERROR = 3 + OUT_OF_SPACE = 4 + + @staticmethod + def description(status): + if status == OK: + return "OK" + elif status == BUSY: + return "Busy" + elif status == INVALID_COMMAND: + return "Invalid command" + elif status == INTERNAL_ERROR: + return "Internal error" + elif status == OUT_OF_SPACE: + return "Device out of space" + else: + raise RuntimeError("Invalid status") + + +class Response: + def __init__(self, status, params): + self.status = status + self.params = params + + +class Connection: + PORT = 3498 + + def __init__(self, host): + self.socket = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) + self.socket.connect((host, self.PORT)) + + def run(self, command): + self._send(command) + return self._receive_response() + + def close(self): + self.socket.close() + + def _send(self, command): + command_bytes = command.serialize() + total_sent = 0 + while total_sent < len(command_bytes): + sent = self.socket.send(command_bytes[total_sent:]) + if sent == 0: + raise RuntimeError("Connection broken") + total_sent += sent + + def _receive_response(self): + status, param_count = self._receive(2) + params = [self._receive_param() for _ in range(param_count)] + return Response(status, params) + + def _receive_param(self): + type = self._receive(1) + if type == Type.STRING: + length = self._receive(1) + value = self._receive(length).decode('utf-8') + return Param(type, value) + else: + value_bytes = self._receive(Type.length(type)) + return Param.deserialize(type, value_bytes) + + def _receive(self, n): + chunks = [] + remaining = n + while remaining > 0: + chunk = self.socket.recv(remaining) + if len(chunk) == 0: + raise RuntimeError("Connection broken") + remaining -= len(chunk) + chunks.append(chunk) + return b''.join(chunks) + + +def version_subcommand(connection, args): + major, minor = GetVersion().run(connection) + print(f"{major}.{minor}") + + +def sync_subcommand(connection, args): + timestamp = int(time.time()) + SetTime(timestamp).run(connection) + + +def alarms_subcommand(connection, args): + def read_dayspec(dayspec): + days = set() + for i in range(7): + if dayspec[i] == 'x': + days.add(Weekday(i)) + return days + + def format_dayspec(days): + dayspec = "" + for i in range(7): + dayspec += 'x' if Weekday(i) in days else '-' + return dayspec + + if args.action == None: + alarms = ListAlarms().run(connection) + for id, alarm in alarms.items(): + dayspec = format_dayspec(alarm.days) + print(f"[{id:2}] {alarm.hour:02}:{alarm.minute:02} {dayspec}") + + elif args.action == "add": + hour, minute = args.time.split(':') + days = read_dayspec(args.dayspec) + id = AddAlarm(int(hour), int(minute), days).run(connection) + print(f"{id}") + + elif args.action == "rm": + RemoveAlarm(int(args.id)).run(connection) + + +def settings_subcommand(connection, args): + if args.action == None: + settings = ListSettings().run(connection) + for key, value in settings.items(): + print(f"{key:15} {value}") + elif args.action == "set": + SetSetting(args.key, args.value).run(connection) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Control a bedside clock remotely") + parser.add_argument( + "--host", help="The hostname/IP of the clock", + default="bedside-clock") + subparsers = parser.add_subparsers( + help="Action to perform", dest="subcommand", required=True) + + version_parser = subparsers.add_parser( + "version", help="Get the version of the clock", + description = "Get the version of the clock") + sync_parser = subparsers.add_parser( + "sync", help="Syncronise the clock to this machine", + description="Syncronise the clock to this machine") + alarms_parser = subparsers.add_parser( + "alarms", help="List and manage alarms", + description="List and manage alarms") + settings_parser = subparsers.add_parser( + "settings", help="List and manage settings", + description="List and manage settings") + + alarms_subparsers = alarms_parser.add_subparsers(dest="action") + alarms_add_parser = alarms_subparsers.add_parser( + "add", help="Add an alarm", description="Add an alarm") + alarms_rm_parser = alarms_subparsers.add_parser( + "rm", help="Remove an alarm", description="Remove an alarm") + + alarms_add_parser.add_argument( + "--workdays", const="xxxxx--", dest="dayspec", action="store_const") + alarms_add_parser.add_argument( + "--weekends", const="-----xx", dest="dayspec", action="store_const") + alarms_add_parser.add_argument( + "time", help="The time for the alarm (HH:MM)") + alarms_add_parser.add_argument( + "dayspec", help="""Days for the alarm to be active on. + + Should be a sequence of 'x's and '-'s. For example, + \"x-x----\" means Mondays and Tuesdays only.""", + default="xxxxxxx") + + alarms_rm_parser.add_argument( + "id", help="ID of the alarm to remove") + + settings_subparsers = settings_parser.add_subparsers(dest="action") + settings_set_parser = settings_subparsers.add_parser( + "set", help="Set a setting", description="Set a setting") + + settings_set_parser.add_argument( + "key", help="Key of the setting to set") + settings_set_parser.add_argument( + "value", help="Value of the setting to set") + + args = parser.parse_args() + + subcommand_map = { + "version": version_subcommand, + "sync": sync_subcommand, + "alarms": alarms_subcommand, + "settings": settings_subcommand, + } + connection = Connection(args.host) + subcommand_map[args.subcommand](connection, args) + connection.close()