1use crate::error::*;
6use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
7use remote_settings::RemoteSettingsServer;
8use serde::de::Error;
9use serde::{Deserialize, Serialize};
10use serde_json::Value;
11use sha2::{Digest, Sha256};
12use std::collections::HashMap;
13use std::{path::PathBuf, sync::Arc};
14use url::Url;
15use viaduct::{Request, Response};
16use walkdir::WalkDir;
17
/// Name of the directory, relative to the downloader's output directory, under which
/// collection dumps and their attachments are stored.
const DUMPS_DIR: &str = "dumps";
19
/// Downloads remote-settings collection dumps and their attachments into a local
/// directory tree.
pub struct CollectionDownloader {
    /// Container that every progress bar created by the downloader is added to.
    multi_progress: Arc<MultiProgress>,
    /// Directory that contains the `dumps` directory.
    output_dir: PathBuf,
    /// Base URL of the remote-settings server that requests are built from.
    url: Url,
}
25
/// Outcome of processing a single collection.
#[derive(Clone, Debug)]
pub struct CollectionUpdate {
    /// Collection identifier in `bucket/collection` form.
    collection_key: String,
    /// Number of attachments that were found to be missing or outdated locally.
    attachments_updated: usize,
}
31
/// Serialized form of a collection dump: the records together with the timestamp
/// of the changeset they were taken from.
#[derive(Deserialize, Serialize)]
pub struct CollectionData {
    /// Records of the collection (filled from the `changes` array of a changeset response).
    data: Vec<Value>,
    /// `timestamp` value of the changeset response.
    timestamp: u64,
}
37
/// Summary of a bulk update run; every entry is a `bucket/collection` key.
#[derive(Debug, Default)]
pub struct UpdateResult {
    /// Collections whose local dump was older than the remote one.
    updated: Vec<String>,
    /// Collections whose local dump was at least as recent as the remote one.
    up_to_date: Vec<String>,
    /// Collections that exist locally but are not listed by the remote server.
    not_found: Vec<String>,
}
43
/// Metadata describing a record's attachment; also the content of the local
/// `<record id>.meta.json` files.
#[derive(Debug, Deserialize, Serialize)]
pub struct AttachmentMetadata {
    /// Location of the attachment, appended to the attachments base URL when downloading.
    pub location: String,
    /// Lowercase hexadecimal SHA-256 digest the downloaded bytes are compared against.
    pub hash: String,
    /// Expected length of the attachment in bytes.
    pub size: u64,
}
50
/// Subset of the JSON document returned by the server's base URL.
#[derive(Debug, Deserialize)]
struct ServerInfo {
    /// The `capabilities` object of the server response.
    capabilities: Capabilities,
}
55
/// Subset of the server's `capabilities` object that the downloader reads.
#[derive(Debug, Deserialize)]
struct Capabilities {
    /// The `attachments` capability entry.
    attachments: AttachmentsCapability,
}
60
/// The `attachments` capability advertised by the server.
#[derive(Debug, Deserialize)]
struct AttachmentsCapability {
    /// URL prefix that attachment locations and bundle paths are appended to.
    base_url: String,
}
65
66impl CollectionDownloader {
71 pub fn new(root_path: PathBuf) -> Self {
72 let url = RemoteSettingsServer::Prod
73 .get_url()
74 .expect("Cannot set RemoteSettingsServer url");
75
76 let output_dir = if root_path.ends_with("components/remote_settings") {
77 root_path
78 } else {
79 root_path.join("components").join("remote_settings")
80 };
81
82 Self {
83 multi_progress: Arc::new(MultiProgress::new()),
84 output_dir,
85 url,
86 }
87 }
88
89 pub fn run(&self, dry_run: bool) -> Result<()> {
90 let result = self.download_all(dry_run)?;
91
92 if dry_run {
93 println!("\nDry run summary:");
94 println!("- Would update {} collections", result.updated.len());
95 println!(
96 "- {} collections already up to date",
97 result.up_to_date.len()
98 );
99 println!(
100 "- {} collections not found on remote",
101 result.not_found.len()
102 );
103 return Ok(());
104 }
105
106 println!("\nExecution summary:");
107 if !result.updated.is_empty() {
108 println!("Updated collections:");
109 for collection in &result.updated {
110 println!(" - {}", collection);
111 }
112 }
113
114 if !result.up_to_date.is_empty() {
115 println!("Collections already up to date:");
116 for collection in &result.up_to_date {
117 println!(" - {}", collection);
118 }
119 }
120
121 if !result.not_found.is_empty() {
122 println!("Collections not found on remote:");
123 for collection in &result.not_found {
124 println!(" - {}", collection);
125 }
126 }
127
128 Ok(())
129 }
130
131 fn scan_local_dumps(&self) -> Result<HashMap<String, (String, u64)>> {
132 let mut collections = HashMap::new();
133 let dumps_dir = self.output_dir.join(DUMPS_DIR);
134
135 for entry in WalkDir::new(dumps_dir).min_depth(2).max_depth(2) {
136 let entry = entry?;
137 if entry.file_type().is_file()
138 && entry.path().extension().is_some_and(|ext| ext == "json")
139 {
140 let bucket = entry
142 .path()
143 .parent()
144 .and_then(|p| p.file_name())
145 .and_then(|n| n.to_str())
146 .ok_or_else(|| RemoteSettingsError::Path("Invalid bucket path".into()))?;
147
148 let collection_name = entry
150 .path()
151 .file_stem()
152 .and_then(|n| n.to_str())
153 .ok_or_else(|| RemoteSettingsError::Path("Invalid collection name".into()))?;
154
155 let content = std::fs::read_to_string(entry.path())?;
157 let data: serde_json::Value = serde_json::from_str(&content)?;
158 let timestamp = data["timestamp"].as_u64().ok_or_else(|| {
159 RemoteSettingsError::Json(serde_json::Error::custom("No timestamp found"))
160 })?;
161
162 collections.insert(
163 format!("{}/{}", bucket, collection_name),
164 (bucket.to_string(), timestamp),
165 );
166 }
167 }
168 Ok(collections)
169 }
170
171 fn fetch_timestamps(&self) -> Result<HashMap<String, u64>> {
172 let monitor_url = format!("{}/buckets/monitor/collections/changes/records", self.url);
173 let monitor_response: Value = self.get(&monitor_url)?.json()?;
174
175 Ok(monitor_response["data"]
176 .as_array()
177 .ok_or_else(|| {
178 RemoteSettingsError::Json(serde_json::Error::custom(
179 "No data array in monitor response",
180 ))
181 })?
182 .iter()
183 .filter_map(|record| {
184 let bucket = record["bucket"].as_str()?;
185 let collection_name = record["collection"].as_str()?;
186 Some((
187 format!("{}/{}", bucket, collection_name),
188 record["last_modified"].as_u64()?,
189 ))
190 })
191 .collect())
192 }
193
194 fn fetch_collection(
195 &self,
196 collection_name: String,
197 last_modified: u64,
198 pb: Arc<ProgressBar>,
199 ) -> Result<(String, CollectionData)> {
200 let parts: Vec<&str> = collection_name.split('/').collect();
201 if parts.len() != 2 {
202 return Err(RemoteSettingsError::Json(serde_json::Error::custom(
203 "Invalid collection name format",
204 ))
205 .into());
206 }
207 let (bucket, name) = (parts[0], parts[1]);
208
209 let url = format!(
210 "{}/buckets/{}/collections/{}/changeset?_expected={}",
211 self.url, bucket, name, last_modified
212 );
213
214 pb.set_message(format!("Downloading {}", name));
215
216 let response = self.get(&url)?;
217 let changeset: Value = response.json()?;
218
219 let timestamp = changeset["timestamp"].as_u64().ok_or_else(|| {
220 RemoteSettingsError::Json(serde_json::Error::custom("No timestamp in changeset"))
221 })?;
222
223 pb.finish_with_message(format!("Downloaded {}", name));
224
225 Ok((
226 collection_name,
227 CollectionData {
228 data: changeset["changes"]
229 .as_array()
230 .unwrap_or(&Vec::new())
231 .to_vec(),
232 timestamp,
233 },
234 ))
235 }
236
237 fn get_attachments_base_url(&self) -> Result<String> {
238 let server_info: ServerInfo = self.get(self.url.as_str())?.json()?;
239 Ok(server_info.capabilities.attachments.base_url)
240 }
241
242 fn download_attachment(
243 &self,
244 base_url: &str,
245 record_id: &str,
246 attachment: &AttachmentMetadata,
247 pb: &ProgressBar,
248 ) -> Result<Vec<u8>> {
249 let url = format!("{}{}", base_url, attachment.location);
250 pb.set_message(format!("Downloading attachment for record {}", record_id));
251
252 let response = self.get(&url)?;
253 let data = response.body;
254
255 if data.len() as u64 != attachment.size {
257 return Err(RemoteSettingsError::Attachment(format!(
258 "Size mismatch for attachment {}: expected {}, got {}",
259 record_id,
260 attachment.size,
261 data.len()
262 ))
263 .into());
264 }
265
266 let mut hasher = Sha256::new();
268 hasher.update(&data);
269 let hash = format!("{:x}", hasher.finalize());
270 if hash != attachment.hash {
271 return Err(RemoteSettingsError::Attachment(format!(
272 "Hash mismatch for attachment {}: expected {}, got {}",
273 record_id, attachment.hash, hash
274 ))
275 .into());
276 }
277
278 pb.set_message(format!("Verified attachment for record {}", record_id));
279 Ok(data)
280 }
281
282 fn get_attachment_paths(
283 &self,
284 bucket: &str,
285 collection: &str,
286 record_id: &str,
287 ) -> (PathBuf, PathBuf) {
288 let base_path = self
289 .output_dir
290 .join(DUMPS_DIR)
291 .join(bucket)
292 .join("attachments")
293 .join(collection);
294
295 (
296 base_path.join(record_id),
297 base_path.join(format!("{}.meta.json", record_id)),
298 )
299 }
300
301 fn is_attachment_up_to_date(
302 &self,
303 bucket: &str,
304 collection: &str,
305 record_id: &str,
306 remote_attachment: &AttachmentMetadata,
307 ) -> Result<bool> {
308 let (bin_path, meta_path) = self.get_attachment_paths(bucket, collection, record_id);
309
310 if !bin_path.exists() || !meta_path.exists() {
312 log::debug!(
313 "Attachment files missing for {}/{}/{}",
314 bucket,
315 collection,
316 record_id
317 );
318 return Ok(false);
319 }
320
321 let meta_content = std::fs::read_to_string(&meta_path)?;
323 let local_attachment: AttachmentMetadata = serde_json::from_str(&meta_content)?;
324
325 if local_attachment.hash != remote_attachment.hash
327 || local_attachment.size != remote_attachment.size
328 {
329 log::debug!(
330 "Attachment metadata mismatch for {}/{}/{}: local hash={}, size={}, remote hash={}, size={}",
331 bucket, collection, record_id,
332 local_attachment.hash, local_attachment.size,
333 remote_attachment.hash, remote_attachment.size
334 );
335 return Ok(false);
336 }
337
338 Ok(true)
339 }
340
341 fn download_attachments_bundle(
342 &self,
343 bucket: &str,
344 collection: &str,
345 pb: &ProgressBar,
346 ) -> Result<()> {
347 let base_url = self.get_attachments_base_url()?;
348 let url = format!("{}/bundles/{}--{}.zip", base_url, bucket, collection);
349
350 pb.set_message(format!(
351 "Downloading attachments bundle for {}/{}",
352 bucket, collection
353 ));
354
355 match self.get(&url) {
357 Ok(response) => {
358 if response.status == 200 {
359 let bytes = response.body;
360 let bundle_path = self
361 .output_dir
362 .join(DUMPS_DIR)
363 .join(bucket)
364 .join("attachments")
365 .join(collection)
366 .with_extension("zip");
367
368 std::fs::create_dir_all(bundle_path.parent().unwrap())?;
369 std::fs::write(&bundle_path, bytes)?;
370
371 let file = std::fs::File::open(&bundle_path)?;
373 let mut archive = zip::ZipArchive::new(file)?;
374
375 let extract_path = bundle_path.parent().unwrap();
376 archive.extract(extract_path)?;
377
378 std::fs::remove_file(bundle_path)?;
380
381 pb.finish_with_message(format!(
382 "Downloaded and extracted attachments bundle for {}/{}",
383 bucket, collection
384 ));
385 return Ok(());
386 }
387 }
388 Err(e) => {
389 log::debug!("Failed to download or extract attachments bundle: {}", e);
390 }
391 }
392
393 Ok(())
394 }
395
396 fn process_collection_update(
397 &self,
398 collection: String,
399 data: &mut CollectionData,
400 dry_run: bool,
401 ) -> Result<CollectionUpdate> {
402 let mut attachments_updated = 0;
403 let parts: Vec<&str> = collection.split('/').collect();
404
405 if parts.len() != 2 {
406 return Err(RemoteSettingsError::Path("Invalid collection path".into()).into());
407 }
408
409 let (bucket, name) = (parts[0], parts[1]);
410
411 if !dry_run {
412 let dumps_path = self
414 .output_dir
415 .join(DUMPS_DIR)
416 .join(bucket)
417 .join(format!("{}.json", name));
418
419 std::fs::create_dir_all(dumps_path.parent().unwrap())?;
420 if name == "search-config-v2" {
423 data.data.sort_by(|a, b| {
424 if a["recordType"] == b["recordType"] {
425 a["identifier"].as_str().cmp(&b["identifier"].as_str())
426 } else {
427 a["recordType"].as_str().cmp(&b["recordType"].as_str())
428 }
429 });
430 } else {
431 data.data.sort_by_key(|r| r["id"].to_string());
432 }
433 std::fs::write(&dumps_path, serde_json::to_string_pretty(&data)?)?;
434
435 for record in &data.data {
437 if let Some(attachment) = record.get("attachment") {
438 let record_id = record["id"].as_str().ok_or_else(|| {
439 RemoteSettingsError::Json(serde_json::Error::custom("No record id"))
440 })?;
441
442 let attachment: AttachmentMetadata =
443 serde_json::from_value(attachment.clone())?;
444 if !self.is_attachment_up_to_date(bucket, name, record_id, &attachment)? {
445 attachments_updated += 1;
446 }
447 }
448 }
449
450 if attachments_updated > 0 {
451 let pb = Arc::new(self.multi_progress.add(ProgressBar::new(100)));
452 pb.set_style(
453 ProgressStyle::default_bar()
454 .template("[{elapsed_precise}] {bar:40.cyan/blue} {msg}")
455 .unwrap(),
456 );
457
458 self.process_attachments(bucket, name, &data.data, &pb)?;
459 }
460 }
461
462 Ok(CollectionUpdate {
463 collection_key: collection,
464 attachments_updated,
465 })
466 }
467
468 pub fn download_all(&self, dry_run: bool) -> Result<UpdateResult> {
469 std::fs::create_dir_all(self.output_dir.join(DUMPS_DIR))?;
470
471 let local_collections = self.scan_local_dumps()?;
472 if local_collections.is_empty() {
473 println!(
474 "No local collections found in {:?}",
475 self.output_dir.join(DUMPS_DIR)
476 );
477 return Ok(UpdateResult {
478 updated: vec![],
479 up_to_date: vec![],
480 not_found: vec![],
481 });
482 }
483
484 let remote_timestamps = self.fetch_timestamps()?;
485 let mut updates_needed = Vec::new();
486 let mut up_to_date = Vec::new();
487 let mut not_found = Vec::new();
488
489 for (collection_key, (_, local_timestamp)) in local_collections {
491 let remote_timestamp = match remote_timestamps.get(&collection_key) {
492 Some(×tamp) => timestamp,
493 None => {
494 println!("Warning: Collection {} not found on remote", collection_key);
495 not_found.push(collection_key);
496 continue;
497 }
498 };
499
500 if local_timestamp >= remote_timestamp {
501 println!("Collection {} is up to date", collection_key);
502 up_to_date.push(collection_key);
503 continue;
504 }
505
506 println!("Collection {} needs update", collection_key);
507 updates_needed.push((collection_key, remote_timestamp));
508 }
509
510 if dry_run {
512 return Ok(UpdateResult {
513 updated: updates_needed.into_iter().map(|(key, _)| key).collect(),
514 up_to_date,
515 not_found,
516 });
517 }
518
519 let mut updates = Vec::new();
521 let mut updated = Vec::new();
522
523 for (collection_key, remote_timestamp) in updates_needed {
524 let pb = Arc::new(self.multi_progress.add(ProgressBar::new(100)));
525 pb.set_style(
526 ProgressStyle::default_bar()
527 .template("[{elapsed_precise}] {bar:40.cyan/blue} {msg}")
528 .unwrap(),
529 );
530
531 let pb_clone = Arc::clone(&pb);
532 let (collection, mut data) =
533 self.fetch_collection(collection_key, remote_timestamp, pb_clone)?;
534 let update = self.process_collection_update(collection, &mut data, dry_run)?;
535 updates.push(update.clone());
536 updated.push(update.collection_key.clone());
537 }
538
539 Ok(UpdateResult {
540 updated,
541 up_to_date,
542 not_found,
543 })
544 }
545
546 pub fn download_single(&self, bucket: &str, collection_name: &str) -> Result<()> {
547 std::fs::create_dir_all(self.output_dir.join(DUMPS_DIR))?;
548
549 let collection_key = format!("{}/{}", bucket, collection_name);
550 let pb = Arc::new(self.multi_progress.add(ProgressBar::new(100)));
551 pb.set_style(
552 ProgressStyle::default_bar()
553 .template("[{elapsed_precise}] {bar:40.cyan/blue} {msg}")
554 .unwrap(),
555 );
556
557 let (collection, mut data) = self.fetch_collection(collection_key.clone(), 0, pb)?;
558 let update = self.process_collection_update(collection, &mut data, false)?;
559
560 println!(
561 "Successfully downloaded collection to {:?}/dumps/{}/{}.json",
562 self.output_dir, bucket, collection_name
563 );
564
565 if update.attachments_updated > 0 {
566 println!("Updated {} attachments", update.attachments_updated);
567 }
568
569 Ok(())
570 }
571
572 fn process_attachments(
573 &self,
574 bucket: &str,
575 collection: &str,
576 records: &[Value],
577 pb: &Arc<ProgressBar>,
578 ) -> Result<()> {
579 let base_url = self.get_attachments_base_url()?;
580 let mut outdated_attachments = Vec::new();
581
582 for record in records {
584 if let Some(attachment) = record.get("attachment") {
585 let record_id = record["id"].as_str().ok_or_else(|| {
586 RemoteSettingsError::Json(serde_json::Error::custom("No record id"))
587 })?;
588
589 let attachment: AttachmentMetadata = serde_json::from_value(attachment.clone())?;
590
591 if !self.is_attachment_up_to_date(bucket, collection, record_id, &attachment)? {
592 outdated_attachments.push((record_id.to_string(), attachment));
593 }
594 }
595 }
596
597 if outdated_attachments.is_empty() {
598 pb.finish_with_message(format!(
599 "All attachments up to date for {}/{}",
600 bucket, collection
601 ));
602 return Ok(());
603 }
604
605 if !outdated_attachments.is_empty() {
607 if let Ok(()) = self.download_attachments_bundle(bucket, collection, pb) {
608 let mut still_outdated = Vec::new();
610 for (record_id, attachment) in outdated_attachments {
611 if !self.is_attachment_up_to_date(
612 bucket,
613 collection,
614 &record_id,
615 &attachment,
616 )? {
617 still_outdated.push((record_id, attachment));
618 }
619 }
620 outdated_attachments = still_outdated;
621 }
622 }
623
624 for (record_id, attachment) in outdated_attachments {
626 let (bin_path, meta_path) = self.get_attachment_paths(bucket, collection, &record_id);
627 std::fs::create_dir_all(bin_path.parent().unwrap())?;
628
629 let data = self.download_attachment(&base_url, &record_id, &attachment, pb)?;
630
631 std::fs::write(&bin_path, data)?;
632 std::fs::write(&meta_path, serde_json::to_string_pretty(&attachment)?)?;
633 }
634
635 pb.finish_with_message(format!("Updated attachments for {}/{}", bucket, collection));
636
637 Ok(())
638 }
639
640 fn get(&self, url: &str) -> Result<Response> {
641 let url = Url::parse(url)?;
642 Ok(Request::get(url).send()?)
643 }
644}