The following is code for module listScheduler.py as it was on June 22nd 2020. This frozen copy is here for reference for the other pages explaining how the software works.
# -*- coding: utf-8 -*-
"""
Created on August 6th 2018
@author: wschnupp
the "list scheduler" simply works through the list of stimuli
defined by stimVarParams and stimVarValues
for the number of times specified in stimListRepeats,
shuffling the presentation order of the stimVarValues on each run
"""
import numpy as np
import time, sys
import psyPhysConfig as config
if not hasattr(config,'maxResponseTime'):
config.maxResponseTime=15 # max interval between start spout lick and response in seconds
verbose=False
#%%
def myvstack(Y,newRow):
if len(Y)==0:
Y=[newRow]
else:
if np.size(newRow)==0:
return Y
else:
Y=np.vstack((Y,[newRow]))
return Y
def permute(A,B):
Y=np.array([])
A=np.array(A)
B=np.array(B)
if A.size==0:
return []
for a in A:
a=np.array(a)
if B.size==0:
Y=myvstack(Y,a)
else:
for b in B:
b=np.array(b)
newRow=np.append(a,b)
Y=myvstack(Y,newRow)
return Y
#%%
class schedule: # the schedule object creates and maintains a list of stim parameters to work through.
# by permuting the params given.
# Alternatively the schedule can be read from a CSV file.
def __init__(self, stimVarParams=[]):
self.stimIndex=-1
self.stimVarParams=stimVarParams
# stimVarParams could be a file name for a CSV file instead of a list
# of variable paramaters.
self.stimListRepeats=0
if type(stimVarParams) == str:
self.importFromCSV(stimVarParams)
def permute(self, valuesToPermute, stimListRepeats):
# build the stimVarValues by permuting the valuesToPermute lists
self.stimListRepeats=stimListRepeats
ndims=np.shape(valuesToPermute)[0]
values=permute(valuesToPermute[0],[])
for ii in range(ndims-1):
values=permute(values,valuesToPermute[ii+1])
np.random.shuffle(values)
self.stimVarValues=values
for ii in range(stimListRepeats-1):
np.random.shuffle(values)
self.stimVarValues= np.concatenate((self.stimVarValues,values))
def importFromCSV(self, csvFileName):
import pandas as pd
tbl=pd.read_csv(csvFileName)
self.stimVarParams=np.array(tbl.columns)
self.stimVarValues=np.array(tbl)
def repeatStimListNtimes(self, stimListRepeats):
values=self.stimVarValues.copy()
for ii in range(stimListRepeats-1):
self.stimVarValues= np.concatenate((self.stimVarValues,values))
def shuffleStimList(self):
#seedvalue = np.random.choice(10000,1)
np.random.seed(round(time.time()))
np.random.shuffle(self.stimVarValues)
def currentParams(self):
if not self.finished():
return self.stimVarValues[max(self.stimIndex,0)]
else:
return []
def nextParams(self):
self.stimIndex+=1
return self.currentParams()
def paramNames(self):
return self.stimVarParams
def finished(self):
return self.stimIndex >= len(self.stimVarValues)
def readScheduleFromFile(self,defaultName=''):
#from tkinter import messagebox
from tkinter import filedialog
#import tkinter as tk
filename = filedialog.askopenfilename(title = "Open Stim Params Table:",initialfile=defaultName,filetypes = (("CSV","*.csv"),("all files","*.*")))
if filename=='':
# opening canceled.
print('No valid file chosen.')
return ''
try:
self.importFromCSV(filename)
return filename
except:
print('Warning: Could not open file: '+filename)
return ''
#%%
class scheduler:
def __init__(self, detectors,stimulator,dataHandler,schedule):
self.detectors=detectors
self.stimulator=stimulator
self.dataHandler=dataHandler
self.schedule=schedule
self.lastAction=time.time()
self.oldStatus=""
def start(self):
config.status=""
self.broadCastStatus('start')
self.nextJob=self.chooseNextTrial
self.stimStarted=time.time()
self.timeZero=time.time()
self.processJobs()
def processJobs(self):
# now keep working through jobs
while self.nextJob != None:
if config.status=="abort":
print('Schedule aborted.')
self.nextJob=None
else:
self.nextJob=self.nextJob()
def didSucceed(self,returnCode):
# status change functions of modules may return nothing or a boolean
if returnCode is None:
return True
return returnCode
def broadCastStatus(self,aStatus):
# only broadcast new status if it has changed
if aStatus==self.oldStatus:
return
self.oldStatus=aStatus
if verbose:
print()
print('#### New status: '+aStatus)
sys.stdout.flush()
if hasattr(self.stimulator,'statusChange'):
success=self.didSucceed(self.stimulator.statusChange(aStatus))
while not success:
time.sleep(0.1)
success=self.didSucceed(self.stimulator.statusChange(aStatus))
if not self.detectors is None:
if hasattr(self.detectors,'statusChange'):
success=self.didSucceed(self.detectors.statusChange(aStatus))
while not success:
time.sleep(0.1)
success=self.didSucceed(self.detectors.statusChange(aStatus))
if hasattr(self.dataHandler,'statusChange'):
success=self.didSucceed(self.dataHandler.statusChange(aStatus))
while not success:
time.sleep(0.1)
success=self.didSucceed(self.dataHandler.statusChange(aStatus))
def chooseNextTrial(self):
self.broadCastStatus('chooseNextTrial')
self.nextParams=self.schedule.nextParams()
if (self.nextParams == []):
return None # we are done, no more jobs to do
else:
# get the stimulator ready for the chosen stimulus
self.stimulator.setParams(list(self.schedule.paramNames()), self.nextParams)
# for debugging, print info about the chosen stimulus
if verbose:
print('Next stimulus has parameters ',self.schedule.paramNames(),self.nextParams)
if self.stimulator.correctResponse("RIGHT"):
side = 'RIGHT'
else:
side='LEFT'
if verbose:
print()
print('Correct response would be on the '+side)
print()
sys.stdout.flush()
return self.waitForStart
def waitForStart(self):
# listen to detectors and wait for a START signal
self.broadCastStatus('waitForStart')
if self.detectors is None:
# if there are no detectors to poll for start signal we just go
return self.presentTrial
response=self.detectors.responseDetected()
if response[0]=='NONE':
# no start signal yet
time.sleep(0.05)
if (time.time()-self.lastAction) > 10:
self.lastAction=time.time()
print('Still waiting for centre lick at '+time.asctime(time.localtime()))
sys.stdout.flush()
return self.waitForStart
if response[0]=='QUIT':
# exit signal sent: return None as next job to do
return None
if response[0]=='START':
return self.presentTrial
else:
# if we got here an inappropriate response was received.
# Reset sensors and continue
print('Received unexpected signal '+response[0])
sys.stdout.flush()
return self.waitForStart
def presentTrial(self):
# present next stimulus
print()
print('### presenting trial {} of {}'.format(self.schedule.stimIndex+1, len(self.schedule.stimVarValues)))
self.stimStarted=time.time()
config.currentStimTimestamp=self.stimStarted-self.timeZero
config.currentStimParams=self.nextParams
self.broadCastStatus('presentTrial')
self.lastAction=time.time()
return self.getResponse
def getResponse(self):
self.broadCastStatus('getResponse')
# listen to detectors and wait for a response to a recently presented stimulus
# if there are no detectors we just save the timestamp of when the stimulus started
if self.detectors is None:
par=self.stimulator.stimParams.copy()
par['timeStamp']=config.currentStimTimestamp
self.dataHandler.saveTrial(par)
return self.chooseNextTrial
response=self.detectors.responseDetected()
if response[0] in ['NONE','START']:
# no response signal yet
time.sleep(0.05)
if (time.time()-self.lastAction) > config.maxResponseTime:
print('Subject took too long to respond. Resetting trial.')
sys.stdout.flush()
self.lastAction=time.time()
return self.waitForStart
return self.getResponse
print('Registered response '+response[0]+' at:',response[1])
sys.stdout.flush()
if response[0]=='QUIT':
# exit signal sent: return None as next job to do
return None
# at this point my response was neither none nor start nor quit, so it must be a judgment.
# Save the trial. Build a dictionary with all the trial info
par=self.stimulator.stimParams.copy()
# Ask the stimulator whether the response was correct
par['correct']=self.stimulator.correctResponse(response[0])
par['response']=response[0]
par['reactionTime']=response[1]-self.stimStarted
par['timeStamp']=response[1]-self.timeZero
self.dataHandler.saveTrial(par)
# Give feedback
if par['correct']:
return self.reward
else:
return self.punish
def reward(self):
# reward, then move on to next trial
print('Correct response. Giving reward then choosing next trial')
sys.stdout.flush()
self.broadCastStatus('reward')
return self.chooseNextTrial
def punish(self):
# give timeout signals, then repeat current stimulus as correction trial
print('Wrong response. Starting timeout then repeating trial')
sys.stdout.flush()
self.broadCastStatus('punish')
return self.waitForStart
def done(self):
pass