The following is the code of the module listScheduler.py as it was on June 22nd 2020. This frozen copy is kept here as a reference for the other pages that explain how the software works.
# -*- coding: utf-8 -*-
"""
Created on August 6th 2018

@author: wschnupp

The "list scheduler" simply works through the list of stimuli
defined by stimVarParams and stimVarValues
for the number of times specified in stimListRepeats,
shuffling the presentation order of the stimVarValues on each run.
"""
import sys
import time

import numpy as np

import psyPhysConfig as config

if not hasattr(config, 'maxResponseTime'):
    # max interval between start spout lick and response in seconds
    config.maxResponseTime = 15

# set to True to print status changes and per-trial debugging info
verbose = False


#%%
def myvstack(Y, newRow):
    """Append newRow as one more row to the row stack Y and return the result.

    Y may be any sized collection of rows; an empty Y starts a new stack.
    Rows without any elements are ignored and Y is returned unchanged.
    Whenever a row is added the result is a 2-D numpy array, so a stack
    holding a single row has the same layout as a stack holding many.
    """
    if np.size(newRow) == 0:
        return Y
    row = np.atleast_2d(newRow)
    if len(Y) == 0:
        return row
    return np.vstack((Y, row))


def permute(A, B):
    """Return all combinations of one entry of A followed by one entry of B.

    Each entry of A (a value, or a row of values from an earlier call) is
    extended in turn with each entry of B; every combination becomes one row
    of the returned 2-D array, with A varying slowest.  If B is empty the
    entries of A are stacked as rows on their own.  If A is empty an empty
    list is returned.
    """
    A = np.array(A)
    B = np.array(B)
    if A.size == 0:
        return []
    Y = np.array([])
    for a in A:
        if B.size == 0:
            Y = myvstack(Y, a)
        else:
            for b in B:
                # np.append flattens both operands, so rows grow by one
                # column for every parameter that is permuted in
                Y = myvstack(Y, np.append(a, b))
    return Y


#%%
class schedule:
    """Creates and maintains a list of stimulus parameters to work through.

    The list is built by permuting the parameter values given to permute().
    Alternatively the schedule can be read from a CSV file.
    """

    def __init__(self, stimVarParams=None):
        """Set up an empty schedule.

        stimVarParams is the list of variable parameter names.  It could be
        a file name for a CSV file instead, in which case the schedule is
        imported from that file straight away.
        """
        # index of the current stimulus; -1 until nextParams() is first called
        self.stimIndex = -1
        # a fresh list per instance, so schedules never share a default list
        self.stimVarParams = [] if stimVarParams is None else stimVarParams
        self.stimListRepeats = 0
        # one row of parameter values per trial; empty until a list is built
        self.stimVarValues = []
        if isinstance(stimVarParams, str):
            self.importFromCSV(stimVarParams)

    def permute(self, valuesToPermute, stimListRepeats):
        """Build stimVarValues from all combinations of the given values.

        valuesToPermute holds one sequence of values per variable parameter
        (the sequences may differ in length).  The full set of combinations
        is repeated stimListRepeats times (at least once), each repeat in
        its own freshly shuffled order.
        """
        self.stimListRepeats = stimListRepeats
        values = []
        for paramValues in valuesToPermute:
            if len(values) == 0:
                values = permute(paramValues, [])
            else:
                values = permute(values, paramValues)
        values = np.array(values)
        blocks = []
        for _ in range(max(stimListRepeats, 1)):
            np.random.shuffle(values)
            # shuffle works in place: store a copy, otherwise the next
            # shuffle would also reorder the repeats collected so far
            blocks.append(values.copy())
        self.stimVarValues = np.concatenate(blocks)

    def importFromCSV(self, csvFileName):
        """Read parameter names (column headers) and values (rows) from a CSV file."""
        import pandas as pd
        tbl = pd.read_csv(csvFileName)
        self.stimVarParams = np.array(tbl.columns)
        self.stimVarValues = np.array(tbl)

    def repeatStimListNtimes(self, stimListRepeats):
        """Extend the stimulus list so it holds stimListRepeats copies of itself.

        The copies are appended in unchanged order; nothing happens if
        stimListRepeats is 1 or less.
        """
        if stimListRepeats > 1:
            self.stimVarValues = np.concatenate(
                [self.stimVarValues] * stimListRepeats)

    def shuffleStimList(self):
        """Shuffle the order of the stimulus list in place.

        Note that this re-seeds numpy's global random generator from the
        current time (rounded to whole seconds) before shuffling.
        """
        np.random.seed(round(time.time()))
        np.random.shuffle(self.stimVarValues)

    def currentParams(self):
        """Return the parameter values of the current trial.

        Before the first call to nextParams() this is the first entry of the
        list.  Returns an empty list once the schedule is finished or if the
        schedule holds no trials at all.
        """
        index = max(self.stimIndex, 0)
        if index >= len(self.stimVarValues):
            return []
        return self.stimVarValues[index]

    def nextParams(self):
        """Advance to the next trial and return its parameter values."""
        self.stimIndex += 1
        return self.currentParams()

    def paramNames(self):
        """Return the names of the variable parameters."""
        return self.stimVarParams

    def finished(self):
        """Return True once the index has moved past the last trial."""
        return self.stimIndex >= len(self.stimVarValues)

    def readScheduleFromFile(self, defaultName=''):
        """Let the user pick a CSV file in a dialog and import it.

        Returns the chosen file name, or '' if the dialog was cancelled or
        the file could not be imported.
        """
        from tkinter import filedialog
        filename = filedialog.askopenfilename(
            title="Open Stim Params Table:",
            initialfile=defaultName,
            filetypes=(("CSV", "*.csv"), ("all files", "*.*")))
        if not filename:
            # opening canceled: treat any empty result as "nothing chosen"
            print('No valid file chosen.')
            return ''
        try:
            self.importFromCSV(filename)
        except Exception as err:
            # best effort: report the problem and carry on without a file
            print('Warning: Could not open file: ' + filename)
            print(err)
            return ''
        return filename


#%%
class scheduler:
    """Runs the trials of a schedule as a chain of "jobs".

    Each job is a method of this class which does its work and then returns
    the method to run next, or None once there is nothing left to do.
    Status changes are broadcast to the stimulator, detectors and
    dataHandler modules as the scheduler moves from job to job.
    """

    def __init__(self, detectors, stimulator, dataHandler, schedule):
        """Remember the modules to work with.  detectors may be None."""
        self.detectors = detectors
        self.stimulator = stimulator
        self.dataHandler = dataHandler
        self.schedule = schedule
        # time of the last noteworthy event, used for the waiting timeouts
        self.lastAction = time.time()
        # last status that was broadcast, to suppress repeated broadcasts
        self.oldStatus = ""
        # the next job to run; set by start()
        self.nextJob = None

    def start(self):
        """Reset the status, announce the start and work through all jobs."""
        config.status = ""
        self.broadCastStatus('start')
        self.nextJob = self.chooseNextTrial
        self.stimStarted = time.time()
        self.timeZero = time.time()
        self.processJobs()

    def processJobs(self):
        """Keep running jobs until none is left or config.status is "abort"."""
        while self.nextJob is not None:
            if config.status == "abort":
                print('Schedule aborted.')
                self.nextJob = None
            else:
                self.nextJob = self.nextJob()

    def didSucceed(self, returnCode):
        """Interpret the return value of a module's statusChange function.

        Status change functions of modules may return nothing or a boolean;
        returning nothing counts as success.
        """
        if returnCode is None:
            return True
        return returnCode

    def _notifyModule(self, module, aStatus):
        """Pass aStatus to module.statusChange, retrying until it succeeds.

        Modules that are None or have no statusChange attribute are skipped.
        The call is repeated every 0.1 s for as long as it reports failure.
        """
        if module is None or not hasattr(module, 'statusChange'):
            return
        while not self.didSucceed(module.statusChange(aStatus)):
            time.sleep(0.1)

    def broadCastStatus(self, aStatus):
        """Tell stimulator, detectors and dataHandler about a new status.

        Only broadcasts the status if it has changed since the last call.
        """
        if aStatus == self.oldStatus:
            return
        self.oldStatus = aStatus
        if verbose:
            print()
            print('#### New status: ' + aStatus)
            sys.stdout.flush()
        for module in (self.stimulator, self.detectors, self.dataHandler):
            self._notifyModule(module, aStatus)

    def chooseNextTrial(self):
        """Fetch the next stimulus from the schedule and prepare the stimulator.

        Returns waitForStart as the next job, or None if the schedule has no
        more stimulus parameters to offer.
        """
        self.broadCastStatus('chooseNextTrial')
        self.nextParams = self.schedule.nextParams()
        # An exhausted schedule hands back an empty list.  Test the size
        # rather than comparing with "== []": the parameters are normally a
        # numpy array, for which "==" is an element-wise comparison.
        if np.size(self.nextParams) == 0:
            return None  # we are done, no more jobs to do
        # get the stimulator ready for the chosen stimulus
        self.stimulator.setParams(list(self.schedule.paramNames()),
                                  self.nextParams)
        # for debugging, print info about the chosen stimulus
        if verbose:
            print('Next stimulus has parameters ',
                  self.schedule.paramNames(), self.nextParams)
        if self.stimulator.correctResponse("RIGHT"):
            side = 'RIGHT'
        else:
            side = 'LEFT'
        if verbose:
            print()
            print('Correct response would be on the ' + side)
            print()
            sys.stdout.flush()
        return self.waitForStart

    def waitForStart(self):
        """Listen to the detectors and wait for a START signal.

        Returns presentTrial once START is received (or at once if there are
        no detectors), None on QUIT, and itself while still waiting.
        """
        self.broadCastStatus('waitForStart')
        if self.detectors is None:
            # if there are no detectors to poll for start signal we just go
            return self.presentTrial
        response = self.detectors.responseDetected()
        signal = response[0]
        if signal == 'NONE':
            # no start signal yet
            time.sleep(0.05)
            if (time.time() - self.lastAction) > 10:
                self.lastAction = time.time()
                print('Still waiting for centre lick at '
                      + time.asctime(time.localtime()))
                sys.stdout.flush()
            return self.waitForStart
        if signal == 'QUIT':
            # exit signal sent: return None as next job to do
            return None
        if signal == 'START':
            return self.presentTrial
        # if we got here an inappropriate response was received.
        # Report it and continue waiting.
        print('Received unexpected signal ' + signal)
        sys.stdout.flush()
        return self.waitForStart

    def presentTrial(self):
        """Present the next stimulus, then move on to getResponse.

        The stimulus time stamp (relative to the time start() was called)
        and the stimulus parameters are published through the config module
        before the 'presentTrial' status is broadcast.
        """
        print()
        print('### presenting trial {} of {}'.format(
            self.schedule.stimIndex + 1, len(self.schedule.stimVarValues)))
        self.stimStarted = time.time()
        config.currentStimTimestamp = self.stimStarted - self.timeZero
        config.currentStimParams = self.nextParams
        self.broadCastStatus('presentTrial')
        self.lastAction = time.time()
        return self.getResponse

    def getResponse(self):
        """Wait for a response to the recently presented stimulus and save it.

        Returns itself while still waiting, waitForStart if the subject took
        longer than config.maxResponseTime, None on QUIT, and otherwise
        reward or punish depending on whether the response was correct.
        If there are no detectors, only the time stamp of when the stimulus
        started is saved and the next trial is chosen.
        """
        self.broadCastStatus('getResponse')
        if self.detectors is None:
            par = self.stimulator.stimParams.copy()
            par['timeStamp'] = config.currentStimTimestamp
            self.dataHandler.saveTrial(par)
            return self.chooseNextTrial
        response = self.detectors.responseDetected()
        if response[0] in ['NONE', 'START']:
            # no response signal yet
            time.sleep(0.05)
            if (time.time() - self.lastAction) > config.maxResponseTime:
                print('Subject took too long to respond. Resetting trial.')
                sys.stdout.flush()
                self.lastAction = time.time()
                return self.waitForStart
            return self.getResponse
        print('Registered response ' + response[0] + ' at:', response[1])
        sys.stdout.flush()
        if response[0] == 'QUIT':
            # exit signal sent: return None as next job to do
            return None
        # at this point the response was neither none nor start nor quit,
        # so it must be a judgment.
        # Save the trial. Build a dictionary with all the trial info
        par = self.stimulator.stimParams.copy()
        # Ask the stimulator whether the response was correct
        par['correct'] = self.stimulator.correctResponse(response[0])
        par['response'] = response[0]
        par['reactionTime'] = response[1] - self.stimStarted
        par['timeStamp'] = response[1] - self.timeZero
        self.dataHandler.saveTrial(par)
        # Give feedback
        if par['correct']:
            return self.reward
        return self.punish

    def reward(self):
        """Broadcast the 'reward' status, then move on to the next trial."""
        print('Correct response. Giving reward then choosing next trial')
        sys.stdout.flush()
        self.broadCastStatus('reward')
        return self.chooseNextTrial

    def punish(self):
        """Broadcast the 'punish' status, then repeat the current stimulus.

        Returning waitForStart without choosing a new trial means the same
        stimulus is presented again as a correction trial.
        """
        print('Wrong response. Starting timeout then repeating trial')
        sys.stdout.flush()
        self.broadCastStatus('punish')
        return self.waitForStart

    def done(self):
        """Placeholder; does nothing."""
        pass
