xarray+dask处理大规模海洋气象数据

[复制链接]

气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。

! g1 e8 f1 b Z+ u) z
- ]; a: F2 V' E% \4 x

大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。

' b9 O3 W4 P1 I+ G1 | 2 f D- U2 w+ X

本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。

! R+ O2 E& W0 H2 E9 ]2 @& i, k 4 w, q) c$ p, k+ |4 v) O' e/ `3 Z: e

数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。

* ?+ Y* k% D# s

其中几个关键点解释一下:

/ C$ u& V( g: T& m& j

(1)首先定义拟合正太分布的函数

+ r$ h! d$ P. F3 ~6 r1 m
def norm_fit(data): 9 h, J- W/ p) ~4 _- m2 j+ A loc, scale = st.norm.fit(data) 7 U3 T$ ^5 z0 z5 @: N- \3 ^ return np.array([loc, scale])
1 Z2 G' [4 a; `1 s

这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。

8 `* q8 q# H; T* z3 X

(2)xarray分块

# K; v: a6 l# k/ E9 B
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"]) ) ~* x$ r! \% R8 E1 A x = x.chunk({"lev":5, "lat":100,"lon":100})
+ N: b U$ f. {$ s" m: j

xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。

9 E4 G1 X$ x. M2 k: F3 F$ \: g

(3)xarray.apply_ufunc函数

7 A" y& M# u( [* v y# M
result = xr.apply_ufunc(norm_fit, ( F/ z+ w6 m$ w" S% ]1 O. x% L x,/ w. }* ]$ D) Q# B4 k input_core_dims=[["time"]],, A0 W2 U+ z4 b- d( A output_core_dims=[["paras"]], A5 H: G) J! f8 M1 X) Z dask="parallelized",( }. b1 B0 R9 m, z* S output_dtypes=[np.float], ; @; j9 x8 i: U9 f4 N( t dask_gufunc_kwargs={output_sizes:{"paras":2}}0 y1 ?5 t% d) M) B9 x$ z; X )
" ?4 e7 x9 p% |: m* ^

apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点)

' y/ ^" S# X' l" B 8 W7 T3 b% [! N) Z) s9 C$ ^

以下是全部代码:

$ {+ R/ n* E- S+ g
from scipy import stats as st0 E4 g3 a( @& d 8 \9 A; B$ e6 ^& n9 X1 ?+ z import xarray as xr8 q% V1 R6 Z" F9 i+ \% V7 M import dask% W$ c. k2 j6 U9 i" U' k. h import numpy as np * Z3 W( o3 P* y8 Q2 S from dask.diagnostics import ProgressBar7 z3 y, n4 w( E9 [ [ - R- K. z- C& _& A4 c+ p7 t def norm_fit(data): 0 T5 J, |. r, p7 r loc, scale = st.norm.fit(data) 0 f0 ]( ?; t/ F9 g4 i* C1 v! b. T return np.array([loc, scale])8 d) H P6 ]5 Z- U2 q & R. H# d6 `) I+ N rs = np.random.RandomState(0) ) e( T' Z! W8 f x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])5 T& J' T& D4 v$ @4 B x = x.chunk({"lev":5, "lat":100,"lon":100}) , C' P% D8 q5 ^ P- I; R# @ G( t: b% G. O* Q+ d/ [0 Q# Z #使用apply_ufunc计算,并用dask的并行计算 : @6 c+ \( I- l( ?: S result = xr.apply_ufunc(norm_fit,8 e$ R t3 a& C# k g2 }5 u x,/ h) R' m4 u0 u9 Q' J9 A: N) k input_core_dims=[["time"]],# k7 ]0 k7 A7 o/ D' d& r7 e- S( H output_core_dims=[["paras"]], # c9 X8 p: h. v1 _- k dask="parallelized",- Q5 d, S" N2 P z' s( Y output_dtypes=[np.float], 9 B5 P. Q6 I/ K) w) {7 | dask_gufunc_kwargs={output_sizes:{"paras":2}} + n m! L% S3 w1 M u )! b s8 E: D6 E- P G! P* ~& i/ | #compute进行真实的计算并显示进度# T/ W1 |, N' x! b; P9 J/ J with ProgressBar(): & k, i3 L# N) a( N9 t result = results.compute() 1 V% k; b7 W2 @* _; {8 e! u # ^/ a! m, Z6 G3 q #结果冲命名保存到nc文件) l& V0 m* B) r! W8 M3 T result = result.rename("norm_paras")0 m) y0 T9 T" n0 P0 T6 x1 W result = result.to_netcdf("norm_fit_paras.nc",compute=False) & _, g8 ^4 g' a1 q$ O3 u with ProgressBar():. p6 n, {: N- R) U# S result.compute()
. o. {+ S7 h! ~" h/ |: b0 N

转自:气海同途

1 p6 {6 S; ]# g

关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料

回复

举报 使用道具

相关帖子

全部回帖
暂无回帖,快来参与回复吧
懒得打字?点击右侧快捷回复 【吾爱海洋论坛发文有奖】
您需要登录后才可以回帖 登录 | 立即注册
各安天命
活跃在5 小时前
快速回复 返回顶部 返回列表