diff --git a/lib/agent.ex b/lib/agent.ex new file mode 100644 index 0000000..1685ba2 --- /dev/null +++ b/lib/agent.ex @@ -0,0 +1,69 @@ +defmodule Frajtano.Agent do + alias Frajtano.Peer + use GenServer + + def start_link(args) do + GenServer.start_link(__MODULE__, nil, [{:name, __MODULE__} | args]) + end + + @impl true + def init(_) do + {:ok, + %{ + keys: %{} + }} + end + + @impl true + def handle_call({:identities}, _from, state) do + idents = + for( + {_, peer, :worker, _} <- DynamicSupervisor.which_children(Frajtano.Peer), + is_pid(peer), + do: peer + ) + |> Task.async_stream(fn peer -> + with {:ok, idents} <- Peer.identities(peer), + do: {:ok, {idents, peer}} + end) + + + # Double :ok-wrapping because of Task.async_stream + idents = (for {:ok, {:ok, {idents, peer}}} <- idents, do: {idents, peer}) + + { + :reply, + {:ok, for({idents, _} <- idents, ident <- idents, do: ident)}, + %{ + state + | keys: + for({idents, peer} <- idents, {key, _comment} <- idents, into: %{}, do: {key, peer}) + } + } + end + + @impl true + def handle_call({:sign, {key, _, _} = req}, _from, state) do + {:reply, Peer.sign(state.keys[key], req), state} + end + + @impl true + def handle_call({:add_peer, path}, _from, state) do + case Peer.start(path) do + {:ok, _} -> {:reply, :ok, state} + {:error, error} -> {:reply, {:error, error}, state} + end + end + + def identities(agent \\ __MODULE__) do + GenServer.call(agent, {:identities}) + end + + def sign(agent \\ __MODULE__, request) do + GenServer.call(agent, {:sign, request}) + end + + def add_peer(agent \\ __MODULE__, path) do + GenServer.call(agent, {:add_peer, path}) + end +end diff --git a/lib/frajtano.ex b/lib/frajtano.ex index 1c04d30..867418a 100644 --- a/lib/frajtano.ex +++ b/lib/frajtano.ex @@ -1,2 +1,29 @@ defmodule Frajtano do + use Application + + @impl true + def start(_type, _args) do + Frajtano.Supervisor.start_link(name: Frajtano.Supervisor) + end end + +defmodule Frajtano.Supervisor do + use Supervisor + + def start_link(opts) do + Supervisor.start_link(__MODULE__, :ok, opts) + end + + @impl true + def init(:ok) do + children = [ + Frajtano.Agent, + {Frajtano.Listener, ["/tmp/frajtano1"]}, + {Task.Supervisor, name: Frajtano.ClientSupervisor}, + {DynamicSupervisor, name: Frajtano.Peer} + ] + + Supervisor.init(children, strategy: :one_for_one) + end +end + diff --git a/lib/listener.ex b/lib/listener.ex new file mode 100644 index 0000000..0184c40 --- /dev/null +++ b/lib/listener.ex @@ -0,0 +1,79 @@ +defmodule Frajtano.Listener do + require Logger + use Task + + def start_link(path) do + Task.start_link(__MODULE__, :run, [path]) + end + + def run(path) do + Logger.info("Serving on path #{path}") + File.rm(path) + {:ok, sock} = :gen_tcp.listen(0, [:binary, active: false, packet: :raw, ifaddr: {:local, path}]) + loop(sock) + end + + def loop(sock) do + Logger.info("Accepting connections on #{inspect sock}") + {:ok, conn} = :gen_tcp.accept(sock) + Frajtano.ClientHandler.start(conn) + loop(sock) + end +end + +defmodule Frajtano.ClientHandler do + alias Frajtano.Proto + require Logger + + def start(conn) do + Logger.info("Got connection: #{inspect conn}") + Task.Supervisor.start_child(Frajtano.ClientSupervisor, fn -> + :gen_tcp.controlling_process(conn, self()) + handle(conn, <<>>) + end) + end + + defp handle(conn, buffer) do + Logger.info("Handling connection: #{inspect conn}, buffer: #{inspect buffer}") + case Proto.decode(buffer) do + {nil, buffer} -> + Logger.debug("Need more data") + case :gen_tcp.recv(conn, 0) do + {:ok, msg} -> + Logger.debug("Got more data: #{inspect msg}") + handle(conn, buffer <> msg) + {:error, :closed} -> + nil + {:error, error} -> + Logger.warning("Got network error: #{inspect error}") + end + {packet, buffer} -> + :gen_tcp.send(conn, Proto.encode(handle_packet(packet))) + handle(conn, buffer) + end + end + + defp handle_packet({:agentc_request_identities, nil}) do + Logger.info("Got identities request") + case Frajtano.Agent.identities() do + {:ok, identities} -> {:agent_identities_answer, identities} + {:error, error} -> + Logger.warning("Error fetching identities: #{error}") + {:agent_failure, nil} + end + end + + defp handle_packet({:agentc_sign_request, request}) do + case Frajtano.Agent.sign(request) do + {:ok, signature} -> {:agent_sign_response, signature} + {:error, error} -> + Logger.warning("Error signing: #{error}") + {:agent_failure, nil} + end + end + + defp handle_packet({type, body}) do + Logger.warning("Can't handle #{type} packet with body #{inspect body}") + {:agent_failure, nil} + end +end diff --git a/lib/peer.ex b/lib/peer.ex new file mode 100644 index 0000000..f01ff71 --- /dev/null +++ b/lib/peer.ex @@ -0,0 +1,79 @@ +defmodule Frajtano.Peer do + alias Frajtano.Proto + require Logger + use GenServer, restart: :temporary + + def start(path) do + DynamicSupervisor.start_child(Frajtano.Peer, {__MODULE__, path}) + end + + def start_link(path) do + GenServer.start_link(__MODULE__, path) + end + + @impl true + def init(path) do + {:ok, conn} = :gen_tcp.connect({:local, path}, 0, [:binary, active: :once]) + {:ok, %{conn: conn, clients: :queue.new(), buffer: <<>>}} + end + + @impl true + def handle_call(packet, from, %{conn: conn, clients: clients} = state) do + case :gen_tcp.send(conn, Proto.encode(packet)) do + :ok -> + {:noreply, %{state | clients: :queue.in(from, clients)}} + + {:error, :closed} -> + {:noreply, state, {:continue, :closed}} + + {:error, e} -> + raise(e) + end + end + + defp handle_messages(%{clients: clients, buffer: buffer} = state) do + case Proto.decode(buffer) do + {nil, buffer} -> + {:noreply, %{state | buffer: buffer}} + + {msg, buffer} -> + {{:value, client}, clients} = :queue.out(clients) + GenServer.reply(client, {:ok, msg}) + handle_messages(%{state | clients: clients, buffer: buffer}) + end + end + + @impl true + def handle_info({:tcp, conn, msg}, %{conn: conn, buffer: buffer} = state) do + :inet.setopts(conn, active: :once) + buffer = buffer <> msg + + handle_messages(%{ state | buffer: buffer }) + end + + @impl true + def handle_info({:tcp_closed, _}, state) do + {:noreply, state, {:continue, :closed}} + end + + @impl true + def handle_continue(:closed, %{clients: clients}) do + clients + |> :queue.to_list() + |> Enum.each(&GenServer.reply(&1, {:error, :closed})) + + {:stop, :closed, %{}} + end + + def identities(peer) do + with {:ok, {:agent_identities_answer, identities}} <- + GenServer.call(peer, {:agentc_request_identities, nil}), + do: {:ok, identities} + end + + def sign(peer, request) do + with {:ok, {:agent_sign_response, signature}} <- + GenServer.call(peer, {:agentc_sign_request, request}), + do: {:ok, signature} + end +end diff --git a/lib/proto/proto.ex b/lib/proto/proto.ex new file mode 100644 index 0000000..aa396c7 --- /dev/null +++ b/lib/proto/proto.ex @@ -0,0 +1,155 @@ +defmodule Frajtano.Proto do + @agentc_request_identities 11 + @agentc_sign_request 13 + @agentc_remove_all_identities 19 + @agentc_extension 27 + + @agent_failure 5 + @agent_success 6 + @agent_identities_answer 12 + @agent_sign_response 14 + @agent_extension_failure 28 + @agent_extension_response 29 + + def encode({:agentc_request_identities, nil}), + do: encode_packet(@agentc_request_identities, <<>>) + + def encode({:agentc_sign_request, {key, data, flags}}), + do: + encode_packet( + @agentc_sign_request, + <> + ) + + def encode({:agentc_remove_all_identities, nil}), + do: encode_packet(@agentc_remove_all_identities, <<>>) + + def encode({:agentc_extension, {type, data}}), + do: + encode_packet( + @agentc_extension, + <> + ) + + def encode({:agent_failure, nil}), do: encode_packet(@agent_failure, <<>>) + def encode({:agent_success, nil}), do: encode_packet(@agent_success, <<>>) + + def encode({:agent_identities_answer, keys}), + do: + encode_packet( + @agent_identities_answer, + < Enum.map_join(fn {key, comment} -> + << + byte_size(key)::big-integer-32, + key::bytes, + byte_size(comment)::big-integer-32, + comment::bytes + >> + end)::bytes>> + ) + + def encode({:agent_sign_response, signature}), + do: + encode_packet( + @agent_sign_response, + <> + ) + + def encode({:agent_extension_failure, nil}), + do: encode_packet(@agent_extension_failure, <<>>) + + def encode({:agent_extension_response, {type, data}}), + do: + encode_packet( + @agent_extension_response, + <> + ) + + defp encode_packet(type, contents), + do: <> + + def decode(packet) do + case decode_packet(packet) do + {{@agentc_request_identities, contents}, rest} -> + {{:agentc_request_identities, decode_agentc_request_identities(contents)}, rest} + + {{@agentc_sign_request, contents}, rest} -> + {{:agentc_sign_request, decode_agentc_sign_request(contents)}, rest} + + {{@agentc_remove_all_identities, contents}, rest} -> + {{:agentc_remove_all_identities, decode_agentc_remove_all_identities(contents)}, rest} + + {{@agentc_extension, contents}, rest} -> + {{:agentc_extension, decode_agentc_extension(contents)}, rest} + + {{@agent_failure, contents}, rest} -> + {{:agent_failure, decode_agent_failure(contents)}, rest} + + {{@agent_success, contents}, rest} -> + {{:agent_success, decode_agent_success(contents)}, rest} + + {{@agent_identities_answer, contents}, rest} -> + {{:agent_identities_answer, decode_agent_identities_answer(contents)}, rest} + + {{@agent_sign_response, contents}, rest} -> + {{:agent_sign_response, decode_agent_sign_response(contents)}, rest} + + {{@agent_extension_failure, contents}, rest} -> + {{:agent_extension_failure, decode_agent_extension_failure(contents)}, rest} + + {{@agent_extension_response, contents}, rest} -> + {{:agent_extension_response, decode_agent_extension_response(contents)}, rest} + + {nil, rest} -> + {nil, rest} + end + end + + defp decode_packet( + <> + ), + do: {{type, contents}, rest} + + defp decode_packet(rest), do: {nil, rest} + + defp decode_agentc_request_identities(<<>>), do: nil + + defp decode_agentc_sign_request( + <> + ), + do: {key, data, flags} + + defp decode_agentc_remove_all_identities(<<>>), do: nil + + defp decode_agentc_extension( + <> + ), + do: {type, contents} + + defp decode_agent_failure(<<>>), do: nil + defp decode_agent_success(<<>>), do: nil + + defp decode_agent_identities_answer(<>) do + lst = + for <>, + do: {key, comment} + + ^nkeys = length(lst) + lst + end + + defp decode_agent_sign_response(<>), do: sig + + defp decode_agent_extension_failure(<<>>), do: nil + + defp decode_agent_extension_response( + <> + ), + do: {type, contents} +end diff --git a/mix.exs b/mix.exs index 2580338..ff47929 100644 --- a/mix.exs +++ b/mix.exs @@ -13,7 +13,8 @@ defmodule Frajtano.MixProject do def application do [ - extra_applications: [:logger] + extra_applications: [:logger], + mod: {Frajtano, []} ] end diff --git a/mix.lock b/mix.lock new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/mix.lock @@ -0,0 +1 @@ +