# -*- coding: utf-8 -*-
"""
Created on Wed Jul 11 11:59:05 2012
@author: Patricio Orio
"""
import numpy as np
import time
import matplotlib.pyplot as plt
from scipy import stats
def hurst(series,slope_span = 7):
"""
R/S Analysis of a time series to obtain the Hurst exponent.
A single 'H' value is obtained in Hfull[0] with p-value in Hfull[3]
Parameters
----------
series : array
data to be analyzed
slope_span : integer (default=7)
half-width of moving windows to calculate moving slopes
Returns
-------
N : List, 5*(int(np.log2(len(series)))-4) + 1
Lenghts of series for which R/S is calculated
E : List, same lenght of N
R/S statistics for each value of N
H : List of ndarrays. Lenght = len(N) - 2*slope_span
Fit coefficients (slope, intercept) in moving windows of width 2*slope_span
The first coefficient is centered in slope_span and the last in len(series)-slope_span
NN : List of arrays. Same length as H
Lenghts of series employed in the calculation of each of the coefficients in H
Hfull : LinregressResult, lenght 5
Full linear regression results between N and E.
(slope, intercept, rvalue, pvalue, stderr)
"""
# slopes will be calculated with 2*slope_span + 1 points
t1=time.time()
l=len(series)
totalint=int(np.floor(np.log2(l/2)))
intb2=np.arange(3,totalint+0.2,0.2) #log 2 of intervals to be used
N=np.int32(2**intb2) #intervals to be used
E=[]
for n in N:
short_series=np.resize(series,(l//n,n))
Y = short_series - np.mean(short_series,axis=-1)[:,None]
Z = np.cumsum(Y,axis=1)
R = Z.max(1) - Z.min(1)
S = np.std(short_series,axis=1)
E.append(np.mean(R/S))
print('ready',n,time.time()-t1)
H=[];NN=[]
for n in range(len(N)-slope_span*2):
Ndat=N[n:n+slope_span*2+1]
Edat=E[n:n+slope_span*2+1]
NN.append(Ndat)
H.append(np.ma.polyfit(np.log2(Ndat),np.log2(Edat),1))
Hfull=stats.linregress(np.log(N),np.log(E))
return N,E,H,NN,Hfull
def DFA(series,slope_span = 7):
"""
Detrended Fluctuation Analysis (DFA) of a time series.
A single 'H' value is obtained in DFfull[0] with p-value in Hfull[3]
Parameters
----------
series : array
data to be analyzed
slope_span : integer (default=7)
half-width of moving windows to calculate moving slopes
Returns
-------
N : List, 5*(int(np.log2(len(series)))-4) + 1
Lenghts of series for which DF is calculated
F : List, same lenght of N
Detrended Fluctuation for each value of N
H : List of ndarrays. Lenght = len(N) - 2*slope_span
Fit coefficients (slope, intercept) in moving windows of width 2*slope_span
The first coefficient is centered in slope_span and the last in len(series)-slope_span
NN : List of arrays. Same length as H
Lenghts of series employed in the calculation of each of the coefficients in H
DFfull : LinregressResult, lenght 5
Full linear regression results between N and F.
(slope, intercept, rvalue, pvalue, stderr)
"""
l=len(series)
totalint=int(np.floor(np.log2(l/2)))
intb2=np.arange(3,totalint+0.2,0.2) #log 2 of intervals to be used
N=np.int32(2**intb2) #intervals to be used
F=[]
for n in N:
Xfit=np.arange(n)
short_series=np.resize(series,(l//n,n)).T
Y = np.cumsum(short_series,axis=0)
lscoef=np.polyfit(Xfit,Y,1)
Fn = np.sqrt(np.sum((Y-lscoef[0,:]*Xfit[:,None]-lscoef[1,:])**2,0)/n)
F.append(np.mean(Fn))
print('ready',n)
DFfull=stats.linregress(np.log(N),np.log(F))
H=[];NN=[]
for n in range(len(N)-slope_span*2):
Ndat=N[n:n+slope_span*2+1]
Fdat=F[n:n+slope_span*2+1]
NN.append(Ndat)
H.append(np.ma.polyfit(np.log2(Ndat),np.log2(Fdat),1))
return N,F,H,NN,DFfull
def hurstS(series,slope_span = 7,mul=100):
"""
R/S and DFA for Surrogate data.
For building a single confidence interval, use the slopes in HfS[:,0] (R/S) and DFfS[:,0] (DFA)
Parameters
----------
series : array
data to be analyzed
slope_span : integer (default=7)
half-width of moving windows to calculate moving slopes
mul : integer (default 100)
Number of surrogate random series.
Returns
-------
surrHm : ndarray. Lenght = len(N) - 2*slope_span
Mean of the R/S coefficients for moving slopes. The first coefficient is centered in slope_span and the last in len(series)-slope_span
surrHsd : ndarray. Lenght = len(N) - 2*slope_span
Standard deviation the R/S coefficients for moving slopes.
surrDFm : ndarray. Lenght = len(N) - 2*slope_span
Mean of the DFA coefficients for moving slopes. The first coefficient is centered in slope_span and the last in len(series)-slope_span
surrDFsd : ndarray. Lenght = len(N) - 2*slope_span
Standard Deviation of the DFA coefficients for moving slopes. The first coefficient is centered in slope_span and the last in len(series)-slope_span
HfS : 2-D ndarray, shape (mul,5)
Full linear regression results for the R/S on each of the surrogate series.
(slope, intercept, rvalue, pvalue, stderr)
DFfS : 2-D ndarray, shape (mul,5)
Full linear regression results for the DFA on each of the surrogate series.
(slope, intercept, rvalue, pvalue, stderr)
"""
# slopes will be calculated with 2*slope_span + 1 points
t1=time.time()
l=len(series)
totalint=int(np.floor(np.log2(l/2)))
intb2=np.arange(3,totalint+0.2,0.2) #log 2 of intervals to be used
seriesSurr=[np.random.permutation(series) for i in range(mul)]
N=np.int32(2**intb2) #intervals to be used
E=[]
F=[]
for n in N:
Xfit=np.arange(n)
short_series=np.array([np.resize(serSurr,(l//n,n)) for serSurr in seriesSurr])
Y = short_series - np.mean(short_series,axis=-1)[:,:,None]
Z = np.cumsum(Y,axis=-1)
R = Z.max(-1) - Z.min(-1)
S = np.std(short_series,axis=-1)
E.append(np.mean(R/S,-1))
Yf = np.cumsum(short_series,axis=-1)
Yf = np.rollaxis(Yf,2,1)
lscoef=np.array([np.polyfit(Xfit,Yfm,1) for Yfm in Yf])
Fn = np.sqrt(np.sum((Yf-lscoef[:,[0],:]*Xfit[None,:,None]-lscoef[:,[1],:])**2,1)/n)
F.append(np.mean(Fn,-1))
print('ready surr',n,time.time()-t1)
E=np.asarray(E).T
F=np.asarray(F).T
Hfull=np.array([stats.linregress(np.log(N),np.log(y)) for y in E])
DFfull=np.array([stats.linregress(np.log(N),np.log(y)) for y in F])
H=[];NN=[];Hs=[]
for n in range(len(N)-slope_span*2):
idx=range(n,n+slope_span*2+1)
NN.append(N[idx])
H.append([np.ma.polyfit(np.log2(N[idx]),np.log2(Edat[idx]),1)[0]
for Edat in E])
Hs.append([np.ma.polyfit(np.log2(N[idx]),np.log2(Fdat[idx]),1)[0]
for Fdat in F])
return np.mean(H,-1),np.std(H,-1),np.mean(Hs,-1),np.std(Hs,-1),Hfull,DFfull
if __name__=='__main__':
series=np.random.normal(size=2000)
sspan=5
N,E,H,NN,Hful=hurst(series,sspan)
N,F,Hf,NN,Dful=DFA(series,sspan)
surrHm,surrHsd,surrHfm,surrHfsd,HfS,DfS=hurstS(series,sspan,100)
plt.figure(1,figsize=(15,7))
plt.clf()
plt.subplot(231)
plt.plot(np.cumsum(series),series,'.',ms=2)
plt.yscale('log')
# ax2=plt.subplot(234)
# readRetData.stattest(np.cumsum(series/1000),plot=True,ax=ax2,maxW=12)
plt.subplot(232)
plt.loglog(N,E,'.')
# plt.loglog(N,F,'.')
plt.ylabel("mean rescaled range\n $<RS(n)>$")
[plt.plot(dat,2**(np.log2(dat)*h0+h1)) for (h0,h1),dat in zip(H,NN)]
# plt.plot(dat,2**(np.log2(dat)*h[0]+h[1]))
plt.xlim(xmin=5)
xmin,xmax=plt.xlim()
plt.subplot(235)
plt.semilogx(N[sspan:-sspan],np.array(H)[:,0],'.')
plt.semilogx(N[sspan:-sspan],surrHm,'r')
plt.fill_between(N[sspan:-sspan],surrHm+2*surrHsd,surrHm-2*surrHsd,alpha=0.5)
plt.xlabel("length of sequence ($n$)")
plt.ylabel("slope ($H$ value)")
plt.xlim((xmin,xmax))
plt.subplot(233)
# plt.loglog(N,E,'.')
plt.loglog(N,F,'.')
plt.ylabel("Detrended Fluctuation")
[plt.plot(dat,2**(np.log2(dat)*h0+h1)) for (h0,h1),dat in zip(Hf,NN)]
# plt.plot(dat,2**(np.log2(dat)*h[0]+h[1]))
plt.xlim(xmin=5)
xmin,xmax=plt.xlim()
plt.subplot(236)
plt.semilogx(N[sspan:-sspan],np.array(Hf)[:,0],'.')
plt.semilogx(N[sspan:-sspan],surrHfm,'r')
plt.fill_between(N[sspan:-sspan],surrHfm+2*surrHfsd,surrHfm-2*surrHfsd,alpha=0.5)
plt.xlabel("length of sequence ($n$)")
plt.ylabel("slope ($H$ value)")
plt.xlim((xmin,xmax))
plt.tight_layout()
print(H)
# print N
# print E