# SPDX-License-Identifier: AGPL-3.0-only # Copyright (c) Camden Dixie O'Brien import abc import argparse import enum import socket import struct import sys import time class Weekday: MONDAY = 0 TUESDAY = 1 WEDNESDAY = 2 THURSDAY = 3 FRIDAY = 4 SATURDAY = 5 SUNDAY = 6 class Alarm: SERIALIZED_LENGTH = 3 @staticmethod def deserialize(alarm_bytes): if len(alarm_bytes) != Alarm.SERIALIZED_LENGTH: raise RuntimeError("Wrong number of bytes for alarm") hour = alarm_bytes[0] minute = alarm_bytes[1] days = Alarm._read_day_bitmask(alarm_bytes[2]) return Alarm(hour, minute, days) @staticmethod def _read_day_bitmask(bitmask): days = set() for i in range(7): if (bitmask >> i & 1) != 0: days.add(Weekday(i)) return days @staticmethod def _make_day_bitmask(days): bitmask = 0 for i in range(7): if Weekday(i) in days: bitmask |= 1 << i return bitmask def __init__(self, hour, minute, days): self.hour = hour self.minute = minute self.days = days def serialize(self): day_bitmask = Alarm._make_day_bitmask(self.days) return bytes([self.hour, self.minute, day_bitmask]) class Type(enum.Enum): U8 = 0 U16 = 1 U32 = 2 U64 = 3 STRING = 4 ALARM = 5 @staticmethod def length(type): if type == Type.U8: return 1 elif type == Type.U16: return 2 elif type == Type.U32: return 4 elif type == Type.U64: return 8 elif type == Type.STRING: return None elif type == Type.ALARM: return Alarm.SERIALIZED_LENGTH else: raise RuntimeError("Invalid param type") class Param: @staticmethod def deserialize(self, type, value_bytes): return Param(type, Param._deserialize_value(type, value_bytes)) @staticmethod def _deserialize_value(self, type, value_bytes): if type == Type.U8: return value_bytes[0] elif type == Type.U16: return struct.unpack('!H', value_bytes)[0] elif type == Type.U32: return struct.pack('!I', value_bytes)[0] elif type == Type.U64: return struct.pack('!Q', value_bytes)[0] elif type == Type.STRING: # Strings handled separately due to variable length raise NotImplemented elif type == Type.ALARM: return Alarm.deserialize(value_bytes) else: raise RuntimeError("Invalid param type") def __init__(self, type, value): self.type = type self.value = value def serialize(self): if self.type == Type.U8: return bytes([self.value]) elif self.type == Type.U16: return struct.pack('!H', self.value) elif self.type == Type.U32: return struct.pack('!I', self.value) elif self.type == Type.U64: return struct.pack('!Q', self.value) elif self.type == Type.STRING: return bytes([len(self.value)]) + self.value.encode('utf-8') elif self.type == Type.ALARM: return self.value.serialize() else: raise RuntimeError("Invalid param type") class Instruction(enum.Enum): GET_VERSION = 0 SET_TIME = 1 LIST_ALARMS = 2 ADD_ALARM = 3 REMOVE_ALARM = 4 LIST_SETTINGS = 5 SET_SETTING = 6 class CommandFailed(Exception): pass class Command(abc.ABC): def __init__(self, instruction, *params): self.instruction = instruction self.params = params def serialize(self): param_bytes = [param.serialize() for param in self.params] return bytes([instruction]) + b''.join(param_bytes) def run(self, connection): response = connection.run(self) if response.status != Status.OK: raise CommandFailed() return self.proc_response(response.params) @abc.abstractmethod def proc_response(self, params): pass class GetVersion(Command): def __init__(self): super().__init__(Instruction.GET_VERSION) def proc_response(self, params): return (params[0].value, params[1].value) class SetTime(Command): def __init__(self, timestamp): super().__init__(Instruction.SET_TIME, Param(Type.U64, timestamp)) class ListAlarms(Command): def __init__(self): super().__init__(Instruction.LIST_ALARMS) def proc_response(self, params): pairs = zip(params[::2], params[1::2]) return {key.value: value.value for key, value in pairs} class AddAlarm(Command): def __init__(self, hour, minute, days): alarm = Alarm(hour, minute, days) super().__init__(Instruction.ADD_ALARM, Param(Type.ALARM, alarm)) def proc_response(self, params): return params[0].value class RemoveAlarm(Command): def __init__(self, alarm_id): super().__init__(Instruction.REMOVE_ALARM, Param(Type.U8, alarm_id)) class ListSettings(Command): def __init__(self): super().__init__(Instruction.LIST_SETTINGS) def proc_response(self, params): pairs = zip(params[::2], params[1::2]) return {key.value: value.value for key, value in pairs} class SetSetting(Command): def __init__(self, key, value): key_param = Param(Type.STRING, key) value_param = Param(Type.STRING, value) super().__init__(Instruction.SET_SETTING, key_param, value_param) class Status(enum.Enum): OK = 0 BUSY = 1 INVALID_COMMAND = 2 INTERNAL_ERROR = 3 OUT_OF_SPACE = 4 @staticmethod def description(status): if status == OK: return "OK" elif status == BUSY: return "Busy" elif status == INVALID_COMMAND: return "Invalid command" elif status == INTERNAL_ERROR: return "Internal error" elif status == OUT_OF_SPACE: return "Device out of space" else: raise RuntimeError("Invalid status") class Response: def __init__(self, status, params): self.status = status self.params = params class Connection: PORT = 3498 def __init__(self, host): self.socket = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) self.socket.connect((host, self.PORT)) def run(self, command): self._send(command) return self._receive_response() def close(self): self.socket.close() def _send(self, command): command_bytes = command.serialize() total_sent = 0 while total_sent < len(command_bytes): sent = self.socket.send(command_bytes[total_sent:]) if sent == 0: raise RuntimeError("Connection broken") total_sent += sent def _receive_response(self): status, param_count = self._receive(2) params = [self._receive_param() for _ in range(param_count)] return Response(status, params) def _receive_param(self): type = self._receive(1) if type == Type.STRING: length = self._receive(1) value = self._receive(length).decode('utf-8') return Param(type, value) else: value_bytes = self._receive(Type.length(type)) return Param.deserialize(type, value_bytes) def _receive(self, n): chunks = [] remaining = n while remaining > 0: chunk = self.socket.recv(remaining) if len(chunk) == 0: raise RuntimeError("Connection broken") remaining -= len(chunk) chunks.append(chunk) return b''.join(chunks) def version_subcommand(connection, args): major, minor = GetVersion().run(connection) print(f"{major}.{minor}") def sync_subcommand(connection, args): timestamp = int(time.time()) SetTime(timestamp).run(connection) def alarms_subcommand(connection, args): def read_dayspec(dayspec): days = set() for i in range(7): if dayspec[i] == 'x': days.add(Weekday(i)) return days def format_dayspec(days): dayspec = "" for i in range(7): dayspec += 'x' if Weekday(i) in days else '-' return dayspec if args.action == None: alarms = ListAlarms().run(connection) for id, alarm in alarms.items(): dayspec = format_dayspec(alarm.days) print(f"[{id:2}] {alarm.hour:02}:{alarm.minute:02} {dayspec}") elif args.action == "add": hour, minute = args.time.split(':') days = read_dayspec(args.dayspec) id = AddAlarm(int(hour), int(minute), days).run(connection) print(f"{id}") elif args.action == "rm": RemoveAlarm(int(args.id)).run(connection) def settings_subcommand(connection, args): if args.action == None: settings = ListSettings().run(connection) for key, value in settings.items(): print(f"{key:15} {value}") elif args.action == "set": SetSetting(args.key, args.value).run(connection) if __name__ == "__main__": parser = argparse.ArgumentParser( description="Control a bedside clock remotely") parser.add_argument( "--host", help="The hostname/IP of the clock", default="bedside-clock") subparsers = parser.add_subparsers( help="Action to perform", dest="subcommand", required=True) version_parser = subparsers.add_parser( "version", help="Get the version of the clock", description = "Get the version of the clock") sync_parser = subparsers.add_parser( "sync", help="Syncronise the clock to this machine", description="Syncronise the clock to this machine") alarms_parser = subparsers.add_parser( "alarms", help="List and manage alarms", description="List and manage alarms") settings_parser = subparsers.add_parser( "settings", help="List and manage settings", description="List and manage settings") alarms_subparsers = alarms_parser.add_subparsers(dest="action") alarms_add_parser = alarms_subparsers.add_parser( "add", help="Add an alarm", description="Add an alarm") alarms_rm_parser = alarms_subparsers.add_parser( "rm", help="Remove an alarm", description="Remove an alarm") alarms_add_parser.add_argument( "--workdays", const="xxxxx--", dest="dayspec", action="store_const") alarms_add_parser.add_argument( "--weekends", const="-----xx", dest="dayspec", action="store_const") alarms_add_parser.add_argument( "time", help="The time for the alarm (HH:MM)") alarms_add_parser.add_argument( "dayspec", help="""Days for the alarm to be active on. Should be a sequence of 'x's and '-'s. For example, \"x-x----\" means Mondays and Tuesdays only.""", default="xxxxxxx") alarms_rm_parser.add_argument( "id", help="ID of the alarm to remove") settings_subparsers = settings_parser.add_subparsers(dest="action") settings_set_parser = settings_subparsers.add_parser( "set", help="Set a setting", description="Set a setting") settings_set_parser.add_argument( "key", help="Key of the setting to set") settings_set_parser.add_argument( "value", help="Value of the setting to set") args = parser.parse_args() subcommand_map = { "version": version_subcommand, "sync": sync_subcommand, "alarms": alarms_subcommand, "settings": settings_subcommand, } connection = Connection(args.host) subcommand_map[args.subcommand](connection, args) connection.close()