sync15/client/
request.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use super::storage_client::Sync15ClientResponse;
6use crate::bso::OutgoingEncryptedBso;
7use crate::error::{self, debug, info, warn, Error as ErrorKind, Result};
8use crate::ServerTimestamp;
9use serde_derive::*;
10use std::collections::HashMap;
11use std::default::Default;
12use std::ops::Deref;
13use sync_guid::Guid;
14use viaduct::status_codes;
15
/// Tracks usage against a pair of (byte, count) limits for a PostQueue, such as
/// (max_post_bytes, max_post_records) or (max_total_bytes, max_total_records).
#[derive(Debug, Clone)]
struct LimitTracker {
    /// Upper bound (inclusive) on the sum of the record sizes added so far.
    max_bytes: usize,
    /// Upper bound (inclusive) on the number of records added so far.
    max_records: usize,
    /// Sum of the sizes passed to `record_added` since the last `clear`.
    cur_bytes: usize,
    /// Number of `record_added` calls since the last `clear`.
    cur_records: usize,
}

impl LimitTracker {
    /// Creates a tracker with the given limits and nothing counted against them.
    pub fn new(max_bytes: usize, max_records: usize) -> LimitTracker {
        LimitTracker {
            max_bytes,
            max_records,
            cur_bytes: 0,
            cur_records: 0,
        }
    }

    /// Resets the running totals; the limits themselves are kept.
    pub fn clear(&mut self) {
        self.cur_records = 0;
        self.cur_bytes = 0;
    }

    /// Returns true if one more record of `payload_size` bytes fits within both limits.
    pub fn can_add_record(&self, payload_size: usize) -> bool {
        // Desktop does the cur_bytes check as exclusive, but we shouldn't see any servers that
        // don't have https://github.com/mozilla-services/server-syncstorage/issues/73
        //
        // The sum is computed with `checked_add`: limits may be `usize::MAX`, so a plain `+`
        // could overflow (panicking in debug builds, wrapping to a bogus "fits" in release).
        // A sum that does not fit in a `usize` can never be within the limit.
        self.cur_records < self.max_records
            && self
                .cur_bytes
                .checked_add(payload_size)
                .map_or(false, |total| total <= self.max_bytes)
    }

    /// Returns true if a record of `record_size` bytes is rejected regardless of the current
    /// totals. Note this is deliberately conservative: a record of exactly `max_bytes` is
    /// treated as too large.
    pub fn can_never_add(&self, record_size: usize) -> bool {
        record_size >= self.max_bytes
    }

    /// Counts a record of `record_size` bytes against the limits.
    ///
    /// # Panics
    ///
    /// Panics if `can_add_record(record_size)` is false; callers must check first.
    pub fn record_added(&mut self, record_size: usize) {
        assert!(
            self.can_add_record(record_size),
            "LimitTracker::record_added caller must check can_add_record"
        );
        self.cur_records += 1;
        // Cannot overflow: `can_add_record` just verified the checked sum.
        self.cur_bytes += record_size;
    }
}
60
61#[derive(Serialize, Deserialize, Debug, Clone)]
62pub struct InfoConfiguration {
63    /// The maximum size in bytes of the overall HTTP request body that will be accepted by the
64    /// server.
65    #[serde(default = "default_max_request_bytes")]
66    pub max_request_bytes: usize,
67
68    /// The maximum number of records that can be uploaded to a collection in a single POST request.
69    #[serde(default = "usize::max_value")]
70    pub max_post_records: usize,
71
72    /// The maximum combined size in bytes of the record payloads that can be uploaded to a
73    /// collection in a single POST request.
74    #[serde(default = "usize::max_value")]
75    pub max_post_bytes: usize,
76
77    /// The maximum total number of records that can be uploaded to a collection as part of a
78    /// batched upload.
79    #[serde(default = "usize::max_value")]
80    pub max_total_records: usize,
81
82    /// The maximum total combined size in bytes of the record payloads that can be uploaded to a
83    /// collection as part of a batched upload.
84    #[serde(default = "usize::max_value")]
85    pub max_total_bytes: usize,
86
87    /// The maximum size of an individual BSO payload, in bytes.
88    #[serde(default = "default_max_record_payload_bytes")]
89    pub max_record_payload_bytes: usize,
90}
91
// serde's `default = "..."` attribute names a function rather than a value, so each of these
// defaults needs its own small function.

/// Fallback for `max_request_bytes`: 260 KiB.
fn default_max_request_bytes() -> usize {
    const KIB: usize = 1024;
    260 * KIB
}

/// Fallback for `max_record_payload_bytes`: 256 KiB.
fn default_max_record_payload_bytes() -> usize {
    const KIB: usize = 1024;
    256 * KIB
}
99
100impl Default for InfoConfiguration {
101    #[inline]
102    fn default() -> InfoConfiguration {
103        InfoConfiguration {
104            max_request_bytes: default_max_request_bytes(),
105            max_record_payload_bytes: default_max_record_payload_bytes(),
106            max_post_records: usize::MAX,
107            max_post_bytes: usize::MAX,
108            max_total_records: usize::MAX,
109            max_total_bytes: usize::MAX,
110        }
111    }
112}
113
114#[derive(Clone, Debug, Default, Deserialize, Serialize)]
115pub struct InfoCollections(pub(crate) HashMap<String, ServerTimestamp>);
116
117impl InfoCollections {
118    pub fn new(collections: HashMap<String, ServerTimestamp>) -> InfoCollections {
119        InfoCollections(collections)
120    }
121}
122
123impl Deref for InfoCollections {
124    type Target = HashMap<String, ServerTimestamp>;
125
126    fn deref(&self) -> &HashMap<String, ServerTimestamp> {
127        &self.0
128    }
129}
130
131#[derive(Debug, Clone, Deserialize)]
132pub struct UploadResult {
133    batch: Option<String>,
134    /// Maps record id => why failed
135    #[serde(default = "HashMap::new")]
136    pub failed: HashMap<Guid, String>,
137    /// Vec of ids
138    #[serde(default = "Vec::new")]
139    pub success: Vec<Guid>,
140}
141
142pub type PostResponse = Sync15ClientResponse<UploadResult>;
143
/// Where a `PostQueue` stands with respect to batched uploads.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum BatchState {
    /// A previous post showed that batch semantics are not available.
    Unsupported,
    /// No batch is currently open.
    NoBatch,
    /// A batch is open; the string is its id.
    InBatch(String),
}
150
151#[derive(Debug)]
152pub struct PostQueue<Post, OnResponse> {
153    poster: Post,
154    on_response: OnResponse,
155    post_limits: LimitTracker,
156    batch_limits: LimitTracker,
157    max_payload_bytes: usize,
158    max_request_bytes: usize,
159    queued: Vec<u8>,
160    batch: BatchState,
161    last_modified: ServerTimestamp,
162}
163
164pub trait BatchPoster {
165    /// Note: Last argument (reference to the batch poster) is provided for the purposes of testing
166    /// Important: Poster should not report non-success HTTP statuses as errors!!
167    fn post<P, O>(
168        &self,
169        body: Vec<u8>,
170        xius: ServerTimestamp,
171        batch: Option<String>,
172        commit: bool,
173        queue: &PostQueue<P, O>,
174    ) -> Result<PostResponse>;
175}
176
177// We don't just use a FnMut here since we want to override it in mocking for RefCell<TestType>,
178// which we can't do for FnMut since neither FnMut nor RefCell are defined here. Also, this
179// is somewhat better for documentation.
180pub trait PostResponseHandler {
181    fn handle_response(&mut self, r: PostResponse, mid_batch: bool) -> Result<()>;
182}
183
184#[derive(Debug, Clone)]
185pub(crate) struct NormalResponseHandler {
186    pub failed_ids: Vec<Guid>,
187    pub successful_ids: Vec<Guid>,
188    pub allow_failed: bool,
189    pub pending_failed: Vec<Guid>,
190    pub pending_success: Vec<Guid>,
191}
192
193impl NormalResponseHandler {
194    pub fn new(allow_failed: bool) -> NormalResponseHandler {
195        NormalResponseHandler {
196            failed_ids: vec![],
197            successful_ids: vec![],
198            pending_failed: vec![],
199            pending_success: vec![],
200            allow_failed,
201        }
202    }
203}
204
205impl PostResponseHandler for NormalResponseHandler {
206    fn handle_response(&mut self, r: PostResponse, mid_batch: bool) -> error::Result<()> {
207        match r {
208            Sync15ClientResponse::Success { record, .. } => {
209                if !record.failed.is_empty() && !self.allow_failed {
210                    return Err(ErrorKind::RecordUploadFailed);
211                }
212                for id in record.success.iter() {
213                    self.pending_success.push(id.clone());
214                }
215                for kv in record.failed.iter() {
216                    self.pending_failed.push(kv.0.clone());
217                }
218                if !mid_batch {
219                    self.successful_ids.append(&mut self.pending_success);
220                    self.failed_ids.append(&mut self.pending_failed);
221                }
222                Ok(())
223            }
224            _ => Err(r.create_storage_error()),
225        }
226    }
227}
228
229impl<Poster, OnResponse> PostQueue<Poster, OnResponse>
230where
231    Poster: BatchPoster,
232    OnResponse: PostResponseHandler,
233{
234    pub fn new(
235        config: &InfoConfiguration,
236        ts: ServerTimestamp,
237        poster: Poster,
238        on_response: OnResponse,
239    ) -> PostQueue<Poster, OnResponse> {
240        PostQueue {
241            poster,
242            on_response,
243            last_modified: ts,
244            post_limits: LimitTracker::new(config.max_post_bytes, config.max_post_records),
245            batch_limits: LimitTracker::new(config.max_total_bytes, config.max_total_records),
246            batch: BatchState::NoBatch,
247            max_payload_bytes: config.max_record_payload_bytes,
248            max_request_bytes: config.max_request_bytes,
249            queued: Vec::new(),
250        }
251    }
252
253    #[inline]
254    fn in_batch(&self) -> bool {
255        !matches!(&self.batch, BatchState::Unsupported | BatchState::NoBatch)
256    }
257
258    pub fn enqueue(&mut self, record: &OutgoingEncryptedBso) -> Result<bool> {
259        let payload_length = record.serialized_payload_len();
260
261        if self.post_limits.can_never_add(payload_length)
262            || self.batch_limits.can_never_add(payload_length)
263            || payload_length >= self.max_payload_bytes
264        {
265            warn!(
266                "Single record too large to submit to server ({} b)",
267                payload_length
268            );
269            return Ok(false);
270        }
271
272        // Write directly into `queued` but undo if necessary (the vast majority of the time
273        // it won't be necessary). If we hit a problem we need to undo that, but the only error
274        // case we have to worry about right now is in flush()
275        let item_start = self.queued.len();
276
277        // This is conservative but can't hurt.
278        self.queued.reserve(payload_length + 2);
279
280        // Either the first character in an array, or a comma separating
281        // it from the previous item.
282        let c = if self.queued.is_empty() { b'[' } else { b',' };
283        self.queued.push(c);
284
285        // This unwrap is fine, since serde_json's failure case is HashMaps that have non-object
286        // keys, which is impossible. If you decide to change this part, you *need* to call
287        // `self.queued.truncate(item_start)` here in the failure case!
288        serde_json::to_writer(&mut self.queued, &record).unwrap();
289
290        let item_end = self.queued.len();
291
292        debug_assert!(
293            item_end >= payload_length,
294            "EncryptedPayload::serialized_len is bugged"
295        );
296
297        // The + 1 is only relevant for the final record, which will have a trailing ']'.
298        let item_len = item_end - item_start + 1;
299
300        if item_len >= self.max_request_bytes {
301            self.queued.truncate(item_start);
302            warn!(
303                "Single record too large to submit to server ({} b)",
304                item_len
305            );
306            return Ok(false);
307        }
308
309        let can_post_record = self.post_limits.can_add_record(payload_length);
310        let can_batch_record = self.batch_limits.can_add_record(payload_length);
311        let can_send_record = self.queued.len() < self.max_request_bytes;
312
313        if !can_post_record || !can_send_record || !can_batch_record {
314            debug!(
315                "PostQueue flushing! (can_post = {}, can_send = {}, can_batch = {})",
316                can_post_record, can_send_record, can_batch_record
317            );
318            // "unwrite" the record.
319            self.queued.truncate(item_start);
320            // Flush whatever we have queued.
321            self.flush(!can_batch_record)?;
322            // And write it again.
323            let c = if self.queued.is_empty() { b'[' } else { b',' };
324            self.queued.push(c);
325            serde_json::to_writer(&mut self.queued, &record).unwrap();
326        }
327
328        self.post_limits.record_added(payload_length);
329        self.batch_limits.record_added(payload_length);
330
331        Ok(true)
332    }
333
334    pub fn flush(&mut self, want_commit: bool) -> Result<()> {
335        if self.queued.is_empty() {
336            assert!(
337                !self.in_batch(),
338                "Bug: Somehow we're in a batch but have no queued records"
339            );
340            // Nothing to do!
341            return Ok(());
342        }
343
344        self.queued.push(b']');
345        let batch_id = match &self.batch {
346            // Not the first post and we know we have no batch semantics.
347            BatchState::Unsupported => None,
348            // First commit in possible batch
349            BatchState::NoBatch => Some("true".into()),
350            // In a batch and we have a batch id.
351            BatchState::InBatch(ref s) => Some(s.clone()),
352        };
353
354        info!(
355            "Posting {} records of {} bytes",
356            self.post_limits.cur_records,
357            self.queued.len()
358        );
359
360        let is_commit = want_commit && batch_id.is_some();
361        // Weird syntax for calling a function object that is a property.
362        let resp_or_error = self.poster.post(
363            self.queued.clone(),
364            self.last_modified,
365            batch_id,
366            is_commit,
367            self,
368        );
369
370        self.queued.truncate(0);
371
372        if want_commit || self.batch == BatchState::Unsupported {
373            self.batch_limits.clear();
374        }
375        self.post_limits.clear();
376
377        let resp = resp_or_error?;
378
379        let (status, last_modified, record) = match resp {
380            Sync15ClientResponse::Success {
381                status,
382                last_modified,
383                ref record,
384                ..
385            } => (status, last_modified, record),
386            _ => {
387                self.on_response.handle_response(resp, !want_commit)?;
388                // on_response() should always fail!
389                unreachable!();
390            }
391        };
392
393        if want_commit || self.batch == BatchState::Unsupported {
394            self.last_modified = last_modified;
395        }
396
397        if want_commit {
398            debug!("Committed batch {:?}", self.batch);
399            self.batch = BatchState::NoBatch;
400            self.on_response.handle_response(resp, false)?;
401            return Ok(());
402        }
403
404        if status != status_codes::ACCEPTED {
405            if self.in_batch() {
406                return Err(ErrorKind::ServerBatchProblem(
407                    "Server responded non-202 success code while a batch was in progress",
408                ));
409            }
410            self.last_modified = last_modified;
411            self.batch = BatchState::Unsupported;
412            self.batch_limits.clear();
413            self.on_response.handle_response(resp, false)?;
414            return Ok(());
415        }
416
417        let batch_id = record
418            .batch
419            .as_ref()
420            .ok_or({
421                ErrorKind::ServerBatchProblem("Invalid server response: 202 without a batch ID")
422            })?
423            .clone();
424
425        match &self.batch {
426            BatchState::Unsupported => {
427                warn!("Server changed its mind about supporting batching mid-batch...");
428            }
429
430            BatchState::InBatch(ref cur_id) => {
431                if cur_id != &batch_id {
432                    return Err(ErrorKind::ServerBatchProblem(
433                        "Invalid server response: 202 without a batch ID",
434                    ));
435                }
436            }
437            _ => {}
438        }
439
440        // Can't change this in match arms without NLL
441        self.batch = BatchState::InBatch(batch_id);
442        self.last_modified = last_modified;
443
444        self.on_response.handle_response(resp, true)?;
445
446        Ok(())
447    }
448}
449
450#[derive(Clone)]
451pub struct UploadInfo {
452    pub successful_ids: Vec<Guid>,
453    pub failed_ids: Vec<Guid>,
454    pub modified_timestamp: ServerTimestamp,
455}
456
457impl<Poster> PostQueue<Poster, NormalResponseHandler> {
458    // TODO: should take by move
459    pub fn completed_upload_info(&mut self) -> UploadInfo {
460        let mut result = UploadInfo {
461            successful_ids: Vec::with_capacity(self.on_response.successful_ids.len()),
462            failed_ids: Vec::with_capacity(
463                self.on_response.failed_ids.len()
464                    + self.on_response.pending_failed.len()
465                    + self.on_response.pending_success.len(),
466            ),
467            modified_timestamp: self.last_modified,
468        };
469
470        result
471            .successful_ids
472            .append(&mut self.on_response.successful_ids);
473
474        result.failed_ids.append(&mut self.on_response.failed_ids);
475        result
476            .failed_ids
477            .append(&mut self.on_response.pending_failed);
478        result
479            .failed_ids
480            .append(&mut self.on_response.pending_success);
481
482        result
483    }
484}
485
486#[cfg(test)]
487mod test {
488    use super::*;
489    use crate::bso::{IncomingEncryptedBso, OutgoingEncryptedBso, OutgoingEnvelope};
490    use crate::EncryptedPayload;
491    use lazy_static::lazy_static;
492    use std::cell::RefCell;
493    use std::collections::VecDeque;
494    use std::rc::Rc;
495
496    #[derive(Debug, Clone)]
497    struct PostedData {
498        body: String,
499        _xius: ServerTimestamp,
500        batch: Option<String>,
501        commit: bool,
502        payload_bytes: usize,
503        records: usize,
504    }
505
506    impl PostedData {
507        fn records_as_json(&self) -> Vec<serde_json::Value> {
508            let values =
509                serde_json::from_str::<serde_json::Value>(&self.body).expect("Posted invalid json");
510            // Check that they actually deserialize as what we want
511            let records_or_err =
512                serde_json::from_value::<Vec<IncomingEncryptedBso>>(values.clone());
513            records_or_err.expect("Failed to deserialize data");
514            serde_json::from_value(values).unwrap()
515        }
516    }
517
518    #[derive(Debug, Clone)]
519    struct BatchInfo {
520        id: Option<String>,
521        posts: Vec<PostedData>,
522        bytes: usize,
523        records: usize,
524    }
525
526    #[derive(Debug, Clone)]
527    struct TestPoster {
528        all_posts: Vec<PostedData>,
529        responses: VecDeque<PostResponse>,
530        batches: Vec<BatchInfo>,
531        cur_batch: Option<BatchInfo>,
532        cfg: InfoConfiguration,
533    }
534
535    type TestPosterRef = Rc<RefCell<TestPoster>>;
536    impl TestPoster {
537        pub fn new<T>(cfg: &InfoConfiguration, responses: T) -> TestPosterRef
538        where
539            T: Into<VecDeque<PostResponse>>,
540        {
541            Rc::new(RefCell::new(TestPoster {
542                all_posts: vec![],
543                responses: responses.into(),
544                batches: vec![],
545                cur_batch: None,
546                cfg: cfg.clone(),
547            }))
548        }
549        // Adds &mut
550        fn do_post<T, O>(
551            &mut self,
552            body: &[u8],
553            xius: ServerTimestamp,
554            batch: Option<String>,
555            commit: bool,
556            queue: &PostQueue<T, O>,
557        ) -> Sync15ClientResponse<UploadResult> {
558            let mut post = PostedData {
559                body: String::from_utf8(body.into()).expect("Posted invalid utf8..."),
560                batch: batch.clone(),
561                _xius: xius,
562                commit,
563                payload_bytes: 0,
564                records: 0,
565            };
566
567            assert!(body.len() <= self.cfg.max_request_bytes);
568
569            let (num_records, record_payload_bytes) = {
570                let recs = post.records_as_json();
571                assert!(recs.len() <= self.cfg.max_post_records);
572                assert!(recs.len() <= self.cfg.max_total_records);
573                let payload_bytes: usize = recs
574                    .iter()
575                    .map(|r| {
576                        let len = r["payload"]
577                            .as_str()
578                            .expect("Non string payload property")
579                            .len();
580                        assert!(len <= self.cfg.max_record_payload_bytes);
581                        len
582                    })
583                    .sum();
584                assert!(payload_bytes <= self.cfg.max_post_bytes);
585                assert!(payload_bytes <= self.cfg.max_total_bytes);
586
587                assert_eq!(queue.post_limits.cur_bytes, payload_bytes);
588                assert_eq!(queue.post_limits.cur_records, recs.len());
589                (recs.len(), payload_bytes)
590            };
591            post.payload_bytes = record_payload_bytes;
592            post.records = num_records;
593
594            self.all_posts.push(post.clone());
595            let response = self.responses.pop_front().unwrap();
596
597            let record = match response {
598                Sync15ClientResponse::Success { ref record, .. } => record,
599                _ => {
600                    panic!("only success codes are used in this test");
601                }
602            };
603
604            if self.cur_batch.is_none() {
605                assert!(
606                    batch.is_none() || batch == Some("true".into()),
607                    "We shouldn't be in a batch now"
608                );
609                self.cur_batch = Some(BatchInfo {
610                    id: record.batch.clone(),
611                    posts: vec![],
612                    records: 0,
613                    bytes: 0,
614                });
615            } else {
616                assert_eq!(
617                    batch,
618                    self.cur_batch.as_ref().unwrap().id,
619                    "We're in a batch but got the wrong batch id"
620                );
621            }
622
623            {
624                let batch = self.cur_batch.as_mut().unwrap();
625                batch.posts.push(post);
626                batch.records += num_records;
627                batch.bytes += record_payload_bytes;
628
629                assert!(batch.bytes <= self.cfg.max_total_bytes);
630                assert!(batch.records <= self.cfg.max_total_records);
631
632                assert_eq!(batch.records, queue.batch_limits.cur_records);
633                assert_eq!(batch.bytes, queue.batch_limits.cur_bytes);
634            }
635
636            if commit || record.batch.is_none() {
637                let batch = self.cur_batch.take().unwrap();
638                self.batches.push(batch);
639            }
640
641            response
642        }
643
644        fn do_handle_response(&mut self, _: PostResponse, mid_batch: bool) {
645            assert_eq!(mid_batch, self.cur_batch.is_some());
646        }
647    }
648    impl BatchPoster for TestPosterRef {
649        fn post<T, O>(
650            &self,
651            body: Vec<u8>,
652            xius: ServerTimestamp,
653            batch: Option<String>,
654            commit: bool,
655            queue: &PostQueue<T, O>,
656        ) -> Result<PostResponse> {
657            Ok(self.borrow_mut().do_post(&body, xius, batch, commit, queue))
658        }
659    }
660
661    impl PostResponseHandler for TestPosterRef {
662        fn handle_response(&mut self, r: PostResponse, mid_batch: bool) -> Result<()> {
663            self.borrow_mut().do_handle_response(r, mid_batch);
664            Ok(())
665        }
666    }
667
668    type MockedPostQueue = PostQueue<TestPosterRef, TestPosterRef>;
669
670    fn pq_test_setup(
671        cfg: InfoConfiguration,
672        lm: i64,
673        resps: Vec<PostResponse>,
674    ) -> (MockedPostQueue, TestPosterRef) {
675        let tester = TestPoster::new(&cfg, resps);
676        let pq = PostQueue::new(&cfg, ServerTimestamp(lm), tester.clone(), tester.clone());
677        (pq, tester)
678    }
679
680    fn fake_response<'a, T: Into<Option<&'a str>>>(status: u16, lm: i64, batch: T) -> PostResponse {
681        assert!(status_codes::is_success_code(status));
682        Sync15ClientResponse::Success {
683            status,
684            last_modified: ServerTimestamp(lm),
685            record: UploadResult {
686                batch: batch.into().map(Into::into),
687                failed: HashMap::new(),
688                success: vec![],
689            },
690            route: "test/path".into(),
691        }
692    }
693
694    lazy_static! {
695        // ~40b
696        static ref PAYLOAD_OVERHEAD: usize = {
697            let payload = EncryptedPayload {
698                iv: "".into(),
699                hmac: "".into(),
700                ciphertext: "".into()
701            };
702            serde_json::to_string(&payload).unwrap().len()
703        };
704        // ~80b
705        static ref TOTAL_RECORD_OVERHEAD: usize = {
706            let val = serde_json::to_value(OutgoingEncryptedBso::new(OutgoingEnvelope {
707                    id: "".into(),
708                    sortindex: None,
709                    ttl: None,
710                },
711                EncryptedPayload {
712                    iv: "".into(),
713                    hmac: "".into(),
714                    ciphertext: "".into()
715                },
716            )).unwrap();
717            serde_json::to_string(&val).unwrap().len()
718        };
719        // There's some subtlety in how we calculate this having to do with the fact that
720        // the quotes in the payload are escaped but the escape chars count to the request len
721        // and *not* to the payload len (the payload len check happens after json parsing the
722        // top level object).
723        static ref NON_PAYLOAD_OVERHEAD: usize = {
724            *TOTAL_RECORD_OVERHEAD - *PAYLOAD_OVERHEAD
725        };
726    }
727
728    // Actual record size (for max_request_len) will be larger by some amount
729    fn make_record(payload_size: usize) -> OutgoingEncryptedBso {
730        assert!(payload_size > *PAYLOAD_OVERHEAD);
731        let ciphertext_len = payload_size - *PAYLOAD_OVERHEAD;
732        OutgoingEncryptedBso::new(
733            OutgoingEnvelope {
734                id: "".into(),
735                sortindex: None,
736                ttl: None,
737            },
738            EncryptedPayload {
739                iv: "".into(),
740                hmac: "".into(),
741                ciphertext: "x".repeat(ciphertext_len),
742            },
743        )
744    }
745
746    fn request_bytes_for_payloads(payloads: &[usize]) -> usize {
747        1 + payloads
748            .iter()
749            .map(|&size| size + 1 + *NON_PAYLOAD_OVERHEAD)
750            .sum::<usize>()
751    }
752
753    #[test]
754    fn test_pq_basic() {
755        let cfg = InfoConfiguration {
756            max_request_bytes: 1000,
757            max_record_payload_bytes: 1000,
758            ..InfoConfiguration::default()
759        };
760        let time = 11_111_111_000;
761        let (mut pq, tester) = pq_test_setup(
762            cfg,
763            time,
764            vec![fake_response(status_codes::OK, time + 100_000, None)],
765        );
766
767        pq.enqueue(&make_record(100)).unwrap();
768        pq.flush(true).unwrap();
769
770        let t = tester.borrow();
771        assert!(t.cur_batch.is_none());
772        assert_eq!(t.all_posts.len(), 1);
773        assert_eq!(t.batches.len(), 1);
774        assert_eq!(t.batches[0].posts.len(), 1);
775        assert_eq!(t.batches[0].records, 1);
776        assert_eq!(t.batches[0].bytes, 100);
777        assert_eq!(
778            t.batches[0].posts[0].body.len(),
779            request_bytes_for_payloads(&[100])
780        );
781    }
782
783    #[test]
784    fn test_pq_max_request_bytes_no_batch() {
785        let cfg = InfoConfiguration {
786            max_request_bytes: 250,
787            ..InfoConfiguration::default()
788        };
789        let time = 11_111_111_000;
790        let (mut pq, tester) = pq_test_setup(
791            cfg,
792            time,
793            vec![
794                fake_response(status_codes::OK, time + 100_000, None),
795                fake_response(status_codes::OK, time + 200_000, None),
796            ],
797        );
798
799        // Note that the total record overhead is around 85 bytes
800        let payload_size = 100 - *NON_PAYLOAD_OVERHEAD;
801        pq.enqueue(&make_record(payload_size)).unwrap(); // total size == 102; [r]
802        pq.enqueue(&make_record(payload_size)).unwrap(); // total size == 203; [r,r]
803        pq.enqueue(&make_record(payload_size)).unwrap(); // too big, 2nd post.
804        pq.flush(true).unwrap();
805
806        let t = tester.borrow();
807        assert!(t.cur_batch.is_none());
808        assert_eq!(t.all_posts.len(), 2);
809        assert_eq!(t.batches.len(), 2);
810        assert_eq!(t.batches[0].posts.len(), 1);
811        assert_eq!(t.batches[0].records, 2);
812        assert_eq!(t.batches[0].bytes, payload_size * 2);
813        assert_eq!(t.batches[0].posts[0].batch, Some("true".into()));
814        assert_eq!(
815            t.batches[0].posts[0].body.len(),
816            request_bytes_for_payloads(&[payload_size, payload_size])
817        );
818
819        assert_eq!(t.batches[1].posts.len(), 1);
820        assert_eq!(t.batches[1].records, 1);
821        assert_eq!(t.batches[1].bytes, payload_size);
822        // We know at this point that the server does not support batching.
823        assert_eq!(t.batches[1].posts[0].batch, None);
824        assert!(!t.batches[1].posts[0].commit);
825        assert_eq!(
826            t.batches[1].posts[0].body.len(),
827            request_bytes_for_payloads(&[payload_size])
828        );
829    }
830
831    #[test]
832    fn test_pq_max_record_payload_bytes_no_batch() {
833        let cfg = InfoConfiguration {
834            max_record_payload_bytes: 150,
835            max_request_bytes: 350,
836            ..InfoConfiguration::default()
837        };
838        let time = 11_111_111_000;
839        let (mut pq, tester) = pq_test_setup(
840            cfg,
841            time,
842            vec![
843                fake_response(status_codes::OK, time + 100_000, None),
844                fake_response(status_codes::OK, time + 200_000, None),
845            ],
846        );
847
848        // Note that the total record overhead is around 85 bytes
849        let payload_size = 100 - *NON_PAYLOAD_OVERHEAD;
850        pq.enqueue(&make_record(payload_size)).unwrap(); // total size == 102; [r]
851        let enqueued = pq.enqueue(&make_record(151)).unwrap(); // still 102
852        assert!(!enqueued, "Should not have fit");
853        pq.enqueue(&make_record(payload_size)).unwrap();
854        pq.flush(true).unwrap();
855
856        let t = tester.borrow();
857        assert!(t.cur_batch.is_none());
858        assert_eq!(t.all_posts.len(), 1);
859        assert_eq!(t.batches.len(), 1);
860        assert_eq!(t.batches[0].posts.len(), 1);
861        assert_eq!(t.batches[0].records, 2);
862        assert_eq!(t.batches[0].bytes, payload_size * 2);
863        assert_eq!(
864            t.batches[0].posts[0].body.len(),
865            request_bytes_for_payloads(&[payload_size, payload_size])
866        );
867    }
868
869    #[test]
870    fn test_pq_single_batch() {
871        let cfg = InfoConfiguration::default();
872        let time = 11_111_111_000;
873        let (mut pq, tester) = pq_test_setup(
874            cfg,
875            time,
876            vec![fake_response(
877                status_codes::ACCEPTED,
878                time + 100_000,
879                Some("1234"),
880            )],
881        );
882
883        let payload_size = 100 - *NON_PAYLOAD_OVERHEAD;
884        pq.enqueue(&make_record(payload_size)).unwrap();
885        pq.enqueue(&make_record(payload_size)).unwrap();
886        pq.enqueue(&make_record(payload_size)).unwrap();
887        pq.flush(true).unwrap();
888
889        let t = tester.borrow();
890        assert!(t.cur_batch.is_none());
891        assert_eq!(t.all_posts.len(), 1);
892        assert_eq!(t.batches.len(), 1);
893        assert_eq!(t.batches[0].id.as_ref().unwrap(), "1234");
894        assert_eq!(t.batches[0].posts.len(), 1);
895        assert_eq!(t.batches[0].records, 3);
896        assert_eq!(t.batches[0].bytes, payload_size * 3);
897        assert!(t.batches[0].posts[0].commit);
898        assert_eq!(
899            t.batches[0].posts[0].body.len(),
900            request_bytes_for_payloads(&[payload_size, payload_size, payload_size])
901        );
902    }
903
904    #[test]
905    fn test_pq_multi_post_batch_bytes() {
906        let cfg = InfoConfiguration {
907            max_post_bytes: 200,
908            ..InfoConfiguration::default()
909        };
910        let time = 11_111_111_000;
911        let (mut pq, tester) = pq_test_setup(
912            cfg,
913            time,
914            vec![
915                fake_response(status_codes::ACCEPTED, time, Some("1234")),
916                fake_response(status_codes::ACCEPTED, time + 100_000, Some("1234")),
917            ],
918        );
919
920        pq.enqueue(&make_record(100)).unwrap();
921        pq.enqueue(&make_record(100)).unwrap();
922        // POST
923        pq.enqueue(&make_record(100)).unwrap();
924        pq.flush(true).unwrap(); // COMMIT
925
926        let t = tester.borrow();
927        assert!(t.cur_batch.is_none());
928        assert_eq!(t.all_posts.len(), 2);
929        assert_eq!(t.batches.len(), 1);
930        assert_eq!(t.batches[0].posts.len(), 2);
931        assert_eq!(t.batches[0].records, 3);
932        assert_eq!(t.batches[0].bytes, 300);
933
934        assert_eq!(t.batches[0].posts[0].batch.as_ref().unwrap(), "true");
935        assert_eq!(t.batches[0].posts[0].records, 2);
936        assert_eq!(t.batches[0].posts[0].payload_bytes, 200);
937        assert!(!t.batches[0].posts[0].commit);
938        assert_eq!(
939            t.batches[0].posts[0].body.len(),
940            request_bytes_for_payloads(&[100, 100])
941        );
942
943        assert_eq!(t.batches[0].posts[1].batch.as_ref().unwrap(), "1234");
944        assert_eq!(t.batches[0].posts[1].records, 1);
945        assert_eq!(t.batches[0].posts[1].payload_bytes, 100);
946        assert!(t.batches[0].posts[1].commit);
947        assert_eq!(
948            t.batches[0].posts[1].body.len(),
949            request_bytes_for_payloads(&[100])
950        );
951    }
952
953    #[test]
954    fn test_pq_multi_post_batch_records() {
955        let cfg = InfoConfiguration {
956            max_post_records: 3,
957            ..InfoConfiguration::default()
958        };
959        let time = 11_111_111_000;
960        let (mut pq, tester) = pq_test_setup(
961            cfg,
962            time,
963            vec![
964                fake_response(status_codes::ACCEPTED, time, Some("1234")),
965                fake_response(status_codes::ACCEPTED, time, Some("1234")),
966                fake_response(status_codes::ACCEPTED, time + 100_000, Some("1234")),
967            ],
968        );
969
970        pq.enqueue(&make_record(100)).unwrap();
971        pq.enqueue(&make_record(100)).unwrap();
972        pq.enqueue(&make_record(100)).unwrap();
973        // POST
974        pq.enqueue(&make_record(100)).unwrap();
975        pq.enqueue(&make_record(100)).unwrap();
976        pq.enqueue(&make_record(100)).unwrap();
977        // POST
978        pq.enqueue(&make_record(100)).unwrap();
979        pq.flush(true).unwrap(); // COMMIT
980
981        let t = tester.borrow();
982        assert!(t.cur_batch.is_none());
983        assert_eq!(t.all_posts.len(), 3);
984        assert_eq!(t.batches.len(), 1);
985        assert_eq!(t.batches[0].posts.len(), 3);
986        assert_eq!(t.batches[0].records, 7);
987        assert_eq!(t.batches[0].bytes, 700);
988
989        assert_eq!(t.batches[0].posts[0].batch.as_ref().unwrap(), "true");
990        assert_eq!(t.batches[0].posts[0].records, 3);
991        assert_eq!(t.batches[0].posts[0].payload_bytes, 300);
992        assert!(!t.batches[0].posts[0].commit);
993        assert_eq!(
994            t.batches[0].posts[0].body.len(),
995            request_bytes_for_payloads(&[100, 100, 100])
996        );
997
998        assert_eq!(t.batches[0].posts[1].batch.as_ref().unwrap(), "1234");
999        assert_eq!(t.batches[0].posts[1].records, 3);
1000        assert_eq!(t.batches[0].posts[1].payload_bytes, 300);
1001        assert!(!t.batches[0].posts[1].commit);
1002        assert_eq!(
1003            t.batches[0].posts[1].body.len(),
1004            request_bytes_for_payloads(&[100, 100, 100])
1005        );
1006
1007        assert_eq!(t.batches[0].posts[2].batch.as_ref().unwrap(), "1234");
1008        assert_eq!(t.batches[0].posts[2].records, 1);
1009        assert_eq!(t.batches[0].posts[2].payload_bytes, 100);
1010        assert!(t.batches[0].posts[2].commit);
1011        assert_eq!(
1012            t.batches[0].posts[2].body.len(),
1013            request_bytes_for_payloads(&[100])
1014        );
1015    }
1016
1017    #[test]
1018    #[allow(clippy::cognitive_complexity)]
1019    fn test_pq_multi_post_multi_batch_records() {
1020        let cfg = InfoConfiguration {
1021            max_post_records: 3,
1022            max_total_records: 5,
1023            ..InfoConfiguration::default()
1024        };
1025        let time = 11_111_111_000;
1026        let (mut pq, tester) = pq_test_setup(
1027            cfg,
1028            time,
1029            vec![
1030                fake_response(status_codes::ACCEPTED, time, Some("1234")),
1031                fake_response(status_codes::ACCEPTED, time + 100_000, Some("1234")),
1032                fake_response(status_codes::ACCEPTED, time + 100_000, Some("abcd")),
1033                fake_response(status_codes::ACCEPTED, time + 200_000, Some("abcd")),
1034            ],
1035        );
1036
1037        pq.enqueue(&make_record(100)).unwrap();
1038        pq.enqueue(&make_record(100)).unwrap();
1039        pq.enqueue(&make_record(100)).unwrap();
1040        // POST
1041        pq.enqueue(&make_record(100)).unwrap();
1042        pq.enqueue(&make_record(100)).unwrap();
1043        // POST + COMMIT
1044        pq.enqueue(&make_record(100)).unwrap();
1045        pq.enqueue(&make_record(100)).unwrap();
1046        pq.enqueue(&make_record(100)).unwrap();
1047        // POST
1048        pq.enqueue(&make_record(100)).unwrap();
1049        pq.flush(true).unwrap(); // COMMIT
1050
1051        let t = tester.borrow();
1052        assert!(t.cur_batch.is_none());
1053        assert_eq!(t.all_posts.len(), 4);
1054        assert_eq!(t.batches.len(), 2);
1055        assert_eq!(t.batches[0].posts.len(), 2);
1056        assert_eq!(t.batches[1].posts.len(), 2);
1057
1058        assert_eq!(t.batches[0].records, 5);
1059        assert_eq!(t.batches[1].records, 4);
1060
1061        assert_eq!(t.batches[0].bytes, 500);
1062        assert_eq!(t.batches[1].bytes, 400);
1063
1064        assert_eq!(t.batches[0].posts[0].batch.as_ref().unwrap(), "true");
1065        assert_eq!(t.batches[0].posts[0].records, 3);
1066        assert_eq!(t.batches[0].posts[0].payload_bytes, 300);
1067        assert!(!t.batches[0].posts[0].commit);
1068        assert_eq!(
1069            t.batches[0].posts[0].body.len(),
1070            request_bytes_for_payloads(&[100, 100, 100])
1071        );
1072
1073        assert_eq!(t.batches[0].posts[1].batch.as_ref().unwrap(), "1234");
1074        assert_eq!(t.batches[0].posts[1].records, 2);
1075        assert_eq!(t.batches[0].posts[1].payload_bytes, 200);
1076        assert!(t.batches[0].posts[1].commit);
1077        assert_eq!(
1078            t.batches[0].posts[1].body.len(),
1079            request_bytes_for_payloads(&[100, 100])
1080        );
1081
1082        assert_eq!(t.batches[1].posts[0].batch.as_ref().unwrap(), "true");
1083        assert_eq!(t.batches[1].posts[0].records, 3);
1084        assert_eq!(t.batches[1].posts[0].payload_bytes, 300);
1085        assert!(!t.batches[1].posts[0].commit);
1086        assert_eq!(
1087            t.batches[1].posts[0].body.len(),
1088            request_bytes_for_payloads(&[100, 100, 100])
1089        );
1090
1091        assert_eq!(t.batches[1].posts[1].batch.as_ref().unwrap(), "abcd");
1092        assert_eq!(t.batches[1].posts[1].records, 1);
1093        assert_eq!(t.batches[1].posts[1].payload_bytes, 100);
1094        assert!(t.batches[1].posts[1].commit);
1095        assert_eq!(
1096            t.batches[1].posts[1].body.len(),
1097            request_bytes_for_payloads(&[100])
1098        );
1099    }
1100
1101    #[test]
1102    #[allow(clippy::cognitive_complexity)]
1103    fn test_pq_multi_post_multi_batch_bytes() {
1104        let cfg = InfoConfiguration {
1105            max_post_bytes: 300,
1106            max_total_bytes: 500,
1107            ..InfoConfiguration::default()
1108        };
1109        let time = 11_111_111_000;
1110        let (mut pq, tester) = pq_test_setup(
1111            cfg,
1112            time,
1113            vec![
1114                fake_response(status_codes::ACCEPTED, time, Some("1234")),
1115                fake_response(status_codes::ACCEPTED, time + 100_000, Some("1234")), // should commit
1116                fake_response(status_codes::ACCEPTED, time + 100_000, Some("abcd")),
1117                fake_response(status_codes::ACCEPTED, time + 200_000, Some("abcd")), // should commit
1118            ],
1119        );
1120
1121        pq.enqueue(&make_record(100)).unwrap();
1122        pq.enqueue(&make_record(100)).unwrap();
1123        pq.enqueue(&make_record(100)).unwrap();
1124        assert_eq!(pq.last_modified.0, time);
1125        // POST
1126        pq.enqueue(&make_record(100)).unwrap();
1127        pq.enqueue(&make_record(100)).unwrap();
1128        // POST + COMMIT
1129        pq.enqueue(&make_record(100)).unwrap();
1130        assert_eq!(pq.last_modified.0, time + 100_000);
1131        pq.enqueue(&make_record(100)).unwrap();
1132        pq.enqueue(&make_record(100)).unwrap();
1133
1134        // POST
1135        pq.enqueue(&make_record(100)).unwrap();
1136        assert_eq!(pq.last_modified.0, time + 100_000);
1137        pq.flush(true).unwrap(); // COMMIT
1138
1139        assert_eq!(pq.last_modified.0, time + 200_000);
1140
1141        let t = tester.borrow();
1142        assert!(t.cur_batch.is_none());
1143        assert_eq!(t.all_posts.len(), 4);
1144        assert_eq!(t.batches.len(), 2);
1145        assert_eq!(t.batches[0].posts.len(), 2);
1146        assert_eq!(t.batches[1].posts.len(), 2);
1147
1148        assert_eq!(t.batches[0].records, 5);
1149        assert_eq!(t.batches[1].records, 4);
1150
1151        assert_eq!(t.batches[0].bytes, 500);
1152        assert_eq!(t.batches[1].bytes, 400);
1153
1154        assert_eq!(t.batches[0].posts[0].batch.as_ref().unwrap(), "true");
1155        assert_eq!(t.batches[0].posts[0].records, 3);
1156        assert_eq!(t.batches[0].posts[0].payload_bytes, 300);
1157        assert!(!t.batches[0].posts[0].commit);
1158        assert_eq!(
1159            t.batches[0].posts[0].body.len(),
1160            request_bytes_for_payloads(&[100, 100, 100])
1161        );
1162
1163        assert_eq!(t.batches[0].posts[1].batch.as_ref().unwrap(), "1234");
1164        assert_eq!(t.batches[0].posts[1].records, 2);
1165        assert_eq!(t.batches[0].posts[1].payload_bytes, 200);
1166        assert!(t.batches[0].posts[1].commit);
1167        assert_eq!(
1168            t.batches[0].posts[1].body.len(),
1169            request_bytes_for_payloads(&[100, 100])
1170        );
1171
1172        assert_eq!(t.batches[1].posts[0].batch.as_ref().unwrap(), "true");
1173        assert_eq!(t.batches[1].posts[0].records, 3);
1174        assert_eq!(t.batches[1].posts[0].payload_bytes, 300);
1175        assert!(!t.batches[1].posts[0].commit);
1176        assert_eq!(
1177            t.batches[1].posts[0].body.len(),
1178            request_bytes_for_payloads(&[100, 100, 100])
1179        );
1180
1181        assert_eq!(t.batches[1].posts[1].batch.as_ref().unwrap(), "abcd");
1182        assert_eq!(t.batches[1].posts[1].records, 1);
1183        assert_eq!(t.batches[1].posts[1].payload_bytes, 100);
1184        assert!(t.batches[1].posts[1].commit);
1185        assert_eq!(
1186            t.batches[1].posts[1].body.len(),
1187            request_bytes_for_payloads(&[100])
1188        );
1189    }
1190
1191    // TODO: Test
1192    //
1193    // - error cases!!! We don't test our handling of server errors at all!
1194    // - mixed bytes/record limits
1195    //
    // A lot of these have good examples in test_postqueue.js on desktop sync
1197}