dump/
client.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use crate::error::*;
6use futures::{stream::FuturesUnordered, StreamExt};
7use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
8use remote_settings::RemoteSettingsServer;
9use reqwest::Url;
10use serde::de::Error;
11use serde::{Deserialize, Serialize};
12use serde_json::Value;
13use sha2::{Digest, Sha256};
14use std::collections::HashMap;
15use std::{path::PathBuf, sync::Arc};
16use walkdir::WalkDir;
17
/// Subdirectory of the output dir where collection dumps and attachments are stored.
const DUMPS_DIR: &str = "dumps";
19
/// Downloads Remote Settings collection dumps (and their attachments) into a
/// local `dumps/` tree under `components/remote_settings`.
pub struct CollectionDownloader {
    // HTTP client reused for all server and attachment requests.
    client: reqwest::Client,
    // Shared owner of the per-task progress bars.
    multi_progress: Arc<MultiProgress>,
    // Normalized to `.../components/remote_settings`; dumps go under `<output_dir>/dumps`.
    output_dir: PathBuf,
    // Base URL of the Remote Settings server (production).
    url: Url,
}
26
/// Outcome of processing a single collection update.
#[derive(Clone)]
pub struct CollectionUpdate {
    // `"bucket/collection"` key identifying the collection.
    collection_key: String,
    // Number of attachments found out of date during the update.
    attachments_updated: usize,
}
32
/// On-disk dump format: a collection's records plus the server timestamp
/// they were fetched at.
#[derive(Deserialize, Serialize)]
pub struct CollectionData {
    // Raw records as returned in the changeset's `changes` array.
    data: Vec<Value>,
    // Server-side `timestamp` from the changeset; compared against the
    // remote monitor to decide whether a dump is stale.
    timestamp: u64,
}
38
/// Classification of local collections after comparing with the remote.
pub struct UpdateResult {
    // Collections that were (or, in a dry run, would be) re-downloaded.
    updated: Vec<String>,
    // Local timestamp >= remote timestamp; nothing to do.
    up_to_date: Vec<String>,
    // Present locally but absent from the remote changes monitor.
    not_found: Vec<String>,
}
44
/// Attachment descriptor as embedded in a record's `attachment` field and
/// mirrored to `<record_id>.meta.json` next to the downloaded payload.
#[derive(Debug, Deserialize, Serialize)]
pub struct AttachmentMetadata {
    // Path appended to the server's attachments base URL.
    pub location: String,
    // Expected SHA-256 of the payload (lowercase hex, per the verification code).
    pub hash: String,
    // Expected payload size in bytes.
    pub size: u64,
}
51
/// Root of the server's info document; only `capabilities` is consumed.
#[derive(Debug, Deserialize)]
struct ServerInfo {
    capabilities: Capabilities,
}
56
/// Subset of server capabilities we need: attachment hosting information.
#[derive(Debug, Deserialize)]
struct Capabilities {
    attachments: AttachmentsCapability,
}
61
/// Attachment capability details; `base_url` is prefixed to each
/// attachment's `location` when downloading.
#[derive(Debug, Deserialize)]
struct AttachmentsCapability {
    base_url: String,
}
66
// NOTE(review): removed commented-out stub `sort_search_config_collection`;
// search-config sorting is implemented inline in `process_collection_update`.
70
71impl CollectionDownloader {
72    pub fn new(root_path: PathBuf) -> Self {
73        let url = RemoteSettingsServer::Prod
74            .get_url()
75            .expect("Cannot set RemoteSettingsServer url");
76
77        let output_dir = if root_path.ends_with("components/remote_settings") {
78            root_path
79        } else {
80            root_path.join("components").join("remote_settings")
81        };
82
83        Self {
84            client: reqwest::Client::new(),
85            multi_progress: Arc::new(MultiProgress::new()),
86            output_dir,
87            url,
88        }
89    }
90
91    pub async fn run(&self, dry_run: bool) -> Result<()> {
92        let result = self.download_all(dry_run).await?;
93
94        if dry_run {
95            println!("\nDry run summary:");
96            println!("- Would update {} collections", result.updated.len());
97            println!(
98                "- {} collections already up to date",
99                result.up_to_date.len()
100            );
101            println!(
102                "- {} collections not found on remote",
103                result.not_found.len()
104            );
105            return Ok(());
106        }
107
108        println!("\nExecution summary:");
109        if !result.updated.is_empty() {
110            println!("Updated collections:");
111            for collection in &result.updated {
112                println!("  - {}", collection);
113            }
114        }
115
116        if !result.up_to_date.is_empty() {
117            println!("Collections already up to date:");
118            for collection in &result.up_to_date {
119                println!("  - {}", collection);
120            }
121        }
122
123        if !result.not_found.is_empty() {
124            println!("Collections not found on remote:");
125            for collection in &result.not_found {
126                println!("  - {}", collection);
127            }
128        }
129
130        Ok(())
131    }
132
133    fn scan_local_dumps(&self) -> Result<HashMap<String, (String, u64)>> {
134        let mut collections = HashMap::new();
135        let dumps_dir = self.output_dir.join(DUMPS_DIR);
136
137        for entry in WalkDir::new(dumps_dir).min_depth(2).max_depth(2) {
138            let entry = entry?;
139            if entry.file_type().is_file()
140                && entry.path().extension().is_some_and(|ext| ext == "json")
141            {
142                // Get bucket name from parent directory
143                let bucket = entry
144                    .path()
145                    .parent()
146                    .and_then(|p| p.file_name())
147                    .and_then(|n| n.to_str())
148                    .ok_or_else(|| RemoteSettingsError::Path("Invalid bucket path".into()))?;
149
150                // Get collection name from filename
151                let collection_name = entry
152                    .path()
153                    .file_stem()
154                    .and_then(|n| n.to_str())
155                    .ok_or_else(|| RemoteSettingsError::Path("Invalid collection name".into()))?;
156
157                // Read and parse the file to get timestamp
158                let content = std::fs::read_to_string(entry.path())?;
159                let data: serde_json::Value = serde_json::from_str(&content)?;
160                let timestamp = data["timestamp"].as_u64().ok_or_else(|| {
161                    RemoteSettingsError::Json(serde_json::Error::custom("No timestamp found"))
162                })?;
163
164                collections.insert(
165                    format!("{}/{}", bucket, collection_name),
166                    (bucket.to_string(), timestamp),
167                );
168            }
169        }
170        Ok(collections)
171    }
172
173    async fn fetch_timestamps(&self) -> Result<HashMap<String, u64>> {
174        let monitor_url = format!("{}/buckets/monitor/collections/changes/records", self.url);
175        let monitor_response: Value = self.client.get(&monitor_url).send().await?.json().await?;
176
177        Ok(monitor_response["data"]
178            .as_array()
179            .ok_or_else(|| {
180                RemoteSettingsError::Json(serde_json::Error::custom(
181                    "No data array in monitor response",
182                ))
183            })?
184            .iter()
185            .filter_map(|record| {
186                let bucket = record["bucket"].as_str()?;
187                let collection_name = record["collection"].as_str()?;
188                Some((
189                    format!("{}/{}", bucket, collection_name),
190                    record["last_modified"].as_u64()?,
191                ))
192            })
193            .collect())
194    }
195
196    async fn fetch_collection(
197        &self,
198        collection_name: String,
199        last_modified: u64,
200        pb: Arc<ProgressBar>,
201    ) -> Result<(String, CollectionData)> {
202        let parts: Vec<&str> = collection_name.split('/').collect();
203        if parts.len() != 2 {
204            return Err(RemoteSettingsError::Json(serde_json::Error::custom(
205                "Invalid collection name format",
206            ))
207            .into());
208        }
209        let (bucket, name) = (parts[0], parts[1]);
210
211        let url = format!(
212            "{}/buckets/{}/collections/{}/changeset?_expected={}",
213            self.url, bucket, name, last_modified
214        );
215
216        pb.set_message(format!("Downloading {}", name));
217
218        let response = self.client.get(&url).send().await?;
219        let changeset: Value = response.json().await?;
220
221        let timestamp = changeset["timestamp"].as_u64().ok_or_else(|| {
222            RemoteSettingsError::Json(serde_json::Error::custom("No timestamp in changeset"))
223        })?;
224
225        pb.finish_with_message(format!("Downloaded {}", name));
226
227        Ok((
228            collection_name,
229            CollectionData {
230                data: changeset["changes"]
231                    .as_array()
232                    .unwrap_or(&Vec::new())
233                    .to_vec(),
234                timestamp,
235            },
236        ))
237    }
238
239    async fn get_attachments_base_url(&self) -> Result<String> {
240        let server_info: ServerInfo = self
241            .client
242            .get(self.url.as_str())
243            .send()
244            .await?
245            .json()
246            .await?;
247        Ok(server_info.capabilities.attachments.base_url)
248    }
249
250    async fn download_attachment(
251        &self,
252        base_url: &str,
253        record_id: &str,
254        attachment: &AttachmentMetadata,
255        pb: &ProgressBar,
256    ) -> Result<Vec<u8>> {
257        let url = format!("{}{}", base_url, attachment.location);
258        pb.set_message(format!("Downloading attachment for record {}", record_id));
259
260        let response = self.client.get(&url).send().await?;
261        let bytes = response.bytes().await?;
262        let data = bytes.to_vec();
263
264        // Verify size
265        if data.len() as u64 != attachment.size {
266            return Err(RemoteSettingsError::Attachment(format!(
267                "Size mismatch for attachment {}: expected {}, got {}",
268                record_id,
269                attachment.size,
270                data.len()
271            ))
272            .into());
273        }
274
275        // Verify hash
276        let mut hasher = Sha256::new();
277        hasher.update(&data);
278        let hash = format!("{:x}", hasher.finalize());
279        if hash != attachment.hash {
280            return Err(RemoteSettingsError::Attachment(format!(
281                "Hash mismatch for attachment {}: expected {}, got {}",
282                record_id, attachment.hash, hash
283            ))
284            .into());
285        }
286
287        pb.set_message(format!("Verified attachment for record {}", record_id));
288        Ok(data)
289    }
290
291    fn get_attachment_paths(
292        &self,
293        bucket: &str,
294        collection: &str,
295        record_id: &str,
296    ) -> (PathBuf, PathBuf) {
297        let base_path = self
298            .output_dir
299            .join(DUMPS_DIR)
300            .join(bucket)
301            .join("attachments")
302            .join(collection);
303
304        (
305            base_path.join(record_id),
306            base_path.join(format!("{}.meta.json", record_id)),
307        )
308    }
309
310    fn is_attachment_up_to_date(
311        &self,
312        bucket: &str,
313        collection: &str,
314        record_id: &str,
315        remote_attachment: &AttachmentMetadata,
316    ) -> Result<bool> {
317        let (bin_path, meta_path) = self.get_attachment_paths(bucket, collection, record_id);
318
319        // If either file doesn't exist, attachment needs update
320        if !bin_path.exists() || !meta_path.exists() {
321            log::debug!(
322                "Attachment files missing for {}/{}/{}",
323                bucket,
324                collection,
325                record_id
326            );
327            return Ok(false);
328        }
329
330        // Read and parse metadata file
331        let meta_content = std::fs::read_to_string(&meta_path)?;
332        let local_attachment: AttachmentMetadata = serde_json::from_str(&meta_content)?;
333
334        // Compare metadata
335        if local_attachment.hash != remote_attachment.hash
336            || local_attachment.size != remote_attachment.size
337        {
338            log::debug!(
339                "Attachment metadata mismatch for {}/{}/{}: local hash={}, size={}, remote hash={}, size={}",
340                bucket, collection, record_id,
341                local_attachment.hash, local_attachment.size,
342                remote_attachment.hash, remote_attachment.size
343            );
344            return Ok(false);
345        }
346
347        Ok(true)
348    }
349
350    async fn download_attachments_bundle(
351        &self,
352        bucket: &str,
353        collection: &str,
354        pb: &ProgressBar,
355    ) -> Result<()> {
356        let base_url = self.get_attachments_base_url().await?;
357        let url = format!("{}/bundles/{}--{}.zip", base_url, bucket, collection);
358
359        pb.set_message(format!(
360            "Downloading attachments bundle for {}/{}",
361            bucket, collection
362        ));
363
364        // Try to download the bundle
365        match self.client.get(&url).send().await {
366            Ok(response) => {
367                if response.status().is_success() {
368                    let bytes = response.bytes().await?;
369                    let bundle_path = self
370                        .output_dir
371                        .join(DUMPS_DIR)
372                        .join(bucket)
373                        .join("attachments")
374                        .join(collection)
375                        .with_extension("zip");
376
377                    std::fs::create_dir_all(bundle_path.parent().unwrap())?;
378                    std::fs::write(&bundle_path, bytes)?;
379
380                    // Extract bundle
381                    let file = std::fs::File::open(&bundle_path)?;
382                    let mut archive = zip::ZipArchive::new(file)?;
383
384                    let extract_path = bundle_path.parent().unwrap();
385                    archive.extract(extract_path)?;
386
387                    // Clean up zip file
388                    std::fs::remove_file(bundle_path)?;
389
390                    pb.finish_with_message(format!(
391                        "Downloaded and extracted attachments bundle for {}/{}",
392                        bucket, collection
393                    ));
394                    return Ok(());
395                }
396            }
397            Err(e) => {
398                log::debug!("Failed to download or extract attachments bundle: {}", e);
399            }
400        }
401
402        Ok(())
403    }
404
    /// Write an updated collection dump to disk, count its stale attachments,
    /// and download them via `process_attachments`.
    ///
    /// `collection` must be `"bucket/name"`. When `dry_run` is true nothing is
    /// written and `attachments_updated` is always reported as 0.
    async fn process_collection_update(
        &self,
        collection: String,
        data: &mut CollectionData,
        dry_run: bool,
    ) -> Result<CollectionUpdate> {
        let mut attachments_updated = 0;
        let parts: Vec<&str> = collection.split('/').collect();

        if parts.len() != 2 {
            return Err(RemoteSettingsError::Path("Invalid collection path".into()).into());
        }

        let (bucket, name) = (parts[0], parts[1]);

        if !dry_run {
            // Write collection data
            let dumps_path = self
                .output_dir
                .join(DUMPS_DIR)
                .join(bucket)
                .join(format!("{}.json", name));

            std::fs::create_dir_all(dumps_path.parent().unwrap())?;
            // We sort both the keys and the records in search-config-v2 to make it
            // easier to read and to experiment with making changes via the dump file.
            if name == "search-config-v2" {
                data.data.sort_by(|a, b| {
                    // Order by recordType first, then by identifier within a type.
                    if a["recordType"] == b["recordType"] {
                        a["identifier"].as_str().cmp(&b["identifier"].as_str())
                    } else {
                        a["recordType"].as_str().cmp(&b["recordType"].as_str())
                    }
                });
            } else {
                // Other collections get a stable order keyed by record id.
                data.data.sort_by_key(|r| r["id"].to_string());
            }
            std::fs::write(&dumps_path, serde_json::to_string_pretty(&data)?)?;

            // Count attachments needing updates
            for record in &data.data {
                if let Some(attachment) = record.get("attachment") {
                    let record_id = record["id"].as_str().ok_or_else(|| {
                        RemoteSettingsError::Json(serde_json::Error::custom("No record id"))
                    })?;

                    let attachment: AttachmentMetadata =
                        serde_json::from_value(attachment.clone())?;
                    if !self.is_attachment_up_to_date(bucket, name, record_id, &attachment)? {
                        attachments_updated += 1;
                    }
                }
            }

            if attachments_updated > 0 {
                // Only spin up a progress bar when there is download work to do.
                let pb = Arc::new(self.multi_progress.add(ProgressBar::new(100)));
                pb.set_style(
                    ProgressStyle::default_bar()
                        .template("[{elapsed_precise}] {bar:40.cyan/blue} {msg}")
                        .unwrap(),
                );

                self.process_attachments(bucket, name, &data.data, &pb)
                    .await?;
            }
        }

        Ok(CollectionUpdate {
            collection_key: collection,
            attachments_updated,
        })
    }
477
478    pub async fn download_all(&self, dry_run: bool) -> Result<UpdateResult> {
479        std::fs::create_dir_all(self.output_dir.join(DUMPS_DIR))?;
480
481        let local_collections = self.scan_local_dumps()?;
482        if local_collections.is_empty() {
483            println!(
484                "No local collections found in {:?}",
485                self.output_dir.join(DUMPS_DIR)
486            );
487            return Ok(UpdateResult {
488                updated: vec![],
489                up_to_date: vec![],
490                not_found: vec![],
491            });
492        }
493
494        let remote_timestamps = self.fetch_timestamps().await?;
495        let mut updates_needed = Vec::new();
496        let mut up_to_date = Vec::new();
497        let mut not_found = Vec::new();
498
499        // First pass: check what needs updating
500        for (collection_key, (_, local_timestamp)) in local_collections {
501            let remote_timestamp = match remote_timestamps.get(&collection_key) {
502                Some(&timestamp) => timestamp,
503                None => {
504                    println!("Warning: Collection {} not found on remote", collection_key);
505                    not_found.push(collection_key);
506                    continue;
507                }
508            };
509
510            if local_timestamp >= remote_timestamp {
511                println!("Collection {} is up to date", collection_key);
512                up_to_date.push(collection_key);
513                continue;
514            }
515
516            println!("Collection {} needs update", collection_key);
517            updates_needed.push((collection_key, remote_timestamp));
518        }
519
520        // If it's a dry run, return early with what would be updated
521        if dry_run {
522            return Ok(UpdateResult {
523                updated: updates_needed.into_iter().map(|(key, _)| key).collect(),
524                up_to_date,
525                not_found,
526            });
527        }
528
529        // Actually perform the updates
530        let mut futures = FuturesUnordered::new();
531        let mut updated = Vec::new();
532
533        for (collection_key, remote_timestamp) in updates_needed {
534            let pb = Arc::new(self.multi_progress.add(ProgressBar::new(100)));
535            pb.set_style(
536                ProgressStyle::default_bar()
537                    .template("[{elapsed_precise}] {bar:40.cyan/blue} {msg}")
538                    .unwrap(),
539            );
540
541            let pb_clone = Arc::clone(&pb);
542            futures.push(async move {
543                let (collection, mut data) = self
544                    .fetch_collection(collection_key, remote_timestamp, pb_clone)
545                    .await?;
546                self.process_collection_update(collection, &mut data, dry_run)
547                    .await
548            });
549        }
550
551        let mut updates = Vec::new();
552        while let Some(result) = futures.next().await {
553            let update = result?;
554            updates.push(update.clone());
555            updated.push(update.collection_key.clone());
556        }
557
558        Ok(UpdateResult {
559            updated,
560            up_to_date,
561            not_found,
562        })
563    }
564
565    pub async fn download_single(&self, bucket: &str, collection_name: &str) -> Result<()> {
566        std::fs::create_dir_all(self.output_dir.join(DUMPS_DIR))?;
567
568        let collection_key = format!("{}/{}", bucket, collection_name);
569        let pb = Arc::new(self.multi_progress.add(ProgressBar::new(100)));
570        pb.set_style(
571            ProgressStyle::default_bar()
572                .template("[{elapsed_precise}] {bar:40.cyan/blue} {msg}")
573                .unwrap(),
574        );
575
576        let (collection, mut data) = self.fetch_collection(collection_key.clone(), 0, pb).await?;
577        let update = self
578            .process_collection_update(collection, &mut data, false)
579            .await?;
580
581        println!(
582            "Successfully downloaded collection to {:?}/dumps/{}/{}.json",
583            self.output_dir, bucket, collection_name
584        );
585
586        if update.attachments_updated > 0 {
587            println!("Updated {} attachments", update.attachments_updated);
588        }
589
590        Ok(())
591    }
592
593    async fn process_attachments(
594        &self,
595        bucket: &str,
596        collection: &str,
597        records: &[Value],
598        pb: &Arc<ProgressBar>,
599    ) -> Result<()> {
600        let base_url = self.get_attachments_base_url().await?;
601        let mut outdated_attachments = Vec::new();
602
603        // First pass: check which attachments need updating
604        for record in records {
605            if let Some(attachment) = record.get("attachment") {
606                let record_id = record["id"].as_str().ok_or_else(|| {
607                    RemoteSettingsError::Json(serde_json::Error::custom("No record id"))
608                })?;
609
610                let attachment: AttachmentMetadata = serde_json::from_value(attachment.clone())?;
611
612                if !self.is_attachment_up_to_date(bucket, collection, record_id, &attachment)? {
613                    outdated_attachments.push((record_id.to_string(), attachment));
614                }
615            }
616        }
617
618        if outdated_attachments.is_empty() {
619            pb.finish_with_message(format!(
620                "All attachments up to date for {}/{}",
621                bucket, collection
622            ));
623            return Ok(());
624        }
625
626        // Try bundle first if we have outdated attachments
627        if !outdated_attachments.is_empty() {
628            if let Ok(()) = self
629                .download_attachments_bundle(bucket, collection, pb)
630                .await
631            {
632                // Bundle downloaded successfully, verify all attachments now
633                let mut still_outdated = Vec::new();
634                for (record_id, attachment) in outdated_attachments {
635                    if !self.is_attachment_up_to_date(
636                        bucket,
637                        collection,
638                        &record_id,
639                        &attachment,
640                    )? {
641                        still_outdated.push((record_id, attachment));
642                    }
643                }
644                outdated_attachments = still_outdated;
645            }
646        }
647
648        // Download remaining outdated attachments individually
649        for (record_id, attachment) in outdated_attachments {
650            let (bin_path, meta_path) = self.get_attachment_paths(bucket, collection, &record_id);
651            std::fs::create_dir_all(bin_path.parent().unwrap())?;
652
653            let data = self
654                .download_attachment(&base_url, &record_id, &attachment, pb)
655                .await?;
656
657            std::fs::write(&bin_path, data)?;
658            std::fs::write(&meta_path, serde_json::to_string_pretty(&attachment)?)?;
659        }
660
661        pb.finish_with_message(format!("Updated attachments for {}/{}", bucket, collection));
662
663        Ok(())
664    }
665}