This post documents the workflow for processing EBAS-Thredds data, using ozone from all sites as an example.
Starting from the files downloaded in the previous post on EBAS observation data, the goal is to obtain a two-dimensional (datetime, site) dataset stored in netCDF format, together with each site's location information, so that later processing is straightforward.
Step 1: 1-renderSI.sh
#!/bin/bash
# !! 1-renderSI.sh

# >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> functions
calDaysDiff()
{
    # difference in days between two YYYYMMDDHH.. timestamps ($1 - $2)
    tsp1=`date -d "${1:0:4}-${1:4:2}-${1:6:2} ${1:8:2}:00:00" +%s`
    tsp2=`date -d "${2:0:4}-${2:4:2}-${2:6:2} ${2:8:2}:00:00" +%s`
    (( diff = (tsp1 - tsp2) / 86400 ))
    echo ${diff}
}

# >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> set up
dataLoc=/mnt/d/recRoot/DataVault/obs-EBAS/ozone  # data location
outFile=si.csv
daysTS=100  # days threshold: if the gap between two pieces of one station exceeds this, print a warning (or exit, if the exit line is uncommented)

ncfiles=($dataLoc/*nc)
declare -A SIs

# >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> core logic, loop over files
for ncf in "${ncfiles[@]}"
do
    ncf_bname=`basename $ncf`
    echo "ncf_bname = $ncf_bname"
    code=`echo $ncf_bname | cut -d. -f1`
    lat=$(ncdump $ncf -h | grep -Po 'on latitude.*?\d*?\.\d+\\' | grep -Po '(?<=")-*\d*\.\d*')
    lon=$(ncdump $ncf -h | grep -Po 'on longitude.*?\d*?\.\d+\\' | grep -Po '(?<=")-*\d*\.\d*')
    alt=$(ncdump $ncf -h | grep -Po 'on altitude.*?(\d*?\.\d+) *[a-zA-Z]+' | grep -Po '(?<=")-?\d.*')
    # echo $lat
    alt=${alt// /}

    export INFILE=$ncf
    nclRst=`ncl -Qn ../lib/get_time_range.ncl`
    startTime=`echo $nclRst | cut -d, -f1`
    endTime=`echo $nclRst | cut -d, -f2`

    if [[ -z ${SIs[$code]+x} ]]; then
        # first time this station code is encountered
        vres="$code,$lat,$lon,$alt,$startTime,$endTime"
        SIs[$code]=$vres
    else
        # station code already seen; assume lat, lon, alt do not change!!
        vres_arr=(${SIs[$code]//,/ })
        startTime_before=${vres_arr[4]}
        endTime_before=${vres_arr[5]}
        # echo "ETB = $endTime_before"
        if [[ $startTime -lt $startTime_before ]]; then
            daysDiff=`calDaysDiff $startTime_before $endTime`
            if [[ $daysDiff -gt $daysTS ]]; then
                echo "Duration split! in $ncf, as $endTime, $startTime_before"
                echo "daysDiff = $daysDiff"
                echo "startTime_before = $startTime_before"
                echo "endTime = $endTime"
                # exit 1
            fi
            vres_arr[4]=$startTime
        fi
        if [[ $endTime -gt $endTime_before ]]; then
            daysDiff=`calDaysDiff $startTime $endTime_before`
            if [[ $daysDiff -gt $daysTS ]]; then
                echo "Duration split! in $ncf"
                echo "daysDiff = $daysDiff"
                echo "startTime = $startTime"
                echo "endTime_before = $endTime_before"
                # exit 1
            fi
            vres_arr[5]=$endTime
        fi
        vres=$(IFS=, ; echo "${vres_arr[*]}")
        SIs[$code]="$vres"
    fi
    echo $vres
    echo
    # break
done

# >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> output, save as csv
echo 'code,lat,lon,alt,startTime,endTime' > $outFile
for vres in ${SIs[@]}
do
    echo $vres >> $outFile
done
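The non-trivial part of this script is the loop that merges several files belonging to one station code into a single (startTime, endTime) span, warning when the pieces are more than daysTS days apart. For readers who prefer Python, here is a minimal sketch of the same merge-and-gap-check idea; the station code and timestamps are made up for illustration only.

# Minimal Python sketch of the per-station merge done in 1-renderSI.sh (illustrative inputs only)
from datetime import datetime

DAYS_TS = 100  # warn if two pieces of one station are more than this many days apart

def days_diff(t1, t2, fmt="%Y%m%d%H%M%S"):
    """Days between two YYYYMMDDHHMMSS strings (t1 - t2)."""
    return (datetime.strptime(t1, fmt) - datetime.strptime(t2, fmt)).days

spans = {}  # code -> [startTime, endTime]
records = [  # (code, startTime, endTime) as extracted per file; made-up values
    ("AT0050R", "20130101000000", "20151231230000"),
    ("AT0050R", "20160101000000", "20211231230000"),
]
for code, start, end in records:
    if code not in spans:
        spans[code] = [start, end]
        continue
    s0, e0 = spans[code]
    if start < s0:                       # this file starts earlier than what we have
        if days_diff(s0, end) > DAYS_TS:
            print(f"Duration split for {code}: {end} .. {s0}")
        spans[code][0] = start
    if end > e0:                         # this file ends later than what we have
        if days_diff(start, e0) > DAYS_TS:
            print(f"Duration split for {code}: {e0} .. {start}")
        spans[code][1] = end

print(spans)  # {'AT0050R': ['20130101000000', '20211231230000']}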
Running 1-renderSI.sh produces a CSV file like the one below. The key information is each site's latitude, longitude and altitude; the start and end times are only for reference:
code,lat,lon,alt,startTime,endTime
AT0050R,47.066944444,15.493611111,481.0m,20130101000000,20211231230000
FI0096G,67.973333333,24.116111111,565.0m,19950101000000,20211231210000
... ...
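Note that the altitude column carries a trailing unit ("481.0m" in the sample above). If it is needed later, a small pandas sketch for loading si.csv and converting the altitude to a float in metres could look like this (the column name alt_m is just an illustrative choice):

# Load si.csv and turn the "481.0m"-style altitude strings into floats (metres)
import pandas as pd

df_si = pd.read_csv("si.csv")
df_si["alt_m"] = df_si["alt"].str.rstrip("m").astype(float)
print(df_si[["code", "lat", "lon", "alt_m"]].head())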
The required NCL helper script, get_time_range.ncl:
; this script is used to get the start and end time of an EBAS netcdf file
; by zjx @2022-10-13
; begin
infile = getenv("INFILE")
f = addfile(infile, "r")
time = cd_calendar(f->time, -3)
res = "" + time(0) + "0000," + time(dimsizes(time) - 1) + "0000" ; assume the time is continuous and neat!!
print("" + res)
; end
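If NCL is not available, the same two timestamps could also be obtained with Python's netCDF4 package. Below is a minimal sketch of a hypothetical get_time_range.py; it assumes a CF-style time variable with units and calendar attributes, reads the file name from the INFILE environment variable set by 1-renderSI.sh, and truncates to the hour like the NCL version.

# get_time_range.py -- sketch of a Python equivalent of get_time_range.ncl
import os
import netCDF4 as nc4

ncd = nc4.Dataset(os.environ["INFILE"], "r")
ncv_time = ncd.variables["time"]
dates = nc4.num2date(ncv_time[:], units=ncv_time.units,
                     calendar=getattr(ncv_time, "calendar", "standard"))
# print "startTime,endTime" in the YYYYMMDDHHMMSS form expected by 1-renderSI.sh
print(dates[0].strftime("%Y%m%d%H") + "0000," + dates[-1].strftime("%Y%m%d%H") + "0000")
ncd.close()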
Step 2: 2-extractNC.py
The required helper library, libEBAS.py:
# coding=utf-8

#**********************************************************************
# this function is used to log with time
#**********************************************************************
def logT(s, echoL = 0, currL = 0, mpi = False):  # from py_rdee
    import time
    if currL < echoL:
        return
    if mpi:
        from mpi4py import MPI
        print("{} - {}, rank = {}".format(time.strftime("%Y/%m/%d %H:%M:%S"), s, MPI.COMM_WORLD.Get_rank()))
    else:
        print("{} - {}".format(time.strftime("%Y/%m/%d %H:%M:%S"), s))


def render_ymdh_series(ymdh1, ymdh2):
    import datetime
    # from dateutil.relativedelta import relativedelta  # datetime.timedelta doesn't support "months=1"
    ymdh_format = "%Y%m%d%H"
    ymdh1_py = datetime.datetime.strptime(ymdh1, ymdh_format)
    ymdh2_py = datetime.datetime.strptime(ymdh2, ymdh_format)
    delta = datetime.timedelta(hours=1)
    res = []
    while ymdh1_py <= ymdh2_py:
        res.append(ymdh1_py.strftime(ymdh_format))
        ymdh1_py += delta
    return res


#**********************************************************************
# this function is a "map" of function np.argwhere
#**********************************************************************
def ind_eq_map(arrP, arrC, **kwargs):
    import numpy as np
    import sys

    allowMissing = False
    allowRepeat = False
    autoSort = True
    if 'allowMissing' in kwargs:
        allowMissing = kwargs['allowMissing']
    if 'allowRepeat' in kwargs:
        allowRepeat = kwargs['allowRepeat']
    if 'autoSort' in kwargs:
        autoSort = kwargs['autoSort']

    arrP_np = np.array(arrP)
    res = []
    for e in arrC:
        poss = np.argwhere(arrP_np == e).reshape(-1)
        if poss.size == 0:
            if not allowMissing:
                print(f"Error in function<ind_eq_map> cannot find value {e}!")
                sys.exit(1)
        elif poss.size > 1:
            if allowRepeat:
                res.extend(poss)  # it's ok for a list extending an nd-array
            else:
                print(f"Error in function<ind_eq_map> : value {e} repeats! set allowRepeat if it is ok!")
                sys.exit(1)
        else:
            res.extend(poss)
    if autoSort:
        res.sort()
    return res


#**********************************************************************
# this function is used to get indexes for intersection from 2 arrays
#**********************************************************************
def getIntersectInd(arr1, arr2):
    import numpy as np
    arrI = np.intersect1d(arr1, arr2)
    return ind_eq_map(arr1, arrI, allowMissing = True), ind_eq_map(arr2, arrI, allowMissing = True)
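A quick illustration of how getIntersectInd is used below: for each value shared by the two arrays, it returns the value's position in the first array and in the second array. The numbers here are made up.

# Toy demonstration of getIntersectInd (values are made up)
import numpy as np
import libEBAS

dateI = np.array([2015010122, 2015010123, 2015010200, 2015010201])  # hours found in one EBAS file
ymdhs = np.array([2015010200, 2015010201, 2015010202])              # hours of the target year axis

idx_file, idx_year = libEBAS.getIntersectInd(dateI, ymdhs)
print(idx_file)  # positions of the shared hours in dateI  -> 2, 3
print(idx_year)  # positions of the same hours in ymdhs    -> 0, 1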
The core script, 2-extractNC.py:
# coding: utf-8
# !! 2-extractNC.py
import netCDF4 as nc4
import pandas as pd
import numpy as np
import calendar
import glob
import sys
sys.path.append('../lib')
import libEBAS

dataDir = r'D:\recRoot\DataVault\obs-EBAS\ozone'
vns_all = ['ozone_ug_per_m3_amean', 'ozone_ug_per_m3', 'ozone_amean', 'ozone']  # candidate ozone variable names, checked in order
# startYear = os.getenv("YEAR", 2015)

df_si = pd.read_csv('si.csv')
siteCodes = df_si.code.values
lats = df_si.lat.values
lons = df_si.lon.values

np_func = np.vectorize(lambda x: int(x.strftime("%Y%m%d%H")))  # datetime -> YYYYMMDDHH integer

# siteCodes = np.array(['FR0025R'])
# siteCodes = np.array(['CZ0003R'])
# siteCodes = siteCodes[:10]


def handle1y(year):
    ndays = 366 if calendar.isleap(year) else 365
    nhours = ndays * 24
    ymdhs = libEBAS.render_ymdh_series(f'{year}010100', f'{year}123123')
    ymdhs = np.array(ymdhs, dtype=np.int32)

    O3 = np.full((nhours, siteCodes.size), np.nan)
    for i, sc in enumerate(siteCodes):
        libEBAS.logT(f"process {sc}")
        ncfiles = glob.glob(dataDir + f"\{sc}*.nc")
        for ncf in ncfiles:
            # libEBAS.logT(f"  resolving {ncf}")
            ncd = nc4.Dataset(ncf, "r")
            ncv_times = ncd.variables['time']
            dateF = nc4.num2date(ncv_times[:].data, units=ncv_times.getncattr('units'), calendar=ncv_times.getncattr('calendar'))
            dateI = np_func(dateF)
            # map this file's hours onto the target year's hourly axis
            index1, index2 = libEBAS.getIntersectInd(dateI, ymdhs)
            if index1 == []:
                continue
            # use the first candidate ozone variable present in this file
            for vn in vns_all:
                if vn in ncd.variables:
                    break
            ncv_o3 = ncd.variables[vn]
            if len(ncv_o3.shape) == 2:
                O3[index2, i] = ncv_o3[0, index1].data
            else:
                O3[index2, i] = ncv_o3[index1].data

    # write out the (ymdh, site) array together with the site coordinates
    ncd_out = nc4.Dataset(f"EBAS.ozone.{year}.nc", "w")
    ncd_out.createDimension("ymdh", nhours)
    ncd_out.createDimension("site", siteCodes.size)
    ncv_ymdh = ncd_out.createVariable("ymdh", int, ("ymdh"))
    ncv_site = ncd_out.createVariable("site", str, ("site"))
    ncv_lat = ncd_out.createVariable("lat", np.float32, ("site"))
    ncv_lon = ncd_out.createVariable("lon", np.float32, ("site"))
    ncv_ymdh[:] = ymdhs
    ncv_site[:] = siteCodes
    ncv_lat[:] = lats
    ncv_lon[:] = lons
    ncv_o3 = ncd_out.createVariable("ozone", np.float32, ("ymdh", "site"))
    ncv_o3[:] = O3
    ncv_o3.setncattr("units", "ug/m3")
    ncd_out.close()


if __name__ == '__main__':
    libEBAS.logT("script start")
    for y in (2015, 2016):
        libEBAS.logT(f"gonna handle {y}")
        handle1y(y)
This produces netCDF files like the following, one per year:
netcdf EBAS.ozone.2015 {
dimensions:
        ymdh = 8760 ;
        site = 286 ;
variables:
        int ymdh(ymdh) ;
        string site(site) ;
        float lat(site) ;
        float lon(site) ;
        float ozone(ymdh, site) ;
                ozone:units = "ug/m3" ;
}
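With this layout, later analyses only need to slice the (ymdh, site) array. A minimal sketch of reading one year's file back and computing a per-site mean (the file name matches the example output above, and the site index 0 is just an arbitrary choice):

# Read the merged file and pull out one site's hourly series
import numpy as np
import netCDF4 as nc4

ncd = nc4.Dataset("EBAS.ozone.2015.nc", "r")
sites = ncd.variables["site"][:]                       # station codes, e.g. "AT0050R"
o3 = np.ma.filled(ncd.variables["ozone"][:], np.nan)   # (ymdh, site) array, ug/m3, NaN = missing

i = 0                                                  # pick the first site as an example
print(sites[i], np.nanmean(o3[:, i]))                  # annual mean ozone at that site
ncd.close()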