diff --git a/client/src/connection.vala b/client/src/connection.vala index d0e27fd..c084944 100644 --- a/client/src/connection.vala +++ b/client/src/connection.vala @@ -1,33 +1,221 @@ namespace StudySystemClient { public class Connection { - public signal void response_received(uint8[] response); + public signal void received(uint8[] msg); - private TlsClientConnection tls_client; + private SessionManager session_manager; + private Worker worker; public Connection(string cert_dir) throws Error { var loopback = new InetAddress.loopback(SocketFamily.IPV6); - var host = new InetSocketAddress(loopback, 12888); - - var db_type = TlsBackend.get_default().get_file_database_type(); - var ca_path = cert_dir + "/ca.pem"; - var db = Object.new(db_type, "anchors", ca_path) as TlsDatabase; - var cert_path = cert_dir + "/client.pem"; - var cert = new TlsCertificate.from_file(cert_path); - - var plain_client = new SocketClient(); - var plain_connection = plain_client.connect(host); - tls_client = TlsClientConnection.new(plain_connection, host); - - tls_client.set_database(db); - tls_client.set_certificate(cert); - tls_client.handshake(); + var session_factory + = new SessionFactory(loopback, 12888, cert_dir); + session_manager = new SessionManager( + session_factory, (msg) => { + var msg_copy = new uint8[msg.length]; + Memory.copy(msg_copy, msg, msg.length); + Idle.add(() => { + received(msg_copy); + return false; + }, GLib.Priority.DEFAULT_IDLE); + }); + worker = new Worker(session_manager); } - public async void send(uint8[] message) throws Error { - yield tls_client.output_stream.write_async(message); - var response = new uint8[1024]; - var len = yield tls_client.input_stream.read_async(response); - response_received(response[0:len]); + public void send(owned uint8[] msg) { + session_manager.send(msg); + } + } + + private class Worker { + private uint TASK_PERIOD_MS = 10; + + private SessionManager session_manager; + private bool exit; + private Thread thread; + + public Worker(SessionManager session_manager) { + this.session_manager = session_manager; + exit = false; + thread = new Thread("connection_worker", body); + } + + ~Worker() { + exit = true; + thread.join(); + } + + private void body() { + while (!exit) { + session_manager.task(); + Thread.usleep(1000 * TASK_PERIOD_MS); + } + } + + } + + private class SessionManager { + public delegate void ReceiveCallback(uint8[] msg); + + private const uint INIT_RECONNECT_WAIT_MS = 500; + private const uint MAX_RECONNECT_WAIT_MS = 60000; + private const double RECONNECT_BACKOFF = 1.6; + + private SessionFactory session_factory; + private ReceiveCallback receive_callback; + private Session? session; + private AsyncQueue queue; + private uint reconnect_wait_ms; + + public SessionManager(SessionFactory session_factory, + owned ReceiveCallback receive_callback) { + this.session_factory = session_factory; + this.receive_callback = (owned) receive_callback; + this.session = null; + queue = new AsyncQueue(); + reconnect_wait_ms = INIT_RECONNECT_WAIT_MS; + } + + public void send(uint8[] msg) { + queue.push(new OutgoingMessage(msg)); + } + + public void task() { + if (session != null) { + var failed_msg = session.task(queue); + if (failed_msg != null) + handle_failed_msg(failed_msg); + } else { + try_start_session(); + } + } + + private void handle_failed_msg(OutgoingMessage msg) { + msg.has_failed(); + if (msg.should_retry()) + queue.push(msg); + session = null; + } + + private void try_start_session() { + try { + session = session_factory.start_session(); + session.received.connect( + (msg) => receive_callback(msg)); + reconnect_wait_ms = INIT_RECONNECT_WAIT_MS; + } catch (Error _) { + Thread.usleep(1000 * reconnect_wait_ms); + update_reconnect_wait(); + } + } + + private void update_reconnect_wait() { + var new_wait = RECONNECT_BACKOFF * reconnect_wait_ms; + if (new_wait < MAX_RECONNECT_WAIT_MS) + reconnect_wait_ms = (uint)new_wait; + else + reconnect_wait_ms = MAX_RECONNECT_WAIT_MS; + } + } + + private class SessionFactory { + private const string CA_FILENAME = "/ca.pem"; + private const string CERT_FILENAME = "/client.pem"; + private const uint TIMEOUT_S = 1; + + private InetSocketAddress host; + private TlsCertificate cert; + private TlsDatabase ca_db; + + public SessionFactory(InetAddress host_addr, uint16 host_port, + string cert_dir) throws Error { + host = new InetSocketAddress(host_addr, host_port); + var cert_path = cert_dir + CERT_FILENAME; + cert = new TlsCertificate.from_file(cert_path); + var ca_path = cert_dir + CA_FILENAME; + var db_type = TlsBackend.get_default().get_file_database_type(); + ca_db = Object.new(db_type, "anchors", ca_path) as TlsDatabase; + } + + public Session start_session() throws Error { + var plain_client = new SocketClient(); + plain_client.set_timeout(TIMEOUT_S); + var plain_connection = plain_client.connect(host); + var connection = TlsClientConnection.new(plain_connection, host); + connection.set_database(ca_db); + connection.set_certificate(cert); + connection.handshake(); + return new Session(connection); + } + } + + private class Session { + public signal void received(uint8[] msg); + + private const uint MAX_BATCH_SIZE = 10; + private const uint MAX_MSG_LEN = 1024; + + private TlsClientConnection connection; + + public Session(TlsClientConnection connection) { + this.connection = connection; + } + + public OutgoingMessage? task(AsyncQueue queue) { + for (int i = 0; i < MAX_BATCH_SIZE; ++i) { + if (queue.length() == 0) + break; + var msg = queue.pop(); + var success = true; + success &= send(msg); + success &= receive(); + if (!success) + return msg; + } + return null; + } + + private bool send(OutgoingMessage msg) { + try { + size_t written; + connection.output_stream.write_all(msg.content, out written); + return true; + } catch (IOError _) { + return false; + } + } + + private bool receive() { + try { + var buffer = new uint8[MAX_MSG_LEN]; + var len = connection.input_stream.read(buffer); + if (len <= 0) + return false; + received(buffer[0:len]); + return true; + } catch (IOError _) { + return false; + } + } + } + + private class OutgoingMessage { + public uint8[] content { get; private set; } + + private const uint MAX_FAIL_COUNT = 10; + + private uint fail_count; + + public OutgoingMessage(owned uint8[] content) { + this.content = (owned)content; + fail_count = 0; + } + + public void has_failed() { + ++fail_count; + } + + public bool should_retry() { + return fail_count < MAX_FAIL_COUNT; } } } diff --git a/client/src/main_window.vala b/client/src/main_window.vala index bfd147c..02064db 100644 --- a/client/src/main_window.vala +++ b/client/src/main_window.vala @@ -13,8 +13,8 @@ namespace StudySystemClient { default_height = 580; this.connection = connection; - connection.response_received.connect((response) => { - response_label.label = "Response: " + (string)response; + connection.received.connect((msg) => { + response_label.label = "Response: " + (string)msg; }); var box = new Gtk.Box(Gtk.Orientation.VERTICAL, 10); @@ -24,7 +24,7 @@ namespace StudySystemClient { box.margin_bottom = 10; send_button = new Gtk.Button.with_label("Send"); - send_button.clicked.connect(on_send_clicked); + send_button.clicked.connect(() => connection.send("Foo".data)); box.append(send_button); response_label = new Gtk.Label(""); @@ -35,13 +35,5 @@ namespace StudySystemClient { present(); } - - private async void on_send_clicked() { - try { - yield connection.send("Foo".data); - } catch (Error e) { - response_label.label = "Error: " + e.message; - } - } } }