added gun connections genserver

This commit is contained in:
Alex S 2019-08-13 14:37:19 +03:00 committed by Ariadne Conill
parent 26691b1b35
commit 2469061657
12 changed files with 393 additions and 7 deletions

View File

@ -85,6 +85,8 @@ config :joken, default_signer: "yU8uHKq+yyAkZ11Hx//jcdacWc8yQ1bxAAGrplzB0Zwwjkp3
config :pleroma, Pleroma.ReverseProxy.Client, Pleroma.ReverseProxy.ClientMock
config :pleroma, Pleroma.Gun.API, Pleroma.Gun.API.Mock
try do
import_config "test.secret.exs"
rescue

View File

@ -0,0 +1,15 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Gun.API do
@callback open(charlist(), pos_integer(), map()) :: {:ok, pid()}
def open(host, port, opts) do
api().open(host, port, opts)
end
defp api do
Pleroma.Config.get([Pleroma.Gun.API], :gun)
end
end

View File

@ -0,0 +1,40 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Gun.API.Mock do
@behaviour Pleroma.Gun.API
@impl Pleroma.Gun.API
def open('some-domain.com', 80, %{genserver_pid: genserver_pid}) do
{:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end)
send(genserver_pid, {:gun_up, conn_pid, :http})
{:ok, conn_pid}
end
def open('some-domain.com', 443, %{genserver_pid: genserver_pid}) do
{:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end)
send(genserver_pid, {:gun_up, conn_pid, :https})
{:ok, conn_pid}
end
@impl Pleroma.Gun.API
def open('gun_down.com', _port, %{genserver_pid: genserver_pid}) do
{:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end)
send(genserver_pid, {:gun_down, conn_pid, :http, nil, nil, nil})
{:ok, conn_pid}
end
@impl Pleroma.Gun.API
def open('gun_down_and_up.com', _port, %{genserver_pid: genserver_pid}) do
{:ok, conn_pid} = Task.start_link(fn -> Process.sleep(1_000) end)
send(genserver_pid, {:gun_down, conn_pid, :http, nil, nil, nil})
{:ok, _} =
Task.start_link(fn ->
Process.sleep(500)
send(genserver_pid, {:gun_up, conn_pid, :http})
end)
{:ok, conn_pid}
end
end

17
lib/pleroma/gun/conn.ex Normal file
View File

@ -0,0 +1,17 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Gun.Conn do
@moduledoc """
Struct for gun connection data
"""
@type t :: %__MODULE__{
conn: pid(),
state: atom(),
waiting_pids: [pid()],
protocol: atom()
}
defstruct conn: nil, state: :open, waiting_pids: [], protocol: :http
end

View File

@ -0,0 +1,104 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Gun.Connections do
use GenServer
@type domain :: String.t()
@type conn :: Gun.Conn.t()
@type t :: %__MODULE__{
conns: %{domain() => conn()}
}
defstruct conns: %{}
def start_link(name \\ __MODULE__) do
if Application.get_env(:tesla, :adapter) == Tesla.Adapter.Gun do
GenServer.start_link(__MODULE__, [], name: name)
else
:ignore
end
end
@impl true
def init(_) do
{:ok, %__MODULE__{conns: %{}}}
end
@spec get_conn(atom(), String.t(), keyword()) :: pid()
def get_conn(name \\ __MODULE__, url, opts \\ []) do
opts = Enum.into(opts, %{})
uri = URI.parse(url)
opts = if uri.scheme == "https", do: Map.put(opts, :transport, :tls), else: opts
GenServer.call(
name,
{:conn, %{opts: opts, uri: uri}}
)
end
@spec get_state(atom()) :: t()
def get_state(name \\ __MODULE__) do
GenServer.call(name, {:state})
end
@impl true
def handle_call({:conn, %{opts: opts, uri: uri}}, from, state) do
key = compose_key(uri)
case state.conns[key] do
%{conn: conn, state: conn_state} when conn_state == :up ->
{:reply, conn, state}
%{state: conn_state, waiting_pids: pids} when conn_state in [:open, :down] ->
state = put_in(state.conns[key].waiting_pids, [from | pids])
{:noreply, state}
nil ->
{:ok, conn} = Pleroma.Gun.API.open(to_charlist(uri.host), uri.port, opts)
state =
put_in(state.conns[key], %Pleroma.Gun.Conn{
conn: conn,
waiting_pids: [from],
protocol: String.to_atom(uri.scheme)
})
{:noreply, state}
end
end
@impl true
def handle_call({:state}, _from, state), do: {:reply, state, state}
@impl true
def handle_info({:gun_up, conn_pid, protocol}, state) do
{key, conn} = find_conn(state.conns, conn_pid, protocol)
# Send to all waiting processes connection pid
Enum.each(conn.waiting_pids, fn waiting_pid -> GenServer.reply(waiting_pid, conn_pid) end)
# Update state of the current connection and set waiting_pids to empty list
state = put_in(state.conns[key], %{conn | state: :up, waiting_pids: []})
{:noreply, state}
end
@impl true
# Do we need to do something with killed & unprocessed references?
def handle_info({:gun_down, conn_pid, protocol, _reason, _killed, _unprocessed}, state) do
{key, conn} = find_conn(state.conns, conn_pid, protocol)
# We don't want to block requests to GenServer if gun send down message, return nil, so we can make some retries, while connection is not up
Enum.each(conn.waiting_pids, fn waiting_pid -> GenServer.reply(waiting_pid, nil) end)
state = put_in(state.conns[key].state, :down)
{:noreply, state}
end
defp compose_key(uri), do: uri.host <> ":" <> to_string(uri.port)
defp find_conn(conns, conn_pid, protocol),
do: Enum.find(conns, fn {_, conn} -> conn.conn == conn_pid and conn.protocol == protocol end)
end

View File

@ -9,7 +9,6 @@ defmodule Pleroma.HTTP.Connection do
@options [
connect_timeout: 10_000,
protocols: [:http],
timeout: 20_000
]

View File

@ -28,6 +28,6 @@ defmodule Pleroma.ReverseProxy.Client do
def close(ref), do: client().close(ref)
defp client do
Pleroma.Config.get([Pleroma.ReverseProxy.Client], Pleroma.ReverseProxy.Client.Hackney)
Pleroma.Config.get([Pleroma.ReverseProxy.Client], Pleroma.ReverseProxy.Client.Tesla)
end
end

View File

@ -1,3 +1,7 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-onl
defmodule Pleroma.ReverseProxy.Client.Tesla do
@behaviour Pleroma.ReverseProxy.Client
@ -6,7 +10,7 @@ defmodule Pleroma.ReverseProxy.Client.Tesla do
def request(method, url, headers, body, opts \\ []) do
adapter_opts =
Keyword.get(opts, :adapter, [])
|> Keyword.put(:chunks_response, true)
|> Keyword.put(:body_as, :chunks)
with {:ok, response} <-
Pleroma.HTTP.request(
@ -44,13 +48,13 @@ defmodule Pleroma.ReverseProxy.Client.Tesla do
adapter.read_chunk(pid, stream, opts)
end
def close(client) do
def close(pid) do
adapter = Application.get_env(:tesla, :adapter)
unless adapter in @adapters do
raise "#{adapter} doesn't support closing connection"
end
adapter.close(client)
adapter.close(pid)
end
end

View File

@ -113,7 +113,7 @@ defmodule Pleroma.Mixfile do
{:poison, "~> 3.0", override: true},
{:tesla,
github: "alex-strizhakov/tesla",
ref: "beb8927358dfaa66ecd458df607befde12dd56e0",
ref: "c29a7fd030fa6decbf7091152f563fe322e2b589",
override: true},
{:cowlib, "~> 2.6.0", override: true},
{:gun, "~> 1.3"},

View File

@ -85,7 +85,7 @@
"swoosh": {:hex, :swoosh, "0.23.2", "7dda95ff0bf54a2298328d6899c74dae1223777b43563ccebebb4b5d2b61df38", [:mix], [{:cowboy, "~> 1.0.1 or ~> 1.1 or ~> 2.4", [hex: :cowboy, repo: "hexpm", optional: true]}, {:gen_smtp, "~> 0.13", [hex: :gen_smtp, repo: "hexpm", optional: true]}, {:hackney, "~> 1.9", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mail, "~> 0.2", [hex: :mail, repo: "hexpm", optional: true]}, {:mime, "~> 1.1", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_cowboy, ">= 1.0.0", [hex: :plug_cowboy, repo: "hexpm", optional: true]}], "hexpm"},
"syslog": {:git, "https://github.com/Vagabond/erlang-syslog.git", "4a6c6f2c996483e86c1320e9553f91d337bcb6aa", [tag: "1.0.5"]},
"telemetry": {:hex, :telemetry, "0.4.0", "8339bee3fa8b91cb84d14c2935f8ecf399ccd87301ad6da6b71c09553834b2ab", [:rebar3], [], "hexpm"},
"tesla": {:git, "https://github.com/alex-strizhakov/tesla.git", "beb8927358dfaa66ecd458df607befde12dd56e0", [ref: "beb8927358dfaa66ecd458df607befde12dd56e0"]},
"tesla": {:git, "https://github.com/alex-strizhakov/tesla.git", "c29a7fd030fa6decbf7091152f563fe322e2b589", [ref: "c29a7fd030fa6decbf7091152f563fe322e2b589"]},
"timex": {:hex, :timex, "3.6.1", "efdf56d0e67a6b956cc57774353b0329c8ab7726766a11547e529357ffdc1d56", [:mix], [{:combine, "~> 0.10", [hex: :combine, repo: "hexpm", optional: false]}, {:gettext, "~> 0.10", [hex: :gettext, repo: "hexpm", optional: false]}, {:tzdata, "~> 0.1.8 or ~> 0.5 or ~> 1.0.0", [hex: :tzdata, repo: "hexpm", optional: false]}], "hexpm"},
"trailing_format_plug": {:hex, :trailing_format_plug, "0.0.7", "64b877f912cf7273bed03379936df39894149e35137ac9509117e59866e10e45", [:mix], [{:plug, "> 0.12.0", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm"},
"tzdata": {:hex, :tzdata, "0.5.21", "8cbf3607fcce69636c672d5be2bbb08687fe26639a62bdcc283d267277db7cf0", [:mix], [{:hackney, "~> 1.0", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm"},

View File

@ -0,0 +1,172 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Gun.ConnectionsTest do
use ExUnit.Case, async: true
alias Pleroma.Gun.{Connections, Conn, API}
setup do
name = :test_gun_connections
{:ok, pid} = Connections.start_link(name)
{:ok, name: name, pid: pid}
end
test "opens connection and reuse it on next request", %{name: name, pid: pid} do
conn = Connections.get_conn(name, "http://some-domain.com", genserver_pid: pid)
assert is_pid(conn)
assert Process.alive?(conn)
reused_conn = Connections.get_conn(name, "http://some-domain.com", genserver_pid: pid)
assert conn == reused_conn
%Connections{
conns: %{
"some-domain.com:80" => %Conn{
conn: ^conn,
state: :up,
waiting_pids: []
}
}
} = Connections.get_state(name)
end
test "reuses connection based on protocol", %{name: name, pid: pid} do
conn = Connections.get_conn(name, "http://some-domain.com", genserver_pid: pid)
assert is_pid(conn)
assert Process.alive?(conn)
https_conn = Connections.get_conn(name, "https://some-domain.com", genserver_pid: pid)
refute conn == https_conn
reused_https = Connections.get_conn(name, "https://some-domain.com", genserver_pid: pid)
refute conn == reused_https
assert reused_https == https_conn
%Connections{
conns: %{
"some-domain.com:80" => %Conn{
conn: ^conn,
state: :up,
waiting_pids: []
},
"some-domain.com:443" => %Conn{
conn: ^https_conn,
state: :up,
waiting_pids: []
}
}
} = Connections.get_state(name)
end
test "process gun_down message", %{name: name, pid: pid} do
conn = Connections.get_conn(name, "http://gun_down.com", genserver_pid: pid)
refute conn
%Connections{
conns: %{
"gun_down.com:80" => %Conn{
conn: _,
state: :down,
waiting_pids: _
}
}
} = Connections.get_state(name)
end
test "process gun_down message and then gun_up", %{name: name, pid: pid} do
conn = Connections.get_conn(name, "http://gun_down_and_up.com", genserver_pid: pid)
refute conn
%Connections{
conns: %{
"gun_down_and_up.com:80" => %Conn{
conn: _,
state: :down,
waiting_pids: _
}
}
} = Connections.get_state(name)
conn = Connections.get_conn(name, "http://gun_down_and_up.com", genserver_pid: pid)
assert is_pid(conn)
assert Process.alive?(conn)
%Connections{
conns: %{
"gun_down_and_up.com:80" => %Conn{
conn: _,
state: :up,
waiting_pids: []
}
}
} = Connections.get_state(name)
end
test "async processes get same conn for same domain", %{name: name, pid: pid} do
tasks =
for _ <- 1..5 do
Task.async(fn ->
Connections.get_conn(name, "http://some-domain.com", genserver_pid: pid)
end)
end
tasks_with_results = Task.yield_many(tasks)
results =
Enum.map(tasks_with_results, fn {task, res} ->
res || Task.shutdown(task, :brutal_kill)
end)
conns = for {:ok, value} <- results, do: value
%Connections{
conns: %{
"some-domain.com:80" => %Conn{
conn: conn,
state: :up,
waiting_pids: []
}
}
} = Connections.get_state(name)
assert Enum.all?(conns, fn res -> res == conn end)
end
describe "integration test" do
@describetag :integration
test "opens connection and reuse it on next request", %{name: name} do
api = Pleroma.Config.get([API])
Pleroma.Config.put([API], :gun)
on_exit(fn -> Pleroma.Config.put([API], api) end)
conn = Connections.get_conn(name, "http://httpbin.org")
assert is_pid(conn)
assert Process.alive?(conn)
reused_conn = Connections.get_conn(name, "http://httpbin.org")
assert conn == reused_conn
%Connections{
conns: %{
"httpbin.org:80" => %Conn{
conn: ^conn,
state: :up,
waiting_pids: []
}
}
} = Connections.get_state(name)
end
end
end

View File

@ -300,5 +300,38 @@ defmodule Pleroma.ReverseProxyTest do
describe "integration tests" do
@describetag :integration
test "with hackney client", %{conn: conn} do
client = Pleroma.Config.get([Pleroma.ReverseProxy.Client])
Pleroma.Config.put([Pleroma.ReverseProxy.Client], Pleroma.ReverseProxy.Client.Hackney)
on_exit(fn ->
Pleroma.Config.put([Pleroma.ReverseProxy.Client], client)
end)
conn = ReverseProxy.call(conn, "http://httpbin.org/stream-bytes/10")
assert byte_size(conn.resp_body) == 10
assert conn.state == :chunked
assert conn.status == 200
end
test "with tesla client with gun adapter", %{conn: conn} do
client = Pleroma.Config.get([Pleroma.ReverseProxy.Client])
Pleroma.Config.put([Pleroma.ReverseProxy.Client], Pleroma.ReverseProxy.Client.Tesla)
adapter = Application.get_env(:tesla, :adapter)
Application.put_env(:tesla, :adapter, Tesla.Adapter.Gun)
conn = ReverseProxy.call(conn, "http://httpbin.org/stream-bytes/10")
assert byte_size(conn.resp_body) == 10
assert conn.state == :chunked
assert conn.status == 200
on_exit(fn ->
Pleroma.Config.put([Pleroma.ReverseProxy.Client], client)
Application.put_env(:tesla, :adapter, adapter)
end)
end
end
end