From 4f27d187b3b9376a3425fb3d350444dee27db5cf Mon Sep 17 00:00:00 2001 From: Roger Braun Date: Wed, 8 Nov 2017 17:43:59 +0100 Subject: [PATCH] Merge streaming. --- lib/pleroma/application.ex | 3 +- lib/pleroma/notification.ex | 10 ++++- .../mastodon_api/mastodon_api_controller.ex | 28 +++++-------- .../mastodon_api/views/notification_view.ex | 27 +++++++++++++ lib/pleroma/web/router.ex | 2 + lib/pleroma/web/streamer/streamer.ex | 40 +++++++++++++++++++ lib/pleroma/web/twitter_api/twitter_api.ex | 2 + 7 files changed, 92 insertions(+), 20 deletions(-) create mode 100644 lib/pleroma/web/mastodon_api/views/notification_view.ex create mode 100644 lib/pleroma/web/streamer/streamer.ex diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex index 1f0a05568..5422cbc28 100644 --- a/lib/pleroma/application.ex +++ b/lib/pleroma/application.ex @@ -19,7 +19,8 @@ defmodule Pleroma.Application do ttl_interval: 1000, limit: 2500 ]]), - worker(Pleroma.Web.Federator, []) + worker(Pleroma.Web.Federator, []), + worker(Pleroma.Web.Streamer, []) ] # See http://elixir-lang.org/docs/stable/elixir/Supervisor.html diff --git a/lib/pleroma/notification.ex b/lib/pleroma/notification.ex index 00a382f31..9c6919864 100644 --- a/lib/pleroma/notification.ex +++ b/lib/pleroma/notification.ex @@ -2,6 +2,8 @@ defmodule Pleroma.Notification do use Ecto.Schema alias Pleroma.{User, Activity, Notification, Repo} import Ecto.Query + alias Pleroma.Web.Streamer + alias Pleroma.Web.MastodonAPI.NotificationView schema "notifications" do field :seen, :boolean, default: false @@ -47,8 +49,14 @@ defmodule Pleroma.Notification do # TODO move to sql, too. def create_notification(%Activity{} = activity, %User{} = user) do unless User.blocks?(user, %{ap_id: activity.data["actor"]}) do - notification = %Notification{user_id: user.id, activity_id: activity.id} + notification = %Notification{user: user, activity: activity} {:ok, notification} = Repo.insert(notification) + + json = NotificationView.render("notification.json", %{notification: notification}) + |> Poison.encode! + + Streamer.stream(user, %{type: "notification", payload: json}) + notification end end diff --git a/lib/pleroma/web/mastodon_api/mastodon_api_controller.ex b/lib/pleroma/web/mastodon_api/mastodon_api_controller.ex index feaf9a900..88f2c605a 100644 --- a/lib/pleroma/web/mastodon_api/mastodon_api_controller.ex +++ b/lib/pleroma/web/mastodon_api/mastodon_api_controller.ex @@ -3,10 +3,11 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do alias Pleroma.{Repo, Activity, User, Notification} alias Pleroma.Web.OAuth.App alias Pleroma.Web - alias Pleroma.Web.MastodonAPI.{StatusView, AccountView} + alias Pleroma.Web.MastodonAPI.{StatusView, AccountView, NotificationView} alias Pleroma.Web.ActivityPub.ActivityPub alias Pleroma.Web.TwitterAPI.TwitterAPI alias Pleroma.Web.CommonAPI + alias Pleroma.Web.Streamer import Ecto.Query import Logger @@ -193,23 +194,8 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do def notifications(%{assigns: %{user: user}} = conn, params) do notifications = Notification.for_user(user, params) - result = Enum.map(notifications, fn (%{id: id, activity: activity, inserted_at: created_at}) -> - actor = User.get_cached_by_ap_id(activity.data["actor"]) - created_at = NaiveDateTime.to_iso8601(created_at) - |> String.replace(~r/(\.\d+)?$/, ".000Z", global: false) - case activity.data["type"] do - "Create" -> - %{id: id, type: "mention", created_at: created_at, account: AccountView.render("account.json", %{user: actor}), status: StatusView.render("status.json", %{activity: activity, for: user})} - "Like" -> - liked_activity = Activity.get_create_activity_by_object_ap_id(activity.data["object"]) - %{id: id, type: "favourite", created_at: created_at, account: AccountView.render("account.json", %{user: actor}), status: StatusView.render("status.json", %{activity: liked_activity, for: user})} - "Announce" -> - announced_activity = Activity.get_create_activity_by_object_ap_id(activity.data["object"]) - %{id: id, type: "reblog", created_at: created_at, account: AccountView.render("account.json", %{user: actor}), status: StatusView.render("status.json", %{activity: announced_activity, for: user})} - "Follow" -> - %{id: id, type: "follow", created_at: created_at, account: AccountView.render("account.json", %{user: actor})} - _ -> nil - end + result = Enum.map(notifications, fn (notification) -> + NotificationView.render("notification.json", %{notification: notification}) end) |> Enum.filter(&(&1)) @@ -397,6 +383,12 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do |> render(StatusView, "index.json", %{activities: activities, for: user, as: :activity}) end + def stream_user(%{assigns: %{user: user}} = conn, _params) do + Streamer.add_conn(user, conn) + receive do + end + end + def relationship_noop(%{assigns: %{user: user}} = conn, %{"id" => id}) do Logger.debug("Unimplemented, returning unmodified relationship") with %User{} = target <- Repo.get(User, id) do diff --git a/lib/pleroma/web/mastodon_api/views/notification_view.ex b/lib/pleroma/web/mastodon_api/views/notification_view.ex new file mode 100644 index 000000000..2a7264caf --- /dev/null +++ b/lib/pleroma/web/mastodon_api/views/notification_view.ex @@ -0,0 +1,27 @@ +defmodule Pleroma.Web.MastodonAPI.NotificationView do + use Pleroma.Web, :view + alias Pleroma.{User, Activity} + alias Pleroma.Web.MastodonAPI.{AccountView, StatusView} + + def render("notification.json", %{notification: notification}) do + id = notification.id + activity = notification.activity + created_at = notification.inserted_at + actor = User.get_cached_by_ap_id(activity.data["actor"]) + created_at = NaiveDateTime.to_iso8601(created_at) + |> String.replace(~r/(\.\d+)?$/, ".000Z", global: false) + case activity.data["type"] do + "Create" -> + %{id: id, type: "mention", created_at: created_at, account: AccountView.render("account.json", %{user: actor}), status: StatusView.render("status.json", %{activity: activity})} + "Like" -> + liked_activity = Activity.get_create_activity_by_object_ap_id(activity.data["object"]) + %{id: id, type: "favourite", created_at: created_at, account: AccountView.render("account.json", %{user: actor}), status: StatusView.render("status.json", %{activity: liked_activity})} + "Announce" -> + announced_activity = Activity.get_create_activity_by_object_ap_id(activity.data["object"]) + %{id: id, type: "reblog", created_at: created_at, account: AccountView.render("account.json", %{user: actor}), status: StatusView.render("status.json", %{activity: announced_activity})} + "Follow" -> + %{id: id, type: "follow", created_at: created_at, account: AccountView.render("account.json", %{user: actor})} + _ -> nil + end + end +end diff --git a/lib/pleroma/web/router.ex b/lib/pleroma/web/router.ex index 0a0aea966..db3a53141 100644 --- a/lib/pleroma/web/router.ex +++ b/lib/pleroma/web/router.ex @@ -53,6 +53,8 @@ defmodule Pleroma.Web.Router do scope "/api/v1", Pleroma.Web.MastodonAPI do pipe_through :authenticated_api + get "/streaming/user", MastodonAPIController, :stream_user + get "/accounts/verify_credentials", MastodonAPIController, :verify_credentials get "/accounts/relationships", MastodonAPIController, :relationships get "/accounts/search", MastodonAPIController, :account_search diff --git a/lib/pleroma/web/streamer/streamer.ex b/lib/pleroma/web/streamer/streamer.ex new file mode 100644 index 000000000..f508e716d --- /dev/null +++ b/lib/pleroma/web/streamer/streamer.ex @@ -0,0 +1,40 @@ +defmodule Pleroma.Web.Streamer do + use GenServer + require Logger + import Plug.Conn + + def start_link do + GenServer.start_link(__MODULE__, %{}, name: __MODULE__) + end + + def add_conn(user, conn) do + GenServer.cast(__MODULE__, %{action: :add, user: user, conn: conn}) + end + + def stream(user, item) do + GenServer.cast(__MODULE__, %{action: :stream, user: user, item: item}) + end + + def handle_cast(%{action: :stream, user: user, item: item}, users) do + if conn = users[user.id] do + Logger.debug("Pushing item to #{user.id}, #{user.nickname}") + chunk(conn, "event: #{item.type}\ndata: #{item.payload}\n\n") + end + {:noreply, users} + end + + def handle_cast(%{action: :add, user: user, conn: conn}, users) do + conn = conn + |> put_resp_header("content-type", "text/event-stream") + |> send_chunked(200) + + users = Map.put(users, user.id, conn) + Logger.debug("Got new conn for user #{user.id}, #{user.nickname}") + {:noreply, users} + end + + def handle_cast(m, state) do + IO.inspect("Unknown: #{inspect(m)}, #{inspect(state)}") + {:noreply, state} + end +end diff --git a/lib/pleroma/web/twitter_api/twitter_api.ex b/lib/pleroma/web/twitter_api/twitter_api.ex index baa3dac96..8f75ecb23 100644 --- a/lib/pleroma/web/twitter_api/twitter_api.ex +++ b/lib/pleroma/web/twitter_api/twitter_api.ex @@ -6,6 +6,8 @@ defmodule Pleroma.Web.TwitterAPI.TwitterAPI do alias Pleroma.Web.{OStatus, CommonAPI} alias Pleroma.Formatter import Ecto.Query + alias Pleroma.Web.Streamer + alias Pleroma.Web.MastodonAPI.StatusView @httpoison Application.get_env(:pleroma, :httpoison)