//! Processing functions of an [`MlsGroup`] for incoming messages.
use std::mem;
use core_group::staged_commit::StagedCommit;
use openmls_traits::{signatures::Signer, storage::StorageProvider as _};
use crate::storage::OpenMlsProvider;
use crate::{
group::core_group::create_commit_params::CreateCommitParams, messages::group_info::GroupInfo,
};
use crate::group::errors::MergeCommitError;
use super::{errors::ProcessMessageError, *};
impl MlsGroup {
/// Validates and processes a message received from the Delivery Service.
///
/// The input is converted into a [`ProtocolMessage`], checked against the
/// group's incoming wire format policy and then handed to the underlying
/// group, which parses it, decrypts it if it is encrypted, and performs the
/// syntactic and semantic validation. On success a [`ProcessedMessage`] is
/// returned.
///
/// # Errors
///
/// - [`ProcessMessageError::GroupStateError`] wrapping
///   [`MlsGroupStateError::UseAfterEviction`] if the group is not active.
/// - [`ProcessMessageError::IncompatibleWireFormat`] if a non-external
///   handshake message is not compatible with the incoming wire format
///   policy.
/// - Any other [`ProcessMessageError`] reported by the underlying group,
///   carrying the exact reason for the failure.
pub fn process_message<Provider: OpenMlsProvider>(
    &mut self,
    provider: &Provider,
    message: impl Into<ProtocolMessage>,
) -> Result<ProcessedMessage, ProcessMessageError<Provider::StorageError>> {
    // Refuse to process anything once we are no longer a member.
    if !self.is_active() {
        let evicted = MlsGroupStateError::UseAfterEviction;
        return Err(ProcessMessageError::GroupStateError(evicted));
    }

    let protocol_message: ProtocolMessage = message.into();

    // The incoming wire format policy only applies to handshake messages
    // that do not come from an external sender.
    if !protocol_message.is_external() && protocol_message.is_handshake_message() {
        let accepted = self
            .configuration()
            .wire_format_policy()
            .incoming()
            .is_compatible_with(protocol_message.wire_format());
        if !accepted {
            return Err(ProcessMessageError::IncompatibleWireFormat);
        }
    }

    // The configuration is cloned so that it can be passed along while the
    // group itself is borrowed for processing.
    let ratchet_config = self.configuration().sender_ratchet_configuration().clone();

    // Parsing and validation happen in the underlying group.
    self.group.process_message(
        provider,
        protocol_message,
        &ratchet_config,
        &self.proposal_store,
        &self.own_leaf_nodes,
    )
}
/// Queues a standalone proposal in the internal [ProposalStore].
///
/// The proposal is first written to `storage` under this group's id and the
/// proposal's reference, and only afterwards added to the in-memory store.
///
/// # Errors
///
/// Returns the storage error if queueing the proposal fails; in that case
/// the in-memory store is left untouched.
pub fn store_pending_proposal<Storage: StorageProvider>(
    &mut self,
    storage: &Storage,
    proposal: QueuedProposal,
) -> Result<(), Storage::Error> {
    // Persist first, so a storage failure leaves the in-memory store as is.
    let reference = proposal.proposal_reference();
    storage.queue_proposal(self.group_id(), &reference, &proposal)?;

    // Hand ownership of the proposal to the internal ProposalStore.
    self.proposal_store.add(proposal);

    Ok(())
}
/// Creates a Commit message covering the proposals currently held in the
/// group's [ProposalStore]. The Commit is created even if there are no
/// valid pending proposals.
///
/// On success the group state becomes [`MlsGroupState::PendingCommit`],
/// holding the staged commit, and that state is written to storage.
///
/// # Returns
///
/// A tuple of `Commit, Option<Welcome>, Option<GroupInfo>`, where `Commit`
/// and [`Welcome`] are MlsMessages of the type [`MlsMessageOut`].
///
/// # Errors
///
/// Returns an error if the group is not operational (for example because
/// there already is a pending commit), if creating the commit or converting
/// it into an outgoing message fails, or
/// [`CommitToPendingProposalsError::StorageError`] if the new group state
/// cannot be written to storage.
///
/// [`Welcome`]: crate::messages::Welcome
// FIXME: #1217
#[allow(clippy::type_complexity)]
pub fn commit_to_pending_proposals<Provider: OpenMlsProvider>(
    &mut self,
    provider: &Provider,
    signer: &impl Signer,
) -> Result<
    (MlsMessageOut, Option<MlsMessageOut>, Option<GroupInfo>),
    CommitToPendingProposalsError<Provider::StorageError>,
> {
    self.is_operational()?;

    // Build a Commit over every pending proposal.
    // TODO #751
    let commit_params = CreateCommitParams::builder()
        .framing_parameters(self.framing_parameters())
        .proposal_store(&self.proposal_store)
        .build();
    let commit_result = self.group.create_commit(commit_params, provider, signer)?;

    // Turn the commit content into an outgoing MLS message; it is encrypted
    // if the configuration requires it.
    let commit_message = self.content_to_mls_message(commit_result.commit, provider)?;

    // Remember the staged commit as our pending commit and persist that state.
    let pending_commit = PendingCommitState::Member(commit_result.staged_commit);
    self.group_state = MlsGroupState::PendingCommit(Box::new(pending_commit));
    provider
        .storage()
        .write_group_state(self.group_id(), &self.group_state)
        .map_err(CommitToPendingProposalsError::StorageError)?;

    // Wrap the Welcome, if any, into an outgoing message as well.
    let welcome_message = commit_result
        .welcome_option
        .map(|welcome| MlsMessageOut::from_welcome(welcome, self.group.version()));
    let group_info = commit_result.group_info;

    Ok((commit_message, welcome_message, group_info))
}
/// Merges a [StagedCommit] into the group after it has been inspected. As
/// this advances the epoch of the group, a potential pending commit is
/// cleared as well.
///
/// If the staged commit removes this member, the group state is set to
/// [`MlsGroupState::Inactive`]. The group state is written to storage before
/// the commit is merged. After merging, the resumption PSK of the current
/// epoch is added to the resumption PSK store and the own leaf nodes are
/// cleared, both in memory and in storage.
///
/// # Errors
///
/// Returns [`MergeCommitError::StorageError`] if any of the storage
/// operations fails, or the error produced while merging the staged commit
/// into the underlying group.
pub fn merge_staged_commit<Provider: OpenMlsProvider>(
    &mut self,
    provider: &Provider,
    staged_commit: StagedCommit,
) -> Result<(), MergeCommitError<Provider::StorageError>> {
    // A commit that removes us makes the group inactive.
    if staged_commit.self_removed() {
        self.group_state = MlsGroupState::Inactive;
    }
    provider
        .storage()
        .write_group_state(self.group_id(), &self.group_state)
        .map_err(MergeCommitError::StorageError)?;

    // Apply the staged commit to the underlying group.
    self.group
        .merge_staged_commit(provider, staged_commit, &mut self.proposal_store)?;

    // Keep the resumption PSK of the current epoch in the PSK store.
    let current_psk = self.group.group_epoch_secrets().resumption_psk().clone();
    let current_epoch = self.group.context().epoch();
    self.group
        .resumption_psk_store
        .add(current_epoch, current_psk);

    // Drop our own leaf nodes, in memory and in storage.
    self.own_leaf_nodes.clear();
    provider
        .storage()
        .clear_own_leaf_nodes(self.group_id())
        .map_err(MergeCommitError::StorageError)?;

    // Remove a potential pending commit.
    self.clear_pending_commit(provider.storage())
        .map_err(MergeCommitError::StorageError)?;

    Ok(())
}
/// Merges the pending [`StagedCommit`] if there is one.
///
/// - In [`MlsGroupState::PendingCommit`] the state is replaced by
///   [`MlsGroupState::Operational`] and the pending commit is merged via
///   [`Self::merge_staged_commit`].
/// - In [`MlsGroupState::Operational`] there is nothing to merge and `Ok(())`
///   is returned.
///
/// # Errors
///
/// Returns [`MlsGroupStateError::UseAfterEviction`] (converted into the
/// function's error type) if the group is inactive, or the error produced by
/// merging the staged commit.
pub fn merge_pending_commit<Provider: OpenMlsProvider>(
    &mut self,
    provider: &Provider,
) -> Result<(), MergePendingCommitError<Provider::StorageError>> {
    match self.group_state {
        // Nothing is pending, so there is nothing to do.
        MlsGroupState::Operational => Ok(()),
        MlsGroupState::Inactive => Err(MlsGroupStateError::UseAfterEviction.into()),
        MlsGroupState::PendingCommit(_) => {
            // Take the pending commit out of the group state by value.
            // NOTE: the state is swapped before merging, so the pending
            // commit is not put back if merging fails.
            let previous_state =
                mem::replace(&mut self.group_state, MlsGroupState::Operational);
            match previous_state {
                MlsGroupState::PendingCommit(pending) => {
                    self.merge_staged_commit(provider, (*pending).into())?;
                    Ok(())
                }
                MlsGroupState::Operational | MlsGroupState::Inactive => Ok(()),
            }
        }
    }
}
}