气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。 6 W' ]! h2 o+ A8 a" U
0 d% V$ B. ]; n" p5 j. G
大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。 4 x' B- u4 d \! F0 |( U) F, B
+ z8 ^( F8 b j* W0 b3 A. T
本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。
3 I5 V' k5 G- }! z, Y1 B2 C
2 G, O; }% w: V; ]. }6 K) F 数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。 7 e7 W4 u6 I2 R
其中几个关键点解释一下: ) W2 F; b) ]- I
(1)首先定义拟合正太分布的函数
, g1 {0 d, q8 M T, I8 Q( c* ~ def norm_fit(data):) D" t; J0 e+ Y; S" d% ]
loc, scale = st.norm.fit(data)
! ~! N4 n: {5 O4 e2 @ return np.array([loc, scale]) 3 P4 a# T$ F7 t2 _- d
这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。
, ]/ [! H# Z. l4 R% `7 [5 v (2)xarray分块 . W4 ?8 A4 n! K9 D6 p
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])' i' Y& j& r3 M
x = x.chunk({"lev":5, "lat":100,"lon":100}) ; d3 ?0 k. \0 i% K
xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。
- |+ T1 R' F1 o* k8 m (3)xarray.apply_ufunc函数
9 y7 p) W0 u+ x2 e8 |$ Y result = xr.apply_ufunc(norm_fit,1 R6 J2 c" _4 F
x,4 h( F5 h9 s% Y! r8 R, \
input_core_dims=[["time"]], h( F5 C: p# U+ b8 G& H
output_core_dims=[["paras"]],
3 c% J; }" O2 b2 | dask="parallelized",
* J' m! M3 D3 w( u/ s& m2 k output_dtypes=[np.float],
# F) T" E( O7 V4 f1 y6 S dask_gufunc_kwargs={output_sizes:{"paras":2}}4 R( R5 {8 Y3 n
)
+ P6 ~7 K. ^4 j1 E( Z( Q: B. X apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点)
0 {! W5 N8 ?- r& l
. F' r3 I1 q% b7 { 以下是全部代码: x0 M# I: ]4 M. q! t% ]) U
from scipy import stats as st
- i4 U3 l- Z/ T5 S! u3 }. v. e; ?9 h- k
import xarray as xr* ]6 e+ q1 k, U. P0 b# Y: _
import dask
% }2 V( ~0 o# f import numpy as np
* _) O8 F' d0 b& S, ^% t from dask.diagnostics import ProgressBar
( j$ j F, ]% l# O3 }: ~
o0 q0 v" f: x1 t1 x k. |+ Z" \ def norm_fit(data):
0 }8 T8 |, M! b6 D, G% j3 \ loc, scale = st.norm.fit(data)% ^/ ]$ E! k- E
return np.array([loc, scale])0 d5 G" M7 }6 [
& D, ^7 S% m8 _
rs = np.random.RandomState(0)' E* z4 S# x7 k5 H" X
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])
. r* w1 M( A# j) d x = x.chunk({"lev":5, "lat":100,"lon":100}); ?! ?/ |; U3 r. n2 t) I; K
! N, ?4 `4 e$ H
#使用apply_ufunc计算,并用dask的并行计算+ ? D% @' k) ?
result = xr.apply_ufunc(norm_fit,3 I" H- Q' F* \1 T- z9 f3 T! t
x,
: B" X( ] R4 Y8 o input_core_dims=[["time"]],
% D" p) R+ R9 |6 M& N1 p* p" x0 r output_core_dims=[["paras"]],
" g1 ^! n: t9 V dask="parallelized",/ y1 x& b. |) ?9 u
output_dtypes=[np.float],4 N$ }3 \* K7 v" R/ V) P
dask_gufunc_kwargs={output_sizes:{"paras":2}}
( U9 M4 H0 J* b )
) o+ W/ Y$ a8 b9 w# t/ \ x# t" p" L4 G
#compute进行真实的计算并显示进度7 K, o9 s+ `) M7 S6 O
with ProgressBar():
$ ~. L+ u: M0 {$ B4 R, H" ?/ P) h result = results.compute()
! f# k; ]5 A2 ^8 W0 ?
4 G7 v( @6 h2 m( M #结果冲命名保存到nc文件
: y+ s0 Y( t% c2 I0 U* V# A result = result.rename("norm_paras")( O. g! k& C) _5 M/ ~
result = result.to_netcdf("norm_fit_paras.nc",compute=False)% d! Y- t; P7 L4 w
with ProgressBar():) {3 X# Y' E. y0 n$ H/ `
result.compute() ; ^2 {& U6 F0 a9 I
转自:气海同途
- y4 J, |& b/ @, E; W+ \3 a$ P 关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料 |