"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation, either version 3 of the License, or (at your option) any later
version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
A PARTICULAR PURPOSE.  See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program.  If not, see <https://www.gnu.org/licenses/>.

Copyright © 2019 Cloud Linux Software Inc.

This software is also available under ImunifyAV commercial license,
see
"""
import asyncio
import logging
import time

from defence360agent.contracts.config import BackupConfig
from defence360agent.contracts.license import LicenseCLN
from defence360agent.internals.cln import CLNError
from defence360agent.rpc_tools import ValidationError
from defence360agent.rpc_tools.lookup import CommonEndpoints, bind, wraps
from defence360agent.subsys.backup_systems import (
    Acronis,
    BackupException,
    CloudLinux,
    CloudLinuxBase,
    CloudLinuxOnPremise,
    R1Soft,
    get_available_backends_names,
    get_backend,
)
from defence360agent.utils import Scope, Singleton

logger = logging.getLogger(__name__)

# Generic message reported whenever the CLN service answers with an error.
CLN_RESPONSE_ERROR = """
Error with one of the next reasons:
* Not found linked backup for the server
* Not found linked cloudlinux user for the server
* IP of server is different in last time
"""

# Option name -> validation message used when the option is missing/empty.
PLAIN_AUTH_FIELDS = {
    "username": "Username should be provided",
    "password": "Password should be provided",
}

# Backend -> options that must be supplied to initialise that backend.
# Backends that are not listed here require no options.
REQUIRED_FIELDS = {
    Acronis: PLAIN_AUTH_FIELDS,
    R1Soft: {
        "encryption_key": "Encryption key should be provided",
        "ip": "IP should be provided",
        **PLAIN_AUTH_FIELDS,
    },
    CloudLinuxOnPremise: PLAIN_AUTH_FIELDS,
}


def validate_backend_args(f):
    """Decorate coroutine *f* so required backend options are checked first.

    The wrapper looks up the ``backend`` keyword argument in
    ``REQUIRED_FIELDS`` and raises ``ValidationError`` (one message per
    line) if any required option is absent or falsy in ``kwargs``.
    ``backend`` must be passed by keyword.
    """

    @wraps(f)
    async def wrapper(*args, **kwargs):
        required_options = REQUIRED_FIELDS.get(kwargs["backend"], {})
        errors = [
            error_msg
            for field, error_msg in required_options.items()
            if not kwargs.get(field)
        ]
        if errors:
            raise ValidationError("\n".join(errors))
        return await f(*args, **kwargs)

    return wrapper


class BackupEndpoints(CommonEndpoints, metaclass=Singleton):
    """RPC endpoints of the ``backup-systems`` command group.

    The instance keeps a small state machine (``NOT_RUNNING`` -> ``INIT`` ->
    ``BACKUP`` -> ``DONE``) describing the initialisation of the selected
    backup backend and the first backup that follows it.
    """

    SCOPE = Scope.IM360

    NOT_RUNNING, INIT, BACKUP, DONE = "not_running", "init", "backup", "done"

    # How long (seconds) after the initial backup has been started a missing
    # progress / "No backup for host" error is still considered normal.
    _BACKUP_PENDING_TIMEOUT = 60 * 5

    def __init__(self, sink):
        super().__init__(sink)
        self._status = self.NOT_RUNNING
        self._current_backend = self._error = None
        self._init_task = self._backup_task = None
        self._backup_started_time = 0

    @property
    def _backup_pending(self):
        """True while less than 5 minutes passed since the backup started."""
        elapsed = time.time() - self._backup_started_time
        return elapsed < self._BACKUP_PENDING_TIMEOUT

    @validate_backend_args
    @bind("backup-systems", "init")
    async def init(self, backend, **kwargs):
        """Start initialisation of *backend* in a background task.

        Raises ``ValidationError`` in the demo version, or when an
        initialisation/backup of the current backend is already running
        without an error. Returns a human-readable progress message.
        """
        if LicenseCLN.is_demo():
            raise ValidationError("This action is not allowed in demo version")
        loop = asyncio.get_event_loop()

        if self._current_backend and not self._error:
            if self._status == self.INIT:
                raise ValidationError(
                    "Backup initialization is already in progress"
                )
            if self._status == self.BACKUP:
                raise ValidationError("Backup process is already in progress")

        logger.info("Starting init task")
        # flush error from previous sessions
        self._error = None
        self._init_task = loop.create_task(backend.init(**kwargs))
        # ``_task`` is the attribute name used historically; keep it as an
        # alias so that anything still reading it sees the same task.
        self._task = self._init_task
        self._init_task.add_done_callback(self._init_task_done)

        self._status = self.INIT
        self._current_backend = backend

        return "Backup initialization process is in progress"

    async def _set_current_backend(self, conf):
        """Pick the backend named in *conf* if none is selected yet.

        *conf* is the ``BACKUP_SYSTEM`` config section as a dict; it may be
        empty, in which case nothing happens.
        """
        backup_system = conf.get("backup_system")
        if not self._current_backend and backup_system:
            self._current_backend = get_backend(backup_system)
            if self._status == self.NOT_RUNNING:
                # if backup exists, assuming all done
                if await self._current_backend.check_state():
                    self._status = self.DONE
                else:
                    self._error = "No backups found!"

    async def _include_init_stages_info(self, status):
        """Extend *status* with backend details and backup progress.

        Mutates and returns *status*. May also move the internal state to
        ``DONE`` for CloudLinux-based backends.
        """
        if status["state"] != self.INIT:
            advanced_data = await self._current_backend.show()
            status.update(advanced_data)

        # NOTE(review): elsewhere the backend is compared with ``==`` against
        # a backend class, here it is tested with ``isinstance`` -- confirm
        # which of the two matches how backends are represented.
        if (
            isinstance(self._current_backend, CloudLinuxBase)
            and self._status == self.BACKUP
        ):
            # NOTE(review): nesting of this if/else was reconstructed from
            # whitespace-collapsed source -- confirm against the original.
            if self._error is not None:
                if await self._current_backend.check_state():
                    # error should already be fixed,
                    # because there is some backups
                    self._error = None
                    status["state"] = self._status = self.DONE
            else:
                progress = await self._current_backend.get_backup_progress()
                if progress is None:
                    if self._backup_pending:
                        status["progress"] = 0
                    else:
                        # if there is no progress after a while
                        # let's admit backup is completed
                        status["state"] = self._status = self.DONE
                else:
                    status["progress"] = progress
        return status

    async def _get_advanced_status(self, status):
        """Return *status* enriched with state, error and backend details."""
        await self._set_current_backend(status)

        status["state"] = self._status
        status["error"] = self._error
        status["log_path"] = getattr(self._current_backend, "log_path", None)
        status["backup_system"] = getattr(self._current_backend, "name", None)

        # A failed initialisation is reported once (the error is already
        # copied into ``status`` above) and then the state is reset so that
        # a new ``init`` can be requested.
        if self._error is not None and self._status == self.INIT:
            self._error = self._current_backend = None
            self._status = self.NOT_RUNNING

        if self._current_backend:
            try:
                status = await self._include_init_stages_info(status)
            except asyncio.CancelledError:
                raise
            except Exception as e:
                # NOTE(review): the final ``else`` is assumed to belong to
                # this if/elif chain (source indentation was lost).
                # Ignoring "No backup for host" errors for 5 minutes
                if "No backup for host" in str(e) and self._backup_pending:
                    logger.info(
                        "Error %s will be ignored for 5 minutes "
                        "after init state has been finished",
                        e,
                    )
                # want to show human friendly error if backup is unpaid
                elif self._current_backend == CloudLinux:
                    resp = await self._current_backend.check()
                    if resp.get("status") == CloudLinux.UNPAID:
                        status["error"] = (
                            "Backup is unpaid! Please, check "
                            "it out in your CLN account."
                        )
                else:
                    # bare ``raise`` keeps the original traceback intact
                    raise

        return status

    @bind("backup-systems", "extended-status")
    async def extended_status(self):
        """Return the backup config section plus live state information."""
        status = BackupConfig().config_to_dict().get("BACKUP_SYSTEM", {})
        status = await self._get_advanced_status(status)
        return {"items": status}

    @bind("backup-systems", "status")
    async def status(self, user=None):
        """Return the ``BACKUP_SYSTEM`` config section as is.

        *user* is accepted but not used by this implementation.
        """
        status = BackupConfig().config_to_dict().get("BACKUP_SYSTEM", {})
        return {"items": status}

    @bind("backup-systems", "list")
    async def list(self):
        """Return the names of the available backup backends."""
        return get_available_backends_names()

    @bind("backup-systems", "disable")
    async def disable(self, backend, **kwargs):
        """Reset the internal state and disable *backend*."""
        self._current_backend = self._error = None
        self._status = self.NOT_RUNNING
        await backend.disable(**kwargs)

    @bind("backup-systems", "check")
    async def check(self, backend):
        """Return the result of ``backend.check()``.

        ``CLNError``, ``LookupError`` and ``RuntimeError`` are converted to
        ``ValidationError``.
        """
        try:
            return {"items": await backend.check()}
        except CLNError as e:
            raise ValidationError(CLN_RESPONSE_ERROR) from e
        except (LookupError, RuntimeError) as e:
            raise ValidationError(str(e)) from e

    def _init_task_done(self, future):
        """Done-callback of the init task: record error or start the backup."""
        logger.info("In init done callback")
        # ``Future.exception()`` raises CancelledError for a cancelled
        # future; handle that first so the callback never raises and the
        # state does not stay in INIT forever.
        if future.cancelled():
            logger.warning("Backup init task was cancelled")
            self._error = "Backup initialization was cancelled"
            return

        e = future.exception()
        if e is not None:
            if isinstance(e, CLNError):
                self._error = CLN_RESPONSE_ERROR
            else:
                logger.exception("Backup init task failed", exc_info=e)
                self._error = str(e)
        else:
            logger.info("Starting initial backup task")
            self._status = self.BACKUP
            self._backup_started_time = time.time()
            self._backup_task = asyncio.get_event_loop().create_task(
                self._current_backend.make_backup()
            )
            self._backup_task.add_done_callback(self._backup_task_done)

    def _backup_task_done(self, future):
        """Done-callback of the initial backup task: record the outcome."""
        logger.info("In backup done callback")
        # See _init_task_done: a cancelled future must not be asked for
        # its exception.
        if future.cancelled():
            logger.warning("Initial backup task was cancelled")
            self._error = "Initial backup was cancelled"
            return

        e = future.exception()
        if e is not None:
            logger.exception("Initial backup task failed", exc_info=e)
            if isinstance(e, BackupException):
                self._error = str(e)
            else:
                self._error = "Unknown error: " + str(e)
        else:
            self._status = self.DONE