diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3e2f2a8 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +*.lock +target diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..9ed6213 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "djdeck" +version = "0.1.0" +edition = "2024" + +[[bin]] +name = "djdeck" +path = "src/main.rs" + +[dependencies] +ratatui = "0.30" +crossterm = "0.29" +pipewire = "0.9.2" +symphonia = { version = "0.5", features = ["mp3", "flac", "ogg", "wav", "aac"] } +rosc = "0.10" +tokio = { version = "1", features = ["full"] } +crossbeam-channel = "0.5" +serde = { version = "1", features = ["derive"] } +serde_json = "1" +anyhow = "1" +dirs = "5" +walkdir = "2" +parking_lot = "0.12" +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } + +[profile.release] +opt-level = 3 +lto = true +codegen-units = 1 diff --git a/README.md b/README.md index f5e94ef..7b20334 100644 --- a/README.md +++ b/README.md @@ -60,7 +60,7 @@ analyseplugin ladspa-rubberband.so ### Rust ``` -pipewire-rs (pipewire = "0.8") +pipewire-rs (pipewire = "0.9.2") symphonia (audio decoding) ratatui + crossterm rosc (OSC) diff --git a/src/audio.rs b/src/audio.rs new file mode 100644 index 0000000..ee1dcf0 --- /dev/null +++ b/src/audio.rs @@ -0,0 +1,417 @@ +/// Audio engine +/// +/// Responsibilities: +/// - Decode audio files (symphonia) into interleaved stereo f32 PCM +/// - Drive one PipeWire playback stream per deck +/// - Each stream targets the deck's filter-chain sink node +/// (djdeck.chain. input side) so all DSP happens in the graph +/// - Report position, VU, and load status back to the app via channel +/// +/// No DSP happens here. Pitch/tempo/EQ all live in the PipeWire graph. +use std::{ + path::PathBuf, + sync::{Arc, Mutex}, + thread, +}; + +use anyhow::{anyhow, Result}; +use crossbeam_channel::{bounded, Receiver, Sender}; +use symphonia::core::{ + audio::{AudioBufferRef, Signal}, + codecs::DecoderOptions, + formats::FormatOptions, + io::MediaSourceStream, + meta::MetadataOptions, + probe::Hint, +}; + +use crate::{deck::DeckId, effect_chain::EffectChain}; + +// ─── Commands ─────────────────────────────────────────────────────────────── + +#[derive(Debug)] +pub enum AudioCmd { + Load { deck: DeckId, path: PathBuf }, + Play(DeckId), + Pause(DeckId), + Stop(DeckId), + Seek { deck: DeckId, position: f32 }, + SetCue(DeckId), + GotoCue(DeckId), + // Volume is handled by wpctl; we just record it in state. + // Fader changes are applied directly from app.rs via EffectChain::set_volume. +} + +// ─── Status updates ───────────────────────────────────────────────────────── + +#[derive(Debug, Clone)] +pub enum AudioStatus { + Loaded { + deck: DeckId, + duration_samples: u64, + sample_rate: u32, + name: String, + /// PipeWire object ID of the stream node (for wpctl volume) + stream_node_id: u32, + }, + Position { deck: DeckId, position: u64 }, + Vu { deck: DeckId, l: f32, r: f32 }, + TrackEnded(DeckId), + Error { deck: DeckId, msg: String }, +} + +// ─── Per-deck internal state ────────────────────────────────────────────────── + +struct DeckState { + id: DeckId, + playing: bool, + pcm: Vec, // interleaved stereo f32 + sample_rate: u32, + read_pos: usize, // byte index into pcm (always even) + cue_pos: usize, +} + +impl DeckState { + fn new(id: DeckId) -> Self { + Self { + id, + playing: false, + pcm: Vec::new(), + sample_rate: 44100, + read_pos: 0, + cue_pos: 0, + } + } +} + +// ─── Public handle ────────────────────────────────────────────────────────── + +pub struct AudioEngine { + pub cmd_tx: Sender, + pub status_rx: Receiver, +} + +impl AudioEngine { + pub fn new() -> Result { + let (cmd_tx, cmd_rx) = bounded::(64); + let (status_tx, status_rx) = bounded::(512); + + thread::Builder::new() + .name("audio-engine".into()) + .spawn(move || { + if let Err(e) = engine_thread(cmd_rx, status_tx) { + tracing::error!("Audio engine crashed: {e}"); + } + })?; + + Ok(Self { cmd_tx, status_rx }) + } +} + +// ─── Engine thread ───────────────────────────────────────────────────────── + +fn engine_thread(cmd_rx: Receiver, status_tx: Sender) -> Result<()> { + use pipewire::{context::ContextRc, main_loop::MainLoopRc, stream::StreamRc}; + + let main_loop = MainLoopRc::new(None)?; + let context = ContextRc::new(&main_loop, None)?; + let core = context.connect_rc(None)?; + + // Shared per-deck state (written by command thread, read by PW callback) + let deck_states: Vec>> = (0..4) + .map(|i| Arc::new(Mutex::new(DeckState::new(DeckId::from_index(i).unwrap())))) + .collect(); + + // Build one PipeWire stream per deck + let mut _streams: Vec = Vec::new(); + let mut _listeners = Vec::new(); + + for (i, ds) in deck_states.iter().enumerate() { + let deck_id = DeckId::from_index(i).unwrap(); + let ds_cb = Arc::clone(ds); + let status_tx_cb = status_tx.clone(); + + // Target: the input (sink) side of this deck's filter-chain node + let target_name = format!("input.{}", EffectChain::node_name_for(deck_id)); + + let props = pipewire::properties::properties! { + *pipewire::keys::MEDIA_TYPE => "Audio", + *pipewire::keys::MEDIA_CATEGORY => "Playback", + *pipewire::keys::MEDIA_ROLE => "Music", + *pipewire::keys::APP_NAME => "djdeck", + *pipewire::keys::NODE_NAME => format!("djdeck.deck.{}", deck_id.node_suffix()), + // Route to our filter-chain sink + "node.target" => target_name, + }; + + let stream = StreamRc::new( + core.clone(), + &format!("djdeck-deck-{}", deck_id.label()), + props, + )?; + + let listener = stream + .add_local_listener_with_user_data(()) + .process(move |stream: &pipewire::stream::Stream, _| { + let mut ds_guard = ds_cb.lock(); + let ds = ds_guard.as_mut().unwrap(); + if let Some(mut buf) = stream.dequeue_buffer() { + let datas = buf.datas_mut(); + if datas.is_empty() { return; } + + let data = &mut datas[0]; + let chunk = data.chunk_mut(); + let n_frames = (chunk.size() / 8) as usize; // f32 stereo = 8 bytes + let out_bytes = data.data().unwrap(); + let out = unsafe { + std::slice::from_raw_parts_mut( + out_bytes.as_ptr() as *mut f32, + n_frames * 2, + ) + }; + + let mut vu_l = 0.0f32; + let mut vu_r = 0.0f32; + + if ds.playing && !ds.pcm.is_empty() { + for frame in 0..n_frames { + if ds.read_pos + 1 >= ds.pcm.len() { + // End of track + ds.playing = false; + out[frame * 2] = 0.0; + out[frame * 2 + 1] = 0.0; + let _ = status_tx_cb.try_send(AudioStatus::TrackEnded(ds.id)); + continue; + } + let l = ds.pcm[ds.read_pos]; + let r = ds.pcm[ds.read_pos + 1]; + ds.read_pos += 2; + out[frame * 2] = l; + out[frame * 2 + 1] = r; + vu_l = vu_l.max(l.abs()); + vu_r = vu_r.max(r.abs()); + } + } else { + out.fill(0.0); + } + + *chunk.offset_mut() = 0; + *chunk.stride_mut() = 8; + *chunk.size_mut() = (n_frames * 8) as u32; + + let pos = (ds.read_pos / 2) as u64; + let _ = status_tx_cb.try_send(AudioStatus::Position { deck: ds.id, position: pos }); + let _ = status_tx_cb.try_send(AudioStatus::Vu { deck: ds.id, l: vu_l, r: vu_r }); + } + }) + .register()?; + + // Connect the stream with F32LE stereo format + let pod_bytes = make_f32_stereo_pod()?; + let mut params = [pipewire::spa::pod::Pod::from_bytes(&pod_bytes).unwrap()]; + stream.connect( + pipewire::spa::utils::Direction::Output, + None, + pipewire::stream::StreamFlags::AUTOCONNECT + | pipewire::stream::StreamFlags::MAP_BUFFERS + | pipewire::stream::StreamFlags::RT_PROCESS, + &mut params, + )?; + + _listeners.push(listener); + _streams.push(stream); + } + + // Command processing thread — decodes files and mutates deck states + let ds_cmd = deck_states.clone(); + let st_cmd = status_tx.clone(); + thread::spawn(move || { + for cmd in cmd_rx.iter() { + handle_cmd(cmd, &ds_cmd, &st_cmd); + } + }); + + main_loop.run(); + Ok(()) +} + +fn handle_cmd( + cmd: AudioCmd, + states: &[Arc>], + status_tx: &Sender, +) { + match cmd { + AudioCmd::Load { deck, path } => { + let name = path.file_name() + .and_then(|n| n.to_str()) + .unwrap_or("unknown") + .to_string(); + + match decode_file(&path) { + Ok((pcm, sr)) => { + let total = (pcm.len() / 2) as u64; + { + let mut ds = states[deck as usize].lock().unwrap(); + ds.pcm = pcm; + ds.sample_rate = sr; + ds.read_pos = 0; + ds.cue_pos = 0; + ds.playing = false; + } + // stream_node_id: we don't have it from here — the app queries + // pw-dump after connecting. Emit 0 as placeholder; app fills it in. + let _ = status_tx.send(AudioStatus::Loaded { + deck, + duration_samples: total, + sample_rate: sr, + name, + stream_node_id: 0, + }); + } + Err(e) => { + let _ = status_tx.send(AudioStatus::Error { deck, msg: e.to_string() }); + } + } + } + AudioCmd::Play(deck) => { + let mut ds = states[deck as usize].lock().unwrap(); + if !ds.pcm.is_empty() { + ds.playing = true; + } + } + AudioCmd::Pause(deck) => { + let mut ds = states[deck as usize].lock().unwrap(); + ds.playing = false; + } + AudioCmd::Stop(deck) => { + let mut ds = states[deck as usize].lock().unwrap(); + ds.playing = false; + ds.read_pos = 0; + } + AudioCmd::Seek { deck, position } => { + let mut ds = states[deck as usize].lock().unwrap(); + let total = ds.pcm.len(); + ds.read_pos = ((total as f32 * position.clamp(0.0, 1.0)) as usize / 2) * 2; + } + AudioCmd::SetCue(deck) => { + let mut ds = states[deck as usize].lock().unwrap(); + ds.cue_pos = ds.read_pos; + } + AudioCmd::GotoCue(deck) => { + let mut ds = states[deck as usize].lock().unwrap(); + ds.read_pos = ds.cue_pos; + } + } +} + +// ─── Symphonia decode ───────────────────────────────────────────────────────────── + +/// Decode any supported audio file into interleaved stereo f32 PCM. +fn decode_file(path: &PathBuf) -> Result<(Vec, u32)> { + let file = std::fs::File::open(path)?; + let mss = MediaSourceStream::new(Box::new(file), Default::default()); + + let mut hint = Hint::new(); + if let Some(ext) = path.extension().and_then(|e| e.to_str()) { + hint.with_extension(ext); + } + + let probed = symphonia::default::get_probe().format( + &hint, + mss, + &FormatOptions::default(), + &MetadataOptions::default(), + )?; + + let mut format = probed.format; + let track = format + .tracks() + .iter() + .find(|t| t.codec_params.codec != symphonia::core::codecs::CODEC_TYPE_NULL) + .ok_or_else(|| anyhow!("No audio track found"))? + .clone(); + + let sr = track.codec_params.sample_rate.unwrap_or(44100); + let channels = track.codec_params.channels.map(|c| c.count()).unwrap_or(2); + let mut decoder = symphonia::default::get_codecs() + .make(&track.codec_params, &DecoderOptions::default())?; + + let track_id = track.id; + let mut pcm: Vec = Vec::new(); + + loop { + let packet = match format.next_packet() { + Ok(p) => p, + Err(symphonia::core::errors::Error::IoError(_)) => break, + Err(symphonia::core::errors::Error::ResetRequired) => break, + Err(e) => return Err(e.into()), + }; + if packet.track_id() != track_id { continue; } + + let decoded = decoder.decode(&packet)?; + push_samples(&mut pcm, decoded, channels); + } + + Ok((pcm, sr)) +} + +fn push_samples(out: &mut Vec, buf: AudioBufferRef, channels: usize) { + macro_rules! push { + ($buf:expr, $scale:expr) => {{ + let frames = $buf.frames(); + for f in 0..frames { + let l = *$buf.chan(0).get(f).unwrap_or(&Default::default()); + let r = if channels >= 2 { + *$buf.chan(1).get(f).unwrap_or(&Default::default()) + } else { + l + }; + out.push(l as f32 * $scale); + out.push(r as f32 * $scale); + } + }}; + } + + match buf { + AudioBufferRef::F32(b) => push!(b, 1.0_f32), + AudioBufferRef::F64(b) => push!(b, 1.0_f32), + AudioBufferRef::S16(b) => push!(b, 1.0 / 32768.0_f32), + AudioBufferRef::S32(b) => push!(b, 1.0 / 2147483648.0_f32), + AudioBufferRef::U8(b) => push!(b, 1.0 / 128.0_f32), + _ => {} + } +} + +// ─── PipeWire format pod helper ─────────────────────────────────────────────── + +fn make_f32_stereo_pod() -> Result> { + use pipewire::spa::pod::serialize::PodSerializer; + use pipewire::spa::pod::{object, property, Value}; + use pipewire::spa::param::audio::AudioFormat; + use pipewire::spa::param::format::{FormatProperties, MediaSubtype, MediaType}; + use pipewire::spa::param::ParamType; + use pipewire::spa::utils::SpaTypes; + + let bytes = PodSerializer::serialize( + std::io::Cursor::new(Vec::new()), + &Value::Object(object!( + SpaTypes::ObjectParamFormat, + ParamType::EnumFormat, + + let bytes = PodSerializer::serialize( + std::io::Cursor::new(Vec::new()), + &Value::Object(object!( + SpaTypes::ObjectParamFormat, + ParamType::EnumFormat, + property!(FormatProperties::MediaType, Id, MediaType::Audio), + property!(FormatProperties::MediaSubtype, Id, MediaSubtype::Raw), + property!(FormatProperties::AudioFormat, Id, AudioFormat::F32LE), + property!(FormatProperties::AudioRate, Int, 44100i32), + property!(FormatProperties::AudioChannels, Int, 2i32), + )), + )? + .0 + .into_inner(); + + Ok(bytes) +} \ No newline at end of file