Skip to content
Open
4 changes: 2 additions & 2 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@
# notification. To prevent flooding the recipients, it will wait for a period
# before it sends the next email (assuming the failure condition persists).
# Changing this setting will affect the frequency of sending.
# KAFKA_NOTIFICATTION_EMBARGO_SECONDS=3600
# KAFKA_NOTIFICATION_EMBARGO_SECONDS=3600
#
# If the Kafka pipelines failed to persist a message, the message can be
# persisted as JSON to the local file system. To enable this, set
Expand All @@ -282,7 +282,7 @@
# Lightning starts and it must be writable by the user that Lightning runs as.
# KAFKA_ALTERNATE_STORAGE_FILE_PATH=/path/to/alternate/storage
#
# This file to which the registry should be read from. In case the file doesnt
# This file to which the registry should be read from. In case the file doesn't
# exist, Lightning will attempt to fetch the file and write it to the same location.
# For this reason, you have to make sure that the directory exists and it is writable
# ADAPTORS_REGISTRY_JSON_PATH=/path/to/adaptor_registry_cache.json
Expand Down
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ and this project adheres to

### Changed

- Allow instance admins to install credential schemas and update the adaptor
registry on the fly [#3114](https://github.com/OpenFn/lightning/issues/3114),
[#2209](https://github.com/OpenFn/lightning/issues/2209),
[#325](https://github.com/OpenFn/lightning/issues/325),
[#1996](https://github.com/OpenFn/lightning/issues/1996)

### Fixed

- Auto-increment job name when adaptor display name is already used in workflow
Expand Down
107 changes: 107 additions & 0 deletions lib/lightning/adaptor_icons.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
defmodule Lightning.AdaptorIcons do
@moduledoc """
Downloads and installs adaptor icons at runtime.

Fetches a tarball of the OpenFn adaptors repository from GitHub, extracts
icon PNGs, and writes them to the configured icons directory along with
a JSON manifest.
"""

require Logger

@adaptors_tar_url "https://github.com/OpenFn/adaptors/archive/refs/heads/main.tar.gz"

@doc """
Fetches adaptor icons from GitHub and writes them to the icons directory.

Returns `{:ok, manifest}` on success or `{:error, reason}` on failure.
The manifest is a map of adaptor names to their icon paths.
"""
@spec refresh() :: {:ok, map()} | {:error, term()}
def refresh do
target_dir = Application.fetch_env!(:lightning, :adaptor_icons_path)
working_dir = tmp_dir!()

try do
with :ok <- File.mkdir_p(target_dir),
{:ok, body} <- fetch_tarball(),
:ok <- extract_tarball(body, working_dir) do
manifest = save_icons(working_dir, target_dir)

manifest_path = Path.join(target_dir, "adaptor_icons.json")
File.write!(manifest_path, Jason.encode!(manifest))

{:ok, manifest}
end
rescue
error ->
Logger.error("Failed to refresh adaptor icons: #{inspect(error)}")
{:error, error}
after
File.rm_rf(working_dir)
end
end

defp fetch_tarball do
case Tesla.get(build_client(), @adaptors_tar_url) do
{:ok, %{status: 200, body: body}} -> {:ok, body}
{:ok, %{status: status}} -> {:error, "HTTP #{status}"}
{:error, reason} -> {:error, reason}
end
end

defp build_client do
Tesla.client([Tesla.Middleware.FollowRedirects])
end

defp extract_tarball(body, working_dir) do
:erl_tar.extract(
{:binary, body},
[:compressed, cwd: to_charlist(working_dir)]
)
end

defp tmp_dir! do
tmp_dir =
Path.join([
System.tmp_dir!(),
"lightning-adaptor",
"#{System.unique_integer([:positive])}"
])

{:ok, _} = File.rm_rf(tmp_dir)
:ok = File.mkdir_p(tmp_dir)

tmp_dir
end

defp list_icons(working_dir) do
[working_dir, "**", "packages", "*", "assets", "{rectangle,square}.png"]
|> Path.join()
|> Path.wildcard()
end

defp save_icons(working_dir, target_dir) do
working_dir
|> list_icons()
|> Enum.map(fn icon_path ->
[icon_name, "assets", adaptor_name | _rest] =
Path.split(icon_path) |> Enum.reverse()

destination_name = adaptor_name <> "-" <> icon_name
destination_path = Path.join(target_dir, destination_name)
File.cp!(icon_path, destination_path)

%{
adaptor: adaptor_name,
shape: Path.rootname(icon_name),
src: "/images/adaptors" <> "/#{destination_name}"
}
end)
|> Enum.group_by(fn entry -> entry.adaptor end)
|> Enum.into(%{}, fn {adaptor, sources} ->
sources = Map.new(sources, fn entry -> {entry.shape, entry.src} end)
{adaptor, sources}
end)
end
end
71 changes: 71 additions & 0 deletions lib/lightning/adaptor_refresh_worker.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
defmodule Lightning.AdaptorRefreshWorker do
@moduledoc """
Oban worker that periodically refreshes the adaptor registry, icons,
and credential schemas from their upstream sources.

Scheduled via cron when `ADAPTOR_REFRESH_INTERVAL_HOURS` is configured.
Returns `:ok` even on partial failure since retries are not useful for
transient network issues — the next scheduled run will try again.
"""

use Oban.Worker,
queue: :background,
max_attempts: 1,
unique: [period: 3600]

require Logger

@impl Oban.Worker
def perform(%Oban.Job{}) do
if Lightning.AdaptorRegistry.local_adaptors_enabled?() do
Logger.info("Skipping scheduled adaptor refresh: local adaptors mode")
:ok
else
do_refresh()
end
end

defp do_refresh do
Logger.info("Starting scheduled adaptor refresh")

results = [
{:registry, safe_call(fn -> Lightning.AdaptorRegistry.refresh() end)},
{:icons, safe_call(fn -> Lightning.AdaptorIcons.refresh() end)},
{:schemas, safe_call(fn -> Lightning.CredentialSchemas.refresh() end)}
]

errors =
results
|> Enum.filter(fn {_, result} -> match?({:error, _}, result) end)
|> Enum.map(fn {name, {:error, reason}} -> {name, reason} end)

if errors == [] do
Logger.info("Scheduled adaptor refresh completed successfully")
Lightning.API.broadcast("adaptor:refresh", {:refresh_all, node()})
else
Logger.warning(
"Scheduled adaptor refresh partially failed: #{inspect(errors)}"
)

# Only broadcast to other nodes if at least one refresh succeeded
if length(errors) < length(results) do
Lightning.API.broadcast("adaptor:refresh", {:refresh_all, node()})
end
end

:ok
end

defp safe_call(fun) do
case fun.() do
:ok -> {:ok, :done}
{:ok, _} = ok -> ok
{:error, _} = error -> error
end
rescue
error ->
Logger.error("Adaptor refresh error: #{Exception.message(error)}")

{:error, Exception.message(error)}
end
end
Loading