Compare commits

...

2 Commits

Author SHA1 Message Date
bluepython508
0172c68e09 Deduplicate sockets by path 2024-09-22 12:56:29 +01:00
bluepython508
c3c80b716c Flake: HM module 2024-09-22 12:56:29 +01:00
10 changed files with 165 additions and 64 deletions

3
.envrc
View File

@@ -1 +1,4 @@
use flake use flake
export RELEASE_NODE=frajtano-test@nomos
export FRAJTANO_DIR=$PWD/.frajtano_state

2
.gitignore vendored
View File

@@ -29,3 +29,5 @@ frajtano-*.tar
/.nix-hex /.nix-hex
/.nix-mix /.nix-mix
/result /result
/.frajtano_state
/todo

View File

@@ -1,2 +1,6 @@
import Config import Config
config :frajtano, listen_path: Path.expand(System.get_env("FRAJTANO_DIR")) |> Path.join("agent.sock") config :frajtano,
listen_path: Path.expand(System.get_env("FRAJTANO_DIR")) |> Path.join("agent.sock"),
initial_peers: System.get_env("FRAJTANO_PEERS", "") |> String.split(":", trim: true) |> Enum.map(&Path.expand/1)

View File

@@ -21,6 +21,7 @@
}; };
script = pkgs.writeShellScriptBin pname '' script = pkgs.writeShellScriptBin pname ''
set -eu set -eu
${pkgs.coreutils}/bin/mkdir -p $FRAJTANO_DIR
file="$FRAJTANO_DIR/cookie" file="$FRAJTANO_DIR/cookie"
(umask 077; [ -f "$file" ] || ${pkgs.coreutils}/bin/head -c 128 /dev/urandom | ${pkgs.coreutils}/bin/base64 -w0 > "$file") (umask 077; [ -f "$file" ] || ${pkgs.coreutils}/bin/head -c 128 /dev/urandom | ${pkgs.coreutils}/bin/base64 -w0 > "$file")
export RELEASE_COOKIE=$(${pkgs.coreutils}/bin/cat "$file") export RELEASE_COOKIE=$(${pkgs.coreutils}/bin/cat "$file")
@@ -28,10 +29,13 @@
exec ${lib.getExe pkg} "$@" exec ${lib.getExe pkg} "$@"
} }
case $1 in case "''${1:-}" in
assimilate) assimilate)
run rpc ":ok = \"$(echo -n "$2" | ${pkgs.coreutils}/bin/base64)\" |> Base.decode64!() |> Frajtano.Agent.add_peer()" run rpc ":ok = \"$(echo -n "$2" | ${pkgs.coreutils}/bin/base64)\" |> Base.decode64!() |> Frajtano.Agent.add_peer()"
;; ;;
socket)
echo $FRAJTANO_DIR/agent.sock
;;
*) *)
run "$@" run "$@"
;; ;;

View File

@@ -15,7 +15,11 @@
ownPkgs = self.packages.${system}; ownPkgs = self.packages.${system};
}); });
in { in {
devShells = eachSystem ({pkgs, ownPkgs, ...}: { devShells = eachSystem ({
pkgs,
ownPkgs,
...
}: {
default = pkgs.beam.packages.erlang_26.callPackage ./shell.nix { default = pkgs.beam.packages.erlang_26.callPackage ./shell.nix {
inherit ownPkgs; inherit ownPkgs;
}; };
@@ -23,5 +27,46 @@
packages = eachSystem ({pkgs, ...}: { packages = eachSystem ({pkgs, ...}: {
default = pkgs.beam.packages.erlang_26.callPackage ./default.nix {}; default = pkgs.beam.packages.erlang_26.callPackage ./default.nix {};
}); });
homeModules.default = {
config,
lib,
pkgs,
...
}: let
cfg = config.bluepython508.frajtano;
in {
options.bluepython508.frajtano = {
enable = lib.mkEnableOption "frajtano";
dir = lib.mkOption {
description = "directory in which to place the listening socket";
default = "${config.home.homeDirectory}/.ssh/frajtano";
};
};
config = lib.mkIf cfg.enable {
home.sessionVariables.FRAJTANO_DIR = cfg.dir;
home.packages = [self.packages.${pkgs.system}.default];
systemd.user.services.frajtano = {
Unit.Description = "frajtano";
Unit.After = ["default.target"];
Service.Environment = "'FRAJTANO_DIR=${cfg.dir}'";
Service.ExecStart = "${self.packages.${pkgs.system}.default}/bin/frajtano start";
Install.WantedBy = ["default.target"];
};
launchd.agents.frajtano = {
enable = true;
config = {
EnvironmentVariables = {
FRAJTANO_DIR = cfg.dir;
};
ProgramArguments = ["${self.packages.${pkgs.system}.default}/bin/frajtano" "start"];
RunAtLoad = true;
KeepAlive = true;
};
};
};
};
}; };
} }

View File

@@ -8,43 +8,51 @@ defmodule Frajtano.Agent do
@impl true @impl true
def init(_) do def init(_) do
{:ok, {
%{ :ok,
keys: %{} %{},
}} {:continue, :init_peers}
}
end end
@impl true @impl true
def handle_call({:identities}, _from, state) do def handle_continue(:init_peers, state) do
for peer <- Application.fetch_env!(:frajtano, :initial_peers), do: {:ok, _} = Peer.start(peer)
{:noreply, state}
end
# select: list of specs, where specs are a tuple of match, guards, and outputs
# match is {key, pid, value}, :"$1" is a match variable
def peer_paths() do
Registry.select(Frajtano.Peers, [{{:"$1", :_, :_}, [], [:"$1"]}])
end
def peer_pids() do
Registry.select(Frajtano.Peers, [{{:_, :"$1", :_}, [], [:"$1"]}])
end
@impl true
def handle_call({:identities}, _from, _state) do
idents = idents =
for( Task.async_stream(
{_, peer, :worker, _} <- DynamicSupervisor.which_children(Frajtano.Peer), peer_pids(),
is_pid(peer), &{&1, Peer.identities(&1)},
do: peer ordered: false,
on_timeout: :kill_task
) )
|> Task.async_stream(fn peer ->
with {:ok, idents} <- Peer.identities(peer),
do: {:ok, {idents, peer}}
end)
idents = for {:ok, {peer, {:ok, idents}}} <- idents, do: {idents, peer}
# Double :ok-wrapping because of Task.async_stream
idents = (for {:ok, {:ok, {idents, peer}}} <- idents, do: {idents, peer})
{ {
:reply, :reply,
{:ok, for({idents, _} <- idents, ident <- idents, do: ident)}, {:ok, Enum.flat_map(idents, &elem(&1, 0))},
%{ for({idents, peer} <- idents, {key, _comment} <- idents, into: %{}, do: {key, peer})
state
| keys:
for({idents, peer} <- idents, {key, _comment} <- idents, into: %{}, do: {key, peer})
}
} }
end end
@impl true @impl true
def handle_call({:sign, {key, _, _} = req}, _from, state) do def handle_call({:sign, {key, _, _} = req}, _from, state) do
{:reply, Peer.sign(state.keys[key], req), state} {:reply, Peer.sign(state[key], req), state}
end end
@impl true @impl true
@@ -56,15 +64,16 @@ defmodule Frajtano.Agent do
end end
end end
def identities(agent \\ __MODULE__) do def identities() do
GenServer.call(agent, {:identities}) GenServer.call(__MODULE__, {:identities})
end end
def sign(agent \\ __MODULE__, request) do def sign(request) do
GenServer.call(agent, {:sign, request}) # Signing can take some time, as a password may need to be entered or similar
GenServer.call(__MODULE__, {:sign, request}, :infinity)
end end
def add_peer(agent \\ __MODULE__, path) do def add_peer(path) do
GenServer.call(agent, {:add_peer, path}) GenServer.call(__MODULE__, {:add_peer, path})
end end
end end

View File

@@ -17,10 +17,11 @@ defmodule Frajtano.Supervisor do
@impl true @impl true
def init(:ok) do def init(:ok) do
children = [ children = [
{DynamicSupervisor, name: Frajtano.PeerSupervisor},
{Registry, keys: :unique, name: Frajtano.Peers},
Frajtano.Agent, Frajtano.Agent,
{Frajtano.Listener, [Application.fetch_env!(:frajtano, :listen_path)]},
{Task.Supervisor, name: Frajtano.ClientSupervisor}, {Task.Supervisor, name: Frajtano.ClientSupervisor},
{DynamicSupervisor, name: Frajtano.Peer} {Frajtano.Listener, [Application.fetch_env!(:frajtano, :listen_path)]},
] ]
Supervisor.init(children, strategy: :one_for_one) Supervisor.init(children, strategy: :one_for_one)

View File

@@ -4,11 +4,11 @@ defmodule Frajtano.Peer do
use GenServer, restart: :temporary use GenServer, restart: :temporary
def start(path) do def start(path) do
DynamicSupervisor.start_child(Frajtano.Peer, {__MODULE__, path}) DynamicSupervisor.start_child(Frajtano.PeerSupervisor, {__MODULE__, path})
end end
def start_link(path) do def start_link(path) do
GenServer.start_link(__MODULE__, path) GenServer.start_link(__MODULE__, path, name: {:via, Registry, {Frajtano.Peers, path}})
end end
@impl true @impl true
@@ -17,18 +17,8 @@ defmodule Frajtano.Peer do
{:ok, %{conn: conn, clients: :queue.new(), buffer: <<>>}} {:ok, %{conn: conn, clients: :queue.new(), buffer: <<>>}}
end end
@impl true def reply({client, ref}, msg) do
def handle_call(packet, from, %{conn: conn, clients: clients} = state) do send(client, {ref, msg})
case :gen_tcp.send(conn, Proto.encode(packet)) do
:ok ->
{:noreply, %{state | clients: :queue.in(from, clients)}}
{:error, :closed} ->
{:noreply, state, {:continue, :closed}}
{:error, e} ->
raise(e)
end
end end
defp handle_messages(%{clients: clients, buffer: buffer} = state) do defp handle_messages(%{clients: clients, buffer: buffer} = state) do
@@ -38,42 +28,85 @@ defmodule Frajtano.Peer do
{msg, buffer} -> {msg, buffer} ->
{{:value, client}, clients} = :queue.out(clients) {{:value, client}, clients} = :queue.out(clients)
GenServer.reply(client, {:ok, msg}) reply(client, {:ok, msg})
handle_messages(%{state | clients: clients, buffer: buffer}) handle_messages(%{state | clients: clients, buffer: buffer})
end end
end end
@impl true
def handle_info({:send, packet, from}, %{conn: conn, clients: clients} = state) do
case :gen_tcp.send(conn, Proto.encode(packet)) do
:ok ->
{:noreply, %{state | clients: :queue.in(from, clients)}}
{:error, e} ->
{:noreply, state, {:continue, {:error, e}}}
end
end
@impl true @impl true
def handle_info({:tcp, conn, msg}, %{conn: conn, buffer: buffer} = state) do def handle_info({:tcp, conn, msg}, %{conn: conn, buffer: buffer} = state) do
:inet.setopts(conn, active: :once) :inet.setopts(conn, active: :once)
buffer = buffer <> msg buffer = buffer <> msg
handle_messages(%{ state | buffer: buffer }) handle_messages(%{state | buffer: buffer})
end end
@impl true @impl true
def handle_info({:tcp_closed, _}, state) do def handle_info({:tcp_closed, _}, state) do
{:noreply, state, {:continue, :closed}} {:noreply, state, {:continue, {:error, :closed}}}
end end
@impl true @impl true
def handle_continue(:closed, %{clients: clients}) do def handle_info(:timeout, state) do
{:noreply, state, {:continue, {:error, :closed}}}
end
@impl true
def handle_continue({:error, e}, %{clients: clients}) do
clients clients
|> :queue.to_list() |> :queue.to_list()
|> Enum.each(&GenServer.reply(&1, {:error, :closed})) |> Enum.each(&reply(&1, {:error, e}))
{:stop, :closed, %{}} {:stop, {:error, e}, %{}}
end end
def identities(peer) do def identities(peer) do
with {:ok, {:agent_identities_answer, identities}} <- ref = make_ref()
GenServer.call(peer, {:agentc_request_identities, nil}), send(peer, {:send, {:agentc_request_identities, nil}, {self(), ref}})
do: {:ok, identities} # Needs to be less than the timeout in Frajtano.Agent.identities on the Task.async_stream call
# That's 5000 by default
timer = Process.send_after(peer, :timeout, 4500)
receive do
{^ref, msg} ->
Process.cancel_timer(timer)
msg
end
|> case do
{:ok, {:agent_identities_answer, identities}} -> {:ok, identities}
{:ok, {:agent_failure, nil}} -> {:error, :agent_failure}
{:ok, msg} -> raise("Unexpected message #{inspect(msg)}")
{:error, e} -> {:error, e}
end
end end
def sign(peer, request) do def sign(peer, request) do
with {:ok, {:agent_sign_response, signature}} <- # Signing may take some time, as a password may need to be entered or similar
GenServer.call(peer, {:agentc_sign_request, request}), # There is therefore no timeout
do: {:ok, signature} # If something requests identities afterwards, it will timeout, which also kills this signature request
# The SSH agent protocol strict ordering leaves fun problems with timeouts, as it turns out
ref = make_ref()
send(peer, {:send, {:agentc_sign_request, request}, {self(), ref}})
receive do
{^ref, msg} -> msg
end
|> case do
{:ok, {:agent_sign_response, signature}} -> {:ok, signature}
{:ok, {:agent_failure, nil}} -> {:error, :agent_failure}
{:ok, msg} -> raise("Unexpected message #{inspect(msg)}")
{:error, e} -> {:error, e}
end
end end
end end

View File

@@ -14,7 +14,6 @@ defmodule Frajtano.MixProject do
def application do def application do
[ [
extra_applications: [:logger], extra_applications: [:logger],
env: [listen_path: nil],
mod: {Frajtano, []} mod: {Frajtano, []}
] ]
end end

View File

@@ -3,13 +3,14 @@
pkgs, pkgs,
ownPkgs, ownPkgs,
mkShell, mkShell,
elixir,
elixir-ls, elixir-ls,
inotify-tools, inotify-tools,
}: }:
mkShell { mkShell {
inputsFrom = [ ownPkgs.default ]; inputsFrom = [ ownPkgs.default ];
packages = packages =
[elixir-ls] [elixir elixir-ls]
++ lib.lists.optional (pkgs.system == "x86_64-linux") inotify-tools ++ lib.lists.optional (pkgs.system == "x86_64-linux") inotify-tools
++ lib.lists.optionals (pkgs.system == "aarch64-darwin") (with pkgs.darwin.apple_sdk.frameworks; [ ++ lib.lists.optionals (pkgs.system == "aarch64-darwin") (with pkgs.darwin.apple_sdk.frameworks; [
CoreFoundation CoreFoundation