diff --git a/migrations/20250718155413_create-idempotency-table.sql b/migrations/20250718155413_create-idempotency-table.sql new file mode 100644 index 0000000..4981535 --- /dev/null +++ b/migrations/20250718155413_create-idempotency-table.sql @@ -0,0 +1,14 @@ +CREATE TYPE header_pair AS ( + name TEXT, + value BYTEA +); + +CREATE TABLE idempotency ( + user_id UUID NOT NULL REFERENCES users(id), + nonce TEXT NOT NULL, + response_code SMALLINT NOT NULL, + response_headers header_pair[] NOT NULL, + response_body BYTEA NOT NULL, + created_at timestamptz NOT NULL, + PRIMARY KEY(user_id, nonce) +); diff --git a/migrations/20250718162913_create-issue-delivery-queue-table.sql b/migrations/20250718162913_create-issue-delivery-queue-table.sql new file mode 100644 index 0000000..2608d7d --- /dev/null +++ b/migrations/20250718162913_create-issue-delivery-queue-table.sql @@ -0,0 +1,5 @@ +CREATE TABLE issue_delivery_queue ( + newsletter_issue_id UUID NOT NULL, + subscriber_email TEXT NOT NULL, + PRIMARY KEY (newsletter_issue_id, subscriber_email) +); diff --git a/migrations/20250718164553_create-newsletter-issue-table.sql b/migrations/20250718164553_create-newsletter-issue-table.sql new file mode 100644 index 0000000..0651f2b --- /dev/null +++ b/migrations/20250718164553_create-newsletter-issue-table.sql @@ -0,0 +1,5 @@ +CREATE TABLE newsletter_issue ( + id UUID PRIMARY KEY, + subject TEXT NOT NULL, + body TEXT NOT NULL +); diff --git a/src/email_client/mod.rs b/src/email_client/mod.rs index 8932978..613476d 100644 --- a/src/email_client/mod.rs +++ b/src/email_client/mod.rs @@ -14,7 +14,7 @@ use tokio::time::timeout; use crate::conf; #[derive(Clone)] -pub(crate) enum EmailClient { +pub enum EmailClient { Disabled, Enabled { inner: AsyncSmtpTransport, diff --git a/src/lib.rs b/src/lib.rs index f7c027e..1301a8a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,7 @@ pub mod conf; mod email_client; mod server; +pub mod workers; pub use conf::Conf; pub use server::ZeroToAxum; diff --git a/src/main.rs b/src/main.rs index 494acd7..1252d97 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,5 @@ use anyhow::{Context, Result}; -use zero_to_axum::{Conf, ZeroToAxum}; +use zero_to_axum::{workers, Conf, ZeroToAxum}; fn init_tracing() { use tracing_subscriber::{filter::LevelFilter, fmt, EnvFilter}; @@ -16,7 +16,11 @@ async fn main() -> Result<()> { init_tracing(); let conf = Conf::read()?; + let workers = workers::spawn_workers(conf.clone()); let server = ZeroToAxum::serve(conf).await?; - server.await.context("run server") + tokio::select! { + ret = server => ret.context("run server"), + ret = workers => ret.context("run workers"), + } } diff --git a/src/server/routes/subscriptions/mod.rs b/src/server/routes/subscriptions/mod.rs index 9e8b5d0..c23a02e 100644 --- a/src/server/routes/subscriptions/mod.rs +++ b/src/server/routes/subscriptions/mod.rs @@ -29,7 +29,10 @@ pub struct SubscribeForm { pub async fn subscribe( State(AppState { - db, email_client, .. + db, + email_client, + conf, + .. }): State, Form(form): Form, ) -> Result<(), SubscribeError> { @@ -171,11 +174,47 @@ impl IntoResponse for ConfirmError { } } -pub async fn publish(// State(AppState { db, .. }): State, +#[derive(Deserialize)] +pub struct PublishForm { + subject: String, + body: String, +} + +pub async fn publish( + State(AppState { db, .. }): State, // Query(query): Query, + Form(PublishForm { subject, body }): Form, ) -> Result<(), PublishError> { info!("publish"); + let mut txn = db.begin().await.context("start database transaction")?; + + let newsletter_issue_id = Uuid::new_v4(); + sqlx::query!( + r#" + INSERT INTO newsletter_issue (id, subject, body) + VALUES ($1, $2, $3); + "#, + newsletter_issue_id, + subject, + body + ) + .execute(&mut *txn) + .await + .context("create newsletter")?; + sqlx::query!( + r#" + INSERT INTO issue_delivery_queue (newsletter_issue_id, subscriber_email) + SELECT $1, email FROM subscriptions WHERE status = 'confirmed'; + "#, + newsletter_issue_id + ) + .execute(&mut *txn) + .await + .context("enqueue newsletter sends")?; + + txn.commit().await.context("commit transaction")?; + Ok(()) } diff --git a/src/workers/issue_delivery.rs b/src/workers/issue_delivery.rs new file mode 100644 index 0000000..b5d83cd --- /dev/null +++ b/src/workers/issue_delivery.rs @@ -0,0 +1,83 @@ +use std::time::Duration; + +use anyhow::{Context as _, Result}; +use sqlx::PgPool; +use tracing::error; + +use crate::email_client::EmailClient; + +pub async fn do_work(db: PgPool, email_client: EmailClient) -> Result<()> { + loop { + match try_step_task(&db, &email_client).await { + Ok(Some(())) => (), + Ok(None) => { + // TODO: don't poll, add signal mechanism + tokio::time::sleep(Duration::from_secs(10)).await; + } + Err(e) => { + error!("task failed: {e}"); + tokio::time::sleep(Duration::from_secs(1)).await; + } + } + } +} + +async fn try_step_task(db: &PgPool, email_client: &EmailClient) -> Result> { + let mut transaction = db.begin().await?; + + let task = sqlx::query!( + r#" + SELECT newsletter_issue_id, subscriber_email FROM issue_delivery_queue + FOR UPDATE + SKIP LOCKED + LIMIT 1 + "# + ) + .fetch_optional(&mut *transaction) + .await?; + + let Some(task) = task else { + return Ok(None); + }; + + let newsletter = sqlx::query!( + r#" + SELECT subject, body FROM newsletter_issue + WHERE id = $1 + "#, + task.newsletter_issue_id + ) + .fetch_one(&mut *transaction) + .await + .context("fetch newsletter content for task")?; + + match task.subscriber_email.parse() { + Ok(email) => { + if let Err(e) = email_client + .send_email(email, newsletter.subject.clone(), newsletter.body.clone()) + .await + { + error!("failed to send email: {e}"); + } + } + Err(e) => { + error!("failed to parse stored email address: {e}") + } + } + + sqlx::query!( + r#" + DELETE FROM issue_delivery_queue + WHERE + newsletter_issue_id = $1 AND + subscriber_email = $2 + "#, + task.newsletter_issue_id, + task.subscriber_email + ) + .execute(&mut *transaction) + .await?; + transaction.commit().await?; + + Ok(Some(())) +} diff --git a/src/workers/mod.rs b/src/workers/mod.rs new file mode 100644 index 0000000..a92b110 --- /dev/null +++ b/src/workers/mod.rs @@ -0,0 +1,24 @@ +use anyhow::{Context as _, Result}; +use sqlx::postgres::PgPoolOptions; + +use crate::{email_client, Conf}; + +pub mod issue_delivery; + +pub async fn spawn_workers(conf: Conf) -> Result<()> { + let db = PgPoolOptions::new() + .max_connections(5) + .connect(&conf.database.url) + .await?; + + let email_client = + email_client::EmailClient::new(conf.email.clone()).context("build email client")?; + + tokio::spawn(crate::workers::issue_delivery::do_work( + db.clone(), + email_client.clone(), + )) + .await??; + + Ok(()) +}