Commit 9b3842f3 authored by Qiang Du's avatar Qiang Du

Added tools/lib-multiboot API to support flash access through Etherbone.

- eb/etherbone.py: python API wapper using Etherbone libaray
- eb/eb.py: xil_multiboot device class with direct register access
- flash_m25p.py: M25P flash IO class
- xil_multiboot.py: top class that supports all ICAP/flash on target
- mbtest.py: __main__ test script that proofs a successful flash reading & erasing
parent 3abad7aa
# Author: Theodor Stana <theodor.stana@gmail.com>
# Date: October 2014
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, you may find one here:
# http://www.gnu.org/licenses/old-licenses/gpl-2.0.html
# or you may search the http://www.gnu.org website for the version 2 license,
# or you may write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
# Communication type definitions
#
# Communication types are used in several places to define how to interact to
# the MultiBoot module (what interface is used to the FPGA) and to the on-board
# flash module (what interface exists between the FPGA and the flash chip).
#
# The scripts that use these definitions are organized as series of 'if'
# branches that execute specific commands based on the communication type being
# used. As an example of usage, grep for ELMA_I2C_MULTIBOOT in the current
# folder to see where it is used and how the specific commands are sent to
# interface to the Flash via the ELMA I2C protocol and the MultiBoot logic
# implemented on the FPGAs of blocking and RS-485 converter boards:
#
# grep -rnH ELMA_I2C_MULTIBOOT .
#
# Browse the code under the if branches, and apply your own code if you are
# implementing a new communication type. Note that inside the constructors of
# the XilMultiboot and FlashM25P classes there are variable parameters passed by
# the protocol type. In these constructors, you should decide on a sequence of
# parameters to be passed to the constructor. This sequence is specific to the
# communication type you are implementing. Then, add parameters specific to this
# new communication type that are to be used along the classes, and implement
# specific 'if' branches for the new communication type. An example of
# communication-type-specific parameters is the elma.write() and elma.read()
# methods, in the case of the ELMA_I2C_MULTIBOOT type.
ELMA_I2C_MULTIBOOT = 0;
ETHERBONE_MULTIBOOT = 1;
#===============================================================================
# GNU LESSER GENERAL PUBLIC LICENSE
#===============================================================================
# This source file is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by the
# Free Software Foundation; either version 2.1 of the License, or (at your
# option) any later version. This source is distributed in the hope that it
# will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU Lesser General Public License for more details. You should have
# received a copy of the GNU Lesser General Public License along with this
# source; if not, download it from http://www.gnu.org/licenses/lgpl-2.1.html
#===============================================================================
# Qiang Du <du.qiang@gmail.com>
# 01/07/2015
import etherbone as eb
import sys
class EB:
""" access a Etherbone device. """
def __init__(self, ip):
""" Initialize EB class.
ip -- IP address of Etherbone device
"""
self.port_ip = 'udp/' + ip
self.write_cnt = 0
self.read_cnt = 0
self.open()
def open(self):
self.socket = eb.Socket()
self.socket.__enter__()
self.device = eb.Device(self.socket, self.port_ip)
self.device.__enter__()
self.cycle = eb.Cycle(self.device, 0, 0)
self.cycle.__enter__()
def close(self):
self.cycle.close()
self.device.close()
self.socket.close()
def write(self, addr, val):
self.device.write(addr, val)
def read(self, addr):
return self.device.read(addr)
class EI2C:
"""Access a VME crate through Telnet."""
def __init__(self, ip, user, pwd):
"""Initialize EI2C class.
The arguments to init the class are mandatory:
ip -- IP address of VME crate
user -- username to login to VME crate
pwd -- password to login to VME crate
"""
self.ip = ip
self.user = user
self.pwd = pwd
self.write_cnt = 0
self.read_cnt = 0
if (ei2cdefine.DUMP):
self.f = open("dump.txt", 'w')
def open(self):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM);
try:
s.connect((self.ip, 23))
s.recv(30)
except (socket.gaierror, socket.error) as e:
raise BadHostnameError
s.send(self.user + "\r\n")
if "password" not in s.recv(30):
raise BadUsernameError
s.send(self.pwd + "\r\n")
if "Wrong password" in s.recv(30):
raise BadPasswordError
self.handle = s
def close(self):
self.handle.close()
if (ei2cdefine.DUMP):
self.f.close()
def write(self, slot, addr, val):
self.write_cnt += 1
#reg = addr/4 + 1
cmd = "writereg %d %x %x\r\n" % (slot, addr, val)
if (ei2cdefine.DUMP):
self.f.write(cmd);
self.handle.send(cmd)
self._strip_resp(self.handle.recv(30))
def read(self, slot, addr):
self.read_cnt += 1
#reg = addr/4 + 1
cmd = "readreg %d %x\r\n" % (slot, addr)
if (ei2cdefine.DUMP):
self.f.write(cmd);
self.handle.send(cmd)
resp = self._strip_resp(self.handle.recv(30))
return resp
def writemregs(self, slot, addr, val):
self.write_cnt += 1
#reg = addr/4 + 1
#print '%02x' % sl
cmd = "writemregs %d %x %s\r\n" % (slot, addr, ' '.join(format(b,'x') for b in val))
if (ei2cdefine.DUMP):
self.f.write(cmd);
#print cmd
self.handle.send(cmd)
self._strip_resp(self.handle.recv(30))
def readmregs(self, slot, addr, nrregs):
self.read_cnt += 1
#reg = addr/4 + 1
cmd = "readmregs %d %x %d\r\n" % (slot, addr, nrregs)
if (ei2cdefine.DUMP):
self.f.write(cmd);
self.handle.send(cmd)
resp = self._strip_resp(self.handle.recv(30))
return resp
def _strip_resp(self, msg):
"""Strip useful message from SysMon response.
The response string from the SysMon is stripped of unnecessary characters
and a "useful" string is returned. In case one of the error messages appear,
the function raises one of the EI2C exceptions."""
#print msg.splitlines()
msg = msg.splitlines()[1]
if (ei2cdefine.DUMP):
self.f.write(msg + "\n")
if "Not Ack" in msg:
raise NAckError
if "Invalid register number!" in msg:
raise RegNumberError
if "Bad command!" in msg:
raise InvalidCmdError
if "I2C Timer expired" in msg:
raise TimerExpiredError
if "Invalid slot number!" in msg:
raise SlotError
if "Read Data:" in msg:
msg = msg.split(" ")[3]
msg = int(msg,16)
return msg
# MAIN "FUNCTION"
if __name__ == "__main__":
address = 0x20810
if (0):
with eb.Socket() as socket:
with eb.Device(socket, 'udp/rflab2') as device:
with eb.Cycle(device, 0, 0) as cycle:
data = cycle.read(address)
print hex(data.value)
print 'EB test'
ebi = EB('rflab2')
print hex(ebi.read(address).value)
#! /usr/bin/env python
# coding: utf8
# python bindings for the etherbone unix library
# this is a preliminary version that gives access to the
# core C99 library API and implements Socket, Device and
# Cycle classes with a Pythonic interface
#
# just import-tested, probably full of horrendous bugs, needs
# extensive testing given that opaque types and nested
# structures are used all over the place and this is very
# error-prone with ctypes (and my limited knowledge thereof)
from ctypes import *
libetherbone_path = '/usr/local/lib/libetherbone.so'
lib = CDLL(libetherbone_path)
# typedefs and enums are translated into ctypes mainly
# for documentation purposes; using them does not make
# much sense otherwise
# configurable types
eb_address_t = c_ulonglong
eb_data_t = c_ulonglong
# control types
eb_network_address_t = c_char_p
eb_descriptor_t = c_int
# opaque structural types
# uncomment this and kill later definitions
# of eb_{socket,device,cycle}_t if some conflict
# with ctypes internals arises
eb_socket_t = c_uint16
eb_device_t = c_uint16
eb_cycle_t = c_uint16
# status codes
eb_status_t = c_int
( EB_OK,
EB_FAIL,
EB_ABORT,
EB_ADDRESS,
EB_OVERFLOW,
EB_ENDIAN,
EB_BUSY,
EB_TIMEOUT,
EB_OOM,
EB_ABI,
EB_SEGFAULT) = range(0,-11,-1)
eb_err_code = {
0: 'EB_OK',
-1: 'EB_FAIL',
-2: 'EB_ABORT',
-3: 'EB_ADDRESS',
-4: 'EB_OVERFLOW',
-5: 'EB_ENDIAN',
-6: 'EB_BUSY',
-7: 'EB_TIMEOUT',
-8: 'EB_OOM',
-9: 'EB_ABI',
-10: 'EB_SEGFAULT'
}
# modes
eb_mode_t = c_int
EB_UNDEFINED = -1
EB_FIFO = 0
EB_LINEAR = 1
# bitmasks
eb_widths_t = c_uint
EB_UDP_MODE = 0
EB_FEC_MODE = 1
# widths
eb_width_t = c_uint
EB_ADDR8 = 0x10
EB_ADDR16 = 0x20
EB_ADDR32 = 0x40
EB_ADDR64 = 0x80
EB_ADDRX = 0xf0
EB_DATA8 = 1
EB_DATA16 = 2
EB_DATA32 = 4
EB_DATA64 = 8
EB_DATAX = 0xf
# etherbone.h
EB_ABI_CODE = 0x488
EB_ENDIAN_MASK = 0x30
EB_BIG_ENDIAN = 0x10
EB_LITTLE_ENDIAN = 0x20
# callback types
eb_user_data_t = c_void_p
class Socket():
"""implement etherbone socket functionality (open, close, poll)
All method return codes are identical to their plain C interface
counterparts.
"""
#_fields_ = [
# ('device_ring', Ring),
# ('vdevice_ring', Ring),
# ('socket', POINTER(eb_socket_t)),
# ('response_table', POINTER(Response)),
# ('response_index', c_int),
#]
def __init__(self, port=0, widths=EB_ADDRX|EB_DATAX):
self.abi_code = EB_ABI_CODE
self.socket = eb_socket_t()
self.port = port
self.widths = widths
def __enter__(self):
status = self.open(self.port, self.widths, self.socket)
if (status != 0):
print 'error opening device, code ', eb_err_code[status]
return self
def __exit__(self, type, value, traceback):
self.close()
def open(self, port, widths, socket):
"""open an Etherbone socket for communicating with remote devices
The port parameter is optional; 0 lets the operating system choose.
After opening the socket, poll must be hooked into an event loop.
"""
return lib.eb_socket_open(self.abi_code, port, widths, byref(socket))
def close(self):
"""close Etherbone socket
Any use of the socket after successful close will probably segfault!
"""
return lib.eb_socket_close(self.socket)
def poll(self):
"""poll the Etherbone socket for activity
"""
return lib.eb_socket_poll(self.socket)
class Device():
"""etherbone device interface (open, close, flush, read, write)
"""
def __init__(self, socket, address):
self.socket = socket.socket
self.address = address
self.widths = socket.widths
self.device = eb_device_t()
self.data_format = EB_DATA32|EB_BIG_ENDIAN
def __enter__(self):
status = self.open(self.socket, self.address, self.widths)
if (status != 0):
print 'error opening device, code ', eb_err_code[status]
return self
def __exit__(self, type, value, traceback):
self.close()
def open(self, socket, ip_port, widths):
"""open a remote Etherbone device
This resolves the address and performs Etherbone end-point discovery.
From the mask of proposed bus widths, one will be selected.
The default port is taken as 0xEBD0.
"""
return lib.eb_device_open(socket, ip_port, widths, 3, byref(self.device))
def close(self):
"""close a remote Etherbone device
"""
return lib.eb_device_close(self.device)
def socket(self):
"""access the socket backing this device
"""
return lib.eb_device_socket(self.device)
def width(self):
"""recover the negotiated data width of the target device
"""
return lib.eb_device_width(self.device)
def flush(self):
"""flush commands queued on the device out the socket
"""
return lib.eb_device_flush(self.device)
def read(self, address):
"""perform a single-read wishbone cycle
Semantically equivalent to cycle_open, cycle_read, cycle_close, device_flush.
The given address is read on the remote device.
The callback cb(user, status, data) is invoked with the result.
The user parameter is passed through uninspected to the callback.
"""
data = eb_data_t()
user_data = None
callback = None
lib.eb_device_read(self.device, address, self.data_format, byref(data), user_data, callback)
return data
def write(self, address, data):
"""perform a single-write wishbone cycle
Semantically equivalent to cycle_open, cycle_write, cycle_close, device_flush.
data is written to the given address on the remote device.
"""
user_data = None
callback = None
return lib.eb_device_write(self.device, address, self.data_format, data, user_data, callback)
class Cycle():
"""provide wishbone cycle operations (open, close, read/write)
Return codes for functions are identical to the plain C counterparts
"""
def __init__(self, device, user_data=None, callback=None):
self.dev_handle = device.device
self.user_data = user_data
self.callback = callback
self.cycle = eb_cycle_t()
self.data_format = device.data_format
def __enter__(self):
status = self.open(self.dev_handle, self.user_data, self.callback)
if (status != 0):
print 'error opening device, code ', eb_err_code[status]
return self
def __exit__(self, type, value, traceback):
self.close()
def open(self, dev_handle, user_data=None, callback=None):
return lib.eb_cycle_open(dev_handle, user_data, callback, byref(self.cycle))
def close(self):
"""end a wishbone cycle
This places the complete cycle at end of the device's send queue.
You will probably want to flush() the underlying device()
soon after calling this method
"""
return lib.eb_cycle_close(self.cycle)
def device(self):
"""access the device targetted by this cycle
"""
return lib.eb_cycle_device(self.cycle)
def read(self, address):
"""prepare a wishbone read phase
The given address is read from the remote device.
"""
data = eb_data_t()
lib.eb_cycle_read(self.cycle, address, self.data_format, byref(data))
return data
def write(self, address, data):
"""perform a wishbone write phase
data is written to the current cursor on the remote device.
If the device was read-only, the operation is discarded.
"""
return lib.eb_cycle_write(self.cycle, address, self.data_format, data)
#
# C99 API
#
def eb_socket_open(port, widths, socket):
"""
Open an Etherbone socket for communicating with remote devices.
The port parameter is optional; 0 lets the operating system choose.
After opening the socket, poll must be hooked into an event loop.
Return codes:
OK - successfully open the socket port
FAIL - operating system forbids access
BUSY - specified port is in use (only possible if port != 0)
"""
return lib.eb_socket_open(EB_ABI_CODE, port, widths, socket)
def eb_socket_close(socket):
"""
Close the Etherbone socket.
Any use of the socket after successful close will probably segfault!
Return codes:
OK - successfully closed the socket and freed memory
BUSY - there are open devices on this socket
"""
return lib.eb_socket_close(socket)
def eb_socket_poll(socket):
"""
Poll the Etherbone socket for activity.
This function must be called regularly to receive incoming packets.
Either call poll very often or hook a read listener on its descriptor.
Callback functions are only executed from within the poll function.
Return codes:
OK - poll complete; no further packets to process
FAIL - socket error (probably closed)
"""
return lib.eb_socket_poll(socket)
def eb_socket_block(socket, timeout_us):
"""
Block until the socket is ready to be polled.
This function is useful if your program has no event loop of its own.
It returns the time spent while waiting.
"""
return lib.eb_socket_block(socket, timeout_us)
def eb_socket_descriptor(socket):
"""
Access the underlying file descriptor of the Etherbone socket.
THIS MUST NEVER BE READ, WRITTEN, CLOSED, OR MODIFIED IN ANY WAY!
It may be used to watch for read readiness to call poll.
"""
return lib.eb_socket_descriptor(socket)
def eb_socket_attach(socket, handler):
"""
Add a device to the virtual bus.
This handler receives all reads and writes to the specified address.
NOTE: the address range [0x0, 0x7fff) is reserved for internal use.
Return codes:
OK - the handler has been installed
FAIL - out of memory
ADDRESS - the specified address range overlaps an existing device.
"""
return lib.eb_socket_attach(socket, handler)
def eb_socket_detach(socket, address):
"""
Detach the device from the virtual bus.
Return codes:
OK - the devices has be removed
FAIL - there is no device at the specified address.
"""
return lib.eb_socket_detach(socket, address)
def eb_device_open(socket, ip_port, proposed_widths, result):
"""
Open a remote Etherbone device.
This resolves the address and performs Etherbone end-point discovery.
From the mask of proposed bus widths, one will be selected.
The default port is taken as 0xEBD0.
Return codes:
OK - the remote etherbone device is ready
FAIL - the remote address did not identify itself as etherbone conformant
ADDRESS - the network address could not be parsed
ABORT - could not negotiate an acceptable data bus width
"""
return lib.eb_device_open(socket, ip_port, proposed_widths, result)
def eb_device_width(device):
"""
Recover the negotiated data width of the target device.
"""
return lib.eb_device_width(device)
def eb_device_close(device):
"""
Close a remote Etherbone device.
Return codes:
OK - associated memory has been freed
BUSY - there are outstanding wishbone cycles on this device
"""
return lib.eb_device_close(device)
def eb_device_socket(device):
"""Access the socket backing this device
"""
return lib.eb_device_socket(device)
def eb_device_flush(socket):
"""
Flush commands queued on the device out the socket.
"""
return lib.eb_device_flush(socket)
def eb_cycle_open_read_only(device, userdata, cycle_callback):
"""
Begin a wishbone cycle on the remote device.
Read/write phases within a cycle hold the device locked.
All reads are executed first followed by all writes.
Until the cycle is closed, the operations are not queued.
If data was read, the callback is run upon cycle completion.
Status codes:
OK - the operation completed successfully
FAIL - the operation failed due to an wishbone ERR_O signal
ABORT - an earlier operation failed and this operation was thus aborted
OVERFLOW - too many operations queued for this cycle
"""
return lib.eb_cycle_open_read_only(device, userdata, cycle_callback)
def eb_cycle_open_read_write(device, user, cb, write_base, write_mode):
"""
Begin a wishbone cycle on the remote device.
Read/write phases within a cycle hold the device locked.
All reads are executed first followed by all writes.
Until the cycle is closed, the operations are not queued.
If data was read, the callback is run upon cycle completion.
Status codes:
OK - the operation completed successfully
FAIL - the operation failed due to an wishbone ERR_O signal
ABORT - an earlier operation failed and this operation was thus aborted
OVERFLOW - too many operations queued for this cycle
"""
return lib.eb_cycle_open_read_write(device, user, cb, write_base, write_mode)
def eb_cycle_close(cycle):
"""
End a wishbone cycle.
This places the complete cycle at end of the device''s send queue.
You will probably want to eb_flush_device soon after calling eb_cycle_close.
"""
return lib.eb_cycle_close(cycle)
def eb_cycle_device(cycle):
"""Access the device targetted by this cycle
"""
return eb_cycle_device(cycle)
def eb_cycle_read(cycle, address):
"""
Prepare a wishbone read phase.
The given address is read from the remote device.
"""
return lib.eb_cycle_read(cycle, address)
def eb_cycle_write(cycle, data):
"""
Perform a wishbone write phase.
data is written to the current cursor on the remote device.
If the device was read-only, the operation is discarded.
"""
return lib.eb_cycle_write(cycle, data)
def eb_device_read(device, address, userdata, cb):
"""
Perform a single-read wishbone cycle.
Semantically equivalent to cycle_open, cycle_read, cycle_close, device_flush.
The given address is read on the remote device.
The callback cb(user, status, data) is invoked with the result.
The user parameter is passed through uninspected to the callback.
Status codes:
OK - the operation completed successfully
FAIL - the operation failed due to an wishbone ERR_O signal
ABORT - an earlier operation failed and this operation was thus aborted
"""
return lib.eb_device_read(device, address, userdata, cb)
def eb_device_write(device, address, data):
"""
Perform a single-write wishbone cycle.
Semantically equivalent to cycle_open, cycle_write, cycle_close, device_flush.
data is written to the given address on the remote device.
"""
return lib.eb_device_write(device, address, data)
# Author: Theodor Stana <theodor.stana@gmail.com>
# Date: October 2014
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, you may find one here:
# http://www.gnu.org/licenses/old-licenses/gpl-2.0.html
# or you may search the http://www.gnu.org website for the version 2 license,
# or you may write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
#
import sys
import struct
sys.path.append("eb")
from eb import *
from comm_type import *
class FlashM25P:
# Constructor
# comm_params -- communication parameters defining details about how this
# class should interface to the M25P flash chip
def __init__(self, *comm_params):
self.comm = comm_params[0]
if (self.comm == ELMA_I2C_MULTIBOOT):
self.elma = comm_params[1]
self.slot = comm_params[2]
self.mb_base = comm_params[3]
elif (self.comm == ETHERBONE):
self.ebone = comm_params[1]
self.mb_base = comm_params[2]
# Low-level SPI transfer using communication type defined in the constructor
#
# This method handles the OR-ing together of the data bytes and the control
# byte in FAR.
#
# Formatting of data bytes in the FAR register should be done outside this
# function.
def spi_transfer(self, nbytes, cs, dat):
# Communication type is ELMA I2C, therefore use MultiBoot logic on the
# FPGA to access the flash
retval = 0
wval = []
# control byte, to be shifted left by 24
ctrl = ((cs << 3) | 0x4) | (nbytes-1)
if (self.comm == ELMA_I2C_MULTIBOOT):
# Use appropriate command by type
#
# write - we send up to three data bytes in one FAR register write
# writemregs - we send up to 24 data bytes in eight FAR register writes
if isinstance(dat,int):
self.elma.write(self.slot, self.mb_base+0x10, (ctrl << 24) | dat)
else:
for i in xrange(len(dat)):
wval.append((ctrl << 24) | dat[i])
self.elma.writemregs(self.slot, self.mb_base+0x10, wval)
# Read the data and prepare the return value
while (retval & (1 << 28) == 0):
retval = self.elma.read(self.slot, self.mb_base+0x10)
elif (self.comm == ETHERBONE):
if isinstance(dat,int):
self.ebone.write(self.mb_base+0x10, (ctrl << 24) | dat)
else:
for i in xrange(len(dat)):
wval.append((ctrl << 24) | dat[i])
self.ebone.writemregs(self.mb_base+0x10, wval)
while (retval & (1 << 28) == 0):
retval = self.ebone.read(self.mb_base+0x10)
return retval & 0xffffff
# This function is used to write data bytes to flash. It calls self.spi_transfer
# and sends the commands to the flash chip. You can follow the logic of this
# function by looking at the sequence of the write command in [1]
def write(self, addr, dat):
# write enable cmd
self.spi_transfer(1,1,0x06)
self.spi_transfer(1,0,0)
# write page cmd
self.spi_transfer(1,1,0x02)
# send address in reverse order
self.spi_transfer(3,1,self.rev_addr(addr))
# Now, massage the data array into 3-byte commands sent in 32-bit
# registers, in batches of eight registers (writemregs command).
i = 0
l = len(dat)
while (l):
wval = []
# If we have more than 24 bytes left in the data array,
# we can use the full writemregs
if (int(l/24)):
for j in xrange(0, 8):
wval.append((dat[(i+2)+3*j] << 16) | \
(dat[(i+1)+3*j] << 8) | dat[i+3*j])
self.spi_transfer(3, 1, wval)
i += 24;
l -= 24;
# When we're left to fewer than 24 bytes in the array,
# try to use one more writemregs with as many regs as possible
elif (int(l/3)):
for j in xrange(0, l/3):
wval.append((dat[(i+2)+3*j] << 16) | \
(dat[(i+1)+3*j] << 8) | dat[i+3*j])
self.spi_transfer(3,1,wval);
i += 3 * int(l/3)
l -= 3 * int(l/3)
# When we're down to less than three bytes, just use simple one-byte xfers
elif (l):
for j in xrange(0, l):
self.spi_transfer(1,1,dat[i])
i += 1
l -= 1
self.spi_transfer(1,0,0)
# This function is used to read a number of data bytes from the flash memory
# It returns the values of the three consecutive flash registers packed into
# one 24-bit data value in little-endian order
def read(self, addr, nrbytes):
ret = []
self.spi_transfer(1,1,0x0b)
# send address in reverse order
self.spi_transfer(3,1,self.rev_addr(addr))
self.spi_transfer(1,1,0)
# Read bytes in groups of three
for i in range(nrbytes):
ret.append(self.spi_transfer(1,1,0))
self.spi_transfer(1,0,0)
return ret
# Send a sector erase command
def serase(self, addr):
self.spi_transfer(1,1,0x06)
self.spi_transfer(1,0,0)
self.spi_transfer(1,1,0xD8)
# send address in reverse order
self.spi_transfer(3,1,self.rev_addr(addr))
self.spi_transfer(1,0,0)
# Send a block erase command
def berase(self):
self.spi_transfer(1,1,0x06)
self.spi_transfer(1,0,0)
self.spi_transfer(1,1,0xc7)
self.spi_transfer(1,0,0)
# Read status register command
def rsr(self):
self.spi_transfer(1,1,0x05)
ret = self.spi_transfer(1,1,0)
self.spi_transfer(1,0,0)
return ret
def read_id(self):
ret = []
self.spi_transfer(1,1,0x9f)
for i in range(20):
ret.append(self.spi_transfer(1,1,0))
self.spi_transfer(1,0,0)
ret_pack = struct.pack('B'*20, *ret)
return ret_pack
def rev_addr(self, addr):
rev_addr = ((addr & 0xff0000) >> 16) | (((addr & 0xff00) >> 8) << 8) | \
((addr & 0xff) << 16)
return rev_addr
import os
import sys
import time
import struct
sys.path.append("eb")
from eb import *
from xil_multiboot import *
if __name__ == "__main__":
ip = 'rflab2.lbl.gov'
target = EB(ip)
target.open()
baseaddr = 0x20800
mb = XilMultiboot(ETHERBONE, target, baseaddr, "foo_bit")
mb.write(0)
status = mb.flash.rsr()
print 'Status:' + hex(status)
#mb.read(0,0xff)
# read id
id = mb.flash.read_id()
print 'ID: 0x' + id.encode('hex')
#mb.flash.serase(0x0)
# mb.flash.write(0x0,range(0,255))
dat = mb.flash.read(0x0,0xff)
dat_pack = struct.pack('B'*len(dat), *dat)
print 'Flash: 0x' + dat_pack.encode('hex')
# Author: Theodor Stana <theodor.stana@gmail.com>
# Date: October 2014
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, you may find one here:
# http://www.gnu.org/licenses/old-licenses/gpl-2.0.html
# or you may search the http://www.gnu.org website for the version 2 license,
# or you may write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
#
import sys
sys.path.append("eb")
from eb import *
import time
from functools import partial
from comm_type import *
from flash_m25p import *
import os
#===============================================================================
MB_CR_OFS = 0x00
MB_SR_OFS = 0x04
MB_GBBAR_OFS = 0x08
MB_MBBAR_OFS = 0x0C
MB_FAR_OFS = 0x10
#===============================================================================
class XilMultiboot:
# Constructor
def __init__(self, *param):
if param[0] == ELMA_I2C_MULTIBOOT:
self.comm = param[0]
self.elma = param[1]
self.slot = param[2]
self.mb_base = param[3]
self.bitstream = param[4]
self.flash = FlashM25P(self.comm, self.elma, self.slot, self.mb_base)
elif param[0] == ETHERBONE:
self.comm = ETHERBONE
self.ebone = param[1]
self.slot = 0
self.mb_base = param[2]
self.bitstream = param[3]
self.flash = FlashM25P(self.comm, self.ebone, self.mb_base)
#
# Print read or write progress to screen
#
def _progress(self, sa, ea, ca):
actual = ca - sa
total = ea - sa
progress = (float(actual)/float(total)) * 100
if (progress > 100.00):
progress = 100.00
sys.stdout.write("\r %3.2f%% (0x%06x)" % (progress, ca))
sys.stdout.flush()
#
# Read from flash
#
def read(self, sa, ea):
# Ask for and open bitstream file
fname = raw_input("Output file name for flash readout: ")
f = open(fname,'wb')
# Read the data and dump to file
print("Reading flash contents from board in slot %d" % self.slot)
dat = []
for i in range(sa, ea, 256):
dat += self.flash.read(i, 256)
self._progress(sa, ea, i)
i += 256
self._progress(sa, ea, i)
print("")
dat = ''.join(map(chr,dat))
f.write(dat)
f.close()
#
# Write to flash
#
def write(self, addr):
print("Writing bitstream to board in slot %d" % self.slot)
# Ask for and open bitstream file
f = open(self.bitstream,'rb')
sta = addr
tot = os.path.getsize(self.bitstream)
end = sta + tot
# Start reading input file 256 bytes at a time and write to flash
for dat in iter(partial(f.read, 256), ''):
dat = [int(d.encode("hex"), 16) for d in dat]
self._progress(sta, end, addr)
# Erase on sector boundary
if not (addr % 0x10000):
print 'erase sector boundary'
self.flash.serase(addr)
while (self.flash.rsr() & 0x01):
pass
# Write data to page
self.flash.write(addr, dat)
while (self.flash.rsr() & 0x01):
pass
# increment to next page address
addr += 256
self._progress(sta, end, addr)
print("")
# Close file handle
f.close()
print("DONE!")
#
# Start IPROG sequence
#
def iprog(self, addr):
if (self.comm == ELMA_I2C_MULTIBOOT):
print("Issuing IPROG command to board in slot %d..." % self.slot)
self.elma.write(self.slot, self.mb_base+MB_GBBAR_OFS, 0x44 | (0x0b << 24))
self.elma.write(self.slot, self.mb_base+MB_MBBAR_OFS, addr | (0x0b << 24))
self.elma.write(self.slot, self.mb_base+MB_CR_OFS, 0x10000)
try:
self.elma.write(self.slot, self.mb_base+MB_CR_OFS, 0x20000)
except NAckError:
pass
# Set timeout...
t0 = time.time()
t1 = t0 + 60
# and wait for the FPGA to gracefully respond, or die trying
while (1):
try:
if (time.time() >= t1):
print("Timeout, IPROG unsuccessful!")
break
if ((self.elma.read(self.slot, 0x4) & 0xf0) == 0x00):
print("IPROG unsuccessful, fallback to Golden bitstream occured!")
break
else:
print("IPROG successful!")
break
except NAckError:
continue
#
# Sequence to read FPGA configuration register
#
def rdcfgreg(self):
print("Press 'q' to end config reg readout")
while 1:
try:
reg = raw_input('Address (hex): ')
reg = int(reg, 16)
if (reg < 0x00) or (reg > 0x22):
raise ValueError
if (self.comm == ELMA_I2C_MULTIBOOT):
self.elma.write(self.slot, self.mb_base+MB_CR_OFS, (1 << 6) | reg)
val = self.elma.read(self.slot, self.mb_base+MB_SR_OFS)
if (val & (1 << 16)):
print("REG(0x%02X) = 0x%04X" % (reg, val & 0xffff))
else:
print("CFGREGIMG invalid!")
except ValueError:
if (reg == 'q'):
break
print("Please input a hex value in the range [0x00, 0x22] or 'q' to quit")
#
# Set bitstream file path
#
def set_bitstream_file(self, path):
self.bitstream = path
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment