diff --git a/lib/pleroma/http/connection.ex b/lib/pleroma/http/connection.ex index 8caf989a7..4ebe16e18 100644 --- a/lib/pleroma/http/connection.ex +++ b/lib/pleroma/http/connection.ex @@ -10,11 +10,7 @@ defmodule Pleroma.HTTP.Connection do @options [ connect_timeout: 10_000, protocols: [:http], - timeout: 20_000, - recv_timeout: 20_000, - follow_redirect: true, - force_redirect: true, - pool: :federation + timeout: 20_000 ] @adapter Application.get_env(:tesla, :adapter) diff --git a/lib/pleroma/reverse_proxy/client.ex b/lib/pleroma/reverse_proxy/client.ex index 776c4794c..42f2ff13b 100644 --- a/lib/pleroma/reverse_proxy/client.ex +++ b/lib/pleroma/reverse_proxy/client.ex @@ -3,9 +3,14 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.ReverseProxy.Client do - @callback request(atom(), String.t(), [tuple()], String.t(), list()) :: - {:ok, pos_integer(), [tuple()], reference() | map()} - | {:ok, pos_integer(), [tuple()]} + @type status :: pos_integer() + @type header_name :: String.t() + @type header_value :: String.t() + @type headers :: [{header_name(), header_value()}] + + @callback request(atom(), String.t(), headers(), String.t(), list()) :: + {:ok, status(), headers(), reference() | map()} + | {:ok, status(), headers()} | {:ok, reference()} | {:error, term()} @@ -14,8 +19,8 @@ defmodule Pleroma.ReverseProxy.Client do @callback close(reference() | pid() | map()) :: :ok - def request(method, url, headers, "", opts \\ []) do - client().request(method, url, headers, "", opts) + def request(method, url, headers, body \\ "", opts \\ []) do + client().request(method, url, headers, body, opts) end def stream_body(ref), do: client().stream_body(ref) @@ -23,6 +28,6 @@ defmodule Pleroma.ReverseProxy.Client do def close(ref), do: client().close(ref) defp client do - Pleroma.Config.get([Pleroma.ReverseProxy.Client], :hackney) + Pleroma.Config.get([Pleroma.ReverseProxy.Client], Pleroma.ReverseProxy.Client.Hackney) end end diff --git a/lib/pleroma/reverse_proxy/client/hackney.ex b/lib/pleroma/reverse_proxy/client/hackney.ex new file mode 100644 index 000000000..e6293646a --- /dev/null +++ b/lib/pleroma/reverse_proxy/client/hackney.ex @@ -0,0 +1,17 @@ +defmodule Pleroma.ReverseProxy.Client.Hackney do + @behaviour Pleroma.ReverseProxy.Client + + def request(method, url, headers, body, opts \\ []) do + :hackney.request(method, url, headers, body, opts) + end + + def stream_body(ref) do + case :hackney.stream_body(ref) do + :done -> :done + {:ok, data} -> {:ok, data, ref} + {:error, error} -> {:error, error} + end + end + + def close(ref), do: :hackney.close(ref) +end diff --git a/lib/pleroma/reverse_proxy/client/tesla.ex b/lib/pleroma/reverse_proxy/client/tesla.ex new file mode 100644 index 000000000..d2944b9dc --- /dev/null +++ b/lib/pleroma/reverse_proxy/client/tesla.ex @@ -0,0 +1,48 @@ +defmodule Pleroma.ReverseProxy.Client.Tesla do + @behaviour Pleroma.ReverseProxy.Client + + @adapters [Tesla.Adapter.Gun] + alias Pleroma.HTTP + + def request(method, url, headers, body, opts \\ []) do + adapter_opts = + Keyword.get(opts, :adapter, []) + |> Keyword.put(:chunks_response, true) + + with {:ok, response} <- + HTTP.request(method, url, body, headers, Keyword.put(opts, :adapter, adapter_opts)) do + {:ok, response.status, response.headers, response.body} + else + {:error, error} -> {:error, error} + end + end + + def stream_body(%{fin: true}), do: :done + + def stream_body(client) do + case read_chunk!(client) do + {:fin, body} -> {:ok, body, Map.put(client, :fin, true)} + {:nofin, part} -> {:ok, part, client} + end + end + + defp read_chunk!(client) do + adapter = Application.get_env(:tesla, :adapter) + + unless adapter in @adapters do + raise "#{adapter} doesn't support reading body in chunks" + end + + adapter.read_chunk(client) + end + + def close(client) do + adapter = Application.get_env(:tesla, :adapter) + + unless adapter in @adapters do + raise "#{adapter} doesn't support closing connection" + end + + adapter.close(client) + end +end diff --git a/lib/pleroma/reverse_proxy/reverse_proxy.ex b/lib/pleroma/reverse_proxy/reverse_proxy.ex index 3212bf90d..a2cdcf393 100644 --- a/lib/pleroma/reverse_proxy/reverse_proxy.ex +++ b/lib/pleroma/reverse_proxy/reverse_proxy.ex @@ -61,7 +61,7 @@ defmodule Pleroma.ReverseProxy do * `http`: options for [hackney](https://github.com/benoitc/hackney). """ - @default_hackney_options [pool: :media] + @default_options [pool: :media] @inline_content_types [ "image/gif", @@ -93,9 +93,9 @@ defmodule Pleroma.ReverseProxy do def call(_conn, _url, _opts \\ []) def call(conn = %{method: method}, url, opts) when method in @methods do - hackney_opts = + client_opts = Pleroma.HTTP.Connection.options([]) - |> Keyword.merge(@default_hackney_options) + |> Keyword.merge(@default_options) |> Keyword.merge(Keyword.get(opts, :http, [])) |> HTTP.process_request_options() @@ -108,7 +108,7 @@ defmodule Pleroma.ReverseProxy do opts end - with {:ok, code, headers, client} <- request(method, url, req_headers, hackney_opts), + with {:ok, code, headers, client} <- request(method, url, req_headers, client_opts), :ok <- header_length_constraint( headers, @@ -201,7 +201,7 @@ defmodule Pleroma.ReverseProxy do duration, Keyword.get(opts, :max_read_duration, @max_read_duration) ), - {:ok, data} <- client().stream_body(client), + {:ok, data, client} <- client().stream_body(client), {:ok, duration} <- increase_read_duration(duration), sent_so_far = sent_so_far + byte_size(data), :ok <- diff --git a/mix.exs b/mix.exs index 84d58f5a6..f1fbdc6b3 100644 --- a/mix.exs +++ b/mix.exs @@ -113,7 +113,7 @@ defmodule Pleroma.Mixfile do {:poison, "~> 3.0", override: true}, {:tesla, github: "alex-strizhakov/tesla", - ref: "9ad792fb630bdfc2266ed13b830c28b6552fb3f9", + ref: "beb8927358dfaa66ecd458df607befde12dd56e0", override: true}, {:cowlib, "~> 2.6.0", override: true}, {:gun, "~> 1.3"}, diff --git a/mix.lock b/mix.lock index d79bb9c80..635f20185 100644 --- a/mix.lock +++ b/mix.lock @@ -85,7 +85,7 @@ "swoosh": {:hex, :swoosh, "0.23.2", "7dda95ff0bf54a2298328d6899c74dae1223777b43563ccebebb4b5d2b61df38", [:mix], [{:cowboy, "~> 1.0.1 or ~> 1.1 or ~> 2.4", [hex: :cowboy, repo: "hexpm", optional: true]}, {:gen_smtp, "~> 0.13", [hex: :gen_smtp, repo: "hexpm", optional: true]}, {:hackney, "~> 1.9", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mail, "~> 0.2", [hex: :mail, repo: "hexpm", optional: true]}, {:mime, "~> 1.1", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_cowboy, ">= 1.0.0", [hex: :plug_cowboy, repo: "hexpm", optional: true]}], "hexpm"}, "syslog": {:git, "https://github.com/Vagabond/erlang-syslog.git", "4a6c6f2c996483e86c1320e9553f91d337bcb6aa", [tag: "1.0.5"]}, "telemetry": {:hex, :telemetry, "0.4.0", "8339bee3fa8b91cb84d14c2935f8ecf399ccd87301ad6da6b71c09553834b2ab", [:rebar3], [], "hexpm"}, - "tesla": {:git, "https://github.com/alex-strizhakov/tesla.git", "9ad792fb630bdfc2266ed13b830c28b6552fb3f9", [ref: "9ad792fb630bdfc2266ed13b830c28b6552fb3f9"]}, + "tesla": {:git, "https://github.com/alex-strizhakov/tesla.git", "beb8927358dfaa66ecd458df607befde12dd56e0", [ref: "beb8927358dfaa66ecd458df607befde12dd56e0"]}, "timex": {:hex, :timex, "3.6.1", "efdf56d0e67a6b956cc57774353b0329c8ab7726766a11547e529357ffdc1d56", [:mix], [{:combine, "~> 0.10", [hex: :combine, repo: "hexpm", optional: false]}, {:gettext, "~> 0.10", [hex: :gettext, repo: "hexpm", optional: false]}, {:tzdata, "~> 0.1.8 or ~> 0.5 or ~> 1.0.0", [hex: :tzdata, repo: "hexpm", optional: false]}], "hexpm"}, "trailing_format_plug": {:hex, :trailing_format_plug, "0.0.7", "64b877f912cf7273bed03379936df39894149e35137ac9509117e59866e10e45", [:mix], [{:plug, "> 0.12.0", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm"}, "tzdata": {:hex, :tzdata, "0.5.21", "8cbf3607fcce69636c672d5be2bbb08687fe26639a62bdcc283d267277db7cf0", [:mix], [{:hackney, "~> 1.0", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm"}, diff --git a/test/reverse_proxy_test.exs b/test/reverse_proxy_test.exs index 3a83c4c48..0644d1ed9 100644 --- a/test/reverse_proxy_test.exs +++ b/test/reverse_proxy_test.exs @@ -29,11 +29,11 @@ defmodule Pleroma.ReverseProxyTest do {"content-length", byte_size(json) |> to_string()} ], %{url: url}} end) - |> expect(:stream_body, invokes, fn %{url: url} -> + |> expect(:stream_body, invokes, fn %{url: url} = client -> case Registry.lookup(Pleroma.ReverseProxy.ClientMock, url) do [{_, 0}] -> Registry.update_value(Pleroma.ReverseProxy.ClientMock, url, &(&1 + 1)) - {:ok, json} + {:ok, json, client} [{_, 1}] -> Registry.unregister(Pleroma.ReverseProxy.ClientMock, url) @@ -66,6 +66,38 @@ defmodule Pleroma.ReverseProxyTest do assert conn.halted end + defp stream_mock(invokes, with_close? \\ false) do + ClientMock + |> expect(:request, fn :get, "/stream-bytes/" <> length, _, _, _ -> + Registry.register(Pleroma.ReverseProxy.ClientMock, "/stream-bytes/" <> length, 0) + + {:ok, 200, [{"content-type", "application/octet-stream"}], + %{url: "/stream-bytes/" <> length}} + end) + |> expect(:stream_body, invokes, fn %{url: "/stream-bytes/" <> length} = client -> + max = String.to_integer(length) + + case Registry.lookup(Pleroma.ReverseProxy.ClientMock, "/stream-bytes/" <> length) do + [{_, current}] when current < max -> + Registry.update_value( + Pleroma.ReverseProxy.ClientMock, + "/stream-bytes/" <> length, + &(&1 + 10) + ) + + {:ok, "0123456789", client} + + [{_, ^max}] -> + Registry.unregister(Pleroma.ReverseProxy.ClientMock, "/stream-bytes/" <> length) + :done + end + end) + + if with_close? do + expect(ClientMock, :close, fn _ -> :ok end) + end + end + describe "max_body " do test "length returns error if content-length more than option", %{conn: conn} do user_agent_mock("hackney/1.15.1", 0) @@ -179,12 +211,12 @@ defmodule Pleroma.ReverseProxyTest do Registry.register(Pleroma.ReverseProxy.ClientMock, "/headers", 0) {:ok, 200, [{"content-type", "application/json"}], %{url: "/headers", headers: headers}} end) - |> expect(:stream_body, 2, fn %{url: url, headers: headers} -> + |> expect(:stream_body, 2, fn %{url: url, headers: headers} = client -> case Registry.lookup(Pleroma.ReverseProxy.ClientMock, url) do [{_, 0}] -> Registry.update_value(Pleroma.ReverseProxy.ClientMock, url, &(&1 + 1)) headers = for {k, v} <- headers, into: %{}, do: {String.capitalize(k), v} - {:ok, Jason.encode!(%{headers: headers})} + {:ok, Jason.encode!(%{headers: headers}), client} [{_, 1}] -> Registry.unregister(Pleroma.ReverseProxy.ClientMock, url) @@ -261,11 +293,11 @@ defmodule Pleroma.ReverseProxyTest do {:ok, 200, headers, %{url: "/disposition"}} end) - |> expect(:stream_body, 2, fn %{url: "/disposition"} -> + |> expect(:stream_body, 2, fn %{url: "/disposition"} = client -> case Registry.lookup(Pleroma.ReverseProxy.ClientMock, "/disposition") do [{_, 0}] -> Registry.update_value(Pleroma.ReverseProxy.ClientMock, "/disposition", &(&1 + 1)) - {:ok, ""} + {:ok, "", client} [{_, 1}] -> Registry.unregister(Pleroma.ReverseProxy.ClientMock, "/disposition")