Implement connection recovery procedures in client
This commit is contained in:
parent
53857bc613
commit
b522ef8a98
@ -1,33 +1,221 @@
|
||||
namespace StudySystemClient {
|
||||
public class Connection {
|
||||
public signal void response_received(uint8[] response);
|
||||
public signal void received(uint8[] msg);
|
||||
|
||||
private TlsClientConnection tls_client;
|
||||
private SessionManager session_manager;
|
||||
private Worker worker;
|
||||
|
||||
public Connection(string cert_dir) throws Error {
|
||||
var loopback = new InetAddress.loopback(SocketFamily.IPV6);
|
||||
var host = new InetSocketAddress(loopback, 12888);
|
||||
var session_factory
|
||||
= new SessionFactory(loopback, 12888, cert_dir);
|
||||
session_manager = new SessionManager(
|
||||
session_factory, (msg) => {
|
||||
var msg_copy = new uint8[msg.length];
|
||||
Memory.copy(msg_copy, msg, msg.length);
|
||||
Idle.add(() => {
|
||||
received(msg_copy);
|
||||
return false;
|
||||
}, GLib.Priority.DEFAULT_IDLE);
|
||||
});
|
||||
worker = new Worker(session_manager);
|
||||
}
|
||||
|
||||
public void send(owned uint8[] msg) {
|
||||
session_manager.send(msg);
|
||||
}
|
||||
}
|
||||
|
||||
private class Worker {
|
||||
private uint TASK_PERIOD_MS = 10;
|
||||
|
||||
private SessionManager session_manager;
|
||||
private bool exit;
|
||||
private Thread<void> thread;
|
||||
|
||||
public Worker(SessionManager session_manager) {
|
||||
this.session_manager = session_manager;
|
||||
exit = false;
|
||||
thread = new Thread<void>("connection_worker", body);
|
||||
}
|
||||
|
||||
~Worker() {
|
||||
exit = true;
|
||||
thread.join();
|
||||
}
|
||||
|
||||
private void body() {
|
||||
while (!exit) {
|
||||
session_manager.task();
|
||||
Thread.usleep(1000 * TASK_PERIOD_MS);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private class SessionManager {
|
||||
public delegate void ReceiveCallback(uint8[] msg);
|
||||
|
||||
private const uint INIT_RECONNECT_WAIT_MS = 500;
|
||||
private const uint MAX_RECONNECT_WAIT_MS = 60000;
|
||||
private const double RECONNECT_BACKOFF = 1.6;
|
||||
|
||||
private SessionFactory session_factory;
|
||||
private ReceiveCallback receive_callback;
|
||||
private Session? session;
|
||||
private AsyncQueue<OutgoingMessage> queue;
|
||||
private uint reconnect_wait_ms;
|
||||
|
||||
public SessionManager(SessionFactory session_factory,
|
||||
owned ReceiveCallback receive_callback) {
|
||||
this.session_factory = session_factory;
|
||||
this.receive_callback = (owned) receive_callback;
|
||||
this.session = null;
|
||||
queue = new AsyncQueue<OutgoingMessage>();
|
||||
reconnect_wait_ms = INIT_RECONNECT_WAIT_MS;
|
||||
}
|
||||
|
||||
public void send(uint8[] msg) {
|
||||
queue.push(new OutgoingMessage(msg));
|
||||
}
|
||||
|
||||
public void task() {
|
||||
if (session != null) {
|
||||
var failed_msg = session.task(queue);
|
||||
if (failed_msg != null)
|
||||
handle_failed_msg(failed_msg);
|
||||
} else {
|
||||
try_start_session();
|
||||
}
|
||||
}
|
||||
|
||||
private void handle_failed_msg(OutgoingMessage msg) {
|
||||
msg.has_failed();
|
||||
if (msg.should_retry())
|
||||
queue.push(msg);
|
||||
session = null;
|
||||
}
|
||||
|
||||
private void try_start_session() {
|
||||
try {
|
||||
session = session_factory.start_session();
|
||||
session.received.connect(
|
||||
(msg) => receive_callback(msg));
|
||||
reconnect_wait_ms = INIT_RECONNECT_WAIT_MS;
|
||||
} catch (Error _) {
|
||||
Thread.usleep(1000 * reconnect_wait_ms);
|
||||
update_reconnect_wait();
|
||||
}
|
||||
}
|
||||
|
||||
private void update_reconnect_wait() {
|
||||
var new_wait = RECONNECT_BACKOFF * reconnect_wait_ms;
|
||||
if (new_wait < MAX_RECONNECT_WAIT_MS)
|
||||
reconnect_wait_ms = (uint)new_wait;
|
||||
else
|
||||
reconnect_wait_ms = MAX_RECONNECT_WAIT_MS;
|
||||
}
|
||||
}
|
||||
|
||||
private class SessionFactory {
|
||||
private const string CA_FILENAME = "/ca.pem";
|
||||
private const string CERT_FILENAME = "/client.pem";
|
||||
private const uint TIMEOUT_S = 1;
|
||||
|
||||
private InetSocketAddress host;
|
||||
private TlsCertificate cert;
|
||||
private TlsDatabase ca_db;
|
||||
|
||||
public SessionFactory(InetAddress host_addr, uint16 host_port,
|
||||
string cert_dir) throws Error {
|
||||
host = new InetSocketAddress(host_addr, host_port);
|
||||
var cert_path = cert_dir + CERT_FILENAME;
|
||||
cert = new TlsCertificate.from_file(cert_path);
|
||||
var ca_path = cert_dir + CA_FILENAME;
|
||||
var db_type = TlsBackend.get_default().get_file_database_type();
|
||||
var ca_path = cert_dir + "/ca.pem";
|
||||
var db = Object.new(db_type, "anchors", ca_path) as TlsDatabase;
|
||||
var cert_path = cert_dir + "/client.pem";
|
||||
var cert = new TlsCertificate.from_file(cert_path);
|
||||
ca_db = Object.new(db_type, "anchors", ca_path) as TlsDatabase;
|
||||
}
|
||||
|
||||
public Session start_session() throws Error {
|
||||
var plain_client = new SocketClient();
|
||||
plain_client.set_timeout(TIMEOUT_S);
|
||||
var plain_connection = plain_client.connect(host);
|
||||
tls_client = TlsClientConnection.new(plain_connection, host);
|
||||
|
||||
tls_client.set_database(db);
|
||||
tls_client.set_certificate(cert);
|
||||
tls_client.handshake();
|
||||
var connection = TlsClientConnection.new(plain_connection, host);
|
||||
connection.set_database(ca_db);
|
||||
connection.set_certificate(cert);
|
||||
connection.handshake();
|
||||
return new Session(connection);
|
||||
}
|
||||
}
|
||||
|
||||
public async void send(uint8[] message) throws Error {
|
||||
yield tls_client.output_stream.write_async(message);
|
||||
var response = new uint8[1024];
|
||||
var len = yield tls_client.input_stream.read_async(response);
|
||||
response_received(response[0:len]);
|
||||
private class Session {
|
||||
public signal void received(uint8[] msg);
|
||||
|
||||
private const uint MAX_BATCH_SIZE = 10;
|
||||
private const uint MAX_MSG_LEN = 1024;
|
||||
|
||||
private TlsClientConnection connection;
|
||||
|
||||
public Session(TlsClientConnection connection) {
|
||||
this.connection = connection;
|
||||
}
|
||||
|
||||
public OutgoingMessage? task(AsyncQueue<OutgoingMessage> queue) {
|
||||
for (int i = 0; i < MAX_BATCH_SIZE; ++i) {
|
||||
if (queue.length() == 0)
|
||||
break;
|
||||
var msg = queue.pop();
|
||||
var success = true;
|
||||
success &= send(msg);
|
||||
success &= receive();
|
||||
if (!success)
|
||||
return msg;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private bool send(OutgoingMessage msg) {
|
||||
try {
|
||||
size_t written;
|
||||
connection.output_stream.write_all(msg.content, out written);
|
||||
return true;
|
||||
} catch (IOError _) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private bool receive() {
|
||||
try {
|
||||
var buffer = new uint8[MAX_MSG_LEN];
|
||||
var len = connection.input_stream.read(buffer);
|
||||
if (len <= 0)
|
||||
return false;
|
||||
received(buffer[0:len]);
|
||||
return true;
|
||||
} catch (IOError _) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private class OutgoingMessage {
|
||||
public uint8[] content { get; private set; }
|
||||
|
||||
private const uint MAX_FAIL_COUNT = 10;
|
||||
|
||||
private uint fail_count;
|
||||
|
||||
public OutgoingMessage(owned uint8[] content) {
|
||||
this.content = (owned)content;
|
||||
fail_count = 0;
|
||||
}
|
||||
|
||||
public void has_failed() {
|
||||
++fail_count;
|
||||
}
|
||||
|
||||
public bool should_retry() {
|
||||
return fail_count < MAX_FAIL_COUNT;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -13,8 +13,8 @@ namespace StudySystemClient {
|
||||
default_height = 580;
|
||||
|
||||
this.connection = connection;
|
||||
connection.response_received.connect((response) => {
|
||||
response_label.label = "Response: " + (string)response;
|
||||
connection.received.connect((msg) => {
|
||||
response_label.label = "Response: " + (string)msg;
|
||||
});
|
||||
|
||||
var box = new Gtk.Box(Gtk.Orientation.VERTICAL, 10);
|
||||
@ -24,7 +24,7 @@ namespace StudySystemClient {
|
||||
box.margin_bottom = 10;
|
||||
|
||||
send_button = new Gtk.Button.with_label("Send");
|
||||
send_button.clicked.connect(on_send_clicked);
|
||||
send_button.clicked.connect(() => connection.send("Foo".data));
|
||||
box.append(send_button);
|
||||
|
||||
response_label = new Gtk.Label("");
|
||||
@ -35,13 +35,5 @@ namespace StudySystemClient {
|
||||
|
||||
present();
|
||||
}
|
||||
|
||||
private async void on_send_clicked() {
|
||||
try {
|
||||
yield connection.send("Foo".data);
|
||||
} catch (Error e) {
|
||||
response_label.label = "Error: " + e.message;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user