From 0172c68e0913e7e537daaf85ac6b3c33ab90d555 Mon Sep 17 00:00:00 2001 From: bluepython508 <16466646+bluepython508@users.noreply.github.com> Date: Sat, 21 Sep 2024 17:16:23 +0100 Subject: [PATCH] Deduplicate sockets by path --- lib/agent.ex | 60 ++++++++++++++++++++++++------------------------- lib/frajtano.ex | 3 ++- lib/peer.ex | 4 ++-- 3 files changed, 34 insertions(+), 33 deletions(-) diff --git a/lib/agent.ex b/lib/agent.ex index 3434edb..b6244a5 100644 --- a/lib/agent.ex +++ b/lib/agent.ex @@ -10,9 +10,7 @@ defmodule Frajtano.Agent do def init(_) do { :ok, - %{ - keys: %{} - }, + %{}, {:continue, :init_peers} } end @@ -23,36 +21,38 @@ defmodule Frajtano.Agent do {:noreply, state} end - @impl true - def handle_call({:identities}, _from, state) do - idents = - for( - {_, peer, :worker, _} <- DynamicSupervisor.which_children(Frajtano.Peer), - is_pid(peer), - do: peer - ) - |> Task.async_stream(fn peer -> - with {:ok, idents} <- Peer.identities(peer), - do: {:ok, {idents, peer}} - end) + # select: list of specs, where specs are a tuple of match, guards, and outputs + # match is {key, pid, value}, :"$1" is a match variable + def peer_paths() do + Registry.select(Frajtano.Peers, [{{:"$1", :_, :_}, [], [:"$1"]}]) + end - # Double :ok-wrapping because of Task.async_stream - idents = for {:ok, {:ok, {idents, peer}}} <- idents, do: {idents, peer} + def peer_pids() do + Registry.select(Frajtano.Peers, [{{:_, :"$1", :_}, [], [:"$1"]}]) + end + + @impl true + def handle_call({:identities}, _from, _state) do + idents = + Task.async_stream( + peer_pids(), + &{&1, Peer.identities(&1)}, + ordered: false, + on_timeout: :kill_task + ) + + idents = for {:ok, {peer, {:ok, idents}}} <- idents, do: {idents, peer} { :reply, - {:ok, for({idents, _} <- idents, ident <- idents, do: ident)}, - %{ - state - | keys: - for({idents, peer} <- idents, {key, _comment} <- idents, into: %{}, do: {key, peer}) - } + {:ok, Enum.flat_map(idents, &elem(&1, 0))}, + for({idents, peer} <- idents, {key, _comment} <- idents, into: %{}, do: {key, peer}) } end @impl true def handle_call({:sign, {key, _, _} = req}, _from, state) do - {:reply, Peer.sign(state.keys[key], req), state} + {:reply, Peer.sign(state[key], req), state} end @impl true @@ -64,16 +64,16 @@ defmodule Frajtano.Agent do end end - def identities(agent \\ __MODULE__) do - GenServer.call(agent, {:identities}) + def identities() do + GenServer.call(__MODULE__, {:identities}) end - def sign(agent \\ __MODULE__, request) do + def sign(request) do # Signing can take some time, as a password may need to be entered or similar - GenServer.call(agent, {:sign, request}, :infinity) + GenServer.call(__MODULE__, {:sign, request}, :infinity) end - def add_peer(agent \\ __MODULE__, path) do - GenServer.call(agent, {:add_peer, path}) + def add_peer(path) do + GenServer.call(__MODULE__, {:add_peer, path}) end end diff --git a/lib/frajtano.ex b/lib/frajtano.ex index c42464e..f8667be 100644 --- a/lib/frajtano.ex +++ b/lib/frajtano.ex @@ -17,7 +17,8 @@ defmodule Frajtano.Supervisor do @impl true def init(:ok) do children = [ - {DynamicSupervisor, name: Frajtano.Peer}, + {DynamicSupervisor, name: Frajtano.PeerSupervisor}, + {Registry, keys: :unique, name: Frajtano.Peers}, Frajtano.Agent, {Task.Supervisor, name: Frajtano.ClientSupervisor}, {Frajtano.Listener, [Application.fetch_env!(:frajtano, :listen_path)]}, diff --git a/lib/peer.ex b/lib/peer.ex index 6208096..808dc56 100644 --- a/lib/peer.ex +++ b/lib/peer.ex @@ -4,11 +4,11 @@ defmodule Frajtano.Peer do use GenServer, restart: :temporary def start(path) do - DynamicSupervisor.start_child(Frajtano.Peer, {__MODULE__, path}) + DynamicSupervisor.start_child(Frajtano.PeerSupervisor, {__MODULE__, path}) end def start_link(path) do - GenServer.start_link(__MODULE__, path) + GenServer.start_link(__MODULE__, path, name: {:via, Registry, {Frajtano.Peers, path}}) end @impl true