From 9cfe3c71410a0d3d74c078e7bb58a7ce039871d6 Mon Sep 17 00:00:00 2001 From: Laurent Date: Sun, 22 Sep 2024 12:31:25 +0200 Subject: [PATCH 01/13] Add a RotatingKVCache. --- candle-nn/src/kv_cache.rs | 176 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 176 insertions(+) diff --git a/candle-nn/src/kv_cache.rs b/candle-nn/src/kv_cache.rs index eb5dbfdb59..422bf112e9 100644 --- a/candle-nn/src/kv_cache.rs +++ b/candle-nn/src/kv_cache.rs @@ -145,3 +145,179 @@ impl KvCache { self.v.reset(); } } + +#[derive(Debug, Clone)] +pub struct RotatingCache { + all_data: Option, + dim: usize, + // `offset` is the current write index in the buffer + offset: usize, + // The total size of the sequence seen so far. + current_seq_len: usize, + // max_seq_len is the size of the rotating buffer, it is actually allowed for the full + // sequence to grow past this limit. + max_seq_len: usize, +} + +impl RotatingCache { + pub fn new(dim: usize, max_seq_len: usize) -> Self { + Self { + all_data: None, + dim, + offset: 0, + current_seq_len: 0, + max_seq_len, + } + } + + pub fn offset(&self) -> usize { + self.offset + } + + pub fn dim(&self) -> usize { + self.dim + } + + pub fn current_seq_len(&self) -> usize { + self.current_seq_len + } + + pub fn max_seq_len(&self) -> usize { + self.max_seq_len + } + + pub fn all_data(&self) -> &Option { + &self.all_data + } + + pub fn current_data(&self) -> Result> { + let data = match self.all_data.as_ref() { + None => None, + Some(d) => { + if self.current_seq_len >= self.max_seq_len { + Some(d.clone()) + } else { + Some(d.narrow(self.dim, 0, self.current_seq_len)?) + } + } + }; + Ok(data) + } + + pub fn reset(&mut self) { + self.offset = 0; + self.current_seq_len = 0; + self.all_data = None; + } + + pub fn append(&mut self, src: &Tensor) -> Result<()> { + let seq_len = src.dim(self.dim)?; + // This doesn't seem very idiomatic but because the creation can fail, it's tricky to use + // self.all_data.get_or_insert_with. + if self.all_data.is_none() { + let mut shape = src.dims().to_vec(); + shape[self.dim] = self.max_seq_len; + let ad = Tensor::zeros(shape, src.dtype(), src.device())?; + self.all_data = Some(ad) + }; + let ad = self.all_data.as_mut().unwrap(); + + if seq_len >= self.max_seq_len { + let src = src.narrow(self.dim, seq_len - self.max_seq_len, self.max_seq_len)?; + ad.slice_set(&src, self.dim, 0)?; + self.offset = 0; + } else { + let rem_len = self.max_seq_len - self.offset; + if rem_len <= seq_len { + ad.slice_set(src, self.dim, self.offset)?; + self.offset = (self.offset + seq_len) % self.max_seq_len; + } else { + // We have to make two copies here as we go over the boundary of the cache. 
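+ // The first copy fills the `rem_len` slots from `offset` up to the end of the buffer,
+ // the second wraps around and writes the remaining `seq_len - rem_len` values at index 0,
+ // leaving the write index at `seq_len - rem_len`.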
+ if rem_len > 0 { + let src1 = src.narrow(self.dim, 0, rem_len)?; + ad.slice_set(&src1, self.dim, self.offset)?; + } + let src2 = src.narrow(self.dim, rem_len, seq_len - rem_len)?; + ad.slice_set(&src2, self.dim, 0)?; + self.offset = seq_len - rem_len; + } + } + self.current_seq_len += seq_len; + Ok(()) + } +} + +#[derive(Debug, Clone)] +pub struct RotatingKvCache { + k: RotatingCache, + v: RotatingCache, +} + +impl RotatingKvCache { + pub fn new(dim: usize, max_seq_len: usize) -> Self { + let k = RotatingCache::new(dim, max_seq_len); + let v = RotatingCache::new(dim, max_seq_len); + Self { k, v } + } + + pub fn k_cache(&self) -> &RotatingCache { + &self.k + } + + pub fn v_cache(&self) -> &RotatingCache { + &self.v + } + + pub fn k_cache_mut(&mut self) -> &mut RotatingCache { + &mut self.k + } + + pub fn v_cache_mut(&mut self) -> &mut RotatingCache { + &mut self.v + } + + pub fn k(&self) -> Result> { + self.k.current_data() + } + + pub fn v(&self) -> Result> { + self.v.current_data() + } + + pub fn append(&mut self, k: &Tensor, v: &Tensor) -> Result<(Tensor, Tensor)> { + self.k.append(k)?; + self.v.append(v)?; + let out_k = self.k.current_data()?; + let out_v = self.v.current_data()?; + let k = match out_k { + None => { + let mut shape = k.dims().to_vec(); + shape[self.k.dim] = 0; + Tensor::zeros(shape, k.dtype(), k.device())? + } + Some(k) => k, + }; + let v = match out_v { + None => { + let mut shape = v.dims().to_vec(); + shape[self.k.dim] = 0; + Tensor::zeros(shape, v.dtype(), v.device())? + } + Some(v) => v, + }; + Ok((k, v)) + } + + pub fn offset(&self) -> usize { + self.k.offset() + } + + pub fn current_seq_len(&self) -> usize { + self.k.current_seq_len() + } + + pub fn reset(&mut self) { + self.k.reset(); + self.v.reset(); + } +} From 1bddd44cb8dd968597dbbf087acfbe8d7d778b5e Mon Sep 17 00:00:00 2001 From: Laurent Date: Sun, 22 Sep 2024 12:44:09 +0200 Subject: [PATCH 02/13] Add some KvCache tests. --- candle-nn/tests/kv_cache.rs | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 candle-nn/tests/kv_cache.rs diff --git a/candle-nn/tests/kv_cache.rs b/candle-nn/tests/kv_cache.rs new file mode 100644 index 0000000000..c692a5baf1 --- /dev/null +++ b/candle-nn/tests/kv_cache.rs @@ -0,0 +1,27 @@ +#[cfg(feature = "mkl")] +extern crate intel_mkl_src; + +#[cfg(feature = "accelerate")] +extern crate accelerate_src; + +use candle::{Device, Result, Tensor}; + +#[test] +fn kv_cache() -> Result<()> { + let mut cache = candle_nn::kv_cache::Cache::new(0, 16); + let data = cache.current_data()?; + assert!(data.is_none()); + let t = Tensor::new(&[1f32, 2., 3.], &Device::Cpu)?; + cache.append(&t)?; + let data = cache.current_data()?.unwrap(); + assert_eq!(data.to_vec1::()?, [1., 2., 3.]); + let t = Tensor::new(&[4f32], &Device::Cpu)?; + cache.append(&t)?; + let data = cache.current_data()?.unwrap(); + assert_eq!(data.to_vec1::()?, [1., 2., 3., 4.]); + let t = Tensor::new(&[0f32, 5., 6., 7.], &Device::Cpu)?; + cache.append(&t)?; + let data = cache.current_data()?.unwrap(); + assert_eq!(data.to_vec1::()?, [1., 2., 3., 4., 0., 5., 6., 7.]); + Ok(()) +} From f9579f80bef5dac71e4d1e4ebeafc0b047923f27 Mon Sep 17 00:00:00 2001 From: Laurent Date: Sun, 22 Sep 2024 12:46:18 +0200 Subject: [PATCH 03/13] Test the reset too. 
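The test now runs the same sequence of appends twice with a `reset` in between, checking that a reset cache behaves like a freshly created one (no stored data, `current_seq_len` back to 0).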
--- candle-nn/tests/kv_cache.rs | 33 +++++++++++++++++++-------------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/candle-nn/tests/kv_cache.rs b/candle-nn/tests/kv_cache.rs index c692a5baf1..cc016ff1c2 100644 --- a/candle-nn/tests/kv_cache.rs +++ b/candle-nn/tests/kv_cache.rs @@ -9,19 +9,24 @@ use candle::{Device, Result, Tensor}; #[test] fn kv_cache() -> Result<()> { let mut cache = candle_nn::kv_cache::Cache::new(0, 16); - let data = cache.current_data()?; - assert!(data.is_none()); - let t = Tensor::new(&[1f32, 2., 3.], &Device::Cpu)?; - cache.append(&t)?; - let data = cache.current_data()?.unwrap(); - assert_eq!(data.to_vec1::()?, [1., 2., 3.]); - let t = Tensor::new(&[4f32], &Device::Cpu)?; - cache.append(&t)?; - let data = cache.current_data()?.unwrap(); - assert_eq!(data.to_vec1::()?, [1., 2., 3., 4.]); - let t = Tensor::new(&[0f32, 5., 6., 7.], &Device::Cpu)?; - cache.append(&t)?; - let data = cache.current_data()?.unwrap(); - assert_eq!(data.to_vec1::()?, [1., 2., 3., 4., 0., 5., 6., 7.]); + for _ in [0, 1] { + assert_eq!(cache.current_seq_len(), 0); + let data = cache.current_data()?; + assert!(data.is_none()); + let t = Tensor::new(&[1f32, 2., 3.], &Device::Cpu)?; + cache.append(&t)?; + let data = cache.current_data()?.unwrap(); + assert_eq!(data.to_vec1::()?, [1., 2., 3.]); + let t = Tensor::new(&[4f32], &Device::Cpu)?; + cache.append(&t)?; + let data = cache.current_data()?.unwrap(); + assert_eq!(data.to_vec1::()?, [1., 2., 3., 4.]); + let t = Tensor::new(&[0f32, 5., 6., 7.], &Device::Cpu)?; + cache.append(&t)?; + let data = cache.current_data()?.unwrap(); + assert_eq!(data.to_vec1::()?, [1., 2., 3., 4., 0., 5., 6., 7.]); + assert_eq!(cache.current_seq_len(), 8); + cache.reset(); + } Ok(()) } From 6547c4bfc3af054d78a0ce7b44a4c681d1c2abf0 Mon Sep 17 00:00:00 2001 From: Laurent Date: Sun, 22 Sep 2024 12:51:07 +0200 Subject: [PATCH 04/13] More kv-cache testing. 
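Besides the extra tests, this fixes an inverted condition in `RotatingCache::append`: a chunk can be written with a single `slice_set` only when `seq_len <= rem_len`, i.e. when it fits before the end of the buffer; otherwise the two-copy wrap-around path must be taken.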
--- candle-nn/src/kv_cache.rs | 2 +- candle-nn/tests/kv_cache.rs | 26 ++++++++++++++++++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/candle-nn/src/kv_cache.rs b/candle-nn/src/kv_cache.rs index 422bf112e9..02651cb08e 100644 --- a/candle-nn/src/kv_cache.rs +++ b/candle-nn/src/kv_cache.rs @@ -228,7 +228,7 @@ impl RotatingCache { self.offset = 0; } else { let rem_len = self.max_seq_len - self.offset; - if rem_len <= seq_len { + if seq_len <= rem_len { ad.slice_set(src, self.dim, self.offset)?; self.offset = (self.offset + seq_len) % self.max_seq_len; } else { diff --git a/candle-nn/tests/kv_cache.rs b/candle-nn/tests/kv_cache.rs index cc016ff1c2..91167b775e 100644 --- a/candle-nn/tests/kv_cache.rs +++ b/candle-nn/tests/kv_cache.rs @@ -30,3 +30,29 @@ fn kv_cache() -> Result<()> { } Ok(()) } + +#[test] +fn rotating_kv_cache() -> Result<()> { + let mut cache = candle_nn::kv_cache::RotatingCache::new(0, 6); + for _ in [0, 1] { + assert_eq!(cache.current_seq_len(), 0); + let data = cache.current_data()?; + assert!(data.is_none()); + let t = Tensor::new(&[1f32, 2., 3.], &Device::Cpu)?; + cache.append(&t)?; + let data = cache.current_data()?.unwrap(); + assert_eq!(data.to_vec1::()?, [1., 2., 3.]); + let t = Tensor::new(&[4f32], &Device::Cpu)?; + cache.append(&t)?; + let data = cache.current_data()?.unwrap(); + assert_eq!(data.to_vec1::()?, [1., 2., 3., 4.]); + let t = Tensor::new(&[0f32, 5., 6., 7.], &Device::Cpu)?; + cache.append(&t)?; + let data = cache.current_data()?.unwrap(); + assert_eq!(data.to_vec1::()?, [6., 7., 3., 4., 0., 5.]); + assert_eq!(cache.current_seq_len(), 8); + assert_eq!(cache.offset(), 2); + cache.reset(); + } + Ok(()) +} From fc877920cecb1736c43970c7def4239d8daa8082 Mon Sep 17 00:00:00 2001 From: Laurent Date: Sun, 22 Sep 2024 12:57:46 +0200 Subject: [PATCH 05/13] More tests for the rotating kv-cache. 
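As a worked trace of the rotation checked below: with a window of 6, appending 3 + 1 + 4 values leaves the buffer as [6, 7, 3, 4, 0, 5] with the write index at 2, since the last two values wrapped around and overwrote slots 0 and 1 while `current_seq_len` keeps counting up to 8.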
--- candle-nn/tests/kv_cache.rs | 49 ++++++++++++++++++++++++++++++++----- 1 file changed, 43 insertions(+), 6 deletions(-) diff --git a/candle-nn/tests/kv_cache.rs b/candle-nn/tests/kv_cache.rs index 91167b775e..3fc40a571b 100644 --- a/candle-nn/tests/kv_cache.rs +++ b/candle-nn/tests/kv_cache.rs @@ -35,23 +35,60 @@ fn kv_cache() -> Result<()> { fn rotating_kv_cache() -> Result<()> { let mut cache = candle_nn::kv_cache::RotatingCache::new(0, 6); for _ in [0, 1] { + assert_eq!(cache.offset(), 0); assert_eq!(cache.current_seq_len(), 0); let data = cache.current_data()?; assert!(data.is_none()); - let t = Tensor::new(&[1f32, 2., 3.], &Device::Cpu)?; + let t = Tensor::new(&[1., 2., 3.], &Device::Cpu)?; cache.append(&t)?; let data = cache.current_data()?.unwrap(); - assert_eq!(data.to_vec1::()?, [1., 2., 3.]); - let t = Tensor::new(&[4f32], &Device::Cpu)?; + assert_eq!(data.to_vec1::()?, [1., 2., 3.]); + let t = Tensor::new(&[4.], &Device::Cpu)?; cache.append(&t)?; let data = cache.current_data()?.unwrap(); - assert_eq!(data.to_vec1::()?, [1., 2., 3., 4.]); - let t = Tensor::new(&[0f32, 5., 6., 7.], &Device::Cpu)?; + assert_eq!(data.to_vec1::()?, [1., 2., 3., 4.]); + let t = Tensor::new(&[0., 5., 6., 7.], &Device::Cpu)?; cache.append(&t)?; let data = cache.current_data()?.unwrap(); - assert_eq!(data.to_vec1::()?, [6., 7., 3., 4., 0., 5.]); + assert_eq!(data.to_vec1::()?, [6., 7., 3., 4., 0., 5.]); assert_eq!(cache.current_seq_len(), 8); assert_eq!(cache.offset(), 2); + + let t = Tensor::new(&[8.], &Device::Cpu)?; + cache.append(&t)?; + let data = cache.current_data()?.unwrap(); + assert_eq!(data.to_vec1::()?, [6., 7., 8., 4., 0., 5.]); + assert_eq!(cache.current_seq_len(), 9); + assert_eq!(cache.offset(), 3); + + let t = Tensor::new(&[9., 10., 11.], &Device::Cpu)?; + cache.append(&t)?; + let data = cache.current_data()?.unwrap(); + assert_eq!(data.to_vec1::()?, [6., 7., 8., 9., 10., 11.]); + assert_eq!(cache.current_seq_len(), 12); + assert_eq!(cache.offset(), 0); + + let t = Tensor::new(&[12.], &Device::Cpu)?; + cache.append(&t)?; + let data = cache.current_data()?.unwrap(); + assert_eq!(data.to_vec1::()?, [12., 7., 8., 9., 10., 11.]); + assert_eq!(cache.current_seq_len(), 13); + assert_eq!(cache.offset(), 1); + + let t = Tensor::new(&[0., 1., 2., 3., 4., 5., 6., 7., 8.], &Device::Cpu)?; + cache.append(&t)?; + let data = cache.current_data()?.unwrap(); + assert_eq!(data.to_vec1::()?, [3., 4., 5., 6., 7., 8.]); + assert_eq!(cache.current_seq_len(), 22); + assert_eq!(cache.offset(), 0); + + let t = Tensor::new(&[42.], &Device::Cpu)?; + cache.append(&t)?; + let data = cache.current_data()?.unwrap(); + assert_eq!(data.to_vec1::()?, [42., 4., 5., 6., 7., 8.]); + assert_eq!(cache.current_seq_len(), 23); + assert_eq!(cache.offset(), 1); + cache.reset(); } Ok(()) From 9964c6d86c2ff107285473f518d028c1553b8a63 Mon Sep 17 00:00:00 2001 From: Laurent Date: Sun, 22 Sep 2024 13:23:23 +0200 Subject: [PATCH 06/13] Improve the api for the rotating cache so that the whole src tensor gets returned when it's overlarge. 
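With this change `append` directly returns the keys/values to attend over (the valid part of the rotating window, or the whole chunk when it is longer than the window), so callers no longer need to go through `current_data()`. A minimal usage sketch, illustrative only and not part of the diff; the names `q`, `k`, `v`, `head_dim` and the (batch, heads, time, head_dim) shapes are assumptions about the caller, with the cache rotating over dim 2:

    use candle::Tensor;

    fn attend(
        cache: &mut candle_nn::kv_cache::RotatingKvCache,
        q: &Tensor,
        k: &Tensor,
        v: &Tensor,
        head_dim: usize,
    ) -> candle::Result<Tensor> {
        // `append` writes into the rotating buffer and returns the tensors to attend over.
        let (k, v) = cache.append(&k.contiguous()?, &v.contiguous()?)?;
        // Scaled dot-product attention; causal masking over the returned length is
        // omitted here and would be applied to `pre_ws` before the softmax.
        let pre_ws = q.matmul(&k.t()?)?.affine(1.0 / (head_dim as f64).sqrt(), 0.0)?;
        let ws = candle_nn::ops::softmax_last_dim(&pre_ws)?;
        ws.matmul(&v)
    }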
--- candle-nn/src/kv_cache.rs | 36 ++++++++++++------------------------ candle-nn/tests/kv_cache.rs | 24 ++++++++---------------- 2 files changed, 20 insertions(+), 40 deletions(-) diff --git a/candle-nn/src/kv_cache.rs b/candle-nn/src/kv_cache.rs index 02651cb08e..59ff56486f 100644 --- a/candle-nn/src/kv_cache.rs +++ b/candle-nn/src/kv_cache.rs @@ -210,7 +210,7 @@ impl RotatingCache { self.all_data = None; } - pub fn append(&mut self, src: &Tensor) -> Result<()> { + pub fn append(&mut self, src: &Tensor) -> Result { let seq_len = src.dim(self.dim)?; // This doesn't seem very idiomatic but because the creation can fail, it's tricky to use // self.all_data.get_or_insert_with. @@ -222,10 +222,13 @@ impl RotatingCache { }; let ad = self.all_data.as_mut().unwrap(); + self.current_seq_len += seq_len; if seq_len >= self.max_seq_len { let src = src.narrow(self.dim, seq_len - self.max_seq_len, self.max_seq_len)?; ad.slice_set(&src, self.dim, 0)?; self.offset = 0; + // Here we return `src` rather than `ad` so that all the past can be used. + Ok(src) } else { let rem_len = self.max_seq_len - self.offset; if seq_len <= rem_len { @@ -241,9 +244,12 @@ impl RotatingCache { ad.slice_set(&src2, self.dim, 0)?; self.offset = seq_len - rem_len; } + if self.current_seq_len >= self.max_seq_len { + Ok(ad.clone()) + } else { + Ok(ad.narrow(self.dim, 0, self.current_seq_len)?) + } } - self.current_seq_len += seq_len; - Ok(()) } } @@ -285,27 +291,9 @@ impl RotatingKvCache { } pub fn append(&mut self, k: &Tensor, v: &Tensor) -> Result<(Tensor, Tensor)> { - self.k.append(k)?; - self.v.append(v)?; - let out_k = self.k.current_data()?; - let out_v = self.v.current_data()?; - let k = match out_k { - None => { - let mut shape = k.dims().to_vec(); - shape[self.k.dim] = 0; - Tensor::zeros(shape, k.dtype(), k.device())? - } - Some(k) => k, - }; - let v = match out_v { - None => { - let mut shape = v.dims().to_vec(); - shape[self.k.dim] = 0; - Tensor::zeros(shape, v.dtype(), v.device())? 
- } - Some(v) => v, - }; - Ok((k, v)) + let out_k = self.k.append(k)?; + let out_v = self.v.append(v)?; + Ok((out_k, out_v)) } pub fn offset(&self) -> usize { diff --git a/candle-nn/tests/kv_cache.rs b/candle-nn/tests/kv_cache.rs index 3fc40a571b..2f70f3d435 100644 --- a/candle-nn/tests/kv_cache.rs +++ b/candle-nn/tests/kv_cache.rs @@ -40,51 +40,43 @@ fn rotating_kv_cache() -> Result<()> { let data = cache.current_data()?; assert!(data.is_none()); let t = Tensor::new(&[1., 2., 3.], &Device::Cpu)?; - cache.append(&t)?; - let data = cache.current_data()?.unwrap(); + let data = cache.append(&t)?; assert_eq!(data.to_vec1::()?, [1., 2., 3.]); let t = Tensor::new(&[4.], &Device::Cpu)?; - cache.append(&t)?; - let data = cache.current_data()?.unwrap(); + let data = cache.append(&t)?; assert_eq!(data.to_vec1::()?, [1., 2., 3., 4.]); let t = Tensor::new(&[0., 5., 6., 7.], &Device::Cpu)?; - cache.append(&t)?; - let data = cache.current_data()?.unwrap(); + let data = cache.append(&t)?; assert_eq!(data.to_vec1::()?, [6., 7., 3., 4., 0., 5.]); assert_eq!(cache.current_seq_len(), 8); assert_eq!(cache.offset(), 2); let t = Tensor::new(&[8.], &Device::Cpu)?; - cache.append(&t)?; - let data = cache.current_data()?.unwrap(); + let data = cache.append(&t)?; assert_eq!(data.to_vec1::()?, [6., 7., 8., 4., 0., 5.]); assert_eq!(cache.current_seq_len(), 9); assert_eq!(cache.offset(), 3); let t = Tensor::new(&[9., 10., 11.], &Device::Cpu)?; - cache.append(&t)?; - let data = cache.current_data()?.unwrap(); + let data = cache.append(&t)?; assert_eq!(data.to_vec1::()?, [6., 7., 8., 9., 10., 11.]); assert_eq!(cache.current_seq_len(), 12); assert_eq!(cache.offset(), 0); let t = Tensor::new(&[12.], &Device::Cpu)?; - cache.append(&t)?; - let data = cache.current_data()?.unwrap(); + let data = cache.append(&t)?; assert_eq!(data.to_vec1::()?, [12., 7., 8., 9., 10., 11.]); assert_eq!(cache.current_seq_len(), 13); assert_eq!(cache.offset(), 1); let t = Tensor::new(&[0., 1., 2., 3., 4., 5., 6., 7., 8.], &Device::Cpu)?; - cache.append(&t)?; - let data = cache.current_data()?.unwrap(); + let data = cache.append(&t)?; assert_eq!(data.to_vec1::()?, [3., 4., 5., 6., 7., 8.]); assert_eq!(cache.current_seq_len(), 22); assert_eq!(cache.offset(), 0); let t = Tensor::new(&[42.], &Device::Cpu)?; - cache.append(&t)?; - let data = cache.current_data()?.unwrap(); + let data = cache.append(&t)?; assert_eq!(data.to_vec1::()?, [42., 4., 5., 6., 7., 8.]); assert_eq!(cache.current_seq_len(), 23); assert_eq!(cache.offset(), 1); From 58c1e909d3ed07ce0bbc316bc80bcdd7ff109524 Mon Sep 17 00:00:00 2001 From: Laurent Date: Sun, 22 Sep 2024 13:43:46 +0200 Subject: [PATCH 07/13] Handle contiguity + bugfix + use in mimi. --- candle-nn/src/kv_cache.rs | 16 ++++++++++------ candle-nn/tests/kv_cache.rs | 2 +- .../src/models/mimi/transformer.rs | 8 ++++---- 3 files changed, 15 insertions(+), 11 deletions(-) diff --git a/candle-nn/src/kv_cache.rs b/candle-nn/src/kv_cache.rs index 59ff56486f..9e860d6121 100644 --- a/candle-nn/src/kv_cache.rs +++ b/candle-nn/src/kv_cache.rs @@ -224,23 +224,27 @@ impl RotatingCache { self.current_seq_len += seq_len; if seq_len >= self.max_seq_len { - let src = src.narrow(self.dim, seq_len - self.max_seq_len, self.max_seq_len)?; - ad.slice_set(&src, self.dim, 0)?; + let to_copy = src + .narrow(self.dim, seq_len - self.max_seq_len, self.max_seq_len)? + .contiguous()?; + ad.slice_set(&to_copy, self.dim, 0)?; self.offset = 0; // Here we return `src` rather than `ad` so that all the past can be used. 
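// Note: cloning a candle `Tensor` is cheap (the underlying storage is reference-counted),
// so the `src.clone()` below does not copy the data.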
- Ok(src) + Ok(src.clone()) } else { let rem_len = self.max_seq_len - self.offset; if seq_len <= rem_len { - ad.slice_set(src, self.dim, self.offset)?; + ad.slice_set(&src.contiguous()?, self.dim, self.offset)?; self.offset = (self.offset + seq_len) % self.max_seq_len; } else { // We have to make two copies here as we go over the boundary of the cache. if rem_len > 0 { - let src1 = src.narrow(self.dim, 0, rem_len)?; + let src1 = src.narrow(self.dim, 0, rem_len)?.contiguous()?; ad.slice_set(&src1, self.dim, self.offset)?; } - let src2 = src.narrow(self.dim, rem_len, seq_len - rem_len)?; + let src2 = src + .narrow(self.dim, rem_len, seq_len - rem_len)? + .contiguous()?; ad.slice_set(&src2, self.dim, 0)?; self.offset = seq_len - rem_len; } diff --git a/candle-nn/tests/kv_cache.rs b/candle-nn/tests/kv_cache.rs index 2f70f3d435..88558d51d2 100644 --- a/candle-nn/tests/kv_cache.rs +++ b/candle-nn/tests/kv_cache.rs @@ -71,7 +71,7 @@ fn rotating_kv_cache() -> Result<()> { let t = Tensor::new(&[0., 1., 2., 3., 4., 5., 6., 7., 8.], &Device::Cpu)?; let data = cache.append(&t)?; - assert_eq!(data.to_vec1::()?, [3., 4., 5., 6., 7., 8.]); + assert_eq!(data.to_vec1::()?, [0., 1., 2., 3., 4., 5., 6., 7., 8.]); assert_eq!(cache.current_seq_len(), 22); assert_eq!(cache.offset(), 0); diff --git a/candle-transformers/src/models/mimi/transformer.rs b/candle-transformers/src/models/mimi/transformer.rs index de22127462..0fa7079235 100644 --- a/candle-transformers/src/models/mimi/transformer.rs +++ b/candle-transformers/src/models/mimi/transformer.rs @@ -127,7 +127,7 @@ pub struct StreamingMultiheadAttention { context: usize, neg_inf: Tensor, rope: Option>, - kv_cache: candle_nn::kv_cache::KvCache, + kv_cache: candle_nn::kv_cache::RotatingKvCache, pos: usize, use_flash_attn: bool, span: tracing::Span, @@ -153,7 +153,7 @@ impl StreamingMultiheadAttention { num_heads: cfg.num_heads, context: cfg.context, neg_inf, - kv_cache: candle_nn::kv_cache::KvCache::new(2, cfg.max_seq_len), + kv_cache: candle_nn::kv_cache::RotatingKvCache::new(2, cfg.context), pos: 0, use_flash_attn: false, span: tracing::span!(tracing::Level::TRACE, "mha"), @@ -236,7 +236,7 @@ impl StreamingMultiheadAttention { self.kv_cache.reset() } - pub fn set_kv_cache(&mut self, kv_cache: candle_nn::kv_cache::KvCache) { + pub fn set_kv_cache(&mut self, kv_cache: candle_nn::kv_cache::RotatingKvCache) { self.kv_cache = kv_cache } } @@ -582,7 +582,7 @@ impl StreamingTransformerLayer { self.self_attn.reset_kv_cache() } - pub fn set_kv_cache(&mut self, kv_cache: candle_nn::kv_cache::KvCache) { + pub fn set_kv_cache(&mut self, kv_cache: candle_nn::kv_cache::RotatingKvCache) { self.self_attn.set_kv_cache(kv_cache) } } From c79bf421c74da0ddaabdead6681dd59a5004deaf Mon Sep 17 00:00:00 2001 From: Laurent Date: Sun, 22 Sep 2024 14:14:05 +0200 Subject: [PATCH 08/13] Add a way to test the mimi streaming mode. --- candle-examples/examples/mimi/main.rs | 41 +++++++++++++++++++++++---- 1 file changed, 36 insertions(+), 5 deletions(-) diff --git a/candle-examples/examples/mimi/main.rs b/candle-examples/examples/mimi/main.rs index cfc1a553e5..788b3fd9dd 100644 --- a/candle-examples/examples/mimi/main.rs +++ b/candle-examples/examples/mimi/main.rs @@ -39,6 +39,11 @@ struct Args { /// The model weight file, in safetensor format. #[arg(long)] model: Option, + + /// Whether to use streaming or not, when streaming slices of data of the given size are passed + /// to the encoder/decoder one at a time. 
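+ /// When encoding, the chunk size is measured in pcm samples; when decoding, in code steps.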
+ #[arg(long)] + streaming: Option, } fn main() -> Result<()> { @@ -87,20 +92,46 @@ fn main() -> Result<()> { pcm } }; - let pcm_len = pcm.len(); - let pcm = Tensor::from_vec(pcm, (1, 1, pcm_len), &device)?; - println!("input pcm shape: {:?}", pcm.shape()); - model.encode(&pcm)? + match args.streaming { + Some(chunk_size) => { + let mut code_chunks = vec![]; + for pcm in pcm.chunks(chunk_size) { + let pcm = Tensor::new(pcm, &device)?.reshape((1, 1, ()))?; + let code_chunk = model.encode(&pcm)?; + code_chunks.push(code_chunk) + } + Tensor::cat(&code_chunks, candle::D::Minus1)? + } + None => { + let pcm_len = pcm.len(); + let pcm = Tensor::from_vec(pcm, (1, 1, pcm_len), &device)?; + println!("input pcm shape: {:?}", pcm.shape()); + model.encode(&pcm)? + } + } } }; println!("codes shape: {:?}", codes.shape()); + model.reset_state(); match args.action { Action::AudioToCode => { codes.save_safetensors("codes", &args.out_file)?; } Action::AudioToAudio | Action::CodeToAudio => { - let pcm = model.decode(&codes)?; + let pcm = match args.streaming { + Some(chunk_size) => { + let seq_len = codes.dim(candle::D::Minus1)?; + let mut pcm_chunks = vec![]; + for chunk_start in (0..seq_len).step_by(chunk_size) { + let chunk_len = usize::min(chunk_size, seq_len - chunk_start); + let codes = codes.narrow(candle::D::Minus1, chunk_start, chunk_len)?; + pcm_chunks.push(model.decode(&codes)?) + } + Tensor::cat(&pcm_chunks, candle::D::Minus1)? + } + None => model.decode(&codes)?, + }; println!("output pcm shape: {:?}", pcm.shape()); let pcm = pcm.i(0)?.i(0)?; let pcm = candle_examples::audio::normalize_loudness(&pcm, 24_000, true)?; From 3277844fd9f8f8a57138ce5a2c2c3c9b96bb90fa Mon Sep 17 00:00:00 2001 From: Laurent Date: Sun, 22 Sep 2024 14:41:17 +0200 Subject: [PATCH 09/13] Mimi streaming fixes. --- candle-examples/examples/mimi/main.rs | 5 ++++- candle-transformers/src/models/mimi/transformer.rs | 10 ++++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/candle-examples/examples/mimi/main.rs b/candle-examples/examples/mimi/main.rs index 788b3fd9dd..0d9948b2e4 100644 --- a/candle-examples/examples/mimi/main.rs +++ b/candle-examples/examples/mimi/main.rs @@ -126,7 +126,10 @@ fn main() -> Result<()> { for chunk_start in (0..seq_len).step_by(chunk_size) { let chunk_len = usize::min(chunk_size, seq_len - chunk_start); let codes = codes.narrow(candle::D::Minus1, chunk_start, chunk_len)?; - pcm_chunks.push(model.decode(&codes)?) + let pcm = model.decode_step(&codes.into())?; + if let Some(pcm) = pcm.as_option() { + pcm_chunks.push(pcm.clone()) + } } Tensor::cat(&pcm_chunks, candle::D::Minus1)? } diff --git a/candle-transformers/src/models/mimi/transformer.rs b/candle-transformers/src/models/mimi/transformer.rs index 0fa7079235..6915d46007 100644 --- a/candle-transformers/src/models/mimi/transformer.rs +++ b/candle-transformers/src/models/mimi/transformer.rs @@ -216,6 +216,16 @@ impl StreamingMultiheadAttention { let pre_ws = match mask { None => pre_ws, Some(mask) => { + // This is a bit cumbersome and slightly incorrect: when providing a new slice + // the kv cache will have a slice offset rather than offset + t. In the mimi + // context of an offset of 250, this would not make much difference though. + let mask_len = mask.dim(D::Minus1)?; + let pre_ws_len = pre_ws.dim(D::Minus1)?; + let mask = if pre_ws_len < mask_len { + mask.narrow(D::Minus1, mask_len - pre_ws_len, pre_ws_len)? 
+ } else { + mask.clone() + }; let mask = mask.broadcast_left((b, self.num_heads))?; let neg_inf = self.neg_inf.broadcast_as(pre_ws.shape())?; mask.where_cond(&neg_inf, &pre_ws)? From d6f01f625ddac8761ff7b8eacac9ef6ac9b09197 Mon Sep 17 00:00:00 2001 From: Laurent Date: Sun, 22 Sep 2024 14:53:41 +0200 Subject: [PATCH 10/13] More rotating kv-cache. --- .../src/models/mimi/transformer.rs | 24 +++++++------------ 1 file changed, 9 insertions(+), 15 deletions(-) diff --git a/candle-transformers/src/models/mimi/transformer.rs b/candle-transformers/src/models/mimi/transformer.rs index 6915d46007..8a59606e54 100644 --- a/candle-transformers/src/models/mimi/transformer.rs +++ b/candle-transformers/src/models/mimi/transformer.rs @@ -216,16 +216,6 @@ impl StreamingMultiheadAttention { let pre_ws = match mask { None => pre_ws, Some(mask) => { - // This is a bit cumbersome and slightly incorrect: when providing a new slice - // the kv cache will have a slice offset rather than offset + t. In the mimi - // context of an offset of 250, this would not make much difference though. - let mask_len = mask.dim(D::Minus1)?; - let pre_ws_len = pre_ws.dim(D::Minus1)?; - let mask = if pre_ws_len < mask_len { - mask.narrow(D::Minus1, mask_len - pre_ws_len, pre_ws_len)? - } else { - mask.clone() - }; let mask = mask.broadcast_left((b, self.num_heads))?; let neg_inf = self.neg_inf.broadcast_as(pre_ws.shape())?; mask.where_cond(&neg_inf, &pre_ws)? @@ -639,18 +629,22 @@ impl StreamingTransformer { pub fn forward_ca(&mut self, xs: &Tensor, ca_src: Option<&Tensor>) -> Result { let (_b, t, c) = xs.dims3()?; - // We will extract at most "context" from the kv_cache. - // Note that the mask will discard the values that are before context. let pos = self.layers[0] .self_attn .kv_cache .k_cache() - .current_seq_len() - .min(self.context); + .current_seq_len(); let mask = if t == 1 { None } else { - Some(get_mask(t, pos + t, self.context, xs.device())?) + let cache_out_len = if t < self.context { + (pos + t).min(self.context) + } else { + t + }; + // TODO: this is wrong, the mask depends on the kv-cache offset because of its rotating + // nature. + Some(get_mask(t, cache_out_len, self.context, xs.device())?) }; let mut xs = match self.positional_embedding { PositionalEmbedding::Rope | PositionalEmbedding::None => xs.clone(), From fd53b9660d1ef021b376e0c415eabdcffeba65d5 Mon Sep 17 00:00:00 2001 From: Laurent Date: Mon, 23 Sep 2024 11:04:06 +0200 Subject: [PATCH 11/13] Fix the attn mask generation. --- candle-nn/src/kv_cache.rs | 43 ++++++++++++++++++- .../src/models/mimi/transformer.rs | 35 ++------------- 2 files changed, 45 insertions(+), 33 deletions(-) diff --git a/candle-nn/src/kv_cache.rs b/candle-nn/src/kv_cache.rs index 9e860d6121..4ca1a81da3 100644 --- a/candle-nn/src/kv_cache.rs +++ b/candle-nn/src/kv_cache.rs @@ -1,4 +1,4 @@ -use candle::{Result, Tensor}; +use candle::{Device, Result, Tensor}; #[derive(Debug, Clone)] pub struct Cache { @@ -255,6 +255,43 @@ impl RotatingCache { } } } + + fn get_mask(&self, size1: usize, size2: usize, device: &Device) -> Result { + let context = self.max_seq_len; + let upd_offset = (self.offset + size1) % self.max_seq_len; + let mask: Vec<_> = (0..size1) + .flat_map(|pos_src| { + // The absolute position of the elements that will get added to the cache. + let pos_src = self.current_seq_len + pos_src; + (0..size2).map(move |pos_cache_rel| { + // The absolute position of the cache elements after the addition. 
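+ // Slots before `upd_offset` hold the most recent `upd_offset` positions written in the
+ // current pass over the ring; slots from `upd_offset` on were written one pass earlier,
+ // hence the `max_seq_len` correction in the branch below.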
+ let pos_cache = self.current_seq_len + size1 + pos_cache_rel - upd_offset; + let pos_cache = if pos_cache_rel < upd_offset { + pos_cache + } else { + pos_cache - self.max_seq_len + }; + u8::from(pos_cache > pos_src || pos_cache + context < pos_src) + }) + }) + .collect(); + Tensor::from_slice(&mask, (size1, size2), device) + } + + /// Returns the attn_mask to be applied *after* adding `seq_len` to the cache. + pub fn attn_mask(&self, seq_len: usize, device: &Device) -> Result> { + let mask = if seq_len == 1 { + None + } else { + let cache_out_len = if seq_len < self.max_seq_len { + (self.current_seq_len + seq_len).min(self.max_seq_len) + } else { + seq_len + }; + Some(self.get_mask(seq_len, cache_out_len, device)?) + }; + Ok(mask) + } } #[derive(Debug, Clone)] @@ -308,6 +345,10 @@ impl RotatingKvCache { self.k.current_seq_len() } + pub fn attn_mask(&self, seq_len: usize, device: &Device) -> Result> { + self.k.attn_mask(seq_len, device) + } + pub fn reset(&mut self) { self.k.reset(); self.v.reset(); diff --git a/candle-transformers/src/models/mimi/transformer.rs b/candle-transformers/src/models/mimi/transformer.rs index 8a59606e54..6ccbc8d12a 100644 --- a/candle-transformers/src/models/mimi/transformer.rs +++ b/candle-transformers/src/models/mimi/transformer.rs @@ -101,21 +101,6 @@ impl Module for LayerScale { } } -pub(crate) fn get_mask( - size1: usize, - size2: usize, - context: usize, - device: &Device, -) -> Result { - let mask: Vec<_> = (0..size1) - .flat_map(|i| { - (0..size2) - .map(move |j| u8::from(size1 + j > size2 + i || size1 + j + context < size2 + i)) - }) - .collect(); - Tensor::from_slice(&mask, (size1, size2), device) -} - #[derive(Debug, Clone)] pub struct StreamingMultiheadAttention { q_proj: Linear, @@ -590,7 +575,6 @@ impl StreamingTransformerLayer { #[derive(Debug, Clone)] pub struct StreamingTransformer { layers: Vec, - context: usize, positional_embedding: PositionalEmbedding, max_period: usize, } @@ -617,7 +601,6 @@ impl StreamingTransformer { } Ok(Self { layers, - context: cfg.context, positional_embedding: cfg.positional_embedding, max_period: cfg.max_period, }) @@ -629,23 +612,11 @@ impl StreamingTransformer { pub fn forward_ca(&mut self, xs: &Tensor, ca_src: Option<&Tensor>) -> Result { let (_b, t, c) = xs.dims3()?; - let pos = self.layers[0] + let pos = self.layers[0].self_attn.kv_cache.current_seq_len(); + let mask = self.layers[0] .self_attn .kv_cache - .k_cache() - .current_seq_len(); - let mask = if t == 1 { - None - } else { - let cache_out_len = if t < self.context { - (pos + t).min(self.context) - } else { - t - }; - // TODO: this is wrong, the mask depends on the kv-cache offset because of its rotating - // nature. - Some(get_mask(t, cache_out_len, self.context, xs.device())?) - }; + .attn_mask(t, xs.device())?; let mut xs = match self.positional_embedding { PositionalEmbedding::Rope | PositionalEmbedding::None => xs.clone(), PositionalEmbedding::Sin => { From 4244d59ace5265e2a254fe512e021d954810df9e Mon Sep 17 00:00:00 2001 From: Laurent Date: Mon, 23 Sep 2024 11:17:17 +0200 Subject: [PATCH 12/13] Handle the abs case. 
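When the incoming chunk is at least `max_seq_len` long it overwrites the whole buffer and the returned keys/values are just the chunk itself, so a plain (seq_len, seq_len) sliding-window causal mask applies; shorter chunks keep using the rotation-aware mask that maps cache slots back to absolute positions.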
--- candle-nn/src/kv_cache.rs | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/candle-nn/src/kv_cache.rs b/candle-nn/src/kv_cache.rs index 4ca1a81da3..68addb98bf 100644 --- a/candle-nn/src/kv_cache.rs +++ b/candle-nn/src/kv_cache.rs @@ -256,7 +256,19 @@ impl RotatingCache { } } - fn get_mask(&self, size1: usize, size2: usize, device: &Device) -> Result { + fn get_mask_abs(&self, size1: usize, size2: usize, device: &Device) -> Result { + let context = self.max_seq_len; + let mask: Vec<_> = (0..size1) + .flat_map(|i| { + (0..size2).map(move |j| { + u8::from(size1 + j > size2 + i || size1 + j + context < size2 + i) + }) + }) + .collect(); + Tensor::from_slice(&mask, (size1, size2), device) + } + + fn get_mask_rel(&self, size1: usize, size2: usize, device: &Device) -> Result { let context = self.max_seq_len; let upd_offset = (self.offset + size1) % self.max_seq_len; let mask: Vec<_> = (0..size1) @@ -283,12 +295,13 @@ impl RotatingCache { let mask = if seq_len == 1 { None } else { - let cache_out_len = if seq_len < self.max_seq_len { - (self.current_seq_len + seq_len).min(self.max_seq_len) + let mask = if seq_len < self.max_seq_len { + let cache_out_len = (self.current_seq_len + seq_len).min(self.max_seq_len); + self.get_mask_rel(seq_len, cache_out_len, device)? } else { - seq_len + self.get_mask_abs(seq_len, seq_len, device)? }; - Some(self.get_mask(seq_len, cache_out_len, device)?) + Some(mask) }; Ok(mask) } From 13a541eff7b3585b006ed0901872b243bc3c0f30 Mon Sep 17 00:00:00 2001 From: Laurent Date: Mon, 23 Sep 2024 13:11:39 +0200 Subject: [PATCH 13/13] Add some tests for the generated mask. --- candle-nn/tests/kv_cache.rs | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/candle-nn/tests/kv_cache.rs b/candle-nn/tests/kv_cache.rs index 88558d51d2..b8d2ec48ab 100644 --- a/candle-nn/tests/kv_cache.rs +++ b/candle-nn/tests/kv_cache.rs @@ -69,13 +69,36 @@ fn rotating_kv_cache() -> Result<()> { assert_eq!(cache.current_seq_len(), 13); assert_eq!(cache.offset(), 1); + let mask = cache.attn_mask(2, &Device::Cpu)?.unwrap(); + assert_eq!( + mask.to_vec2::()?, + &[[0, 0, 1, 0, 0, 0], [0, 0, 0, 0, 0, 0]] + ); + let mask = cache.attn_mask(3, &Device::Cpu)?.unwrap(); + assert_eq!( + mask.to_vec2::()?, + &[[0, 0, 1, 1, 0, 0], [0, 0, 0, 1, 0, 0], [0, 0, 0, 0, 0, 0]], + ); let t = Tensor::new(&[0., 1., 2., 3., 4., 5., 6., 7., 8.], &Device::Cpu)?; let data = cache.append(&t)?; assert_eq!(data.to_vec1::()?, [0., 1., 2., 3., 4., 5., 6., 7., 8.]); assert_eq!(cache.current_seq_len(), 22); assert_eq!(cache.offset(), 0); + let mask = cache.attn_mask(1, &Device::Cpu)?; + assert!(mask.is_none()); + let mask = cache.attn_mask(2, &Device::Cpu)?.unwrap(); + assert_eq!( + mask.to_vec2::()?, + &[[0, 1, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0]] + ); + let mask = cache.attn_mask(3, &Device::Cpu)?.unwrap(); + assert_eq!( + mask.to_vec2::()?, + &[[0, 1, 1, 0, 0, 0], [0, 0, 1, 0, 0, 0], [0, 0, 0, 0, 0, 0]] + ); let t = Tensor::new(&[42.], &Device::Cpu)?; + let data = cache.append(&t)?; assert_eq!(data.to_vec1::()?, [42., 4., 5., 6., 7., 8.]); assert_eq!(cache.current_seq_len(), 23);