use super::storage_client::Sync15ClientResponse;
use crate::bso::OutgoingEncryptedBso;
use crate::error::{self, Error as ErrorKind, Result};
use crate::ServerTimestamp;
use serde_derive::*;
use std::collections::HashMap;
use std::default::Default;
use std::ops::Deref;
use sync_guid::Guid;
use viaduct::status_codes;
#[derive(Debug, Clone)]
struct LimitTracker {
max_bytes: usize,
max_records: usize,
cur_bytes: usize,
cur_records: usize,
}
impl LimitTracker {
pub fn new(max_bytes: usize, max_records: usize) -> LimitTracker {
LimitTracker {
max_bytes,
max_records,
cur_bytes: 0,
cur_records: 0,
}
}
pub fn clear(&mut self) {
self.cur_records = 0;
self.cur_bytes = 0;
}
pub fn can_add_record(&self, payload_size: usize) -> bool {
self.cur_records < self.max_records && self.cur_bytes + payload_size <= self.max_bytes
}
pub fn can_never_add(&self, record_size: usize) -> bool {
record_size >= self.max_bytes
}
pub fn record_added(&mut self, record_size: usize) {
assert!(
self.can_add_record(record_size),
"LimitTracker::record_added caller must check can_add_record"
);
self.cur_records += 1;
self.cur_bytes += record_size;
}
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct InfoConfiguration {
#[serde(default = "default_max_request_bytes")]
pub max_request_bytes: usize,
#[serde(default = "usize::max_value")]
pub max_post_records: usize,
#[serde(default = "usize::max_value")]
pub max_post_bytes: usize,
#[serde(default = "usize::max_value")]
pub max_total_records: usize,
#[serde(default = "usize::max_value")]
pub max_total_bytes: usize,
#[serde(default = "default_max_record_payload_bytes")]
pub max_record_payload_bytes: usize,
}
fn default_max_request_bytes() -> usize {
260 * 1024
}
fn default_max_record_payload_bytes() -> usize {
256 * 1024
}
impl Default for InfoConfiguration {
#[inline]
fn default() -> InfoConfiguration {
InfoConfiguration {
max_request_bytes: default_max_request_bytes(),
max_record_payload_bytes: default_max_record_payload_bytes(),
max_post_records: usize::MAX,
max_post_bytes: usize::MAX,
max_total_records: usize::MAX,
max_total_bytes: usize::MAX,
}
}
}
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct InfoCollections(pub(crate) HashMap<String, ServerTimestamp>);
impl InfoCollections {
pub fn new(collections: HashMap<String, ServerTimestamp>) -> InfoCollections {
InfoCollections(collections)
}
}
impl Deref for InfoCollections {
type Target = HashMap<String, ServerTimestamp>;
fn deref(&self) -> &HashMap<String, ServerTimestamp> {
&self.0
}
}
#[derive(Debug, Clone, Deserialize)]
pub struct UploadResult {
batch: Option<String>,
#[serde(default = "HashMap::new")]
pub failed: HashMap<Guid, String>,
#[serde(default = "Vec::new")]
pub success: Vec<Guid>,
}
pub type PostResponse = Sync15ClientResponse<UploadResult>;
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub enum BatchState {
Unsupported,
NoBatch,
InBatch(String),
}
#[derive(Debug)]
pub struct PostQueue<Post, OnResponse> {
poster: Post,
on_response: OnResponse,
post_limits: LimitTracker,
batch_limits: LimitTracker,
max_payload_bytes: usize,
max_request_bytes: usize,
queued: Vec<u8>,
batch: BatchState,
last_modified: ServerTimestamp,
}
pub trait BatchPoster {
fn post<P, O>(
&self,
body: Vec<u8>,
xius: ServerTimestamp,
batch: Option<String>,
commit: bool,
queue: &PostQueue<P, O>,
) -> Result<PostResponse>;
}
pub trait PostResponseHandler {
fn handle_response(&mut self, r: PostResponse, mid_batch: bool) -> Result<()>;
}
#[derive(Debug, Clone)]
pub(crate) struct NormalResponseHandler {
pub failed_ids: Vec<Guid>,
pub successful_ids: Vec<Guid>,
pub allow_failed: bool,
pub pending_failed: Vec<Guid>,
pub pending_success: Vec<Guid>,
}
impl NormalResponseHandler {
pub fn new(allow_failed: bool) -> NormalResponseHandler {
NormalResponseHandler {
failed_ids: vec![],
successful_ids: vec![],
pending_failed: vec![],
pending_success: vec![],
allow_failed,
}
}
}
impl PostResponseHandler for NormalResponseHandler {
fn handle_response(&mut self, r: PostResponse, mid_batch: bool) -> error::Result<()> {
match r {
Sync15ClientResponse::Success { record, .. } => {
if !record.failed.is_empty() && !self.allow_failed {
return Err(ErrorKind::RecordUploadFailed);
}
for id in record.success.iter() {
self.pending_success.push(id.clone());
}
for kv in record.failed.iter() {
self.pending_failed.push(kv.0.clone());
}
if !mid_batch {
self.successful_ids.append(&mut self.pending_success);
self.failed_ids.append(&mut self.pending_failed);
}
Ok(())
}
_ => Err(r.create_storage_error()),
}
}
}
impl<Poster, OnResponse> PostQueue<Poster, OnResponse>
where
Poster: BatchPoster,
OnResponse: PostResponseHandler,
{
pub fn new(
config: &InfoConfiguration,
ts: ServerTimestamp,
poster: Poster,
on_response: OnResponse,
) -> PostQueue<Poster, OnResponse> {
PostQueue {
poster,
on_response,
last_modified: ts,
post_limits: LimitTracker::new(config.max_post_bytes, config.max_post_records),
batch_limits: LimitTracker::new(config.max_total_bytes, config.max_total_records),
batch: BatchState::NoBatch,
max_payload_bytes: config.max_record_payload_bytes,
max_request_bytes: config.max_request_bytes,
queued: Vec::new(),
}
}
#[inline]
fn in_batch(&self) -> bool {
!matches!(&self.batch, BatchState::Unsupported | BatchState::NoBatch)
}
pub fn enqueue(&mut self, record: &OutgoingEncryptedBso) -> Result<bool> {
let payload_length = record.serialized_payload_len();
if self.post_limits.can_never_add(payload_length)
|| self.batch_limits.can_never_add(payload_length)
|| payload_length >= self.max_payload_bytes
{
log::warn!(
"Single record too large to submit to server ({} b)",
payload_length
);
return Ok(false);
}
let item_start = self.queued.len();
self.queued.reserve(payload_length + 2);
let c = if self.queued.is_empty() { b'[' } else { b',' };
self.queued.push(c);
serde_json::to_writer(&mut self.queued, &record).unwrap();
let item_end = self.queued.len();
debug_assert!(
item_end >= payload_length,
"EncryptedPayload::serialized_len is bugged"
);
let item_len = item_end - item_start + 1;
if item_len >= self.max_request_bytes {
self.queued.truncate(item_start);
log::warn!(
"Single record too large to submit to server ({} b)",
item_len
);
return Ok(false);
}
let can_post_record = self.post_limits.can_add_record(payload_length);
let can_batch_record = self.batch_limits.can_add_record(payload_length);
let can_send_record = self.queued.len() < self.max_request_bytes;
if !can_post_record || !can_send_record || !can_batch_record {
log::debug!(
"PostQueue flushing! (can_post = {}, can_send = {}, can_batch = {})",
can_post_record,
can_send_record,
can_batch_record
);
self.queued.truncate(item_start);
self.flush(!can_batch_record)?;
let c = if self.queued.is_empty() { b'[' } else { b',' };
self.queued.push(c);
serde_json::to_writer(&mut self.queued, &record).unwrap();
}
self.post_limits.record_added(payload_length);
self.batch_limits.record_added(payload_length);
Ok(true)
}
pub fn flush(&mut self, want_commit: bool) -> Result<()> {
if self.queued.is_empty() {
assert!(
!self.in_batch(),
"Bug: Somehow we're in a batch but have no queued records"
);
return Ok(());
}
self.queued.push(b']');
let batch_id = match &self.batch {
BatchState::Unsupported => None,
BatchState::NoBatch => Some("true".into()),
BatchState::InBatch(ref s) => Some(s.clone()),
};
log::info!(
"Posting {} records of {} bytes",
self.post_limits.cur_records,
self.queued.len()
);
let is_commit = want_commit && batch_id.is_some();
let resp_or_error = self.poster.post(
self.queued.clone(),
self.last_modified,
batch_id,
is_commit,
self,
);
self.queued.truncate(0);
if want_commit || self.batch == BatchState::Unsupported {
self.batch_limits.clear();
}
self.post_limits.clear();
let resp = resp_or_error?;
let (status, last_modified, record) = match resp {
Sync15ClientResponse::Success {
status,
last_modified,
ref record,
..
} => (status, last_modified, record),
_ => {
self.on_response.handle_response(resp, !want_commit)?;
unreachable!();
}
};
if want_commit || self.batch == BatchState::Unsupported {
self.last_modified = last_modified;
}
if want_commit {
log::debug!("Committed batch {:?}", self.batch);
self.batch = BatchState::NoBatch;
self.on_response.handle_response(resp, false)?;
return Ok(());
}
if status != status_codes::ACCEPTED {
if self.in_batch() {
return Err(ErrorKind::ServerBatchProblem(
"Server responded non-202 success code while a batch was in progress",
));
}
self.last_modified = last_modified;
self.batch = BatchState::Unsupported;
self.batch_limits.clear();
self.on_response.handle_response(resp, false)?;
return Ok(());
}
let batch_id = record
.batch
.as_ref()
.ok_or({
ErrorKind::ServerBatchProblem("Invalid server response: 202 without a batch ID")
})?
.clone();
match &self.batch {
BatchState::Unsupported => {
log::warn!("Server changed its mind about supporting batching mid-batch...");
}
BatchState::InBatch(ref cur_id) => {
if cur_id != &batch_id {
return Err(ErrorKind::ServerBatchProblem(
"Invalid server response: 202 without a batch ID",
));
}
}
_ => {}
}
self.batch = BatchState::InBatch(batch_id);
self.last_modified = last_modified;
self.on_response.handle_response(resp, true)?;
Ok(())
}
}
#[derive(Clone)]
pub struct UploadInfo {
pub successful_ids: Vec<Guid>,
pub failed_ids: Vec<Guid>,
pub modified_timestamp: ServerTimestamp,
}
impl<Poster> PostQueue<Poster, NormalResponseHandler> {
pub fn completed_upload_info(&mut self) -> UploadInfo {
let mut result = UploadInfo {
successful_ids: Vec::with_capacity(self.on_response.successful_ids.len()),
failed_ids: Vec::with_capacity(
self.on_response.failed_ids.len()
+ self.on_response.pending_failed.len()
+ self.on_response.pending_success.len(),
),
modified_timestamp: self.last_modified,
};
result
.successful_ids
.append(&mut self.on_response.successful_ids);
result.failed_ids.append(&mut self.on_response.failed_ids);
result
.failed_ids
.append(&mut self.on_response.pending_failed);
result
.failed_ids
.append(&mut self.on_response.pending_success);
result
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::bso::{IncomingEncryptedBso, OutgoingEncryptedBso, OutgoingEnvelope};
use crate::EncryptedPayload;
use lazy_static::lazy_static;
use std::cell::RefCell;
use std::collections::VecDeque;
use std::rc::Rc;
#[derive(Debug, Clone)]
struct PostedData {
body: String,
_xius: ServerTimestamp,
batch: Option<String>,
commit: bool,
payload_bytes: usize,
records: usize,
}
impl PostedData {
fn records_as_json(&self) -> Vec<serde_json::Value> {
let values =
serde_json::from_str::<serde_json::Value>(&self.body).expect("Posted invalid json");
let records_or_err =
serde_json::from_value::<Vec<IncomingEncryptedBso>>(values.clone());
records_or_err.expect("Failed to deserialize data");
serde_json::from_value(values).unwrap()
}
}
#[derive(Debug, Clone)]
struct BatchInfo {
id: Option<String>,
posts: Vec<PostedData>,
bytes: usize,
records: usize,
}
#[derive(Debug, Clone)]
struct TestPoster {
all_posts: Vec<PostedData>,
responses: VecDeque<PostResponse>,
batches: Vec<BatchInfo>,
cur_batch: Option<BatchInfo>,
cfg: InfoConfiguration,
}
type TestPosterRef = Rc<RefCell<TestPoster>>;
impl TestPoster {
pub fn new<T>(cfg: &InfoConfiguration, responses: T) -> TestPosterRef
where
T: Into<VecDeque<PostResponse>>,
{
Rc::new(RefCell::new(TestPoster {
all_posts: vec![],
responses: responses.into(),
batches: vec![],
cur_batch: None,
cfg: cfg.clone(),
}))
}
fn do_post<T, O>(
&mut self,
body: &[u8],
xius: ServerTimestamp,
batch: Option<String>,
commit: bool,
queue: &PostQueue<T, O>,
) -> Sync15ClientResponse<UploadResult> {
let mut post = PostedData {
body: String::from_utf8(body.into()).expect("Posted invalid utf8..."),
batch: batch.clone(),
_xius: xius,
commit,
payload_bytes: 0,
records: 0,
};
assert!(body.len() <= self.cfg.max_request_bytes);
let (num_records, record_payload_bytes) = {
let recs = post.records_as_json();
assert!(recs.len() <= self.cfg.max_post_records);
assert!(recs.len() <= self.cfg.max_total_records);
let payload_bytes: usize = recs
.iter()
.map(|r| {
let len = r["payload"]
.as_str()
.expect("Non string payload property")
.len();
assert!(len <= self.cfg.max_record_payload_bytes);
len
})
.sum();
assert!(payload_bytes <= self.cfg.max_post_bytes);
assert!(payload_bytes <= self.cfg.max_total_bytes);
assert_eq!(queue.post_limits.cur_bytes, payload_bytes);
assert_eq!(queue.post_limits.cur_records, recs.len());
(recs.len(), payload_bytes)
};
post.payload_bytes = record_payload_bytes;
post.records = num_records;
self.all_posts.push(post.clone());
let response = self.responses.pop_front().unwrap();
let record = match response {
Sync15ClientResponse::Success { ref record, .. } => record,
_ => {
panic!("only success codes are used in this test");
}
};
if self.cur_batch.is_none() {
assert!(
batch.is_none() || batch == Some("true".into()),
"We shouldn't be in a batch now"
);
self.cur_batch = Some(BatchInfo {
id: record.batch.clone(),
posts: vec![],
records: 0,
bytes: 0,
});
} else {
assert_eq!(
batch,
self.cur_batch.as_ref().unwrap().id,
"We're in a batch but got the wrong batch id"
);
}
{
let batch = self.cur_batch.as_mut().unwrap();
batch.posts.push(post);
batch.records += num_records;
batch.bytes += record_payload_bytes;
assert!(batch.bytes <= self.cfg.max_total_bytes);
assert!(batch.records <= self.cfg.max_total_records);
assert_eq!(batch.records, queue.batch_limits.cur_records);
assert_eq!(batch.bytes, queue.batch_limits.cur_bytes);
}
if commit || record.batch.is_none() {
let batch = self.cur_batch.take().unwrap();
self.batches.push(batch);
}
response
}
fn do_handle_response(&mut self, _: PostResponse, mid_batch: bool) {
assert_eq!(mid_batch, self.cur_batch.is_some());
}
}
impl BatchPoster for TestPosterRef {
fn post<T, O>(
&self,
body: Vec<u8>,
xius: ServerTimestamp,
batch: Option<String>,
commit: bool,
queue: &PostQueue<T, O>,
) -> Result<PostResponse> {
Ok(self.borrow_mut().do_post(&body, xius, batch, commit, queue))
}
}
impl PostResponseHandler for TestPosterRef {
fn handle_response(&mut self, r: PostResponse, mid_batch: bool) -> Result<()> {
self.borrow_mut().do_handle_response(r, mid_batch);
Ok(())
}
}
type MockedPostQueue = PostQueue<TestPosterRef, TestPosterRef>;
fn pq_test_setup(
cfg: InfoConfiguration,
lm: i64,
resps: Vec<PostResponse>,
) -> (MockedPostQueue, TestPosterRef) {
let tester = TestPoster::new(&cfg, resps);
let pq = PostQueue::new(&cfg, ServerTimestamp(lm), tester.clone(), tester.clone());
(pq, tester)
}
fn fake_response<'a, T: Into<Option<&'a str>>>(status: u16, lm: i64, batch: T) -> PostResponse {
assert!(status_codes::is_success_code(status));
Sync15ClientResponse::Success {
status,
last_modified: ServerTimestamp(lm),
record: UploadResult {
batch: batch.into().map(Into::into),
failed: HashMap::new(),
success: vec![],
},
route: "test/path".into(),
}
}
lazy_static! {
static ref PAYLOAD_OVERHEAD: usize = {
let payload = EncryptedPayload {
iv: "".into(),
hmac: "".into(),
ciphertext: "".into()
};
serde_json::to_string(&payload).unwrap().len()
};
static ref TOTAL_RECORD_OVERHEAD: usize = {
let val = serde_json::to_value(OutgoingEncryptedBso::new(OutgoingEnvelope {
id: "".into(),
sortindex: None,
ttl: None,
},
EncryptedPayload {
iv: "".into(),
hmac: "".into(),
ciphertext: "".into()
},
)).unwrap();
serde_json::to_string(&val).unwrap().len()
};
static ref NON_PAYLOAD_OVERHEAD: usize = {
*TOTAL_RECORD_OVERHEAD - *PAYLOAD_OVERHEAD
};
}
fn make_record(payload_size: usize) -> OutgoingEncryptedBso {
assert!(payload_size > *PAYLOAD_OVERHEAD);
let ciphertext_len = payload_size - *PAYLOAD_OVERHEAD;
OutgoingEncryptedBso::new(
OutgoingEnvelope {
id: "".into(),
sortindex: None,
ttl: None,
},
EncryptedPayload {
iv: "".into(),
hmac: "".into(),
ciphertext: "x".repeat(ciphertext_len),
},
)
}
fn request_bytes_for_payloads(payloads: &[usize]) -> usize {
1 + payloads
.iter()
.map(|&size| size + 1 + *NON_PAYLOAD_OVERHEAD)
.sum::<usize>()
}
#[test]
fn test_pq_basic() {
let cfg = InfoConfiguration {
max_request_bytes: 1000,
max_record_payload_bytes: 1000,
..InfoConfiguration::default()
};
let time = 11_111_111_000;
let (mut pq, tester) = pq_test_setup(
cfg,
time,
vec![fake_response(status_codes::OK, time + 100_000, None)],
);
pq.enqueue(&make_record(100)).unwrap();
pq.flush(true).unwrap();
let t = tester.borrow();
assert!(t.cur_batch.is_none());
assert_eq!(t.all_posts.len(), 1);
assert_eq!(t.batches.len(), 1);
assert_eq!(t.batches[0].posts.len(), 1);
assert_eq!(t.batches[0].records, 1);
assert_eq!(t.batches[0].bytes, 100);
assert_eq!(
t.batches[0].posts[0].body.len(),
request_bytes_for_payloads(&[100])
);
}
#[test]
fn test_pq_max_request_bytes_no_batch() {
let cfg = InfoConfiguration {
max_request_bytes: 250,
..InfoConfiguration::default()
};
let time = 11_111_111_000;
let (mut pq, tester) = pq_test_setup(
cfg,
time,
vec![
fake_response(status_codes::OK, time + 100_000, None),
fake_response(status_codes::OK, time + 200_000, None),
],
);
let payload_size = 100 - *NON_PAYLOAD_OVERHEAD;
pq.enqueue(&make_record(payload_size)).unwrap(); pq.enqueue(&make_record(payload_size)).unwrap(); pq.enqueue(&make_record(payload_size)).unwrap(); pq.flush(true).unwrap();
let t = tester.borrow();
assert!(t.cur_batch.is_none());
assert_eq!(t.all_posts.len(), 2);
assert_eq!(t.batches.len(), 2);
assert_eq!(t.batches[0].posts.len(), 1);
assert_eq!(t.batches[0].records, 2);
assert_eq!(t.batches[0].bytes, payload_size * 2);
assert_eq!(t.batches[0].posts[0].batch, Some("true".into()));
assert_eq!(
t.batches[0].posts[0].body.len(),
request_bytes_for_payloads(&[payload_size, payload_size])
);
assert_eq!(t.batches[1].posts.len(), 1);
assert_eq!(t.batches[1].records, 1);
assert_eq!(t.batches[1].bytes, payload_size);
assert_eq!(t.batches[1].posts[0].batch, None);
assert!(!t.batches[1].posts[0].commit);
assert_eq!(
t.batches[1].posts[0].body.len(),
request_bytes_for_payloads(&[payload_size])
);
}
#[test]
fn test_pq_max_record_payload_bytes_no_batch() {
let cfg = InfoConfiguration {
max_record_payload_bytes: 150,
max_request_bytes: 350,
..InfoConfiguration::default()
};
let time = 11_111_111_000;
let (mut pq, tester) = pq_test_setup(
cfg,
time,
vec![
fake_response(status_codes::OK, time + 100_000, None),
fake_response(status_codes::OK, time + 200_000, None),
],
);
let payload_size = 100 - *NON_PAYLOAD_OVERHEAD;
pq.enqueue(&make_record(payload_size)).unwrap(); let enqueued = pq.enqueue(&make_record(151)).unwrap(); assert!(!enqueued, "Should not have fit");
pq.enqueue(&make_record(payload_size)).unwrap();
pq.flush(true).unwrap();
let t = tester.borrow();
assert!(t.cur_batch.is_none());
assert_eq!(t.all_posts.len(), 1);
assert_eq!(t.batches.len(), 1);
assert_eq!(t.batches[0].posts.len(), 1);
assert_eq!(t.batches[0].records, 2);
assert_eq!(t.batches[0].bytes, payload_size * 2);
assert_eq!(
t.batches[0].posts[0].body.len(),
request_bytes_for_payloads(&[payload_size, payload_size])
);
}
#[test]
fn test_pq_single_batch() {
let cfg = InfoConfiguration::default();
let time = 11_111_111_000;
let (mut pq, tester) = pq_test_setup(
cfg,
time,
vec![fake_response(
status_codes::ACCEPTED,
time + 100_000,
Some("1234"),
)],
);
let payload_size = 100 - *NON_PAYLOAD_OVERHEAD;
pq.enqueue(&make_record(payload_size)).unwrap();
pq.enqueue(&make_record(payload_size)).unwrap();
pq.enqueue(&make_record(payload_size)).unwrap();
pq.flush(true).unwrap();
let t = tester.borrow();
assert!(t.cur_batch.is_none());
assert_eq!(t.all_posts.len(), 1);
assert_eq!(t.batches.len(), 1);
assert_eq!(t.batches[0].id.as_ref().unwrap(), "1234");
assert_eq!(t.batches[0].posts.len(), 1);
assert_eq!(t.batches[0].records, 3);
assert_eq!(t.batches[0].bytes, payload_size * 3);
assert!(t.batches[0].posts[0].commit);
assert_eq!(
t.batches[0].posts[0].body.len(),
request_bytes_for_payloads(&[payload_size, payload_size, payload_size])
);
}
#[test]
fn test_pq_multi_post_batch_bytes() {
let cfg = InfoConfiguration {
max_post_bytes: 200,
..InfoConfiguration::default()
};
let time = 11_111_111_000;
let (mut pq, tester) = pq_test_setup(
cfg,
time,
vec![
fake_response(status_codes::ACCEPTED, time, Some("1234")),
fake_response(status_codes::ACCEPTED, time + 100_000, Some("1234")),
],
);
pq.enqueue(&make_record(100)).unwrap();
pq.enqueue(&make_record(100)).unwrap();
pq.enqueue(&make_record(100)).unwrap();
pq.flush(true).unwrap(); let t = tester.borrow();
assert!(t.cur_batch.is_none());
assert_eq!(t.all_posts.len(), 2);
assert_eq!(t.batches.len(), 1);
assert_eq!(t.batches[0].posts.len(), 2);
assert_eq!(t.batches[0].records, 3);
assert_eq!(t.batches[0].bytes, 300);
assert_eq!(t.batches[0].posts[0].batch.as_ref().unwrap(), "true");
assert_eq!(t.batches[0].posts[0].records, 2);
assert_eq!(t.batches[0].posts[0].payload_bytes, 200);
assert!(!t.batches[0].posts[0].commit);
assert_eq!(
t.batches[0].posts[0].body.len(),
request_bytes_for_payloads(&[100, 100])
);
assert_eq!(t.batches[0].posts[1].batch.as_ref().unwrap(), "1234");
assert_eq!(t.batches[0].posts[1].records, 1);
assert_eq!(t.batches[0].posts[1].payload_bytes, 100);
assert!(t.batches[0].posts[1].commit);
assert_eq!(
t.batches[0].posts[1].body.len(),
request_bytes_for_payloads(&[100])
);
}
#[test]
fn test_pq_multi_post_batch_records() {
let cfg = InfoConfiguration {
max_post_records: 3,
..InfoConfiguration::default()
};
let time = 11_111_111_000;
let (mut pq, tester) = pq_test_setup(
cfg,
time,
vec![
fake_response(status_codes::ACCEPTED, time, Some("1234")),
fake_response(status_codes::ACCEPTED, time, Some("1234")),
fake_response(status_codes::ACCEPTED, time + 100_000, Some("1234")),
],
);
pq.enqueue(&make_record(100)).unwrap();
pq.enqueue(&make_record(100)).unwrap();
pq.enqueue(&make_record(100)).unwrap();
pq.enqueue(&make_record(100)).unwrap();
pq.enqueue(&make_record(100)).unwrap();
pq.enqueue(&make_record(100)).unwrap();
pq.enqueue(&make_record(100)).unwrap();
pq.flush(true).unwrap(); let t = tester.borrow();
assert!(t.cur_batch.is_none());
assert_eq!(t.all_posts.len(), 3);
assert_eq!(t.batches.len(), 1);
assert_eq!(t.batches[0].posts.len(), 3);
assert_eq!(t.batches[0].records, 7);
assert_eq!(t.batches[0].bytes, 700);
assert_eq!(t.batches[0].posts[0].batch.as_ref().unwrap(), "true");
assert_eq!(t.batches[0].posts[0].records, 3);
assert_eq!(t.batches[0].posts[0].payload_bytes, 300);
assert!(!t.batches[0].posts[0].commit);
assert_eq!(
t.batches[0].posts[0].body.len(),
request_bytes_for_payloads(&[100, 100, 100])
);
assert_eq!(t.batches[0].posts[1].batch.as_ref().unwrap(), "1234");
assert_eq!(t.batches[0].posts[1].records, 3);
assert_eq!(t.batches[0].posts[1].payload_bytes, 300);
assert!(!t.batches[0].posts[1].commit);
assert_eq!(
t.batches[0].posts[1].body.len(),
request_bytes_for_payloads(&[100, 100, 100])
);
assert_eq!(t.batches[0].posts[2].batch.as_ref().unwrap(), "1234");
assert_eq!(t.batches[0].posts[2].records, 1);
assert_eq!(t.batches[0].posts[2].payload_bytes, 100);
assert!(t.batches[0].posts[2].commit);
assert_eq!(
t.batches[0].posts[2].body.len(),
request_bytes_for_payloads(&[100])
);
}
#[test]
#[allow(clippy::cognitive_complexity)]
fn test_pq_multi_post_multi_batch_records() {
let cfg = InfoConfiguration {
max_post_records: 3,
max_total_records: 5,
..InfoConfiguration::default()
};
let time = 11_111_111_000;
let (mut pq, tester) = pq_test_setup(
cfg,
time,
vec![
fake_response(status_codes::ACCEPTED, time, Some("1234")),
fake_response(status_codes::ACCEPTED, time + 100_000, Some("1234")),
fake_response(status_codes::ACCEPTED, time + 100_000, Some("abcd")),
fake_response(status_codes::ACCEPTED, time + 200_000, Some("abcd")),
],
);
pq.enqueue(&make_record(100)).unwrap();
pq.enqueue(&make_record(100)).unwrap();
pq.enqueue(&make_record(100)).unwrap();
pq.enqueue(&make_record(100)).unwrap();
pq.enqueue(&make_record(100)).unwrap();
pq.enqueue(&make_record(100)).unwrap();
pq.enqueue(&make_record(100)).unwrap();
pq.enqueue(&make_record(100)).unwrap();
pq.enqueue(&make_record(100)).unwrap();
pq.flush(true).unwrap(); let t = tester.borrow();
assert!(t.cur_batch.is_none());
assert_eq!(t.all_posts.len(), 4);
assert_eq!(t.batches.len(), 2);
assert_eq!(t.batches[0].posts.len(), 2);
assert_eq!(t.batches[1].posts.len(), 2);
assert_eq!(t.batches[0].records, 5);
assert_eq!(t.batches[1].records, 4);
assert_eq!(t.batches[0].bytes, 500);
assert_eq!(t.batches[1].bytes, 400);
assert_eq!(t.batches[0].posts[0].batch.as_ref().unwrap(), "true");
assert_eq!(t.batches[0].posts[0].records, 3);
assert_eq!(t.batches[0].posts[0].payload_bytes, 300);
assert!(!t.batches[0].posts[0].commit);
assert_eq!(
t.batches[0].posts[0].body.len(),
request_bytes_for_payloads(&[100, 100, 100])
);
assert_eq!(t.batches[0].posts[1].batch.as_ref().unwrap(), "1234");
assert_eq!(t.batches[0].posts[1].records, 2);
assert_eq!(t.batches[0].posts[1].payload_bytes, 200);
assert!(t.batches[0].posts[1].commit);
assert_eq!(
t.batches[0].posts[1].body.len(),
request_bytes_for_payloads(&[100, 100])
);
assert_eq!(t.batches[1].posts[0].batch.as_ref().unwrap(), "true");
assert_eq!(t.batches[1].posts[0].records, 3);
assert_eq!(t.batches[1].posts[0].payload_bytes, 300);
assert!(!t.batches[1].posts[0].commit);
assert_eq!(
t.batches[1].posts[0].body.len(),
request_bytes_for_payloads(&[100, 100, 100])
);
assert_eq!(t.batches[1].posts[1].batch.as_ref().unwrap(), "abcd");
assert_eq!(t.batches[1].posts[1].records, 1);
assert_eq!(t.batches[1].posts[1].payload_bytes, 100);
assert!(t.batches[1].posts[1].commit);
assert_eq!(
t.batches[1].posts[1].body.len(),
request_bytes_for_payloads(&[100])
);
}
#[test]
#[allow(clippy::cognitive_complexity)]
fn test_pq_multi_post_multi_batch_bytes() {
let cfg = InfoConfiguration {
max_post_bytes: 300,
max_total_bytes: 500,
..InfoConfiguration::default()
};
let time = 11_111_111_000;
let (mut pq, tester) = pq_test_setup(
cfg,
time,
vec![
fake_response(status_codes::ACCEPTED, time, Some("1234")),
fake_response(status_codes::ACCEPTED, time + 100_000, Some("1234")), fake_response(status_codes::ACCEPTED, time + 100_000, Some("abcd")),
fake_response(status_codes::ACCEPTED, time + 200_000, Some("abcd")), ],
);
pq.enqueue(&make_record(100)).unwrap();
pq.enqueue(&make_record(100)).unwrap();
pq.enqueue(&make_record(100)).unwrap();
assert_eq!(pq.last_modified.0, time);
pq.enqueue(&make_record(100)).unwrap();
pq.enqueue(&make_record(100)).unwrap();
pq.enqueue(&make_record(100)).unwrap();
assert_eq!(pq.last_modified.0, time + 100_000);
pq.enqueue(&make_record(100)).unwrap();
pq.enqueue(&make_record(100)).unwrap();
pq.enqueue(&make_record(100)).unwrap();
assert_eq!(pq.last_modified.0, time + 100_000);
pq.flush(true).unwrap(); assert_eq!(pq.last_modified.0, time + 200_000);
let t = tester.borrow();
assert!(t.cur_batch.is_none());
assert_eq!(t.all_posts.len(), 4);
assert_eq!(t.batches.len(), 2);
assert_eq!(t.batches[0].posts.len(), 2);
assert_eq!(t.batches[1].posts.len(), 2);
assert_eq!(t.batches[0].records, 5);
assert_eq!(t.batches[1].records, 4);
assert_eq!(t.batches[0].bytes, 500);
assert_eq!(t.batches[1].bytes, 400);
assert_eq!(t.batches[0].posts[0].batch.as_ref().unwrap(), "true");
assert_eq!(t.batches[0].posts[0].records, 3);
assert_eq!(t.batches[0].posts[0].payload_bytes, 300);
assert!(!t.batches[0].posts[0].commit);
assert_eq!(
t.batches[0].posts[0].body.len(),
request_bytes_for_payloads(&[100, 100, 100])
);
assert_eq!(t.batches[0].posts[1].batch.as_ref().unwrap(), "1234");
assert_eq!(t.batches[0].posts[1].records, 2);
assert_eq!(t.batches[0].posts[1].payload_bytes, 200);
assert!(t.batches[0].posts[1].commit);
assert_eq!(
t.batches[0].posts[1].body.len(),
request_bytes_for_payloads(&[100, 100])
);
assert_eq!(t.batches[1].posts[0].batch.as_ref().unwrap(), "true");
assert_eq!(t.batches[1].posts[0].records, 3);
assert_eq!(t.batches[1].posts[0].payload_bytes, 300);
assert!(!t.batches[1].posts[0].commit);
assert_eq!(
t.batches[1].posts[0].body.len(),
request_bytes_for_payloads(&[100, 100, 100])
);
assert_eq!(t.batches[1].posts[1].batch.as_ref().unwrap(), "abcd");
assert_eq!(t.batches[1].posts[1].records, 1);
assert_eq!(t.batches[1].posts[1].payload_bytes, 100);
assert!(t.batches[1].posts[1].commit);
assert_eq!(
t.batches[1].posts[1].body.len(),
request_bytes_for_payloads(&[100])
);
}
}