diff --git a/README.rst b/README.rst index 10b8e53b0..7fdb2cc44 100644 --- a/README.rst +++ b/README.rst @@ -126,9 +126,9 @@ Install TwinDB Backup. .. code-block:: console # Download the package - wget https://twindb-release.s3.amazonaws.com/twindb-backup/3.3.0/focal/twindb-backup_3.3.0-1_amd64.deb + wget https://twindb-release.s3.amazonaws.com/twindb-backup/3.6.0/focal/twindb-backup_3.6.0-1_amd64.deb # Install TwinDB Backup - apt install ./twindb-backup_3.3.0-1_amd64.deb + apt install ./twindb-backup_3.6.0-1_amd64.deb Configuring TwinDB Backup ~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -157,7 +157,7 @@ The package file will be generated in ``omnibus/pkg/``: .. code-block:: console $ ls omnibus/pkg/*.deb - omnibus/pkg/twindb-backup_3.3.0-1_amd64.deb + omnibus/pkg/twindb-backup_3.6.0-1_amd64.deb Once the package is built you can install it with rpm/dpkg or upload it to your repository and install it with apt or yum. diff --git a/docs/azure_worm_compatibility.rst b/docs/azure_worm_compatibility.rst new file mode 100644 index 000000000..a3ba567f2 --- /dev/null +++ b/docs/azure_worm_compatibility.rst @@ -0,0 +1,53 @@ +Azure WORM Compatibility Notes +============================== + +This note captures the phase-2 design direction for running TwinDB against immutable Azure Blob containers after the phase-1 managed identity rollout. + +Current blockers +---------------- + +TwinDB is not WORM-compatible today because it still depends on mutable and destructive blob operations in the Azure destination flow: + +- ``twindb_backup/source/mysql_source.py`` deletes remote backup copies when applying retention. +- ``twindb_backup/source/file_source.py`` deletes remote file backup copies when applying retention. +- ``twindb_backup/backup.py`` deletes binlogs and cleanup copies through the destination object. +- ``twindb_backup/status/base_status.py`` overwrites ``status`` and ``binlog-status`` blobs in place. + +Container-level immutability blocks those delete and overwrite patterns, so authentication changes alone are not enough to make TwinDB WORM-safe. + +Phase-2 decisions +----------------- + +1. Add a provider-managed retention mode. + + In this mode, TwinDB must stop calling ``dst.delete(...)`` for remote retention and cleanup. Azure lifecycle management and immutable storage retention policies become the source of truth for payload expiration. + +2. Move mutable status out of the immutable backup container. + + ``status`` and ``binlog-status`` blobs should live in a separate mutable location. The cleanest follow-on design is a separate Azure status container or destination stanza for metadata, leaving the payload container append-only. + +3. Use finite immutability windows. + + The target model should use time-based retention sized to recovery requirements rather than "infinite retention". That keeps the operational model compatible with Azure lifecycle cleanup once blobs age past the immutability window. + +4. Validate on unlocked non-production storage before any lock decision. + + The first end-to-end WORM validation should use a non-production container with an unlocked immutability policy. Validate backup writes, status writes, restore reads, and lifecycle-driven cleanup behavior before any container is locked. + +Suggested implementation shape +------------------------------ + +The likely code changes for the phase-2 follow-on are: + +- Add a configuration switch such as ``remote_delete = false`` or ``retention_mode = provider_managed`` and thread it into the retention and cleanup call sites. +- Teach the backup/status flow to use a separate mutable Azure location for status metadata. +- Keep the existing phase-1 managed identity authentication path for both payload and status destinations, but allow them to point at different containers. + +Recommended validation order +---------------------------- + +1. Enable managed identity auth first. +2. Pre-create separate payload and status containers. +3. Enable provider-managed retention mode so TwinDB stops issuing remote deletes. +4. Apply a finite, unlocked immutability policy to the payload container. +5. Confirm backups still upload, status metadata still updates, restores still read, and Azure lifecycle cleanup works after the retention window expires. diff --git a/docs/installation.rst b/docs/installation.rst index 8e2aff97e..56674b86d 100644 --- a/docs/installation.rst +++ b/docs/installation.rst @@ -38,7 +38,7 @@ The package file will be generated in ``omnibus/pkg/``: .. code-block:: console $ ls omnibus/pkg/*.deb - omnibus/pkg/twindb-backup_3.3.0-1_amd64.deb + omnibus/pkg/twindb-backup_3.6.0-1_amd64.deb Once the package is built you can install it with rpm/dpkg or upload it to your repository and install it with apt or yum. diff --git a/docs/usage.rst b/docs/usage.rst index f10b0aa09..c565506e7 100644 --- a/docs/usage.rst +++ b/docs/usage.rst @@ -40,6 +40,23 @@ Personally, I added it to skip files ``.gitignore`` would ignore. backup_dirs = /etc /root /home "/path/to/important files" tar_options = --exclude-vcs-ignores +``server_name`` is an optional identifier that TwinDB Backup uses as the per-source +segment of the remote backup path and of the status file. When unset, it defaults +to the local hostname (``socket.gethostname()``), which produces one backup tree +per host. When multiple replicas of a MySQL cluster back up to the same +destination, set ``server_name`` to a cluster-wide identifier so every replica +writes into a single shared path instead of a per-hostname fan-out. On Azure +Blob destinations, setting ``server_name`` also enables a blob-lease-based +single-writer gate so only one replica runs a given backup cycle at a time. + +.. code-block:: ini + + [source] + + backup_dirs = /etc /root /home + backup_mysql = yes + server_name = prod-primary-db + Backup Destination ~~~~~~~~~~~~~~~~~~ @@ -92,16 +109,88 @@ In the ``[s3]`` section you specify Amazon credentials as well as an S3 bucket w Azure Blob Storage ~~~~~~~~~~~~~~~~~~~~ -In the ``[az]`` section you specify Azure credentials as well as Azure Blob Storage container where to store backups. +In the ``[az]`` section you specify Azure authentication as well as the Azure Blob Storage container where to store backups. + +The default mode uses a storage connection string: .. code-block:: ini [az] + auth_mode = connection_string # optional, defaults to connection_string connection_string = "DefaultEndpointsProtocol=https;AccountName=ACCOUNT_NAME;AccountKey=ACCOUNT_KEY;EndpointSuffix=core.windows.net" container_name = twindb-backups + create_container_if_missing = true # optional, defaults to true remote_path = /backups/mysql # optional + max_concurrency = 1 # optional + +For Azure VMs, managed identity authentication is also supported: + +.. code-block:: ini + + [az] + + auth_mode = managed_identity + account_url = "https://ACCOUNT_NAME.blob.core.windows.net" + container_name = twindb-backups + # Optional: target a specific user-assigned managed identity. Set at most ONE of: + # managed_identity_resource_id = "/subscriptions/.../userAssignedIdentities/NAME" + # managed_identity_client_id = "00000000-0000-0000-0000-000000000000" + # If neither is set the system-assigned managed identity is used (DefaultAzureCredential). + create_container_if_missing = false # optional, defaults to true + remote_path = /backups/mysql # optional + max_concurrency = 1 # optional + +For Azure VM deployments, the recommended production setup is: + +- Use one **user-assigned managed identity (UAMI) per workload role** and attach it + to every VM that fills that role. Keeping a single stable identity across VM + rebuilds is easier to reason about than per-VM system-assigned identities, and + it lets you scope RBAC to the exact container the workload writes to. +- Prefer ``managed_identity_resource_id`` over ``managed_identity_client_id`` + when a VM has multiple UAMIs attached, or when you want the backup + configuration to be derivable from naming conventions instead of from a + Terraform output. The resource ID is the UAMI's full ARM ID, e.g. + ``/subscriptions//resourceGroups//providers/Microsoft.ManagedIdentity/userAssignedIdentities/``. +- Fall back to the VM's system-assigned managed identity (omit both + ``managed_identity_*`` fields) when a single identity per VM is sufficient. +- Grant the identity the ``Storage Blob Data Contributor`` role scoped to the + target container. TwinDB currently reads, writes, lists, overwrites status + blobs, deletes old backups, and can optionally create the container, so it + still needs a role with blob read/write/delete plus container + read/write permissions. +- Prefer ``create_container_if_missing = false`` when infrastructure pre-creates + the container. This lets operations scope permissions to the existing + container or storage account instead of depending on first-run container + creation. + +Use ``create_container_if_missing = false`` when the container should be pre-provisioned by infrastructure and the backup process should not attempt container creation. + +Validation checklist for the managed identity rollout: + +- From a developer workstation, validate the token-auth code path against an accessible non-production storage account by using ``account_url`` and Microsoft Entra credentials from ``DefaultAzureCredential``. +- From the target Azure VM, validate the production path with no ``connection_string`` configured. Confirm backup upload, list/read operations, status blob updates, and any retention deletes that remain enabled in phase 1. +- If the storage account uses network rules or private endpoints, run the validation from the VM or another allowed network path. Local validation may fail even when the identity and RBAC are correct. + +For the separate immutable-storage follow-on, see ``docs/azure_worm_compatibility.rst``. + +In the ``[az.client]`` section you specify optional Azure Blob Storage client options. + +.. code-block:: ini + + [az.client] + api_version = "2019-02-02" + secondary_hostname = "ACCOUNT_NAME-secondary.blob.core.windows.net" + max_block_size = 4194304 + max_single_put_size = 67108864 + min_large_block_upload_threshold = 4194305 + use_byte_buffer = true + max_page_size = 4194304 + max_single_get_size = 33554432 + max_chunk_get_size = 4194304 + audience = "https://storage.azure.com/" + connection_timeout = 20 Google Cloud Storage ~~~~~~~~~~~~~~~~~~~~ @@ -151,6 +240,7 @@ The ``expire_log_days`` options specifies the retention period for MySQL binlogs mysql_defaults_file = /etc/twindb/my.cnf full_backup = daily expire_log_days = 7 + hostname = localhost # optional, defaults to 127.0.0.1 Backing up MySQL Binlog ----------------------- diff --git a/omnibus/config/projects/twindb-backup.rb b/omnibus/config/projects/twindb-backup.rb index 1d381364e..a92ce9163 100644 --- a/omnibus/config/projects/twindb-backup.rb +++ b/omnibus/config/projects/twindb-backup.rb @@ -23,7 +23,7 @@ # and /opt/twindb-backup on all other platforms install_dir '/opt/twindb-backup' -build_version '3.3.0' +build_version '3.6.0' build_iteration 1 diff --git a/requirements.in b/requirements.in index 21a74946c..ebdbeb118 100644 --- a/requirements.in +++ b/requirements.in @@ -1,5 +1,6 @@ #@IgnoreInspection BashAddShebang azure-core ~= 1.24 +azure-identity azure-storage-blob ~= 12.19 Click ~= 8.1 PyMySQL ~= 1.0 diff --git a/requirements.txt b/requirements.txt index ccaaef0f3..97f444c4f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ # -# This file is autogenerated by pip-compile with Python 3.8 +# This file is autogenerated by pip-compile with Python 3.9 # by the following command: # # pip-compile --output-file=requirements.txt requirements.in @@ -7,7 +7,10 @@ azure-core==1.31.0 # via # -r requirements.in + # azure-identity # azure-storage-blob +azure-identity==1.25.3 + # via -r requirements.in azure-storage-blob==12.23.0 # via -r requirements.in bcrypt==4.2.0 @@ -34,8 +37,11 @@ click==8.1.7 # via -r requirements.in cryptography==43.0.1 # via + # azure-identity # azure-storage-blob + # msal # paramiko + # pyjwt datadog==0.50.1 # via -r requirements.in google==3.0.0 @@ -67,6 +73,12 @@ jmespath==1.0.1 # via # boto3 # botocore +msal==1.36.0 + # via + # azure-identity + # msal-extensions +msal-extensions==1.3.1 + # via azure-identity paramiko==3.5.0 # via -r requirements.in proto-plus==1.24.0 @@ -88,6 +100,10 @@ pyasn1-modules==0.4.1 # via google-auth pycparser==2.22 # via cffi +pyjwt[crypto]==2.12.1 + # via + # msal + # pyjwt pymysql==1.1.1 # via -r requirements.in pynacl==1.5.0 @@ -101,6 +117,7 @@ requests==2.31.0 # datadog # google-api-core # google-cloud-storage + # msal rsa==4.9 # via google-auth s3transfer==0.10.2 @@ -119,7 +136,9 @@ statsd-tags==3.2.1.post1 typing-extensions==4.12.2 # via # azure-core + # azure-identity # azure-storage-blob + # pyjwt urllib3==1.26.20 # via # botocore diff --git a/setup.cfg b/setup.cfg index 5cfa9fa51..8ff2d6349 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 3.3.0 +current_version = 3.6.0 commit = True tag = False diff --git a/setup.py b/setup.py index df20a2a27..c44f4dcd5 100644 --- a/setup.py +++ b/setup.py @@ -20,7 +20,7 @@ setup( name="twindb-backup", - version="3.3.0", + version="3.6.0", description="TwinDB Backup tool for files, MySQL et al.", long_description=readme + "\n\n" + history, author="TwinDB Development Team", diff --git a/support/twindb-backup.cfg b/support/twindb-backup.cfg index e2f9b0e5a..22fdb9ca3 100644 --- a/support/twindb-backup.cfg +++ b/support/twindb-backup.cfg @@ -7,6 +7,12 @@ backup_mysql=no # When backing up files it might be useful to ignore what would .gitignore ignore. # tar_options = --exclude-vcs-ignores --exclude-caches +# server_name overrides the per-source segment of the remote backup path +# (defaults to the local hostname). Set it to a cluster-wide identifier +# so every replica in a MySQL cluster writes to one path instead of one +# per host. Example: +# server_name = prod-primary-db + # Destination [destination] # backup destination can be ssh, s3, gcs @@ -35,9 +41,33 @@ BUCKET=twindb-backups # Azure destination settings -connection_string="DefaultEndpointsProtocol=https;AccountName=ACCOUNT_NAME;AccountKey=ACCOUNT_KEY;EndpointSuffix=core.windows.net" +# auth_mode="connection_string" # optional, defaults to connection_string +# connection_string="DefaultEndpointsProtocol=https;AccountName=ACCOUNT_NAME;AccountKey=ACCOUNT_KEY;EndpointSuffix=core.windows.net" +# account_url="https://ACCOUNT_NAME.blob.core.windows.net" # required for managed_identity auth container_name=twindb-backups +# Set at most ONE of managed_identity_resource_id / managed_identity_client_id. +# If neither is set, the system-assigned identity is used via DefaultAzureCredential. +# managed_identity_resource_id="/subscriptions/.../providers/Microsoft.ManagedIdentity/userAssignedIdentities/NAME" +# managed_identity_client_id="00000000-0000-0000-0000-000000000000" +# create_container_if_missing=true # optional, defaults to true #remote_path = /backups/mysql # optional +#max_concurrency = 1 # optional + +[az.client] + +# Azure client optional settings + +# api_version="2019-02-02" # optional +# secondary_hostname="ACCOUNT_NAME-secondary.blob.core.windows.net" # optional +# max_block_size=4194304 # optional +# max_single_put_size=67108864 # optional +# min_large_block_upload_threshold=4194305 # optional +# use_byte_buffer=true # optional +# max_page_size=4194304 # optional +# max_single_get_size=33554432 # optional +# max_chunk_get_size=4194304 # optional +# audience="https://storage.azure.com/" # optional +# connection_timeout=20 # optional [gcs] @@ -60,8 +90,8 @@ ssh_key=/root/.ssh/id_rsa # MySQL mysql_defaults_file=/etc/twindb/my.cnf - full_backup=daily +#hostname=localhost # optional, defaults to 127.0.0.1 [retention] diff --git a/tests/unit/backup/test_backup_binlogs.py b/tests/unit/backup/test_backup_binlogs.py index 1be93cfb7..3d7c3c573 100644 --- a/tests/unit/backup/test_backup_binlogs.py +++ b/tests/unit/backup/test_backup_binlogs.py @@ -9,6 +9,10 @@ @mock.patch("twindb_backup.backup.osp") def test_backup_binlogs_returns_if_no_binlogs(mock_osp, mock_save): with mock.patch.object(MySQLClient, "variable", return_value=None): - backup_binlogs("foo", mock.Mock()) + cfg = mock.Mock() + # server_name is used for the BinlogStatus status_directory; it must + # be a real string since it is joined with a filename. + cfg.server_name = "test-host" + backup_binlogs("foo", cfg) assert mock_osp.dirname.call_count == 0 assert mock_save.call_count == 0 diff --git a/tests/unit/configuration/test_mysql.py b/tests/unit/configuration/test_mysql.py index afe1105ce..2765c9502 100644 --- a/tests/unit/configuration/test_mysql.py +++ b/tests/unit/configuration/test_mysql.py @@ -1,10 +1,31 @@ from twindb_backup.configuration.mysql import MySQLConfig -def test_mysql(): +def test_mysql_defaults(): mc = MySQLConfig() assert mc.defaults_file == "/root/.my.cnf" assert mc.full_backup == "daily" + assert mc.expire_log_days == 7 + assert mc.xtrabackup_binary is None + assert mc.xbstream_binary is None + assert mc.hostname == "127.0.0.1" + + +def test_mysql_init(): + mc = MySQLConfig( + mysql_defaults_file="/foo/bar", + full_backup="weekly", + expire_log_days=3, + xtrabackup_binary="/foo/xtrabackup", + xbstream_binary="/foo/xbstream", + hostname="foo", + ) + assert mc.defaults_file == "/foo/bar" + assert mc.full_backup == "weekly" + assert mc.expire_log_days == 3 + assert mc.xtrabackup_binary == "/foo/xtrabackup" + assert mc.xbstream_binary == "/foo/xbstream" + assert mc.hostname == "foo" def test_mysql_set_xtrabackup_binary(): diff --git a/tests/unit/configuration/twindb_backup_config/test_az.py b/tests/unit/configuration/twindb_backup_config/test_az.py new file mode 100644 index 000000000..1a2589ea3 --- /dev/null +++ b/tests/unit/configuration/twindb_backup_config/test_az.py @@ -0,0 +1,104 @@ +from textwrap import dedent + +from twindb_backup.configuration import TwinDBBackupConfig +from twindb_backup.configuration.destinations.az import AUTH_MODE_CONNECTION_STRING, AUTH_MODE_MANAGED_IDENTITY + + +def test_az_connection_string(config_file): + tbc = TwinDBBackupConfig(config_file=str(config_file)) + + assert tbc.az.auth_mode == AUTH_MODE_CONNECTION_STRING + assert ( + tbc.az.connection_string + == "DefaultEndpointsProtocol=https;AccountName=ACCOUNT_NAME;AccountKey=ACCOUNT_KEY;EndpointSuffix=core.windows.net" + ) + assert tbc.az.account_url is None + assert tbc.az.container_name == "twindb-backups" + assert tbc.az.managed_identity_client_id is None + assert tbc.az.managed_identity_resource_id is None + assert tbc.az.create_container_if_missing is True + assert tbc.az.remote_path == "backups/mysql" + assert tbc.az.max_concurrency == 1 + + +def test_az_managed_identity(tmpdir): + cfg_file = tmpdir.join("twindb-backup.cfg") + with open(str(cfg_file), "w") as fp: + fp.write( + dedent( + """ + [source] + backup_dirs=/etc + backup_mysql=no + + [destination] + backup_destination=az + + [az] + auth_mode=managed_identity + account_url="https://account-name.blob.core.windows.net" + container_name="twindb-backups" + managed_identity_client_id="test-client-id" + create_container_if_missing=false + remote_path="/managed/identity" + + [az.client] + audience="https://storage.azure.com/" + connection_timeout=20 + """ + ) + ) + + tbc = TwinDBBackupConfig(config_file=str(cfg_file)) + + assert tbc.az.auth_mode == AUTH_MODE_MANAGED_IDENTITY + assert tbc.az.connection_string is None + assert tbc.az.account_url == "https://account-name.blob.core.windows.net" + assert tbc.az.container_name == "twindb-backups" + assert tbc.az.managed_identity_client_id == "test-client-id" + assert tbc.az.managed_identity_resource_id is None + assert tbc.az.create_container_if_missing is False + assert tbc.az.remote_path == "managed/identity" + assert tbc.az.client_config.audience == "https://storage.azure.com/" + + +def test_az_managed_identity_resource_id(tmpdir): + resource_id = ( + "/subscriptions/00000000-0000-0000-0000-000000000000" + "/resourceGroups/example-rg" + "/providers/Microsoft.ManagedIdentity/userAssignedIdentities" + "/example-mi" + ) + cfg_file = tmpdir.join("twindb-backup.cfg") + with open(str(cfg_file), "w") as fp: + fp.write( + dedent( + f""" + [source] + backup_dirs=/etc + backup_mysql=no + + [destination] + backup_destination=az + + [az] + auth_mode=managed_identity + account_url="https://account-name.blob.core.windows.net" + container_name="twindb-backups" + managed_identity_resource_id="{resource_id}" + create_container_if_missing=false + remote_path="/backups/mysql" + """ + ) + ) + + tbc = TwinDBBackupConfig(config_file=str(cfg_file)) + + assert tbc.az.auth_mode == AUTH_MODE_MANAGED_IDENTITY + assert tbc.az.connection_string is None + assert tbc.az.account_url == "https://account-name.blob.core.windows.net" + assert tbc.az.container_name == "twindb-backups" + assert tbc.az.managed_identity_client_id is None + assert tbc.az.managed_identity_resource_id == resource_id + assert tbc.az.create_container_if_missing is False + assert tbc.az.remote_path == "backups/mysql" diff --git a/tests/unit/configuration/twindb_backup_config/test_server_name.py b/tests/unit/configuration/twindb_backup_config/test_server_name.py new file mode 100644 index 000000000..5a7741b01 --- /dev/null +++ b/tests/unit/configuration/twindb_backup_config/test_server_name.py @@ -0,0 +1,78 @@ +"""Tests for TwinDBBackupConfig.server_name.""" + +import socket +from textwrap import dedent + +from twindb_backup.configuration import TwinDBBackupConfig + + +def _write_config(tmpdir, body): + cfg_file = tmpdir.join("twindb-backup.cfg") + with open(str(cfg_file), "w", encoding="utf-8") as fh: + fh.write(dedent(body)) + return cfg_file + + +def test_server_name_defaults_to_hostname(tmpdir): + """Without [source] server_name, we fall back to gethostname().""" + cfg_file = _write_config( + tmpdir, + """ + [source] + backup_mysql=no + """, + ) + tbc = TwinDBBackupConfig(config_file=str(cfg_file)) + assert tbc.server_name == socket.gethostname() + + +def test_server_name_from_config(tmpdir): + cfg_file = _write_config( + tmpdir, + """ + [source] + backup_mysql=no + server_name=prod-primary-db + """, + ) + tbc = TwinDBBackupConfig(config_file=str(cfg_file)) + assert tbc.server_name == "prod-primary-db" + + +def test_server_name_strips_quotes_and_whitespace(tmpdir): + cfg_file = _write_config( + tmpdir, + """ + [source] + backup_mysql=no + server_name = " prod-primary-db " + """, + ) + tbc = TwinDBBackupConfig(config_file=str(cfg_file)) + assert tbc.server_name == "prod-primary-db" + + +def test_server_name_empty_value_falls_back_to_hostname(tmpdir): + cfg_file = _write_config( + tmpdir, + """ + [source] + backup_mysql=no + server_name = + """, + ) + tbc = TwinDBBackupConfig(config_file=str(cfg_file)) + assert tbc.server_name == socket.gethostname() + + +def test_server_name_without_source_section(tmpdir): + """Missing [source] section must not crash server_name lookup.""" + cfg_file = _write_config( + tmpdir, + """ + [destination] + backup_destination=s3 + """, + ) + tbc = TwinDBBackupConfig(config_file=str(cfg_file)) + assert tbc.server_name == socket.gethostname() diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index 9c2aaea9b..30422753d 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -32,6 +32,20 @@ def config_content(): connection_string="DefaultEndpointsProtocol=https;AccountName=ACCOUNT_NAME;AccountKey=ACCOUNT_KEY;EndpointSuffix=core.windows.net" container_name="twindb-backups" remote_path="/backups/mysql" +max_concurrency=1 + +[az.client] +api_version="2019-02-02" +secondary_hostname="ACCOUNT_NAME-secondary.blob.core.windows.net" +max_block_size=4194304 +max_single_put_size=67108864 +min_large_block_upload_threshold=4194305 +use_byte_buffer=true +max_page_size=4194304 +max_single_get_size=33554432 +max_chunk_get_size=4194304 +audience="https://storage.azure.com/" +connection_timeout=20 [gcs] GC_CREDENTIALS_FILE="XXXXX" diff --git a/tests/unit/destination/az/test_cluster_lock.py b/tests/unit/destination/az/test_cluster_lock.py new file mode 100644 index 000000000..19f690489 --- /dev/null +++ b/tests/unit/destination/az/test_cluster_lock.py @@ -0,0 +1,159 @@ +"""Tests for AZ.cluster_lock context manager. + +These tests exercise the lease acquisition/release logic without hitting +Azure. Each test patches the ``BlobLeaseClient`` so we can assert the +outcomes deterministically. +""" + +from unittest.mock import MagicMock, patch + +import azure.core.exceptions as ae +import pytest + +from twindb_backup.destination.az import _CLUSTER_LOCK_BLOB +from twindb_backup.destination.base_destination import ClusterLock + +from .util import mocked_az + + +def _make_http_error(error_code): + err = ae.HttpResponseError(message=error_code) + err.error_code = error_code + return err + + +def test_cluster_lock_acquired_creates_blob_and_lease(): + """Happy path: lease is acquired, blob is created, lease is released.""" + c = mocked_az() + blob_client = MagicMock() + c.container_client.get_blob_client = MagicMock(return_value=blob_client) + + with patch("twindb_backup.destination.az.BlobLeaseClient") as mock_lease_cls: + lease = MagicMock() + mock_lease_cls.return_value = lease + + with c.cluster_lock("prod-primary-db", ttl=30) as lock: + assert isinstance(lock, ClusterLock) + assert lock.acquired is True + + blob_client.upload_blob.assert_called_once() + args, kwargs = blob_client.upload_blob.call_args + assert args[0] == b"" + assert kwargs.get("overwrite") is False + + c.container_client.get_blob_client.assert_called_once() + assert c.container_client.get_blob_client.call_args[0][0].endswith( + f"prod-primary-db/{_CLUSTER_LOCK_BLOB}" + ) + + lease.acquire.assert_called_once_with(lease_duration=30) + lease.release.assert_called_once() + + +def test_cluster_lock_tolerates_existing_blob(): + """If the lease blob already exists we do not raise.""" + c = mocked_az() + blob_client = MagicMock() + blob_client.upload_blob.side_effect = ae.ResourceExistsError("BlobAlreadyExists") + c.container_client.get_blob_client = MagicMock(return_value=blob_client) + + with patch("twindb_backup.destination.az.BlobLeaseClient") as mock_lease_cls: + lease = MagicMock() + mock_lease_cls.return_value = lease + + with c.cluster_lock("prod-primary-db") as lock: + assert lock.acquired is True + + lease.acquire.assert_called_once() + lease.release.assert_called_once() + + +def test_cluster_lock_held_by_another_member_yields_not_acquired(): + """LeaseAlreadyPresent → acquired=False and we do not release.""" + c = mocked_az() + blob_client = MagicMock() + c.container_client.get_blob_client = MagicMock(return_value=blob_client) + + with patch("twindb_backup.destination.az.BlobLeaseClient") as mock_lease_cls: + lease = MagicMock() + lease.acquire.side_effect = _make_http_error("LeaseAlreadyPresent") + mock_lease_cls.return_value = lease + + with c.cluster_lock("prod-primary-db") as lock: + assert lock.acquired is False + + lease.acquire.assert_called_once() + lease.release.assert_not_called() + + +def test_cluster_lock_transient_error_proceeds_without_coordination(): + """Any non-LeaseAlreadyPresent Azure error must not block the backup.""" + c = mocked_az() + blob_client = MagicMock() + c.container_client.get_blob_client = MagicMock(return_value=blob_client) + + with patch("twindb_backup.destination.az.BlobLeaseClient") as mock_lease_cls: + lease = MagicMock() + lease.acquire.side_effect = _make_http_error("ServerBusy") + mock_lease_cls.return_value = lease + + with c.cluster_lock("prod-primary-db") as lock: + # We fail open: backup still runs, just without coordination. + assert lock.acquired is True + + lease.acquire.assert_called_once() + lease.release.assert_not_called() + + +def test_cluster_lock_ttl_is_clamped(): + """Azure caps blob leases at 60s; values are clamped into [15, 60].""" + c = mocked_az() + blob_client = MagicMock() + c.container_client.get_blob_client = MagicMock(return_value=blob_client) + + with patch("twindb_backup.destination.az.BlobLeaseClient") as mock_lease_cls: + lease = MagicMock() + mock_lease_cls.return_value = lease + + with c.cluster_lock("prod-primary-db", ttl=5): + pass + lease.acquire.assert_called_with(lease_duration=15) + + lease.reset_mock() + with c.cluster_lock("prod-primary-db", ttl=999): + pass + lease.acquire.assert_called_with(lease_duration=60) + + +def test_cluster_lock_release_failure_is_swallowed(): + """A best-effort release must not mask a successful backup.""" + c = mocked_az() + blob_client = MagicMock() + c.container_client.get_blob_client = MagicMock(return_value=blob_client) + + with patch("twindb_backup.destination.az.BlobLeaseClient") as mock_lease_cls: + lease = MagicMock() + lease.release.side_effect = ae.HttpResponseError("boom") + mock_lease_cls.return_value = lease + + with c.cluster_lock("prod-primary-db") as lock: + assert lock.acquired is True + + lease.release.assert_called_once() + + +def test_cluster_lock_exception_inside_context_still_releases(): + """If the backup body raises, we still attempt to release the lease.""" + c = mocked_az() + blob_client = MagicMock() + c.container_client.get_blob_client = MagicMock(return_value=blob_client) + + with patch("twindb_backup.destination.az.BlobLeaseClient") as mock_lease_cls: + lease = MagicMock() + mock_lease_cls.return_value = lease + + with pytest.raises(RuntimeError): + with c.cluster_lock("prod-primary-db"): + raise RuntimeError("simulated backup failure") + + lease.release.assert_called_once() diff --git a/tests/unit/destination/az/test_config.py b/tests/unit/destination/az/test_config.py index 8fa56c8f1..ccca4eff7 100644 --- a/tests/unit/destination/az/test_config.py +++ b/tests/unit/destination/az/test_config.py @@ -1,37 +1,267 @@ +from dataclasses import asdict + import pytest -from twindb_backup.configuration.destinations.az import AZConfig +from twindb_backup.configuration.destinations.az import ( + AUTH_MODE_CONNECTION_STRING, + AUTH_MODE_MANAGED_IDENTITY, + AZClientConfig, + AZConfig, + drop_empty_dict_factory, +) -from .util import AZConfigParams +from .util import AZClientConfigParams, AZConfigParams def test_initialization_success(): """Test initialization of AZConfig with all parameters set.""" - p = AZConfigParams() - c = AZConfig(**dict(p)) - assert c.connection_string == p.connection_string - assert c.container_name == p.container_name - assert c.chunk_size == p.chunk_size - assert c.remote_path == p.remote_path + client_params = AZClientConfigParams() + config_params = AZConfigParams() + client_config = AZClientConfig(**dict(client_params)) + + c = AZConfig(client_config=client_config, **dict(config_params)) + + # AZConfig Assertions + assert c.client_config == client_config + assert c.auth_mode == config_params.auth_mode + assert c.connection_string == config_params.connection_string + assert c.account_url == None + assert c.container_name == config_params.container_name + assert c.managed_identity_client_id == None + assert c.managed_identity_resource_id == None + assert c.create_container_if_missing == config_params.create_container_if_missing + assert ( + c.remote_path == config_params.remote_path.strip("/") + if config_params.remote_path != "/" + else config_params.remote_path + ) + assert c.max_concurrency == config_params.max_concurrency + + # AZClientConfig Assertions + assert c.client_config.api_version == client_params.api_version + assert c.client_config.secondary_hostname == client_params.secondary_hostname + assert c.client_config.max_block_size == client_params.max_block_size + assert c.client_config.max_single_put_size == client_params.max_single_put_size + assert c.client_config.min_large_block_upload_threshold == client_params.min_large_block_upload_threshold + assert c.client_config.use_byte_buffer == client_params.use_byte_buffer + assert c.client_config.max_page_size == client_params.max_page_size + assert c.client_config.max_single_get_size == client_params.max_single_get_size + assert c.client_config.max_chunk_get_size == client_params.max_chunk_get_size + assert c.client_config.audience == client_params.audience + assert c.client_config.connection_timeout == client_params.connection_timeout def test_initialization_success_defaults(): """Test initialization of AZConfig with only required parameters set and ensure default values.""" - p = AZConfigParams(only_required=True) - c = AZConfig(**dict(p)) - assert c.connection_string == p.connection_string - assert c.container_name == p.container_name - assert c.chunk_size == 4 * 1024 * 1024 + client_params = AZClientConfigParams(only_required=True) + config_params = AZConfigParams(only_required=True) + client_config = AZClientConfig(**dict(client_params)) + + c = AZConfig(client_config=client_config, **dict(config_params)) + + # AZConfig Assertions + assert c.client_config == client_config + assert c.auth_mode == AUTH_MODE_CONNECTION_STRING + assert c.connection_string == config_params.connection_string + assert c.account_url == None + assert c.container_name == config_params.container_name + assert c.managed_identity_client_id == None + assert c.managed_identity_resource_id == None + assert c.create_container_if_missing == True + assert c.remote_path == "/" + assert c.max_concurrency == 1 + + # AZClientConfig Assertions + assert c.client_config.api_version == None + assert c.client_config.secondary_hostname == None + assert c.client_config.max_block_size == 4 * 1024 * 1024 # 4MB + assert c.client_config.max_single_put_size == 64 * 1024 * 1024 # 64MB + assert c.client_config.min_large_block_upload_threshold == (4 * 1024 * 1024) + 1 # 4MB + 1 + assert c.client_config.use_byte_buffer == False + assert c.client_config.max_page_size == 4 * 1024 * 1024 # 4MB + assert c.client_config.max_single_get_size == 32 * 1024 * 1024 # 32MB + assert c.client_config.max_chunk_get_size == 4 * 1024 * 1024 # 4MB + assert c.client_config.audience == None + assert c.client_config.connection_timeout == 20 + + +def test_initialization_success_managed_identity(): + """Test initialization of AZConfig for managed identity auth.""" + client_params = AZClientConfigParams(only_required=True) + config_params = AZConfigParams( + only_required=True, + auth_mode=AUTH_MODE_MANAGED_IDENTITY, + managed_identity_client_id="test-client-id", + create_container_if_missing=False, + ) + client_config = AZClientConfig(**dict(client_params)) + + c = AZConfig(client_config=client_config, **dict(config_params)) + + assert c.client_config == client_config + assert c.auth_mode == AUTH_MODE_MANAGED_IDENTITY + assert c.connection_string == None + assert c.account_url == config_params.account_url + assert c.container_name == config_params.container_name + assert c.managed_identity_client_id == "test-client-id" + assert c.managed_identity_resource_id == None + assert c.create_container_if_missing == False assert c.remote_path == "/" + assert c.max_concurrency == 1 + + +def test_initialization_success_managed_identity_resource_id(): + """Test initialization of AZConfig for managed identity auth with resource_id.""" + resource_id = ( + "/subscriptions/00000000-0000-0000-0000-000000000000" + "/resourceGroups/example-rg" + "/providers/Microsoft.ManagedIdentity/userAssignedIdentities" + "/example-mi" + ) + client_params = AZClientConfigParams(only_required=True) + config_params = AZConfigParams( + only_required=True, + auth_mode=AUTH_MODE_MANAGED_IDENTITY, + managed_identity_resource_id=resource_id, + create_container_if_missing=False, + ) + client_config = AZClientConfig(**dict(client_params)) + + c = AZConfig(client_config=client_config, **dict(config_params)) + + assert c.auth_mode == AUTH_MODE_MANAGED_IDENTITY + assert c.managed_identity_client_id == None + assert c.managed_identity_resource_id == resource_id def test_invalid_params(): """Test initialization of AZConfig with invalid parameters.""" - with pytest.raises(ValueError): + + # Invalidate AZConfig + with pytest.raises(ValueError): # Invalid client_config + AZConfig(client_config={}, container_name="test_container", connection_string="test_connection_string") + with pytest.raises(ValueError): # Invalid connection_string + AZConfig(client_config=AZClientConfig(), container_name="test_container", connection_string=123) + with pytest.raises(ValueError): # Missing connection_string for connection string auth + AZConfig(client_config=AZClientConfig(), container_name="test_container") + with pytest.raises(ValueError): # Invalid auth_mode + AZConfig( + client_config=AZClientConfig(), + container_name="test_container", + auth_mode="service_principal", + connection_string="test_connection_string", + ) + with pytest.raises(ValueError): # Missing account_url for managed identity auth + AZConfig( + client_config=AZClientConfig(), + container_name="test_container", + auth_mode=AUTH_MODE_MANAGED_IDENTITY, + ) + with pytest.raises(ValueError): # Invalid account_url + AZConfig( + client_config=AZClientConfig(), + container_name="test_container", + auth_mode=AUTH_MODE_MANAGED_IDENTITY, + account_url=123, + ) + with pytest.raises(ValueError): # Invalid managed_identity_client_id AZConfig( - connection_string="test_connection_string", container_name="test_container", chunk_size="invalid_chunk_size" + client_config=AZClientConfig(), + container_name="test_container", + auth_mode=AUTH_MODE_MANAGED_IDENTITY, + account_url="https://account-name.blob.core.windows.net", + managed_identity_client_id=123, ) - with pytest.raises(ValueError): - AZConfig(connection_string="test_connection_string", container_name="test_container", remote_path=1) - with pytest.raises(TypeError): - AZConfig(connection_string="test_connection_string") + with pytest.raises(ValueError): # Invalid managed_identity_resource_id + AZConfig( + client_config=AZClientConfig(), + container_name="test_container", + auth_mode=AUTH_MODE_MANAGED_IDENTITY, + account_url="https://account-name.blob.core.windows.net", + managed_identity_resource_id=123, + ) + with pytest.raises(ValueError, match="mutually exclusive"): # Both client_id and resource_id set + AZConfig( + client_config=AZClientConfig(), + container_name="test_container", + auth_mode=AUTH_MODE_MANAGED_IDENTITY, + account_url="https://account-name.blob.core.windows.net", + managed_identity_client_id="test-client-id", + managed_identity_resource_id="/subscriptions/abc/resourceGroups/rg/providers/Microsoft.ManagedIdentity/userAssignedIdentities/name", + ) + with pytest.raises(ValueError): # Invalid remote_path + AZConfig( + client_config=AZClientConfig(), + container_name="test_container", + connection_string="test_connection_string", + remote_path=1, + ) + with pytest.raises(ValueError): # Invalid container_name + AZConfig(client_config=AZClientConfig(), container_name=1, connection_string="test_connection_string") + with pytest.raises(ValueError): # Invalid create_container_if_missing + AZConfig( + client_config=AZClientConfig(), + container_name="test_container", + connection_string="test_connection_string", + create_container_if_missing="true", + ) + with pytest.raises(ValueError): # Invalid max_concurrency + AZConfig( + client_config=AZClientConfig(), + container_name="test_container", + connection_string="test_connection_string", + max_concurrency="1", + ) + + # Invalidate AZClientConfig + with pytest.raises(ValueError): # Invalid api_version + AZClientConfig(api_version=123) + with pytest.raises(ValueError): # Invalid secondary_hostname + AZClientConfig(secondary_hostname=123) + with pytest.raises(ValueError): # Invalid max_block_size + AZClientConfig(max_block_size="123") + with pytest.raises(ValueError): # Invalid max_single_put_size + AZClientConfig(max_single_put_size="123") + with pytest.raises(ValueError): # Invalid min_large_block_upload_threshold + AZClientConfig(min_large_block_upload_threshold="123") + with pytest.raises(ValueError): # Invalid use_byte_buffer + AZClientConfig(use_byte_buffer="123") + with pytest.raises(ValueError): # Invalid max_page_size + AZClientConfig(max_page_size="123") + with pytest.raises(ValueError): # Invalid max_single_get_size + AZClientConfig(max_single_get_size="123") + with pytest.raises(ValueError): # Invalid max_chunk_get_size + AZClientConfig(max_chunk_get_size="123") + with pytest.raises(ValueError): # Invalid audience + AZClientConfig(audience=123) + with pytest.raises(ValueError): # Invalid connection_timeout + AZClientConfig(connection_timeout="123") + + +def test_drop_empty_dicts_some_undefined(): + """Test drop_empty_dict_factory helper function.""" + + client_config = AZClientConfig(**dict(AZClientConfigParams(only_required=True))) + + # Convert to dict and drop attributes with None values + client_config_dict = asdict(client_config, dict_factory=drop_empty_dict_factory) + + # Assert that the dict does not contain any None values + assert "api_version" not in client_config_dict + assert "secondary_hostname" not in client_config_dict + assert "audience" not in client_config_dict + + +def test_drop_empty_dicts_all_defined(): + """Test drop_empty_dict_factory helper function doesn't drop any attributes when all are defined.""" + + client_config = AZClientConfig(**dict(AZClientConfigParams())) + + # Convert to dict and drop attributes with None values + client_config_dict_drop_empty = asdict(client_config, dict_factory=drop_empty_dict_factory) + + # Convert to dict + client_config_dict = asdict(client_config) + + # Assert that the dicts are the same + assert client_config_dict == client_config_dict_drop_empty diff --git a/tests/unit/destination/az/test_delete.py b/tests/unit/destination/az/test_delete.py index 357ac84f0..f2456815f 100644 --- a/tests/unit/destination/az/test_delete.py +++ b/tests/unit/destination/az/test_delete.py @@ -10,14 +10,14 @@ def test_delete_success(): c = mocked_az() c.delete("test") - c._container_client.delete_blob.assert_called_once_with(c.render_path("test")) + c.container_client.delete_blob.assert_called_once_with(c.render_path("test")) def test_delete_fail(): """Tests AZ.delete method, re-raising an exception on failure""" c = mocked_az() - c._container_client.delete_blob.side_effect = Exception() + c.container_client.delete_blob.side_effect = Exception() with pytest.raises(Exception): c.delete("test") - c._container_client.delete_blob.assert_called_once_with(c.render_path("test")) + c.container_client.delete_blob.assert_called_once_with(c.render_path("test")) diff --git a/tests/unit/destination/az/test_download_to_pipe.py b/tests/unit/destination/az/test_download_to_pipe.py index 837da0c40..7f8c63eea 100644 --- a/tests/unit/destination/az/test_download_to_pipe.py +++ b/tests/unit/destination/az/test_download_to_pipe.py @@ -15,13 +15,15 @@ def test_download_to_pipe_success(): c = mocked_az() mc_dbr = MagicMock() - c._container_client.download_blob.return_value = mc_dbr + c.container_client.download_blob.return_value = mc_dbr c._download_to_pipe(c.render_path("foo-key"), 100, 200) mc_os.close.assert_called_once_with(100) mc_os.fdopen.assert_called_once_with(200, "wb") - c._container_client.download_blob.assert_called_once_with(c.render_path("foo-key")) + c.container_client.download_blob.assert_called_once_with( + c.render_path("foo-key"), max_concurrency=c.config.max_concurrency + ) mc_dbr.readinto.assert_called_once_with(mc_fdopen.__enter__()) @@ -30,11 +32,13 @@ def test_download_to_pipe_fail(): with patch("twindb_backup.destination.az.os") as mc_os: c = mocked_az() - c._container_client.download_blob.side_effect = ae.HttpResponseError() + c.container_client.download_blob.side_effect = ae.HttpResponseError() with pytest.raises(Exception): c._download_to_pipe(c.render_path("foo-key"), 100, 200) mc_os.close.assert_called_once_with(100) mc_os.fdopen.assert_called_once_with(200, "wb") - c._container_client.download_blob.assert_called_once_with(c.render_path("foo-key")) + c.container_client.download_blob.assert_called_once_with( + c.render_path("foo-key"), max_concurrency=c.config.max_concurrency + ) diff --git a/tests/unit/destination/az/test_init.py b/tests/unit/destination/az/test_init.py index 9361533c0..ab048cd21 100644 --- a/tests/unit/destination/az/test_init.py +++ b/tests/unit/destination/az/test_init.py @@ -1,4 +1,4 @@ -import socket +from dataclasses import asdict from unittest.mock import MagicMock, patch import azure.core.exceptions as ae @@ -6,23 +6,29 @@ from azure.storage.blob import ContainerClient import twindb_backup.destination.az as az +from twindb_backup.configuration.destinations.az import ( + AUTH_MODE_MANAGED_IDENTITY, + AZClientConfig, + AZConfig, + drop_empty_dict_factory, +) -from .util import AZParams +from .util import AZClientConfigParams, AZConfigParams def test_init_param(): """Test initialization of AZ with all parameters set, mocking the _connect method.""" with patch("twindb_backup.destination.az.AZ._connect") as mc: mc.return_value = MagicMock(spec=ContainerClient) - p = AZParams() - c = az.AZ(**dict(p)) - - assert c._container_name == p.container_name - assert c._connection_string == p.connection_string - assert c._hostname == p.hostname - assert c._chunk_size == p.chunk_size - assert c._remote_path == p.remote_path - assert isinstance(c._container_client, ContainerClient) + + client_params = AZClientConfigParams() + config_params = AZConfigParams() + client_config = AZClientConfig(**dict(client_params)) + config = AZConfig(client_config=client_config, **dict(config_params)) + + c = az.AZ(config=config) + + assert isinstance(c.container_client, ContainerClient) az.AZ._connect.assert_called_once() @@ -30,15 +36,15 @@ def test_init_param_defaults(): """Test initialization of AZ with only required parameters set, ensuring default values, mocking the _connect method.""" with patch("twindb_backup.destination.az.AZ._connect") as mc: mc.return_value = MagicMock(spec=ContainerClient) - p = AZParams(only_required=True) - c = az.AZ(**dict(p)) - - assert c._container_name == p.container_name - assert c._connection_string == p.connection_string - assert c._hostname == socket.gethostname() - assert c._chunk_size == 4 * 1024 * 1024 - assert c._remote_path == "/" - assert isinstance(c._container_client, ContainerClient) + + client_params = AZClientConfigParams() + config_params = AZConfigParams() + client_config = AZClientConfig(**dict(client_params)) + config = AZConfig(client_config=client_config, **dict(config_params)) + + c = az.AZ(config=config) + + assert isinstance(c.container_client, ContainerClient) az.AZ._connect.assert_called_once() @@ -46,21 +52,31 @@ def test_init_conn_string_valid(): """Test initialization of AZ with valid connection string.""" with patch("twindb_backup.destination.az.ContainerClient.exists") as mc: mc.return_value = True - p = AZParams() - c = az.AZ(**dict(p)) + + client_params = AZClientConfigParams() + config_params = AZConfigParams() + client_config = AZClientConfig(**dict(client_params)) + config = AZConfig(client_config=client_config, **dict(config_params)) + + c = az.AZ(config=config) az.ContainerClient.exists.assert_called_once() - assert isinstance(c._container_client, ContainerClient) + assert isinstance(c.container_client, ContainerClient) def test_init_conn_string_invalid(): """Test initialization of AZ with invalid connection string, expecting ValueError.""" with patch("twindb_backup.destination.az.ContainerClient.exists") as mc: mc.return_value = True - p = AZParams() - p.connection_string = "invalid_connection_string" + + client_params = AZClientConfigParams() + config_params = AZConfigParams() + client_config = AZClientConfig(**dict(client_params)) + config = AZConfig(client_config=client_config, **dict(config_params)) + config.connection_string = "invalid_connection_string" + with pytest.raises(ValueError, match="Connection string is either blank or malformed."): - _ = az.AZ(**dict(p)) + _ = az.AZ(config=config) def test_init_container_not_exists(): @@ -69,12 +85,17 @@ def test_init_container_not_exists(): mc.return_value = False with patch("twindb_backup.destination.az.ContainerClient.create_container") as mc_create_container: mc_create_container.return_value = MagicMock(spec=ContainerClient) - p = AZParams() - c = az.AZ(**dict(p)) + + client_params = AZClientConfigParams() + config_params = AZConfigParams() + client_config = AZClientConfig(**dict(client_params)) + config = AZConfig(client_config=client_config, **dict(config_params)) + + c = az.AZ(config=config) az.ContainerClient.exists.assert_called_once() az.ContainerClient.create_container.assert_called_once() - assert isinstance(c._container_client, ContainerClient) + assert isinstance(c.container_client, ContainerClient) def test_init_container_create_fails(): @@ -84,23 +105,145 @@ def test_init_container_create_fails(): with patch("twindb_backup.destination.az.ContainerClient.create_container") as mc_create_container: mc_create_container.side_effect = ae.HttpResponseError() - p = AZParams() + client_params = AZClientConfigParams() + config_params = AZConfigParams() + client_config = AZClientConfig(**dict(client_params)) + config = AZConfig(client_config=client_config, **dict(config_params)) + with pytest.raises(Exception): - c = az.AZ(**dict(p)) + c = az.AZ(config=config) az.ContainerClient.exists.assert_called_once() az.ContainerClient.create_container.assert_called_once() - assert isinstance(c._container_client, ContainerClient) + assert isinstance(c.container_client, ContainerClient) + + +def test_init_container_create_disabled(): + """Test initialization of AZ with missing container and create disabled.""" + with patch("twindb_backup.destination.az.ContainerClient.exists") as mc: + mc.return_value = False + with patch("twindb_backup.destination.az.ContainerClient.create_container") as mc_create_container: + client_params = AZClientConfigParams() + config_params = AZConfigParams(create_container_if_missing=False) + client_config = AZClientConfig(**dict(client_params)) + config = AZConfig(client_config=client_config, **dict(config_params)) + + with pytest.raises(ValueError, match="create_container_if_missing is disabled"): + _ = az.AZ(config=config) + + az.ContainerClient.exists.assert_called_once() + az.ContainerClient.create_container.assert_not_called() def test_init_success(): """Test initialization of AZ with existing container, mocking the from_connection_string method.""" with patch("twindb_backup.destination.az.ContainerClient.from_connection_string") as mc: mc.return_value = MagicMock(spec=ContainerClient) - p = AZParams() - c = az.AZ(**dict(p)) - az.ContainerClient.from_connection_string.assert_called_once_with(p.connection_string, p.container_name) + client_params = AZClientConfigParams() + config_params = AZConfigParams() + client_config = AZClientConfig(**dict(client_params)) + config = AZConfig(client_config=client_config, **dict(config_params)) + + c = az.AZ(config=config) + + az.ContainerClient.from_connection_string.assert_called_once_with( + conn_str=config.connection_string, + container_name=config.container_name, + **asdict(config.client_config, dict_factory=drop_empty_dict_factory) + ) mc.return_value.exists.assert_called_once() mc.return_value.create_container.assert_not_called() - assert isinstance(c._container_client, ContainerClient) + assert isinstance(c.container_client, ContainerClient) + + +def test_init_managed_identity_success(): + """Test initialization of AZ with managed identity auth (no client/resource id → DefaultAzureCredential).""" + with patch("twindb_backup.destination.az.DefaultAzureCredential") as mc_default_credential: + credential = MagicMock(name="credential") + mc_default_credential.return_value = credential + + with patch("twindb_backup.destination.az.ManagedIdentityCredential") as mc_mi_credential: + with patch("twindb_backup.destination.az.ContainerClient") as mc_container_client: + client = MagicMock(spec=ContainerClient) + client.exists.return_value = True + mc_container_client.return_value = client + + client_params = AZClientConfigParams() + config_params = AZConfigParams(only_required=True, auth_mode=AUTH_MODE_MANAGED_IDENTITY) + client_config = AZClientConfig(**dict(client_params)) + config = AZConfig(client_config=client_config, **dict(config_params)) + + c = az.AZ(config=config) + + mc_default_credential.assert_called_once_with() + mc_mi_credential.assert_not_called() + mc_container_client.assert_called_once_with( + account_url=config.account_url, + container_name=config.container_name, + credential=credential, + **asdict(config.client_config, dict_factory=drop_empty_dict_factory), + ) + client.exists.assert_called_once() + client.create_container.assert_not_called() + assert c.container_client == client + + +def test_init_managed_identity_user_assigned_client_id(): + """Test initialization of AZ with a user-assigned managed identity by client_id.""" + with patch("twindb_backup.destination.az.ManagedIdentityCredential") as mc_mi_credential: + credential = MagicMock(name="credential") + mc_mi_credential.return_value = credential + + with patch("twindb_backup.destination.az.DefaultAzureCredential") as mc_default_credential: + with patch("twindb_backup.destination.az.ContainerClient") as mc_container_client: + client = MagicMock(spec=ContainerClient) + client.exists.return_value = True + mc_container_client.return_value = client + + client_params = AZClientConfigParams(only_required=True) + config_params = AZConfigParams( + only_required=True, + auth_mode=AUTH_MODE_MANAGED_IDENTITY, + managed_identity_client_id="test-client-id", + ) + client_config = AZClientConfig(**dict(client_params)) + config = AZConfig(client_config=client_config, **dict(config_params)) + + _ = az.AZ(config=config) + + mc_mi_credential.assert_called_once_with(client_id="test-client-id") + mc_default_credential.assert_not_called() + + +def test_init_managed_identity_user_assigned_resource_id(): + """Test initialization of AZ with a user-assigned managed identity by resource_id.""" + resource_id = ( + "/subscriptions/00000000-0000-0000-0000-000000000000" + "/resourceGroups/example-rg" + "/providers/Microsoft.ManagedIdentity/userAssignedIdentities" + "/example-mi" + ) + with patch("twindb_backup.destination.az.ManagedIdentityCredential") as mc_mi_credential: + credential = MagicMock(name="credential") + mc_mi_credential.return_value = credential + + with patch("twindb_backup.destination.az.DefaultAzureCredential") as mc_default_credential: + with patch("twindb_backup.destination.az.ContainerClient") as mc_container_client: + client = MagicMock(spec=ContainerClient) + client.exists.return_value = True + mc_container_client.return_value = client + + client_params = AZClientConfigParams(only_required=True) + config_params = AZConfigParams( + only_required=True, + auth_mode=AUTH_MODE_MANAGED_IDENTITY, + managed_identity_resource_id=resource_id, + ) + client_config = AZClientConfig(**dict(client_params)) + config = AZConfig(client_config=client_config, **dict(config_params)) + + _ = az.AZ(config=config) + + mc_mi_credential.assert_called_once_with(identity_config={"resource_id": resource_id}) + mc_default_credential.assert_not_called() diff --git a/tests/unit/destination/az/test_list_files.py b/tests/unit/destination/az/test_list_files.py index 1e69322a8..fdb7a0d5b 100644 --- a/tests/unit/destination/az/test_list_files.py +++ b/tests/unit/destination/az/test_list_files.py @@ -19,68 +19,78 @@ def test_list_files_success(): """Tests AZ.list_files method, reading a list of blob names from azure.""" c = mocked_az() - c._container_client.list_blobs.return_value = BLOBS + c.container_client.list_blobs.return_value = BLOBS blobs = c._list_files() assert blobs == [b.name for b in BLOBS] - c._container_client.list_blobs.assert_called_once() + c.container_client.list_blobs.assert_called_once() def test_list_files_fail(): """Tests AZ.list_files method, re-raises an exception on failure""" c = mocked_az() - c._container_client.list_blobs.side_effect = ae.HttpResponseError() + c.container_client.list_blobs.side_effect = ae.HttpResponseError() with pytest.raises(Exception): c._list_files(PREFIX, False, False) - c._container_client.list_blobs.assert_called_once_with(name_starts_with=PREFIX, include=["metadata"]) + c.container_client.list_blobs.assert_called_once_with(name_starts_with=PREFIX.strip("/"), include=["metadata"]) def test_list_files_files_only(): """Tests AZ.list_files method, listing only file blobs""" c = mocked_az() - c._container_client.list_blobs.return_value = BLOBS + c.container_client.list_blobs.return_value = BLOBS blob_names = c._list_files(PREFIX, False, True) assert blob_names == ["blob2", "blob3"] - c._container_client.list_blobs.assert_called_once_with(name_starts_with=PREFIX, include=["metadata"]) + c.container_client.list_blobs.assert_called_once_with(name_starts_with=PREFIX.strip("/"), include=["metadata"]) def test_list_files_all_files(): """Tests AZ.list_files method, listing all blobs, including directories""" c = mocked_az() - c._container_client.list_blobs.return_value = BLOBS + c.container_client.list_blobs.return_value = BLOBS blob_names = c._list_files(PREFIX, False, False) assert blob_names == [b.name for b in BLOBS] - c._container_client.list_blobs.assert_called_once_with(name_starts_with=PREFIX, include=["metadata"]) + c.container_client.list_blobs.assert_called_once_with(name_starts_with=PREFIX.strip("/"), include=["metadata"]) def test_list_files_recursive(): """Tests AZ.list_files method, recursive option is ignored""" c = mocked_az() - c._container_client.list_blobs.return_value = BLOBS + c.container_client.list_blobs.return_value = BLOBS blob_names = c._list_files(PREFIX, False, False) blob_names_recursive = c._list_files(PREFIX, True, False) assert blob_names == blob_names_recursive - c._container_client.list_blobs.assert_called_with(name_starts_with=PREFIX, include=["metadata"]) + c.container_client.list_blobs.assert_called_with(name_starts_with=PREFIX.strip("/"), include=["metadata"]) def test_list_files_prefix(): """Tests AZ.list_files method, prefix is used as a filter in list_blobs only""" c = mocked_az() - c._container_client.list_blobs.return_value = BLOBS + c.container_client.list_blobs.return_value = BLOBS # Prefix is used as a filter in list_blobs, and because its mocked - it makes no difference. blob_names = c._list_files("".join(random.SystemRandom().choices(string.ascii_lowercase, k=10)), False, False) blob_names_recursive = c._list_files(PREFIX, False, False) assert blob_names == blob_names_recursive + + +def test_list_files_remote_path(): + """Tests AZ.list_files method, strips remote path from blob names""" + c = mocked_az() + c.container_client.list_blobs.return_value = BLOBS + [BlobProperties(name="himom/backups/blob4")] + + blob_names = c._list_files(PREFIX, False, True) + + assert blob_names == ["blob2", "blob3", "backups/blob4"] diff --git a/tests/unit/destination/az/test_read.py b/tests/unit/destination/az/test_read.py index 052cafcad..110a8cc7c 100644 --- a/tests/unit/destination/az/test_read.py +++ b/tests/unit/destination/az/test_read.py @@ -15,31 +15,37 @@ def test_read_success(): """Tests AZ.read method, ensuring the blob is read from azure.""" c = mocked_az() mock = MagicMock(StorageStreamDownloader) - c._container_client.download_blob.return_value = mock + c.container_client.download_blob.return_value = mock c.read(EXAMPLE_FILE) - c._container_client.download_blob.assert_called_once_with(c.render_path(EXAMPLE_FILE), encoding="utf-8") + c.container_client.download_blob.assert_called_once_with( + c.render_path(EXAMPLE_FILE), encoding="utf-8", max_concurrency=c.config.max_concurrency + ) mock.read.assert_called_once() def test_read_fail(): """Tests AZ.read method, re-raises an exception on failure""" c = mocked_az() - c._container_client.download_blob.side_effect = ae.HttpResponseError() + c.container_client.download_blob.side_effect = ae.HttpResponseError() with pytest.raises(Exception): c.read(EXAMPLE_FILE) - c._container_client.download_blob.assert_called_once_with(c.render_path(EXAMPLE_FILE), encoding="utf-8") + c.container_client.download_blob.assert_called_once_with( + c.render_path(EXAMPLE_FILE), encoding="utf-8", max_concurrency=c.config.max_concurrency + ) def test_read_fail_not_found(): """Tests AZ.read method, raising a twindb_backup.destination.exceptions.FileNotFound exception on ResourceNotFoundError""" c = mocked_az() - c._container_client.download_blob.side_effect = ae.ResourceNotFoundError() + c.container_client.download_blob.side_effect = ae.ResourceNotFoundError() with pytest.raises( - FileNotFound, match=f"File {c.render_path(EXAMPLE_FILE)} does not exist in container {c._container_name}" + FileNotFound, match=f"File {c.render_path(EXAMPLE_FILE)} does not exist in container {c.config.container_name}" ): c.read(EXAMPLE_FILE) - c._container_client.download_blob.assert_called_once_with(c.render_path(EXAMPLE_FILE), encoding="utf-8") + c.container_client.download_blob.assert_called_once_with( + c.render_path(EXAMPLE_FILE), encoding="utf-8", max_concurrency=c.config.max_concurrency + ) diff --git a/tests/unit/destination/az/test_save.py b/tests/unit/destination/az/test_save.py index 0cafd2717..2965c3216 100644 --- a/tests/unit/destination/az/test_save.py +++ b/tests/unit/destination/az/test_save.py @@ -19,7 +19,9 @@ def test_save_success(): c.save(handler, EXAMPLE_FILE) - c._container_client.upload_blob.assert_called_once_with(c.render_path(EXAMPLE_FILE), file_obj) + c.container_client.upload_blob.assert_called_once_with( + c.render_path(EXAMPLE_FILE), file_obj, max_concurrency=c.config.max_concurrency + ) def test_save_fail(): @@ -29,9 +31,11 @@ def test_save_fail(): file_obj = MagicMock() handler.__enter__.return_value = file_obj handler.__exit__.return_value = None - c._container_client.upload_blob.side_effect = ae.HttpResponseError() + c.container_client.upload_blob.side_effect = ae.HttpResponseError() with pytest.raises(Exception): c.save(handler, EXAMPLE_FILE) - c._container_client.upload_blob.assert_called_once_with(c.render_path(EXAMPLE_FILE), file_obj) + c.container_client.upload_blob.assert_called_once_with( + c.render_path(EXAMPLE_FILE), file_obj, max_concurrency=c.config.max_concurrency + ) diff --git a/tests/unit/destination/az/test_write.py b/tests/unit/destination/az/test_write.py index 993039395..282daf0f3 100644 --- a/tests/unit/destination/az/test_write.py +++ b/tests/unit/destination/az/test_write.py @@ -13,15 +13,19 @@ def test_write_success(): c.write(CONTENT, EXAMPLE_FILE) - c._container_client.upload_blob.assert_called_once_with(c.render_path(EXAMPLE_FILE), CONTENT, overwrite=True) + c.container_client.upload_blob.assert_called_once_with( + c.render_path(EXAMPLE_FILE), CONTENT, overwrite=True, max_concurrency=c.config.max_concurrency + ) def test_write_fail(): """Tests AZ.write method, re-raises an exception on failure""" c = mocked_az() - c._container_client.upload_blob.side_effect = ae.HttpResponseError() + c.container_client.upload_blob.side_effect = ae.HttpResponseError() with pytest.raises(Exception): c.write(CONTENT, EXAMPLE_FILE) - c._container_client.upload_blob.assert_called_once_with(c.render_path(EXAMPLE_FILE), CONTENT, overwrite=True) + c.container_client.upload_blob.assert_called_once_with( + c.render_path(EXAMPLE_FILE), CONTENT, overwrite=True, max_concurrency=c.config.max_concurrency + ) diff --git a/tests/unit/destination/az/util.py b/tests/unit/destination/az/util.py index 8b221f9fe..c83d5da50 100644 --- a/tests/unit/destination/az/util.py +++ b/tests/unit/destination/az/util.py @@ -1,20 +1,32 @@ -import collections +from collections.abc import Mapping from unittest.mock import MagicMock, patch from azure.storage.blob import ContainerClient import twindb_backup.destination.az as az +from twindb_backup.configuration.destinations.az import ( + AUTH_MODE_CONNECTION_STRING, + AUTH_MODE_MANAGED_IDENTITY, + AZClientConfig, + AZConfig, +) -class AZParams(collections.Mapping): +class AZClientConfigParams(Mapping): def __init__(self, only_required=False) -> None: - self.container_name = "test_container" - self.connection_string = "DefaultEndpointsProtocol=https;AccountName=ACCOUNT_NAME;AccountKey=ACCOUNT_KEY;EndpointSuffix=core.windows.net" if not only_required: - self.hostname = "test_host" - self.chunk_size = 123 - self.remote_path = "/himom" + self.api_version = "2021-04-10" + self.secondary_hostname = "secondary.example.com" + self.max_block_size = 128 * 1024 * 1024 # 128MB + self.max_single_put_size = 128 * 1024 * 1024 # 128MB + self.min_large_block_upload_threshold = 128 * 1024 * 1024 # 128MB + self.use_byte_buffer = False + self.max_page_size = 128 * 1024 * 1024 # 128MB + self.max_single_get_size = 128 * 1024 * 1024 # 128MB + self.max_chunk_get_size = 128 * 1024 * 1024 # 128MB + self.audience = "https://example.com" + self.connection_timeout = 30 def __iter__(self): return iter(self.__dict__) @@ -26,14 +38,34 @@ def __getitem__(self, key): return self.__dict__[key] -class AZConfigParams(collections.Mapping): - def __init__(self, only_required=False) -> None: - self.connection_string = "test_connection_string" +class AZConfigParams(Mapping): + def __init__( + self, + only_required=False, + auth_mode=AUTH_MODE_CONNECTION_STRING, + managed_identity_client_id=None, + managed_identity_resource_id=None, + create_container_if_missing=True, + ) -> None: self.container_name = "test_container" + self.auth_mode = auth_mode + self.create_container_if_missing = create_container_if_missing + + if auth_mode == AUTH_MODE_CONNECTION_STRING: + self.connection_string = ( + "DefaultEndpointsProtocol=https;AccountName=ACCOUNT_NAME;" + "AccountKey=ACCOUNT_KEY;EndpointSuffix=core.windows.net" + ) + elif auth_mode == AUTH_MODE_MANAGED_IDENTITY: + self.account_url = "https://account-name.blob.core.windows.net" + if managed_identity_client_id is not None: + self.managed_identity_client_id = managed_identity_client_id + if managed_identity_resource_id is not None: + self.managed_identity_resource_id = managed_identity_resource_id if not only_required: - self.chunk_size = 123 - self.remote_path = "/himom" + self.remote_path = "/himom/" + self.max_concurrency = 4 def __iter__(self): return iter(self.__dict__) @@ -48,7 +80,12 @@ def __getitem__(self, key): def mocked_az(): with patch("twindb_backup.destination.az.AZ._connect") as mc: mc.return_value = MagicMock(spec=ContainerClient) - p = AZParams() - c = az.AZ(**dict(p)) + + client_params = AZClientConfigParams() + config_params = AZConfigParams() + + az_config = AZConfig(client_config=AZClientConfig(**dict(client_params)), **dict(config_params)) + + c = az.AZ(config=az_config) return c diff --git a/tests/unit/destination/base/test_cluster_lock.py b/tests/unit/destination/base/test_cluster_lock.py new file mode 100644 index 000000000..ac7c5d093 --- /dev/null +++ b/tests/unit/destination/base/test_cluster_lock.py @@ -0,0 +1,32 @@ +"""Base destination ``cluster_lock`` is a no-op that reports acquired.""" + +from twindb_backup.destination.base_destination import BaseDestination, ClusterLock + + +class _StubDestination(BaseDestination): + """Minimal concrete destination to exercise the base ``cluster_lock``.""" + + def delete(self, path): # pragma: no cover - not exercised + raise NotImplementedError + + def get_stream(self, copy): # pragma: no cover - not exercised + raise NotImplementedError + + def read(self, filepath): # pragma: no cover - not exercised + raise NotImplementedError + + def save(self, handler, filepath): # pragma: no cover - not exercised + raise NotImplementedError + + def write(self, content, filepath): # pragma: no cover - not exercised + raise NotImplementedError + + def _list_files(self, prefix=None, recursive=False, files_only=False): # pragma: no cover + raise NotImplementedError + + +def test_base_cluster_lock_is_noop_and_always_acquired(): + dst = _StubDestination(remote_path="/anywhere") + with dst.cluster_lock("any-id") as lock: + assert isinstance(lock, ClusterLock) + assert lock.acquired is True diff --git a/tests/unit/source/base_source/test_host.py b/tests/unit/source/base_source/test_host.py new file mode 100644 index 000000000..903108467 --- /dev/null +++ b/tests/unit/source/base_source/test_host.py @@ -0,0 +1,27 @@ +"""Tests for BaseSource.host / get_prefix behaviour with server_name override.""" + +import socket + +from twindb_backup.source.base_source import BaseSource + + +def test_host_defaults_to_hostname(): + src = BaseSource("daily") + assert src.host == socket.gethostname() + assert src.get_prefix() == f"{socket.gethostname()}/daily" + + +def test_host_uses_server_name_override(): + src = BaseSource("hourly", server_name="prod-primary-db") + assert src.host == "prod-primary-db" + assert src.get_prefix() == "prod-primary-db/hourly" + + +def test_empty_server_name_falls_back_to_hostname(): + src = BaseSource("daily", server_name="") + assert src.host == socket.gethostname() + + +def test_none_server_name_falls_back_to_hostname(): + src = BaseSource("daily", server_name=None) + assert src.host == socket.gethostname() diff --git a/twindb_backup/__init__.py b/twindb_backup/__init__.py index ab7bfe4b5..af4ff5ac8 100644 --- a/twindb_backup/__init__.py +++ b/twindb_backup/__init__.py @@ -40,7 +40,7 @@ class and saves the backup copy in something defined in a destination class. __author__ = "TwinDB Development Team" __email__ = "dev@twindb.com" -__version__ = "3.3.0" +__version__ = "3.6.0" STATUS_FORMAT_VERSION = 1 LOCK_FILE = "/var/run/twindb-backup.lock" LOG_FILE = "/var/log/twindb-backup-measures.log" diff --git a/twindb_backup/backup.py b/twindb_backup/backup.py index 2a58150b6..e316c7622 100644 --- a/twindb_backup/backup.py +++ b/twindb_backup/backup.py @@ -88,7 +88,12 @@ def backup_files(run_type, config: TwinDBBackupConfig): try: for directory in config.backup_dirs: LOG.debug("copying %s", directory) - src = FileSource(directory, run_type, tar_options=config.tar_options) + src = FileSource( + directory, + run_type, + tar_options=config.tar_options, + server_name=config.server_name, + ) dst = config.destination() _backup_stream(config, src, dst) src.apply_retention_policy(dst, config, run_type) @@ -102,7 +107,7 @@ def backup_files(run_type, config: TwinDBBackupConfig): ) -def backup_mysql(run_type, config): +def backup_mysql(run_type, config: TwinDBBackupConfig): """Take backup of local MySQL instance :param run_type: Run type @@ -116,12 +121,13 @@ def backup_mysql(run_type, config): dst = config.destination() backup_start = time.time() - status = MySQLStatus(dst=dst) + status = MySQLStatus(dst=dst, status_directory=config.server_name) kwargs = { "backup_type": status.next_backup_type(config.mysql.full_backup, run_type), "dst": dst, "xtrabackup_binary": config.mysql.xtrabackup_binary, + "server_name": config.server_name, } parent = status.candidate_parent(run_type) @@ -129,8 +135,10 @@ def backup_mysql(run_type, config): kwargs["parent_lsn"] = parent.lsn LOG.debug("Creating source %r", kwargs) - mysql_client = MySQLClient(config.mysql.defaults_file) - src = MYSQL_SRC_MAP[mysql_client.server_vendor](MySQLConnectInfo(config.mysql.defaults_file), run_type, **kwargs) + mysql_client = MySQLClient(defaults_file=config.mysql.defaults_file, hostname=config.mysql.hostname) + src = MYSQL_SRC_MAP[mysql_client.server_vendor]( + MySQLConnectInfo(defaults_file=config.mysql.defaults_file, hostname=config.mysql.hostname), run_type, **kwargs + ) callbacks = [] try: @@ -173,7 +181,7 @@ def backup_mysql(run_type, config): callback[0].callback(**callback[1]) -def backup_binlogs(run_type, config): # pylint: disable=too-many-locals +def backup_binlogs(run_type, config: TwinDBBackupConfig): # pylint: disable=too-many-locals """Copy MySQL binlog files to the backup destination. :param run_type: Run type @@ -186,8 +194,8 @@ def backup_binlogs(run_type, config): # pylint: disable=too-many-locals return dst = config.destination() - status = BinlogStatus(dst=dst) - mysql_client = MySQLClient(defaults_file=config.mysql.defaults_file) + status = BinlogStatus(dst=dst, status_directory=config.server_name) + mysql_client = MySQLClient(defaults_file=config.mysql.defaults_file, hostname=config.mysql.hostname) log_bin_basename = mysql_client.variable("log_bin_basename") if log_bin_basename is None: return @@ -203,7 +211,7 @@ def backup_binlogs(run_type, config): # pylint: disable=too-many-locals ) for binlog_name in backup_set: - src = BinlogSource(run_type, mysql_client, binlog_name) + src = BinlogSource(run_type, mysql_client, binlog_name, server_name=config.server_name) binlog_copy = BinlogCopy( src.host, binlog_name, @@ -285,20 +293,33 @@ def backup_everything(run_type, twindb_config, binlogs_only=False): """ set_open_files_limit() - try: - if not binlogs_only: - backup_start = time.time() - backup_files(run_type, twindb_config) - backup_mysql(run_type, twindb_config) - backup_binlogs(run_type, twindb_config) - end = time.time() - save_measures(backup_start, end) - else: - backup_binlogs(run_type, twindb_config) - except configparser.NoSectionError as err: - LOG.debug(traceback.format_exc()) - LOG.error(err) - exit(1) + # Gate the whole run behind a destination-level lock so that only one + # replica in a MySQL cluster uploads backups at a time. On + # destinations without native coordination this is a no-op. + coordinator = twindb_config.destination() + with coordinator.cluster_lock(twindb_config.server_name) as lock: + if not lock.acquired: + LOG.info( + "Skipping %s backup: another cluster member holds the lock for %s.", + run_type, + twindb_config.server_name, + ) + return + + try: + if not binlogs_only: + backup_start = time.time() + backup_files(run_type, twindb_config) + backup_mysql(run_type, twindb_config) + backup_binlogs(run_type, twindb_config) + end = time.time() + save_measures(backup_start, end) + else: + backup_binlogs(run_type, twindb_config) + except configparser.NoSectionError as err: + LOG.debug(traceback.format_exc()) + LOG.error(err) + exit(1) @contextmanager diff --git a/twindb_backup/cli.py b/twindb_backup/cli.py index bbe2b4785..61882af89 100644 --- a/twindb_backup/cli.py +++ b/twindb_backup/cli.py @@ -6,7 +6,6 @@ import os import shutil -import socket import tempfile import traceback @@ -167,13 +166,16 @@ def share_backup(ctx, s3_url): @click.option("--type", "copy_type", type=click.Choice(MEDIA_TYPES), default="mysql") @click.option( "--hostname", - help="Hostname", - show_default=True, - default=socket.gethostname(), + help="Identifier that namespaced the backup path (matches " + "``[source] server_name`` in the config; falls back to the local hostname).", + show_default=False, + default=None, ) @click.pass_context def status(ctx, copy_type, hostname): """Print backups status""" + if not hostname: + hostname = ctx.obj["twindb_config"].server_name dst = ctx.obj["twindb_config"].destination(backup_source=hostname) print(MEDIA_STATUS_MAP[copy_type](dst=dst, status_directory=hostname)) @@ -288,15 +290,20 @@ def verify(ctx): ) @click.option( "--hostname", - help="If backup_copy is latest this option " "specifies hostname where the backup copy was taken.", - default=socket.gethostname(), - show_default=True, + help="If backup_copy is 'latest', this option specifies the identifier " + "that namespaced the backup path (matches ``[source] server_name`` in " + "the config; falls back to the local hostname).", + default=None, + show_default=False, ) @click.pass_context def verify_mysql(ctx, hostname, dst, backup_copy): """Verify backup""" LOG.debug("mysql: %r", ctx.obj["twindb_config"]) + if not hostname: + hostname = ctx.obj["twindb_config"].server_name + try: if not backup_copy: list_available_backups(ctx.obj["twindb_config"]) diff --git a/twindb_backup/configuration/__init__.py b/twindb_backup/configuration/__init__.py index 7cd760d68..bea7d0df5 100644 --- a/twindb_backup/configuration/__init__.py +++ b/twindb_backup/configuration/__init__.py @@ -3,12 +3,13 @@ Module to process configuration file. """ import socket +import typing as t from configparser import ConfigParser, NoOptionError, NoSectionError from shlex import split from twindb_backup import INTERVALS, LOG from twindb_backup.configuration.compression import CompressionConfig -from twindb_backup.configuration.destinations.az import AZConfig +from twindb_backup.configuration.destinations.az import AZClientConfig, AZConfig from twindb_backup.configuration.destinations.gcs import GCSConfig from twindb_backup.configuration.destinations.s3 import S3Config from twindb_backup.configuration.destinations.ssh import SSHConfig @@ -103,11 +104,18 @@ def ssh(self): def az(self): # pylint: disable=invalid-name """Azure Blob configuration""" try: - return AZConfig(**self.__read_options_from_section("az")) - + az_config = self.__cast_options(self.__read_options_from_section("az")) except NoSectionError: return None + az_client_config = {} + try: + az_client_config = self.__cast_options(self.__read_options_from_section("az.client")) + except Exception: + pass + + return AZConfig(client_config=AZClientConfig(**az_client_config), **az_config) + @property def s3(self): # pylint: disable=invalid-name """Amazon S3 configuration""" @@ -219,13 +227,35 @@ def tar_options(self): except NoOptionError: return None - def destination(self, backup_source=socket.gethostname()): + @property + def server_name(self): + """Identifier used to namespace backup paths and the status file. + + Defaults to ``socket.gethostname()`` when ``[source] server_name`` + is not set. Override it (e.g. to a cluster-wide identifier like + ``prod-primary-db``) to let every replica in a MySQL cluster + share a single backup path instead of one per hostname. + """ + try: + value = self.__cfg.get("source", "server_name").strip().strip("\"'").strip() + except (NoOptionError, NoSectionError): + value = "" + return value or socket.gethostname() + + def destination(self, backup_source=None): """ - :param backup_source: Hostname of the host where backup is taken from. + :param backup_source: Identifier used to namespace per-source paths + on destinations that support it (SSH, S3, GCS). Defaults to + :pyattr:`server_name` (which in turn defaults to the local + hostname). Azure Blob does not use this argument — its path + layout is governed entirely by ``[source] server_name`` and + the destination's ``remote_path``. :type backup_source: str :return: Backup destination instance :rtype: BaseDestination """ + if backup_source is None: + backup_source = self.server_name try: backup_destination = self.__cfg.get("destination", "backup_destination") if backup_destination == "ssh": @@ -254,11 +284,7 @@ def destination(self, backup_source=socket.gethostname()): ) elif backup_destination == "az": return AZ( - connection_string=self.az.connection_string, - container_name=self.az.container_name, - chunk_size=self.az.chunk_size, - remote_path=self.az.remote_path, - hostname=backup_source, + config=self.az, ) else: raise ConfigurationError(f"Unsupported destination '{backup_destination}'") @@ -280,3 +306,22 @@ def __read_options_from_section(self, section): def __repr__(self): return f"{self.__class__.__name__}: {self._config_file}" + + def __cast_options(self, options: t.Dict[str, str]) -> t.Dict[str, t.Union[str, int, bool]]: + """Cast options to their correct types + + Args: + options (t.Dict[str, str]): A dictionary of kwargs to cast + + Returns: + t.Dict[str, t.Union[str, int, bool]]: An updated dictionary with the correct types + """ + for k, v in options.items(): + if v.lower() == "true": + options[k] = True + elif v.lower() == "false": + options[k] = False + elif v.isdigit(): + options[k] = int(v) + + return options diff --git a/twindb_backup/configuration/destinations/az.py b/twindb_backup/configuration/destinations/az.py index 6d3c03ab6..29ac3f376 100644 --- a/twindb_backup/configuration/destinations/az.py +++ b/twindb_backup/configuration/destinations/az.py @@ -1,45 +1,155 @@ """Azure Blob Storage destination configuration""" +import typing as t +from dataclasses import dataclass +AUTH_MODE_CONNECTION_STRING = "connection_string" +AUTH_MODE_MANAGED_IDENTITY = "managed_identity" +SUPPORTED_AUTH_MODES = (AUTH_MODE_CONNECTION_STRING, AUTH_MODE_MANAGED_IDENTITY) + + +# Parameters taken from: +# https://learn.microsoft.com/en-us/python/api/azure-storage-blob/azure.storage.blob.containerclient?view=azure-python#keyword-only-parameters +@dataclass +class AZClientConfig: + """Azure Blob Container Client Configuration + + Attributes: + api_version (str, optional): The version of the Azure Storage API to use. Defaults to None. + secondary_hostname (str, optional): The secondary hostname to use for the storage account. Defaults to None. + max_block_size (int): The maximum size of a block in bytes. Defaults to 4MB. + max_single_put_size (int): The maximum size of a single put operation in bytes. Defaults to 64MB. + min_large_block_upload_threshold (int): The minimum size threshold for large block uploads in bytes. + Defaults to 4MB + 1. + use_byte_buffer (bool): Whether to use a byte buffer for uploads. Defaults to False. + max_page_size (int): The maximum size of a page in bytes. Defaults to 4MB. + max_single_get_size (int): The maximum size of a single get operation in bytes. Defaults to 32MB. + max_chunk_get_size (int): The maximum size of a chunk in bytes for get operations. Defaults to 4MB. + audience (str, optional): The audience for the Azure Storage account. Defaults to None. + connection_timeout (int): The connection timeout in seconds. Defaults to 20. + """ + + api_version: t.Optional[str] = None + secondary_hostname: t.Optional[str] = None + max_block_size: int = 4 * 1024 * 1024 # 4MB + max_single_put_size: int = 64 * 1024 * 1024 # 64MB + min_large_block_upload_threshold: int = (4 * 1024 * 1024) + 1 # 4MB + 1 + use_byte_buffer: bool = False + max_page_size: int = 4 * 1024 * 1024 # 4MB + max_single_get_size: int = 32 * 1024 * 1024 # 32MB + max_chunk_get_size: int = 4 * 1024 * 1024 # 4MB + audience: t.Optional[str] = None + connection_timeout: int = 20 + + def validate(self) -> None: + """Validates the configuration parameters for the Azure destination. + + Raises: + ValueError: Raises a ValueError if the type hint or value is incorrect for any of the parameters. + """ + + if self.api_version is not None and not isinstance(self.api_version, str): + raise ValueError("api_version must be a string or undefined") + if self.secondary_hostname is not None and not isinstance(self.secondary_hostname, str): + raise ValueError("secondary_hostname must be a string or undefined") + if not isinstance(self.max_block_size, int) or self.max_block_size <= 0: + raise ValueError("max_block_size must be a positive integer") + if not isinstance(self.max_single_put_size, int) or self.max_single_put_size <= 0: + raise ValueError("max_single_put_size must be a positive integer") + if not isinstance(self.min_large_block_upload_threshold, int) or self.min_large_block_upload_threshold <= 0: + raise ValueError("min_large_block_upload_threshold must be a positive integer") + if not isinstance(self.use_byte_buffer, bool): + raise ValueError("use_byte_buffer must be a boolean") + if not isinstance(self.max_page_size, int) or self.max_page_size <= 0: + raise ValueError("max_page_size must be a positive integer") + if not isinstance(self.max_single_get_size, int) or self.max_single_get_size <= 0: + raise ValueError("max_single_get_size must be a positive integer") + if not isinstance(self.max_chunk_get_size, int) or self.max_chunk_get_size <= 0: + raise ValueError("max_chunk_get_size must be a positive integer") + if self.audience is not None and not isinstance(self.audience, str): + raise ValueError("audience must be a string or undefined") + if not isinstance(self.connection_timeout, int) or self.connection_timeout <= 0: + raise ValueError("connection_timeout must be a positive integer") + + def __post_init__(self) -> None: + self.validate() + + +@dataclass class AZConfig: - """Azure Blob Storage Configuration.""" - - def __init__( - self, connection_string: str, container_name: str, chunk_size: int = 1024 * 1024 * 4, remote_path: str = "/" - ): - self._connection_string = connection_string - self._container_name = container_name - self._chunk_size = chunk_size - self._remote_path = remote_path - self.validate_config() - - def validate_config(self): - """Validate configuration.""" - if not isinstance(self._connection_string, str): - raise ValueError("CONNECTION_STRING must be a string") - if not isinstance(self._container_name, str): - raise ValueError("CONTAINER_NAME must be a string") - if not isinstance(self._chunk_size, int): - raise ValueError("CHUNK_SIZE must be an integer") - if not isinstance(self._remote_path, str): - raise ValueError("REMOTE_PATH must be a string") - - @property - def connection_string(self) -> str: - """CONNECTION_STRING""" - return self._connection_string - - @property - def container_name(self) -> str: - """CONTAINER_NAME""" - return self._container_name - - @property - def chunk_size(self) -> int: - """CHUNK_SIZE""" - return self._chunk_size - - @property - def remote_path(self) -> str: - """REMOTE_PATH""" - return self._remote_path + """Azure Blob Storage Configuration + + Attributes: + client_config (AZClientConfig): Configuration for the Azure Blob Container Client. + container_name (str): Name of the container in the Azure storage account. + auth_mode (str): Azure authentication mode. Defaults to connection_string. + connection_string (str, optional): Connection string for the Azure storage account. + account_url (str, optional): Blob service account URL used for managed identity authentication. + managed_identity_client_id (str, optional): User-assigned managed identity client ID. Mutually + exclusive with managed_identity_resource_id. + managed_identity_resource_id (str, optional): ARM resource ID of a user-assigned managed identity. + Preferred over managed_identity_client_id when multiple UAMIs are attached to the VM because + the resource ID is deterministic from naming and doesn't require reading the UAMI's GUID out + of Terraform. Mutually exclusive with managed_identity_client_id. + create_container_if_missing (bool, optional): Create the container if it does not exist. + Defaults to True. + remote_path (str, optional): Remote base path in the container to store backups. Defaults to "/". + max_concurrency (int, optional): Maximum number of concurrent requests to the Azure Storage service. + Defaults to 1. + """ + + client_config: AZClientConfig + container_name: str + connection_string: t.Optional[str] = None + account_url: t.Optional[str] = None + auth_mode: str = AUTH_MODE_CONNECTION_STRING + managed_identity_client_id: t.Optional[str] = None + managed_identity_resource_id: t.Optional[str] = None + create_container_if_missing: bool = True + remote_path: str = "/" + max_concurrency: int = 1 + + def validate(self) -> None: + """Validates the configuration parameters for the Azure destination. + + Raises: + ValueError: Raises a ValueError if the type hint or value is incorrect for any of the parameters. + """ + + if not isinstance(self.client_config, AZClientConfig): + raise ValueError("client_config must be an instance of AZClientConfig") + if not isinstance(self.container_name, str): + raise ValueError("container_name must be a string") + if self.connection_string is not None and not isinstance(self.connection_string, str): + raise ValueError("connection_string must be a string or undefined") + if self.account_url is not None and not isinstance(self.account_url, str): + raise ValueError("account_url must be a string or undefined") + if not isinstance(self.auth_mode, str) or self.auth_mode not in SUPPORTED_AUTH_MODES: + raise ValueError(f"auth_mode must be one of: {', '.join(SUPPORTED_AUTH_MODES)}") + if self.managed_identity_client_id is not None and not isinstance(self.managed_identity_client_id, str): + raise ValueError("managed_identity_client_id must be a string or undefined") + if self.managed_identity_resource_id is not None and not isinstance(self.managed_identity_resource_id, str): + raise ValueError("managed_identity_resource_id must be a string or undefined") + if self.managed_identity_client_id and self.managed_identity_resource_id: + raise ValueError( + "managed_identity_client_id and managed_identity_resource_id are mutually exclusive; set at most one" + ) + if not isinstance(self.create_container_if_missing, bool): + raise ValueError("create_container_if_missing must be a boolean") + if not isinstance(self.remote_path, str): + raise ValueError("remote_path must be a string") + if not isinstance(self.max_concurrency, int) or self.max_concurrency <= 0: + raise ValueError("max_concurrency must be a positive integer") + if self.auth_mode == AUTH_MODE_CONNECTION_STRING and not self.connection_string: + raise ValueError("connection_string is required when auth_mode=connection_string") + if self.auth_mode == AUTH_MODE_MANAGED_IDENTITY and not self.account_url: + raise ValueError("account_url is required when auth_mode=managed_identity") + + def __post_init__(self) -> None: + self.validate() + self.remote_path = self.remote_path.strip("/") if self.remote_path != "/" else self.remote_path + + +def drop_empty_dict_factory(d): + """Drop empty values from a dictionary""" + return {k: v for k, v in d if v is not None} diff --git a/twindb_backup/configuration/mysql.py b/twindb_backup/configuration/mysql.py index e4d8b180b..7647c8d3d 100644 --- a/twindb_backup/configuration/mysql.py +++ b/twindb_backup/configuration/mysql.py @@ -15,6 +15,7 @@ def __init__(self, **kwargs): self._expire_log_days = int(kwargs.get("expire_log_days", 7)) self._xtrabackup_binary = kwargs.get("xtrabackup_binary") self._xbstream_binary = kwargs.get("xbstream_binary") + self._hostname = kwargs.get("hostname", "127.0.0.1") @property def defaults_file(self): @@ -56,3 +57,9 @@ def xbstream_binary(self, path): """Set path to xbstream""" self._xbstream_binary = path + + @property + def hostname(self): + """MySQL hostname to connect to""" + + return self._hostname diff --git a/twindb_backup/destination/az.py b/twindb_backup/destination/az.py index e6ba0a59b..92d7dc486 100644 --- a/twindb_backup/destination/az.py +++ b/twindb_backup/destination/az.py @@ -4,53 +4,84 @@ """ import builtins import os -import socket +import threading import typing as t from contextlib import contextmanager +from dataclasses import asdict from multiprocessing import Process import azure.core.exceptions as ae -from azure.storage.blob import ContainerClient +from azure.storage.blob import BlobLeaseClient, ContainerClient from twindb_backup import LOG +from twindb_backup.configuration.destinations.az import ( + AUTH_MODE_MANAGED_IDENTITY, + AZConfig, + drop_empty_dict_factory, +) from twindb_backup.copy.base_copy import BaseCopy -from twindb_backup.destination.base_destination import BaseDestination +from twindb_backup.destination.base_destination import BaseDestination, ClusterLock from twindb_backup.destination.exceptions import FileNotFound +try: + from azure.identity import DefaultAzureCredential, ManagedIdentityCredential +except ImportError: # pragma: no cover - dependency is optional until managed identity is configured + DefaultAzureCredential = None + ManagedIdentityCredential = None + +_CLUSTER_LOCK_BLOB = ".twindb-backup/cluster.lock" +_CLUSTER_LOCK_MIN_TTL = 15 +_CLUSTER_LOCK_MAX_TTL = 60 + + +class _LeaseRenewer(threading.Thread): + """Background thread that keeps an Azure blob lease alive. + + Azure caps blob leases at 60 seconds, so long-running backups need + to renew periodically. The renewer sleeps ``ttl / 2`` between + renewals and stops quietly when :meth:`stop` is called or when a + renewal fails (the main thread will discover the lost lease when + it tries to release it). + """ + + def __init__(self, lease, ttl): + super(_LeaseRenewer, self).__init__(daemon=True, name="az-lease-renewer") + self._lease = lease + self._stop = threading.Event() + self._interval = max(5, ttl // 2) + + def run(self): + while not self._stop.wait(self._interval): + try: + self._lease.renew() + LOG.debug("Renewed Azure cluster-lock lease") + except builtins.Exception as err: # pragma: no cover - best-effort + LOG.warning("Failed to renew Azure cluster-lock lease: %s", err) + return + + def stop(self): + self._stop.set() + class AZ(BaseDestination): """Azure Blob Storage Destination class""" - def __init__( - self, - container_name: str, - connection_string: str, - hostname: str = socket.gethostname(), - chunk_size: int = 4 * 1024 * 1024, # TODO: Add support for chunk size - remote_path: str = "/", - ) -> None: + def __init__(self, config: AZConfig) -> None: """Creates an instance of the Azure Blob Storage Destination class, initializes the ContainerClient and validates the connection settings Args: - container_name (str): Name of the container in the Azure storage account - connection_string (str): Connection string for the Azure storage account - hostname (str, optional): Hostname of the host performing the backup. Defaults to socket.gethostname(). - chunk_size (int, optional): Size in bytes for read/write streams. Defaults to 4*1024*1024. - remote_path (str, optional): Remote base path in the container to store backups. Defaults to "/". + config (AZConfig): Azure Blob Storage Configuration Raises: err: Raises an error if the client cannot be initialized """ + self.config = config + self._credential = None - self._container_name = container_name - self._connection_string = connection_string - self._hostname = hostname - self._chunk_size = chunk_size - self._remote_path = remote_path - super(AZ, self).__init__(self._remote_path) + super(AZ, self).__init__(self.config.remote_path) - self._container_client = self._connect() + self.container_client = self._connect() """HELPER FUNCTIONS """ @@ -65,26 +96,65 @@ def _connect(self) -> ContainerClient: Returns: ContainerClient: An initialized ContainerClient """ - - client: ContainerClient = None - - # Create the container client - validates connection string format - try: - client = ContainerClient.from_connection_string(self._connection_string, self._container_name) - except builtins.ValueError as err: - LOG.error(f"Failed to create Azure Client. Error: {type(err).__name__}, Reason: {err}") - raise err + client = self._create_container_client() # Check if the container exists, if not, create it try: if not client.exists(): - client.create_container() + if self.config.create_container_if_missing: + client.create_container() + else: + raise builtins.ValueError( + f"Container {self.config.container_name} does not exist and " + "create_container_if_missing is disabled" + ) except builtins.Exception as err: LOG.error(f"Failed to validate or create container. Error: {type(err).__name__}, Reason: {err}") raise err return client + def _create_container_client(self) -> ContainerClient: + client_kwargs = asdict(self.config.client_config, dict_factory=drop_empty_dict_factory) + + try: + if self.config.auth_mode == AUTH_MODE_MANAGED_IDENTITY: + self._credential = self._build_managed_identity_credential() + return ContainerClient( + account_url=self.config.account_url, + container_name=self.config.container_name, + credential=self._credential, + **client_kwargs, + ) + + return ContainerClient.from_connection_string( + conn_str=self.config.connection_string, + container_name=self.config.container_name, + **client_kwargs, + ) + except builtins.ValueError as err: + LOG.error(f"Failed to create Azure Client. Error: {type(err).__name__}, Reason: {err}") + raise err + + def _build_managed_identity_credential(self): + """Pick the most specific managed-identity credential the config requests. + + Precedence: + 1. managed_identity_resource_id → ManagedIdentityCredential(identity_config={"resource_id": ...}) + 2. managed_identity_client_id → ManagedIdentityCredential(client_id=...) + 3. neither → DefaultAzureCredential() (covers system-assigned MI and local dev) + """ + if ManagedIdentityCredential is None or DefaultAzureCredential is None: + raise ImportError("azure-identity is required when auth_mode=managed_identity") + + if self.config.managed_identity_resource_id: + return ManagedIdentityCredential( + identity_config={"resource_id": self.config.managed_identity_resource_id} + ) + if self.config.managed_identity_client_id: + return ManagedIdentityCredential(client_id=self.config.managed_identity_client_id) + return DefaultAzureCredential() + def render_path(self, path: str) -> str: """Renders the absolute path for the Azure Blob Storage Destination @@ -94,7 +164,7 @@ def render_path(self, path: str) -> str: Returns: str: Absolute path to the blob in the container """ - return f"{self._remote_path}/{path}" + return f"{self.config.remote_path}/{path}".strip("/") def _download_to_pipe(self, blob_key: str, pipe_in: int, pipe_out: int) -> None: """Downloads a blob from Azure Blob Storage and writes it to a pipe @@ -107,7 +177,9 @@ def _download_to_pipe(self, blob_key: str, pipe_in: int, pipe_out: int) -> None: os.close(pipe_in) with os.fdopen(pipe_out, "wb") as pipe_out_file: try: - self._container_client.download_blob(blob_key).readinto(pipe_out_file) + self.container_client.download_blob(blob_key, max_concurrency=self.config.max_concurrency).readinto( + pipe_out_file + ) except builtins.Exception as err: LOG.error(f"Failed to download blob {blob_key}. Error: {type(err).__name__}, Reason: {err}") raise err @@ -126,7 +198,7 @@ def delete(self, path: str) -> None: """ LOG.debug(f"Attempting to delete blob: {self.render_path(path)}") try: - self._container_client.delete_blob(self.render_path(path)) + self.container_client.delete_blob(self.render_path(path)) except builtins.Exception as err: LOG.error(f"Failed to delete blob {self.render_path(path)}. Error: {type(err).__name__}, Reason: {err}") raise err @@ -171,10 +243,14 @@ def read(self, filepath: str) -> bytes: """ LOG.debug(f"Attempting to read blob: {self.render_path(filepath)}") try: - return self._container_client.download_blob(self.render_path(filepath), encoding="utf-8").read() + return self.container_client.download_blob( + self.render_path(filepath), encoding="utf-8", max_concurrency=self.config.max_concurrency + ).read() except ae.ResourceNotFoundError: - LOG.debug(f"File {self.render_path(filepath)} does not exist in container {self._container_name}") - raise FileNotFound(f"File {self.render_path(filepath)} does not exist in container {self._container_name}") + LOG.debug(f"File {self.render_path(filepath)} does not exist in container {self.config.container_name}") + raise FileNotFound( + f"File {self.render_path(filepath)} does not exist in container {self.config.container_name}" + ) except builtins.Exception as err: LOG.error(f"Failed to read blob {self.render_path(filepath)}. Error: {type(err).__name__}, Reason: {err}") raise err @@ -193,7 +269,9 @@ def save(self, handler: t.BinaryIO, filepath: str) -> None: LOG.debug(f"Attempting to save blob: {self.render_path(filepath)}") with handler as file_obj: try: - self._container_client.upload_blob(self.render_path(filepath), file_obj) + self.container_client.upload_blob( + self.render_path(filepath), file_obj, max_concurrency=self.config.max_concurrency + ) except builtins.Exception as err: LOG.error(f"Failed to upload blob or it already exists. Error {type(err).__name__}, Reason: {err}") raise err @@ -211,11 +289,107 @@ def write(self, content: str, filepath: str) -> None: LOG.debug(f"Attempting to write blob: {self.render_path(filepath)}") try: - self._container_client.upload_blob(self.render_path(filepath), content, overwrite=True) + self.container_client.upload_blob( + self.render_path(filepath), content, overwrite=True, max_concurrency=self.config.max_concurrency + ) except builtins.Exception as err: LOG.error(f"Failed to upload or overwrite blob. Error {type(err).__name__}, Reason: {err}") raise err + @contextmanager + def cluster_lock(self, identifier: str, ttl: int = _CLUSTER_LOCK_MAX_TTL): + """Acquire an Azure blob lease so only one cluster member runs + the backup at a time. + + The lease blob lives at ``/<_CLUSTER_LOCK_BLOB>`` + relative to the configured ``remote_path``. Requires the + ``Storage Blob Data Contributor`` role on the container (i.e. + the same permissions already required to upload backups). + + Behaviour: + + * If the lease is acquired, a background thread renews it every + ``ttl / 2`` seconds until the context exits. + * If another replica already holds the lease + (``LeaseAlreadyPresent``), yields a :class:`ClusterLock` with + ``acquired=False`` so the caller can skip the run cleanly. + * On any other Azure error, logs a warning and yields + ``acquired=True`` — we prefer running a (possibly duplicate) + backup over skipping because the coordination layer flaked. + + :param identifier: Cluster-scoped identifier. Typically + ``config.server_name``. + :param ttl: Lease duration in seconds. Clamped to the Azure + blob lease range ``[15, 60]``. + """ + lease_ttl = max(_CLUSTER_LOCK_MIN_TTL, min(_CLUSTER_LOCK_MAX_TTL, int(ttl))) + lock_key = self.render_path(f"{identifier}/{_CLUSTER_LOCK_BLOB}") + blob_client = self.container_client.get_blob_client(lock_key) + + self._ensure_cluster_lock_blob(blob_client, lock_key) + + lease = BlobLeaseClient(blob_client) + try: + lease.acquire(lease_duration=lease_ttl) + except ae.HttpResponseError as err: + if getattr(err, "error_code", None) == "LeaseAlreadyPresent": + LOG.info( + "Cluster lock %s is held by another member; skipping this run.", + lock_key, + ) + yield ClusterLock(acquired=False) + return + LOG.warning( + "Failed to acquire cluster lock %s (%s); proceeding without coordination.", + lock_key, + err, + ) + yield ClusterLock(acquired=True) + return + except builtins.Exception as err: # pragma: no cover - defensive + LOG.warning( + "Unexpected error while acquiring cluster lock %s (%s); proceeding without coordination.", + lock_key, + err, + ) + yield ClusterLock(acquired=True) + return + + renewer = _LeaseRenewer(lease, ttl=lease_ttl) + renewer.start() + try: + yield ClusterLock(acquired=True, holder=lock_key) + finally: + renewer.stop() + try: + lease.release() + except builtins.Exception as err: # pragma: no cover - best-effort + LOG.warning("Failed to release cluster lock %s: %s", lock_key, err) + + def _ensure_cluster_lock_blob(self, blob_client, lock_key: str) -> None: + """Create the lease blob if it does not already exist. + + Azure blob leases require a blob to lease against. The blob + itself holds no content — it exists only as a coordination + primitive. + """ + try: + blob_client.upload_blob(b"", overwrite=False) + except ae.ResourceExistsError: + return + except ae.HttpResponseError as err: + # Benign races where another member created the blob first + # can surface as a 409 with various error codes; treat any + # "already exists" response as success. + if getattr(err, "error_code", None) in ( + "BlobAlreadyExists", + "LeaseIdMissing", + "LeaseAlreadyPresent", + ): + return + LOG.warning("Failed to initialise cluster lock blob %s: %s", lock_key, err) + raise + def _list_files(self, prefix: str = "", recursive: bool = False, files_only: bool = False) -> t.List[str]: """List files in the Azure Blob Storage container @@ -226,20 +400,21 @@ def _list_files(self, prefix: str = "", recursive: bool = False, files_only: boo otherwise includes files and directories. Defaults to False. """ LOG.debug( - f"""Listing files in container {self._container_name} with prefix={prefix}, + f"""Listing files in container {self.config.container_name} with prefix={prefix.strip('/')}, recursive={recursive}, files_only={files_only}""" ) try: - blobs = self._container_client.list_blobs(name_starts_with=prefix, include=["metadata"]) + blobs = self.container_client.list_blobs(name_starts_with=prefix.strip("/"), include=["metadata"]) except builtins.Exception as err: LOG.error( - f"Failed to list files in container {self._container_name}. Error: {type(err).__name__}, Reason: {err}" + f"Failed to list files in container {self.config.container_name}. " + f"Error: {type(err).__name__}, Reason: {err}" ) raise err return [ - blob.name + blob.name.strip(self.config.remote_path).strip("/") for blob in blobs if not files_only or not (bool(blob.get("metadata")) and blob.get("metadata", {}).get("hdi_isfolder") == "true") diff --git a/twindb_backup/destination/base_destination.py b/twindb_backup/destination/base_destination.py index 5e8346954..fe41b1f84 100644 --- a/twindb_backup/destination/base_destination.py +++ b/twindb_backup/destination/base_destination.py @@ -4,12 +4,28 @@ """ import re from abc import abstractmethod +from contextlib import contextmanager from twindb_backup import LOG from twindb_backup.destination.exceptions import DestinationError from twindb_backup.exceptions import TwinDBBackupInternalError +class ClusterLock(object): + """Represents the outcome of :meth:`BaseDestination.cluster_lock`. + + ``acquired`` is ``True`` when this process holds the lock and may + proceed with the backup, or ``False`` when another cluster member + already holds it and the current run should skip. + """ + + __slots__ = ("acquired", "holder") + + def __init__(self, acquired, holder=None): + self.acquired = bool(acquired) + self.holder = holder + + class BaseDestination(object): """ Base destination class. @@ -114,6 +130,24 @@ def _list_files(self, prefix=None, recursive=False, files_only=False): """ raise NotImplementedError + @contextmanager + def cluster_lock(self, identifier, ttl=60): + """Acquire a cluster-wide exclusive lock so that only one replica + uploads backups for a given ``identifier`` at a time. + + The default implementation is a no-op that always reports the + lock as acquired; destinations that support a native + coordination primitive (e.g. Azure blob leases) override this. + + :param identifier: A stable cluster-scoped identifier. Typically + the value of ``config.server_name``. + :param ttl: Lease duration in seconds. Ignored by the base + implementation. + :yields: :class:`ClusterLock` + """ + _ = identifier, ttl + yield ClusterLock(acquired=True) + @staticmethod def _match_files(files, pattern=None): LOG.debug("Pattern: %s", pattern) diff --git a/twindb_backup/source/base_source.py b/twindb_backup/source/base_source.py index a270dc11a..63cae31a8 100644 --- a/twindb_backup/source/base_source.py +++ b/twindb_backup/source/base_source.py @@ -25,15 +25,20 @@ class BaseSource(object): _created_at = None _file_name_prefix = "" - def __init__(self, run_type): + def __init__(self, run_type, server_name=None): """ Construct instance of BaseSource() :param run_type: Run type e.g. hourly, daily, etc. :type run_type: str + :param server_name: Optional identifier used as the per-source + segment of the remote backup path. Defaults to + ``socket.gethostname()``. Set to a cluster-wide value (e.g. + ``prod-primary-db``) to make every replica share one path. + :type server_name: str or None """ self.run_type = run_type - self._host = socket.gethostname() + self._host = server_name or socket.gethostname() self._created_at = time.strftime("%Y-%m-%d_%H_%M_%S") @abstractmethod diff --git a/twindb_backup/source/binlog_source.py b/twindb_backup/source/binlog_source.py index 8778d77f8..aadd3ff3d 100644 --- a/twindb_backup/source/binlog_source.py +++ b/twindb_backup/source/binlog_source.py @@ -124,8 +124,8 @@ class BinlogSource(BaseSource): :type binlog_file: str """ - def __init__(self, run_type, mysql_client, binlog_file=None): - super(BinlogSource, self).__init__(run_type) + def __init__(self, run_type, mysql_client, binlog_file=None, server_name=None): + super(BinlogSource, self).__init__(run_type, server_name=server_name) self._mysql_client = mysql_client self._media_type = "binlog" self._binlog_file = binlog_file diff --git a/twindb_backup/source/file_source.py b/twindb_backup/source/file_source.py index 1e4facfa4..a24d48ee5 100644 --- a/twindb_backup/source/file_source.py +++ b/twindb_backup/source/file_source.py @@ -23,12 +23,12 @@ class FileSource(BaseSource): :type tar_options: str """ - def __init__(self, path, run_type, tar_options: str = None): + def __init__(self, path, run_type, tar_options: str = None, server_name=None): self.path = path self._suffix = "tar" self._media_type = "files" self._tar_options = tar_options - super(FileSource, self).__init__(run_type) + super(FileSource, self).__init__(run_type, server_name=server_name) @property def media_type(self): diff --git a/twindb_backup/source/mysql_source.py b/twindb_backup/source/mysql_source.py index b39ba8e34..a72c21235 100644 --- a/twindb_backup/source/mysql_source.py +++ b/twindb_backup/source/mysql_source.py @@ -180,7 +180,7 @@ def __init__(self, lsn=None, binlog_coordinate=None): self._file_name_prefix = "mysql" self.dst = kwargs.get("dst", None) self._xtrabackup = kwargs.get("xtrabackup_binary") or XTRABACKUP_BINARY - super(MySQLSource, self).__init__(run_type) + super(MySQLSource, self).__init__(run_type, server_name=kwargs.get("server_name")) @property def backup_tool(self): @@ -216,7 +216,7 @@ def get_stream(self): self._xtrabackup, "--defaults-file=%s" % self._connect_info.defaults_file, "--stream=xbstream", - "--host=127.0.0.1", + f"--host={self._connect_info.hostname}", "--backup", ] cmd += ["--target-dir", "."] diff --git a/vagrant/environment/puppet/modules/profile/files/twindb-backup.cfg b/vagrant/environment/puppet/modules/profile/files/twindb-backup.cfg index e7f5c6978..35a4358ab 100644 --- a/vagrant/environment/puppet/modules/profile/files/twindb-backup.cfg +++ b/vagrant/environment/puppet/modules/profile/files/twindb-backup.cfg @@ -2,6 +2,10 @@ [source] backup_dirs=/etc /root /home backup_mysql=yes +# server_name overrides the per-source segment of the remote backup +# path (defaults to the local hostname). Set a cluster-wide value so +# every replica shares one backup path. Example: +# server_name=prod-primary-db # Destination [destination] @@ -24,9 +28,31 @@ BUCKET="twindb-backups" # Azure destination settings [az] -connection_string="DefaultEndpointsProtocol=https;AccountName=ACCOUNT_NAME;AccountKey=ACCOUNT_KEY;EndpointSuffix=core.windows.net" +# auth_mode="connection_string" # optional, defaults to connection_string +# connection_string="DefaultEndpointsProtocol=https;AccountName=ACCOUNT_NAME;AccountKey=ACCOUNT_KEY;EndpointSuffix=core.windows.net" +# account_url="https://ACCOUNT_NAME.blob.core.windows.net" # required for managed_identity auth container_name="twindb-backups" +# Set at most ONE of managed_identity_resource_id / managed_identity_client_id. +# If neither is set, the system-assigned identity is used via DefaultAzureCredential. +# managed_identity_resource_id="/subscriptions/.../providers/Microsoft.ManagedIdentity/userAssignedIdentities/NAME" +# managed_identity_client_id="00000000-0000-0000-0000-000000000000" +# create_container_if_missing=true # optional, defaults to true #remote_path = /backups/mysql # optional +#max_concurrency = 1 # optional + +# Azure client optional settings +#[az.client] +# api_version="2019-02-02" # optional +# secondary_hostname="ACCOUNT_NAME-secondary.blob.core.windows.net" # optional +# max_block_size=4194304 # optional +# max_single_put_size=67108864 # optional +# min_large_block_upload_threshold=4194305 # optional +# use_byte_buffer=true # optional +# max_page_size=4194304 # optional +# max_single_get_size=33554432 # optional +# max_chunk_get_size=4194304 # optional +# audience="https://storage.azure.com/" # optional +# connection_timeout=20 # optional # GCS destination settings [gcs] @@ -44,6 +70,7 @@ backup_dir=/path/to/twindb-server-backups [mysql] mysql_defaults_file=/root/.my.cnf full_backup=daily +#hostname=localhost # optional, defaults to 127.0.0.1 # Retention [retention]