Add basic forwarding/multiplexing functionality

This commit is contained in:
bluepython508
2024-09-15 14:00:32 +01:00
parent cb207114a7
commit 5229bd8d1c
7 changed files with 412 additions and 1 deletions

69
lib/agent.ex Normal file
View File

@@ -0,0 +1,69 @@
defmodule Frajtano.Agent do
alias Frajtano.Peer
use GenServer
def start_link(args) do
GenServer.start_link(__MODULE__, nil, [{:name, __MODULE__} | args])
end
@impl true
def init(_) do
{:ok,
%{
keys: %{}
}}
end
@impl true
def handle_call({:identities}, _from, state) do
idents =
for(
{_, peer, :worker, _} <- DynamicSupervisor.which_children(Frajtano.Peer),
is_pid(peer),
do: peer
)
|> Task.async_stream(fn peer ->
with {:ok, idents} <- Peer.identities(peer),
do: {:ok, {idents, peer}}
end)
# Double :ok-wrapping because of Task.async_stream
idents = (for {:ok, {:ok, {idents, peer}}} <- idents, do: {idents, peer})
{
:reply,
{:ok, for({idents, _} <- idents, ident <- idents, do: ident)},
%{
state
| keys:
for({idents, peer} <- idents, {key, _comment} <- idents, into: %{}, do: {key, peer})
}
}
end
@impl true
def handle_call({:sign, {key, _, _} = req}, _from, state) do
{:reply, Peer.sign(state.keys[key], req), state}
end
@impl true
def handle_call({:add_peer, path}, _from, state) do
case Peer.start(path) do
{:ok, _} -> {:reply, :ok, state}
{:error, error} -> {:reply, {:error, error}, state}
end
end
def identities(agent \\ __MODULE__) do
GenServer.call(agent, {:identities})
end
def sign(agent \\ __MODULE__, request) do
GenServer.call(agent, {:sign, request})
end
def add_peer(agent \\ __MODULE__, path) do
GenServer.call(agent, {:add_peer, path})
end
end

View File

@@ -1,2 +1,29 @@
defmodule Frajtano do defmodule Frajtano do
use Application
@impl true
def start(_type, _args) do
Frajtano.Supervisor.start_link(name: Frajtano.Supervisor)
end end
end
defmodule Frajtano.Supervisor do
use Supervisor
def start_link(opts) do
Supervisor.start_link(__MODULE__, :ok, opts)
end
@impl true
def init(:ok) do
children = [
Frajtano.Agent,
{Frajtano.Listener, ["/tmp/frajtano1"]},
{Task.Supervisor, name: Frajtano.ClientSupervisor},
{DynamicSupervisor, name: Frajtano.Peer}
]
Supervisor.init(children, strategy: :one_for_one)
end
end

79
lib/listener.ex Normal file
View File

@@ -0,0 +1,79 @@
defmodule Frajtano.Listener do
require Logger
use Task
def start_link(path) do
Task.start_link(__MODULE__, :run, [path])
end
def run(path) do
Logger.info("Serving on path #{path}")
File.rm(path)
{:ok, sock} = :gen_tcp.listen(0, [:binary, active: false, packet: :raw, ifaddr: {:local, path}])
loop(sock)
end
def loop(sock) do
Logger.info("Accepting connections on #{inspect sock}")
{:ok, conn} = :gen_tcp.accept(sock)
Frajtano.ClientHandler.start(conn)
loop(sock)
end
end
defmodule Frajtano.ClientHandler do
alias Frajtano.Proto
require Logger
def start(conn) do
Logger.info("Got connection: #{inspect conn}")
Task.Supervisor.start_child(Frajtano.ClientSupervisor, fn ->
:gen_tcp.controlling_process(conn, self())
handle(conn, <<>>)
end)
end
defp handle(conn, buffer) do
Logger.info("Handling connection: #{inspect conn}, buffer: #{inspect buffer}")
case Proto.decode(buffer) do
{nil, buffer} ->
Logger.debug("Need more data")
case :gen_tcp.recv(conn, 0) do
{:ok, msg} ->
Logger.debug("Got more data: #{inspect msg}")
handle(conn, buffer <> msg)
{:error, :closed} ->
nil
{:error, error} ->
Logger.warning("Got network error: #{inspect error}")
end
{packet, buffer} ->
:gen_tcp.send(conn, Proto.encode(handle_packet(packet)))
handle(conn, buffer)
end
end
defp handle_packet({:agentc_request_identities, nil}) do
Logger.info("Got identities request")
case Frajtano.Agent.identities() do
{:ok, identities} -> {:agent_identities_answer, identities}
{:error, error} ->
Logger.warning("Error fetching identities: #{error}")
{:agent_failure, nil}
end
end
defp handle_packet({:agentc_sign_request, request}) do
case Frajtano.Agent.sign(request) do
{:ok, signature} -> {:agent_sign_response, signature}
{:error, error} ->
Logger.warning("Error signing: #{error}")
{:agent_failure, nil}
end
end
defp handle_packet({type, body}) do
Logger.warning("Can't handle #{type} packet with body #{inspect body}")
{:agent_failure, nil}
end
end

79
lib/peer.ex Normal file
View File

@@ -0,0 +1,79 @@
defmodule Frajtano.Peer do
alias Frajtano.Proto
require Logger
use GenServer, restart: :temporary
def start(path) do
DynamicSupervisor.start_child(Frajtano.Peer, {__MODULE__, path})
end
def start_link(path) do
GenServer.start_link(__MODULE__, path)
end
@impl true
def init(path) do
{:ok, conn} = :gen_tcp.connect({:local, path}, 0, [:binary, active: :once])
{:ok, %{conn: conn, clients: :queue.new(), buffer: <<>>}}
end
@impl true
def handle_call(packet, from, %{conn: conn, clients: clients} = state) do
case :gen_tcp.send(conn, Proto.encode(packet)) do
:ok ->
{:noreply, %{state | clients: :queue.in(from, clients)}}
{:error, :closed} ->
{:noreply, state, {:continue, :closed}}
{:error, e} ->
raise(e)
end
end
defp handle_messages(%{clients: clients, buffer: buffer} = state) do
case Proto.decode(buffer) do
{nil, buffer} ->
{:noreply, %{state | buffer: buffer}}
{msg, buffer} ->
{{:value, client}, clients} = :queue.out(clients)
GenServer.reply(client, {:ok, msg})
handle_messages(%{state | clients: clients, buffer: buffer})
end
end
@impl true
def handle_info({:tcp, conn, msg}, %{conn: conn, buffer: buffer} = state) do
:inet.setopts(conn, active: :once)
buffer = buffer <> msg
handle_messages(%{ state | buffer: buffer })
end
@impl true
def handle_info({:tcp_closed, _}, state) do
{:noreply, state, {:continue, :closed}}
end
@impl true
def handle_continue(:closed, %{clients: clients}) do
clients
|> :queue.to_list()
|> Enum.each(&GenServer.reply(&1, {:error, :closed}))
{:stop, :closed, %{}}
end
def identities(peer) do
with {:ok, {:agent_identities_answer, identities}} <-
GenServer.call(peer, {:agentc_request_identities, nil}),
do: {:ok, identities}
end
def sign(peer, request) do
with {:ok, {:agent_sign_response, signature}} <-
GenServer.call(peer, {:agentc_sign_request, request}),
do: {:ok, signature}
end
end

155
lib/proto/proto.ex Normal file
View File

@@ -0,0 +1,155 @@
defmodule Frajtano.Proto do
@agentc_request_identities 11
@agentc_sign_request 13
@agentc_remove_all_identities 19
@agentc_extension 27
@agent_failure 5
@agent_success 6
@agent_identities_answer 12
@agent_sign_response 14
@agent_extension_failure 28
@agent_extension_response 29
def encode({:agentc_request_identities, nil}),
do: encode_packet(@agentc_request_identities, <<>>)
def encode({:agentc_sign_request, {key, data, flags}}),
do:
encode_packet(
@agentc_sign_request,
<<byte_size(key)::big-integer-32, key::bytes, byte_size(data)::big-integer-32,
data::bytes, flags::big-integer-32>>
)
def encode({:agentc_remove_all_identities, nil}),
do: encode_packet(@agentc_remove_all_identities, <<>>)
def encode({:agentc_extension, {type, data}}),
do:
encode_packet(
@agentc_extension,
<<byte_size(type)::big-integer-32, type::bytes, data::bytes>>
)
def encode({:agent_failure, nil}), do: encode_packet(@agent_failure, <<>>)
def encode({:agent_success, nil}), do: encode_packet(@agent_success, <<>>)
def encode({:agent_identities_answer, keys}),
do:
encode_packet(
@agent_identities_answer,
<<length(keys)::big-integer-32,
keys
|> Enum.map_join(fn {key, comment} ->
<<
byte_size(key)::big-integer-32,
key::bytes,
byte_size(comment)::big-integer-32,
comment::bytes
>>
end)::bytes>>
)
def encode({:agent_sign_response, signature}),
do:
encode_packet(
@agent_sign_response,
<<byte_size(signature)::big-integer-32, signature::bytes>>
)
def encode({:agent_extension_failure, nil}),
do: encode_packet(@agent_extension_failure, <<>>)
def encode({:agent_extension_response, {type, data}}),
do:
encode_packet(
@agent_extension_response,
<<byte_size(type)::big-integer-32, type::bytes, data::bytes>>
)
defp encode_packet(type, contents),
do: <<byte_size(contents) + 1::big-integer-32, type::integer-8, contents::bytes>>
def decode(packet) do
case decode_packet(packet) do
{{@agentc_request_identities, contents}, rest} ->
{{:agentc_request_identities, decode_agentc_request_identities(contents)}, rest}
{{@agentc_sign_request, contents}, rest} ->
{{:agentc_sign_request, decode_agentc_sign_request(contents)}, rest}
{{@agentc_remove_all_identities, contents}, rest} ->
{{:agentc_remove_all_identities, decode_agentc_remove_all_identities(contents)}, rest}
{{@agentc_extension, contents}, rest} ->
{{:agentc_extension, decode_agentc_extension(contents)}, rest}
{{@agent_failure, contents}, rest} ->
{{:agent_failure, decode_agent_failure(contents)}, rest}
{{@agent_success, contents}, rest} ->
{{:agent_success, decode_agent_success(contents)}, rest}
{{@agent_identities_answer, contents}, rest} ->
{{:agent_identities_answer, decode_agent_identities_answer(contents)}, rest}
{{@agent_sign_response, contents}, rest} ->
{{:agent_sign_response, decode_agent_sign_response(contents)}, rest}
{{@agent_extension_failure, contents}, rest} ->
{{:agent_extension_failure, decode_agent_extension_failure(contents)}, rest}
{{@agent_extension_response, contents}, rest} ->
{{:agent_extension_response, decode_agent_extension_response(contents)}, rest}
{nil, rest} ->
{nil, rest}
end
end
defp decode_packet(
<<length::big-integer-32, type::integer-8, contents::bytes-size(length - 1),
rest::bytes>>
),
do: {{type, contents}, rest}
defp decode_packet(rest), do: {nil, rest}
defp decode_agentc_request_identities(<<>>), do: nil
defp decode_agentc_sign_request(
<<keylen::big-integer-32, key::bytes-size(keylen), datalen::big-integer-32,
data::bytes-size(datalen), flags::big-integer-32>>
),
do: {key, data, flags}
defp decode_agentc_remove_all_identities(<<>>), do: nil
defp decode_agentc_extension(
<<tylen::big-integer-32, type::bytes-size(tylen), contents::bytes>>
),
do: {type, contents}
defp decode_agent_failure(<<>>), do: nil
defp decode_agent_success(<<>>), do: nil
defp decode_agent_identities_answer(<<nkeys::big-integer-32, keys::bytes>>) do
lst =
for <<keylen::big-integer-32, key::bytes-size(keylen), commentlen::big-integer-32,
comment::bytes-size(commentlen) <- keys>>,
do: {key, comment}
^nkeys = length(lst)
lst
end
defp decode_agent_sign_response(<<len::big-integer-32, sig::bytes-size(len)>>), do: sig
defp decode_agent_extension_failure(<<>>), do: nil
defp decode_agent_extension_response(
<<tylen::big-integer-32, type::bytes-size(tylen), contents::bytes>>
),
do: {type, contents}
end

View File

@@ -13,7 +13,8 @@ defmodule Frajtano.MixProject do
def application do def application do
[ [
extra_applications: [:logger] extra_applications: [:logger],
mod: {Frajtano, []}
] ]
end end

1
mix.lock Normal file
View File

@@ -0,0 +1 @@