// dump/client.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use crate::error::*;
6use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
7use remote_settings::RemoteSettingsServer;
8use serde::de::Error;
9use serde::{Deserialize, Serialize};
10use serde_json::Value;
11use sha2::{Digest, Sha256};
12use std::collections::HashMap;
13use std::{path::PathBuf, sync::Arc};
14use url::Url;
15use viaduct::{Request, Response};
16use walkdir::WalkDir;
17
18const DUMPS_DIR: &str = "dumps";
19
/// Downloads Remote Settings collection dumps (and their attachments) from
/// the production server into a local `dumps/` directory tree.
pub struct CollectionDownloader {
    // Shared progress-bar manager so several bars can render together.
    multi_progress: Arc<MultiProgress>,
    // Root `components/remote_settings` directory that receives the dumps.
    output_dir: PathBuf,
    // Base URL of the Remote Settings production server.
    url: Url,
}
25
/// Outcome of processing one collection: which collection was handled and
/// how many of its attachments were refreshed.
#[derive(Clone)]
pub struct CollectionUpdate {
    // "bucket/collection" key identifying the collection.
    collection_key: String,
    // Number of attachments found outdated and re-downloaded
    // (0 on dry runs, since attachment checks are skipped).
    attachments_updated: usize,
}
31
/// On-disk dump format: a collection's records plus the server timestamp
/// they were fetched at.
#[derive(Deserialize, Serialize)]
pub struct CollectionData {
    // Raw records as returned in the changeset's "changes" array.
    data: Vec<Value>,
    // Server timestamp of the changeset; compared against the remote change
    // monitor to decide whether the local dump is stale.
    timestamp: u64,
}
37
/// Summary of a `download_all` run; entries are "bucket/collection" keys.
pub struct UpdateResult {
    // Collections that were (or, on a dry run, would be) updated.
    updated: Vec<String>,
    // Collections whose local timestamp is >= the remote one.
    up_to_date: Vec<String>,
    // Local collections with no entry on the remote change monitor.
    not_found: Vec<String>,
}
43
/// Attachment descriptor embedded in a record, used to locate the attachment
/// on the server and verify the downloaded bytes.
#[derive(Debug, Deserialize, Serialize)]
pub struct AttachmentMetadata {
    /// Path of the attachment relative to the server's attachments base URL.
    pub location: String,
    /// Expected SHA-256 digest (lowercase hex) of the attachment bytes.
    pub hash: String,
    /// Expected size of the attachment in bytes.
    pub size: u64,
}
50
/// Root server-info document returned by GET on the server base URL; only
/// the capabilities section is needed here.
#[derive(Debug, Deserialize)]
struct ServerInfo {
    capabilities: Capabilities,
}
55
/// Subset of server capabilities we consume: the attachments endpoint info.
#[derive(Debug, Deserialize)]
struct Capabilities {
    attachments: AttachmentsCapability,
}
60
/// Attachments capability advertised by the server; `base_url` is prefixed
/// to each record's `attachment.location` to build the download URL.
#[derive(Debug, Deserialize)]
struct AttachmentsCapability {
    base_url: String,
}
65
// TODO: extract the search-config-v2 record sorting (currently inlined in
// `process_collection_update`) into a dedicated helper.
69
70impl CollectionDownloader {
71    pub fn new(root_path: PathBuf) -> Self {
72        let url = RemoteSettingsServer::Prod
73            .get_url()
74            .expect("Cannot set RemoteSettingsServer url");
75
76        let output_dir = if root_path.ends_with("components/remote_settings") {
77            root_path
78        } else {
79            root_path.join("components").join("remote_settings")
80        };
81
82        Self {
83            multi_progress: Arc::new(MultiProgress::new()),
84            output_dir,
85            url,
86        }
87    }
88
89    pub fn run(&self, dry_run: bool) -> Result<()> {
90        let result = self.download_all(dry_run)?;
91
92        if dry_run {
93            println!("\nDry run summary:");
94            println!("- Would update {} collections", result.updated.len());
95            println!(
96                "- {} collections already up to date",
97                result.up_to_date.len()
98            );
99            println!(
100                "- {} collections not found on remote",
101                result.not_found.len()
102            );
103            return Ok(());
104        }
105
106        println!("\nExecution summary:");
107        if !result.updated.is_empty() {
108            println!("Updated collections:");
109            for collection in &result.updated {
110                println!("  - {}", collection);
111            }
112        }
113
114        if !result.up_to_date.is_empty() {
115            println!("Collections already up to date:");
116            for collection in &result.up_to_date {
117                println!("  - {}", collection);
118            }
119        }
120
121        if !result.not_found.is_empty() {
122            println!("Collections not found on remote:");
123            for collection in &result.not_found {
124                println!("  - {}", collection);
125            }
126        }
127
128        Ok(())
129    }
130
131    fn scan_local_dumps(&self) -> Result<HashMap<String, (String, u64)>> {
132        let mut collections = HashMap::new();
133        let dumps_dir = self.output_dir.join(DUMPS_DIR);
134
135        for entry in WalkDir::new(dumps_dir).min_depth(2).max_depth(2) {
136            let entry = entry?;
137            if entry.file_type().is_file()
138                && entry.path().extension().is_some_and(|ext| ext == "json")
139            {
140                // Get bucket name from parent directory
141                let bucket = entry
142                    .path()
143                    .parent()
144                    .and_then(|p| p.file_name())
145                    .and_then(|n| n.to_str())
146                    .ok_or_else(|| RemoteSettingsError::Path("Invalid bucket path".into()))?;
147
148                // Get collection name from filename
149                let collection_name = entry
150                    .path()
151                    .file_stem()
152                    .and_then(|n| n.to_str())
153                    .ok_or_else(|| RemoteSettingsError::Path("Invalid collection name".into()))?;
154
155                // Read and parse the file to get timestamp
156                let content = std::fs::read_to_string(entry.path())?;
157                let data: serde_json::Value = serde_json::from_str(&content)?;
158                let timestamp = data["timestamp"].as_u64().ok_or_else(|| {
159                    RemoteSettingsError::Json(serde_json::Error::custom("No timestamp found"))
160                })?;
161
162                collections.insert(
163                    format!("{}/{}", bucket, collection_name),
164                    (bucket.to_string(), timestamp),
165                );
166            }
167        }
168        Ok(collections)
169    }
170
171    fn fetch_timestamps(&self) -> Result<HashMap<String, u64>> {
172        let monitor_url = format!("{}/buckets/monitor/collections/changes/records", self.url);
173        let monitor_response: Value = self.get(&monitor_url)?.json()?;
174
175        Ok(monitor_response["data"]
176            .as_array()
177            .ok_or_else(|| {
178                RemoteSettingsError::Json(serde_json::Error::custom(
179                    "No data array in monitor response",
180                ))
181            })?
182            .iter()
183            .filter_map(|record| {
184                let bucket = record["bucket"].as_str()?;
185                let collection_name = record["collection"].as_str()?;
186                Some((
187                    format!("{}/{}", bucket, collection_name),
188                    record["last_modified"].as_u64()?,
189                ))
190            })
191            .collect())
192    }
193
194    fn fetch_collection(
195        &self,
196        collection_name: String,
197        last_modified: u64,
198        pb: Arc<ProgressBar>,
199    ) -> Result<(String, CollectionData)> {
200        let parts: Vec<&str> = collection_name.split('/').collect();
201        if parts.len() != 2 {
202            return Err(RemoteSettingsError::Json(serde_json::Error::custom(
203                "Invalid collection name format",
204            ))
205            .into());
206        }
207        let (bucket, name) = (parts[0], parts[1]);
208
209        let url = format!(
210            "{}/buckets/{}/collections/{}/changeset?_expected={}",
211            self.url, bucket, name, last_modified
212        );
213
214        pb.set_message(format!("Downloading {}", name));
215
216        let response = self.get(&url)?;
217        let changeset: Value = response.json()?;
218
219        let timestamp = changeset["timestamp"].as_u64().ok_or_else(|| {
220            RemoteSettingsError::Json(serde_json::Error::custom("No timestamp in changeset"))
221        })?;
222
223        pb.finish_with_message(format!("Downloaded {}", name));
224
225        Ok((
226            collection_name,
227            CollectionData {
228                data: changeset["changes"]
229                    .as_array()
230                    .unwrap_or(&Vec::new())
231                    .to_vec(),
232                timestamp,
233            },
234        ))
235    }
236
237    fn get_attachments_base_url(&self) -> Result<String> {
238        let server_info: ServerInfo = self.get(self.url.as_str())?.json()?;
239        Ok(server_info.capabilities.attachments.base_url)
240    }
241
242    fn download_attachment(
243        &self,
244        base_url: &str,
245        record_id: &str,
246        attachment: &AttachmentMetadata,
247        pb: &ProgressBar,
248    ) -> Result<Vec<u8>> {
249        let url = format!("{}{}", base_url, attachment.location);
250        pb.set_message(format!("Downloading attachment for record {}", record_id));
251
252        let response = self.get(&url)?;
253        let data = response.body;
254
255        // Verify size
256        if data.len() as u64 != attachment.size {
257            return Err(RemoteSettingsError::Attachment(format!(
258                "Size mismatch for attachment {}: expected {}, got {}",
259                record_id,
260                attachment.size,
261                data.len()
262            ))
263            .into());
264        }
265
266        // Verify hash
267        let mut hasher = Sha256::new();
268        hasher.update(&data);
269        let hash = format!("{:x}", hasher.finalize());
270        if hash != attachment.hash {
271            return Err(RemoteSettingsError::Attachment(format!(
272                "Hash mismatch for attachment {}: expected {}, got {}",
273                record_id, attachment.hash, hash
274            ))
275            .into());
276        }
277
278        pb.set_message(format!("Verified attachment for record {}", record_id));
279        Ok(data)
280    }
281
282    fn get_attachment_paths(
283        &self,
284        bucket: &str,
285        collection: &str,
286        record_id: &str,
287    ) -> (PathBuf, PathBuf) {
288        let base_path = self
289            .output_dir
290            .join(DUMPS_DIR)
291            .join(bucket)
292            .join("attachments")
293            .join(collection);
294
295        (
296            base_path.join(record_id),
297            base_path.join(format!("{}.meta.json", record_id)),
298        )
299    }
300
301    fn is_attachment_up_to_date(
302        &self,
303        bucket: &str,
304        collection: &str,
305        record_id: &str,
306        remote_attachment: &AttachmentMetadata,
307    ) -> Result<bool> {
308        let (bin_path, meta_path) = self.get_attachment_paths(bucket, collection, record_id);
309
310        // If either file doesn't exist, attachment needs update
311        if !bin_path.exists() || !meta_path.exists() {
312            log::debug!(
313                "Attachment files missing for {}/{}/{}",
314                bucket,
315                collection,
316                record_id
317            );
318            return Ok(false);
319        }
320
321        // Read and parse metadata file
322        let meta_content = std::fs::read_to_string(&meta_path)?;
323        let local_attachment: AttachmentMetadata = serde_json::from_str(&meta_content)?;
324
325        // Compare metadata
326        if local_attachment.hash != remote_attachment.hash
327            || local_attachment.size != remote_attachment.size
328        {
329            log::debug!(
330                "Attachment metadata mismatch for {}/{}/{}: local hash={}, size={}, remote hash={}, size={}",
331                bucket, collection, record_id,
332                local_attachment.hash, local_attachment.size,
333                remote_attachment.hash, remote_attachment.size
334            );
335            return Ok(false);
336        }
337
338        Ok(true)
339    }
340
341    fn download_attachments_bundle(
342        &self,
343        bucket: &str,
344        collection: &str,
345        pb: &ProgressBar,
346    ) -> Result<()> {
347        let base_url = self.get_attachments_base_url()?;
348        let url = format!("{}/bundles/{}--{}.zip", base_url, bucket, collection);
349
350        pb.set_message(format!(
351            "Downloading attachments bundle for {}/{}",
352            bucket, collection
353        ));
354
355        // Try to download the bundle
356        match self.get(&url) {
357            Ok(response) => {
358                if response.status == 200 {
359                    let bytes = response.body;
360                    let bundle_path = self
361                        .output_dir
362                        .join(DUMPS_DIR)
363                        .join(bucket)
364                        .join("attachments")
365                        .join(collection)
366                        .with_extension("zip");
367
368                    std::fs::create_dir_all(bundle_path.parent().unwrap())?;
369                    std::fs::write(&bundle_path, bytes)?;
370
371                    // Extract bundle
372                    let file = std::fs::File::open(&bundle_path)?;
373                    let mut archive = zip::ZipArchive::new(file)?;
374
375                    let extract_path = bundle_path.parent().unwrap();
376                    archive.extract(extract_path)?;
377
378                    // Clean up zip file
379                    std::fs::remove_file(bundle_path)?;
380
381                    pb.finish_with_message(format!(
382                        "Downloaded and extracted attachments bundle for {}/{}",
383                        bucket, collection
384                    ));
385                    return Ok(());
386                }
387            }
388            Err(e) => {
389                log::debug!("Failed to download or extract attachments bundle: {}", e);
390            }
391        }
392
393        Ok(())
394    }
395
396    fn process_collection_update(
397        &self,
398        collection: String,
399        data: &mut CollectionData,
400        dry_run: bool,
401    ) -> Result<CollectionUpdate> {
402        let mut attachments_updated = 0;
403        let parts: Vec<&str> = collection.split('/').collect();
404
405        if parts.len() != 2 {
406            return Err(RemoteSettingsError::Path("Invalid collection path".into()).into());
407        }
408
409        let (bucket, name) = (parts[0], parts[1]);
410
411        if !dry_run {
412            // Write collection data
413            let dumps_path = self
414                .output_dir
415                .join(DUMPS_DIR)
416                .join(bucket)
417                .join(format!("{}.json", name));
418
419            std::fs::create_dir_all(dumps_path.parent().unwrap())?;
420            // We sort both the keys and the records in search-config-v2 to make it
421            // easier to read and to experiment with making changes via the dump file.
422            if name == "search-config-v2" {
423                data.data.sort_by(|a, b| {
424                    if a["recordType"] == b["recordType"] {
425                        a["identifier"].as_str().cmp(&b["identifier"].as_str())
426                    } else {
427                        a["recordType"].as_str().cmp(&b["recordType"].as_str())
428                    }
429                });
430            } else {
431                data.data.sort_by_key(|r| r["id"].to_string());
432            }
433            std::fs::write(&dumps_path, serde_json::to_string_pretty(&data)?)?;
434
435            // Count attachments needing updates
436            for record in &data.data {
437                if let Some(attachment) = record.get("attachment") {
438                    let record_id = record["id"].as_str().ok_or_else(|| {
439                        RemoteSettingsError::Json(serde_json::Error::custom("No record id"))
440                    })?;
441
442                    let attachment: AttachmentMetadata =
443                        serde_json::from_value(attachment.clone())?;
444                    if !self.is_attachment_up_to_date(bucket, name, record_id, &attachment)? {
445                        attachments_updated += 1;
446                    }
447                }
448            }
449
450            if attachments_updated > 0 {
451                let pb = Arc::new(self.multi_progress.add(ProgressBar::new(100)));
452                pb.set_style(
453                    ProgressStyle::default_bar()
454                        .template("[{elapsed_precise}] {bar:40.cyan/blue} {msg}")
455                        .unwrap(),
456                );
457
458                self.process_attachments(bucket, name, &data.data, &pb)?;
459            }
460        }
461
462        Ok(CollectionUpdate {
463            collection_key: collection,
464            attachments_updated,
465        })
466    }
467
468    pub fn download_all(&self, dry_run: bool) -> Result<UpdateResult> {
469        std::fs::create_dir_all(self.output_dir.join(DUMPS_DIR))?;
470
471        let local_collections = self.scan_local_dumps()?;
472        if local_collections.is_empty() {
473            println!(
474                "No local collections found in {:?}",
475                self.output_dir.join(DUMPS_DIR)
476            );
477            return Ok(UpdateResult {
478                updated: vec![],
479                up_to_date: vec![],
480                not_found: vec![],
481            });
482        }
483
484        let remote_timestamps = self.fetch_timestamps()?;
485        let mut updates_needed = Vec::new();
486        let mut up_to_date = Vec::new();
487        let mut not_found = Vec::new();
488
489        // First pass: check what needs updating
490        for (collection_key, (_, local_timestamp)) in local_collections {
491            let remote_timestamp = match remote_timestamps.get(&collection_key) {
492                Some(&timestamp) => timestamp,
493                None => {
494                    println!("Warning: Collection {} not found on remote", collection_key);
495                    not_found.push(collection_key);
496                    continue;
497                }
498            };
499
500            if local_timestamp >= remote_timestamp {
501                println!("Collection {} is up to date", collection_key);
502                up_to_date.push(collection_key);
503                continue;
504            }
505
506            println!("Collection {} needs update", collection_key);
507            updates_needed.push((collection_key, remote_timestamp));
508        }
509
510        // If it's a dry run, return early with what would be updated
511        if dry_run {
512            return Ok(UpdateResult {
513                updated: updates_needed.into_iter().map(|(key, _)| key).collect(),
514                up_to_date,
515                not_found,
516            });
517        }
518
519        // Actually perform the updates
520        let mut updates = Vec::new();
521        let mut updated = Vec::new();
522
523        for (collection_key, remote_timestamp) in updates_needed {
524            let pb = Arc::new(self.multi_progress.add(ProgressBar::new(100)));
525            pb.set_style(
526                ProgressStyle::default_bar()
527                    .template("[{elapsed_precise}] {bar:40.cyan/blue} {msg}")
528                    .unwrap(),
529            );
530
531            let pb_clone = Arc::clone(&pb);
532            let (collection, mut data) =
533                self.fetch_collection(collection_key, remote_timestamp, pb_clone)?;
534            let update = self.process_collection_update(collection, &mut data, dry_run)?;
535            updates.push(update.clone());
536            updated.push(update.collection_key.clone());
537        }
538
539        Ok(UpdateResult {
540            updated,
541            up_to_date,
542            not_found,
543        })
544    }
545
546    pub fn download_single(&self, bucket: &str, collection_name: &str) -> Result<()> {
547        std::fs::create_dir_all(self.output_dir.join(DUMPS_DIR))?;
548
549        let collection_key = format!("{}/{}", bucket, collection_name);
550        let pb = Arc::new(self.multi_progress.add(ProgressBar::new(100)));
551        pb.set_style(
552            ProgressStyle::default_bar()
553                .template("[{elapsed_precise}] {bar:40.cyan/blue} {msg}")
554                .unwrap(),
555        );
556
557        let (collection, mut data) = self.fetch_collection(collection_key.clone(), 0, pb)?;
558        let update = self.process_collection_update(collection, &mut data, false)?;
559
560        println!(
561            "Successfully downloaded collection to {:?}/dumps/{}/{}.json",
562            self.output_dir, bucket, collection_name
563        );
564
565        if update.attachments_updated > 0 {
566            println!("Updated {} attachments", update.attachments_updated);
567        }
568
569        Ok(())
570    }
571
572    fn process_attachments(
573        &self,
574        bucket: &str,
575        collection: &str,
576        records: &[Value],
577        pb: &Arc<ProgressBar>,
578    ) -> Result<()> {
579        let base_url = self.get_attachments_base_url()?;
580        let mut outdated_attachments = Vec::new();
581
582        // First pass: check which attachments need updating
583        for record in records {
584            if let Some(attachment) = record.get("attachment") {
585                let record_id = record["id"].as_str().ok_or_else(|| {
586                    RemoteSettingsError::Json(serde_json::Error::custom("No record id"))
587                })?;
588
589                let attachment: AttachmentMetadata = serde_json::from_value(attachment.clone())?;
590
591                if !self.is_attachment_up_to_date(bucket, collection, record_id, &attachment)? {
592                    outdated_attachments.push((record_id.to_string(), attachment));
593                }
594            }
595        }
596
597        if outdated_attachments.is_empty() {
598            pb.finish_with_message(format!(
599                "All attachments up to date for {}/{}",
600                bucket, collection
601            ));
602            return Ok(());
603        }
604
605        // Try bundle first if we have outdated attachments
606        if !outdated_attachments.is_empty() {
607            if let Ok(()) = self.download_attachments_bundle(bucket, collection, pb) {
608                // Bundle downloaded successfully, verify all attachments now
609                let mut still_outdated = Vec::new();
610                for (record_id, attachment) in outdated_attachments {
611                    if !self.is_attachment_up_to_date(
612                        bucket,
613                        collection,
614                        &record_id,
615                        &attachment,
616                    )? {
617                        still_outdated.push((record_id, attachment));
618                    }
619                }
620                outdated_attachments = still_outdated;
621            }
622        }
623
624        // Download remaining outdated attachments individually
625        for (record_id, attachment) in outdated_attachments {
626            let (bin_path, meta_path) = self.get_attachment_paths(bucket, collection, &record_id);
627            std::fs::create_dir_all(bin_path.parent().unwrap())?;
628
629            let data = self.download_attachment(&base_url, &record_id, &attachment, pb)?;
630
631            std::fs::write(&bin_path, data)?;
632            std::fs::write(&meta_path, serde_json::to_string_pretty(&attachment)?)?;
633        }
634
635        pb.finish_with_message(format!("Updated attachments for {}/{}", bucket, collection));
636
637        Ok(())
638    }
639
640    fn get(&self, url: &str) -> Result<Response> {
641        let url = Url::parse(url)?;
642        Ok(Request::get(url).send()?)
643    }
644}