收藏本站 劰载中...网站公告 | 吾爱海洋论坛交流QQ群:835383472

xarray+dask处理大规模海洋气象数据

[复制链接]

气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。

3 S) l) x9 v6 w P7 q
$ U6 V4 M+ w+ [, D/ I

大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。

% V! Q) C' I; V' O1 C* q $ N) o" K, W- s4 u: S

本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。

- p, S0 _* U! N. }* }7 K8 |8 j2 c1 l7 h6 c

数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。

) V1 R0 U( }& d# g. i% C

其中几个关键点解释一下:

" B! P( I% N0 o0 p% e

(1)首先定义拟合正太分布的函数

: A0 l$ B a/ y. @9 ~
def norm_fit(data):9 \7 ]; \( |8 d* C$ `6 J loc, scale = st.norm.fit(data)4 m( w7 J% E6 Q/ k% j& h return np.array([loc, scale])
8 x3 U0 Y% u6 J! l

这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。

/ M$ |& X- [) V* K; E" b

(2)xarray分块

% @8 T9 q |1 V' w: q7 D
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])% L' y' Y7 ~" n3 y+ P# ^ x = x.chunk({"lev":5, "lat":100,"lon":100})
2 D1 ^& A( O- K8 i: x4 W

xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。

5 A( v( s% S* w! r2 o9 C/ O

(3)xarray.apply_ufunc函数

f+ m0 P# V- z
result = xr.apply_ufunc(norm_fit, 1 }0 T' r/ U, I( E x,* M) X: h* K* @7 L, ]3 @! U( F4 r input_core_dims=[["time"]],% o0 y0 D" @0 |! ?# R$ k3 P- `$ I/ g output_core_dims=[["paras"]],9 R, t. S5 A: X' k) H* _ dask="parallelized", ! }' i! y- m& | v5 @2 Z output_dtypes=[np.float],. w7 e/ z H0 z" e* r dask_gufunc_kwargs={output_sizes:{"paras":2}}" j& U5 n8 j, j' E, L )
0 [% P% W1 N. q' K) d

apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点)

, C$ L9 k9 } j! i8 f# b ; t* Q# Z! R* K) q$ s. h" v; N5 g

以下是全部代码:

) {3 J7 ^- q7 ]2 o1 ?# C9 @) d) R
from scipy import stats as st * b: }! Z4 }. R+ X4 l/ ?, G" N : E6 z1 C% F0 O5 j, e3 Y* d import xarray as xr : O* c2 S$ G) F; K import dask9 P& J3 H( X* u, z import numpy as np ) }# k9 p+ ~! U, v from dask.diagnostics import ProgressBar 0 I0 L+ j% s. |: ~2 {* h( }* n8 P6 \5 q; g1 q- P def norm_fit(data): ( H" `# }4 x% E" O7 E- b, C( d+ p- u9 G loc, scale = st.norm.fit(data)+ h1 m+ G/ B. b' @' f$ {& D return np.array([loc, scale])+ N/ p! F; a. A2 R # H7 p& x3 h% N" I$ ? rs = np.random.RandomState(0). \: X. Q5 s+ b y8 K' i) ?9 h9 F x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"]); q! A' x0 O8 a3 f/ { x = x.chunk({"lev":5, "lat":100,"lon":100}) * e0 @* W* u1 x8 a ; P. @" Y/ B- F* @3 R2 B #使用apply_ufunc计算,并用dask的并行计算" N6 E1 }; p+ y; g9 D0 G D, w5 i$ l( @. N result = xr.apply_ufunc(norm_fit, ! \ ?# g$ p) S j x,3 |& y- l: k, ], m input_core_dims=[["time"]],7 `2 C i: D8 R* ?" \ output_core_dims=[["paras"]],5 g( D/ s( V, n' o dask="parallelized", 9 }& V8 I1 l+ L1 N4 F( { output_dtypes=[np.float], % X4 j6 h6 o6 ^2 m. ~+ r& F! a dask_gufunc_kwargs={output_sizes:{"paras":2}} - m8 |# n+ `% W7 N. E( H! e, ` )8 I! V& a! Q; u2 E 6 ^) q, ^$ y5 Y' p. i2 e #compute进行真实的计算并显示进度 * z( U- N" ]: I A* W& H! y! Y2 H2 ] with ProgressBar(): + m9 k+ j9 e( m$ H/ A result = results.compute()9 l: i2 \* u! B( N$ \2 l r! P % s( L5 `* ~& a' K" S #结果冲命名保存到nc文件' Z ^" O' Q+ z8 Z, X result = result.rename("norm_paras")7 G! j- x. a: r+ ` result = result.to_netcdf("norm_fit_paras.nc",compute=False). f$ X% y2 K/ F5 \0 |3 o& ` with ProgressBar(): # _! i' _, p3 ?: E4 r2 s& n' I result.compute()
& r- c( H, u9 f- e; V. v: i% O

转自:气海同途

4 j# A/ S. s; W1 }8 s2 T+ K2 s

关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料

回复

举报 使用道具

相关帖子

全部回帖
暂无回帖,快来参与回复吧
懒得打字?点击右侧快捷回复 【吾爱海洋论坛发文有奖】
您需要登录后才可以回帖 登录 | 立即注册
各安天命
活跃在2026-4-7
快速回复 返回顶部 返回列表