feat: add event dispacher, wip
continuous-integration/drone/push Build is passing Details

pull/2/head
Davide Polonio 2023-03-28 17:26:15 +02:00
parent cea9f3222a
commit 421909d5ae
15 changed files with 347 additions and 64 deletions

View File

@ -0,0 +1,26 @@
{
"$schema": "https://json-schema.org/draft/2019-09/schema",
"$id": "http://example.com/example.json",
"type": "object",
"default": {},
"required": [
"event",
"messageId",
"telegramChatId"
],
"additionalProperties": true,
"properties": {
"event": {
"type": "string",
"default": ""
},
"messageId": {
"type": "number",
"default": 0
},
"telegramChatId": {
"type": "number",
"default": 0
}
}
}

View File

@ -0,0 +1,41 @@
package com.github.polpetta.mezzotre.orm.model;
import com.github.polpetta.types.json.CallbackQueryMetadata;
import io.ebean.annotation.DbJson;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;
import javax.validation.constraints.NotNull;
// Could be better to adopt redis? Seems too much overhead for now...
/**
* This entity allows for the push and retrieval of {@link CallbackQueryMetadata} related to a
* particular <i>InlineKeyboardButton</i>. {@link CallbackQueryMetadata} is a loosely-structured
* object, and this is by design: the fields stored in it depends on the metadata of the callback
* and on the type of event
*
* @author Davide Polonio
* @since 1.0
*/
@Entity
@Table(name = "callback_query_context")
public class CallbackQueryContext extends Base {
@Id private final String id;
@DbJson @NotNull private final CallbackQueryMetadata fields;
public CallbackQueryContext(String id, CallbackQueryMetadata fields) {
this.id = id;
this.fields = fields;
}
public String getId() {
return id;
}
public CallbackQueryMetadata getFields() {
return fields;
}
}

View File

@ -1,10 +1,13 @@
package com.github.polpetta.mezzotre.route;
import com.github.polpetta.mezzotre.orm.model.TgChat;
import com.github.polpetta.mezzotre.orm.model.query.QCallbackQueryContext;
import com.github.polpetta.mezzotre.orm.model.query.QTgChat;
import com.github.polpetta.mezzotre.telegram.callbackquery.Dispatcher;
import com.github.polpetta.mezzotre.telegram.command.Router;
import com.github.polpetta.types.json.ChatContext;
import com.google.gson.Gson;
import com.pengrad.telegrambot.model.CallbackQuery;
import com.pengrad.telegrambot.model.Message;
import com.pengrad.telegrambot.model.Update;
import com.pengrad.telegrambot.request.BaseRequest;
@ -15,6 +18,7 @@ import io.jooby.annotations.Path;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.parameters.RequestBody;
import io.swagger.v3.oas.annotations.tags.Tag;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import javax.inject.Inject;
@ -30,17 +34,20 @@ public class Telegram {
private final Gson gson;
private final Executor completableFutureThreadPool;
private final Router router;
private final Dispatcher dispatcher;
@Inject
public Telegram(
Logger log,
Gson gson,
@Named("eventThreadPool") Executor completableFutureThreadPool,
Router router) {
Router router,
Dispatcher dispatcher) {
this.log = log;
this.gson = gson;
this.completableFutureThreadPool = completableFutureThreadPool;
this.router = router;
this.dispatcher = dispatcher;
}
@Operation(
@ -51,33 +58,30 @@ public class Telegram {
requestBody = @RequestBody(required = true))
@POST
public CompletableFuture<String> incomingUpdate(Context context, Update update) {
return CompletableFuture.supplyAsync(
() -> {
/*
Steps:
1 - Retrieve the chat. If new chat, create and entry in the db
2 - Check if the incoming payload is an inline event (keyboard, query, ecc). Possibly check if there is any context previously saved in the database to retrieve. In that case, process the payload accordingly
3 - If it is not an inline event, then process it as an incoming message
*/
return CompletableFuture.completedFuture(update)
.thenComposeAsync(
ignored -> {
context.setResponseType(MediaType.JSON);
log.trace(gson.toJson(update));
final Message message = update.message();
return new QTgChat()
.id
.eq(message.chat().id())
.findOneOrEmpty()
.map(
u -> {
log.debug(
"Telegram chat " + u.getId() + " already registered in the database");
return u;
})
.orElseGet(
() -> {
final TgChat newTgChat = new TgChat(message.chat().id(), new ChatContext());
newTgChat.save();
log.trace(
"New Telegram chat " + newTgChat.getId() + " added into the database");
return newTgChat;
});
if (update.message() != null) {
return processMessage(update);
}
if (update.callbackQuery() != null) {
return processCallBackQuery(update);
}
return CompletableFuture.failedFuture(
new IllegalArgumentException("The given update is not formatted correctly"));
},
completableFutureThreadPool)
.thenComposeAsync(tgChat -> router.process(tgChat, update), completableFutureThreadPool)
// See https://core.telegram.org/bots/faq#how-can-i-make-requests-in-response-to-updates
.thenApply(
tgResponse -> {
@ -86,4 +90,45 @@ public class Telegram {
return response;
});
}
private CompletableFuture<Optional<BaseRequest<?, ?>>> processMessage(Update update) {
final Message message = update.message();
final TgChat tgChat =
new QTgChat()
.id
.eq(message.chat().id())
.findOneOrEmpty()
.map(
u -> {
log.debug("Telegram chat " + u.getId() + " already registered in the database");
return u;
})
.orElseGet(
() -> {
final TgChat newTgChat = new TgChat(message.chat().id(), new ChatContext());
newTgChat.save();
log.trace("New Telegram chat " + newTgChat.getId() + " added into the database");
return newTgChat;
});
return router.process(tgChat, update);
}
private CompletableFuture<Optional<BaseRequest<?, ?>>> processCallBackQuery(Update update) {
final CallbackQuery callbackQuery = update.callbackQuery();
return new QCallbackQueryContext()
.id
.eq(callbackQuery.data())
.findOneOrEmpty()
.map(
c -> {
log.debug("CallbackQuery " + c.getId() + " find in the database");
return dispatcher.dispatch(c, update);
})
.orElse(
CompletableFuture.failedFuture(
new IllegalStateException(
"No such callback query in our database " + callbackQuery.data())));
}
}

View File

@ -0,0 +1,27 @@
package com.github.polpetta.mezzotre.telegram.callbackquery;
import com.github.polpetta.mezzotre.orm.model.CallbackQueryContext;
import com.pengrad.telegrambot.model.Update;
import com.pengrad.telegrambot.request.BaseRequest;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import javax.inject.Inject;
import javax.inject.Singleton;
@Singleton
public class ChangeLanguage implements Processor {
@Inject
public ChangeLanguage() {}
@Override
public String getEventName() {
return "changeLanguage";
}
@Override
public CompletableFuture<Optional<BaseRequest<?, ?>>> process(
CallbackQueryContext callbackQueryContext, Update update) {
return null;
}
}

View File

@ -0,0 +1,43 @@
package com.github.polpetta.mezzotre.telegram.callbackquery;
import com.github.polpetta.mezzotre.orm.model.CallbackQueryContext;
import com.pengrad.telegrambot.model.Update;
import com.pengrad.telegrambot.request.BaseRequest;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
@Singleton
public class Dispatcher {
private final Set<Processor> tgEventProcessors;
private final Executor threadPool;
@Inject
public Dispatcher(
@Named("eventProcessors") Set<Processor> tgEventProcessors,
@Named("eventThreadPool") Executor threadPool) {
this.tgEventProcessors = tgEventProcessors;
this.threadPool = threadPool;
}
public CompletableFuture<Optional<BaseRequest<?, ?>>> dispatch(
CallbackQueryContext callbackQueryContext, Update update) {
return CompletableFuture.completedFuture(update)
.thenComposeAsync(
ignored ->
Optional.of(callbackQueryContext.getFields().getEvent())
.flatMap(
eventName ->
tgEventProcessors.stream()
.filter(processor -> processor.getEventName().equals(eventName))
.findAny())
.map(processor -> processor.process(callbackQueryContext, update))
.orElse(CompletableFuture.failedFuture(new EventProcessorNotFoundException())),
threadPool);
}
}

View File

@ -0,0 +1,22 @@
package com.github.polpetta.mezzotre.telegram.callbackquery;
public class EventProcessorNotFoundException extends RuntimeException {
public EventProcessorNotFoundException() {}
public EventProcessorNotFoundException(String message) {
super(message);
}
public EventProcessorNotFoundException(String message, Throwable cause) {
super(message, cause);
}
public EventProcessorNotFoundException(Throwable cause) {
super(cause);
}
public EventProcessorNotFoundException(
String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}

View File

@ -0,0 +1,14 @@
package com.github.polpetta.mezzotre.telegram.callbackquery;
import com.github.polpetta.mezzotre.orm.model.CallbackQueryContext;
import com.pengrad.telegrambot.model.Update;
import com.pengrad.telegrambot.request.BaseRequest;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
public interface Processor {
String getEventName();
CompletableFuture<Optional<BaseRequest<?, ?>>> process(
CallbackQueryContext callbackQueryContext, Update update);
}

View File

@ -0,0 +1,18 @@
package com.github.polpetta.mezzotre.telegram.callbackquery.di;
import com.github.polpetta.mezzotre.telegram.callbackquery.ChangeLanguage;
import com.github.polpetta.mezzotre.telegram.callbackquery.Processor;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import java.util.Set;
import javax.inject.Named;
import javax.inject.Singleton;
public class CallbackQuery extends AbstractModule {
@Provides
@Singleton
@Named("eventProcessors")
public Set<Processor> getEventProcessor(ChangeLanguage changeLanguage) {
return Set.of(changeLanguage);
}
}

View File

@ -13,20 +13,20 @@ import java.util.concurrent.CompletableFuture;
* @author Davide Polonio
* @since 1.0
*/
public interface Executor {
public interface Processor {
/**
* Provides the keyword to trigger this executor. Note that it must start with "/" at the
* beginning, e.g. {@code /start}.
*
* @return a {@link String} providing the keyword to trigger the current {@link Executor}
* @return a {@link String} providing the keyword to trigger the current {@link Processor}
*/
String getTriggerKeyword();
/**
* Process the current update
*
* @param chat the chat the {@link Executor} is currently replying to
* @param chat the chat the {@link Processor} is currently replying to
* @param update the update to process
* @return a {@link CompletableFuture} with the result of the computation
*/

View File

@ -8,11 +8,12 @@ import com.pengrad.telegrambot.request.BaseRequest;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import javax.inject.Named;
/**
* This class has the goal of dispatching incoming {@link Update} events to the right {@link
* Executor}, that will provide an adequate response.
* Processor}, that will provide an adequate response.
*
* @author Davide Polonio
* @since 1.0
@ -20,31 +21,30 @@ import javax.inject.Named;
@Singleton
public class Router {
private final Set<Executor> tgExecutors;
private final java.util.concurrent.Executor threadPool;
private final Set<Processor> tgCommandProcessors;
private final Executor threadPool;
@Inject
public Router(
@Named("commands") Set<Executor> tgExecutors,
@Named("eventThreadPool") java.util.concurrent.Executor threadPool) {
this.tgExecutors = tgExecutors;
@Named("commandProcessor") Set<Processor> tgCommandProcessors,
@Named("eventThreadPool") Executor threadPool) {
this.tgCommandProcessors = tgCommandProcessors;
this.threadPool = threadPool;
}
/**
* Process the incoming {@link Update}. If no suitable {@link Executor} is able to process the
* Process the incoming {@link Update}. If no suitable {@link Processor} is able to process the
* command, then {@link CommandNotFoundException} is used to signal this event.
*
* @param update the update coming from Telegram Servers
* @return a {@link CompletableFuture} that is marked as failure and containing a {@link
* CommandNotFoundException} exception if no suitable {@link Executor} is found
* CommandNotFoundException} exception if no suitable {@link Processor} is found
*/
public CompletableFuture<Optional<BaseRequest<?, ?>>> process(TgChat chat, Update update) {
// This way exceptions are always under control
return CompletableFuture.completedStage(update)
.toCompletableFuture()
return CompletableFuture.completedFuture(update)
.thenComposeAsync(
up ->
ignored ->
/*
Brief explanation of this chain:
1 - Check if the message has a command in it (e.g. "/start hey!")
@ -53,17 +53,17 @@ public class Router {
could be in (maybe we're continuing a chat from previous messages?)
2.a - If there's a context with a valid stage, then continue with it
*/
Optional.of(up.message().text().split(" "))
Optional.of(update.message().text().split(" "))
.filter(list -> list.length > 0)
.map(list -> list[0])
.filter(wannabeCommand -> wannabeCommand.startsWith("/"))
.or(() -> Optional.ofNullable(chat.getChatContext().getStage()))
.flatMap(
command ->
tgExecutors.stream()
tgCommandProcessors.stream()
.filter(ex -> ex.getTriggerKeyword().equals(command))
.findAny())
.map(executor -> executor.process(chat, up))
.map(executor -> executor.process(chat, update))
.orElse(CompletableFuture.failedFuture(new CommandNotFoundException())),
threadPool);
}

View File

@ -21,13 +21,13 @@ import org.apache.velocity.util.StringBuilderWriter;
import org.slf4j.Logger;
/**
* This {@link Executor} has the goal to greet a user that typed {@code /start} to the bot.
* This {@link Processor} has the goal to greet a user that typed {@code /start} to the bot.
*
* @author Davide Polonio
* @since 1.0
*/
@Singleton
public class Start implements Executor {
public class Start implements Processor {
private final java.util.concurrent.Executor threadPool;
private final Logger log;
@ -75,7 +75,7 @@ public class Start implements Executor {
toolContext
.getVelocityEngine()
.mergeTemplate(
"template/command/start.vm",
"template/command/start.0.vm",
StandardCharsets.UTF_8.name(),
context,
stringBuilderWriter);

View File

@ -1,6 +1,6 @@
package com.github.polpetta.mezzotre.telegram.command.di;
import com.github.polpetta.mezzotre.telegram.command.Executor;
import com.github.polpetta.mezzotre.telegram.command.Processor;
import com.github.polpetta.mezzotre.telegram.command.Start;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
@ -14,8 +14,8 @@ import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader;
public class Command extends AbstractModule {
@Provides
@Singleton
@Named("commands")
public Set<Executor> getCommandExecutors(Start start) {
@Named("commandProcessor")
public Set<Processor> getCommandProcessor(Start start) {
return Set.of(start);
}

View File

@ -6,14 +6,16 @@ import static org.mockito.Mockito.*;
import com.github.polpetta.mezzotre.helper.Loader;
import com.github.polpetta.mezzotre.helper.TestConfig;
import com.github.polpetta.mezzotre.orm.model.CallbackQueryContext;
import com.github.polpetta.mezzotre.orm.model.TgChat;
import com.github.polpetta.mezzotre.telegram.callbackquery.Dispatcher;
import com.github.polpetta.mezzotre.telegram.command.Router;
import com.github.polpetta.types.json.CallbackQueryMetadata;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.pengrad.telegrambot.TelegramBot;
import com.pengrad.telegrambot.model.Update;
import com.pengrad.telegrambot.request.SendMessage;
import com.pengrad.telegrambot.response.SendResponse;
import io.ebean.Database;
import io.jooby.Context;
import io.jooby.MediaType;
@ -42,6 +44,7 @@ class TelegramIntegrationTest {
private Telegram telegram;
private TelegramBot fakeTelegramBot;
private Router fakeRouter;
private Dispatcher fakeDispatcher;
@BeforeAll
static void beforeAll() {
@ -55,13 +58,15 @@ class TelegramIntegrationTest {
fakeTelegramBot = mock(TelegramBot.class);
fakeRouter = mock(Router.class);
fakeDispatcher = mock(Dispatcher.class);
telegram =
new Telegram(
LoggerFactory.getLogger(getClass()),
new GsonBuilder().setPrettyPrinting().create(),
Executors.newSingleThreadExecutor(),
fakeRouter);
fakeRouter,
fakeDispatcher);
}
@Test
@ -72,9 +77,6 @@ class TelegramIntegrationTest {
when(fakeRouter.process(any(TgChat.class), any(Update.class)))
.thenReturn(CompletableFuture.completedFuture(Optional.of(expectedBaseRequest)));
final SendResponse fakeResponse = mock(SendResponse.class);
when(fakeResponse.isOk()).thenReturn(true);
when(fakeTelegramBot.execute(any(SendMessage.class))).thenReturn(fakeResponse);
final Context fakeContext = mock(Context.class);
final Update update =
gson.fromJson(
@ -100,11 +102,56 @@ class TelegramIntegrationTest {
+ "}\n"
+ "}\n",
Update.class);
final CompletableFuture<String> integerCompletableFuture =
final CompletableFuture<String> gotResponseFuture =
telegram.incomingUpdate(fakeContext, update);
final String gotReply = gotResponseFuture.get();
verify(fakeContext, times(1)).setResponseType(MediaType.JSON);
final String gotReply = integerCompletableFuture.get();
assertDoesNotThrow(() -> gotReply);
verify(fakeRouter, times(1)).process(any(), any());
verify(fakeDispatcher, times(0)).dispatch(any(), any());
assertEquals(expectedBaseRequest.toWebhookResponse(), gotReply);
}
@Test
@Timeout(value = 1, unit = TimeUnit.MINUTES)
void shouldProcessAnIncomingCallbackQueryThatExistsInTheDb() throws Exception {
final SendMessage expectedBaseRequest = new SendMessage(1111111, "Hello world");
when(fakeDispatcher.dispatch(any(CallbackQueryContext.class), any(Update.class)))
.thenReturn(CompletableFuture.completedFuture(Optional.of(expectedBaseRequest)));
final CallbackQueryContext callbackQueryContext =
new CallbackQueryContext(
"41427473-0d81-40a8-af60-9517163615a4",
new CallbackQueryMetadata.CallbackQueryMetadataBuilder()
.withTelegramChatId(666L)
.withMessageId(42L)
.withEvent("testEvent")
.build());
callbackQueryContext.save();
final Context fakeContext = mock(Context.class);
final Update update =
gson.fromJson(
"{\n"
+ "\"update_id\":10000,\n"
+ "\"callback_query\":{\n"
+ " \"id\": \"4382bfdwdsb323b2d9\",\n"
+ " \"from\":{\n"
+ " \"last_name\":\"Test Lastname\",\n"
+ " \"type\": \"private\",\n"
+ " \"id\":1111111,\n"
+ " \"first_name\":\"Test Firstname\",\n"
+ " \"username\":\"Testusername\"\n"
+ " },\n"
+ " \"data\": \"41427473-0d81-40a8-af60-9517163615a4\",\n"
+ " \"inline_message_id\": \"1234csdbsk4839\"\n"
+ "}\n"
+ "}",
Update.class);
final CompletableFuture<String> gotResponseFuture =
telegram.incomingUpdate(fakeContext, update);
final String gotReply = gotResponseFuture.get();
verify(fakeContext, times(1)).setResponseType(MediaType.JSON);
verify(fakeRouter, times(0)).process(any(), any());
verify(fakeDispatcher, times(1)).dispatch(eq(callbackQueryContext), any());
assertEquals(expectedBaseRequest.toWebhookResponse(), gotReply);
}
}

View File

@ -23,22 +23,22 @@ import org.junit.jupiter.api.parallel.ExecutionMode;
@Execution(ExecutionMode.CONCURRENT)
class RouterTest {
private static Executor dummyEmptyExampleExecutor;
private static Executor anotherKeyWithResultExecutor;
private static Processor dummyEmptyExampleProcessor;
private static Processor anotherKeyWithResultProcessor;
private static Gson gson;
@BeforeAll
static void beforeAll() {
gson = new Gson();
dummyEmptyExampleExecutor = mock(Executor.class);
when(dummyEmptyExampleExecutor.getTriggerKeyword()).thenReturn("/example");
when(dummyEmptyExampleExecutor.process(any(), any()))
dummyEmptyExampleProcessor = mock(Processor.class);
when(dummyEmptyExampleProcessor.getTriggerKeyword()).thenReturn("/example");
when(dummyEmptyExampleProcessor.process(any(), any()))
.thenReturn(CompletableFuture.completedFuture(Optional.empty()));
anotherKeyWithResultExecutor = mock(Executor.class);
when(anotherKeyWithResultExecutor.getTriggerKeyword()).thenReturn("/anotherExample");
when(anotherKeyWithResultExecutor.process(any(), any()))
anotherKeyWithResultProcessor = mock(Processor.class);
when(anotherKeyWithResultProcessor.getTriggerKeyword()).thenReturn("/anotherExample");
when(anotherKeyWithResultProcessor.process(any(), any()))
.thenReturn(
CompletableFuture.completedFuture(Optional.of(new SendMessage(1234L, "hello world"))));
}
@ -46,7 +46,7 @@ class RouterTest {
@Test
void shouldMessageExampleMessageAndGetEmptyOptional() throws Exception {
final Router router =
new Router(Set.of(dummyEmptyExampleExecutor), Executors.newSingleThreadExecutor());
new Router(Set.of(dummyEmptyExampleProcessor), Executors.newSingleThreadExecutor());
final TgChat fakeChat = mock(TgChat.class);
when(fakeChat.getChatContext()).thenReturn(new ChatContext());
final Update update =
@ -85,7 +85,7 @@ class RouterTest {
void shouldSelectRightExecutorAndReturnResult() throws Exception {
final Router router =
new Router(
Set.of(dummyEmptyExampleExecutor, anotherKeyWithResultExecutor),
Set.of(dummyEmptyExampleProcessor, anotherKeyWithResultProcessor),
Executors.newSingleThreadExecutor());
final TgChat fakeChat = mock(TgChat.class);
when(fakeChat.getChatContext()).thenReturn(new ChatContext());