收藏本站 劰载中...网站公告 | 吾爱海洋论坛交流QQ群:835383472

xarray+dask处理大规模海洋气象数据

[复制链接]

气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。

1 `, [4 H( Y) q% O9 D6 z
- R1 n7 ^2 ]. T

大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。

, {- W% l, G9 m: G, N0 ~1 o# Z; ^0 e. T, V+ @8 K

本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。

" d) F2 m+ F4 E5 T. K( I3 q5 h; b

数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。

8 Y( e( J( z' h. L- Q, F

其中几个关键点解释一下:

$ T( |4 E: [; q# P) C: C. b

(1)首先定义拟合正太分布的函数

* M; G" T& W) I6 B; {& c8 H Z
def norm_fit(data):; H* c: H1 I# i5 E+ Q loc, scale = st.norm.fit(data)6 Z9 [( N4 z$ f8 C A return np.array([loc, scale])
$ c# n- K% n M5 `# i* |% e# d

这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。

3 X$ ?2 u+ P# q2 f$ I$ u; }

(2)xarray分块

$ r; _( w: r+ [7 Y9 w
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"]) ) ~+ V: {9 m7 J" o- f$ v x = x.chunk({"lev":5, "lat":100,"lon":100})
$ d' t9 C% L, z

xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。

. m5 y# M2 f6 N6 j) @$ i; e

(3)xarray.apply_ufunc函数

r+ T, H0 E; b- ^# O' _
result = xr.apply_ufunc(norm_fit, ( I5 S0 Y& m1 } x, S5 `, f3 \ a3 X2 S input_core_dims=[["time"]], % a3 ]6 r4 ^% }1 C5 c output_core_dims=[["paras"]]," O9 f, L+ J( p dask="parallelized", ; |' T: j& |6 I4 r, A output_dtypes=[np.float],: z4 Y) M. R6 | dask_gufunc_kwargs={output_sizes:{"paras":2}}& e: v0 _; T5 ?6 a T )
/ V& ?5 i8 o" Z" D- ]

apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点)

( ~& [; h( v9 k+ f6 p" H" a 4 O/ r" u6 \- }* @2 e

以下是全部代码:

1 F6 l# Y. D1 M4 o0 v" s+ H/ x4 S6 P
from scipy import stats as st % ^# G" `7 p9 y0 m! x ( I" w- k7 k, h$ i import xarray as xr: D/ o- x, m. I5 T7 e import dask 3 m* E1 P2 t2 A! Y7 V- j) q import numpy as np 9 M. ~/ J9 }2 t1 e5 f: C from dask.diagnostics import ProgressBar ) J9 R4 A6 O- p- R2 A8 V9 u' R; L def norm_fit(data): 0 B: ~' A7 G3 K# }' K$ H" l loc, scale = st.norm.fit(data) ( x' w, S* I9 s. u u9 ] return np.array([loc, scale]) & [0 Y9 V) C& \) ]5 S& A1 T) S# n8 G9 { rs = np.random.RandomState(0) x7 e M& {' H: o9 _ x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"]) 5 F- Z- ]) {- \8 s x = x.chunk({"lev":5, "lat":100,"lon":100}) ; P5 t0 [" Z" q 0 |8 W+ k/ A1 [5 d2 @ #使用apply_ufunc计算,并用dask的并行计算6 Z5 ^. l6 ?7 t$ M2 m- X" R result = xr.apply_ufunc(norm_fit,$ ^5 Q/ n; [0 _" [! w x, : H2 S' `- n8 K6 |" Q) Q input_core_dims=[["time"]], + t/ x$ C: a7 M2 `! a output_core_dims=[["paras"]],) [8 `! E, Y9 z8 a4 F' R dask="parallelized", " G9 S6 U1 ?+ |; L% T. e# h6 @2 i output_dtypes=[np.float],9 o6 H3 a3 ]$ h8 { dask_gufunc_kwargs={output_sizes:{"paras":2}} 0 a a# d. a0 ]+ v0 u, ~3 a# k )0 V/ w4 A& C8 m) U # {0 r; Q( k V& g) f #compute进行真实的计算并显示进度" j; K$ F- K7 d1 Y2 R7 J8 F with ProgressBar(): * i) y* N% T7 Z% O& ^8 ^( Z& U result = results.compute() ' l' C- \9 u8 m9 i & C5 n/ Z" I8 k# @ #结果冲命名保存到nc文件 0 o# s. D% @" @% o result = result.rename("norm_paras")8 Q6 Q g8 Q! Z- q d result = result.to_netcdf("norm_fit_paras.nc",compute=False)- T3 ?2 O4 a1 z8 w3 z with ProgressBar():- U) x% ^2 t1 u) a$ K5 J% S result.compute()
$ z: ^* m& G! C2 A: l4 @

转自:气海同途

! d+ U9 s( s$ \

关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料

回复

举报 使用道具

相关帖子

全部回帖
暂无回帖,快来参与回复吧
懒得打字?点击右侧快捷回复 【吾爱海洋论坛发文有奖】
您需要登录后才可以回帖 登录 | 立即注册
各安天命
活跃在4 天前
快速回复 返回顶部 返回列表