# Copyright 2013-2021 The Salish Sea MEOPAR contributors
# and The University of British Columbia
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A library of Python functions for working with netCDF files.
Included are functions for:
* exploring the attributes of netCDF files
* managing those attributes during netCDF file creation and updating
* obtaining variable values from netCDF files
* creation of special purpose netCDF files
* combining per-processor sub-domain output files fron NEMO into a single
netCDF file
"""
from __future__ import division
from collections import namedtuple, OrderedDict
from datetime import datetime, timedelta
from resource import getrlimit, RLIMIT_NOFILE
import os
import arrow
import netCDF4 as nc
import numpy as np
import warnings
from salishsea_tools import hg_commands as hg
[docs]
def get_hindcast_prefix(date, res='h', version='201905'):
"""Construct hindcast results prefix given the date, resolution and version
e.g., /results/SalishSea/nowcast-green.201905/ddmmmyy/SalishSea_1h_YYYYMMDD_YYYYMMDD
:arg date: date of hindcast record
:type date: :py:class:`datetime.datetime`
:arg res: time resolution (h=hourly, d=daily)
:type res: str
:arg version: hindcast version (e.g., 201905)
:type version: str
:returns: hindcast prefix
:rtype: str
"""
# Make NEMO hindcast path
path, datestr = f'SalishSea/nowcast-green.{version}', date.strftime('%d%b%y').lower()
for root in ['/results', '/results2']:
testpath = os.path.join(root, path, datestr)
if os.path.exists(testpath):
path = testpath
break
else:
raise ValueError(f"No hindcast {version} record found for the specified date {date.strftime('%Y-%b-%d')}")
prefix = os.path.join(path, f"SalishSea_1{res}_{date.strftime('%Y%m%d_%Y%m%d')}")
return prefix
[docs]
def get_GEM_path(date):
"""Construct GEM results path given the date
e.g., /results/forcing/atmospheric/GEM2.5/operational/ops_yYYYYmMMdDD.nc
:arg date: date of GEM record
:type date: :py:class:`datetime.datetime`
:returns: GEM path
:rtype: str
"""
# Make GEM path
path, datestr = '/results/forcing/atmospheric/GEM2.5', date.strftime('y%Ym%md%d')
for config, prefix in zip(['operational', 'gemlam'], ['ops', 'gemlam']):
testpath = os.path.join(path, config, f'{prefix}_{datestr}.nc')
if os.path.exists(testpath):
path = testpath
break
else:
raise ValueError(f"No GEM2.5 record found for the specified date {date.strftime('%Y-%b-%d')}")
return path
[docs]
def get_WW3_path(date):
"""Construct WW3 results path given the date
e.g., /opp/wwatch3/nowcast/SoG_ww3_fields_YYYYMMDD_YYYYMMDD.nc
:arg date: date of WW3 record
:type date: :py:class:`datetime.datetime`
:returns: WW3 path
:rtype: str
"""
# Make WW3 path
path = '/opp/wwatch3/nowcast'
datestr = [date.strftime(fmt) for fmt in ('%d%b%y', '%Y%m%d_%Y%m%d')]
path = os.path.join(path, datestr[0].lower(), f'SoG_ww3_fields_{datestr[1]}.nc')
if not os.path.exists(path):
raise ValueError(f"No WW3 record found for the specified date {date.strftime('%Y-%b-%d')}")
return path
[docs]
def dataset_from_path(path, *args, **kwargs):
"""Return a dataset constructed from the file given by :kbd:`path`.
This is a shim that facilitates constructing a
:py:class:`netCDF4.Dataset` instance from a file that is identified
by path/filename that is either a string or a :py:class:`pathlib.Path`
instance.
The :py:class:`netCDF4.Dataset` constructor cannot handle
:py:class:`pathlib.Path` instances.
:arg path: Path/filename to construct the dataset from.
:type path: :py:class:`pathlib.Path` or :py:class`str`
:arg list args: Positional arguments to pass through to the
:py:class:`netCDF4.Dataset` constructor.
:arg dict kwargs: Keyword arguments to pass through to the
:py:class:`netCDF4.Dataset` constructor.
:returns: Dataset from file at :kbd:`path`.
:rtype: :py:class:`netCDF4.Dataset`
:raises: :py:exc:`IOError` if there the :py:class:`netCDF4.Dataset`
constructor raises a :py:exc:`RuntimeError` that indicates
that the file or directory at path is not found.
"""
try:
return nc.Dataset(str(path), *args, **kwargs)
except RuntimeError as e:
if str(e) == 'No such file or directory':
raise IOError('No such file or directory')
else:
raise
[docs]
def show_dataset_attrs(dataset):
"""Print the global attributes of the netCDF dataset.
:arg dataset: netcdf dataset object
:type dataset: :py:class:`netCDF4.Dataset`
"""
print('file format: {}'.format(dataset.file_format))
for attr in dataset.ncattrs():
print('{}: {}'.format(attr, dataset.getncattr(attr)))
[docs]
def show_dimensions(dataset):
"""Print the dimensions of the netCDF dataset.
:arg dataset: netcdf dataset object
:type dataset: :py:class:`netCDF4.Dataset`
"""
for dim in dataset.dimensions.values():
print(dim)
[docs]
def show_variables(dataset):
"""Print the variable names in the netCDF dataset as a list.
:arg dataset: netcdf dataset object
:type dataset: :py:class:`netCDF4.Dataset`
"""
print(dataset.variables.keys())
[docs]
def show_variable_attrs(dataset, *vars):
"""Print the variable attributes of the netCDF dataset.
If variable instances are given as optional arguments,
only those variable's attributes are printed.
:arg dataset: netcdf dataset object
:type dataset: :py:class:`netCDF4.Dataset`
:arg vars: Variables to print attributes for
:type vars: :py:class:`netCDF4.Variable1`
"""
if vars:
for var in vars:
print(dataset.variables[var])
else:
for var in dataset.variables.values():
print(var)
[docs]
def time_origin(dataset, time_var='time_counter'):
"""Return the time_var.time_origin value.
:arg dataset: netcdf dataset object
:type dataset: :py:class:`netCDF4.Dataset` or :py:class:`xarray.Dataset`
:arg time_var: name of time variable
:type time_var: str
:returns: Value of the time_origin attribute of the time_counter
variable.
:rtype: :py:class:`Arrow` instance
"""
try:
time_counter = dataset.variables[time_var]
except KeyError:
raise KeyError(
'dataset does not have {time_var} variable'.format(
time_var=time_var))
try:
# netCDF4 dataset
time_orig = time_counter.time_origin.title()
except AttributeError:
try:
# xarray dataset
time_orig = time_counter.attrs['time_origin'].title()
except KeyError:
raise AttributeError(
'NetCDF: '
'{time_var} variable does not have '
'time_origin attribute'.format(time_var=time_var))
value = arrow.get(
time_orig,
['YYYY-MMM-DD HH:mm:ss',
'DD-MMM-YYYY HH:mm:ss',
'DD-MMM-YYYY HH:mm',
'YYYY-MM-DD HH:mm:ss'])
return value
[docs]
def timestamp(dataset, tindex, time_var='time_counter'):
"""Return the time stamp of the tindex time_counter value(s) in dataset.
The time stamp is calculated by adding the time_counter[tindex] value
(in seconds) to the dataset's time_counter.time_origin value.
:arg dataset: netcdf dataset object
:type dataset: :py:class:`netCDF4.Dataset`
:arg tindex: time_counter variable index.
:type tindex: int or iterable
:arg time_var: name of the time variable
:type time_var: str
:returns: Time stamp value(s) at tindex in the dataset.
:rtype: :py:class:`Arrow` instance or list of instances
"""
time_orig = time_origin(dataset, time_var=time_var)
time_counter = dataset.variables[time_var]
try:
iter(tindex)
except TypeError:
tindex = [tindex]
results = []
for i in tindex:
try:
results.append(time_orig + timedelta(seconds=time_counter[i].item()))
except IndexError:
raise IndexError(
'time_counter variable has no tindex={}'.format(tindex))
if len(results) > 1:
return results
else:
return results[0]
[docs]
def get_datetimes(dataset, time_var='time_counter'):
"""Return the datetime array for a dataset
This is a wrapper around nc_tools.timestamp that automatically
handles all timesteps and converts the arrow objects to a numpy
datetime object array.
:arg dataset: netcdf dataset object.
:type dataset: :py:class:`netCDF4.Dataset`
:arg time_var: name of time variable.
:type time_var: str
:returns: datetime values at each timestep in the dataset.
:rtype: :py:class:`Numpy` array of :py:class:`Datetime` instances
"""
# Get arrow objects
time_stamps = timestamp(dataset,
np.arange(dataset.variables['time_counter'].shape[0]),
time_var=time_var)
# Get datetime.datetime objects
datetimes = np.array([time_stamp.datetime for time_stamp in time_stamps])
return datetimes
[docs]
def xarraytime_to_datetime(xarraytime):
"""Convert an `xarray.DataArray` of `numpy.datetime64` times to a
`numpy.ndarray` of `datetime.datetime` times
:arg xarraytime: DataArray of datetime64
:type xarraytime: :py:class:`xarray.DataArray` of :py:class:`numpy.datetime64`
:returns: array of datetimes
:rtype: :py:class:`numpy.ndarray` of :py:class:`datetime.datetime`
"""
datetime_obj = xarraytime.values.astype('datetime64[s]').astype(datetime)
return datetime_obj
[docs]
def ssh_timeseries_at_point(
grid_T, j, i, datetimes=False, time_var='time_counter', ssh_var='sossheig'
):
"""Return the sea surface height and time counter values
at a single grid point from a NEMO tracer results dataset.
:arg grid_T: Tracer results dataset from NEMO.
:type grid_T: :py:class:`netCDF4.Dataset`
:arg int j: j-direction (longitude) index of grid point to get sea surface
height at.
:arg int i: i-direction (latitude) index of grid point to get sea surface
height at.
:arg boolean datetimes: Return time counter values as
:py:class:`datetime.datetime` objects if
:py:obj:`True`, otherwise return them as
:py:class:`arrow.Arrow` objects (the default).
:arg time_var: Name of time variable.
:type time_var: str
:arg ssh_var: Name of sea surface height variable.
:type time_var: str
:returns: 2-tuple of 1-dimensional :py:class:`numpy.ndarray` objects.
The :py:attr:`ssh` attribute holds the sea surface heights,
and the :py:attr:`time` attribute holds the time counter
values.
:rtype: :py:class:`collections.namedtuple`
"""
ssh = grid_T.variables[ssh_var][:, j, i]
time = timestamp(grid_T, range(len(ssh)), time_var=time_var)
if datetimes:
time = np.array([a.datetime for a in time])
ssh_ts = namedtuple('ssh_ts', 'ssh, time')
return ssh_ts(ssh, np.array(time))
[docs]
def uv_wind_timeseries_at_point(grid_weather, j, i, datetimes=False):
"""Return the u and v wind components and time counter values
at a single grid point from a weather forcing dataset.
:arg grid_weather: Weather forcing dataset, typically from an
:file:`ops_yYYYYmMMdDD.nc` file produced by the
:py:mod:`nowcast.workers.grid_to_netcdf` worker.
:type grid_weather: :py:class:`netCDF4.Dataset`
:arg int j: j-direction (longitude) index of grid point to get wind
components at.
:arg int i: i-direction (latitude) index of grid point to get wind
components at.
:arg boolean datetimes: Return time counter values as
:py:class:`datetime.datetime` objects if
:py:obj:`True`, otherwise return them as
:py:class:`arrow.Arrow` objects (the default).
:returns: 2-tuple of 1-dimensional :py:class:`numpy.ndarray` objects,
The :py:attr:`u` attribute holds the u-direction wind
component,
The :py:attr:`v` attribute holds the v-direction wind
component,
and the :py:attr:`time` attribute holds the time counter
values.
:rtype: :py:class:`collections.namedtuple`
"""
u_wind = grid_weather.variables['u_wind'][:, j, i]
v_wind = grid_weather.variables['v_wind'][:, j, i]
time = timestamp(grid_weather, range(len(u_wind)))
if datetimes:
time = np.array([a.datetime for a in time])
wind_ts = namedtuple('wind_ts', 'u, v, time')
return wind_ts(u_wind, v_wind, np.array(time))
[docs]
def init_dataset_attrs(
dataset,
title,
notebook_name,
nc_filepath,
comment='',
quiet=False,
):
"""Initialize the required global attributes of the netCDF dataset.
If an attribute with one of the required attribute names is already
set on the dataset it will not be overwritten,
instead a warning will be printed,
unless quiet==True.
With quiet==False
(the default)
the dataset attributes and their values are printed.
:arg dataset: netcdf dataset object
:type dataset: :py:class:`netCDF4.Dataset`
:arg title: Title for the dataset
:type title: str
:arg notebook_name: Name of the IPython Notebook being used to create
or update the dataset.
If empty the source attribute value will be set
to REQUIRED and must therefore be handled in a
subsequent operation.
:type notebook_name: str
:arg nc_filepath: Relative path and filename of the netCDF file being
created or updated.
If empty the references attribute value will be set
to REQUIRED and must therefore be handled in a
subsequent operation.
:type nc_filepath: str
:arg comment: Comment(s) for the dataset
:type comment: str
:arg quiet: Suppress printed output when True; defaults to False
:type quiet: Boolean
"""
reqd_attrs = (
('Conventions', 'CF-1.6'),
('title', title),
('institution', ('Dept of Earth, Ocean & Atmospheric Sciences, '
'University of British Columbia')),
('source', _notebook_hg_url(notebook_name)),
('references', _nc_file_hg_url(nc_filepath)),
('history', (
'[{:%Y-%m-%d %H:%M:%S}] Created netCDF4 zlib=True dataset.'
.format(datetime.now()))),
('comment', comment),
)
for name, value in reqd_attrs:
if name in dataset.ncattrs():
if not quiet:
print('Existing attribute value found, not overwriting: {}'
.format(name))
else:
dataset.setncattr(name, value)
if not quiet:
show_dataset_attrs(dataset)
def _notebook_hg_url(notebook_name):
"""Calculate a Bitbucket URL for an IPython Notebook.
It is assumed that this function is being called from a file that
resides in a Mercurial repository with a default path that points
to Bitbucket.org.
The URL is based on repo's default path,
the current working directory path,
and notebook_name.
:arg notebook_name: Name of the IPython Notebook to calculate the
Bitbucket URL for
:type notebook_name: str
:returns: The Bitbucket URL for notebook_name
:rtype: str
"""
if not notebook_name:
return 'REQUIRED'
default_url = hg.default_url()
try:
bitbucket, repo_path = default_url.partition('bitbucket.org')[1:]
except AttributeError:
return 'REQUIRED'
repo = os.path.split(repo_path)[-1]
local_path = os.getcwd().partition(repo)[-1]
if not notebook_name.endswith('.ipynb'):
notebook_name += '.ipynb'
url = os.path.join(
'https://', bitbucket, repo_path[1:], 'src', 'tip',
local_path[1:], notebook_name)
return url
def _nc_file_hg_url(nc_filepath):
"""Calculate a Bitbucket URL for nc_filepath.
It is assumed that nc_filepath is a relative path to a netCDF file
that resides in a Mercurial repositorywith a default path that points
to Bitbucket.org.
:arg nc_filepath: Relative path and filename of the netCDF file
to calculate the Bitbucket URL for
:type nc_filepath: str
:returns: The Bitbucket URL for the nc_filepath netCDF file
:rtype: str
"""
rel_path = ''.join(nc_filepath.rpartition('../')[:2])
try:
repo_path = nc_filepath.split(rel_path)[1]
except ValueError:
return 'REQUIRED'
repo, filepath = repo_path.split('/', 1)
default_url = hg.default_url(os.path.join(rel_path, repo))
try:
bitbucket, repo_path = default_url.partition('bitbucket.org')[1:]
except AttributeError:
return 'REQUIRED'
url = os.path.join(
'https://', bitbucket, repo_path[1:], 'src', 'tip', filepath)
return url
[docs]
def check_dataset_attrs(dataset):
"""Check that required dataset and variable attributes are present
and that they are not empty.
:arg dataset: netcdf dataset object
:type dataset: :py:class:`netCDF4.Dataset`
"""
reqd_dataset_attrs = (
'Conventions', 'title', 'institution', 'source', 'references',
'history', 'comment')
reqd_variable_attrs = ('units', 'long_name')
for attr in reqd_dataset_attrs:
if attr not in dataset.ncattrs():
print('Missing required dataset attribute: {}'.format(attr))
continue
if attr != 'comment':
value = dataset.getncattr(attr)
if value in ('', 'REQUIRED'):
print('Missing value for dataset attribute: {}'.format(attr))
for var_name, var in dataset.variables.items():
for attr in reqd_variable_attrs:
if attr not in var.ncattrs():
print('Missing required variable attribute for {}: {}'
.format(var_name, attr))
continue
value = var.getncattr(attr)
if not value:
print('Missing value for variable attribute for {}: {}'
.format(var_name, attr))
[docs]
def generate_pressure_file(filename, p_file, t_file, alt_file, day):
""" Generates a file with CGRF pressure corrected to sea level.
:arg filename: full path name where the corrected pressure should be saved
:type filename: string
:arg p_file: full path name of the uncorrected CGRF pressure
:type p_file: string
:arg t_file: full path name of the corresponding CGRF temperature
:type t_file: string
:arg alt_file: full path name of the altitude of CGRF cells
:type alt_file: string
:arg day: the date
:type day: arrow
"""
# load data
f = nc.Dataset(p_file)
press = f.variables['atmpres']
f = nc.Dataset(t_file)
temp = f.variables['tair']
time = f.variables['time_counter']
lon = f.variables['nav_lon']
lat = f.variables['nav_lat']
f = nc.Dataset(alt_file)
alt = f.variables['alt']
# correct pressure
press_corr = np.zeros(press.shape)
for k in range(press.shape[0]):
press_corr[k, :, :] = _slp(alt, press[k, :, :], temp[k, :, :])
# Create netcdf
slp_file = nc.Dataset(filename, 'w', zlib=True)
description = 'corrected sea level pressure'
# dataset attributes
init_dataset_attrs(
slp_file,
title=(
'CGRF {} forcing dataset for {}'
.format(description, day.format('YYYY-MM-DD'))),
notebook_name='',
nc_filepath='',
comment=(
'Processed and adjusted from '
'goapp.ocean.dal.ca::canadian_GDPS_reforecasts_v1 files.'),
quiet=True,
)
# dimensions
slp_file.createDimension('time_counter', 0)
slp_file.createDimension('y', press_corr.shape[1])
slp_file.createDimension('x', press_corr.shape[2])
# time
time_counter = slp_file.createVariable(
'time_counter', 'double', ('time_counter',))
time_counter.calendar = time.calendar
time_counter.long_name = time.long_name
time_counter.title = time.title
time_counter.units = time.units
time_counter[:] = time[:]
time_counter.valid_range = time.valid_range
# lat/lon variables
nav_lat = slp_file.createVariable('nav_lat', 'float32', ('y', 'x'))
nav_lat.long_name = lat.long_name
nav_lat.units = lat.units
nav_lat.valid_max = lat.valid_max
nav_lat.valid_min = lat.valid_min
nav_lat.nav_model = lat.nav_model
nav_lat[:] = lat
nav_lon = slp_file.createVariable('nav_lon', 'float32', ('y', 'x'))
nav_lon.long_name = lon.long_name
nav_lon.units = lon.units
nav_lon.valid_max = lon.valid_max
nav_lon.valid_min = lon.valid_min
nav_lon.nav_model = lon.nav_model
nav_lon[:] = lon
# Pressure
atmpres = slp_file.createVariable(
'atmpres', 'float32', ('time_counter', 'y', 'x'))
atmpres.long_name = 'Sea Level Pressure'
atmpres.units = press.units
atmpres.valid_min = press.valid_min
atmpres.valid_max = press.valid_max
atmpres.missing_value = press.missing_value
atmpres.axis = press.axis
atmpres[:] = press_corr[:]
slp_file.close()
[docs]
def generate_pressure_file_ops(filename, p_file, t_file, alt_file, day):
""" Generates a file with CGRF pressure corrected to sea level.
:arg filename: full path name where the corrected pressure should be saved
:type filename: string
:arg p_file: full path name of the uncorrected CGRF pressure
:type p_file: string
:arg t_file: full path name of the corresponding CGRF temperature
:type t_file: string
:arg alt_file: full path name of the altitude of CGRF cells
:type alt_file: string
:arg day: the date
:type day: arrow
"""
# load data
f = nc.Dataset(p_file)
press = f.variables['atmpres']
f = nc.Dataset(t_file)
temp = f.variables['tair']
time = f.variables['time_counter']
lon = f.variables['nav_lon']
lat = f.variables['nav_lat']
f = nc.Dataset(alt_file)
alt = f.variables['HGT_surface']
lat_a = f.variables['latitude']
lon_a = f.variables['longitude']
alt, lon_a, lat_a = _truncate_height(alt, lon_a, lat_a, lon, lat)
# correct pressure
press_corr = np.zeros(press.shape)
for k in range(press.shape[0]):
press_corr[k, :, :] = _slp(alt, press[k, :, :], temp[k, :, :])
# Create netcdf
slp_file = nc.Dataset(filename, 'w', zlib=True)
description = 'corrected sea level pressure'
# dataset attributes
init_dataset_attrs(
slp_file,
title=(
'GRIB2 {} forcing dataset for {}'
.format(description, day.format('YYYY-MM-DD'))),
notebook_name='',
nc_filepath='',
comment=(
'Processed and adjusted from '
'GEM 2.5km operational model'),
quiet=True,
)
# dimensions
slp_file.createDimension('time_counter', 0)
slp_file.createDimension('y', press_corr.shape[1])
slp_file.createDimension('x', press_corr.shape[2])
# time
time_counter = slp_file.createVariable(
'time_counter', 'double', ('time_counter',))
time_counter.long_name = time.long_name
time_counter.units = time.units
time_counter[:] = time[:]
# lat/lon variables
nav_lat = slp_file.createVariable('nav_lat', 'float32', ('y', 'x'))
nav_lat.long_name = lat.long_name
nav_lat.units = lat.units
nav_lat[:] = lat
nav_lon = slp_file.createVariable('nav_lon', 'float32', ('y', 'x'))
nav_lon.long_name = lon.long_name
nav_lon.units = lon.units
nav_lon[:] = lon
# Pressure
atmpres = slp_file.createVariable(
'atmpres', 'float32', ('time_counter', 'y', 'x'))
atmpres.long_name = 'Sea Level Pressure'
atmpres.units = press.units
atmpres[:] = press_corr[:]
slp_file.close()
def _slp(Z, P, T):
R = 287 # ideal gas constant
g = 9.81 # gravity
gam = 0.0098 # lapse rate(deg/m)
p0 = 101000 # average sea surface heigh in Pa
ps = P * (gam * (Z / T) + 1)**(g / gam / R)
return ps
def _truncate_height(alt1, lon1, lat1, lon2, lat2):
""" Truncates the height file over our smaller domain.
alt1, lon1, lat1, are the height, longitude and latitude of the larger domain.
lon2, lat2 are the longitude and latitude of the smaller domain.
returns h,lons,lats, the height, longiutde and latitude over the smaller domain. """
# bottom left (i,j)
i = np.where(np.logical_and(np.abs(lon1 - lon2[0, 0]) < 10**(-5),
np.abs(lat1 - lat2[0, 0]) < 10**(-5)))
i_st = i[1]
j_st = i[0]
# top right
i = np.where(np.logical_and(np.abs(lon1 - lon2[-1, -1]) < 10**(-5),
np.abs(lat1 - lat2[-1, -1]) < 10**(-5)))
i_ed = i[1]
j_ed = i[0]
h_small = alt1[0, j_st:j_ed + 1, i_st:i_ed + 1]
lat_small = lat1[j_st:j_ed + 1, i_st:i_ed + 1]
lon_small = lon1[j_st:j_ed + 1, i_st:i_ed + 1]
return h_small, lon_small, lat_small
[docs]
def combine_subdomain(filenames, outfilename):
"""Recombine per-processor subdomain files into one single file.
Note: filenames must be an array of files organized to reflect the
subdomain decomposition. filenames[0,0] is bottom left, filenames[0,-1] is
bottom right, filenames[-1,0] is top left, filenames[-1,-1] is top right.
For example,
filenames = [[sub_00.nc, sub_01.nc], [sub_02.nc, sub_03.nc]].
This can be improved. At some point it would be worthwhile to automatically
build the filenames array.
Also, we might want to add netcdf attributes like history, units, etc.
:arg filenames: An array containing the names of the files to be
recombined.
:type filenames: numpy array (2D)
:arg outfilename: The name of the file for saving output
:type outfilename: string
"""
# Determine shape of each subdomain
shapes = _define_shapes(filenames)
# Initialize
new = nc.Dataset(outfilename, 'w')
_initialize_dimensions(new, nc.Dataset(filenames[0, 0]))
newvars = _initialize_variables(new, nc.Dataset(filenames[0, 0]))
# Build full array
_concatentate_variables(filenames, shapes, newvars)
new.close()
def _define_shapes(filenames):
"""Creates a dictionary object that stores the beginning and ending i, j
coordinate for each subdomain file stored in names.
filenames should be orgnaized in a way that corresponds to the shape of the
region you are compiling.
The first axis (rows) of names is along y, second axis (columns) is along x
filenames[0,0] is the bottom left subdomain
filenames[-1,0] is the top left subdomain
filenames[0,-1] is the bottom right subdomain
filenames[-1,-1] is the top right subdomain
Beginning/ending i = iss/iee, beginning/ending j = jss/jee
Used for recombining per-processor subdomain files
:arg filenames: Filenames in the domain decomposition, organized into an
an array grid
:type filenames: numpy array
:returns: a dictionary of dictionarys. First level keys are the filenames,
second level are iss,iee,jss,jee
"""
shapes = {}
jss = 0
for j in np.arange(filenames.shape[0]):
iss = 0
for i in np.arange(filenames.shape[1]):
name = filenames[j, i]
f = nc.Dataset(name)
x = f.dimensions['x'].__len__()
y = f.dimensions['y'].__len__()
shapes[name] = {}
shapes[name]['iss'] = iss
shapes[name]['iee'] = iss+x
shapes[name]['jss'] = jss
shapes[name]['jee'] = jss+y
iss = iss+x
jss = jss + y
return shapes
def _initialize_dimensions(newfile, oldfile):
"""Initialize new file to have the same dimension names as oldfile
Dimensions that are not associated with the horizontal grid are also given
the same size as oldfile
Used for recombining per-processor subdomain files
:arg newfile: the new netCDF file
:type newfile: netCDF4 file handle
:arg oldfile: the file from which dimensions are copied
:type oldfile: netCDF4 file handle
"""
for dimname in oldfile.dimensions:
dim = oldfile.dimensions[dimname]
if dimname == 'x' or dimname == 'y':
newdim = newfile.createDimension(dimname)
else:
newdim = newfile.createDimension(dimname, size=dim.__len__())
def _initialize_variables(newfile, oldfile):
"""Initialize new file to have the same variables as oldfile.
Used for recombining per-processor subdomain files
:arg newfile: the new netCDF file
:type newfile: netCDF4 file handle
:arg oldfile: the file from which dimensions are copied
:type oldfile: netCDF4 file handle
:returns: newvars, a dictionary object with key variable name, value netcdf
variable
"""
newvars = {}
for varname in oldfile.variables:
var = oldfile.variables[varname]
dims = var.dimensions
newvar = newfile.createVariable(varname, var.datatype, dims)
newvar[:] = var[:]
newvars[varname] = newvar
return newvars
def _concatentate_variables(filenames, shapes, variables):
"""Concatentate netcdf variables listed in dictionary variables for all of
the files stored in filenames. Concatentation on horizontal grid.
Used for recombining per-processor subdomain files
:arg filenames: array of filenames for each piece of subdomain
:type filenames: numpy array
:arg shapes: conatiner for shapes of each subdomain
:type shapes: dictionary
:arg variables: conatiner for the new variables
:type variables: dictionary with variable name key and netcdf array value
"""
for name in filenames.flatten():
for varname in variables.keys():
newvar = variables[varname]
f = nc.Dataset(name)
oldvar = f.variables[varname]
x1 = shapes[name]['iss']
x2 = shapes[name]['iee']
y1 = shapes[name]['jss']
y2 = shapes[name]['jee']
if 'x' in newvar.dimensions:
newvar[..., y1:y2, x1:x2] = oldvar[..., :, :]
[docs]
class scDataset(object):
def __init__(self, files):
"""
Simple Concatenated Dataset
scDataset is a partial implementation of MFDataset that automates the
concatenation of netCDF variables split between multiple files. The aim
is to provide a simple concatenated interface to variables where the
first dimension is the unlimited dimension (usually "time_counter").
Variables where the first dimension is not the unlimited time dimension
(such as lon, lat, etc) are not concatenated. They are, however, made
available in a "pass through" sense to the first dataset in the list.
Thus, you can read those variables without opening another Dataset.
The other MFDataset features are not implemented: attributes, etc are
ignored for the concatenated variables. It may be possible to add the
those features but the goal here is simply automated concatenation.
Building this class was motivated by deficiencies in the other options
for split-file concatenation:
- xarray.open_mfdataset() appears to load the entire dataset into memory,
which is both slow and memory intensive.
- netCDF4.MFDataset refuses to open NETCDF4 format files
In the event that netCDF4.MFDataset is improved to work with NETCDF4
files, this will become obsolete.
:arg files: list of netcdf filenames in chronological order
:type files: list
Example usage:
.. code-block:: python
# Get the (concatenated) output times
with scDataset(files) as ds:
t = ds.variables['time_counter'][:]
.. code-block:: python
# Get temperature at all times and all depths at one location
with scDataset(files) as ds:
temper = ds.variables['votemper'][:,:,100,100]
.. code-block:: python
# Load surface salinity at each time in a loop for plotting/animation
with scDataset(files) as ds:
for ti in range(ds.variables['vosaline'].shape[0]):
print("Loading time "+str(ti))
surfsal = ds.variables['vosaline'][ti,0,:,:]
# make_a_plot(surfsal)
.. code-block:: python
# Python indexing and slicing also works
with scDataset(files) as ds:
t1 = ds.variables['votemper'][29:33:-1,-10:-1,100:130]
print(t1.shape)
"""
# Initialize a dataset manager with the list of files
self._dsmgr = self.scDatasetManager(files)
# Open the first dataset and set a few class variables
d0 = self._dsmgr[0]
# self.description = d0.description
self.file_format = d0.file_format
self.filepath = files
# Find the time dimension name
for dim in d0.dimensions:
if d0.dimensions[dim].isunlimited():
timedimname = dim
break
# Open each dataset, get time dimension size and set the indices fi and li
fi = [] # file (dataset) index
li = [] # local time index
for di in range(len(files)):
curlen = self._dsmgr[di].dimensions[timedimname].size
fi += [di for x in range(curlen)]
li += [x for x in range(curlen)]
# First dimension must be unlimited, else use the first dataset
self.variables = OrderedDict()
vars0 = d0.variables
for vname in vars0:
if vars0[vname].dimensions[0] == timedimname:
# We concatenate this variable
self.variables[vname] = self.scVariable(vars0[vname], vname, self._dsmgr, fi, li)
else:
# Passthrough this variable to the first file
self.variables[vname] = vars0[vname]
def close(self):
self._dsmgr.close()
def __enter__(self):
return self
def __exit__(self, *args):
self.close()
def __del__(self):
self.close()
class scDatasetManager(object):
"""
Manages datasets by opening/closing them on demand
"""
def __init__(self, files):
self._files = files
self._MAXOPEN = getrlimit(RLIMIT_NOFILE)[0] // 5
self._dslist = [(-1, None)] * self._MAXOPEN
def __getitem__(self, di):
"""
Input is an integer index di, we return the corresponding nc.Dataset
Also we cache open datasets to economize on opening/closing them
"""
# Compute slot index (circular buffer; and index 0 is always kept open)
si = 1 + (di - 1) % (self._MAXOPEN - 1) if di > 0 else 0
# Check what is currently stored in slot si
ci, ds = self._dslist[si]
if ci != di:
if ds is not None:
# Repurpose slot si for the requested dataset
ds.close()
# Now open the requested dataset and store it in slot si
ds = nc.Dataset(self._files[di], 'r')
self._dslist[si] = (di, ds)
return ds
def close(self):
for di, ds in self._dslist:
if ds is not None:
ds.close()
self._dslist = []
class scVariable(object):
"""
Builds a concatenated version of a netCDF Variable type
- We aim to have correct indexing, and set a few class variables such as
shape and dimensions correctly. Attribute handling, etc is not implemented.
"""
def __init__(self, v0, vname, datasets, fi, li):
self.ds = datasets
self._fi = fi
self._li = li
# Set a few class variables
self.name = vname
self.dimensions = v0.dimensions
self.dtype = v0.dtype
self.ndim = v0.ndim
self.shape = (len(self._fi), ) + v0.shape[1:]
def __getitem__(self, initems):
"""
Implement Python indexing: int, slice, ellipsis accepted
"""
# Make the input iterable
if not isinstance(initems, tuple):
initems = initems,
# Convert any ellipsis to slice
items = [slice(None,None,None)]*self.ndim
for i, item in enumerate(initems):
if item is not Ellipsis:
items[i] = item
else:
for j, item in enumerate(reversed(initems)):
if item is not Ellipsis:
items[self.ndim-j-1] = item
else:
break
break
# Find the time indices
ti = items[0] # global time indices to extract, may be int or slice
fi = self._fi[ti] # index of each file (dataset) to draw from
li = self._li[ti] # local time index for each dataset
# For single time output (no concatenation), just draw from the right dataset
if type(ti) is int or type(ti) is np.int64:
if self.ndim == 1:
out = self.ds[fi].variables[self.name][li]
if self.ndim == 2:
out = self.ds[fi].variables[self.name][li, items[1]]
if self.ndim == 3:
out = self.ds[fi].variables[self.name][li, items[1], items[2]]
if self.ndim == 4:
out = self.ds[fi].variables[self.name][li, items[1], items[2], items[3]]
return out
# If we need to concatenate, then we need to determine the output
# array size. This approach is an ugly hack but it works.
sizo = [1] * self.ndim # assume one in each dimension
rdim = [] # list of dimensions to remove
for ii, item in enumerate(items):
if type(item) is int or type(item) is np.int64:
rdim += [ii]
else: # update output size at this dim if not an integer index
tmp = [None] * self.shape[ii] # build a dummy array
sizo[ii] = len(tmp[item]) # index the dummy array, record length
out = np.zeros(sizo, self.dtype) # allocate output array with matching data type
out = np.squeeze(out, axis=tuple(rdim)) # remove unwanted singleton dimensions
# Now we read each time index sequentially and fill the output array
for ii in range(len(fi)):
if self.ndim == 1:
out[ii] = self.ds[fi[ii]].variables[self.name][li[ii]]
if self.ndim == 2:
out[ii, ...] = self.ds[fi[ii]].variables[self.name][li[ii], items[1]]
if self.ndim == 3:
out[ii, ...] = self.ds[fi[ii]].variables[self.name][li[ii], items[1], items[2]]
if self.ndim == 4:
out[ii, ...] = self.ds[fi[ii]].variables[self.name][li[ii], items[1], items[2], items[3]]
return out