Loading lighting.py +51 −35 Original line number Original line Diff line number Diff line Loading @@ -3,49 +3,63 @@ from time import sleep from time import sleep from random import randint from random import randint import configparser import configparser from threading import Thread import zmq import zmq import spidev import spidev import ws2812 import ws2812 class Color: green: int red: int blue: int def as_tuple(self): return self.green, self.red, self.blue class ColorManager(Thread): current_rand = 1 current_color = Color(0, 0, 0) activeFlag = True activeFlag = True c = 0 spi = spidev.SpiDev() rand = 1 r = 255 def __init__(self): super().__init__() def get_color(): self.spi.open(1, 0) if activeFlag == True : global c def get_color(self): global r if self.current_color.green + self.current_rand > 80: global rand self.current_rand = -randint(1, 8) if c+rand > 80: self.current_color = Color(80, 255, 0) rand = randint(1, 8)*(-1) elif self.current_colorgreen + self.current_rand < 0: return(80, 255, 0) elif c+rand < 0: rand = randint(1, 8) rand = randint(1, 8) return(0, 255, 0) self.current_color = Color(0, 255, 0) else: else: c = c + rand self.current_color.green += rand return (c, 255, 0) self.current_color.red = 255 self.current_color.blue = 0 def fade(self): if self.current_color.red > 0: self.current_color.red -= 1 if self.current_color.green > 0: self.current_color.green -= 1 def start_fade(self): self.activeFlag = False def run(self): while self.is_alive(): if self.activeFlag: self.get_color() else: else: fade() self.fade() ws2812.write2812(self.spi, [self.current_color.as_tuple() * 4]) def light_flame(): spi = spidev.SpiDev() spi.open(1, 0) while True: ws2812.write2812(spi, [get_color() * 4]) sleep(0.001) sleep(0.001) def fade(): if r > 0: r-- if c > 0: c-- return (c, r, 0) def main(): def main(): config = configparser.ConfigParser({'host': '10.10.10.1', 'port': '5556'}) config = configparser.ConfigParser({'host': '10.10.10.1', 'port': '5556'}) Loading @@ -64,18 +78,20 @@ def main(): topicfilter = "events" topicfilter = "events" socket.setsockopt_string(zmq.SUBSCRIBE, topicfilter) socket.setsockopt_string(zmq.SUBSCRIBE, topicfilter) cm = ColorManager() while True: while True: string = socket.recv() string = socket.recv() topic, messagedata = string.split() topic, messagedata = string.split() print(topic, messagedata) print(topic, messagedata) if messagedata == b"liftoff": if messagedata == b"liftoff": light_flame() cm = ColorManager() cm.start() elif messagedata == b"stage_1_separation": elif messagedata == b"stage_1_separation": light_flame() pass elif messagedata == b"stage_2_separation": elif messagedata == b"stage_2_separation": light_flame() cm.start_fade() activeFlag = False else: else: print("Unhandled message") print("Unhandled message") Loading Loading
lighting.py +51 −35 Original line number Original line Diff line number Diff line Loading @@ -3,49 +3,63 @@ from time import sleep from time import sleep from random import randint from random import randint import configparser import configparser from threading import Thread import zmq import zmq import spidev import spidev import ws2812 import ws2812 class Color: green: int red: int blue: int def as_tuple(self): return self.green, self.red, self.blue class ColorManager(Thread): current_rand = 1 current_color = Color(0, 0, 0) activeFlag = True activeFlag = True c = 0 spi = spidev.SpiDev() rand = 1 r = 255 def __init__(self): super().__init__() def get_color(): self.spi.open(1, 0) if activeFlag == True : global c def get_color(self): global r if self.current_color.green + self.current_rand > 80: global rand self.current_rand = -randint(1, 8) if c+rand > 80: self.current_color = Color(80, 255, 0) rand = randint(1, 8)*(-1) elif self.current_colorgreen + self.current_rand < 0: return(80, 255, 0) elif c+rand < 0: rand = randint(1, 8) rand = randint(1, 8) return(0, 255, 0) self.current_color = Color(0, 255, 0) else: else: c = c + rand self.current_color.green += rand return (c, 255, 0) self.current_color.red = 255 self.current_color.blue = 0 def fade(self): if self.current_color.red > 0: self.current_color.red -= 1 if self.current_color.green > 0: self.current_color.green -= 1 def start_fade(self): self.activeFlag = False def run(self): while self.is_alive(): if self.activeFlag: self.get_color() else: else: fade() self.fade() ws2812.write2812(self.spi, [self.current_color.as_tuple() * 4]) def light_flame(): spi = spidev.SpiDev() spi.open(1, 0) while True: ws2812.write2812(spi, [get_color() * 4]) sleep(0.001) sleep(0.001) def fade(): if r > 0: r-- if c > 0: c-- return (c, r, 0) def main(): def main(): config = configparser.ConfigParser({'host': '10.10.10.1', 'port': '5556'}) config = configparser.ConfigParser({'host': '10.10.10.1', 'port': '5556'}) Loading @@ -64,18 +78,20 @@ def main(): topicfilter = "events" topicfilter = "events" socket.setsockopt_string(zmq.SUBSCRIBE, topicfilter) socket.setsockopt_string(zmq.SUBSCRIBE, topicfilter) cm = ColorManager() while True: while True: string = socket.recv() string = socket.recv() topic, messagedata = string.split() topic, messagedata = string.split() print(topic, messagedata) print(topic, messagedata) if messagedata == b"liftoff": if messagedata == b"liftoff": light_flame() cm = ColorManager() cm.start() elif messagedata == b"stage_1_separation": elif messagedata == b"stage_1_separation": light_flame() pass elif messagedata == b"stage_2_separation": elif messagedata == b"stage_2_separation": light_flame() cm.start_fade() activeFlag = False else: else: print("Unhandled message") print("Unhandled message") Loading