// study-system/client/src/connection.vala
//
// (Repository listing metadata: 123 lines, 3.3 KiB, Vala)

namespace StudySystemClient {
/**
 * Client-side connection to the server.
 *
 * Owns a SessionManager for the transport, a TransactionManager that pairs
 * responses with outstanding requests, and a Worker built around the session
 * manager.
 */
public class Connection {
    /**
     * Notification invoked from update_status when the connection state
     * changes.
     *
     * @param connected the new connection state
     */
    public delegate void StatusCallback(bool connected);

    /**
     * Most recent connection state. Set to true by the constructor and
     * afterwards only changed by update_status.
     */
    public bool connected { get; private set; }

    private StatusCallback status_callback;
    private SessionManager session_manager;
    private TransactionManager transaction_manager;
    // Not referenced again inside this class after construction.
    private Worker worker;

    /**
     * Creates a connection to port 12888 on the IPv6 loopback address.
     *
     * @param cert_dir directory handed to SessionFactory
     * @param status_callback called whenever //connected// changes
     * @throws Error propagated from the objects constructed here
     */
    public Connection(string cert_dir,
                      owned StatusCallback status_callback)
        throws Error {
        var loopback = new InetAddress.loopback(SocketFamily.IPV6);
        var session_factory
            = new SessionFactory(loopback, 12888, cert_dir);
        this.status_callback = (owned) status_callback;
        // NOTE(review): both lambdas capture `this`, and this object owns the
        // session manager that stores them - possible reference cycle; confirm
        // against the SessionManager delegate declarations.
        session_manager = new SessionManager(
            session_factory, (msg) => receive(msg),
            (connected) => update_status(connected));
        transaction_manager = new TransactionManager();
        worker = new Worker(session_manager);
        connected = true;
    }

    /**
     * Sends a request and suspends until the matching response is resolved.
     *
     * @param body payload of the request
     * @return the body stored for this transaction, or null if none is stored
     */
    public async Response.Body? send(Request.Body body) {
        var transaction_id = transaction_manager.register(send.callback);
        var request = new Request.Request(transaction_id, body);
        session_manager.send(request.encode());
        // Resumed by TransactionManager.resolve() through the registered
        // callback.
        yield;
        return transaction_manager.get_result(transaction_id);
    }

    /**
     * Decodes a raw message and schedules its resolution on an idle source.
     *
     * A message that fails to decode is reported on stderr and dropped.
     */
    private void receive(owned uint8[] msg) {
        Response.Response response;
        try {
            response = new Response.Response.from_bytes(msg);
        } catch (Response.DecodeError e) {
            // Newline-terminated so consecutive diagnostics do not run
            // together on stderr.
            stderr.printf("Invalid response from server: %s\n", e.message);
            return;
        }
        Idle.add(() => {
            transaction_manager.resolve(response);
            return false;
        }, GLib.Priority.DEFAULT_IDLE);
    }

    /**
     * Schedules a connection-state update on an idle source. The status
     * callback only fires when the state actually changes.
     */
    private void update_status(bool connected) {
        Idle.add(() => {
            if (connected != this.connected) {
                this.connected = connected;
                status_callback(connected);
            }
            return false;
        }, GLib.Priority.DEFAULT_IDLE);
    }
}
/**
 * Holder for a SourceFunc; used as the value type of TransactionManager's
 * pending table.
 */
private class Continuation {
    private SourceFunc resume_func;

    /**
     * @param callback delegate to invoke on resume(); ownership is taken
     */
    public Continuation(owned SourceFunc callback) {
        resume_func = (owned) callback;
    }

    /** Invokes the stored delegate; its boolean result is discarded. */
    public void resume() {
        this.resume_func();
    }
}
/**
 * Tracks outstanding transactions and pairs each response with the
 * continuation registered for its transaction id.
 */
private class TransactionManager {
    // Ids are uint16 and wrap after 65535; register() does not check whether
    // a reused id is still pending.
    private uint16 next_transaction_id;
    private HashTable<uint16, Continuation> pending;
    private HashTable<uint16, Response.Body> results;

    public TransactionManager() {
        next_transaction_id = 0;
        pending = new HashTable<uint16, Continuation>(null, null);
        results = new HashTable<uint16, Response.Body>(null, null);
    }

    /**
     * Registers a continuation for a new transaction.
     *
     * @param callback delegate invoked when the transaction is resolved
     * @return the id allocated to the transaction
     */
    public uint16 register(owned SourceFunc callback) {
        var transaction_id = next_transaction_id++;
        pending.insert(transaction_id,
                       new Continuation((owned) callback));
        return transaction_id;
    }

    /**
     * Stores the body of a response and resumes the waiting continuation.
     *
     * A response whose id has no pending transaction is reported on stderr
     * and ignored. Each pending transaction is resolved at most once.
     *
     * @param response decoded response
     */
    public void resolve(Response.Response response) {
        var transaction_id = (uint16) response.transaction_id;
        // Strong reference: the continuation must outlive its removal from
        // the table below.
        Continuation? continuation = pending.lookup(transaction_id);
        if (continuation == null) {
            stderr.printf("Response for non-pending transaction %d\n",
                          transaction_id);
            return;
        }
        // Drop the entry before resuming so that a duplicate response cannot
        // resume the same continuation a second time and the table does not
        // grow without bound.
        pending.remove(transaction_id);
        results.insert(transaction_id, response.body);
        continuation.resume();
    }

    /**
     * Hands out the stored result of a transaction and forgets it.
     *
     * @param transaction_id id returned by register()
     * @return the stored body, or null if none is stored for that id
     */
    public Response.Body? get_result(uint16 transaction_id) {
        Response.Body? result = results.lookup(transaction_id);
        // Removing the entry keeps a stale result from being returned once
        // the id is reused after wrap-around.
        results.remove(transaction_id);
        return result;
    }
}
/**
 * Periodic subclass whose task forwards to SessionManager.task().
 */
private class Worker : Periodic {
    // Value passed to the Periodic constructor; milliseconds per the name.
    private const uint WORKER_PERIOD_MS = 10;

    private SessionManager session_manager;

    /**
     * Stores the session manager and starts the worker immediately.
     *
     * @param session_manager object whose task() is called from task()
     */
    public Worker(SessionManager session_manager) {
        base(WORKER_PERIOD_MS);
        this.session_manager = session_manager;
        this.start();
    }

    /**
     * Forwards to the session manager.
     * NOTE(review): presumably called by Periodic once per period - confirm.
     */
    protected override void task() {
        this.session_manager.task();
    }
}
}