Refactor: Atomic file read/write for better performance

- Drop dependency atomicwrites
This commit is contained in:
LmeSzinc 2025-03-17 01:17:15 +08:00
parent 93e7e94bec
commit 93644384cf
8 changed files with 266 additions and 363 deletions

201
deploy/atomic.py Normal file
View File

@ -0,0 +1,201 @@
import os
import random
import re
import string
import time
from typing import Union
def random_id(length=6):
    """
    Generate a random alphanumeric ID.

    Characters are drawn independently (with replacement) from
    a-z, A-Z and 0-9, so there are 62^length possible IDs
    and `length` may be larger than the alphabet size.

    Args:
        length (int): Number of characters,
            6 random letter (62^6 combinations) would be enough

    Returns:
        str: Random ID, like "sTD2kF"
    """
    # random.choices() samples with replacement, random.sample() does not:
    # sample() would forbid repeated characters and raise ValueError if length > 62
    return ''.join(random.choices(string.ascii_letters + string.digits, k=length))
def atomic_write(
        file: str,
        data: Union[str, bytes],
        max_attempt=5,
        retry_delay=0.05,
):
    """
    Atomic file write with minimal IO operation
    and handles cases where file might be read by another process.

    os.replace() is an atomic operation among all OS,
    we write to temp file "{file}.{random_id}.tmp" then do os.replace()

    Args:
        file: Path of the file to write, parent directory is created if missing
        data: str is written as utf-8 text without newline translation,
            bytes (or bytearray/memoryview) is written as binary
        max_attempt: Max attempt if another process is reading,
            effective only on Windows
        retry_delay: Base time to wait between retries (seconds),
            doubled after each failed attempt

    Raises:
        TypeError: If `data` is neither str nor bytes-like
        OSError: If temp file can't be written, or file can't be replaced
            after all attempts. Temp file is removed before raising.
    """
    temp = f'{file}.{random_id(6)}.tmp'
    if isinstance(data, (bytes, bytearray, memoryview)):
        mode, encoding, newline = 'wb', None, None
    else:
        # newline='' so line endings in `data` are written as is
        mode, encoding, newline = 'w', 'utf-8', ''

    def write_temp():
        with open(temp, mode=mode, encoding=encoding, newline=newline) as f:
            f.write(data)
            # Ensure data flush to disk
            f.flush()
            os.fsync(f.fileno())

    def remove_temp():
        # Best effort, temp file may not exist or may be held by another process
        try:
            os.unlink(temp)
        except OSError:
            pass

    try:
        try:
            write_temp()
        except FileNotFoundError:
            # Create parent directory
            directory = os.path.dirname(file)
            if directory:
                os.makedirs(directory, exist_ok=True)
            # Write again
            write_temp()
    except BaseException:
        # Don't leave a partially written temp file behind
        remove_temp()
        raise

    last_error = None
    if os.name == 'nt':
        # PermissionError on Windows if another process is reading
        attempts = max(max_attempt, 1)
        for trial in range(attempts):
            try:
                # Atomic operation
                os.replace(temp, file)
                # success
                return
            except PermissionError as e:
                last_error = e
                # No need to wait after the last attempt
                if trial < attempts - 1:
                    time.sleep(2 ** trial * retry_delay)
            except Exception as e:
                last_error = e
                break
    else:
        # Linux and Mac allow existing reading
        try:
            # Atomic operation
            os.replace(temp, file)
            # success
            return
        except Exception as e:
            last_error = e

    # Clean up temp file on failure
    remove_temp()
    if last_error is not None:
        raise last_error from None
def atomic_read(
        file: str,
        mode: str = 'r',
        errors: str = 'strict',
        max_attempt=5,
        retry_delay=0.05,
) -> Union[str, bytes]:
    """
    Atomic file read with minimal IO operation

    Since os.replace() is atomic, atomic reading is just plain read.

    Args:
        file: Path of the file to read
        mode: 'r' or 'rb'
        errors: 'strict', 'ignore', 'replace' and any other errors mode in open(),
            ignored in binary mode
        max_attempt: Max attempt if another process is replacing,
            effective only on Windows
        retry_delay: Base time to wait between retries (seconds),
            doubled after each failed attempt

    Returns:
        str if mode is 'r', decoded as utf-8
        bytes if mode is 'rb'
        Empty str/bytes (matching `mode`) if file does not exist

    Raises:
        PermissionError: If file is still not readable after all attempts
    """
    if 'b' in mode:
        encoding = None
        errors = None
        empty = b''
    else:
        encoding = 'utf-8'
        empty = ''

    # PermissionError on Windows if another process is replacing,
    # Linux and Mac allow reading while replacing so one attempt is enough
    if os.name == 'nt':
        attempts = max(max_attempt, 1)
    else:
        attempts = 1

    for trial in range(attempts):
        try:
            with open(file, mode=mode, encoding=encoding, errors=errors) as f:
                # success
                return f.read()
        except FileNotFoundError:
            # Keep return type consistent with `mode`
            return empty
        except PermissionError:
            # Out of attempts, no need to wait any longer
            if trial >= attempts - 1:
                raise
            time.sleep(2 ** trial * retry_delay)
def atomic_failure_cleanup(path: str):
    """
    Cleanup remaining temp file under given path.
    In most cases there should be no remaining temp files unless write process get interrupted.

    Only regular files directly under `path` named like "xxx.{random_id}.tmp" are deleted,
    sub-directories are not visited.
    Does nothing if `path` does not exist or is not a directory.

    This method should only be called at startup
    to avoid deleting temp files that another process is writing.

    Args:
        path: Folder to clean up
    """
    try:
        entries = os.scandir(path)
    except (FileNotFoundError, NotADirectoryError):
        # Nothing to clean up, folder may not be created yet
        return

    with entries:
        for entry in entries:
            try:
                if not entry.is_file():
                    continue
            except OSError:
                # Entry can't be inspected, leave it alone
                continue
            # Check suffix first to reduce regex calls
            name = entry.name
            if not name.endswith('.tmp'):
                continue
            # Check temp file format
            if not re.match(r'.*\.[a-zA-Z0-9]{6,}\.tmp$', name):
                continue
            # Delete temp file
            try:
                os.unlink(entry.path)
            except OSError:
                # Another process is reading/writing, or file is already gone
                pass

View File

@ -90,6 +90,9 @@ class DeployConfig(ConfigModel):
logger.info(f"Rest of the configs are the same as default")
def read(self):
"""
Read and update deploy config, copy `self.configs` to properties.
"""
self.config = poor_yaml_read(DEPLOY_TEMPLATE)
self.config_template = copy.deepcopy(self.config)
origin = poor_yaml_read(self.file)

View File

@ -12,6 +12,8 @@ from deploy.pip import PipManager
class Installer(GitManager, PipManager, AdbManager, AppManager, AlasManager):
def install(self):
from deploy.atomic import atomic_failure_cleanup
atomic_failure_cleanup('./config')
try:
self.git_install()
self.alas_kill()

View File

@ -2,6 +2,8 @@ import os
import re
from typing import Callable, Generic, TypeVar
from deploy.atomic import atomic_read, atomic_write
T = TypeVar("T")
DEPLOY_CONFIG = './config/deploy.yaml'
@ -63,29 +65,26 @@ def poor_yaml_read(file):
Returns:
dict:
"""
if not os.path.exists(file):
return {}
content = atomic_read(file)
data = {}
regex = re.compile(r'^(.*?):(.*?)$')
with open(file, 'r', encoding='utf-8') as f:
for line in f.readlines():
line = line.strip('\n\r\t ').replace('\\', '/')
if line.startswith('#'):
continue
result = re.match(regex, line)
if result:
k, v = result.group(1), result.group(2).strip('\n\r\t\' ')
if v:
if v.lower() == 'null':
v = None
elif v.lower() == 'false':
v = False
elif v.lower() == 'true':
v = True
elif v.isdigit():
v = int(v)
data[k] = v
for line in content.splitlines():
line = line.strip('\n\r\t ').replace('\\', '/')
if line.startswith('#'):
continue
result = re.match(regex, line)
if result:
k, v = result.group(1), result.group(2).strip('\n\r\t\' ')
if v:
if v.lower() == 'null':
v = None
elif v.lower() == 'false':
v = False
elif v.lower() == 'true':
v = True
elif v.isdigit():
v = int(v)
data[k] = v
return data
@ -97,8 +96,8 @@ def poor_yaml_write(data, file, template_file=DEPLOY_TEMPLATE):
file (str):
template_file (str):
"""
with open(template_file, 'r', encoding='utf-8') as f:
text = f.read().replace('\\', '/')
text = atomic_read(template_file)
text = text.replace('\\', '/')
for key, value in data.items():
if value is None:
@ -109,5 +108,4 @@ def poor_yaml_write(data, file, template_file=DEPLOY_TEMPLATE):
value = "false"
text = re.sub(f'{key}:.*?\n', f'{key}: {value}\n', text)
with open(file, 'w', encoding='utf-8', newline='') as f:
f.write(text)
atomic_write(file, text)

View File

@ -1,236 +0,0 @@
"""
Copy-pasted from
https://github.com/untitaker/python-atomicwrites
"""
import contextlib
import io
import os
import sys
import tempfile
try:
import fcntl
except ImportError:
fcntl = None
# `fspath` was added in Python 3.6
try:
from os import fspath
except ImportError:
fspath = None
__version__ = '1.4.1'
PY2 = sys.version_info[0] == 2
text_type = unicode if PY2 else str # noqa
def _path_to_unicode(x):
if not isinstance(x, text_type):
return x.decode(sys.getfilesystemencoding())
return x
DEFAULT_MODE = "wb" if PY2 else "w"
_proper_fsync = os.fsync
if sys.platform != 'win32':
if hasattr(fcntl, 'F_FULLFSYNC'):
def _proper_fsync(fd):
# https://lists.apple.com/archives/darwin-dev/2005/Feb/msg00072.html
# https://developer.apple.com/library/mac/documentation/Darwin/Reference/ManPages/man2/fsync.2.html
# https://github.com/untitaker/python-atomicwrites/issues/6
fcntl.fcntl(fd, fcntl.F_FULLFSYNC)
def _sync_directory(directory):
# Ensure that filenames are written to disk
fd = os.open(directory, 0)
try:
_proper_fsync(fd)
finally:
os.close(fd)
def _replace_atomic(src, dst):
os.rename(src, dst)
_sync_directory(os.path.normpath(os.path.dirname(dst)))
def _move_atomic(src, dst):
os.link(src, dst)
os.unlink(src)
src_dir = os.path.normpath(os.path.dirname(src))
dst_dir = os.path.normpath(os.path.dirname(dst))
_sync_directory(dst_dir)
if src_dir != dst_dir:
_sync_directory(src_dir)
else:
from ctypes import windll, WinError
_MOVEFILE_REPLACE_EXISTING = 0x1
_MOVEFILE_WRITE_THROUGH = 0x8
_windows_default_flags = _MOVEFILE_WRITE_THROUGH
def _handle_errors(rv):
if not rv:
raise WinError()
def _replace_atomic(src, dst):
_handle_errors(windll.kernel32.MoveFileExW(
_path_to_unicode(src), _path_to_unicode(dst),
_windows_default_flags | _MOVEFILE_REPLACE_EXISTING
))
def _move_atomic(src, dst):
_handle_errors(windll.kernel32.MoveFileExW(
_path_to_unicode(src), _path_to_unicode(dst),
_windows_default_flags
))
def replace_atomic(src, dst):
'''
Move ``src`` to ``dst``. If ``dst`` exists, it will be silently
overwritten.
Both paths must reside on the same filesystem for the operation to be
atomic.
'''
return _replace_atomic(src, dst)
def move_atomic(src, dst):
'''
Move ``src`` to ``dst``. There might a timewindow where both filesystem
entries exist. If ``dst`` already exists, :py:exc:`FileExistsError` will be
raised.
Both paths must reside on the same filesystem for the operation to be
atomic.
'''
return _move_atomic(src, dst)
class AtomicWriter(object):
'''
A helper class for performing atomic writes. Usage::
with AtomicWriter(path).open() as f:
f.write(...)
:param path: The destination filepath. May or may not exist.
:param mode: The filemode for the temporary file. This defaults to `wb` in
Python 2 and `w` in Python 3.
:param overwrite: If set to false, an error is raised if ``path`` exists.
Errors are only raised after the file has been written to. Either way,
the operation is atomic.
:param open_kwargs: Keyword-arguments to pass to the underlying
:py:func:`open` call. This can be used to set the encoding when opening
files in text-mode.
If you need further control over the exact behavior, you are encouraged to
subclass.
'''
def __init__(self, path, mode=DEFAULT_MODE, overwrite=False,
**open_kwargs):
if 'a' in mode:
raise ValueError(
'Appending to an existing file is not supported, because that '
'would involve an expensive `copy`-operation to a temporary '
'file. Open the file in normal `w`-mode and copy explicitly '
'if that\'s what you\'re after.'
)
if 'x' in mode:
raise ValueError('Use the `overwrite`-parameter instead.')
if 'w' not in mode:
raise ValueError('AtomicWriters can only be written to.')
# Attempt to convert `path` to `str` or `bytes`
if fspath is not None:
path = fspath(path)
self._path = path
self._mode = mode
self._overwrite = overwrite
self._open_kwargs = open_kwargs
def open(self):
'''
Open the temporary file.
'''
return self._open(self.get_fileobject)
@contextlib.contextmanager
def _open(self, get_fileobject):
f = None # make sure f exists even if get_fileobject() fails
try:
success = False
with get_fileobject(**self._open_kwargs) as f:
yield f
self.sync(f)
self.commit(f)
success = True
finally:
if not success:
try:
self.rollback(f)
except Exception:
pass
def get_fileobject(self, suffix="", prefix=tempfile.gettempprefix(),
dir=None, **kwargs):
'''Return the temporary file to use.'''
if dir is None:
dir = os.path.normpath(os.path.dirname(self._path))
descriptor, name = tempfile.mkstemp(suffix=suffix, prefix=prefix,
dir=dir)
# io.open() will take either the descriptor or the name, but we need
# the name later for commit()/replace_atomic() and couldn't find a way
# to get the filename from the descriptor.
os.close(descriptor)
kwargs['mode'] = self._mode
kwargs['file'] = name
return io.open(**kwargs)
def sync(self, f):
'''responsible for clearing as many file caches as possible before
commit'''
f.flush()
_proper_fsync(f.fileno())
def commit(self, f):
'''Move the temporary file to the target location.'''
if self._overwrite:
replace_atomic(f.name, self._path)
else:
move_atomic(f.name, self._path)
def rollback(self, f):
'''Clean up all temporary resources.'''
os.unlink(f.name)
def atomic_write(path, writer_cls=AtomicWriter, **cls_kwargs):
'''
Simple atomic writes. This wraps :py:class:`AtomicWriter`::
with atomic_write(path) as f:
f.write(...)
:param path: The target path to write to.
:param writer_cls: The writer class to use. This parameter is useful if you
subclassed :py:class:`AtomicWriter` to change some behavior and want to
use that new subclass.
Additional keyword arguments are passed to the writer class. See
:py:class:`AtomicWriter`.
'''
return writer_cls(path, **cls_kwargs).open()

View File

@ -1,14 +1,12 @@
import json
import os
import random
import string
from datetime import datetime, timedelta, timezone
import yaml
from filelock import FileLock
import module.config.server as server_
from module.config.atomicwrites import atomic_write
from deploy.atomic import atomic_read, atomic_write
from module.submodule.utils import *
LANGUAGES = ['zh-CN', 'en-US', 'ja-JP', 'zh-TW']
@ -79,33 +77,23 @@ def read_file(file):
Returns:
dict, list:
"""
folder = os.path.dirname(file)
if not os.path.exists(folder):
os.mkdir(folder)
if not os.path.exists(file):
return {}
_, ext = os.path.splitext(file)
lock = FileLock(f"{file}.lock")
with lock:
print(f'read: {file}')
if ext == '.yaml':
with open(file, mode='r', encoding='utf-8') as f:
s = f.read()
data = list(yaml.safe_load_all(s))
if len(data) == 1:
data = data[0]
if not data:
data = {}
return data
elif ext == '.json':
with open(file, mode='r', encoding='utf-8') as f:
s = f.read()
return json.loads(s)
else:
print(f'Unsupported config file extension: {ext}')
print(f'read: {file}')
if file.endswith('.json'):
content = atomic_read(file, mode='rb')
if not content:
return {}
return json.loads(content)
elif file.endswith('.yaml'):
content = atomic_read(file, mode='r')
data = list(yaml.safe_load_all(content))
if len(data) == 1:
data = data[0]
if not data:
data = {}
return data
else:
print(f'Unsupported config file extension: {file}')
return {}
def write_file(file, data):
@ -116,28 +104,20 @@ def write_file(file, data):
file (str):
data (dict, list):
"""
folder = os.path.dirname(file)
if not os.path.exists(folder):
os.mkdir(folder)
_, ext = os.path.splitext(file)
lock = FileLock(f"{file}.lock")
with lock:
print(f'write: {file}')
if ext == '.yaml':
with atomic_write(file, overwrite=True, encoding='utf-8', newline='') as f:
if isinstance(data, list):
yaml.safe_dump_all(data, f, default_flow_style=False, encoding='utf-8', allow_unicode=True,
sort_keys=False)
else:
yaml.safe_dump(data, f, default_flow_style=False, encoding='utf-8', allow_unicode=True,
sort_keys=False)
elif ext == '.json':
with atomic_write(file, overwrite=True, encoding='utf-8', newline='') as f:
s = json.dumps(data, indent=2, ensure_ascii=False, sort_keys=False, default=str)
f.write(s)
print(f'write: {file}')
if file.endswith('.json'):
content = json.dumps(data, indent=2, ensure_ascii=False, sort_keys=False, default=str)
atomic_write(file, content)
elif file.endswith('.yaml'):
if isinstance(data, list):
content = yaml.safe_dump_all(
data, default_flow_style=False, encoding='utf-8', allow_unicode=True, sort_keys=False)
else:
print(f'Unsupported config file extension: {ext}')
content = yaml.safe_dump(
data, default_flow_style=False, encoding='utf-8', allow_unicode=True, sort_keys=False)
atomic_write(file, content)
else:
print(f'Unsupported config file extension: {file}')
def iter_folder(folder, is_dir=False, ext=None):

View File

@ -1,16 +1,15 @@
import sys
import json
import time
import queue
import argparse
import json
import queue
import threading
import time
from datetime import datetime
from functools import partial
from typing import Dict, List, Optional
# Import fake module before import pywebio to avoid importing unnecessary module PIL
from module.webui.fake_pil_module import import_fake_pil_module
import_fake_pil_module()
from pywebio import config as webconfig
@ -1440,6 +1439,9 @@ def app():
logger.attr("CDN", cdn)
logger.attr("IS_ON_PHONE_CLOUD", IS_ON_PHONE_CLOUD)
from deploy.atomic import atomic_failure_cleanup
atomic_failure_cleanup('./config')
def index():
if key is not None and not login(key):
logger.warning(f"{info.user_ip} login failed.")

View File

@ -1,57 +1,10 @@
import copy
from filelock import FileLock
from deploy.config import DeployConfig as _DeployConfig
from deploy.utils import *
def poor_yaml_read_with_lock(file):
if not os.path.exists(file):
return {}
with FileLock(f"{file}.lock"):
return poor_yaml_read(file)
def poor_yaml_write_with_lock(data, file, template_file=DEPLOY_TEMPLATE):
folder = os.path.dirname(file)
if not os.path.exists(folder):
os.mkdir(folder)
with FileLock(f"{file}.lock"):
with FileLock(f"{DEPLOY_TEMPLATE}.lock"):
return poor_yaml_write(data, file, template_file)
class DeployConfig(_DeployConfig):
def show_config(self):
pass
def read(self):
"""
Read and update deploy config, copy `self.configs` to properties.
"""
self.config = poor_yaml_read_with_lock(DEPLOY_TEMPLATE)
self.config_template = copy.deepcopy(self.config)
origin = poor_yaml_read_with_lock(self.file)
self.config.update(origin)
for key, value in self.config.items():
if hasattr(self, key):
super().__setattr__(key, value)
self.config_redirect()
if self.config != origin:
self.write()
def write(self):
"""
Write `self.config` into deploy config.
"""
poor_yaml_write_with_lock(self.config, self.file)
def __setattr__(self, key: str, value):
"""
Catch __setattr__, copy to `self.config`, write deploy config.