Review and suggestions #4

@akarnokd

Although I currently don't see any use for such multi-observables in my work, the idea of higher-arity dataflows made me (scientifically) interested in this library. Here are some points I see about such a library.

Approach

First, instead of creating Bi- and Tri- Observables by hand, you can explore code templating and static code generation, similar to how many of the primitive-collections libraries (such as Trove or fastutil) do it.
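
To make the templating idea concrete, here is a minimal sketch of a generator that prints the source of one arity-specific interface. The class name `AritySourceGenerator`, the method `subscriberSource`, and the `Uni`/`Bi`/`Tri` prefixes are all hypothetical; a real setup would more likely use a template engine or a build-time plugin.

```java
import java.util.StringJoiner;

/** Hypothetical generator: emits the source text of an N-ary subscriber interface. */
final class AritySourceGenerator {
    // Assumed naming scheme; index = arity. Index 0 is unused.
    private static final String[] PREFIX = {null, "Uni", "Bi", "Tri"};

    static String subscriberSource(int arity) {
        if (arity < 1 || arity >= PREFIX.length) {
            throw new IllegalArgumentException("unsupported arity: " + arity);
        }
        StringJoiner typeParams = new StringJoiner(", ", "<", ">");
        StringJoiner args = new StringJoiner(", ");
        for (int i = 1; i <= arity; i++) {
            typeParams.add("T" + i);          // T1, T2, ...
            args.add("T" + i + " t" + i);     // T1 t1, T2 t2, ...
        }
        return "interface " + PREFIX[arity] + "Subscriber" + typeParams
                + " extends SubscriberBase {\n"
                + "    void onNext(" + args + ");\n"
                + "}\n";
    }
}
```

Calling `AritySourceGenerator.subscriberSource(2)` yields the text of a `BiSubscriber<T1, T2>` interface with a two-argument `onNext`; the same loop can stamp out publishers, queues, and operators.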

Second, I suggest adopting the reactive-streams idioms instead of the RxJava 1.x idioms. I don't see much direct code reuse, and since you'll need wrappers and converters anyway, you have the opportunity to avoid a few less-than-optimal decisions the RxJava API has made.

Primitives

I'd start a new type hierarchy based on the reactive-streams idioms:

interface {Arity}Publisher<Ti, ...> {
    void subscribe({Arity}Subscriber<Ti, ...> s);
}
interface SubscriberBase {
    void onStart(Subscription s);
    void onError(Throwable e);
    void onCompleted();
}
interface {Arity}Subscriber<Ti, ...> extends SubscriberBase {
    void onNext(T1 t1, ...);
}
interface Subscription {  // or import reactive-streams directly
    void request(long n);
    void cancel();
}
interface {Arity}Processor<Ti, ..., Ri, ...> extends
    {Arity}Subscriber<Ti,...>, {Arity}Publisher<Ri, ...> {
} 
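
As an illustration, the templates above could expand to the following for arity 2. This is only a sketch: `IndexedListPublisher` is a hypothetical source, it is single-threaded, and it signals completion from inside `request`, which a production source would handle more carefully.

```java
import java.util.List;

interface Subscription { void request(long n); void cancel(); }
interface SubscriberBase {
    void onStart(Subscription s);
    void onError(Throwable e);
    void onCompleted();
}
interface BiSubscriber<T1, T2> extends SubscriberBase { void onNext(T1 t1, T2 t2); }
interface BiPublisher<T1, T2> { void subscribe(BiSubscriber<T1, T2> s); }

/** Hypothetical source: emits (index, element) pairs of a list, honoring request(n). */
final class IndexedListPublisher<T> implements BiPublisher<Integer, T> {
    private final List<T> items;

    IndexedListPublisher(List<T> items) { this.items = items; }

    @Override
    public void subscribe(BiSubscriber<Integer, T> s) {
        s.onStart(new Subscription() {
            int index;          // next element to emit
            long requested;     // outstanding demand
            boolean emitting;   // guards against reentrant request() calls
            boolean done;       // set on completion or cancellation

            @Override
            public void request(long n) {
                if (n <= 0 || done) { return; }
                requested += n;
                if (requested < 0) { requested = Long.MAX_VALUE; } // cap on overflow
                if (emitting) { return; } // the outer loop picks up the new demand
                emitting = true;
                while (!done && requested > 0 && index < items.size()) {
                    requested--;
                    int i = index++;
                    s.onNext(i, items.get(i)); // two values, no tuple allocated
                }
                emitting = false;
                if (!done && index == items.size()) {
                    done = true;
                    s.onCompleted();
                }
            }

            @Override
            public void cancel() { done = true; }
        });
    }
}
```

A subscriber that stores the `Subscription` from `onStart` and calls `request(2)` on a three-element list receives two pairs and no completion signal until it requests more.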

Queues

Most operators need some SPSC (single-producer, single-consumer) queue to operate, and the classical queues are designed to transmit one element at a time, so you'll need to extend their logic and API.

interface {Arity}Queue {
    boolean offer(Ti, ...);
    boolean poll(Action{k}<Ti, ...> out);  // true if element was available
    boolean peek(Action{k}<Ti, ...> out);
    boolean isEmpty();
    int size();
}

Admittedly, using callbacks adds some overhead, but I think it is less than allocating tuples for the classical queues.

In addition, the ring-buffer based Spsc queue implementation needs some slight changes in how elements are stored: you can keep the current indexing logic, but each logical index now maps to k consecutive slots in the underlying array (the logical capacity remains a power of 2), roughly:

Object[] array = new Object[powerOf2Capacity * 3];

void offer(T1 t1, T2 t2, T3 t3) {
    int offset = ((int)(producerIndex) & mask) * 3;
    array[offset] = t1;
    array[offset + 1] = t2;
    array[offset + 2].lazySet(t3); // memory order: release
    producerIndex++;
}
boolean poll(Action3<T1, T2, T3> out) {
    int offset = ((int)(consumerIndex) & mask) * 3;
    Object o3 = array[offset + 2]; // memory order: acquire
    if (o3 == null) { return false; } // last slot empty: nothing published yet
    Object o2 = array[offset + 1]; // read in opposite order
    Object o1 = array[offset];
    array[offset] = null;
    array[offset + 1] = null;
    array[offset + 2].lazySet(null);
    consumerIndex++;
    out.call((T1)o1, (T2)o2, (T3)o3);
    return true;
}
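
The snippet above uses `array[i].lazySet(...)` as shorthand. A compilable variant, assuming `java.util.concurrent.atomic.AtomicReferenceArray` as the backing store, might look like the sketch below. The full-queue check in `offer`, the non-null requirement on the third value, and the nested `Action3` interface are additions for the sake of a self-contained example, not part of the original outline.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

/** Sketch of a single-producer, single-consumer queue holding three values per slot. */
final class SpscTriQueue<T1, T2, T3> {
    /** Output callback, standing in for the Action3 type used in the text. */
    interface Action3<A, B, C> { void call(A a, B b, C c); }

    private final AtomicReferenceArray<Object> array;
    private final int mask;
    private long producerIndex; // touched only by the producer thread
    private long consumerIndex; // touched only by the consumer thread

    SpscTriQueue(int powerOf2Capacity) {
        if (powerOf2Capacity < 1 || Integer.bitCount(powerOf2Capacity) != 1) {
            throw new IllegalArgumentException("capacity must be a power of 2");
        }
        mask = powerOf2Capacity - 1;
        array = new AtomicReferenceArray<>(powerOf2Capacity * 3);
    }

    boolean offer(T1 t1, T2 t2, T3 t3) {
        if (t3 == null) { throw new NullPointerException("t3 marks slot occupancy"); }
        int offset = ((int) producerIndex & mask) * 3;
        if (array.get(offset + 2) != null) { return false; } // slot not consumed yet: full
        array.lazySet(offset, t1);
        array.lazySet(offset + 1, t2);
        array.lazySet(offset + 2, t3); // ordered store: publishes t1 and t2 as well
        producerIndex++;
        return true;
    }

    @SuppressWarnings("unchecked")
    boolean poll(Action3<? super T1, ? super T2, ? super T3> out) {
        int offset = ((int) consumerIndex & mask) * 3;
        Object o3 = array.get(offset + 2); // volatile read pairs with the ordered store
        if (o3 == null) { return false; }  // empty
        Object o2 = array.get(offset + 1); // read in opposite order
        Object o1 = array.get(offset);
        array.lazySet(offset, null);
        array.lazySet(offset + 1, null);
        array.lazySet(offset + 2, null);   // cleared last: frees the slot for the producer
        consumerIndex++;
        out.call((T1) o1, (T2) o2, (T3) o3);
        return true;
    }
}
```

The third slot doubles as the occupancy flag, so the producer and consumer never need to read each other's index; that is why it is written last on both sides.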

Callbacks on the stream

Since Java functions can't return more than one result, one can return a tuple for each 'column' and split the operators into many, or use the same trick as with queues and have an output callback:

public final <R1, R2> BiObservable<R1, R2> map(
        Action3<? super T1, ? super T2, Action2<R1, R2>> mapper) {
    return lift(s -> {
        return new AbstractBiSubscriber<T1, T2>(s) {
            @Override
            public void onNext(T1 t1, T2 t2) {
                // the mapper emits its two results through the output callback
                mapper.call(t1, t2, s::onNext);
            }
            // ...
        };
    });
}
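
To try the output-callback trick in isolation, here is a stripped-down, push-only sketch. `BiFlow` is a hypothetical stand-in for `BiObservable` with no backpressure, error, or completion handling, and the `Action2`/`Action3` interfaces are declared locally rather than taken from any library.

```java
import java.util.function.Consumer;

interface Action2<A, B> { void call(A a, B b); }
interface Action3<A, B, C> { void call(A a, B b, C c); }

/** Hypothetical push-only pair stream; only the mapping mechanics are shown. */
final class BiFlow<T1, T2> {
    private final Consumer<Action2<T1, T2>> source;

    BiFlow(Consumer<Action2<T1, T2>> source) { this.source = source; }

    /** Runs the source, delivering each pair to the consumer. */
    void forEach(Action2<T1, T2> consumer) { source.accept(consumer); }

    /** Maps each (t1, t2) pair; the mapper writes its two results into the callback. */
    <R1, R2> BiFlow<R1, R2> map(Action3<? super T1, ? super T2, Action2<R1, R2>> mapper) {
        return new BiFlow<R1, R2>(downstream ->
                forEach((t1, t2) -> mapper.call(t1, t2, downstream)));
    }
}
```

A possible usage, with made-up values:

```java
BiFlow<String, Integer> src = new BiFlow<String, Integer>(out -> {
    out.call("a", 1);
    out.call("bb", 2);
});
src.<Integer, String>map((s, n, out) -> out.call(s.length() * n, s + n))
   .forEach((len, label) -> System.out.println(len + ":" + label));
```

One consequence of this design is that nothing forces the mapper to call the output exactly once, so the contract would need to state whether zero or multiple calls are allowed.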