use super::storage_client::Sync15ClientResponse;
use crate::bso::OutgoingEncryptedBso;
use crate::error::{self, debug, info, warn, Error as ErrorKind, Result};
use crate::ServerTimestamp;
use serde_derive::*;
use std::collections::HashMap;
use std::default::Default;
use std::ops::Deref;
use sync_guid::Guid;
use viaduct::status_codes;

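/// Tracks how much of a server-advertised limit (bytes and record count) the
/// current POST or batch has already consumed. Callers must check
/// `can_add_record` before calling `record_added`.
///
/// Illustrative sketch of the intended call pattern (the numbers are made up):
///
/// ```ignore
/// let mut limits = LimitTracker::new(1000, 10); // max_bytes, max_records
/// if limits.can_add_record(250) {
///     limits.record_added(250);
/// }
/// ```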
#[derive(Debug, Clone)]
struct LimitTracker {
    max_bytes: usize,
    max_records: usize,
    cur_bytes: usize,
    cur_records: usize,
}

impl LimitTracker {
    pub fn new(max_bytes: usize, max_records: usize) -> LimitTracker {
        LimitTracker {
            max_bytes,
            max_records,
            cur_bytes: 0,
            cur_records: 0,
        }
    }

    pub fn clear(&mut self) {
        self.cur_records = 0;
        self.cur_bytes = 0;
    }

    pub fn can_add_record(&self, payload_size: usize) -> bool {
        self.cur_records < self.max_records && self.cur_bytes + payload_size <= self.max_bytes
    }

    pub fn can_never_add(&self, record_size: usize) -> bool {
        record_size >= self.max_bytes
    }

    pub fn record_added(&mut self, record_size: usize) {
        assert!(
            self.can_add_record(record_size),
            "LimitTracker::record_added caller must check can_add_record"
        );
        self.cur_records += 1;
        self.cur_bytes += record_size;
    }
}

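/// Parsed `info/configuration` response. Any limit the server doesn't
/// advertise falls back to an effectively-unlimited default via serde.
///
/// Minimal sketch of the defaulting behaviour (assumes only `serde_json`,
/// which the tests below already use):
///
/// ```ignore
/// let cfg: InfoConfiguration = serde_json::from_str("{}").unwrap();
/// assert_eq!(cfg.max_request_bytes, 260 * 1024);
/// assert_eq!(cfg.max_post_records, usize::MAX);
/// ```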
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct InfoConfiguration {
    #[serde(default = "default_max_request_bytes")]
    pub max_request_bytes: usize,

    #[serde(default = "usize::max_value")]
    pub max_post_records: usize,

    #[serde(default = "usize::max_value")]
    pub max_post_bytes: usize,

    #[serde(default = "usize::max_value")]
    pub max_total_records: usize,

    #[serde(default = "usize::max_value")]
    pub max_total_bytes: usize,

    #[serde(default = "default_max_record_payload_bytes")]
    pub max_record_payload_bytes: usize,
}

fn default_max_request_bytes() -> usize {
    260 * 1024
}
fn default_max_record_payload_bytes() -> usize {
    256 * 1024
}

impl Default for InfoConfiguration {
    #[inline]
    fn default() -> InfoConfiguration {
        InfoConfiguration {
            max_request_bytes: default_max_request_bytes(),
            max_record_payload_bytes: default_max_record_payload_bytes(),
            max_post_records: usize::MAX,
            max_post_bytes: usize::MAX,
            max_total_records: usize::MAX,
            max_total_bytes: usize::MAX,
        }
    }
}

#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct InfoCollections(pub(crate) HashMap<String, ServerTimestamp>);

impl InfoCollections {
    pub fn new(collections: HashMap<String, ServerTimestamp>) -> InfoCollections {
        InfoCollections(collections)
    }
}

impl Deref for InfoCollections {
    type Target = HashMap<String, ServerTimestamp>;

    fn deref(&self) -> &HashMap<String, ServerTimestamp> {
        &self.0
    }
}

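/// Deserialized body of a successful upload POST: the batch id (if the server
/// opened or continued a batch), plus which record ids failed and succeeded.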
#[derive(Debug, Clone, Deserialize)]
pub struct UploadResult {
    batch: Option<String>,
    #[serde(default = "HashMap::new")]
    pub failed: HashMap<Guid, String>,
    #[serde(default = "Vec::new")]
    pub success: Vec<Guid>,
}

pub type PostResponse = Sync15ClientResponse<UploadResult>;

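/// Where the queue currently is in the batch protocol: the server doesn't
/// support batching, no batch is open, or we're inside the batch with this id.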
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub enum BatchState {
    Unsupported,
    NoBatch,
    InBatch(String),
}

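/// Accumulates serialized records in `queued` and flushes them whenever the
/// next record would push a post/batch/request limit over. `Post` performs
/// the network request; `OnResponse` is told about every server response.
///
/// Illustrative usage sketch (assumes a `poster`, `handler`, `config`, `ts`
/// and an iterator of outgoing records are already in scope):
///
/// ```ignore
/// let mut queue = PostQueue::new(&config, ts, poster, handler);
/// for record in outgoing {
///     queue.enqueue(&record)?;
/// }
/// queue.flush(true)?; // commit the final batch
/// ```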
#[derive(Debug)]
pub struct PostQueue<Post, OnResponse> {
    poster: Post,
    on_response: OnResponse,
    post_limits: LimitTracker,
    batch_limits: LimitTracker,
    max_payload_bytes: usize,
    max_request_bytes: usize,
    queued: Vec<u8>,
    batch: BatchState,
    last_modified: ServerTimestamp,
}

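/// Abstraction over the code that actually performs the POST (and over the
/// mock used by the tests below). `xius` is the timestamp used for the
/// X-If-Unmodified-Since check, `batch` is the batch parameter ("true" to ask
/// for a new batch, or the id of the batch in progress), and `commit` says
/// whether this request should commit the batch.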
pub trait BatchPoster {
    fn post<P, O>(
        &self,
        body: Vec<u8>,
        xius: ServerTimestamp,
        batch: Option<String>,
        commit: bool,
        queue: &PostQueue<P, O>,
    ) -> Result<PostResponse>;
}

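/// Called with every response the queue receives; `mid_batch` is true when
/// further POSTs belonging to the same batch are still expected.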
pub trait PostResponseHandler {
    fn handle_response(&mut self, r: PostResponse, mid_batch: bool) -> Result<()>;
}

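/// Default response handler: ids are held in the `pending_*` lists until the
/// batch (or standalone POST) is known to have been committed, and only then
/// moved into `successful_ids` / `failed_ids`.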
#[derive(Debug, Clone)]
pub(crate) struct NormalResponseHandler {
    pub failed_ids: Vec<Guid>,
    pub successful_ids: Vec<Guid>,
    pub allow_failed: bool,
    pub pending_failed: Vec<Guid>,
    pub pending_success: Vec<Guid>,
}

impl NormalResponseHandler {
    pub fn new(allow_failed: bool) -> NormalResponseHandler {
        NormalResponseHandler {
            failed_ids: vec![],
            successful_ids: vec![],
            pending_failed: vec![],
            pending_success: vec![],
            allow_failed,
        }
    }
}

impl PostResponseHandler for NormalResponseHandler {
    fn handle_response(&mut self, r: PostResponse, mid_batch: bool) -> error::Result<()> {
        match r {
            Sync15ClientResponse::Success { record, .. } => {
                if !record.failed.is_empty() && !self.allow_failed {
                    return Err(ErrorKind::RecordUploadFailed);
                }
                for id in record.success.iter() {
                    self.pending_success.push(id.clone());
                }
                for kv in record.failed.iter() {
                    self.pending_failed.push(kv.0.clone());
                }
                if !mid_batch {
                    self.successful_ids.append(&mut self.pending_success);
                    self.failed_ids.append(&mut self.pending_failed);
                }
                Ok(())
            }
            _ => Err(r.create_storage_error()),
        }
    }
}

impl<Poster, OnResponse> PostQueue<Poster, OnResponse>
where
    Poster: BatchPoster,
    OnResponse: PostResponseHandler,
{
    pub fn new(
        config: &InfoConfiguration,
        ts: ServerTimestamp,
        poster: Poster,
        on_response: OnResponse,
    ) -> PostQueue<Poster, OnResponse> {
        PostQueue {
            poster,
            on_response,
            last_modified: ts,
            post_limits: LimitTracker::new(config.max_post_bytes, config.max_post_records),
            batch_limits: LimitTracker::new(config.max_total_bytes, config.max_total_records),
            batch: BatchState::NoBatch,
            max_payload_bytes: config.max_record_payload_bytes,
            max_request_bytes: config.max_request_bytes,
            queued: Vec::new(),
        }
    }

    #[inline]
    fn in_batch(&self) -> bool {
        !matches!(&self.batch, BatchState::Unsupported | BatchState::NoBatch)
    }

    pub fn enqueue(&mut self, record: &OutgoingEncryptedBso) -> Result<bool> {
        let payload_length = record.serialized_payload_len();

        if self.post_limits.can_never_add(payload_length)
            || self.batch_limits.can_never_add(payload_length)
            || payload_length >= self.max_payload_bytes
        {
            warn!(
                "Single record too large to submit to server ({} b)",
                payload_length
            );
            return Ok(false);
        }

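        // Serialize the record straight into the request buffer, prefixed by
        // '[' for the first record or ',' for later ones; the closing ']' is
        // appended at flush time.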
        let item_start = self.queued.len();

        self.queued.reserve(payload_length + 2);

        let c = if self.queued.is_empty() { b'[' } else { b',' };
        self.queued.push(c);

        serde_json::to_writer(&mut self.queued, &record).unwrap();

        let item_end = self.queued.len();

        debug_assert!(
            item_end >= payload_length,
            "EncryptedPayload::serialized_len is bugged"
        );

        let item_len = item_end - item_start + 1;

        if item_len >= self.max_request_bytes {
            self.queued.truncate(item_start);
            warn!(
                "Single record too large to submit to server ({} b)",
                item_len
            );
            return Ok(false);
        }

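        // If this record would push us past a limit, flush what's already
        // queued first (committing the batch only when it's the batch limit
        // we'd exceed), then re-serialize the record into the emptied buffer.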
        let can_post_record = self.post_limits.can_add_record(payload_length);
        let can_batch_record = self.batch_limits.can_add_record(payload_length);
        let can_send_record = self.queued.len() < self.max_request_bytes;

        if !can_post_record || !can_send_record || !can_batch_record {
            debug!(
                "PostQueue flushing! (can_post = {}, can_send = {}, can_batch = {})",
                can_post_record, can_send_record, can_batch_record
            );
            self.queued.truncate(item_start);
            self.flush(!can_batch_record)?;
            let c = if self.queued.is_empty() { b'[' } else { b',' };
            self.queued.push(c);
            serde_json::to_writer(&mut self.queued, &record).unwrap();
        }

        self.post_limits.record_added(payload_length);
        self.batch_limits.record_added(payload_length);

        Ok(true)
    }

    pub fn flush(&mut self, want_commit: bool) -> Result<()> {
        if self.queued.is_empty() {
            assert!(
                !self.in_batch(),
                "Bug: Somehow we're in a batch but have no queued records"
            );
            return Ok(());
        }

        self.queued.push(b']');
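        // The "batch" parameter to send: None when the server doesn't support
        // batching, "true" to ask it to open a new batch, or the id of the
        // batch we're already in.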
        let batch_id = match &self.batch {
            BatchState::Unsupported => None,
            BatchState::NoBatch => Some("true".into()),
            BatchState::InBatch(ref s) => Some(s.clone()),
        };

        info!(
            "Posting {} records of {} bytes",
            self.post_limits.cur_records,
            self.queued.len()
        );

        let is_commit = want_commit && batch_id.is_some();
        let resp_or_error = self.poster.post(
            self.queued.clone(),
            self.last_modified,
            batch_id,
            is_commit,
            self,
        );

        self.queued.truncate(0);

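        // Per-POST limits reset after every request; batch limits only reset
        // once the batch is finished (committed, or batching turned out to be
        // unsupported).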
        if want_commit || self.batch == BatchState::Unsupported {
            self.batch_limits.clear();
        }
        self.post_limits.clear();

        let resp = resp_or_error?;

        let (status, last_modified, record) = match resp {
            Sync15ClientResponse::Success {
                status,
                last_modified,
                ref record,
                ..
            } => (status, last_modified, record),
            _ => {
                self.on_response.handle_response(resp, !want_commit)?;
                unreachable!();
            }
        };

        if want_commit || self.batch == BatchState::Unsupported {
            self.last_modified = last_modified;
        }

        if want_commit {
            debug!("Committed batch {:?}", self.batch);
            self.batch = BatchState::NoBatch;
            self.on_response.handle_response(resp, false)?;
            return Ok(());
        }

        if status != status_codes::ACCEPTED {
            if self.in_batch() {
                return Err(ErrorKind::ServerBatchProblem(
                    "Server responded non-202 success code while a batch was in progress",
                ));
            }
            self.last_modified = last_modified;
            self.batch = BatchState::Unsupported;
            self.batch_limits.clear();
            self.on_response.handle_response(resp, false)?;
            return Ok(());
        }

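        // 202 Accepted means the server opened (or continued) a batch, so it
        // must have handed back a batch id.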
        let batch_id = record
            .batch
            .as_ref()
            .ok_or(ErrorKind::ServerBatchProblem(
                "Invalid server response: 202 without a batch ID",
            ))?
            .clone();

        match &self.batch {
            BatchState::Unsupported => {
                warn!("Server changed its mind about supporting batching mid-batch...");
            }

            BatchState::InBatch(ref cur_id) => {
                if cur_id != &batch_id {
                    return Err(ErrorKind::ServerBatchProblem(
                        "Invalid server response: 202 with a batch ID that changed mid-batch",
                    ));
                }
            }
            _ => {}
        }

        self.batch = BatchState::InBatch(batch_id);
        self.last_modified = last_modified;

        self.on_response.handle_response(resp, true)?;

        Ok(())
    }
}

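/// Summary of a completed upload: which record ids succeeded, which failed,
/// and the server timestamp after the last (committing) POST.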
#[derive(Clone)]
pub struct UploadInfo {
    pub successful_ids: Vec<Guid>,
    pub failed_ids: Vec<Guid>,
    pub modified_timestamp: ServerTimestamp,
}

impl<Poster> PostQueue<Poster, NormalResponseHandler> {
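    /// Drains the accumulated ids out of the handler. Anything still pending
    /// (i.e. part of a batch that never committed) is reported as failed.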
    pub fn completed_upload_info(&mut self) -> UploadInfo {
        let mut result = UploadInfo {
            successful_ids: Vec::with_capacity(self.on_response.successful_ids.len()),
            failed_ids: Vec::with_capacity(
                self.on_response.failed_ids.len()
                    + self.on_response.pending_failed.len()
                    + self.on_response.pending_success.len(),
            ),
            modified_timestamp: self.last_modified,
        };

        result
            .successful_ids
            .append(&mut self.on_response.successful_ids);

        result.failed_ids.append(&mut self.on_response.failed_ids);
        result
            .failed_ids
            .append(&mut self.on_response.pending_failed);
        result
            .failed_ids
            .append(&mut self.on_response.pending_success);

        result
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::bso::{IncomingEncryptedBso, OutgoingEncryptedBso, OutgoingEnvelope};
    use crate::EncryptedPayload;
    use lazy_static::lazy_static;
    use std::cell::RefCell;
    use std::collections::VecDeque;
    use std::rc::Rc;

    #[derive(Debug, Clone)]
    struct PostedData {
        body: String,
        _xius: ServerTimestamp,
        batch: Option<String>,
        commit: bool,
        payload_bytes: usize,
        records: usize,
    }

    impl PostedData {
        fn records_as_json(&self) -> Vec<serde_json::Value> {
            let values =
                serde_json::from_str::<serde_json::Value>(&self.body).expect("Posted invalid json");
            let records_or_err =
                serde_json::from_value::<Vec<IncomingEncryptedBso>>(values.clone());
            records_or_err.expect("Failed to deserialize data");
            serde_json::from_value(values).unwrap()
        }
    }

    #[derive(Debug, Clone)]
    struct BatchInfo {
        id: Option<String>,
        posts: Vec<PostedData>,
        bytes: usize,
        records: usize,
    }

    #[derive(Debug, Clone)]
    struct TestPoster {
        all_posts: Vec<PostedData>,
        responses: VecDeque<PostResponse>,
        batches: Vec<BatchInfo>,
        cur_batch: Option<BatchInfo>,
        cfg: InfoConfiguration,
    }

    type TestPosterRef = Rc<RefCell<TestPoster>>;
    impl TestPoster {
        pub fn new<T>(cfg: &InfoConfiguration, responses: T) -> TestPosterRef
        where
            T: Into<VecDeque<PostResponse>>,
        {
            Rc::new(RefCell::new(TestPoster {
                all_posts: vec![],
                responses: responses.into(),
                batches: vec![],
                cur_batch: None,
                cfg: cfg.clone(),
            }))
        }
        fn do_post<T, O>(
            &mut self,
            body: &[u8],
            xius: ServerTimestamp,
            batch: Option<String>,
            commit: bool,
            queue: &PostQueue<T, O>,
        ) -> Sync15ClientResponse<UploadResult> {
            let mut post = PostedData {
                body: String::from_utf8(body.into()).expect("Posted invalid utf8..."),
                batch: batch.clone(),
                _xius: xius,
                commit,
                payload_bytes: 0,
                records: 0,
            };

            assert!(body.len() <= self.cfg.max_request_bytes);

            let (num_records, record_payload_bytes) = {
                let recs = post.records_as_json();
                assert!(recs.len() <= self.cfg.max_post_records);
                assert!(recs.len() <= self.cfg.max_total_records);
                let payload_bytes: usize = recs
                    .iter()
                    .map(|r| {
                        let len = r["payload"]
                            .as_str()
                            .expect("Non string payload property")
                            .len();
                        assert!(len <= self.cfg.max_record_payload_bytes);
                        len
                    })
                    .sum();
                assert!(payload_bytes <= self.cfg.max_post_bytes);
                assert!(payload_bytes <= self.cfg.max_total_bytes);

                assert_eq!(queue.post_limits.cur_bytes, payload_bytes);
                assert_eq!(queue.post_limits.cur_records, recs.len());
                (recs.len(), payload_bytes)
            };
            post.payload_bytes = record_payload_bytes;
            post.records = num_records;

            self.all_posts.push(post.clone());
            let response = self.responses.pop_front().unwrap();

            let record = match response {
                Sync15ClientResponse::Success { ref record, .. } => record,
                _ => {
                    panic!("only success codes are used in this test");
                }
            };

            if self.cur_batch.is_none() {
                assert!(
                    batch.is_none() || batch == Some("true".into()),
                    "We shouldn't be in a batch now"
                );
                self.cur_batch = Some(BatchInfo {
                    id: record.batch.clone(),
                    posts: vec![],
                    records: 0,
                    bytes: 0,
                });
            } else {
                assert_eq!(
                    batch,
                    self.cur_batch.as_ref().unwrap().id,
                    "We're in a batch but got the wrong batch id"
                );
            }

            {
                let batch = self.cur_batch.as_mut().unwrap();
                batch.posts.push(post);
                batch.records += num_records;
                batch.bytes += record_payload_bytes;

                assert!(batch.bytes <= self.cfg.max_total_bytes);
                assert!(batch.records <= self.cfg.max_total_records);

                assert_eq!(batch.records, queue.batch_limits.cur_records);
                assert_eq!(batch.bytes, queue.batch_limits.cur_bytes);
            }

            if commit || record.batch.is_none() {
                let batch = self.cur_batch.take().unwrap();
                self.batches.push(batch);
            }

            response
        }

        fn do_handle_response(&mut self, _: PostResponse, mid_batch: bool) {
            assert_eq!(mid_batch, self.cur_batch.is_some());
        }
    }
    impl BatchPoster for TestPosterRef {
        fn post<T, O>(
            &self,
            body: Vec<u8>,
            xius: ServerTimestamp,
            batch: Option<String>,
            commit: bool,
            queue: &PostQueue<T, O>,
        ) -> Result<PostResponse> {
            Ok(self.borrow_mut().do_post(&body, xius, batch, commit, queue))
        }
    }

    impl PostResponseHandler for TestPosterRef {
        fn handle_response(&mut self, r: PostResponse, mid_batch: bool) -> Result<()> {
            self.borrow_mut().do_handle_response(r, mid_batch);
            Ok(())
        }
    }

    type MockedPostQueue = PostQueue<TestPosterRef, TestPosterRef>;

    fn pq_test_setup(
        cfg: InfoConfiguration,
        lm: i64,
        resps: Vec<PostResponse>,
    ) -> (MockedPostQueue, TestPosterRef) {
        let tester = TestPoster::new(&cfg, resps);
        let pq = PostQueue::new(&cfg, ServerTimestamp(lm), tester.clone(), tester.clone());
        (pq, tester)
    }

    fn fake_response<'a, T: Into<Option<&'a str>>>(status: u16, lm: i64, batch: T) -> PostResponse {
        assert!(status_codes::is_success_code(status));
        Sync15ClientResponse::Success {
            status,
            last_modified: ServerTimestamp(lm),
            record: UploadResult {
                batch: batch.into().map(Into::into),
                failed: HashMap::new(),
                success: vec![],
            },
            route: "test/path".into(),
        }
    }

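    // Serialized size of an empty EncryptedPayload, of a whole (but empty)
    // outgoing BSO, and the difference between the two; used to build records
    // whose payloads have an exact size.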
    lazy_static! {
        static ref PAYLOAD_OVERHEAD: usize = {
            let payload = EncryptedPayload {
                iv: "".into(),
                hmac: "".into(),
                ciphertext: "".into()
            };
            serde_json::to_string(&payload).unwrap().len()
        };
        static ref TOTAL_RECORD_OVERHEAD: usize = {
            let val = serde_json::to_value(OutgoingEncryptedBso::new(
                OutgoingEnvelope {
                    id: "".into(),
                    sortindex: None,
                    ttl: None,
                },
                EncryptedPayload {
                    iv: "".into(),
                    hmac: "".into(),
                    ciphertext: "".into()
                },
            ))
            .unwrap();
            serde_json::to_string(&val).unwrap().len()
        };
        static ref NON_PAYLOAD_OVERHEAD: usize = {
            *TOTAL_RECORD_OVERHEAD - *PAYLOAD_OVERHEAD
        };
    }

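    // Builds an OutgoingEncryptedBso whose serialized payload is exactly
    // `payload_size` bytes long.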
    fn make_record(payload_size: usize) -> OutgoingEncryptedBso {
        assert!(payload_size > *PAYLOAD_OVERHEAD);
        let ciphertext_len = payload_size - *PAYLOAD_OVERHEAD;
        OutgoingEncryptedBso::new(
            OutgoingEnvelope {
                id: "".into(),
                sortindex: None,
                ttl: None,
            },
            EncryptedPayload {
                iv: "".into(),
                hmac: "".into(),
                ciphertext: "x".repeat(ciphertext_len),
            },
        )
    }

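    // Expected body length for a POST containing these payload sizes: the
    // leading '[' plus, per record, its serialized length (payload plus
    // envelope overhead) and a trailing ',' or the final ']'.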
    fn request_bytes_for_payloads(payloads: &[usize]) -> usize {
        1 + payloads
            .iter()
            .map(|&size| size + 1 + *NON_PAYLOAD_OVERHEAD)
            .sum::<usize>()
    }

    #[test]
    fn test_pq_basic() {
        let cfg = InfoConfiguration {
            max_request_bytes: 1000,
            max_record_payload_bytes: 1000,
            ..InfoConfiguration::default()
        };
        let time = 11_111_111_000;
        let (mut pq, tester) = pq_test_setup(
            cfg,
            time,
            vec![fake_response(status_codes::OK, time + 100_000, None)],
        );

        pq.enqueue(&make_record(100)).unwrap();
        pq.flush(true).unwrap();

        let t = tester.borrow();
        assert!(t.cur_batch.is_none());
        assert_eq!(t.all_posts.len(), 1);
        assert_eq!(t.batches.len(), 1);
        assert_eq!(t.batches[0].posts.len(), 1);
        assert_eq!(t.batches[0].records, 1);
        assert_eq!(t.batches[0].bytes, 100);
        assert_eq!(
            t.batches[0].posts[0].body.len(),
            request_bytes_for_payloads(&[100])
        );
    }

    #[test]
    fn test_pq_max_request_bytes_no_batch() {
        let cfg = InfoConfiguration {
            max_request_bytes: 250,
            ..InfoConfiguration::default()
        };
        let time = 11_111_111_000;
        let (mut pq, tester) = pq_test_setup(
            cfg,
            time,
            vec![
                fake_response(status_codes::OK, time + 100_000, None),
                fake_response(status_codes::OK, time + 200_000, None),
            ],
        );

        let payload_size = 100 - *NON_PAYLOAD_OVERHEAD;
        pq.enqueue(&make_record(payload_size)).unwrap();
        pq.enqueue(&make_record(payload_size)).unwrap();
        pq.enqueue(&make_record(payload_size)).unwrap();
        pq.flush(true).unwrap();

        let t = tester.borrow();
        assert!(t.cur_batch.is_none());
        assert_eq!(t.all_posts.len(), 2);
        assert_eq!(t.batches.len(), 2);
        assert_eq!(t.batches[0].posts.len(), 1);
        assert_eq!(t.batches[0].records, 2);
        assert_eq!(t.batches[0].bytes, payload_size * 2);
        assert_eq!(t.batches[0].posts[0].batch, Some("true".into()));
        assert_eq!(
            t.batches[0].posts[0].body.len(),
            request_bytes_for_payloads(&[payload_size, payload_size])
        );

        assert_eq!(t.batches[1].posts.len(), 1);
        assert_eq!(t.batches[1].records, 1);
        assert_eq!(t.batches[1].bytes, payload_size);
        assert_eq!(t.batches[1].posts[0].batch, None);
        assert!(!t.batches[1].posts[0].commit);
        assert_eq!(
            t.batches[1].posts[0].body.len(),
            request_bytes_for_payloads(&[payload_size])
        );
    }

    #[test]
    fn test_pq_max_record_payload_bytes_no_batch() {
        let cfg = InfoConfiguration {
            max_record_payload_bytes: 150,
            max_request_bytes: 350,
            ..InfoConfiguration::default()
        };
        let time = 11_111_111_000;
        let (mut pq, tester) = pq_test_setup(
            cfg,
            time,
            vec![
                fake_response(status_codes::OK, time + 100_000, None),
                fake_response(status_codes::OK, time + 200_000, None),
            ],
        );

        let payload_size = 100 - *NON_PAYLOAD_OVERHEAD;
        pq.enqueue(&make_record(payload_size)).unwrap();
        let enqueued = pq.enqueue(&make_record(151)).unwrap();
        assert!(!enqueued, "Should not have fit");
        pq.enqueue(&make_record(payload_size)).unwrap();
        pq.flush(true).unwrap();

        let t = tester.borrow();
        assert!(t.cur_batch.is_none());
        assert_eq!(t.all_posts.len(), 1);
        assert_eq!(t.batches.len(), 1);
        assert_eq!(t.batches[0].posts.len(), 1);
        assert_eq!(t.batches[0].records, 2);
        assert_eq!(t.batches[0].bytes, payload_size * 2);
        assert_eq!(
            t.batches[0].posts[0].body.len(),
            request_bytes_for_payloads(&[payload_size, payload_size])
        );
    }

    #[test]
    fn test_pq_single_batch() {
        let cfg = InfoConfiguration::default();
        let time = 11_111_111_000;
        let (mut pq, tester) = pq_test_setup(
            cfg,
            time,
            vec![fake_response(
                status_codes::ACCEPTED,
                time + 100_000,
                Some("1234"),
            )],
        );

        let payload_size = 100 - *NON_PAYLOAD_OVERHEAD;
        pq.enqueue(&make_record(payload_size)).unwrap();
        pq.enqueue(&make_record(payload_size)).unwrap();
        pq.enqueue(&make_record(payload_size)).unwrap();
        pq.flush(true).unwrap();

        let t = tester.borrow();
        assert!(t.cur_batch.is_none());
        assert_eq!(t.all_posts.len(), 1);
        assert_eq!(t.batches.len(), 1);
        assert_eq!(t.batches[0].id.as_ref().unwrap(), "1234");
        assert_eq!(t.batches[0].posts.len(), 1);
        assert_eq!(t.batches[0].records, 3);
        assert_eq!(t.batches[0].bytes, payload_size * 3);
        assert!(t.batches[0].posts[0].commit);
        assert_eq!(
            t.batches[0].posts[0].body.len(),
            request_bytes_for_payloads(&[payload_size, payload_size, payload_size])
        );
    }

    #[test]
    fn test_pq_multi_post_batch_bytes() {
        let cfg = InfoConfiguration {
            max_post_bytes: 200,
            ..InfoConfiguration::default()
        };
        let time = 11_111_111_000;
        let (mut pq, tester) = pq_test_setup(
            cfg,
            time,
            vec![
                fake_response(status_codes::ACCEPTED, time, Some("1234")),
                fake_response(status_codes::ACCEPTED, time + 100_000, Some("1234")),
            ],
        );

        pq.enqueue(&make_record(100)).unwrap();
        pq.enqueue(&make_record(100)).unwrap();
        pq.enqueue(&make_record(100)).unwrap();
        pq.flush(true).unwrap();

        let t = tester.borrow();
        assert!(t.cur_batch.is_none());
        assert_eq!(t.all_posts.len(), 2);
        assert_eq!(t.batches.len(), 1);
        assert_eq!(t.batches[0].posts.len(), 2);
        assert_eq!(t.batches[0].records, 3);
        assert_eq!(t.batches[0].bytes, 300);

        assert_eq!(t.batches[0].posts[0].batch.as_ref().unwrap(), "true");
        assert_eq!(t.batches[0].posts[0].records, 2);
        assert_eq!(t.batches[0].posts[0].payload_bytes, 200);
        assert!(!t.batches[0].posts[0].commit);
        assert_eq!(
            t.batches[0].posts[0].body.len(),
            request_bytes_for_payloads(&[100, 100])
        );

        assert_eq!(t.batches[0].posts[1].batch.as_ref().unwrap(), "1234");
        assert_eq!(t.batches[0].posts[1].records, 1);
        assert_eq!(t.batches[0].posts[1].payload_bytes, 100);
        assert!(t.batches[0].posts[1].commit);
        assert_eq!(
            t.batches[0].posts[1].body.len(),
            request_bytes_for_payloads(&[100])
        );
    }

    #[test]
    fn test_pq_multi_post_batch_records() {
        let cfg = InfoConfiguration {
            max_post_records: 3,
            ..InfoConfiguration::default()
        };
        let time = 11_111_111_000;
        let (mut pq, tester) = pq_test_setup(
            cfg,
            time,
            vec![
                fake_response(status_codes::ACCEPTED, time, Some("1234")),
                fake_response(status_codes::ACCEPTED, time, Some("1234")),
                fake_response(status_codes::ACCEPTED, time + 100_000, Some("1234")),
            ],
        );

        pq.enqueue(&make_record(100)).unwrap();
        pq.enqueue(&make_record(100)).unwrap();
        pq.enqueue(&make_record(100)).unwrap();
        pq.enqueue(&make_record(100)).unwrap();
        pq.enqueue(&make_record(100)).unwrap();
        pq.enqueue(&make_record(100)).unwrap();
        pq.enqueue(&make_record(100)).unwrap();
        pq.flush(true).unwrap();

        let t = tester.borrow();
        assert!(t.cur_batch.is_none());
        assert_eq!(t.all_posts.len(), 3);
        assert_eq!(t.batches.len(), 1);
        assert_eq!(t.batches[0].posts.len(), 3);
        assert_eq!(t.batches[0].records, 7);
        assert_eq!(t.batches[0].bytes, 700);

        assert_eq!(t.batches[0].posts[0].batch.as_ref().unwrap(), "true");
        assert_eq!(t.batches[0].posts[0].records, 3);
        assert_eq!(t.batches[0].posts[0].payload_bytes, 300);
        assert!(!t.batches[0].posts[0].commit);
        assert_eq!(
            t.batches[0].posts[0].body.len(),
            request_bytes_for_payloads(&[100, 100, 100])
        );

        assert_eq!(t.batches[0].posts[1].batch.as_ref().unwrap(), "1234");
        assert_eq!(t.batches[0].posts[1].records, 3);
        assert_eq!(t.batches[0].posts[1].payload_bytes, 300);
        assert!(!t.batches[0].posts[1].commit);
        assert_eq!(
            t.batches[0].posts[1].body.len(),
            request_bytes_for_payloads(&[100, 100, 100])
        );

        assert_eq!(t.batches[0].posts[2].batch.as_ref().unwrap(), "1234");
        assert_eq!(t.batches[0].posts[2].records, 1);
        assert_eq!(t.batches[0].posts[2].payload_bytes, 100);
        assert!(t.batches[0].posts[2].commit);
        assert_eq!(
            t.batches[0].posts[2].body.len(),
            request_bytes_for_payloads(&[100])
        );
    }

    #[test]
    #[allow(clippy::cognitive_complexity)]
    fn test_pq_multi_post_multi_batch_records() {
        let cfg = InfoConfiguration {
            max_post_records: 3,
            max_total_records: 5,
            ..InfoConfiguration::default()
        };
        let time = 11_111_111_000;
        let (mut pq, tester) = pq_test_setup(
            cfg,
            time,
            vec![
                fake_response(status_codes::ACCEPTED, time, Some("1234")),
                fake_response(status_codes::ACCEPTED, time + 100_000, Some("1234")),
                fake_response(status_codes::ACCEPTED, time + 100_000, Some("abcd")),
                fake_response(status_codes::ACCEPTED, time + 200_000, Some("abcd")),
            ],
        );

        pq.enqueue(&make_record(100)).unwrap();
        pq.enqueue(&make_record(100)).unwrap();
        pq.enqueue(&make_record(100)).unwrap();
        pq.enqueue(&make_record(100)).unwrap();
        pq.enqueue(&make_record(100)).unwrap();
        pq.enqueue(&make_record(100)).unwrap();
        pq.enqueue(&make_record(100)).unwrap();
        pq.enqueue(&make_record(100)).unwrap();
        pq.enqueue(&make_record(100)).unwrap();
        pq.flush(true).unwrap();

        let t = tester.borrow();
        assert!(t.cur_batch.is_none());
        assert_eq!(t.all_posts.len(), 4);
        assert_eq!(t.batches.len(), 2);
        assert_eq!(t.batches[0].posts.len(), 2);
        assert_eq!(t.batches[1].posts.len(), 2);

        assert_eq!(t.batches[0].records, 5);
        assert_eq!(t.batches[1].records, 4);

        assert_eq!(t.batches[0].bytes, 500);
        assert_eq!(t.batches[1].bytes, 400);

        assert_eq!(t.batches[0].posts[0].batch.as_ref().unwrap(), "true");
        assert_eq!(t.batches[0].posts[0].records, 3);
        assert_eq!(t.batches[0].posts[0].payload_bytes, 300);
        assert!(!t.batches[0].posts[0].commit);
        assert_eq!(
            t.batches[0].posts[0].body.len(),
            request_bytes_for_payloads(&[100, 100, 100])
        );

        assert_eq!(t.batches[0].posts[1].batch.as_ref().unwrap(), "1234");
        assert_eq!(t.batches[0].posts[1].records, 2);
        assert_eq!(t.batches[0].posts[1].payload_bytes, 200);
        assert!(t.batches[0].posts[1].commit);
        assert_eq!(
            t.batches[0].posts[1].body.len(),
            request_bytes_for_payloads(&[100, 100])
        );

        assert_eq!(t.batches[1].posts[0].batch.as_ref().unwrap(), "true");
        assert_eq!(t.batches[1].posts[0].records, 3);
        assert_eq!(t.batches[1].posts[0].payload_bytes, 300);
        assert!(!t.batches[1].posts[0].commit);
        assert_eq!(
            t.batches[1].posts[0].body.len(),
            request_bytes_for_payloads(&[100, 100, 100])
        );

        assert_eq!(t.batches[1].posts[1].batch.as_ref().unwrap(), "abcd");
        assert_eq!(t.batches[1].posts[1].records, 1);
        assert_eq!(t.batches[1].posts[1].payload_bytes, 100);
        assert!(t.batches[1].posts[1].commit);
        assert_eq!(
            t.batches[1].posts[1].body.len(),
            request_bytes_for_payloads(&[100])
        );
    }

    #[test]
    #[allow(clippy::cognitive_complexity)]
    fn test_pq_multi_post_multi_batch_bytes() {
        let cfg = InfoConfiguration {
            max_post_bytes: 300,
            max_total_bytes: 500,
            ..InfoConfiguration::default()
        };
        let time = 11_111_111_000;
        let (mut pq, tester) = pq_test_setup(
            cfg,
            time,
            vec![
                fake_response(status_codes::ACCEPTED, time, Some("1234")),
                fake_response(status_codes::ACCEPTED, time + 100_000, Some("1234")),
                fake_response(status_codes::ACCEPTED, time + 100_000, Some("abcd")),
                fake_response(status_codes::ACCEPTED, time + 200_000, Some("abcd")),
            ],
        );

        pq.enqueue(&make_record(100)).unwrap();
        pq.enqueue(&make_record(100)).unwrap();
        pq.enqueue(&make_record(100)).unwrap();
        assert_eq!(pq.last_modified.0, time);
        pq.enqueue(&make_record(100)).unwrap();
        pq.enqueue(&make_record(100)).unwrap();
        pq.enqueue(&make_record(100)).unwrap();
        assert_eq!(pq.last_modified.0, time + 100_000);
        pq.enqueue(&make_record(100)).unwrap();
        pq.enqueue(&make_record(100)).unwrap();

        pq.enqueue(&make_record(100)).unwrap();
        assert_eq!(pq.last_modified.0, time + 100_000);
        pq.flush(true).unwrap();
        assert_eq!(pq.last_modified.0, time + 200_000);

        let t = tester.borrow();
        assert!(t.cur_batch.is_none());
        assert_eq!(t.all_posts.len(), 4);
        assert_eq!(t.batches.len(), 2);
        assert_eq!(t.batches[0].posts.len(), 2);
        assert_eq!(t.batches[1].posts.len(), 2);

        assert_eq!(t.batches[0].records, 5);
        assert_eq!(t.batches[1].records, 4);

        assert_eq!(t.batches[0].bytes, 500);
        assert_eq!(t.batches[1].bytes, 400);

        assert_eq!(t.batches[0].posts[0].batch.as_ref().unwrap(), "true");
        assert_eq!(t.batches[0].posts[0].records, 3);
        assert_eq!(t.batches[0].posts[0].payload_bytes, 300);
        assert!(!t.batches[0].posts[0].commit);
        assert_eq!(
            t.batches[0].posts[0].body.len(),
            request_bytes_for_payloads(&[100, 100, 100])
        );

        assert_eq!(t.batches[0].posts[1].batch.as_ref().unwrap(), "1234");
        assert_eq!(t.batches[0].posts[1].records, 2);
        assert_eq!(t.batches[0].posts[1].payload_bytes, 200);
        assert!(t.batches[0].posts[1].commit);
        assert_eq!(
            t.batches[0].posts[1].body.len(),
            request_bytes_for_payloads(&[100, 100])
        );

        assert_eq!(t.batches[1].posts[0].batch.as_ref().unwrap(), "true");
        assert_eq!(t.batches[1].posts[0].records, 3);
        assert_eq!(t.batches[1].posts[0].payload_bytes, 300);
        assert!(!t.batches[1].posts[0].commit);
        assert_eq!(
            t.batches[1].posts[0].body.len(),
            request_bytes_for_payloads(&[100, 100, 100])
        );

        assert_eq!(t.batches[1].posts[1].batch.as_ref().unwrap(), "abcd");
        assert_eq!(t.batches[1].posts[1].records, 1);
        assert_eq!(t.batches[1].posts[1].payload_bytes, 100);
        assert!(t.batches[1].posts[1].commit);
        assert_eq!(
            t.batches[1].posts[1].body.len(),
            request_bytes_for_payloads(&[100])
        );
    }
}