Source code for edurov.utils

"""
Different utility functions practical for ROV control
"""

import ctypes
import os
import platform
import signal
import socket
import struct
import subprocess
import warnings


def detect_pi():
    return platform.linux_distribution()[0].lower() == 'debian'


if detect_pi():
    import serial
    import fcntl


def is_int(number):
    if isinstance(number, int):
        return True
    else:
        try:
            if isinstance(int(number), int):
                return True
        except ValueError:
            pass
    return False


def resolution_to_tuple(resolution):
    if 'x' not in resolution:
        raise ValueError('Resolution must be in format WIDTHxHEIGHT')
    screen_size = tuple([int(val) for val in resolution.split('x')])
    if len(screen_size) is not 2:
        raise ValueError('Error in parsing resolution, len is not 2')
    return screen_size


def preexec_function():
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def valid_resolution(resolution):
    if 'x' in resolution:
        w, h = resolution.split('x')
        if is_int(w) and is_int(h):
            return resolution
    warning('Resolution must be WIDTHxHEIGHT')


def server_ip(port):
    online_ips = []
    for interface in [b'eth0', b'wlan0']:
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            ip = socket.inet_ntoa(fcntl.ioctl(
                sock.fileno(),
                0x8915,
                struct.pack('256s', interface[:15])
            )[20:24])
            online_ips.append(ip)
        except OSError:
            pass
        sock.close()
    return ' or '.join(['{}:{}'.format(ip, port) for ip in online_ips])


def check_requirements():
    if detect_pi():
        camera = subprocess.check_output(['vcgencmd',
                                          'get_camera']).decode().rstrip()
        if '0' in camera:
            warning('Camera not enabled or connected properly')
            return False
        else:
            return True
    else:
        warning('eduROV only works on a raspberry pi')
        return False


[docs]def send_arduino(msg, serial_connection): """ Send the *msg* over the *serial_connection* Adds a hexadecimal number of 6 bytes to the start of the message before sending it. "hello there" -> "0x000bhello there" Parameters ---------- msg : str or bytes the message you want to send serial_connection : object the :code:`serial.Serial` object you want to use for sending """ if not isinstance(msg, bytes): msg = str(msg).encode() length = "{0:#0{1}x}".format(len(msg), 6).encode() data = length + msg serial_connection.write(data)
[docs]def receive_arduino(serial_connection): """ Returns a message received over *serial_connection* Expects that the message received starts with a 6 bytes long number describing the size of the remaining data. "0x000bhello there" -> "hello there". Parameters ---------- serial_connection : object the :code:`serial.Serial` object you want to use for receiving Returns ------- msg : str or None the message received or None """ if serial_connection.inWaiting(): msg = serial_connection.readline().decode().rstrip() if len(msg) >= 6: try: length = int(msg[:6], 0) data = msg[6:] if length == len(data): return data else: warning('Received incomplete serial string: {}' .format(data), 'default') except ValueError: pass return None
[docs]def send_arduino_simple(msg, serial_connection): """ Send the *msg* over the *serial_connection* Same as :code:`send_arduino`, but doesn't add anything to the message before sending it. Parameters ---------- msg : str or bytes the message you want to send serial_connection : object the :code:`serial.Serial` object you want to use for sending """ if not isinstance(msg, bytes): msg = str(msg).encode() serial_connection.write(msg)
[docs]def receive_arduino_simple(serial_connection, min_length=1): """ Returns a message received over *serial_connection* Same as :code:`receive_arduino` but doesn't expect that the message starts with a hex number. Parameters ---------- serial_connection : object the :code:`serial.Serial` object you want to use for receiving min_length : int, optional if you only want that the function to only return the string if it is at least this long. Returns ------- msg : str or None the message received or None """ if serial_connection.inWaiting(): msg = serial_connection.readline().decode().rstrip() if len(msg) >= min_length: return msg else: return None
[docs]def serial_connection(port='/dev/ttyACM0', baudrate=115200, timeout=0.05): """ Establishes a serial connection Parameters ---------- port : str, optional the serial port you want to use baudrate : int, optional the baudrate of the serial connection timeout : float, optional read timeout value Returns ------- connection : class or None a :code:`serial.Serial` object if successful or None if not """ try: ser = serial.Serial(port, baudrate, timeout=timeout) ser.close() ser.open() return ser except FileNotFoundError: pass except serial.serialutil.SerialException: pass except ValueError: pass warning(message="""Could not establish serial connection at {}\n Try running 'ls /dev/*tty*' to find correct port""" .format(port), filter='default') return None
def warning(message, filter='error', category=UserWarning): warnings.simplefilter(filter, category) warnings.formatwarning = warning_format warnings.warn(message) def warning_format(message, category, filename, lineno, file=None, line=None): return 'WARNING:\n {}: {}\n File: {}:{}\n'.format( category.__name__, message, filename, lineno)
[docs]def free_drive_space(as_string=False): """ Checks and returns the remaining free drive space Parameters ---------- as_string : bool, optional set to True if you want the function to return a formatted string. 4278 -> 4.28 GB Returns ------- space : float or str the remaining MB in float or as string if *as_string=True* """ if platform.system() == 'Windows': free_bytes = ctypes.c_ulonglong(0) ctypes.windll.kernel32.GetDiskFreeSpaceExW(ctypes.c_wchar_p('/'), None, None, ctypes.pointer(free_bytes)) mb = free_bytes.value / 1024 / 1024 else: st = os.statvfs('/') mb = st.f_bavail * st.f_frsize / 1024 / 1024 if as_string: if mb >= 1000: return '{:.2f} GB'.format(mb / 1000) else: return '{:.0f} MB'.format(mb) else: return mb
[docs]def cpu_temperature(): """ Checks and returns the on board CPU temperature Returns ------- temperature : float the temperature """ cmds = ['/opt/vc/bin/vcgencmd', 'measure_temp'] response = subprocess.check_output(cmds).decode() return float(response.split('=')[1].split("'")[0].rstrip())