收藏本站 劰载中...网站公告 | 吾爱海洋论坛交流QQ群:835383472

xarray+dask处理大规模海洋气象数据

[复制链接]

气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。

; g- x; t: Y' ?2 D8 X
) K. i: c0 A) L6 N l( l4 ^

大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。

0 \ }+ t3 }0 { # `+ `$ i6 J: D

本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。

. ^4 k- q/ F$ q/ L: F/ P( m 0 g$ q2 D& P$ e0 Q: z

数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。

! o' a6 S, J. L) S) y7 o- t0 q

其中几个关键点解释一下:

f5 W+ }# j7 f# {& j4 R

(1)首先定义拟合正太分布的函数

" |; B) t+ c8 w. Y2 L* b
def norm_fit(data): / I( N+ I% j x9 Z8 v loc, scale = st.norm.fit(data) / h9 E, F# D' [' p) h' h7 ?7 E return np.array([loc, scale])
; T, V4 d* V; A- g" k# X

这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。

: h% k7 w) j) J6 x0 ~( ` B

(2)xarray分块

3 h4 O; x( V1 p/ Q; b
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"]) 4 _( `* [6 f/ x! } x = x.chunk({"lev":5, "lat":100,"lon":100})
4 W( j$ H- c# R9 k& ?: @0 G0 W7 U, m

xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。

) I& a. \# K! M) s5 Y

(3)xarray.apply_ufunc函数

( v* l) K: s; I
result = xr.apply_ufunc(norm_fit,# O& o- k0 m; ]* E& v x," \/ ]: T% h% Q+ S. w7 {& F input_core_dims=[["time"]], ( D& U6 |6 X- `# Q9 {9 ] output_core_dims=[["paras"]],5 K/ h. q6 p- s; | dask="parallelized", $ b7 Q$ R. s* ]' p [$ J2 l output_dtypes=[np.float], . @% b8 M+ `; D7 f! N% c1 I6 x dask_gufunc_kwargs={output_sizes:{"paras":2}} 6 d" y' _8 J+ L )
5 g( u/ R6 m, |* J0 q( \

apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点)

3 f: ]# @6 U* {9 }' x' x- _! G. Y4 p: b* J8 Z Z

以下是全部代码:

( [2 k/ R/ @( y7 `8 O/ g( x5 f
from scipy import stats as st * V y0 X( i/ }8 V% X 3 M& q4 M5 Q- v, h import xarray as xr ) ~) C# ]/ I9 x: v% y& I import dask $ n0 j- ~9 M; Y6 V0 e0 O import numpy as np 0 T/ [/ \3 B% `9 n* C from dask.diagnostics import ProgressBar$ C8 s5 o6 y& _5 o& w" W / Z( A# i- e% P) ]& | def norm_fit(data):" n6 u1 P6 W3 q& h0 u loc, scale = st.norm.fit(data)0 l. x/ T; q5 V$ E return np.array([loc, scale])8 j: M: b1 L5 P1 ~ 7 L ^8 ?1 o' }2 n* O& M }/ K" k' O rs = np.random.RandomState(0)+ N- S, u9 j* D1 T) D x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"]) $ {- b: L/ \/ E, L1 g x = x.chunk({"lev":5, "lat":100,"lon":100}) # i! w/ @4 ?5 @1 d / Z4 O& U7 _, C# d5 L #使用apply_ufunc计算,并用dask的并行计算; w' Z; P3 _1 r* U% D result = xr.apply_ufunc(norm_fit, r/ S( H6 a) |! \ R x, # u: _# b3 v/ f input_core_dims=[["time"]],# x9 u, s, x9 y' m) [ output_core_dims=[["paras"]], / o8 M; H U' w6 H C% K: X. F9 V dask="parallelized",% r( e6 _1 e, {/ m2 K* ^) N. G output_dtypes=[np.float], " Q( Q' q# ~% c; p dask_gufunc_kwargs={output_sizes:{"paras":2}}, y2 O: X) \# [* V; z )1 R8 G+ B6 E6 D& S/ l $ X2 k* ~" I/ x- i1 y2 T8 D #compute进行真实的计算并显示进度/ }% f! B' W3 `9 A. j with ProgressBar(): & |9 T" q8 B3 ?1 r, v: q# x2 S8 M result = results.compute()! @7 B; \) p* a9 S1 f$ u+ { ( Q) h V( e! p% P9 N; X #结果冲命名保存到nc文件/ ]0 l! v8 O: e result = result.rename("norm_paras") 6 O8 j; M9 o d* \- q. ^ result = result.to_netcdf("norm_fit_paras.nc",compute=False) 3 o9 G! \) g! b6 b with ProgressBar(): 3 h! N. e3 p" N( p3 N/ ^/ L5 n result.compute()
( }+ d& [! v! }% ?1 t7 N

转自:气海同途

3 Q7 {5 P ^ V# L' I

关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料

回复

举报 使用道具

相关帖子

全部回帖
暂无回帖,快来参与回复吧
懒得打字?点击右侧快捷回复 【吾爱海洋论坛发文有奖】
您需要登录后才可以回帖 登录 | 立即注册
各安天命
活跃在2026-4-7
快速回复 返回顶部 返回列表