152 lines
4.4 KiB
Elixir
152 lines
4.4 KiB
Elixir
defmodule Frajtano.Peer do
|
|
alias Frajtano.Proto
|
|
require Logger
|
|
use GenServer, restart: :temporary
|
|
|
|
defmodule Spawner do
  @moduledoc """
  Supervisor that runs a spawned agent executable next to the `Frajtano.Peer`
  connected to it.

  A tracked temporary directory is created for the agent socket, and the socket
  path is appended as the last argument of the executable. The daemon is a
  significant child, so with `auto_shutdown: :any_significant` this supervisor
  shuts down when the daemon terminates.
  """

  use Supervisor, restart: :permanent

  @doc """
  Starts the supervisor for `spec`, an `{executable, args}` tuple.
  """
  def start_link(spec), do: Supervisor.start_link(__MODULE__, spec, [])

  @impl true
  def init({executable, args}) do
    # Temporary files are tracked by this process so they are tied to its lifetime.
    Temp.track!()
    socket_path = Path.join(Temp.mkdir!(), "agent.sock")

    daemon =
      Supervisor.child_spec(
        {MuonTrap.Daemon, [executable, args ++ [socket_path]]},
        restart: :temporary,
        significant: true
      )

    peer =
      Supervisor.child_spec(
        {Frajtano.Peer, {socket_path, :spawned, {executable, args}}},
        restart: :permanent
      )

    Supervisor.init([daemon, peer], strategy: :one_for_all, auto_shutdown: :any_significant)
  end
end
|
|
|
|
@doc """
Starts a peer under `Frajtano.PeerSupervisor`.

  * `{:socket, path}` starts a `Frajtano.Peer` for the socket at `path`.
  * `{:spawn, spec}` starts a `Spawner` for `spec`.

Returns whatever `DynamicSupervisor.start_child/2` returns.
"""
def start({:socket, path}), do: start_supervised_child({__MODULE__, {path}})
def start({:spawn, spec}), do: start_supervised_child({Spawner, spec})

# Hands the child specification to the peer supervisor.
defp start_supervised_child(child) do
  DynamicSupervisor.start_child(Frajtano.PeerSupervisor, child)
end
|
|
|
|
@doc """
Starts the peer process and registers it in the `Frajtano.Peers` registry.

A `{path}` argument is registered under `path`; a three-element spec is
registered under the whole spec tuple.
"""
def start_link({path} = init_arg), do: start_registered(init_arg, path)
def start_link({_, _, _} = spec), do: start_registered(spec, spec)

# Starts the GenServer with `init_arg`, named through the registry under `key`.
defp start_registered(init_arg, key) do
  via = {:via, Registry, {Frajtano.Peers, key}}
  GenServer.start_link(__MODULE__, init_arg, name: via)
end
|
|
|
|
# Milliseconds to wait between connection attempts to a spawned agent.
@spawn_retry_interval 100

# How many refused connections are tolerated once the socket file exists
# (i.e. between the agent's bind() and listen()) before giving up.
@spawn_refused_retries 50

# Connects to the agent socket and builds the initial state:
#   conn    - the connected socket (active: :once)
#   clients - FIFO queue of {pid, ref} waiting for a response
#   buffer  - bytes received but not yet decoded
#
# `{path}` connects once and crashes if the connection fails.
@impl true
def init({path}) do
  {:ok, conn} = connect_agent(path)
  {:ok, initial_state(conn)}
end

# `{path, :spawned, spec}` waits for the spawned agent to create its socket.
def init({path, :spawned, _spec}) do
  await_agent(path, @spawn_refused_retries)
end

# Polls until the agent accepts the connection.
#
# The connect result itself is used instead of a separate File.exists?/1
# check: the socket file appears as soon as the agent binds it, but
# connections are refused until it starts listening, so checking for the file
# and then connecting assertively could crash during that window.
defp await_agent(path, refused_retries_left) do
  case connect_agent(path) do
    {:error, :enoent} ->
      # Socket not created yet: keep waiting.
      Process.sleep(@spawn_retry_interval)
      await_agent(path, refused_retries_left)

    {:error, :econnrefused} when refused_retries_left > 0 ->
      # Socket exists but nobody is listening yet: bounded grace period.
      Process.sleep(@spawn_retry_interval)
      await_agent(path, refused_retries_left - 1)

    result ->
      # Any other failure is unexpected; crash with a MatchError.
      {:ok, conn} = result
      {:ok, initial_state(conn)}
  end
end

# Opens the Unix domain socket at `path`.
defp connect_agent(path) do
  :gen_tcp.connect({:local, path}, 0, [:binary, active: :once])
end

# State of a freshly connected peer.
defp initial_state(conn) do
  %{conn: conn, clients: :queue.new(), buffer: <<>>}
end
|
|
|
|
@doc """
Delivers `msg` to a waiting client.

The client is identified by a `{pid, ref}` pair and receives `{ref, msg}`.
"""
def reply({client, ref}, msg), do: send(client, {ref, msg})
|
|
|
|
# Decodes every complete message in the buffer and hands each one to the
# longest-waiting client, in order. Stops when `Proto.decode/1` yields `nil`
# for the message, keeping the remaining bytes buffered.
defp handle_messages(%{clients: waiting, buffer: pending} = state) do
  pending
  |> Proto.decode()
  |> deliver_decoded(waiting, state)
end

# Nothing complete to decode: store what is left and go back to waiting.
defp deliver_decoded({nil, rest}, _waiting, state) do
  {:noreply, %{state | buffer: rest}}
end

# One message decoded: answer the client at the head of the queue, then
# continue with the remaining bytes.
defp deliver_decoded({msg, rest}, waiting, state) do
  {{:value, client}, remaining} = :queue.out(waiting)
  reply(client, {:ok, msg})
  handle_messages(%{state | clients: remaining, buffer: rest})
end
|
|
|
|
# Request from a client: `{:send, packet, {pid, ref}}`.
#
# The client is enqueued before the packet is written so that it is answered
# on every path: with the decoded response on success, or with `{:error, e}`
# from handle_continue/2 when the write fails. Enqueueing only on success
# left the requester blocked forever in its `receive` after a failed send.
@impl true
def handle_info({:send, packet, from}, %{conn: conn, clients: clients} = state) do
  state = %{state | clients: :queue.in(from, clients)}

  case :gen_tcp.send(conn, Proto.encode(packet)) do
    :ok ->
      {:noreply, state}

    {:error, e} ->
      {:noreply, state, {:continue, {:error, e}}}
  end
end

# Data from the agent socket: re-arm `active: :once`, append to the buffer and
# dispatch whatever complete messages it now contains.
def handle_info({:tcp, conn, msg}, %{conn: conn, buffer: buffer} = state) do
  :inet.setopts(conn, active: :once)
  handle_messages(%{state | buffer: buffer <> msg})
end

# The agent closed the connection: fail all waiting clients and stop.
def handle_info({:tcp_closed, _}, state) do
  {:noreply, state, {:continue, {:error, :closed}}}
end

# Socket error reported in active mode: fail all waiting clients with the
# reason instead of crashing on an unmatched message.
def handle_info({:tcp_error, _, reason}, state) do
  {:noreply, state, {:continue, {:error, reason}}}
end

# Sent by identities/1 when the agent did not answer in time. The whole
# connection is torn down, which also fails every other waiting client.
def handle_info(:timeout, state) do
  {:noreply, state, {:continue, {:error, :closed}}}
end
|
|
|
|
# Fails every client still waiting for a response with `{:error, e}` and then
# stops the peer with that same term as the exit reason.
@impl true
def handle_continue({:error, e}, %{clients: waiting}) do
  failure = {:error, e}

  for client <- :queue.to_list(waiting) do
    reply(client, failure)
  end

  {:stop, failure, %{}}
end
|
|
|
|
@doc """
Asks `peer` for the identities held by its agent.

Returns `{:ok, identities}`, `{:error, :agent_failure}` when the agent answers
with a failure, `{:error, reason}` when the peer reports an error, or
`{:error, {:peer_down, reason}}` when the peer process is not alive or exits
before answering. Raises on any other response from the agent.
"""
def identities(peer) do
  ref = make_ref()
  send(peer, {:send, {:agentc_request_identities, nil}, {self(), ref}})

  # Needs to be less than the timeout in Frajtano.Agent.identities on the Task.async_stream call
  # That's 5000 by default
  timer = Process.send_after(peer, :timeout, 4500)

  # The timeout above is delivered to the peer, not to us. If the peer is
  # already dead (or dies without replying) nothing would ever wake this
  # receive, so watch the peer as well.
  monitor = Process.monitor(peer)

  response =
    receive do
      {^ref, msg} -> msg
      {:DOWN, ^monitor, :process, _, reason} -> {:error, {:peer_down, reason}}
    end

  Process.cancel_timer(timer)
  # :flush drops a :DOWN that may have arrived right after the reply.
  Process.demonitor(monitor, [:flush])

  case response do
    {:ok, {:agent_identities_answer, identities}} -> {:ok, identities}
    {:ok, {:agent_failure, nil}} -> {:error, :agent_failure}
    {:ok, msg} -> raise("Unexpected message #{inspect(msg)}")
    {:error, e} -> {:error, e}
  end
end
|
|
|
|
@doc """
Asks `peer` to have its agent sign `request`.

Returns `{:ok, signature}`, `{:error, :agent_failure}` when the agent answers
with a failure, `{:error, reason}` when the peer reports an error, or
`{:error, {:peer_down, reason}}` when the peer process is not alive or exits
before answering. Raises on any other response from the agent.
"""
def sign(peer, request) do
  # Signing may take some time, as a password may need to be entered or similar
  # There is therefore no timeout
  # If something requests identities afterwards, it will timeout, which also kills this signature request
  # The SSH agent protocol strict ordering leaves fun problems with timeouts, as it turns out
  ref = make_ref()
  send(peer, {:send, {:agentc_sign_request, request}, {self(), ref}})

  # With no timeout, a peer that is dead or dies without replying would leave
  # this receive blocked forever, so watch the peer as well.
  monitor = Process.monitor(peer)

  response =
    receive do
      {^ref, msg} -> msg
      {:DOWN, ^monitor, :process, _, reason} -> {:error, {:peer_down, reason}}
    end

  # :flush drops a :DOWN that may have arrived right after the reply.
  Process.demonitor(monitor, [:flush])

  case response do
    {:ok, {:agent_sign_response, signature}} -> {:ok, signature}
    {:ok, {:agent_failure, nil}} -> {:error, :agent_failure}
    {:ok, msg} -> raise("Unexpected message #{inspect(msg)}")
    {:error, e} -> {:error, e}
  end
end
|
|
end
|