use std::collections::HashMap;
use std::fs::{self, File};
use std::io::prelude::*;
use std::str::FromStr;
use std::time::Duration;
use anyhow::Result;
use brane_tsk::local::get_package_versions;
use chrono::{DateTime, Utc};
use console::{Alignment, pad_str, style};
use dialoguer::Confirm;
use flate2::Compression;
use flate2::write::GzEncoder;
use graphql_client::{GraphQLQuery, Response};
use indicatif::{ProgressBar, ProgressStyle};
use prettytable::Table;
use prettytable::format::FormatBuilder;
use reqwest::{self, Body, Client};
use specifications::package::{PackageInfo, PackageKind};
use specifications::version::Version;
use tokio::fs::File as TokioFile;
use tokio_util::codec::{BytesCodec, FramedRead};
use uuid::Uuid;
use crate::errors::RegistryError;
use crate::instance::InstanceInfo;
use crate::utils::{ensure_package_dir, ensure_packages_dir, get_packages_dir};
// Alias for a `chrono` timestamp in UTC.
// NOTE(review): presumably resolved by the `GraphQLQuery` derives in this file for a
// `DateTimeUtc` scalar in the GraphQL schema — confirm against `src/graphql/api_schema.json`.
type DateTimeUtc = DateTime<Utc>;
/// Builds the URL of the GraphQL endpoint by appending `/graphql` to the `api` address of the
/// instance returned by `InstanceInfo::from_active_path()`.
///
/// # Errors
/// Returns `RegistryError::InstanceInfoError` if the instance info cannot be obtained.
#[inline]
pub fn get_graphql_endpoint() -> Result<String, RegistryError> {
    match InstanceInfo::from_active_path() {
        Ok(info) => Ok(format!("{}/graphql", info.api)),
        Err(err) => Err(RegistryError::InstanceInfoError { err }),
    }
}
/// Builds the URL of the packages endpoint by appending `/packages` to the `api` address of the
/// instance returned by `InstanceInfo::from_active_path()`.
///
/// # Errors
/// Returns `RegistryError::InstanceInfoError` if the instance info cannot be obtained.
#[inline]
pub fn get_packages_endpoint() -> Result<String, RegistryError> {
    match InstanceInfo::from_active_path() {
        Ok(info) => Ok(format!("{}/packages", info.api)),
        Err(err) => Err(RegistryError::InstanceInfoError { err }),
    }
}
/// Builds the URL of the data endpoint by appending `/data` to the `api` address of the
/// instance returned by `InstanceInfo::from_active_path()`.
///
/// # Errors
/// Returns `RegistryError::InstanceInfoError` if the instance info cannot be obtained.
#[inline]
pub fn get_data_endpoint() -> Result<String, RegistryError> {
    match InstanceInfo::from_active_path() {
        Ok(info) => Ok(format!("{}/data", info.api)),
        Err(err) => Err(RegistryError::InstanceInfoError { err }),
    }
}
/// Pulls the given packages from the remote registry into the local packages directory.
///
/// For every `(name, version)` pair this:
/// 1. downloads `<packages endpoint>/<name>/<version>` into a temporary file, showing a progress
///    bar sized by the response's `content-length` header;
/// 2. queries the GraphQL endpoint for the package's metadata;
/// 3. writes that metadata to `<packages dir>/<name>/<version>/package.yml`, where `<version>`
///    is the version reported by the registry; and
/// 4. copies the downloaded archive next to it as `image.tar`.
///
/// # Arguments
/// - `packages`: The `(name, version)` pairs to pull, processed in order.
///
/// # Errors
/// Returns a `RegistryError` as soon as any step fails; remaining packages are not processed.
/// Failing to create the temporary download file is reported as
/// `RegistryError::PackageWriteError` instead of panicking.
pub async fn pull(packages: Vec<(String, Version)>) -> Result<(), RegistryError> {
    #[derive(GraphQLQuery)]
    #[graphql(schema_path = "src/graphql/api_schema.json", query_path = "src/graphql/get_package.graphql", response_derives = "Debug")]
    pub struct GetPackage;

    for (name, version) in packages {
        debug!("Pulling package '{}' version {}", name, version);
        debug!("Downloading container...");

        let packages_dir = get_packages_dir().map_err(|err| RegistryError::PackagesDirError { err })?;
        let package_dir = packages_dir.join(&name);
        let url = format!("{}/{}/{}", get_packages_endpoint()?, name, version);

        // The archive is buffered in a temporary file first; a failure to create it is an
        // ordinary I/O error and must not abort the whole process.
        let mut temp_file = match tempfile::NamedTempFile::new() {
            Ok(file) => file,
            Err(err) => {
                return Err(RegistryError::PackageWriteError { url, path: std::env::temp_dir().as_path().into(), err });
            },
        };

        // Send the download request
        let mut package_archive: reqwest::Response = match reqwest::get(&url).await {
            Ok(archive) => archive,
            Err(err) => {
                return Err(RegistryError::PullRequestError { url, err });
            },
        };
        if package_archive.status() != reqwest::StatusCode::OK {
            return Err(RegistryError::PullRequestFailure { url, status: package_archive.status() });
        }

        // The progress bar needs the total size, so a parseable content-length is mandatory
        let header_value = match package_archive.headers().get("content-length") {
            Some(length) => length,
            None => {
                return Err(RegistryError::MissingContentLength { url });
            },
        };
        let raw_length = match header_value.to_str() {
            Ok(length) => length,
            Err(err) => {
                return Err(RegistryError::ContentLengthStrError { url, err });
            },
        };
        let content_length: u64 = match raw_length.parse() {
            Ok(length) => length,
            Err(err) => {
                return Err(RegistryError::ContentLengthParseError { url, raw: raw_length.into(), err });
            },
        };

        let progress = ProgressBar::new(content_length);
        progress.set_style(
            ProgressStyle::default_bar()
                .template("Downloading... [{elapsed_precise}] {bar:40.cyan/blue} {percent}/100%")
                .unwrap()
                .progress_chars("##-"),
        );

        // Stream the body chunk-by-chunk into the temporary file
        loop {
            let chunk = match package_archive.chunk().await {
                Ok(Some(chunk)) => chunk,
                Ok(None) => break,
                Err(err) => {
                    return Err(RegistryError::PackageDownloadError { url, err });
                },
            };
            progress.inc(chunk.len() as u64);
            if let Err(err) = temp_file.write_all(&chunk) {
                return Err(RegistryError::PackageWriteError { url, path: temp_file.path().into(), err });
            }
        }
        progress.finish();

        // Fetch the package's metadata through GraphQL
        let client = reqwest::Client::new();
        let graphql_endpoint = get_graphql_endpoint()?;
        debug!("Fetching package metadata from '{}'...", graphql_endpoint);

        let variables = get_package::Variables { name: name.clone(), version: version.to_string() };
        let graphql_query = GetPackage::build_query(variables);
        let graphql_response = match client.post(&graphql_endpoint).json(&graphql_query).send().await {
            Ok(response) => response,
            Err(err) => {
                return Err(RegistryError::GraphQLRequestError { url: graphql_endpoint, err });
            },
        };
        let graphql_response: Response<get_package::ResponseData> = match graphql_response.json().await {
            Ok(response) => response,
            Err(err) => {
                return Err(RegistryError::GraphQLResponseError { url: graphql_endpoint, err });
            },
        };

        // Only the first returned package is used; no data or no package is the same error
        let data = match graphql_response.data {
            Some(data) => data,
            None => {
                return Err(RegistryError::NoPackageInfo { url });
            },
        };
        let package = match data.packages.first() {
            Some(package) => package,
            None => {
                return Err(RegistryError::NoPackageInfo { url });
            },
        };

        // Parse the string-typed fields of the response
        let kind = match PackageKind::from_str(&package.kind) {
            Ok(kind) => kind,
            Err(err) => {
                return Err(RegistryError::KindParseError { url, raw: package.kind.clone(), err });
            },
        };
        // From here on `version` is the version reported by the registry, not the requested one
        let version = match Version::from_str(&package.version) {
            Ok(version) => version,
            Err(err) => {
                return Err(RegistryError::VersionParseError { url, raw: package.version.clone(), err });
            },
        };
        let functions: HashMap<String, specifications::common::Function> = match package.functions_as_json.as_ref() {
            Some(functions) => match serde_json::from_str(functions) {
                Ok(functions) => functions,
                Err(err) => {
                    return Err(RegistryError::FunctionsParseError { url, raw: functions.clone(), err });
                },
            },
            None => HashMap::new(),
        };
        let types: HashMap<String, specifications::common::Type> = match package.types_as_json.as_ref() {
            Some(types) => match serde_json::from_str(types) {
                Ok(types) => types,
                Err(err) => {
                    return Err(RegistryError::TypesParseError { url, raw: types.clone(), err });
                },
            },
            None => HashMap::new(),
        };

        let package_info = PackageInfo {
            created: package.created,
            description: package.description.clone().unwrap_or_default(),
            detached: package.detached,
            digest: package.digest.clone(),
            functions,
            id: package.id,
            kind,
            name: package.name.clone(),
            owners: package.owners.clone(),
            types,
            version,
        };

        // Create the version-specific directory once and use it for both output files
        let package_dir = package_dir.join(version.to_string());
        if let Err(err) = fs::create_dir_all(&package_dir) {
            return Err(RegistryError::PackageDirCreateError { path: package_dir, err });
        }

        // Write the metadata as package.yml
        let package_info_path = package_dir.join("package.yml");
        let handle = match File::create(&package_info_path) {
            Ok(handle) => handle,
            Err(err) => {
                return Err(RegistryError::PackageInfoCreateError { path: package_info_path, err });
            },
        };
        if let Err(err) = serde_yaml::to_writer(handle, &package_info) {
            return Err(RegistryError::PackageInfoWriteError { path: package_info_path, err });
        }

        // Copy the downloaded archive out of the temporary file
        if let Err(err) = fs::copy(temp_file.path(), package_dir.join("image.tar")) {
            return Err(RegistryError::PackageCopyError { source: temp_file.path().into(), target: package_dir, err });
        }

        println!("\nSuccessfully pulled version {} of package {}.", style(&version).bold().cyan(), style(&name).bold().cyan(),);
    }

    Ok(())
}
pub async fn push(packages: Vec<(String, Version)>) -> Result<(), RegistryError> {
let packages_dir = match ensure_packages_dir(false) {
Ok(dir) => dir,
Err(err) => {
return Err(RegistryError::PackagesDirError { err });
},
};
debug!("Using Brane package directory: {}", packages_dir.display());
for (name, version) in packages {
let package_dir = packages_dir.join(&name);
let version = if version.is_latest() {
let mut versions = match get_package_versions(&name, &package_dir) {
Ok(versions) => versions,
Err(err) => {
return Err(RegistryError::VersionsError { name, err });
},
};
versions.sort();
versions[versions.len() - 1]
} else {
version
};
let package_dir = match ensure_package_dir(&name, Some(&version), false) {
Ok(dir) => dir,
Err(err) => {
return Err(RegistryError::PackageDirError { name, version, err });
},
};
let temp_path: std::path::PathBuf = std::env::temp_dir().join("temp.tar.gz");
let temp_file: File = File::create(&temp_path).unwrap();
let progress = ProgressBar::new(0);
progress.set_style(ProgressStyle::default_bar().template("Compressing... [{elapsed_precise}]").unwrap());
progress.enable_steady_tick(Duration::from_millis(250));
let gz = GzEncoder::new(&temp_file, Compression::fast());
let mut tar = tar::Builder::new(gz);
if let Err(err) = tar.append_path_with_name(package_dir.join("package.yml"), "package.yml") {
return Err(RegistryError::CompressionError { name, version, path: temp_path, err });
};
if let Err(err) = tar.append_path_with_name(package_dir.join("image.tar"), "image.tar") {
return Err(RegistryError::CompressionError { name, version, path: temp_path, err });
};
if let Err(err) = tar.into_inner() {
return Err(RegistryError::CompressionError { name, version, path: temp_path, err });
};
progress.finish();
let url = get_packages_endpoint()?;
debug!("Pushing package '{}' to '{}'...", temp_path.display(), url);
let request = Client::new().post(&url);
let progress = ProgressBar::new(0);
progress.set_style(ProgressStyle::default_bar().template("Uploading... [{elapsed_precise}]").unwrap());
progress.enable_steady_tick(Duration::from_millis(250));
let handle = match TokioFile::open(&temp_path).await {
Ok(handle) => handle,
Err(err) => {
return Err(RegistryError::PackageArchiveOpenError { path: temp_path, err });
},
};
let file = FramedRead::new(handle, BytesCodec::new());
let content_length = temp_path.metadata().unwrap().len();
let request = request.body(Body::wrap_stream(file)).header("Content-Type", "application/gzip").header("Content-Length", content_length);
let response = match request.send().await {
Ok(response) => response,
Err(err) => {
return Err(RegistryError::UploadError { path: temp_path, endpoint: url, err });
},
};
let response_status = response.status();
progress.finish();
if response_status.is_success() {
println!("\nSuccessfully pushed version {} of package {}.", style(&version).bold().cyan(), style(&name).bold().cyan(),);
} else {
match response.text().await {
Ok(text) => {
println!("\nFailed to push package: {text}");
},
Err(err) => {
println!("\nFailed to push package (and failed to retrieve response text: {err})");
},
};
}
}
Ok(())
}
/// Queries the registry's GraphQL endpoint for packages and prints the result as a table.
///
/// On a response carrying data, a borderless table with the columns `NAME`, `VERSION`, `KIND`
/// and `DESCRIPTION` is written to stdout, each cell padded or truncated (with `..`) to a fixed
/// width. On a response without data, the response's `errors` field is debug-printed to stderr.
///
/// # Arguments
/// - `term`: Optional search term that is passed on as the query's `term` variable.
///
/// # Errors
/// Fails if the endpoint cannot be determined, the request cannot be sent, or the response body
/// cannot be parsed. A response without data is not treated as an error.
pub async fn search(term: Option<String>) -> Result<()> {
    #[derive(GraphQLQuery)]
    #[graphql(schema_path = "src/graphql/api_schema.json", query_path = "src/graphql/search_packages.graphql", response_derives = "Debug")]
    pub struct SearchPackages;

    // Run the query
    let http = reqwest::Client::new();
    let endpoint = get_graphql_endpoint()?;
    let query = SearchPackages::build_query(search_packages::Variables { term });
    let raw_reply = http.post(endpoint).json(&query).send().await?;
    let reply: Response<search_packages::ResponseData> = raw_reply.json().await?;

    // Without data there is nothing to tabulate; show whatever errors came back
    let data = match reply.data {
        Some(data) => data,
        None => {
            eprintln!("{:?}", reply.errors);
            return Ok(());
        },
    };

    // Build the table: header first, then one row per package
    let layout = FormatBuilder::new().column_separator('\0').borders('\0').padding(1, 1).build();
    let mut listing = Table::new();
    listing.set_format(layout);
    listing.add_row(row!["NAME", "VERSION", "KIND", "DESCRIPTION"]);
    for entry in &data.packages {
        let name_cell = pad_str(&entry.name, 20, Alignment::Left, Some(".."));
        let version_cell = pad_str(&entry.version, 10, Alignment::Left, Some(".."));
        let kind_cell = pad_str(&entry.kind, 10, Alignment::Left, Some(".."));
        // A missing description is shown as an empty cell
        let description_cell = pad_str(entry.description.as_deref().unwrap_or_default(), 50, Alignment::Left, Some(".."));
        listing.add_row(row![name_cell, version_cell, kind_cell, description_cell]);
    }
    listing.printstd();

    Ok(())
}
/// Asks the registry, through its GraphQL endpoint, to unpublish one version of a package.
///
/// Unless `force` is set, the user is first asked to confirm the removal; declining returns
/// `Ok(())` without contacting the registry. The value returned by the mutation is printed to
/// stdout; if the response carries no data, its `errors` field is debug-printed to stderr.
///
/// # Arguments
/// - `name`: Name of the package to unpublish.
/// - `version`: The concrete version to unpublish; `latest` is rejected.
/// - `force`: If true, skips the interactive confirmation.
///
/// # Errors
/// Fails if `version` is `latest`, if the endpoint cannot be determined, if the confirmation
/// prompt fails, or if the request cannot be sent or its response cannot be parsed.
pub async fn unpublish(name: String, version: Version, force: bool) -> Result<()> {
    #[derive(GraphQLQuery)]
    #[graphql(schema_path = "src/graphql/api_schema.json", query_path = "src/graphql/unpublish_package.graphql", response_derives = "Debug")]
    pub struct UnpublishPackage;

    // Reject 'latest' up front: previously the user was asked to confirm first and only then
    // told that the request could never succeed.
    if version.is_latest() {
        return Err(anyhow!("Cannot unpublish 'latest' package version; choose a version."));
    }

    let graphql_endpoint = get_graphql_endpoint()?;

    // Ask for confirmation unless explicitly forced
    if !force {
        println!("Do you want to remove the following version(s)?");
        println!("- {version}");
        if !Confirm::new().interact()? {
            return Ok(());
        }
        println!();
    }

    // Send the mutation
    let client = reqwest::Client::new();
    let variables = unpublish_package::Variables { name, version: version.to_string() };
    let graphql_query = UnpublishPackage::build_query(variables);
    let graphql_response = client.post(graphql_endpoint).json(&graphql_query).send().await?;
    let graphql_response: Response<unpublish_package::ResponseData> = graphql_response.json().await?;

    // Report the outcome
    match graphql_response.data {
        Some(data) => println!("{}", data.unpublish_package),
        None => eprintln!("{:?}", graphql_response.errors),
    }

    Ok(())
}