try:
    from nrgpy import logger
except ImportError:
    # fall back to the standard library logger so module-level logging
    # calls do not raise NameError when nrgpy's logger is unavailable
    import logging

    logger = logging.getLogger(__name__)
from datetime import datetime, timedelta
from glob import glob
import os
import pandas as pd
from nrgpy.utils.utilities import (
check_platform,
windows_folder_path,
linux_folder_path,
draw_progress_bar,
string_date_check,
)
import traceback
class LogrRead:
def __init__(
self,
filename: str = "",
out_file: str = "",
text_timestamps: bool = False,
logger_local_time: bool = False,
**kwargs,
):
"""Class of Pandas dataframes created from LOGR dat file.
If a filename is passed when calling class, the file is read in alone.
Otherwise, an instance of the class is created, and the concat_txt function may
be called to combine all txt files in a directory.
Filters may be used on any part of the filename, to combine a subset of dat
files in a directory.
Parameters
----------
filename : str, optional
path to filename
out_file : str, optional
path to outputted file
text_timestamps : boolean
set to True for text timestamps
logger_local_time : boolean
(True) convert dat file UTC timestamps to logger local time
Returns
---------
ch_info : obj
pandas dataframe of ch_list (below) pulled out of file with
logr_read.arrange_ch_info()
ch_list : list
list of channel info; can be converted to json w/ import json ...
json.dumps(fut.ch_info)
data : obj
pandas dataframe of all data
head : obj
lines at the top of the txt file..., used when rebuilding timeshifted files
site_info : obj
pandas dataframe of site information
logger_sn : str
logger_type : str
latitude : float
longitude : float
elevation : int
site_description : str
start_date : str
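
        Examples
        --------
        Read a single dat file into the reader (the path below is
        illustrative):

        >>> import nrgpy
        >>> reader = nrgpy.logr_read(filename="/path/to/000110_2023-01-01.dat")
        >>> reader.data.head()  # doctest: +SKIP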
"""
        self.filename = filename
        self.text_timestamps = text_timestamps
        self.logger_local_time = logger_local_time
        self.reader_type = "LOGR"

        if out_file == "":
            # default output name, e.g. "2023-01-01_LOGR.dat"
            out_file = datetime.today().strftime("%Y-%m-%d") + "_LOGR.dat"
        self.out_file = out_file
if self.filename:
i = 0
with open(self.filename) as infile:
for line in infile:
if line == "Data\n":
break
else:
i = i + 1
with open(self.filename) as myfile:
self.head = "".join([myfile.readline() for _ in range(2)])
header_len = i + 1
read_len = header_len - 5
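            # rows between the export header and the "Data" marker hold the
            # site and channel metadata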
self.site_info = pd.read_csv(
self.filename,
skiprows=2,
sep="\t",
index_col=False,
nrows=read_len,
usecols=[0, 1],
header=None,
)
self.site_info = self.site_info.iloc[
: self.site_info.loc[self.site_info[0] == "Data"].index.tolist()[0] + 1
]
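            # measurement records follow the "Data" marker row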
self.data = pd.read_csv(
self.filename, skiprows=header_len, sep="\t", encoding="iso-8859-1"
)
if not self.text_timestamps:
self.data["Timestamp"] = pd.to_datetime(self.data["Timestamp"])
self.arrange_ch_info()
if not hasattr(self, "site_details"):
self.format_site_data()
if self.logger_local_time and not self.text_timestamps:
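                # self.time_zone is expected to be set by format_site_data()
                # above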
self.data["TimestampUTC"] = self.data["Timestamp"]
self.data["Timestamp"] = self.data["TimestampUTC"] + timedelta(
hours=int(self.time_zone)
)
            elif self.logger_local_time and self.text_timestamps:
                msg = "Cannot convert timestamps to local when text_timestamps=True"
                print(msg)
                logger.error(msg)
self.first_timestamp = self.data.iloc[0]["Timestamp"]
def __repr__(self):
return "<class {}: {} >".format(self.__class__.__name__, self.filename)
    def arrange_ch_info(self):
"""creates ch_info dataframe and ch_list array"""
array = [
"Channel:",
"Channel", # <--- fix for missing colon in dat file Channel key
"Sensor Type:",
"Description:",
"Serial Number:",
"Measurand:",
"Height:",
"Bearing:",
"Scale Factor:",
"Offset:",
"Units:",
"Vane Mounting Angle:",
"Gust Source Channel:",
]
self.array = array
self.ch_info = pd.DataFrame()
ch_data = {}
ch_list = []
ch_details = 0
        for row in self.site_info.loc[self.site_info[0].isin(array)].iterrows():
            if (
                row[1][0] in (array[0], array[1]) and ch_details == 0
            ):  # start channel data read
                ch_details = 1
                ch_data[row[1][0]] = row[1][1]
            elif (
                row[1][0] in (array[0], array[1]) and ch_details == 1
            ):  # close channel, start new data read
ch_list.append(ch_data)
ch_data = {}
ch_data[row[1][0]] = row[1][1]
            elif row[1][0] in array:
ch_data[row[1][0]] = row[1][1]
ch_list.append(ch_data) # last channel's data
ch_df = pd.DataFrame(ch_list)
self.ch_list = ch_list
self.ch_info = pd.concat(
[self.ch_info, ch_df], ignore_index=True, axis=0, join="outer"
)
# correction for calculated channel colon missing
        def return_channel_number(x):
            """temporary fix for missing colon on dat file Channel key"""
            if pd.isnull(x["Channel:"]):
                return x["Channel"]
            return x["Channel:"]

        self.ch_info["Channel:"] = self.ch_info.apply(return_channel_number, axis=1)
return self
    def concat_txt(
self,
dat_dir: str = "",
file_type: str = "statistical",
file_filter: str = "",
filter2: str = "",
start_date: str = "1970-01-01",
end_date: str = "2150-12-31",
ch_details: bool = False,
output_txt: bool = False,
out_file: str = "",
progress_bar: bool = True,
**kwargs,
):
"""Will concatenate all text files in the dat_dir
files must match the site_filter argument. Note these are both blank by default.
Parameters
----------
dat_dir : str (path-like)
directory holding txt files
file_type : str
type of export (meas, event, comm, sample, etc...)
file_filter : str
text filter for txt files, like site number, etc.
filter2 : str
secondary text filter
start_date : str
for filtering files to concat based on date "YYYY-mm-dd"
end_date : str
for filtering files to concat based on date "YYYY-mm-dd"
ch_details : bool
show additional info in ch_info dataframe
output_txt : bool
create a txt output of data df
        out_file : str
            filename to write data dataframe to if output_txt = True
progress_bar : bool
show bar on concat [True] or list of files [False]
Returns
-------
ch_info : obj
pandas dataframe of ch_list (below) pulled out of file with
logr_read.arrange_ch_info()
        ch_list : list
            list of channel info; can be converted to json with
            json.dumps(reader.ch_list)
data : obj
pandas dataframe of all data
        head : obj
            lines at the top of the dat file; used when rebuilding timeshifted
            files
site_info : obj
pandas dataframe of site information
logger_sn : str
ipack_sn : str
logger_type : str
ipack_type : str
latitude : float
longitude : float
elevation : int
site_number : str
site_description : str
start_date : str
dat_file_names : list
list of files included in concatenation
Examples
--------
Read files into nrgpy reader object
>>> import nrgpy
>>> reader = nrgpy.logr_read()
>>> reader.concat_txt(
dat_dir='/path/to/dat/files/',
file_filter='123456', # site 123456
start_date='2020-01-01',
end_date='2020-01-31',
)
Time elapsed: 2 s | 33 / 33 [=============================================] 100%
Queue processed
>>> reader.logger_sn
'511'
>>> reader.ch_info
Channel: Description: Offset: Scale Factor: Serial Number: Type: Units:
0 1 NRG S1 0.13900 0.09350 94120000059 Anemometer m/s
1 2 NRG S1 0.13900 0.09350 94120000058 Anemometer m/s
2 3 NRG S1 0.13900 0.09350 94120000057 Anemometer m/s
3 4 NRG 40C Anem 0.35000 0.76500 179500324860 Anemometer m/s
4 5 NRG 40C Anem 0.35000 0.76500 179500324859 Anemometer m/s
5 6 NRG S1 0.13900 0.09350 94120000056 Anemometer m/s
6 13 NRG 200M Vane -1.46020 147.91100 10700000125 Vane Deg
7 14 NRG 200M Vane -1.46020 147.91100 10700000124 Vane Deg
8 5 NRG T60 Temp -40.85550 44.74360 9400000705 Analog C
9 6 NRG T60 Temp 40.85550 44.74360 9400000xxx Analog C
10 7 NRG RH5X Humi 0.00000 20.00000 NaN Analog %RH
11 0 NRG BP60 Baro 95.27700 243.91400 NaN Analog hPa
12 1 NRG BP60 Baro 95.04400 244.23900 9396FT1937 Analog hPa
"""
if "site_filter" in kwargs and file_filter == "":
self.file_filter = kwargs.get("site_filter")
else:
self.file_filter = file_filter
self.ch_details = ch_details
self.start_date = start_date
self.end_date = end_date
self.filter2 = filter2
self.file_type = file_type
self.dat_file_names = []
if "txt_dir" in kwargs and not dat_dir:
dat_dir = kwargs.get("txt_dir")
if check_platform() == "win32":
self.dat_dir = windows_folder_path(dat_dir)
else:
self.dat_dir = linux_folder_path(dat_dir)
first_file = True
files = [
os.path.join(self.dat_dir, f)
for f in sorted(os.listdir(self.dat_dir))
            if f.endswith(".dat")
and self.file_filter in f
and self.filter2 in f
and self.file_type in f
and string_date_check(self.start_date, self.end_date, f)
]
self.file_count = len(files)
self.pad = len(str(self.file_count))
self.counter = 1
self.start_time = datetime.now()
self.failed_files = []
logger.info(f"Concatenating {self.file_count} files...")
for f in files:
if progress_bar:
draw_progress_bar(self.counter, self.file_count, self.start_time)
else:
print(
"Adding {0}/{1} ... {2} ... ".format(
str(self.counter).rjust(self.pad),
str(self.file_count).ljust(self.pad),
os.path.basename(f),
),
end="",
flush=True,
)
if first_file:
first_file = False
try:
base = logr_read(
f,
text_timestamps=self.text_timestamps,
logger_local_time=self.logger_local_time,
)
if not progress_bar:
print("[OK]")
self.dat_file_names.append(os.path.basename(f))
except IndexError:
print("Only standard LOGR headertypes accepted")
break
except Exception:
if not progress_bar:
print("[FAILED]")
print("could not concat {0}".format(os.path.basename(f)))
logger.error("could not concat {0}".format(os.path.basename(f)))
logger.debug(traceback.format_exc())
            else:
                try:
                    s = logr_read(
                        f,
                        ch_details=self.ch_details,
                        text_timestamps=self.text_timestamps,
                        logger_local_time=self.logger_local_time,
                        site_details=False,
                    )
base.data = pd.concat(
[base.data, s.data], ignore_index=True, axis=0, join="outer"
)
base.ch_info = pd.concat(
[base.ch_info, s.ch_info],
ignore_index=True,
axis=0,
join="outer",
)
if not progress_bar:
print("[OK]")
self.dat_file_names.append(os.path.basename(f))
except IndexError:
logger.debug(traceback.format_exc())
self.failed_files.append(f)
                except Exception:
                    logger.debug(traceback.format_exc())
                    self.failed_files.append(f)
                    if not progress_bar:
                        print("[FAILED]")
                    print("could not concat {0}".format(os.path.basename(f)))
self.counter += 1
        if out_file != "":
            self.out_file = out_file
        if output_txt:
            base.data.to_csv(
                os.path.join(self.dat_dir, self.out_file), sep=",", index=False
            )
        try:
            # take channel metadata from the first file's reader (base) so a
            # single-file concat does not raise UnboundLocalError
            self.ch_info = base.ch_info
            self.ch_list = base.ch_list
            self.array = base.array
self.data = base.data.drop_duplicates(subset=["Timestamp"], keep="first")
self.data.reset_index(drop=True, inplace=True)
base.ch_info["ch"] = base.ch_info["Channel:"].astype(int)
try:
self.ch_info = (
base.ch_info.sort_values(by=["ch"])
.drop_duplicates(
subset=[
col for col in self.array if col in base.ch_info.columns
],
ignore_index=True,
)
                    .drop(columns=["ch", "Channel"])
)
except KeyError as e:
logger.debug(e)
self.first_timestamp = base.first_timestamp
            self.head = base.head
            self.site_info = base.site_info
self.format_site_data()
print("\n")
logger.info(f"Concatenation of {len(self.data)} rows complete")
except UnboundLocalError:
print("No files match to contatenate.")
logger.error(f"No files in {self.dat_dir} match to contatenate.")
return None
if len(self.failed_files) > 0:
print(
f"{len(self.failed_files)} files unable to be concatenated. See failed_files list" # noqa: E501
)
    def output_txt_file(
self,
standard: bool = True,
shift_timestamps: bool = False,
out_file: str = "",
**kwargs,
):
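        """Writes the reader contents back out as a tab-delimited text file.

        Parameters
        ----------
        standard : bool
            when True (default), write head, site_info, and data to a single
            standard file
        shift_timestamps : bool
            when True, write a timeshifted file into out_dir, named from the
            site number and the first timestamp in the data df
        out_file : str
            filename for the standard output; when blank, self.out_file is
            used with a "_standard.txt" suffix
        out_dir : str, optional
            kwarg; destination folder for timeshifted files
        """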
out_dir = kwargs.get("out_dir", "")
if shift_timestamps:
os.makedirs(out_dir, exist_ok=True)
file_date = (
str(self.data.iloc[0]["Timestamp"])
.replace(" ", "_")
.replace(":", ".")[:-3]
)
            file_num = self.filename.split("_")[-2]
file_name = "{0}_{1}_{2}_meas.txt".format(
self.site_number, file_date, file_num
)
output_name = os.path.join(out_dir, file_name)
self.output_name = output_name
            with open(output_name, "w", encoding="utf-8") as output_file:
                output_file.write(self.head)
with open(output_name, "a", encoding="utf-8") as f:
try:
self.site_info = self.site_info.replace(
self.first_timestamp, str(self.data.iloc[0]["Timestamp"])
)
except Exception:
print(
"couldn't rename 'Effective Date:' info in {0}".format(
output_name
)
)
logger.error(
"couldn't rename 'Effective Date:' info in {0}".format(
output_name
)
)
logger.debug(traceback.format_exc())
                self.site_info.to_csv(
                    f,
                    header=False,
                    sep="\t",
                    index=False,
                    index_label=False,
                    # line_terminator was renamed to lineterminator in pandas 1.5
                    lineterminator="\n",
                )
with open(output_name, "U") as f:
text = f.read()
while "\t\n" in text:
text = text.replace("\t\n", "\n")
with open(output_name, "w") as f:
f.write(text)
with open(output_name, "a", encoding="utf-8") as f:
self.data.round(6).to_csv(
f,
header=True,
sep="\t",
index=False,
index_label=False,
line_terminator="\n",
)
output_file.close()
self.insert_blank_header_rows(output_name)
if standard:
if out_file != "":
output_name = out_file
else:
output_name = self.out_file[:-4] + "_standard.txt"
print(
"\nOutputting file: {0} ... ".format(output_name),
end="",
flush=True,
)
logger.info("\nOutputting file: {0} ... ".format(output_name))
try:
                with open(output_name, "w", encoding="utf-8") as output_file:
                    output_file.write(self.head)
# write header
                with open(output_name, "a", encoding="utf-8") as f:
                    self.site_info.to_csv(
                        f,
                        header=False,
                        sep="\t",
                        index=False,
                        index_label=False,
                        lineterminator="\n",
                    )
                # write data
                with open(output_name, "a", encoding="utf-8") as f:
                    self.data.round(6).to_csv(
                        f,
                        header=True,
                        sep="\t",
                        index=False,
                        index_label=False,
                        lineterminator="\n",
                    )
self.insert_blank_header_rows(output_name)
print("[OK]")
except Exception:
print("[FAILED]")
print(traceback.format_exc())
logger.error(f"Outputting {output_name} failed")
logger.debug(traceback.format_exc())
def shift_timestamps(
txt_folder: str = "",
out_folder: str = "",
file_filter: str = "",
start_date: str = "1970-01-01",
end_date: str = "2150-12-31",
seconds: int = 3600,
):
"""Takes as input a folder of exported standard text files and
time to shift in seconds.
Parameters
----------
txt_folder : str
path to folder with txt files to shift
out_folder : str
where to put the shifted files (in subfolder by default)
file_filter : str
filter for restricting file set
start_date : str
date filter "YYYY-mm-dd"
end_date : str
date filter "YYYY-mm-dd"
seconds : int
time in seconds to shift timestamps (default 3600)
    Returns
    -------
    None
        writes text files with shifted timestamps to out_folder; new file
        names include the shifted timestamp.
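
    Examples
    --------
    Shift a folder of exports back one hour (a sketch; the paths below are
    illustrative):

    >>> shift_timestamps(
    ...     txt_folder="/exports/logr/",
    ...     out_folder="/exports/logr/shifted/",
    ...     seconds=-3600,
    ... )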
"""
if out_folder:
out_dir = out_folder
else:
out_dir = os.path.join(txt_folder, "shifted_timestamps")
os.makedirs(out_dir, exist_ok=True)
files = [
f
        for f in sorted(glob(os.path.join(txt_folder, "*.txt")))
if file_filter in f and string_date_check(start_date, end_date, f)
]
file_count = len(files)
counter = 1
start_time = datetime.now()
for f in files:
try:
draw_progress_bar(counter, file_count, start_time)
            # glob already returns paths that include txt_folder
            fut = logr_read(filename=f)
fut.format_site_data()
fut.data["Timestamp"] = pd.to_datetime(fut.data["Timestamp"]) + timedelta(
seconds=seconds
)
fut.output_txt_file(
shift_timestamps=True, standard=False, out_dir=out_dir, out_file=f
)
except pd.errors.EmptyDataError:
pass
        except Exception:
            print(traceback.format_exc())
counter += 1
# lowercase alias retained for backwards compatibility
logr_read = LogrRead
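

if __name__ == "__main__":
    # Minimal usage sketch (the paths and filters below are illustrative):
    # combine a month of LOGR exports, then write one standard text file.
    reader = logr_read()
    reader.concat_txt(
        dat_dir="/path/to/dat/files/",
        file_filter="000110",
        start_date="2023-01-01",
        end_date="2023-01-31",
    )
    reader.output_txt_file(standard=True, out_file="000110_2023-01_LOGR.txt")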