Source code for nrgpy.read.spidar_txt

from datetime import datetime
from glob import glob
import os
from nrgpy.utils.utilities import (
    check_platform,
    windows_folder_path,
    linux_folder_path,
    string_date_check,
    draw_progress_bar,
)
import pandas as pd


class SpidarRead:
    """Read Spidar remote-sensor CSV/ZIP file(s) into a pandas dataframe.

    Parameters
    ----------
    filename : str
        path to a single CSV or ZIP file to read on construction
        (optional; leave blank and use :meth:`concat_txt` for a folder)

    Attributes
    ----------
    data : pandas.DataFrame
        all available data, one row per timestamp
    heights : list of int
        measurement heights detected from the column names
    serial_number : str
        unit serial number, taken from the file name

    Examples
    --------
    Read a Spidar data file into an object:

    >>> import nrgpy
    >>> reader = nrgpy.spidar_data_read(
    ...     filename="1922AG0070_CAG70-SPPP-LPPP_PENT_AVGWND_2019-07-04_1.zip"
    ... )
    >>> reader.heights
    [40, 60, 80, 90, 100, 120, 130, 160, 180, 200]

    Read a directory of Spidar data files into an object:

    >>> reader = nrgpy.spidar_data_read()
    >>> reader.concat_txt(
    ...     txt_dir="/path/to/spidardata/",
    ...     file_filter="2020-01",
    ...     progress_bar=False,
    ... )
    >>> reader.serial_number
    '1922AG0070'
    """

    def __init__(self, filename=""):
        self.filename = filename
        self.reader_type = "SpidarV1"

        # Reading on construction is optional; concat_txt() may be used instead.
        if self.filename:
            self.read_file(self.filename)

    def __repr__(self):
        return "<class {}: {} >".format(self.__class__.__name__, self.filename)

    def read_file(self, f):
        """Read a single CSV or ZIP export into ``self.data``.

        Files are first read as UTF-16-LE, falling back to UTF-8 when
        decoding fails.

        Parameters
        ----------
        f : str
            path of file to read
        """
        try:
            self.data = pd.read_csv(
                f, encoding="UTF_16_LE", parse_dates=True, index_col=[0]
            )
        except UnicodeDecodeError:
            self.data = pd.read_csv(
                f, encoding="UTF_8", parse_dates=True, index_col=[0]
            )

        # Keep the timestamp as a regular column rather than the index.
        self.data.reset_index(drop=False, inplace=True)
        self.columns = self.data.columns
        self.get_heights()
        # File names are "<serial>_<...>"; the serial number is the first field.
        self.serial_number = os.path.basename(f).split("_")[0]

    def concat_txt(
        self,
        txt_dir="",
        output_txt=False,
        out_file="",
        file_filter="",
        file_filter2="",
        start_date="1970-01-01",
        end_date="2150-12-31",
        progress_bar=True,
    ):
        """Concatenate files in a folder into a single dataframe.

        Parameters
        ----------
        txt_dir : str (path-like)
            path to csv or csv.zip files
        output_txt : bool
            export concatenated data as a text file
        out_file : str
            optional, filename of text export
        file_filter : str
            substring a file name must contain to be included
        file_filter2 : str
            second substring a file name must contain to be included
        start_date : str
            yyyy-mm-dd formatted string
        end_date : str
            yyyy-mm-dd formatted string
        progress_bar : bool
            show progress bar instead of listing each file being concatenated

        Returns
        -------
        None
            adds ``data`` dataframe to the reader object
        """
        self.txt_dir = txt_dir
        self.output_txt = output_txt
        self.out_file = out_file
        self.file_filter = file_filter
        self.file_filter2 = file_filter2
        self.start_date = start_date
        self.end_date = end_date

        if check_platform() == "win32":
            self.txt_dir = windows_folder_path(txt_dir)
        else:
            self.txt_dir = linux_folder_path(txt_dir)

        files = [
            f
            for f in sorted(glob(self.txt_dir + "*"))
            if self.file_filter in f
            and self.file_filter2 in f
            and string_date_check(self.start_date, self.end_date, f)
        ]

        self.file_count = len(files)
        self.pad = len(str(self.file_count))
        self.counter = 1
        self.start_time = datetime.now()

        base = None
        frames = []  # per-file dataframes; concatenated once after the loop

        for f in files:
            if progress_bar:
                draw_progress_bar(self.counter, self.file_count, self.start_time)
            else:
                print(
                    "Adding {0}/{1} ... {2} ".format(
                        str(self.counter).rjust(self.pad),
                        str(self.file_count).ljust(self.pad),
                        f,
                    ),
                    end="",
                    flush=True,
                )

            if base is None:
                # First readable file establishes heights and serial number.
                try:
                    base = SpidarRead(f)
                    frames.append(base.data)
                    if not progress_bar:
                        print("[OK]")
                except IndexError:
                    print("Only standard Spidar headertypes accepted")
                    break
            else:
                # Best-effort: report an unreadable file and keep going.
                try:
                    frames.append(SpidarRead(f).data)
                    if not progress_bar:
                        print("[OK]")
                except Exception as e:
                    if not progress_bar:
                        print("[FAILED]")
                        print("could not concat {0}".format(f))
                    print(e)

            self.counter += 1

        if out_file != "":
            self.out_file = out_file

        try:
            # DataFrame.append was removed in pandas 2.0; build the combined
            # frame with a single pd.concat over the collected pieces.
            base.data = pd.concat(frames, sort=False)

            if output_txt:
                base.data.to_csv(txt_dir + out_file, sep=",", index=False)

            self.base = base
            self.heights = base.heights
            self.serial_number = base.serial_number
            self.data = base.data.drop_duplicates(subset=["Timestamp"], keep="first")
            self.data.reset_index(drop=True, inplace=True)
        except Exception as e:
            print("No files match to concatenate.")
            print(e)

        return None

    def get_heights(self):
        """Populate ``self.heights`` from the column names.

        A measurement height is the second ``_``-separated field of each
        column whose name contains both ``horz_mean`` and ``m/s``.
        """
        self.heights = [
            int(col.split("_")[1])
            for col in self.columns
            if "horz_mean" in col and "m/s" in col
        ]
spidar_data_read = SpidarRead