/*
 * Decompiled with CFR 0.152.
 */
package de.justsoftware.people.business.usergroup.impl;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import de.justsoftware.people.business.usergroup.UserGroupImportService;
import de.justsoftware.people.business.usergroup.UserGroupService;
import de.justsoftware.people.business.usergroup.model.IsMember;
import de.justsoftware.people.business.usergroup.model.Membership;
import de.justsoftware.people.business.usergroup.model.UserGroupImportModel;
import de.justsoftware.people.domain.model.profile.ProfileId;
import de.justsoftware.people.domain.model.usergroup.UserGroupId;
import java.time.Duration;
import java.util.Collection;
import java.util.Set;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ParametersAreNonnullByDefault
class UserGroupMemberProcessor
extends AbstractProcessor<Membership, IsMember> {
    private final Logger _log = LoggerFactory.getLogger(this.getClass());
    private final UserGroupImportService _userGroupImportService;
    private final UserGroupService _userGroupService;
    private final Multimap<UserGroupId, ProfileId> _toStore = HashMultimap.create();
    private final Multimap<UserGroupId, ProfileId> _toDelete = HashMultimap.create();
    private final int _maxBatchSize;
    private final long _commitIntervalMs;

    UserGroupMemberProcessor(UserGroupImportService userGroupImportService, UserGroupService userGroupService, int maxBatchSize, int commitIntervalMs) {
        this._userGroupImportService = userGroupImportService;
        this._userGroupService = userGroupService;
        this._maxBatchSize = maxBatchSize;
        this._commitIntervalMs = commitIntervalMs;
    }

    public void init(ProcessorContext context) {
        super.init(context);
        context.schedule(Duration.ofMillis(this._commitIntervalMs), PunctuationType.WALL_CLOCK_TIME, timestamp -> this.flush());
    }

    private void flush() {
        if (this._toStore.isEmpty() && this._toDelete.isEmpty()) {
            return;
        }
        this._log.info("Importing user groups members: addMembers {}, deleteMembers {}", (Object)this._toStore.size(), (Object)this._toDelete.size());
        this._toStore.keySet().forEach(userGroupId -> {
            if (!this._userGroupService.findUserGroup(userGroupId).isPresent()) {
                this._userGroupImportService.importUserGroup(UserGroupImportModel.createNotYetImported((UserGroupId)userGroupId));
            }
        });
        this._toStore.keys().forEach(userGroupId -> this._userGroupService.updateMembers(userGroupId, (Set)ImmutableSet.copyOf((Collection)this._toStore.get(userGroupId)), (Set)ImmutableSet.of()));
        this._toDelete.keys().forEach(userGroupId -> this._userGroupService.updateMembers(userGroupId, (Set)ImmutableSet.of(), (Set)ImmutableSet.copyOf((Collection)this._toDelete.get(userGroupId))));
        this.context().commit();
        this._toStore.clear();
        this._toDelete.clear();
        this._log.info("Import done");
    }

    public void close() {
        try {
            this.flush();
            super.close();
        }
        catch (RuntimeException e) {
            this._log.error("Error when closing the kafka stream context", (Throwable)e);
        }
    }

    public void process(Membership membership, IsMember isMember) {
        if (isMember == null) {
            this._toDelete.put((Object)membership.getUserGroupId(), (Object)membership.getProfileId());
        } else {
            this._toStore.put((Object)membership.getUserGroupId(), (Object)membership.getProfileId());
        }
        if (this._toDelete.size() + this._toStore.size() >= this._maxBatchSize) {
            this.flush();
        }
    }
}

