# Source code for edurov.utils

"""Utility functions that are practical for ROV control."""

import ctypes
import os
import platform
import signal
import socket
import struct
import subprocess
import warnings

def detect_pi():
    """
    Reports whether the host identifies itself as a Debian system

    Despite the name no hardware is inspected, only the distribution name
    is compared with 'debian'.

    Returns
    -------
    is_pi : bool
        True if the distribution is reported as Debian, False otherwise
    """
    legacy_lookup = getattr(platform, 'linux_distribution', None)
    if legacy_lookup is not None:
        return legacy_lookup()[0].lower() == 'debian'

    # platform.linux_distribution() was removed in Python 3.8, fall back to
    # the os-release file so importing this module does not raise.
    fields = {}
    try:
        with open('/etc/os-release', encoding='utf-8',
                  errors='replace') as release_file:
            for line in release_file:
                key, separator, value = line.strip().partition('=')
                if separator:
                    fields[key] = value.strip('"\'')
    except OSError:
        # No os-release file (e.g. Windows or macOS): not a Debian system
        return False

    # NOTE(review): Raspbian images are assumed to report ID=raspbian while
    # the removed API reported them as 'debian' - confirm on a target device.
    return fields.get('ID', '').lower() in ('debian', 'raspbian')

# serial and fcntl are only imported when detect_pi() returns True.
# serial_connection() uses serial and server_ip() uses fcntl, so those
# functions depend on this branch having run.
if detect_pi():
    import serial
    import fcntl

def is_int(number):
    """
    Checks whether *number* is an int or can be converted to one

    Parameters
    ----------
    number : object
        the value to check, typically an int or a str

    Returns
    -------
    result : bool
        True if *number* is an int or :code:`int(number)` succeeds, note
        that this includes floats since :code:`int()` truncates them
    """
    if isinstance(number, int):
        return True
    try:
        int(number)
    except (TypeError, ValueError):
        # ValueError: text that is not a whole number, e.g. 'abc' or '3.5'
        # TypeError: objects int() can not convert at all, e.g. None
        return False
    return True

def resolution_to_tuple(resolution):
    """
    Converts a resolution string to a tuple of two integers

    "1024x768" -> (1024, 768)

    Parameters
    ----------
    resolution : str
        the resolution in the format WIDTHxHEIGHT

    Returns
    -------
    screen_size : tuple
        the width and height as integers

    Raises
    ------
    ValueError
        if *resolution* has no 'x', does not split into exactly two parts
        or if one of the parts is not an integer
    """
    if 'x' not in resolution:
        raise ValueError('Resolution must be in format WIDTHxHEIGHT')
    screen_size = tuple(int(val) for val in resolution.split('x'))
    # Compare by value, 'is not' on an int literal tests object identity
    if len(screen_size) != 2:
        raise ValueError('Error in parsing resolution, len is not 2')
    return screen_size

def preexec_function():
    """
    Makes the calling process ignore SIGINT

    Installs :code:`signal.SIG_IGN` as the handler for
    :code:`signal.SIGINT`. Presumably meant to be handed to a subprocess
    call as its pre-exec hook, the callers are not visible in this module.
    """
    ignored_signal = signal.SIGINT
    signal.signal(ignored_signal, signal.SIG_IGN)

def valid_resolution(resolution):
    """
    Validates that *resolution* is in the format WIDTHxHEIGHT

    Parameters
    ----------
    resolution : str
        the resolution to validate, e.g. "1024x768"

    Returns
    -------
    resolution : str or None
        the unchanged *resolution* if it is valid. Otherwise
        :code:`warning` is called with its default 'error' filter, which
        turns the warning into a raised :code:`UserWarning`. None is only
        returned if that call does not raise.
    """
    # Splitting first and checking the number of parts keeps inputs with
    # more than one 'x' (e.g. "1x2x3") on the warning path instead of
    # failing on tuple unpacking.
    parts = resolution.split('x')
    if len(parts) == 2 and all(is_int(part) for part in parts):
        return resolution
    warning('Resolution must be WIDTHxHEIGHT')

def server_ip(port):
    """
    Returns the addresses the server can be reached at

    Asks the kernel for the IPv4 address of the interfaces eth0 and wlan0
    and combines every address found with *port*.

    Parameters
    ----------
    port : int or str
        the port that is appended to each address

    Returns
    -------
    addresses : str
        the addresses in the format "ip:port" joined by " or ", an empty
        string if neither interface has an address
    """
    try:
        # Imported locally: the module level import only happens when
        # detect_pi() is True and fcntl does not exist outside POSIX.
        import fcntl
    except ImportError:
        return ''

    # NOTE(review): the ioctl arguments were incomplete in the reviewed
    # listing. They are reconstructed from the usual SIOCGIFADDR idiom,
    # confirm against the upstream source.
    siocgifaddr = 0x8915  # Linux request: get the address of an interface
    online_ips = []
    for interface in [b'eth0', b'wlan0']:
        # The context manager closes the socket for every interface, also
        # when the lookup fails.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            try:
                ifreq = fcntl.ioctl(
                    sock.fileno(),
                    siocgifaddr,
                    struct.pack('256s', interface[:15]))
            except OSError:
                # The interface is missing or has no IPv4 address
                continue
            # Bytes 20-23 of the returned structure hold the packed address
            online_ips.append(socket.inet_ntoa(ifreq[20:24]))
    return ' or '.join('{}:{}'.format(ip, port) for ip in online_ips)

def check_requirements():
    """
    Checks that the system is able to run eduROV

    Issues a warning through :code:`warning` when a requirement is not
    met. With the default 'error' filter of :code:`warning` that call
    raises, so False is only returned if it does not.

    Returns
    -------
    ok : bool
        True if :code:`detect_pi` is True and the camera status reported
        by vcgencmd contains no '0', False otherwise
    """
    if not detect_pi():
        warning('eduROV only works on a raspberry pi')
        return False

    # NOTE(review): the argument list was cut off in the reviewed listing.
    # 'get_camera' is assumed because its output is checked for a '0' flag,
    # confirm against the upstream source.
    camera = subprocess.check_output(['vcgencmd',
                                      'get_camera']).decode().rstrip()
    if '0' in camera:
        warning('Camera not enabled or connected properly')
        return False
    return True

def send_arduino(msg, serial_connection):
    """
    Send the *msg* over the *serial_connection*

    Adds a hexadecimal number of 6 bytes to the start of the message before
    sending it. "hello there" -> "0x000bhello there"

    Parameters
    ----------
    msg : str or bytes
        the message you want to send, anything that is not bytes is
        converted with :code:`str(msg).encode()`
    serial_connection : object
        the :code:`serial.Serial` object you want to use for sending

    Raises
    ------
    ValueError
        if the encoded message is longer than 0xffff bytes, its length
        would then no longer fit in the 6 byte header
    """
    if not isinstance(msg, bytes):
        msg = str(msg).encode()
    length = "{0:#0{1}x}".format(len(msg), 6).encode()
    # The format only pads to 6 characters, longer lengths would silently
    # widen the header and the receiver reads exactly 6 characters.
    if len(length) != 6:
        raise ValueError(
            'Message of {} bytes is too long for the 6 byte length header'
            .format(len(msg)))
    serial_connection.write(length + msg)
def receive_arduino(serial_connection):
    """
    Returns a message received over *serial_connection*

    Expects that the message received starts with a 6 bytes long number
    describing the size of the remaining data. "0x000bhello there" ->
    "hello there".

    Parameters
    ----------
    serial_connection : object
        the :code:`serial.Serial` object you want to use for receiving

    Returns
    -------
    msg : str or None
        the message received or None if nothing is waiting, the line can
        not be decoded, the header is not a number or the length does not
        match
    """
    if not serial_connection.inWaiting():
        return None
    try:
        msg = serial_connection.readline().decode().rstrip()
    except UnicodeDecodeError:
        # Bytes that are not valid text are treated like any other
        # malformed message instead of raising to the caller
        return None
    if len(msg) < 6:
        return None
    try:
        # Base 0 lets int() honour the "0x" prefix of the header
        length = int(msg[:6], 0)
    except ValueError:
        return None
    data = msg[6:]
    if length == len(data):
        return data
    warning('Received incomplete serial string: {}'
            .format(data), 'default')
    return None
def send_arduino_simple(msg, serial_connection):
    """
    Send the *msg* over the *serial_connection*

    Same as :code:`send_arduino`, but doesn't add anything to the message
    before sending it.

    Parameters
    ----------
    msg : str or bytes
        the message you want to send, anything that is not bytes is
        converted with :code:`str(msg).encode()`
    serial_connection : object
        the :code:`serial.Serial` object you want to use for sending
    """
    payload = msg if isinstance(msg, bytes) else str(msg).encode()
    serial_connection.write(payload)
def receive_arduino_simple(serial_connection, min_length=1):
    """
    Returns a message received over *serial_connection*

    Same as :code:`receive_arduino` but doesn't expect that the message
    starts with a hex number.

    Parameters
    ----------
    serial_connection : object
        the :code:`serial.Serial` object you want to use for receiving
    min_length : int, optional
        the message is only returned if it is at least this long

    Returns
    -------
    msg : str or None
        the message received or None if nothing is waiting, the line can
        not be decoded or it is shorter than *min_length*
    """
    if not serial_connection.inWaiting():
        return None
    try:
        msg = serial_connection.readline().decode().rstrip()
    except UnicodeDecodeError:
        # Bytes that are not valid text count as "no message" instead of
        # raising to the caller
        return None
    if len(msg) >= min_length:
        return msg
    return None
def serial_connection(port='/dev/ttyACM0', baudrate=115200, timeout=0.05):
    """
    Establishes a serial connection

    Parameters
    ----------
    port : str, optional
        the serial port you want to use
    baudrate : int, optional
        the baudrate of the serial connection
    timeout : float, optional
        read timeout value

    Returns
    -------
    connection : class or None
        a :code:`serial.Serial` object if successful or None if not. Note
        that :code:`close()` has been called on the object before it is
        returned.
    """
    try:
        ser = serial.Serial(port, baudrate, timeout=timeout)
        ser.close()
        return ser
    except (FileNotFoundError,
            serial.serialutil.SerialException,
            ValueError):
        # All three mean the same thing here: the port could not be
        # opened, which is reported by the warning below
        pass
    warning(message="Could not establish serial connection at {}\n "
                    "Try running 'ls /dev/*tty*' to find correct port"
            .format(port), filter='default')
    return None
def warning(message, filter='error', category=UserWarning):
    """
    Issues *message* as a warning in the eduROV format

    Every call adds a filter for *category* to the global warning filters
    and replaces :code:`warnings.formatwarning` with
    :code:`warning_format`.

    Parameters
    ----------
    message : str
        the text of the warning
    filter : str, optional
        the action passed to :code:`warnings.simplefilter`. With the
        default 'error' the warning is raised as an exception.
    category : class, optional
        the warning class that the filter applies to and that is issued
    """
    warnings.simplefilter(filter, category)
    warnings.formatwarning = warning_format
    # Issue the warning with the same category the filter was installed
    # for, otherwise a non default category would never match the filter
    warnings.warn(message, category)


def warning_format(message, category, filename, lineno,
                   file=None, line=None):
    """
    Formats a warning, used as replacement for warnings.formatwarning

    Parameters
    ----------
    message : object
        the warning message
    category : class
        the warning class, its name is printed in front of the message
    filename : str
        the file the warning is attributed to
    lineno : int
        the line number the warning is attributed to
    file : object, optional
        accepted but not used
    line : object, optional
        accepted but not used

    Returns
    -------
    text : str
        the formatted warning
    """
    return 'WARNING:\n {}: {}\n File: {}:{}\n'.format(
        category.__name__, message, filename, lineno)
def free_drive_space(as_string=False):
    """
    Checks and returns the remaining free drive space

    Parameters
    ----------
    as_string : bool, optional
        set to True if you want the function to return a formatted string.
        4278 -> 4.28 GB

    Returns
    -------
    space : float or str
        the remaining MB in float or as string if *as_string=True*
    """
    # Imported locally so this function does not depend on the module
    # level import list
    import shutil

    # shutil.disk_usage works on Windows and POSIX alike, so no separate
    # ctypes / os.statvfs branches are needed
    mb = shutil.disk_usage('/').free / 1024 / 1024
    if not as_string:
        return mb
    if mb >= 1000:
        return '{:.2f} GB'.format(mb / 1000)
    return '{:.0f} MB'.format(mb)
def cpu_temperature():
    """
    Checks and returns the on board CPU temperature

    Runs :code:`/opt/vc/bin/vcgencmd measure_temp` and parses the number
    out of its output.

    Returns
    -------
    temperature : float
        the temperature as reported by vcgencmd (presumably in degrees
        Celsius, the unit is not checked here)
    """
    cmds = ['/opt/vc/bin/vcgencmd', 'measure_temp']
    response = subprocess.check_output(cmds).decode()
    # The number sits between the '=' and the apostrophe that starts the
    # unit
    reading = response.split('=')[1].split("'")[0]
    return float(reading.rstrip())