publish newsletters using background worker

This commit is contained in:
azdle 2025-07-22 10:07:58 -05:00
parent cb77c609dd
commit ee36efecad
9 changed files with 180 additions and 5 deletions

View file

@ -0,0 +1,14 @@
CREATE TYPE header_pair AS (
name TEXT,
value BYTEA
);
CREATE TABLE idempotency (
user_id UUID NOT NULL REFERENCES users(id),
nonce TEXT NOT NULL,
response_code SMALLINT NOT NULL,
response_headers header_pair[] NOT NULL,
response_body BYTEA NOT NULL,
created_at timestamptz NOT NULL,
PRIMARY KEY(user_id, nonce)
);

View file

@ -0,0 +1,5 @@
CREATE TABLE issue_delivery_queue (
newsletter_issue_id UUID NOT NULL,
subscriber_email TEXT NOT NULL,
PRIMARY KEY (newsletter_issue_id, subscriber_email)
);

View file

@ -0,0 +1,5 @@
CREATE TABLE newsletter_issue (
id UUID PRIMARY KEY,
subject TEXT NOT NULL,
body TEXT NOT NULL
);

View file

@ -14,7 +14,7 @@ use tokio::time::timeout;
use crate::conf; use crate::conf;
#[derive(Clone)] #[derive(Clone)]
pub(crate) enum EmailClient { pub enum EmailClient {
Disabled, Disabled,
Enabled { Enabled {
inner: AsyncSmtpTransport<Tokio1Executor>, inner: AsyncSmtpTransport<Tokio1Executor>,

View file

@ -1,6 +1,7 @@
pub mod conf; pub mod conf;
mod email_client; mod email_client;
mod server; mod server;
pub mod workers;
pub use conf::Conf; pub use conf::Conf;
pub use server::ZeroToAxum; pub use server::ZeroToAxum;

View file

@ -1,5 +1,5 @@
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use zero_to_axum::{Conf, ZeroToAxum}; use zero_to_axum::{workers, Conf, ZeroToAxum};
fn init_tracing() { fn init_tracing() {
use tracing_subscriber::{filter::LevelFilter, fmt, EnvFilter}; use tracing_subscriber::{filter::LevelFilter, fmt, EnvFilter};
@ -16,7 +16,11 @@ async fn main() -> Result<()> {
init_tracing(); init_tracing();
let conf = Conf::read()?; let conf = Conf::read()?;
let workers = workers::spawn_workers(conf.clone());
let server = ZeroToAxum::serve(conf).await?; let server = ZeroToAxum::serve(conf).await?;
server.await.context("run server") tokio::select! {
ret = server => ret.context("run server"),
ret = workers => ret.context("run workers"),
}
} }

View file

@ -29,7 +29,10 @@ pub struct SubscribeForm {
pub async fn subscribe( pub async fn subscribe(
State(AppState { State(AppState {
db, email_client, .. db,
email_client,
conf,
..
}): State<AppState>, }): State<AppState>,
Form(form): Form<SubscribeForm>, Form(form): Form<SubscribeForm>,
) -> Result<(), SubscribeError> { ) -> Result<(), SubscribeError> {
@ -171,11 +174,47 @@ impl IntoResponse for ConfirmError {
} }
} }
pub async fn publish(// State(AppState { db, .. }): State<AppState>, #[derive(Deserialize)]
pub struct PublishForm {
subject: String,
body: String,
}
pub async fn publish(
State(AppState { db, .. }): State<AppState>,
// Query(query): Query<PublishQuery>, // Query(query): Query<PublishQuery>,
Form(PublishForm { subject, body }): Form<PublishForm>,
) -> Result<(), PublishError> { ) -> Result<(), PublishError> {
info!("publish"); info!("publish");
let mut txn = db.begin().await.context("start database transaction")?;
let newsletter_issue_id = Uuid::new_v4();
sqlx::query!(
r#"
INSERT INTO newsletter_issue (id, subject, body)
VALUES ($1, $2, $3);
"#,
newsletter_issue_id,
subject,
body
)
.execute(&mut *txn)
.await
.context("create newsletter")?;
sqlx::query!(
r#"
INSERT INTO issue_delivery_queue (newsletter_issue_id, subscriber_email)
SELECT $1, email FROM subscriptions WHERE status = 'confirmed';
"#,
newsletter_issue_id
)
.execute(&mut *txn)
.await
.context("enqueue newsletter sends")?;
txn.commit().await.context("commit transaction")?;
Ok(()) Ok(())
} }

View file

@ -0,0 +1,83 @@
use std::time::Duration;
use anyhow::{Context as _, Result};
use sqlx::PgPool;
use tracing::error;
use crate::email_client::EmailClient;
pub async fn do_work(db: PgPool, email_client: EmailClient) -> Result<()> {
loop {
match try_step_task(&db, &email_client).await {
Ok(Some(())) => (),
Ok(None) => {
// TODO: don't poll, add signal mechanism
tokio::time::sleep(Duration::from_secs(10)).await;
}
Err(e) => {
error!("task failed: {e}");
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
}
async fn try_step_task(db: &PgPool, email_client: &EmailClient) -> Result<Option<()>> {
let mut transaction = db.begin().await?;
let task = sqlx::query!(
r#"
SELECT newsletter_issue_id, subscriber_email FROM issue_delivery_queue
FOR UPDATE
SKIP LOCKED
LIMIT 1
"#
)
.fetch_optional(&mut *transaction)
.await?;
let Some(task) = task else {
return Ok(None);
};
let newsletter = sqlx::query!(
r#"
SELECT subject, body FROM newsletter_issue
WHERE id = $1
"#,
task.newsletter_issue_id
)
.fetch_one(&mut *transaction)
.await
.context("fetch newsletter content for task")?;
match task.subscriber_email.parse() {
Ok(email) => {
if let Err(e) = email_client
.send_email(email, newsletter.subject.clone(), newsletter.body.clone())
.await
{
error!("failed to send email: {e}");
}
}
Err(e) => {
error!("failed to parse stored email address: {e}")
}
}
sqlx::query!(
r#"
DELETE FROM issue_delivery_queue
WHERE
newsletter_issue_id = $1 AND
subscriber_email = $2
"#,
task.newsletter_issue_id,
task.subscriber_email
)
.execute(&mut *transaction)
.await?;
transaction.commit().await?;
Ok(Some(()))
}

24
src/workers/mod.rs Normal file
View file

@ -0,0 +1,24 @@
use anyhow::{Context as _, Result};
use sqlx::postgres::PgPoolOptions;
use crate::{email_client, Conf};
pub mod issue_delivery;
pub async fn spawn_workers(conf: Conf) -> Result<()> {
let db = PgPoolOptions::new()
.max_connections(5)
.connect(&conf.database.url)
.await?;
let email_client =
email_client::EmailClient::new(conf.email.clone()).context("build email client")?;
tokio::spawn(crate::workers::issue_delivery::do_work(
db.clone(),
email_client.clone(),
))
.await??;
Ok(())
}