This reverts commitdevelopfe7fd33126
, reversing changes made to4fabf83ad0
.
parent
4397a3fe4a
commit
96816ceaa2
@ -0,0 +1,63 @@ |
||||
# Pleroma: A lightweight social networking server |
||||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> |
||||
# SPDX-License-Identifier: AGPL-3.0-only |
||||
|
||||
defmodule Pleroma.Activity.Ir.Topics do |
||||
alias Pleroma.Object |
||||
alias Pleroma.Web.ActivityPub.Visibility |
||||
|
||||
def get_activity_topics(activity) do |
||||
activity |
||||
|> Object.normalize() |
||||
|> generate_topics(activity) |
||||
|> List.flatten() |
||||
end |
||||
|
||||
defp generate_topics(%{data: %{"type" => "Answer"}}, _) do |
||||
[] |
||||
end |
||||
|
||||
defp generate_topics(object, activity) do |
||||
["user", "list"] ++ visibility_tags(object, activity) |
||||
end |
||||
|
||||
defp visibility_tags(object, activity) do |
||||
case Visibility.get_visibility(activity) do |
||||
"public" -> |
||||
if activity.local do |
||||
["public", "public:local"] |
||||
else |
||||
["public"] |
||||
end |
||||
|> item_creation_tags(object, activity) |
||||
|
||||
"direct" -> |
||||
["direct"] |
||||
|
||||
_ -> |
||||
[] |
||||
end |
||||
end |
||||
|
||||
defp item_creation_tags(tags, %{data: %{"type" => "Create"}} = object, activity) do |
||||
tags ++ hashtags_to_topics(object) ++ attachment_topics(object, activity) |
||||
end |
||||
|
||||
defp item_creation_tags(tags, _, _) do |
||||
tags |
||||
end |
||||
|
||||
defp hashtags_to_topics(%{data: %{"tag" => tags}}) do |
||||
tags |
||||
|> Enum.filter(&is_bitstring(&1)) |
||||
|> Enum.map(fn tag -> "hashtag:" <> tag end) |
||||
end |
||||
|
||||
defp hashtags_to_topics(_), do: [] |
||||
|
||||
defp attachment_topics(%{data: %{"attachment" => []}}, _act), do: [] |
||||
|
||||
defp attachment_topics(_object, %{local: true}), do: ["public:media", "public:local:media"] |
||||
|
||||
defp attachment_topics(_object, _act), do: ["public:media"] |
||||
end |
@ -1,318 +0,0 @@ |
||||
# Pleroma: A lightweight social networking server |
||||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> |
||||
# SPDX-License-Identifier: AGPL-3.0-only |
||||
|
||||
defmodule Pleroma.Web.Streamer do |
||||
use GenServer |
||||
require Logger |
||||
alias Pleroma.Activity |
||||
alias Pleroma.Config |
||||
alias Pleroma.Conversation.Participation |
||||
alias Pleroma.Notification |
||||
alias Pleroma.Object |
||||
alias Pleroma.User |
||||
alias Pleroma.Web.ActivityPub.ActivityPub |
||||
alias Pleroma.Web.ActivityPub.Visibility |
||||
alias Pleroma.Web.CommonAPI |
||||
alias Pleroma.Web.MastodonAPI.NotificationView |
||||
|
||||
@keepalive_interval :timer.seconds(30) |
||||
|
||||
def start_link(_) do |
||||
GenServer.start_link(__MODULE__, %{}, name: __MODULE__) |
||||
end |
||||
|
||||
def add_socket(topic, socket) do |
||||
GenServer.cast(__MODULE__, %{action: :add, socket: socket, topic: topic}) |
||||
end |
||||
|
||||
def remove_socket(topic, socket) do |
||||
GenServer.cast(__MODULE__, %{action: :remove, socket: socket, topic: topic}) |
||||
end |
||||
|
||||
def stream(topic, item) do |
||||
GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: item}) |
||||
end |
||||
|
||||
def init(args) do |
||||
Process.send_after(self(), %{action: :ping}, @keepalive_interval) |
||||
|
||||
{:ok, args} |
||||
end |
||||
|
||||
def handle_info(%{action: :ping}, topics) do |
||||
topics |
||||
|> Map.values() |
||||
|> List.flatten() |
||||
|> Enum.each(fn socket -> |
||||
Logger.debug("Sending keepalive ping") |
||||
send(socket.transport_pid, {:text, ""}) |
||||
end) |
||||
|
||||
Process.send_after(self(), %{action: :ping}, @keepalive_interval) |
||||
|
||||
{:noreply, topics} |
||||
end |
||||
|
||||
def handle_cast(%{action: :stream, topic: "direct", item: item}, topics) do |
||||
recipient_topics = |
||||
User.get_recipients_from_activity(item) |
||||
|> Enum.map(fn %{id: id} -> "direct:#{id}" end) |
||||
|
||||
Enum.each(recipient_topics || [], fn user_topic -> |
||||
Logger.debug("Trying to push direct message to #{user_topic}\n\n") |
||||
push_to_socket(topics, user_topic, item) |
||||
end) |
||||
|
||||
{:noreply, topics} |
||||
end |
||||
|
||||
def handle_cast(%{action: :stream, topic: "participation", item: participation}, topics) do |
||||
user_topic = "direct:#{participation.user_id}" |
||||
Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n") |
||||
|
||||
push_to_socket(topics, user_topic, participation) |
||||
|
||||
{:noreply, topics} |
||||
end |
||||
|
||||
def handle_cast(%{action: :stream, topic: "list", item: item}, topics) do |
||||
# filter the recipient list if the activity is not public, see #270. |
||||
recipient_lists = |
||||
case Visibility.is_public?(item) do |
||||
true -> |
||||
Pleroma.List.get_lists_from_activity(item) |
||||
|
||||
_ -> |
||||
Pleroma.List.get_lists_from_activity(item) |
||||
|> Enum.filter(fn list -> |
||||
owner = User.get_cached_by_id(list.user_id) |
||||
|
||||
Visibility.visible_for_user?(item, owner) |
||||
end) |
||||
end |
||||
|
||||
recipient_topics = |
||||
recipient_lists |
||||
|> Enum.map(fn %{id: id} -> "list:#{id}" end) |
||||
|
||||
Enum.each(recipient_topics || [], fn list_topic -> |
||||
Logger.debug("Trying to push message to #{list_topic}\n\n") |
||||
push_to_socket(topics, list_topic, item) |
||||
end) |
||||
|
||||
{:noreply, topics} |
||||
end |
||||
|
||||
def handle_cast( |
||||
%{action: :stream, topic: topic, item: %Notification{} = item}, |
||||
topics |
||||
) |
||||
when topic in ["user", "user:notification"] do |
||||
topics |
||||
|> Map.get("#{topic}:#{item.user_id}", []) |
||||
|> Enum.each(fn socket -> |
||||
with %User{} = user <- User.get_cached_by_ap_id(socket.assigns[:user].ap_id), |
||||
true <- should_send?(user, item) do |
||||
send( |
||||
socket.transport_pid, |
||||
{:text, represent_notification(socket.assigns[:user], item)} |
||||
) |
||||
end |
||||
end) |
||||
|
||||
{:noreply, topics} |
||||
end |
||||
|
||||
def handle_cast(%{action: :stream, topic: "user", item: item}, topics) do |
||||
Logger.debug("Trying to push to users") |
||||
|
||||
recipient_topics = |
||||
User.get_recipients_from_activity(item) |
||||
|> Enum.map(fn %{id: id} -> "user:#{id}" end) |
||||
|
||||
Enum.each(recipient_topics, fn topic -> |
||||
push_to_socket(topics, topic, item) |
||||
end) |
||||
|
||||
{:noreply, topics} |
||||
end |
||||
|
||||
def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do |
||||
Logger.debug("Trying to push to #{topic}") |
||||
Logger.debug("Pushing item to #{topic}") |
||||
push_to_socket(topics, topic, item) |
||||
{:noreply, topics} |
||||
end |
||||
|
||||
def handle_cast(%{action: :add, topic: topic, socket: socket}, sockets) do |
||||
topic = internal_topic(topic, socket) |
||||
sockets_for_topic = sockets[topic] || [] |
||||
sockets_for_topic = Enum.uniq([socket | sockets_for_topic]) |
||||
sockets = Map.put(sockets, topic, sockets_for_topic) |
||||
Logger.debug("Got new conn for #{topic}") |
||||
{:noreply, sockets} |
||||
end |
||||
|
||||
def handle_cast(%{action: :remove, topic: topic, socket: socket}, sockets) do |
||||
topic = internal_topic(topic, socket) |
||||
sockets_for_topic = sockets[topic] || [] |
||||
sockets_for_topic = List.delete(sockets_for_topic, socket) |
||||
sockets = Map.put(sockets, topic, sockets_for_topic) |
||||
Logger.debug("Removed conn for #{topic}") |
||||
{:noreply, sockets} |
||||
end |
||||
|
||||
def handle_cast(m, state) do |
||||
Logger.info("Unknown: #{inspect(m)}, #{inspect(state)}") |
||||
{:noreply, state} |
||||
end |
||||
|
||||
defp represent_update(%Activity{} = activity, %User{} = user) do |
||||
%{ |
||||
event: "update", |
||||
payload: |
||||
Pleroma.Web.MastodonAPI.StatusView.render( |
||||
"status.json", |
||||
activity: activity, |
||||
for: user |
||||
) |
||||
|> Jason.encode!() |
||||
} |
||||
|> Jason.encode!() |
||||
end |
||||
|
||||
defp represent_update(%Activity{} = activity) do |
||||
%{ |
||||
event: "update", |
||||
payload: |
||||
Pleroma.Web.MastodonAPI.StatusView.render( |
||||
"status.json", |
||||
activity: activity |
||||
) |
||||
|> Jason.encode!() |
||||
} |
||||
|> Jason.encode!() |
||||
end |
||||
|
||||
def represent_conversation(%Participation{} = participation) do |
||||
%{ |
||||
event: "conversation", |
||||
payload: |
||||
Pleroma.Web.MastodonAPI.ConversationView.render("participation.json", %{ |
||||
participation: participation, |
||||
for: participation.user |
||||
}) |
||||
|> Jason.encode!() |
||||
} |
||||
|> Jason.encode!() |
||||
end |
||||
|
||||
@spec represent_notification(User.t(), Notification.t()) :: binary() |
||||
defp represent_notification(%User{} = user, %Notification{} = notify) do |
||||
%{ |
||||
event: "notification", |
||||
payload: |
||||
NotificationView.render( |
||||
"show.json", |
||||
%{notification: notify, for: user} |
||||
) |
||||
|> Jason.encode!() |
||||
} |
||||
|> Jason.encode!() |
||||
end |
||||
|
||||
defp should_send?(%User{} = user, %Activity{} = item) do |
||||
blocks = user.info.blocks || [] |
||||
mutes = user.info.mutes || [] |
||||
reblog_mutes = user.info.muted_reblogs || [] |
||||
domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.info.domain_blocks) |
||||
|
||||
with parent when not is_nil(parent) <- Object.normalize(item), |
||||
true <- Enum.all?([blocks, mutes, reblog_mutes], &(item.actor not in &1)), |
||||
true <- Enum.all?([blocks, mutes], &(parent.data["actor"] not in &1)), |
||||
%{host: item_host} <- URI.parse(item.actor), |
||||
%{host: parent_host} <- URI.parse(parent.data["actor"]), |
||||
false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host), |
||||
false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host), |
||||
true <- thread_containment(item, user), |
||||
false <- CommonAPI.thread_muted?(user, item) do |
||||
true |
||||
else |
||||
_ -> false |
||||
end |
||||
end |
||||
|
||||
defp should_send?(%User{} = user, %Notification{activity: activity}) do |
||||
should_send?(user, activity) |
||||
end |
||||
|
||||
def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do |
||||
Enum.each(topics[topic] || [], fn socket -> |
||||
# Get the current user so we have up-to-date blocks etc. |
||||
if socket.assigns[:user] do |
||||
user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id) |
||||
|
||||
if should_send?(user, item) do |
||||
send(socket.transport_pid, {:text, represent_update(item, user)}) |
||||
end |
||||
else |
||||
send(socket.transport_pid, {:text, represent_update(item)}) |
||||
end |
||||
end) |
||||
end |
||||
|
||||
def push_to_socket(topics, topic, %Participation{} = participation) do |
||||
Enum.each(topics[topic] || [], fn socket -> |
||||
send(socket.transport_pid, {:text, represent_conversation(participation)}) |
||||
end) |
||||
end |
||||
|
||||
def push_to_socket(topics, topic, %Activity{ |
||||
data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id} |
||||
}) do |
||||
Enum.each(topics[topic] || [], fn socket -> |
||||
send( |
||||
socket.transport_pid, |
||||
{:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()} |
||||
) |
||||
end) |
||||
end |
||||
|
||||
def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop |
||||
|
||||
def push_to_socket(topics, topic, item) do |
||||
Enum.each(topics[topic] || [], fn socket -> |
||||
# Get the current user so we have up-to-date blocks etc. |
||||
if socket.assigns[:user] do |
||||
user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id) |
||||
blocks = user.info.blocks || [] |
||||
mutes = user.info.mutes || [] |
||||
|
||||
with true <- Enum.all?([blocks, mutes], &(item.actor not in &1)), |
||||
true <- thread_containment(item, user) do |
||||
send(socket.transport_pid, {:text, represent_update(item, user)}) |
||||
end |
||||
else |
||||
send(socket.transport_pid, {:text, represent_update(item)}) |
||||
end |
||||
end) |
||||
end |
||||
|
||||
defp internal_topic(topic, socket) when topic in ~w[user user:notification direct] do |
||||
"#{topic}:#{socket.assigns[:user].id}" |
||||
end |
||||
|
||||
defp internal_topic(topic, _), do: topic |
||||
|
||||
@spec thread_containment(Activity.t(), User.t()) :: boolean() |
||||
defp thread_containment(_activity, %User{info: %{skip_thread_containment: true}}), do: true |
||||
|
||||
defp thread_containment(activity, user) do |
||||
if Config.get([:instance, :skip_thread_containment]) do |
||||
true |
||||
else |
||||
ActivityPub.contain_activity(activity, user) |
||||
end |
||||
end |
||||
end |
@ -0,0 +1,33 @@ |
||||
defmodule Pleroma.Web.Streamer.Ping do |
||||
use GenServer |
||||
require Logger |
||||
|
||||
alias Pleroma.Web.Streamer.State |
||||
alias Pleroma.Web.Streamer.StreamerSocket |
||||
|
||||
@keepalive_interval :timer.seconds(30) |
||||
|
||||
def start_link(opts) do |
||||
ping_interval = Keyword.get(opts, :ping_interval, @keepalive_interval) |
||||
GenServer.start_link(__MODULE__, %{ping_interval: ping_interval}, name: __MODULE__) |
||||
end |
||||
|
||||
def init(%{ping_interval: ping_interval} = args) do |
||||
Process.send_after(self(), :ping, ping_interval) |
||||
{:ok, args} |
||||
end |
||||
|
||||
def handle_info(:ping, %{ping_interval: ping_interval} = state) do |
||||
State.get_sockets() |
||||
|> Map.values() |
||||
|> List.flatten() |
||||
|> Enum.each(fn %StreamerSocket{transport_pid: transport_pid} -> |
||||
Logger.debug("Sending keepalive ping") |
||||
send(transport_pid, {:text, ""}) |
||||
end) |
||||
|
||||
Process.send_after(self(), :ping, ping_interval) |
||||
|
||||
{:noreply, state} |
||||
end |
||||
end |
@ -0,0 +1,68 @@ |
||||
defmodule Pleroma.Web.Streamer.State do |
||||
use GenServer |
||||
require Logger |
||||
|
||||
alias Pleroma.Web.Streamer.StreamerSocket |
||||
|
||||
def start_link(_) do |
||||
GenServer.start_link(__MODULE__, %{sockets: %{}}, name: __MODULE__) |
||||
end |
||||
|
||||
def add_socket(topic, socket) do |
||||
GenServer.call(__MODULE__, {:add, socket, topic}) |
||||
end |
||||
|
||||
def remove_socket(topic, socket) do |
||||
GenServer.call(__MODULE__, {:remove, socket, topic}) |
||||
end |
||||
|
||||
def get_sockets do |
||||
%{sockets: stream_sockets} = GenServer.call(__MODULE__, :get_state) |
||||
stream_sockets |
||||
end |
||||
|
||||
def init(init_arg) do |
||||
{:ok, init_arg} |
||||
end |
||||
|
||||
def handle_call(:get_state, _from, state) do |
||||
{:reply, state, state} |
||||
end |
||||
|
||||
def handle_call({:add, socket, topic}, _from, %{sockets: sockets} = state) do |
||||
internal_topic = internal_topic(topic, socket) |
||||
stream_socket = StreamerSocket.from_socket(socket) |
||||
|
||||
sockets_for_topic = |
||||
sockets |
||||
|> Map.get(internal_topic, []) |
||||
|> List.insert_at(0, stream_socket) |
||||
|> Enum.uniq() |
||||
|
||||
state = put_in(state, [:sockets, internal_topic], sockets_for_topic) |
||||
Logger.debug("Got new conn for #{topic}") |
||||
{:reply, state, state} |
||||
end |
||||
|
||||
def handle_call({:remove, socket, topic}, _from, %{sockets: sockets} = state) do |
||||
internal_topic = internal_topic(topic, socket) |
||||
stream_socket = StreamerSocket.from_socket(socket) |
||||
|
||||
sockets_for_topic = |
||||
sockets |
||||
|> Map.get(internal_topic, []) |
||||
|> List.delete(stream_socket) |
||||
|
||||
state = Kernel.put_in(state, [:sockets, internal_topic], sockets_for_topic) |
||||
{:reply, state, state} |
||||
end |
||||
|
||||
defp internal_topic(topic, socket) |
||||
when topic in ~w[user user:notification direct] do |
||||
"#{topic}:#{socket.assigns[:user].id}" |
||||
end |
||||
|
||||
defp internal_topic(topic, _) do |
||||
topic |
||||
end |
||||
end |
@ -0,0 +1,55 @@ |
||||
# Pleroma: A lightweight social networking server |
||||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> |
||||
# SPDX-License-Identifier: AGPL-3.0-only |
||||
|
||||
defmodule Pleroma.Web.Streamer do |
||||
alias Pleroma.Web.Streamer.State |
||||
alias Pleroma.Web.Streamer.Worker |
||||
|
||||
@timeout 60_000 |
||||
@mix_env Mix.env() |
||||
|
||||
def add_socket(topic, socket) do |
||||
State.add_socket(topic, socket) |
||||
end |
||||
|
||||
def remove_socket(topic, socket) do |
||||
State.remove_socket(topic, socket) |
||||
end |
||||
|
||||
def get_sockets do |
||||
State.get_sockets() |
||||
end |
||||
|
||||
def stream(topics, items) do |
||||
if should_send?() do |
||||
Task.async(fn -> |
||||
:poolboy.transaction( |
||||
:streamer_worker, |
||||
&Worker.stream(&1, topics, items), |
||||
@timeout |
||||
) |
||||
end) |
||||
end |
||||
end |
||||
|
||||
def supervisor, do: Pleroma.Web.Streamer.Supervisor |
||||
|
||||
defp should_send? do |
||||
handle_should_send(@mix_env) |
||||
end |
||||
|
||||
defp handle_should_send(:test) do |
||||
case Process.whereis(:streamer_worker) do |
||||
nil -> |
||||
false |
||||
|
||||
pid -> |
||||
Process.alive?(pid) |
||||
end |
||||
end |
||||
|
||||
defp handle_should_send(_) do |
||||
true |
||||
end |
||||
end |
@ -0,0 +1,31 @@ |
||||
defmodule Pleroma.Web.Streamer.StreamerSocket do |
||||
defstruct transport_pid: nil, user: nil |
||||
|
||||
alias Pleroma.User |
||||
alias Pleroma.Web.Streamer.StreamerSocket |
||||
|
||||
def from_socket(%{ |
||||
transport_pid: transport_pid, |
||||
assigns: %{user: nil} |
||||
}) do |
||||
%StreamerSocket{ |
||||
transport_pid: transport_pid |
||||
} |
||||
end |
||||
|
||||
def from_socket(%{ |
||||
transport_pid: transport_pid, |
||||
assigns: %{user: %User{} = user} |
||||
}) do |
||||
%StreamerSocket{ |
||||
transport_pid: transport_pid, |
||||
user: user |
||||
} |
||||
end |
||||
|
||||
def from_socket(%{transport_pid: transport_pid}) do |
||||
%StreamerSocket{ |
||||
transport_pid: transport_pid |
||||
} |
||||
end |
||||
end |
@ -0,0 +1,33 @@ |
||||
defmodule Pleroma.Web.Streamer.Supervisor do |
||||
use Supervisor |
||||
|
||||
def start_link(opts) do |
||||
Supervisor.start_link(__MODULE__, opts, name: __MODULE__) |
||||
end |
||||
|
||||
def init(args) do |
||||
children = [ |
||||
{Pleroma.Web.Streamer.State, args}, |
||||
{Pleroma.Web.Streamer.Ping, args}, |
||||
:poolboy.child_spec(:streamer_worker, poolboy_config()) |
||||
] |
||||
|
||||
opts = [strategy: :one_for_one, name: Pleroma.Web.Streamer.Supervisor] |
||||
Supervisor.init(children, opts) |
||||
end |
||||
|
||||
defp poolboy_config do |
||||
opts = |
||||
Pleroma.Config.get(:streamer, |
||||
workers: 3, |
||||
overflow_workers: 2 |
||||
) |
||||
|
||||
[ |
||||
{:name, {:local, :streamer_worker}}, |
||||
{:worker_module, Pleroma.Web.Streamer.Worker}, |
||||
{:size, opts[:workers]}, |
||||
{:max_overflow, opts[:overflow_workers]} |
||||
] |
||||
end |
||||
end |
@ -0,0 +1,220 @@ |
||||
defmodule Pleroma.Web.Streamer.Worker do |
||||
use GenServer |
||||
|
||||
require Logger |
||||
|
||||
alias Pleroma.Activity |
||||
alias Pleroma.Config |
||||
alias Pleroma.Conversation.Participation |
||||
alias Pleroma.Notification |
||||
alias Pleroma.Object |
||||
alias Pleroma.User |
||||
alias Pleroma.Web.ActivityPub.ActivityPub |
||||
alias Pleroma.Web.ActivityPub.Visibility |
||||
alias Pleroma.Web.CommonAPI |
||||
alias Pleroma.Web.Streamer.State |
||||
alias Pleroma.Web.Streamer.StreamerSocket |
||||
alias Pleroma.Web.StreamerView |
||||
|
||||
def start_link(_) do |
||||
GenServer.start_link(__MODULE__, %{}, []) |
||||
end |
||||
|
||||
def init(init_arg) do |
||||
{:ok, init_arg} |
||||
end |
||||
|
||||
def stream(pid, topics, items) do |
||||
GenServer.call(pid, {:stream, topics, items}) |
||||
end |
||||
|
||||
def handle_call({:stream, topics, item}, _from, state) when is_list(topics) do |
||||
Enum.each(topics, fn t -> |
||||
do_stream(%{topic: t, item: item}) |
||||
end) |
||||
|
||||
{:reply, state, state} |
||||
end |
||||
|
||||
def handle_call({:stream, topic, items}, _from, state) when is_list(items) do |
||||
Enum.each(items, fn i -> |
||||
do_stream(%{topic: topic, item: i}) |
||||
end) |
||||
|
||||
{:reply, state, state} |
||||
end |
||||
|
||||
def handle_call({:stream, topic, item}, _from, state) do |
||||
do_stream(%{topic: topic, item: item}) |
||||
|
||||
{:reply, state, state} |
||||
end |
||||
|
||||
defp do_stream(%{topic: "direct", item: item}) do |
||||
recipient_topics = |
||||
User.get_recipients_from_activity(item) |
||||
|> Enum.map(fn %{id: id} -> "direct:#{id}" end) |
||||
|
||||
Enum.each(recipient_topics, fn user_topic -> |
||||
Logger.debug("Trying to push direct message to #{user_topic}\n\n") |
||||
push_to_socket(State.get_sockets(), user_topic, item) |
||||
end) |
||||
end |
||||
|
||||
defp do_stream(%{topic: "participation", item: participation}) do |
||||
user_topic = "direct:#{participation.user_id}" |
||||
Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n") |
||||
|
||||
push_to_socket(State.get_sockets(), user_topic, participation) |
||||
end |
||||
|
||||
defp do_stream(%{topic: "list", item: item}) do |
||||
# filter the recipient list if the activity is not public, see #270. |
||||
recipient_lists = |
||||
case Visibility.is_public?(item) do |
||||
true -> |
||||
Pleroma.List.get_lists_from_activity(item) |
||||
|
||||
_ -> |
||||
Pleroma.List.get_lists_from_activity(item) |
||||
|> Enum.filter(fn list -> |
||||
owner = User.get_cached_by_id(list.user_id) |
||||
|
||||
Visibility.visible_for_user?(item, owner) |
||||
end) |
||||
end |
||||
|
||||
recipient_topics = |
||||
recipient_lists |
||||
|> Enum.map(fn %{id: id} -> "list:#{id}" end) |
||||
|
||||
Enum.each(recipient_topics, fn list_topic -> |
||||
Logger.debug("Trying to push message to #{list_topic}\n\n") |
||||
push_to_socket(State.get_sockets(), list_topic, item) |
||||
end) |
||||
end |
||||
|
||||
defp do_stream(%{topic: topic, item: %Notification{} = item}) |
||||
when topic in ["user", "user:notification"] do |
||||
State.get_sockets() |
||||
|> Map.get("#{topic}:#{item.user_id}", []) |
||||
|> Enum.each(fn %StreamerSocket{transport_pid: transport_pid, user: socket_user} -> |
||||
with %User{} = user <- User.get_cached_by_ap_id(socket_user.ap_id), |
||||
true <- should_send?(user, item) do |
||||
send(transport_pid, {:text, StreamerView.render("notification.json", socket_user, item)}) |
||||
end |
||||
end) |
||||
end |
||||
|
||||
defp do_stream(%{topic: "user", item: item}) do |
||||
Logger.debug("Trying to push to users") |
||||
|
||||
recipient_topics = |
||||
User.get_recipients_from_activity(item) |
||||
|> Enum.map(fn %{id: id} -> "user:#{id}" end) |
||||
|
||||
Enum.each(recipient_topics, fn topic -> |
||||
push_to_socket(State.get_sockets(), topic, item) |
||||
end) |
||||
end |
||||
|
||||
defp do_stream(%{topic: topic, item: item}) do |
||||
Logger.debug("Trying to push to #{topic}") |
||||
Logger.debug("Pushing item to #{topic}") |
||||
push_to_socket(State.get_sockets(), topic, item) |
||||
end |
||||
|
||||
defp should_send?(%User{} = user, %Activity{} = item) do |
||||
blocks = user.info.blocks || [] |
||||
mutes = user.info.mutes || [] |
||||
reblog_mutes = user.info.muted_reblogs || [] |
||||
domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.info.domain_blocks) |
||||
|
||||
with parent when not is_nil(parent) <- Object.normalize(item), |
||||
true <- Enum.all?([blocks, mutes, reblog_mutes], &(item.actor not in &1)), |
||||
true <- Enum.all?([blocks, mutes], &(parent.data["actor"] not in &1)), |
||||
%{host: item_host} <- URI.parse(item.actor), |
||||
%{host: parent_host} <- URI.parse(parent.data["actor"]), |
||||
false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host), |
||||
false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host), |
||||
true <- thread_containment(item, user), |
||||
false <- CommonAPI.thread_muted?(user, item) do |
||||
true |
||||
else |
||||
_ -> false |
||||
end |
||||
end |
||||
|
||||
defp should_send?(%User{} = user, %Notification{activity: activity}) do |
||||
should_send?(user, activity) |
||||
end |
||||
|
||||
def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do |
||||
Enum.each(topics[topic] || [], fn %StreamerSocket{ |
||||
transport_pid: transport_pid, |
||||
user: socket_user |
||||
} -> |
||||
# Get the current user so we have up-to-date blocks etc. |
||||
if socket_user do |
||||
user = User.get_cached_by_ap_id(socket_user.ap_id) |
||||
|
||||
if should_send?(user, item) do |
||||
send(transport_pid, {:text, StreamerView.render("update.json", item, user)}) |
||||
end |
||||
else |
||||
send(transport_pid, {:text, StreamerView.render("update.json", item)}) |
||||
end |
||||
end) |
||||
end |
||||
|
||||
def push_to_socket(topics, topic, %Participation{} = participation) do |
||||
Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} -> |
||||
send(transport_pid, {:text, StreamerView.render("conversation.json", participation)}) |
||||
end) |
||||
end |
||||
|
||||
def push_to_socket(topics, topic, %Activity{ |
||||
data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id} |
||||
}) do |
||||
Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} -> |
||||
send( |
||||
transport_pid, |
||||
{:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()} |
||||
) |
||||
end) |
||||
end |
||||
|
||||
def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop |
||||
|
||||
def push_to_socket(topics, topic, item) do |
||||
Enum.each(topics[topic] || [], fn %StreamerSocket{ |
||||
transport_pid: transport_pid, |
||||
user: socket_user |
||||
} -> |
||||
# Get the current user so we have up-to-date blocks etc. |
||||
if socket_user do |
||||
user = User.get_cached_by_ap_id(socket_user.ap_id) |
||||
blocks = user.info.blocks || [] |
||||
mutes = user.info.mutes || [] |
||||
|
||||
with true <- Enum.all?([blocks, mutes], &(item.actor not in &1)), |
||||
true <- thread_containment(item, user) do |
||||
send(transport_pid, {:text, StreamerView.render("update.json", item, user)}) |
||||
end |
||||
else |
||||
send(transport_pid, {:text, StreamerView.render("update.json", item)}) |
||||
end |
||||
end) |
||||
end |
||||
|
||||
@spec thread_containment(Activity.t(), User.t()) :: boolean() |
||||
defp thread_containment(_activity, %User{info: %{skip_thread_containment: true}}), do: true |
||||
|
||||
defp thread_containment(activity, user) do |
||||
if Config.get([:instance, :skip_thread_containment]) do |
||||
true |
||||
else |
||||
ActivityPub.contain_activity(activity, user) |
||||
end |
||||
end |
||||
end |
@ -0,0 +1,66 @@ |
||||
# Pleroma: A lightweight social networking server |
||||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> |
||||
# SPDX-License-Identifier: AGPL-3.0-only |
||||
|
||||
defmodule Pleroma.Web.StreamerView do |
||||
use Pleroma.Web, :view |
||||
|
||||
alias Pleroma.Activity |
||||
alias Pleroma.Conversation.Participation |
||||
alias Pleroma.Notification |
||||
alias Pleroma.User |
||||
alias Pleroma.Web.MastodonAPI.NotificationView |
||||
|
||||
def render("update.json", %Activity{} = activity, %User{} = user) do |
||||
%{ |
||||
event: "update", |
||||
payload: |
||||
Pleroma.Web.MastodonAPI.StatusView.render( |
||||
"status.json", |
||||
activity: activity, |
||||
for: user |
||||
) |
||||
|> Jason.encode!() |
||||
} |
||||
|> Jason.encode!() |
||||
end |
||||
|
||||
def render("notification.json", %User{} = user, %Notification{} = notify) do |
||||
%{ |
||||
event: "notification", |
||||
payload: |
||||
NotificationView.render( |
||||
"show.json", |
||||
%{notification: notify, for: user} |
||||
) |
||||
|> Jason.encode!() |
||||
} |
||||
|> Jason.encode!() |
||||
end |
||||
|
||||
def render("update.json", %Activity{} = activity) do |
||||
%{ |
||||
event: "update", |
||||
payload: |
||||
Pleroma.Web.MastodonAPI.StatusView.render( |
||||
"status.json", |
||||
activity: activity |
||||
) |
||||
|> Jason.encode!() |
||||
} |
||||
|> Jason.encode!() |
||||
end |
||||
|
||||
def render("conversation.json", %Participation{} = participation) do |
||||
%{ |
||||
event: "conversation", |
||||
payload: |
||||
Pleroma.Web.MastodonAPI.ConversationView.render("participation.json", %{ |
||||
participation: participation, |
||||
for: participation.user |
||||
}) |
||||
|> Jason.encode!() |
||||
} |
||||
|> Jason.encode!() |
||||
end |
||||
end |
@ -0,0 +1,141 @@ |
||||
defmodule Pleroma.Activity.Ir.TopicsTest do |
||||
use Pleroma.DataCase |
||||
|
||||
alias Pleroma.Activity |
||||
alias Pleroma.Activity.Ir.Topics |
||||
alias Pleroma.Object |
||||
|
||||
require Pleroma.Constants |
||||
|
||||
describe "poll answer" do |
||||
test "produce no topics" do |
||||
activity = %Activity{object: %Object{data: %{"type" => "Answer"}}} |
||||
|
||||
assert [] == Topics.get_activity_topics(activity) |
||||
end |
||||
end |
||||
|
||||
describe "non poll answer" do |
||||
test "always add user and list topics" do |
||||
activity = %Activity{object: %Object{data: %{"type" => "FooBar"}}} |
||||
topics = Topics.get_activity_topics(activity) |
||||
|
||||
assert Enum.member?(topics, "user") |
||||
assert Enum.member?(topics, "list") |
||||
end |
||||
end |
||||
|
||||
describe "public visibility" do |
||||
setup do |
||||
activity = %Activity{ |
||||
object: %Object{data: %{"type" => "Note"}}, |
||||
data: %{"to" => [Pleroma.Constants.as_public()]} |
||||
} |
||||
|
||||
{:ok, activity: activity} |
||||
end |
||||
|
||||
test "produces public topic", %{activity: activity} do |
||||
topics = Topics.get_activity_topics(activity) |
||||
|
||||
assert Enum.member?(topics, "public") |
||||
end |
||||
|
||||
test "local action produces public:local topic", %{activity: activity} do |
||||
activity = %{activity | local: true} |
||||
topics = Topics.get_activity_topics(activity) |
||||
|
||||
assert Enum.member?(topics, "public:local") |
||||
end |
||||
|
||||
test "non-local action does not produce public:local topic", %{activity: activity} do |
||||
activity = %{activity | local: false} |
||||
topics = Topics.get_activity_topics(activity) |
||||
|
||||
refute Enum.member?(topics, "public:local") |
||||
end |
||||
end |
||||
|
||||
describe "public visibility create events" do |
||||
setup do |
||||
activity = %Activity{ |
||||
object: %Object{data: %{"type" => "Create", "attachment" => []}}, |
||||
data: %{"to" => [Pleroma.Constants.as_public()]} |
||||
} |
||||
|
||||
{:ok, activity: activity} |
||||
end |
||||
|
||||
test "with no attachments doesn't produce public:media topics", %{activity: activity} do |
||||
topics = Topics.get_activity_topics(activity) |
||||
|
||||
refute Enum.member?(topics, "public:media") |
||||
refute Enum.member?(topics, "public:local:media") |
||||
end |
||||
|
||||
test "converts tags to hash tags", %{activity: %{object: %{data: data} = object} = activity} do |
||||
tagged_data = Map.put(data, "tag", ["foo", "bar"]) |
||||
activity = %{activity | object: %{object | data: tagged_data}} |
||||
|
||||
topics = Topics.get_activity_topics(activity) |
||||
|
||||
assert Enum.member?(topics, "hashtag:foo") |
||||
assert Enum.member?(topics, "hashtag:bar") |
||||
end |
||||
|
||||
test "only converts strinngs to hash tags", %{ |
||||
activity: %{object: %{data: data} = object} = activity |
||||
} do |
||||
tagged_data = Map.put(data, "tag", [2]) |
||||
activity = %{activity | object: %{object | data: tagged_data}} |
||||
|
||||
topics = Topics.get_activity_topics(activity) |
||||
|
||||
refute Enum.member?(topics, "hashtag:2") |
||||
end |
||||
end |
||||
|
||||
describe "public visibility create events with attachments" do |
||||
setup do |
||||
activity = %Activity{ |
||||
object: %Object{data: %{"type" => "Create", "attachment" => ["foo"]}}, |
||||
data: %{"to" => [Pleroma.Constants.as_public()]} |
||||
} |
||||
|
||||
{:ok, activity: activity} |
||||
end |
||||
|
||||
test "produce public:media topics", %{activity: activity} do |
||||
topics = Topics.get_activity_topics(activity) |
||||
|
||||
assert Enum.member?(topics, "public:media") |
||||
end |
||||
|
||||
test "local produces public:local:media topics", %{activity: activity} do |
||||
topics = Topics.get_activity_topics(activity) |
||||
|
||||
assert Enum.member?(topics, "public:local:media") |
||||
end |
||||
|
||||
test "non-local doesn't produce public:local:media topics", %{activity: activity} do |
||||
activity = %{activity | local: false} |
||||
|
||||
topics = Topics.get_activity_topics(activity) |
||||
|
||||
refute Enum.member?(topics, "public:local:media") |
||||
end |
||||
end |
||||
|
||||
describe "non-public visibility" do |
||||
test "produces direct topic" do |
||||
activity = %Activity{object: %Object{data: %{"type" => "Note"}}, data: %{"to" => []}} |
||||
topics = Topics.get_activity_topics(activity) |
||||
|
||||
assert Enum.member?(topics, "direct") |
||||
refute Enum.member?(topics, "public") |
||||
refute Enum.member?(topics, "public:local") |
||||
refute Enum.member?(topics, "public:media") |
||||
refute Enum.member?(topics, "public:local:media") |
||||
end |
||||
end |
||||
end |