use anyhow::Result; use bollard::query_parameters::CreateImageOptionsBuilder; use bollard::secret::{ ContainerCreateBody, ContainerInspectResponse, ContainerState, CreateImageInfo, Health, HealthConfig, HealthStatusEnum, HostConfig, PortBinding, }; use bollard::Docker; use futures_util::{FutureExt, StreamExt as _}; use sqlx::migrate::MigrateDatabase; use sqlx::{Connection, PgConnection}; use std::net::SocketAddr; use std::path::Path; use std::sync::Arc; use std::time::Duration; use tokio::sync::OnceCell; use tokio::task::JoinHandle; use tokio::time::sleep; use tracing::{debug, trace}; use zero_to_axum::{conf, Conf, ZeroToAxum}; static SHARED_DB: OnceCell> = OnceCell::const_new(); async fn get_shared_db() -> Arc { SHARED_DB .get_or_init(|| async { Arc::new(TestDb::spawn().await) }) .await .clone() } pub struct TestServer { server_task_handle: JoinHandle<()>, addr: SocketAddr, db: Arc, } impl TestServer { pub async fn spawn() -> TestServer { debug!("start test server"); // TODO: allow per-test DBs in some cases // let db = TestDb::spawn().await; // TODO: share test server between test file, somehow? let db = get_shared_db().await; let url = dbg!(db.get_url()); let server = ZeroToAxum::serve(Conf { app: conf::App { listen: "[::]:0".parse().unwrap(), }, database: conf::Database { url }, debug: true, }) .await .unwrap(); let addr = server.local_addr(); let server_task_handle = tokio::spawn(server.map(|res| res.unwrap())); debug!(?addr, "test server spawned"); TestServer { server_task_handle, addr, db, } } /// format a URL for the given path pub fn url(&self, path: &str) -> String { format!("http://{}{path}", self.addr) } /// Request a graceful shutdown and then wait for shutdown to complete pub async fn shutdown(self) -> Result<()> { self.server_task_handle.abort(); let _ = self.server_task_handle.await; self.db.stop().await; Ok(()) } } const TEST_DB_IMAGE_NAME: &str = "postgres"; const TEST_DB_SUPERUSER: &str = "postgres"; const TEST_DB_SUPERUSER_PASS: &str = "password"; const TEST_DB_APP_USER: &str = "app"; const TEST_DB_APP_PASS: &str = "apppass"; const TEST_DB_APP_NAME: &str = "ztoa"; pub struct TestDb { docker: Docker, // image_id: String, container: bollard::secret::ContainerInspectResponse, } impl TestDb { pub async fn spawn() -> Self { let docker = Docker::connect_with_local_defaults().expect("connect to docker daemon"); let docker = docker.negotiate_version().await.unwrap(); let version = docker.version().await.unwrap(); trace!("version: {version:?}"); let mut image_id = None; // check for image if let Ok(image) = docker.inspect_image(TEST_DB_IMAGE_NAME).await { image_id = Some(image.id.unwrap()); } // build docker image from docker file // let mut image_id = None; // { // let filename = "Dockerfile.db"; // let image_options = bollard::query_parameters::BuildImageOptionsBuilder::default() // .dockerfile(filename) // .rm(true) // .build(); // let archive_bytes = { // let mut archive = tar::Builder::new(Vec::new()); // archive.append_path(filename).unwrap(); // archive.into_inner().unwrap() // }; // let mut image_build_stream = docker.build_image( // image_options, // None, // Some(http_body_util::Either::Left(http_body_util::Full::new( // archive_bytes.into(), // ))), // ); // while let Some(msg) = image_build_stream.next().await { // info!("Message: {msg:?}"); // if let Ok(BuildInfo { // aux: Some(ImageId { id: Some(id) }), // .. // }) = msg // { // trace!("Image ID: {id}"); // image_id = Some(id); // } // } // } // let image_id = image_id.expect("get image id for built docker image"); // pull image if image_id.is_none() { let image_opts = CreateImageOptionsBuilder::new() .from_image(TEST_DB_IMAGE_NAME) .build(); trace!(?image_opts, "pull image"); let mut image_create_stream = docker.create_image(Some(image_opts), None, None); while let Some(msg) = image_create_stream.next().await { trace!("Message: {msg:?}"); if let Ok(CreateImageInfo { id: Some(id), .. }) = msg { trace!("Image ID: {id}"); image_id = Some(id); } } } let image_id = image_id.expect("get image id for built docker image"); // create and start docker container let container_id; { let container_config = ContainerCreateBody { image: Some(image_id.clone()), exposed_ports: Some([("5432/tcp".to_string(), [].into())].into()), host_config: Some(HostConfig { port_bindings: Some( [( "5432/tcp".to_string(), Some(vec![PortBinding { host_ip: Some("127.0.0.1".to_string()), host_port: None, // auto-assign }]), )] .into(), ), ..Default::default() }), env: Some(vec![ format!("POSTGRES_USER={TEST_DB_SUPERUSER}"), format!("POSTGRES_PASSWORD={TEST_DB_SUPERUSER_PASS}"), ]), healthcheck: Some(HealthConfig { test: Some(vec!["pg_isready -U postgres || exit 1".to_string()]), // nano seconds interval: Some(1 * 1000 * 1000 * 1000), timeout: Some(5 * 1000 * 1000 * 1000), retries: Some(5 * 1000 * 1000 * 1000), ..Default::default() }), ..Default::default() }; trace!("create container"); bollard::secret::ContainerCreateResponse { id: container_id, .. } = docker .create_container( None::, container_config, ) .await .unwrap(); trace!("start container"); docker .start_container( &container_id, None::, ) .await .unwrap(); } // wait for container to be started let container = loop { trace!("inspect container"); let container = docker .inspect_container( &container_id, None::, ) .await .unwrap(); if let ContainerInspectResponse { state: Some(ContainerState { health: Some(Health { status: Some(status), .. }), .. }), .. } = &container { trace!("status: {status:?}"); if *status == HealthStatusEnum::HEALTHY { break container; } } sleep(Duration::from_secs(2)).await; }; let db = TestDb { docker, // image_id, container, }; // setup app db { let mut conn = PgConnection::connect(&db.get_superuser_url()) .await .unwrap(); // create application user // Note: In general, string formtting a query is bad practice, but it's required here. sqlx::query(&format!( "CREATE USER {TEST_DB_APP_USER} WITH PASSWORD '{TEST_DB_APP_PASS}';" )) .execute(&mut conn) .await .unwrap(); // grant privs to app user // Note: In general, string formtting a query is bad practice, but it's required here. sqlx::query(&format!("ALTER USER {TEST_DB_APP_USER} CREATEDB;")) .execute(&mut conn) .await .unwrap(); } // create test db sqlx::Postgres::create_database(&dbg!(db.get_url())) .await .unwrap(); let mut conn = PgConnection::connect(&db.get_url()).await.unwrap(); // run migrations on test db let m = sqlx::migrate::Migrator::new(Path::new("./migrations")) .await .unwrap(); m.run(&mut conn).await.unwrap(); db } /// Get the authenticated URL for accessing the test DB from the host. pub fn get_url(&self) -> String { let binding = self .container .network_settings .as_ref() .unwrap() .ports .as_ref() .unwrap() .get("5432/tcp") .as_ref() .unwrap() .as_ref() .unwrap() .first() .unwrap(); let host_ip = binding.host_ip.as_ref().unwrap().clone(); let host_port = binding.host_port.as_ref().unwrap().clone(); format!("postgres://{TEST_DB_APP_USER}:{TEST_DB_APP_PASS}@{host_ip}:{host_port}/{TEST_DB_APP_NAME}") } /// Get the superuser-authenticated URL for accessing the `postgres` db from the host. fn get_superuser_url(&self) -> String { let binding = self .container .network_settings .as_ref() .unwrap() .ports .as_ref() .unwrap() .get("5432/tcp") .as_ref() .unwrap() .as_ref() .unwrap() .first() .unwrap(); let host_ip = binding.host_ip.as_ref().unwrap().clone(); let host_port = binding.host_port.as_ref().unwrap().clone(); format!("postgres://{TEST_DB_SUPERUSER}:{TEST_DB_SUPERUSER_PASS}@{host_ip}:{host_port}/postgres") } pub async fn stop(&self) { self.docker .stop_container( &self.container.id.as_deref().unwrap(), #[allow(deprecated)] // is deprecated, but also required None::, ) .await .unwrap(); self.docker .remove_container( &self.container.id.as_deref().unwrap(), None::, ) .await .unwrap(); // // TODO: images seem like they might be reused, this is probably a bad idea, but it seems to work? // let _ = self // .docker // .remove_image( // &self.image_id, // None::, // None, // ) // .await; } }