xarray+dask处理大规模海洋气象数据

[复制链接]

气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。

% ~% [$ {1 a% s" H7 n
+ j. M n) I9 f7 {( F! `

大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。

. s- F7 k% w2 Y ! j+ K' k2 g/ E. l: `) J

本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。

9 ]8 f2 u- d4 X: f7 e& F. _6 X ^4 b2 d5 U# y" S4 @

数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。

4 y" S" D) E% w7 z, {0 E) f4 n

其中几个关键点解释一下:

% s: ?$ U3 c8 b

(1)首先定义拟合正太分布的函数

* _: c: `+ @8 H- T! T
def norm_fit(data): 9 c) t/ d# l1 X loc, scale = st.norm.fit(data) $ g" H; n& t3 l# M5 K- [4 R return np.array([loc, scale])
7 S, o; y9 V% W) K& F+ e! g

这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。

, H( x9 {/ x9 N* \0 d& Y2 e

(2)xarray分块

( S& G, r9 q7 r
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"]) % b$ O4 n/ K5 n9 O; O) ~ x = x.chunk({"lev":5, "lat":100,"lon":100})
; E1 u! u) l. o5 l! D( M

xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。

( [4 u' _7 T# R3 l. B- }

(3)xarray.apply_ufunc函数

t5 E# }5 W1 M0 P9 f
result = xr.apply_ufunc(norm_fit, 2 a" F: N% A5 j7 e x,5 w* e; q0 ?6 M m; n: X input_core_dims=[["time"]], ( |3 f* }3 t }' u0 a output_core_dims=[["paras"]], % {- |; }$ x9 N" r/ u% e dask="parallelized",8 ]* W, d. ^% {2 ^2 b- U p! e( b/ F output_dtypes=[np.float], $ T$ p* y7 d7 u- f* q( ~6 g dask_gufunc_kwargs={output_sizes:{"paras":2}} # \( R! g" Q+ _5 g )
" B0 o7 H+ F1 ^+ |

apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点)

/ S/ H. I# r9 d) `2 {+ a7 |0 W( p7 u2 e5 Q1 y5 S& w% H

以下是全部代码:

0 L( ]( J% \5 F2 I; a
from scipy import stats as st & g0 L. {9 R m& H: F) r9 w: T0 v$ |6 |- J$ ? import xarray as xr9 Q7 S# L4 A) j9 g0 J7 j. ?. J import dask ( m7 O8 Y! V! r import numpy as np# A& ^+ V. H, H+ w" D from dask.diagnostics import ProgressBar, i6 Q/ E% ?% w 7 X" d/ ^8 ~: C def norm_fit(data): 4 ], ?9 v5 v$ ~) i loc, scale = st.norm.fit(data)2 p4 T. l* t4 ]& A return np.array([loc, scale]) ' @: z, v7 U0 k6 l M0 M: A( G% M8 r$ M+ Q rs = np.random.RandomState(0)- m/ u! t+ S/ G! j1 g3 S x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"]) 2 D( Q. a5 p0 e, z- o x = x.chunk({"lev":5, "lat":100,"lon":100}) * ?* M2 \7 a6 g* e7 M- |8 ]$ t( H$ c* A( E+ G( D" |) S5 a: p #使用apply_ufunc计算,并用dask的并行计算 ! Q9 p9 }% G6 }, M4 x5 j9 u# q result = xr.apply_ufunc(norm_fit, 2 X0 V) j, \! c) k# d x,8 x/ a! k% b% ^0 X* f5 |9 Y6 J input_core_dims=[["time"]],; T% G% s, q* m, S2 A7 L- Z output_core_dims=[["paras"]],5 A) y' m0 `2 j( q dask="parallelized", / J9 |4 i7 L! W/ w% b. r output_dtypes=[np.float], : u$ i! A! K( j dask_gufunc_kwargs={output_sizes:{"paras":2}}4 }5 ~9 x* x" E% F0 T. V+ ]3 [ )% b1 K4 ?: _' `8 V* C5 O# Y4 x8 d- G" } / l# }7 D* R- W, ~: U) L #compute进行真实的计算并显示进度9 R2 i, |5 ?& u! z0 d" ] with ProgressBar(): 7 I# l, `* A H5 e result = results.compute() & ` ]+ l* |. m, S. S/ X1 ]: \# N7 _0 V+ F- C7 d #结果冲命名保存到nc文件 + ]" d2 o$ N9 c, `, A result = result.rename("norm_paras")& F8 E) K+ |3 ~2 ~; h result = result.to_netcdf("norm_fit_paras.nc",compute=False) ' `4 ?/ Y [; ?! |$ S with ProgressBar(): 4 T+ C- w* v& c8 D3 p$ t$ ?9 L result.compute()
- {- Z( v( H' T" I! J. I% ]/ F

转自:气海同途

4 [) Q' R# T( u7 X1 ~% x7 ^6 }

关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料

回复

举报 使用道具

相关帖子

全部回帖
暂无回帖,快来参与回复吧
懒得打字?点击右侧快捷回复 【吾爱海洋论坛发文有奖】
您需要登录后才可以回帖 登录 | 立即注册
各安天命
活跃在5 小时前
快速回复 返回顶部 返回列表