/*
 * devinput - zavai /dev/input device handling
 *
 * Copyright (C) 2009  Enrico Zini
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 */

/*
import sys
import os
import os.path
import time
import struct
import signal
import optparse
import dbus
import dbus.mainloop.glib
import gobject
import subprocess
*/

namespace zavai {
namespace input {

// For a list of dbus services, look in /etc/dbus-1/system.d/

/**
 * Base class for services that watch a device node for readable data.
 *
 * While the service is started, the file named by {@link device} is kept
 * open as an IOChannel and on_input_data() is invoked from the GLib main
 * loop whenever the channel reports IOCondition.IN.
 *
 * NOTE(review): `started` and `name` are used here but declared elsewhere,
 * presumably in zavai.Service -- confirm against that class.
 */
public abstract class DevInput : zavai.Service
{
    /** Path of the device node to read from. */
    public string device { get; construct; }

    /** Channel open on `device`, or null when the device is not open. */
    protected IOChannel fd = null;

    /** Main loop source id of the watch on `fd`, or 0 when no watch is installed. */
    protected uint fd_watch = 0;

    public DevInput()
    {
        //usage.ResourceChanged += on_resourcechanged;
    }

    /**
     * Shut down the channel, if one is open, and forget it.
     *
     * A failure to shut down is logged and otherwise ignored: `fd` is reset
     * to null in either case.
     */
    protected void close_fd()
    {
        if (fd != null)
        {
            try {
                fd.shutdown(false);
            } catch (IOChannelError e) {
                zavai.log.error("When closing " + device + ": " + e.message);
            }
            fd = null;
        }
    }

    /*
    public void on_resourcechanged(dynamic DBus.Object pos, string name, bool state, HashTable attributes)
    {
        zavai.log.info("RESOURCE CHANGED " + name);
    }
    */

    /**
     * Main loop callback invoked when the watched channel is readable.
     *
     * NOTE(review): this only prints a trace and does not read from
     * `source`, so the pending data is never consumed here. The
     * commented-out Python below is the event decoding still to be ported.
     *
     * @param source the channel that triggered the watch
     * @param condition the condition that was met
     * @return always true, which keeps the watch installed
     */
    protected bool on_input_data(IOChannel source, IOCondition condition)
    {
        stderr.printf("GOT INPUT ON %s\n", device);
        /*
        buf = self.input_fd.read(16)
        ts1, ts2, type, code, value = struct.unpack("LLHHI", buf)
        #print ts1, ts2, type, code, value
        if type == 1 and code == 119:
            if value:
                #print "BUTTON RELEASE"
                pass
            else:
                if self.last_button_press + 1 < ts1:
                    self.last_button_press = ts1
                    if self.button_press_handler is not None:
                        self.button_press_handler()
                    #print "BUTTON PRESS"
        elif type == 5 and code == 2:
            if value:
                info("Headset plugged in")
                self.mixer_for_headset(self)
            else:
                info("Headset plugged out")
                self.mixer_for_handset(self)
        */
        return true;
    }

    /**
     * Start reading from the device.
     *
     * Does nothing if the service is already started. If the device cannot
     * be opened the error is logged, no watch is installed and the service
     * is left stopped.
     */
    public override void start()
    {
        if (started) return;

        if (fd != null) close_fd();

        // Open the device and listen to it using the GObject main loop
        try {
            fd = new IOChannel.file(device, "r");
        } catch (FileError e) {
            // Same reporting style as close_fd(); without a channel there is
            // nothing to watch, so do not mark the service as started.
            zavai.log.error("When opening " + device + ": " + e.message);
            fd = null;
            return;
        }
        fd_watch = fd.add_watch(IOCondition.IN, on_input_data);

        base.start();
    }

    /**
     * Stop reading from the device.
     *
     * Does nothing if the service is not started. Removes the main loop
     * watch and closes the channel before chaining up.
     */
    public override void stop()
    {
        if (!started) return;

        if (fd != null)
        {
            if (fd_watch != 0)
            {
                Source.remove(fd_watch);
                // Forget the id so it can never be removed twice
                fd_watch = 0;
            }
            close_fd();
        }

        base.stop();
    }
}

/**
 * Input service named "input.buttons", reading from /dev/input/event1.
 */
public class Buttons : DevInput
{
    public Buttons()
    {
        name = "input.buttons";
        // FIXME: change to event4 for the moko
        device = "/dev/input/event1";
    }
}

/*
# TODO:
#  - hook into the headset plugged/unplugged event
#  - if unplugged, turn on handset microphone
#  - if plugged, redo headest mixer settings
class Audio:
    "Handle mixer settings, audio recording and headset button presses"
    def __init__(self, button_press_handler = None):
        self.saved_scenario = os.path.expanduser("~/.audiomap.state")

        # Setup the mixer
        # Set mixer to record from headset and handle headset button
        self.save_scenario(self.saved_scenario)
        self.load_scenario("/usr/share/openmoko/scenarios/voip-handset.state")

        # This is a work-around because I have not found a way to query for the
        # current headset state, I can only know when it changes. So in my
        # system I configured oeventsd with a rule to touch this file when the
        # headset is plugged in, and remove the file when it's plugged out.
if os.path.exists("/tmp/has_headset"): self.mixer_for_headset(True) else: self.mixer_for_handset(True) #self.mixer_set("DAPM Handset Mic", "mute") #self.mixer_set("DAPM Headset Mic", "unmute") #self.mixer_set("Left Mixer Sidetone Playback Sw", "unmute") #self.mixer_set("ALC Mixer Mic1", "cap") #self.mixer_set("Amp Spk", "mute") # We don't need the phone playing what we say # Watch the headset button self.button_press_handler = button_press_handler self.input_fd = open("/dev/input/event4", "rb") self.input_watch = gobject.io_add_watch(self.input_fd.fileno(), gobject.IO_IN, self.on_input_data) self.last_button_press = 0 self.recorder = None self.basename = None def mixer_for_headset(self, force=False): if not force and self.has_headset: return info("Setting mixer for headset") # TODO: find out how to disable the handset microphone: this does not # seem to be sufficient self.mixer_set_many( ("DAPM Handset Mic", "mute"), ("DAPM Headset Mic", "unmute"), ("Left Mixer Sidetone Playback Sw", "unmute"), ("ALC Mixer Mic1", "cap"), ("Amp Spk", "mute") # We don't need the phone playing what we say ) self.has_headset = True def mixer_for_handset(self, force=False): if not force and not self.has_headset: return info("Setting mixer for handset") self.mixer_set_many( ("DAPM Handset Mic", "unmute"), ("DAPM Headset Mic", "mute"), ("Left Mixer Sidetone Playback Sw", "mute"), ("ALC Mixer Mic1", "cap"), ("Amp Spk", "mute") # We don't need the phone playing what we say ) self.has_headset = False def set_basename(self, basename): self.basename = basename def start_recording(self): if self.basename is None: raise RuntimeError("Recording requested but basename not set") self.recorder = subprocess.Popen( ["arecord", "-D", "hw", "-f", "cd", "-r", "8000", "-t", "wav", self.basename + ".wav"]) def start_levels(self): self.recorder = subprocess.Popen( ["arecord", "-D", "hw", "-f", "cd", "-r", "8000", "-t", "wav", "-V", "stereo", "/dev/null"]) def close(self): if self.recorder is not None: 
os.kill(self.recorder.pid, signal.SIGINT) self.recorder.wait() # Restore mixer settings self.load_scenario(self.saved_scenario) gobject.source_remove(self.input_watch) self.input_fd.close() def on_input_data(self, source, condition): buf = self.input_fd.read(16) ts1, ts2, type, code, value = struct.unpack("LLHHI", buf) #print ts1, ts2, type, code, value if type == 1 and code == 119: if value: #print "BUTTON RELEASE" pass else: if self.last_button_press + 1 < ts1: self.last_button_press = ts1 if self.button_press_handler is not None: self.button_press_handler() #print "BUTTON PRESS" elif type == 5 and code == 2: if value: info("Headset plugged in") self.mixer_for_headset(self) else: info("Headset plugged out") self.mixer_for_handset(self) return True def save_scenario(self, name): res = subprocess.call(["alsactl", "store", "-f", name]) if res != 0: raise RuntimeError("Saving audio scenario to '%s' failed" % name) def load_scenario(self, name): res = subprocess.call(["alsactl", "restore", "-f", name]) if res != 0: raise RuntimeError("Loading audio scenario '%s' failed" % name) def mixer_set(self, name, *args): args = map(str, args) res = subprocess.call(["amixer", "-q", "set", name] + args) if res != 0: raise RuntimeError("Setting mixer '%s' to %s failed" % (name, " ".join(args))) # Will do this when we find out the syntax for giving amixer commands on stdin def mixer_set_many(self, *args): """Perform many mixer set operations via amixer --stdin""" proc = subprocess.Popen(["amixer", "-q", "--stdin"], stdin=subprocess.PIPE) cmd_input = [] for k, v in args: cmd_input.append("sset " + repr(k) + " " + repr(v)) (out, err) = proc.communicate(input="\n".join(cmd_input)) res = proc.wait() if res != 0: raise RuntimeError("Setting mixer failed") # For a list of dbus services, look in /etc/dbus-1/system.d/ class GPS(): def __init__(self, bus = None): if bus is None: self.bus = dbus.SystemBus() else: self.bus = bus # see mdbus -s org.freesmartphone.ousaged 
/org/freesmartphone/Usage self.usage = self.bus.get_object('org.freesmartphone.ousaged', '/org/freesmartphone/Usage') self.usage = dbus.Interface(self.usage, "org.freesmartphone.Usage") # see mdbus -s org.freesmartphone.ogpsd /org/freedesktop/Gypsy gps = self.bus.get_object('org.freesmartphone.ogpsd', '/org/freedesktop/Gypsy') self.gps = dbus.Interface(gps, "org.freedesktop.Gypsy.Device") self.gps_time = dbus.Interface(gps, "org.freedesktop.Gypsy.Time") self.gps_position = dbus.Interface(gps, 'org.freedesktop.Gypsy.Position') self.gps_ubx = dbus.Interface(gps, 'org.freesmartphone.GPS.UBX') self.gps_debug = GPSDebug(self.gps_ubx) # Request GPS resource self.usage.RequestResource('GPS') info("Acquired GPS") self.waiting_for_fix = None def close(self): self.usage.ReleaseResource('GPS') info("Released GPS") def wait_for_fix(self, callback): status = self.gps.GetFixStatus() if status in [2, 3]: info("We already have a fix, good.") callback() return True else: info("Waiting for a fix...") self.waiting_for_fix = callback self.bus.add_signal_receiver( self.on_fix_status_changed, 'FixStatusChanged', 'org.freedesktop.Gypsy.Device', 'org.freesmartphone.ogpsd', '/org/freedesktop/Gypsy') return False def on_fix_status_changed(self, status): if status not in [2, 3]: return info("Got GPS fix") self.bus.remove_signal_receiver( self.on_fix_status_changed, 'FixStatusChanged', 'org.freedesktop.Gypsy.Device', 'org.freesmartphone.ogpsd', '/org/freedesktop/Gypsy') if self.waiting_for_fix: self.waiting_for_fix() self.waiting_for_fix = None def track_position(self, callback): self.bus.add_signal_receiver( callback, 'PositionChanged', 'org.freedesktop.Gypsy.Position', 'org.freesmartphone.ogpsd', '/org/freedesktop/Gypsy') # This is taken from Zhone class GPSDebug(): def __init__(self, gps): self.gps_ubx = gps.gps_ubx self.busy = None self.want = set( ["NAV-STATUS", "NAV-SVINFO"] ) self.have = set() self.error = set() def _update( self ): if self.busy is None: pending = self.want - self.have 
- self.error if pending: self.busy = pending.pop() self.gps_ubx.SetDebugFilter( self.busy, True, reply_handler=self.on_debug_reply, error_handler=self.on_debug_error, ) def request(self): self.have = set() self._update() def on_debug_reply(self): self.have.add(self.busy) self.busy = None self._update() def on_debug_error(self, e): info(e, "error while requesting debug packet %s" % self.busy) self.error.add(self.busy) self.busy = None self._update() class GPSMonitor(): def __init__(self, gps): self.gps = gps self.gps_debug = GPSDebug(gps) def start(self): # TODO: find out how come sometimes these events are not sent self.gps.bus.add_signal_receiver( self.on_satellites_changed, 'SatellitesChanged', 'org.freedesktop.Gypsy.Satellite', 'org.freesmartphone.ogpsd', '/org/freedesktop/Gypsy') self.gps.bus.add_signal_receiver( self.on_ubxdebug_packet, 'DebugPacket', 'org.freesmartphone.GPS.UBX', 'org.freesmartphone.ogpsd', '/org/freedesktop/Gypsy') self.gps_debug.request() def stop(self): self.gps.bus.remove_signal_receiver( self.on_satellites_changed, 'SatellitesChanged', 'org.freedesktop.Gypsy.Satellite', 'org.freesmartphone.ogpsd', '/org/freedesktop/Gypsy') self.gps.bus.remove_signal_receiver( self.on_ubxdebug_packet, 'DebugPacket', 'org.freesmartphone.GPS.UBX', 'org.freesmartphone.ogpsd', '/org/freedesktop/Gypsy') def on_satellites_changed(self, satellites): #if not satellites: # info("Satellite status: none") # return self.gps_debug.request() #info("Satellite status:") #for sat in satellites: # if sat[0] not in self.sat_data: # self.sat_data[sat[0]] = [sat, None] # else: # self.sat_data[sat[0]][0] = sat #self.print_sat_data() def on_ubxdebug_packet(self, clid, length, data): # In zhone it is cbUBXDebugPacket #if clid == "NAV-STATUS" and data: # i = ["%s: %d" % (k, data[0][k]) for k in sorted(data[0].keys())] # info("Status:", " ".join(i)) ## if data[0]['TTFF']: ## info("TTFF: %f", data[0]['TTFF']/1000.0) if clid == "NAV-SVINFO": self.print_ubx_sat_data(data[1:]) #else: 
# info("gps got ubxdebug packet", clid) # info("DATA:", data) def print_ubx_sat_data(self, ubxinfo): info("CH ID SN ELE AZI Used Diff Alm Eph Bad Status") for sv in ubxinfo: if sv["CNO"] == 0: continue svid = sv["SVID"] used = sv["Flags"] & 0x01 diff = sv["Flags"] & 0x02 almoreph = sv["Flags"] & 0x04 eph = sv["Flags"] & 0x08 bad = sv["Flags"] & 0x10 qi = ("%i: " % sv["QI"]) + { 0: "idle", 1: "searching", 2: "signal acquired", 3: "signal unusable", 4: "code lock", 5: "code&carrier lock", 6: "code&carrier lock", 7: "receiving data" }[sv["QI"]] info("%2d %2d %2d %3d %3d %s %s %s %s %s %s" % ( sv["chn"], sv["SVID"], sv["CNO"], sv["Elev"], sv["Azim"], used and "used" or "----", diff and "diff" or "----", almoreph and "alm" or "---", eph and "eph" or "---", bad and "bad" or "---", qi)) def print_sat_data(self, satellites): for sat in satellites: if sat[4] == 0: continue info("PRN %u" % sat[0], sat[1] and "used" or "unused", "el %u" % sat[2], "az %u" % sat[3], "snr %u" % sat[4]) class Hub: """Hub that manages all the various resources that we use, and initiates operations.""" def __init__(self, bus = None): self.bus = bus self.waiting_for_fix = False self.basename = None self.recorder = None self.gpx = None self.gps = None self.gps_monitor = None self.audio = None self.last_pos = None def shutdown(self): # Stop recording if self.audio is not None: self.audio.close() # Close waypoints file if self.gpx is not None: self.gpx.close() # Stop the GPS monitor if self.gps_monitor: self.gps_monitor.stop() # Release the GPS if self.gps is not None: self.gps.close() def levels(self): self.audio = Audio() self.audio.start_levels() def record(self): self.audio = Audio(self.make_waypoint) self.gps = GPS() # Get a fix and start recording if not self.gps.wait_for_fix(self.start_recording): self.gps_monitor = GPSMonitor(self.gps) self.gps_monitor.start() def monitor(self): self.audio = None self.gps = GPS() self.gps_monitor = GPSMonitor(self.gps) self.gps_monitor.start() def 
start_recording(self): if self.gps_monitor: self.gps_monitor.stop() self.gps_monitor = None if not self.audio: return # Sync system time gpstime = self.gps.gps_time.GetTime() subprocess.call(["date", "-s", "@%d" % gpstime]) subprocess.call(["hwclock", "--systohc"]) # Compute basename for output files self.basename = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime(gpstime)) self.basename = os.path.join(AUDIODIR, self.basename) # Start recording the GPX track self.gpx = GPX(self.basename) self.gps.track_position(self.on_position_changed) # Start recording in background forking arecord self.audio.set_basename(self.basename) self.audio.start_recording() def on_position_changed(self, fields, tstamp, lat, lon, alt): self.last_pos = (fields, tstamp, lat, lon, alt) if self.gpx: self.gpx.trackpoint(tstamp, lat, lon, alt) def make_waypoint(self): if self.gpx is None: return if self.last_pos is None: self.last_pos = self.gps.gps_position.GetPosition() (fields, tstamp, lat, lon, alt) = self.last_pos self.gpx.waypoint(tstamp, lat, lon, alt) info("Making waypoint at %s: %f, %f, %f" % ( time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(tstamp)), lat, lon, alt)) class Parser(optparse.OptionParser): def __init__(self, *args, **kwargs): # Yes, in 2009 optparse from the *standard library* still uses old # style classes optparse.OptionParser.__init__(self, *args, **kwargs) def error(self, msg): sys.stderr.write("%s: error: %s\n\n" % (self.get_prog_name(), msg)) self.print_help(sys.stderr) sys.exit(2) parser = Parser(usage="usage: %prog [options]", version="%prog "+ VERSION, description="Create a GPX and audio trackFind the times in the wav file when there is clear voice among the noise") parser.add_option("-v", "--verbose", action="store_true", help="verbose mode") parser.add_option("-m", "--monitor", action="store_true", help="only keep the GPS on and monitor satellite status") parser.add_option("-l", "--levels", action="store_true", help="only show input levels") (opts, args) = 
parser.parse_args()

if not opts.monitor and not opts.verbose:
    def info(*args):
        pass

# Set up dbus
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
mainloop = gobject.MainLoop()

def on_sigint(signum, frame):
    mainloop.quit()

signal.signal(signal.SIGINT, on_sigint)

hub = Hub()

if opts.monitor:
    hub.monitor()
elif opts.levels:
    hub.levels()
else:
    hub.record()

mainloop.run()

hub.shutdown()

# Create waypoint at button press
# At button press, raise window
# Keep window until after 5 seconds it gets input, or until dismissed
# Allow to choose "do not show"
# Request input method when window is raised (or start a keyboard and kill it when lowered)
# Release GPS
*/

/** The Buttons service instance created by init(); null until init() runs. */
public Buttons buttons = null;

/**
 * Create the Buttons service, publish it through `buttons` and register
 * it with zavai.registry.
 */
public void init()
{
    var service = new Buttons();
    buttons = service;
    zavai.registry.register_service(service);
}

}
}