xarray+dask处理大规模海洋气象数据

[复制链接]

气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。

7 [6 w. r" X) u+ ]1 ^5 l5 z
2 k$ m" {/ ^+ G4 E7 A b. q* V6 L

大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。

3 J* T- U4 o6 {. w+ u- d' }) C. h: [ U- C2 z3 n0 ]

本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。

3 D) O3 q/ @1 [# j7 a( F; w3 C' i6 @. I- w8 i

数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。

; `5 q: V+ S+ Q- G* f8 G8 f! G

其中几个关键点解释一下:

* j7 |- r4 `% H% e3 s

(1)首先定义拟合正太分布的函数

5 k( I' b# O' z( x# G
def norm_fit(data):8 R7 b s- G0 [, o" L# d loc, scale = st.norm.fit(data) ) m( M* Y3 U' O8 p, @: | return np.array([loc, scale])
2 s, E+ F5 p3 z

这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。

5 }' \: F2 X- S( _( g

(2)xarray分块

$ D0 Z1 k( P: P6 K2 \5 u
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"]) ( Q; y+ x$ V& {* \ x = x.chunk({"lev":5, "lat":100,"lon":100})
8 \2 t+ A! T# C

xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。

8 j/ M" B( B1 l/ ^& S

(3)xarray.apply_ufunc函数

$ {1 g' ?9 ?5 @
result = xr.apply_ufunc(norm_fit,' c9 ]* f; T0 S8 z7 R: h& C% ^ x, \% P9 I3 B6 u/ ~# [ input_core_dims=[["time"]], # o: w" s3 ` _( Y output_core_dims=[["paras"]],- c. r. a+ ~% }4 w dask="parallelized", 8 j5 O9 {# N$ i4 d& K" n6 g output_dtypes=[np.float], : a' n/ W) f, s% o dask_gufunc_kwargs={output_sizes:{"paras":2}} ' g; J+ v/ T5 Q" _, x )
' e* H/ x. @& l

apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点)

5 |1 ]! M0 u/ S& g 3 _, d- m, D4 \) {; d: J

以下是全部代码:

7 o- u) q0 A$ P' x! M: Q5 Z F, e
from scipy import stats as st% n/ X/ p/ h1 N6 j; i2 s ]) u 8 P0 V" |4 Y2 R o! F3 t% c* t import xarray as xr# U2 t/ a5 n+ _4 o/ c1 v import dask( m1 c% \9 I- h, h% c% e1 a import numpy as np * Q5 P, i: w' E' k from dask.diagnostics import ProgressBar% l+ Y6 }% c* u+ K! I6 y 2 f2 y/ m' n; c def norm_fit(data): 3 i& T8 h/ h5 O$ J9 F( c loc, scale = st.norm.fit(data) 8 D& Z, j& _# n% C+ r# C0 [ return np.array([loc, scale])+ @+ U e/ ^5 Q; S$ ]3 g. Z& b5 p " a9 b5 X0 M0 k- n; {2 n" E4 n rs = np.random.RandomState(0) * e& u- X+ H4 S* m* C! B: t x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"]) ( u1 G/ x2 H6 d0 M4 e x = x.chunk({"lev":5, "lat":100,"lon":100}) " r9 y8 y% T, b3 |1 v- D' Z7 h9 k# U, a3 h! @9 Y( s" m+ }, \8 O$ X9 o #使用apply_ufunc计算,并用dask的并行计算& u C/ H# f8 q% ^. w+ l, ?& s result = xr.apply_ufunc(norm_fit, / p; \0 `0 f4 ~4 v& D7 f x, " u# U% G/ }2 R% r/ m; z* x input_core_dims=[["time"]], ! \; w2 u6 w. D0 g3 r, Q5 G& W output_core_dims=[["paras"]],; n) i, `: v! P! P6 q dask="parallelized",/ e9 A% P, o4 _ output_dtypes=[np.float],5 {$ t2 p$ f* ^ dask_gufunc_kwargs={output_sizes:{"paras":2}} | n, y* v" x1 `' m# ] ) $ U0 S; r0 w3 |7 I/ O, E6 f7 c * j7 l" [0 c* x$ a9 [- w+ K #compute进行真实的计算并显示进度# O0 ~( d6 o, h5 w; y( d with ProgressBar():& x1 ]+ w# h, g E result = results.compute()2 F- z2 V5 U# I0 o ( [+ M F* P' b7 f #结果冲命名保存到nc文件8 K8 U; A+ T& t6 ^* T result = result.rename("norm_paras") 9 I0 u6 r2 H$ K4 j# i! ?5 K' u0 I" H result = result.to_netcdf("norm_fit_paras.nc",compute=False)9 ]' ]+ [1 I4 _3 K, Y& d* b: I* S with ProgressBar(): & k3 W3 u2 D4 ` result.compute()
0 V) X9 N! `* F

转自:气海同途

& A+ R" S7 }% t

关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料

相关帖子

全部回帖
暂无回帖,快来参与回复吧
懒得打字?点击右侧快捷回复 【吾爱海洋论坛发文有奖】
您需要登录后才可以回帖 登录 | 立即注册
各安天命
活跃在2025-1-27
快速回复 返回顶部 返回列表