• Что бы вступить в ряды "Принятый кодер" Вам нужно:
    Написать 10 полезных сообщений или тем и Получить 10 симпатий.
    Для того кто не хочет терять время,может пожертвовать средства для поддержки сервеса, и вступить в ряды VIP на месяц, дополнительная информация в лс.

  • Пользаватели которые будут спамить, уходят в бан без предупреждения. Спам сообщения определяется администрацией и модератором.

  • Гость, Что бы Вы хотели увидеть на нашем Форуме? Изложить свои идеи и пожелания по улучшению форума Вы можете поделиться с нами здесь. ----> Перейдите сюда
  • Все пользователи не прошедшие проверку электронной почты будут заблокированы. Все вопросы с разблокировкой обращайтесь по адресу электронной почте : info@guardianelinks.com . Не пришло сообщение о проверке или о сбросе также сообщите нам.

Extracting a General-Purpose EventBus Component from DolphinScheduler: Supporting Delayed and Event-Driven Execution

Lomanu4 Оффлайн

Lomanu4

Команда форума
Администратор
Регистрация
1 Мар 2015
Сообщения
1,481
Баллы
155
1. Background and Motivation


Although Google Guava’s EventBus is already quite convenient, I wanted to build something more extensible — a component that not only functions as an EventBus but also supports delayed events out of the box.

While exploring the

Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.

project, I found its built-in eventbus component to be a well-written and efficient solution. It inspired me to extract and adapt it for use in our own business systems.

Thanks to DolphinScheduler’s open-source nature, this component can now be conveniently reused with minimal overhead. Let’s walk through how to extract it and run a working demo.

2. Implementation Details

Step 1: Define the Event Interface


First, we define a base interface for all events:


public interface IEvent {
}
Step 2: Abstract Class for Delayed Events


To support delayed execution, we introduce an abstract class AbstractDelayEvent that extends Delayed and implements IEvent. It includes both the delay time and the expiration time.


public abstract class AbstractDelayEvent implements IEvent, Delayed {
private final long delayTime;
private final long expireTime;

public long getDelayTime() {
return delayTime;
}

public long getExpireTime() {
return expireTime;
}

public AbstractDelayEvent(long delayTime) {
this.delayTime = delayTime;
this.expireTime = System.currentTimeMillis() + delayTime;
}

@Override
public long getDelay(TimeUnit unit) {
long diff = expireTime - System.currentTimeMillis();
return unit.convert(diff, TimeUnit.MILLISECONDS);
}

@Override
public int compareTo(Delayed o) {
if (this.expireTime < ((AbstractDelayEvent) o).expireTime) {
return -1;
}
if (this.expireTime > ((AbstractDelayEvent) o).expireTime) {
return 1;
}
return 0;
}
}
Step 3: Define the EventBus Interface


Next, we define the interface for the EventBus, with core operations like publish, poll, peek, remove, and state checks:


public interface IEventBus<T extends IEvent> {

void publish(T event);

Optional<T> poll() throws InterruptedException;

Optional<T> peek();

Optional<T> remove();

boolean isEmpty();

int size();
}
Step 4: Abstract Delay EventBus Implementation


We then provide an abstract base class AbstractDelayEventBus, which implements the core logic using Java’s DelayQueue.


public abstract class AbstractDelayEventBus<T extends AbstractDelayEvent> implements IEventBus<T> {

protected final DelayQueue<T> delayEventQueue = new DelayQueue<>();

@Override
public void publish(T event) {
delayEventQueue.put(event);
}

@Override
public Optional<T> poll() throws InterruptedException {
return Optional.ofNullable(delayEventQueue.poll(1000, TimeUnit.MILLISECONDS));
}

@Override
public Optional<T> peek() {
return Optional.ofNullable(delayEventQueue.peek());
}

@Override
public Optional<T> remove() {
return Optional.ofNullable(delayEventQueue.poll());
}

@Override
public boolean isEmpty() {
return delayEventQueue.isEmpty();
}

@Override
public int size() {
return delayEventQueue.size();
}
}
3. Demo: Testing the Component


Let’s build a small example to see it in action.

Define a Custom Delayed Event


In a real system, this would hold your domain-specific business data.


public class MyDelayEvent extends AbstractDelayEvent {
private final String message;

public MyDelayEvent(long delayTime, String message) {
super(delayTime);
this.message = message;
}

public String getMessage() {
return message;
}
}
Implement Your Own EventBus (Optional)


You can customize it further if needed. For simple use cases, this works as-is.


public class MyDelayEventBus extends AbstractDelayEventBus<MyDelayEvent> {
// No additional customization needed
}
Example: Publishing and Consuming Events


Now, let’s run a simple main function that publishes and consumes delayed events:


import java.util.Optional;

public class EventBusExample {
public static void main(String[] args) throws InterruptedException {
// Create event bus
IEventBus<MyDelayEvent> eventBus = new MyDelayEventBus();

// Publish a single event with 100ms delay
eventBus.publish(new MyDelayEvent(100, "Single Event"));
System.out.println("After publish, event bus size: " + eventBus.size());

// Continuously try to consume events
while (true) {
Optional<MyDelayEvent> event = eventBus.poll();
if (event.isPresent()) {
System.out.println("Received event: " + event.get().getMessage());
} else {
System.out.println("No event received within the timeout.");
break;
}
}

// Final bus size
System.out.println("Event bus size: " + eventBus.size());
}
}
Output



Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.



As you can see, this approach makes it easy to create and manage your own delay-capable EventBus system, enabling flexible event-driven programming for your business applications.

4. Source Code & References


If you're building delay-sensitive or event-driven systems, this lightweight and extendable component might be just what you need — and it’s inspired by one of the best open-source workflow schedulers out there.


Пожалуйста Авторизируйтесь или Зарегистрируйтесь для просмотра скрытого текста.

 
Вверх Снизу