气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。
7 [6 w. r" X) u+ ]1 ^5 l5 z 2 k$ m" {/ ^+ G4 E7 A b. q* V6 L
大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。
3 J* T- U4 o6 {. w+ u- d' }) C. h: [ U- C2 z3 n0 ]
本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。
3 D) O3 q/ @1 [# j7 a( F; w3 C' i6 @. I- w8 i
数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。 ; `5 q: V+ S+ Q- G* f8 G8 f! G
其中几个关键点解释一下: * j7 |- r4 `% H% e3 s
(1)首先定义拟合正太分布的函数
5 k( I' b# O' z( x# G def norm_fit(data):8 R7 b s- G0 [, o" L# d
loc, scale = st.norm.fit(data)
) m( M* Y3 U' O8 p, @: | return np.array([loc, scale])
2 s, E+ F5 p3 z 这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。 5 }' \: F2 X- S( _( g
(2)xarray分块
$ D0 Z1 k( P: P6 K2 \5 u x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])
( Q; y+ x$ V& {* \ x = x.chunk({"lev":5, "lat":100,"lon":100})
8 \2 t+ A! T# C xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。 8 j/ M" B( B1 l/ ^& S
(3)xarray.apply_ufunc函数
$ {1 g' ?9 ?5 @ result = xr.apply_ufunc(norm_fit,' c9 ]* f; T0 S8 z7 R: h& C% ^
x, \% P9 I3 B6 u/ ~# [
input_core_dims=[["time"]],
# o: w" s3 ` _( Y output_core_dims=[["paras"]],- c. r. a+ ~% }4 w
dask="parallelized",
8 j5 O9 {# N$ i4 d& K" n6 g output_dtypes=[np.float],
: a' n/ W) f, s% o dask_gufunc_kwargs={output_sizes:{"paras":2}}
' g; J+ v/ T5 Q" _, x ) ' e* H/ x. @& l
apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点) 5 |1 ]! M0 u/ S& g
3 _, d- m, D4 \) {; d: J 以下是全部代码: 7 o- u) q0 A$ P' x! M: Q5 Z F, e
from scipy import stats as st% n/ X/ p/ h1 N6 j; i2 s ]) u
8 P0 V" |4 Y2 R o! F3 t% c* t
import xarray as xr# U2 t/ a5 n+ _4 o/ c1 v
import dask( m1 c% \9 I- h, h% c% e1 a
import numpy as np
* Q5 P, i: w' E' k from dask.diagnostics import ProgressBar% l+ Y6 }% c* u+ K! I6 y
2 f2 y/ m' n; c
def norm_fit(data):
3 i& T8 h/ h5 O$ J9 F( c loc, scale = st.norm.fit(data)
8 D& Z, j& _# n% C+ r# C0 [ return np.array([loc, scale])+ @+ U e/ ^5 Q; S$ ]3 g. Z& b5 p
" a9 b5 X0 M0 k- n; {2 n" E4 n rs = np.random.RandomState(0)
* e& u- X+ H4 S* m* C! B: t x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])
( u1 G/ x2 H6 d0 M4 e x = x.chunk({"lev":5, "lat":100,"lon":100})
" r9 y8 y% T, b3 |1 v- D' Z7 h9 k# U, a3 h! @9 Y( s" m+ }, \8 O$ X9 o
#使用apply_ufunc计算,并用dask的并行计算& u C/ H# f8 q% ^. w+ l, ?& s
result = xr.apply_ufunc(norm_fit,
/ p; \0 `0 f4 ~4 v& D7 f x,
" u# U% G/ }2 R% r/ m; z* x input_core_dims=[["time"]],
! \; w2 u6 w. D0 g3 r, Q5 G& W output_core_dims=[["paras"]],; n) i, `: v! P! P6 q
dask="parallelized",/ e9 A% P, o4 _
output_dtypes=[np.float],5 {$ t2 p$ f* ^
dask_gufunc_kwargs={output_sizes:{"paras":2}}
| n, y* v" x1 `' m# ] )
$ U0 S; r0 w3 |7 I/ O, E6 f7 c
* j7 l" [0 c* x$ a9 [- w+ K #compute进行真实的计算并显示进度# O0 ~( d6 o, h5 w; y( d
with ProgressBar():& x1 ]+ w# h, g E
result = results.compute()2 F- z2 V5 U# I0 o
( [+ M F* P' b7 f
#结果冲命名保存到nc文件8 K8 U; A+ T& t6 ^* T
result = result.rename("norm_paras")
9 I0 u6 r2 H$ K4 j# i! ?5 K' u0 I" H result = result.to_netcdf("norm_fit_paras.nc",compute=False)9 ]' ]+ [1 I4 _3 K, Y& d* b: I* S
with ProgressBar():
& k3 W3 u2 D4 ` result.compute()
0 V) X9 N! `* F 转自:气海同途
& A+ R" S7 }% t 关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料 |