From ac89371d5791b0c12637a8532512a67c3c69650b Mon Sep 17 00:00:00 2001 From: Mitchell Hanberg Date: Fri, 21 Jul 2023 00:14:21 -0400 Subject: [PATCH] fix: correctly recover from crashed runtime This uses a registry to track the runtimes, so that if one crashes and is restarted, it will register itself with the registry and the LSP process doesn't need to track them itself. This also streamlines the logic for waiting for the the runtimes to be ready. --- lib/next_ls.ex | 264 +++++++++++---------- lib/next_ls/extensions/elixir_extension.ex | 2 +- lib/next_ls/lsp_supervisor.ex | 4 +- lib/next_ls/runtime.ex | 20 +- test/next_ls/runtime_test.exs | 47 ++-- test/next_ls_test.exs | 4 +- 6 files changed, 185 insertions(+), 156 deletions(-) diff --git a/lib/next_ls.ex b/lib/next_ls.ex index 01e52783..8f0bc5a8 100644 --- a/lib/next_ls.ex +++ b/lib/next_ls.ex @@ -43,7 +43,7 @@ defmodule NextLS do :runtime_task_supervisor, :dynamic_supervisor, :extensions, - :extension_registry, + :registry, :symbol_table ]) @@ -55,7 +55,8 @@ defmodule NextLS do task_supervisor = Keyword.fetch!(args, :task_supervisor) runtime_task_supervisor = Keyword.fetch!(args, :runtime_task_supervisor) dynamic_supervisor = Keyword.fetch!(args, :dynamic_supervisor) - extension_registry = Keyword.fetch!(args, :extension_registry) + + registry = Keyword.fetch!(args, :registry) extensions = Keyword.get(args, :extensions, [NextLS.ElixirExtension]) cache = Keyword.fetch!(args, :cache) symbol_table = Keyword.fetch!(args, :symbol_table) @@ -72,9 +73,8 @@ defmodule NextLS do task_supervisor: task_supervisor, runtime_task_supervisor: runtime_task_supervisor, dynamic_supervisor: dynamic_supervisor, - extension_registry: extension_registry, + registry: registry, extensions: extensions, - runtime_tasks: nil, ready: false, client_capabilities: nil )} @@ -87,6 +87,8 @@ defmodule NextLS do }, lsp ) do + dbg(workspace_folders) + workspace_folders = if caps.workspace.workspace_folders do workspace_folders @@ -208,39 +210,44 @@ defmodule NextLS do def handle_request(%TextDocumentFormatting{params: %{text_document: %{uri: uri}}}, lsp) do document = lsp.assigns.documents[uri] - {_, %{runtime: runtime}} = - Enum.find(lsp.assigns.runtimes, fn {_name, %{uri: wuri}} -> String.starts_with?(uri, wuri) end) - - with {:ok, {formatter, _}} <- Runtime.call(runtime, {Mix.Tasks.Format, :formatter_for_file, [".formatter.exs"]}), - {:ok, response} when is_binary(response) or is_list(response) <- - Runtime.call(runtime, {Kernel, :apply, [formatter, [Enum.join(document, "\n")]]}) do - {:reply, - [ - %TextEdit{ - new_text: IO.iodata_to_binary(response), - range: %Range{ - start: %Position{line: 0, character: 0}, - end: %Position{ - line: length(document), - character: document |> List.last() |> String.length() |> Kernel.-(1) |> max(0) - } - } - } - ], lsp} - else - {:error, :not_ready} -> - GenLSP.notify(lsp, %GenLSP.Notifications.WindowShowMessage{ - params: %GenLSP.Structures.ShowMessageParams{ - type: GenLSP.Enumerations.MessageType.info(), - message: "The NextLS runtime is still initializing!" - } - }) - - {:reply, nil, lsp} + [resp] = + dispatch(lsp.assigns.registry, :runtimes, fn entries -> + for {runtime, %{uri: wuri}} <- entries, String.starts_with?(uri, wuri) do + with {:ok, {formatter, _}} <- + Runtime.call(runtime, {Mix.Tasks.Format, :formatter_for_file, [".formatter.exs"]}), + {:ok, response} when is_binary(response) or is_list(response) <- + Runtime.call(runtime, {Kernel, :apply, [formatter, [Enum.join(document, "\n")]]}) do + {:reply, + [ + %TextEdit{ + new_text: IO.iodata_to_binary(response), + range: %Range{ + start: %Position{line: 0, character: 0}, + end: %Position{ + line: length(document), + character: document |> List.last() |> String.length() |> Kernel.-(1) |> max(0) + } + } + } + ], lsp} + else + {:error, :not_ready} -> + GenLSP.notify(lsp, %GenLSP.Notifications.WindowShowMessage{ + params: %GenLSP.Structures.ShowMessageParams{ + type: GenLSP.Enumerations.MessageType.info(), + message: "The NextLS runtime is still initializing!" + } + }) + + {:reply, nil, lsp} + + _ -> + {:reply, nil, lsp} + end + end + end) - _ -> - {:reply, nil, lsp} - end + resp end def handle_request(%Shutdown{}, lsp) do @@ -267,87 +274,83 @@ defmodule NextLS do {:ok, _} = DynamicSupervisor.start_child( lsp.assigns.dynamic_supervisor, - {extension, cache: lsp.assigns.cache, registry: lsp.assigns.extension_registry, publisher: self()} + {extension, cache: lsp.assigns.cache, registry: lsp.assigns.registry, publisher: self()} ) end - GenLSP.log(lsp, "[NextLS] Booting runtime...") + GenLSP.log(lsp, "[NextLS] Booting runtimes...") - runtimes = - for %{uri: uri, name: name} <- lsp.assigns.workspace_folders do - token = token() - progress_start(lsp, token, "Initializing NextLS runtime for folder #{name}...") + for %{uri: uri, name: name} <- lsp.assigns.workspace_folders do + token = token() + progress_start(lsp, token, "Initializing NextLS runtime for folder #{name}...") + parent = self() - {:ok, runtime} = - DynamicSupervisor.start_child( - lsp.assigns.dynamic_supervisor, - {NextLS.Runtime, - task_supervisor: lsp.assigns.runtime_task_supervisor, - extension_registry: lsp.assigns.extension_registry, - working_dir: URI.parse(uri).path, - parent: self(), - logger: lsp.assigns.logger} - ) + dbg(name) - Process.monitor(runtime) - - {name, - %{uri: uri, runtime: runtime, refresh_ref: {token, "NextLS runtime for folder #{name} has initialized!"}}} - end - - lsp = assign(lsp, runtimes: Map.new(runtimes)) - - tasks = - for {name, workspace} <- runtimes do - Task.Supervisor.async_nolink(lsp.assigns.task_supervisor, fn -> - with false <- wait_until(fn -> NextLS.Runtime.ready?(workspace.runtime) end) do - GenLSP.error(lsp, "[NextLS] Failed to start runtime for folder #{name}") - raise "Failed to boot runtime" - end + {:ok, runtime} = + DynamicSupervisor.start_child( + lsp.assigns.dynamic_supervisor, + {NextLS.Runtime, + name: name, + task_supervisor: lsp.assigns.runtime_task_supervisor, + registry: lsp.assigns.registry, + working_dir: URI.parse(uri).path, + uri: uri, + parent: self(), + on_initialized: fn status -> + if status == :ready do + progress_end(lsp, token, "NextLS runtime for folder #{name} has initialized!") + GenLSP.log(lsp, "[NextLS] Runtime for folder #{name} is ready...") + send(parent, {:runtime_ready, name, self()}) + else + progress_end(lsp, token) + GenLSP.error(lsp, "[NextLS] Runtime for folder #{name} failed to initialize") + end + end, + logger: lsp.assigns.logger} + ) - GenLSP.log(lsp, "[NextLS] Runtime for folder #{name} is ready...") + ref = Process.monitor(runtime) - {name, :ready} - end) - end + Process.put(ref, name) - refresh_refs = - tasks |> Enum.zip_with(runtimes, fn task, {_name, runtime} -> {task.ref, runtime.refresh_ref} end) |> Map.new() + {name, %{uri: uri, runtime: runtime}} + end - {:noreply, - assign(lsp, - refresh_refs: Map.merge(lsp.assigns.refresh_refs, refresh_refs), - runtime_tasks: tasks - )} + {:noreply, lsp} end def handle_notification(%TextDocumentDidSave{}, %{assigns: %{ready: false}} = lsp) do {:noreply, lsp} end + # TODO: add some test cases for saving files in multiple workspaces def handle_notification( %TextDocumentDidSave{ params: %GenLSP.Structures.DidSaveTextDocumentParams{text: text, text_document: %{uri: uri}} }, %{assigns: %{ready: true}} = lsp ) do - for task <- Task.Supervisor.children(lsp.assigns.task_supervisor), - task not in for(t <- lsp.assigns.runtime_tasks, do: t.pid) do + for task <- Task.Supervisor.children(lsp.assigns.task_supervisor) do Process.exit(task, :kill) end - token = token() + refresh_refs = + dispatch(lsp.assigns.registry, :runtimes, fn entries -> + for {pid, %{name: name, uri: wuri}} <- entries, String.starts_with?(uri, wuri), into: %{} do + token = token() + progress_start(lsp, token, "Compiling...") - progress_start(lsp, token, "Compiling...") - runtimes = Enum.to_list(lsp.assigns.runtimes) + task = + Task.Supervisor.async_nolink(lsp.assigns.task_supervisor, fn -> + {name, Runtime.compile(pid)} + end) - tasks = - for {name, r} <- runtimes do - Task.Supervisor.async_nolink(lsp.assigns.task_supervisor, fn -> {name, Runtime.compile(r.runtime)} end) - end + {task.ref, {token, "Compiled!"}} + end + end) - refresh_refs = - tasks |> Enum.zip_with(runtimes, fn task, {_name, runtime} -> {task.ref, runtime.refresh_ref} end) |> Map.new() + dbg(refresh_refs) {:noreply, lsp @@ -363,8 +366,7 @@ defmodule NextLS do %TextDocumentDidChange{params: %{text_document: %{uri: uri}, content_changes: [%{text: text}]}}, lsp ) do - for task <- Task.Supervisor.children(lsp.assigns.task_supervisor), - task not in for(t <- lsp.assigns.runtime_tasks, do: t.pid) do + for task <- Task.Supervisor.children(lsp.assigns.task_supervisor) do Process.exit(task, :kill) end @@ -420,30 +422,29 @@ defmodule NextLS do {:noreply, lsp} end - def handle_info({ref, resp}, %{assigns: %{refresh_refs: refs}} = lsp) when is_map_key(refs, ref) do - Process.demonitor(ref, [:flush]) - {{token, msg}, refs} = Map.pop(refs, ref) + def handle_info({:runtime_ready, name, runtime_pid}, lsp) do + token = token() + progress_start(lsp, token, "Compiling...") - progress_end(lsp, token, msg) + dbg(name) - lsp = - case resp do - {name, :ready} -> - token = token() - progress_start(lsp, token, "Compiling...") + task = + Task.Supervisor.async_nolink(lsp.assigns.task_supervisor, fn -> + {name, Runtime.compile(runtime_pid)} + end) - task = - Task.Supervisor.async_nolink(lsp.assigns.task_supervisor, fn -> - {name, Runtime.compile(lsp.assigns.runtimes[name].runtime)} - end) + refresh_refs = Map.put(lsp.assigns.refresh_refs, task.ref, {token, "Compiled!"}) - assign(lsp, ready: true, refresh_refs: Map.put(refs, task.ref, {token, "Compiled!"})) + {:noreply, assign(lsp, ready: true, refresh_refs: refresh_refs)} + end - _ -> - assign(lsp, refresh_refs: refs) - end + def handle_info({ref, _resp}, %{assigns: %{refresh_refs: refs}} = lsp) when is_map_key(refs, ref) do + Process.demonitor(ref, [:flush]) + {{token, msg}, refs} = Map.pop(refs, ref) - {:noreply, lsp} + progress_end(lsp, token, msg) + + {:noreply, assign(lsp, refresh_refs: refs)} end def handle_info({:DOWN, ref, :process, _pid, _reason}, %{assigns: %{refresh_refs: refs}} = lsp) @@ -455,35 +456,20 @@ defmodule NextLS do {:noreply, assign(lsp, refresh_refs: refs)} end - def handle_info({:DOWN, _ref, :process, runtime, _reason}, %{assigns: %{runtimes: runtimes}} = lsp) do - {name, _} = Enum.find(runtimes, fn {_name, %{runtime: r}} -> r == runtime end) + def handle_info({:DOWN, ref, :process, _runtime, _reason}, lsp) do + name = Process.get(ref) + Process.delete(ref) + GenLSP.error(lsp, "[NextLS] The runtime for #{name} has crashed") - {:noreply, assign(lsp, runtimes: Map.drop(runtimes, name))} + {:noreply, lsp} end def handle_info(message, lsp) do - GenLSP.log(lsp, "[NextLS] Unhanded message: #{inspect(message)}") + GenLSP.log(lsp, "[NextLS] Unhandled message: #{inspect(message)}") {:noreply, lsp} end - defp wait_until(cb) do - wait_until(120, cb) - end - - defp wait_until(0, _cb) do - false - end - - defp wait_until(n, cb) do - if cb.() do - true - else - Process.sleep(1000) - wait_until(n - 1, cb) - end - end - defp progress_start(lsp, token, msg) do GenLSP.notify(lsp, %GenLSP.Notifications.DollarProgress{ params: %GenLSP.Structures.ProgressParams{ @@ -527,4 +513,22 @@ defmodule NextLS do defp elixir_kind_to_lsp_kind(kind) when kind in [:def, :defp, :defmacro, :defmacrop], do: GenLSP.Enumerations.SymbolKind.function() + + # NOTE: this is only possible because the registry is not partitioned + # if it is partitioned, then the callback is called multiple times + # and this method of extracting the result doesn't really make sense + defp dispatch(registry, key, callback) do + ref = make_ref() + me = self() + + Registry.dispatch(registry, key, fn entries -> + result = callback.(entries) + + send(me, {ref, result}) + end) + + receive do + {^ref, result} -> result + end + end end diff --git a/lib/next_ls/extensions/elixir_extension.ex b/lib/next_ls/extensions/elixir_extension.ex index e6f894ef..fd36b30a 100644 --- a/lib/next_ls/extensions/elixir_extension.ex +++ b/lib/next_ls/extensions/elixir_extension.ex @@ -18,7 +18,7 @@ defmodule NextLS.ElixirExtension do registry = Keyword.fetch!(args, :registry) publisher = Keyword.fetch!(args, :publisher) - Registry.register(registry, :extension, :elixir) + Registry.register(registry, :extensions, :elixir) {:ok, %{cache: cache, registry: registry, publisher: publisher}} end diff --git a/lib/next_ls/lsp_supervisor.ex b/lib/next_ls/lsp_supervisor.ex index bfb1e7cb..46515228 100644 --- a/lib/next_ls/lsp_supervisor.ex +++ b/lib/next_ls/lsp_supervisor.ex @@ -65,14 +65,14 @@ defmodule NextLS.LSPSupervisor do {GenLSP.Buffer, buffer_opts}, {NextLS.DiagnosticCache, name: :diagnostic_cache}, {NextLS.SymbolTable, name: :symbol_table, path: hidden_folder}, - {Registry, name: NextLS.ExtensionRegistry, keys: :duplicate}, + {Registry, name: NextLS.Registry, keys: :duplicate}, {NextLS, cache: :diagnostic_cache, symbol_table: :symbol_table, task_supervisor: NextLS.TaskSupervisor, runtime_task_supervisor: :runtime_task_supervisor, dynamic_supervisor: NextLS.DynamicSupervisor, - extension_registry: NextLS.ExtensionRegistry} + registry: NextLS.Registry} ] Supervisor.init(children, strategy: :one_for_one) diff --git a/lib/next_ls/runtime.ex b/lib/next_ls/runtime.ex index 90180a9f..1179a217 100644 --- a/lib/next_ls/runtime.ex +++ b/lib/next_ls/runtime.ex @@ -8,7 +8,7 @@ defmodule NextLS.Runtime do |> Path.absname() def start_link(opts) do - GenServer.start_link(__MODULE__, opts, Keyword.take(opts, [:name])) + GenServer.start_link(__MODULE__, opts) end @type mod_fun_arg :: {atom(), atom(), list()} @@ -42,11 +42,16 @@ defmodule NextLS.Runtime do @impl GenServer def init(opts) do sname = "nextls-runtime-#{System.system_time()}" + name = Keyword.fetch!(opts, :name) working_dir = Keyword.fetch!(opts, :working_dir) + uri = Keyword.fetch!(opts, :uri) parent = Keyword.fetch!(opts, :parent) logger = Keyword.fetch!(opts, :logger) task_supervisor = Keyword.fetch!(opts, :task_supervisor) - extension_registry = Keyword.fetch!(opts, :extension_registry) + registry = Keyword.fetch!(opts, :registry) + on_initialized = Keyword.fetch!(opts, :on_initialized) + + Registry.register(registry, :runtimes, %{name: name, uri: uri}) port = Port.open( @@ -102,19 +107,23 @@ defmodule NextLS.Runtime do send(me, {:node, node}) else - _ -> send(me, :cancel) + _ -> + on_initialized.(:error) + send(me, :cancel) end end) {:ok, %{ + name: name, compiler_ref: nil, port: port, task_supervisor: task_supervisor, logger: logger, parent: parent, errors: nil, - extension_registry: extension_registry + registry: registry, + on_initialized: on_initialized }} end @@ -141,7 +150,7 @@ defmodule NextLS.Runtime do Task.Supervisor.async_nolink(state.task_supervisor, fn -> {_, errors} = :rpc.call(node, :_next_ls_private_compiler, :compile, []) - Registry.dispatch(state.extension_registry, :extension, fn entries -> + Registry.dispatch(state.registry, :extensions, fn entries -> for {pid, _} <- entries do send(pid, {:compiler, errors}) end @@ -169,6 +178,7 @@ defmodule NextLS.Runtime do def handle_info({:node, node}, state) do Node.monitor(node, true) + state.on_initialized.(:ready) {:noreply, Map.put(state, :node, node)} end diff --git a/test/next_ls/runtime_test.exs b/test/next_ls/runtime_test.exs index 1e75a9e5..36f6d9ec 100644 --- a/test/next_ls/runtime_test.exs +++ b/test/next_ls/runtime_test.exs @@ -37,32 +37,39 @@ defmodule NextLs.RuntimeTest do recv.(recv) end) - [logger: logger, cwd: Path.absname(tmp_dir)] + me = self() + + on_init = fn _ -> send(me, :ready) end + + [logger: logger, cwd: Path.absname(tmp_dir), on_init: on_init] end - test "returns the response in an ok tuple", %{logger: logger, cwd: cwd} do - start_supervised!({Registry, keys: :unique, name: RuntimeTestRegistry}) + test "returns the response in an ok tuple", %{logger: logger, cwd: cwd, on_init: on_init} do + start_supervised!({Registry, keys: :duplicate, name: RuntimeTest.Registry}) tvisor = start_supervised!(Task.Supervisor) pid = start_supervised!( {Runtime, + name: "my_proj", + on_initialized: on_init, task_supervisor: tvisor, working_dir: cwd, + uri: "file://#{cwd}", parent: self(), logger: logger, - extension_registry: RuntimeTestRegistry} + registry: RuntimeTest.Registry} ) Process.link(pid) - assert wait_for_ready(pid) + assert wait_for_ready() assert {:ok, "\"hi\""} = Runtime.call(pid, {Kernel, :inspect, ["hi"]}) end - test "call returns an error when the runtime is node ready", %{logger: logger, cwd: cwd} do - start_supervised!({Registry, keys: :unique, name: RuntimeTestRegistry}) + test "call returns an error when the runtime is node ready", %{logger: logger, cwd: cwd, on_init: on_init} do + start_supervised!({Registry, keys: :duplicate, name: RuntimeTest.Registry}) tvisor = start_supervised!(Task.Supervisor) @@ -70,10 +77,13 @@ defmodule NextLs.RuntimeTest do start_supervised!( {Runtime, task_supervisor: tvisor, + name: "my_proj", + on_initialized: on_init, working_dir: cwd, + uri: "file://#{cwd}", parent: self(), logger: logger, - extension_registry: RuntimeTestRegistry} + registry: RuntimeTest.Registry} ) Process.link(pid) @@ -81,8 +91,8 @@ defmodule NextLs.RuntimeTest do assert {:error, :not_ready} = Runtime.call(pid, {IO, :puts, ["hi"]}) end - test "compiles the code and returns diagnostics", %{logger: logger, cwd: cwd} do - start_supervised!({Registry, keys: :unique, name: RuntimeTestRegistry}) + test "compiles the code and returns diagnostics", %{logger: logger, cwd: cwd, on_init: on_init} do + start_supervised!({Registry, keys: :duplicate, name: RuntimeTest.Registry}) tvisor = start_supervised!(Task.Supervisor) @@ -90,16 +100,19 @@ defmodule NextLs.RuntimeTest do pid = start_supervised!( {Runtime, + name: "my_proj", + on_initialized: on_init, task_supervisor: tvisor, working_dir: cwd, + uri: "file://#{cwd}", parent: self(), logger: logger, - extension_registry: RuntimeTestRegistry} + registry: RuntimeTest.Registry} ) Process.link(pid) - assert wait_for_ready(pid) + assert wait_for_ready() file = Path.join(cwd, "lib/bar.ex") @@ -133,10 +146,12 @@ defmodule NextLs.RuntimeTest do end) =~ "Connected to node" end - defp wait_for_ready(pid) do - with false <- Runtime.ready?(pid) do - Process.sleep(100) - wait_for_ready(pid) + defp wait_for_ready do + receive do + :ready -> true + after + 10_000 -> + false end end end diff --git a/test/next_ls_test.exs b/test/next_ls_test.exs index 9f4e4d5f..acb2725a 100644 --- a/test/next_ls_test.exs +++ b/test/next_ls_test.exs @@ -867,7 +867,7 @@ defmodule NextLSTest do tvisor = start_supervised!(Supervisor.child_spec(Task.Supervisor, id: :one)) r_tvisor = start_supervised!(Supervisor.child_spec(Task.Supervisor, id: :two)) rvisor = start_supervised!({DynamicSupervisor, [strategy: :one_for_one]}) - start_supervised!({Registry, [keys: :unique, name: Registry.NextLSTest]}) + start_supervised!({Registry, [keys: :duplicate, name: Registry.NextLSTest.Registry]}) extensions = [NextLS.ElixirExtension] cache = start_supervised!(NextLS.DiagnosticCache) symbol_table = start_supervised!({NextLS.SymbolTable, path: tmp_dir}) @@ -877,7 +877,7 @@ defmodule NextLSTest do task_supervisor: tvisor, runtime_task_supervisor: r_tvisor, dynamic_supervisor: rvisor, - extension_registry: Registry.NextLSTest, + registry: Registry.NextLSTest.Registry, extensions: extensions, cache: cache, symbol_table: symbol_table