package io.realm.internal.async;

import io.realm.internal.Util;
import io.realm.internal.objectserver.EventStream;
import io.realm.mongodb.App;
import io.realm.mongodb.AppException;
import io.realm.mongodb.ErrorCode;
import io.realm.mongodb.RealmEventStreamAsyncTask;
import io.realm.mongodb.mongo.events.BaseChangeEvent;
import java.io.IOException;

/* loaded from: classes2.dex */
public class RealmEventStreamAsyncTaskImpl<T> implements RealmEventStreamAsyncTask<T> {

    /* renamed from: a, reason: collision with root package name */
    public final String f44304a;
    public final Executor b;

    /* renamed from: c, reason: collision with root package name */
    public volatile EventStream f44305c;
    public volatile boolean d;

    /* renamed from: e, reason: collision with root package name */
    public Thread f44306e;

    /* loaded from: classes3.dex */
    public static abstract class Executor<T> {
        public abstract EventStream<T> run() throws IOException;
    }

    public RealmEventStreamAsyncTaskImpl(String str, Executor<T> executor) {
        Util.checkNull(executor, "name");
        Util.checkNull(executor, "executor");
        this.b = executor;
        this.f44304a = str;
    }

    @Override // io.realm.mongodb.RealmEventStreamAsyncTask, io.realm.RealmAsyncTask
    public void cancel() {
        if (this.f44305c != null) {
            this.d = true;
            this.f44305c.close();
        }
    }

    @Override // io.realm.mongodb.RealmEventStreamAsyncTask
    public synchronized void get(final App.Callback<BaseChangeEvent<T>> callback) throws IllegalStateException {
        Util.checkNull(callback, "callback");
        if (this.f44306e != null) {
            throw new IllegalStateException("Resource already open");
        }
        Thread thread = new Thread(new Runnable() { // from class: io.realm.internal.async.RealmEventStreamAsyncTaskImpl.1
            @Override // java.lang.Runnable
            public void run() {
                try {
                    RealmEventStreamAsyncTaskImpl realmEventStreamAsyncTaskImpl = RealmEventStreamAsyncTaskImpl.this;
                    realmEventStreamAsyncTaskImpl.f44305c = realmEventStreamAsyncTaskImpl.b.run();
                    while (true) {
                        callback.onResult(App.Result.withResult(RealmEventStreamAsyncTaskImpl.this.f44305c.getNextEvent()));
                    }
                } catch (AppException e2) {
                    callback.onResult(App.Result.withError(e2));
                } catch (IOException e3) {
                    callback.onResult(App.Result.withError(new AppException(ErrorCode.NETWORK_IO_EXCEPTION, e3)));
                }
            }
        }, String.format("RealmStreamTask|%s", this.f44304a));
        this.f44306e = thread;
        thread.start();
    }

    @Override // io.realm.mongodb.RealmEventStreamAsyncTask, io.realm.RealmAsyncTask
    public boolean isCancelled() {
        return this.d;
    }

    @Override // io.realm.mongodb.RealmEventStreamAsyncTask
    public boolean isOpen() {
        return this.f44305c != null && this.f44305c.isOpen();
    }
}
