Source code for edurov.core

import os
import subprocess
import time
from multiprocessing import Process

from edurov.sync import start_sync_classes
from edurov.utils import warning, preexec_function, detect_pi
from edurov.web import start_http_server

if detect_pi():
    import Pyro4

[docs]class WebMethod(object): """ Starts a video streaming from the rasparry pi and a webserver that can handle user input and other requests. Parameters ---------- index_file : str Absolute path to the frontpage of the webpage, must be called ``index.html``. For more information, see :ref:`Displaying-the-video-feed`. video_resolution : str, optional A string representation of the wanted video resolution in the format WIDTHxHEIGHT. fps : int, optional Wanted framerate, may not be achieved depending on available resources and network. server_port : int, optional The web page will be served at this port debug : bool, optional If set True, additional information will be printed for debug purposes. runtime_functions : callable or list, optional Should be a callable function or a list of callable functions, will be started as independent processes automatically. For more information, see :ref:`Controlling-motors-(or-anything)`. custom_response : callable, optional If set, this function will be called if default web server is not able to handle a GET request, should return a str or None. If returned value starts with ``redirect=`` followed by a path, the server will redirect the browser to this path. The callable must accept two parameters whereas the second one is the requested path. For more information, see :ref:`Custom-Responses`. Examples -------- >>> import os >>> from edurov import WebMethod >>> >>> file = os.path.join(os.path.dirname(__file__), 'index.html', ) >>> web_method = WebMethod(index_file=file) >>> web_method.serve() """ def __init__(self, index_file, video_resolution='1024x768', fps=30, server_port=8000, debug=False, runtime_functions=None, custom_response=None): self.res = video_resolution self.fps = fps self.server_port = server_port self.debug = debug self.run_funcs = self._valid_runtime_functions(runtime_functions) self.cust_resp = self._valid_custom_response(custom_response) self.index_file = self._valid_index_file(index_file) def _valid_custom_response(self, custom_response): if custom_response: if not callable(custom_response): warning('custom_response parameter has to be a callable ' 'function, not type {}'.format(type(custom_response))) return None return custom_response def _valid_runtime_functions(self, runtime_functions): if runtime_functions: if callable(runtime_functions): runtime_functions = [runtime_functions] elif isinstance(runtime_functions, list): for f in runtime_functions: if not callable(f): warning( 'Parameter runtime_functions has to be a function ' 'or a list of functions, not {}'.format(type(f))) else: warning('Parameter runtime_functions has to be a function ' 'or a list of functions, not {}' .format(type(runtime_functions))) return runtime_functions def _valid_index_file(self, file_path): if not 'index.html' in file_path: warning('The index files must be called "index.html') if os.path.isfile(file_path): return os.path.abspath(file_path) else: warning('Could not find "{}", needs absolute path' .format(file_path)) return None
[docs] def serve(self, timeout=None): """ Will start serving the web page defined by the index_file parameter Parameters ---------- timeout : int, optional if set, the web page will only be served for that many seconds before it automatically shuts down Notes ----- This method will block the rest of the script. """ start = time.time() name_server = subprocess.Popen('pyro4-ns', shell=False, preexec_fn=preexec_function) time.sleep(2) pyro_classes = Process(target=start_sync_classes) pyro_classes.start() time.sleep(4) web_server = Process( target=start_http_server, args=(self.res, self.fps, self.server_port, self.index_file, self.debug, self.cust_resp)) web_server.daemon = True web_server.start() processes = [] if self.run_funcs: for f in self.run_funcs: p = Process(target=f) p.daemon = True p.start() processes.append(p) with Pyro4.Proxy("PYRONAME:ROVSyncer") as rov: try: while rov.run: if timeout: if time.time() - start >= timeout: break except KeyboardInterrupt: pass finally: print('Shutting down') web_server.terminate() rov.run = False if self.run_funcs: for p in processes: p.join(3) pyro_classes.terminate() name_server.terminate()