diff --git a/config/config.exs b/config/config.exs index 63162d594..e56b9730d 100644 --- a/config/config.exs +++ b/config/config.exs @@ -550,6 +550,24 @@ config :pleroma, :rate_limit, password_reset: {1_800_000, 5}, account_confirmation_resend: {8_640_000, 5} +config :pleroma, :gun_pools, + federation: [ + max_connections: 50, + timeout: 150_000 + ], + media: [ + max_connections: 50, + timeout: 150_000 + ], + upload: [ + max_connections: 25, + timeout: 300_000 + ], + default: [ + max_connections: 10, + timout: 20_000 + ] + # Import environment specific config. This must remain at the bottom # of this file so it overrides the configuration defined above. import_config "#{Mix.env()}.exs" diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex index 25e56b9e2..06d1a187e 100644 --- a/lib/pleroma/application.ex +++ b/lib/pleroma/application.ex @@ -39,6 +39,7 @@ defmodule Pleroma.Application do ] ++ cachex_children() ++ hackney_pool_children() ++ + gun_pools() ++ [ Pleroma.Web.Federator.RetryQueue, Pleroma.Stats, @@ -163,6 +164,19 @@ defmodule Pleroma.Application do end end + defp gun_pools do + if Application.get_env(:tesla, :adapter) == Tesla.Adapter.Gun do + for {pool_name, opts} <- Pleroma.Config.get([:gun_pools]) do + %{ + id: :"gun_pool_#{pool_name}", + start: {Pleroma.Gun.Connections, :start_link, [{pool_name, opts}]} + } + end + else + [] + end + end + defp after_supervisor_start do with digest_config <- Application.get_env(:pleroma, :email_notifications)[:digest], true <- digest_config[:active] do diff --git a/lib/pleroma/gun/api/api.ex b/lib/pleroma/gun/api/api.ex index 19adc1bf0..7e6d2f929 100644 --- a/lib/pleroma/gun/api/api.ex +++ b/lib/pleroma/gun/api/api.ex @@ -4,11 +4,21 @@ defmodule Pleroma.Gun.API do @callback open(charlist(), pos_integer(), map()) :: {:ok, pid()} + @callback info(pid()) :: map() + @callback close(pid()) :: :ok def open(host, port, opts) do api().open(host, port, opts) end + def info(pid) do + api().info(pid) + end + + def close(pid) do + api().close(pid) + end + defp api do Pleroma.Config.get([Pleroma.Gun.API], Pleroma.Gun.API.Gun) end diff --git a/lib/pleroma/gun/api/gun.ex b/lib/pleroma/gun/api/gun.ex index 14a4b7275..33e7985a1 100644 --- a/lib/pleroma/gun/api/gun.ex +++ b/lib/pleroma/gun/api/gun.ex @@ -19,4 +19,10 @@ defmodule Pleroma.Gun.API.Gun do def open(host, port, opts) do :gun.open(host, port, Map.take(opts, @gun_keys)) end + + @impl Pleroma.Gun.API + def info(pid), do: :gun.info(pid) + + @impl Pleroma.Gun.API + def close(pid), do: :gun.close(pid) end diff --git a/lib/pleroma/gun/api/mock.ex b/lib/pleroma/gun/api/mock.ex index ff9e13a74..a80559f0b 100644 --- a/lib/pleroma/gun/api/mock.ex +++ b/lib/pleroma/gun/api/mock.ex @@ -5,36 +5,75 @@ defmodule Pleroma.Gun.API.Mock do @behaviour Pleroma.Gun.API @impl Pleroma.Gun.API - def open('some-domain.com', 80, %{genserver_pid: genserver_pid}) do + def open(domain, 80, %{genserver_pid: genserver_pid}) + when domain in ['another-domain.com', 'some-domain.com'] do {:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end) + + Registry.register(Pleroma.Gun.API.Mock, conn_pid, %{ + origin_scheme: "http", + origin_host: domain, + origin_port: 80 + }) + send(genserver_pid, {:gun_up, conn_pid, :http}) {:ok, conn_pid} end def open('some-domain.com', 443, %{genserver_pid: genserver_pid}) do {:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end) + + Registry.register(Pleroma.Gun.API.Mock, conn_pid, %{ + origin_scheme: "https", + origin_host: 'some-domain.com', + origin_port: 443 + }) + send(genserver_pid, {:gun_up, conn_pid, :http2}) {:ok, conn_pid} end @impl Pleroma.Gun.API - def open('gun_down.com', _port, %{genserver_pid: genserver_pid}) do + def open('gun_down.com', 80, %{genserver_pid: genserver_pid}) do {:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end) + + Registry.register(Pleroma.Gun.API.Mock, conn_pid, %{ + origin_scheme: "http", + origin_host: 'gun_down.com', + origin_port: 80 + }) + send(genserver_pid, {:gun_down, conn_pid, :http, nil, nil, nil}) {:ok, conn_pid} end @impl Pleroma.Gun.API - def open('gun_down_and_up.com', _port, %{genserver_pid: genserver_pid}) do + def open('gun_down_and_up.com', 80, %{genserver_pid: genserver_pid}) do {:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end) + + Registry.register(Pleroma.Gun.API.Mock, conn_pid, %{ + origin_scheme: "http", + origin_host: 'gun_down_and_up.com', + origin_port: 80 + }) + send(genserver_pid, {:gun_down, conn_pid, :http, nil, nil, nil}) {:ok, _} = Task.start_link(fn -> Process.sleep(500) + send(genserver_pid, {:gun_up, conn_pid, :http}) end) {:ok, conn_pid} end + + @impl Pleroma.Gun.API + def info(pid) do + [{_, info}] = Registry.lookup(Pleroma.Gun.API.Mock, pid) + info + end + + @impl Pleroma.Gun.API + def close(_pid), do: :ok end diff --git a/lib/pleroma/gun/conn.ex b/lib/pleroma/gun/conn.ex index 62ef146a1..20ddec64c 100644 --- a/lib/pleroma/gun/conn.ex +++ b/lib/pleroma/gun/conn.ex @@ -10,8 +10,8 @@ defmodule Pleroma.Gun.Conn do conn: pid(), state: atom(), waiting_pids: [pid()], - protocol: atom() + used: pos_integer() } - defstruct conn: nil, state: :open, waiting_pids: [], protocol: :http + defstruct conn: nil, state: :open, waiting_pids: [], used: 0 end diff --git a/lib/pleroma/gun/connections.ex b/lib/pleroma/gun/connections.ex index a3f1b0351..cec3de2ca 100644 --- a/lib/pleroma/gun/connections.ex +++ b/lib/pleroma/gun/connections.ex @@ -6,29 +6,31 @@ defmodule Pleroma.Gun.Connections do use GenServer @type domain :: String.t() - @type conn :: Gun.Conn.t() + @type conn :: Pleroma.Gun.Conn.t() + @type t :: %__MODULE__{ - conns: %{domain() => conn()} + conns: %{domain() => conn()}, + opts: keyword() } - defstruct conns: %{} + defstruct conns: %{}, opts: [] - def start_link(name \\ __MODULE__) do + @spec start_link({atom(), keyword()}) :: {:ok, pid()} | :ignore + def start_link({name, opts}) do if Application.get_env(:tesla, :adapter) == Tesla.Adapter.Gun do - GenServer.start_link(__MODULE__, [], name: name) + GenServer.start_link(__MODULE__, opts, name: name) else :ignore end end @impl true - def init(_) do - {:ok, %__MODULE__{conns: %{}}} - end + def init(opts), do: {:ok, %__MODULE__{conns: %{}, opts: opts}} @spec get_conn(String.t(), keyword(), atom()) :: pid() - def get_conn(url, opts \\ [], name \\ __MODULE__) do + def get_conn(url, opts \\ [], name \\ :default) do opts = Enum.into(opts, %{}) + uri = URI.parse(url) opts = @@ -58,13 +60,13 @@ defmodule Pleroma.Gun.Connections do end @spec alive?(atom()) :: boolean() - def alive?(name \\ __MODULE__) do + def alive?(name \\ :default) do pid = Process.whereis(name) if pid, do: Process.alive?(pid), else: false end @spec get_state(atom()) :: t() - def get_state(name \\ __MODULE__) do + def get_state(name \\ :default) do GenServer.call(name, {:state}) end @@ -73,7 +75,8 @@ defmodule Pleroma.Gun.Connections do key = compose_key(uri) case state.conns[key] do - %{conn: conn, state: conn_state} when conn_state == :up -> + %{conn: conn, state: conn_state, used: used} when conn_state == :up -> + state = put_in(state.conns[key].used, used + 1) {:reply, conn, state} %{state: conn_state, waiting_pids: pids} when conn_state in [:open, :down] -> @@ -81,16 +84,23 @@ defmodule Pleroma.Gun.Connections do {:noreply, state} nil -> - {:ok, conn} = Pleroma.Gun.API.open(to_charlist(uri.host), uri.port, opts) + max_connections = state.opts[:max_connections] - state = - put_in(state.conns[key], %Pleroma.Gun.Conn{ - conn: conn, - waiting_pids: [from], - protocol: String.to_atom(uri.scheme) - }) + if Enum.count(state.conns) < max_connections do + open_conn(key, uri, from, state, opts) + else + [{close_key, least_used} | _conns] = Enum.sort_by(state.conns, fn {_k, v} -> v.used end) - {:noreply, state} + :ok = Pleroma.Gun.API.close(least_used.conn) + + state = + put_in( + state.conns, + Map.delete(state.conns, close_key) + ) + + open_conn(key, uri, from, state, opts) + end end end @@ -99,20 +109,29 @@ defmodule Pleroma.Gun.Connections do @impl true def handle_info({:gun_up, conn_pid, _protocol}, state) do - {key, conn} = find_conn(state.conns, conn_pid) + conn_key = compose_key_gun_info(conn_pid) + {key, conn} = find_conn(state.conns, conn_pid, conn_key) # Send to all waiting processes connection pid Enum.each(conn.waiting_pids, fn waiting_pid -> GenServer.reply(waiting_pid, conn_pid) end) # Update state of the current connection and set waiting_pids to empty list - state = put_in(state.conns[key], %{conn | state: :up, waiting_pids: []}) + state = + put_in(state.conns[key], %{ + conn + | state: :up, + waiting_pids: [], + used: conn.used + length(conn.waiting_pids) + }) + {:noreply, state} end @impl true # Do we need to do something with killed & unprocessed references? def handle_info({:gun_down, conn_pid, _protocol, _reason, _killed, _unprocessed}, state) do - {key, conn} = find_conn(state.conns, conn_pid) + conn_key = compose_key_gun_info(conn_pid) + {key, conn} = find_conn(state.conns, conn_pid, conn_key) # We don't want to block requests to GenServer if gun send down message, return nil, so we can make some retries, while connection is not up Enum.each(conn.waiting_pids, fn waiting_pid -> GenServer.reply(waiting_pid, nil) end) @@ -121,12 +140,28 @@ defmodule Pleroma.Gun.Connections do {:noreply, state} end - defp compose_key(uri), do: uri.host <> ":" <> to_string(uri.port) + defp compose_key(uri), do: "#{uri.scheme}:#{uri.host}:#{uri.port}" - defp find_conn(conns, conn_pid) do + defp compose_key_gun_info(pid) do + info = Pleroma.Gun.API.info(pid) + "#{info.origin_scheme}:#{info.origin_host}:#{info.origin_port}" + end + + defp find_conn(conns, conn_pid, conn_key) do Enum.find(conns, fn {key, conn} -> - protocol = if String.ends_with?(key, ":443"), do: :https, else: :http - conn.conn == conn_pid and conn.protocol == protocol + key == conn_key and conn.conn == conn_pid end) end + + defp open_conn(key, uri, from, state, opts) do + {:ok, conn} = Pleroma.Gun.API.open(to_charlist(uri.host), uri.port, opts) + + state = + put_in(state.conns[key], %Pleroma.Gun.Conn{ + conn: conn, + waiting_pids: [from] + }) + + {:noreply, state} + end end diff --git a/test/gun/connections_test.exs b/test/gun/connections_test.exs index a63c8eaf9..8308b5f9f 100644 --- a/test/gun/connections_test.exs +++ b/test/gun/connections_test.exs @@ -6,12 +6,17 @@ defmodule Gun.ConnectionsTest do use ExUnit.Case alias Pleroma.Gun.{Connections, Conn, API} + setup_all do + {:ok, _} = Registry.start_link(keys: :unique, name: API.Mock) + :ok + end + setup do name = :test_gun_connections adapter = Application.get_env(:tesla, :adapter) Application.put_env(:tesla, :adapter, Tesla.Adapter.Gun) on_exit(fn -> Application.put_env(:tesla, :adapter, adapter) end) - {:ok, pid} = Connections.start_link(name) + {:ok, pid} = Connections.start_link({name, [max_connections: 2, timeout: 10]}) {:ok, name: name, pid: pid} end @@ -37,10 +42,11 @@ defmodule Gun.ConnectionsTest do %Connections{ conns: %{ - "some-domain.com:80" => %Conn{ + "http:some-domain.com:80" => %Conn{ conn: ^conn, state: :up, - waiting_pids: [] + waiting_pids: [], + used: 2 } } } = Connections.get_state(name) @@ -58,10 +64,11 @@ defmodule Gun.ConnectionsTest do %Connections{ conns: %{ - "some-domain.com:80" => %Conn{ + "http:some-domain.com:80" => %Conn{ conn: ^conn, state: :up, - waiting_pids: [] + waiting_pids: [], + used: 2 } } } = Connections.get_state(name) @@ -84,12 +91,12 @@ defmodule Gun.ConnectionsTest do %Connections{ conns: %{ - "some-domain.com:80" => %Conn{ + "http:some-domain.com:80" => %Conn{ conn: ^conn, state: :up, waiting_pids: [] }, - "some-domain.com:443" => %Conn{ + "https:some-domain.com:443" => %Conn{ conn: ^https_conn, state: :up, waiting_pids: [] @@ -105,7 +112,7 @@ defmodule Gun.ConnectionsTest do %Connections{ conns: %{ - "gun_down.com:80" => %Conn{ + "http:gun_down.com:80" => %Conn{ conn: _, state: :down, waiting_pids: _ @@ -121,10 +128,11 @@ defmodule Gun.ConnectionsTest do %Connections{ conns: %{ - "gun_down_and_up.com:80" => %Conn{ + "http:gun_down_and_up.com:80" => %Conn{ conn: _, state: :down, - waiting_pids: _ + waiting_pids: _, + used: 0 } } } = Connections.get_state(name) @@ -136,10 +144,11 @@ defmodule Gun.ConnectionsTest do %Connections{ conns: %{ - "gun_down_and_up.com:80" => %Conn{ + "http:gun_down_and_up.com:80" => %Conn{ conn: _, state: :up, - waiting_pids: [] + waiting_pids: [], + used: 2 } } } = Connections.get_state(name) @@ -164,10 +173,11 @@ defmodule Gun.ConnectionsTest do %Connections{ conns: %{ - "some-domain.com:80" => %Conn{ + "http:some-domain.com:80" => %Conn{ conn: conn, state: :up, - waiting_pids: [] + waiting_pids: [], + used: 5 } } } = Connections.get_state(name) @@ -175,6 +185,52 @@ defmodule Gun.ConnectionsTest do assert Enum.all?(conns, fn res -> res == conn end) end + test "remove frequently used", %{name: name, pid: pid} do + Connections.get_conn("https://some-domain.com", [genserver_pid: pid], name) + + for _ <- 1..4 do + Connections.get_conn("http://some-domain.com", [genserver_pid: pid], name) + end + + %Connections{ + conns: %{ + "http:some-domain.com:80" => %Conn{ + conn: _, + state: :up, + waiting_pids: [], + used: 4 + }, + "https:some-domain.com:443" => %Conn{ + conn: _, + state: :up, + waiting_pids: [], + used: 1 + } + }, + opts: [max_connections: 2, timeout: 10] + } = Connections.get_state(name) + + conn = Connections.get_conn("http://another-domain.com", [genserver_pid: pid], name) + + %Connections{ + conns: %{ + "http:another-domain.com:80" => %Conn{ + conn: ^conn, + state: :up, + waiting_pids: [], + used: 1 + }, + "http:some-domain.com:80" => %Conn{ + conn: _, + state: :up, + waiting_pids: [], + used: 4 + } + }, + opts: [max_connections: 2, timeout: 10] + } = Connections.get_state(name) + end + describe "integration test" do @describetag :integration @@ -193,10 +249,11 @@ defmodule Gun.ConnectionsTest do %Connections{ conns: %{ - "httpbin.org:80" => %Conn{ + "http:httpbin.org:80" => %Conn{ conn: ^conn, state: :up, - waiting_pids: [] + waiting_pids: [], + used: 2 } } } = Connections.get_state(name) @@ -217,10 +274,11 @@ defmodule Gun.ConnectionsTest do %Connections{ conns: %{ - "httpbin.org:443" => %Conn{ + "https:httpbin.org:443" => %Conn{ conn: ^conn, state: :up, - waiting_pids: [] + waiting_pids: [], + used: 2 } } } = Connections.get_state(name)