Source code for nrgpy.cloud_api.convert

try:
    from nrgpy import logger
except ImportError:
    pass
from datetime import datetime
from nrgpy.utils.utilities import affirm_directory, string_date_check, draw_progress_bar
from .auth import cloud_api, cloud_url_base, is_authorized
import os
import requests
import zipfile


class CloudConvert(cloud_api):
    """Uses NRG hosted web-based API to convert RLD and RWD files to text format

    To sign up for the service, go to https://cloud.nrgsystems.com/.

    Note that the site must exist in the NRG Cloud platform, and you must have
    Contributor or Administrator level access to the site to use these features.

    Conversion is started by the constructor: passing ``filename`` converts
    that single file, and passing ``rld_dir`` calls :meth:`process` on the
    directory.

    Attributes
    ----------
    rld_dir : str (path-like)
        path to rld file directory
    out_dir : str (path-like)
        path to save text export files
    filename : str
        provide for single file conversion
    site_filter : str, optional
        text filter for limiting file set
    filter2 : str, optional
        another text filter...
    start_date : str, optional
        text start date to filter on "YYYY-mm-dd"
    end_date : str, optional
        text end date to filter on "YYYY-mm-dd"
    client_id : str
        provided by NRG Systems
    client_secret : str
        provided by NRG Systems
    nec_file : str, optional
        path to NEC file for custom export formatting
    export_type : str
        [measurements], diagnostic, events, communication
    unzip : bool
        whether to extract the .txt data file from the .zip file
    session_token : str
    headers : str
        headers passed in API call
    data : str
        data passed in API call
    resp : str
        API response
    export_filepath : str (path-like)
        path of export file

    Examples
    --------
    Convert a single raw data file to Text with NRG Convert API

    >>> import nrgpy
    >>> filename = "/home/user/data/sympro/000123/000123_2019-05-23_19.00_003672.rld"
    >>> client_id = "go to https://cloud.nrgsystems.com/data-manager/api-setup for access"
    >>> client_secret = "go to https://cloud.nrgsystems.com/data-manager/api-setup for access"
    >>> converter = nrgpy.CloudConvert(
            filename=filename,
            client_id=client_id,
            client_secret=client_secret,
        )

    Convert a folder of raw data files to Text with NRG Convert API

    >>> import nrgpy
    >>> file_filter = "000175"
    >>> rld_directory = "rlds"
    >>> txt_dir = "/home/user/data/sympro/000123/txt/"
    >>> client_id = "go to https://cloud.nrgsystems.com/data-manager/api-setup for access"
    >>> client_secret = "go to https://cloud.nrgsystems.com/data-manager/api-setup for access"
    >>> converter = nrgpy.CloudConvert(
            file_filter=file_filter,
            rld_dir=rld_directory,
            out_dir=txt_dir,
            client_id=client_id,
            client_secret=client_secret,
            start_date="2020-01-01",
            end_date="2020-01-31",
        )

    The directory is converted during construction because ``rld_dir`` was
    given; call ``converter.process()`` only to run the conversion again.
    """

    def __init__(
        self,
        rld_dir: str = "",
        out_dir: str = "",
        filename: str = "",
        site_filter: str = "",
        filter2: str = "",
        start_date: str = "1970-01-01",
        end_date: str = "2150-12-31",
        client_id: str = "",
        client_secret: str = "",
        url_base: str = cloud_url_base,
        export_type: str = "measurements",
        nec_file: str = "",
        unzip: bool = True,
        progress_bar: bool = True,
        **kwargs,
    ):
        """Initialize a CloudConvert object and start the conversion.

        Parameters
        ----------
        rld_dir : str (path-like)
            path to rld file directory
        out_dir : str (path-like)
            path to save text export files
        filename : str
            provide for single file conversion
        site_filter : str, optional
            text filter for limiting file set
        filter2 : str, optional
            another text filter...
        start_date : str, optional
            text start date to filter on "YYYY-mm-dd"
        end_date : str, optional
            text end date to filter on "YYYY-mm-dd"
        client_id : str
            available in the NRG Cloud portal
        client_secret : str
            available in the NRG Cloud portal
        url_base : str
            base URL handed to the ``cloud_api`` constructor
        nec_file : str, optional
            path to NEC file for custom export formatting
        export_type : {'measurements', 'diagnostic', 'events', 'communication'}, default 'measurements'
            which type of data to export
        unzip : bool, default True
            whether to extract the .txt data file from the .zip file
        progress_bar : bool, default True
            whether to display the progress bar
        **kwargs
            ``file_filter`` is accepted as an alternative spelling of
            ``site_filter``; it is only used when ``site_filter`` is empty.
        """
        super().__init__(client_id, client_secret, url_base)

        self.export_type = export_type
        self.site_filter = site_filter
        if "file_filter" in kwargs and site_filter == "":
            self.file_filter = kwargs.get("file_filter")
            self.site_filter = self.file_filter
        self.filter2 = filter2
        self.start_date = start_date
        self.end_date = end_date
        self.nec_file = nec_file
        self.out_dir = out_dir
        self.rld_dir = rld_dir
        self.progress_bar = progress_bar
        self.unzip = unzip

        affirm_directory(self.out_dir)

        if filename:
            # A single file has nothing to show progress for: set up the
            # "1/1" counters that single_file() prints and use the text
            # status line instead of the progress bar.
            self.pad = 1
            self.counter = 1
            self.raw_count = 1
            self.progress_bar = False
            self.start_time = datetime.now()
            self.single_file(filename)

        if rld_dir:
            self.process()

    def process(self) -> None:
        """Convert every matching file found in ``rld_dir``.

        A file is selected when its name contains both ``site_filter`` and
        ``filter2``, ends with "rld" (case-insensitive) and is accepted by
        ``string_date_check`` for ``start_date``/``end_date``. Files are
        handled in sorted name order, and the loop stops early as soon as
        ``is_authorized`` rejects the latest API response.
        """
        self.start_time = datetime.now()

        # RWD conversion is not supported yet; once it is, the suffix test
        # can become f.lower().endswith(("rwd", "rld")).
        self.files = [
            f
            for f in sorted(os.listdir(self.rld_dir))
            if self.site_filter in f
            and self.filter2 in f
            and f.lower().endswith("rld")
            and string_date_check(self.start_date, self.end_date, f)
        ]

        self.raw_count = len(self.files)
        self.pad = len(str(self.raw_count)) + 1
        self.counter = 1

        for rld in self.files:
            self.single_file(os.path.join(self.rld_dir, rld))
            self.counter += 1

            # single_file() swallows its own errors, so it may return without
            # ever having stored a response; only test authorization when
            # there is a response to test.
            resp = getattr(self, "resp", None)
            if resp is not None and not is_authorized(resp):
                break

        print("\n")

    def single_file(self, rld) -> None:
        """Convert one raw data file and store the export in ``out_dir``.

        The file (and the NEC file, when ``nec_file`` is set) is encoded with
        ``prepare_file_bytes`` and posted to ``convert_url``. On a 200
        response the returned zip archive is written to ``out_dir`` and, when
        ``unzip`` is true, extracted and then deleted. ``export_filename`` and
        ``export_filepath`` describe the resulting file.

        Failures are reported through the logger and stdout and are not
        re-raised, so a batch run continues with the next file.

        Parameters
        ----------
        rld : str (path-like)
            path of the raw data file to convert
        """
        # Name used in every message about this file. The export path is only
        # known after a successful conversion, so failures are reported
        # against the input file instead.
        source_name = os.path.basename(rld)

        try:
            if self.progress_bar:
                draw_progress_bar(self.counter, self.raw_count, self.start_time)
            else:
                print(
                    "Processing {0}/{1} ... {2} ... ".format(
                        str(self.counter).rjust(self.pad),
                        str(self.raw_count).ljust(self.pad),
                        source_name,
                    ),
                    end="",
                    flush=True,
                )

            self.encoded_rld_bytes = self.prepare_file_bytes(rld)
            self.encoded_rld_string = self.encoded_rld_bytes.decode("utf-8")

            if self.nec_file:
                self.encoded_nec_bytes = self.prepare_file_bytes(self.nec_file)
                self.encoded_nec_string = self.encoded_nec_bytes.decode("utf-8")
            else:
                self.encoded_nec_bytes = ""
                self.encoded_nec_string = ""

            self.maintain_session_token()

            self.data = {
                "FileBytes64BitEncoded": self.encoded_rld_string,
                "NecFile64BitEncoded": self.encoded_nec_string,
                "exportType": self.export_type,
            }

            self.resp = requests.post(
                json=self.data, url=self.convert_url, headers=self.headers
            )

            # The archive is named after the input file: the last three
            # characters of its name are replaced with "zip".
            self.zip_file = source_name[:-3] + "zip"
            self.filepath = os.path.join(self.out_dir, self.zip_file)

            if self.resp.status_code == 200:
                with open(self.filepath, "wb") as f:
                    f.write(self.resp.content)

                if self.unzip:
                    with zipfile.ZipFile(self.filepath, "r") as z:
                        self.export_filename = z.namelist()[0]
                        z.extractall(self.out_dir)
                    # Remove the archive only after it has been closed.
                    os.remove(self.filepath)
                    self.export_filepath = os.path.normpath(
                        os.path.join(self.out_dir, self.export_filename)
                    )
                else:
                    self.export_filepath = os.path.normpath(self.filepath)
                    self.export_filename = self.zip_file

                if self.progress_bar is False:
                    print("[DONE]")
                logger.info(f"converted {os.path.basename(self.export_filepath)} OK")

            elif self.resp.status_code == 401:
                # Nothing is reported here; process() inspects the response
                # with is_authorized() and stops the batch.
                pass

            else:
                logger.error(f"unable to convert {source_name}: FAILED")
                print("\nunable to process file: {0}".format(rld))
                print(f"{self.resp.status_code} | {self.resp.reason}")
                print(self._error_detail(self.resp.text))

        except Exception as e:
            if self.progress_bar is False:
                print("[FAILED]")
            logger.error(f"unable to convert {source_name}: FAILED")
            logger.debug(e)
            print("unable to process file: {0}".format(rld))
            print(e)

    @staticmethod
    def _error_detail(text: str) -> str:
        """Return the quoted message following the first ':' in *text*.

        Falls back to *text* unchanged when it does not have that shape, so
        reporting an error can never raise on an unexpected response body.
        """
        try:
            return text.split(":")[1].split('"')[1]
        except IndexError:
            return text
# Lowercase alias: the class is also available under the name ``cloud_convert``.
cloud_convert = CloudConvert