Merge streaming.
This commit is contained in:
parent
d2430d5007
commit
4f27d187b3
|
@ -19,7 +19,8 @@ defmodule Pleroma.Application do
|
|||
ttl_interval: 1000,
|
||||
limit: 2500
|
||||
]]),
|
||||
worker(Pleroma.Web.Federator, [])
|
||||
worker(Pleroma.Web.Federator, []),
|
||||
worker(Pleroma.Web.Streamer, [])
|
||||
]
|
||||
|
||||
# See http://elixir-lang.org/docs/stable/elixir/Supervisor.html
|
||||
|
|
|
@ -2,6 +2,8 @@ defmodule Pleroma.Notification do
|
|||
use Ecto.Schema
|
||||
alias Pleroma.{User, Activity, Notification, Repo}
|
||||
import Ecto.Query
|
||||
alias Pleroma.Web.Streamer
|
||||
alias Pleroma.Web.MastodonAPI.NotificationView
|
||||
|
||||
schema "notifications" do
|
||||
field :seen, :boolean, default: false
|
||||
|
@ -47,8 +49,14 @@ defmodule Pleroma.Notification do
|
|||
# TODO move to sql, too.
|
||||
def create_notification(%Activity{} = activity, %User{} = user) do
|
||||
unless User.blocks?(user, %{ap_id: activity.data["actor"]}) do
|
||||
notification = %Notification{user_id: user.id, activity_id: activity.id}
|
||||
notification = %Notification{user: user, activity: activity}
|
||||
{:ok, notification} = Repo.insert(notification)
|
||||
|
||||
json = NotificationView.render("notification.json", %{notification: notification})
|
||||
|> Poison.encode!
|
||||
|
||||
Streamer.stream(user, %{type: "notification", payload: json})
|
||||
|
||||
notification
|
||||
end
|
||||
end
|
||||
|
|
|
@ -3,10 +3,11 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
|
|||
alias Pleroma.{Repo, Activity, User, Notification}
|
||||
alias Pleroma.Web.OAuth.App
|
||||
alias Pleroma.Web
|
||||
alias Pleroma.Web.MastodonAPI.{StatusView, AccountView}
|
||||
alias Pleroma.Web.MastodonAPI.{StatusView, AccountView, NotificationView}
|
||||
alias Pleroma.Web.ActivityPub.ActivityPub
|
||||
alias Pleroma.Web.TwitterAPI.TwitterAPI
|
||||
alias Pleroma.Web.CommonAPI
|
||||
alias Pleroma.Web.Streamer
|
||||
import Ecto.Query
|
||||
import Logger
|
||||
|
||||
|
@ -193,23 +194,8 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
|
|||
|
||||
def notifications(%{assigns: %{user: user}} = conn, params) do
|
||||
notifications = Notification.for_user(user, params)
|
||||
result = Enum.map(notifications, fn (%{id: id, activity: activity, inserted_at: created_at}) ->
|
||||
actor = User.get_cached_by_ap_id(activity.data["actor"])
|
||||
created_at = NaiveDateTime.to_iso8601(created_at)
|
||||
|> String.replace(~r/(\.\d+)?$/, ".000Z", global: false)
|
||||
case activity.data["type"] do
|
||||
"Create" ->
|
||||
%{id: id, type: "mention", created_at: created_at, account: AccountView.render("account.json", %{user: actor}), status: StatusView.render("status.json", %{activity: activity, for: user})}
|
||||
"Like" ->
|
||||
liked_activity = Activity.get_create_activity_by_object_ap_id(activity.data["object"])
|
||||
%{id: id, type: "favourite", created_at: created_at, account: AccountView.render("account.json", %{user: actor}), status: StatusView.render("status.json", %{activity: liked_activity, for: user})}
|
||||
"Announce" ->
|
||||
announced_activity = Activity.get_create_activity_by_object_ap_id(activity.data["object"])
|
||||
%{id: id, type: "reblog", created_at: created_at, account: AccountView.render("account.json", %{user: actor}), status: StatusView.render("status.json", %{activity: announced_activity, for: user})}
|
||||
"Follow" ->
|
||||
%{id: id, type: "follow", created_at: created_at, account: AccountView.render("account.json", %{user: actor})}
|
||||
_ -> nil
|
||||
end
|
||||
result = Enum.map(notifications, fn (notification) ->
|
||||
NotificationView.render("notification.json", %{notification: notification})
|
||||
end)
|
||||
|> Enum.filter(&(&1))
|
||||
|
||||
|
@ -397,6 +383,12 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
|
|||
|> render(StatusView, "index.json", %{activities: activities, for: user, as: :activity})
|
||||
end
|
||||
|
||||
def stream_user(%{assigns: %{user: user}} = conn, _params) do
|
||||
Streamer.add_conn(user, conn)
|
||||
receive do
|
||||
end
|
||||
end
|
||||
|
||||
def relationship_noop(%{assigns: %{user: user}} = conn, %{"id" => id}) do
|
||||
Logger.debug("Unimplemented, returning unmodified relationship")
|
||||
with %User{} = target <- Repo.get(User, id) do
|
||||
|
|
|
@ -0,0 +1,27 @@
|
|||
defmodule Pleroma.Web.MastodonAPI.NotificationView do
|
||||
use Pleroma.Web, :view
|
||||
alias Pleroma.{User, Activity}
|
||||
alias Pleroma.Web.MastodonAPI.{AccountView, StatusView}
|
||||
|
||||
def render("notification.json", %{notification: notification}) do
|
||||
id = notification.id
|
||||
activity = notification.activity
|
||||
created_at = notification.inserted_at
|
||||
actor = User.get_cached_by_ap_id(activity.data["actor"])
|
||||
created_at = NaiveDateTime.to_iso8601(created_at)
|
||||
|> String.replace(~r/(\.\d+)?$/, ".000Z", global: false)
|
||||
case activity.data["type"] do
|
||||
"Create" ->
|
||||
%{id: id, type: "mention", created_at: created_at, account: AccountView.render("account.json", %{user: actor}), status: StatusView.render("status.json", %{activity: activity})}
|
||||
"Like" ->
|
||||
liked_activity = Activity.get_create_activity_by_object_ap_id(activity.data["object"])
|
||||
%{id: id, type: "favourite", created_at: created_at, account: AccountView.render("account.json", %{user: actor}), status: StatusView.render("status.json", %{activity: liked_activity})}
|
||||
"Announce" ->
|
||||
announced_activity = Activity.get_create_activity_by_object_ap_id(activity.data["object"])
|
||||
%{id: id, type: "reblog", created_at: created_at, account: AccountView.render("account.json", %{user: actor}), status: StatusView.render("status.json", %{activity: announced_activity})}
|
||||
"Follow" ->
|
||||
%{id: id, type: "follow", created_at: created_at, account: AccountView.render("account.json", %{user: actor})}
|
||||
_ -> nil
|
||||
end
|
||||
end
|
||||
end
|
|
@ -53,6 +53,8 @@ defmodule Pleroma.Web.Router do
|
|||
scope "/api/v1", Pleroma.Web.MastodonAPI do
|
||||
pipe_through :authenticated_api
|
||||
|
||||
get "/streaming/user", MastodonAPIController, :stream_user
|
||||
|
||||
get "/accounts/verify_credentials", MastodonAPIController, :verify_credentials
|
||||
get "/accounts/relationships", MastodonAPIController, :relationships
|
||||
get "/accounts/search", MastodonAPIController, :account_search
|
||||
|
|
|
@ -0,0 +1,40 @@
|
|||
defmodule Pleroma.Web.Streamer do
|
||||
use GenServer
|
||||
require Logger
|
||||
import Plug.Conn
|
||||
|
||||
def start_link do
|
||||
GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
|
||||
end
|
||||
|
||||
def add_conn(user, conn) do
|
||||
GenServer.cast(__MODULE__, %{action: :add, user: user, conn: conn})
|
||||
end
|
||||
|
||||
def stream(user, item) do
|
||||
GenServer.cast(__MODULE__, %{action: :stream, user: user, item: item})
|
||||
end
|
||||
|
||||
def handle_cast(%{action: :stream, user: user, item: item}, users) do
|
||||
if conn = users[user.id] do
|
||||
Logger.debug("Pushing item to #{user.id}, #{user.nickname}")
|
||||
chunk(conn, "event: #{item.type}\ndata: #{item.payload}\n\n")
|
||||
end
|
||||
{:noreply, users}
|
||||
end
|
||||
|
||||
def handle_cast(%{action: :add, user: user, conn: conn}, users) do
|
||||
conn = conn
|
||||
|> put_resp_header("content-type", "text/event-stream")
|
||||
|> send_chunked(200)
|
||||
|
||||
users = Map.put(users, user.id, conn)
|
||||
Logger.debug("Got new conn for user #{user.id}, #{user.nickname}")
|
||||
{:noreply, users}
|
||||
end
|
||||
|
||||
def handle_cast(m, state) do
|
||||
IO.inspect("Unknown: #{inspect(m)}, #{inspect(state)}")
|
||||
{:noreply, state}
|
||||
end
|
||||
end
|
|
@ -6,6 +6,8 @@ defmodule Pleroma.Web.TwitterAPI.TwitterAPI do
|
|||
alias Pleroma.Web.{OStatus, CommonAPI}
|
||||
alias Pleroma.Formatter
|
||||
import Ecto.Query
|
||||
alias Pleroma.Web.Streamer
|
||||
alias Pleroma.Web.MastodonAPI.StatusView
|
||||
|
||||
@httpoison Application.get_env(:pleroma, :httpoison)
|
||||
|
||||
|
|
Loading…
Reference in New Issue