收藏本站 劰载中...网站公告 | 吾爱海洋论坛交流QQ群:835383472

xarray+dask处理大规模海洋气象数据

[复制链接]

气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。

$ D$ Q7 ^; r! ~" `0 [* V
9 B/ G% n! v& y% K0 ]0 k

大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。

$ _1 [6 F! N+ ~$ K6 f - M+ B9 H3 W! W) v# n5 K& ^

本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。

; ~2 t! r1 X* S0 g0 k/ v8 z, d. x7 c% u+ o0 A& e+ m" |

数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。

0 A% O! d' E" \3 Q9 Q* Q" Y

其中几个关键点解释一下:

+ H5 J/ @1 U: f! K

(1)首先定义拟合正太分布的函数

4 A* I( p6 m+ E' S
def norm_fit(data): ! U$ c. }" o3 U, x- K% W; m& p loc, scale = st.norm.fit(data)7 N% G! B W# @ return np.array([loc, scale])
& o: E! r4 E( k1 v) \$ H* D

这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。

4 E* i2 A" d2 C4 w: I9 F! F

(2)xarray分块

3 n, H0 Z# C0 m% ?2 q* v" B- H% s+ V
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])1 i3 ]7 X: I% k- D/ F9 ]) n8 h2 u x = x.chunk({"lev":5, "lat":100,"lon":100})
8 g; \9 w' N5 K( M2 K* Z

xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。

6 L/ Q3 I. L B6 U9 D" N/ @

(3)xarray.apply_ufunc函数

9 s2 Z& y6 ]+ t" e3 B7 o
result = xr.apply_ufunc(norm_fit, 8 Q, z. J+ E; P" B: G# t) y: s x,( ~3 L: `* a6 K input_core_dims=[["time"]], : m i' k6 P% p, r1 o output_core_dims=[["paras"]],0 Q3 [( j- |) Y" x; Y dask="parallelized",; O0 Q; W# y$ B& X; m output_dtypes=[np.float],1 K+ y1 e# X; l! s# W& v* P+ W dask_gufunc_kwargs={output_sizes:{"paras":2}} 1 b% U! h* I5 {( K1 L )
1 _$ X2 s9 b0 d: u5 D2 g0 v

apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点)

+ r, x' n4 v+ V. p0 s- f1 b' s' a4 N7 r, w

以下是全部代码:

" e f, s4 ]# P1 e$ W
from scipy import stats as st4 K& d6 U( z: Y4 |4 R& d. q 4 Y- `5 K' _ v8 x0 \ import xarray as xr & Z6 g+ m- u# @1 a) V- O import dask1 K! D& Y# y6 k, b% l import numpy as np0 Q D- O6 M# V2 G5 H+ n! D from dask.diagnostics import ProgressBar# Y- }4 F3 q! ~" Y 9 D+ Y( N2 W a0 I: a x def norm_fit(data): " \, s3 x' u! z. a6 y) h3 z loc, scale = st.norm.fit(data)9 C' w, z7 y3 S L' }2 f- e return np.array([loc, scale]) . _8 r" s- w, ~5 Q* g: B1 X* d* c5 ~. H7 o rs = np.random.RandomState(0): Y ]& l1 C0 o" T2 ~: {) n- s x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"]) 5 T8 C' [! n( D& s& F( x x = x.chunk({"lev":5, "lat":100,"lon":100})- H" ]3 s6 `- `/ ] 8 A @# S: ?4 E+ u& i+ h! o4 A #使用apply_ufunc计算,并用dask的并行计算( a7 v M8 a6 H result = xr.apply_ufunc(norm_fit, 0 W+ t; d- v& F# i x, 5 v: D- p- F$ x# `/ j( U/ b0 k input_core_dims=[["time"]],. P6 c5 [7 `" m+ s output_core_dims=[["paras"]],0 j- O' k' f6 l2 y8 N/ D dask="parallelized", 6 F r5 _- D4 A" w# F) \" I output_dtypes=[np.float], 2 |8 [7 F4 G* G4 `& m7 W* @ dask_gufunc_kwargs={output_sizes:{"paras":2}} 5 f \& q" x0 ?4 {' W9 j ) & T$ [3 X' D' {5 n/ d8 j 4 f3 q/ h' \7 E' x" E c #compute进行真实的计算并显示进度 + O5 P' e/ Y0 j) i with ProgressBar():9 F% K" S6 E8 p# E Y result = results.compute()! L8 K/ M- Q6 r$ @ 2 ^9 ~0 ?: t% t2 [2 Q( z9 Z #结果冲命名保存到nc文件 $ b& x- J, a k7 L3 Z; b result = result.rename("norm_paras") 5 o$ }3 i/ m s W! P result = result.to_netcdf("norm_fit_paras.nc",compute=False)1 j- U; F) S8 ^9 ~, h: ]! ` with ProgressBar(): ) H( S9 k* B2 N# I5 A4 X2 X7 n result.compute()
5 J8 g* J1 |; a, e

转自:气海同途

$ Y; G( N2 [& F2 c2 y( Y

关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料

回复

举报 使用道具

相关帖子

全部回帖
暂无回帖,快来参与回复吧
懒得打字?点击右侧快捷回复 【吾爱海洋论坛发文有奖】
您需要登录后才可以回帖 登录 | 立即注册
各安天命
活跃在2026-4-7
快速回复 返回顶部 返回列表