# Pleroma: A lightweight social networking server # Copyright © 2017-2020 Pleroma Authors # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Web.FedSockets.OutgoingHandler do use GenServer require Logger alias Pleroma.Application alias Pleroma.Web.ActivityPub.InternalFetchActor alias Pleroma.Web.FedSockets alias Pleroma.Web.FedSockets.FedRegistry alias Pleroma.Web.FedSockets.FedSocket alias Pleroma.Web.FedSockets.SocketInfo def start_link(uri) do GenServer.start_link(__MODULE__, %{uri: uri}) end def init(%{uri: uri}) do case initiate_connection(uri) do {:ok, ws_origin, conn_pid} -> FedRegistry.add_fed_socket(ws_origin, conn_pid) {:error, reason} -> Logger.debug("Outgoing connection failed - #{inspect(reason)}") :ignore end end def handle_info({:gun_ws, conn_pid, _ref, {:text, data}}, socket_info) do socket_info = SocketInfo.touch(socket_info) case FedSocket.receive_package(socket_info, data) do {:noreply, _} -> {:noreply, socket_info} {:reply, reply} -> :gun.ws_send(conn_pid, {:text, Jason.encode!(reply)}) {:noreply, socket_info} {:error, reason} -> Logger.error("incoming error - receive_package: #{inspect(reason)}") {:noreply, socket_info} end end def handle_info(:close, state) do Logger.debug("Sending close frame !!!!!!!") {:close, state} end def handle_info({:gun_down, _pid, _prot, :closed, _}, state) do {:stop, :normal, state} end def handle_info({:send, data}, %{conn_pid: conn_pid} = socket_info) do socket_info = SocketInfo.touch(socket_info) :gun.ws_send(conn_pid, {:text, data}) {:noreply, socket_info} end def handle_info({:gun_ws, _, _, :pong}, state) do {:noreply, state, :hibernate} end def handle_info(msg, state) do Logger.debug("#{__MODULE__} unhandled event #{inspect(msg)}") {:noreply, state} end def terminate(reason, state) do Logger.debug( "#{__MODULE__} terminating outgoing connection for #{inspect(state)} for #{inspect(reason)}" ) {:ok, state} end def initiate_connection(uri) do ws_uri = uri |> SocketInfo.origin() |> FedSockets.uri_for_origin() %{host: host, port: port, path: path} = URI.parse(ws_uri) with {:ok, conn_pid} <- :gun.open(to_charlist(host), port, %{protocols: [:http]}), {:ok, _} <- :gun.await_up(conn_pid), reference <- :gun.get(conn_pid, to_charlist(path), [ {'user-agent', to_charlist(Application.user_agent())} ]), {:response, :fin, 204, _} <- :gun.await(conn_pid, reference), headers <- build_headers(uri), ref <- :gun.ws_upgrade(conn_pid, to_charlist(path), headers, %{silence_pings: false}) do receive do {:gun_upgrade, ^conn_pid, ^ref, [<<"websocket">>], _} -> {:ok, ws_uri, conn_pid} after 15_000 -> Logger.debug("Fedsocket timeout connecting to #{inspect(uri)}") {:error, :timeout} end else {:response, :nofin, 404, _} -> {:error, :fedsockets_not_supported} e -> Logger.debug("Fedsocket error connecting to #{inspect(uri)}") {:error, e} end end defp build_headers(uri) do host_for_sig = uri |> URI.parse() |> host_signature() shake = FedSocket.shake() digest = "SHA-256=" <> (:crypto.hash(:sha256, shake) |> Base.encode64()) date = Pleroma.Signature.signed_date() shake_size = byte_size(shake) signature_opts = %{ "(request-target)": shake, "content-length": to_charlist("#{shake_size}"), date: date, digest: digest, host: host_for_sig } signature = Pleroma.Signature.sign(InternalFetchActor.get_actor(), signature_opts) [ {'signature', to_charlist(signature)}, {'date', date}, {'digest', to_charlist(digest)}, {'content-length', to_charlist("#{shake_size}")}, {to_charlist("(request-target)"), to_charlist(shake)}, {'user-agent', to_charlist(Application.user_agent())} ] end defp host_signature(%{host: host, scheme: scheme, port: port}) do if port == URI.default_port(scheme) do host else "#{host}:#{port}" end end end