# -*- coding: utf-8 -*- """ Created on Thu Sep 13 12:54:10 2018 @author: Jan """ import numpy as np #import matplotlib.pyplot as plt #import time from io import IOBase import struct from scipy.signal import butter, lfilter, lfilter_zi, filtfilt, decimate import sys import time import pandas as pd from matplotlib import pyplot as plt #%% #class MchanDataStream: # def __init__(self, source): # self.nChan=nChan # self.sweeplen=1000 ## for the continuous recording we use a timer thread whcih is copied from # https://stackoverflow.com/questions/3393612/run-certain-code-every-n-seconds from threading import Timer TimerFcnBusy=False class RepeatedTimer(object): def __init__(self, interval, function, *args, **kwargs): self._timer = None self.interval = interval self.function = function self.args = args self.kwargs = kwargs self.is_running = False self._timer = None #self.start() def _run(self): self.function(*self.args, **self.kwargs) self.is_running = False self.start() def start(self): TimerFcnBusy=False if not self.is_running: self._timer = Timer(self.interval, self._run) self._timer.start() self.is_running = True def stop(self): TimerFcnBusy=False if not self._timer == None: self._timer.cancel() self.is_running = False #%% class Virtual_Cont_Recorder: # virtual class of continuous ephys recorders. # THis virtual recorder will produce simulated data. # From this we derive hardware specific classes which record actual data, e.g. the one below. # Continuous recorders organise the data in sweeps that go from one trigger event to the # next. Sweeps are therefore variable in length. def __init__(self): self.sweepLen=1.0 self.sweeps=[] self.sigBuf=() self.lastIdx=0 self.lastError='' self.totalUploaded=0 self.sweepEnd=0 self.timer=None self.timestampBuffer=[-1] self.spikeFilter = None self.sampleRate=25000 # work out how many channels there are self.nChan=16 def running(self): return self.timer.is_running def uploadData(self): self.timer.stop() # stop timer to prevent collisions # this is called repeatedly by timer to upload data recorded by the ahrdware to buffers try: # for the virtual recorder, make noise buffers to simulate recorder input now=time.time() if not hasattr(self,'lastUploadAt'): self.lastUploadAt=self.recordingStartedAt self.sweepStart=0 self.sweepEnd=1e13 timeSinceLastUpload=now-self.lastUploadAt self.lastUploadAt=time.time() sweepEnd=int((now-self.recordingStartedAt)*self.sampleRate) nextChunkSize=int(timeSinceLastUpload*self.sampleRate) self.totalUploaded+=nextChunkSize sig=[tuple(np.random.random(self.nChan*nextChunkSize))] self.sigBuf += sig[0] # print('uploaded {} samples so far'.format(self.totalUploaded)); sys.stdout.flush() # check if a sweep has ended. if sweepEnd > self.sweepEnd+1: self.processNewSweep(self.sweepStart,self.sweepEnd) self.sweepStart=self.sweepEnd self.sweepEnd=1e13 self.timer.start # restart timer except BaseException as e: # make sure system timer is halted if error occurs self.stop() self.lastError=str(e) raise def trigger(self): # a trigger marks that a sweep has ended. self.sweepEnd=int((time.time()-self.recordingStartedAt)*self.sampleRate) def processNewSweep(self,sweepStart,sweepEnd): sweepLen=sweepEnd-sweepStart if sweepLen<10: return nSamplesExpected=sweepLen*self.nChan nSamplesGot=int(np.floor(len(self.sigBuf)/self.nChan)*self.nChan) if nSamplesExpected > nSamplesGot: print('*** processNewSweep expected: {} got:{}. Skipping'.format(nSamplesExpected,nSamplesGot)); sys.stdout.flush() return if len(self.timestampBuffer) == 0: tStamp=0.0 else: tStamp=self.timestampBuffer.pop(0) newSweep=MCsweep(self.sigBuf[:nSamplesExpected],sweepLen,self.sampleRate,tStamp) self.sweeps.append(newSweep) self.sigBuf=self.sigBuf[nSamplesExpected:] self.sweepEnd=sweepEnd def samplesInSigBuf(self): return int(len(self.sigBuf)/self.nChan) def secondsInSigBuf(self): return self.samplesInSigBuf/self.sampleRate def sweepAcquired(self): return self.secondsInSigBuf() >= self.sweepLen def runningFor(self): return time.time()-self.recordingStartedAt def start(self): self.lastIdx=0 self.sigBuf=() self.sweepStart=0 self.sweepEnd=0 self.totalUploaded=0 self.timer=RepeatedTimer(self.sweepLen,self.uploadData) #self.timer=RepeatedTimer(0.3,self.uploadData) self.timer.start() self.recordingStartedAt=time.time() self.timestampBuffer=[-1] print('recorder started.') sys.stdout.flush() def stop(self): if not self.timer is None: self.timer.stop() # if not self.lastError=='': # print('***Error: ',self.lastError) # collect the last sweep self.uploadData() self.processNewSweep(0,self.samplesInSigBuf()) print('recorder stopped.') sys.stdout.flush() #%% test Virtual Recorder #r=Virtual_Cont_Recorder() #r.start() #time.sleep(0.1) #print('trigger...') #r.trigger() #time.sleep(0.5) #print('trigger...') #r.trigger() #time.sleep(0.5) #print('trigger...') #r.trigger() #time.sleep(0.5) #print('finishing...') #r.stop() #%% class TDT_RZ2_Cont_Recorder(Virtual_Cont_Recorder): def __init__(self, circuitFile='MC-continuous-1x32atlasOmniToZIF.rcx'): from TDT_rpcox import RPcoX from zBus import ZBUSx super().__init__() #path='C:\\Users\\colliculus\\ratCageProgramsV2\\' #self.circuitFile='RZ2-1x32atlasOmniToZIF_WB.rcx' self.circuitFile=circuitFile #self.circuitFile="MCtest.rcx" #self.TDTproject = DSPProject() self.zBus = ZBUSx() if self.zBus.ConnectZBUS('GB') == 0: raise ImportError('TDT ActiveX failed to connect to ZBUS') self.RP = RPcoX() if self.RP.ConnectRZ2('GB',1) == 0: raise ImportError('TDT ActiveX failed to connect to RZ2') if self.RP.LoadCOF(self.circuitFile) == 0: raise ImportError('TDT ActiveX failed to load circuit {}'.format(self.circuitFile)) self.RP.Run() time.sleep(0.1) self.sampleRate=self.RP.GetSFreq() # work out how many channels there are self.nChan=int(self.RP.GetTagVal('nChan')) self.bufSize=int(self.RP.GetTagVal('bufSize')) def uploadData(self): try: # add new data from RZ2 buffer to local sigBuf currIdx=int(self.RP.GetTagVal('ADidx')) # self.lastError='upload idx {}'.format(currIdx) bufEnd=currIdx if bufEnd < self.lastIdx: bufEnd+=self.bufSize nextChunkSize=bufEnd-self.lastIdx if nextChunkSize < 500: # too small, don't bother return # reads the recorder's multichannel signal sig=self.RP.ReadTagVEX('ADmsig',self.lastIdx,nextChunkSize,'F32','F32',1) if sig is None: raise Exception("RP.ReadTagVEX('ADmsig',{},{},...) returned no data.".format(self.lastIdx,nextChunkSize)) self.lastIdx=currIdx self.sigBuf += sig[0] self.totalUploaded+=nextChunkSize # print('uploaded {} samples so far'.format(self.totalUploaded)); sys.stdout.flush() # check if a sweep has ended. sweepEnd=int(self.RP.GetTagVal('sweepEnd')) # RZ6 keeps track of when the last sweep ended (as marked by zBus trigger) if sweepEnd > self.sweepEnd+1: # this tells us that the new sweepEnd is beyond the last remembered sweepEnd, so a new sweep is finished self.processNewSweep(int(self.RP.GetTagVal('sweepStart')),sweepEnd) except BaseException as e: # make sure system timer is halted if error occurs self.lastError=str(e) self.stop() raise e def trigger(self): self.zBus.zBusTrigA(0,0,6) def start(self): self.RP.SoftTrg(1) #super().start() self.lastIdx=0 self.sigBuf=() self.sweepStart=0 self.sweepEnd=0 self.totalUploaded=0 #self.timer=RepeatedTimer(self.sweepLen,self.uploadData) self.timer=RepeatedTimer(min(self.sweepLen,0.5),self.uploadData) # self.timer.start() self.recordingStartedAt=time.time() self.timestampBuffer=[-1] print('recorder started.') sys.stdout.flush() def stop(self): self.timer.stop() # collect last sweep self.uploadData() sweepStart=int(self.RP.GetTagVal('sweepStart')) sweepEnd=int(np.floor(sweepStart+len(self.sigBuf)/self.nChan)) # if not self.lastError=='': # print('***Error: ',self.lastError) self.processNewSweep(sweepStart,sweepEnd) print('recorder stopped.') sys.stdout.flush() """ sweep based recorder type. Deprecated class TDT_RZ2_Recorder: def __init__(self): from TDT_rpcox import RPcoX from zBus import ZBUSx #path='C:\\Users\\colliculus\\ratCageProgramsV2\\' #self.circuitFile='RZ2-1x32atlasOmniToZIF_WB.rcx' #self.circuitFile='MC-1x32atlasOmniToZIF.rcx' self.circuitFile="MCtest.rcx" self.dataFileName="" self.lastSweep=None #self.TDTproject = DSPProject() self.zBus = ZBUSx() if self.zBus.ConnectZBUS('GB') == 0: raise ImportError('TDT ActiveX failed to connect to ZBUS') self.RP = RPcoX() if self.RP.ConnectRZ2('GB',1) == 0: raise ImportError('TDT ActiveX failed to connect to RZ2') if self.RP.LoadCOF(self.circuitFile) == 0: raise ImportError('TDT ActiveX failed to load circuit {}'.format(self.circuitFile)) self.RP.Run() self.sampleRate=self.RP.GetSFreq() # work out how many channels there are self.nChan=int(self.RP.GetTagVal('nChan')) self.sweeplen=int(self.RP.GetTagVal('sweeplen')) def recording(self): return self.RP.GetTagVal('active') > 0 def getMsignal(self, offset, nPoints): # reads the recorder's multichannel signal sig = self.RP.ReadTagVEX('ADmsig',offset,nPoints*self.nChan,'F32','F32',1) if sig is None: raise Exception('TDT_RZ2_Recorder.getMsignal() returned no data.') return sig[0] def setSweepLen(self, sweeplen): sweeplen=int(np.ceil(sweeplen)) if self.RP.SetTagVal('sweeplen',sweeplen) == 0: raise Exception('TDT_RZ2_Recorder.setSweepLen() failed.') self.sweeplen=sweeplen def getSweep(self): self.lastSweep= np.array(self.getMsignal(0,self.sweeplen)).reshape(self.sweeplen,self.nChan) return self.lastSweep def trigger(self): self.zBus.zBusTrigA(0,0,6) def stop(self): self.RP.SoftTrg(9) def done(self): self.RP.Halt() """ #%% class MCsweep: def __init__(self, source=None, sweeplen=1000, srate=0, tStamp=0.0): # initialize empty sweep self.signal=None self.timeStamp=tStamp self.sampleRate=srate if isinstance(source,tuple): self.signal=np.array(source).reshape(sweeplen,int(len(source)/sweeplen)) if isinstance(source, (int, float)): self.sampleRate=source if hasattr(source, 'getSweep'): # read in sweep from TDT device self.signal= source.getSweep() self.sampleRate=source.sampleRate if isinstance(source, IOBase): # read sweep from file sRate, tStamp, sig = readMchanSignalFromFile(source) self.signal=sig self.timeStamp=tStamp self.sampleRate=sRate def nyquist(self): return self.sampleRate/2 def makeSineSweep(self,freqs=[100,250,500,1000], t_offset=0, sweeplen=0.100,srate=10000): # constructor for sine sweep objects used for testing filtering period=1/srate Nsamples=int(sweeplen*srate) Nchan=len(freqs) self.sampleRate=srate self.signal=np.zeros((Nsamples,Nchan)) t=np.linspace(0,Nsamples-1,num=Nsamples)*period+t_offset for cc in range(Nchan): self.signal[:,cc]=np.cos(2*np.pi*freqs[cc]*t) def duration(self): return self.signal.shape[0]/self.sampleRate def spikeFilterCoeffs(self): b,a = butter(4,(300/self.nyquist(), 3000/self.nyquist()),'bandpass') return [b,a] def LFPfilterCoeffs(self, lowpass=300, highpass=30): b,a = butter(4,(highpass/self.nyquist(), lowpass/self.nyquist()),'bandpass') return [b,a] def AMUAFilterCoeffs(self, lowpass=6000): bBand,aBand = butter(4,(300/self.nyquist(), 6000/self.nyquist()),'bandpass') bLow,aLow = butter(4,(lowpass/self.nyquist()),'lowpass') return [[bBand,aBand], [bLow,aLow]] def IIRfilterFlipped(self,coefs, trimEnd=200): # make a filtered copy of the sweep # with IIR filter with coefficients b, a result=MCsweep(self.sampleRate) result.timeStamp=self.timeStamp result.signal, zo =np.flipud(lfilter(coefs[0],coefs[1],np.flipud(self.signal),axis=0)) if trimEnd > 0: result.signal=result.signal[0:-trimEnd,:] return result, zo def IIRfilter(self,coefs, zi=None): # make a filtered copy of the sweep # with IIR filter with coefficients b, a result=MCsweep(self.sampleRate) result.timeStamp=self.timeStamp Nchans=self.signal.shape[1] if zi is None: zi=[] zi_base=lfilter_zi(coefs[0],coefs[1]) for chan in range(Nchans): #zi.append(zi_base) zi.append(np.zeros((len(coefs[1])-1,))) result.signal=np.zeros(self.signal.shape) zo=[] for chan in range(Nchans): y, zo_c=lfilter(coefs[0],coefs[1],self.signal[:,chan],axis=0,zi=zi[chan]) result.signal[:,chan]=y zo.append(zo_c) # if type(result.signal) == tuple: # result.signal=result.signal[0] # Ntaps=coefs[0].shape[0] # next_zi=np.zeros((Ntaps-1,Nchans)) # for chan in range(Nchans): # y=np.flipud(result.signal[-Ntaps:,chan]) # x=np.flipud(self.signal[-Ntaps:,chan]) ## y=(result.signal[-Ntaps:,chan]) ## x=(self.signal[-Ntaps:,chan]) # next_zi[:,chan]=lfiltic(coefs[0],coefs[1], y, x) # return result, zo return result def calcLFP(self,coefs=[], padLen=300, downsample=10): # make a AMUA copy of the sweep # Calculates analog multiunit activity (AMUA) measure as in periodic gerbil paper. # See http://journal.frontiersin.org/article/10.3389/fncir.2015.00037/full result=MCsweep(self.sampleRate/downsample) # decimation will make AMUA have half orig sample rate result.timeStamp=self.timeStamp if coefs==[]: coefs=self.LFPfilterCoeffs() # # We pad the signal with even symmetry copies # head=-np.flipud(self.signal[0:padLen,:]) # headShift=head[-1,:]-self.signal[0,:] # head=head-headShift # tail=-np.flipud(self.signal[-padLen:,:]) # tailShift=tail[0,:]-self.signal[-1,:] # tail=tail-tailShift # insig=np.vstack([head, self.signal, tail]) # insig=filtfilt(coefs[0],coefs[1],insig,axis=0) # insig=filtfilt(coefs[0],coefs[1],insig,axis=0) # result.signal=insig[padLen:-padLen,:] result.signal=filtfilt(coefs[0],coefs[1],self.signal,axis=0,padlen=padLen) # downsample result.signal=decimate(result.signal,downsample,axis=0) return result def sigSnip(self,t, rest=False): # returns section of the signal covering time window t[0] to t[1] in seconds # if "rest" is True, it returns the section *outside* the window t0=np.floor(np.round(t[0]*self.sampleRate)).astype(int) if t[1]== -1: # t[1] of -1 means "to the end if rest: return self.signal[0:t0,:] else: return self.signal[t0:,:] else: t1=int(np.floor(t[1]*self.sampleRate)) if rest: s=self.signal[0:t0] if self.signal.shape[0]-t1 > 0: s=np.vstack((s,self.signal[t1:])) return s else: return self.signal[range(t0,t1),:] def interpBlanking(self,x1,x2): # blanks out a presumed aretefact from sample x1 to sample x2 inclusive # by replacing the stretch [x1,x2] with linear interpolation numSamples=x2-x1 if numSamples < 0: raise Exception("MCsweep.interpBlanking({},{}) : stretch to interpolate must be positive number.".format(x,y)) for chan in range(self.signal.shape[1]): y1=self.signal[x1,chan] y2=self.signal[x2,chan] snip=np.linspace(y1,y2,numSamples) self.signal[x1:x2,chan]=snip def calcAMUA(self,coefs=[], padLen=300, downsample=2): # make a AMUA copy of the sweep # Calculates analog multiunit activity (AMUA) measure as in periodic gerbil paper. # See http://journal.frontiersin.org/article/10.3389/fncir.2015.00037/full result=MCsweep(self.sampleRate/downsample) # decimation will make AMUA have half orig sample rate result.timeStamp=self.timeStamp if coefs==[]: coefs=self.AMUAFilterCoeffs() bpCoefs=coefs[0] lpCoefs=coefs[1] # # We pad the signal with even symmetry copies # head=-np.flipud(self.signal[0:padLen,:]) # headShift=head[-1,:]-self.signal[0,:] # head=head-headShift # tail=-np.flipud(self.signal[-padLen:,:]) # tailShift=tail[0,:]-self.signal[-1,:] # tail=tail-tailShift # insig=np.vstack([head, self.signal, tail]) # insig=filtfilt(bpCoefs[0],bpCoefs[1],insig,axis=0) # insig=np.abs(insig) # insig=filtfilt(lpCoefs[0],lpCoefs[1],insig,axis=0) # result.signal=insig[padLen:-padLen,:] insig=filtfilt(bpCoefs[0],bpCoefs[1],self.signal,axis=0,padlen=padLen) insig=np.abs(insig) result.signal=filtfilt(lpCoefs[0],lpCoefs[1],insig,axis=0, padlen=padLen) # downsample result.signal=decimate(result.signal,downsample,axis=0) return result def RMS(self, rwin): # returns root mean square value of signal amplitude over the response window rwin return np.sqrt(np.mean(self.sigSnip(rwin)**2,axis=0)) def response(self, rwin,swin=[]): # returns a "response measure" as the mean signal amplitude over the response window rwin # optionally baseline corrected by response in spon window swin if swin == 'rest': spon = np.mean(self.sigSnip(rwin,rest=True),axis=0) else: if swin ==[]: spon = 0 else: spon = np.mean(self.sigSnip(swin),axis=0) return np.mean(self.sigSnip(rwin),axis=0)-spon def saveSweep(self,ephysFile): if isinstance(ephysFile, IOBase): # ephysFile is a file handle. writeMchanSignalToFile(ephysFile,self.sampleRate, self.timeStamp, self.signal) return if isinstance(ephysFile, str): # ephysFile is a file name. fileObject = open(ephysFile,'ab') writeMchanSignalToFile(fileObject,self.sampleRate, self.timeStamp, self.signal) fileObject.close() return # if you get to this point ephysFile is not somethign we can work with raise IOError('MCsweep.saveSweep() must be given file handle or file name string.') def plotSig(self, nCol, pltHandle, channels=[]): # plot a multichannel signal if self.signal is None: print('Signal empty. Nothing to plot.') return pltHandle.clf() ax=pltHandle.gca() # first work out how many rows of panels we need nChan=self.signal.shape[1] #nRow= np.ceil(nChan/nCols) # now work out how big our x and y offsets need to be xoffset=self.signal.shape[0]*0.1 yrange=np.max(np.abs(self.signal[20:,:])) yoffset=yrange*2.2 if yoffset==0: yoffset=1 xpoints=np.arange(self.signal.shape[0]) # now plot, starting from the bottom left row=0 col=0 if channels ==[]: channels=range(nChan) for chan in channels: ax.plot((xpoints+(xoffset+xpoints[-1])*col)/self.sampleRate, (self.signal[:,chan]-yoffset*row)*1000) col+=1 if col >= nCol: col=0 row+=1 ax.set_xlabel('time (s)'); ax.set_ylabel('mV'); ax.set_ylim(np.array([-(yoffset*(row-1)+yrange),yrange])*1000) #%% #def testSignalWriting(): # fname='c:/temp/test.ephys' # fileObject = open(fname,'wb') # sweep=np.vstack((np.array(range(100)),np.random.random(100),np.array(range(100,0,-1)))) # sweep=sweep.transpose() # writeMchanSignalToFile(fileObject,20.2,sweep) # writeMchanSignalToFile(fileObject,40.4,sweep) # writeMchanSignalToFile(fileObject,60.6,sweep) # fileObject.close() # #def sweepsToAMUA(swps,coefs=[], downsample=2,): # # make a AMUA copy of the sweep # # Calculates analog multiunit activity (AMUA) measure as in periodic gerbil paper. # # See http://journal.frontiersin.org/article/10.3389/fncir.2015.00037/full # result=MCsweep(self.sampleRate/downsample) # decimation will make AMUA have half orig sample rate # result.timeStamp=self.timeStamp # if coefs==[]: # coefs=swps.AMUAFilterCoeffs() # # first apply bandpass filter # bpCoefs=coefs[0] # # work out required padlen. Let's work with a 30 ms pad # padLen=int(np.ceil(self.sampleRate*0.03)) # #result.signal=np.flipud(lfilter(bpCoefs[0],bpCoefs[1],np.flipud(self.signal),axis=0)) # result.signal=filtfilt(bpCoefs[0],bpCoefs[1],self.signal,axis=0, padlen=padLen, padtype="odd") # # take absolute value # result.signal=np.abs(result.signal) # # low pass # lpCoefs=coefs[1] # #result.signal=np.flipud(lfilter(lpCoefs[0],lpCoefs[1],np.flipud(result.signal),axis=0)) # result.signal=filtfilt(lpCoefs[0],lpCoefs[1],result.signal,axis=0, padlen=padLen, padtype="even") # # trim discontinuity artifacts if desired. This should not happen with filtfilt # if trimEnd > 0: # result.signal=result.signal[0:-trimEnd,:] # # downsample # result.signal=decimate(result.signal,downsample,axis=0) # return result def poolSweeps(swp): # "pool" (average) the responses (signals) of all the MSweeps in input list "swp" #signals in swp may differ in length. Can only pool over the shortest length in teh collection alen=np.zeros([len(swp)]) for ss in range(len(swp)): alen[ss]=swp[ss].signal.shape[0] #print('sweep {}: length {}'.format(ss, swp[ss].signal.shape[0])) #pooledPSTH+=swp[ss].signal minLen=int(np.min(alen)) result=MCsweep(swp[0].sampleRate) result.signal=swp[0].signal[0:minLen,:].copy() for ss in range(1,len(swp)): result.signal+=swp[ss].signal[0:minLen,:] result.signal/=len(swp) return result def testSignalReading(): #fname='c:/temp/ephysTest/a1.ephys' fname='/home/colliculus/electrophys/TWF_IC_recording/NormalHearingNonTrain/ctrl_1805/data/ctrl_1805_TWF300Hz_2_P1.ephys' swps, stimPar=readEphysFile(fname) return swps, stimPar def writeEphysFile(fname,swps,stim=None): # check file extension if fname[-6:]!='.ephys': fname=fname+'.ephys' # read in .ephys file fileObject = open(fname,'wb') for swp in swps: swp.saveSweep(fileObject) fileObject.close() if not stim is None: # write corresponding stim param table if type(stim)==pd.core.frame.DataFrame: fname=fname[:-6]+'.csv' stim.to_csv(fname,',') def readEphysFile(fname): # check file extension if fname[-6:]!='.ephys': fname=fname+'.ephys' swps=np.array([]) # read in .ephys file with open(fname,'rb') as fileObject: swp=MCsweep(fileObject) while not swp.signal is None: swps=np.append(swps,swp) swp=MCsweep(fileObject) #read in corresponding stim param table fname=fname[:-6]+'.csv' stimPar=readEphysStimTable(fname) return swps, stimPar def readEphysStimTable(fname): if fname[-6:]=='.ephys': fname=fname[:-6]+'.csv' if fname[-4:]!='.csv': fname=fname+'.csv' try: stimPar=pd.read_csv(fname,',') stimPar=stimPar.loc[:, ~stimPar.columns.str.contains('^Unnamed')] except: print('warning, no stim file found for '+fname) stimPar=None return stimPar def readMchanSignalFromFile(fileObject): try: tStamp=struct.unpack("f",fileObject.read(4))[0] sampleRate=struct.unpack("f",fileObject.read(4))[0] sweeplen=struct.unpack("i",fileObject.read(4))[0] numChan=struct.unpack("i",fileObject.read(4))[0] sweep=None for chanIdx in range(numChan): achan=np.array(struct.unpack("{}f".format(sweeplen),fileObject.read(4*sweeplen))) if sweep is None: sweep=np.array(achan) else: sweep=np.vstack((sweep,np.array(achan))) return sampleRate, tStamp, sweep.transpose() except BaseException as e: #print('Finished reading sweeps with message: ',str(e)) # reading file failed, probably EOF return None, None, None def writeMchanSignalToFile(fileObject,sampleRate, timeStamp, sweep): fileObject.write(bytearray(struct.pack("f",timeStamp))) fileObject.write(bytearray(struct.pack("f",sampleRate))) sweeplen=sweep.shape[0] fileObject.write(bytearray(struct.pack("i",sweeplen))) fileObject.write(bytearray(struct.pack("i",sweep.shape[1]))) for chanIdx in range(sweep.shape[1]): achan=sweep[:,chanIdx].astype('float32').tolist() fileObject.write(bytearray(struct.pack("{}f".format(sweeplen),*achan))) fileObject.flush() #%% def multiPlot(x,y,ncols, marker='', linestyle='-'): axHnds=tuple() pltHnds=tuple() plt.clf() # first work out how many rows of panels we need nchan=y.shape[1] nrows=np.ceil(nchan/ncols) yrange=[np.min(y), np.max(y)] for ii in range(nchan): # collect the axes and line handles and return them axHnds+= (plt.subplot(nrows,ncols,ii+1),) pltHnds+= (plt.plot(x, y[:,ii], marker=marker, linestyle=linestyle),) if not np.mod(ii,ncols) == 0: plt.yticks([]) if not ii>=nchan-ncols: plt.xticks([]), plt.ylim(yrange) return axHnds, pltHnds