Upon browsing the SciPy website, I discovered there were a number of "cookbooks" for using SciPy to do various tasks. The Data Acquisition with NI-DAQmx cookbook interested me because of my present job and familiarity with National Instruments products. I checked it out and decided to get a small data measurement system setup using python and a NI USB-6009 multi-function DAQ module on my Vista Home Basic laptop.
Here is what my setup looks like:
This setup also relies on the ctypes module which is a default module in python 2.5. If you are using python 2.4 or less you may have to install this module.
The example code found on the SciPy website shows an example that takes a C example provided with the installation of DAQmx and re-writes it using ctypes and numpy. My system copies this example and turns it into a module to be used by a higher level script that will acquire data and plot the data.
Here is the module I named contAcquireNChan
#contAcquireNChan.py
import ctypes
import numpy
nidaq = ctypes.windll.nicaiu # load the DLL
##############################
# Setup some typedefs and constants
# to correspond with values in
# C:\Program Files\National Instruments\NI-DAQ\DAQmx ANSI C Dev\include\NIDAQmx.h
# the typedefs
int32 = ctypes.c_long
uInt32 = ctypes.c_ulong
uInt64 = ctypes.c_ulonglong
float64 = ctypes.c_double
TaskHandle = uInt32
written = int32()
pointsRead = uInt32()
# the constants
DAQmx_Val_Cfg_Default = int32(-1)
DAQmx_Val_Volts = 10348
DAQmx_Val_Rising = 10280
DAQmx_Val_FiniteSamps = 10178
DAQmx_Val_GroupByChannel = 0
DAQmx_Val_ChanForAllLines = 1
DAQmx_Val_RSE = 10083
DAQmx_Val_Volts = 10348
DAQmx_Val_ContSamps = 10123
DAQmx_Val_GroupByScanNumber = 1
##############################
def CHK(err):
"""a simple error checking routine"""
if err < 0:
buf_size = 1000
buf = ctypes.create_string_buffer('\000' * buf_size)
nidaq.DAQmxGetErrorString(err,ctypes.byref(buf),buf_size)
raise RuntimeError('nidaq call failed with error %d: %s'%(err,repr(buf.value)))
# initialize variables
taskHandle = TaskHandle(0)
min = float64(-10.0)
max = float64(10.0)
timeout = float64(10.0)
bufferSize = uInt32(10)
pointsToRead = bufferSize
pointsRead = uInt32()
sampleRate = float64(10000.0)
samplesPerChan = uInt64(2000)
chan = ctypes.create_string_buffer('Dev1/ai0')
clockSource = ctypes.create_string_buffer('OnboardClock')
data = numpy.zeros((1000,),dtype=numpy.float64)
# Create Task and Voltage Channel and Configure Sample Clock
def SetupTask():
CHK(nidaq.DAQmxCreateTask("",ctypes.byref(taskHandle)))
CHK(nidaq.DAQmxCreateAIVoltageChan(taskHandle,chan,"",DAQmx_Val_RSE,min,max,
DAQmx_Val_Volts,None))
CHK(nidaq.DAQmxCfgSampClkTiming(taskHandle,clockSource,sampleRate,
DAQmx_Val_Rising,DAQmx_Val_ContSamps,samplesPerChan))
CHK(nidaq.DAQmxCfgInputBuffer(taskHandle,200000))
#Start Task
def StartTask():
CHK(nidaq.DAQmxStartTask (taskHandle))
#Read Samples
def ReadSamples(points):
bufferSize = uInt32(points)
pointsToRead = bufferSize
data = numpy.zeros((points,),dtype=numpy.float64)
CHK(nidaq.DAQmxReadAnalogF64(taskHandle,pointsToRead,timeout,
DAQmx_Val_GroupByScanNumber,data.ctypes.data,
uInt32(2*bufferSize.value),ctypes.byref(pointsRead),None))
print "Acquired %d pointx(s)"%(pointsRead.value)
return data
def StopAndClearTask():
if taskHandle.value != 0:
nidaq.DAQmxStopTask(taskHandle)
nidaq.DAQmxClearTask(taskHandle)
#measure.py
from contAcquireNChan import *
from pylab import *
import time
def acquire(points):
times = []
SetupTask()
StartTask()
data = ReadSamples(points)
acqTime = points/sampleRate.value
StopAndClearTask()
t = linspace(0,acqTime,points)
clf()
plot(t,data)
axis([0,acqTime,-10,10])
ylabel("Volts")
xlabel("time (sec)")
grid(True)
return data
I made another script that acquired data and then plotted the spectrum of the same 100Hz signal.
#plot_example1.py
from scipy import *
from pylab import *
import time
from contAcquireNChan import *
def acquire(points):
SetupTask()
StartTask()
tstart = time.time()
data = ReadSamples(points)
StopAndClearTask()
return data
#Capture 600ms of data
sample_rate=sampleRate.value
t=r_[0:0.6:1/sample_rate]
N=len(t)
s=acquire(N)
S=fft(s)
f=sample_rate*r_[0:(N/2)]/N
n=len(f)
clf()
plot(f,abs(S[0:n])/N)
show()
grid(True)