1use crate::error::*;
6use futures::{stream::FuturesUnordered, StreamExt};
7use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
8use remote_settings::RemoteSettingsServer;
9use reqwest::Url;
10use serde::de::Error;
11use serde::{Deserialize, Serialize};
12use serde_json::Value;
13use sha2::{Digest, Sha256};
14use std::collections::HashMap;
15use std::{path::PathBuf, sync::Arc};
16use walkdir::WalkDir;
17
18const DUMPS_DIR: &str = "dumps";
19
/// Downloads Remote Settings collection dumps (and their attachments) into
/// `<output_dir>/dumps`, reporting progress via `indicatif` progress bars.
pub struct CollectionDownloader {
    // HTTP client reused for every request.
    client: reqwest::Client,
    // Shared owner of all per-collection progress bars.
    multi_progress: Arc<MultiProgress>,
    // Root of the remote_settings component; dumps are written under `dumps/`.
    output_dir: PathBuf,
    // Base URL of the Remote Settings server (production).
    url: Url,
}
26
/// Outcome of processing one collection update.
#[derive(Clone)]
pub struct CollectionUpdate {
    // "bucket/collection" key identifying the collection.
    collection_key: String,
    // Number of attachments found missing or stale while processing.
    attachments_updated: usize,
}
32
/// On-disk dump format: a collection's records plus the server timestamp
/// they were fetched at.
#[derive(Deserialize, Serialize)]
pub struct CollectionData {
    // The collection's records, as raw JSON values.
    data: Vec<Value>,
    // Server-side changeset timestamp for this snapshot.
    timestamp: u64,
}
38
/// Summary of a `download_all` run: collection keys bucketed by outcome.
pub struct UpdateResult {
    // Collections that were (or, in a dry run, would be) updated.
    updated: Vec<String>,
    // Collections whose local timestamp already matches or exceeds remote.
    up_to_date: Vec<String>,
    // Locally-present collections with no counterpart on the server.
    not_found: Vec<String>,
}
44
/// Attachment metadata as found in a record's "attachment" field and stored
/// in the local `.meta.json` sidecar files.
#[derive(Debug, Deserialize, Serialize)]
pub struct AttachmentMetadata {
    /// Path of the attachment relative to the server's attachments base URL.
    pub location: String,
    /// Expected SHA-256 digest (lowercase hex) of the attachment bytes.
    pub hash: String,
    /// Expected size of the attachment in bytes.
    pub size: u64,
}
51
/// Subset of the server's root info document that we deserialize.
#[derive(Debug, Deserialize)]
struct ServerInfo {
    capabilities: Capabilities,
}
56
/// Server capabilities; only the attachments capability is of interest here.
#[derive(Debug, Deserialize)]
struct Capabilities {
    attachments: AttachmentsCapability,
}
61
/// Attachment-serving capability: base URL under which attachments live.
#[derive(Debug, Deserialize)]
struct AttachmentsCapability {
    base_url: String,
}
66
67impl CollectionDownloader {
72 pub fn new(root_path: PathBuf) -> Self {
73 let url = RemoteSettingsServer::Prod
74 .get_url()
75 .expect("Cannot set RemoteSettingsServer url");
76
77 let output_dir = if root_path.ends_with("components/remote_settings") {
78 root_path
79 } else {
80 root_path.join("components").join("remote_settings")
81 };
82
83 Self {
84 client: reqwest::Client::new(),
85 multi_progress: Arc::new(MultiProgress::new()),
86 output_dir,
87 url,
88 }
89 }
90
91 pub async fn run(&self, dry_run: bool) -> Result<()> {
92 let result = self.download_all(dry_run).await?;
93
94 if dry_run {
95 println!("\nDry run summary:");
96 println!("- Would update {} collections", result.updated.len());
97 println!(
98 "- {} collections already up to date",
99 result.up_to_date.len()
100 );
101 println!(
102 "- {} collections not found on remote",
103 result.not_found.len()
104 );
105 return Ok(());
106 }
107
108 println!("\nExecution summary:");
109 if !result.updated.is_empty() {
110 println!("Updated collections:");
111 for collection in &result.updated {
112 println!(" - {}", collection);
113 }
114 }
115
116 if !result.up_to_date.is_empty() {
117 println!("Collections already up to date:");
118 for collection in &result.up_to_date {
119 println!(" - {}", collection);
120 }
121 }
122
123 if !result.not_found.is_empty() {
124 println!("Collections not found on remote:");
125 for collection in &result.not_found {
126 println!(" - {}", collection);
127 }
128 }
129
130 Ok(())
131 }
132
133 fn scan_local_dumps(&self) -> Result<HashMap<String, (String, u64)>> {
134 let mut collections = HashMap::new();
135 let dumps_dir = self.output_dir.join(DUMPS_DIR);
136
137 for entry in WalkDir::new(dumps_dir).min_depth(2).max_depth(2) {
138 let entry = entry?;
139 if entry.file_type().is_file()
140 && entry.path().extension().is_some_and(|ext| ext == "json")
141 {
142 let bucket = entry
144 .path()
145 .parent()
146 .and_then(|p| p.file_name())
147 .and_then(|n| n.to_str())
148 .ok_or_else(|| RemoteSettingsError::Path("Invalid bucket path".into()))?;
149
150 let collection_name = entry
152 .path()
153 .file_stem()
154 .and_then(|n| n.to_str())
155 .ok_or_else(|| RemoteSettingsError::Path("Invalid collection name".into()))?;
156
157 let content = std::fs::read_to_string(entry.path())?;
159 let data: serde_json::Value = serde_json::from_str(&content)?;
160 let timestamp = data["timestamp"].as_u64().ok_or_else(|| {
161 RemoteSettingsError::Json(serde_json::Error::custom("No timestamp found"))
162 })?;
163
164 collections.insert(
165 format!("{}/{}", bucket, collection_name),
166 (bucket.to_string(), timestamp),
167 );
168 }
169 }
170 Ok(collections)
171 }
172
173 async fn fetch_timestamps(&self) -> Result<HashMap<String, u64>> {
174 let monitor_url = format!("{}/buckets/monitor/collections/changes/records", self.url);
175 let monitor_response: Value = self.client.get(&monitor_url).send().await?.json().await?;
176
177 Ok(monitor_response["data"]
178 .as_array()
179 .ok_or_else(|| {
180 RemoteSettingsError::Json(serde_json::Error::custom(
181 "No data array in monitor response",
182 ))
183 })?
184 .iter()
185 .filter_map(|record| {
186 let bucket = record["bucket"].as_str()?;
187 let collection_name = record["collection"].as_str()?;
188 Some((
189 format!("{}/{}", bucket, collection_name),
190 record["last_modified"].as_u64()?,
191 ))
192 })
193 .collect())
194 }
195
196 async fn fetch_collection(
197 &self,
198 collection_name: String,
199 last_modified: u64,
200 pb: Arc<ProgressBar>,
201 ) -> Result<(String, CollectionData)> {
202 let parts: Vec<&str> = collection_name.split('/').collect();
203 if parts.len() != 2 {
204 return Err(RemoteSettingsError::Json(serde_json::Error::custom(
205 "Invalid collection name format",
206 ))
207 .into());
208 }
209 let (bucket, name) = (parts[0], parts[1]);
210
211 let url = format!(
212 "{}/buckets/{}/collections/{}/changeset?_expected={}",
213 self.url, bucket, name, last_modified
214 );
215
216 pb.set_message(format!("Downloading {}", name));
217
218 let response = self.client.get(&url).send().await?;
219 let changeset: Value = response.json().await?;
220
221 let timestamp = changeset["timestamp"].as_u64().ok_or_else(|| {
222 RemoteSettingsError::Json(serde_json::Error::custom("No timestamp in changeset"))
223 })?;
224
225 pb.finish_with_message(format!("Downloaded {}", name));
226
227 Ok((
228 collection_name,
229 CollectionData {
230 data: changeset["changes"]
231 .as_array()
232 .unwrap_or(&Vec::new())
233 .to_vec(),
234 timestamp,
235 },
236 ))
237 }
238
239 async fn get_attachments_base_url(&self) -> Result<String> {
240 let server_info: ServerInfo = self
241 .client
242 .get(self.url.as_str())
243 .send()
244 .await?
245 .json()
246 .await?;
247 Ok(server_info.capabilities.attachments.base_url)
248 }
249
250 async fn download_attachment(
251 &self,
252 base_url: &str,
253 record_id: &str,
254 attachment: &AttachmentMetadata,
255 pb: &ProgressBar,
256 ) -> Result<Vec<u8>> {
257 let url = format!("{}{}", base_url, attachment.location);
258 pb.set_message(format!("Downloading attachment for record {}", record_id));
259
260 let response = self.client.get(&url).send().await?;
261 let bytes = response.bytes().await?;
262 let data = bytes.to_vec();
263
264 if data.len() as u64 != attachment.size {
266 return Err(RemoteSettingsError::Attachment(format!(
267 "Size mismatch for attachment {}: expected {}, got {}",
268 record_id,
269 attachment.size,
270 data.len()
271 ))
272 .into());
273 }
274
275 let mut hasher = Sha256::new();
277 hasher.update(&data);
278 let hash = format!("{:x}", hasher.finalize());
279 if hash != attachment.hash {
280 return Err(RemoteSettingsError::Attachment(format!(
281 "Hash mismatch for attachment {}: expected {}, got {}",
282 record_id, attachment.hash, hash
283 ))
284 .into());
285 }
286
287 pb.set_message(format!("Verified attachment for record {}", record_id));
288 Ok(data)
289 }
290
291 fn get_attachment_paths(
292 &self,
293 bucket: &str,
294 collection: &str,
295 record_id: &str,
296 ) -> (PathBuf, PathBuf) {
297 let base_path = self
298 .output_dir
299 .join(DUMPS_DIR)
300 .join(bucket)
301 .join("attachments")
302 .join(collection);
303
304 (
305 base_path.join(record_id),
306 base_path.join(format!("{}.meta.json", record_id)),
307 )
308 }
309
310 fn is_attachment_up_to_date(
311 &self,
312 bucket: &str,
313 collection: &str,
314 record_id: &str,
315 remote_attachment: &AttachmentMetadata,
316 ) -> Result<bool> {
317 let (bin_path, meta_path) = self.get_attachment_paths(bucket, collection, record_id);
318
319 if !bin_path.exists() || !meta_path.exists() {
321 log::debug!(
322 "Attachment files missing for {}/{}/{}",
323 bucket,
324 collection,
325 record_id
326 );
327 return Ok(false);
328 }
329
330 let meta_content = std::fs::read_to_string(&meta_path)?;
332 let local_attachment: AttachmentMetadata = serde_json::from_str(&meta_content)?;
333
334 if local_attachment.hash != remote_attachment.hash
336 || local_attachment.size != remote_attachment.size
337 {
338 log::debug!(
339 "Attachment metadata mismatch for {}/{}/{}: local hash={}, size={}, remote hash={}, size={}",
340 bucket, collection, record_id,
341 local_attachment.hash, local_attachment.size,
342 remote_attachment.hash, remote_attachment.size
343 );
344 return Ok(false);
345 }
346
347 Ok(true)
348 }
349
    /// Attempts to fetch and unpack the server's pre-built attachments bundle
    /// (`<base>/bundles/<bucket>--<collection>.zip`) into the collection's
    /// attachments directory.
    ///
    /// Best-effort by design: a request error or non-success HTTP status is
    /// logged or ignored and the method still returns `Ok(())`, so callers
    /// fall back to per-attachment downloads. Errors occurring *after* a
    /// successful download (file I/O, zip extraction) do propagate.
    async fn download_attachments_bundle(
        &self,
        bucket: &str,
        collection: &str,
        pb: &ProgressBar,
    ) -> Result<()> {
        let base_url = self.get_attachments_base_url().await?;
        let url = format!("{}/bundles/{}--{}.zip", base_url, bucket, collection);

        pb.set_message(format!(
            "Downloading attachments bundle for {}/{}",
            bucket, collection
        ));

        match self.client.get(&url).send().await {
            Ok(response) => {
                if response.status().is_success() {
                    let bytes = response.bytes().await?;
                    // dumps/<bucket>/attachments/<collection>.zip — the
                    // collection directory name with a .zip extension.
                    let bundle_path = self
                        .output_dir
                        .join(DUMPS_DIR)
                        .join(bucket)
                        .join("attachments")
                        .join(collection)
                        .with_extension("zip");

                    std::fs::create_dir_all(bundle_path.parent().unwrap())?;
                    std::fs::write(&bundle_path, bytes)?;

                    let file = std::fs::File::open(&bundle_path)?;
                    let mut archive = zip::ZipArchive::new(file)?;

                    // Extract next to the archive, then delete the archive.
                    let extract_path = bundle_path.parent().unwrap();
                    archive.extract(extract_path)?;

                    std::fs::remove_file(bundle_path)?;

                    pb.finish_with_message(format!(
                        "Downloaded and extracted attachments bundle for {}/{}",
                        bucket, collection
                    ));
                    return Ok(());
                }
                // Non-success status: fall through and report Ok so the
                // caller retries attachments individually.
            }
            Err(e) => {
                log::debug!("Failed to download or extract attachments bundle: {}", e);
            }
        }

        Ok(())
    }
404
    /// Writes a freshly fetched collection dump to disk and refreshes any
    /// stale attachments, returning how many attachments needed updating.
    ///
    /// In `dry_run` mode nothing is written and `attachments_updated` is 0.
    ///
    /// # Errors
    /// Fails when `collection` is not "bucket/name", on file I/O or JSON
    /// errors, or when a record carrying an attachment has no string "id".
    async fn process_collection_update(
        &self,
        collection: String,
        data: &mut CollectionData,
        dry_run: bool,
    ) -> Result<CollectionUpdate> {
        let mut attachments_updated = 0;
        let parts: Vec<&str> = collection.split('/').collect();

        if parts.len() != 2 {
            return Err(RemoteSettingsError::Path("Invalid collection path".into()).into());
        }

        let (bucket, name) = (parts[0], parts[1]);

        if !dry_run {
            let dumps_path = self
                .output_dir
                .join(DUMPS_DIR)
                .join(bucket)
                .join(format!("{}.json", name));

            std::fs::create_dir_all(dumps_path.parent().unwrap())?;
            // Sort records so dumps are stable/diff-friendly. search-config-v2
            // is special-cased: ordered by recordType, then identifier; all
            // other collections sort by the record "id".
            if name == "search-config-v2" {
                data.data.sort_by(|a, b| {
                    if a["recordType"] == b["recordType"] {
                        a["identifier"].as_str().cmp(&b["identifier"].as_str())
                    } else {
                        a["recordType"].as_str().cmp(&b["recordType"].as_str())
                    }
                });
            } else {
                data.data.sort_by_key(|r| r["id"].to_string());
            }
            std::fs::write(&dumps_path, serde_json::to_string_pretty(&data)?)?;

            // Count attachments whose local copy is missing or stale.
            for record in &data.data {
                if let Some(attachment) = record.get("attachment") {
                    let record_id = record["id"].as_str().ok_or_else(|| {
                        RemoteSettingsError::Json(serde_json::Error::custom("No record id"))
                    })?;

                    let attachment: AttachmentMetadata =
                        serde_json::from_value(attachment.clone())?;
                    if !self.is_attachment_up_to_date(bucket, name, record_id, &attachment)? {
                        attachments_updated += 1;
                    }
                }
            }

            // Only spin up a progress bar / download pass if something is stale.
            if attachments_updated > 0 {
                let pb = Arc::new(self.multi_progress.add(ProgressBar::new(100)));
                pb.set_style(
                    ProgressStyle::default_bar()
                        .template("[{elapsed_precise}] {bar:40.cyan/blue} {msg}")
                        .unwrap(),
                );

                self.process_attachments(bucket, name, &data.data, &pb)
                    .await?;
            }
        }

        Ok(CollectionUpdate {
            collection_key: collection,
            attachments_updated,
        })
    }
477
478 pub async fn download_all(&self, dry_run: bool) -> Result<UpdateResult> {
479 std::fs::create_dir_all(self.output_dir.join(DUMPS_DIR))?;
480
481 let local_collections = self.scan_local_dumps()?;
482 if local_collections.is_empty() {
483 println!(
484 "No local collections found in {:?}",
485 self.output_dir.join(DUMPS_DIR)
486 );
487 return Ok(UpdateResult {
488 updated: vec![],
489 up_to_date: vec![],
490 not_found: vec![],
491 });
492 }
493
494 let remote_timestamps = self.fetch_timestamps().await?;
495 let mut updates_needed = Vec::new();
496 let mut up_to_date = Vec::new();
497 let mut not_found = Vec::new();
498
499 for (collection_key, (_, local_timestamp)) in local_collections {
501 let remote_timestamp = match remote_timestamps.get(&collection_key) {
502 Some(×tamp) => timestamp,
503 None => {
504 println!("Warning: Collection {} not found on remote", collection_key);
505 not_found.push(collection_key);
506 continue;
507 }
508 };
509
510 if local_timestamp >= remote_timestamp {
511 println!("Collection {} is up to date", collection_key);
512 up_to_date.push(collection_key);
513 continue;
514 }
515
516 println!("Collection {} needs update", collection_key);
517 updates_needed.push((collection_key, remote_timestamp));
518 }
519
520 if dry_run {
522 return Ok(UpdateResult {
523 updated: updates_needed.into_iter().map(|(key, _)| key).collect(),
524 up_to_date,
525 not_found,
526 });
527 }
528
529 let mut futures = FuturesUnordered::new();
531 let mut updated = Vec::new();
532
533 for (collection_key, remote_timestamp) in updates_needed {
534 let pb = Arc::new(self.multi_progress.add(ProgressBar::new(100)));
535 pb.set_style(
536 ProgressStyle::default_bar()
537 .template("[{elapsed_precise}] {bar:40.cyan/blue} {msg}")
538 .unwrap(),
539 );
540
541 let pb_clone = Arc::clone(&pb);
542 futures.push(async move {
543 let (collection, mut data) = self
544 .fetch_collection(collection_key, remote_timestamp, pb_clone)
545 .await?;
546 self.process_collection_update(collection, &mut data, dry_run)
547 .await
548 });
549 }
550
551 let mut updates = Vec::new();
552 while let Some(result) = futures.next().await {
553 let update = result?;
554 updates.push(update.clone());
555 updated.push(update.collection_key.clone());
556 }
557
558 Ok(UpdateResult {
559 updated,
560 up_to_date,
561 not_found,
562 })
563 }
564
565 pub async fn download_single(&self, bucket: &str, collection_name: &str) -> Result<()> {
566 std::fs::create_dir_all(self.output_dir.join(DUMPS_DIR))?;
567
568 let collection_key = format!("{}/{}", bucket, collection_name);
569 let pb = Arc::new(self.multi_progress.add(ProgressBar::new(100)));
570 pb.set_style(
571 ProgressStyle::default_bar()
572 .template("[{elapsed_precise}] {bar:40.cyan/blue} {msg}")
573 .unwrap(),
574 );
575
576 let (collection, mut data) = self.fetch_collection(collection_key.clone(), 0, pb).await?;
577 let update = self
578 .process_collection_update(collection, &mut data, false)
579 .await?;
580
581 println!(
582 "Successfully downloaded collection to {:?}/dumps/{}/{}.json",
583 self.output_dir, bucket, collection_name
584 );
585
586 if update.attachments_updated > 0 {
587 println!("Updated {} attachments", update.attachments_updated);
588 }
589
590 Ok(())
591 }
592
593 async fn process_attachments(
594 &self,
595 bucket: &str,
596 collection: &str,
597 records: &[Value],
598 pb: &Arc<ProgressBar>,
599 ) -> Result<()> {
600 let base_url = self.get_attachments_base_url().await?;
601 let mut outdated_attachments = Vec::new();
602
603 for record in records {
605 if let Some(attachment) = record.get("attachment") {
606 let record_id = record["id"].as_str().ok_or_else(|| {
607 RemoteSettingsError::Json(serde_json::Error::custom("No record id"))
608 })?;
609
610 let attachment: AttachmentMetadata = serde_json::from_value(attachment.clone())?;
611
612 if !self.is_attachment_up_to_date(bucket, collection, record_id, &attachment)? {
613 outdated_attachments.push((record_id.to_string(), attachment));
614 }
615 }
616 }
617
618 if outdated_attachments.is_empty() {
619 pb.finish_with_message(format!(
620 "All attachments up to date for {}/{}",
621 bucket, collection
622 ));
623 return Ok(());
624 }
625
626 if !outdated_attachments.is_empty() {
628 if let Ok(()) = self
629 .download_attachments_bundle(bucket, collection, pb)
630 .await
631 {
632 let mut still_outdated = Vec::new();
634 for (record_id, attachment) in outdated_attachments {
635 if !self.is_attachment_up_to_date(
636 bucket,
637 collection,
638 &record_id,
639 &attachment,
640 )? {
641 still_outdated.push((record_id, attachment));
642 }
643 }
644 outdated_attachments = still_outdated;
645 }
646 }
647
648 for (record_id, attachment) in outdated_attachments {
650 let (bin_path, meta_path) = self.get_attachment_paths(bucket, collection, &record_id);
651 std::fs::create_dir_all(bin_path.parent().unwrap())?;
652
653 let data = self
654 .download_attachment(&base_url, &record_id, &attachment, pb)
655 .await?;
656
657 std::fs::write(&bin_path, data)?;
658 std::fs::write(&meta_path, serde_json::to_string_pretty(&attachment)?)?;
659 }
660
661 pb.finish_with_message(format!("Updated attachments for {}/{}", bucket, collection));
662
663 Ok(())
664 }
665}