Skip to content

Commit

Permalink
feat: add core secondary api and forwarding middleware
Browse files Browse the repository at this point in the history
  • Loading branch information
Henry Fontanier committed Aug 6, 2024
1 parent fc7c02e commit d6cc818
Show file tree
Hide file tree
Showing 11 changed files with 202 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/deploy-core.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,13 @@ jobs:
chmod +x ./k8s/deploy-image.sh
./k8s/deploy-image.sh gcr.io/$GCLOUD_PROJECT_ID/core-image:${{ steps.short_sha.outputs.short_sha }} core-deployment
./k8s/deploy-image.sh gcr.io/$GCLOUD_PROJECT_ID/core-image:${{ steps.short_sha.outputs.short_sha }} core-sqlite-worker-deployment
./k8s/deploy-image.sh gcr.io/$GCLOUD_PROJECT_ID/core-image:${{ steps.short_sha.outputs.short_sha }} core-secondary-deployment
- name: Wait for rollout to complete
run: |
echo "Waiting for rollout to complete (web)"
kubectl rollout status deployment/core-deployment --timeout=10m
echo "Waiting for rollout to complete (sqlite worker)"
kubectl rollout status deployment/core-sqlite-worker-deployment --timeout=10m
echo "Waiting for rollout to complete (secondary)"
kubectl rollout status deployment/core-secondary-deployment --timeout=10m
2 changes: 2 additions & 0 deletions core/bin/dust_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use dust::{
project,
providers::provider::{provider, ProviderID},
run,
secondary_api::forward_middleware,
sqlite_workers::client::{self, HEARTBEAT_INTERVAL_MS},
stores::{postgres, store},
utils::{self, error_response, APIError, APIResponse, CoreRequestMakeSpan},
Expand Down Expand Up @@ -2821,6 +2822,7 @@ fn main() {
.on_response(trace::DefaultOnResponse::new().level(Level::INFO)),
)
.layer(from_fn(validate_api_key))
.layer(from_fn(forward_middleware))
.with_state(state.clone());

let sqlite_heartbeat_router = Router::new()
Expand Down
2 changes: 2 additions & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,5 @@ pub mod oauth {
}

pub mod api_keys;

pub mod secondary_api;
78 changes: 78 additions & 0 deletions core/src/secondary_api.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use axum::{
body::{Body, Bytes},
extract::Request,
middleware::Next,
response::Response,
};
use http::StatusCode;
use lazy_static::lazy_static;
use reqwest::Client;
use tracing::error;

lazy_static! {
static ref SECONDARY_API_FORWARDING_ENABLED: bool =
std::env::var("SECONDARY_API_FORWARDING_ENABLED")
.map(|s| s == "true")
.unwrap_or(false);
static ref IS_SECONDARY: bool = std::env::var("IS_SECONDARY")
.map(|s| s == "true")
.unwrap_or(false);
static ref CORE_SECONDARY_API_URL: String =
std::env::var("CORE_SECONDARY_API").unwrap_or_default();
}

fn should_forward(req: &Request<Body>) -> bool {
if *SECONDARY_API_FORWARDING_ENABLED && !*IS_SECONDARY {
if CORE_SECONDARY_API_URL.is_empty() {
error!("CORE_SECONDARY_API is not set");
}
// Forward all requests for paths that contain "/tables" or "/query_database"
req.uri().path().contains("/tables") || req.uri().path().contains("/query_database")
} else {
false
}
}

pub async fn forward_middleware(req: Request<Body>, next: Next) -> Result<Response, StatusCode> {
if should_forward(&req) {
let client = Client::new();
let (parts, body) = req.into_parts();
let body_bytes: Bytes = axum::body::to_bytes(body, usize::MAX)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;

let new_url = format!(
"{}{}",
*CORE_SECONDARY_API_URL,
parts.uri.path_and_query().map_or("", |x| x.as_str())
);

let mut new_req = client.request(parts.method, new_url).body(body_bytes);

for (name, value) in parts.headers.iter() {
new_req = new_req.header(name, value);
}

match new_req.send().await {
Ok(response) => {
let status = response.status();
let headers = response.headers().clone();
let body = response
.bytes()
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;

let mut builder = Response::builder().status(status);
let headers_mut = builder.headers_mut().unwrap();
headers_mut.extend(headers);

builder
.body(Body::from(body))
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)
}
Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR),
}
} else {
Ok(next.run(req).await)
}
}
3 changes: 3 additions & 0 deletions k8s/apply_infra.sh
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ apply_deployment connectors-worker-google-drive-deployment
apply_deployment metabase-deployment
apply_deployment alerting-temporal-deployment
apply_deployment core-deployment
apply_deployment core-secondary-deployment
apply_deployment core-sqlite-worker-deployment
apply_deployment oauth-deployment
apply_deployment prodbox-deployment
Expand All @@ -144,6 +145,7 @@ kubectl apply -f "$(dirname "$0")/services/connectors-service.yaml"
kubectl apply -f "$(dirname "$0")/services/connectors-worker-service.yaml"
kubectl apply -f "$(dirname "$0")/services/metabase-service.yaml"
kubectl apply -f "$(dirname "$0")/services/core-service.yaml"
kubectl apply -f "$(dirname "$0")/services/core-secondary-service.yaml"
kubectl apply -f "$(dirname "$0")/services/core-sqlite-worker-headless-service.yaml"
kubectl apply -f "$(dirname "$0")/services/oauth-service.yaml"
kubectl apply -f "$(dirname "$0")/services/viz-service.yaml"
Expand All @@ -160,5 +162,6 @@ echo "Applying network policies"
echo "-----------------------------------"

kubectl apply -f "$(dirname "$0")/network-policies/core-network-policy.yaml"
kubectl apply -f "$(dirname "$0")/network-policies/core-secondary-network-policy.yaml"
kubectl apply -f "$(dirname "$0")/network-policies/oauth-network-policy.yaml"
kubectl apply -f "$(dirname "$0")/network-policies/core-sqlite-worker-network-policy.yaml"
2 changes: 2 additions & 0 deletions k8s/configmaps/core-configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ data:
DD_SERVICE: "core"
DD_LOGS_INJECTION: "true"
DD_RUNTIME_METRICS_ENABLED: "true"
CORE_SECONDARY_API: "http://core-secondary-service"
SECONDARY_API_FORWARDING_ENABLED: "false"
3 changes: 3 additions & 0 deletions k8s/deployments/core-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ spec:
fieldRef:
fieldPath: status.hostIP

- name: IS_SECONDARY
value: "false"

volumeMounts:
- name: service-account-volume
mountPath: /etc/service-accounts
Expand Down
69 changes: 69 additions & 0 deletions k8s/deployments/core-secondary-deployment.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: core-secondary-deployment
spec:
replicas: 1
selector:
matchLabels:
app: core-secondary
template:
metadata:
labels:
app: core-secondary
name: core-secondary-pod
admission.datadoghq.com/enabled: "true"
annotations:
ad.datadoghq.com/web.logs: '[{"source": "core-secondary","service": "core-secondary","tags": ["env:prod"]}]'
spec:
terminationGracePeriodSeconds: 180
containers:
- name: web
image: gcr.io/or1g1n-186209/core-image:latest
command: ["cargo", "run", "--release", "--bin", "dust-api"]
imagePullPolicy: Always
ports:
- containerPort: 3001
readinessProbe:
httpGet:
path: /
port: 3001
initialDelaySeconds: 10
periodSeconds: 5

envFrom:
- configMapRef:
name: core-config
- secretRef:
name: core-secrets
env:
- name: DD_AGENT_HOST
valueFrom:
fieldRef:
fieldPath: status.hostIP

- name: IS_SECONDARY
value: "true"

volumeMounts:
- name: service-account-volume
mountPath: /etc/service-accounts
- name: api-keys-volume
mountPath: /etc/api-keys

resources:
requests:
cpu: 4000m
memory: 8Gi
limits:
cpu: 4000m
memory: 8Gi

volumes:
- name: service-account-volume
secret:
secretName: gcp-service-account-secret

- name: api-keys-volume
secret:
secretName: core-api-keys-secret
22 changes: 22 additions & 0 deletions k8s/network-policies/core-secondary-network-policy.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: core-secondary-network-policy
spec:
podSelector:
matchLabels:
app: core-secondary
policyTypes:
- Ingress
ingress:
- from:
- podSelector:
matchLabels:
app: core

- podSelector:
matchLabels:
app: prodbox
ports:
- protocol: TCP
port: 3001
3 changes: 3 additions & 0 deletions k8s/network-policies/core-sqlite-worker-network-policy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ spec:
- podSelector:
matchLabels:
app: core
- podSelector:
matchLabels:
app: core-secondary
ports:
- protocol: TCP
port: 3005
15 changes: 15 additions & 0 deletions k8s/services/core-secondary-service.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
apiVersion: v1
kind: Service
metadata:
name: core-secondary-service
annotations:
cloud.google.com/backend-config: '{"default": "core-backendconfig"}'
spec:
selector:
app: core-secondary
name: core-secondary-pod
ports:
- protocol: TCP
port: 80
targetPort: 3001
type: ClusterIP

0 comments on commit d6cc818

Please sign in to comment.