1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, you can obtain one at https://mozilla.org/MPL/2.0/.

//! The queue-processing loop for fxa_email_service.
//!
//! Configuration is via [`settings::Settings`][settings].
//!
//! [settings]: ../fxa_email_service/settings/struct.Settings.html

extern crate futures;
extern crate fxa_email_service;
#[macro_use]
extern crate lazy_static;
#[macro_use]
extern crate sentry;
#[macro_use(
    slog_b,
    slog_error,
    slog_info,
    slog_kv,
    slog_log,
    slog_record,
    slog_record_static
)]
extern crate slog;
#[macro_use]
extern crate slog_scope;
extern crate tokio;

use futures::future::{self, Future, Loop};
use sentry::integrations::panic::register_panic_handler;

use fxa_email_service::{
    logging::MozlogLogger,
    queues::{QueueIds, Queues, Sqs},
    settings::Settings,
    types::error::AppError,
};

lazy_static! {
    /// Process-wide settings, loaded on first access.
    ///
    /// Panics with "config error" if `Settings::new` fails.
    static ref SETTINGS: Settings = Settings::new().expect("config error");

    /// Queue handles built with `Queues::new::<Sqs>` from the URLs
    /// configured under `aws.sqsurls`.
    ///
    /// Panics on first access if that section of the config is absent.
    static ref QUEUES: Queues = {
        let urls = SETTINGS
            .aws
            .sqsurls
            .as_ref()
            .unwrap_or_else(|| panic!("Missing config: aws.sqsurls.*"));
        let queue_ids = QueueIds {
            bounce: urls.bounce.to_string(),
            complaint: urls.complaint.to_string(),
            delivery: urls.delivery.to_string(),
            notification: urls.notification.to_string(),
        };
        Queues::new::<Sqs>(queue_ids, &SETTINGS)
    };
}

/// Boxed future produced by one iteration of the queue-processing loop.
///
/// It resolves to a `Loop` instruction whose break value and continuation
/// state are both `usize`, or fails with an `AppError`.
type LoopResult = Box<Future<Item = Loop<usize, usize>, Error = AppError>>;

fn main() {
    let sentry_dsn = if let Some(ref sentry) = SETTINGS.sentry {
        Some(sentry.dsn.0.parse().expect("settings.sentry.dsn error"))
    } else {
        None
    };
    let sentry = sentry::init(sentry::ClientOptions {
        dsn: sentry_dsn,
        release: sentry_crate_release!(),
        ..Default::default()
    });

    if sentry.is_enabled() {
        register_panic_handler();
    }

    let logger = MozlogLogger::new(&SETTINGS);
    let _guard = slog_scope::set_global_logger(logger.0);
    let process_queues: &Fn(usize) -> LoopResult = &|previous_count: usize| {
        let future = QUEUES
            .process()
            .and_then(move |count: usize| {
                let total_count = count + previous_count;
                if count > 0 {
                    info!(
                        "Succesfully processed queue message";
                        "processed_messages" => count, "total_messages" => total_count
                    );
                }
                Ok(Loop::Continue(total_count))
            })
            .or_else(move |error: AppError| {
                let logger = MozlogLogger(slog_scope::logger());
                let log = MozlogLogger::with_app_error(&logger, &error)
                    .expect("MozlogLogger::with_app_error error");
                slog_error!(log, "{}", "Error processing queue");
                Ok(Loop::Continue(0))
            });
        Box::new(future)
    };
    let mut rt = tokio::runtime::current_thread::Runtime::new().unwrap();
    rt.block_on(future::loop_fn(0, process_queues))
        .expect("tokio error");
}