Deduplicate sockets by path

This commit is contained in:
bluepython508
2024-09-21 17:16:23 +01:00
parent df2c942fd7
commit 998c7e32a1
3 changed files with 34 additions and 33 deletions

View File

@@ -10,9 +10,7 @@ defmodule Frajtano.Agent do
def init(_) do def init(_) do
{ {
:ok, :ok,
%{ %{},
keys: %{}
},
{:continue, :init_peers} {:continue, :init_peers}
} }
end end
@@ -23,36 +21,38 @@ defmodule Frajtano.Agent do
{:noreply, state} {:noreply, state}
end end
@impl true # select: list of specs, where specs are a tuple of match, guards, and outputs
def handle_call({:identities}, _from, state) do # match is {key, pid, value}, :"$1" is a match variable
idents = def peer_paths() do
for( Registry.select(Frajtano.Peers, [{{:"$1", :_, :_}, [], [:"$1"]}])
{_, peer, :worker, _} <- DynamicSupervisor.which_children(Frajtano.Peer), end
is_pid(peer),
do: peer
)
|> Task.async_stream(fn peer ->
with {:ok, idents} <- Peer.identities(peer),
do: {:ok, {idents, peer}}
end)
# Double :ok-wrapping because of Task.async_stream def peer_pids() do
idents = for {:ok, {:ok, {idents, peer}}} <- idents, do: {idents, peer} Registry.select(Frajtano.Peers, [{{:_, :"$1", :_}, [], [:"$1"]}])
end
@impl true
def handle_call({:identities}, _from, _state) do
idents =
Task.async_stream(
peer_pids(),
&{&1, Peer.identities(&1)},
ordered: false,
on_timeout: :kill_task
)
idents = for {:ok, {peer, {:ok, idents}}} <- idents, do: {idents, peer}
{ {
:reply, :reply,
{:ok, for({idents, _} <- idents, ident <- idents, do: ident)}, {:ok, Enum.flat_map(idents, &elem(&1, 0))},
%{ for({idents, peer} <- idents, {key, _comment} <- idents, into: %{}, do: {key, peer})
state
| keys:
for({idents, peer} <- idents, {key, _comment} <- idents, into: %{}, do: {key, peer})
}
} }
end end
@impl true @impl true
def handle_call({:sign, {key, _, _} = req}, _from, state) do def handle_call({:sign, {key, _, _} = req}, _from, state) do
{:reply, Peer.sign(state.keys[key], req), state} {:reply, Peer.sign(state[key], req), state}
end end
@impl true @impl true
@@ -64,16 +64,16 @@ defmodule Frajtano.Agent do
end end
end end
def identities(agent \\ __MODULE__) do def identities() do
GenServer.call(agent, {:identities}) GenServer.call(__MODULE__, {:identities})
end end
def sign(agent \\ __MODULE__, request) do def sign(request) do
# Signing can take some time, as a password may need to be entered or similar # Signing can take some time, as a password may need to be entered or similar
GenServer.call(agent, {:sign, request}, :infinity) GenServer.call(__MODULE__, {:sign, request}, :infinity)
end end
def add_peer(agent \\ __MODULE__, path) do def add_peer(path) do
GenServer.call(agent, {:add_peer, path}) GenServer.call(__MODULE__, {:add_peer, path})
end end
end end

View File

@@ -17,7 +17,8 @@ defmodule Frajtano.Supervisor do
@impl true @impl true
def init(:ok) do def init(:ok) do
children = [ children = [
{DynamicSupervisor, name: Frajtano.Peer}, {DynamicSupervisor, name: Frajtano.PeerSupervisor},
{Registry, keys: :unique, name: Frajtano.Peers},
Frajtano.Agent, Frajtano.Agent,
{Task.Supervisor, name: Frajtano.ClientSupervisor}, {Task.Supervisor, name: Frajtano.ClientSupervisor},
{Frajtano.Listener, [Application.fetch_env!(:frajtano, :listen_path)]}, {Frajtano.Listener, [Application.fetch_env!(:frajtano, :listen_path)]},

View File

@@ -4,11 +4,11 @@ defmodule Frajtano.Peer do
use GenServer, restart: :temporary use GenServer, restart: :temporary
def start(path) do def start(path) do
DynamicSupervisor.start_child(Frajtano.Peer, {__MODULE__, path}) DynamicSupervisor.start_child(Frajtano.PeerSupervisor, {__MODULE__, path})
end end
def start_link(path) do def start_link(path) do
GenServer.start_link(__MODULE__, path) GenServer.start_link(__MODULE__, path, name: {:via, Registry, {Frajtano.Peers, path}})
end end
@impl true @impl true