winbrew_database\journal/
replay.rs1use anyhow::Result;
2use core::str::FromStr;
3use std::fs;
4use std::path::{Path, PathBuf};
5use thiserror::Error;
6
7use super::{JournalEntry, JournalReadError, JournalReader, JournalShimBinding};
8use crate::core::ResolvedPaths;
9use crate::models::command_resolution::ResolverResult;
10use crate::models::install::engine::EngineKind;
11use crate::models::install::installed::{InstalledPackage, PackageStatus};
12use crate::models::install::installer::InstallerType;
13use crate::models::shared::error::ModelError;
14
15#[derive(Debug, Clone)]
16pub struct CommittedJournalPackage {
17 pub journal_path: PathBuf,
18 pub entries: Vec<JournalEntry>,
19 pub package: InstalledPackage,
20 pub commands: Option<Vec<String>>,
21 pub bin: Option<Vec<String>>,
22 pub bin_bindings: Option<Vec<JournalShimBinding>>,
23 pub env_add_path: Vec<String>,
24 pub command_resolution: Option<ResolverResult>,
25}
26
27#[derive(Debug, Error)]
28pub enum JournalReplayError {
29 #[error("failed to enumerate committed journals under {root}")]
30 Enumerate {
31 root: PathBuf,
32 #[source]
33 source: std::io::Error,
34 },
35
36 #[error(transparent)]
37 Read(#[from] JournalReadError),
38
39 #[error("journal at {path} is missing metadata")]
40 MissingMetadata { path: PathBuf },
41
42 #[error("journal at {path} is missing commit marker")]
43 MissingCommit { path: PathBuf },
44
45 #[error("journal at {path} is missing required field {field}")]
46 MissingField { path: PathBuf, field: &'static str },
47
48 #[error("journal at {path} has invalid engine kind '{engine}'")]
49 InvalidEngineKind {
50 path: PathBuf,
51 engine: String,
52 #[source]
53 source: ModelError,
54 },
55}
56
57impl JournalReader {
58 pub fn committed_paths(root: &Path) -> Result<Vec<PathBuf>, JournalReplayError> {
59 enumerate_committed_journals(&crate::core::pkgdb_dir_at(root))
60 }
61
62 pub fn committed_paths_in(paths: &ResolvedPaths) -> Result<Vec<PathBuf>, JournalReplayError> {
63 enumerate_committed_journals(&paths.pkgdb)
64 }
65
66 pub fn read_committed_package(
67 path: &Path,
68 ) -> Result<CommittedJournalPackage, JournalReplayError> {
69 let entries = match JournalReader::read_committed(path) {
70 Ok(entries) => entries,
71 Err(JournalReadError::TrailingEntries { .. }) => read_committed_prefix(path)?,
72 Err(err) => return Err(err.into()),
73 };
74
75 parse_committed_package_journal(path, entries)
76 }
77}
78
79fn enumerate_committed_journals(pkgdb_dir: &Path) -> Result<Vec<PathBuf>, JournalReplayError> {
80 if !pkgdb_dir.exists() {
81 return Ok(Vec::new());
82 }
83
84 let mut journal_paths = Vec::new();
85
86 for entry in fs::read_dir(pkgdb_dir).map_err(|source| JournalReplayError::Enumerate {
87 root: pkgdb_dir.to_path_buf(),
88 source,
89 })? {
90 let entry = entry.map_err(|source| JournalReplayError::Enumerate {
91 root: pkgdb_dir.to_path_buf(),
92 source,
93 })?;
94
95 let journal_path = entry.path().join("journal.jsonl");
96 if journal_path.is_file() && JournalReader::read_committed(&journal_path).is_ok() {
97 journal_paths.push(journal_path);
98 }
99 }
100
101 journal_paths.sort();
102 Ok(journal_paths)
103}
104
105fn parse_committed_package_journal(
106 path: &Path,
107 entries: Vec<JournalEntry>,
108) -> Result<CommittedJournalPackage, JournalReplayError> {
109 let (
110 package_id,
111 version,
112 engine,
113 deployment_kind,
114 install_dir,
115 dependencies,
116 commands,
117 bin,
118 bin_bindings,
119 env_add_path,
120 command_resolution,
121 engine_metadata,
122 ) = entries
123 .iter()
124 .find_map(|entry| match entry {
125 JournalEntry::Metadata {
126 package_id,
127 version,
128 engine,
129 deployment_kind,
130 install_dir,
131 dependencies,
132 commands,
133 bin,
134 bin_bindings,
135 env_add_path,
136 command_resolution,
137 engine_metadata,
138 } => Some((
139 package_id.as_str(),
140 version.as_str(),
141 engine.as_str(),
142 *deployment_kind,
143 install_dir.as_str(),
144 dependencies.clone(),
145 commands.clone(),
146 bin.clone(),
147 bin_bindings.clone(),
148 env_add_path.clone(),
149 command_resolution.clone(),
150 engine_metadata.clone(),
151 )),
152 _ => None,
153 })
154 .ok_or_else(|| JournalReplayError::MissingMetadata {
155 path: path.to_path_buf(),
156 })?;
157
158 if package_id.is_empty() {
159 return Err(JournalReplayError::MissingField {
160 path: path.to_path_buf(),
161 field: "package_id",
162 });
163 }
164
165 if version.is_empty() {
166 return Err(JournalReplayError::MissingField {
167 path: path.to_path_buf(),
168 field: "version",
169 });
170 }
171
172 if engine.is_empty() {
173 return Err(JournalReplayError::MissingField {
174 path: path.to_path_buf(),
175 field: "engine",
176 });
177 }
178
179 if install_dir.is_empty() {
180 return Err(JournalReplayError::MissingField {
181 path: path.to_path_buf(),
182 field: "install_dir",
183 });
184 }
185
186 let engine_kind =
187 EngineKind::from_str(engine).map_err(|source| JournalReplayError::InvalidEngineKind {
188 path: path.to_path_buf(),
189 engine: engine.to_string(),
190 source,
191 })?;
192
193 let installed_at = entries
194 .iter()
195 .rev()
196 .find_map(|entry| match entry {
197 JournalEntry::Commit { installed_at } => Some(installed_at.as_str()),
198 _ => None,
199 })
200 .ok_or_else(|| JournalReplayError::MissingCommit {
201 path: path.to_path_buf(),
202 })?;
203
204 if installed_at.is_empty() {
205 return Err(JournalReplayError::MissingField {
206 path: path.to_path_buf(),
207 field: "installed_at",
208 });
209 }
210
211 let package = InstalledPackage {
212 name: package_id.to_string(),
213 version: version.to_string(),
214 kind: InstallerType::from(engine_kind),
215 deployment_kind,
216 engine_kind,
217 engine_metadata,
218 install_dir: install_dir.to_string(),
219 dependencies,
220 status: PackageStatus::Ok,
221 installed_at: installed_at.to_string(),
222 };
223
224 Ok(CommittedJournalPackage {
225 journal_path: path.to_path_buf(),
226 entries,
227 package,
228 commands,
229 bin,
230 bin_bindings,
231 env_add_path,
232 command_resolution,
233 })
234}
235
236fn read_committed_prefix(path: &Path) -> Result<Vec<JournalEntry>, JournalReadError> {
237 let contents = fs::read_to_string(path).map_err(|source| JournalReadError::Read {
238 path: path.to_path_buf(),
239 source,
240 })?;
241
242 let lines = contents
243 .lines()
244 .map(str::trim)
245 .filter(|line| !line.is_empty())
246 .collect::<Vec<_>>();
247
248 if lines.is_empty() {
249 return Err(JournalReadError::Incomplete {
250 path: path.to_path_buf(),
251 });
252 }
253
254 let mut entries = Vec::with_capacity(lines.len());
255 let mut commit_seen = false;
256
257 for (index, line) in lines.iter().enumerate() {
258 if commit_seen {
259 break;
260 }
261
262 let entry = serde_json::from_str::<JournalEntry>(line).map_err(|source| {
263 JournalReadError::MalformedLine {
264 path: path.to_path_buf(),
265 line: index + 1,
266 source,
267 }
268 })?;
269
270 commit_seen |= matches!(entry, JournalEntry::Commit { .. });
271 entries.push(entry);
272 }
273
274 if !commit_seen {
275 return Err(JournalReadError::Incomplete {
276 path: path.to_path_buf(),
277 });
278 }
279
280 Ok(entries)
281}