require 'cmacrovalue'
require 'libusb_c'
require 'json'

# A module for interacting with USB devices in a generic way.  This library
# especially includes support for discovering HID interfaces.  For some examples
# check out map_example.rb and mystery_data.rb.  Here's one to get you started
# though:
#
#   require 'libusb'
#
#   # We want to find a device that supports the Keyboard k key.  Then we'll listen
#   # on this device for events and print out a message when K is pressed
#
#   # This creates a usage object representing the K key
#   k_usage = USB::Usage.find_usage_named(/Keyboard k/)
#
#   usable_interface = nil
#   # search through all usb devices
#   hid_device = USB::devices.detect { |d|
#     # get the current configuration
#     config = d.current_configuration
#
#     # get all the interfaces within that configuration
#     config.interfaces.each { |i|
#       # find a hid interface that supports the k key
#       if(i.hid? and i.current_alternative?)
#         # we must detach the device from the kernel to get its descriptor
#         i.detach_kernel
#         usable_interface = i if (i.all_input_usages.include?(k_usage))
#       end
#     }
#
#     # if we've found a usable interface, stop otherwise search the next device
#     usable_interface
#   }
#
#   raise "Could not find a suitable device" if usable_interface.nil?
#   puts "Using device #{usable_interface.device.product_name}"
#
#   # listen for incoming ReportData objects
#   usable_interface.listen_for_hid_interrupts {
#     while(1)
#       report_data = usable_interface.next_interrupt
#       value_for_k = report_data.get_usage_value(k_usage)
#       if(value_for_k.nil?)
#         puts "Datagram did not contain k key. Got these usages instead #{report_data.set_usages.inspect}"
#       else
#         puts "*****K set to value #{value_for_k}*****"
#       end
#     end
#   }
#
#
# There's quite a few classes here so let me break it down a bit.
#
# *Device* represents a particular USB device, although this device might have
# several different functions (for example a combination scanner printer
# keyboard massage chair).  Use USB.devices or USB.device_matching to find these
# devices.  Devices contain information that lets you find a particular device -
# strings and numbers that represent the vendor and product.  Devices
# contain Configuration objects.
#
# *Configuration* represents a particular mode that a USB device might be in.
# In theory, a device could have several modes where it acts like something
# completely different (say a mouse mode and a scanner mode).  Slightly more
# likely is a low-power mode and a high power mode with more features.  These
# modes are always mutually exclusive and very few devices have them.  Most of
# the time you should be able to get away with just calling
# current_configuration from the Device object and ignoring the others.
# Configuration objects contain Interface objects.
#
# *Interface* represents a particular function (like a scanner, or a hub, or a
# human interface device).  Interfaces have some data that lets you find out
# what sort of feature they are (interface_class in particular) and often have
# subdescriptors that give you more information about this particular feature.
# The only type that's well supported right now is Human Interface Devices
# (HIDs).  Most hid specific functions are in the Interface class.  Interfaces
# contain Endpoint objects which are mechanisms for communicating with the USB
# device.
#
# *Endpoint* is a class representing a channel of communication to a USB device.
# There are various types of endpoints defined in the spec but the only ones well
# supported right now are Interrupt Input endpoints (of particular use for HID
# devices).
#
# *ReportDescriptor* is something USB HID devices have, and it's accessed by
# using the hid_descriptor methods in the Interface class.  A hid_descriptor
# defines several Report objects that the USB HID device might send to or
# receive from the host.
#
# *Report* is a part of a hid descriptor.  A report can have both an input and
# output component.  Reports contain functions for parsing data packets from the
# hid device (although these are usually best accessed indirectly through the
# ReportData object) and also InputOutputFeature objects which contain metadata
# about each element in the Report.
#
# *InputOutputFeature* is a single set of related fields within a report.  They
# can have metadata with regard to the size and contents of the report.  One of
# the most useful bits of metadata is Usage objects, which correspond to
# particular tiny features (like a button for the letter K, or a Battery led)
# that the device supports.  Usage meanings are defined in the HID spec.
#
# *Usage* objects represent usages and give some handy methods for parsing them.
#
# *ReportData* represents a single data packet from a USB hid device.  It has
# functions for accessing that data (parsed according to the ReportDescriptor).
# These objects are created by the Interface method listen_for_hid_interrupts
#
# *UsageListener* is a small handy class for people who want to listen for a
# particular set of usages from a HID device.
#
#
# Copyright 2006 Michael Hewner
module USB
  # Cached device list.  NOTE(review): presumably populated by load_devices,
  # which is not defined in this file - confirm against the C extension.
  @@devices = nil

  # Get the current USB devices.  Note that this does not reload the device list
  # on subsequent calls - if you want an updated picture of USB devices you
  # should call load_devices to manually reload them
  def self.devices
    load_devices if @@devices.nil?
    @@devices
  end

  # Returns a device matching the specified vendor and product id, otherwise an
  # exception (UsbException) is thrown
  def self.device_matching(vendor_id, product_id)
    result = USB.devices.detect do |d|
      d.vendor_id == vendor_id && d.product_id == product_id
    end
    # Raise with class + message; calling UsbException(...) as if it were a
    # method would fail with NoMethodError instead of the intended exception.
    if result.nil?
      raise(UsbException, "No device matching vendor id #{vendor_id} and product_id #{product_id} found")
    end
    result
  end

  # A class representing a USB Device attached to the system.  In USB, devices
  # have 1 or more configurations but only one can be active at a particular
  # time.  To get device objects, call USB::devices which will return all
  # available devices on the system.  To find a particular device, it's usual
  # to look for a particular Device::vendor_id Device::product_id combination.
  # For human readable searching, Device::product_name and Device::manufacturer
  # might be useful.
  #
  # An example:
  #
  #   require 'libusb'
  #
  #   devices = USB::devices();
  #   devices.each { |d|
  #     puts "Device vendor: #{d.vendor_id} product: #{d.product_id}"
  #   }
  class Device
    def self.create_cmacrovalue(value, map_name) #:nodoc:
      CMacroValue.new(value, @@macro_maps[map_name])
    end

    # The device class has maps of constants that are used internally and passed
    # around as CMacroValue constants.  This lets you access these maps (most of
    # the time there should be no reason to do this though).  The
    # maps are stored as a hash of map names, containing maps between
    # constants and their values
    def self.macro_maps
      @@macro_maps
    end

    # The device class has maps of constants that are used internally and passed
    # around as CMacroValue constants.  Normally this isn't a problem but occasionally
    # it's useful to find a value for a particular constant.
    #
    #   USB::Device::get_value_for("data_item_flags", "InputOutputFeature::NONVOLATILE")
    def self.get_value_for(map_name, value_string)
      @@macro_maps[map_name].invert[value_string]
    end

    # Returns all the configurations available for this USB device.  Normally a
    # device only has one configuration but occasionally more are possible.  Only
    # one configuration is active at one time.
    def configurations
      @configurations = load_configurations if @configurations.nil?
      @configurations
    end

    # Returns the current configuration object.  Note that this requires
    # communication with the device so it can be (slightly) expensive.  The
    # current configuration stores information about the power characteristics of the
    # device as well as interfaces to the device.
    #
    # Returns nil if no known configuration has the device's current
    # configuration value.
    def current_configuration
      active_value = current_configuration_value
      configurations.detect { |c| c.value == active_value }
    end

    # Returns all interfaces available in all configurations for this device.
    # Bear in mind that some interfaces might not be currently active, either
    # because they belong to an inactive configuration or because they are not a
    # currently active alternative interface.
    def interfaces
      configurations.collect { |c| c.interfaces }.flatten
    end

    # Returns all endpoints available in all configurations and interfaces.
    # Some endpoints might not be active because they are part of an inactive
    # configuration or inactive alternate interface.
    def endpoints
      interfaces.collect { |i| i.endpoints }.flatten
    end

    # Returns true if any interface on the device, in any configuration or
    # alternate interface, is a human interface device.  Note that most useful
    # human interface device related functions are within the Interface object
    def hid?
      interfaces.any? { |i| i.interface_class.string_value == "USB_CLASS_HID" }
    end
  end

  # All data that comes from or to a USB device passes through an endpoint.  All
  # devices have an implicit Endpoint 0 Control endpoint where all the predefined
  # USB requests flow and can define a number of additional endpoints (15 input
  # and 15 output for full/high speed devices or 2 for low speed devices).
  #
  # Currently ruby usb only has support for interrupt in endpoints.  Hopefully
  # more endpoint support will be coming soon!
  #
  #   mystery_device = USB.device_matching(0x046e, 0x530a)
  #   endpoint = mystery_device.endpoints.detect { |e|
  #     e.type.string_value == "USB_ENDPOINT_TYPE_INTERRUPT" and
  #     e.input?
  #   }
  #   endpoint.interface.detach_kernel
  #   endpoint.listen_for_interrupts {
  #     while(1)
  #       puts endpoint.next_interrupt.unpack("B*")
  #     end
  #   }
  class Endpoint
    attr_reader :interface

    def to_s
      direction = ""
      direction = "input" if input?
      direction = "output" if output?
      "Type #{type} #{direction} Number #{number} MaxPacSize #{max_packet_size} interval #{interval}"
    end

    # This is the preferred way to listen for interrupts.  It takes a block which is passed
    # the endpoint.  The device is kept open for the block and then closed when
    # the block ends.
    #
    # Internally, a C thread is spawned which continuously waits for interrupts
    # and puts them in a queue.  Therefore, there is no danger of interrupt
    # values being "lost" if not handled immediately.  If interrupts are not
    # removed from the queue with next_interrupt however, they will consume
    # memory.
    def listen_for_interrupts
      start_interrupt_listening
      yield(self)
    ensure
      # Always stop listening, even when the block raises.
      stop_interrupt_listening
    end

    # Blocks until the next interrupt is available and then returns it as a ruby
    # character string.
    #
    # This is the preferred way to get interrupt data and should be used within
    # a listen_for_interrupts block.  It is possible to call next_interrupt
    # after a listen_for_interrupts block has ended (it will return interrupts
    # that were in the queue but never processed).  Obviously though, if you
    # call next_interrupt outside of a listen_for_interrupts block and the queue
    # is empty, it will block forever.
    def next_interrupt
      # Poll until next_interrupt_or_nil hands back a non-nil value.
      loop do
        val = next_interrupt_or_nil
        return val unless val.nil?
      end
    end
  end

  # Class representing the configurations available to a particular USB device.
  # Only one configuration can be active at a time.  Some USB devices let you
  # change the current configuration programmatically
  class Configuration
    attr_reader :device

    # Returns true if this is the currently active configuration.  If it's not,
    # don't attempt to use this configuration.  This call requires communication
    # with the device
    def current_configuration?
      # Compare configuration *values*: Device#current_configuration returns a
      # Configuration object (or nil), so comparing it directly against our
      # numeric value could never match.
      active = @device.current_configuration
      !active.nil? && active.value == value
    end

    # Gets all the interfaces in the configuration
    def interfaces
      @interfaces = load_interfaces if @interfaces.nil?
      @interfaces
    end

    def to_s
      ""
    end
  end

  # An interface contains data about a certain function or feature that a device
  # implements (like say being a HUB or a human interface device).  A particular
  # configuration may have several different interfaces, and some might not be
  # compatible with others.  This is represented by the current_alternative
  # field, which selects between multiple interfaces with the same interface id.
  #
  # Each interface might have several endpoints that correspond to the
  # interface's function.
  class Interface
    attr_reader :device, :configuration

    # Returns true if this interface is a human interface device.
    # Human interface devices have report descriptors that decide
    # the format of their data
    def hid?
      interface_class.string_value == "USB_CLASS_HID"
    end

    def to_s
      "Interface #{interface_string} class #{interface_class} subclass #{subclass} protocol #{protocol} alternate setting #{alternate_setting} number #{number} is usable #{usable?}"
    end

    # Returns true if the device is the currently selected alternative for
    # all devices in the configuration with the same interface id
    def current_alternative?
      current_alternative == alternate_setting
    end

    # Returns true if this interface is in the currently active configuration
    # and it's the current alternative
    def usable?
      current_alternative? && @configuration.current_configuration?
    end

    # For a hid interface, this returns a ReportDescriptor object, which is a
    # parsed representation of the HID report that the device returns.
    # Raises UsbException if this is not a hid interface.
    def hid_report_descriptor
      raise(UsbException, "Not a hid interface") unless hid?
      @report_descriptor = load_hid_report_descriptor if @report_descriptor.nil?
      @report_descriptor
    end

    # All the report objects from the hid_report_descriptor.  A device may have
    # several reports, all with different report ids
    def hid_reports
      hid_report_descriptor.reports
    end

    # This corresponds to the listen_for_interrupts function in the Endpoint
    # class.  Within the block the device is open and interrupts are being
    # recorded into a queue by a separate thread.  You can get the next
    # interrupt by using the next_interrupt function.  The block is passed
    # this interface.
    #
    # This function listens on the endpoint returned by hid_interrupt_input.
    # Raises UsbException if this is not a HID interface or no such endpoint
    # exists.
    def listen_for_hid_interrupts
      raise(UsbException, "Not a HID interface") unless hid?
      endpoint = hid_interrupt_input
      raise(UsbException, "Cannot find input interrupt endpoint for this HID") if endpoint.nil?
      begin
        endpoint.start_interrupt_listening
        yield(self)
      ensure
        # Always stop listening, even when the block raises.
        endpoint.stop_interrupt_listening
      end
    end

    # Returns the first interrupt input endpoint in the interface, which I
    # assume to be the correct interrupt for HID reports
    def hid_interrupt_input
      raise(UsbException, "Not a HID interface") unless hid?
      endpoints.detect do |e|
        e.type.string_value == "USB_ENDPOINT_TYPE_INTERRUPT" && e.input?
      end
    end

    # Returns a ReportData object which represents a single data packet sent
    # from the device.  Should be called from within a listen_for_hid_interrupts
    # block
    def next_interrupt
      ReportData.new(hid_interrupt_input.next_interrupt, hid_report_descriptor)
    end

    # All the endpoints for this interface
    def endpoints
      @endpoints = load_endpoints if @endpoints.nil?
      @endpoints
    end

    # Returns all the usages specified within the hid interface's input reports
    def all_input_usages
      raise(UsbException, "Not a HID interface") unless hid?
      hid_reports.collect { |r| r.inputs.collect { |i| i.all_usages } }.flatten
    end
  end

  # A small class that lets you easily write a program that registers interest in
  # particular usages and then gets notified when they change.  Here's a program
  # that registers interest in the "f" keyboard press:
  #
  #   require 'libusb'
  #
  #   hid_device = USB::devices.select { |d| d.hid? }[0]
  #   hid_interface = hid_device.interfaces.select { |i| i.hid? }[0]
  #   hid_interface.detach_kernel
  #   listener = USB::UsageListener.new(hid_interface)
  #   usage = USB::Usage.find_usage_named(/Keyboard f/)
  #   puts "listening for usage #{usage}"
  #   listener.on_usage_value_change(usage) { |new_val| puts "New F value: #{new_val}" }
  #   listener.start_listening
  class UsageListener
    # Takes the hid interface you want to listen to
    def initialize(interface)
      @interface = interface
      # Maps usage => [last seen value, callback block]
      @usages_to_watch = Hash.new
    end

    # Here you register a listener block for a particular usage.  The block is
    # called with the new value each time the usage's value changes.
    def on_usage_value_change(usage, &block_to_execute)
      @usages_to_watch[usage] = [nil, block_to_execute]
    end

    # Once start_listening is called (this is blocking BTW) your callbacks will
    # be called whenever an interrupt corresponding to them comes from the
    # device
    def start_listening
      @interface.listen_for_hid_interrupts do |interface|
        loop do
          data = interface.next_interrupt
          @usages_to_watch.each_pair do |usage, command|
            usage_val = data.get_usage_value(usage)
            # Only fire the callback when the value differs from the last one seen.
            if usage_val != command[0]
              command[0] = usage_val
              command[1].call(usage_val)
            end
          end
        end
      end
    end
  end

  # Represents Usages, which is the mechanism USB uses to register what kind of
  # input/output a human interface device supports
  class Usage
    def to_s
      "#{usage_page_name} (#{usage_page})::#{usage_name} (#{usage})"
    end

    # Two usages are equal when their extended_usage values match.  Anything
    # that does not respond to extended_usage (nil, for instance) is unequal
    # rather than an error.
    def ==(other_usage)
      other_usage.respond_to?(:extended_usage) &&
        extended_usage == other_usage.extended_usage
    end

    # Returns the first usage whose "page name::usage name" string matches the
    # regex you pass, looked up in usages.json (resolved relative to the
    # current working directory).  Raises UsbException if nothing matches.
    #
    # NOTE(review): SafeJSON is not defined in this file - presumably it comes
    # from the 'json' library this project loads; confirm.
    def self.find_usage_named(regex)
      # File.read closes the file for us (File.new leaked the handle).
      pages = SafeJSON.parse(File.read("usages.json"))
      pages.each_pair do |page_num, page|
        page.each_pair do |usage_num, usage_name|
          # The "name" entry holds the page's own name, not a usage.
          next if usage_num == "name"
          if "#{page["name"]}::#{usage_name}" =~ regex
            # Page and usage numbers are stored as hexadecimal strings.
            return Usage.create_usage(page_num.to_i(16), usage_num.to_i(16))
          end
        end
      end
      raise(UsbException, "No usage matching #{regex.inspect} found in usages.json")
    end
  end

  # A report descriptor represents the entire descriptor returned from a HID
  # device.  Most data is contained within the reports themselves though
  class ReportDescriptor
    # If there is only one report and it does not specify an id, data passed
    # from the usb device can be sent without a report_id prefix
    def uses_report_id?
      !(reports.size == 1 && reports[0].id == 0)
    end

    # Get the Report objects associated with this descriptor
    def reports
      @reports = load_reports if @reports.nil?
      @reports
    end

    # Get the report with the given report_id.  This raises a UsbException if
    # the report is not found
    def get_report(id)
      report = reports.detect { |r| r.id == id }
      raise(UsbException, "No report with id #{id}") if report.nil?
      report
    end
  end

  # This class represents the Input, Output, or Feature main items within a USB
  # Report descriptor.  They determine the details of a report's structure and
  # content.  There is usually not a lot of need to use this class directly -
  # much happens implicitly through the ReportData object
  class InputOutputFeature
    # Returns an array of pairs [Usage, value] corresponding to the binary
    # string that is passed.  Note that for array items, all possible usages
    # might not be in this resultant list
    def set_usages_for_data(binary_string)
      is_array = array?
      result = []
      values_for_data(binary_string).each_with_index do |value, i|
        if is_array
          # Array item: the field value selects the usage; zero is skipped.
          result << [usage_for_field(value), 1] if value > 0
        else
          # Variable item: the field position selects the usage.
          result << [usage_for_field(i), value]
        end
      end
      result
    end

    # Returns an array of numbers corresponding to the integer values of each
    # separate segment of the input.  Negative integers are not currently
    # supported (yet)
    def values_for_data(binary_string)
      (0...report_count).collect do |i|
        # Each field's bits are reversed before being parsed as base 2.
        binary_string[i * report_size, report_size].reverse.to_i(2)
      end
    end

    # Returns true if the device has one of the following characteristics (passed
    # as a string):
    # InputOutputFeature::DATA, InputOutputFeature::CONSTANT,
    # InputOutputFeature::ARRAY, InputOutputFeature::VARIABLE,
    # InputOutputFeature::ABSOLUTE, InputOutputFeature::RELATIVE,
    # InputOutputFeature::NO_WRAP, InputOutputFeature::WRAP,
    # InputOutputFeature::LINEAR, InputOutputFeature::NONLINEAR,
    # InputOutputFeature::PREFERRED_STATE, InputOutputFeature::NO_PREFERRED_STAT,
    # InputOutputFeature::NULL_STATE, InputOutputFeature::NO_NULL_STATE,
    # InputOutputFeature::NONVOLATILE, InputOutputFeature::VOLATILE,
    # InputOutputFeature::BIT_FIELD, InputOutputFeature::BUFFERED_BYTES,
    def flag_set?(flag_string)
      flag_set_internal?(USB::Device.get_value_for("data_item_flags", flag_string))
    end

    # Returns true if the data item is an array item, false if it is a variable
    # item
    def array?
      flag_set?("InputOutputFeature::ARRAY")
    end
  end

  # Represents a single data report sent from a device to the host
  class ReportData
    # Takes a character string of input data and the report descriptor
    # corresponding to the device that created it
    def initialize(data, descriptor)
      # Stored as a string of '0'/'1' characters.
      @binary_string = data.reverse.unpack("B*")[0].reverse
      @descriptor = descriptor
    end

    # True if this data packet was prefixed with a report_id
    def report_id?
      @descriptor.uses_report_id?
    end

    # Report_id for this packet.  Raises UsbException if the packet has no
    # report_id prefix.
    def report_id
      raise UsbException, "Attempt to get report_id for a packet that did not have a report_id" unless report_id?
      @binary_string[0, 8].reverse.to_i(2)
    end

    # Returns a binary string corresponding to the data portion of the packet
    # (without the report_id)
    def data_part
      return @binary_string unless report_id?
      # Skip the 8 bits holding the report_id.
      @binary_string[8..-1]
    end

    # The report within the report descriptor that describes this data's
    # formatting
    def report
      # When packets carry a report_id, look the report up by that id;
      # otherwise the descriptor's only report applies.
      return @descriptor.get_report(report_id) if report_id?
      @descriptor.reports[0]
    end

    # Returns a list of pairs [Usage, value] containing all the usages that are
    # currently set in this report and their values.  Note that this list's
    # contents can vary, based on the contents of array items in the report
    def set_usages
      index = 0
      result = []
      report.inputs.each do |input|
        result |= input.set_usages_for_data(data_part[index, input.length])
        index += input.length
      end
      result
    end

    # Get the value of a particular usage, or return nil if it is not set in this
    # data packet
    def get_usage_value(usage)
      set_usages.each { |pair| return pair[1] if pair[0] == usage }
      nil
    end

    # Return an array corresponding to the integral values of the data returned.
    # This can be useful for understanding a device's format
    def values
      index = 0
      result = []
      report.inputs.each do |input|
        # Append rather than union: repeated values are meaningful here and
        # must not be collapsed.
        result.concat(input.values_for_data(data_part[index, input.length]))
        index += input.length
      end
      result
    end

    def to_s
      "Report #{report_id? ? report_id : ""} data: [#{values.join(", ")}]"
    end
  end
end

USB::load_devices