气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。 7 o0 u1 q( d, H
: N: D& R. J# O! o& S! ^ 大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。
3 r8 p3 l* \- K3 I+ K$ `" C9 a, N- J5 B
本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。
; l: _7 M6 F: N- M9 `& w
* T$ `# ?( l( Y" M* H/ S% n4 D 数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。
' o9 H% ^* E7 q: ~3 T( ~ 其中几个关键点解释一下: ; v4 K7 ?& o3 x) @, Q' t
(1)首先定义拟合正太分布的函数
4 i7 O6 L3 w" `% T; w# d4 Z5 U def norm_fit(data):$ K: d" l" C7 v0 t. M7 j
loc, scale = st.norm.fit(data)& E9 }* T: K9 F
return np.array([loc, scale])
( x7 o/ @$ Y3 j5 W0 f 这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。 ' m+ i" t' C& d9 t
(2)xarray分块
" ?: R2 v Q+ Z4 |3 } x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])
( s2 v+ Y4 v! d1 `. t6 Y& _ x = x.chunk({"lev":5, "lat":100,"lon":100}) 9 q! J9 O4 y3 l- L3 A
xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。
1 d3 w0 L: E9 w (3)xarray.apply_ufunc函数 & B+ {/ p3 V- I0 o6 z
result = xr.apply_ufunc(norm_fit,
! w# J- z6 t( R$ J- R' J6 Y( ^" m x,
- b/ B A8 p2 p2 V% x3 G& x input_core_dims=[["time"]],
! w& R2 m5 O& {9 o7 @) g# h4 | output_core_dims=[["paras"]],
8 H. k! ~. U% h, r; x+ C dask="parallelized",
- |3 b& k, I7 O/ E7 o output_dtypes=[np.float],
* U- Z. X& D, }1 @- Q# j dask_gufunc_kwargs={output_sizes:{"paras":2}}
6 a! L$ B4 l2 p4 |- Q) b O1 D )
1 e4 H: Z4 J o+ p0 I apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点) ; v! F p6 F. J4 W
& W% w, e" }! q- O0 O. s 以下是全部代码: + f! [: q' Z- P% a
from scipy import stats as st1 B2 E9 j& C, E0 r# v- S
5 t X6 d: F$ T
import xarray as xr
- k) ~! J7 @4 d3 B) M* t0 L import dask
/ h) _( P% T1 O( ^% ]) N7 | import numpy as np8 L7 n# T. f6 l7 Y" K7 g
from dask.diagnostics import ProgressBar
9 [( ]" o1 m' q2 X' v, V1 Y/ O+ S2 H5 r' T
def norm_fit(data):) {# A: w% ^6 v/ c
loc, scale = st.norm.fit(data)8 z! l6 V# p. @$ q& S3 c1 b
return np.array([loc, scale])& }* F5 p' h3 K
6 ^" a" M$ q$ N: Z) V6 O* G! e
rs = np.random.RandomState(0)
" x! m- c& a7 w7 R2 t4 ^ x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])7 V& h" {' |$ _* D
x = x.chunk({"lev":5, "lat":100,"lon":100})- [! T: G! L+ A" f: e- N" \
! A D( w" b( V% s #使用apply_ufunc计算,并用dask的并行计算
8 b$ @/ [# v# z7 ] result = xr.apply_ufunc(norm_fit,
, } x+ c' w2 X, g2 o. ?- y x,3 v4 ^- d& `) V: ^' Y0 k c5 l
input_core_dims=[["time"]],. y! C9 S# S t: F# j
output_core_dims=[["paras"]],5 B0 q' n0 p8 i- Y6 J& u6 t& X1 u
dask="parallelized",
$ {9 _1 _4 Y4 L: }+ d output_dtypes=[np.float],
; ?* T, l& R- M6 ^ dask_gufunc_kwargs={output_sizes:{"paras":2}}
# a- o. U. P& ^" b0 i )
8 T+ q, A7 x0 s* a+ `0 z T' S$ \' R9 e
#compute进行真实的计算并显示进度
" v5 U( m5 A! J" E. r with ProgressBar():
- f- Y9 t7 q% h result = results.compute(), I0 t* T. W0 m* @
' a, d; V$ _' t- g6 ^; T8 A' y
#结果冲命名保存到nc文件4 I. m0 I7 V" B8 X$ \( m" X$ A
result = result.rename("norm_paras")1 o1 b* E9 w5 j. B
result = result.to_netcdf("norm_fit_paras.nc",compute=False)
8 l* l8 t4 s. O2 D5 W with ProgressBar():
4 v4 w2 g0 Y9 Z8 h3 ^1 H$ [ result.compute() 5 I ~. q: t1 E- Q& c' Z% s) i6 h
转自:气海同途
/ S# y- U- S8 F+ f, o 关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料 |