This commit is contained in:
Michael Cooper 2021-05-13 15:49:39 -07:00
Родитель 553e06cf9a
Коммит 10c4a5f2d1
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 9424CEA6F89AB334
8 изменённых файлов: 1760 добавлений и 1026 удалений

2625
Cargo.lock сгенерированный

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Просмотреть файл

@ -9,10 +9,11 @@ version = "0.2.0"
[dependencies]
actix = "^0.8.0"
actix-web = "^1"
actix-web = "^2"
actix-rt = "^1.1.1"
cadence = "^0.19.0"
envy = "^0.4.0"
futures = "^0.1.29"
futures = "^0.3"
lazy_static = "^1.2.0"
maxminddb = "^0.13.0"
regex = "^1.1.0"

Просмотреть файл

@ -37,7 +37,7 @@ impl Default for ClientClassification {
}
}
pub fn classify_client(req: HttpRequest) -> Result<HttpResponse, ClassifyError> {
pub async fn classify_client(req: HttpRequest) -> Result<HttpResponse, ClassifyError> {
req.app_data::<EndpointState>()
.expect("Could not get app state")
.geoip
@ -69,8 +69,8 @@ mod tests {
use serde_json::{json, Value};
use std::{collections::HashSet, sync::Arc};
#[test]
fn test_classification_serialization() {
#[actix_rt::test]
async fn test_classification_serialization() {
let mut classification = super::ClientClassification::default();
let value = serde_json::to_value(&classification).unwrap();
@ -96,8 +96,8 @@ mod tests {
);
}
#[test]
fn test_classify_endpoint() -> Result<(), Box<dyn std::error::Error>> {
#[actix_rt::test]
async fn test_classify_endpoint() -> Result<(), Box<dyn std::error::Error>> {
let state = EndpointState {
geoip: Arc::new(
GeoIp::builder()
@ -110,12 +110,13 @@ mod tests {
};
let mut service = test::init_service(
App::new()
.data(state)
.app_data(state)
.route("/", web::get().to(super::classify_client)),
);
)
.await;
let request = TestRequest::with_header("x-forwarded-for", "7.7.7.7").to_request();
let value: serde_json::Value = test::read_response_json(&mut service, request);
let value: serde_json::Value = test::read_response_json(&mut service, request).await;
assert_eq!(
*value.get("country").unwrap(),
json!("US"),
@ -133,11 +134,11 @@ mod tests {
Ok(())
}
#[test]
fn test_classify_endpoint_has_correct_cache_headers() {
#[actix_rt::test]
async fn test_classify_endpoint_has_correct_cache_headers() {
let mut service = test::init_service(
App::new()
.data(EndpointState {
.app_data(EndpointState {
geoip: Arc::new(
GeoIp::builder()
.path("./GeoLite2-Country.mmdb")
@ -147,10 +148,11 @@ mod tests {
..EndpointState::default()
})
.route("/", web::get().to(super::classify_client)),
);
)
.await;
let request = TestRequest::with_header("x-forwarded-for", "1.2.3.4").to_request();
let response = test::call_service(&mut service, request);
let response = test::call_service(&mut service, request).await;
assert_eq!(response.status(), http::StatusCode::OK);
let headers = response.headers();

Просмотреть файл

@ -16,7 +16,7 @@ struct HeartbeatResponse {
geoip: bool,
}
pub fn heartbeat(app_data: Data<EndpointState>) -> Result<HttpResponse, ClassifyError> {
pub async fn heartbeat(app_data: Data<EndpointState>) -> Result<HttpResponse, ClassifyError> {
let ip = IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4));
app_data
@ -60,37 +60,39 @@ mod tests {
web, App,
};
#[test]
fn lbheartbeat() {
#[actix_rt::test]
async fn lbheartbeat() {
let mut service =
test::init_service(App::new().route("/", web::get().to(super::lbheartbeat)));
test::init_service(App::new().route("/", web::get().to(super::lbheartbeat))).await;
let req = TestRequest::default().to_request();
let res = test::call_service(&mut service, req);
let res = test::call_service(&mut service, req).await;
assert_eq!(res.status(), http::StatusCode::OK);
}
#[test]
fn heartbeat() {
#[actix_rt::test]
async fn heartbeat() {
let mut service = test::init_service(
App::new()
.data(EndpointState::default())
.route("/", web::get().to(super::heartbeat)),
);
)
.await;
let request = TestRequest::default().to_request();
let response = test::call_service(&mut service, request);
let response = test::call_service(&mut service, request).await;
// Should return service unavailable since there is no geoip set up
assert_eq!(response.status(), http::StatusCode::SERVICE_UNAVAILABLE);
}
#[test]
fn version() -> Result<(), Box<dyn std::error::Error>> {
#[actix_rt::test]
async fn version() -> Result<(), Box<dyn std::error::Error>> {
let mut service = test::init_service(
App::new()
.data(EndpointState::default())
.route("/", web::get().to(super::version)),
);
)
.await;
let request = TestRequest::default().to_request();
let response = test::call_service(&mut service, request);
let response = test::call_service(&mut service, request).await;
let status = response.status();
assert_eq!(status, http::StatusCode::OK);
Ok(())

Просмотреть файл

@ -2,11 +2,11 @@ use actix_web::{
dev::{Service, ServiceRequest, ServiceResponse, Transform},
Error, HttpRequest, HttpResponse,
};
use futures::{future, Future, Poll};
use futures::{future, task, Future, FutureExt};
use slog::{self, Drain};
use slog_derive::KV;
use slog_mozlog_json::MozLogJson;
use std::io;
use std::{io, pin::Pin};
use crate::endpoints::EndpointState;
@ -85,7 +85,7 @@ where
type Error = Error;
type InitError = ();
type Transform = RequestLoggerMiddleware<S>;
type Future = future::FutureResult<Self::Transform, Self::InitError>;
type Future = future::Ready<Result<Self::Transform, Self::InitError>>;
fn new_transform(&self, service: S) -> Self::Future {
future::ok(RequestLoggerMiddleware { service })
@ -105,22 +105,27 @@ where
type Request = ServiceRequest;
type Response = ServiceResponse<B>;
type Error = Error;
type Future = Box<dyn Future<Item = Self::Response, Error = Self::Error>>;
#[allow(clippy::clippy::type_complexity)]
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.service.poll_ready()
fn poll_ready(&mut self, ctx: &mut task::Context<'_>) -> task::Poll<Result<(), Self::Error>> {
self.service.poll_ready(ctx)
}
fn call(&mut self, req: ServiceRequest) -> Self::Future {
let log = match req.app_data::<EndpointState>() {
Some(state) => state.log.clone(),
None => return Box::new(self.service.call(req)),
None => return Box::pin(self.service.call(req)),
};
Box::new(self.service.call(req).and_then(move |res| {
let fields = MozLogFields::new(&res);
slog::info!(log, "" ; slog::o!(fields));
Ok(res)
Box::pin(self.service.call(req).then(move |res| match res {
Ok(val) => {
let fields = MozLogFields::new(&val);
slog::info!(log, "" ; slog::o!(fields));
future::ok(val)
}
Err(err) => future::err(err),
}))
}
}

Просмотреть файл

@ -80,7 +80,7 @@ fn main() -> Result<(), ClassifyError> {
app
})
.bind(&addr)?
.run()?;
.run();
Ok(())
}

Просмотреть файл

@ -4,10 +4,11 @@ use actix_web::{
Error,
};
use cadence::{prelude::*, BufferedUdpMetricSink, StatsdClient};
use futures::{future, Future, Poll};
use futures::{future, task, Future, FutureExt};
use std::{
fmt::Display,
net::{ToSocketAddrs, UdpSocket},
pin::Pin,
time::Instant,
};
@ -57,7 +58,7 @@ where
type Error = Error;
type InitError = ();
type Transform = ResponseTimerMiddleware<S>;
type Future = future::FutureResult<Self::Transform, Self::InitError>;
type Future = future::Ready<Result<Self::Transform, Self::InitError>>;
fn new_transform(&self, service: S) -> Self::Future {
future::ok(ResponseTimerMiddleware { service })
@ -77,36 +78,40 @@ where
type Request = ServiceRequest;
type Response = ServiceResponse<B>;
type Error = Error;
type Future = Box<dyn Future<Item = Self::Response, Error = Self::Error>>;
#[allow(clippy::clippy::type_complexity)]
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.service.poll_ready()
fn poll_ready(&mut self, ctx: &mut task::Context<'_>) -> task::Poll<Result<(), Self::Error>> {
self.service.poll_ready(ctx)
}
fn call(&mut self, req: ServiceRequest) -> Self::Future {
let metrics = match req.app_data::<EndpointState>() {
Some(state) => state.metrics.clone(),
None => return Box::new(self.service.call(req)),
None => return Box::pin(self.service.call(req)),
};
let started = Instant::now();
metrics.incr_with_tags("ongoing_requests").send();
Box::new(self.service.call(req).and_then(move |res| {
let duration = started.elapsed();
metrics
.time_duration_with_tags("response", duration)
.with_tag(
"status",
if res.status().is_success() {
"success"
} else {
"error"
},
)
.send();
metrics.decr_with_tags("ongoing_requests").send();
Ok(res)
Box::pin(self.service.call(req).then(move |res| match res {
Ok(val) => {
let duration = started.elapsed();
metrics
.time_duration_with_tags("response", duration)
.with_tag(
"status",
if val.status().is_success() {
"success"
} else {
"error"
},
)
.send();
metrics.decr_with_tags("ongoing_requests").send();
future::ok(val)
}
Err(err) => future::err(err),
}))
}
}
@ -139,8 +144,8 @@ pub mod tests {
}
}
#[test]
fn test_response_metrics_works() -> Result<(), Box<dyn std::error::Error>> {
#[actix_rt::test]
async fn test_response_metrics_works() -> Result<(), Box<dyn std::error::Error>> {
// Set up a service that logs metrics to vec we own
let log = Arc::new(Mutex::new(Vec::new()));
let state = EndpointState {
@ -150,11 +155,12 @@ pub mod tests {
let mut service = test::init_service(App::new().data(state).wrap(ResponseTimer).route(
"/",
web::get().to(|| HttpResponse::InternalServerError().finish()),
));
))
.await;
// Make a request to that service
let request = TestRequest::with_uri("/").to_request();
test::call_service(&mut service, request);
test::call_service(&mut service, request).await;
// Check that the logged metric line looks as expected
let log = log.lock().unwrap();
@ -172,8 +178,8 @@ pub mod tests {
}
/// Test that if a request fails, an error is reported in metrics
#[test]
fn test_response_metrics_logs_error() -> Result<(), Box<dyn std::error::Error>> {
#[actix_rt::test]
async fn test_response_metrics_logs_error() -> Result<(), Box<dyn std::error::Error>> {
// Set up a service that logs metrics to vec we own
let log = Arc::new(Mutex::new(Vec::new()));
let state = EndpointState {
@ -183,11 +189,12 @@ pub mod tests {
let mut service = test::init_service(App::new().data(state).wrap(ResponseTimer).route(
"/",
web::get().to(|| HttpResponse::InternalServerError().finish()),
));
))
.await;
// Make a request to that service
let request = TestRequest::with_uri("/").to_request();
test::call_service(&mut service, request);
test::call_service(&mut service, request).await;
// Check that the logged metric line looks as expected
let log = log.lock().unwrap();

Просмотреть файл

@ -94,7 +94,7 @@ mod tests {
);
let req = TestRequest::with_header("x-forwarded-for", "1.2.3.4, 5.6.7.8")
.data(state)
.app_data(state)
.to_http_request();
assert_eq!(
@ -115,7 +115,7 @@ mod tests {
};
let req = TestRequest::with_header("x-forwarded-for", "1.2.3.4, 5.6.7.8")
.data(state)
.app_data(state)
.to_http_request();
assert_eq!(
@ -136,7 +136,7 @@ mod tests {
};
let req = TestRequest::with_header("x-forwarded-for", "1.2.3.4, 5.6.7.8")
.data(state)
.app_data(state)
.to_http_request();
assert!(