From b33268e500d07f4f84dedeae80c248889579cdc3 Mon Sep 17 00:00:00 2001 From: Mike Cooper Date: Mon, 17 Dec 2018 15:48:08 -0800 Subject: [PATCH 1/2] Move most things out of main.rs --- src/errors.rs | 21 ++-- src/geoip.rs | 48 +++++++++ src/main.rs | 228 ++++------------------------------------ src/utils.rs | 39 +++++++ src/views/classify.rs | 75 +++++++++++++ src/views/dockerflow.rs | 57 ++++++++++ src/views/mod.rs | 10 ++ 7 files changed, 261 insertions(+), 217 deletions(-) create mode 100644 src/geoip.rs create mode 100644 src/utils.rs create mode 100644 src/views/classify.rs create mode 100644 src/views/dockerflow.rs create mode 100644 src/views/mod.rs diff --git a/src/errors.rs b/src/errors.rs index 2fb6398..1d61b48 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -59,14 +59,17 @@ impl From for ClassifyError { } } -impl From for ClassifyError { - fn from(error: actix_web::http::header::ToStrError) -> Self { - Self::from_source("ToStrError", error) - } +macro_rules! impl_from_error { + ($error: ty, $desc: expr) => { + impl From<$error> for ClassifyError { + fn from(error: $error) -> Self { + Self::from_source($desc, error) + } + } + }; } -impl From for ClassifyError { - fn from(error: std::net::AddrParseError) -> Self { - Self::from_source("AddrParseError", error) - } -} +impl_from_error!(actix_web::http::header::ToStrError, "ToStrError"); +impl_from_error!(config::ConfigError, "ConfigError"); +impl_from_error!(std::net::AddrParseError, "AddrParseError"); +impl_from_error!(std::io::Error, "IoError"); diff --git a/src/geoip.rs b/src/geoip.rs new file mode 100644 index 0000000..319735d --- /dev/null +++ b/src/geoip.rs @@ -0,0 +1,48 @@ +use maxminddb::{self, geoip2, MaxMindDBError}; +use std::{net::IpAddr, path::PathBuf}; + +use crate::errors::ClassifyError; + +pub struct GeoIpActor { + reader: maxminddb::OwnedReader<'static>, +} + +impl GeoIpActor { + pub fn from_path>(path: P) -> Result> { + let path = path.into(); + let reader = maxminddb::Reader::open(path)?; + Ok(Self { reader }) + } +} + +impl<'a> actix::Actor for GeoIpActor { + type Context = actix::SyncContext; +} + +impl actix::Handler for GeoIpActor { + type Result = Result, ClassifyError>; + + fn handle(&mut self, msg: CountryForIp, _: &mut Self::Context) -> Self::Result { + self.reader + .lookup(msg.ip) + .or_else(|err| match err { + MaxMindDBError::AddressNotFoundError(_) => Ok(None), + _ => Err(err), + }) + .map_err(|err| err.into()) + } +} + +pub struct CountryForIp { + ip: IpAddr, +} + +impl CountryForIp { + pub fn new(ip: IpAddr) -> Self { + Self { ip } + } +} + +impl actix::Message for CountryForIp { + type Result = Result, ClassifyError>; +} diff --git a/src/main.rs b/src/main.rs index 20dc5b4..cfd4c61 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,34 +4,24 @@ #![deny(missing_docs)] mod errors; +mod geoip; mod settings; +mod utils; +mod views; -use actix_web::{http, App, FutureResponse, HttpRequest, HttpResponse}; -use chrono::{DateTime, Utc}; -use futures::Future; -use maxminddb::{self, geoip2, MaxMindDBError}; -use serde::Serializer; -use serde_derive::Serialize; -use std::{ - fs::File, - io::Read, - net::{IpAddr, Ipv4Addr}, - path::PathBuf, +use actix_web::App; + +use crate::{ + errors::ClassifyError, + geoip::GeoIpActor, + settings::Settings, + views::{classify, dockerflow, ViewState}, }; -use crate::{errors::ClassifyError, settings::Settings}; - -#[derive(Clone)] -struct State { - geoip: actix::Addr, - settings: settings::Settings, -} - -fn main() { +fn main() -> Result<(), ClassifyError> { let sys = actix::System::new("classify-client"); - let settings = - Settings::load().unwrap_or_else(|err| panic!(format!("Could not load settings: {}", err))); + let settings = Settings::load()?; let geoip = { let path = settings.geoip_db_path.clone(); @@ -45,200 +35,22 @@ fn main() { }) }; - let state = State { geoip, settings }; + let state = ViewState { geoip, settings }; let addr = format!("{}:{}", state.settings.host, state.settings.port); let server = actix_web::server::new(move || { App::with_state(state.clone()) - .resource("/", |r| r.get().f(index)) + .resource("/", |r| r.get().f(classify::classify_client)) // Dockerflow views - .resource("/__lbheartbeat__", |r| r.get().f(lbheartbeat)) - .resource("/__heartbeat__", |r| r.get().f(heartbeat)) - .resource("/__version__", |r| r.get().f(version)) + .resource("/__lbheartbeat__", |r| r.get().f(dockerflow::lbheartbeat)) + .resource("/__heartbeat__", |r| r.get().f(dockerflow::heartbeat)) + .resource("/__version__", |r| r.get().f(dockerflow::version)) }) - .bind(&addr) - .unwrap_or_else(|err| panic!(format!("Couldn't listen on {}: {}", &addr, err))); + .bind(&addr)?; server.start(); println!("started server on https://{}", addr); sys.run(); -} - -struct GeoIpActor { - reader: maxminddb::OwnedReader<'static>, -} - -impl GeoIpActor { - fn from_path>(path: P) -> Result> { - let path = path.into(); - let reader = maxminddb::Reader::open(path)?; - Ok(Self { reader }) - } -} - -impl<'a> actix::Actor for GeoIpActor { - type Context = actix::SyncContext; -} - -impl actix::Handler for GeoIpActor { - type Result = Result, ClassifyError>; - - fn handle(&mut self, msg: CountryForIp, _: &mut Self::Context) -> Self::Result { - self.reader - .lookup(msg.ip) - .or_else(|err| match err { - MaxMindDBError::AddressNotFoundError(_) => Ok(None), - _ => Err(err), - }) - .map_err(|err| err.into()) - } -} - -struct CountryForIp { - ip: IpAddr, -} - -impl actix::Message for CountryForIp { - type Result = Result, ClassifyError>; -} - -#[derive(Serialize)] -struct ClientClassification { - request_time: DateTime, - - #[serde(serialize_with = "country_iso_code")] - country: Option, -} - -fn country_iso_code( - country_info: &Option, - serializer: S, -) -> Result { - let iso_code: Option = country_info - .clone() - .and_then(|country_info| country_info.country) - .and_then(|country| country.iso_code); - - match iso_code { - Some(code) => serializer.serialize_str(&code), - None => serializer.serialize_none(), - } -} - -impl Default for ClientClassification { - fn default() -> Self { - Self { - request_time: Utc::now(), - country: None, - } - } -} - -/// Determine the IP address of the client making a request, based on network -/// information and headers. -fn get_client_ip(request: &HttpRequest) -> Result { - // Actix has a method to do this, but it returns a string, and doesn't strip - // off ports if present, so it is difficult to use. - - if let Some(x_forwarded_for) = request.headers().get("X-Forwarded-For") { - let ips: Vec<_> = x_forwarded_for - .to_str()? - .split(',') - .map(|ip| ip.trim()) - .collect(); - if ips.len() == 1 { - return Ok(ips[0].parse()?); - } else if ips.len() > 1 { - // the last item is probably a google load balancer, strip that off, use the second-to-last item. - return Ok(ips[ips.len() - 2].parse()?); - } - // 0 items is an empty header, and weird. fall back to peer address detection - } - - // No headers were present, so use the peer address directly - if let Some(peer_addr) = request.peer_addr() { - return Ok(peer_addr.ip()); - } - - Err(ClassifyError::new("Could not determine IP")) -} - -fn index(req: &HttpRequest) -> Box> { - // TODO this is the sort of thing that the try operator (`?`) is supposed to - // be for. Is it possible to use the try operator with `Box>`? - let ip = match get_client_ip(req) { - Ok(v) => v, - Err(err) => { - return Box::new(futures::future::err(err)); - } - }; - - Box::new( - req.state() - .geoip - .send(CountryForIp { ip }) - .and_then(move |country| { - let mut classification = ClientClassification::default(); - match country { - Ok(country) => { - classification.country = country.clone(); - Ok(HttpResponse::Ok() - .header( - http::header::CACHE_CONTROL, - "max-age=0, no-cache, no-store, must-revalidate", - ) - .json(classification)) - } - Err(err) => Ok(HttpResponse::InternalServerError().body(format!("{}", err))), - } - }) - .map_err(|err| ClassifyError::from_source("Future failure", err)), - ) -} - -fn lbheartbeat(_req: &HttpRequest) -> HttpResponse { - HttpResponse::Ok().body("") -} - -#[derive(Serialize)] -struct HeartbeatResponse { - geoip: bool, -} - -fn heartbeat(req: &HttpRequest) -> FutureResponse { - let ip = IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4)); - - Box::new( - req.state() - .geoip - .send(CountryForIp { ip }) - .and_then(|res| match res { - Ok(country_info) => country_info - .and_then(|country_info| country_info.country) - .and_then(|country| country.iso_code) - .and_then(|iso_code| Some(Ok(iso_code == "US"))) - .unwrap_or(Ok(false)), - Err(_) => Ok(false), - }) - .or_else(|_| Ok(false)) - .and_then(|res| { - let mut resp = if res { - HttpResponse::Ok() - } else { - HttpResponse::ServiceUnavailable() - }; - Ok(resp.json(HeartbeatResponse { geoip: res })) - }), - ) -} - -fn version(req: &HttpRequest) -> HttpResponse { - let version_file = &req.state().settings.version_file; - // Read the file or deliberately fail with a 500 if missing. - let mut file = File::open(version_file).unwrap(); - let mut data = String::new(); - file.read_to_string(&mut data).unwrap(); - HttpResponse::Ok() - .content_type("application/json") - .body(data) + + Ok(()) } diff --git a/src/utils.rs b/src/utils.rs new file mode 100644 index 0000000..16556a9 --- /dev/null +++ b/src/utils.rs @@ -0,0 +1,39 @@ +use actix_web::HttpRequest; +use std::net::IpAddr; + +use crate::errors::ClassifyError; + +pub trait RequestExt { + /// Determine the IP address of the client making a request, based on network + /// information and headers. + /// + /// Actix has a method to do this, but it returns a string, and doesn't strip + /// off ports if present, so it is difficult to use. + fn client_ip(&self) -> Result; +} + +impl RequestExt for HttpRequest { + fn client_ip(&self) -> Result { + if let Some(x_forwarded_for) = self.headers().get("X-Forwarded-For") { + let ips: Vec<_> = x_forwarded_for + .to_str()? + .split(',') + .map(|ip| ip.trim()) + .collect(); + if ips.len() == 1 { + return Ok(ips[0].parse()?); + } else if ips.len() > 1 { + // the last item is probably a google load balancer, strip that off, use the second-to-last item. + return Ok(ips[ips.len() - 2].parse()?); + } + // 0 items is an empty header, and weird. fall back to peer address detection + } + + // No headers were present, so use the peer address directly + if let Some(peer_addr) = self.peer_addr() { + return Ok(peer_addr.ip()); + } + + Err(ClassifyError::new("Could not determine IP")) + } +} diff --git a/src/views/classify.rs b/src/views/classify.rs new file mode 100644 index 0000000..afb8568 --- /dev/null +++ b/src/views/classify.rs @@ -0,0 +1,75 @@ +use actix_web::{http, HttpRequest, HttpResponse}; +use chrono::{DateTime, Utc}; +use futures::Future; +use maxminddb::{self, geoip2}; +use serde::Serializer; +use serde_derive::Serialize; + +use crate::{errors::ClassifyError, geoip::CountryForIp, utils::RequestExt, views::ViewState}; + +#[derive(Serialize)] +struct ClientClassification { + request_time: DateTime, + + #[serde(serialize_with = "country_iso_code")] + country: Option, +} + +fn country_iso_code( + country_info: &Option, + serializer: S, +) -> Result { + let iso_code: Option = country_info + .clone() + .and_then(|country_info| country_info.country) + .and_then(|country| country.iso_code); + + match iso_code { + Some(code) => serializer.serialize_str(&code), + None => serializer.serialize_none(), + } +} + +impl Default for ClientClassification { + fn default() -> Self { + Self { + request_time: Utc::now(), + country: None, + } + } +} + +pub fn classify_client( + req: &HttpRequest, +) -> Box> { + // TODO this is the sort of thing that the try operator (`?`) is supposed to + // be for. Is it possible to use the try operator with `Box>`? + let ip = match req.client_ip() { + Ok(v) => v, + Err(err) => { + return Box::new(futures::future::err(err)); + } + }; + + Box::new( + req.state() + .geoip + .send(CountryForIp::new(ip)) + .and_then(move |country| { + let mut classification = ClientClassification::default(); + match country { + Ok(country) => { + classification.country = country.clone(); + Ok(HttpResponse::Ok() + .header( + http::header::CACHE_CONTROL, + "max-age=0, no-cache, no-store, must-revalidate", + ) + .json(classification)) + } + Err(err) => Ok(HttpResponse::InternalServerError().body(format!("{}", err))), + } + }) + .map_err(|err| ClassifyError::from_source("Future failure", err)), + ) +} diff --git a/src/views/dockerflow.rs b/src/views/dockerflow.rs new file mode 100644 index 0000000..7e1211a --- /dev/null +++ b/src/views/dockerflow.rs @@ -0,0 +1,57 @@ +use actix_web::{FutureResponse, HttpRequest, HttpResponse}; +use futures::Future; +use serde_derive::Serialize; +use std::{ + fs::File, + io::Read, + net::{IpAddr, Ipv4Addr}, +}; + +use crate::{geoip::CountryForIp, views::ViewState}; + +pub fn lbheartbeat(_req: &HttpRequest) -> HttpResponse { + HttpResponse::Ok().body("") +} + +#[derive(Serialize)] +struct HeartbeatResponse { + geoip: bool, +} + +pub fn heartbeat(req: &HttpRequest) -> FutureResponse { + let ip = IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4)); + + Box::new( + req.state() + .geoip + .send(CountryForIp::new(ip)) + .and_then(|res| match res { + Ok(country_info) => country_info + .and_then(|country_info| country_info.country) + .and_then(|country| country.iso_code) + .and_then(|iso_code| Some(Ok(iso_code == "US"))) + .unwrap_or(Ok(false)), + Err(_) => Ok(false), + }) + .or_else(|_| Ok(false)) + .and_then(|res| { + let mut resp = if res { + HttpResponse::Ok() + } else { + HttpResponse::ServiceUnavailable() + }; + Ok(resp.json(HeartbeatResponse { geoip: res })) + }), + ) +} + +pub fn version(req: &HttpRequest) -> HttpResponse { + let version_file = &req.state().settings.version_file; + // Read the file or deliberately fail with a 500 if missing. + let mut file = File::open(version_file).unwrap(); + let mut data = String::new(); + file.read_to_string(&mut data).unwrap(); + HttpResponse::Ok() + .content_type("application/json") + .body(data) +} diff --git a/src/views/mod.rs b/src/views/mod.rs new file mode 100644 index 0000000..bf0a70a --- /dev/null +++ b/src/views/mod.rs @@ -0,0 +1,10 @@ +pub mod classify; +pub mod dockerflow; + +use crate::{geoip::GeoIpActor, settings::Settings}; + +#[derive(Clone)] +pub struct ViewState { + pub geoip: actix::Addr, + pub settings: Settings, +} From d0b4a01d031797a9f41e54a6c6086d195a3438a0 Mon Sep 17 00:00:00 2001 From: Mike Cooper Date: Tue, 18 Dec 2018 15:38:21 -0800 Subject: [PATCH 2/2] Rename "views" to "endpoints" --- src/{views => endpoints}/classify.rs | 6 ++++-- src/{views => endpoints}/dockerflow.rs | 6 +++--- src/{views => endpoints}/mod.rs | 2 +- src/main.rs | 8 ++++---- 4 files changed, 12 insertions(+), 10 deletions(-) rename src/{views => endpoints}/classify.rs (93%) rename src/{views => endpoints}/dockerflow.rs (87%) rename src/{views => endpoints}/mod.rs (86%) diff --git a/src/views/classify.rs b/src/endpoints/classify.rs similarity index 93% rename from src/views/classify.rs rename to src/endpoints/classify.rs index afb8568..70831c1 100644 --- a/src/views/classify.rs +++ b/src/endpoints/classify.rs @@ -5,7 +5,9 @@ use maxminddb::{self, geoip2}; use serde::Serializer; use serde_derive::Serialize; -use crate::{errors::ClassifyError, geoip::CountryForIp, utils::RequestExt, views::ViewState}; +use crate::{ + endpoints::EndpointState, errors::ClassifyError, geoip::CountryForIp, utils::RequestExt, +}; #[derive(Serialize)] struct ClientClassification { @@ -40,7 +42,7 @@ impl Default for ClientClassification { } pub fn classify_client( - req: &HttpRequest, + req: &HttpRequest, ) -> Box> { // TODO this is the sort of thing that the try operator (`?`) is supposed to // be for. Is it possible to use the try operator with `Box>`? diff --git a/src/views/dockerflow.rs b/src/endpoints/dockerflow.rs similarity index 87% rename from src/views/dockerflow.rs rename to src/endpoints/dockerflow.rs index 7e1211a..79ee2b7 100644 --- a/src/views/dockerflow.rs +++ b/src/endpoints/dockerflow.rs @@ -7,7 +7,7 @@ use std::{ net::{IpAddr, Ipv4Addr}, }; -use crate::{geoip::CountryForIp, views::ViewState}; +use crate::{endpoints::EndpointState, geoip::CountryForIp}; pub fn lbheartbeat(_req: &HttpRequest) -> HttpResponse { HttpResponse::Ok().body("") @@ -18,7 +18,7 @@ struct HeartbeatResponse { geoip: bool, } -pub fn heartbeat(req: &HttpRequest) -> FutureResponse { +pub fn heartbeat(req: &HttpRequest) -> FutureResponse { let ip = IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4)); Box::new( @@ -45,7 +45,7 @@ pub fn heartbeat(req: &HttpRequest) -> FutureResponse { ) } -pub fn version(req: &HttpRequest) -> HttpResponse { +pub fn version(req: &HttpRequest) -> HttpResponse { let version_file = &req.state().settings.version_file; // Read the file or deliberately fail with a 500 if missing. let mut file = File::open(version_file).unwrap(); diff --git a/src/views/mod.rs b/src/endpoints/mod.rs similarity index 86% rename from src/views/mod.rs rename to src/endpoints/mod.rs index bf0a70a..26ddff5 100644 --- a/src/views/mod.rs +++ b/src/endpoints/mod.rs @@ -4,7 +4,7 @@ pub mod dockerflow; use crate::{geoip::GeoIpActor, settings::Settings}; #[derive(Clone)] -pub struct ViewState { +pub struct EndpointState { pub geoip: actix::Addr, pub settings: Settings, } diff --git a/src/main.rs b/src/main.rs index cfd4c61..cedaa4e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,19 +3,19 @@ #![deny(clippy::all)] #![deny(missing_docs)] +mod endpoints; mod errors; mod geoip; mod settings; mod utils; -mod views; use actix_web::App; use crate::{ + endpoints::{classify, dockerflow, EndpointState}, errors::ClassifyError, geoip::GeoIpActor, settings::Settings, - views::{classify, dockerflow, ViewState}, }; fn main() -> Result<(), ClassifyError> { @@ -35,13 +35,13 @@ fn main() -> Result<(), ClassifyError> { }) }; - let state = ViewState { geoip, settings }; + let state = EndpointState { geoip, settings }; let addr = format!("{}:{}", state.settings.host, state.settings.port); let server = actix_web::server::new(move || { App::with_state(state.clone()) .resource("/", |r| r.get().f(classify::classify_client)) - // Dockerflow views + // Dockerflow Endpoints .resource("/__lbheartbeat__", |r| r.get().f(dockerflow::lbheartbeat)) .resource("/__heartbeat__", |r| r.get().f(dockerflow::heartbeat)) .resource("/__version__", |r| r.get().f(dockerflow::version))