Merge pull request #20 from mozilla/split-views
Move most things out of main.rs
This commit is contained in:
Коммит
1c81d8d515
|
@ -0,0 +1,77 @@
|
|||
use actix_web::{http, HttpRequest, HttpResponse};
|
||||
use chrono::{DateTime, Utc};
|
||||
use futures::Future;
|
||||
use maxminddb::{self, geoip2};
|
||||
use serde::Serializer;
|
||||
use serde_derive::Serialize;
|
||||
|
||||
use crate::{
|
||||
endpoints::EndpointState, errors::ClassifyError, geoip::CountryForIp, utils::RequestExt,
|
||||
};
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct ClientClassification {
|
||||
request_time: DateTime<Utc>,
|
||||
|
||||
#[serde(serialize_with = "country_iso_code")]
|
||||
country: Option<geoip2::Country>,
|
||||
}
|
||||
|
||||
fn country_iso_code<S: Serializer>(
|
||||
country_info: &Option<geoip2::Country>,
|
||||
serializer: S,
|
||||
) -> Result<S::Ok, S::Error> {
|
||||
let iso_code: Option<String> = country_info
|
||||
.clone()
|
||||
.and_then(|country_info| country_info.country)
|
||||
.and_then(|country| country.iso_code);
|
||||
|
||||
match iso_code {
|
||||
Some(code) => serializer.serialize_str(&code),
|
||||
None => serializer.serialize_none(),
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for ClientClassification {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
request_time: Utc::now(),
|
||||
country: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn classify_client(
|
||||
req: &HttpRequest<EndpointState>,
|
||||
) -> Box<dyn Future<Item = HttpResponse, Error = ClassifyError>> {
|
||||
// TODO this is the sort of thing that the try operator (`?`) is supposed to
|
||||
// be for. Is it possible to use the try operator with `Box<dyn Future<_>>`?
|
||||
let ip = match req.client_ip() {
|
||||
Ok(v) => v,
|
||||
Err(err) => {
|
||||
return Box::new(futures::future::err(err));
|
||||
}
|
||||
};
|
||||
|
||||
Box::new(
|
||||
req.state()
|
||||
.geoip
|
||||
.send(CountryForIp::new(ip))
|
||||
.and_then(move |country| {
|
||||
let mut classification = ClientClassification::default();
|
||||
match country {
|
||||
Ok(country) => {
|
||||
classification.country = country.clone();
|
||||
Ok(HttpResponse::Ok()
|
||||
.header(
|
||||
http::header::CACHE_CONTROL,
|
||||
"max-age=0, no-cache, no-store, must-revalidate",
|
||||
)
|
||||
.json(classification))
|
||||
}
|
||||
Err(err) => Ok(HttpResponse::InternalServerError().body(format!("{}", err))),
|
||||
}
|
||||
})
|
||||
.map_err(|err| ClassifyError::from_source("Future failure", err)),
|
||||
)
|
||||
}
|
|
@ -0,0 +1,57 @@
|
|||
use actix_web::{FutureResponse, HttpRequest, HttpResponse};
|
||||
use futures::Future;
|
||||
use serde_derive::Serialize;
|
||||
use std::{
|
||||
fs::File,
|
||||
io::Read,
|
||||
net::{IpAddr, Ipv4Addr},
|
||||
};
|
||||
|
||||
use crate::{endpoints::EndpointState, geoip::CountryForIp};
|
||||
|
||||
pub fn lbheartbeat<S>(_req: &HttpRequest<S>) -> HttpResponse {
|
||||
HttpResponse::Ok().body("")
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct HeartbeatResponse {
|
||||
geoip: bool,
|
||||
}
|
||||
|
||||
pub fn heartbeat(req: &HttpRequest<EndpointState>) -> FutureResponse<HttpResponse> {
|
||||
let ip = IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4));
|
||||
|
||||
Box::new(
|
||||
req.state()
|
||||
.geoip
|
||||
.send(CountryForIp::new(ip))
|
||||
.and_then(|res| match res {
|
||||
Ok(country_info) => country_info
|
||||
.and_then(|country_info| country_info.country)
|
||||
.and_then(|country| country.iso_code)
|
||||
.and_then(|iso_code| Some(Ok(iso_code == "US")))
|
||||
.unwrap_or(Ok(false)),
|
||||
Err(_) => Ok(false),
|
||||
})
|
||||
.or_else(|_| Ok(false))
|
||||
.and_then(|res| {
|
||||
let mut resp = if res {
|
||||
HttpResponse::Ok()
|
||||
} else {
|
||||
HttpResponse::ServiceUnavailable()
|
||||
};
|
||||
Ok(resp.json(HeartbeatResponse { geoip: res }))
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
pub fn version(req: &HttpRequest<EndpointState>) -> HttpResponse {
|
||||
let version_file = &req.state().settings.version_file;
|
||||
// Read the file or deliberately fail with a 500 if missing.
|
||||
let mut file = File::open(version_file).unwrap();
|
||||
let mut data = String::new();
|
||||
file.read_to_string(&mut data).unwrap();
|
||||
HttpResponse::Ok()
|
||||
.content_type("application/json")
|
||||
.body(data)
|
||||
}
|
|
@ -0,0 +1,10 @@
|
|||
pub mod classify;
|
||||
pub mod dockerflow;
|
||||
|
||||
use crate::{geoip::GeoIpActor, settings::Settings};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct EndpointState {
|
||||
pub geoip: actix::Addr<GeoIpActor>,
|
||||
pub settings: Settings,
|
||||
}
|
|
@ -59,14 +59,17 @@ impl From<MaxMindDBError> for ClassifyError {
|
|||
}
|
||||
}
|
||||
|
||||
impl From<actix_web::http::header::ToStrError> for ClassifyError {
|
||||
fn from(error: actix_web::http::header::ToStrError) -> Self {
|
||||
Self::from_source("ToStrError", error)
|
||||
}
|
||||
macro_rules! impl_from_error {
|
||||
($error: ty, $desc: expr) => {
|
||||
impl From<$error> for ClassifyError {
|
||||
fn from(error: $error) -> Self {
|
||||
Self::from_source($desc, error)
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
impl From<std::net::AddrParseError> for ClassifyError {
|
||||
fn from(error: std::net::AddrParseError) -> Self {
|
||||
Self::from_source("AddrParseError", error)
|
||||
}
|
||||
}
|
||||
impl_from_error!(actix_web::http::header::ToStrError, "ToStrError");
|
||||
impl_from_error!(config::ConfigError, "ConfigError");
|
||||
impl_from_error!(std::net::AddrParseError, "AddrParseError");
|
||||
impl_from_error!(std::io::Error, "IoError");
|
||||
|
|
|
@ -0,0 +1,48 @@
|
|||
use maxminddb::{self, geoip2, MaxMindDBError};
|
||||
use std::{net::IpAddr, path::PathBuf};
|
||||
|
||||
use crate::errors::ClassifyError;
|
||||
|
||||
pub struct GeoIpActor {
|
||||
reader: maxminddb::OwnedReader<'static>,
|
||||
}
|
||||
|
||||
impl GeoIpActor {
|
||||
pub fn from_path<P: Into<PathBuf>>(path: P) -> Result<Self, Box<dyn std::error::Error>> {
|
||||
let path = path.into();
|
||||
let reader = maxminddb::Reader::open(path)?;
|
||||
Ok(Self { reader })
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> actix::Actor for GeoIpActor {
|
||||
type Context = actix::SyncContext<Self>;
|
||||
}
|
||||
|
||||
impl actix::Handler<CountryForIp> for GeoIpActor {
|
||||
type Result = Result<Option<geoip2::Country>, ClassifyError>;
|
||||
|
||||
fn handle(&mut self, msg: CountryForIp, _: &mut Self::Context) -> Self::Result {
|
||||
self.reader
|
||||
.lookup(msg.ip)
|
||||
.or_else(|err| match err {
|
||||
MaxMindDBError::AddressNotFoundError(_) => Ok(None),
|
||||
_ => Err(err),
|
||||
})
|
||||
.map_err(|err| err.into())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct CountryForIp {
|
||||
ip: IpAddr,
|
||||
}
|
||||
|
||||
impl CountryForIp {
|
||||
pub fn new(ip: IpAddr) -> Self {
|
||||
Self { ip }
|
||||
}
|
||||
}
|
||||
|
||||
impl actix::Message for CountryForIp {
|
||||
type Result = Result<Option<geoip2::Country>, ClassifyError>;
|
||||
}
|
230
src/main.rs
230
src/main.rs
|
@ -3,35 +3,25 @@
|
|||
#![deny(clippy::all)]
|
||||
#![deny(missing_docs)]
|
||||
|
||||
mod endpoints;
|
||||
mod errors;
|
||||
mod geoip;
|
||||
mod settings;
|
||||
mod utils;
|
||||
|
||||
use actix_web::{http, App, FutureResponse, HttpRequest, HttpResponse};
|
||||
use chrono::{DateTime, Utc};
|
||||
use futures::Future;
|
||||
use maxminddb::{self, geoip2, MaxMindDBError};
|
||||
use serde::Serializer;
|
||||
use serde_derive::Serialize;
|
||||
use std::{
|
||||
fs::File,
|
||||
io::Read,
|
||||
net::{IpAddr, Ipv4Addr},
|
||||
path::PathBuf,
|
||||
use actix_web::App;
|
||||
|
||||
use crate::{
|
||||
endpoints::{classify, dockerflow, EndpointState},
|
||||
errors::ClassifyError,
|
||||
geoip::GeoIpActor,
|
||||
settings::Settings,
|
||||
};
|
||||
|
||||
use crate::{errors::ClassifyError, settings::Settings};
|
||||
|
||||
#[derive(Clone)]
|
||||
struct State {
|
||||
geoip: actix::Addr<GeoIpActor>,
|
||||
settings: settings::Settings,
|
||||
}
|
||||
|
||||
fn main() {
|
||||
fn main() -> Result<(), ClassifyError> {
|
||||
let sys = actix::System::new("classify-client");
|
||||
|
||||
let settings =
|
||||
Settings::load().unwrap_or_else(|err| panic!(format!("Could not load settings: {}", err)));
|
||||
let settings = Settings::load()?;
|
||||
|
||||
let geoip = {
|
||||
let path = settings.geoip_db_path.clone();
|
||||
|
@ -45,200 +35,22 @@ fn main() {
|
|||
})
|
||||
};
|
||||
|
||||
let state = State { geoip, settings };
|
||||
let state = EndpointState { geoip, settings };
|
||||
|
||||
let addr = format!("{}:{}", state.settings.host, state.settings.port);
|
||||
let server = actix_web::server::new(move || {
|
||||
App::with_state(state.clone())
|
||||
.resource("/", |r| r.get().f(index))
|
||||
// Dockerflow views
|
||||
.resource("/__lbheartbeat__", |r| r.get().f(lbheartbeat))
|
||||
.resource("/__heartbeat__", |r| r.get().f(heartbeat))
|
||||
.resource("/__version__", |r| r.get().f(version))
|
||||
.resource("/", |r| r.get().f(classify::classify_client))
|
||||
// Dockerflow Endpoints
|
||||
.resource("/__lbheartbeat__", |r| r.get().f(dockerflow::lbheartbeat))
|
||||
.resource("/__heartbeat__", |r| r.get().f(dockerflow::heartbeat))
|
||||
.resource("/__version__", |r| r.get().f(dockerflow::version))
|
||||
})
|
||||
.bind(&addr)
|
||||
.unwrap_or_else(|err| panic!(format!("Couldn't listen on {}: {}", &addr, err)));
|
||||
.bind(&addr)?;
|
||||
|
||||
server.start();
|
||||
println!("started server on https://{}", addr);
|
||||
sys.run();
|
||||
}
|
||||
|
||||
struct GeoIpActor {
|
||||
reader: maxminddb::OwnedReader<'static>,
|
||||
}
|
||||
|
||||
impl GeoIpActor {
|
||||
fn from_path<P: Into<PathBuf>>(path: P) -> Result<Self, Box<dyn std::error::Error>> {
|
||||
let path = path.into();
|
||||
let reader = maxminddb::Reader::open(path)?;
|
||||
Ok(Self { reader })
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> actix::Actor for GeoIpActor {
|
||||
type Context = actix::SyncContext<Self>;
|
||||
}
|
||||
|
||||
impl actix::Handler<CountryForIp> for GeoIpActor {
|
||||
type Result = Result<Option<geoip2::Country>, ClassifyError>;
|
||||
|
||||
fn handle(&mut self, msg: CountryForIp, _: &mut Self::Context) -> Self::Result {
|
||||
self.reader
|
||||
.lookup(msg.ip)
|
||||
.or_else(|err| match err {
|
||||
MaxMindDBError::AddressNotFoundError(_) => Ok(None),
|
||||
_ => Err(err),
|
||||
})
|
||||
.map_err(|err| err.into())
|
||||
}
|
||||
}
|
||||
|
||||
struct CountryForIp {
|
||||
ip: IpAddr,
|
||||
}
|
||||
|
||||
impl actix::Message for CountryForIp {
|
||||
type Result = Result<Option<geoip2::Country>, ClassifyError>;
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct ClientClassification {
|
||||
request_time: DateTime<Utc>,
|
||||
|
||||
#[serde(serialize_with = "country_iso_code")]
|
||||
country: Option<geoip2::Country>,
|
||||
}
|
||||
|
||||
fn country_iso_code<S: Serializer>(
|
||||
country_info: &Option<geoip2::Country>,
|
||||
serializer: S,
|
||||
) -> Result<S::Ok, S::Error> {
|
||||
let iso_code: Option<String> = country_info
|
||||
.clone()
|
||||
.and_then(|country_info| country_info.country)
|
||||
.and_then(|country| country.iso_code);
|
||||
|
||||
match iso_code {
|
||||
Some(code) => serializer.serialize_str(&code),
|
||||
None => serializer.serialize_none(),
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for ClientClassification {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
request_time: Utc::now(),
|
||||
country: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Determine the IP address of the client making a request, based on network
|
||||
/// information and headers.
|
||||
fn get_client_ip<S>(request: &HttpRequest<S>) -> Result<IpAddr, ClassifyError> {
|
||||
// Actix has a method to do this, but it returns a string, and doesn't strip
|
||||
// off ports if present, so it is difficult to use.
|
||||
|
||||
if let Some(x_forwarded_for) = request.headers().get("X-Forwarded-For") {
|
||||
let ips: Vec<_> = x_forwarded_for
|
||||
.to_str()?
|
||||
.split(',')
|
||||
.map(|ip| ip.trim())
|
||||
.collect();
|
||||
if ips.len() == 1 {
|
||||
return Ok(ips[0].parse()?);
|
||||
} else if ips.len() > 1 {
|
||||
// the last item is probably a google load balancer, strip that off, use the second-to-last item.
|
||||
return Ok(ips[ips.len() - 2].parse()?);
|
||||
}
|
||||
// 0 items is an empty header, and weird. fall back to peer address detection
|
||||
}
|
||||
|
||||
// No headers were present, so use the peer address directly
|
||||
if let Some(peer_addr) = request.peer_addr() {
|
||||
return Ok(peer_addr.ip());
|
||||
}
|
||||
|
||||
Err(ClassifyError::new("Could not determine IP"))
|
||||
}
|
||||
|
||||
fn index(req: &HttpRequest<State>) -> Box<dyn Future<Item = HttpResponse, Error = ClassifyError>> {
|
||||
// TODO this is the sort of thing that the try operator (`?`) is supposed to
|
||||
// be for. Is it possible to use the try operator with `Box<dyn Future<_>>`?
|
||||
let ip = match get_client_ip(req) {
|
||||
Ok(v) => v,
|
||||
Err(err) => {
|
||||
return Box::new(futures::future::err(err));
|
||||
}
|
||||
};
|
||||
|
||||
Box::new(
|
||||
req.state()
|
||||
.geoip
|
||||
.send(CountryForIp { ip })
|
||||
.and_then(move |country| {
|
||||
let mut classification = ClientClassification::default();
|
||||
match country {
|
||||
Ok(country) => {
|
||||
classification.country = country.clone();
|
||||
Ok(HttpResponse::Ok()
|
||||
.header(
|
||||
http::header::CACHE_CONTROL,
|
||||
"max-age=0, no-cache, no-store, must-revalidate",
|
||||
)
|
||||
.json(classification))
|
||||
}
|
||||
Err(err) => Ok(HttpResponse::InternalServerError().body(format!("{}", err))),
|
||||
}
|
||||
})
|
||||
.map_err(|err| ClassifyError::from_source("Future failure", err)),
|
||||
)
|
||||
}
|
||||
|
||||
fn lbheartbeat(_req: &HttpRequest<State>) -> HttpResponse {
|
||||
HttpResponse::Ok().body("")
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct HeartbeatResponse {
|
||||
geoip: bool,
|
||||
}
|
||||
|
||||
fn heartbeat(req: &HttpRequest<State>) -> FutureResponse<HttpResponse> {
|
||||
let ip = IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4));
|
||||
|
||||
Box::new(
|
||||
req.state()
|
||||
.geoip
|
||||
.send(CountryForIp { ip })
|
||||
.and_then(|res| match res {
|
||||
Ok(country_info) => country_info
|
||||
.and_then(|country_info| country_info.country)
|
||||
.and_then(|country| country.iso_code)
|
||||
.and_then(|iso_code| Some(Ok(iso_code == "US")))
|
||||
.unwrap_or(Ok(false)),
|
||||
Err(_) => Ok(false),
|
||||
})
|
||||
.or_else(|_| Ok(false))
|
||||
.and_then(|res| {
|
||||
let mut resp = if res {
|
||||
HttpResponse::Ok()
|
||||
} else {
|
||||
HttpResponse::ServiceUnavailable()
|
||||
};
|
||||
Ok(resp.json(HeartbeatResponse { geoip: res }))
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
fn version(req: &HttpRequest<State>) -> HttpResponse {
|
||||
let version_file = &req.state().settings.version_file;
|
||||
// Read the file or deliberately fail with a 500 if missing.
|
||||
let mut file = File::open(version_file).unwrap();
|
||||
let mut data = String::new();
|
||||
file.read_to_string(&mut data).unwrap();
|
||||
HttpResponse::Ok()
|
||||
.content_type("application/json")
|
||||
.body(data)
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -0,0 +1,39 @@
|
|||
use actix_web::HttpRequest;
|
||||
use std::net::IpAddr;
|
||||
|
||||
use crate::errors::ClassifyError;
|
||||
|
||||
pub trait RequestExt {
|
||||
/// Determine the IP address of the client making a request, based on network
|
||||
/// information and headers.
|
||||
///
|
||||
/// Actix has a method to do this, but it returns a string, and doesn't strip
|
||||
/// off ports if present, so it is difficult to use.
|
||||
fn client_ip(&self) -> Result<IpAddr, ClassifyError>;
|
||||
}
|
||||
|
||||
impl<S> RequestExt for HttpRequest<S> {
|
||||
fn client_ip(&self) -> Result<IpAddr, ClassifyError> {
|
||||
if let Some(x_forwarded_for) = self.headers().get("X-Forwarded-For") {
|
||||
let ips: Vec<_> = x_forwarded_for
|
||||
.to_str()?
|
||||
.split(',')
|
||||
.map(|ip| ip.trim())
|
||||
.collect();
|
||||
if ips.len() == 1 {
|
||||
return Ok(ips[0].parse()?);
|
||||
} else if ips.len() > 1 {
|
||||
// the last item is probably a google load balancer, strip that off, use the second-to-last item.
|
||||
return Ok(ips[ips.len() - 2].parse()?);
|
||||
}
|
||||
// 0 items is an empty header, and weird. fall back to peer address detection
|
||||
}
|
||||
|
||||
// No headers were present, so use the peer address directly
|
||||
if let Some(peer_addr) = self.peer_addr() {
|
||||
return Ok(peer_addr.ip());
|
||||
}
|
||||
|
||||
Err(ClassifyError::new("Could not determine IP"))
|
||||
}
|
||||
}
|
Загрузка…
Ссылка в новой задаче