use crate::error::*;
use futures::{stream::FuturesUnordered, StreamExt};
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use remote_settings::RemoteSettingsServer;
use reqwest::Url;
use serde::de::Error;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use sha2::{Digest, Sha256};
use std::collections::HashMap;
use std::{path::PathBuf, sync::Arc};
use walkdir::WalkDir;
const DUMPS_DIR: &str = "dumps";
pub struct CollectionDownloader {
client: reqwest::Client,
multi_progress: Arc<MultiProgress>,
output_dir: PathBuf,
url: Url,
}
#[derive(Clone)]
pub struct CollectionUpdate {
collection_key: String,
attachments_updated: usize,
}
#[derive(Deserialize, Serialize)]
pub struct CollectionData {
data: Vec<Value>,
timestamp: u64,
}
pub struct UpdateResult {
updated: Vec<String>,
up_to_date: Vec<String>,
not_found: Vec<String>,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct AttachmentMetadata {
pub location: String,
pub hash: String,
pub size: u64,
}
#[derive(Debug, Deserialize)]
struct ServerInfo {
capabilities: Capabilities,
}
#[derive(Debug, Deserialize)]
struct Capabilities {
attachments: AttachmentsCapability,
}
#[derive(Debug, Deserialize)]
struct AttachmentsCapability {
base_url: String,
}
impl CollectionDownloader {
pub fn new(root_path: PathBuf) -> Self {
let url = RemoteSettingsServer::Prod
.get_url()
.expect("Cannot set RemoteSettingsServer url");
let output_dir = if root_path.ends_with("components/remote_settings") {
root_path
} else {
root_path.join("components").join("remote_settings")
};
Self {
client: reqwest::Client::new(),
multi_progress: Arc::new(MultiProgress::new()),
output_dir,
url,
}
}
pub async fn run(&self, dry_run: bool) -> Result<()> {
let result = self.download_all(dry_run).await?;
if dry_run {
println!("\nDry run summary:");
println!("- Would update {} collections", result.updated.len());
println!(
"- {} collections already up to date",
result.up_to_date.len()
);
println!(
"- {} collections not found on remote",
result.not_found.len()
);
return Ok(());
}
println!("\nExecution summary:");
if !result.updated.is_empty() {
println!("Updated collections:");
for collection in &result.updated {
println!(" - {}", collection);
}
}
if !result.up_to_date.is_empty() {
println!("Collections already up to date:");
for collection in &result.up_to_date {
println!(" - {}", collection);
}
}
if !result.not_found.is_empty() {
println!("Collections not found on remote:");
for collection in &result.not_found {
println!(" - {}", collection);
}
}
Ok(())
}
fn scan_local_dumps(&self) -> Result<HashMap<String, (String, u64)>> {
let mut collections = HashMap::new();
let dumps_dir = self.output_dir.join(DUMPS_DIR);
for entry in WalkDir::new(dumps_dir).min_depth(2).max_depth(2) {
let entry = entry?;
if entry.file_type().is_file()
&& entry.path().extension().map_or(false, |ext| ext == "json")
{
let bucket = entry
.path()
.parent()
.and_then(|p| p.file_name())
.and_then(|n| n.to_str())
.ok_or_else(|| RemoteSettingsError::Path("Invalid bucket path".into()))?;
let collection_name = entry
.path()
.file_stem()
.and_then(|n| n.to_str())
.ok_or_else(|| RemoteSettingsError::Path("Invalid collection name".into()))?;
let content = std::fs::read_to_string(entry.path())?;
let data: serde_json::Value = serde_json::from_str(&content)?;
let timestamp = data["timestamp"].as_u64().ok_or_else(|| {
RemoteSettingsError::Json(serde_json::Error::custom("No timestamp found"))
})?;
collections.insert(
format!("{}/{}", bucket, collection_name),
(bucket.to_string(), timestamp),
);
}
}
Ok(collections)
}
async fn fetch_timestamps(&self) -> Result<HashMap<String, u64>> {
let monitor_url = format!("{}/buckets/monitor/collections/changes/records", self.url);
let monitor_response: Value = self.client.get(&monitor_url).send().await?.json().await?;
Ok(monitor_response["data"]
.as_array()
.ok_or_else(|| {
RemoteSettingsError::Json(serde_json::Error::custom(
"No data array in monitor response",
))
})?
.iter()
.filter_map(|record| {
let bucket = record["bucket"].as_str()?;
let collection_name = record["collection"].as_str()?;
Some((
format!("{}/{}", bucket, collection_name),
record["last_modified"].as_u64()?,
))
})
.collect())
}
async fn fetch_collection(
&self,
collection_name: String,
last_modified: u64,
pb: Arc<ProgressBar>,
) -> Result<(String, CollectionData)> {
let parts: Vec<&str> = collection_name.split('/').collect();
if parts.len() != 2 {
return Err(RemoteSettingsError::Json(serde_json::Error::custom(
"Invalid collection name format",
))
.into());
}
let (bucket, name) = (parts[0], parts[1]);
let url = format!(
"{}/buckets/{}/collections/{}/changeset?_expected={}",
self.url, bucket, name, last_modified
);
pb.set_message(format!("Downloading {}", name));
let response = self.client.get(&url).send().await?;
let changeset: Value = response.json().await?;
let timestamp = changeset["timestamp"].as_u64().ok_or_else(|| {
RemoteSettingsError::Json(serde_json::Error::custom("No timestamp in changeset"))
})?;
pb.finish_with_message(format!("Downloaded {}", name));
Ok((
collection_name,
CollectionData {
data: changeset["changes"]
.as_array()
.unwrap_or(&Vec::new())
.to_vec(),
timestamp,
},
))
}
async fn get_attachments_base_url(&self) -> Result<String> {
let server_info: ServerInfo = self
.client
.get(self.url.as_str())
.send()
.await?
.json()
.await?;
Ok(server_info.capabilities.attachments.base_url)
}
async fn download_attachment(
&self,
base_url: &str,
record_id: &str,
attachment: &AttachmentMetadata,
pb: &ProgressBar,
) -> Result<Vec<u8>> {
let url = format!("{}{}", base_url, attachment.location);
pb.set_message(format!("Downloading attachment for record {}", record_id));
let response = self.client.get(&url).send().await?;
let bytes = response.bytes().await?;
let data = bytes.to_vec();
if data.len() as u64 != attachment.size {
return Err(RemoteSettingsError::Attachment(format!(
"Size mismatch for attachment {}: expected {}, got {}",
record_id,
attachment.size,
data.len()
))
.into());
}
let mut hasher = Sha256::new();
hasher.update(&data);
let hash = format!("{:x}", hasher.finalize());
if hash != attachment.hash {
return Err(RemoteSettingsError::Attachment(format!(
"Hash mismatch for attachment {}: expected {}, got {}",
record_id, attachment.hash, hash
))
.into());
}
pb.set_message(format!("Verified attachment for record {}", record_id));
Ok(data)
}
fn get_attachment_paths(
&self,
bucket: &str,
collection: &str,
record_id: &str,
) -> (PathBuf, PathBuf) {
let base_path = self
.output_dir
.join(DUMPS_DIR)
.join(bucket)
.join("attachments")
.join(collection);
(
base_path.join(record_id),
base_path.join(format!("{}.meta.json", record_id)),
)
}
fn is_attachment_up_to_date(
&self,
bucket: &str,
collection: &str,
record_id: &str,
remote_attachment: &AttachmentMetadata,
) -> Result<bool> {
let (bin_path, meta_path) = self.get_attachment_paths(bucket, collection, record_id);
if !bin_path.exists() || !meta_path.exists() {
log::debug!(
"Attachment files missing for {}/{}/{}",
bucket,
collection,
record_id
);
return Ok(false);
}
let meta_content = std::fs::read_to_string(&meta_path)?;
let local_attachment: AttachmentMetadata = serde_json::from_str(&meta_content)?;
if local_attachment.hash != remote_attachment.hash
|| local_attachment.size != remote_attachment.size
{
log::debug!(
"Attachment metadata mismatch for {}/{}/{}: local hash={}, size={}, remote hash={}, size={}",
bucket, collection, record_id,
local_attachment.hash, local_attachment.size,
remote_attachment.hash, remote_attachment.size
);
return Ok(false);
}
Ok(true)
}
async fn download_attachments_bundle(
&self,
bucket: &str,
collection: &str,
pb: &ProgressBar,
) -> Result<()> {
let base_url = self.get_attachments_base_url().await?;
let url = format!("{}/bundles/{}--{}.zip", base_url, bucket, collection);
pb.set_message(format!(
"Downloading attachments bundle for {}/{}",
bucket, collection
));
match self.client.get(&url).send().await {
Ok(response) => {
if response.status().is_success() {
let bytes = response.bytes().await?;
let bundle_path = self
.output_dir
.join(DUMPS_DIR)
.join(bucket)
.join("attachments")
.join(collection)
.with_extension("zip");
std::fs::create_dir_all(bundle_path.parent().unwrap())?;
std::fs::write(&bundle_path, bytes)?;
let file = std::fs::File::open(&bundle_path)?;
let mut archive = zip::ZipArchive::new(file)?;
let extract_path = bundle_path.parent().unwrap();
archive.extract(extract_path)?;
std::fs::remove_file(bundle_path)?;
pb.finish_with_message(format!(
"Downloaded and extracted attachments bundle for {}/{}",
bucket, collection
));
return Ok(());
}
}
Err(e) => {
log::debug!("Failed to download or extract attachments bundle: {}", e);
}
}
Ok(())
}
async fn process_collection_update(
&self,
collection: String,
data: CollectionData,
dry_run: bool,
) -> Result<CollectionUpdate> {
let mut attachments_updated = 0;
let parts: Vec<&str> = collection.split('/').collect();
if parts.len() != 2 {
return Err(RemoteSettingsError::Path("Invalid collection path".into()).into());
}
let (bucket, name) = (parts[0], parts[1]);
if !dry_run {
let dumps_path = self
.output_dir
.join(DUMPS_DIR)
.join(bucket)
.join(format!("{}.json", name));
std::fs::create_dir_all(dumps_path.parent().unwrap())?;
std::fs::write(&dumps_path, serde_json::to_string_pretty(&data)?)?;
for record in &data.data {
if let Some(attachment) = record.get("attachment") {
let record_id = record["id"].as_str().ok_or_else(|| {
RemoteSettingsError::Json(serde_json::Error::custom("No record id"))
})?;
let attachment: AttachmentMetadata =
serde_json::from_value(attachment.clone())?;
if !self.is_attachment_up_to_date(bucket, name, record_id, &attachment)? {
attachments_updated += 1;
}
}
}
if attachments_updated > 0 {
let pb = Arc::new(self.multi_progress.add(ProgressBar::new(100)));
pb.set_style(
ProgressStyle::default_bar()
.template("[{elapsed_precise}] {bar:40.cyan/blue} {msg}")
.unwrap(),
);
self.process_attachments(bucket, name, &data.data, &pb)
.await?;
}
}
Ok(CollectionUpdate {
collection_key: collection,
attachments_updated,
})
}
pub async fn download_all(&self, dry_run: bool) -> Result<UpdateResult> {
std::fs::create_dir_all(self.output_dir.join(DUMPS_DIR))?;
let local_collections = self.scan_local_dumps()?;
if local_collections.is_empty() {
println!(
"No local collections found in {:?}",
self.output_dir.join(DUMPS_DIR)
);
return Ok(UpdateResult {
updated: vec![],
up_to_date: vec![],
not_found: vec![],
});
}
let remote_timestamps = self.fetch_timestamps().await?;
let mut updates_needed = Vec::new();
let mut up_to_date = Vec::new();
let mut not_found = Vec::new();
for (collection_key, (_, local_timestamp)) in local_collections {
let remote_timestamp = match remote_timestamps.get(&collection_key) {
Some(×tamp) => timestamp,
None => {
println!("Warning: Collection {} not found on remote", collection_key);
not_found.push(collection_key);
continue;
}
};
if local_timestamp >= remote_timestamp {
println!("Collection {} is up to date", collection_key);
up_to_date.push(collection_key);
continue;
}
println!("Collection {} needs update", collection_key);
updates_needed.push((collection_key, remote_timestamp));
}
if dry_run {
return Ok(UpdateResult {
updated: updates_needed.into_iter().map(|(key, _)| key).collect(),
up_to_date,
not_found,
});
}
let mut futures = FuturesUnordered::new();
let mut updated = Vec::new();
for (collection_key, remote_timestamp) in updates_needed {
let pb = Arc::new(self.multi_progress.add(ProgressBar::new(100)));
pb.set_style(
ProgressStyle::default_bar()
.template("[{elapsed_precise}] {bar:40.cyan/blue} {msg}")
.unwrap(),
);
let pb_clone = Arc::clone(&pb);
futures.push(async move {
let (collection, data) = self
.fetch_collection(collection_key, remote_timestamp, pb_clone)
.await?;
self.process_collection_update(collection, data, dry_run)
.await
});
}
let mut updates = Vec::new();
while let Some(result) = futures.next().await {
let update = result?;
updates.push(update.clone());
updated.push(update.collection_key.clone());
}
Ok(UpdateResult {
updated,
up_to_date,
not_found,
})
}
pub async fn download_single(&self, bucket: &str, collection_name: &str) -> Result<()> {
std::fs::create_dir_all(self.output_dir.join(DUMPS_DIR))?;
let collection_key = format!("{}/{}", bucket, collection_name);
let pb = Arc::new(self.multi_progress.add(ProgressBar::new(100)));
pb.set_style(
ProgressStyle::default_bar()
.template("[{elapsed_precise}] {bar:40.cyan/blue} {msg}")
.unwrap(),
);
let (collection, data) = self.fetch_collection(collection_key.clone(), 0, pb).await?;
let update = self
.process_collection_update(collection, data, false)
.await?;
println!(
"Successfully downloaded collection to {:?}/dumps/{}/{}.json",
self.output_dir, bucket, collection_name
);
if update.attachments_updated > 0 {
println!("Updated {} attachments", update.attachments_updated);
}
Ok(())
}
async fn process_attachments(
&self,
bucket: &str,
collection: &str,
records: &[Value],
pb: &Arc<ProgressBar>,
) -> Result<()> {
let base_url = self.get_attachments_base_url().await?;
let mut outdated_attachments = Vec::new();
for record in records {
if let Some(attachment) = record.get("attachment") {
let record_id = record["id"].as_str().ok_or_else(|| {
RemoteSettingsError::Json(serde_json::Error::custom("No record id"))
})?;
let attachment: AttachmentMetadata = serde_json::from_value(attachment.clone())?;
if !self.is_attachment_up_to_date(bucket, collection, record_id, &attachment)? {
outdated_attachments.push((record_id.to_string(), attachment));
}
}
}
if outdated_attachments.is_empty() {
pb.finish_with_message(format!(
"All attachments up to date for {}/{}",
bucket, collection
));
return Ok(());
}
if !outdated_attachments.is_empty() {
if let Ok(()) = self
.download_attachments_bundle(bucket, collection, pb)
.await
{
let mut still_outdated = Vec::new();
for (record_id, attachment) in outdated_attachments {
if !self.is_attachment_up_to_date(
bucket,
collection,
&record_id,
&attachment,
)? {
still_outdated.push((record_id, attachment));
}
}
outdated_attachments = still_outdated;
}
}
for (record_id, attachment) in outdated_attachments {
let (bin_path, meta_path) = self.get_attachment_paths(bucket, collection, &record_id);
std::fs::create_dir_all(bin_path.parent().unwrap())?;
let data = self
.download_attachment(&base_url, &record_id, &attachment, pb)
.await?;
std::fs::write(&bin_path, data)?;
std::fs::write(&meta_path, serde_json::to_string_pretty(&attachment)?)?;
}
pb.finish_with_message(format!("Updated attachments for {}/{}", bucket, collection));
Ok(())
}
}