diff --git a/nyan/__main__.py b/nyan/__main__.py index b813a22..45da744 100644 --- a/nyan/__main__.py +++ b/nyan/__main__.py @@ -2,8 +2,11 @@ import math import os +import sys import time +from audio import PCMSource from nyancat import Nyancat +from pcm import NyancatSignal _geometry = os.getenv('LEDCAT_GEOMETRY', '150x16').split('x') DISP_WIDTH = int(_geometry[0]) @@ -19,6 +22,24 @@ class NyancatWave(Nyancat): yield (DISP_HEIGHT // 2) + int(math.sin(x / 6 + t * math.pi) * 4 * math.sin(t * 8)) -cat = NyancatWave(DISP_WIDTH, DISP_HEIGHT) +if len(sys.argv) == 1 or sys.argv[1] == 'wave': + cat = NyancatWave(DISP_WIDTH, DISP_HEIGHT) +elif sys.argv[1] == 'pcm': + _split = list(sys.argv[3].split(':')) + if len(_split) != 3: + print('usage: %s pcm ::' % sys.argv[0], file=sys.stderr) + sys.exit(1) + sample_rate = int(_split[0]) + sample_bits = int(_split[1]) + num_channels = int(_split[2]) + pipe = sys.stdin.buffer if sys.argv[2] == '-' else sys.argv[2] + + signal = PCMSource(pipe, sample_rate, sample_bits, num_channels) + cat = NyancatSignal(DISP_WIDTH, DISP_HEIGHT, signal) + +else: + print('usage: %s |' % sys.argv[0], file=sys.stderr) + sys.exit(1) + while True: cat.render() diff --git a/nyan/audio.py b/nyan/audio.py new file mode 100644 index 0000000..a414d4c --- /dev/null +++ b/nyan/audio.py @@ -0,0 +1,51 @@ +import numpy.fft +import os + +class Source: + def get_spectrum(self, signal): + n = len(signal) + signal = numpy.array([(s + 1) / 2 for s in signal], dtype=float) + spectrum = numpy.abs(numpy.fft.rfft(signal)) + freqs = numpy.fft.fftfreq(spectrum.size, 1 / self.get_sample_rate()) + spectrum = spectrum[1:] + return (spectrum, freqs) + + def get_input(self): + return self.input + + def set_input(self, input): + if type(input) == str: + self.input = os.fdopen(os.open(input, os.O_RDONLY), 'rb') + else: + self.input = input + + def get_signal(self, seconds): + return [self.get_next_sample() for i in range(0, int(self.get_sample_rate() * seconds))] + + def get_next_sample(self): + pass # virtual + + def get_sample_rate(self): + pass # virtual + + +class PCMSource(Source): + def __init__(self, input_file, sample_rate, sample_bits, num_channels, sample_endianness='little', sample_sign='signed'): + assert num_channels == 1, 'no more than one channel is supported at the moment' + assert(sample_endianness == 'little' or sample_endianness == 'big') + assert(sample_sign == 'signed' or sample_sign == 'unsigned') + self.set_input(input_file) + self.sample_rate = sample_rate + self.sample_bits = sample_bits + self.sample_endianness = sample_endianness + self.sample_sign = sample_sign + + def sample_from_raw_data(self, raw_data): + intval = int.from_bytes(raw_data, self.sample_endianness, signed=self.sample_sign == 'signed') + return intval / (2 ** (len(raw_data) * 8 - 1)) + + def get_next_sample(self): + return self.sample_from_raw_data(self.get_input().read(self.sample_bits // 8)) + + def get_sample_rate(self): + return self.sample_rate diff --git a/nyan/pcm.py b/nyan/pcm.py new file mode 100644 index 0000000..4a0827f --- /dev/null +++ b/nyan/pcm.py @@ -0,0 +1,33 @@ +#!/usr/bin/env python3 + +from nyancat import Nyancat + +INTERVAL = 1 / 20 +WAVE_FACTOR = 3 + +class NyancatSignal(Nyancat): + def __init__(self, w, h, signal): + super().__init__(w, h) + self.signal = signal + self.samples_history = [[0] * int(INTERVAL * signal.sample_rate * WAVE_FACTOR)] * 6 + + def plot_tail(self, width): + # Calculate the wave + samples = self.signal.get_signal(INTERVAL) + samples_offset = 0 + smallest_diff = float('inf') + # Find a part in the new sample window that resembles the previous one. + for i in range(min(len(samples), len(self.samples_history[-1])) - width * WAVE_FACTOR): + z = zip(self.samples_history[-1][0:width], samples[i:i + width * WAVE_FACTOR]) + diff = sum(map(lambda t: (t[0] - t[1]) ** 2, z)) + if diff < smallest_diff: + samples_offset = i + smallest_diff = diff + assert samples_offset + width * WAVE_FACTOR < len(samples), 'w=%d < len=%d' % (samples_offset + width * WAVE_FACTOR, len(samples)) + self.samples_history.append(samples[samples_offset:samples_offset + width * WAVE_FACTOR]) + self.samples_history.pop(0) + + # Render the tail + for x in range(width): + amplitude = sum(map(lambda hist: hist[x * WAVE_FACTOR], self.samples_history)) / len(self.samples_history) * 3 + yield int((amplitude * .5 - .5) * self.height) + self.height