1use crate::error::Error;
9#[cfg(feature = "sync-client")]
10use crate::error::ErrorResponse;
11
12use crate::error::warn;
13use std::collections::HashMap;
14use std::time;
15
16use serde::{ser, Serialize, Serializer};
17
18#[cfg(test)]
20fn assert_json<T>(v: &T, expected: serde_json::Value)
21where
22 T: serde::Serialize + ?Sized,
23{
24 assert_eq!(
25 serde_json::to_value(v).expect("should get a value"),
26 expected
27 );
28}
29
30#[derive(Debug, Serialize)]
32struct WhenTook {
33 when: f64,
34 #[serde(skip_serializing_if = "crate::skip_if_default")]
35 took: u64,
36}
37
38#[allow(dead_code)]
41#[derive(Debug)]
42enum Stopwatch {
43 Started(time::SystemTime, time::Instant),
44 Finished(WhenTook),
45}
46
47impl Default for Stopwatch {
48 fn default() -> Self {
49 Stopwatch::new()
50 }
51}
52
53impl Stopwatch {
54 fn new() -> Self {
55 Stopwatch::Started(time::SystemTime::now(), time::Instant::now())
56 }
57
58 #[cfg(test)]
60 fn finished(&self) -> Self {
61 Stopwatch::Finished(WhenTook { when: 0.0, took: 0 })
62 }
63
64 #[cfg(not(test))]
65 fn finished(&self) -> Self {
66 match self {
67 Stopwatch::Started(st, si) => {
68 let std = st.duration_since(time::UNIX_EPOCH).unwrap_or_default();
69 let when = std.as_secs() as f64; let sid = si.elapsed();
72 let took = sid.as_secs() * 1000 + (u64::from(sid.subsec_nanos()) / 1_000_000);
73 Stopwatch::Finished(WhenTook { when, took })
74 }
75 _ => {
76 unreachable!("can't finish twice");
77 }
78 }
79 }
80}
81
82impl Serialize for Stopwatch {
83 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
84 where
85 S: Serializer,
86 {
87 match self {
88 Stopwatch::Started(_, _) => Err(ser::Error::custom("StopWatch has not been finished")),
89 Stopwatch::Finished(c) => c.serialize(serializer),
90 }
91 }
92}
93
94#[cfg(test)]
95mod stopwatch_tests {
96 use super::*;
97
98 #[derive(Debug, Serialize)]
101 struct WT {
102 #[serde(flatten)]
103 sw: Stopwatch,
104 }
105
106 #[test]
107 fn test_not_finished() {
108 let wt = WT {
109 sw: Stopwatch::new(),
110 };
111 serde_json::to_string(&wt).expect_err("unfinished stopwatch should fail");
112 }
113
114 #[test]
115 fn test() {
116 assert_json(
117 &WT {
118 sw: Stopwatch::Finished(WhenTook { when: 1.0, took: 1 }),
119 },
120 serde_json::json!({"when": 1.0, "took": 1}),
121 );
122 assert_json(
123 &WT {
124 sw: Stopwatch::Finished(WhenTook { when: 1.0, took: 0 }),
125 },
126 serde_json::json!({"when": 1.0}),
127 );
128 }
129}
130
131#[derive(Debug, Serialize)]
134pub struct Event {
135 object: &'static str,
137
138 method: &'static str,
139
140 #[serde(skip_serializing_if = "Option::is_none")]
142 value: Option<&'static str>,
143
144 #[serde(skip_serializing_if = "Option::is_none")]
146 extra: Option<HashMap<&'static str, String>>,
147}
148
149impl Event {
150 pub fn new(object: &'static str, method: &'static str) -> Self {
151 assert!(object.len() <= 20);
152 assert!(method.len() <= 20);
153 Self {
154 object,
155 method,
156 value: None,
157 extra: None,
158 }
159 }
160
161 pub fn value(mut self, v: &'static str) -> Self {
162 assert!(v.len() <= 80);
163 self.value = Some(v);
164 self
165 }
166
167 pub fn extra(mut self, key: &'static str, val: String) -> Self {
168 assert!(key.len() <= 15);
169 assert!(val.len() <= 85);
170 match self.extra {
171 None => self.extra = Some(HashMap::new()),
172 Some(ref e) => assert!(e.len() < 10),
173 }
174 self.extra.as_mut().unwrap().insert(key, val);
175 self
176 }
177}
178
179#[cfg(test)]
180mod test_events {
181 use super::*;
182
183 #[test]
184 #[should_panic]
185 fn test_invalid_length_ctor() {
186 Event::new("A very long object value", "Method");
187 }
188
189 #[test]
190 #[should_panic]
191 fn test_invalid_length_extra_key() {
192 Event::new("O", "M").extra("A very long key value", "v".to_string());
193 }
194
195 #[test]
196 #[should_panic]
197 fn test_invalid_length_extra_val() {
198 let l = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ
199 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
200 Event::new("O", "M").extra("k", l.to_string());
201 }
202
203 #[test]
204 #[should_panic]
205 fn test_too_many_extras() {
206 let l = "abcdefghijk";
207 let mut e = Event::new("Object", "Method");
208 for i in 0..l.len() {
209 e = e.extra(&l[i..=i], "v".to_string());
210 }
211 }
212
213 #[test]
214 fn test_json() {
215 assert_json(
216 &Event::new("Object", "Method").value("Value"),
217 serde_json::json!({"object": "Object", "method": "Method", "value": "Value"}),
218 );
219
220 assert_json(
221 &Event::new("Object", "Method").extra("one", "one".to_string()),
222 serde_json::json!({"object": "Object",
223 "method": "Method",
224 "extra": {"one": "one"}
225 }),
226 )
227 }
228}
229
230#[derive(Debug, Serialize)]
232#[serde(tag = "name")]
233pub enum SyncFailure {
234 #[serde(rename = "shutdownerror")]
235 Shutdown,
236
237 #[serde(rename = "othererror")]
238 Other { error: String },
239
240 #[serde(rename = "unexpectederror")]
241 Unexpected { error: String },
242
243 #[serde(rename = "autherror")]
244 Auth { from: &'static str },
245
246 #[serde(rename = "httperror")]
247 Http { code: u16 },
248}
249
250#[cfg(test)]
251mod test {
252 use super::*;
253
254 #[test]
255 fn reprs() {
256 assert_json(
257 &SyncFailure::Shutdown,
258 serde_json::json!({"name": "shutdownerror"}),
259 );
260
261 assert_json(
262 &SyncFailure::Other {
263 error: "dunno".to_string(),
264 },
265 serde_json::json!({"name": "othererror", "error": "dunno"}),
266 );
267
268 assert_json(
269 &SyncFailure::Unexpected {
270 error: "dunno".to_string(),
271 },
272 serde_json::json!({"name": "unexpectederror", "error": "dunno"}),
273 );
274
275 assert_json(
276 &SyncFailure::Auth { from: "FxA" },
277 serde_json::json!({"name": "autherror", "from": "FxA"}),
278 );
279
280 assert_json(
281 &SyncFailure::Http { code: 500 },
282 serde_json::json!({"name": "httperror", "code": 500}),
283 );
284 }
285}
286
287#[derive(Debug, Default, Serialize)]
289pub struct EngineIncoming {
290 #[serde(skip_serializing_if = "crate::skip_if_default")]
291 applied: u32,
292
293 #[serde(skip_serializing_if = "crate::skip_if_default")]
294 failed: u32,
295
296 #[serde(rename = "newFailed")]
297 #[serde(skip_serializing_if = "crate::skip_if_default")]
298 new_failed: u32,
299
300 #[serde(skip_serializing_if = "crate::skip_if_default")]
301 reconciled: u32,
302}
303
304impl EngineIncoming {
305 pub fn new() -> Self {
306 Self {
307 ..Default::default()
308 }
309 }
310
311 fn is_empty(inc: &Option<Self>) -> bool {
313 match inc {
314 Some(a) => a.applied == 0 && a.failed == 0 && a.new_failed == 0 && a.reconciled == 0,
315 None => true,
316 }
317 }
318
319 #[inline]
321 pub fn applied(&mut self, n: u32) {
322 self.applied += n;
323 }
324
325 #[inline]
327 pub fn failed(&mut self, n: u32) {
328 self.failed += n;
329 }
330
331 #[inline]
333 pub fn new_failed(&mut self, n: u32) {
334 self.new_failed += n;
335 }
336
337 #[inline]
339 pub fn reconciled(&mut self, n: u32) {
340 self.reconciled += n;
341 }
342
343 fn accum(&mut self, other: &EngineIncoming) {
346 self.applied += other.applied;
347 self.failed += other.failed;
348 self.new_failed += other.new_failed;
349 self.reconciled += other.reconciled;
350 }
351
352 #[inline]
354 pub fn get_applied(&self) -> u32 {
355 self.applied
356 }
357
358 #[inline]
360 pub fn get_failed(&self) -> u32 {
361 self.failed
362 }
363
364 #[inline]
366 pub fn get_new_failed(&self) -> u32 {
367 self.new_failed
368 }
369
370 #[inline]
372 pub fn get_reconciled(&self) -> u32 {
373 self.reconciled
374 }
375}
376
377#[derive(Debug, Default, Serialize)]
379pub struct EngineOutgoing {
380 #[serde(skip_serializing_if = "crate::skip_if_default")]
381 sent: usize,
382
383 #[serde(skip_serializing_if = "crate::skip_if_default")]
384 failed: usize,
385}
386
387impl EngineOutgoing {
388 pub fn new() -> Self {
389 EngineOutgoing {
390 ..Default::default()
391 }
392 }
393
394 #[inline]
395 pub fn sent(&mut self, n: usize) {
396 self.sent += n;
397 }
398
399 #[inline]
400 pub fn failed(&mut self, n: usize) {
401 self.failed += n;
402 }
403}
404
405#[derive(Debug, Serialize)]
407pub struct Engine {
408 name: String,
409
410 #[serde(flatten)]
411 when_took: Stopwatch,
412
413 #[serde(skip_serializing_if = "EngineIncoming::is_empty")]
414 incoming: Option<EngineIncoming>,
415
416 #[serde(skip_serializing_if = "Vec::is_empty")]
417 outgoing: Vec<EngineOutgoing>, #[serde(skip_serializing_if = "Option::is_none")]
420 #[serde(rename = "failureReason")]
421 failure: Option<SyncFailure>,
422
423 #[serde(skip_serializing_if = "Option::is_none")]
424 validation: Option<Validation>,
425}
426
427impl Engine {
428 pub fn new(name: impl Into<String>) -> Self {
429 Self {
430 name: name.into(),
431 when_took: Stopwatch::new(),
432 incoming: None,
433 outgoing: Vec::new(),
434 failure: None,
435 validation: None,
436 }
437 }
438
439 pub fn incoming(&mut self, inc: EngineIncoming) {
440 match &mut self.incoming {
441 None => self.incoming = Some(inc),
442 Some(ref mut existing) => existing.accum(&inc),
443 };
444 }
445
446 pub fn get_incoming(&self) -> &Option<EngineIncoming> {
448 &self.incoming
449 }
450
451 pub fn outgoing(&mut self, out: EngineOutgoing) {
452 self.outgoing.push(out);
453 }
454
455 pub fn failure(&mut self, err: impl Into<SyncFailure>) {
456 let failure = err.into();
459 if self.failure.is_none() {
460 self.failure = Some(failure);
461 } else {
462 warn!(
463 "engine already has recorded a failure of {:?} - ignoring {:?}",
464 &self.failure, &failure
465 );
466 }
467 }
468
469 pub fn validation(&mut self, v: Validation) {
470 assert!(self.validation.is_none());
471 self.validation = Some(v);
472 }
473
474 fn finished(&mut self) {
475 self.when_took = self.when_took.finished();
476 }
477}
478
479#[derive(Debug, Default, Serialize)]
480pub struct Validation {
481 version: u32,
482
483 #[serde(skip_serializing_if = "Vec::is_empty")]
484 problems: Vec<Problem>,
485
486 #[serde(skip_serializing_if = "Option::is_none")]
487 #[serde(rename = "failureReason")]
488 failure: Option<SyncFailure>,
489}
490
491impl Validation {
492 pub fn with_version(version: u32) -> Validation {
493 Validation {
494 version,
495 ..Validation::default()
496 }
497 }
498
499 pub fn problem(&mut self, name: &'static str, count: usize) -> &mut Self {
500 if count > 0 {
501 self.problems.push(Problem { name, count });
502 }
503 self
504 }
505}
506
507#[derive(Debug, Default, Serialize)]
508pub struct Problem {
509 name: &'static str,
510 #[serde(skip_serializing_if = "crate::skip_if_default")]
511 count: usize,
512}
513
514#[cfg(test)]
515mod engine_tests {
516 use super::*;
517
518 #[test]
519 fn test_engine() {
520 let mut e = Engine::new("test_engine");
521 e.finished();
522 assert_json(&e, serde_json::json!({"name": "test_engine", "when": 0.0}));
523 }
524
525 #[test]
526 fn test_engine_not_finished() {
527 let e = Engine::new("test_engine");
528 serde_json::to_value(e).expect_err("unfinished stopwatch should fail");
529 }
530
531 #[test]
532 fn test_incoming() {
533 let mut i = EngineIncoming::new();
534 i.applied(1);
535 i.failed(2);
536 let mut e = Engine::new("TestEngine");
537 e.incoming(i);
538 e.finished();
539 assert_json(
540 &e,
541 serde_json::json!({"name": "TestEngine", "when": 0.0, "incoming": {"applied": 1, "failed": 2}}),
542 );
543 }
544
545 #[test]
546 fn test_incoming_accum() {
547 let mut e = Engine::new("TestEngine");
548 let mut i1 = EngineIncoming::new();
549 i1.applied(1);
550 i1.failed(2);
551 e.incoming(i1);
552 let mut i2 = EngineIncoming::new();
553 i2.applied(1);
554 i2.failed(1);
555 i2.reconciled(4);
556 e.incoming(i2);
557 e.finished();
558 assert_json(
559 &e,
560 serde_json::json!({"name": "TestEngine", "when": 0.0, "incoming": {"applied": 2, "failed": 3, "reconciled": 4}}),
561 );
562 }
563
564 #[test]
565 fn test_outgoing() {
566 let mut o = EngineOutgoing::new();
567 o.sent(2);
568 o.failed(1);
569 let mut e = Engine::new("TestEngine");
570 e.outgoing(o);
571 e.finished();
572 assert_json(
573 &e,
574 serde_json::json!({"name": "TestEngine", "when": 0.0, "outgoing": [{"sent": 2, "failed": 1}]}),
575 );
576 }
577
578 #[test]
579 fn test_failure() {
580 let mut e = Engine::new("TestEngine");
581 e.failure(SyncFailure::Http { code: 500 });
582 e.finished();
583 assert_json(
584 &e,
585 serde_json::json!({"name": "TestEngine",
586 "when": 0.0,
587 "failureReason": {"name": "httperror", "code": 500}
588 }),
589 );
590 }
591
592 #[test]
593 fn test_raw() {
594 let mut e = Engine::new("TestEngine");
595 let mut inc = EngineIncoming::new();
596 inc.applied(10);
597 e.incoming(inc);
598 let mut out = EngineOutgoing::new();
599 out.sent(1);
600 e.outgoing(out);
601 e.failure(SyncFailure::Http { code: 500 });
602 e.finished();
603
604 assert_eq!(e.outgoing.len(), 1);
605 assert_eq!(e.incoming.as_ref().unwrap().applied, 10);
606 assert_eq!(e.outgoing[0].sent, 1);
607 assert!(e.failure.is_some());
608 serde_json::to_string(&e).expect("should get json");
609 }
610}
611
612#[derive(Debug, Serialize, Default)]
614pub struct SyncTelemetry {
615 #[serde(flatten)]
616 when_took: Stopwatch,
617
618 #[serde(skip_serializing_if = "Vec::is_empty")]
619 engines: Vec<Engine>,
620
621 #[serde(skip_serializing_if = "Option::is_none")]
622 #[serde(rename = "failureReason")]
623 failure: Option<SyncFailure>,
624}
625
626impl SyncTelemetry {
627 pub fn new() -> Self {
628 Default::default()
629 }
630
631 pub fn engine(&mut self, mut e: Engine) {
632 e.finished();
633 self.engines.push(e);
634 }
635
636 pub fn failure(&mut self, failure: SyncFailure) {
637 assert!(self.failure.is_none());
638 self.failure = Some(failure);
639 }
640
641 pub fn finished(&mut self) {
645 self.when_took = self.when_took.finished();
646 }
647}
648
649#[cfg(test)]
650mod sync_tests {
651 use super::*;
652
653 #[test]
654 fn test_accum() {
655 let mut s = SyncTelemetry::new();
656 let mut inc = EngineIncoming::new();
657 inc.applied(10);
658 let mut e = Engine::new("test_engine");
659 e.incoming(inc);
660 e.failure(SyncFailure::Http { code: 500 });
661 e.finished();
662 s.engine(e);
663 s.finished();
664
665 assert_json(
666 &s,
667 serde_json::json!({
668 "when": 0.0,
669 "engines": [{
670 "name":"test_engine",
671 "when":0.0,
672 "incoming": {
673 "applied": 10
674 },
675 "failureReason": {
676 "name": "httperror",
677 "code": 500
678 }
679 }]
680 }),
681 );
682 }
683
684 #[test]
685 fn test_multi_engine() {
686 let mut inc_e1 = EngineIncoming::new();
687 inc_e1.applied(1);
688 let mut e1 = Engine::new("test_engine");
689 e1.incoming(inc_e1);
690
691 let mut inc_e2 = EngineIncoming::new();
692 inc_e2.failed(1);
693 let mut e2 = Engine::new("test_engine_2");
694 e2.incoming(inc_e2);
695 let mut out_e2 = EngineOutgoing::new();
696 out_e2.sent(1);
697 e2.outgoing(out_e2);
698
699 let mut s = SyncTelemetry::new();
700 s.engine(e1);
701 s.engine(e2);
702 s.failure(SyncFailure::Http { code: 500 });
703 s.finished();
704 assert_json(
705 &s,
706 serde_json::json!({
707 "when": 0.0,
708 "engines": [{
709 "name": "test_engine",
710 "when": 0.0,
711 "incoming": {
712 "applied": 1
713 }
714 },{
715 "name": "test_engine_2",
716 "when": 0.0,
717 "incoming": {
718 "failed": 1
719 },
720 "outgoing": [{
721 "sent": 1
722 }]
723 }],
724 "failureReason": {
725 "name": "httperror",
726 "code": 500
727 }
728 }),
729 );
730 }
731}
732
733#[derive(Debug, Serialize, Default)]
746pub struct SyncTelemetryPing {
747 version: u32,
748
749 uid: Option<String>,
750
751 #[serde(skip_serializing_if = "Vec::is_empty")]
752 events: Vec<Event>,
753
754 #[serde(skip_serializing_if = "Vec::is_empty")]
755 syncs: Vec<SyncTelemetry>,
756}
757
758impl SyncTelemetryPing {
759 pub fn new() -> Self {
760 Self {
761 version: 1,
762 ..Default::default()
763 }
764 }
765
766 pub fn uid(&mut self, uid: String) {
767 if let Some(ref existing) = self.uid {
768 if *existing != uid {
769 warn!("existing uid ${} being replaced by {}", existing, uid);
770 }
771 }
772 self.uid = Some(uid);
773 }
774
775 pub fn sync(&mut self, mut s: SyncTelemetry) {
776 s.finished();
777 self.syncs.push(s);
778 }
779
780 pub fn event(&mut self, e: Event) {
781 self.events.push(e);
782 }
783}
784
785#[cfg(test)]
786mod ping_tests {
787 use super::*;
788 #[test]
789 fn test_ping() {
790 let engine = Engine::new("test");
791 let mut s = SyncTelemetry::new();
792 s.engine(engine);
793 let mut p = SyncTelemetryPing::new();
794 p.uid("user-id".into());
795 p.sync(s);
796 let event = Event::new("foo", "bar");
797 p.event(event);
798 assert_json(
799 &p,
800 serde_json::json!({
801 "events": [{
802 "method": "bar", "object": "foo"
803 }],
804 "syncs": [{
805 "engines": [{
806 "name": "test", "when": 0.0
807 }],
808 "when": 0.0
809 }],
810 "uid": "user-id",
811 "version": 1
812 }),
813 );
814 }
815}
816
817impl From<&Error> for SyncFailure {
818 fn from(e: &Error) -> SyncFailure {
819 match e {
820 #[cfg(feature = "sync-client")]
821 Error::TokenserverHttpError(status) => {
822 if *status == 401 {
823 SyncFailure::Auth {
824 from: "tokenserver",
825 }
826 } else {
827 SyncFailure::Http { code: *status }
828 }
829 }
830 #[cfg(feature = "sync-client")]
831 Error::BackoffError(_) => SyncFailure::Http { code: 503 },
832 #[cfg(feature = "sync-client")]
833 Error::StorageHttpError(ref e) => match e {
834 ErrorResponse::NotFound { .. } => SyncFailure::Http { code: 404 },
835 ErrorResponse::Unauthorized { .. } => SyncFailure::Auth { from: "storage" },
836 ErrorResponse::PreconditionFailed { .. } => SyncFailure::Http { code: 412 },
837 ErrorResponse::ServerError { status, .. } => SyncFailure::Http { code: *status },
838 ErrorResponse::RequestFailed { status, .. } => SyncFailure::Http { code: *status },
839 },
840 #[cfg(feature = "crypto")]
841 Error::CryptoError(ref e) => SyncFailure::Unexpected {
842 error: e.to_string(),
843 },
844 #[cfg(feature = "sync-client")]
845 Error::RequestError(ref e) => SyncFailure::Unexpected {
846 error: e.to_string(),
847 },
848 #[cfg(feature = "sync-client")]
849 Error::UnexpectedStatus(ref e) => SyncFailure::Http { code: e.status },
850 Error::Interrupted(ref e) => SyncFailure::Unexpected {
851 error: e.to_string(),
852 },
853 e => SyncFailure::Other {
854 error: e.to_string(),
855 },
856 }
857 }
858}