收藏本站 劰载中...网站公告 | 吾爱海洋论坛交流QQ群:835383472

xarray+dask处理大规模海洋气象数据

[复制链接]

气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。

& X7 ?, L8 H0 h5 L" l, X
4 q; }- X( ?7 ?! V' M

大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。

8 Q6 V* _* C; W5 f9 m! I& O$ K - B" O% k) Q2 G

本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。

9 D9 o' C8 g4 U " o, W& O ^2 `' M7 w7 h

数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。

9 }3 ? J) l5 o7 P9 R* P

其中几个关键点解释一下:

$ }5 A6 ~! ~, V9 y1 l9 q( w

(1)首先定义拟合正太分布的函数

; F. T% p) C# S# M i
def norm_fit(data): / n" ~0 J V( U1 z ~1 @ loc, scale = st.norm.fit(data). S X8 o2 N% W2 @' n; w' t return np.array([loc, scale])
* K& Q5 F2 j; J7 R$ H2 g

这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。

8 x5 Y+ m! P* F! p$ }( ^0 `- T

(2)xarray分块

^2 ]6 B( `6 ~* C; [
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"]) / U& g7 D+ P6 H& h8 @ x = x.chunk({"lev":5, "lat":100,"lon":100})
9 {$ r, g* f& T" }

xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。

8 T$ K; S8 }( _. \

(3)xarray.apply_ufunc函数

% a* D4 G) l; a) \
result = xr.apply_ufunc(norm_fit, 0 L7 L1 N7 K! V& S0 C" g x, 8 \& _; @$ z D1 x; z+ @ input_core_dims=[["time"]], : U- ]& f* T" n& s' N, m output_core_dims=[["paras"]],/ a# a) @9 K5 Y# S/ h! G dask="parallelized", 5 \2 j) ?" n7 d, E output_dtypes=[np.float], * d: {0 v2 U0 V4 q( l, ^- J dask_gufunc_kwargs={output_sizes:{"paras":2}}- Y: [+ n; O8 b# N )
$ t4 t' ?' Q* d( w7 h

apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点)

$ @! X* I/ Q" _9 {) T% r0 ^/ t# d+ B3 D

以下是全部代码:

% j+ A+ S! d2 N$ f7 P+ x
from scipy import stats as st9 r! Q: L( `. ]6 ~! G % |9 l( f/ F9 _7 g5 i import xarray as xr- T1 B# z$ Y5 L& ~/ c1 { import dask3 u; f) H/ G0 ~8 ~2 _ import numpy as np# l8 R: S8 ~: m. s* Y7 y from dask.diagnostics import ProgressBar + f, K; y2 O& o; B" j& a* `2 N1 W" _. Z3 b2 Q def norm_fit(data):& w& P: [2 p" f; B# t t loc, scale = st.norm.fit(data) + s( }. Z7 [* H- V( E- B return np.array([loc, scale]) l+ c8 p( G; z8 T+ g% K" L( ^ l ) }+ W) `6 _" E- j/ l rs = np.random.RandomState(0)9 }+ o' Q+ r. I x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"]) 0 q1 ?; Y: j9 ^- { x = x.chunk({"lev":5, "lat":100,"lon":100})5 @$ H9 x# p, V$ O- H C2 v: l+ I- i( \/ W! M Z: ?# n #使用apply_ufunc计算,并用dask的并行计算2 B) o+ J$ j& O! M result = xr.apply_ufunc(norm_fit,. o! k7 R- e! [1 W0 a/ f C x, # j# n$ F1 W* d/ W5 @; [ input_core_dims=[["time"]],5 M8 k! Y# C( r1 }; }) C output_core_dims=[["paras"]], % M+ B3 g' Q \ dask="parallelized", ! J4 x" k: ]7 T output_dtypes=[np.float], [1 f3 E6 `# T9 S dask_gufunc_kwargs={output_sizes:{"paras":2}} 2 C7 {. }7 @ _2 M& ^* B ) / W# Y4 }$ Z6 K% W3 a' T; d/ u+ M7 J9 W' P$ q #compute进行真实的计算并显示进度- X, f1 K7 Q! ?$ ~+ k! t" u: h0 ] with ProgressBar():: k3 W, P6 p0 s" u. X result = results.compute()( m+ d( D) P7 e - Q3 p1 o+ I) j #结果冲命名保存到nc文件 " t( H5 M" C1 y% H result = result.rename("norm_paras")2 E3 t: W" C: i1 O) l8 A result = result.to_netcdf("norm_fit_paras.nc",compute=False)$ R* f8 Y$ Q5 Q; o/ i2 z with ProgressBar():/ C+ z# r" K X. L" U result.compute()
; R5 S( F r% I+ S3 ` X+ }8 x

转自:气海同途

2 `+ M) H! T2 H( Y. g

关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料

回复

举报 使用道具

相关帖子

全部回帖
暂无回帖,快来参与回复吧
懒得打字?点击右侧快捷回复 【吾爱海洋论坛发文有奖】
您需要登录后才可以回帖 登录 | 立即注册
各安天命
活跃在2026-2-7
快速回复 返回顶部 返回列表