#!/usr/bin/env python3

# Based on material from these URLs:
#
#  - https://github.com/audiohacked/OpenCorsairLink/issues/70
#
#  - https://github.com/Legion2/CorsairLightingProtocol
#
# and my own bits of trial and error.  I'd not use this for anything important,
# but it does seem to work.

import argparse
import logging
import sys
import time

import hidraw


def argparse_list(delim=',', t=lambda x: x, opts=None):
    """Build an argparse ``type=`` callable that parses a delimited list.

    Args:
        delim: separator between the items of the argument string.
        t: conversion applied to every item (identity by default).
        opts: optional container of permitted values; ``None`` allows all.

    Returns:
        A function mapping the raw argument string to a list of converted
        items.  It raises ValueError (which argparse reports as a usage
        error) when a converted item is not in ``opts``.
    """
    def parse_list(arg):
        items = [t(piece) for piece in arg.split(delim)]
        if opts is not None:
            for item in items:
                if item not in opts:
                    raise ValueError("value %r is not permitted" % (item,))
        return items
    return parse_list


class HiddevWrapper:
    """Context manager that opens a hidraw device and closes it on exit.

    When ``device`` is a path it is opened directly; otherwise the first
    enumerated device matching the vendor id, product id, interface number
    and (if given) serial number is opened.
    """

    def __init__(self, device, vend=0x1B1C, prod=0x0C10, sn=None, iface=0):
        self.device = device
        self.vend = vend
        self.prod = prod
        self.sn = sn
        self.iface = iface

    def __enter__(self):
        """Open the device and return the hidraw handle.

        Raises:
            ValueError: no path was given and no enumerated device matched.
        """
        self.dev = hidraw.device()
        if self.device is not None:
            self.dev.open_path(bytes(self.device, "utf8"))
            return self.dev

        for d in hidraw.enumerate():
            logging.debug("Looking at v=%s pr=%s sn=%s if=%s p=%s",
                          d['vendor_id'], d['product_id'], d['serial_number'],
                          d['interface_number'], d['path'])
            if d['vendor_id'] != self.vend:
                continue
            if d['product_id'] != self.prod:
                continue
            if d['interface_number'] != self.iface:
                continue
            if self.sn is not None and d['serial_number'] != self.sn:
                continue
            self.dev.open_path(d['path'])
            break
        else:
            # The loop ran to completion without opening anything.
            raise ValueError("Device not found")
        return self.dev

    def __exit__(self, exc_type, value, traceback):
        """Close the device; exceptions from the body are not suppressed."""
        self.dev.close()


def auto_int(x):
    """Parse an integer, inferring the base from its prefix (0x, 0o, 0b)."""
    return int(x, 0)


def devw(dev, buf):
    """Send the bytes in ``buf`` to ``dev``, prefixed with a zero byte.

    Returns whatever ``dev.write`` returns.
    """
    logging.debug("SEND: %r", buf)
    # Accept any sequence of ints, not just lists.
    return dev.write([0, *buf])


def devwr(dev, buf):
    """Send ``buf`` with :func:`devw`, then read and return a 16-byte reply."""
    devw(dev, buf)
    reply = dev.read(16)
    logging.debug("RECV: %r", reply)
    return reply


def do_info(dw, _, cliargs):
    """Print the device's identification strings and firmware details."""
    with dw as dev:
        print("Manufacturer: %s" % dev.get_manufacturer_string())
        print("Product: %s" % dev.get_product_string())
        print("Serial No: %s" % dev.get_serial_number_string())

        r = devwr(dev, [0x02])
        print("Firmware revision %d.%d.%d" % (r[1], r[2], r[3]))

        r = devwr(dev, [0x06])
        print("Bootloader revision: %d.%d" % (r[1], r[2]))

        r = devwr(dev, [0x03])
        print("Device identity: %r" % r[1:5])


def _be16(pair):
    """Combine a (high byte, low byte) pair into one unsigned integer."""
    return pair[0] * 256 + pair[1]


# Unit communicates in 100ths of degree C
def decode_temp(temp):
    """Decode a two-byte big-endian temperature into hundredths of a degree."""
    return _be16(temp)


def encode_temp(temp):
    """Encode a temperature in hundredths of a degree as (high, low) bytes.

    ``temp`` is rounded to the nearest integer first, so a float that is a
    hair below the intended value (e.g. ``0.57 * 100``) is not truncated to
    the hundredth below it.  Values are expected to lie in 0..65535.
    """
    return divmod(int(round(temp)), 256)


def do_sensor(dw, _, cliargs):
    """Report all sensor state: temperatures, voltage rails and fans."""
    with dw as dev:
        ts_pres = devwr(dev, [0x10])
        for i in range(4):
            # Reply byte i+1 is non-zero when temperature probe i is present.
            if ts_pres[i + 1] != 0:
                temp = decode_temp(devwr(dev, [0x11, i])[1:3])
                print("Temp %d deg C: %d.%02d" % ((i,) + divmod(temp, 100)))

        # Rail readings are printed as thousandths of the displayed unit.
        for idx, label in enumerate(("12V", "5V", "3.3V")):
            v = _be16(devwr(dev, [0x12, idx])[1:3])
            print("%s rail: %d.%03d" % ((label,) + divmod(v, 1000)))

        fan_pres = devwr(dev, [0x20])
        for i in range(6):
            kind = fan_pres[i + 1]
            if 1 <= kind <= 2:
                rpm = _be16(devwr(dev, [0x21, i])[1:3])
                fanp = devwr(dev, [0x22, i])
                print("Fan %d type: %d; rpm: %d; pwr: %d"
                      % (i, kind, rpm, fanp[1]))
            elif kind != 0:
                print("Fan %d unknown type %d" % (i, kind))


def do_therms(dw, _, cliargs):
    """Print an extremely brief, single-line list of thermistor readings.

    Absent probes are shown as ``-----``.
    """
    ts = []
    with dw as dev:
        ts_pres = devwr(dev, [0x10])
        for i in range(4):
            if ts_pres[i + 1] != 0:
                temp = devwr(dev, [0x11, i])
                ts.append("%.2f" % (decode_temp(temp[1:3]) / 100))
            else:
                ts.append("-----")
    print(" ".join(ts))


def do_fanpwr(dw, _, cliargs):
    """Set the power level for one or more fans.

    ``cliargs`` holds a comma-separated fan list (each 0-5) followed by a
    power percentage.

    Raises:
        ValueError: the power is outside 0..100.
    """
    argp = argparse.ArgumentParser()
    argp.add_argument('fans', type=argparse_list(',', int, range(6)))
    argp.add_argument('power', type=int)
    args = argp.parse_args(cliargs)

    if not (0 <= args.power <= 100):
        raise ValueError("Fan power between 0 and 100%, please")

    with dw as dev:
        for fan in args.fans:
            devwr(dev, [0x23, fan, args.power])


# Set the temperature-controlled fan RPM target curve.  Takes the fans to set,
# the thermistor to watch, six temperature points, and six RPM targets.
#
# Example: fancurve 0,3 0 35 40 43 45 47 50 0 1500 2500 3500 4500 5000
def do_fancurve(dw, _, cliargs):
    """Program a temperature-driven RPM curve into one or more fans.

    ``cliargs`` holds: a comma-separated fan list (each 0-5), the
    thermocouple index (0-3 or 255), six temperatures in degrees C and six
    RPM targets.

    Raises:
        ValueError: a temperature is outside 0..655.35 or an RPM target is
            outside 0..2**15.
    """
    argp = argparse.ArgumentParser()
    argp.add_argument('fans', type=argparse_list(',', int, range(6)))
    argp.add_argument('thermocouple', type=int, choices=[0, 1, 2, 3, 0xFF])
    argp.add_argument('temps', type=float, nargs=6)
    argp.add_argument('targets', type=int, nargs=6)
    args = argp.parse_args(cliargs)

    for temp in args.temps:
        # 655.35 is the largest value that fits in two bytes of hundredths.
        if not 0 <= temp <= 655.35:
            raise ValueError("Bad temperature %f" % temp)
    for rpm in args.targets:
        if not 0 <= rpm <= 2**15:
            raise ValueError("Bad rpm %d" % rpm)

    # The curve itself is the same for every fan, so encode it once.
    curve = []
    for temp in args.temps:
        # Round before encoding: temp * 100 can land just below the intended
        # integer (e.g. 0.57 * 100) and would otherwise lose a hundredth.
        curve.extend(encode_temp(round(temp * 100)))
    for rpm in args.targets:
        curve.extend(divmod(rpm, 256))

    with dw as dev:
        for fan in args.fans:
            logging.info("Set fan %d to thermocouple %d using %r %r",
                         fan, args.thermocouple, args.temps, args.targets)
            # NOTE(review): this is the only command sent without reading a
            # reply back -- confirm that is intentional.
            devw(dev, [0x25, fan, args.thermocouple] + curve)


def do_ledoff(dw, _, cliargs):
    """Turn off a LED channel (0 or 1)."""
    argp = argparse.ArgumentParser()
    argp.add_argument('channel', type=int, choices=[0, 1])
    args = argp.parse_args(cliargs)

    with dw as dev:
        devwr(dev, [0x37, args.channel])        # clear groups
        devwr(dev, [0x34, args.channel])        # clear LEDs
        devwr(dev, [0x38, args.channel, 0x00])  # mode is disabled
        devwr(dev, [0x33, 0x00])                # LED trigger


# Of course, we can also fling raw bytes at the device.
#
# FAN setup:
#
#   SET   Detection type  FAN  TYPE (0=auto, 1=3pin, 2=4pin)
#   0x28  0x02            5    0
#
# LED initialize:
#   0x37 0x00       # clear groups
#   0x34 0x00       # clear LEDs
#   0x39 0x00 0x64  # maximum brightness
#   0x3B 0x00 0x01  # WS2812B mode
#   0x38 0x00 0x01  # hardware playback mode
#   0x33 0x00       # LED trigger
#
# LEDs as temperature reactive displays:
#   SET  CHAN STRIP TYPE MODE SPEED DIR XTRA GROUP RGB 1 RGB 2     RGB 3   T1     T2     T3
#                                                                          (38 C) (40 C) (45 C)
#   0x35 0    0     0x0A 5    0     0   0    1     0 0 0 128 128 0 255 0 0 14 216 15 160 17 148
# Or turn it off:
#   0x35 0 0 0x0A 0 0 0 0 0xFF 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
def do_raw(dw, _, cliargs):
    """Send raw packets to the device and print each reply.

    Every element of ``cliargs`` is one packet: whitespace-separated byte
    values in any base :func:`auto_int` understands.  All packets are
    validated before the device is opened; on a malformed or out-of-range
    byte, or a packet longer than 63 bytes, a message is written to stderr
    and the process exits with status 1.
    """
    packets = []
    for arg in cliargs:
        try:
            packet = [auto_int(tok) for tok in arg.split()]
        except ValueError:
            # A non-numeric token used to escape as a bare traceback.
            print("Invalid byte", file=sys.stderr)
            sys.exit(1)
        if any(not (0x00 <= b <= 0xFF) for b in packet):
            print("Invalid byte", file=sys.stderr)
            sys.exit(1)
        if len(packet) > 63:
            print("Refusing to send more than 63 bytes in %s" % arg,
                  file=sys.stderr)
            sys.exit(1)
        packets.append(packet)

    with dw as dev:
        for packet in packets:
            print(devwr(dev, packet))


# Maps each sub-command name to its handler; every handler is called as
# handler(device_wrapper, command_name, remaining_cli_args).
cmds = {
    "info": do_info,
    "sensor": do_sensor,
    "therms": do_therms,
    "fanpwr": do_fanpwr,
    "fancurve": do_fancurve,
    "ledoff": do_ledoff,
    "raw": do_raw,
}


def main(argv=None):
    """Parse the command line and dispatch to the selected sub-command.

    Args:
        argv: argument list to parse; ``None`` means ``sys.argv[1:]``.
    """
    argp = argparse.ArgumentParser(description="nwf's corsair fiddly tool")
    argp.add_argument("--log-level",
                      help="Set the logging level. Defaults to ERROR",
                      choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
                      default="ERROR")
    argp.add_argument("--device", type=str)
    argp.add_argument("command", choices=list(cmds))
    argp.add_argument('remainder', nargs=argparse.REMAINDER)
    args = argp.parse_args(argv)

    # The choices above are exactly the level constants the logging module
    # exports, so a plain attribute lookup yields the numeric level.
    logging.basicConfig(level=getattr(logging, args.log_level),
                        format="%(message)s")

    cmds[args.command](HiddevWrapper(args.device), args.command, args.remainder)


# Only touch the hardware when run as a script, not when imported.
if __name__ == "__main__":
    main()