Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/dev-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ jobs:
cache: pip
cache-dependency-path: scripts/requirements-dev.txt

- name: Install system dependencies
if: matrix.test-type == 'pytest'
run: sudo apt-get install -y libgl1 libglib2.0-0 libegl1

- name: Install dependencies
run: |
echo "Installing dependencies"
Expand Down
67 changes: 62 additions & 5 deletions BlocksScreen/configfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,19 @@ class ConfigError(Exception):
"""Exception raised when Configfile errors exist"""

def __init__(self, msg) -> None:
"""Store the error message on both the exception and the ``msg`` attribute."""
super().__init__(msg)
self.msg = msg


class BlocksScreenConfig:
"""Thread-safe wrapper around :class:`configparser.ConfigParser` with raw-text tracking.

Maintains a ``raw_config`` list that mirrors the on-disk file so that
``add_section``, ``add_option``, and ``update_option`` can write back
changes without losing comments or formatting.
"""

config = configparser.ConfigParser(
allow_no_value=True,
)
Expand All @@ -70,16 +78,19 @@ class BlocksScreenConfig:
def __init__(
self, configfile: typing.Union[str, pathlib.Path], section: str
) -> None:
"""Initialise with the path to the config file and the default section name."""
self.configfile = pathlib.Path(configfile)
self.section = section
self.raw_config: typing.List[str] = []
self.raw_dict_config: typing.Dict = {}
self.file_lock = threading.Lock() # Thread safety for future work

def __getitem__(self, key: str) -> BlocksScreenConfig:
"""Return a :class:`BlocksScreenConfig` for *key* section (same as ``get_section``)."""
return self.get_section(key)

def __contains__(self, key):
"""Return True if *key* is a section in the underlying ConfigParser."""
return key in self.config

def sections(self) -> typing.List[str]:
Expand Down Expand Up @@ -193,12 +204,14 @@ def getboolean(
)

def _find_section_index(self, section: str) -> int:
"""Return the index of the ``[section]`` header line in ``raw_config``."""
try:
return self.raw_config.index("[" + section + "]")
except ValueError as e:
raise configparser.Error(f'Section "{section}" does not exist: {e}')

def _find_section_limits(self, section: str) -> typing.Tuple:
"""Return ``(start_index, end_index)`` of *section* in ``raw_config``."""
try:
section_start = self._find_section_index(section)
buffer = self.raw_config[section_start:]
Expand All @@ -212,6 +225,7 @@ def _find_section_limits(self, section: str) -> typing.Tuple:
def _find_option_index(
self, section: str, option: str
) -> typing.Union[Sentinel, int, None]:
"""Return the index of the *option* line within *section* in ``raw_config``."""
try:
start, end = self._find_section_limits(section)
section_buffer = self.raw_config[start:][:end]
Expand Down Expand Up @@ -289,6 +303,40 @@ def add_option(
f'Unable to add "{option}" option to section "{section}": {e} '
)

def update_option(
self,
section: str,
option: str,
value: typing.Any,
) -> None:
"""Update an existing option's value in both raw tracking and configparser."""
try:
with self.file_lock:
if not self.config.has_section(section):
self.add_section(section)

if not self.config.has_option(section, option):
self.add_option(section, option, str(value))
return

line_idx = self._find_option_line_index(section, option)
self.raw_config[line_idx] = f"{option}: {value}"
self.config.set(section, option, str(value))
self.update_pending = True
except Exception as e:
logging.error(
f'Unable to update option "{option}" in section "{section}": {e}'
)

def _find_option_line_index(self, section: str, option: str) -> int:
"""Find the index of an option line within a specific section."""
start, end = self._find_section_limits(section)
opt_regex = re.compile(rf"^\s*{re.escape(option)}\s*[:=]")
for i in range(start + 1, end):
if opt_regex.match(self.raw_config[i]):
return i
raise configparser.Error(f'Option "{option}" not found in section "{section}"')

def save_configuration(self) -> None:
"""Save teh configuration to file"""
try:
Expand Down Expand Up @@ -319,6 +367,14 @@ def load_config(self):
raise configparser.Error(f"Error loading configuration file: {e}")

def _parse_file(self) -> typing.Tuple[typing.List[str], typing.Dict]:
"""Read and normalise the config file into a raw line list and a nested dict.

Strips comments, normalises ``=`` to ``:`` separators, deduplicates
sections/options, and ensures the buffer ends with an empty line.

Returns:
A tuple of (raw_lines, dict_representation).
"""
buffer = []
dict_buff: typing.Dict = {}
curr_sec: typing.Union[Sentinel, str] = Sentinel.MISSING
Expand All @@ -336,17 +392,18 @@ def _parse_file(self) -> typing.Tuple[typing.List[str], typing.Dict]:
if not line:
continue
# remove leading and trailing white spaces
line = re.sub(r"\s*([:=])\s*", r"\1", line)
line = re.sub(r"\s*([:=])\s*", r"\1 ", line)
line = re.sub(r"=", r":", line)
# find the beginning of sections
section_match = re.compile(r"[^\s]*\[([^]]+)\]")
match_sec = re.match(section_match, line) #
if match_sec:
sec_name = re.sub(r"[\[*\]]", r"", line)
if sec_name not in dict_buff.keys():
buffer.extend(
[""]
) # REFACTOR: Just add some line separation between sections
if buffer:
buffer.extend(
[""]
) # REFACTOR: Just add some line separation between sections
dict_buff.update({sec_name: {}})
curr_sec = sec_name
else:
Expand Down Expand Up @@ -388,4 +445,4 @@ def get_configparser() -> BlocksScreenConfig:
if not config_object.has_section("server"):
logging.error("Error loading configuration file for the application.")
raise ConfigError("Section [server] is missing from configuration")
return BlocksScreenConfig(configfile=configfile, section="server")
return config_object
Loading