xarray+dask处理大规模海洋气象数据

[复制链接]

气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。

6 W' ]! h2 o+ A8 a" U
0 d% V$ B. ]; n" p5 j. G

大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。

4 x' B- u4 d \! F0 |( U) F, B + z8 ^( F8 b j* W0 b3 A. T

本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。

3 I5 V' k5 G- }! z, Y1 B2 C 2 G, O; }% w: V; ]. }6 K) F

数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。

7 e7 W4 u6 I2 R

其中几个关键点解释一下:

) W2 F; b) ]- I

(1)首先定义拟合正太分布的函数

, g1 {0 d, q8 M T, I8 Q( c* ~
def norm_fit(data):) D" t; J0 e+ Y; S" d% ] loc, scale = st.norm.fit(data) ! ~! N4 n: {5 O4 e2 @ return np.array([loc, scale])
3 P4 a# T$ F7 t2 _- d

这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。

, ]/ [! H# Z. l4 R% `7 [5 v

(2)xarray分块

. W4 ?8 A4 n! K9 D6 p
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])' i' Y& j& r3 M x = x.chunk({"lev":5, "lat":100,"lon":100})
; d3 ?0 k. \0 i% K

xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。

- |+ T1 R' F1 o* k8 m

(3)xarray.apply_ufunc函数

9 y7 p) W0 u+ x2 e8 |$ Y
result = xr.apply_ufunc(norm_fit,1 R6 J2 c" _4 F x,4 h( F5 h9 s% Y! r8 R, \ input_core_dims=[["time"]], h( F5 C: p# U+ b8 G& H output_core_dims=[["paras"]], 3 c% J; }" O2 b2 | dask="parallelized", * J' m! M3 D3 w( u/ s& m2 k output_dtypes=[np.float], # F) T" E( O7 V4 f1 y6 S dask_gufunc_kwargs={output_sizes:{"paras":2}}4 R( R5 {8 Y3 n )
+ P6 ~7 K. ^4 j1 E( Z( Q: B. X

apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点)

0 {! W5 N8 ?- r& l . F' r3 I1 q% b7 {

以下是全部代码:

x0 M# I: ]4 M. q! t% ]) U
from scipy import stats as st - i4 U3 l- Z/ T5 S! u3 }. v. e; ?9 h- k import xarray as xr* ]6 e+ q1 k, U. P0 b# Y: _ import dask % }2 V( ~0 o# f import numpy as np * _) O8 F' d0 b& S, ^% t from dask.diagnostics import ProgressBar ( j$ j F, ]% l# O3 }: ~ o0 q0 v" f: x1 t1 x k. |+ Z" \ def norm_fit(data): 0 }8 T8 |, M! b6 D, G% j3 \ loc, scale = st.norm.fit(data)% ^/ ]$ E! k- E return np.array([loc, scale])0 d5 G" M7 }6 [ & D, ^7 S% m8 _ rs = np.random.RandomState(0)' E* z4 S# x7 k5 H" X x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"]) . r* w1 M( A# j) d x = x.chunk({"lev":5, "lat":100,"lon":100}); ?! ?/ |; U3 r. n2 t) I; K ! N, ?4 `4 e$ H #使用apply_ufunc计算,并用dask的并行计算+ ? D% @' k) ? result = xr.apply_ufunc(norm_fit,3 I" H- Q' F* \1 T- z9 f3 T! t x, : B" X( ] R4 Y8 o input_core_dims=[["time"]], % D" p) R+ R9 |6 M& N1 p* p" x0 r output_core_dims=[["paras"]], " g1 ^! n: t9 V dask="parallelized",/ y1 x& b. |) ?9 u output_dtypes=[np.float],4 N$ }3 \* K7 v" R/ V) P dask_gufunc_kwargs={output_sizes:{"paras":2}} ( U9 M4 H0 J* b ) ) o+ W/ Y$ a8 b9 w# t/ \ x# t" p" L4 G #compute进行真实的计算并显示进度7 K, o9 s+ `) M7 S6 O with ProgressBar(): $ ~. L+ u: M0 {$ B4 R, H" ?/ P) h result = results.compute() ! f# k; ]5 A2 ^8 W0 ? 4 G7 v( @6 h2 m( M #结果冲命名保存到nc文件 : y+ s0 Y( t% c2 I0 U* V# A result = result.rename("norm_paras")( O. g! k& C) _5 M/ ~ result = result.to_netcdf("norm_fit_paras.nc",compute=False)% d! Y- t; P7 L4 w with ProgressBar():) {3 X# Y' E. y0 n$ H/ ` result.compute()
; ^2 {& U6 F0 a9 I

转自:气海同途

- y4 J, |& b/ @, E; W+ \3 a$ P

关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料

回复

举报 使用道具

相关帖子

全部回帖
暂无回帖,快来参与回复吧
懒得打字?点击右侧快捷回复 【吾爱海洋论坛发文有奖】
您需要登录后才可以回帖 登录 | 立即注册
各安天命
活跃在2025-1-27
快速回复 返回顶部 返回列表