"""Source code for nrgpy.cloud_api.jobs."""

try:
    from nrgpy import logger
except ImportError:
    pass
from datetime import datetime
import json
import os
import sys
import time
import traceback
import zipfile
from typing import Optional

import requests

from nrgpy.utils.utilities import (
    draw_progress_bar,
)
from .auth import cloud_url_base, is_authorized
from .export import cloud_export


class CloudExportJob(cloud_export):
    """Use the NRG hosted web-based API to download data as an export job.

    To sign up for the service, go to https://cloud.nrgsystems.com

    Note that the site must exist in the NRG Cloud platform, and you must
    have Contributor or Administrator level access to the site to use
    these features.

    Use for creating exports of any size. Extends the nrgpy.cloud_export
    class.

    Parameters
    ----------
    out_dir : str (path-like)
        path to save exported data
    site_id : int
        NRG Cloud site identifier (NOT the site number)
    site_number : int
        site number
    logger_sn : str or int
        serial number of data logger (like, 820612345)
    start_date : str
        "YYYY-MM-DD HH:MM:SS" format; if just a date it will return the
        whole day. times are in logger local time
    end_date : str
        "YYYY-MM-DD HH:MM:SS" format; if just a date it will return the
        whole day. times are in logger local time
    client_id : str
        available in the NRG Cloud portal
    client_secret : str
        available in the NRG Cloud portal
    nec_file : str, optional
        path to NEC file for custom export formatting
    export_type : str
        ['measurements'], 'diagnostic', 'events', 'communication'
    file_format : str
        ['txt'], 'rld', 'zx' - whether tab-delimited text or binary output
    interval : str, optional
        'oneMinute', 'twoMinute', 'fiveMinute', 'tenMinute',
        'fifteenMinute', 'thirtyMinute', 'Hour', 'Day'; must be a multiple
        of the logger's statistical interval. if not specified, will use
        "native" interval (usually oneMinute for solar, tenMinute for wind)
    concatenate : bool (True)
        set to False to return original CSV files in export (ZX only)
    unzip : bool (True)
        whether to extract the .txt data file from the .zip file

    Returns
    -------
    object
        export object, including API response

    Examples
    --------
    Download one year of data with an NEC file applied, and read data

    >>> import nrgpy
    >>>
    >>> client_id = "go to https://cloud.nrgsystems.com for access"
    >>> client_secret = "go to https://cloud.nrgsystems.com for access"
    >>> save_dir = '/path/to/exported/data'
    >>>
    >>> exporter = nrgpy.CloudExportJob(
            client_id=client_id,
            client_secret=client_secret,
            out_dir=save_dir,
            nec_file='12vbat.nec',
            site_number=3456,
            start_date="2021-04-01",
            end_date="2022-03-31",
            file_format="singleFile",  # use "multipleFiles" for RLD export
            unzip=True,
        )
    >>>
    >>> exporter.create_export_job()
    >>> exporter.monitor_export_job(download=True)
    >>> # read resulting files
    >>> reader = nrgpy.sympro_txt_read(exporter.export_filepath)
    >>> reader.format_site_data()
    >>> if reader:
    >>>     print(f"Site number : {reader.site_number}")
    >>>     print(f"Site description : {reader.site_description}")
    >>>     reader.interval_check = nrgpy.check_intervals(reader.data)
    >>> else:
    >>>     print("unable to get reader")

    Download 15 days of data files from ZX 300 lidar site

    >>> import nrgpy
    >>>
    >>> client_id = "go to https://cloud.nrgsystems.com for access"
    >>> client_secret = "go to https://cloud.nrgsystems.com for access"
    >>> save_dir = '/path/to/exported/data'
    >>>
    >>> exporter = nrgpy.CloudExportJob(
            client_id=client_id,
            client_secret=client_secret,
            out_dir=save_dir,
            site_number=567,
            start_date="2022-09-01",
            end_date="2022-09-15",
            file_format="zx",
            concatenate=True,
            unzip=True,
        )
    >>>
    >>> exporter.create_export_job()
    >>> exporter.monitor_export_job(download=True)
    """

    def __init__(
        self,
        out_dir: str = "",
        site_id: str = "",
        site_number: str = "",
        logger_sn: str = "",
        start_date: str = "2014-01-01",
        end_date: str = "2023-12-31",
        file_format: str = "txt",
        client_id: str = "",
        client_secret: str = "",
        url_base: str = cloud_url_base,
        nec_file: str = "",
        export_type: str = "measurements",
        concatenate: bool = True,
        interval: str = "",
        unzip: bool = True,
        **kwargs,
    ):
        # All state is held by the parent cloud_export class; this subclass
        # only adds the create/monitor/download job workflow. **kwargs is
        # accepted (and ignored) for backward compatibility with callers
        # passing extra keywords.
        super().__init__(
            client_id=client_id,
            client_secret=client_secret,
            url_base=url_base,
            out_dir=out_dir,
            site_id=site_id,
            site_number=site_number,
            logger_sn=logger_sn,
            start_date=start_date,
            end_date=end_date,
            file_format=file_format,
            nec_file=nec_file,
            export_type=export_type,
            interval=interval,
            concatenate=concatenate,
            unzip=unzip,
        )
[docs] def create_export_job(self) -> None: """Create export job""" self.prepare_post_data() self.request_time = datetime.now() self.export_request_time = datetime.now() logger.debug(f"creating export job for site {self.site_id}") try: self.resp = requests.post( json=self.data, url=self.create_export_job_url, headers=self.headers ) except Exception: logger.debug(f"{traceback.format_exc()}") return False self.request_duration = datetime.now() - self.request_time if not is_authorized(self.resp): return False try: self.json_response = json.loads(self.resp.content) self.job_id = self.json_response["jobId"] logger.info(f"created export job {self.job_id} for site {self.site_id}") print(f"created export job {self.job_id} for site {self.site_id}") except Exception: logger.error( f"unable to create export job for {self.site_id}, {self.start_date}, {self.end_date}" #noqa E501 ) logger.debug(traceback.format_exc()) return False
[docs] def check_export_job(self) -> None: """Checks the status of an export job Uses self.job_id as reference """ self.request_time = datetime.now() self.resp = requests.get( url=f"{self.export_job_url}?jobId={self.job_id}&download=false", headers=self.headers, ) if self.resp.status_code == 200: self.json_response = json.loads(self.resp.content) self.status = self.json_response["status"] elif not is_authorized(self.resp): return False else: logger.error(f"could not get status for {self.job_id}") logger.debug(f"{self.resp.status_code} | {self.resp.reason}") logger.debug(self.resp.text.split(":")[1].split('"')[1]) print(str(self.resp.status_code) + " | " + self.resp.reason) print(self.resp.text.split(":")[1].split('"')[1]) return False
    def monitor_export_job(self, download: bool = False) -> None:
        """Poll the status of an export job until it completes or errors.

        Uses ``self.job_id`` as reference. Polls roughly twice per second,
        writing a single-line status display to stdout. Interruptible with
        Ctrl-C (KeyboardInterrupt), which stops monitoring without raising.

        Parameters
        ----------
        download : bool
            when True, call ``download_export()`` once the job reports
            "completed" status
        """
        stop = False
        start = datetime.now()
        # i counts poll iterations; not otherwise used in the visible code
        i = 0

        try:
            while not stop:
                i += 1
                # Refreshes self.json_response / self.status on success.
                # NOTE(review): if check_export_job fails it returns False
                # without updating self.json_response, so the loop re-reads
                # the previous status -- confirm this is the intended
                # retry-on-transient-error behavior.
                self.check_export_job()

                if (
                    self.json_response["status"].lower() == "error"
                    or self.json_response["status"].lower() == "completed"
                ):
                    stop = True

                # "\r" rewinds to the line start so the status line updates
                # in place rather than scrolling.
                sys.stdout.write("\r")
                sys.stdout.write(
                    f"job ID {self.job_id} | time elapsed: {(datetime.now() - start).seconds} s | status: {self.json_response['status'].lower()} "  # noqa E501
                )
                time.sleep(0.5)

            if download:
                if self.json_response["status"].lower() == "completed":
                    print(f"\nDownloading export job {self.job_id}")
                    logger.info(f"\nDownloading export job {self.job_id}")
                    self.download_export()
                else:
                    # Job ended in "error" state; nothing to download.
                    print("Unable to download export job")
                    logger.error("Unable to download export job")
        except KeyboardInterrupt:
            print("Keyboard Interrupt detected: stopping monitoring")
            logger.info("Keyboard Interrupt detected: stopping monitoring")
[docs] def download_export(self) -> None: self.request_time = datetime.now() self.resp = requests.get( url=f"{self.export_job_url}?jobId={self.job_id}&download=true", headers=self.headers, stream=True, ) if self.resp.status_code == 200: total = int(self.resp.headers.get("content-length", 0)) dl = 0 with open(self.filepath, "wb") as f: for chunk in self.resp.iter_content(chunk_size=10248): if chunk: f.write(chunk) dl += int(len(chunk)) draw_progress_bar( int(dl / 1024), int(total / 1024), self.request_time, label="kB" ) self.request_duration = datetime.now() - self.request_time if self.unzip: with zipfile.ZipFile(self.filepath, "r") as z: self.export_filename = z.namelist()[0] z.extractall(self.out_dir) os.remove(self.filepath) self.export_filepath = os.path.normpath( os.path.join(self.out_dir, self.export_filename) ) else: self.export_filepath = os.path.normpath(self.filepath) self.export_filename = self.zip_file logger.info(f"job_id {self.job_id} for site_id {self.site_id}") logger.info( f"export took {self.request_duration} for {os.path.getsize(self.export_filepath)} bytes" # noqa E501 ) elif not is_authorized(self.resp): return False else: logger.error(f"could not get status for {self.job_id}") logger.debug(f"{self.resp.status_code} | {self.resp.reason}") logger.debug(self.resp.text.split(":")[1].split('"')[1]) print(str(self.resp.status_code) + " | " + self.resp.reason) print(self.resp.text.split(":")[1].split('"')[1]) return False
# Lowercase alias for CloudExportJob -- presumably retained for
# backward compatibility with older callers; verify before removing.
export_job = CloudExportJob