Source code for nrgpy.cloud_api.upload

try:
    from nrgpy import logger
except ImportError:
    pass
from datetime import datetime
import json
from nrgpy.utils.utilities import string_date_check, draw_progress_bar
from .auth import cloud_api, cloud_url_base, is_authorized
import os
import requests


[docs]class CloudImport(cloud_api): """Uses NRG hosted web-based API to import RLD and CSV/CSV.zip files to existing sites in NRG Cloud. To sign up for the service, go to https://cloud.nrgsystems.com/. Note that the site must exist in the NRG Cloud platform, and you must have Contributor or Administrator level access to the site to use these features. Attributes ---------- in_dir : str (path-like) path to data file directory filename : str provide for single file import site_filter : str, optional text filter for limiting file set filter2 : str, optional another text filter... start_date : str, optional text start date to filter on "YYYY-mm-dd" end_date : str, optional text end date to filter on "YYYY-mm-dd" client_id : str provided by NRG Systems client_secret : str provided by NRG Systems session_token : str headers : str headers passed in API call data : str data passed in API call resp : str API response job_ids : dict dictionary of filenames and job ids Examples -------- Import a single raw data file with NRG Import API >>> import nrgpy >>> filename = "/home/user/data/sympro/000123/000123_2019-05-23_19.00_003672.filename >>> client_id = "go to https://cloud.nrgsystems.com/data-manager/api-setup for access" >>> client_secret = "go to https://cloud.nrgsystems.com/data-manager/api-setup for access" >>> importer = nrgpy.CloudImport( filename=filename, client_id=client_id, client_secret=client_secret, ) Import a folder of data files with NRG Import API >>> import nrgpy >>> file_filter = "000175" >>> in_directory = "filenames" >>> client_id = "go to https://cloud.nrgsystems.com/data-manager/api-setup for access" >>> client_secret = "go to https://cloud.nrgsystems.com/data-manager/api-setup for access" >>> importer = nrgpy.CloudImport( file_filter=file_filter, in_dir=in_directory, client_id=client_id, client_secret=client_secret, start_date="2020-01-01", end_date="2020-01-31", ) >>> importer.process() """ def __init__( self, in_dir: str = "", filename: str = "", site_filter: str = "", filter2: str = "", start_date: str = "1970-01-01", end_date: str = "2150-12-31", client_id: str = "", client_secret: str = "", url_base: str = cloud_url_base, progress_bar: bool = True, **kwargs, ): """Initialize a cloud_export object. Parameters ---------- in_dir : str (path-like) path to data file directory filename : str provide for single file import site_filter : str, optional text filter for limiting file set filter2 : str, optional another text filter... start_date : str, optional text start date to filter on "YYYY-mm-dd" end_date : str, optional text end date to filter on "YYYY-mm-dd" client_id : str available in the NRG Cloud portal client_secret : str available in the NRG Cloud portal progress_bar : bool, default True whether to display the progress bar """ super().__init__(client_id, client_secret, url_base) self.site_filter = site_filter if "file_filter" in kwargs and site_filter == "": self.file_filter = kwargs.get("file_filter") self.site_filter = self.file_filter self.filter2 = filter2 self.in_dir = in_dir self.start_date = start_date self.end_date = end_date self.progress_bar = progress_bar self.job_ids = {} if filename: self.pad = 1 self.counter = 1 self.raw_count = 1 self.progress_bar = False self.start_time = datetime.now() self.single_file(filename) if in_dir: self.process()
[docs] def process(self): self.start_time = datetime.now() self.files = self.get_valid_files_from_in_dir() self.raw_count = len(self.files) self.pad = len(str(self.raw_count)) + 1 self.counter = 1 if len(self.files) == 0: logger.debug(f"no files to process in {self.in_dir}") raise FileNotFoundError(f"no files to process in {self.in_dir}") for filename in self.files: try: self.single_file(os.path.join(self.in_dir, filename)) self.counter += 1 if not is_authorized(self.resp): break except Exception: pass
[docs] def get_valid_files_from_in_dir(self) -> list: return [ f for f in sorted(os.listdir(self.in_dir)) if self.site_filter in f and self.filter2 in f and f.lower().endswith( tuple(["rld", "csv", "csv.zip", "diag", "statistical.dat"]) ) and string_date_check(self.start_date, self.end_date, f) ]
[docs] def single_file(self, filename: str = ""): try: if self.progress_bar: draw_progress_bar(self.counter, self.raw_count, self.start_time) else: print( "Processing {0}/{1} ... {2} ... ".format( str(self.counter).rjust(self.pad), str(self.raw_count).ljust(self.pad), os.path.basename(filename), ), end="", flush=True, ) self.encoded_filename_bytes = self.prepare_file_bytes(filename) self.encoded_filename_string = self.encoded_filename_bytes.decode("utf-8") self.maintain_session_token() self.data = { "FileBytes64BitEncoded": self.encoded_filename_string, "FileName": os.path.basename(filename), } self.resp = requests.post( json=self.data, url=self.import_url, headers=self.headers ) if self.resp.status_code == 200: if self.progress_bar is False: print("[DONE]") self.job_ids[os.path.basename(filename)] = json.loads(self.resp.text)[ "jobId" ] logger.info(f"imported {os.path.basename(filename)} OK") logger.debug(f"{self.resp.status_code} {self.resp.text}") elif self.resp.status_code == 401 or self.resp.status_code == 400: if ( "has already been imported" in json.loads(self.resp.text)["apiResponseMessage"] ): logger.info(self.resp.text) if self.progress_bar is False: print("[ALREADY IMPORTED]") else: logger.info(f"{os.path.basename(filename)}: {self.resp.text} ") if self.progress_bar is False: print("[PASSED]") pass else: logger.error(f"unable to import {os.path.basename(filename)}: FAILED") print(f"\nunable to process file: {filename}") print(f"{str(self.resp.status_code)} | {self.resp.reason}") print(self.resp.text.split(":")[1].split('"')[1]) except Exception as e: if self.progress_bar is False: print("[FAILED]") logger.error(f"unable to import {os.path.basename(filename)}: FAILED") logger.debug(e) print(f"unable to process file: {filename}")
cloud_import = CloudImport