收藏本站 劰载中...网站公告 | 吾爱海洋论坛交流QQ群:835383472

xarray+dask处理大规模海洋气象数据

[复制链接]

气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。

. |$ g% \: I3 h# w( Q$ W/ w2 Y
; W( v5 O9 H' |# ~( u

大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。

t" Q K( ~! i' j; h) L 5 r/ s6 m% i3 B; v3 d9 u4 |* `

本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。

/ f* B9 r v. }# E& D" ^0 W3 c , H% ~2 v, I0 ]& J C

数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。

# w% c; W0 @& ]1 U7 n+ T# r# K

其中几个关键点解释一下:

( T! n9 z) k6 t+ w4 g. |

(1)首先定义拟合正太分布的函数

# ]8 K* o' K2 X y- }; Y, i8 m! z
def norm_fit(data):; `2 F, {' u$ L, t' w loc, scale = st.norm.fit(data) 4 @$ P& u2 Y' h5 R S7 X! `) H return np.array([loc, scale])
' f( i2 a4 q) ]

这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。

( u9 l% {; d" S# A6 ^9 ?

(2)xarray分块

7 I8 h* a/ w! R9 I
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"]) . N7 |2 d# N E0 e6 z2 U L x = x.chunk({"lev":5, "lat":100,"lon":100})
+ {4 ~8 {0 X, ?5 ]% V+ t: `

xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。

( v" A7 `& R$ y6 `7 ` s u

(3)xarray.apply_ufunc函数

5 _0 ^9 h2 ~ q5 y) w- ] W
result = xr.apply_ufunc(norm_fit, & `$ D( b) g; a: d. s; K x,% h, G; |5 i- V6 `8 v6 q input_core_dims=[["time"]],4 p9 D0 B2 V4 Y- J, c output_core_dims=[["paras"]], 7 g3 h" P3 k' P9 d# d dask="parallelized",' i* @8 b9 B6 U4 `& g- J output_dtypes=[np.float], 0 I" c1 n* @( m$ x6 v dask_gufunc_kwargs={output_sizes:{"paras":2}} 8 I' X1 e- Y& k )
3 R- r0 |8 C" ?

apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点)

# }7 V. }6 o! L4 P# s% \, J ) v3 s2 n/ n1 G4 ]7 Q2 n( J

以下是全部代码:

. K( Y; t5 s" I
from scipy import stats as st6 I, W0 t1 K) s% J9 e) n' V 4 N9 _+ R0 ~5 R) W0 M' u import xarray as xr9 x- V ]6 b5 t: I* V' n$ ~ import dask + k. s N$ q, J9 z- V import numpy as np& j$ X$ @4 M8 q+ o+ g i- H+ F from dask.diagnostics import ProgressBar & P1 |( Q% B# c1 [2 j0 y$ E2 d* r4 d, n" e: \6 E def norm_fit(data): / y$ B2 h3 X0 B J3 n! l loc, scale = st.norm.fit(data) U4 {( G, {! j+ |4 L* { return np.array([loc, scale]) P+ k+ m' ]4 l : H4 Z8 j/ {0 z1 ?( @ D1 H8 n rs = np.random.RandomState(0) . Y# I" m% C, @0 u4 M: ] x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"]) 3 m. l% T6 F6 f, y3 Y% ?: w x = x.chunk({"lev":5, "lat":100,"lon":100}) ) q1 w! A1 o* y3 X5 x# s1 f( v- u( ^; \. N" P3 q% ?# g6 ` #使用apply_ufunc计算,并用dask的并行计算 , D8 q4 q, F2 I: Z3 ^7 k result = xr.apply_ufunc(norm_fit,( R, ]6 ]3 ?3 k6 }* [% g x, ' L5 |+ p9 E" |; m* ]0 g5 P input_core_dims=[["time"]],8 |' T- t7 v- z0 v$ { output_core_dims=[["paras"]], 9 r; R7 n0 f4 H+ v1 ~% u dask="parallelized",1 a" a0 V2 p) C0 p output_dtypes=[np.float], ) V* |6 u/ |' a6 v4 w$ t dask_gufunc_kwargs={output_sizes:{"paras":2}} ( q I" H1 D9 |' B )4 j) }9 G0 o* a( N$ y+ C. }* o5 d" d 4 |' z1 M+ D7 Y# e0 X6 |5 i0 N #compute进行真实的计算并显示进度, g" b/ p5 V$ l7 M with ProgressBar():) X E+ i( t. {/ x! {4 e1 t result = results.compute() 8 _5 v5 c8 B5 Q$ Y( ?. V$ h! J! c7 w+ r, `/ X8 T0 i4 Y6 L #结果冲命名保存到nc文件 $ H" Y& b" I& h* F# z result = result.rename("norm_paras") 6 i E: e1 k5 ]) k, E result = result.to_netcdf("norm_fit_paras.nc",compute=False) . r, i* e" A6 _! m with ProgressBar(): $ E' h' ]) r# Z, c result.compute()
4 O7 |' y. \% i

转自:气海同途

6 W6 S# F8 J( J

关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料

回复

举报 使用道具

相关帖子

全部回帖
暂无回帖,快来参与回复吧
懒得打字?点击右侧快捷回复 【吾爱海洋论坛发文有奖】
您需要登录后才可以回帖 登录 | 立即注册
各安天命
活跃在2026-2-7
快速回复 返回顶部 返回列表