Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 112 additions & 44 deletions src/azure-cli/azure/cli/command_modules/acr/custom.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,40 +424,46 @@ def acr_login(cmd,

def _perform_registry_login(login_server, docker_command, username, password):
from subprocess import PIPE, Popen
logger.debug("Invoking '%s login --username %s --password <redacted> %s'",
logger.debug("Invoking '%s login --username %s --password-stdin %s'",
docker_command, username, login_server)
p = Popen([docker_command, "login",
"--username", username,
"--password", password,
login_server], stderr=PIPE)
_, stderr = p.communicate()
return_code = p.returncode

if stderr or return_code != 0: # when docker command process returns non-zero
stdin_error_msg = (
"Failed to pass credentials to '{cmd}' via stdin. "
"Please ensure '{cmd}' supports --password-stdin.".format(cmd=docker_command)
)
try:
p = Popen([docker_command, "login",
"--username", username,
"--password-stdin",
login_server], stdin=PIPE, stderr=PIPE)
_, stderr = p.communicate(input=password.encode())
Comment thread
lizMSFT marked this conversation as resolved.
return_code = p.returncode
except (BrokenPipeError, OSError):
raise CLIError(stdin_error_msg)

if stderr or return_code != 0:
if b'error storing credentials' in stderr and b'stub received bad data' in stderr \
and _check_wincred(login_server):
# Retry once after disabling wincred
p = Popen([docker_command, "login",
"--username", username,
"--password", password,
login_server])
p.wait()
else:
stderr_messages = stderr.decode()
# Dismiss the '--password-stdin' warning
if b'--password-stdin' in stderr:
errors = [err for err in stderr_messages.split('\n') if err and '--password-stdin' not in err]
# Will not raise CLIError if there is no error other than '--password-stdin'
if not errors:
return None
stderr_messages = '\n'.join(errors)
logger.warning(stderr_messages)

# Raise error only if docker returns non-zero
if return_code != 0:
try:
p = Popen([docker_command, "login",
"--username", username,
"--password-stdin",
login_server], stdin=PIPE, stderr=PIPE)
_, retry_stderr = p.communicate(input=password.encode())
except (BrokenPipeError, OSError):
raise CLIError(stdin_error_msg)
if p.returncode != 0:
if retry_stderr:
logger.warning(retry_stderr.decode())
raise CLIError('Login failed.')
return

return None
stderr_messages = stderr.decode()
logger.warning(stderr_messages)

# Raise error only if docker returns non-zero
if return_code != 0:
raise CLIError('Login failed.')


def acr_show_usage(cmd, client, registry_name, resource_group_name=None):
Expand All @@ -468,14 +474,89 @@ def acr_show_usage(cmd, client, registry_name, resource_group_name=None):
return client.list_usages(resource_group_name, registry_name)


def get_docker_command(is_diagnostics_context=False):
from ._errors import DOCKER_COMMAND_ERROR, DOCKER_DAEMON_ERROR
def _validate_command_path(docker_command, is_diagnostics_context=False):
"""Validate that the resolved binary is not located in the current working directory."""
resolved_path = shutil.which(docker_command)
if resolved_path:
try:
# Use the pre-symlink path for the security decision to prevent symlink bypass
resolved_dir = os.path.dirname(os.path.abspath(resolved_path))
cwd = os.path.abspath(os.getcwd())
# samefile catches Windows 8.3 short names, junctions, and subst drives
try:
dirs_match = os.path.samefile(resolved_dir, cwd)
except (OSError, ValueError):
dirs_match = (os.path.normcase(os.path.normpath(resolved_dir)) ==
os.path.normcase(os.path.normpath(cwd)))
if dirs_match:
resolved_real = os.path.realpath(resolved_path)
msg = (
"Refusing to use '{}' resolved at '{}' because it is located in the current "
"working directory. Set the DOCKER_COMMAND environment variable "
"to an absolute path of the trusted binary to override."
).format(docker_command, resolved_real)
if is_diagnostics_context:
logger.warning(msg)
return True
raise CLIError(msg)
Comment thread
lizMSFT marked this conversation as resolved.
except (ValueError, OSError):
# Cannot verify path safety — fail closed
if is_diagnostics_context:
logger.warning("Unable to verify binary path safety.")
return True
raise CLIError("Unable to verify binary path safety.")
return False


def _resolve_docker_command(is_diagnostics_context=False):
"""Resolve the docker/podman command name and validate it is not in CWD."""
from ._errors import DOCKER_COMMAND_ERROR
if os.getenv('DOCKER_COMMAND'):
docker_command = os.getenv('DOCKER_COMMAND')
if not os.path.isabs(docker_command):
if _validate_command_path(docker_command, is_diagnostics_context):
return None, DOCKER_COMMAND_ERROR
else:
docker_command = 'docker'
if not shutil.which('docker') and shutil.which('podman'):
docker_command = 'podman'
if _validate_command_path(docker_command, is_diagnostics_context):
return None, DOCKER_COMMAND_ERROR
return docker_command, None
Comment thread
lizMSFT marked this conversation as resolved.


def _try_wsl_exe_fallback(docker_command, is_diagnostics_context):
"""Retry with .exe suffix for WSL environments where the binary is not directly discoverable."""
from subprocess import PIPE, Popen, CalledProcessError
from ._errors import DOCKER_COMMAND_ERROR, DOCKER_DAEMON_ERROR
Comment thread
lizMSFT marked this conversation as resolved.
docker_command = f'{docker_command}.exe'
if _validate_command_path(docker_command, is_diagnostics_context):
return None, DOCKER_COMMAND_ERROR
try:
p = Popen([docker_command, "ps"], stdout=PIPE, stderr=PIPE)
_, stderr = p.communicate()
except OSError as e:
logger.debug("Could not run '%s' command. Exception: %s", docker_command, str(e))
if is_diagnostics_context:
return None, DOCKER_COMMAND_ERROR
raise CLIError(DOCKER_COMMAND_ERROR.get_error_message())
except CalledProcessError as e:
logger.debug("Could not run '%s' command. Exception: %s", docker_command, str(e))
if is_diagnostics_context:
return docker_command, DOCKER_DAEMON_ERROR
raise CLIError(DOCKER_DAEMON_ERROR.get_error_message())
if stderr:
if is_diagnostics_context:
return None, DOCKER_COMMAND_ERROR.set_error_message(stderr.decode())
raise CLIError(DOCKER_COMMAND_ERROR.set_error_message(stderr.decode()).get_error_message())
return docker_command, None


def get_docker_command(is_diagnostics_context=False):
from ._errors import DOCKER_COMMAND_ERROR, DOCKER_DAEMON_ERROR
docker_command, error = _resolve_docker_command(is_diagnostics_context)
if error:
return docker_command, error

from subprocess import PIPE, Popen, CalledProcessError
try:
Expand All @@ -484,20 +565,7 @@ def get_docker_command(is_diagnostics_context=False):
except OSError as e:
logger.debug("Could not run '%s' command. Exception: %s", docker_command, str(e))
# The executable may not be discoverable in WSL so retry *.exe once
try:
docker_command = f'{docker_command}.exe'
p = Popen([docker_command, "ps"], stdout=PIPE, stderr=PIPE)
_, stderr = p.communicate()
except OSError as inner:
logger.debug("Could not run '%s' command. Exception: %s", docker_command, str(inner))
if is_diagnostics_context:
return None, DOCKER_COMMAND_ERROR
raise CLIError(DOCKER_COMMAND_ERROR.get_error_message())
except CalledProcessError as inner:
logger.debug("Could not run '%s' command. Exception: %s", docker_command, str(inner))
if is_diagnostics_context:
return docker_command, DOCKER_DAEMON_ERROR
raise CLIError(DOCKER_DAEMON_ERROR.get_error_message())
return _try_wsl_exe_fallback(docker_command, is_diagnostics_context)
except CalledProcessError as e:
logger.debug("Could not run '%s' command. Exception: %s", docker_command, str(e))
if is_diagnostics_context:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import os
import unittest
from unittest import mock

from azure.cli.core.azclierror import CLIError
from azure.cli.command_modules.acr.custom import _validate_command_path


class TestValidateCommandPath(unittest.TestCase):
"""Tests for _validate_command_path CWD validation logic."""

@mock.patch('azure.cli.command_modules.acr.custom.shutil.which')
@mock.patch('azure.cli.command_modules.acr.custom.os.getcwd')
def test_binary_in_cwd_is_blocked(self, mock_getcwd, mock_which):
"""A docker binary located directly in CWD should be rejected."""
mock_getcwd.return_value = '/home/user/repo'
mock_which.return_value = '/home/user/repo/docker'

with self.assertRaises(CLIError) as ctx:
_validate_command_path('docker', is_diagnostics_context=False)
self.assertIn('Refusing to use', str(ctx.exception))

@mock.patch('azure.cli.command_modules.acr.custom.shutil.which')
@mock.patch('azure.cli.command_modules.acr.custom.os.getcwd')
def test_binary_in_system_path_is_allowed(self, mock_getcwd, mock_which):
"""A docker binary in a system directory should be allowed."""
mock_getcwd.return_value = '/home/user/repo'
mock_which.return_value = '/usr/bin/docker'

result = _validate_command_path('docker', is_diagnostics_context=False)
self.assertFalse(result)

@mock.patch('azure.cli.command_modules.acr.custom.shutil.which')
@mock.patch('azure.cli.command_modules.acr.custom.os.getcwd')
def test_binary_in_subdirectory_of_cwd_is_allowed(self, mock_getcwd, mock_which):
"""A docker binary in a subdirectory of CWD should be allowed (only direct CWD is blocked)."""
mock_getcwd.return_value = '/home/user/repo'
mock_which.return_value = '/home/user/repo/tools/docker'

result = _validate_command_path('docker', is_diagnostics_context=False)
self.assertFalse(result)

@mock.patch('azure.cli.command_modules.acr.custom.shutil.which')
@mock.patch('azure.cli.command_modules.acr.custom.os.getcwd')
def test_binary_at_root_cwd_is_allowed(self, mock_getcwd, mock_which):
"""When CWD is filesystem root, system docker should not be blocked."""
mock_getcwd.return_value = '/'
mock_which.return_value = '/usr/bin/docker'

result = _validate_command_path('docker', is_diagnostics_context=False)
self.assertFalse(result)

@mock.patch('azure.cli.command_modules.acr.custom.shutil.which')
def test_binary_not_found_is_allowed(self, mock_which):
"""When docker is not found at all, validation should pass (let downstream handle it)."""
mock_which.return_value = None

result = _validate_command_path('docker', is_diagnostics_context=False)
self.assertFalse(result)

@mock.patch('azure.cli.command_modules.acr.custom.shutil.which')
@mock.patch('azure.cli.command_modules.acr.custom.os.getcwd')
def test_diagnostics_mode_returns_true_on_unsafe(self, mock_getcwd, mock_which):
"""In diagnostics mode, should return True (unsafe) instead of raising."""
mock_getcwd.return_value = '/home/user/repo'
mock_which.return_value = '/home/user/repo/docker'

result = _validate_command_path('docker', is_diagnostics_context=True)
self.assertTrue(result)

@mock.patch('azure.cli.command_modules.acr.custom.shutil.which')
@mock.patch('azure.cli.command_modules.acr.custom.os.getcwd')
def test_diagnostics_mode_returns_false_on_safe(self, mock_getcwd, mock_which):
"""In diagnostics mode, should return False (safe) for system-path docker."""
mock_getcwd.return_value = '/home/user/repo'
mock_which.return_value = '/usr/bin/docker'

result = _validate_command_path('docker', is_diagnostics_context=True)
self.assertFalse(result)

@mock.patch('azure.cli.command_modules.acr.custom.shutil.which')
@mock.patch('azure.cli.command_modules.acr.custom.os.getcwd')
def test_windows_cwd_binary_is_blocked(self, mock_getcwd, mock_which):
"""On Windows, a docker.exe in CWD should be blocked (case-insensitive)."""
mock_getcwd.return_value = 'C:\\Users\\dev\\repo'
mock_which.return_value = 'C:\\Users\\dev\\repo\\docker.exe'

with self.assertRaises(CLIError):
_validate_command_path('docker', is_diagnostics_context=False)

@mock.patch('azure.cli.command_modules.acr.custom.shutil.which')
@mock.patch('azure.cli.command_modules.acr.custom.os.getcwd')
def test_windows_system_docker_is_allowed(self, mock_getcwd, mock_which):
"""On Windows, Docker Desktop in Program Files should be allowed."""
mock_getcwd.return_value = 'C:\\Users\\dev\\repo'
mock_which.return_value = 'C:\\Program Files\\Docker\\docker.exe'

result = _validate_command_path('docker', is_diagnostics_context=False)
self.assertFalse(result)

@mock.patch('azure.cli.command_modules.acr.custom.shutil.which')
@mock.patch('azure.cli.command_modules.acr.custom.os.getcwd')
def test_windows_root_cwd_does_not_block_system_docker(self, mock_getcwd, mock_which):
"""When CWD is C:\\, Docker at C:\\Program Files should not be blocked."""
mock_getcwd.return_value = 'C:\\'
mock_which.return_value = 'C:\\Program Files\\Docker\\docker.exe'

result = _validate_command_path('docker', is_diagnostics_context=False)
self.assertFalse(result)

@mock.patch('azure.cli.command_modules.acr.custom.shutil.which')
@mock.patch('azure.cli.command_modules.acr.custom.os.getcwd')
def test_error_message_includes_override_instruction(self, mock_getcwd, mock_which):
"""Error message should tell user how to override with DOCKER_COMMAND."""
mock_getcwd.return_value = '/home/user/repo'
mock_which.return_value = '/home/user/repo/docker'

with self.assertRaises(CLIError) as ctx:
_validate_command_path('docker', is_diagnostics_context=False)
self.assertIn('DOCKER_COMMAND', str(ctx.exception))
self.assertIn('absolute path', str(ctx.exception))

@mock.patch('azure.cli.command_modules.acr.custom.os.path.realpath')
@mock.patch('azure.cli.command_modules.acr.custom.shutil.which')
@mock.patch('azure.cli.command_modules.acr.custom.os.getcwd')
def test_symlink_in_cwd_pointing_to_system_binary_is_blocked(self, mock_getcwd, mock_which, mock_realpath):
"""A symlink in CWD pointing to /usr/bin/docker should still be blocked (uses abspath, not realpath)."""
mock_getcwd.return_value = '/home/user/repo'
mock_which.return_value = '/home/user/repo/docker'
mock_realpath.return_value = '/usr/bin/docker'

with self.assertRaises(CLIError):
_validate_command_path('docker', is_diagnostics_context=False)
Loading