马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?我要加入
x
本帖最后由 陈波 于 2015-10-13 13:52 编辑
自己写的一个简单的LMD算法程序,需要 Python 3.0 或以上版本。
注意:未作计算优化,速度较慢。
欢迎大家提意见。
- """
- This module implements Local Mean Decomposition signal processing algorithm
- as described in [1].
- References:
- [1] Jonathan S. Smith. The local mean decomposition and its application to EEG
- perception data. Journal of the Royal Society Interface, 2005, 2(5):443-454
- """
- #
- # 算法控制参数
- #
- # 是否把信号的端点看作伪极值点
- INCLUDE_ENDPOINTS = True
- # 滑动平均算法: 最多迭代的次数
- MAX_SMOOTH_ITERATION = 12
- # 分离局部包络信号时最多迭代的次数
- MAX_ENVELOPE_ITERATION = 200
- ENVELOPE_EPSILON = 0.01
- CONVERGENCE_EPSILON = 0.01
- # 最多生成的积函数个数
- MAX_NUM_PF = 8
- # for debugging
- DBG_FIG = None
- def is_monotonous(signal):
- """判断信号是否是(非严格的)单调序列"""
- def is_monotonous_increase(signal):
- y0 = signal[0]
- for y1 in signal:
- if y1 < y0:
- return False
- y0 = y1
- return True
- def is_monotonous_decrease(signal):
- y0 = signal[0]
- for y1 in signal:
- if y1 > y0:
- return False
- y0 = y1
- return True
- if len(signal) <= 0:
- return True
- else:
- return is_monotonous_increase(signal) \
- or is_monotonous_decrease(signal)
- def find_extrema(signal):
- """找出信号的所有局部极值点位置."""
-
- n = len(signal)
- # 局部极值的位置列表
- extrema = [ ]
- lookfor = None
- for x in range(1, n):
- y1 = signal[x-1]; y2 = signal[x]
- if lookfor == "min":
- if y2 > y1:
- extrema.append(x-1)
- lookfor = "max"
- elif lookfor == "max":
- if y2 < y1:
- extrema.append(x-1)
- lookfor = "min"
- else:
- if y2 < y1:
- lookfor = "min"
- elif y2 > y1:
- lookfor = "max"
- # 优化(微调)极值点的位置:
- # 当连续多个采样值相同时,取最中间位置作为极值点位置
- def micro_adjust(x):
- assert(0 <= x < n)
- y = signal[x]; lo = hi = x
- while (lo-1 >= 0) and (signal[lo-1] == y): lo -= 1
- while (hi+1 < n) and (signal[hi+1] == y): hi += 1
- if INCLUDE_ENDPOINTS:
- if lo == 0: return 0
- if hi == n-1: return n-1
- return (lo + hi) // 2
- extrema = [micro_adjust(x) for x in extrema]
- if extrema and INCLUDE_ENDPOINTS:
- if extrema[0] != 0:
- extrema.insert(0, 0)
- if extrema[-1] != n-1:
- extrema.append(n-1)
- return extrema
- def moving_average_smooth(signal, window):
- """使用移动加权平均算法平滑方波信号."""
- n = len(signal)
- # at least one nearby sample is needed for average
- if window < 3:
- window = 3
- # adjust length of sliding window to an odd number for symmetry
- if (window % 2) == 0:
- window += 1
-
- half = window // 2
- weight = list(range(1, half+2)) + list(range(half, 0, -1))
- assert(len(weight) == window)
- def sliding(signal, window):
- for x in range(n):
- x1 = x - half; x2 = x1 + window
- w1 = 0; w2 = window
- if x1 < 0: w1 = -x1; x1 = 0
- if x2 > n: w2 = n-x2; x2 = n
- yield signal[x1:x2], weight[w1:w2]
- def weighted(signal, weight):
- assert(len(signal) == len(weight))
- return sum(y*w for y,w in zip(signal, weight)) / sum(weight)
- def is_smooth(signal):
- for x in range(1, n):
- if signal[x] == signal[x-1]:
- return False
- return True
- smoothed = signal
- for i in range(MAX_SMOOTH_ITERATION):
- smoothed = [weighted(s,w) for s,w in sliding(smoothed, window)]
- if is_smooth(smoothed): break
- return smoothed
- def local_mean_and_envelope(signal, extrema):
- """根据极值点位置计算局部均值函数和局部包络函数."""
- n = len(signal); k = len(extrema); assert(1 < k <= n)
- # construct square signal
- mean = [ ]; enve = [ ]
- prev_mean = (signal[extrema[0]] + signal[extrema[1]]) / 2
- prev_enve = abs(signal[extrema[0]] - signal[extrema[1]]) / 2
- e = 1
- for x in range(n):
- if (x == extrema[e]) and (e + 1 < k):
- next_mean = (signal[extrema[e]] + signal[extrema[e+1]]) / 2
- mean.append((prev_mean + next_mean) / 2)
- prev_mean = next_mean
- next_enve = abs(signal[extrema[e]] - signal[extrema[e+1]]) / 2
- enve.append((prev_enve + next_enve) / 2)
- prev_enve = next_enve
- e += 1
- else:
- mean.append(prev_mean)
- enve.append(prev_enve)
- # smooth square signal
- window = max(extrema[i]-extrema[i-1] for i in range(1, k)) // 3
- return mean, moving_average_smooth(mean, window), \
- enve, moving_average_smooth(enve, window)
- def extract_product_function(signal):
- s = signal; n = len(signal)
- envelopes = [ ] # 每次迭代得到的包络信号
- def component():
- c = [ ]
- for i in range(n):
- y = s[i]
- for e in envelopes:
- y = y * e[i]
- c.append(y)
- return c
- for i in range(MAX_ENVELOPE_ITERATION):
- extrema = find_extrema(s)
- if len(extrema) <= 3: break
- m0, m, a0, a = local_mean_and_envelope(s, extrema)
- for y in a:
- if y <= 0: raise ValueError("invalid envelope signal")
- # 对原信号进行调制
- h = [y1-y2 for y1,y2 in zip(s, m)]
- t = [y1/y2 for y1,y2 in zip(h, a)]
- if DBG_FIG:
- DBG_FIG.plot(i, component(), s, m0, m, a0, a, h, t)
- # 得到纯调频信号时终止迭代
- err = sum(abs(1-y) for y in a) / n
- if err <= ENVELOPE_EPSILON: break
- # 调制信号收敛时终止迭代
- err = sum(abs(y1-y2) for y1,y2 in zip(s, t)) / n
- if err <= CONVERGENCE_EPSILON: break
- envelopes.append(a); s = t
- return component()
- def lmd(signal):
- pf = [ ]
- # 每次迭代得到一个PF分量, 直到残余函数接近为一个单调函数为止
- residue = signal[:]
- while (len(pf) < MAX_NUM_PF) and \
- (not is_monotonous(residue)) and \
- (len(find_extrema(residue)) >= 5):
- component = extract_product_function(residue)
- residue = [y1-y2 for y1,y2 in zip(residue, component)]
- pf.append((component, residue))
- return pf
复制代码
|