import pandas as pd
import numpy as np
from scipy.optimize import fsolve as _fsolve
from scipy import signal as _signal
from itertools import product as _product
### Spectrum
[docs]def elevation_spectrum(eta, sample_rate, nnft, window='hamming', detrend=True):
"""
Calculates the wave energy spectrum from wave elevation time-series
Parameters
------------
eta: pandas DataFrame
Wave surface elevation [m] indexed by time [datetime or s]
sample_rate: float
Data frequency [Hz]
nnft: integer
Number of bins in the Fast Fourier Transform
window: string (optional)
Signal window type. 'hamming' is used by default given the broadband
nature of waves. See scipy.signal.get_window for more options.
detrend: bool (optional)
Specifies if a linear trend is removed from the data before calculating
the wave energy spectrum. Data is detrended by default.
Returns
---------
S: pandas DataFrame
Spectral density [m^2/Hz] indexed by frequency [Hz]
"""
# TODO: Add confidence intervals, equal energy frequency spacing, and NDBC
# frequency spacing
# TODO: may need an assert for the length of nnft- signal.welch breaks when nfft is too short
# TODO: check for uniform sampling
assert isinstance(eta, pd.DataFrame), 'eta must be of type pd.DataFrame'
assert isinstance(sample_rate, (float,int)), 'sample_rate must be of type int or float'
assert isinstance(nnft, int), 'nnft must be of type int'
assert isinstance(window, str), 'window must be of type str'
assert isinstance(detrend, bool), 'detrend must be of type bool'
assert nnft > 0, 'nnft must be > 0'
assert sample_rate > 0, 'sample_rate must be > 0'
S = pd.DataFrame()
for col in eta.columns:
data = eta[col]
if detrend:
data = _signal.detrend(data.dropna(), axis=-1, type='linear', bp=0)
[f, wave_spec_measured] = _signal.welch(data, fs=sample_rate, window=window,
nperseg=nnft, nfft=nnft)
S[col] = wave_spec_measured
S.index=f
S.columns = eta.columns
return S
[docs]def pierson_moskowitz_spectrum(f, Tp):
"""
Calculates Pierson-Moskowitz Spectrum from Tucker and Pitt (2001)
Parameters
------------
f: numpy array
Frequency [Hz]
Tp: float/int
Peak period [s]
Returns
---------
S: pandas DataFrame
Spectral density [m^2/Hz] indexed frequency [Hz]
"""
try:
f = np.array(f)
except:
pass
assert isinstance(f, np.ndarray), 'f must be of type np.ndarray'
assert isinstance(Tp, (int,float)), 'Tp must be of type int or float'
f.sort()
g = 9.81
B_PM = (5/4)*(1/Tp)**(4)
A_PM = 0.0081*g**2*(2*np.pi)**(-4)
Sf = A_PM*f**(-5)*np.exp(-B_PM*f**(-4))
col_name = 'Pierson-Moskowitz ('+str(Tp)+'s)'
S = pd.DataFrame(Sf, index=f, columns=[col_name])
return S
[docs]def bretschneider_spectrum(f, Tp, Hs):
"""
Calculates Bretschneider Sprectrum from Tucker and Pitt (2001)
Parameters
------------
f: numpy array
Frequency [Hz]
Tp: float/int
Peak period [s]
Hs: float/int
Significant wave height [m]
Returns
---------
S: pandas DataFrame
Spectral density [m^2/Hz] indexed by frequency [Hz]
"""
try:
f = np.array(f)
except:
pass
assert isinstance(f, np.ndarray), 'f must be of type np.ndarray'
assert isinstance(Tp, (int,float)), 'Tp must be of type int or float'
assert isinstance(Hs, (int,float)), 'Hs must be of type int or float'
f.sort()
B_BS = (1.057/Tp)**4
A_BS = B_BS*(Hs/2)**2
Sf = A_BS*f**(-5)*np.exp(-B_BS*f**(-4))
col_name = 'Bretschneider ('+str(Hs)+'m,'+str(Tp)+'s)'
S = pd.DataFrame(Sf, index=f, columns=[col_name])
return S
[docs]def jonswap_spectrum(f, Tp, Hs, gamma=3.3):
"""
Calculates JONSWAP spectrum from Hasselmann et al (1973)
Parameters
------------
f: numpy array
Frequency [Hz]
Tp: float/int
Peak period [s]
Hs: float/int
Significant wave height [m]
gamma: float (optional)
Gamma
Returns
---------
S: pandas DataFrame
Spectral density [m^2/Hz] indexed frequency [Hz]
"""
try:
f = np.array(f)
except:
pass
assert isinstance(f, np.ndarray), 'f must be of type np.ndarray'
assert isinstance(Tp, (int,float)), 'Tp must be of type int or float'
assert isinstance(Hs, (int,float)), 'Hs must be of type int or float'
assert isinstance(gamma, (int,float)), 'gamma must be of type int or float'
f.sort()
g = 9.81
# Cutoff frequencies for gamma function
siga = 0.07
sigb = 0.09
fp = 1/Tp # peak frequency
lind = np.where(f<=fp)
hind = np.where(f>fp)
Gf = np.zeros(f.shape)
Gf[lind] = gamma**np.exp(-(f[lind]-fp)**2/(2*siga**2*fp**2))
Gf[hind] = gamma**np.exp(-(f[hind]-fp)**2/(2*sigb**2*fp**2))
S_temp = g**2*(2*np.pi)**(-4)*f**(-5)*np.exp(-(5/4)*(f/fp)**(-4))
alpha_JS = Hs**(2)/16/np.trapz(S_temp*Gf,f)
Sf = alpha_JS*S_temp*Gf # Wave Spectrum [m^2-s]
col_name = 'JONSWAP ('+str(Hs)+'m,'+str(Tp)+'s)'
S = pd.DataFrame(Sf, index=f, columns=[col_name])
return S
### Metrics
[docs]def surface_elevation(S, time_index, seed=123):
"""
Calculates wave elevation time-series from spectrum using a random phase
Parameters
------------
S: pandas DataFrame
Spectral density [m^2/Hz] indexed by frequency [Hz]
time_index: numpy array
Time used to create the wave elevation time-series [s],
for example, time = np.arange(0,100,0.01)
seed: int (optional)
Random seed
Returns
---------
eta: pandas DataFrame
Wave surface elevation [m] indexed by time [s]
"""
try:
time_index = np.array(time_index)
except:
pass
assert isinstance(S, pd.DataFrame), 'S must be of type pd.DataFrame'
assert isinstance(time_index, np.ndarray), 'time_index must be of type np.ndarray'
assert isinstance(seed, int), 'seed must be of type int'
np.random.seed(seed)
start_time = time_index[0]
end_time = time_index[-1]
f = pd.Series(S.index) # frequency
f.index = f
delta_f = f.diff()
phase = pd.Series(2*np.pi*np.random.rand(f.size))
phase.index = f
phase = phase[start_time:end_time] # Should phase, omega, and A*delta_f be
# truncated before computation?
omega = pd.Series(2*np.pi*f) # angular freqency
omega.index = f
omega = omega[start_time:end_time]
# Wave amplitude times delta f, truncated
A = 2*S
A = A.multiply(delta_f, axis=0)
A = A.loc[start_time:end_time,:]
# Product of omega and time
B = np.array([x*y for x,y in _product(time_index, omega)])
B = B.reshape((len(time_index),len(omega)))
B = pd.DataFrame(B, index=time_index, columns=omega.index)
C = np.real(np.exp(1j*(B+phase)))
C = pd.DataFrame(C, index=time_index, columns=omega.index)
eta = pd.DataFrame(columns=S.columns, index=time_index)
for col in A.columns:
eta[col] = (C*A[col]).sum(axis=1)
return eta
[docs]def frequency_moment(S, N):
"""
Calculates the Nth frequency moment of the spectrum
Parameters
-----------
S: pandas DataFrame
Spectral density [m^2/Hz] indexed by frequency [Hz]
N: int
Moment (0 for 0th, 1 for 1st ....)
Returns
-------
m: pandas DataFrame
Nth Frequency Moment indexed by S.columns
"""
assert isinstance(S, pd.DataFrame), 'S must be of type pd.DataFrame'
assert isinstance(N, int), 'N must be of type int'
# Eq 8 in IEC 62600-101
spec = S[S.index > 0] # omit frequency of 0
f = spec.index
fn = np.power(f, N)
delta_f = pd.Series(f).diff()
delta_f[0] = f[1]-f[0]
delta_f.index = f
m = spec.multiply(fn,axis=0).multiply(delta_f,axis=0)
m = m.sum(axis=0)
m = pd.DataFrame(m, index=S.columns, columns = ['m'+str(N)])
return m
[docs]def significant_wave_height(S):
"""
Calculates wave height from spectra
Parameters
------------
S: pandas DataFrame
Spectral density [m^2/Hz] indexed by frequency [Hz]
Returns
---------
Hm0: pandas DataFrame
Significant wave height [m] index by S.columns
"""
assert isinstance(S, pd.DataFrame), 'S must be of type pd.DataFrame'
# Eq 12 in IEC 62600-101
Hm0 = 4*np.sqrt(frequency_moment(S,0))
Hm0.columns = ['Hm0']
return Hm0
[docs]def average_zero_crossing_period(S):
"""
Calculates wave average zero crossing period from spectra
Parameters
------------
S: pandas DataFrame
Spectral density [m^2/Hz] indexed by frequency [Hz]
Returns
---------
Tz: pandas DataFrame
Average zero crossing period [s] indexed by S.columns
"""
assert isinstance(S, pd.DataFrame), 'S must be of type pd.DataFrame'
# Eq 15 in IEC 62600-101
m0 = frequency_moment(S,0).squeeze() # convert to Series for calculation
m2 = frequency_moment(S,2).squeeze()
Tz = np.sqrt(m0/m2)
Tz = pd.DataFrame(Tz, index=S.columns, columns = ['Tz'])
return Tz
[docs]def average_crest_period(S):
"""
Calculates wave average crest period from spectra
Parameters
------------
S: pandas DataFrame
Spectral density [m^2/Hz] indexed by frequency [Hz]
Returns
---------
Tavg: pandas DataFrame
Average wave period [s] indexed by S.columns
"""
assert isinstance(S, pd.DataFrame), 'S must be of type pd.DataFrame'
m2 = frequency_moment(S,2).squeeze() # convert to Series for calculation
m4 = frequency_moment(S,4).squeeze()
Tavg = np.sqrt(m2/m4)
Tavg = pd.DataFrame(Tavg, index=S.columns, columns=['Tavg'])
return Tavg
[docs]def average_wave_period(S):
"""
Calculates mean wave period from spectra
Parameters
------------
S: pandas DataFrame
Spectral density [m^2/Hz] indexed by frequency [Hz]
Returns
---------
Tm: pandas DataFrame
Mean wave period [s] indexed by S.columns
"""
assert isinstance(S, pd.DataFrame), 'S must be of type pd.DataFrame'
m0 = frequency_moment(S,0).squeeze() # convert to Series for calculation
m1 = frequency_moment(S,1).squeeze()
Tm = np.sqrt(m0/m1)
Tm = pd.DataFrame(Tm, index=S.columns, columns=['Tm'])
return Tm
[docs]def peak_period(S):
"""
Calculates wave energy period from spectra
Parameters
------------
S: pandas DataFrame
Spectral density [m^2/Hz] indexed by frequency [Hz]
Returns
---------
Tp: pandas DataFrame
Wave peak period [s] indexed by S.columns
"""
assert isinstance(S, pd.DataFrame), 'S must be of type pd.DataFrame'
# Eq 14 in IEC 62600-101
fp = S.idxmax(axis=0) # Hz
Tp = 1/fp
Tp = pd.DataFrame(Tp, index=S.columns, columns=["Tp"])
return Tp
[docs]def energy_period(S):
"""
Calculates wave energy period from spectra
Parameters
------------
S: pandas DataFrame
Spectral density [m^2/Hz] indexed by frequency [Hz]
Returns
---------
Te: pandas DataFrame
Wave energy period [s] indexed by S.columns
"""
assert isinstance(S, pd.DataFrame), 'S must be of type pd.DataFrame'
mn1 = frequency_moment(S,-1).squeeze() # convert to Series for calculation
m0 = frequency_moment(S,0).squeeze()
# Eq 13 in IEC 62600-101
Te = mn1/m0
Te = pd.DataFrame(Te, index=S.columns, columns=['Te'])
return Te
[docs]def spectral_bandwidth(S):
"""
Calculates bandwidth from spectra
Parameters
------------
S: pandas DataFrame
Spectral density [m^2/Hz] indexed by frequency [Hz]
Returns
---------
e: pandas DataFrame
Spectral bandwidth [s] indexed by S.columns
"""
assert isinstance(S, pd.DataFrame), 'S must be of type pd.DataFrame'
m2 = frequency_moment(S,2).squeeze() # convert to Series for calculation
m0 = frequency_moment(S,0).squeeze()
m4 = frequency_moment(S,4).squeeze()
e = np.sqrt(1- (m2**2)/(m0/m4))
e = pd.DataFrame(e, index=S.columns, columns=['e'])
return e
[docs]def spectral_width(S):
"""
Calculates wave spectral width from spectra
Parameters
------------
S: pandas DataFrame
Spectral density [m^2/Hz] indexed by frequency [Hz]
Returns
---------
v: pandas DataFrame
Spectral width [m] indexed by S.columns
"""
assert isinstance(S, pd.DataFrame), 'S must be of type pd.DataFrame'
mn2 = frequency_moment(S,-2).squeeze() # convert to Series for calculation
m0 = frequency_moment(S,0).squeeze()
mn1 = frequency_moment(S,-1).squeeze()
# Eq 16 in IEC 62600-101
v = np.sqrt((m0*mn2/np.power(mn1,2))-1)
v = pd.DataFrame(v, index=S.columns, columns=['v'])
return v
[docs]def energy_flux(S, h, rho=1025, g=9.80665):
"""
Calculates the omnidirectional wave energy flux of the spectra
Parameters
-----------
S: pandas DataFrame
Spectral density [m^2/Hz] indexed by frequency [Hz]
h: float
Water depth [m]
rho: float (optional)
Water Density [kg/m3]
g : float (optional)
Gravitational acceleration [m/s^2]
Returns
-------
J: pandas DataFrame
Omni-directional wave energy flux [W/m] indexed by S.columns
"""
# TODO: Add deep water flag
assert isinstance(S, pd.DataFrame), 'S must be of type pd.DataFrame'
assert isinstance(h, (int,float)), 'h must be of type int or float'
assert isinstance(rho, (int,float)), 'rho must be of type int or float'
assert isinstance(g, (int,float)), 'g must be of type int or float'
f = S.index
k = wave_number(f,h,rho,g)
# wave celerity (group velocity)
Cg = wave_celerity(k,h,g).squeeze()
# Calculating the wave energy flux, Eq 9 in IEC 62600-101
delta_f = pd.Series(f).diff()
delta_f.index = f
delta_f[f[0]] = delta_f[f[1]] # fill the initial NaN
CgSdelF = S.multiply(delta_f, axis=0).multiply(Cg, axis=0)
J = rho*g*CgSdelF.sum(axis=0)
J = pd.DataFrame(J, index=S.columns, columns=["J"])
return J
[docs]def wave_celerity(k, h, g=9.80665):
"""
Calculates wave celerity (group velocity)
Parameters
-----------
k: pandas DataFrame
Wave number [1/m] indexed by frequency [Hz]
h: float
Water depth [m]
g : float (optional)
Gravitational acceleration [m/s^2]
Returns
-------
Cg: pandas DataFrame
Water celerity [?] indexed by frequency [Hz]
"""
assert isinstance(k, pd.DataFrame), 'S must be of type pd.DataFrame'
assert isinstance(h, (int,float)), 'h must be of type int or float'
assert isinstance(g, (int,float)), 'g must be of type int or float'
f = k.index
k = k.squeeze() # convert to Series for calculation (returns a copy)
# Eq 10 in IEC 62600-101
Cg = (np.pi*f/k)*(1+(2*h*k)/np.sinh(2*h*k))
Cg = pd.DataFrame(Cg, index=f, columns=["Cg"])
return Cg
[docs]def wave_number(f, h, rho=1025, g=9.80665):
"""
Calculates wave number
To compute wave number from angular frequency (w), convert w to f before
using this function (f = w/2*pi)
Parameters
-----------
f: numpy array
Frequency [Hz]
h: float
Water depth [m]
rho: float (optional)
Water density [kg/m^3]
g: float (optional)
Gravitational acceleration [m/s^2]
Returns
-------
k: pandas DataFrame
Wave number [1/m] indexed by frequency [Hz]
"""
try:
f = np.array(f)
except:
pass
assert isinstance(f, np.ndarray), 'f must be of type np.ndarray'
assert isinstance(h, (int,float)), 'h must be of type int or float'
assert isinstance(rho, (int,float)), 'rho must be of type int or float'
assert isinstance(g, (int,float)), 'g must be of type int or float'
w = 2*np.pi*f # angular frequency
xi = w/np.sqrt(g/h) # note: =h*wa/sqrt(h*g/h)
yi = xi*xi/np.power(1.0-np.exp(-np.power(xi,2.4908)),0.4015)
k0 = yi/h # Initial guess without current-wave interaction
# Eq 11 in IEC 62600-101 using initial guess from Guo (2002)
def func(kk):
val = np.power(w,2) - g*kk*np.tanh(kk*h)
return val
mask = np.abs(func(k0)) > 1e-9
if mask.sum() > 0:
k0_mask = k0[mask]
w = w[mask]
k, info, ier, mesg = _fsolve(func, k0_mask, full_output=True)
assert ier == 1, 'Wave number not found. ' + mesg
k0[mask] = k
k = pd.DataFrame(k0, index=f, columns=['k'])
return k