Merge remote-tracking branch 'upstream/master' into remove-listenfd
This commit is contained in:
Коммит
3a0d446458
|
@ -1,6 +1,6 @@
|
|||
# Classify Client
|
||||
|
||||
This is an optimized version of the classify client endpoint in [Normandy][https://github.com/mozilla/normandy].
|
||||
This is an optimized version of the classify client endpoint in [Normandy](https://github.com/mozilla/normandy).
|
||||
|
||||
## Dev instructions
|
||||
|
||||
|
|
|
@ -0,0 +1,72 @@
|
|||
use actix_web::HttpResponse;
|
||||
use maxminddb::{self, MaxMindDBError};
|
||||
use serde_derive::Serialize;
|
||||
use std::fmt;
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct ClassifyError {
|
||||
message: String,
|
||||
}
|
||||
|
||||
impl ClassifyError {
|
||||
pub fn new<M: Into<String>>(message: M) -> Self {
|
||||
let message = message.into();
|
||||
Self { message }
|
||||
}
|
||||
|
||||
pub fn from_source<S: fmt::Display, E: fmt::Display>(source: S, err: E) -> Self {
|
||||
Self {
|
||||
message: format!("{}: {}", source, err),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Use default implementation of Error
|
||||
impl std::error::Error for ClassifyError {}
|
||||
|
||||
impl fmt::Display for ClassifyError {
|
||||
fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(formatter, "{:?}", self)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl actix_web::error::ResponseError for ClassifyError {
|
||||
fn error_response(&self) -> HttpResponse {
|
||||
HttpResponse::InternalServerError().json(self)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<MaxMindDBError> for ClassifyError {
|
||||
fn from(error: MaxMindDBError) -> Self {
|
||||
match error {
|
||||
MaxMindDBError::AddressNotFoundError(msg) => ClassifyError {
|
||||
message: format!("AddressNotFound: {}", msg),
|
||||
},
|
||||
MaxMindDBError::InvalidDatabaseError(msg) => ClassifyError {
|
||||
message: format!("InvalidDatabaseError: {}", msg),
|
||||
},
|
||||
MaxMindDBError::IoError(msg) => ClassifyError {
|
||||
message: format!("IoError: {}", msg),
|
||||
},
|
||||
MaxMindDBError::MapError(msg) => ClassifyError {
|
||||
message: format!("MapError: {}", msg),
|
||||
},
|
||||
MaxMindDBError::DecodingError(msg) => ClassifyError {
|
||||
message: format!("DecodingError: {}", msg),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<actix_web::http::header::ToStrError> for ClassifyError {
|
||||
fn from(error: actix_web::http::header::ToStrError) -> Self {
|
||||
Self::from_source("ToStrError", error)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<std::net::AddrParseError> for ClassifyError {
|
||||
fn from(error: std::net::AddrParseError) -> Self {
|
||||
Self::from_source("AddrParseError", error)
|
||||
}
|
||||
}
|
127
src/main.rs
127
src/main.rs
|
@ -10,7 +10,11 @@ use futures::Future;
|
|||
use maxminddb::{self, geoip2, MaxMindDBError};
|
||||
use serde::Serializer;
|
||||
use serde_derive::Serialize;
|
||||
use std::{env, fmt, net::IpAddr, path, process};
|
||||
use std::{env, net::IpAddr, path::PathBuf, process};
|
||||
|
||||
use crate::errors::ClassifyError;
|
||||
|
||||
mod errors;
|
||||
|
||||
struct State {
|
||||
geoip: actix::Addr<GeoIpActor>,
|
||||
|
@ -21,25 +25,21 @@ fn main() {
|
|||
// PID 1 in Docker it doesn't respond to SIGINT. This prevents
|
||||
// ctrl-c from stopping a docker container running this
|
||||
// program. Handle SIGINT (aka ctrl-c) to fix this problem.
|
||||
ctrlc::set_handler(move || {
|
||||
process::exit(0);
|
||||
})
|
||||
.expect("error setting ctrl-c handler");
|
||||
ctrlc::set_handler(move || process::exit(0)).expect("error setting ctrl-c handler");
|
||||
|
||||
let sys = actix::System::new("classify-client");
|
||||
|
||||
let geoip = actix::SyncArbiter::start(1, || {
|
||||
let geoip_path: path::PathBuf = "./GeoLite2-Country.mmdb".into();
|
||||
let reader = maxminddb::Reader::open(&geoip_path).unwrap_or_else(|err| {
|
||||
let geoip_path = "./GeoLite2-Country.mmdb";
|
||||
GeoIpActor::from_path(&geoip_path).unwrap_or_else(|err| {
|
||||
panic!(format!(
|
||||
"Could not open geoip database at {:?}: {}",
|
||||
geoip_path, err
|
||||
))
|
||||
});
|
||||
GeoIpActor { reader }
|
||||
})
|
||||
});
|
||||
|
||||
let host = env::var("HOST").unwrap_or_else(|_| "127.0.0.1".to_string());
|
||||
let host = env::var("HOST").unwrap_or_else(|_| "localhost".to_string());
|
||||
let port = env::var("PORT").unwrap_or_else(|_| "8080".to_string());
|
||||
let addr = format!("{}:{}", host, port);
|
||||
|
||||
|
@ -57,32 +57,18 @@ fn main() {
|
|||
sys.run();
|
||||
}
|
||||
|
||||
impl From<MaxMindDBError> for ClassifyError {
|
||||
fn from(error: MaxMindDBError) -> Self {
|
||||
match error {
|
||||
MaxMindDBError::AddressNotFoundError(msg) => ClassifyError {
|
||||
message: format!("AddressNotFound: {}", msg),
|
||||
},
|
||||
MaxMindDBError::InvalidDatabaseError(msg) => ClassifyError {
|
||||
message: format!("InvalidDatabaseError: {}", msg),
|
||||
},
|
||||
MaxMindDBError::IoError(msg) => ClassifyError {
|
||||
message: format!("IoError: {}", msg),
|
||||
},
|
||||
MaxMindDBError::MapError(msg) => ClassifyError {
|
||||
message: format!("MapError: {}", msg),
|
||||
},
|
||||
MaxMindDBError::DecodingError(msg) => ClassifyError {
|
||||
message: format!("DecodingError: {}", msg),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct GeoIpActor {
|
||||
reader: maxminddb::OwnedReader<'static>,
|
||||
}
|
||||
|
||||
impl GeoIpActor {
|
||||
fn from_path<P: Into<PathBuf>>(path: P) -> Result<Self, Box<dyn std::error::Error>> {
|
||||
let path = path.into();
|
||||
let reader = maxminddb::Reader::open(path)?;
|
||||
Ok(Self { reader })
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> actix::Actor for GeoIpActor {
|
||||
type Context = actix::SyncContext<Self>;
|
||||
}
|
||||
|
@ -109,35 +95,6 @@ impl actix::Message for CountryForIp {
|
|||
type Result = Result<Option<geoip2::Country>, ClassifyError>;
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct ClassifyError {
|
||||
message: String,
|
||||
}
|
||||
|
||||
impl ClassifyError {
|
||||
fn from<S: fmt::Display, E: fmt::Display>(source: S, err: E) -> Self {
|
||||
ClassifyError {
|
||||
message: format!("{}: {}", source, err),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Use default implementation of Error
|
||||
impl std::error::Error for ClassifyError {}
|
||||
|
||||
impl fmt::Display for ClassifyError {
|
||||
fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(formatter, "{:?}", self)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl actix_web::error::ResponseError for ClassifyError {
|
||||
fn error_response(&self) -> HttpResponse {
|
||||
HttpResponse::InternalServerError().json(self)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct ClientClassification {
|
||||
request_time: DateTime<Utc>,
|
||||
|
@ -170,19 +127,39 @@ impl Default for ClientClassification {
|
|||
}
|
||||
}
|
||||
|
||||
/// Determine the IP address of the client making a request, based on network
|
||||
/// information and headers.
|
||||
fn get_client_ip<S>(request: &HttpRequest<S>) -> Result<IpAddr, ClassifyError> {
|
||||
// Actix has a method to do this, but it returns a string, and doesn't strip
|
||||
// off ports if present, so it is difficult to use.
|
||||
|
||||
if let Some(x_forwarded_for) = request.headers().get("X-Forwarded-For") {
|
||||
let ips: Vec<_> = x_forwarded_for
|
||||
.to_str()?
|
||||
.split(',')
|
||||
.map(|ip| ip.trim())
|
||||
.collect();
|
||||
if ips.len() == 1 {
|
||||
return Ok(ips[0].parse()?);
|
||||
} else if ips.len() > 1 {
|
||||
// the last item is probably a google load balancer, strip that off, use the second-to-last item.
|
||||
return Ok(ips[ips.len() - 2].parse()?);
|
||||
}
|
||||
// 0 items is an empty header, and weird. fall back to peer address detection
|
||||
}
|
||||
|
||||
// No headers were present, so use the peer address directly
|
||||
if let Some(peer_addr) = request.peer_addr() {
|
||||
return Ok(peer_addr.ip());
|
||||
}
|
||||
|
||||
Err(ClassifyError::new("Could not determine IP"))
|
||||
}
|
||||
|
||||
fn index(req: &HttpRequest<State>) -> Box<dyn Future<Item = HttpResponse, Error = ClassifyError>> {
|
||||
let ip_res: Result<IpAddr, ClassifyError> = req
|
||||
.connection_info()
|
||||
.remote()
|
||||
.ok_or(ClassifyError {
|
||||
message: "no ip".to_string(),
|
||||
})
|
||||
.and_then(|remote| {
|
||||
remote.parse().map_err(|err| {
|
||||
ClassifyError::from(format!("IP ParseError for remote '{}'", remote), err)
|
||||
})
|
||||
});
|
||||
let ip: IpAddr = match ip_res {
|
||||
// TODO this is the sort of thing that the try operator (`?`) is supposed to
|
||||
// be for. Is it possible to use the try operator with `Box<dyn Future<_>>`?
|
||||
let ip = match get_client_ip(req) {
|
||||
Ok(v) => v,
|
||||
Err(err) => {
|
||||
return Box::new(futures::future::err(err));
|
||||
|
@ -208,8 +185,6 @@ fn index(req: &HttpRequest<State>) -> Box<dyn Future<Item = HttpResponse, Error
|
|||
Err(err) => Ok(HttpResponse::InternalServerError().body(format!("{}", err))),
|
||||
}
|
||||
})
|
||||
.map_err(|err| ClassifyError {
|
||||
message: format!("Future failure: {}", err),
|
||||
}),
|
||||
.map_err(|err| ClassifyError::from_source("Future failure", err)),
|
||||
)
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче