Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 2c33fc4

Browse files
Merge pull request theseus-rs#92 from theseus-rs/add-extractors
feat!: add configurable extractors
2 parents 3d74b9a + f08b558 commit 2c33fc4
Copy full SHA for 2c33fc4

File tree

Expand file tree / Collapse file tree

16 files changed

+334
-189
lines changed
Filter options
Expand file tree / Collapse file tree

16 files changed

+334
-189
lines changed

‎examples/archive_async/src/main.rs

Copy file name to clipboard | Expand all lines: examples/archive_async/src/main.rs
+3 -3 | Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -7,11 +7,11 @@ use postgresql_archive::{
77

88
#[tokio::main]
99
async fn main() -> Result<()> {
10+
let url = THESEUS_POSTGRESQL_BINARIES_URL;
1011
let version_req = VersionReq::STAR;
11-
let (archive_version, archive) =
12-
get_archive(THESEUS_POSTGRESQL_BINARIES_URL, &version_req).await?;
12+
let (archive_version, archive) = get_archive(url, &version_req).await?;
1313
let out_dir = tempfile::tempdir()?.into_path();
14-
extract(&archive, &out_dir).await?;
14+
extract(url, &archive, &out_dir).await?;
1515
println!(
1616
"PostgreSQL {} extracted to {}",
1717
archive_version,

‎examples/archive_sync/src/main.rs

Copy file name to clipboard | Expand all lines: examples/archive_sync/src/main.rs
+3 -2 | Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -5,10 +5,11 @@ use postgresql_archive::blocking::{extract, get_archive};
55
use postgresql_archive::{Result, VersionReq, THESEUS_POSTGRESQL_BINARIES_URL};
66

77
fn main() -> Result<()> {
8+
let url = THESEUS_POSTGRESQL_BINARIES_URL;
89
let version_req = VersionReq::STAR;
9-
let (archive_version, archive) = get_archive(THESEUS_POSTGRESQL_BINARIES_URL, &version_req)?;
10+
let (archive_version, archive) = get_archive(url, &version_req)?;
1011
let out_dir = tempfile::tempdir()?.into_path();
11-
extract(&archive, &out_dir)?;
12+
extract(url, &archive, &out_dir)?;
1213
println!(
1314
"PostgreSQL {} extracted to {}",
1415
archive_version,

‎postgresql_archive/benches/archive.rs

Copy file name to clipboard | Expand all lines: postgresql_archive/benches/archive.rs
+1 -1 | Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -24,7 +24,7 @@ fn bench_extract(criterion: &mut Criterion) -> Result<()> {
2424
fn extract_archive(archive: &Vec<u8>) -> Result<()> {
2525
let out_dir = tempfile::tempdir()?.path().to_path_buf();
2626
create_dir_all(&out_dir)?;
27-
extract(archive, &out_dir)?;
27+
extract(THESEUS_POSTGRESQL_BINARIES_URL, archive, &out_dir)?;
2828
remove_dir_all(&out_dir)?;
2929
Ok(())
3030
}

‎postgresql_archive/src/archive.rs

Copy file name to clipboard | Expand all lines: postgresql_archive/src/archive.rs
+6 -164 | Lines changed: 6 additions & 164 deletions
Original file line number | Diff line number | Diff line change
@@ -1,20 +1,11 @@
11
//! Manage PostgreSQL archives
22
#![allow(dead_code)]
33

4-
use crate::error::Error::Unexpected;
54
use crate::error::Result;
6-
use crate::repository;
7-
use flate2::bufread::GzDecoder;
8-
use human_bytes::human_bytes;
9-
use num_format::{Locale, ToFormattedString};
5+
use crate::{extractor, repository};
106
use semver::{Version, VersionReq};
11-
use std::fs::{create_dir_all, remove_dir_all, remove_file, rename, File};
12-
use std::io::{copy, BufReader, Cursor};
13-
use std::path::{Path, PathBuf};
14-
use std::thread::sleep;
15-
use std::time::Duration;
16-
use tar::Archive;
17-
use tracing::{debug, instrument, warn};
7+
use std::path::Path;
8+
use tracing::instrument;
189

1910
pub const THESEUS_POSTGRESQL_BINARIES_URL: &str =
2011
"https://github.com/theseus-rs/postgresql-binaries";
@@ -47,164 +38,15 @@ pub async fn get_archive(url: &str, version_req: &VersionReq) -> Result<(Version
4738
Ok((version, bytes))
4839
}
4940

50-
/// Acquires a lock file in the [out_dir](Path) to prevent multiple processes from extracting the
51-
/// archive at the same time.
52-
///
53-
/// # Errors
54-
/// * If the lock file cannot be acquired.
55-
#[instrument(level = "debug")]
56-
fn acquire_lock(out_dir: &Path) -> Result<PathBuf> {
57-
let lock_file = out_dir.join("postgresql-archive.lock");
58-
59-
if lock_file.is_file() {
60-
let metadata = lock_file.metadata()?;
61-
let created = metadata.created()?;
62-
63-
if created.elapsed()?.as_secs() > 300 {
64-
warn!(
65-
"Stale lock file detected; removing file to attempt process recovery: {}",
66-
lock_file.to_string_lossy()
67-
);
68-
remove_file(&lock_file)?;
69-
}
70-
}
71-
72-
debug!(
73-
"Attempting to acquire lock: {}",
74-
lock_file.to_string_lossy()
75-
);
76-
77-
for _ in 0..30 {
78-
let lock = std::fs::OpenOptions::new()
79-
.create(true)
80-
.truncate(true)
81-
.write(true)
82-
.open(&lock_file);
83-
84-
match lock {
85-
Ok(_) => {
86-
debug!("Lock acquired: {}", lock_file.to_string_lossy());
87-
return Ok(lock_file);
88-
}
89-
Err(error) => {
90-
warn!("unable to acquire lock: {error}");
91-
sleep(Duration::from_secs(1));
92-
}
93-
}
94-
}
95-
96-
Err(Unexpected("Failed to acquire lock".to_string()))
97-
}
98-
9941
/// Extracts the compressed tar `bytes` to the [out_dir](Path).
10042
///
10143
/// # Errors
10244
/// Returns an error if the extraction fails.
10345
#[allow(clippy::cast_precision_loss)]
10446
#[instrument(skip(bytes))]
105-
pub async fn extract(bytes: &Vec<u8>, out_dir: &Path) -> Result<()> {
106-
let input = BufReader::new(Cursor::new(bytes));
107-
let decoder = GzDecoder::new(input);
108-
let mut archive = Archive::new(decoder);
109-
let mut files = 0;
110-
let mut extracted_bytes = 0;
111-
112-
let parent_dir = if let Some(parent) = out_dir.parent() {
113-
parent
114-
} else {
115-
debug!("No parent directory for {}", out_dir.to_string_lossy());
116-
out_dir
117-
};
118-
119-
create_dir_all(parent_dir)?;
120-
121-
let lock_file = acquire_lock(parent_dir)?;
122-
// If the directory already exists, then the archive has already been
123-
// extracted by another process.
124-
if out_dir.exists() {
125-
debug!(
126-
"Directory already exists {}; skipping extraction: ",
127-
out_dir.to_string_lossy()
128-
);
129-
remove_file(&lock_file)?;
130-
return Ok(());
131-
}
132-
133-
let extract_dir = tempfile::tempdir_in(parent_dir)?.into_path();
134-
debug!("Extracting archive to {}", extract_dir.to_string_lossy());
135-
136-
for archive_entry in archive.entries()? {
137-
let mut entry = archive_entry?;
138-
let entry_header = entry.header();
139-
let entry_type = entry_header.entry_type();
140-
let entry_size = entry_header.size()?;
141-
#[cfg(unix)]
142-
let file_mode = entry_header.mode()?;
143-
144-
let entry_header_path = entry_header.path()?.to_path_buf();
145-
let prefix = match entry_header_path.components().next() {
146-
Some(component) => component.as_os_str().to_str().unwrap_or_default(),
147-
None => {
148-
return Err(Unexpected(
149-
"Failed to get file header path prefix".to_string(),
150-
));
151-
}
152-
};
153-
let stripped_entry_header_path = entry_header_path.strip_prefix(prefix)?.to_path_buf();
154-
let mut entry_name = extract_dir.clone();
155-
entry_name.push(stripped_entry_header_path);
156-
157-
if entry_type.is_dir() || entry_name.is_dir() {
158-
create_dir_all(&entry_name)?;
159-
} else if entry_type.is_file() {
160-
let mut output_file = File::create(&entry_name)?;
161-
copy(&mut entry, &mut output_file)?;
162-
163-
files += 1;
164-
extracted_bytes += entry_size;
165-
166-
#[cfg(unix)]
167-
{
168-
use std::os::unix::fs::PermissionsExt;
169-
output_file.set_permissions(std::fs::Permissions::from_mode(file_mode))?;
170-
}
171-
} else if entry_type.is_symlink() {
172-
#[cfg(unix)]
173-
if let Some(symlink_target) = entry.link_name()? {
174-
let symlink_path = entry_name;
175-
std::os::unix::fs::symlink(symlink_target.as_ref(), symlink_path)?;
176-
}
177-
}
178-
}
179-
180-
if out_dir.exists() {
181-
debug!(
182-
"Directory already exists {}; skipping rename and removing extraction directory: {}",
183-
out_dir.to_string_lossy(),
184-
extract_dir.to_string_lossy()
185-
);
186-
remove_dir_all(&extract_dir)?;
187-
} else {
188-
debug!(
189-
"Renaming {} to {}",
190-
extract_dir.to_string_lossy(),
191-
out_dir.to_string_lossy()
192-
);
193-
rename(extract_dir, out_dir)?;
194-
}
195-
196-
if lock_file.is_file() {
197-
debug!("Removing lock file: {}", lock_file.to_string_lossy());
198-
remove_file(lock_file)?;
199-
}
200-
201-
debug!(
202-
"Extracting {} files totalling {}",
203-
files.to_formatted_string(&Locale::en),
204-
human_bytes(extracted_bytes as f64)
205-
);
206-
207-
Ok(())
47+
pub async fn extract(url: &str, bytes: &Vec<u8>, out_dir: &Path) -> Result<()> {
48+
let extractor_fn = extractor::registry::get(url)?;
49+
extractor_fn(bytes, out_dir)
20850
}
20951

21052
#[cfg(test)]

‎postgresql_archive/src/blocking/archive.rs

Copy file name to clipboard | Expand all lines: postgresql_archive/src/blocking/archive.rs
+2 -2 | Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -34,8 +34,8 @@ pub fn get_archive(url: &str, version_req: &VersionReq) -> crate::Result<(Versio
3434
///
3535
/// # Errors
3636
/// Returns an error if the extraction fails.
37-
pub fn extract(bytes: &Vec<u8>, out_dir: &Path) -> crate::Result<()> {
37+
pub fn extract(url: &str, bytes: &Vec<u8>, out_dir: &Path) -> crate::Result<()> {
3838
RUNTIME
3939
.handle()
40-
.block_on(async move { crate::extract(bytes, out_dir).await })
40+
.block_on(async move { crate::extract(url, bytes, out_dir).await })
4141
}

‎postgresql_archive/src/error.rs

Copy file name to clipboard | Expand all lines: postgresql_archive/src/error.rs
+3 | Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -31,6 +31,9 @@ pub enum Error {
3131
/// Unexpected error
3232
#[error("{0}")]
3333
Unexpected(String),
34+
/// Unsupported extractor
35+
#[error("unsupported extractor for '{0}'")]
36+
UnsupportedExtractor(String),
3437
/// Unsupported hasher
3538
#[error("unsupported hasher for '{0}'")]
3639
UnsupportedHasher(String),
+2 | Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,2 @@
1+
pub mod registry;
2+
pub mod theseus_postgresql_binary;
+121 | Lines changed: 121 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,121 @@
1+
use crate::extractor::theseus_postgresql_binary;
2+
use crate::Error::{PoisonedLock, UnsupportedExtractor};
3+
use crate::{Result, THESEUS_POSTGRESQL_BINARIES_URL};
4+
use lazy_static::lazy_static;
5+
use std::path::Path;
6+
use std::sync::{Arc, Mutex, RwLock};
7+
8+
lazy_static! {
9+
static ref REGISTRY: Arc<Mutex<RepositoryRegistry>> =
10+
Arc::new(Mutex::new(RepositoryRegistry::default()));
11+
}
12+
13+
type SupportsFn = fn(&str) -> Result<bool>;
14+
type ExtractFn = fn(&Vec<u8>, &Path) -> Result<()>;
15+
16+
/// Singleton struct to store extractors
17+
#[allow(clippy::type_complexity)]
18+
struct RepositoryRegistry {
19+
extractors: Vec<(Arc<RwLock<SupportsFn>>, Arc<RwLock<ExtractFn>>)>,
20+
}
21+
22+
impl RepositoryRegistry {
23+
/// Creates a new extractor registry.
24+
fn new() -> Self {
25+
Self {
26+
extractors: Vec::new(),
27+
}
28+
}
29+
30+
/// Registers an extractor. Newly registered extractors take precedence over existing ones.
31+
fn register(&mut self, supports_fn: SupportsFn, extract_fn: ExtractFn) {
32+
self.extractors.insert(
33+
0,
34+
(
35+
Arc::new(RwLock::new(supports_fn)),
36+
Arc::new(RwLock::new(extract_fn)),
37+
),
38+
);
39+
}
40+
41+
/// Gets an extractor that supports the specified URL
42+
///
43+
/// # Errors
44+
/// * If the URL is not supported.
45+
fn get(&self, url: &str) -> Result<ExtractFn> {
46+
for (supports_fn, extractor_fn) in &self.extractors {
47+
let supports_function = supports_fn
48+
.read()
49+
.map_err(|error| PoisonedLock(error.to_string()))?;
50+
if supports_function(url)? {
51+
let extractor_function = extractor_fn
52+
.read()
53+
.map_err(|error| PoisonedLock(error.to_string()))?;
54+
return Ok(*extractor_function);
55+
}
56+
}
57+
58+
Err(UnsupportedExtractor(url.to_string()))
59+
}
60+
}
61+
62+
impl Default for RepositoryRegistry {
63+
/// Creates a new extractor registry with the default extractors registered.
64+
fn default() -> Self {
65+
let mut registry = Self::new();
66+
registry.register(
67+
|url| Ok(url.starts_with(THESEUS_POSTGRESQL_BINARIES_URL)),
68+
theseus_postgresql_binary::extract,
69+
);
70+
registry
71+
}
72+
}
73+
74+
/// Registers an extractor. Newly registered extractors take precedence over existing ones.
75+
///
76+
/// # Errors
77+
/// * If the registry is poisoned.
78+
#[allow(dead_code)]
79+
pub fn register(supports_fn: SupportsFn, extractor_fn: ExtractFn) -> Result<()> {
80+
let mut registry = REGISTRY
81+
.lock()
82+
.map_err(|error| PoisonedLock(error.to_string()))?;
83+
registry.register(supports_fn, extractor_fn);
84+
Ok(())
85+
}
86+
87+
/// Gets an extractor that supports the specified URL
88+
///
89+
/// # Errors
90+
/// * If the URL is not supported.
91+
pub fn get(url: &str) -> Result<ExtractFn> {
92+
let registry = REGISTRY
93+
.lock()
94+
.map_err(|error| PoisonedLock(error.to_string()))?;
95+
registry.get(url)
96+
}
97+
98+
#[cfg(test)]
99+
mod tests {
100+
use super::*;
101+
102+
#[test]
103+
fn test_register() -> Result<()> {
104+
register(|url| Ok(url == "https://foo.com"), |_, _| Ok(()))?;
105+
let url = "https://foo.com";
106+
let extractor = get(url)?;
107+
assert!(extractor(&Vec::new(), Path::new("foo")).is_ok());
108+
Ok(())
109+
}
110+
111+
#[test]
112+
fn test_get_error() {
113+
let error = get("foo").unwrap_err();
114+
assert_eq!("unsupported extractor for 'foo'", error.to_string());
115+
}
116+
117+
#[test]
118+
fn test_get_theseus_postgresql_binaries() {
119+
assert!(get(THESEUS_POSTGRESQL_BINARIES_URL).is_ok());
120+
}
121+
}

0 commit comments

Comments
0 (0)
Morty Proxy: This is a proxified and sanitized view of the page; visit the original site.