no drop active connections

This commit is contained in:
Alex S 2019-08-25 15:05:34 +03:00
parent a7aa39cfe4
commit c19d4eeaee
6 changed files with 374 additions and 119 deletions

View File

@ -6,17 +6,24 @@ defmodule Pleroma.Gun.Conn do
@moduledoc """ @moduledoc """
Struct for gun connection data Struct for gun connection data
""" """
@type gun_state :: :open | :up | :down
@type conn_state :: :init | :active | :idle
@type t :: %__MODULE__{ @type t :: %__MODULE__{
conn: pid(), conn: pid(),
state: atom(), gun_state: gun_state(),
waiting_pids: [pid()], waiting_pids: [pid()],
conn_state: conn_state(),
used_by: [pid()],
last_reference: pos_integer(), last_reference: pos_integer(),
crf: float() crf: float()
} }
defstruct conn: nil, defstruct conn: nil,
state: :open, gun_state: :open,
waiting_pids: [], waiting_pids: [],
conn_state: :init,
used_by: [],
last_reference: :os.system_time(:second), last_reference: :os.system_time(:second),
crf: 1 crf: 1
end end

View File

@ -14,7 +14,7 @@ defmodule Pleroma.Gun.Connections do
opts: keyword() opts: keyword()
} }
defstruct conns: %{}, opts: [] defstruct conns: %{}, opts: [], queue: []
alias Pleroma.Gun.API alias Pleroma.Gun.API
alias Pleroma.Gun.Conn alias Pleroma.Gun.Conn
@ -27,8 +27,8 @@ defmodule Pleroma.Gun.Connections do
@impl true @impl true
def init(opts), do: {:ok, %__MODULE__{conns: %{}, opts: opts}} def init(opts), do: {:ok, %__MODULE__{conns: %{}, opts: opts}}
@spec get_conn(String.t(), keyword(), atom()) :: pid() @spec checkin(String.t(), keyword(), atom()) :: pid()
def get_conn(url, opts \\ [], name \\ :default) do def checkin(url, opts \\ [], name \\ :default) do
opts = Enum.into(opts, %{}) opts = Enum.into(opts, %{})
uri = URI.parse(url) uri = URI.parse(url)
@ -53,7 +53,7 @@ defmodule Pleroma.Gun.Connections do
GenServer.call( GenServer.call(
name, name,
{:conn, %{opts: opts, uri: uri}} {:checkin, %{opts: opts, uri: uri}}
) )
end end
@ -68,28 +68,57 @@ defmodule Pleroma.Gun.Connections do
GenServer.call(name, {:state}) GenServer.call(name, {:state})
end end
def checkout(conn, pid, name \\ :default) do
GenServer.cast(name, {:checkout, conn, pid})
end
def process_queue(name \\ :default) do
GenServer.cast(name, {:process_queue})
end
@impl true @impl true
def handle_call({:conn, %{opts: opts, uri: uri}}, from, state) do def handle_cast({:checkout, conn_pid, pid}, state) do
{key, conn} = find_conn(state.conns, conn_pid)
used_by = List.keydelete(conn.used_by, pid, 0)
conn_state = if used_by == [], do: :idle, else: conn.conn_state
state = put_in(state.conns[key], %{conn | conn_state: conn_state, used_by: used_by})
{:noreply, state}
end
@impl true
def handle_cast({:process_queue}, state) do
case state.queue do
[{from, key, uri, opts} | _queue] ->
try_to_checkin(key, uri, from, state, Map.put(opts, :from_cast, true))
[] ->
{:noreply, state}
end
end
@impl true
def handle_call({:checkin, %{opts: opts, uri: uri}}, from, state) do
key = compose_key(uri) key = compose_key(uri)
case state.conns[key] do case state.conns[key] do
%{conn: conn, state: conn_state, last_reference: reference, crf: last_crf} = current_conn %{conn: conn, gun_state: gun_state} = current_conn when gun_state == :up ->
when conn_state == :up ->
time = current_time() time = current_time()
last_reference = time - reference last_reference = time - current_conn.last_reference
current_crf = crf(last_reference, 100, last_crf) current_crf = crf(last_reference, 100, current_conn.crf)
state = state =
put_in(state.conns[key], %{ put_in(state.conns[key], %{
current_conn current_conn
| last_reference: time, | last_reference: time,
crf: current_crf crf: current_crf,
conn_state: :active,
used_by: [from | current_conn.used_by]
}) })
{:reply, conn, state} {:reply, conn, state}
%{state: conn_state, waiting_pids: pids} when conn_state in [:open, :down] -> %{gun_state: gun_state, waiting_pids: pids} when gun_state in [:open, :down] ->
state = put_in(state.conns[key].waiting_pids, [from | pids]) state = put_in(state.conns[key].waiting_pids, [from | pids])
{:noreply, state} {:noreply, state}
@ -99,13 +128,26 @@ defmodule Pleroma.Gun.Connections do
if Enum.count(state.conns) < max_connections do if Enum.count(state.conns) < max_connections do
open_conn(key, uri, from, state, opts) open_conn(key, uri, from, state, opts)
else else
[{close_key, least_used} | _conns] = try_to_checkin(key, uri, from, state, opts)
end
end
end
@impl true
def handle_call({:state}, _from, state), do: {:reply, state, state}
defp try_to_checkin(key, uri, from, state, opts) do
unused_conns =
state.conns state.conns
|> Enum.filter(fn {_k, v} -> v.waiting_pids == [] end) |> Enum.filter(fn {_k, v} ->
v.conn_state == :idle and v.waiting_pids == [] and v.used_by == []
end)
|> Enum.sort(fn {_x_k, x}, {_y_k, y} -> |> Enum.sort(fn {_x_k, x}, {_y_k, y} ->
x.crf < y.crf and x.last_reference < y.last_reference x.crf < y.crf and x.last_reference < y.last_reference
end) end)
case unused_conns do
[{close_key, least_used} | _conns] ->
:ok = API.close(least_used.conn) :ok = API.close(least_used.conn)
state = state =
@ -115,21 +157,23 @@ defmodule Pleroma.Gun.Connections do
) )
open_conn(key, uri, from, state, opts) open_conn(key, uri, from, state, opts)
end
end
end
@impl true [] ->
def handle_call({:state}, _from, state), do: {:reply, state, state} queue =
if List.keymember?(state.queue, from, 0),
do: state.queue,
else: state.queue ++ [{from, key, uri, opts}]
state = put_in(state.queue, queue)
{:noreply, state}
end
end
@impl true @impl true
def handle_info({:gun_up, conn_pid, _protocol}, state) do def handle_info({:gun_up, conn_pid, _protocol}, state) do
conn_key = compose_key_gun_info(conn_pid) conn_key = compose_key_gun_info(conn_pid)
{key, conn} = find_conn(state.conns, conn_pid, conn_key) {key, conn} = find_conn(state.conns, conn_pid, conn_key)
# Send to all waiting processes connection pid
Enum.each(conn.waiting_pids, fn waiting_pid -> GenServer.reply(waiting_pid, conn_pid) end)
# Update state of the current connection and set waiting_pids to empty list # Update state of the current connection and set waiting_pids to empty list
time = current_time() time = current_time()
last_reference = time - conn.last_reference last_reference = time - conn.last_reference
@ -138,12 +182,17 @@ defmodule Pleroma.Gun.Connections do
state = state =
put_in(state.conns[key], %{ put_in(state.conns[key], %{
conn conn
| state: :up, | gun_state: :up,
waiting_pids: [], waiting_pids: [],
last_reference: time, last_reference: time,
crf: current_crf crf: current_crf,
conn_state: :active,
used_by: conn.waiting_pids ++ conn.used_by
}) })
# Send to all waiting processes connection pid
Enum.each(conn.waiting_pids, fn waiting_pid -> GenServer.reply(waiting_pid, conn_pid) end)
{:noreply, state} {:noreply, state}
end end
@ -154,7 +203,7 @@ defmodule Pleroma.Gun.Connections do
Enum.each(conn.waiting_pids, fn waiting_pid -> GenServer.reply(waiting_pid, nil) end) Enum.each(conn.waiting_pids, fn waiting_pid -> GenServer.reply(waiting_pid, nil) end)
state = put_in(state.conns[key].state, :down) state = put_in(state.conns[key].gun_state, :down)
{:noreply, state} {:noreply, state}
end end
@ -177,7 +226,7 @@ defmodule Pleroma.Gun.Connections do
end) end)
end end
defp open_conn(key, uri, _from, state, %{proxy: {proxy_host, proxy_port}} = opts) do defp open_conn(key, uri, from, state, %{proxy: {proxy_host, proxy_port}} = opts) do
host = to_charlist(uri.host) host = to_charlist(uri.host)
port = uri.port port = uri.port
@ -202,9 +251,15 @@ defmodule Pleroma.Gun.Connections do
put_in(state.conns[key], %Conn{ put_in(state.conns[key], %Conn{
conn: conn, conn: conn,
waiting_pids: [], waiting_pids: [],
state: :up gun_state: :up,
conn_state: :active,
used_by: [from]
}) })
if opts[:from_cast] do
GenServer.reply(from, conn)
end
{:reply, conn, state} {:reply, conn, state}
else else
error -> error ->
@ -218,6 +273,13 @@ defmodule Pleroma.Gun.Connections do
port = uri.port port = uri.port
with {:ok, conn} <- API.open(host, port, opts) do with {:ok, conn} <- API.open(host, port, opts) do
state =
if opts[:from_cast] do
put_in(state.queue, List.keydelete(state.queue, from, 0))
else
state
end
state = state =
put_in(state.conns[key], %Conn{ put_in(state.conns[key], %Conn{
conn: conn, conn: conn,

View File

@ -10,8 +10,7 @@ defmodule Pleroma.HTTP.Connection do
@options [ @options [
connect_timeout: 10_000, connect_timeout: 10_000,
timeout: 20_000, timeout: 20_000,
pool: :federation, pool: :federation
version: :master
] ]
require Logger require Logger
@ -61,7 +60,7 @@ defmodule Pleroma.HTTP.Connection do
end end
defp get_conn_for_gun(url, options, pool) do defp get_conn_for_gun(url, options, pool) do
case Pleroma.Gun.Connections.get_conn(url, options, pool) do case Pleroma.Gun.Connections.checkin(url, options, pool) do
nil -> nil ->
options options

View File

@ -45,6 +45,7 @@ defmodule Pleroma.HTTP do
params = Keyword.get(options, :params, []) params = Keyword.get(options, :params, [])
request =
%{} %{}
|> Builder.method(method) |> Builder.method(method)
|> Builder.url(url) |> Builder.url(url)
@ -53,7 +54,18 @@ defmodule Pleroma.HTTP do
|> Builder.add_param(:body, :body, body) |> Builder.add_param(:body, :body, body)
|> Builder.add_param(:query, :query, params) |> Builder.add_param(:query, :query, params)
|> Enum.into([]) |> Enum.into([])
|> (&Tesla.request(Connection.new(options), &1)).()
client = Connection.new(options)
response = Tesla.request(client, request)
if adapter_gun? do
%{adapter: {_, _, [adapter_options]}} = client
pool = adapter_options[:pool]
Pleroma.Gun.Connections.checkout(adapter_options[:conn], self(), pool)
Pleroma.Gun.Connections.process_queue(pool)
end
response
rescue rescue
e -> e ->
{:error, e} {:error, e}

View File

@ -34,12 +34,26 @@ defmodule Gun.ConnectionsTest do
end end
test "opens connection and reuse it on next request", %{name: name, pid: pid} do test "opens connection and reuse it on next request", %{name: name, pid: pid} do
conn = Connections.get_conn("http://some-domain.com", [genserver_pid: pid], name) conn = Connections.checkin("http://some-domain.com", [genserver_pid: pid], name)
assert is_pid(conn) assert is_pid(conn)
assert Process.alive?(conn) assert Process.alive?(conn)
reused_conn = Connections.get_conn("http://some-domain.com", [genserver_pid: pid], name) self = self()
%Connections{
conns: %{
"http:some-domain.com:80" => %Conn{
conn: ^conn,
gun_state: :up,
waiting_pids: [],
used_by: [{^self, _}],
conn_state: :active
}
}
} = Connections.get_state(name)
reused_conn = Connections.checkin("http://some-domain.com", [genserver_pid: pid], name)
assert conn == reused_conn assert conn == reused_conn
@ -47,23 +61,53 @@ defmodule Gun.ConnectionsTest do
conns: %{ conns: %{
"http:some-domain.com:80" => %Conn{ "http:some-domain.com:80" => %Conn{
conn: ^conn, conn: ^conn,
state: :up, gun_state: :up,
waiting_pids: [] waiting_pids: [],
used_by: [{^self, _}, {^self, _}],
conn_state: :active
}
}
} = Connections.get_state(name)
:ok = Connections.checkout(conn, self, name)
%Connections{
conns: %{
"http:some-domain.com:80" => %Conn{
conn: ^conn,
gun_state: :up,
waiting_pids: [],
used_by: [{^self, _}],
conn_state: :active
}
}
} = Connections.get_state(name)
:ok = Connections.checkout(conn, self, name)
%Connections{
conns: %{
"http:some-domain.com:80" => %Conn{
conn: ^conn,
gun_state: :up,
waiting_pids: [],
used_by: [],
conn_state: :idle
} }
} }
} = Connections.get_state(name) } = Connections.get_state(name)
end end
test "reuses connection based on protocol", %{name: name, pid: pid} do test "reuses connection based on protocol", %{name: name, pid: pid} do
conn = Connections.get_conn("http://some-domain.com", [genserver_pid: pid], name) conn = Connections.checkin("http://some-domain.com", [genserver_pid: pid], name)
assert is_pid(conn) assert is_pid(conn)
assert Process.alive?(conn) assert Process.alive?(conn)
https_conn = Connections.get_conn("https://some-domain.com", [genserver_pid: pid], name) https_conn = Connections.checkin("https://some-domain.com", [genserver_pid: pid], name)
refute conn == https_conn refute conn == https_conn
reused_https = Connections.get_conn("https://some-domain.com", [genserver_pid: pid], name) reused_https = Connections.checkin("https://some-domain.com", [genserver_pid: pid], name)
refute conn == reused_https refute conn == reused_https
@ -73,12 +117,12 @@ defmodule Gun.ConnectionsTest do
conns: %{ conns: %{
"http:some-domain.com:80" => %Conn{ "http:some-domain.com:80" => %Conn{
conn: ^conn, conn: ^conn,
state: :up, gun_state: :up,
waiting_pids: [] waiting_pids: []
}, },
"https:some-domain.com:443" => %Conn{ "https:some-domain.com:443" => %Conn{
conn: ^https_conn, conn: ^https_conn,
state: :up, gun_state: :up,
waiting_pids: [] waiting_pids: []
} }
} }
@ -86,7 +130,7 @@ defmodule Gun.ConnectionsTest do
end end
test "process gun_down message", %{name: name, pid: pid} do test "process gun_down message", %{name: name, pid: pid} do
conn = Connections.get_conn("http://gun_down.com", [genserver_pid: pid], name) conn = Connections.checkin("http://gun_down.com", [genserver_pid: pid], name)
refute conn refute conn
@ -94,7 +138,7 @@ defmodule Gun.ConnectionsTest do
conns: %{ conns: %{
"http:gun_down.com:80" => %Conn{ "http:gun_down.com:80" => %Conn{
conn: _, conn: _,
state: :down, gun_state: :down,
waiting_pids: _ waiting_pids: _
} }
} }
@ -102,7 +146,7 @@ defmodule Gun.ConnectionsTest do
end end
test "process gun_down message and then gun_up", %{name: name, pid: pid} do test "process gun_down message and then gun_up", %{name: name, pid: pid} do
conn = Connections.get_conn("http://gun_down_and_up.com", [genserver_pid: pid], name) conn = Connections.checkin("http://gun_down_and_up.com", [genserver_pid: pid], name)
refute conn refute conn
@ -110,13 +154,13 @@ defmodule Gun.ConnectionsTest do
conns: %{ conns: %{
"http:gun_down_and_up.com:80" => %Conn{ "http:gun_down_and_up.com:80" => %Conn{
conn: _, conn: _,
state: :down, gun_state: :down,
waiting_pids: _ waiting_pids: _
} }
} }
} = Connections.get_state(name) } = Connections.get_state(name)
conn = Connections.get_conn("http://gun_down_and_up.com", [genserver_pid: pid], name) conn = Connections.checkin("http://gun_down_and_up.com", [genserver_pid: pid], name)
assert is_pid(conn) assert is_pid(conn)
assert Process.alive?(conn) assert Process.alive?(conn)
@ -125,7 +169,7 @@ defmodule Gun.ConnectionsTest do
conns: %{ conns: %{
"http:gun_down_and_up.com:80" => %Conn{ "http:gun_down_and_up.com:80" => %Conn{
conn: _, conn: _,
state: :up, gun_state: :up,
waiting_pids: [] waiting_pids: []
} }
} }
@ -136,7 +180,7 @@ defmodule Gun.ConnectionsTest do
tasks = tasks =
for _ <- 1..5 do for _ <- 1..5 do
Task.async(fn -> Task.async(fn ->
Connections.get_conn("http://some-domain.com", [genserver_pid: pid], name) Connections.checkin("http://some-domain.com", [genserver_pid: pid], name)
end) end)
end end
@ -153,7 +197,7 @@ defmodule Gun.ConnectionsTest do
conns: %{ conns: %{
"http:some-domain.com:80" => %Conn{ "http:some-domain.com:80" => %Conn{
conn: conn, conn: conn,
state: :up, gun_state: :up,
waiting_pids: [] waiting_pids: []
} }
} }
@ -162,41 +206,49 @@ defmodule Gun.ConnectionsTest do
assert Enum.all?(conns, fn res -> res == conn end) assert Enum.all?(conns, fn res -> res == conn end)
end end
test "remove frequently used", %{name: name, pid: pid} do test "remove frequently used and idle", %{name: name, pid: pid} do
Connections.get_conn("https://some-domain.com", [genserver_pid: pid], name) self = self()
conn1 = Connections.checkin("https://some-domain.com", [genserver_pid: pid], name)
[conn2 | _conns] =
for _ <- 1..4 do for _ <- 1..4 do
Connections.get_conn("http://some-domain.com", [genserver_pid: pid], name) Connections.checkin("http://some-domain.com", [genserver_pid: pid], name)
end end
%Connections{ %Connections{
conns: %{ conns: %{
"http:some-domain.com:80" => %Conn{ "http:some-domain.com:80" => %Conn{
conn: _, conn: ^conn2,
state: :up, gun_state: :up,
waiting_pids: [] waiting_pids: [],
conn_state: :active,
used_by: [{^self, _}, {^self, _}, {^self, _}, {^self, _}]
}, },
"https:some-domain.com:443" => %Conn{ "https:some-domain.com:443" => %Conn{
conn: _, conn: ^conn1,
state: :up, gun_state: :up,
waiting_pids: [] waiting_pids: [],
conn_state: :active,
used_by: [{^self, _}]
} }
}, },
opts: [max_connections: 2, timeout: 10] opts: [max_connections: 2, timeout: 10]
} = Connections.get_state(name) } = Connections.get_state(name)
conn = Connections.get_conn("http://another-domain.com", [genserver_pid: pid], name) :ok = Connections.checkout(conn1, self, name)
conn = Connections.checkin("http://another-domain.com", [genserver_pid: pid], name)
%Connections{ %Connections{
conns: %{ conns: %{
"http:another-domain.com:80" => %Conn{ "http:another-domain.com:80" => %Conn{
conn: ^conn, conn: ^conn,
state: :up, gun_state: :up,
waiting_pids: [] waiting_pids: []
}, },
"http:some-domain.com:80" => %Conn{ "http:some-domain.com:80" => %Conn{
conn: _, conn: _,
state: :up, gun_state: :up,
waiting_pids: [] waiting_pids: []
} }
}, },
@ -211,12 +263,12 @@ defmodule Gun.ConnectionsTest do
api = Pleroma.Config.get([API]) api = Pleroma.Config.get([API])
Pleroma.Config.put([API], API.Gun) Pleroma.Config.put([API], API.Gun)
on_exit(fn -> Pleroma.Config.put([API], api) end) on_exit(fn -> Pleroma.Config.put([API], api) end)
conn = Connections.get_conn("http://httpbin.org", [], name) conn = Connections.checkin("http://httpbin.org", [], name)
assert is_pid(conn) assert is_pid(conn)
assert Process.alive?(conn) assert Process.alive?(conn)
reused_conn = Connections.get_conn("http://httpbin.org", [], name) reused_conn = Connections.checkin("http://httpbin.org", [], name)
assert conn == reused_conn assert conn == reused_conn
@ -224,7 +276,7 @@ defmodule Gun.ConnectionsTest do
conns: %{ conns: %{
"http:httpbin.org:80" => %Conn{ "http:httpbin.org:80" => %Conn{
conn: ^conn, conn: ^conn,
state: :up, gun_state: :up,
waiting_pids: [] waiting_pids: []
} }
} }
@ -235,12 +287,12 @@ defmodule Gun.ConnectionsTest do
api = Pleroma.Config.get([API]) api = Pleroma.Config.get([API])
Pleroma.Config.put([API], API.Gun) Pleroma.Config.put([API], API.Gun)
on_exit(fn -> Pleroma.Config.put([API], api) end) on_exit(fn -> Pleroma.Config.put([API], api) end)
conn = Connections.get_conn("https://httpbin.org", [], name) conn = Connections.checkin("https://httpbin.org", [], name)
assert is_pid(conn) assert is_pid(conn)
assert Process.alive?(conn) assert Process.alive?(conn)
reused_conn = Connections.get_conn("https://httpbin.org", [], name) reused_conn = Connections.checkin("https://httpbin.org", [], name)
assert conn == reused_conn assert conn == reused_conn
@ -248,52 +300,54 @@ defmodule Gun.ConnectionsTest do
conns: %{ conns: %{
"https:httpbin.org:443" => %Conn{ "https:httpbin.org:443" => %Conn{
conn: ^conn, conn: ^conn,
state: :up, gun_state: :up,
waiting_pids: [] waiting_pids: []
} }
} }
} = Connections.get_state(name) } = Connections.get_state(name)
end end
test "remove frequently used", %{name: name, pid: pid} do test "remove frequently used and idle", %{name: name, pid: pid} do
self = self()
api = Pleroma.Config.get([API]) api = Pleroma.Config.get([API])
Pleroma.Config.put([API], API.Gun) Pleroma.Config.put([API], API.Gun)
on_exit(fn -> Pleroma.Config.put([API], api) end) on_exit(fn -> Pleroma.Config.put([API], api) end)
Connections.get_conn("https://www.google.com", [genserver_pid: pid], name) conn = Connections.checkin("https://www.google.com", [genserver_pid: pid], name)
for _ <- 1..4 do for _ <- 1..4 do
Connections.get_conn("https://httpbin.org", [genserver_pid: pid], name) Connections.checkin("https://httpbin.org", [genserver_pid: pid], name)
end end
%Connections{ %Connections{
conns: %{ conns: %{
"https:httpbin.org:443" => %Conn{ "https:httpbin.org:443" => %Conn{
conn: _, conn: _,
state: :up, gun_state: :up,
waiting_pids: [] waiting_pids: []
}, },
"https:www.google.com:443" => %Conn{ "https:www.google.com:443" => %Conn{
conn: _, conn: _,
state: :up, gun_state: :up,
waiting_pids: [] waiting_pids: []
} }
}, },
opts: [max_connections: 2, timeout: 10] opts: [max_connections: 2, timeout: 10]
} = Connections.get_state(name) } = Connections.get_state(name)
conn = Connections.get_conn("http://httpbin.org", [genserver_pid: pid], name) :ok = Connections.checkout(conn, self, name)
conn = Connections.checkin("http://httpbin.org", [genserver_pid: pid], name)
%Connections{ %Connections{
conns: %{ conns: %{
"http:httpbin.org:80" => %Conn{ "http:httpbin.org:80" => %Conn{
conn: ^conn, conn: ^conn,
state: :up, gun_state: :up,
waiting_pids: [] waiting_pids: []
}, },
"https:httpbin.org:443" => %Conn{ "https:httpbin.org:443" => %Conn{
conn: _, conn: _,
state: :up, gun_state: :up,
waiting_pids: [] waiting_pids: []
} }
}, },
@ -301,59 +355,173 @@ defmodule Gun.ConnectionsTest do
} = Connections.get_state(name) } = Connections.get_state(name)
end end
test "remove earlier used", %{name: name, pid: pid} do test "remove earlier used and idle", %{name: name, pid: pid} do
self = self()
api = Pleroma.Config.get([API]) api = Pleroma.Config.get([API])
Pleroma.Config.put([API], API.Gun) Pleroma.Config.put([API], API.Gun)
on_exit(fn -> Pleroma.Config.put([API], api) end) on_exit(fn -> Pleroma.Config.put([API], api) end)
Connections.get_conn("https://www.google.com", [genserver_pid: pid], name) Connections.checkin("https://www.google.com", [genserver_pid: pid], name)
Connections.get_conn("https://www.google.com", [genserver_pid: pid], name) conn = Connections.checkin("https://www.google.com", [genserver_pid: pid], name)
Process.sleep(1_000) Process.sleep(1_000)
Connections.get_conn("https://httpbin.org", [genserver_pid: pid], name) Connections.checkin("https://httpbin.org", [genserver_pid: pid], name)
Connections.get_conn("https://httpbin.org", [genserver_pid: pid], name) Connections.checkin("https://httpbin.org", [genserver_pid: pid], name)
%Connections{ %Connections{
conns: %{ conns: %{
"https:httpbin.org:443" => %Conn{ "https:httpbin.org:443" => %Conn{
conn: _, conn: _,
state: :up, gun_state: :up,
waiting_pids: [] waiting_pids: []
}, },
"https:www.google.com:443" => %Conn{ "https:www.google.com:443" => %Conn{
conn: _, conn: ^conn,
state: :up, gun_state: :up,
waiting_pids: [] waiting_pids: []
} }
}, },
opts: [max_connections: 2, timeout: 10] opts: [max_connections: 2, timeout: 10]
} = Connections.get_state(name) } = Connections.get_state(name)
:ok = Connections.checkout(conn, self, name)
:ok = Connections.checkout(conn, self, name)
Process.sleep(1_000) Process.sleep(1_000)
conn = Connections.get_conn("http://httpbin.org", [genserver_pid: pid], name) conn = Connections.checkin("http://httpbin.org", [genserver_pid: pid], name)
%Connections{ %Connections{
conns: %{ conns: %{
"http:httpbin.org:80" => %Conn{ "http:httpbin.org:80" => %Conn{
conn: ^conn, conn: ^conn,
state: :up, gun_state: :up,
waiting_pids: [] waiting_pids: []
}, },
"https:httpbin.org:443" => %Conn{ "https:httpbin.org:443" => %Conn{
conn: _, conn: _,
state: :up, gun_state: :up,
waiting_pids: [] waiting_pids: []
} }
}, },
opts: [max_connections: 2, timeout: 10] opts: [max_connections: 2, timeout: 10]
} = Connections.get_state(name) } = Connections.get_state(name)
end end
test "doesn't drop active connections on pool overflow addinng new requests to the queue", %{
name: name,
pid: pid
} do
api = Pleroma.Config.get([API])
Pleroma.Config.put([API], API.Gun)
on_exit(fn -> Pleroma.Config.put([API], api) end)
self = self()
Connections.checkin("https://www.google.com", [genserver_pid: pid], name)
conn1 = Connections.checkin("https://www.google.com", [genserver_pid: pid], name)
conn2 = Connections.checkin("https://httpbin.org", [genserver_pid: pid], name)
%Connections{
conns: %{
"https:httpbin.org:443" => %Conn{
conn: ^conn2,
gun_state: :up,
waiting_pids: [],
conn_state: :active,
used_by: [{^self, _}]
},
"https:www.google.com:443" => %Conn{
conn: ^conn1,
gun_state: :up,
waiting_pids: [],
conn_state: :active,
used_by: [{^self, _}, {^self, _}]
}
},
opts: [max_connections: 2, timeout: 10]
} = Connections.get_state(name)
task =
Task.async(fn -> Connections.checkin("http://httpbin.org", [genserver_pid: pid], name) end)
task_pid = task.pid
:ok = Connections.checkout(conn1, self, name)
Process.sleep(1_000)
%Connections{
conns: %{
"https:httpbin.org:443" => %Conn{
conn: ^conn2,
gun_state: :up,
waiting_pids: [],
conn_state: :active,
used_by: [{^self, _}]
},
"https:www.google.com:443" => %Conn{
conn: ^conn1,
gun_state: :up,
waiting_pids: [],
conn_state: :active,
used_by: [{^self, _}]
}
},
queue: [{{^task_pid, _}, "http:httpbin.org:80", _, _}],
opts: [max_connections: 2, timeout: 10]
} = Connections.get_state(name)
:ok = Connections.checkout(conn1, self, name)
%Connections{
conns: %{
"https:httpbin.org:443" => %Conn{
conn: ^conn2,
gun_state: :up,
waiting_pids: [],
conn_state: :active,
used_by: [{^self, _}]
},
"https:www.google.com:443" => %Conn{
conn: ^conn1,
gun_state: :up,
waiting_pids: [],
conn_state: :idle,
used_by: []
}
},
queue: [{{^task_pid, _}, "http:httpbin.org:80", _, _}],
opts: [max_connections: 2, timeout: 10]
} = Connections.get_state(name)
:ok = Connections.process_queue(name)
conn = Task.await(task)
%Connections{
conns: %{
"https:httpbin.org:443" => %Conn{
conn: ^conn2,
gun_state: :up,
waiting_pids: [],
conn_state: :active,
used_by: [{^self, _}]
},
"http:httpbin.org:80" => %Conn{
conn: ^conn,
gun_state: :up,
waiting_pids: [],
conn_state: :active,
used_by: [{^task_pid, _}]
}
},
queue: [],
opts: [max_connections: 2, timeout: 10]
} = Connections.get_state(name)
end
end end
describe "with proxy usage" do describe "with proxy usage" do
test "proxy as ip", %{name: name, pid: pid} do test "proxy as ip", %{name: name, pid: pid} do
conn = conn =
Connections.get_conn( Connections.checkin(
"http://proxy_string.com", "http://proxy_string.com",
[genserver_pid: pid, proxy: {{127, 0, 0, 1}, 8123}], [genserver_pid: pid, proxy: {{127, 0, 0, 1}, 8123}],
name name
@ -363,7 +531,7 @@ defmodule Gun.ConnectionsTest do
conns: %{ conns: %{
"http:proxy_string.com:80" => %Conn{ "http:proxy_string.com:80" => %Conn{
conn: ^conn, conn: ^conn,
state: :up, gun_state: :up,
waiting_pids: [] waiting_pids: []
} }
}, },
@ -371,7 +539,7 @@ defmodule Gun.ConnectionsTest do
} = Connections.get_state(name) } = Connections.get_state(name)
reused_conn = reused_conn =
Connections.get_conn( Connections.checkin(
"http://proxy_string.com", "http://proxy_string.com",
[genserver_pid: pid, proxy: {{127, 0, 0, 1}, 8123}], [genserver_pid: pid, proxy: {{127, 0, 0, 1}, 8123}],
name name
@ -382,7 +550,7 @@ defmodule Gun.ConnectionsTest do
test "proxy as host", %{name: name, pid: pid} do test "proxy as host", %{name: name, pid: pid} do
conn = conn =
Connections.get_conn( Connections.checkin(
"http://proxy_tuple_atom.com", "http://proxy_tuple_atom.com",
[genserver_pid: pid, proxy: {'localhost', 9050}], [genserver_pid: pid, proxy: {'localhost', 9050}],
name name
@ -392,7 +560,7 @@ defmodule Gun.ConnectionsTest do
conns: %{ conns: %{
"http:proxy_tuple_atom.com:80" => %Conn{ "http:proxy_tuple_atom.com:80" => %Conn{
conn: ^conn, conn: ^conn,
state: :up, gun_state: :up,
waiting_pids: [] waiting_pids: []
} }
}, },
@ -400,7 +568,7 @@ defmodule Gun.ConnectionsTest do
} = Connections.get_state(name) } = Connections.get_state(name)
reused_conn = reused_conn =
Connections.get_conn( Connections.checkin(
"http://proxy_tuple_atom.com", "http://proxy_tuple_atom.com",
[genserver_pid: pid, proxy: {'localhost', 9050}], [genserver_pid: pid, proxy: {'localhost', 9050}],
name name
@ -411,7 +579,7 @@ defmodule Gun.ConnectionsTest do
test "proxy as ip and ssl", %{name: name, pid: pid} do test "proxy as ip and ssl", %{name: name, pid: pid} do
conn = conn =
Connections.get_conn( Connections.checkin(
"https://proxy_string.com", "https://proxy_string.com",
[genserver_pid: pid, proxy: {{127, 0, 0, 1}, 8123}], [genserver_pid: pid, proxy: {{127, 0, 0, 1}, 8123}],
name name
@ -421,7 +589,7 @@ defmodule Gun.ConnectionsTest do
conns: %{ conns: %{
"https:proxy_string.com:443" => %Conn{ "https:proxy_string.com:443" => %Conn{
conn: ^conn, conn: ^conn,
state: :up, gun_state: :up,
waiting_pids: [] waiting_pids: []
} }
}, },
@ -429,7 +597,7 @@ defmodule Gun.ConnectionsTest do
} = Connections.get_state(name) } = Connections.get_state(name)
reused_conn = reused_conn =
Connections.get_conn( Connections.checkin(
"https://proxy_string.com", "https://proxy_string.com",
[genserver_pid: pid, proxy: {{127, 0, 0, 1}, 8123}], [genserver_pid: pid, proxy: {{127, 0, 0, 1}, 8123}],
name name
@ -440,7 +608,7 @@ defmodule Gun.ConnectionsTest do
test "proxy as host and ssl", %{name: name, pid: pid} do test "proxy as host and ssl", %{name: name, pid: pid} do
conn = conn =
Connections.get_conn( Connections.checkin(
"https://proxy_tuple_atom.com", "https://proxy_tuple_atom.com",
[genserver_pid: pid, proxy: {'localhost', 9050}], [genserver_pid: pid, proxy: {'localhost', 9050}],
name name
@ -450,7 +618,7 @@ defmodule Gun.ConnectionsTest do
conns: %{ conns: %{
"https:proxy_tuple_atom.com:443" => %Conn{ "https:proxy_tuple_atom.com:443" => %Conn{
conn: ^conn, conn: ^conn,
state: :up, gun_state: :up,
waiting_pids: [] waiting_pids: []
} }
}, },
@ -458,7 +626,7 @@ defmodule Gun.ConnectionsTest do
} = Connections.get_state(name) } = Connections.get_state(name)
reused_conn = reused_conn =
Connections.get_conn( Connections.checkin(
"https://proxy_tuple_atom.com", "https://proxy_tuple_atom.com",
[genserver_pid: pid, proxy: {'localhost', 9050}], [genserver_pid: pid, proxy: {'localhost', 9050}],
name name

View File

@ -71,11 +71,18 @@ defmodule Pleroma.HTTPTest do
options = [adapter: [pool: :federation]] options = [adapter: [pool: :federation]]
assert {:ok, resp} = assert {:ok, resp} = Pleroma.HTTP.get("https://httpbin.org/user-agent", [], options)
Pleroma.HTTP.request(:get, "https://httpbin.org/user-agent", "", [], options)
adapter_opts = resp.opts[:adapter] adapter_opts = resp.opts[:adapter]
assert resp.status == 200
assert adapter_opts[:url] == "https://httpbin.org/user-agent" assert adapter_opts[:url] == "https://httpbin.org/user-agent"
state = Pleroma.Gun.Connections.get_state(:federation)
conn = state.conns["https:httpbin.org:443"]
assert conn.conn_state == :idle
assert conn.used_by == []
assert state.queue == []
end end
end end