Arvont Hill: December 2008

Sunday, December 7, 2008

Using Python for Test & Measurement

I picked up the November 2008 issue of Linux Journal after seeing it had an article about using Python for scientific computing. As I read through the article I learned about two modules, numpy and SciPy , that provide "extended" capabilities for math and science operations such as matrix calculations, Fourier transformations or numerical integrals. The article also mentioned two other modules named matplotlib and IPython. Together these 4 packages provide a Python programmer the ability do many scientific calculations and plot results all from a powerful interactive environment similar to matlab with all the power of Python.

Upon browsing the SciPy website, I discovered there were a number of "cookbooks" for using SciPy to do various tasks. The Data Acquisition with NI-DAQmx cookbook interested me because of my present job and familiarity with National Instruments products. I checked it out and decided to get a small data measurement system setup using python and a NI USB-6009 multi-function DAQ module on my Vista Home Basic laptop.

Here is what my setup looks like:

  • python 2.5.2
  • numpy 1.2.1 for python2.5
  • scipy 0.6.0 for python2.5
  • matplotlib 0.98.3win32
  • ipython 0.9.1
  • PyReadline 1.5 (Optional: This provides tab completion in Windows. It also provides colored text in an ipython window.)
  • NI-DAQmx 8.7.1f3 (most recent version should be ok as well)
  • NI USB-6009 Multifunction DAQ

  • This setup also relies on the ctypes module which is a default module in python 2.5. If you are using python 2.4 or less you may have to install this module.

    The example code found on the SciPy website shows an example that takes a C example provided with the installation of DAQmx and re-writes it using ctypes and numpy. My system copies this example and turns it into a module to be used by a higher level script that will acquire data and plot the data.

    Here is the module I named contAcquireNChan

    #contAcquireNChan.py

    import ctypes
    import numpy

    nidaq = ctypes.windll.nicaiu # load the DLL

    ##############################
    # Setup some typedefs and constants
    # to correspond with values in
    # C:\Program Files\National Instruments\NI-DAQ\DAQmx ANSI C Dev\include\NIDAQmx.h

    # the typedefs
    int32 = ctypes.c_long
    uInt32 = ctypes.c_ulong
    uInt64 = ctypes.c_ulonglong
    float64 = ctypes.c_double
    TaskHandle = uInt32
    written = int32()
    pointsRead = uInt32()

    # the constants
    DAQmx_Val_Cfg_Default = int32(-1)
    DAQmx_Val_Volts = 10348
    DAQmx_Val_Rising = 10280
    DAQmx_Val_FiniteSamps = 10178
    DAQmx_Val_GroupByChannel = 0
    DAQmx_Val_ChanForAllLines = 1
    DAQmx_Val_RSE = 10083
    DAQmx_Val_Volts = 10348
    DAQmx_Val_ContSamps = 10123
    DAQmx_Val_GroupByScanNumber = 1
    ##############################

    def CHK(err):
        """a simple error checking routine"""
        if err < 0:
            buf_size = 1000
            buf = ctypes.create_string_buffer('\000' * buf_size)
            nidaq.DAQmxGetErrorString(err,ctypes.byref(buf),buf_size)
            raise RuntimeError('nidaq call failed with error %d: %s'%(err,repr(buf.value)))

    # initialize variables
    taskHandle = TaskHandle(0)
    min = float64(-10.0)
    max = float64(10.0)
    timeout = float64(10.0)
    bufferSize = uInt32(10)
    pointsToRead = bufferSize
    pointsRead = uInt32()
    sampleRate = float64(10000.0)
    samplesPerChan = uInt64(2000)
    chan = ctypes.create_string_buffer('Dev1/ai0')
    clockSource = ctypes.create_string_buffer('OnboardClock')


    data = numpy.zeros((1000,),dtype=numpy.float64)


    # Create Task and Voltage Channel and Configure Sample Clock
    def SetupTask():
        CHK(nidaq.DAQmxCreateTask("",ctypes.byref(taskHandle)))
        CHK(nidaq.DAQmxCreateAIVoltageChan(taskHandle,chan,"",DAQmx_Val_RSE,min,max,
            DAQmx_Val_Volts,None))
        CHK(nidaq.DAQmxCfgSampClkTiming(taskHandle,clockSource,sampleRate,
            DAQmx_Val_Rising,DAQmx_Val_ContSamps,samplesPerChan))
        CHK(nidaq.DAQmxCfgInputBuffer(taskHandle,200000))

    #Start Task
    def StartTask():
        CHK(nidaq.DAQmxStartTask (taskHandle))

    #Read Samples
    def ReadSamples(points):
        bufferSize = uInt32(points)
        pointsToRead = bufferSize
        data = numpy.zeros((points,),dtype=numpy.float64)
        CHK(nidaq.DAQmxReadAnalogF64(taskHandle,pointsToRead,timeout,
                DAQmx_Val_GroupByScanNumber,data.ctypes.data,
                uInt32(2*bufferSize.value),ctypes.byref(pointsRead),None))

        print "Acquired %d pointx(s)"%(pointsRead.value)
        return data

    def StopAndClearTask():
        if taskHandle.value != 0:
            nidaq.DAQmxStopTask(taskHandle)
            nidaq.DAQmxClearTask(taskHandle)
    I wanted to capture a time varying signal and plot the time based signal as well as its spectrum. I created a 100Hz signal with a PIC16F690 that I had on my bench and connected the output to ai0 on the USB-6009. Then I ran this script, measure.py, that calls the contAcquireNChan module. The script plots the results using pylab and also returns that data read from the USB-6009 as a numpy array.



    #measure.py

    from contAcquireNChan import *
    from pylab import *
    import time

    def acquire(points):
        times = []
        SetupTask()
        StartTask()
        data = ReadSamples(points)
        acqTime = points/sampleRate.value
        StopAndClearTask()
        t = linspace(0,acqTime,points)
        clf()
        plot(t,data)
        axis([0,acqTime,-10,10])
        ylabel("Volts")
        xlabel("time (sec)")
        grid(True)
        return data


    I made another script that acquired data and then plotted the spectrum of the same 100Hz signal.



    #plot_example1.py

    from scipy import *
    from pylab import *
    import time


    from contAcquireNChan import *


    def acquire(points):
        SetupTask()
        StartTask()
        tstart = time.time()
        data = ReadSamples(points)
        StopAndClearTask()
        return data

    #Capture 600ms of data
    sample_rate=sampleRate.value
    t=r_[0:0.6:1/sample_rate]
    N=len(t)
    s=acquire(N)
    S=fft(s)
    f=sample_rate*r_[0:(N/2)]/N
    n=len(f)
    clf()
    plot(f,abs(S[0:n])/N)
    show()
    grid(True)