xarray+dask处理大规模海洋气象数据

[复制链接]

气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。

7 o0 u1 q( d, H
: N: D& R. J# O! o& S! ^

大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。

3 r8 p3 l* \- K3 I+ K$ `" C9 a, N- J5 B

本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。

; l: _7 M6 F: N- M9 `& w * T$ `# ?( l( Y" M* H/ S% n4 D

数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。

' o9 H% ^* E7 q: ~3 T( ~

其中几个关键点解释一下:

; v4 K7 ?& o3 x) @, Q' t

(1)首先定义拟合正太分布的函数

4 i7 O6 L3 w" `% T; w# d4 Z5 U
def norm_fit(data):$ K: d" l" C7 v0 t. M7 j loc, scale = st.norm.fit(data)& E9 }* T: K9 F return np.array([loc, scale])
( x7 o/ @$ Y3 j5 W0 f

这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。

' m+ i" t' C& d9 t

(2)xarray分块

" ?: R2 v Q+ Z4 |3 }
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"]) ( s2 v+ Y4 v! d1 `. t6 Y& _ x = x.chunk({"lev":5, "lat":100,"lon":100})
9 q! J9 O4 y3 l- L3 A

xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。

1 d3 w0 L: E9 w

(3)xarray.apply_ufunc函数

& B+ {/ p3 V- I0 o6 z
result = xr.apply_ufunc(norm_fit, ! w# J- z6 t( R$ J- R' J6 Y( ^" m x, - b/ B A8 p2 p2 V% x3 G& x input_core_dims=[["time"]], ! w& R2 m5 O& {9 o7 @) g# h4 | output_core_dims=[["paras"]], 8 H. k! ~. U% h, r; x+ C dask="parallelized", - |3 b& k, I7 O/ E7 o output_dtypes=[np.float], * U- Z. X& D, }1 @- Q# j dask_gufunc_kwargs={output_sizes:{"paras":2}} 6 a! L$ B4 l2 p4 |- Q) b O1 D )
1 e4 H: Z4 J o+ p0 I

apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点)

; v! F p6 F. J4 W & W% w, e" }! q- O0 O. s

以下是全部代码:

+ f! [: q' Z- P% a
from scipy import stats as st1 B2 E9 j& C, E0 r# v- S 5 t X6 d: F$ T import xarray as xr - k) ~! J7 @4 d3 B) M* t0 L import dask / h) _( P% T1 O( ^% ]) N7 | import numpy as np8 L7 n# T. f6 l7 Y" K7 g from dask.diagnostics import ProgressBar 9 [( ]" o1 m' q2 X' v, V1 Y/ O+ S2 H5 r' T def norm_fit(data):) {# A: w% ^6 v/ c loc, scale = st.norm.fit(data)8 z! l6 V# p. @$ q& S3 c1 b return np.array([loc, scale])& }* F5 p' h3 K 6 ^" a" M$ q$ N: Z) V6 O* G! e rs = np.random.RandomState(0) " x! m- c& a7 w7 R2 t4 ^ x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])7 V& h" {' |$ _* D x = x.chunk({"lev":5, "lat":100,"lon":100})- [! T: G! L+ A" f: e- N" \ ! A D( w" b( V% s #使用apply_ufunc计算,并用dask的并行计算 8 b$ @/ [# v# z7 ] result = xr.apply_ufunc(norm_fit, , } x+ c' w2 X, g2 o. ?- y x,3 v4 ^- d& `) V: ^' Y0 k c5 l input_core_dims=[["time"]],. y! C9 S# S t: F# j output_core_dims=[["paras"]],5 B0 q' n0 p8 i- Y6 J& u6 t& X1 u dask="parallelized", $ {9 _1 _4 Y4 L: }+ d output_dtypes=[np.float], ; ?* T, l& R- M6 ^ dask_gufunc_kwargs={output_sizes:{"paras":2}} # a- o. U. P& ^" b0 i ) 8 T+ q, A7 x0 s* a+ `0 z T' S$ \' R9 e #compute进行真实的计算并显示进度 " v5 U( m5 A! J" E. r with ProgressBar(): - f- Y9 t7 q% h result = results.compute(), I0 t* T. W0 m* @ ' a, d; V$ _' t- g6 ^; T8 A' y #结果冲命名保存到nc文件4 I. m0 I7 V" B8 X$ \( m" X$ A result = result.rename("norm_paras")1 o1 b* E9 w5 j. B result = result.to_netcdf("norm_fit_paras.nc",compute=False) 8 l* l8 t4 s. O2 D5 W with ProgressBar(): 4 v4 w2 g0 Y9 Z8 h3 ^1 H$ [ result.compute()
5 I ~. q: t1 E- Q& c' Z% s) i6 h

转自:气海同途

/ S# y- U- S8 F+ f, o

关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料

相关帖子

全部回帖
暂无回帖,快来参与回复吧
懒得打字?点击右侧快捷回复 【吾爱海洋论坛发文有奖】
您需要登录后才可以回帖 登录 | 立即注册
各安天命
活跃在2025-1-27
快速回复 返回顶部 返回列表