xarray+dask处理大规模海洋气象数据

[复制链接]

气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。

! v2 @ H6 h4 A$ w' g9 y, Z
; h3 _! l5 c# P- N

大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。

6 t: [; Z/ B4 @ V# O) S) S4 A/ v0 g

本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。

. `3 w; h6 P2 [/ |: y% N( n/ t5 ^; K6 t

数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。

" m" [# ~0 q1 V

其中几个关键点解释一下:

X a$ D5 m% f6 d$ ^6 s+ @

(1)首先定义拟合正太分布的函数

' Y6 v9 q- g5 J
def norm_fit(data): / c/ n. _6 |1 u' W6 r, O loc, scale = st.norm.fit(data)1 x7 j6 B1 r% v+ A5 Q& g+ e return np.array([loc, scale])
% F; b/ q) @* X2 K. O/ Z

这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。

- i/ u# X: D3 }3 x4 Q) L9 Y8 U

(2)xarray分块

8 E& V9 d7 b% Q U8 \
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"]) 8 f0 ]/ w$ n* p/ z W. f x = x.chunk({"lev":5, "lat":100,"lon":100})
2 c$ Q) H8 F( }

xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。

5 {) y8 O! p6 |( a9 z- P0 x

(3)xarray.apply_ufunc函数

1 x3 V7 e( O. \' y% f8 ^# a
result = xr.apply_ufunc(norm_fit,3 y; d6 C) Z6 f L- e% A/ T% Y x, # ?/ u ?4 [/ C+ {: d input_core_dims=[["time"]], $ }5 h, l' ?, ~ output_core_dims=[["paras"]], j) E- B9 r9 J# ?. {- ], } dask="parallelized",% m1 r) B1 J& g6 v0 a( V% S output_dtypes=[np.float],6 M" k, Q3 m+ `/ U6 E! i dask_gufunc_kwargs={output_sizes:{"paras":2}}* j2 U3 H" Q3 {4 n6 ?- G1 N: ] )
& _; X" U: q8 q3 ]/ j! f4 m& P/ y" j3 C

apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点)

1 x$ t( L- Y8 H - s+ L% j3 U& D2 {( z# J, {8 d

以下是全部代码:

0 a& U% T1 W+ R' [
from scipy import stats as st ; S# [7 E0 K1 T % J9 w8 [# `5 [ Y import xarray as xr ; y8 w9 w8 Y) l' x$ @5 Z7 K& [6 f. Q import dask2 A, g9 a! n9 i- ^ import numpy as np2 A: i z! d+ A0 m7 }/ d from dask.diagnostics import ProgressBar A) n* f# e9 G3 G* Y. J* t) Q2 c: V 7 v6 h6 s+ h6 f4 E1 F' ?! n' N3 e def norm_fit(data):' f" o& ~( I: F loc, scale = st.norm.fit(data) 0 S& L7 x7 ~( u5 m0 ^ return np.array([loc, scale]) ) O- y! K6 H2 c 7 a, t1 s8 A8 y* B S: s$ h, v3 _ rs = np.random.RandomState(0)% _/ R x* B* x x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])7 N/ k* |# Y$ ^$ B( o x = x.chunk({"lev":5, "lat":100,"lon":100}) # f9 E. ~- ^1 o3 g. P/ i! x. p# s$ p* A) u9 P #使用apply_ufunc计算,并用dask的并行计算 ' x6 V: B' Z7 o$ D result = xr.apply_ufunc(norm_fit,& T0 ?: u& j/ ?. { x, 1 \: W1 w, P8 ~7 x7 x' D input_core_dims=[["time"]],5 r5 @9 l! A' x! }' G3 O& X output_core_dims=[["paras"]], * u/ a/ E1 z1 i3 _7 @$ \! u dask="parallelized",4 q+ d1 O- J- y) K) n% z" V output_dtypes=[np.float], : h; i5 E+ J4 z. ]' A# H2 g7 }7 K4 O dask_gufunc_kwargs={output_sizes:{"paras":2}} P1 F, F! H% b! C( _+ N4 A w )" }! ]! G8 k3 t' f* K9 l6 \ . ?6 w' U% p5 ^+ {) t0 R #compute进行真实的计算并显示进度9 `! ^. m& S2 h5 X: e9 v7 n with ProgressBar(): q7 \' G+ T; t8 ~" X result = results.compute() 2 Q+ ^1 P% T4 z/ }8 k/ k2 c+ G' j1 M8 z" Y7 o& j" E #结果冲命名保存到nc文件8 u# H. Q% e+ F& u8 r7 D9 c6 W result = result.rename("norm_paras")5 e" q7 H4 v; s. _: Q result = result.to_netcdf("norm_fit_paras.nc",compute=False) " i* b* v1 M4 H2 H% C! _$ O* y, [ with ProgressBar():$ s5 n3 W* F/ d result.compute()
0 T& k- D. b/ E% y

转自:气海同途

7 w" o) Z9 d' m8 R0 W) `) d

关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料

相关帖子

全部回帖
暂无回帖,快来参与回复吧
懒得打字?点击右侧快捷回复 【吾爱海洋论坛发文有奖】
您需要登录后才可以回帖 登录 | 立即注册
各安天命
活跃在2025-1-27
快速回复 返回顶部 返回列表