diff --git a/config/config.exs b/config/config.exs index bd8922b77..0df38d75a 100644 --- a/config/config.exs +++ b/config/config.exs @@ -348,10 +348,10 @@ config :pleroma, Pleroma.Web.Federator.RetryQueue, initial_timeout: 30, max_retries: 5 -config :pleroma, Pleroma.Jobs, - federator_incoming: [max_jobs: 50], - federator_outgoing: [max_jobs: 50], - mailer: [max_jobs: 10] +config :pleroma_job_queue, :queues, + federator_incoming: 50, + federator_outgoing: 50, + mailer: 10 config :pleroma, :fetch_initial_posts, enabled: false, diff --git a/config/test.exs b/config/test.exs index 3691e5bd1..6a7b9067e 100644 --- a/config/test.exs +++ b/config/test.exs @@ -48,7 +48,7 @@ config :web_push_encryption, :vapid_details, config :web_push_encryption, :http_client, Pleroma.Web.WebPushHttpClientMock -config :pleroma, Pleroma.Jobs, testing: [max_jobs: 2] +config :pleroma_job_queue, disabled: true try do import_config "test.secret.exs" diff --git a/docs/config.md b/docs/config.md index c1246ee25..c206358ab 100644 --- a/docs/config.md +++ b/docs/config.md @@ -253,25 +253,20 @@ You can then do curl "http://localhost:4000/api/pleroma/admin/invite_token?admin_token=somerandomtoken" ``` -## Pleroma.Jobs +## :pleroma_job_queue -A list of job queues and their settings. - -Job queue settings: - -* `max_jobs`: The maximum amount of parallel jobs running at the same time. +[Pleroma Job Queue][https://git.pleroma.social/pleroma/pleroma_job_queue] configuration: a list of queues with maximum concurrent jobs. Example: -```exs -config :pleroma, Pleroma.Jobs, - federator_incoming: [max_jobs: 50], - federator_outgoing: [max_jobs: 50] +```elixir +config :pleroma_job_queue, :queues, + federator_incoming: 50, + federator_outgoing: 50 ``` This config contains two queues: `federator_incoming` and `federator_outgoing`. Both have the `max_jobs` set to `50`. - ## Pleroma.Web.Federator.RetryQueue * `enabled`: If set to `true`, failed federation jobs will be retried diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex index cc81e1805..782d1d589 100644 --- a/lib/pleroma/application.ex +++ b/lib/pleroma/application.ex @@ -110,7 +110,6 @@ defmodule Pleroma.Application do worker(Pleroma.Web.Federator.RetryQueue, []), worker(Pleroma.Stats, []), worker(Pleroma.Web.Push, []), - worker(Pleroma.Jobs, []), worker(Task, [&Pleroma.Web.Federator.init/0], restart: :temporary) ] ++ streamer_child() ++ diff --git a/lib/pleroma/emails/mailer.ex b/lib/pleroma/emails/mailer.ex index f7e3aa78b..b384e6fec 100644 --- a/lib/pleroma/emails/mailer.ex +++ b/lib/pleroma/emails/mailer.ex @@ -6,7 +6,7 @@ defmodule Pleroma.Mailer do use Swoosh.Mailer, otp_app: :pleroma def deliver_async(email, config \\ []) do - Pleroma.Jobs.enqueue(:mailer, __MODULE__, [:deliver_async, email, config]) + PleromaJobQueue.enqueue(:mailer, __MODULE__, [:deliver_async, email, config]) end def perform(:deliver_async, email, config), do: deliver(email, config) diff --git a/lib/pleroma/jobs.ex b/lib/pleroma/jobs.ex deleted file mode 100644 index 24b7e5e46..000000000 --- a/lib/pleroma/jobs.ex +++ /dev/null @@ -1,152 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2019 Pleroma Authors -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Jobs do - @moduledoc """ - A basic job queue - """ - use GenServer - - require Logger - - def init(args) do - {:ok, args} - end - - def start_link do - queues = - Pleroma.Config.get(Pleroma.Jobs) - |> Enum.map(fn {name, _} -> create_queue(name) end) - |> Enum.into(%{}) - - state = %{ - queues: queues, - refs: %{} - } - - GenServer.start_link(__MODULE__, state, name: __MODULE__) - end - - def create_queue(name) do - {name, {:sets.new(), []}} - end - - @doc """ - Enqueues a job. - - Returns `:ok`. - - ## Arguments - - - `queue_name` - a queue name(must be specified in the config). - - `mod` - a worker module (must have `perform` function). - - `args` - a list of arguments for the `perform` function of the worker module. - - `priority` - a job priority (`0` by default). - - ## Examples - - Enqueue `Module.perform/0` with `priority=1`: - - iex> Pleroma.Jobs.enqueue(:example_queue, Module, []) - :ok - - Enqueue `Module.perform(:job_name)` with `priority=5`: - - iex> Pleroma.Jobs.enqueue(:example_queue, Module, [:job_name], 5) - :ok - - Enqueue `Module.perform(:another_job, data)` with `priority=1`: - - iex> data = "foobar" - iex> Pleroma.Jobs.enqueue(:example_queue, Module, [:another_job, data]) - :ok - - Enqueue `Module.perform(:foobar_job, :foo, :bar, 42)` with `priority=1`: - - iex> Pleroma.Jobs.enqueue(:example_queue, Module, [:foobar_job, :foo, :bar, 42]) - :ok - - """ - - def enqueue(queue_name, mod, args, priority \\ 1) - - if Mix.env() == :test do - def enqueue(_queue_name, mod, args, _priority) do - apply(mod, :perform, args) - end - else - @spec enqueue(atom(), atom(), [any()], integer()) :: :ok - def enqueue(queue_name, mod, args, priority) do - GenServer.cast(__MODULE__, {:enqueue, queue_name, mod, args, priority}) - end - end - - def handle_cast({:enqueue, queue_name, mod, args, priority}, state) do - {running_jobs, queue} = state[:queues][queue_name] - - queue = enqueue_sorted(queue, {mod, args}, priority) - - state = - state - |> update_queue(queue_name, {running_jobs, queue}) - |> maybe_start_job(queue_name, running_jobs, queue) - - {:noreply, state} - end - - def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do - queue_name = state.refs[ref] - - {running_jobs, queue} = state[:queues][queue_name] - - running_jobs = :sets.del_element(ref, running_jobs) - - state = - state - |> remove_ref(ref) - |> update_queue(queue_name, {running_jobs, queue}) - |> maybe_start_job(queue_name, running_jobs, queue) - - {:noreply, state} - end - - def maybe_start_job(state, queue_name, running_jobs, queue) do - if :sets.size(running_jobs) < Pleroma.Config.get([__MODULE__, queue_name, :max_jobs]) && - queue != [] do - {{mod, args}, queue} = queue_pop(queue) - {:ok, pid} = Task.start(fn -> apply(mod, :perform, args) end) - mref = Process.monitor(pid) - - state - |> add_ref(queue_name, mref) - |> update_queue(queue_name, {:sets.add_element(mref, running_jobs), queue}) - else - state - end - end - - def enqueue_sorted(queue, element, priority) do - [%{item: element, priority: priority} | queue] - |> Enum.sort_by(fn %{priority: priority} -> priority end) - end - - def queue_pop([%{item: element} | queue]) do - {element, queue} - end - - defp add_ref(state, queue_name, ref) do - refs = Map.put(state[:refs], ref, queue_name) - Map.put(state, :refs, refs) - end - - defp remove_ref(state, ref) do - refs = Map.delete(state[:refs], ref) - Map.put(state, :refs, refs) - end - - defp update_queue(state, queue_name, data) do - queues = Map.put(state[:queues], queue_name, data) - Map.put(state, :queues, queues) - end -end diff --git a/lib/pleroma/web/federator/federator.ex b/lib/pleroma/web/federator/federator.ex index 5e690ddb8..c47328e13 100644 --- a/lib/pleroma/web/federator/federator.ex +++ b/lib/pleroma/web/federator/federator.ex @@ -4,7 +4,6 @@ defmodule Pleroma.Web.Federator do alias Pleroma.Activity - alias Pleroma.Jobs alias Pleroma.User alias Pleroma.Web.ActivityPub.ActivityPub alias Pleroma.Web.ActivityPub.Relay @@ -31,39 +30,39 @@ defmodule Pleroma.Web.Federator do # Client API def incoming_doc(doc) do - Jobs.enqueue(:federator_incoming, __MODULE__, [:incoming_doc, doc]) + PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_doc, doc]) end def incoming_ap_doc(params) do - Jobs.enqueue(:federator_incoming, __MODULE__, [:incoming_ap_doc, params]) + PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_ap_doc, params]) end def publish(activity, priority \\ 1) do - Jobs.enqueue(:federator_outgoing, __MODULE__, [:publish, activity], priority) + PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish, activity], priority) end def publish_single_ap(params) do - Jobs.enqueue(:federator_outgoing, __MODULE__, [:publish_single_ap, params]) + PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish_single_ap, params]) end def publish_single_websub(websub) do - Jobs.enqueue(:federator_outgoing, __MODULE__, [:publish_single_websub, websub]) + PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish_single_websub, websub]) end def verify_websub(websub) do - Jobs.enqueue(:federator_outgoing, __MODULE__, [:verify_websub, websub]) + PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:verify_websub, websub]) end def request_subscription(sub) do - Jobs.enqueue(:federator_outgoing, __MODULE__, [:request_subscription, sub]) + PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:request_subscription, sub]) end def refresh_subscriptions do - Jobs.enqueue(:federator_outgoing, __MODULE__, [:refresh_subscriptions]) + PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:refresh_subscriptions]) end def publish_single_salmon(params) do - Jobs.enqueue(:federator_outgoing, __MODULE__, [:publish_single_salmon, params]) + PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish_single_salmon, params]) end # Job Worker Callbacks diff --git a/mix.exs b/mix.exs index 661430464..333f21a91 100644 --- a/mix.exs +++ b/mix.exs @@ -93,7 +93,8 @@ defmodule Pleroma.Mixfile do {:timex, "~> 3.5"}, {:auto_linker, git: "https://git.pleroma.social/pleroma/auto_linker.git", - ref: "94193ca5f97c1f9fdf3d1469653e2d46fac34bcd"} + ref: "94193ca5f97c1f9fdf3d1469653e2d46fac34bcd"}, + {:pleroma_job_queue, "~> 0.2.0"} ] end diff --git a/mix.lock b/mix.lock index 05eaa1d69..f401258e9 100644 --- a/mix.lock +++ b/mix.lock @@ -50,6 +50,7 @@ "phoenix_ecto": {:hex, :phoenix_ecto, "4.0.0", "c43117a136e7399ea04ecaac73f8f23ee0ffe3e07acfcb8062fe5f4c9f0f6531", [:mix], [{:ecto, "~> 3.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:phoenix_html, "~> 2.9", [hex: :phoenix_html, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm"}, "phoenix_html": {:hex, :phoenix_html, "2.13.1", "fa8f034b5328e2dfa0e4131b5569379003f34bc1fafdaa84985b0b9d2f12e68b", [:mix], [{:plug, "~> 1.5", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm"}, "phoenix_pubsub": {:hex, :phoenix_pubsub, "1.1.1", "6668d787e602981f24f17a5fbb69cc98f8ab085114ebfac6cc36e10a90c8e93c", [:mix], [], "hexpm"}, + "pleroma_job_queue": {:hex, :pleroma_job_queue, "0.2.0", "879e660aa1cebe8dc6f0aaaa6aa48b4875e89cd961d4a585fd128e0773b31a18", [:mix], [], "hexpm"}, "plug": {:hex, :plug, "1.7.2", "d7b7db7fbd755e8283b6c0a50be71ec0a3d67d9213d74422d9372effc8e87fd1", [:mix], [{:mime, "~> 1.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}], "hexpm"}, "plug_cowboy": {:hex, :plug_cowboy, "2.0.1", "d798f8ee5acc86b7d42dbe4450b8b0dadf665ce588236eb0a751a132417a980e", [:mix], [{:cowboy, "~> 2.5", [hex: :cowboy, repo: "hexpm", optional: false]}, {:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm"}, "plug_crypto": {:hex, :plug_crypto, "1.0.0", "18e49317d3fa343f24620ed22795ec29d4a5e602d52d1513ccea0b07d8ea7d4d", [:mix], [], "hexpm"}, diff --git a/test/jobs_test.exs b/test/jobs_test.exs deleted file mode 100644 index d55c86ccc..000000000 --- a/test/jobs_test.exs +++ /dev/null @@ -1,83 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2019 Pleroma Authors -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.JobsTest do - use ExUnit.Case, async: true - - alias Jobs.WorkerMock - alias Pleroma.Jobs - - setup do - state = %{ - queues: Enum.into([Jobs.create_queue(:testing)], %{}), - refs: %{} - } - - [state: state] - end - - test "creates queue" do - queue = Jobs.create_queue(:foobar) - - assert {:foobar, set} = queue - assert :set == elem(set, 0) |> elem(0) - end - - test "enqueues an element according to priority" do - queue = [%{item: 1, priority: 2}] - - new_queue = Jobs.enqueue_sorted(queue, 2, 1) - assert new_queue == [%{item: 2, priority: 1}, %{item: 1, priority: 2}] - - new_queue = Jobs.enqueue_sorted(queue, 2, 3) - assert new_queue == [%{item: 1, priority: 2}, %{item: 2, priority: 3}] - end - - test "pop first item" do - queue = [%{item: 2, priority: 1}, %{item: 1, priority: 2}] - - assert {2, [%{item: 1, priority: 2}]} = Jobs.queue_pop(queue) - end - - test "enqueue a job", %{state: state} do - assert {:noreply, new_state} = - Jobs.handle_cast({:enqueue, :testing, WorkerMock, [:test_job, :foo, :bar], 3}, state) - - assert %{queues: %{testing: {running_jobs, []}}, refs: _} = new_state - assert :sets.size(running_jobs) == 1 - assert [ref] = :sets.to_list(running_jobs) - assert %{refs: %{^ref => :testing}} = new_state - end - - test "max jobs setting", %{state: state} do - max_jobs = Pleroma.Config.get([Jobs, :testing, :max_jobs]) - - {:noreply, state} = - Enum.reduce(1..(max_jobs + 1), {:noreply, state}, fn _, {:noreply, state} -> - Jobs.handle_cast({:enqueue, :testing, WorkerMock, [:test_job, :foo, :bar], 3}, state) - end) - - assert %{ - queues: %{ - testing: - {running_jobs, [%{item: {WorkerMock, [:test_job, :foo, :bar]}, priority: 3}]} - } - } = state - - assert :sets.size(running_jobs) == max_jobs - end - - test "remove job after it finished", %{state: state} do - {:noreply, new_state} = - Jobs.handle_cast({:enqueue, :testing, WorkerMock, [:test_job, :foo, :bar], 3}, state) - - %{queues: %{testing: {running_jobs, []}}} = new_state - [ref] = :sets.to_list(running_jobs) - - assert {:noreply, %{queues: %{testing: {running_jobs, []}}, refs: %{}}} = - Jobs.handle_info({:DOWN, ref, :process, nil, nil}, new_state) - - assert :sets.size(running_jobs) == 0 - end -end diff --git a/test/support/jobs_worker_mock.ex b/test/support/jobs_worker_mock.ex deleted file mode 100644 index 0fb976d05..000000000 --- a/test/support/jobs_worker_mock.ex +++ /dev/null @@ -1,19 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2019 Pleroma Authors -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Jobs.WorkerMock do - require Logger - - def perform(:test_job, arg, arg2) do - Logger.debug({:perform, :test_job, arg, arg2}) - end - - def perform(:test_job, payload) do - Logger.debug({:perform, :test_job, payload}) - end - - def test_job(payload) do - Pleroma.Jobs.enqueue(:testing, __MODULE__, [:test_job, payload]) - end -end