xarray+dask处理大规模海洋气象数据

[复制链接]

气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。

6 K B9 t; E# U% o6 j
) C: Q/ i5 N Q+ E) \

大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。

/ p1 {8 j$ j7 q" z 9 R7 x% z) L/ {+ m7 b

本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。

0 R; J4 n' u: V* U# }' N: @ 0 ]7 B$ i1 y3 T# N# H# ^* S) r* D

数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。

) Y1 y! S7 l( G+ O) ^0 H( ?

其中几个关键点解释一下:

1 n) P$ |2 m7 Y

(1)首先定义拟合正太分布的函数

/ d6 d9 P( E9 [9 _* P* S
def norm_fit(data):! \% n9 n6 I+ P1 \ f# M( i; K" U loc, scale = st.norm.fit(data) 8 z$ V- h. J% _ return np.array([loc, scale])
; e: z1 g9 F# G/ P g( Y( D

这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。

8 r* `- O) d3 v# o0 j" l1 \* A

(2)xarray分块

* J9 G3 u& E9 V4 t( | |
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])6 `# ]! m1 W2 J% L+ l x = x.chunk({"lev":5, "lat":100,"lon":100})
# X) L/ \" w6 V$ ^+ F

xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。

4 m3 l+ k+ J6 r) ~3 E& _; z

(3)xarray.apply_ufunc函数

% j7 i/ u* `$ A7 V5 C; j
result = xr.apply_ufunc(norm_fit,: K1 C1 @9 o c+ o# y x, 2 ]* X$ z* f( ^/ }) Q- b1 N input_core_dims=[["time"]],2 w! ~: M2 o. |- L9 b! Z3 ~% S output_core_dims=[["paras"]],8 M8 P) W1 L8 K, H dask="parallelized",. Q4 |9 S ~5 Z+ i0 m: d output_dtypes=[np.float], z3 X4 X9 t/ v( N, l* u% ^6 i% u- N dask_gufunc_kwargs={output_sizes:{"paras":2}} F( H6 V2 y' f7 g )
; i6 f3 a# N+ }8 b4 s3 t6 m3 {

apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点)

9 k" A$ n2 P: @3 z7 J) m( q, W. K* A6 [

以下是全部代码:

% I" d1 Q+ O1 M8 ?5 M0 l c
from scipy import stats as st- x o% p; ~( ^# {" N 3 {3 C8 I4 U0 z+ l3 K% Q import xarray as xr 4 g( Y2 k& }/ k1 E' A! ^ import dask7 F: Y& P2 H9 R: \5 t" Y import numpy as np, K8 d* `% V5 x from dask.diagnostics import ProgressBar * X1 q: ^5 @( x( W! x, y& y4 Y6 }1 d o/ j8 l* Q* }. g, l def norm_fit(data): * |. T( B7 m- ? loc, scale = st.norm.fit(data)' G& ?9 g4 O5 E return np.array([loc, scale]): B+ v! p+ V3 ?! g # O5 L; Q5 T$ k: n% @- z rs = np.random.RandomState(0)5 O! h+ v5 a# P* s/ B- x x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"]) I- u- Z1 r) s r x = x.chunk({"lev":5, "lat":100,"lon":100}) + A5 n1 R9 Q# B, X1 s. P9 S! M x: B0 H( x! T' h; ^- u #使用apply_ufunc计算,并用dask的并行计算+ N& l- u Z! k% e: v- |% M) ] result = xr.apply_ufunc(norm_fit,: Z0 o% W# Q& o) D( m: l+ Y x, y; ? ^! u$ W i8 J( r! e& u" O input_core_dims=[["time"]], ; J2 q& r4 k+ k" }1 j6 h output_core_dims=[["paras"]], ; Y3 v; Y# Z, x dask="parallelized", 2 @3 F+ n& N7 D0 v) P2 m( O$ k' z output_dtypes=[np.float], 5 s* V9 n/ T4 A* f* s' H dask_gufunc_kwargs={output_sizes:{"paras":2}}1 n2 R" z$ a' K: q7 A/ H# s) G8 Z0 V ) # ^' a. o! m2 Y6 A9 Q2 r, y! ~, v8 U+ A+ a1 i #compute进行真实的计算并显示进度 ; p4 y% i4 [, Q/ Q- T; c" T9 [ with ProgressBar(): R2 I" T1 s/ W3 T1 O' Q result = results.compute()9 H5 f* i3 k# N: i$ W ) t. _2 C, a$ F. t2 @$ r6 m #结果冲命名保存到nc文件3 U! s8 ]" R) O1 }- D8 Q2 ]8 l" C result = result.rename("norm_paras") - N5 B& E6 }; E2 M% G' J result = result.to_netcdf("norm_fit_paras.nc",compute=False)$ H7 u" |# F* z5 F0 a! C with ProgressBar():7 x4 b- `. R4 [7 W+ g result.compute()
" a5 }/ _' M, g

转自:气海同途

1 \! F6 E9 |# g) t1 ^2 C3 l

关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料

相关帖子

全部回帖
暂无回帖,快来参与回复吧
懒得打字?点击右侧快捷回复 【吾爱海洋论坛发文有奖】
您需要登录后才可以回帖 登录 | 立即注册
各安天命
活跃在2025-1-27
快速回复 返回顶部 返回列表