Source code for rauc_hawkbit.ddi.client

# -*- coding: utf-8 -*-

import json
import hashlib
import logging

from aiohttp.client import ClientTimeout
from datetime import datetime
from enum import Enum

from .deployment_base import DeploymentBase
from .softwaremodules import SoftwareModules
from .cancel_action import CancelAction

# status of the action execution
ConfigStatusExecution = Enum('ConfigStatusExecution',
                             'closed proceeding canceled scheduled rejected \

# defined status of the result
ConfigStatusResult = Enum('ConfigStatusResultFinished',
                          'success failure none')

[docs]class APIError(Exception): pass
[docs]class DDIClient(object): """ Base Direct Device Integration API client providing GET, POST and PUT helpers as well as access to next level API resources. """ error_responses = { 400: 'Bad Request - e.g. invalid parameters', 401: 'The request requires user authentication.', 403: 'Insufficient permissions or data volume restriction applies.', 404: 'Resource not available or device unknown.', 405: 'Method Not Allowed', 406: 'Accept header is specified and is not application/json.', 429: 'Too many requests.' } def __init__(self, session, host, ssl, auth_token, tenant_id, controller_id, timeout=10): self.session = session = host self.ssl = ssl self.logger = logging.getLogger('rauc_hawkbit') self.headers = {'Authorization': 'TargetToken {}'.format(auth_token)} self.tenant = tenant_id self.controller_id = controller_id self.timeout = timeout # URL parts which get replaced lateron self.placeholders = ['tenant', 'target', 'softwaremodule', 'action', 'filename'] self.replacements = { '/MD5SUM': '.MD5SUM' } @property def cancelAction(self): return CancelAction(self) @property def softwaremodules(self): return SoftwareModules(self) @property def deploymentBase(self): return DeploymentBase(self) async def __call__(self): """ Base poll resource See Returns: JSON data """ return await self.get_resource('/{tenant}/controller/v1/{controllerId}')
[docs] async def configData(self, status_execution, status_result, action_id='', status_details=(), **kwdata): """ Provide meta informtion during device registration at the update server. See # noqa Args: status_execution(ConfigStatusExecution): status of the action execution status_result(status_result): result of the action execution Keyword Args: action_id(str): Id of the action, not mandator for configData status_details((tuple, list)): List of details to provide other: passed as custom configuration data (key/value) """ assert isinstance(action_id, str), 'id must be string' assert isinstance(status_result, ConfigStatusResult), \ 'status_result_finished must be ConfigStatusResult enum' assert isinstance(status_execution, ConfigStatusExecution), \ 'status_execution must be ConfigStatusExecution enum' assert isinstance(status_details, (tuple, list)), \ 'status_details must be tuple or list' assert len(kwdata) > 0 time ='%Y%m%dT%H%M%S') put_data = { 'id': action_id, 'time': time, 'status': { 'result': { 'finished': }, 'execution':, 'details': status_details }, 'data': kwdata } await self.put_resource('/{tenant}/controller/v1/{controllerId}/configData', put_data)
[docs] def build_api_url(self, api_path): """ Build the actual API URL. Args: api_path(str): REST API path Returns: Expanded API URL with protocol (http/https) and host prepended """ protocol = 'https' if self.ssl else 'http' return '{protocol}://{host}{api_path}'.format( protocol=protocol,, api_path=api_path)
[docs] async def get_resource(self, api_path, query_params={}, **kwargs): """ Helper method for HTTP GET API requests. Args: api_path(str): REST API path Keyword Args: query_params: Query parameters to add to the API URL kwargs: Other keyword args used for replacing items in the API path Returns: Response JSON data """ get_headers = { 'Accept': 'application/json', **self.headers } url = self.build_api_url( api_path.format( tenant=self.tenant, controllerId=self.controller_id, **kwargs)) self.logger.debug('GET {}'.format(url)) async with self.session.get(url, headers=get_headers, params=query_params, timeout=ClientTimeout(self.timeout)) as resp: await self.check_http_status(resp) json = await resp.json() self.logger.debug(json) return json
[docs] async def get_binary_resource(self, api_path, dl_location, mime='application/octet-stream', timeout=3600, **kwargs): """ Helper method for binary HTTP GET API requests. Triggers download of the retreived content to ``dl_location``. Args: api_path(str): REST API path dl_location(str): storage path for downloaded artifact Keyword Args: mime: mimetype of content to retrieve (default: 'application/octet-stream') kwargs: Other keyword args used for replacing items in the API path Returns: MD5 hash of downloaded content """ url = self.build_api_url( api_path.format( tenant=self.tenant, controllerId=self.controller_id, **kwargs)) return await self.get_binary(url, dl_location, mime, timeout=timeout)
[docs] async def get_binary(self, url, dl_location, mime='application/octet-stream', timeout=3600): """ Actual download method with checksum checking. Args: url(str): URL of item to download dl_location(str): storage path for downloaded artifact Keyword Args: mime: mimetype of content to retrieve (default: 'application/octet-stream') timeout: download timeout (default: 3600) Returns: MD5 hash of downloaded content """ get_bin_headers = { 'Accept': mime, **self.headers } hash_md5 = hashlib.md5() self.logger.debug('GET binary {}'.format(url)) # session timeout & single socket read timeout timeout = ClientTimeout(timeout, sock_read=60) async with self.session.get(url, headers=get_bin_headers, timeout=timeout) as resp: await self.check_http_status(resp) with open(dl_location, 'wb') as fd: while True: chunk, _ = await resp.content.readchunk() # we are EOF if not chunk: break fd.write(chunk) hash_md5.update(chunk) return hash_md5.hexdigest()
[docs] async def post_resource(self, api_path, data, **kwargs): """ Helper method for HTTP POST API requests. Args: api_path(str): REST API path data: JSON data for POST request Keyword Args: kwargs: keyword args used for replacing items in the API path """ post_headers = { 'Content-Type': 'application/json', 'Accept': 'application/json', **self.headers } url = self.build_api_url( api_path.format( tenant=self.tenant, controllerId=self.controller_id, **kwargs)) self.logger.debug('POST {}'.format(url)) async with, headers=post_headers, data=json.dumps(data), timeout=ClientTimeout(self.timeout)) as resp: await self.check_http_status(resp)
[docs] async def put_resource(self, api_path, data, **kwargs): """ Helper method for HTTP PUT API requests. Args: api_path(str): REST API path data: JSON data for POST request Keyword Args: kwargs: keyword args used for replacing items in the API path """ put_headers = { 'Content-Type': 'application/json', **self.headers } url = self.build_api_url( api_path.format( tenant=self.tenant, controllerId=self.controller_id, **kwargs)) self.logger.debug('PUT {}'.format(url)) self.logger.debug(json.dumps(data)) async with self.session.put(url, headers=put_headers, data=json.dumps(data), timeout=ClientTimeout(self.timeout)) as resp: await self.check_http_status(resp)
[docs] async def check_http_status(self, resp): """Log API error message.""" if resp.status != 200: error_description = await resp.text() if error_description: self.logger.debug('API error: {}'.format(error_description)) if resp.status in self.error_responses: reason = self.error_responses[resp.status] else: reason = resp.reason raise APIError('{status}: {reason}'.format( status=resp.status, reason=reason))